1
2
3 package org.galagosearch.core.types;
4
5 import org.galagosearch.tupleflow.Utility;
6 import org.galagosearch.tupleflow.ArrayInput;
7 import org.galagosearch.tupleflow.ArrayOutput;
8 import org.galagosearch.tupleflow.Order;
9 import org.galagosearch.tupleflow.OrderedWriter;
10 import org.galagosearch.tupleflow.Type;
11 import org.galagosearch.tupleflow.TypeReader;
12 import org.galagosearch.tupleflow.Step;
13 import org.galagosearch.tupleflow.IncompatibleProcessorException;
14 import org.galagosearch.tupleflow.ReaderSource;
15 import java.io.IOException;
16 import java.io.EOFException;
17 import java.io.UnsupportedEncodingException;
18 import java.util.ArrayList;
19 import java.util.Arrays;
20 import java.util.Comparator;
21 import java.util.PriorityQueue;
22 import java.util.Collection;
23
24 public class NumberedExtent implements Type<NumberedExtent> {
25 public byte[] extentName;
26 public long number;
27 public int begin;
28 public int end;
29
30 public NumberedExtent() {}
31 public NumberedExtent(byte[] extentName, long number, int begin, int end) {
32 this.extentName = extentName;
33 this.number = number;
34 this.begin = begin;
35 this.end = end;
36 }
37
38 public String toString() {
39 try {
40 return String.format("%s,%d,%d,%d",
41 new String(extentName, "UTF-8"), number, begin, end);
42 } catch(UnsupportedEncodingException e) {
43 throw new RuntimeException("Couldn't convert string to UTF-8.");
44 }
45 }
46
47 public Order<NumberedExtent> getOrder(String... spec) {
48 if (Arrays.equals(spec, new String[] { "+extentName", "+number", "+begin" })) {
49 return new ExtentNameNumberBeginOrder();
50 }
51 return null;
52 }
53
54 public interface Processor extends Step, org.galagosearch.tupleflow.Processor<NumberedExtent> {
55 public void process(NumberedExtent object) throws IOException;
56 public void close() throws IOException;
57 }
58 public interface Source extends Step {
59 }
60 public static class ExtentNameNumberBeginOrder implements Order<NumberedExtent> {
61 public int hash(NumberedExtent object) {
62 int h = 0;
63 h += Utility.hash(object.extentName);
64 h += Utility.hash(object.number);
65 h += Utility.hash(object.begin);
66 return h;
67 }
68 public Comparator<NumberedExtent> greaterThan() {
69 return new Comparator<NumberedExtent>() {
70 public int compare(NumberedExtent one, NumberedExtent two) {
71 int result = 0;
72 do {
73 result = + Utility.compare(one.extentName, two.extentName);
74 if(result != 0) break;
75 result = + Utility.compare(one.number, two.number);
76 if(result != 0) break;
77 result = + Utility.compare(one.begin, two.begin);
78 if(result != 0) break;
79 } while (false);
80 return -result;
81 }
82 };
83 }
84 public Comparator<NumberedExtent> lessThan() {
85 return new Comparator<NumberedExtent>() {
86 public int compare(NumberedExtent one, NumberedExtent two) {
87 int result = 0;
88 do {
89 result = + Utility.compare(one.extentName, two.extentName);
90 if(result != 0) break;
91 result = + Utility.compare(one.number, two.number);
92 if(result != 0) break;
93 result = + Utility.compare(one.begin, two.begin);
94 if(result != 0) break;
95 } while (false);
96 return result;
97 }
98 };
99 }
100 public TypeReader<NumberedExtent> orderedReader(ArrayInput _input) {
101 return new ShreddedReader(_input);
102 }
103
104 public TypeReader<NumberedExtent> orderedReader(ArrayInput _input, int bufferSize) {
105 return new ShreddedReader(_input, bufferSize);
106 }
107 public OrderedWriter<NumberedExtent> orderedWriter(ArrayOutput _output) {
108 ShreddedWriter w = new ShreddedWriter(_output);
109 return new OrderedWriterClass(w);
110 }
111 public static class OrderedWriterClass extends OrderedWriter< NumberedExtent > {
112 NumberedExtent last = null;
113 ShreddedWriter shreddedWriter = null;
114
115 public OrderedWriterClass(ShreddedWriter s) {
116 this.shreddedWriter = s;
117 }
118
119 public void process(NumberedExtent object) throws IOException {
120 boolean processAll = false;
121 if (processAll || last == null || 0 != Utility.compare(object.extentName, last.extentName)) { processAll = true; shreddedWriter.processExtentName(object.extentName); }
122 if (processAll || last == null || 0 != Utility.compare(object.number, last.number)) { processAll = true; shreddedWriter.processNumber(object.number); }
123 if (processAll || last == null || 0 != Utility.compare(object.begin, last.begin)) { processAll = true; shreddedWriter.processBegin(object.begin); }
124 shreddedWriter.processTuple(object.end);
125 last = object;
126 }
127
128 public void close() throws IOException {
129 shreddedWriter.close();
130 }
131
132 public Class<NumberedExtent> getInputClass() {
133 return NumberedExtent.class;
134 }
135 }
136 public ReaderSource<NumberedExtent> orderedCombiner(Collection<TypeReader<NumberedExtent>> readers, boolean closeOnExit) {
137 ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
138
139 for (TypeReader<NumberedExtent> reader : readers) {
140 shreddedReaders.add((ShreddedReader)reader);
141 }
142
143 return new ShreddedCombiner(shreddedReaders, closeOnExit);
144 }
145 public NumberedExtent clone(NumberedExtent object) {
146 NumberedExtent result = new NumberedExtent();
147 if (object == null) return result;
148 result.extentName = object.extentName;
149 result.number = object.number;
150 result.begin = object.begin;
151 result.end = object.end;
152 return result;
153 }
154 public Class<NumberedExtent> getOrderedClass() {
155 return NumberedExtent.class;
156 }
157 public String[] getOrderSpec() {
158 return new String[] {"+extentName", "+number", "+begin"};
159 }
160
161 public static String getSpecString() {
162 return "+extentName +number +begin";
163 }
164
165 public interface ShreddedProcessor extends Step {
166 public void processExtentName(byte[] extentName) throws IOException;
167 public void processNumber(long number) throws IOException;
168 public void processBegin(int begin) throws IOException;
169 public void processTuple(int end) throws IOException;
170 public void close() throws IOException;
171 }
172 public interface ShreddedSource extends Step {
173 }
174
175 public static class ShreddedWriter implements ShreddedProcessor {
176 ArrayOutput output;
177 ShreddedBuffer buffer = new ShreddedBuffer();
178 byte[] lastExtentName;
179 long lastNumber;
180 int lastBegin;
181 boolean lastFlush = false;
182
183 public ShreddedWriter(ArrayOutput output) {
184 this.output = output;
185 }
186
187 public void close() throws IOException {
188 flush();
189 }
190
191 public void processExtentName(byte[] extentName) {
192 lastExtentName = extentName;
193 buffer.processExtentName(extentName);
194 }
195 public void processNumber(long number) {
196 lastNumber = number;
197 buffer.processNumber(number);
198 }
199 public void processBegin(int begin) {
200 lastBegin = begin;
201 buffer.processBegin(begin);
202 }
203 public final void processTuple(int end) throws IOException {
204 if (lastFlush) {
205 if(buffer.extentNames.size() == 0) buffer.processExtentName(lastExtentName);
206 if(buffer.numbers.size() == 0) buffer.processNumber(lastNumber);
207 if(buffer.begins.size() == 0) buffer.processBegin(lastBegin);
208 lastFlush = false;
209 }
210 buffer.processTuple(end);
211 if (buffer.isFull())
212 flush();
213 }
214 public final void flushTuples(int pauseIndex) throws IOException {
215
216 while (buffer.getReadIndex() < pauseIndex) {
217
218 output.writeInt(buffer.getEnd());
219 buffer.incrementTuple();
220 }
221 }
222 public final void flushExtentName(int pauseIndex) throws IOException {
223 while (buffer.getReadIndex() < pauseIndex) {
224 int nextPause = buffer.getExtentNameEndIndex();
225 int count = nextPause - buffer.getReadIndex();
226
227 output.writeBytes(buffer.getExtentName());
228 output.writeInt(count);
229 buffer.incrementExtentName();
230
231 flushNumber(nextPause);
232 assert nextPause == buffer.getReadIndex();
233 }
234 }
235 public final void flushNumber(int pauseIndex) throws IOException {
236 while (buffer.getReadIndex() < pauseIndex) {
237 int nextPause = buffer.getNumberEndIndex();
238 int count = nextPause - buffer.getReadIndex();
239
240 output.writeLong(buffer.getNumber());
241 output.writeInt(count);
242 buffer.incrementNumber();
243
244 flushBegin(nextPause);
245 assert nextPause == buffer.getReadIndex();
246 }
247 }
248 public final void flushBegin(int pauseIndex) throws IOException {
249 while (buffer.getReadIndex() < pauseIndex) {
250 int nextPause = buffer.getBeginEndIndex();
251 int count = nextPause - buffer.getReadIndex();
252
253 output.writeInt(buffer.getBegin());
254 output.writeInt(count);
255 buffer.incrementBegin();
256
257 flushTuples(nextPause);
258 assert nextPause == buffer.getReadIndex();
259 }
260 }
261 public void flush() throws IOException {
262 flushExtentName(buffer.getWriteIndex());
263 buffer.reset();
264 lastFlush = true;
265 }
266 }
267 public static class ShreddedBuffer {
268 ArrayList<byte[]> extentNames = new ArrayList();
269 ArrayList<Long> numbers = new ArrayList();
270 ArrayList<Integer> begins = new ArrayList();
271 ArrayList<Integer> extentNameTupleIdx = new ArrayList();
272 ArrayList<Integer> numberTupleIdx = new ArrayList();
273 ArrayList<Integer> beginTupleIdx = new ArrayList();
274 int extentNameReadIdx = 0;
275 int numberReadIdx = 0;
276 int beginReadIdx = 0;
277
278 int[] ends;
279 int writeTupleIndex = 0;
280 int readTupleIndex = 0;
281 int batchSize;
282
283 public ShreddedBuffer(int batchSize) {
284 this.batchSize = batchSize;
285
286 ends = new int[batchSize];
287 }
288
289 public ShreddedBuffer() {
290 this(10000);
291 }
292
293 public void processExtentName(byte[] extentName) {
294 extentNames.add(extentName);
295 extentNameTupleIdx.add(writeTupleIndex);
296 }
297 public void processNumber(long number) {
298 numbers.add(number);
299 numberTupleIdx.add(writeTupleIndex);
300 }
301 public void processBegin(int begin) {
302 begins.add(begin);
303 beginTupleIdx.add(writeTupleIndex);
304 }
305 public void processTuple(int end) {
306 assert extentNames.size() > 0;
307 assert numbers.size() > 0;
308 assert begins.size() > 0;
309 ends[writeTupleIndex] = end;
310 writeTupleIndex++;
311 }
312 public void resetData() {
313 extentNames.clear();
314 numbers.clear();
315 begins.clear();
316 extentNameTupleIdx.clear();
317 numberTupleIdx.clear();
318 beginTupleIdx.clear();
319 writeTupleIndex = 0;
320 }
321
322 public void resetRead() {
323 readTupleIndex = 0;
324 extentNameReadIdx = 0;
325 numberReadIdx = 0;
326 beginReadIdx = 0;
327 }
328
329 public void reset() {
330 resetData();
331 resetRead();
332 }
333 public boolean isFull() {
334 return writeTupleIndex >= batchSize;
335 }
336
337 public boolean isEmpty() {
338 return writeTupleIndex == 0;
339 }
340
341 public boolean isAtEnd() {
342 return readTupleIndex >= writeTupleIndex;
343 }
344 public void incrementExtentName() {
345 extentNameReadIdx++;
346 }
347
348 public void autoIncrementExtentName() {
349 while (readTupleIndex >= getExtentNameEndIndex() && readTupleIndex < writeTupleIndex)
350 extentNameReadIdx++;
351 }
352 public void incrementNumber() {
353 numberReadIdx++;
354 }
355
356 public void autoIncrementNumber() {
357 while (readTupleIndex >= getNumberEndIndex() && readTupleIndex < writeTupleIndex)
358 numberReadIdx++;
359 }
360 public void incrementBegin() {
361 beginReadIdx++;
362 }
363
364 public void autoIncrementBegin() {
365 while (readTupleIndex >= getBeginEndIndex() && readTupleIndex < writeTupleIndex)
366 beginReadIdx++;
367 }
368 public void incrementTuple() {
369 readTupleIndex++;
370 }
371 public int getExtentNameEndIndex() {
372 if ((extentNameReadIdx+1) >= extentNameTupleIdx.size())
373 return writeTupleIndex;
374 return extentNameTupleIdx.get(extentNameReadIdx+1);
375 }
376
377 public int getNumberEndIndex() {
378 if ((numberReadIdx+1) >= numberTupleIdx.size())
379 return writeTupleIndex;
380 return numberTupleIdx.get(numberReadIdx+1);
381 }
382
383 public int getBeginEndIndex() {
384 if ((beginReadIdx+1) >= beginTupleIdx.size())
385 return writeTupleIndex;
386 return beginTupleIdx.get(beginReadIdx+1);
387 }
388 public int getReadIndex() {
389 return readTupleIndex;
390 }
391
392 public int getWriteIndex() {
393 return writeTupleIndex;
394 }
395 public byte[] getExtentName() {
396 assert readTupleIndex < writeTupleIndex;
397 assert extentNameReadIdx < extentNames.size();
398
399 return extentNames.get(extentNameReadIdx);
400 }
401 public long getNumber() {
402 assert readTupleIndex < writeTupleIndex;
403 assert numberReadIdx < numbers.size();
404
405 return numbers.get(numberReadIdx);
406 }
407 public int getBegin() {
408 assert readTupleIndex < writeTupleIndex;
409 assert beginReadIdx < begins.size();
410
411 return begins.get(beginReadIdx);
412 }
413 public int getEnd() {
414 assert readTupleIndex < writeTupleIndex;
415 return ends[readTupleIndex];
416 }
417 public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
418 while (getReadIndex() < endIndex) {
419 output.processTuple(getEnd());
420 incrementTuple();
421 }
422 }
423 public void copyUntilIndexExtentName(int endIndex, ShreddedProcessor output) throws IOException {
424 while (getReadIndex() < endIndex) {
425 output.processExtentName(getExtentName());
426 assert getExtentNameEndIndex() <= endIndex;
427 copyUntilIndexNumber(getExtentNameEndIndex(), output);
428 incrementExtentName();
429 }
430 }
431 public void copyUntilIndexNumber(int endIndex, ShreddedProcessor output) throws IOException {
432 while (getReadIndex() < endIndex) {
433 output.processNumber(getNumber());
434 assert getNumberEndIndex() <= endIndex;
435 copyUntilIndexBegin(getNumberEndIndex(), output);
436 incrementNumber();
437 }
438 }
439 public void copyUntilIndexBegin(int endIndex, ShreddedProcessor output) throws IOException {
440 while (getReadIndex() < endIndex) {
441 output.processBegin(getBegin());
442 assert getBeginEndIndex() <= endIndex;
443 copyTuples(getBeginEndIndex(), output);
444 incrementBegin();
445 }
446 }
447 public void copyUntilExtentName(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
448 while (!isAtEnd()) {
449 if (other != null) {
450 assert !other.isAtEnd();
451 int c = + Utility.compare(getExtentName(), other.getExtentName());
452
453 if (c > 0) {
454 break;
455 }
456
457 output.processExtentName(getExtentName());
458
459 if (c < 0) {
460 copyUntilIndexNumber(getExtentNameEndIndex(), output);
461 } else if (c == 0) {
462 copyUntilNumber(other, output);
463 autoIncrementExtentName();
464 break;
465 }
466 } else {
467 output.processExtentName(getExtentName());
468 copyUntilIndexNumber(getExtentNameEndIndex(), output);
469 }
470 incrementExtentName();
471
472
473 }
474 }
475 public void copyUntilNumber(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
476 while (!isAtEnd()) {
477 if (other != null) {
478 assert !other.isAtEnd();
479 int c = + Utility.compare(getNumber(), other.getNumber());
480
481 if (c > 0) {
482 break;
483 }
484
485 output.processNumber(getNumber());
486
487 if (c < 0) {
488 copyUntilIndexBegin(getNumberEndIndex(), output);
489 } else if (c == 0) {
490 copyUntilBegin(other, output);
491 autoIncrementNumber();
492 break;
493 }
494 } else {
495 output.processNumber(getNumber());
496 copyUntilIndexBegin(getNumberEndIndex(), output);
497 }
498 incrementNumber();
499
500 if (getExtentNameEndIndex() <= readTupleIndex)
501 break;
502 }
503 }
504 public void copyUntilBegin(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
505 while (!isAtEnd()) {
506 if (other != null) {
507 assert !other.isAtEnd();
508 int c = + Utility.compare(getBegin(), other.getBegin());
509
510 if (c > 0) {
511 break;
512 }
513
514 output.processBegin(getBegin());
515
516 copyTuples(getBeginEndIndex(), output);
517 } else {
518 output.processBegin(getBegin());
519 copyTuples(getBeginEndIndex(), output);
520 }
521 incrementBegin();
522
523 if (getNumberEndIndex() <= readTupleIndex)
524 break;
525 }
526 }
527 public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
528 copyUntilExtentName(other, output);
529 }
530
531 }
532 public static class ShreddedCombiner implements ReaderSource<NumberedExtent>, ShreddedSource {
533 public ShreddedProcessor processor;
534 Collection<ShreddedReader> readers;
535 boolean closeOnExit = false;
536 boolean uninitialized = true;
537 PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
538
539 public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
540 this.readers = readers;
541 this.closeOnExit = closeOnExit;
542 }
543
544 public void setProcessor(Step processor) throws IncompatibleProcessorException {
545 if (processor instanceof ShreddedProcessor) {
546 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
547 } else if (processor instanceof NumberedExtent.Processor) {
548 this.processor = new DuplicateEliminator(new TupleUnshredder((NumberedExtent.Processor) processor));
549 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
550 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<NumberedExtent>) processor));
551 } else {
552 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
553 }
554 }
555
556 public Class<NumberedExtent> getOutputClass() {
557 return NumberedExtent.class;
558 }
559
560 public void initialize() throws IOException {
561 for (ShreddedReader reader : readers) {
562 reader.fill();
563
564 if (!reader.getBuffer().isAtEnd())
565 queue.add(reader);
566 }
567
568 uninitialized = false;
569 }
570
571 public void run() throws IOException {
572 initialize();
573
574 while (queue.size() > 0) {
575 ShreddedReader top = queue.poll();
576 ShreddedReader next = null;
577 ShreddedBuffer nextBuffer = null;
578
579 assert !top.getBuffer().isAtEnd();
580
581 if (queue.size() > 0) {
582 next = queue.peek();
583 nextBuffer = next.getBuffer();
584 assert !nextBuffer.isAtEnd();
585 }
586
587 top.getBuffer().copyUntil(nextBuffer, processor);
588 if (top.getBuffer().isAtEnd())
589 top.fill();
590
591 if (!top.getBuffer().isAtEnd())
592 queue.add(top);
593 }
594
595 if (closeOnExit)
596 processor.close();
597 }
598
599 public NumberedExtent read() throws IOException {
600 if (uninitialized)
601 initialize();
602
603 NumberedExtent result = null;
604
605 while (queue.size() > 0) {
606 ShreddedReader top = queue.poll();
607 result = top.read();
608
609 if (result != null) {
610 if (top.getBuffer().isAtEnd())
611 top.fill();
612
613 queue.offer(top);
614 break;
615 }
616 }
617
618 return result;
619 }
620 }
621 public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<NumberedExtent>, ShreddedSource {
622 public ShreddedProcessor processor;
623 ShreddedBuffer buffer;
624 NumberedExtent last = new NumberedExtent();
625 long updateExtentNameCount = -1;
626 long updateNumberCount = -1;
627 long updateBeginCount = -1;
628 long tupleCount = 0;
629 long bufferStartCount = 0;
630 ArrayInput input;
631
632 public ShreddedReader(ArrayInput input) {
633 this.input = input;
634 this.buffer = new ShreddedBuffer();
635 }
636
637 public ShreddedReader(ArrayInput input, int bufferSize) {
638 this.input = input;
639 this.buffer = new ShreddedBuffer(bufferSize);
640 }
641
642 public final int compareTo(ShreddedReader other) {
643 ShreddedBuffer otherBuffer = other.getBuffer();
644
645 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
646 return 0;
647 } else if (buffer.isAtEnd()) {
648 return -1;
649 } else if (otherBuffer.isAtEnd()) {
650 return 1;
651 }
652
653 int result = 0;
654 do {
655 result = + Utility.compare(buffer.getExtentName(), otherBuffer.getExtentName());
656 if(result != 0) break;
657 result = + Utility.compare(buffer.getNumber(), otherBuffer.getNumber());
658 if(result != 0) break;
659 result = + Utility.compare(buffer.getBegin(), otherBuffer.getBegin());
660 if(result != 0) break;
661 } while (false);
662
663 return result;
664 }
665
666 public final ShreddedBuffer getBuffer() {
667 return buffer;
668 }
669
670 public final NumberedExtent read() throws IOException {
671 if (buffer.isAtEnd()) {
672 fill();
673
674 if (buffer.isAtEnd()) {
675 return null;
676 }
677 }
678
679 assert !buffer.isAtEnd();
680 NumberedExtent result = new NumberedExtent();
681
682 result.extentName = buffer.getExtentName();
683 result.number = buffer.getNumber();
684 result.begin = buffer.getBegin();
685 result.end = buffer.getEnd();
686
687 buffer.incrementTuple();
688 buffer.autoIncrementExtentName();
689 buffer.autoIncrementNumber();
690 buffer.autoIncrementBegin();
691
692 return result;
693 }
694
695 public final void fill() throws IOException {
696 try {
697 buffer.reset();
698
699 if (tupleCount != 0) {
700
701 if(updateExtentNameCount - tupleCount > 0) {
702 buffer.extentNames.add(last.extentName);
703 buffer.extentNameTupleIdx.add((int) (updateExtentNameCount - tupleCount));
704 }
705 if(updateNumberCount - tupleCount > 0) {
706 buffer.numbers.add(last.number);
707 buffer.numberTupleIdx.add((int) (updateNumberCount - tupleCount));
708 }
709 if(updateBeginCount - tupleCount > 0) {
710 buffer.begins.add(last.begin);
711 buffer.beginTupleIdx.add((int) (updateBeginCount - tupleCount));
712 }
713 bufferStartCount = tupleCount;
714 }
715
716 while (!buffer.isFull()) {
717 updateBegin();
718 buffer.processTuple(input.readInt());
719 tupleCount++;
720 }
721 } catch(EOFException e) {}
722 }
723
724 public final void updateExtentName() throws IOException {
725 if (updateExtentNameCount > tupleCount)
726 return;
727
728 last.extentName = input.readBytes();
729 updateExtentNameCount = tupleCount + input.readInt();
730
731 buffer.processExtentName(last.extentName);
732 }
733 public final void updateNumber() throws IOException {
734 if (updateNumberCount > tupleCount)
735 return;
736
737 updateExtentName();
738 last.number = input.readLong();
739 updateNumberCount = tupleCount + input.readInt();
740
741 buffer.processNumber(last.number);
742 }
743 public final void updateBegin() throws IOException {
744 if (updateBeginCount > tupleCount)
745 return;
746
747 updateNumber();
748 last.begin = input.readInt();
749 updateBeginCount = tupleCount + input.readInt();
750
751 buffer.processBegin(last.begin);
752 }
753
754 public void run() throws IOException {
755 while (true) {
756 fill();
757
758 if (buffer.isAtEnd())
759 break;
760
761 buffer.copyUntil(null, processor);
762 }
763 processor.close();
764 }
765
766 public void setProcessor(Step processor) throws IncompatibleProcessorException {
767 if (processor instanceof ShreddedProcessor) {
768 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
769 } else if (processor instanceof NumberedExtent.Processor) {
770 this.processor = new DuplicateEliminator(new TupleUnshredder((NumberedExtent.Processor) processor));
771 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
772 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<NumberedExtent>) processor));
773 } else {
774 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
775 }
776 }
777
778 public Class<NumberedExtent> getOutputClass() {
779 return NumberedExtent.class;
780 }
781 }
782
783 public static class DuplicateEliminator implements ShreddedProcessor {
784 public ShreddedProcessor processor;
785 NumberedExtent last = new NumberedExtent();
786 boolean extentNameProcess = true;
787 boolean numberProcess = true;
788 boolean beginProcess = true;
789
790 public DuplicateEliminator() {}
791 public DuplicateEliminator(ShreddedProcessor processor) {
792 this.processor = processor;
793 }
794
795 public void setShreddedProcessor(ShreddedProcessor processor) {
796 this.processor = processor;
797 }
798
799 public void processExtentName(byte[] extentName) throws IOException {
800 if (extentNameProcess || Utility.compare(extentName, last.extentName) != 0) {
801 last.extentName = extentName;
802 processor.processExtentName(extentName);
803 resetNumber();
804 extentNameProcess = false;
805 }
806 }
807 public void processNumber(long number) throws IOException {
808 if (numberProcess || Utility.compare(number, last.number) != 0) {
809 last.number = number;
810 processor.processNumber(number);
811 resetBegin();
812 numberProcess = false;
813 }
814 }
815 public void processBegin(int begin) throws IOException {
816 if (beginProcess || Utility.compare(begin, last.begin) != 0) {
817 last.begin = begin;
818 processor.processBegin(begin);
819 beginProcess = false;
820 }
821 }
822
823 public void resetExtentName() {
824 extentNameProcess = true;
825 resetNumber();
826 }
827 public void resetNumber() {
828 numberProcess = true;
829 resetBegin();
830 }
831 public void resetBegin() {
832 beginProcess = true;
833 }
834
835 public void processTuple(int end) throws IOException {
836 processor.processTuple(end);
837 }
838
839 public void close() throws IOException {
840 processor.close();
841 }
842 }
843 public static class TupleUnshredder implements ShreddedProcessor {
844 NumberedExtent last = new NumberedExtent();
845 public org.galagosearch.tupleflow.Processor<NumberedExtent> processor;
846
847 public TupleUnshredder(NumberedExtent.Processor processor) {
848 this.processor = processor;
849 }
850
851 public TupleUnshredder(org.galagosearch.tupleflow.Processor<NumberedExtent> processor) {
852 this.processor = processor;
853 }
854
855 public NumberedExtent clone(NumberedExtent object) {
856 NumberedExtent result = new NumberedExtent();
857 if (object == null) return result;
858 result.extentName = object.extentName;
859 result.number = object.number;
860 result.begin = object.begin;
861 result.end = object.end;
862 return result;
863 }
864
865 public void processExtentName(byte[] extentName) throws IOException {
866 last.extentName = extentName;
867 }
868
869 public void processNumber(long number) throws IOException {
870 last.number = number;
871 }
872
873 public void processBegin(int begin) throws IOException {
874 last.begin = begin;
875 }
876
877
878 public void processTuple(int end) throws IOException {
879 last.end = end;
880 processor.process(clone(last));
881 }
882
883 public void close() throws IOException {
884 processor.close();
885 }
886 }
887 public static class TupleShredder implements Processor {
888 NumberedExtent last = new NumberedExtent();
889 public ShreddedProcessor processor;
890
891 public TupleShredder(ShreddedProcessor processor) {
892 this.processor = processor;
893 }
894
895 public NumberedExtent clone(NumberedExtent object) {
896 NumberedExtent result = new NumberedExtent();
897 if (object == null) return result;
898 result.extentName = object.extentName;
899 result.number = object.number;
900 result.begin = object.begin;
901 result.end = object.end;
902 return result;
903 }
904
905 public void process(NumberedExtent object) throws IOException {
906 boolean processAll = false;
907 if(last == null || Utility.compare(last.extentName, object.extentName) != 0 || processAll) { processor.processExtentName(object.extentName); processAll = true; }
908 if(last == null || Utility.compare(last.number, object.number) != 0 || processAll) { processor.processNumber(object.number); processAll = true; }
909 if(last == null || Utility.compare(last.begin, object.begin) != 0 || processAll) { processor.processBegin(object.begin); processAll = true; }
910 processor.processTuple(object.end);
911 }
912
913 public Class<NumberedExtent> getInputClass() {
914 return NumberedExtent.class;
915 }
916
917 public void close() throws IOException {
918 processor.close();
919 }
920 }
921 }
922 }