1
2
3 package org.galagosearch.core.types;
4
5 import org.galagosearch.tupleflow.Utility;
6 import org.galagosearch.tupleflow.ArrayInput;
7 import org.galagosearch.tupleflow.ArrayOutput;
8 import org.galagosearch.tupleflow.Order;
9 import org.galagosearch.tupleflow.OrderedWriter;
10 import org.galagosearch.tupleflow.Type;
11 import org.galagosearch.tupleflow.TypeReader;
12 import org.galagosearch.tupleflow.Step;
13 import org.galagosearch.tupleflow.IncompatibleProcessorException;
14 import org.galagosearch.tupleflow.ReaderSource;
15 import java.io.IOException;
16 import java.io.EOFException;
17 import java.io.UnsupportedEncodingException;
18 import java.util.ArrayList;
19 import java.util.Arrays;
20 import java.util.Comparator;
21 import java.util.PriorityQueue;
22 import java.util.Collection;
23
24 public class DocumentLengthWordCount implements Type<DocumentLengthWordCount> {
25 public String document;
26 public int length;
27 public String word;
28 public int count;
29
30 public DocumentLengthWordCount() {}
/**
 * Creates a fully populated tuple.
 *
 * @param document value for {@link #document}
 * @param length value for {@link #length}
 * @param word value for {@link #word}
 * @param count value for {@link #count}
 */
public DocumentLengthWordCount(String document, int length, String word, int count) {
    this.count = count;
    this.word = word;
    this.length = length;
    this.document = document;
}
37
38 public String toString() {
39 return String.format("%s,%d,%s,%d",
40 document, length, word, count);
41 }
42
43 public Order<DocumentLengthWordCount> getOrder(String... spec) {
44 if (Arrays.equals(spec, new String[] { "+document", "+length" })) {
45 return new DocumentLengthOrder();
46 }
47 if (Arrays.equals(spec, new String[] { "+document", "+word" })) {
48 return new DocumentWordOrder();
49 }
50 if (Arrays.equals(spec, new String[] { "+word", "+document" })) {
51 return new WordDocumentOrder();
52 }
53 if (Arrays.equals(spec, new String[] { "+document" })) {
54 return new DocumentOrder();
55 }
56 return null;
57 }
58
59 public interface Processor extends Step, org.galagosearch.tupleflow.Processor<DocumentLengthWordCount> {
60 public void process(DocumentLengthWordCount object) throws IOException;
61 public void close() throws IOException;
62 }
63 public interface Source extends Step {
64 }
65 public static class DocumentLengthOrder implements Order<DocumentLengthWordCount> {
66 public int hash(DocumentLengthWordCount object) {
67 int h = 0;
68 h += Utility.hash(object.document);
69 h += Utility.hash(object.length);
70 return h;
71 }
72 public Comparator<DocumentLengthWordCount> greaterThan() {
73 return new Comparator<DocumentLengthWordCount>() {
74 public int compare(DocumentLengthWordCount one, DocumentLengthWordCount two) {
75 int result = 0;
76 do {
77 result = + Utility.compare(one.document, two.document);
78 if(result != 0) break;
79 result = + Utility.compare(one.length, two.length);
80 if(result != 0) break;
81 } while (false);
82 return -result;
83 }
84 };
85 }
86 public Comparator<DocumentLengthWordCount> lessThan() {
87 return new Comparator<DocumentLengthWordCount>() {
88 public int compare(DocumentLengthWordCount one, DocumentLengthWordCount two) {
89 int result = 0;
90 do {
91 result = + Utility.compare(one.document, two.document);
92 if(result != 0) break;
93 result = + Utility.compare(one.length, two.length);
94 if(result != 0) break;
95 } while (false);
96 return result;
97 }
98 };
99 }
100 public TypeReader<DocumentLengthWordCount> orderedReader(ArrayInput _input) {
101 return new ShreddedReader(_input);
102 }
103
104 public TypeReader<DocumentLengthWordCount> orderedReader(ArrayInput _input, int bufferSize) {
105 return new ShreddedReader(_input, bufferSize);
106 }
107 public OrderedWriter<DocumentLengthWordCount> orderedWriter(ArrayOutput _output) {
108 ShreddedWriter w = new ShreddedWriter(_output);
109 return new OrderedWriterClass(w);
110 }
111 public static class OrderedWriterClass extends OrderedWriter< DocumentLengthWordCount > {
112 DocumentLengthWordCount last = null;
113 ShreddedWriter shreddedWriter = null;
114
115 public OrderedWriterClass(ShreddedWriter s) {
116 this.shreddedWriter = s;
117 }
118
119 public void process(DocumentLengthWordCount object) throws IOException {
120 boolean processAll = false;
121 if (processAll || last == null || 0 != Utility.compare(object.document, last.document)) { processAll = true; shreddedWriter.processDocument(object.document); }
122 if (processAll || last == null || 0 != Utility.compare(object.length, last.length)) { processAll = true; shreddedWriter.processLength(object.length); }
123 shreddedWriter.processTuple(object.word, object.count);
124 last = object;
125 }
126
127 public void close() throws IOException {
128 shreddedWriter.close();
129 }
130
131 public Class<DocumentLengthWordCount> getInputClass() {
132 return DocumentLengthWordCount.class;
133 }
134 }
135 public ReaderSource<DocumentLengthWordCount> orderedCombiner(Collection<TypeReader<DocumentLengthWordCount>> readers, boolean closeOnExit) {
136 ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
137
138 for (TypeReader<DocumentLengthWordCount> reader : readers) {
139 shreddedReaders.add((ShreddedReader)reader);
140 }
141
142 return new ShreddedCombiner(shreddedReaders, closeOnExit);
143 }
144 public DocumentLengthWordCount clone(DocumentLengthWordCount object) {
145 DocumentLengthWordCount result = new DocumentLengthWordCount();
146 if (object == null) return result;
147 result.document = object.document;
148 result.length = object.length;
149 result.word = object.word;
150 result.count = object.count;
151 return result;
152 }
153 public Class<DocumentLengthWordCount> getOrderedClass() {
154 return DocumentLengthWordCount.class;
155 }
156 public String[] getOrderSpec() {
157 return new String[] {"+document", "+length"};
158 }
159
160 public static String getSpecString() {
161 return "+document +length";
162 }
163
164 public interface ShreddedProcessor extends Step {
165 public void processDocument(String document) throws IOException;
166 public void processLength(int length) throws IOException;
167 public void processTuple(String word, int count) throws IOException;
168 public void close() throws IOException;
169 }
170 public interface ShreddedSource extends Step {
171 }
172
173 public static class ShreddedWriter implements ShreddedProcessor {
174 ArrayOutput output;
175 ShreddedBuffer buffer = new ShreddedBuffer();
176 String lastDocument;
177 int lastLength;
178 boolean lastFlush = false;
179
180 public ShreddedWriter(ArrayOutput output) {
181 this.output = output;
182 }
183
184 public void close() throws IOException {
185 flush();
186 }
187
188 public void processDocument(String document) {
189 lastDocument = document;
190 buffer.processDocument(document);
191 }
192 public void processLength(int length) {
193 lastLength = length;
194 buffer.processLength(length);
195 }
196 public final void processTuple(String word, int count) throws IOException {
197 if (lastFlush) {
198 if(buffer.documents.size() == 0) buffer.processDocument(lastDocument);
199 if(buffer.lengths.size() == 0) buffer.processLength(lastLength);
200 lastFlush = false;
201 }
202 buffer.processTuple(word, count);
203 if (buffer.isFull())
204 flush();
205 }
206 public final void flushTuples(int pauseIndex) throws IOException {
207
208 while (buffer.getReadIndex() < pauseIndex) {
209
210 output.writeString(buffer.getWord());
211 output.writeInt(buffer.getCount());
212 buffer.incrementTuple();
213 }
214 }
215 public final void flushDocument(int pauseIndex) throws IOException {
216 while (buffer.getReadIndex() < pauseIndex) {
217 int nextPause = buffer.getDocumentEndIndex();
218 int count = nextPause - buffer.getReadIndex();
219
220 output.writeString(buffer.getDocument());
221 output.writeInt(count);
222 buffer.incrementDocument();
223
224 flushLength(nextPause);
225 assert nextPause == buffer.getReadIndex();
226 }
227 }
228 public final void flushLength(int pauseIndex) throws IOException {
229 while (buffer.getReadIndex() < pauseIndex) {
230 int nextPause = buffer.getLengthEndIndex();
231 int count = nextPause - buffer.getReadIndex();
232
233 output.writeInt(buffer.getLength());
234 output.writeInt(count);
235 buffer.incrementLength();
236
237 flushTuples(nextPause);
238 assert nextPause == buffer.getReadIndex();
239 }
240 }
241 public void flush() throws IOException {
242 flushDocument(buffer.getWriteIndex());
243 buffer.reset();
244 lastFlush = true;
245 }
246 }
247 public static class ShreddedBuffer {
248 ArrayList<String> documents = new ArrayList();
249 ArrayList<Integer> lengths = new ArrayList();
250 ArrayList<Integer> documentTupleIdx = new ArrayList();
251 ArrayList<Integer> lengthTupleIdx = new ArrayList();
252 int documentReadIdx = 0;
253 int lengthReadIdx = 0;
254
255 String[] words;
256 int[] counts;
257 int writeTupleIndex = 0;
258 int readTupleIndex = 0;
259 int batchSize;
260
261 public ShreddedBuffer(int batchSize) {
262 this.batchSize = batchSize;
263
264 words = new String[batchSize];
265 counts = new int[batchSize];
266 }
267
268 public ShreddedBuffer() {
269 this(10000);
270 }
271
272 public void processDocument(String document) {
273 documents.add(document);
274 documentTupleIdx.add(writeTupleIndex);
275 }
276 public void processLength(int length) {
277 lengths.add(length);
278 lengthTupleIdx.add(writeTupleIndex);
279 }
280 public void processTuple(String word, int count) {
281 assert documents.size() > 0;
282 assert lengths.size() > 0;
283 words[writeTupleIndex] = word;
284 counts[writeTupleIndex] = count;
285 writeTupleIndex++;
286 }
287 public void resetData() {
288 documents.clear();
289 lengths.clear();
290 documentTupleIdx.clear();
291 lengthTupleIdx.clear();
292 writeTupleIndex = 0;
293 }
294
295 public void resetRead() {
296 readTupleIndex = 0;
297 documentReadIdx = 0;
298 lengthReadIdx = 0;
299 }
300
301 public void reset() {
302 resetData();
303 resetRead();
304 }
305 public boolean isFull() {
306 return writeTupleIndex >= batchSize;
307 }
308
309 public boolean isEmpty() {
310 return writeTupleIndex == 0;
311 }
312
313 public boolean isAtEnd() {
314 return readTupleIndex >= writeTupleIndex;
315 }
316 public void incrementDocument() {
317 documentReadIdx++;
318 }
319
320 public void autoIncrementDocument() {
321 while (readTupleIndex >= getDocumentEndIndex() && readTupleIndex < writeTupleIndex)
322 documentReadIdx++;
323 }
324 public void incrementLength() {
325 lengthReadIdx++;
326 }
327
328 public void autoIncrementLength() {
329 while (readTupleIndex >= getLengthEndIndex() && readTupleIndex < writeTupleIndex)
330 lengthReadIdx++;
331 }
332 public void incrementTuple() {
333 readTupleIndex++;
334 }
335 public int getDocumentEndIndex() {
336 if ((documentReadIdx+1) >= documentTupleIdx.size())
337 return writeTupleIndex;
338 return documentTupleIdx.get(documentReadIdx+1);
339 }
340
341 public int getLengthEndIndex() {
342 if ((lengthReadIdx+1) >= lengthTupleIdx.size())
343 return writeTupleIndex;
344 return lengthTupleIdx.get(lengthReadIdx+1);
345 }
346 public int getReadIndex() {
347 return readTupleIndex;
348 }
349
350 public int getWriteIndex() {
351 return writeTupleIndex;
352 }
353 public String getDocument() {
354 assert readTupleIndex < writeTupleIndex;
355 assert documentReadIdx < documents.size();
356
357 return documents.get(documentReadIdx);
358 }
359 public int getLength() {
360 assert readTupleIndex < writeTupleIndex;
361 assert lengthReadIdx < lengths.size();
362
363 return lengths.get(lengthReadIdx);
364 }
365 public String getWord() {
366 assert readTupleIndex < writeTupleIndex;
367 return words[readTupleIndex];
368 }
369 public int getCount() {
370 assert readTupleIndex < writeTupleIndex;
371 return counts[readTupleIndex];
372 }
373 public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
374 while (getReadIndex() < endIndex) {
375 output.processTuple(getWord(), getCount());
376 incrementTuple();
377 }
378 }
379 public void copyUntilIndexDocument(int endIndex, ShreddedProcessor output) throws IOException {
380 while (getReadIndex() < endIndex) {
381 output.processDocument(getDocument());
382 assert getDocumentEndIndex() <= endIndex;
383 copyUntilIndexLength(getDocumentEndIndex(), output);
384 incrementDocument();
385 }
386 }
387 public void copyUntilIndexLength(int endIndex, ShreddedProcessor output) throws IOException {
388 while (getReadIndex() < endIndex) {
389 output.processLength(getLength());
390 assert getLengthEndIndex() <= endIndex;
391 copyTuples(getLengthEndIndex(), output);
392 incrementLength();
393 }
394 }
395 public void copyUntilDocument(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
396 while (!isAtEnd()) {
397 if (other != null) {
398 assert !other.isAtEnd();
399 int c = + Utility.compare(getDocument(), other.getDocument());
400
401 if (c > 0) {
402 break;
403 }
404
405 output.processDocument(getDocument());
406
407 if (c < 0) {
408 copyUntilIndexLength(getDocumentEndIndex(), output);
409 } else if (c == 0) {
410 copyUntilLength(other, output);
411 autoIncrementDocument();
412 break;
413 }
414 } else {
415 output.processDocument(getDocument());
416 copyUntilIndexLength(getDocumentEndIndex(), output);
417 }
418 incrementDocument();
419
420
421 }
422 }
423 public void copyUntilLength(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
424 while (!isAtEnd()) {
425 if (other != null) {
426 assert !other.isAtEnd();
427 int c = + Utility.compare(getLength(), other.getLength());
428
429 if (c > 0) {
430 break;
431 }
432
433 output.processLength(getLength());
434
435 copyTuples(getLengthEndIndex(), output);
436 } else {
437 output.processLength(getLength());
438 copyTuples(getLengthEndIndex(), output);
439 }
440 incrementLength();
441
442 if (getDocumentEndIndex() <= readTupleIndex)
443 break;
444 }
445 }
446 public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
447 copyUntilDocument(other, output);
448 }
449
450 }
451 public static class ShreddedCombiner implements ReaderSource<DocumentLengthWordCount>, ShreddedSource {
452 public ShreddedProcessor processor;
453 Collection<ShreddedReader> readers;
454 boolean closeOnExit = false;
455 boolean uninitialized = true;
456 PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
457
458 public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
459 this.readers = readers;
460 this.closeOnExit = closeOnExit;
461 }
462
463 public void setProcessor(Step processor) throws IncompatibleProcessorException {
464 if (processor instanceof ShreddedProcessor) {
465 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
466 } else if (processor instanceof DocumentLengthWordCount.Processor) {
467 this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentLengthWordCount.Processor) processor));
468 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
469 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentLengthWordCount>) processor));
470 } else {
471 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
472 }
473 }
474
475 public Class<DocumentLengthWordCount> getOutputClass() {
476 return DocumentLengthWordCount.class;
477 }
478
479 public void initialize() throws IOException {
480 for (ShreddedReader reader : readers) {
481 reader.fill();
482
483 if (!reader.getBuffer().isAtEnd())
484 queue.add(reader);
485 }
486
487 uninitialized = false;
488 }
489
490 public void run() throws IOException {
491 initialize();
492
493 while (queue.size() > 0) {
494 ShreddedReader top = queue.poll();
495 ShreddedReader next = null;
496 ShreddedBuffer nextBuffer = null;
497
498 assert !top.getBuffer().isAtEnd();
499
500 if (queue.size() > 0) {
501 next = queue.peek();
502 nextBuffer = next.getBuffer();
503 assert !nextBuffer.isAtEnd();
504 }
505
506 top.getBuffer().copyUntil(nextBuffer, processor);
507 if (top.getBuffer().isAtEnd())
508 top.fill();
509
510 if (!top.getBuffer().isAtEnd())
511 queue.add(top);
512 }
513
514 if (closeOnExit)
515 processor.close();
516 }
517
518 public DocumentLengthWordCount read() throws IOException {
519 if (uninitialized)
520 initialize();
521
522 DocumentLengthWordCount result = null;
523
524 while (queue.size() > 0) {
525 ShreddedReader top = queue.poll();
526 result = top.read();
527
528 if (result != null) {
529 if (top.getBuffer().isAtEnd())
530 top.fill();
531
532 queue.offer(top);
533 break;
534 }
535 }
536
537 return result;
538 }
539 }
540 public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<DocumentLengthWordCount>, ShreddedSource {
541 public ShreddedProcessor processor;
542 ShreddedBuffer buffer;
543 DocumentLengthWordCount last = new DocumentLengthWordCount();
544 long updateDocumentCount = -1;
545 long updateLengthCount = -1;
546 long tupleCount = 0;
547 long bufferStartCount = 0;
548 ArrayInput input;
549
550 public ShreddedReader(ArrayInput input) {
551 this.input = input;
552 this.buffer = new ShreddedBuffer();
553 }
554
555 public ShreddedReader(ArrayInput input, int bufferSize) {
556 this.input = input;
557 this.buffer = new ShreddedBuffer(bufferSize);
558 }
559
560 public final int compareTo(ShreddedReader other) {
561 ShreddedBuffer otherBuffer = other.getBuffer();
562
563 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
564 return 0;
565 } else if (buffer.isAtEnd()) {
566 return -1;
567 } else if (otherBuffer.isAtEnd()) {
568 return 1;
569 }
570
571 int result = 0;
572 do {
573 result = + Utility.compare(buffer.getDocument(), otherBuffer.getDocument());
574 if(result != 0) break;
575 result = + Utility.compare(buffer.getLength(), otherBuffer.getLength());
576 if(result != 0) break;
577 } while (false);
578
579 return result;
580 }
581
582 public final ShreddedBuffer getBuffer() {
583 return buffer;
584 }
585
586 public final DocumentLengthWordCount read() throws IOException {
587 if (buffer.isAtEnd()) {
588 fill();
589
590 if (buffer.isAtEnd()) {
591 return null;
592 }
593 }
594
595 assert !buffer.isAtEnd();
596 DocumentLengthWordCount result = new DocumentLengthWordCount();
597
598 result.document = buffer.getDocument();
599 result.length = buffer.getLength();
600 result.word = buffer.getWord();
601 result.count = buffer.getCount();
602
603 buffer.incrementTuple();
604 buffer.autoIncrementDocument();
605 buffer.autoIncrementLength();
606
607 return result;
608 }
609
610 public final void fill() throws IOException {
611 try {
612 buffer.reset();
613
614 if (tupleCount != 0) {
615
616 if(updateDocumentCount - tupleCount > 0) {
617 buffer.documents.add(last.document);
618 buffer.documentTupleIdx.add((int) (updateDocumentCount - tupleCount));
619 }
620 if(updateLengthCount - tupleCount > 0) {
621 buffer.lengths.add(last.length);
622 buffer.lengthTupleIdx.add((int) (updateLengthCount - tupleCount));
623 }
624 bufferStartCount = tupleCount;
625 }
626
627 while (!buffer.isFull()) {
628 updateLength();
629 buffer.processTuple(input.readString(), input.readInt());
630 tupleCount++;
631 }
632 } catch(EOFException e) {}
633 }
634
635 public final void updateDocument() throws IOException {
636 if (updateDocumentCount > tupleCount)
637 return;
638
639 last.document = input.readString();
640 updateDocumentCount = tupleCount + input.readInt();
641
642 buffer.processDocument(last.document);
643 }
644 public final void updateLength() throws IOException {
645 if (updateLengthCount > tupleCount)
646 return;
647
648 updateDocument();
649 last.length = input.readInt();
650 updateLengthCount = tupleCount + input.readInt();
651
652 buffer.processLength(last.length);
653 }
654
655 public void run() throws IOException {
656 while (true) {
657 fill();
658
659 if (buffer.isAtEnd())
660 break;
661
662 buffer.copyUntil(null, processor);
663 }
664 processor.close();
665 }
666
667 public void setProcessor(Step processor) throws IncompatibleProcessorException {
668 if (processor instanceof ShreddedProcessor) {
669 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
670 } else if (processor instanceof DocumentLengthWordCount.Processor) {
671 this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentLengthWordCount.Processor) processor));
672 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
673 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentLengthWordCount>) processor));
674 } else {
675 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
676 }
677 }
678
679 public Class<DocumentLengthWordCount> getOutputClass() {
680 return DocumentLengthWordCount.class;
681 }
682 }
683
684 public static class DuplicateEliminator implements ShreddedProcessor {
685 public ShreddedProcessor processor;
686 DocumentLengthWordCount last = new DocumentLengthWordCount();
687 boolean documentProcess = true;
688 boolean lengthProcess = true;
689
690 public DuplicateEliminator() {}
691 public DuplicateEliminator(ShreddedProcessor processor) {
692 this.processor = processor;
693 }
694
695 public void setShreddedProcessor(ShreddedProcessor processor) {
696 this.processor = processor;
697 }
698
699 public void processDocument(String document) throws IOException {
700 if (documentProcess || Utility.compare(document, last.document) != 0) {
701 last.document = document;
702 processor.processDocument(document);
703 resetLength();
704 documentProcess = false;
705 }
706 }
707 public void processLength(int length) throws IOException {
708 if (lengthProcess || Utility.compare(length, last.length) != 0) {
709 last.length = length;
710 processor.processLength(length);
711 lengthProcess = false;
712 }
713 }
714
715 public void resetDocument() {
716 documentProcess = true;
717 resetLength();
718 }
719 public void resetLength() {
720 lengthProcess = true;
721 }
722
723 public void processTuple(String word, int count) throws IOException {
724 processor.processTuple(word, count);
725 }
726
727 public void close() throws IOException {
728 processor.close();
729 }
730 }
731 public static class TupleUnshredder implements ShreddedProcessor {
732 DocumentLengthWordCount last = new DocumentLengthWordCount();
733 public org.galagosearch.tupleflow.Processor<DocumentLengthWordCount> processor;
734
735 public TupleUnshredder(DocumentLengthWordCount.Processor processor) {
736 this.processor = processor;
737 }
738
739 public TupleUnshredder(org.galagosearch.tupleflow.Processor<DocumentLengthWordCount> processor) {
740 this.processor = processor;
741 }
742
743 public DocumentLengthWordCount clone(DocumentLengthWordCount object) {
744 DocumentLengthWordCount result = new DocumentLengthWordCount();
745 if (object == null) return result;
746 result.document = object.document;
747 result.length = object.length;
748 result.word = object.word;
749 result.count = object.count;
750 return result;
751 }
752
753 public void processDocument(String document) throws IOException {
754 last.document = document;
755 }
756
757 public void processLength(int length) throws IOException {
758 last.length = length;
759 }
760
761
762 public void processTuple(String word, int count) throws IOException {
763 last.word = word;
764 last.count = count;
765 processor.process(clone(last));
766 }
767
768 public void close() throws IOException {
769 processor.close();
770 }
771 }
772 public static class TupleShredder implements Processor {
773 DocumentLengthWordCount last = new DocumentLengthWordCount();
774 public ShreddedProcessor processor;
775
776 public TupleShredder(ShreddedProcessor processor) {
777 this.processor = processor;
778 }
779
780 public DocumentLengthWordCount clone(DocumentLengthWordCount object) {
781 DocumentLengthWordCount result = new DocumentLengthWordCount();
782 if (object == null) return result;
783 result.document = object.document;
784 result.length = object.length;
785 result.word = object.word;
786 result.count = object.count;
787 return result;
788 }
789
790 public void process(DocumentLengthWordCount object) throws IOException {
791 boolean processAll = false;
792 if(last == null || Utility.compare(last.document, object.document) != 0 || processAll) { processor.processDocument(object.document); processAll = true; }
793 if(last == null || Utility.compare(last.length, object.length) != 0 || processAll) { processor.processLength(object.length); processAll = true; }
794 processor.processTuple(object.word, object.count);
795 }
796
797 public Class<DocumentLengthWordCount> getInputClass() {
798 return DocumentLengthWordCount.class;
799 }
800
801 public void close() throws IOException {
802 processor.close();
803 }
804 }
805 }
806 public static class DocumentWordOrder implements Order<DocumentLengthWordCount> {
807 public int hash(DocumentLengthWordCount object) {
808 int h = 0;
809 h += Utility.hash(object.document);
810 h += Utility.hash(object.word);
811 return h;
812 }
813 public Comparator<DocumentLengthWordCount> greaterThan() {
814 return new Comparator<DocumentLengthWordCount>() {
815 public int compare(DocumentLengthWordCount one, DocumentLengthWordCount two) {
816 int result = 0;
817 do {
818 result = + Utility.compare(one.document, two.document);
819 if(result != 0) break;
820 result = + Utility.compare(one.word, two.word);
821 if(result != 0) break;
822 } while (false);
823 return -result;
824 }
825 };
826 }
827 public Comparator<DocumentLengthWordCount> lessThan() {
828 return new Comparator<DocumentLengthWordCount>() {
829 public int compare(DocumentLengthWordCount one, DocumentLengthWordCount two) {
830 int result = 0;
831 do {
832 result = + Utility.compare(one.document, two.document);
833 if(result != 0) break;
834 result = + Utility.compare(one.word, two.word);
835 if(result != 0) break;
836 } while (false);
837 return result;
838 }
839 };
840 }
841 public TypeReader<DocumentLengthWordCount> orderedReader(ArrayInput _input) {
842 return new ShreddedReader(_input);
843 }
844
845 public TypeReader<DocumentLengthWordCount> orderedReader(ArrayInput _input, int bufferSize) {
846 return new ShreddedReader(_input, bufferSize);
847 }
848 public OrderedWriter<DocumentLengthWordCount> orderedWriter(ArrayOutput _output) {
849 ShreddedWriter w = new ShreddedWriter(_output);
850 return new OrderedWriterClass(w);
851 }
852 public static class OrderedWriterClass extends OrderedWriter< DocumentLengthWordCount > {
853 DocumentLengthWordCount last = null;
854 ShreddedWriter shreddedWriter = null;
855
856 public OrderedWriterClass(ShreddedWriter s) {
857 this.shreddedWriter = s;
858 }
859
860 public void process(DocumentLengthWordCount object) throws IOException {
861 boolean processAll = false;
862 if (processAll || last == null || 0 != Utility.compare(object.document, last.document)) { processAll = true; shreddedWriter.processDocument(object.document); }
863 if (processAll || last == null || 0 != Utility.compare(object.word, last.word)) { processAll = true; shreddedWriter.processWord(object.word); }
864 shreddedWriter.processTuple(object.length, object.count);
865 last = object;
866 }
867
868 public void close() throws IOException {
869 shreddedWriter.close();
870 }
871
872 public Class<DocumentLengthWordCount> getInputClass() {
873 return DocumentLengthWordCount.class;
874 }
875 }
876 public ReaderSource<DocumentLengthWordCount> orderedCombiner(Collection<TypeReader<DocumentLengthWordCount>> readers, boolean closeOnExit) {
877 ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
878
879 for (TypeReader<DocumentLengthWordCount> reader : readers) {
880 shreddedReaders.add((ShreddedReader)reader);
881 }
882
883 return new ShreddedCombiner(shreddedReaders, closeOnExit);
884 }
885 public DocumentLengthWordCount clone(DocumentLengthWordCount object) {
886 DocumentLengthWordCount result = new DocumentLengthWordCount();
887 if (object == null) return result;
888 result.document = object.document;
889 result.length = object.length;
890 result.word = object.word;
891 result.count = object.count;
892 return result;
893 }
894 public Class<DocumentLengthWordCount> getOrderedClass() {
895 return DocumentLengthWordCount.class;
896 }
897 public String[] getOrderSpec() {
898 return new String[] {"+document", "+word"};
899 }
900
901 public static String getSpecString() {
902 return "+document +word";
903 }
904
905 public interface ShreddedProcessor extends Step {
906 public void processDocument(String document) throws IOException;
907 public void processWord(String word) throws IOException;
908 public void processTuple(int length, int count) throws IOException;
909 public void close() throws IOException;
910 }
911 public interface ShreddedSource extends Step {
912 }
913
914 public static class ShreddedWriter implements ShreddedProcessor {
915 ArrayOutput output;
916 ShreddedBuffer buffer = new ShreddedBuffer();
917 String lastDocument;
918 String lastWord;
919 boolean lastFlush = false;
920
921 public ShreddedWriter(ArrayOutput output) {
922 this.output = output;
923 }
924
925 public void close() throws IOException {
926 flush();
927 }
928
929 public void processDocument(String document) {
930 lastDocument = document;
931 buffer.processDocument(document);
932 }
933 public void processWord(String word) {
934 lastWord = word;
935 buffer.processWord(word);
936 }
937 public final void processTuple(int length, int count) throws IOException {
938 if (lastFlush) {
939 if(buffer.documents.size() == 0) buffer.processDocument(lastDocument);
940 if(buffer.words.size() == 0) buffer.processWord(lastWord);
941 lastFlush = false;
942 }
943 buffer.processTuple(length, count);
944 if (buffer.isFull())
945 flush();
946 }
947 public final void flushTuples(int pauseIndex) throws IOException {
948
949 while (buffer.getReadIndex() < pauseIndex) {
950
951 output.writeInt(buffer.getLength());
952 output.writeInt(buffer.getCount());
953 buffer.incrementTuple();
954 }
955 }
956 public final void flushDocument(int pauseIndex) throws IOException {
957 while (buffer.getReadIndex() < pauseIndex) {
958 int nextPause = buffer.getDocumentEndIndex();
959 int count = nextPause - buffer.getReadIndex();
960
961 output.writeString(buffer.getDocument());
962 output.writeInt(count);
963 buffer.incrementDocument();
964
965 flushWord(nextPause);
966 assert nextPause == buffer.getReadIndex();
967 }
968 }
969 public final void flushWord(int pauseIndex) throws IOException {
970 while (buffer.getReadIndex() < pauseIndex) {
971 int nextPause = buffer.getWordEndIndex();
972 int count = nextPause - buffer.getReadIndex();
973
974 output.writeString(buffer.getWord());
975 output.writeInt(count);
976 buffer.incrementWord();
977
978 flushTuples(nextPause);
979 assert nextPause == buffer.getReadIndex();
980 }
981 }
982 public void flush() throws IOException {
983 flushDocument(buffer.getWriteIndex());
984 buffer.reset();
985 lastFlush = true;
986 }
987 }
988 public static class ShreddedBuffer {
989 ArrayList<String> documents = new ArrayList();
990 ArrayList<String> words = new ArrayList();
991 ArrayList<Integer> documentTupleIdx = new ArrayList();
992 ArrayList<Integer> wordTupleIdx = new ArrayList();
993 int documentReadIdx = 0;
994 int wordReadIdx = 0;
995
996 int[] lengths;
997 int[] counts;
998 int writeTupleIndex = 0;
999 int readTupleIndex = 0;
1000 int batchSize;
1001
/**
 * Creates a buffer that holds up to {@code batchSize} tuples.
 *
 * @param batchSize capacity of the per-tuple arrays
 */
public ShreddedBuffer(int batchSize) {
    counts = new int[batchSize];
    lengths = new int[batchSize];
    this.batchSize = batchSize;
}
1008
/** Creates a buffer with the default capacity of 10000 tuples. */
public ShreddedBuffer() {
    this(10000);
}
1012
1013 public void processDocument(String document) {
1014 documents.add(document);
1015 documentTupleIdx.add(writeTupleIndex);
1016 }
1017 public void processWord(String word) {
1018 words.add(word);
1019 wordTupleIdx.add(writeTupleIndex);
1020 }
1021 public void processTuple(int length, int count) {
1022 assert documents.size() > 0;
1023 assert words.size() > 0;
1024 lengths[writeTupleIndex] = length;
1025 counts[writeTupleIndex] = count;
1026 writeTupleIndex++;
1027 }
1028 public void resetData() {
1029 documents.clear();
1030 words.clear();
1031 documentTupleIdx.clear();
1032 wordTupleIdx.clear();
1033 writeTupleIndex = 0;
1034 }
1035
1036 public void resetRead() {
1037 readTupleIndex = 0;
1038 documentReadIdx = 0;
1039 wordReadIdx = 0;
1040 }
1041
1042 public void reset() {
1043 resetData();
1044 resetRead();
1045 }
1046 public boolean isFull() {
1047 return writeTupleIndex >= batchSize;
1048 }
1049
1050 public boolean isEmpty() {
1051 return writeTupleIndex == 0;
1052 }
1053
1054 public boolean isAtEnd() {
1055 return readTupleIndex >= writeTupleIndex;
1056 }
1057 public void incrementDocument() {
1058 documentReadIdx++;
1059 }
1060
1061 public void autoIncrementDocument() {
1062 while (readTupleIndex >= getDocumentEndIndex() && readTupleIndex < writeTupleIndex)
1063 documentReadIdx++;
1064 }
1065 public void incrementWord() {
1066 wordReadIdx++;
1067 }
1068
1069 public void autoIncrementWord() {
1070 while (readTupleIndex >= getWordEndIndex() && readTupleIndex < writeTupleIndex)
1071 wordReadIdx++;
1072 }
1073 public void incrementTuple() {
1074 readTupleIndex++;
1075 }
1076 public int getDocumentEndIndex() {
1077 if ((documentReadIdx+1) >= documentTupleIdx.size())
1078 return writeTupleIndex;
1079 return documentTupleIdx.get(documentReadIdx+1);
1080 }
1081
1082 public int getWordEndIndex() {
1083 if ((wordReadIdx+1) >= wordTupleIdx.size())
1084 return writeTupleIndex;
1085 return wordTupleIdx.get(wordReadIdx+1);
1086 }
1087 public int getReadIndex() {
1088 return readTupleIndex;
1089 }
1090
1091 public int getWriteIndex() {
1092 return writeTupleIndex;
1093 }
1094 public String getDocument() {
1095 assert readTupleIndex < writeTupleIndex;
1096 assert documentReadIdx < documents.size();
1097
1098 return documents.get(documentReadIdx);
1099 }
1100 public String getWord() {
1101 assert readTupleIndex < writeTupleIndex;
1102 assert wordReadIdx < words.size();
1103
1104 return words.get(wordReadIdx);
1105 }
1106 public int getLength() {
1107 assert readTupleIndex < writeTupleIndex;
1108 return lengths[readTupleIndex];
1109 }
1110 public int getCount() {
1111 assert readTupleIndex < writeTupleIndex;
1112 return counts[readTupleIndex];
1113 }
1114 public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
1115 while (getReadIndex() < endIndex) {
1116 output.processTuple(getLength(), getCount());
1117 incrementTuple();
1118 }
1119 }
1120 public void copyUntilIndexDocument(int endIndex, ShreddedProcessor output) throws IOException {
1121 while (getReadIndex() < endIndex) {
1122 output.processDocument(getDocument());
1123 assert getDocumentEndIndex() <= endIndex;
1124 copyUntilIndexWord(getDocumentEndIndex(), output);
1125 incrementDocument();
1126 }
1127 }
1128 public void copyUntilIndexWord(int endIndex, ShreddedProcessor output) throws IOException {
1129 while (getReadIndex() < endIndex) {
1130 output.processWord(getWord());
1131 assert getWordEndIndex() <= endIndex;
1132 copyTuples(getWordEndIndex(), output);
1133 incrementWord();
1134 }
1135 }
1136 public void copyUntilDocument(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
1137 while (!isAtEnd()) {
1138 if (other != null) {
1139 assert !other.isAtEnd();
1140 int c = + Utility.compare(getDocument(), other.getDocument());
1141
1142 if (c > 0) {
1143 break;
1144 }
1145
1146 output.processDocument(getDocument());
1147
1148 if (c < 0) {
1149 copyUntilIndexWord(getDocumentEndIndex(), output);
1150 } else if (c == 0) {
1151 copyUntilWord(other, output);
1152 autoIncrementDocument();
1153 break;
1154 }
1155 } else {
1156 output.processDocument(getDocument());
1157 copyUntilIndexWord(getDocumentEndIndex(), output);
1158 }
1159 incrementDocument();
1160
1161
1162 }
1163 }
1164 public void copyUntilWord(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
1165 while (!isAtEnd()) {
1166 if (other != null) {
1167 assert !other.isAtEnd();
1168 int c = + Utility.compare(getWord(), other.getWord());
1169
1170 if (c > 0) {
1171 break;
1172 }
1173
1174 output.processWord(getWord());
1175
1176 copyTuples(getWordEndIndex(), output);
1177 } else {
1178 output.processWord(getWord());
1179 copyTuples(getWordEndIndex(), output);
1180 }
1181 incrementWord();
1182
1183 if (getDocumentEndIndex() <= readTupleIndex)
1184 break;
1185 }
1186 }
1187 public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
1188 copyUntilDocument(other, output);
1189 }
1190
1191 }
1192 public static class ShreddedCombiner implements ReaderSource<DocumentLengthWordCount>, ShreddedSource {
1193 public ShreddedProcessor processor;
1194 Collection<ShreddedReader> readers;
1195 boolean closeOnExit = false;
1196 boolean uninitialized = true;
1197 PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
1198
1199 public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
1200 this.readers = readers;
1201 this.closeOnExit = closeOnExit;
1202 }
1203
1204 public void setProcessor(Step processor) throws IncompatibleProcessorException {
1205 if (processor instanceof ShreddedProcessor) {
1206 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
1207 } else if (processor instanceof DocumentLengthWordCount.Processor) {
1208 this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentLengthWordCount.Processor) processor));
1209 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
1210 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentLengthWordCount>) processor));
1211 } else {
1212 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
1213 }
1214 }
1215
1216 public Class<DocumentLengthWordCount> getOutputClass() {
1217 return DocumentLengthWordCount.class;
1218 }
1219
1220 public void initialize() throws IOException {
1221 for (ShreddedReader reader : readers) {
1222 reader.fill();
1223
1224 if (!reader.getBuffer().isAtEnd())
1225 queue.add(reader);
1226 }
1227
1228 uninitialized = false;
1229 }
1230
1231 public void run() throws IOException {
1232 initialize();
1233
1234 while (queue.size() > 0) {
1235 ShreddedReader top = queue.poll();
1236 ShreddedReader next = null;
1237 ShreddedBuffer nextBuffer = null;
1238
1239 assert !top.getBuffer().isAtEnd();
1240
1241 if (queue.size() > 0) {
1242 next = queue.peek();
1243 nextBuffer = next.getBuffer();
1244 assert !nextBuffer.isAtEnd();
1245 }
1246
1247 top.getBuffer().copyUntil(nextBuffer, processor);
1248 if (top.getBuffer().isAtEnd())
1249 top.fill();
1250
1251 if (!top.getBuffer().isAtEnd())
1252 queue.add(top);
1253 }
1254
1255 if (closeOnExit)
1256 processor.close();
1257 }
1258
1259 public DocumentLengthWordCount read() throws IOException {
1260 if (uninitialized)
1261 initialize();
1262
1263 DocumentLengthWordCount result = null;
1264
1265 while (queue.size() > 0) {
1266 ShreddedReader top = queue.poll();
1267 result = top.read();
1268
1269 if (result != null) {
1270 if (top.getBuffer().isAtEnd())
1271 top.fill();
1272
1273 queue.offer(top);
1274 break;
1275 }
1276 }
1277
1278 return result;
1279 }
1280 }
1281 public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<DocumentLengthWordCount>, ShreddedSource {
1282 public ShreddedProcessor processor;
1283 ShreddedBuffer buffer;
1284 DocumentLengthWordCount last = new DocumentLengthWordCount();
1285 long updateDocumentCount = -1;
1286 long updateWordCount = -1;
1287 long tupleCount = 0;
1288 long bufferStartCount = 0;
1289 ArrayInput input;
1290
1291 public ShreddedReader(ArrayInput input) {
1292 this.input = input;
1293 this.buffer = new ShreddedBuffer();
1294 }
1295
1296 public ShreddedReader(ArrayInput input, int bufferSize) {
1297 this.input = input;
1298 this.buffer = new ShreddedBuffer(bufferSize);
1299 }
1300
1301 public final int compareTo(ShreddedReader other) {
1302 ShreddedBuffer otherBuffer = other.getBuffer();
1303
1304 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
1305 return 0;
1306 } else if (buffer.isAtEnd()) {
1307 return -1;
1308 } else if (otherBuffer.isAtEnd()) {
1309 return 1;
1310 }
1311
1312 int result = 0;
1313 do {
1314 result = + Utility.compare(buffer.getDocument(), otherBuffer.getDocument());
1315 if(result != 0) break;
1316 result = + Utility.compare(buffer.getWord(), otherBuffer.getWord());
1317 if(result != 0) break;
1318 } while (false);
1319
1320 return result;
1321 }
1322
1323 public final ShreddedBuffer getBuffer() {
1324 return buffer;
1325 }
1326
1327 public final DocumentLengthWordCount read() throws IOException {
1328 if (buffer.isAtEnd()) {
1329 fill();
1330
1331 if (buffer.isAtEnd()) {
1332 return null;
1333 }
1334 }
1335
1336 assert !buffer.isAtEnd();
1337 DocumentLengthWordCount result = new DocumentLengthWordCount();
1338
1339 result.document = buffer.getDocument();
1340 result.word = buffer.getWord();
1341 result.length = buffer.getLength();
1342 result.count = buffer.getCount();
1343
1344 buffer.incrementTuple();
1345 buffer.autoIncrementDocument();
1346 buffer.autoIncrementWord();
1347
1348 return result;
1349 }
1350
1351 public final void fill() throws IOException {
1352 try {
1353 buffer.reset();
1354
1355 if (tupleCount != 0) {
1356
1357 if(updateDocumentCount - tupleCount > 0) {
1358 buffer.documents.add(last.document);
1359 buffer.documentTupleIdx.add((int) (updateDocumentCount - tupleCount));
1360 }
1361 if(updateWordCount - tupleCount > 0) {
1362 buffer.words.add(last.word);
1363 buffer.wordTupleIdx.add((int) (updateWordCount - tupleCount));
1364 }
1365 bufferStartCount = tupleCount;
1366 }
1367
1368 while (!buffer.isFull()) {
1369 updateWord();
1370 buffer.processTuple(input.readInt(), input.readInt());
1371 tupleCount++;
1372 }
1373 } catch(EOFException e) {}
1374 }
1375
1376 public final void updateDocument() throws IOException {
1377 if (updateDocumentCount > tupleCount)
1378 return;
1379
1380 last.document = input.readString();
1381 updateDocumentCount = tupleCount + input.readInt();
1382
1383 buffer.processDocument(last.document);
1384 }
1385 public final void updateWord() throws IOException {
1386 if (updateWordCount > tupleCount)
1387 return;
1388
1389 updateDocument();
1390 last.word = input.readString();
1391 updateWordCount = tupleCount + input.readInt();
1392
1393 buffer.processWord(last.word);
1394 }
1395
1396 public void run() throws IOException {
1397 while (true) {
1398 fill();
1399
1400 if (buffer.isAtEnd())
1401 break;
1402
1403 buffer.copyUntil(null, processor);
1404 }
1405 processor.close();
1406 }
1407
1408 public void setProcessor(Step processor) throws IncompatibleProcessorException {
1409 if (processor instanceof ShreddedProcessor) {
1410 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
1411 } else if (processor instanceof DocumentLengthWordCount.Processor) {
1412 this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentLengthWordCount.Processor) processor));
1413 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
1414 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentLengthWordCount>) processor));
1415 } else {
1416 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
1417 }
1418 }
1419
1420 public Class<DocumentLengthWordCount> getOutputClass() {
1421 return DocumentLengthWordCount.class;
1422 }
1423 }
1424
1425 public static class DuplicateEliminator implements ShreddedProcessor {
1426 public ShreddedProcessor processor;
1427 DocumentLengthWordCount last = new DocumentLengthWordCount();
1428 boolean documentProcess = true;
1429 boolean wordProcess = true;
1430
1431 public DuplicateEliminator() {}
1432 public DuplicateEliminator(ShreddedProcessor processor) {
1433 this.processor = processor;
1434 }
1435
1436 public void setShreddedProcessor(ShreddedProcessor processor) {
1437 this.processor = processor;
1438 }
1439
1440 public void processDocument(String document) throws IOException {
1441 if (documentProcess || Utility.compare(document, last.document) != 0) {
1442 last.document = document;
1443 processor.processDocument(document);
1444 resetWord();
1445 documentProcess = false;
1446 }
1447 }
1448 public void processWord(String word) throws IOException {
1449 if (wordProcess || Utility.compare(word, last.word) != 0) {
1450 last.word = word;
1451 processor.processWord(word);
1452 wordProcess = false;
1453 }
1454 }
1455
1456 public void resetDocument() {
1457 documentProcess = true;
1458 resetWord();
1459 }
1460 public void resetWord() {
1461 wordProcess = true;
1462 }
1463
1464 public void processTuple(int length, int count) throws IOException {
1465 processor.processTuple(length, count);
1466 }
1467
1468 public void close() throws IOException {
1469 processor.close();
1470 }
1471 }
1472 public static class TupleUnshredder implements ShreddedProcessor {
1473 DocumentLengthWordCount last = new DocumentLengthWordCount();
1474 public org.galagosearch.tupleflow.Processor<DocumentLengthWordCount> processor;
1475
1476 public TupleUnshredder(DocumentLengthWordCount.Processor processor) {
1477 this.processor = processor;
1478 }
1479
1480 public TupleUnshredder(org.galagosearch.tupleflow.Processor<DocumentLengthWordCount> processor) {
1481 this.processor = processor;
1482 }
1483
1484 public DocumentLengthWordCount clone(DocumentLengthWordCount object) {
1485 DocumentLengthWordCount result = new DocumentLengthWordCount();
1486 if (object == null) return result;
1487 result.document = object.document;
1488 result.length = object.length;
1489 result.word = object.word;
1490 result.count = object.count;
1491 return result;
1492 }
1493
1494 public void processDocument(String document) throws IOException {
1495 last.document = document;
1496 }
1497
1498 public void processWord(String word) throws IOException {
1499 last.word = word;
1500 }
1501
1502
1503 public void processTuple(int length, int count) throws IOException {
1504 last.length = length;
1505 last.count = count;
1506 processor.process(clone(last));
1507 }
1508
1509 public void close() throws IOException {
1510 processor.close();
1511 }
1512 }
1513 public static class TupleShredder implements Processor {
1514 DocumentLengthWordCount last = new DocumentLengthWordCount();
1515 public ShreddedProcessor processor;
1516
1517 public TupleShredder(ShreddedProcessor processor) {
1518 this.processor = processor;
1519 }
1520
1521 public DocumentLengthWordCount clone(DocumentLengthWordCount object) {
1522 DocumentLengthWordCount result = new DocumentLengthWordCount();
1523 if (object == null) return result;
1524 result.document = object.document;
1525 result.length = object.length;
1526 result.word = object.word;
1527 result.count = object.count;
1528 return result;
1529 }
1530
1531 public void process(DocumentLengthWordCount object) throws IOException {
1532 boolean processAll = false;
1533 if(last == null || Utility.compare(last.document, object.document) != 0 || processAll) { processor.processDocument(object.document); processAll = true; }
1534 if(last == null || Utility.compare(last.word, object.word) != 0 || processAll) { processor.processWord(object.word); processAll = true; }
1535 processor.processTuple(object.length, object.count);
1536 }
1537
1538 public Class<DocumentLengthWordCount> getInputClass() {
1539 return DocumentLengthWordCount.class;
1540 }
1541
1542 public void close() throws IOException {
1543 processor.close();
1544 }
1545 }
1546 }
1547 public static class WordDocumentOrder implements Order<DocumentLengthWordCount> {
1548 public int hash(DocumentLengthWordCount object) {
1549 int h = 0;
1550 h += Utility.hash(object.word);
1551 h += Utility.hash(object.document);
1552 return h;
1553 }
1554 public Comparator<DocumentLengthWordCount> greaterThan() {
1555 return new Comparator<DocumentLengthWordCount>() {
1556 public int compare(DocumentLengthWordCount one, DocumentLengthWordCount two) {
1557 int result = 0;
1558 do {
1559 result = + Utility.compare(one.word, two.word);
1560 if(result != 0) break;
1561 result = + Utility.compare(one.document, two.document);
1562 if(result != 0) break;
1563 } while (false);
1564 return -result;
1565 }
1566 };
1567 }
1568 public Comparator<DocumentLengthWordCount> lessThan() {
1569 return new Comparator<DocumentLengthWordCount>() {
1570 public int compare(DocumentLengthWordCount one, DocumentLengthWordCount two) {
1571 int result = 0;
1572 do {
1573 result = + Utility.compare(one.word, two.word);
1574 if(result != 0) break;
1575 result = + Utility.compare(one.document, two.document);
1576 if(result != 0) break;
1577 } while (false);
1578 return result;
1579 }
1580 };
1581 }
1582 public TypeReader<DocumentLengthWordCount> orderedReader(ArrayInput _input) {
1583 return new ShreddedReader(_input);
1584 }
1585
1586 public TypeReader<DocumentLengthWordCount> orderedReader(ArrayInput _input, int bufferSize) {
1587 return new ShreddedReader(_input, bufferSize);
1588 }
1589 public OrderedWriter<DocumentLengthWordCount> orderedWriter(ArrayOutput _output) {
1590 ShreddedWriter w = new ShreddedWriter(_output);
1591 return new OrderedWriterClass(w);
1592 }
1593 public static class OrderedWriterClass extends OrderedWriter< DocumentLengthWordCount > {
1594 DocumentLengthWordCount last = null;
1595 ShreddedWriter shreddedWriter = null;
1596
1597 public OrderedWriterClass(ShreddedWriter s) {
1598 this.shreddedWriter = s;
1599 }
1600
1601 public void process(DocumentLengthWordCount object) throws IOException {
1602 boolean processAll = false;
1603 if (processAll || last == null || 0 != Utility.compare(object.word, last.word)) { processAll = true; shreddedWriter.processWord(object.word); }
1604 if (processAll || last == null || 0 != Utility.compare(object.document, last.document)) { processAll = true; shreddedWriter.processDocument(object.document); }
1605 shreddedWriter.processTuple(object.length, object.count);
1606 last = object;
1607 }
1608
1609 public void close() throws IOException {
1610 shreddedWriter.close();
1611 }
1612
1613 public Class<DocumentLengthWordCount> getInputClass() {
1614 return DocumentLengthWordCount.class;
1615 }
1616 }
1617 public ReaderSource<DocumentLengthWordCount> orderedCombiner(Collection<TypeReader<DocumentLengthWordCount>> readers, boolean closeOnExit) {
1618 ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
1619
1620 for (TypeReader<DocumentLengthWordCount> reader : readers) {
1621 shreddedReaders.add((ShreddedReader)reader);
1622 }
1623
1624 return new ShreddedCombiner(shreddedReaders, closeOnExit);
1625 }
1626 public DocumentLengthWordCount clone(DocumentLengthWordCount object) {
1627 DocumentLengthWordCount result = new DocumentLengthWordCount();
1628 if (object == null) return result;
1629 result.document = object.document;
1630 result.length = object.length;
1631 result.word = object.word;
1632 result.count = object.count;
1633 return result;
1634 }
1635 public Class<DocumentLengthWordCount> getOrderedClass() {
1636 return DocumentLengthWordCount.class;
1637 }
1638 public String[] getOrderSpec() {
1639 return new String[] {"+word", "+document"};
1640 }
1641
1642 public static String getSpecString() {
1643 return "+word +document";
1644 }
1645
1646 public interface ShreddedProcessor extends Step {
1647 public void processWord(String word) throws IOException;
1648 public void processDocument(String document) throws IOException;
1649 public void processTuple(int length, int count) throws IOException;
1650 public void close() throws IOException;
1651 }
1652 public interface ShreddedSource extends Step {
1653 }
1654
1655 public static class ShreddedWriter implements ShreddedProcessor {
1656 ArrayOutput output;
1657 ShreddedBuffer buffer = new ShreddedBuffer();
1658 String lastWord;
1659 String lastDocument;
1660 boolean lastFlush = false;
1661
1662 public ShreddedWriter(ArrayOutput output) {
1663 this.output = output;
1664 }
1665
1666 public void close() throws IOException {
1667 flush();
1668 }
1669
1670 public void processWord(String word) {
1671 lastWord = word;
1672 buffer.processWord(word);
1673 }
1674 public void processDocument(String document) {
1675 lastDocument = document;
1676 buffer.processDocument(document);
1677 }
1678 public final void processTuple(int length, int count) throws IOException {
1679 if (lastFlush) {
1680 if(buffer.words.size() == 0) buffer.processWord(lastWord);
1681 if(buffer.documents.size() == 0) buffer.processDocument(lastDocument);
1682 lastFlush = false;
1683 }
1684 buffer.processTuple(length, count);
1685 if (buffer.isFull())
1686 flush();
1687 }
1688 public final void flushTuples(int pauseIndex) throws IOException {
1689
1690 while (buffer.getReadIndex() < pauseIndex) {
1691
1692 output.writeInt(buffer.getLength());
1693 output.writeInt(buffer.getCount());
1694 buffer.incrementTuple();
1695 }
1696 }
1697 public final void flushWord(int pauseIndex) throws IOException {
1698 while (buffer.getReadIndex() < pauseIndex) {
1699 int nextPause = buffer.getWordEndIndex();
1700 int count = nextPause - buffer.getReadIndex();
1701
1702 output.writeString(buffer.getWord());
1703 output.writeInt(count);
1704 buffer.incrementWord();
1705
1706 flushDocument(nextPause);
1707 assert nextPause == buffer.getReadIndex();
1708 }
1709 }
1710 public final void flushDocument(int pauseIndex) throws IOException {
1711 while (buffer.getReadIndex() < pauseIndex) {
1712 int nextPause = buffer.getDocumentEndIndex();
1713 int count = nextPause - buffer.getReadIndex();
1714
1715 output.writeString(buffer.getDocument());
1716 output.writeInt(count);
1717 buffer.incrementDocument();
1718
1719 flushTuples(nextPause);
1720 assert nextPause == buffer.getReadIndex();
1721 }
1722 }
1723 public void flush() throws IOException {
1724 flushWord(buffer.getWriteIndex());
1725 buffer.reset();
1726 lastFlush = true;
1727 }
1728 }
1729 public static class ShreddedBuffer {
1730 ArrayList<String> words = new ArrayList();
1731 ArrayList<String> documents = new ArrayList();
1732 ArrayList<Integer> wordTupleIdx = new ArrayList();
1733 ArrayList<Integer> documentTupleIdx = new ArrayList();
1734 int wordReadIdx = 0;
1735 int documentReadIdx = 0;
1736
1737 int[] lengths;
1738 int[] counts;
1739 int writeTupleIndex = 0;
1740 int readTupleIndex = 0;
1741 int batchSize;
1742
1743 public ShreddedBuffer(int batchSize) {
1744 this.batchSize = batchSize;
1745
1746 lengths = new int[batchSize];
1747 counts = new int[batchSize];
1748 }
1749
1750 public ShreddedBuffer() {
1751 this(10000);
1752 }
1753
1754 public void processWord(String word) {
1755 words.add(word);
1756 wordTupleIdx.add(writeTupleIndex);
1757 }
1758 public void processDocument(String document) {
1759 documents.add(document);
1760 documentTupleIdx.add(writeTupleIndex);
1761 }
1762 public void processTuple(int length, int count) {
1763 assert words.size() > 0;
1764 assert documents.size() > 0;
1765 lengths[writeTupleIndex] = length;
1766 counts[writeTupleIndex] = count;
1767 writeTupleIndex++;
1768 }
1769 public void resetData() {
1770 words.clear();
1771 documents.clear();
1772 wordTupleIdx.clear();
1773 documentTupleIdx.clear();
1774 writeTupleIndex = 0;
1775 }
1776
1777 public void resetRead() {
1778 readTupleIndex = 0;
1779 wordReadIdx = 0;
1780 documentReadIdx = 0;
1781 }
1782
1783 public void reset() {
1784 resetData();
1785 resetRead();
1786 }
1787 public boolean isFull() {
1788 return writeTupleIndex >= batchSize;
1789 }
1790
1791 public boolean isEmpty() {
1792 return writeTupleIndex == 0;
1793 }
1794
1795 public boolean isAtEnd() {
1796 return readTupleIndex >= writeTupleIndex;
1797 }
1798 public void incrementWord() {
1799 wordReadIdx++;
1800 }
1801
1802 public void autoIncrementWord() {
1803 while (readTupleIndex >= getWordEndIndex() && readTupleIndex < writeTupleIndex)
1804 wordReadIdx++;
1805 }
1806 public void incrementDocument() {
1807 documentReadIdx++;
1808 }
1809
1810 public void autoIncrementDocument() {
1811 while (readTupleIndex >= getDocumentEndIndex() && readTupleIndex < writeTupleIndex)
1812 documentReadIdx++;
1813 }
1814 public void incrementTuple() {
1815 readTupleIndex++;
1816 }
1817 public int getWordEndIndex() {
1818 if ((wordReadIdx+1) >= wordTupleIdx.size())
1819 return writeTupleIndex;
1820 return wordTupleIdx.get(wordReadIdx+1);
1821 }
1822
1823 public int getDocumentEndIndex() {
1824 if ((documentReadIdx+1) >= documentTupleIdx.size())
1825 return writeTupleIndex;
1826 return documentTupleIdx.get(documentReadIdx+1);
1827 }
1828 public int getReadIndex() {
1829 return readTupleIndex;
1830 }
1831
1832 public int getWriteIndex() {
1833 return writeTupleIndex;
1834 }
1835 public String getWord() {
1836 assert readTupleIndex < writeTupleIndex;
1837 assert wordReadIdx < words.size();
1838
1839 return words.get(wordReadIdx);
1840 }
1841 public String getDocument() {
1842 assert readTupleIndex < writeTupleIndex;
1843 assert documentReadIdx < documents.size();
1844
1845 return documents.get(documentReadIdx);
1846 }
1847 public int getLength() {
1848 assert readTupleIndex < writeTupleIndex;
1849 return lengths[readTupleIndex];
1850 }
1851 public int getCount() {
1852 assert readTupleIndex < writeTupleIndex;
1853 return counts[readTupleIndex];
1854 }
1855 public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
1856 while (getReadIndex() < endIndex) {
1857 output.processTuple(getLength(), getCount());
1858 incrementTuple();
1859 }
1860 }
1861 public void copyUntilIndexWord(int endIndex, ShreddedProcessor output) throws IOException {
1862 while (getReadIndex() < endIndex) {
1863 output.processWord(getWord());
1864 assert getWordEndIndex() <= endIndex;
1865 copyUntilIndexDocument(getWordEndIndex(), output);
1866 incrementWord();
1867 }
1868 }
1869 public void copyUntilIndexDocument(int endIndex, ShreddedProcessor output) throws IOException {
1870 while (getReadIndex() < endIndex) {
1871 output.processDocument(getDocument());
1872 assert getDocumentEndIndex() <= endIndex;
1873 copyTuples(getDocumentEndIndex(), output);
1874 incrementDocument();
1875 }
1876 }
1877 public void copyUntilWord(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
1878 while (!isAtEnd()) {
1879 if (other != null) {
1880 assert !other.isAtEnd();
1881 int c = + Utility.compare(getWord(), other.getWord());
1882
1883 if (c > 0) {
1884 break;
1885 }
1886
1887 output.processWord(getWord());
1888
1889 if (c < 0) {
1890 copyUntilIndexDocument(getWordEndIndex(), output);
1891 } else if (c == 0) {
1892 copyUntilDocument(other, output);
1893 autoIncrementWord();
1894 break;
1895 }
1896 } else {
1897 output.processWord(getWord());
1898 copyUntilIndexDocument(getWordEndIndex(), output);
1899 }
1900 incrementWord();
1901
1902
1903 }
1904 }
1905 public void copyUntilDocument(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
1906 while (!isAtEnd()) {
1907 if (other != null) {
1908 assert !other.isAtEnd();
1909 int c = + Utility.compare(getDocument(), other.getDocument());
1910
1911 if (c > 0) {
1912 break;
1913 }
1914
1915 output.processDocument(getDocument());
1916
1917 copyTuples(getDocumentEndIndex(), output);
1918 } else {
1919 output.processDocument(getDocument());
1920 copyTuples(getDocumentEndIndex(), output);
1921 }
1922 incrementDocument();
1923
1924 if (getWordEndIndex() <= readTupleIndex)
1925 break;
1926 }
1927 }
1928 public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
1929 copyUntilWord(other, output);
1930 }
1931
1932 }
1933 public static class ShreddedCombiner implements ReaderSource<DocumentLengthWordCount>, ShreddedSource {
1934 public ShreddedProcessor processor;
1935 Collection<ShreddedReader> readers;
1936 boolean closeOnExit = false;
1937 boolean uninitialized = true;
1938 PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
1939
1940 public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
1941 this.readers = readers;
1942 this.closeOnExit = closeOnExit;
1943 }
1944
1945 public void setProcessor(Step processor) throws IncompatibleProcessorException {
1946 if (processor instanceof ShreddedProcessor) {
1947 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
1948 } else if (processor instanceof DocumentLengthWordCount.Processor) {
1949 this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentLengthWordCount.Processor) processor));
1950 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
1951 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentLengthWordCount>) processor));
1952 } else {
1953 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
1954 }
1955 }
1956
1957 public Class<DocumentLengthWordCount> getOutputClass() {
1958 return DocumentLengthWordCount.class;
1959 }
1960
1961 public void initialize() throws IOException {
1962 for (ShreddedReader reader : readers) {
1963 reader.fill();
1964
1965 if (!reader.getBuffer().isAtEnd())
1966 queue.add(reader);
1967 }
1968
1969 uninitialized = false;
1970 }
1971
1972 public void run() throws IOException {
1973 initialize();
1974
1975 while (queue.size() > 0) {
1976 ShreddedReader top = queue.poll();
1977 ShreddedReader next = null;
1978 ShreddedBuffer nextBuffer = null;
1979
1980 assert !top.getBuffer().isAtEnd();
1981
1982 if (queue.size() > 0) {
1983 next = queue.peek();
1984 nextBuffer = next.getBuffer();
1985 assert !nextBuffer.isAtEnd();
1986 }
1987
1988 top.getBuffer().copyUntil(nextBuffer, processor);
1989 if (top.getBuffer().isAtEnd())
1990 top.fill();
1991
1992 if (!top.getBuffer().isAtEnd())
1993 queue.add(top);
1994 }
1995
1996 if (closeOnExit)
1997 processor.close();
1998 }
1999
2000 public DocumentLengthWordCount read() throws IOException {
2001 if (uninitialized)
2002 initialize();
2003
2004 DocumentLengthWordCount result = null;
2005
2006 while (queue.size() > 0) {
2007 ShreddedReader top = queue.poll();
2008 result = top.read();
2009
2010 if (result != null) {
2011 if (top.getBuffer().isAtEnd())
2012 top.fill();
2013
2014 queue.offer(top);
2015 break;
2016 }
2017 }
2018
2019 return result;
2020 }
2021 }
2022 public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<DocumentLengthWordCount>, ShreddedSource {
2023 public ShreddedProcessor processor;
2024 ShreddedBuffer buffer;
2025 DocumentLengthWordCount last = new DocumentLengthWordCount();
2026 long updateWordCount = -1;
2027 long updateDocumentCount = -1;
2028 long tupleCount = 0;
2029 long bufferStartCount = 0;
2030 ArrayInput input;
2031
2032 public ShreddedReader(ArrayInput input) {
2033 this.input = input;
2034 this.buffer = new ShreddedBuffer();
2035 }
2036
2037 public ShreddedReader(ArrayInput input, int bufferSize) {
2038 this.input = input;
2039 this.buffer = new ShreddedBuffer(bufferSize);
2040 }
2041
2042 public final int compareTo(ShreddedReader other) {
2043 ShreddedBuffer otherBuffer = other.getBuffer();
2044
2045 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
2046 return 0;
2047 } else if (buffer.isAtEnd()) {
2048 return -1;
2049 } else if (otherBuffer.isAtEnd()) {
2050 return 1;
2051 }
2052
2053 int result = 0;
2054 do {
2055 result = + Utility.compare(buffer.getWord(), otherBuffer.getWord());
2056 if(result != 0) break;
2057 result = + Utility.compare(buffer.getDocument(), otherBuffer.getDocument());
2058 if(result != 0) break;
2059 } while (false);
2060
2061 return result;
2062 }
2063
2064 public final ShreddedBuffer getBuffer() {
2065 return buffer;
2066 }
2067
2068 public final DocumentLengthWordCount read() throws IOException {
2069 if (buffer.isAtEnd()) {
2070 fill();
2071
2072 if (buffer.isAtEnd()) {
2073 return null;
2074 }
2075 }
2076
2077 assert !buffer.isAtEnd();
2078 DocumentLengthWordCount result = new DocumentLengthWordCount();
2079
2080 result.word = buffer.getWord();
2081 result.document = buffer.getDocument();
2082 result.length = buffer.getLength();
2083 result.count = buffer.getCount();
2084
2085 buffer.incrementTuple();
2086 buffer.autoIncrementWord();
2087 buffer.autoIncrementDocument();
2088
2089 return result;
2090 }
2091
2092 public final void fill() throws IOException {
2093 try {
2094 buffer.reset();
2095
2096 if (tupleCount != 0) {
2097
2098 if(updateWordCount - tupleCount > 0) {
2099 buffer.words.add(last.word);
2100 buffer.wordTupleIdx.add((int) (updateWordCount - tupleCount));
2101 }
2102 if(updateDocumentCount - tupleCount > 0) {
2103 buffer.documents.add(last.document);
2104 buffer.documentTupleIdx.add((int) (updateDocumentCount - tupleCount));
2105 }
2106 bufferStartCount = tupleCount;
2107 }
2108
2109 while (!buffer.isFull()) {
2110 updateDocument();
2111 buffer.processTuple(input.readInt(), input.readInt());
2112 tupleCount++;
2113 }
2114 } catch(EOFException e) {}
2115 }
2116
2117 public final void updateWord() throws IOException {
2118 if (updateWordCount > tupleCount)
2119 return;
2120
2121 last.word = input.readString();
2122 updateWordCount = tupleCount + input.readInt();
2123
2124 buffer.processWord(last.word);
2125 }
2126 public final void updateDocument() throws IOException {
2127 if (updateDocumentCount > tupleCount)
2128 return;
2129
2130 updateWord();
2131 last.document = input.readString();
2132 updateDocumentCount = tupleCount + input.readInt();
2133
2134 buffer.processDocument(last.document);
2135 }
2136
2137 public void run() throws IOException {
2138 while (true) {
2139 fill();
2140
2141 if (buffer.isAtEnd())
2142 break;
2143
2144 buffer.copyUntil(null, processor);
2145 }
2146 processor.close();
2147 }
2148
2149 public void setProcessor(Step processor) throws IncompatibleProcessorException {
2150 if (processor instanceof ShreddedProcessor) {
2151 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
2152 } else if (processor instanceof DocumentLengthWordCount.Processor) {
2153 this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentLengthWordCount.Processor) processor));
2154 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
2155 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentLengthWordCount>) processor));
2156 } else {
2157 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
2158 }
2159 }
2160
2161 public Class<DocumentLengthWordCount> getOutputClass() {
2162 return DocumentLengthWordCount.class;
2163 }
2164 }
2165
2166 public static class DuplicateEliminator implements ShreddedProcessor {
2167 public ShreddedProcessor processor;
2168 DocumentLengthWordCount last = new DocumentLengthWordCount();
2169 boolean wordProcess = true;
2170 boolean documentProcess = true;
2171
2172 public DuplicateEliminator() {}
2173 public DuplicateEliminator(ShreddedProcessor processor) {
2174 this.processor = processor;
2175 }
2176
2177 public void setShreddedProcessor(ShreddedProcessor processor) {
2178 this.processor = processor;
2179 }
2180
2181 public void processWord(String word) throws IOException {
2182 if (wordProcess || Utility.compare(word, last.word) != 0) {
2183 last.word = word;
2184 processor.processWord(word);
2185 resetDocument();
2186 wordProcess = false;
2187 }
2188 }
2189 public void processDocument(String document) throws IOException {
2190 if (documentProcess || Utility.compare(document, last.document) != 0) {
2191 last.document = document;
2192 processor.processDocument(document);
2193 documentProcess = false;
2194 }
2195 }
2196
2197 public void resetWord() {
2198 wordProcess = true;
2199 resetDocument();
2200 }
2201 public void resetDocument() {
2202 documentProcess = true;
2203 }
2204
2205 public void processTuple(int length, int count) throws IOException {
2206 processor.processTuple(length, count);
2207 }
2208
2209 public void close() throws IOException {
2210 processor.close();
2211 }
2212 }
2213 public static class TupleUnshredder implements ShreddedProcessor {
2214 DocumentLengthWordCount last = new DocumentLengthWordCount();
2215 public org.galagosearch.tupleflow.Processor<DocumentLengthWordCount> processor;
2216
2217 public TupleUnshredder(DocumentLengthWordCount.Processor processor) {
2218 this.processor = processor;
2219 }
2220
2221 public TupleUnshredder(org.galagosearch.tupleflow.Processor<DocumentLengthWordCount> processor) {
2222 this.processor = processor;
2223 }
2224
2225 public DocumentLengthWordCount clone(DocumentLengthWordCount object) {
2226 DocumentLengthWordCount result = new DocumentLengthWordCount();
2227 if (object == null) return result;
2228 result.document = object.document;
2229 result.length = object.length;
2230 result.word = object.word;
2231 result.count = object.count;
2232 return result;
2233 }
2234
2235 public void processWord(String word) throws IOException {
2236 last.word = word;
2237 }
2238
2239 public void processDocument(String document) throws IOException {
2240 last.document = document;
2241 }
2242
2243
2244 public void processTuple(int length, int count) throws IOException {
2245 last.length = length;
2246 last.count = count;
2247 processor.process(clone(last));
2248 }
2249
2250 public void close() throws IOException {
2251 processor.close();
2252 }
2253 }
2254 public static class TupleShredder implements Processor {
2255 DocumentLengthWordCount last = new DocumentLengthWordCount();
2256 public ShreddedProcessor processor;
2257
2258 public TupleShredder(ShreddedProcessor processor) {
2259 this.processor = processor;
2260 }
2261
2262 public DocumentLengthWordCount clone(DocumentLengthWordCount object) {
2263 DocumentLengthWordCount result = new DocumentLengthWordCount();
2264 if (object == null) return result;
2265 result.document = object.document;
2266 result.length = object.length;
2267 result.word = object.word;
2268 result.count = object.count;
2269 return result;
2270 }
2271
2272 public void process(DocumentLengthWordCount object) throws IOException {
2273 boolean processAll = false;
2274 if(last == null || Utility.compare(last.word, object.word) != 0 || processAll) { processor.processWord(object.word); processAll = true; }
2275 if(last == null || Utility.compare(last.document, object.document) != 0 || processAll) { processor.processDocument(object.document); processAll = true; }
2276 processor.processTuple(object.length, object.count);
2277 }
2278
2279 public Class<DocumentLengthWordCount> getInputClass() {
2280 return DocumentLengthWordCount.class;
2281 }
2282
2283 public void close() throws IOException {
2284 processor.close();
2285 }
2286 }
2287 }
2288 public static class DocumentOrder implements Order<DocumentLengthWordCount> {
2289 public int hash(DocumentLengthWordCount object) {
2290 int h = 0;
2291 h += Utility.hash(object.document);
2292 return h;
2293 }
2294 public Comparator<DocumentLengthWordCount> greaterThan() {
2295 return new Comparator<DocumentLengthWordCount>() {
2296 public int compare(DocumentLengthWordCount one, DocumentLengthWordCount two) {
2297 int result = 0;
2298 do {
2299 result = + Utility.compare(one.document, two.document);
2300 if(result != 0) break;
2301 } while (false);
2302 return -result;
2303 }
2304 };
2305 }
2306 public Comparator<DocumentLengthWordCount> lessThan() {
2307 return new Comparator<DocumentLengthWordCount>() {
2308 public int compare(DocumentLengthWordCount one, DocumentLengthWordCount two) {
2309 int result = 0;
2310 do {
2311 result = + Utility.compare(one.document, two.document);
2312 if(result != 0) break;
2313 } while (false);
2314 return result;
2315 }
2316 };
2317 }
2318 public TypeReader<DocumentLengthWordCount> orderedReader(ArrayInput _input) {
2319 return new ShreddedReader(_input);
2320 }
2321
2322 public TypeReader<DocumentLengthWordCount> orderedReader(ArrayInput _input, int bufferSize) {
2323 return new ShreddedReader(_input, bufferSize);
2324 }
2325 public OrderedWriter<DocumentLengthWordCount> orderedWriter(ArrayOutput _output) {
2326 ShreddedWriter w = new ShreddedWriter(_output);
2327 return new OrderedWriterClass(w);
2328 }
2329 public static class OrderedWriterClass extends OrderedWriter< DocumentLengthWordCount > {
2330 DocumentLengthWordCount last = null;
2331 ShreddedWriter shreddedWriter = null;
2332
2333 public OrderedWriterClass(ShreddedWriter s) {
2334 this.shreddedWriter = s;
2335 }
2336
2337 public void process(DocumentLengthWordCount object) throws IOException {
2338 boolean processAll = false;
2339 if (processAll || last == null || 0 != Utility.compare(object.document, last.document)) { processAll = true; shreddedWriter.processDocument(object.document); }
2340 shreddedWriter.processTuple(object.length, object.word, object.count);
2341 last = object;
2342 }
2343
2344 public void close() throws IOException {
2345 shreddedWriter.close();
2346 }
2347
2348 public Class<DocumentLengthWordCount> getInputClass() {
2349 return DocumentLengthWordCount.class;
2350 }
2351 }
2352 public ReaderSource<DocumentLengthWordCount> orderedCombiner(Collection<TypeReader<DocumentLengthWordCount>> readers, boolean closeOnExit) {
2353 ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
2354
2355 for (TypeReader<DocumentLengthWordCount> reader : readers) {
2356 shreddedReaders.add((ShreddedReader)reader);
2357 }
2358
2359 return new ShreddedCombiner(shreddedReaders, closeOnExit);
2360 }
2361 public DocumentLengthWordCount clone(DocumentLengthWordCount object) {
2362 DocumentLengthWordCount result = new DocumentLengthWordCount();
2363 if (object == null) return result;
2364 result.document = object.document;
2365 result.length = object.length;
2366 result.word = object.word;
2367 result.count = object.count;
2368 return result;
2369 }
2370 public Class<DocumentLengthWordCount> getOrderedClass() {
2371 return DocumentLengthWordCount.class;
2372 }
2373 public String[] getOrderSpec() {
2374 return new String[] {"+document"};
2375 }
2376
2377 public static String getSpecString() {
2378 return "+document";
2379 }
2380
2381 public interface ShreddedProcessor extends Step {
2382 public void processDocument(String document) throws IOException;
2383 public void processTuple(int length, String word, int count) throws IOException;
2384 public void close() throws IOException;
2385 }
2386 public interface ShreddedSource extends Step {
2387 }
2388
2389 public static class ShreddedWriter implements ShreddedProcessor {
2390 ArrayOutput output;
2391 ShreddedBuffer buffer = new ShreddedBuffer();
2392 String lastDocument;
2393 boolean lastFlush = false;
2394
2395 public ShreddedWriter(ArrayOutput output) {
2396 this.output = output;
2397 }
2398
2399 public void close() throws IOException {
2400 flush();
2401 }
2402
2403 public void processDocument(String document) {
2404 lastDocument = document;
2405 buffer.processDocument(document);
2406 }
2407 public final void processTuple(int length, String word, int count) throws IOException {
2408 if (lastFlush) {
2409 if(buffer.documents.size() == 0) buffer.processDocument(lastDocument);
2410 lastFlush = false;
2411 }
2412 buffer.processTuple(length, word, count);
2413 if (buffer.isFull())
2414 flush();
2415 }
2416 public final void flushTuples(int pauseIndex) throws IOException {
2417
2418 while (buffer.getReadIndex() < pauseIndex) {
2419
2420 output.writeInt(buffer.getLength());
2421 output.writeString(buffer.getWord());
2422 output.writeInt(buffer.getCount());
2423 buffer.incrementTuple();
2424 }
2425 }
2426 public final void flushDocument(int pauseIndex) throws IOException {
2427 while (buffer.getReadIndex() < pauseIndex) {
2428 int nextPause = buffer.getDocumentEndIndex();
2429 int count = nextPause - buffer.getReadIndex();
2430
2431 output.writeString(buffer.getDocument());
2432 output.writeInt(count);
2433 buffer.incrementDocument();
2434
2435 flushTuples(nextPause);
2436 assert nextPause == buffer.getReadIndex();
2437 }
2438 }
2439 public void flush() throws IOException {
2440 flushDocument(buffer.getWriteIndex());
2441 buffer.reset();
2442 lastFlush = true;
2443 }
2444 }
2445 public static class ShreddedBuffer {
2446 ArrayList<String> documents = new ArrayList();
2447 ArrayList<Integer> documentTupleIdx = new ArrayList();
2448 int documentReadIdx = 0;
2449
2450 int[] lengths;
2451 String[] words;
2452 int[] counts;
2453 int writeTupleIndex = 0;
2454 int readTupleIndex = 0;
2455 int batchSize;
2456
2457 public ShreddedBuffer(int batchSize) {
2458 this.batchSize = batchSize;
2459
2460 lengths = new int[batchSize];
2461 words = new String[batchSize];
2462 counts = new int[batchSize];
2463 }
2464
2465 public ShreddedBuffer() {
2466 this(10000);
2467 }
2468
2469 public void processDocument(String document) {
2470 documents.add(document);
2471 documentTupleIdx.add(writeTupleIndex);
2472 }
2473 public void processTuple(int length, String word, int count) {
2474 assert documents.size() > 0;
2475 lengths[writeTupleIndex] = length;
2476 words[writeTupleIndex] = word;
2477 counts[writeTupleIndex] = count;
2478 writeTupleIndex++;
2479 }
2480 public void resetData() {
2481 documents.clear();
2482 documentTupleIdx.clear();
2483 writeTupleIndex = 0;
2484 }
2485
2486 public void resetRead() {
2487 readTupleIndex = 0;
2488 documentReadIdx = 0;
2489 }
2490
2491 public void reset() {
2492 resetData();
2493 resetRead();
2494 }
2495 public boolean isFull() {
2496 return writeTupleIndex >= batchSize;
2497 }
2498
2499 public boolean isEmpty() {
2500 return writeTupleIndex == 0;
2501 }
2502
2503 public boolean isAtEnd() {
2504 return readTupleIndex >= writeTupleIndex;
2505 }
2506 public void incrementDocument() {
2507 documentReadIdx++;
2508 }
2509
2510 public void autoIncrementDocument() {
2511 while (readTupleIndex >= getDocumentEndIndex() && readTupleIndex < writeTupleIndex)
2512 documentReadIdx++;
2513 }
2514 public void incrementTuple() {
2515 readTupleIndex++;
2516 }
2517 public int getDocumentEndIndex() {
2518 if ((documentReadIdx+1) >= documentTupleIdx.size())
2519 return writeTupleIndex;
2520 return documentTupleIdx.get(documentReadIdx+1);
2521 }
2522 public int getReadIndex() {
2523 return readTupleIndex;
2524 }
2525
2526 public int getWriteIndex() {
2527 return writeTupleIndex;
2528 }
2529 public String getDocument() {
2530 assert readTupleIndex < writeTupleIndex;
2531 assert documentReadIdx < documents.size();
2532
2533 return documents.get(documentReadIdx);
2534 }
2535 public int getLength() {
2536 assert readTupleIndex < writeTupleIndex;
2537 return lengths[readTupleIndex];
2538 }
2539 public String getWord() {
2540 assert readTupleIndex < writeTupleIndex;
2541 return words[readTupleIndex];
2542 }
2543 public int getCount() {
2544 assert readTupleIndex < writeTupleIndex;
2545 return counts[readTupleIndex];
2546 }
2547 public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
2548 while (getReadIndex() < endIndex) {
2549 output.processTuple(getLength(), getWord(), getCount());
2550 incrementTuple();
2551 }
2552 }
2553 public void copyUntilIndexDocument(int endIndex, ShreddedProcessor output) throws IOException {
2554 while (getReadIndex() < endIndex) {
2555 output.processDocument(getDocument());
2556 assert getDocumentEndIndex() <= endIndex;
2557 copyTuples(getDocumentEndIndex(), output);
2558 incrementDocument();
2559 }
2560 }
2561 public void copyUntilDocument(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
2562 while (!isAtEnd()) {
2563 if (other != null) {
2564 assert !other.isAtEnd();
2565 int c = + Utility.compare(getDocument(), other.getDocument());
2566
2567 if (c > 0) {
2568 break;
2569 }
2570
2571 output.processDocument(getDocument());
2572
2573 copyTuples(getDocumentEndIndex(), output);
2574 } else {
2575 output.processDocument(getDocument());
2576 copyTuples(getDocumentEndIndex(), output);
2577 }
2578 incrementDocument();
2579
2580
2581 }
2582 }
2583 public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
2584 copyUntilDocument(other, output);
2585 }
2586
2587 }
2588 public static class ShreddedCombiner implements ReaderSource<DocumentLengthWordCount>, ShreddedSource {
2589 public ShreddedProcessor processor;
2590 Collection<ShreddedReader> readers;
2591 boolean closeOnExit = false;
2592 boolean uninitialized = true;
2593 PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
2594
2595 public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
2596 this.readers = readers;
2597 this.closeOnExit = closeOnExit;
2598 }
2599
2600 public void setProcessor(Step processor) throws IncompatibleProcessorException {
2601 if (processor instanceof ShreddedProcessor) {
2602 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
2603 } else if (processor instanceof DocumentLengthWordCount.Processor) {
2604 this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentLengthWordCount.Processor) processor));
2605 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
2606 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentLengthWordCount>) processor));
2607 } else {
2608 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
2609 }
2610 }
2611
2612 public Class<DocumentLengthWordCount> getOutputClass() {
2613 return DocumentLengthWordCount.class;
2614 }
2615
2616 public void initialize() throws IOException {
2617 for (ShreddedReader reader : readers) {
2618 reader.fill();
2619
2620 if (!reader.getBuffer().isAtEnd())
2621 queue.add(reader);
2622 }
2623
2624 uninitialized = false;
2625 }
2626
2627 public void run() throws IOException {
2628 initialize();
2629
2630 while (queue.size() > 0) {
2631 ShreddedReader top = queue.poll();
2632 ShreddedReader next = null;
2633 ShreddedBuffer nextBuffer = null;
2634
2635 assert !top.getBuffer().isAtEnd();
2636
2637 if (queue.size() > 0) {
2638 next = queue.peek();
2639 nextBuffer = next.getBuffer();
2640 assert !nextBuffer.isAtEnd();
2641 }
2642
2643 top.getBuffer().copyUntil(nextBuffer, processor);
2644 if (top.getBuffer().isAtEnd())
2645 top.fill();
2646
2647 if (!top.getBuffer().isAtEnd())
2648 queue.add(top);
2649 }
2650
2651 if (closeOnExit)
2652 processor.close();
2653 }
2654
2655 public DocumentLengthWordCount read() throws IOException {
2656 if (uninitialized)
2657 initialize();
2658
2659 DocumentLengthWordCount result = null;
2660
2661 while (queue.size() > 0) {
2662 ShreddedReader top = queue.poll();
2663 result = top.read();
2664
2665 if (result != null) {
2666 if (top.getBuffer().isAtEnd())
2667 top.fill();
2668
2669 queue.offer(top);
2670 break;
2671 }
2672 }
2673
2674 return result;
2675 }
2676 }
2677 public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<DocumentLengthWordCount>, ShreddedSource {
2678 public ShreddedProcessor processor;
2679 ShreddedBuffer buffer;
2680 DocumentLengthWordCount last = new DocumentLengthWordCount();
2681 long updateDocumentCount = -1;
2682 long tupleCount = 0;
2683 long bufferStartCount = 0;
2684 ArrayInput input;
2685
2686 public ShreddedReader(ArrayInput input) {
2687 this.input = input;
2688 this.buffer = new ShreddedBuffer();
2689 }
2690
2691 public ShreddedReader(ArrayInput input, int bufferSize) {
2692 this.input = input;
2693 this.buffer = new ShreddedBuffer(bufferSize);
2694 }
2695
2696 public final int compareTo(ShreddedReader other) {
2697 ShreddedBuffer otherBuffer = other.getBuffer();
2698
2699 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
2700 return 0;
2701 } else if (buffer.isAtEnd()) {
2702 return -1;
2703 } else if (otherBuffer.isAtEnd()) {
2704 return 1;
2705 }
2706
2707 int result = 0;
2708 do {
2709 result = + Utility.compare(buffer.getDocument(), otherBuffer.getDocument());
2710 if(result != 0) break;
2711 } while (false);
2712
2713 return result;
2714 }
2715
2716 public final ShreddedBuffer getBuffer() {
2717 return buffer;
2718 }
2719
2720 public final DocumentLengthWordCount read() throws IOException {
2721 if (buffer.isAtEnd()) {
2722 fill();
2723
2724 if (buffer.isAtEnd()) {
2725 return null;
2726 }
2727 }
2728
2729 assert !buffer.isAtEnd();
2730 DocumentLengthWordCount result = new DocumentLengthWordCount();
2731
2732 result.document = buffer.getDocument();
2733 result.length = buffer.getLength();
2734 result.word = buffer.getWord();
2735 result.count = buffer.getCount();
2736
2737 buffer.incrementTuple();
2738 buffer.autoIncrementDocument();
2739
2740 return result;
2741 }
2742
2743 public final void fill() throws IOException {
2744 try {
2745 buffer.reset();
2746
2747 if (tupleCount != 0) {
2748
2749 if(updateDocumentCount - tupleCount > 0) {
2750 buffer.documents.add(last.document);
2751 buffer.documentTupleIdx.add((int) (updateDocumentCount - tupleCount));
2752 }
2753 bufferStartCount = tupleCount;
2754 }
2755
2756 while (!buffer.isFull()) {
2757 updateDocument();
2758 buffer.processTuple(input.readInt(), input.readString(), input.readInt());
2759 tupleCount++;
2760 }
2761 } catch(EOFException e) {}
2762 }
2763
2764 public final void updateDocument() throws IOException {
2765 if (updateDocumentCount > tupleCount)
2766 return;
2767
2768 last.document = input.readString();
2769 updateDocumentCount = tupleCount + input.readInt();
2770
2771 buffer.processDocument(last.document);
2772 }
2773
2774 public void run() throws IOException {
2775 while (true) {
2776 fill();
2777
2778 if (buffer.isAtEnd())
2779 break;
2780
2781 buffer.copyUntil(null, processor);
2782 }
2783 processor.close();
2784 }
2785
2786 public void setProcessor(Step processor) throws IncompatibleProcessorException {
2787 if (processor instanceof ShreddedProcessor) {
2788 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
2789 } else if (processor instanceof DocumentLengthWordCount.Processor) {
2790 this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentLengthWordCount.Processor) processor));
2791 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
2792 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentLengthWordCount>) processor));
2793 } else {
2794 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
2795 }
2796 }
2797
2798 public Class<DocumentLengthWordCount> getOutputClass() {
2799 return DocumentLengthWordCount.class;
2800 }
2801 }
2802
2803 public static class DuplicateEliminator implements ShreddedProcessor {
2804 public ShreddedProcessor processor;
2805 DocumentLengthWordCount last = new DocumentLengthWordCount();
2806 boolean documentProcess = true;
2807
2808 public DuplicateEliminator() {}
2809 public DuplicateEliminator(ShreddedProcessor processor) {
2810 this.processor = processor;
2811 }
2812
2813 public void setShreddedProcessor(ShreddedProcessor processor) {
2814 this.processor = processor;
2815 }
2816
2817 public void processDocument(String document) throws IOException {
2818 if (documentProcess || Utility.compare(document, last.document) != 0) {
2819 last.document = document;
2820 processor.processDocument(document);
2821 documentProcess = false;
2822 }
2823 }
2824
2825 public void resetDocument() {
2826 documentProcess = true;
2827 }
2828
2829 public void processTuple(int length, String word, int count) throws IOException {
2830 processor.processTuple(length, word, count);
2831 }
2832
2833 public void close() throws IOException {
2834 processor.close();
2835 }
2836 }
2837 public static class TupleUnshredder implements ShreddedProcessor {
2838 DocumentLengthWordCount last = new DocumentLengthWordCount();
2839 public org.galagosearch.tupleflow.Processor<DocumentLengthWordCount> processor;
2840
2841 public TupleUnshredder(DocumentLengthWordCount.Processor processor) {
2842 this.processor = processor;
2843 }
2844
2845 public TupleUnshredder(org.galagosearch.tupleflow.Processor<DocumentLengthWordCount> processor) {
2846 this.processor = processor;
2847 }
2848
2849 public DocumentLengthWordCount clone(DocumentLengthWordCount object) {
2850 DocumentLengthWordCount result = new DocumentLengthWordCount();
2851 if (object == null) return result;
2852 result.document = object.document;
2853 result.length = object.length;
2854 result.word = object.word;
2855 result.count = object.count;
2856 return result;
2857 }
2858
2859 public void processDocument(String document) throws IOException {
2860 last.document = document;
2861 }
2862
2863
2864 public void processTuple(int length, String word, int count) throws IOException {
2865 last.length = length;
2866 last.word = word;
2867 last.count = count;
2868 processor.process(clone(last));
2869 }
2870
2871 public void close() throws IOException {
2872 processor.close();
2873 }
2874 }
2875 public static class TupleShredder implements Processor {
2876 DocumentLengthWordCount last = new DocumentLengthWordCount();
2877 public ShreddedProcessor processor;
2878
2879 public TupleShredder(ShreddedProcessor processor) {
2880 this.processor = processor;
2881 }
2882
2883 public DocumentLengthWordCount clone(DocumentLengthWordCount object) {
2884 DocumentLengthWordCount result = new DocumentLengthWordCount();
2885 if (object == null) return result;
2886 result.document = object.document;
2887 result.length = object.length;
2888 result.word = object.word;
2889 result.count = object.count;
2890 return result;
2891 }
2892
2893 public void process(DocumentLengthWordCount object) throws IOException {
2894 boolean processAll = false;
2895 if(last == null || Utility.compare(last.document, object.document) != 0 || processAll) { processor.processDocument(object.document); processAll = true; }
2896 processor.processTuple(object.length, object.word, object.count);
2897 }
2898
2899 public Class<DocumentLengthWordCount> getInputClass() {
2900 return DocumentLengthWordCount.class;
2901 }
2902
2903 public void close() throws IOException {
2904 processor.close();
2905 }
2906 }
2907 }
2908 }