View Javadoc

1   // This file was automatically generated with the command: 
2   //     java org.galagosearch.tupleflow.typebuilder.TypeBuilderMojo ...
3   package org.galagosearch.core.types;
4   
5   import org.galagosearch.tupleflow.Utility;
6   import org.galagosearch.tupleflow.ArrayInput;
7   import org.galagosearch.tupleflow.ArrayOutput;
8   import org.galagosearch.tupleflow.Order;   
9   import org.galagosearch.tupleflow.OrderedWriter;
10  import org.galagosearch.tupleflow.Type; 
11  import org.galagosearch.tupleflow.TypeReader;
12  import org.galagosearch.tupleflow.Step; 
13  import org.galagosearch.tupleflow.IncompatibleProcessorException;
14  import org.galagosearch.tupleflow.ReaderSource;
15  import java.io.IOException;             
16  import java.io.EOFException;
17  import java.io.UnsupportedEncodingException;
18  import java.util.ArrayList;
19  import java.util.Arrays;   
20  import java.util.Comparator;
21  import java.util.PriorityQueue;
22  import java.util.Collection;
23  
24  public class DocumentSplit implements Type<DocumentSplit> {
25      public String fileName;
26      public String fileType;
27      public boolean isCompressed;
28      public byte[] startKey;
29      public byte[] endKey; 
30      
31      public DocumentSplit() {}
/**
 * Creates a split with every field supplied by the caller.
 * The two key arrays are stored by reference; no copy is made.
 *
 * @param fileName     value stored in {@link #fileName}
 * @param fileType     value stored in {@link #fileType}
 * @param isCompressed value stored in {@link #isCompressed}
 * @param startKey     value stored in {@link #startKey}
 * @param endKey       value stored in {@link #endKey}
 */
public DocumentSplit(String fileName, String fileType, boolean isCompressed, byte[] startKey, byte[] endKey) {
    this.endKey = endKey;
    this.startKey = startKey;
    this.isCompressed = isCompressed;
    this.fileType = fileType;
    this.fileName = fileName;
}
39      
40      public String toString() {
41          try {
42              return String.format("%s,%s,%b,%s,%s",
43                                     fileName, fileType, isCompressed, new String(startKey, "UTF-8"), new String(endKey, "UTF-8"));
44          } catch(UnsupportedEncodingException e) {
45              throw new RuntimeException("Couldn't convert string to UTF-8.");
46          }
47      } 
48  
49      public Order<DocumentSplit> getOrder(String... spec) {
50          if (Arrays.equals(spec, new String[] {  })) {
51              return new Unordered();
52          }
53          if (Arrays.equals(spec, new String[] { "+fileName", "+startKey" })) {
54              return new FileNameStartKeyOrder();
55          }
56          return null;
57      } 
58        
59      public interface Processor extends Step, org.galagosearch.tupleflow.Processor<DocumentSplit> {
60          public void process(DocumentSplit object) throws IOException;
61          public void close() throws IOException;
62      }                        
63      public interface Source extends Step {
64      }
65      public static class Unordered implements Order<DocumentSplit> {
66          public int hash(DocumentSplit object) {
67              int h = 0;
68              return h;
69          } 
70          public Comparator<DocumentSplit> greaterThan() {
71              return new Comparator<DocumentSplit>() {
72                  public int compare(DocumentSplit one, DocumentSplit two) {
73                      int result = 0;
74                      do {
75                      } while (false);
76                      return -result;
77                  }
78              };
79          }     
80          public Comparator<DocumentSplit> lessThan() {
81              return new Comparator<DocumentSplit>() {
82                  public int compare(DocumentSplit one, DocumentSplit two) {
83                      int result = 0;
84                      do {
85                      } while (false);
86                      return result;
87                  }
88              };
89          }     
90          public TypeReader<DocumentSplit> orderedReader(ArrayInput _input) {
91              return new ShreddedReader(_input);
92          }    
93  
94          public TypeReader<DocumentSplit> orderedReader(ArrayInput _input, int bufferSize) {
95              return new ShreddedReader(_input, bufferSize);
96          }    
97          public OrderedWriter<DocumentSplit> orderedWriter(ArrayOutput _output) {
98              ShreddedWriter w = new ShreddedWriter(_output);
99              return new OrderedWriterClass(w); 
100         }                                    
101         public static class OrderedWriterClass extends OrderedWriter< DocumentSplit > {
102             DocumentSplit last = null;
103             ShreddedWriter shreddedWriter = null; 
104             
105             public OrderedWriterClass(ShreddedWriter s) {
106                 this.shreddedWriter = s;
107             }
108             
109             public void process(DocumentSplit object) throws IOException {
110                boolean processAll = false;
111                shreddedWriter.processTuple(object.fileName, object.fileType, object.isCompressed, object.startKey, object.endKey);
112                last = object;
113             }           
114                  
115             public void close() throws IOException {
116                 shreddedWriter.close();
117             }
118             
119             public Class<DocumentSplit> getInputClass() {
120                 return DocumentSplit.class;
121             }
122         } 
123         public ReaderSource<DocumentSplit> orderedCombiner(Collection<TypeReader<DocumentSplit>> readers, boolean closeOnExit) {
124             ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
125             
126             for (TypeReader<DocumentSplit> reader : readers) {
127                 shreddedReaders.add((ShreddedReader)reader);
128             }
129             
130             return new ShreddedCombiner(shreddedReaders, closeOnExit);
131         }                  
132         public DocumentSplit clone(DocumentSplit object) {
133             DocumentSplit result = new DocumentSplit();
134             if (object == null) return result;
135             result.fileName = object.fileName; 
136             result.fileType = object.fileType; 
137             result.isCompressed = object.isCompressed; 
138             result.startKey = object.startKey; 
139             result.endKey = object.endKey; 
140             return result;
141         }                 
142         public Class<DocumentSplit> getOrderedClass() {
143             return DocumentSplit.class;
144         }                           
145         public String[] getOrderSpec() {
146             return new String[] {};
147         }
148 
149         public static String getSpecString() {
150             return "";
151         }
152                            
153         public interface ShreddedProcessor extends Step {
154             public void processTuple(String fileName, String fileType, boolean isCompressed, byte[] startKey, byte[] endKey) throws IOException;
155             public void close() throws IOException;
156         }    
157         public interface ShreddedSource extends Step {
158         }                                              
159         
160         public static class ShreddedWriter implements ShreddedProcessor {
161             ArrayOutput output;
162             ShreddedBuffer buffer = new ShreddedBuffer();
163             boolean lastFlush = false;
164             
165             public ShreddedWriter(ArrayOutput output) {
166                 this.output = output;
167             }                        
168             
169             public void close() throws IOException {
170                 flush();
171             }
172             
173             public final void processTuple(String fileName, String fileType, boolean isCompressed, byte[] startKey, byte[] endKey) throws IOException {
174                 if (lastFlush) {
175                     lastFlush = false;
176                 }
177                 buffer.processTuple(fileName, fileType, isCompressed, startKey, endKey);
178                 if (buffer.isFull())
179                     flush();
180             }
181             public final void flushTuples(int pauseIndex) throws IOException {
182                 
183                 while (buffer.getReadIndex() < pauseIndex) {
184                            
185                     output.writeString(buffer.getFileName());
186                     output.writeString(buffer.getFileType());
187                     output.writeBoolean(buffer.getIsCompressed());
188                     output.writeBytes(buffer.getStartKey());
189                     output.writeBytes(buffer.getEndKey());
190                     buffer.incrementTuple();
191                 }
192             }  
193             public void flush() throws IOException { 
194                 flushTuples(buffer.getWriteIndex());
195                 buffer.reset(); 
196                 lastFlush = true;
197             }                           
198         }
199         public static class ShreddedBuffer {
200                             
201             String[] fileNames;
202             String[] fileTypes;
203             boolean[] isCompresseds;
204             byte[][] startKeys;
205             byte[][] endKeys;
206             int writeTupleIndex = 0;
207             int readTupleIndex = 0;
208             int batchSize;
209 
210             public ShreddedBuffer(int batchSize) {
211                 this.batchSize = batchSize;
212 
213                 fileNames = new String[batchSize];
214                 fileTypes = new String[batchSize];
215                 isCompresseds = new boolean[batchSize];
216                 startKeys = new byte[batchSize][];
217                 endKeys = new byte[batchSize][];
218             }                              
219 
220             public ShreddedBuffer() {    
221                 this(10000);
222             }                                                                                                                    
223             
224             public void processTuple(String fileName, String fileType, boolean isCompressed, byte[] startKey, byte[] endKey) {
225                 fileNames[writeTupleIndex] = fileName;
226                 fileTypes[writeTupleIndex] = fileType;
227                 isCompresseds[writeTupleIndex] = isCompressed;
228                 startKeys[writeTupleIndex] = startKey;
229                 endKeys[writeTupleIndex] = endKey;
230                 writeTupleIndex++;
231             }
232             public void resetData() {
233                 writeTupleIndex = 0;
234             }                  
235                                  
236             public void resetRead() {
237                 readTupleIndex = 0;
238             } 
239 
240             public void reset() {
241                 resetData();
242                 resetRead();
243             } 
244             public boolean isFull() {
245                 return writeTupleIndex >= batchSize;
246             }
247 
248             public boolean isEmpty() {
249                 return writeTupleIndex == 0;
250             }                          
251 
252             public boolean isAtEnd() {
253                 return readTupleIndex >= writeTupleIndex;
254             }           
255             public void incrementTuple() {
256                 readTupleIndex++;
257             }                    
258             public int getReadIndex() {
259                 return readTupleIndex;
260             }   
261 
262             public int getWriteIndex() {
263                 return writeTupleIndex;
264             } 
265             public String getFileName() {
266                 assert readTupleIndex < writeTupleIndex;
267                 return fileNames[readTupleIndex];
268             }                                         
269             public String getFileType() {
270                 assert readTupleIndex < writeTupleIndex;
271                 return fileTypes[readTupleIndex];
272             }                                         
273             public boolean getIsCompressed() {
274                 assert readTupleIndex < writeTupleIndex;
275                 return isCompresseds[readTupleIndex];
276             }                                         
277             public byte[] getStartKey() {
278                 assert readTupleIndex < writeTupleIndex;
279                 return startKeys[readTupleIndex];
280             }                                         
281             public byte[] getEndKey() {
282                 assert readTupleIndex < writeTupleIndex;
283                 return endKeys[readTupleIndex];
284             }                                         
285             public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
286                 while (getReadIndex() < endIndex) {
287                    output.processTuple(getFileName(), getFileType(), getIsCompressed(), getStartKey(), getEndKey());
288                    incrementTuple();
289                 }
290             }                                                                           
291              
292             public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
293             }
294             
295         }                         
296         public static class ShreddedCombiner implements ReaderSource<DocumentSplit>, ShreddedSource {   
297             public ShreddedProcessor processor;
298             Collection<ShreddedReader> readers;       
299             boolean closeOnExit = false;
300             boolean uninitialized = true;
301             PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
302             
303             public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
304                 this.readers = readers;                                                       
305                 this.closeOnExit = closeOnExit;
306             }
307                                   
308             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
309                 if (processor instanceof ShreddedProcessor) {
310                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
311                 } else if (processor instanceof DocumentSplit.Processor) {
312                     this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentSplit.Processor) processor));
313                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
314                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentSplit>) processor));
315                 } else {
316                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
317                 }
318             }                                
319             
320             public Class<DocumentSplit> getOutputClass() {
321                 return DocumentSplit.class;
322             }
323             
324             public void initialize() throws IOException {
325                 for (ShreddedReader reader : readers) {
326                     reader.fill();                                        
327                     
328                     if (!reader.getBuffer().isAtEnd())
329                         queue.add(reader);
330                 }   
331 
332                 uninitialized = false;
333             }
334 
335             public void run() throws IOException {
336                 initialize();
337                
338                 while (queue.size() > 0) {
339                     ShreddedReader top = queue.poll();
340                     ShreddedReader next = null;
341                     ShreddedBuffer nextBuffer = null; 
342                     
343                     assert !top.getBuffer().isAtEnd();
344                                                   
345                     if (queue.size() > 0) {
346                         next = queue.peek();
347                         nextBuffer = next.getBuffer();
348                         assert !nextBuffer.isAtEnd();
349                     }
350                     
351                     top.getBuffer().copyUntil(nextBuffer, processor);
352                     if (top.getBuffer().isAtEnd())
353                         top.fill();                 
354                         
355                     if (!top.getBuffer().isAtEnd())
356                         queue.add(top);
357                 }              
358                 
359                 if (closeOnExit)
360                     processor.close();
361             }
362 
363             public DocumentSplit read() throws IOException {
364                 if (uninitialized)
365                     initialize();
366 
367                 DocumentSplit result = null;
368 
369                 while (queue.size() > 0) {
370                     ShreddedReader top = queue.poll();
371                     result = top.read();
372 
373                     if (result != null) {
374                         if (top.getBuffer().isAtEnd())
375                             top.fill();
376 
377                         queue.offer(top);
378                         break;
379                     } 
380                 }
381 
382                 return result;
383             }
384         } 
385         public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<DocumentSplit>, ShreddedSource {      
386             public ShreddedProcessor processor;
387             ShreddedBuffer buffer;
388             DocumentSplit last = new DocumentSplit();         
389             long tupleCount = 0;
390             long bufferStartCount = 0;  
391             ArrayInput input;
392             
393             public ShreddedReader(ArrayInput input) {
394                 this.input = input; 
395                 this.buffer = new ShreddedBuffer();
396             }                               
397             
398             public ShreddedReader(ArrayInput input, int bufferSize) { 
399                 this.input = input;
400                 this.buffer = new ShreddedBuffer(bufferSize);
401             }
402                  
403             public final int compareTo(ShreddedReader other) {
404                 ShreddedBuffer otherBuffer = other.getBuffer();
405                 
406                 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
407                     return 0;                 
408                 } else if (buffer.isAtEnd()) {
409                     return -1;
410                 } else if (otherBuffer.isAtEnd()) {
411                     return 1;
412                 }
413                                    
414                 int result = 0;
415                 do {
416                 } while (false);                                             
417                 
418                 return result;
419             }
420             
421             public final ShreddedBuffer getBuffer() {
422                 return buffer;
423             }                
424             
425             public final DocumentSplit read() throws IOException {
426                 if (buffer.isAtEnd()) {
427                     fill();             
428                 
429                     if (buffer.isAtEnd()) {
430                         return null;
431                     }
432                 }
433                       
434                 assert !buffer.isAtEnd();
435                 DocumentSplit result = new DocumentSplit();
436                 
437                 result.fileName = buffer.getFileName();
438                 result.fileType = buffer.getFileType();
439                 result.isCompressed = buffer.getIsCompressed();
440                 result.startKey = buffer.getStartKey();
441                 result.endKey = buffer.getEndKey();
442                 
443                 buffer.incrementTuple();
444                 
445                 return result;
446             }           
447             
448             public final void fill() throws IOException {
449                 try {   
450                     buffer.reset();
451                     
452                     if (tupleCount != 0) {
453                         bufferStartCount = tupleCount;
454                     }
455                     
456                     while (!buffer.isFull()) {
457                         buffer.processTuple(input.readString(), input.readString(), input.readBoolean(), input.readBytes(), input.readBytes());
458                         tupleCount++;
459                     }
460                 } catch(EOFException e) {}
461             }
462 
463 
464             public void run() throws IOException {
465                 while (true) {
466                     fill();
467                     
468                     if (buffer.isAtEnd())
469                         break;
470                     
471                     buffer.copyUntil(null, processor);
472                 }      
473                 processor.close();
474             }
475             
476             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
477                 if (processor instanceof ShreddedProcessor) {
478                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
479                 } else if (processor instanceof DocumentSplit.Processor) {
480                     this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentSplit.Processor) processor));
481                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
482                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentSplit>) processor));
483                 } else {
484                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
485                 }
486             }                                
487             
488             public Class<DocumentSplit> getOutputClass() {
489                 return DocumentSplit.class;
490             }                
491         }
492         
493         public static class DuplicateEliminator implements ShreddedProcessor {
494             public ShreddedProcessor processor;
495             DocumentSplit last = new DocumentSplit();
496                                            
497             public DuplicateEliminator() {}
498             public DuplicateEliminator(ShreddedProcessor processor) {
499                 this.processor = processor;
500             }
501             
502             public void setShreddedProcessor(ShreddedProcessor processor) {
503                 this.processor = processor;
504             }
505 
506           
507             
508                                
509             public void processTuple(String fileName, String fileType, boolean isCompressed, byte[] startKey, byte[] endKey) throws IOException {
510                 processor.processTuple(fileName, fileType, isCompressed, startKey, endKey);
511             } 
512             
513             public void close() throws IOException {
514                 processor.close();
515             }                    
516         }
517         public static class TupleUnshredder implements ShreddedProcessor {
518             DocumentSplit last = new DocumentSplit();
519             public org.galagosearch.tupleflow.Processor<DocumentSplit> processor;                               
520             
521             public TupleUnshredder(DocumentSplit.Processor processor) {
522                 this.processor = processor;
523             }         
524             
525             public TupleUnshredder(org.galagosearch.tupleflow.Processor<DocumentSplit> processor) {
526                 this.processor = processor;
527             }
528             
529             public DocumentSplit clone(DocumentSplit object) {
530                 DocumentSplit result = new DocumentSplit();
531                 if (object == null) return result;
532                 result.fileName = object.fileName; 
533                 result.fileType = object.fileType; 
534                 result.isCompressed = object.isCompressed; 
535                 result.startKey = object.startKey; 
536                 result.endKey = object.endKey; 
537                 return result;
538             }                 
539             
540             
541             public void processTuple(String fileName, String fileType, boolean isCompressed, byte[] startKey, byte[] endKey) throws IOException {
542                 last.fileName = fileName;
543                 last.fileType = fileType;
544                 last.isCompressed = isCompressed;
545                 last.startKey = startKey;
546                 last.endKey = endKey;
547                 processor.process(clone(last));
548             }               
549             
550             public void close() throws IOException {
551                 processor.close();
552             }
553         }     
554         public static class TupleShredder implements Processor {
555             DocumentSplit last = new DocumentSplit();
556             public ShreddedProcessor processor;
557             
558             public TupleShredder(ShreddedProcessor processor) {
559                 this.processor = processor;
560             }                              
561             
562             public DocumentSplit clone(DocumentSplit object) {
563                 DocumentSplit result = new DocumentSplit();
564                 if (object == null) return result;
565                 result.fileName = object.fileName; 
566                 result.fileType = object.fileType; 
567                 result.isCompressed = object.isCompressed; 
568                 result.startKey = object.startKey; 
569                 result.endKey = object.endKey; 
570                 return result;
571             }                 
572             
573             public void process(DocumentSplit object) throws IOException {                                                                                                                                                   
574                 boolean processAll = false;
575                 processor.processTuple(object.fileName, object.fileType, object.isCompressed, object.startKey, object.endKey);                                         
576             }
577                           
578             public Class<DocumentSplit> getInputClass() {
579                 return DocumentSplit.class;
580             }
581             
582             public void close() throws IOException {
583                 processor.close();
584             }                     
585         }
586     } 
587     public static class FileNameStartKeyOrder implements Order<DocumentSplit> {
588         public int hash(DocumentSplit object) {
589             int h = 0;
590             h += Utility.hash(object.fileName);
591             h += Utility.hash(object.startKey);
592             return h;
593         } 
594         public Comparator<DocumentSplit> greaterThan() {
595             return new Comparator<DocumentSplit>() {
596                 public int compare(DocumentSplit one, DocumentSplit two) {
597                     int result = 0;
598                     do {
599                         result = + Utility.compare(one.fileName, two.fileName);
600                         if(result != 0) break;
601                         result = + Utility.compare(one.startKey, two.startKey);
602                         if(result != 0) break;
603                     } while (false);
604                     return -result;
605                 }
606             };
607         }     
608         public Comparator<DocumentSplit> lessThan() {
609             return new Comparator<DocumentSplit>() {
610                 public int compare(DocumentSplit one, DocumentSplit two) {
611                     int result = 0;
612                     do {
613                         result = + Utility.compare(one.fileName, two.fileName);
614                         if(result != 0) break;
615                         result = + Utility.compare(one.startKey, two.startKey);
616                         if(result != 0) break;
617                     } while (false);
618                     return result;
619                 }
620             };
621         }     
622         public TypeReader<DocumentSplit> orderedReader(ArrayInput _input) {
623             return new ShreddedReader(_input);
624         }    
625 
626         public TypeReader<DocumentSplit> orderedReader(ArrayInput _input, int bufferSize) {
627             return new ShreddedReader(_input, bufferSize);
628         }    
629         public OrderedWriter<DocumentSplit> orderedWriter(ArrayOutput _output) {
630             ShreddedWriter w = new ShreddedWriter(_output);
631             return new OrderedWriterClass(w); 
632         }                                    
633         public static class OrderedWriterClass extends OrderedWriter< DocumentSplit > {
634             DocumentSplit last = null;
635             ShreddedWriter shreddedWriter = null; 
636             
637             public OrderedWriterClass(ShreddedWriter s) {
638                 this.shreddedWriter = s;
639             }
640             
641             public void process(DocumentSplit object) throws IOException {
642                boolean processAll = false;
643                if (processAll || last == null || 0 != Utility.compare(object.fileName, last.fileName)) { processAll = true; shreddedWriter.processFileName(object.fileName); }
644                if (processAll || last == null || 0 != Utility.compare(object.startKey, last.startKey)) { processAll = true; shreddedWriter.processStartKey(object.startKey); }
645                shreddedWriter.processTuple(object.fileType, object.isCompressed, object.endKey);
646                last = object;
647             }           
648                  
649             public void close() throws IOException {
650                 shreddedWriter.close();
651             }
652             
653             public Class<DocumentSplit> getInputClass() {
654                 return DocumentSplit.class;
655             }
656         } 
657         public ReaderSource<DocumentSplit> orderedCombiner(Collection<TypeReader<DocumentSplit>> readers, boolean closeOnExit) {
658             ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
659             
660             for (TypeReader<DocumentSplit> reader : readers) {
661                 shreddedReaders.add((ShreddedReader)reader);
662             }
663             
664             return new ShreddedCombiner(shreddedReaders, closeOnExit);
665         }                  
666         public DocumentSplit clone(DocumentSplit object) {
667             DocumentSplit result = new DocumentSplit();
668             if (object == null) return result;
669             result.fileName = object.fileName; 
670             result.fileType = object.fileType; 
671             result.isCompressed = object.isCompressed; 
672             result.startKey = object.startKey; 
673             result.endKey = object.endKey; 
674             return result;
675         }                 
676         public Class<DocumentSplit> getOrderedClass() {
677             return DocumentSplit.class;
678         }                           
679         public String[] getOrderSpec() {
680             return new String[] {"+fileName", "+startKey"};
681         }
682 
683         public static String getSpecString() {
684             return "+fileName +startKey";
685         }
686                            
687         public interface ShreddedProcessor extends Step {
688             public void processFileName(String fileName) throws IOException;
689             public void processStartKey(byte[] startKey) throws IOException;
690             public void processTuple(String fileType, boolean isCompressed, byte[] endKey) throws IOException;
691             public void close() throws IOException;
692         }    
693         public interface ShreddedSource extends Step {
694         }                                              
695         
696         public static class ShreddedWriter implements ShreddedProcessor {
697             ArrayOutput output;
698             ShreddedBuffer buffer = new ShreddedBuffer();
699             String lastFileName;
700             byte[] lastStartKey;
701             boolean lastFlush = false;
702             
703             public ShreddedWriter(ArrayOutput output) {
704                 this.output = output;
705             }                        
706             
707             public void close() throws IOException {
708                 flush();
709             }
710             
711             public void processFileName(String fileName) {
712                 lastFileName = fileName;
713                 buffer.processFileName(fileName);
714             }
715             public void processStartKey(byte[] startKey) {
716                 lastStartKey = startKey;
717                 buffer.processStartKey(startKey);
718             }
719             public final void processTuple(String fileType, boolean isCompressed, byte[] endKey) throws IOException {
720                 if (lastFlush) {
721                     if(buffer.fileNames.size() == 0) buffer.processFileName(lastFileName);
722                     if(buffer.startKeys.size() == 0) buffer.processStartKey(lastStartKey);
723                     lastFlush = false;
724                 }
725                 buffer.processTuple(fileType, isCompressed, endKey);
726                 if (buffer.isFull())
727                     flush();
728             }
729             public final void flushTuples(int pauseIndex) throws IOException {
730                 
731                 while (buffer.getReadIndex() < pauseIndex) {
732                            
733                     output.writeString(buffer.getFileType());
734                     output.writeBoolean(buffer.getIsCompressed());
735                     output.writeBytes(buffer.getEndKey());
736                     buffer.incrementTuple();
737                 }
738             }  
739             public final void flushFileName(int pauseIndex) throws IOException {
740                 while (buffer.getReadIndex() < pauseIndex) {
741                     int nextPause = buffer.getFileNameEndIndex();
742                     int count = nextPause - buffer.getReadIndex();
743                     
744                     output.writeString(buffer.getFileName());
745                     output.writeInt(count);
746                     buffer.incrementFileName();
747                       
748                     flushStartKey(nextPause);
749                     assert nextPause == buffer.getReadIndex();
750                 }
751             }
752             public final void flushStartKey(int pauseIndex) throws IOException {
753                 while (buffer.getReadIndex() < pauseIndex) {
754                     int nextPause = buffer.getStartKeyEndIndex();
755                     int count = nextPause - buffer.getReadIndex();
756                     
757                     output.writeBytes(buffer.getStartKey());
758                     output.writeInt(count);
759                     buffer.incrementStartKey();
760                       
761                     flushTuples(nextPause);
762                     assert nextPause == buffer.getReadIndex();
763                 }
764             }
765             public void flush() throws IOException { 
766                 flushFileName(buffer.getWriteIndex());
767                 buffer.reset(); 
768                 lastFlush = true;
769             }                           
770         }
771         public static class ShreddedBuffer {
772             ArrayList<String> fileNames = new ArrayList();
773             ArrayList<byte[]> startKeys = new ArrayList();
774             ArrayList<Integer> fileNameTupleIdx = new ArrayList();
775             ArrayList<Integer> startKeyTupleIdx = new ArrayList();
776             int fileNameReadIdx = 0;
777             int startKeyReadIdx = 0;
778                             
779             String[] fileTypes;
780             boolean[] isCompresseds;
781             byte[][] endKeys;
782             int writeTupleIndex = 0;
783             int readTupleIndex = 0;
784             int batchSize;
785 
786             public ShreddedBuffer(int batchSize) {
787                 this.batchSize = batchSize;
788 
789                 fileTypes = new String[batchSize];
790                 isCompresseds = new boolean[batchSize];
791                 endKeys = new byte[batchSize][];
792             }                              
793 
794             public ShreddedBuffer() {    
795                 this(10000);
796             }                                                                                                                    
797             
798             public void processFileName(String fileName) {
799                 fileNames.add(fileName);
800                 fileNameTupleIdx.add(writeTupleIndex);
801             }                                      
802             public void processStartKey(byte[] startKey) {
803                 startKeys.add(startKey);
804                 startKeyTupleIdx.add(writeTupleIndex);
805             }                                      
806             public void processTuple(String fileType, boolean isCompressed, byte[] endKey) {
807                 assert fileNames.size() > 0;
808                 assert startKeys.size() > 0;
809                 fileTypes[writeTupleIndex] = fileType;
810                 isCompresseds[writeTupleIndex] = isCompressed;
811                 endKeys[writeTupleIndex] = endKey;
812                 writeTupleIndex++;
813             }
814             public void resetData() {
815                 fileNames.clear();
816                 startKeys.clear();
817                 fileNameTupleIdx.clear();
818                 startKeyTupleIdx.clear();
819                 writeTupleIndex = 0;
820             }                  
821                                  
822             public void resetRead() {
823                 readTupleIndex = 0;
824                 fileNameReadIdx = 0;
825                 startKeyReadIdx = 0;
826             } 
827 
828             public void reset() {
829                 resetData();
830                 resetRead();
831             } 
832             public boolean isFull() {
833                 return writeTupleIndex >= batchSize;
834             }
835 
836             public boolean isEmpty() {
837                 return writeTupleIndex == 0;
838             }                          
839 
840             public boolean isAtEnd() {
841                 return readTupleIndex >= writeTupleIndex;
842             }           
843             public void incrementFileName() {
844                 fileNameReadIdx++;  
845             }                                                                                              
846 
847             public void autoIncrementFileName() {
848                 while (readTupleIndex >= getFileNameEndIndex() && readTupleIndex < writeTupleIndex)
849                     fileNameReadIdx++;
850             }                 
851             public void incrementStartKey() {
852                 startKeyReadIdx++;  
853             }                                                                                              
854 
855             public void autoIncrementStartKey() {
856                 while (readTupleIndex >= getStartKeyEndIndex() && readTupleIndex < writeTupleIndex)
857                     startKeyReadIdx++;
858             }                 
859             public void incrementTuple() {
860                 readTupleIndex++;
861             }                    
862             public int getFileNameEndIndex() {
863                 if ((fileNameReadIdx+1) >= fileNameTupleIdx.size())
864                     return writeTupleIndex;
865                 return fileNameTupleIdx.get(fileNameReadIdx+1);
866             }
867 
868             public int getStartKeyEndIndex() {
869                 if ((startKeyReadIdx+1) >= startKeyTupleIdx.size())
870                     return writeTupleIndex;
871                 return startKeyTupleIdx.get(startKeyReadIdx+1);
872             }
873             public int getReadIndex() {
874                 return readTupleIndex;
875             }   
876 
877             public int getWriteIndex() {
878                 return writeTupleIndex;
879             } 
880             public String getFileName() {
881                 assert readTupleIndex < writeTupleIndex;
882                 assert fileNameReadIdx < fileNames.size();
883                 
884                 return fileNames.get(fileNameReadIdx);
885             }
886             public byte[] getStartKey() {
887                 assert readTupleIndex < writeTupleIndex;
888                 assert startKeyReadIdx < startKeys.size();
889                 
890                 return startKeys.get(startKeyReadIdx);
891             }
892             public String getFileType() {
893                 assert readTupleIndex < writeTupleIndex;
894                 return fileTypes[readTupleIndex];
895             }                                         
896             public boolean getIsCompressed() {
897                 assert readTupleIndex < writeTupleIndex;
898                 return isCompresseds[readTupleIndex];
899             }                                         
900             public byte[] getEndKey() {
901                 assert readTupleIndex < writeTupleIndex;
902                 return endKeys[readTupleIndex];
903             }                                         
904             public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
905                 while (getReadIndex() < endIndex) {
906                    output.processTuple(getFileType(), getIsCompressed(), getEndKey());
907                    incrementTuple();
908                 }
909             }                                                                           
910             public void copyUntilIndexFileName(int endIndex, ShreddedProcessor output) throws IOException {
911                 while (getReadIndex() < endIndex) {
912                     output.processFileName(getFileName());
913                     assert getFileNameEndIndex() <= endIndex;
914                     copyUntilIndexStartKey(getFileNameEndIndex(), output);
915                     incrementFileName();
916                 }
917             } 
918             public void copyUntilIndexStartKey(int endIndex, ShreddedProcessor output) throws IOException {
919                 while (getReadIndex() < endIndex) {
920                     output.processStartKey(getStartKey());
921                     assert getStartKeyEndIndex() <= endIndex;
922                     copyTuples(getStartKeyEndIndex(), output);
923                     incrementStartKey();
924                 }
925             }  
926             public void copyUntilFileName(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
927                 while (!isAtEnd()) {
928                     if (other != null) {   
929                         assert !other.isAtEnd();
930                         int c = + Utility.compare(getFileName(), other.getFileName());
931                     
932                         if (c > 0) {
933                             break;   
934                         }
935                         
936                         output.processFileName(getFileName());
937                                       
938                         if (c < 0) {
939                             copyUntilIndexStartKey(getFileNameEndIndex(), output);
940                         } else if (c == 0) {
941                             copyUntilStartKey(other, output);
942                             autoIncrementFileName();
943                             break;
944                         }
945                     } else {
946                         output.processFileName(getFileName());
947                         copyUntilIndexStartKey(getFileNameEndIndex(), output);
948                     }
949                     incrementFileName();  
950                     
951                
952                 }
953             }
954             public void copyUntilStartKey(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
955                 while (!isAtEnd()) {
956                     if (other != null) {   
957                         assert !other.isAtEnd();
958                         int c = + Utility.compare(getStartKey(), other.getStartKey());
959                     
960                         if (c > 0) {
961                             break;   
962                         }
963                         
964                         output.processStartKey(getStartKey());
965                                       
966                         copyTuples(getStartKeyEndIndex(), output);
967                     } else {
968                         output.processStartKey(getStartKey());
969                         copyTuples(getStartKeyEndIndex(), output);
970                     }
971                     incrementStartKey();  
972                     
973                     if (getFileNameEndIndex() <= readTupleIndex)
974                         break;   
975                 }
976             }
977             public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
978                 copyUntilFileName(other, output);
979             }
980             
981         }                         
982         public static class ShreddedCombiner implements ReaderSource<DocumentSplit>, ShreddedSource {   
983             public ShreddedProcessor processor;
984             Collection<ShreddedReader> readers;       
985             boolean closeOnExit = false;
986             boolean uninitialized = true;
987             PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
988             
989             public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
990                 this.readers = readers;                                                       
991                 this.closeOnExit = closeOnExit;
992             }
993                                   
994             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
995                 if (processor instanceof ShreddedProcessor) {
996                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
997                 } else if (processor instanceof DocumentSplit.Processor) {
998                     this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentSplit.Processor) processor));
999                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
1000                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentSplit>) processor));
1001                 } else {
1002                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
1003                 }
1004             }                                
1005             
1006             public Class<DocumentSplit> getOutputClass() {
1007                 return DocumentSplit.class;
1008             }
1009             
1010             public void initialize() throws IOException {
1011                 for (ShreddedReader reader : readers) {
1012                     reader.fill();                                        
1013                     
1014                     if (!reader.getBuffer().isAtEnd())
1015                         queue.add(reader);
1016                 }   
1017 
1018                 uninitialized = false;
1019             }
1020 
1021             public void run() throws IOException {
1022                 initialize();
1023                
1024                 while (queue.size() > 0) {
1025                     ShreddedReader top = queue.poll();
1026                     ShreddedReader next = null;
1027                     ShreddedBuffer nextBuffer = null; 
1028                     
1029                     assert !top.getBuffer().isAtEnd();
1030                                                   
1031                     if (queue.size() > 0) {
1032                         next = queue.peek();
1033                         nextBuffer = next.getBuffer();
1034                         assert !nextBuffer.isAtEnd();
1035                     }
1036                     
1037                     top.getBuffer().copyUntil(nextBuffer, processor);
1038                     if (top.getBuffer().isAtEnd())
1039                         top.fill();                 
1040                         
1041                     if (!top.getBuffer().isAtEnd())
1042                         queue.add(top);
1043                 }              
1044                 
1045                 if (closeOnExit)
1046                     processor.close();
1047             }
1048 
1049             public DocumentSplit read() throws IOException {
1050                 if (uninitialized)
1051                     initialize();
1052 
1053                 DocumentSplit result = null;
1054 
1055                 while (queue.size() > 0) {
1056                     ShreddedReader top = queue.poll();
1057                     result = top.read();
1058 
1059                     if (result != null) {
1060                         if (top.getBuffer().isAtEnd())
1061                             top.fill();
1062 
1063                         queue.offer(top);
1064                         break;
1065                     } 
1066                 }
1067 
1068                 return result;
1069             }
1070         } 
1071         public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<DocumentSplit>, ShreddedSource {      
1072             public ShreddedProcessor processor;
1073             ShreddedBuffer buffer;
1074             DocumentSplit last = new DocumentSplit();         
1075             long updateFileNameCount = -1;
1076             long updateStartKeyCount = -1;
1077             long tupleCount = 0;
1078             long bufferStartCount = 0;  
1079             ArrayInput input;
1080             
1081             public ShreddedReader(ArrayInput input) {
1082                 this.input = input; 
1083                 this.buffer = new ShreddedBuffer();
1084             }                               
1085             
1086             public ShreddedReader(ArrayInput input, int bufferSize) { 
1087                 this.input = input;
1088                 this.buffer = new ShreddedBuffer(bufferSize);
1089             }
1090                  
1091             public final int compareTo(ShreddedReader other) {
1092                 ShreddedBuffer otherBuffer = other.getBuffer();
1093                 
1094                 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
1095                     return 0;                 
1096                 } else if (buffer.isAtEnd()) {
1097                     return -1;
1098                 } else if (otherBuffer.isAtEnd()) {
1099                     return 1;
1100                 }
1101                                    
1102                 int result = 0;
1103                 do {
1104                     result = + Utility.compare(buffer.getFileName(), otherBuffer.getFileName());
1105                     if(result != 0) break;
1106                     result = + Utility.compare(buffer.getStartKey(), otherBuffer.getStartKey());
1107                     if(result != 0) break;
1108                 } while (false);                                             
1109                 
1110                 return result;
1111             }
1112             
1113             public final ShreddedBuffer getBuffer() {
1114                 return buffer;
1115             }                
1116             
1117             public final DocumentSplit read() throws IOException {
1118                 if (buffer.isAtEnd()) {
1119                     fill();             
1120                 
1121                     if (buffer.isAtEnd()) {
1122                         return null;
1123                     }
1124                 }
1125                       
1126                 assert !buffer.isAtEnd();
1127                 DocumentSplit result = new DocumentSplit();
1128                 
1129                 result.fileName = buffer.getFileName();
1130                 result.startKey = buffer.getStartKey();
1131                 result.fileType = buffer.getFileType();
1132                 result.isCompressed = buffer.getIsCompressed();
1133                 result.endKey = buffer.getEndKey();
1134                 
1135                 buffer.incrementTuple();
1136                 buffer.autoIncrementFileName();
1137                 buffer.autoIncrementStartKey();
1138                 
1139                 return result;
1140             }           
1141             
1142             public final void fill() throws IOException {
1143                 try {   
1144                     buffer.reset();
1145                     
1146                     if (tupleCount != 0) {
1147                                                       
1148                         if(updateFileNameCount - tupleCount > 0) {
1149                             buffer.fileNames.add(last.fileName);
1150                             buffer.fileNameTupleIdx.add((int) (updateFileNameCount - tupleCount));
1151                         }                              
1152                         if(updateStartKeyCount - tupleCount > 0) {
1153                             buffer.startKeys.add(last.startKey);
1154                             buffer.startKeyTupleIdx.add((int) (updateStartKeyCount - tupleCount));
1155                         }
1156                         bufferStartCount = tupleCount;
1157                     }
1158                     
1159                     while (!buffer.isFull()) {
1160                         updateStartKey();
1161                         buffer.processTuple(input.readString(), input.readBoolean(), input.readBytes());
1162                         tupleCount++;
1163                     }
1164                 } catch(EOFException e) {}
1165             }
1166 
1167             public final void updateFileName() throws IOException {
1168                 if (updateFileNameCount > tupleCount)
1169                     return;
1170                      
1171                 last.fileName = input.readString();
1172                 updateFileNameCount = tupleCount + input.readInt();
1173                                       
1174                 buffer.processFileName(last.fileName);
1175             }
1176             public final void updateStartKey() throws IOException {
1177                 if (updateStartKeyCount > tupleCount)
1178                     return;
1179                      
1180                 updateFileName();
1181                 last.startKey = input.readBytes();
1182                 updateStartKeyCount = tupleCount + input.readInt();
1183                                       
1184                 buffer.processStartKey(last.startKey);
1185             }
1186 
1187             public void run() throws IOException {
1188                 while (true) {
1189                     fill();
1190                     
1191                     if (buffer.isAtEnd())
1192                         break;
1193                     
1194                     buffer.copyUntil(null, processor);
1195                 }      
1196                 processor.close();
1197             }
1198             
1199             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
1200                 if (processor instanceof ShreddedProcessor) {
1201                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
1202                 } else if (processor instanceof DocumentSplit.Processor) {
1203                     this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentSplit.Processor) processor));
1204                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
1205                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentSplit>) processor));
1206                 } else {
1207                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
1208                 }
1209             }                                
1210             
1211             public Class<DocumentSplit> getOutputClass() {
1212                 return DocumentSplit.class;
1213             }                
1214         }
1215         
1216         public static class DuplicateEliminator implements ShreddedProcessor {
1217             public ShreddedProcessor processor;
1218             DocumentSplit last = new DocumentSplit();
1219             boolean fileNameProcess = true;
1220             boolean startKeyProcess = true;
1221                                            
1222             public DuplicateEliminator() {}
1223             public DuplicateEliminator(ShreddedProcessor processor) {
1224                 this.processor = processor;
1225             }
1226             
1227             public void setShreddedProcessor(ShreddedProcessor processor) {
1228                 this.processor = processor;
1229             }
1230 
1231             public void processFileName(String fileName) throws IOException {  
1232                 if (fileNameProcess || Utility.compare(fileName, last.fileName) != 0) {
1233                     last.fileName = fileName;
1234                     processor.processFileName(fileName);
1235             resetStartKey();
1236                     fileNameProcess = false;
1237                 }
1238             }
1239             public void processStartKey(byte[] startKey) throws IOException {  
1240                 if (startKeyProcess || Utility.compare(startKey, last.startKey) != 0) {
1241                     last.startKey = startKey;
1242                     processor.processStartKey(startKey);
1243                     startKeyProcess = false;
1244                 }
1245             }  
1246             
1247             public void resetFileName() {
1248                  fileNameProcess = true;
1249             resetStartKey();
1250             }                                                
1251             public void resetStartKey() {
1252                  startKeyProcess = true;
1253             }                                                
1254                                
1255             public void processTuple(String fileType, boolean isCompressed, byte[] endKey) throws IOException {
1256                 processor.processTuple(fileType, isCompressed, endKey);
1257             } 
1258             
1259             public void close() throws IOException {
1260                 processor.close();
1261             }                    
1262         }
1263         public static class TupleUnshredder implements ShreddedProcessor {
1264             DocumentSplit last = new DocumentSplit();
1265             public org.galagosearch.tupleflow.Processor<DocumentSplit> processor;                               
1266             
1267             public TupleUnshredder(DocumentSplit.Processor processor) {
1268                 this.processor = processor;
1269             }         
1270             
1271             public TupleUnshredder(org.galagosearch.tupleflow.Processor<DocumentSplit> processor) {
1272                 this.processor = processor;
1273             }
1274             
1275             public DocumentSplit clone(DocumentSplit object) {
1276                 DocumentSplit result = new DocumentSplit();
1277                 if (object == null) return result;
1278                 result.fileName = object.fileName; 
1279                 result.fileType = object.fileType; 
1280                 result.isCompressed = object.isCompressed; 
1281                 result.startKey = object.startKey; 
1282                 result.endKey = object.endKey; 
1283                 return result;
1284             }                 
1285             
1286             public void processFileName(String fileName) throws IOException {
1287                 last.fileName = fileName;
1288             }   
1289                 
1290             public void processStartKey(byte[] startKey) throws IOException {
1291                 last.startKey = startKey;
1292             }   
1293                 
1294             
1295             public void processTuple(String fileType, boolean isCompressed, byte[] endKey) throws IOException {
1296                 last.fileType = fileType;
1297                 last.isCompressed = isCompressed;
1298                 last.endKey = endKey;
1299                 processor.process(clone(last));
1300             }               
1301             
1302             public void close() throws IOException {
1303                 processor.close();
1304             }
1305         }     
1306         public static class TupleShredder implements Processor {
1307             DocumentSplit last = new DocumentSplit();
1308             public ShreddedProcessor processor;
1309             
1310             public TupleShredder(ShreddedProcessor processor) {
1311                 this.processor = processor;
1312             }                              
1313             
1314             public DocumentSplit clone(DocumentSplit object) {
1315                 DocumentSplit result = new DocumentSplit();
1316                 if (object == null) return result;
1317                 result.fileName = object.fileName; 
1318                 result.fileType = object.fileType; 
1319                 result.isCompressed = object.isCompressed; 
1320                 result.startKey = object.startKey; 
1321                 result.endKey = object.endKey; 
1322                 return result;
1323             }                 
1324             
1325             public void process(DocumentSplit object) throws IOException {                                                                                                                                                   
1326                 boolean processAll = false;
1327                 if(last == null || Utility.compare(last.fileName, object.fileName) != 0 || processAll) { processor.processFileName(object.fileName); processAll = true; }
1328                 if(last == null || Utility.compare(last.startKey, object.startKey) != 0 || processAll) { processor.processStartKey(object.startKey); processAll = true; }
1329                 processor.processTuple(object.fileType, object.isCompressed, object.endKey);                                         
1330             }
1331                           
1332             public Class<DocumentSplit> getInputClass() {
1333                 return DocumentSplit.class;
1334             }
1335             
1336             public void close() throws IOException {
1337                 processor.close();
1338             }                     
1339         }
1340     } 
1341 }