Coverage Report - org.galagosearch.core.types.DocumentData
 
Classes in this File Line Coverage Branch Coverage Complexity
DocumentData
43%
6/14
67%
4/6
0
DocumentData$IdentifierOrder
17%
4/24
0%
0/4
0
DocumentData$IdentifierOrder$1
0%
0/5
0%
0/2
0
DocumentData$IdentifierOrder$2
0%
0/5
0%
0/2
0
DocumentData$IdentifierOrder$DuplicateEliminator
0%
0/19
0%
0/4
0
DocumentData$IdentifierOrder$OrderedWriterClass
0%
0/14
0%
0/6
0
DocumentData$IdentifierOrder$ShreddedBuffer
0%
0/78
0%
0/50
0
DocumentData$IdentifierOrder$ShreddedCombiner
0%
0/55
0%
0/36
0
DocumentData$IdentifierOrder$ShreddedProcessor
N/A
N/A
0
DocumentData$IdentifierOrder$ShreddedReader
0%
0/70
0%
0/34
0
DocumentData$IdentifierOrder$ShreddedSource
N/A
N/A
0
DocumentData$IdentifierOrder$ShreddedWriter
0%
0/37
0%
0/14
0
DocumentData$IdentifierOrder$TupleShredder
0%
0/18
0%
0/8
0
DocumentData$IdentifierOrder$TupleUnshredder
0%
0/21
0%
0/2
0
DocumentData$Processor
N/A
N/A
0
DocumentData$Source
N/A
N/A
0
DocumentData$Unordered
0%
0/23
0%
0/4
0
DocumentData$Unordered$1
0%
0/3
N/A
0
DocumentData$Unordered$2
0%
0/3
N/A
0
DocumentData$Unordered$DuplicateEliminator
0%
0/11
N/A
0
DocumentData$Unordered$OrderedWriterClass
0%
0/13
N/A
0
DocumentData$Unordered$ShreddedBuffer
0%
0/41
0%
0/20
0
DocumentData$Unordered$ShreddedCombiner
0%
0/55
0%
0/36
0
DocumentData$Unordered$ShreddedProcessor
N/A
N/A
0
DocumentData$Unordered$ShreddedReader
0%
0/56
0%
0/28
0
DocumentData$Unordered$ShreddedSource
N/A
N/A
0
DocumentData$Unordered$ShreddedWriter
0%
0/23
0%
0/6
0
DocumentData$Unordered$TupleShredder
0%
0/17
0%
0/2
0
DocumentData$Unordered$TupleUnshredder
0%
0/20
0%
0/2
0
DocumentData$UrlOrder
17%
4/24
0%
0/4
0
DocumentData$UrlOrder$1
0%
0/5
0%
0/2
0
DocumentData$UrlOrder$2
0%
0/5
0%
0/2
0
DocumentData$UrlOrder$DuplicateEliminator
0%
0/19
0%
0/4
0
DocumentData$UrlOrder$OrderedWriterClass
0%
0/14
0%
0/6
0
DocumentData$UrlOrder$ShreddedBuffer
0%
0/78
0%
0/50
0
DocumentData$UrlOrder$ShreddedCombiner
0%
0/55
0%
0/36
0
DocumentData$UrlOrder$ShreddedProcessor
N/A
N/A
0
DocumentData$UrlOrder$ShreddedReader
0%
0/70
0%
0/34
0
DocumentData$UrlOrder$ShreddedSource
N/A
N/A
0
DocumentData$UrlOrder$ShreddedWriter
0%
0/37
0%
0/14
0
DocumentData$UrlOrder$TupleShredder
0%
0/18
0%
0/8
0
DocumentData$UrlOrder$TupleUnshredder
0%
0/21
0%
0/2
0
 
 1  
 // This file was automatically generated with the command: 
 2  
 //     java org.galagosearch.tupleflow.typebuilder.TypeBuilderMojo ...
 3  
 package org.galagosearch.core.types;
 4  
 
 5  
 import org.galagosearch.tupleflow.Utility;
 6  
 import org.galagosearch.tupleflow.ArrayInput;
 7  
 import org.galagosearch.tupleflow.ArrayOutput;
 8  
 import org.galagosearch.tupleflow.Order;   
 9  
 import org.galagosearch.tupleflow.OrderedWriter;
 10  
 import org.galagosearch.tupleflow.Type; 
 11  
 import org.galagosearch.tupleflow.TypeReader;
 12  
 import org.galagosearch.tupleflow.Step; 
 13  
 import org.galagosearch.tupleflow.IncompatibleProcessorException;
 14  
 import org.galagosearch.tupleflow.ReaderSource;
 15  
 import java.io.IOException;             
 16  
 import java.io.EOFException;
 17  
 import java.io.UnsupportedEncodingException;
 18  
 import java.util.ArrayList;
 19  
 import java.util.Arrays;   
 20  
 import java.util.Comparator;
 21  
 import java.util.PriorityQueue;
 22  
 import java.util.Collection;
 23  
 
 24  
 public class DocumentData implements Type<DocumentData> {
 25  
     public String identifier;
 26  
     public String url;
 27  
     public int textLength; 
 28  
     
 29  28
     public DocumentData() {}
 30  0
     public DocumentData(String identifier, String url, int textLength) {
 31  0
         this.identifier = identifier;
 32  0
         this.url = url;
 33  0
         this.textLength = textLength;
 34  0
     }  
 35  
     
 36  
     public String toString() {
 37  0
             return String.format("%s,%s,%d",
 38  
                                    identifier, url, textLength);
 39  
     } 
 40  
 
 41  
     /**
      * Selects the Order implementation that matches an order specification.
      *
      * The spec is compared with Arrays.equals, so it must match one of the
      * recognized specs exactly: the empty spec, { "+url" } or { "+identifier" }.
      *
      * @param spec the order specification to look up
      * @return a new Unordered, UrlOrder or IdentifierOrder respectively, or
      *         null when the spec matches none of them
      */
     public Order<DocumentData> getOrder(String... spec) {
 42  28
         if (Arrays.equals(spec, new String[] {  })) {
 43  0
             return new Unordered();
 44  
         }
 45  28
         if (Arrays.equals(spec, new String[] { "+url" })) {
 46  8
             return new UrlOrder();
 47  
         }
 48  20
         if (Arrays.equals(spec, new String[] { "+identifier" })) {
 49  20
             return new IdentifierOrder();
 50  
         }
 51  0
         // No known order matches the requested spec.
         return null;
 52  
     } 
 53  
       
 54  
     public interface Processor extends Step, org.galagosearch.tupleflow.Processor<DocumentData> {
 55  
         public void process(DocumentData object) throws IOException;
 56  
         public void close() throws IOException;
 57  
     }                        
 58  
     public interface Source extends Step {
 59  
     }
 60  0
     public static class Unordered implements Order<DocumentData> {
 61  
         public int hash(DocumentData object) {
 62  0
             int h = 0;
 63  0
             return h;
 64  
         } 
 65  
         public Comparator<DocumentData> greaterThan() {
 66  0
             return new Comparator<DocumentData>() {
 67  0
                 public int compare(DocumentData one, DocumentData two) {
 68  0
                     int result = 0;
 69  
                     do {
 70  
                     } while (false);
 71  0
                     return -result;
 72  
                 }
 73  
             };
 74  
         }     
 75  
         public Comparator<DocumentData> lessThan() {
 76  0
             return new Comparator<DocumentData>() {
 77  0
                 public int compare(DocumentData one, DocumentData two) {
 78  0
                     int result = 0;
 79  
                     do {
 80  
                     } while (false);
 81  0
                     return result;
 82  
                 }
 83  
             };
 84  
         }     
 85  
         public TypeReader<DocumentData> orderedReader(ArrayInput _input) {
 86  0
             return new ShreddedReader(_input);
 87  
         }    
 88  
 
 89  
         public TypeReader<DocumentData> orderedReader(ArrayInput _input, int bufferSize) {
 90  0
             return new ShreddedReader(_input, bufferSize);
 91  
         }    
 92  
         public OrderedWriter<DocumentData> orderedWriter(ArrayOutput _output) {
 93  0
             ShreddedWriter w = new ShreddedWriter(_output);
 94  0
             return new OrderedWriterClass(w); 
 95  
         }                                    
 96  0
         public static class OrderedWriterClass extends OrderedWriter< DocumentData > {
 97  0
             DocumentData last = null;
 98  0
             ShreddedWriter shreddedWriter = null; 
 99  
             
 100  0
             public OrderedWriterClass(ShreddedWriter s) {
 101  0
                 this.shreddedWriter = s;
 102  0
             }
 103  
             
 104  
             public void process(DocumentData object) throws IOException {
 105  0
                boolean processAll = false;
 106  0
                shreddedWriter.processTuple(object.identifier, object.url, object.textLength);
 107  0
                last = object;
 108  0
             }           
 109  
                  
 110  
             public void close() throws IOException {
 111  0
                 shreddedWriter.close();
 112  0
             }
 113  
             
 114  
             public Class<DocumentData> getInputClass() {
 115  0
                 return DocumentData.class;
 116  
             }
 117  
         } 
 118  
         public ReaderSource<DocumentData> orderedCombiner(Collection<TypeReader<DocumentData>> readers, boolean closeOnExit) {
 119  0
             ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
 120  
             
 121  0
             for (TypeReader<DocumentData> reader : readers) {
 122  0
                 shreddedReaders.add((ShreddedReader)reader);
 123  
             }
 124  
             
 125  0
             return new ShreddedCombiner(shreddedReaders, closeOnExit);
 126  
         }                  
 127  
         public DocumentData clone(DocumentData object) {
 128  0
             DocumentData result = new DocumentData();
 129  0
             if (object == null) return result;
 130  0
             result.identifier = object.identifier; 
 131  0
             result.url = object.url; 
 132  0
             result.textLength = object.textLength; 
 133  0
             return result;
 134  
         }                 
 135  
         public Class<DocumentData> getOrderedClass() {
 136  0
             return DocumentData.class;
 137  
         }                           
 138  
         public String[] getOrderSpec() {
 139  0
             return new String[] {};
 140  
         }
 141  
 
 142  
         public static String getSpecString() {
 143  0
             return "";
 144  
         }
 145  
                            
 146  
         public interface ShreddedProcessor extends Step {
 147  
             public void processTuple(String identifier, String url, int textLength) throws IOException;
 148  
             public void close() throws IOException;
 149  
         }    
 150  
         public interface ShreddedSource extends Step {
 151  
         }                                              
 152  
         
 153  
         public static class ShreddedWriter implements ShreddedProcessor {
 154  
             ArrayOutput output;
 155  0
             ShreddedBuffer buffer = new ShreddedBuffer();
 156  0
             boolean lastFlush = false;
 157  
             
 158  0
             public ShreddedWriter(ArrayOutput output) {
 159  0
                 this.output = output;
 160  0
             }                        
 161  
             
 162  
             public void close() throws IOException {
 163  0
                 flush();
 164  0
             }
 165  
             
 166  
             public final void processTuple(String identifier, String url, int textLength) throws IOException {
 167  0
                 if (lastFlush) {
 168  0
                     lastFlush = false;
 169  
                 }
 170  0
                 buffer.processTuple(identifier, url, textLength);
 171  0
                 if (buffer.isFull())
 172  0
                     flush();
 173  0
             }
 174  
             public final void flushTuples(int pauseIndex) throws IOException {
 175  
                 
 176  0
                 while (buffer.getReadIndex() < pauseIndex) {
 177  
                            
 178  0
                     output.writeString(buffer.getIdentifier());
 179  0
                     output.writeString(buffer.getUrl());
 180  0
                     output.writeInt(buffer.getTextLength());
 181  0
                     buffer.incrementTuple();
 182  
                 }
 183  0
             }  
 184  
             public void flush() throws IOException { 
 185  0
                 flushTuples(buffer.getWriteIndex());
 186  0
                 buffer.reset(); 
 187  0
                 lastFlush = true;
 188  0
             }                           
 189  
         }
 190  0
         public static class ShreddedBuffer {
 191  
                             
 192  
             String[] identifiers;
 193  
             String[] urls;
 194  
             int[] textLengths;
 195  0
             int writeTupleIndex = 0;
 196  0
             int readTupleIndex = 0;
 197  
             int batchSize;
 198  
 
 199  0
             public ShreddedBuffer(int batchSize) {
 200  0
                 this.batchSize = batchSize;
 201  
 
 202  0
                 identifiers = new String[batchSize];
 203  0
                 urls = new String[batchSize];
 204  0
                 textLengths = new int[batchSize];
 205  0
             }                              
 206  
 
 207  
             public ShreddedBuffer() {    
 208  0
                 this(10000);
 209  0
             }                                                                                                                    
 210  
             
 211  
             public void processTuple(String identifier, String url, int textLength) {
 212  0
                 identifiers[writeTupleIndex] = identifier;
 213  0
                 urls[writeTupleIndex] = url;
 214  0
                 textLengths[writeTupleIndex] = textLength;
 215  0
                 writeTupleIndex++;
 216  0
             }
 217  
             public void resetData() {
 218  0
                 writeTupleIndex = 0;
 219  0
             }                  
 220  
                                  
 221  
             public void resetRead() {
 222  0
                 readTupleIndex = 0;
 223  0
             } 
 224  
 
 225  
             public void reset() {
 226  0
                 resetData();
 227  0
                 resetRead();
 228  0
             } 
 229  
             public boolean isFull() {
 230  0
                 return writeTupleIndex >= batchSize;
 231  
             }
 232  
 
 233  
             public boolean isEmpty() {
 234  0
                 return writeTupleIndex == 0;
 235  
             }                          
 236  
 
 237  
             public boolean isAtEnd() {
 238  0
                 return readTupleIndex >= writeTupleIndex;
 239  
             }           
 240  
             public void incrementTuple() {
 241  0
                 readTupleIndex++;
 242  0
             }                    
 243  
             public int getReadIndex() {
 244  0
                 return readTupleIndex;
 245  
             }   
 246  
 
 247  
             public int getWriteIndex() {
 248  0
                 return writeTupleIndex;
 249  
             } 
 250  
             public String getIdentifier() {
 251  0
                 assert readTupleIndex < writeTupleIndex;
 252  0
                 return identifiers[readTupleIndex];
 253  
             }                                         
 254  
             public String getUrl() {
 255  0
                 assert readTupleIndex < writeTupleIndex;
 256  0
                 return urls[readTupleIndex];
 257  
             }                                         
 258  
             public int getTextLength() {
 259  0
                 assert readTupleIndex < writeTupleIndex;
 260  0
                 return textLengths[readTupleIndex];
 261  
             }                                         
 262  
             public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
 263  0
                 while (getReadIndex() < endIndex) {
 264  0
                    output.processTuple(getIdentifier(), getUrl(), getTextLength());
 265  0
                    incrementTuple();
 266  
                 }
 267  0
             }                                                                           
 268  
              
 269  
             public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
 270  0
             }
 271  
             
 272  
         }                         
 273  0
         public static class ShreddedCombiner implements ReaderSource<DocumentData>, ShreddedSource {   
 274  
             public ShreddedProcessor processor;
 275  
             Collection<ShreddedReader> readers;       
 276  0
             boolean closeOnExit = false;
 277  0
             boolean uninitialized = true;
 278  0
             PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
 279  
             
 280  0
             public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
 281  0
                 this.readers = readers;                                                       
 282  0
                 this.closeOnExit = closeOnExit;
 283  0
             }
 284  
                                   
 285  
             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
 286  0
                 if (processor instanceof ShreddedProcessor) {
 287  0
                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
 288  0
                 } else if (processor instanceof DocumentData.Processor) {
 289  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentData.Processor) processor));
 290  0
                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
 291  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentData>) processor));
 292  
                 } else {
 293  0
                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
 294  
                 }
 295  0
             }                                
 296  
             
 297  
             public Class<DocumentData> getOutputClass() {
 298  0
                 return DocumentData.class;
 299  
             }
 300  
             
 301  
             public void initialize() throws IOException {
 302  0
                 for (ShreddedReader reader : readers) {
 303  0
                     reader.fill();                                        
 304  
                     
 305  0
                     if (!reader.getBuffer().isAtEnd())
 306  0
                         queue.add(reader);
 307  
                 }   
 308  
 
 309  0
                 uninitialized = false;
 310  0
             }
 311  
 
 312  
             public void run() throws IOException {
 313  0
                 initialize();
 314  
                
 315  0
                 while (queue.size() > 0) {
 316  0
                     ShreddedReader top = queue.poll();
 317  0
                     ShreddedReader next = null;
 318  0
                     ShreddedBuffer nextBuffer = null; 
 319  
                     
 320  0
                     assert !top.getBuffer().isAtEnd();
 321  
                                                   
 322  0
                     if (queue.size() > 0) {
 323  0
                         next = queue.peek();
 324  0
                         nextBuffer = next.getBuffer();
 325  0
                         assert !nextBuffer.isAtEnd();
 326  
                     }
 327  
                     
 328  0
                     top.getBuffer().copyUntil(nextBuffer, processor);
 329  0
                     if (top.getBuffer().isAtEnd())
 330  0
                         top.fill();                 
 331  
                         
 332  0
                     if (!top.getBuffer().isAtEnd())
 333  0
                         queue.add(top);
 334  0
                 }              
 335  
                 
 336  0
                 if (closeOnExit)
 337  0
                     processor.close();
 338  0
             }
 339  
 
 340  
             public DocumentData read() throws IOException {
 341  0
                 if (uninitialized)
 342  0
                     initialize();
 343  
 
 344  0
                 DocumentData result = null;
 345  
 
 346  0
                 while (queue.size() > 0) {
 347  0
                     ShreddedReader top = queue.poll();
 348  0
                     result = top.read();
 349  
 
 350  0
                     if (result != null) {
 351  0
                         if (top.getBuffer().isAtEnd())
 352  0
                             top.fill();
 353  
 
 354  0
                         queue.offer(top);
 355  0
                         break;
 356  
                     } 
 357  0
                 }
 358  
 
 359  0
                 return result;
 360  
             }
 361  
         } 
 362  0
         public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<DocumentData>, ShreddedSource {      
 363  
             public ShreddedProcessor processor;
 364  
             ShreddedBuffer buffer;
 365  0
             DocumentData last = new DocumentData();         
 366  0
             long tupleCount = 0;
 367  0
             long bufferStartCount = 0;  
 368  
             ArrayInput input;
 369  
             
 370  0
             public ShreddedReader(ArrayInput input) {
 371  0
                 this.input = input; 
 372  0
                 this.buffer = new ShreddedBuffer();
 373  0
             }                               
 374  
             
 375  0
             public ShreddedReader(ArrayInput input, int bufferSize) { 
 376  0
                 this.input = input;
 377  0
                 this.buffer = new ShreddedBuffer(bufferSize);
 378  0
             }
 379  
                  
 380  
             /**
              * Compares two readers by the state of their buffers.
              *
              * Two exhausted buffers compare equal and an exhausted buffer sorts
              * before one that still holds tuples. When both still hold tuples the
              * result is 0, because the comparison loop below has an empty body.
              *
              * @param other the reader to compare against
              * @return -1, 0 or 1 as described above
              */
             public final int compareTo(ShreddedReader other) {
 381  0
                 ShreddedBuffer otherBuffer = other.getBuffer();
 382  
                 
 383  0
                 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
 384  0
                     return 0;                 
 385  0
                 } else if (buffer.isAtEnd()) {
 386  0
                     return -1;
 387  0
                 } else if (otherBuffer.isAtEnd()) {
 388  0
                     return 1;
 389  
                 }
 390  
                                    
 391  0
                 int result = 0;
 392  
                 // Empty loop: no fields are compared, so result stays 0.
                 do {
 393  
                 } while (false);                                             
 394  
                 
 395  0
                 return result;
 396  
             }
 397  
             
 398  
             public final ShreddedBuffer getBuffer() {
 399  0
                 return buffer;
 400  
             }                
 401  
             
 402  
             public final DocumentData read() throws IOException {
 403  0
                 if (buffer.isAtEnd()) {
 404  0
                     fill();             
 405  
                 
 406  0
                     if (buffer.isAtEnd()) {
 407  0
                         return null;
 408  
                     }
 409  
                 }
 410  
                       
 411  0
                 assert !buffer.isAtEnd();
 412  0
                 DocumentData result = new DocumentData();
 413  
                 
 414  0
                 result.identifier = buffer.getIdentifier();
 415  0
                 result.url = buffer.getUrl();
 416  0
                 result.textLength = buffer.getTextLength();
 417  
                 
 418  0
                 buffer.incrementTuple();
 419  
                 
 420  0
                 return result;
 421  
             }           
 422  
             
 423  
             /**
              * Resets the buffer and reads tuples from the input until the buffer
              * is full or the input signals end of file.
              *
              * @throws IOException declared for the input reads; an EOFException
              *                     is caught here and does not propagate
              */
             public final void fill() throws IOException {
 424  
                 try {   
 425  0
                     buffer.reset();
 426  
                     
 427  0
                     // Remember how many tuples had been read before this buffer load.
                     if (tupleCount != 0) {
 428  0
                         bufferStartCount = tupleCount;
 429  
                     }
 430  
                     
 431  0
                     while (!buffer.isFull()) {
 432  0
                         // All three reads complete before the tuple is stored, so an
                         // EOF in the middle of a tuple stores nothing for it.
                         buffer.processTuple(input.readString(), input.readString(), input.readInt());
 433  0
                         tupleCount++;
 434  
                     }
 435  0
                 // End of input is deliberately ignored: the buffer keeps the tuples read so far.
                 } catch(EOFException e) {}
 436  0
             }
 437  
 
 438  
 
 439  
             public void run() throws IOException {
 440  
                 while (true) {
 441  0
                     fill();
 442  
                     
 443  0
                     if (buffer.isAtEnd())
 444  0
                         break;
 445  
                     
 446  0
                     buffer.copyUntil(null, processor);
 447  
                 }      
 448  0
                 processor.close();
 449  0
             }
 450  
             
 451  
             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
 452  0
                 if (processor instanceof ShreddedProcessor) {
 453  0
                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
 454  0
                 } else if (processor instanceof DocumentData.Processor) {
 455  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentData.Processor) processor));
 456  0
                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
 457  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentData>) processor));
 458  
                 } else {
 459  0
                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
 460  
                 }
 461  0
             }                                
 462  
             
 463  
             public Class<DocumentData> getOutputClass() {
 464  0
                 return DocumentData.class;
 465  
             }                
 466  
         }
 467  
         
 468  
         /**
          * ShreddedProcessor wrapper that hands every tuple to the wrapped
          * processor unchanged.
          *
          * NOTE(review): despite the name, nothing is filtered out here, and the
          * {@code last} field is never read within this class.
          */
         public static class DuplicateEliminator implements ShreddedProcessor {
 469  
             // Downstream processor; must be set before processTuple or close is called.
             public ShreddedProcessor processor;
 470  0
             DocumentData last = new DocumentData();
 471  
                                            
 472  0
             /** Creates an eliminator with no downstream processor; set one with setShreddedProcessor. */
             public DuplicateEliminator() {}
 473  0
             /** Creates an eliminator that forwards to the given processor. */
             public DuplicateEliminator(ShreddedProcessor processor) {
 474  0
                 this.processor = processor;
 475  0
             }
 476  
             
 477  
             /** Replaces the downstream processor. */
             public void setShreddedProcessor(ShreddedProcessor processor) {
 478  0
                 this.processor = processor;
 479  0
             }
 480  
 
 481  
           
 482  
             
 483  
                                
 484  
             /** Passes the tuple straight through to the downstream processor. */
             public void processTuple(String identifier, String url, int textLength) throws IOException {
 485  0
                 processor.processTuple(identifier, url, textLength);
 486  0
             } 
 487  
             
 488  
             /** Closes the downstream processor. */
             public void close() throws IOException {
 489  0
                 processor.close();
 490  0
             }                    
 491  
         }
 492  
         public static class TupleUnshredder implements ShreddedProcessor {
 493  0
             DocumentData last = new DocumentData();
 494  
             public org.galagosearch.tupleflow.Processor<DocumentData> processor;                               
 495  
             
 496  0
             public TupleUnshredder(DocumentData.Processor processor) {
 497  0
                 this.processor = processor;
 498  0
             }         
 499  
             
 500  0
             public TupleUnshredder(org.galagosearch.tupleflow.Processor<DocumentData> processor) {
 501  0
                 this.processor = processor;
 502  0
             }
 503  
             
 504  
             public DocumentData clone(DocumentData object) {
 505  0
                 DocumentData result = new DocumentData();
 506  0
                 if (object == null) return result;
 507  0
                 result.identifier = object.identifier; 
 508  0
                 result.url = object.url; 
 509  0
                 result.textLength = object.textLength; 
 510  0
                 return result;
 511  
             }                 
 512  
             
 513  
             
 514  
             public void processTuple(String identifier, String url, int textLength) throws IOException {
 515  0
                 last.identifier = identifier;
 516  0
                 last.url = url;
 517  0
                 last.textLength = textLength;
 518  0
                 processor.process(clone(last));
 519  0
             }               
 520  
             
 521  
             public void close() throws IOException {
 522  0
                 processor.close();
 523  0
             }
 524  
         }     
 525  0
         public static class TupleShredder implements Processor {
 526  0
             DocumentData last = new DocumentData();
 527  
             public ShreddedProcessor processor;
 528  
             
 529  0
             public TupleShredder(ShreddedProcessor processor) {
 530  0
                 this.processor = processor;
 531  0
             }                              
 532  
             
 533  
             public DocumentData clone(DocumentData object) {
 534  0
                 DocumentData result = new DocumentData();
 535  0
                 if (object == null) return result;
 536  0
                 result.identifier = object.identifier; 
 537  0
                 result.url = object.url; 
 538  0
                 result.textLength = object.textLength; 
 539  0
                 return result;
 540  
             }                 
 541  
             
 542  
             public void process(DocumentData object) throws IOException {                                                                                                                                                   
 543  0
                 boolean processAll = false;
 544  0
                 processor.processTuple(object.identifier, object.url, object.textLength);                                         
 545  0
             }
 546  
                           
 547  
             public Class<DocumentData> getInputClass() {
 548  0
                 return DocumentData.class;
 549  
             }
 550  
             
 551  
             public void close() throws IOException {
 552  0
                 processor.close();
 553  0
             }                     
 554  
         }
 555  
     } 
 556  36
     public static class UrlOrder implements Order<DocumentData> {
 557  
         public int hash(DocumentData object) {
 558  0
             int h = 0;
 559  0
             h += Utility.hash(object.url);
 560  0
             return h;
 561  
         } 
 562  
         public Comparator<DocumentData> greaterThan() {
 563  0
             return new Comparator<DocumentData>() {
 564  0
                 public int compare(DocumentData one, DocumentData two) {
 565  0
                     int result = 0;
 566  
                     do {
 567  0
                         result = + Utility.compare(one.url, two.url);
 568  0
                         if(result != 0) break;
 569  
                     } while (false);
 570  0
                     return -result;
 571  
                 }
 572  
             };
 573  
         }     
 574  
         public Comparator<DocumentData> lessThan() {
 575  0
             return new Comparator<DocumentData>() {
 576  0
                 public int compare(DocumentData one, DocumentData two) {
 577  0
                     int result = 0;
 578  
                     do {
 579  0
                         result = + Utility.compare(one.url, two.url);
 580  0
                         if(result != 0) break;
 581  
                     } while (false);
 582  0
                     return result;
 583  
                 }
 584  
             };
 585  
         }     
 586  
         public TypeReader<DocumentData> orderedReader(ArrayInput _input) {
 587  0
             return new ShreddedReader(_input);
 588  
         }    
 589  
 
 590  
         public TypeReader<DocumentData> orderedReader(ArrayInput _input, int bufferSize) {
 591  0
             return new ShreddedReader(_input, bufferSize);
 592  
         }    
 593  
         public OrderedWriter<DocumentData> orderedWriter(ArrayOutput _output) {
 594  0
             ShreddedWriter w = new ShreddedWriter(_output);
 595  0
             return new OrderedWriterClass(w); 
 596  
         }                                    
 597  0
         public static class OrderedWriterClass extends OrderedWriter< DocumentData > {
 598  0
             DocumentData last = null;
 599  0
             ShreddedWriter shreddedWriter = null; 
 600  
             
 601  0
             public OrderedWriterClass(ShreddedWriter s) {
 602  0
                 this.shreddedWriter = s;
 603  0
             }
 604  
             
 605  
             public void process(DocumentData object) throws IOException {
 606  0
                boolean processAll = false;
 607  0
                if (processAll || last == null || 0 != Utility.compare(object.url, last.url)) { processAll = true; shreddedWriter.processUrl(object.url); }
 608  0
                shreddedWriter.processTuple(object.identifier, object.textLength);
 609  0
                last = object;
 610  0
             }           
 611  
                  
 612  
             public void close() throws IOException {
 613  0
                 shreddedWriter.close();
 614  0
             }
 615  
             
 616  
             public Class<DocumentData> getInputClass() {
 617  0
                 return DocumentData.class;
 618  
             }
 619  
         } 
 620  
         public ReaderSource<DocumentData> orderedCombiner(Collection<TypeReader<DocumentData>> readers, boolean closeOnExit) {
 621  0
             ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
 622  
             
 623  0
             for (TypeReader<DocumentData> reader : readers) {
 624  0
                 shreddedReaders.add((ShreddedReader)reader);
 625  
             }
 626  
             
 627  0
             return new ShreddedCombiner(shreddedReaders, closeOnExit);
 628  
         }                  
 629  
         public DocumentData clone(DocumentData object) {
 630  0
             DocumentData result = new DocumentData();
 631  0
             if (object == null) return result;
 632  0
             result.identifier = object.identifier; 
 633  0
             result.url = object.url; 
 634  0
             result.textLength = object.textLength; 
 635  0
             return result;
 636  
         }                 
 637  
         public Class<DocumentData> getOrderedClass() {
 638  28
             return DocumentData.class;
 639  
         }                           
 640  
         public String[] getOrderSpec() {
 641  28
             return new String[] {"+url"};
 642  
         }
 643  
 
 644  
         public static String getSpecString() {
 645  0
             return "+url";
 646  
         }
 647  
                            
 648  
         public interface ShreddedProcessor extends Step {
 649  
             public void processUrl(String url) throws IOException;
 650  
             public void processTuple(String identifier, int textLength) throws IOException;
 651  
             public void close() throws IOException;
 652  
         }    
 653  
         public interface ShreddedSource extends Step {
 654  
         }                                              
 655  
         
 656  0
         public static class ShreddedWriter implements ShreddedProcessor {
 657  
             ArrayOutput output;
 658  0
             ShreddedBuffer buffer = new ShreddedBuffer();
 659  
             String lastUrl;
 660  0
             boolean lastFlush = false;
 661  
             
 662  0
             public ShreddedWriter(ArrayOutput output) {
 663  0
                 this.output = output;
 664  0
             }                        
 665  
             
 666  
             public void close() throws IOException {
 667  0
                 flush();
 668  0
             }
 669  
             
 670  
             public void processUrl(String url) {
 671  0
                 lastUrl = url;
 672  0
                 buffer.processUrl(url);
 673  0
             }
 674  
             public final void processTuple(String identifier, int textLength) throws IOException {
 675  0
                 if (lastFlush) {
 676  0
                     if(buffer.urls.size() == 0) buffer.processUrl(lastUrl);
 677  0
                     lastFlush = false;
 678  
                 }
 679  0
                 buffer.processTuple(identifier, textLength);
 680  0
                 if (buffer.isFull())
 681  0
                     flush();
 682  0
             }
 683  
             public final void flushTuples(int pauseIndex) throws IOException {
 684  
                 
 685  0
                 while (buffer.getReadIndex() < pauseIndex) {
 686  
                            
 687  0
                     output.writeString(buffer.getIdentifier());
 688  0
                     output.writeInt(buffer.getTextLength());
 689  0
                     buffer.incrementTuple();
 690  
                 }
 691  0
             }  
 692  
             public final void flushUrl(int pauseIndex) throws IOException {
 693  0
                 while (buffer.getReadIndex() < pauseIndex) {
 694  0
                     int nextPause = buffer.getUrlEndIndex();
 695  0
                     int count = nextPause - buffer.getReadIndex();
 696  
                     
 697  0
                     output.writeString(buffer.getUrl());
 698  0
                     output.writeInt(count);
 699  0
                     buffer.incrementUrl();
 700  
                       
 701  0
                     flushTuples(nextPause);
 702  0
                     assert nextPause == buffer.getReadIndex();
 703  0
                 }
 704  0
             }
 705  
             public void flush() throws IOException { 
 706  0
                 flushUrl(buffer.getWriteIndex());
 707  0
                 buffer.reset(); 
 708  0
                 lastFlush = true;
 709  0
             }                           
 710  
         }
 711  0
         public static class ShreddedBuffer {
 712  0
             ArrayList<String> urls = new ArrayList();
 713  0
             ArrayList<Integer> urlTupleIdx = new ArrayList();
 714  0
             int urlReadIdx = 0;
 715  
                             
 716  
             String[] identifiers;
 717  
             int[] textLengths;
 718  0
             int writeTupleIndex = 0;
 719  0
             int readTupleIndex = 0;
 720  
             int batchSize;
 721  
 
 722  0
             public ShreddedBuffer(int batchSize) {
 723  0
                 this.batchSize = batchSize;
 724  
 
 725  0
                 identifiers = new String[batchSize];
 726  0
                 textLengths = new int[batchSize];
 727  0
             }                              
 728  
 
 729  
             public ShreddedBuffer() {    
 730  0
                 this(10000);
 731  0
             }                                                                                                                    
 732  
             
 733  
             public void processUrl(String url) {
 734  0
                 urls.add(url);
 735  0
                 urlTupleIdx.add(writeTupleIndex);
 736  0
             }                                      
 737  
             public void processTuple(String identifier, int textLength) {
 738  0
                 assert urls.size() > 0;
 739  0
                 identifiers[writeTupleIndex] = identifier;
 740  0
                 textLengths[writeTupleIndex] = textLength;
 741  0
                 writeTupleIndex++;
 742  0
             }
 743  
             public void resetData() {
 744  0
                 urls.clear();
 745  0
                 urlTupleIdx.clear();
 746  0
                 writeTupleIndex = 0;
 747  0
             }                  
 748  
                                  
 749  
             public void resetRead() {
 750  0
                 readTupleIndex = 0;
 751  0
                 urlReadIdx = 0;
 752  0
             } 
 753  
 
 754  
             public void reset() {
 755  0
                 resetData();
 756  0
                 resetRead();
 757  0
             } 
 758  
             public boolean isFull() {
 759  0
                 return writeTupleIndex >= batchSize;
 760  
             }
 761  
 
 762  
             public boolean isEmpty() {
 763  0
                 return writeTupleIndex == 0;
 764  
             }                          
 765  
 
 766  
             public boolean isAtEnd() {
 767  0
                 return readTupleIndex >= writeTupleIndex;
 768  
             }           
 769  
             public void incrementUrl() {
 770  0
                 urlReadIdx++;  
 771  0
             }                                                                                              
 772  
 
 773  
             public void autoIncrementUrl() {
 774  0
                 while (readTupleIndex >= getUrlEndIndex() && readTupleIndex < writeTupleIndex)
 775  0
                     urlReadIdx++;
 776  0
             }                 
 777  
             public void incrementTuple() {
 778  0
                 readTupleIndex++;
 779  0
             }                    
 780  
             public int getUrlEndIndex() {
 781  0
                 if ((urlReadIdx+1) >= urlTupleIdx.size())
 782  0
                     return writeTupleIndex;
 783  0
                 return urlTupleIdx.get(urlReadIdx+1);
 784  
             }
 785  
             public int getReadIndex() {
 786  0
                 return readTupleIndex;
 787  
             }   
 788  
 
 789  
             public int getWriteIndex() {
 790  0
                 return writeTupleIndex;
 791  
             } 
 792  
             public String getUrl() {
 793  0
                 assert readTupleIndex < writeTupleIndex;
 794  0
                 assert urlReadIdx < urls.size();
 795  
                 
 796  0
                 return urls.get(urlReadIdx);
 797  
             }
 798  
             public String getIdentifier() {
 799  0
                 assert readTupleIndex < writeTupleIndex;
 800  0
                 return identifiers[readTupleIndex];
 801  
             }                                         
 802  
             public int getTextLength() {
 803  0
                 assert readTupleIndex < writeTupleIndex;
 804  0
                 return textLengths[readTupleIndex];
 805  
             }                                         
 806  
             public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
 807  0
                 while (getReadIndex() < endIndex) {
 808  0
                    output.processTuple(getIdentifier(), getTextLength());
 809  0
                    incrementTuple();
 810  
                 }
 811  0
             }                                                                           
 812  
             public void copyUntilIndexUrl(int endIndex, ShreddedProcessor output) throws IOException {
 813  0
                 while (getReadIndex() < endIndex) {
 814  0
                     output.processUrl(getUrl());
 815  0
                     assert getUrlEndIndex() <= endIndex;
 816  0
                     copyTuples(getUrlEndIndex(), output);
 817  0
                     incrementUrl();
 818  
                 }
 819  0
             }  
 820  
             public void copyUntilUrl(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
 821  0
                 while (!isAtEnd()) {
 822  0
                     if (other != null) {   
 823  0
                         assert !other.isAtEnd();
 824  0
                         int c = + Utility.compare(getUrl(), other.getUrl());
 825  
                     
 826  0
                         if (c > 0) {
 827  0
                             break;   
 828  
                         }
 829  
                         
 830  0
                         output.processUrl(getUrl());
 831  
                                       
 832  0
                         copyTuples(getUrlEndIndex(), output);
 833  0
                     } else {
 834  0
                         output.processUrl(getUrl());
 835  0
                         copyTuples(getUrlEndIndex(), output);
 836  
                     }
 837  0
                     incrementUrl();  
 838  
                     
 839  
                
 840  
                 }
 841  0
             }
 842  
             public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
 843  0
                 copyUntilUrl(other, output);
 844  0
             }
 845  
             
 846  
         }                         
 847  0
         public static class ShreddedCombiner implements ReaderSource<DocumentData>, ShreddedSource {   
 848  
             public ShreddedProcessor processor;
 849  
             Collection<ShreddedReader> readers;       
 850  0
             boolean closeOnExit = false;
 851  0
             boolean uninitialized = true;
 852  0
             PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
 853  
             
 854  0
             public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
 855  0
                 this.readers = readers;                                                       
 856  0
                 this.closeOnExit = closeOnExit;
 857  0
             }
 858  
                                   
 859  
             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
 860  0
                 if (processor instanceof ShreddedProcessor) {
 861  0
                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
 862  0
                 } else if (processor instanceof DocumentData.Processor) {
 863  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentData.Processor) processor));
 864  0
                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
 865  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentData>) processor));
 866  
                 } else {
 867  0
                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
 868  
                 }
 869  0
             }                                
 870  
             
 871  
             public Class<DocumentData> getOutputClass() {
 872  0
                 return DocumentData.class;
 873  
             }
 874  
             
 875  
             public void initialize() throws IOException {
 876  0
                 for (ShreddedReader reader : readers) {
 877  0
                     reader.fill();                                        
 878  
                     
 879  0
                     if (!reader.getBuffer().isAtEnd())
 880  0
                         queue.add(reader);
 881  
                 }   
 882  
 
 883  0
                 uninitialized = false;
 884  0
             }
 885  
 
 886  
             public void run() throws IOException {
 887  0
                 initialize();
 888  
                
 889  0
                 while (queue.size() > 0) {
 890  0
                     ShreddedReader top = queue.poll();
 891  0
                     ShreddedReader next = null;
 892  0
                     ShreddedBuffer nextBuffer = null; 
 893  
                     
 894  0
                     assert !top.getBuffer().isAtEnd();
 895  
                                                   
 896  0
                     if (queue.size() > 0) {
 897  0
                         next = queue.peek();
 898  0
                         nextBuffer = next.getBuffer();
 899  0
                         assert !nextBuffer.isAtEnd();
 900  
                     }
 901  
                     
 902  0
                     top.getBuffer().copyUntil(nextBuffer, processor);
 903  0
                     if (top.getBuffer().isAtEnd())
 904  0
                         top.fill();                 
 905  
                         
 906  0
                     if (!top.getBuffer().isAtEnd())
 907  0
                         queue.add(top);
 908  0
                 }              
 909  
                 
 910  0
                 if (closeOnExit)
 911  0
                     processor.close();
 912  0
             }
 913  
 
 914  
             public DocumentData read() throws IOException {
 915  0
                 if (uninitialized)
 916  0
                     initialize();
 917  
 
 918  0
                 DocumentData result = null;
 919  
 
 920  0
                 while (queue.size() > 0) {
 921  0
                     ShreddedReader top = queue.poll();
 922  0
                     result = top.read();
 923  
 
 924  0
                     if (result != null) {
 925  0
                         if (top.getBuffer().isAtEnd())
 926  0
                             top.fill();
 927  
 
 928  0
                         queue.offer(top);
 929  0
                         break;
 930  
                     } 
 931  0
                 }
 932  
 
 933  0
                 return result;
 934  
             }
 935  
         } 
 936  0
         public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<DocumentData>, ShreddedSource {      
 937  
             public ShreddedProcessor processor;
 938  
             ShreddedBuffer buffer;
 939  0
             DocumentData last = new DocumentData();         
 940  0
             long updateUrlCount = -1;
 941  0
             long tupleCount = 0;
 942  0
             long bufferStartCount = 0;  
 943  
             ArrayInput input;
 944  
             
 945  0
             public ShreddedReader(ArrayInput input) {
 946  0
                 this.input = input; 
 947  0
                 this.buffer = new ShreddedBuffer();
 948  0
             }                               
 949  
             
 950  0
             public ShreddedReader(ArrayInput input, int bufferSize) { 
 951  0
                 this.input = input;
 952  0
                 this.buffer = new ShreddedBuffer(bufferSize);
 953  0
             }
 954  
                  
 955  
             public final int compareTo(ShreddedReader other) {
 956  0
                 ShreddedBuffer otherBuffer = other.getBuffer();
 957  
                 
 958  0
                 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
 959  0
                     return 0;                 
 960  0
                 } else if (buffer.isAtEnd()) {
 961  0
                     return -1;
 962  0
                 } else if (otherBuffer.isAtEnd()) {
 963  0
                     return 1;
 964  
                 }
 965  
                                    
 966  0
                 int result = 0;
 967  
                 do {
 968  0
                     result = + Utility.compare(buffer.getUrl(), otherBuffer.getUrl());
 969  0
                     if(result != 0) break;
 970  
                 } while (false);                                             
 971  
                 
 972  0
                 return result;
 973  
             }
 974  
             
 975  
             public final ShreddedBuffer getBuffer() {
 976  0
                 return buffer;
 977  
             }                
 978  
             
 979  
             public final DocumentData read() throws IOException {
 980  0
                 if (buffer.isAtEnd()) {
 981  0
                     fill();             
 982  
                 
 983  0
                     if (buffer.isAtEnd()) {
 984  0
                         return null;
 985  
                     }
 986  
                 }
 987  
                       
 988  0
                 assert !buffer.isAtEnd();
 989  0
                 DocumentData result = new DocumentData();
 990  
                 
 991  0
                 result.url = buffer.getUrl();
 992  0
                 result.identifier = buffer.getIdentifier();
 993  0
                 result.textLength = buffer.getTextLength();
 994  
                 
 995  0
                 buffer.incrementTuple();
 996  0
                 buffer.autoIncrementUrl();
 997  
                 
 998  0
                 return result;
 999  
             }           
 1000  
             
 1001  
             public final void fill() throws IOException {
 1002  
                 try {   
 1003  0
                     buffer.reset();
 1004  
                     
 1005  0
                     if (tupleCount != 0) {
 1006  
                                                       
 1007  0
                         if(updateUrlCount - tupleCount > 0) {
 1008  0
                             buffer.urls.add(last.url);
 1009  0
                             buffer.urlTupleIdx.add((int) (updateUrlCount - tupleCount));
 1010  
                         }
 1011  0
                         bufferStartCount = tupleCount;
 1012  
                     }
 1013  
                     
 1014  0
                     while (!buffer.isFull()) {
 1015  0
                         updateUrl();
 1016  0
                         buffer.processTuple(input.readString(), input.readInt());
 1017  0
                         tupleCount++;
 1018  
                     }
 1019  0
                 } catch(EOFException e) {}
 1020  0
             }
 1021  
 
 1022  
             public final void updateUrl() throws IOException {
 1023  0
                 if (updateUrlCount > tupleCount)
 1024  0
                     return;
 1025  
                      
 1026  0
                 last.url = input.readString();
 1027  0
                 updateUrlCount = tupleCount + input.readInt();
 1028  
                                       
 1029  0
                 buffer.processUrl(last.url);
 1030  0
             }
 1031  
 
 1032  
             public void run() throws IOException {
 1033  
                 while (true) {
 1034  0
                     fill();
 1035  
                     
 1036  0
                     if (buffer.isAtEnd())
 1037  0
                         break;
 1038  
                     
 1039  0
                     buffer.copyUntil(null, processor);
 1040  
                 }      
 1041  0
                 processor.close();
 1042  0
             }
 1043  
             
 1044  
             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
 1045  0
                 if (processor instanceof ShreddedProcessor) {
 1046  0
                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
 1047  0
                 } else if (processor instanceof DocumentData.Processor) {
 1048  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentData.Processor) processor));
 1049  0
                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
 1050  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentData>) processor));
 1051  
                 } else {
 1052  0
                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
 1053  
                 }
 1054  0
             }                                
 1055  
             
 1056  
             public Class<DocumentData> getOutputClass() {
 1057  0
                 return DocumentData.class;
 1058  
             }                
 1059  
         }
 1060  
         
 1061  
         public static class DuplicateEliminator implements ShreddedProcessor {
 1062  
             public ShreddedProcessor processor;
 1063  0
             DocumentData last = new DocumentData();
 1064  0
             boolean urlProcess = true;
 1065  
                                            
 1066  0
             public DuplicateEliminator() {}
 1067  0
             public DuplicateEliminator(ShreddedProcessor processor) {
 1068  0
                 this.processor = processor;
 1069  0
             }
 1070  
             
 1071  
             public void setShreddedProcessor(ShreddedProcessor processor) {
 1072  0
                 this.processor = processor;
 1073  0
             }
 1074  
 
 1075  
             public void processUrl(String url) throws IOException {  
 1076  0
                 if (urlProcess || Utility.compare(url, last.url) != 0) {
 1077  0
                     last.url = url;
 1078  0
                     processor.processUrl(url);
 1079  0
                     urlProcess = false;
 1080  
                 }
 1081  0
             }  
 1082  
             
 1083  
             public void resetUrl() {
 1084  0
                  urlProcess = true;
 1085  0
             }                                                
 1086  
                                
 1087  
             public void processTuple(String identifier, int textLength) throws IOException {
 1088  0
                 processor.processTuple(identifier, textLength);
 1089  0
             } 
 1090  
             
 1091  
             public void close() throws IOException {
 1092  0
                 processor.close();
 1093  0
             }                    
 1094  
         }
 1095  
         public static class TupleUnshredder implements ShreddedProcessor {
 1096  0
             DocumentData last = new DocumentData();
 1097  
             public org.galagosearch.tupleflow.Processor<DocumentData> processor;                               
 1098  
             
 1099  0
             public TupleUnshredder(DocumentData.Processor processor) {
 1100  0
                 this.processor = processor;
 1101  0
             }         
 1102  
             
 1103  0
             public TupleUnshredder(org.galagosearch.tupleflow.Processor<DocumentData> processor) {
 1104  0
                 this.processor = processor;
 1105  0
             }
 1106  
             
 1107  
             public DocumentData clone(DocumentData object) {
 1108  0
                 DocumentData result = new DocumentData();
 1109  0
                 if (object == null) return result;
 1110  0
                 result.identifier = object.identifier; 
 1111  0
                 result.url = object.url; 
 1112  0
                 result.textLength = object.textLength; 
 1113  0
                 return result;
 1114  
             }                 
 1115  
             
 1116  
             public void processUrl(String url) throws IOException {
 1117  0
                 last.url = url;
 1118  0
             }   
 1119  
                 
 1120  
             
 1121  
             public void processTuple(String identifier, int textLength) throws IOException {
 1122  0
                 last.identifier = identifier;
 1123  0
                 last.textLength = textLength;
 1124  0
                 processor.process(clone(last));
 1125  0
             }               
 1126  
             
 1127  
             public void close() throws IOException {
 1128  0
                 processor.close();
 1129  0
             }
 1130  
         }     
 1131  36
         public static class TupleShredder implements Processor {
 1132  0
             DocumentData last = new DocumentData();
 1133  
             public ShreddedProcessor processor;
 1134  
             
 1135  0
             public TupleShredder(ShreddedProcessor processor) {
 1136  0
                 this.processor = processor;
 1137  0
             }                              
 1138  
             
 1139  
             public DocumentData clone(DocumentData object) {
 1140  0
                 DocumentData result = new DocumentData();
 1141  0
                 if (object == null) return result;
 1142  0
                 result.identifier = object.identifier; 
 1143  0
                 result.url = object.url; 
 1144  0
                 result.textLength = object.textLength; 
 1145  0
                 return result;
 1146  
             }                 
 1147  
             
 1148  
             public void process(DocumentData object) throws IOException {                                                                                                                                                   
 1149  0
                 boolean processAll = false;
 1150  0
                 if(last == null || Utility.compare(last.url, object.url) != 0 || processAll) { processor.processUrl(object.url); processAll = true; }
 1151  0
                 processor.processTuple(object.identifier, object.textLength);                                         
 1152  0
             }
 1153  
                           
 1154  
             public Class<DocumentData> getInputClass() {
 1155  0
                 return DocumentData.class;
 1156  
             }
 1157  
             
 1158  
             public void close() throws IOException {
 1159  0
                 processor.close();
 1160  0
             }                     
 1161  
         }
 1162  
     } 
 1163  96
     public static class IdentifierOrder implements Order<DocumentData> {
 1164  
         public int hash(DocumentData object) {
 1165  0
             int h = 0;
 1166  0
             h += Utility.hash(object.identifier);
 1167  0
             return h;
 1168  
         } 
 1169  
         public Comparator<DocumentData> greaterThan() {
 1170  0
             return new Comparator<DocumentData>() {
 1171  0
                 public int compare(DocumentData one, DocumentData two) {
 1172  0
                     int result = 0;
 1173  
                     do {
 1174  0
                         result = + Utility.compare(one.identifier, two.identifier);
 1175  0
                         if(result != 0) break;
 1176  
                     } while (false);
 1177  0
                     return -result;
 1178  
                 }
 1179  
             };
 1180  
         }     
 1181  
         public Comparator<DocumentData> lessThan() {
 1182  0
             return new Comparator<DocumentData>() {
 1183  0
                 public int compare(DocumentData one, DocumentData two) {
 1184  0
                     int result = 0;
 1185  
                     do {
 1186  0
                         result = + Utility.compare(one.identifier, two.identifier);
 1187  0
                         if(result != 0) break;
 1188  
                     } while (false);
 1189  0
                     return result;
 1190  
                 }
 1191  
             };
 1192  
         }     
 1193  
         public TypeReader<DocumentData> orderedReader(ArrayInput _input) {
 1194  0
             return new ShreddedReader(_input);
 1195  
         }    
 1196  
 
 1197  
         public TypeReader<DocumentData> orderedReader(ArrayInput _input, int bufferSize) {
 1198  0
             return new ShreddedReader(_input, bufferSize);
 1199  
         }    
 1200  
         public OrderedWriter<DocumentData> orderedWriter(ArrayOutput _output) {
 1201  0
             ShreddedWriter w = new ShreddedWriter(_output);
 1202  0
             return new OrderedWriterClass(w); 
 1203  
         }                                    
 1204  0
         public static class OrderedWriterClass extends OrderedWriter< DocumentData > {
 1205  0
             DocumentData last = null;
 1206  0
             ShreddedWriter shreddedWriter = null; 
 1207  
             
 1208  0
             public OrderedWriterClass(ShreddedWriter s) {
 1209  0
                 this.shreddedWriter = s;
 1210  0
             }
 1211  
             
 1212  
             public void process(DocumentData object) throws IOException {
 1213  0
                boolean processAll = false;
 1214  0
                if (processAll || last == null || 0 != Utility.compare(object.identifier, last.identifier)) { processAll = true; shreddedWriter.processIdentifier(object.identifier); }
 1215  0
                shreddedWriter.processTuple(object.url, object.textLength);
 1216  0
                last = object;
 1217  0
             }           
 1218  
                  
 1219  
             public void close() throws IOException {
 1220  0
                 shreddedWriter.close();
 1221  0
             }
 1222  
             
 1223  
             public Class<DocumentData> getInputClass() {
 1224  0
                 return DocumentData.class;
 1225  
             }
 1226  
         } 
 1227  
         public ReaderSource<DocumentData> orderedCombiner(Collection<TypeReader<DocumentData>> readers, boolean closeOnExit) {
 1228  0
             ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
 1229  
             
 1230  0
             for (TypeReader<DocumentData> reader : readers) {
 1231  0
                 shreddedReaders.add((ShreddedReader)reader);
 1232  
             }
 1233  
             
 1234  0
             return new ShreddedCombiner(shreddedReaders, closeOnExit);
 1235  
         }                  
 1236  
         public DocumentData clone(DocumentData object) {
 1237  0
             DocumentData result = new DocumentData();
 1238  0
             if (object == null) return result;
 1239  0
             result.identifier = object.identifier; 
 1240  0
             result.url = object.url; 
 1241  0
             result.textLength = object.textLength; 
 1242  0
             return result;
 1243  
         }                 
 1244  
         public Class<DocumentData> getOrderedClass() {
 1245  76
             return DocumentData.class;
 1246  
         }                           
 1247  
         public String[] getOrderSpec() {
 1248  76
             return new String[] {"+identifier"};
 1249  
         }
 1250  
 
 1251  
         public static String getSpecString() {
 1252  0
             return "+identifier";
 1253  
         }
 1254  
                            
 1255  
         public interface ShreddedProcessor extends Step {
 1256  
             public void processIdentifier(String identifier) throws IOException;
 1257  
             public void processTuple(String url, int textLength) throws IOException;
 1258  
             public void close() throws IOException;
 1259  
         }    
 1260  
         public interface ShreddedSource extends Step {
 1261  
         }                                              
 1262  
         
 1263  0
         public static class ShreddedWriter implements ShreddedProcessor {
 1264  
             ArrayOutput output;
 1265  0
             ShreddedBuffer buffer = new ShreddedBuffer();
 1266  
             String lastIdentifier;
 1267  0
             boolean lastFlush = false;
 1268  
             
 1269  0
             public ShreddedWriter(ArrayOutput output) {
 1270  0
                 this.output = output;
 1271  0
             }                        
 1272  
             
 1273  
             public void close() throws IOException {
 1274  0
                 flush();
 1275  0
             }
 1276  
             
 1277  
             public void processIdentifier(String identifier) {
 1278  0
                 lastIdentifier = identifier;
 1279  0
                 buffer.processIdentifier(identifier);
 1280  0
             }
 1281  
             public final void processTuple(String url, int textLength) throws IOException {
 1282  0
                 if (lastFlush) {
 1283  0
                     if(buffer.identifiers.size() == 0) buffer.processIdentifier(lastIdentifier);
 1284  0
                     lastFlush = false;
 1285  
                 }
 1286  0
                 buffer.processTuple(url, textLength);
 1287  0
                 if (buffer.isFull())
 1288  0
                     flush();
 1289  0
             }
 1290  
             public final void flushTuples(int pauseIndex) throws IOException {
 1291  
                 
 1292  0
                 while (buffer.getReadIndex() < pauseIndex) {
 1293  
                            
 1294  0
                     output.writeString(buffer.getUrl());
 1295  0
                     output.writeInt(buffer.getTextLength());
 1296  0
                     buffer.incrementTuple();
 1297  
                 }
 1298  0
             }  
 1299  
             public final void flushIdentifier(int pauseIndex) throws IOException {
 1300  0
                 while (buffer.getReadIndex() < pauseIndex) {
 1301  0
                     int nextPause = buffer.getIdentifierEndIndex();
 1302  0
                     int count = nextPause - buffer.getReadIndex();
 1303  
                     
 1304  0
                     output.writeString(buffer.getIdentifier());
 1305  0
                     output.writeInt(count);
 1306  0
                     buffer.incrementIdentifier();
 1307  
                       
 1308  0
                     flushTuples(nextPause);
 1309  0
                     assert nextPause == buffer.getReadIndex();
 1310  0
                 }
 1311  0
             }
 1312  
             public void flush() throws IOException { 
 1313  0
                 flushIdentifier(buffer.getWriteIndex());
 1314  0
                 buffer.reset(); 
 1315  0
                 lastFlush = true;
 1316  0
             }                           
 1317  
         }
 1318  0
         public static class ShreddedBuffer {
 1319  0
             ArrayList<String> identifiers = new ArrayList();
 1320  0
             ArrayList<Integer> identifierTupleIdx = new ArrayList();
 1321  0
             int identifierReadIdx = 0;
 1322  
                             
 1323  
             String[] urls;
 1324  
             int[] textLengths;
 1325  0
             int writeTupleIndex = 0;
 1326  0
             int readTupleIndex = 0;
 1327  
             int batchSize;
 1328  
 
 1329  0
             public ShreddedBuffer(int batchSize) {
 1330  0
                 this.batchSize = batchSize;
 1331  
 
 1332  0
                 urls = new String[batchSize];
 1333  0
                 textLengths = new int[batchSize];
 1334  0
             }                              
 1335  
 
 1336  
             public ShreddedBuffer() {    
 1337  0
                 this(10000);
 1338  0
             }                                                                                                                    
 1339  
             
 1340  
             public void processIdentifier(String identifier) {
 1341  0
                 identifiers.add(identifier);
 1342  0
                 identifierTupleIdx.add(writeTupleIndex);
 1343  0
             }                                      
 1344  
             public void processTuple(String url, int textLength) {
 1345  0
                 assert identifiers.size() > 0;
 1346  0
                 urls[writeTupleIndex] = url;
 1347  0
                 textLengths[writeTupleIndex] = textLength;
 1348  0
                 writeTupleIndex++;
 1349  0
             }
 1350  
             public void resetData() {
 1351  0
                 identifiers.clear();
 1352  0
                 identifierTupleIdx.clear();
 1353  0
                 writeTupleIndex = 0;
 1354  0
             }                  
 1355  
                                  
 1356  
             public void resetRead() {
 1357  0
                 readTupleIndex = 0;
 1358  0
                 identifierReadIdx = 0;
 1359  0
             } 
 1360  
 
 1361  
             public void reset() {
 1362  0
                 resetData();
 1363  0
                 resetRead();
 1364  0
             } 
 1365  
             public boolean isFull() {
 1366  0
                 return writeTupleIndex >= batchSize;
 1367  
             }
 1368  
 
 1369  
             public boolean isEmpty() {
 1370  0
                 return writeTupleIndex == 0;
 1371  
             }                          
 1372  
 
 1373  
             public boolean isAtEnd() {
 1374  0
                 return readTupleIndex >= writeTupleIndex;
 1375  
             }           
 1376  
             public void incrementIdentifier() {
 1377  0
                 identifierReadIdx++;  
 1378  0
             }                                                                                              
 1379  
 
 1380  
             public void autoIncrementIdentifier() {
 1381  0
                 while (readTupleIndex >= getIdentifierEndIndex() && readTupleIndex < writeTupleIndex)
 1382  0
                     identifierReadIdx++;
 1383  0
             }                 
 1384  
             public void incrementTuple() {
 1385  0
                 readTupleIndex++;
 1386  0
             }                    
 1387  
             public int getIdentifierEndIndex() {
 1388  0
                 if ((identifierReadIdx+1) >= identifierTupleIdx.size())
 1389  0
                     return writeTupleIndex;
 1390  0
                 return identifierTupleIdx.get(identifierReadIdx+1);
 1391  
             }
 1392  
             public int getReadIndex() {
 1393  0
                 return readTupleIndex;
 1394  
             }   
 1395  
 
 1396  
             public int getWriteIndex() {
 1397  0
                 return writeTupleIndex;
 1398  
             } 
 1399  
             public String getIdentifier() {
 1400  0
                 assert readTupleIndex < writeTupleIndex;
 1401  0
                 assert identifierReadIdx < identifiers.size();
 1402  
                 
 1403  0
                 return identifiers.get(identifierReadIdx);
 1404  
             }
 1405  
             public String getUrl() {
 1406  0
                 assert readTupleIndex < writeTupleIndex;
 1407  0
                 return urls[readTupleIndex];
 1408  
             }                                         
 1409  
             public int getTextLength() {
 1410  0
                 assert readTupleIndex < writeTupleIndex;
 1411  0
                 return textLengths[readTupleIndex];
 1412  
             }                                         
 1413  
             public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
 1414  0
                 while (getReadIndex() < endIndex) {
 1415  0
                    output.processTuple(getUrl(), getTextLength());
 1416  0
                    incrementTuple();
 1417  
                 }
 1418  0
             }                                                                           
 1419  
             public void copyUntilIndexIdentifier(int endIndex, ShreddedProcessor output) throws IOException {
 1420  0
                 while (getReadIndex() < endIndex) {
 1421  0
                     output.processIdentifier(getIdentifier());
 1422  0
                     assert getIdentifierEndIndex() <= endIndex;
 1423  0
                     copyTuples(getIdentifierEndIndex(), output);
 1424  0
                     incrementIdentifier();
 1425  
                 }
 1426  0
             }  
 1427  
             public void copyUntilIdentifier(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
 1428  0
                 while (!isAtEnd()) {
 1429  0
                     if (other != null) {   
 1430  0
                         assert !other.isAtEnd();
 1431  0
                         int c = + Utility.compare(getIdentifier(), other.getIdentifier());
 1432  
                     
 1433  0
                         if (c > 0) {
 1434  0
                             break;   
 1435  
                         }
 1436  
                         
 1437  0
                         output.processIdentifier(getIdentifier());
 1438  
                                       
 1439  0
                         copyTuples(getIdentifierEndIndex(), output);
 1440  0
                     } else {
 1441  0
                         output.processIdentifier(getIdentifier());
 1442  0
                         copyTuples(getIdentifierEndIndex(), output);
 1443  
                     }
 1444  0
                     incrementIdentifier();  
 1445  
                     
 1446  
                
 1447  
                 }
 1448  0
             }
 1449  
             public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
 1450  0
                 copyUntilIdentifier(other, output);
 1451  0
             }
 1452  
             
 1453  
         }                         
 1454  0
         public static class ShreddedCombiner implements ReaderSource<DocumentData>, ShreddedSource {   
 1455  
             public ShreddedProcessor processor;
 1456  
             Collection<ShreddedReader> readers;       
 1457  0
             boolean closeOnExit = false;
 1458  0
             boolean uninitialized = true;
 1459  0
             PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
 1460  
             
 1461  0
             public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
 1462  0
                 this.readers = readers;                                                       
 1463  0
                 this.closeOnExit = closeOnExit;
 1464  0
             }
 1465  
                                   
 1466  
             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
 1467  0
                 if (processor instanceof ShreddedProcessor) {
 1468  0
                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
 1469  0
                 } else if (processor instanceof DocumentData.Processor) {
 1470  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentData.Processor) processor));
 1471  0
                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
 1472  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentData>) processor));
 1473  
                 } else {
 1474  0
                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
 1475  
                 }
 1476  0
             }                                
 1477  
             
 1478  
             public Class<DocumentData> getOutputClass() {
 1479  0
                 return DocumentData.class;
 1480  
             }
 1481  
             
 1482  
             public void initialize() throws IOException {
 1483  0
                 for (ShreddedReader reader : readers) {
 1484  0
                     reader.fill();                                        
 1485  
                     
 1486  0
                     if (!reader.getBuffer().isAtEnd())
 1487  0
                         queue.add(reader);
 1488  
                 }   
 1489  
 
 1490  0
                 uninitialized = false;
 1491  0
             }
 1492  
 
 1493  
             public void run() throws IOException {
 1494  0
                 initialize();
 1495  
                
 1496  0
                 while (queue.size() > 0) {
 1497  0
                     ShreddedReader top = queue.poll();
 1498  0
                     ShreddedReader next = null;
 1499  0
                     ShreddedBuffer nextBuffer = null; 
 1500  
                     
 1501  0
                     assert !top.getBuffer().isAtEnd();
 1502  
                                                   
 1503  0
                     if (queue.size() > 0) {
 1504  0
                         next = queue.peek();
 1505  0
                         nextBuffer = next.getBuffer();
 1506  0
                         assert !nextBuffer.isAtEnd();
 1507  
                     }
 1508  
                     
 1509  0
                     top.getBuffer().copyUntil(nextBuffer, processor);
 1510  0
                     if (top.getBuffer().isAtEnd())
 1511  0
                         top.fill();                 
 1512  
                         
 1513  0
                     if (!top.getBuffer().isAtEnd())
 1514  0
                         queue.add(top);
 1515  0
                 }              
 1516  
                 
 1517  0
                 if (closeOnExit)
 1518  0
                     processor.close();
 1519  0
             }
 1520  
 
 1521  
             public DocumentData read() throws IOException {
 1522  0
                 if (uninitialized)
 1523  0
                     initialize();
 1524  
 
 1525  0
                 DocumentData result = null;
 1526  
 
 1527  0
                 while (queue.size() > 0) {
 1528  0
                     ShreddedReader top = queue.poll();
 1529  0
                     result = top.read();
 1530  
 
 1531  0
                     if (result != null) {
 1532  0
                         if (top.getBuffer().isAtEnd())
 1533  0
                             top.fill();
 1534  
 
 1535  0
                         queue.offer(top);
 1536  0
                         break;
 1537  
                     } 
 1538  0
                 }
 1539  
 
 1540  0
                 return result;
 1541  
             }
 1542  
         } 
 1543  0
         public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<DocumentData>, ShreddedSource {      
 1544  
             public ShreddedProcessor processor;
 1545  
             ShreddedBuffer buffer;
 1546  0
             DocumentData last = new DocumentData();         
 1547  0
             long updateIdentifierCount = -1;
 1548  0
             long tupleCount = 0;
 1549  0
             long bufferStartCount = 0;  
 1550  
             ArrayInput input;
 1551  
             
 1552  0
             public ShreddedReader(ArrayInput input) {
 1553  0
                 this.input = input; 
 1554  0
                 this.buffer = new ShreddedBuffer();
 1555  0
             }                               
 1556  
             
 1557  0
             public ShreddedReader(ArrayInput input, int bufferSize) { 
 1558  0
                 this.input = input;
 1559  0
                 this.buffer = new ShreddedBuffer(bufferSize);
 1560  0
             }
 1561  
                  
 1562  
             public final int compareTo(ShreddedReader other) {
 1563  0
                 ShreddedBuffer otherBuffer = other.getBuffer();
 1564  
                 
 1565  0
                 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
 1566  0
                     return 0;                 
 1567  0
                 } else if (buffer.isAtEnd()) {
 1568  0
                     return -1;
 1569  0
                 } else if (otherBuffer.isAtEnd()) {
 1570  0
                     return 1;
 1571  
                 }
 1572  
                                    
 1573  0
                 int result = 0;
 1574  
                 do {
 1575  0
                     result = + Utility.compare(buffer.getIdentifier(), otherBuffer.getIdentifier());
 1576  0
                     if(result != 0) break;
 1577  
                 } while (false);                                             
 1578  
                 
 1579  0
                 return result;
 1580  
             }
 1581  
             
 1582  
             public final ShreddedBuffer getBuffer() {
 1583  0
                 return buffer;
 1584  
             }                
 1585  
             
 1586  
             public final DocumentData read() throws IOException {
 1587  0
                 if (buffer.isAtEnd()) {
 1588  0
                     fill();             
 1589  
                 
 1590  0
                     if (buffer.isAtEnd()) {
 1591  0
                         return null;
 1592  
                     }
 1593  
                 }
 1594  
                       
 1595  0
                 assert !buffer.isAtEnd();
 1596  0
                 DocumentData result = new DocumentData();
 1597  
                 
 1598  0
                 result.identifier = buffer.getIdentifier();
 1599  0
                 result.url = buffer.getUrl();
 1600  0
                 result.textLength = buffer.getTextLength();
 1601  
                 
 1602  0
                 buffer.incrementTuple();
 1603  0
                 buffer.autoIncrementIdentifier();
 1604  
                 
 1605  0
                 return result;
 1606  
             }           
 1607  
             
 1608  
             public final void fill() throws IOException {
 1609  
                 try {   
 1610  0
                     buffer.reset();
 1611  
                     
 1612  0
                     if (tupleCount != 0) {
 1613  
                                                       
 1614  0
                         if(updateIdentifierCount - tupleCount > 0) {
 1615  0
                             buffer.identifiers.add(last.identifier);
 1616  0
                             buffer.identifierTupleIdx.add((int) (updateIdentifierCount - tupleCount));
 1617  
                         }
 1618  0
                         bufferStartCount = tupleCount;
 1619  
                     }
 1620  
                     
 1621  0
                     while (!buffer.isFull()) {
 1622  0
                         updateIdentifier();
 1623  0
                         buffer.processTuple(input.readString(), input.readInt());
 1624  0
                         tupleCount++;
 1625  
                     }
 1626  0
                 } catch(EOFException e) {}
 1627  0
             }
 1628  
 
 1629  
             public final void updateIdentifier() throws IOException {
 1630  0
                 if (updateIdentifierCount > tupleCount)
 1631  0
                     return;
 1632  
                      
 1633  0
                 last.identifier = input.readString();
 1634  0
                 updateIdentifierCount = tupleCount + input.readInt();
 1635  
                                       
 1636  0
                 buffer.processIdentifier(last.identifier);
 1637  0
             }
 1638  
 
 1639  
             public void run() throws IOException {
 1640  
                 while (true) {
 1641  0
                     fill();
 1642  
                     
 1643  0
                     if (buffer.isAtEnd())
 1644  0
                         break;
 1645  
                     
 1646  0
                     buffer.copyUntil(null, processor);
 1647  
                 }      
 1648  0
                 processor.close();
 1649  0
             }
 1650  
             
 1651  
             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
 1652  0
                 if (processor instanceof ShreddedProcessor) {
 1653  0
                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
 1654  0
                 } else if (processor instanceof DocumentData.Processor) {
 1655  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentData.Processor) processor));
 1656  0
                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
 1657  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentData>) processor));
 1658  
                 } else {
 1659  0
                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
 1660  
                 }
 1661  0
             }                                
 1662  
             
 1663  
             public Class<DocumentData> getOutputClass() {
 1664  0
                 return DocumentData.class;
 1665  
             }                
 1666  
         }
 1667  
         
 1668  
         public static class DuplicateEliminator implements ShreddedProcessor {
 1669  
             public ShreddedProcessor processor;
 1670  0
             DocumentData last = new DocumentData();
 1671  0
             boolean identifierProcess = true;
 1672  
                                            
 1673  0
             public DuplicateEliminator() {}
 1674  0
             public DuplicateEliminator(ShreddedProcessor processor) {
 1675  0
                 this.processor = processor;
 1676  0
             }
 1677  
             
 1678  
             public void setShreddedProcessor(ShreddedProcessor processor) {
 1679  0
                 this.processor = processor;
 1680  0
             }
 1681  
 
 1682  
             public void processIdentifier(String identifier) throws IOException {  
 1683  0
                 if (identifierProcess || Utility.compare(identifier, last.identifier) != 0) {
 1684  0
                     last.identifier = identifier;
 1685  0
                     processor.processIdentifier(identifier);
 1686  0
                     identifierProcess = false;
 1687  
                 }
 1688  0
             }  
 1689  
             
 1690  
             public void resetIdentifier() {
 1691  0
                  identifierProcess = true;
 1692  0
             }                                                
 1693  
                                
 1694  
             public void processTuple(String url, int textLength) throws IOException {
 1695  0
                 processor.processTuple(url, textLength);
 1696  0
             } 
 1697  
             
 1698  
             public void close() throws IOException {
 1699  0
                 processor.close();
 1700  0
             }                    
 1701  
         }
 1702  
         public static class TupleUnshredder implements ShreddedProcessor {
 1703  0
             DocumentData last = new DocumentData();
 1704  
             public org.galagosearch.tupleflow.Processor<DocumentData> processor;                               
 1705  
             
 1706  0
             public TupleUnshredder(DocumentData.Processor processor) {
 1707  0
                 this.processor = processor;
 1708  0
             }         
 1709  
             
 1710  0
             public TupleUnshredder(org.galagosearch.tupleflow.Processor<DocumentData> processor) {
 1711  0
                 this.processor = processor;
 1712  0
             }
 1713  
             
 1714  
             public DocumentData clone(DocumentData object) {
 1715  0
                 DocumentData result = new DocumentData();
 1716  0
                 if (object == null) return result;
 1717  0
                 result.identifier = object.identifier; 
 1718  0
                 result.url = object.url; 
 1719  0
                 result.textLength = object.textLength; 
 1720  0
                 return result;
 1721  
             }                 
 1722  
             
 1723  
             public void processIdentifier(String identifier) throws IOException {
 1724  0
                 last.identifier = identifier;
 1725  0
             }   
 1726  
                 
 1727  
             
 1728  
             public void processTuple(String url, int textLength) throws IOException {
 1729  0
                 last.url = url;
 1730  0
                 last.textLength = textLength;
 1731  0
                 processor.process(clone(last));
 1732  0
             }               
 1733  
             
 1734  
             public void close() throws IOException {
 1735  0
                 processor.close();
 1736  0
             }
 1737  
         }     
 1738  96
         public static class TupleShredder implements Processor {
 1739  0
             DocumentData last = new DocumentData();
 1740  
             public ShreddedProcessor processor;
 1741  
             
 1742  0
             public TupleShredder(ShreddedProcessor processor) {
 1743  0
                 this.processor = processor;
 1744  0
             }                              
 1745  
             
 1746  
             public DocumentData clone(DocumentData object) {
 1747  0
                 DocumentData result = new DocumentData();
 1748  0
                 if (object == null) return result;
 1749  0
                 result.identifier = object.identifier; 
 1750  0
                 result.url = object.url; 
 1751  0
                 result.textLength = object.textLength; 
 1752  0
                 return result;
 1753  
             }                 
 1754  
             
 1755  
             public void process(DocumentData object) throws IOException {                                                                                                                                                   
 1756  0
                 boolean processAll = false;
 1757  0
                 if(last == null || Utility.compare(last.identifier, object.identifier) != 0 || processAll) { processor.processIdentifier(object.identifier); processAll = true; }
 1758  0
                 processor.processTuple(object.url, object.textLength);                                         
 1759  0
             }
 1760  
                           
 1761  
             public Class<DocumentData> getInputClass() {
 1762  0
                 return DocumentData.class;
 1763  
             }
 1764  
             
 1765  
             public void close() throws IOException {
 1766  0
                 processor.close();
 1767  0
             }                     
 1768  
         }
 1769  
     } 
 1770  
 }