Coverage Report - org.galagosearch.core.types.NumberedLink
 
Classes in this File Line Coverage Branch Coverage Complexity
NumberedLink
0%
0/11
0%
0/4
0
NumberedLink$DestinationOrder
0%
0/23
0%
0/4
0
NumberedLink$DestinationOrder$1
0%
0/5
0%
0/2
0
NumberedLink$DestinationOrder$2
0%
0/5
0%
0/2
0
NumberedLink$DestinationOrder$DuplicateEliminator
0%
0/19
0%
0/4
0
NumberedLink$DestinationOrder$OrderedWriterClass
0%
0/14
0%
0/6
0
NumberedLink$DestinationOrder$ShreddedBuffer
0%
0/74
0%
0/46
0
NumberedLink$DestinationOrder$ShreddedCombiner
0%
0/55
0%
0/36
0
NumberedLink$DestinationOrder$ShreddedProcessor
N/A
N/A
0
NumberedLink$DestinationOrder$ShreddedReader
0%
0/69
0%
0/34
0
NumberedLink$DestinationOrder$ShreddedSource
N/A
N/A
0
NumberedLink$DestinationOrder$ShreddedWriter
0%
0/36
0%
0/14
0
NumberedLink$DestinationOrder$TupleShredder
0%
0/17
0%
0/8
0
NumberedLink$DestinationOrder$TupleUnshredder
0%
0/19
0%
0/2
0
NumberedLink$Processor
N/A
N/A
0
NumberedLink$Source
N/A
N/A
0
NumberedLink$SourceOrder
0%
0/23
0%
0/4
0
NumberedLink$SourceOrder$1
0%
0/5
0%
0/2
0
NumberedLink$SourceOrder$2
0%
0/5
0%
0/2
0
NumberedLink$SourceOrder$DuplicateEliminator
0%
0/19
0%
0/4
0
NumberedLink$SourceOrder$OrderedWriterClass
0%
0/14
0%
0/6
0
NumberedLink$SourceOrder$ShreddedBuffer
0%
0/74
0%
0/46
0
NumberedLink$SourceOrder$ShreddedCombiner
0%
0/55
0%
0/36
0
NumberedLink$SourceOrder$ShreddedProcessor
N/A
N/A
0
NumberedLink$SourceOrder$ShreddedReader
0%
0/69
0%
0/34
0
NumberedLink$SourceOrder$ShreddedSource
N/A
N/A
0
NumberedLink$SourceOrder$ShreddedWriter
0%
0/36
0%
0/14
0
NumberedLink$SourceOrder$TupleShredder
0%
0/17
0%
0/8
0
NumberedLink$SourceOrder$TupleUnshredder
0%
0/19
0%
0/2
0
 
 1  
 // This file was automatically generated with the command: 
 2  
 //     java org.galagosearch.tupleflow.typebuilder.TypeBuilderMojo ...
 3  
 package org.galagosearch.core.types;
 4  
 
 5  
 import org.galagosearch.tupleflow.Utility;
 6  
 import org.galagosearch.tupleflow.ArrayInput;
 7  
 import org.galagosearch.tupleflow.ArrayOutput;
 8  
 import org.galagosearch.tupleflow.Order;   
 9  
 import org.galagosearch.tupleflow.OrderedWriter;
 10  
 import org.galagosearch.tupleflow.Type; 
 11  
 import org.galagosearch.tupleflow.TypeReader;
 12  
 import org.galagosearch.tupleflow.Step; 
 13  
 import org.galagosearch.tupleflow.IncompatibleProcessorException;
 14  
 import org.galagosearch.tupleflow.ReaderSource;
 15  
 import java.io.IOException;             
 16  
 import java.io.EOFException;
 17  
 import java.io.UnsupportedEncodingException;
 18  
 import java.util.ArrayList;
 19  
 import java.util.Arrays;   
 20  
 import java.util.Comparator;
 21  
 import java.util.PriorityQueue;
 22  
 import java.util.Collection;
 23  
 
 24  
 public class NumberedLink implements Type<NumberedLink> {
 25  
     public long source;
 26  
     public long destination; 
 27  
     
 28  0
     public NumberedLink() {}
 29  0
     public NumberedLink(long source, long destination) {
 30  0
         this.source = source;
 31  0
         this.destination = destination;
 32  0
     }  
 33  
     
 34  
     public String toString() {
 35  0
             return String.format("%d,%d",
 36  
                                    source, destination);
 37  
     } 
 38  
 
 39  
     public Order<NumberedLink> getOrder(String... spec) {
 40  0
         if (Arrays.equals(spec, new String[] { "+source" })) {
 41  0
             return new SourceOrder();
 42  
         }
 43  0
         if (Arrays.equals(spec, new String[] { "+destination" })) {
 44  0
             return new DestinationOrder();
 45  
         }
 46  0
         return null;
 47  
     } 
 48  
       
 49  
     public interface Processor extends Step, org.galagosearch.tupleflow.Processor<NumberedLink> {
 50  
         public void process(NumberedLink object) throws IOException;
 51  
         public void close() throws IOException;
 52  
     }                        
 53  
     public interface Source extends Step {
 54  
     }
 55  0
     public static class SourceOrder implements Order<NumberedLink> {
 56  
         public int hash(NumberedLink object) {
 57  0
             int h = 0;
 58  0
             h += Utility.hash(object.source);
 59  0
             return h;
 60  
         } 
 61  
         public Comparator<NumberedLink> greaterThan() {
 62  0
             return new Comparator<NumberedLink>() {
 63  0
                 public int compare(NumberedLink one, NumberedLink two) {
 64  0
                     int result = 0;
 65  
                     do {
 66  0
                         result = + Utility.compare(one.source, two.source);
 67  0
                         if(result != 0) break;
 68  
                     } while (false);
 69  0
                     return -result;
 70  
                 }
 71  
             };
 72  
         }     
 73  
         public Comparator<NumberedLink> lessThan() {
 74  0
             return new Comparator<NumberedLink>() {
 75  0
                 public int compare(NumberedLink one, NumberedLink two) {
 76  0
                     int result = 0;
 77  
                     do {
 78  0
                         result = + Utility.compare(one.source, two.source);
 79  0
                         if(result != 0) break;
 80  
                     } while (false);
 81  0
                     return result;
 82  
                 }
 83  
             };
 84  
         }     
 85  
         public TypeReader<NumberedLink> orderedReader(ArrayInput _input) {
 86  0
             return new ShreddedReader(_input);
 87  
         }    
 88  
 
 89  
         public TypeReader<NumberedLink> orderedReader(ArrayInput _input, int bufferSize) {
 90  0
             return new ShreddedReader(_input, bufferSize);
 91  
         }    
 92  
         public OrderedWriter<NumberedLink> orderedWriter(ArrayOutput _output) {
 93  0
             ShreddedWriter w = new ShreddedWriter(_output);
 94  0
             return new OrderedWriterClass(w); 
 95  
         }                                    
 96  0
         public static class OrderedWriterClass extends OrderedWriter< NumberedLink > {
 97  0
             NumberedLink last = null;
 98  0
             ShreddedWriter shreddedWriter = null; 
 99  
             
 100  0
             public OrderedWriterClass(ShreddedWriter s) {
 101  0
                 this.shreddedWriter = s;
 102  0
             }
 103  
             
 104  
             public void process(NumberedLink object) throws IOException {
 105  0
                boolean processAll = false;
 106  0
                if (processAll || last == null || 0 != Utility.compare(object.source, last.source)) { processAll = true; shreddedWriter.processSource(object.source); }
 107  0
                shreddedWriter.processTuple(object.destination);
 108  0
                last = object;
 109  0
             }           
 110  
                  
 111  
             public void close() throws IOException {
 112  0
                 shreddedWriter.close();
 113  0
             }
 114  
             
 115  
             public Class<NumberedLink> getInputClass() {
 116  0
                 return NumberedLink.class;
 117  
             }
 118  
         } 
 119  
         public ReaderSource<NumberedLink> orderedCombiner(Collection<TypeReader<NumberedLink>> readers, boolean closeOnExit) {
 120  0
             ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
 121  
             
 122  0
             for (TypeReader<NumberedLink> reader : readers) {
 123  0
                 shreddedReaders.add((ShreddedReader)reader);
 124  
             }
 125  
             
 126  0
             return new ShreddedCombiner(shreddedReaders, closeOnExit);
 127  
         }                  
 128  
         public NumberedLink clone(NumberedLink object) {
 129  0
             NumberedLink result = new NumberedLink();
 130  0
             if (object == null) return result;
 131  0
             result.source = object.source; 
 132  0
             result.destination = object.destination; 
 133  0
             return result;
 134  
         }                 
 135  
         public Class<NumberedLink> getOrderedClass() {
 136  0
             return NumberedLink.class;
 137  
         }                           
 138  
         public String[] getOrderSpec() {
 139  0
             return new String[] {"+source"};
 140  
         }
 141  
 
 142  
         public static String getSpecString() {
 143  0
             return "+source";
 144  
         }
 145  
                            
 146  
         public interface ShreddedProcessor extends Step {
 147  
             public void processSource(long source) throws IOException;
 148  
             public void processTuple(long destination) throws IOException;
 149  
             public void close() throws IOException;
 150  
         }    
 151  
         public interface ShreddedSource extends Step {
 152  
         }                                              
 153  
         
 154  0
         public static class ShreddedWriter implements ShreddedProcessor {
 155  
             ArrayOutput output;
 156  0
             ShreddedBuffer buffer = new ShreddedBuffer();
 157  
             long lastSource;
 158  0
             boolean lastFlush = false;
 159  
             
 160  0
             public ShreddedWriter(ArrayOutput output) {
 161  0
                 this.output = output;
 162  0
             }                        
 163  
             
 164  
             public void close() throws IOException {
 165  0
                 flush();
 166  0
             }
 167  
             
 168  
             public void processSource(long source) {
 169  0
                 lastSource = source;
 170  0
                 buffer.processSource(source);
 171  0
             }
 172  
             public final void processTuple(long destination) throws IOException {
 173  0
                 if (lastFlush) {
 174  0
                     if(buffer.sources.size() == 0) buffer.processSource(lastSource);
 175  0
                     lastFlush = false;
 176  
                 }
 177  0
                 buffer.processTuple(destination);
 178  0
                 if (buffer.isFull())
 179  0
                     flush();
 180  0
             }
 181  
             public final void flushTuples(int pauseIndex) throws IOException {
 182  
                 
 183  0
                 while (buffer.getReadIndex() < pauseIndex) {
 184  
                            
 185  0
                     output.writeLong(buffer.getDestination());
 186  0
                     buffer.incrementTuple();
 187  
                 }
 188  0
             }  
 189  
             public final void flushSource(int pauseIndex) throws IOException {
 190  0
                 while (buffer.getReadIndex() < pauseIndex) {
 191  0
                     int nextPause = buffer.getSourceEndIndex();
 192  0
                     int count = nextPause - buffer.getReadIndex();
 193  
                     
 194  0
                     output.writeLong(buffer.getSource());
 195  0
                     output.writeInt(count);
 196  0
                     buffer.incrementSource();
 197  
                       
 198  0
                     flushTuples(nextPause);
 199  0
                     assert nextPause == buffer.getReadIndex();
 200  0
                 }
 201  0
             }
 202  
             public void flush() throws IOException { 
 203  0
                 flushSource(buffer.getWriteIndex());
 204  0
                 buffer.reset(); 
 205  0
                 lastFlush = true;
 206  0
             }                           
 207  
         }
 208  0
         public static class ShreddedBuffer {
 209  0
             ArrayList<Long> sources = new ArrayList();
 210  0
             ArrayList<Integer> sourceTupleIdx = new ArrayList();
 211  0
             int sourceReadIdx = 0;
 212  
                             
 213  
             long[] destinations;
 214  0
             int writeTupleIndex = 0;
 215  0
             int readTupleIndex = 0;
 216  
             int batchSize;
 217  
 
 218  0
             public ShreddedBuffer(int batchSize) {
 219  0
                 this.batchSize = batchSize;
 220  
 
 221  0
                 destinations = new long[batchSize];
 222  0
             }                              
 223  
 
 224  
             public ShreddedBuffer() {    
 225  0
                 this(10000);
 226  0
             }                                                                                                                    
 227  
             
 228  
             public void processSource(long source) {
 229  0
                 sources.add(source);
 230  0
                 sourceTupleIdx.add(writeTupleIndex);
 231  0
             }                                      
 232  
             public void processTuple(long destination) {
 233  0
                 assert sources.size() > 0;
 234  0
                 destinations[writeTupleIndex] = destination;
 235  0
                 writeTupleIndex++;
 236  0
             }
 237  
             public void resetData() {
 238  0
                 sources.clear();
 239  0
                 sourceTupleIdx.clear();
 240  0
                 writeTupleIndex = 0;
 241  0
             }                  
 242  
                                  
 243  
             public void resetRead() {
 244  0
                 readTupleIndex = 0;
 245  0
                 sourceReadIdx = 0;
 246  0
             } 
 247  
 
 248  
             public void reset() {
 249  0
                 resetData();
 250  0
                 resetRead();
 251  0
             } 
 252  
             public boolean isFull() {
 253  0
                 return writeTupleIndex >= batchSize;
 254  
             }
 255  
 
 256  
             public boolean isEmpty() {
 257  0
                 return writeTupleIndex == 0;
 258  
             }                          
 259  
 
 260  
             public boolean isAtEnd() {
 261  0
                 return readTupleIndex >= writeTupleIndex;
 262  
             }           
 263  
             public void incrementSource() {
 264  0
                 sourceReadIdx++;  
 265  0
             }                                                                                              
 266  
 
 267  
             public void autoIncrementSource() {
 268  0
                 while (readTupleIndex >= getSourceEndIndex() && readTupleIndex < writeTupleIndex)
 269  0
                     sourceReadIdx++;
 270  0
             }                 
 271  
             public void incrementTuple() {
 272  0
                 readTupleIndex++;
 273  0
             }                    
 274  
             public int getSourceEndIndex() {
 275  0
                 if ((sourceReadIdx+1) >= sourceTupleIdx.size())
 276  0
                     return writeTupleIndex;
 277  0
                 return sourceTupleIdx.get(sourceReadIdx+1);
 278  
             }
 279  
             public int getReadIndex() {
 280  0
                 return readTupleIndex;
 281  
             }   
 282  
 
 283  
             public int getWriteIndex() {
 284  0
                 return writeTupleIndex;
 285  
             } 
 286  
             public long getSource() {
 287  0
                 assert readTupleIndex < writeTupleIndex;
 288  0
                 assert sourceReadIdx < sources.size();
 289  
                 
 290  0
                 return sources.get(sourceReadIdx);
 291  
             }
 292  
             public long getDestination() {
 293  0
                 assert readTupleIndex < writeTupleIndex;
 294  0
                 return destinations[readTupleIndex];
 295  
             }                                         
 296  
             public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
 297  0
                 while (getReadIndex() < endIndex) {
 298  0
                    output.processTuple(getDestination());
 299  0
                    incrementTuple();
 300  
                 }
 301  0
             }                                                                           
 302  
             public void copyUntilIndexSource(int endIndex, ShreddedProcessor output) throws IOException {
 303  0
                 while (getReadIndex() < endIndex) {
 304  0
                     output.processSource(getSource());
 305  0
                     assert getSourceEndIndex() <= endIndex;
 306  0
                     copyTuples(getSourceEndIndex(), output);
 307  0
                     incrementSource();
 308  
                 }
 309  0
             }  
 310  
             public void copyUntilSource(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
 311  0
                 while (!isAtEnd()) {
 312  0
                     if (other != null) {   
 313  0
                         assert !other.isAtEnd();
 314  0
                         int c = + Utility.compare(getSource(), other.getSource());
 315  
                     
 316  0
                         if (c > 0) {
 317  0
                             break;   
 318  
                         }
 319  
                         
 320  0
                         output.processSource(getSource());
 321  
                                       
 322  0
                         copyTuples(getSourceEndIndex(), output);
 323  0
                     } else {
 324  0
                         output.processSource(getSource());
 325  0
                         copyTuples(getSourceEndIndex(), output);
 326  
                     }
 327  0
                     incrementSource();  
 328  
                     
 329  
                
 330  
                 }
 331  0
             }
 332  
             public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
 333  0
                 copyUntilSource(other, output);
 334  0
             }
 335  
             
 336  
         }                         
 337  0
         public static class ShreddedCombiner implements ReaderSource<NumberedLink>, ShreddedSource {   
 338  
             public ShreddedProcessor processor;
 339  
             Collection<ShreddedReader> readers;       
 340  0
             boolean closeOnExit = false;
 341  0
             boolean uninitialized = true;
 342  0
             PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
 343  
             
 344  0
             public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
 345  0
                 this.readers = readers;                                                       
 346  0
                 this.closeOnExit = closeOnExit;
 347  0
             }
 348  
                                   
 349  
             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
 350  0
                 if (processor instanceof ShreddedProcessor) {
 351  0
                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
 352  0
                 } else if (processor instanceof NumberedLink.Processor) {
 353  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((NumberedLink.Processor) processor));
 354  0
                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
 355  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<NumberedLink>) processor));
 356  
                 } else {
 357  0
                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
 358  
                 }
 359  0
             }                                
 360  
             
 361  
             public Class<NumberedLink> getOutputClass() {
 362  0
                 return NumberedLink.class;
 363  
             }
 364  
             
 365  
             public void initialize() throws IOException {
 366  0
                 for (ShreddedReader reader : readers) {
 367  0
                     reader.fill();                                        
 368  
                     
 369  0
                     if (!reader.getBuffer().isAtEnd())
 370  0
                         queue.add(reader);
 371  
                 }   
 372  
 
 373  0
                 uninitialized = false;
 374  0
             }
 375  
 
 376  
             public void run() throws IOException {
 377  0
                 initialize();
 378  
                
 379  0
                 while (queue.size() > 0) {
 380  0
                     ShreddedReader top = queue.poll();
 381  0
                     ShreddedReader next = null;
 382  0
                     ShreddedBuffer nextBuffer = null; 
 383  
                     
 384  0
                     assert !top.getBuffer().isAtEnd();
 385  
                                                   
 386  0
                     if (queue.size() > 0) {
 387  0
                         next = queue.peek();
 388  0
                         nextBuffer = next.getBuffer();
 389  0
                         assert !nextBuffer.isAtEnd();
 390  
                     }
 391  
                     
 392  0
                     top.getBuffer().copyUntil(nextBuffer, processor);
 393  0
                     if (top.getBuffer().isAtEnd())
 394  0
                         top.fill();                 
 395  
                         
 396  0
                     if (!top.getBuffer().isAtEnd())
 397  0
                         queue.add(top);
 398  0
                 }              
 399  
                 
 400  0
                 if (closeOnExit)
 401  0
                     processor.close();
 402  0
             }
 403  
 
 404  
             public NumberedLink read() throws IOException {
 405  0
                 if (uninitialized)
 406  0
                     initialize();
 407  
 
 408  0
                 NumberedLink result = null;
 409  
 
 410  0
                 while (queue.size() > 0) {
 411  0
                     ShreddedReader top = queue.poll();
 412  0
                     result = top.read();
 413  
 
 414  0
                     if (result != null) {
 415  0
                         if (top.getBuffer().isAtEnd())
 416  0
                             top.fill();
 417  
 
 418  0
                         queue.offer(top);
 419  0
                         break;
 420  
                     } 
 421  0
                 }
 422  
 
 423  0
                 return result;
 424  
             }
 425  
         } 
 426  0
         public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<NumberedLink>, ShreddedSource {      
 427  
             public ShreddedProcessor processor;
 428  
             ShreddedBuffer buffer;
 429  0
             NumberedLink last = new NumberedLink();         
 430  0
             long updateSourceCount = -1;
 431  0
             long tupleCount = 0;
 432  0
             long bufferStartCount = 0;  
 433  
             ArrayInput input;
 434  
             
 435  0
             public ShreddedReader(ArrayInput input) {
 436  0
                 this.input = input; 
 437  0
                 this.buffer = new ShreddedBuffer();
 438  0
             }                               
 439  
             
 440  0
             public ShreddedReader(ArrayInput input, int bufferSize) { 
 441  0
                 this.input = input;
 442  0
                 this.buffer = new ShreddedBuffer(bufferSize);
 443  0
             }
 444  
                  
 445  
             public final int compareTo(ShreddedReader other) {
 446  0
                 ShreddedBuffer otherBuffer = other.getBuffer();
 447  
                 
 448  0
                 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
 449  0
                     return 0;                 
 450  0
                 } else if (buffer.isAtEnd()) {
 451  0
                     return -1;
 452  0
                 } else if (otherBuffer.isAtEnd()) {
 453  0
                     return 1;
 454  
                 }
 455  
                                    
 456  0
                 int result = 0;
 457  
                 do {
 458  0
                     result = + Utility.compare(buffer.getSource(), otherBuffer.getSource());
 459  0
                     if(result != 0) break;
 460  
                 } while (false);                                             
 461  
                 
 462  0
                 return result;
 463  
             }
 464  
             
 465  
             public final ShreddedBuffer getBuffer() {
 466  0
                 return buffer;
 467  
             }                
 468  
             
 469  
             public final NumberedLink read() throws IOException {
 470  0
                 if (buffer.isAtEnd()) {
 471  0
                     fill();             
 472  
                 
 473  0
                     if (buffer.isAtEnd()) {
 474  0
                         return null;
 475  
                     }
 476  
                 }
 477  
                       
 478  0
                 assert !buffer.isAtEnd();
 479  0
                 NumberedLink result = new NumberedLink();
 480  
                 
 481  0
                 result.source = buffer.getSource();
 482  0
                 result.destination = buffer.getDestination();
 483  
                 
 484  0
                 buffer.incrementTuple();
 485  0
                 buffer.autoIncrementSource();
 486  
                 
 487  0
                 return result;
 488  
             }           
 489  
             
 490  
             public final void fill() throws IOException {
 491  
                 try {   
 492  0
                     buffer.reset();
 493  
                     
 494  0
                     if (tupleCount != 0) {
 495  
                                                       
 496  0
                         if(updateSourceCount - tupleCount > 0) {
 497  0
                             buffer.sources.add(last.source);
 498  0
                             buffer.sourceTupleIdx.add((int) (updateSourceCount - tupleCount));
 499  
                         }
 500  0
                         bufferStartCount = tupleCount;
 501  
                     }
 502  
                     
 503  0
                     while (!buffer.isFull()) {
 504  0
                         updateSource();
 505  0
                         buffer.processTuple(input.readLong());
 506  0
                         tupleCount++;
 507  
                     }
 508  0
                 } catch(EOFException e) {}
 509  0
             }
 510  
 
 511  
             public final void updateSource() throws IOException {
 512  0
                 if (updateSourceCount > tupleCount)
 513  0
                     return;
 514  
                      
 515  0
                 last.source = input.readLong();
 516  0
                 updateSourceCount = tupleCount + input.readInt();
 517  
                                       
 518  0
                 buffer.processSource(last.source);
 519  0
             }
 520  
 
 521  
             public void run() throws IOException {
 522  
                 while (true) {
 523  0
                     fill();
 524  
                     
 525  0
                     if (buffer.isAtEnd())
 526  0
                         break;
 527  
                     
 528  0
                     buffer.copyUntil(null, processor);
 529  
                 }      
 530  0
                 processor.close();
 531  0
             }
 532  
             
 533  
             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
 534  0
                 if (processor instanceof ShreddedProcessor) {
 535  0
                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
 536  0
                 } else if (processor instanceof NumberedLink.Processor) {
 537  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((NumberedLink.Processor) processor));
 538  0
                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
 539  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<NumberedLink>) processor));
 540  
                 } else {
 541  0
                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
 542  
                 }
 543  0
             }                                
 544  
             
 545  
             public Class<NumberedLink> getOutputClass() {
 546  0
                 return NumberedLink.class;
 547  
             }                
 548  
         }
 549  
         
 550  
         public static class DuplicateEliminator implements ShreddedProcessor {
 551  
             public ShreddedProcessor processor;
 552  0
             NumberedLink last = new NumberedLink();
 553  0
             boolean sourceProcess = true;
 554  
                                            
 555  0
             public DuplicateEliminator() {}
 556  0
             public DuplicateEliminator(ShreddedProcessor processor) {
 557  0
                 this.processor = processor;
 558  0
             }
 559  
             
 560  
             public void setShreddedProcessor(ShreddedProcessor processor) {
 561  0
                 this.processor = processor;
 562  0
             }
 563  
 
 564  
             public void processSource(long source) throws IOException {  
 565  0
                 if (sourceProcess || Utility.compare(source, last.source) != 0) {
 566  0
                     last.source = source;
 567  0
                     processor.processSource(source);
 568  0
                     sourceProcess = false;
 569  
                 }
 570  0
             }  
 571  
             
 572  
             public void resetSource() {
 573  0
                  sourceProcess = true;
 574  0
             }                                                
 575  
                                
 576  
             public void processTuple(long destination) throws IOException {
 577  0
                 processor.processTuple(destination);
 578  0
             } 
 579  
             
 580  
             public void close() throws IOException {
 581  0
                 processor.close();
 582  0
             }                    
 583  
         }
 584  
         public static class TupleUnshredder implements ShreddedProcessor {
 585  0
             NumberedLink last = new NumberedLink();
 586  
             public org.galagosearch.tupleflow.Processor<NumberedLink> processor;                               
 587  
             
 588  0
             public TupleUnshredder(NumberedLink.Processor processor) {
 589  0
                 this.processor = processor;
 590  0
             }         
 591  
             
 592  0
             public TupleUnshredder(org.galagosearch.tupleflow.Processor<NumberedLink> processor) {
 593  0
                 this.processor = processor;
 594  0
             }
 595  
             
 596  
             public NumberedLink clone(NumberedLink object) {
 597  0
                 NumberedLink result = new NumberedLink();
 598  0
                 if (object == null) return result;
 599  0
                 result.source = object.source; 
 600  0
                 result.destination = object.destination; 
 601  0
                 return result;
 602  
             }                 
 603  
             
 604  
             public void processSource(long source) throws IOException {
 605  0
                 last.source = source;
 606  0
             }   
 607  
                 
 608  
             
 609  
             public void processTuple(long destination) throws IOException {
 610  0
                 last.destination = destination;
 611  0
                 processor.process(clone(last));
 612  0
             }               
 613  
             
 614  
             public void close() throws IOException {
 615  0
                 processor.close();
 616  0
             }
 617  
         }     
 618  0
         public static class TupleShredder implements Processor {
 619  0
             NumberedLink last = new NumberedLink();
 620  
             public ShreddedProcessor processor;
 621  
             
 622  0
             public TupleShredder(ShreddedProcessor processor) {
 623  0
                 this.processor = processor;
 624  0
             }                              
 625  
             
 626  
             public NumberedLink clone(NumberedLink object) {
 627  0
                 NumberedLink result = new NumberedLink();
 628  0
                 if (object == null) return result;
 629  0
                 result.source = object.source; 
 630  0
                 result.destination = object.destination; 
 631  0
                 return result;
 632  
             }                 
 633  
             
 634  
             public void process(NumberedLink object) throws IOException {                                                                                                                                                   
 635  0
                 boolean processAll = false;
 636  0
                 if(last == null || Utility.compare(last.source, object.source) != 0 || processAll) { processor.processSource(object.source); processAll = true; }
 637  0
                 processor.processTuple(object.destination);                                         
 638  0
             }
 639  
                           
 640  
             public Class<NumberedLink> getInputClass() {
 641  0
                 return NumberedLink.class;
 642  
             }
 643  
             
 644  
             public void close() throws IOException {
 645  0
                 processor.close();
 646  0
             }                     
 647  
         }
 648  
     } 
 649  0
     public static class DestinationOrder implements Order<NumberedLink> {
 650  
         public int hash(NumberedLink object) {
 651  0
             int h = 0;
 652  0
             h += Utility.hash(object.destination);
 653  0
             return h;
 654  
         } 
 655  
         public Comparator<NumberedLink> greaterThan() {
 656  0
             return new Comparator<NumberedLink>() {
 657  0
                 public int compare(NumberedLink one, NumberedLink two) {
 658  0
                     int result = 0;
 659  
                     do {
 660  0
                         result = + Utility.compare(one.destination, two.destination);
 661  0
                         if(result != 0) break;
 662  
                     } while (false);
 663  0
                     return -result;
 664  
                 }
 665  
             };
 666  
         }     
 667  
         public Comparator<NumberedLink> lessThan() {
 668  0
             return new Comparator<NumberedLink>() {
 669  0
                 public int compare(NumberedLink one, NumberedLink two) {
 670  0
                     int result = 0;
 671  
                     do {
 672  0
                         result = + Utility.compare(one.destination, two.destination);
 673  0
                         if(result != 0) break;
 674  
                     } while (false);
 675  0
                     return result;
 676  
                 }
 677  
             };
 678  
         }     
 679  
         public TypeReader<NumberedLink> orderedReader(ArrayInput _input) {
 680  0
             return new ShreddedReader(_input);
 681  
         }    
 682  
 
 683  
         public TypeReader<NumberedLink> orderedReader(ArrayInput _input, int bufferSize) {
 684  0
             return new ShreddedReader(_input, bufferSize);
 685  
         }    
 686  
         public OrderedWriter<NumberedLink> orderedWriter(ArrayOutput _output) {
 687  0
             ShreddedWriter w = new ShreddedWriter(_output);
 688  0
             return new OrderedWriterClass(w); 
 689  
         }                                    
 690  0
         public static class OrderedWriterClass extends OrderedWriter< NumberedLink > {
 691  0
             NumberedLink last = null;
 692  0
             ShreddedWriter shreddedWriter = null; 
 693  
             
 694  0
             public OrderedWriterClass(ShreddedWriter s) {
 695  0
                 this.shreddedWriter = s;
 696  0
             }
 697  
             
 698  
             public void process(NumberedLink object) throws IOException {
 699  0
                boolean processAll = false;
 700  0
                if (processAll || last == null || 0 != Utility.compare(object.destination, last.destination)) { processAll = true; shreddedWriter.processDestination(object.destination); }
 701  0
                shreddedWriter.processTuple(object.source);
 702  0
                last = object;
 703  0
             }           
 704  
                  
 705  
             public void close() throws IOException {
 706  0
                 shreddedWriter.close();
 707  0
             }
 708  
             
 709  
             public Class<NumberedLink> getInputClass() {
 710  0
                 return NumberedLink.class;
 711  
             }
 712  
         } 
 713  
         public ReaderSource<NumberedLink> orderedCombiner(Collection<TypeReader<NumberedLink>> readers, boolean closeOnExit) {
 714  0
             ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
 715  
             
 716  0
             for (TypeReader<NumberedLink> reader : readers) {
 717  0
                 shreddedReaders.add((ShreddedReader)reader);
 718  
             }
 719  
             
 720  0
             return new ShreddedCombiner(shreddedReaders, closeOnExit);
 721  
         }                  
 722  
         public NumberedLink clone(NumberedLink object) {
 723  0
             NumberedLink result = new NumberedLink();
 724  0
             if (object == null) return result;
 725  0
             result.source = object.source; 
 726  0
             result.destination = object.destination; 
 727  0
             return result;
 728  
         }                 
 729  
         public Class<NumberedLink> getOrderedClass() {
 730  0
             return NumberedLink.class;
 731  
         }                           
 732  
         public String[] getOrderSpec() {
 733  0
             return new String[] {"+destination"};
 734  
         }
 735  
 
 736  
         public static String getSpecString() {
 737  0
             return "+destination";
 738  
         }
 739  
                            
 740  
         public interface ShreddedProcessor extends Step {
 741  
             public void processDestination(long destination) throws IOException;
 742  
             public void processTuple(long source) throws IOException;
 743  
             public void close() throws IOException;
 744  
         }    
 745  
         public interface ShreddedSource extends Step {
 746  
         }                                              
 747  
         
 748  0
         public static class ShreddedWriter implements ShreddedProcessor {
 749  
             ArrayOutput output;
 750  0
             ShreddedBuffer buffer = new ShreddedBuffer();
 751  
             long lastDestination;
 752  0
             boolean lastFlush = false;
 753  
             
 754  0
             public ShreddedWriter(ArrayOutput output) {
 755  0
                 this.output = output;
 756  0
             }                        
 757  
             
 758  
             public void close() throws IOException {
 759  0
                 flush();
 760  0
             }
 761  
             
 762  
             public void processDestination(long destination) {
 763  0
                 lastDestination = destination;
 764  0
                 buffer.processDestination(destination);
 765  0
             }
 766  
             public final void processTuple(long source) throws IOException {
 767  0
                 if (lastFlush) {
 768  0
                     if(buffer.destinations.size() == 0) buffer.processDestination(lastDestination);
 769  0
                     lastFlush = false;
 770  
                 }
 771  0
                 buffer.processTuple(source);
 772  0
                 if (buffer.isFull())
 773  0
                     flush();
 774  0
             }
 775  
             public final void flushTuples(int pauseIndex) throws IOException {
 776  
                 
 777  0
                 while (buffer.getReadIndex() < pauseIndex) {
 778  
                            
 779  0
                     output.writeLong(buffer.getSource());
 780  0
                     buffer.incrementTuple();
 781  
                 }
 782  0
             }  
 783  
             public final void flushDestination(int pauseIndex) throws IOException {
 784  0
                 while (buffer.getReadIndex() < pauseIndex) {
 785  0
                     int nextPause = buffer.getDestinationEndIndex();
 786  0
                     int count = nextPause - buffer.getReadIndex();
 787  
                     
 788  0
                     output.writeLong(buffer.getDestination());
 789  0
                     output.writeInt(count);
 790  0
                     buffer.incrementDestination();
 791  
                       
 792  0
                     flushTuples(nextPause);
 793  0
                     assert nextPause == buffer.getReadIndex();
 794  0
                 }
 795  0
             }
 796  
             public void flush() throws IOException { 
 797  0
                 flushDestination(buffer.getWriteIndex());
 798  0
                 buffer.reset(); 
 799  0
                 lastFlush = true;
 800  0
             }                           
 801  
         }
 802  0
         public static class ShreddedBuffer {
 803  0
             ArrayList<Long> destinations = new ArrayList();
 804  0
             ArrayList<Integer> destinationTupleIdx = new ArrayList();
 805  0
             int destinationReadIdx = 0;
 806  
                             
 807  
             long[] sources;
 808  0
             int writeTupleIndex = 0;
 809  0
             int readTupleIndex = 0;
 810  
             int batchSize;
 811  
 
 812  0
             public ShreddedBuffer(int batchSize) {
 813  0
                 this.batchSize = batchSize;
 814  
 
 815  0
                 sources = new long[batchSize];
 816  0
             }                              
 817  
 
 818  
             public ShreddedBuffer() {    
 819  0
                 this(10000);
 820  0
             }                                                                                                                    
 821  
             
 822  
             public void processDestination(long destination) {
 823  0
                 destinations.add(destination);
 824  0
                 destinationTupleIdx.add(writeTupleIndex);
 825  0
             }                                      
 826  
             public void processTuple(long source) {
 827  0
                 assert destinations.size() > 0;
 828  0
                 sources[writeTupleIndex] = source;
 829  0
                 writeTupleIndex++;
 830  0
             }
 831  
             public void resetData() {
 832  0
                 destinations.clear();
 833  0
                 destinationTupleIdx.clear();
 834  0
                 writeTupleIndex = 0;
 835  0
             }                  
 836  
                                  
 837  
             public void resetRead() {
 838  0
                 readTupleIndex = 0;
 839  0
                 destinationReadIdx = 0;
 840  0
             } 
 841  
 
 842  
             public void reset() {
 843  0
                 resetData();
 844  0
                 resetRead();
 845  0
             } 
 846  
             public boolean isFull() {
 847  0
                 return writeTupleIndex >= batchSize;
 848  
             }
 849  
 
 850  
             public boolean isEmpty() {
 851  0
                 return writeTupleIndex == 0;
 852  
             }                          
 853  
 
 854  
             public boolean isAtEnd() {
 855  0
                 return readTupleIndex >= writeTupleIndex;
 856  
             }           
 857  
             public void incrementDestination() {
 858  0
                 destinationReadIdx++;  
 859  0
             }                                                                                              
 860  
 
 861  
             public void autoIncrementDestination() {
 862  0
                 while (readTupleIndex >= getDestinationEndIndex() && readTupleIndex < writeTupleIndex)
 863  0
                     destinationReadIdx++;
 864  0
             }                 
 865  
             public void incrementTuple() {
 866  0
                 readTupleIndex++;
 867  0
             }                    
 868  
             public int getDestinationEndIndex() {
 869  0
                 if ((destinationReadIdx+1) >= destinationTupleIdx.size())
 870  0
                     return writeTupleIndex;
 871  0
                 return destinationTupleIdx.get(destinationReadIdx+1);
 872  
             }
 873  
             public int getReadIndex() {
 874  0
                 return readTupleIndex;
 875  
             }   
 876  
 
 877  
             public int getWriteIndex() {
 878  0
                 return writeTupleIndex;
 879  
             } 
 880  
             public long getDestination() {
 881  0
                 assert readTupleIndex < writeTupleIndex;
 882  0
                 assert destinationReadIdx < destinations.size();
 883  
                 
 884  0
                 return destinations.get(destinationReadIdx);
 885  
             }
 886  
             public long getSource() {
 887  0
                 assert readTupleIndex < writeTupleIndex;
 888  0
                 return sources[readTupleIndex];
 889  
             }                                         
 890  
             public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
 891  0
                 while (getReadIndex() < endIndex) {
 892  0
                    output.processTuple(getSource());
 893  0
                    incrementTuple();
 894  
                 }
 895  0
             }                                                                           
 896  
             public void copyUntilIndexDestination(int endIndex, ShreddedProcessor output) throws IOException {
 897  0
                 while (getReadIndex() < endIndex) {
 898  0
                     output.processDestination(getDestination());
 899  0
                     assert getDestinationEndIndex() <= endIndex;
 900  0
                     copyTuples(getDestinationEndIndex(), output);
 901  0
                     incrementDestination();
 902  
                 }
 903  0
             }  
 904  
             public void copyUntilDestination(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
 905  0
                 while (!isAtEnd()) {
 906  0
                     if (other != null) {   
 907  0
                         assert !other.isAtEnd();
 908  0
                         int c = + Utility.compare(getDestination(), other.getDestination());
 909  
                     
 910  0
                         if (c > 0) {
 911  0
                             break;   
 912  
                         }
 913  
                         
 914  0
                         output.processDestination(getDestination());
 915  
                                       
 916  0
                         copyTuples(getDestinationEndIndex(), output);
 917  0
                     } else {
 918  0
                         output.processDestination(getDestination());
 919  0
                         copyTuples(getDestinationEndIndex(), output);
 920  
                     }
 921  0
                     incrementDestination();  
 922  
                     
 923  
                
 924  
                 }
 925  0
             }
 926  
             public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
 927  0
                 copyUntilDestination(other, output);
 928  0
             }
 929  
             
 930  
         }                         
 931  0
         public static class ShreddedCombiner implements ReaderSource<NumberedLink>, ShreddedSource {   
 932  
             public ShreddedProcessor processor;
 933  
             Collection<ShreddedReader> readers;       
 934  0
             boolean closeOnExit = false;
 935  0
             boolean uninitialized = true;
 936  0
             PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
 937  
             
 938  0
             public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
 939  0
                 this.readers = readers;                                                       
 940  0
                 this.closeOnExit = closeOnExit;
 941  0
             }
 942  
                                   
 943  
             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
 944  0
                 if (processor instanceof ShreddedProcessor) {
 945  0
                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
 946  0
                 } else if (processor instanceof NumberedLink.Processor) {
 947  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((NumberedLink.Processor) processor));
 948  0
                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
 949  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<NumberedLink>) processor));
 950  
                 } else {
 951  0
                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
 952  
                 }
 953  0
             }                                
 954  
             
 955  
             public Class<NumberedLink> getOutputClass() {
 956  0
                 return NumberedLink.class;
 957  
             }
 958  
             
 959  
             public void initialize() throws IOException {
 960  0
                 for (ShreddedReader reader : readers) {
 961  0
                     reader.fill();                                        
 962  
                     
 963  0
                     if (!reader.getBuffer().isAtEnd())
 964  0
                         queue.add(reader);
 965  
                 }   
 966  
 
 967  0
                 uninitialized = false;
 968  0
             }
 969  
 
 970  
             public void run() throws IOException {
 971  0
                 initialize();
 972  
                
 973  0
                 while (queue.size() > 0) {
 974  0
                     ShreddedReader top = queue.poll();
 975  0
                     ShreddedReader next = null;
 976  0
                     ShreddedBuffer nextBuffer = null; 
 977  
                     
 978  0
                     assert !top.getBuffer().isAtEnd();
 979  
                                                   
 980  0
                     if (queue.size() > 0) {
 981  0
                         next = queue.peek();
 982  0
                         nextBuffer = next.getBuffer();
 983  0
                         assert !nextBuffer.isAtEnd();
 984  
                     }
 985  
                     
 986  0
                     top.getBuffer().copyUntil(nextBuffer, processor);
 987  0
                     if (top.getBuffer().isAtEnd())
 988  0
                         top.fill();                 
 989  
                         
 990  0
                     if (!top.getBuffer().isAtEnd())
 991  0
                         queue.add(top);
 992  0
                 }              
 993  
                 
 994  0
                 if (closeOnExit)
 995  0
                     processor.close();
 996  0
             }
 997  
 
 998  
             public NumberedLink read() throws IOException {
 999  0
                 if (uninitialized)
 1000  0
                     initialize();
 1001  
 
 1002  0
                 NumberedLink result = null;
 1003  
 
 1004  0
                 while (queue.size() > 0) {
 1005  0
                     ShreddedReader top = queue.poll();
 1006  0
                     result = top.read();
 1007  
 
 1008  0
                     if (result != null) {
 1009  0
                         if (top.getBuffer().isAtEnd())
 1010  0
                             top.fill();
 1011  
 
 1012  0
                         queue.offer(top);
 1013  0
                         break;
 1014  
                     } 
 1015  0
                 }
 1016  
 
 1017  0
                 return result;
 1018  
             }
 1019  
         } 
 1020  0
         public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<NumberedLink>, ShreddedSource {      
 1021  
             public ShreddedProcessor processor;
 1022  
             ShreddedBuffer buffer;
 1023  0
             NumberedLink last = new NumberedLink();         
 1024  0
             long updateDestinationCount = -1;
 1025  0
             long tupleCount = 0;
 1026  0
             long bufferStartCount = 0;  
 1027  
             ArrayInput input;
 1028  
             
 1029  0
             public ShreddedReader(ArrayInput input) {
 1030  0
                 this.input = input; 
 1031  0
                 this.buffer = new ShreddedBuffer();
 1032  0
             }                               
 1033  
             
 1034  0
             public ShreddedReader(ArrayInput input, int bufferSize) { 
 1035  0
                 this.input = input;
 1036  0
                 this.buffer = new ShreddedBuffer(bufferSize);
 1037  0
             }
 1038  
                  
 1039  
             public final int compareTo(ShreddedReader other) {
 1040  0
                 ShreddedBuffer otherBuffer = other.getBuffer();
 1041  
                 
 1042  0
                 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
 1043  0
                     return 0;                 
 1044  0
                 } else if (buffer.isAtEnd()) {
 1045  0
                     return -1;
 1046  0
                 } else if (otherBuffer.isAtEnd()) {
 1047  0
                     return 1;
 1048  
                 }
 1049  
                                    
 1050  0
                 int result = 0;
 1051  
                 do {
 1052  0
                     result = + Utility.compare(buffer.getDestination(), otherBuffer.getDestination());
 1053  0
                     if(result != 0) break;
 1054  
                 } while (false);                                             
 1055  
                 
 1056  0
                 return result;
 1057  
             }
 1058  
             
 1059  
             public final ShreddedBuffer getBuffer() {
 1060  0
                 return buffer;
 1061  
             }                
 1062  
             
 1063  
             public final NumberedLink read() throws IOException {
 1064  0
                 if (buffer.isAtEnd()) {
 1065  0
                     fill();             
 1066  
                 
 1067  0
                     if (buffer.isAtEnd()) {
 1068  0
                         return null;
 1069  
                     }
 1070  
                 }
 1071  
                       
 1072  0
                 assert !buffer.isAtEnd();
 1073  0
                 NumberedLink result = new NumberedLink();
 1074  
                 
 1075  0
                 result.destination = buffer.getDestination();
 1076  0
                 result.source = buffer.getSource();
 1077  
                 
 1078  0
                 buffer.incrementTuple();
 1079  0
                 buffer.autoIncrementDestination();
 1080  
                 
 1081  0
                 return result;
 1082  
             }           
 1083  
             
 1084  
             public final void fill() throws IOException {
 1085  
                 try {   
 1086  0
                     buffer.reset();
 1087  
                     
 1088  0
                     if (tupleCount != 0) {
 1089  
                                                       
 1090  0
                         if(updateDestinationCount - tupleCount > 0) {
 1091  0
                             buffer.destinations.add(last.destination);
 1092  0
                             buffer.destinationTupleIdx.add((int) (updateDestinationCount - tupleCount));
 1093  
                         }
 1094  0
                         bufferStartCount = tupleCount;
 1095  
                     }
 1096  
                     
 1097  0
                     while (!buffer.isFull()) {
 1098  0
                         updateDestination();
 1099  0
                         buffer.processTuple(input.readLong());
 1100  0
                         tupleCount++;
 1101  
                     }
 1102  0
                 } catch(EOFException e) {}
 1103  0
             }
 1104  
 
 1105  
             public final void updateDestination() throws IOException {
 1106  0
                 if (updateDestinationCount > tupleCount)
 1107  0
                     return;
 1108  
                      
 1109  0
                 last.destination = input.readLong();
 1110  0
                 updateDestinationCount = tupleCount + input.readInt();
 1111  
                                       
 1112  0
                 buffer.processDestination(last.destination);
 1113  0
             }
 1114  
 
 1115  
             public void run() throws IOException {
 1116  
                 while (true) {
 1117  0
                     fill();
 1118  
                     
 1119  0
                     if (buffer.isAtEnd())
 1120  0
                         break;
 1121  
                     
 1122  0
                     buffer.copyUntil(null, processor);
 1123  
                 }      
 1124  0
                 processor.close();
 1125  0
             }
 1126  
             
 1127  
             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
 1128  0
                 if (processor instanceof ShreddedProcessor) {
 1129  0
                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
 1130  0
                 } else if (processor instanceof NumberedLink.Processor) {
 1131  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((NumberedLink.Processor) processor));
 1132  0
                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
 1133  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<NumberedLink>) processor));
 1134  
                 } else {
 1135  0
                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
 1136  
                 }
 1137  0
             }                                
 1138  
             
 1139  
             public Class<NumberedLink> getOutputClass() {
 1140  0
                 return NumberedLink.class;
 1141  
             }                
 1142  
         }
 1143  
         
 1144  
         public static class DuplicateEliminator implements ShreddedProcessor {
 1145  
             public ShreddedProcessor processor;
 1146  0
             NumberedLink last = new NumberedLink();
 1147  0
             boolean destinationProcess = true;
 1148  
                                            
 1149  0
             public DuplicateEliminator() {}
 1150  0
             public DuplicateEliminator(ShreddedProcessor processor) {
 1151  0
                 this.processor = processor;
 1152  0
             }
 1153  
             
 1154  
             public void setShreddedProcessor(ShreddedProcessor processor) {
 1155  0
                 this.processor = processor;
 1156  0
             }
 1157  
 
 1158  
             public void processDestination(long destination) throws IOException {  
 1159  0
                 if (destinationProcess || Utility.compare(destination, last.destination) != 0) {
 1160  0
                     last.destination = destination;
 1161  0
                     processor.processDestination(destination);
 1162  0
                     destinationProcess = false;
 1163  
                 }
 1164  0
             }  
 1165  
             
 1166  
             public void resetDestination() {
 1167  0
                  destinationProcess = true;
 1168  0
             }                                                
 1169  
                                
 1170  
             public void processTuple(long source) throws IOException {
 1171  0
                 processor.processTuple(source);
 1172  0
             } 
 1173  
             
 1174  
             public void close() throws IOException {
 1175  0
                 processor.close();
 1176  0
             }                    
 1177  
         }
 1178  
         public static class TupleUnshredder implements ShreddedProcessor {
 1179  0
             NumberedLink last = new NumberedLink();
 1180  
             public org.galagosearch.tupleflow.Processor<NumberedLink> processor;                               
 1181  
             
 1182  0
             public TupleUnshredder(NumberedLink.Processor processor) {
 1183  0
                 this.processor = processor;
 1184  0
             }         
 1185  
             
 1186  0
             public TupleUnshredder(org.galagosearch.tupleflow.Processor<NumberedLink> processor) {
 1187  0
                 this.processor = processor;
 1188  0
             }
 1189  
             
 1190  
             public NumberedLink clone(NumberedLink object) {
 1191  0
                 NumberedLink result = new NumberedLink();
 1192  0
                 if (object == null) return result;
 1193  0
                 result.source = object.source; 
 1194  0
                 result.destination = object.destination; 
 1195  0
                 return result;
 1196  
             }                 
 1197  
             
 1198  
             public void processDestination(long destination) throws IOException {
 1199  0
                 last.destination = destination;
 1200  0
             }   
 1201  
                 
 1202  
             
 1203  
             public void processTuple(long source) throws IOException {
 1204  0
                 last.source = source;
 1205  0
                 processor.process(clone(last));
 1206  0
             }               
 1207  
             
 1208  
             public void close() throws IOException {
 1209  0
                 processor.close();
 1210  0
             }
 1211  
         }     
 1212  0
         public static class TupleShredder implements Processor {
 1213  0
             NumberedLink last = new NumberedLink();
 1214  
             public ShreddedProcessor processor;
 1215  
             
 1216  0
             public TupleShredder(ShreddedProcessor processor) {
 1217  0
                 this.processor = processor;
 1218  0
             }                              
 1219  
             
 1220  
             public NumberedLink clone(NumberedLink object) {
 1221  0
                 NumberedLink result = new NumberedLink();
 1222  0
                 if (object == null) return result;
 1223  0
                 result.source = object.source; 
 1224  0
                 result.destination = object.destination; 
 1225  0
                 return result;
 1226  
             }                 
 1227  
             
 1228  
             public void process(NumberedLink object) throws IOException {                                                                                                                                                   
 1229  0
                 boolean processAll = false;
 1230  0
                 if(last == null || Utility.compare(last.destination, object.destination) != 0 || processAll) { processor.processDestination(object.destination); processAll = true; }
 1231  0
                 processor.processTuple(object.source);                                         
 1232  0
             }
 1233  
                           
 1234  
             public Class<NumberedLink> getInputClass() {
 1235  0
                 return NumberedLink.class;
 1236  
             }
 1237  
             
 1238  
             public void close() throws IOException {
 1239  0
                 processor.close();
 1240  0
             }                     
 1241  
         }
 1242  
     } 
 1243  
 }