Coverage Report - org.galagosearch.core.types.DocumentWordProbability
 
Classes in this File Line Coverage Branch Coverage Complexity
DocumentWordProbability
0%
0/16
0%
0/6
0
DocumentWordProbability$DocumentOrder
0%
0/24
0%
0/4
0
DocumentWordProbability$DocumentOrder$1
0%
0/5
0%
0/2
0
DocumentWordProbability$DocumentOrder$2
0%
0/5
0%
0/2
0
DocumentWordProbability$DocumentOrder$DuplicateEliminator
0%
0/19
0%
0/4
0
DocumentWordProbability$DocumentOrder$OrderedWriterClass
0%
0/14
0%
0/6
0
DocumentWordProbability$DocumentOrder$ShreddedBuffer
0%
0/78
0%
0/50
0
DocumentWordProbability$DocumentOrder$ShreddedCombiner
0%
0/55
0%
0/36
0
DocumentWordProbability$DocumentOrder$ShreddedProcessor
N/A
N/A
0
DocumentWordProbability$DocumentOrder$ShreddedReader
0%
0/70
0%
0/34
0
DocumentWordProbability$DocumentOrder$ShreddedSource
N/A
N/A
0
DocumentWordProbability$DocumentOrder$ShreddedWriter
0%
0/37
0%
0/14
0
DocumentWordProbability$DocumentOrder$TupleShredder
0%
0/18
0%
0/8
0
DocumentWordProbability$DocumentOrder$TupleUnshredder
0%
0/21
0%
0/2
0
DocumentWordProbability$DocumentWordOrder
0%
0/25
0%
0/4
0
DocumentWordProbability$DocumentWordOrder$1
0%
0/7
0%
0/4
0
DocumentWordProbability$DocumentWordOrder$2
0%
0/7
0%
0/4
0
DocumentWordProbability$DocumentWordOrder$DuplicateEliminator
0%
0/29
0%
0/8
0
DocumentWordProbability$DocumentWordOrder$OrderedWriterClass
0%
0/15
0%
0/12
0
DocumentWordProbability$DocumentWordOrder$ShreddedBuffer
0%
0/121
0%
0/86
0
DocumentWordProbability$DocumentWordOrder$ShreddedCombiner
0%
0/55
0%
0/36
0
DocumentWordProbability$DocumentWordOrder$ShreddedProcessor
N/A
N/A
0
DocumentWordProbability$DocumentWordOrder$ShreddedReader
0%
0/84
0%
0/40
0
DocumentWordProbability$DocumentWordOrder$ShreddedSource
N/A
N/A
0
DocumentWordProbability$DocumentWordOrder$ShreddedWriter
0%
0/50
0%
0/22
0
DocumentWordProbability$DocumentWordOrder$TupleShredder
0%
0/19
0%
0/14
0
DocumentWordProbability$DocumentWordOrder$TupleUnshredder
0%
0/22
0%
0/2
0
DocumentWordProbability$Processor
N/A
N/A
0
DocumentWordProbability$Source
N/A
N/A
0
DocumentWordProbability$WordOrder
0%
0/24
0%
0/4
0
DocumentWordProbability$WordOrder$1
0%
0/5
0%
0/2
0
DocumentWordProbability$WordOrder$2
0%
0/5
0%
0/2
0
DocumentWordProbability$WordOrder$DuplicateEliminator
0%
0/19
0%
0/4
0
DocumentWordProbability$WordOrder$OrderedWriterClass
0%
0/14
0%
0/6
0
DocumentWordProbability$WordOrder$ShreddedBuffer
0%
0/78
0%
0/50
0
DocumentWordProbability$WordOrder$ShreddedCombiner
0%
0/55
0%
0/36
0
DocumentWordProbability$WordOrder$ShreddedProcessor
N/A
N/A
0
DocumentWordProbability$WordOrder$ShreddedReader
0%
0/70
0%
0/34
0
DocumentWordProbability$WordOrder$ShreddedSource
N/A
N/A
0
DocumentWordProbability$WordOrder$ShreddedWriter
0%
0/37
0%
0/14
0
DocumentWordProbability$WordOrder$TupleShredder
0%
0/18
0%
0/8
0
DocumentWordProbability$WordOrder$TupleUnshredder
0%
0/21
0%
0/2
0
 
 1  
 // This file was automatically generated with the command: 
 2  
 //     java org.galagosearch.tupleflow.typebuilder.TypeBuilderMojo ...
 3  
 package org.galagosearch.core.types;
 4  
 
 5  
 import org.galagosearch.tupleflow.Utility;
 6  
 import org.galagosearch.tupleflow.ArrayInput;
 7  
 import org.galagosearch.tupleflow.ArrayOutput;
 8  
 import org.galagosearch.tupleflow.Order;   
 9  
 import org.galagosearch.tupleflow.OrderedWriter;
 10  
 import org.galagosearch.tupleflow.Type; 
 11  
 import org.galagosearch.tupleflow.TypeReader;
 12  
 import org.galagosearch.tupleflow.Step; 
 13  
 import org.galagosearch.tupleflow.IncompatibleProcessorException;
 14  
 import org.galagosearch.tupleflow.ReaderSource;
 15  
 import java.io.IOException;             
 16  
 import java.io.EOFException;
 17  
 import java.io.UnsupportedEncodingException;
 18  
 import java.util.ArrayList;
 19  
 import java.util.Arrays;   
 20  
 import java.util.Comparator;
 21  
 import java.util.PriorityQueue;
 22  
 import java.util.Collection;
 23  
 
 24  
 public class DocumentWordProbability implements Type<DocumentWordProbability> {
 25  
     public String document;
 26  
     public byte[] word;
 27  
     public double probability; 
 28  
     
 29  0
     public DocumentWordProbability() {}
 30  0
     public DocumentWordProbability(String document, byte[] word, double probability) {
 31  0
         this.document = document;
 32  0
         this.word = word;
 33  0
         this.probability = probability;
 34  0
     }  
 35  
     
 36  
     public String toString() {
 37  
         try {
 38  0
             return String.format("%s,%s,%f",
 39  
                                    document, new String(word, "UTF-8"), probability);
 40  0
         } catch(UnsupportedEncodingException e) {
 41  0
             throw new RuntimeException("Couldn't convert string to UTF-8.");
 42  
         }
 43  
     } 
 44  
 
 45  
     public Order<DocumentWordProbability> getOrder(String... spec) {
 46  0
         if (Arrays.equals(spec, new String[] { "+document", "+word" })) {
 47  0
             return new DocumentWordOrder();
 48  
         }
 49  0
         if (Arrays.equals(spec, new String[] { "+word" })) {
 50  0
             return new WordOrder();
 51  
         }
 52  0
         if (Arrays.equals(spec, new String[] { "+document" })) {
 53  0
             return new DocumentOrder();
 54  
         }
 55  0
         return null;
 56  
     } 
 57  
       
 58  
     public interface Processor extends Step, org.galagosearch.tupleflow.Processor<DocumentWordProbability> {
 59  
         public void process(DocumentWordProbability object) throws IOException;
 60  
         public void close() throws IOException;
 61  
     }                        
 62  
     public interface Source extends Step {
 63  
     }
 64  0
     public static class DocumentWordOrder implements Order<DocumentWordProbability> {
 65  
         public int hash(DocumentWordProbability object) {
 66  0
             int h = 0;
 67  0
             h += Utility.hash(object.document);
 68  0
             h += Utility.hash(object.word);
 69  0
             return h;
 70  
         } 
 71  
         public Comparator<DocumentWordProbability> greaterThan() {
 72  0
             return new Comparator<DocumentWordProbability>() {
 73  0
                 public int compare(DocumentWordProbability one, DocumentWordProbability two) {
 74  0
                     int result = 0;
 75  
                     do {
 76  0
                         result = + Utility.compare(one.document, two.document);
 77  0
                         if(result != 0) break;
 78  0
                         result = + Utility.compare(one.word, two.word);
 79  0
                         if(result != 0) break;
 80  
                     } while (false);
 81  0
                     return -result;
 82  
                 }
 83  
             };
 84  
         }     
 85  
         public Comparator<DocumentWordProbability> lessThan() {
 86  0
             return new Comparator<DocumentWordProbability>() {
 87  0
                 public int compare(DocumentWordProbability one, DocumentWordProbability two) {
 88  0
                     int result = 0;
 89  
                     do {
 90  0
                         result = + Utility.compare(one.document, two.document);
 91  0
                         if(result != 0) break;
 92  0
                         result = + Utility.compare(one.word, two.word);
 93  0
                         if(result != 0) break;
 94  
                     } while (false);
 95  0
                     return result;
 96  
                 }
 97  
             };
 98  
         }     
 99  
         public TypeReader<DocumentWordProbability> orderedReader(ArrayInput _input) {
 100  0
             return new ShreddedReader(_input);
 101  
         }    
 102  
 
 103  
         public TypeReader<DocumentWordProbability> orderedReader(ArrayInput _input, int bufferSize) {
 104  0
             return new ShreddedReader(_input, bufferSize);
 105  
         }    
 106  
         public OrderedWriter<DocumentWordProbability> orderedWriter(ArrayOutput _output) {
 107  0
             ShreddedWriter w = new ShreddedWriter(_output);
 108  0
             return new OrderedWriterClass(w); 
 109  
         }                                    
 110  0
         public static class OrderedWriterClass extends OrderedWriter< DocumentWordProbability > {
 111  0
             DocumentWordProbability last = null;
 112  0
             ShreddedWriter shreddedWriter = null; 
 113  
             
 114  0
             public OrderedWriterClass(ShreddedWriter s) {
 115  0
                 this.shreddedWriter = s;
 116  0
             }
 117  
             
 118  
             public void process(DocumentWordProbability object) throws IOException {
 119  0
                boolean processAll = false;
 120  0
                if (processAll || last == null || 0 != Utility.compare(object.document, last.document)) { processAll = true; shreddedWriter.processDocument(object.document); }
 121  0
                if (processAll || last == null || 0 != Utility.compare(object.word, last.word)) { processAll = true; shreddedWriter.processWord(object.word); }
 122  0
                shreddedWriter.processTuple(object.probability);
 123  0
                last = object;
 124  0
             }           
 125  
                  
 126  
             public void close() throws IOException {
 127  0
                 shreddedWriter.close();
 128  0
             }
 129  
             
 130  
             public Class<DocumentWordProbability> getInputClass() {
 131  0
                 return DocumentWordProbability.class;
 132  
             }
 133  
         } 
 134  
         public ReaderSource<DocumentWordProbability> orderedCombiner(Collection<TypeReader<DocumentWordProbability>> readers, boolean closeOnExit) {
 135  0
             ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
 136  
             
 137  0
             for (TypeReader<DocumentWordProbability> reader : readers) {
 138  0
                 shreddedReaders.add((ShreddedReader)reader);
 139  
             }
 140  
             
 141  0
             return new ShreddedCombiner(shreddedReaders, closeOnExit);
 142  
         }                  
 143  
         public DocumentWordProbability clone(DocumentWordProbability object) {
 144  0
             DocumentWordProbability result = new DocumentWordProbability();
 145  0
             if (object == null) return result;
 146  0
             result.document = object.document; 
 147  0
             result.word = object.word; 
 148  0
             result.probability = object.probability; 
 149  0
             return result;
 150  
         }                 
 151  
         public Class<DocumentWordProbability> getOrderedClass() {
 152  0
             return DocumentWordProbability.class;
 153  
         }                           
 154  
         public String[] getOrderSpec() {
 155  0
             return new String[] {"+document", "+word"};
 156  
         }
 157  
 
 158  
         public static String getSpecString() {
 159  0
             return "+document +word";
 160  
         }
 161  
                            
 162  
         public interface ShreddedProcessor extends Step {
 163  
             public void processDocument(String document) throws IOException;
 164  
             public void processWord(byte[] word) throws IOException;
 165  
             public void processTuple(double probability) throws IOException;
 166  
             public void close() throws IOException;
 167  
         }    
 168  
         public interface ShreddedSource extends Step {
 169  
         }                                              
 170  
         
 171  0
         public static class ShreddedWriter implements ShreddedProcessor {
 172  
             ArrayOutput output;
 173  0
             ShreddedBuffer buffer = new ShreddedBuffer();
 174  
             String lastDocument;
 175  
             byte[] lastWord;
 176  0
             boolean lastFlush = false;
 177  
             
 178  0
             public ShreddedWriter(ArrayOutput output) {
 179  0
                 this.output = output;
 180  0
             }                        
 181  
             
 182  
             public void close() throws IOException {
 183  0
                 flush();
 184  0
             }
 185  
             
 186  
             public void processDocument(String document) {
 187  0
                 lastDocument = document;
 188  0
                 buffer.processDocument(document);
 189  0
             }
 190  
             public void processWord(byte[] word) {
 191  0
                 lastWord = word;
 192  0
                 buffer.processWord(word);
 193  0
             }
 194  
             public final void processTuple(double probability) throws IOException {
 195  0
                 if (lastFlush) {
 196  0
                     if(buffer.documents.size() == 0) buffer.processDocument(lastDocument);
 197  0
                     if(buffer.words.size() == 0) buffer.processWord(lastWord);
 198  0
                     lastFlush = false;
 199  
                 }
 200  0
                 buffer.processTuple(probability);
 201  0
                 if (buffer.isFull())
 202  0
                     flush();
 203  0
             }
 204  
             public final void flushTuples(int pauseIndex) throws IOException {
 205  
                 
 206  0
                 while (buffer.getReadIndex() < pauseIndex) {
 207  
                            
 208  0
                     output.writeDouble(buffer.getProbability());
 209  0
                     buffer.incrementTuple();
 210  
                 }
 211  0
             }  
 212  
             public final void flushDocument(int pauseIndex) throws IOException {
 213  0
                 while (buffer.getReadIndex() < pauseIndex) {
 214  0
                     int nextPause = buffer.getDocumentEndIndex();
 215  0
                     int count = nextPause - buffer.getReadIndex();
 216  
                     
 217  0
                     output.writeString(buffer.getDocument());
 218  0
                     output.writeInt(count);
 219  0
                     buffer.incrementDocument();
 220  
                       
 221  0
                     flushWord(nextPause);
 222  0
                     assert nextPause == buffer.getReadIndex();
 223  0
                 }
 224  0
             }
 225  
             public final void flushWord(int pauseIndex) throws IOException {
 226  0
                 while (buffer.getReadIndex() < pauseIndex) {
 227  0
                     int nextPause = buffer.getWordEndIndex();
 228  0
                     int count = nextPause - buffer.getReadIndex();
 229  
                     
 230  0
                     output.writeBytes(buffer.getWord());
 231  0
                     output.writeInt(count);
 232  0
                     buffer.incrementWord();
 233  
                       
 234  0
                     flushTuples(nextPause);
 235  0
                     assert nextPause == buffer.getReadIndex();
 236  0
                 }
 237  0
             }
 238  
             public void flush() throws IOException { 
 239  0
                 flushDocument(buffer.getWriteIndex());
 240  0
                 buffer.reset(); 
 241  0
                 lastFlush = true;
 242  0
             }                           
 243  
         }
 244  0
         public static class ShreddedBuffer {
 245  0
             ArrayList<String> documents = new ArrayList();
 246  0
             ArrayList<byte[]> words = new ArrayList();
 247  0
             ArrayList<Integer> documentTupleIdx = new ArrayList();
 248  0
             ArrayList<Integer> wordTupleIdx = new ArrayList();
 249  0
             int documentReadIdx = 0;
 250  0
             int wordReadIdx = 0;
 251  
                             
 252  
             double[] probabilitys;
 253  0
             int writeTupleIndex = 0;
 254  0
             int readTupleIndex = 0;
 255  
             int batchSize;
 256  
 
 257  0
             public ShreddedBuffer(int batchSize) {
 258  0
                 this.batchSize = batchSize;
 259  
 
 260  0
                 probabilitys = new double[batchSize];
 261  0
             }                              
 262  
 
 263  
             public ShreddedBuffer() {    
 264  0
                 this(10000);
 265  0
             }                                                                                                                    
 266  
             
 267  
             public void processDocument(String document) {
 268  0
                 documents.add(document);
 269  0
                 documentTupleIdx.add(writeTupleIndex);
 270  0
             }                                      
 271  
             public void processWord(byte[] word) {
 272  0
                 words.add(word);
 273  0
                 wordTupleIdx.add(writeTupleIndex);
 274  0
             }                                      
 275  
             public void processTuple(double probability) {
 276  0
                 assert documents.size() > 0;
 277  0
                 assert words.size() > 0;
 278  0
                 probabilitys[writeTupleIndex] = probability;
 279  0
                 writeTupleIndex++;
 280  0
             }
 281  
             public void resetData() {
 282  0
                 documents.clear();
 283  0
                 words.clear();
 284  0
                 documentTupleIdx.clear();
 285  0
                 wordTupleIdx.clear();
 286  0
                 writeTupleIndex = 0;
 287  0
             }                  
 288  
                                  
 289  
             public void resetRead() {
 290  0
                 readTupleIndex = 0;
 291  0
                 documentReadIdx = 0;
 292  0
                 wordReadIdx = 0;
 293  0
             } 
 294  
 
 295  
             public void reset() {
 296  0
                 resetData();
 297  0
                 resetRead();
 298  0
             } 
 299  
             public boolean isFull() {
 300  0
                 return writeTupleIndex >= batchSize;
 301  
             }
 302  
 
 303  
             public boolean isEmpty() {
 304  0
                 return writeTupleIndex == 0;
 305  
             }                          
 306  
 
 307  
             public boolean isAtEnd() {
 308  0
                 return readTupleIndex >= writeTupleIndex;
 309  
             }           
 310  
             public void incrementDocument() {
 311  0
                 documentReadIdx++;  
 312  0
             }                                                                                              
 313  
 
 314  
             public void autoIncrementDocument() {
 315  0
                 while (readTupleIndex >= getDocumentEndIndex() && readTupleIndex < writeTupleIndex)
 316  0
                     documentReadIdx++;
 317  0
             }                 
 318  
             public void incrementWord() {
 319  0
                 wordReadIdx++;  
 320  0
             }                                                                                              
 321  
 
 322  
             public void autoIncrementWord() {
 323  0
                 while (readTupleIndex >= getWordEndIndex() && readTupleIndex < writeTupleIndex)
 324  0
                     wordReadIdx++;
 325  0
             }                 
 326  
             public void incrementTuple() {
 327  0
                 readTupleIndex++;
 328  0
             }                    
 329  
             public int getDocumentEndIndex() {
 330  0
                 if ((documentReadIdx+1) >= documentTupleIdx.size())
 331  0
                     return writeTupleIndex;
 332  0
                 return documentTupleIdx.get(documentReadIdx+1);
 333  
             }
 334  
 
 335  
             public int getWordEndIndex() {
 336  0
                 if ((wordReadIdx+1) >= wordTupleIdx.size())
 337  0
                     return writeTupleIndex;
 338  0
                 return wordTupleIdx.get(wordReadIdx+1);
 339  
             }
 340  
             public int getReadIndex() {
 341  0
                 return readTupleIndex;
 342  
             }   
 343  
 
 344  
             public int getWriteIndex() {
 345  0
                 return writeTupleIndex;
 346  
             } 
 347  
             public String getDocument() {
 348  0
                 assert readTupleIndex < writeTupleIndex;
 349  0
                 assert documentReadIdx < documents.size();
 350  
                 
 351  0
                 return documents.get(documentReadIdx);
 352  
             }
 353  
             public byte[] getWord() {
 354  0
                 assert readTupleIndex < writeTupleIndex;
 355  0
                 assert wordReadIdx < words.size();
 356  
                 
 357  0
                 return words.get(wordReadIdx);
 358  
             }
 359  
             public double getProbability() {
 360  0
                 assert readTupleIndex < writeTupleIndex;
 361  0
                 return probabilitys[readTupleIndex];
 362  
             }                                         
 363  
             public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
 364  0
                 while (getReadIndex() < endIndex) {
 365  0
                    output.processTuple(getProbability());
 366  0
                    incrementTuple();
 367  
                 }
 368  0
             }                                                                           
 369  
             public void copyUntilIndexDocument(int endIndex, ShreddedProcessor output) throws IOException {
 370  0
                 while (getReadIndex() < endIndex) {
 371  0
                     output.processDocument(getDocument());
 372  0
                     assert getDocumentEndIndex() <= endIndex;
 373  0
                     copyUntilIndexWord(getDocumentEndIndex(), output);
 374  0
                     incrementDocument();
 375  
                 }
 376  0
             } 
 377  
             public void copyUntilIndexWord(int endIndex, ShreddedProcessor output) throws IOException {
 378  0
                 while (getReadIndex() < endIndex) {
 379  0
                     output.processWord(getWord());
 380  0
                     assert getWordEndIndex() <= endIndex;
 381  0
                     copyTuples(getWordEndIndex(), output);
 382  0
                     incrementWord();
 383  
                 }
 384  0
             }  
 385  
             public void copyUntilDocument(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
 386  0
                 while (!isAtEnd()) {
 387  0
                     if (other != null) {   
 388  0
                         assert !other.isAtEnd();
 389  0
                         int c = + Utility.compare(getDocument(), other.getDocument());
 390  
                     
 391  0
                         if (c > 0) {
 392  0
                             break;   
 393  
                         }
 394  
                         
 395  0
                         output.processDocument(getDocument());
 396  
                                       
 397  0
                         if (c < 0) {
 398  0
                             copyUntilIndexWord(getDocumentEndIndex(), output);
 399  0
                         } else if (c == 0) {
 400  0
                             copyUntilWord(other, output);
 401  0
                             autoIncrementDocument();
 402  0
                             break;
 403  
                         }
 404  0
                     } else {
 405  0
                         output.processDocument(getDocument());
 406  0
                         copyUntilIndexWord(getDocumentEndIndex(), output);
 407  
                     }
 408  0
                     incrementDocument();  
 409  
                     
 410  
                
 411  
                 }
 412  0
             }
 413  
             public void copyUntilWord(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
 414  0
                 while (!isAtEnd()) {
 415  0
                     if (other != null) {   
 416  0
                         assert !other.isAtEnd();
 417  0
                         int c = + Utility.compare(getWord(), other.getWord());
 418  
                     
 419  0
                         if (c > 0) {
 420  0
                             break;   
 421  
                         }
 422  
                         
 423  0
                         output.processWord(getWord());
 424  
                                       
 425  0
                         copyTuples(getWordEndIndex(), output);
 426  0
                     } else {
 427  0
                         output.processWord(getWord());
 428  0
                         copyTuples(getWordEndIndex(), output);
 429  
                     }
 430  0
                     incrementWord();  
 431  
                     
 432  0
                     if (getDocumentEndIndex() <= readTupleIndex)
 433  0
                         break;   
 434  
                 }
 435  0
             }
 436  
             public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
 437  0
                 copyUntilDocument(other, output);
 438  0
             }
 439  
             
 440  
         }                         
 441  0
         public static class ShreddedCombiner implements ReaderSource<DocumentWordProbability>, ShreddedSource {   
 442  
             public ShreddedProcessor processor;
 443  
             Collection<ShreddedReader> readers;       
 444  0
             boolean closeOnExit = false;
 445  0
             boolean uninitialized = true;
 446  0
             PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
 447  
             
 448  0
             public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
 449  0
                 this.readers = readers;                                                       
 450  0
                 this.closeOnExit = closeOnExit;
 451  0
             }
 452  
                                   
 453  
             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
 454  0
                 if (processor instanceof ShreddedProcessor) {
 455  0
                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
 456  0
                 } else if (processor instanceof DocumentWordProbability.Processor) {
 457  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentWordProbability.Processor) processor));
 458  0
                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
 459  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentWordProbability>) processor));
 460  
                 } else {
 461  0
                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
 462  
                 }
 463  0
             }                                
 464  
             
 465  
             public Class<DocumentWordProbability> getOutputClass() {
 466  0
                 return DocumentWordProbability.class;
 467  
             }
 468  
             
 469  
             public void initialize() throws IOException {
 470  0
                 for (ShreddedReader reader : readers) {
 471  0
                     reader.fill();                                        
 472  
                     
 473  0
                     if (!reader.getBuffer().isAtEnd())
 474  0
                         queue.add(reader);
 475  
                 }   
 476  
 
 477  0
                 uninitialized = false;
 478  0
             }
 479  
 
 480  
             public void run() throws IOException {
 481  0
                 initialize();
 482  
                
 483  0
                 while (queue.size() > 0) {
 484  0
                     ShreddedReader top = queue.poll();
 485  0
                     ShreddedReader next = null;
 486  0
                     ShreddedBuffer nextBuffer = null; 
 487  
                     
 488  0
                     assert !top.getBuffer().isAtEnd();
 489  
                                                   
 490  0
                     if (queue.size() > 0) {
 491  0
                         next = queue.peek();
 492  0
                         nextBuffer = next.getBuffer();
 493  0
                         assert !nextBuffer.isAtEnd();
 494  
                     }
 495  
                     
 496  0
                     top.getBuffer().copyUntil(nextBuffer, processor);
 497  0
                     if (top.getBuffer().isAtEnd())
 498  0
                         top.fill();                 
 499  
                         
 500  0
                     if (!top.getBuffer().isAtEnd())
 501  0
                         queue.add(top);
 502  0
                 }              
 503  
                 
 504  0
                 if (closeOnExit)
 505  0
                     processor.close();
 506  0
             }
 507  
 
 508  
             public DocumentWordProbability read() throws IOException {
 509  0
                 if (uninitialized)
 510  0
                     initialize();
 511  
 
 512  0
                 DocumentWordProbability result = null;
 513  
 
 514  0
                 while (queue.size() > 0) {
 515  0
                     ShreddedReader top = queue.poll();
 516  0
                     result = top.read();
 517  
 
 518  0
                     if (result != null) {
 519  0
                         if (top.getBuffer().isAtEnd())
 520  0
                             top.fill();
 521  
 
 522  0
                         queue.offer(top);
 523  0
                         break;
 524  
                     } 
 525  0
                 }
 526  
 
 527  0
                 return result;
 528  
             }
 529  
         } 
 530  0
         public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<DocumentWordProbability>, ShreddedSource {      
 531  
             public ShreddedProcessor processor;
 532  
             ShreddedBuffer buffer;
 533  0
             DocumentWordProbability last = new DocumentWordProbability();         
 534  0
             long updateDocumentCount = -1;
 535  0
             long updateWordCount = -1;
 536  0
             long tupleCount = 0;
 537  0
             long bufferStartCount = 0;  
 538  
             ArrayInput input;
 539  
             
 540  0
             public ShreddedReader(ArrayInput input) {
 541  0
                 this.input = input; 
 542  0
                 this.buffer = new ShreddedBuffer();
 543  0
             }                               
 544  
             
 545  0
             public ShreddedReader(ArrayInput input, int bufferSize) { 
 546  0
                 this.input = input;
 547  0
                 this.buffer = new ShreddedBuffer(bufferSize);
 548  0
             }
 549  
                  
 550  
             public final int compareTo(ShreddedReader other) {
 551  0
                 ShreddedBuffer otherBuffer = other.getBuffer();
 552  
                 
 553  0
                 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
 554  0
                     return 0;                 
 555  0
                 } else if (buffer.isAtEnd()) {
 556  0
                     return -1;
 557  0
                 } else if (otherBuffer.isAtEnd()) {
 558  0
                     return 1;
 559  
                 }
 560  
                                    
 561  0
                 int result = 0;
 562  
                 do {
 563  0
                     result = + Utility.compare(buffer.getDocument(), otherBuffer.getDocument());
 564  0
                     if(result != 0) break;
 565  0
                     result = + Utility.compare(buffer.getWord(), otherBuffer.getWord());
 566  0
                     if(result != 0) break;
 567  
                 } while (false);                                             
 568  
                 
 569  0
                 return result;
 570  
             }
 571  
             
 572  
             public final ShreddedBuffer getBuffer() {
 573  0
                 return buffer;
 574  
             }                
 575  
             
 576  
             public final DocumentWordProbability read() throws IOException {
 577  0
                 if (buffer.isAtEnd()) {
 578  0
                     fill();             
 579  
                 
 580  0
                     if (buffer.isAtEnd()) {
 581  0
                         return null;
 582  
                     }
 583  
                 }
 584  
                       
 585  0
                 assert !buffer.isAtEnd();
 586  0
                 DocumentWordProbability result = new DocumentWordProbability();
 587  
                 
 588  0
                 result.document = buffer.getDocument();
 589  0
                 result.word = buffer.getWord();
 590  0
                 result.probability = buffer.getProbability();
 591  
                 
 592  0
                 buffer.incrementTuple();
 593  0
                 buffer.autoIncrementDocument();
 594  0
                 buffer.autoIncrementWord();
 595  
                 
 596  0
                 return result;
 597  
             }           
 598  
             
 599  
             public final void fill() throws IOException {
 600  
                 try {   
 601  0
                     buffer.reset();
 602  
                     
 603  0
                     if (tupleCount != 0) {
 604  
                                                       
 605  0
                         if(updateDocumentCount - tupleCount > 0) {
 606  0
                             buffer.documents.add(last.document);
 607  0
                             buffer.documentTupleIdx.add((int) (updateDocumentCount - tupleCount));
 608  
                         }                              
 609  0
                         if(updateWordCount - tupleCount > 0) {
 610  0
                             buffer.words.add(last.word);
 611  0
                             buffer.wordTupleIdx.add((int) (updateWordCount - tupleCount));
 612  
                         }
 613  0
                         bufferStartCount = tupleCount;
 614  
                     }
 615  
                     
 616  0
                     while (!buffer.isFull()) {
 617  0
                         updateWord();
 618  0
                         buffer.processTuple(input.readDouble());
 619  0
                         tupleCount++;
 620  
                     }
 621  0
                 } catch(EOFException e) {}
 622  0
             }
 623  
 
 624  
             public final void updateDocument() throws IOException {
 625  0
                 if (updateDocumentCount > tupleCount)
 626  0
                     return;
 627  
                      
 628  0
                 last.document = input.readString();
 629  0
                 updateDocumentCount = tupleCount + input.readInt();
 630  
                                       
 631  0
                 buffer.processDocument(last.document);
 632  0
             }
 633  
             public final void updateWord() throws IOException {
 634  0
                 if (updateWordCount > tupleCount)
 635  0
                     return;
 636  
                      
 637  0
                 updateDocument();
 638  0
                 last.word = input.readBytes();
 639  0
                 updateWordCount = tupleCount + input.readInt();
 640  
                                       
 641  0
                 buffer.processWord(last.word);
 642  0
             }
 643  
 
 644  
             public void run() throws IOException {
 645  
                 while (true) {
 646  0
                     fill();
 647  
                     
 648  0
                     if (buffer.isAtEnd())
 649  0
                         break;
 650  
                     
 651  0
                     buffer.copyUntil(null, processor);
 652  
                 }      
 653  0
                 processor.close();
 654  0
             }
 655  
             
 656  
             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
 657  0
                 if (processor instanceof ShreddedProcessor) {
 658  0
                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
 659  0
                 } else if (processor instanceof DocumentWordProbability.Processor) {
 660  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentWordProbability.Processor) processor));
 661  0
                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
 662  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentWordProbability>) processor));
 663  
                 } else {
 664  0
                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
 665  
                 }
 666  0
             }                                
 667  
             
 668  
             public Class<DocumentWordProbability> getOutputClass() {
 669  0
                 return DocumentWordProbability.class;
 670  
             }                
 671  
         }
 672  
         
 673  
         public static class DuplicateEliminator implements ShreddedProcessor {
 674  
             public ShreddedProcessor processor;
 675  0
             DocumentWordProbability last = new DocumentWordProbability();
 676  0
             boolean documentProcess = true;
 677  0
             boolean wordProcess = true;
 678  
                                            
 679  0
             public DuplicateEliminator() {}
 680  0
             public DuplicateEliminator(ShreddedProcessor processor) {
 681  0
                 this.processor = processor;
 682  0
             }
 683  
             
 684  
             public void setShreddedProcessor(ShreddedProcessor processor) {
 685  0
                 this.processor = processor;
 686  0
             }
 687  
 
 688  
             public void processDocument(String document) throws IOException {  
 689  0
                 if (documentProcess || Utility.compare(document, last.document) != 0) {
 690  0
                     last.document = document;
 691  0
                     processor.processDocument(document);
 692  0
             resetWord();
 693  0
                     documentProcess = false;
 694  
                 }
 695  0
             }
 696  
             public void processWord(byte[] word) throws IOException {  
 697  0
                 if (wordProcess || Utility.compare(word, last.word) != 0) {
 698  0
                     last.word = word;
 699  0
                     processor.processWord(word);
 700  0
                     wordProcess = false;
 701  
                 }
 702  0
             }  
 703  
             
 704  
             public void resetDocument() {
 705  0
                  documentProcess = true;
 706  0
             resetWord();
 707  0
             }                                                
 708  
             public void resetWord() {
 709  0
                  wordProcess = true;
 710  0
             }                                                
 711  
                                
 712  
             public void processTuple(double probability) throws IOException {
 713  0
                 processor.processTuple(probability);
 714  0
             } 
 715  
             
 716  
             public void close() throws IOException {
 717  0
                 processor.close();
 718  0
             }                    
 719  
         }
 720  
         public static class TupleUnshredder implements ShreddedProcessor {
 721  0
             DocumentWordProbability last = new DocumentWordProbability();
 722  
             public org.galagosearch.tupleflow.Processor<DocumentWordProbability> processor;                               
 723  
             
 724  0
             public TupleUnshredder(DocumentWordProbability.Processor processor) {
 725  0
                 this.processor = processor;
 726  0
             }         
 727  
             
 728  0
             public TupleUnshredder(org.galagosearch.tupleflow.Processor<DocumentWordProbability> processor) {
 729  0
                 this.processor = processor;
 730  0
             }
 731  
             
 732  
             public DocumentWordProbability clone(DocumentWordProbability object) {
 733  0
                 DocumentWordProbability result = new DocumentWordProbability();
 734  0
                 if (object == null) return result;
 735  0
                 result.document = object.document; 
 736  0
                 result.word = object.word; 
 737  0
                 result.probability = object.probability; 
 738  0
                 return result;
 739  
             }                 
 740  
             
 741  
             public void processDocument(String document) throws IOException {
 742  0
                 last.document = document;
 743  0
             }   
 744  
                 
 745  
             public void processWord(byte[] word) throws IOException {
 746  0
                 last.word = word;
 747  0
             }   
 748  
                 
 749  
             
 750  
             public void processTuple(double probability) throws IOException {
 751  0
                 last.probability = probability;
 752  0
                 processor.process(clone(last));
 753  0
             }               
 754  
             
 755  
             public void close() throws IOException {
 756  0
                 processor.close();
 757  0
             }
 758  
         }     
 759  0
         public static class TupleShredder implements Processor {
 760  0
             DocumentWordProbability last = new DocumentWordProbability();
 761  
             public ShreddedProcessor processor;
 762  
             
 763  0
             public TupleShredder(ShreddedProcessor processor) {
 764  0
                 this.processor = processor;
 765  0
             }                              
 766  
             
 767  
             public DocumentWordProbability clone(DocumentWordProbability object) {
 768  0
                 DocumentWordProbability result = new DocumentWordProbability();
 769  0
                 if (object == null) return result;
 770  0
                 result.document = object.document; 
 771  0
                 result.word = object.word; 
 772  0
                 result.probability = object.probability; 
 773  0
                 return result;
 774  
             }                 
 775  
             
 776  
             public void process(DocumentWordProbability object) throws IOException {                                                                                                                                                   
 777  0
                 boolean processAll = false;
 778  0
                 if(last == null || Utility.compare(last.document, object.document) != 0 || processAll) { processor.processDocument(object.document); processAll = true; }
 779  0
                 if(last == null || Utility.compare(last.word, object.word) != 0 || processAll) { processor.processWord(object.word); processAll = true; }
 780  0
                 processor.processTuple(object.probability);                                         
 781  0
             }
 782  
                           
 783  
             public Class<DocumentWordProbability> getInputClass() {
 784  0
                 return DocumentWordProbability.class;
 785  
             }
 786  
             
 787  
             public void close() throws IOException {
 788  0
                 processor.close();
 789  0
             }                     
 790  
         }
 791  
     } 
 792  0
     public static class WordOrder implements Order<DocumentWordProbability> {
 793  
         public int hash(DocumentWordProbability object) {
 794  0
             int h = 0;
 795  0
             h += Utility.hash(object.word);
 796  0
             return h;
 797  
         } 
 798  
         public Comparator<DocumentWordProbability> greaterThan() {
 799  0
             return new Comparator<DocumentWordProbability>() {
 800  0
                 public int compare(DocumentWordProbability one, DocumentWordProbability two) {
 801  0
                     int result = 0;
 802  
                     do {
 803  0
                         result = + Utility.compare(one.word, two.word);
 804  0
                         if(result != 0) break;
 805  
                     } while (false);
 806  0
                     return -result;
 807  
                 }
 808  
             };
 809  
         }     
 810  
         public Comparator<DocumentWordProbability> lessThan() {
 811  0
             return new Comparator<DocumentWordProbability>() {
 812  0
                 public int compare(DocumentWordProbability one, DocumentWordProbability two) {
 813  0
                     int result = 0;
 814  
                     do {
 815  0
                         result = + Utility.compare(one.word, two.word);
 816  0
                         if(result != 0) break;
 817  
                     } while (false);
 818  0
                     return result;
 819  
                 }
 820  
             };
 821  
         }     
 822  
         public TypeReader<DocumentWordProbability> orderedReader(ArrayInput _input) {
 823  0
             return new ShreddedReader(_input);
 824  
         }    
 825  
 
 826  
         public TypeReader<DocumentWordProbability> orderedReader(ArrayInput _input, int bufferSize) {
 827  0
             return new ShreddedReader(_input, bufferSize);
 828  
         }    
 829  
         public OrderedWriter<DocumentWordProbability> orderedWriter(ArrayOutput _output) {
 830  0
             ShreddedWriter w = new ShreddedWriter(_output);
 831  0
             return new OrderedWriterClass(w); 
 832  
         }                                    
 833  0
         public static class OrderedWriterClass extends OrderedWriter< DocumentWordProbability > {
 834  0
             DocumentWordProbability last = null;
 835  0
             ShreddedWriter shreddedWriter = null; 
 836  
             
 837  0
             public OrderedWriterClass(ShreddedWriter s) {
 838  0
                 this.shreddedWriter = s;
 839  0
             }
 840  
             
 841  
             public void process(DocumentWordProbability object) throws IOException {
 842  0
                boolean processAll = false;
 843  0
                if (processAll || last == null || 0 != Utility.compare(object.word, last.word)) { processAll = true; shreddedWriter.processWord(object.word); }
 844  0
                shreddedWriter.processTuple(object.document, object.probability);
 845  0
                last = object;
 846  0
             }           
 847  
                  
 848  
             public void close() throws IOException {
 849  0
                 shreddedWriter.close();
 850  0
             }
 851  
             
 852  
             public Class<DocumentWordProbability> getInputClass() {
 853  0
                 return DocumentWordProbability.class;
 854  
             }
 855  
         } 
 856  
         public ReaderSource<DocumentWordProbability> orderedCombiner(Collection<TypeReader<DocumentWordProbability>> readers, boolean closeOnExit) {
 857  0
             ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
 858  
             
 859  0
             for (TypeReader<DocumentWordProbability> reader : readers) {
 860  0
                 shreddedReaders.add((ShreddedReader)reader);
 861  
             }
 862  
             
 863  0
             return new ShreddedCombiner(shreddedReaders, closeOnExit);
 864  
         }                  
 865  
         public DocumentWordProbability clone(DocumentWordProbability object) {
 866  0
             DocumentWordProbability result = new DocumentWordProbability();
 867  0
             if (object == null) return result;
 868  0
             result.document = object.document; 
 869  0
             result.word = object.word; 
 870  0
             result.probability = object.probability; 
 871  0
             return result;
 872  
         }                 
 873  
         public Class<DocumentWordProbability> getOrderedClass() {
 874  0
             return DocumentWordProbability.class;
 875  
         }                           
 876  
         public String[] getOrderSpec() {
 877  0
             return new String[] {"+word"};
 878  
         }
 879  
 
 880  
         public static String getSpecString() {
 881  0
             return "+word";
 882  
         }
 883  
                            
 884  
         public interface ShreddedProcessor extends Step {
 885  
             public void processWord(byte[] word) throws IOException;
 886  
             public void processTuple(String document, double probability) throws IOException;
 887  
             public void close() throws IOException;
 888  
         }    
 889  
         public interface ShreddedSource extends Step {
 890  
         }                                              
 891  
         
 892  0
         public static class ShreddedWriter implements ShreddedProcessor {
 893  
             ArrayOutput output;
 894  0
             ShreddedBuffer buffer = new ShreddedBuffer();
 895  
             byte[] lastWord;
 896  0
             boolean lastFlush = false;
 897  
             
 898  0
             public ShreddedWriter(ArrayOutput output) {
 899  0
                 this.output = output;
 900  0
             }                        
 901  
             
 902  
             public void close() throws IOException {
 903  0
                 flush();
 904  0
             }
 905  
             
 906  
             public void processWord(byte[] word) {
 907  0
                 lastWord = word;
 908  0
                 buffer.processWord(word);
 909  0
             }
 910  
             public final void processTuple(String document, double probability) throws IOException {
 911  0
                 if (lastFlush) {
 912  0
                     if(buffer.words.size() == 0) buffer.processWord(lastWord);
 913  0
                     lastFlush = false;
 914  
                 }
 915  0
                 buffer.processTuple(document, probability);
 916  0
                 if (buffer.isFull())
 917  0
                     flush();
 918  0
             }
 919  
             public final void flushTuples(int pauseIndex) throws IOException {
 920  
                 
 921  0
                 while (buffer.getReadIndex() < pauseIndex) {
 922  
                            
 923  0
                     output.writeString(buffer.getDocument());
 924  0
                     output.writeDouble(buffer.getProbability());
 925  0
                     buffer.incrementTuple();
 926  
                 }
 927  0
             }  
 928  
             public final void flushWord(int pauseIndex) throws IOException {
 929  0
                 while (buffer.getReadIndex() < pauseIndex) {
 930  0
                     int nextPause = buffer.getWordEndIndex();
 931  0
                     int count = nextPause - buffer.getReadIndex();
 932  
                     
 933  0
                     output.writeBytes(buffer.getWord());
 934  0
                     output.writeInt(count);
 935  0
                     buffer.incrementWord();
 936  
                       
 937  0
                     flushTuples(nextPause);
 938  0
                     assert nextPause == buffer.getReadIndex();
 939  0
                 }
 940  0
             }
 941  
             public void flush() throws IOException { 
 942  0
                 flushWord(buffer.getWriteIndex());
 943  0
                 buffer.reset(); 
 944  0
                 lastFlush = true;
 945  0
             }                           
 946  
         }
 947  0
         public static class ShreddedBuffer {
 948  0
             ArrayList<byte[]> words = new ArrayList();
 949  0
             ArrayList<Integer> wordTupleIdx = new ArrayList();
 950  0
             int wordReadIdx = 0;
 951  
                             
 952  
             String[] documents;
 953  
             double[] probabilitys;
 954  0
             int writeTupleIndex = 0;
 955  0
             int readTupleIndex = 0;
 956  
             int batchSize;
 957  
 
 958  0
             public ShreddedBuffer(int batchSize) {
 959  0
                 this.batchSize = batchSize;
 960  
 
 961  0
                 documents = new String[batchSize];
 962  0
                 probabilitys = new double[batchSize];
 963  0
             }                              
 964  
 
 965  
             public ShreddedBuffer() {    
 966  0
                 this(10000);
 967  0
             }                                                                                                                    
 968  
             
 969  
             public void processWord(byte[] word) {
 970  0
                 words.add(word);
 971  0
                 wordTupleIdx.add(writeTupleIndex);
 972  0
             }                                      
 973  
             public void processTuple(String document, double probability) {
 974  0
                 assert words.size() > 0;
 975  0
                 documents[writeTupleIndex] = document;
 976  0
                 probabilitys[writeTupleIndex] = probability;
 977  0
                 writeTupleIndex++;
 978  0
             }
 979  
             public void resetData() {
 980  0
                 words.clear();
 981  0
                 wordTupleIdx.clear();
 982  0
                 writeTupleIndex = 0;
 983  0
             }                  
 984  
                                  
 985  
             public void resetRead() {
 986  0
                 readTupleIndex = 0;
 987  0
                 wordReadIdx = 0;
 988  0
             } 
 989  
 
 990  
             public void reset() {
 991  0
                 resetData();
 992  0
                 resetRead();
 993  0
             } 
 994  
             public boolean isFull() {
 995  0
                 return writeTupleIndex >= batchSize;
 996  
             }
 997  
 
 998  
             public boolean isEmpty() {
 999  0
                 return writeTupleIndex == 0;
 1000  
             }                          
 1001  
 
 1002  
             public boolean isAtEnd() {
 1003  0
                 return readTupleIndex >= writeTupleIndex;
 1004  
             }           
 1005  
             public void incrementWord() {
 1006  0
                 wordReadIdx++;  
 1007  0
             }                                                                                              
 1008  
 
 1009  
             public void autoIncrementWord() {
 1010  0
                 while (readTupleIndex >= getWordEndIndex() && readTupleIndex < writeTupleIndex)
 1011  0
                     wordReadIdx++;
 1012  0
             }                 
 1013  
             public void incrementTuple() {
 1014  0
                 readTupleIndex++;
 1015  0
             }                    
 1016  
             public int getWordEndIndex() {
 1017  0
                 if ((wordReadIdx+1) >= wordTupleIdx.size())
 1018  0
                     return writeTupleIndex;
 1019  0
                 return wordTupleIdx.get(wordReadIdx+1);
 1020  
             }
 1021  
             public int getReadIndex() {
 1022  0
                 return readTupleIndex;
 1023  
             }   
 1024  
 
 1025  
             public int getWriteIndex() {
 1026  0
                 return writeTupleIndex;
 1027  
             } 
 1028  
             public byte[] getWord() {
 1029  0
                 assert readTupleIndex < writeTupleIndex;
 1030  0
                 assert wordReadIdx < words.size();
 1031  
                 
 1032  0
                 return words.get(wordReadIdx);
 1033  
             }
 1034  
             public String getDocument() {
 1035  0
                 assert readTupleIndex < writeTupleIndex;
 1036  0
                 return documents[readTupleIndex];
 1037  
             }                                         
 1038  
             public double getProbability() {
 1039  0
                 assert readTupleIndex < writeTupleIndex;
 1040  0
                 return probabilitys[readTupleIndex];
 1041  
             }                                         
 1042  
             public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
 1043  0
                 while (getReadIndex() < endIndex) {
 1044  0
                    output.processTuple(getDocument(), getProbability());
 1045  0
                    incrementTuple();
 1046  
                 }
 1047  0
             }                                                                           
 1048  
             public void copyUntilIndexWord(int endIndex, ShreddedProcessor output) throws IOException {
 1049  0
                 while (getReadIndex() < endIndex) {
 1050  0
                     output.processWord(getWord());
 1051  0
                     assert getWordEndIndex() <= endIndex;
 1052  0
                     copyTuples(getWordEndIndex(), output);
 1053  0
                     incrementWord();
 1054  
                 }
 1055  0
             }  
 1056  
             public void copyUntilWord(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
 1057  0
                 while (!isAtEnd()) {
 1058  0
                     if (other != null) {   
 1059  0
                         assert !other.isAtEnd();
 1060  0
                         int c = + Utility.compare(getWord(), other.getWord());
 1061  
                     
 1062  0
                         if (c > 0) {
 1063  0
                             break;   
 1064  
                         }
 1065  
                         
 1066  0
                         output.processWord(getWord());
 1067  
                                       
 1068  0
                         copyTuples(getWordEndIndex(), output);
 1069  0
                     } else {
 1070  0
                         output.processWord(getWord());
 1071  0
                         copyTuples(getWordEndIndex(), output);
 1072  
                     }
 1073  0
                     incrementWord();  
 1074  
                     
 1075  
                
 1076  
                 }
 1077  0
             }
 1078  
             public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
 1079  0
                 copyUntilWord(other, output);
 1080  0
             }
 1081  
             
 1082  
         }                         
 1083  0
         public static class ShreddedCombiner implements ReaderSource<DocumentWordProbability>, ShreddedSource {   
 1084  
             public ShreddedProcessor processor;
 1085  
             Collection<ShreddedReader> readers;       
 1086  0
             boolean closeOnExit = false;
 1087  0
             boolean uninitialized = true;
 1088  0
             PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
 1089  
             
 1090  0
             public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
 1091  0
                 this.readers = readers;                                                       
 1092  0
                 this.closeOnExit = closeOnExit;
 1093  0
             }
 1094  
                                   
 1095  
             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
 1096  0
                 if (processor instanceof ShreddedProcessor) {
 1097  0
                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
 1098  0
                 } else if (processor instanceof DocumentWordProbability.Processor) {
 1099  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentWordProbability.Processor) processor));
 1100  0
                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
 1101  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentWordProbability>) processor));
 1102  
                 } else {
 1103  0
                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
 1104  
                 }
 1105  0
             }                                
 1106  
             
 1107  
             public Class<DocumentWordProbability> getOutputClass() {
 1108  0
                 return DocumentWordProbability.class;
 1109  
             }
 1110  
             
 1111  
             public void initialize() throws IOException {
 1112  0
                 for (ShreddedReader reader : readers) {
 1113  0
                     reader.fill();                                        
 1114  
                     
 1115  0
                     if (!reader.getBuffer().isAtEnd())
 1116  0
                         queue.add(reader);
 1117  
                 }   
 1118  
 
 1119  0
                 uninitialized = false;
 1120  0
             }
 1121  
 
 1122  
             public void run() throws IOException {
 1123  0
                 initialize();
 1124  
                
 1125  0
                 while (queue.size() > 0) {
 1126  0
                     ShreddedReader top = queue.poll();
 1127  0
                     ShreddedReader next = null;
 1128  0
                     ShreddedBuffer nextBuffer = null; 
 1129  
                     
 1130  0
                     assert !top.getBuffer().isAtEnd();
 1131  
                                                   
 1132  0
                     if (queue.size() > 0) {
 1133  0
                         next = queue.peek();
 1134  0
                         nextBuffer = next.getBuffer();
 1135  0
                         assert !nextBuffer.isAtEnd();
 1136  
                     }
 1137  
                     
 1138  0
                     top.getBuffer().copyUntil(nextBuffer, processor);
 1139  0
                     if (top.getBuffer().isAtEnd())
 1140  0
                         top.fill();                 
 1141  
                         
 1142  0
                     if (!top.getBuffer().isAtEnd())
 1143  0
                         queue.add(top);
 1144  0
                 }              
 1145  
                 
 1146  0
                 if (closeOnExit)
 1147  0
                     processor.close();
 1148  0
             }
 1149  
 
 1150  
             public DocumentWordProbability read() throws IOException {
 1151  0
                 if (uninitialized)
 1152  0
                     initialize();
 1153  
 
 1154  0
                 DocumentWordProbability result = null;
 1155  
 
 1156  0
                 while (queue.size() > 0) {
 1157  0
                     ShreddedReader top = queue.poll();
 1158  0
                     result = top.read();
 1159  
 
 1160  0
                     if (result != null) {
 1161  0
                         if (top.getBuffer().isAtEnd())
 1162  0
                             top.fill();
 1163  
 
 1164  0
                         queue.offer(top);
 1165  0
                         break;
 1166  
                     } 
 1167  0
                 }
 1168  
 
 1169  0
                 return result;
 1170  
             }
 1171  
         } 
 1172  0
         public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<DocumentWordProbability>, ShreddedSource {      
 1173  
             public ShreddedProcessor processor;
 1174  
             ShreddedBuffer buffer;
 1175  0
             DocumentWordProbability last = new DocumentWordProbability();         
 1176  0
             long updateWordCount = -1;
 1177  0
             long tupleCount = 0;
 1178  0
             long bufferStartCount = 0;  
 1179  
             ArrayInput input;
 1180  
             
 1181  0
             public ShreddedReader(ArrayInput input) {
 1182  0
                 this.input = input; 
 1183  0
                 this.buffer = new ShreddedBuffer();
 1184  0
             }                               
 1185  
             
 1186  0
             public ShreddedReader(ArrayInput input, int bufferSize) { 
 1187  0
                 this.input = input;
 1188  0
                 this.buffer = new ShreddedBuffer(bufferSize);
 1189  0
             }
 1190  
                  
 1191  
             public final int compareTo(ShreddedReader other) {
 1192  0
                 ShreddedBuffer otherBuffer = other.getBuffer();
 1193  
                 
 1194  0
                 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
 1195  0
                     return 0;                 
 1196  0
                 } else if (buffer.isAtEnd()) {
 1197  0
                     return -1;
 1198  0
                 } else if (otherBuffer.isAtEnd()) {
 1199  0
                     return 1;
 1200  
                 }
 1201  
                                    
 1202  0
                 int result = 0;
 1203  
                 do {
 1204  0
                     result = + Utility.compare(buffer.getWord(), otherBuffer.getWord());
 1205  0
                     if(result != 0) break;
 1206  
                 } while (false);                                             
 1207  
                 
 1208  0
                 return result;
 1209  
             }
 1210  
             
 1211  
             public final ShreddedBuffer getBuffer() {
 1212  0
                 return buffer;
 1213  
             }                
 1214  
             
 1215  
             public final DocumentWordProbability read() throws IOException {
 1216  0
                 if (buffer.isAtEnd()) {
 1217  0
                     fill();             
 1218  
                 
 1219  0
                     if (buffer.isAtEnd()) {
 1220  0
                         return null;
 1221  
                     }
 1222  
                 }
 1223  
                       
 1224  0
                 assert !buffer.isAtEnd();
 1225  0
                 DocumentWordProbability result = new DocumentWordProbability();
 1226  
                 
 1227  0
                 result.word = buffer.getWord();
 1228  0
                 result.document = buffer.getDocument();
 1229  0
                 result.probability = buffer.getProbability();
 1230  
                 
 1231  0
                 buffer.incrementTuple();
 1232  0
                 buffer.autoIncrementWord();
 1233  
                 
 1234  0
                 return result;
 1235  
             }           
 1236  
             
 1237  
             public final void fill() throws IOException {
 1238  
                 try {   
 1239  0
                     buffer.reset();
 1240  
                     
 1241  0
                     if (tupleCount != 0) {
 1242  
                                                       
 1243  0
                         if(updateWordCount - tupleCount > 0) {
 1244  0
                             buffer.words.add(last.word);
 1245  0
                             buffer.wordTupleIdx.add((int) (updateWordCount - tupleCount));
 1246  
                         }
 1247  0
                         bufferStartCount = tupleCount;
 1248  
                     }
 1249  
                     
 1250  0
                     while (!buffer.isFull()) {
 1251  0
                         updateWord();
 1252  0
                         buffer.processTuple(input.readString(), input.readDouble());
 1253  0
                         tupleCount++;
 1254  
                     }
 1255  0
                 } catch(EOFException e) {}
 1256  0
             }
 1257  
 
 1258  
             public final void updateWord() throws IOException {
 1259  0
                 if (updateWordCount > tupleCount)
 1260  0
                     return;
 1261  
                      
 1262  0
                 last.word = input.readBytes();
 1263  0
                 updateWordCount = tupleCount + input.readInt();
 1264  
                                       
 1265  0
                 buffer.processWord(last.word);
 1266  0
             }
 1267  
 
 1268  
             public void run() throws IOException {
 1269  
                 while (true) {
 1270  0
                     fill();
 1271  
                     
 1272  0
                     if (buffer.isAtEnd())
 1273  0
                         break;
 1274  
                     
 1275  0
                     buffer.copyUntil(null, processor);
 1276  
                 }      
 1277  0
                 processor.close();
 1278  0
             }
 1279  
             
 1280  
             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
 1281  0
                 if (processor instanceof ShreddedProcessor) {
 1282  0
                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
 1283  0
                 } else if (processor instanceof DocumentWordProbability.Processor) {
 1284  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentWordProbability.Processor) processor));
 1285  0
                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
 1286  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentWordProbability>) processor));
 1287  
                 } else {
 1288  0
                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
 1289  
                 }
 1290  0
             }                                
 1291  
             
 1292  
             public Class<DocumentWordProbability> getOutputClass() {
 1293  0
                 return DocumentWordProbability.class;
 1294  
             }                
 1295  
         }
 1296  
         
 1297  
         public static class DuplicateEliminator implements ShreddedProcessor {
 1298  
             public ShreddedProcessor processor;
 1299  0
             DocumentWordProbability last = new DocumentWordProbability();
 1300  0
             boolean wordProcess = true;
 1301  
                                            
 1302  0
             public DuplicateEliminator() {}
 1303  0
             public DuplicateEliminator(ShreddedProcessor processor) {
 1304  0
                 this.processor = processor;
 1305  0
             }
 1306  
             
 1307  
             public void setShreddedProcessor(ShreddedProcessor processor) {
 1308  0
                 this.processor = processor;
 1309  0
             }
 1310  
 
 1311  
             public void processWord(byte[] word) throws IOException {  
 1312  0
                 if (wordProcess || Utility.compare(word, last.word) != 0) {
 1313  0
                     last.word = word;
 1314  0
                     processor.processWord(word);
 1315  0
                     wordProcess = false;
 1316  
                 }
 1317  0
             }  
 1318  
             
 1319  
             public void resetWord() {
 1320  0
                  wordProcess = true;
 1321  0
             }                                                
 1322  
                                
 1323  
             public void processTuple(String document, double probability) throws IOException {
 1324  0
                 processor.processTuple(document, probability);
 1325  0
             } 
 1326  
             
 1327  
             public void close() throws IOException {
 1328  0
                 processor.close();
 1329  0
             }                    
 1330  
         }
 1331  
         public static class TupleUnshredder implements ShreddedProcessor {
 1332  0
             DocumentWordProbability last = new DocumentWordProbability();
 1333  
             public org.galagosearch.tupleflow.Processor<DocumentWordProbability> processor;                               
 1334  
             
 1335  0
             public TupleUnshredder(DocumentWordProbability.Processor processor) {
 1336  0
                 this.processor = processor;
 1337  0
             }         
 1338  
             
 1339  0
             public TupleUnshredder(org.galagosearch.tupleflow.Processor<DocumentWordProbability> processor) {
 1340  0
                 this.processor = processor;
 1341  0
             }
 1342  
             
 1343  
             public DocumentWordProbability clone(DocumentWordProbability object) {
 1344  0
                 DocumentWordProbability result = new DocumentWordProbability();
 1345  0
                 if (object == null) return result;
 1346  0
                 result.document = object.document; 
 1347  0
                 result.word = object.word; 
 1348  0
                 result.probability = object.probability; 
 1349  0
                 return result;
 1350  
             }                 
 1351  
             
 1352  
             public void processWord(byte[] word) throws IOException {
 1353  0
                 last.word = word;
 1354  0
             }   
 1355  
                 
 1356  
             
 1357  
             public void processTuple(String document, double probability) throws IOException {
 1358  0
                 last.document = document;
 1359  0
                 last.probability = probability;
 1360  0
                 processor.process(clone(last));
 1361  0
             }               
 1362  
             
 1363  
             public void close() throws IOException {
 1364  0
                 processor.close();
 1365  0
             }
 1366  
         }     
 1367  0
         public static class TupleShredder implements Processor {
 1368  0
             DocumentWordProbability last = new DocumentWordProbability();
 1369  
             public ShreddedProcessor processor;
 1370  
             
 1371  0
             public TupleShredder(ShreddedProcessor processor) {
 1372  0
                 this.processor = processor;
 1373  0
             }                              
 1374  
             
 1375  
             public DocumentWordProbability clone(DocumentWordProbability object) {
 1376  0
                 DocumentWordProbability result = new DocumentWordProbability();
 1377  0
                 if (object == null) return result;
 1378  0
                 result.document = object.document; 
 1379  0
                 result.word = object.word; 
 1380  0
                 result.probability = object.probability; 
 1381  0
                 return result;
 1382  
             }                 
 1383  
             
 1384  
             public void process(DocumentWordProbability object) throws IOException {                                                                                                                                                   
 1385  0
                 boolean processAll = false;
 1386  0
                 if(last == null || Utility.compare(last.word, object.word) != 0 || processAll) { processor.processWord(object.word); processAll = true; }
 1387  0
                 processor.processTuple(object.document, object.probability);                                         
 1388  0
             }
 1389  
                           
 1390  
             public Class<DocumentWordProbability> getInputClass() {
 1391  0
                 return DocumentWordProbability.class;
 1392  
             }
 1393  
             
 1394  
             public void close() throws IOException {
 1395  0
                 processor.close();
 1396  0
             }                     
 1397  
         }
 1398  
     } 
 1399  0
     public static class DocumentOrder implements Order<DocumentWordProbability> {
 1400  
         public int hash(DocumentWordProbability object) {
 1401  0
             int h = 0;
 1402  0
             h += Utility.hash(object.document);
 1403  0
             return h;
 1404  
         } 
 1405  
         public Comparator<DocumentWordProbability> greaterThan() {
 1406  0
             return new Comparator<DocumentWordProbability>() {
 1407  0
                 public int compare(DocumentWordProbability one, DocumentWordProbability two) {
 1408  0
                     int result = 0;
 1409  
                     do {
 1410  0
                         result = + Utility.compare(one.document, two.document);
 1411  0
                         if(result != 0) break;
 1412  
                     } while (false);
 1413  0
                     return -result;
 1414  
                 }
 1415  
             };
 1416  
         }     
 1417  
         public Comparator<DocumentWordProbability> lessThan() {
 1418  0
             return new Comparator<DocumentWordProbability>() {
 1419  0
                 public int compare(DocumentWordProbability one, DocumentWordProbability two) {
 1420  0
                     int result = 0;
 1421  
                     do {
 1422  0
                         result = + Utility.compare(one.document, two.document);
 1423  0
                         if(result != 0) break;
 1424  
                     } while (false);
 1425  0
                     return result;
 1426  
                 }
 1427  
             };
 1428  
         }     
 1429  
         public TypeReader<DocumentWordProbability> orderedReader(ArrayInput _input) {
 1430  0
             return new ShreddedReader(_input);
 1431  
         }    
 1432  
 
 1433  
         public TypeReader<DocumentWordProbability> orderedReader(ArrayInput _input, int bufferSize) {
 1434  0
             return new ShreddedReader(_input, bufferSize);
 1435  
         }    
 1436  
         public OrderedWriter<DocumentWordProbability> orderedWriter(ArrayOutput _output) {
 1437  0
             ShreddedWriter w = new ShreddedWriter(_output);
 1438  0
             return new OrderedWriterClass(w); 
 1439  
         }                                    
 1440  0
         public static class OrderedWriterClass extends OrderedWriter< DocumentWordProbability > {
 1441  0
             DocumentWordProbability last = null;
 1442  0
             ShreddedWriter shreddedWriter = null; 
 1443  
             
 1444  0
             public OrderedWriterClass(ShreddedWriter s) {
 1445  0
                 this.shreddedWriter = s;
 1446  0
             }
 1447  
             
 1448  
             public void process(DocumentWordProbability object) throws IOException {
 1449  0
                boolean processAll = false;
 1450  0
                if (processAll || last == null || 0 != Utility.compare(object.document, last.document)) { processAll = true; shreddedWriter.processDocument(object.document); }
 1451  0
                shreddedWriter.processTuple(object.word, object.probability);
 1452  0
                last = object;
 1453  0
             }           
 1454  
                  
 1455  
             public void close() throws IOException {
 1456  0
                 shreddedWriter.close();
 1457  0
             }
 1458  
             
 1459  
             public Class<DocumentWordProbability> getInputClass() {
 1460  0
                 return DocumentWordProbability.class;
 1461  
             }
 1462  
         } 
 1463  
         public ReaderSource<DocumentWordProbability> orderedCombiner(Collection<TypeReader<DocumentWordProbability>> readers, boolean closeOnExit) {
 1464  0
             ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
 1465  
             
 1466  0
             for (TypeReader<DocumentWordProbability> reader : readers) {
 1467  0
                 shreddedReaders.add((ShreddedReader)reader);
 1468  
             }
 1469  
             
 1470  0
             return new ShreddedCombiner(shreddedReaders, closeOnExit);
 1471  
         }                  
 1472  
         public DocumentWordProbability clone(DocumentWordProbability object) {
 1473  0
             DocumentWordProbability result = new DocumentWordProbability();
 1474  0
             if (object == null) return result;
 1475  0
             result.document = object.document; 
 1476  0
             result.word = object.word; 
 1477  0
             result.probability = object.probability; 
 1478  0
             return result;
 1479  
         }                 
 1480  
         public Class<DocumentWordProbability> getOrderedClass() {
 1481  0
             return DocumentWordProbability.class;
 1482  
         }                           
 1483  
         public String[] getOrderSpec() {
 1484  0
             return new String[] {"+document"};
 1485  
         }
 1486  
 
 1487  
         public static String getSpecString() {
 1488  0
             return "+document";
 1489  
         }
 1490  
                            
 1491  
         public interface ShreddedProcessor extends Step {
 1492  
             public void processDocument(String document) throws IOException;
 1493  
             public void processTuple(byte[] word, double probability) throws IOException;
 1494  
             public void close() throws IOException;
 1495  
         }    
 1496  
         public interface ShreddedSource extends Step {
 1497  
         }                                              
 1498  
         
 1499  0
         public static class ShreddedWriter implements ShreddedProcessor {
 1500  
             ArrayOutput output;
 1501  0
             ShreddedBuffer buffer = new ShreddedBuffer();
 1502  
             String lastDocument;
 1503  0
             boolean lastFlush = false;
 1504  
             
 1505  0
             public ShreddedWriter(ArrayOutput output) {
 1506  0
                 this.output = output;
 1507  0
             }                        
 1508  
             
 1509  
             public void close() throws IOException {
 1510  0
                 flush();
 1511  0
             }
 1512  
             
 1513  
             public void processDocument(String document) {
 1514  0
                 lastDocument = document;
 1515  0
                 buffer.processDocument(document);
 1516  0
             }
 1517  
             public final void processTuple(byte[] word, double probability) throws IOException {
 1518  0
                 if (lastFlush) {
 1519  0
                     if(buffer.documents.size() == 0) buffer.processDocument(lastDocument);
 1520  0
                     lastFlush = false;
 1521  
                 }
 1522  0
                 buffer.processTuple(word, probability);
 1523  0
                 if (buffer.isFull())
 1524  0
                     flush();
 1525  0
             }
 1526  
             public final void flushTuples(int pauseIndex) throws IOException {
 1527  
                 
 1528  0
                 while (buffer.getReadIndex() < pauseIndex) {
 1529  
                            
 1530  0
                     output.writeBytes(buffer.getWord());
 1531  0
                     output.writeDouble(buffer.getProbability());
 1532  0
                     buffer.incrementTuple();
 1533  
                 }
 1534  0
             }  
 1535  
             public final void flushDocument(int pauseIndex) throws IOException {
 1536  0
                 while (buffer.getReadIndex() < pauseIndex) {
 1537  0
                     int nextPause = buffer.getDocumentEndIndex();
 1538  0
                     int count = nextPause - buffer.getReadIndex();
 1539  
                     
 1540  0
                     output.writeString(buffer.getDocument());
 1541  0
                     output.writeInt(count);
 1542  0
                     buffer.incrementDocument();
 1543  
                       
 1544  0
                     flushTuples(nextPause);
 1545  0
                     assert nextPause == buffer.getReadIndex();
 1546  0
                 }
 1547  0
             }
 1548  
             public void flush() throws IOException { 
 1549  0
                 flushDocument(buffer.getWriteIndex());
 1550  0
                 buffer.reset(); 
 1551  0
                 lastFlush = true;
 1552  0
             }                           
 1553  
         }
 1554  0
         public static class ShreddedBuffer {
 1555  0
             ArrayList<String> documents = new ArrayList();
 1556  0
             ArrayList<Integer> documentTupleIdx = new ArrayList();
 1557  0
             int documentReadIdx = 0;
 1558  
                             
 1559  
             byte[][] words;
 1560  
             double[] probabilitys;
 1561  0
             int writeTupleIndex = 0;
 1562  0
             int readTupleIndex = 0;
 1563  
             int batchSize;
 1564  
 
 1565  0
             public ShreddedBuffer(int batchSize) {
 1566  0
                 this.batchSize = batchSize;
 1567  
 
 1568  0
                 words = new byte[batchSize][];
 1569  0
                 probabilitys = new double[batchSize];
 1570  0
             }                              
 1571  
 
 1572  
             public ShreddedBuffer() {    
 1573  0
                 this(10000);
 1574  0
             }                                                                                                                    
 1575  
             
 1576  
             public void processDocument(String document) {
 1577  0
                 documents.add(document);
 1578  0
                 documentTupleIdx.add(writeTupleIndex);
 1579  0
             }                                      
 1580  
             public void processTuple(byte[] word, double probability) {
 1581  0
                 assert documents.size() > 0;
 1582  0
                 words[writeTupleIndex] = word;
 1583  0
                 probabilitys[writeTupleIndex] = probability;
 1584  0
                 writeTupleIndex++;
 1585  0
             }
 1586  
             public void resetData() {
 1587  0
                 documents.clear();
 1588  0
                 documentTupleIdx.clear();
 1589  0
                 writeTupleIndex = 0;
 1590  0
             }                  
 1591  
                                  
 1592  
             public void resetRead() {
 1593  0
                 readTupleIndex = 0;
 1594  0
                 documentReadIdx = 0;
 1595  0
             } 
 1596  
 
 1597  
             public void reset() {
 1598  0
                 resetData();
 1599  0
                 resetRead();
 1600  0
             } 
 1601  
             public boolean isFull() {
 1602  0
                 return writeTupleIndex >= batchSize;
 1603  
             }
 1604  
 
 1605  
             public boolean isEmpty() {
 1606  0
                 return writeTupleIndex == 0;
 1607  
             }                          
 1608  
 
 1609  
             public boolean isAtEnd() {
 1610  0
                 return readTupleIndex >= writeTupleIndex;
 1611  
             }           
 1612  
             public void incrementDocument() {
 1613  0
                 documentReadIdx++;  
 1614  0
             }                                                                                              
 1615  
 
 1616  
             public void autoIncrementDocument() {
 1617  0
                 while (readTupleIndex >= getDocumentEndIndex() && readTupleIndex < writeTupleIndex)
 1618  0
                     documentReadIdx++;
 1619  0
             }                 
 1620  
             public void incrementTuple() {
 1621  0
                 readTupleIndex++;
 1622  0
             }                    
 1623  
             public int getDocumentEndIndex() {
 1624  0
                 if ((documentReadIdx+1) >= documentTupleIdx.size())
 1625  0
                     return writeTupleIndex;
 1626  0
                 return documentTupleIdx.get(documentReadIdx+1);
 1627  
             }
 1628  
             public int getReadIndex() {
 1629  0
                 return readTupleIndex;
 1630  
             }   
 1631  
 
 1632  
             public int getWriteIndex() {
 1633  0
                 return writeTupleIndex;
 1634  
             } 
 1635  
             public String getDocument() {
 1636  0
                 assert readTupleIndex < writeTupleIndex;
 1637  0
                 assert documentReadIdx < documents.size();
 1638  
                 
 1639  0
                 return documents.get(documentReadIdx);
 1640  
             }
 1641  
             public byte[] getWord() {
 1642  0
                 assert readTupleIndex < writeTupleIndex;
 1643  0
                 return words[readTupleIndex];
 1644  
             }                                         
 1645  
             public double getProbability() {
 1646  0
                 assert readTupleIndex < writeTupleIndex;
 1647  0
                 return probabilitys[readTupleIndex];
 1648  
             }                                         
 1649  
             public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
 1650  0
                 while (getReadIndex() < endIndex) {
 1651  0
                    output.processTuple(getWord(), getProbability());
 1652  0
                    incrementTuple();
 1653  
                 }
 1654  0
             }                                                                           
 1655  
             public void copyUntilIndexDocument(int endIndex, ShreddedProcessor output) throws IOException {
 1656  0
                 while (getReadIndex() < endIndex) {
 1657  0
                     output.processDocument(getDocument());
 1658  0
                     assert getDocumentEndIndex() <= endIndex;
 1659  0
                     copyTuples(getDocumentEndIndex(), output);
 1660  0
                     incrementDocument();
 1661  
                 }
 1662  0
             }  
 1663  
             public void copyUntilDocument(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
 1664  0
                 while (!isAtEnd()) {
 1665  0
                     if (other != null) {   
 1666  0
                         assert !other.isAtEnd();
 1667  0
                         int c = + Utility.compare(getDocument(), other.getDocument());
 1668  
                     
 1669  0
                         if (c > 0) {
 1670  0
                             break;   
 1671  
                         }
 1672  
                         
 1673  0
                         output.processDocument(getDocument());
 1674  
                                       
 1675  0
                         copyTuples(getDocumentEndIndex(), output);
 1676  0
                     } else {
 1677  0
                         output.processDocument(getDocument());
 1678  0
                         copyTuples(getDocumentEndIndex(), output);
 1679  
                     }
 1680  0
                     incrementDocument();  
 1681  
                     
 1682  
                
 1683  
                 }
 1684  0
             }
 1685  
             public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
 1686  0
                 copyUntilDocument(other, output);
 1687  0
             }
 1688  
             
 1689  
         }                         
 1690  0
         public static class ShreddedCombiner implements ReaderSource<DocumentWordProbability>, ShreddedSource {   
 1691  
             public ShreddedProcessor processor;
 1692  
             Collection<ShreddedReader> readers;       
 1693  0
             boolean closeOnExit = false;
 1694  0
             boolean uninitialized = true;
 1695  0
             PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
 1696  
             
 1697  0
             public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
 1698  0
                 this.readers = readers;                                                       
 1699  0
                 this.closeOnExit = closeOnExit;
 1700  0
             }
 1701  
                                   
 1702  
             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
 1703  0
                 if (processor instanceof ShreddedProcessor) {
 1704  0
                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
 1705  0
                 } else if (processor instanceof DocumentWordProbability.Processor) {
 1706  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentWordProbability.Processor) processor));
 1707  0
                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
 1708  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentWordProbability>) processor));
 1709  
                 } else {
 1710  0
                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
 1711  
                 }
 1712  0
             }                                
 1713  
             
 1714  
             public Class<DocumentWordProbability> getOutputClass() {
 1715  0
                 return DocumentWordProbability.class;
 1716  
             }
 1717  
             
 1718  
             public void initialize() throws IOException {
 1719  0
                 for (ShreddedReader reader : readers) {
 1720  0
                     reader.fill();                                        
 1721  
                     
 1722  0
                     if (!reader.getBuffer().isAtEnd())
 1723  0
                         queue.add(reader);
 1724  
                 }   
 1725  
 
 1726  0
                 uninitialized = false;
 1727  0
             }
 1728  
 
 1729  
             public void run() throws IOException {
 1730  0
                 initialize();
 1731  
                
 1732  0
                 while (queue.size() > 0) {
 1733  0
                     ShreddedReader top = queue.poll();
 1734  0
                     ShreddedReader next = null;
 1735  0
                     ShreddedBuffer nextBuffer = null; 
 1736  
                     
 1737  0
                     assert !top.getBuffer().isAtEnd();
 1738  
                                                   
 1739  0
                     if (queue.size() > 0) {
 1740  0
                         next = queue.peek();
 1741  0
                         nextBuffer = next.getBuffer();
 1742  0
                         assert !nextBuffer.isAtEnd();
 1743  
                     }
 1744  
                     
 1745  0
                     top.getBuffer().copyUntil(nextBuffer, processor);
 1746  0
                     if (top.getBuffer().isAtEnd())
 1747  0
                         top.fill();                 
 1748  
                         
 1749  0
                     if (!top.getBuffer().isAtEnd())
 1750  0
                         queue.add(top);
 1751  0
                 }              
 1752  
                 
 1753  0
                 if (closeOnExit)
 1754  0
                     processor.close();
 1755  0
             }
 1756  
 
 1757  
             public DocumentWordProbability read() throws IOException {
 1758  0
                 if (uninitialized)
 1759  0
                     initialize();
 1760  
 
 1761  0
                 DocumentWordProbability result = null;
 1762  
 
 1763  0
                 while (queue.size() > 0) {
 1764  0
                     ShreddedReader top = queue.poll();
 1765  0
                     result = top.read();
 1766  
 
 1767  0
                     if (result != null) {
 1768  0
                         if (top.getBuffer().isAtEnd())
 1769  0
                             top.fill();
 1770  
 
 1771  0
                         queue.offer(top);
 1772  0
                         break;
 1773  
                     } 
 1774  0
                 }
 1775  
 
 1776  0
                 return result;
 1777  
             }
 1778  
         } 
 1779  0
         public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<DocumentWordProbability>, ShreddedSource {      
 1780  
             public ShreddedProcessor processor;
 1781  
             ShreddedBuffer buffer;
 1782  0
             DocumentWordProbability last = new DocumentWordProbability();         
 1783  0
             long updateDocumentCount = -1;
 1784  0
             long tupleCount = 0;
 1785  0
             long bufferStartCount = 0;  
 1786  
             ArrayInput input;
 1787  
             
 1788  0
             public ShreddedReader(ArrayInput input) {
 1789  0
                 this.input = input; 
 1790  0
                 this.buffer = new ShreddedBuffer();
 1791  0
             }                               
 1792  
             
 1793  0
             public ShreddedReader(ArrayInput input, int bufferSize) { 
 1794  0
                 this.input = input;
 1795  0
                 this.buffer = new ShreddedBuffer(bufferSize);
 1796  0
             }
 1797  
                  
 1798  
             public final int compareTo(ShreddedReader other) {
 1799  0
                 ShreddedBuffer otherBuffer = other.getBuffer();
 1800  
                 
 1801  0
                 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
 1802  0
                     return 0;                 
 1803  0
                 } else if (buffer.isAtEnd()) {
 1804  0
                     return -1;
 1805  0
                 } else if (otherBuffer.isAtEnd()) {
 1806  0
                     return 1;
 1807  
                 }
 1808  
                                    
 1809  0
                 int result = 0;
 1810  
                 do {
 1811  0
                     result = + Utility.compare(buffer.getDocument(), otherBuffer.getDocument());
 1812  0
                     if(result != 0) break;
 1813  
                 } while (false);                                             
 1814  
                 
 1815  0
                 return result;
 1816  
             }
 1817  
             
 1818  
             public final ShreddedBuffer getBuffer() {
 1819  0
                 return buffer;
 1820  
             }                
 1821  
             
 1822  
             public final DocumentWordProbability read() throws IOException {
 1823  0
                 if (buffer.isAtEnd()) {
 1824  0
                     fill();             
 1825  
                 
 1826  0
                     if (buffer.isAtEnd()) {
 1827  0
                         return null;
 1828  
                     }
 1829  
                 }
 1830  
                       
 1831  0
                 assert !buffer.isAtEnd();
 1832  0
                 DocumentWordProbability result = new DocumentWordProbability();
 1833  
                 
 1834  0
                 result.document = buffer.getDocument();
 1835  0
                 result.word = buffer.getWord();
 1836  0
                 result.probability = buffer.getProbability();
 1837  
                 
 1838  0
                 buffer.incrementTuple();
 1839  0
                 buffer.autoIncrementDocument();
 1840  
                 
 1841  0
                 return result;
 1842  
             }           
 1843  
             
 1844  
             public final void fill() throws IOException {
 1845  
                 try {   
 1846  0
                     buffer.reset();
 1847  
                     
 1848  0
                     if (tupleCount != 0) {
 1849  
                                                       
 1850  0
                         if(updateDocumentCount - tupleCount > 0) {
 1851  0
                             buffer.documents.add(last.document);
 1852  0
                             buffer.documentTupleIdx.add((int) (updateDocumentCount - tupleCount));
 1853  
                         }
 1854  0
                         bufferStartCount = tupleCount;
 1855  
                     }
 1856  
                     
 1857  0
                     while (!buffer.isFull()) {
 1858  0
                         updateDocument();
 1859  0
                         buffer.processTuple(input.readBytes(), input.readDouble());
 1860  0
                         tupleCount++;
 1861  
                     }
 1862  0
                 } catch(EOFException e) {}
 1863  0
             }
 1864  
 
 1865  
             public final void updateDocument() throws IOException {
 1866  0
                 if (updateDocumentCount > tupleCount)
 1867  0
                     return;
 1868  
                      
 1869  0
                 last.document = input.readString();
 1870  0
                 updateDocumentCount = tupleCount + input.readInt();
 1871  
                                       
 1872  0
                 buffer.processDocument(last.document);
 1873  0
             }
 1874  
 
 1875  
             public void run() throws IOException {
 1876  
                 while (true) {
 1877  0
                     fill();
 1878  
                     
 1879  0
                     if (buffer.isAtEnd())
 1880  0
                         break;
 1881  
                     
 1882  0
                     buffer.copyUntil(null, processor);
 1883  
                 }      
 1884  0
                 processor.close();
 1885  0
             }
 1886  
             
 1887  
             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
 1888  0
                 if (processor instanceof ShreddedProcessor) {
 1889  0
                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
 1890  0
                 } else if (processor instanceof DocumentWordProbability.Processor) {
 1891  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentWordProbability.Processor) processor));
 1892  0
                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
 1893  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentWordProbability>) processor));
 1894  
                 } else {
 1895  0
                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
 1896  
                 }
 1897  0
             }                                
 1898  
             
 1899  
             public Class<DocumentWordProbability> getOutputClass() {
 1900  0
                 return DocumentWordProbability.class;
 1901  
             }                
 1902  
         }
 1903  
         
 1904  
         public static class DuplicateEliminator implements ShreddedProcessor {
 1905  
             public ShreddedProcessor processor;
 1906  0
             DocumentWordProbability last = new DocumentWordProbability();
 1907  0
             boolean documentProcess = true;
 1908  
                                            
 1909  0
             public DuplicateEliminator() {}
 1910  0
             public DuplicateEliminator(ShreddedProcessor processor) {
 1911  0
                 this.processor = processor;
 1912  0
             }
 1913  
             
 1914  
             public void setShreddedProcessor(ShreddedProcessor processor) {
 1915  0
                 this.processor = processor;
 1916  0
             }
 1917  
 
 1918  
             public void processDocument(String document) throws IOException {  
 1919  0
                 if (documentProcess || Utility.compare(document, last.document) != 0) {
 1920  0
                     last.document = document;
 1921  0
                     processor.processDocument(document);
 1922  0
                     documentProcess = false;
 1923  
                 }
 1924  0
             }  
 1925  
             
 1926  
             public void resetDocument() {
 1927  0
                  documentProcess = true;
 1928  0
             }                                                
 1929  
                                
 1930  
             public void processTuple(byte[] word, double probability) throws IOException {
 1931  0
                 processor.processTuple(word, probability);
 1932  0
             } 
 1933  
             
 1934  
             public void close() throws IOException {
 1935  0
                 processor.close();
 1936  0
             }                    
 1937  
         }
 1938  
         public static class TupleUnshredder implements ShreddedProcessor {
 1939  0
             DocumentWordProbability last = new DocumentWordProbability();
 1940  
             public org.galagosearch.tupleflow.Processor<DocumentWordProbability> processor;                               
 1941  
             
 1942  0
             public TupleUnshredder(DocumentWordProbability.Processor processor) {
 1943  0
                 this.processor = processor;
 1944  0
             }         
 1945  
             
 1946  0
             public TupleUnshredder(org.galagosearch.tupleflow.Processor<DocumentWordProbability> processor) {
 1947  0
                 this.processor = processor;
 1948  0
             }
 1949  
             
 1950  
             public DocumentWordProbability clone(DocumentWordProbability object) {
 1951  0
                 DocumentWordProbability result = new DocumentWordProbability();
 1952  0
                 if (object == null) return result;
 1953  0
                 result.document = object.document; 
 1954  0
                 result.word = object.word; 
 1955  0
                 result.probability = object.probability; 
 1956  0
                 return result;
 1957  
             }                 
 1958  
             
 1959  
             public void processDocument(String document) throws IOException {
 1960  0
                 last.document = document;
 1961  0
             }   
 1962  
                 
 1963  
             
 1964  
             public void processTuple(byte[] word, double probability) throws IOException {
 1965  0
                 last.word = word;
 1966  0
                 last.probability = probability;
 1967  0
                 processor.process(clone(last));
 1968  0
             }               
 1969  
             
 1970  
             public void close() throws IOException {
 1971  0
                 processor.close();
 1972  0
             }
 1973  
         }     
 1974  0
         public static class TupleShredder implements Processor {
 1975  0
             DocumentWordProbability last = new DocumentWordProbability();
 1976  
             public ShreddedProcessor processor;
 1977  
             
 1978  0
             public TupleShredder(ShreddedProcessor processor) {
 1979  0
                 this.processor = processor;
 1980  0
             }                              
 1981  
             
 1982  
             public DocumentWordProbability clone(DocumentWordProbability object) {
 1983  0
                 DocumentWordProbability result = new DocumentWordProbability();
 1984  0
                 if (object == null) return result;
 1985  0
                 result.document = object.document; 
 1986  0
                 result.word = object.word; 
 1987  0
                 result.probability = object.probability; 
 1988  0
                 return result;
 1989  
             }                 
 1990  
             
 1991  
             public void process(DocumentWordProbability object) throws IOException {                                                                                                                                                   
 1992  0
                 boolean processAll = false;
 1993  0
                 if(last == null || Utility.compare(last.document, object.document) != 0 || processAll) { processor.processDocument(object.document); processAll = true; }
 1994  0
                 processor.processTuple(object.word, object.probability);                                         
 1995  0
             }
 1996  
                           
 1997  
             public Class<DocumentWordProbability> getInputClass() {
 1998  0
                 return DocumentWordProbability.class;
 1999  
             }
 2000  
             
 2001  
             public void close() throws IOException {
 2002  0
                 processor.close();
 2003  0
             }                     
 2004  
         }
 2005  
     } 
 2006  
 }