1
2
3 package org.galagosearch.core.types;
4
5 import org.galagosearch.tupleflow.Utility;
6 import org.galagosearch.tupleflow.ArrayInput;
7 import org.galagosearch.tupleflow.ArrayOutput;
8 import org.galagosearch.tupleflow.Order;
9 import org.galagosearch.tupleflow.OrderedWriter;
10 import org.galagosearch.tupleflow.Type;
11 import org.galagosearch.tupleflow.TypeReader;
12 import org.galagosearch.tupleflow.Step;
13 import org.galagosearch.tupleflow.IncompatibleProcessorException;
14 import org.galagosearch.tupleflow.ReaderSource;
15 import java.io.IOException;
16 import java.io.EOFException;
17 import java.io.UnsupportedEncodingException;
18 import java.util.ArrayList;
19 import java.util.Arrays;
20 import java.util.Comparator;
21 import java.util.PriorityQueue;
22 import java.util.Collection;
23
24 public class NumberedValuedExtent implements Type<NumberedValuedExtent> {
/** First ordering key; raw bytes, rendered as UTF-8 by toString(). */
public byte[] extentName;
/** Second ordering key. */
public long number;
/** Third ordering key. */
public int begin;
/** Payload field carried with each tuple (not part of the ordering). */
public int end;
/** Payload field carried with each tuple (not part of the ordering). */
public long value;

/** Creates an instance whose fields all hold their Java default values. */
public NumberedValuedExtent() {
}

/**
 * Creates a fully populated instance. The extentName array is stored by
 * reference, not copied.
 *
 * @param extentName first ordering key
 * @param number     second ordering key
 * @param begin      third ordering key
 * @param end        payload field
 * @param value      payload field
 */
public NumberedValuedExtent(byte[] extentName, long number, int begin, int end, long value) {
    this.value = value;
    this.end = end;
    this.begin = begin;
    this.number = number;
    this.extentName = extentName;
}
39
40 public String toString() {
41 try {
42 return String.format("%s,%d,%d,%d,%d",
43 new String(extentName, "UTF-8"), number, begin, end, value);
44 } catch(UnsupportedEncodingException e) {
45 throw new RuntimeException("Couldn't convert string to UTF-8.");
46 }
47 }
48
49 public Order<NumberedValuedExtent> getOrder(String... spec) {
50 if (Arrays.equals(spec, new String[] { "+extentName", "+number", "+begin" })) {
51 return new ExtentNameNumberBeginOrder();
52 }
53 return null;
54 }
55
56 public interface Processor extends Step, org.galagosearch.tupleflow.Processor<NumberedValuedExtent> {
57 public void process(NumberedValuedExtent object) throws IOException;
58 public void close() throws IOException;
59 }
60 public interface Source extends Step {
61 }
62 public static class ExtentNameNumberBeginOrder implements Order<NumberedValuedExtent> {
63 public int hash(NumberedValuedExtent object) {
64 int h = 0;
65 h += Utility.hash(object.extentName);
66 h += Utility.hash(object.number);
67 h += Utility.hash(object.begin);
68 return h;
69 }
70 public Comparator<NumberedValuedExtent> greaterThan() {
71 return new Comparator<NumberedValuedExtent>() {
72 public int compare(NumberedValuedExtent one, NumberedValuedExtent two) {
73 int result = 0;
74 do {
75 result = + Utility.compare(one.extentName, two.extentName);
76 if(result != 0) break;
77 result = + Utility.compare(one.number, two.number);
78 if(result != 0) break;
79 result = + Utility.compare(one.begin, two.begin);
80 if(result != 0) break;
81 } while (false);
82 return -result;
83 }
84 };
85 }
86 public Comparator<NumberedValuedExtent> lessThan() {
87 return new Comparator<NumberedValuedExtent>() {
88 public int compare(NumberedValuedExtent one, NumberedValuedExtent two) {
89 int result = 0;
90 do {
91 result = + Utility.compare(one.extentName, two.extentName);
92 if(result != 0) break;
93 result = + Utility.compare(one.number, two.number);
94 if(result != 0) break;
95 result = + Utility.compare(one.begin, two.begin);
96 if(result != 0) break;
97 } while (false);
98 return result;
99 }
100 };
101 }
102 public TypeReader<NumberedValuedExtent> orderedReader(ArrayInput _input) {
103 return new ShreddedReader(_input);
104 }
105
106 public TypeReader<NumberedValuedExtent> orderedReader(ArrayInput _input, int bufferSize) {
107 return new ShreddedReader(_input, bufferSize);
108 }
109 public OrderedWriter<NumberedValuedExtent> orderedWriter(ArrayOutput _output) {
110 ShreddedWriter w = new ShreddedWriter(_output);
111 return new OrderedWriterClass(w);
112 }
113 public static class OrderedWriterClass extends OrderedWriter< NumberedValuedExtent > {
114 NumberedValuedExtent last = null;
115 ShreddedWriter shreddedWriter = null;
116
117 public OrderedWriterClass(ShreddedWriter s) {
118 this.shreddedWriter = s;
119 }
120
121 public void process(NumberedValuedExtent object) throws IOException {
122 boolean processAll = false;
123 if (processAll || last == null || 0 != Utility.compare(object.extentName, last.extentName)) { processAll = true; shreddedWriter.processExtentName(object.extentName); }
124 if (processAll || last == null || 0 != Utility.compare(object.number, last.number)) { processAll = true; shreddedWriter.processNumber(object.number); }
125 if (processAll || last == null || 0 != Utility.compare(object.begin, last.begin)) { processAll = true; shreddedWriter.processBegin(object.begin); }
126 shreddedWriter.processTuple(object.end, object.value);
127 last = object;
128 }
129
130 public void close() throws IOException {
131 shreddedWriter.close();
132 }
133
134 public Class<NumberedValuedExtent> getInputClass() {
135 return NumberedValuedExtent.class;
136 }
137 }
138 public ReaderSource<NumberedValuedExtent> orderedCombiner(Collection<TypeReader<NumberedValuedExtent>> readers, boolean closeOnExit) {
139 ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
140
141 for (TypeReader<NumberedValuedExtent> reader : readers) {
142 shreddedReaders.add((ShreddedReader)reader);
143 }
144
145 return new ShreddedCombiner(shreddedReaders, closeOnExit);
146 }
147 public NumberedValuedExtent clone(NumberedValuedExtent object) {
148 NumberedValuedExtent result = new NumberedValuedExtent();
149 if (object == null) return result;
150 result.extentName = object.extentName;
151 result.number = object.number;
152 result.begin = object.begin;
153 result.end = object.end;
154 result.value = object.value;
155 return result;
156 }
157 public Class<NumberedValuedExtent> getOrderedClass() {
158 return NumberedValuedExtent.class;
159 }
160 public String[] getOrderSpec() {
161 return new String[] {"+extentName", "+number", "+begin"};
162 }
163
164 public static String getSpecString() {
165 return "+extentName +number +begin";
166 }
167
168 public interface ShreddedProcessor extends Step {
169 public void processExtentName(byte[] extentName) throws IOException;
170 public void processNumber(long number) throws IOException;
171 public void processBegin(int begin) throws IOException;
172 public void processTuple(int end, long value) throws IOException;
173 public void close() throws IOException;
174 }
175 public interface ShreddedSource extends Step {
176 }
177
/**
 * Collects shredded tuples in a ShreddedBuffer. Whenever the buffer fills, and on
 * close(), it writes them to an ArrayOutput grouped by extentName, then number,
 * then begin.
 *
 * Layout written by flush() (see the flush* methods):
 *   per extentName group: writeBytes(extentName), writeInt(tuples in group)
 *     per number group:   writeLong(number),      writeInt(tuples in group)
 *       per begin group:  writeInt(begin),        writeInt(tuples in group)
 *         per tuple:      writeInt(end),          writeLong(value)
 * A group that straddles a flush is written as two consecutive groups with the
 * same key, because processTuple re-seeds the keys into the fresh buffer.
 */
public static class ShreddedWriter implements ShreddedProcessor {
    ArrayOutput output;
    ShreddedBuffer buffer = new ShreddedBuffer();
    // Most recently received key at each level; re-added after a flush (see processTuple).
    byte[] lastExtentName;
    long lastNumber;
    int lastBegin;
    // Set by flush() after it resets the buffer; cleared by the next processTuple.
    boolean lastFlush = false;

    public ShreddedWriter(ArrayOutput output) {
        this.output = output;
    }

    /** Writes any buffered tuples. Only flush() is called; the ArrayOutput is not closed here. */
    public void close() throws IOException {
        flush();
    }

    /** Starts a new extentName group in the buffer and remembers the key. */
    public void processExtentName(byte[] extentName) {
        lastExtentName = extentName;
        buffer.processExtentName(extentName);
    }
    /** Starts a new number group in the buffer and remembers the key. */
    public void processNumber(long number) {
        lastNumber = number;
        buffer.processNumber(number);
    }
    /** Starts a new begin group in the buffer and remembers the key. */
    public void processBegin(int begin) {
        lastBegin = begin;
        buffer.processBegin(begin);
    }
    /** Buffers one tuple under the current keys, flushing once the buffer is full. */
    public final void processTuple(int end, long value) throws IOException {
        // flush() emptied the buffer's key lists. Any level not re-sent since then
        // gets its last key restored, so this tuple still has all three levels.
        if (lastFlush) {
            if(buffer.extentNames.size() == 0) buffer.processExtentName(lastExtentName);
            if(buffer.numbers.size() == 0) buffer.processNumber(lastNumber);
            if(buffer.begins.size() == 0) buffer.processBegin(lastBegin);
            lastFlush = false;
        }
        buffer.processTuple(end, value);
        if (buffer.isFull())
            flush();
    }
    /** Writes each tuple's end and value until the buffer read index reaches pauseIndex. */
    public final void flushTuples(int pauseIndex) throws IOException {

        while (buffer.getReadIndex() < pauseIndex) {

            output.writeInt(buffer.getEnd());
            output.writeLong(buffer.getValue());
            buffer.incrementTuple();
        }
    }
    /** Writes extentName group headers, recursing into number groups, up to pauseIndex. */
    public final void flushExtentName(int pauseIndex) throws IOException {
        while (buffer.getReadIndex() < pauseIndex) {
            int nextPause = buffer.getExtentNameEndIndex();
            // Number of buffered tuples in this group.
            int count = nextPause - buffer.getReadIndex();

            output.writeBytes(buffer.getExtentName());
            output.writeInt(count);
            buffer.incrementExtentName();

            flushNumber(nextPause);
            assert nextPause == buffer.getReadIndex();
        }
    }
    /** Writes number group headers, recursing into begin groups, up to pauseIndex. */
    public final void flushNumber(int pauseIndex) throws IOException {
        while (buffer.getReadIndex() < pauseIndex) {
            int nextPause = buffer.getNumberEndIndex();
            int count = nextPause - buffer.getReadIndex();

            output.writeLong(buffer.getNumber());
            output.writeInt(count);
            buffer.incrementNumber();

            flushBegin(nextPause);
            assert nextPause == buffer.getReadIndex();
        }
    }
    /** Writes begin group headers, followed by their tuples, up to pauseIndex. */
    public final void flushBegin(int pauseIndex) throws IOException {
        while (buffer.getReadIndex() < pauseIndex) {
            int nextPause = buffer.getBeginEndIndex();
            int count = nextPause - buffer.getReadIndex();

            output.writeInt(buffer.getBegin());
            output.writeInt(count);
            buffer.incrementBegin();

            flushTuples(nextPause);
            assert nextPause == buffer.getReadIndex();
        }
    }
    /** Writes every buffered tuple, empties the buffer, and arms the key re-seeding in processTuple. */
    public void flush() throws IOException {
        flushExtentName(buffer.getWriteIndex());
        buffer.reset();
        lastFlush = true;
    }
}
271 public static class ShreddedBuffer {
272 ArrayList<byte[]> extentNames = new ArrayList();
273 ArrayList<Long> numbers = new ArrayList();
274 ArrayList<Integer> begins = new ArrayList();
275 ArrayList<Integer> extentNameTupleIdx = new ArrayList();
276 ArrayList<Integer> numberTupleIdx = new ArrayList();
277 ArrayList<Integer> beginTupleIdx = new ArrayList();
278 int extentNameReadIdx = 0;
279 int numberReadIdx = 0;
280 int beginReadIdx = 0;
281
282 int[] ends;
283 long[] values;
284 int writeTupleIndex = 0;
285 int readTupleIndex = 0;
286 int batchSize;
287
288 public ShreddedBuffer(int batchSize) {
289 this.batchSize = batchSize;
290
291 ends = new int[batchSize];
292 values = new long[batchSize];
293 }
294
295 public ShreddedBuffer() {
296 this(10000);
297 }
298
299 public void processExtentName(byte[] extentName) {
300 extentNames.add(extentName);
301 extentNameTupleIdx.add(writeTupleIndex);
302 }
303 public void processNumber(long number) {
304 numbers.add(number);
305 numberTupleIdx.add(writeTupleIndex);
306 }
307 public void processBegin(int begin) {
308 begins.add(begin);
309 beginTupleIdx.add(writeTupleIndex);
310 }
311 public void processTuple(int end, long value) {
312 assert extentNames.size() > 0;
313 assert numbers.size() > 0;
314 assert begins.size() > 0;
315 ends[writeTupleIndex] = end;
316 values[writeTupleIndex] = value;
317 writeTupleIndex++;
318 }
319 public void resetData() {
320 extentNames.clear();
321 numbers.clear();
322 begins.clear();
323 extentNameTupleIdx.clear();
324 numberTupleIdx.clear();
325 beginTupleIdx.clear();
326 writeTupleIndex = 0;
327 }
328
329 public void resetRead() {
330 readTupleIndex = 0;
331 extentNameReadIdx = 0;
332 numberReadIdx = 0;
333 beginReadIdx = 0;
334 }
335
336 public void reset() {
337 resetData();
338 resetRead();
339 }
340 public boolean isFull() {
341 return writeTupleIndex >= batchSize;
342 }
343
344 public boolean isEmpty() {
345 return writeTupleIndex == 0;
346 }
347
348 public boolean isAtEnd() {
349 return readTupleIndex >= writeTupleIndex;
350 }
351 public void incrementExtentName() {
352 extentNameReadIdx++;
353 }
354
355 public void autoIncrementExtentName() {
356 while (readTupleIndex >= getExtentNameEndIndex() && readTupleIndex < writeTupleIndex)
357 extentNameReadIdx++;
358 }
359 public void incrementNumber() {
360 numberReadIdx++;
361 }
362
363 public void autoIncrementNumber() {
364 while (readTupleIndex >= getNumberEndIndex() && readTupleIndex < writeTupleIndex)
365 numberReadIdx++;
366 }
367 public void incrementBegin() {
368 beginReadIdx++;
369 }
370
371 public void autoIncrementBegin() {
372 while (readTupleIndex >= getBeginEndIndex() && readTupleIndex < writeTupleIndex)
373 beginReadIdx++;
374 }
375 public void incrementTuple() {
376 readTupleIndex++;
377 }
378 public int getExtentNameEndIndex() {
379 if ((extentNameReadIdx+1) >= extentNameTupleIdx.size())
380 return writeTupleIndex;
381 return extentNameTupleIdx.get(extentNameReadIdx+1);
382 }
383
384 public int getNumberEndIndex() {
385 if ((numberReadIdx+1) >= numberTupleIdx.size())
386 return writeTupleIndex;
387 return numberTupleIdx.get(numberReadIdx+1);
388 }
389
390 public int getBeginEndIndex() {
391 if ((beginReadIdx+1) >= beginTupleIdx.size())
392 return writeTupleIndex;
393 return beginTupleIdx.get(beginReadIdx+1);
394 }
395 public int getReadIndex() {
396 return readTupleIndex;
397 }
398
399 public int getWriteIndex() {
400 return writeTupleIndex;
401 }
402 public byte[] getExtentName() {
403 assert readTupleIndex < writeTupleIndex;
404 assert extentNameReadIdx < extentNames.size();
405
406 return extentNames.get(extentNameReadIdx);
407 }
408 public long getNumber() {
409 assert readTupleIndex < writeTupleIndex;
410 assert numberReadIdx < numbers.size();
411
412 return numbers.get(numberReadIdx);
413 }
414 public int getBegin() {
415 assert readTupleIndex < writeTupleIndex;
416 assert beginReadIdx < begins.size();
417
418 return begins.get(beginReadIdx);
419 }
420 public int getEnd() {
421 assert readTupleIndex < writeTupleIndex;
422 return ends[readTupleIndex];
423 }
424 public long getValue() {
425 assert readTupleIndex < writeTupleIndex;
426 return values[readTupleIndex];
427 }
428 public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
429 while (getReadIndex() < endIndex) {
430 output.processTuple(getEnd(), getValue());
431 incrementTuple();
432 }
433 }
434 public void copyUntilIndexExtentName(int endIndex, ShreddedProcessor output) throws IOException {
435 while (getReadIndex() < endIndex) {
436 output.processExtentName(getExtentName());
437 assert getExtentNameEndIndex() <= endIndex;
438 copyUntilIndexNumber(getExtentNameEndIndex(), output);
439 incrementExtentName();
440 }
441 }
442 public void copyUntilIndexNumber(int endIndex, ShreddedProcessor output) throws IOException {
443 while (getReadIndex() < endIndex) {
444 output.processNumber(getNumber());
445 assert getNumberEndIndex() <= endIndex;
446 copyUntilIndexBegin(getNumberEndIndex(), output);
447 incrementNumber();
448 }
449 }
450 public void copyUntilIndexBegin(int endIndex, ShreddedProcessor output) throws IOException {
451 while (getReadIndex() < endIndex) {
452 output.processBegin(getBegin());
453 assert getBeginEndIndex() <= endIndex;
454 copyTuples(getBeginEndIndex(), output);
455 incrementBegin();
456 }
457 }
458 public void copyUntilExtentName(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
459 while (!isAtEnd()) {
460 if (other != null) {
461 assert !other.isAtEnd();
462 int c = + Utility.compare(getExtentName(), other.getExtentName());
463
464 if (c > 0) {
465 break;
466 }
467
468 output.processExtentName(getExtentName());
469
470 if (c < 0) {
471 copyUntilIndexNumber(getExtentNameEndIndex(), output);
472 } else if (c == 0) {
473 copyUntilNumber(other, output);
474 autoIncrementExtentName();
475 break;
476 }
477 } else {
478 output.processExtentName(getExtentName());
479 copyUntilIndexNumber(getExtentNameEndIndex(), output);
480 }
481 incrementExtentName();
482
483
484 }
485 }
486 public void copyUntilNumber(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
487 while (!isAtEnd()) {
488 if (other != null) {
489 assert !other.isAtEnd();
490 int c = + Utility.compare(getNumber(), other.getNumber());
491
492 if (c > 0) {
493 break;
494 }
495
496 output.processNumber(getNumber());
497
498 if (c < 0) {
499 copyUntilIndexBegin(getNumberEndIndex(), output);
500 } else if (c == 0) {
501 copyUntilBegin(other, output);
502 autoIncrementNumber();
503 break;
504 }
505 } else {
506 output.processNumber(getNumber());
507 copyUntilIndexBegin(getNumberEndIndex(), output);
508 }
509 incrementNumber();
510
511 if (getExtentNameEndIndex() <= readTupleIndex)
512 break;
513 }
514 }
515 public void copyUntilBegin(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
516 while (!isAtEnd()) {
517 if (other != null) {
518 assert !other.isAtEnd();
519 int c = + Utility.compare(getBegin(), other.getBegin());
520
521 if (c > 0) {
522 break;
523 }
524
525 output.processBegin(getBegin());
526
527 copyTuples(getBeginEndIndex(), output);
528 } else {
529 output.processBegin(getBegin());
530 copyTuples(getBeginEndIndex(), output);
531 }
532 incrementBegin();
533
534 if (getNumberEndIndex() <= readTupleIndex)
535 break;
536 }
537 }
538 public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
539 copyUntilExtentName(other, output);
540 }
541
542 }
543 public static class ShreddedCombiner implements ReaderSource<NumberedValuedExtent>, ShreddedSource {
544 public ShreddedProcessor processor;
545 Collection<ShreddedReader> readers;
546 boolean closeOnExit = false;
547 boolean uninitialized = true;
548 PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
549
550 public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
551 this.readers = readers;
552 this.closeOnExit = closeOnExit;
553 }
554
555 public void setProcessor(Step processor) throws IncompatibleProcessorException {
556 if (processor instanceof ShreddedProcessor) {
557 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
558 } else if (processor instanceof NumberedValuedExtent.Processor) {
559 this.processor = new DuplicateEliminator(new TupleUnshredder((NumberedValuedExtent.Processor) processor));
560 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
561 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<NumberedValuedExtent>) processor));
562 } else {
563 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
564 }
565 }
566
567 public Class<NumberedValuedExtent> getOutputClass() {
568 return NumberedValuedExtent.class;
569 }
570
571 public void initialize() throws IOException {
572 for (ShreddedReader reader : readers) {
573 reader.fill();
574
575 if (!reader.getBuffer().isAtEnd())
576 queue.add(reader);
577 }
578
579 uninitialized = false;
580 }
581
582 public void run() throws IOException {
583 initialize();
584
585 while (queue.size() > 0) {
586 ShreddedReader top = queue.poll();
587 ShreddedReader next = null;
588 ShreddedBuffer nextBuffer = null;
589
590 assert !top.getBuffer().isAtEnd();
591
592 if (queue.size() > 0) {
593 next = queue.peek();
594 nextBuffer = next.getBuffer();
595 assert !nextBuffer.isAtEnd();
596 }
597
598 top.getBuffer().copyUntil(nextBuffer, processor);
599 if (top.getBuffer().isAtEnd())
600 top.fill();
601
602 if (!top.getBuffer().isAtEnd())
603 queue.add(top);
604 }
605
606 if (closeOnExit)
607 processor.close();
608 }
609
610 public NumberedValuedExtent read() throws IOException {
611 if (uninitialized)
612 initialize();
613
614 NumberedValuedExtent result = null;
615
616 while (queue.size() > 0) {
617 ShreddedReader top = queue.poll();
618 result = top.read();
619
620 if (result != null) {
621 if (top.getBuffer().isAtEnd())
622 top.fill();
623
624 queue.offer(top);
625 break;
626 }
627 }
628
629 return result;
630 }
631 }
/**
 * Decodes the grouped layout produced by ShreddedWriter's flush methods from an
 * ArrayInput into a ShreddedBuffer, one buffer-load at a time. Input is read as
 * nested group headers of (bytes, int count), (long, int count) and (int, int count),
 * followed by (int end, long value) tuples.
 * Data can be pulled one tuple at a time with read(), or pushed to a
 * ShreddedProcessor with run(). compareTo orders readers by their current
 * buffered tuple, as used by ShreddedCombiner's priority queue.
 */
public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<NumberedValuedExtent>, ShreddedSource {
    public ShreddedProcessor processor;
    ShreddedBuffer buffer;
    // Keys of the groups currently being decoded.
    NumberedValuedExtent last = new NumberedValuedExtent();
    // Absolute tuple count at which the current group at each level ends;
    // -1 before the first header has been read.
    long updateExtentNameCount = -1;
    long updateNumberCount = -1;
    long updateBeginCount = -1;
    // Total tuples read from input so far.
    long tupleCount = 0;
    // NOTE(review): assigned in fill() but not read anywhere in the visible code.
    long bufferStartCount = 0;
    ArrayInput input;

    /** Reads from input using a default-sized ShreddedBuffer. */
    public ShreddedReader(ArrayInput input) {
        this.input = input;
        this.buffer = new ShreddedBuffer();
    }

    /** Reads from input using a ShreddedBuffer of bufferSize tuples. */
    public ShreddedReader(ArrayInput input, int bufferSize) {
        this.input = input;
        this.buffer = new ShreddedBuffer(bufferSize);
    }

    /**
     * Compares the current tuples of two readers by extentName, number, begin.
     * A reader whose buffer is exhausted sorts before any reader with data, and
     * two exhausted readers compare equal.
     */
    public final int compareTo(ShreddedReader other) {
        ShreddedBuffer otherBuffer = other.getBuffer();

        if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
            return 0;
        } else if (buffer.isAtEnd()) {
            return -1;
        } else if (otherBuffer.isAtEnd()) {
            return 1;
        }

        int result = 0;
        do {
            result = + Utility.compare(buffer.getExtentName(), otherBuffer.getExtentName());
            if(result != 0) break;
            result = + Utility.compare(buffer.getNumber(), otherBuffer.getNumber());
            if(result != 0) break;
            result = + Utility.compare(buffer.getBegin(), otherBuffer.getBegin());
            if(result != 0) break;
        } while (false);

        return result;
    }

    public final ShreddedBuffer getBuffer() {
        return buffer;
    }

    /**
     * Returns the next tuple as a new object, refilling the buffer when it is
     * exhausted; returns null when the input yields no more tuples.
     */
    public final NumberedValuedExtent read() throws IOException {
        if (buffer.isAtEnd()) {
            fill();

            if (buffer.isAtEnd()) {
                return null;
            }
        }

        assert !buffer.isAtEnd();
        NumberedValuedExtent result = new NumberedValuedExtent();

        result.extentName = buffer.getExtentName();
        result.number = buffer.getNumber();
        result.begin = buffer.getBegin();
        result.end = buffer.getEnd();
        result.value = buffer.getValue();

        // Step past this tuple, then move each key cursor past any group it finished.
        buffer.incrementTuple();
        buffer.autoIncrementExtentName();
        buffer.autoIncrementNumber();
        buffer.autoIncrementBegin();

        return result;
    }

    /**
     * Clears the buffer and loads up to its capacity of tuples from input.
     * Groups left open by the previous load are re-seeded first, so their
     * remaining tuples keep the right keys. An EOFException ends the load early,
     * leaving the buffer partly filled or empty; that is how end of input is
     * detected.
     */
    public final void fill() throws IOException {
        try {
            buffer.reset();

            if (tupleCount != 0) {

                // NOTE(review): the index recorded for a carried-over group is the
                // number of tuples still remaining in it, not its start (0). This
                // is harmless because it is always entry 0, and get*EndIndex only
                // reads entries at readIdx+1. Confirm before relying on it elsewhere.
                if(updateExtentNameCount - tupleCount > 0) {
                    buffer.extentNames.add(last.extentName);
                    buffer.extentNameTupleIdx.add((int) (updateExtentNameCount - tupleCount));
                }
                if(updateNumberCount - tupleCount > 0) {
                    buffer.numbers.add(last.number);
                    buffer.numberTupleIdx.add((int) (updateNumberCount - tupleCount));
                }
                if(updateBeginCount - tupleCount > 0) {
                    buffer.begins.add(last.begin);
                    buffer.beginTupleIdx.add((int) (updateBeginCount - tupleCount));
                }
                bufferStartCount = tupleCount;
            }

            while (!buffer.isFull()) {
                // Reads any headers that start at this tuple before the tuple itself.
                updateBegin();
                buffer.processTuple(input.readInt(), input.readLong());
                tupleCount++;
            }
        } catch(EOFException e) {}
    }

    /** Reads the next extentName header, unless the current extentName group is still open. */
    public final void updateExtentName() throws IOException {
        if (updateExtentNameCount > tupleCount)
            return;

        last.extentName = input.readBytes();
        updateExtentNameCount = tupleCount + input.readInt();

        buffer.processExtentName(last.extentName);
    }
    /**
     * Reads the next number header once the current number group has ended.
     * The outer extentName header is checked first, because it precedes the
     * number header in the stream.
     */
    public final void updateNumber() throws IOException {
        if (updateNumberCount > tupleCount)
            return;

        updateExtentName();
        last.number = input.readLong();
        updateNumberCount = tupleCount + input.readInt();

        buffer.processNumber(last.number);
    }
    /** Reads the next begin header once the current begin group has ended, checking outer levels first. */
    public final void updateBegin() throws IOException {
        if (updateBeginCount > tupleCount)
            return;

        updateNumber();
        last.begin = input.readInt();
        updateBeginCount = tupleCount + input.readInt();

        buffer.processBegin(last.begin);
    }

    /** Streams every buffer-load to the processor, then closes the processor. */
    public void run() throws IOException {
        while (true) {
            fill();

            if (buffer.isAtEnd())
                break;

            buffer.copyUntil(null, processor);
        }
        processor.close();
    }

    /**
     * Wraps processor in a DuplicateEliminator, adding a TupleUnshredder for
     * tuple-level processors.
     *
     * @throws IncompatibleProcessorException if processor is neither shredded nor tuple-level
     */
    public void setProcessor(Step processor) throws IncompatibleProcessorException {
        if (processor instanceof ShreddedProcessor) {
            this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
        } else if (processor instanceof NumberedValuedExtent.Processor) {
            this.processor = new DuplicateEliminator(new TupleUnshredder((NumberedValuedExtent.Processor) processor));
        } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
            this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<NumberedValuedExtent>) processor));
        } else {
            throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
        }
    }

    public Class<NumberedValuedExtent> getOutputClass() {
        return NumberedValuedExtent.class;
    }
}
794
795 public static class DuplicateEliminator implements ShreddedProcessor {
796 public ShreddedProcessor processor;
797 NumberedValuedExtent last = new NumberedValuedExtent();
798 boolean extentNameProcess = true;
799 boolean numberProcess = true;
800 boolean beginProcess = true;
801
802 public DuplicateEliminator() {}
803 public DuplicateEliminator(ShreddedProcessor processor) {
804 this.processor = processor;
805 }
806
807 public void setShreddedProcessor(ShreddedProcessor processor) {
808 this.processor = processor;
809 }
810
811 public void processExtentName(byte[] extentName) throws IOException {
812 if (extentNameProcess || Utility.compare(extentName, last.extentName) != 0) {
813 last.extentName = extentName;
814 processor.processExtentName(extentName);
815 resetNumber();
816 extentNameProcess = false;
817 }
818 }
819 public void processNumber(long number) throws IOException {
820 if (numberProcess || Utility.compare(number, last.number) != 0) {
821 last.number = number;
822 processor.processNumber(number);
823 resetBegin();
824 numberProcess = false;
825 }
826 }
827 public void processBegin(int begin) throws IOException {
828 if (beginProcess || Utility.compare(begin, last.begin) != 0) {
829 last.begin = begin;
830 processor.processBegin(begin);
831 beginProcess = false;
832 }
833 }
834
835 public void resetExtentName() {
836 extentNameProcess = true;
837 resetNumber();
838 }
839 public void resetNumber() {
840 numberProcess = true;
841 resetBegin();
842 }
843 public void resetBegin() {
844 beginProcess = true;
845 }
846
847 public void processTuple(int end, long value) throws IOException {
848 processor.processTuple(end, value);
849 }
850
851 public void close() throws IOException {
852 processor.close();
853 }
854 }
855 public static class TupleUnshredder implements ShreddedProcessor {
856 NumberedValuedExtent last = new NumberedValuedExtent();
857 public org.galagosearch.tupleflow.Processor<NumberedValuedExtent> processor;
858
859 public TupleUnshredder(NumberedValuedExtent.Processor processor) {
860 this.processor = processor;
861 }
862
863 public TupleUnshredder(org.galagosearch.tupleflow.Processor<NumberedValuedExtent> processor) {
864 this.processor = processor;
865 }
866
867 public NumberedValuedExtent clone(NumberedValuedExtent object) {
868 NumberedValuedExtent result = new NumberedValuedExtent();
869 if (object == null) return result;
870 result.extentName = object.extentName;
871 result.number = object.number;
872 result.begin = object.begin;
873 result.end = object.end;
874 result.value = object.value;
875 return result;
876 }
877
878 public void processExtentName(byte[] extentName) throws IOException {
879 last.extentName = extentName;
880 }
881
882 public void processNumber(long number) throws IOException {
883 last.number = number;
884 }
885
886 public void processBegin(int begin) throws IOException {
887 last.begin = begin;
888 }
889
890
891 public void processTuple(int end, long value) throws IOException {
892 last.end = end;
893 last.value = value;
894 processor.process(clone(last));
895 }
896
897 public void close() throws IOException {
898 processor.close();
899 }
900 }
901 public static class TupleShredder implements Processor {
902 NumberedValuedExtent last = new NumberedValuedExtent();
903 public ShreddedProcessor processor;
904
905 public TupleShredder(ShreddedProcessor processor) {
906 this.processor = processor;
907 }
908
909 public NumberedValuedExtent clone(NumberedValuedExtent object) {
910 NumberedValuedExtent result = new NumberedValuedExtent();
911 if (object == null) return result;
912 result.extentName = object.extentName;
913 result.number = object.number;
914 result.begin = object.begin;
915 result.end = object.end;
916 result.value = object.value;
917 return result;
918 }
919
920 public void process(NumberedValuedExtent object) throws IOException {
921 boolean processAll = false;
922 if(last == null || Utility.compare(last.extentName, object.extentName) != 0 || processAll) { processor.processExtentName(object.extentName); processAll = true; }
923 if(last == null || Utility.compare(last.number, object.number) != 0 || processAll) { processor.processNumber(object.number); processAll = true; }
924 if(last == null || Utility.compare(last.begin, object.begin) != 0 || processAll) { processor.processBegin(object.begin); processAll = true; }
925 processor.processTuple(object.end, object.value);
926 }
927
928 public Class<NumberedValuedExtent> getInputClass() {
929 return NumberedValuedExtent.class;
930 }
931
932 public void close() throws IOException {
933 processor.close();
934 }
935 }
936 }
937 }