1
2
3 package org.galagosearch.core.types;
4
5 import org.galagosearch.tupleflow.Utility;
6 import org.galagosearch.tupleflow.ArrayInput;
7 import org.galagosearch.tupleflow.ArrayOutput;
8 import org.galagosearch.tupleflow.Order;
9 import org.galagosearch.tupleflow.OrderedWriter;
10 import org.galagosearch.tupleflow.Type;
11 import org.galagosearch.tupleflow.TypeReader;
12 import org.galagosearch.tupleflow.Step;
13 import org.galagosearch.tupleflow.IncompatibleProcessorException;
14 import org.galagosearch.tupleflow.ReaderSource;
15 import java.io.IOException;
16 import java.io.EOFException;
17 import java.io.UnsupportedEncodingException;
18 import java.util.ArrayList;
19 import java.util.Arrays;
20 import java.util.Comparator;
21 import java.util.PriorityQueue;
22 import java.util.Collection;
23
24 public class DocumentSplit implements Type<DocumentSplit> {
/** Name of the file (set verbatim from the constructor argument). */
public String fileName;
/** Type label of the file (set verbatim from the constructor argument). */
public String fileType;
/** Compression flag (set verbatim from the constructor argument). */
public boolean isCompressed;
/** Key bytes stored as the start of the split; decoded as UTF-8 by toString(). */
public byte[] startKey;
/** Key bytes stored as the end of the split; decoded as UTF-8 by toString(). */
public byte[] endKey;

/** Creates a split whose reference fields are all null and whose flag is false. */
public DocumentSplit() {
}

/**
 * Creates a split holding exactly the given values.
 * The byte arrays are stored by reference, not copied.
 */
public DocumentSplit(String fileName, String fileType, boolean isCompressed, byte[] startKey, byte[] endKey) {
    this.startKey = startKey;
    this.endKey = endKey;
    this.isCompressed = isCompressed;
    this.fileType = fileType;
    this.fileName = fileName;
}
39
40 public String toString() {
41 try {
42 return String.format("%s,%s,%b,%s,%s",
43 fileName, fileType, isCompressed, new String(startKey, "UTF-8"), new String(endKey, "UTF-8"));
44 } catch(UnsupportedEncodingException e) {
45 throw new RuntimeException("Couldn't convert string to UTF-8.");
46 }
47 }
48
49 public Order<DocumentSplit> getOrder(String... spec) {
50 if (Arrays.equals(spec, new String[] { })) {
51 return new Unordered();
52 }
53 if (Arrays.equals(spec, new String[] { "+fileName", "+startKey" })) {
54 return new FileNameStartKeyOrder();
55 }
56 return null;
57 }
58
59 public interface Processor extends Step, org.galagosearch.tupleflow.Processor<DocumentSplit> {
60 public void process(DocumentSplit object) throws IOException;
61 public void close() throws IOException;
62 }
63 public interface Source extends Step {
64 }
65 public static class Unordered implements Order<DocumentSplit> {
66 public int hash(DocumentSplit object) {
67 int h = 0;
68 return h;
69 }
70 public Comparator<DocumentSplit> greaterThan() {
71 return new Comparator<DocumentSplit>() {
72 public int compare(DocumentSplit one, DocumentSplit two) {
73 int result = 0;
74 do {
75 } while (false);
76 return -result;
77 }
78 };
79 }
80 public Comparator<DocumentSplit> lessThan() {
81 return new Comparator<DocumentSplit>() {
82 public int compare(DocumentSplit one, DocumentSplit two) {
83 int result = 0;
84 do {
85 } while (false);
86 return result;
87 }
88 };
89 }
90 public TypeReader<DocumentSplit> orderedReader(ArrayInput _input) {
91 return new ShreddedReader(_input);
92 }
93
94 public TypeReader<DocumentSplit> orderedReader(ArrayInput _input, int bufferSize) {
95 return new ShreddedReader(_input, bufferSize);
96 }
97 public OrderedWriter<DocumentSplit> orderedWriter(ArrayOutput _output) {
98 ShreddedWriter w = new ShreddedWriter(_output);
99 return new OrderedWriterClass(w);
100 }
101 public static class OrderedWriterClass extends OrderedWriter< DocumentSplit > {
102 DocumentSplit last = null;
103 ShreddedWriter shreddedWriter = null;
104
105 public OrderedWriterClass(ShreddedWriter s) {
106 this.shreddedWriter = s;
107 }
108
109 public void process(DocumentSplit object) throws IOException {
110 boolean processAll = false;
111 shreddedWriter.processTuple(object.fileName, object.fileType, object.isCompressed, object.startKey, object.endKey);
112 last = object;
113 }
114
115 public void close() throws IOException {
116 shreddedWriter.close();
117 }
118
119 public Class<DocumentSplit> getInputClass() {
120 return DocumentSplit.class;
121 }
122 }
123 public ReaderSource<DocumentSplit> orderedCombiner(Collection<TypeReader<DocumentSplit>> readers, boolean closeOnExit) {
124 ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
125
126 for (TypeReader<DocumentSplit> reader : readers) {
127 shreddedReaders.add((ShreddedReader)reader);
128 }
129
130 return new ShreddedCombiner(shreddedReaders, closeOnExit);
131 }
132 public DocumentSplit clone(DocumentSplit object) {
133 DocumentSplit result = new DocumentSplit();
134 if (object == null) return result;
135 result.fileName = object.fileName;
136 result.fileType = object.fileType;
137 result.isCompressed = object.isCompressed;
138 result.startKey = object.startKey;
139 result.endKey = object.endKey;
140 return result;
141 }
142 public Class<DocumentSplit> getOrderedClass() {
143 return DocumentSplit.class;
144 }
145 public String[] getOrderSpec() {
146 return new String[] {};
147 }
148
149 public static String getSpecString() {
150 return "";
151 }
152
153 public interface ShreddedProcessor extends Step {
154 public void processTuple(String fileName, String fileType, boolean isCompressed, byte[] startKey, byte[] endKey) throws IOException;
155 public void close() throws IOException;
156 }
157 public interface ShreddedSource extends Step {
158 }
159
160 public static class ShreddedWriter implements ShreddedProcessor {
161 ArrayOutput output;
162 ShreddedBuffer buffer = new ShreddedBuffer();
163 boolean lastFlush = false;
164
165 public ShreddedWriter(ArrayOutput output) {
166 this.output = output;
167 }
168
169 public void close() throws IOException {
170 flush();
171 }
172
173 public final void processTuple(String fileName, String fileType, boolean isCompressed, byte[] startKey, byte[] endKey) throws IOException {
174 if (lastFlush) {
175 lastFlush = false;
176 }
177 buffer.processTuple(fileName, fileType, isCompressed, startKey, endKey);
178 if (buffer.isFull())
179 flush();
180 }
181 public final void flushTuples(int pauseIndex) throws IOException {
182
183 while (buffer.getReadIndex() < pauseIndex) {
184
185 output.writeString(buffer.getFileName());
186 output.writeString(buffer.getFileType());
187 output.writeBoolean(buffer.getIsCompressed());
188 output.writeBytes(buffer.getStartKey());
189 output.writeBytes(buffer.getEndKey());
190 buffer.incrementTuple();
191 }
192 }
193 public void flush() throws IOException {
194 flushTuples(buffer.getWriteIndex());
195 buffer.reset();
196 lastFlush = true;
197 }
198 }
199 public static class ShreddedBuffer {
200
201 String[] fileNames;
202 String[] fileTypes;
203 boolean[] isCompresseds;
204 byte[][] startKeys;
205 byte[][] endKeys;
206 int writeTupleIndex = 0;
207 int readTupleIndex = 0;
208 int batchSize;
209
210 public ShreddedBuffer(int batchSize) {
211 this.batchSize = batchSize;
212
213 fileNames = new String[batchSize];
214 fileTypes = new String[batchSize];
215 isCompresseds = new boolean[batchSize];
216 startKeys = new byte[batchSize][];
217 endKeys = new byte[batchSize][];
218 }
219
220 public ShreddedBuffer() {
221 this(10000);
222 }
223
224 public void processTuple(String fileName, String fileType, boolean isCompressed, byte[] startKey, byte[] endKey) {
225 fileNames[writeTupleIndex] = fileName;
226 fileTypes[writeTupleIndex] = fileType;
227 isCompresseds[writeTupleIndex] = isCompressed;
228 startKeys[writeTupleIndex] = startKey;
229 endKeys[writeTupleIndex] = endKey;
230 writeTupleIndex++;
231 }
232 public void resetData() {
233 writeTupleIndex = 0;
234 }
235
236 public void resetRead() {
237 readTupleIndex = 0;
238 }
239
240 public void reset() {
241 resetData();
242 resetRead();
243 }
244 public boolean isFull() {
245 return writeTupleIndex >= batchSize;
246 }
247
248 public boolean isEmpty() {
249 return writeTupleIndex == 0;
250 }
251
252 public boolean isAtEnd() {
253 return readTupleIndex >= writeTupleIndex;
254 }
255 public void incrementTuple() {
256 readTupleIndex++;
257 }
258 public int getReadIndex() {
259 return readTupleIndex;
260 }
261
262 public int getWriteIndex() {
263 return writeTupleIndex;
264 }
265 public String getFileName() {
266 assert readTupleIndex < writeTupleIndex;
267 return fileNames[readTupleIndex];
268 }
269 public String getFileType() {
270 assert readTupleIndex < writeTupleIndex;
271 return fileTypes[readTupleIndex];
272 }
273 public boolean getIsCompressed() {
274 assert readTupleIndex < writeTupleIndex;
275 return isCompresseds[readTupleIndex];
276 }
277 public byte[] getStartKey() {
278 assert readTupleIndex < writeTupleIndex;
279 return startKeys[readTupleIndex];
280 }
281 public byte[] getEndKey() {
282 assert readTupleIndex < writeTupleIndex;
283 return endKeys[readTupleIndex];
284 }
285 public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
286 while (getReadIndex() < endIndex) {
287 output.processTuple(getFileName(), getFileType(), getIsCompressed(), getStartKey(), getEndKey());
288 incrementTuple();
289 }
290 }
291
292 public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
293 }
294
295 }
296 public static class ShreddedCombiner implements ReaderSource<DocumentSplit>, ShreddedSource {
297 public ShreddedProcessor processor;
298 Collection<ShreddedReader> readers;
299 boolean closeOnExit = false;
300 boolean uninitialized = true;
301 PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
302
303 public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
304 this.readers = readers;
305 this.closeOnExit = closeOnExit;
306 }
307
308 public void setProcessor(Step processor) throws IncompatibleProcessorException {
309 if (processor instanceof ShreddedProcessor) {
310 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
311 } else if (processor instanceof DocumentSplit.Processor) {
312 this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentSplit.Processor) processor));
313 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
314 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentSplit>) processor));
315 } else {
316 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
317 }
318 }
319
320 public Class<DocumentSplit> getOutputClass() {
321 return DocumentSplit.class;
322 }
323
324 public void initialize() throws IOException {
325 for (ShreddedReader reader : readers) {
326 reader.fill();
327
328 if (!reader.getBuffer().isAtEnd())
329 queue.add(reader);
330 }
331
332 uninitialized = false;
333 }
334
335 public void run() throws IOException {
336 initialize();
337
338 while (queue.size() > 0) {
339 ShreddedReader top = queue.poll();
340 ShreddedReader next = null;
341 ShreddedBuffer nextBuffer = null;
342
343 assert !top.getBuffer().isAtEnd();
344
345 if (queue.size() > 0) {
346 next = queue.peek();
347 nextBuffer = next.getBuffer();
348 assert !nextBuffer.isAtEnd();
349 }
350
351 top.getBuffer().copyUntil(nextBuffer, processor);
352 if (top.getBuffer().isAtEnd())
353 top.fill();
354
355 if (!top.getBuffer().isAtEnd())
356 queue.add(top);
357 }
358
359 if (closeOnExit)
360 processor.close();
361 }
362
363 public DocumentSplit read() throws IOException {
364 if (uninitialized)
365 initialize();
366
367 DocumentSplit result = null;
368
369 while (queue.size() > 0) {
370 ShreddedReader top = queue.poll();
371 result = top.read();
372
373 if (result != null) {
374 if (top.getBuffer().isAtEnd())
375 top.fill();
376
377 queue.offer(top);
378 break;
379 }
380 }
381
382 return result;
383 }
384 }
385 public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<DocumentSplit>, ShreddedSource {
386 public ShreddedProcessor processor;
387 ShreddedBuffer buffer;
388 DocumentSplit last = new DocumentSplit();
389 long tupleCount = 0;
390 long bufferStartCount = 0;
391 ArrayInput input;
392
393 public ShreddedReader(ArrayInput input) {
394 this.input = input;
395 this.buffer = new ShreddedBuffer();
396 }
397
398 public ShreddedReader(ArrayInput input, int bufferSize) {
399 this.input = input;
400 this.buffer = new ShreddedBuffer(bufferSize);
401 }
402
403 public final int compareTo(ShreddedReader other) {
404 ShreddedBuffer otherBuffer = other.getBuffer();
405
406 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
407 return 0;
408 } else if (buffer.isAtEnd()) {
409 return -1;
410 } else if (otherBuffer.isAtEnd()) {
411 return 1;
412 }
413
414 int result = 0;
415 do {
416 } while (false);
417
418 return result;
419 }
420
421 public final ShreddedBuffer getBuffer() {
422 return buffer;
423 }
424
425 public final DocumentSplit read() throws IOException {
426 if (buffer.isAtEnd()) {
427 fill();
428
429 if (buffer.isAtEnd()) {
430 return null;
431 }
432 }
433
434 assert !buffer.isAtEnd();
435 DocumentSplit result = new DocumentSplit();
436
437 result.fileName = buffer.getFileName();
438 result.fileType = buffer.getFileType();
439 result.isCompressed = buffer.getIsCompressed();
440 result.startKey = buffer.getStartKey();
441 result.endKey = buffer.getEndKey();
442
443 buffer.incrementTuple();
444
445 return result;
446 }
447
448 public final void fill() throws IOException {
449 try {
450 buffer.reset();
451
452 if (tupleCount != 0) {
453 bufferStartCount = tupleCount;
454 }
455
456 while (!buffer.isFull()) {
457 buffer.processTuple(input.readString(), input.readString(), input.readBoolean(), input.readBytes(), input.readBytes());
458 tupleCount++;
459 }
460 } catch(EOFException e) {}
461 }
462
463
464 public void run() throws IOException {
465 while (true) {
466 fill();
467
468 if (buffer.isAtEnd())
469 break;
470
471 buffer.copyUntil(null, processor);
472 }
473 processor.close();
474 }
475
476 public void setProcessor(Step processor) throws IncompatibleProcessorException {
477 if (processor instanceof ShreddedProcessor) {
478 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
479 } else if (processor instanceof DocumentSplit.Processor) {
480 this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentSplit.Processor) processor));
481 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
482 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentSplit>) processor));
483 } else {
484 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
485 }
486 }
487
488 public Class<DocumentSplit> getOutputClass() {
489 return DocumentSplit.class;
490 }
491 }
492
493 public static class DuplicateEliminator implements ShreddedProcessor {
494 public ShreddedProcessor processor;
495 DocumentSplit last = new DocumentSplit();
496
497 public DuplicateEliminator() {}
498 public DuplicateEliminator(ShreddedProcessor processor) {
499 this.processor = processor;
500 }
501
502 public void setShreddedProcessor(ShreddedProcessor processor) {
503 this.processor = processor;
504 }
505
506
507
508
509 public void processTuple(String fileName, String fileType, boolean isCompressed, byte[] startKey, byte[] endKey) throws IOException {
510 processor.processTuple(fileName, fileType, isCompressed, startKey, endKey);
511 }
512
513 public void close() throws IOException {
514 processor.close();
515 }
516 }
517 public static class TupleUnshredder implements ShreddedProcessor {
518 DocumentSplit last = new DocumentSplit();
519 public org.galagosearch.tupleflow.Processor<DocumentSplit> processor;
520
521 public TupleUnshredder(DocumentSplit.Processor processor) {
522 this.processor = processor;
523 }
524
525 public TupleUnshredder(org.galagosearch.tupleflow.Processor<DocumentSplit> processor) {
526 this.processor = processor;
527 }
528
529 public DocumentSplit clone(DocumentSplit object) {
530 DocumentSplit result = new DocumentSplit();
531 if (object == null) return result;
532 result.fileName = object.fileName;
533 result.fileType = object.fileType;
534 result.isCompressed = object.isCompressed;
535 result.startKey = object.startKey;
536 result.endKey = object.endKey;
537 return result;
538 }
539
540
541 public void processTuple(String fileName, String fileType, boolean isCompressed, byte[] startKey, byte[] endKey) throws IOException {
542 last.fileName = fileName;
543 last.fileType = fileType;
544 last.isCompressed = isCompressed;
545 last.startKey = startKey;
546 last.endKey = endKey;
547 processor.process(clone(last));
548 }
549
550 public void close() throws IOException {
551 processor.close();
552 }
553 }
554 public static class TupleShredder implements Processor {
555 DocumentSplit last = new DocumentSplit();
556 public ShreddedProcessor processor;
557
558 public TupleShredder(ShreddedProcessor processor) {
559 this.processor = processor;
560 }
561
562 public DocumentSplit clone(DocumentSplit object) {
563 DocumentSplit result = new DocumentSplit();
564 if (object == null) return result;
565 result.fileName = object.fileName;
566 result.fileType = object.fileType;
567 result.isCompressed = object.isCompressed;
568 result.startKey = object.startKey;
569 result.endKey = object.endKey;
570 return result;
571 }
572
573 public void process(DocumentSplit object) throws IOException {
574 boolean processAll = false;
575 processor.processTuple(object.fileName, object.fileType, object.isCompressed, object.startKey, object.endKey);
576 }
577
578 public Class<DocumentSplit> getInputClass() {
579 return DocumentSplit.class;
580 }
581
582 public void close() throws IOException {
583 processor.close();
584 }
585 }
586 }
587 public static class FileNameStartKeyOrder implements Order<DocumentSplit> {
588 public int hash(DocumentSplit object) {
589 int h = 0;
590 h += Utility.hash(object.fileName);
591 h += Utility.hash(object.startKey);
592 return h;
593 }
594 public Comparator<DocumentSplit> greaterThan() {
595 return new Comparator<DocumentSplit>() {
596 public int compare(DocumentSplit one, DocumentSplit two) {
597 int result = 0;
598 do {
599 result = + Utility.compare(one.fileName, two.fileName);
600 if(result != 0) break;
601 result = + Utility.compare(one.startKey, two.startKey);
602 if(result != 0) break;
603 } while (false);
604 return -result;
605 }
606 };
607 }
608 public Comparator<DocumentSplit> lessThan() {
609 return new Comparator<DocumentSplit>() {
610 public int compare(DocumentSplit one, DocumentSplit two) {
611 int result = 0;
612 do {
613 result = + Utility.compare(one.fileName, two.fileName);
614 if(result != 0) break;
615 result = + Utility.compare(one.startKey, two.startKey);
616 if(result != 0) break;
617 } while (false);
618 return result;
619 }
620 };
621 }
622 public TypeReader<DocumentSplit> orderedReader(ArrayInput _input) {
623 return new ShreddedReader(_input);
624 }
625
626 public TypeReader<DocumentSplit> orderedReader(ArrayInput _input, int bufferSize) {
627 return new ShreddedReader(_input, bufferSize);
628 }
629 public OrderedWriter<DocumentSplit> orderedWriter(ArrayOutput _output) {
630 ShreddedWriter w = new ShreddedWriter(_output);
631 return new OrderedWriterClass(w);
632 }
633 public static class OrderedWriterClass extends OrderedWriter< DocumentSplit > {
634 DocumentSplit last = null;
635 ShreddedWriter shreddedWriter = null;
636
637 public OrderedWriterClass(ShreddedWriter s) {
638 this.shreddedWriter = s;
639 }
640
641 public void process(DocumentSplit object) throws IOException {
642 boolean processAll = false;
643 if (processAll || last == null || 0 != Utility.compare(object.fileName, last.fileName)) { processAll = true; shreddedWriter.processFileName(object.fileName); }
644 if (processAll || last == null || 0 != Utility.compare(object.startKey, last.startKey)) { processAll = true; shreddedWriter.processStartKey(object.startKey); }
645 shreddedWriter.processTuple(object.fileType, object.isCompressed, object.endKey);
646 last = object;
647 }
648
649 public void close() throws IOException {
650 shreddedWriter.close();
651 }
652
653 public Class<DocumentSplit> getInputClass() {
654 return DocumentSplit.class;
655 }
656 }
657 public ReaderSource<DocumentSplit> orderedCombiner(Collection<TypeReader<DocumentSplit>> readers, boolean closeOnExit) {
658 ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
659
660 for (TypeReader<DocumentSplit> reader : readers) {
661 shreddedReaders.add((ShreddedReader)reader);
662 }
663
664 return new ShreddedCombiner(shreddedReaders, closeOnExit);
665 }
666 public DocumentSplit clone(DocumentSplit object) {
667 DocumentSplit result = new DocumentSplit();
668 if (object == null) return result;
669 result.fileName = object.fileName;
670 result.fileType = object.fileType;
671 result.isCompressed = object.isCompressed;
672 result.startKey = object.startKey;
673 result.endKey = object.endKey;
674 return result;
675 }
676 public Class<DocumentSplit> getOrderedClass() {
677 return DocumentSplit.class;
678 }
679 public String[] getOrderSpec() {
680 return new String[] {"+fileName", "+startKey"};
681 }
682
683 public static String getSpecString() {
684 return "+fileName +startKey";
685 }
686
687 public interface ShreddedProcessor extends Step {
688 public void processFileName(String fileName) throws IOException;
689 public void processStartKey(byte[] startKey) throws IOException;
690 public void processTuple(String fileType, boolean isCompressed, byte[] endKey) throws IOException;
691 public void close() throws IOException;
692 }
693 public interface ShreddedSource extends Step {
694 }
695
696 public static class ShreddedWriter implements ShreddedProcessor {
697 ArrayOutput output;
698 ShreddedBuffer buffer = new ShreddedBuffer();
699 String lastFileName;
700 byte[] lastStartKey;
701 boolean lastFlush = false;
702
703 public ShreddedWriter(ArrayOutput output) {
704 this.output = output;
705 }
706
707 public void close() throws IOException {
708 flush();
709 }
710
711 public void processFileName(String fileName) {
712 lastFileName = fileName;
713 buffer.processFileName(fileName);
714 }
715 public void processStartKey(byte[] startKey) {
716 lastStartKey = startKey;
717 buffer.processStartKey(startKey);
718 }
719 public final void processTuple(String fileType, boolean isCompressed, byte[] endKey) throws IOException {
720 if (lastFlush) {
721 if(buffer.fileNames.size() == 0) buffer.processFileName(lastFileName);
722 if(buffer.startKeys.size() == 0) buffer.processStartKey(lastStartKey);
723 lastFlush = false;
724 }
725 buffer.processTuple(fileType, isCompressed, endKey);
726 if (buffer.isFull())
727 flush();
728 }
729 public final void flushTuples(int pauseIndex) throws IOException {
730
731 while (buffer.getReadIndex() < pauseIndex) {
732
733 output.writeString(buffer.getFileType());
734 output.writeBoolean(buffer.getIsCompressed());
735 output.writeBytes(buffer.getEndKey());
736 buffer.incrementTuple();
737 }
738 }
739 public final void flushFileName(int pauseIndex) throws IOException {
740 while (buffer.getReadIndex() < pauseIndex) {
741 int nextPause = buffer.getFileNameEndIndex();
742 int count = nextPause - buffer.getReadIndex();
743
744 output.writeString(buffer.getFileName());
745 output.writeInt(count);
746 buffer.incrementFileName();
747
748 flushStartKey(nextPause);
749 assert nextPause == buffer.getReadIndex();
750 }
751 }
752 public final void flushStartKey(int pauseIndex) throws IOException {
753 while (buffer.getReadIndex() < pauseIndex) {
754 int nextPause = buffer.getStartKeyEndIndex();
755 int count = nextPause - buffer.getReadIndex();
756
757 output.writeBytes(buffer.getStartKey());
758 output.writeInt(count);
759 buffer.incrementStartKey();
760
761 flushTuples(nextPause);
762 assert nextPause == buffer.getReadIndex();
763 }
764 }
765 public void flush() throws IOException {
766 flushFileName(buffer.getWriteIndex());
767 buffer.reset();
768 lastFlush = true;
769 }
770 }
/**
 * Staging area for shredded DocumentSplit data grouped by fileName and startKey.
 *
 * The non-order fields (fileType, isCompressed, endKey) are stored per tuple in
 * fixed-size arrays. Each announced fileName / startKey is stored once in a list,
 * together with the tuple index at which it was announced, so a key value covers
 * the tuples from its recorded index up to the recorded index of the next value
 * (or up to writeTupleIndex for the last value).
 */
public static class ShreddedBuffer {
    // Announced key values, in arrival order.
    ArrayList<String> fileNames = new ArrayList();
    ArrayList<byte[]> startKeys = new ArrayList();
    // For each announced key value, the value of writeTupleIndex at the time it was announced.
    ArrayList<Integer> fileNameTupleIdx = new ArrayList();
    ArrayList<Integer> startKeyTupleIdx = new ArrayList();
    // Positions of the current key values within the lists above.
    int fileNameReadIdx = 0;
    int startKeyReadIdx = 0;

    // Per-tuple storage for the non-order fields.
    String[] fileTypes;
    boolean[] isCompresseds;
    byte[][] endKeys;
    // Number of tuples stored, and index of the next tuple to consume.
    int writeTupleIndex = 0;
    int readTupleIndex = 0;
    int batchSize;

    /** Creates a buffer with room for batchSize tuples. */
    public ShreddedBuffer(int batchSize) {
        this.batchSize = batchSize;

        fileTypes = new String[batchSize];
        isCompresseds = new boolean[batchSize];
        endKeys = new byte[batchSize][];
    }

    /** Creates a buffer with room for 10000 tuples. */
    public ShreddedBuffer() {
        this(10000);
    }

    /** Records a fileName value that applies from the current write position onward. */
    public void processFileName(String fileName) {
        fileNames.add(fileName);
        fileNameTupleIdx.add(writeTupleIndex);
    }
    /** Records a startKey value that applies from the current write position onward. */
    public void processStartKey(byte[] startKey) {
        startKeys.add(startKey);
        startKeyTupleIdx.add(writeTupleIndex);
    }
    /**
     * Appends the non-order fields of one tuple. At least one fileName and one
     * startKey must have been recorded first (checked by assertion only).
     * No capacity check is made here.
     */
    public void processTuple(String fileType, boolean isCompressed, byte[] endKey) {
        assert fileNames.size() > 0;
        assert startKeys.size() > 0;
        fileTypes[writeTupleIndex] = fileType;
        isCompresseds[writeTupleIndex] = isCompressed;
        endKeys[writeTupleIndex] = endKey;
        writeTupleIndex++;
    }
    /** Discards all recorded key values and forgets all stored tuples. */
    public void resetData() {
        fileNames.clear();
        startKeys.clear();
        fileNameTupleIdx.clear();
        startKeyTupleIdx.clear();
        writeTupleIndex = 0;
    }

    /** Rewinds the tuple read position and both key read positions. */
    public void resetRead() {
        readTupleIndex = 0;
        fileNameReadIdx = 0;
        startKeyReadIdx = 0;
    }

    /** Empties the buffer and rewinds all read positions. */
    public void reset() {
        resetData();
        resetRead();
    }
    /** True once batchSize tuples have been stored. */
    public boolean isFull() {
        return writeTupleIndex >= batchSize;
    }

    /** True when no tuples are stored. */
    public boolean isEmpty() {
        return writeTupleIndex == 0;
    }

    /** True when every stored tuple has been consumed. */
    public boolean isAtEnd() {
        return readTupleIndex >= writeTupleIndex;
    }
    /** Moves on to the next recorded fileName. */
    public void incrementFileName() {
        fileNameReadIdx++;
    }

    /**
     * Advances the current fileName while the tuple read position lies at or past
     * the end of the current fileName group and unread tuples remain.
     */
    public void autoIncrementFileName() {
        while (readTupleIndex >= getFileNameEndIndex() && readTupleIndex < writeTupleIndex)
            fileNameReadIdx++;
    }
    /** Moves on to the next recorded startKey. */
    public void incrementStartKey() {
        startKeyReadIdx++;
    }

    /**
     * Advances the current startKey while the tuple read position lies at or past
     * the end of the current startKey group and unread tuples remain.
     */
    public void autoIncrementStartKey() {
        while (readTupleIndex >= getStartKeyEndIndex() && readTupleIndex < writeTupleIndex)
            startKeyReadIdx++;
    }
    /** Advances the tuple read position by one. */
    public void incrementTuple() {
        readTupleIndex++;
    }
    /**
     * Returns the tuple index at which the current fileName group ends: the recorded
     * index of the next fileName, or writeTupleIndex when there is no next one.
     */
    public int getFileNameEndIndex() {
        if ((fileNameReadIdx+1) >= fileNameTupleIdx.size())
            return writeTupleIndex;
        return fileNameTupleIdx.get(fileNameReadIdx+1);
    }

    /**
     * Returns the tuple index at which the current startKey group ends: the recorded
     * index of the next startKey, or writeTupleIndex when there is no next one.
     */
    public int getStartKeyEndIndex() {
        if ((startKeyReadIdx+1) >= startKeyTupleIdx.size())
            return writeTupleIndex;
        return startKeyTupleIdx.get(startKeyReadIdx+1);
    }
    public int getReadIndex() {
        return readTupleIndex;
    }

    public int getWriteIndex() {
        return writeTupleIndex;
    }
    /** Returns the current fileName (selected by fileNameReadIdx, not by the tuple index). */
    public String getFileName() {
        assert readTupleIndex < writeTupleIndex;
        assert fileNameReadIdx < fileNames.size();

        return fileNames.get(fileNameReadIdx);
    }
    /** Returns the current startKey (selected by startKeyReadIdx, not by the tuple index). */
    public byte[] getStartKey() {
        assert readTupleIndex < writeTupleIndex;
        assert startKeyReadIdx < startKeys.size();

        return startKeys.get(startKeyReadIdx);
    }
    /** Returns the fileType of the tuple at the read position. */
    public String getFileType() {
        assert readTupleIndex < writeTupleIndex;
        return fileTypes[readTupleIndex];
    }
    /** Returns the isCompressed flag of the tuple at the read position. */
    public boolean getIsCompressed() {
        assert readTupleIndex < writeTupleIndex;
        return isCompresseds[readTupleIndex];
    }
    /** Returns the endKey of the tuple at the read position. */
    public byte[] getEndKey() {
        assert readTupleIndex < writeTupleIndex;
        return endKeys[readTupleIndex];
    }
    /** Sends tuples to the output until the read position reaches endIndex. */
    public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
        while (getReadIndex() < endIndex) {
            output.processTuple(getFileType(), getIsCompressed(), getEndKey());
            incrementTuple();
        }
    }
    /**
     * Sends whole fileName groups (key announcement, then the nested startKey groups)
     * until the read position reaches endIndex.
     */
    public void copyUntilIndexFileName(int endIndex, ShreddedProcessor output) throws IOException {
        while (getReadIndex() < endIndex) {
            output.processFileName(getFileName());
            assert getFileNameEndIndex() <= endIndex;
            copyUntilIndexStartKey(getFileNameEndIndex(), output);
            incrementFileName();
        }
    }
    /**
     * Sends whole startKey groups (key announcement, then the tuples of the group)
     * until the read position reaches endIndex.
     */
    public void copyUntilIndexStartKey(int endIndex, ShreddedProcessor output) throws IOException {
        while (getReadIndex() < endIndex) {
            output.processStartKey(getStartKey());
            assert getStartKeyEndIndex() <= endIndex;
            copyTuples(getStartKeyEndIndex(), output);
            incrementStartKey();
        }
    }
    /**
     * Sends fileName groups to the output for as long as the current fileName does
     * not compare greater than the current fileName of {@code other}.
     * With a null {@code other}, all remaining groups are sent.
     */
    public void copyUntilFileName(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
        while (!isAtEnd()) {
            if (other != null) {
                assert !other.isAtEnd();
                int c = + Utility.compare(getFileName(), other.getFileName());

                // The other buffer's fileName comes first: stop and let it go ahead.
                if (c > 0) {
                    break;
                }

                output.processFileName(getFileName());

                if (c < 0) {
                    // Strictly smaller fileName: the whole group can be sent.
                    copyUntilIndexStartKey(getFileNameEndIndex(), output);
                } else if (c == 0) {
                    // Same fileName on both sides: decide at the startKey level, then stop.
                    copyUntilStartKey(other, output);
                    autoIncrementFileName();
                    break;
                }
            } else {
                output.processFileName(getFileName());
                copyUntilIndexStartKey(getFileNameEndIndex(), output);
            }
            incrementFileName();


        }
    }
    /**
     * Sends startKey groups to the output for as long as the current startKey does
     * not compare greater than the current startKey of {@code other}, and stops
     * once the read position reaches the end of the current fileName group.
     */
    public void copyUntilStartKey(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
        while (!isAtEnd()) {
            if (other != null) {
                assert !other.isAtEnd();
                int c = + Utility.compare(getStartKey(), other.getStartKey());

                // The other buffer's startKey comes first: stop and let it go ahead.
                if (c > 0) {
                    break;
                }

                output.processStartKey(getStartKey());

                copyTuples(getStartKeyEndIndex(), output);
            } else {
                output.processStartKey(getStartKey());
                copyTuples(getStartKeyEndIndex(), output);
            }
            incrementStartKey();

            // Do not run past the fileName group this call started in.
            if (getFileNameEndIndex() <= readTupleIndex)
                break;
        }
    }
    /** Entry point for merging: delegates to copyUntilFileName. */
    public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
        copyUntilFileName(other, output);
    }

}
982 public static class ShreddedCombiner implements ReaderSource<DocumentSplit>, ShreddedSource {
983 public ShreddedProcessor processor;
984 Collection<ShreddedReader> readers;
985 boolean closeOnExit = false;
986 boolean uninitialized = true;
987 PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
988
989 public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
990 this.readers = readers;
991 this.closeOnExit = closeOnExit;
992 }
993
994 public void setProcessor(Step processor) throws IncompatibleProcessorException {
995 if (processor instanceof ShreddedProcessor) {
996 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
997 } else if (processor instanceof DocumentSplit.Processor) {
998 this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentSplit.Processor) processor));
999 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
1000 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentSplit>) processor));
1001 } else {
1002 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
1003 }
1004 }
1005
1006 public Class<DocumentSplit> getOutputClass() {
1007 return DocumentSplit.class;
1008 }
1009
1010 public void initialize() throws IOException {
1011 for (ShreddedReader reader : readers) {
1012 reader.fill();
1013
1014 if (!reader.getBuffer().isAtEnd())
1015 queue.add(reader);
1016 }
1017
1018 uninitialized = false;
1019 }
1020
1021 public void run() throws IOException {
1022 initialize();
1023
1024 while (queue.size() > 0) {
1025 ShreddedReader top = queue.poll();
1026 ShreddedReader next = null;
1027 ShreddedBuffer nextBuffer = null;
1028
1029 assert !top.getBuffer().isAtEnd();
1030
1031 if (queue.size() > 0) {
1032 next = queue.peek();
1033 nextBuffer = next.getBuffer();
1034 assert !nextBuffer.isAtEnd();
1035 }
1036
1037 top.getBuffer().copyUntil(nextBuffer, processor);
1038 if (top.getBuffer().isAtEnd())
1039 top.fill();
1040
1041 if (!top.getBuffer().isAtEnd())
1042 queue.add(top);
1043 }
1044
1045 if (closeOnExit)
1046 processor.close();
1047 }
1048
1049 public DocumentSplit read() throws IOException {
1050 if (uninitialized)
1051 initialize();
1052
1053 DocumentSplit result = null;
1054
1055 while (queue.size() > 0) {
1056 ShreddedReader top = queue.poll();
1057 result = top.read();
1058
1059 if (result != null) {
1060 if (top.getBuffer().isAtEnd())
1061 top.fill();
1062
1063 queue.offer(top);
1064 break;
1065 }
1066 }
1067
1068 return result;
1069 }
1070 }
1071 public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<DocumentSplit>, ShreddedSource {
1072 public ShreddedProcessor processor;
1073 ShreddedBuffer buffer;
1074 DocumentSplit last = new DocumentSplit();
1075 long updateFileNameCount = -1;
1076 long updateStartKeyCount = -1;
1077 long tupleCount = 0;
1078 long bufferStartCount = 0;
1079 ArrayInput input;
1080
1081 public ShreddedReader(ArrayInput input) {
1082 this.input = input;
1083 this.buffer = new ShreddedBuffer();
1084 }
1085
1086 public ShreddedReader(ArrayInput input, int bufferSize) {
1087 this.input = input;
1088 this.buffer = new ShreddedBuffer(bufferSize);
1089 }
1090
1091 public final int compareTo(ShreddedReader other) {
1092 ShreddedBuffer otherBuffer = other.getBuffer();
1093
1094 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
1095 return 0;
1096 } else if (buffer.isAtEnd()) {
1097 return -1;
1098 } else if (otherBuffer.isAtEnd()) {
1099 return 1;
1100 }
1101
1102 int result = 0;
1103 do {
1104 result = + Utility.compare(buffer.getFileName(), otherBuffer.getFileName());
1105 if(result != 0) break;
1106 result = + Utility.compare(buffer.getStartKey(), otherBuffer.getStartKey());
1107 if(result != 0) break;
1108 } while (false);
1109
1110 return result;
1111 }
1112
1113 public final ShreddedBuffer getBuffer() {
1114 return buffer;
1115 }
1116
1117 public final DocumentSplit read() throws IOException {
1118 if (buffer.isAtEnd()) {
1119 fill();
1120
1121 if (buffer.isAtEnd()) {
1122 return null;
1123 }
1124 }
1125
1126 assert !buffer.isAtEnd();
1127 DocumentSplit result = new DocumentSplit();
1128
1129 result.fileName = buffer.getFileName();
1130 result.startKey = buffer.getStartKey();
1131 result.fileType = buffer.getFileType();
1132 result.isCompressed = buffer.getIsCompressed();
1133 result.endKey = buffer.getEndKey();
1134
1135 buffer.incrementTuple();
1136 buffer.autoIncrementFileName();
1137 buffer.autoIncrementStartKey();
1138
1139 return result;
1140 }
1141
1142 public final void fill() throws IOException {
1143 try {
1144 buffer.reset();
1145
1146 if (tupleCount != 0) {
1147
1148 if(updateFileNameCount - tupleCount > 0) {
1149 buffer.fileNames.add(last.fileName);
1150 buffer.fileNameTupleIdx.add((int) (updateFileNameCount - tupleCount));
1151 }
1152 if(updateStartKeyCount - tupleCount > 0) {
1153 buffer.startKeys.add(last.startKey);
1154 buffer.startKeyTupleIdx.add((int) (updateStartKeyCount - tupleCount));
1155 }
1156 bufferStartCount = tupleCount;
1157 }
1158
1159 while (!buffer.isFull()) {
1160 updateStartKey();
1161 buffer.processTuple(input.readString(), input.readBoolean(), input.readBytes());
1162 tupleCount++;
1163 }
1164 } catch(EOFException e) {}
1165 }
1166
1167 public final void updateFileName() throws IOException {
1168 if (updateFileNameCount > tupleCount)
1169 return;
1170
1171 last.fileName = input.readString();
1172 updateFileNameCount = tupleCount + input.readInt();
1173
1174 buffer.processFileName(last.fileName);
1175 }
1176 public final void updateStartKey() throws IOException {
1177 if (updateStartKeyCount > tupleCount)
1178 return;
1179
1180 updateFileName();
1181 last.startKey = input.readBytes();
1182 updateStartKeyCount = tupleCount + input.readInt();
1183
1184 buffer.processStartKey(last.startKey);
1185 }
1186
1187 public void run() throws IOException {
1188 while (true) {
1189 fill();
1190
1191 if (buffer.isAtEnd())
1192 break;
1193
1194 buffer.copyUntil(null, processor);
1195 }
1196 processor.close();
1197 }
1198
1199 public void setProcessor(Step processor) throws IncompatibleProcessorException {
1200 if (processor instanceof ShreddedProcessor) {
1201 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
1202 } else if (processor instanceof DocumentSplit.Processor) {
1203 this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentSplit.Processor) processor));
1204 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
1205 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentSplit>) processor));
1206 } else {
1207 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
1208 }
1209 }
1210
1211 public Class<DocumentSplit> getOutputClass() {
1212 return DocumentSplit.class;
1213 }
1214 }
1215
1216 public static class DuplicateEliminator implements ShreddedProcessor {
1217 public ShreddedProcessor processor;
1218 DocumentSplit last = new DocumentSplit();
1219 boolean fileNameProcess = true;
1220 boolean startKeyProcess = true;
1221
1222 public DuplicateEliminator() {}
1223 public DuplicateEliminator(ShreddedProcessor processor) {
1224 this.processor = processor;
1225 }
1226
1227 public void setShreddedProcessor(ShreddedProcessor processor) {
1228 this.processor = processor;
1229 }
1230
1231 public void processFileName(String fileName) throws IOException {
1232 if (fileNameProcess || Utility.compare(fileName, last.fileName) != 0) {
1233 last.fileName = fileName;
1234 processor.processFileName(fileName);
1235 resetStartKey();
1236 fileNameProcess = false;
1237 }
1238 }
1239 public void processStartKey(byte[] startKey) throws IOException {
1240 if (startKeyProcess || Utility.compare(startKey, last.startKey) != 0) {
1241 last.startKey = startKey;
1242 processor.processStartKey(startKey);
1243 startKeyProcess = false;
1244 }
1245 }
1246
1247 public void resetFileName() {
1248 fileNameProcess = true;
1249 resetStartKey();
1250 }
1251 public void resetStartKey() {
1252 startKeyProcess = true;
1253 }
1254
1255 public void processTuple(String fileType, boolean isCompressed, byte[] endKey) throws IOException {
1256 processor.processTuple(fileType, isCompressed, endKey);
1257 }
1258
1259 public void close() throws IOException {
1260 processor.close();
1261 }
1262 }
1263 public static class TupleUnshredder implements ShreddedProcessor {
1264 DocumentSplit last = new DocumentSplit();
1265 public org.galagosearch.tupleflow.Processor<DocumentSplit> processor;
1266
1267 public TupleUnshredder(DocumentSplit.Processor processor) {
1268 this.processor = processor;
1269 }
1270
1271 public TupleUnshredder(org.galagosearch.tupleflow.Processor<DocumentSplit> processor) {
1272 this.processor = processor;
1273 }
1274
1275 public DocumentSplit clone(DocumentSplit object) {
1276 DocumentSplit result = new DocumentSplit();
1277 if (object == null) return result;
1278 result.fileName = object.fileName;
1279 result.fileType = object.fileType;
1280 result.isCompressed = object.isCompressed;
1281 result.startKey = object.startKey;
1282 result.endKey = object.endKey;
1283 return result;
1284 }
1285
1286 public void processFileName(String fileName) throws IOException {
1287 last.fileName = fileName;
1288 }
1289
1290 public void processStartKey(byte[] startKey) throws IOException {
1291 last.startKey = startKey;
1292 }
1293
1294
1295 public void processTuple(String fileType, boolean isCompressed, byte[] endKey) throws IOException {
1296 last.fileType = fileType;
1297 last.isCompressed = isCompressed;
1298 last.endKey = endKey;
1299 processor.process(clone(last));
1300 }
1301
1302 public void close() throws IOException {
1303 processor.close();
1304 }
1305 }
1306 public static class TupleShredder implements Processor {
1307 DocumentSplit last = new DocumentSplit();
1308 public ShreddedProcessor processor;
1309
1310 public TupleShredder(ShreddedProcessor processor) {
1311 this.processor = processor;
1312 }
1313
1314 public DocumentSplit clone(DocumentSplit object) {
1315 DocumentSplit result = new DocumentSplit();
1316 if (object == null) return result;
1317 result.fileName = object.fileName;
1318 result.fileType = object.fileType;
1319 result.isCompressed = object.isCompressed;
1320 result.startKey = object.startKey;
1321 result.endKey = object.endKey;
1322 return result;
1323 }
1324
1325 public void process(DocumentSplit object) throws IOException {
1326 boolean processAll = false;
1327 if(last == null || Utility.compare(last.fileName, object.fileName) != 0 || processAll) { processor.processFileName(object.fileName); processAll = true; }
1328 if(last == null || Utility.compare(last.startKey, object.startKey) != 0 || processAll) { processor.processStartKey(object.startKey); processAll = true; }
1329 processor.processTuple(object.fileType, object.isCompressed, object.endKey);
1330 }
1331
1332 public Class<DocumentSplit> getInputClass() {
1333 return DocumentSplit.class;
1334 }
1335
1336 public void close() throws IOException {
1337 processor.close();
1338 }
1339 }
1340 }
1341 }