1
2
3 package org.galagosearch.core.types;
4
5 import org.galagosearch.tupleflow.Utility;
6 import org.galagosearch.tupleflow.ArrayInput;
7 import org.galagosearch.tupleflow.ArrayOutput;
8 import org.galagosearch.tupleflow.Order;
9 import org.galagosearch.tupleflow.OrderedWriter;
10 import org.galagosearch.tupleflow.Type;
11 import org.galagosearch.tupleflow.TypeReader;
12 import org.galagosearch.tupleflow.Step;
13 import org.galagosearch.tupleflow.IncompatibleProcessorException;
14 import org.galagosearch.tupleflow.ReaderSource;
15 import java.io.IOException;
16 import java.io.EOFException;
17 import java.io.UnsupportedEncodingException;
18 import java.util.ArrayList;
19 import java.util.Arrays;
20 import java.util.Comparator;
21 import java.util.PriorityQueue;
22 import java.util.Collection;
23
24 public class DocumentNumberWordInteger implements Type<DocumentNumberWordInteger> {
/** Word bytes; {@link #toString()} decodes them as UTF-8. */
public byte[] word;
/** Document number component of the tuple. */
public int document;
/** Integer value component of the tuple. */
public int value;

/** Creates an instance whose fields hold their Java defaults ({@code null}, 0, 0). */
public DocumentNumberWordInteger() {}

/**
 * Creates an instance holding the given components.
 *
 * @param word     word bytes, stored by reference (not copied)
 * @param document document number
 * @param value    integer value
 */
public DocumentNumberWordInteger(byte[] word, int document, int value) {
    this.value = value;
    this.document = document;
    this.word = word;
}
35
36 public String toString() {
37 try {
38 return String.format("%s,%d,%d",
39 new String(word, "UTF-8"), document, value);
40 } catch(UnsupportedEncodingException e) {
41 throw new RuntimeException("Couldn't convert string to UTF-8.");
42 }
43 }
44
45 public Order<DocumentNumberWordInteger> getOrder(String... spec) {
46 if (Arrays.equals(spec, new String[] { })) {
47 return new Unordered();
48 }
49 if (Arrays.equals(spec, new String[] { "+word", "+document" })) {
50 return new WordDocumentOrder();
51 }
52 if (Arrays.equals(spec, new String[] { "+document" })) {
53 return new DocumentOrder();
54 }
55 if (Arrays.equals(spec, new String[] { "+value" })) {
56 return new ValueOrder();
57 }
58 return null;
59 }
60
61 public interface Processor extends Step, org.galagosearch.tupleflow.Processor<DocumentNumberWordInteger> {
62 public void process(DocumentNumberWordInteger object) throws IOException;
63 public void close() throws IOException;
64 }
65 public interface Source extends Step {
66 }
67 public static class Unordered implements Order<DocumentNumberWordInteger> {
68 public int hash(DocumentNumberWordInteger object) {
69 int h = 0;
70 return h;
71 }
72 public Comparator<DocumentNumberWordInteger> greaterThan() {
73 return new Comparator<DocumentNumberWordInteger>() {
74 public int compare(DocumentNumberWordInteger one, DocumentNumberWordInteger two) {
75 int result = 0;
76 do {
77 } while (false);
78 return -result;
79 }
80 };
81 }
82 public Comparator<DocumentNumberWordInteger> lessThan() {
83 return new Comparator<DocumentNumberWordInteger>() {
84 public int compare(DocumentNumberWordInteger one, DocumentNumberWordInteger two) {
85 int result = 0;
86 do {
87 } while (false);
88 return result;
89 }
90 };
91 }
92 public TypeReader<DocumentNumberWordInteger> orderedReader(ArrayInput _input) {
93 return new ShreddedReader(_input);
94 }
95
96 public TypeReader<DocumentNumberWordInteger> orderedReader(ArrayInput _input, int bufferSize) {
97 return new ShreddedReader(_input, bufferSize);
98 }
99 public OrderedWriter<DocumentNumberWordInteger> orderedWriter(ArrayOutput _output) {
100 ShreddedWriter w = new ShreddedWriter(_output);
101 return new OrderedWriterClass(w);
102 }
103 public static class OrderedWriterClass extends OrderedWriter< DocumentNumberWordInteger > {
104 DocumentNumberWordInteger last = null;
105 ShreddedWriter shreddedWriter = null;
106
107 public OrderedWriterClass(ShreddedWriter s) {
108 this.shreddedWriter = s;
109 }
110
111 public void process(DocumentNumberWordInteger object) throws IOException {
112 boolean processAll = false;
113 shreddedWriter.processTuple(object.word, object.document, object.value);
114 last = object;
115 }
116
117 public void close() throws IOException {
118 shreddedWriter.close();
119 }
120
121 public Class<DocumentNumberWordInteger> getInputClass() {
122 return DocumentNumberWordInteger.class;
123 }
124 }
125 public ReaderSource<DocumentNumberWordInteger> orderedCombiner(Collection<TypeReader<DocumentNumberWordInteger>> readers, boolean closeOnExit) {
126 ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
127
128 for (TypeReader<DocumentNumberWordInteger> reader : readers) {
129 shreddedReaders.add((ShreddedReader)reader);
130 }
131
132 return new ShreddedCombiner(shreddedReaders, closeOnExit);
133 }
134 public DocumentNumberWordInteger clone(DocumentNumberWordInteger object) {
135 DocumentNumberWordInteger result = new DocumentNumberWordInteger();
136 if (object == null) return result;
137 result.word = object.word;
138 result.document = object.document;
139 result.value = object.value;
140 return result;
141 }
142 public Class<DocumentNumberWordInteger> getOrderedClass() {
143 return DocumentNumberWordInteger.class;
144 }
145 public String[] getOrderSpec() {
146 return new String[] {};
147 }
148
149 public static String getSpecString() {
150 return "";
151 }
152
153 public interface ShreddedProcessor extends Step {
154 public void processTuple(byte[] word, int document, int value) throws IOException;
155 public void close() throws IOException;
156 }
157 public interface ShreddedSource extends Step {
158 }
159
160 public static class ShreddedWriter implements ShreddedProcessor {
161 ArrayOutput output;
162 ShreddedBuffer buffer = new ShreddedBuffer();
163 boolean lastFlush = false;
164
165 public ShreddedWriter(ArrayOutput output) {
166 this.output = output;
167 }
168
169 public void close() throws IOException {
170 flush();
171 }
172
173 public final void processTuple(byte[] word, int document, int value) throws IOException {
174 if (lastFlush) {
175 lastFlush = false;
176 }
177 buffer.processTuple(word, document, value);
178 if (buffer.isFull())
179 flush();
180 }
181 public final void flushTuples(int pauseIndex) throws IOException {
182
183 while (buffer.getReadIndex() < pauseIndex) {
184
185 output.writeBytes(buffer.getWord());
186 output.writeInt(buffer.getDocument());
187 output.writeInt(buffer.getValue());
188 buffer.incrementTuple();
189 }
190 }
191 public void flush() throws IOException {
192 flushTuples(buffer.getWriteIndex());
193 buffer.reset();
194 lastFlush = true;
195 }
196 }
197 public static class ShreddedBuffer {
198
199 byte[][] words;
200 int[] documents;
201 int[] values;
202 int writeTupleIndex = 0;
203 int readTupleIndex = 0;
204 int batchSize;
205
206 public ShreddedBuffer(int batchSize) {
207 this.batchSize = batchSize;
208
209 words = new byte[batchSize][];
210 documents = new int[batchSize];
211 values = new int[batchSize];
212 }
213
214 public ShreddedBuffer() {
215 this(10000);
216 }
217
218 public void processTuple(byte[] word, int document, int value) {
219 words[writeTupleIndex] = word;
220 documents[writeTupleIndex] = document;
221 values[writeTupleIndex] = value;
222 writeTupleIndex++;
223 }
224 public void resetData() {
225 writeTupleIndex = 0;
226 }
227
228 public void resetRead() {
229 readTupleIndex = 0;
230 }
231
232 public void reset() {
233 resetData();
234 resetRead();
235 }
236 public boolean isFull() {
237 return writeTupleIndex >= batchSize;
238 }
239
240 public boolean isEmpty() {
241 return writeTupleIndex == 0;
242 }
243
244 public boolean isAtEnd() {
245 return readTupleIndex >= writeTupleIndex;
246 }
247 public void incrementTuple() {
248 readTupleIndex++;
249 }
250 public int getReadIndex() {
251 return readTupleIndex;
252 }
253
254 public int getWriteIndex() {
255 return writeTupleIndex;
256 }
257 public byte[] getWord() {
258 assert readTupleIndex < writeTupleIndex;
259 return words[readTupleIndex];
260 }
261 public int getDocument() {
262 assert readTupleIndex < writeTupleIndex;
263 return documents[readTupleIndex];
264 }
265 public int getValue() {
266 assert readTupleIndex < writeTupleIndex;
267 return values[readTupleIndex];
268 }
269 public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
270 while (getReadIndex() < endIndex) {
271 output.processTuple(getWord(), getDocument(), getValue());
272 incrementTuple();
273 }
274 }
275
276 public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
277 }
278
279 }
280 public static class ShreddedCombiner implements ReaderSource<DocumentNumberWordInteger>, ShreddedSource {
281 public ShreddedProcessor processor;
282 Collection<ShreddedReader> readers;
283 boolean closeOnExit = false;
284 boolean uninitialized = true;
285 PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
286
287 public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
288 this.readers = readers;
289 this.closeOnExit = closeOnExit;
290 }
291
292 public void setProcessor(Step processor) throws IncompatibleProcessorException {
293 if (processor instanceof ShreddedProcessor) {
294 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
295 } else if (processor instanceof DocumentNumberWordInteger.Processor) {
296 this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentNumberWordInteger.Processor) processor));
297 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
298 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentNumberWordInteger>) processor));
299 } else {
300 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
301 }
302 }
303
304 public Class<DocumentNumberWordInteger> getOutputClass() {
305 return DocumentNumberWordInteger.class;
306 }
307
308 public void initialize() throws IOException {
309 for (ShreddedReader reader : readers) {
310 reader.fill();
311
312 if (!reader.getBuffer().isAtEnd())
313 queue.add(reader);
314 }
315
316 uninitialized = false;
317 }
318
319 public void run() throws IOException {
320 initialize();
321
322 while (queue.size() > 0) {
323 ShreddedReader top = queue.poll();
324 ShreddedReader next = null;
325 ShreddedBuffer nextBuffer = null;
326
327 assert !top.getBuffer().isAtEnd();
328
329 if (queue.size() > 0) {
330 next = queue.peek();
331 nextBuffer = next.getBuffer();
332 assert !nextBuffer.isAtEnd();
333 }
334
335 top.getBuffer().copyUntil(nextBuffer, processor);
336 if (top.getBuffer().isAtEnd())
337 top.fill();
338
339 if (!top.getBuffer().isAtEnd())
340 queue.add(top);
341 }
342
343 if (closeOnExit)
344 processor.close();
345 }
346
347 public DocumentNumberWordInteger read() throws IOException {
348 if (uninitialized)
349 initialize();
350
351 DocumentNumberWordInteger result = null;
352
353 while (queue.size() > 0) {
354 ShreddedReader top = queue.poll();
355 result = top.read();
356
357 if (result != null) {
358 if (top.getBuffer().isAtEnd())
359 top.fill();
360
361 queue.offer(top);
362 break;
363 }
364 }
365
366 return result;
367 }
368 }
369 public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<DocumentNumberWordInteger>, ShreddedSource {
370 public ShreddedProcessor processor;
371 ShreddedBuffer buffer;
372 DocumentNumberWordInteger last = new DocumentNumberWordInteger();
373 long tupleCount = 0;
374 long bufferStartCount = 0;
375 ArrayInput input;
376
377 public ShreddedReader(ArrayInput input) {
378 this.input = input;
379 this.buffer = new ShreddedBuffer();
380 }
381
382 public ShreddedReader(ArrayInput input, int bufferSize) {
383 this.input = input;
384 this.buffer = new ShreddedBuffer(bufferSize);
385 }
386
387 public final int compareTo(ShreddedReader other) {
388 ShreddedBuffer otherBuffer = other.getBuffer();
389
390 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
391 return 0;
392 } else if (buffer.isAtEnd()) {
393 return -1;
394 } else if (otherBuffer.isAtEnd()) {
395 return 1;
396 }
397
398 int result = 0;
399 do {
400 } while (false);
401
402 return result;
403 }
404
405 public final ShreddedBuffer getBuffer() {
406 return buffer;
407 }
408
409 public final DocumentNumberWordInteger read() throws IOException {
410 if (buffer.isAtEnd()) {
411 fill();
412
413 if (buffer.isAtEnd()) {
414 return null;
415 }
416 }
417
418 assert !buffer.isAtEnd();
419 DocumentNumberWordInteger result = new DocumentNumberWordInteger();
420
421 result.word = buffer.getWord();
422 result.document = buffer.getDocument();
423 result.value = buffer.getValue();
424
425 buffer.incrementTuple();
426
427 return result;
428 }
429
430 public final void fill() throws IOException {
431 try {
432 buffer.reset();
433
434 if (tupleCount != 0) {
435 bufferStartCount = tupleCount;
436 }
437
438 while (!buffer.isFull()) {
439 buffer.processTuple(input.readBytes(), input.readInt(), input.readInt());
440 tupleCount++;
441 }
442 } catch(EOFException e) {}
443 }
444
445
446 public void run() throws IOException {
447 while (true) {
448 fill();
449
450 if (buffer.isAtEnd())
451 break;
452
453 buffer.copyUntil(null, processor);
454 }
455 processor.close();
456 }
457
458 public void setProcessor(Step processor) throws IncompatibleProcessorException {
459 if (processor instanceof ShreddedProcessor) {
460 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
461 } else if (processor instanceof DocumentNumberWordInteger.Processor) {
462 this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentNumberWordInteger.Processor) processor));
463 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
464 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentNumberWordInteger>) processor));
465 } else {
466 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
467 }
468 }
469
470 public Class<DocumentNumberWordInteger> getOutputClass() {
471 return DocumentNumberWordInteger.class;
472 }
473 }
474
475 public static class DuplicateEliminator implements ShreddedProcessor {
476 public ShreddedProcessor processor;
477 DocumentNumberWordInteger last = new DocumentNumberWordInteger();
478
479 public DuplicateEliminator() {}
480 public DuplicateEliminator(ShreddedProcessor processor) {
481 this.processor = processor;
482 }
483
484 public void setShreddedProcessor(ShreddedProcessor processor) {
485 this.processor = processor;
486 }
487
488
489
490
491 public void processTuple(byte[] word, int document, int value) throws IOException {
492 processor.processTuple(word, document, value);
493 }
494
495 public void close() throws IOException {
496 processor.close();
497 }
498 }
499 public static class TupleUnshredder implements ShreddedProcessor {
500 DocumentNumberWordInteger last = new DocumentNumberWordInteger();
501 public org.galagosearch.tupleflow.Processor<DocumentNumberWordInteger> processor;
502
503 public TupleUnshredder(DocumentNumberWordInteger.Processor processor) {
504 this.processor = processor;
505 }
506
507 public TupleUnshredder(org.galagosearch.tupleflow.Processor<DocumentNumberWordInteger> processor) {
508 this.processor = processor;
509 }
510
511 public DocumentNumberWordInteger clone(DocumentNumberWordInteger object) {
512 DocumentNumberWordInteger result = new DocumentNumberWordInteger();
513 if (object == null) return result;
514 result.word = object.word;
515 result.document = object.document;
516 result.value = object.value;
517 return result;
518 }
519
520
521 public void processTuple(byte[] word, int document, int value) throws IOException {
522 last.word = word;
523 last.document = document;
524 last.value = value;
525 processor.process(clone(last));
526 }
527
528 public void close() throws IOException {
529 processor.close();
530 }
531 }
532 public static class TupleShredder implements Processor {
533 DocumentNumberWordInteger last = new DocumentNumberWordInteger();
534 public ShreddedProcessor processor;
535
536 public TupleShredder(ShreddedProcessor processor) {
537 this.processor = processor;
538 }
539
540 public DocumentNumberWordInteger clone(DocumentNumberWordInteger object) {
541 DocumentNumberWordInteger result = new DocumentNumberWordInteger();
542 if (object == null) return result;
543 result.word = object.word;
544 result.document = object.document;
545 result.value = object.value;
546 return result;
547 }
548
549 public void process(DocumentNumberWordInteger object) throws IOException {
550 boolean processAll = false;
551 processor.processTuple(object.word, object.document, object.value);
552 }
553
554 public Class<DocumentNumberWordInteger> getInputClass() {
555 return DocumentNumberWordInteger.class;
556 }
557
558 public void close() throws IOException {
559 processor.close();
560 }
561 }
562 }
563 public static class WordDocumentOrder implements Order<DocumentNumberWordInteger> {
564 public int hash(DocumentNumberWordInteger object) {
565 int h = 0;
566 h += Utility.hash(object.word);
567 h += Utility.hash(object.document);
568 return h;
569 }
570 public Comparator<DocumentNumberWordInteger> greaterThan() {
571 return new Comparator<DocumentNumberWordInteger>() {
572 public int compare(DocumentNumberWordInteger one, DocumentNumberWordInteger two) {
573 int result = 0;
574 do {
575 result = + Utility.compare(one.word, two.word);
576 if(result != 0) break;
577 result = + Utility.compare(one.document, two.document);
578 if(result != 0) break;
579 } while (false);
580 return -result;
581 }
582 };
583 }
584 public Comparator<DocumentNumberWordInteger> lessThan() {
585 return new Comparator<DocumentNumberWordInteger>() {
586 public int compare(DocumentNumberWordInteger one, DocumentNumberWordInteger two) {
587 int result = 0;
588 do {
589 result = + Utility.compare(one.word, two.word);
590 if(result != 0) break;
591 result = + Utility.compare(one.document, two.document);
592 if(result != 0) break;
593 } while (false);
594 return result;
595 }
596 };
597 }
598 public TypeReader<DocumentNumberWordInteger> orderedReader(ArrayInput _input) {
599 return new ShreddedReader(_input);
600 }
601
602 public TypeReader<DocumentNumberWordInteger> orderedReader(ArrayInput _input, int bufferSize) {
603 return new ShreddedReader(_input, bufferSize);
604 }
605 public OrderedWriter<DocumentNumberWordInteger> orderedWriter(ArrayOutput _output) {
606 ShreddedWriter w = new ShreddedWriter(_output);
607 return new OrderedWriterClass(w);
608 }
609 public static class OrderedWriterClass extends OrderedWriter< DocumentNumberWordInteger > {
610 DocumentNumberWordInteger last = null;
611 ShreddedWriter shreddedWriter = null;
612
613 public OrderedWriterClass(ShreddedWriter s) {
614 this.shreddedWriter = s;
615 }
616
617 public void process(DocumentNumberWordInteger object) throws IOException {
618 boolean processAll = false;
619 if (processAll || last == null || 0 != Utility.compare(object.word, last.word)) { processAll = true; shreddedWriter.processWord(object.word); }
620 if (processAll || last == null || 0 != Utility.compare(object.document, last.document)) { processAll = true; shreddedWriter.processDocument(object.document); }
621 shreddedWriter.processTuple(object.value);
622 last = object;
623 }
624
625 public void close() throws IOException {
626 shreddedWriter.close();
627 }
628
629 public Class<DocumentNumberWordInteger> getInputClass() {
630 return DocumentNumberWordInteger.class;
631 }
632 }
633 public ReaderSource<DocumentNumberWordInteger> orderedCombiner(Collection<TypeReader<DocumentNumberWordInteger>> readers, boolean closeOnExit) {
634 ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
635
636 for (TypeReader<DocumentNumberWordInteger> reader : readers) {
637 shreddedReaders.add((ShreddedReader)reader);
638 }
639
640 return new ShreddedCombiner(shreddedReaders, closeOnExit);
641 }
642 public DocumentNumberWordInteger clone(DocumentNumberWordInteger object) {
643 DocumentNumberWordInteger result = new DocumentNumberWordInteger();
644 if (object == null) return result;
645 result.word = object.word;
646 result.document = object.document;
647 result.value = object.value;
648 return result;
649 }
650 public Class<DocumentNumberWordInteger> getOrderedClass() {
651 return DocumentNumberWordInteger.class;
652 }
653 public String[] getOrderSpec() {
654 return new String[] {"+word", "+document"};
655 }
656
657 public static String getSpecString() {
658 return "+word +document";
659 }
660
661 public interface ShreddedProcessor extends Step {
662 public void processWord(byte[] word) throws IOException;
663 public void processDocument(int document) throws IOException;
664 public void processTuple(int value) throws IOException;
665 public void close() throws IOException;
666 }
667 public interface ShreddedSource extends Step {
668 }
669
670 public static class ShreddedWriter implements ShreddedProcessor {
671 ArrayOutput output;
672 ShreddedBuffer buffer = new ShreddedBuffer();
673 byte[] lastWord;
674 int lastDocument;
675 boolean lastFlush = false;
676
677 public ShreddedWriter(ArrayOutput output) {
678 this.output = output;
679 }
680
681 public void close() throws IOException {
682 flush();
683 }
684
685 public void processWord(byte[] word) {
686 lastWord = word;
687 buffer.processWord(word);
688 }
689 public void processDocument(int document) {
690 lastDocument = document;
691 buffer.processDocument(document);
692 }
693 public final void processTuple(int value) throws IOException {
694 if (lastFlush) {
695 if(buffer.words.size() == 0) buffer.processWord(lastWord);
696 if(buffer.documents.size() == 0) buffer.processDocument(lastDocument);
697 lastFlush = false;
698 }
699 buffer.processTuple(value);
700 if (buffer.isFull())
701 flush();
702 }
703 public final void flushTuples(int pauseIndex) throws IOException {
704
705 while (buffer.getReadIndex() < pauseIndex) {
706
707 output.writeInt(buffer.getValue());
708 buffer.incrementTuple();
709 }
710 }
711 public final void flushWord(int pauseIndex) throws IOException {
712 while (buffer.getReadIndex() < pauseIndex) {
713 int nextPause = buffer.getWordEndIndex();
714 int count = nextPause - buffer.getReadIndex();
715
716 output.writeBytes(buffer.getWord());
717 output.writeInt(count);
718 buffer.incrementWord();
719
720 flushDocument(nextPause);
721 assert nextPause == buffer.getReadIndex();
722 }
723 }
724 public final void flushDocument(int pauseIndex) throws IOException {
725 while (buffer.getReadIndex() < pauseIndex) {
726 int nextPause = buffer.getDocumentEndIndex();
727 int count = nextPause - buffer.getReadIndex();
728
729 output.writeInt(buffer.getDocument());
730 output.writeInt(count);
731 buffer.incrementDocument();
732
733 flushTuples(nextPause);
734 assert nextPause == buffer.getReadIndex();
735 }
736 }
737 public void flush() throws IOException {
738 flushWord(buffer.getWriteIndex());
739 buffer.reset();
740 lastFlush = true;
741 }
742 }
743 public static class ShreddedBuffer {
744 ArrayList<byte[]> words = new ArrayList();
745 ArrayList<Integer> documents = new ArrayList();
746 ArrayList<Integer> wordTupleIdx = new ArrayList();
747 ArrayList<Integer> documentTupleIdx = new ArrayList();
748 int wordReadIdx = 0;
749 int documentReadIdx = 0;
750
751 int[] values;
752 int writeTupleIndex = 0;
753 int readTupleIndex = 0;
754 int batchSize;
755
756 public ShreddedBuffer(int batchSize) {
757 this.batchSize = batchSize;
758
759 values = new int[batchSize];
760 }
761
762 public ShreddedBuffer() {
763 this(10000);
764 }
765
766 public void processWord(byte[] word) {
767 words.add(word);
768 wordTupleIdx.add(writeTupleIndex);
769 }
770 public void processDocument(int document) {
771 documents.add(document);
772 documentTupleIdx.add(writeTupleIndex);
773 }
774 public void processTuple(int value) {
775 assert words.size() > 0;
776 assert documents.size() > 0;
777 values[writeTupleIndex] = value;
778 writeTupleIndex++;
779 }
780 public void resetData() {
781 words.clear();
782 documents.clear();
783 wordTupleIdx.clear();
784 documentTupleIdx.clear();
785 writeTupleIndex = 0;
786 }
787
788 public void resetRead() {
789 readTupleIndex = 0;
790 wordReadIdx = 0;
791 documentReadIdx = 0;
792 }
793
794 public void reset() {
795 resetData();
796 resetRead();
797 }
798 public boolean isFull() {
799 return writeTupleIndex >= batchSize;
800 }
801
802 public boolean isEmpty() {
803 return writeTupleIndex == 0;
804 }
805
806 public boolean isAtEnd() {
807 return readTupleIndex >= writeTupleIndex;
808 }
809 public void incrementWord() {
810 wordReadIdx++;
811 }
812
813 public void autoIncrementWord() {
814 while (readTupleIndex >= getWordEndIndex() && readTupleIndex < writeTupleIndex)
815 wordReadIdx++;
816 }
817 public void incrementDocument() {
818 documentReadIdx++;
819 }
820
821 public void autoIncrementDocument() {
822 while (readTupleIndex >= getDocumentEndIndex() && readTupleIndex < writeTupleIndex)
823 documentReadIdx++;
824 }
825 public void incrementTuple() {
826 readTupleIndex++;
827 }
828 public int getWordEndIndex() {
829 if ((wordReadIdx+1) >= wordTupleIdx.size())
830 return writeTupleIndex;
831 return wordTupleIdx.get(wordReadIdx+1);
832 }
833
834 public int getDocumentEndIndex() {
835 if ((documentReadIdx+1) >= documentTupleIdx.size())
836 return writeTupleIndex;
837 return documentTupleIdx.get(documentReadIdx+1);
838 }
839 public int getReadIndex() {
840 return readTupleIndex;
841 }
842
843 public int getWriteIndex() {
844 return writeTupleIndex;
845 }
846 public byte[] getWord() {
847 assert readTupleIndex < writeTupleIndex;
848 assert wordReadIdx < words.size();
849
850 return words.get(wordReadIdx);
851 }
852 public int getDocument() {
853 assert readTupleIndex < writeTupleIndex;
854 assert documentReadIdx < documents.size();
855
856 return documents.get(documentReadIdx);
857 }
858 public int getValue() {
859 assert readTupleIndex < writeTupleIndex;
860 return values[readTupleIndex];
861 }
862 public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
863 while (getReadIndex() < endIndex) {
864 output.processTuple(getValue());
865 incrementTuple();
866 }
867 }
868 public void copyUntilIndexWord(int endIndex, ShreddedProcessor output) throws IOException {
869 while (getReadIndex() < endIndex) {
870 output.processWord(getWord());
871 assert getWordEndIndex() <= endIndex;
872 copyUntilIndexDocument(getWordEndIndex(), output);
873 incrementWord();
874 }
875 }
876 public void copyUntilIndexDocument(int endIndex, ShreddedProcessor output) throws IOException {
877 while (getReadIndex() < endIndex) {
878 output.processDocument(getDocument());
879 assert getDocumentEndIndex() <= endIndex;
880 copyTuples(getDocumentEndIndex(), output);
881 incrementDocument();
882 }
883 }
884 public void copyUntilWord(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
885 while (!isAtEnd()) {
886 if (other != null) {
887 assert !other.isAtEnd();
888 int c = + Utility.compare(getWord(), other.getWord());
889
890 if (c > 0) {
891 break;
892 }
893
894 output.processWord(getWord());
895
896 if (c < 0) {
897 copyUntilIndexDocument(getWordEndIndex(), output);
898 } else if (c == 0) {
899 copyUntilDocument(other, output);
900 autoIncrementWord();
901 break;
902 }
903 } else {
904 output.processWord(getWord());
905 copyUntilIndexDocument(getWordEndIndex(), output);
906 }
907 incrementWord();
908
909
910 }
911 }
912 public void copyUntilDocument(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
913 while (!isAtEnd()) {
914 if (other != null) {
915 assert !other.isAtEnd();
916 int c = + Utility.compare(getDocument(), other.getDocument());
917
918 if (c > 0) {
919 break;
920 }
921
922 output.processDocument(getDocument());
923
924 copyTuples(getDocumentEndIndex(), output);
925 } else {
926 output.processDocument(getDocument());
927 copyTuples(getDocumentEndIndex(), output);
928 }
929 incrementDocument();
930
931 if (getWordEndIndex() <= readTupleIndex)
932 break;
933 }
934 }
935 public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
936 copyUntilWord(other, output);
937 }
938
939 }
940 public static class ShreddedCombiner implements ReaderSource<DocumentNumberWordInteger>, ShreddedSource {
941 public ShreddedProcessor processor;
942 Collection<ShreddedReader> readers;
943 boolean closeOnExit = false;
944 boolean uninitialized = true;
945 PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
946
947 public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
948 this.readers = readers;
949 this.closeOnExit = closeOnExit;
950 }
951
952 public void setProcessor(Step processor) throws IncompatibleProcessorException {
953 if (processor instanceof ShreddedProcessor) {
954 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
955 } else if (processor instanceof DocumentNumberWordInteger.Processor) {
956 this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentNumberWordInteger.Processor) processor));
957 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
958 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentNumberWordInteger>) processor));
959 } else {
960 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
961 }
962 }
963
964 public Class<DocumentNumberWordInteger> getOutputClass() {
965 return DocumentNumberWordInteger.class;
966 }
967
968 public void initialize() throws IOException {
969 for (ShreddedReader reader : readers) {
970 reader.fill();
971
972 if (!reader.getBuffer().isAtEnd())
973 queue.add(reader);
974 }
975
976 uninitialized = false;
977 }
978
979 public void run() throws IOException {
980 initialize();
981
982 while (queue.size() > 0) {
983 ShreddedReader top = queue.poll();
984 ShreddedReader next = null;
985 ShreddedBuffer nextBuffer = null;
986
987 assert !top.getBuffer().isAtEnd();
988
989 if (queue.size() > 0) {
990 next = queue.peek();
991 nextBuffer = next.getBuffer();
992 assert !nextBuffer.isAtEnd();
993 }
994
995 top.getBuffer().copyUntil(nextBuffer, processor);
996 if (top.getBuffer().isAtEnd())
997 top.fill();
998
999 if (!top.getBuffer().isAtEnd())
1000 queue.add(top);
1001 }
1002
1003 if (closeOnExit)
1004 processor.close();
1005 }
1006
1007 public DocumentNumberWordInteger read() throws IOException {
1008 if (uninitialized)
1009 initialize();
1010
1011 DocumentNumberWordInteger result = null;
1012
1013 while (queue.size() > 0) {
1014 ShreddedReader top = queue.poll();
1015 result = top.read();
1016
1017 if (result != null) {
1018 if (top.getBuffer().isAtEnd())
1019 top.fill();
1020
1021 queue.offer(top);
1022 break;
1023 }
1024 }
1025
1026 return result;
1027 }
1028 }
1029 public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<DocumentNumberWordInteger>, ShreddedSource {
public ShreddedProcessor processor;
ShreddedBuffer buffer;
DocumentNumberWordInteger last = new DocumentNumberWordInteger();
long updateWordCount = -1;
long updateDocumentCount = -1;
long tupleCount = 0;
long bufferStartCount = 0;
ArrayInput input;

/** Reads from {@code input} using a buffer of the default size. */
public ShreddedReader(ArrayInput input) {
    this(input, new ShreddedBuffer());
}

/** Reads from {@code input} using a buffer that holds up to {@code bufferSize} tuples. */
public ShreddedReader(ArrayInput input, int bufferSize) {
    this(input, new ShreddedBuffer(bufferSize));
}

// Shared initialization for the public constructors.
private ShreddedReader(ArrayInput input, ShreddedBuffer buffer) {
    this.input = input;
    this.buffer = buffer;
}
1048
1049 public final int compareTo(ShreddedReader other) {
1050 ShreddedBuffer otherBuffer = other.getBuffer();
1051
1052 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
1053 return 0;
1054 } else if (buffer.isAtEnd()) {
1055 return -1;
1056 } else if (otherBuffer.isAtEnd()) {
1057 return 1;
1058 }
1059
1060 int result = 0;
1061 do {
1062 result = + Utility.compare(buffer.getWord(), otherBuffer.getWord());
1063 if(result != 0) break;
1064 result = + Utility.compare(buffer.getDocument(), otherBuffer.getDocument());
1065 if(result != 0) break;
1066 } while (false);
1067
1068 return result;
1069 }
1070
1071 public final ShreddedBuffer getBuffer() {
1072 return buffer;
1073 }
1074
1075 public final DocumentNumberWordInteger read() throws IOException {
1076 if (buffer.isAtEnd()) {
1077 fill();
1078
1079 if (buffer.isAtEnd()) {
1080 return null;
1081 }
1082 }
1083
1084 assert !buffer.isAtEnd();
1085 DocumentNumberWordInteger result = new DocumentNumberWordInteger();
1086
1087 result.word = buffer.getWord();
1088 result.document = buffer.getDocument();
1089 result.value = buffer.getValue();
1090
1091 buffer.incrementTuple();
1092 buffer.autoIncrementWord();
1093 buffer.autoIncrementDocument();
1094
1095 return result;
1096 }
1097
1098 public final void fill() throws IOException {
1099 try {
1100 buffer.reset();
1101
1102 if (tupleCount != 0) {
1103
1104 if(updateWordCount - tupleCount > 0) {
1105 buffer.words.add(last.word);
1106 buffer.wordTupleIdx.add((int) (updateWordCount - tupleCount));
1107 }
1108 if(updateDocumentCount - tupleCount > 0) {
1109 buffer.documents.add(last.document);
1110 buffer.documentTupleIdx.add((int) (updateDocumentCount - tupleCount));
1111 }
1112 bufferStartCount = tupleCount;
1113 }
1114
1115 while (!buffer.isFull()) {
1116 updateDocument();
1117 buffer.processTuple(input.readInt());
1118 tupleCount++;
1119 }
1120 } catch(EOFException e) {}
1121 }
1122
1123 public final void updateWord() throws IOException {
1124 if (updateWordCount > tupleCount)
1125 return;
1126
1127 last.word = input.readBytes();
1128 updateWordCount = tupleCount + input.readInt();
1129
1130 buffer.processWord(last.word);
1131 }
1132 public final void updateDocument() throws IOException {
1133 if (updateDocumentCount > tupleCount)
1134 return;
1135
1136 updateWord();
1137 last.document = input.readInt();
1138 updateDocumentCount = tupleCount + input.readInt();
1139
1140 buffer.processDocument(last.document);
1141 }
1142
1143 public void run() throws IOException {
1144 while (true) {
1145 fill();
1146
1147 if (buffer.isAtEnd())
1148 break;
1149
1150 buffer.copyUntil(null, processor);
1151 }
1152 processor.close();
1153 }
1154
1155 public void setProcessor(Step processor) throws IncompatibleProcessorException {
1156 if (processor instanceof ShreddedProcessor) {
1157 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
1158 } else if (processor instanceof DocumentNumberWordInteger.Processor) {
1159 this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentNumberWordInteger.Processor) processor));
1160 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
1161 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentNumberWordInteger>) processor));
1162 } else {
1163 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
1164 }
1165 }
1166
1167 public Class<DocumentNumberWordInteger> getOutputClass() {
1168 return DocumentNumberWordInteger.class;
1169 }
1170 }
1171
1172 public static class DuplicateEliminator implements ShreddedProcessor {
1173 public ShreddedProcessor processor;
1174 DocumentNumberWordInteger last = new DocumentNumberWordInteger();
1175 boolean wordProcess = true;
1176 boolean documentProcess = true;
1177
1178 public DuplicateEliminator() {}
1179 public DuplicateEliminator(ShreddedProcessor processor) {
1180 this.processor = processor;
1181 }
1182
1183 public void setShreddedProcessor(ShreddedProcessor processor) {
1184 this.processor = processor;
1185 }
1186
1187 public void processWord(byte[] word) throws IOException {
1188 if (wordProcess || Utility.compare(word, last.word) != 0) {
1189 last.word = word;
1190 processor.processWord(word);
1191 resetDocument();
1192 wordProcess = false;
1193 }
1194 }
1195 public void processDocument(int document) throws IOException {
1196 if (documentProcess || Utility.compare(document, last.document) != 0) {
1197 last.document = document;
1198 processor.processDocument(document);
1199 documentProcess = false;
1200 }
1201 }
1202
1203 public void resetWord() {
1204 wordProcess = true;
1205 resetDocument();
1206 }
1207 public void resetDocument() {
1208 documentProcess = true;
1209 }
1210
1211 public void processTuple(int value) throws IOException {
1212 processor.processTuple(value);
1213 }
1214
1215 public void close() throws IOException {
1216 processor.close();
1217 }
1218 }
1219 public static class TupleUnshredder implements ShreddedProcessor {
1220 DocumentNumberWordInteger last = new DocumentNumberWordInteger();
1221 public org.galagosearch.tupleflow.Processor<DocumentNumberWordInteger> processor;
1222
1223 public TupleUnshredder(DocumentNumberWordInteger.Processor processor) {
1224 this.processor = processor;
1225 }
1226
1227 public TupleUnshredder(org.galagosearch.tupleflow.Processor<DocumentNumberWordInteger> processor) {
1228 this.processor = processor;
1229 }
1230
1231 public DocumentNumberWordInteger clone(DocumentNumberWordInteger object) {
1232 DocumentNumberWordInteger result = new DocumentNumberWordInteger();
1233 if (object == null) return result;
1234 result.word = object.word;
1235 result.document = object.document;
1236 result.value = object.value;
1237 return result;
1238 }
1239
1240 public void processWord(byte[] word) throws IOException {
1241 last.word = word;
1242 }
1243
1244 public void processDocument(int document) throws IOException {
1245 last.document = document;
1246 }
1247
1248
1249 public void processTuple(int value) throws IOException {
1250 last.value = value;
1251 processor.process(clone(last));
1252 }
1253
1254 public void close() throws IOException {
1255 processor.close();
1256 }
1257 }
1258 public static class TupleShredder implements Processor {
1259 DocumentNumberWordInteger last = new DocumentNumberWordInteger();
1260 public ShreddedProcessor processor;
1261
1262 public TupleShredder(ShreddedProcessor processor) {
1263 this.processor = processor;
1264 }
1265
1266 public DocumentNumberWordInteger clone(DocumentNumberWordInteger object) {
1267 DocumentNumberWordInteger result = new DocumentNumberWordInteger();
1268 if (object == null) return result;
1269 result.word = object.word;
1270 result.document = object.document;
1271 result.value = object.value;
1272 return result;
1273 }
1274
1275 public void process(DocumentNumberWordInteger object) throws IOException {
1276 boolean processAll = false;
1277 if(last == null || Utility.compare(last.word, object.word) != 0 || processAll) { processor.processWord(object.word); processAll = true; }
1278 if(last == null || Utility.compare(last.document, object.document) != 0 || processAll) { processor.processDocument(object.document); processAll = true; }
1279 processor.processTuple(object.value);
1280 }
1281
1282 public Class<DocumentNumberWordInteger> getInputClass() {
1283 return DocumentNumberWordInteger.class;
1284 }
1285
1286 public void close() throws IOException {
1287 processor.close();
1288 }
1289 }
1290 }
1291 public static class DocumentOrder implements Order<DocumentNumberWordInteger> {
1292 public int hash(DocumentNumberWordInteger object) {
1293 int h = 0;
1294 h += Utility.hash(object.document);
1295 return h;
1296 }
1297 public Comparator<DocumentNumberWordInteger> greaterThan() {
1298 return new Comparator<DocumentNumberWordInteger>() {
1299 public int compare(DocumentNumberWordInteger one, DocumentNumberWordInteger two) {
1300 int result = 0;
1301 do {
1302 result = + Utility.compare(one.document, two.document);
1303 if(result != 0) break;
1304 } while (false);
1305 return -result;
1306 }
1307 };
1308 }
1309 public Comparator<DocumentNumberWordInteger> lessThan() {
1310 return new Comparator<DocumentNumberWordInteger>() {
1311 public int compare(DocumentNumberWordInteger one, DocumentNumberWordInteger two) {
1312 int result = 0;
1313 do {
1314 result = + Utility.compare(one.document, two.document);
1315 if(result != 0) break;
1316 } while (false);
1317 return result;
1318 }
1319 };
1320 }
1321 public TypeReader<DocumentNumberWordInteger> orderedReader(ArrayInput _input) {
1322 return new ShreddedReader(_input);
1323 }
1324
1325 public TypeReader<DocumentNumberWordInteger> orderedReader(ArrayInput _input, int bufferSize) {
1326 return new ShreddedReader(_input, bufferSize);
1327 }
1328 public OrderedWriter<DocumentNumberWordInteger> orderedWriter(ArrayOutput _output) {
1329 ShreddedWriter w = new ShreddedWriter(_output);
1330 return new OrderedWriterClass(w);
1331 }
1332 public static class OrderedWriterClass extends OrderedWriter< DocumentNumberWordInteger > {
1333 DocumentNumberWordInteger last = null;
1334 ShreddedWriter shreddedWriter = null;
1335
1336 public OrderedWriterClass(ShreddedWriter s) {
1337 this.shreddedWriter = s;
1338 }
1339
1340 public void process(DocumentNumberWordInteger object) throws IOException {
1341 boolean processAll = false;
1342 if (processAll || last == null || 0 != Utility.compare(object.document, last.document)) { processAll = true; shreddedWriter.processDocument(object.document); }
1343 shreddedWriter.processTuple(object.word, object.value);
1344 last = object;
1345 }
1346
1347 public void close() throws IOException {
1348 shreddedWriter.close();
1349 }
1350
1351 public Class<DocumentNumberWordInteger> getInputClass() {
1352 return DocumentNumberWordInteger.class;
1353 }
1354 }
1355 public ReaderSource<DocumentNumberWordInteger> orderedCombiner(Collection<TypeReader<DocumentNumberWordInteger>> readers, boolean closeOnExit) {
1356 ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
1357
1358 for (TypeReader<DocumentNumberWordInteger> reader : readers) {
1359 shreddedReaders.add((ShreddedReader)reader);
1360 }
1361
1362 return new ShreddedCombiner(shreddedReaders, closeOnExit);
1363 }
1364 public DocumentNumberWordInteger clone(DocumentNumberWordInteger object) {
1365 DocumentNumberWordInteger result = new DocumentNumberWordInteger();
1366 if (object == null) return result;
1367 result.word = object.word;
1368 result.document = object.document;
1369 result.value = object.value;
1370 return result;
1371 }
1372 public Class<DocumentNumberWordInteger> getOrderedClass() {
1373 return DocumentNumberWordInteger.class;
1374 }
1375 public String[] getOrderSpec() {
1376 return new String[] {"+document"};
1377 }
1378
1379 public static String getSpecString() {
1380 return "+document";
1381 }
1382
1383 public interface ShreddedProcessor extends Step {
1384 public void processDocument(int document) throws IOException;
1385 public void processTuple(byte[] word, int value) throws IOException;
1386 public void close() throws IOException;
1387 }
1388 public interface ShreddedSource extends Step {
1389 }
1390
1391 public static class ShreddedWriter implements ShreddedProcessor {
1392 ArrayOutput output;
1393 ShreddedBuffer buffer = new ShreddedBuffer();
1394 int lastDocument;
1395 boolean lastFlush = false;
1396
1397 public ShreddedWriter(ArrayOutput output) {
1398 this.output = output;
1399 }
1400
1401 public void close() throws IOException {
1402 flush();
1403 }
1404
1405 public void processDocument(int document) {
1406 lastDocument = document;
1407 buffer.processDocument(document);
1408 }
1409 public final void processTuple(byte[] word, int value) throws IOException {
1410 if (lastFlush) {
1411 if(buffer.documents.size() == 0) buffer.processDocument(lastDocument);
1412 lastFlush = false;
1413 }
1414 buffer.processTuple(word, value);
1415 if (buffer.isFull())
1416 flush();
1417 }
1418 public final void flushTuples(int pauseIndex) throws IOException {
1419
1420 while (buffer.getReadIndex() < pauseIndex) {
1421
1422 output.writeBytes(buffer.getWord());
1423 output.writeInt(buffer.getValue());
1424 buffer.incrementTuple();
1425 }
1426 }
1427 public final void flushDocument(int pauseIndex) throws IOException {
1428 while (buffer.getReadIndex() < pauseIndex) {
1429 int nextPause = buffer.getDocumentEndIndex();
1430 int count = nextPause - buffer.getReadIndex();
1431
1432 output.writeInt(buffer.getDocument());
1433 output.writeInt(count);
1434 buffer.incrementDocument();
1435
1436 flushTuples(nextPause);
1437 assert nextPause == buffer.getReadIndex();
1438 }
1439 }
1440 public void flush() throws IOException {
1441 flushDocument(buffer.getWriteIndex());
1442 buffer.reset();
1443 lastFlush = true;
1444 }
1445 }
1446 public static class ShreddedBuffer {
1447 ArrayList<Integer> documents = new ArrayList();
1448 ArrayList<Integer> documentTupleIdx = new ArrayList();
1449 int documentReadIdx = 0;
1450
1451 byte[][] words;
1452 int[] values;
1453 int writeTupleIndex = 0;
1454 int readTupleIndex = 0;
1455 int batchSize;
1456
1457 public ShreddedBuffer(int batchSize) {
1458 this.batchSize = batchSize;
1459
1460 words = new byte[batchSize][];
1461 values = new int[batchSize];
1462 }
1463
1464 public ShreddedBuffer() {
1465 this(10000);
1466 }
1467
1468 public void processDocument(int document) {
1469 documents.add(document);
1470 documentTupleIdx.add(writeTupleIndex);
1471 }
1472 public void processTuple(byte[] word, int value) {
1473 assert documents.size() > 0;
1474 words[writeTupleIndex] = word;
1475 values[writeTupleIndex] = value;
1476 writeTupleIndex++;
1477 }
1478 public void resetData() {
1479 documents.clear();
1480 documentTupleIdx.clear();
1481 writeTupleIndex = 0;
1482 }
1483
1484 public void resetRead() {
1485 readTupleIndex = 0;
1486 documentReadIdx = 0;
1487 }
1488
1489 public void reset() {
1490 resetData();
1491 resetRead();
1492 }
1493 public boolean isFull() {
1494 return writeTupleIndex >= batchSize;
1495 }
1496
1497 public boolean isEmpty() {
1498 return writeTupleIndex == 0;
1499 }
1500
1501 public boolean isAtEnd() {
1502 return readTupleIndex >= writeTupleIndex;
1503 }
1504 public void incrementDocument() {
1505 documentReadIdx++;
1506 }
1507
1508 public void autoIncrementDocument() {
1509 while (readTupleIndex >= getDocumentEndIndex() && readTupleIndex < writeTupleIndex)
1510 documentReadIdx++;
1511 }
1512 public void incrementTuple() {
1513 readTupleIndex++;
1514 }
1515 public int getDocumentEndIndex() {
1516 if ((documentReadIdx+1) >= documentTupleIdx.size())
1517 return writeTupleIndex;
1518 return documentTupleIdx.get(documentReadIdx+1);
1519 }
1520 public int getReadIndex() {
1521 return readTupleIndex;
1522 }
1523
1524 public int getWriteIndex() {
1525 return writeTupleIndex;
1526 }
1527 public int getDocument() {
1528 assert readTupleIndex < writeTupleIndex;
1529 assert documentReadIdx < documents.size();
1530
1531 return documents.get(documentReadIdx);
1532 }
1533 public byte[] getWord() {
1534 assert readTupleIndex < writeTupleIndex;
1535 return words[readTupleIndex];
1536 }
1537 public int getValue() {
1538 assert readTupleIndex < writeTupleIndex;
1539 return values[readTupleIndex];
1540 }
1541 public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
1542 while (getReadIndex() < endIndex) {
1543 output.processTuple(getWord(), getValue());
1544 incrementTuple();
1545 }
1546 }
1547 public void copyUntilIndexDocument(int endIndex, ShreddedProcessor output) throws IOException {
1548 while (getReadIndex() < endIndex) {
1549 output.processDocument(getDocument());
1550 assert getDocumentEndIndex() <= endIndex;
1551 copyTuples(getDocumentEndIndex(), output);
1552 incrementDocument();
1553 }
1554 }
1555 public void copyUntilDocument(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
1556 while (!isAtEnd()) {
1557 if (other != null) {
1558 assert !other.isAtEnd();
1559 int c = + Utility.compare(getDocument(), other.getDocument());
1560
1561 if (c > 0) {
1562 break;
1563 }
1564
1565 output.processDocument(getDocument());
1566
1567 copyTuples(getDocumentEndIndex(), output);
1568 } else {
1569 output.processDocument(getDocument());
1570 copyTuples(getDocumentEndIndex(), output);
1571 }
1572 incrementDocument();
1573
1574
1575 }
1576 }
1577 public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
1578 copyUntilDocument(other, output);
1579 }
1580
1581 }
1582 public static class ShreddedCombiner implements ReaderSource<DocumentNumberWordInteger>, ShreddedSource {
1583 public ShreddedProcessor processor;
1584 Collection<ShreddedReader> readers;
1585 boolean closeOnExit = false;
1586 boolean uninitialized = true;
1587 PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
1588
1589 public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
1590 this.readers = readers;
1591 this.closeOnExit = closeOnExit;
1592 }
1593
1594 public void setProcessor(Step processor) throws IncompatibleProcessorException {
1595 if (processor instanceof ShreddedProcessor) {
1596 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
1597 } else if (processor instanceof DocumentNumberWordInteger.Processor) {
1598 this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentNumberWordInteger.Processor) processor));
1599 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
1600 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentNumberWordInteger>) processor));
1601 } else {
1602 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
1603 }
1604 }
1605
1606 public Class<DocumentNumberWordInteger> getOutputClass() {
1607 return DocumentNumberWordInteger.class;
1608 }
1609
1610 public void initialize() throws IOException {
1611 for (ShreddedReader reader : readers) {
1612 reader.fill();
1613
1614 if (!reader.getBuffer().isAtEnd())
1615 queue.add(reader);
1616 }
1617
1618 uninitialized = false;
1619 }
1620
1621 public void run() throws IOException {
1622 initialize();
1623
1624 while (queue.size() > 0) {
1625 ShreddedReader top = queue.poll();
1626 ShreddedReader next = null;
1627 ShreddedBuffer nextBuffer = null;
1628
1629 assert !top.getBuffer().isAtEnd();
1630
1631 if (queue.size() > 0) {
1632 next = queue.peek();
1633 nextBuffer = next.getBuffer();
1634 assert !nextBuffer.isAtEnd();
1635 }
1636
1637 top.getBuffer().copyUntil(nextBuffer, processor);
1638 if (top.getBuffer().isAtEnd())
1639 top.fill();
1640
1641 if (!top.getBuffer().isAtEnd())
1642 queue.add(top);
1643 }
1644
1645 if (closeOnExit)
1646 processor.close();
1647 }
1648
1649 public DocumentNumberWordInteger read() throws IOException {
1650 if (uninitialized)
1651 initialize();
1652
1653 DocumentNumberWordInteger result = null;
1654
1655 while (queue.size() > 0) {
1656 ShreddedReader top = queue.poll();
1657 result = top.read();
1658
1659 if (result != null) {
1660 if (top.getBuffer().isAtEnd())
1661 top.fill();
1662
1663 queue.offer(top);
1664 break;
1665 }
1666 }
1667
1668 return result;
1669 }
1670 }
1671 public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<DocumentNumberWordInteger>, ShreddedSource {
1672 public ShreddedProcessor processor;
1673 ShreddedBuffer buffer;
1674 DocumentNumberWordInteger last = new DocumentNumberWordInteger();
1675 long updateDocumentCount = -1;
1676 long tupleCount = 0;
1677 long bufferStartCount = 0;
1678 ArrayInput input;
1679
1680 public ShreddedReader(ArrayInput input) {
1681 this.input = input;
1682 this.buffer = new ShreddedBuffer();
1683 }
1684
1685 public ShreddedReader(ArrayInput input, int bufferSize) {
1686 this.input = input;
1687 this.buffer = new ShreddedBuffer(bufferSize);
1688 }
1689
1690 public final int compareTo(ShreddedReader other) {
1691 ShreddedBuffer otherBuffer = other.getBuffer();
1692
1693 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
1694 return 0;
1695 } else if (buffer.isAtEnd()) {
1696 return -1;
1697 } else if (otherBuffer.isAtEnd()) {
1698 return 1;
1699 }
1700
1701 int result = 0;
1702 do {
1703 result = + Utility.compare(buffer.getDocument(), otherBuffer.getDocument());
1704 if(result != 0) break;
1705 } while (false);
1706
1707 return result;
1708 }
1709
1710 public final ShreddedBuffer getBuffer() {
1711 return buffer;
1712 }
1713
1714 public final DocumentNumberWordInteger read() throws IOException {
1715 if (buffer.isAtEnd()) {
1716 fill();
1717
1718 if (buffer.isAtEnd()) {
1719 return null;
1720 }
1721 }
1722
1723 assert !buffer.isAtEnd();
1724 DocumentNumberWordInteger result = new DocumentNumberWordInteger();
1725
1726 result.document = buffer.getDocument();
1727 result.word = buffer.getWord();
1728 result.value = buffer.getValue();
1729
1730 buffer.incrementTuple();
1731 buffer.autoIncrementDocument();
1732
1733 return result;
1734 }
1735
1736 public final void fill() throws IOException {
1737 try {
1738 buffer.reset();
1739
1740 if (tupleCount != 0) {
1741
1742 if(updateDocumentCount - tupleCount > 0) {
1743 buffer.documents.add(last.document);
1744 buffer.documentTupleIdx.add((int) (updateDocumentCount - tupleCount));
1745 }
1746 bufferStartCount = tupleCount;
1747 }
1748
1749 while (!buffer.isFull()) {
1750 updateDocument();
1751 buffer.processTuple(input.readBytes(), input.readInt());
1752 tupleCount++;
1753 }
1754 } catch(EOFException e) {}
1755 }
1756
1757 public final void updateDocument() throws IOException {
1758 if (updateDocumentCount > tupleCount)
1759 return;
1760
1761 last.document = input.readInt();
1762 updateDocumentCount = tupleCount + input.readInt();
1763
1764 buffer.processDocument(last.document);
1765 }
1766
1767 public void run() throws IOException {
1768 while (true) {
1769 fill();
1770
1771 if (buffer.isAtEnd())
1772 break;
1773
1774 buffer.copyUntil(null, processor);
1775 }
1776 processor.close();
1777 }
1778
1779 public void setProcessor(Step processor) throws IncompatibleProcessorException {
1780 if (processor instanceof ShreddedProcessor) {
1781 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
1782 } else if (processor instanceof DocumentNumberWordInteger.Processor) {
1783 this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentNumberWordInteger.Processor) processor));
1784 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
1785 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentNumberWordInteger>) processor));
1786 } else {
1787 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
1788 }
1789 }
1790
1791 public Class<DocumentNumberWordInteger> getOutputClass() {
1792 return DocumentNumberWordInteger.class;
1793 }
1794 }
1795
1796 public static class DuplicateEliminator implements ShreddedProcessor {
1797 public ShreddedProcessor processor;
1798 DocumentNumberWordInteger last = new DocumentNumberWordInteger();
1799 boolean documentProcess = true;
1800
1801 public DuplicateEliminator() {}
1802 public DuplicateEliminator(ShreddedProcessor processor) {
1803 this.processor = processor;
1804 }
1805
1806 public void setShreddedProcessor(ShreddedProcessor processor) {
1807 this.processor = processor;
1808 }
1809
1810 public void processDocument(int document) throws IOException {
1811 if (documentProcess || Utility.compare(document, last.document) != 0) {
1812 last.document = document;
1813 processor.processDocument(document);
1814 documentProcess = false;
1815 }
1816 }
1817
1818 public void resetDocument() {
1819 documentProcess = true;
1820 }
1821
1822 public void processTuple(byte[] word, int value) throws IOException {
1823 processor.processTuple(word, value);
1824 }
1825
1826 public void close() throws IOException {
1827 processor.close();
1828 }
1829 }
1830 public static class TupleUnshredder implements ShreddedProcessor {
1831 DocumentNumberWordInteger last = new DocumentNumberWordInteger();
1832 public org.galagosearch.tupleflow.Processor<DocumentNumberWordInteger> processor;
1833
1834 public TupleUnshredder(DocumentNumberWordInteger.Processor processor) {
1835 this.processor = processor;
1836 }
1837
1838 public TupleUnshredder(org.galagosearch.tupleflow.Processor<DocumentNumberWordInteger> processor) {
1839 this.processor = processor;
1840 }
1841
1842 public DocumentNumberWordInteger clone(DocumentNumberWordInteger object) {
1843 DocumentNumberWordInteger result = new DocumentNumberWordInteger();
1844 if (object == null) return result;
1845 result.word = object.word;
1846 result.document = object.document;
1847 result.value = object.value;
1848 return result;
1849 }
1850
1851 public void processDocument(int document) throws IOException {
1852 last.document = document;
1853 }
1854
1855
1856 public void processTuple(byte[] word, int value) throws IOException {
1857 last.word = word;
1858 last.value = value;
1859 processor.process(clone(last));
1860 }
1861
1862 public void close() throws IOException {
1863 processor.close();
1864 }
1865 }
1866 public static class TupleShredder implements Processor {
1867 DocumentNumberWordInteger last = new DocumentNumberWordInteger();
1868 public ShreddedProcessor processor;
1869
1870 public TupleShredder(ShreddedProcessor processor) {
1871 this.processor = processor;
1872 }
1873
1874 public DocumentNumberWordInteger clone(DocumentNumberWordInteger object) {
1875 DocumentNumberWordInteger result = new DocumentNumberWordInteger();
1876 if (object == null) return result;
1877 result.word = object.word;
1878 result.document = object.document;
1879 result.value = object.value;
1880 return result;
1881 }
1882
1883 public void process(DocumentNumberWordInteger object) throws IOException {
1884 boolean processAll = false;
1885 if(last == null || Utility.compare(last.document, object.document) != 0 || processAll) { processor.processDocument(object.document); processAll = true; }
1886 processor.processTuple(object.word, object.value);
1887 }
1888
1889 public Class<DocumentNumberWordInteger> getInputClass() {
1890 return DocumentNumberWordInteger.class;
1891 }
1892
1893 public void close() throws IOException {
1894 processor.close();
1895 }
1896 }
1897 }
1898 public static class ValueOrder implements Order<DocumentNumberWordInteger> {
1899 public int hash(DocumentNumberWordInteger object) {
1900 int h = 0;
1901 h += Utility.hash(object.value);
1902 return h;
1903 }
1904 public Comparator<DocumentNumberWordInteger> greaterThan() {
1905 return new Comparator<DocumentNumberWordInteger>() {
1906 public int compare(DocumentNumberWordInteger one, DocumentNumberWordInteger two) {
1907 int result = 0;
1908 do {
1909 result = + Utility.compare(one.value, two.value);
1910 if(result != 0) break;
1911 } while (false);
1912 return -result;
1913 }
1914 };
1915 }
1916 public Comparator<DocumentNumberWordInteger> lessThan() {
1917 return new Comparator<DocumentNumberWordInteger>() {
1918 public int compare(DocumentNumberWordInteger one, DocumentNumberWordInteger two) {
1919 int result = 0;
1920 do {
1921 result = + Utility.compare(one.value, two.value);
1922 if(result != 0) break;
1923 } while (false);
1924 return result;
1925 }
1926 };
1927 }
1928 public TypeReader<DocumentNumberWordInteger> orderedReader(ArrayInput _input) {
1929 return new ShreddedReader(_input);
1930 }
1931
1932 public TypeReader<DocumentNumberWordInteger> orderedReader(ArrayInput _input, int bufferSize) {
1933 return new ShreddedReader(_input, bufferSize);
1934 }
1935 public OrderedWriter<DocumentNumberWordInteger> orderedWriter(ArrayOutput _output) {
1936 ShreddedWriter w = new ShreddedWriter(_output);
1937 return new OrderedWriterClass(w);
1938 }
1939 public static class OrderedWriterClass extends OrderedWriter< DocumentNumberWordInteger > {
1940 DocumentNumberWordInteger last = null;
1941 ShreddedWriter shreddedWriter = null;
1942
1943 public OrderedWriterClass(ShreddedWriter s) {
1944 this.shreddedWriter = s;
1945 }
1946
1947 public void process(DocumentNumberWordInteger object) throws IOException {
1948 boolean processAll = false;
1949 if (processAll || last == null || 0 != Utility.compare(object.value, last.value)) { processAll = true; shreddedWriter.processValue(object.value); }
1950 shreddedWriter.processTuple(object.word, object.document);
1951 last = object;
1952 }
1953
1954 public void close() throws IOException {
1955 shreddedWriter.close();
1956 }
1957
1958 public Class<DocumentNumberWordInteger> getInputClass() {
1959 return DocumentNumberWordInteger.class;
1960 }
1961 }
1962 public ReaderSource<DocumentNumberWordInteger> orderedCombiner(Collection<TypeReader<DocumentNumberWordInteger>> readers, boolean closeOnExit) {
1963 ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
1964
1965 for (TypeReader<DocumentNumberWordInteger> reader : readers) {
1966 shreddedReaders.add((ShreddedReader)reader);
1967 }
1968
1969 return new ShreddedCombiner(shreddedReaders, closeOnExit);
1970 }
1971 public DocumentNumberWordInteger clone(DocumentNumberWordInteger object) {
1972 DocumentNumberWordInteger result = new DocumentNumberWordInteger();
1973 if (object == null) return result;
1974 result.word = object.word;
1975 result.document = object.document;
1976 result.value = object.value;
1977 return result;
1978 }
1979 public Class<DocumentNumberWordInteger> getOrderedClass() {
1980 return DocumentNumberWordInteger.class;
1981 }
1982 public String[] getOrderSpec() {
1983 return new String[] {"+value"};
1984 }
1985
1986 public static String getSpecString() {
1987 return "+value";
1988 }
1989
1990 public interface ShreddedProcessor extends Step {
1991 public void processValue(int value) throws IOException;
1992 public void processTuple(byte[] word, int document) throws IOException;
1993 public void close() throws IOException;
1994 }
1995 public interface ShreddedSource extends Step {
1996 }
1997
1998 public static class ShreddedWriter implements ShreddedProcessor {
1999 ArrayOutput output;
2000 ShreddedBuffer buffer = new ShreddedBuffer();
2001 int lastValue;
2002 boolean lastFlush = false;
2003
2004 public ShreddedWriter(ArrayOutput output) {
2005 this.output = output;
2006 }
2007
2008 public void close() throws IOException {
2009 flush();
2010 }
2011
2012 public void processValue(int value) {
2013 lastValue = value;
2014 buffer.processValue(value);
2015 }
2016 public final void processTuple(byte[] word, int document) throws IOException {
2017 if (lastFlush) {
2018 if(buffer.values.size() == 0) buffer.processValue(lastValue);
2019 lastFlush = false;
2020 }
2021 buffer.processTuple(word, document);
2022 if (buffer.isFull())
2023 flush();
2024 }
2025 public final void flushTuples(int pauseIndex) throws IOException {
2026
2027 while (buffer.getReadIndex() < pauseIndex) {
2028
2029 output.writeBytes(buffer.getWord());
2030 output.writeInt(buffer.getDocument());
2031 buffer.incrementTuple();
2032 }
2033 }
2034 public final void flushValue(int pauseIndex) throws IOException {
2035 while (buffer.getReadIndex() < pauseIndex) {
2036 int nextPause = buffer.getValueEndIndex();
2037 int count = nextPause - buffer.getReadIndex();
2038
2039 output.writeInt(buffer.getValue());
2040 output.writeInt(count);
2041 buffer.incrementValue();
2042
2043 flushTuples(nextPause);
2044 assert nextPause == buffer.getReadIndex();
2045 }
2046 }
2047 public void flush() throws IOException {
2048 flushValue(buffer.getWriteIndex());
2049 buffer.reset();
2050 lastFlush = true;
2051 }
2052 }
2053 public static class ShreddedBuffer {
2054 ArrayList<Integer> values = new ArrayList();
2055 ArrayList<Integer> valueTupleIdx = new ArrayList();
2056 int valueReadIdx = 0;
2057
2058 byte[][] words;
2059 int[] documents;
2060 int writeTupleIndex = 0;
2061 int readTupleIndex = 0;
2062 int batchSize;
2063
2064 public ShreddedBuffer(int batchSize) {
2065 this.batchSize = batchSize;
2066
2067 words = new byte[batchSize][];
2068 documents = new int[batchSize];
2069 }
2070
2071 public ShreddedBuffer() {
2072 this(10000);
2073 }
2074
2075 public void processValue(int value) {
2076 values.add(value);
2077 valueTupleIdx.add(writeTupleIndex);
2078 }
2079 public void processTuple(byte[] word, int document) {
2080 assert values.size() > 0;
2081 words[writeTupleIndex] = word;
2082 documents[writeTupleIndex] = document;
2083 writeTupleIndex++;
2084 }
2085 public void resetData() {
2086 values.clear();
2087 valueTupleIdx.clear();
2088 writeTupleIndex = 0;
2089 }
2090
2091 public void resetRead() {
2092 readTupleIndex = 0;
2093 valueReadIdx = 0;
2094 }
2095
2096 public void reset() {
2097 resetData();
2098 resetRead();
2099 }
2100 public boolean isFull() {
2101 return writeTupleIndex >= batchSize;
2102 }
2103
2104 public boolean isEmpty() {
2105 return writeTupleIndex == 0;
2106 }
2107
2108 public boolean isAtEnd() {
2109 return readTupleIndex >= writeTupleIndex;
2110 }
2111 public void incrementValue() {
2112 valueReadIdx++;
2113 }
2114
2115 public void autoIncrementValue() {
2116 while (readTupleIndex >= getValueEndIndex() && readTupleIndex < writeTupleIndex)
2117 valueReadIdx++;
2118 }
2119 public void incrementTuple() {
2120 readTupleIndex++;
2121 }
2122 public int getValueEndIndex() {
2123 if ((valueReadIdx+1) >= valueTupleIdx.size())
2124 return writeTupleIndex;
2125 return valueTupleIdx.get(valueReadIdx+1);
2126 }
2127 public int getReadIndex() {
2128 return readTupleIndex;
2129 }
2130
2131 public int getWriteIndex() {
2132 return writeTupleIndex;
2133 }
2134 public int getValue() {
2135 assert readTupleIndex < writeTupleIndex;
2136 assert valueReadIdx < values.size();
2137
2138 return values.get(valueReadIdx);
2139 }
2140 public byte[] getWord() {
2141 assert readTupleIndex < writeTupleIndex;
2142 return words[readTupleIndex];
2143 }
2144 public int getDocument() {
2145 assert readTupleIndex < writeTupleIndex;
2146 return documents[readTupleIndex];
2147 }
2148 public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
2149 while (getReadIndex() < endIndex) {
2150 output.processTuple(getWord(), getDocument());
2151 incrementTuple();
2152 }
2153 }
2154 public void copyUntilIndexValue(int endIndex, ShreddedProcessor output) throws IOException {
2155 while (getReadIndex() < endIndex) {
2156 output.processValue(getValue());
2157 assert getValueEndIndex() <= endIndex;
2158 copyTuples(getValueEndIndex(), output);
2159 incrementValue();
2160 }
2161 }
2162 public void copyUntilValue(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
2163 while (!isAtEnd()) {
2164 if (other != null) {
2165 assert !other.isAtEnd();
2166 int c = + Utility.compare(getValue(), other.getValue());
2167
2168 if (c > 0) {
2169 break;
2170 }
2171
2172 output.processValue(getValue());
2173
2174 copyTuples(getValueEndIndex(), output);
2175 } else {
2176 output.processValue(getValue());
2177 copyTuples(getValueEndIndex(), output);
2178 }
2179 incrementValue();
2180
2181
2182 }
2183 }
2184 public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
2185 copyUntilValue(other, output);
2186 }
2187
2188 }
2189 public static class ShreddedCombiner implements ReaderSource<DocumentNumberWordInteger>, ShreddedSource {
2190 public ShreddedProcessor processor;
2191 Collection<ShreddedReader> readers;
2192 boolean closeOnExit = false;
2193 boolean uninitialized = true;
2194 PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
2195
2196 public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
2197 this.readers = readers;
2198 this.closeOnExit = closeOnExit;
2199 }
2200
2201 public void setProcessor(Step processor) throws IncompatibleProcessorException {
2202 if (processor instanceof ShreddedProcessor) {
2203 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
2204 } else if (processor instanceof DocumentNumberWordInteger.Processor) {
2205 this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentNumberWordInteger.Processor) processor));
2206 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
2207 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentNumberWordInteger>) processor));
2208 } else {
2209 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
2210 }
2211 }
2212
2213 public Class<DocumentNumberWordInteger> getOutputClass() {
2214 return DocumentNumberWordInteger.class;
2215 }
2216
2217 public void initialize() throws IOException {
2218 for (ShreddedReader reader : readers) {
2219 reader.fill();
2220
2221 if (!reader.getBuffer().isAtEnd())
2222 queue.add(reader);
2223 }
2224
2225 uninitialized = false;
2226 }
2227
2228 public void run() throws IOException {
2229 initialize();
2230
2231 while (queue.size() > 0) {
2232 ShreddedReader top = queue.poll();
2233 ShreddedReader next = null;
2234 ShreddedBuffer nextBuffer = null;
2235
2236 assert !top.getBuffer().isAtEnd();
2237
2238 if (queue.size() > 0) {
2239 next = queue.peek();
2240 nextBuffer = next.getBuffer();
2241 assert !nextBuffer.isAtEnd();
2242 }
2243
2244 top.getBuffer().copyUntil(nextBuffer, processor);
2245 if (top.getBuffer().isAtEnd())
2246 top.fill();
2247
2248 if (!top.getBuffer().isAtEnd())
2249 queue.add(top);
2250 }
2251
2252 if (closeOnExit)
2253 processor.close();
2254 }
2255
2256 public DocumentNumberWordInteger read() throws IOException {
2257 if (uninitialized)
2258 initialize();
2259
2260 DocumentNumberWordInteger result = null;
2261
2262 while (queue.size() > 0) {
2263 ShreddedReader top = queue.poll();
2264 result = top.read();
2265
2266 if (result != null) {
2267 if (top.getBuffer().isAtEnd())
2268 top.fill();
2269
2270 queue.offer(top);
2271 break;
2272 }
2273 }
2274
2275 return result;
2276 }
2277 }
2278 public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<DocumentNumberWordInteger>, ShreddedSource {
2279 public ShreddedProcessor processor;
2280 ShreddedBuffer buffer;
2281 DocumentNumberWordInteger last = new DocumentNumberWordInteger();
2282 long updateValueCount = -1;
2283 long tupleCount = 0;
2284 long bufferStartCount = 0;
2285 ArrayInput input;
2286
2287 public ShreddedReader(ArrayInput input) {
2288 this.input = input;
2289 this.buffer = new ShreddedBuffer();
2290 }
2291
2292 public ShreddedReader(ArrayInput input, int bufferSize) {
2293 this.input = input;
2294 this.buffer = new ShreddedBuffer(bufferSize);
2295 }
2296
2297 public final int compareTo(ShreddedReader other) {
2298 ShreddedBuffer otherBuffer = other.getBuffer();
2299
2300 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
2301 return 0;
2302 } else if (buffer.isAtEnd()) {
2303 return -1;
2304 } else if (otherBuffer.isAtEnd()) {
2305 return 1;
2306 }
2307
2308 int result = 0;
2309 do {
2310 result = + Utility.compare(buffer.getValue(), otherBuffer.getValue());
2311 if(result != 0) break;
2312 } while (false);
2313
2314 return result;
2315 }
2316
2317 public final ShreddedBuffer getBuffer() {
2318 return buffer;
2319 }
2320
2321 public final DocumentNumberWordInteger read() throws IOException {
2322 if (buffer.isAtEnd()) {
2323 fill();
2324
2325 if (buffer.isAtEnd()) {
2326 return null;
2327 }
2328 }
2329
2330 assert !buffer.isAtEnd();
2331 DocumentNumberWordInteger result = new DocumentNumberWordInteger();
2332
2333 result.value = buffer.getValue();
2334 result.word = buffer.getWord();
2335 result.document = buffer.getDocument();
2336
2337 buffer.incrementTuple();
2338 buffer.autoIncrementValue();
2339
2340 return result;
2341 }
2342
2343 public final void fill() throws IOException {
2344 try {
2345 buffer.reset();
2346
2347 if (tupleCount != 0) {
2348
2349 if(updateValueCount - tupleCount > 0) {
2350 buffer.values.add(last.value);
2351 buffer.valueTupleIdx.add((int) (updateValueCount - tupleCount));
2352 }
2353 bufferStartCount = tupleCount;
2354 }
2355
2356 while (!buffer.isFull()) {
2357 updateValue();
2358 buffer.processTuple(input.readBytes(), input.readInt());
2359 tupleCount++;
2360 }
2361 } catch(EOFException e) {}
2362 }
2363
2364 public final void updateValue() throws IOException {
2365 if (updateValueCount > tupleCount)
2366 return;
2367
2368 last.value = input.readInt();
2369 updateValueCount = tupleCount + input.readInt();
2370
2371 buffer.processValue(last.value);
2372 }
2373
2374 public void run() throws IOException {
2375 while (true) {
2376 fill();
2377
2378 if (buffer.isAtEnd())
2379 break;
2380
2381 buffer.copyUntil(null, processor);
2382 }
2383 processor.close();
2384 }
2385
2386 public void setProcessor(Step processor) throws IncompatibleProcessorException {
2387 if (processor instanceof ShreddedProcessor) {
2388 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
2389 } else if (processor instanceof DocumentNumberWordInteger.Processor) {
2390 this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentNumberWordInteger.Processor) processor));
2391 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
2392 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentNumberWordInteger>) processor));
2393 } else {
2394 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
2395 }
2396 }
2397
2398 public Class<DocumentNumberWordInteger> getOutputClass() {
2399 return DocumentNumberWordInteger.class;
2400 }
2401 }
2402
2403 public static class DuplicateEliminator implements ShreddedProcessor {
2404 public ShreddedProcessor processor;
2405 DocumentNumberWordInteger last = new DocumentNumberWordInteger();
2406 boolean valueProcess = true;
2407
2408 public DuplicateEliminator() {}
2409 public DuplicateEliminator(ShreddedProcessor processor) {
2410 this.processor = processor;
2411 }
2412
2413 public void setShreddedProcessor(ShreddedProcessor processor) {
2414 this.processor = processor;
2415 }
2416
2417 public void processValue(int value) throws IOException {
2418 if (valueProcess || Utility.compare(value, last.value) != 0) {
2419 last.value = value;
2420 processor.processValue(value);
2421 valueProcess = false;
2422 }
2423 }
2424
2425 public void resetValue() {
2426 valueProcess = true;
2427 }
2428
2429 public void processTuple(byte[] word, int document) throws IOException {
2430 processor.processTuple(word, document);
2431 }
2432
2433 public void close() throws IOException {
2434 processor.close();
2435 }
2436 }
2437 public static class TupleUnshredder implements ShreddedProcessor {
2438 DocumentNumberWordInteger last = new DocumentNumberWordInteger();
2439 public org.galagosearch.tupleflow.Processor<DocumentNumberWordInteger> processor;
2440
2441 public TupleUnshredder(DocumentNumberWordInteger.Processor processor) {
2442 this.processor = processor;
2443 }
2444
2445 public TupleUnshredder(org.galagosearch.tupleflow.Processor<DocumentNumberWordInteger> processor) {
2446 this.processor = processor;
2447 }
2448
2449 public DocumentNumberWordInteger clone(DocumentNumberWordInteger object) {
2450 DocumentNumberWordInteger result = new DocumentNumberWordInteger();
2451 if (object == null) return result;
2452 result.word = object.word;
2453 result.document = object.document;
2454 result.value = object.value;
2455 return result;
2456 }
2457
2458 public void processValue(int value) throws IOException {
2459 last.value = value;
2460 }
2461
2462
2463 public void processTuple(byte[] word, int document) throws IOException {
2464 last.word = word;
2465 last.document = document;
2466 processor.process(clone(last));
2467 }
2468
2469 public void close() throws IOException {
2470 processor.close();
2471 }
2472 }
2473 public static class TupleShredder implements Processor {
2474 DocumentNumberWordInteger last = new DocumentNumberWordInteger();
2475 public ShreddedProcessor processor;
2476
2477 public TupleShredder(ShreddedProcessor processor) {
2478 this.processor = processor;
2479 }
2480
2481 public DocumentNumberWordInteger clone(DocumentNumberWordInteger object) {
2482 DocumentNumberWordInteger result = new DocumentNumberWordInteger();
2483 if (object == null) return result;
2484 result.word = object.word;
2485 result.document = object.document;
2486 result.value = object.value;
2487 return result;
2488 }
2489
2490 public void process(DocumentNumberWordInteger object) throws IOException {
2491 boolean processAll = false;
2492 if(last == null || Utility.compare(last.value, object.value) != 0 || processAll) { processor.processValue(object.value); processAll = true; }
2493 processor.processTuple(object.word, object.document);
2494 }
2495
2496 public Class<DocumentNumberWordInteger> getInputClass() {
2497 return DocumentNumberWordInteger.class;
2498 }
2499
2500 public void close() throws IOException {
2501 processor.close();
2502 }
2503 }
2504 }
2505 }