1
2
3 package org.galagosearch.tupleflow.types;
4
5 import org.galagosearch.tupleflow.Utility;
6 import org.galagosearch.tupleflow.ArrayInput;
7 import org.galagosearch.tupleflow.ArrayOutput;
8 import org.galagosearch.tupleflow.Order;
9 import org.galagosearch.tupleflow.OrderedWriter;
10 import org.galagosearch.tupleflow.Type;
11 import org.galagosearch.tupleflow.TypeReader;
12 import org.galagosearch.tupleflow.Step;
13 import org.galagosearch.tupleflow.IncompatibleProcessorException;
14 import org.galagosearch.tupleflow.ReaderSource;
15 import java.io.IOException;
16 import java.io.EOFException;
17 import java.io.UnsupportedEncodingException;
18 import java.util.ArrayList;
19 import java.util.Arrays;
20 import java.util.Comparator;
21 import java.util.PriorityQueue;
22 import java.util.Collection;
23
24 public class FileName implements Type<FileName> {
25 public String filename;
26
27 public FileName() {}
/**
 * Creates a tuple wrapping the given file name.
 *
 * @param filename value to store in the public {@code filename} field
 */
public FileName(String filename) {
    this.filename = filename;
}
31
32 public String toString() {
33 return String.format("%s",
34 filename);
35 }
36
37 public Order<FileName> getOrder(String... spec) {
38 if (Arrays.equals(spec, new String[] { })) {
39 return new Unordered();
40 }
41 if (Arrays.equals(spec, new String[] { "+filename" })) {
42 return new FilenameOrder();
43 }
44 return null;
45 }
46
47 public interface Processor extends Step, org.galagosearch.tupleflow.Processor<FileName> {
48 public void process(FileName object) throws IOException;
49 public void close() throws IOException;
50 }
51 public interface Source extends Step {
52 }
53 public static class Unordered implements Order<FileName> {
54 public int hash(FileName object) {
55 int h = 0;
56 return h;
57 }
58 public Comparator<FileName> greaterThan() {
59 return new Comparator<FileName>() {
60 public int compare(FileName one, FileName two) {
61 int result = 0;
62 do {
63 } while (false);
64 return -result;
65 }
66 };
67 }
68 public Comparator<FileName> lessThan() {
69 return new Comparator<FileName>() {
70 public int compare(FileName one, FileName two) {
71 int result = 0;
72 do {
73 } while (false);
74 return result;
75 }
76 };
77 }
78 public TypeReader<FileName> orderedReader(ArrayInput _input) {
79 return new ShreddedReader(_input);
80 }
81
82 public TypeReader<FileName> orderedReader(ArrayInput _input, int bufferSize) {
83 return new ShreddedReader(_input, bufferSize);
84 }
85 public OrderedWriter<FileName> orderedWriter(ArrayOutput _output) {
86 ShreddedWriter w = new ShreddedWriter(_output);
87 return new OrderedWriterClass(w);
88 }
89 public static class OrderedWriterClass extends OrderedWriter< FileName > {
90 FileName last = null;
91 ShreddedWriter shreddedWriter = null;
92
93 public OrderedWriterClass(ShreddedWriter s) {
94 this.shreddedWriter = s;
95 }
96
97 public void process(FileName object) throws IOException {
98 boolean processAll = false;
99 shreddedWriter.processTuple(object.filename);
100 last = object;
101 }
102
103 public void close() throws IOException {
104 shreddedWriter.close();
105 }
106
107 public Class<FileName> getInputClass() {
108 return FileName.class;
109 }
110 }
111 public ReaderSource<FileName> orderedCombiner(Collection<TypeReader<FileName>> readers, boolean closeOnExit) {
112 ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
113
114 for (TypeReader<FileName> reader : readers) {
115 shreddedReaders.add((ShreddedReader)reader);
116 }
117
118 return new ShreddedCombiner(shreddedReaders, closeOnExit);
119 }
120 public FileName clone(FileName object) {
121 FileName result = new FileName();
122 if (object == null) return result;
123 result.filename = object.filename;
124 return result;
125 }
126 public Class<FileName> getOrderedClass() {
127 return FileName.class;
128 }
129 public String[] getOrderSpec() {
130 return new String[] {};
131 }
132
133 public static String getSpecString() {
134 return "";
135 }
136
137 public interface ShreddedProcessor extends Step {
138 public void processTuple(String filename) throws IOException;
139 public void close() throws IOException;
140 }
141 public interface ShreddedSource extends Step {
142 }
143
144 public static class ShreddedWriter implements ShreddedProcessor {
145 ArrayOutput output;
146 ShreddedBuffer buffer = new ShreddedBuffer();
147 boolean lastFlush = false;
148
149 public ShreddedWriter(ArrayOutput output) {
150 this.output = output;
151 }
152
153 public void close() throws IOException {
154 flush();
155 }
156
157 public final void processTuple(String filename) throws IOException {
158 if (lastFlush) {
159 lastFlush = false;
160 }
161 buffer.processTuple(filename);
162 if (buffer.isFull())
163 flush();
164 }
165 public final void flushTuples(int pauseIndex) throws IOException {
166
167 while (buffer.getReadIndex() < pauseIndex) {
168
169 output.writeString(buffer.getFilename());
170 buffer.incrementTuple();
171 }
172 }
173 public void flush() throws IOException {
174 flushTuples(buffer.getWriteIndex());
175 buffer.reset();
176 lastFlush = true;
177 }
178 }
179 public static class ShreddedBuffer {
180
181 String[] filenames;
182 int writeTupleIndex = 0;
183 int readTupleIndex = 0;
184 int batchSize;
185
186 public ShreddedBuffer(int batchSize) {
187 this.batchSize = batchSize;
188
189 filenames = new String[batchSize];
190 }
191
192 public ShreddedBuffer() {
193 this(10000);
194 }
195
196 public void processTuple(String filename) {
197 filenames[writeTupleIndex] = filename;
198 writeTupleIndex++;
199 }
200 public void resetData() {
201 writeTupleIndex = 0;
202 }
203
204 public void resetRead() {
205 readTupleIndex = 0;
206 }
207
208 public void reset() {
209 resetData();
210 resetRead();
211 }
212 public boolean isFull() {
213 return writeTupleIndex >= batchSize;
214 }
215
216 public boolean isEmpty() {
217 return writeTupleIndex == 0;
218 }
219
220 public boolean isAtEnd() {
221 return readTupleIndex >= writeTupleIndex;
222 }
223 public void incrementTuple() {
224 readTupleIndex++;
225 }
226 public int getReadIndex() {
227 return readTupleIndex;
228 }
229
230 public int getWriteIndex() {
231 return writeTupleIndex;
232 }
233 public String getFilename() {
234 assert readTupleIndex < writeTupleIndex;
235 return filenames[readTupleIndex];
236 }
237 public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
238 while (getReadIndex() < endIndex) {
239 output.processTuple(getFilename());
240 incrementTuple();
241 }
242 }
243
244 public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
245 }
246
247 }
248 public static class ShreddedCombiner implements ReaderSource<FileName>, ShreddedSource {
249 public ShreddedProcessor processor;
250 Collection<ShreddedReader> readers;
251 boolean closeOnExit = false;
252 boolean uninitialized = true;
253 PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
254
255 public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
256 this.readers = readers;
257 this.closeOnExit = closeOnExit;
258 }
259
260 public void setProcessor(Step processor) throws IncompatibleProcessorException {
261 if (processor instanceof ShreddedProcessor) {
262 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
263 } else if (processor instanceof FileName.Processor) {
264 this.processor = new DuplicateEliminator(new TupleUnshredder((FileName.Processor) processor));
265 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
266 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<FileName>) processor));
267 } else {
268 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
269 }
270 }
271
272 public Class<FileName> getOutputClass() {
273 return FileName.class;
274 }
275
276 public void initialize() throws IOException {
277 for (ShreddedReader reader : readers) {
278 reader.fill();
279
280 if (!reader.getBuffer().isAtEnd())
281 queue.add(reader);
282 }
283
284 uninitialized = false;
285 }
286
287 public void run() throws IOException {
288 initialize();
289
290 while (queue.size() > 0) {
291 ShreddedReader top = queue.poll();
292 ShreddedReader next = null;
293 ShreddedBuffer nextBuffer = null;
294
295 assert !top.getBuffer().isAtEnd();
296
297 if (queue.size() > 0) {
298 next = queue.peek();
299 nextBuffer = next.getBuffer();
300 assert !nextBuffer.isAtEnd();
301 }
302
303 top.getBuffer().copyUntil(nextBuffer, processor);
304 if (top.getBuffer().isAtEnd())
305 top.fill();
306
307 if (!top.getBuffer().isAtEnd())
308 queue.add(top);
309 }
310
311 if (closeOnExit)
312 processor.close();
313 }
314
315 public FileName read() throws IOException {
316 if (uninitialized)
317 initialize();
318
319 FileName result = null;
320
321 while (queue.size() > 0) {
322 ShreddedReader top = queue.poll();
323 result = top.read();
324
325 if (result != null) {
326 if (top.getBuffer().isAtEnd())
327 top.fill();
328
329 queue.offer(top);
330 break;
331 }
332 }
333
334 return result;
335 }
336 }
337 public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<FileName>, ShreddedSource {
338 public ShreddedProcessor processor;
339 ShreddedBuffer buffer;
340 FileName last = new FileName();
341 long tupleCount = 0;
342 long bufferStartCount = 0;
343 ArrayInput input;
344
345 public ShreddedReader(ArrayInput input) {
346 this.input = input;
347 this.buffer = new ShreddedBuffer();
348 }
349
350 public ShreddedReader(ArrayInput input, int bufferSize) {
351 this.input = input;
352 this.buffer = new ShreddedBuffer(bufferSize);
353 }
354
355 public final int compareTo(ShreddedReader other) {
356 ShreddedBuffer otherBuffer = other.getBuffer();
357
358 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
359 return 0;
360 } else if (buffer.isAtEnd()) {
361 return -1;
362 } else if (otherBuffer.isAtEnd()) {
363 return 1;
364 }
365
366 int result = 0;
367 do {
368 } while (false);
369
370 return result;
371 }
372
373 public final ShreddedBuffer getBuffer() {
374 return buffer;
375 }
376
377 public final FileName read() throws IOException {
378 if (buffer.isAtEnd()) {
379 fill();
380
381 if (buffer.isAtEnd()) {
382 return null;
383 }
384 }
385
386 assert !buffer.isAtEnd();
387 FileName result = new FileName();
388
389 result.filename = buffer.getFilename();
390
391 buffer.incrementTuple();
392
393 return result;
394 }
395
396 public final void fill() throws IOException {
397 try {
398 buffer.reset();
399
400 if (tupleCount != 0) {
401 bufferStartCount = tupleCount;
402 }
403
404 while (!buffer.isFull()) {
405 buffer.processTuple(input.readString());
406 tupleCount++;
407 }
408 } catch(EOFException e) {}
409 }
410
411
412 public void run() throws IOException {
413 while (true) {
414 fill();
415
416 if (buffer.isAtEnd())
417 break;
418
419 buffer.copyUntil(null, processor);
420 }
421 processor.close();
422 }
423
424 public void setProcessor(Step processor) throws IncompatibleProcessorException {
425 if (processor instanceof ShreddedProcessor) {
426 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
427 } else if (processor instanceof FileName.Processor) {
428 this.processor = new DuplicateEliminator(new TupleUnshredder((FileName.Processor) processor));
429 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
430 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<FileName>) processor));
431 } else {
432 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
433 }
434 }
435
436 public Class<FileName> getOutputClass() {
437 return FileName.class;
438 }
439 }
440
441 public static class DuplicateEliminator implements ShreddedProcessor {
442 public ShreddedProcessor processor;
443 FileName last = new FileName();
444
445 public DuplicateEliminator() {}
446 public DuplicateEliminator(ShreddedProcessor processor) {
447 this.processor = processor;
448 }
449
450 public void setShreddedProcessor(ShreddedProcessor processor) {
451 this.processor = processor;
452 }
453
454
455
456
457 public void processTuple(String filename) throws IOException {
458 processor.processTuple(filename);
459 }
460
461 public void close() throws IOException {
462 processor.close();
463 }
464 }
465 public static class TupleUnshredder implements ShreddedProcessor {
466 FileName last = new FileName();
467 public org.galagosearch.tupleflow.Processor<FileName> processor;
468
469 public TupleUnshredder(FileName.Processor processor) {
470 this.processor = processor;
471 }
472
473 public TupleUnshredder(org.galagosearch.tupleflow.Processor<FileName> processor) {
474 this.processor = processor;
475 }
476
477 public FileName clone(FileName object) {
478 FileName result = new FileName();
479 if (object == null) return result;
480 result.filename = object.filename;
481 return result;
482 }
483
484
485 public void processTuple(String filename) throws IOException {
486 last.filename = filename;
487 processor.process(clone(last));
488 }
489
490 public void close() throws IOException {
491 processor.close();
492 }
493 }
494 public static class TupleShredder implements Processor {
495 FileName last = new FileName();
496 public ShreddedProcessor processor;
497
498 public TupleShredder(ShreddedProcessor processor) {
499 this.processor = processor;
500 }
501
502 public FileName clone(FileName object) {
503 FileName result = new FileName();
504 if (object == null) return result;
505 result.filename = object.filename;
506 return result;
507 }
508
509 public void process(FileName object) throws IOException {
510 boolean processAll = false;
511 processor.processTuple(object.filename);
512 }
513
514 public Class<FileName> getInputClass() {
515 return FileName.class;
516 }
517
518 public void close() throws IOException {
519 processor.close();
520 }
521 }
522 }
523 public static class FilenameOrder implements Order<FileName> {
524 public int hash(FileName object) {
525 int h = 0;
526 h += Utility.hash(object.filename);
527 return h;
528 }
529 public Comparator<FileName> greaterThan() {
530 return new Comparator<FileName>() {
531 public int compare(FileName one, FileName two) {
532 int result = 0;
533 do {
534 result = + Utility.compare(one.filename, two.filename);
535 if(result != 0) break;
536 } while (false);
537 return -result;
538 }
539 };
540 }
541 public Comparator<FileName> lessThan() {
542 return new Comparator<FileName>() {
543 public int compare(FileName one, FileName two) {
544 int result = 0;
545 do {
546 result = + Utility.compare(one.filename, two.filename);
547 if(result != 0) break;
548 } while (false);
549 return result;
550 }
551 };
552 }
553 public TypeReader<FileName> orderedReader(ArrayInput _input) {
554 return new ShreddedReader(_input);
555 }
556
557 public TypeReader<FileName> orderedReader(ArrayInput _input, int bufferSize) {
558 return new ShreddedReader(_input, bufferSize);
559 }
560 public OrderedWriter<FileName> orderedWriter(ArrayOutput _output) {
561 ShreddedWriter w = new ShreddedWriter(_output);
562 return new OrderedWriterClass(w);
563 }
564 public static class OrderedWriterClass extends OrderedWriter< FileName > {
565 FileName last = null;
566 ShreddedWriter shreddedWriter = null;
567
568 public OrderedWriterClass(ShreddedWriter s) {
569 this.shreddedWriter = s;
570 }
571
572 public void process(FileName object) throws IOException {
573 boolean processAll = false;
574 if (processAll || last == null || 0 != Utility.compare(object.filename, last.filename)) { processAll = true; shreddedWriter.processFilename(object.filename); }
575 shreddedWriter.processTuple();
576 last = object;
577 }
578
579 public void close() throws IOException {
580 shreddedWriter.close();
581 }
582
583 public Class<FileName> getInputClass() {
584 return FileName.class;
585 }
586 }
587 public ReaderSource<FileName> orderedCombiner(Collection<TypeReader<FileName>> readers, boolean closeOnExit) {
588 ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
589
590 for (TypeReader<FileName> reader : readers) {
591 shreddedReaders.add((ShreddedReader)reader);
592 }
593
594 return new ShreddedCombiner(shreddedReaders, closeOnExit);
595 }
596 public FileName clone(FileName object) {
597 FileName result = new FileName();
598 if (object == null) return result;
599 result.filename = object.filename;
600 return result;
601 }
602 public Class<FileName> getOrderedClass() {
603 return FileName.class;
604 }
605 public String[] getOrderSpec() {
606 return new String[] {"+filename"};
607 }
608
609 public static String getSpecString() {
610 return "+filename";
611 }
612
613 public interface ShreddedProcessor extends Step {
614 public void processFilename(String filename) throws IOException;
615 public void processTuple() throws IOException;
616 public void close() throws IOException;
617 }
618 public interface ShreddedSource extends Step {
619 }
620
621 public static class ShreddedWriter implements ShreddedProcessor {
622 ArrayOutput output;
623 ShreddedBuffer buffer = new ShreddedBuffer();
624 String lastFilename;
625 boolean lastFlush = false;
626
627 public ShreddedWriter(ArrayOutput output) {
628 this.output = output;
629 }
630
631 public void close() throws IOException {
632 flush();
633 }
634
635 public void processFilename(String filename) {
636 lastFilename = filename;
637 buffer.processFilename(filename);
638 }
639 public final void processTuple() throws IOException {
640 if (lastFlush) {
641 if(buffer.filenames.size() == 0) buffer.processFilename(lastFilename);
642 lastFlush = false;
643 }
644 buffer.processTuple();
645 if (buffer.isFull())
646 flush();
647 }
648 public final void flushTuples(int pauseIndex) throws IOException {
649
650 while (buffer.getReadIndex() < pauseIndex) {
651
652 buffer.incrementTuple();
653 }
654 }
655 public final void flushFilename(int pauseIndex) throws IOException {
656 while (buffer.getReadIndex() < pauseIndex) {
657 int nextPause = buffer.getFilenameEndIndex();
658 int count = nextPause - buffer.getReadIndex();
659
660 output.writeString(buffer.getFilename());
661 output.writeInt(count);
662 buffer.incrementFilename();
663
664 flushTuples(nextPause);
665 assert nextPause == buffer.getReadIndex();
666 }
667 }
668 public void flush() throws IOException {
669 flushFilename(buffer.getWriteIndex());
670 buffer.reset();
671 lastFlush = true;
672 }
673 }
674 public static class ShreddedBuffer {
675 ArrayList<String> filenames = new ArrayList();
676 ArrayList<Integer> filenameTupleIdx = new ArrayList();
677 int filenameReadIdx = 0;
678
679 int writeTupleIndex = 0;
680 int readTupleIndex = 0;
681 int batchSize;
682
683 public ShreddedBuffer(int batchSize) {
684 this.batchSize = batchSize;
685
686 }
687
688 public ShreddedBuffer() {
689 this(10000);
690 }
691
692 public void processFilename(String filename) {
693 filenames.add(filename);
694 filenameTupleIdx.add(writeTupleIndex);
695 }
696 public void processTuple() {
697 assert filenames.size() > 0;
698 writeTupleIndex++;
699 }
700 public void resetData() {
701 filenames.clear();
702 filenameTupleIdx.clear();
703 writeTupleIndex = 0;
704 }
705
706 public void resetRead() {
707 readTupleIndex = 0;
708 filenameReadIdx = 0;
709 }
710
711 public void reset() {
712 resetData();
713 resetRead();
714 }
715 public boolean isFull() {
716 return writeTupleIndex >= batchSize;
717 }
718
719 public boolean isEmpty() {
720 return writeTupleIndex == 0;
721 }
722
723 public boolean isAtEnd() {
724 return readTupleIndex >= writeTupleIndex;
725 }
726 public void incrementFilename() {
727 filenameReadIdx++;
728 }
729
730 public void autoIncrementFilename() {
731 while (readTupleIndex >= getFilenameEndIndex() && readTupleIndex < writeTupleIndex)
732 filenameReadIdx++;
733 }
734 public void incrementTuple() {
735 readTupleIndex++;
736 }
737 public int getFilenameEndIndex() {
738 if ((filenameReadIdx+1) >= filenameTupleIdx.size())
739 return writeTupleIndex;
740 return filenameTupleIdx.get(filenameReadIdx+1);
741 }
742 public int getReadIndex() {
743 return readTupleIndex;
744 }
745
746 public int getWriteIndex() {
747 return writeTupleIndex;
748 }
749 public String getFilename() {
750 assert readTupleIndex < writeTupleIndex;
751 assert filenameReadIdx < filenames.size();
752
753 return filenames.get(filenameReadIdx);
754 }
755
756 public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
757 while (getReadIndex() < endIndex) {
758 output.processTuple();
759 incrementTuple();
760 }
761 }
762 public void copyUntilIndexFilename(int endIndex, ShreddedProcessor output) throws IOException {
763 while (getReadIndex() < endIndex) {
764 output.processFilename(getFilename());
765 assert getFilenameEndIndex() <= endIndex;
766 copyTuples(getFilenameEndIndex(), output);
767 incrementFilename();
768 }
769 }
770 public void copyUntilFilename(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
771 while (!isAtEnd()) {
772 if (other != null) {
773 assert !other.isAtEnd();
774 int c = + Utility.compare(getFilename(), other.getFilename());
775
776 if (c > 0) {
777 break;
778 }
779
780 output.processFilename(getFilename());
781
782 copyTuples(getFilenameEndIndex(), output);
783 } else {
784 output.processFilename(getFilename());
785 copyTuples(getFilenameEndIndex(), output);
786 }
787 incrementFilename();
788
789
790 }
791 }
792 public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
793 copyUntilFilename(other, output);
794 }
795
796 }
797 public static class ShreddedCombiner implements ReaderSource<FileName>, ShreddedSource {
798 public ShreddedProcessor processor;
799 Collection<ShreddedReader> readers;
800 boolean closeOnExit = false;
801 boolean uninitialized = true;
802 PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
803
804 public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
805 this.readers = readers;
806 this.closeOnExit = closeOnExit;
807 }
808
809 public void setProcessor(Step processor) throws IncompatibleProcessorException {
810 if (processor instanceof ShreddedProcessor) {
811 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
812 } else if (processor instanceof FileName.Processor) {
813 this.processor = new DuplicateEliminator(new TupleUnshredder((FileName.Processor) processor));
814 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
815 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<FileName>) processor));
816 } else {
817 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
818 }
819 }
820
821 public Class<FileName> getOutputClass() {
822 return FileName.class;
823 }
824
825 public void initialize() throws IOException {
826 for (ShreddedReader reader : readers) {
827 reader.fill();
828
829 if (!reader.getBuffer().isAtEnd())
830 queue.add(reader);
831 }
832
833 uninitialized = false;
834 }
835
836 public void run() throws IOException {
837 initialize();
838
839 while (queue.size() > 0) {
840 ShreddedReader top = queue.poll();
841 ShreddedReader next = null;
842 ShreddedBuffer nextBuffer = null;
843
844 assert !top.getBuffer().isAtEnd();
845
846 if (queue.size() > 0) {
847 next = queue.peek();
848 nextBuffer = next.getBuffer();
849 assert !nextBuffer.isAtEnd();
850 }
851
852 top.getBuffer().copyUntil(nextBuffer, processor);
853 if (top.getBuffer().isAtEnd())
854 top.fill();
855
856 if (!top.getBuffer().isAtEnd())
857 queue.add(top);
858 }
859
860 if (closeOnExit)
861 processor.close();
862 }
863
864 public FileName read() throws IOException {
865 if (uninitialized)
866 initialize();
867
868 FileName result = null;
869
870 while (queue.size() > 0) {
871 ShreddedReader top = queue.poll();
872 result = top.read();
873
874 if (result != null) {
875 if (top.getBuffer().isAtEnd())
876 top.fill();
877
878 queue.offer(top);
879 break;
880 }
881 }
882
883 return result;
884 }
885 }
886 public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<FileName>, ShreddedSource {
887 public ShreddedProcessor processor;
888 ShreddedBuffer buffer;
889 FileName last = new FileName();
890 long updateFilenameCount = -1;
891 long tupleCount = 0;
892 long bufferStartCount = 0;
893 ArrayInput input;
894
895 public ShreddedReader(ArrayInput input) {
896 this.input = input;
897 this.buffer = new ShreddedBuffer();
898 }
899
900 public ShreddedReader(ArrayInput input, int bufferSize) {
901 this.input = input;
902 this.buffer = new ShreddedBuffer(bufferSize);
903 }
904
905 public final int compareTo(ShreddedReader other) {
906 ShreddedBuffer otherBuffer = other.getBuffer();
907
908 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
909 return 0;
910 } else if (buffer.isAtEnd()) {
911 return -1;
912 } else if (otherBuffer.isAtEnd()) {
913 return 1;
914 }
915
916 int result = 0;
917 do {
918 result = + Utility.compare(buffer.getFilename(), otherBuffer.getFilename());
919 if(result != 0) break;
920 } while (false);
921
922 return result;
923 }
924
925 public final ShreddedBuffer getBuffer() {
926 return buffer;
927 }
928
929 public final FileName read() throws IOException {
930 if (buffer.isAtEnd()) {
931 fill();
932
933 if (buffer.isAtEnd()) {
934 return null;
935 }
936 }
937
938 assert !buffer.isAtEnd();
939 FileName result = new FileName();
940
941 result.filename = buffer.getFilename();
942
943 buffer.incrementTuple();
944 buffer.autoIncrementFilename();
945
946 return result;
947 }
948
949 public final void fill() throws IOException {
950 try {
951 buffer.reset();
952
953 if (tupleCount != 0) {
954
955 if(updateFilenameCount - tupleCount > 0) {
956 buffer.filenames.add(last.filename);
957 buffer.filenameTupleIdx.add((int) (updateFilenameCount - tupleCount));
958 }
959 bufferStartCount = tupleCount;
960 }
961
962 while (!buffer.isFull()) {
963 updateFilename();
964 buffer.processTuple();
965 tupleCount++;
966 }
967 } catch(EOFException e) {}
968 }
969
970 public final void updateFilename() throws IOException {
971 if (updateFilenameCount > tupleCount)
972 return;
973
974 last.filename = input.readString();
975 updateFilenameCount = tupleCount + input.readInt();
976
977 buffer.processFilename(last.filename);
978 }
979
980 public void run() throws IOException {
981 while (true) {
982 fill();
983
984 if (buffer.isAtEnd())
985 break;
986
987 buffer.copyUntil(null, processor);
988 }
989 processor.close();
990 }
991
992 public void setProcessor(Step processor) throws IncompatibleProcessorException {
993 if (processor instanceof ShreddedProcessor) {
994 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
995 } else if (processor instanceof FileName.Processor) {
996 this.processor = new DuplicateEliminator(new TupleUnshredder((FileName.Processor) processor));
997 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
998 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<FileName>) processor));
999 } else {
1000 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
1001 }
1002 }
1003
1004 public Class<FileName> getOutputClass() {
1005 return FileName.class;
1006 }
1007 }
1008
1009 public static class DuplicateEliminator implements ShreddedProcessor {
1010 public ShreddedProcessor processor;
1011 FileName last = new FileName();
1012 boolean filenameProcess = true;
1013
1014 public DuplicateEliminator() {}
1015 public DuplicateEliminator(ShreddedProcessor processor) {
1016 this.processor = processor;
1017 }
1018
1019 public void setShreddedProcessor(ShreddedProcessor processor) {
1020 this.processor = processor;
1021 }
1022
1023 public void processFilename(String filename) throws IOException {
1024 if (filenameProcess || Utility.compare(filename, last.filename) != 0) {
1025 last.filename = filename;
1026 processor.processFilename(filename);
1027 filenameProcess = false;
1028 }
1029 }
1030
1031 public void resetFilename() {
1032 filenameProcess = true;
1033 }
1034
1035 public void processTuple() throws IOException {
1036 processor.processTuple();
1037 }
1038
1039 public void close() throws IOException {
1040 processor.close();
1041 }
1042 }
1043 public static class TupleUnshredder implements ShreddedProcessor {
1044 FileName last = new FileName();
1045 public org.galagosearch.tupleflow.Processor<FileName> processor;
1046
1047 public TupleUnshredder(FileName.Processor processor) {
1048 this.processor = processor;
1049 }
1050
1051 public TupleUnshredder(org.galagosearch.tupleflow.Processor<FileName> processor) {
1052 this.processor = processor;
1053 }
1054
1055 public FileName clone(FileName object) {
1056 FileName result = new FileName();
1057 if (object == null) return result;
1058 result.filename = object.filename;
1059 return result;
1060 }
1061
1062 public void processFilename(String filename) throws IOException {
1063 last.filename = filename;
1064 }
1065
1066
1067 public void processTuple() throws IOException {
1068 processor.process(clone(last));
1069 }
1070
1071 public void close() throws IOException {
1072 processor.close();
1073 }
1074 }
1075 public static class TupleShredder implements Processor {
1076 FileName last = new FileName();
1077 public ShreddedProcessor processor;
1078
1079 public TupleShredder(ShreddedProcessor processor) {
1080 this.processor = processor;
1081 }
1082
1083 public FileName clone(FileName object) {
1084 FileName result = new FileName();
1085 if (object == null) return result;
1086 result.filename = object.filename;
1087 return result;
1088 }
1089
1090 public void process(FileName object) throws IOException {
1091 boolean processAll = false;
1092 if(last == null || Utility.compare(last.filename, object.filename) != 0 || processAll) { processor.processFilename(object.filename); processAll = true; }
1093 processor.processTuple();
1094 }
1095
1096 public Class<FileName> getInputClass() {
1097 return FileName.class;
1098 }
1099
1100 public void close() throws IOException {
1101 processor.close();
1102 }
1103 }
1104 }
1105 }