Coverage Report - org.galagosearch.core.types.DocumentWordPosition
 
Classes in this File Line Coverage Branch Coverage Complexity
DocumentWordPosition
29%
4/14
50%
2/4
0
DocumentWordPosition$DocumentOrder
0%
0/24
0%
0/4
0
DocumentWordPosition$DocumentOrder$1
0%
0/5
0%
0/2
0
DocumentWordPosition$DocumentOrder$2
0%
0/5
0%
0/2
0
DocumentWordPosition$DocumentOrder$DuplicateEliminator
0%
0/19
0%
0/4
0
DocumentWordPosition$DocumentOrder$OrderedWriterClass
0%
0/14
0%
0/6
0
DocumentWordPosition$DocumentOrder$ShreddedBuffer
0%
0/78
0%
0/50
0
DocumentWordPosition$DocumentOrder$ShreddedCombiner
0%
0/55
0%
0/36
0
DocumentWordPosition$DocumentOrder$ShreddedProcessor
N/A
N/A
0
DocumentWordPosition$DocumentOrder$ShreddedReader
0%
0/70
0%
0/34
0
DocumentWordPosition$DocumentOrder$ShreddedSource
N/A
N/A
0
DocumentWordPosition$DocumentOrder$ShreddedWriter
0%
0/37
0%
0/14
0
DocumentWordPosition$DocumentOrder$TupleShredder
0%
0/18
0%
0/8
0
DocumentWordPosition$DocumentOrder$TupleUnshredder
0%
0/21
0%
0/2
0
DocumentWordPosition$DocumentWordPositionOrder
15%
4/26
0%
0/4
0
DocumentWordPosition$DocumentWordPositionOrder$1
0%
0/9
0%
0/6
0
DocumentWordPosition$DocumentWordPositionOrder$2
0%
0/9
0%
0/6
0
DocumentWordPosition$DocumentWordPositionOrder$DuplicateEliminator
0%
0/39
0%
0/12
0
DocumentWordPosition$DocumentWordPositionOrder$OrderedWriterClass
0%
0/16
0%
0/18
0
DocumentWordPosition$DocumentWordPositionOrder$ShreddedBuffer
0%
0/164
0%
0/122
0
DocumentWordPosition$DocumentWordPositionOrder$ShreddedCombiner
0%
0/55
0%
0/36
0
DocumentWordPosition$DocumentWordPositionOrder$ShreddedProcessor
N/A
N/A
0
DocumentWordPosition$DocumentWordPositionOrder$ShreddedReader
0%
0/98
0%
0/46
0
DocumentWordPosition$DocumentWordPositionOrder$ShreddedSource
N/A
N/A
0
DocumentWordPosition$DocumentWordPositionOrder$ShreddedWriter
0%
0/63
0%
0/30
0
DocumentWordPosition$DocumentWordPositionOrder$TupleShredder
0%
0/20
0%
0/20
0
DocumentWordPosition$DocumentWordPositionOrder$TupleUnshredder
0%
0/23
0%
0/2
0
DocumentWordPosition$Processor
N/A
N/A
0
DocumentWordPosition$Source
N/A
N/A
0
 
 1  
 // This file was automatically generated with the command: 
 2  
 //     java org.galagosearch.tupleflow.typebuilder.TypeBuilderMojo ...
 3  
 package org.galagosearch.core.types;
 4  
 
 5  
 import org.galagosearch.tupleflow.Utility;
 6  
 import org.galagosearch.tupleflow.ArrayInput;
 7  
 import org.galagosearch.tupleflow.ArrayOutput;
 8  
 import org.galagosearch.tupleflow.Order;   
 9  
 import org.galagosearch.tupleflow.OrderedWriter;
 10  
 import org.galagosearch.tupleflow.Type; 
 11  
 import org.galagosearch.tupleflow.TypeReader;
 12  
 import org.galagosearch.tupleflow.Step; 
 13  
 import org.galagosearch.tupleflow.IncompatibleProcessorException;
 14  
 import org.galagosearch.tupleflow.ReaderSource;
 15  
 import java.io.IOException;             
 16  
 import java.io.EOFException;
 17  
 import java.io.UnsupportedEncodingException;
 18  
 import java.util.ArrayList;
 19  
 import java.util.Arrays;   
 20  
 import java.util.Comparator;
 21  
 import java.util.PriorityQueue;
 22  
 import java.util.Collection;
 23  
 
 24  
 public class DocumentWordPosition implements Type<DocumentWordPosition> {
 25  
     public String document;
 26  
     public byte[] word;
 27  
     public int position; 
 28  
     
 29  28
     public DocumentWordPosition() {}
 30  0
     public DocumentWordPosition(String document, byte[] word, int position) {
 31  0
         this.document = document;
 32  0
         this.word = word;
 33  0
         this.position = position;
 34  0
     }  
 35  
     
 36  
     public String toString() {
 37  
         try {
 38  0
             return String.format("%s,%s,%d",
 39  
                                    document, new String(word, "UTF-8"), position);
 40  0
         } catch(UnsupportedEncodingException e) {
 41  0
             throw new RuntimeException("Couldn't convert string to UTF-8.");
 42  
         }
 43  
     } 
 44  
 
 45  
     public Order<DocumentWordPosition> getOrder(String... spec) {
 46  28
         if (Arrays.equals(spec, new String[] { "+document" })) {
 47  0
             return new DocumentOrder();
 48  
         }
 49  28
         if (Arrays.equals(spec, new String[] { "+document", "+word", "+position" })) {
 50  28
             return new DocumentWordPositionOrder();
 51  
         }
 52  0
         return null;
 53  
     } 
 54  
       
 55  
     public interface Processor extends Step, org.galagosearch.tupleflow.Processor<DocumentWordPosition> {
 56  
         public void process(DocumentWordPosition object) throws IOException;
 57  
         public void close() throws IOException;
 58  
     }                        
 59  
     public interface Source extends Step {
 60  
     }
 61  0
     public static class DocumentOrder implements Order<DocumentWordPosition> {
 62  
         public int hash(DocumentWordPosition object) {
 63  0
             int h = 0;
 64  0
             h += Utility.hash(object.document);
 65  0
             return h;
 66  
         } 
 67  
         public Comparator<DocumentWordPosition> greaterThan() {
 68  0
             return new Comparator<DocumentWordPosition>() {
 69  0
                 public int compare(DocumentWordPosition one, DocumentWordPosition two) {
 70  0
                     int result = 0;
 71  
                     do {
 72  0
                         result = + Utility.compare(one.document, two.document);
 73  0
                         if(result != 0) break;
 74  
                     } while (false);
 75  0
                     return -result;
 76  
                 }
 77  
             };
 78  
         }     
 79  
         public Comparator<DocumentWordPosition> lessThan() {
 80  0
             return new Comparator<DocumentWordPosition>() {
 81  0
                 public int compare(DocumentWordPosition one, DocumentWordPosition two) {
 82  0
                     int result = 0;
 83  
                     do {
 84  0
                         result = + Utility.compare(one.document, two.document);
 85  0
                         if(result != 0) break;
 86  
                     } while (false);
 87  0
                     return result;
 88  
                 }
 89  
             };
 90  
         }     
 91  
         public TypeReader<DocumentWordPosition> orderedReader(ArrayInput _input) {
 92  0
             return new ShreddedReader(_input);
 93  
         }    
 94  
 
 95  
         public TypeReader<DocumentWordPosition> orderedReader(ArrayInput _input, int bufferSize) {
 96  0
             return new ShreddedReader(_input, bufferSize);
 97  
         }    
 98  
         public OrderedWriter<DocumentWordPosition> orderedWriter(ArrayOutput _output) {
 99  0
             ShreddedWriter w = new ShreddedWriter(_output);
 100  0
             return new OrderedWriterClass(w); 
 101  
         }                                    
 102  0
         public static class OrderedWriterClass extends OrderedWriter< DocumentWordPosition > {
 103  0
             DocumentWordPosition last = null;
 104  0
             ShreddedWriter shreddedWriter = null; 
 105  
             
 106  0
             public OrderedWriterClass(ShreddedWriter s) {
 107  0
                 this.shreddedWriter = s;
 108  0
             }
 109  
             
 110  
             public void process(DocumentWordPosition object) throws IOException {
 111  0
                boolean processAll = false;
 112  0
                if (processAll || last == null || 0 != Utility.compare(object.document, last.document)) { processAll = true; shreddedWriter.processDocument(object.document); }
 113  0
                shreddedWriter.processTuple(object.word, object.position);
 114  0
                last = object;
 115  0
             }           
 116  
                  
 117  
             public void close() throws IOException {
 118  0
                 shreddedWriter.close();
 119  0
             }
 120  
             
 121  
             public Class<DocumentWordPosition> getInputClass() {
 122  0
                 return DocumentWordPosition.class;
 123  
             }
 124  
         } 
 125  
         public ReaderSource<DocumentWordPosition> orderedCombiner(Collection<TypeReader<DocumentWordPosition>> readers, boolean closeOnExit) {
 126  0
             ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
 127  
             
 128  0
             for (TypeReader<DocumentWordPosition> reader : readers) {
 129  0
                 shreddedReaders.add((ShreddedReader)reader);
 130  
             }
 131  
             
 132  0
             return new ShreddedCombiner(shreddedReaders, closeOnExit);
 133  
         }                  
 134  
         public DocumentWordPosition clone(DocumentWordPosition object) {
 135  0
             DocumentWordPosition result = new DocumentWordPosition();
 136  0
             if (object == null) return result;
 137  0
             result.document = object.document; 
 138  0
             result.word = object.word; 
 139  0
             result.position = object.position; 
 140  0
             return result;
 141  
         }                 
 142  
         public Class<DocumentWordPosition> getOrderedClass() {
 143  0
             return DocumentWordPosition.class;
 144  
         }                           
 145  
         public String[] getOrderSpec() {
 146  0
             return new String[] {"+document"};
 147  
         }
 148  
 
 149  
         public static String getSpecString() {
 150  0
             return "+document";
 151  
         }
 152  
                            
 153  
         public interface ShreddedProcessor extends Step {
 154  
             public void processDocument(String document) throws IOException;
 155  
             public void processTuple(byte[] word, int position) throws IOException;
 156  
             public void close() throws IOException;
 157  
         }    
 158  
         public interface ShreddedSource extends Step {
 159  
         }                                              
 160  
         
 161  0
         public static class ShreddedWriter implements ShreddedProcessor {
 162  
             ArrayOutput output;
 163  0
             ShreddedBuffer buffer = new ShreddedBuffer();
 164  
             String lastDocument;
 165  0
             boolean lastFlush = false;
 166  
             
 167  0
             public ShreddedWriter(ArrayOutput output) {
 168  0
                 this.output = output;
 169  0
             }                        
 170  
             
 171  
             public void close() throws IOException {
 172  0
                 flush();
 173  0
             }
 174  
             
 175  
             public void processDocument(String document) {
 176  0
                 lastDocument = document;
 177  0
                 buffer.processDocument(document);
 178  0
             }
 179  
             public final void processTuple(byte[] word, int position) throws IOException {
 180  0
                 if (lastFlush) {
 181  0
                     if(buffer.documents.size() == 0) buffer.processDocument(lastDocument);
 182  0
                     lastFlush = false;
 183  
                 }
 184  0
                 buffer.processTuple(word, position);
 185  0
                 if (buffer.isFull())
 186  0
                     flush();
 187  0
             }
 188  
             public final void flushTuples(int pauseIndex) throws IOException {
 189  
                 
 190  0
                 while (buffer.getReadIndex() < pauseIndex) {
 191  
                            
 192  0
                     output.writeBytes(buffer.getWord());
 193  0
                     output.writeInt(buffer.getPosition());
 194  0
                     buffer.incrementTuple();
 195  
                 }
 196  0
             }  
 197  
             public final void flushDocument(int pauseIndex) throws IOException {
 198  0
                 while (buffer.getReadIndex() < pauseIndex) {
 199  0
                     int nextPause = buffer.getDocumentEndIndex();
 200  0
                     int count = nextPause - buffer.getReadIndex();
 201  
                     
 202  0
                     output.writeString(buffer.getDocument());
 203  0
                     output.writeInt(count);
 204  0
                     buffer.incrementDocument();
 205  
                       
 206  0
                     flushTuples(nextPause);
 207  0
                     assert nextPause == buffer.getReadIndex();
 208  0
                 }
 209  0
             }
 210  
             public void flush() throws IOException { 
 211  0
                 flushDocument(buffer.getWriteIndex());
 212  0
                 buffer.reset(); 
 213  0
                 lastFlush = true;
 214  0
             }                           
 215  
         }
 216  0
         public static class ShreddedBuffer {
 217  0
             ArrayList<String> documents = new ArrayList();
 218  0
             ArrayList<Integer> documentTupleIdx = new ArrayList();
 219  0
             int documentReadIdx = 0;
 220  
                             
 221  
             byte[][] words;
 222  
             int[] positions;
 223  0
             int writeTupleIndex = 0;
 224  0
             int readTupleIndex = 0;
 225  
             int batchSize;
 226  
 
 227  0
             public ShreddedBuffer(int batchSize) {
 228  0
                 this.batchSize = batchSize;
 229  
 
 230  0
                 words = new byte[batchSize][];
 231  0
                 positions = new int[batchSize];
 232  0
             }                              
 233  
 
 234  
             public ShreddedBuffer() {    
 235  0
                 this(10000);
 236  0
             }                                                                                                                    
 237  
             
 238  
             public void processDocument(String document) {
 239  0
                 documents.add(document);
 240  0
                 documentTupleIdx.add(writeTupleIndex);
 241  0
             }                                      
 242  
             public void processTuple(byte[] word, int position) {
 243  0
                 assert documents.size() > 0;
 244  0
                 words[writeTupleIndex] = word;
 245  0
                 positions[writeTupleIndex] = position;
 246  0
                 writeTupleIndex++;
 247  0
             }
 248  
             public void resetData() {
 249  0
                 documents.clear();
 250  0
                 documentTupleIdx.clear();
 251  0
                 writeTupleIndex = 0;
 252  0
             }                  
 253  
                                  
 254  
             public void resetRead() {
 255  0
                 readTupleIndex = 0;
 256  0
                 documentReadIdx = 0;
 257  0
             } 
 258  
 
 259  
             public void reset() {
 260  0
                 resetData();
 261  0
                 resetRead();
 262  0
             } 
 263  
             public boolean isFull() {
 264  0
                 return writeTupleIndex >= batchSize;
 265  
             }
 266  
 
 267  
             public boolean isEmpty() {
 268  0
                 return writeTupleIndex == 0;
 269  
             }                          
 270  
 
 271  
             public boolean isAtEnd() {
 272  0
                 return readTupleIndex >= writeTupleIndex;
 273  
             }           
 274  
             public void incrementDocument() {
 275  0
                 documentReadIdx++;  
 276  0
             }                                                                                              
 277  
 
 278  
             public void autoIncrementDocument() {
 279  0
                 while (readTupleIndex >= getDocumentEndIndex() && readTupleIndex < writeTupleIndex)
 280  0
                     documentReadIdx++;
 281  0
             }                 
 282  
             public void incrementTuple() {
 283  0
                 readTupleIndex++;
 284  0
             }                    
 285  
             public int getDocumentEndIndex() {
 286  0
                 if ((documentReadIdx+1) >= documentTupleIdx.size())
 287  0
                     return writeTupleIndex;
 288  0
                 return documentTupleIdx.get(documentReadIdx+1);
 289  
             }
 290  
             public int getReadIndex() {
 291  0
                 return readTupleIndex;
 292  
             }   
 293  
 
 294  
             public int getWriteIndex() {
 295  0
                 return writeTupleIndex;
 296  
             } 
 297  
             public String getDocument() {
 298  0
                 assert readTupleIndex < writeTupleIndex;
 299  0
                 assert documentReadIdx < documents.size();
 300  
                 
 301  0
                 return documents.get(documentReadIdx);
 302  
             }
 303  
             public byte[] getWord() {
 304  0
                 assert readTupleIndex < writeTupleIndex;
 305  0
                 return words[readTupleIndex];
 306  
             }                                         
 307  
             public int getPosition() {
 308  0
                 assert readTupleIndex < writeTupleIndex;
 309  0
                 return positions[readTupleIndex];
 310  
             }                                         
 311  
             public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
 312  0
                 while (getReadIndex() < endIndex) {
 313  0
                    output.processTuple(getWord(), getPosition());
 314  0
                    incrementTuple();
 315  
                 }
 316  0
             }                                                                           
 317  
             public void copyUntilIndexDocument(int endIndex, ShreddedProcessor output) throws IOException {
 318  0
                 while (getReadIndex() < endIndex) {
 319  0
                     output.processDocument(getDocument());
 320  0
                     assert getDocumentEndIndex() <= endIndex;
 321  0
                     copyTuples(getDocumentEndIndex(), output);
 322  0
                     incrementDocument();
 323  
                 }
 324  0
             }  
 325  
             public void copyUntilDocument(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
 326  0
                 while (!isAtEnd()) {
 327  0
                     if (other != null) {   
 328  0
                         assert !other.isAtEnd();
 329  0
                         int c = + Utility.compare(getDocument(), other.getDocument());
 330  
                     
 331  0
                         if (c > 0) {
 332  0
                             break;   
 333  
                         }
 334  
                         
 335  0
                         output.processDocument(getDocument());
 336  
                                       
 337  0
                         copyTuples(getDocumentEndIndex(), output);
 338  0
                     } else {
 339  0
                         output.processDocument(getDocument());
 340  0
                         copyTuples(getDocumentEndIndex(), output);
 341  
                     }
 342  0
                     incrementDocument();  
 343  
                     
 344  
                
 345  
                 }
 346  0
             }
 347  
             public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
 348  0
                 copyUntilDocument(other, output);
 349  0
             }
 350  
             
 351  
         }                         
 352  0
         public static class ShreddedCombiner implements ReaderSource<DocumentWordPosition>, ShreddedSource {   
 353  
             public ShreddedProcessor processor;
 354  
             Collection<ShreddedReader> readers;       
 355  0
             boolean closeOnExit = false;
 356  0
             boolean uninitialized = true;
 357  0
             PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
 358  
             
 359  0
             public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
 360  0
                 this.readers = readers;                                                       
 361  0
                 this.closeOnExit = closeOnExit;
 362  0
             }
 363  
                                   
 364  
             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
 365  0
                 if (processor instanceof ShreddedProcessor) {
 366  0
                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
 367  0
                 } else if (processor instanceof DocumentWordPosition.Processor) {
 368  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentWordPosition.Processor) processor));
 369  0
                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
 370  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentWordPosition>) processor));
 371  
                 } else {
 372  0
                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
 373  
                 }
 374  0
             }                                
 375  
             
 376  
             public Class<DocumentWordPosition> getOutputClass() {
 377  0
                 return DocumentWordPosition.class;
 378  
             }
 379  
             
 380  
             public void initialize() throws IOException {
 381  0
                 for (ShreddedReader reader : readers) {
 382  0
                     reader.fill();                                        
 383  
                     
 384  0
                     if (!reader.getBuffer().isAtEnd())
 385  0
                         queue.add(reader);
 386  
                 }   
 387  
 
 388  0
                 uninitialized = false;
 389  0
             }
 390  
 
 391  
             public void run() throws IOException {
 392  0
                 initialize();
 393  
                
 394  0
                 while (queue.size() > 0) {
 395  0
                     ShreddedReader top = queue.poll();
 396  0
                     ShreddedReader next = null;
 397  0
                     ShreddedBuffer nextBuffer = null; 
 398  
                     
 399  0
                     assert !top.getBuffer().isAtEnd();
 400  
                                                   
 401  0
                     if (queue.size() > 0) {
 402  0
                         next = queue.peek();
 403  0
                         nextBuffer = next.getBuffer();
 404  0
                         assert !nextBuffer.isAtEnd();
 405  
                     }
 406  
                     
 407  0
                     top.getBuffer().copyUntil(nextBuffer, processor);
 408  0
                     if (top.getBuffer().isAtEnd())
 409  0
                         top.fill();                 
 410  
                         
 411  0
                     if (!top.getBuffer().isAtEnd())
 412  0
                         queue.add(top);
 413  0
                 }              
 414  
                 
 415  0
                 if (closeOnExit)
 416  0
                     processor.close();
 417  0
             }
 418  
 
 419  
             public DocumentWordPosition read() throws IOException {
 420  0
                 if (uninitialized)
 421  0
                     initialize();
 422  
 
 423  0
                 DocumentWordPosition result = null;
 424  
 
 425  0
                 while (queue.size() > 0) {
 426  0
                     ShreddedReader top = queue.poll();
 427  0
                     result = top.read();
 428  
 
 429  0
                     if (result != null) {
 430  0
                         if (top.getBuffer().isAtEnd())
 431  0
                             top.fill();
 432  
 
 433  0
                         queue.offer(top);
 434  0
                         break;
 435  
                     } 
 436  0
                 }
 437  
 
 438  0
                 return result;
 439  
             }
 440  
         } 
 441  0
         public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<DocumentWordPosition>, ShreddedSource {      
 442  
             public ShreddedProcessor processor;
 443  
             ShreddedBuffer buffer;
 444  0
             DocumentWordPosition last = new DocumentWordPosition();         
 445  0
             long updateDocumentCount = -1;
 446  0
             long tupleCount = 0;
 447  0
             long bufferStartCount = 0;  
 448  
             ArrayInput input;
 449  
             
 450  0
             public ShreddedReader(ArrayInput input) {
 451  0
                 this.input = input; 
 452  0
                 this.buffer = new ShreddedBuffer();
 453  0
             }                               
 454  
             
 455  0
             public ShreddedReader(ArrayInput input, int bufferSize) { 
 456  0
                 this.input = input;
 457  0
                 this.buffer = new ShreddedBuffer(bufferSize);
 458  0
             }
 459  
                  
 460  
             public final int compareTo(ShreddedReader other) {
 461  0
                 ShreddedBuffer otherBuffer = other.getBuffer();
 462  
                 
 463  0
                 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
 464  0
                     return 0;                 
 465  0
                 } else if (buffer.isAtEnd()) {
 466  0
                     return -1;
 467  0
                 } else if (otherBuffer.isAtEnd()) {
 468  0
                     return 1;
 469  
                 }
 470  
                                    
 471  0
                 int result = 0;
 472  
                 do {
 473  0
                     result = + Utility.compare(buffer.getDocument(), otherBuffer.getDocument());
 474  0
                     if(result != 0) break;
 475  
                 } while (false);                                             
 476  
                 
 477  0
                 return result;
 478  
             }
 479  
             
 480  
             public final ShreddedBuffer getBuffer() {
 481  0
                 return buffer;
 482  
             }                
 483  
             
 484  
             public final DocumentWordPosition read() throws IOException {
 485  0
                 if (buffer.isAtEnd()) {
 486  0
                     fill();             
 487  
                 
 488  0
                     if (buffer.isAtEnd()) {
 489  0
                         return null;
 490  
                     }
 491  
                 }
 492  
                       
 493  0
                 assert !buffer.isAtEnd();
 494  0
                 DocumentWordPosition result = new DocumentWordPosition();
 495  
                 
 496  0
                 result.document = buffer.getDocument();
 497  0
                 result.word = buffer.getWord();
 498  0
                 result.position = buffer.getPosition();
 499  
                 
 500  0
                 buffer.incrementTuple();
 501  0
                 buffer.autoIncrementDocument();
 502  
                 
 503  0
                 return result;
 504  
             }           
 505  
             
 506  
             public final void fill() throws IOException {
 507  
                 try {   
 508  0
                     buffer.reset();
 509  
                     
 510  0
                     if (tupleCount != 0) {
 511  
                                                       
 512  0
                         if(updateDocumentCount - tupleCount > 0) {
 513  0
                             buffer.documents.add(last.document);
 514  0
                             buffer.documentTupleIdx.add((int) (updateDocumentCount - tupleCount));
 515  
                         }
 516  0
                         bufferStartCount = tupleCount;
 517  
                     }
 518  
                     
 519  0
                     while (!buffer.isFull()) {
 520  0
                         updateDocument();
 521  0
                         buffer.processTuple(input.readBytes(), input.readInt());
 522  0
                         tupleCount++;
 523  
                     }
 524  0
                 } catch(EOFException e) {}
 525  0
             }
 526  
 
 527  
             public final void updateDocument() throws IOException {
 528  0
                 if (updateDocumentCount > tupleCount)
 529  0
                     return;
 530  
                      
 531  0
                 last.document = input.readString();
 532  0
                 updateDocumentCount = tupleCount + input.readInt();
 533  
                                       
 534  0
                 buffer.processDocument(last.document);
 535  0
             }
 536  
 
 537  
             public void run() throws IOException {
 538  
                 while (true) {
 539  0
                     fill();
 540  
                     
 541  0
                     if (buffer.isAtEnd())
 542  0
                         break;
 543  
                     
 544  0
                     buffer.copyUntil(null, processor);
 545  
                 }      
 546  0
                 processor.close();
 547  0
             }
 548  
             
 549  
             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
 550  0
                 if (processor instanceof ShreddedProcessor) {
 551  0
                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
 552  0
                 } else if (processor instanceof DocumentWordPosition.Processor) {
 553  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentWordPosition.Processor) processor));
 554  0
                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
 555  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentWordPosition>) processor));
 556  
                 } else {
 557  0
                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
 558  
                 }
 559  0
             }                                
 560  
             
 561  
             public Class<DocumentWordPosition> getOutputClass() {
 562  0
                 return DocumentWordPosition.class;
 563  
             }                
 564  
         }
 565  
         
 566  
         public static class DuplicateEliminator implements ShreddedProcessor {
 567  
             public ShreddedProcessor processor;
 568  0
             DocumentWordPosition last = new DocumentWordPosition();
 569  0
             boolean documentProcess = true;
 570  
                                            
 571  0
             public DuplicateEliminator() {}
 572  0
             public DuplicateEliminator(ShreddedProcessor processor) {
 573  0
                 this.processor = processor;
 574  0
             }
 575  
             
 576  
             public void setShreddedProcessor(ShreddedProcessor processor) {
 577  0
                 this.processor = processor;
 578  0
             }
 579  
 
 580  
             public void processDocument(String document) throws IOException {  
 581  0
                 if (documentProcess || Utility.compare(document, last.document) != 0) {
 582  0
                     last.document = document;
 583  0
                     processor.processDocument(document);
 584  0
                     documentProcess = false;
 585  
                 }
 586  0
             }  
 587  
             
 588  
             public void resetDocument() {
 589  0
                  documentProcess = true;
 590  0
             }                                                
 591  
                                
 592  
             public void processTuple(byte[] word, int position) throws IOException {
 593  0
                 processor.processTuple(word, position);
 594  0
             } 
 595  
             
 596  
             public void close() throws IOException {
 597  0
                 processor.close();
 598  0
             }                    
 599  
         }
 600  
         public static class TupleUnshredder implements ShreddedProcessor {
 601  0
             DocumentWordPosition last = new DocumentWordPosition();
 602  
             public org.galagosearch.tupleflow.Processor<DocumentWordPosition> processor;                               
 603  
             
 604  0
             public TupleUnshredder(DocumentWordPosition.Processor processor) {
 605  0
                 this.processor = processor;
 606  0
             }         
 607  
             
 608  0
             public TupleUnshredder(org.galagosearch.tupleflow.Processor<DocumentWordPosition> processor) {
 609  0
                 this.processor = processor;
 610  0
             }
 611  
             
 612  
             public DocumentWordPosition clone(DocumentWordPosition object) {
 613  0
                 DocumentWordPosition result = new DocumentWordPosition();
 614  0
                 if (object == null) return result;
 615  0
                 result.document = object.document; 
 616  0
                 result.word = object.word; 
 617  0
                 result.position = object.position; 
 618  0
                 return result;
 619  
             }                 
 620  
             
 621  
             public void processDocument(String document) throws IOException {
 622  0
                 last.document = document;
 623  0
             }   
 624  
                 
 625  
             
 626  
             public void processTuple(byte[] word, int position) throws IOException {
 627  0
                 last.word = word;
 628  0
                 last.position = position;
 629  0
                 processor.process(clone(last));
 630  0
             }               
 631  
             
 632  
             public void close() throws IOException {
 633  0
                 processor.close();
 634  0
             }
 635  
         }     
 636  0
         public static class TupleShredder implements Processor {
 637  0
             DocumentWordPosition last = new DocumentWordPosition();
 638  
             public ShreddedProcessor processor;
 639  
             
 640  0
             public TupleShredder(ShreddedProcessor processor) {
 641  0
                 this.processor = processor;
 642  0
             }                              
 643  
             
 644  
             public DocumentWordPosition clone(DocumentWordPosition object) {
 645  0
                 DocumentWordPosition result = new DocumentWordPosition();
 646  0
                 if (object == null) return result;
 647  0
                 result.document = object.document; 
 648  0
                 result.word = object.word; 
 649  0
                 result.position = object.position; 
 650  0
                 return result;
 651  
             }                 
 652  
             
 653  
             public void process(DocumentWordPosition object) throws IOException {                                                                                                                                                   
 654  0
                 boolean processAll = false;
 655  0
                 if(last == null || Utility.compare(last.document, object.document) != 0 || processAll) { processor.processDocument(object.document); processAll = true; }
 656  0
                 processor.processTuple(object.word, object.position);                                         
 657  0
             }
 658  
                           
 659  
             public Class<DocumentWordPosition> getInputClass() {
 660  0
                 return DocumentWordPosition.class;
 661  
             }
 662  
             
 663  
             public void close() throws IOException {
 664  0
                 processor.close();
 665  0
             }                     
 666  
         }
 667  
     } 
 668  108
     public static class DocumentWordPositionOrder implements Order<DocumentWordPosition> {
 669  
         public int hash(DocumentWordPosition object) {
 670  0
             int h = 0;
 671  0
             h += Utility.hash(object.document);
 672  0
             h += Utility.hash(object.word);
 673  0
             h += Utility.hash(object.position);
 674  0
             return h;
 675  
         } 
 676  
         public Comparator<DocumentWordPosition> greaterThan() {
 677  0
             return new Comparator<DocumentWordPosition>() {
 678  0
                 public int compare(DocumentWordPosition one, DocumentWordPosition two) {
 679  0
                     int result = 0;
 680  
                     do {
 681  0
                         result = + Utility.compare(one.document, two.document);
 682  0
                         if(result != 0) break;
 683  0
                         result = + Utility.compare(one.word, two.word);
 684  0
                         if(result != 0) break;
 685  0
                         result = + Utility.compare(one.position, two.position);
 686  0
                         if(result != 0) break;
 687  
                     } while (false);
 688  0
                     return -result;
 689  
                 }
 690  
             };
 691  
         }     
 692  
         public Comparator<DocumentWordPosition> lessThan() {
 693  0
             return new Comparator<DocumentWordPosition>() {
 694  0
                 public int compare(DocumentWordPosition one, DocumentWordPosition two) {
 695  0
                     int result = 0;
 696  
                     do {
 697  0
                         result = + Utility.compare(one.document, two.document);
 698  0
                         if(result != 0) break;
 699  0
                         result = + Utility.compare(one.word, two.word);
 700  0
                         if(result != 0) break;
 701  0
                         result = + Utility.compare(one.position, two.position);
 702  0
                         if(result != 0) break;
 703  
                     } while (false);
 704  0
                     return result;
 705  
                 }
 706  
             };
 707  
         }     
 708  
         /**
          * Creates a reader for tuples stored in this order.
          *
          * @param _input input the new {@code ShreddedReader} is constructed over
          * @return a new {@code ShreddedReader}
          */
         public TypeReader<DocumentWordPosition> orderedReader(ArrayInput _input) {
 709  0
             return new ShreddedReader(_input);
 710  
         }    
 711  
 
 712  
         /**
          * Creates a reader for tuples stored in this order, passing a buffer size through.
          *
          * @param _input     input the new {@code ShreddedReader} is constructed over
          * @param bufferSize value forwarded to the {@code ShreddedReader} constructor
          * @return a new {@code ShreddedReader}
          */
         public TypeReader<DocumentWordPosition> orderedReader(ArrayInput _input, int bufferSize) {
 713  0
             return new ShreddedReader(_input, bufferSize);
 714  
         }    
 715  
         public OrderedWriter<DocumentWordPosition> orderedWriter(ArrayOutput _output) {
 716  0
             ShreddedWriter w = new ShreddedWriter(_output);
 717  0
             return new OrderedWriterClass(w); 
 718  
         }                                    
 719  0
         public static class OrderedWriterClass extends OrderedWriter< DocumentWordPosition > {
 720  0
             DocumentWordPosition last = null;
 721  0
             ShreddedWriter shreddedWriter = null; 
 722  
             
 723  0
             public OrderedWriterClass(ShreddedWriter s) {
 724  0
                 this.shreddedWriter = s;
 725  0
             }
 726  
             
 727  
             public void process(DocumentWordPosition object) throws IOException {
 728  0
                boolean processAll = false;
 729  0
                if (processAll || last == null || 0 != Utility.compare(object.document, last.document)) { processAll = true; shreddedWriter.processDocument(object.document); }
 730  0
                if (processAll || last == null || 0 != Utility.compare(object.word, last.word)) { processAll = true; shreddedWriter.processWord(object.word); }
 731  0
                if (processAll || last == null || 0 != Utility.compare(object.position, last.position)) { processAll = true; shreddedWriter.processPosition(object.position); }
 732  0
                shreddedWriter.processTuple();
 733  0
                last = object;
 734  0
             }           
 735  
                  
 736  
             public void close() throws IOException {
 737  0
                 shreddedWriter.close();
 738  0
             }
 739  
             
 740  
             public Class<DocumentWordPosition> getInputClass() {
 741  0
                 return DocumentWordPosition.class;
 742  
             }
 743  
         } 
 744  
         public ReaderSource<DocumentWordPosition> orderedCombiner(Collection<TypeReader<DocumentWordPosition>> readers, boolean closeOnExit) {
 745  0
             ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
 746  
             
 747  0
             for (TypeReader<DocumentWordPosition> reader : readers) {
 748  0
                 shreddedReaders.add((ShreddedReader)reader);
 749  
             }
 750  
             
 751  0
             return new ShreddedCombiner(shreddedReaders, closeOnExit);
 752  
         }                  
 753  
         public DocumentWordPosition clone(DocumentWordPosition object) {
 754  0
             DocumentWordPosition result = new DocumentWordPosition();
 755  0
             if (object == null) return result;
 756  0
             result.document = object.document; 
 757  0
             result.word = object.word; 
 758  0
             result.position = object.position; 
 759  0
             return result;
 760  
         }                 
 761  
         /**
          * Returns the class object of the tuple type this order applies to.
          *
          * @return always {@code DocumentWordPosition.class}
          */
         public Class<DocumentWordPosition> getOrderedClass() {
 762  80
             return DocumentWordPosition.class;
 763  
         }                           
 764  
         /**
          * Returns the order specification as one entry per key field, in the same
          * field sequence the comparators use (document, word, position).
          *
          * @return a new array on every call
          */
         public String[] getOrderSpec() {
 765  80
             return new String[] {"+document", "+word", "+position"};
 766  
         }
 767  
 
 768  
         /**
          * Returns the order specification as a single space-separated string.
          *
          * @return the literal {@code "+document +word +position"}
          */
         public static String getSpecString() {
 769  0
             return "+document +word +position";
 770  
         }
 771  
                            
 772  
         /**
          * Receiver of a tuple stream delivered as per-field events rather than whole
          * tuples: one call per key field value, plus one call per tuple.
          */
         public interface ShreddedProcessor extends Step {
 773  
             /** Receives a document field value. */
             public void processDocument(String document) throws IOException;
 774  
             /** Receives a word field value. */
             public void processWord(byte[] word) throws IOException;
 775  
             /** Receives a position field value. */
             public void processPosition(int position) throws IOException;
 776  
             /** Receives one tuple; it carries no arguments. */
             public void processTuple() throws IOException;
 777  
             /** Signals that no more events will be delivered. */
             public void close() throws IOException;
 778  
         }    
 779  
         /** Marker interface with no methods of its own; implemented by the shredded combiner and reader. */
         public interface ShreddedSource extends Step {
 780  
         }                                              
 781  
         
 782  0
         public static class ShreddedWriter implements ShreddedProcessor {
 783  
             ArrayOutput output;
 784  0
             ShreddedBuffer buffer = new ShreddedBuffer();
 785  
             String lastDocument;
 786  
             byte[] lastWord;
 787  
             int lastPosition;
 788  0
             boolean lastFlush = false;
 789  
             
 790  0
             public ShreddedWriter(ArrayOutput output) {
 791  0
                 this.output = output;
 792  0
             }                        
 793  
             
 794  
             public void close() throws IOException {
 795  0
                 flush();
 796  0
             }
 797  
             
 798  
             public void processDocument(String document) {
 799  0
                 lastDocument = document;
 800  0
                 buffer.processDocument(document);
 801  0
             }
 802  
             public void processWord(byte[] word) {
 803  0
                 lastWord = word;
 804  0
                 buffer.processWord(word);
 805  0
             }
 806  
             public void processPosition(int position) {
 807  0
                 lastPosition = position;
 808  0
                 buffer.processPosition(position);
 809  0
             }
 810  
             public final void processTuple() throws IOException {
 811  0
                 if (lastFlush) {
 812  0
                     if(buffer.documents.size() == 0) buffer.processDocument(lastDocument);
 813  0
                     if(buffer.words.size() == 0) buffer.processWord(lastWord);
 814  0
                     if(buffer.positions.size() == 0) buffer.processPosition(lastPosition);
 815  0
                     lastFlush = false;
 816  
                 }
 817  0
                 buffer.processTuple();
 818  0
                 if (buffer.isFull())
 819  0
                     flush();
 820  0
             }
 821  
             public final void flushTuples(int pauseIndex) throws IOException {
 822  
                 
 823  0
                 while (buffer.getReadIndex() < pauseIndex) {
 824  
                            
 825  0
                     buffer.incrementTuple();
 826  
                 }
 827  0
             }  
 828  
             public final void flushDocument(int pauseIndex) throws IOException {
 829  0
                 while (buffer.getReadIndex() < pauseIndex) {
 830  0
                     int nextPause = buffer.getDocumentEndIndex();
 831  0
                     int count = nextPause - buffer.getReadIndex();
 832  
                     
 833  0
                     output.writeString(buffer.getDocument());
 834  0
                     output.writeInt(count);
 835  0
                     buffer.incrementDocument();
 836  
                       
 837  0
                     flushWord(nextPause);
 838  0
                     assert nextPause == buffer.getReadIndex();
 839  0
                 }
 840  0
             }
 841  
             public final void flushWord(int pauseIndex) throws IOException {
 842  0
                 while (buffer.getReadIndex() < pauseIndex) {
 843  0
                     int nextPause = buffer.getWordEndIndex();
 844  0
                     int count = nextPause - buffer.getReadIndex();
 845  
                     
 846  0
                     output.writeBytes(buffer.getWord());
 847  0
                     output.writeInt(count);
 848  0
                     buffer.incrementWord();
 849  
                       
 850  0
                     flushPosition(nextPause);
 851  0
                     assert nextPause == buffer.getReadIndex();
 852  0
                 }
 853  0
             }
 854  
             public final void flushPosition(int pauseIndex) throws IOException {
 855  0
                 while (buffer.getReadIndex() < pauseIndex) {
 856  0
                     int nextPause = buffer.getPositionEndIndex();
 857  0
                     int count = nextPause - buffer.getReadIndex();
 858  
                     
 859  0
                     output.writeInt(buffer.getPosition());
 860  0
                     output.writeInt(count);
 861  0
                     buffer.incrementPosition();
 862  
                       
 863  0
                     flushTuples(nextPause);
 864  0
                     assert nextPause == buffer.getReadIndex();
 865  0
                 }
 866  0
             }
 867  
             public void flush() throws IOException { 
 868  0
                 flushDocument(buffer.getWriteIndex());
 869  0
                 buffer.reset(); 
 870  0
                 lastFlush = true;
 871  0
             }                           
 872  
         }
 873  0
         public static class ShreddedBuffer {
 874  0
             ArrayList<String> documents = new ArrayList();
 875  0
             ArrayList<byte[]> words = new ArrayList();
 876  0
             ArrayList<Integer> positions = new ArrayList();
 877  0
             ArrayList<Integer> documentTupleIdx = new ArrayList();
 878  0
             ArrayList<Integer> wordTupleIdx = new ArrayList();
 879  0
             ArrayList<Integer> positionTupleIdx = new ArrayList();
 880  0
             int documentReadIdx = 0;
 881  0
             int wordReadIdx = 0;
 882  0
             int positionReadIdx = 0;
 883  
                             
 884  0
             int writeTupleIndex = 0;
 885  0
             int readTupleIndex = 0;
 886  
             int batchSize;
 887  
 
 888  0
             public ShreddedBuffer(int batchSize) {
 889  0
                 this.batchSize = batchSize;
 890  
 
 891  0
             }                              
 892  
 
 893  
             public ShreddedBuffer() {    
 894  0
                 this(10000);
 895  0
             }                                                                                                                    
 896  
             
 897  
             public void processDocument(String document) {
 898  0
                 documents.add(document);
 899  0
                 documentTupleIdx.add(writeTupleIndex);
 900  0
             }                                      
 901  
             public void processWord(byte[] word) {
 902  0
                 words.add(word);
 903  0
                 wordTupleIdx.add(writeTupleIndex);
 904  0
             }                                      
 905  
             public void processPosition(int position) {
 906  0
                 positions.add(position);
 907  0
                 positionTupleIdx.add(writeTupleIndex);
 908  0
             }                                      
 909  
             public void processTuple() {
 910  0
                 assert documents.size() > 0;
 911  0
                 assert words.size() > 0;
 912  0
                 assert positions.size() > 0;
 913  0
                 writeTupleIndex++;
 914  0
             }
 915  
             public void resetData() {
 916  0
                 documents.clear();
 917  0
                 words.clear();
 918  0
                 positions.clear();
 919  0
                 documentTupleIdx.clear();
 920  0
                 wordTupleIdx.clear();
 921  0
                 positionTupleIdx.clear();
 922  0
                 writeTupleIndex = 0;
 923  0
             }                  
 924  
                                  
 925  
             public void resetRead() {
 926  0
                 readTupleIndex = 0;
 927  0
                 documentReadIdx = 0;
 928  0
                 wordReadIdx = 0;
 929  0
                 positionReadIdx = 0;
 930  0
             } 
 931  
 
 932  
             public void reset() {
 933  0
                 resetData();
 934  0
                 resetRead();
 935  0
             } 
 936  
             public boolean isFull() {
 937  0
                 return writeTupleIndex >= batchSize;
 938  
             }
 939  
 
 940  
             public boolean isEmpty() {
 941  0
                 return writeTupleIndex == 0;
 942  
             }                          
 943  
 
 944  
             public boolean isAtEnd() {
 945  0
                 return readTupleIndex >= writeTupleIndex;
 946  
             }           
 947  
             public void incrementDocument() {
 948  0
                 documentReadIdx++;  
 949  0
             }                                                                                              
 950  
 
 951  
             public void autoIncrementDocument() {
 952  0
                 while (readTupleIndex >= getDocumentEndIndex() && readTupleIndex < writeTupleIndex)
 953  0
                     documentReadIdx++;
 954  0
             }                 
 955  
             public void incrementWord() {
 956  0
                 wordReadIdx++;  
 957  0
             }                                                                                              
 958  
 
 959  
             public void autoIncrementWord() {
 960  0
                 while (readTupleIndex >= getWordEndIndex() && readTupleIndex < writeTupleIndex)
 961  0
                     wordReadIdx++;
 962  0
             }                 
 963  
             public void incrementPosition() {
 964  0
                 positionReadIdx++;  
 965  0
             }                                                                                              
 966  
 
 967  
             public void autoIncrementPosition() {
 968  0
                 while (readTupleIndex >= getPositionEndIndex() && readTupleIndex < writeTupleIndex)
 969  0
                     positionReadIdx++;
 970  0
             }                 
 971  
             public void incrementTuple() {
 972  0
                 readTupleIndex++;
 973  0
             }                    
 974  
             public int getDocumentEndIndex() {
 975  0
                 if ((documentReadIdx+1) >= documentTupleIdx.size())
 976  0
                     return writeTupleIndex;
 977  0
                 return documentTupleIdx.get(documentReadIdx+1);
 978  
             }
 979  
 
 980  
             public int getWordEndIndex() {
 981  0
                 if ((wordReadIdx+1) >= wordTupleIdx.size())
 982  0
                     return writeTupleIndex;
 983  0
                 return wordTupleIdx.get(wordReadIdx+1);
 984  
             }
 985  
 
 986  
             public int getPositionEndIndex() {
 987  0
                 if ((positionReadIdx+1) >= positionTupleIdx.size())
 988  0
                     return writeTupleIndex;
 989  0
                 return positionTupleIdx.get(positionReadIdx+1);
 990  
             }
 991  
             public int getReadIndex() {
 992  0
                 return readTupleIndex;
 993  
             }   
 994  
 
 995  
             public int getWriteIndex() {
 996  0
                 return writeTupleIndex;
 997  
             } 
 998  
             public String getDocument() {
 999  0
                 assert readTupleIndex < writeTupleIndex;
 1000  0
                 assert documentReadIdx < documents.size();
 1001  
                 
 1002  0
                 return documents.get(documentReadIdx);
 1003  
             }
 1004  
             public byte[] getWord() {
 1005  0
                 assert readTupleIndex < writeTupleIndex;
 1006  0
                 assert wordReadIdx < words.size();
 1007  
                 
 1008  0
                 return words.get(wordReadIdx);
 1009  
             }
 1010  
             public int getPosition() {
 1011  0
                 assert readTupleIndex < writeTupleIndex;
 1012  0
                 assert positionReadIdx < positions.size();
 1013  
                 
 1014  0
                 return positions.get(positionReadIdx);
 1015  
             }
 1016  
 
 1017  
             public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
 1018  0
                 while (getReadIndex() < endIndex) {
 1019  0
                    output.processTuple();
 1020  0
                    incrementTuple();
 1021  
                 }
 1022  0
             }                                                                           
 1023  
             public void copyUntilIndexDocument(int endIndex, ShreddedProcessor output) throws IOException {
 1024  0
                 while (getReadIndex() < endIndex) {
 1025  0
                     output.processDocument(getDocument());
 1026  0
                     assert getDocumentEndIndex() <= endIndex;
 1027  0
                     copyUntilIndexWord(getDocumentEndIndex(), output);
 1028  0
                     incrementDocument();
 1029  
                 }
 1030  0
             } 
 1031  
             public void copyUntilIndexWord(int endIndex, ShreddedProcessor output) throws IOException {
 1032  0
                 while (getReadIndex() < endIndex) {
 1033  0
                     output.processWord(getWord());
 1034  0
                     assert getWordEndIndex() <= endIndex;
 1035  0
                     copyUntilIndexPosition(getWordEndIndex(), output);
 1036  0
                     incrementWord();
 1037  
                 }
 1038  0
             } 
 1039  
             public void copyUntilIndexPosition(int endIndex, ShreddedProcessor output) throws IOException {
 1040  0
                 while (getReadIndex() < endIndex) {
 1041  0
                     output.processPosition(getPosition());
 1042  0
                     assert getPositionEndIndex() <= endIndex;
 1043  0
                     copyTuples(getPositionEndIndex(), output);
 1044  0
                     incrementPosition();
 1045  
                 }
 1046  0
             }  
 1047  
             public void copyUntilDocument(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
 1048  0
                 while (!isAtEnd()) {
 1049  0
                     if (other != null) {   
 1050  0
                         assert !other.isAtEnd();
 1051  0
                         int c = + Utility.compare(getDocument(), other.getDocument());
 1052  
                     
 1053  0
                         if (c > 0) {
 1054  0
                             break;   
 1055  
                         }
 1056  
                         
 1057  0
                         output.processDocument(getDocument());
 1058  
                                       
 1059  0
                         if (c < 0) {
 1060  0
                             copyUntilIndexWord(getDocumentEndIndex(), output);
 1061  0
                         } else if (c == 0) {
 1062  0
                             copyUntilWord(other, output);
 1063  0
                             autoIncrementDocument();
 1064  0
                             break;
 1065  
                         }
 1066  0
                     } else {
 1067  0
                         output.processDocument(getDocument());
 1068  0
                         copyUntilIndexWord(getDocumentEndIndex(), output);
 1069  
                     }
 1070  0
                     incrementDocument();  
 1071  
                     
 1072  
                
 1073  
                 }
 1074  0
             }
 1075  
             public void copyUntilWord(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
 1076  0
                 while (!isAtEnd()) {
 1077  0
                     if (other != null) {   
 1078  0
                         assert !other.isAtEnd();
 1079  0
                         int c = + Utility.compare(getWord(), other.getWord());
 1080  
                     
 1081  0
                         if (c > 0) {
 1082  0
                             break;   
 1083  
                         }
 1084  
                         
 1085  0
                         output.processWord(getWord());
 1086  
                                       
 1087  0
                         if (c < 0) {
 1088  0
                             copyUntilIndexPosition(getWordEndIndex(), output);
 1089  0
                         } else if (c == 0) {
 1090  0
                             copyUntilPosition(other, output);
 1091  0
                             autoIncrementWord();
 1092  0
                             break;
 1093  
                         }
 1094  0
                     } else {
 1095  0
                         output.processWord(getWord());
 1096  0
                         copyUntilIndexPosition(getWordEndIndex(), output);
 1097  
                     }
 1098  0
                     incrementWord();  
 1099  
                     
 1100  0
                     if (getDocumentEndIndex() <= readTupleIndex)
 1101  0
                         break;   
 1102  
                 }
 1103  0
             }
 1104  
             public void copyUntilPosition(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
 1105  0
                 while (!isAtEnd()) {
 1106  0
                     if (other != null) {   
 1107  0
                         assert !other.isAtEnd();
 1108  0
                         int c = + Utility.compare(getPosition(), other.getPosition());
 1109  
                     
 1110  0
                         if (c > 0) {
 1111  0
                             break;   
 1112  
                         }
 1113  
                         
 1114  0
                         output.processPosition(getPosition());
 1115  
                                       
 1116  0
                         copyTuples(getPositionEndIndex(), output);
 1117  0
                     } else {
 1118  0
                         output.processPosition(getPosition());
 1119  0
                         copyTuples(getPositionEndIndex(), output);
 1120  
                     }
 1121  0
                     incrementPosition();  
 1122  
                     
 1123  0
                     if (getWordEndIndex() <= readTupleIndex)
 1124  0
                         break;   
 1125  
                 }
 1126  0
             }
 1127  
             public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
 1128  0
                 copyUntilDocument(other, output);
 1129  0
             }
 1130  
             
 1131  
         }                         
 1132  0
         public static class ShreddedCombiner implements ReaderSource<DocumentWordPosition>, ShreddedSource {   
 1133  
             public ShreddedProcessor processor;
 1134  
             Collection<ShreddedReader> readers;       
 1135  0
             boolean closeOnExit = false;
 1136  0
             boolean uninitialized = true;
 1137  0
             PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
 1138  
             
 1139  0
             public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
 1140  0
                 this.readers = readers;                                                       
 1141  0
                 this.closeOnExit = closeOnExit;
 1142  0
             }
 1143  
                                   
 1144  
             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
 1145  0
                 if (processor instanceof ShreddedProcessor) {
 1146  0
                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
 1147  0
                 } else if (processor instanceof DocumentWordPosition.Processor) {
 1148  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentWordPosition.Processor) processor));
 1149  0
                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
 1150  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentWordPosition>) processor));
 1151  
                 } else {
 1152  0
                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
 1153  
                 }
 1154  0
             }                                
 1155  
             
 1156  
             public Class<DocumentWordPosition> getOutputClass() {
 1157  0
                 return DocumentWordPosition.class;
 1158  
             }
 1159  
             
 1160  
             public void initialize() throws IOException {
 1161  0
                 for (ShreddedReader reader : readers) {
 1162  0
                     reader.fill();                                        
 1163  
                     
 1164  0
                     if (!reader.getBuffer().isAtEnd())
 1165  0
                         queue.add(reader);
 1166  
                 }   
 1167  
 
 1168  0
                 uninitialized = false;
 1169  0
             }
 1170  
 
 1171  
             public void run() throws IOException {
 1172  0
                 initialize();
 1173  
                
 1174  0
                 while (queue.size() > 0) {
 1175  0
                     ShreddedReader top = queue.poll();
 1176  0
                     ShreddedReader next = null;
 1177  0
                     ShreddedBuffer nextBuffer = null; 
 1178  
                     
 1179  0
                     assert !top.getBuffer().isAtEnd();
 1180  
                                                   
 1181  0
                     if (queue.size() > 0) {
 1182  0
                         next = queue.peek();
 1183  0
                         nextBuffer = next.getBuffer();
 1184  0
                         assert !nextBuffer.isAtEnd();
 1185  
                     }
 1186  
                     
 1187  0
                     top.getBuffer().copyUntil(nextBuffer, processor);
 1188  0
                     if (top.getBuffer().isAtEnd())
 1189  0
                         top.fill();                 
 1190  
                         
 1191  0
                     if (!top.getBuffer().isAtEnd())
 1192  0
                         queue.add(top);
 1193  0
                 }              
 1194  
                 
 1195  0
                 if (closeOnExit)
 1196  0
                     processor.close();
 1197  0
             }
 1198  
 
 1199  
             public DocumentWordPosition read() throws IOException {
 1200  0
                 if (uninitialized)
 1201  0
                     initialize();
 1202  
 
 1203  0
                 DocumentWordPosition result = null;
 1204  
 
 1205  0
                 while (queue.size() > 0) {
 1206  0
                     ShreddedReader top = queue.poll();
 1207  0
                     result = top.read();
 1208  
 
 1209  0
                     if (result != null) {
 1210  0
                         if (top.getBuffer().isAtEnd())
 1211  0
                             top.fill();
 1212  
 
 1213  0
                         queue.offer(top);
 1214  0
                         break;
 1215  
                     } 
 1216  0
                 }
 1217  
 
 1218  0
                 return result;
 1219  
             }
 1220  
         } 
 1221  0
         public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<DocumentWordPosition>, ShreddedSource {      
 1222  
             public ShreddedProcessor processor;
 1223  
             ShreddedBuffer buffer;
 1224  0
             DocumentWordPosition last = new DocumentWordPosition();         
 1225  0
             long updateDocumentCount = -1;
 1226  0
             long updateWordCount = -1;
 1227  0
             long updatePositionCount = -1;
 1228  0
             long tupleCount = 0;
 1229  0
             long bufferStartCount = 0;  
 1230  
             ArrayInput input;
 1231  
             
 1232  0
             public ShreddedReader(ArrayInput input) {
 1233  0
                 this.input = input; 
 1234  0
                 this.buffer = new ShreddedBuffer();
 1235  0
             }                               
 1236  
             
 1237  0
             public ShreddedReader(ArrayInput input, int bufferSize) { 
 1238  0
                 this.input = input;
 1239  0
                 this.buffer = new ShreddedBuffer(bufferSize);
 1240  0
             }
 1241  
                  
 1242  
             public final int compareTo(ShreddedReader other) {
 1243  0
                 ShreddedBuffer otherBuffer = other.getBuffer();
 1244  
                 
 1245  0
                 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
 1246  0
                     return 0;                 
 1247  0
                 } else if (buffer.isAtEnd()) {
 1248  0
                     return -1;
 1249  0
                 } else if (otherBuffer.isAtEnd()) {
 1250  0
                     return 1;
 1251  
                 }
 1252  
                                    
 1253  0
                 int result = 0;
 1254  
                 do {
 1255  0
                     result = + Utility.compare(buffer.getDocument(), otherBuffer.getDocument());
 1256  0
                     if(result != 0) break;
 1257  0
                     result = + Utility.compare(buffer.getWord(), otherBuffer.getWord());
 1258  0
                     if(result != 0) break;
 1259  0
                     result = + Utility.compare(buffer.getPosition(), otherBuffer.getPosition());
 1260  0
                     if(result != 0) break;
 1261  
                 } while (false);                                             
 1262  
                 
 1263  0
                 return result;
 1264  
             }
 1265  
             
 1266  
             public final ShreddedBuffer getBuffer() {
 1267  0
                 return buffer;
 1268  
             }                
 1269  
             
 1270  
             public final DocumentWordPosition read() throws IOException {
 1271  0
                 if (buffer.isAtEnd()) {
 1272  0
                     fill();             
 1273  
                 
 1274  0
                     if (buffer.isAtEnd()) {
 1275  0
                         return null;
 1276  
                     }
 1277  
                 }
 1278  
                       
 1279  0
                 assert !buffer.isAtEnd();
 1280  0
                 DocumentWordPosition result = new DocumentWordPosition();
 1281  
                 
 1282  0
                 result.document = buffer.getDocument();
 1283  0
                 result.word = buffer.getWord();
 1284  0
                 result.position = buffer.getPosition();
 1285  
                 
 1286  0
                 buffer.incrementTuple();
 1287  0
                 buffer.autoIncrementDocument();
 1288  0
                 buffer.autoIncrementWord();
 1289  0
                 buffer.autoIncrementPosition();
 1290  
                 
 1291  0
                 return result;
 1292  
             }           
 1293  
             
 1294  
             public final void fill() throws IOException {
 1295  
                 try {   
 1296  0
                     buffer.reset();
 1297  
                     
 1298  0
                     if (tupleCount != 0) {
 1299  
                                                       
 1300  0
                         if(updateDocumentCount - tupleCount > 0) {
 1301  0
                             buffer.documents.add(last.document);
 1302  0
                             buffer.documentTupleIdx.add((int) (updateDocumentCount - tupleCount));
 1303  
                         }                              
 1304  0
                         if(updateWordCount - tupleCount > 0) {
 1305  0
                             buffer.words.add(last.word);
 1306  0
                             buffer.wordTupleIdx.add((int) (updateWordCount - tupleCount));
 1307  
                         }                              
 1308  0
                         if(updatePositionCount - tupleCount > 0) {
 1309  0
                             buffer.positions.add(last.position);
 1310  0
                             buffer.positionTupleIdx.add((int) (updatePositionCount - tupleCount));
 1311  
                         }
 1312  0
                         bufferStartCount = tupleCount;
 1313  
                     }
 1314  
                     
 1315  0
                     while (!buffer.isFull()) {
 1316  0
                         updatePosition();
 1317  0
                         buffer.processTuple();
 1318  0
                         tupleCount++;
 1319  
                     }
 1320  0
                 } catch(EOFException e) {}
 1321  0
             }
 1322  
 
 1323  
             public final void updateDocument() throws IOException {
 1324  0
                 if (updateDocumentCount > tupleCount)
 1325  0
                     return;
 1326  
                      
 1327  0
                 last.document = input.readString();
 1328  0
                 updateDocumentCount = tupleCount + input.readInt();
 1329  
                                       
 1330  0
                 buffer.processDocument(last.document);
 1331  0
             }
 1332  
             public final void updateWord() throws IOException {
 1333  0
                 if (updateWordCount > tupleCount)
 1334  0
                     return;
 1335  
                      
 1336  0
                 updateDocument();
 1337  0
                 last.word = input.readBytes();
 1338  0
                 updateWordCount = tupleCount + input.readInt();
 1339  
                                       
 1340  0
                 buffer.processWord(last.word);
 1341  0
             }
 1342  
             public final void updatePosition() throws IOException {
 1343  0
                 if (updatePositionCount > tupleCount)
 1344  0
                     return;
 1345  
                      
 1346  0
                 updateWord();
 1347  0
                 last.position = input.readInt();
 1348  0
                 updatePositionCount = tupleCount + input.readInt();
 1349  
                                       
 1350  0
                 buffer.processPosition(last.position);
 1351  0
             }
 1352  
 
 1353  
             public void run() throws IOException {
 1354  
                 while (true) {
 1355  0
                     fill();
 1356  
                     
 1357  0
                     if (buffer.isAtEnd())
 1358  0
                         break;
 1359  
                     
 1360  0
                     buffer.copyUntil(null, processor);
 1361  
                 }      
 1362  0
                 processor.close();
 1363  0
             }
 1364  
             
 1365  
             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
 1366  0
                 if (processor instanceof ShreddedProcessor) {
 1367  0
                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
 1368  0
                 } else if (processor instanceof DocumentWordPosition.Processor) {
 1369  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentWordPosition.Processor) processor));
 1370  0
                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
 1371  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentWordPosition>) processor));
 1372  
                 } else {
 1373  0
                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
 1374  
                 }
 1375  0
             }                                
 1376  
             
 1377  
             public Class<DocumentWordPosition> getOutputClass() {
 1378  0
                 return DocumentWordPosition.class;
 1379  
             }                
 1380  
         }
 1381  
         
 1382  
         public static class DuplicateEliminator implements ShreddedProcessor {
 1383  
             public ShreddedProcessor processor;
 1384  0
             DocumentWordPosition last = new DocumentWordPosition();
 1385  0
             boolean documentProcess = true;
 1386  0
             boolean wordProcess = true;
 1387  0
             boolean positionProcess = true;
 1388  
                                            
 1389  0
             public DuplicateEliminator() {}
 1390  0
             public DuplicateEliminator(ShreddedProcessor processor) {
 1391  0
                 this.processor = processor;
 1392  0
             }
 1393  
             
 1394  
             public void setShreddedProcessor(ShreddedProcessor processor) {
 1395  0
                 this.processor = processor;
 1396  0
             }
 1397  
 
 1398  
             public void processDocument(String document) throws IOException {  
 1399  0
                 if (documentProcess || Utility.compare(document, last.document) != 0) {
 1400  0
                     last.document = document;
 1401  0
                     processor.processDocument(document);
 1402  0
             resetWord();
 1403  0
                     documentProcess = false;
 1404  
                 }
 1405  0
             }
 1406  
             public void processWord(byte[] word) throws IOException {  
 1407  0
                 if (wordProcess || Utility.compare(word, last.word) != 0) {
 1408  0
                     last.word = word;
 1409  0
                     processor.processWord(word);
 1410  0
             resetPosition();
 1411  0
                     wordProcess = false;
 1412  
                 }
 1413  0
             }
 1414  
             public void processPosition(int position) throws IOException {  
 1415  0
                 if (positionProcess || Utility.compare(position, last.position) != 0) {
 1416  0
                     last.position = position;
 1417  0
                     processor.processPosition(position);
 1418  0
                     positionProcess = false;
 1419  
                 }
 1420  0
             }  
 1421  
             
 1422  
             public void resetDocument() {
 1423  0
                  documentProcess = true;
 1424  0
             resetWord();
 1425  0
             }                                                
 1426  
             public void resetWord() {
 1427  0
                  wordProcess = true;
 1428  0
             resetPosition();
 1429  0
             }                                                
 1430  
             public void resetPosition() {
 1431  0
                  positionProcess = true;
 1432  0
             }                                                
 1433  
                                
 1434  
             public void processTuple() throws IOException {
 1435  0
                 processor.processTuple();
 1436  0
             } 
 1437  
             
 1438  
             public void close() throws IOException {
 1439  0
                 processor.close();
 1440  0
             }                    
 1441  
         }
 1442  
         public static class TupleUnshredder implements ShreddedProcessor {
 1443  0
             DocumentWordPosition last = new DocumentWordPosition();
 1444  
             public org.galagosearch.tupleflow.Processor<DocumentWordPosition> processor;                               
 1445  
             
 1446  0
             public TupleUnshredder(DocumentWordPosition.Processor processor) {
 1447  0
                 this.processor = processor;
 1448  0
             }         
 1449  
             
 1450  0
             public TupleUnshredder(org.galagosearch.tupleflow.Processor<DocumentWordPosition> processor) {
 1451  0
                 this.processor = processor;
 1452  0
             }
 1453  
             
 1454  
             public DocumentWordPosition clone(DocumentWordPosition object) {
 1455  0
                 DocumentWordPosition result = new DocumentWordPosition();
 1456  0
                 if (object == null) return result;
 1457  0
                 result.document = object.document; 
 1458  0
                 result.word = object.word; 
 1459  0
                 result.position = object.position; 
 1460  0
                 return result;
 1461  
             }                 
 1462  
             
 1463  
             public void processDocument(String document) throws IOException {
 1464  0
                 last.document = document;
 1465  0
             }   
 1466  
                 
 1467  
             public void processWord(byte[] word) throws IOException {
 1468  0
                 last.word = word;
 1469  0
             }   
 1470  
                 
 1471  
             public void processPosition(int position) throws IOException {
 1472  0
                 last.position = position;
 1473  0
             }   
 1474  
                 
 1475  
             
 1476  
             public void processTuple() throws IOException {
 1477  0
                 processor.process(clone(last));
 1478  0
             }               
 1479  
             
 1480  
             public void close() throws IOException {
 1481  0
                 processor.close();
 1482  0
             }
 1483  
         }     
 1484  0
         public static class TupleShredder implements Processor {
 1485  0
             DocumentWordPosition last = new DocumentWordPosition();
 1486  
             public ShreddedProcessor processor;
 1487  
             
 1488  0
             public TupleShredder(ShreddedProcessor processor) {
 1489  0
                 this.processor = processor;
 1490  0
             }                              
 1491  
             
 1492  
             public DocumentWordPosition clone(DocumentWordPosition object) {
 1493  0
                 DocumentWordPosition result = new DocumentWordPosition();
 1494  0
                 if (object == null) return result;
 1495  0
                 result.document = object.document; 
 1496  0
                 result.word = object.word; 
 1497  0
                 result.position = object.position; 
 1498  0
                 return result;
 1499  
             }                 
 1500  
             
 1501  
             public void process(DocumentWordPosition object) throws IOException {                                                                                                                                                   
 1502  0
                 boolean processAll = false;
 1503  0
                 if(last == null || Utility.compare(last.document, object.document) != 0 || processAll) { processor.processDocument(object.document); processAll = true; }
 1504  0
                 if(last == null || Utility.compare(last.word, object.word) != 0 || processAll) { processor.processWord(object.word); processAll = true; }
 1505  0
                 if(last == null || Utility.compare(last.position, object.position) != 0 || processAll) { processor.processPosition(object.position); processAll = true; }
 1506  0
                 processor.processTuple();                                         
 1507  0
             }
 1508  
                           
 1509  
             public Class<DocumentWordPosition> getInputClass() {
 1510  0
                 return DocumentWordPosition.class;
 1511  
             }
 1512  
             
 1513  
             public void close() throws IOException {
 1514  0
                 processor.close();
 1515  0
             }                     
 1516  
         }
 1517  
     } 
 1518  
 }