Coverage Report - org.galagosearch.core.types.KeyValuePair
 
Classes in this File Line Coverage Branch Coverage Complexity
KeyValuePair
64%
7/11
50%
1/2
0
KeyValuePair$KeyOrder
17%
4/23
0%
0/4
0
KeyValuePair$KeyOrder$1
0%
0/5
0%
0/2
0
KeyValuePair$KeyOrder$2
0%
0/5
0%
0/2
0
KeyValuePair$KeyOrder$DuplicateEliminator
0%
0/19
0%
0/4
0
KeyValuePair$KeyOrder$OrderedWriterClass
0%
0/14
0%
0/6
0
KeyValuePair$KeyOrder$ShreddedBuffer
0%
0/74
0%
0/46
0
KeyValuePair$KeyOrder$ShreddedCombiner
0%
0/55
0%
0/36
0
KeyValuePair$KeyOrder$ShreddedProcessor
N/A
N/A
0
KeyValuePair$KeyOrder$ShreddedReader
0%
0/69
0%
0/34
0
KeyValuePair$KeyOrder$ShreddedSource
N/A
N/A
0
KeyValuePair$KeyOrder$ShreddedWriter
0%
0/36
0%
0/14
0
KeyValuePair$KeyOrder$TupleShredder
0%
0/17
0%
0/8
0
KeyValuePair$KeyOrder$TupleUnshredder
0%
0/19
0%
0/2
0
KeyValuePair$Processor
N/A
N/A
0
KeyValuePair$Source
N/A
N/A
0
 
 1  
 // This file was automatically generated with the command: 
 2  
 //     java org.galagosearch.tupleflow.typebuilder.TypeBuilderMojo ...
 3  
 package org.galagosearch.core.types;
 4  
 
 5  
 import org.galagosearch.tupleflow.Utility;
 6  
 import org.galagosearch.tupleflow.ArrayInput;
 7  
 import org.galagosearch.tupleflow.ArrayOutput;
 8  
 import org.galagosearch.tupleflow.Order;   
 9  
 import org.galagosearch.tupleflow.OrderedWriter;
 10  
 import org.galagosearch.tupleflow.Type; 
 11  
 import org.galagosearch.tupleflow.TypeReader;
 12  
 import org.galagosearch.tupleflow.Step; 
 13  
 import org.galagosearch.tupleflow.IncompatibleProcessorException;
 14  
 import org.galagosearch.tupleflow.ReaderSource;
 15  
 import java.io.IOException;             
 16  
 import java.io.EOFException;
 17  
 import java.io.UnsupportedEncodingException;
 18  
 import java.util.ArrayList;
 19  
 import java.util.Arrays;   
 20  
 import java.util.Comparator;
 21  
 import java.util.PriorityQueue;
 22  
 import java.util.Collection;
 23  
 
 24  
 public class KeyValuePair implements Type<KeyValuePair> {
 25  
     public byte[] key;
 26  
     public byte[] value; 
 27  
     
 28  4
     public KeyValuePair() {}
 29  8
     public KeyValuePair(byte[] key, byte[] value) {
 30  8
         this.key = key;
 31  8
         this.value = value;
 32  8
     }  
 33  
     
 34  
     public String toString() {
 35  
         try {
 36  0
             return String.format("%s,%s",
 37  
                                    new String(key, "UTF-8"), new String(value, "UTF-8"));
 38  0
         } catch(UnsupportedEncodingException e) {
 39  0
             throw new RuntimeException("Couldn't convert string to UTF-8.");
 40  
         }
 41  
     } 
 42  
 
 43  
     public Order<KeyValuePair> getOrder(String... spec) {
 44  4
         if (Arrays.equals(spec, new String[] { "+key" })) {
 45  4
             return new KeyOrder();
 46  
         }
 47  0
         return null;
 48  
     } 
 49  
       
 50  
     public interface Processor extends Step, org.galagosearch.tupleflow.Processor<KeyValuePair> {
 51  
         public void process(KeyValuePair object) throws IOException;
 52  
         public void close() throws IOException;
 53  
     }                        
 54  
     public interface Source extends Step {
 55  
     }
 56  16
     public static class KeyOrder implements Order<KeyValuePair> {
 57  
         public int hash(KeyValuePair object) {
 58  0
             int h = 0;
 59  0
             h += Utility.hash(object.key);
 60  0
             return h;
 61  
         } 
 62  
         public Comparator<KeyValuePair> greaterThan() {
 63  0
             return new Comparator<KeyValuePair>() {
 64  0
                 public int compare(KeyValuePair one, KeyValuePair two) {
 65  0
                     int result = 0;
 66  
                     do {
 67  0
                         result = + Utility.compare(one.key, two.key);
 68  0
                         if(result != 0) break;
 69  
                     } while (false);
 70  0
                     return -result;
 71  
                 }
 72  
             };
 73  
         }     
 74  
         public Comparator<KeyValuePair> lessThan() {
 75  0
             return new Comparator<KeyValuePair>() {
 76  0
                 public int compare(KeyValuePair one, KeyValuePair two) {
 77  0
                     int result = 0;
 78  
                     do {
 79  0
                         result = + Utility.compare(one.key, two.key);
 80  0
                         if(result != 0) break;
 81  
                     } while (false);
 82  0
                     return result;
 83  
                 }
 84  
             };
 85  
         }     
 86  
         public TypeReader<KeyValuePair> orderedReader(ArrayInput _input) {
 87  0
             return new ShreddedReader(_input);
 88  
         }    
 89  
 
 90  
         public TypeReader<KeyValuePair> orderedReader(ArrayInput _input, int bufferSize) {
 91  0
             return new ShreddedReader(_input, bufferSize);
 92  
         }    
 93  
         public OrderedWriter<KeyValuePair> orderedWriter(ArrayOutput _output) {
 94  0
             ShreddedWriter w = new ShreddedWriter(_output);
 95  0
             return new OrderedWriterClass(w); 
 96  
         }                                    
 97  0
         public static class OrderedWriterClass extends OrderedWriter< KeyValuePair > {
 98  0
             KeyValuePair last = null;
 99  0
             ShreddedWriter shreddedWriter = null; 
 100  
             
 101  0
             public OrderedWriterClass(ShreddedWriter s) {
 102  0
                 this.shreddedWriter = s;
 103  0
             }
 104  
             
 105  
             public void process(KeyValuePair object) throws IOException {
 106  0
                boolean processAll = false;
 107  0
                if (processAll || last == null || 0 != Utility.compare(object.key, last.key)) { processAll = true; shreddedWriter.processKey(object.key); }
 108  0
                shreddedWriter.processTuple(object.value);
 109  0
                last = object;
 110  0
             }           
 111  
                  
 112  
             public void close() throws IOException {
 113  0
                 shreddedWriter.close();
 114  0
             }
 115  
             
 116  
             public Class<KeyValuePair> getInputClass() {
 117  0
                 return KeyValuePair.class;
 118  
             }
 119  
         } 
 120  
         public ReaderSource<KeyValuePair> orderedCombiner(Collection<TypeReader<KeyValuePair>> readers, boolean closeOnExit) {
 121  0
             ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
 122  
             
 123  0
             for (TypeReader<KeyValuePair> reader : readers) {
 124  0
                 shreddedReaders.add((ShreddedReader)reader);
 125  
             }
 126  
             
 127  0
             return new ShreddedCombiner(shreddedReaders, closeOnExit);
 128  
         }                  
 129  
         public KeyValuePair clone(KeyValuePair object) {
 130  0
             KeyValuePair result = new KeyValuePair();
 131  0
             if (object == null) return result;
 132  0
             result.key = object.key; 
 133  0
             result.value = object.value; 
 134  0
             return result;
 135  
         }                 
 136  
         public Class<KeyValuePair> getOrderedClass() {
 137  12
             return KeyValuePair.class;
 138  
         }                           
 139  
         public String[] getOrderSpec() {
 140  12
             return new String[] {"+key"};
 141  
         }
 142  
 
 143  
         public static String getSpecString() {
 144  0
             return "+key";
 145  
         }
 146  
                            
 147  
         public interface ShreddedProcessor extends Step {
 148  
             public void processKey(byte[] key) throws IOException;
 149  
             public void processTuple(byte[] value) throws IOException;
 150  
             public void close() throws IOException;
 151  
         }    
 152  
         public interface ShreddedSource extends Step {
 153  
         }                                              
 154  
         
 155  0
         public static class ShreddedWriter implements ShreddedProcessor {
 156  
             ArrayOutput output;
 157  0
             ShreddedBuffer buffer = new ShreddedBuffer();
 158  
             byte[] lastKey;
 159  0
             boolean lastFlush = false;
 160  
             
 161  0
             public ShreddedWriter(ArrayOutput output) {
 162  0
                 this.output = output;
 163  0
             }                        
 164  
             
 165  
             public void close() throws IOException {
 166  0
                 flush();
 167  0
             }
 168  
             
 169  
             public void processKey(byte[] key) {
 170  0
                 lastKey = key;
 171  0
                 buffer.processKey(key);
 172  0
             }
 173  
             public final void processTuple(byte[] value) throws IOException {
 174  0
                 if (lastFlush) {
 175  0
                     if(buffer.keys.size() == 0) buffer.processKey(lastKey);
 176  0
                     lastFlush = false;
 177  
                 }
 178  0
                 buffer.processTuple(value);
 179  0
                 if (buffer.isFull())
 180  0
                     flush();
 181  0
             }
 182  
             public final void flushTuples(int pauseIndex) throws IOException {
 183  
                 
 184  0
                 while (buffer.getReadIndex() < pauseIndex) {
 185  
                            
 186  0
                     output.writeBytes(buffer.getValue());
 187  0
                     buffer.incrementTuple();
 188  
                 }
 189  0
             }  
 190  
             public final void flushKey(int pauseIndex) throws IOException {
 191  0
                 while (buffer.getReadIndex() < pauseIndex) {
 192  0
                     int nextPause = buffer.getKeyEndIndex();
 193  0
                     int count = nextPause - buffer.getReadIndex();
 194  
                     
 195  0
                     output.writeBytes(buffer.getKey());
 196  0
                     output.writeInt(count);
 197  0
                     buffer.incrementKey();
 198  
                       
 199  0
                     flushTuples(nextPause);
 200  0
                     assert nextPause == buffer.getReadIndex();
 201  0
                 }
 202  0
             }
 203  
             public void flush() throws IOException { 
 204  0
                 flushKey(buffer.getWriteIndex());
 205  0
                 buffer.reset(); 
 206  0
                 lastFlush = true;
 207  0
             }                           
 208  
         }
 209  0
         public static class ShreddedBuffer {
 210  0
             ArrayList<byte[]> keys = new ArrayList();
 211  0
             ArrayList<Integer> keyTupleIdx = new ArrayList();
 212  0
             int keyReadIdx = 0;
 213  
                             
 214  
             byte[][] values;
 215  0
             int writeTupleIndex = 0;
 216  0
             int readTupleIndex = 0;
 217  
             int batchSize;
 218  
 
 219  0
             public ShreddedBuffer(int batchSize) {
 220  0
                 this.batchSize = batchSize;
 221  
 
 222  0
                 values = new byte[batchSize][];
 223  0
             }                              
 224  
 
 225  
             public ShreddedBuffer() {    
 226  0
                 this(10000);
 227  0
             }                                                                                                                    
 228  
             
 229  
             public void processKey(byte[] key) {
 230  0
                 keys.add(key);
 231  0
                 keyTupleIdx.add(writeTupleIndex);
 232  0
             }                                      
 233  
             public void processTuple(byte[] value) {
 234  0
                 assert keys.size() > 0;
 235  0
                 values[writeTupleIndex] = value;
 236  0
                 writeTupleIndex++;
 237  0
             }
 238  
             public void resetData() {
 239  0
                 keys.clear();
 240  0
                 keyTupleIdx.clear();
 241  0
                 writeTupleIndex = 0;
 242  0
             }                  
 243  
                                  
 244  
             public void resetRead() {
 245  0
                 readTupleIndex = 0;
 246  0
                 keyReadIdx = 0;
 247  0
             } 
 248  
 
 249  
             public void reset() {
 250  0
                 resetData();
 251  0
                 resetRead();
 252  0
             } 
 253  
             public boolean isFull() {
 254  0
                 return writeTupleIndex >= batchSize;
 255  
             }
 256  
 
 257  
             public boolean isEmpty() {
 258  0
                 return writeTupleIndex == 0;
 259  
             }                          
 260  
 
 261  
             public boolean isAtEnd() {
 262  0
                 return readTupleIndex >= writeTupleIndex;
 263  
             }           
 264  
             public void incrementKey() {
 265  0
                 keyReadIdx++;  
 266  0
             }                                                                                              
 267  
 
 268  
             public void autoIncrementKey() {
 269  0
                 while (readTupleIndex >= getKeyEndIndex() && readTupleIndex < writeTupleIndex)
 270  0
                     keyReadIdx++;
 271  0
             }                 
 272  
             public void incrementTuple() {
 273  0
                 readTupleIndex++;
 274  0
             }                    
 275  
             public int getKeyEndIndex() {
 276  0
                 if ((keyReadIdx+1) >= keyTupleIdx.size())
 277  0
                     return writeTupleIndex;
 278  0
                 return keyTupleIdx.get(keyReadIdx+1);
 279  
             }
 280  
             public int getReadIndex() {
 281  0
                 return readTupleIndex;
 282  
             }   
 283  
 
 284  
             public int getWriteIndex() {
 285  0
                 return writeTupleIndex;
 286  
             } 
 287  
             public byte[] getKey() {
 288  0
                 assert readTupleIndex < writeTupleIndex;
 289  0
                 assert keyReadIdx < keys.size();
 290  
                 
 291  0
                 return keys.get(keyReadIdx);
 292  
             }
 293  
             public byte[] getValue() {
 294  0
                 assert readTupleIndex < writeTupleIndex;
 295  0
                 return values[readTupleIndex];
 296  
             }                                         
 297  
             public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
 298  0
                 while (getReadIndex() < endIndex) {
 299  0
                    output.processTuple(getValue());
 300  0
                    incrementTuple();
 301  
                 }
 302  0
             }                                                                           
 303  
             public void copyUntilIndexKey(int endIndex, ShreddedProcessor output) throws IOException {
 304  0
                 while (getReadIndex() < endIndex) {
 305  0
                     output.processKey(getKey());
 306  0
                     assert getKeyEndIndex() <= endIndex;
 307  0
                     copyTuples(getKeyEndIndex(), output);
 308  0
                     incrementKey();
 309  
                 }
 310  0
             }  
 311  
             public void copyUntilKey(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
 312  0
                 while (!isAtEnd()) {
 313  0
                     if (other != null) {   
 314  0
                         assert !other.isAtEnd();
 315  0
                         int c = + Utility.compare(getKey(), other.getKey());
 316  
                     
 317  0
                         if (c > 0) {
 318  0
                             break;   
 319  
                         }
 320  
                         
 321  0
                         output.processKey(getKey());
 322  
                                       
 323  0
                         copyTuples(getKeyEndIndex(), output);
 324  0
                     } else {
 325  0
                         output.processKey(getKey());
 326  0
                         copyTuples(getKeyEndIndex(), output);
 327  
                     }
 328  0
                     incrementKey();  
 329  
                     
 330  
                
 331  
                 }
 332  0
             }
 333  
             public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
 334  0
                 copyUntilKey(other, output);
 335  0
             }
 336  
             
 337  
         }                         
 338  0
         public static class ShreddedCombiner implements ReaderSource<KeyValuePair>, ShreddedSource {   
 339  
             public ShreddedProcessor processor;
 340  
             Collection<ShreddedReader> readers;       
 341  0
             boolean closeOnExit = false;
 342  0
             boolean uninitialized = true;
 343  0
             PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
 344  
             
 345  0
             public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
 346  0
                 this.readers = readers;                                                       
 347  0
                 this.closeOnExit = closeOnExit;
 348  0
             }
 349  
                                   
 350  
             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
 351  0
                 if (processor instanceof ShreddedProcessor) {
 352  0
                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
 353  0
                 } else if (processor instanceof KeyValuePair.Processor) {
 354  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((KeyValuePair.Processor) processor));
 355  0
                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
 356  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<KeyValuePair>) processor));
 357  
                 } else {
 358  0
                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
 359  
                 }
 360  0
             }                                
 361  
             
 362  
             public Class<KeyValuePair> getOutputClass() {
 363  0
                 return KeyValuePair.class;
 364  
             }
 365  
             
 366  
             public void initialize() throws IOException {
 367  0
                 for (ShreddedReader reader : readers) {
 368  0
                     reader.fill();                                        
 369  
                     
 370  0
                     if (!reader.getBuffer().isAtEnd())
 371  0
                         queue.add(reader);
 372  
                 }   
 373  
 
 374  0
                 uninitialized = false;
 375  0
             }
 376  
 
 377  
             public void run() throws IOException {
 378  0
                 initialize();
 379  
                
 380  0
                 while (queue.size() > 0) {
 381  0
                     ShreddedReader top = queue.poll();
 382  0
                     ShreddedReader next = null;
 383  0
                     ShreddedBuffer nextBuffer = null; 
 384  
                     
 385  0
                     assert !top.getBuffer().isAtEnd();
 386  
                                                   
 387  0
                     if (queue.size() > 0) {
 388  0
                         next = queue.peek();
 389  0
                         nextBuffer = next.getBuffer();
 390  0
                         assert !nextBuffer.isAtEnd();
 391  
                     }
 392  
                     
 393  0
                     top.getBuffer().copyUntil(nextBuffer, processor);
 394  0
                     if (top.getBuffer().isAtEnd())
 395  0
                         top.fill();                 
 396  
                         
 397  0
                     if (!top.getBuffer().isAtEnd())
 398  0
                         queue.add(top);
 399  0
                 }              
 400  
                 
 401  0
                 if (closeOnExit)
 402  0
                     processor.close();
 403  0
             }
 404  
 
 405  
             public KeyValuePair read() throws IOException {
 406  0
                 if (uninitialized)
 407  0
                     initialize();
 408  
 
 409  0
                 KeyValuePair result = null;
 410  
 
 411  0
                 while (queue.size() > 0) {
 412  0
                     ShreddedReader top = queue.poll();
 413  0
                     result = top.read();
 414  
 
 415  0
                     if (result != null) {
 416  0
                         if (top.getBuffer().isAtEnd())
 417  0
                             top.fill();
 418  
 
 419  0
                         queue.offer(top);
 420  0
                         break;
 421  
                     } 
 422  0
                 }
 423  
 
 424  0
                 return result;
 425  
             }
 426  
         } 
 427  0
         public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<KeyValuePair>, ShreddedSource {      
 428  
             public ShreddedProcessor processor;
 429  
             ShreddedBuffer buffer;
 430  0
             KeyValuePair last = new KeyValuePair();         
 431  0
             long updateKeyCount = -1;
 432  0
             long tupleCount = 0;
 433  0
             long bufferStartCount = 0;  
 434  
             ArrayInput input;
 435  
             
 436  0
             public ShreddedReader(ArrayInput input) {
 437  0
                 this.input = input; 
 438  0
                 this.buffer = new ShreddedBuffer();
 439  0
             }                               
 440  
             
 441  0
             public ShreddedReader(ArrayInput input, int bufferSize) { 
 442  0
                 this.input = input;
 443  0
                 this.buffer = new ShreddedBuffer(bufferSize);
 444  0
             }
 445  
                  
 446  
             public final int compareTo(ShreddedReader other) {
 447  0
                 ShreddedBuffer otherBuffer = other.getBuffer();
 448  
                 
 449  0
                 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
 450  0
                     return 0;                 
 451  0
                 } else if (buffer.isAtEnd()) {
 452  0
                     return -1;
 453  0
                 } else if (otherBuffer.isAtEnd()) {
 454  0
                     return 1;
 455  
                 }
 456  
                                    
 457  0
                 int result = 0;
 458  
                 do {
 459  0
                     result = + Utility.compare(buffer.getKey(), otherBuffer.getKey());
 460  0
                     if(result != 0) break;
 461  
                 } while (false);                                             
 462  
                 
 463  0
                 return result;
 464  
             }
 465  
             
 466  
             public final ShreddedBuffer getBuffer() {
 467  0
                 return buffer;
 468  
             }                
 469  
             
 470  
             public final KeyValuePair read() throws IOException {
 471  0
                 if (buffer.isAtEnd()) {
 472  0
                     fill();             
 473  
                 
 474  0
                     if (buffer.isAtEnd()) {
 475  0
                         return null;
 476  
                     }
 477  
                 }
 478  
                       
 479  0
                 assert !buffer.isAtEnd();
 480  0
                 KeyValuePair result = new KeyValuePair();
 481  
                 
 482  0
                 result.key = buffer.getKey();
 483  0
                 result.value = buffer.getValue();
 484  
                 
 485  0
                 buffer.incrementTuple();
 486  0
                 buffer.autoIncrementKey();
 487  
                 
 488  0
                 return result;
 489  
             }           
 490  
             
 491  
             public final void fill() throws IOException {
 492  
                 try {   
 493  0
                     buffer.reset();
 494  
                     
 495  0
                     if (tupleCount != 0) {
 496  
                                                       
 497  0
                         if(updateKeyCount - tupleCount > 0) {
 498  0
                             buffer.keys.add(last.key);
 499  0
                             buffer.keyTupleIdx.add((int) (updateKeyCount - tupleCount));
 500  
                         }
 501  0
                         bufferStartCount = tupleCount;
 502  
                     }
 503  
                     
 504  0
                     while (!buffer.isFull()) {
 505  0
                         updateKey();
 506  0
                         buffer.processTuple(input.readBytes());
 507  0
                         tupleCount++;
 508  
                     }
 509  0
                 } catch(EOFException e) {}
 510  0
             }
 511  
 
 512  
             public final void updateKey() throws IOException {
 513  0
                 if (updateKeyCount > tupleCount)
 514  0
                     return;
 515  
                      
 516  0
                 last.key = input.readBytes();
 517  0
                 updateKeyCount = tupleCount + input.readInt();
 518  
                                       
 519  0
                 buffer.processKey(last.key);
 520  0
             }
 521  
 
 522  
             public void run() throws IOException {
 523  
                 while (true) {
 524  0
                     fill();
 525  
                     
 526  0
                     if (buffer.isAtEnd())
 527  0
                         break;
 528  
                     
 529  0
                     buffer.copyUntil(null, processor);
 530  
                 }      
 531  0
                 processor.close();
 532  0
             }
 533  
             
 534  
             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
 535  0
                 if (processor instanceof ShreddedProcessor) {
 536  0
                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
 537  0
                 } else if (processor instanceof KeyValuePair.Processor) {
 538  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((KeyValuePair.Processor) processor));
 539  0
                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
 540  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<KeyValuePair>) processor));
 541  
                 } else {
 542  0
                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
 543  
                 }
 544  0
             }                                
 545  
             
 546  
             public Class<KeyValuePair> getOutputClass() {
 547  0
                 return KeyValuePair.class;
 548  
             }                
 549  
         }
 550  
         
 551  
         public static class DuplicateEliminator implements ShreddedProcessor {
 552  
             public ShreddedProcessor processor;
 553  0
             KeyValuePair last = new KeyValuePair();
 554  0
             boolean keyProcess = true;
 555  
                                            
 556  0
             public DuplicateEliminator() {}
 557  0
             public DuplicateEliminator(ShreddedProcessor processor) {
 558  0
                 this.processor = processor;
 559  0
             }
 560  
             
 561  
             public void setShreddedProcessor(ShreddedProcessor processor) {
 562  0
                 this.processor = processor;
 563  0
             }
 564  
 
 565  
             public void processKey(byte[] key) throws IOException {  
 566  0
                 if (keyProcess || Utility.compare(key, last.key) != 0) {
 567  0
                     last.key = key;
 568  0
                     processor.processKey(key);
 569  0
                     keyProcess = false;
 570  
                 }
 571  0
             }  
 572  
             
 573  
             public void resetKey() {
 574  0
                  keyProcess = true;
 575  0
             }                                                
 576  
                                
 577  
             public void processTuple(byte[] value) throws IOException {
 578  0
                 processor.processTuple(value);
 579  0
             } 
 580  
             
 581  
             public void close() throws IOException {
 582  0
                 processor.close();
 583  0
             }                    
 584  
         }
 585  
         public static class TupleUnshredder implements ShreddedProcessor {
 586  0
             KeyValuePair last = new KeyValuePair();
 587  
             public org.galagosearch.tupleflow.Processor<KeyValuePair> processor;                               
 588  
             
 589  0
             public TupleUnshredder(KeyValuePair.Processor processor) {
 590  0
                 this.processor = processor;
 591  0
             }         
 592  
             
 593  0
             public TupleUnshredder(org.galagosearch.tupleflow.Processor<KeyValuePair> processor) {
 594  0
                 this.processor = processor;
 595  0
             }
 596  
             
 597  
             public KeyValuePair clone(KeyValuePair object) {
 598  0
                 KeyValuePair result = new KeyValuePair();
 599  0
                 if (object == null) return result;
 600  0
                 result.key = object.key; 
 601  0
                 result.value = object.value; 
 602  0
                 return result;
 603  
             }                 
 604  
             
 605  
             public void processKey(byte[] key) throws IOException {
 606  0
                 last.key = key;
 607  0
             }   
 608  
                 
 609  
             
 610  
             public void processTuple(byte[] value) throws IOException {
 611  0
                 last.value = value;
 612  0
                 processor.process(clone(last));
 613  0
             }               
 614  
             
 615  
             public void close() throws IOException {
 616  0
                 processor.close();
 617  0
             }
 618  
         }     
 619  16
         public static class TupleShredder implements Processor {
 620  0
             KeyValuePair last = new KeyValuePair();
 621  
             public ShreddedProcessor processor;
 622  
             
 623  0
             public TupleShredder(ShreddedProcessor processor) {
 624  0
                 this.processor = processor;
 625  0
             }                              
 626  
             
 627  
             public KeyValuePair clone(KeyValuePair object) {
 628  0
                 KeyValuePair result = new KeyValuePair();
 629  0
                 if (object == null) return result;
 630  0
                 result.key = object.key; 
 631  0
                 result.value = object.value; 
 632  0
                 return result;
 633  
             }                 
 634  
             
 635  
             public void process(KeyValuePair object) throws IOException {                                                                                                                                                   
 636  0
                 boolean processAll = false;
 637  0
                 if(last == null || Utility.compare(last.key, object.key) != 0 || processAll) { processor.processKey(object.key); processAll = true; }
 638  0
                 processor.processTuple(object.value);                                         
 639  0
             }
 640  
                           
 641  
             public Class<KeyValuePair> getInputClass() {
 642  0
                 return KeyValuePair.class;
 643  
             }
 644  
             
 645  
             public void close() throws IOException {
 646  0
                 processor.close();
 647  0
             }                     
 648  
         }
 649  
     } 
 650  
 }