1
2
3 package org.galagosearch.core.types;
4
5 import org.galagosearch.tupleflow.Utility;
6 import org.galagosearch.tupleflow.ArrayInput;
7 import org.galagosearch.tupleflow.ArrayOutput;
8 import org.galagosearch.tupleflow.Order;
9 import org.galagosearch.tupleflow.OrderedWriter;
10 import org.galagosearch.tupleflow.Type;
11 import org.galagosearch.tupleflow.TypeReader;
12 import org.galagosearch.tupleflow.Step;
13 import org.galagosearch.tupleflow.IncompatibleProcessorException;
14 import org.galagosearch.tupleflow.ReaderSource;
15 import java.io.IOException;
16 import java.io.EOFException;
17 import java.io.UnsupportedEncodingException;
18 import java.util.ArrayList;
19 import java.util.Arrays;
20 import java.util.Comparator;
21 import java.util.PriorityQueue;
22 import java.util.Collection;
23
24 public class DocumentData implements Type<DocumentData> {
25 public String identifier;
26 public String url;
27 public int textLength;
28
/** Creates a record whose fields all keep their Java default values. */
public DocumentData() {
}

/**
 * Creates a fully populated record.
 *
 * @param identifier value stored in the {@code identifier} field
 * @param url        value stored in the {@code url} field
 * @param textLength value stored in the {@code textLength} field
 */
public DocumentData(String identifier, String url, int textLength) {
    this.textLength = textLength;
    this.url = url;
    this.identifier = identifier;
}
35
36 public String toString() {
37 return String.format("%s,%s,%d",
38 identifier, url, textLength);
39 }
40
41 public Order<DocumentData> getOrder(String... spec) {
42 if (Arrays.equals(spec, new String[] { })) {
43 return new Unordered();
44 }
45 if (Arrays.equals(spec, new String[] { "+url" })) {
46 return new UrlOrder();
47 }
48 if (Arrays.equals(spec, new String[] { "+identifier" })) {
49 return new IdentifierOrder();
50 }
51 return null;
52 }
53
54 public interface Processor extends Step, org.galagosearch.tupleflow.Processor<DocumentData> {
55 public void process(DocumentData object) throws IOException;
56 public void close() throws IOException;
57 }
58 public interface Source extends Step {
59 }
60 public static class Unordered implements Order<DocumentData> {
61 public int hash(DocumentData object) {
62 int h = 0;
63 return h;
64 }
65 public Comparator<DocumentData> greaterThan() {
66 return new Comparator<DocumentData>() {
67 public int compare(DocumentData one, DocumentData two) {
68 int result = 0;
69 do {
70 } while (false);
71 return -result;
72 }
73 };
74 }
75 public Comparator<DocumentData> lessThan() {
76 return new Comparator<DocumentData>() {
77 public int compare(DocumentData one, DocumentData two) {
78 int result = 0;
79 do {
80 } while (false);
81 return result;
82 }
83 };
84 }
85 public TypeReader<DocumentData> orderedReader(ArrayInput _input) {
86 return new ShreddedReader(_input);
87 }
88
89 public TypeReader<DocumentData> orderedReader(ArrayInput _input, int bufferSize) {
90 return new ShreddedReader(_input, bufferSize);
91 }
92 public OrderedWriter<DocumentData> orderedWriter(ArrayOutput _output) {
93 ShreddedWriter w = new ShreddedWriter(_output);
94 return new OrderedWriterClass(w);
95 }
96 public static class OrderedWriterClass extends OrderedWriter< DocumentData > {
97 DocumentData last = null;
98 ShreddedWriter shreddedWriter = null;
99
100 public OrderedWriterClass(ShreddedWriter s) {
101 this.shreddedWriter = s;
102 }
103
104 public void process(DocumentData object) throws IOException {
105 boolean processAll = false;
106 shreddedWriter.processTuple(object.identifier, object.url, object.textLength);
107 last = object;
108 }
109
110 public void close() throws IOException {
111 shreddedWriter.close();
112 }
113
114 public Class<DocumentData> getInputClass() {
115 return DocumentData.class;
116 }
117 }
118 public ReaderSource<DocumentData> orderedCombiner(Collection<TypeReader<DocumentData>> readers, boolean closeOnExit) {
119 ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
120
121 for (TypeReader<DocumentData> reader : readers) {
122 shreddedReaders.add((ShreddedReader)reader);
123 }
124
125 return new ShreddedCombiner(shreddedReaders, closeOnExit);
126 }
127 public DocumentData clone(DocumentData object) {
128 DocumentData result = new DocumentData();
129 if (object == null) return result;
130 result.identifier = object.identifier;
131 result.url = object.url;
132 result.textLength = object.textLength;
133 return result;
134 }
135 public Class<DocumentData> getOrderedClass() {
136 return DocumentData.class;
137 }
138 public String[] getOrderSpec() {
139 return new String[] {};
140 }
141
142 public static String getSpecString() {
143 return "";
144 }
145
146 public interface ShreddedProcessor extends Step {
147 public void processTuple(String identifier, String url, int textLength) throws IOException;
148 public void close() throws IOException;
149 }
150 public interface ShreddedSource extends Step {
151 }
152
153 public static class ShreddedWriter implements ShreddedProcessor {
154 ArrayOutput output;
155 ShreddedBuffer buffer = new ShreddedBuffer();
156 boolean lastFlush = false;
157
158 public ShreddedWriter(ArrayOutput output) {
159 this.output = output;
160 }
161
162 public void close() throws IOException {
163 flush();
164 }
165
166 public final void processTuple(String identifier, String url, int textLength) throws IOException {
167 if (lastFlush) {
168 lastFlush = false;
169 }
170 buffer.processTuple(identifier, url, textLength);
171 if (buffer.isFull())
172 flush();
173 }
174 public final void flushTuples(int pauseIndex) throws IOException {
175
176 while (buffer.getReadIndex() < pauseIndex) {
177
178 output.writeString(buffer.getIdentifier());
179 output.writeString(buffer.getUrl());
180 output.writeInt(buffer.getTextLength());
181 buffer.incrementTuple();
182 }
183 }
184 public void flush() throws IOException {
185 flushTuples(buffer.getWriteIndex());
186 buffer.reset();
187 lastFlush = true;
188 }
189 }
190 public static class ShreddedBuffer {
191
192 String[] identifiers;
193 String[] urls;
194 int[] textLengths;
195 int writeTupleIndex = 0;
196 int readTupleIndex = 0;
197 int batchSize;
198
199 public ShreddedBuffer(int batchSize) {
200 this.batchSize = batchSize;
201
202 identifiers = new String[batchSize];
203 urls = new String[batchSize];
204 textLengths = new int[batchSize];
205 }
206
207 public ShreddedBuffer() {
208 this(10000);
209 }
210
211 public void processTuple(String identifier, String url, int textLength) {
212 identifiers[writeTupleIndex] = identifier;
213 urls[writeTupleIndex] = url;
214 textLengths[writeTupleIndex] = textLength;
215 writeTupleIndex++;
216 }
217 public void resetData() {
218 writeTupleIndex = 0;
219 }
220
221 public void resetRead() {
222 readTupleIndex = 0;
223 }
224
225 public void reset() {
226 resetData();
227 resetRead();
228 }
229 public boolean isFull() {
230 return writeTupleIndex >= batchSize;
231 }
232
233 public boolean isEmpty() {
234 return writeTupleIndex == 0;
235 }
236
237 public boolean isAtEnd() {
238 return readTupleIndex >= writeTupleIndex;
239 }
240 public void incrementTuple() {
241 readTupleIndex++;
242 }
243 public int getReadIndex() {
244 return readTupleIndex;
245 }
246
247 public int getWriteIndex() {
248 return writeTupleIndex;
249 }
250 public String getIdentifier() {
251 assert readTupleIndex < writeTupleIndex;
252 return identifiers[readTupleIndex];
253 }
254 public String getUrl() {
255 assert readTupleIndex < writeTupleIndex;
256 return urls[readTupleIndex];
257 }
258 public int getTextLength() {
259 assert readTupleIndex < writeTupleIndex;
260 return textLengths[readTupleIndex];
261 }
262 public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
263 while (getReadIndex() < endIndex) {
264 output.processTuple(getIdentifier(), getUrl(), getTextLength());
265 incrementTuple();
266 }
267 }
268
269 public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
270 }
271
272 }
273 public static class ShreddedCombiner implements ReaderSource<DocumentData>, ShreddedSource {
274 public ShreddedProcessor processor;
275 Collection<ShreddedReader> readers;
276 boolean closeOnExit = false;
277 boolean uninitialized = true;
278 PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
279
280 public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
281 this.readers = readers;
282 this.closeOnExit = closeOnExit;
283 }
284
285 public void setProcessor(Step processor) throws IncompatibleProcessorException {
286 if (processor instanceof ShreddedProcessor) {
287 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
288 } else if (processor instanceof DocumentData.Processor) {
289 this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentData.Processor) processor));
290 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
291 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentData>) processor));
292 } else {
293 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
294 }
295 }
296
297 public Class<DocumentData> getOutputClass() {
298 return DocumentData.class;
299 }
300
301 public void initialize() throws IOException {
302 for (ShreddedReader reader : readers) {
303 reader.fill();
304
305 if (!reader.getBuffer().isAtEnd())
306 queue.add(reader);
307 }
308
309 uninitialized = false;
310 }
311
312 public void run() throws IOException {
313 initialize();
314
315 while (queue.size() > 0) {
316 ShreddedReader top = queue.poll();
317 ShreddedReader next = null;
318 ShreddedBuffer nextBuffer = null;
319
320 assert !top.getBuffer().isAtEnd();
321
322 if (queue.size() > 0) {
323 next = queue.peek();
324 nextBuffer = next.getBuffer();
325 assert !nextBuffer.isAtEnd();
326 }
327
328 top.getBuffer().copyUntil(nextBuffer, processor);
329 if (top.getBuffer().isAtEnd())
330 top.fill();
331
332 if (!top.getBuffer().isAtEnd())
333 queue.add(top);
334 }
335
336 if (closeOnExit)
337 processor.close();
338 }
339
340 public DocumentData read() throws IOException {
341 if (uninitialized)
342 initialize();
343
344 DocumentData result = null;
345
346 while (queue.size() > 0) {
347 ShreddedReader top = queue.poll();
348 result = top.read();
349
350 if (result != null) {
351 if (top.getBuffer().isAtEnd())
352 top.fill();
353
354 queue.offer(top);
355 break;
356 }
357 }
358
359 return result;
360 }
361 }
362 public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<DocumentData>, ShreddedSource {
363 public ShreddedProcessor processor;
364 ShreddedBuffer buffer;
365 DocumentData last = new DocumentData();
366 long tupleCount = 0;
367 long bufferStartCount = 0;
368 ArrayInput input;
369
370 public ShreddedReader(ArrayInput input) {
371 this.input = input;
372 this.buffer = new ShreddedBuffer();
373 }
374
375 public ShreddedReader(ArrayInput input, int bufferSize) {
376 this.input = input;
377 this.buffer = new ShreddedBuffer(bufferSize);
378 }
379
380 public final int compareTo(ShreddedReader other) {
381 ShreddedBuffer otherBuffer = other.getBuffer();
382
383 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
384 return 0;
385 } else if (buffer.isAtEnd()) {
386 return -1;
387 } else if (otherBuffer.isAtEnd()) {
388 return 1;
389 }
390
391 int result = 0;
392 do {
393 } while (false);
394
395 return result;
396 }
397
398 public final ShreddedBuffer getBuffer() {
399 return buffer;
400 }
401
402 public final DocumentData read() throws IOException {
403 if (buffer.isAtEnd()) {
404 fill();
405
406 if (buffer.isAtEnd()) {
407 return null;
408 }
409 }
410
411 assert !buffer.isAtEnd();
412 DocumentData result = new DocumentData();
413
414 result.identifier = buffer.getIdentifier();
415 result.url = buffer.getUrl();
416 result.textLength = buffer.getTextLength();
417
418 buffer.incrementTuple();
419
420 return result;
421 }
422
423 public final void fill() throws IOException {
424 try {
425 buffer.reset();
426
427 if (tupleCount != 0) {
428 bufferStartCount = tupleCount;
429 }
430
431 while (!buffer.isFull()) {
432 buffer.processTuple(input.readString(), input.readString(), input.readInt());
433 tupleCount++;
434 }
435 } catch(EOFException e) {}
436 }
437
438
439 public void run() throws IOException {
440 while (true) {
441 fill();
442
443 if (buffer.isAtEnd())
444 break;
445
446 buffer.copyUntil(null, processor);
447 }
448 processor.close();
449 }
450
451 public void setProcessor(Step processor) throws IncompatibleProcessorException {
452 if (processor instanceof ShreddedProcessor) {
453 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
454 } else if (processor instanceof DocumentData.Processor) {
455 this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentData.Processor) processor));
456 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
457 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentData>) processor));
458 } else {
459 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
460 }
461 }
462
463 public Class<DocumentData> getOutputClass() {
464 return DocumentData.class;
465 }
466 }
467
468 public static class DuplicateEliminator implements ShreddedProcessor {
469 public ShreddedProcessor processor;
470 DocumentData last = new DocumentData();
471
472 public DuplicateEliminator() {}
473 public DuplicateEliminator(ShreddedProcessor processor) {
474 this.processor = processor;
475 }
476
477 public void setShreddedProcessor(ShreddedProcessor processor) {
478 this.processor = processor;
479 }
480
481
482
483
484 public void processTuple(String identifier, String url, int textLength) throws IOException {
485 processor.processTuple(identifier, url, textLength);
486 }
487
488 public void close() throws IOException {
489 processor.close();
490 }
491 }
492 public static class TupleUnshredder implements ShreddedProcessor {
493 DocumentData last = new DocumentData();
494 public org.galagosearch.tupleflow.Processor<DocumentData> processor;
495
496 public TupleUnshredder(DocumentData.Processor processor) {
497 this.processor = processor;
498 }
499
500 public TupleUnshredder(org.galagosearch.tupleflow.Processor<DocumentData> processor) {
501 this.processor = processor;
502 }
503
504 public DocumentData clone(DocumentData object) {
505 DocumentData result = new DocumentData();
506 if (object == null) return result;
507 result.identifier = object.identifier;
508 result.url = object.url;
509 result.textLength = object.textLength;
510 return result;
511 }
512
513
514 public void processTuple(String identifier, String url, int textLength) throws IOException {
515 last.identifier = identifier;
516 last.url = url;
517 last.textLength = textLength;
518 processor.process(clone(last));
519 }
520
521 public void close() throws IOException {
522 processor.close();
523 }
524 }
525 public static class TupleShredder implements Processor {
526 DocumentData last = new DocumentData();
527 public ShreddedProcessor processor;
528
529 public TupleShredder(ShreddedProcessor processor) {
530 this.processor = processor;
531 }
532
533 public DocumentData clone(DocumentData object) {
534 DocumentData result = new DocumentData();
535 if (object == null) return result;
536 result.identifier = object.identifier;
537 result.url = object.url;
538 result.textLength = object.textLength;
539 return result;
540 }
541
542 public void process(DocumentData object) throws IOException {
543 boolean processAll = false;
544 processor.processTuple(object.identifier, object.url, object.textLength);
545 }
546
547 public Class<DocumentData> getInputClass() {
548 return DocumentData.class;
549 }
550
551 public void close() throws IOException {
552 processor.close();
553 }
554 }
555 }
556 public static class UrlOrder implements Order<DocumentData> {
557 public int hash(DocumentData object) {
558 int h = 0;
559 h += Utility.hash(object.url);
560 return h;
561 }
562 public Comparator<DocumentData> greaterThan() {
563 return new Comparator<DocumentData>() {
564 public int compare(DocumentData one, DocumentData two) {
565 int result = 0;
566 do {
567 result = + Utility.compare(one.url, two.url);
568 if(result != 0) break;
569 } while (false);
570 return -result;
571 }
572 };
573 }
574 public Comparator<DocumentData> lessThan() {
575 return new Comparator<DocumentData>() {
576 public int compare(DocumentData one, DocumentData two) {
577 int result = 0;
578 do {
579 result = + Utility.compare(one.url, two.url);
580 if(result != 0) break;
581 } while (false);
582 return result;
583 }
584 };
585 }
586 public TypeReader<DocumentData> orderedReader(ArrayInput _input) {
587 return new ShreddedReader(_input);
588 }
589
590 public TypeReader<DocumentData> orderedReader(ArrayInput _input, int bufferSize) {
591 return new ShreddedReader(_input, bufferSize);
592 }
593 public OrderedWriter<DocumentData> orderedWriter(ArrayOutput _output) {
594 ShreddedWriter w = new ShreddedWriter(_output);
595 return new OrderedWriterClass(w);
596 }
597 public static class OrderedWriterClass extends OrderedWriter< DocumentData > {
598 DocumentData last = null;
599 ShreddedWriter shreddedWriter = null;
600
601 public OrderedWriterClass(ShreddedWriter s) {
602 this.shreddedWriter = s;
603 }
604
605 public void process(DocumentData object) throws IOException {
606 boolean processAll = false;
607 if (processAll || last == null || 0 != Utility.compare(object.url, last.url)) { processAll = true; shreddedWriter.processUrl(object.url); }
608 shreddedWriter.processTuple(object.identifier, object.textLength);
609 last = object;
610 }
611
612 public void close() throws IOException {
613 shreddedWriter.close();
614 }
615
616 public Class<DocumentData> getInputClass() {
617 return DocumentData.class;
618 }
619 }
620 public ReaderSource<DocumentData> orderedCombiner(Collection<TypeReader<DocumentData>> readers, boolean closeOnExit) {
621 ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
622
623 for (TypeReader<DocumentData> reader : readers) {
624 shreddedReaders.add((ShreddedReader)reader);
625 }
626
627 return new ShreddedCombiner(shreddedReaders, closeOnExit);
628 }
629 public DocumentData clone(DocumentData object) {
630 DocumentData result = new DocumentData();
631 if (object == null) return result;
632 result.identifier = object.identifier;
633 result.url = object.url;
634 result.textLength = object.textLength;
635 return result;
636 }
637 public Class<DocumentData> getOrderedClass() {
638 return DocumentData.class;
639 }
640 public String[] getOrderSpec() {
641 return new String[] {"+url"};
642 }
643
644 public static String getSpecString() {
645 return "+url";
646 }
647
648 public interface ShreddedProcessor extends Step {
649 public void processUrl(String url) throws IOException;
650 public void processTuple(String identifier, int textLength) throws IOException;
651 public void close() throws IOException;
652 }
653 public interface ShreddedSource extends Step {
654 }
655
656 public static class ShreddedWriter implements ShreddedProcessor {
657 ArrayOutput output;
658 ShreddedBuffer buffer = new ShreddedBuffer();
659 String lastUrl;
660 boolean lastFlush = false;
661
662 public ShreddedWriter(ArrayOutput output) {
663 this.output = output;
664 }
665
666 public void close() throws IOException {
667 flush();
668 }
669
670 public void processUrl(String url) {
671 lastUrl = url;
672 buffer.processUrl(url);
673 }
674 public final void processTuple(String identifier, int textLength) throws IOException {
675 if (lastFlush) {
676 if(buffer.urls.size() == 0) buffer.processUrl(lastUrl);
677 lastFlush = false;
678 }
679 buffer.processTuple(identifier, textLength);
680 if (buffer.isFull())
681 flush();
682 }
683 public final void flushTuples(int pauseIndex) throws IOException {
684
685 while (buffer.getReadIndex() < pauseIndex) {
686
687 output.writeString(buffer.getIdentifier());
688 output.writeInt(buffer.getTextLength());
689 buffer.incrementTuple();
690 }
691 }
692 public final void flushUrl(int pauseIndex) throws IOException {
693 while (buffer.getReadIndex() < pauseIndex) {
694 int nextPause = buffer.getUrlEndIndex();
695 int count = nextPause - buffer.getReadIndex();
696
697 output.writeString(buffer.getUrl());
698 output.writeInt(count);
699 buffer.incrementUrl();
700
701 flushTuples(nextPause);
702 assert nextPause == buffer.getReadIndex();
703 }
704 }
705 public void flush() throws IOException {
706 flushUrl(buffer.getWriteIndex());
707 buffer.reset();
708 lastFlush = true;
709 }
710 }
711 public static class ShreddedBuffer {
712 ArrayList<String> urls = new ArrayList();
713 ArrayList<Integer> urlTupleIdx = new ArrayList();
714 int urlReadIdx = 0;
715
716 String[] identifiers;
717 int[] textLengths;
718 int writeTupleIndex = 0;
719 int readTupleIndex = 0;
720 int batchSize;
721
722 public ShreddedBuffer(int batchSize) {
723 this.batchSize = batchSize;
724
725 identifiers = new String[batchSize];
726 textLengths = new int[batchSize];
727 }
728
729 public ShreddedBuffer() {
730 this(10000);
731 }
732
733 public void processUrl(String url) {
734 urls.add(url);
735 urlTupleIdx.add(writeTupleIndex);
736 }
737 public void processTuple(String identifier, int textLength) {
738 assert urls.size() > 0;
739 identifiers[writeTupleIndex] = identifier;
740 textLengths[writeTupleIndex] = textLength;
741 writeTupleIndex++;
742 }
743 public void resetData() {
744 urls.clear();
745 urlTupleIdx.clear();
746 writeTupleIndex = 0;
747 }
748
749 public void resetRead() {
750 readTupleIndex = 0;
751 urlReadIdx = 0;
752 }
753
754 public void reset() {
755 resetData();
756 resetRead();
757 }
758 public boolean isFull() {
759 return writeTupleIndex >= batchSize;
760 }
761
762 public boolean isEmpty() {
763 return writeTupleIndex == 0;
764 }
765
766 public boolean isAtEnd() {
767 return readTupleIndex >= writeTupleIndex;
768 }
769 public void incrementUrl() {
770 urlReadIdx++;
771 }
772
773 public void autoIncrementUrl() {
774 while (readTupleIndex >= getUrlEndIndex() && readTupleIndex < writeTupleIndex)
775 urlReadIdx++;
776 }
777 public void incrementTuple() {
778 readTupleIndex++;
779 }
780 public int getUrlEndIndex() {
781 if ((urlReadIdx+1) >= urlTupleIdx.size())
782 return writeTupleIndex;
783 return urlTupleIdx.get(urlReadIdx+1);
784 }
785 public int getReadIndex() {
786 return readTupleIndex;
787 }
788
789 public int getWriteIndex() {
790 return writeTupleIndex;
791 }
792 public String getUrl() {
793 assert readTupleIndex < writeTupleIndex;
794 assert urlReadIdx < urls.size();
795
796 return urls.get(urlReadIdx);
797 }
798 public String getIdentifier() {
799 assert readTupleIndex < writeTupleIndex;
800 return identifiers[readTupleIndex];
801 }
802 public int getTextLength() {
803 assert readTupleIndex < writeTupleIndex;
804 return textLengths[readTupleIndex];
805 }
806 public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
807 while (getReadIndex() < endIndex) {
808 output.processTuple(getIdentifier(), getTextLength());
809 incrementTuple();
810 }
811 }
812 public void copyUntilIndexUrl(int endIndex, ShreddedProcessor output) throws IOException {
813 while (getReadIndex() < endIndex) {
814 output.processUrl(getUrl());
815 assert getUrlEndIndex() <= endIndex;
816 copyTuples(getUrlEndIndex(), output);
817 incrementUrl();
818 }
819 }
820 public void copyUntilUrl(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
821 while (!isAtEnd()) {
822 if (other != null) {
823 assert !other.isAtEnd();
824 int c = + Utility.compare(getUrl(), other.getUrl());
825
826 if (c > 0) {
827 break;
828 }
829
830 output.processUrl(getUrl());
831
832 copyTuples(getUrlEndIndex(), output);
833 } else {
834 output.processUrl(getUrl());
835 copyTuples(getUrlEndIndex(), output);
836 }
837 incrementUrl();
838
839
840 }
841 }
842 public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
843 copyUntilUrl(other, output);
844 }
845
846 }
847 public static class ShreddedCombiner implements ReaderSource<DocumentData>, ShreddedSource {
848 public ShreddedProcessor processor;
849 Collection<ShreddedReader> readers;
850 boolean closeOnExit = false;
851 boolean uninitialized = true;
852 PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
853
854 public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
855 this.readers = readers;
856 this.closeOnExit = closeOnExit;
857 }
858
859 public void setProcessor(Step processor) throws IncompatibleProcessorException {
860 if (processor instanceof ShreddedProcessor) {
861 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
862 } else if (processor instanceof DocumentData.Processor) {
863 this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentData.Processor) processor));
864 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
865 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentData>) processor));
866 } else {
867 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
868 }
869 }
870
871 public Class<DocumentData> getOutputClass() {
872 return DocumentData.class;
873 }
874
875 public void initialize() throws IOException {
876 for (ShreddedReader reader : readers) {
877 reader.fill();
878
879 if (!reader.getBuffer().isAtEnd())
880 queue.add(reader);
881 }
882
883 uninitialized = false;
884 }
885
886 public void run() throws IOException {
887 initialize();
888
889 while (queue.size() > 0) {
890 ShreddedReader top = queue.poll();
891 ShreddedReader next = null;
892 ShreddedBuffer nextBuffer = null;
893
894 assert !top.getBuffer().isAtEnd();
895
896 if (queue.size() > 0) {
897 next = queue.peek();
898 nextBuffer = next.getBuffer();
899 assert !nextBuffer.isAtEnd();
900 }
901
902 top.getBuffer().copyUntil(nextBuffer, processor);
903 if (top.getBuffer().isAtEnd())
904 top.fill();
905
906 if (!top.getBuffer().isAtEnd())
907 queue.add(top);
908 }
909
910 if (closeOnExit)
911 processor.close();
912 }
913
914 public DocumentData read() throws IOException {
915 if (uninitialized)
916 initialize();
917
918 DocumentData result = null;
919
920 while (queue.size() > 0) {
921 ShreddedReader top = queue.poll();
922 result = top.read();
923
924 if (result != null) {
925 if (top.getBuffer().isAtEnd())
926 top.fill();
927
928 queue.offer(top);
929 break;
930 }
931 }
932
933 return result;
934 }
935 }
/**
 * Reads url-grouped tuples from an {@link ArrayInput} in batches. The stream
 * is consumed as: a url string, an int tuple count, then that many
 * (identifier string, textLength int) pairs, repeated.
 */
public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<DocumentData>, ShreddedSource {
    public ShreddedProcessor processor;
    ShreddedBuffer buffer;
    // Only last.url is used in this class: it holds the most recently read url.
    DocumentData last = new DocumentData();
    // Value of tupleCount at which the current url group ends; -1 before any
    // url header has been read.
    long updateUrlCount = -1;
    // Total number of tuples read from the input so far.
    long tupleCount = 0;
    // Value of tupleCount at the start of the latest non-initial fill.
    long bufferStartCount = 0;
    ArrayInput input;

    /**
     * @param input the serialized stream; a default-sized buffer is used
     */
    public ShreddedReader(ArrayInput input) {
        this.input = input;
        this.buffer = new ShreddedBuffer();
    }

    /**
     * @param input      the serialized stream
     * @param bufferSize capacity of the buffer, in tuples
     */
    public ShreddedReader(ArrayInput input, int bufferSize) {
        this.input = input;
        this.buffer = new ShreddedBuffer(bufferSize);
    }

    /**
     * Orders readers for the combiner's priority queue. A reader whose buffer
     * is exhausted sorts before one that still has data; otherwise readers
     * are ordered by the url at their buffers' current read position.
     */
    public final int compareTo(ShreddedReader other) {
        ShreddedBuffer otherBuffer = other.getBuffer();

        if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
            return 0;
        } else if (buffer.isAtEnd()) {
            return -1;
        } else if (otherBuffer.isAtEnd()) {
            return 1;
        }

        int result = 0;
        do {
            result = + Utility.compare(buffer.getUrl(), otherBuffer.getUrl());
            if(result != 0) break;
        } while (false);

        return result;
    }

    public final ShreddedBuffer getBuffer() {
        return buffer;
    }

    /**
     * Returns the next record, refilling the buffer when it is exhausted.
     *
     * @return a new record, or {@code null} when a refill produced no tuples
     * @throws IOException if reading fails
     */
    public final DocumentData read() throws IOException {
        if (buffer.isAtEnd()) {
            fill();

            if (buffer.isAtEnd()) {
                return null;
            }
        }

        assert !buffer.isAtEnd();
        DocumentData result = new DocumentData();

        result.url = buffer.getUrl();
        result.identifier = buffer.getIdentifier();
        result.textLength = buffer.getTextLength();

        // Step past the tuple, then past any url group that ended with it.
        buffer.incrementTuple();
        buffer.autoIncrementUrl();

        return result;
    }

    /**
     * Empties the buffer and reads tuples into it until it is full or the
     * input signals end of stream.
     *
     * @throws IOException if reading fails for a reason other than end of stream
     */
    public final void fill() throws IOException {
        try {
            buffer.reset();

            if (tupleCount != 0) {

                // The previous fill stopped in the middle of a url group:
                // re-register that url so the remaining tuples of the group
                // have a url in the fresh buffer.
                if(updateUrlCount - tupleCount > 0) {
                    buffer.urls.add(last.url);
                    // NOTE(review): the value stored here is the number of
                    // tuples left in the group, not a buffer index. It lands
                    // in slot 0 of urlTupleIdx, which the visible
                    // ShreddedBuffer.getUrlEndIndex does not appear to read
                    // (it looks at urlReadIdx+1) - confirm before relying on it.
                    buffer.urlTupleIdx.add((int) (updateUrlCount - tupleCount));
                }
                bufferStartCount = tupleCount;
            }

            while (!buffer.isFull()) {
                // Reads a new url header first when the current group is used up.
                updateUrl();
                buffer.processTuple(input.readString(), input.readInt());
                tupleCount++;
            }
        // End of input is how a fill stops early; the buffer keeps whatever
        // was read before it.
        } catch(EOFException e) {}
    }

    /**
     * Reads the next url header (url string and tuple count) from the input
     * when the current url group has been fully consumed; otherwise does
     * nothing.
     *
     * @throws IOException if reading fails (including end of stream)
     */
    public final void updateUrl() throws IOException {
        // Tuples of the current group are still pending.
        if (updateUrlCount > tupleCount)
            return;

        last.url = input.readString();
        // The group ends after the next readInt() tuples.
        updateUrlCount = tupleCount + input.readInt();

        buffer.processUrl(last.url);
    }

    /**
     * Fills the buffer and hands it to {@code copyUntil} until a fill yields
     * no tuples, then closes the processor.
     *
     * @throws IOException if reading or the processor fails
     */
    public void run() throws IOException {
        while (true) {
            fill();

            if (buffer.isAtEnd())
                break;

            buffer.copyUntil(null, processor);
        }
        processor.close();
    }

    /**
     * Installs the downstream step, wrapped in a {@link DuplicateEliminator}.
     * Object-level processors are additionally wrapped in a
     * {@link TupleUnshredder}.
     *
     * @param processor the downstream step
     * @throws IncompatibleProcessorException if the step is of no supported kind
     */
    public void setProcessor(Step processor) throws IncompatibleProcessorException {
        if (processor instanceof ShreddedProcessor) {
            this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
        } else if (processor instanceof DocumentData.Processor) {
            this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentData.Processor) processor));
        } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
            this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentData>) processor));
        } else {
            throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
        }
    }

    /** @return the class of records this reader produces */
    public Class<DocumentData> getOutputClass() {
        return DocumentData.class;
    }
}
1060
1061 public static class DuplicateEliminator implements ShreddedProcessor {
1062 public ShreddedProcessor processor;
1063 DocumentData last = new DocumentData();
1064 boolean urlProcess = true;
1065
1066 public DuplicateEliminator() {}
1067 public DuplicateEliminator(ShreddedProcessor processor) {
1068 this.processor = processor;
1069 }
1070
1071 public void setShreddedProcessor(ShreddedProcessor processor) {
1072 this.processor = processor;
1073 }
1074
1075 public void processUrl(String url) throws IOException {
1076 if (urlProcess || Utility.compare(url, last.url) != 0) {
1077 last.url = url;
1078 processor.processUrl(url);
1079 urlProcess = false;
1080 }
1081 }
1082
1083 public void resetUrl() {
1084 urlProcess = true;
1085 }
1086
1087 public void processTuple(String identifier, int textLength) throws IOException {
1088 processor.processTuple(identifier, textLength);
1089 }
1090
1091 public void close() throws IOException {
1092 processor.close();
1093 }
1094 }
1095 public static class TupleUnshredder implements ShreddedProcessor {
1096 DocumentData last = new DocumentData();
1097 public org.galagosearch.tupleflow.Processor<DocumentData> processor;
1098
1099 public TupleUnshredder(DocumentData.Processor processor) {
1100 this.processor = processor;
1101 }
1102
1103 public TupleUnshredder(org.galagosearch.tupleflow.Processor<DocumentData> processor) {
1104 this.processor = processor;
1105 }
1106
1107 public DocumentData clone(DocumentData object) {
1108 DocumentData result = new DocumentData();
1109 if (object == null) return result;
1110 result.identifier = object.identifier;
1111 result.url = object.url;
1112 result.textLength = object.textLength;
1113 return result;
1114 }
1115
1116 public void processUrl(String url) throws IOException {
1117 last.url = url;
1118 }
1119
1120
1121 public void processTuple(String identifier, int textLength) throws IOException {
1122 last.identifier = identifier;
1123 last.textLength = textLength;
1124 processor.process(clone(last));
1125 }
1126
1127 public void close() throws IOException {
1128 processor.close();
1129 }
1130 }
1131 public static class TupleShredder implements Processor {
1132 DocumentData last = new DocumentData();
1133 public ShreddedProcessor processor;
1134
1135 public TupleShredder(ShreddedProcessor processor) {
1136 this.processor = processor;
1137 }
1138
1139 public DocumentData clone(DocumentData object) {
1140 DocumentData result = new DocumentData();
1141 if (object == null) return result;
1142 result.identifier = object.identifier;
1143 result.url = object.url;
1144 result.textLength = object.textLength;
1145 return result;
1146 }
1147
1148 public void process(DocumentData object) throws IOException {
1149 boolean processAll = false;
1150 if(last == null || Utility.compare(last.url, object.url) != 0 || processAll) { processor.processUrl(object.url); processAll = true; }
1151 processor.processTuple(object.identifier, object.textLength);
1152 }
1153
1154 public Class<DocumentData> getInputClass() {
1155 return DocumentData.class;
1156 }
1157
1158 public void close() throws IOException {
1159 processor.close();
1160 }
1161 }
1162 }
1163 public static class IdentifierOrder implements Order<DocumentData> {
1164 public int hash(DocumentData object) {
1165 int h = 0;
1166 h += Utility.hash(object.identifier);
1167 return h;
1168 }
1169 public Comparator<DocumentData> greaterThan() {
1170 return new Comparator<DocumentData>() {
1171 public int compare(DocumentData one, DocumentData two) {
1172 int result = 0;
1173 do {
1174 result = + Utility.compare(one.identifier, two.identifier);
1175 if(result != 0) break;
1176 } while (false);
1177 return -result;
1178 }
1179 };
1180 }
1181 public Comparator<DocumentData> lessThan() {
1182 return new Comparator<DocumentData>() {
1183 public int compare(DocumentData one, DocumentData two) {
1184 int result = 0;
1185 do {
1186 result = + Utility.compare(one.identifier, two.identifier);
1187 if(result != 0) break;
1188 } while (false);
1189 return result;
1190 }
1191 };
1192 }
1193 public TypeReader<DocumentData> orderedReader(ArrayInput _input) {
1194 return new ShreddedReader(_input);
1195 }
1196
1197 public TypeReader<DocumentData> orderedReader(ArrayInput _input, int bufferSize) {
1198 return new ShreddedReader(_input, bufferSize);
1199 }
1200 public OrderedWriter<DocumentData> orderedWriter(ArrayOutput _output) {
1201 ShreddedWriter w = new ShreddedWriter(_output);
1202 return new OrderedWriterClass(w);
1203 }
1204 public static class OrderedWriterClass extends OrderedWriter< DocumentData > {
1205 DocumentData last = null;
1206 ShreddedWriter shreddedWriter = null;
1207
1208 public OrderedWriterClass(ShreddedWriter s) {
1209 this.shreddedWriter = s;
1210 }
1211
1212 public void process(DocumentData object) throws IOException {
1213 boolean processAll = false;
1214 if (processAll || last == null || 0 != Utility.compare(object.identifier, last.identifier)) { processAll = true; shreddedWriter.processIdentifier(object.identifier); }
1215 shreddedWriter.processTuple(object.url, object.textLength);
1216 last = object;
1217 }
1218
1219 public void close() throws IOException {
1220 shreddedWriter.close();
1221 }
1222
1223 public Class<DocumentData> getInputClass() {
1224 return DocumentData.class;
1225 }
1226 }
1227 public ReaderSource<DocumentData> orderedCombiner(Collection<TypeReader<DocumentData>> readers, boolean closeOnExit) {
1228 ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
1229
1230 for (TypeReader<DocumentData> reader : readers) {
1231 shreddedReaders.add((ShreddedReader)reader);
1232 }
1233
1234 return new ShreddedCombiner(shreddedReaders, closeOnExit);
1235 }
1236 public DocumentData clone(DocumentData object) {
1237 DocumentData result = new DocumentData();
1238 if (object == null) return result;
1239 result.identifier = object.identifier;
1240 result.url = object.url;
1241 result.textLength = object.textLength;
1242 return result;
1243 }
1244 public Class<DocumentData> getOrderedClass() {
1245 return DocumentData.class;
1246 }
1247 public String[] getOrderSpec() {
1248 return new String[] {"+identifier"};
1249 }
1250
1251 public static String getSpecString() {
1252 return "+identifier";
1253 }
1254
1255 public interface ShreddedProcessor extends Step {
1256 public void processIdentifier(String identifier) throws IOException;
1257 public void processTuple(String url, int textLength) throws IOException;
1258 public void close() throws IOException;
1259 }
1260 public interface ShreddedSource extends Step {
1261 }
1262
1263 public static class ShreddedWriter implements ShreddedProcessor {
1264 ArrayOutput output;
1265 ShreddedBuffer buffer = new ShreddedBuffer();
1266 String lastIdentifier;
1267 boolean lastFlush = false;
1268
1269 public ShreddedWriter(ArrayOutput output) {
1270 this.output = output;
1271 }
1272
1273 public void close() throws IOException {
1274 flush();
1275 }
1276
1277 public void processIdentifier(String identifier) {
1278 lastIdentifier = identifier;
1279 buffer.processIdentifier(identifier);
1280 }
1281 public final void processTuple(String url, int textLength) throws IOException {
1282 if (lastFlush) {
1283 if(buffer.identifiers.size() == 0) buffer.processIdentifier(lastIdentifier);
1284 lastFlush = false;
1285 }
1286 buffer.processTuple(url, textLength);
1287 if (buffer.isFull())
1288 flush();
1289 }
1290 public final void flushTuples(int pauseIndex) throws IOException {
1291
1292 while (buffer.getReadIndex() < pauseIndex) {
1293
1294 output.writeString(buffer.getUrl());
1295 output.writeInt(buffer.getTextLength());
1296 buffer.incrementTuple();
1297 }
1298 }
1299 public final void flushIdentifier(int pauseIndex) throws IOException {
1300 while (buffer.getReadIndex() < pauseIndex) {
1301 int nextPause = buffer.getIdentifierEndIndex();
1302 int count = nextPause - buffer.getReadIndex();
1303
1304 output.writeString(buffer.getIdentifier());
1305 output.writeInt(count);
1306 buffer.incrementIdentifier();
1307
1308 flushTuples(nextPause);
1309 assert nextPause == buffer.getReadIndex();
1310 }
1311 }
1312 public void flush() throws IOException {
1313 flushIdentifier(buffer.getWriteIndex());
1314 buffer.reset();
1315 lastFlush = true;
1316 }
1317 }
1318 public static class ShreddedBuffer {
1319 ArrayList<String> identifiers = new ArrayList();
1320 ArrayList<Integer> identifierTupleIdx = new ArrayList();
1321 int identifierReadIdx = 0;
1322
1323 String[] urls;
1324 int[] textLengths;
1325 int writeTupleIndex = 0;
1326 int readTupleIndex = 0;
1327 int batchSize;
1328
1329 public ShreddedBuffer(int batchSize) {
1330 this.batchSize = batchSize;
1331
1332 urls = new String[batchSize];
1333 textLengths = new int[batchSize];
1334 }
1335
1336 public ShreddedBuffer() {
1337 this(10000);
1338 }
1339
1340 public void processIdentifier(String identifier) {
1341 identifiers.add(identifier);
1342 identifierTupleIdx.add(writeTupleIndex);
1343 }
1344 public void processTuple(String url, int textLength) {
1345 assert identifiers.size() > 0;
1346 urls[writeTupleIndex] = url;
1347 textLengths[writeTupleIndex] = textLength;
1348 writeTupleIndex++;
1349 }
1350 public void resetData() {
1351 identifiers.clear();
1352 identifierTupleIdx.clear();
1353 writeTupleIndex = 0;
1354 }
1355
1356 public void resetRead() {
1357 readTupleIndex = 0;
1358 identifierReadIdx = 0;
1359 }
1360
1361 public void reset() {
1362 resetData();
1363 resetRead();
1364 }
1365 public boolean isFull() {
1366 return writeTupleIndex >= batchSize;
1367 }
1368
1369 public boolean isEmpty() {
1370 return writeTupleIndex == 0;
1371 }
1372
1373 public boolean isAtEnd() {
1374 return readTupleIndex >= writeTupleIndex;
1375 }
1376 public void incrementIdentifier() {
1377 identifierReadIdx++;
1378 }
1379
1380 public void autoIncrementIdentifier() {
1381 while (readTupleIndex >= getIdentifierEndIndex() && readTupleIndex < writeTupleIndex)
1382 identifierReadIdx++;
1383 }
1384 public void incrementTuple() {
1385 readTupleIndex++;
1386 }
1387 public int getIdentifierEndIndex() {
1388 if ((identifierReadIdx+1) >= identifierTupleIdx.size())
1389 return writeTupleIndex;
1390 return identifierTupleIdx.get(identifierReadIdx+1);
1391 }
1392 public int getReadIndex() {
1393 return readTupleIndex;
1394 }
1395
1396 public int getWriteIndex() {
1397 return writeTupleIndex;
1398 }
1399 public String getIdentifier() {
1400 assert readTupleIndex < writeTupleIndex;
1401 assert identifierReadIdx < identifiers.size();
1402
1403 return identifiers.get(identifierReadIdx);
1404 }
1405 public String getUrl() {
1406 assert readTupleIndex < writeTupleIndex;
1407 return urls[readTupleIndex];
1408 }
1409 public int getTextLength() {
1410 assert readTupleIndex < writeTupleIndex;
1411 return textLengths[readTupleIndex];
1412 }
1413 public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
1414 while (getReadIndex() < endIndex) {
1415 output.processTuple(getUrl(), getTextLength());
1416 incrementTuple();
1417 }
1418 }
1419 public void copyUntilIndexIdentifier(int endIndex, ShreddedProcessor output) throws IOException {
1420 while (getReadIndex() < endIndex) {
1421 output.processIdentifier(getIdentifier());
1422 assert getIdentifierEndIndex() <= endIndex;
1423 copyTuples(getIdentifierEndIndex(), output);
1424 incrementIdentifier();
1425 }
1426 }
1427 public void copyUntilIdentifier(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
1428 while (!isAtEnd()) {
1429 if (other != null) {
1430 assert !other.isAtEnd();
1431 int c = + Utility.compare(getIdentifier(), other.getIdentifier());
1432
1433 if (c > 0) {
1434 break;
1435 }
1436
1437 output.processIdentifier(getIdentifier());
1438
1439 copyTuples(getIdentifierEndIndex(), output);
1440 } else {
1441 output.processIdentifier(getIdentifier());
1442 copyTuples(getIdentifierEndIndex(), output);
1443 }
1444 incrementIdentifier();
1445
1446
1447 }
1448 }
1449 public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
1450 copyUntilIdentifier(other, output);
1451 }
1452
1453 }
1454 public static class ShreddedCombiner implements ReaderSource<DocumentData>, ShreddedSource {
1455 public ShreddedProcessor processor;
1456 Collection<ShreddedReader> readers;
1457 boolean closeOnExit = false;
1458 boolean uninitialized = true;
1459 PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
1460
1461 public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
1462 this.readers = readers;
1463 this.closeOnExit = closeOnExit;
1464 }
1465
1466 public void setProcessor(Step processor) throws IncompatibleProcessorException {
1467 if (processor instanceof ShreddedProcessor) {
1468 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
1469 } else if (processor instanceof DocumentData.Processor) {
1470 this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentData.Processor) processor));
1471 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
1472 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentData>) processor));
1473 } else {
1474 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
1475 }
1476 }
1477
1478 public Class<DocumentData> getOutputClass() {
1479 return DocumentData.class;
1480 }
1481
1482 public void initialize() throws IOException {
1483 for (ShreddedReader reader : readers) {
1484 reader.fill();
1485
1486 if (!reader.getBuffer().isAtEnd())
1487 queue.add(reader);
1488 }
1489
1490 uninitialized = false;
1491 }
1492
1493 public void run() throws IOException {
1494 initialize();
1495
1496 while (queue.size() > 0) {
1497 ShreddedReader top = queue.poll();
1498 ShreddedReader next = null;
1499 ShreddedBuffer nextBuffer = null;
1500
1501 assert !top.getBuffer().isAtEnd();
1502
1503 if (queue.size() > 0) {
1504 next = queue.peek();
1505 nextBuffer = next.getBuffer();
1506 assert !nextBuffer.isAtEnd();
1507 }
1508
1509 top.getBuffer().copyUntil(nextBuffer, processor);
1510 if (top.getBuffer().isAtEnd())
1511 top.fill();
1512
1513 if (!top.getBuffer().isAtEnd())
1514 queue.add(top);
1515 }
1516
1517 if (closeOnExit)
1518 processor.close();
1519 }
1520
1521 public DocumentData read() throws IOException {
1522 if (uninitialized)
1523 initialize();
1524
1525 DocumentData result = null;
1526
1527 while (queue.size() > 0) {
1528 ShreddedReader top = queue.poll();
1529 result = top.read();
1530
1531 if (result != null) {
1532 if (top.getBuffer().isAtEnd())
1533 top.fill();
1534
1535 queue.offer(top);
1536 break;
1537 }
1538 }
1539
1540 return result;
1541 }
1542 }
1543 public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<DocumentData>, ShreddedSource {
1544 public ShreddedProcessor processor;
1545 ShreddedBuffer buffer;
1546 DocumentData last = new DocumentData();
1547 long updateIdentifierCount = -1;
1548 long tupleCount = 0;
1549 long bufferStartCount = 0;
1550 ArrayInput input;
1551
1552 public ShreddedReader(ArrayInput input) {
1553 this.input = input;
1554 this.buffer = new ShreddedBuffer();
1555 }
1556
1557 public ShreddedReader(ArrayInput input, int bufferSize) {
1558 this.input = input;
1559 this.buffer = new ShreddedBuffer(bufferSize);
1560 }
1561
1562 public final int compareTo(ShreddedReader other) {
1563 ShreddedBuffer otherBuffer = other.getBuffer();
1564
1565 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
1566 return 0;
1567 } else if (buffer.isAtEnd()) {
1568 return -1;
1569 } else if (otherBuffer.isAtEnd()) {
1570 return 1;
1571 }
1572
1573 int result = 0;
1574 do {
1575 result = + Utility.compare(buffer.getIdentifier(), otherBuffer.getIdentifier());
1576 if(result != 0) break;
1577 } while (false);
1578
1579 return result;
1580 }
1581
1582 public final ShreddedBuffer getBuffer() {
1583 return buffer;
1584 }
1585
1586 public final DocumentData read() throws IOException {
1587 if (buffer.isAtEnd()) {
1588 fill();
1589
1590 if (buffer.isAtEnd()) {
1591 return null;
1592 }
1593 }
1594
1595 assert !buffer.isAtEnd();
1596 DocumentData result = new DocumentData();
1597
1598 result.identifier = buffer.getIdentifier();
1599 result.url = buffer.getUrl();
1600 result.textLength = buffer.getTextLength();
1601
1602 buffer.incrementTuple();
1603 buffer.autoIncrementIdentifier();
1604
1605 return result;
1606 }
1607
1608 public final void fill() throws IOException {
1609 try {
1610 buffer.reset();
1611
1612 if (tupleCount != 0) {
1613
1614 if(updateIdentifierCount - tupleCount > 0) {
1615 buffer.identifiers.add(last.identifier);
1616 buffer.identifierTupleIdx.add((int) (updateIdentifierCount - tupleCount));
1617 }
1618 bufferStartCount = tupleCount;
1619 }
1620
1621 while (!buffer.isFull()) {
1622 updateIdentifier();
1623 buffer.processTuple(input.readString(), input.readInt());
1624 tupleCount++;
1625 }
1626 } catch(EOFException e) {}
1627 }
1628
1629 public final void updateIdentifier() throws IOException {
1630 if (updateIdentifierCount > tupleCount)
1631 return;
1632
1633 last.identifier = input.readString();
1634 updateIdentifierCount = tupleCount + input.readInt();
1635
1636 buffer.processIdentifier(last.identifier);
1637 }
1638
1639 public void run() throws IOException {
1640 while (true) {
1641 fill();
1642
1643 if (buffer.isAtEnd())
1644 break;
1645
1646 buffer.copyUntil(null, processor);
1647 }
1648 processor.close();
1649 }
1650
1651 public void setProcessor(Step processor) throws IncompatibleProcessorException {
1652 if (processor instanceof ShreddedProcessor) {
1653 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
1654 } else if (processor instanceof DocumentData.Processor) {
1655 this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentData.Processor) processor));
1656 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
1657 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentData>) processor));
1658 } else {
1659 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
1660 }
1661 }
1662
1663 public Class<DocumentData> getOutputClass() {
1664 return DocumentData.class;
1665 }
1666 }
1667
1668 public static class DuplicateEliminator implements ShreddedProcessor {
1669 public ShreddedProcessor processor;
1670 DocumentData last = new DocumentData();
1671 boolean identifierProcess = true;
1672
1673 public DuplicateEliminator() {}
1674 public DuplicateEliminator(ShreddedProcessor processor) {
1675 this.processor = processor;
1676 }
1677
1678 public void setShreddedProcessor(ShreddedProcessor processor) {
1679 this.processor = processor;
1680 }
1681
1682 public void processIdentifier(String identifier) throws IOException {
1683 if (identifierProcess || Utility.compare(identifier, last.identifier) != 0) {
1684 last.identifier = identifier;
1685 processor.processIdentifier(identifier);
1686 identifierProcess = false;
1687 }
1688 }
1689
1690 public void resetIdentifier() {
1691 identifierProcess = true;
1692 }
1693
1694 public void processTuple(String url, int textLength) throws IOException {
1695 processor.processTuple(url, textLength);
1696 }
1697
1698 public void close() throws IOException {
1699 processor.close();
1700 }
1701 }
1702 public static class TupleUnshredder implements ShreddedProcessor {
1703 DocumentData last = new DocumentData();
1704 public org.galagosearch.tupleflow.Processor<DocumentData> processor;
1705
1706 public TupleUnshredder(DocumentData.Processor processor) {
1707 this.processor = processor;
1708 }
1709
1710 public TupleUnshredder(org.galagosearch.tupleflow.Processor<DocumentData> processor) {
1711 this.processor = processor;
1712 }
1713
1714 public DocumentData clone(DocumentData object) {
1715 DocumentData result = new DocumentData();
1716 if (object == null) return result;
1717 result.identifier = object.identifier;
1718 result.url = object.url;
1719 result.textLength = object.textLength;
1720 return result;
1721 }
1722
1723 public void processIdentifier(String identifier) throws IOException {
1724 last.identifier = identifier;
1725 }
1726
1727
1728 public void processTuple(String url, int textLength) throws IOException {
1729 last.url = url;
1730 last.textLength = textLength;
1731 processor.process(clone(last));
1732 }
1733
1734 public void close() throws IOException {
1735 processor.close();
1736 }
1737 }
1738 public static class TupleShredder implements Processor {
1739 DocumentData last = new DocumentData();
1740 public ShreddedProcessor processor;
1741
1742 public TupleShredder(ShreddedProcessor processor) {
1743 this.processor = processor;
1744 }
1745
1746 public DocumentData clone(DocumentData object) {
1747 DocumentData result = new DocumentData();
1748 if (object == null) return result;
1749 result.identifier = object.identifier;
1750 result.url = object.url;
1751 result.textLength = object.textLength;
1752 return result;
1753 }
1754
1755 public void process(DocumentData object) throws IOException {
1756 boolean processAll = false;
1757 if(last == null || Utility.compare(last.identifier, object.identifier) != 0 || processAll) { processor.processIdentifier(object.identifier); processAll = true; }
1758 processor.processTuple(object.url, object.textLength);
1759 }
1760
1761 public Class<DocumentData> getInputClass() {
1762 return DocumentData.class;
1763 }
1764
1765 public void close() throws IOException {
1766 processor.close();
1767 }
1768 }
1769 }
1770 }