View Javadoc

1   // This file was automatically generated with the command: 
2   //     java org.galagosearch.tupleflow.typebuilder.TypeBuilderMojo ...
3   package org.galagosearch.tupleflow.types;
4   
5   import org.galagosearch.tupleflow.Utility;
6   import org.galagosearch.tupleflow.ArrayInput;
7   import org.galagosearch.tupleflow.ArrayOutput;
8   import org.galagosearch.tupleflow.Order;   
9   import org.galagosearch.tupleflow.OrderedWriter;
10  import org.galagosearch.tupleflow.Type; 
11  import org.galagosearch.tupleflow.TypeReader;
12  import org.galagosearch.tupleflow.Step; 
13  import org.galagosearch.tupleflow.IncompatibleProcessorException;
14  import org.galagosearch.tupleflow.ReaderSource;
15  import java.io.IOException;             
16  import java.io.EOFException;
17  import java.io.UnsupportedEncodingException;
18  import java.util.ArrayList;
19  import java.util.Arrays;   
20  import java.util.Comparator;
21  import java.util.PriorityQueue;
22  import java.util.Collection;
23  
24  public class FileName implements Type<FileName> {
25      public String filename; 
26      
27      public FileName() {}
28      public FileName(String filename) {
29          this.filename = filename;
30      }  
31      
32      public String toString() {
33              return String.format("%s",
34                                     filename);
35      } 
36  
37      public Order<FileName> getOrder(String... spec) {
38          if (Arrays.equals(spec, new String[] {  })) {
39              return new Unordered();
40          }
41          if (Arrays.equals(spec, new String[] { "+filename" })) {
42              return new FilenameOrder();
43          }
44          return null;
45      } 
46        
47      public interface Processor extends Step, org.galagosearch.tupleflow.Processor<FileName> {
48          public void process(FileName object) throws IOException;
49          public void close() throws IOException;
50      }                        
51      public interface Source extends Step {
52      }
53      public static class Unordered implements Order<FileName> {
54          public int hash(FileName object) {
55              int h = 0;
56              return h;
57          } 
58          public Comparator<FileName> greaterThan() {
59              return new Comparator<FileName>() {
60                  public int compare(FileName one, FileName two) {
61                      int result = 0;
62                      do {
63                      } while (false);
64                      return -result;
65                  }
66              };
67          }     
68          public Comparator<FileName> lessThan() {
69              return new Comparator<FileName>() {
70                  public int compare(FileName one, FileName two) {
71                      int result = 0;
72                      do {
73                      } while (false);
74                      return result;
75                  }
76              };
77          }     
78          public TypeReader<FileName> orderedReader(ArrayInput _input) {
79              return new ShreddedReader(_input);
80          }    
81  
82          public TypeReader<FileName> orderedReader(ArrayInput _input, int bufferSize) {
83              return new ShreddedReader(_input, bufferSize);
84          }    
85          public OrderedWriter<FileName> orderedWriter(ArrayOutput _output) {
86              ShreddedWriter w = new ShreddedWriter(_output);
87              return new OrderedWriterClass(w); 
88          }                                    
89          public static class OrderedWriterClass extends OrderedWriter< FileName > {
90              FileName last = null;
91              ShreddedWriter shreddedWriter = null; 
92              
93              public OrderedWriterClass(ShreddedWriter s) {
94                  this.shreddedWriter = s;
95              }
96              
97              public void process(FileName object) throws IOException {
98                 boolean processAll = false;
99                 shreddedWriter.processTuple(object.filename);
100                last = object;
101             }           
102                  
103             public void close() throws IOException {
104                 shreddedWriter.close();
105             }
106             
107             public Class<FileName> getInputClass() {
108                 return FileName.class;
109             }
110         } 
111         public ReaderSource<FileName> orderedCombiner(Collection<TypeReader<FileName>> readers, boolean closeOnExit) {
112             ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
113             
114             for (TypeReader<FileName> reader : readers) {
115                 shreddedReaders.add((ShreddedReader)reader);
116             }
117             
118             return new ShreddedCombiner(shreddedReaders, closeOnExit);
119         }                  
120         public FileName clone(FileName object) {
121             FileName result = new FileName();
122             if (object == null) return result;
123             result.filename = object.filename; 
124             return result;
125         }                 
126         public Class<FileName> getOrderedClass() {
127             return FileName.class;
128         }                           
129         public String[] getOrderSpec() {
130             return new String[] {};
131         }
132 
133         public static String getSpecString() {
134             return "";
135         }
136                            
137         public interface ShreddedProcessor extends Step {
138             public void processTuple(String filename) throws IOException;
139             public void close() throws IOException;
140         }    
141         public interface ShreddedSource extends Step {
142         }                                              
143         
144         public static class ShreddedWriter implements ShreddedProcessor {
145             ArrayOutput output;
146             ShreddedBuffer buffer = new ShreddedBuffer();
147             boolean lastFlush = false;
148             
149             public ShreddedWriter(ArrayOutput output) {
150                 this.output = output;
151             }                        
152             
153             public void close() throws IOException {
154                 flush();
155             }
156             
157             public final void processTuple(String filename) throws IOException {
158                 if (lastFlush) {
159                     lastFlush = false;
160                 }
161                 buffer.processTuple(filename);
162                 if (buffer.isFull())
163                     flush();
164             }
165             public final void flushTuples(int pauseIndex) throws IOException {
166                 
167                 while (buffer.getReadIndex() < pauseIndex) {
168                            
169                     output.writeString(buffer.getFilename());
170                     buffer.incrementTuple();
171                 }
172             }  
173             public void flush() throws IOException { 
174                 flushTuples(buffer.getWriteIndex());
175                 buffer.reset(); 
176                 lastFlush = true;
177             }                           
178         }
179         public static class ShreddedBuffer {
180                             
181             String[] filenames;
182             int writeTupleIndex = 0;
183             int readTupleIndex = 0;
184             int batchSize;
185 
186             public ShreddedBuffer(int batchSize) {
187                 this.batchSize = batchSize;
188 
189                 filenames = new String[batchSize];
190             }                              
191 
192             public ShreddedBuffer() {    
193                 this(10000);
194             }                                                                                                                    
195             
196             public void processTuple(String filename) {
197                 filenames[writeTupleIndex] = filename;
198                 writeTupleIndex++;
199             }
200             public void resetData() {
201                 writeTupleIndex = 0;
202             }                  
203                                  
204             public void resetRead() {
205                 readTupleIndex = 0;
206             } 
207 
208             public void reset() {
209                 resetData();
210                 resetRead();
211             } 
212             public boolean isFull() {
213                 return writeTupleIndex >= batchSize;
214             }
215 
216             public boolean isEmpty() {
217                 return writeTupleIndex == 0;
218             }                          
219 
220             public boolean isAtEnd() {
221                 return readTupleIndex >= writeTupleIndex;
222             }           
223             public void incrementTuple() {
224                 readTupleIndex++;
225             }                    
226             public int getReadIndex() {
227                 return readTupleIndex;
228             }   
229 
230             public int getWriteIndex() {
231                 return writeTupleIndex;
232             } 
233             public String getFilename() {
234                 assert readTupleIndex < writeTupleIndex;
235                 return filenames[readTupleIndex];
236             }                                         
237             public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
238                 while (getReadIndex() < endIndex) {
239                    output.processTuple(getFilename());
240                    incrementTuple();
241                 }
242             }                                                                           
243              
244             public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
245             }
246             
247         }                         
248         public static class ShreddedCombiner implements ReaderSource<FileName>, ShreddedSource {   
249             public ShreddedProcessor processor;
250             Collection<ShreddedReader> readers;       
251             boolean closeOnExit = false;
252             boolean uninitialized = true;
253             PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
254             
255             public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
256                 this.readers = readers;                                                       
257                 this.closeOnExit = closeOnExit;
258             }
259                                   
260             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
261                 if (processor instanceof ShreddedProcessor) {
262                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
263                 } else if (processor instanceof FileName.Processor) {
264                     this.processor = new DuplicateEliminator(new TupleUnshredder((FileName.Processor) processor));
265                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
266                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<FileName>) processor));
267                 } else {
268                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
269                 }
270             }                                
271             
272             public Class<FileName> getOutputClass() {
273                 return FileName.class;
274             }
275             
276             public void initialize() throws IOException {
277                 for (ShreddedReader reader : readers) {
278                     reader.fill();                                        
279                     
280                     if (!reader.getBuffer().isAtEnd())
281                         queue.add(reader);
282                 }   
283 
284                 uninitialized = false;
285             }
286 
287             public void run() throws IOException {
288                 initialize();
289                
290                 while (queue.size() > 0) {
291                     ShreddedReader top = queue.poll();
292                     ShreddedReader next = null;
293                     ShreddedBuffer nextBuffer = null; 
294                     
295                     assert !top.getBuffer().isAtEnd();
296                                                   
297                     if (queue.size() > 0) {
298                         next = queue.peek();
299                         nextBuffer = next.getBuffer();
300                         assert !nextBuffer.isAtEnd();
301                     }
302                     
303                     top.getBuffer().copyUntil(nextBuffer, processor);
304                     if (top.getBuffer().isAtEnd())
305                         top.fill();                 
306                         
307                     if (!top.getBuffer().isAtEnd())
308                         queue.add(top);
309                 }              
310                 
311                 if (closeOnExit)
312                     processor.close();
313             }
314 
315             public FileName read() throws IOException {
316                 if (uninitialized)
317                     initialize();
318 
319                 FileName result = null;
320 
321                 while (queue.size() > 0) {
322                     ShreddedReader top = queue.poll();
323                     result = top.read();
324 
325                     if (result != null) {
326                         if (top.getBuffer().isAtEnd())
327                             top.fill();
328 
329                         queue.offer(top);
330                         break;
331                     } 
332                 }
333 
334                 return result;
335             }
336         } 
337         public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<FileName>, ShreddedSource {      
338             public ShreddedProcessor processor;
339             ShreddedBuffer buffer;
340             FileName last = new FileName();         
341             long tupleCount = 0;
342             long bufferStartCount = 0;  
343             ArrayInput input;
344             
345             public ShreddedReader(ArrayInput input) {
346                 this.input = input; 
347                 this.buffer = new ShreddedBuffer();
348             }                               
349             
350             public ShreddedReader(ArrayInput input, int bufferSize) { 
351                 this.input = input;
352                 this.buffer = new ShreddedBuffer(bufferSize);
353             }
354                  
355             public final int compareTo(ShreddedReader other) {
356                 ShreddedBuffer otherBuffer = other.getBuffer();
357                 
358                 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
359                     return 0;                 
360                 } else if (buffer.isAtEnd()) {
361                     return -1;
362                 } else if (otherBuffer.isAtEnd()) {
363                     return 1;
364                 }
365                                    
366                 int result = 0;
367                 do {
368                 } while (false);                                             
369                 
370                 return result;
371             }
372             
373             public final ShreddedBuffer getBuffer() {
374                 return buffer;
375             }                
376             
377             public final FileName read() throws IOException {
378                 if (buffer.isAtEnd()) {
379                     fill();             
380                 
381                     if (buffer.isAtEnd()) {
382                         return null;
383                     }
384                 }
385                       
386                 assert !buffer.isAtEnd();
387                 FileName result = new FileName();
388                 
389                 result.filename = buffer.getFilename();
390                 
391                 buffer.incrementTuple();
392                 
393                 return result;
394             }           
395             
396             public final void fill() throws IOException {
397                 try {   
398                     buffer.reset();
399                     
400                     if (tupleCount != 0) {
401                         bufferStartCount = tupleCount;
402                     }
403                     
404                     while (!buffer.isFull()) {
405                         buffer.processTuple(input.readString());
406                         tupleCount++;
407                     }
408                 } catch(EOFException e) {}
409             }
410 
411 
412             public void run() throws IOException {
413                 while (true) {
414                     fill();
415                     
416                     if (buffer.isAtEnd())
417                         break;
418                     
419                     buffer.copyUntil(null, processor);
420                 }      
421                 processor.close();
422             }
423             
424             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
425                 if (processor instanceof ShreddedProcessor) {
426                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
427                 } else if (processor instanceof FileName.Processor) {
428                     this.processor = new DuplicateEliminator(new TupleUnshredder((FileName.Processor) processor));
429                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
430                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<FileName>) processor));
431                 } else {
432                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
433                 }
434             }                                
435             
436             public Class<FileName> getOutputClass() {
437                 return FileName.class;
438             }                
439         }
440         
441         public static class DuplicateEliminator implements ShreddedProcessor {
442             public ShreddedProcessor processor;
443             FileName last = new FileName();
444                                            
445             public DuplicateEliminator() {}
446             public DuplicateEliminator(ShreddedProcessor processor) {
447                 this.processor = processor;
448             }
449             
450             public void setShreddedProcessor(ShreddedProcessor processor) {
451                 this.processor = processor;
452             }
453 
454           
455             
456                                
457             public void processTuple(String filename) throws IOException {
458                 processor.processTuple(filename);
459             } 
460             
461             public void close() throws IOException {
462                 processor.close();
463             }                    
464         }
465         public static class TupleUnshredder implements ShreddedProcessor {
466             FileName last = new FileName();
467             public org.galagosearch.tupleflow.Processor<FileName> processor;                               
468             
469             public TupleUnshredder(FileName.Processor processor) {
470                 this.processor = processor;
471             }         
472             
473             public TupleUnshredder(org.galagosearch.tupleflow.Processor<FileName> processor) {
474                 this.processor = processor;
475             }
476             
477             public FileName clone(FileName object) {
478                 FileName result = new FileName();
479                 if (object == null) return result;
480                 result.filename = object.filename; 
481                 return result;
482             }                 
483             
484             
485             public void processTuple(String filename) throws IOException {
486                 last.filename = filename;
487                 processor.process(clone(last));
488             }               
489             
490             public void close() throws IOException {
491                 processor.close();
492             }
493         }     
494         public static class TupleShredder implements Processor {
495             FileName last = new FileName();
496             public ShreddedProcessor processor;
497             
498             public TupleShredder(ShreddedProcessor processor) {
499                 this.processor = processor;
500             }                              
501             
502             public FileName clone(FileName object) {
503                 FileName result = new FileName();
504                 if (object == null) return result;
505                 result.filename = object.filename; 
506                 return result;
507             }                 
508             
509             public void process(FileName object) throws IOException {                                                                                                                                                   
510                 boolean processAll = false;
511                 processor.processTuple(object.filename);                                         
512             }
513                           
514             public Class<FileName> getInputClass() {
515                 return FileName.class;
516             }
517             
518             public void close() throws IOException {
519                 processor.close();
520             }                     
521         }
522     } 
523     public static class FilenameOrder implements Order<FileName> {
524         public int hash(FileName object) {
525             int h = 0;
526             h += Utility.hash(object.filename);
527             return h;
528         } 
529         public Comparator<FileName> greaterThan() {
530             return new Comparator<FileName>() {
531                 public int compare(FileName one, FileName two) {
532                     int result = 0;
533                     do {
534                         result = + Utility.compare(one.filename, two.filename);
535                         if(result != 0) break;
536                     } while (false);
537                     return -result;
538                 }
539             };
540         }     
541         public Comparator<FileName> lessThan() {
542             return new Comparator<FileName>() {
543                 public int compare(FileName one, FileName two) {
544                     int result = 0;
545                     do {
546                         result = + Utility.compare(one.filename, two.filename);
547                         if(result != 0) break;
548                     } while (false);
549                     return result;
550                 }
551             };
552         }     
553         public TypeReader<FileName> orderedReader(ArrayInput _input) {
554             return new ShreddedReader(_input);
555         }    
556 
557         public TypeReader<FileName> orderedReader(ArrayInput _input, int bufferSize) {
558             return new ShreddedReader(_input, bufferSize);
559         }    
560         public OrderedWriter<FileName> orderedWriter(ArrayOutput _output) {
561             ShreddedWriter w = new ShreddedWriter(_output);
562             return new OrderedWriterClass(w); 
563         }                                    
564         public static class OrderedWriterClass extends OrderedWriter< FileName > {
565             FileName last = null;
566             ShreddedWriter shreddedWriter = null; 
567             
568             public OrderedWriterClass(ShreddedWriter s) {
569                 this.shreddedWriter = s;
570             }
571             
572             public void process(FileName object) throws IOException {
573                boolean processAll = false;
574                if (processAll || last == null || 0 != Utility.compare(object.filename, last.filename)) { processAll = true; shreddedWriter.processFilename(object.filename); }
575                shreddedWriter.processTuple();
576                last = object;
577             }           
578                  
579             public void close() throws IOException {
580                 shreddedWriter.close();
581             }
582             
583             public Class<FileName> getInputClass() {
584                 return FileName.class;
585             }
586         } 
587         public ReaderSource<FileName> orderedCombiner(Collection<TypeReader<FileName>> readers, boolean closeOnExit) {
588             ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
589             
590             for (TypeReader<FileName> reader : readers) {
591                 shreddedReaders.add((ShreddedReader)reader);
592             }
593             
594             return new ShreddedCombiner(shreddedReaders, closeOnExit);
595         }                  
596         public FileName clone(FileName object) {
597             FileName result = new FileName();
598             if (object == null) return result;
599             result.filename = object.filename; 
600             return result;
601         }                 
602         public Class<FileName> getOrderedClass() {
603             return FileName.class;
604         }                           
605         public String[] getOrderSpec() {
606             return new String[] {"+filename"};
607         }
608 
609         public static String getSpecString() {
610             return "+filename";
611         }
612                            
613         public interface ShreddedProcessor extends Step {
614             public void processFilename(String filename) throws IOException;
615             public void processTuple() throws IOException;
616             public void close() throws IOException;
617         }    
618         public interface ShreddedSource extends Step {
619         }                                              
620         
621         public static class ShreddedWriter implements ShreddedProcessor {
622             ArrayOutput output;
623             ShreddedBuffer buffer = new ShreddedBuffer();
624             String lastFilename;
625             boolean lastFlush = false;
626             
627             public ShreddedWriter(ArrayOutput output) {
628                 this.output = output;
629             }                        
630             
631             public void close() throws IOException {
632                 flush();
633             }
634             
635             public void processFilename(String filename) {
636                 lastFilename = filename;
637                 buffer.processFilename(filename);
638             }
639             public final void processTuple() throws IOException {
640                 if (lastFlush) {
641                     if(buffer.filenames.size() == 0) buffer.processFilename(lastFilename);
642                     lastFlush = false;
643                 }
644                 buffer.processTuple();
645                 if (buffer.isFull())
646                     flush();
647             }
648             public final void flushTuples(int pauseIndex) throws IOException {
649                 
650                 while (buffer.getReadIndex() < pauseIndex) {
651                            
652                     buffer.incrementTuple();
653                 }
654             }  
655             public final void flushFilename(int pauseIndex) throws IOException {
656                 while (buffer.getReadIndex() < pauseIndex) {
657                     int nextPause = buffer.getFilenameEndIndex();
658                     int count = nextPause - buffer.getReadIndex();
659                     
660                     output.writeString(buffer.getFilename());
661                     output.writeInt(count);
662                     buffer.incrementFilename();
663                       
664                     flushTuples(nextPause);
665                     assert nextPause == buffer.getReadIndex();
666                 }
667             }
668             public void flush() throws IOException { 
669                 flushFilename(buffer.getWriteIndex());
670                 buffer.reset(); 
671                 lastFlush = true;
672             }                           
673         }
674         public static class ShreddedBuffer {
675             ArrayList<String> filenames = new ArrayList();
676             ArrayList<Integer> filenameTupleIdx = new ArrayList();
677             int filenameReadIdx = 0;
678                             
679             int writeTupleIndex = 0;
680             int readTupleIndex = 0;
681             int batchSize;
682 
683             public ShreddedBuffer(int batchSize) {
684                 this.batchSize = batchSize;
685 
686             }                              
687 
688             public ShreddedBuffer() {    
689                 this(10000);
690             }                                                                                                                    
691             
692             public void processFilename(String filename) {
693                 filenames.add(filename);
694                 filenameTupleIdx.add(writeTupleIndex);
695             }                                      
696             public void processTuple() {
697                 assert filenames.size() > 0;
698                 writeTupleIndex++;
699             }
700             public void resetData() {
701                 filenames.clear();
702                 filenameTupleIdx.clear();
703                 writeTupleIndex = 0;
704             }                  
705                                  
706             public void resetRead() {
707                 readTupleIndex = 0;
708                 filenameReadIdx = 0;
709             } 
710 
711             public void reset() {
712                 resetData();
713                 resetRead();
714             } 
715             public boolean isFull() {
716                 return writeTupleIndex >= batchSize;
717             }
718 
719             public boolean isEmpty() {
720                 return writeTupleIndex == 0;
721             }                          
722 
723             public boolean isAtEnd() {
724                 return readTupleIndex >= writeTupleIndex;
725             }           
726             public void incrementFilename() {
727                 filenameReadIdx++;  
728             }                                                                                              
729 
730             public void autoIncrementFilename() {
731                 while (readTupleIndex >= getFilenameEndIndex() && readTupleIndex < writeTupleIndex)
732                     filenameReadIdx++;
733             }                 
734             public void incrementTuple() {
735                 readTupleIndex++;
736             }                    
737             public int getFilenameEndIndex() {
738                 if ((filenameReadIdx+1) >= filenameTupleIdx.size())
739                     return writeTupleIndex;
740                 return filenameTupleIdx.get(filenameReadIdx+1);
741             }
742             public int getReadIndex() {
743                 return readTupleIndex;
744             }   
745 
746             public int getWriteIndex() {
747                 return writeTupleIndex;
748             } 
749             public String getFilename() {
750                 assert readTupleIndex < writeTupleIndex;
751                 assert filenameReadIdx < filenames.size();
752                 
753                 return filenames.get(filenameReadIdx);
754             }
755 
756             public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
757                 while (getReadIndex() < endIndex) {
758                    output.processTuple();
759                    incrementTuple();
760                 }
761             }                                                                           
762             public void copyUntilIndexFilename(int endIndex, ShreddedProcessor output) throws IOException {
763                 while (getReadIndex() < endIndex) {
764                     output.processFilename(getFilename());
765                     assert getFilenameEndIndex() <= endIndex;
766                     copyTuples(getFilenameEndIndex(), output);
767                     incrementFilename();
768                 }
769             }  
770             public void copyUntilFilename(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
771                 while (!isAtEnd()) {
772                     if (other != null) {   
773                         assert !other.isAtEnd();
774                         int c = + Utility.compare(getFilename(), other.getFilename());
775                     
776                         if (c > 0) {
777                             break;   
778                         }
779                         
780                         output.processFilename(getFilename());
781                                       
782                         copyTuples(getFilenameEndIndex(), output);
783                     } else {
784                         output.processFilename(getFilename());
785                         copyTuples(getFilenameEndIndex(), output);
786                     }
787                     incrementFilename();  
788                     
789                
790                 }
791             }
792             public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
793                 copyUntilFilename(other, output);
794             }
795             
796         }                         
797         public static class ShreddedCombiner implements ReaderSource<FileName>, ShreddedSource {   
798             public ShreddedProcessor processor;
799             Collection<ShreddedReader> readers;       
800             boolean closeOnExit = false;
801             boolean uninitialized = true;
802             PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
803             
804             public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
805                 this.readers = readers;                                                       
806                 this.closeOnExit = closeOnExit;
807             }
808                                   
809             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
810                 if (processor instanceof ShreddedProcessor) {
811                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
812                 } else if (processor instanceof FileName.Processor) {
813                     this.processor = new DuplicateEliminator(new TupleUnshredder((FileName.Processor) processor));
814                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
815                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<FileName>) processor));
816                 } else {
817                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
818                 }
819             }                                
820             
821             public Class<FileName> getOutputClass() {
822                 return FileName.class;
823             }
824             
825             public void initialize() throws IOException {
826                 for (ShreddedReader reader : readers) {
827                     reader.fill();                                        
828                     
829                     if (!reader.getBuffer().isAtEnd())
830                         queue.add(reader);
831                 }   
832 
833                 uninitialized = false;
834             }
835 
836             public void run() throws IOException {
837                 initialize();
838                
839                 while (queue.size() > 0) {
840                     ShreddedReader top = queue.poll();
841                     ShreddedReader next = null;
842                     ShreddedBuffer nextBuffer = null; 
843                     
844                     assert !top.getBuffer().isAtEnd();
845                                                   
846                     if (queue.size() > 0) {
847                         next = queue.peek();
848                         nextBuffer = next.getBuffer();
849                         assert !nextBuffer.isAtEnd();
850                     }
851                     
852                     top.getBuffer().copyUntil(nextBuffer, processor);
853                     if (top.getBuffer().isAtEnd())
854                         top.fill();                 
855                         
856                     if (!top.getBuffer().isAtEnd())
857                         queue.add(top);
858                 }              
859                 
860                 if (closeOnExit)
861                     processor.close();
862             }
863 
864             public FileName read() throws IOException {
865                 if (uninitialized)
866                     initialize();
867 
868                 FileName result = null;
869 
870                 while (queue.size() > 0) {
871                     ShreddedReader top = queue.poll();
872                     result = top.read();
873 
874                     if (result != null) {
875                         if (top.getBuffer().isAtEnd())
876                             top.fill();
877 
878                         queue.offer(top);
879                         break;
880                     } 
881                 }
882 
883                 return result;
884             }
885         } 
886         public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<FileName>, ShreddedSource {      
887             public ShreddedProcessor processor;
888             ShreddedBuffer buffer;
889             FileName last = new FileName();         
890             long updateFilenameCount = -1;
891             long tupleCount = 0;
892             long bufferStartCount = 0;  
893             ArrayInput input;
894             
895             public ShreddedReader(ArrayInput input) {
896                 this.input = input; 
897                 this.buffer = new ShreddedBuffer();
898             }                               
899             
900             public ShreddedReader(ArrayInput input, int bufferSize) { 
901                 this.input = input;
902                 this.buffer = new ShreddedBuffer(bufferSize);
903             }
904                  
905             public final int compareTo(ShreddedReader other) {
906                 ShreddedBuffer otherBuffer = other.getBuffer();
907                 
908                 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
909                     return 0;                 
910                 } else if (buffer.isAtEnd()) {
911                     return -1;
912                 } else if (otherBuffer.isAtEnd()) {
913                     return 1;
914                 }
915                                    
916                 int result = 0;
917                 do {
918                     result = + Utility.compare(buffer.getFilename(), otherBuffer.getFilename());
919                     if(result != 0) break;
920                 } while (false);                                             
921                 
922                 return result;
923             }
924             
925             public final ShreddedBuffer getBuffer() {
926                 return buffer;
927             }                
928             
929             public final FileName read() throws IOException {
930                 if (buffer.isAtEnd()) {
931                     fill();             
932                 
933                     if (buffer.isAtEnd()) {
934                         return null;
935                     }
936                 }
937                       
938                 assert !buffer.isAtEnd();
939                 FileName result = new FileName();
940                 
941                 result.filename = buffer.getFilename();
942                 
943                 buffer.incrementTuple();
944                 buffer.autoIncrementFilename();
945                 
946                 return result;
947             }           
948             
949             public final void fill() throws IOException {
950                 try {   
951                     buffer.reset();
952                     
953                     if (tupleCount != 0) {
954                                                       
955                         if(updateFilenameCount - tupleCount > 0) {
956                             buffer.filenames.add(last.filename);
957                             buffer.filenameTupleIdx.add((int) (updateFilenameCount - tupleCount));
958                         }
959                         bufferStartCount = tupleCount;
960                     }
961                     
962                     while (!buffer.isFull()) {
963                         updateFilename();
964                         buffer.processTuple();
965                         tupleCount++;
966                     }
967                 } catch(EOFException e) {}
968             }
969 
970             public final void updateFilename() throws IOException {
971                 if (updateFilenameCount > tupleCount)
972                     return;
973                      
974                 last.filename = input.readString();
975                 updateFilenameCount = tupleCount + input.readInt();
976                                       
977                 buffer.processFilename(last.filename);
978             }
979 
980             public void run() throws IOException {
981                 while (true) {
982                     fill();
983                     
984                     if (buffer.isAtEnd())
985                         break;
986                     
987                     buffer.copyUntil(null, processor);
988                 }      
989                 processor.close();
990             }
991             
992             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
993                 if (processor instanceof ShreddedProcessor) {
994                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
995                 } else if (processor instanceof FileName.Processor) {
996                     this.processor = new DuplicateEliminator(new TupleUnshredder((FileName.Processor) processor));
997                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
998                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<FileName>) processor));
999                 } else {
1000                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
1001                 }
1002             }                                
1003             
1004             public Class<FileName> getOutputClass() {
1005                 return FileName.class;
1006             }                
1007         }
1008         
1009         public static class DuplicateEliminator implements ShreddedProcessor {
1010             public ShreddedProcessor processor;
1011             FileName last = new FileName();
1012             boolean filenameProcess = true;
1013                                            
1014             public DuplicateEliminator() {}
1015             public DuplicateEliminator(ShreddedProcessor processor) {
1016                 this.processor = processor;
1017             }
1018             
1019             public void setShreddedProcessor(ShreddedProcessor processor) {
1020                 this.processor = processor;
1021             }
1022 
1023             public void processFilename(String filename) throws IOException {  
1024                 if (filenameProcess || Utility.compare(filename, last.filename) != 0) {
1025                     last.filename = filename;
1026                     processor.processFilename(filename);
1027                     filenameProcess = false;
1028                 }
1029             }  
1030             
1031             public void resetFilename() {
1032                  filenameProcess = true;
1033             }                                                
1034                                
1035             public void processTuple() throws IOException {
1036                 processor.processTuple();
1037             } 
1038             
1039             public void close() throws IOException {
1040                 processor.close();
1041             }                    
1042         }
1043         public static class TupleUnshredder implements ShreddedProcessor {
1044             FileName last = new FileName();
1045             public org.galagosearch.tupleflow.Processor<FileName> processor;                               
1046             
1047             public TupleUnshredder(FileName.Processor processor) {
1048                 this.processor = processor;
1049             }         
1050             
1051             public TupleUnshredder(org.galagosearch.tupleflow.Processor<FileName> processor) {
1052                 this.processor = processor;
1053             }
1054             
1055             public FileName clone(FileName object) {
1056                 FileName result = new FileName();
1057                 if (object == null) return result;
1058                 result.filename = object.filename; 
1059                 return result;
1060             }                 
1061             
1062             public void processFilename(String filename) throws IOException {
1063                 last.filename = filename;
1064             }   
1065                 
1066             
1067             public void processTuple() throws IOException {
1068                 processor.process(clone(last));
1069             }               
1070             
1071             public void close() throws IOException {
1072                 processor.close();
1073             }
1074         }     
1075         public static class TupleShredder implements Processor {
1076             FileName last = new FileName();
1077             public ShreddedProcessor processor;
1078             
1079             public TupleShredder(ShreddedProcessor processor) {
1080                 this.processor = processor;
1081             }                              
1082             
1083             public FileName clone(FileName object) {
1084                 FileName result = new FileName();
1085                 if (object == null) return result;
1086                 result.filename = object.filename; 
1087                 return result;
1088             }                 
1089             
1090             public void process(FileName object) throws IOException {                                                                                                                                                   
1091                 boolean processAll = false;
1092                 if(last == null || Utility.compare(last.filename, object.filename) != 0 || processAll) { processor.processFilename(object.filename); processAll = true; }
1093                 processor.processTuple();                                         
1094             }
1095                           
1096             public Class<FileName> getInputClass() {
1097                 return FileName.class;
1098             }
1099             
1100             public void close() throws IOException {
1101                 processor.close();
1102             }                     
1103         }
1104     } 
1105 }