View Javadoc

1   // This file was automatically generated with the command: 
2   //     java org.galagosearch.tupleflow.typebuilder.TypeBuilderMojo ...
3   package org.galagosearch.core.types;
4   
5   import org.galagosearch.tupleflow.Utility;
6   import org.galagosearch.tupleflow.ArrayInput;
7   import org.galagosearch.tupleflow.ArrayOutput;
8   import org.galagosearch.tupleflow.Order;   
9   import org.galagosearch.tupleflow.OrderedWriter;
10  import org.galagosearch.tupleflow.Type; 
11  import org.galagosearch.tupleflow.TypeReader;
12  import org.galagosearch.tupleflow.Step; 
13  import org.galagosearch.tupleflow.IncompatibleProcessorException;
14  import org.galagosearch.tupleflow.ReaderSource;
15  import java.io.IOException;             
16  import java.io.EOFException;
17  import java.io.UnsupportedEncodingException;
18  import java.util.ArrayList;
19  import java.util.Arrays;   
20  import java.util.Comparator;
21  import java.util.PriorityQueue;
22  import java.util.Collection;
23  
24  public class DocumentLengthWordCount implements Type<DocumentLengthWordCount> {
25      public String document;
26      public int length;
27      public String word;
28      public int count; 
29      
30      public DocumentLengthWordCount() {}
31      public DocumentLengthWordCount(String document, int length, String word, int count) {
32          this.document = document;
33          this.length = length;
34          this.word = word;
35          this.count = count;
36      }  
37      
38      public String toString() {
39              return String.format("%s,%d,%s,%d",
40                                     document, length, word, count);
41      } 
42  
43      public Order<DocumentLengthWordCount> getOrder(String... spec) {
44          if (Arrays.equals(spec, new String[] { "+document", "+length" })) {
45              return new DocumentLengthOrder();
46          }
47          if (Arrays.equals(spec, new String[] { "+document", "+word" })) {
48              return new DocumentWordOrder();
49          }
50          if (Arrays.equals(spec, new String[] { "+word", "+document" })) {
51              return new WordDocumentOrder();
52          }
53          if (Arrays.equals(spec, new String[] { "+document" })) {
54              return new DocumentOrder();
55          }
56          return null;
57      } 
58        
59      public interface Processor extends Step, org.galagosearch.tupleflow.Processor<DocumentLengthWordCount> {
60          public void process(DocumentLengthWordCount object) throws IOException;
61          public void close() throws IOException;
62      }                        
63      public interface Source extends Step {
64      }
65      public static class DocumentLengthOrder implements Order<DocumentLengthWordCount> {
66          public int hash(DocumentLengthWordCount object) {
67              int h = 0;
68              h += Utility.hash(object.document);
69              h += Utility.hash(object.length);
70              return h;
71          } 
72          public Comparator<DocumentLengthWordCount> greaterThan() {
73              return new Comparator<DocumentLengthWordCount>() {
74                  public int compare(DocumentLengthWordCount one, DocumentLengthWordCount two) {
75                      int result = 0;
76                      do {
77                          result = + Utility.compare(one.document, two.document);
78                          if(result != 0) break;
79                          result = + Utility.compare(one.length, two.length);
80                          if(result != 0) break;
81                      } while (false);
82                      return -result;
83                  }
84              };
85          }     
86          public Comparator<DocumentLengthWordCount> lessThan() {
87              return new Comparator<DocumentLengthWordCount>() {
88                  public int compare(DocumentLengthWordCount one, DocumentLengthWordCount two) {
89                      int result = 0;
90                      do {
91                          result = + Utility.compare(one.document, two.document);
92                          if(result != 0) break;
93                          result = + Utility.compare(one.length, two.length);
94                          if(result != 0) break;
95                      } while (false);
96                      return result;
97                  }
98              };
99          }     
100         public TypeReader<DocumentLengthWordCount> orderedReader(ArrayInput _input) {
101             return new ShreddedReader(_input);
102         }    
103 
104         public TypeReader<DocumentLengthWordCount> orderedReader(ArrayInput _input, int bufferSize) {
105             return new ShreddedReader(_input, bufferSize);
106         }    
107         public OrderedWriter<DocumentLengthWordCount> orderedWriter(ArrayOutput _output) {
108             ShreddedWriter w = new ShreddedWriter(_output);
109             return new OrderedWriterClass(w); 
110         }                                    
111         public static class OrderedWriterClass extends OrderedWriter< DocumentLengthWordCount > {
112             DocumentLengthWordCount last = null;
113             ShreddedWriter shreddedWriter = null; 
114             
115             public OrderedWriterClass(ShreddedWriter s) {
116                 this.shreddedWriter = s;
117             }
118             
119             public void process(DocumentLengthWordCount object) throws IOException {
120                boolean processAll = false;
121                if (processAll || last == null || 0 != Utility.compare(object.document, last.document)) { processAll = true; shreddedWriter.processDocument(object.document); }
122                if (processAll || last == null || 0 != Utility.compare(object.length, last.length)) { processAll = true; shreddedWriter.processLength(object.length); }
123                shreddedWriter.processTuple(object.word, object.count);
124                last = object;
125             }           
126                  
127             public void close() throws IOException {
128                 shreddedWriter.close();
129             }
130             
131             public Class<DocumentLengthWordCount> getInputClass() {
132                 return DocumentLengthWordCount.class;
133             }
134         } 
135         public ReaderSource<DocumentLengthWordCount> orderedCombiner(Collection<TypeReader<DocumentLengthWordCount>> readers, boolean closeOnExit) {
136             ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
137             
138             for (TypeReader<DocumentLengthWordCount> reader : readers) {
139                 shreddedReaders.add((ShreddedReader)reader);
140             }
141             
142             return new ShreddedCombiner(shreddedReaders, closeOnExit);
143         }                  
144         public DocumentLengthWordCount clone(DocumentLengthWordCount object) {
145             DocumentLengthWordCount result = new DocumentLengthWordCount();
146             if (object == null) return result;
147             result.document = object.document; 
148             result.length = object.length; 
149             result.word = object.word; 
150             result.count = object.count; 
151             return result;
152         }                 
153         public Class<DocumentLengthWordCount> getOrderedClass() {
154             return DocumentLengthWordCount.class;
155         }                           
156         public String[] getOrderSpec() {
157             return new String[] {"+document", "+length"};
158         }
159 
160         public static String getSpecString() {
161             return "+document +length";
162         }
163                            
164         public interface ShreddedProcessor extends Step {
165             public void processDocument(String document) throws IOException;
166             public void processLength(int length) throws IOException;
167             public void processTuple(String word, int count) throws IOException;
168             public void close() throws IOException;
169         }    
170         public interface ShreddedSource extends Step {
171         }                                              
172         
173         public static class ShreddedWriter implements ShreddedProcessor {
174             ArrayOutput output;
175             ShreddedBuffer buffer = new ShreddedBuffer();
176             String lastDocument;
177             int lastLength;
178             boolean lastFlush = false;
179             
180             public ShreddedWriter(ArrayOutput output) {
181                 this.output = output;
182             }                        
183             
184             public void close() throws IOException {
185                 flush();
186             }
187             
188             public void processDocument(String document) {
189                 lastDocument = document;
190                 buffer.processDocument(document);
191             }
192             public void processLength(int length) {
193                 lastLength = length;
194                 buffer.processLength(length);
195             }
196             public final void processTuple(String word, int count) throws IOException {
197                 if (lastFlush) {
198                     if(buffer.documents.size() == 0) buffer.processDocument(lastDocument);
199                     if(buffer.lengths.size() == 0) buffer.processLength(lastLength);
200                     lastFlush = false;
201                 }
202                 buffer.processTuple(word, count);
203                 if (buffer.isFull())
204                     flush();
205             }
206             public final void flushTuples(int pauseIndex) throws IOException {
207                 
208                 while (buffer.getReadIndex() < pauseIndex) {
209                            
210                     output.writeString(buffer.getWord());
211                     output.writeInt(buffer.getCount());
212                     buffer.incrementTuple();
213                 }
214             }  
215             public final void flushDocument(int pauseIndex) throws IOException {
216                 while (buffer.getReadIndex() < pauseIndex) {
217                     int nextPause = buffer.getDocumentEndIndex();
218                     int count = nextPause - buffer.getReadIndex();
219                     
220                     output.writeString(buffer.getDocument());
221                     output.writeInt(count);
222                     buffer.incrementDocument();
223                       
224                     flushLength(nextPause);
225                     assert nextPause == buffer.getReadIndex();
226                 }
227             }
228             public final void flushLength(int pauseIndex) throws IOException {
229                 while (buffer.getReadIndex() < pauseIndex) {
230                     int nextPause = buffer.getLengthEndIndex();
231                     int count = nextPause - buffer.getReadIndex();
232                     
233                     output.writeInt(buffer.getLength());
234                     output.writeInt(count);
235                     buffer.incrementLength();
236                       
237                     flushTuples(nextPause);
238                     assert nextPause == buffer.getReadIndex();
239                 }
240             }
241             public void flush() throws IOException { 
242                 flushDocument(buffer.getWriteIndex());
243                 buffer.reset(); 
244                 lastFlush = true;
245             }                           
246         }
247         public static class ShreddedBuffer {
248             ArrayList<String> documents = new ArrayList();
249             ArrayList<Integer> lengths = new ArrayList();
250             ArrayList<Integer> documentTupleIdx = new ArrayList();
251             ArrayList<Integer> lengthTupleIdx = new ArrayList();
252             int documentReadIdx = 0;
253             int lengthReadIdx = 0;
254                             
255             String[] words;
256             int[] counts;
257             int writeTupleIndex = 0;
258             int readTupleIndex = 0;
259             int batchSize;
260 
261             public ShreddedBuffer(int batchSize) {
262                 this.batchSize = batchSize;
263 
264                 words = new String[batchSize];
265                 counts = new int[batchSize];
266             }                              
267 
268             public ShreddedBuffer() {    
269                 this(10000);
270             }                                                                                                                    
271             
272             public void processDocument(String document) {
273                 documents.add(document);
274                 documentTupleIdx.add(writeTupleIndex);
275             }                                      
276             public void processLength(int length) {
277                 lengths.add(length);
278                 lengthTupleIdx.add(writeTupleIndex);
279             }                                      
280             public void processTuple(String word, int count) {
281                 assert documents.size() > 0;
282                 assert lengths.size() > 0;
283                 words[writeTupleIndex] = word;
284                 counts[writeTupleIndex] = count;
285                 writeTupleIndex++;
286             }
287             public void resetData() {
288                 documents.clear();
289                 lengths.clear();
290                 documentTupleIdx.clear();
291                 lengthTupleIdx.clear();
292                 writeTupleIndex = 0;
293             }                  
294                                  
295             public void resetRead() {
296                 readTupleIndex = 0;
297                 documentReadIdx = 0;
298                 lengthReadIdx = 0;
299             } 
300 
301             public void reset() {
302                 resetData();
303                 resetRead();
304             } 
305             public boolean isFull() {
306                 return writeTupleIndex >= batchSize;
307             }
308 
309             public boolean isEmpty() {
310                 return writeTupleIndex == 0;
311             }                          
312 
313             public boolean isAtEnd() {
314                 return readTupleIndex >= writeTupleIndex;
315             }           
316             public void incrementDocument() {
317                 documentReadIdx++;  
318             }                                                                                              
319 
320             public void autoIncrementDocument() {
321                 while (readTupleIndex >= getDocumentEndIndex() && readTupleIndex < writeTupleIndex)
322                     documentReadIdx++;
323             }                 
324             public void incrementLength() {
325                 lengthReadIdx++;  
326             }                                                                                              
327 
328             public void autoIncrementLength() {
329                 while (readTupleIndex >= getLengthEndIndex() && readTupleIndex < writeTupleIndex)
330                     lengthReadIdx++;
331             }                 
332             public void incrementTuple() {
333                 readTupleIndex++;
334             }                    
335             public int getDocumentEndIndex() {
336                 if ((documentReadIdx+1) >= documentTupleIdx.size())
337                     return writeTupleIndex;
338                 return documentTupleIdx.get(documentReadIdx+1);
339             }
340 
341             public int getLengthEndIndex() {
342                 if ((lengthReadIdx+1) >= lengthTupleIdx.size())
343                     return writeTupleIndex;
344                 return lengthTupleIdx.get(lengthReadIdx+1);
345             }
346             public int getReadIndex() {
347                 return readTupleIndex;
348             }   
349 
350             public int getWriteIndex() {
351                 return writeTupleIndex;
352             } 
353             public String getDocument() {
354                 assert readTupleIndex < writeTupleIndex;
355                 assert documentReadIdx < documents.size();
356                 
357                 return documents.get(documentReadIdx);
358             }
359             public int getLength() {
360                 assert readTupleIndex < writeTupleIndex;
361                 assert lengthReadIdx < lengths.size();
362                 
363                 return lengths.get(lengthReadIdx);
364             }
365             public String getWord() {
366                 assert readTupleIndex < writeTupleIndex;
367                 return words[readTupleIndex];
368             }                                         
369             public int getCount() {
370                 assert readTupleIndex < writeTupleIndex;
371                 return counts[readTupleIndex];
372             }                                         
373             public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
374                 while (getReadIndex() < endIndex) {
375                    output.processTuple(getWord(), getCount());
376                    incrementTuple();
377                 }
378             }                                                                           
379             public void copyUntilIndexDocument(int endIndex, ShreddedProcessor output) throws IOException {
380                 while (getReadIndex() < endIndex) {
381                     output.processDocument(getDocument());
382                     assert getDocumentEndIndex() <= endIndex;
383                     copyUntilIndexLength(getDocumentEndIndex(), output);
384                     incrementDocument();
385                 }
386             } 
387             public void copyUntilIndexLength(int endIndex, ShreddedProcessor output) throws IOException {
388                 while (getReadIndex() < endIndex) {
389                     output.processLength(getLength());
390                     assert getLengthEndIndex() <= endIndex;
391                     copyTuples(getLengthEndIndex(), output);
392                     incrementLength();
393                 }
394             }  
395             public void copyUntilDocument(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
396                 while (!isAtEnd()) {
397                     if (other != null) {   
398                         assert !other.isAtEnd();
399                         int c = + Utility.compare(getDocument(), other.getDocument());
400                     
401                         if (c > 0) {
402                             break;   
403                         }
404                         
405                         output.processDocument(getDocument());
406                                       
407                         if (c < 0) {
408                             copyUntilIndexLength(getDocumentEndIndex(), output);
409                         } else if (c == 0) {
410                             copyUntilLength(other, output);
411                             autoIncrementDocument();
412                             break;
413                         }
414                     } else {
415                         output.processDocument(getDocument());
416                         copyUntilIndexLength(getDocumentEndIndex(), output);
417                     }
418                     incrementDocument();  
419                     
420                
421                 }
422             }
423             public void copyUntilLength(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
424                 while (!isAtEnd()) {
425                     if (other != null) {   
426                         assert !other.isAtEnd();
427                         int c = + Utility.compare(getLength(), other.getLength());
428                     
429                         if (c > 0) {
430                             break;   
431                         }
432                         
433                         output.processLength(getLength());
434                                       
435                         copyTuples(getLengthEndIndex(), output);
436                     } else {
437                         output.processLength(getLength());
438                         copyTuples(getLengthEndIndex(), output);
439                     }
440                     incrementLength();  
441                     
442                     if (getDocumentEndIndex() <= readTupleIndex)
443                         break;   
444                 }
445             }
446             public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
447                 copyUntilDocument(other, output);
448             }
449             
450         }                         
451         public static class ShreddedCombiner implements ReaderSource<DocumentLengthWordCount>, ShreddedSource {   
452             public ShreddedProcessor processor;
453             Collection<ShreddedReader> readers;       
454             boolean closeOnExit = false;
455             boolean uninitialized = true;
456             PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
457             
458             public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
459                 this.readers = readers;                                                       
460                 this.closeOnExit = closeOnExit;
461             }
462                                   
463             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
464                 if (processor instanceof ShreddedProcessor) {
465                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
466                 } else if (processor instanceof DocumentLengthWordCount.Processor) {
467                     this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentLengthWordCount.Processor) processor));
468                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
469                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentLengthWordCount>) processor));
470                 } else {
471                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
472                 }
473             }                                
474             
475             public Class<DocumentLengthWordCount> getOutputClass() {
476                 return DocumentLengthWordCount.class;
477             }
478             
479             public void initialize() throws IOException {
480                 for (ShreddedReader reader : readers) {
481                     reader.fill();                                        
482                     
483                     if (!reader.getBuffer().isAtEnd())
484                         queue.add(reader);
485                 }   
486 
487                 uninitialized = false;
488             }
489 
490             public void run() throws IOException {
491                 initialize();
492                
493                 while (queue.size() > 0) {
494                     ShreddedReader top = queue.poll();
495                     ShreddedReader next = null;
496                     ShreddedBuffer nextBuffer = null; 
497                     
498                     assert !top.getBuffer().isAtEnd();
499                                                   
500                     if (queue.size() > 0) {
501                         next = queue.peek();
502                         nextBuffer = next.getBuffer();
503                         assert !nextBuffer.isAtEnd();
504                     }
505                     
506                     top.getBuffer().copyUntil(nextBuffer, processor);
507                     if (top.getBuffer().isAtEnd())
508                         top.fill();                 
509                         
510                     if (!top.getBuffer().isAtEnd())
511                         queue.add(top);
512                 }              
513                 
514                 if (closeOnExit)
515                     processor.close();
516             }
517 
518             public DocumentLengthWordCount read() throws IOException {
519                 if (uninitialized)
520                     initialize();
521 
522                 DocumentLengthWordCount result = null;
523 
524                 while (queue.size() > 0) {
525                     ShreddedReader top = queue.poll();
526                     result = top.read();
527 
528                     if (result != null) {
529                         if (top.getBuffer().isAtEnd())
530                             top.fill();
531 
532                         queue.offer(top);
533                         break;
534                     } 
535                 }
536 
537                 return result;
538             }
539         } 
540         public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<DocumentLengthWordCount>, ShreddedSource {      
541             public ShreddedProcessor processor;
542             ShreddedBuffer buffer;
543             DocumentLengthWordCount last = new DocumentLengthWordCount();         
544             long updateDocumentCount = -1;
545             long updateLengthCount = -1;
546             long tupleCount = 0;
547             long bufferStartCount = 0;  
548             ArrayInput input;
549             
550             public ShreddedReader(ArrayInput input) {
551                 this.input = input; 
552                 this.buffer = new ShreddedBuffer();
553             }                               
554             
555             public ShreddedReader(ArrayInput input, int bufferSize) { 
556                 this.input = input;
557                 this.buffer = new ShreddedBuffer(bufferSize);
558             }
559                  
560             public final int compareTo(ShreddedReader other) {
561                 ShreddedBuffer otherBuffer = other.getBuffer();
562                 
563                 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
564                     return 0;                 
565                 } else if (buffer.isAtEnd()) {
566                     return -1;
567                 } else if (otherBuffer.isAtEnd()) {
568                     return 1;
569                 }
570                                    
571                 int result = 0;
572                 do {
573                     result = + Utility.compare(buffer.getDocument(), otherBuffer.getDocument());
574                     if(result != 0) break;
575                     result = + Utility.compare(buffer.getLength(), otherBuffer.getLength());
576                     if(result != 0) break;
577                 } while (false);                                             
578                 
579                 return result;
580             }
581             
582             public final ShreddedBuffer getBuffer() {
583                 return buffer;
584             }                
585             
586             public final DocumentLengthWordCount read() throws IOException {
587                 if (buffer.isAtEnd()) {
588                     fill();             
589                 
590                     if (buffer.isAtEnd()) {
591                         return null;
592                     }
593                 }
594                       
595                 assert !buffer.isAtEnd();
596                 DocumentLengthWordCount result = new DocumentLengthWordCount();
597                 
598                 result.document = buffer.getDocument();
599                 result.length = buffer.getLength();
600                 result.word = buffer.getWord();
601                 result.count = buffer.getCount();
602                 
603                 buffer.incrementTuple();
604                 buffer.autoIncrementDocument();
605                 buffer.autoIncrementLength();
606                 
607                 return result;
608             }           
609             
610             public final void fill() throws IOException {
611                 try {   
612                     buffer.reset();
613                     
614                     if (tupleCount != 0) {
615                                                       
616                         if(updateDocumentCount - tupleCount > 0) {
617                             buffer.documents.add(last.document);
618                             buffer.documentTupleIdx.add((int) (updateDocumentCount - tupleCount));
619                         }                              
620                         if(updateLengthCount - tupleCount > 0) {
621                             buffer.lengths.add(last.length);
622                             buffer.lengthTupleIdx.add((int) (updateLengthCount - tupleCount));
623                         }
624                         bufferStartCount = tupleCount;
625                     }
626                     
627                     while (!buffer.isFull()) {
628                         updateLength();
629                         buffer.processTuple(input.readString(), input.readInt());
630                         tupleCount++;
631                     }
632                 } catch(EOFException e) {}
633             }
634 
635             public final void updateDocument() throws IOException {
636                 if (updateDocumentCount > tupleCount)
637                     return;
638                      
639                 last.document = input.readString();
640                 updateDocumentCount = tupleCount + input.readInt();
641                                       
642                 buffer.processDocument(last.document);
643             }
644             public final void updateLength() throws IOException {
645                 if (updateLengthCount > tupleCount)
646                     return;
647                      
648                 updateDocument();
649                 last.length = input.readInt();
650                 updateLengthCount = tupleCount + input.readInt();
651                                       
652                 buffer.processLength(last.length);
653             }
654 
655             public void run() throws IOException {
656                 while (true) {
657                     fill();
658                     
659                     if (buffer.isAtEnd())
660                         break;
661                     
662                     buffer.copyUntil(null, processor);
663                 }      
664                 processor.close();
665             }
666             
667             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
668                 if (processor instanceof ShreddedProcessor) {
669                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
670                 } else if (processor instanceof DocumentLengthWordCount.Processor) {
671                     this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentLengthWordCount.Processor) processor));
672                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
673                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentLengthWordCount>) processor));
674                 } else {
675                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
676                 }
677             }                                
678             
679             public Class<DocumentLengthWordCount> getOutputClass() {
680                 return DocumentLengthWordCount.class;
681             }                
682         }
683         
684         public static class DuplicateEliminator implements ShreddedProcessor {
685             public ShreddedProcessor processor;
686             DocumentLengthWordCount last = new DocumentLengthWordCount();
687             boolean documentProcess = true;
688             boolean lengthProcess = true;
689                                            
690             public DuplicateEliminator() {}
691             public DuplicateEliminator(ShreddedProcessor processor) {
692                 this.processor = processor;
693             }
694             
695             public void setShreddedProcessor(ShreddedProcessor processor) {
696                 this.processor = processor;
697             }
698 
699             public void processDocument(String document) throws IOException {  
700                 if (documentProcess || Utility.compare(document, last.document) != 0) {
701                     last.document = document;
702                     processor.processDocument(document);
703             resetLength();
704                     documentProcess = false;
705                 }
706             }
707             public void processLength(int length) throws IOException {  
708                 if (lengthProcess || Utility.compare(length, last.length) != 0) {
709                     last.length = length;
710                     processor.processLength(length);
711                     lengthProcess = false;
712                 }
713             }  
714             
715             public void resetDocument() {
716                  documentProcess = true;
717             resetLength();
718             }                                                
719             public void resetLength() {
720                  lengthProcess = true;
721             }                                                
722                                
723             public void processTuple(String word, int count) throws IOException {
724                 processor.processTuple(word, count);
725             } 
726             
727             public void close() throws IOException {
728                 processor.close();
729             }                    
730         }
731         public static class TupleUnshredder implements ShreddedProcessor {
732             DocumentLengthWordCount last = new DocumentLengthWordCount();
733             public org.galagosearch.tupleflow.Processor<DocumentLengthWordCount> processor;                               
734             
735             public TupleUnshredder(DocumentLengthWordCount.Processor processor) {
736                 this.processor = processor;
737             }         
738             
739             public TupleUnshredder(org.galagosearch.tupleflow.Processor<DocumentLengthWordCount> processor) {
740                 this.processor = processor;
741             }
742             
743             public DocumentLengthWordCount clone(DocumentLengthWordCount object) {
744                 DocumentLengthWordCount result = new DocumentLengthWordCount();
745                 if (object == null) return result;
746                 result.document = object.document; 
747                 result.length = object.length; 
748                 result.word = object.word; 
749                 result.count = object.count; 
750                 return result;
751             }                 
752             
753             public void processDocument(String document) throws IOException {
754                 last.document = document;
755             }   
756                 
757             public void processLength(int length) throws IOException {
758                 last.length = length;
759             }   
760                 
761             
762             public void processTuple(String word, int count) throws IOException {
763                 last.word = word;
764                 last.count = count;
765                 processor.process(clone(last));
766             }               
767             
768             public void close() throws IOException {
769                 processor.close();
770             }
771         }     
772         public static class TupleShredder implements Processor {
773             DocumentLengthWordCount last = new DocumentLengthWordCount();
774             public ShreddedProcessor processor;
775             
776             public TupleShredder(ShreddedProcessor processor) {
777                 this.processor = processor;
778             }                              
779             
780             public DocumentLengthWordCount clone(DocumentLengthWordCount object) {
781                 DocumentLengthWordCount result = new DocumentLengthWordCount();
782                 if (object == null) return result;
783                 result.document = object.document; 
784                 result.length = object.length; 
785                 result.word = object.word; 
786                 result.count = object.count; 
787                 return result;
788             }                 
789             
790             public void process(DocumentLengthWordCount object) throws IOException {                                                                                                                                                   
791                 boolean processAll = false;
792                 if(last == null || Utility.compare(last.document, object.document) != 0 || processAll) { processor.processDocument(object.document); processAll = true; }
793                 if(last == null || Utility.compare(last.length, object.length) != 0 || processAll) { processor.processLength(object.length); processAll = true; }
794                 processor.processTuple(object.word, object.count);                                         
795             }
796                           
797             public Class<DocumentLengthWordCount> getInputClass() {
798                 return DocumentLengthWordCount.class;
799             }
800             
801             public void close() throws IOException {
802                 processor.close();
803             }                     
804         }
805     } 
806     public static class DocumentWordOrder implements Order<DocumentLengthWordCount> {
807         public int hash(DocumentLengthWordCount object) {
808             int h = 0;
809             h += Utility.hash(object.document);
810             h += Utility.hash(object.word);
811             return h;
812         } 
813         public Comparator<DocumentLengthWordCount> greaterThan() {
814             return new Comparator<DocumentLengthWordCount>() {
815                 public int compare(DocumentLengthWordCount one, DocumentLengthWordCount two) {
816                     int result = 0;
817                     do {
818                         result = + Utility.compare(one.document, two.document);
819                         if(result != 0) break;
820                         result = + Utility.compare(one.word, two.word);
821                         if(result != 0) break;
822                     } while (false);
823                     return -result;
824                 }
825             };
826         }     
827         public Comparator<DocumentLengthWordCount> lessThan() {
828             return new Comparator<DocumentLengthWordCount>() {
829                 public int compare(DocumentLengthWordCount one, DocumentLengthWordCount two) {
830                     int result = 0;
831                     do {
832                         result = + Utility.compare(one.document, two.document);
833                         if(result != 0) break;
834                         result = + Utility.compare(one.word, two.word);
835                         if(result != 0) break;
836                     } while (false);
837                     return result;
838                 }
839             };
840         }     
841         public TypeReader<DocumentLengthWordCount> orderedReader(ArrayInput _input) {
842             return new ShreddedReader(_input);
843         }    
844 
845         public TypeReader<DocumentLengthWordCount> orderedReader(ArrayInput _input, int bufferSize) {
846             return new ShreddedReader(_input, bufferSize);
847         }    
848         public OrderedWriter<DocumentLengthWordCount> orderedWriter(ArrayOutput _output) {
849             ShreddedWriter w = new ShreddedWriter(_output);
850             return new OrderedWriterClass(w); 
851         }                                    
852         public static class OrderedWriterClass extends OrderedWriter< DocumentLengthWordCount > {
853             DocumentLengthWordCount last = null;
854             ShreddedWriter shreddedWriter = null; 
855             
856             public OrderedWriterClass(ShreddedWriter s) {
857                 this.shreddedWriter = s;
858             }
859             
860             public void process(DocumentLengthWordCount object) throws IOException {
861                boolean processAll = false;
862                if (processAll || last == null || 0 != Utility.compare(object.document, last.document)) { processAll = true; shreddedWriter.processDocument(object.document); }
863                if (processAll || last == null || 0 != Utility.compare(object.word, last.word)) { processAll = true; shreddedWriter.processWord(object.word); }
864                shreddedWriter.processTuple(object.length, object.count);
865                last = object;
866             }           
867                  
868             public void close() throws IOException {
869                 shreddedWriter.close();
870             }
871             
872             public Class<DocumentLengthWordCount> getInputClass() {
873                 return DocumentLengthWordCount.class;
874             }
875         } 
876         public ReaderSource<DocumentLengthWordCount> orderedCombiner(Collection<TypeReader<DocumentLengthWordCount>> readers, boolean closeOnExit) {
877             ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
878             
879             for (TypeReader<DocumentLengthWordCount> reader : readers) {
880                 shreddedReaders.add((ShreddedReader)reader);
881             }
882             
883             return new ShreddedCombiner(shreddedReaders, closeOnExit);
884         }                  
885         public DocumentLengthWordCount clone(DocumentLengthWordCount object) {
886             DocumentLengthWordCount result = new DocumentLengthWordCount();
887             if (object == null) return result;
888             result.document = object.document; 
889             result.length = object.length; 
890             result.word = object.word; 
891             result.count = object.count; 
892             return result;
893         }                 
894         public Class<DocumentLengthWordCount> getOrderedClass() {
895             return DocumentLengthWordCount.class;
896         }                           
897         public String[] getOrderSpec() {
898             return new String[] {"+document", "+word"};
899         }
900 
901         public static String getSpecString() {
902             return "+document +word";
903         }
904                            
905         public interface ShreddedProcessor extends Step {
906             public void processDocument(String document) throws IOException;
907             public void processWord(String word) throws IOException;
908             public void processTuple(int length, int count) throws IOException;
909             public void close() throws IOException;
910         }    
911         public interface ShreddedSource extends Step {
912         }                                              
913         
914         public static class ShreddedWriter implements ShreddedProcessor {
915             ArrayOutput output;
916             ShreddedBuffer buffer = new ShreddedBuffer();
917             String lastDocument;
918             String lastWord;
919             boolean lastFlush = false;
920             
921             public ShreddedWriter(ArrayOutput output) {
922                 this.output = output;
923             }                        
924             
925             public void close() throws IOException {
926                 flush();
927             }
928             
929             public void processDocument(String document) {
930                 lastDocument = document;
931                 buffer.processDocument(document);
932             }
933             public void processWord(String word) {
934                 lastWord = word;
935                 buffer.processWord(word);
936             }
937             public final void processTuple(int length, int count) throws IOException {
938                 if (lastFlush) {
939                     if(buffer.documents.size() == 0) buffer.processDocument(lastDocument);
940                     if(buffer.words.size() == 0) buffer.processWord(lastWord);
941                     lastFlush = false;
942                 }
943                 buffer.processTuple(length, count);
944                 if (buffer.isFull())
945                     flush();
946             }
947             public final void flushTuples(int pauseIndex) throws IOException {
948                 
949                 while (buffer.getReadIndex() < pauseIndex) {
950                            
951                     output.writeInt(buffer.getLength());
952                     output.writeInt(buffer.getCount());
953                     buffer.incrementTuple();
954                 }
955             }  
956             public final void flushDocument(int pauseIndex) throws IOException {
957                 while (buffer.getReadIndex() < pauseIndex) {
958                     int nextPause = buffer.getDocumentEndIndex();
959                     int count = nextPause - buffer.getReadIndex();
960                     
961                     output.writeString(buffer.getDocument());
962                     output.writeInt(count);
963                     buffer.incrementDocument();
964                       
965                     flushWord(nextPause);
966                     assert nextPause == buffer.getReadIndex();
967                 }
968             }
969             public final void flushWord(int pauseIndex) throws IOException {
970                 while (buffer.getReadIndex() < pauseIndex) {
971                     int nextPause = buffer.getWordEndIndex();
972                     int count = nextPause - buffer.getReadIndex();
973                     
974                     output.writeString(buffer.getWord());
975                     output.writeInt(count);
976                     buffer.incrementWord();
977                       
978                     flushTuples(nextPause);
979                     assert nextPause == buffer.getReadIndex();
980                 }
981             }
982             public void flush() throws IOException { 
983                 flushDocument(buffer.getWriteIndex());
984                 buffer.reset(); 
985                 lastFlush = true;
986             }                           
987         }
988         public static class ShreddedBuffer {
989             ArrayList<String> documents = new ArrayList();
990             ArrayList<String> words = new ArrayList();
991             ArrayList<Integer> documentTupleIdx = new ArrayList();
992             ArrayList<Integer> wordTupleIdx = new ArrayList();
993             int documentReadIdx = 0;
994             int wordReadIdx = 0;
995                             
996             int[] lengths;
997             int[] counts;
998             int writeTupleIndex = 0;
999             int readTupleIndex = 0;
1000             int batchSize;
1001 
1002             public ShreddedBuffer(int batchSize) {
1003                 this.batchSize = batchSize;
1004 
1005                 lengths = new int[batchSize];
1006                 counts = new int[batchSize];
1007             }                              
1008 
1009             public ShreddedBuffer() {    
1010                 this(10000);
1011             }                                                                                                                    
1012             
1013             public void processDocument(String document) {
1014                 documents.add(document);
1015                 documentTupleIdx.add(writeTupleIndex);
1016             }                                      
1017             public void processWord(String word) {
1018                 words.add(word);
1019                 wordTupleIdx.add(writeTupleIndex);
1020             }                                      
1021             public void processTuple(int length, int count) {
1022                 assert documents.size() > 0;
1023                 assert words.size() > 0;
1024                 lengths[writeTupleIndex] = length;
1025                 counts[writeTupleIndex] = count;
1026                 writeTupleIndex++;
1027             }
1028             public void resetData() {
1029                 documents.clear();
1030                 words.clear();
1031                 documentTupleIdx.clear();
1032                 wordTupleIdx.clear();
1033                 writeTupleIndex = 0;
1034             }                  
1035                                  
1036             public void resetRead() {
1037                 readTupleIndex = 0;
1038                 documentReadIdx = 0;
1039                 wordReadIdx = 0;
1040             } 
1041 
1042             public void reset() {
1043                 resetData();
1044                 resetRead();
1045             } 
1046             public boolean isFull() {
1047                 return writeTupleIndex >= batchSize;
1048             }
1049 
1050             public boolean isEmpty() {
1051                 return writeTupleIndex == 0;
1052             }                          
1053 
1054             public boolean isAtEnd() {
1055                 return readTupleIndex >= writeTupleIndex;
1056             }           
1057             public void incrementDocument() {
1058                 documentReadIdx++;  
1059             }                                                                                              
1060 
1061             public void autoIncrementDocument() {
1062                 while (readTupleIndex >= getDocumentEndIndex() && readTupleIndex < writeTupleIndex)
1063                     documentReadIdx++;
1064             }                 
1065             public void incrementWord() {
1066                 wordReadIdx++;  
1067             }                                                                                              
1068 
1069             public void autoIncrementWord() {
1070                 while (readTupleIndex >= getWordEndIndex() && readTupleIndex < writeTupleIndex)
1071                     wordReadIdx++;
1072             }                 
1073             public void incrementTuple() {
1074                 readTupleIndex++;
1075             }                    
1076             public int getDocumentEndIndex() {
1077                 if ((documentReadIdx+1) >= documentTupleIdx.size())
1078                     return writeTupleIndex;
1079                 return documentTupleIdx.get(documentReadIdx+1);
1080             }
1081 
1082             public int getWordEndIndex() {
1083                 if ((wordReadIdx+1) >= wordTupleIdx.size())
1084                     return writeTupleIndex;
1085                 return wordTupleIdx.get(wordReadIdx+1);
1086             }
1087             public int getReadIndex() {
1088                 return readTupleIndex;
1089             }   
1090 
1091             public int getWriteIndex() {
1092                 return writeTupleIndex;
1093             } 
1094             public String getDocument() {
1095                 assert readTupleIndex < writeTupleIndex;
1096                 assert documentReadIdx < documents.size();
1097                 
1098                 return documents.get(documentReadIdx);
1099             }
1100             public String getWord() {
1101                 assert readTupleIndex < writeTupleIndex;
1102                 assert wordReadIdx < words.size();
1103                 
1104                 return words.get(wordReadIdx);
1105             }
1106             public int getLength() {
1107                 assert readTupleIndex < writeTupleIndex;
1108                 return lengths[readTupleIndex];
1109             }                                         
1110             public int getCount() {
1111                 assert readTupleIndex < writeTupleIndex;
1112                 return counts[readTupleIndex];
1113             }                                         
1114             public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
1115                 while (getReadIndex() < endIndex) {
1116                    output.processTuple(getLength(), getCount());
1117                    incrementTuple();
1118                 }
1119             }                                                                           
1120             public void copyUntilIndexDocument(int endIndex, ShreddedProcessor output) throws IOException {
1121                 while (getReadIndex() < endIndex) {
1122                     output.processDocument(getDocument());
1123                     assert getDocumentEndIndex() <= endIndex;
1124                     copyUntilIndexWord(getDocumentEndIndex(), output);
1125                     incrementDocument();
1126                 }
1127             } 
1128             public void copyUntilIndexWord(int endIndex, ShreddedProcessor output) throws IOException {
1129                 while (getReadIndex() < endIndex) {
1130                     output.processWord(getWord());
1131                     assert getWordEndIndex() <= endIndex;
1132                     copyTuples(getWordEndIndex(), output);
1133                     incrementWord();
1134                 }
1135             }  
1136             public void copyUntilDocument(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
1137                 while (!isAtEnd()) {
1138                     if (other != null) {   
1139                         assert !other.isAtEnd();
1140                         int c = + Utility.compare(getDocument(), other.getDocument());
1141                     
1142                         if (c > 0) {
1143                             break;   
1144                         }
1145                         
1146                         output.processDocument(getDocument());
1147                                       
1148                         if (c < 0) {
1149                             copyUntilIndexWord(getDocumentEndIndex(), output);
1150                         } else if (c == 0) {
1151                             copyUntilWord(other, output);
1152                             autoIncrementDocument();
1153                             break;
1154                         }
1155                     } else {
1156                         output.processDocument(getDocument());
1157                         copyUntilIndexWord(getDocumentEndIndex(), output);
1158                     }
1159                     incrementDocument();  
1160                     
1161                
1162                 }
1163             }
1164             public void copyUntilWord(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
1165                 while (!isAtEnd()) {
1166                     if (other != null) {   
1167                         assert !other.isAtEnd();
1168                         int c = + Utility.compare(getWord(), other.getWord());
1169                     
1170                         if (c > 0) {
1171                             break;   
1172                         }
1173                         
1174                         output.processWord(getWord());
1175                                       
1176                         copyTuples(getWordEndIndex(), output);
1177                     } else {
1178                         output.processWord(getWord());
1179                         copyTuples(getWordEndIndex(), output);
1180                     }
1181                     incrementWord();  
1182                     
1183                     if (getDocumentEndIndex() <= readTupleIndex)
1184                         break;   
1185                 }
1186             }
1187             public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
1188                 copyUntilDocument(other, output);
1189             }
1190             
1191         }                         
1192         public static class ShreddedCombiner implements ReaderSource<DocumentLengthWordCount>, ShreddedSource {   
1193             public ShreddedProcessor processor;
1194             Collection<ShreddedReader> readers;       
1195             boolean closeOnExit = false;
1196             boolean uninitialized = true;
1197             PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
1198             
1199             public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
1200                 this.readers = readers;                                                       
1201                 this.closeOnExit = closeOnExit;
1202             }
1203                                   
1204             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
1205                 if (processor instanceof ShreddedProcessor) {
1206                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
1207                 } else if (processor instanceof DocumentLengthWordCount.Processor) {
1208                     this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentLengthWordCount.Processor) processor));
1209                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
1210                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentLengthWordCount>) processor));
1211                 } else {
1212                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
1213                 }
1214             }                                
1215             
1216             public Class<DocumentLengthWordCount> getOutputClass() {
1217                 return DocumentLengthWordCount.class;
1218             }
1219             
1220             public void initialize() throws IOException {
1221                 for (ShreddedReader reader : readers) {
1222                     reader.fill();                                        
1223                     
1224                     if (!reader.getBuffer().isAtEnd())
1225                         queue.add(reader);
1226                 }   
1227 
1228                 uninitialized = false;
1229             }
1230 
1231             public void run() throws IOException {
1232                 initialize();
1233                
1234                 while (queue.size() > 0) {
1235                     ShreddedReader top = queue.poll();
1236                     ShreddedReader next = null;
1237                     ShreddedBuffer nextBuffer = null; 
1238                     
1239                     assert !top.getBuffer().isAtEnd();
1240                                                   
1241                     if (queue.size() > 0) {
1242                         next = queue.peek();
1243                         nextBuffer = next.getBuffer();
1244                         assert !nextBuffer.isAtEnd();
1245                     }
1246                     
1247                     top.getBuffer().copyUntil(nextBuffer, processor);
1248                     if (top.getBuffer().isAtEnd())
1249                         top.fill();                 
1250                         
1251                     if (!top.getBuffer().isAtEnd())
1252                         queue.add(top);
1253                 }              
1254                 
1255                 if (closeOnExit)
1256                     processor.close();
1257             }
1258 
1259             public DocumentLengthWordCount read() throws IOException {
1260                 if (uninitialized)
1261                     initialize();
1262 
1263                 DocumentLengthWordCount result = null;
1264 
1265                 while (queue.size() > 0) {
1266                     ShreddedReader top = queue.poll();
1267                     result = top.read();
1268 
1269                     if (result != null) {
1270                         if (top.getBuffer().isAtEnd())
1271                             top.fill();
1272 
1273                         queue.offer(top);
1274                         break;
1275                     } 
1276                 }
1277 
1278                 return result;
1279             }
1280         } 
1281         public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<DocumentLengthWordCount>, ShreddedSource {      
1282             public ShreddedProcessor processor;
1283             ShreddedBuffer buffer;
1284             DocumentLengthWordCount last = new DocumentLengthWordCount();         
1285             long updateDocumentCount = -1;
1286             long updateWordCount = -1;
1287             long tupleCount = 0;
1288             long bufferStartCount = 0;  
1289             ArrayInput input;
1290             
1291             public ShreddedReader(ArrayInput input) {
1292                 this.input = input; 
1293                 this.buffer = new ShreddedBuffer();
1294             }                               
1295             
1296             public ShreddedReader(ArrayInput input, int bufferSize) { 
1297                 this.input = input;
1298                 this.buffer = new ShreddedBuffer(bufferSize);
1299             }
1300                  
1301             public final int compareTo(ShreddedReader other) {
1302                 ShreddedBuffer otherBuffer = other.getBuffer();
1303                 
1304                 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
1305                     return 0;                 
1306                 } else if (buffer.isAtEnd()) {
1307                     return -1;
1308                 } else if (otherBuffer.isAtEnd()) {
1309                     return 1;
1310                 }
1311                                    
1312                 int result = 0;
1313                 do {
1314                     result = + Utility.compare(buffer.getDocument(), otherBuffer.getDocument());
1315                     if(result != 0) break;
1316                     result = + Utility.compare(buffer.getWord(), otherBuffer.getWord());
1317                     if(result != 0) break;
1318                 } while (false);                                             
1319                 
1320                 return result;
1321             }
1322             
1323             public final ShreddedBuffer getBuffer() {
1324                 return buffer;
1325             }                
1326             
1327             public final DocumentLengthWordCount read() throws IOException {
1328                 if (buffer.isAtEnd()) {
1329                     fill();             
1330                 
1331                     if (buffer.isAtEnd()) {
1332                         return null;
1333                     }
1334                 }
1335                       
1336                 assert !buffer.isAtEnd();
1337                 DocumentLengthWordCount result = new DocumentLengthWordCount();
1338                 
1339                 result.document = buffer.getDocument();
1340                 result.word = buffer.getWord();
1341                 result.length = buffer.getLength();
1342                 result.count = buffer.getCount();
1343                 
1344                 buffer.incrementTuple();
1345                 buffer.autoIncrementDocument();
1346                 buffer.autoIncrementWord();
1347                 
1348                 return result;
1349             }           
1350             
1351             public final void fill() throws IOException {
1352                 try {   
1353                     buffer.reset();
1354                     
1355                     if (tupleCount != 0) {
1356                                                       
1357                         if(updateDocumentCount - tupleCount > 0) {
1358                             buffer.documents.add(last.document);
1359                             buffer.documentTupleIdx.add((int) (updateDocumentCount - tupleCount));
1360                         }                              
1361                         if(updateWordCount - tupleCount > 0) {
1362                             buffer.words.add(last.word);
1363                             buffer.wordTupleIdx.add((int) (updateWordCount - tupleCount));
1364                         }
1365                         bufferStartCount = tupleCount;
1366                     }
1367                     
1368                     while (!buffer.isFull()) {
1369                         updateWord();
1370                         buffer.processTuple(input.readInt(), input.readInt());
1371                         tupleCount++;
1372                     }
1373                 } catch(EOFException e) {}
1374             }
1375 
1376             public final void updateDocument() throws IOException {
1377                 if (updateDocumentCount > tupleCount)
1378                     return;
1379                      
1380                 last.document = input.readString();
1381                 updateDocumentCount = tupleCount + input.readInt();
1382                                       
1383                 buffer.processDocument(last.document);
1384             }
1385             public final void updateWord() throws IOException {
1386                 if (updateWordCount > tupleCount)
1387                     return;
1388                      
1389                 updateDocument();
1390                 last.word = input.readString();
1391                 updateWordCount = tupleCount + input.readInt();
1392                                       
1393                 buffer.processWord(last.word);
1394             }
1395 
1396             public void run() throws IOException {
1397                 while (true) {
1398                     fill();
1399                     
1400                     if (buffer.isAtEnd())
1401                         break;
1402                     
1403                     buffer.copyUntil(null, processor);
1404                 }      
1405                 processor.close();
1406             }
1407             
1408             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
1409                 if (processor instanceof ShreddedProcessor) {
1410                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
1411                 } else if (processor instanceof DocumentLengthWordCount.Processor) {
1412                     this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentLengthWordCount.Processor) processor));
1413                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
1414                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentLengthWordCount>) processor));
1415                 } else {
1416                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
1417                 }
1418             }                                
1419             
1420             public Class<DocumentLengthWordCount> getOutputClass() {
1421                 return DocumentLengthWordCount.class;
1422             }                
1423         }
1424         
1425         public static class DuplicateEliminator implements ShreddedProcessor {
1426             public ShreddedProcessor processor;
1427             DocumentLengthWordCount last = new DocumentLengthWordCount();
1428             boolean documentProcess = true;
1429             boolean wordProcess = true;
1430                                            
1431             public DuplicateEliminator() {}
1432             public DuplicateEliminator(ShreddedProcessor processor) {
1433                 this.processor = processor;
1434             }
1435             
1436             public void setShreddedProcessor(ShreddedProcessor processor) {
1437                 this.processor = processor;
1438             }
1439 
1440             public void processDocument(String document) throws IOException {  
1441                 if (documentProcess || Utility.compare(document, last.document) != 0) {
1442                     last.document = document;
1443                     processor.processDocument(document);
1444             resetWord();
1445                     documentProcess = false;
1446                 }
1447             }
1448             public void processWord(String word) throws IOException {  
1449                 if (wordProcess || Utility.compare(word, last.word) != 0) {
1450                     last.word = word;
1451                     processor.processWord(word);
1452                     wordProcess = false;
1453                 }
1454             }  
1455             
1456             public void resetDocument() {
1457                  documentProcess = true;
1458             resetWord();
1459             }                                                
1460             public void resetWord() {
1461                  wordProcess = true;
1462             }                                                
1463                                
1464             public void processTuple(int length, int count) throws IOException {
1465                 processor.processTuple(length, count);
1466             } 
1467             
1468             public void close() throws IOException {
1469                 processor.close();
1470             }                    
1471         }
1472         public static class TupleUnshredder implements ShreddedProcessor {
1473             DocumentLengthWordCount last = new DocumentLengthWordCount();
1474             public org.galagosearch.tupleflow.Processor<DocumentLengthWordCount> processor;                               
1475             
1476             public TupleUnshredder(DocumentLengthWordCount.Processor processor) {
1477                 this.processor = processor;
1478             }         
1479             
1480             public TupleUnshredder(org.galagosearch.tupleflow.Processor<DocumentLengthWordCount> processor) {
1481                 this.processor = processor;
1482             }
1483             
1484             public DocumentLengthWordCount clone(DocumentLengthWordCount object) {
1485                 DocumentLengthWordCount result = new DocumentLengthWordCount();
1486                 if (object == null) return result;
1487                 result.document = object.document; 
1488                 result.length = object.length; 
1489                 result.word = object.word; 
1490                 result.count = object.count; 
1491                 return result;
1492             }                 
1493             
1494             public void processDocument(String document) throws IOException {
1495                 last.document = document;
1496             }   
1497                 
1498             public void processWord(String word) throws IOException {
1499                 last.word = word;
1500             }   
1501                 
1502             
1503             public void processTuple(int length, int count) throws IOException {
1504                 last.length = length;
1505                 last.count = count;
1506                 processor.process(clone(last));
1507             }               
1508             
1509             public void close() throws IOException {
1510                 processor.close();
1511             }
1512         }     
1513         public static class TupleShredder implements Processor {
1514             DocumentLengthWordCount last = new DocumentLengthWordCount();
1515             public ShreddedProcessor processor;
1516             
1517             public TupleShredder(ShreddedProcessor processor) {
1518                 this.processor = processor;
1519             }                              
1520             
1521             public DocumentLengthWordCount clone(DocumentLengthWordCount object) {
1522                 DocumentLengthWordCount result = new DocumentLengthWordCount();
1523                 if (object == null) return result;
1524                 result.document = object.document; 
1525                 result.length = object.length; 
1526                 result.word = object.word; 
1527                 result.count = object.count; 
1528                 return result;
1529             }                 
1530             
1531             public void process(DocumentLengthWordCount object) throws IOException {                                                                                                                                                   
1532                 boolean processAll = false;
1533                 if(last == null || Utility.compare(last.document, object.document) != 0 || processAll) { processor.processDocument(object.document); processAll = true; }
1534                 if(last == null || Utility.compare(last.word, object.word) != 0 || processAll) { processor.processWord(object.word); processAll = true; }
1535                 processor.processTuple(object.length, object.count);                                         
1536             }
1537                           
1538             public Class<DocumentLengthWordCount> getInputClass() {
1539                 return DocumentLengthWordCount.class;
1540             }
1541             
1542             public void close() throws IOException {
1543                 processor.close();
1544             }                     
1545         }
1546     } 
1547     public static class WordDocumentOrder implements Order<DocumentLengthWordCount> {
1548         public int hash(DocumentLengthWordCount object) {
1549             int h = 0;
1550             h += Utility.hash(object.word);
1551             h += Utility.hash(object.document);
1552             return h;
1553         } 
1554         public Comparator<DocumentLengthWordCount> greaterThan() {
1555             return new Comparator<DocumentLengthWordCount>() {
1556                 public int compare(DocumentLengthWordCount one, DocumentLengthWordCount two) {
1557                     int result = 0;
1558                     do {
1559                         result = + Utility.compare(one.word, two.word);
1560                         if(result != 0) break;
1561                         result = + Utility.compare(one.document, two.document);
1562                         if(result != 0) break;
1563                     } while (false);
1564                     return -result;
1565                 }
1566             };
1567         }     
1568         public Comparator<DocumentLengthWordCount> lessThan() {
1569             return new Comparator<DocumentLengthWordCount>() {
1570                 public int compare(DocumentLengthWordCount one, DocumentLengthWordCount two) {
1571                     int result = 0;
1572                     do {
1573                         result = + Utility.compare(one.word, two.word);
1574                         if(result != 0) break;
1575                         result = + Utility.compare(one.document, two.document);
1576                         if(result != 0) break;
1577                     } while (false);
1578                     return result;
1579                 }
1580             };
1581         }     
1582         public TypeReader<DocumentLengthWordCount> orderedReader(ArrayInput _input) {
1583             return new ShreddedReader(_input);
1584         }    
1585 
1586         public TypeReader<DocumentLengthWordCount> orderedReader(ArrayInput _input, int bufferSize) {
1587             return new ShreddedReader(_input, bufferSize);
1588         }    
1589         public OrderedWriter<DocumentLengthWordCount> orderedWriter(ArrayOutput _output) {
1590             ShreddedWriter w = new ShreddedWriter(_output);
1591             return new OrderedWriterClass(w); 
1592         }                                    
1593         public static class OrderedWriterClass extends OrderedWriter< DocumentLengthWordCount > {
1594             DocumentLengthWordCount last = null;
1595             ShreddedWriter shreddedWriter = null; 
1596             
1597             public OrderedWriterClass(ShreddedWriter s) {
1598                 this.shreddedWriter = s;
1599             }
1600             
1601             public void process(DocumentLengthWordCount object) throws IOException {
1602                boolean processAll = false;
1603                if (processAll || last == null || 0 != Utility.compare(object.word, last.word)) { processAll = true; shreddedWriter.processWord(object.word); }
1604                if (processAll || last == null || 0 != Utility.compare(object.document, last.document)) { processAll = true; shreddedWriter.processDocument(object.document); }
1605                shreddedWriter.processTuple(object.length, object.count);
1606                last = object;
1607             }           
1608                  
1609             public void close() throws IOException {
1610                 shreddedWriter.close();
1611             }
1612             
1613             public Class<DocumentLengthWordCount> getInputClass() {
1614                 return DocumentLengthWordCount.class;
1615             }
1616         } 
1617         public ReaderSource<DocumentLengthWordCount> orderedCombiner(Collection<TypeReader<DocumentLengthWordCount>> readers, boolean closeOnExit) {
1618             ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
1619             
1620             for (TypeReader<DocumentLengthWordCount> reader : readers) {
1621                 shreddedReaders.add((ShreddedReader)reader);
1622             }
1623             
1624             return new ShreddedCombiner(shreddedReaders, closeOnExit);
1625         }                  
1626         public DocumentLengthWordCount clone(DocumentLengthWordCount object) {
1627             DocumentLengthWordCount result = new DocumentLengthWordCount();
1628             if (object == null) return result;
1629             result.document = object.document; 
1630             result.length = object.length; 
1631             result.word = object.word; 
1632             result.count = object.count; 
1633             return result;
1634         }                 
1635         public Class<DocumentLengthWordCount> getOrderedClass() {
1636             return DocumentLengthWordCount.class;
1637         }                           
1638         public String[] getOrderSpec() {
1639             return new String[] {"+word", "+document"};
1640         }
1641 
1642         public static String getSpecString() {
1643             return "+word +document";
1644         }
1645                            
1646         public interface ShreddedProcessor extends Step {
1647             public void processWord(String word) throws IOException;
1648             public void processDocument(String document) throws IOException;
1649             public void processTuple(int length, int count) throws IOException;
1650             public void close() throws IOException;
1651         }    
1652         public interface ShreddedSource extends Step {
1653         }                                              
1654         
1655         public static class ShreddedWriter implements ShreddedProcessor {
1656             ArrayOutput output;
1657             ShreddedBuffer buffer = new ShreddedBuffer();
1658             String lastWord;
1659             String lastDocument;
1660             boolean lastFlush = false;
1661             
1662             public ShreddedWriter(ArrayOutput output) {
1663                 this.output = output;
1664             }                        
1665             
1666             public void close() throws IOException {
1667                 flush();
1668             }
1669             
1670             public void processWord(String word) {
1671                 lastWord = word;
1672                 buffer.processWord(word);
1673             }
1674             public void processDocument(String document) {
1675                 lastDocument = document;
1676                 buffer.processDocument(document);
1677             }
1678             public final void processTuple(int length, int count) throws IOException {
1679                 if (lastFlush) {
1680                     if(buffer.words.size() == 0) buffer.processWord(lastWord);
1681                     if(buffer.documents.size() == 0) buffer.processDocument(lastDocument);
1682                     lastFlush = false;
1683                 }
1684                 buffer.processTuple(length, count);
1685                 if (buffer.isFull())
1686                     flush();
1687             }
1688             public final void flushTuples(int pauseIndex) throws IOException {
1689                 
1690                 while (buffer.getReadIndex() < pauseIndex) {
1691                            
1692                     output.writeInt(buffer.getLength());
1693                     output.writeInt(buffer.getCount());
1694                     buffer.incrementTuple();
1695                 }
1696             }  
1697             public final void flushWord(int pauseIndex) throws IOException {
1698                 while (buffer.getReadIndex() < pauseIndex) {
1699                     int nextPause = buffer.getWordEndIndex();
1700                     int count = nextPause - buffer.getReadIndex();
1701                     
1702                     output.writeString(buffer.getWord());
1703                     output.writeInt(count);
1704                     buffer.incrementWord();
1705                       
1706                     flushDocument(nextPause);
1707                     assert nextPause == buffer.getReadIndex();
1708                 }
1709             }
1710             public final void flushDocument(int pauseIndex) throws IOException {
1711                 while (buffer.getReadIndex() < pauseIndex) {
1712                     int nextPause = buffer.getDocumentEndIndex();
1713                     int count = nextPause - buffer.getReadIndex();
1714                     
1715                     output.writeString(buffer.getDocument());
1716                     output.writeInt(count);
1717                     buffer.incrementDocument();
1718                       
1719                     flushTuples(nextPause);
1720                     assert nextPause == buffer.getReadIndex();
1721                 }
1722             }
1723             public void flush() throws IOException { 
1724                 flushWord(buffer.getWriteIndex());
1725                 buffer.reset(); 
1726                 lastFlush = true;
1727             }                           
1728         }
1729         public static class ShreddedBuffer {
1730             ArrayList<String> words = new ArrayList();
1731             ArrayList<String> documents = new ArrayList();
1732             ArrayList<Integer> wordTupleIdx = new ArrayList();
1733             ArrayList<Integer> documentTupleIdx = new ArrayList();
1734             int wordReadIdx = 0;
1735             int documentReadIdx = 0;
1736                             
1737             int[] lengths;
1738             int[] counts;
1739             int writeTupleIndex = 0;
1740             int readTupleIndex = 0;
1741             int batchSize;
1742 
1743             public ShreddedBuffer(int batchSize) {
1744                 this.batchSize = batchSize;
1745 
1746                 lengths = new int[batchSize];
1747                 counts = new int[batchSize];
1748             }                              
1749 
1750             public ShreddedBuffer() {    
1751                 this(10000);
1752             }                                                                                                                    
1753             
1754             public void processWord(String word) {
1755                 words.add(word);
1756                 wordTupleIdx.add(writeTupleIndex);
1757             }                                      
1758             public void processDocument(String document) {
1759                 documents.add(document);
1760                 documentTupleIdx.add(writeTupleIndex);
1761             }                                      
1762             public void processTuple(int length, int count) {
1763                 assert words.size() > 0;
1764                 assert documents.size() > 0;
1765                 lengths[writeTupleIndex] = length;
1766                 counts[writeTupleIndex] = count;
1767                 writeTupleIndex++;
1768             }
1769             public void resetData() {
1770                 words.clear();
1771                 documents.clear();
1772                 wordTupleIdx.clear();
1773                 documentTupleIdx.clear();
1774                 writeTupleIndex = 0;
1775             }                  
1776                                  
1777             public void resetRead() {
1778                 readTupleIndex = 0;
1779                 wordReadIdx = 0;
1780                 documentReadIdx = 0;
1781             } 
1782 
1783             public void reset() {
1784                 resetData();
1785                 resetRead();
1786             } 
1787             public boolean isFull() {
1788                 return writeTupleIndex >= batchSize;
1789             }
1790 
1791             public boolean isEmpty() {
1792                 return writeTupleIndex == 0;
1793             }                          
1794 
1795             public boolean isAtEnd() {
1796                 return readTupleIndex >= writeTupleIndex;
1797             }           
1798             public void incrementWord() {
1799                 wordReadIdx++;  
1800             }                                                                                              
1801 
1802             public void autoIncrementWord() {
1803                 while (readTupleIndex >= getWordEndIndex() && readTupleIndex < writeTupleIndex)
1804                     wordReadIdx++;
1805             }                 
1806             public void incrementDocument() {
1807                 documentReadIdx++;  
1808             }                                                                                              
1809 
1810             public void autoIncrementDocument() {
1811                 while (readTupleIndex >= getDocumentEndIndex() && readTupleIndex < writeTupleIndex)
1812                     documentReadIdx++;
1813             }                 
1814             public void incrementTuple() {
1815                 readTupleIndex++;
1816             }                    
1817             public int getWordEndIndex() {
1818                 if ((wordReadIdx+1) >= wordTupleIdx.size())
1819                     return writeTupleIndex;
1820                 return wordTupleIdx.get(wordReadIdx+1);
1821             }
1822 
1823             public int getDocumentEndIndex() {
1824                 if ((documentReadIdx+1) >= documentTupleIdx.size())
1825                     return writeTupleIndex;
1826                 return documentTupleIdx.get(documentReadIdx+1);
1827             }
1828             public int getReadIndex() {
1829                 return readTupleIndex;
1830             }   
1831 
1832             public int getWriteIndex() {
1833                 return writeTupleIndex;
1834             } 
1835             public String getWord() {
1836                 assert readTupleIndex < writeTupleIndex;
1837                 assert wordReadIdx < words.size();
1838                 
1839                 return words.get(wordReadIdx);
1840             }
1841             public String getDocument() {
1842                 assert readTupleIndex < writeTupleIndex;
1843                 assert documentReadIdx < documents.size();
1844                 
1845                 return documents.get(documentReadIdx);
1846             }
1847             public int getLength() {
1848                 assert readTupleIndex < writeTupleIndex;
1849                 return lengths[readTupleIndex];
1850             }                                         
1851             public int getCount() {
1852                 assert readTupleIndex < writeTupleIndex;
1853                 return counts[readTupleIndex];
1854             }                                         
1855             public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
1856                 while (getReadIndex() < endIndex) {
1857                    output.processTuple(getLength(), getCount());
1858                    incrementTuple();
1859                 }
1860             }                                                                           
1861             public void copyUntilIndexWord(int endIndex, ShreddedProcessor output) throws IOException {
1862                 while (getReadIndex() < endIndex) {
1863                     output.processWord(getWord());
1864                     assert getWordEndIndex() <= endIndex;
1865                     copyUntilIndexDocument(getWordEndIndex(), output);
1866                     incrementWord();
1867                 }
1868             } 
1869             public void copyUntilIndexDocument(int endIndex, ShreddedProcessor output) throws IOException {
1870                 while (getReadIndex() < endIndex) {
1871                     output.processDocument(getDocument());
1872                     assert getDocumentEndIndex() <= endIndex;
1873                     copyTuples(getDocumentEndIndex(), output);
1874                     incrementDocument();
1875                 }
1876             }  
1877             public void copyUntilWord(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
1878                 while (!isAtEnd()) {
1879                     if (other != null) {   
1880                         assert !other.isAtEnd();
1881                         int c = + Utility.compare(getWord(), other.getWord());
1882                     
1883                         if (c > 0) {
1884                             break;   
1885                         }
1886                         
1887                         output.processWord(getWord());
1888                                       
1889                         if (c < 0) {
1890                             copyUntilIndexDocument(getWordEndIndex(), output);
1891                         } else if (c == 0) {
1892                             copyUntilDocument(other, output);
1893                             autoIncrementWord();
1894                             break;
1895                         }
1896                     } else {
1897                         output.processWord(getWord());
1898                         copyUntilIndexDocument(getWordEndIndex(), output);
1899                     }
1900                     incrementWord();  
1901                     
1902                
1903                 }
1904             }
1905             public void copyUntilDocument(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
1906                 while (!isAtEnd()) {
1907                     if (other != null) {   
1908                         assert !other.isAtEnd();
1909                         int c = + Utility.compare(getDocument(), other.getDocument());
1910                     
1911                         if (c > 0) {
1912                             break;   
1913                         }
1914                         
1915                         output.processDocument(getDocument());
1916                                       
1917                         copyTuples(getDocumentEndIndex(), output);
1918                     } else {
1919                         output.processDocument(getDocument());
1920                         copyTuples(getDocumentEndIndex(), output);
1921                     }
1922                     incrementDocument();  
1923                     
1924                     if (getWordEndIndex() <= readTupleIndex)
1925                         break;   
1926                 }
1927             }
1928             public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
1929                 copyUntilWord(other, output);
1930             }
1931             
1932         }                         
1933         public static class ShreddedCombiner implements ReaderSource<DocumentLengthWordCount>, ShreddedSource {   
1934             public ShreddedProcessor processor;
1935             Collection<ShreddedReader> readers;       
1936             boolean closeOnExit = false;
1937             boolean uninitialized = true;
1938             PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
1939             
1940             public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
1941                 this.readers = readers;                                                       
1942                 this.closeOnExit = closeOnExit;
1943             }
1944                                   
1945             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
1946                 if (processor instanceof ShreddedProcessor) {
1947                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
1948                 } else if (processor instanceof DocumentLengthWordCount.Processor) {
1949                     this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentLengthWordCount.Processor) processor));
1950                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
1951                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentLengthWordCount>) processor));
1952                 } else {
1953                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
1954                 }
1955             }                                
1956             
1957             public Class<DocumentLengthWordCount> getOutputClass() {
1958                 return DocumentLengthWordCount.class;
1959             }
1960             
1961             public void initialize() throws IOException {
1962                 for (ShreddedReader reader : readers) {
1963                     reader.fill();                                        
1964                     
1965                     if (!reader.getBuffer().isAtEnd())
1966                         queue.add(reader);
1967                 }   
1968 
1969                 uninitialized = false;
1970             }
1971 
1972             public void run() throws IOException {
1973                 initialize();
1974                
1975                 while (queue.size() > 0) {
1976                     ShreddedReader top = queue.poll();
1977                     ShreddedReader next = null;
1978                     ShreddedBuffer nextBuffer = null; 
1979                     
1980                     assert !top.getBuffer().isAtEnd();
1981                                                   
1982                     if (queue.size() > 0) {
1983                         next = queue.peek();
1984                         nextBuffer = next.getBuffer();
1985                         assert !nextBuffer.isAtEnd();
1986                     }
1987                     
1988                     top.getBuffer().copyUntil(nextBuffer, processor);
1989                     if (top.getBuffer().isAtEnd())
1990                         top.fill();                 
1991                         
1992                     if (!top.getBuffer().isAtEnd())
1993                         queue.add(top);
1994                 }              
1995                 
1996                 if (closeOnExit)
1997                     processor.close();
1998             }
1999 
2000             public DocumentLengthWordCount read() throws IOException {
2001                 if (uninitialized)
2002                     initialize();
2003 
2004                 DocumentLengthWordCount result = null;
2005 
2006                 while (queue.size() > 0) {
2007                     ShreddedReader top = queue.poll();
2008                     result = top.read();
2009 
2010                     if (result != null) {
2011                         if (top.getBuffer().isAtEnd())
2012                             top.fill();
2013 
2014                         queue.offer(top);
2015                         break;
2016                     } 
2017                 }
2018 
2019                 return result;
2020             }
2021         } 
        /**
         * Decodes shredded DocumentLengthWordCount tuples from an ArrayInput into a ShreddedBuffer.
         * Tuples are either handed out one at a time (read) or pushed to a ShreddedProcessor (run).
         *
         * The input, as consumed by updateWord/updateDocument/fill, is a sequence of headers
         * (a string followed by an int tuple count for that run) and tuple bodies (two ints).
         * A word header is read only when the current word run has ended. A document header is
         * read only when the current document run has ended, after first checking the word run.
         *
         * Implements Comparable so ShreddedCombiner can merge several readers in a PriorityQueue.
         */
        public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<DocumentLengthWordCount>, ShreddedSource {
            public ShreddedProcessor processor;
            ShreddedBuffer buffer;
            // Most recently decoded word/document header values.
            DocumentLengthWordCount last = new DocumentLengthWordCount();
            // Absolute tuple count at which the current word / document run ends; -1 until the
            // first header is read.
            long updateWordCount = -1;
            long updateDocumentCount = -1;
            // Total number of tuples decoded from input so far.
            long tupleCount = 0;
            // tupleCount at the most recent refill (assigned in fill(); not read in this class).
            long bufferStartCount = 0;
            ArrayInput input;

            /** Creates a reader over {@code input} using a ShreddedBuffer of default capacity. */
            public ShreddedReader(ArrayInput input) {
                this.input = input;
                this.buffer = new ShreddedBuffer();
            }

            /** Creates a reader over {@code input} whose buffer holds {@code bufferSize} tuples. */
            public ShreddedReader(ArrayInput input, int bufferSize) {
                this.input = input;
                this.buffer = new ShreddedBuffer(bufferSize);
            }

            /**
             * Orders readers by their current tuple's word, then document (via Utility.compare).
             * A reader whose buffer is exhausted sorts before one that still has data, and two
             * exhausted readers compare equal.
             */
            public final int compareTo(ShreddedReader other) {
                ShreddedBuffer otherBuffer = other.getBuffer();

                if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
                    return 0;
                } else if (buffer.isAtEnd()) {
                    return -1;
                } else if (otherBuffer.isAtEnd()) {
                    return 1;
                }

                int result = 0;
                do {
                    result = + Utility.compare(buffer.getWord(), otherBuffer.getWord());
                    if(result != 0) break;
                    result = + Utility.compare(buffer.getDocument(), otherBuffer.getDocument());
                    if(result != 0) break;
                } while (false);

                return result;
            }

            /** Returns the buffer this reader decodes into. */
            public final ShreddedBuffer getBuffer() {
                return buffer;
            }

            /**
             * Returns the next tuple as a new object, refilling the buffer from input first if it
             * is exhausted.
             *
             * @return the next tuple, or null when the input has no more tuples
             */
            public final DocumentLengthWordCount read() throws IOException {
                if (buffer.isAtEnd()) {
                    fill();

                    if (buffer.isAtEnd()) {
                        return null;
                    }
                }

                assert !buffer.isAtEnd();
                DocumentLengthWordCount result = new DocumentLengthWordCount();

                result.word = buffer.getWord();
                result.document = buffer.getDocument();
                result.length = buffer.getLength();
                result.count = buffer.getCount();

                // Advance the tuple cursor, then let the header cursors catch up.
                buffer.incrementTuple();
                buffer.autoIncrementWord();
                buffer.autoIncrementDocument();

                return result;
            }

            /**
             * Resets the buffer and decodes tuples until it is full or the input ends.
             * If a word or document run was still open when the previous fill stopped, its header
             * is re-registered first so the new batch starts inside that run.
             */
            public final void fill() throws IOException {
                try {
                    buffer.reset();

                    if (tupleCount != 0) {
                        // NOTE(review): the values added to wordTupleIdx/documentTupleIdx here are
                        // the *remaining* run lengths. DocumentOrder's ShreddedBuffer in this file
                        // records a run's start index instead. These entries land at list position 0,
                        // so confirm this order's buffer never reads position 0.
                        if(updateWordCount - tupleCount > 0) {
                            buffer.words.add(last.word);
                            buffer.wordTupleIdx.add((int) (updateWordCount - tupleCount));
                        }
                        if(updateDocumentCount - tupleCount > 0) {
                            buffer.documents.add(last.document);
                            buffer.documentTupleIdx.add((int) (updateDocumentCount - tupleCount));
                        }
                        bufferStartCount = tupleCount;
                    }

                    while (!buffer.isFull()) {
                        // Read any headers whose run has ended, then one tuple body.
                        updateDocument();
                        buffer.processTuple(input.readInt(), input.readInt());
                        tupleCount++;
                    }
                } catch(EOFException e) {
                    // End of input ends the fill; whatever was buffered so far is kept.
                }
            }

            /**
             * Reads the next word header (string, then the run's tuple count) unless the current
             * word run still covers the next tuple.
             */
            public final void updateWord() throws IOException {
                if (updateWordCount > tupleCount)
                    return;

                last.word = input.readString();
                updateWordCount = tupleCount + input.readInt();

                buffer.processWord(last.word);
            }
            /**
             * Reads the next document header (string, then the run's tuple count) unless the
             * current document run still covers the next tuple. The word header is brought up to
             * date first because documents are nested inside words.
             */
            public final void updateDocument() throws IOException {
                if (updateDocumentCount > tupleCount)
                    return;

                updateWord();
                last.document = input.readString();
                updateDocumentCount = tupleCount + input.readInt();

                buffer.processDocument(last.document);
            }

            /**
             * Streams all remaining input to the processor one buffer at a time, then closes it.
             * copyUntil(null, ...) is presumably "copy the whole buffer" (no bounding reader).
             */
            public void run() throws IOException {
                while (true) {
                    fill();

                    if (buffer.isAtEnd())
                        break;

                    buffer.copyUntil(null, processor);
                }
                processor.close();
            }

            /**
             * Installs the downstream step, wrapped in a DuplicateEliminator. Tuple-level
             * processors are adapted through a TupleUnshredder.
             *
             * @throws IncompatibleProcessorException if the step accepts neither form
             */
            public void setProcessor(Step processor) throws IncompatibleProcessorException {
                if (processor instanceof ShreddedProcessor) {
                    this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
                } else if (processor instanceof DocumentLengthWordCount.Processor) {
                    this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentLengthWordCount.Processor) processor));
                } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
                    this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentLengthWordCount>) processor));
                } else {
                    throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
                }
            }

            /** Returns the tuple class this reader produces. */
            public Class<DocumentLengthWordCount> getOutputClass() {
                return DocumentLengthWordCount.class;
            }
        }
2165         
2166         public static class DuplicateEliminator implements ShreddedProcessor {
2167             public ShreddedProcessor processor;
2168             DocumentLengthWordCount last = new DocumentLengthWordCount();
2169             boolean wordProcess = true;
2170             boolean documentProcess = true;
2171                                            
2172             public DuplicateEliminator() {}
2173             public DuplicateEliminator(ShreddedProcessor processor) {
2174                 this.processor = processor;
2175             }
2176             
2177             public void setShreddedProcessor(ShreddedProcessor processor) {
2178                 this.processor = processor;
2179             }
2180 
2181             public void processWord(String word) throws IOException {  
2182                 if (wordProcess || Utility.compare(word, last.word) != 0) {
2183                     last.word = word;
2184                     processor.processWord(word);
2185             resetDocument();
2186                     wordProcess = false;
2187                 }
2188             }
2189             public void processDocument(String document) throws IOException {  
2190                 if (documentProcess || Utility.compare(document, last.document) != 0) {
2191                     last.document = document;
2192                     processor.processDocument(document);
2193                     documentProcess = false;
2194                 }
2195             }  
2196             
2197             public void resetWord() {
2198                  wordProcess = true;
2199             resetDocument();
2200             }                                                
2201             public void resetDocument() {
2202                  documentProcess = true;
2203             }                                                
2204                                
2205             public void processTuple(int length, int count) throws IOException {
2206                 processor.processTuple(length, count);
2207             } 
2208             
2209             public void close() throws IOException {
2210                 processor.close();
2211             }                    
2212         }
2213         public static class TupleUnshredder implements ShreddedProcessor {
2214             DocumentLengthWordCount last = new DocumentLengthWordCount();
2215             public org.galagosearch.tupleflow.Processor<DocumentLengthWordCount> processor;                               
2216             
2217             public TupleUnshredder(DocumentLengthWordCount.Processor processor) {
2218                 this.processor = processor;
2219             }         
2220             
2221             public TupleUnshredder(org.galagosearch.tupleflow.Processor<DocumentLengthWordCount> processor) {
2222                 this.processor = processor;
2223             }
2224             
2225             public DocumentLengthWordCount clone(DocumentLengthWordCount object) {
2226                 DocumentLengthWordCount result = new DocumentLengthWordCount();
2227                 if (object == null) return result;
2228                 result.document = object.document; 
2229                 result.length = object.length; 
2230                 result.word = object.word; 
2231                 result.count = object.count; 
2232                 return result;
2233             }                 
2234             
2235             public void processWord(String word) throws IOException {
2236                 last.word = word;
2237             }   
2238                 
2239             public void processDocument(String document) throws IOException {
2240                 last.document = document;
2241             }   
2242                 
2243             
2244             public void processTuple(int length, int count) throws IOException {
2245                 last.length = length;
2246                 last.count = count;
2247                 processor.process(clone(last));
2248             }               
2249             
2250             public void close() throws IOException {
2251                 processor.close();
2252             }
2253         }     
2254         public static class TupleShredder implements Processor {
2255             DocumentLengthWordCount last = new DocumentLengthWordCount();
2256             public ShreddedProcessor processor;
2257             
2258             public TupleShredder(ShreddedProcessor processor) {
2259                 this.processor = processor;
2260             }                              
2261             
2262             public DocumentLengthWordCount clone(DocumentLengthWordCount object) {
2263                 DocumentLengthWordCount result = new DocumentLengthWordCount();
2264                 if (object == null) return result;
2265                 result.document = object.document; 
2266                 result.length = object.length; 
2267                 result.word = object.word; 
2268                 result.count = object.count; 
2269                 return result;
2270             }                 
2271             
2272             public void process(DocumentLengthWordCount object) throws IOException {                                                                                                                                                   
2273                 boolean processAll = false;
2274                 if(last == null || Utility.compare(last.word, object.word) != 0 || processAll) { processor.processWord(object.word); processAll = true; }
2275                 if(last == null || Utility.compare(last.document, object.document) != 0 || processAll) { processor.processDocument(object.document); processAll = true; }
2276                 processor.processTuple(object.length, object.count);                                         
2277             }
2278                           
2279             public Class<DocumentLengthWordCount> getInputClass() {
2280                 return DocumentLengthWordCount.class;
2281             }
2282             
2283             public void close() throws IOException {
2284                 processor.close();
2285             }                     
2286         }
2287     } 
2288     public static class DocumentOrder implements Order<DocumentLengthWordCount> {
2289         public int hash(DocumentLengthWordCount object) {
2290             int h = 0;
2291             h += Utility.hash(object.document);
2292             return h;
2293         } 
2294         public Comparator<DocumentLengthWordCount> greaterThan() {
2295             return new Comparator<DocumentLengthWordCount>() {
2296                 public int compare(DocumentLengthWordCount one, DocumentLengthWordCount two) {
2297                     int result = 0;
2298                     do {
2299                         result = + Utility.compare(one.document, two.document);
2300                         if(result != 0) break;
2301                     } while (false);
2302                     return -result;
2303                 }
2304             };
2305         }     
2306         public Comparator<DocumentLengthWordCount> lessThan() {
2307             return new Comparator<DocumentLengthWordCount>() {
2308                 public int compare(DocumentLengthWordCount one, DocumentLengthWordCount two) {
2309                     int result = 0;
2310                     do {
2311                         result = + Utility.compare(one.document, two.document);
2312                         if(result != 0) break;
2313                     } while (false);
2314                     return result;
2315                 }
2316             };
2317         }     
2318         public TypeReader<DocumentLengthWordCount> orderedReader(ArrayInput _input) {
2319             return new ShreddedReader(_input);
2320         }    
2321 
2322         public TypeReader<DocumentLengthWordCount> orderedReader(ArrayInput _input, int bufferSize) {
2323             return new ShreddedReader(_input, bufferSize);
2324         }    
2325         public OrderedWriter<DocumentLengthWordCount> orderedWriter(ArrayOutput _output) {
2326             ShreddedWriter w = new ShreddedWriter(_output);
2327             return new OrderedWriterClass(w); 
2328         }                                    
2329         public static class OrderedWriterClass extends OrderedWriter< DocumentLengthWordCount > {
2330             DocumentLengthWordCount last = null;
2331             ShreddedWriter shreddedWriter = null; 
2332             
2333             public OrderedWriterClass(ShreddedWriter s) {
2334                 this.shreddedWriter = s;
2335             }
2336             
2337             public void process(DocumentLengthWordCount object) throws IOException {
2338                boolean processAll = false;
2339                if (processAll || last == null || 0 != Utility.compare(object.document, last.document)) { processAll = true; shreddedWriter.processDocument(object.document); }
2340                shreddedWriter.processTuple(object.length, object.word, object.count);
2341                last = object;
2342             }           
2343                  
2344             public void close() throws IOException {
2345                 shreddedWriter.close();
2346             }
2347             
2348             public Class<DocumentLengthWordCount> getInputClass() {
2349                 return DocumentLengthWordCount.class;
2350             }
2351         } 
2352         public ReaderSource<DocumentLengthWordCount> orderedCombiner(Collection<TypeReader<DocumentLengthWordCount>> readers, boolean closeOnExit) {
2353             ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
2354             
2355             for (TypeReader<DocumentLengthWordCount> reader : readers) {
2356                 shreddedReaders.add((ShreddedReader)reader);
2357             }
2358             
2359             return new ShreddedCombiner(shreddedReaders, closeOnExit);
2360         }                  
2361         public DocumentLengthWordCount clone(DocumentLengthWordCount object) {
2362             DocumentLengthWordCount result = new DocumentLengthWordCount();
2363             if (object == null) return result;
2364             result.document = object.document; 
2365             result.length = object.length; 
2366             result.word = object.word; 
2367             result.count = object.count; 
2368             return result;
2369         }                 
2370         public Class<DocumentLengthWordCount> getOrderedClass() {
2371             return DocumentLengthWordCount.class;
2372         }                           
2373         public String[] getOrderSpec() {
2374             return new String[] {"+document"};
2375         }
2376 
2377         public static String getSpecString() {
2378             return "+document";
2379         }
2380                            
        /**
         * Receiver for tuples of this order in shredded form: a processDocument call starts a
         * document run, and processTuple delivers each tuple's remaining fields. Writers in this
         * file (e.g. OrderedWriterClass) call processDocument only when the document changes.
         */
        public interface ShreddedProcessor extends Step {
            public void processDocument(String document) throws IOException;
            public void processTuple(int length, String word, int count) throws IOException;
            public void close() throws IOException;
        }
        /** Marker interface (declares no methods) for steps that emit shredded output of this order. */
        public interface ShreddedSource extends Step {
        }
2388         
2389         public static class ShreddedWriter implements ShreddedProcessor {
2390             ArrayOutput output;
2391             ShreddedBuffer buffer = new ShreddedBuffer();
2392             String lastDocument;
2393             boolean lastFlush = false;
2394             
2395             public ShreddedWriter(ArrayOutput output) {
2396                 this.output = output;
2397             }                        
2398             
2399             public void close() throws IOException {
2400                 flush();
2401             }
2402             
2403             public void processDocument(String document) {
2404                 lastDocument = document;
2405                 buffer.processDocument(document);
2406             }
2407             public final void processTuple(int length, String word, int count) throws IOException {
2408                 if (lastFlush) {
2409                     if(buffer.documents.size() == 0) buffer.processDocument(lastDocument);
2410                     lastFlush = false;
2411                 }
2412                 buffer.processTuple(length, word, count);
2413                 if (buffer.isFull())
2414                     flush();
2415             }
2416             public final void flushTuples(int pauseIndex) throws IOException {
2417                 
2418                 while (buffer.getReadIndex() < pauseIndex) {
2419                            
2420                     output.writeInt(buffer.getLength());
2421                     output.writeString(buffer.getWord());
2422                     output.writeInt(buffer.getCount());
2423                     buffer.incrementTuple();
2424                 }
2425             }  
2426             public final void flushDocument(int pauseIndex) throws IOException {
2427                 while (buffer.getReadIndex() < pauseIndex) {
2428                     int nextPause = buffer.getDocumentEndIndex();
2429                     int count = nextPause - buffer.getReadIndex();
2430                     
2431                     output.writeString(buffer.getDocument());
2432                     output.writeInt(count);
2433                     buffer.incrementDocument();
2434                       
2435                     flushTuples(nextPause);
2436                     assert nextPause == buffer.getReadIndex();
2437                 }
2438             }
2439             public void flush() throws IOException { 
2440                 flushDocument(buffer.getWriteIndex());
2441                 buffer.reset(); 
2442                 lastFlush = true;
2443             }                           
2444         }
2445         public static class ShreddedBuffer {
2446             ArrayList<String> documents = new ArrayList();
2447             ArrayList<Integer> documentTupleIdx = new ArrayList();
2448             int documentReadIdx = 0;
2449                             
2450             int[] lengths;
2451             String[] words;
2452             int[] counts;
2453             int writeTupleIndex = 0;
2454             int readTupleIndex = 0;
2455             int batchSize;
2456 
2457             public ShreddedBuffer(int batchSize) {
2458                 this.batchSize = batchSize;
2459 
2460                 lengths = new int[batchSize];
2461                 words = new String[batchSize];
2462                 counts = new int[batchSize];
2463             }                              
2464 
2465             public ShreddedBuffer() {    
2466                 this(10000);
2467             }                                                                                                                    
2468             
2469             public void processDocument(String document) {
2470                 documents.add(document);
2471                 documentTupleIdx.add(writeTupleIndex);
2472             }                                      
2473             public void processTuple(int length, String word, int count) {
2474                 assert documents.size() > 0;
2475                 lengths[writeTupleIndex] = length;
2476                 words[writeTupleIndex] = word;
2477                 counts[writeTupleIndex] = count;
2478                 writeTupleIndex++;
2479             }
2480             public void resetData() {
2481                 documents.clear();
2482                 documentTupleIdx.clear();
2483                 writeTupleIndex = 0;
2484             }                  
2485                                  
2486             public void resetRead() {
2487                 readTupleIndex = 0;
2488                 documentReadIdx = 0;
2489             } 
2490 
2491             public void reset() {
2492                 resetData();
2493                 resetRead();
2494             } 
2495             public boolean isFull() {
2496                 return writeTupleIndex >= batchSize;
2497             }
2498 
2499             public boolean isEmpty() {
2500                 return writeTupleIndex == 0;
2501             }                          
2502 
2503             public boolean isAtEnd() {
2504                 return readTupleIndex >= writeTupleIndex;
2505             }           
2506             public void incrementDocument() {
2507                 documentReadIdx++;  
2508             }                                                                                              
2509 
2510             public void autoIncrementDocument() {
2511                 while (readTupleIndex >= getDocumentEndIndex() && readTupleIndex < writeTupleIndex)
2512                     documentReadIdx++;
2513             }                 
2514             public void incrementTuple() {
2515                 readTupleIndex++;
2516             }                    
2517             public int getDocumentEndIndex() {
2518                 if ((documentReadIdx+1) >= documentTupleIdx.size())
2519                     return writeTupleIndex;
2520                 return documentTupleIdx.get(documentReadIdx+1);
2521             }
2522             public int getReadIndex() {
2523                 return readTupleIndex;
2524             }   
2525 
2526             public int getWriteIndex() {
2527                 return writeTupleIndex;
2528             } 
2529             public String getDocument() {
2530                 assert readTupleIndex < writeTupleIndex;
2531                 assert documentReadIdx < documents.size();
2532                 
2533                 return documents.get(documentReadIdx);
2534             }
2535             public int getLength() {
2536                 assert readTupleIndex < writeTupleIndex;
2537                 return lengths[readTupleIndex];
2538             }                                         
2539             public String getWord() {
2540                 assert readTupleIndex < writeTupleIndex;
2541                 return words[readTupleIndex];
2542             }                                         
2543             public int getCount() {
2544                 assert readTupleIndex < writeTupleIndex;
2545                 return counts[readTupleIndex];
2546             }                                         
2547             public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
2548                 while (getReadIndex() < endIndex) {
2549                    output.processTuple(getLength(), getWord(), getCount());
2550                    incrementTuple();
2551                 }
2552             }                                                                           
2553             public void copyUntilIndexDocument(int endIndex, ShreddedProcessor output) throws IOException {
2554                 while (getReadIndex() < endIndex) {
2555                     output.processDocument(getDocument());
2556                     assert getDocumentEndIndex() <= endIndex;
2557                     copyTuples(getDocumentEndIndex(), output);
2558                     incrementDocument();
2559                 }
2560             }  
2561             public void copyUntilDocument(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
2562                 while (!isAtEnd()) {
2563                     if (other != null) {   
2564                         assert !other.isAtEnd();
2565                         int c = + Utility.compare(getDocument(), other.getDocument());
2566                     
2567                         if (c > 0) {
2568                             break;   
2569                         }
2570                         
2571                         output.processDocument(getDocument());
2572                                       
2573                         copyTuples(getDocumentEndIndex(), output);
2574                     } else {
2575                         output.processDocument(getDocument());
2576                         copyTuples(getDocumentEndIndex(), output);
2577                     }
2578                     incrementDocument();  
2579                     
2580                
2581                 }
2582             }
2583             public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
2584                 copyUntilDocument(other, output);
2585             }
2586             
2587         }                         
2588         public static class ShreddedCombiner implements ReaderSource<DocumentLengthWordCount>, ShreddedSource {   
2589             public ShreddedProcessor processor;
2590             Collection<ShreddedReader> readers;       
2591             boolean closeOnExit = false;
2592             boolean uninitialized = true;
2593             PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
2594             
2595             public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
2596                 this.readers = readers;                                                       
2597                 this.closeOnExit = closeOnExit;
2598             }
2599                                   
2600             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
2601                 if (processor instanceof ShreddedProcessor) {
2602                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
2603                 } else if (processor instanceof DocumentLengthWordCount.Processor) {
2604                     this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentLengthWordCount.Processor) processor));
2605                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
2606                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentLengthWordCount>) processor));
2607                 } else {
2608                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
2609                 }
2610             }                                
2611             
2612             public Class<DocumentLengthWordCount> getOutputClass() {
2613                 return DocumentLengthWordCount.class;
2614             }
2615             
2616             public void initialize() throws IOException {
2617                 for (ShreddedReader reader : readers) {
2618                     reader.fill();                                        
2619                     
2620                     if (!reader.getBuffer().isAtEnd())
2621                         queue.add(reader);
2622                 }   
2623 
2624                 uninitialized = false;
2625             }
2626 
2627             public void run() throws IOException {
2628                 initialize();
2629                
2630                 while (queue.size() > 0) {
2631                     ShreddedReader top = queue.poll();
2632                     ShreddedReader next = null;
2633                     ShreddedBuffer nextBuffer = null; 
2634                     
2635                     assert !top.getBuffer().isAtEnd();
2636                                                   
2637                     if (queue.size() > 0) {
2638                         next = queue.peek();
2639                         nextBuffer = next.getBuffer();
2640                         assert !nextBuffer.isAtEnd();
2641                     }
2642                     
2643                     top.getBuffer().copyUntil(nextBuffer, processor);
2644                     if (top.getBuffer().isAtEnd())
2645                         top.fill();                 
2646                         
2647                     if (!top.getBuffer().isAtEnd())
2648                         queue.add(top);
2649                 }              
2650                 
2651                 if (closeOnExit)
2652                     processor.close();
2653             }
2654 
2655             public DocumentLengthWordCount read() throws IOException {
2656                 if (uninitialized)
2657                     initialize();
2658 
2659                 DocumentLengthWordCount result = null;
2660 
2661                 while (queue.size() > 0) {
2662                     ShreddedReader top = queue.poll();
2663                     result = top.read();
2664 
2665                     if (result != null) {
2666                         if (top.getBuffer().isAtEnd())
2667                             top.fill();
2668 
2669                         queue.offer(top);
2670                         break;
2671                     } 
2672                 }
2673 
2674                 return result;
2675             }
2676         } 
2677         public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<DocumentLengthWordCount>, ShreddedSource {      
2678             public ShreddedProcessor processor;
2679             ShreddedBuffer buffer;
2680             DocumentLengthWordCount last = new DocumentLengthWordCount();         
2681             long updateDocumentCount = -1;
2682             long tupleCount = 0;
2683             long bufferStartCount = 0;  
2684             ArrayInput input;
2685             
2686             public ShreddedReader(ArrayInput input) {
2687                 this.input = input; 
2688                 this.buffer = new ShreddedBuffer();
2689             }                               
2690             
2691             public ShreddedReader(ArrayInput input, int bufferSize) { 
2692                 this.input = input;
2693                 this.buffer = new ShreddedBuffer(bufferSize);
2694             }
2695                  
2696             public final int compareTo(ShreddedReader other) {
2697                 ShreddedBuffer otherBuffer = other.getBuffer();
2698                 
2699                 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
2700                     return 0;                 
2701                 } else if (buffer.isAtEnd()) {
2702                     return -1;
2703                 } else if (otherBuffer.isAtEnd()) {
2704                     return 1;
2705                 }
2706                                    
2707                 int result = 0;
2708                 do {
2709                     result = + Utility.compare(buffer.getDocument(), otherBuffer.getDocument());
2710                     if(result != 0) break;
2711                 } while (false);                                             
2712                 
2713                 return result;
2714             }
2715             
2716             public final ShreddedBuffer getBuffer() {
2717                 return buffer;
2718             }                
2719             
2720             public final DocumentLengthWordCount read() throws IOException {
2721                 if (buffer.isAtEnd()) {
2722                     fill();             
2723                 
2724                     if (buffer.isAtEnd()) {
2725                         return null;
2726                     }
2727                 }
2728                       
2729                 assert !buffer.isAtEnd();
2730                 DocumentLengthWordCount result = new DocumentLengthWordCount();
2731                 
2732                 result.document = buffer.getDocument();
2733                 result.length = buffer.getLength();
2734                 result.word = buffer.getWord();
2735                 result.count = buffer.getCount();
2736                 
2737                 buffer.incrementTuple();
2738                 buffer.autoIncrementDocument();
2739                 
2740                 return result;
2741             }           
2742             
2743             public final void fill() throws IOException {
2744                 try {   
2745                     buffer.reset();
2746                     
2747                     if (tupleCount != 0) {
2748                                                       
2749                         if(updateDocumentCount - tupleCount > 0) {
2750                             buffer.documents.add(last.document);
2751                             buffer.documentTupleIdx.add((int) (updateDocumentCount - tupleCount));
2752                         }
2753                         bufferStartCount = tupleCount;
2754                     }
2755                     
2756                     while (!buffer.isFull()) {
2757                         updateDocument();
2758                         buffer.processTuple(input.readInt(), input.readString(), input.readInt());
2759                         tupleCount++;
2760                     }
2761                 } catch(EOFException e) {}
2762             }
2763 
2764             public final void updateDocument() throws IOException {
2765                 if (updateDocumentCount > tupleCount)
2766                     return;
2767                      
2768                 last.document = input.readString();
2769                 updateDocumentCount = tupleCount + input.readInt();
2770                                       
2771                 buffer.processDocument(last.document);
2772             }
2773 
2774             public void run() throws IOException {
2775                 while (true) {
2776                     fill();
2777                     
2778                     if (buffer.isAtEnd())
2779                         break;
2780                     
2781                     buffer.copyUntil(null, processor);
2782                 }      
2783                 processor.close();
2784             }
2785             
2786             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
2787                 if (processor instanceof ShreddedProcessor) {
2788                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
2789                 } else if (processor instanceof DocumentLengthWordCount.Processor) {
2790                     this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentLengthWordCount.Processor) processor));
2791                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
2792                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentLengthWordCount>) processor));
2793                 } else {
2794                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
2795                 }
2796             }                                
2797             
2798             public Class<DocumentLengthWordCount> getOutputClass() {
2799                 return DocumentLengthWordCount.class;
2800             }                
2801         }
2802         
2803         public static class DuplicateEliminator implements ShreddedProcessor {
2804             public ShreddedProcessor processor;
2805             DocumentLengthWordCount last = new DocumentLengthWordCount();
2806             boolean documentProcess = true;
2807                                            
2808             public DuplicateEliminator() {}
2809             public DuplicateEliminator(ShreddedProcessor processor) {
2810                 this.processor = processor;
2811             }
2812             
2813             public void setShreddedProcessor(ShreddedProcessor processor) {
2814                 this.processor = processor;
2815             }
2816 
2817             public void processDocument(String document) throws IOException {  
2818                 if (documentProcess || Utility.compare(document, last.document) != 0) {
2819                     last.document = document;
2820                     processor.processDocument(document);
2821                     documentProcess = false;
2822                 }
2823             }  
2824             
2825             public void resetDocument() {
2826                  documentProcess = true;
2827             }                                                
2828                                
2829             public void processTuple(int length, String word, int count) throws IOException {
2830                 processor.processTuple(length, word, count);
2831             } 
2832             
2833             public void close() throws IOException {
2834                 processor.close();
2835             }                    
2836         }
2837         public static class TupleUnshredder implements ShreddedProcessor {
2838             DocumentLengthWordCount last = new DocumentLengthWordCount();
2839             public org.galagosearch.tupleflow.Processor<DocumentLengthWordCount> processor;                               
2840             
2841             public TupleUnshredder(DocumentLengthWordCount.Processor processor) {
2842                 this.processor = processor;
2843             }         
2844             
2845             public TupleUnshredder(org.galagosearch.tupleflow.Processor<DocumentLengthWordCount> processor) {
2846                 this.processor = processor;
2847             }
2848             
2849             public DocumentLengthWordCount clone(DocumentLengthWordCount object) {
2850                 DocumentLengthWordCount result = new DocumentLengthWordCount();
2851                 if (object == null) return result;
2852                 result.document = object.document; 
2853                 result.length = object.length; 
2854                 result.word = object.word; 
2855                 result.count = object.count; 
2856                 return result;
2857             }                 
2858             
2859             public void processDocument(String document) throws IOException {
2860                 last.document = document;
2861             }   
2862                 
2863             
2864             public void processTuple(int length, String word, int count) throws IOException {
2865                 last.length = length;
2866                 last.word = word;
2867                 last.count = count;
2868                 processor.process(clone(last));
2869             }               
2870             
2871             public void close() throws IOException {
2872                 processor.close();
2873             }
2874         }     
2875         public static class TupleShredder implements Processor {
2876             DocumentLengthWordCount last = new DocumentLengthWordCount();
2877             public ShreddedProcessor processor;
2878             
2879             public TupleShredder(ShreddedProcessor processor) {
2880                 this.processor = processor;
2881             }                              
2882             
2883             public DocumentLengthWordCount clone(DocumentLengthWordCount object) {
2884                 DocumentLengthWordCount result = new DocumentLengthWordCount();
2885                 if (object == null) return result;
2886                 result.document = object.document; 
2887                 result.length = object.length; 
2888                 result.word = object.word; 
2889                 result.count = object.count; 
2890                 return result;
2891             }                 
2892             
2893             public void process(DocumentLengthWordCount object) throws IOException {                                                                                                                                                   
2894                 boolean processAll = false;
2895                 if(last == null || Utility.compare(last.document, object.document) != 0 || processAll) { processor.processDocument(object.document); processAll = true; }
2896                 processor.processTuple(object.length, object.word, object.count);                                         
2897             }
2898                           
2899             public Class<DocumentLengthWordCount> getInputClass() {
2900                 return DocumentLengthWordCount.class;
2901             }
2902             
2903             public void close() throws IOException {
2904                 processor.close();
2905             }                     
2906         }
2907     } 
2908 }