View Javadoc

1   // This file was automatically generated with the command: 
2   //     java org.galagosearch.tupleflow.typebuilder.TypeBuilderMojo ...
3   package org.galagosearch.core.types;
4   
5   import org.galagosearch.tupleflow.Utility;
6   import org.galagosearch.tupleflow.ArrayInput;
7   import org.galagosearch.tupleflow.ArrayOutput;
8   import org.galagosearch.tupleflow.Order;   
9   import org.galagosearch.tupleflow.OrderedWriter;
10  import org.galagosearch.tupleflow.Type; 
11  import org.galagosearch.tupleflow.TypeReader;
12  import org.galagosearch.tupleflow.Step; 
13  import org.galagosearch.tupleflow.IncompatibleProcessorException;
14  import org.galagosearch.tupleflow.ReaderSource;
15  import java.io.IOException;             
16  import java.io.EOFException;
17  import java.io.UnsupportedEncodingException;
18  import java.util.ArrayList;
19  import java.util.Arrays;   
20  import java.util.Comparator;
21  import java.util.PriorityQueue;
22  import java.util.Collection;
23  
24  public class NumberedDocumentData implements Type<NumberedDocumentData> {
25      public String identifier;
26      public String url;
27      public int number;
28      public int textLength; 
29      
30      public NumberedDocumentData() {}
/**
 * Creates an instance with every field supplied by the caller.
 *
 * @param identifier value stored in {@link #identifier}
 * @param url        value stored in {@link #url}
 * @param number     value stored in {@link #number}
 * @param textLength value stored in {@link #textLength}
 */
public NumberedDocumentData(String identifier, String url, int number, int textLength) {
    this.number = number;
    this.textLength = textLength;
    this.identifier = identifier;
    this.url = url;
}
37      
38      public String toString() {
39              return String.format("%s,%s,%d,%d",
40                                     identifier, url, number, textLength);
41      } 
42  
43      public Order<NumberedDocumentData> getOrder(String... spec) {
44          if (Arrays.equals(spec, new String[] {  })) {
45              return new Unordered();
46          }
47          if (Arrays.equals(spec, new String[] { "+number" })) {
48              return new NumberOrder();
49          }
50          return null;
51      } 
52        
53      public interface Processor extends Step, org.galagosearch.tupleflow.Processor<NumberedDocumentData> {
54          public void process(NumberedDocumentData object) throws IOException;
55          public void close() throws IOException;
56      }                        
57      public interface Source extends Step {
58      }
59      public static class Unordered implements Order<NumberedDocumentData> {
60          public int hash(NumberedDocumentData object) {
61              int h = 0;
62              return h;
63          } 
64          public Comparator<NumberedDocumentData> greaterThan() {
65              return new Comparator<NumberedDocumentData>() {
66                  public int compare(NumberedDocumentData one, NumberedDocumentData two) {
67                      int result = 0;
68                      do {
69                      } while (false);
70                      return -result;
71                  }
72              };
73          }     
74          public Comparator<NumberedDocumentData> lessThan() {
75              return new Comparator<NumberedDocumentData>() {
76                  public int compare(NumberedDocumentData one, NumberedDocumentData two) {
77                      int result = 0;
78                      do {
79                      } while (false);
80                      return result;
81                  }
82              };
83          }     
84          public TypeReader<NumberedDocumentData> orderedReader(ArrayInput _input) {
85              return new ShreddedReader(_input);
86          }    
87  
88          public TypeReader<NumberedDocumentData> orderedReader(ArrayInput _input, int bufferSize) {
89              return new ShreddedReader(_input, bufferSize);
90          }    
91          public OrderedWriter<NumberedDocumentData> orderedWriter(ArrayOutput _output) {
92              ShreddedWriter w = new ShreddedWriter(_output);
93              return new OrderedWriterClass(w); 
94          }                                    
95          public static class OrderedWriterClass extends OrderedWriter< NumberedDocumentData > {
96              NumberedDocumentData last = null;
97              ShreddedWriter shreddedWriter = null; 
98              
99              public OrderedWriterClass(ShreddedWriter s) {
100                 this.shreddedWriter = s;
101             }
102             
103             public void process(NumberedDocumentData object) throws IOException {
104                boolean processAll = false;
105                shreddedWriter.processTuple(object.identifier, object.url, object.number, object.textLength);
106                last = object;
107             }           
108                  
109             public void close() throws IOException {
110                 shreddedWriter.close();
111             }
112             
113             public Class<NumberedDocumentData> getInputClass() {
114                 return NumberedDocumentData.class;
115             }
116         } 
117         public ReaderSource<NumberedDocumentData> orderedCombiner(Collection<TypeReader<NumberedDocumentData>> readers, boolean closeOnExit) {
118             ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
119             
120             for (TypeReader<NumberedDocumentData> reader : readers) {
121                 shreddedReaders.add((ShreddedReader)reader);
122             }
123             
124             return new ShreddedCombiner(shreddedReaders, closeOnExit);
125         }                  
126         public NumberedDocumentData clone(NumberedDocumentData object) {
127             NumberedDocumentData result = new NumberedDocumentData();
128             if (object == null) return result;
129             result.identifier = object.identifier; 
130             result.url = object.url; 
131             result.number = object.number; 
132             result.textLength = object.textLength; 
133             return result;
134         }                 
135         public Class<NumberedDocumentData> getOrderedClass() {
136             return NumberedDocumentData.class;
137         }                           
138         public String[] getOrderSpec() {
139             return new String[] {};
140         }
141 
142         public static String getSpecString() {
143             return "";
144         }
145                            
146         public interface ShreddedProcessor extends Step {
147             public void processTuple(String identifier, String url, int number, int textLength) throws IOException;
148             public void close() throws IOException;
149         }    
150         public interface ShreddedSource extends Step {
151         }                                              
152         
153         public static class ShreddedWriter implements ShreddedProcessor {
154             ArrayOutput output;
155             ShreddedBuffer buffer = new ShreddedBuffer();
156             boolean lastFlush = false;
157             
158             public ShreddedWriter(ArrayOutput output) {
159                 this.output = output;
160             }                        
161             
162             public void close() throws IOException {
163                 flush();
164             }
165             
166             public final void processTuple(String identifier, String url, int number, int textLength) throws IOException {
167                 if (lastFlush) {
168                     lastFlush = false;
169                 }
170                 buffer.processTuple(identifier, url, number, textLength);
171                 if (buffer.isFull())
172                     flush();
173             }
174             public final void flushTuples(int pauseIndex) throws IOException {
175                 
176                 while (buffer.getReadIndex() < pauseIndex) {
177                            
178                     output.writeString(buffer.getIdentifier());
179                     output.writeString(buffer.getUrl());
180                     output.writeInt(buffer.getNumber());
181                     output.writeInt(buffer.getTextLength());
182                     buffer.incrementTuple();
183                 }
184             }  
185             public void flush() throws IOException { 
186                 flushTuples(buffer.getWriteIndex());
187                 buffer.reset(); 
188                 lastFlush = true;
189             }                           
190         }
191         public static class ShreddedBuffer {
192                             
193             String[] identifiers;
194             String[] urls;
195             int[] numbers;
196             int[] textLengths;
197             int writeTupleIndex = 0;
198             int readTupleIndex = 0;
199             int batchSize;
200 
201             public ShreddedBuffer(int batchSize) {
202                 this.batchSize = batchSize;
203 
204                 identifiers = new String[batchSize];
205                 urls = new String[batchSize];
206                 numbers = new int[batchSize];
207                 textLengths = new int[batchSize];
208             }                              
209 
210             public ShreddedBuffer() {    
211                 this(10000);
212             }                                                                                                                    
213             
214             public void processTuple(String identifier, String url, int number, int textLength) {
215                 identifiers[writeTupleIndex] = identifier;
216                 urls[writeTupleIndex] = url;
217                 numbers[writeTupleIndex] = number;
218                 textLengths[writeTupleIndex] = textLength;
219                 writeTupleIndex++;
220             }
221             public void resetData() {
222                 writeTupleIndex = 0;
223             }                  
224                                  
225             public void resetRead() {
226                 readTupleIndex = 0;
227             } 
228 
229             public void reset() {
230                 resetData();
231                 resetRead();
232             } 
233             public boolean isFull() {
234                 return writeTupleIndex >= batchSize;
235             }
236 
237             public boolean isEmpty() {
238                 return writeTupleIndex == 0;
239             }                          
240 
241             public boolean isAtEnd() {
242                 return readTupleIndex >= writeTupleIndex;
243             }           
244             public void incrementTuple() {
245                 readTupleIndex++;
246             }                    
247             public int getReadIndex() {
248                 return readTupleIndex;
249             }   
250 
251             public int getWriteIndex() {
252                 return writeTupleIndex;
253             } 
254             public String getIdentifier() {
255                 assert readTupleIndex < writeTupleIndex;
256                 return identifiers[readTupleIndex];
257             }                                         
258             public String getUrl() {
259                 assert readTupleIndex < writeTupleIndex;
260                 return urls[readTupleIndex];
261             }                                         
262             public int getNumber() {
263                 assert readTupleIndex < writeTupleIndex;
264                 return numbers[readTupleIndex];
265             }                                         
266             public int getTextLength() {
267                 assert readTupleIndex < writeTupleIndex;
268                 return textLengths[readTupleIndex];
269             }                                         
270             public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
271                 while (getReadIndex() < endIndex) {
272                    output.processTuple(getIdentifier(), getUrl(), getNumber(), getTextLength());
273                    incrementTuple();
274                 }
275             }                                                                           
276              
277             public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
278             }
279             
280         }                         
281         public static class ShreddedCombiner implements ReaderSource<NumberedDocumentData>, ShreddedSource {   
282             public ShreddedProcessor processor;
283             Collection<ShreddedReader> readers;       
284             boolean closeOnExit = false;
285             boolean uninitialized = true;
286             PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
287             
288             public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
289                 this.readers = readers;                                                       
290                 this.closeOnExit = closeOnExit;
291             }
292                                   
293             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
294                 if (processor instanceof ShreddedProcessor) {
295                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
296                 } else if (processor instanceof NumberedDocumentData.Processor) {
297                     this.processor = new DuplicateEliminator(new TupleUnshredder((NumberedDocumentData.Processor) processor));
298                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
299                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<NumberedDocumentData>) processor));
300                 } else {
301                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
302                 }
303             }                                
304             
305             public Class<NumberedDocumentData> getOutputClass() {
306                 return NumberedDocumentData.class;
307             }
308             
309             public void initialize() throws IOException {
310                 for (ShreddedReader reader : readers) {
311                     reader.fill();                                        
312                     
313                     if (!reader.getBuffer().isAtEnd())
314                         queue.add(reader);
315                 }   
316 
317                 uninitialized = false;
318             }
319 
320             public void run() throws IOException {
321                 initialize();
322                
323                 while (queue.size() > 0) {
324                     ShreddedReader top = queue.poll();
325                     ShreddedReader next = null;
326                     ShreddedBuffer nextBuffer = null; 
327                     
328                     assert !top.getBuffer().isAtEnd();
329                                                   
330                     if (queue.size() > 0) {
331                         next = queue.peek();
332                         nextBuffer = next.getBuffer();
333                         assert !nextBuffer.isAtEnd();
334                     }
335                     
336                     top.getBuffer().copyUntil(nextBuffer, processor);
337                     if (top.getBuffer().isAtEnd())
338                         top.fill();                 
339                         
340                     if (!top.getBuffer().isAtEnd())
341                         queue.add(top);
342                 }              
343                 
344                 if (closeOnExit)
345                     processor.close();
346             }
347 
348             public NumberedDocumentData read() throws IOException {
349                 if (uninitialized)
350                     initialize();
351 
352                 NumberedDocumentData result = null;
353 
354                 while (queue.size() > 0) {
355                     ShreddedReader top = queue.poll();
356                     result = top.read();
357 
358                     if (result != null) {
359                         if (top.getBuffer().isAtEnd())
360                             top.fill();
361 
362                         queue.offer(top);
363                         break;
364                     } 
365                 }
366 
367                 return result;
368             }
369         } 
370         public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<NumberedDocumentData>, ShreddedSource {      
371             public ShreddedProcessor processor;
372             ShreddedBuffer buffer;
373             NumberedDocumentData last = new NumberedDocumentData();         
374             long tupleCount = 0;
375             long bufferStartCount = 0;  
376             ArrayInput input;
377             
378             public ShreddedReader(ArrayInput input) {
379                 this.input = input; 
380                 this.buffer = new ShreddedBuffer();
381             }                               
382             
383             public ShreddedReader(ArrayInput input, int bufferSize) { 
384                 this.input = input;
385                 this.buffer = new ShreddedBuffer(bufferSize);
386             }
387                  
388             public final int compareTo(ShreddedReader other) {
389                 ShreddedBuffer otherBuffer = other.getBuffer();
390                 
391                 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
392                     return 0;                 
393                 } else if (buffer.isAtEnd()) {
394                     return -1;
395                 } else if (otherBuffer.isAtEnd()) {
396                     return 1;
397                 }
398                                    
399                 int result = 0;
400                 do {
401                 } while (false);                                             
402                 
403                 return result;
404             }
405             
406             public final ShreddedBuffer getBuffer() {
407                 return buffer;
408             }                
409             
410             public final NumberedDocumentData read() throws IOException {
411                 if (buffer.isAtEnd()) {
412                     fill();             
413                 
414                     if (buffer.isAtEnd()) {
415                         return null;
416                     }
417                 }
418                       
419                 assert !buffer.isAtEnd();
420                 NumberedDocumentData result = new NumberedDocumentData();
421                 
422                 result.identifier = buffer.getIdentifier();
423                 result.url = buffer.getUrl();
424                 result.number = buffer.getNumber();
425                 result.textLength = buffer.getTextLength();
426                 
427                 buffer.incrementTuple();
428                 
429                 return result;
430             }           
431             
432             public final void fill() throws IOException {
433                 try {   
434                     buffer.reset();
435                     
436                     if (tupleCount != 0) {
437                         bufferStartCount = tupleCount;
438                     }
439                     
440                     while (!buffer.isFull()) {
441                         buffer.processTuple(input.readString(), input.readString(), input.readInt(), input.readInt());
442                         tupleCount++;
443                     }
444                 } catch(EOFException e) {}
445             }
446 
447 
448             public void run() throws IOException {
449                 while (true) {
450                     fill();
451                     
452                     if (buffer.isAtEnd())
453                         break;
454                     
455                     buffer.copyUntil(null, processor);
456                 }      
457                 processor.close();
458             }
459             
460             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
461                 if (processor instanceof ShreddedProcessor) {
462                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
463                 } else if (processor instanceof NumberedDocumentData.Processor) {
464                     this.processor = new DuplicateEliminator(new TupleUnshredder((NumberedDocumentData.Processor) processor));
465                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
466                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<NumberedDocumentData>) processor));
467                 } else {
468                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
469                 }
470             }                                
471             
472             public Class<NumberedDocumentData> getOutputClass() {
473                 return NumberedDocumentData.class;
474             }                
475         }
476         
477         public static class DuplicateEliminator implements ShreddedProcessor {
478             public ShreddedProcessor processor;
479             NumberedDocumentData last = new NumberedDocumentData();
480                                            
481             public DuplicateEliminator() {}
482             public DuplicateEliminator(ShreddedProcessor processor) {
483                 this.processor = processor;
484             }
485             
486             public void setShreddedProcessor(ShreddedProcessor processor) {
487                 this.processor = processor;
488             }
489 
490           
491             
492                                
493             public void processTuple(String identifier, String url, int number, int textLength) throws IOException {
494                 processor.processTuple(identifier, url, number, textLength);
495             } 
496             
497             public void close() throws IOException {
498                 processor.close();
499             }                    
500         }
501         public static class TupleUnshredder implements ShreddedProcessor {
502             NumberedDocumentData last = new NumberedDocumentData();
503             public org.galagosearch.tupleflow.Processor<NumberedDocumentData> processor;                               
504             
505             public TupleUnshredder(NumberedDocumentData.Processor processor) {
506                 this.processor = processor;
507             }         
508             
509             public TupleUnshredder(org.galagosearch.tupleflow.Processor<NumberedDocumentData> processor) {
510                 this.processor = processor;
511             }
512             
513             public NumberedDocumentData clone(NumberedDocumentData object) {
514                 NumberedDocumentData result = new NumberedDocumentData();
515                 if (object == null) return result;
516                 result.identifier = object.identifier; 
517                 result.url = object.url; 
518                 result.number = object.number; 
519                 result.textLength = object.textLength; 
520                 return result;
521             }                 
522             
523             
524             public void processTuple(String identifier, String url, int number, int textLength) throws IOException {
525                 last.identifier = identifier;
526                 last.url = url;
527                 last.number = number;
528                 last.textLength = textLength;
529                 processor.process(clone(last));
530             }               
531             
532             public void close() throws IOException {
533                 processor.close();
534             }
535         }     
536         public static class TupleShredder implements Processor {
537             NumberedDocumentData last = new NumberedDocumentData();
538             public ShreddedProcessor processor;
539             
540             public TupleShredder(ShreddedProcessor processor) {
541                 this.processor = processor;
542             }                              
543             
544             public NumberedDocumentData clone(NumberedDocumentData object) {
545                 NumberedDocumentData result = new NumberedDocumentData();
546                 if (object == null) return result;
547                 result.identifier = object.identifier; 
548                 result.url = object.url; 
549                 result.number = object.number; 
550                 result.textLength = object.textLength; 
551                 return result;
552             }                 
553             
554             public void process(NumberedDocumentData object) throws IOException {                                                                                                                                                   
555                 boolean processAll = false;
556                 processor.processTuple(object.identifier, object.url, object.number, object.textLength);                                         
557             }
558                           
559             public Class<NumberedDocumentData> getInputClass() {
560                 return NumberedDocumentData.class;
561             }
562             
563             public void close() throws IOException {
564                 processor.close();
565             }                     
566         }
567     } 
568     public static class NumberOrder implements Order<NumberedDocumentData> {
569         public int hash(NumberedDocumentData object) {
570             int h = 0;
571             h += Utility.hash(object.number);
572             return h;
573         } 
574         public Comparator<NumberedDocumentData> greaterThan() {
575             return new Comparator<NumberedDocumentData>() {
576                 public int compare(NumberedDocumentData one, NumberedDocumentData two) {
577                     int result = 0;
578                     do {
579                         result = + Utility.compare(one.number, two.number);
580                         if(result != 0) break;
581                     } while (false);
582                     return -result;
583                 }
584             };
585         }     
586         public Comparator<NumberedDocumentData> lessThan() {
587             return new Comparator<NumberedDocumentData>() {
588                 public int compare(NumberedDocumentData one, NumberedDocumentData two) {
589                     int result = 0;
590                     do {
591                         result = + Utility.compare(one.number, two.number);
592                         if(result != 0) break;
593                     } while (false);
594                     return result;
595                 }
596             };
597         }     
598         public TypeReader<NumberedDocumentData> orderedReader(ArrayInput _input) {
599             return new ShreddedReader(_input);
600         }    
601 
602         public TypeReader<NumberedDocumentData> orderedReader(ArrayInput _input, int bufferSize) {
603             return new ShreddedReader(_input, bufferSize);
604         }    
605         public OrderedWriter<NumberedDocumentData> orderedWriter(ArrayOutput _output) {
606             ShreddedWriter w = new ShreddedWriter(_output);
607             return new OrderedWriterClass(w); 
608         }                                    
609         public static class OrderedWriterClass extends OrderedWriter< NumberedDocumentData > {
610             NumberedDocumentData last = null;
611             ShreddedWriter shreddedWriter = null; 
612             
613             public OrderedWriterClass(ShreddedWriter s) {
614                 this.shreddedWriter = s;
615             }
616             
617             public void process(NumberedDocumentData object) throws IOException {
618                boolean processAll = false;
619                if (processAll || last == null || 0 != Utility.compare(object.number, last.number)) { processAll = true; shreddedWriter.processNumber(object.number); }
620                shreddedWriter.processTuple(object.identifier, object.url, object.textLength);
621                last = object;
622             }           
623                  
624             public void close() throws IOException {
625                 shreddedWriter.close();
626             }
627             
628             public Class<NumberedDocumentData> getInputClass() {
629                 return NumberedDocumentData.class;
630             }
631         } 
632         public ReaderSource<NumberedDocumentData> orderedCombiner(Collection<TypeReader<NumberedDocumentData>> readers, boolean closeOnExit) {
633             ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
634             
635             for (TypeReader<NumberedDocumentData> reader : readers) {
636                 shreddedReaders.add((ShreddedReader)reader);
637             }
638             
639             return new ShreddedCombiner(shreddedReaders, closeOnExit);
640         }                  
641         public NumberedDocumentData clone(NumberedDocumentData object) {
642             NumberedDocumentData result = new NumberedDocumentData();
643             if (object == null) return result;
644             result.identifier = object.identifier; 
645             result.url = object.url; 
646             result.number = object.number; 
647             result.textLength = object.textLength; 
648             return result;
649         }                 
650         public Class<NumberedDocumentData> getOrderedClass() {
651             return NumberedDocumentData.class;
652         }                           
653         public String[] getOrderSpec() {
654             return new String[] {"+number"};
655         }
656 
657         public static String getSpecString() {
658             return "+number";
659         }
660                            
661         public interface ShreddedProcessor extends Step {
662             public void processNumber(int number) throws IOException;
663             public void processTuple(String identifier, String url, int textLength) throws IOException;
664             public void close() throws IOException;
665         }    
666         public interface ShreddedSource extends Step {
667         }                                              
668         
669         public static class ShreddedWriter implements ShreddedProcessor {
670             ArrayOutput output;
671             ShreddedBuffer buffer = new ShreddedBuffer();
672             int lastNumber;
673             boolean lastFlush = false;
674             
675             public ShreddedWriter(ArrayOutput output) {
676                 this.output = output;
677             }                        
678             
679             public void close() throws IOException {
680                 flush();
681             }
682             
683             public void processNumber(int number) {
684                 lastNumber = number;
685                 buffer.processNumber(number);
686             }
687             public final void processTuple(String identifier, String url, int textLength) throws IOException {
688                 if (lastFlush) {
689                     if(buffer.numbers.size() == 0) buffer.processNumber(lastNumber);
690                     lastFlush = false;
691                 }
692                 buffer.processTuple(identifier, url, textLength);
693                 if (buffer.isFull())
694                     flush();
695             }
696             public final void flushTuples(int pauseIndex) throws IOException {
697                 
698                 while (buffer.getReadIndex() < pauseIndex) {
699                            
700                     output.writeString(buffer.getIdentifier());
701                     output.writeString(buffer.getUrl());
702                     output.writeInt(buffer.getTextLength());
703                     buffer.incrementTuple();
704                 }
705             }  
706             public final void flushNumber(int pauseIndex) throws IOException {
707                 while (buffer.getReadIndex() < pauseIndex) {
708                     int nextPause = buffer.getNumberEndIndex();
709                     int count = nextPause - buffer.getReadIndex();
710                     
711                     output.writeInt(buffer.getNumber());
712                     output.writeInt(count);
713                     buffer.incrementNumber();
714                       
715                     flushTuples(nextPause);
716                     assert nextPause == buffer.getReadIndex();
717                 }
718             }
719             public void flush() throws IOException { 
720                 flushNumber(buffer.getWriteIndex());
721                 buffer.reset(); 
722                 lastFlush = true;
723             }                           
724         }
725         public static class ShreddedBuffer {
726             ArrayList<Integer> numbers = new ArrayList();
727             ArrayList<Integer> numberTupleIdx = new ArrayList();
728             int numberReadIdx = 0;
729                             
730             String[] identifiers;
731             String[] urls;
732             int[] textLengths;
733             int writeTupleIndex = 0;
734             int readTupleIndex = 0;
735             int batchSize;
736 
737             public ShreddedBuffer(int batchSize) {
738                 this.batchSize = batchSize;
739 
740                 identifiers = new String[batchSize];
741                 urls = new String[batchSize];
742                 textLengths = new int[batchSize];
743             }                              
744 
745             public ShreddedBuffer() {    
746                 this(10000);
747             }                                                                                                                    
748             
749             public void processNumber(int number) {
750                 numbers.add(number);
751                 numberTupleIdx.add(writeTupleIndex);
752             }                                      
753             public void processTuple(String identifier, String url, int textLength) {
754                 assert numbers.size() > 0;
755                 identifiers[writeTupleIndex] = identifier;
756                 urls[writeTupleIndex] = url;
757                 textLengths[writeTupleIndex] = textLength;
758                 writeTupleIndex++;
759             }
760             public void resetData() {
761                 numbers.clear();
762                 numberTupleIdx.clear();
763                 writeTupleIndex = 0;
764             }                  
765                                  
766             public void resetRead() {
767                 readTupleIndex = 0;
768                 numberReadIdx = 0;
769             } 
770 
771             public void reset() {
772                 resetData();
773                 resetRead();
774             } 
775             public boolean isFull() {
776                 return writeTupleIndex >= batchSize;
777             }
778 
779             public boolean isEmpty() {
780                 return writeTupleIndex == 0;
781             }                          
782 
783             public boolean isAtEnd() {
784                 return readTupleIndex >= writeTupleIndex;
785             }           
786             public void incrementNumber() {
787                 numberReadIdx++;  
788             }                                                                                              
789 
790             public void autoIncrementNumber() {
791                 while (readTupleIndex >= getNumberEndIndex() && readTupleIndex < writeTupleIndex)
792                     numberReadIdx++;
793             }                 
794             public void incrementTuple() {
795                 readTupleIndex++;
796             }                    
797             public int getNumberEndIndex() {
798                 if ((numberReadIdx+1) >= numberTupleIdx.size())
799                     return writeTupleIndex;
800                 return numberTupleIdx.get(numberReadIdx+1);
801             }
802             public int getReadIndex() {
803                 return readTupleIndex;
804             }   
805 
806             public int getWriteIndex() {
807                 return writeTupleIndex;
808             } 
809             public int getNumber() {
810                 assert readTupleIndex < writeTupleIndex;
811                 assert numberReadIdx < numbers.size();
812                 
813                 return numbers.get(numberReadIdx);
814             }
815             public String getIdentifier() {
816                 assert readTupleIndex < writeTupleIndex;
817                 return identifiers[readTupleIndex];
818             }                                         
819             public String getUrl() {
820                 assert readTupleIndex < writeTupleIndex;
821                 return urls[readTupleIndex];
822             }                                         
823             public int getTextLength() {
824                 assert readTupleIndex < writeTupleIndex;
825                 return textLengths[readTupleIndex];
826             }                                         
827             public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
828                 while (getReadIndex() < endIndex) {
829                    output.processTuple(getIdentifier(), getUrl(), getTextLength());
830                    incrementTuple();
831                 }
832             }                                                                           
833             public void copyUntilIndexNumber(int endIndex, ShreddedProcessor output) throws IOException {
834                 while (getReadIndex() < endIndex) {
835                     output.processNumber(getNumber());
836                     assert getNumberEndIndex() <= endIndex;
837                     copyTuples(getNumberEndIndex(), output);
838                     incrementNumber();
839                 }
840             }  
841             public void copyUntilNumber(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
842                 while (!isAtEnd()) {
843                     if (other != null) {   
844                         assert !other.isAtEnd();
845                         int c = + Utility.compare(getNumber(), other.getNumber());
846                     
847                         if (c > 0) {
848                             break;   
849                         }
850                         
851                         output.processNumber(getNumber());
852                                       
853                         copyTuples(getNumberEndIndex(), output);
854                     } else {
855                         output.processNumber(getNumber());
856                         copyTuples(getNumberEndIndex(), output);
857                     }
858                     incrementNumber();  
859                     
860                
861                 }
862             }
863             public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
864                 copyUntilNumber(other, output);
865             }
866             
867         }                         
868         public static class ShreddedCombiner implements ReaderSource<NumberedDocumentData>, ShreddedSource {   
869             public ShreddedProcessor processor;
870             Collection<ShreddedReader> readers;       
871             boolean closeOnExit = false;
872             boolean uninitialized = true;
873             PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
874             
875             public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
876                 this.readers = readers;                                                       
877                 this.closeOnExit = closeOnExit;
878             }
879                                   
880             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
881                 if (processor instanceof ShreddedProcessor) {
882                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
883                 } else if (processor instanceof NumberedDocumentData.Processor) {
884                     this.processor = new DuplicateEliminator(new TupleUnshredder((NumberedDocumentData.Processor) processor));
885                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
886                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<NumberedDocumentData>) processor));
887                 } else {
888                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
889                 }
890             }                                
891             
892             public Class<NumberedDocumentData> getOutputClass() {
893                 return NumberedDocumentData.class;
894             }
895             
896             public void initialize() throws IOException {
897                 for (ShreddedReader reader : readers) {
898                     reader.fill();                                        
899                     
900                     if (!reader.getBuffer().isAtEnd())
901                         queue.add(reader);
902                 }   
903 
904                 uninitialized = false;
905             }
906 
907             public void run() throws IOException {
908                 initialize();
909                
910                 while (queue.size() > 0) {
911                     ShreddedReader top = queue.poll();
912                     ShreddedReader next = null;
913                     ShreddedBuffer nextBuffer = null; 
914                     
915                     assert !top.getBuffer().isAtEnd();
916                                                   
917                     if (queue.size() > 0) {
918                         next = queue.peek();
919                         nextBuffer = next.getBuffer();
920                         assert !nextBuffer.isAtEnd();
921                     }
922                     
923                     top.getBuffer().copyUntil(nextBuffer, processor);
924                     if (top.getBuffer().isAtEnd())
925                         top.fill();                 
926                         
927                     if (!top.getBuffer().isAtEnd())
928                         queue.add(top);
929                 }              
930                 
931                 if (closeOnExit)
932                     processor.close();
933             }
934 
935             public NumberedDocumentData read() throws IOException {
936                 if (uninitialized)
937                     initialize();
938 
939                 NumberedDocumentData result = null;
940 
941                 while (queue.size() > 0) {
942                     ShreddedReader top = queue.poll();
943                     result = top.read();
944 
945                     if (result != null) {
946                         if (top.getBuffer().isAtEnd())
947                             top.fill();
948 
949                         queue.offer(top);
950                         break;
951                     } 
952                 }
953 
954                 return result;
955             }
956         } 
957         public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<NumberedDocumentData>, ShreddedSource {      
958             public ShreddedProcessor processor;
959             ShreddedBuffer buffer;
960             NumberedDocumentData last = new NumberedDocumentData();         
961             long updateNumberCount = -1;
962             long tupleCount = 0;
963             long bufferStartCount = 0;  
964             ArrayInput input;
965             
966             public ShreddedReader(ArrayInput input) {
967                 this.input = input; 
968                 this.buffer = new ShreddedBuffer();
969             }                               
970             
971             public ShreddedReader(ArrayInput input, int bufferSize) { 
972                 this.input = input;
973                 this.buffer = new ShreddedBuffer(bufferSize);
974             }
975                  
976             public final int compareTo(ShreddedReader other) {
977                 ShreddedBuffer otherBuffer = other.getBuffer();
978                 
979                 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
980                     return 0;                 
981                 } else if (buffer.isAtEnd()) {
982                     return -1;
983                 } else if (otherBuffer.isAtEnd()) {
984                     return 1;
985                 }
986                                    
987                 int result = 0;
988                 do {
989                     result = + Utility.compare(buffer.getNumber(), otherBuffer.getNumber());
990                     if(result != 0) break;
991                 } while (false);                                             
992                 
993                 return result;
994             }
995             
996             public final ShreddedBuffer getBuffer() {
997                 return buffer;
998             }                
999             
1000             public final NumberedDocumentData read() throws IOException {
1001                 if (buffer.isAtEnd()) {
1002                     fill();             
1003                 
1004                     if (buffer.isAtEnd()) {
1005                         return null;
1006                     }
1007                 }
1008                       
1009                 assert !buffer.isAtEnd();
1010                 NumberedDocumentData result = new NumberedDocumentData();
1011                 
1012                 result.number = buffer.getNumber();
1013                 result.identifier = buffer.getIdentifier();
1014                 result.url = buffer.getUrl();
1015                 result.textLength = buffer.getTextLength();
1016                 
1017                 buffer.incrementTuple();
1018                 buffer.autoIncrementNumber();
1019                 
1020                 return result;
1021             }           
1022             
1023             public final void fill() throws IOException {
1024                 try {   
1025                     buffer.reset();
1026                     
1027                     if (tupleCount != 0) {
1028                                                       
1029                         if(updateNumberCount - tupleCount > 0) {
1030                             buffer.numbers.add(last.number);
1031                             buffer.numberTupleIdx.add((int) (updateNumberCount - tupleCount));
1032                         }
1033                         bufferStartCount = tupleCount;
1034                     }
1035                     
1036                     while (!buffer.isFull()) {
1037                         updateNumber();
1038                         buffer.processTuple(input.readString(), input.readString(), input.readInt());
1039                         tupleCount++;
1040                     }
1041                 } catch(EOFException e) {}
1042             }
1043 
1044             public final void updateNumber() throws IOException {
1045                 if (updateNumberCount > tupleCount)
1046                     return;
1047                      
1048                 last.number = input.readInt();
1049                 updateNumberCount = tupleCount + input.readInt();
1050                                       
1051                 buffer.processNumber(last.number);
1052             }
1053 
1054             public void run() throws IOException {
1055                 while (true) {
1056                     fill();
1057                     
1058                     if (buffer.isAtEnd())
1059                         break;
1060                     
1061                     buffer.copyUntil(null, processor);
1062                 }      
1063                 processor.close();
1064             }
1065             
1066             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
1067                 if (processor instanceof ShreddedProcessor) {
1068                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
1069                 } else if (processor instanceof NumberedDocumentData.Processor) {
1070                     this.processor = new DuplicateEliminator(new TupleUnshredder((NumberedDocumentData.Processor) processor));
1071                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
1072                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<NumberedDocumentData>) processor));
1073                 } else {
1074                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
1075                 }
1076             }                                
1077             
1078             public Class<NumberedDocumentData> getOutputClass() {
1079                 return NumberedDocumentData.class;
1080             }                
1081         }
1082         
1083         public static class DuplicateEliminator implements ShreddedProcessor {
1084             public ShreddedProcessor processor;
1085             NumberedDocumentData last = new NumberedDocumentData();
1086             boolean numberProcess = true;
1087                                            
1088             public DuplicateEliminator() {}
1089             public DuplicateEliminator(ShreddedProcessor processor) {
1090                 this.processor = processor;
1091             }
1092             
1093             public void setShreddedProcessor(ShreddedProcessor processor) {
1094                 this.processor = processor;
1095             }
1096 
1097             public void processNumber(int number) throws IOException {  
1098                 if (numberProcess || Utility.compare(number, last.number) != 0) {
1099                     last.number = number;
1100                     processor.processNumber(number);
1101                     numberProcess = false;
1102                 }
1103             }  
1104             
1105             public void resetNumber() {
1106                  numberProcess = true;
1107             }                                                
1108                                
1109             public void processTuple(String identifier, String url, int textLength) throws IOException {
1110                 processor.processTuple(identifier, url, textLength);
1111             } 
1112             
1113             public void close() throws IOException {
1114                 processor.close();
1115             }                    
1116         }
1117         public static class TupleUnshredder implements ShreddedProcessor {
1118             NumberedDocumentData last = new NumberedDocumentData();
1119             public org.galagosearch.tupleflow.Processor<NumberedDocumentData> processor;                               
1120             
1121             public TupleUnshredder(NumberedDocumentData.Processor processor) {
1122                 this.processor = processor;
1123             }         
1124             
1125             public TupleUnshredder(org.galagosearch.tupleflow.Processor<NumberedDocumentData> processor) {
1126                 this.processor = processor;
1127             }
1128             
1129             public NumberedDocumentData clone(NumberedDocumentData object) {
1130                 NumberedDocumentData result = new NumberedDocumentData();
1131                 if (object == null) return result;
1132                 result.identifier = object.identifier; 
1133                 result.url = object.url; 
1134                 result.number = object.number; 
1135                 result.textLength = object.textLength; 
1136                 return result;
1137             }                 
1138             
1139             public void processNumber(int number) throws IOException {
1140                 last.number = number;
1141             }   
1142                 
1143             
1144             public void processTuple(String identifier, String url, int textLength) throws IOException {
1145                 last.identifier = identifier;
1146                 last.url = url;
1147                 last.textLength = textLength;
1148                 processor.process(clone(last));
1149             }               
1150             
1151             public void close() throws IOException {
1152                 processor.close();
1153             }
1154         }     
1155         public static class TupleShredder implements Processor {
1156             NumberedDocumentData last = new NumberedDocumentData();
1157             public ShreddedProcessor processor;
1158             
1159             public TupleShredder(ShreddedProcessor processor) {
1160                 this.processor = processor;
1161             }                              
1162             
1163             public NumberedDocumentData clone(NumberedDocumentData object) {
1164                 NumberedDocumentData result = new NumberedDocumentData();
1165                 if (object == null) return result;
1166                 result.identifier = object.identifier; 
1167                 result.url = object.url; 
1168                 result.number = object.number; 
1169                 result.textLength = object.textLength; 
1170                 return result;
1171             }                 
1172             
1173             public void process(NumberedDocumentData object) throws IOException {                                                                                                                                                   
1174                 boolean processAll = false;
1175                 if(last == null || Utility.compare(last.number, object.number) != 0 || processAll) { processor.processNumber(object.number); processAll = true; }
1176                 processor.processTuple(object.identifier, object.url, object.textLength);                                         
1177             }
1178                           
1179             public Class<NumberedDocumentData> getInputClass() {
1180                 return NumberedDocumentData.class;
1181             }
1182             
1183             public void close() throws IOException {
1184                 processor.close();
1185             }                     
1186         }
1187     } 
1188 }