Coverage Report - org.galagosearch.core.types.DocumentLengthWordCount
 
Classes in this File Line Coverage Branch Coverage Complexity
DocumentLengthWordCount
82%
14/17
62%
5/8
0
DocumentLengthWordCount$DocumentLengthOrder
73%
19/26
25%
1/4
0
DocumentLengthWordCount$DocumentLengthOrder$1
100%
7/7
75%
3/4
0
DocumentLengthWordCount$DocumentLengthOrder$2
100%
7/7
75%
3/4
0
DocumentLengthWordCount$DocumentLengthOrder$DuplicateEliminator
79%
23/29
100%
8/8
0
DocumentLengthWordCount$DocumentLengthOrder$OrderedWriterClass
93%
14/15
83%
10/12
0
DocumentLengthWordCount$DocumentLengthOrder$ShreddedBuffer
92%
115/125
62%
56/90
0
DocumentLengthWordCount$DocumentLengthOrder$ShreddedCombiner
65%
36/55
47%
17/36
0
DocumentLengthWordCount$DocumentLengthOrder$ShreddedProcessor
N/A
N/A
0
DocumentLengthWordCount$DocumentLengthOrder$ShreddedReader
66%
56/85
48%
19/40
0
DocumentLengthWordCount$DocumentLengthOrder$ShreddedSource
N/A
N/A
0
DocumentLengthWordCount$DocumentLengthOrder$ShreddedWriter
100%
51/51
73%
16/22
0
DocumentLengthWordCount$DocumentLengthOrder$TupleShredder
0%
0/20
0%
0/14
0
DocumentLengthWordCount$DocumentLengthOrder$TupleUnshredder
0%
0/24
0%
0/2
0
DocumentLengthWordCount$DocumentOrder
8%
2/25
0%
0/4
0
DocumentLengthWordCount$DocumentOrder$1
0%
0/5
0%
0/2
0
DocumentLengthWordCount$DocumentOrder$2
0%
0/5
0%
0/2
0
DocumentLengthWordCount$DocumentOrder$DuplicateEliminator
0%
0/19
0%
0/4
0
DocumentLengthWordCount$DocumentOrder$OrderedWriterClass
0%
0/14
0%
0/6
0
DocumentLengthWordCount$DocumentOrder$ShreddedBuffer
0%
0/82
0%
0/54
0
DocumentLengthWordCount$DocumentOrder$ShreddedCombiner
0%
0/55
0%
0/36
0
DocumentLengthWordCount$DocumentOrder$ShreddedProcessor
N/A
N/A
0
DocumentLengthWordCount$DocumentOrder$ShreddedReader
0%
0/71
0%
0/34
0
DocumentLengthWordCount$DocumentOrder$ShreddedSource
N/A
N/A
0
DocumentLengthWordCount$DocumentOrder$ShreddedWriter
0%
0/38
0%
0/14
0
DocumentLengthWordCount$DocumentOrder$TupleShredder
0%
0/19
0%
0/8
0
DocumentLengthWordCount$DocumentOrder$TupleUnshredder
0%
0/23
0%
0/2
0
DocumentLengthWordCount$DocumentWordOrder
0%
0/26
0%
0/4
0
DocumentLengthWordCount$DocumentWordOrder$1
0%
0/7
0%
0/4
0
DocumentLengthWordCount$DocumentWordOrder$2
0%
0/7
0%
0/4
0
DocumentLengthWordCount$DocumentWordOrder$DuplicateEliminator
0%
0/29
0%
0/8
0
DocumentLengthWordCount$DocumentWordOrder$OrderedWriterClass
0%
0/15
0%
0/12
0
DocumentLengthWordCount$DocumentWordOrder$ShreddedBuffer
0%
0/125
0%
0/90
0
DocumentLengthWordCount$DocumentWordOrder$ShreddedCombiner
0%
0/55
0%
0/36
0
DocumentLengthWordCount$DocumentWordOrder$ShreddedProcessor
N/A
N/A
0
DocumentLengthWordCount$DocumentWordOrder$ShreddedReader
0%
0/85
0%
0/40
0
DocumentLengthWordCount$DocumentWordOrder$ShreddedSource
N/A
N/A
0
DocumentLengthWordCount$DocumentWordOrder$ShreddedWriter
0%
0/51
0%
0/22
0
DocumentLengthWordCount$DocumentWordOrder$TupleShredder
0%
0/20
0%
0/14
0
DocumentLengthWordCount$DocumentWordOrder$TupleUnshredder
0%
0/24
0%
0/2
0
DocumentLengthWordCount$Processor
N/A
N/A
0
DocumentLengthWordCount$Source
N/A
N/A
0
DocumentLengthWordCount$WordDocumentOrder
0%
0/26
0%
0/4
0
DocumentLengthWordCount$WordDocumentOrder$1
0%
0/7
0%
0/4
0
DocumentLengthWordCount$WordDocumentOrder$2
0%
0/7
0%
0/4
0
DocumentLengthWordCount$WordDocumentOrder$DuplicateEliminator
0%
0/29
0%
0/8
0
DocumentLengthWordCount$WordDocumentOrder$OrderedWriterClass
0%
0/15
0%
0/12
0
DocumentLengthWordCount$WordDocumentOrder$ShreddedBuffer
0%
0/125
0%
0/90
0
DocumentLengthWordCount$WordDocumentOrder$ShreddedCombiner
0%
0/55
0%
0/36
0
DocumentLengthWordCount$WordDocumentOrder$ShreddedProcessor
N/A
N/A
0
DocumentLengthWordCount$WordDocumentOrder$ShreddedReader
0%
0/85
0%
0/40
0
DocumentLengthWordCount$WordDocumentOrder$ShreddedSource
N/A
N/A
0
DocumentLengthWordCount$WordDocumentOrder$ShreddedWriter
0%
0/51
0%
0/22
0
DocumentLengthWordCount$WordDocumentOrder$TupleShredder
0%
0/20
0%
0/14
0
DocumentLengthWordCount$WordDocumentOrder$TupleUnshredder
0%
0/24
0%
0/2
0
 
 1  
 // This file was automatically generated with the command: 
 2  
 //     java org.galagosearch.tupleflow.typebuilder.TypeBuilderMojo ...
 3  
 package org.galagosearch.core.types;
 4  
 
 5  
 import org.galagosearch.tupleflow.Utility;
 6  
 import org.galagosearch.tupleflow.ArrayInput;
 7  
 import org.galagosearch.tupleflow.ArrayOutput;
 8  
 import org.galagosearch.tupleflow.Order;   
 9  
 import org.galagosearch.tupleflow.OrderedWriter;
 10  
 import org.galagosearch.tupleflow.Type; 
 11  
 import org.galagosearch.tupleflow.TypeReader;
 12  
 import org.galagosearch.tupleflow.Step; 
 13  
 import org.galagosearch.tupleflow.IncompatibleProcessorException;
 14  
 import org.galagosearch.tupleflow.ReaderSource;
 15  
 import java.io.IOException;             
 16  
 import java.io.EOFException;
 17  
 import java.io.UnsupportedEncodingException;
 18  
 import java.util.ArrayList;
 19  
 import java.util.Arrays;   
 20  
 import java.util.Comparator;
 21  
 import java.util.PriorityQueue;
 22  
 import java.util.Collection;
 23  
 
 24  
 public class DocumentLengthWordCount implements Type<DocumentLengthWordCount> {
 25  
     public String document;
 26  
     public int length;
 27  
     public String word;
 28  
     public int count; 
 29  
     
 30  159864
     public DocumentLengthWordCount() {}
 31  160016
     public DocumentLengthWordCount(String document, int length, String word, int count) {
 32  160016
         this.document = document;
 33  160016
         this.length = length;
 34  160016
         this.word = word;
 35  160016
         this.count = count;
 36  160016
     }  
 37  
     
 38  
     public String toString() {
 39  80008
             return String.format("%s,%d,%s,%d",
 40  
                                    document, length, word, count);
 41  
     } 
 42  
 
 43  
     public Order<DocumentLengthWordCount> getOrder(String... spec) {
 44  8
         if (Arrays.equals(spec, new String[] { "+document", "+length" })) {
 45  0
             return new DocumentLengthOrder();
 46  
         }
 47  8
         if (Arrays.equals(spec, new String[] { "+document", "+word" })) {
 48  0
             return new DocumentWordOrder();
 49  
         }
 50  8
         if (Arrays.equals(spec, new String[] { "+word", "+document" })) {
 51  0
             return new WordDocumentOrder();
 52  
         }
 53  8
         if (Arrays.equals(spec, new String[] { "+document" })) {
 54  4
             return new DocumentOrder();
 55  
         }
 56  4
         return null;
 57  
     } 
 58  
       
 59  
     public interface Processor extends Step, org.galagosearch.tupleflow.Processor<DocumentLengthWordCount> {
 60  
         public void process(DocumentLengthWordCount object) throws IOException;
 61  
         public void close() throws IOException;
 62  
     }                        
 63  
     public interface Source extends Step {
 64  
     }
 65  36
     public static class DocumentLengthOrder implements Order<DocumentLengthWordCount> {
 66  
         public int hash(DocumentLengthWordCount object) {
 67  16
             int h = 0;
 68  16
             h += Utility.hash(object.document);
 69  16
             h += Utility.hash(object.length);
 70  16
             return h;
 71  
         } 
 72  
         public Comparator<DocumentLengthWordCount> greaterThan() {
 73  4
             return new Comparator<DocumentLengthWordCount>() {
 74  16
                 public int compare(DocumentLengthWordCount one, DocumentLengthWordCount two) {
 75  12
                     int result = 0;
 76  
                     do {
 77  12
                         result = + Utility.compare(one.document, two.document);
 78  12
                         if(result != 0) break;
 79  8
                         result = + Utility.compare(one.length, two.length);
 80  8
                         if(result != 0) break;
 81  
                     } while (false);
 82  12
                     return -result;
 83  
                 }
 84  
             };
 85  
         }     
 86  
         public Comparator<DocumentLengthWordCount> lessThan() {
 87  8
             return new Comparator<DocumentLengthWordCount>() {
 88  40020
                 public int compare(DocumentLengthWordCount one, DocumentLengthWordCount two) {
 89  40012
                     int result = 0;
 90  
                     do {
 91  40012
                         result = + Utility.compare(one.document, two.document);
 92  40012
                         if(result != 0) break;
 93  40008
                         result = + Utility.compare(one.length, two.length);
 94  40008
                         if(result != 0) break;
 95  
                     } while (false);
 96  40012
                     return result;
 97  
                 }
 98  
             };
 99  
         }     
 100  
         public TypeReader<DocumentLengthWordCount> orderedReader(ArrayInput _input) {
 101  8
             return new ShreddedReader(_input);
 102  
         }    
 103  
 
 104  
         public TypeReader<DocumentLengthWordCount> orderedReader(ArrayInput _input, int bufferSize) {
 105  0
             return new ShreddedReader(_input, bufferSize);
 106  
         }    
 107  
         public OrderedWriter<DocumentLengthWordCount> orderedWriter(ArrayOutput _output) {
 108  16
             ShreddedWriter w = new ShreddedWriter(_output);
 109  16
             return new OrderedWriterClass(w); 
 110  
         }                                    
 111  159800
         public static class OrderedWriterClass extends OrderedWriter< DocumentLengthWordCount > {
 112  16
             DocumentLengthWordCount last = null;
 113  16
             ShreddedWriter shreddedWriter = null; 
 114  
             
 115  16
             public OrderedWriterClass(ShreddedWriter s) {
 116  16
                 this.shreddedWriter = s;
 117  16
             }
 118  
             
 119  
             public void process(DocumentLengthWordCount object) throws IOException {
 120  159800
                boolean processAll = false;
 121  159800
                if (processAll || last == null || 0 != Utility.compare(object.document, last.document)) { processAll = true; shreddedWriter.processDocument(object.document); }
 122  159800
                if (processAll || last == null || 0 != Utility.compare(object.length, last.length)) { processAll = true; shreddedWriter.processLength(object.length); }
 123  159800
                shreddedWriter.processTuple(object.word, object.count);
 124  159800
                last = object;
 125  159800
             }           
 126  
                  
 127  
             public void close() throws IOException {
 128  12
                 shreddedWriter.close();
 129  12
             }
 130  
             
 131  
             public Class<DocumentLengthWordCount> getInputClass() {
 132  0
                 return DocumentLengthWordCount.class;
 133  
             }
 134  
         } 
 135  
         public ReaderSource<DocumentLengthWordCount> orderedCombiner(Collection<TypeReader<DocumentLengthWordCount>> readers, boolean closeOnExit) {
 136  0
             ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
 137  
             
 138  0
             for (TypeReader<DocumentLengthWordCount> reader : readers) {
 139  0
                 shreddedReaders.add((ShreddedReader)reader);
 140  
             }
 141  
             
 142  0
             return new ShreddedCombiner(shreddedReaders, closeOnExit);
 143  
         }                  
 144  
         public DocumentLengthWordCount clone(DocumentLengthWordCount object) {
 145  4
             DocumentLengthWordCount result = new DocumentLengthWordCount();
 146  4
             if (object == null) return result;
 147  4
             result.document = object.document; 
 148  4
             result.length = object.length; 
 149  4
             result.word = object.word; 
 150  4
             result.count = object.count; 
 151  4
             return result;
 152  
         }                 
 153  
         public Class<DocumentLengthWordCount> getOrderedClass() {
 154  4
             return DocumentLengthWordCount.class;
 155  
         }                           
 156  
         public String[] getOrderSpec() {
 157  0
             return new String[] {"+document", "+length"};
 158  
         }
 159  
 
 160  
         public static String getSpecString() {
 161  0
             return "+document +length";
 162  
         }
 163  
                            
 164  
         public interface ShreddedProcessor extends Step {
 165  
             public void processDocument(String document) throws IOException;
 166  
             public void processLength(int length) throws IOException;
 167  
             public void processTuple(String word, int count) throws IOException;
 168  
             public void close() throws IOException;
 169  
         }    
 170  
         public interface ShreddedSource extends Step {
 171  
         }                                              
 172  
         
 173  4
         public static class ShreddedWriter implements ShreddedProcessor {
 174  
             ArrayOutput output;
 175  16
             ShreddedBuffer buffer = new ShreddedBuffer();
 176  
             String lastDocument;
 177  
             int lastLength;
 178  16
             boolean lastFlush = false;
 179  
             
 180  16
             public ShreddedWriter(ArrayOutput output) {
 181  16
                 this.output = output;
 182  16
             }                        
 183  
             
 184  
             public void close() throws IOException {
 185  12
                 flush();
 186  12
             }
 187  
             
 188  
             public void processDocument(String document) {
 189  7996
                 lastDocument = document;
 190  7996
                 buffer.processDocument(document);
 191  7996
             }
 192  
             public void processLength(int length) {
 193  31964
                 lastLength = length;
 194  31964
                 buffer.processLength(length);
 195  31964
             }
 196  
             public final void processTuple(String word, int count) throws IOException {
 197  159800
                 if (lastFlush) {
 198  8
                     if(buffer.documents.size() == 0) buffer.processDocument(lastDocument);
 199  8
                     if(buffer.lengths.size() == 0) buffer.processLength(lastLength);
 200  8
                     lastFlush = false;
 201  
                 }
 202  159800
                 buffer.processTuple(word, count);
 203  159800
                 if (buffer.isFull())
 204  8
                     flush();
 205  159800
             }
 206  
             public final void flushTuples(int pauseIndex) throws IOException {
 207  
                 
 208  191764
                 while (buffer.getReadIndex() < pauseIndex) {
 209  
                            
 210  159800
                     output.writeString(buffer.getWord());
 211  159800
                     output.writeInt(buffer.getCount());
 212  159800
                     buffer.incrementTuple();
 213  
                 }
 214  31964
             }  
 215  
             public final void flushDocument(int pauseIndex) throws IOException {
 216  8016
                 while (buffer.getReadIndex() < pauseIndex) {
 217  7996
                     int nextPause = buffer.getDocumentEndIndex();
 218  7996
                     int count = nextPause - buffer.getReadIndex();
 219  
                     
 220  7996
                     output.writeString(buffer.getDocument());
 221  7996
                     output.writeInt(count);
 222  7996
                     buffer.incrementDocument();
 223  
                       
 224  7996
                     flushLength(nextPause);
 225  7996
                     assert nextPause == buffer.getReadIndex();
 226  7996
                 }
 227  20
             }
 228  
             public final void flushLength(int pauseIndex) throws IOException {
 229  39960
                 while (buffer.getReadIndex() < pauseIndex) {
 230  31964
                     int nextPause = buffer.getLengthEndIndex();
 231  31964
                     int count = nextPause - buffer.getReadIndex();
 232  
                     
 233  31964
                     output.writeInt(buffer.getLength());
 234  31964
                     output.writeInt(count);
 235  31964
                     buffer.incrementLength();
 236  
                       
 237  31964
                     flushTuples(nextPause);
 238  31964
                     assert nextPause == buffer.getReadIndex();
 239  31964
                 }
 240  7996
             }
 241  
             public void flush() throws IOException { 
 242  20
                 flushDocument(buffer.getWriteIndex());
 243  20
                 buffer.reset(); 
 244  20
                 lastFlush = true;
 245  20
             }                           
 246  
         }
 247  4
         public static class ShreddedBuffer {
 248  32
             ArrayList<String> documents = new ArrayList();
 249  32
             ArrayList<Integer> lengths = new ArrayList();
 250  32
             ArrayList<Integer> documentTupleIdx = new ArrayList();
 251  32
             ArrayList<Integer> lengthTupleIdx = new ArrayList();
 252  32
             int documentReadIdx = 0;
 253  32
             int lengthReadIdx = 0;
 254  
                             
 255  
             String[] words;
 256  
             int[] counts;
 257  32
             int writeTupleIndex = 0;
 258  32
             int readTupleIndex = 0;
 259  
             int batchSize;
 260  
 
 261  32
             public ShreddedBuffer(int batchSize) {
 262  32
                 this.batchSize = batchSize;
 263  
 
 264  32
                 words = new String[batchSize];
 265  32
                 counts = new int[batchSize];
 266  32
             }                              
 267  
 
 268  
             public ShreddedBuffer() {    
 269  32
                 this(10000);
 270  32
             }                                                                                                                    
 271  
             
 272  
             public void processDocument(String document) {
 273  15992
                 documents.add(document);
 274  15992
                 documentTupleIdx.add(writeTupleIndex);
 275  15992
             }                                      
 276  
             public void processLength(int length) {
 277  63928
                 lengths.add(length);
 278  63928
                 lengthTupleIdx.add(writeTupleIndex);
 279  63928
             }                                      
 280  
             public void processTuple(String word, int count) {
 281  319600
                 assert documents.size() > 0;
 282  319600
                 assert lengths.size() > 0;
 283  319600
                 words[writeTupleIndex] = word;
 284  319600
                 counts[writeTupleIndex] = count;
 285  319600
                 writeTupleIndex++;
 286  319600
             }
 287  
             public void resetData() {
 288  48
                 documents.clear();
 289  48
                 lengths.clear();
 290  48
                 documentTupleIdx.clear();
 291  48
                 lengthTupleIdx.clear();
 292  48
                 writeTupleIndex = 0;
 293  48
             }                  
 294  
                                  
 295  
             public void resetRead() {
 296  48
                 readTupleIndex = 0;
 297  48
                 documentReadIdx = 0;
 298  48
                 lengthReadIdx = 0;
 299  48
             } 
 300  
 
 301  
             public void reset() {
 302  48
                 resetData();
 303  48
                 resetRead();
 304  48
             } 
 305  
             public boolean isFull() {
 306  319628
                 return writeTupleIndex >= batchSize;
 307  
             }
 308  
 
 309  
             public boolean isEmpty() {
 310  0
                 return writeTupleIndex == 0;
 311  
             }                          
 312  
 
 313  
             public boolean isAtEnd() {
 314  243608
                 return readTupleIndex >= writeTupleIndex;
 315  
             }           
 316  
             public void incrementDocument() {
 317  9996
                 documentReadIdx++;  
 318  9996
             }                                                                                              
 319  
 
 320  
             public void autoIncrementDocument() {
 321  125788
                 while (readTupleIndex >= getDocumentEndIndex() && readTupleIndex < writeTupleIndex)
 322  5984
                     documentReadIdx++;
 323  119804
             }                 
 324  
             public void incrementLength() {
 325  39968
                 lengthReadIdx++;  
 326  39968
             }                                                                                              
 327  
 
 328  
             public void autoIncrementLength() {
 329  143748
                 while (readTupleIndex >= getLengthEndIndex() && readTupleIndex < writeTupleIndex)
 330  23948
                     lengthReadIdx++;
 331  119800
             }                 
 332  
             public void incrementTuple() {
 333  319600
                 readTupleIndex++;
 334  319600
             }                    
 335  
             public int getDocumentEndIndex() {
 336  135800
                 if ((documentReadIdx+1) >= documentTupleIdx.size())
 337  240
                     return writeTupleIndex;
 338  135560
                 return documentTupleIdx.get(documentReadIdx+1);
 339  
             }
 340  
 
 341  
             public int getLengthEndIndex() {
 342  191704
                 if ((lengthReadIdx+1) >= lengthTupleIdx.size())
 343  108
                     return writeTupleIndex;
 344  191596
                 return lengthTupleIdx.get(lengthReadIdx+1);
 345  
             }
 346  
             public int getReadIndex() {
 347  377652
                 return readTupleIndex;
 348  
             }   
 349  
 
 350  
             public int getWriteIndex() {
 351  20
                 return writeTupleIndex;
 352  
             } 
 353  
             public String getDocument() {
 354  133456
                 assert readTupleIndex < writeTupleIndex;
 355  133456
                 assert documentReadIdx < documents.size();
 356  
                 
 357  133456
                 return documents.get(documentReadIdx);
 358  
             }
 359  
             public int getLength() {
 360  159800
                 assert readTupleIndex < writeTupleIndex;
 361  159800
                 assert lengthReadIdx < lengths.size();
 362  
                 
 363  159800
                 return lengths.get(lengthReadIdx);
 364  
             }
 365  
             public String getWord() {
 366  319600
                 assert readTupleIndex < writeTupleIndex;
 367  319600
                 return words[readTupleIndex];
 368  
             }                                         
 369  
             public int getCount() {
 370  319600
                 assert readTupleIndex < writeTupleIndex;
 371  319600
                 return counts[readTupleIndex];
 372  
             }                                         
 373  
             public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
 374  48004
                 while (getReadIndex() < endIndex) {
 375  40000
                    output.processTuple(getWord(), getCount());
 376  40000
                    incrementTuple();
 377  
                 }
 378  8004
             }                                                                           
 379  
             public void copyUntilIndexDocument(int endIndex, ShreddedProcessor output) throws IOException {
 380  0
                 while (getReadIndex() < endIndex) {
 381  0
                     output.processDocument(getDocument());
 382  0
                     assert getDocumentEndIndex() <= endIndex;
 383  0
                     copyUntilIndexLength(getDocumentEndIndex(), output);
 384  0
                     incrementDocument();
 385  
                 }
 386  0
             } 
 387  
             public void copyUntilIndexLength(int endIndex, ShreddedProcessor output) throws IOException {
 388  9988
                 while (getReadIndex() < endIndex) {
 389  7988
                     output.processLength(getLength());
 390  7988
                     assert getLengthEndIndex() <= endIndex;
 391  7988
                     copyTuples(getLengthEndIndex(), output);
 392  7988
                     incrementLength();
 393  
                 }
 394  2000
             }  
 395  
             public void copyUntilDocument(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
 396  2020
                 while (!isAtEnd()) {
 397  2012
                     if (other != null) {   
 398  1812
                         assert !other.isAtEnd();
 399  1812
                         int c = + Utility.compare(getDocument(), other.getDocument());
 400  
                     
 401  1812
                         if (c > 0) {
 402  8
                             break;   
 403  
                         }
 404  
                         
 405  1804
                         output.processDocument(getDocument());
 406  
                                       
 407  1804
                         if (c < 0) {
 408  1800
                             copyUntilIndexLength(getDocumentEndIndex(), output);
 409  4
                         } else if (c == 0) {
 410  4
                             copyUntilLength(other, output);
 411  4
                             autoIncrementDocument();
 412  4
                             break;
 413  
                         }
 414  1800
                     } else {
 415  200
                         output.processDocument(getDocument());
 416  200
                         copyUntilIndexLength(getDocumentEndIndex(), output);
 417  
                     }
 418  2000
                     incrementDocument();  
 419  
                     
 420  
                
 421  
                 }
 422  20
             }
 423  
             public void copyUntilLength(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
 424  16
                 while (!isAtEnd()) {
 425  16
                     if (other != null) {   
 426  16
                         assert !other.isAtEnd();
 427  16
                         int c = + Utility.compare(getLength(), other.getLength());
 428  
                     
 429  16
                         if (c > 0) {
 430  0
                             break;   
 431  
                         }
 432  
                         
 433  16
                         output.processLength(getLength());
 434  
                                       
 435  16
                         copyTuples(getLengthEndIndex(), output);
 436  16
                     } else {
 437  0
                         output.processLength(getLength());
 438  0
                         copyTuples(getLengthEndIndex(), output);
 439  
                     }
 440  16
                     incrementLength();  
 441  
                     
 442  16
                     if (getDocumentEndIndex() <= readTupleIndex)
 443  4
                         break;   
 444  
                 }
 445  4
             }
 446  
             public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
 447  20
                 copyUntilDocument(other, output);
 448  20
             }
 449  
             
 450  
         }                         
 451  4
         public static class ShreddedCombiner implements ReaderSource<DocumentLengthWordCount>, ShreddedSource {   
 452  
             public ShreddedProcessor processor;
 453  
             Collection<ShreddedReader> readers;       
 454  4
             boolean closeOnExit = false;
 455  4
             boolean uninitialized = true;
 456  4
             PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
 457  
             
 458  4
             public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
 459  4
                 this.readers = readers;                                                       
 460  4
                 this.closeOnExit = closeOnExit;
 461  4
             }
 462  
                                   
 463  
             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
 464  4
                 if (processor instanceof ShreddedProcessor) {
 465  4
                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
 466  0
                 } else if (processor instanceof DocumentLengthWordCount.Processor) {
 467  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentLengthWordCount.Processor) processor));
 468  0
                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
 469  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentLengthWordCount>) processor));
 470  
                 } else {
 471  0
                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
 472  
                 }
 473  4
             }                                
 474  
             
 475  
             public Class<DocumentLengthWordCount> getOutputClass() {
 476  0
                 return DocumentLengthWordCount.class;
 477  
             }
 478  
             
 479  
             public void initialize() throws IOException {
 480  4
                 for (ShreddedReader reader : readers) {
 481  8
                     reader.fill();                                        
 482  
                     
 483  8
                     if (!reader.getBuffer().isAtEnd())
 484  8
                         queue.add(reader);
 485  
                 }   
 486  
 
 487  4
                 uninitialized = false;
 488  4
             }
 489  
 
 490  
             public void run() throws IOException {
 491  4
                 initialize();
 492  
                
 493  24
                 while (queue.size() > 0) {
 494  20
                     ShreddedReader top = queue.poll();
 495  20
                     ShreddedReader next = null;
 496  20
                     ShreddedBuffer nextBuffer = null; 
 497  
                     
 498  20
                     assert !top.getBuffer().isAtEnd();
 499  
                                                   
 500  20
                     if (queue.size() > 0) {
 501  16
                         next = queue.peek();
 502  16
                         nextBuffer = next.getBuffer();
 503  16
                         assert !nextBuffer.isAtEnd();
 504  
                     }
 505  
                     
 506  20
                     top.getBuffer().copyUntil(nextBuffer, processor);
 507  20
                     if (top.getBuffer().isAtEnd())
 508  8
                         top.fill();                 
 509  
                         
 510  20
                     if (!top.getBuffer().isAtEnd())
 511  12
                         queue.add(top);
 512  20
                 }              
 513  
                 
 514  4
                 if (closeOnExit)
 515  4
                     processor.close();
 516  4
             }
 517  
 
 518  
             /**
              * Pull-style access: returns the next tuple from the head reader of
              * {@code queue}, or {@code null} once every queued reader is exhausted.
              *
              * Calls {@code initialize()} on first use (while {@code uninitialized}
              * is set). A reader that yields a tuple is refilled if its buffer is now
              * at the end and is then put back on the queue; a reader whose
              * {@code read()} returns {@code null} is not re-queued.
              *
              * @return the next tuple, or {@code null} when the queue runs empty
              * @throws IOException propagated from the underlying readers
              */
             public DocumentLengthWordCount read() throws IOException {
  519  0
                 if (uninitialized)
  520  0
                     initialize();
  521  
 
  522  0
                 DocumentLengthWordCount result = null;
  523  
 
  524  0
                 while (queue.size() > 0) {
  525  0
                     ShreddedReader top = queue.poll();
  526  0
                     result = top.read();
  527  
 
  528  0
                     if (result != null) {
  529  0
                         if (top.getBuffer().isAtEnd())
  530  0
                             top.fill();
  531  
 
  532  0
                         // Re-queued even if fill() found nothing; the next read() from
                         // this reader then returns null and it is discarded.
                         queue.offer(top);
  533  0
                         break;
  534  
                     } 
  535  0
                 }
  536  
 
  537  0
                 return result;
  538  
             }
 539  
         } 
 540  119820
         /**
          * Reads shredded {@link DocumentLengthWordCount} data from an
          * {@code ArrayInput} into a {@code ShreddedBuffer}, one buffer-load at a time.
          *
          * As read by this class, the input is nested: a document string followed by
          * an int run length, then a length int followed by an int run length, then
          * (word string, count int) pairs. Readers compare by the document and
          * length at their current read position.
          */
         public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<DocumentLengthWordCount>, ShreddedSource {      
  541  
             // Sink used by run(); assigned in setProcessor().
             public ShreddedProcessor processor;
  542  
             ShreddedBuffer buffer;
  543  16
             // Most recently read document/length keys; re-added to the buffer by fill()
             // when a run continues across a buffer boundary.
             DocumentLengthWordCount last = new DocumentLengthWordCount();         
  544  16
             // Value of tupleCount at which the current document run ends (-1 forces a read).
             long updateDocumentCount = -1;
  545  16
             // Value of tupleCount at which the current length run ends (-1 forces a read).
             long updateLengthCount = -1;
  546  16
             // Total number of tuples read from input so far.
             long tupleCount = 0;
  547  16
             // Value of tupleCount when the current buffer-load started.
             long bufferStartCount = 0;  
  548  
             ArrayInput input;
  549  
             
  550  16
             /** Creates a reader over {@code input} with a default-sized buffer. */
             public ShreddedReader(ArrayInput input) {
  551  16
                 this.input = input; 
  552  16
                 this.buffer = new ShreddedBuffer();
  553  16
             }                               
  554  
             
  555  0
             /** Creates a reader over {@code input} with a buffer built from {@code bufferSize}. */
             public ShreddedReader(ArrayInput input, int bufferSize) { 
  556  0
                 this.input = input;
  557  0
                 this.buffer = new ShreddedBuffer(bufferSize);
  558  0
             }
  559  
                  
  560  
             /**
              * Orders readers by the keys at their current buffer position:
              * document first, then length. Two exhausted buffers compare equal; an
              * exhausted buffer compares lower (-1) than one that still has data.
              */
             public final int compareTo(ShreddedReader other) {
  561  16
                 ShreddedBuffer otherBuffer = other.getBuffer();
  562  
                 
  563  16
                 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
  564  0
                     return 0;                 
  565  16
                 } else if (buffer.isAtEnd()) {
  566  0
                     return -1;
  567  16
                 } else if (otherBuffer.isAtEnd()) {
  568  0
                     return 1;
  569  
                 }
  570  
                                    
  571  16
                 int result = 0;
  572  
                 // do/while(false) is only a breakable scope: stop at the first key that differs.
                 do {
  573  16
                     result = + Utility.compare(buffer.getDocument(), otherBuffer.getDocument());
  574  16
                     if(result != 0) break;
  575  0
                     result = + Utility.compare(buffer.getLength(), otherBuffer.getLength());
  576  0
                     if(result != 0) break;
  577  
                 } while (false);                                             
  578  
                 
  579  16
                 return result;
  580  
             }
  581  
             
  582  
             /** Returns the buffer this reader fills and reads from. */
             public final ShreddedBuffer getBuffer() {
  583  120
                 return buffer;
  584  
             }                
  585  
             
  586  
             /**
              * Returns the next tuple as a freshly allocated object, refilling the
              * buffer first when it is exhausted.
              *
              * @return the next tuple, or {@code null} when a refill yields no data
              * @throws IOException propagated from {@link #fill()}
              */
             public final DocumentLengthWordCount read() throws IOException {
  587  119800
                 if (buffer.isAtEnd()) {
  588  12
                     fill();             
  589  
                 
  590  12
                     if (buffer.isAtEnd()) {
  591  0
                         return null;
  592  
                     }
  593  
                 }
  594  
                       
  595  119800
                 assert !buffer.isAtEnd();
  596  119800
                 DocumentLengthWordCount result = new DocumentLengthWordCount();
  597  
                 
  598  119800
                 result.document = buffer.getDocument();
  599  119800
                 result.length = buffer.getLength();
  600  119800
                 result.word = buffer.getWord();
  601  119800
                 result.count = buffer.getCount();
  602  
                 
  603  119800
                 // Step past this tuple, then let the key cursors catch up.
                 buffer.incrementTuple();
  604  119800
                 buffer.autoIncrementDocument();
  605  119800
                 buffer.autoIncrementLength();
  606  
                 
  607  119800
                 return result;
  608  
             }           
  609  
             
  610  
             /**
              * Resets the buffer and loads tuples from {@code input} until the buffer
              * is full or the input ends.
              *
              * When earlier tuples were already read and the current document or
              * length run extends past them, the remembered key from {@code last} is
              * re-added to the fresh buffer first. An {@code EOFException} from the
              * input ends the load silently, leaving whatever was read in the buffer.
              */
             public final void fill() throws IOException {
  611  
                 try {   
  612  28
                     buffer.reset();
  613  
                     
  614  28
                     if (tupleCount != 0) {
  615  
                                                       
  616  16
                         // Positive difference: the document run is not finished yet.
                         if(updateDocumentCount - tupleCount > 0) {
  617  0
                             buffer.documents.add(last.document);
  618  0
                             buffer.documentTupleIdx.add((int) (updateDocumentCount - tupleCount));
  619  
                         }                              
  620  16
                         // Same carry-over for the length run.
                         if(updateLengthCount - tupleCount > 0) {
  621  0
                             buffer.lengths.add(last.length);
  622  0
                             buffer.lengthTupleIdx.add((int) (updateLengthCount - tupleCount));
  623  
                         }
  624  16
                         bufferStartCount = tupleCount;
  625  
                     }
  626  
                     
  627  159828
                     while (!buffer.isFull()) {
  628  159820
                         updateLength();
  629  159800
                         buffer.processTuple(input.readString(), input.readInt());
  630  159800
                         tupleCount++;
  631  
                     }
  632  8
                 // End of input is the normal way for the last load to finish.
                 } catch(EOFException e) {}
  633  28
             }
  634  
 
  635  
             /**
              * Reads the next document header (string, then int run length) unless
              * the current document run still covers {@code tupleCount}.
              */
             public final void updateDocument() throws IOException {
  636  31984
                 if (updateDocumentCount > tupleCount)
  637  23968
                     return;
  638  
                      
  639  8016
                 last.document = input.readString();
  640  7996
                 updateDocumentCount = tupleCount + input.readInt();
  641  
                                       
  642  7996
                 buffer.processDocument(last.document);
  643  7996
             }
  644  
             /**
              * Reads the next length header (int, then int run length) unless the
              * current length run still covers {@code tupleCount}. Gives
              * {@link #updateDocument()} the chance to read a document header first.
              */
             public final void updateLength() throws IOException {
  645  159820
                 if (updateLengthCount > tupleCount)
  646  127836
                     return;
  647  
                      
  648  31984
                 updateDocument();
  649  31964
                 last.length = input.readInt();
  650  31964
                 updateLengthCount = tupleCount + input.readInt();
  651  
                                       
  652  31964
                 buffer.processLength(last.length);
  653  31964
             }
  654  
 
  655  
             /**
              * Push-style driver: repeatedly fills the buffer and copies all of it to
              * {@code processor} until a fill yields nothing, then closes the processor.
              */
             public void run() throws IOException {
  656  
                 while (true) {
  657  0
                     fill();
  658  
                     
  659  0
                     if (buffer.isAtEnd())
  660  0
                         break;
  661  
                     
  662  0
                     // null: no competing buffer, so copy everything.
                     buffer.copyUntil(null, processor);
  663  
                 }      
  664  0
                 processor.close();
  665  0
             }
  666  
             
  667  
             /**
              * Installs the downstream stage, always wrapped in a
              * {@code DuplicateEliminator}; non-shredded processors are additionally
              * wrapped in a {@code TupleUnshredder}.
              *
              * @throws IncompatibleProcessorException if {@code processor} is none of
              *         the three supported kinds
              */
             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
  668  0
                 if (processor instanceof ShreddedProcessor) {
  669  0
                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
  670  0
                 } else if (processor instanceof DocumentLengthWordCount.Processor) {
  671  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentLengthWordCount.Processor) processor));
  672  0
                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
  673  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentLengthWordCount>) processor));
  674  
                 } else {
  675  0
                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
  676  
                 }
  677  0
             }                                
  678  
             
  679  
             /** Returns the tuple class this reader produces. */
             public Class<DocumentLengthWordCount> getOutputClass() {
  680  0
                 return DocumentLengthWordCount.class;
  681  
             }                
  682  
         }
 683  
         
 684  
         /**
          * {@link ShreddedProcessor} filter that suppresses repeated key
          * notifications: a document or length is forwarded only when it differs
          * from the last one forwarded, or when the matching flag forces it.
          * Tuples and {@code close()} are always passed through.
          */
         public static class DuplicateEliminator implements ShreddedProcessor {
  685  
             public ShreddedProcessor processor;
  686  8
             // Holds the last document and length that were forwarded.
             DocumentLengthWordCount last = new DocumentLengthWordCount();
  687  8
             // True means the next document is forwarded without comparison.
             boolean documentProcess = true;
  688  8
             // True means the next length is forwarded without comparison.
             boolean lengthProcess = true;
  689  
                                            
  690  0
             /** Creates an eliminator with no target; set one with {@link #setShreddedProcessor}. */
             public DuplicateEliminator() {}
  691  8
             /** Creates an eliminator that forwards to {@code processor}. */
             public DuplicateEliminator(ShreddedProcessor processor) {
  692  8
                 this.processor = processor;
  693  8
             }
  694  
             
  695  
             /** Replaces the downstream processor. */
             public void setShreddedProcessor(ShreddedProcessor processor) {
  696  0
                 this.processor = processor;
  697  0
             }
  698  
 
  699  
             /**
              * Forwards {@code document} if forced or changed, and then forces the
              * next length through, since a new document starts a new length run.
              */
             public void processDocument(String document) throws IOException {  
  700  2024
                 if (documentProcess || Utility.compare(document, last.document) != 0) {
  701  2004
                     last.document = document;
  702  2004
                     processor.processDocument(document);
  703  2004
             resetLength();
  704  2004
                     documentProcess = false;
  705  
                 }
  706  2024
             }
  707  
             /** Forwards {@code length} if forced or different from the last forwarded length. */
             public void processLength(int length) throws IOException {  
  708  8024
                 if (lengthProcess || Utility.compare(length, last.length) != 0) {
  709  8004
                     last.length = length;
  710  8004
                     processor.processLength(length);
  711  8004
                     lengthProcess = false;
  712  
                 }
  713  8024
             }  
  714  
             
  715  
             /** Forces the next document (and therefore the next length) to be forwarded. */
             public void resetDocument() {
  716  0
                  documentProcess = true;
  717  0
             resetLength();
  718  0
             }                                                
  719  
             /** Forces the next length to be forwarded. */
             public void resetLength() {
  720  2004
                  lengthProcess = true;
  721  2004
             }                                                
  722  
                                
  723  
             /** Passes the tuple through unchanged. */
             public void processTuple(String word, int count) throws IOException {
  724  40020
                 processor.processTuple(word, count);
  725  40020
             } 
  726  
             
  727  
             /** Closes the downstream processor. */
             public void close() throws IOException {
  728  4
                 processor.close();
  729  4
             }                    
  730  
         }
 731  
         /**
          * Rebuilds whole {@link DocumentLengthWordCount} tuples from shredded
          * calls: document and length are remembered in {@code last}, and each
          * {@code processTuple} call emits a copy of {@code last} to the wrapped
          * processor.
          */
         public static class TupleUnshredder implements ShreddedProcessor {
  732  0
             // Accumulates the current key fields plus the most recent word/count.
             DocumentLengthWordCount last = new DocumentLengthWordCount();
  733  
             public org.galagosearch.tupleflow.Processor<DocumentLengthWordCount> processor;                               
  734  
             
  735  0
             /** Wraps a type-specific processor. */
             public TupleUnshredder(DocumentLengthWordCount.Processor processor) {
  736  0
                 this.processor = processor;
  737  0
             }         
  738  
             
  739  0
             /** Wraps a generic TupleFlow processor of this tuple type. */
             public TupleUnshredder(org.galagosearch.tupleflow.Processor<DocumentLengthWordCount> processor) {
  740  0
                 this.processor = processor;
  741  0
             }
  742  
             
  743  
             /**
              * Returns a field-by-field copy of {@code object}; a {@code null}
              * argument yields a new tuple with default field values.
              */
             public DocumentLengthWordCount clone(DocumentLengthWordCount object) {
  744  0
                 DocumentLengthWordCount result = new DocumentLengthWordCount();
  745  0
                 if (object == null) return result;
  746  0
                 result.document = object.document; 
  747  0
                 result.length = object.length; 
  748  0
                 result.word = object.word; 
  749  0
                 result.count = object.count; 
  750  0
                 return result;
  751  
             }                 
  752  
             
  753  
             /** Records the document that subsequent tuples belong to. */
             public void processDocument(String document) throws IOException {
  754  0
                 last.document = document;
  755  0
             }   
  756  
                 
  757  
             /** Records the length that subsequent tuples belong to. */
             public void processLength(int length) throws IOException {
  758  0
                 last.length = length;
  759  0
             }   
  760  
                 
  761  
             
  762  
             /** Completes the tuple with word and count and emits a copy downstream. */
             public void processTuple(String word, int count) throws IOException {
  763  0
                 last.word = word;
  764  0
                 last.count = count;
  765  0
                 // A copy is sent because last keeps being mutated by later calls.
                 processor.process(clone(last));
  766  0
             }               
  767  
             
  768  
             /** Closes the wrapped processor. */
             public void close() throws IOException {
  769  0
                 processor.close();
  770  0
             }
  771  
         }     
 772  36
         public static class TupleShredder implements Processor {
 773  0
             DocumentLengthWordCount last = new DocumentLengthWordCount();
 774  
             public ShreddedProcessor processor;
 775  
             
 776  0
             public TupleShredder(ShreddedProcessor processor) {
 777  0
                 this.processor = processor;
 778  0
             }                              
 779  
             
 780  
             public DocumentLengthWordCount clone(DocumentLengthWordCount object) {
 781  0
                 DocumentLengthWordCount result = new DocumentLengthWordCount();
 782  0
                 if (object == null) return result;
 783  0
                 result.document = object.document; 
 784  0
                 result.length = object.length; 
 785  0
                 result.word = object.word; 
 786  0
                 result.count = object.count; 
 787  0
                 return result;
 788  
             }                 
 789  
             
 790  
             public void process(DocumentLengthWordCount object) throws IOException {                                                                                                                                                   
 791  0
                 boolean processAll = false;
 792  0
                 if(last == null || Utility.compare(last.document, object.document) != 0 || processAll) { processor.processDocument(object.document); processAll = true; }
 793  0
                 if(last == null || Utility.compare(last.length, object.length) != 0 || processAll) { processor.processLength(object.length); processAll = true; }
 794  0
                 processor.processTuple(object.word, object.count);                                         
 795  0
             }
 796  
                           
 797  
             public Class<DocumentLengthWordCount> getInputClass() {
 798  0
                 return DocumentLengthWordCount.class;
 799  
             }
 800  
             
 801  
             public void close() throws IOException {
 802  0
                 processor.close();
 803  0
             }                     
 804  
         }
 805  
     } 
 806  0
     public static class DocumentWordOrder implements Order<DocumentLengthWordCount> {
 807  
         /**
          * Hashes a tuple on this order's key fields only: the sum of
          * {@code Utility.hash} of the document and of the word.
          */
         public int hash(DocumentLengthWordCount object) {
  808  0
             int h = 0;
  809  0
             h += Utility.hash(object.document);
  810  0
             h += Utility.hash(object.word);
  811  0
             return h;
  812  
         } 
 813  
         /**
          * Returns a comparator that is the negation of {@link #lessThan()}:
          * document is compared first, then word, and the result sign is flipped.
          */
         public Comparator<DocumentLengthWordCount> greaterThan() {
  814  0
             return new Comparator<DocumentLengthWordCount>() {
  815  0
                 public int compare(DocumentLengthWordCount one, DocumentLengthWordCount two) {
  816  0
                     int result = 0;
  817  
                     // do/while(false) is only a breakable scope: stop at the first key that differs.
                     do {
  818  0
                         result = + Utility.compare(one.document, two.document);
  819  0
                         if(result != 0) break;
  820  0
                         result = + Utility.compare(one.word, two.word);
  821  0
                         if(result != 0) break;
  822  
                     } while (false);
  823  0
                     return -result;
  824  
                 }
  825  
             };
  826  
         }     
 827  
         /**
          * Returns a comparator for this order: document is compared first, then
          * word, each via {@code Utility.compare}.
          */
         public Comparator<DocumentLengthWordCount> lessThan() {
  828  0
             return new Comparator<DocumentLengthWordCount>() {
  829  0
                 public int compare(DocumentLengthWordCount one, DocumentLengthWordCount two) {
  830  0
                     int result = 0;
  831  
                     // do/while(false) is only a breakable scope: stop at the first key that differs.
                     do {
  832  0
                         result = + Utility.compare(one.document, two.document);
  833  0
                         if(result != 0) break;
  834  0
                         result = + Utility.compare(one.word, two.word);
  835  0
                         if(result != 0) break;
  836  
                     } while (false);
  837  0
                     return result;
  838  
                 }
  839  
             };
  840  
         }     
 841  
         /** Returns a {@code ShreddedReader} over {@code _input} using its default buffer. */
         public TypeReader<DocumentLengthWordCount> orderedReader(ArrayInput _input) {
  842  0
             return new ShreddedReader(_input);
  843  
         }    
  844  
 
  845  
         /** Returns a {@code ShreddedReader} over {@code _input}, passing {@code bufferSize} to it. */
         public TypeReader<DocumentLengthWordCount> orderedReader(ArrayInput _input, int bufferSize) {
  846  0
             return new ShreddedReader(_input, bufferSize);
  847  
         }    
  848  
         /**
          * Returns a writer for whole tuples: an {@code OrderedWriterClass}
          * wrapped around a {@code ShreddedWriter} on {@code _output}.
          */
         public OrderedWriter<DocumentLengthWordCount> orderedWriter(ArrayOutput _output) {
  849  0
             ShreddedWriter w = new ShreddedWriter(_output);
  850  0
             return new OrderedWriterClass(w); 
  851  
         }                                    
 852  0
         /**
          * Writes whole tuples through a {@code ShreddedWriter}: the document is
          * announced when it changes, the word when it or the document changes, and
          * length/count for every tuple.
          */
         public static class OrderedWriterClass extends OrderedWriter< DocumentLengthWordCount > {
  853  0
             // Previously processed tuple; null until the first call to process().
             DocumentLengthWordCount last = null;
  854  0
             ShreddedWriter shreddedWriter = null; 
  855  
             
  856  0
             /** Creates a writer that forwards to {@code s}. */
             public OrderedWriterClass(ShreddedWriter s) {
  857  0
                 this.shreddedWriter = s;
  858  0
             }
  859  
             
  860  
             /**
              * Shreds one tuple. {@code processAll} becomes true once a key has
              * changed so that every lower-level key is announced as well.
              * The tuple reference itself is kept in {@code last} (no copy is made).
              */
             public void process(DocumentLengthWordCount object) throws IOException {
  861  0
                boolean processAll = false;
  862  0
                if (processAll || last == null || 0 != Utility.compare(object.document, last.document)) { processAll = true; shreddedWriter.processDocument(object.document); }
  863  0
                if (processAll || last == null || 0 != Utility.compare(object.word, last.word)) { processAll = true; shreddedWriter.processWord(object.word); }
  864  0
                shreddedWriter.processTuple(object.length, object.count);
  865  0
                last = object;
  866  0
             }           
  867  
                  
  868  
             /** Closes the underlying shredded writer. */
             public void close() throws IOException {
  869  0
                 shreddedWriter.close();
  870  0
             }
  871  
             
  872  
             /** Returns the tuple class this writer accepts. */
             public Class<DocumentLengthWordCount> getInputClass() {
  873  0
                 return DocumentLengthWordCount.class;
  874  
             }
  875  
         } 
 876  
         public ReaderSource<DocumentLengthWordCount> orderedCombiner(Collection<TypeReader<DocumentLengthWordCount>> readers, boolean closeOnExit) {
 877  0
             ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
 878  
             
 879  0
             for (TypeReader<DocumentLengthWordCount> reader : readers) {
 880  0
                 shreddedReaders.add((ShreddedReader)reader);
 881  
             }
 882  
             
 883  0
             return new ShreddedCombiner(shreddedReaders, closeOnExit);
 884  
         }                  
 885  
         /**
          * Returns a field-by-field copy of {@code object}; a {@code null}
          * argument yields a new tuple with default field values.
          */
         public DocumentLengthWordCount clone(DocumentLengthWordCount object) {
  886  0
             DocumentLengthWordCount result = new DocumentLengthWordCount();
  887  0
             if (object == null) return result;
  888  0
             result.document = object.document; 
  889  0
             result.length = object.length; 
  890  0
             result.word = object.word; 
  891  0
             result.count = object.count; 
  892  0
             return result;
  893  
         }                 
 894  
         /** Returns the tuple class this order applies to. */
         public Class<DocumentLengthWordCount> getOrderedClass() {
  895  0
             return DocumentLengthWordCount.class;
  896  
         }                           
  897  
         /** Returns the order specification as one entry per key: document, then word. */
         public String[] getOrderSpec() {
  898  0
             return new String[] {"+document", "+word"};
  899  
         }
  900  
 
  901  
         /** Returns the same order specification as a single space-separated string. */
         public static String getSpecString() {
  902  0
             return "+document +word";
  903  
         }
 904  
                            
 905  
         /**
          * Receiver of shredded tuples for this order: key values (document, word)
          * are delivered through their own calls, and each tuple then delivers
          * only the remaining fields (length, count).
          */
         public interface ShreddedProcessor extends Step {
  906  
             public void processDocument(String document) throws IOException;
  907  
             public void processWord(String word) throws IOException;
  908  
             public void processTuple(int length, int count) throws IOException;
  909  
             public void close() throws IOException;
  910  
         }    
 911  
         /** Marker interface (declares no methods) for steps that produce shredded tuples in this order. */
         public interface ShreddedSource extends Step {
  912  
         }                                              
 913  
         
 914  0
         /**
          * {@link ShreddedProcessor} that collects shredded calls in a
          * {@code ShreddedBuffer} and writes them to an {@code ArrayOutput} whenever
          * the buffer fills up, and on {@code close()}.
          *
          * As written by the flush methods, the output is nested: document string
          * and the number of tuples in its run, then word string and the number of
          * tuples in its run, then (length, count) int pairs.
          */
         public static class ShreddedWriter implements ShreddedProcessor {
  915  
             ArrayOutput output;
  916  0
             ShreddedBuffer buffer = new ShreddedBuffer();
  917  
             // Last keys announced; used to re-seed the buffer after a flush.
             String lastDocument;
  918  
             String lastWord;
  919  0
             // True between a flush and the next tuple.
             boolean lastFlush = false;
  920  
             
  921  0
             /** Creates a writer that emits to {@code output}. */
             public ShreddedWriter(ArrayOutput output) {
  922  0
                 this.output = output;
  923  0
             }                        
  924  
             
  925  
             /** Flushes buffered data; the underlying output is not closed here. */
             public void close() throws IOException {
  926  0
                 flush();
  927  0
             }
  928  
             
  929  
             /** Remembers and buffers a new document key. */
             public void processDocument(String document) {
  930  0
                 lastDocument = document;
  931  0
                 buffer.processDocument(document);
  932  0
             }
  933  
             /** Remembers and buffers a new word key. */
             public void processWord(String word) {
  934  0
                 lastWord = word;
  935  0
                 buffer.processWord(word);
  936  0
             }
  937  
             /**
              * Buffers one tuple and flushes when the buffer becomes full.
              * Right after a flush the buffer holds no keys, so the remembered
              * document/word are re-added first unless new ones were already announced.
              */
             public final void processTuple(int length, int count) throws IOException {
  938  0
                 if (lastFlush) {
  939  0
                     if(buffer.documents.size() == 0) buffer.processDocument(lastDocument);
  940  0
                     if(buffer.words.size() == 0) buffer.processWord(lastWord);
  941  0
                     lastFlush = false;
  942  
                 }
  943  0
                 buffer.processTuple(length, count);
  944  0
                 if (buffer.isFull())
  945  0
                     flush();
  946  0
             }
  947  
             /** Writes (length, count) for each buffered tuple up to {@code pauseIndex}. */
             public final void flushTuples(int pauseIndex) throws IOException {
  948  
                 
  949  0
                 while (buffer.getReadIndex() < pauseIndex) {
  950  
                            
  951  0
                     output.writeInt(buffer.getLength());
  952  0
                     output.writeInt(buffer.getCount());
  953  0
                     buffer.incrementTuple();
  954  
                 }
  955  0
             }  
  956  
             /**
              * Writes each document run up to {@code pauseIndex}: the document, the
              * number of tuples in the run, then the run's words via {@link #flushWord}.
              */
             public final void flushDocument(int pauseIndex) throws IOException {
  957  0
                 while (buffer.getReadIndex() < pauseIndex) {
  958  0
                     int nextPause = buffer.getDocumentEndIndex();
  959  0
                     int count = nextPause - buffer.getReadIndex();
  960  
                     
  961  0
                     output.writeString(buffer.getDocument());
  962  0
                     output.writeInt(count);
  963  0
                     buffer.incrementDocument();
  964  
                       
  965  0
                     flushWord(nextPause);
  966  0
                     assert nextPause == buffer.getReadIndex();
  967  0
                 }
  968  0
             }
  969  
             /**
              * Writes each word run up to {@code pauseIndex}: the word, the number
              * of tuples in the run, then the tuples via {@link #flushTuples}.
              */
             public final void flushWord(int pauseIndex) throws IOException {
  970  0
                 while (buffer.getReadIndex() < pauseIndex) {
  971  0
                     int nextPause = buffer.getWordEndIndex();
  972  0
                     int count = nextPause - buffer.getReadIndex();
  973  
                     
  974  0
                     output.writeString(buffer.getWord());
  975  0
                     output.writeInt(count);
  976  0
                     buffer.incrementWord();
  977  
                       
  978  0
                     flushTuples(nextPause);
  979  0
                     assert nextPause == buffer.getReadIndex();
  980  0
                 }
  981  0
             }
  982  
             /** Writes everything buffered so far, empties the buffer and marks that a flush happened. */
             public void flush() throws IOException { 
  983  0
                 flushDocument(buffer.getWriteIndex());
  984  0
                 buffer.reset(); 
  985  0
                 lastFlush = true;
  986  0
             }                           
  987  
         }
 988  0
         public static class ShreddedBuffer {
 989  0
             ArrayList<String> documents = new ArrayList();
 990  0
             ArrayList<String> words = new ArrayList();
 991  0
             ArrayList<Integer> documentTupleIdx = new ArrayList();
 992  0
             ArrayList<Integer> wordTupleIdx = new ArrayList();
 993  0
             int documentReadIdx = 0;
 994  0
             int wordReadIdx = 0;
 995  
                             
 996  
             int[] lengths;
 997  
             int[] counts;
 998  0
             int writeTupleIndex = 0;
 999  0
             int readTupleIndex = 0;
 1000  
             int batchSize;
 1001  
 
 1002  0
             public ShreddedBuffer(int batchSize) {
 1003  0
                 this.batchSize = batchSize;
 1004  
 
 1005  0
                 lengths = new int[batchSize];
 1006  0
                 counts = new int[batchSize];
 1007  0
             }                              
 1008  
 
 1009  
             public ShreddedBuffer() {    
 1010  0
                 this(10000);
 1011  0
             }                                                                                                                    
 1012  
             
 1013  
             /** Appends a document key whose run starts at the current write position. */
             public void processDocument(String document) {
  1014  0
                 documents.add(document);
  1015  0
                 documentTupleIdx.add(writeTupleIndex);
  1016  0
             }                                      
  1017  
             /** Appends a word key whose run starts at the current write position. */
             public void processWord(String word) {
  1018  0
                 words.add(word);
  1019  0
                 wordTupleIdx.add(writeTupleIndex);
  1020  0
             }                                      
  1021  
             /**
              * Stores one tuple at the write position and advances it. No bounds
              * check is made here; callers are expected to consult {@code isFull()}.
              */
             public void processTuple(int length, int count) {
  1022  0
                 // A tuple needs at least one document and one word key before it.
                 assert documents.size() > 0;
  1023  0
                 assert words.size() > 0;
  1024  0
                 lengths[writeTupleIndex] = length;
  1025  0
                 counts[writeTupleIndex] = count;
  1026  0
                 writeTupleIndex++;
  1027  0
             }
 1028  
             /** Discards all buffered keys and tuples (the write position returns to 0). */
             public void resetData() {
  1029  0
                 documents.clear();
  1030  0
                 words.clear();
  1031  0
                 documentTupleIdx.clear();
  1032  0
                 wordTupleIdx.clear();
  1033  0
                 writeTupleIndex = 0;
  1034  0
             }                  
  1035  
                                  
  1036  
             /** Rewinds all read cursors to the start without touching the data. */
             public void resetRead() {
  1037  0
                 readTupleIndex = 0;
  1038  0
                 documentReadIdx = 0;
  1039  0
                 wordReadIdx = 0;
  1040  0
             } 
  1041  
 
  1042  
             /** Clears the data and rewinds the read cursors. */
             public void reset() {
  1043  0
                 resetData();
  1044  0
                 resetRead();
  1045  0
             } 
 1046  
             /** True once {@code batchSize} tuples have been written. */
             public boolean isFull() {
  1047  0
                 return writeTupleIndex >= batchSize;
  1048  
             }
  1049  
 
  1050  
             /** True when no tuple has been written since the last data reset. */
             public boolean isEmpty() {
  1051  0
                 return writeTupleIndex == 0;
  1052  
             }                          
  1053  
 
  1054  
             /** True when every written tuple has been read. */
             public boolean isAtEnd() {
  1055  0
                 return readTupleIndex >= writeTupleIndex;
  1056  
             }           
 1057  
             /** Moves the document cursor to the next document key. */
             public void incrementDocument() {
  1058  0
                 documentReadIdx++;  
  1059  0
             }                                                                                              
  1060  
 
  1061  
             /**
              * Advances the document cursor while the tuple cursor has moved past
              * the end of the current document run and unread tuples remain.
              */
             public void autoIncrementDocument() {
  1062  0
                 while (readTupleIndex >= getDocumentEndIndex() && readTupleIndex < writeTupleIndex)
  1063  0
                     documentReadIdx++;
  1064  0
             }                 
  1065  
             /** Moves the word cursor to the next word key. */
             public void incrementWord() {
  1066  0
                 wordReadIdx++;  
  1067  0
             }                                                                                              
  1068  
 
  1069  
             /**
              * Advances the word cursor while the tuple cursor has moved past the
              * end of the current word run and unread tuples remain.
              */
             public void autoIncrementWord() {
  1070  0
                 while (readTupleIndex >= getWordEndIndex() && readTupleIndex < writeTupleIndex)
  1071  0
                     wordReadIdx++;
  1072  0
             }                 
  1073  
             /** Moves the tuple cursor forward by one. */
             public void incrementTuple() {
  1074  0
                 readTupleIndex++;
  1075  0
             }                    
 1076  
             /**
              * Returns the tuple index one past the current document run: the start
              * index of the next document, or the write position for the last one.
              */
             public int getDocumentEndIndex() {
  1077  0
                 if ((documentReadIdx+1) >= documentTupleIdx.size())
  1078  0
                     return writeTupleIndex;
  1079  0
                 return documentTupleIdx.get(documentReadIdx+1);
  1080  
             }
  1081  
 
  1082  
             /**
              * Returns the tuple index one past the current word run: the start
              * index of the next word, or the write position for the last one.
              */
             public int getWordEndIndex() {
  1083  0
                 if ((wordReadIdx+1) >= wordTupleIdx.size())
  1084  0
                     return writeTupleIndex;
  1085  0
                 return wordTupleIdx.get(wordReadIdx+1);
  1086  
             }
 1087  
             /** Returns the index of the next tuple to read. */
             public int getReadIndex() {
  1088  0
                 return readTupleIndex;
  1089  
             }   
  1090  
 
  1091  
             /** Returns the number of tuples written so far. */
             public int getWriteIndex() {
  1092  0
                 return writeTupleIndex;
  1093  
             } 
 1094  
             /** Returns the document key at the document cursor; asserts that unread tuples remain. */
             public String getDocument() {
  1095  0
                 assert readTupleIndex < writeTupleIndex;
  1096  0
                 assert documentReadIdx < documents.size();
  1097  
                 
  1098  0
                 return documents.get(documentReadIdx);
  1099  
             }
  1100  
             /** Returns the word key at the word cursor; asserts that unread tuples remain. */
             public String getWord() {
  1101  0
                 assert readTupleIndex < writeTupleIndex;
  1102  0
                 assert wordReadIdx < words.size();
  1103  
                 
  1104  0
                 return words.get(wordReadIdx);
  1105  
             }
  1106  
             /** Returns the length of the tuple at the tuple cursor. */
             public int getLength() {
  1107  0
                 assert readTupleIndex < writeTupleIndex;
  1108  0
                 return lengths[readTupleIndex];
  1109  
             }                                         
  1110  
             /** Returns the count of the tuple at the tuple cursor. */
             public int getCount() {
  1111  0
                 assert readTupleIndex < writeTupleIndex;
  1112  0
                 return counts[readTupleIndex];
  1113  
             }                                         
 1114  
             /**
              * Sends each tuple from the tuple cursor up to (not including)
              * {@code endIndex} to {@code output}, advancing the cursor as it goes.
              */
             public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
  1115  0
                 while (getReadIndex() < endIndex) {
  1116  0
                    output.processTuple(getLength(), getCount());
  1117  0
                    incrementTuple();
  1118  
                 }
  1119  0
             }                                                                           
 1120  
             /**
              * Copies whole document runs to {@code output} until the tuple cursor
              * reaches {@code endIndex}: announces each document, copies its words
              * and tuples, then steps to the next document.
              */
             public void copyUntilIndexDocument(int endIndex, ShreddedProcessor output) throws IOException {
  1121  0
                 while (getReadIndex() < endIndex) {
  1122  0
                     output.processDocument(getDocument());
  1123  0
                     // endIndex is expected to fall on a document-run boundary.
                     assert getDocumentEndIndex() <= endIndex;
  1124  0
                     copyUntilIndexWord(getDocumentEndIndex(), output);
  1125  0
                     incrementDocument();
  1126  
                 }
  1127  0
             } 
 1128  
             public void copyUntilIndexWord(int endIndex, ShreddedProcessor output) throws IOException {
 1129  0
                 while (getReadIndex() < endIndex) {
 1130  0
                     output.processWord(getWord());
 1131  0
                     assert getWordEndIndex() <= endIndex;
 1132  0
                     copyTuples(getWordEndIndex(), output);
 1133  0
                     incrementWord();
 1134  
                 }
 1135  0
             }  
 1136  
             public void copyUntilDocument(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
 1137  0
                 while (!isAtEnd()) {
 1138  0
                     if (other != null) {   
 1139  0
                         assert !other.isAtEnd();
 1140  0
                         int c = + Utility.compare(getDocument(), other.getDocument());
 1141  
                     
 1142  0
                         if (c > 0) {
 1143  0
                             break;   
 1144  
                         }
 1145  
                         
 1146  0
                         output.processDocument(getDocument());
 1147  
                                       
 1148  0
                         if (c < 0) {
 1149  0
                             copyUntilIndexWord(getDocumentEndIndex(), output);
 1150  0
                         } else if (c == 0) {
 1151  0
                             copyUntilWord(other, output);
 1152  0
                             autoIncrementDocument();
 1153  0
                             break;
 1154  
                         }
 1155  0
                     } else {
 1156  0
                         output.processDocument(getDocument());
 1157  0
                         copyUntilIndexWord(getDocumentEndIndex(), output);
 1158  
                     }
 1159  0
                     incrementDocument();  
 1160  
                     
 1161  
                
 1162  
                 }
 1163  0
             }
 1164  
             public void copyUntilWord(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
 1165  0
                 while (!isAtEnd()) {
 1166  0
                     if (other != null) {   
 1167  0
                         assert !other.isAtEnd();
 1168  0
                         int c = + Utility.compare(getWord(), other.getWord());
 1169  
                     
 1170  0
                         if (c > 0) {
 1171  0
                             break;   
 1172  
                         }
 1173  
                         
 1174  0
                         output.processWord(getWord());
 1175  
                                       
 1176  0
                         copyTuples(getWordEndIndex(), output);
 1177  0
                     } else {
 1178  0
                         output.processWord(getWord());
 1179  0
                         copyTuples(getWordEndIndex(), output);
 1180  
                     }
 1181  0
                     incrementWord();  
 1182  
                     
 1183  0
                     if (getDocumentEndIndex() <= readTupleIndex)
 1184  0
                         break;   
 1185  
                 }
 1186  0
             }
 1187  
             public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
 1188  0
                 copyUntilDocument(other, output);
 1189  0
             }
 1190  
             
 1191  
         }                         
 1192  0
         public static class ShreddedCombiner implements ReaderSource<DocumentLengthWordCount>, ShreddedSource {   
 1193  
             public ShreddedProcessor processor;
 1194  
             Collection<ShreddedReader> readers;       
 1195  0
             boolean closeOnExit = false;
 1196  0
             boolean uninitialized = true;
 1197  0
             PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
 1198  
             
 1199  0
             public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
 1200  0
                 this.readers = readers;                                                       
 1201  0
                 this.closeOnExit = closeOnExit;
 1202  0
             }
 1203  
                                   
 1204  
             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
 1205  0
                 if (processor instanceof ShreddedProcessor) {
 1206  0
                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
 1207  0
                 } else if (processor instanceof DocumentLengthWordCount.Processor) {
 1208  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentLengthWordCount.Processor) processor));
 1209  0
                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
 1210  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentLengthWordCount>) processor));
 1211  
                 } else {
 1212  0
                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
 1213  
                 }
 1214  0
             }                                
 1215  
             
 1216  
             public Class<DocumentLengthWordCount> getOutputClass() {
 1217  0
                 return DocumentLengthWordCount.class;
 1218  
             }
 1219  
             
 1220  
             public void initialize() throws IOException {
 1221  0
                 for (ShreddedReader reader : readers) {
 1222  0
                     reader.fill();                                        
 1223  
                     
 1224  0
                     if (!reader.getBuffer().isAtEnd())
 1225  0
                         queue.add(reader);
 1226  
                 }   
 1227  
 
 1228  0
                 uninitialized = false;
 1229  0
             }
 1230  
 
 1231  
             public void run() throws IOException {
 1232  0
                 initialize();
 1233  
                
 1234  0
                 while (queue.size() > 0) {
 1235  0
                     ShreddedReader top = queue.poll();
 1236  0
                     ShreddedReader next = null;
 1237  0
                     ShreddedBuffer nextBuffer = null; 
 1238  
                     
 1239  0
                     assert !top.getBuffer().isAtEnd();
 1240  
                                                   
 1241  0
                     if (queue.size() > 0) {
 1242  0
                         next = queue.peek();
 1243  0
                         nextBuffer = next.getBuffer();
 1244  0
                         assert !nextBuffer.isAtEnd();
 1245  
                     }
 1246  
                     
 1247  0
                     top.getBuffer().copyUntil(nextBuffer, processor);
 1248  0
                     if (top.getBuffer().isAtEnd())
 1249  0
                         top.fill();                 
 1250  
                         
 1251  0
                     if (!top.getBuffer().isAtEnd())
 1252  0
                         queue.add(top);
 1253  0
                 }              
 1254  
                 
 1255  0
                 if (closeOnExit)
 1256  0
                     processor.close();
 1257  0
             }
 1258  
 
 1259  
             public DocumentLengthWordCount read() throws IOException {
 1260  0
                 if (uninitialized)
 1261  0
                     initialize();
 1262  
 
 1263  0
                 DocumentLengthWordCount result = null;
 1264  
 
 1265  0
                 while (queue.size() > 0) {
 1266  0
                     ShreddedReader top = queue.poll();
 1267  0
                     result = top.read();
 1268  
 
 1269  0
                     if (result != null) {
 1270  0
                         if (top.getBuffer().isAtEnd())
 1271  0
                             top.fill();
 1272  
 
 1273  0
                         queue.offer(top);
 1274  0
                         break;
 1275  
                     } 
 1276  0
                 }
 1277  
 
 1278  0
                 return result;
 1279  
             }
 1280  
         } 
 1281  0
         public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<DocumentLengthWordCount>, ShreddedSource {      
 1282  
             public ShreddedProcessor processor;
 1283  
             ShreddedBuffer buffer;
 1284  0
             DocumentLengthWordCount last = new DocumentLengthWordCount();         
 1285  0
             long updateDocumentCount = -1;
 1286  0
             long updateWordCount = -1;
 1287  0
             long tupleCount = 0;
 1288  0
             long bufferStartCount = 0;  
 1289  
             ArrayInput input;
 1290  
             
 1291  0
             public ShreddedReader(ArrayInput input) {
 1292  0
                 this.input = input; 
 1293  0
                 this.buffer = new ShreddedBuffer();
 1294  0
             }                               
 1295  
             
 1296  0
             public ShreddedReader(ArrayInput input, int bufferSize) { 
 1297  0
                 this.input = input;
 1298  0
                 this.buffer = new ShreddedBuffer(bufferSize);
 1299  0
             }
 1300  
                  
 1301  
             public final int compareTo(ShreddedReader other) {
 1302  0
                 ShreddedBuffer otherBuffer = other.getBuffer();
 1303  
                 
 1304  0
                 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
 1305  0
                     return 0;                 
 1306  0
                 } else if (buffer.isAtEnd()) {
 1307  0
                     return -1;
 1308  0
                 } else if (otherBuffer.isAtEnd()) {
 1309  0
                     return 1;
 1310  
                 }
 1311  
                                    
 1312  0
                 int result = 0;
 1313  
                 do {
 1314  0
                     result = + Utility.compare(buffer.getDocument(), otherBuffer.getDocument());
 1315  0
                     if(result != 0) break;
 1316  0
                     result = + Utility.compare(buffer.getWord(), otherBuffer.getWord());
 1317  0
                     if(result != 0) break;
 1318  
                 } while (false);                                             
 1319  
                 
 1320  0
                 return result;
 1321  
             }
 1322  
             
 1323  
             public final ShreddedBuffer getBuffer() {
 1324  0
                 return buffer;
 1325  
             }                
 1326  
             
 1327  
             public final DocumentLengthWordCount read() throws IOException {
 1328  0
                 if (buffer.isAtEnd()) {
 1329  0
                     fill();             
 1330  
                 
 1331  0
                     if (buffer.isAtEnd()) {
 1332  0
                         return null;
 1333  
                     }
 1334  
                 }
 1335  
                       
 1336  0
                 assert !buffer.isAtEnd();
 1337  0
                 DocumentLengthWordCount result = new DocumentLengthWordCount();
 1338  
                 
 1339  0
                 result.document = buffer.getDocument();
 1340  0
                 result.word = buffer.getWord();
 1341  0
                 result.length = buffer.getLength();
 1342  0
                 result.count = buffer.getCount();
 1343  
                 
 1344  0
                 buffer.incrementTuple();
 1345  0
                 buffer.autoIncrementDocument();
 1346  0
                 buffer.autoIncrementWord();
 1347  
                 
 1348  0
                 return result;
 1349  
             }           
 1350  
             
 1351  
             public final void fill() throws IOException {
 1352  
                 try {   
 1353  0
                     buffer.reset();
 1354  
                     
 1355  0
                     if (tupleCount != 0) {
 1356  
                                                       
 1357  0
                         if(updateDocumentCount - tupleCount > 0) {
 1358  0
                             buffer.documents.add(last.document);
 1359  0
                             buffer.documentTupleIdx.add((int) (updateDocumentCount - tupleCount));
 1360  
                         }                              
 1361  0
                         if(updateWordCount - tupleCount > 0) {
 1362  0
                             buffer.words.add(last.word);
 1363  0
                             buffer.wordTupleIdx.add((int) (updateWordCount - tupleCount));
 1364  
                         }
 1365  0
                         bufferStartCount = tupleCount;
 1366  
                     }
 1367  
                     
 1368  0
                     while (!buffer.isFull()) {
 1369  0
                         updateWord();
 1370  0
                         buffer.processTuple(input.readInt(), input.readInt());
 1371  0
                         tupleCount++;
 1372  
                     }
 1373  0
                 } catch(EOFException e) {}
 1374  0
             }
 1375  
 
 1376  
             public final void updateDocument() throws IOException {
 1377  0
                 if (updateDocumentCount > tupleCount)
 1378  0
                     return;
 1379  
                      
 1380  0
                 last.document = input.readString();
 1381  0
                 updateDocumentCount = tupleCount + input.readInt();
 1382  
                                       
 1383  0
                 buffer.processDocument(last.document);
 1384  0
             }
 1385  
             public final void updateWord() throws IOException {
 1386  0
                 if (updateWordCount > tupleCount)
 1387  0
                     return;
 1388  
                      
 1389  0
                 updateDocument();
 1390  0
                 last.word = input.readString();
 1391  0
                 updateWordCount = tupleCount + input.readInt();
 1392  
                                       
 1393  0
                 buffer.processWord(last.word);
 1394  0
             }
 1395  
 
 1396  
             public void run() throws IOException {
 1397  
                 while (true) {
 1398  0
                     fill();
 1399  
                     
 1400  0
                     if (buffer.isAtEnd())
 1401  0
                         break;
 1402  
                     
 1403  0
                     buffer.copyUntil(null, processor);
 1404  
                 }      
 1405  0
                 processor.close();
 1406  0
             }
 1407  
             
 1408  
             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
 1409  0
                 if (processor instanceof ShreddedProcessor) {
 1410  0
                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
 1411  0
                 } else if (processor instanceof DocumentLengthWordCount.Processor) {
 1412  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentLengthWordCount.Processor) processor));
 1413  0
                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
 1414  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentLengthWordCount>) processor));
 1415  
                 } else {
 1416  0
                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
 1417  
                 }
 1418  0
             }                                
 1419  
             
 1420  
             public Class<DocumentLengthWordCount> getOutputClass() {
 1421  0
                 return DocumentLengthWordCount.class;
 1422  
             }                
 1423  
         }
 1424  
         
 1425  
         public static class DuplicateEliminator implements ShreddedProcessor {
 1426  
             public ShreddedProcessor processor;
 1427  0
             DocumentLengthWordCount last = new DocumentLengthWordCount();
 1428  0
             boolean documentProcess = true;
 1429  0
             boolean wordProcess = true;
 1430  
                                            
 1431  0
             public DuplicateEliminator() {}
 1432  0
             public DuplicateEliminator(ShreddedProcessor processor) {
 1433  0
                 this.processor = processor;
 1434  0
             }
 1435  
             
 1436  
             public void setShreddedProcessor(ShreddedProcessor processor) {
 1437  0
                 this.processor = processor;
 1438  0
             }
 1439  
 
 1440  
             public void processDocument(String document) throws IOException {  
 1441  0
                 if (documentProcess || Utility.compare(document, last.document) != 0) {
 1442  0
                     last.document = document;
 1443  0
                     processor.processDocument(document);
 1444  0
             resetWord();
 1445  0
                     documentProcess = false;
 1446  
                 }
 1447  0
             }
 1448  
             public void processWord(String word) throws IOException {  
 1449  0
                 if (wordProcess || Utility.compare(word, last.word) != 0) {
 1450  0
                     last.word = word;
 1451  0
                     processor.processWord(word);
 1452  0
                     wordProcess = false;
 1453  
                 }
 1454  0
             }  
 1455  
             
 1456  
             public void resetDocument() {
 1457  0
                  documentProcess = true;
 1458  0
             resetWord();
 1459  0
             }                                                
 1460  
             public void resetWord() {
 1461  0
                  wordProcess = true;
 1462  0
             }                                                
 1463  
                                
 1464  
             public void processTuple(int length, int count) throws IOException {
 1465  0
                 processor.processTuple(length, count);
 1466  0
             } 
 1467  
             
 1468  
             public void close() throws IOException {
 1469  0
                 processor.close();
 1470  0
             }                    
 1471  
         }
 1472  
         public static class TupleUnshredder implements ShreddedProcessor {
 1473  0
             DocumentLengthWordCount last = new DocumentLengthWordCount();
 1474  
             public org.galagosearch.tupleflow.Processor<DocumentLengthWordCount> processor;                               
 1475  
             
 1476  0
             public TupleUnshredder(DocumentLengthWordCount.Processor processor) {
 1477  0
                 this.processor = processor;
 1478  0
             }         
 1479  
             
 1480  0
             public TupleUnshredder(org.galagosearch.tupleflow.Processor<DocumentLengthWordCount> processor) {
 1481  0
                 this.processor = processor;
 1482  0
             }
 1483  
             
 1484  
             public DocumentLengthWordCount clone(DocumentLengthWordCount object) {
 1485  0
                 DocumentLengthWordCount result = new DocumentLengthWordCount();
 1486  0
                 if (object == null) return result;
 1487  0
                 result.document = object.document; 
 1488  0
                 result.length = object.length; 
 1489  0
                 result.word = object.word; 
 1490  0
                 result.count = object.count; 
 1491  0
                 return result;
 1492  
             }                 
 1493  
             
 1494  
             public void processDocument(String document) throws IOException {
 1495  0
                 last.document = document;
 1496  0
             }   
 1497  
                 
 1498  
             public void processWord(String word) throws IOException {
 1499  0
                 last.word = word;
 1500  0
             }   
 1501  
                 
 1502  
             
 1503  
             public void processTuple(int length, int count) throws IOException {
 1504  0
                 last.length = length;
 1505  0
                 last.count = count;
 1506  0
                 processor.process(clone(last));
 1507  0
             }               
 1508  
             
 1509  
             public void close() throws IOException {
 1510  0
                 processor.close();
 1511  0
             }
 1512  
         }     
 1513  0
         public static class TupleShredder implements Processor {
 1514  0
             DocumentLengthWordCount last = new DocumentLengthWordCount();
 1515  
             public ShreddedProcessor processor;
 1516  
             
 1517  0
             public TupleShredder(ShreddedProcessor processor) {
 1518  0
                 this.processor = processor;
 1519  0
             }                              
 1520  
             
 1521  
             public DocumentLengthWordCount clone(DocumentLengthWordCount object) {
 1522  0
                 DocumentLengthWordCount result = new DocumentLengthWordCount();
 1523  0
                 if (object == null) return result;
 1524  0
                 result.document = object.document; 
 1525  0
                 result.length = object.length; 
 1526  0
                 result.word = object.word; 
 1527  0
                 result.count = object.count; 
 1528  0
                 return result;
 1529  
             }                 
 1530  
             
 1531  
             public void process(DocumentLengthWordCount object) throws IOException {                                                                                                                                                   
 1532  0
                 boolean processAll = false;
 1533  0
                 if(last == null || Utility.compare(last.document, object.document) != 0 || processAll) { processor.processDocument(object.document); processAll = true; }
 1534  0
                 if(last == null || Utility.compare(last.word, object.word) != 0 || processAll) { processor.processWord(object.word); processAll = true; }
 1535  0
                 processor.processTuple(object.length, object.count);                                         
 1536  0
             }
 1537  
                           
 1538  
             public Class<DocumentLengthWordCount> getInputClass() {
 1539  0
                 return DocumentLengthWordCount.class;
 1540  
             }
 1541  
             
 1542  
             public void close() throws IOException {
 1543  0
                 processor.close();
 1544  0
             }                     
 1545  
         }
 1546  
     } 
 1547  0
     public static class WordDocumentOrder implements Order<DocumentLengthWordCount> {
 1548  
         public int hash(DocumentLengthWordCount object) {
 1549  0
             int h = 0;
 1550  0
             h += Utility.hash(object.word);
 1551  0
             h += Utility.hash(object.document);
 1552  0
             return h;
 1553  
         } 
 1554  
         public Comparator<DocumentLengthWordCount> greaterThan() {
 1555  0
             return new Comparator<DocumentLengthWordCount>() {
 1556  0
                 public int compare(DocumentLengthWordCount one, DocumentLengthWordCount two) {
 1557  0
                     int result = 0;
 1558  
                     do {
 1559  0
                         result = + Utility.compare(one.word, two.word);
 1560  0
                         if(result != 0) break;
 1561  0
                         result = + Utility.compare(one.document, two.document);
 1562  0
                         if(result != 0) break;
 1563  
                     } while (false);
 1564  0
                     return -result;
 1565  
                 }
 1566  
             };
 1567  
         }     
 1568  
         public Comparator<DocumentLengthWordCount> lessThan() {
 1569  0
             return new Comparator<DocumentLengthWordCount>() {
 1570  0
                 public int compare(DocumentLengthWordCount one, DocumentLengthWordCount two) {
 1571  0
                     int result = 0;
 1572  
                     do {
 1573  0
                         result = + Utility.compare(one.word, two.word);
 1574  0
                         if(result != 0) break;
 1575  0
                         result = + Utility.compare(one.document, two.document);
 1576  0
                         if(result != 0) break;
 1577  
                     } while (false);
 1578  0
                     return result;
 1579  
                 }
 1580  
             };
 1581  
         }     
 1582  
         public TypeReader<DocumentLengthWordCount> orderedReader(ArrayInput _input) {
 1583  0
             return new ShreddedReader(_input);
 1584  
         }    
 1585  
 
 1586  
         public TypeReader<DocumentLengthWordCount> orderedReader(ArrayInput _input, int bufferSize) {
 1587  0
             return new ShreddedReader(_input, bufferSize);
 1588  
         }    
 1589  
         public OrderedWriter<DocumentLengthWordCount> orderedWriter(ArrayOutput _output) {
 1590  0
             ShreddedWriter w = new ShreddedWriter(_output);
 1591  0
             return new OrderedWriterClass(w); 
 1592  
         }                                    
 1593  0
         public static class OrderedWriterClass extends OrderedWriter< DocumentLengthWordCount > {
 1594  0
             DocumentLengthWordCount last = null;
 1595  0
             ShreddedWriter shreddedWriter = null; 
 1596  
             
 1597  0
             public OrderedWriterClass(ShreddedWriter s) {
 1598  0
                 this.shreddedWriter = s;
 1599  0
             }
 1600  
             
 1601  
             public void process(DocumentLengthWordCount object) throws IOException {
 1602  0
                boolean processAll = false;
 1603  0
                if (processAll || last == null || 0 != Utility.compare(object.word, last.word)) { processAll = true; shreddedWriter.processWord(object.word); }
 1604  0
                if (processAll || last == null || 0 != Utility.compare(object.document, last.document)) { processAll = true; shreddedWriter.processDocument(object.document); }
 1605  0
                shreddedWriter.processTuple(object.length, object.count);
 1606  0
                last = object;
 1607  0
             }           
 1608  
                  
 1609  
             public void close() throws IOException {
 1610  0
                 shreddedWriter.close();
 1611  0
             }
 1612  
             
 1613  
             public Class<DocumentLengthWordCount> getInputClass() {
 1614  0
                 return DocumentLengthWordCount.class;
 1615  
             }
 1616  
         } 
 1617  
         public ReaderSource<DocumentLengthWordCount> orderedCombiner(Collection<TypeReader<DocumentLengthWordCount>> readers, boolean closeOnExit) {
 1618  0
             ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
 1619  
             
 1620  0
             for (TypeReader<DocumentLengthWordCount> reader : readers) {
 1621  0
                 shreddedReaders.add((ShreddedReader)reader);
 1622  
             }
 1623  
             
 1624  0
             return new ShreddedCombiner(shreddedReaders, closeOnExit);
 1625  
         }                  
 1626  
         public DocumentLengthWordCount clone(DocumentLengthWordCount object) {
 1627  0
             DocumentLengthWordCount result = new DocumentLengthWordCount();
 1628  0
             if (object == null) return result;
 1629  0
             result.document = object.document; 
 1630  0
             result.length = object.length; 
 1631  0
             result.word = object.word; 
 1632  0
             result.count = object.count; 
 1633  0
             return result;
 1634  
         }                 
 1635  
         public Class<DocumentLengthWordCount> getOrderedClass() {
 1636  0
             return DocumentLengthWordCount.class;
 1637  
         }                           
 1638  
         public String[] getOrderSpec() {
 1639  0
             return new String[] {"+word", "+document"};
 1640  
         }
 1641  
 
 1642  
         public static String getSpecString() {
 1643  0
             return "+word +document";
 1644  
         }
 1645  
                            
 1646  
         public interface ShreddedProcessor extends Step {
 1647  
             public void processWord(String word) throws IOException;
 1648  
             public void processDocument(String document) throws IOException;
 1649  
             public void processTuple(int length, int count) throws IOException;
 1650  
             public void close() throws IOException;
 1651  
         }    
 1652  
         public interface ShreddedSource extends Step {
 1653  
         }                                              
 1654  
         
 1655  0
         public static class ShreddedWriter implements ShreddedProcessor {
 1656  
             ArrayOutput output;
 1657  0
             ShreddedBuffer buffer = new ShreddedBuffer();
 1658  
             String lastWord;
 1659  
             String lastDocument;
 1660  0
             boolean lastFlush = false;
 1661  
             
 1662  0
             public ShreddedWriter(ArrayOutput output) {
 1663  0
                 this.output = output;
 1664  0
             }                        
 1665  
             
 1666  
             public void close() throws IOException {
 1667  0
                 flush();
 1668  0
             }
 1669  
             
 1670  
             public void processWord(String word) {
 1671  0
                 lastWord = word;
 1672  0
                 buffer.processWord(word);
 1673  0
             }
 1674  
             public void processDocument(String document) {
 1675  0
                 lastDocument = document;
 1676  0
                 buffer.processDocument(document);
 1677  0
             }
 1678  
             public final void processTuple(int length, int count) throws IOException {
 1679  0
                 if (lastFlush) {
 1680  0
                     if(buffer.words.size() == 0) buffer.processWord(lastWord);
 1681  0
                     if(buffer.documents.size() == 0) buffer.processDocument(lastDocument);
 1682  0
                     lastFlush = false;
 1683  
                 }
 1684  0
                 buffer.processTuple(length, count);
 1685  0
                 if (buffer.isFull())
 1686  0
                     flush();
 1687  0
             }
 1688  
             public final void flushTuples(int pauseIndex) throws IOException {
 1689  
                 
 1690  0
                 while (buffer.getReadIndex() < pauseIndex) {
 1691  
                            
 1692  0
                     output.writeInt(buffer.getLength());
 1693  0
                     output.writeInt(buffer.getCount());
 1694  0
                     buffer.incrementTuple();
 1695  
                 }
 1696  0
             }  
 1697  
             public final void flushWord(int pauseIndex) throws IOException {
 1698  0
                 while (buffer.getReadIndex() < pauseIndex) {
 1699  0
                     int nextPause = buffer.getWordEndIndex();
 1700  0
                     int count = nextPause - buffer.getReadIndex();
 1701  
                     
 1702  0
                     output.writeString(buffer.getWord());
 1703  0
                     output.writeInt(count);
 1704  0
                     buffer.incrementWord();
 1705  
                       
 1706  0
                     flushDocument(nextPause);
 1707  0
                     assert nextPause == buffer.getReadIndex();
 1708  0
                 }
 1709  0
             }
 1710  
             public final void flushDocument(int pauseIndex) throws IOException {
 1711  0
                 while (buffer.getReadIndex() < pauseIndex) {
 1712  0
                     int nextPause = buffer.getDocumentEndIndex();
 1713  0
                     int count = nextPause - buffer.getReadIndex();
 1714  
                     
 1715  0
                     output.writeString(buffer.getDocument());
 1716  0
                     output.writeInt(count);
 1717  0
                     buffer.incrementDocument();
 1718  
                       
 1719  0
                     flushTuples(nextPause);
 1720  0
                     assert nextPause == buffer.getReadIndex();
 1721  0
                 }
 1722  0
             }
 1723  
             public void flush() throws IOException { 
 1724  0
                 flushWord(buffer.getWriteIndex());
 1725  0
                 buffer.reset(); 
 1726  0
                 lastFlush = true;
 1727  0
             }                           
 1728  
         }
 1729  0
         public static class ShreddedBuffer {
 1730  0
             ArrayList<String> words = new ArrayList();
 1731  0
             ArrayList<String> documents = new ArrayList();
 1732  0
             ArrayList<Integer> wordTupleIdx = new ArrayList();
 1733  0
             ArrayList<Integer> documentTupleIdx = new ArrayList();
 1734  0
             int wordReadIdx = 0;
 1735  0
             int documentReadIdx = 0;
 1736  
                             
 1737  
             int[] lengths;
 1738  
             int[] counts;
 1739  0
             int writeTupleIndex = 0;
 1740  0
             int readTupleIndex = 0;
 1741  
             int batchSize;
 1742  
 
 1743  0
             public ShreddedBuffer(int batchSize) {
 1744  0
                 this.batchSize = batchSize;
 1745  
 
 1746  0
                 lengths = new int[batchSize];
 1747  0
                 counts = new int[batchSize];
 1748  0
             }                              
 1749  
 
 1750  
             public ShreddedBuffer() {    
 1751  0
                 this(10000);
 1752  0
             }                                                                                                                    
 1753  
             
 1754  
             public void processWord(String word) {
 1755  0
                 words.add(word);
 1756  0
                 wordTupleIdx.add(writeTupleIndex);
 1757  0
             }                                      
 1758  
             public void processDocument(String document) {
 1759  0
                 documents.add(document);
 1760  0
                 documentTupleIdx.add(writeTupleIndex);
 1761  0
             }                                      
 1762  
             public void processTuple(int length, int count) {
 1763  0
                 assert words.size() > 0;
 1764  0
                 assert documents.size() > 0;
 1765  0
                 lengths[writeTupleIndex] = length;
 1766  0
                 counts[writeTupleIndex] = count;
 1767  0
                 writeTupleIndex++;
 1768  0
             }
 1769  
             public void resetData() {
 1770  0
                 words.clear();
 1771  0
                 documents.clear();
 1772  0
                 wordTupleIdx.clear();
 1773  0
                 documentTupleIdx.clear();
 1774  0
                 writeTupleIndex = 0;
 1775  0
             }                  
 1776  
                                  
 1777  
             public void resetRead() {
 1778  0
                 readTupleIndex = 0;
 1779  0
                 wordReadIdx = 0;
 1780  0
                 documentReadIdx = 0;
 1781  0
             } 
 1782  
 
 1783  
             public void reset() {
 1784  0
                 resetData();
 1785  0
                 resetRead();
 1786  0
             } 
 1787  
             public boolean isFull() {
 1788  0
                 return writeTupleIndex >= batchSize;
 1789  
             }
 1790  
 
 1791  
             public boolean isEmpty() {
 1792  0
                 return writeTupleIndex == 0;
 1793  
             }                          
 1794  
 
 1795  
             public boolean isAtEnd() {
 1796  0
                 return readTupleIndex >= writeTupleIndex;
 1797  
             }           
 1798  
             public void incrementWord() {
 1799  0
                 wordReadIdx++;  
 1800  0
             }                                                                                              
 1801  
 
 1802  
             public void autoIncrementWord() {
 1803  0
                 while (readTupleIndex >= getWordEndIndex() && readTupleIndex < writeTupleIndex)
 1804  0
                     wordReadIdx++;
 1805  0
             }                 
 1806  
             public void incrementDocument() {
 1807  0
                 documentReadIdx++;  
 1808  0
             }                                                                                              
 1809  
 
 1810  
             public void autoIncrementDocument() {
 1811  0
                 while (readTupleIndex >= getDocumentEndIndex() && readTupleIndex < writeTupleIndex)
 1812  0
                     documentReadIdx++;
 1813  0
             }                 
 1814  
             public void incrementTuple() {
 1815  0
                 readTupleIndex++;
 1816  0
             }                    
 1817  
             public int getWordEndIndex() {
 1818  0
                 if ((wordReadIdx+1) >= wordTupleIdx.size())
 1819  0
                     return writeTupleIndex;
 1820  0
                 return wordTupleIdx.get(wordReadIdx+1);
 1821  
             }
 1822  
 
 1823  
             public int getDocumentEndIndex() {
 1824  0
                 if ((documentReadIdx+1) >= documentTupleIdx.size())
 1825  0
                     return writeTupleIndex;
 1826  0
                 return documentTupleIdx.get(documentReadIdx+1);
 1827  
             }
 1828  
             public int getReadIndex() {
 1829  0
                 return readTupleIndex;
 1830  
             }   
 1831  
 
 1832  
             public int getWriteIndex() {
 1833  0
                 return writeTupleIndex;
 1834  
             } 
 1835  
             public String getWord() {
 1836  0
                 assert readTupleIndex < writeTupleIndex;
 1837  0
                 assert wordReadIdx < words.size();
 1838  
                 
 1839  0
                 return words.get(wordReadIdx);
 1840  
             }
 1841  
             public String getDocument() {
 1842  0
                 assert readTupleIndex < writeTupleIndex;
 1843  0
                 assert documentReadIdx < documents.size();
 1844  
                 
 1845  0
                 return documents.get(documentReadIdx);
 1846  
             }
 1847  
             public int getLength() {
 1848  0
                 assert readTupleIndex < writeTupleIndex;
 1849  0
                 return lengths[readTupleIndex];
 1850  
             }                                         
 1851  
             public int getCount() {
 1852  0
                 assert readTupleIndex < writeTupleIndex;
 1853  0
                 return counts[readTupleIndex];
 1854  
             }                                         
 1855  
             public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
 1856  0
                 while (getReadIndex() < endIndex) {
 1857  0
                    output.processTuple(getLength(), getCount());
 1858  0
                    incrementTuple();
 1859  
                 }
 1860  0
             }                                                                           
 1861  
             public void copyUntilIndexWord(int endIndex, ShreddedProcessor output) throws IOException {
 1862  0
                 while (getReadIndex() < endIndex) {
 1863  0
                     output.processWord(getWord());
 1864  0
                     assert getWordEndIndex() <= endIndex;
 1865  0
                     copyUntilIndexDocument(getWordEndIndex(), output);
 1866  0
                     incrementWord();
 1867  
                 }
 1868  0
             } 
 1869  
             public void copyUntilIndexDocument(int endIndex, ShreddedProcessor output) throws IOException {
 1870  0
                 while (getReadIndex() < endIndex) {
 1871  0
                     output.processDocument(getDocument());
 1872  0
                     assert getDocumentEndIndex() <= endIndex;
 1873  0
                     copyTuples(getDocumentEndIndex(), output);
 1874  0
                     incrementDocument();
 1875  
                 }
 1876  0
             }  
 1877  
             public void copyUntilWord(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
 1878  0
                 while (!isAtEnd()) {
 1879  0
                     if (other != null) {   
 1880  0
                         assert !other.isAtEnd();
 1881  0
                         int c = + Utility.compare(getWord(), other.getWord());
 1882  
                     
 1883  0
                         if (c > 0) {
 1884  0
                             break;   
 1885  
                         }
 1886  
                         
 1887  0
                         output.processWord(getWord());
 1888  
                                       
 1889  0
                         if (c < 0) {
 1890  0
                             copyUntilIndexDocument(getWordEndIndex(), output);
 1891  0
                         } else if (c == 0) {
 1892  0
                             copyUntilDocument(other, output);
 1893  0
                             autoIncrementWord();
 1894  0
                             break;
 1895  
                         }
 1896  0
                     } else {
 1897  0
                         output.processWord(getWord());
 1898  0
                         copyUntilIndexDocument(getWordEndIndex(), output);
 1899  
                     }
 1900  0
                     incrementWord();  
 1901  
                     
 1902  
                
 1903  
                 }
 1904  0
             }
 1905  
             public void copyUntilDocument(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
 1906  0
                 while (!isAtEnd()) {
 1907  0
                     if (other != null) {   
 1908  0
                         assert !other.isAtEnd();
 1909  0
                         int c = + Utility.compare(getDocument(), other.getDocument());
 1910  
                     
 1911  0
                         if (c > 0) {
 1912  0
                             break;   
 1913  
                         }
 1914  
                         
 1915  0
                         output.processDocument(getDocument());
 1916  
                                       
 1917  0
                         copyTuples(getDocumentEndIndex(), output);
 1918  0
                     } else {
 1919  0
                         output.processDocument(getDocument());
 1920  0
                         copyTuples(getDocumentEndIndex(), output);
 1921  
                     }
 1922  0
                     incrementDocument();  
 1923  
                     
 1924  0
                     if (getWordEndIndex() <= readTupleIndex)
 1925  0
                         break;   
 1926  
                 }
 1927  0
             }
 1928  
             public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
 1929  0
                 copyUntilWord(other, output);
 1930  0
             }
 1931  
             
 1932  
         }                         
 1933  0
         public static class ShreddedCombiner implements ReaderSource<DocumentLengthWordCount>, ShreddedSource {   
 1934  
             public ShreddedProcessor processor;
 1935  
             Collection<ShreddedReader> readers;       
 1936  0
             boolean closeOnExit = false;
 1937  0
             boolean uninitialized = true;
 1938  0
             PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
 1939  
             
 1940  0
             public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
 1941  0
                 this.readers = readers;                                                       
 1942  0
                 this.closeOnExit = closeOnExit;
 1943  0
             }
 1944  
                                   
 1945  
             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
 1946  0
                 if (processor instanceof ShreddedProcessor) {
 1947  0
                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
 1948  0
                 } else if (processor instanceof DocumentLengthWordCount.Processor) {
 1949  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentLengthWordCount.Processor) processor));
 1950  0
                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
 1951  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentLengthWordCount>) processor));
 1952  
                 } else {
 1953  0
                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
 1954  
                 }
 1955  0
             }                                
 1956  
             
 1957  
             public Class<DocumentLengthWordCount> getOutputClass() {
 1958  0
                 return DocumentLengthWordCount.class;
 1959  
             }
 1960  
             
 1961  
             public void initialize() throws IOException {
 1962  0
                 for (ShreddedReader reader : readers) {
 1963  0
                     reader.fill();                                        
 1964  
                     
 1965  0
                     if (!reader.getBuffer().isAtEnd())
 1966  0
                         queue.add(reader);
 1967  
                 }   
 1968  
 
 1969  0
                 uninitialized = false;
 1970  0
             }
 1971  
 
 1972  
             public void run() throws IOException {
 1973  0
                 initialize();
 1974  
                
 1975  0
                 while (queue.size() > 0) {
 1976  0
                     ShreddedReader top = queue.poll();
 1977  0
                     ShreddedReader next = null;
 1978  0
                     ShreddedBuffer nextBuffer = null; 
 1979  
                     
 1980  0
                     assert !top.getBuffer().isAtEnd();
 1981  
                                                   
 1982  0
                     if (queue.size() > 0) {
 1983  0
                         next = queue.peek();
 1984  0
                         nextBuffer = next.getBuffer();
 1985  0
                         assert !nextBuffer.isAtEnd();
 1986  
                     }
 1987  
                     
 1988  0
                     top.getBuffer().copyUntil(nextBuffer, processor);
 1989  0
                     if (top.getBuffer().isAtEnd())
 1990  0
                         top.fill();                 
 1991  
                         
 1992  0
                     if (!top.getBuffer().isAtEnd())
 1993  0
                         queue.add(top);
 1994  0
                 }              
 1995  
                 
 1996  0
                 if (closeOnExit)
 1997  0
                     processor.close();
 1998  0
             }
 1999  
 
 2000  
             public DocumentLengthWordCount read() throws IOException {
 2001  0
                 if (uninitialized)
 2002  0
                     initialize();
 2003  
 
 2004  0
                 DocumentLengthWordCount result = null;
 2005  
 
 2006  0
                 while (queue.size() > 0) {
 2007  0
                     ShreddedReader top = queue.poll();
 2008  0
                     result = top.read();
 2009  
 
 2010  0
                     if (result != null) {
 2011  0
                         if (top.getBuffer().isAtEnd())
 2012  0
                             top.fill();
 2013  
 
 2014  0
                         queue.offer(top);
 2015  0
                         break;
 2016  
                     } 
 2017  0
                 }
 2018  
 
 2019  0
                 return result;
 2020  
             }
 2021  
         } 
 2022  0
         public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<DocumentLengthWordCount>, ShreddedSource {      
 2023  
             public ShreddedProcessor processor;
 2024  
             ShreddedBuffer buffer;
 2025  0
             DocumentLengthWordCount last = new DocumentLengthWordCount();         
 2026  0
             long updateWordCount = -1;
 2027  0
             long updateDocumentCount = -1;
 2028  0
             long tupleCount = 0;
 2029  0
             long bufferStartCount = 0;  
 2030  
             ArrayInput input;
 2031  
             
 2032  0
             public ShreddedReader(ArrayInput input) {
 2033  0
                 this.input = input; 
 2034  0
                 this.buffer = new ShreddedBuffer();
 2035  0
             }                               
 2036  
             
 2037  0
             public ShreddedReader(ArrayInput input, int bufferSize) { 
 2038  0
                 this.input = input;
 2039  0
                 this.buffer = new ShreddedBuffer(bufferSize);
 2040  0
             }
 2041  
                  
 2042  
             public final int compareTo(ShreddedReader other) {
 2043  0
                 ShreddedBuffer otherBuffer = other.getBuffer();
 2044  
                 
 2045  0
                 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
 2046  0
                     return 0;                 
 2047  0
                 } else if (buffer.isAtEnd()) {
 2048  0
                     return -1;
 2049  0
                 } else if (otherBuffer.isAtEnd()) {
 2050  0
                     return 1;
 2051  
                 }
 2052  
                                    
 2053  0
                 int result = 0;
 2054  
                 do {
 2055  0
                     result = + Utility.compare(buffer.getWord(), otherBuffer.getWord());
 2056  0
                     if(result != 0) break;
 2057  0
                     result = + Utility.compare(buffer.getDocument(), otherBuffer.getDocument());
 2058  0
                     if(result != 0) break;
 2059  
                 } while (false);                                             
 2060  
                 
 2061  0
                 return result;
 2062  
             }
 2063  
             
 2064  
             public final ShreddedBuffer getBuffer() {
 2065  0
                 return buffer;
 2066  
             }                
 2067  
             
 2068  
             public final DocumentLengthWordCount read() throws IOException {
 2069  0
                 if (buffer.isAtEnd()) {
 2070  0
                     fill();             
 2071  
                 
 2072  0
                     if (buffer.isAtEnd()) {
 2073  0
                         return null;
 2074  
                     }
 2075  
                 }
 2076  
                       
 2077  0
                 assert !buffer.isAtEnd();
 2078  0
                 DocumentLengthWordCount result = new DocumentLengthWordCount();
 2079  
                 
 2080  0
                 result.word = buffer.getWord();
 2081  0
                 result.document = buffer.getDocument();
 2082  0
                 result.length = buffer.getLength();
 2083  0
                 result.count = buffer.getCount();
 2084  
                 
 2085  0
                 buffer.incrementTuple();
 2086  0
                 buffer.autoIncrementWord();
 2087  0
                 buffer.autoIncrementDocument();
 2088  
                 
 2089  0
                 return result;
 2090  
             }           
 2091  
             
 2092  
             public final void fill() throws IOException {
 2093  
                 try {   
 2094  0
                     buffer.reset();
 2095  
                     
 2096  0
                     if (tupleCount != 0) {
 2097  
                                                       
 2098  0
                         if(updateWordCount - tupleCount > 0) {
 2099  0
                             buffer.words.add(last.word);
 2100  0
                             buffer.wordTupleIdx.add((int) (updateWordCount - tupleCount));
 2101  
                         }                              
 2102  0
                         if(updateDocumentCount - tupleCount > 0) {
 2103  0
                             buffer.documents.add(last.document);
 2104  0
                             buffer.documentTupleIdx.add((int) (updateDocumentCount - tupleCount));
 2105  
                         }
 2106  0
                         bufferStartCount = tupleCount;
 2107  
                     }
 2108  
                     
 2109  0
                     while (!buffer.isFull()) {
 2110  0
                         updateDocument();
 2111  0
                         buffer.processTuple(input.readInt(), input.readInt());
 2112  0
                         tupleCount++;
 2113  
                     }
 2114  0
                 } catch(EOFException e) {}
 2115  0
             }
 2116  
 
 2117  
             public final void updateWord() throws IOException {
 2118  0
                 if (updateWordCount > tupleCount)
 2119  0
                     return;
 2120  
                      
 2121  0
                 last.word = input.readString();
 2122  0
                 updateWordCount = tupleCount + input.readInt();
 2123  
                                       
 2124  0
                 buffer.processWord(last.word);
 2125  0
             }
 2126  
             public final void updateDocument() throws IOException {
 2127  0
                 if (updateDocumentCount > tupleCount)
 2128  0
                     return;
 2129  
                      
 2130  0
                 updateWord();
 2131  0
                 last.document = input.readString();
 2132  0
                 updateDocumentCount = tupleCount + input.readInt();
 2133  
                                       
 2134  0
                 buffer.processDocument(last.document);
 2135  0
             }
 2136  
 
 2137  
             public void run() throws IOException {
 2138  
                 while (true) {
 2139  0
                     fill();
 2140  
                     
 2141  0
                     if (buffer.isAtEnd())
 2142  0
                         break;
 2143  
                     
 2144  0
                     buffer.copyUntil(null, processor);
 2145  
                 }      
 2146  0
                 processor.close();
 2147  0
             }
 2148  
             
 2149  
             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
 2150  0
                 if (processor instanceof ShreddedProcessor) {
 2151  0
                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
 2152  0
                 } else if (processor instanceof DocumentLengthWordCount.Processor) {
 2153  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentLengthWordCount.Processor) processor));
 2154  0
                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
 2155  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentLengthWordCount>) processor));
 2156  
                 } else {
 2157  0
                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
 2158  
                 }
 2159  0
             }                                
 2160  
             
 2161  
             public Class<DocumentLengthWordCount> getOutputClass() {
 2162  0
                 return DocumentLengthWordCount.class;
 2163  
             }                
 2164  
         }
 2165  
         
 2166  
         public static class DuplicateEliminator implements ShreddedProcessor {
 2167  
             public ShreddedProcessor processor;
 2168  0
             DocumentLengthWordCount last = new DocumentLengthWordCount();
 2169  0
             boolean wordProcess = true;
 2170  0
             boolean documentProcess = true;
 2171  
                                            
 2172  0
             public DuplicateEliminator() {}
 2173  0
             public DuplicateEliminator(ShreddedProcessor processor) {
 2174  0
                 this.processor = processor;
 2175  0
             }
 2176  
             
 2177  
             public void setShreddedProcessor(ShreddedProcessor processor) {
 2178  0
                 this.processor = processor;
 2179  0
             }
 2180  
 
 2181  
             public void processWord(String word) throws IOException {  
 2182  0
                 if (wordProcess || Utility.compare(word, last.word) != 0) {
 2183  0
                     last.word = word;
 2184  0
                     processor.processWord(word);
 2185  0
             resetDocument();
 2186  0
                     wordProcess = false;
 2187  
                 }
 2188  0
             }
 2189  
             public void processDocument(String document) throws IOException {  
 2190  0
                 if (documentProcess || Utility.compare(document, last.document) != 0) {
 2191  0
                     last.document = document;
 2192  0
                     processor.processDocument(document);
 2193  0
                     documentProcess = false;
 2194  
                 }
 2195  0
             }  
 2196  
             
 2197  
             public void resetWord() {
 2198  0
                  wordProcess = true;
 2199  0
             resetDocument();
 2200  0
             }                                                
 2201  
             public void resetDocument() {
 2202  0
                  documentProcess = true;
 2203  0
             }                                                
 2204  
                                
 2205  
             public void processTuple(int length, int count) throws IOException {
 2206  0
                 processor.processTuple(length, count);
 2207  0
             } 
 2208  
             
 2209  
             public void close() throws IOException {
 2210  0
                 processor.close();
 2211  0
             }                    
 2212  
         }
 2213  
         public static class TupleUnshredder implements ShreddedProcessor {
 2214  0
             DocumentLengthWordCount last = new DocumentLengthWordCount();
 2215  
             public org.galagosearch.tupleflow.Processor<DocumentLengthWordCount> processor;                               
 2216  
             
 2217  0
             public TupleUnshredder(DocumentLengthWordCount.Processor processor) {
 2218  0
                 this.processor = processor;
 2219  0
             }         
 2220  
             
 2221  0
             public TupleUnshredder(org.galagosearch.tupleflow.Processor<DocumentLengthWordCount> processor) {
 2222  0
                 this.processor = processor;
 2223  0
             }
 2224  
             
 2225  
             public DocumentLengthWordCount clone(DocumentLengthWordCount object) {
 2226  0
                 DocumentLengthWordCount result = new DocumentLengthWordCount();
 2227  0
                 if (object == null) return result;
 2228  0
                 result.document = object.document; 
 2229  0
                 result.length = object.length; 
 2230  0
                 result.word = object.word; 
 2231  0
                 result.count = object.count; 
 2232  0
                 return result;
 2233  
             }                 
 2234  
             
 2235  
             public void processWord(String word) throws IOException {
 2236  0
                 last.word = word;
 2237  0
             }   
 2238  
                 
 2239  
             public void processDocument(String document) throws IOException {
 2240  0
                 last.document = document;
 2241  0
             }   
 2242  
                 
 2243  
             
 2244  
             public void processTuple(int length, int count) throws IOException {
 2245  0
                 last.length = length;
 2246  0
                 last.count = count;
 2247  0
                 processor.process(clone(last));
 2248  0
             }               
 2249  
             
 2250  
             public void close() throws IOException {
 2251  0
                 processor.close();
 2252  0
             }
 2253  
         }     
 2254  0
         public static class TupleShredder implements Processor {
 2255  0
             DocumentLengthWordCount last = new DocumentLengthWordCount();
 2256  
             public ShreddedProcessor processor;
 2257  
             
 2258  0
             public TupleShredder(ShreddedProcessor processor) {
 2259  0
                 this.processor = processor;
 2260  0
             }                              
 2261  
             
 2262  
             public DocumentLengthWordCount clone(DocumentLengthWordCount object) {
 2263  0
                 DocumentLengthWordCount result = new DocumentLengthWordCount();
 2264  0
                 if (object == null) return result;
 2265  0
                 result.document = object.document; 
 2266  0
                 result.length = object.length; 
 2267  0
                 result.word = object.word; 
 2268  0
                 result.count = object.count; 
 2269  0
                 return result;
 2270  
             }                 
 2271  
             
 2272  
             public void process(DocumentLengthWordCount object) throws IOException {                                                                                                                                                   
 2273  0
                 boolean processAll = false;
 2274  0
                 if(last == null || Utility.compare(last.word, object.word) != 0 || processAll) { processor.processWord(object.word); processAll = true; }
 2275  0
                 if(last == null || Utility.compare(last.document, object.document) != 0 || processAll) { processor.processDocument(object.document); processAll = true; }
 2276  0
                 processor.processTuple(object.length, object.count);                                         
 2277  0
             }
 2278  
                           
 2279  
             public Class<DocumentLengthWordCount> getInputClass() {
 2280  0
                 return DocumentLengthWordCount.class;
 2281  
             }
 2282  
             
 2283  
             public void close() throws IOException {
 2284  0
                 processor.close();
 2285  0
             }                     
 2286  
         }
 2287  
     } 
 2288  4
     public static class DocumentOrder implements Order<DocumentLengthWordCount> {
 2289  
         public int hash(DocumentLengthWordCount object) {
 2290  0
             int h = 0;
 2291  0
             h += Utility.hash(object.document);
 2292  0
             return h;
 2293  
         } 
 2294  
         public Comparator<DocumentLengthWordCount> greaterThan() {
 2295  0
             return new Comparator<DocumentLengthWordCount>() {
 2296  0
                 public int compare(DocumentLengthWordCount one, DocumentLengthWordCount two) {
 2297  0
                     int result = 0;
 2298  
                     do {
 2299  0
                         result = + Utility.compare(one.document, two.document);
 2300  0
                         if(result != 0) break;
 2301  
                     } while (false);
 2302  0
                     return -result;
 2303  
                 }
 2304  
             };
 2305  
         }     
 2306  
         /**
          * Builds a comparator that returns
          * {@code Utility.compare(one.document, two.document)} unchanged.
          * Only the {@code document} field is compared.
          *
          * @return a new anonymous comparator instance on every call
          */
         public Comparator<DocumentLengthWordCount> lessThan() {
 2307  0
             return new Comparator<DocumentLengthWordCount>() {
 2308  0
                 public int compare(DocumentLengthWordCount one, DocumentLengthWordCount two) {
 2309  0
                     int result = 0;
 2310  
                     // The do/while(false) body runs exactly once; there is a single key to compare.
                     do {
 2311  0
                         result = + Utility.compare(one.document, two.document);
 2312  0
                         if(result != 0) break;
 2313  
                     } while (false);
 2314  0
                     return result;
 2315  
                 }
 2316  
             };
 2317  
         }     
 2318  
         /**
          * Creates a {@link ShreddedReader} over {@code _input} using the reader's
          * default buffer (a no-argument {@code ShreddedBuffer}).
          *
          * @param _input the input the new reader will read from
          * @return a new {@code ShreddedReader}
          */
         public TypeReader<DocumentLengthWordCount> orderedReader(ArrayInput _input) {
 2319  0
             return new ShreddedReader(_input);
 2320  
         }    
 2321  
 
 2322  
         /**
          * Creates a {@link ShreddedReader} over {@code _input} whose buffer is
          * constructed with {@code bufferSize}.
          *
          * @param _input     the input the new reader will read from
          * @param bufferSize passed to the {@code ShreddedBuffer(int)} constructor as the batch size
          * @return a new {@code ShreddedReader}
          */
         public TypeReader<DocumentLengthWordCount> orderedReader(ArrayInput _input, int bufferSize) {
 2323  0
             return new ShreddedReader(_input, bufferSize);
 2324  
         }    
 2325  
         /**
          * Creates a writer for whole tuples: a {@link ShreddedWriter} on {@code _output}
          * wrapped in an {@link OrderedWriterClass}.
          *
          * @param _output the output the shredded writer will write to
          * @return the wrapping {@code OrderedWriterClass}
          */
         public OrderedWriter<DocumentLengthWordCount> orderedWriter(ArrayOutput _output) {
 2326  0
             ShreddedWriter w = new ShreddedWriter(_output);
 2327  0
             return new OrderedWriterClass(w); 
 2328  
         }                                    
 2329  0
         /**
          * Accepts whole {@code DocumentLengthWordCount} tuples and forwards them to a
          * {@link ShreddedWriter} in shredded form: a {@code processDocument} call when the
          * document differs from the previous tuple's, followed by a {@code processTuple}
          * call carrying length, word and count.
          */
         public static class OrderedWriterClass extends OrderedWriter< DocumentLengthWordCount > {
 2330  0
             // Previous tuple passed to process(); null until the first call.
             DocumentLengthWordCount last = null;
 2331  0
             ShreddedWriter shreddedWriter = null; 
 2332  
             
 2333  0
             public OrderedWriterClass(ShreddedWriter s) {
 2334  0
                 this.shreddedWriter = s;
 2335  0
             }
 2336  
             
 2337  
             /**
              * Writes one tuple. The document is emitted only on the first call or when
              * {@code Utility.compare} reports it differs from the previous tuple's document.
              *
              * @param object the tuple to write; the reference itself is retained as {@code last}
              * @throws IOException if the shredded writer fails
              */
             public void process(DocumentLengthWordCount object) throws IOException {
 2338  0
                boolean processAll = false;
 2339  0
                if (processAll || last == null || 0 != Utility.compare(object.document, last.document)) { processAll = true; shreddedWriter.processDocument(object.document); }
 2340  0
                shreddedWriter.processTuple(object.length, object.word, object.count);
 2341  0
                // Keeps the caller's object (no copy) for the next comparison.
                last = object;
 2342  0
             }           
 2343  
                  
 2344  
             /** Closes the underlying shredded writer. */
             public void close() throws IOException {
 2345  0
                 shreddedWriter.close();
 2346  0
             }
 2347  
             
 2348  
             /** Returns {@code DocumentLengthWordCount.class}. */
             public Class<DocumentLengthWordCount> getInputClass() {
 2349  0
                 return DocumentLengthWordCount.class;
 2350  
             }
 2351  
         } 
 2352  
         /**
          * Builds a {@link ShreddedCombiner} over the given readers.
          *
          * @param readers     readers to combine; each element is cast to {@code ShreddedReader},
          *                    so any other implementation causes a {@code ClassCastException}
          * @param closeOnExit forwarded unchanged to the {@code ShreddedCombiner} constructor
          * @return the new combiner
          */
         public ReaderSource<DocumentLengthWordCount> orderedCombiner(Collection<TypeReader<DocumentLengthWordCount>> readers, boolean closeOnExit) {
 2353  0
             // NOTE(review): raw ArrayList on the right-hand side (unchecked warning).
             ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
 2354  
             
 2355  0
             for (TypeReader<DocumentLengthWordCount> reader : readers) {
 2356  0
                 shreddedReaders.add((ShreddedReader)reader);
 2357  
             }
 2358  
             
 2359  0
             return new ShreddedCombiner(shreddedReaders, closeOnExit);
 2360  
         }                  
 2361  
         /**
          * Copies a tuple field by field (document, length, word, count).
          *
          * @param object the tuple to copy; may be null
          * @return a new tuple with the same field values, or a freshly constructed
          *         tuple with no fields assigned here when {@code object} is null
          */
         public DocumentLengthWordCount clone(DocumentLengthWordCount object) {
 2362  0
             DocumentLengthWordCount result = new DocumentLengthWordCount();
 2363  0
             if (object == null) return result;
 2364  0
             result.document = object.document; 
 2365  0
             result.length = object.length; 
 2366  0
             result.word = object.word; 
 2367  0
             result.count = object.count; 
 2368  0
             return result;
 2369  
         }                 
 2370  
         /** Returns {@code DocumentLengthWordCount.class}, the tuple class this order applies to. */
         public Class<DocumentLengthWordCount> getOrderedClass() {
 2371  0
             return DocumentLengthWordCount.class;
 2372  
         }                           
 2373  
         /** Returns the order specification as a new one-element array containing {@code "+document"}. */
         public String[] getOrderSpec() {
 2374  0
             return new String[] {"+document"};
 2375  
         }
 2376  
 
 2377  
         /** Returns the order specification as the single string {@code "+document"}. */
         public static String getSpecString() {
 2378  0
             return "+document";
 2379  
         }
 2380  
                            
 2381  
         /**
          * Receiver of tuples in shredded form: the document key arrives through
          * {@code processDocument}, and the remaining fields (length, word, count)
          * arrive through {@code processTuple}.
          */
         public interface ShreddedProcessor extends Step {
 2382  
             public void processDocument(String document) throws IOException;
 2383  
             public void processTuple(int length, String word, int count) throws IOException;
 2384  
             public void close() throws IOException;
 2385  
         }    
 2386  
         /** Marker interface with no members; implemented in this class by ShreddedCombiner and ShreddedReader. */
         public interface ShreddedSource extends Step {
 2387  
         }                                              
 2388  
         
 2389  0
         /**
          * Buffers shredded tuples in a {@link ShreddedBuffer} and writes them to an
          * {@code ArrayOutput} whenever the buffer fills or {@link #close()} is called.
          *
          * <p>Layout written per buffered document: the document string, an int holding
          * the number of tuples that follow, then for each tuple an int length, a string
          * word and an int count.
          */
         public static class ShreddedWriter implements ShreddedProcessor {
 2390  
             ArrayOutput output;
 2391  0
             ShreddedBuffer buffer = new ShreddedBuffer();
 2392  
             // Most recent document passed to processDocument(); re-registered after a flush.
             String lastDocument;
 2393  0
             // True between a flush() and the next processTuple() call.
             boolean lastFlush = false;
 2394  
             
 2395  0
             public ShreddedWriter(ArrayOutput output) {
 2396  0
                 this.output = output;
 2397  0
             }                        
 2398  
             
 2399  
             /** Flushes buffered data; this method does not call any close method on {@code output}. */
             public void close() throws IOException {
 2400  0
                 flush();
 2401  0
             }
 2402  
             
 2403  
             /** Remembers {@code document} and registers it in the buffer as the owner of subsequent tuples. */
             public void processDocument(String document) {
 2404  0
                 lastDocument = document;
 2405  0
                 buffer.processDocument(document);
 2406  0
             }
 2407  
             /**
              * Buffers one tuple and flushes when the buffer reports it is full.
              *
              * @throws IOException if a triggered flush fails
              */
             public final void processTuple(int length, String word, int count) throws IOException {
 2408  0
                 if (lastFlush) {
 2409  0
                     // flush() cleared the buffer's document list; if no new document was announced
                     // since, re-register the last one so this tuple has an owning document.
                     if(buffer.documents.size() == 0) buffer.processDocument(lastDocument);
 2410  0
                     lastFlush = false;
 2411  
                 }
 2412  0
                 buffer.processTuple(length, word, count);
 2413  0
                 if (buffer.isFull())
 2414  0
                     flush();
 2415  0
             }
 2416  
             /** Writes buffered tuples (length, word, count) from the read position up to, but not including, {@code pauseIndex}. */
             public final void flushTuples(int pauseIndex) throws IOException {
 2417  
                 
 2418  0
                 while (buffer.getReadIndex() < pauseIndex) {
 2419  
                            
 2420  0
                     output.writeInt(buffer.getLength());
 2421  0
                     output.writeString(buffer.getWord());
 2422  0
                     output.writeInt(buffer.getCount());
 2423  0
                     buffer.incrementTuple();
 2424  
                 }
 2425  0
             }  
 2426  
             /**
              * Writes each buffered document header (document string and tuple count)
              * followed by that document's tuples, until the read position reaches {@code pauseIndex}.
              */
             public final void flushDocument(int pauseIndex) throws IOException {
 2427  0
                 while (buffer.getReadIndex() < pauseIndex) {
 2428  0
                     int nextPause = buffer.getDocumentEndIndex();
 2429  0
                     // Number of tuples belonging to the current document in this buffer.
                     int count = nextPause - buffer.getReadIndex();
 2430  
                     
 2431  0
                     output.writeString(buffer.getDocument());
 2432  0
                     output.writeInt(count);
 2433  0
                     buffer.incrementDocument();
 2434  
                       
 2435  0
                     flushTuples(nextPause);
 2436  0
                     assert nextPause == buffer.getReadIndex();
 2437  0
                 }
 2438  0
             }
 2439  
             /** Writes everything currently buffered, resets the buffer and sets {@code lastFlush}. */
             public void flush() throws IOException { 
 2440  0
                 flushDocument(buffer.getWriteIndex());
 2441  0
                 buffer.reset(); 
 2442  0
                 lastFlush = true;
 2443  0
             }                           
 2444  
         }
 2445  0
         /**
          * Fixed-capacity, in-memory batch of shredded tuples.
          *
          * <p>Tuple fields are stored in three parallel arrays of size {@code batchSize}.
          * {@code documents} and {@code documentTupleIdx} are parallel lists:
          * {@code processDocument} records the document together with the write index at
          * the time of the call. Separate read cursors ({@code readTupleIndex},
          * {@code documentReadIdx}) walk the stored data.
          */
         public static class ShreddedBuffer {
 2446  0
             ArrayList<String> documents = new ArrayList();
 2447  0
             ArrayList<Integer> documentTupleIdx = new ArrayList();
 2448  0
             int documentReadIdx = 0;
 2449  
                             
 2450  
             int[] lengths;
 2451  
             String[] words;
 2452  
             int[] counts;
 2453  0
             int writeTupleIndex = 0;
 2454  0
             int readTupleIndex = 0;
 2455  
             int batchSize;
 2456  
 
 2457  0
             /** Allocates the three tuple arrays with {@code batchSize} slots each. */
             public ShreddedBuffer(int batchSize) {
 2458  0
                 this.batchSize = batchSize;
 2459  
 
 2460  0
                 lengths = new int[batchSize];
 2461  0
                 words = new String[batchSize];
 2462  0
                 counts = new int[batchSize];
 2463  0
             }                              
 2464  
 
 2465  
             /** Creates a buffer with a batch size of 10000. */
             public ShreddedBuffer() {    
 2466  0
                 this(10000);
 2467  0
             }                                                                                                                    
 2468  
             
 2469  
             /** Registers {@code document} as starting at the current write index. */
             public void processDocument(String document) {
 2470  0
                 documents.add(document);
 2471  0
                 documentTupleIdx.add(writeTupleIndex);
 2472  0
             }                                      
 2473  
             /**
              * Stores one tuple at the write index and advances it. Capacity is not
              * checked here; callers in this file test {@link #isFull()} around the call.
              */
             public void processTuple(int length, String word, int count) {
 2474  0
                 assert documents.size() > 0;
 2475  0
                 lengths[writeTupleIndex] = length;
 2476  0
                 words[writeTupleIndex] = word;
 2477  0
                 counts[writeTupleIndex] = count;
 2478  0
                 writeTupleIndex++;
 2479  0
             }
 2480  
             /** Discards stored documents and rewinds the write index; array contents are left in place. */
             public void resetData() {
 2481  0
                 documents.clear();
 2482  0
                 documentTupleIdx.clear();
 2483  0
                 writeTupleIndex = 0;
 2484  0
             }                  
 2485  
                                  
 2486  
             /** Rewinds both read cursors to the start. */
             public void resetRead() {
 2487  0
                 readTupleIndex = 0;
 2488  0
                 documentReadIdx = 0;
 2489  0
             } 
 2490  
 
 2491  
             public void reset() {
 2492  0
                 resetData();
 2493  0
                 resetRead();
 2494  0
             } 
 2495  
             /** True once the number of stored tuples has reached {@code batchSize}. */
             public boolean isFull() {
 2496  0
                 return writeTupleIndex >= batchSize;
 2497  
             }
 2498  
 
 2499  
             public boolean isEmpty() {
 2500  0
                 return writeTupleIndex == 0;
 2501  
             }                          
 2502  
 
 2503  
             /** True when every stored tuple has been read (read index has caught up with write index). */
             public boolean isAtEnd() {
 2504  0
                 return readTupleIndex >= writeTupleIndex;
 2505  
             }           
 2506  
             public void incrementDocument() {
 2507  0
                 documentReadIdx++;  
 2508  0
             }                                                                                              
 2509  
 
 2510  
             /**
              * Advances the document cursor while the tuple read index is at or past the
              * current document's end and unread tuples remain.
              */
             public void autoIncrementDocument() {
 2511  0
                 while (readTupleIndex >= getDocumentEndIndex() && readTupleIndex < writeTupleIndex)
 2512  0
                     documentReadIdx++;
 2513  0
             }                 
 2514  
             public void incrementTuple() {
 2515  0
                 readTupleIndex++;
 2516  0
             }                    
 2517  
             /**
              * Returns the exclusive end index of the current document's tuples: the
              * recorded index of the following document, or the write index when the
              * current document is the last one registered.
              */
             public int getDocumentEndIndex() {
 2518  0
                 if ((documentReadIdx+1) >= documentTupleIdx.size())
 2519  0
                     return writeTupleIndex;
 2520  0
                 return documentTupleIdx.get(documentReadIdx+1);
 2521  
             }
 2522  
             public int getReadIndex() {
 2523  0
                 return readTupleIndex;
 2524  
             }   
 2525  
 
 2526  
             public int getWriteIndex() {
 2527  0
                 return writeTupleIndex;
 2528  
             } 
 2529  
             /** Returns the document at the document read cursor. */
             public String getDocument() {
 2530  0
                 assert readTupleIndex < writeTupleIndex;
 2531  0
                 assert documentReadIdx < documents.size();
 2532  
                 
 2533  0
                 return documents.get(documentReadIdx);
 2534  
             }
 2535  
             /** Returns the length field of the tuple at the read index. */
             public int getLength() {
 2536  0
                 assert readTupleIndex < writeTupleIndex;
 2537  0
                 return lengths[readTupleIndex];
 2538  
             }                                         
 2539  
             /** Returns the word field of the tuple at the read index. */
             public String getWord() {
 2540  0
                 assert readTupleIndex < writeTupleIndex;
 2541  0
                 return words[readTupleIndex];
 2542  
             }                                         
 2543  
             /** Returns the count field of the tuple at the read index. */
             public int getCount() {
 2544  0
                 assert readTupleIndex < writeTupleIndex;
 2545  0
                 return counts[readTupleIndex];
 2546  
             }                                         
 2547  
             /** Sends tuples from the read index up to, but not including, {@code endIndex} to {@code output}. */
             public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
 2548  0
                 while (getReadIndex() < endIndex) {
 2549  0
                    output.processTuple(getLength(), getWord(), getCount());
 2550  0
                    incrementTuple();
 2551  
                 }
 2552  0
             }                                                                           
 2553  
             /** Sends whole documents (document call, then its tuples) until the read index reaches {@code endIndex}. */
             public void copyUntilIndexDocument(int endIndex, ShreddedProcessor output) throws IOException {
 2554  0
                 while (getReadIndex() < endIndex) {
 2555  0
                     output.processDocument(getDocument());
 2556  0
                     assert getDocumentEndIndex() <= endIndex;
 2557  0
                     copyTuples(getDocumentEndIndex(), output);
 2558  0
                     incrementDocument();
 2559  
                 }
 2560  0
             }  
 2561  
             /**
              * Sends buffered documents to {@code output} until this buffer is exhausted or,
              * when {@code other} is non-null, until this buffer's current document compares
              * greater than {@code other}'s current document.
              *
              * @param other  buffer to stay at or below, or null to copy everything
              * @param output receiver of the copied documents and tuples
              */
             public void copyUntilDocument(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
 2562  0
                 while (!isAtEnd()) {
 2563  0
                     if (other != null) {   
 2564  0
                         assert !other.isAtEnd();
 2565  0
                         int c = + Utility.compare(getDocument(), other.getDocument());
 2566  
                     
 2567  0
                         // Stop before emitting a document that compares greater than the other buffer's.
                         if (c > 0) {
 2568  0
                             break;   
 2569  
                         }
 2570  
                         
 2571  0
                         output.processDocument(getDocument());
 2572  
                                       
 2573  0
                         copyTuples(getDocumentEndIndex(), output);
 2574  0
                     } else {
 2575  0
                         output.processDocument(getDocument());
 2576  0
                         copyTuples(getDocumentEndIndex(), output);
 2577  
                     }
 2578  0
                     incrementDocument();  
 2579  
                     
 2580  
                
 2581  
                 }
 2582  0
             }
 2583  
             /** Delegates to {@link #copyUntilDocument(ShreddedBuffer, ShreddedProcessor)}. */
             public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
 2584  0
                 copyUntilDocument(other, output);
 2585  0
             }
 2586  
             
 2587  
         }                         
 2588  0
         /**
          * Combines several {@link ShreddedReader}s into one stream. Readers are held in
          * a {@code PriorityQueue} with no explicit comparator, so they are ordered by
          * {@code ShreddedReader.compareTo}. Output can be pushed to a processor via
          * {@link #run()} or pulled one tuple at a time via {@link #read()}.
          */
         public static class ShreddedCombiner implements ReaderSource<DocumentLengthWordCount>, ShreddedSource {   
 2589  
             public ShreddedProcessor processor;
 2590  
             Collection<ShreddedReader> readers;       
 2591  0
             boolean closeOnExit = false;
 2592  0
             // True until initialize() has filled the readers and built the queue.
             boolean uninitialized = true;
 2593  0
             PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
 2594  
             
 2595  0
             public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
 2596  0
                 this.readers = readers;                                                       
 2597  0
                 this.closeOnExit = closeOnExit;
 2598  0
             }
 2599  
                                   
 2600  
             /**
              * Installs the downstream processor, always wrapped in a {@code DuplicateEliminator};
              * processors that take whole tuples are additionally wrapped in a {@code TupleUnshredder}.
              *
              * @throws IncompatibleProcessorException if {@code processor} is none of the supported types
              */
             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
 2601  0
                 if (processor instanceof ShreddedProcessor) {
 2602  0
                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
 2603  0
                 } else if (processor instanceof DocumentLengthWordCount.Processor) {
 2604  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentLengthWordCount.Processor) processor));
 2605  0
                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
 2606  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentLengthWordCount>) processor));
 2607  
                 } else {
 2608  0
                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
 2609  
                 }
 2610  0
             }                                
 2611  
             
 2612  
             public Class<DocumentLengthWordCount> getOutputClass() {
 2613  0
                 return DocumentLengthWordCount.class;
 2614  
             }
 2615  
             
 2616  
             /** Fills every reader once and queues those whose buffer holds data. */
             public void initialize() throws IOException {
 2617  0
                 for (ShreddedReader reader : readers) {
 2618  0
                     reader.fill();                                        
 2619  
                     
 2620  0
                     if (!reader.getBuffer().isAtEnd())
 2621  0
                         queue.add(reader);
 2622  
                 }   
 2623  
 
 2624  0
                 uninitialized = false;
 2625  0
             }
 2626  
 
 2627  
             /**
              * Pushes the combined stream to {@code processor}. Each round takes the head
              * reader, copies its buffered documents until its current document compares
              * greater than the next queued reader's, refills it if drained, and re-queues
              * it while it still has data. Closes the processor afterwards only when
              * {@code closeOnExit} is set.
              */
             public void run() throws IOException {
 2628  0
                 initialize();
 2629  
                
 2630  0
                 while (queue.size() > 0) {
 2631  0
                     ShreddedReader top = queue.poll();
 2632  0
                     ShreddedReader next = null;
 2633  0
                     ShreddedBuffer nextBuffer = null; 
 2634  
                     
 2635  0
                     assert !top.getBuffer().isAtEnd();
 2636  
                                                   
 2637  0
                     if (queue.size() > 0) {
 2638  0
                         next = queue.peek();
 2639  0
                         nextBuffer = next.getBuffer();
 2640  0
                         assert !nextBuffer.isAtEnd();
 2641  
                     }
 2642  
                     
 2643  0
                     // nextBuffer stays null when top is the only reader left, so everything is copied.
                     top.getBuffer().copyUntil(nextBuffer, processor);
 2644  0
                     if (top.getBuffer().isAtEnd())
 2645  0
                         top.fill();                 
 2646  
                         
 2647  0
                     // A reader whose refill produced nothing is dropped from the merge.
                     if (!top.getBuffer().isAtEnd())
 2648  0
                         queue.add(top);
 2649  0
                 }              
 2650  
                 
 2651  0
                 if (closeOnExit)
 2652  0
                     processor.close();
 2653  0
             }
 2654  
 
 2655  
             /**
              * Pull-style access: returns the next tuple from the head reader, initializing
              * on first use.
              *
              * @return the next tuple, or null once the queue is empty
              */
             public DocumentLengthWordCount read() throws IOException {
 2656  0
                 if (uninitialized)
 2657  0
                     initialize();
 2658  
 
 2659  0
                 DocumentLengthWordCount result = null;
 2660  
 
 2661  0
                 while (queue.size() > 0) {
 2662  0
                     ShreddedReader top = queue.poll();
 2663  0
                     result = top.read();
 2664  
 
 2665  0
                     // A null result means this reader is exhausted; it is not re-queued and the loop tries the next one.
                     if (result != null) {
 2666  0
                         if (top.getBuffer().isAtEnd())
 2667  0
                             top.fill();
 2668  
 
 2669  0
                         queue.offer(top);
 2670  0
                         break;
 2671  
                     } 
 2672  0
                 }
 2673  
 
 2674  0
                 return result;
 2675  
             }
 2676  
         } 
 2677  0
         /**
          * Reads shredded tuples from an {@code ArrayInput} into a {@link ShreddedBuffer}.
          * The expected layout matches what {@code ShreddedWriter} writes: a document
          * string, an int tuple count, then per tuple an int length, a string word and
          * an int count. Readers compare by the current document in their buffer.
          */
         public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<DocumentLengthWordCount>, ShreddedSource {      
 2678  
             public ShreddedProcessor processor;
 2679  
             ShreddedBuffer buffer;
 2680  0
             // Only last.document is used in this class: it holds the most recently read document header.
             DocumentLengthWordCount last = new DocumentLengthWordCount();         
 2681  0
             // Running tuple count at which the current document's tuples end; -1 before any header is read.
             long updateDocumentCount = -1;
 2682  0
             // Total number of tuples read from input so far.
             long tupleCount = 0;
 2683  0
             // Assigned in fill(); not read anywhere else in this class.
             long bufferStartCount = 0;  
 2684  
             ArrayInput input;
 2685  
             
 2686  0
             public ShreddedReader(ArrayInput input) {
 2687  0
                 this.input = input; 
 2688  0
                 this.buffer = new ShreddedBuffer();
 2689  0
             }                               
 2690  
             
 2691  0
             public ShreddedReader(ArrayInput input, int bufferSize) { 
 2692  0
                 this.input = input;
 2693  0
                 this.buffer = new ShreddedBuffer(bufferSize);
 2694  0
             }
 2695  
                  
 2696  
             /**
              * Orders readers by the current document in their buffers. A reader whose
              * buffer is exhausted compares less than one that still has data; two
              * exhausted readers compare equal.
              */
             public final int compareTo(ShreddedReader other) {
 2697  0
                 ShreddedBuffer otherBuffer = other.getBuffer();
 2698  
                 
 2699  0
                 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
 2700  0
                     return 0;                 
 2701  0
                 } else if (buffer.isAtEnd()) {
 2702  0
                     return -1;
 2703  0
                 } else if (otherBuffer.isAtEnd()) {
 2704  0
                     return 1;
 2705  
                 }
 2706  
                                    
 2707  0
                 int result = 0;
 2708  
                 do {
 2709  0
                     result = + Utility.compare(buffer.getDocument(), otherBuffer.getDocument());
 2710  0
                     if(result != 0) break;
 2711  
                 } while (false);                                             
 2712  
                 
 2713  0
                 return result;
 2714  
             }
 2715  
             
 2716  
             public final ShreddedBuffer getBuffer() {
 2717  0
                 return buffer;
 2718  
             }                
 2719  
             
 2720  
             /**
              * Returns the next tuple as a new object, refilling the buffer when it is
              * exhausted.
              *
              * @return the next tuple, or null when a refill yields no data
              */
             public final DocumentLengthWordCount read() throws IOException {
 2721  0
                 if (buffer.isAtEnd()) {
 2722  0
                     fill();             
 2723  
                 
 2724  0
                     if (buffer.isAtEnd()) {
 2725  0
                         return null;
 2726  
                     }
 2727  
                 }
 2728  
                       
 2729  0
                 assert !buffer.isAtEnd();
 2730  0
                 DocumentLengthWordCount result = new DocumentLengthWordCount();
 2731  
                 
 2732  0
                 result.document = buffer.getDocument();
 2733  0
                 result.length = buffer.getLength();
 2734  0
                 result.word = buffer.getWord();
 2735  0
                 result.count = buffer.getCount();
 2736  
                 
 2737  0
                 // Move past this tuple and, if it was the document's last, on to the next document.
                 buffer.incrementTuple();
 2738  0
                 buffer.autoIncrementDocument();
 2739  
                 
 2740  0
                 return result;
 2741  
             }           
 2742  
             
 2743  
             /**
              * Resets the buffer and reads tuples from {@code input} until the buffer is
              * full or the input signals end of data with an {@code EOFException}.
              */
             public final void fill() throws IOException {
 2744  
                 try {   
 2745  0
                     buffer.reset();
 2746  
                     
 2747  0
                     if (tupleCount != 0) {
 2748  
                                                       
 2749  0
                         // The previous fill stopped part-way through a document: carry that
                         // document over so the remaining tuples still have an owner.
                         if(updateDocumentCount - tupleCount > 0) {
 2750  0
                             buffer.documents.add(last.document);
 2751  0
                             // NOTE(review): this stores the remaining-tuple count, not a buffer index; entry 0 of
                             // documentTupleIdx is never read by getDocumentEndIndex(), so it looks harmless - confirm.
                             buffer.documentTupleIdx.add((int) (updateDocumentCount - tupleCount));
 2752  
                         }
 2753  0
                         bufferStartCount = tupleCount;
 2754  
                     }
 2755  
                     
 2756  0
                     while (!buffer.isFull()) {
 2757  0
                         updateDocument();
 2758  0
                         buffer.processTuple(input.readInt(), input.readString(), input.readInt());
 2759  0
                         tupleCount++;
 2760  
                     }
 2761  0
                 // EOFException is swallowed on purpose: it ends the fill, leaving whatever was buffered so far.
                 } catch(EOFException e) {}
 2762  0
             }
 2763  
 
 2764  
             /**
              * Reads the next document header (document string, tuple count) once all
              * tuples of the current document have been consumed; otherwise does nothing.
              */
             public final void updateDocument() throws IOException {
 2765  0
                 if (updateDocumentCount > tupleCount)
 2766  0
                     return;
 2767  
                      
 2768  0
                 last.document = input.readString();
 2769  0
                 updateDocumentCount = tupleCount + input.readInt();
 2770  
                                       
 2771  0
                 buffer.processDocument(last.document);
 2772  0
             }
 2773  
 
 2774  
             /** Repeatedly fills the buffer and copies it to {@code processor} until a fill yields nothing, then closes the processor. */
             public void run() throws IOException {
 2775  
                 while (true) {
 2776  0
                     fill();
 2777  
                     
 2778  0
                     if (buffer.isAtEnd())
 2779  0
                         break;
 2780  
                     
 2781  0
                     buffer.copyUntil(null, processor);
 2782  
                 }      
 2783  0
                 processor.close();
 2784  0
             }
 2785  
             
 2786  
             /**
              * Installs the downstream processor, always wrapped in a {@code DuplicateEliminator};
              * processors that take whole tuples are additionally wrapped in a {@code TupleUnshredder}.
              *
              * @throws IncompatibleProcessorException if {@code processor} is none of the supported types
              */
             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
 2787  0
                 if (processor instanceof ShreddedProcessor) {
 2788  0
                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
 2789  0
                 } else if (processor instanceof DocumentLengthWordCount.Processor) {
 2790  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentLengthWordCount.Processor) processor));
 2791  0
                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
 2792  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentLengthWordCount>) processor));
 2793  
                 } else {
 2794  0
                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
 2795  
                 }
 2796  0
             }                                
 2797  
             
 2798  
             public Class<DocumentLengthWordCount> getOutputClass() {
 2799  0
                 return DocumentLengthWordCount.class;
 2800  
             }                
 2801  
         }
 2802  
         
 2803  
         /**
          * {@code ShreddedProcessor} wrapper that suppresses a {@code processDocument}
          * call when the document compares equal to the one most recently forwarded.
          * Tuples and {@code close()} are passed through unchanged.
          */
         public static class DuplicateEliminator implements ShreddedProcessor {
 2804  
             public ShreddedProcessor processor;
 2805  0
             // Only last.document is used: the document most recently forwarded downstream.
             DocumentLengthWordCount last = new DocumentLengthWordCount();
 2806  0
             // When true, the next processDocument() call is forwarded without comparing.
             boolean documentProcess = true;
 2807  
                                            
 2808  0
             /** Creates an eliminator with no downstream processor; one must be set via {@link #setShreddedProcessor} before use. */
             public DuplicateEliminator() {}
 2809  0
             public DuplicateEliminator(ShreddedProcessor processor) {
 2810  0
                 this.processor = processor;
 2811  0
             }
 2812  
             
 2813  
             public void setShreddedProcessor(ShreddedProcessor processor) {
 2814  0
                 this.processor = processor;
 2815  0
             }
 2816  
 
 2817  
             /** Forwards {@code document} downstream if forced by {@code documentProcess} or if it differs from the last forwarded document. */
             public void processDocument(String document) throws IOException {  
 2818  0
                 if (documentProcess || Utility.compare(document, last.document) != 0) {
 2819  0
                     last.document = document;
 2820  0
                     processor.processDocument(document);
 2821  0
                     documentProcess = false;
 2822  
                 }
 2823  0
             }  
 2824  
             
 2825  
             /** Forces the next {@code processDocument} call to be forwarded regardless of its value. */
             public void resetDocument() {
 2826  0
                  documentProcess = true;
 2827  0
             }                                                
 2828  
                                
 2829  
             public void processTuple(int length, String word, int count) throws IOException {
 2830  0
                 processor.processTuple(length, word, count);
 2831  0
             } 
 2832  
             
 2833  
             public void close() throws IOException {
 2834  0
                 processor.close();
 2835  0
             }                    
 2836  
         }
 2837  
         /**
          * Converts shredded calls back into whole tuples: remembers the document from
          * {@code processDocument} and, on each {@code processTuple}, passes a fresh
          * copy of the assembled tuple to the wrapped tuple processor.
          */
         public static class TupleUnshredder implements ShreddedProcessor {
 2838  0
             // Scratch tuple that accumulates the current document and the latest tuple fields.
             DocumentLengthWordCount last = new DocumentLengthWordCount();
 2839  
             public org.galagosearch.tupleflow.Processor<DocumentLengthWordCount> processor;                               
 2840  
             
 2841  0
             public TupleUnshredder(DocumentLengthWordCount.Processor processor) {
 2842  0
                 this.processor = processor;
 2843  0
             }         
 2844  
             
 2845  0
             public TupleUnshredder(org.galagosearch.tupleflow.Processor<DocumentLengthWordCount> processor) {
 2846  0
                 this.processor = processor;
 2847  0
             }
 2848  
             
 2849  
             /** Copies the four fields of {@code object} into a new tuple; returns a freshly constructed tuple when {@code object} is null. */
             public DocumentLengthWordCount clone(DocumentLengthWordCount object) {
 2850  0
                 DocumentLengthWordCount result = new DocumentLengthWordCount();
 2851  0
                 if (object == null) return result;
 2852  0
                 result.document = object.document; 
 2853  0
                 result.length = object.length; 
 2854  0
                 result.word = object.word; 
 2855  0
                 result.count = object.count; 
 2856  0
                 return result;
 2857  
             }                 
 2858  
             
 2859  
             /** Records the document for subsequent tuples; nothing is forwarded yet. */
             public void processDocument(String document) throws IOException {
 2860  0
                 last.document = document;
 2861  0
             }   
 2862  
                 
 2863  
             
 2864  
             /** Completes the scratch tuple with the given fields and forwards a copy of it. */
             public void processTuple(int length, String word, int count) throws IOException {
 2865  0
                 last.length = length;
 2866  0
                 last.word = word;
 2867  0
                 last.count = count;
 2868  0
                 // A copy is passed on so later mutations of 'last' do not affect the forwarded tuple.
                 processor.process(clone(last));
 2869  0
             }               
 2870  
             
 2871  
             public void close() throws IOException {
 2872  0
                 processor.close();
 2873  0
             }
 2874  
         }     
 2875  0
         /**
          * Adapts whole-tuple input to a {@code ShreddedProcessor}: each tuple is split
          * into a {@code processDocument} call (when the document is judged to have
          * changed) followed by a {@code processTuple} call.
          *
          * <p>NOTE(review): nothing in this class ever assigns {@code last} or
          * {@code last.document} after construction, so the change check in
          * {@link #process} always compares against the initial value of
          * {@code last.document} and the {@code last == null} test cannot become true
          * from code in this class. Behaviour then depends on how
          * {@code Utility.compare} treats that initial value - confirm against the
          * generator template whether an update of {@code last} is missing.
          */
         public static class TupleShredder implements Processor {
 2876  0
             DocumentLengthWordCount last = new DocumentLengthWordCount();
 2877  
             public ShreddedProcessor processor;
 2878  
             
 2879  0
             public TupleShredder(ShreddedProcessor processor) {
 2880  0
                 this.processor = processor;
 2881  0
             }                              
 2882  
             
 2883  
             /** Copies the four fields of {@code object} into a new tuple; returns a freshly constructed tuple when {@code object} is null. */
             public DocumentLengthWordCount clone(DocumentLengthWordCount object) {
 2884  0
                 DocumentLengthWordCount result = new DocumentLengthWordCount();
 2885  0
                 if (object == null) return result;
 2886  0
                 result.document = object.document; 
 2887  0
                 result.length = object.length; 
 2888  0
                 result.word = object.word; 
 2889  0
                 result.count = object.count; 
 2890  0
                 return result;
 2891  
             }                 
 2892  
             
 2893  
             /**
              * Forwards one tuple in shredded form.
              *
              * @param object the tuple to forward; dereferenced without a null check
              * @throws IOException if the downstream processor fails
              */
             public void process(DocumentLengthWordCount object) throws IOException {                                                                                                                                                   
 2894  0
                 boolean processAll = false;
 2895  0
                 if(last == null || Utility.compare(last.document, object.document) != 0 || processAll) { processor.processDocument(object.document); processAll = true; }
 2896  0
                 processor.processTuple(object.length, object.word, object.count);                                         
 2897  0
             }
 2898  
                           
 2899  
             /** Returns {@code DocumentLengthWordCount.class}. */
             public Class<DocumentLengthWordCount> getInputClass() {
 2900  0
                 return DocumentLengthWordCount.class;
 2901  
             }
 2902  
             
 2903  
             public void close() throws IOException {
 2904  0
                 processor.close();
 2905  0
             }                     
 2906  
         }
 2907  
     } 
 2908  
 }