Coverage Report - org.galagosearch.core.types.DocumentSplit
 
Classes in this File Line Coverage Branch Coverage Complexity
DocumentSplit
25%
4/16
50%
2/4
0
DocumentSplit$FileNameStartKeyOrder
15%
4/27
0%
0/4
0
DocumentSplit$FileNameStartKeyOrder$1
0%
0/7
0%
0/4
0
DocumentSplit$FileNameStartKeyOrder$2
0%
0/7
0%
0/4
0
DocumentSplit$FileNameStartKeyOrder$DuplicateEliminator
0%
0/29
0%
0/8
0
DocumentSplit$FileNameStartKeyOrder$OrderedWriterClass
0%
0/15
0%
0/12
0
DocumentSplit$FileNameStartKeyOrder$ShreddedBuffer
0%
0/129
0%
0/94
0
DocumentSplit$FileNameStartKeyOrder$ShreddedCombiner
0%
0/55
0%
0/36
0
DocumentSplit$FileNameStartKeyOrder$ShreddedProcessor
N/A
N/A
0
DocumentSplit$FileNameStartKeyOrder$ShreddedReader
0%
0/86
0%
0/40
0
DocumentSplit$FileNameStartKeyOrder$ShreddedSource
N/A
N/A
0
DocumentSplit$FileNameStartKeyOrder$ShreddedWriter
0%
0/52
0%
0/22
0
DocumentSplit$FileNameStartKeyOrder$TupleShredder
0%
0/21
0%
0/14
0
DocumentSplit$FileNameStartKeyOrder$TupleUnshredder
0%
0/26
0%
0/2
0
DocumentSplit$Processor
N/A
N/A
0
DocumentSplit$Source
N/A
N/A
0
DocumentSplit$Unordered
0%
0/25
0%
0/4
0
DocumentSplit$Unordered$1
0%
0/3
N/A
0
DocumentSplit$Unordered$2
0%
0/3
N/A
0
DocumentSplit$Unordered$DuplicateEliminator
0%
0/11
N/A
0
DocumentSplit$Unordered$OrderedWriterClass
0%
0/13
N/A
0
DocumentSplit$Unordered$ShreddedBuffer
0%
0/49
0%
0/28
0
DocumentSplit$Unordered$ShreddedCombiner
0%
0/55
0%
0/36
0
DocumentSplit$Unordered$ShreddedProcessor
N/A
N/A
0
DocumentSplit$Unordered$ShreddedReader
0%
0/58
0%
0/28
0
DocumentSplit$Unordered$ShreddedSource
N/A
N/A
0
DocumentSplit$Unordered$ShreddedWriter
0%
0/25
0%
0/6
0
DocumentSplit$Unordered$TupleShredder
0%
0/19
0%
0/2
0
DocumentSplit$Unordered$TupleUnshredder
0%
0/24
0%
0/2
0
 
 1  
 // This file was automatically generated with the command: 
 2  
 //     java org.galagosearch.tupleflow.typebuilder.TypeBuilderMojo ...
 3  
 package org.galagosearch.core.types;
 4  
 
 5  
 import org.galagosearch.tupleflow.Utility;
 6  
 import org.galagosearch.tupleflow.ArrayInput;
 7  
 import org.galagosearch.tupleflow.ArrayOutput;
 8  
 import org.galagosearch.tupleflow.Order;   
 9  
 import org.galagosearch.tupleflow.OrderedWriter;
 10  
 import org.galagosearch.tupleflow.Type; 
 11  
 import org.galagosearch.tupleflow.TypeReader;
 12  
 import org.galagosearch.tupleflow.Step; 
 13  
 import org.galagosearch.tupleflow.IncompatibleProcessorException;
 14  
 import org.galagosearch.tupleflow.ReaderSource;
 15  
 import java.io.IOException;             
 16  
 import java.io.EOFException;
 17  
 import java.io.UnsupportedEncodingException;
 18  
 import java.util.ArrayList;
 19  
 import java.util.Arrays;   
 20  
 import java.util.Comparator;
 21  
 import java.util.PriorityQueue;
 22  
 import java.util.Collection;
 23  
 
 24  
 public class DocumentSplit implements Type<DocumentSplit> {
 25  
     public String fileName;
 26  
     public String fileType;
 27  
     public boolean isCompressed;
 28  
     public byte[] startKey;
 29  
     public byte[] endKey; 
 30  
     
 31  32
     public DocumentSplit() {}
 32  0
     public DocumentSplit(String fileName, String fileType, boolean isCompressed, byte[] startKey, byte[] endKey) {
 33  0
         this.fileName = fileName;
 34  0
         this.fileType = fileType;
 35  0
         this.isCompressed = isCompressed;
 36  0
         this.startKey = startKey;
 37  0
         this.endKey = endKey;
 38  0
     }  
 39  
     
 40  
     public String toString() {
 41  
         try {
 42  0
             return String.format("%s,%s,%b,%s,%s",
 43  
                                    fileName, fileType, isCompressed, new String(startKey, "UTF-8"), new String(endKey, "UTF-8"));
 44  0
         } catch(UnsupportedEncodingException e) {
 45  0
             throw new RuntimeException("Couldn't convert string to UTF-8.");
 46  
         }
 47  
     } 
 48  
 
 49  
     public Order<DocumentSplit> getOrder(String... spec) {
 50  20
         if (Arrays.equals(spec, new String[] {  })) {
 51  0
             return new Unordered();
 52  
         }
 53  20
         if (Arrays.equals(spec, new String[] { "+fileName", "+startKey" })) {
 54  20
             return new FileNameStartKeyOrder();
 55  
         }
 56  0
         return null;
 57  
     } 
 58  
       
 59  
     public interface Processor extends Step, org.galagosearch.tupleflow.Processor<DocumentSplit> {
 60  
         public void process(DocumentSplit object) throws IOException;
 61  
         public void close() throws IOException;
 62  
     }                        
 63  
     public interface Source extends Step {
 64  
     }
 65  0
     public static class Unordered implements Order<DocumentSplit> {
 66  
         public int hash(DocumentSplit object) {
 67  0
             int h = 0;
 68  0
             return h;
 69  
         } 
 70  
         public Comparator<DocumentSplit> greaterThan() {
 71  0
             return new Comparator<DocumentSplit>() {
 72  0
                 public int compare(DocumentSplit one, DocumentSplit two) {
 73  0
                     int result = 0;
 74  
                     do {
 75  
                     } while (false);
 76  0
                     return -result;
 77  
                 }
 78  
             };
 79  
         }     
 80  
         public Comparator<DocumentSplit> lessThan() {
 81  0
             return new Comparator<DocumentSplit>() {
 82  0
                 public int compare(DocumentSplit one, DocumentSplit two) {
 83  0
                     int result = 0;
 84  
                     do {
 85  
                     } while (false);
 86  0
                     return result;
 87  
                 }
 88  
             };
 89  
         }     
 90  
         public TypeReader<DocumentSplit> orderedReader(ArrayInput _input) {
 91  0
             return new ShreddedReader(_input);
 92  
         }    
 93  
 
 94  
         public TypeReader<DocumentSplit> orderedReader(ArrayInput _input, int bufferSize) {
 95  0
             return new ShreddedReader(_input, bufferSize);
 96  
         }    
 97  
         public OrderedWriter<DocumentSplit> orderedWriter(ArrayOutput _output) {
 98  0
             ShreddedWriter w = new ShreddedWriter(_output);
 99  0
             return new OrderedWriterClass(w); 
 100  
         }                                    
 101  0
         public static class OrderedWriterClass extends OrderedWriter< DocumentSplit > {
 102  0
             DocumentSplit last = null;
 103  0
             ShreddedWriter shreddedWriter = null; 
 104  
             
 105  0
             public OrderedWriterClass(ShreddedWriter s) {
 106  0
                 this.shreddedWriter = s;
 107  0
             }
 108  
             
 109  
             public void process(DocumentSplit object) throws IOException {
 110  0
                boolean processAll = false;
 111  0
                shreddedWriter.processTuple(object.fileName, object.fileType, object.isCompressed, object.startKey, object.endKey);
 112  0
                last = object;
 113  0
             }           
 114  
                  
 115  
             public void close() throws IOException {
 116  0
                 shreddedWriter.close();
 117  0
             }
 118  
             
 119  
             public Class<DocumentSplit> getInputClass() {
 120  0
                 return DocumentSplit.class;
 121  
             }
 122  
         } 
 123  
         public ReaderSource<DocumentSplit> orderedCombiner(Collection<TypeReader<DocumentSplit>> readers, boolean closeOnExit) {
 124  0
             ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
 125  
             
 126  0
             for (TypeReader<DocumentSplit> reader : readers) {
 127  0
                 shreddedReaders.add((ShreddedReader)reader);
 128  
             }
 129  
             
 130  0
             return new ShreddedCombiner(shreddedReaders, closeOnExit);
 131  
         }                  
 132  
         public DocumentSplit clone(DocumentSplit object) {
 133  0
             DocumentSplit result = new DocumentSplit();
 134  0
             if (object == null) return result;
 135  0
             result.fileName = object.fileName; 
 136  0
             result.fileType = object.fileType; 
 137  0
             result.isCompressed = object.isCompressed; 
 138  0
             result.startKey = object.startKey; 
 139  0
             result.endKey = object.endKey; 
 140  0
             return result;
 141  
         }                 
 142  
         public Class<DocumentSplit> getOrderedClass() {
 143  0
             return DocumentSplit.class;
 144  
         }                           
 145  
         public String[] getOrderSpec() {
 146  0
             return new String[] {};
 147  
         }
 148  
 
 149  
         public static String getSpecString() {
 150  0
             return "";
 151  
         }
 152  
                            
 153  
         public interface ShreddedProcessor extends Step {
 154  
             public void processTuple(String fileName, String fileType, boolean isCompressed, byte[] startKey, byte[] endKey) throws IOException;
 155  
             public void close() throws IOException;
 156  
         }    
 157  
         public interface ShreddedSource extends Step {
 158  
         }                                              
 159  
         
 160  
         public static class ShreddedWriter implements ShreddedProcessor {
 161  
             ArrayOutput output;
 162  0
             ShreddedBuffer buffer = new ShreddedBuffer();
 163  0
             boolean lastFlush = false;
 164  
             
 165  0
             public ShreddedWriter(ArrayOutput output) {
 166  0
                 this.output = output;
 167  0
             }                        
 168  
             
 169  
             public void close() throws IOException {
 170  0
                 flush();
 171  0
             }
 172  
             
 173  
             public final void processTuple(String fileName, String fileType, boolean isCompressed, byte[] startKey, byte[] endKey) throws IOException {
 174  0
                 if (lastFlush) {
 175  0
                     lastFlush = false;
 176  
                 }
 177  0
                 buffer.processTuple(fileName, fileType, isCompressed, startKey, endKey);
 178  0
                 if (buffer.isFull())
 179  0
                     flush();
 180  0
             }
 181  
             public final void flushTuples(int pauseIndex) throws IOException {
 182  
                 
 183  0
                 while (buffer.getReadIndex() < pauseIndex) {
 184  
                            
 185  0
                     output.writeString(buffer.getFileName());
 186  0
                     output.writeString(buffer.getFileType());
 187  0
                     output.writeBoolean(buffer.getIsCompressed());
 188  0
                     output.writeBytes(buffer.getStartKey());
 189  0
                     output.writeBytes(buffer.getEndKey());
 190  0
                     buffer.incrementTuple();
 191  
                 }
 192  0
             }  
 193  
             public void flush() throws IOException { 
 194  0
                 flushTuples(buffer.getWriteIndex());
 195  0
                 buffer.reset(); 
 196  0
                 lastFlush = true;
 197  0
             }                           
 198  
         }
 199  0
         public static class ShreddedBuffer {
 200  
                             
 201  
             String[] fileNames;
 202  
             String[] fileTypes;
 203  
             boolean[] isCompresseds;
 204  
             byte[][] startKeys;
 205  
             byte[][] endKeys;
 206  0
             int writeTupleIndex = 0;
 207  0
             int readTupleIndex = 0;
 208  
             int batchSize;
 209  
 
 210  0
             public ShreddedBuffer(int batchSize) {
 211  0
                 this.batchSize = batchSize;
 212  
 
 213  0
                 fileNames = new String[batchSize];
 214  0
                 fileTypes = new String[batchSize];
 215  0
                 isCompresseds = new boolean[batchSize];
 216  0
                 startKeys = new byte[batchSize][];
 217  0
                 endKeys = new byte[batchSize][];
 218  0
             }                              
 219  
 
 220  
             public ShreddedBuffer() {    
 221  0
                 this(10000);
 222  0
             }                                                                                                                    
 223  
             
 224  
             public void processTuple(String fileName, String fileType, boolean isCompressed, byte[] startKey, byte[] endKey) {
 225  0
                 fileNames[writeTupleIndex] = fileName;
 226  0
                 fileTypes[writeTupleIndex] = fileType;
 227  0
                 isCompresseds[writeTupleIndex] = isCompressed;
 228  0
                 startKeys[writeTupleIndex] = startKey;
 229  0
                 endKeys[writeTupleIndex] = endKey;
 230  0
                 writeTupleIndex++;
 231  0
             }
 232  
             public void resetData() {
 233  0
                 writeTupleIndex = 0;
 234  0
             }                  
 235  
                                  
 236  
             public void resetRead() {
 237  0
                 readTupleIndex = 0;
 238  0
             } 
 239  
 
 240  
             public void reset() {
 241  0
                 resetData();
 242  0
                 resetRead();
 243  0
             } 
 244  
             public boolean isFull() {
 245  0
                 return writeTupleIndex >= batchSize;
 246  
             }
 247  
 
 248  
             public boolean isEmpty() {
 249  0
                 return writeTupleIndex == 0;
 250  
             }                          
 251  
 
 252  
             public boolean isAtEnd() {
 253  0
                 return readTupleIndex >= writeTupleIndex;
 254  
             }           
 255  
             public void incrementTuple() {
 256  0
                 readTupleIndex++;
 257  0
             }                    
 258  
             public int getReadIndex() {
 259  0
                 return readTupleIndex;
 260  
             }   
 261  
 
 262  
             public int getWriteIndex() {
 263  0
                 return writeTupleIndex;
 264  
             } 
 265  
             public String getFileName() {
 266  0
                 assert readTupleIndex < writeTupleIndex;
 267  0
                 return fileNames[readTupleIndex];
 268  
             }                                         
 269  
             public String getFileType() {
 270  0
                 assert readTupleIndex < writeTupleIndex;
 271  0
                 return fileTypes[readTupleIndex];
 272  
             }                                         
 273  
             public boolean getIsCompressed() {
 274  0
                 assert readTupleIndex < writeTupleIndex;
 275  0
                 return isCompresseds[readTupleIndex];
 276  
             }                                         
 277  
             public byte[] getStartKey() {
 278  0
                 assert readTupleIndex < writeTupleIndex;
 279  0
                 return startKeys[readTupleIndex];
 280  
             }                                         
 281  
             public byte[] getEndKey() {
 282  0
                 assert readTupleIndex < writeTupleIndex;
 283  0
                 return endKeys[readTupleIndex];
 284  
             }                                         
 285  
             public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
 286  0
                 while (getReadIndex() < endIndex) {
 287  0
                    output.processTuple(getFileName(), getFileType(), getIsCompressed(), getStartKey(), getEndKey());
 288  0
                    incrementTuple();
 289  
                 }
 290  0
             }                                                                           
 291  
              
 292  
             public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
 293  0
             }
 294  
             
 295  
         }                         
 296  0
         public static class ShreddedCombiner implements ReaderSource<DocumentSplit>, ShreddedSource {   
 297  
             public ShreddedProcessor processor;
 298  
             Collection<ShreddedReader> readers;       
 299  0
             boolean closeOnExit = false;
 300  0
             boolean uninitialized = true;
 301  0
             PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
 302  
             
 303  0
             public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
 304  0
                 this.readers = readers;                                                       
 305  0
                 this.closeOnExit = closeOnExit;
 306  0
             }
 307  
                                   
 308  
             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
 309  0
                 if (processor instanceof ShreddedProcessor) {
 310  0
                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
 311  0
                 } else if (processor instanceof DocumentSplit.Processor) {
 312  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentSplit.Processor) processor));
 313  0
                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
 314  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentSplit>) processor));
 315  
                 } else {
 316  0
                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
 317  
                 }
 318  0
             }                                
 319  
             
 320  
             public Class<DocumentSplit> getOutputClass() {
 321  0
                 return DocumentSplit.class;
 322  
             }
 323  
             
 324  
             public void initialize() throws IOException {
 325  0
                 for (ShreddedReader reader : readers) {
 326  0
                     reader.fill();                                        
 327  
                     
 328  0
                     if (!reader.getBuffer().isAtEnd())
 329  0
                         queue.add(reader);
 330  
                 }   
 331  
 
 332  0
                 uninitialized = false;
 333  0
             }
 334  
 
 335  
             public void run() throws IOException {
 336  0
                 initialize();
 337  
                
 338  0
                 while (queue.size() > 0) {
 339  0
                     ShreddedReader top = queue.poll();
 340  0
                     ShreddedReader next = null;
 341  0
                     ShreddedBuffer nextBuffer = null; 
 342  
                     
 343  0
                     assert !top.getBuffer().isAtEnd();
 344  
                                                   
 345  0
                     if (queue.size() > 0) {
 346  0
                         next = queue.peek();
 347  0
                         nextBuffer = next.getBuffer();
 348  0
                         assert !nextBuffer.isAtEnd();
 349  
                     }
 350  
                     
 351  0
                     top.getBuffer().copyUntil(nextBuffer, processor);
 352  0
                     if (top.getBuffer().isAtEnd())
 353  0
                         top.fill();                 
 354  
                         
 355  0
                     if (!top.getBuffer().isAtEnd())
 356  0
                         queue.add(top);
 357  0
                 }              
 358  
                 
 359  0
                 if (closeOnExit)
 360  0
                     processor.close();
 361  0
             }
 362  
 
 363  
             public DocumentSplit read() throws IOException {
 364  0
                 if (uninitialized)
 365  0
                     initialize();
 366  
 
 367  0
                 DocumentSplit result = null;
 368  
 
 369  0
                 while (queue.size() > 0) {
 370  0
                     ShreddedReader top = queue.poll();
 371  0
                     result = top.read();
 372  
 
 373  0
                     if (result != null) {
 374  0
                         if (top.getBuffer().isAtEnd())
 375  0
                             top.fill();
 376  
 
 377  0
                         queue.offer(top);
 378  0
                         break;
 379  
                     } 
 380  0
                 }
 381  
 
 382  0
                 return result;
 383  
             }
 384  
         } 
 385  0
         public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<DocumentSplit>, ShreddedSource {      
 386  
             public ShreddedProcessor processor;
 387  
             ShreddedBuffer buffer;
 388  0
             DocumentSplit last = new DocumentSplit();         
 389  0
             long tupleCount = 0;
 390  0
             long bufferStartCount = 0;  
 391  
             ArrayInput input;
 392  
             
 393  0
             public ShreddedReader(ArrayInput input) {
 394  0
                 this.input = input; 
 395  0
                 this.buffer = new ShreddedBuffer();
 396  0
             }                               
 397  
             
 398  0
             public ShreddedReader(ArrayInput input, int bufferSize) { 
 399  0
                 this.input = input;
 400  0
                 this.buffer = new ShreddedBuffer(bufferSize);
 401  0
             }
 402  
                  
 403  
             public final int compareTo(ShreddedReader other) {
 404  0
                 ShreddedBuffer otherBuffer = other.getBuffer();
 405  
                 
 406  0
                 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
 407  0
                     return 0;                 
 408  0
                 } else if (buffer.isAtEnd()) {
 409  0
                     return -1;
 410  0
                 } else if (otherBuffer.isAtEnd()) {
 411  0
                     return 1;
 412  
                 }
 413  
                                    
 414  0
                 int result = 0;
 415  
                 do {
 416  
                 } while (false);                                             
 417  
                 
 418  0
                 return result;
 419  
             }
 420  
             
 421  
             public final ShreddedBuffer getBuffer() {
 422  0
                 return buffer;
 423  
             }                
 424  
             
 425  
             public final DocumentSplit read() throws IOException {
 426  0
                 if (buffer.isAtEnd()) {
 427  0
                     fill();             
 428  
                 
 429  0
                     if (buffer.isAtEnd()) {
 430  0
                         return null;
 431  
                     }
 432  
                 }
 433  
                       
 434  0
                 assert !buffer.isAtEnd();
 435  0
                 DocumentSplit result = new DocumentSplit();
 436  
                 
 437  0
                 result.fileName = buffer.getFileName();
 438  0
                 result.fileType = buffer.getFileType();
 439  0
                 result.isCompressed = buffer.getIsCompressed();
 440  0
                 result.startKey = buffer.getStartKey();
 441  0
                 result.endKey = buffer.getEndKey();
 442  
                 
 443  0
                 buffer.incrementTuple();
 444  
                 
 445  0
                 return result;
 446  
             }           
 447  
             
 448  
             public final void fill() throws IOException {
 449  
                 try {   
 450  0
                     buffer.reset();
 451  
                     
 452  0
                     if (tupleCount != 0) {
 453  0
                         bufferStartCount = tupleCount;
 454  
                     }
 455  
                     
 456  0
                     while (!buffer.isFull()) {
 457  0
                         buffer.processTuple(input.readString(), input.readString(), input.readBoolean(), input.readBytes(), input.readBytes());
 458  0
                         tupleCount++;
 459  
                     }
 460  0
                 } catch(EOFException e) {}
 461  0
             }
 462  
 
 463  
 
 464  
             public void run() throws IOException {
 465  
                 while (true) {
 466  0
                     fill();
 467  
                     
 468  0
                     if (buffer.isAtEnd())
 469  0
                         break;
 470  
                     
 471  0
                     buffer.copyUntil(null, processor);
 472  
                 }      
 473  0
                 processor.close();
 474  0
             }
 475  
             
 476  
             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
 477  0
                 if (processor instanceof ShreddedProcessor) {
 478  0
                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
 479  0
                 } else if (processor instanceof DocumentSplit.Processor) {
 480  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentSplit.Processor) processor));
 481  0
                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
 482  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentSplit>) processor));
 483  
                 } else {
 484  0
                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
 485  
                 }
 486  0
             }                                
 487  
             
 488  
             public Class<DocumentSplit> getOutputClass() {
 489  0
                 return DocumentSplit.class;
 490  
             }                
 491  
         }
 492  
         
 493  
         public static class DuplicateEliminator implements ShreddedProcessor {
 494  
             public ShreddedProcessor processor;
 495  0
             DocumentSplit last = new DocumentSplit();
 496  
                                            
 497  0
             public DuplicateEliminator() {}
 498  0
             public DuplicateEliminator(ShreddedProcessor processor) {
 499  0
                 this.processor = processor;
 500  0
             }
 501  
             
 502  
             public void setShreddedProcessor(ShreddedProcessor processor) {
 503  0
                 this.processor = processor;
 504  0
             }
 505  
 
 506  
           
 507  
             
 508  
                                
 509  
             public void processTuple(String fileName, String fileType, boolean isCompressed, byte[] startKey, byte[] endKey) throws IOException {
 510  0
                 processor.processTuple(fileName, fileType, isCompressed, startKey, endKey);
 511  0
             } 
 512  
             
 513  
             public void close() throws IOException {
 514  0
                 processor.close();
 515  0
             }                    
 516  
         }
 517  
         public static class TupleUnshredder implements ShreddedProcessor {
 518  0
             DocumentSplit last = new DocumentSplit();
 519  
             public org.galagosearch.tupleflow.Processor<DocumentSplit> processor;                               
 520  
             
 521  0
             public TupleUnshredder(DocumentSplit.Processor processor) {
 522  0
                 this.processor = processor;
 523  0
             }         
 524  
             
 525  0
             public TupleUnshredder(org.galagosearch.tupleflow.Processor<DocumentSplit> processor) {
 526  0
                 this.processor = processor;
 527  0
             }
 528  
             
 529  
             public DocumentSplit clone(DocumentSplit object) {
 530  0
                 DocumentSplit result = new DocumentSplit();
 531  0
                 if (object == null) return result;
 532  0
                 result.fileName = object.fileName; 
 533  0
                 result.fileType = object.fileType; 
 534  0
                 result.isCompressed = object.isCompressed; 
 535  0
                 result.startKey = object.startKey; 
 536  0
                 result.endKey = object.endKey; 
 537  0
                 return result;
 538  
             }                 
 539  
             
 540  
             
 541  
             public void processTuple(String fileName, String fileType, boolean isCompressed, byte[] startKey, byte[] endKey) throws IOException {
 542  0
                 last.fileName = fileName;
 543  0
                 last.fileType = fileType;
 544  0
                 last.isCompressed = isCompressed;
 545  0
                 last.startKey = startKey;
 546  0
                 last.endKey = endKey;
 547  0
                 processor.process(clone(last));
 548  0
             }               
 549  
             
 550  
             public void close() throws IOException {
 551  0
                 processor.close();
 552  0
             }
 553  
         }     
 554  0
         public static class TupleShredder implements Processor {
 555  0
             DocumentSplit last = new DocumentSplit();
 556  
             public ShreddedProcessor processor;
 557  
             
 558  0
             public TupleShredder(ShreddedProcessor processor) {
 559  0
                 this.processor = processor;
 560  0
             }                              
 561  
             
 562  
             public DocumentSplit clone(DocumentSplit object) {
 563  0
                 DocumentSplit result = new DocumentSplit();
 564  0
                 if (object == null) return result;
 565  0
                 result.fileName = object.fileName; 
 566  0
                 result.fileType = object.fileType; 
 567  0
                 result.isCompressed = object.isCompressed; 
 568  0
                 result.startKey = object.startKey; 
 569  0
                 result.endKey = object.endKey; 
 570  0
                 return result;
 571  
             }                 
 572  
             
 573  
             public void process(DocumentSplit object) throws IOException {                                                                                                                                                   
 574  0
                 boolean processAll = false;
 575  0
                 processor.processTuple(object.fileName, object.fileType, object.isCompressed, object.startKey, object.endKey);                                         
 576  0
             }
 577  
                           
 578  
             public Class<DocumentSplit> getInputClass() {
 579  0
                 return DocumentSplit.class;
 580  
             }
 581  
             
 582  
             public void close() throws IOException {
 583  0
                 processor.close();
 584  0
             }                     
 585  
         }
 586  
     } 
 587  88
     public static class FileNameStartKeyOrder implements Order<DocumentSplit> {
 588  
         public int hash(DocumentSplit object) {
 589  0
             int h = 0;
 590  0
             h += Utility.hash(object.fileName);
 591  0
             h += Utility.hash(object.startKey);
 592  0
             return h;
 593  
         } 
 594  
         public Comparator<DocumentSplit> greaterThan() {
 595  0
             return new Comparator<DocumentSplit>() {
 596  0
                 public int compare(DocumentSplit one, DocumentSplit two) {
 597  0
                     int result = 0;
 598  
                     do {
 599  0
                         result = + Utility.compare(one.fileName, two.fileName);
 600  0
                         if(result != 0) break;
 601  0
                         result = + Utility.compare(one.startKey, two.startKey);
 602  0
                         if(result != 0) break;
 603  
                     } while (false);
 604  0
                     return -result;
 605  
                 }
 606  
             };
 607  
         }     
 608  
         public Comparator<DocumentSplit> lessThan() {
 609  0
             return new Comparator<DocumentSplit>() {
 610  0
                 public int compare(DocumentSplit one, DocumentSplit two) {
 611  0
                     int result = 0;
 612  
                     do {
 613  0
                         result = + Utility.compare(one.fileName, two.fileName);
 614  0
                         if(result != 0) break;
 615  0
                         result = + Utility.compare(one.startKey, two.startKey);
 616  0
                         if(result != 0) break;
 617  
                     } while (false);
 618  0
                     return result;
 619  
                 }
 620  
             };
 621  
         }     
 622  
         public TypeReader<DocumentSplit> orderedReader(ArrayInput _input) {
 623  0
             return new ShreddedReader(_input);
 624  
         }    
 625  
 
 626  
         public TypeReader<DocumentSplit> orderedReader(ArrayInput _input, int bufferSize) {
 627  0
             return new ShreddedReader(_input, bufferSize);
 628  
         }    
 629  
         public OrderedWriter<DocumentSplit> orderedWriter(ArrayOutput _output) {
 630  0
             ShreddedWriter w = new ShreddedWriter(_output);
 631  0
             return new OrderedWriterClass(w); 
 632  
         }                                    
 633  0
         public static class OrderedWriterClass extends OrderedWriter< DocumentSplit > {
 634  0
             DocumentSplit last = null;
 635  0
             ShreddedWriter shreddedWriter = null; 
 636  
             
 637  0
             public OrderedWriterClass(ShreddedWriter s) {
 638  0
                 this.shreddedWriter = s;
 639  0
             }
 640  
             
 641  
             public void process(DocumentSplit object) throws IOException {
 642  0
                boolean processAll = false;
 643  0
                if (processAll || last == null || 0 != Utility.compare(object.fileName, last.fileName)) { processAll = true; shreddedWriter.processFileName(object.fileName); }
 644  0
                if (processAll || last == null || 0 != Utility.compare(object.startKey, last.startKey)) { processAll = true; shreddedWriter.processStartKey(object.startKey); }
 645  0
                shreddedWriter.processTuple(object.fileType, object.isCompressed, object.endKey);
 646  0
                last = object;
 647  0
             }           
 648  
                  
 649  
             public void close() throws IOException {
 650  0
                 shreddedWriter.close();
 651  0
             }
 652  
             
 653  
             public Class<DocumentSplit> getInputClass() {
 654  0
                 return DocumentSplit.class;
 655  
             }
 656  
         } 
 657  
         public ReaderSource<DocumentSplit> orderedCombiner(Collection<TypeReader<DocumentSplit>> readers, boolean closeOnExit) {
 658  0
             ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
 659  
             
 660  0
             for (TypeReader<DocumentSplit> reader : readers) {
 661  0
                 shreddedReaders.add((ShreddedReader)reader);
 662  
             }
 663  
             
 664  0
             return new ShreddedCombiner(shreddedReaders, closeOnExit);
 665  
         }                  
 666  
         public DocumentSplit clone(DocumentSplit object) {
 667  0
             DocumentSplit result = new DocumentSplit();
 668  0
             if (object == null) return result;
 669  0
             result.fileName = object.fileName; 
 670  0
             result.fileType = object.fileType; 
 671  0
             result.isCompressed = object.isCompressed; 
 672  0
             result.startKey = object.startKey; 
 673  0
             result.endKey = object.endKey; 
 674  0
             return result;
 675  
         }                 
 676  
         public Class<DocumentSplit> getOrderedClass() {
 677  68
             return DocumentSplit.class;
 678  
         }                           
 679  
         public String[] getOrderSpec() {
 680  68
             return new String[] {"+fileName", "+startKey"};
 681  
         }
 682  
 
 683  
         public static String getSpecString() {
 684  0
             return "+fileName +startKey";
 685  
         }
 686  
                            
 687  
         public interface ShreddedProcessor extends Step {
 688  
             public void processFileName(String fileName) throws IOException;
 689  
             public void processStartKey(byte[] startKey) throws IOException;
 690  
             public void processTuple(String fileType, boolean isCompressed, byte[] endKey) throws IOException;
 691  
             public void close() throws IOException;
 692  
         }    
 693  
         public interface ShreddedSource extends Step {
 694  
         }                                              
 695  
         
 696  0
         public static class ShreddedWriter implements ShreddedProcessor {
 697  
             ArrayOutput output;
 698  0
             ShreddedBuffer buffer = new ShreddedBuffer();
 699  
             String lastFileName;
 700  
             byte[] lastStartKey;
 701  0
             boolean lastFlush = false;
 702  
             
 703  0
             public ShreddedWriter(ArrayOutput output) {
 704  0
                 this.output = output;
 705  0
             }                        
 706  
             
 707  
             public void close() throws IOException {
 708  0
                 flush();
 709  0
             }
 710  
             
 711  
             public void processFileName(String fileName) {
 712  0
                 lastFileName = fileName;
 713  0
                 buffer.processFileName(fileName);
 714  0
             }
 715  
             public void processStartKey(byte[] startKey) {
 716  0
                 lastStartKey = startKey;
 717  0
                 buffer.processStartKey(startKey);
 718  0
             }
 719  
             public final void processTuple(String fileType, boolean isCompressed, byte[] endKey) throws IOException {
 720  0
                 if (lastFlush) {
 721  0
                     if(buffer.fileNames.size() == 0) buffer.processFileName(lastFileName);
 722  0
                     if(buffer.startKeys.size() == 0) buffer.processStartKey(lastStartKey);
 723  0
                     lastFlush = false;
 724  
                 }
 725  0
                 buffer.processTuple(fileType, isCompressed, endKey);
 726  0
                 if (buffer.isFull())
 727  0
                     flush();
 728  0
             }
 729  
             public final void flushTuples(int pauseIndex) throws IOException {
 730  
                 
 731  0
                 while (buffer.getReadIndex() < pauseIndex) {
 732  
                            
 733  0
                     output.writeString(buffer.getFileType());
 734  0
                     output.writeBoolean(buffer.getIsCompressed());
 735  0
                     output.writeBytes(buffer.getEndKey());
 736  0
                     buffer.incrementTuple();
 737  
                 }
 738  0
             }  
 739  
             public final void flushFileName(int pauseIndex) throws IOException {
 740  0
                 while (buffer.getReadIndex() < pauseIndex) {
 741  0
                     int nextPause = buffer.getFileNameEndIndex();
 742  0
                     int count = nextPause - buffer.getReadIndex();
 743  
                     
 744  0
                     output.writeString(buffer.getFileName());
 745  0
                     output.writeInt(count);
 746  0
                     buffer.incrementFileName();
 747  
                       
 748  0
                     flushStartKey(nextPause);
 749  0
                     assert nextPause == buffer.getReadIndex();
 750  0
                 }
 751  0
             }
 752  
             public final void flushStartKey(int pauseIndex) throws IOException {
 753  0
                 while (buffer.getReadIndex() < pauseIndex) {
 754  0
                     int nextPause = buffer.getStartKeyEndIndex();
 755  0
                     int count = nextPause - buffer.getReadIndex();
 756  
                     
 757  0
                     output.writeBytes(buffer.getStartKey());
 758  0
                     output.writeInt(count);
 759  0
                     buffer.incrementStartKey();
 760  
                       
 761  0
                     flushTuples(nextPause);
 762  0
                     assert nextPause == buffer.getReadIndex();
 763  0
                 }
 764  0
             }
 765  
             public void flush() throws IOException { 
 766  0
                 flushFileName(buffer.getWriteIndex());
 767  0
                 buffer.reset(); 
 768  0
                 lastFlush = true;
 769  0
             }                           
 770  
         }
 771  0
         public static class ShreddedBuffer {
 772  0
             ArrayList<String> fileNames = new ArrayList();
 773  0
             ArrayList<byte[]> startKeys = new ArrayList();
 774  0
             ArrayList<Integer> fileNameTupleIdx = new ArrayList();
 775  0
             ArrayList<Integer> startKeyTupleIdx = new ArrayList();
 776  0
             int fileNameReadIdx = 0;
 777  0
             int startKeyReadIdx = 0;
 778  
                             
 779  
             String[] fileTypes;
 780  
             boolean[] isCompresseds;
 781  
             byte[][] endKeys;
 782  0
             int writeTupleIndex = 0;
 783  0
             int readTupleIndex = 0;
 784  
             int batchSize;
 785  
 
 786  0
             public ShreddedBuffer(int batchSize) {
 787  0
                 this.batchSize = batchSize;
 788  
 
 789  0
                 fileTypes = new String[batchSize];
 790  0
                 isCompresseds = new boolean[batchSize];
 791  0
                 endKeys = new byte[batchSize][];
 792  0
             }                              
 793  
 
 794  
             public ShreddedBuffer() {    
 795  0
                 this(10000);
 796  0
             }                                                                                                                    
 797  
             
 798  
             public void processFileName(String fileName) {
 799  0
                 fileNames.add(fileName);
 800  0
                 fileNameTupleIdx.add(writeTupleIndex);
 801  0
             }                                      
 802  
             public void processStartKey(byte[] startKey) {
 803  0
                 startKeys.add(startKey);
 804  0
                 startKeyTupleIdx.add(writeTupleIndex);
 805  0
             }                                      
 806  
             public void processTuple(String fileType, boolean isCompressed, byte[] endKey) {
 807  0
                 assert fileNames.size() > 0;
 808  0
                 assert startKeys.size() > 0;
 809  0
                 fileTypes[writeTupleIndex] = fileType;
 810  0
                 isCompresseds[writeTupleIndex] = isCompressed;
 811  0
                 endKeys[writeTupleIndex] = endKey;
 812  0
                 writeTupleIndex++;
 813  0
             }
 814  
             public void resetData() {
 815  0
                 fileNames.clear();
 816  0
                 startKeys.clear();
 817  0
                 fileNameTupleIdx.clear();
 818  0
                 startKeyTupleIdx.clear();
 819  0
                 writeTupleIndex = 0;
 820  0
             }                  
 821  
                                  
 822  
             public void resetRead() {
 823  0
                 readTupleIndex = 0;
 824  0
                 fileNameReadIdx = 0;
 825  0
                 startKeyReadIdx = 0;
 826  0
             } 
 827  
 
 828  
             public void reset() {
 829  0
                 resetData();
 830  0
                 resetRead();
 831  0
             } 
 832  
             public boolean isFull() {
 833  0
                 return writeTupleIndex >= batchSize;
 834  
             }
 835  
 
 836  
             public boolean isEmpty() {
 837  0
                 return writeTupleIndex == 0;
 838  
             }                          
 839  
 
 840  
             public boolean isAtEnd() {
 841  0
                 return readTupleIndex >= writeTupleIndex;
 842  
             }           
 843  
             public void incrementFileName() {
 844  0
                 fileNameReadIdx++;  
 845  0
             }                                                                                              
 846  
 
 847  
             public void autoIncrementFileName() {
 848  0
                 while (readTupleIndex >= getFileNameEndIndex() && readTupleIndex < writeTupleIndex)
 849  0
                     fileNameReadIdx++;
 850  0
             }                 
 851  
             public void incrementStartKey() {
 852  0
                 startKeyReadIdx++;  
 853  0
             }                                                                                              
 854  
 
 855  
             public void autoIncrementStartKey() {
 856  0
                 while (readTupleIndex >= getStartKeyEndIndex() && readTupleIndex < writeTupleIndex)
 857  0
                     startKeyReadIdx++;
 858  0
             }                 
 859  
             public void incrementTuple() {
 860  0
                 readTupleIndex++;
 861  0
             }                    
 862  
             public int getFileNameEndIndex() {
 863  0
                 if ((fileNameReadIdx+1) >= fileNameTupleIdx.size())
 864  0
                     return writeTupleIndex;
 865  0
                 return fileNameTupleIdx.get(fileNameReadIdx+1);
 866  
             }
 867  
 
 868  
             public int getStartKeyEndIndex() {
 869  0
                 if ((startKeyReadIdx+1) >= startKeyTupleIdx.size())
 870  0
                     return writeTupleIndex;
 871  0
                 return startKeyTupleIdx.get(startKeyReadIdx+1);
 872  
             }
 873  
             public int getReadIndex() {
 874  0
                 return readTupleIndex;
 875  
             }   
 876  
 
 877  
             public int getWriteIndex() {
 878  0
                 return writeTupleIndex;
 879  
             } 
 880  
             public String getFileName() {
 881  0
                 assert readTupleIndex < writeTupleIndex;
 882  0
                 assert fileNameReadIdx < fileNames.size();
 883  
                 
 884  0
                 return fileNames.get(fileNameReadIdx);
 885  
             }
 886  
             public byte[] getStartKey() {
 887  0
                 assert readTupleIndex < writeTupleIndex;
 888  0
                 assert startKeyReadIdx < startKeys.size();
 889  
                 
 890  0
                 return startKeys.get(startKeyReadIdx);
 891  
             }
 892  
             public String getFileType() {
 893  0
                 assert readTupleIndex < writeTupleIndex;
 894  0
                 return fileTypes[readTupleIndex];
 895  
             }                                         
 896  
             public boolean getIsCompressed() {
 897  0
                 assert readTupleIndex < writeTupleIndex;
 898  0
                 return isCompresseds[readTupleIndex];
 899  
             }                                         
 900  
             public byte[] getEndKey() {
 901  0
                 assert readTupleIndex < writeTupleIndex;
 902  0
                 return endKeys[readTupleIndex];
 903  
             }                                         
 904  
             public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
 905  0
                 while (getReadIndex() < endIndex) {
 906  0
                    output.processTuple(getFileType(), getIsCompressed(), getEndKey());
 907  0
                    incrementTuple();
 908  
                 }
 909  0
             }                                                                           
 910  
             public void copyUntilIndexFileName(int endIndex, ShreddedProcessor output) throws IOException {
 911  0
                 while (getReadIndex() < endIndex) {
 912  0
                     output.processFileName(getFileName());
 913  0
                     assert getFileNameEndIndex() <= endIndex;
 914  0
                     copyUntilIndexStartKey(getFileNameEndIndex(), output);
 915  0
                     incrementFileName();
 916  
                 }
 917  0
             } 
 918  
             public void copyUntilIndexStartKey(int endIndex, ShreddedProcessor output) throws IOException {
 919  0
                 while (getReadIndex() < endIndex) {
 920  0
                     output.processStartKey(getStartKey());
 921  0
                     assert getStartKeyEndIndex() <= endIndex;
 922  0
                     copyTuples(getStartKeyEndIndex(), output);
 923  0
                     incrementStartKey();
 924  
                 }
 925  0
             }  
 926  
             public void copyUntilFileName(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
 927  0
                 while (!isAtEnd()) {
 928  0
                     if (other != null) {   
 929  0
                         assert !other.isAtEnd();
 930  0
                         int c = + Utility.compare(getFileName(), other.getFileName());
 931  
                     
 932  0
                         if (c > 0) {
 933  0
                             break;   
 934  
                         }
 935  
                         
 936  0
                         output.processFileName(getFileName());
 937  
                                       
 938  0
                         if (c < 0) {
 939  0
                             copyUntilIndexStartKey(getFileNameEndIndex(), output);
 940  0
                         } else if (c == 0) {
 941  0
                             copyUntilStartKey(other, output);
 942  0
                             autoIncrementFileName();
 943  0
                             break;
 944  
                         }
 945  0
                     } else {
 946  0
                         output.processFileName(getFileName());
 947  0
                         copyUntilIndexStartKey(getFileNameEndIndex(), output);
 948  
                     }
 949  0
                     incrementFileName();  
 950  
                     
 951  
                
 952  
                 }
 953  0
             }
 954  
             public void copyUntilStartKey(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
 955  0
                 while (!isAtEnd()) {
 956  0
                     if (other != null) {   
 957  0
                         assert !other.isAtEnd();
 958  0
                         int c = + Utility.compare(getStartKey(), other.getStartKey());
 959  
                     
 960  0
                         if (c > 0) {
 961  0
                             break;   
 962  
                         }
 963  
                         
 964  0
                         output.processStartKey(getStartKey());
 965  
                                       
 966  0
                         copyTuples(getStartKeyEndIndex(), output);
 967  0
                     } else {
 968  0
                         output.processStartKey(getStartKey());
 969  0
                         copyTuples(getStartKeyEndIndex(), output);
 970  
                     }
 971  0
                     incrementStartKey();  
 972  
                     
 973  0
                     if (getFileNameEndIndex() <= readTupleIndex)
 974  0
                         break;   
 975  
                 }
 976  0
             }
 977  
             public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
 978  0
                 copyUntilFileName(other, output);
 979  0
             }
 980  
             
 981  
         }                         
 982  0
         public static class ShreddedCombiner implements ReaderSource<DocumentSplit>, ShreddedSource {   
 983  
             public ShreddedProcessor processor;
 984  
             Collection<ShreddedReader> readers;       
 985  0
             boolean closeOnExit = false;
 986  0
             boolean uninitialized = true;
 987  0
             PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
 988  
             
 989  0
             public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
 990  0
                 this.readers = readers;                                                       
 991  0
                 this.closeOnExit = closeOnExit;
 992  0
             }
 993  
                                   
 994  
             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
 995  0
                 if (processor instanceof ShreddedProcessor) {
 996  0
                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
 997  0
                 } else if (processor instanceof DocumentSplit.Processor) {
 998  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentSplit.Processor) processor));
 999  0
                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
 1000  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentSplit>) processor));
 1001  
                 } else {
 1002  0
                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
 1003  
                 }
 1004  0
             }                                
 1005  
             
 1006  
             public Class<DocumentSplit> getOutputClass() {
 1007  0
                 return DocumentSplit.class;
 1008  
             }
 1009  
             
 1010  
             public void initialize() throws IOException {
 1011  0
                 for (ShreddedReader reader : readers) {
 1012  0
                     reader.fill();                                        
 1013  
                     
 1014  0
                     if (!reader.getBuffer().isAtEnd())
 1015  0
                         queue.add(reader);
 1016  
                 }   
 1017  
 
 1018  0
                 uninitialized = false;
 1019  0
             }
 1020  
 
 1021  
             public void run() throws IOException {
 1022  0
                 initialize();
 1023  
                
 1024  0
                 while (queue.size() > 0) {
 1025  0
                     ShreddedReader top = queue.poll();
 1026  0
                     ShreddedReader next = null;
 1027  0
                     ShreddedBuffer nextBuffer = null; 
 1028  
                     
 1029  0
                     assert !top.getBuffer().isAtEnd();
 1030  
                                                   
 1031  0
                     if (queue.size() > 0) {
 1032  0
                         next = queue.peek();
 1033  0
                         nextBuffer = next.getBuffer();
 1034  0
                         assert !nextBuffer.isAtEnd();
 1035  
                     }
 1036  
                     
 1037  0
                     top.getBuffer().copyUntil(nextBuffer, processor);
 1038  0
                     if (top.getBuffer().isAtEnd())
 1039  0
                         top.fill();                 
 1040  
                         
 1041  0
                     if (!top.getBuffer().isAtEnd())
 1042  0
                         queue.add(top);
 1043  0
                 }              
 1044  
                 
 1045  0
                 if (closeOnExit)
 1046  0
                     processor.close();
 1047  0
             }
 1048  
 
 1049  
             public DocumentSplit read() throws IOException {
 1050  0
                 if (uninitialized)
 1051  0
                     initialize();
 1052  
 
 1053  0
                 DocumentSplit result = null;
 1054  
 
 1055  0
                 while (queue.size() > 0) {
 1056  0
                     ShreddedReader top = queue.poll();
 1057  0
                     result = top.read();
 1058  
 
 1059  0
                     if (result != null) {
 1060  0
                         if (top.getBuffer().isAtEnd())
 1061  0
                             top.fill();
 1062  
 
 1063  0
                         queue.offer(top);
 1064  0
                         break;
 1065  
                     } 
 1066  0
                 }
 1067  
 
 1068  0
                 return result;
 1069  
             }
 1070  
         } 
 1071  0
         public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<DocumentSplit>, ShreddedSource {      
 1072  
             public ShreddedProcessor processor;
 1073  
             ShreddedBuffer buffer;
 1074  0
             DocumentSplit last = new DocumentSplit();         
 1075  0
             long updateFileNameCount = -1;
 1076  0
             long updateStartKeyCount = -1;
 1077  0
             long tupleCount = 0;
 1078  0
             long bufferStartCount = 0;  
 1079  
             ArrayInput input;
 1080  
             
 1081  0
             public ShreddedReader(ArrayInput input) {
 1082  0
                 this.input = input; 
 1083  0
                 this.buffer = new ShreddedBuffer();
 1084  0
             }                               
 1085  
             
 1086  0
             public ShreddedReader(ArrayInput input, int bufferSize) { 
 1087  0
                 this.input = input;
 1088  0
                 this.buffer = new ShreddedBuffer(bufferSize);
 1089  0
             }
 1090  
                  
 1091  
             public final int compareTo(ShreddedReader other) {
 1092  0
                 ShreddedBuffer otherBuffer = other.getBuffer();
 1093  
                 
 1094  0
                 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
 1095  0
                     return 0;                 
 1096  0
                 } else if (buffer.isAtEnd()) {
 1097  0
                     return -1;
 1098  0
                 } else if (otherBuffer.isAtEnd()) {
 1099  0
                     return 1;
 1100  
                 }
 1101  
                                    
 1102  0
                 int result = 0;
 1103  
                 do {
 1104  0
                     result = + Utility.compare(buffer.getFileName(), otherBuffer.getFileName());
 1105  0
                     if(result != 0) break;
 1106  0
                     result = + Utility.compare(buffer.getStartKey(), otherBuffer.getStartKey());
 1107  0
                     if(result != 0) break;
 1108  
                 } while (false);                                             
 1109  
                 
 1110  0
                 return result;
 1111  
             }
 1112  
             
 1113  
             public final ShreddedBuffer getBuffer() {
 1114  0
                 return buffer;
 1115  
             }                
 1116  
             
 1117  
             public final DocumentSplit read() throws IOException {
 1118  0
                 if (buffer.isAtEnd()) {
 1119  0
                     fill();             
 1120  
                 
 1121  0
                     if (buffer.isAtEnd()) {
 1122  0
                         return null;
 1123  
                     }
 1124  
                 }
 1125  
                       
 1126  0
                 assert !buffer.isAtEnd();
 1127  0
                 DocumentSplit result = new DocumentSplit();
 1128  
                 
 1129  0
                 result.fileName = buffer.getFileName();
 1130  0
                 result.startKey = buffer.getStartKey();
 1131  0
                 result.fileType = buffer.getFileType();
 1132  0
                 result.isCompressed = buffer.getIsCompressed();
 1133  0
                 result.endKey = buffer.getEndKey();
 1134  
                 
 1135  0
                 buffer.incrementTuple();
 1136  0
                 buffer.autoIncrementFileName();
 1137  0
                 buffer.autoIncrementStartKey();
 1138  
                 
 1139  0
                 return result;
 1140  
             }           
 1141  
             
 1142  
             public final void fill() throws IOException {
 1143  
                 try {   
 1144  0
                     buffer.reset();
 1145  
                     
 1146  0
                     if (tupleCount != 0) {
 1147  
                                                       
 1148  0
                         if(updateFileNameCount - tupleCount > 0) {
 1149  0
                             buffer.fileNames.add(last.fileName);
 1150  0
                             buffer.fileNameTupleIdx.add((int) (updateFileNameCount - tupleCount));
 1151  
                         }                              
 1152  0
                         if(updateStartKeyCount - tupleCount > 0) {
 1153  0
                             buffer.startKeys.add(last.startKey);
 1154  0
                             buffer.startKeyTupleIdx.add((int) (updateStartKeyCount - tupleCount));
 1155  
                         }
 1156  0
                         bufferStartCount = tupleCount;
 1157  
                     }
 1158  
                     
 1159  0
                     while (!buffer.isFull()) {
 1160  0
                         updateStartKey();
 1161  0
                         buffer.processTuple(input.readString(), input.readBoolean(), input.readBytes());
 1162  0
                         tupleCount++;
 1163  
                     }
 1164  0
                 } catch(EOFException e) {}
 1165  0
             }
 1166  
 
 1167  
             public final void updateFileName() throws IOException {
 1168  0
                 if (updateFileNameCount > tupleCount)
 1169  0
                     return;
 1170  
                      
 1171  0
                 last.fileName = input.readString();
 1172  0
                 updateFileNameCount = tupleCount + input.readInt();
 1173  
                                       
 1174  0
                 buffer.processFileName(last.fileName);
 1175  0
             }
 1176  
             public final void updateStartKey() throws IOException {
 1177  0
                 if (updateStartKeyCount > tupleCount)
 1178  0
                     return;
 1179  
                      
 1180  0
                 updateFileName();
 1181  0
                 last.startKey = input.readBytes();
 1182  0
                 updateStartKeyCount = tupleCount + input.readInt();
 1183  
                                       
 1184  0
                 buffer.processStartKey(last.startKey);
 1185  0
             }
 1186  
 
 1187  
             public void run() throws IOException {
 1188  
                 while (true) {
 1189  0
                     fill();
 1190  
                     
 1191  0
                     if (buffer.isAtEnd())
 1192  0
                         break;
 1193  
                     
 1194  0
                     buffer.copyUntil(null, processor);
 1195  
                 }      
 1196  0
                 processor.close();
 1197  0
             }
 1198  
             
 1199  
             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
 1200  0
                 if (processor instanceof ShreddedProcessor) {
 1201  0
                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
 1202  0
                 } else if (processor instanceof DocumentSplit.Processor) {
 1203  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentSplit.Processor) processor));
 1204  0
                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
 1205  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentSplit>) processor));
 1206  
                 } else {
 1207  0
                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
 1208  
                 }
 1209  0
             }                                
 1210  
             
 1211  
             public Class<DocumentSplit> getOutputClass() {
 1212  0
                 return DocumentSplit.class;
 1213  
             }                
 1214  
         }
 1215  
         
 1216  
         public static class DuplicateEliminator implements ShreddedProcessor {
 1217  
             public ShreddedProcessor processor;
 1218  0
             DocumentSplit last = new DocumentSplit();
 1219  0
             boolean fileNameProcess = true;
 1220  0
             boolean startKeyProcess = true;
 1221  
                                            
 1222  0
             public DuplicateEliminator() {}
 1223  0
             public DuplicateEliminator(ShreddedProcessor processor) {
 1224  0
                 this.processor = processor;
 1225  0
             }
 1226  
             
 1227  
             public void setShreddedProcessor(ShreddedProcessor processor) {
 1228  0
                 this.processor = processor;
 1229  0
             }
 1230  
 
 1231  
             public void processFileName(String fileName) throws IOException {  
 1232  0
                 if (fileNameProcess || Utility.compare(fileName, last.fileName) != 0) {
 1233  0
                     last.fileName = fileName;
 1234  0
                     processor.processFileName(fileName);
 1235  0
             resetStartKey();
 1236  0
                     fileNameProcess = false;
 1237  
                 }
 1238  0
             }
 1239  
             public void processStartKey(byte[] startKey) throws IOException {  
 1240  0
                 if (startKeyProcess || Utility.compare(startKey, last.startKey) != 0) {
 1241  0
                     last.startKey = startKey;
 1242  0
                     processor.processStartKey(startKey);
 1243  0
                     startKeyProcess = false;
 1244  
                 }
 1245  0
             }  
 1246  
             
 1247  
             public void resetFileName() {
 1248  0
                  fileNameProcess = true;
 1249  0
             resetStartKey();
 1250  0
             }                                                
 1251  
             public void resetStartKey() {
 1252  0
                  startKeyProcess = true;
 1253  0
             }                                                
 1254  
                                
 1255  
             public void processTuple(String fileType, boolean isCompressed, byte[] endKey) throws IOException {
 1256  0
                 processor.processTuple(fileType, isCompressed, endKey);
 1257  0
             } 
 1258  
             
 1259  
             public void close() throws IOException {
 1260  0
                 processor.close();
 1261  0
             }                    
 1262  
         }
 1263  
         public static class TupleUnshredder implements ShreddedProcessor {
 1264  0
             DocumentSplit last = new DocumentSplit();
 1265  
             public org.galagosearch.tupleflow.Processor<DocumentSplit> processor;                               
 1266  
             
 1267  0
             public TupleUnshredder(DocumentSplit.Processor processor) {
 1268  0
                 this.processor = processor;
 1269  0
             }         
 1270  
             
 1271  0
             public TupleUnshredder(org.galagosearch.tupleflow.Processor<DocumentSplit> processor) {
 1272  0
                 this.processor = processor;
 1273  0
             }
 1274  
             
 1275  
             public DocumentSplit clone(DocumentSplit object) {
 1276  0
                 DocumentSplit result = new DocumentSplit();
 1277  0
                 if (object == null) return result;
 1278  0
                 result.fileName = object.fileName; 
 1279  0
                 result.fileType = object.fileType; 
 1280  0
                 result.isCompressed = object.isCompressed; 
 1281  0
                 result.startKey = object.startKey; 
 1282  0
                 result.endKey = object.endKey; 
 1283  0
                 return result;
 1284  
             }                 
 1285  
             
 1286  
             public void processFileName(String fileName) throws IOException {
 1287  0
                 last.fileName = fileName;
 1288  0
             }   
 1289  
                 
 1290  
             public void processStartKey(byte[] startKey) throws IOException {
 1291  0
                 last.startKey = startKey;
 1292  0
             }   
 1293  
                 
 1294  
             
 1295  
             public void processTuple(String fileType, boolean isCompressed, byte[] endKey) throws IOException {
 1296  0
                 last.fileType = fileType;
 1297  0
                 last.isCompressed = isCompressed;
 1298  0
                 last.endKey = endKey;
 1299  0
                 processor.process(clone(last));
 1300  0
             }               
 1301  
             
 1302  
             public void close() throws IOException {
 1303  0
                 processor.close();
 1304  0
             }
 1305  
         }     
 1306  88
         public static class TupleShredder implements Processor {
 1307  0
             DocumentSplit last = new DocumentSplit();
 1308  
             public ShreddedProcessor processor;
 1309  
             
 1310  0
             public TupleShredder(ShreddedProcessor processor) {
 1311  0
                 this.processor = processor;
 1312  0
             }                              
 1313  
             
 1314  
             public DocumentSplit clone(DocumentSplit object) {
 1315  0
                 DocumentSplit result = new DocumentSplit();
 1316  0
                 if (object == null) return result;
 1317  0
                 result.fileName = object.fileName; 
 1318  0
                 result.fileType = object.fileType; 
 1319  0
                 result.isCompressed = object.isCompressed; 
 1320  0
                 result.startKey = object.startKey; 
 1321  0
                 result.endKey = object.endKey; 
 1322  0
                 return result;
 1323  
             }                 
 1324  
             
 1325  
             public void process(DocumentSplit object) throws IOException {                                                                                                                                                   
 1326  0
                 boolean processAll = false;
 1327  0
                 if(last == null || Utility.compare(last.fileName, object.fileName) != 0 || processAll) { processor.processFileName(object.fileName); processAll = true; }
 1328  0
                 if(last == null || Utility.compare(last.startKey, object.startKey) != 0 || processAll) { processor.processStartKey(object.startKey); processAll = true; }
 1329  0
                 processor.processTuple(object.fileType, object.isCompressed, object.endKey);                                         
 1330  0
             }
 1331  
                           
 1332  
             public Class<DocumentSplit> getInputClass() {
 1333  0
                 return DocumentSplit.class;
 1334  
             }
 1335  
             
 1336  
             public void close() throws IOException {
 1337  0
                 processor.close();
 1338  0
             }                     
 1339  
         }
 1340  
     } 
 1341  
 }