1
2
3 package org.galagosearch.core.types;
4
5 import org.galagosearch.tupleflow.Utility;
6 import org.galagosearch.tupleflow.ArrayInput;
7 import org.galagosearch.tupleflow.ArrayOutput;
8 import org.galagosearch.tupleflow.Order;
9 import org.galagosearch.tupleflow.OrderedWriter;
10 import org.galagosearch.tupleflow.Type;
11 import org.galagosearch.tupleflow.TypeReader;
12 import org.galagosearch.tupleflow.Step;
13 import org.galagosearch.tupleflow.IncompatibleProcessorException;
14 import org.galagosearch.tupleflow.ReaderSource;
15 import java.io.IOException;
16 import java.io.EOFException;
17 import java.io.UnsupportedEncodingException;
18 import java.util.ArrayList;
19 import java.util.Arrays;
20 import java.util.Comparator;
21 import java.util.PriorityQueue;
22 import java.util.Collection;
23
24 public class DocumentWordProbability implements Type<DocumentWordProbability> {
/** Identifier of the document this tuple belongs to. */
public String document;
/** The word as raw bytes; {@link #toString()} decodes it as UTF-8. */
public byte[] word;
/** Probability value attached to this (document, word) pair. */
public double probability;

/** Creates a tuple whose fields all hold their Java default values. */
public DocumentWordProbability() {}

/**
 * Creates a tuple from its three components.
 *
 * <p>The {@code word} array is stored by reference; it is not copied.
 */
public DocumentWordProbability(String document, byte[] word, double probability) {
    this.document = document;
    this.word = word;
    this.probability = probability;
}
35
36 public String toString() {
37 try {
38 return String.format("%s,%s,%f",
39 document, new String(word, "UTF-8"), probability);
40 } catch(UnsupportedEncodingException e) {
41 throw new RuntimeException("Couldn't convert string to UTF-8.");
42 }
43 }
44
45 public Order<DocumentWordProbability> getOrder(String... spec) {
46 if (Arrays.equals(spec, new String[] { "+document", "+word" })) {
47 return new DocumentWordOrder();
48 }
49 if (Arrays.equals(spec, new String[] { "+word" })) {
50 return new WordOrder();
51 }
52 if (Arrays.equals(spec, new String[] { "+document" })) {
53 return new DocumentOrder();
54 }
55 return null;
56 }
57
58 public interface Processor extends Step, org.galagosearch.tupleflow.Processor<DocumentWordProbability> {
59 public void process(DocumentWordProbability object) throws IOException;
60 public void close() throws IOException;
61 }
62 public interface Source extends Step {
63 }
64 public static class DocumentWordOrder implements Order<DocumentWordProbability> {
65 public int hash(DocumentWordProbability object) {
66 int h = 0;
67 h += Utility.hash(object.document);
68 h += Utility.hash(object.word);
69 return h;
70 }
71 public Comparator<DocumentWordProbability> greaterThan() {
72 return new Comparator<DocumentWordProbability>() {
73 public int compare(DocumentWordProbability one, DocumentWordProbability two) {
74 int result = 0;
75 do {
76 result = + Utility.compare(one.document, two.document);
77 if(result != 0) break;
78 result = + Utility.compare(one.word, two.word);
79 if(result != 0) break;
80 } while (false);
81 return -result;
82 }
83 };
84 }
85 public Comparator<DocumentWordProbability> lessThan() {
86 return new Comparator<DocumentWordProbability>() {
87 public int compare(DocumentWordProbability one, DocumentWordProbability two) {
88 int result = 0;
89 do {
90 result = + Utility.compare(one.document, two.document);
91 if(result != 0) break;
92 result = + Utility.compare(one.word, two.word);
93 if(result != 0) break;
94 } while (false);
95 return result;
96 }
97 };
98 }
99 public TypeReader<DocumentWordProbability> orderedReader(ArrayInput _input) {
100 return new ShreddedReader(_input);
101 }
102
103 public TypeReader<DocumentWordProbability> orderedReader(ArrayInput _input, int bufferSize) {
104 return new ShreddedReader(_input, bufferSize);
105 }
106 public OrderedWriter<DocumentWordProbability> orderedWriter(ArrayOutput _output) {
107 ShreddedWriter w = new ShreddedWriter(_output);
108 return new OrderedWriterClass(w);
109 }
110 public static class OrderedWriterClass extends OrderedWriter< DocumentWordProbability > {
111 DocumentWordProbability last = null;
112 ShreddedWriter shreddedWriter = null;
113
114 public OrderedWriterClass(ShreddedWriter s) {
115 this.shreddedWriter = s;
116 }
117
118 public void process(DocumentWordProbability object) throws IOException {
119 boolean processAll = false;
120 if (processAll || last == null || 0 != Utility.compare(object.document, last.document)) { processAll = true; shreddedWriter.processDocument(object.document); }
121 if (processAll || last == null || 0 != Utility.compare(object.word, last.word)) { processAll = true; shreddedWriter.processWord(object.word); }
122 shreddedWriter.processTuple(object.probability);
123 last = object;
124 }
125
126 public void close() throws IOException {
127 shreddedWriter.close();
128 }
129
130 public Class<DocumentWordProbability> getInputClass() {
131 return DocumentWordProbability.class;
132 }
133 }
134 public ReaderSource<DocumentWordProbability> orderedCombiner(Collection<TypeReader<DocumentWordProbability>> readers, boolean closeOnExit) {
135 ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
136
137 for (TypeReader<DocumentWordProbability> reader : readers) {
138 shreddedReaders.add((ShreddedReader)reader);
139 }
140
141 return new ShreddedCombiner(shreddedReaders, closeOnExit);
142 }
143 public DocumentWordProbability clone(DocumentWordProbability object) {
144 DocumentWordProbability result = new DocumentWordProbability();
145 if (object == null) return result;
146 result.document = object.document;
147 result.word = object.word;
148 result.probability = object.probability;
149 return result;
150 }
151 public Class<DocumentWordProbability> getOrderedClass() {
152 return DocumentWordProbability.class;
153 }
154 public String[] getOrderSpec() {
155 return new String[] {"+document", "+word"};
156 }
157
158 public static String getSpecString() {
159 return "+document +word";
160 }
161
162 public interface ShreddedProcessor extends Step {
163 public void processDocument(String document) throws IOException;
164 public void processWord(byte[] word) throws IOException;
165 public void processTuple(double probability) throws IOException;
166 public void close() throws IOException;
167 }
168 public interface ShreddedSource extends Step {
169 }
170
171 public static class ShreddedWriter implements ShreddedProcessor {
172 ArrayOutput output;
173 ShreddedBuffer buffer = new ShreddedBuffer();
174 String lastDocument;
175 byte[] lastWord;
176 boolean lastFlush = false;
177
178 public ShreddedWriter(ArrayOutput output) {
179 this.output = output;
180 }
181
182 public void close() throws IOException {
183 flush();
184 }
185
186 public void processDocument(String document) {
187 lastDocument = document;
188 buffer.processDocument(document);
189 }
190 public void processWord(byte[] word) {
191 lastWord = word;
192 buffer.processWord(word);
193 }
194 public final void processTuple(double probability) throws IOException {
195 if (lastFlush) {
196 if(buffer.documents.size() == 0) buffer.processDocument(lastDocument);
197 if(buffer.words.size() == 0) buffer.processWord(lastWord);
198 lastFlush = false;
199 }
200 buffer.processTuple(probability);
201 if (buffer.isFull())
202 flush();
203 }
204 public final void flushTuples(int pauseIndex) throws IOException {
205
206 while (buffer.getReadIndex() < pauseIndex) {
207
208 output.writeDouble(buffer.getProbability());
209 buffer.incrementTuple();
210 }
211 }
212 public final void flushDocument(int pauseIndex) throws IOException {
213 while (buffer.getReadIndex() < pauseIndex) {
214 int nextPause = buffer.getDocumentEndIndex();
215 int count = nextPause - buffer.getReadIndex();
216
217 output.writeString(buffer.getDocument());
218 output.writeInt(count);
219 buffer.incrementDocument();
220
221 flushWord(nextPause);
222 assert nextPause == buffer.getReadIndex();
223 }
224 }
225 public final void flushWord(int pauseIndex) throws IOException {
226 while (buffer.getReadIndex() < pauseIndex) {
227 int nextPause = buffer.getWordEndIndex();
228 int count = nextPause - buffer.getReadIndex();
229
230 output.writeBytes(buffer.getWord());
231 output.writeInt(count);
232 buffer.incrementWord();
233
234 flushTuples(nextPause);
235 assert nextPause == buffer.getReadIndex();
236 }
237 }
238 public void flush() throws IOException {
239 flushDocument(buffer.getWriteIndex());
240 buffer.reset();
241 lastFlush = true;
242 }
243 }
244 public static class ShreddedBuffer {
245 ArrayList<String> documents = new ArrayList();
246 ArrayList<byte[]> words = new ArrayList();
247 ArrayList<Integer> documentTupleIdx = new ArrayList();
248 ArrayList<Integer> wordTupleIdx = new ArrayList();
249 int documentReadIdx = 0;
250 int wordReadIdx = 0;
251
252 double[] probabilitys;
253 int writeTupleIndex = 0;
254 int readTupleIndex = 0;
255 int batchSize;
256
257 public ShreddedBuffer(int batchSize) {
258 this.batchSize = batchSize;
259
260 probabilitys = new double[batchSize];
261 }
262
263 public ShreddedBuffer() {
264 this(10000);
265 }
266
267 public void processDocument(String document) {
268 documents.add(document);
269 documentTupleIdx.add(writeTupleIndex);
270 }
271 public void processWord(byte[] word) {
272 words.add(word);
273 wordTupleIdx.add(writeTupleIndex);
274 }
275 public void processTuple(double probability) {
276 assert documents.size() > 0;
277 assert words.size() > 0;
278 probabilitys[writeTupleIndex] = probability;
279 writeTupleIndex++;
280 }
281 public void resetData() {
282 documents.clear();
283 words.clear();
284 documentTupleIdx.clear();
285 wordTupleIdx.clear();
286 writeTupleIndex = 0;
287 }
288
289 public void resetRead() {
290 readTupleIndex = 0;
291 documentReadIdx = 0;
292 wordReadIdx = 0;
293 }
294
295 public void reset() {
296 resetData();
297 resetRead();
298 }
299 public boolean isFull() {
300 return writeTupleIndex >= batchSize;
301 }
302
303 public boolean isEmpty() {
304 return writeTupleIndex == 0;
305 }
306
307 public boolean isAtEnd() {
308 return readTupleIndex >= writeTupleIndex;
309 }
310 public void incrementDocument() {
311 documentReadIdx++;
312 }
313
314 public void autoIncrementDocument() {
315 while (readTupleIndex >= getDocumentEndIndex() && readTupleIndex < writeTupleIndex)
316 documentReadIdx++;
317 }
318 public void incrementWord() {
319 wordReadIdx++;
320 }
321
322 public void autoIncrementWord() {
323 while (readTupleIndex >= getWordEndIndex() && readTupleIndex < writeTupleIndex)
324 wordReadIdx++;
325 }
326 public void incrementTuple() {
327 readTupleIndex++;
328 }
329 public int getDocumentEndIndex() {
330 if ((documentReadIdx+1) >= documentTupleIdx.size())
331 return writeTupleIndex;
332 return documentTupleIdx.get(documentReadIdx+1);
333 }
334
335 public int getWordEndIndex() {
336 if ((wordReadIdx+1) >= wordTupleIdx.size())
337 return writeTupleIndex;
338 return wordTupleIdx.get(wordReadIdx+1);
339 }
340 public int getReadIndex() {
341 return readTupleIndex;
342 }
343
344 public int getWriteIndex() {
345 return writeTupleIndex;
346 }
347 public String getDocument() {
348 assert readTupleIndex < writeTupleIndex;
349 assert documentReadIdx < documents.size();
350
351 return documents.get(documentReadIdx);
352 }
353 public byte[] getWord() {
354 assert readTupleIndex < writeTupleIndex;
355 assert wordReadIdx < words.size();
356
357 return words.get(wordReadIdx);
358 }
359 public double getProbability() {
360 assert readTupleIndex < writeTupleIndex;
361 return probabilitys[readTupleIndex];
362 }
363 public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
364 while (getReadIndex() < endIndex) {
365 output.processTuple(getProbability());
366 incrementTuple();
367 }
368 }
369 public void copyUntilIndexDocument(int endIndex, ShreddedProcessor output) throws IOException {
370 while (getReadIndex() < endIndex) {
371 output.processDocument(getDocument());
372 assert getDocumentEndIndex() <= endIndex;
373 copyUntilIndexWord(getDocumentEndIndex(), output);
374 incrementDocument();
375 }
376 }
377 public void copyUntilIndexWord(int endIndex, ShreddedProcessor output) throws IOException {
378 while (getReadIndex() < endIndex) {
379 output.processWord(getWord());
380 assert getWordEndIndex() <= endIndex;
381 copyTuples(getWordEndIndex(), output);
382 incrementWord();
383 }
384 }
385 public void copyUntilDocument(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
386 while (!isAtEnd()) {
387 if (other != null) {
388 assert !other.isAtEnd();
389 int c = + Utility.compare(getDocument(), other.getDocument());
390
391 if (c > 0) {
392 break;
393 }
394
395 output.processDocument(getDocument());
396
397 if (c < 0) {
398 copyUntilIndexWord(getDocumentEndIndex(), output);
399 } else if (c == 0) {
400 copyUntilWord(other, output);
401 autoIncrementDocument();
402 break;
403 }
404 } else {
405 output.processDocument(getDocument());
406 copyUntilIndexWord(getDocumentEndIndex(), output);
407 }
408 incrementDocument();
409
410
411 }
412 }
413 public void copyUntilWord(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
414 while (!isAtEnd()) {
415 if (other != null) {
416 assert !other.isAtEnd();
417 int c = + Utility.compare(getWord(), other.getWord());
418
419 if (c > 0) {
420 break;
421 }
422
423 output.processWord(getWord());
424
425 copyTuples(getWordEndIndex(), output);
426 } else {
427 output.processWord(getWord());
428 copyTuples(getWordEndIndex(), output);
429 }
430 incrementWord();
431
432 if (getDocumentEndIndex() <= readTupleIndex)
433 break;
434 }
435 }
436 public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
437 copyUntilDocument(other, output);
438 }
439
440 }
441 public static class ShreddedCombiner implements ReaderSource<DocumentWordProbability>, ShreddedSource {
442 public ShreddedProcessor processor;
443 Collection<ShreddedReader> readers;
444 boolean closeOnExit = false;
445 boolean uninitialized = true;
446 PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
447
448 public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
449 this.readers = readers;
450 this.closeOnExit = closeOnExit;
451 }
452
453 public void setProcessor(Step processor) throws IncompatibleProcessorException {
454 if (processor instanceof ShreddedProcessor) {
455 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
456 } else if (processor instanceof DocumentWordProbability.Processor) {
457 this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentWordProbability.Processor) processor));
458 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
459 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentWordProbability>) processor));
460 } else {
461 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
462 }
463 }
464
465 public Class<DocumentWordProbability> getOutputClass() {
466 return DocumentWordProbability.class;
467 }
468
469 public void initialize() throws IOException {
470 for (ShreddedReader reader : readers) {
471 reader.fill();
472
473 if (!reader.getBuffer().isAtEnd())
474 queue.add(reader);
475 }
476
477 uninitialized = false;
478 }
479
480 public void run() throws IOException {
481 initialize();
482
483 while (queue.size() > 0) {
484 ShreddedReader top = queue.poll();
485 ShreddedReader next = null;
486 ShreddedBuffer nextBuffer = null;
487
488 assert !top.getBuffer().isAtEnd();
489
490 if (queue.size() > 0) {
491 next = queue.peek();
492 nextBuffer = next.getBuffer();
493 assert !nextBuffer.isAtEnd();
494 }
495
496 top.getBuffer().copyUntil(nextBuffer, processor);
497 if (top.getBuffer().isAtEnd())
498 top.fill();
499
500 if (!top.getBuffer().isAtEnd())
501 queue.add(top);
502 }
503
504 if (closeOnExit)
505 processor.close();
506 }
507
508 public DocumentWordProbability read() throws IOException {
509 if (uninitialized)
510 initialize();
511
512 DocumentWordProbability result = null;
513
514 while (queue.size() > 0) {
515 ShreddedReader top = queue.poll();
516 result = top.read();
517
518 if (result != null) {
519 if (top.getBuffer().isAtEnd())
520 top.fill();
521
522 queue.offer(top);
523 break;
524 }
525 }
526
527 return result;
528 }
529 }
530 public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<DocumentWordProbability>, ShreddedSource {
531 public ShreddedProcessor processor;
532 ShreddedBuffer buffer;
533 DocumentWordProbability last = new DocumentWordProbability();
534 long updateDocumentCount = -1;
535 long updateWordCount = -1;
536 long tupleCount = 0;
537 long bufferStartCount = 0;
538 ArrayInput input;
539
540 public ShreddedReader(ArrayInput input) {
541 this.input = input;
542 this.buffer = new ShreddedBuffer();
543 }
544
545 public ShreddedReader(ArrayInput input, int bufferSize) {
546 this.input = input;
547 this.buffer = new ShreddedBuffer(bufferSize);
548 }
549
550 public final int compareTo(ShreddedReader other) {
551 ShreddedBuffer otherBuffer = other.getBuffer();
552
553 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
554 return 0;
555 } else if (buffer.isAtEnd()) {
556 return -1;
557 } else if (otherBuffer.isAtEnd()) {
558 return 1;
559 }
560
561 int result = 0;
562 do {
563 result = + Utility.compare(buffer.getDocument(), otherBuffer.getDocument());
564 if(result != 0) break;
565 result = + Utility.compare(buffer.getWord(), otherBuffer.getWord());
566 if(result != 0) break;
567 } while (false);
568
569 return result;
570 }
571
572 public final ShreddedBuffer getBuffer() {
573 return buffer;
574 }
575
576 public final DocumentWordProbability read() throws IOException {
577 if (buffer.isAtEnd()) {
578 fill();
579
580 if (buffer.isAtEnd()) {
581 return null;
582 }
583 }
584
585 assert !buffer.isAtEnd();
586 DocumentWordProbability result = new DocumentWordProbability();
587
588 result.document = buffer.getDocument();
589 result.word = buffer.getWord();
590 result.probability = buffer.getProbability();
591
592 buffer.incrementTuple();
593 buffer.autoIncrementDocument();
594 buffer.autoIncrementWord();
595
596 return result;
597 }
598
599 public final void fill() throws IOException {
600 try {
601 buffer.reset();
602
603 if (tupleCount != 0) {
604
605 if(updateDocumentCount - tupleCount > 0) {
606 buffer.documents.add(last.document);
607 buffer.documentTupleIdx.add((int) (updateDocumentCount - tupleCount));
608 }
609 if(updateWordCount - tupleCount > 0) {
610 buffer.words.add(last.word);
611 buffer.wordTupleIdx.add((int) (updateWordCount - tupleCount));
612 }
613 bufferStartCount = tupleCount;
614 }
615
616 while (!buffer.isFull()) {
617 updateWord();
618 buffer.processTuple(input.readDouble());
619 tupleCount++;
620 }
621 } catch(EOFException e) {}
622 }
623
624 public final void updateDocument() throws IOException {
625 if (updateDocumentCount > tupleCount)
626 return;
627
628 last.document = input.readString();
629 updateDocumentCount = tupleCount + input.readInt();
630
631 buffer.processDocument(last.document);
632 }
633 public final void updateWord() throws IOException {
634 if (updateWordCount > tupleCount)
635 return;
636
637 updateDocument();
638 last.word = input.readBytes();
639 updateWordCount = tupleCount + input.readInt();
640
641 buffer.processWord(last.word);
642 }
643
644 public void run() throws IOException {
645 while (true) {
646 fill();
647
648 if (buffer.isAtEnd())
649 break;
650
651 buffer.copyUntil(null, processor);
652 }
653 processor.close();
654 }
655
656 public void setProcessor(Step processor) throws IncompatibleProcessorException {
657 if (processor instanceof ShreddedProcessor) {
658 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
659 } else if (processor instanceof DocumentWordProbability.Processor) {
660 this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentWordProbability.Processor) processor));
661 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
662 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentWordProbability>) processor));
663 } else {
664 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
665 }
666 }
667
668 public Class<DocumentWordProbability> getOutputClass() {
669 return DocumentWordProbability.class;
670 }
671 }
672
673 public static class DuplicateEliminator implements ShreddedProcessor {
674 public ShreddedProcessor processor;
675 DocumentWordProbability last = new DocumentWordProbability();
676 boolean documentProcess = true;
677 boolean wordProcess = true;
678
679 public DuplicateEliminator() {}
680 public DuplicateEliminator(ShreddedProcessor processor) {
681 this.processor = processor;
682 }
683
684 public void setShreddedProcessor(ShreddedProcessor processor) {
685 this.processor = processor;
686 }
687
688 public void processDocument(String document) throws IOException {
689 if (documentProcess || Utility.compare(document, last.document) != 0) {
690 last.document = document;
691 processor.processDocument(document);
692 resetWord();
693 documentProcess = false;
694 }
695 }
696 public void processWord(byte[] word) throws IOException {
697 if (wordProcess || Utility.compare(word, last.word) != 0) {
698 last.word = word;
699 processor.processWord(word);
700 wordProcess = false;
701 }
702 }
703
704 public void resetDocument() {
705 documentProcess = true;
706 resetWord();
707 }
708 public void resetWord() {
709 wordProcess = true;
710 }
711
712 public void processTuple(double probability) throws IOException {
713 processor.processTuple(probability);
714 }
715
716 public void close() throws IOException {
717 processor.close();
718 }
719 }
720 public static class TupleUnshredder implements ShreddedProcessor {
721 DocumentWordProbability last = new DocumentWordProbability();
722 public org.galagosearch.tupleflow.Processor<DocumentWordProbability> processor;
723
724 public TupleUnshredder(DocumentWordProbability.Processor processor) {
725 this.processor = processor;
726 }
727
728 public TupleUnshredder(org.galagosearch.tupleflow.Processor<DocumentWordProbability> processor) {
729 this.processor = processor;
730 }
731
732 public DocumentWordProbability clone(DocumentWordProbability object) {
733 DocumentWordProbability result = new DocumentWordProbability();
734 if (object == null) return result;
735 result.document = object.document;
736 result.word = object.word;
737 result.probability = object.probability;
738 return result;
739 }
740
741 public void processDocument(String document) throws IOException {
742 last.document = document;
743 }
744
745 public void processWord(byte[] word) throws IOException {
746 last.word = word;
747 }
748
749
750 public void processTuple(double probability) throws IOException {
751 last.probability = probability;
752 processor.process(clone(last));
753 }
754
755 public void close() throws IOException {
756 processor.close();
757 }
758 }
759 public static class TupleShredder implements Processor {
760 DocumentWordProbability last = new DocumentWordProbability();
761 public ShreddedProcessor processor;
762
763 public TupleShredder(ShreddedProcessor processor) {
764 this.processor = processor;
765 }
766
767 public DocumentWordProbability clone(DocumentWordProbability object) {
768 DocumentWordProbability result = new DocumentWordProbability();
769 if (object == null) return result;
770 result.document = object.document;
771 result.word = object.word;
772 result.probability = object.probability;
773 return result;
774 }
775
776 public void process(DocumentWordProbability object) throws IOException {
777 boolean processAll = false;
778 if(last == null || Utility.compare(last.document, object.document) != 0 || processAll) { processor.processDocument(object.document); processAll = true; }
779 if(last == null || Utility.compare(last.word, object.word) != 0 || processAll) { processor.processWord(object.word); processAll = true; }
780 processor.processTuple(object.probability);
781 }
782
783 public Class<DocumentWordProbability> getInputClass() {
784 return DocumentWordProbability.class;
785 }
786
787 public void close() throws IOException {
788 processor.close();
789 }
790 }
791 }
792 public static class WordOrder implements Order<DocumentWordProbability> {
793 public int hash(DocumentWordProbability object) {
794 int h = 0;
795 h += Utility.hash(object.word);
796 return h;
797 }
798 public Comparator<DocumentWordProbability> greaterThan() {
799 return new Comparator<DocumentWordProbability>() {
800 public int compare(DocumentWordProbability one, DocumentWordProbability two) {
801 int result = 0;
802 do {
803 result = + Utility.compare(one.word, two.word);
804 if(result != 0) break;
805 } while (false);
806 return -result;
807 }
808 };
809 }
810 public Comparator<DocumentWordProbability> lessThan() {
811 return new Comparator<DocumentWordProbability>() {
812 public int compare(DocumentWordProbability one, DocumentWordProbability two) {
813 int result = 0;
814 do {
815 result = + Utility.compare(one.word, two.word);
816 if(result != 0) break;
817 } while (false);
818 return result;
819 }
820 };
821 }
822 public TypeReader<DocumentWordProbability> orderedReader(ArrayInput _input) {
823 return new ShreddedReader(_input);
824 }
825
826 public TypeReader<DocumentWordProbability> orderedReader(ArrayInput _input, int bufferSize) {
827 return new ShreddedReader(_input, bufferSize);
828 }
829 public OrderedWriter<DocumentWordProbability> orderedWriter(ArrayOutput _output) {
830 ShreddedWriter w = new ShreddedWriter(_output);
831 return new OrderedWriterClass(w);
832 }
833 public static class OrderedWriterClass extends OrderedWriter< DocumentWordProbability > {
834 DocumentWordProbability last = null;
835 ShreddedWriter shreddedWriter = null;
836
837 public OrderedWriterClass(ShreddedWriter s) {
838 this.shreddedWriter = s;
839 }
840
841 public void process(DocumentWordProbability object) throws IOException {
842 boolean processAll = false;
843 if (processAll || last == null || 0 != Utility.compare(object.word, last.word)) { processAll = true; shreddedWriter.processWord(object.word); }
844 shreddedWriter.processTuple(object.document, object.probability);
845 last = object;
846 }
847
848 public void close() throws IOException {
849 shreddedWriter.close();
850 }
851
852 public Class<DocumentWordProbability> getInputClass() {
853 return DocumentWordProbability.class;
854 }
855 }
856 public ReaderSource<DocumentWordProbability> orderedCombiner(Collection<TypeReader<DocumentWordProbability>> readers, boolean closeOnExit) {
857 ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
858
859 for (TypeReader<DocumentWordProbability> reader : readers) {
860 shreddedReaders.add((ShreddedReader)reader);
861 }
862
863 return new ShreddedCombiner(shreddedReaders, closeOnExit);
864 }
865 public DocumentWordProbability clone(DocumentWordProbability object) {
866 DocumentWordProbability result = new DocumentWordProbability();
867 if (object == null) return result;
868 result.document = object.document;
869 result.word = object.word;
870 result.probability = object.probability;
871 return result;
872 }
873 public Class<DocumentWordProbability> getOrderedClass() {
874 return DocumentWordProbability.class;
875 }
876 public String[] getOrderSpec() {
877 return new String[] {"+word"};
878 }
879
880 public static String getSpecString() {
881 return "+word";
882 }
883
884 public interface ShreddedProcessor extends Step {
885 public void processWord(byte[] word) throws IOException;
886 public void processTuple(String document, double probability) throws IOException;
887 public void close() throws IOException;
888 }
889 public interface ShreddedSource extends Step {
890 }
891
892 public static class ShreddedWriter implements ShreddedProcessor {
893 ArrayOutput output;
894 ShreddedBuffer buffer = new ShreddedBuffer();
895 byte[] lastWord;
896 boolean lastFlush = false;
897
898 public ShreddedWriter(ArrayOutput output) {
899 this.output = output;
900 }
901
902 public void close() throws IOException {
903 flush();
904 }
905
906 public void processWord(byte[] word) {
907 lastWord = word;
908 buffer.processWord(word);
909 }
910 public final void processTuple(String document, double probability) throws IOException {
911 if (lastFlush) {
912 if(buffer.words.size() == 0) buffer.processWord(lastWord);
913 lastFlush = false;
914 }
915 buffer.processTuple(document, probability);
916 if (buffer.isFull())
917 flush();
918 }
919 public final void flushTuples(int pauseIndex) throws IOException {
920
921 while (buffer.getReadIndex() < pauseIndex) {
922
923 output.writeString(buffer.getDocument());
924 output.writeDouble(buffer.getProbability());
925 buffer.incrementTuple();
926 }
927 }
928 public final void flushWord(int pauseIndex) throws IOException {
929 while (buffer.getReadIndex() < pauseIndex) {
930 int nextPause = buffer.getWordEndIndex();
931 int count = nextPause - buffer.getReadIndex();
932
933 output.writeBytes(buffer.getWord());
934 output.writeInt(count);
935 buffer.incrementWord();
936
937 flushTuples(nextPause);
938 assert nextPause == buffer.getReadIndex();
939 }
940 }
941 public void flush() throws IOException {
942 flushWord(buffer.getWriteIndex());
943 buffer.reset();
944 lastFlush = true;
945 }
946 }
947 public static class ShreddedBuffer {
948 ArrayList<byte[]> words = new ArrayList();
949 ArrayList<Integer> wordTupleIdx = new ArrayList();
950 int wordReadIdx = 0;
951
952 String[] documents;
953 double[] probabilitys;
954 int writeTupleIndex = 0;
955 int readTupleIndex = 0;
956 int batchSize;
957
958 public ShreddedBuffer(int batchSize) {
959 this.batchSize = batchSize;
960
961 documents = new String[batchSize];
962 probabilitys = new double[batchSize];
963 }
964
965 public ShreddedBuffer() {
966 this(10000);
967 }
968
969 public void processWord(byte[] word) {
970 words.add(word);
971 wordTupleIdx.add(writeTupleIndex);
972 }
973 public void processTuple(String document, double probability) {
974 assert words.size() > 0;
975 documents[writeTupleIndex] = document;
976 probabilitys[writeTupleIndex] = probability;
977 writeTupleIndex++;
978 }
979 public void resetData() {
980 words.clear();
981 wordTupleIdx.clear();
982 writeTupleIndex = 0;
983 }
984
985 public void resetRead() {
986 readTupleIndex = 0;
987 wordReadIdx = 0;
988 }
989
990 public void reset() {
991 resetData();
992 resetRead();
993 }
994 public boolean isFull() {
995 return writeTupleIndex >= batchSize;
996 }
997
998 public boolean isEmpty() {
999 return writeTupleIndex == 0;
1000 }
1001
1002 public boolean isAtEnd() {
1003 return readTupleIndex >= writeTupleIndex;
1004 }
1005 public void incrementWord() {
1006 wordReadIdx++;
1007 }
1008
1009 public void autoIncrementWord() {
1010 while (readTupleIndex >= getWordEndIndex() && readTupleIndex < writeTupleIndex)
1011 wordReadIdx++;
1012 }
1013 public void incrementTuple() {
1014 readTupleIndex++;
1015 }
1016 public int getWordEndIndex() {
1017 if ((wordReadIdx+1) >= wordTupleIdx.size())
1018 return writeTupleIndex;
1019 return wordTupleIdx.get(wordReadIdx+1);
1020 }
1021 public int getReadIndex() {
1022 return readTupleIndex;
1023 }
1024
1025 public int getWriteIndex() {
1026 return writeTupleIndex;
1027 }
1028 public byte[] getWord() {
1029 assert readTupleIndex < writeTupleIndex;
1030 assert wordReadIdx < words.size();
1031
1032 return words.get(wordReadIdx);
1033 }
1034 public String getDocument() {
1035 assert readTupleIndex < writeTupleIndex;
1036 return documents[readTupleIndex];
1037 }
1038 public double getProbability() {
1039 assert readTupleIndex < writeTupleIndex;
1040 return probabilitys[readTupleIndex];
1041 }
1042 public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
1043 while (getReadIndex() < endIndex) {
1044 output.processTuple(getDocument(), getProbability());
1045 incrementTuple();
1046 }
1047 }
1048 public void copyUntilIndexWord(int endIndex, ShreddedProcessor output) throws IOException {
1049 while (getReadIndex() < endIndex) {
1050 output.processWord(getWord());
1051 assert getWordEndIndex() <= endIndex;
1052 copyTuples(getWordEndIndex(), output);
1053 incrementWord();
1054 }
1055 }
1056 public void copyUntilWord(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
1057 while (!isAtEnd()) {
1058 if (other != null) {
1059 assert !other.isAtEnd();
1060 int c = + Utility.compare(getWord(), other.getWord());
1061
1062 if (c > 0) {
1063 break;
1064 }
1065
1066 output.processWord(getWord());
1067
1068 copyTuples(getWordEndIndex(), output);
1069 } else {
1070 output.processWord(getWord());
1071 copyTuples(getWordEndIndex(), output);
1072 }
1073 incrementWord();
1074
1075
1076 }
1077 }
1078 public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
1079 copyUntilWord(other, output);
1080 }
1081
1082 }
1083 public static class ShreddedCombiner implements ReaderSource<DocumentWordProbability>, ShreddedSource {
1084 public ShreddedProcessor processor;
1085 Collection<ShreddedReader> readers;
1086 boolean closeOnExit = false;
1087 boolean uninitialized = true;
1088 PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
1089
1090 public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
1091 this.readers = readers;
1092 this.closeOnExit = closeOnExit;
1093 }
1094
1095 public void setProcessor(Step processor) throws IncompatibleProcessorException {
1096 if (processor instanceof ShreddedProcessor) {
1097 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
1098 } else if (processor instanceof DocumentWordProbability.Processor) {
1099 this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentWordProbability.Processor) processor));
1100 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
1101 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentWordProbability>) processor));
1102 } else {
1103 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
1104 }
1105 }
1106
1107 public Class<DocumentWordProbability> getOutputClass() {
1108 return DocumentWordProbability.class;
1109 }
1110
1111 public void initialize() throws IOException {
1112 for (ShreddedReader reader : readers) {
1113 reader.fill();
1114
1115 if (!reader.getBuffer().isAtEnd())
1116 queue.add(reader);
1117 }
1118
1119 uninitialized = false;
1120 }
1121
1122 public void run() throws IOException {
1123 initialize();
1124
1125 while (queue.size() > 0) {
1126 ShreddedReader top = queue.poll();
1127 ShreddedReader next = null;
1128 ShreddedBuffer nextBuffer = null;
1129
1130 assert !top.getBuffer().isAtEnd();
1131
1132 if (queue.size() > 0) {
1133 next = queue.peek();
1134 nextBuffer = next.getBuffer();
1135 assert !nextBuffer.isAtEnd();
1136 }
1137
1138 top.getBuffer().copyUntil(nextBuffer, processor);
1139 if (top.getBuffer().isAtEnd())
1140 top.fill();
1141
1142 if (!top.getBuffer().isAtEnd())
1143 queue.add(top);
1144 }
1145
1146 if (closeOnExit)
1147 processor.close();
1148 }
1149
1150 public DocumentWordProbability read() throws IOException {
1151 if (uninitialized)
1152 initialize();
1153
1154 DocumentWordProbability result = null;
1155
1156 while (queue.size() > 0) {
1157 ShreddedReader top = queue.poll();
1158 result = top.read();
1159
1160 if (result != null) {
1161 if (top.getBuffer().isAtEnd())
1162 top.fill();
1163
1164 queue.offer(top);
1165 break;
1166 }
1167 }
1168
1169 return result;
1170 }
1171 }
1172 public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<DocumentWordProbability>, ShreddedSource {
1173 public ShreddedProcessor processor;
1174 ShreddedBuffer buffer;
1175 DocumentWordProbability last = new DocumentWordProbability();
1176 long updateWordCount = -1;
1177 long tupleCount = 0;
1178 long bufferStartCount = 0;
1179 ArrayInput input;
1180
1181 public ShreddedReader(ArrayInput input) {
1182 this.input = input;
1183 this.buffer = new ShreddedBuffer();
1184 }
1185
1186 public ShreddedReader(ArrayInput input, int bufferSize) {
1187 this.input = input;
1188 this.buffer = new ShreddedBuffer(bufferSize);
1189 }
1190
1191 public final int compareTo(ShreddedReader other) {
1192 ShreddedBuffer otherBuffer = other.getBuffer();
1193
1194 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
1195 return 0;
1196 } else if (buffer.isAtEnd()) {
1197 return -1;
1198 } else if (otherBuffer.isAtEnd()) {
1199 return 1;
1200 }
1201
1202 int result = 0;
1203 do {
1204 result = + Utility.compare(buffer.getWord(), otherBuffer.getWord());
1205 if(result != 0) break;
1206 } while (false);
1207
1208 return result;
1209 }
1210
1211 public final ShreddedBuffer getBuffer() {
1212 return buffer;
1213 }
1214
1215 public final DocumentWordProbability read() throws IOException {
1216 if (buffer.isAtEnd()) {
1217 fill();
1218
1219 if (buffer.isAtEnd()) {
1220 return null;
1221 }
1222 }
1223
1224 assert !buffer.isAtEnd();
1225 DocumentWordProbability result = new DocumentWordProbability();
1226
1227 result.word = buffer.getWord();
1228 result.document = buffer.getDocument();
1229 result.probability = buffer.getProbability();
1230
1231 buffer.incrementTuple();
1232 buffer.autoIncrementWord();
1233
1234 return result;
1235 }
1236
1237 public final void fill() throws IOException {
1238 try {
1239 buffer.reset();
1240
1241 if (tupleCount != 0) {
1242
1243 if(updateWordCount - tupleCount > 0) {
1244 buffer.words.add(last.word);
1245 buffer.wordTupleIdx.add((int) (updateWordCount - tupleCount));
1246 }
1247 bufferStartCount = tupleCount;
1248 }
1249
1250 while (!buffer.isFull()) {
1251 updateWord();
1252 buffer.processTuple(input.readString(), input.readDouble());
1253 tupleCount++;
1254 }
1255 } catch(EOFException e) {}
1256 }
1257
1258 public final void updateWord() throws IOException {
1259 if (updateWordCount > tupleCount)
1260 return;
1261
1262 last.word = input.readBytes();
1263 updateWordCount = tupleCount + input.readInt();
1264
1265 buffer.processWord(last.word);
1266 }
1267
1268 public void run() throws IOException {
1269 while (true) {
1270 fill();
1271
1272 if (buffer.isAtEnd())
1273 break;
1274
1275 buffer.copyUntil(null, processor);
1276 }
1277 processor.close();
1278 }
1279
1280 public void setProcessor(Step processor) throws IncompatibleProcessorException {
1281 if (processor instanceof ShreddedProcessor) {
1282 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
1283 } else if (processor instanceof DocumentWordProbability.Processor) {
1284 this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentWordProbability.Processor) processor));
1285 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
1286 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentWordProbability>) processor));
1287 } else {
1288 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
1289 }
1290 }
1291
1292 public Class<DocumentWordProbability> getOutputClass() {
1293 return DocumentWordProbability.class;
1294 }
1295 }
1296
1297 public static class DuplicateEliminator implements ShreddedProcessor {
1298 public ShreddedProcessor processor;
1299 DocumentWordProbability last = new DocumentWordProbability();
1300 boolean wordProcess = true;
1301
1302 public DuplicateEliminator() {}
1303 public DuplicateEliminator(ShreddedProcessor processor) {
1304 this.processor = processor;
1305 }
1306
1307 public void setShreddedProcessor(ShreddedProcessor processor) {
1308 this.processor = processor;
1309 }
1310
1311 public void processWord(byte[] word) throws IOException {
1312 if (wordProcess || Utility.compare(word, last.word) != 0) {
1313 last.word = word;
1314 processor.processWord(word);
1315 wordProcess = false;
1316 }
1317 }
1318
1319 public void resetWord() {
1320 wordProcess = true;
1321 }
1322
1323 public void processTuple(String document, double probability) throws IOException {
1324 processor.processTuple(document, probability);
1325 }
1326
1327 public void close() throws IOException {
1328 processor.close();
1329 }
1330 }
1331 public static class TupleUnshredder implements ShreddedProcessor {
1332 DocumentWordProbability last = new DocumentWordProbability();
1333 public org.galagosearch.tupleflow.Processor<DocumentWordProbability> processor;
1334
1335 public TupleUnshredder(DocumentWordProbability.Processor processor) {
1336 this.processor = processor;
1337 }
1338
1339 public TupleUnshredder(org.galagosearch.tupleflow.Processor<DocumentWordProbability> processor) {
1340 this.processor = processor;
1341 }
1342
1343 public DocumentWordProbability clone(DocumentWordProbability object) {
1344 DocumentWordProbability result = new DocumentWordProbability();
1345 if (object == null) return result;
1346 result.document = object.document;
1347 result.word = object.word;
1348 result.probability = object.probability;
1349 return result;
1350 }
1351
1352 public void processWord(byte[] word) throws IOException {
1353 last.word = word;
1354 }
1355
1356
1357 public void processTuple(String document, double probability) throws IOException {
1358 last.document = document;
1359 last.probability = probability;
1360 processor.process(clone(last));
1361 }
1362
1363 public void close() throws IOException {
1364 processor.close();
1365 }
1366 }
1367 public static class TupleShredder implements Processor {
1368 DocumentWordProbability last = new DocumentWordProbability();
1369 public ShreddedProcessor processor;
1370
1371 public TupleShredder(ShreddedProcessor processor) {
1372 this.processor = processor;
1373 }
1374
1375 public DocumentWordProbability clone(DocumentWordProbability object) {
1376 DocumentWordProbability result = new DocumentWordProbability();
1377 if (object == null) return result;
1378 result.document = object.document;
1379 result.word = object.word;
1380 result.probability = object.probability;
1381 return result;
1382 }
1383
1384 public void process(DocumentWordProbability object) throws IOException {
1385 boolean processAll = false;
1386 if(last == null || Utility.compare(last.word, object.word) != 0 || processAll) { processor.processWord(object.word); processAll = true; }
1387 processor.processTuple(object.document, object.probability);
1388 }
1389
1390 public Class<DocumentWordProbability> getInputClass() {
1391 return DocumentWordProbability.class;
1392 }
1393
1394 public void close() throws IOException {
1395 processor.close();
1396 }
1397 }
1398 }
1399 public static class DocumentOrder implements Order<DocumentWordProbability> {
1400 public int hash(DocumentWordProbability object) {
1401 int h = 0;
1402 h += Utility.hash(object.document);
1403 return h;
1404 }
1405 public Comparator<DocumentWordProbability> greaterThan() {
1406 return new Comparator<DocumentWordProbability>() {
1407 public int compare(DocumentWordProbability one, DocumentWordProbability two) {
1408 int result = 0;
1409 do {
1410 result = + Utility.compare(one.document, two.document);
1411 if(result != 0) break;
1412 } while (false);
1413 return -result;
1414 }
1415 };
1416 }
1417 public Comparator<DocumentWordProbability> lessThan() {
1418 return new Comparator<DocumentWordProbability>() {
1419 public int compare(DocumentWordProbability one, DocumentWordProbability two) {
1420 int result = 0;
1421 do {
1422 result = + Utility.compare(one.document, two.document);
1423 if(result != 0) break;
1424 } while (false);
1425 return result;
1426 }
1427 };
1428 }
1429 public TypeReader<DocumentWordProbability> orderedReader(ArrayInput _input) {
1430 return new ShreddedReader(_input);
1431 }
1432
1433 public TypeReader<DocumentWordProbability> orderedReader(ArrayInput _input, int bufferSize) {
1434 return new ShreddedReader(_input, bufferSize);
1435 }
1436 public OrderedWriter<DocumentWordProbability> orderedWriter(ArrayOutput _output) {
1437 ShreddedWriter w = new ShreddedWriter(_output);
1438 return new OrderedWriterClass(w);
1439 }
1440 public static class OrderedWriterClass extends OrderedWriter< DocumentWordProbability > {
1441 DocumentWordProbability last = null;
1442 ShreddedWriter shreddedWriter = null;
1443
1444 public OrderedWriterClass(ShreddedWriter s) {
1445 this.shreddedWriter = s;
1446 }
1447
1448 public void process(DocumentWordProbability object) throws IOException {
1449 boolean processAll = false;
1450 if (processAll || last == null || 0 != Utility.compare(object.document, last.document)) { processAll = true; shreddedWriter.processDocument(object.document); }
1451 shreddedWriter.processTuple(object.word, object.probability);
1452 last = object;
1453 }
1454
1455 public void close() throws IOException {
1456 shreddedWriter.close();
1457 }
1458
1459 public Class<DocumentWordProbability> getInputClass() {
1460 return DocumentWordProbability.class;
1461 }
1462 }
1463 public ReaderSource<DocumentWordProbability> orderedCombiner(Collection<TypeReader<DocumentWordProbability>> readers, boolean closeOnExit) {
1464 ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
1465
1466 for (TypeReader<DocumentWordProbability> reader : readers) {
1467 shreddedReaders.add((ShreddedReader)reader);
1468 }
1469
1470 return new ShreddedCombiner(shreddedReaders, closeOnExit);
1471 }
1472 public DocumentWordProbability clone(DocumentWordProbability object) {
1473 DocumentWordProbability result = new DocumentWordProbability();
1474 if (object == null) return result;
1475 result.document = object.document;
1476 result.word = object.word;
1477 result.probability = object.probability;
1478 return result;
1479 }
1480 public Class<DocumentWordProbability> getOrderedClass() {
1481 return DocumentWordProbability.class;
1482 }
1483 public String[] getOrderSpec() {
1484 return new String[] {"+document"};
1485 }
1486
1487 public static String getSpecString() {
1488 return "+document";
1489 }
1490
1491 public interface ShreddedProcessor extends Step {
1492 public void processDocument(String document) throws IOException;
1493 public void processTuple(byte[] word, double probability) throws IOException;
1494 public void close() throws IOException;
1495 }
1496 public interface ShreddedSource extends Step {
1497 }
1498
1499 public static class ShreddedWriter implements ShreddedProcessor {
1500 ArrayOutput output;
1501 ShreddedBuffer buffer = new ShreddedBuffer();
1502 String lastDocument;
1503 boolean lastFlush = false;
1504
1505 public ShreddedWriter(ArrayOutput output) {
1506 this.output = output;
1507 }
1508
1509 public void close() throws IOException {
1510 flush();
1511 }
1512
1513 public void processDocument(String document) {
1514 lastDocument = document;
1515 buffer.processDocument(document);
1516 }
1517 public final void processTuple(byte[] word, double probability) throws IOException {
1518 if (lastFlush) {
1519 if(buffer.documents.size() == 0) buffer.processDocument(lastDocument);
1520 lastFlush = false;
1521 }
1522 buffer.processTuple(word, probability);
1523 if (buffer.isFull())
1524 flush();
1525 }
1526 public final void flushTuples(int pauseIndex) throws IOException {
1527
1528 while (buffer.getReadIndex() < pauseIndex) {
1529
1530 output.writeBytes(buffer.getWord());
1531 output.writeDouble(buffer.getProbability());
1532 buffer.incrementTuple();
1533 }
1534 }
1535 public final void flushDocument(int pauseIndex) throws IOException {
1536 while (buffer.getReadIndex() < pauseIndex) {
1537 int nextPause = buffer.getDocumentEndIndex();
1538 int count = nextPause - buffer.getReadIndex();
1539
1540 output.writeString(buffer.getDocument());
1541 output.writeInt(count);
1542 buffer.incrementDocument();
1543
1544 flushTuples(nextPause);
1545 assert nextPause == buffer.getReadIndex();
1546 }
1547 }
1548 public void flush() throws IOException {
1549 flushDocument(buffer.getWriteIndex());
1550 buffer.reset();
1551 lastFlush = true;
1552 }
1553 }
1554 public static class ShreddedBuffer {
1555 ArrayList<String> documents = new ArrayList();
1556 ArrayList<Integer> documentTupleIdx = new ArrayList();
1557 int documentReadIdx = 0;
1558
1559 byte[][] words;
1560 double[] probabilitys;
1561 int writeTupleIndex = 0;
1562 int readTupleIndex = 0;
1563 int batchSize;
1564
1565 public ShreddedBuffer(int batchSize) {
1566 this.batchSize = batchSize;
1567
1568 words = new byte[batchSize][];
1569 probabilitys = new double[batchSize];
1570 }
1571
1572 public ShreddedBuffer() {
1573 this(10000);
1574 }
1575
1576 public void processDocument(String document) {
1577 documents.add(document);
1578 documentTupleIdx.add(writeTupleIndex);
1579 }
1580 public void processTuple(byte[] word, double probability) {
1581 assert documents.size() > 0;
1582 words[writeTupleIndex] = word;
1583 probabilitys[writeTupleIndex] = probability;
1584 writeTupleIndex++;
1585 }
1586 public void resetData() {
1587 documents.clear();
1588 documentTupleIdx.clear();
1589 writeTupleIndex = 0;
1590 }
1591
1592 public void resetRead() {
1593 readTupleIndex = 0;
1594 documentReadIdx = 0;
1595 }
1596
1597 public void reset() {
1598 resetData();
1599 resetRead();
1600 }
1601 public boolean isFull() {
1602 return writeTupleIndex >= batchSize;
1603 }
1604
1605 public boolean isEmpty() {
1606 return writeTupleIndex == 0;
1607 }
1608
1609 public boolean isAtEnd() {
1610 return readTupleIndex >= writeTupleIndex;
1611 }
1612 public void incrementDocument() {
1613 documentReadIdx++;
1614 }
1615
1616 public void autoIncrementDocument() {
1617 while (readTupleIndex >= getDocumentEndIndex() && readTupleIndex < writeTupleIndex)
1618 documentReadIdx++;
1619 }
1620 public void incrementTuple() {
1621 readTupleIndex++;
1622 }
1623 public int getDocumentEndIndex() {
1624 if ((documentReadIdx+1) >= documentTupleIdx.size())
1625 return writeTupleIndex;
1626 return documentTupleIdx.get(documentReadIdx+1);
1627 }
1628 public int getReadIndex() {
1629 return readTupleIndex;
1630 }
1631
1632 public int getWriteIndex() {
1633 return writeTupleIndex;
1634 }
1635 public String getDocument() {
1636 assert readTupleIndex < writeTupleIndex;
1637 assert documentReadIdx < documents.size();
1638
1639 return documents.get(documentReadIdx);
1640 }
1641 public byte[] getWord() {
1642 assert readTupleIndex < writeTupleIndex;
1643 return words[readTupleIndex];
1644 }
1645 public double getProbability() {
1646 assert readTupleIndex < writeTupleIndex;
1647 return probabilitys[readTupleIndex];
1648 }
1649 public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
1650 while (getReadIndex() < endIndex) {
1651 output.processTuple(getWord(), getProbability());
1652 incrementTuple();
1653 }
1654 }
1655 public void copyUntilIndexDocument(int endIndex, ShreddedProcessor output) throws IOException {
1656 while (getReadIndex() < endIndex) {
1657 output.processDocument(getDocument());
1658 assert getDocumentEndIndex() <= endIndex;
1659 copyTuples(getDocumentEndIndex(), output);
1660 incrementDocument();
1661 }
1662 }
1663 public void copyUntilDocument(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
1664 while (!isAtEnd()) {
1665 if (other != null) {
1666 assert !other.isAtEnd();
1667 int c = + Utility.compare(getDocument(), other.getDocument());
1668
1669 if (c > 0) {
1670 break;
1671 }
1672
1673 output.processDocument(getDocument());
1674
1675 copyTuples(getDocumentEndIndex(), output);
1676 } else {
1677 output.processDocument(getDocument());
1678 copyTuples(getDocumentEndIndex(), output);
1679 }
1680 incrementDocument();
1681
1682
1683 }
1684 }
1685 public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
1686 copyUntilDocument(other, output);
1687 }
1688
1689 }
1690 public static class ShreddedCombiner implements ReaderSource<DocumentWordProbability>, ShreddedSource {
1691 public ShreddedProcessor processor;
1692 Collection<ShreddedReader> readers;
1693 boolean closeOnExit = false;
1694 boolean uninitialized = true;
1695 PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
1696
1697 public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
1698 this.readers = readers;
1699 this.closeOnExit = closeOnExit;
1700 }
1701
1702 public void setProcessor(Step processor) throws IncompatibleProcessorException {
1703 if (processor instanceof ShreddedProcessor) {
1704 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
1705 } else if (processor instanceof DocumentWordProbability.Processor) {
1706 this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentWordProbability.Processor) processor));
1707 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
1708 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentWordProbability>) processor));
1709 } else {
1710 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
1711 }
1712 }
1713
1714 public Class<DocumentWordProbability> getOutputClass() {
1715 return DocumentWordProbability.class;
1716 }
1717
1718 public void initialize() throws IOException {
1719 for (ShreddedReader reader : readers) {
1720 reader.fill();
1721
1722 if (!reader.getBuffer().isAtEnd())
1723 queue.add(reader);
1724 }
1725
1726 uninitialized = false;
1727 }
1728
1729 public void run() throws IOException {
1730 initialize();
1731
1732 while (queue.size() > 0) {
1733 ShreddedReader top = queue.poll();
1734 ShreddedReader next = null;
1735 ShreddedBuffer nextBuffer = null;
1736
1737 assert !top.getBuffer().isAtEnd();
1738
1739 if (queue.size() > 0) {
1740 next = queue.peek();
1741 nextBuffer = next.getBuffer();
1742 assert !nextBuffer.isAtEnd();
1743 }
1744
1745 top.getBuffer().copyUntil(nextBuffer, processor);
1746 if (top.getBuffer().isAtEnd())
1747 top.fill();
1748
1749 if (!top.getBuffer().isAtEnd())
1750 queue.add(top);
1751 }
1752
1753 if (closeOnExit)
1754 processor.close();
1755 }
1756
1757 public DocumentWordProbability read() throws IOException {
1758 if (uninitialized)
1759 initialize();
1760
1761 DocumentWordProbability result = null;
1762
1763 while (queue.size() > 0) {
1764 ShreddedReader top = queue.poll();
1765 result = top.read();
1766
1767 if (result != null) {
1768 if (top.getBuffer().isAtEnd())
1769 top.fill();
1770
1771 queue.offer(top);
1772 break;
1773 }
1774 }
1775
1776 return result;
1777 }
1778 }
1779 public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<DocumentWordProbability>, ShreddedSource {
1780 public ShreddedProcessor processor;
1781 ShreddedBuffer buffer;
1782 DocumentWordProbability last = new DocumentWordProbability();
1783 long updateDocumentCount = -1;
1784 long tupleCount = 0;
1785 long bufferStartCount = 0;
1786 ArrayInput input;
1787
1788 public ShreddedReader(ArrayInput input) {
1789 this.input = input;
1790 this.buffer = new ShreddedBuffer();
1791 }
1792
1793 public ShreddedReader(ArrayInput input, int bufferSize) {
1794 this.input = input;
1795 this.buffer = new ShreddedBuffer(bufferSize);
1796 }
1797
1798 public final int compareTo(ShreddedReader other) {
1799 ShreddedBuffer otherBuffer = other.getBuffer();
1800
1801 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
1802 return 0;
1803 } else if (buffer.isAtEnd()) {
1804 return -1;
1805 } else if (otherBuffer.isAtEnd()) {
1806 return 1;
1807 }
1808
1809 int result = 0;
1810 do {
1811 result = + Utility.compare(buffer.getDocument(), otherBuffer.getDocument());
1812 if(result != 0) break;
1813 } while (false);
1814
1815 return result;
1816 }
1817
1818 public final ShreddedBuffer getBuffer() {
1819 return buffer;
1820 }
1821
1822 public final DocumentWordProbability read() throws IOException {
1823 if (buffer.isAtEnd()) {
1824 fill();
1825
1826 if (buffer.isAtEnd()) {
1827 return null;
1828 }
1829 }
1830
1831 assert !buffer.isAtEnd();
1832 DocumentWordProbability result = new DocumentWordProbability();
1833
1834 result.document = buffer.getDocument();
1835 result.word = buffer.getWord();
1836 result.probability = buffer.getProbability();
1837
1838 buffer.incrementTuple();
1839 buffer.autoIncrementDocument();
1840
1841 return result;
1842 }
1843
1844 public final void fill() throws IOException {
1845 try {
1846 buffer.reset();
1847
1848 if (tupleCount != 0) {
1849
1850 if(updateDocumentCount - tupleCount > 0) {
1851 buffer.documents.add(last.document);
1852 buffer.documentTupleIdx.add((int) (updateDocumentCount - tupleCount));
1853 }
1854 bufferStartCount = tupleCount;
1855 }
1856
1857 while (!buffer.isFull()) {
1858 updateDocument();
1859 buffer.processTuple(input.readBytes(), input.readDouble());
1860 tupleCount++;
1861 }
1862 } catch(EOFException e) {}
1863 }
1864
1865 public final void updateDocument() throws IOException {
1866 if (updateDocumentCount > tupleCount)
1867 return;
1868
1869 last.document = input.readString();
1870 updateDocumentCount = tupleCount + input.readInt();
1871
1872 buffer.processDocument(last.document);
1873 }
1874
1875 public void run() throws IOException {
1876 while (true) {
1877 fill();
1878
1879 if (buffer.isAtEnd())
1880 break;
1881
1882 buffer.copyUntil(null, processor);
1883 }
1884 processor.close();
1885 }
1886
1887 public void setProcessor(Step processor) throws IncompatibleProcessorException {
1888 if (processor instanceof ShreddedProcessor) {
1889 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
1890 } else if (processor instanceof DocumentWordProbability.Processor) {
1891 this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentWordProbability.Processor) processor));
1892 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
1893 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentWordProbability>) processor));
1894 } else {
1895 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
1896 }
1897 }
1898
1899 public Class<DocumentWordProbability> getOutputClass() {
1900 return DocumentWordProbability.class;
1901 }
1902 }
1903
1904 public static class DuplicateEliminator implements ShreddedProcessor {
1905 public ShreddedProcessor processor;
1906 DocumentWordProbability last = new DocumentWordProbability();
1907 boolean documentProcess = true;
1908
1909 public DuplicateEliminator() {}
1910 public DuplicateEliminator(ShreddedProcessor processor) {
1911 this.processor = processor;
1912 }
1913
1914 public void setShreddedProcessor(ShreddedProcessor processor) {
1915 this.processor = processor;
1916 }
1917
1918 public void processDocument(String document) throws IOException {
1919 if (documentProcess || Utility.compare(document, last.document) != 0) {
1920 last.document = document;
1921 processor.processDocument(document);
1922 documentProcess = false;
1923 }
1924 }
1925
1926 public void resetDocument() {
1927 documentProcess = true;
1928 }
1929
1930 public void processTuple(byte[] word, double probability) throws IOException {
1931 processor.processTuple(word, probability);
1932 }
1933
1934 public void close() throws IOException {
1935 processor.close();
1936 }
1937 }
1938 public static class TupleUnshredder implements ShreddedProcessor {
1939 DocumentWordProbability last = new DocumentWordProbability();
1940 public org.galagosearch.tupleflow.Processor<DocumentWordProbability> processor;
1941
1942 public TupleUnshredder(DocumentWordProbability.Processor processor) {
1943 this.processor = processor;
1944 }
1945
1946 public TupleUnshredder(org.galagosearch.tupleflow.Processor<DocumentWordProbability> processor) {
1947 this.processor = processor;
1948 }
1949
1950 public DocumentWordProbability clone(DocumentWordProbability object) {
1951 DocumentWordProbability result = new DocumentWordProbability();
1952 if (object == null) return result;
1953 result.document = object.document;
1954 result.word = object.word;
1955 result.probability = object.probability;
1956 return result;
1957 }
1958
1959 public void processDocument(String document) throws IOException {
1960 last.document = document;
1961 }
1962
1963
1964 public void processTuple(byte[] word, double probability) throws IOException {
1965 last.word = word;
1966 last.probability = probability;
1967 processor.process(clone(last));
1968 }
1969
1970 public void close() throws IOException {
1971 processor.close();
1972 }
1973 }
1974 public static class TupleShredder implements Processor {
1975 DocumentWordProbability last = new DocumentWordProbability();
1976 public ShreddedProcessor processor;
1977
1978 public TupleShredder(ShreddedProcessor processor) {
1979 this.processor = processor;
1980 }
1981
1982 public DocumentWordProbability clone(DocumentWordProbability object) {
1983 DocumentWordProbability result = new DocumentWordProbability();
1984 if (object == null) return result;
1985 result.document = object.document;
1986 result.word = object.word;
1987 result.probability = object.probability;
1988 return result;
1989 }
1990
1991 public void process(DocumentWordProbability object) throws IOException {
1992 boolean processAll = false;
1993 if(last == null || Utility.compare(last.document, object.document) != 0 || processAll) { processor.processDocument(object.document); processAll = true; }
1994 processor.processTuple(object.word, object.probability);
1995 }
1996
1997 public Class<DocumentWordProbability> getInputClass() {
1998 return DocumentWordProbability.class;
1999 }
2000
2001 public void close() throws IOException {
2002 processor.close();
2003 }
2004 }
2005 }
2006 }