1
2
3 package org.galagosearch.core.types;
4
5 import org.galagosearch.tupleflow.Utility;
6 import org.galagosearch.tupleflow.ArrayInput;
7 import org.galagosearch.tupleflow.ArrayOutput;
8 import org.galagosearch.tupleflow.Order;
9 import org.galagosearch.tupleflow.OrderedWriter;
10 import org.galagosearch.tupleflow.Type;
11 import org.galagosearch.tupleflow.TypeReader;
12 import org.galagosearch.tupleflow.Step;
13 import org.galagosearch.tupleflow.IncompatibleProcessorException;
14 import org.galagosearch.tupleflow.ReaderSource;
15 import java.io.IOException;
16 import java.io.EOFException;
17 import java.io.UnsupportedEncodingException;
18 import java.util.ArrayList;
19 import java.util.Arrays;
20 import java.util.Comparator;
21 import java.util.PriorityQueue;
22 import java.util.Collection;
23
24 public class NumberedLink implements Type<NumberedLink> {
/** Source endpoint of this link. */
public long source;
/** Destination endpoint of this link. */
public long destination;

/** Creates a link whose source and destination are both zero. */
public NumberedLink() {
    this(0L, 0L);
}

/**
 * Creates a link between two endpoints.
 *
 * @param source the source endpoint
 * @param destination the destination endpoint
 */
public NumberedLink(long source, long destination) {
    this.source = source;
    this.destination = destination;
}
33
34 public String toString() {
35 return String.format("%d,%d",
36 source, destination);
37 }
38
39 public Order<NumberedLink> getOrder(String... spec) {
40 if (Arrays.equals(spec, new String[] { "+source" })) {
41 return new SourceOrder();
42 }
43 if (Arrays.equals(spec, new String[] { "+destination" })) {
44 return new DestinationOrder();
45 }
46 return null;
47 }
48
49 public interface Processor extends Step, org.galagosearch.tupleflow.Processor<NumberedLink> {
50 public void process(NumberedLink object) throws IOException;
51 public void close() throws IOException;
52 }
53 public interface Source extends Step {
54 }
55 public static class SourceOrder implements Order<NumberedLink> {
56 public int hash(NumberedLink object) {
57 int h = 0;
58 h += Utility.hash(object.source);
59 return h;
60 }
61 public Comparator<NumberedLink> greaterThan() {
62 return new Comparator<NumberedLink>() {
63 public int compare(NumberedLink one, NumberedLink two) {
64 int result = 0;
65 do {
66 result = + Utility.compare(one.source, two.source);
67 if(result != 0) break;
68 } while (false);
69 return -result;
70 }
71 };
72 }
73 public Comparator<NumberedLink> lessThan() {
74 return new Comparator<NumberedLink>() {
75 public int compare(NumberedLink one, NumberedLink two) {
76 int result = 0;
77 do {
78 result = + Utility.compare(one.source, two.source);
79 if(result != 0) break;
80 } while (false);
81 return result;
82 }
83 };
84 }
85 public TypeReader<NumberedLink> orderedReader(ArrayInput _input) {
86 return new ShreddedReader(_input);
87 }
88
89 public TypeReader<NumberedLink> orderedReader(ArrayInput _input, int bufferSize) {
90 return new ShreddedReader(_input, bufferSize);
91 }
92 public OrderedWriter<NumberedLink> orderedWriter(ArrayOutput _output) {
93 ShreddedWriter w = new ShreddedWriter(_output);
94 return new OrderedWriterClass(w);
95 }
96 public static class OrderedWriterClass extends OrderedWriter< NumberedLink > {
97 NumberedLink last = null;
98 ShreddedWriter shreddedWriter = null;
99
100 public OrderedWriterClass(ShreddedWriter s) {
101 this.shreddedWriter = s;
102 }
103
104 public void process(NumberedLink object) throws IOException {
105 boolean processAll = false;
106 if (processAll || last == null || 0 != Utility.compare(object.source, last.source)) { processAll = true; shreddedWriter.processSource(object.source); }
107 shreddedWriter.processTuple(object.destination);
108 last = object;
109 }
110
111 public void close() throws IOException {
112 shreddedWriter.close();
113 }
114
115 public Class<NumberedLink> getInputClass() {
116 return NumberedLink.class;
117 }
118 }
119 public ReaderSource<NumberedLink> orderedCombiner(Collection<TypeReader<NumberedLink>> readers, boolean closeOnExit) {
120 ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
121
122 for (TypeReader<NumberedLink> reader : readers) {
123 shreddedReaders.add((ShreddedReader)reader);
124 }
125
126 return new ShreddedCombiner(shreddedReaders, closeOnExit);
127 }
128 public NumberedLink clone(NumberedLink object) {
129 NumberedLink result = new NumberedLink();
130 if (object == null) return result;
131 result.source = object.source;
132 result.destination = object.destination;
133 return result;
134 }
135 public Class<NumberedLink> getOrderedClass() {
136 return NumberedLink.class;
137 }
138 public String[] getOrderSpec() {
139 return new String[] {"+source"};
140 }
141
142 public static String getSpecString() {
143 return "+source";
144 }
145
146 public interface ShreddedProcessor extends Step {
147 public void processSource(long source) throws IOException;
148 public void processTuple(long destination) throws IOException;
149 public void close() throws IOException;
150 }
151 public interface ShreddedSource extends Step {
152 }
153
154 public static class ShreddedWriter implements ShreddedProcessor {
155 ArrayOutput output;
156 ShreddedBuffer buffer = new ShreddedBuffer();
157 long lastSource;
158 boolean lastFlush = false;
159
160 public ShreddedWriter(ArrayOutput output) {
161 this.output = output;
162 }
163
164 public void close() throws IOException {
165 flush();
166 }
167
168 public void processSource(long source) {
169 lastSource = source;
170 buffer.processSource(source);
171 }
172 public final void processTuple(long destination) throws IOException {
173 if (lastFlush) {
174 if(buffer.sources.size() == 0) buffer.processSource(lastSource);
175 lastFlush = false;
176 }
177 buffer.processTuple(destination);
178 if (buffer.isFull())
179 flush();
180 }
181 public final void flushTuples(int pauseIndex) throws IOException {
182
183 while (buffer.getReadIndex() < pauseIndex) {
184
185 output.writeLong(buffer.getDestination());
186 buffer.incrementTuple();
187 }
188 }
189 public final void flushSource(int pauseIndex) throws IOException {
190 while (buffer.getReadIndex() < pauseIndex) {
191 int nextPause = buffer.getSourceEndIndex();
192 int count = nextPause - buffer.getReadIndex();
193
194 output.writeLong(buffer.getSource());
195 output.writeInt(count);
196 buffer.incrementSource();
197
198 flushTuples(nextPause);
199 assert nextPause == buffer.getReadIndex();
200 }
201 }
202 public void flush() throws IOException {
203 flushSource(buffer.getWriteIndex());
204 buffer.reset();
205 lastFlush = true;
206 }
207 }
208 public static class ShreddedBuffer {
209 ArrayList<Long> sources = new ArrayList();
210 ArrayList<Integer> sourceTupleIdx = new ArrayList();
211 int sourceReadIdx = 0;
212
213 long[] destinations;
214 int writeTupleIndex = 0;
215 int readTupleIndex = 0;
216 int batchSize;
217
218 public ShreddedBuffer(int batchSize) {
219 this.batchSize = batchSize;
220
221 destinations = new long[batchSize];
222 }
223
224 public ShreddedBuffer() {
225 this(10000);
226 }
227
228 public void processSource(long source) {
229 sources.add(source);
230 sourceTupleIdx.add(writeTupleIndex);
231 }
232 public void processTuple(long destination) {
233 assert sources.size() > 0;
234 destinations[writeTupleIndex] = destination;
235 writeTupleIndex++;
236 }
237 public void resetData() {
238 sources.clear();
239 sourceTupleIdx.clear();
240 writeTupleIndex = 0;
241 }
242
243 public void resetRead() {
244 readTupleIndex = 0;
245 sourceReadIdx = 0;
246 }
247
248 public void reset() {
249 resetData();
250 resetRead();
251 }
252 public boolean isFull() {
253 return writeTupleIndex >= batchSize;
254 }
255
256 public boolean isEmpty() {
257 return writeTupleIndex == 0;
258 }
259
260 public boolean isAtEnd() {
261 return readTupleIndex >= writeTupleIndex;
262 }
263 public void incrementSource() {
264 sourceReadIdx++;
265 }
266
267 public void autoIncrementSource() {
268 while (readTupleIndex >= getSourceEndIndex() && readTupleIndex < writeTupleIndex)
269 sourceReadIdx++;
270 }
271 public void incrementTuple() {
272 readTupleIndex++;
273 }
274 public int getSourceEndIndex() {
275 if ((sourceReadIdx+1) >= sourceTupleIdx.size())
276 return writeTupleIndex;
277 return sourceTupleIdx.get(sourceReadIdx+1);
278 }
279 public int getReadIndex() {
280 return readTupleIndex;
281 }
282
283 public int getWriteIndex() {
284 return writeTupleIndex;
285 }
286 public long getSource() {
287 assert readTupleIndex < writeTupleIndex;
288 assert sourceReadIdx < sources.size();
289
290 return sources.get(sourceReadIdx);
291 }
292 public long getDestination() {
293 assert readTupleIndex < writeTupleIndex;
294 return destinations[readTupleIndex];
295 }
296 public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
297 while (getReadIndex() < endIndex) {
298 output.processTuple(getDestination());
299 incrementTuple();
300 }
301 }
302 public void copyUntilIndexSource(int endIndex, ShreddedProcessor output) throws IOException {
303 while (getReadIndex() < endIndex) {
304 output.processSource(getSource());
305 assert getSourceEndIndex() <= endIndex;
306 copyTuples(getSourceEndIndex(), output);
307 incrementSource();
308 }
309 }
310 public void copyUntilSource(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
311 while (!isAtEnd()) {
312 if (other != null) {
313 assert !other.isAtEnd();
314 int c = + Utility.compare(getSource(), other.getSource());
315
316 if (c > 0) {
317 break;
318 }
319
320 output.processSource(getSource());
321
322 copyTuples(getSourceEndIndex(), output);
323 } else {
324 output.processSource(getSource());
325 copyTuples(getSourceEndIndex(), output);
326 }
327 incrementSource();
328
329
330 }
331 }
332 public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
333 copyUntilSource(other, output);
334 }
335
336 }
337 public static class ShreddedCombiner implements ReaderSource<NumberedLink>, ShreddedSource {
338 public ShreddedProcessor processor;
339 Collection<ShreddedReader> readers;
340 boolean closeOnExit = false;
341 boolean uninitialized = true;
342 PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
343
344 public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
345 this.readers = readers;
346 this.closeOnExit = closeOnExit;
347 }
348
349 public void setProcessor(Step processor) throws IncompatibleProcessorException {
350 if (processor instanceof ShreddedProcessor) {
351 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
352 } else if (processor instanceof NumberedLink.Processor) {
353 this.processor = new DuplicateEliminator(new TupleUnshredder((NumberedLink.Processor) processor));
354 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
355 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<NumberedLink>) processor));
356 } else {
357 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
358 }
359 }
360
361 public Class<NumberedLink> getOutputClass() {
362 return NumberedLink.class;
363 }
364
365 public void initialize() throws IOException {
366 for (ShreddedReader reader : readers) {
367 reader.fill();
368
369 if (!reader.getBuffer().isAtEnd())
370 queue.add(reader);
371 }
372
373 uninitialized = false;
374 }
375
376 public void run() throws IOException {
377 initialize();
378
379 while (queue.size() > 0) {
380 ShreddedReader top = queue.poll();
381 ShreddedReader next = null;
382 ShreddedBuffer nextBuffer = null;
383
384 assert !top.getBuffer().isAtEnd();
385
386 if (queue.size() > 0) {
387 next = queue.peek();
388 nextBuffer = next.getBuffer();
389 assert !nextBuffer.isAtEnd();
390 }
391
392 top.getBuffer().copyUntil(nextBuffer, processor);
393 if (top.getBuffer().isAtEnd())
394 top.fill();
395
396 if (!top.getBuffer().isAtEnd())
397 queue.add(top);
398 }
399
400 if (closeOnExit)
401 processor.close();
402 }
403
404 public NumberedLink read() throws IOException {
405 if (uninitialized)
406 initialize();
407
408 NumberedLink result = null;
409
410 while (queue.size() > 0) {
411 ShreddedReader top = queue.poll();
412 result = top.read();
413
414 if (result != null) {
415 if (top.getBuffer().isAtEnd())
416 top.fill();
417
418 queue.offer(top);
419 break;
420 }
421 }
422
423 return result;
424 }
425 }
426 public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<NumberedLink>, ShreddedSource {
427 public ShreddedProcessor processor;
428 ShreddedBuffer buffer;
429 NumberedLink last = new NumberedLink();
430 long updateSourceCount = -1;
431 long tupleCount = 0;
432 long bufferStartCount = 0;
433 ArrayInput input;
434
435 public ShreddedReader(ArrayInput input) {
436 this.input = input;
437 this.buffer = new ShreddedBuffer();
438 }
439
440 public ShreddedReader(ArrayInput input, int bufferSize) {
441 this.input = input;
442 this.buffer = new ShreddedBuffer(bufferSize);
443 }
444
445 public final int compareTo(ShreddedReader other) {
446 ShreddedBuffer otherBuffer = other.getBuffer();
447
448 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
449 return 0;
450 } else if (buffer.isAtEnd()) {
451 return -1;
452 } else if (otherBuffer.isAtEnd()) {
453 return 1;
454 }
455
456 int result = 0;
457 do {
458 result = + Utility.compare(buffer.getSource(), otherBuffer.getSource());
459 if(result != 0) break;
460 } while (false);
461
462 return result;
463 }
464
465 public final ShreddedBuffer getBuffer() {
466 return buffer;
467 }
468
469 public final NumberedLink read() throws IOException {
470 if (buffer.isAtEnd()) {
471 fill();
472
473 if (buffer.isAtEnd()) {
474 return null;
475 }
476 }
477
478 assert !buffer.isAtEnd();
479 NumberedLink result = new NumberedLink();
480
481 result.source = buffer.getSource();
482 result.destination = buffer.getDestination();
483
484 buffer.incrementTuple();
485 buffer.autoIncrementSource();
486
487 return result;
488 }
489
490 public final void fill() throws IOException {
491 try {
492 buffer.reset();
493
494 if (tupleCount != 0) {
495
496 if(updateSourceCount - tupleCount > 0) {
497 buffer.sources.add(last.source);
498 buffer.sourceTupleIdx.add((int) (updateSourceCount - tupleCount));
499 }
500 bufferStartCount = tupleCount;
501 }
502
503 while (!buffer.isFull()) {
504 updateSource();
505 buffer.processTuple(input.readLong());
506 tupleCount++;
507 }
508 } catch(EOFException e) {}
509 }
510
511 public final void updateSource() throws IOException {
512 if (updateSourceCount > tupleCount)
513 return;
514
515 last.source = input.readLong();
516 updateSourceCount = tupleCount + input.readInt();
517
518 buffer.processSource(last.source);
519 }
520
521 public void run() throws IOException {
522 while (true) {
523 fill();
524
525 if (buffer.isAtEnd())
526 break;
527
528 buffer.copyUntil(null, processor);
529 }
530 processor.close();
531 }
532
533 public void setProcessor(Step processor) throws IncompatibleProcessorException {
534 if (processor instanceof ShreddedProcessor) {
535 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
536 } else if (processor instanceof NumberedLink.Processor) {
537 this.processor = new DuplicateEliminator(new TupleUnshredder((NumberedLink.Processor) processor));
538 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
539 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<NumberedLink>) processor));
540 } else {
541 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
542 }
543 }
544
545 public Class<NumberedLink> getOutputClass() {
546 return NumberedLink.class;
547 }
548 }
549
550 public static class DuplicateEliminator implements ShreddedProcessor {
551 public ShreddedProcessor processor;
552 NumberedLink last = new NumberedLink();
553 boolean sourceProcess = true;
554
555 public DuplicateEliminator() {}
556 public DuplicateEliminator(ShreddedProcessor processor) {
557 this.processor = processor;
558 }
559
560 public void setShreddedProcessor(ShreddedProcessor processor) {
561 this.processor = processor;
562 }
563
564 public void processSource(long source) throws IOException {
565 if (sourceProcess || Utility.compare(source, last.source) != 0) {
566 last.source = source;
567 processor.processSource(source);
568 sourceProcess = false;
569 }
570 }
571
572 public void resetSource() {
573 sourceProcess = true;
574 }
575
576 public void processTuple(long destination) throws IOException {
577 processor.processTuple(destination);
578 }
579
580 public void close() throws IOException {
581 processor.close();
582 }
583 }
584 public static class TupleUnshredder implements ShreddedProcessor {
585 NumberedLink last = new NumberedLink();
586 public org.galagosearch.tupleflow.Processor<NumberedLink> processor;
587
588 public TupleUnshredder(NumberedLink.Processor processor) {
589 this.processor = processor;
590 }
591
592 public TupleUnshredder(org.galagosearch.tupleflow.Processor<NumberedLink> processor) {
593 this.processor = processor;
594 }
595
596 public NumberedLink clone(NumberedLink object) {
597 NumberedLink result = new NumberedLink();
598 if (object == null) return result;
599 result.source = object.source;
600 result.destination = object.destination;
601 return result;
602 }
603
604 public void processSource(long source) throws IOException {
605 last.source = source;
606 }
607
608
609 public void processTuple(long destination) throws IOException {
610 last.destination = destination;
611 processor.process(clone(last));
612 }
613
614 public void close() throws IOException {
615 processor.close();
616 }
617 }
618 public static class TupleShredder implements Processor {
619 NumberedLink last = new NumberedLink();
620 public ShreddedProcessor processor;
621
622 public TupleShredder(ShreddedProcessor processor) {
623 this.processor = processor;
624 }
625
626 public NumberedLink clone(NumberedLink object) {
627 NumberedLink result = new NumberedLink();
628 if (object == null) return result;
629 result.source = object.source;
630 result.destination = object.destination;
631 return result;
632 }
633
634 public void process(NumberedLink object) throws IOException {
635 boolean processAll = false;
636 if(last == null || Utility.compare(last.source, object.source) != 0 || processAll) { processor.processSource(object.source); processAll = true; }
637 processor.processTuple(object.destination);
638 }
639
640 public Class<NumberedLink> getInputClass() {
641 return NumberedLink.class;
642 }
643
644 public void close() throws IOException {
645 processor.close();
646 }
647 }
648 }
649 public static class DestinationOrder implements Order<NumberedLink> {
650 public int hash(NumberedLink object) {
651 int h = 0;
652 h += Utility.hash(object.destination);
653 return h;
654 }
655 public Comparator<NumberedLink> greaterThan() {
656 return new Comparator<NumberedLink>() {
657 public int compare(NumberedLink one, NumberedLink two) {
658 int result = 0;
659 do {
660 result = + Utility.compare(one.destination, two.destination);
661 if(result != 0) break;
662 } while (false);
663 return -result;
664 }
665 };
666 }
667 public Comparator<NumberedLink> lessThan() {
668 return new Comparator<NumberedLink>() {
669 public int compare(NumberedLink one, NumberedLink two) {
670 int result = 0;
671 do {
672 result = + Utility.compare(one.destination, two.destination);
673 if(result != 0) break;
674 } while (false);
675 return result;
676 }
677 };
678 }
679 public TypeReader<NumberedLink> orderedReader(ArrayInput _input) {
680 return new ShreddedReader(_input);
681 }
682
683 public TypeReader<NumberedLink> orderedReader(ArrayInput _input, int bufferSize) {
684 return new ShreddedReader(_input, bufferSize);
685 }
686 public OrderedWriter<NumberedLink> orderedWriter(ArrayOutput _output) {
687 ShreddedWriter w = new ShreddedWriter(_output);
688 return new OrderedWriterClass(w);
689 }
690 public static class OrderedWriterClass extends OrderedWriter< NumberedLink > {
691 NumberedLink last = null;
692 ShreddedWriter shreddedWriter = null;
693
694 public OrderedWriterClass(ShreddedWriter s) {
695 this.shreddedWriter = s;
696 }
697
698 public void process(NumberedLink object) throws IOException {
699 boolean processAll = false;
700 if (processAll || last == null || 0 != Utility.compare(object.destination, last.destination)) { processAll = true; shreddedWriter.processDestination(object.destination); }
701 shreddedWriter.processTuple(object.source);
702 last = object;
703 }
704
705 public void close() throws IOException {
706 shreddedWriter.close();
707 }
708
709 public Class<NumberedLink> getInputClass() {
710 return NumberedLink.class;
711 }
712 }
713 public ReaderSource<NumberedLink> orderedCombiner(Collection<TypeReader<NumberedLink>> readers, boolean closeOnExit) {
714 ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
715
716 for (TypeReader<NumberedLink> reader : readers) {
717 shreddedReaders.add((ShreddedReader)reader);
718 }
719
720 return new ShreddedCombiner(shreddedReaders, closeOnExit);
721 }
722 public NumberedLink clone(NumberedLink object) {
723 NumberedLink result = new NumberedLink();
724 if (object == null) return result;
725 result.source = object.source;
726 result.destination = object.destination;
727 return result;
728 }
729 public Class<NumberedLink> getOrderedClass() {
730 return NumberedLink.class;
731 }
732 public String[] getOrderSpec() {
733 return new String[] {"+destination"};
734 }
735
736 public static String getSpecString() {
737 return "+destination";
738 }
739
740 public interface ShreddedProcessor extends Step {
741 public void processDestination(long destination) throws IOException;
742 public void processTuple(long source) throws IOException;
743 public void close() throws IOException;
744 }
745 public interface ShreddedSource extends Step {
746 }
747
748 public static class ShreddedWriter implements ShreddedProcessor {
749 ArrayOutput output;
750 ShreddedBuffer buffer = new ShreddedBuffer();
751 long lastDestination;
752 boolean lastFlush = false;
753
754 public ShreddedWriter(ArrayOutput output) {
755 this.output = output;
756 }
757
758 public void close() throws IOException {
759 flush();
760 }
761
762 public void processDestination(long destination) {
763 lastDestination = destination;
764 buffer.processDestination(destination);
765 }
766 public final void processTuple(long source) throws IOException {
767 if (lastFlush) {
768 if(buffer.destinations.size() == 0) buffer.processDestination(lastDestination);
769 lastFlush = false;
770 }
771 buffer.processTuple(source);
772 if (buffer.isFull())
773 flush();
774 }
775 public final void flushTuples(int pauseIndex) throws IOException {
776
777 while (buffer.getReadIndex() < pauseIndex) {
778
779 output.writeLong(buffer.getSource());
780 buffer.incrementTuple();
781 }
782 }
783 public final void flushDestination(int pauseIndex) throws IOException {
784 while (buffer.getReadIndex() < pauseIndex) {
785 int nextPause = buffer.getDestinationEndIndex();
786 int count = nextPause - buffer.getReadIndex();
787
788 output.writeLong(buffer.getDestination());
789 output.writeInt(count);
790 buffer.incrementDestination();
791
792 flushTuples(nextPause);
793 assert nextPause == buffer.getReadIndex();
794 }
795 }
796 public void flush() throws IOException {
797 flushDestination(buffer.getWriteIndex());
798 buffer.reset();
799 lastFlush = true;
800 }
801 }
802 public static class ShreddedBuffer {
803 ArrayList<Long> destinations = new ArrayList();
804 ArrayList<Integer> destinationTupleIdx = new ArrayList();
805 int destinationReadIdx = 0;
806
807 long[] sources;
808 int writeTupleIndex = 0;
809 int readTupleIndex = 0;
810 int batchSize;
811
812 public ShreddedBuffer(int batchSize) {
813 this.batchSize = batchSize;
814
815 sources = new long[batchSize];
816 }
817
818 public ShreddedBuffer() {
819 this(10000);
820 }
821
822 public void processDestination(long destination) {
823 destinations.add(destination);
824 destinationTupleIdx.add(writeTupleIndex);
825 }
826 public void processTuple(long source) {
827 assert destinations.size() > 0;
828 sources[writeTupleIndex] = source;
829 writeTupleIndex++;
830 }
831 public void resetData() {
832 destinations.clear();
833 destinationTupleIdx.clear();
834 writeTupleIndex = 0;
835 }
836
837 public void resetRead() {
838 readTupleIndex = 0;
839 destinationReadIdx = 0;
840 }
841
842 public void reset() {
843 resetData();
844 resetRead();
845 }
846 public boolean isFull() {
847 return writeTupleIndex >= batchSize;
848 }
849
850 public boolean isEmpty() {
851 return writeTupleIndex == 0;
852 }
853
854 public boolean isAtEnd() {
855 return readTupleIndex >= writeTupleIndex;
856 }
857 public void incrementDestination() {
858 destinationReadIdx++;
859 }
860
861 public void autoIncrementDestination() {
862 while (readTupleIndex >= getDestinationEndIndex() && readTupleIndex < writeTupleIndex)
863 destinationReadIdx++;
864 }
865 public void incrementTuple() {
866 readTupleIndex++;
867 }
868 public int getDestinationEndIndex() {
869 if ((destinationReadIdx+1) >= destinationTupleIdx.size())
870 return writeTupleIndex;
871 return destinationTupleIdx.get(destinationReadIdx+1);
872 }
873 public int getReadIndex() {
874 return readTupleIndex;
875 }
876
877 public int getWriteIndex() {
878 return writeTupleIndex;
879 }
880 public long getDestination() {
881 assert readTupleIndex < writeTupleIndex;
882 assert destinationReadIdx < destinations.size();
883
884 return destinations.get(destinationReadIdx);
885 }
886 public long getSource() {
887 assert readTupleIndex < writeTupleIndex;
888 return sources[readTupleIndex];
889 }
890 public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
891 while (getReadIndex() < endIndex) {
892 output.processTuple(getSource());
893 incrementTuple();
894 }
895 }
896 public void copyUntilIndexDestination(int endIndex, ShreddedProcessor output) throws IOException {
897 while (getReadIndex() < endIndex) {
898 output.processDestination(getDestination());
899 assert getDestinationEndIndex() <= endIndex;
900 copyTuples(getDestinationEndIndex(), output);
901 incrementDestination();
902 }
903 }
904 public void copyUntilDestination(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
905 while (!isAtEnd()) {
906 if (other != null) {
907 assert !other.isAtEnd();
908 int c = + Utility.compare(getDestination(), other.getDestination());
909
910 if (c > 0) {
911 break;
912 }
913
914 output.processDestination(getDestination());
915
916 copyTuples(getDestinationEndIndex(), output);
917 } else {
918 output.processDestination(getDestination());
919 copyTuples(getDestinationEndIndex(), output);
920 }
921 incrementDestination();
922
923
924 }
925 }
926 public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
927 copyUntilDestination(other, output);
928 }
929
930 }
931 public static class ShreddedCombiner implements ReaderSource<NumberedLink>, ShreddedSource {
932 public ShreddedProcessor processor;
933 Collection<ShreddedReader> readers;
934 boolean closeOnExit = false;
935 boolean uninitialized = true;
936 PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
937
938 public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
939 this.readers = readers;
940 this.closeOnExit = closeOnExit;
941 }
942
943 public void setProcessor(Step processor) throws IncompatibleProcessorException {
944 if (processor instanceof ShreddedProcessor) {
945 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
946 } else if (processor instanceof NumberedLink.Processor) {
947 this.processor = new DuplicateEliminator(new TupleUnshredder((NumberedLink.Processor) processor));
948 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
949 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<NumberedLink>) processor));
950 } else {
951 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
952 }
953 }
954
955 public Class<NumberedLink> getOutputClass() {
956 return NumberedLink.class;
957 }
958
959 public void initialize() throws IOException {
960 for (ShreddedReader reader : readers) {
961 reader.fill();
962
963 if (!reader.getBuffer().isAtEnd())
964 queue.add(reader);
965 }
966
967 uninitialized = false;
968 }
969
970 public void run() throws IOException {
971 initialize();
972
973 while (queue.size() > 0) {
974 ShreddedReader top = queue.poll();
975 ShreddedReader next = null;
976 ShreddedBuffer nextBuffer = null;
977
978 assert !top.getBuffer().isAtEnd();
979
980 if (queue.size() > 0) {
981 next = queue.peek();
982 nextBuffer = next.getBuffer();
983 assert !nextBuffer.isAtEnd();
984 }
985
986 top.getBuffer().copyUntil(nextBuffer, processor);
987 if (top.getBuffer().isAtEnd())
988 top.fill();
989
990 if (!top.getBuffer().isAtEnd())
991 queue.add(top);
992 }
993
994 if (closeOnExit)
995 processor.close();
996 }
997
998 public NumberedLink read() throws IOException {
999 if (uninitialized)
1000 initialize();
1001
1002 NumberedLink result = null;
1003
1004 while (queue.size() > 0) {
1005 ShreddedReader top = queue.poll();
1006 result = top.read();
1007
1008 if (result != null) {
1009 if (top.getBuffer().isAtEnd())
1010 top.fill();
1011
1012 queue.offer(top);
1013 break;
1014 }
1015 }
1016
1017 return result;
1018 }
1019 }
1020 public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<NumberedLink>, ShreddedSource {
1021 public ShreddedProcessor processor;
1022 ShreddedBuffer buffer;
1023 NumberedLink last = new NumberedLink();
1024 long updateDestinationCount = -1;
1025 long tupleCount = 0;
1026 long bufferStartCount = 0;
1027 ArrayInput input;
1028
1029 public ShreddedReader(ArrayInput input) {
1030 this.input = input;
1031 this.buffer = new ShreddedBuffer();
1032 }
1033
1034 public ShreddedReader(ArrayInput input, int bufferSize) {
1035 this.input = input;
1036 this.buffer = new ShreddedBuffer(bufferSize);
1037 }
1038
1039 public final int compareTo(ShreddedReader other) {
1040 ShreddedBuffer otherBuffer = other.getBuffer();
1041
1042 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
1043 return 0;
1044 } else if (buffer.isAtEnd()) {
1045 return -1;
1046 } else if (otherBuffer.isAtEnd()) {
1047 return 1;
1048 }
1049
1050 int result = 0;
1051 do {
1052 result = + Utility.compare(buffer.getDestination(), otherBuffer.getDestination());
1053 if(result != 0) break;
1054 } while (false);
1055
1056 return result;
1057 }
1058
1059 public final ShreddedBuffer getBuffer() {
1060 return buffer;
1061 }
1062
1063 public final NumberedLink read() throws IOException {
1064 if (buffer.isAtEnd()) {
1065 fill();
1066
1067 if (buffer.isAtEnd()) {
1068 return null;
1069 }
1070 }
1071
1072 assert !buffer.isAtEnd();
1073 NumberedLink result = new NumberedLink();
1074
1075 result.destination = buffer.getDestination();
1076 result.source = buffer.getSource();
1077
1078 buffer.incrementTuple();
1079 buffer.autoIncrementDestination();
1080
1081 return result;
1082 }
1083
1084 public final void fill() throws IOException {
1085 try {
1086 buffer.reset();
1087
1088 if (tupleCount != 0) {
1089
1090 if(updateDestinationCount - tupleCount > 0) {
1091 buffer.destinations.add(last.destination);
1092 buffer.destinationTupleIdx.add((int) (updateDestinationCount - tupleCount));
1093 }
1094 bufferStartCount = tupleCount;
1095 }
1096
1097 while (!buffer.isFull()) {
1098 updateDestination();
1099 buffer.processTuple(input.readLong());
1100 tupleCount++;
1101 }
1102 } catch(EOFException e) {}
1103 }
1104
1105 public final void updateDestination() throws IOException {
1106 if (updateDestinationCount > tupleCount)
1107 return;
1108
1109 last.destination = input.readLong();
1110 updateDestinationCount = tupleCount + input.readInt();
1111
1112 buffer.processDestination(last.destination);
1113 }
1114
1115 public void run() throws IOException {
1116 while (true) {
1117 fill();
1118
1119 if (buffer.isAtEnd())
1120 break;
1121
1122 buffer.copyUntil(null, processor);
1123 }
1124 processor.close();
1125 }
1126
1127 public void setProcessor(Step processor) throws IncompatibleProcessorException {
1128 if (processor instanceof ShreddedProcessor) {
1129 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
1130 } else if (processor instanceof NumberedLink.Processor) {
1131 this.processor = new DuplicateEliminator(new TupleUnshredder((NumberedLink.Processor) processor));
1132 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
1133 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<NumberedLink>) processor));
1134 } else {
1135 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
1136 }
1137 }
1138
1139 public Class<NumberedLink> getOutputClass() {
1140 return NumberedLink.class;
1141 }
1142 }
1143
1144 public static class DuplicateEliminator implements ShreddedProcessor {
1145 public ShreddedProcessor processor;
1146 NumberedLink last = new NumberedLink();
1147 boolean destinationProcess = true;
1148
1149 public DuplicateEliminator() {}
1150 public DuplicateEliminator(ShreddedProcessor processor) {
1151 this.processor = processor;
1152 }
1153
1154 public void setShreddedProcessor(ShreddedProcessor processor) {
1155 this.processor = processor;
1156 }
1157
1158 public void processDestination(long destination) throws IOException {
1159 if (destinationProcess || Utility.compare(destination, last.destination) != 0) {
1160 last.destination = destination;
1161 processor.processDestination(destination);
1162 destinationProcess = false;
1163 }
1164 }
1165
1166 public void resetDestination() {
1167 destinationProcess = true;
1168 }
1169
1170 public void processTuple(long source) throws IOException {
1171 processor.processTuple(source);
1172 }
1173
1174 public void close() throws IOException {
1175 processor.close();
1176 }
1177 }
1178 public static class TupleUnshredder implements ShreddedProcessor {
1179 NumberedLink last = new NumberedLink();
1180 public org.galagosearch.tupleflow.Processor<NumberedLink> processor;
1181
1182 public TupleUnshredder(NumberedLink.Processor processor) {
1183 this.processor = processor;
1184 }
1185
1186 public TupleUnshredder(org.galagosearch.tupleflow.Processor<NumberedLink> processor) {
1187 this.processor = processor;
1188 }
1189
1190 public NumberedLink clone(NumberedLink object) {
1191 NumberedLink result = new NumberedLink();
1192 if (object == null) return result;
1193 result.source = object.source;
1194 result.destination = object.destination;
1195 return result;
1196 }
1197
1198 public void processDestination(long destination) throws IOException {
1199 last.destination = destination;
1200 }
1201
1202
1203 public void processTuple(long source) throws IOException {
1204 last.source = source;
1205 processor.process(clone(last));
1206 }
1207
1208 public void close() throws IOException {
1209 processor.close();
1210 }
1211 }
1212 public static class TupleShredder implements Processor {
1213 NumberedLink last = new NumberedLink();
1214 public ShreddedProcessor processor;
1215
1216 public TupleShredder(ShreddedProcessor processor) {
1217 this.processor = processor;
1218 }
1219
1220 public NumberedLink clone(NumberedLink object) {
1221 NumberedLink result = new NumberedLink();
1222 if (object == null) return result;
1223 result.source = object.source;
1224 result.destination = object.destination;
1225 return result;
1226 }
1227
1228 public void process(NumberedLink object) throws IOException {
1229 boolean processAll = false;
1230 if(last == null || Utility.compare(last.destination, object.destination) != 0 || processAll) { processor.processDestination(object.destination); processAll = true; }
1231 processor.processTuple(object.source);
1232 }
1233
1234 public Class<NumberedLink> getInputClass() {
1235 return NumberedLink.class;
1236 }
1237
1238 public void close() throws IOException {
1239 processor.close();
1240 }
1241 }
1242 }
1243 }