1
2
3 package org.galagosearch.core.types;
4
5 import org.galagosearch.tupleflow.Utility;
6 import org.galagosearch.tupleflow.ArrayInput;
7 import org.galagosearch.tupleflow.ArrayOutput;
8 import org.galagosearch.tupleflow.Order;
9 import org.galagosearch.tupleflow.OrderedWriter;
10 import org.galagosearch.tupleflow.Type;
11 import org.galagosearch.tupleflow.TypeReader;
12 import org.galagosearch.tupleflow.Step;
13 import org.galagosearch.tupleflow.IncompatibleProcessorException;
14 import org.galagosearch.tupleflow.ReaderSource;
15 import java.io.IOException;
16 import java.io.EOFException;
17 import java.io.UnsupportedEncodingException;
18 import java.util.ArrayList;
19 import java.util.Arrays;
20 import java.util.Comparator;
21 import java.util.PriorityQueue;
22 import java.util.Collection;
23
24 public class DocumentWordPosition implements Type<DocumentWordPosition> {
/** Document field; the only key of {@code DocumentOrder} and the first key of {@code DocumentWordPositionOrder}. */
public String document;
/** Word as raw bytes; {@code toString()} decodes it as UTF-8. */
public byte[] word;
/** Position field of the tuple. */
public int position;

/** Creates a tuple whose fields all keep their Java default values. */
public DocumentWordPosition() {
}

/**
 * Creates a fully populated tuple. The {@code word} array is stored by reference, not copied.
 *
 * @param document value for {@link #document}
 * @param word value for {@link #word}
 * @param position value for {@link #position}
 */
public DocumentWordPosition(String document, byte[] word, int position) {
    this.position = position;
    this.word = word;
    this.document = document;
}
35
36 public String toString() {
37 try {
38 return String.format("%s,%s,%d",
39 document, new String(word, "UTF-8"), position);
40 } catch(UnsupportedEncodingException e) {
41 throw new RuntimeException("Couldn't convert string to UTF-8.");
42 }
43 }
44
45 public Order<DocumentWordPosition> getOrder(String... spec) {
46 if (Arrays.equals(spec, new String[] { "+document" })) {
47 return new DocumentOrder();
48 }
49 if (Arrays.equals(spec, new String[] { "+document", "+word", "+position" })) {
50 return new DocumentWordPositionOrder();
51 }
52 return null;
53 }
54
55 public interface Processor extends Step, org.galagosearch.tupleflow.Processor<DocumentWordPosition> {
56 public void process(DocumentWordPosition object) throws IOException;
57 public void close() throws IOException;
58 }
59 public interface Source extends Step {
60 }
61 public static class DocumentOrder implements Order<DocumentWordPosition> {
62 public int hash(DocumentWordPosition object) {
63 int h = 0;
64 h += Utility.hash(object.document);
65 return h;
66 }
67 public Comparator<DocumentWordPosition> greaterThan() {
68 return new Comparator<DocumentWordPosition>() {
69 public int compare(DocumentWordPosition one, DocumentWordPosition two) {
70 int result = 0;
71 do {
72 result = + Utility.compare(one.document, two.document);
73 if(result != 0) break;
74 } while (false);
75 return -result;
76 }
77 };
78 }
79 public Comparator<DocumentWordPosition> lessThan() {
80 return new Comparator<DocumentWordPosition>() {
81 public int compare(DocumentWordPosition one, DocumentWordPosition two) {
82 int result = 0;
83 do {
84 result = + Utility.compare(one.document, two.document);
85 if(result != 0) break;
86 } while (false);
87 return result;
88 }
89 };
90 }
91 public TypeReader<DocumentWordPosition> orderedReader(ArrayInput _input) {
92 return new ShreddedReader(_input);
93 }
94
95 public TypeReader<DocumentWordPosition> orderedReader(ArrayInput _input, int bufferSize) {
96 return new ShreddedReader(_input, bufferSize);
97 }
98 public OrderedWriter<DocumentWordPosition> orderedWriter(ArrayOutput _output) {
99 ShreddedWriter w = new ShreddedWriter(_output);
100 return new OrderedWriterClass(w);
101 }
102 public static class OrderedWriterClass extends OrderedWriter< DocumentWordPosition > {
103 DocumentWordPosition last = null;
104 ShreddedWriter shreddedWriter = null;
105
106 public OrderedWriterClass(ShreddedWriter s) {
107 this.shreddedWriter = s;
108 }
109
110 public void process(DocumentWordPosition object) throws IOException {
111 boolean processAll = false;
112 if (processAll || last == null || 0 != Utility.compare(object.document, last.document)) { processAll = true; shreddedWriter.processDocument(object.document); }
113 shreddedWriter.processTuple(object.word, object.position);
114 last = object;
115 }
116
117 public void close() throws IOException {
118 shreddedWriter.close();
119 }
120
121 public Class<DocumentWordPosition> getInputClass() {
122 return DocumentWordPosition.class;
123 }
124 }
125 public ReaderSource<DocumentWordPosition> orderedCombiner(Collection<TypeReader<DocumentWordPosition>> readers, boolean closeOnExit) {
126 ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
127
128 for (TypeReader<DocumentWordPosition> reader : readers) {
129 shreddedReaders.add((ShreddedReader)reader);
130 }
131
132 return new ShreddedCombiner(shreddedReaders, closeOnExit);
133 }
134 public DocumentWordPosition clone(DocumentWordPosition object) {
135 DocumentWordPosition result = new DocumentWordPosition();
136 if (object == null) return result;
137 result.document = object.document;
138 result.word = object.word;
139 result.position = object.position;
140 return result;
141 }
142 public Class<DocumentWordPosition> getOrderedClass() {
143 return DocumentWordPosition.class;
144 }
145 public String[] getOrderSpec() {
146 return new String[] {"+document"};
147 }
148
149 public static String getSpecString() {
150 return "+document";
151 }
152
153 public interface ShreddedProcessor extends Step {
154 public void processDocument(String document) throws IOException;
155 public void processTuple(byte[] word, int position) throws IOException;
156 public void close() throws IOException;
157 }
158 public interface ShreddedSource extends Step {
159 }
160
161 public static class ShreddedWriter implements ShreddedProcessor {
162 ArrayOutput output;
163 ShreddedBuffer buffer = new ShreddedBuffer();
164 String lastDocument;
165 boolean lastFlush = false;
166
167 public ShreddedWriter(ArrayOutput output) {
168 this.output = output;
169 }
170
171 public void close() throws IOException {
172 flush();
173 }
174
175 public void processDocument(String document) {
176 lastDocument = document;
177 buffer.processDocument(document);
178 }
179 public final void processTuple(byte[] word, int position) throws IOException {
180 if (lastFlush) {
181 if(buffer.documents.size() == 0) buffer.processDocument(lastDocument);
182 lastFlush = false;
183 }
184 buffer.processTuple(word, position);
185 if (buffer.isFull())
186 flush();
187 }
188 public final void flushTuples(int pauseIndex) throws IOException {
189
190 while (buffer.getReadIndex() < pauseIndex) {
191
192 output.writeBytes(buffer.getWord());
193 output.writeInt(buffer.getPosition());
194 buffer.incrementTuple();
195 }
196 }
197 public final void flushDocument(int pauseIndex) throws IOException {
198 while (buffer.getReadIndex() < pauseIndex) {
199 int nextPause = buffer.getDocumentEndIndex();
200 int count = nextPause - buffer.getReadIndex();
201
202 output.writeString(buffer.getDocument());
203 output.writeInt(count);
204 buffer.incrementDocument();
205
206 flushTuples(nextPause);
207 assert nextPause == buffer.getReadIndex();
208 }
209 }
210 public void flush() throws IOException {
211 flushDocument(buffer.getWriteIndex());
212 buffer.reset();
213 lastFlush = true;
214 }
215 }
216 public static class ShreddedBuffer {
217 ArrayList<String> documents = new ArrayList();
218 ArrayList<Integer> documentTupleIdx = new ArrayList();
219 int documentReadIdx = 0;
220
221 byte[][] words;
222 int[] positions;
223 int writeTupleIndex = 0;
224 int readTupleIndex = 0;
225 int batchSize;
226
227 public ShreddedBuffer(int batchSize) {
228 this.batchSize = batchSize;
229
230 words = new byte[batchSize][];
231 positions = new int[batchSize];
232 }
233
234 public ShreddedBuffer() {
235 this(10000);
236 }
237
238 public void processDocument(String document) {
239 documents.add(document);
240 documentTupleIdx.add(writeTupleIndex);
241 }
242 public void processTuple(byte[] word, int position) {
243 assert documents.size() > 0;
244 words[writeTupleIndex] = word;
245 positions[writeTupleIndex] = position;
246 writeTupleIndex++;
247 }
248 public void resetData() {
249 documents.clear();
250 documentTupleIdx.clear();
251 writeTupleIndex = 0;
252 }
253
254 public void resetRead() {
255 readTupleIndex = 0;
256 documentReadIdx = 0;
257 }
258
259 public void reset() {
260 resetData();
261 resetRead();
262 }
263 public boolean isFull() {
264 return writeTupleIndex >= batchSize;
265 }
266
267 public boolean isEmpty() {
268 return writeTupleIndex == 0;
269 }
270
271 public boolean isAtEnd() {
272 return readTupleIndex >= writeTupleIndex;
273 }
274 public void incrementDocument() {
275 documentReadIdx++;
276 }
277
278 public void autoIncrementDocument() {
279 while (readTupleIndex >= getDocumentEndIndex() && readTupleIndex < writeTupleIndex)
280 documentReadIdx++;
281 }
282 public void incrementTuple() {
283 readTupleIndex++;
284 }
285 public int getDocumentEndIndex() {
286 if ((documentReadIdx+1) >= documentTupleIdx.size())
287 return writeTupleIndex;
288 return documentTupleIdx.get(documentReadIdx+1);
289 }
290 public int getReadIndex() {
291 return readTupleIndex;
292 }
293
294 public int getWriteIndex() {
295 return writeTupleIndex;
296 }
297 public String getDocument() {
298 assert readTupleIndex < writeTupleIndex;
299 assert documentReadIdx < documents.size();
300
301 return documents.get(documentReadIdx);
302 }
303 public byte[] getWord() {
304 assert readTupleIndex < writeTupleIndex;
305 return words[readTupleIndex];
306 }
307 public int getPosition() {
308 assert readTupleIndex < writeTupleIndex;
309 return positions[readTupleIndex];
310 }
311 public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
312 while (getReadIndex() < endIndex) {
313 output.processTuple(getWord(), getPosition());
314 incrementTuple();
315 }
316 }
317 public void copyUntilIndexDocument(int endIndex, ShreddedProcessor output) throws IOException {
318 while (getReadIndex() < endIndex) {
319 output.processDocument(getDocument());
320 assert getDocumentEndIndex() <= endIndex;
321 copyTuples(getDocumentEndIndex(), output);
322 incrementDocument();
323 }
324 }
325 public void copyUntilDocument(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
326 while (!isAtEnd()) {
327 if (other != null) {
328 assert !other.isAtEnd();
329 int c = + Utility.compare(getDocument(), other.getDocument());
330
331 if (c > 0) {
332 break;
333 }
334
335 output.processDocument(getDocument());
336
337 copyTuples(getDocumentEndIndex(), output);
338 } else {
339 output.processDocument(getDocument());
340 copyTuples(getDocumentEndIndex(), output);
341 }
342 incrementDocument();
343
344
345 }
346 }
347 public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
348 copyUntilDocument(other, output);
349 }
350
351 }
352 public static class ShreddedCombiner implements ReaderSource<DocumentWordPosition>, ShreddedSource {
353 public ShreddedProcessor processor;
354 Collection<ShreddedReader> readers;
355 boolean closeOnExit = false;
356 boolean uninitialized = true;
357 PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
358
359 public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
360 this.readers = readers;
361 this.closeOnExit = closeOnExit;
362 }
363
364 public void setProcessor(Step processor) throws IncompatibleProcessorException {
365 if (processor instanceof ShreddedProcessor) {
366 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
367 } else if (processor instanceof DocumentWordPosition.Processor) {
368 this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentWordPosition.Processor) processor));
369 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
370 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentWordPosition>) processor));
371 } else {
372 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
373 }
374 }
375
376 public Class<DocumentWordPosition> getOutputClass() {
377 return DocumentWordPosition.class;
378 }
379
380 public void initialize() throws IOException {
381 for (ShreddedReader reader : readers) {
382 reader.fill();
383
384 if (!reader.getBuffer().isAtEnd())
385 queue.add(reader);
386 }
387
388 uninitialized = false;
389 }
390
391 public void run() throws IOException {
392 initialize();
393
394 while (queue.size() > 0) {
395 ShreddedReader top = queue.poll();
396 ShreddedReader next = null;
397 ShreddedBuffer nextBuffer = null;
398
399 assert !top.getBuffer().isAtEnd();
400
401 if (queue.size() > 0) {
402 next = queue.peek();
403 nextBuffer = next.getBuffer();
404 assert !nextBuffer.isAtEnd();
405 }
406
407 top.getBuffer().copyUntil(nextBuffer, processor);
408 if (top.getBuffer().isAtEnd())
409 top.fill();
410
411 if (!top.getBuffer().isAtEnd())
412 queue.add(top);
413 }
414
415 if (closeOnExit)
416 processor.close();
417 }
418
419 public DocumentWordPosition read() throws IOException {
420 if (uninitialized)
421 initialize();
422
423 DocumentWordPosition result = null;
424
425 while (queue.size() > 0) {
426 ShreddedReader top = queue.poll();
427 result = top.read();
428
429 if (result != null) {
430 if (top.getBuffer().isAtEnd())
431 top.fill();
432
433 queue.offer(top);
434 break;
435 }
436 }
437
438 return result;
439 }
440 }
441 public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<DocumentWordPosition>, ShreddedSource {
442 public ShreddedProcessor processor;
443 ShreddedBuffer buffer;
444 DocumentWordPosition last = new DocumentWordPosition();
445 long updateDocumentCount = -1;
446 long tupleCount = 0;
447 long bufferStartCount = 0;
448 ArrayInput input;
449
450 public ShreddedReader(ArrayInput input) {
451 this.input = input;
452 this.buffer = new ShreddedBuffer();
453 }
454
455 public ShreddedReader(ArrayInput input, int bufferSize) {
456 this.input = input;
457 this.buffer = new ShreddedBuffer(bufferSize);
458 }
459
460 public final int compareTo(ShreddedReader other) {
461 ShreddedBuffer otherBuffer = other.getBuffer();
462
463 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
464 return 0;
465 } else if (buffer.isAtEnd()) {
466 return -1;
467 } else if (otherBuffer.isAtEnd()) {
468 return 1;
469 }
470
471 int result = 0;
472 do {
473 result = + Utility.compare(buffer.getDocument(), otherBuffer.getDocument());
474 if(result != 0) break;
475 } while (false);
476
477 return result;
478 }
479
480 public final ShreddedBuffer getBuffer() {
481 return buffer;
482 }
483
484 public final DocumentWordPosition read() throws IOException {
485 if (buffer.isAtEnd()) {
486 fill();
487
488 if (buffer.isAtEnd()) {
489 return null;
490 }
491 }
492
493 assert !buffer.isAtEnd();
494 DocumentWordPosition result = new DocumentWordPosition();
495
496 result.document = buffer.getDocument();
497 result.word = buffer.getWord();
498 result.position = buffer.getPosition();
499
500 buffer.incrementTuple();
501 buffer.autoIncrementDocument();
502
503 return result;
504 }
505
506 public final void fill() throws IOException {
507 try {
508 buffer.reset();
509
510 if (tupleCount != 0) {
511
512 if(updateDocumentCount - tupleCount > 0) {
513 buffer.documents.add(last.document);
514 buffer.documentTupleIdx.add((int) (updateDocumentCount - tupleCount));
515 }
516 bufferStartCount = tupleCount;
517 }
518
519 while (!buffer.isFull()) {
520 updateDocument();
521 buffer.processTuple(input.readBytes(), input.readInt());
522 tupleCount++;
523 }
524 } catch(EOFException e) {}
525 }
526
527 public final void updateDocument() throws IOException {
528 if (updateDocumentCount > tupleCount)
529 return;
530
531 last.document = input.readString();
532 updateDocumentCount = tupleCount + input.readInt();
533
534 buffer.processDocument(last.document);
535 }
536
537 public void run() throws IOException {
538 while (true) {
539 fill();
540
541 if (buffer.isAtEnd())
542 break;
543
544 buffer.copyUntil(null, processor);
545 }
546 processor.close();
547 }
548
549 public void setProcessor(Step processor) throws IncompatibleProcessorException {
550 if (processor instanceof ShreddedProcessor) {
551 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
552 } else if (processor instanceof DocumentWordPosition.Processor) {
553 this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentWordPosition.Processor) processor));
554 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
555 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentWordPosition>) processor));
556 } else {
557 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
558 }
559 }
560
561 public Class<DocumentWordPosition> getOutputClass() {
562 return DocumentWordPosition.class;
563 }
564 }
565
566 public static class DuplicateEliminator implements ShreddedProcessor {
567 public ShreddedProcessor processor;
568 DocumentWordPosition last = new DocumentWordPosition();
569 boolean documentProcess = true;
570
571 public DuplicateEliminator() {}
572 public DuplicateEliminator(ShreddedProcessor processor) {
573 this.processor = processor;
574 }
575
576 public void setShreddedProcessor(ShreddedProcessor processor) {
577 this.processor = processor;
578 }
579
580 public void processDocument(String document) throws IOException {
581 if (documentProcess || Utility.compare(document, last.document) != 0) {
582 last.document = document;
583 processor.processDocument(document);
584 documentProcess = false;
585 }
586 }
587
588 public void resetDocument() {
589 documentProcess = true;
590 }
591
592 public void processTuple(byte[] word, int position) throws IOException {
593 processor.processTuple(word, position);
594 }
595
596 public void close() throws IOException {
597 processor.close();
598 }
599 }
600 public static class TupleUnshredder implements ShreddedProcessor {
601 DocumentWordPosition last = new DocumentWordPosition();
602 public org.galagosearch.tupleflow.Processor<DocumentWordPosition> processor;
603
604 public TupleUnshredder(DocumentWordPosition.Processor processor) {
605 this.processor = processor;
606 }
607
608 public TupleUnshredder(org.galagosearch.tupleflow.Processor<DocumentWordPosition> processor) {
609 this.processor = processor;
610 }
611
612 public DocumentWordPosition clone(DocumentWordPosition object) {
613 DocumentWordPosition result = new DocumentWordPosition();
614 if (object == null) return result;
615 result.document = object.document;
616 result.word = object.word;
617 result.position = object.position;
618 return result;
619 }
620
621 public void processDocument(String document) throws IOException {
622 last.document = document;
623 }
624
625
626 public void processTuple(byte[] word, int position) throws IOException {
627 last.word = word;
628 last.position = position;
629 processor.process(clone(last));
630 }
631
632 public void close() throws IOException {
633 processor.close();
634 }
635 }
636 public static class TupleShredder implements Processor {
637 DocumentWordPosition last = new DocumentWordPosition();
638 public ShreddedProcessor processor;
639
640 public TupleShredder(ShreddedProcessor processor) {
641 this.processor = processor;
642 }
643
644 public DocumentWordPosition clone(DocumentWordPosition object) {
645 DocumentWordPosition result = new DocumentWordPosition();
646 if (object == null) return result;
647 result.document = object.document;
648 result.word = object.word;
649 result.position = object.position;
650 return result;
651 }
652
653 public void process(DocumentWordPosition object) throws IOException {
654 boolean processAll = false;
655 if(last == null || Utility.compare(last.document, object.document) != 0 || processAll) { processor.processDocument(object.document); processAll = true; }
656 processor.processTuple(object.word, object.position);
657 }
658
659 public Class<DocumentWordPosition> getInputClass() {
660 return DocumentWordPosition.class;
661 }
662
663 public void close() throws IOException {
664 processor.close();
665 }
666 }
667 }
668 public static class DocumentWordPositionOrder implements Order<DocumentWordPosition> {
669 public int hash(DocumentWordPosition object) {
670 int h = 0;
671 h += Utility.hash(object.document);
672 h += Utility.hash(object.word);
673 h += Utility.hash(object.position);
674 return h;
675 }
676 public Comparator<DocumentWordPosition> greaterThan() {
677 return new Comparator<DocumentWordPosition>() {
678 public int compare(DocumentWordPosition one, DocumentWordPosition two) {
679 int result = 0;
680 do {
681 result = + Utility.compare(one.document, two.document);
682 if(result != 0) break;
683 result = + Utility.compare(one.word, two.word);
684 if(result != 0) break;
685 result = + Utility.compare(one.position, two.position);
686 if(result != 0) break;
687 } while (false);
688 return -result;
689 }
690 };
691 }
692 public Comparator<DocumentWordPosition> lessThan() {
693 return new Comparator<DocumentWordPosition>() {
694 public int compare(DocumentWordPosition one, DocumentWordPosition two) {
695 int result = 0;
696 do {
697 result = + Utility.compare(one.document, two.document);
698 if(result != 0) break;
699 result = + Utility.compare(one.word, two.word);
700 if(result != 0) break;
701 result = + Utility.compare(one.position, two.position);
702 if(result != 0) break;
703 } while (false);
704 return result;
705 }
706 };
707 }
708 public TypeReader<DocumentWordPosition> orderedReader(ArrayInput _input) {
709 return new ShreddedReader(_input);
710 }
711
712 public TypeReader<DocumentWordPosition> orderedReader(ArrayInput _input, int bufferSize) {
713 return new ShreddedReader(_input, bufferSize);
714 }
715 public OrderedWriter<DocumentWordPosition> orderedWriter(ArrayOutput _output) {
716 ShreddedWriter w = new ShreddedWriter(_output);
717 return new OrderedWriterClass(w);
718 }
719 public static class OrderedWriterClass extends OrderedWriter< DocumentWordPosition > {
720 DocumentWordPosition last = null;
721 ShreddedWriter shreddedWriter = null;
722
723 public OrderedWriterClass(ShreddedWriter s) {
724 this.shreddedWriter = s;
725 }
726
727 public void process(DocumentWordPosition object) throws IOException {
728 boolean processAll = false;
729 if (processAll || last == null || 0 != Utility.compare(object.document, last.document)) { processAll = true; shreddedWriter.processDocument(object.document); }
730 if (processAll || last == null || 0 != Utility.compare(object.word, last.word)) { processAll = true; shreddedWriter.processWord(object.word); }
731 if (processAll || last == null || 0 != Utility.compare(object.position, last.position)) { processAll = true; shreddedWriter.processPosition(object.position); }
732 shreddedWriter.processTuple();
733 last = object;
734 }
735
736 public void close() throws IOException {
737 shreddedWriter.close();
738 }
739
740 public Class<DocumentWordPosition> getInputClass() {
741 return DocumentWordPosition.class;
742 }
743 }
744 public ReaderSource<DocumentWordPosition> orderedCombiner(Collection<TypeReader<DocumentWordPosition>> readers, boolean closeOnExit) {
745 ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
746
747 for (TypeReader<DocumentWordPosition> reader : readers) {
748 shreddedReaders.add((ShreddedReader)reader);
749 }
750
751 return new ShreddedCombiner(shreddedReaders, closeOnExit);
752 }
753 public DocumentWordPosition clone(DocumentWordPosition object) {
754 DocumentWordPosition result = new DocumentWordPosition();
755 if (object == null) return result;
756 result.document = object.document;
757 result.word = object.word;
758 result.position = object.position;
759 return result;
760 }
761 public Class<DocumentWordPosition> getOrderedClass() {
762 return DocumentWordPosition.class;
763 }
764 public String[] getOrderSpec() {
765 return new String[] {"+document", "+word", "+position"};
766 }
767
768 public static String getSpecString() {
769 return "+document +word +position";
770 }
771
772 public interface ShreddedProcessor extends Step {
773 public void processDocument(String document) throws IOException;
774 public void processWord(byte[] word) throws IOException;
775 public void processPosition(int position) throws IOException;
776 public void processTuple() throws IOException;
777 public void close() throws IOException;
778 }
779 public interface ShreddedSource extends Step {
780 }
781
782 public static class ShreddedWriter implements ShreddedProcessor {
783 ArrayOutput output;
784 ShreddedBuffer buffer = new ShreddedBuffer();
785 String lastDocument;
786 byte[] lastWord;
787 int lastPosition;
788 boolean lastFlush = false;
789
790 public ShreddedWriter(ArrayOutput output) {
791 this.output = output;
792 }
793
794 public void close() throws IOException {
795 flush();
796 }
797
798 public void processDocument(String document) {
799 lastDocument = document;
800 buffer.processDocument(document);
801 }
802 public void processWord(byte[] word) {
803 lastWord = word;
804 buffer.processWord(word);
805 }
806 public void processPosition(int position) {
807 lastPosition = position;
808 buffer.processPosition(position);
809 }
810 public final void processTuple() throws IOException {
811 if (lastFlush) {
812 if(buffer.documents.size() == 0) buffer.processDocument(lastDocument);
813 if(buffer.words.size() == 0) buffer.processWord(lastWord);
814 if(buffer.positions.size() == 0) buffer.processPosition(lastPosition);
815 lastFlush = false;
816 }
817 buffer.processTuple();
818 if (buffer.isFull())
819 flush();
820 }
821 public final void flushTuples(int pauseIndex) throws IOException {
822
823 while (buffer.getReadIndex() < pauseIndex) {
824
825 buffer.incrementTuple();
826 }
827 }
828 public final void flushDocument(int pauseIndex) throws IOException {
829 while (buffer.getReadIndex() < pauseIndex) {
830 int nextPause = buffer.getDocumentEndIndex();
831 int count = nextPause - buffer.getReadIndex();
832
833 output.writeString(buffer.getDocument());
834 output.writeInt(count);
835 buffer.incrementDocument();
836
837 flushWord(nextPause);
838 assert nextPause == buffer.getReadIndex();
839 }
840 }
841 public final void flushWord(int pauseIndex) throws IOException {
842 while (buffer.getReadIndex() < pauseIndex) {
843 int nextPause = buffer.getWordEndIndex();
844 int count = nextPause - buffer.getReadIndex();
845
846 output.writeBytes(buffer.getWord());
847 output.writeInt(count);
848 buffer.incrementWord();
849
850 flushPosition(nextPause);
851 assert nextPause == buffer.getReadIndex();
852 }
853 }
854 public final void flushPosition(int pauseIndex) throws IOException {
855 while (buffer.getReadIndex() < pauseIndex) {
856 int nextPause = buffer.getPositionEndIndex();
857 int count = nextPause - buffer.getReadIndex();
858
859 output.writeInt(buffer.getPosition());
860 output.writeInt(count);
861 buffer.incrementPosition();
862
863 flushTuples(nextPause);
864 assert nextPause == buffer.getReadIndex();
865 }
866 }
867 public void flush() throws IOException {
868 flushDocument(buffer.getWriteIndex());
869 buffer.reset();
870 lastFlush = true;
871 }
872 }
873 public static class ShreddedBuffer {
874 ArrayList<String> documents = new ArrayList();
875 ArrayList<byte[]> words = new ArrayList();
876 ArrayList<Integer> positions = new ArrayList();
877 ArrayList<Integer> documentTupleIdx = new ArrayList();
878 ArrayList<Integer> wordTupleIdx = new ArrayList();
879 ArrayList<Integer> positionTupleIdx = new ArrayList();
880 int documentReadIdx = 0;
881 int wordReadIdx = 0;
882 int positionReadIdx = 0;
883
884 int writeTupleIndex = 0;
885 int readTupleIndex = 0;
886 int batchSize;
887
888 public ShreddedBuffer(int batchSize) {
889 this.batchSize = batchSize;
890
891 }
892
893 public ShreddedBuffer() {
894 this(10000);
895 }
896
897 public void processDocument(String document) {
898 documents.add(document);
899 documentTupleIdx.add(writeTupleIndex);
900 }
901 public void processWord(byte[] word) {
902 words.add(word);
903 wordTupleIdx.add(writeTupleIndex);
904 }
905 public void processPosition(int position) {
906 positions.add(position);
907 positionTupleIdx.add(writeTupleIndex);
908 }
909 public void processTuple() {
910 assert documents.size() > 0;
911 assert words.size() > 0;
912 assert positions.size() > 0;
913 writeTupleIndex++;
914 }
915 public void resetData() {
916 documents.clear();
917 words.clear();
918 positions.clear();
919 documentTupleIdx.clear();
920 wordTupleIdx.clear();
921 positionTupleIdx.clear();
922 writeTupleIndex = 0;
923 }
924
925 public void resetRead() {
926 readTupleIndex = 0;
927 documentReadIdx = 0;
928 wordReadIdx = 0;
929 positionReadIdx = 0;
930 }
931
932 public void reset() {
933 resetData();
934 resetRead();
935 }
936 public boolean isFull() {
937 return writeTupleIndex >= batchSize;
938 }
939
940 public boolean isEmpty() {
941 return writeTupleIndex == 0;
942 }
943
944 public boolean isAtEnd() {
945 return readTupleIndex >= writeTupleIndex;
946 }
947 public void incrementDocument() {
948 documentReadIdx++;
949 }
950
951 public void autoIncrementDocument() {
952 while (readTupleIndex >= getDocumentEndIndex() && readTupleIndex < writeTupleIndex)
953 documentReadIdx++;
954 }
955 public void incrementWord() {
956 wordReadIdx++;
957 }
958
959 public void autoIncrementWord() {
960 while (readTupleIndex >= getWordEndIndex() && readTupleIndex < writeTupleIndex)
961 wordReadIdx++;
962 }
963 public void incrementPosition() {
964 positionReadIdx++;
965 }
966
967 public void autoIncrementPosition() {
968 while (readTupleIndex >= getPositionEndIndex() && readTupleIndex < writeTupleIndex)
969 positionReadIdx++;
970 }
971 public void incrementTuple() {
972 readTupleIndex++;
973 }
974 public int getDocumentEndIndex() {
975 if ((documentReadIdx+1) >= documentTupleIdx.size())
976 return writeTupleIndex;
977 return documentTupleIdx.get(documentReadIdx+1);
978 }
979
980 public int getWordEndIndex() {
981 if ((wordReadIdx+1) >= wordTupleIdx.size())
982 return writeTupleIndex;
983 return wordTupleIdx.get(wordReadIdx+1);
984 }
985
986 public int getPositionEndIndex() {
987 if ((positionReadIdx+1) >= positionTupleIdx.size())
988 return writeTupleIndex;
989 return positionTupleIdx.get(positionReadIdx+1);
990 }
991 public int getReadIndex() {
992 return readTupleIndex;
993 }
994
995 public int getWriteIndex() {
996 return writeTupleIndex;
997 }
998 public String getDocument() {
999 assert readTupleIndex < writeTupleIndex;
1000 assert documentReadIdx < documents.size();
1001
1002 return documents.get(documentReadIdx);
1003 }
1004 public byte[] getWord() {
1005 assert readTupleIndex < writeTupleIndex;
1006 assert wordReadIdx < words.size();
1007
1008 return words.get(wordReadIdx);
1009 }
1010 public int getPosition() {
1011 assert readTupleIndex < writeTupleIndex;
1012 assert positionReadIdx < positions.size();
1013
1014 return positions.get(positionReadIdx);
1015 }
1016
1017 public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
1018 while (getReadIndex() < endIndex) {
1019 output.processTuple();
1020 incrementTuple();
1021 }
1022 }
1023 public void copyUntilIndexDocument(int endIndex, ShreddedProcessor output) throws IOException {
1024 while (getReadIndex() < endIndex) {
1025 output.processDocument(getDocument());
1026 assert getDocumentEndIndex() <= endIndex;
1027 copyUntilIndexWord(getDocumentEndIndex(), output);
1028 incrementDocument();
1029 }
1030 }
1031 public void copyUntilIndexWord(int endIndex, ShreddedProcessor output) throws IOException {
1032 while (getReadIndex() < endIndex) {
1033 output.processWord(getWord());
1034 assert getWordEndIndex() <= endIndex;
1035 copyUntilIndexPosition(getWordEndIndex(), output);
1036 incrementWord();
1037 }
1038 }
1039 public void copyUntilIndexPosition(int endIndex, ShreddedProcessor output) throws IOException {
1040 while (getReadIndex() < endIndex) {
1041 output.processPosition(getPosition());
1042 assert getPositionEndIndex() <= endIndex;
1043 copyTuples(getPositionEndIndex(), output);
1044 incrementPosition();
1045 }
1046 }
1047 public void copyUntilDocument(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
1048 while (!isAtEnd()) {
1049 if (other != null) {
1050 assert !other.isAtEnd();
1051 int c = + Utility.compare(getDocument(), other.getDocument());
1052
1053 if (c > 0) {
1054 break;
1055 }
1056
1057 output.processDocument(getDocument());
1058
1059 if (c < 0) {
1060 copyUntilIndexWord(getDocumentEndIndex(), output);
1061 } else if (c == 0) {
1062 copyUntilWord(other, output);
1063 autoIncrementDocument();
1064 break;
1065 }
1066 } else {
1067 output.processDocument(getDocument());
1068 copyUntilIndexWord(getDocumentEndIndex(), output);
1069 }
1070 incrementDocument();
1071
1072
1073 }
1074 }
1075 public void copyUntilWord(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
1076 while (!isAtEnd()) {
1077 if (other != null) {
1078 assert !other.isAtEnd();
1079 int c = + Utility.compare(getWord(), other.getWord());
1080
1081 if (c > 0) {
1082 break;
1083 }
1084
1085 output.processWord(getWord());
1086
1087 if (c < 0) {
1088 copyUntilIndexPosition(getWordEndIndex(), output);
1089 } else if (c == 0) {
1090 copyUntilPosition(other, output);
1091 autoIncrementWord();
1092 break;
1093 }
1094 } else {
1095 output.processWord(getWord());
1096 copyUntilIndexPosition(getWordEndIndex(), output);
1097 }
1098 incrementWord();
1099
1100 if (getDocumentEndIndex() <= readTupleIndex)
1101 break;
1102 }
1103 }
1104 public void copyUntilPosition(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
1105 while (!isAtEnd()) {
1106 if (other != null) {
1107 assert !other.isAtEnd();
1108 int c = + Utility.compare(getPosition(), other.getPosition());
1109
1110 if (c > 0) {
1111 break;
1112 }
1113
1114 output.processPosition(getPosition());
1115
1116 copyTuples(getPositionEndIndex(), output);
1117 } else {
1118 output.processPosition(getPosition());
1119 copyTuples(getPositionEndIndex(), output);
1120 }
1121 incrementPosition();
1122
1123 if (getWordEndIndex() <= readTupleIndex)
1124 break;
1125 }
1126 }
1127 public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
1128 copyUntilDocument(other, output);
1129 }
1130
1131 }
1132 public static class ShreddedCombiner implements ReaderSource<DocumentWordPosition>, ShreddedSource {
1133 public ShreddedProcessor processor;
1134 Collection<ShreddedReader> readers;
1135 boolean closeOnExit = false;
1136 boolean uninitialized = true;
1137 PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
1138
1139 public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
1140 this.readers = readers;
1141 this.closeOnExit = closeOnExit;
1142 }
1143
1144 public void setProcessor(Step processor) throws IncompatibleProcessorException {
1145 if (processor instanceof ShreddedProcessor) {
1146 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
1147 } else if (processor instanceof DocumentWordPosition.Processor) {
1148 this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentWordPosition.Processor) processor));
1149 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
1150 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentWordPosition>) processor));
1151 } else {
1152 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
1153 }
1154 }
1155
1156 public Class<DocumentWordPosition> getOutputClass() {
1157 return DocumentWordPosition.class;
1158 }
1159
1160 public void initialize() throws IOException {
1161 for (ShreddedReader reader : readers) {
1162 reader.fill();
1163
1164 if (!reader.getBuffer().isAtEnd())
1165 queue.add(reader);
1166 }
1167
1168 uninitialized = false;
1169 }
1170
1171 public void run() throws IOException {
1172 initialize();
1173
1174 while (queue.size() > 0) {
1175 ShreddedReader top = queue.poll();
1176 ShreddedReader next = null;
1177 ShreddedBuffer nextBuffer = null;
1178
1179 assert !top.getBuffer().isAtEnd();
1180
1181 if (queue.size() > 0) {
1182 next = queue.peek();
1183 nextBuffer = next.getBuffer();
1184 assert !nextBuffer.isAtEnd();
1185 }
1186
1187 top.getBuffer().copyUntil(nextBuffer, processor);
1188 if (top.getBuffer().isAtEnd())
1189 top.fill();
1190
1191 if (!top.getBuffer().isAtEnd())
1192 queue.add(top);
1193 }
1194
1195 if (closeOnExit)
1196 processor.close();
1197 }
1198
1199 public DocumentWordPosition read() throws IOException {
1200 if (uninitialized)
1201 initialize();
1202
1203 DocumentWordPosition result = null;
1204
1205 while (queue.size() > 0) {
1206 ShreddedReader top = queue.poll();
1207 result = top.read();
1208
1209 if (result != null) {
1210 if (top.getBuffer().isAtEnd())
1211 top.fill();
1212
1213 queue.offer(top);
1214 break;
1215 }
1216 }
1217
1218 return result;
1219 }
1220 }
1221 public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<DocumentWordPosition>, ShreddedSource {
1222 public ShreddedProcessor processor;
1223 ShreddedBuffer buffer;
1224 DocumentWordPosition last = new DocumentWordPosition();
1225 long updateDocumentCount = -1;
1226 long updateWordCount = -1;
1227 long updatePositionCount = -1;
1228 long tupleCount = 0;
1229 long bufferStartCount = 0;
1230 ArrayInput input;
1231
1232 public ShreddedReader(ArrayInput input) {
1233 this.input = input;
1234 this.buffer = new ShreddedBuffer();
1235 }
1236
1237 public ShreddedReader(ArrayInput input, int bufferSize) {
1238 this.input = input;
1239 this.buffer = new ShreddedBuffer(bufferSize);
1240 }
1241
1242 public final int compareTo(ShreddedReader other) {
1243 ShreddedBuffer otherBuffer = other.getBuffer();
1244
1245 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
1246 return 0;
1247 } else if (buffer.isAtEnd()) {
1248 return -1;
1249 } else if (otherBuffer.isAtEnd()) {
1250 return 1;
1251 }
1252
1253 int result = 0;
1254 do {
1255 result = + Utility.compare(buffer.getDocument(), otherBuffer.getDocument());
1256 if(result != 0) break;
1257 result = + Utility.compare(buffer.getWord(), otherBuffer.getWord());
1258 if(result != 0) break;
1259 result = + Utility.compare(buffer.getPosition(), otherBuffer.getPosition());
1260 if(result != 0) break;
1261 } while (false);
1262
1263 return result;
1264 }
1265
1266 public final ShreddedBuffer getBuffer() {
1267 return buffer;
1268 }
1269
1270 public final DocumentWordPosition read() throws IOException {
1271 if (buffer.isAtEnd()) {
1272 fill();
1273
1274 if (buffer.isAtEnd()) {
1275 return null;
1276 }
1277 }
1278
1279 assert !buffer.isAtEnd();
1280 DocumentWordPosition result = new DocumentWordPosition();
1281
1282 result.document = buffer.getDocument();
1283 result.word = buffer.getWord();
1284 result.position = buffer.getPosition();
1285
1286 buffer.incrementTuple();
1287 buffer.autoIncrementDocument();
1288 buffer.autoIncrementWord();
1289 buffer.autoIncrementPosition();
1290
1291 return result;
1292 }
1293
1294 public final void fill() throws IOException {
1295 try {
1296 buffer.reset();
1297
1298 if (tupleCount != 0) {
1299
1300 if(updateDocumentCount - tupleCount > 0) {
1301 buffer.documents.add(last.document);
1302 buffer.documentTupleIdx.add((int) (updateDocumentCount - tupleCount));
1303 }
1304 if(updateWordCount - tupleCount > 0) {
1305 buffer.words.add(last.word);
1306 buffer.wordTupleIdx.add((int) (updateWordCount - tupleCount));
1307 }
1308 if(updatePositionCount - tupleCount > 0) {
1309 buffer.positions.add(last.position);
1310 buffer.positionTupleIdx.add((int) (updatePositionCount - tupleCount));
1311 }
1312 bufferStartCount = tupleCount;
1313 }
1314
1315 while (!buffer.isFull()) {
1316 updatePosition();
1317 buffer.processTuple();
1318 tupleCount++;
1319 }
1320 } catch(EOFException e) {}
1321 }
1322
1323 public final void updateDocument() throws IOException {
1324 if (updateDocumentCount > tupleCount)
1325 return;
1326
1327 last.document = input.readString();
1328 updateDocumentCount = tupleCount + input.readInt();
1329
1330 buffer.processDocument(last.document);
1331 }
1332 public final void updateWord() throws IOException {
1333 if (updateWordCount > tupleCount)
1334 return;
1335
1336 updateDocument();
1337 last.word = input.readBytes();
1338 updateWordCount = tupleCount + input.readInt();
1339
1340 buffer.processWord(last.word);
1341 }
1342 public final void updatePosition() throws IOException {
1343 if (updatePositionCount > tupleCount)
1344 return;
1345
1346 updateWord();
1347 last.position = input.readInt();
1348 updatePositionCount = tupleCount + input.readInt();
1349
1350 buffer.processPosition(last.position);
1351 }
1352
1353 public void run() throws IOException {
1354 while (true) {
1355 fill();
1356
1357 if (buffer.isAtEnd())
1358 break;
1359
1360 buffer.copyUntil(null, processor);
1361 }
1362 processor.close();
1363 }
1364
1365 public void setProcessor(Step processor) throws IncompatibleProcessorException {
1366 if (processor instanceof ShreddedProcessor) {
1367 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
1368 } else if (processor instanceof DocumentWordPosition.Processor) {
1369 this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentWordPosition.Processor) processor));
1370 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
1371 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentWordPosition>) processor));
1372 } else {
1373 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
1374 }
1375 }
1376
1377 public Class<DocumentWordPosition> getOutputClass() {
1378 return DocumentWordPosition.class;
1379 }
1380 }
1381
1382 public static class DuplicateEliminator implements ShreddedProcessor {
1383 public ShreddedProcessor processor;
1384 DocumentWordPosition last = new DocumentWordPosition();
1385 boolean documentProcess = true;
1386 boolean wordProcess = true;
1387 boolean positionProcess = true;
1388
1389 public DuplicateEliminator() {}
1390 public DuplicateEliminator(ShreddedProcessor processor) {
1391 this.processor = processor;
1392 }
1393
1394 public void setShreddedProcessor(ShreddedProcessor processor) {
1395 this.processor = processor;
1396 }
1397
1398 public void processDocument(String document) throws IOException {
1399 if (documentProcess || Utility.compare(document, last.document) != 0) {
1400 last.document = document;
1401 processor.processDocument(document);
1402 resetWord();
1403 documentProcess = false;
1404 }
1405 }
1406 public void processWord(byte[] word) throws IOException {
1407 if (wordProcess || Utility.compare(word, last.word) != 0) {
1408 last.word = word;
1409 processor.processWord(word);
1410 resetPosition();
1411 wordProcess = false;
1412 }
1413 }
1414 public void processPosition(int position) throws IOException {
1415 if (positionProcess || Utility.compare(position, last.position) != 0) {
1416 last.position = position;
1417 processor.processPosition(position);
1418 positionProcess = false;
1419 }
1420 }
1421
1422 public void resetDocument() {
1423 documentProcess = true;
1424 resetWord();
1425 }
1426 public void resetWord() {
1427 wordProcess = true;
1428 resetPosition();
1429 }
1430 public void resetPosition() {
1431 positionProcess = true;
1432 }
1433
1434 public void processTuple() throws IOException {
1435 processor.processTuple();
1436 }
1437
1438 public void close() throws IOException {
1439 processor.close();
1440 }
1441 }
1442 public static class TupleUnshredder implements ShreddedProcessor {
1443 DocumentWordPosition last = new DocumentWordPosition();
1444 public org.galagosearch.tupleflow.Processor<DocumentWordPosition> processor;
1445
1446 public TupleUnshredder(DocumentWordPosition.Processor processor) {
1447 this.processor = processor;
1448 }
1449
1450 public TupleUnshredder(org.galagosearch.tupleflow.Processor<DocumentWordPosition> processor) {
1451 this.processor = processor;
1452 }
1453
1454 public DocumentWordPosition clone(DocumentWordPosition object) {
1455 DocumentWordPosition result = new DocumentWordPosition();
1456 if (object == null) return result;
1457 result.document = object.document;
1458 result.word = object.word;
1459 result.position = object.position;
1460 return result;
1461 }
1462
1463 public void processDocument(String document) throws IOException {
1464 last.document = document;
1465 }
1466
1467 public void processWord(byte[] word) throws IOException {
1468 last.word = word;
1469 }
1470
1471 public void processPosition(int position) throws IOException {
1472 last.position = position;
1473 }
1474
1475
1476 public void processTuple() throws IOException {
1477 processor.process(clone(last));
1478 }
1479
1480 public void close() throws IOException {
1481 processor.close();
1482 }
1483 }
1484 public static class TupleShredder implements Processor {
1485 DocumentWordPosition last = new DocumentWordPosition();
1486 public ShreddedProcessor processor;
1487
1488 public TupleShredder(ShreddedProcessor processor) {
1489 this.processor = processor;
1490 }
1491
1492 public DocumentWordPosition clone(DocumentWordPosition object) {
1493 DocumentWordPosition result = new DocumentWordPosition();
1494 if (object == null) return result;
1495 result.document = object.document;
1496 result.word = object.word;
1497 result.position = object.position;
1498 return result;
1499 }
1500
1501 public void process(DocumentWordPosition object) throws IOException {
1502 boolean processAll = false;
1503 if(last == null || Utility.compare(last.document, object.document) != 0 || processAll) { processor.processDocument(object.document); processAll = true; }
1504 if(last == null || Utility.compare(last.word, object.word) != 0 || processAll) { processor.processWord(object.word); processAll = true; }
1505 if(last == null || Utility.compare(last.position, object.position) != 0 || processAll) { processor.processPosition(object.position); processAll = true; }
1506 processor.processTuple();
1507 }
1508
1509 public Class<DocumentWordPosition> getInputClass() {
1510 return DocumentWordPosition.class;
1511 }
1512
1513 public void close() throws IOException {
1514 processor.close();
1515 }
1516 }
1517 }
1518 }