1
2
3 package org.galagosearch.core.types;
4
5 import org.galagosearch.tupleflow.Utility;
6 import org.galagosearch.tupleflow.ArrayInput;
7 import org.galagosearch.tupleflow.ArrayOutput;
8 import org.galagosearch.tupleflow.Order;
9 import org.galagosearch.tupleflow.OrderedWriter;
10 import org.galagosearch.tupleflow.Type;
11 import org.galagosearch.tupleflow.TypeReader;
12 import org.galagosearch.tupleflow.Step;
13 import org.galagosearch.tupleflow.IncompatibleProcessorException;
14 import org.galagosearch.tupleflow.ReaderSource;
15 import java.io.IOException;
16 import java.io.EOFException;
17 import java.io.UnsupportedEncodingException;
18 import java.util.ArrayList;
19 import java.util.Arrays;
20 import java.util.Comparator;
21 import java.util.PriorityQueue;
22 import java.util.Collection;
23
24 public class NumberedDocumentData implements Type<NumberedDocumentData> {
25 public String identifier;
26 public String url;
27 public int number;
28 public int textLength;
29
/** Creates an empty record: both string fields are null and both int fields are 0. */
public NumberedDocumentData() {
    this(null, null, 0, 0);
}

/**
 * Creates a record with every field supplied by the caller.
 *
 * @param identifier value stored in the identifier field
 * @param url        value stored in the url field
 * @param number     value stored in the number field
 * @param textLength value stored in the textLength field
 */
public NumberedDocumentData(String identifier, String url, int number, int textLength) {
    this.textLength = textLength;
    this.number = number;
    this.url = url;
    this.identifier = identifier;
}
37
38 public String toString() {
39 return String.format("%s,%s,%d,%d",
40 identifier, url, number, textLength);
41 }
42
43 public Order<NumberedDocumentData> getOrder(String... spec) {
44 if (Arrays.equals(spec, new String[] { })) {
45 return new Unordered();
46 }
47 if (Arrays.equals(spec, new String[] { "+number" })) {
48 return new NumberOrder();
49 }
50 return null;
51 }
52
53 public interface Processor extends Step, org.galagosearch.tupleflow.Processor<NumberedDocumentData> {
54 public void process(NumberedDocumentData object) throws IOException;
55 public void close() throws IOException;
56 }
57 public interface Source extends Step {
58 }
59 public static class Unordered implements Order<NumberedDocumentData> {
60 public int hash(NumberedDocumentData object) {
61 int h = 0;
62 return h;
63 }
64 public Comparator<NumberedDocumentData> greaterThan() {
65 return new Comparator<NumberedDocumentData>() {
66 public int compare(NumberedDocumentData one, NumberedDocumentData two) {
67 int result = 0;
68 do {
69 } while (false);
70 return -result;
71 }
72 };
73 }
74 public Comparator<NumberedDocumentData> lessThan() {
75 return new Comparator<NumberedDocumentData>() {
76 public int compare(NumberedDocumentData one, NumberedDocumentData two) {
77 int result = 0;
78 do {
79 } while (false);
80 return result;
81 }
82 };
83 }
84 public TypeReader<NumberedDocumentData> orderedReader(ArrayInput _input) {
85 return new ShreddedReader(_input);
86 }
87
88 public TypeReader<NumberedDocumentData> orderedReader(ArrayInput _input, int bufferSize) {
89 return new ShreddedReader(_input, bufferSize);
90 }
91 public OrderedWriter<NumberedDocumentData> orderedWriter(ArrayOutput _output) {
92 ShreddedWriter w = new ShreddedWriter(_output);
93 return new OrderedWriterClass(w);
94 }
95 public static class OrderedWriterClass extends OrderedWriter< NumberedDocumentData > {
96 NumberedDocumentData last = null;
97 ShreddedWriter shreddedWriter = null;
98
99 public OrderedWriterClass(ShreddedWriter s) {
100 this.shreddedWriter = s;
101 }
102
103 public void process(NumberedDocumentData object) throws IOException {
104 boolean processAll = false;
105 shreddedWriter.processTuple(object.identifier, object.url, object.number, object.textLength);
106 last = object;
107 }
108
109 public void close() throws IOException {
110 shreddedWriter.close();
111 }
112
113 public Class<NumberedDocumentData> getInputClass() {
114 return NumberedDocumentData.class;
115 }
116 }
117 public ReaderSource<NumberedDocumentData> orderedCombiner(Collection<TypeReader<NumberedDocumentData>> readers, boolean closeOnExit) {
118 ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
119
120 for (TypeReader<NumberedDocumentData> reader : readers) {
121 shreddedReaders.add((ShreddedReader)reader);
122 }
123
124 return new ShreddedCombiner(shreddedReaders, closeOnExit);
125 }
126 public NumberedDocumentData clone(NumberedDocumentData object) {
127 NumberedDocumentData result = new NumberedDocumentData();
128 if (object == null) return result;
129 result.identifier = object.identifier;
130 result.url = object.url;
131 result.number = object.number;
132 result.textLength = object.textLength;
133 return result;
134 }
135 public Class<NumberedDocumentData> getOrderedClass() {
136 return NumberedDocumentData.class;
137 }
138 public String[] getOrderSpec() {
139 return new String[] {};
140 }
141
142 public static String getSpecString() {
143 return "";
144 }
145
146 public interface ShreddedProcessor extends Step {
147 public void processTuple(String identifier, String url, int number, int textLength) throws IOException;
148 public void close() throws IOException;
149 }
150 public interface ShreddedSource extends Step {
151 }
152
153 public static class ShreddedWriter implements ShreddedProcessor {
154 ArrayOutput output;
155 ShreddedBuffer buffer = new ShreddedBuffer();
156 boolean lastFlush = false;
157
158 public ShreddedWriter(ArrayOutput output) {
159 this.output = output;
160 }
161
162 public void close() throws IOException {
163 flush();
164 }
165
166 public final void processTuple(String identifier, String url, int number, int textLength) throws IOException {
167 if (lastFlush) {
168 lastFlush = false;
169 }
170 buffer.processTuple(identifier, url, number, textLength);
171 if (buffer.isFull())
172 flush();
173 }
174 public final void flushTuples(int pauseIndex) throws IOException {
175
176 while (buffer.getReadIndex() < pauseIndex) {
177
178 output.writeString(buffer.getIdentifier());
179 output.writeString(buffer.getUrl());
180 output.writeInt(buffer.getNumber());
181 output.writeInt(buffer.getTextLength());
182 buffer.incrementTuple();
183 }
184 }
185 public void flush() throws IOException {
186 flushTuples(buffer.getWriteIndex());
187 buffer.reset();
188 lastFlush = true;
189 }
190 }
191 public static class ShreddedBuffer {
192
193 String[] identifiers;
194 String[] urls;
195 int[] numbers;
196 int[] textLengths;
197 int writeTupleIndex = 0;
198 int readTupleIndex = 0;
199 int batchSize;
200
201 public ShreddedBuffer(int batchSize) {
202 this.batchSize = batchSize;
203
204 identifiers = new String[batchSize];
205 urls = new String[batchSize];
206 numbers = new int[batchSize];
207 textLengths = new int[batchSize];
208 }
209
210 public ShreddedBuffer() {
211 this(10000);
212 }
213
214 public void processTuple(String identifier, String url, int number, int textLength) {
215 identifiers[writeTupleIndex] = identifier;
216 urls[writeTupleIndex] = url;
217 numbers[writeTupleIndex] = number;
218 textLengths[writeTupleIndex] = textLength;
219 writeTupleIndex++;
220 }
221 public void resetData() {
222 writeTupleIndex = 0;
223 }
224
225 public void resetRead() {
226 readTupleIndex = 0;
227 }
228
229 public void reset() {
230 resetData();
231 resetRead();
232 }
233 public boolean isFull() {
234 return writeTupleIndex >= batchSize;
235 }
236
237 public boolean isEmpty() {
238 return writeTupleIndex == 0;
239 }
240
241 public boolean isAtEnd() {
242 return readTupleIndex >= writeTupleIndex;
243 }
244 public void incrementTuple() {
245 readTupleIndex++;
246 }
247 public int getReadIndex() {
248 return readTupleIndex;
249 }
250
251 public int getWriteIndex() {
252 return writeTupleIndex;
253 }
254 public String getIdentifier() {
255 assert readTupleIndex < writeTupleIndex;
256 return identifiers[readTupleIndex];
257 }
258 public String getUrl() {
259 assert readTupleIndex < writeTupleIndex;
260 return urls[readTupleIndex];
261 }
262 public int getNumber() {
263 assert readTupleIndex < writeTupleIndex;
264 return numbers[readTupleIndex];
265 }
266 public int getTextLength() {
267 assert readTupleIndex < writeTupleIndex;
268 return textLengths[readTupleIndex];
269 }
270 public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
271 while (getReadIndex() < endIndex) {
272 output.processTuple(getIdentifier(), getUrl(), getNumber(), getTextLength());
273 incrementTuple();
274 }
275 }
276
277 public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
278 }
279
280 }
281 public static class ShreddedCombiner implements ReaderSource<NumberedDocumentData>, ShreddedSource {
282 public ShreddedProcessor processor;
283 Collection<ShreddedReader> readers;
284 boolean closeOnExit = false;
285 boolean uninitialized = true;
286 PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
287
288 public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
289 this.readers = readers;
290 this.closeOnExit = closeOnExit;
291 }
292
293 public void setProcessor(Step processor) throws IncompatibleProcessorException {
294 if (processor instanceof ShreddedProcessor) {
295 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
296 } else if (processor instanceof NumberedDocumentData.Processor) {
297 this.processor = new DuplicateEliminator(new TupleUnshredder((NumberedDocumentData.Processor) processor));
298 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
299 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<NumberedDocumentData>) processor));
300 } else {
301 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
302 }
303 }
304
305 public Class<NumberedDocumentData> getOutputClass() {
306 return NumberedDocumentData.class;
307 }
308
309 public void initialize() throws IOException {
310 for (ShreddedReader reader : readers) {
311 reader.fill();
312
313 if (!reader.getBuffer().isAtEnd())
314 queue.add(reader);
315 }
316
317 uninitialized = false;
318 }
319
320 public void run() throws IOException {
321 initialize();
322
323 while (queue.size() > 0) {
324 ShreddedReader top = queue.poll();
325 ShreddedReader next = null;
326 ShreddedBuffer nextBuffer = null;
327
328 assert !top.getBuffer().isAtEnd();
329
330 if (queue.size() > 0) {
331 next = queue.peek();
332 nextBuffer = next.getBuffer();
333 assert !nextBuffer.isAtEnd();
334 }
335
336 top.getBuffer().copyUntil(nextBuffer, processor);
337 if (top.getBuffer().isAtEnd())
338 top.fill();
339
340 if (!top.getBuffer().isAtEnd())
341 queue.add(top);
342 }
343
344 if (closeOnExit)
345 processor.close();
346 }
347
348 public NumberedDocumentData read() throws IOException {
349 if (uninitialized)
350 initialize();
351
352 NumberedDocumentData result = null;
353
354 while (queue.size() > 0) {
355 ShreddedReader top = queue.poll();
356 result = top.read();
357
358 if (result != null) {
359 if (top.getBuffer().isAtEnd())
360 top.fill();
361
362 queue.offer(top);
363 break;
364 }
365 }
366
367 return result;
368 }
369 }
370 public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<NumberedDocumentData>, ShreddedSource {
371 public ShreddedProcessor processor;
372 ShreddedBuffer buffer;
373 NumberedDocumentData last = new NumberedDocumentData();
374 long tupleCount = 0;
375 long bufferStartCount = 0;
376 ArrayInput input;
377
378 public ShreddedReader(ArrayInput input) {
379 this.input = input;
380 this.buffer = new ShreddedBuffer();
381 }
382
383 public ShreddedReader(ArrayInput input, int bufferSize) {
384 this.input = input;
385 this.buffer = new ShreddedBuffer(bufferSize);
386 }
387
388 public final int compareTo(ShreddedReader other) {
389 ShreddedBuffer otherBuffer = other.getBuffer();
390
391 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
392 return 0;
393 } else if (buffer.isAtEnd()) {
394 return -1;
395 } else if (otherBuffer.isAtEnd()) {
396 return 1;
397 }
398
399 int result = 0;
400 do {
401 } while (false);
402
403 return result;
404 }
405
406 public final ShreddedBuffer getBuffer() {
407 return buffer;
408 }
409
410 public final NumberedDocumentData read() throws IOException {
411 if (buffer.isAtEnd()) {
412 fill();
413
414 if (buffer.isAtEnd()) {
415 return null;
416 }
417 }
418
419 assert !buffer.isAtEnd();
420 NumberedDocumentData result = new NumberedDocumentData();
421
422 result.identifier = buffer.getIdentifier();
423 result.url = buffer.getUrl();
424 result.number = buffer.getNumber();
425 result.textLength = buffer.getTextLength();
426
427 buffer.incrementTuple();
428
429 return result;
430 }
431
432 public final void fill() throws IOException {
433 try {
434 buffer.reset();
435
436 if (tupleCount != 0) {
437 bufferStartCount = tupleCount;
438 }
439
440 while (!buffer.isFull()) {
441 buffer.processTuple(input.readString(), input.readString(), input.readInt(), input.readInt());
442 tupleCount++;
443 }
444 } catch(EOFException e) {}
445 }
446
447
448 public void run() throws IOException {
449 while (true) {
450 fill();
451
452 if (buffer.isAtEnd())
453 break;
454
455 buffer.copyUntil(null, processor);
456 }
457 processor.close();
458 }
459
460 public void setProcessor(Step processor) throws IncompatibleProcessorException {
461 if (processor instanceof ShreddedProcessor) {
462 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
463 } else if (processor instanceof NumberedDocumentData.Processor) {
464 this.processor = new DuplicateEliminator(new TupleUnshredder((NumberedDocumentData.Processor) processor));
465 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
466 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<NumberedDocumentData>) processor));
467 } else {
468 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
469 }
470 }
471
472 public Class<NumberedDocumentData> getOutputClass() {
473 return NumberedDocumentData.class;
474 }
475 }
476
477 public static class DuplicateEliminator implements ShreddedProcessor {
478 public ShreddedProcessor processor;
479 NumberedDocumentData last = new NumberedDocumentData();
480
481 public DuplicateEliminator() {}
482 public DuplicateEliminator(ShreddedProcessor processor) {
483 this.processor = processor;
484 }
485
486 public void setShreddedProcessor(ShreddedProcessor processor) {
487 this.processor = processor;
488 }
489
490
491
492
493 public void processTuple(String identifier, String url, int number, int textLength) throws IOException {
494 processor.processTuple(identifier, url, number, textLength);
495 }
496
497 public void close() throws IOException {
498 processor.close();
499 }
500 }
501 public static class TupleUnshredder implements ShreddedProcessor {
502 NumberedDocumentData last = new NumberedDocumentData();
503 public org.galagosearch.tupleflow.Processor<NumberedDocumentData> processor;
504
505 public TupleUnshredder(NumberedDocumentData.Processor processor) {
506 this.processor = processor;
507 }
508
509 public TupleUnshredder(org.galagosearch.tupleflow.Processor<NumberedDocumentData> processor) {
510 this.processor = processor;
511 }
512
513 public NumberedDocumentData clone(NumberedDocumentData object) {
514 NumberedDocumentData result = new NumberedDocumentData();
515 if (object == null) return result;
516 result.identifier = object.identifier;
517 result.url = object.url;
518 result.number = object.number;
519 result.textLength = object.textLength;
520 return result;
521 }
522
523
524 public void processTuple(String identifier, String url, int number, int textLength) throws IOException {
525 last.identifier = identifier;
526 last.url = url;
527 last.number = number;
528 last.textLength = textLength;
529 processor.process(clone(last));
530 }
531
532 public void close() throws IOException {
533 processor.close();
534 }
535 }
536 public static class TupleShredder implements Processor {
537 NumberedDocumentData last = new NumberedDocumentData();
538 public ShreddedProcessor processor;
539
540 public TupleShredder(ShreddedProcessor processor) {
541 this.processor = processor;
542 }
543
544 public NumberedDocumentData clone(NumberedDocumentData object) {
545 NumberedDocumentData result = new NumberedDocumentData();
546 if (object == null) return result;
547 result.identifier = object.identifier;
548 result.url = object.url;
549 result.number = object.number;
550 result.textLength = object.textLength;
551 return result;
552 }
553
554 public void process(NumberedDocumentData object) throws IOException {
555 boolean processAll = false;
556 processor.processTuple(object.identifier, object.url, object.number, object.textLength);
557 }
558
559 public Class<NumberedDocumentData> getInputClass() {
560 return NumberedDocumentData.class;
561 }
562
563 public void close() throws IOException {
564 processor.close();
565 }
566 }
567 }
568 public static class NumberOrder implements Order<NumberedDocumentData> {
569 public int hash(NumberedDocumentData object) {
570 int h = 0;
571 h += Utility.hash(object.number);
572 return h;
573 }
574 public Comparator<NumberedDocumentData> greaterThan() {
575 return new Comparator<NumberedDocumentData>() {
576 public int compare(NumberedDocumentData one, NumberedDocumentData two) {
577 int result = 0;
578 do {
579 result = + Utility.compare(one.number, two.number);
580 if(result != 0) break;
581 } while (false);
582 return -result;
583 }
584 };
585 }
586 public Comparator<NumberedDocumentData> lessThan() {
587 return new Comparator<NumberedDocumentData>() {
588 public int compare(NumberedDocumentData one, NumberedDocumentData two) {
589 int result = 0;
590 do {
591 result = + Utility.compare(one.number, two.number);
592 if(result != 0) break;
593 } while (false);
594 return result;
595 }
596 };
597 }
598 public TypeReader<NumberedDocumentData> orderedReader(ArrayInput _input) {
599 return new ShreddedReader(_input);
600 }
601
602 public TypeReader<NumberedDocumentData> orderedReader(ArrayInput _input, int bufferSize) {
603 return new ShreddedReader(_input, bufferSize);
604 }
605 public OrderedWriter<NumberedDocumentData> orderedWriter(ArrayOutput _output) {
606 ShreddedWriter w = new ShreddedWriter(_output);
607 return new OrderedWriterClass(w);
608 }
609 public static class OrderedWriterClass extends OrderedWriter< NumberedDocumentData > {
610 NumberedDocumentData last = null;
611 ShreddedWriter shreddedWriter = null;
612
613 public OrderedWriterClass(ShreddedWriter s) {
614 this.shreddedWriter = s;
615 }
616
617 public void process(NumberedDocumentData object) throws IOException {
618 boolean processAll = false;
619 if (processAll || last == null || 0 != Utility.compare(object.number, last.number)) { processAll = true; shreddedWriter.processNumber(object.number); }
620 shreddedWriter.processTuple(object.identifier, object.url, object.textLength);
621 last = object;
622 }
623
624 public void close() throws IOException {
625 shreddedWriter.close();
626 }
627
628 public Class<NumberedDocumentData> getInputClass() {
629 return NumberedDocumentData.class;
630 }
631 }
632 public ReaderSource<NumberedDocumentData> orderedCombiner(Collection<TypeReader<NumberedDocumentData>> readers, boolean closeOnExit) {
633 ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
634
635 for (TypeReader<NumberedDocumentData> reader : readers) {
636 shreddedReaders.add((ShreddedReader)reader);
637 }
638
639 return new ShreddedCombiner(shreddedReaders, closeOnExit);
640 }
641 public NumberedDocumentData clone(NumberedDocumentData object) {
642 NumberedDocumentData result = new NumberedDocumentData();
643 if (object == null) return result;
644 result.identifier = object.identifier;
645 result.url = object.url;
646 result.number = object.number;
647 result.textLength = object.textLength;
648 return result;
649 }
650 public Class<NumberedDocumentData> getOrderedClass() {
651 return NumberedDocumentData.class;
652 }
653 public String[] getOrderSpec() {
654 return new String[] {"+number"};
655 }
656
657 public static String getSpecString() {
658 return "+number";
659 }
660
661 public interface ShreddedProcessor extends Step {
662 public void processNumber(int number) throws IOException;
663 public void processTuple(String identifier, String url, int textLength) throws IOException;
664 public void close() throws IOException;
665 }
666 public interface ShreddedSource extends Step {
667 }
668
669 public static class ShreddedWriter implements ShreddedProcessor {
670 ArrayOutput output;
671 ShreddedBuffer buffer = new ShreddedBuffer();
672 int lastNumber;
673 boolean lastFlush = false;
674
675 public ShreddedWriter(ArrayOutput output) {
676 this.output = output;
677 }
678
679 public void close() throws IOException {
680 flush();
681 }
682
683 public void processNumber(int number) {
684 lastNumber = number;
685 buffer.processNumber(number);
686 }
687 public final void processTuple(String identifier, String url, int textLength) throws IOException {
688 if (lastFlush) {
689 if(buffer.numbers.size() == 0) buffer.processNumber(lastNumber);
690 lastFlush = false;
691 }
692 buffer.processTuple(identifier, url, textLength);
693 if (buffer.isFull())
694 flush();
695 }
696 public final void flushTuples(int pauseIndex) throws IOException {
697
698 while (buffer.getReadIndex() < pauseIndex) {
699
700 output.writeString(buffer.getIdentifier());
701 output.writeString(buffer.getUrl());
702 output.writeInt(buffer.getTextLength());
703 buffer.incrementTuple();
704 }
705 }
706 public final void flushNumber(int pauseIndex) throws IOException {
707 while (buffer.getReadIndex() < pauseIndex) {
708 int nextPause = buffer.getNumberEndIndex();
709 int count = nextPause - buffer.getReadIndex();
710
711 output.writeInt(buffer.getNumber());
712 output.writeInt(count);
713 buffer.incrementNumber();
714
715 flushTuples(nextPause);
716 assert nextPause == buffer.getReadIndex();
717 }
718 }
719 public void flush() throws IOException {
720 flushNumber(buffer.getWriteIndex());
721 buffer.reset();
722 lastFlush = true;
723 }
724 }
725 public static class ShreddedBuffer {
726 ArrayList<Integer> numbers = new ArrayList();
727 ArrayList<Integer> numberTupleIdx = new ArrayList();
728 int numberReadIdx = 0;
729
730 String[] identifiers;
731 String[] urls;
732 int[] textLengths;
733 int writeTupleIndex = 0;
734 int readTupleIndex = 0;
735 int batchSize;
736
737 public ShreddedBuffer(int batchSize) {
738 this.batchSize = batchSize;
739
740 identifiers = new String[batchSize];
741 urls = new String[batchSize];
742 textLengths = new int[batchSize];
743 }
744
745 public ShreddedBuffer() {
746 this(10000);
747 }
748
749 public void processNumber(int number) {
750 numbers.add(number);
751 numberTupleIdx.add(writeTupleIndex);
752 }
753 public void processTuple(String identifier, String url, int textLength) {
754 assert numbers.size() > 0;
755 identifiers[writeTupleIndex] = identifier;
756 urls[writeTupleIndex] = url;
757 textLengths[writeTupleIndex] = textLength;
758 writeTupleIndex++;
759 }
760 public void resetData() {
761 numbers.clear();
762 numberTupleIdx.clear();
763 writeTupleIndex = 0;
764 }
765
766 public void resetRead() {
767 readTupleIndex = 0;
768 numberReadIdx = 0;
769 }
770
771 public void reset() {
772 resetData();
773 resetRead();
774 }
775 public boolean isFull() {
776 return writeTupleIndex >= batchSize;
777 }
778
779 public boolean isEmpty() {
780 return writeTupleIndex == 0;
781 }
782
783 public boolean isAtEnd() {
784 return readTupleIndex >= writeTupleIndex;
785 }
786 public void incrementNumber() {
787 numberReadIdx++;
788 }
789
790 public void autoIncrementNumber() {
791 while (readTupleIndex >= getNumberEndIndex() && readTupleIndex < writeTupleIndex)
792 numberReadIdx++;
793 }
794 public void incrementTuple() {
795 readTupleIndex++;
796 }
797 public int getNumberEndIndex() {
798 if ((numberReadIdx+1) >= numberTupleIdx.size())
799 return writeTupleIndex;
800 return numberTupleIdx.get(numberReadIdx+1);
801 }
802 public int getReadIndex() {
803 return readTupleIndex;
804 }
805
806 public int getWriteIndex() {
807 return writeTupleIndex;
808 }
809 public int getNumber() {
810 assert readTupleIndex < writeTupleIndex;
811 assert numberReadIdx < numbers.size();
812
813 return numbers.get(numberReadIdx);
814 }
815 public String getIdentifier() {
816 assert readTupleIndex < writeTupleIndex;
817 return identifiers[readTupleIndex];
818 }
819 public String getUrl() {
820 assert readTupleIndex < writeTupleIndex;
821 return urls[readTupleIndex];
822 }
823 public int getTextLength() {
824 assert readTupleIndex < writeTupleIndex;
825 return textLengths[readTupleIndex];
826 }
827 public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
828 while (getReadIndex() < endIndex) {
829 output.processTuple(getIdentifier(), getUrl(), getTextLength());
830 incrementTuple();
831 }
832 }
833 public void copyUntilIndexNumber(int endIndex, ShreddedProcessor output) throws IOException {
834 while (getReadIndex() < endIndex) {
835 output.processNumber(getNumber());
836 assert getNumberEndIndex() <= endIndex;
837 copyTuples(getNumberEndIndex(), output);
838 incrementNumber();
839 }
840 }
841 public void copyUntilNumber(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
842 while (!isAtEnd()) {
843 if (other != null) {
844 assert !other.isAtEnd();
845 int c = + Utility.compare(getNumber(), other.getNumber());
846
847 if (c > 0) {
848 break;
849 }
850
851 output.processNumber(getNumber());
852
853 copyTuples(getNumberEndIndex(), output);
854 } else {
855 output.processNumber(getNumber());
856 copyTuples(getNumberEndIndex(), output);
857 }
858 incrementNumber();
859
860
861 }
862 }
863 public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
864 copyUntilNumber(other, output);
865 }
866
867 }
868 public static class ShreddedCombiner implements ReaderSource<NumberedDocumentData>, ShreddedSource {
869 public ShreddedProcessor processor;
870 Collection<ShreddedReader> readers;
871 boolean closeOnExit = false;
872 boolean uninitialized = true;
873 PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
874
875 public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
876 this.readers = readers;
877 this.closeOnExit = closeOnExit;
878 }
879
880 public void setProcessor(Step processor) throws IncompatibleProcessorException {
881 if (processor instanceof ShreddedProcessor) {
882 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
883 } else if (processor instanceof NumberedDocumentData.Processor) {
884 this.processor = new DuplicateEliminator(new TupleUnshredder((NumberedDocumentData.Processor) processor));
885 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
886 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<NumberedDocumentData>) processor));
887 } else {
888 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
889 }
890 }
891
892 public Class<NumberedDocumentData> getOutputClass() {
893 return NumberedDocumentData.class;
894 }
895
896 public void initialize() throws IOException {
897 for (ShreddedReader reader : readers) {
898 reader.fill();
899
900 if (!reader.getBuffer().isAtEnd())
901 queue.add(reader);
902 }
903
904 uninitialized = false;
905 }
906
907 public void run() throws IOException {
908 initialize();
909
910 while (queue.size() > 0) {
911 ShreddedReader top = queue.poll();
912 ShreddedReader next = null;
913 ShreddedBuffer nextBuffer = null;
914
915 assert !top.getBuffer().isAtEnd();
916
917 if (queue.size() > 0) {
918 next = queue.peek();
919 nextBuffer = next.getBuffer();
920 assert !nextBuffer.isAtEnd();
921 }
922
923 top.getBuffer().copyUntil(nextBuffer, processor);
924 if (top.getBuffer().isAtEnd())
925 top.fill();
926
927 if (!top.getBuffer().isAtEnd())
928 queue.add(top);
929 }
930
931 if (closeOnExit)
932 processor.close();
933 }
934
935 public NumberedDocumentData read() throws IOException {
936 if (uninitialized)
937 initialize();
938
939 NumberedDocumentData result = null;
940
941 while (queue.size() > 0) {
942 ShreddedReader top = queue.poll();
943 result = top.read();
944
945 if (result != null) {
946 if (top.getBuffer().isAtEnd())
947 top.fill();
948
949 queue.offer(top);
950 break;
951 }
952 }
953
954 return result;
955 }
956 }
/**
 * Reads number-grouped tuples from an ArrayInput into a ShreddedBuffer, one
 * batch at a time. The input is consumed as repeated groups: a number, a
 * tuple count, then that many (identifier, url, textLength) tuples.
 */
public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<NumberedDocumentData>, ShreddedSource {
    /** Receiver used by run(); set through setProcessor. */
    public ShreddedProcessor processor;
    ShreddedBuffer buffer;
    /** Holds the number of the group currently being read from the input. */
    NumberedDocumentData last = new NumberedDocumentData();
    /** Value of tupleCount at which the current group ends; -1 before any group header is read. */
    long updateNumberCount = -1;
    /** Total number of tuples read from the input so far. */
    long tupleCount = 0;
    /** Value of tupleCount at the start of the most recent non-initial fill. */
    long bufferStartCount = 0;
    ArrayInput input;

    /** Creates a reader with a default-sized buffer. */
    public ShreddedReader(ArrayInput input) {
        this.input = input;
        this.buffer = new ShreddedBuffer();
    }

    /** Creates a reader whose buffer holds bufferSize tuples. */
    public ShreddedReader(ArrayInput input, int bufferSize) {
        this.input = input;
        this.buffer = new ShreddedBuffer(bufferSize);
    }

    /**
     * Orders readers for the combiner's queue. A reader whose buffer is at its
     * end sorts before one that still has tuples; otherwise readers are
     * ordered by the current number of their buffers.
     */
    public final int compareTo(ShreddedReader other) {
        ShreddedBuffer otherBuffer = other.getBuffer();

        if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
            return 0;
        } else if (buffer.isAtEnd()) {
            return -1;
        } else if (otherBuffer.isAtEnd()) {
            return 1;
        }

        int result = 0;
        do {
            result = + Utility.compare(buffer.getNumber(), otherBuffer.getNumber());
            if(result != 0) break;
        } while (false);

        return result;
    }

    /** Returns the buffer this reader fills. */
    public final ShreddedBuffer getBuffer() {
        return buffer;
    }

    /**
     * Returns the next tuple as a new object, refilling the buffer when it has
     * been fully consumed. Returns null when a refill produces no tuples.
     */
    public final NumberedDocumentData read() throws IOException {
        if (buffer.isAtEnd()) {
            fill();

            if (buffer.isAtEnd()) {
                return null;
            }
        }

        assert !buffer.isAtEnd();
        NumberedDocumentData result = new NumberedDocumentData();

        result.number = buffer.getNumber();
        result.identifier = buffer.getIdentifier();
        result.url = buffer.getUrl();
        result.textLength = buffer.getTextLength();

        buffer.incrementTuple();
        // Step to the next group if the tuple just consumed was the last of its group.
        buffer.autoIncrementNumber();

        return result;
    }

    /**
     * Empties the buffer and reads tuples into it until it is full or the
     * input throws EOFException, which is treated as the end of the input.
     */
    public final void fill() throws IOException {
        try {
            buffer.reset();

            if (tupleCount != 0) {

                // The previous fill stopped in the middle of a group: re-register
                // its number so the remaining tuples still belong to a group.
                if(updateNumberCount - tupleCount > 0) {
                    buffer.numbers.add(last.number);
                    // NOTE(review): this stores the remaining tuple count rather than 0.
                    // It lands in entry 0, which ShreddedBuffer.getNumberEndIndex never
                    // reads (it only reads entry numberReadIdx+1) - confirm it is unused elsewhere.
                    buffer.numberTupleIdx.add((int) (updateNumberCount - tupleCount));
                }
                bufferStartCount = tupleCount;
            }

            while (!buffer.isFull()) {
                // Reads a group header first whenever the current group is exhausted.
                updateNumber();
                buffer.processTuple(input.readString(), input.readString(), input.readInt());
                tupleCount++;
            }
        } catch(EOFException e) {}
    }

    /**
     * Reads the next group header (number, then tuple count) from the input
     * when the current group has been fully read; otherwise does nothing.
     */
    public final void updateNumber() throws IOException {
        if (updateNumberCount > tupleCount)
            return;

        last.number = input.readInt();
        // The group ends once tupleCount reaches the current count plus the group's size.
        updateNumberCount = tupleCount + input.readInt();

        buffer.processNumber(last.number);
    }

    /**
     * Fills the buffer and passes it to buffer.copyUntil(null, processor),
     * repeating until a fill leaves the buffer empty; then closes the processor.
     */
    public void run() throws IOException {
        while (true) {
            fill();

            if (buffer.isAtEnd())
                break;

            buffer.copyUntil(null, processor);
        }
        processor.close();
    }

    /**
     * Installs the downstream step, wrapped in a DuplicateEliminator.
     * Object-level processors are additionally wrapped in a TupleUnshredder.
     * Throws IncompatibleProcessorException for any other kind of step.
     */
    public void setProcessor(Step processor) throws IncompatibleProcessorException {
        if (processor instanceof ShreddedProcessor) {
            this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
        } else if (processor instanceof NumberedDocumentData.Processor) {
            this.processor = new DuplicateEliminator(new TupleUnshredder((NumberedDocumentData.Processor) processor));
        } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
            this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<NumberedDocumentData>) processor));
        } else {
            throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
        }
    }

    /** Returns the tuple class this reader produces. */
    public Class<NumberedDocumentData> getOutputClass() {
        return NumberedDocumentData.class;
    }
}
1082
1083 public static class DuplicateEliminator implements ShreddedProcessor {
1084 public ShreddedProcessor processor;
1085 NumberedDocumentData last = new NumberedDocumentData();
1086 boolean numberProcess = true;
1087
1088 public DuplicateEliminator() {}
1089 public DuplicateEliminator(ShreddedProcessor processor) {
1090 this.processor = processor;
1091 }
1092
1093 public void setShreddedProcessor(ShreddedProcessor processor) {
1094 this.processor = processor;
1095 }
1096
1097 public void processNumber(int number) throws IOException {
1098 if (numberProcess || Utility.compare(number, last.number) != 0) {
1099 last.number = number;
1100 processor.processNumber(number);
1101 numberProcess = false;
1102 }
1103 }
1104
1105 public void resetNumber() {
1106 numberProcess = true;
1107 }
1108
1109 public void processTuple(String identifier, String url, int textLength) throws IOException {
1110 processor.processTuple(identifier, url, textLength);
1111 }
1112
1113 public void close() throws IOException {
1114 processor.close();
1115 }
1116 }
1117 public static class TupleUnshredder implements ShreddedProcessor {
1118 NumberedDocumentData last = new NumberedDocumentData();
1119 public org.galagosearch.tupleflow.Processor<NumberedDocumentData> processor;
1120
1121 public TupleUnshredder(NumberedDocumentData.Processor processor) {
1122 this.processor = processor;
1123 }
1124
1125 public TupleUnshredder(org.galagosearch.tupleflow.Processor<NumberedDocumentData> processor) {
1126 this.processor = processor;
1127 }
1128
1129 public NumberedDocumentData clone(NumberedDocumentData object) {
1130 NumberedDocumentData result = new NumberedDocumentData();
1131 if (object == null) return result;
1132 result.identifier = object.identifier;
1133 result.url = object.url;
1134 result.number = object.number;
1135 result.textLength = object.textLength;
1136 return result;
1137 }
1138
1139 public void processNumber(int number) throws IOException {
1140 last.number = number;
1141 }
1142
1143
1144 public void processTuple(String identifier, String url, int textLength) throws IOException {
1145 last.identifier = identifier;
1146 last.url = url;
1147 last.textLength = textLength;
1148 processor.process(clone(last));
1149 }
1150
1151 public void close() throws IOException {
1152 processor.close();
1153 }
1154 }
1155 public static class TupleShredder implements Processor {
1156 NumberedDocumentData last = new NumberedDocumentData();
1157 public ShreddedProcessor processor;
1158
1159 public TupleShredder(ShreddedProcessor processor) {
1160 this.processor = processor;
1161 }
1162
1163 public NumberedDocumentData clone(NumberedDocumentData object) {
1164 NumberedDocumentData result = new NumberedDocumentData();
1165 if (object == null) return result;
1166 result.identifier = object.identifier;
1167 result.url = object.url;
1168 result.number = object.number;
1169 result.textLength = object.textLength;
1170 return result;
1171 }
1172
1173 public void process(NumberedDocumentData object) throws IOException {
1174 boolean processAll = false;
1175 if(last == null || Utility.compare(last.number, object.number) != 0 || processAll) { processor.processNumber(object.number); processAll = true; }
1176 processor.processTuple(object.identifier, object.url, object.textLength);
1177 }
1178
1179 public Class<NumberedDocumentData> getInputClass() {
1180 return NumberedDocumentData.class;
1181 }
1182
1183 public void close() throws IOException {
1184 processor.close();
1185 }
1186 }
1187 }
1188 }