1
2
3 package org.galagosearch.core.types;
4
5 import org.galagosearch.tupleflow.Utility;
6 import org.galagosearch.tupleflow.ArrayInput;
7 import org.galagosearch.tupleflow.ArrayOutput;
8 import org.galagosearch.tupleflow.Order;
9 import org.galagosearch.tupleflow.OrderedWriter;
10 import org.galagosearch.tupleflow.Type;
11 import org.galagosearch.tupleflow.TypeReader;
12 import org.galagosearch.tupleflow.Step;
13 import org.galagosearch.tupleflow.IncompatibleProcessorException;
14 import org.galagosearch.tupleflow.ReaderSource;
15 import java.io.IOException;
16 import java.io.EOFException;
17 import java.io.UnsupportedEncodingException;
18 import java.util.ArrayList;
19 import java.util.Arrays;
20 import java.util.Comparator;
21 import java.util.PriorityQueue;
22 import java.util.Collection;
23
24 public class NumberWordPosition implements Type<NumberWordPosition> {
25 public int document;
26 public byte[] word;
27 public int position;
28
/** Creates a tuple with default field values: a null word and zero document and position. */
public NumberWordPosition() {
}

/**
 * Creates a tuple holding the given field values.
 * The word array is stored by reference; no copy is made.
 *
 * @param document value for the document field
 * @param word     value for the word field
 * @param position value for the position field
 */
public NumberWordPosition(int document, byte[] word, int position) {
    this.word = word;
    this.document = document;
    this.position = position;
}
35
36 public String toString() {
37 try {
38 return String.format("%d,%s,%d",
39 document, new String(word, "UTF-8"), position);
40 } catch(UnsupportedEncodingException e) {
41 throw new RuntimeException("Couldn't convert string to UTF-8.");
42 }
43 }
44
45 public Order<NumberWordPosition> getOrder(String... spec) {
46 if (Arrays.equals(spec, new String[] { "+word", "+document", "+position" })) {
47 return new WordDocumentPositionOrder();
48 }
49 return null;
50 }
51
52 public interface Processor extends Step, org.galagosearch.tupleflow.Processor<NumberWordPosition> {
53 public void process(NumberWordPosition object) throws IOException;
54 public void close() throws IOException;
55 }
56 public interface Source extends Step {
57 }
58 public static class WordDocumentPositionOrder implements Order<NumberWordPosition> {
59 public int hash(NumberWordPosition object) {
60 int h = 0;
61 h += Utility.hash(object.word);
62 h += Utility.hash(object.document);
63 h += Utility.hash(object.position);
64 return h;
65 }
66 public Comparator<NumberWordPosition> greaterThan() {
67 return new Comparator<NumberWordPosition>() {
68 public int compare(NumberWordPosition one, NumberWordPosition two) {
69 int result = 0;
70 do {
71 result = + Utility.compare(one.word, two.word);
72 if(result != 0) break;
73 result = + Utility.compare(one.document, two.document);
74 if(result != 0) break;
75 result = + Utility.compare(one.position, two.position);
76 if(result != 0) break;
77 } while (false);
78 return -result;
79 }
80 };
81 }
82 public Comparator<NumberWordPosition> lessThan() {
83 return new Comparator<NumberWordPosition>() {
84 public int compare(NumberWordPosition one, NumberWordPosition two) {
85 int result = 0;
86 do {
87 result = + Utility.compare(one.word, two.word);
88 if(result != 0) break;
89 result = + Utility.compare(one.document, two.document);
90 if(result != 0) break;
91 result = + Utility.compare(one.position, two.position);
92 if(result != 0) break;
93 } while (false);
94 return result;
95 }
96 };
97 }
98 public TypeReader<NumberWordPosition> orderedReader(ArrayInput _input) {
99 return new ShreddedReader(_input);
100 }
101
102 public TypeReader<NumberWordPosition> orderedReader(ArrayInput _input, int bufferSize) {
103 return new ShreddedReader(_input, bufferSize);
104 }
105 public OrderedWriter<NumberWordPosition> orderedWriter(ArrayOutput _output) {
106 ShreddedWriter w = new ShreddedWriter(_output);
107 return new OrderedWriterClass(w);
108 }
109 public static class OrderedWriterClass extends OrderedWriter< NumberWordPosition > {
110 NumberWordPosition last = null;
111 ShreddedWriter shreddedWriter = null;
112
113 public OrderedWriterClass(ShreddedWriter s) {
114 this.shreddedWriter = s;
115 }
116
117 public void process(NumberWordPosition object) throws IOException {
118 boolean processAll = false;
119 if (processAll || last == null || 0 != Utility.compare(object.word, last.word)) { processAll = true; shreddedWriter.processWord(object.word); }
120 if (processAll || last == null || 0 != Utility.compare(object.document, last.document)) { processAll = true; shreddedWriter.processDocument(object.document); }
121 if (processAll || last == null || 0 != Utility.compare(object.position, last.position)) { processAll = true; shreddedWriter.processPosition(object.position); }
122 shreddedWriter.processTuple();
123 last = object;
124 }
125
126 public void close() throws IOException {
127 shreddedWriter.close();
128 }
129
130 public Class<NumberWordPosition> getInputClass() {
131 return NumberWordPosition.class;
132 }
133 }
134 public ReaderSource<NumberWordPosition> orderedCombiner(Collection<TypeReader<NumberWordPosition>> readers, boolean closeOnExit) {
135 ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
136
137 for (TypeReader<NumberWordPosition> reader : readers) {
138 shreddedReaders.add((ShreddedReader)reader);
139 }
140
141 return new ShreddedCombiner(shreddedReaders, closeOnExit);
142 }
143 public NumberWordPosition clone(NumberWordPosition object) {
144 NumberWordPosition result = new NumberWordPosition();
145 if (object == null) return result;
146 result.document = object.document;
147 result.word = object.word;
148 result.position = object.position;
149 return result;
150 }
151 public Class<NumberWordPosition> getOrderedClass() {
152 return NumberWordPosition.class;
153 }
154 public String[] getOrderSpec() {
155 return new String[] {"+word", "+document", "+position"};
156 }
157
158 public static String getSpecString() {
159 return "+word +document +position";
160 }
161
162 public interface ShreddedProcessor extends Step {
163 public void processWord(byte[] word) throws IOException;
164 public void processDocument(int document) throws IOException;
165 public void processPosition(int position) throws IOException;
166 public void processTuple() throws IOException;
167 public void close() throws IOException;
168 }
169 public interface ShreddedSource extends Step {
170 }
171
172 public static class ShreddedWriter implements ShreddedProcessor {
173 ArrayOutput output;
174 ShreddedBuffer buffer = new ShreddedBuffer();
175 byte[] lastWord;
176 int lastDocument;
177 int lastPosition;
178 boolean lastFlush = false;
179
180 public ShreddedWriter(ArrayOutput output) {
181 this.output = output;
182 }
183
184 public void close() throws IOException {
185 flush();
186 }
187
188 public void processWord(byte[] word) {
189 lastWord = word;
190 buffer.processWord(word);
191 }
192 public void processDocument(int document) {
193 lastDocument = document;
194 buffer.processDocument(document);
195 }
196 public void processPosition(int position) {
197 lastPosition = position;
198 buffer.processPosition(position);
199 }
200 public final void processTuple() throws IOException {
201 if (lastFlush) {
202 if(buffer.words.size() == 0) buffer.processWord(lastWord);
203 if(buffer.documents.size() == 0) buffer.processDocument(lastDocument);
204 if(buffer.positions.size() == 0) buffer.processPosition(lastPosition);
205 lastFlush = false;
206 }
207 buffer.processTuple();
208 if (buffer.isFull())
209 flush();
210 }
211 public final void flushTuples(int pauseIndex) throws IOException {
212
213 while (buffer.getReadIndex() < pauseIndex) {
214
215 buffer.incrementTuple();
216 }
217 }
218 public final void flushWord(int pauseIndex) throws IOException {
219 while (buffer.getReadIndex() < pauseIndex) {
220 int nextPause = buffer.getWordEndIndex();
221 int count = nextPause - buffer.getReadIndex();
222
223 output.writeBytes(buffer.getWord());
224 output.writeInt(count);
225 buffer.incrementWord();
226
227 flushDocument(nextPause);
228 assert nextPause == buffer.getReadIndex();
229 }
230 }
231 public final void flushDocument(int pauseIndex) throws IOException {
232 while (buffer.getReadIndex() < pauseIndex) {
233 int nextPause = buffer.getDocumentEndIndex();
234 int count = nextPause - buffer.getReadIndex();
235
236 output.writeInt(buffer.getDocument());
237 output.writeInt(count);
238 buffer.incrementDocument();
239
240 flushPosition(nextPause);
241 assert nextPause == buffer.getReadIndex();
242 }
243 }
244 public final void flushPosition(int pauseIndex) throws IOException {
245 while (buffer.getReadIndex() < pauseIndex) {
246 int nextPause = buffer.getPositionEndIndex();
247 int count = nextPause - buffer.getReadIndex();
248
249 output.writeInt(buffer.getPosition());
250 output.writeInt(count);
251 buffer.incrementPosition();
252
253 flushTuples(nextPause);
254 assert nextPause == buffer.getReadIndex();
255 }
256 }
257 public void flush() throws IOException {
258 flushWord(buffer.getWriteIndex());
259 buffer.reset();
260 lastFlush = true;
261 }
262 }
263 public static class ShreddedBuffer {
264 ArrayList<byte[]> words = new ArrayList();
265 ArrayList<Integer> documents = new ArrayList();
266 ArrayList<Integer> positions = new ArrayList();
267 ArrayList<Integer> wordTupleIdx = new ArrayList();
268 ArrayList<Integer> documentTupleIdx = new ArrayList();
269 ArrayList<Integer> positionTupleIdx = new ArrayList();
270 int wordReadIdx = 0;
271 int documentReadIdx = 0;
272 int positionReadIdx = 0;
273
274 int writeTupleIndex = 0;
275 int readTupleIndex = 0;
276 int batchSize;
277
278 public ShreddedBuffer(int batchSize) {
279 this.batchSize = batchSize;
280
281 }
282
283 public ShreddedBuffer() {
284 this(10000);
285 }
286
287 public void processWord(byte[] word) {
288 words.add(word);
289 wordTupleIdx.add(writeTupleIndex);
290 }
291 public void processDocument(int document) {
292 documents.add(document);
293 documentTupleIdx.add(writeTupleIndex);
294 }
295 public void processPosition(int position) {
296 positions.add(position);
297 positionTupleIdx.add(writeTupleIndex);
298 }
299 public void processTuple() {
300 assert words.size() > 0;
301 assert documents.size() > 0;
302 assert positions.size() > 0;
303 writeTupleIndex++;
304 }
305 public void resetData() {
306 words.clear();
307 documents.clear();
308 positions.clear();
309 wordTupleIdx.clear();
310 documentTupleIdx.clear();
311 positionTupleIdx.clear();
312 writeTupleIndex = 0;
313 }
314
315 public void resetRead() {
316 readTupleIndex = 0;
317 wordReadIdx = 0;
318 documentReadIdx = 0;
319 positionReadIdx = 0;
320 }
321
322 public void reset() {
323 resetData();
324 resetRead();
325 }
326 public boolean isFull() {
327 return writeTupleIndex >= batchSize;
328 }
329
330 public boolean isEmpty() {
331 return writeTupleIndex == 0;
332 }
333
334 public boolean isAtEnd() {
335 return readTupleIndex >= writeTupleIndex;
336 }
337 public void incrementWord() {
338 wordReadIdx++;
339 }
340
341 public void autoIncrementWord() {
342 while (readTupleIndex >= getWordEndIndex() && readTupleIndex < writeTupleIndex)
343 wordReadIdx++;
344 }
345 public void incrementDocument() {
346 documentReadIdx++;
347 }
348
349 public void autoIncrementDocument() {
350 while (readTupleIndex >= getDocumentEndIndex() && readTupleIndex < writeTupleIndex)
351 documentReadIdx++;
352 }
353 public void incrementPosition() {
354 positionReadIdx++;
355 }
356
357 public void autoIncrementPosition() {
358 while (readTupleIndex >= getPositionEndIndex() && readTupleIndex < writeTupleIndex)
359 positionReadIdx++;
360 }
361 public void incrementTuple() {
362 readTupleIndex++;
363 }
364 public int getWordEndIndex() {
365 if ((wordReadIdx+1) >= wordTupleIdx.size())
366 return writeTupleIndex;
367 return wordTupleIdx.get(wordReadIdx+1);
368 }
369
370 public int getDocumentEndIndex() {
371 if ((documentReadIdx+1) >= documentTupleIdx.size())
372 return writeTupleIndex;
373 return documentTupleIdx.get(documentReadIdx+1);
374 }
375
376 public int getPositionEndIndex() {
377 if ((positionReadIdx+1) >= positionTupleIdx.size())
378 return writeTupleIndex;
379 return positionTupleIdx.get(positionReadIdx+1);
380 }
381 public int getReadIndex() {
382 return readTupleIndex;
383 }
384
385 public int getWriteIndex() {
386 return writeTupleIndex;
387 }
388 public byte[] getWord() {
389 assert readTupleIndex < writeTupleIndex;
390 assert wordReadIdx < words.size();
391
392 return words.get(wordReadIdx);
393 }
394 public int getDocument() {
395 assert readTupleIndex < writeTupleIndex;
396 assert documentReadIdx < documents.size();
397
398 return documents.get(documentReadIdx);
399 }
400 public int getPosition() {
401 assert readTupleIndex < writeTupleIndex;
402 assert positionReadIdx < positions.size();
403
404 return positions.get(positionReadIdx);
405 }
406
407 public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
408 while (getReadIndex() < endIndex) {
409 output.processTuple();
410 incrementTuple();
411 }
412 }
413 public void copyUntilIndexWord(int endIndex, ShreddedProcessor output) throws IOException {
414 while (getReadIndex() < endIndex) {
415 output.processWord(getWord());
416 assert getWordEndIndex() <= endIndex;
417 copyUntilIndexDocument(getWordEndIndex(), output);
418 incrementWord();
419 }
420 }
421 public void copyUntilIndexDocument(int endIndex, ShreddedProcessor output) throws IOException {
422 while (getReadIndex() < endIndex) {
423 output.processDocument(getDocument());
424 assert getDocumentEndIndex() <= endIndex;
425 copyUntilIndexPosition(getDocumentEndIndex(), output);
426 incrementDocument();
427 }
428 }
429 public void copyUntilIndexPosition(int endIndex, ShreddedProcessor output) throws IOException {
430 while (getReadIndex() < endIndex) {
431 output.processPosition(getPosition());
432 assert getPositionEndIndex() <= endIndex;
433 copyTuples(getPositionEndIndex(), output);
434 incrementPosition();
435 }
436 }
437 public void copyUntilWord(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
438 while (!isAtEnd()) {
439 if (other != null) {
440 assert !other.isAtEnd();
441 int c = + Utility.compare(getWord(), other.getWord());
442
443 if (c > 0) {
444 break;
445 }
446
447 output.processWord(getWord());
448
449 if (c < 0) {
450 copyUntilIndexDocument(getWordEndIndex(), output);
451 } else if (c == 0) {
452 copyUntilDocument(other, output);
453 autoIncrementWord();
454 break;
455 }
456 } else {
457 output.processWord(getWord());
458 copyUntilIndexDocument(getWordEndIndex(), output);
459 }
460 incrementWord();
461
462
463 }
464 }
465 public void copyUntilDocument(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
466 while (!isAtEnd()) {
467 if (other != null) {
468 assert !other.isAtEnd();
469 int c = + Utility.compare(getDocument(), other.getDocument());
470
471 if (c > 0) {
472 break;
473 }
474
475 output.processDocument(getDocument());
476
477 if (c < 0) {
478 copyUntilIndexPosition(getDocumentEndIndex(), output);
479 } else if (c == 0) {
480 copyUntilPosition(other, output);
481 autoIncrementDocument();
482 break;
483 }
484 } else {
485 output.processDocument(getDocument());
486 copyUntilIndexPosition(getDocumentEndIndex(), output);
487 }
488 incrementDocument();
489
490 if (getWordEndIndex() <= readTupleIndex)
491 break;
492 }
493 }
494 public void copyUntilPosition(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
495 while (!isAtEnd()) {
496 if (other != null) {
497 assert !other.isAtEnd();
498 int c = + Utility.compare(getPosition(), other.getPosition());
499
500 if (c > 0) {
501 break;
502 }
503
504 output.processPosition(getPosition());
505
506 copyTuples(getPositionEndIndex(), output);
507 } else {
508 output.processPosition(getPosition());
509 copyTuples(getPositionEndIndex(), output);
510 }
511 incrementPosition();
512
513 if (getDocumentEndIndex() <= readTupleIndex)
514 break;
515 }
516 }
517 public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
518 copyUntilWord(other, output);
519 }
520
521 }
522 public static class ShreddedCombiner implements ReaderSource<NumberWordPosition>, ShreddedSource {
523 public ShreddedProcessor processor;
524 Collection<ShreddedReader> readers;
525 boolean closeOnExit = false;
526 boolean uninitialized = true;
527 PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
528
529 public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
530 this.readers = readers;
531 this.closeOnExit = closeOnExit;
532 }
533
534 public void setProcessor(Step processor) throws IncompatibleProcessorException {
535 if (processor instanceof ShreddedProcessor) {
536 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
537 } else if (processor instanceof NumberWordPosition.Processor) {
538 this.processor = new DuplicateEliminator(new TupleUnshredder((NumberWordPosition.Processor) processor));
539 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
540 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<NumberWordPosition>) processor));
541 } else {
542 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
543 }
544 }
545
546 public Class<NumberWordPosition> getOutputClass() {
547 return NumberWordPosition.class;
548 }
549
550 public void initialize() throws IOException {
551 for (ShreddedReader reader : readers) {
552 reader.fill();
553
554 if (!reader.getBuffer().isAtEnd())
555 queue.add(reader);
556 }
557
558 uninitialized = false;
559 }
560
561 public void run() throws IOException {
562 initialize();
563
564 while (queue.size() > 0) {
565 ShreddedReader top = queue.poll();
566 ShreddedReader next = null;
567 ShreddedBuffer nextBuffer = null;
568
569 assert !top.getBuffer().isAtEnd();
570
571 if (queue.size() > 0) {
572 next = queue.peek();
573 nextBuffer = next.getBuffer();
574 assert !nextBuffer.isAtEnd();
575 }
576
577 top.getBuffer().copyUntil(nextBuffer, processor);
578 if (top.getBuffer().isAtEnd())
579 top.fill();
580
581 if (!top.getBuffer().isAtEnd())
582 queue.add(top);
583 }
584
585 if (closeOnExit)
586 processor.close();
587 }
588
589 public NumberWordPosition read() throws IOException {
590 if (uninitialized)
591 initialize();
592
593 NumberWordPosition result = null;
594
595 while (queue.size() > 0) {
596 ShreddedReader top = queue.poll();
597 result = top.read();
598
599 if (result != null) {
600 if (top.getBuffer().isAtEnd())
601 top.fill();
602
603 queue.offer(top);
604 break;
605 }
606 }
607
608 return result;
609 }
610 }
611 public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<NumberWordPosition>, ShreddedSource {
612 public ShreddedProcessor processor;
613 ShreddedBuffer buffer;
614 NumberWordPosition last = new NumberWordPosition();
615 long updateWordCount = -1;
616 long updateDocumentCount = -1;
617 long updatePositionCount = -1;
618 long tupleCount = 0;
619 long bufferStartCount = 0;
620 ArrayInput input;
621
622 public ShreddedReader(ArrayInput input) {
623 this.input = input;
624 this.buffer = new ShreddedBuffer();
625 }
626
627 public ShreddedReader(ArrayInput input, int bufferSize) {
628 this.input = input;
629 this.buffer = new ShreddedBuffer(bufferSize);
630 }
631
632 public final int compareTo(ShreddedReader other) {
633 ShreddedBuffer otherBuffer = other.getBuffer();
634
635 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
636 return 0;
637 } else if (buffer.isAtEnd()) {
638 return -1;
639 } else if (otherBuffer.isAtEnd()) {
640 return 1;
641 }
642
643 int result = 0;
644 do {
645 result = + Utility.compare(buffer.getWord(), otherBuffer.getWord());
646 if(result != 0) break;
647 result = + Utility.compare(buffer.getDocument(), otherBuffer.getDocument());
648 if(result != 0) break;
649 result = + Utility.compare(buffer.getPosition(), otherBuffer.getPosition());
650 if(result != 0) break;
651 } while (false);
652
653 return result;
654 }
655
656 public final ShreddedBuffer getBuffer() {
657 return buffer;
658 }
659
660 public final NumberWordPosition read() throws IOException {
661 if (buffer.isAtEnd()) {
662 fill();
663
664 if (buffer.isAtEnd()) {
665 return null;
666 }
667 }
668
669 assert !buffer.isAtEnd();
670 NumberWordPosition result = new NumberWordPosition();
671
672 result.word = buffer.getWord();
673 result.document = buffer.getDocument();
674 result.position = buffer.getPosition();
675
676 buffer.incrementTuple();
677 buffer.autoIncrementWord();
678 buffer.autoIncrementDocument();
679 buffer.autoIncrementPosition();
680
681 return result;
682 }
683
684 public final void fill() throws IOException {
685 try {
686 buffer.reset();
687
688 if (tupleCount != 0) {
689
690 if(updateWordCount - tupleCount > 0) {
691 buffer.words.add(last.word);
692 buffer.wordTupleIdx.add((int) (updateWordCount - tupleCount));
693 }
694 if(updateDocumentCount - tupleCount > 0) {
695 buffer.documents.add(last.document);
696 buffer.documentTupleIdx.add((int) (updateDocumentCount - tupleCount));
697 }
698 if(updatePositionCount - tupleCount > 0) {
699 buffer.positions.add(last.position);
700 buffer.positionTupleIdx.add((int) (updatePositionCount - tupleCount));
701 }
702 bufferStartCount = tupleCount;
703 }
704
705 while (!buffer.isFull()) {
706 updatePosition();
707 buffer.processTuple();
708 tupleCount++;
709 }
710 } catch(EOFException e) {}
711 }
712
713 public final void updateWord() throws IOException {
714 if (updateWordCount > tupleCount)
715 return;
716
717 last.word = input.readBytes();
718 updateWordCount = tupleCount + input.readInt();
719
720 buffer.processWord(last.word);
721 }
722 public final void updateDocument() throws IOException {
723 if (updateDocumentCount > tupleCount)
724 return;
725
726 updateWord();
727 last.document = input.readInt();
728 updateDocumentCount = tupleCount + input.readInt();
729
730 buffer.processDocument(last.document);
731 }
732 public final void updatePosition() throws IOException {
733 if (updatePositionCount > tupleCount)
734 return;
735
736 updateDocument();
737 last.position = input.readInt();
738 updatePositionCount = tupleCount + input.readInt();
739
740 buffer.processPosition(last.position);
741 }
742
743 public void run() throws IOException {
744 while (true) {
745 fill();
746
747 if (buffer.isAtEnd())
748 break;
749
750 buffer.copyUntil(null, processor);
751 }
752 processor.close();
753 }
754
755 public void setProcessor(Step processor) throws IncompatibleProcessorException {
756 if (processor instanceof ShreddedProcessor) {
757 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
758 } else if (processor instanceof NumberWordPosition.Processor) {
759 this.processor = new DuplicateEliminator(new TupleUnshredder((NumberWordPosition.Processor) processor));
760 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
761 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<NumberWordPosition>) processor));
762 } else {
763 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
764 }
765 }
766
767 public Class<NumberWordPosition> getOutputClass() {
768 return NumberWordPosition.class;
769 }
770 }
771
772 public static class DuplicateEliminator implements ShreddedProcessor {
773 public ShreddedProcessor processor;
774 NumberWordPosition last = new NumberWordPosition();
775 boolean wordProcess = true;
776 boolean documentProcess = true;
777 boolean positionProcess = true;
778
779 public DuplicateEliminator() {}
780 public DuplicateEliminator(ShreddedProcessor processor) {
781 this.processor = processor;
782 }
783
784 public void setShreddedProcessor(ShreddedProcessor processor) {
785 this.processor = processor;
786 }
787
788 public void processWord(byte[] word) throws IOException {
789 if (wordProcess || Utility.compare(word, last.word) != 0) {
790 last.word = word;
791 processor.processWord(word);
792 resetDocument();
793 wordProcess = false;
794 }
795 }
796 public void processDocument(int document) throws IOException {
797 if (documentProcess || Utility.compare(document, last.document) != 0) {
798 last.document = document;
799 processor.processDocument(document);
800 resetPosition();
801 documentProcess = false;
802 }
803 }
804 public void processPosition(int position) throws IOException {
805 if (positionProcess || Utility.compare(position, last.position) != 0) {
806 last.position = position;
807 processor.processPosition(position);
808 positionProcess = false;
809 }
810 }
811
812 public void resetWord() {
813 wordProcess = true;
814 resetDocument();
815 }
816 public void resetDocument() {
817 documentProcess = true;
818 resetPosition();
819 }
820 public void resetPosition() {
821 positionProcess = true;
822 }
823
824 public void processTuple() throws IOException {
825 processor.processTuple();
826 }
827
828 public void close() throws IOException {
829 processor.close();
830 }
831 }
832 public static class TupleUnshredder implements ShreddedProcessor {
833 NumberWordPosition last = new NumberWordPosition();
834 public org.galagosearch.tupleflow.Processor<NumberWordPosition> processor;
835
836 public TupleUnshredder(NumberWordPosition.Processor processor) {
837 this.processor = processor;
838 }
839
840 public TupleUnshredder(org.galagosearch.tupleflow.Processor<NumberWordPosition> processor) {
841 this.processor = processor;
842 }
843
844 public NumberWordPosition clone(NumberWordPosition object) {
845 NumberWordPosition result = new NumberWordPosition();
846 if (object == null) return result;
847 result.document = object.document;
848 result.word = object.word;
849 result.position = object.position;
850 return result;
851 }
852
853 public void processWord(byte[] word) throws IOException {
854 last.word = word;
855 }
856
857 public void processDocument(int document) throws IOException {
858 last.document = document;
859 }
860
861 public void processPosition(int position) throws IOException {
862 last.position = position;
863 }
864
865
866 public void processTuple() throws IOException {
867 processor.process(clone(last));
868 }
869
870 public void close() throws IOException {
871 processor.close();
872 }
873 }
874 public static class TupleShredder implements Processor {
875 NumberWordPosition last = new NumberWordPosition();
876 public ShreddedProcessor processor;
877
878 public TupleShredder(ShreddedProcessor processor) {
879 this.processor = processor;
880 }
881
882 public NumberWordPosition clone(NumberWordPosition object) {
883 NumberWordPosition result = new NumberWordPosition();
884 if (object == null) return result;
885 result.document = object.document;
886 result.word = object.word;
887 result.position = object.position;
888 return result;
889 }
890
891 public void process(NumberWordPosition object) throws IOException {
892 boolean processAll = false;
893 if(last == null || Utility.compare(last.word, object.word) != 0 || processAll) { processor.processWord(object.word); processAll = true; }
894 if(last == null || Utility.compare(last.document, object.document) != 0 || processAll) { processor.processDocument(object.document); processAll = true; }
895 if(last == null || Utility.compare(last.position, object.position) != 0 || processAll) { processor.processPosition(object.position); processAll = true; }
896 processor.processTuple();
897 }
898
899 public Class<NumberWordPosition> getInputClass() {
900 return NumberWordPosition.class;
901 }
902
903 public void close() throws IOException {
904 processor.close();
905 }
906 }
907 }
908 }