View Javadoc

1   // This file was automatically generated with the command: 
2   //     java org.galagosearch.tupleflow.typebuilder.TypeBuilderMojo ...
3   package org.galagosearch.core.types;
4   
5   import org.galagosearch.tupleflow.Utility;
6   import org.galagosearch.tupleflow.ArrayInput;
7   import org.galagosearch.tupleflow.ArrayOutput;
8   import org.galagosearch.tupleflow.Order;   
9   import org.galagosearch.tupleflow.OrderedWriter;
10  import org.galagosearch.tupleflow.Type; 
11  import org.galagosearch.tupleflow.TypeReader;
12  import org.galagosearch.tupleflow.Step; 
13  import org.galagosearch.tupleflow.IncompatibleProcessorException;
14  import org.galagosearch.tupleflow.ReaderSource;
15  import java.io.IOException;             
16  import java.io.EOFException;
17  import java.io.UnsupportedEncodingException;
18  import java.util.ArrayList;
19  import java.util.Arrays;   
20  import java.util.Comparator;
21  import java.util.PriorityQueue;
22  import java.util.Collection;
23  
24  public class DocumentNumberWordInteger implements Type<DocumentNumberWordInteger> {
25      public byte[] word;      // raw bytes of the word; toString() decodes them as UTF-8
26      public int document;     // integer document field (presumably a document number - confirm with the producers of this type)
27      public int value;        // integer payload carried with the (word, document) pair; its meaning is not visible in this file
28      
29      public DocumentNumberWordInteger() {}
30      public DocumentNumberWordInteger(byte[] word, int document, int value) {
31          this.word = word;
32          this.document = document;
33          this.value = value;
34      }  
35      
36      public String toString() {
37          try {
38              return String.format("%s,%d,%d",
39                                     new String(word, "UTF-8"), document, value);
40          } catch(UnsupportedEncodingException e) {
41              throw new RuntimeException("Couldn't convert string to UTF-8.");
42          }
43      } 
44  
45      public Order<DocumentNumberWordInteger> getOrder(String... spec) {
46          if (Arrays.equals(spec, new String[] {  })) {
47              return new Unordered();
48          }
49          if (Arrays.equals(spec, new String[] { "+word", "+document" })) {
50              return new WordDocumentOrder();
51          }
52          if (Arrays.equals(spec, new String[] { "+document" })) {
53              return new DocumentOrder();
54          }
55          if (Arrays.equals(spec, new String[] { "+value" })) {
56              return new ValueOrder();
57          }
58          return null;
59      } 
60        
61      public interface Processor extends Step, org.galagosearch.tupleflow.Processor<DocumentNumberWordInteger> {
62          public void process(DocumentNumberWordInteger object) throws IOException;
63          public void close() throws IOException;
64      }                        
65      public interface Source extends Step { // marker interface: adds no members beyond Step
66      }
67      public static class Unordered implements Order<DocumentNumberWordInteger> {
68          public int hash(DocumentNumberWordInteger object) {
69              int h = 0;
70              return h;
71          } 
72          public Comparator<DocumentNumberWordInteger> greaterThan() {
73              return new Comparator<DocumentNumberWordInteger>() {
74                  public int compare(DocumentNumberWordInteger one, DocumentNumberWordInteger two) {
75                      int result = 0;
76                      do {
77                      } while (false);
78                      return -result;
79                  }
80              };
81          }     
82          public Comparator<DocumentNumberWordInteger> lessThan() {
83              return new Comparator<DocumentNumberWordInteger>() {
84                  public int compare(DocumentNumberWordInteger one, DocumentNumberWordInteger two) {
85                      int result = 0;
86                      do {
87                      } while (false);
88                      return result;
89                  }
90              };
91          }     
92          public TypeReader<DocumentNumberWordInteger> orderedReader(ArrayInput _input) {
93              return new ShreddedReader(_input);
94          }    
95  
96          public TypeReader<DocumentNumberWordInteger> orderedReader(ArrayInput _input, int bufferSize) {
97              return new ShreddedReader(_input, bufferSize);
98          }    
99          public OrderedWriter<DocumentNumberWordInteger> orderedWriter(ArrayOutput _output) {
100             ShreddedWriter w = new ShreddedWriter(_output);
101             return new OrderedWriterClass(w); 
102         }                                    
103         public static class OrderedWriterClass extends OrderedWriter< DocumentNumberWordInteger > {
104             DocumentNumberWordInteger last = null;
105             ShreddedWriter shreddedWriter = null; 
106             
107             public OrderedWriterClass(ShreddedWriter s) {
108                 this.shreddedWriter = s;
109             }
110             
111             public void process(DocumentNumberWordInteger object) throws IOException {
112                boolean processAll = false;
113                shreddedWriter.processTuple(object.word, object.document, object.value);
114                last = object;
115             }           
116                  
117             public void close() throws IOException {
118                 shreddedWriter.close();
119             }
120             
121             public Class<DocumentNumberWordInteger> getInputClass() {
122                 return DocumentNumberWordInteger.class;
123             }
124         } 
125         public ReaderSource<DocumentNumberWordInteger> orderedCombiner(Collection<TypeReader<DocumentNumberWordInteger>> readers, boolean closeOnExit) {
126             ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
127             
128             for (TypeReader<DocumentNumberWordInteger> reader : readers) {
129                 shreddedReaders.add((ShreddedReader)reader);
130             }
131             
132             return new ShreddedCombiner(shreddedReaders, closeOnExit);
133         }                  
134         public DocumentNumberWordInteger clone(DocumentNumberWordInteger object) {
135             DocumentNumberWordInteger result = new DocumentNumberWordInteger();
136             if (object == null) return result;
137             result.word = object.word; 
138             result.document = object.document; 
139             result.value = object.value; 
140             return result;
141         }                 
142         public Class<DocumentNumberWordInteger> getOrderedClass() {
143             return DocumentNumberWordInteger.class;
144         }                           
145         public String[] getOrderSpec() {
146             return new String[] {};
147         }
148 
149         public static String getSpecString() {
150             return "";
151         }
152                            
153         public interface ShreddedProcessor extends Step {
154             public void processTuple(byte[] word, int document, int value) throws IOException;
155             public void close() throws IOException;
156         }    
157         public interface ShreddedSource extends Step { // marker interface: adds no members beyond Step
158         }                                              
159         
160         public static class ShreddedWriter implements ShreddedProcessor {
161             ArrayOutput output;
162             ShreddedBuffer buffer = new ShreddedBuffer();
163             boolean lastFlush = false;
164             
165             public ShreddedWriter(ArrayOutput output) {
166                 this.output = output;
167             }                        
168             
169             public void close() throws IOException {
170                 flush();
171             }
172             
173             public final void processTuple(byte[] word, int document, int value) throws IOException {
174                 if (lastFlush) {
175                     lastFlush = false;
176                 }
177                 buffer.processTuple(word, document, value);
178                 if (buffer.isFull())
179                     flush();
180             }
181             public final void flushTuples(int pauseIndex) throws IOException {
182                 
183                 while (buffer.getReadIndex() < pauseIndex) {
184                            
185                     output.writeBytes(buffer.getWord());
186                     output.writeInt(buffer.getDocument());
187                     output.writeInt(buffer.getValue());
188                     buffer.incrementTuple();
189                 }
190             }  
191             public void flush() throws IOException { 
192                 flushTuples(buffer.getWriteIndex());
193                 buffer.reset(); 
194                 lastFlush = true;
195             }                           
196         }
197         public static class ShreddedBuffer {
198                             
199             byte[][] words;
200             int[] documents;
201             int[] values;
202             int writeTupleIndex = 0;
203             int readTupleIndex = 0;
204             int batchSize;
205 
206             public ShreddedBuffer(int batchSize) {
207                 this.batchSize = batchSize;
208 
209                 words = new byte[batchSize][];
210                 documents = new int[batchSize];
211                 values = new int[batchSize];
212             }                              
213 
214             public ShreddedBuffer() {    
215                 this(10000);
216             }                                                                                                                    
217             
218             public void processTuple(byte[] word, int document, int value) {
219                 words[writeTupleIndex] = word;
220                 documents[writeTupleIndex] = document;
221                 values[writeTupleIndex] = value;
222                 writeTupleIndex++;
223             }
224             public void resetData() {
225                 writeTupleIndex = 0;
226             }                  
227                                  
228             public void resetRead() {
229                 readTupleIndex = 0;
230             } 
231 
232             public void reset() {
233                 resetData();
234                 resetRead();
235             } 
236             public boolean isFull() {
237                 return writeTupleIndex >= batchSize;
238             }
239 
240             public boolean isEmpty() {
241                 return writeTupleIndex == 0;
242             }                          
243 
244             public boolean isAtEnd() {
245                 return readTupleIndex >= writeTupleIndex;
246             }           
247             public void incrementTuple() {
248                 readTupleIndex++;
249             }                    
250             public int getReadIndex() {
251                 return readTupleIndex;
252             }   
253 
254             public int getWriteIndex() {
255                 return writeTupleIndex;
256             } 
257             public byte[] getWord() {
258                 assert readTupleIndex < writeTupleIndex;
259                 return words[readTupleIndex];
260             }                                         
261             public int getDocument() {
262                 assert readTupleIndex < writeTupleIndex;
263                 return documents[readTupleIndex];
264             }                                         
265             public int getValue() {
266                 assert readTupleIndex < writeTupleIndex;
267                 return values[readTupleIndex];
268             }                                         
269             public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
270                 while (getReadIndex() < endIndex) {
271                    output.processTuple(getWord(), getDocument(), getValue());
272                    incrementTuple();
273                 }
274             }                                                                           
275              
276             public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
277             }
278             
279         }                         
280         public static class ShreddedCombiner implements ReaderSource<DocumentNumberWordInteger>, ShreddedSource {   
281             public ShreddedProcessor processor;
282             Collection<ShreddedReader> readers;       
283             boolean closeOnExit = false;
284             boolean uninitialized = true;
285             PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
286             
287             public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
288                 this.readers = readers;                                                       
289                 this.closeOnExit = closeOnExit;
290             }
291                                   
292             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
293                 if (processor instanceof ShreddedProcessor) {
294                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
295                 } else if (processor instanceof DocumentNumberWordInteger.Processor) {
296                     this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentNumberWordInteger.Processor) processor));
297                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
298                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentNumberWordInteger>) processor));
299                 } else {
300                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
301                 }
302             }                                
303             
304             public Class<DocumentNumberWordInteger> getOutputClass() {
305                 return DocumentNumberWordInteger.class;
306             }
307             
308             public void initialize() throws IOException {
309                 for (ShreddedReader reader : readers) {
310                     reader.fill();                                        
311                     
312                     if (!reader.getBuffer().isAtEnd())
313                         queue.add(reader);
314                 }   
315 
316                 uninitialized = false;
317             }
318 
319             public void run() throws IOException {
320                 initialize();
321                
322                 while (queue.size() > 0) {
323                     ShreddedReader top = queue.poll();
324                     ShreddedReader next = null;
325                     ShreddedBuffer nextBuffer = null; 
326                     
327                     assert !top.getBuffer().isAtEnd();
328                                                   
329                     if (queue.size() > 0) {
330                         next = queue.peek();
331                         nextBuffer = next.getBuffer();
332                         assert !nextBuffer.isAtEnd();
333                     }
334                     
335                     top.getBuffer().copyUntil(nextBuffer, processor);
336                     if (top.getBuffer().isAtEnd())
337                         top.fill();                 
338                         
339                     if (!top.getBuffer().isAtEnd())
340                         queue.add(top);
341                 }              
342                 
343                 if (closeOnExit)
344                     processor.close();
345             }
346 
347             public DocumentNumberWordInteger read() throws IOException {
348                 if (uninitialized)
349                     initialize();
350 
351                 DocumentNumberWordInteger result = null;
352 
353                 while (queue.size() > 0) {
354                     ShreddedReader top = queue.poll();
355                     result = top.read();
356 
357                     if (result != null) {
358                         if (top.getBuffer().isAtEnd())
359                             top.fill();
360 
361                         queue.offer(top);
362                         break;
363                     } 
364                 }
365 
366                 return result;
367             }
368         } 
369         public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<DocumentNumberWordInteger>, ShreddedSource {      
370             public ShreddedProcessor processor;
371             ShreddedBuffer buffer;
372             DocumentNumberWordInteger last = new DocumentNumberWordInteger();         
373             long tupleCount = 0;
374             long bufferStartCount = 0;  
375             ArrayInput input;
376             
377             public ShreddedReader(ArrayInput input) {
378                 this.input = input; 
379                 this.buffer = new ShreddedBuffer();
380             }                               
381             
382             public ShreddedReader(ArrayInput input, int bufferSize) { 
383                 this.input = input;
384                 this.buffer = new ShreddedBuffer(bufferSize);
385             }
386                  
387             public final int compareTo(ShreddedReader other) {
388                 ShreddedBuffer otherBuffer = other.getBuffer();
389                 
390                 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
391                     return 0;                 
392                 } else if (buffer.isAtEnd()) {
393                     return -1;
394                 } else if (otherBuffer.isAtEnd()) {
395                     return 1;
396                 }
397                                    
398                 int result = 0;
399                 do {
400                 } while (false);                                             
401                 
402                 return result;
403             }
404             
405             public final ShreddedBuffer getBuffer() {
406                 return buffer;
407             }                
408             
409             public final DocumentNumberWordInteger read() throws IOException {
410                 if (buffer.isAtEnd()) {
411                     fill();             
412                 
413                     if (buffer.isAtEnd()) {
414                         return null;
415                     }
416                 }
417                       
418                 assert !buffer.isAtEnd();
419                 DocumentNumberWordInteger result = new DocumentNumberWordInteger();
420                 
421                 result.word = buffer.getWord();
422                 result.document = buffer.getDocument();
423                 result.value = buffer.getValue();
424                 
425                 buffer.incrementTuple();
426                 
427                 return result;
428             }           
429             
430             public final void fill() throws IOException {
431                 try {   
432                     buffer.reset();
433                     
434                     if (tupleCount != 0) {
435                         bufferStartCount = tupleCount;
436                     }
437                     
438                     while (!buffer.isFull()) {
439                         buffer.processTuple(input.readBytes(), input.readInt(), input.readInt());
440                         tupleCount++;
441                     }
442                 } catch(EOFException e) {}
443             }
444 
445 
446             public void run() throws IOException {
447                 while (true) {
448                     fill();
449                     
450                     if (buffer.isAtEnd())
451                         break;
452                     
453                     buffer.copyUntil(null, processor);
454                 }      
455                 processor.close();
456             }
457             
458             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
459                 if (processor instanceof ShreddedProcessor) {
460                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
461                 } else if (processor instanceof DocumentNumberWordInteger.Processor) {
462                     this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentNumberWordInteger.Processor) processor));
463                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
464                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentNumberWordInteger>) processor));
465                 } else {
466                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
467                 }
468             }                                
469             
470             public Class<DocumentNumberWordInteger> getOutputClass() {
471                 return DocumentNumberWordInteger.class;
472             }                
473         }
474         
475         public static class DuplicateEliminator implements ShreddedProcessor {
476             public ShreddedProcessor processor;
477             DocumentNumberWordInteger last = new DocumentNumberWordInteger();
478                                            
479             public DuplicateEliminator() {}
480             public DuplicateEliminator(ShreddedProcessor processor) {
481                 this.processor = processor;
482             }
483             
484             public void setShreddedProcessor(ShreddedProcessor processor) {
485                 this.processor = processor;
486             }
487 
488           
489             
490                                
491             public void processTuple(byte[] word, int document, int value) throws IOException {
492                 processor.processTuple(word, document, value);
493             } 
494             
495             public void close() throws IOException {
496                 processor.close();
497             }                    
498         }
499         public static class TupleUnshredder implements ShreddedProcessor {
500             DocumentNumberWordInteger last = new DocumentNumberWordInteger();
501             public org.galagosearch.tupleflow.Processor<DocumentNumberWordInteger> processor;                               
502             
503             public TupleUnshredder(DocumentNumberWordInteger.Processor processor) {
504                 this.processor = processor;
505             }         
506             
507             public TupleUnshredder(org.galagosearch.tupleflow.Processor<DocumentNumberWordInteger> processor) {
508                 this.processor = processor;
509             }
510             
511             public DocumentNumberWordInteger clone(DocumentNumberWordInteger object) {
512                 DocumentNumberWordInteger result = new DocumentNumberWordInteger();
513                 if (object == null) return result;
514                 result.word = object.word; 
515                 result.document = object.document; 
516                 result.value = object.value; 
517                 return result;
518             }                 
519             
520             
521             public void processTuple(byte[] word, int document, int value) throws IOException {
522                 last.word = word;
523                 last.document = document;
524                 last.value = value;
525                 processor.process(clone(last));
526             }               
527             
528             public void close() throws IOException {
529                 processor.close();
530             }
531         }     
532         public static class TupleShredder implements Processor {
533             DocumentNumberWordInteger last = new DocumentNumberWordInteger();
534             public ShreddedProcessor processor;
535             
536             public TupleShredder(ShreddedProcessor processor) {
537                 this.processor = processor;
538             }                              
539             
540             public DocumentNumberWordInteger clone(DocumentNumberWordInteger object) {
541                 DocumentNumberWordInteger result = new DocumentNumberWordInteger();
542                 if (object == null) return result;
543                 result.word = object.word; 
544                 result.document = object.document; 
545                 result.value = object.value; 
546                 return result;
547             }                 
548             
549             public void process(DocumentNumberWordInteger object) throws IOException {                                                                                                                                                   
550                 boolean processAll = false;
551                 processor.processTuple(object.word, object.document, object.value);                                         
552             }
553                           
554             public Class<DocumentNumberWordInteger> getInputClass() {
555                 return DocumentNumberWordInteger.class;
556             }
557             
558             public void close() throws IOException {
559                 processor.close();
560             }                     
561         }
562     } 
563     public static class WordDocumentOrder implements Order<DocumentNumberWordInteger> {
564         public int hash(DocumentNumberWordInteger object) {
565             int h = 0;
566             h += Utility.hash(object.word);
567             h += Utility.hash(object.document);
568             return h;
569         } 
570         public Comparator<DocumentNumberWordInteger> greaterThan() {
571             return new Comparator<DocumentNumberWordInteger>() {
572                 public int compare(DocumentNumberWordInteger one, DocumentNumberWordInteger two) {
573                     int result = 0;
574                     do {
575                         result = + Utility.compare(one.word, two.word);
576                         if(result != 0) break;
577                         result = + Utility.compare(one.document, two.document);
578                         if(result != 0) break;
579                     } while (false);
580                     return -result;
581                 }
582             };
583         }     
584         public Comparator<DocumentNumberWordInteger> lessThan() {
585             return new Comparator<DocumentNumberWordInteger>() {
586                 public int compare(DocumentNumberWordInteger one, DocumentNumberWordInteger two) {
587                     int result = 0;
588                     do {
589                         result = + Utility.compare(one.word, two.word);
590                         if(result != 0) break;
591                         result = + Utility.compare(one.document, two.document);
592                         if(result != 0) break;
593                     } while (false);
594                     return result;
595                 }
596             };
597         }     
598         public TypeReader<DocumentNumberWordInteger> orderedReader(ArrayInput _input) {
599             return new ShreddedReader(_input);
600         }    
601 
602         public TypeReader<DocumentNumberWordInteger> orderedReader(ArrayInput _input, int bufferSize) {
603             return new ShreddedReader(_input, bufferSize);
604         }    
605         public OrderedWriter<DocumentNumberWordInteger> orderedWriter(ArrayOutput _output) {
606             ShreddedWriter w = new ShreddedWriter(_output);
607             return new OrderedWriterClass(w); 
608         }                                    
609         public static class OrderedWriterClass extends OrderedWriter< DocumentNumberWordInteger > {
610             DocumentNumberWordInteger last = null;
611             ShreddedWriter shreddedWriter = null; 
612             
613             public OrderedWriterClass(ShreddedWriter s) {
614                 this.shreddedWriter = s;
615             }
616             
617             public void process(DocumentNumberWordInteger object) throws IOException {
618                boolean processAll = false;
619                if (processAll || last == null || 0 != Utility.compare(object.word, last.word)) { processAll = true; shreddedWriter.processWord(object.word); }
620                if (processAll || last == null || 0 != Utility.compare(object.document, last.document)) { processAll = true; shreddedWriter.processDocument(object.document); }
621                shreddedWriter.processTuple(object.value);
622                last = object;
623             }           
624                  
625             public void close() throws IOException {
626                 shreddedWriter.close();
627             }
628             
629             public Class<DocumentNumberWordInteger> getInputClass() {
630                 return DocumentNumberWordInteger.class;
631             }
632         } 
633         public ReaderSource<DocumentNumberWordInteger> orderedCombiner(Collection<TypeReader<DocumentNumberWordInteger>> readers, boolean closeOnExit) {
634             ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
635             
636             for (TypeReader<DocumentNumberWordInteger> reader : readers) {
637                 shreddedReaders.add((ShreddedReader)reader);
638             }
639             
640             return new ShreddedCombiner(shreddedReaders, closeOnExit);
641         }                  
642         public DocumentNumberWordInteger clone(DocumentNumberWordInteger object) {
643             DocumentNumberWordInteger result = new DocumentNumberWordInteger();
644             if (object == null) return result;
645             result.word = object.word; 
646             result.document = object.document; 
647             result.value = object.value; 
648             return result;
649         }                 
650         public Class<DocumentNumberWordInteger> getOrderedClass() {
651             return DocumentNumberWordInteger.class;
652         }                           
653         public String[] getOrderSpec() {
654             return new String[] {"+word", "+document"};
655         }
656 
657         public static String getSpecString() {
658             return "+word +document";
659         }
660                            
661         public interface ShreddedProcessor extends Step {
662             public void processWord(byte[] word) throws IOException;
663             public void processDocument(int document) throws IOException;
664             public void processTuple(int value) throws IOException;
665             public void close() throws IOException;
666         }    
667         public interface ShreddedSource extends Step { // marker interface: adds no members beyond Step
668         }                                              
669         
670         public static class ShreddedWriter implements ShreddedProcessor {
671             ArrayOutput output;                              // destination written by the flush* methods
672             ShreddedBuffer buffer = new ShreddedBuffer();    // staging buffer for words, documents and values
673             byte[] lastWord;                                 // word most recently passed to processWord
674             int lastDocument;                                // document most recently passed to processDocument
675             boolean lastFlush = false;                       // cleared by processTuple; set outside the visible part of this class (presumably by flush() - confirm)
676             
677             public ShreddedWriter(ArrayOutput output) {
678                 this.output = output;
679             }                        
680             
681             public void close() throws IOException {
682                 flush();
683             }
684             
685             public void processWord(byte[] word) {
686                 lastWord = word;
687                 buffer.processWord(word);
688             }
689             public void processDocument(int document) {
690                 lastDocument = document;
691                 buffer.processDocument(document);
692             }
693             public final void processTuple(int value) throws IOException {
694                 if (lastFlush) {
695                     if(buffer.words.size() == 0) buffer.processWord(lastWord);
696                     if(buffer.documents.size() == 0) buffer.processDocument(lastDocument);
697                     lastFlush = false;
698                 }
699                 buffer.processTuple(value);
700                 if (buffer.isFull())
701                     flush();
702             }
703             public final void flushTuples(int pauseIndex) throws IOException {
704                 
705                 while (buffer.getReadIndex() < pauseIndex) {
706                            
707                     output.writeInt(buffer.getValue());
708                     buffer.incrementTuple();
709                 }
710             }  
711             public final void flushWord(int pauseIndex) throws IOException {
712                 while (buffer.getReadIndex() < pauseIndex) {
713                     int nextPause = buffer.getWordEndIndex();
714                     int count = nextPause - buffer.getReadIndex();
715                     
716                     output.writeBytes(buffer.getWord());
717                     output.writeInt(count);
718                     buffer.incrementWord();
719                       
720                     flushDocument(nextPause);
721                     assert nextPause == buffer.getReadIndex();
722                 }
723             }
724             public final void flushDocument(int pauseIndex) throws IOException {
725                 while (buffer.getReadIndex() < pauseIndex) {
726                     int nextPause = buffer.getDocumentEndIndex();
727                     int count = nextPause - buffer.getReadIndex();
728                     
729                     output.writeInt(buffer.getDocument());
730                     output.writeInt(count);
731                     buffer.incrementDocument();
732                       
733                     flushTuples(nextPause);
734                     assert nextPause == buffer.getReadIndex();
735                 }
736             }
737             public void flush() throws IOException { 
738                 flushWord(buffer.getWriteIndex());
739                 buffer.reset(); 
740                 lastFlush = true;
741             }                           
742         }
743         public static class ShreddedBuffer {
744             ArrayList<byte[]> words = new ArrayList();
745             ArrayList<Integer> documents = new ArrayList();
746             ArrayList<Integer> wordTupleIdx = new ArrayList();
747             ArrayList<Integer> documentTupleIdx = new ArrayList();
748             int wordReadIdx = 0;
749             int documentReadIdx = 0;
750                             
751             int[] values;
752             int writeTupleIndex = 0;
753             int readTupleIndex = 0;
754             int batchSize;
755 
756             public ShreddedBuffer(int batchSize) {
757                 this.batchSize = batchSize;
758 
759                 values = new int[batchSize];
760             }                              
761 
762             public ShreddedBuffer() {    
763                 this(10000);
764             }                                                                                                                    
765             
766             public void processWord(byte[] word) {
767                 words.add(word);
768                 wordTupleIdx.add(writeTupleIndex);
769             }                                      
770             public void processDocument(int document) {
771                 documents.add(document);
772                 documentTupleIdx.add(writeTupleIndex);
773             }                                      
774             public void processTuple(int value) {
775                 assert words.size() > 0;
776                 assert documents.size() > 0;
777                 values[writeTupleIndex] = value;
778                 writeTupleIndex++;
779             }
780             public void resetData() {
781                 words.clear();
782                 documents.clear();
783                 wordTupleIdx.clear();
784                 documentTupleIdx.clear();
785                 writeTupleIndex = 0;
786             }                  
787                                  
788             public void resetRead() {
789                 readTupleIndex = 0;
790                 wordReadIdx = 0;
791                 documentReadIdx = 0;
792             } 
793 
794             public void reset() {
795                 resetData();
796                 resetRead();
797             } 
798             public boolean isFull() {
799                 return writeTupleIndex >= batchSize;
800             }
801 
802             public boolean isEmpty() {
803                 return writeTupleIndex == 0;
804             }                          
805 
806             public boolean isAtEnd() {
807                 return readTupleIndex >= writeTupleIndex;
808             }           
809             public void incrementWord() {
810                 wordReadIdx++;  
811             }                                                                                              
812 
813             public void autoIncrementWord() {
814                 while (readTupleIndex >= getWordEndIndex() && readTupleIndex < writeTupleIndex)
815                     wordReadIdx++;
816             }                 
817             public void incrementDocument() {
818                 documentReadIdx++;  
819             }                                                                                              
820 
821             public void autoIncrementDocument() {
822                 while (readTupleIndex >= getDocumentEndIndex() && readTupleIndex < writeTupleIndex)
823                     documentReadIdx++;
824             }                 
825             public void incrementTuple() {
826                 readTupleIndex++;
827             }                    
828             public int getWordEndIndex() {
829                 if ((wordReadIdx+1) >= wordTupleIdx.size())
830                     return writeTupleIndex;
831                 return wordTupleIdx.get(wordReadIdx+1);
832             }
833 
834             public int getDocumentEndIndex() {
835                 if ((documentReadIdx+1) >= documentTupleIdx.size())
836                     return writeTupleIndex;
837                 return documentTupleIdx.get(documentReadIdx+1);
838             }
839             public int getReadIndex() {
840                 return readTupleIndex;
841             }   
842 
843             public int getWriteIndex() {
844                 return writeTupleIndex;
845             } 
846             public byte[] getWord() {
847                 assert readTupleIndex < writeTupleIndex;
848                 assert wordReadIdx < words.size();
849                 
850                 return words.get(wordReadIdx);
851             }
852             public int getDocument() {
853                 assert readTupleIndex < writeTupleIndex;
854                 assert documentReadIdx < documents.size();
855                 
856                 return documents.get(documentReadIdx);
857             }
858             public int getValue() {
859                 assert readTupleIndex < writeTupleIndex;
860                 return values[readTupleIndex];
861             }                                         
862             public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
863                 while (getReadIndex() < endIndex) {
864                    output.processTuple(getValue());
865                    incrementTuple();
866                 }
867             }                                                                           
868             public void copyUntilIndexWord(int endIndex, ShreddedProcessor output) throws IOException {
869                 while (getReadIndex() < endIndex) {
870                     output.processWord(getWord());
871                     assert getWordEndIndex() <= endIndex;
872                     copyUntilIndexDocument(getWordEndIndex(), output);
873                     incrementWord();
874                 }
875             } 
876             public void copyUntilIndexDocument(int endIndex, ShreddedProcessor output) throws IOException {
877                 while (getReadIndex() < endIndex) {
878                     output.processDocument(getDocument());
879                     assert getDocumentEndIndex() <= endIndex;
880                     copyTuples(getDocumentEndIndex(), output);
881                     incrementDocument();
882                 }
883             }  
884             public void copyUntilWord(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
885                 while (!isAtEnd()) {
886                     if (other != null) {   
887                         assert !other.isAtEnd();
888                         int c = + Utility.compare(getWord(), other.getWord());
889                     
890                         if (c > 0) {
891                             break;   
892                         }
893                         
894                         output.processWord(getWord());
895                                       
896                         if (c < 0) {
897                             copyUntilIndexDocument(getWordEndIndex(), output);
898                         } else if (c == 0) {
899                             copyUntilDocument(other, output);
900                             autoIncrementWord();
901                             break;
902                         }
903                     } else {
904                         output.processWord(getWord());
905                         copyUntilIndexDocument(getWordEndIndex(), output);
906                     }
907                     incrementWord();  
908                     
909                
910                 }
911             }
912             public void copyUntilDocument(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
913                 while (!isAtEnd()) {
914                     if (other != null) {   
915                         assert !other.isAtEnd();
916                         int c = + Utility.compare(getDocument(), other.getDocument());
917                     
918                         if (c > 0) {
919                             break;   
920                         }
921                         
922                         output.processDocument(getDocument());
923                                       
924                         copyTuples(getDocumentEndIndex(), output);
925                     } else {
926                         output.processDocument(getDocument());
927                         copyTuples(getDocumentEndIndex(), output);
928                     }
929                     incrementDocument();  
930                     
931                     if (getWordEndIndex() <= readTupleIndex)
932                         break;   
933                 }
934             }
935             public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
936                 copyUntilWord(other, output);
937             }
938             
939         }                         
940         public static class ShreddedCombiner implements ReaderSource<DocumentNumberWordInteger>, ShreddedSource {   
941             public ShreddedProcessor processor;
942             Collection<ShreddedReader> readers;       
943             boolean closeOnExit = false;
944             boolean uninitialized = true;
945             PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
946             
947             public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
948                 this.readers = readers;                                                       
949                 this.closeOnExit = closeOnExit;
950             }
951                                   
952             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
953                 if (processor instanceof ShreddedProcessor) {
954                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
955                 } else if (processor instanceof DocumentNumberWordInteger.Processor) {
956                     this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentNumberWordInteger.Processor) processor));
957                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
958                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentNumberWordInteger>) processor));
959                 } else {
960                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
961                 }
962             }                                
963             
964             public Class<DocumentNumberWordInteger> getOutputClass() {
965                 return DocumentNumberWordInteger.class;
966             }
967             
968             public void initialize() throws IOException {
969                 for (ShreddedReader reader : readers) {
970                     reader.fill();                                        
971                     
972                     if (!reader.getBuffer().isAtEnd())
973                         queue.add(reader);
974                 }   
975 
976                 uninitialized = false;
977             }
978 
979             public void run() throws IOException {
980                 initialize();
981                
982                 while (queue.size() > 0) {
983                     ShreddedReader top = queue.poll();
984                     ShreddedReader next = null;
985                     ShreddedBuffer nextBuffer = null; 
986                     
987                     assert !top.getBuffer().isAtEnd();
988                                                   
989                     if (queue.size() > 0) {
990                         next = queue.peek();
991                         nextBuffer = next.getBuffer();
992                         assert !nextBuffer.isAtEnd();
993                     }
994                     
995                     top.getBuffer().copyUntil(nextBuffer, processor);
996                     if (top.getBuffer().isAtEnd())
997                         top.fill();                 
998                         
999                     if (!top.getBuffer().isAtEnd())
1000                         queue.add(top);
1001                 }              
1002                 
1003                 if (closeOnExit)
1004                     processor.close();
1005             }
1006 
1007             public DocumentNumberWordInteger read() throws IOException {
1008                 if (uninitialized)
1009                     initialize();
1010 
1011                 DocumentNumberWordInteger result = null;
1012 
1013                 while (queue.size() > 0) {
1014                     ShreddedReader top = queue.poll();
1015                     result = top.read();
1016 
1017                     if (result != null) {
1018                         if (top.getBuffer().isAtEnd())
1019                             top.fill();
1020 
1021                         queue.offer(top);
1022                         break;
1023                     } 
1024                 }
1025 
1026                 return result;
1027             }
1028         } 
1029         public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<DocumentNumberWordInteger>, ShreddedSource {      
1030             public ShreddedProcessor processor;
1031             ShreddedBuffer buffer;
1032             DocumentNumberWordInteger last = new DocumentNumberWordInteger();         
1033             long updateWordCount = -1;
1034             long updateDocumentCount = -1;
1035             long tupleCount = 0;
1036             long bufferStartCount = 0;  
1037             ArrayInput input;
1038             
1039             public ShreddedReader(ArrayInput input) {
1040                 this.input = input; 
1041                 this.buffer = new ShreddedBuffer();
1042             }                               
1043             
1044             public ShreddedReader(ArrayInput input, int bufferSize) { 
1045                 this.input = input;
1046                 this.buffer = new ShreddedBuffer(bufferSize);
1047             }
1048                  
1049             public final int compareTo(ShreddedReader other) {
1050                 ShreddedBuffer otherBuffer = other.getBuffer();
1051                 
1052                 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
1053                     return 0;                 
1054                 } else if (buffer.isAtEnd()) {
1055                     return -1;
1056                 } else if (otherBuffer.isAtEnd()) {
1057                     return 1;
1058                 }
1059                                    
1060                 int result = 0;
1061                 do {
1062                     result = + Utility.compare(buffer.getWord(), otherBuffer.getWord());
1063                     if(result != 0) break;
1064                     result = + Utility.compare(buffer.getDocument(), otherBuffer.getDocument());
1065                     if(result != 0) break;
1066                 } while (false);                                             
1067                 
1068                 return result;
1069             }
1070             
1071             public final ShreddedBuffer getBuffer() {
1072                 return buffer;
1073             }                
1074             
1075             public final DocumentNumberWordInteger read() throws IOException {
1076                 if (buffer.isAtEnd()) {
1077                     fill();             
1078                 
1079                     if (buffer.isAtEnd()) {
1080                         return null;
1081                     }
1082                 }
1083                       
1084                 assert !buffer.isAtEnd();
1085                 DocumentNumberWordInteger result = new DocumentNumberWordInteger();
1086                 
1087                 result.word = buffer.getWord();
1088                 result.document = buffer.getDocument();
1089                 result.value = buffer.getValue();
1090                 
1091                 buffer.incrementTuple();
1092                 buffer.autoIncrementWord();
1093                 buffer.autoIncrementDocument();
1094                 
1095                 return result;
1096             }           
1097             
1098             public final void fill() throws IOException {
1099                 try {   
1100                     buffer.reset();
1101                     
1102                     if (tupleCount != 0) {
1103                                                       
1104                         if(updateWordCount - tupleCount > 0) {
1105                             buffer.words.add(last.word);
1106                             buffer.wordTupleIdx.add((int) (updateWordCount - tupleCount));
1107                         }                              
1108                         if(updateDocumentCount - tupleCount > 0) {
1109                             buffer.documents.add(last.document);
1110                             buffer.documentTupleIdx.add((int) (updateDocumentCount - tupleCount));
1111                         }
1112                         bufferStartCount = tupleCount;
1113                     }
1114                     
1115                     while (!buffer.isFull()) {
1116                         updateDocument();
1117                         buffer.processTuple(input.readInt());
1118                         tupleCount++;
1119                     }
1120                 } catch(EOFException e) {}
1121             }
1122 
1123             public final void updateWord() throws IOException {
1124                 if (updateWordCount > tupleCount)
1125                     return;
1126                      
1127                 last.word = input.readBytes();
1128                 updateWordCount = tupleCount + input.readInt();
1129                                       
1130                 buffer.processWord(last.word);
1131             }
1132             public final void updateDocument() throws IOException {
1133                 if (updateDocumentCount > tupleCount)
1134                     return;
1135                      
1136                 updateWord();
1137                 last.document = input.readInt();
1138                 updateDocumentCount = tupleCount + input.readInt();
1139                                       
1140                 buffer.processDocument(last.document);
1141             }
1142 
1143             public void run() throws IOException {
1144                 while (true) {
1145                     fill();
1146                     
1147                     if (buffer.isAtEnd())
1148                         break;
1149                     
1150                     buffer.copyUntil(null, processor);
1151                 }      
1152                 processor.close();
1153             }
1154             
1155             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
1156                 if (processor instanceof ShreddedProcessor) {
1157                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
1158                 } else if (processor instanceof DocumentNumberWordInteger.Processor) {
1159                     this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentNumberWordInteger.Processor) processor));
1160                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
1161                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentNumberWordInteger>) processor));
1162                 } else {
1163                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
1164                 }
1165             }                                
1166             
1167             public Class<DocumentNumberWordInteger> getOutputClass() {
1168                 return DocumentNumberWordInteger.class;
1169             }                
1170         }
1171         
1172         public static class DuplicateEliminator implements ShreddedProcessor {
1173             public ShreddedProcessor processor;
1174             DocumentNumberWordInteger last = new DocumentNumberWordInteger();
1175             boolean wordProcess = true;
1176             boolean documentProcess = true;
1177                                            
1178             public DuplicateEliminator() {}
1179             public DuplicateEliminator(ShreddedProcessor processor) {
1180                 this.processor = processor;
1181             }
1182             
1183             public void setShreddedProcessor(ShreddedProcessor processor) {
1184                 this.processor = processor;
1185             }
1186 
1187             public void processWord(byte[] word) throws IOException {  
1188                 if (wordProcess || Utility.compare(word, last.word) != 0) {
1189                     last.word = word;
1190                     processor.processWord(word);
1191             resetDocument();
1192                     wordProcess = false;
1193                 }
1194             }
1195             public void processDocument(int document) throws IOException {  
1196                 if (documentProcess || Utility.compare(document, last.document) != 0) {
1197                     last.document = document;
1198                     processor.processDocument(document);
1199                     documentProcess = false;
1200                 }
1201             }  
1202             
1203             public void resetWord() {
1204                  wordProcess = true;
1205             resetDocument();
1206             }                                                
1207             public void resetDocument() {
1208                  documentProcess = true;
1209             }                                                
1210                                
1211             public void processTuple(int value) throws IOException {
1212                 processor.processTuple(value);
1213             } 
1214             
1215             public void close() throws IOException {
1216                 processor.close();
1217             }                    
1218         }
1219         public static class TupleUnshredder implements ShreddedProcessor {
1220             DocumentNumberWordInteger last = new DocumentNumberWordInteger();
1221             public org.galagosearch.tupleflow.Processor<DocumentNumberWordInteger> processor;                               
1222             
1223             public TupleUnshredder(DocumentNumberWordInteger.Processor processor) {
1224                 this.processor = processor;
1225             }         
1226             
1227             public TupleUnshredder(org.galagosearch.tupleflow.Processor<DocumentNumberWordInteger> processor) {
1228                 this.processor = processor;
1229             }
1230             
1231             public DocumentNumberWordInteger clone(DocumentNumberWordInteger object) {
1232                 DocumentNumberWordInteger result = new DocumentNumberWordInteger();
1233                 if (object == null) return result;
1234                 result.word = object.word; 
1235                 result.document = object.document; 
1236                 result.value = object.value; 
1237                 return result;
1238             }                 
1239             
1240             public void processWord(byte[] word) throws IOException {
1241                 last.word = word;
1242             }   
1243                 
1244             public void processDocument(int document) throws IOException {
1245                 last.document = document;
1246             }   
1247                 
1248             
1249             public void processTuple(int value) throws IOException {
1250                 last.value = value;
1251                 processor.process(clone(last));
1252             }               
1253             
1254             public void close() throws IOException {
1255                 processor.close();
1256             }
1257         }     
1258         public static class TupleShredder implements Processor {
1259             DocumentNumberWordInteger last = new DocumentNumberWordInteger();
1260             public ShreddedProcessor processor;
1261             
1262             public TupleShredder(ShreddedProcessor processor) {
1263                 this.processor = processor;
1264             }                              
1265             
1266             public DocumentNumberWordInteger clone(DocumentNumberWordInteger object) {
1267                 DocumentNumberWordInteger result = new DocumentNumberWordInteger();
1268                 if (object == null) return result;
1269                 result.word = object.word; 
1270                 result.document = object.document; 
1271                 result.value = object.value; 
1272                 return result;
1273             }                 
1274             
1275             public void process(DocumentNumberWordInteger object) throws IOException {                                                                                                                                                   
1276                 boolean processAll = false;
1277                 if(last == null || Utility.compare(last.word, object.word) != 0 || processAll) { processor.processWord(object.word); processAll = true; }
1278                 if(last == null || Utility.compare(last.document, object.document) != 0 || processAll) { processor.processDocument(object.document); processAll = true; }
1279                 processor.processTuple(object.value);                                         
1280             }
1281                           
1282             public Class<DocumentNumberWordInteger> getInputClass() {
1283                 return DocumentNumberWordInteger.class;
1284             }
1285             
1286             public void close() throws IOException {
1287                 processor.close();
1288             }                     
1289         }
1290     } 
1291     public static class DocumentOrder implements Order<DocumentNumberWordInteger> {
1292         public int hash(DocumentNumberWordInteger object) {
1293             int h = 0;
1294             h += Utility.hash(object.document);
1295             return h;
1296         } 
1297         public Comparator<DocumentNumberWordInteger> greaterThan() {
1298             return new Comparator<DocumentNumberWordInteger>() {
1299                 public int compare(DocumentNumberWordInteger one, DocumentNumberWordInteger two) {
1300                     int result = 0;
1301                     do {
1302                         result = + Utility.compare(one.document, two.document);
1303                         if(result != 0) break;
1304                     } while (false);
1305                     return -result;
1306                 }
1307             };
1308         }     
1309         public Comparator<DocumentNumberWordInteger> lessThan() {
1310             return new Comparator<DocumentNumberWordInteger>() {
1311                 public int compare(DocumentNumberWordInteger one, DocumentNumberWordInteger two) {
1312                     int result = 0;
1313                     do {
1314                         result = + Utility.compare(one.document, two.document);
1315                         if(result != 0) break;
1316                     } while (false);
1317                     return result;
1318                 }
1319             };
1320         }     
1321         public TypeReader<DocumentNumberWordInteger> orderedReader(ArrayInput _input) {
1322             return new ShreddedReader(_input);
1323         }    
1324 
1325         public TypeReader<DocumentNumberWordInteger> orderedReader(ArrayInput _input, int bufferSize) {
1326             return new ShreddedReader(_input, bufferSize);
1327         }    
1328         public OrderedWriter<DocumentNumberWordInteger> orderedWriter(ArrayOutput _output) {
1329             ShreddedWriter w = new ShreddedWriter(_output);
1330             return new OrderedWriterClass(w); 
1331         }                                    
1332         public static class OrderedWriterClass extends OrderedWriter< DocumentNumberWordInteger > {
1333             DocumentNumberWordInteger last = null;
1334             ShreddedWriter shreddedWriter = null; 
1335             
1336             public OrderedWriterClass(ShreddedWriter s) {
1337                 this.shreddedWriter = s;
1338             }
1339             
1340             public void process(DocumentNumberWordInteger object) throws IOException {
1341                boolean processAll = false;
1342                if (processAll || last == null || 0 != Utility.compare(object.document, last.document)) { processAll = true; shreddedWriter.processDocument(object.document); }
1343                shreddedWriter.processTuple(object.word, object.value);
1344                last = object;
1345             }           
1346                  
1347             public void close() throws IOException {
1348                 shreddedWriter.close();
1349             }
1350             
1351             public Class<DocumentNumberWordInteger> getInputClass() {
1352                 return DocumentNumberWordInteger.class;
1353             }
1354         } 
1355         public ReaderSource<DocumentNumberWordInteger> orderedCombiner(Collection<TypeReader<DocumentNumberWordInteger>> readers, boolean closeOnExit) {
1356             ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
1357             
1358             for (TypeReader<DocumentNumberWordInteger> reader : readers) {
1359                 shreddedReaders.add((ShreddedReader)reader);
1360             }
1361             
1362             return new ShreddedCombiner(shreddedReaders, closeOnExit);
1363         }                  
1364         public DocumentNumberWordInteger clone(DocumentNumberWordInteger object) {
1365             DocumentNumberWordInteger result = new DocumentNumberWordInteger();
1366             if (object == null) return result;
1367             result.word = object.word; 
1368             result.document = object.document; 
1369             result.value = object.value; 
1370             return result;
1371         }                 
1372         public Class<DocumentNumberWordInteger> getOrderedClass() {
1373             return DocumentNumberWordInteger.class;
1374         }                           
1375         public String[] getOrderSpec() {
1376             return new String[] {"+document"};
1377         }
1378 
1379         public static String getSpecString() {
1380             return "+document";
1381         }
1382                            
1383         public interface ShreddedProcessor extends Step {
1384             public void processDocument(int document) throws IOException;
1385             public void processTuple(byte[] word, int value) throws IOException;
1386             public void close() throws IOException;
1387         }    
1388         public interface ShreddedSource extends Step {
1389         }                                              
1390         
1391         public static class ShreddedWriter implements ShreddedProcessor {
1392             ArrayOutput output;
1393             ShreddedBuffer buffer = new ShreddedBuffer();
1394             int lastDocument;
1395             boolean lastFlush = false;
1396             
1397             public ShreddedWriter(ArrayOutput output) {
1398                 this.output = output;
1399             }                        
1400             
1401             public void close() throws IOException {
1402                 flush();
1403             }
1404             
1405             public void processDocument(int document) {
1406                 lastDocument = document;
1407                 buffer.processDocument(document);
1408             }
1409             public final void processTuple(byte[] word, int value) throws IOException {
1410                 if (lastFlush) {
1411                     if(buffer.documents.size() == 0) buffer.processDocument(lastDocument);
1412                     lastFlush = false;
1413                 }
1414                 buffer.processTuple(word, value);
1415                 if (buffer.isFull())
1416                     flush();
1417             }
1418             public final void flushTuples(int pauseIndex) throws IOException {
1419                 
1420                 while (buffer.getReadIndex() < pauseIndex) {
1421                            
1422                     output.writeBytes(buffer.getWord());
1423                     output.writeInt(buffer.getValue());
1424                     buffer.incrementTuple();
1425                 }
1426             }  
1427             public final void flushDocument(int pauseIndex) throws IOException {
1428                 while (buffer.getReadIndex() < pauseIndex) {
1429                     int nextPause = buffer.getDocumentEndIndex();
1430                     int count = nextPause - buffer.getReadIndex();
1431                     
1432                     output.writeInt(buffer.getDocument());
1433                     output.writeInt(count);
1434                     buffer.incrementDocument();
1435                       
1436                     flushTuples(nextPause);
1437                     assert nextPause == buffer.getReadIndex();
1438                 }
1439             }
1440             public void flush() throws IOException { 
1441                 flushDocument(buffer.getWriteIndex());
1442                 buffer.reset(); 
1443                 lastFlush = true;
1444             }                           
1445         }
1446         public static class ShreddedBuffer {
1447             ArrayList<Integer> documents = new ArrayList();
1448             ArrayList<Integer> documentTupleIdx = new ArrayList();
1449             int documentReadIdx = 0;
1450                             
1451             byte[][] words;
1452             int[] values;
1453             int writeTupleIndex = 0;
1454             int readTupleIndex = 0;
1455             int batchSize;
1456 
1457             public ShreddedBuffer(int batchSize) {
1458                 this.batchSize = batchSize;
1459 
1460                 words = new byte[batchSize][];
1461                 values = new int[batchSize];
1462             }                              
1463 
1464             public ShreddedBuffer() {    
1465                 this(10000);
1466             }                                                                                                                    
1467             
1468             public void processDocument(int document) {
1469                 documents.add(document);
1470                 documentTupleIdx.add(writeTupleIndex);
1471             }                                      
1472             public void processTuple(byte[] word, int value) {
1473                 assert documents.size() > 0;
1474                 words[writeTupleIndex] = word;
1475                 values[writeTupleIndex] = value;
1476                 writeTupleIndex++;
1477             }
1478             public void resetData() {
1479                 documents.clear();
1480                 documentTupleIdx.clear();
1481                 writeTupleIndex = 0;
1482             }                  
1483                                  
1484             public void resetRead() {
1485                 readTupleIndex = 0;
1486                 documentReadIdx = 0;
1487             } 
1488 
1489             public void reset() {
1490                 resetData();
1491                 resetRead();
1492             } 
1493             public boolean isFull() {
1494                 return writeTupleIndex >= batchSize;
1495             }
1496 
1497             public boolean isEmpty() {
1498                 return writeTupleIndex == 0;
1499             }                          
1500 
1501             public boolean isAtEnd() {
1502                 return readTupleIndex >= writeTupleIndex;
1503             }           
1504             public void incrementDocument() {
1505                 documentReadIdx++;  
1506             }                                                                                              
1507 
1508             public void autoIncrementDocument() {
1509                 while (readTupleIndex >= getDocumentEndIndex() && readTupleIndex < writeTupleIndex)
1510                     documentReadIdx++;
1511             }                 
1512             public void incrementTuple() {
1513                 readTupleIndex++;
1514             }                    
1515             public int getDocumentEndIndex() {
1516                 if ((documentReadIdx+1) >= documentTupleIdx.size())
1517                     return writeTupleIndex;
1518                 return documentTupleIdx.get(documentReadIdx+1);
1519             }
1520             public int getReadIndex() {
1521                 return readTupleIndex;
1522             }   
1523 
1524             public int getWriteIndex() {
1525                 return writeTupleIndex;
1526             } 
1527             public int getDocument() {
1528                 assert readTupleIndex < writeTupleIndex;
1529                 assert documentReadIdx < documents.size();
1530                 
1531                 return documents.get(documentReadIdx);
1532             }
1533             public byte[] getWord() {
1534                 assert readTupleIndex < writeTupleIndex;
1535                 return words[readTupleIndex];
1536             }                                         
1537             public int getValue() {
1538                 assert readTupleIndex < writeTupleIndex;
1539                 return values[readTupleIndex];
1540             }                                         
1541             public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
1542                 while (getReadIndex() < endIndex) {
1543                    output.processTuple(getWord(), getValue());
1544                    incrementTuple();
1545                 }
1546             }                                                                           
1547             public void copyUntilIndexDocument(int endIndex, ShreddedProcessor output) throws IOException {
1548                 while (getReadIndex() < endIndex) {
1549                     output.processDocument(getDocument());
1550                     assert getDocumentEndIndex() <= endIndex;
1551                     copyTuples(getDocumentEndIndex(), output);
1552                     incrementDocument();
1553                 }
1554             }  
1555             public void copyUntilDocument(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
1556                 while (!isAtEnd()) {
1557                     if (other != null) {   
1558                         assert !other.isAtEnd();
1559                         int c = + Utility.compare(getDocument(), other.getDocument());
1560                     
1561                         if (c > 0) {
1562                             break;   
1563                         }
1564                         
1565                         output.processDocument(getDocument());
1566                                       
1567                         copyTuples(getDocumentEndIndex(), output);
1568                     } else {
1569                         output.processDocument(getDocument());
1570                         copyTuples(getDocumentEndIndex(), output);
1571                     }
1572                     incrementDocument();  
1573                     
1574                
1575                 }
1576             }
1577             public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
1578                 copyUntilDocument(other, output);
1579             }
1580             
1581         }                         
1582         public static class ShreddedCombiner implements ReaderSource<DocumentNumberWordInteger>, ShreddedSource {   
1583             public ShreddedProcessor processor;
1584             Collection<ShreddedReader> readers;       
1585             boolean closeOnExit = false;
1586             boolean uninitialized = true;
1587             PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
1588             
1589             public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
1590                 this.readers = readers;                                                       
1591                 this.closeOnExit = closeOnExit;
1592             }
1593                                   
1594             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
1595                 if (processor instanceof ShreddedProcessor) {
1596                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
1597                 } else if (processor instanceof DocumentNumberWordInteger.Processor) {
1598                     this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentNumberWordInteger.Processor) processor));
1599                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
1600                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentNumberWordInteger>) processor));
1601                 } else {
1602                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
1603                 }
1604             }                                
1605             
1606             public Class<DocumentNumberWordInteger> getOutputClass() {
1607                 return DocumentNumberWordInteger.class;
1608             }
1609             
1610             public void initialize() throws IOException {
1611                 for (ShreddedReader reader : readers) {
1612                     reader.fill();                                        
1613                     
1614                     if (!reader.getBuffer().isAtEnd())
1615                         queue.add(reader);
1616                 }   
1617 
1618                 uninitialized = false;
1619             }
1620 
1621             public void run() throws IOException {
1622                 initialize();
1623                
1624                 while (queue.size() > 0) {
1625                     ShreddedReader top = queue.poll();
1626                     ShreddedReader next = null;
1627                     ShreddedBuffer nextBuffer = null; 
1628                     
1629                     assert !top.getBuffer().isAtEnd();
1630                                                   
1631                     if (queue.size() > 0) {
1632                         next = queue.peek();
1633                         nextBuffer = next.getBuffer();
1634                         assert !nextBuffer.isAtEnd();
1635                     }
1636                     
1637                     top.getBuffer().copyUntil(nextBuffer, processor);
1638                     if (top.getBuffer().isAtEnd())
1639                         top.fill();                 
1640                         
1641                     if (!top.getBuffer().isAtEnd())
1642                         queue.add(top);
1643                 }              
1644                 
1645                 if (closeOnExit)
1646                     processor.close();
1647             }
1648 
1649             public DocumentNumberWordInteger read() throws IOException {
1650                 if (uninitialized)
1651                     initialize();
1652 
1653                 DocumentNumberWordInteger result = null;
1654 
1655                 while (queue.size() > 0) {
1656                     ShreddedReader top = queue.poll();
1657                     result = top.read();
1658 
1659                     if (result != null) {
1660                         if (top.getBuffer().isAtEnd())
1661                             top.fill();
1662 
1663                         queue.offer(top);
1664                         break;
1665                     } 
1666                 }
1667 
1668                 return result;
1669             }
1670         } 
1671         public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<DocumentNumberWordInteger>, ShreddedSource {      
1672             public ShreddedProcessor processor;
1673             ShreddedBuffer buffer;
1674             DocumentNumberWordInteger last = new DocumentNumberWordInteger();         
1675             long updateDocumentCount = -1;
1676             long tupleCount = 0;
1677             long bufferStartCount = 0;  
1678             ArrayInput input;
1679             
1680             public ShreddedReader(ArrayInput input) {
1681                 this.input = input; 
1682                 this.buffer = new ShreddedBuffer();
1683             }                               
1684             
1685             public ShreddedReader(ArrayInput input, int bufferSize) { 
1686                 this.input = input;
1687                 this.buffer = new ShreddedBuffer(bufferSize);
1688             }
1689                  
1690             public final int compareTo(ShreddedReader other) {
1691                 ShreddedBuffer otherBuffer = other.getBuffer();
1692                 
1693                 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
1694                     return 0;                 
1695                 } else if (buffer.isAtEnd()) {
1696                     return -1;
1697                 } else if (otherBuffer.isAtEnd()) {
1698                     return 1;
1699                 }
1700                                    
1701                 int result = 0;
1702                 do {
1703                     result = + Utility.compare(buffer.getDocument(), otherBuffer.getDocument());
1704                     if(result != 0) break;
1705                 } while (false);                                             
1706                 
1707                 return result;
1708             }
1709             
1710             public final ShreddedBuffer getBuffer() {
1711                 return buffer;
1712             }                
1713             
1714             public final DocumentNumberWordInteger read() throws IOException {
1715                 if (buffer.isAtEnd()) {
1716                     fill();             
1717                 
1718                     if (buffer.isAtEnd()) {
1719                         return null;
1720                     }
1721                 }
1722                       
1723                 assert !buffer.isAtEnd();
1724                 DocumentNumberWordInteger result = new DocumentNumberWordInteger();
1725                 
1726                 result.document = buffer.getDocument();
1727                 result.word = buffer.getWord();
1728                 result.value = buffer.getValue();
1729                 
1730                 buffer.incrementTuple();
1731                 buffer.autoIncrementDocument();
1732                 
1733                 return result;
1734             }           
1735             
1736             public final void fill() throws IOException {
1737                 try {   
1738                     buffer.reset();
1739                     
1740                     if (tupleCount != 0) {
1741                                                       
1742                         if(updateDocumentCount - tupleCount > 0) {
1743                             buffer.documents.add(last.document);
1744                             buffer.documentTupleIdx.add((int) (updateDocumentCount - tupleCount));
1745                         }
1746                         bufferStartCount = tupleCount;
1747                     }
1748                     
1749                     while (!buffer.isFull()) {
1750                         updateDocument();
1751                         buffer.processTuple(input.readBytes(), input.readInt());
1752                         tupleCount++;
1753                     }
1754                 } catch(EOFException e) {}
1755             }
1756 
1757             public final void updateDocument() throws IOException {
1758                 if (updateDocumentCount > tupleCount)
1759                     return;
1760                      
1761                 last.document = input.readInt();
1762                 updateDocumentCount = tupleCount + input.readInt();
1763                                       
1764                 buffer.processDocument(last.document);
1765             }
1766 
1767             public void run() throws IOException {
1768                 while (true) {
1769                     fill();
1770                     
1771                     if (buffer.isAtEnd())
1772                         break;
1773                     
1774                     buffer.copyUntil(null, processor);
1775                 }      
1776                 processor.close();
1777             }
1778             
1779             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
1780                 if (processor instanceof ShreddedProcessor) {
1781                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
1782                 } else if (processor instanceof DocumentNumberWordInteger.Processor) {
1783                     this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentNumberWordInteger.Processor) processor));
1784                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
1785                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentNumberWordInteger>) processor));
1786                 } else {
1787                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
1788                 }
1789             }                                
1790             
1791             public Class<DocumentNumberWordInteger> getOutputClass() {
1792                 return DocumentNumberWordInteger.class;
1793             }                
1794         }
1795         
1796         public static class DuplicateEliminator implements ShreddedProcessor {
1797             public ShreddedProcessor processor;
1798             DocumentNumberWordInteger last = new DocumentNumberWordInteger();
1799             boolean documentProcess = true;
1800                                            
1801             public DuplicateEliminator() {}
1802             public DuplicateEliminator(ShreddedProcessor processor) {
1803                 this.processor = processor;
1804             }
1805             
1806             public void setShreddedProcessor(ShreddedProcessor processor) {
1807                 this.processor = processor;
1808             }
1809 
1810             public void processDocument(int document) throws IOException {  
1811                 if (documentProcess || Utility.compare(document, last.document) != 0) {
1812                     last.document = document;
1813                     processor.processDocument(document);
1814                     documentProcess = false;
1815                 }
1816             }  
1817             
1818             public void resetDocument() {
1819                  documentProcess = true;
1820             }                                                
1821                                
1822             public void processTuple(byte[] word, int value) throws IOException {
1823                 processor.processTuple(word, value);
1824             } 
1825             
1826             public void close() throws IOException {
1827                 processor.close();
1828             }                    
1829         }
1830         public static class TupleUnshredder implements ShreddedProcessor {
1831             DocumentNumberWordInteger last = new DocumentNumberWordInteger();
1832             public org.galagosearch.tupleflow.Processor<DocumentNumberWordInteger> processor;                               
1833             
1834             public TupleUnshredder(DocumentNumberWordInteger.Processor processor) {
1835                 this.processor = processor;
1836             }         
1837             
1838             public TupleUnshredder(org.galagosearch.tupleflow.Processor<DocumentNumberWordInteger> processor) {
1839                 this.processor = processor;
1840             }
1841             
1842             public DocumentNumberWordInteger clone(DocumentNumberWordInteger object) {
1843                 DocumentNumberWordInteger result = new DocumentNumberWordInteger();
1844                 if (object == null) return result;
1845                 result.word = object.word; 
1846                 result.document = object.document; 
1847                 result.value = object.value; 
1848                 return result;
1849             }                 
1850             
1851             public void processDocument(int document) throws IOException {
1852                 last.document = document;
1853             }   
1854                 
1855             
1856             public void processTuple(byte[] word, int value) throws IOException {
1857                 last.word = word;
1858                 last.value = value;
1859                 processor.process(clone(last));
1860             }               
1861             
1862             public void close() throws IOException {
1863                 processor.close();
1864             }
1865         }     
1866         public static class TupleShredder implements Processor {
1867             DocumentNumberWordInteger last = new DocumentNumberWordInteger();
1868             public ShreddedProcessor processor;
1869             
1870             public TupleShredder(ShreddedProcessor processor) {
1871                 this.processor = processor;
1872             }                              
1873             
1874             public DocumentNumberWordInteger clone(DocumentNumberWordInteger object) {
1875                 DocumentNumberWordInteger result = new DocumentNumberWordInteger();
1876                 if (object == null) return result;
1877                 result.word = object.word; 
1878                 result.document = object.document; 
1879                 result.value = object.value; 
1880                 return result;
1881             }                 
1882             
1883             public void process(DocumentNumberWordInteger object) throws IOException {                                                                                                                                                   
1884                 boolean processAll = false;
1885                 if(last == null || Utility.compare(last.document, object.document) != 0 || processAll) { processor.processDocument(object.document); processAll = true; }
1886                 processor.processTuple(object.word, object.value);                                         
1887             }
1888                           
1889             public Class<DocumentNumberWordInteger> getInputClass() {
1890                 return DocumentNumberWordInteger.class;
1891             }
1892             
1893             public void close() throws IOException {
1894                 processor.close();
1895             }                     
1896         }
1897     } 
1898     public static class ValueOrder implements Order<DocumentNumberWordInteger> {
1899         public int hash(DocumentNumberWordInteger object) {
1900             int h = 0;
1901             h += Utility.hash(object.value);
1902             return h;
1903         } 
1904         public Comparator<DocumentNumberWordInteger> greaterThan() {
1905             return new Comparator<DocumentNumberWordInteger>() {
1906                 public int compare(DocumentNumberWordInteger one, DocumentNumberWordInteger two) {
1907                     int result = 0;
1908                     do {
1909                         result = + Utility.compare(one.value, two.value);
1910                         if(result != 0) break;
1911                     } while (false);
1912                     return -result;
1913                 }
1914             };
1915         }     
1916         public Comparator<DocumentNumberWordInteger> lessThan() {
1917             return new Comparator<DocumentNumberWordInteger>() {
1918                 public int compare(DocumentNumberWordInteger one, DocumentNumberWordInteger two) {
1919                     int result = 0;
1920                     do {
1921                         result = + Utility.compare(one.value, two.value);
1922                         if(result != 0) break;
1923                     } while (false);
1924                     return result;
1925                 }
1926             };
1927         }     
1928         public TypeReader<DocumentNumberWordInteger> orderedReader(ArrayInput _input) {
1929             return new ShreddedReader(_input);
1930         }    
1931 
1932         public TypeReader<DocumentNumberWordInteger> orderedReader(ArrayInput _input, int bufferSize) {
1933             return new ShreddedReader(_input, bufferSize);
1934         }    
1935         public OrderedWriter<DocumentNumberWordInteger> orderedWriter(ArrayOutput _output) {
1936             ShreddedWriter w = new ShreddedWriter(_output);
1937             return new OrderedWriterClass(w); 
1938         }                                    
1939         public static class OrderedWriterClass extends OrderedWriter< DocumentNumberWordInteger > {
1940             DocumentNumberWordInteger last = null;
1941             ShreddedWriter shreddedWriter = null; 
1942             
1943             public OrderedWriterClass(ShreddedWriter s) {
1944                 this.shreddedWriter = s;
1945             }
1946             
1947             public void process(DocumentNumberWordInteger object) throws IOException {
1948                boolean processAll = false;
1949                if (processAll || last == null || 0 != Utility.compare(object.value, last.value)) { processAll = true; shreddedWriter.processValue(object.value); }
1950                shreddedWriter.processTuple(object.word, object.document);
1951                last = object;
1952             }           
1953                  
1954             public void close() throws IOException {
1955                 shreddedWriter.close();
1956             }
1957             
1958             public Class<DocumentNumberWordInteger> getInputClass() {
1959                 return DocumentNumberWordInteger.class;
1960             }
1961         } 
1962         public ReaderSource<DocumentNumberWordInteger> orderedCombiner(Collection<TypeReader<DocumentNumberWordInteger>> readers, boolean closeOnExit) {
1963             ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
1964             
1965             for (TypeReader<DocumentNumberWordInteger> reader : readers) {
1966                 shreddedReaders.add((ShreddedReader)reader);
1967             }
1968             
1969             return new ShreddedCombiner(shreddedReaders, closeOnExit);
1970         }                  
1971         public DocumentNumberWordInteger clone(DocumentNumberWordInteger object) {
1972             DocumentNumberWordInteger result = new DocumentNumberWordInteger();
1973             if (object == null) return result;
1974             result.word = object.word; 
1975             result.document = object.document; 
1976             result.value = object.value; 
1977             return result;
1978         }                 
1979         public Class<DocumentNumberWordInteger> getOrderedClass() {
1980             return DocumentNumberWordInteger.class;
1981         }                           
1982         public String[] getOrderSpec() {
1983             return new String[] {"+value"};
1984         }
1985 
1986         public static String getSpecString() {
1987             return "+value";
1988         }
1989                            
1990         public interface ShreddedProcessor extends Step {
1991             public void processValue(int value) throws IOException;
1992             public void processTuple(byte[] word, int document) throws IOException;
1993             public void close() throws IOException;
1994         }    
1995         public interface ShreddedSource extends Step {
1996         }                                              
1997         
1998         public static class ShreddedWriter implements ShreddedProcessor {
1999             ArrayOutput output;
2000             ShreddedBuffer buffer = new ShreddedBuffer();
2001             int lastValue;
2002             boolean lastFlush = false;
2003             
2004             public ShreddedWriter(ArrayOutput output) {
2005                 this.output = output;
2006             }                        
2007             
2008             public void close() throws IOException {
2009                 flush();
2010             }
2011             
2012             public void processValue(int value) {
2013                 lastValue = value;
2014                 buffer.processValue(value);
2015             }
2016             public final void processTuple(byte[] word, int document) throws IOException {
2017                 if (lastFlush) {
2018                     if(buffer.values.size() == 0) buffer.processValue(lastValue);
2019                     lastFlush = false;
2020                 }
2021                 buffer.processTuple(word, document);
2022                 if (buffer.isFull())
2023                     flush();
2024             }
2025             public final void flushTuples(int pauseIndex) throws IOException {
2026                 
2027                 while (buffer.getReadIndex() < pauseIndex) {
2028                            
2029                     output.writeBytes(buffer.getWord());
2030                     output.writeInt(buffer.getDocument());
2031                     buffer.incrementTuple();
2032                 }
2033             }  
2034             public final void flushValue(int pauseIndex) throws IOException {
2035                 while (buffer.getReadIndex() < pauseIndex) {
2036                     int nextPause = buffer.getValueEndIndex();
2037                     int count = nextPause - buffer.getReadIndex();
2038                     
2039                     output.writeInt(buffer.getValue());
2040                     output.writeInt(count);
2041                     buffer.incrementValue();
2042                       
2043                     flushTuples(nextPause);
2044                     assert nextPause == buffer.getReadIndex();
2045                 }
2046             }
2047             public void flush() throws IOException { 
2048                 flushValue(buffer.getWriteIndex());
2049                 buffer.reset(); 
2050                 lastFlush = true;
2051             }                           
2052         }
2053         public static class ShreddedBuffer {
2054             ArrayList<Integer> values = new ArrayList();
2055             ArrayList<Integer> valueTupleIdx = new ArrayList();
2056             int valueReadIdx = 0;
2057                             
2058             byte[][] words;
2059             int[] documents;
2060             int writeTupleIndex = 0;
2061             int readTupleIndex = 0;
2062             int batchSize;
2063 
2064             public ShreddedBuffer(int batchSize) {
2065                 this.batchSize = batchSize;
2066 
2067                 words = new byte[batchSize][];
2068                 documents = new int[batchSize];
2069             }                              
2070 
2071             public ShreddedBuffer() {    
2072                 this(10000);
2073             }                                                                                                                    
2074             
2075             public void processValue(int value) {
2076                 values.add(value);
2077                 valueTupleIdx.add(writeTupleIndex);
2078             }                                      
2079             public void processTuple(byte[] word, int document) {
2080                 assert values.size() > 0;
2081                 words[writeTupleIndex] = word;
2082                 documents[writeTupleIndex] = document;
2083                 writeTupleIndex++;
2084             }
2085             public void resetData() {
2086                 values.clear();
2087                 valueTupleIdx.clear();
2088                 writeTupleIndex = 0;
2089             }                  
2090                                  
2091             public void resetRead() {
2092                 readTupleIndex = 0;
2093                 valueReadIdx = 0;
2094             } 
2095 
2096             public void reset() {
2097                 resetData();
2098                 resetRead();
2099             } 
2100             public boolean isFull() {
2101                 return writeTupleIndex >= batchSize;
2102             }
2103 
2104             public boolean isEmpty() {
2105                 return writeTupleIndex == 0;
2106             }                          
2107 
2108             public boolean isAtEnd() {
2109                 return readTupleIndex >= writeTupleIndex;
2110             }           
2111             public void incrementValue() {
2112                 valueReadIdx++;  
2113             }                                                                                              
2114 
2115             public void autoIncrementValue() {
2116                 while (readTupleIndex >= getValueEndIndex() && readTupleIndex < writeTupleIndex)
2117                     valueReadIdx++;
2118             }                 
2119             public void incrementTuple() {
2120                 readTupleIndex++;
2121             }                    
2122             public int getValueEndIndex() {
2123                 if ((valueReadIdx+1) >= valueTupleIdx.size())
2124                     return writeTupleIndex;
2125                 return valueTupleIdx.get(valueReadIdx+1);
2126             }
2127             public int getReadIndex() {
2128                 return readTupleIndex;
2129             }   
2130 
2131             public int getWriteIndex() {
2132                 return writeTupleIndex;
2133             } 
2134             public int getValue() {
2135                 assert readTupleIndex < writeTupleIndex;
2136                 assert valueReadIdx < values.size();
2137                 
2138                 return values.get(valueReadIdx);
2139             }
2140             public byte[] getWord() {
2141                 assert readTupleIndex < writeTupleIndex;
2142                 return words[readTupleIndex];
2143             }                                         
2144             public int getDocument() {
2145                 assert readTupleIndex < writeTupleIndex;
2146                 return documents[readTupleIndex];
2147             }                                         
2148             public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
2149                 while (getReadIndex() < endIndex) {
2150                    output.processTuple(getWord(), getDocument());
2151                    incrementTuple();
2152                 }
2153             }                                                                           
2154             public void copyUntilIndexValue(int endIndex, ShreddedProcessor output) throws IOException {
2155                 while (getReadIndex() < endIndex) {
2156                     output.processValue(getValue());
2157                     assert getValueEndIndex() <= endIndex;
2158                     copyTuples(getValueEndIndex(), output);
2159                     incrementValue();
2160                 }
2161             }  
2162             public void copyUntilValue(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
2163                 while (!isAtEnd()) {
2164                     if (other != null) {   
2165                         assert !other.isAtEnd();
2166                         int c = + Utility.compare(getValue(), other.getValue());
2167                     
2168                         if (c > 0) {
2169                             break;   
2170                         }
2171                         
2172                         output.processValue(getValue());
2173                                       
2174                         copyTuples(getValueEndIndex(), output);
2175                     } else {
2176                         output.processValue(getValue());
2177                         copyTuples(getValueEndIndex(), output);
2178                     }
2179                     incrementValue();  
2180                     
2181                
2182                 }
2183             }
2184             public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
2185                 copyUntilValue(other, output);
2186             }
2187             
2188         }                         
2189         public static class ShreddedCombiner implements ReaderSource<DocumentNumberWordInteger>, ShreddedSource {   
2190             public ShreddedProcessor processor;
2191             Collection<ShreddedReader> readers;       
2192             boolean closeOnExit = false;
2193             boolean uninitialized = true;
2194             PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
2195             
2196             public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
2197                 this.readers = readers;                                                       
2198                 this.closeOnExit = closeOnExit;
2199             }
2200                                   
2201             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
2202                 if (processor instanceof ShreddedProcessor) {
2203                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
2204                 } else if (processor instanceof DocumentNumberWordInteger.Processor) {
2205                     this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentNumberWordInteger.Processor) processor));
2206                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
2207                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentNumberWordInteger>) processor));
2208                 } else {
2209                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
2210                 }
2211             }                                
2212             
2213             public Class<DocumentNumberWordInteger> getOutputClass() {
2214                 return DocumentNumberWordInteger.class;
2215             }
2216             
2217             public void initialize() throws IOException {
2218                 for (ShreddedReader reader : readers) {
2219                     reader.fill();                                        
2220                     
2221                     if (!reader.getBuffer().isAtEnd())
2222                         queue.add(reader);
2223                 }   
2224 
2225                 uninitialized = false;
2226             }
2227 
2228             public void run() throws IOException {
2229                 initialize();
2230                
2231                 while (queue.size() > 0) {
2232                     ShreddedReader top = queue.poll();
2233                     ShreddedReader next = null;
2234                     ShreddedBuffer nextBuffer = null; 
2235                     
2236                     assert !top.getBuffer().isAtEnd();
2237                                                   
2238                     if (queue.size() > 0) {
2239                         next = queue.peek();
2240                         nextBuffer = next.getBuffer();
2241                         assert !nextBuffer.isAtEnd();
2242                     }
2243                     
2244                     top.getBuffer().copyUntil(nextBuffer, processor);
2245                     if (top.getBuffer().isAtEnd())
2246                         top.fill();                 
2247                         
2248                     if (!top.getBuffer().isAtEnd())
2249                         queue.add(top);
2250                 }              
2251                 
2252                 if (closeOnExit)
2253                     processor.close();
2254             }
2255 
2256             public DocumentNumberWordInteger read() throws IOException {
2257                 if (uninitialized)
2258                     initialize();
2259 
2260                 DocumentNumberWordInteger result = null;
2261 
2262                 while (queue.size() > 0) {
2263                     ShreddedReader top = queue.poll();
2264                     result = top.read();
2265 
2266                     if (result != null) {
2267                         if (top.getBuffer().isAtEnd())
2268                             top.fill();
2269 
2270                         queue.offer(top);
2271                         break;
2272                     } 
2273                 }
2274 
2275                 return result;
2276             }
2277         } 
2278         public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<DocumentNumberWordInteger>, ShreddedSource {      
2279             public ShreddedProcessor processor;
2280             ShreddedBuffer buffer;
2281             DocumentNumberWordInteger last = new DocumentNumberWordInteger();         
2282             long updateValueCount = -1;
2283             long tupleCount = 0;
2284             long bufferStartCount = 0;  
2285             ArrayInput input;
2286             
2287             public ShreddedReader(ArrayInput input) {
2288                 this.input = input; 
2289                 this.buffer = new ShreddedBuffer();
2290             }                               
2291             
2292             public ShreddedReader(ArrayInput input, int bufferSize) { 
2293                 this.input = input;
2294                 this.buffer = new ShreddedBuffer(bufferSize);
2295             }
2296                  
2297             public final int compareTo(ShreddedReader other) {
2298                 ShreddedBuffer otherBuffer = other.getBuffer();
2299                 
2300                 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
2301                     return 0;                 
2302                 } else if (buffer.isAtEnd()) {
2303                     return -1;
2304                 } else if (otherBuffer.isAtEnd()) {
2305                     return 1;
2306                 }
2307                                    
2308                 int result = 0;
2309                 do {
2310                     result = + Utility.compare(buffer.getValue(), otherBuffer.getValue());
2311                     if(result != 0) break;
2312                 } while (false);                                             
2313                 
2314                 return result;
2315             }
2316             
2317             public final ShreddedBuffer getBuffer() {
2318                 return buffer;
2319             }                
2320             
2321             public final DocumentNumberWordInteger read() throws IOException {
2322                 if (buffer.isAtEnd()) {
2323                     fill();             
2324                 
2325                     if (buffer.isAtEnd()) {
2326                         return null;
2327                     }
2328                 }
2329                       
2330                 assert !buffer.isAtEnd();
2331                 DocumentNumberWordInteger result = new DocumentNumberWordInteger();
2332                 
2333                 result.value = buffer.getValue();
2334                 result.word = buffer.getWord();
2335                 result.document = buffer.getDocument();
2336                 
2337                 buffer.incrementTuple();
2338                 buffer.autoIncrementValue();
2339                 
2340                 return result;
2341             }           
2342             
2343             public final void fill() throws IOException {
2344                 try {   
2345                     buffer.reset();
2346                     
2347                     if (tupleCount != 0) {
2348                                                       
2349                         if(updateValueCount - tupleCount > 0) {
2350                             buffer.values.add(last.value);
2351                             buffer.valueTupleIdx.add((int) (updateValueCount - tupleCount));
2352                         }
2353                         bufferStartCount = tupleCount;
2354                     }
2355                     
2356                     while (!buffer.isFull()) {
2357                         updateValue();
2358                         buffer.processTuple(input.readBytes(), input.readInt());
2359                         tupleCount++;
2360                     }
2361                 } catch(EOFException e) {}
2362             }
2363 
2364             public final void updateValue() throws IOException {
2365                 if (updateValueCount > tupleCount)
2366                     return;
2367                      
2368                 last.value = input.readInt();
2369                 updateValueCount = tupleCount + input.readInt();
2370                                       
2371                 buffer.processValue(last.value);
2372             }
2373 
2374             public void run() throws IOException {
2375                 while (true) {
2376                     fill();
2377                     
2378                     if (buffer.isAtEnd())
2379                         break;
2380                     
2381                     buffer.copyUntil(null, processor);
2382                 }      
2383                 processor.close();
2384             }
2385             
2386             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
2387                 if (processor instanceof ShreddedProcessor) {
2388                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
2389                 } else if (processor instanceof DocumentNumberWordInteger.Processor) {
2390                     this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentNumberWordInteger.Processor) processor));
2391                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
2392                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentNumberWordInteger>) processor));
2393                 } else {
2394                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
2395                 }
2396             }                                
2397             
2398             public Class<DocumentNumberWordInteger> getOutputClass() {
2399                 return DocumentNumberWordInteger.class;
2400             }                
2401         }
2402         
2403         public static class DuplicateEliminator implements ShreddedProcessor {
2404             public ShreddedProcessor processor;
2405             DocumentNumberWordInteger last = new DocumentNumberWordInteger();
2406             boolean valueProcess = true;
2407                                            
2408             public DuplicateEliminator() {}
2409             public DuplicateEliminator(ShreddedProcessor processor) {
2410                 this.processor = processor;
2411             }
2412             
2413             public void setShreddedProcessor(ShreddedProcessor processor) {
2414                 this.processor = processor;
2415             }
2416 
2417             public void processValue(int value) throws IOException {  
2418                 if (valueProcess || Utility.compare(value, last.value) != 0) {
2419                     last.value = value;
2420                     processor.processValue(value);
2421                     valueProcess = false;
2422                 }
2423             }  
2424             
2425             public void resetValue() {
2426                  valueProcess = true;
2427             }                                                
2428                                
2429             public void processTuple(byte[] word, int document) throws IOException {
2430                 processor.processTuple(word, document);
2431             } 
2432             
2433             public void close() throws IOException {
2434                 processor.close();
2435             }                    
2436         }
2437         public static class TupleUnshredder implements ShreddedProcessor {
2438             DocumentNumberWordInteger last = new DocumentNumberWordInteger();
2439             public org.galagosearch.tupleflow.Processor<DocumentNumberWordInteger> processor;                               
2440             
2441             public TupleUnshredder(DocumentNumberWordInteger.Processor processor) {
2442                 this.processor = processor;
2443             }         
2444             
2445             public TupleUnshredder(org.galagosearch.tupleflow.Processor<DocumentNumberWordInteger> processor) {
2446                 this.processor = processor;
2447             }
2448             
2449             public DocumentNumberWordInteger clone(DocumentNumberWordInteger object) {
2450                 DocumentNumberWordInteger result = new DocumentNumberWordInteger();
2451                 if (object == null) return result;
2452                 result.word = object.word; 
2453                 result.document = object.document; 
2454                 result.value = object.value; 
2455                 return result;
2456             }                 
2457             
2458             public void processValue(int value) throws IOException {
2459                 last.value = value;
2460             }   
2461                 
2462             
2463             public void processTuple(byte[] word, int document) throws IOException {
2464                 last.word = word;
2465                 last.document = document;
2466                 processor.process(clone(last));
2467             }               
2468             
2469             public void close() throws IOException {
2470                 processor.close();
2471             }
2472         }     
2473         public static class TupleShredder implements Processor {
2474             DocumentNumberWordInteger last = new DocumentNumberWordInteger();
2475             public ShreddedProcessor processor;
2476             
2477             public TupleShredder(ShreddedProcessor processor) {
2478                 this.processor = processor;
2479             }                              
2480             
2481             public DocumentNumberWordInteger clone(DocumentNumberWordInteger object) {
2482                 DocumentNumberWordInteger result = new DocumentNumberWordInteger();
2483                 if (object == null) return result;
2484                 result.word = object.word; 
2485                 result.document = object.document; 
2486                 result.value = object.value; 
2487                 return result;
2488             }                 
2489             
2490             public void process(DocumentNumberWordInteger object) throws IOException {                                                                                                                                                   
2491                 boolean processAll = false;
2492                 if(last == null || Utility.compare(last.value, object.value) != 0 || processAll) { processor.processValue(object.value); processAll = true; }
2493                 processor.processTuple(object.word, object.document);                                         
2494             }
2495                           
2496             public Class<DocumentNumberWordInteger> getInputClass() {
2497                 return DocumentNumberWordInteger.class;
2498             }
2499             
2500             public void close() throws IOException {
2501                 processor.close();
2502             }                     
2503         }
2504     } 
2505 }