1
2
3 package org.galagosearch.core.types;
4
5 import org.galagosearch.tupleflow.Utility;
6 import org.galagosearch.tupleflow.ArrayInput;
7 import org.galagosearch.tupleflow.ArrayOutput;
8 import org.galagosearch.tupleflow.Order;
9 import org.galagosearch.tupleflow.OrderedWriter;
10 import org.galagosearch.tupleflow.Type;
11 import org.galagosearch.tupleflow.TypeReader;
12 import org.galagosearch.tupleflow.Step;
13 import org.galagosearch.tupleflow.IncompatibleProcessorException;
14 import org.galagosearch.tupleflow.ReaderSource;
15 import java.io.IOException;
16 import java.io.EOFException;
17 import java.io.UnsupportedEncodingException;
18 import java.util.ArrayList;
19 import java.util.Arrays;
20 import java.util.Comparator;
21 import java.util.PriorityQueue;
22 import java.util.Collection;
23
24 public class AdditionalDocumentText implements Type<AdditionalDocumentText> {
/** First field of the tuple; the {@code +identifier} order sorts and groups on it. */
public String identifier;
/** Second field of the tuple; carried along but never used for ordering. */
public String text;

/** Creates a tuple with both fields left {@code null}. */
public AdditionalDocumentText() {
}

/**
 * Creates a tuple with both fields populated.
 *
 * @param identifier value stored in {@link #identifier}
 * @param text value stored in {@link #text}
 */
public AdditionalDocumentText(String identifier, String text) {
    this.text = text;
    this.identifier = identifier;
}
33
34 public String toString() {
35 return String.format("%s,%s",
36 identifier, text);
37 }
38
39 public Order<AdditionalDocumentText> getOrder(String... spec) {
40 if (Arrays.equals(spec, new String[] { })) {
41 return new Unordered();
42 }
43 if (Arrays.equals(spec, new String[] { "+identifier" })) {
44 return new IdentifierOrder();
45 }
46 return null;
47 }
48
49 public interface Processor extends Step, org.galagosearch.tupleflow.Processor<AdditionalDocumentText> {
50 public void process(AdditionalDocumentText object) throws IOException;
51 public void close() throws IOException;
52 }
53 public interface Source extends Step {
54 }
55 public static class Unordered implements Order<AdditionalDocumentText> {
56 public int hash(AdditionalDocumentText object) {
57 int h = 0;
58 return h;
59 }
60 public Comparator<AdditionalDocumentText> greaterThan() {
61 return new Comparator<AdditionalDocumentText>() {
62 public int compare(AdditionalDocumentText one, AdditionalDocumentText two) {
63 int result = 0;
64 do {
65 } while (false);
66 return -result;
67 }
68 };
69 }
70 public Comparator<AdditionalDocumentText> lessThan() {
71 return new Comparator<AdditionalDocumentText>() {
72 public int compare(AdditionalDocumentText one, AdditionalDocumentText two) {
73 int result = 0;
74 do {
75 } while (false);
76 return result;
77 }
78 };
79 }
80 public TypeReader<AdditionalDocumentText> orderedReader(ArrayInput _input) {
81 return new ShreddedReader(_input);
82 }
83
84 public TypeReader<AdditionalDocumentText> orderedReader(ArrayInput _input, int bufferSize) {
85 return new ShreddedReader(_input, bufferSize);
86 }
87 public OrderedWriter<AdditionalDocumentText> orderedWriter(ArrayOutput _output) {
88 ShreddedWriter w = new ShreddedWriter(_output);
89 return new OrderedWriterClass(w);
90 }
91 public static class OrderedWriterClass extends OrderedWriter< AdditionalDocumentText > {
92 AdditionalDocumentText last = null;
93 ShreddedWriter shreddedWriter = null;
94
95 public OrderedWriterClass(ShreddedWriter s) {
96 this.shreddedWriter = s;
97 }
98
99 public void process(AdditionalDocumentText object) throws IOException {
100 boolean processAll = false;
101 shreddedWriter.processTuple(object.identifier, object.text);
102 last = object;
103 }
104
105 public void close() throws IOException {
106 shreddedWriter.close();
107 }
108
109 public Class<AdditionalDocumentText> getInputClass() {
110 return AdditionalDocumentText.class;
111 }
112 }
113 public ReaderSource<AdditionalDocumentText> orderedCombiner(Collection<TypeReader<AdditionalDocumentText>> readers, boolean closeOnExit) {
114 ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
115
116 for (TypeReader<AdditionalDocumentText> reader : readers) {
117 shreddedReaders.add((ShreddedReader)reader);
118 }
119
120 return new ShreddedCombiner(shreddedReaders, closeOnExit);
121 }
122 public AdditionalDocumentText clone(AdditionalDocumentText object) {
123 AdditionalDocumentText result = new AdditionalDocumentText();
124 if (object == null) return result;
125 result.identifier = object.identifier;
126 result.text = object.text;
127 return result;
128 }
129 public Class<AdditionalDocumentText> getOrderedClass() {
130 return AdditionalDocumentText.class;
131 }
132 public String[] getOrderSpec() {
133 return new String[] {};
134 }
135
136 public static String getSpecString() {
137 return "";
138 }
139
140 public interface ShreddedProcessor extends Step {
141 public void processTuple(String identifier, String text) throws IOException;
142 public void close() throws IOException;
143 }
144 public interface ShreddedSource extends Step {
145 }
146
147 public static class ShreddedWriter implements ShreddedProcessor {
148 ArrayOutput output;
149 ShreddedBuffer buffer = new ShreddedBuffer();
150 boolean lastFlush = false;
151
152 public ShreddedWriter(ArrayOutput output) {
153 this.output = output;
154 }
155
156 public void close() throws IOException {
157 flush();
158 }
159
160 public final void processTuple(String identifier, String text) throws IOException {
161 if (lastFlush) {
162 lastFlush = false;
163 }
164 buffer.processTuple(identifier, text);
165 if (buffer.isFull())
166 flush();
167 }
168 public final void flushTuples(int pauseIndex) throws IOException {
169
170 while (buffer.getReadIndex() < pauseIndex) {
171
172 output.writeString(buffer.getIdentifier());
173 output.writeString(buffer.getText());
174 buffer.incrementTuple();
175 }
176 }
177 public void flush() throws IOException {
178 flushTuples(buffer.getWriteIndex());
179 buffer.reset();
180 lastFlush = true;
181 }
182 }
183 public static class ShreddedBuffer {
184
185 String[] identifiers;
186 String[] texts;
187 int writeTupleIndex = 0;
188 int readTupleIndex = 0;
189 int batchSize;
190
191 public ShreddedBuffer(int batchSize) {
192 this.batchSize = batchSize;
193
194 identifiers = new String[batchSize];
195 texts = new String[batchSize];
196 }
197
198 public ShreddedBuffer() {
199 this(10000);
200 }
201
202 public void processTuple(String identifier, String text) {
203 identifiers[writeTupleIndex] = identifier;
204 texts[writeTupleIndex] = text;
205 writeTupleIndex++;
206 }
207 public void resetData() {
208 writeTupleIndex = 0;
209 }
210
211 public void resetRead() {
212 readTupleIndex = 0;
213 }
214
215 public void reset() {
216 resetData();
217 resetRead();
218 }
219 public boolean isFull() {
220 return writeTupleIndex >= batchSize;
221 }
222
223 public boolean isEmpty() {
224 return writeTupleIndex == 0;
225 }
226
227 public boolean isAtEnd() {
228 return readTupleIndex >= writeTupleIndex;
229 }
230 public void incrementTuple() {
231 readTupleIndex++;
232 }
233 public int getReadIndex() {
234 return readTupleIndex;
235 }
236
237 public int getWriteIndex() {
238 return writeTupleIndex;
239 }
240 public String getIdentifier() {
241 assert readTupleIndex < writeTupleIndex;
242 return identifiers[readTupleIndex];
243 }
244 public String getText() {
245 assert readTupleIndex < writeTupleIndex;
246 return texts[readTupleIndex];
247 }
248 public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
249 while (getReadIndex() < endIndex) {
250 output.processTuple(getIdentifier(), getText());
251 incrementTuple();
252 }
253 }
254
255 public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
256 }
257
258 }
259 public static class ShreddedCombiner implements ReaderSource<AdditionalDocumentText>, ShreddedSource {
260 public ShreddedProcessor processor;
261 Collection<ShreddedReader> readers;
262 boolean closeOnExit = false;
263 boolean uninitialized = true;
264 PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
265
266 public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
267 this.readers = readers;
268 this.closeOnExit = closeOnExit;
269 }
270
271 public void setProcessor(Step processor) throws IncompatibleProcessorException {
272 if (processor instanceof ShreddedProcessor) {
273 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
274 } else if (processor instanceof AdditionalDocumentText.Processor) {
275 this.processor = new DuplicateEliminator(new TupleUnshredder((AdditionalDocumentText.Processor) processor));
276 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
277 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<AdditionalDocumentText>) processor));
278 } else {
279 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
280 }
281 }
282
283 public Class<AdditionalDocumentText> getOutputClass() {
284 return AdditionalDocumentText.class;
285 }
286
287 public void initialize() throws IOException {
288 for (ShreddedReader reader : readers) {
289 reader.fill();
290
291 if (!reader.getBuffer().isAtEnd())
292 queue.add(reader);
293 }
294
295 uninitialized = false;
296 }
297
298 public void run() throws IOException {
299 initialize();
300
301 while (queue.size() > 0) {
302 ShreddedReader top = queue.poll();
303 ShreddedReader next = null;
304 ShreddedBuffer nextBuffer = null;
305
306 assert !top.getBuffer().isAtEnd();
307
308 if (queue.size() > 0) {
309 next = queue.peek();
310 nextBuffer = next.getBuffer();
311 assert !nextBuffer.isAtEnd();
312 }
313
314 top.getBuffer().copyUntil(nextBuffer, processor);
315 if (top.getBuffer().isAtEnd())
316 top.fill();
317
318 if (!top.getBuffer().isAtEnd())
319 queue.add(top);
320 }
321
322 if (closeOnExit)
323 processor.close();
324 }
325
326 public AdditionalDocumentText read() throws IOException {
327 if (uninitialized)
328 initialize();
329
330 AdditionalDocumentText result = null;
331
332 while (queue.size() > 0) {
333 ShreddedReader top = queue.poll();
334 result = top.read();
335
336 if (result != null) {
337 if (top.getBuffer().isAtEnd())
338 top.fill();
339
340 queue.offer(top);
341 break;
342 }
343 }
344
345 return result;
346 }
347 }
348 public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<AdditionalDocumentText>, ShreddedSource {
349 public ShreddedProcessor processor;
350 ShreddedBuffer buffer;
351 AdditionalDocumentText last = new AdditionalDocumentText();
352 long tupleCount = 0;
353 long bufferStartCount = 0;
354 ArrayInput input;
355
356 public ShreddedReader(ArrayInput input) {
357 this.input = input;
358 this.buffer = new ShreddedBuffer();
359 }
360
361 public ShreddedReader(ArrayInput input, int bufferSize) {
362 this.input = input;
363 this.buffer = new ShreddedBuffer(bufferSize);
364 }
365
366 public final int compareTo(ShreddedReader other) {
367 ShreddedBuffer otherBuffer = other.getBuffer();
368
369 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
370 return 0;
371 } else if (buffer.isAtEnd()) {
372 return -1;
373 } else if (otherBuffer.isAtEnd()) {
374 return 1;
375 }
376
377 int result = 0;
378 do {
379 } while (false);
380
381 return result;
382 }
383
384 public final ShreddedBuffer getBuffer() {
385 return buffer;
386 }
387
388 public final AdditionalDocumentText read() throws IOException {
389 if (buffer.isAtEnd()) {
390 fill();
391
392 if (buffer.isAtEnd()) {
393 return null;
394 }
395 }
396
397 assert !buffer.isAtEnd();
398 AdditionalDocumentText result = new AdditionalDocumentText();
399
400 result.identifier = buffer.getIdentifier();
401 result.text = buffer.getText();
402
403 buffer.incrementTuple();
404
405 return result;
406 }
407
408 public final void fill() throws IOException {
409 try {
410 buffer.reset();
411
412 if (tupleCount != 0) {
413 bufferStartCount = tupleCount;
414 }
415
416 while (!buffer.isFull()) {
417 buffer.processTuple(input.readString(), input.readString());
418 tupleCount++;
419 }
420 } catch(EOFException e) {}
421 }
422
423
424 public void run() throws IOException {
425 while (true) {
426 fill();
427
428 if (buffer.isAtEnd())
429 break;
430
431 buffer.copyUntil(null, processor);
432 }
433 processor.close();
434 }
435
436 public void setProcessor(Step processor) throws IncompatibleProcessorException {
437 if (processor instanceof ShreddedProcessor) {
438 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
439 } else if (processor instanceof AdditionalDocumentText.Processor) {
440 this.processor = new DuplicateEliminator(new TupleUnshredder((AdditionalDocumentText.Processor) processor));
441 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
442 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<AdditionalDocumentText>) processor));
443 } else {
444 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
445 }
446 }
447
448 public Class<AdditionalDocumentText> getOutputClass() {
449 return AdditionalDocumentText.class;
450 }
451 }
452
453 public static class DuplicateEliminator implements ShreddedProcessor {
454 public ShreddedProcessor processor;
455 AdditionalDocumentText last = new AdditionalDocumentText();
456
457 public DuplicateEliminator() {}
458 public DuplicateEliminator(ShreddedProcessor processor) {
459 this.processor = processor;
460 }
461
462 public void setShreddedProcessor(ShreddedProcessor processor) {
463 this.processor = processor;
464 }
465
466
467
468
469 public void processTuple(String identifier, String text) throws IOException {
470 processor.processTuple(identifier, text);
471 }
472
473 public void close() throws IOException {
474 processor.close();
475 }
476 }
477 public static class TupleUnshredder implements ShreddedProcessor {
478 AdditionalDocumentText last = new AdditionalDocumentText();
479 public org.galagosearch.tupleflow.Processor<AdditionalDocumentText> processor;
480
481 public TupleUnshredder(AdditionalDocumentText.Processor processor) {
482 this.processor = processor;
483 }
484
485 public TupleUnshredder(org.galagosearch.tupleflow.Processor<AdditionalDocumentText> processor) {
486 this.processor = processor;
487 }
488
489 public AdditionalDocumentText clone(AdditionalDocumentText object) {
490 AdditionalDocumentText result = new AdditionalDocumentText();
491 if (object == null) return result;
492 result.identifier = object.identifier;
493 result.text = object.text;
494 return result;
495 }
496
497
498 public void processTuple(String identifier, String text) throws IOException {
499 last.identifier = identifier;
500 last.text = text;
501 processor.process(clone(last));
502 }
503
504 public void close() throws IOException {
505 processor.close();
506 }
507 }
508 public static class TupleShredder implements Processor {
509 AdditionalDocumentText last = new AdditionalDocumentText();
510 public ShreddedProcessor processor;
511
512 public TupleShredder(ShreddedProcessor processor) {
513 this.processor = processor;
514 }
515
516 public AdditionalDocumentText clone(AdditionalDocumentText object) {
517 AdditionalDocumentText result = new AdditionalDocumentText();
518 if (object == null) return result;
519 result.identifier = object.identifier;
520 result.text = object.text;
521 return result;
522 }
523
524 public void process(AdditionalDocumentText object) throws IOException {
525 boolean processAll = false;
526 processor.processTuple(object.identifier, object.text);
527 }
528
529 public Class<AdditionalDocumentText> getInputClass() {
530 return AdditionalDocumentText.class;
531 }
532
533 public void close() throws IOException {
534 processor.close();
535 }
536 }
537 }
538 public static class IdentifierOrder implements Order<AdditionalDocumentText> {
539 public int hash(AdditionalDocumentText object) {
540 int h = 0;
541 h += Utility.hash(object.identifier);
542 return h;
543 }
544 public Comparator<AdditionalDocumentText> greaterThan() {
545 return new Comparator<AdditionalDocumentText>() {
546 public int compare(AdditionalDocumentText one, AdditionalDocumentText two) {
547 int result = 0;
548 do {
549 result = + Utility.compare(one.identifier, two.identifier);
550 if(result != 0) break;
551 } while (false);
552 return -result;
553 }
554 };
555 }
556 public Comparator<AdditionalDocumentText> lessThan() {
557 return new Comparator<AdditionalDocumentText>() {
558 public int compare(AdditionalDocumentText one, AdditionalDocumentText two) {
559 int result = 0;
560 do {
561 result = + Utility.compare(one.identifier, two.identifier);
562 if(result != 0) break;
563 } while (false);
564 return result;
565 }
566 };
567 }
568 public TypeReader<AdditionalDocumentText> orderedReader(ArrayInput _input) {
569 return new ShreddedReader(_input);
570 }
571
572 public TypeReader<AdditionalDocumentText> orderedReader(ArrayInput _input, int bufferSize) {
573 return new ShreddedReader(_input, bufferSize);
574 }
575 public OrderedWriter<AdditionalDocumentText> orderedWriter(ArrayOutput _output) {
576 ShreddedWriter w = new ShreddedWriter(_output);
577 return new OrderedWriterClass(w);
578 }
579 public static class OrderedWriterClass extends OrderedWriter< AdditionalDocumentText > {
580 AdditionalDocumentText last = null;
581 ShreddedWriter shreddedWriter = null;
582
583 public OrderedWriterClass(ShreddedWriter s) {
584 this.shreddedWriter = s;
585 }
586
587 public void process(AdditionalDocumentText object) throws IOException {
588 boolean processAll = false;
589 if (processAll || last == null || 0 != Utility.compare(object.identifier, last.identifier)) { processAll = true; shreddedWriter.processIdentifier(object.identifier); }
590 shreddedWriter.processTuple(object.text);
591 last = object;
592 }
593
594 public void close() throws IOException {
595 shreddedWriter.close();
596 }
597
598 public Class<AdditionalDocumentText> getInputClass() {
599 return AdditionalDocumentText.class;
600 }
601 }
602 public ReaderSource<AdditionalDocumentText> orderedCombiner(Collection<TypeReader<AdditionalDocumentText>> readers, boolean closeOnExit) {
603 ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
604
605 for (TypeReader<AdditionalDocumentText> reader : readers) {
606 shreddedReaders.add((ShreddedReader)reader);
607 }
608
609 return new ShreddedCombiner(shreddedReaders, closeOnExit);
610 }
611 public AdditionalDocumentText clone(AdditionalDocumentText object) {
612 AdditionalDocumentText result = new AdditionalDocumentText();
613 if (object == null) return result;
614 result.identifier = object.identifier;
615 result.text = object.text;
616 return result;
617 }
618 public Class<AdditionalDocumentText> getOrderedClass() {
619 return AdditionalDocumentText.class;
620 }
621 public String[] getOrderSpec() {
622 return new String[] {"+identifier"};
623 }
624
625 public static String getSpecString() {
626 return "+identifier";
627 }
628
629 public interface ShreddedProcessor extends Step {
630 public void processIdentifier(String identifier) throws IOException;
631 public void processTuple(String text) throws IOException;
632 public void close() throws IOException;
633 }
634 public interface ShreddedSource extends Step {
635 }
636
637 public static class ShreddedWriter implements ShreddedProcessor {
638 ArrayOutput output;
639 ShreddedBuffer buffer = new ShreddedBuffer();
640 String lastIdentifier;
641 boolean lastFlush = false;
642
643 public ShreddedWriter(ArrayOutput output) {
644 this.output = output;
645 }
646
647 public void close() throws IOException {
648 flush();
649 }
650
651 public void processIdentifier(String identifier) {
652 lastIdentifier = identifier;
653 buffer.processIdentifier(identifier);
654 }
655 public final void processTuple(String text) throws IOException {
656 if (lastFlush) {
657 if(buffer.identifiers.size() == 0) buffer.processIdentifier(lastIdentifier);
658 lastFlush = false;
659 }
660 buffer.processTuple(text);
661 if (buffer.isFull())
662 flush();
663 }
664 public final void flushTuples(int pauseIndex) throws IOException {
665
666 while (buffer.getReadIndex() < pauseIndex) {
667
668 output.writeString(buffer.getText());
669 buffer.incrementTuple();
670 }
671 }
672 public final void flushIdentifier(int pauseIndex) throws IOException {
673 while (buffer.getReadIndex() < pauseIndex) {
674 int nextPause = buffer.getIdentifierEndIndex();
675 int count = nextPause - buffer.getReadIndex();
676
677 output.writeString(buffer.getIdentifier());
678 output.writeInt(count);
679 buffer.incrementIdentifier();
680
681 flushTuples(nextPause);
682 assert nextPause == buffer.getReadIndex();
683 }
684 }
685 public void flush() throws IOException {
686 flushIdentifier(buffer.getWriteIndex());
687 buffer.reset();
688 lastFlush = true;
689 }
690 }
691 public static class ShreddedBuffer {
692 ArrayList<String> identifiers = new ArrayList();
693 ArrayList<Integer> identifierTupleIdx = new ArrayList();
694 int identifierReadIdx = 0;
695
696 String[] texts;
697 int writeTupleIndex = 0;
698 int readTupleIndex = 0;
699 int batchSize;
700
701 public ShreddedBuffer(int batchSize) {
702 this.batchSize = batchSize;
703
704 texts = new String[batchSize];
705 }
706
707 public ShreddedBuffer() {
708 this(10000);
709 }
710
711 public void processIdentifier(String identifier) {
712 identifiers.add(identifier);
713 identifierTupleIdx.add(writeTupleIndex);
714 }
715 public void processTuple(String text) {
716 assert identifiers.size() > 0;
717 texts[writeTupleIndex] = text;
718 writeTupleIndex++;
719 }
720 public void resetData() {
721 identifiers.clear();
722 identifierTupleIdx.clear();
723 writeTupleIndex = 0;
724 }
725
726 public void resetRead() {
727 readTupleIndex = 0;
728 identifierReadIdx = 0;
729 }
730
731 public void reset() {
732 resetData();
733 resetRead();
734 }
735 public boolean isFull() {
736 return writeTupleIndex >= batchSize;
737 }
738
739 public boolean isEmpty() {
740 return writeTupleIndex == 0;
741 }
742
743 public boolean isAtEnd() {
744 return readTupleIndex >= writeTupleIndex;
745 }
746 public void incrementIdentifier() {
747 identifierReadIdx++;
748 }
749
750 public void autoIncrementIdentifier() {
751 while (readTupleIndex >= getIdentifierEndIndex() && readTupleIndex < writeTupleIndex)
752 identifierReadIdx++;
753 }
754 public void incrementTuple() {
755 readTupleIndex++;
756 }
757 public int getIdentifierEndIndex() {
758 if ((identifierReadIdx+1) >= identifierTupleIdx.size())
759 return writeTupleIndex;
760 return identifierTupleIdx.get(identifierReadIdx+1);
761 }
762 public int getReadIndex() {
763 return readTupleIndex;
764 }
765
766 public int getWriteIndex() {
767 return writeTupleIndex;
768 }
769 public String getIdentifier() {
770 assert readTupleIndex < writeTupleIndex;
771 assert identifierReadIdx < identifiers.size();
772
773 return identifiers.get(identifierReadIdx);
774 }
775 public String getText() {
776 assert readTupleIndex < writeTupleIndex;
777 return texts[readTupleIndex];
778 }
779 public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
780 while (getReadIndex() < endIndex) {
781 output.processTuple(getText());
782 incrementTuple();
783 }
784 }
785 public void copyUntilIndexIdentifier(int endIndex, ShreddedProcessor output) throws IOException {
786 while (getReadIndex() < endIndex) {
787 output.processIdentifier(getIdentifier());
788 assert getIdentifierEndIndex() <= endIndex;
789 copyTuples(getIdentifierEndIndex(), output);
790 incrementIdentifier();
791 }
792 }
793 public void copyUntilIdentifier(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
794 while (!isAtEnd()) {
795 if (other != null) {
796 assert !other.isAtEnd();
797 int c = + Utility.compare(getIdentifier(), other.getIdentifier());
798
799 if (c > 0) {
800 break;
801 }
802
803 output.processIdentifier(getIdentifier());
804
805 copyTuples(getIdentifierEndIndex(), output);
806 } else {
807 output.processIdentifier(getIdentifier());
808 copyTuples(getIdentifierEndIndex(), output);
809 }
810 incrementIdentifier();
811
812
813 }
814 }
815 public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
816 copyUntilIdentifier(other, output);
817 }
818
819 }
820 public static class ShreddedCombiner implements ReaderSource<AdditionalDocumentText>, ShreddedSource {
821 public ShreddedProcessor processor;
822 Collection<ShreddedReader> readers;
823 boolean closeOnExit = false;
824 boolean uninitialized = true;
825 PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
826
827 public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
828 this.readers = readers;
829 this.closeOnExit = closeOnExit;
830 }
831
832 public void setProcessor(Step processor) throws IncompatibleProcessorException {
833 if (processor instanceof ShreddedProcessor) {
834 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
835 } else if (processor instanceof AdditionalDocumentText.Processor) {
836 this.processor = new DuplicateEliminator(new TupleUnshredder((AdditionalDocumentText.Processor) processor));
837 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
838 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<AdditionalDocumentText>) processor));
839 } else {
840 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
841 }
842 }
843
844 public Class<AdditionalDocumentText> getOutputClass() {
845 return AdditionalDocumentText.class;
846 }
847
848 public void initialize() throws IOException {
849 for (ShreddedReader reader : readers) {
850 reader.fill();
851
852 if (!reader.getBuffer().isAtEnd())
853 queue.add(reader);
854 }
855
856 uninitialized = false;
857 }
858
859 public void run() throws IOException {
860 initialize();
861
862 while (queue.size() > 0) {
863 ShreddedReader top = queue.poll();
864 ShreddedReader next = null;
865 ShreddedBuffer nextBuffer = null;
866
867 assert !top.getBuffer().isAtEnd();
868
869 if (queue.size() > 0) {
870 next = queue.peek();
871 nextBuffer = next.getBuffer();
872 assert !nextBuffer.isAtEnd();
873 }
874
875 top.getBuffer().copyUntil(nextBuffer, processor);
876 if (top.getBuffer().isAtEnd())
877 top.fill();
878
879 if (!top.getBuffer().isAtEnd())
880 queue.add(top);
881 }
882
883 if (closeOnExit)
884 processor.close();
885 }
886
887 public AdditionalDocumentText read() throws IOException {
888 if (uninitialized)
889 initialize();
890
891 AdditionalDocumentText result = null;
892
893 while (queue.size() > 0) {
894 ShreddedReader top = queue.poll();
895 result = top.read();
896
897 if (result != null) {
898 if (top.getBuffer().isAtEnd())
899 top.fill();
900
901 queue.offer(top);
902 break;
903 }
904 }
905
906 return result;
907 }
908 }
909 public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<AdditionalDocumentText>, ShreddedSource {
910 public ShreddedProcessor processor;
911 ShreddedBuffer buffer;
912 AdditionalDocumentText last = new AdditionalDocumentText();
913 long updateIdentifierCount = -1;
914 long tupleCount = 0;
915 long bufferStartCount = 0;
916 ArrayInput input;
917
918 public ShreddedReader(ArrayInput input) {
919 this.input = input;
920 this.buffer = new ShreddedBuffer();
921 }
922
923 public ShreddedReader(ArrayInput input, int bufferSize) {
924 this.input = input;
925 this.buffer = new ShreddedBuffer(bufferSize);
926 }
927
928 public final int compareTo(ShreddedReader other) {
929 ShreddedBuffer otherBuffer = other.getBuffer();
930
931 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
932 return 0;
933 } else if (buffer.isAtEnd()) {
934 return -1;
935 } else if (otherBuffer.isAtEnd()) {
936 return 1;
937 }
938
939 int result = 0;
940 do {
941 result = + Utility.compare(buffer.getIdentifier(), otherBuffer.getIdentifier());
942 if(result != 0) break;
943 } while (false);
944
945 return result;
946 }
947
948 public final ShreddedBuffer getBuffer() {
949 return buffer;
950 }
951
952 public final AdditionalDocumentText read() throws IOException {
953 if (buffer.isAtEnd()) {
954 fill();
955
956 if (buffer.isAtEnd()) {
957 return null;
958 }
959 }
960
961 assert !buffer.isAtEnd();
962 AdditionalDocumentText result = new AdditionalDocumentText();
963
964 result.identifier = buffer.getIdentifier();
965 result.text = buffer.getText();
966
967 buffer.incrementTuple();
968 buffer.autoIncrementIdentifier();
969
970 return result;
971 }
972
973 public final void fill() throws IOException {
974 try {
975 buffer.reset();
976
977 if (tupleCount != 0) {
978
979 if(updateIdentifierCount - tupleCount > 0) {
980 buffer.identifiers.add(last.identifier);
981 buffer.identifierTupleIdx.add((int) (updateIdentifierCount - tupleCount));
982 }
983 bufferStartCount = tupleCount;
984 }
985
986 while (!buffer.isFull()) {
987 updateIdentifier();
988 buffer.processTuple(input.readString());
989 tupleCount++;
990 }
991 } catch(EOFException e) {}
992 }
993
994 public final void updateIdentifier() throws IOException {
995 if (updateIdentifierCount > tupleCount)
996 return;
997
998 last.identifier = input.readString();
999 updateIdentifierCount = tupleCount + input.readInt();
1000
1001 buffer.processIdentifier(last.identifier);
1002 }
1003
1004 public void run() throws IOException {
1005 while (true) {
1006 fill();
1007
1008 if (buffer.isAtEnd())
1009 break;
1010
1011 buffer.copyUntil(null, processor);
1012 }
1013 processor.close();
1014 }
1015
1016 public void setProcessor(Step processor) throws IncompatibleProcessorException {
1017 if (processor instanceof ShreddedProcessor) {
1018 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
1019 } else if (processor instanceof AdditionalDocumentText.Processor) {
1020 this.processor = new DuplicateEliminator(new TupleUnshredder((AdditionalDocumentText.Processor) processor));
1021 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
1022 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<AdditionalDocumentText>) processor));
1023 } else {
1024 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
1025 }
1026 }
1027
1028 public Class<AdditionalDocumentText> getOutputClass() {
1029 return AdditionalDocumentText.class;
1030 }
1031 }
1032
1033 public static class DuplicateEliminator implements ShreddedProcessor {
1034 public ShreddedProcessor processor;
1035 AdditionalDocumentText last = new AdditionalDocumentText();
1036 boolean identifierProcess = true;
1037
1038 public DuplicateEliminator() {}
1039 public DuplicateEliminator(ShreddedProcessor processor) {
1040 this.processor = processor;
1041 }
1042
1043 public void setShreddedProcessor(ShreddedProcessor processor) {
1044 this.processor = processor;
1045 }
1046
1047 public void processIdentifier(String identifier) throws IOException {
1048 if (identifierProcess || Utility.compare(identifier, last.identifier) != 0) {
1049 last.identifier = identifier;
1050 processor.processIdentifier(identifier);
1051 identifierProcess = false;
1052 }
1053 }
1054
1055 public void resetIdentifier() {
1056 identifierProcess = true;
1057 }
1058
1059 public void processTuple(String text) throws IOException {
1060 processor.processTuple(text);
1061 }
1062
1063 public void close() throws IOException {
1064 processor.close();
1065 }
1066 }
1067 public static class TupleUnshredder implements ShreddedProcessor {
1068 AdditionalDocumentText last = new AdditionalDocumentText();
1069 public org.galagosearch.tupleflow.Processor<AdditionalDocumentText> processor;
1070
1071 public TupleUnshredder(AdditionalDocumentText.Processor processor) {
1072 this.processor = processor;
1073 }
1074
1075 public TupleUnshredder(org.galagosearch.tupleflow.Processor<AdditionalDocumentText> processor) {
1076 this.processor = processor;
1077 }
1078
1079 public AdditionalDocumentText clone(AdditionalDocumentText object) {
1080 AdditionalDocumentText result = new AdditionalDocumentText();
1081 if (object == null) return result;
1082 result.identifier = object.identifier;
1083 result.text = object.text;
1084 return result;
1085 }
1086
1087 public void processIdentifier(String identifier) throws IOException {
1088 last.identifier = identifier;
1089 }
1090
1091
1092 public void processTuple(String text) throws IOException {
1093 last.text = text;
1094 processor.process(clone(last));
1095 }
1096
1097 public void close() throws IOException {
1098 processor.close();
1099 }
1100 }
1101 public static class TupleShredder implements Processor {
1102 AdditionalDocumentText last = new AdditionalDocumentText();
1103 public ShreddedProcessor processor;
1104
1105 public TupleShredder(ShreddedProcessor processor) {
1106 this.processor = processor;
1107 }
1108
1109 public AdditionalDocumentText clone(AdditionalDocumentText object) {
1110 AdditionalDocumentText result = new AdditionalDocumentText();
1111 if (object == null) return result;
1112 result.identifier = object.identifier;
1113 result.text = object.text;
1114 return result;
1115 }
1116
1117 public void process(AdditionalDocumentText object) throws IOException {
1118 boolean processAll = false;
1119 if(last == null || Utility.compare(last.identifier, object.identifier) != 0 || processAll) { processor.processIdentifier(object.identifier); processAll = true; }
1120 processor.processTuple(object.text);
1121 }
1122
1123 public Class<AdditionalDocumentText> getInputClass() {
1124 return AdditionalDocumentText.class;
1125 }
1126
1127 public void close() throws IOException {
1128 processor.close();
1129 }
1130 }
1131 }
1132 }