View Javadoc

1   // This file was automatically generated with the command: 
2   //     java org.galagosearch.tupleflow.typebuilder.TypeBuilderMojo ...
3   package org.galagosearch.core.types;
4   
5   import org.galagosearch.tupleflow.Utility;
6   import org.galagosearch.tupleflow.ArrayInput;
7   import org.galagosearch.tupleflow.ArrayOutput;
8   import org.galagosearch.tupleflow.Order;   
9   import org.galagosearch.tupleflow.OrderedWriter;
10  import org.galagosearch.tupleflow.Type; 
11  import org.galagosearch.tupleflow.TypeReader;
12  import org.galagosearch.tupleflow.Step; 
13  import org.galagosearch.tupleflow.IncompatibleProcessorException;
14  import org.galagosearch.tupleflow.ReaderSource;
15  import java.io.IOException;             
16  import java.io.EOFException;
17  import java.io.UnsupportedEncodingException;
18  import java.util.ArrayList;
19  import java.util.Arrays;   
20  import java.util.Comparator;
21  import java.util.PriorityQueue;
22  import java.util.Collection;
23  
24  public class NumberedLink implements Type<NumberedLink> {
/** The {@code source} component of the tuple. */
public long source;
/** The {@code destination} component of the tuple. */
public long destination;

/** Creates a tuple whose two fields keep their default value of zero. */
public NumberedLink() {
}

/**
 * Creates a tuple from the two given values.
 *
 * @param source      value stored in {@link #source}
 * @param destination value stored in {@link #destination}
 */
public NumberedLink(long source, long destination) {
    this.destination = destination;
    this.source = source;
}
33      
34      public String toString() {
35              return String.format("%d,%d",
36                                     source, destination);
37      } 
38  
39      public Order<NumberedLink> getOrder(String... spec) {
40          if (Arrays.equals(spec, new String[] { "+source" })) {
41              return new SourceOrder();
42          }
43          if (Arrays.equals(spec, new String[] { "+destination" })) {
44              return new DestinationOrder();
45          }
46          return null;
47      } 
48        
49      public interface Processor extends Step, org.galagosearch.tupleflow.Processor<NumberedLink> {
50          public void process(NumberedLink object) throws IOException;
51          public void close() throws IOException;
52      }                        
53      public interface Source extends Step {
54      }
55      public static class SourceOrder implements Order<NumberedLink> {
56          public int hash(NumberedLink object) {
57              int h = 0;
58              h += Utility.hash(object.source);
59              return h;
60          } 
61          public Comparator<NumberedLink> greaterThan() {
62              return new Comparator<NumberedLink>() {
63                  public int compare(NumberedLink one, NumberedLink two) {
64                      int result = 0;
65                      do {
66                          result = + Utility.compare(one.source, two.source);
67                          if(result != 0) break;
68                      } while (false);
69                      return -result;
70                  }
71              };
72          }     
73          public Comparator<NumberedLink> lessThan() {
74              return new Comparator<NumberedLink>() {
75                  public int compare(NumberedLink one, NumberedLink two) {
76                      int result = 0;
77                      do {
78                          result = + Utility.compare(one.source, two.source);
79                          if(result != 0) break;
80                      } while (false);
81                      return result;
82                  }
83              };
84          }     
85          public TypeReader<NumberedLink> orderedReader(ArrayInput _input) {
86              return new ShreddedReader(_input);
87          }    
88  
89          public TypeReader<NumberedLink> orderedReader(ArrayInput _input, int bufferSize) {
90              return new ShreddedReader(_input, bufferSize);
91          }    
92          public OrderedWriter<NumberedLink> orderedWriter(ArrayOutput _output) {
93              ShreddedWriter w = new ShreddedWriter(_output);
94              return new OrderedWriterClass(w); 
95          }                                    
96          public static class OrderedWriterClass extends OrderedWriter< NumberedLink > {
97              NumberedLink last = null;
98              ShreddedWriter shreddedWriter = null; 
99              
100             public OrderedWriterClass(ShreddedWriter s) {
101                 this.shreddedWriter = s;
102             }
103             
104             public void process(NumberedLink object) throws IOException {
105                boolean processAll = false;
106                if (processAll || last == null || 0 != Utility.compare(object.source, last.source)) { processAll = true; shreddedWriter.processSource(object.source); }
107                shreddedWriter.processTuple(object.destination);
108                last = object;
109             }           
110                  
111             public void close() throws IOException {
112                 shreddedWriter.close();
113             }
114             
115             public Class<NumberedLink> getInputClass() {
116                 return NumberedLink.class;
117             }
118         } 
119         public ReaderSource<NumberedLink> orderedCombiner(Collection<TypeReader<NumberedLink>> readers, boolean closeOnExit) {
120             ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
121             
122             for (TypeReader<NumberedLink> reader : readers) {
123                 shreddedReaders.add((ShreddedReader)reader);
124             }
125             
126             return new ShreddedCombiner(shreddedReaders, closeOnExit);
127         }                  
128         public NumberedLink clone(NumberedLink object) {
129             NumberedLink result = new NumberedLink();
130             if (object == null) return result;
131             result.source = object.source; 
132             result.destination = object.destination; 
133             return result;
134         }                 
135         public Class<NumberedLink> getOrderedClass() {
136             return NumberedLink.class;
137         }                           
138         public String[] getOrderSpec() {
139             return new String[] {"+source"};
140         }
141 
142         public static String getSpecString() {
143             return "+source";
144         }
145                            
146         public interface ShreddedProcessor extends Step {
147             public void processSource(long source) throws IOException;
148             public void processTuple(long destination) throws IOException;
149             public void close() throws IOException;
150         }    
151         public interface ShreddedSource extends Step {
152         }                                              
153         
154         public static class ShreddedWriter implements ShreddedProcessor {
155             ArrayOutput output;
156             ShreddedBuffer buffer = new ShreddedBuffer();
157             long lastSource;
158             boolean lastFlush = false;
159             
160             public ShreddedWriter(ArrayOutput output) {
161                 this.output = output;
162             }                        
163             
164             public void close() throws IOException {
165                 flush();
166             }
167             
168             public void processSource(long source) {
169                 lastSource = source;
170                 buffer.processSource(source);
171             }
172             public final void processTuple(long destination) throws IOException {
173                 if (lastFlush) {
174                     if(buffer.sources.size() == 0) buffer.processSource(lastSource);
175                     lastFlush = false;
176                 }
177                 buffer.processTuple(destination);
178                 if (buffer.isFull())
179                     flush();
180             }
181             public final void flushTuples(int pauseIndex) throws IOException {
182                 
183                 while (buffer.getReadIndex() < pauseIndex) {
184                            
185                     output.writeLong(buffer.getDestination());
186                     buffer.incrementTuple();
187                 }
188             }  
189             public final void flushSource(int pauseIndex) throws IOException {
190                 while (buffer.getReadIndex() < pauseIndex) {
191                     int nextPause = buffer.getSourceEndIndex();
192                     int count = nextPause - buffer.getReadIndex();
193                     
194                     output.writeLong(buffer.getSource());
195                     output.writeInt(count);
196                     buffer.incrementSource();
197                       
198                     flushTuples(nextPause);
199                     assert nextPause == buffer.getReadIndex();
200                 }
201             }
202             public void flush() throws IOException { 
203                 flushSource(buffer.getWriteIndex());
204                 buffer.reset(); 
205                 lastFlush = true;
206             }                           
207         }
208         public static class ShreddedBuffer {
209             ArrayList<Long> sources = new ArrayList();
210             ArrayList<Integer> sourceTupleIdx = new ArrayList();
211             int sourceReadIdx = 0;
212                             
213             long[] destinations;
214             int writeTupleIndex = 0;
215             int readTupleIndex = 0;
216             int batchSize;
217 
218             public ShreddedBuffer(int batchSize) {
219                 this.batchSize = batchSize;
220 
221                 destinations = new long[batchSize];
222             }                              
223 
224             public ShreddedBuffer() {    
225                 this(10000);
226             }                                                                                                                    
227             
228             public void processSource(long source) {
229                 sources.add(source);
230                 sourceTupleIdx.add(writeTupleIndex);
231             }                                      
232             public void processTuple(long destination) {
233                 assert sources.size() > 0;
234                 destinations[writeTupleIndex] = destination;
235                 writeTupleIndex++;
236             }
237             public void resetData() {
238                 sources.clear();
239                 sourceTupleIdx.clear();
240                 writeTupleIndex = 0;
241             }                  
242                                  
243             public void resetRead() {
244                 readTupleIndex = 0;
245                 sourceReadIdx = 0;
246             } 
247 
248             public void reset() {
249                 resetData();
250                 resetRead();
251             } 
252             public boolean isFull() {
253                 return writeTupleIndex >= batchSize;
254             }
255 
256             public boolean isEmpty() {
257                 return writeTupleIndex == 0;
258             }                          
259 
260             public boolean isAtEnd() {
261                 return readTupleIndex >= writeTupleIndex;
262             }           
263             public void incrementSource() {
264                 sourceReadIdx++;  
265             }                                                                                              
266 
267             public void autoIncrementSource() {
268                 while (readTupleIndex >= getSourceEndIndex() && readTupleIndex < writeTupleIndex)
269                     sourceReadIdx++;
270             }                 
271             public void incrementTuple() {
272                 readTupleIndex++;
273             }                    
274             public int getSourceEndIndex() {
275                 if ((sourceReadIdx+1) >= sourceTupleIdx.size())
276                     return writeTupleIndex;
277                 return sourceTupleIdx.get(sourceReadIdx+1);
278             }
279             public int getReadIndex() {
280                 return readTupleIndex;
281             }   
282 
283             public int getWriteIndex() {
284                 return writeTupleIndex;
285             } 
286             public long getSource() {
287                 assert readTupleIndex < writeTupleIndex;
288                 assert sourceReadIdx < sources.size();
289                 
290                 return sources.get(sourceReadIdx);
291             }
292             public long getDestination() {
293                 assert readTupleIndex < writeTupleIndex;
294                 return destinations[readTupleIndex];
295             }                                         
296             public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
297                 while (getReadIndex() < endIndex) {
298                    output.processTuple(getDestination());
299                    incrementTuple();
300                 }
301             }                                                                           
302             public void copyUntilIndexSource(int endIndex, ShreddedProcessor output) throws IOException {
303                 while (getReadIndex() < endIndex) {
304                     output.processSource(getSource());
305                     assert getSourceEndIndex() <= endIndex;
306                     copyTuples(getSourceEndIndex(), output);
307                     incrementSource();
308                 }
309             }  
310             public void copyUntilSource(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
311                 while (!isAtEnd()) {
312                     if (other != null) {   
313                         assert !other.isAtEnd();
314                         int c = + Utility.compare(getSource(), other.getSource());
315                     
316                         if (c > 0) {
317                             break;   
318                         }
319                         
320                         output.processSource(getSource());
321                                       
322                         copyTuples(getSourceEndIndex(), output);
323                     } else {
324                         output.processSource(getSource());
325                         copyTuples(getSourceEndIndex(), output);
326                     }
327                     incrementSource();  
328                     
329                
330                 }
331             }
332             public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
333                 copyUntilSource(other, output);
334             }
335             
336         }                         
337         public static class ShreddedCombiner implements ReaderSource<NumberedLink>, ShreddedSource {   
338             public ShreddedProcessor processor;
339             Collection<ShreddedReader> readers;       
340             boolean closeOnExit = false;
341             boolean uninitialized = true;
342             PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
343             
344             public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
345                 this.readers = readers;                                                       
346                 this.closeOnExit = closeOnExit;
347             }
348                                   
349             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
350                 if (processor instanceof ShreddedProcessor) {
351                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
352                 } else if (processor instanceof NumberedLink.Processor) {
353                     this.processor = new DuplicateEliminator(new TupleUnshredder((NumberedLink.Processor) processor));
354                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
355                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<NumberedLink>) processor));
356                 } else {
357                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
358                 }
359             }                                
360             
361             public Class<NumberedLink> getOutputClass() {
362                 return NumberedLink.class;
363             }
364             
365             public void initialize() throws IOException {
366                 for (ShreddedReader reader : readers) {
367                     reader.fill();                                        
368                     
369                     if (!reader.getBuffer().isAtEnd())
370                         queue.add(reader);
371                 }   
372 
373                 uninitialized = false;
374             }
375 
376             public void run() throws IOException {
377                 initialize();
378                
379                 while (queue.size() > 0) {
380                     ShreddedReader top = queue.poll();
381                     ShreddedReader next = null;
382                     ShreddedBuffer nextBuffer = null; 
383                     
384                     assert !top.getBuffer().isAtEnd();
385                                                   
386                     if (queue.size() > 0) {
387                         next = queue.peek();
388                         nextBuffer = next.getBuffer();
389                         assert !nextBuffer.isAtEnd();
390                     }
391                     
392                     top.getBuffer().copyUntil(nextBuffer, processor);
393                     if (top.getBuffer().isAtEnd())
394                         top.fill();                 
395                         
396                     if (!top.getBuffer().isAtEnd())
397                         queue.add(top);
398                 }              
399                 
400                 if (closeOnExit)
401                     processor.close();
402             }
403 
404             public NumberedLink read() throws IOException {
405                 if (uninitialized)
406                     initialize();
407 
408                 NumberedLink result = null;
409 
410                 while (queue.size() > 0) {
411                     ShreddedReader top = queue.poll();
412                     result = top.read();
413 
414                     if (result != null) {
415                         if (top.getBuffer().isAtEnd())
416                             top.fill();
417 
418                         queue.offer(top);
419                         break;
420                     } 
421                 }
422 
423                 return result;
424             }
425         } 
426         public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<NumberedLink>, ShreddedSource {      
427             public ShreddedProcessor processor;
428             ShreddedBuffer buffer;
429             NumberedLink last = new NumberedLink();         
430             long updateSourceCount = -1;
431             long tupleCount = 0;
432             long bufferStartCount = 0;  
433             ArrayInput input;
434             
435             public ShreddedReader(ArrayInput input) {
436                 this.input = input; 
437                 this.buffer = new ShreddedBuffer();
438             }                               
439             
440             public ShreddedReader(ArrayInput input, int bufferSize) { 
441                 this.input = input;
442                 this.buffer = new ShreddedBuffer(bufferSize);
443             }
444                  
445             public final int compareTo(ShreddedReader other) {
446                 ShreddedBuffer otherBuffer = other.getBuffer();
447                 
448                 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
449                     return 0;                 
450                 } else if (buffer.isAtEnd()) {
451                     return -1;
452                 } else if (otherBuffer.isAtEnd()) {
453                     return 1;
454                 }
455                                    
456                 int result = 0;
457                 do {
458                     result = + Utility.compare(buffer.getSource(), otherBuffer.getSource());
459                     if(result != 0) break;
460                 } while (false);                                             
461                 
462                 return result;
463             }
464             
465             public final ShreddedBuffer getBuffer() {
466                 return buffer;
467             }                
468             
469             public final NumberedLink read() throws IOException {
470                 if (buffer.isAtEnd()) {
471                     fill();             
472                 
473                     if (buffer.isAtEnd()) {
474                         return null;
475                     }
476                 }
477                       
478                 assert !buffer.isAtEnd();
479                 NumberedLink result = new NumberedLink();
480                 
481                 result.source = buffer.getSource();
482                 result.destination = buffer.getDestination();
483                 
484                 buffer.incrementTuple();
485                 buffer.autoIncrementSource();
486                 
487                 return result;
488             }           
489             
490             public final void fill() throws IOException {
491                 try {   
492                     buffer.reset();
493                     
494                     if (tupleCount != 0) {
495                                                       
496                         if(updateSourceCount - tupleCount > 0) {
497                             buffer.sources.add(last.source);
498                             buffer.sourceTupleIdx.add((int) (updateSourceCount - tupleCount));
499                         }
500                         bufferStartCount = tupleCount;
501                     }
502                     
503                     while (!buffer.isFull()) {
504                         updateSource();
505                         buffer.processTuple(input.readLong());
506                         tupleCount++;
507                     }
508                 } catch(EOFException e) {}
509             }
510 
511             public final void updateSource() throws IOException {
512                 if (updateSourceCount > tupleCount)
513                     return;
514                      
515                 last.source = input.readLong();
516                 updateSourceCount = tupleCount + input.readInt();
517                                       
518                 buffer.processSource(last.source);
519             }
520 
521             public void run() throws IOException {
522                 while (true) {
523                     fill();
524                     
525                     if (buffer.isAtEnd())
526                         break;
527                     
528                     buffer.copyUntil(null, processor);
529                 }      
530                 processor.close();
531             }
532             
533             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
534                 if (processor instanceof ShreddedProcessor) {
535                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
536                 } else if (processor instanceof NumberedLink.Processor) {
537                     this.processor = new DuplicateEliminator(new TupleUnshredder((NumberedLink.Processor) processor));
538                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
539                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<NumberedLink>) processor));
540                 } else {
541                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
542                 }
543             }                                
544             
545             public Class<NumberedLink> getOutputClass() {
546                 return NumberedLink.class;
547             }                
548         }
549         
550         public static class DuplicateEliminator implements ShreddedProcessor {
551             public ShreddedProcessor processor;
552             NumberedLink last = new NumberedLink();
553             boolean sourceProcess = true;
554                                            
555             public DuplicateEliminator() {}
556             public DuplicateEliminator(ShreddedProcessor processor) {
557                 this.processor = processor;
558             }
559             
560             public void setShreddedProcessor(ShreddedProcessor processor) {
561                 this.processor = processor;
562             }
563 
564             public void processSource(long source) throws IOException {  
565                 if (sourceProcess || Utility.compare(source, last.source) != 0) {
566                     last.source = source;
567                     processor.processSource(source);
568                     sourceProcess = false;
569                 }
570             }  
571             
572             public void resetSource() {
573                  sourceProcess = true;
574             }                                                
575                                
576             public void processTuple(long destination) throws IOException {
577                 processor.processTuple(destination);
578             } 
579             
580             public void close() throws IOException {
581                 processor.close();
582             }                    
583         }
584         public static class TupleUnshredder implements ShreddedProcessor {
585             NumberedLink last = new NumberedLink();
586             public org.galagosearch.tupleflow.Processor<NumberedLink> processor;                               
587             
588             public TupleUnshredder(NumberedLink.Processor processor) {
589                 this.processor = processor;
590             }         
591             
592             public TupleUnshredder(org.galagosearch.tupleflow.Processor<NumberedLink> processor) {
593                 this.processor = processor;
594             }
595             
596             public NumberedLink clone(NumberedLink object) {
597                 NumberedLink result = new NumberedLink();
598                 if (object == null) return result;
599                 result.source = object.source; 
600                 result.destination = object.destination; 
601                 return result;
602             }                 
603             
604             public void processSource(long source) throws IOException {
605                 last.source = source;
606             }   
607                 
608             
609             public void processTuple(long destination) throws IOException {
610                 last.destination = destination;
611                 processor.process(clone(last));
612             }               
613             
614             public void close() throws IOException {
615                 processor.close();
616             }
617         }     
618         public static class TupleShredder implements Processor {
619             NumberedLink last = new NumberedLink();
620             public ShreddedProcessor processor;
621             
622             public TupleShredder(ShreddedProcessor processor) {
623                 this.processor = processor;
624             }                              
625             
626             public NumberedLink clone(NumberedLink object) {
627                 NumberedLink result = new NumberedLink();
628                 if (object == null) return result;
629                 result.source = object.source; 
630                 result.destination = object.destination; 
631                 return result;
632             }                 
633             
634             public void process(NumberedLink object) throws IOException {                                                                                                                                                   
635                 boolean processAll = false;
636                 if(last == null || Utility.compare(last.source, object.source) != 0 || processAll) { processor.processSource(object.source); processAll = true; }
637                 processor.processTuple(object.destination);                                         
638             }
639                           
640             public Class<NumberedLink> getInputClass() {
641                 return NumberedLink.class;
642             }
643             
644             public void close() throws IOException {
645                 processor.close();
646             }                     
647         }
648     } 
649     public static class DestinationOrder implements Order<NumberedLink> {
650         public int hash(NumberedLink object) {
651             int h = 0;
652             h += Utility.hash(object.destination);
653             return h;
654         } 
655         public Comparator<NumberedLink> greaterThan() {
656             return new Comparator<NumberedLink>() {
657                 public int compare(NumberedLink one, NumberedLink two) {
658                     int result = 0;
659                     do {
660                         result = + Utility.compare(one.destination, two.destination);
661                         if(result != 0) break;
662                     } while (false);
663                     return -result;
664                 }
665             };
666         }     
667         public Comparator<NumberedLink> lessThan() {
668             return new Comparator<NumberedLink>() {
669                 public int compare(NumberedLink one, NumberedLink two) {
670                     int result = 0;
671                     do {
672                         result = + Utility.compare(one.destination, two.destination);
673                         if(result != 0) break;
674                     } while (false);
675                     return result;
676                 }
677             };
678         }     
679         public TypeReader<NumberedLink> orderedReader(ArrayInput _input) {
680             return new ShreddedReader(_input);
681         }    
682 
683         public TypeReader<NumberedLink> orderedReader(ArrayInput _input, int bufferSize) {
684             return new ShreddedReader(_input, bufferSize);
685         }    
686         public OrderedWriter<NumberedLink> orderedWriter(ArrayOutput _output) {
687             ShreddedWriter w = new ShreddedWriter(_output);
688             return new OrderedWriterClass(w); 
689         }                                    
690         public static class OrderedWriterClass extends OrderedWriter< NumberedLink > {
691             NumberedLink last = null;
692             ShreddedWriter shreddedWriter = null; 
693             
694             public OrderedWriterClass(ShreddedWriter s) {
695                 this.shreddedWriter = s;
696             }
697             
698             public void process(NumberedLink object) throws IOException {
699                boolean processAll = false;
700                if (processAll || last == null || 0 != Utility.compare(object.destination, last.destination)) { processAll = true; shreddedWriter.processDestination(object.destination); }
701                shreddedWriter.processTuple(object.source);
702                last = object;
703             }           
704                  
705             public void close() throws IOException {
706                 shreddedWriter.close();
707             }
708             
709             public Class<NumberedLink> getInputClass() {
710                 return NumberedLink.class;
711             }
712         } 
713         public ReaderSource<NumberedLink> orderedCombiner(Collection<TypeReader<NumberedLink>> readers, boolean closeOnExit) {
714             ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
715             
716             for (TypeReader<NumberedLink> reader : readers) {
717                 shreddedReaders.add((ShreddedReader)reader);
718             }
719             
720             return new ShreddedCombiner(shreddedReaders, closeOnExit);
721         }                  
722         public NumberedLink clone(NumberedLink object) {
723             NumberedLink result = new NumberedLink();
724             if (object == null) return result;
725             result.source = object.source; 
726             result.destination = object.destination; 
727             return result;
728         }                 
729         public Class<NumberedLink> getOrderedClass() {
730             return NumberedLink.class;
731         }                           
732         public String[] getOrderSpec() {
733             return new String[] {"+destination"};
734         }
735 
736         public static String getSpecString() {
737             return "+destination";
738         }
739                            
740         public interface ShreddedProcessor extends Step {
741             public void processDestination(long destination) throws IOException;
742             public void processTuple(long source) throws IOException;
743             public void close() throws IOException;
744         }    
        /**
         * Marker interface with no methods of its own; in this file it is
         * implemented by ShreddedCombiner and ShreddedReader.
         */
        public interface ShreddedSource extends Step {
        }
747         
748         public static class ShreddedWriter implements ShreddedProcessor {
749             ArrayOutput output;
750             ShreddedBuffer buffer = new ShreddedBuffer();
751             long lastDestination;
752             boolean lastFlush = false;
753             
754             public ShreddedWriter(ArrayOutput output) {
755                 this.output = output;
756             }                        
757             
758             public void close() throws IOException {
759                 flush();
760             }
761             
762             public void processDestination(long destination) {
763                 lastDestination = destination;
764                 buffer.processDestination(destination);
765             }
766             public final void processTuple(long source) throws IOException {
767                 if (lastFlush) {
768                     if(buffer.destinations.size() == 0) buffer.processDestination(lastDestination);
769                     lastFlush = false;
770                 }
771                 buffer.processTuple(source);
772                 if (buffer.isFull())
773                     flush();
774             }
775             public final void flushTuples(int pauseIndex) throws IOException {
776                 
777                 while (buffer.getReadIndex() < pauseIndex) {
778                            
779                     output.writeLong(buffer.getSource());
780                     buffer.incrementTuple();
781                 }
782             }  
783             public final void flushDestination(int pauseIndex) throws IOException {
784                 while (buffer.getReadIndex() < pauseIndex) {
785                     int nextPause = buffer.getDestinationEndIndex();
786                     int count = nextPause - buffer.getReadIndex();
787                     
788                     output.writeLong(buffer.getDestination());
789                     output.writeInt(count);
790                     buffer.incrementDestination();
791                       
792                     flushTuples(nextPause);
793                     assert nextPause == buffer.getReadIndex();
794                 }
795             }
796             public void flush() throws IOException { 
797                 flushDestination(buffer.getWriteIndex());
798                 buffer.reset(); 
799                 lastFlush = true;
800             }                           
801         }
802         public static class ShreddedBuffer {
803             ArrayList<Long> destinations = new ArrayList();
804             ArrayList<Integer> destinationTupleIdx = new ArrayList();
805             int destinationReadIdx = 0;
806                             
807             long[] sources;
808             int writeTupleIndex = 0;
809             int readTupleIndex = 0;
810             int batchSize;
811 
812             public ShreddedBuffer(int batchSize) {
813                 this.batchSize = batchSize;
814 
815                 sources = new long[batchSize];
816             }                              
817 
818             public ShreddedBuffer() {    
819                 this(10000);
820             }                                                                                                                    
821             
822             public void processDestination(long destination) {
823                 destinations.add(destination);
824                 destinationTupleIdx.add(writeTupleIndex);
825             }                                      
826             public void processTuple(long source) {
827                 assert destinations.size() > 0;
828                 sources[writeTupleIndex] = source;
829                 writeTupleIndex++;
830             }
831             public void resetData() {
832                 destinations.clear();
833                 destinationTupleIdx.clear();
834                 writeTupleIndex = 0;
835             }                  
836                                  
837             public void resetRead() {
838                 readTupleIndex = 0;
839                 destinationReadIdx = 0;
840             } 
841 
842             public void reset() {
843                 resetData();
844                 resetRead();
845             } 
846             public boolean isFull() {
847                 return writeTupleIndex >= batchSize;
848             }
849 
850             public boolean isEmpty() {
851                 return writeTupleIndex == 0;
852             }                          
853 
854             public boolean isAtEnd() {
855                 return readTupleIndex >= writeTupleIndex;
856             }           
857             public void incrementDestination() {
858                 destinationReadIdx++;  
859             }                                                                                              
860 
861             public void autoIncrementDestination() {
862                 while (readTupleIndex >= getDestinationEndIndex() && readTupleIndex < writeTupleIndex)
863                     destinationReadIdx++;
864             }                 
865             public void incrementTuple() {
866                 readTupleIndex++;
867             }                    
868             public int getDestinationEndIndex() {
869                 if ((destinationReadIdx+1) >= destinationTupleIdx.size())
870                     return writeTupleIndex;
871                 return destinationTupleIdx.get(destinationReadIdx+1);
872             }
873             public int getReadIndex() {
874                 return readTupleIndex;
875             }   
876 
877             public int getWriteIndex() {
878                 return writeTupleIndex;
879             } 
880             public long getDestination() {
881                 assert readTupleIndex < writeTupleIndex;
882                 assert destinationReadIdx < destinations.size();
883                 
884                 return destinations.get(destinationReadIdx);
885             }
886             public long getSource() {
887                 assert readTupleIndex < writeTupleIndex;
888                 return sources[readTupleIndex];
889             }                                         
890             public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
891                 while (getReadIndex() < endIndex) {
892                    output.processTuple(getSource());
893                    incrementTuple();
894                 }
895             }                                                                           
896             public void copyUntilIndexDestination(int endIndex, ShreddedProcessor output) throws IOException {
897                 while (getReadIndex() < endIndex) {
898                     output.processDestination(getDestination());
899                     assert getDestinationEndIndex() <= endIndex;
900                     copyTuples(getDestinationEndIndex(), output);
901                     incrementDestination();
902                 }
903             }  
904             public void copyUntilDestination(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
905                 while (!isAtEnd()) {
906                     if (other != null) {   
907                         assert !other.isAtEnd();
908                         int c = + Utility.compare(getDestination(), other.getDestination());
909                     
910                         if (c > 0) {
911                             break;   
912                         }
913                         
914                         output.processDestination(getDestination());
915                                       
916                         copyTuples(getDestinationEndIndex(), output);
917                     } else {
918                         output.processDestination(getDestination());
919                         copyTuples(getDestinationEndIndex(), output);
920                     }
921                     incrementDestination();  
922                     
923                
924                 }
925             }
926             public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
927                 copyUntilDestination(other, output);
928             }
929             
930         }                         
931         public static class ShreddedCombiner implements ReaderSource<NumberedLink>, ShreddedSource {   
932             public ShreddedProcessor processor;
933             Collection<ShreddedReader> readers;       
934             boolean closeOnExit = false;
935             boolean uninitialized = true;
936             PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
937             
938             public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
939                 this.readers = readers;                                                       
940                 this.closeOnExit = closeOnExit;
941             }
942                                   
943             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
944                 if (processor instanceof ShreddedProcessor) {
945                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
946                 } else if (processor instanceof NumberedLink.Processor) {
947                     this.processor = new DuplicateEliminator(new TupleUnshredder((NumberedLink.Processor) processor));
948                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
949                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<NumberedLink>) processor));
950                 } else {
951                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
952                 }
953             }                                
954             
955             public Class<NumberedLink> getOutputClass() {
956                 return NumberedLink.class;
957             }
958             
959             public void initialize() throws IOException {
960                 for (ShreddedReader reader : readers) {
961                     reader.fill();                                        
962                     
963                     if (!reader.getBuffer().isAtEnd())
964                         queue.add(reader);
965                 }   
966 
967                 uninitialized = false;
968             }
969 
970             public void run() throws IOException {
971                 initialize();
972                
973                 while (queue.size() > 0) {
974                     ShreddedReader top = queue.poll();
975                     ShreddedReader next = null;
976                     ShreddedBuffer nextBuffer = null; 
977                     
978                     assert !top.getBuffer().isAtEnd();
979                                                   
980                     if (queue.size() > 0) {
981                         next = queue.peek();
982                         nextBuffer = next.getBuffer();
983                         assert !nextBuffer.isAtEnd();
984                     }
985                     
986                     top.getBuffer().copyUntil(nextBuffer, processor);
987                     if (top.getBuffer().isAtEnd())
988                         top.fill();                 
989                         
990                     if (!top.getBuffer().isAtEnd())
991                         queue.add(top);
992                 }              
993                 
994                 if (closeOnExit)
995                     processor.close();
996             }
997 
998             public NumberedLink read() throws IOException {
999                 if (uninitialized)
1000                     initialize();
1001 
1002                 NumberedLink result = null;
1003 
1004                 while (queue.size() > 0) {
1005                     ShreddedReader top = queue.poll();
1006                     result = top.read();
1007 
1008                     if (result != null) {
1009                         if (top.getBuffer().isAtEnd())
1010                             top.fill();
1011 
1012                         queue.offer(top);
1013                         break;
1014                     } 
1015                 }
1016 
1017                 return result;
1018             }
1019         } 
1020         public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<NumberedLink>, ShreddedSource {      
1021             public ShreddedProcessor processor;
1022             ShreddedBuffer buffer;
1023             NumberedLink last = new NumberedLink();         
1024             long updateDestinationCount = -1;
1025             long tupleCount = 0;
1026             long bufferStartCount = 0;  
1027             ArrayInput input;
1028             
1029             public ShreddedReader(ArrayInput input) {
1030                 this.input = input; 
1031                 this.buffer = new ShreddedBuffer();
1032             }                               
1033             
1034             public ShreddedReader(ArrayInput input, int bufferSize) { 
1035                 this.input = input;
1036                 this.buffer = new ShreddedBuffer(bufferSize);
1037             }
1038                  
1039             public final int compareTo(ShreddedReader other) {
1040                 ShreddedBuffer otherBuffer = other.getBuffer();
1041                 
1042                 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
1043                     return 0;                 
1044                 } else if (buffer.isAtEnd()) {
1045                     return -1;
1046                 } else if (otherBuffer.isAtEnd()) {
1047                     return 1;
1048                 }
1049                                    
1050                 int result = 0;
1051                 do {
1052                     result = + Utility.compare(buffer.getDestination(), otherBuffer.getDestination());
1053                     if(result != 0) break;
1054                 } while (false);                                             
1055                 
1056                 return result;
1057             }
1058             
1059             public final ShreddedBuffer getBuffer() {
1060                 return buffer;
1061             }                
1062             
1063             public final NumberedLink read() throws IOException {
1064                 if (buffer.isAtEnd()) {
1065                     fill();             
1066                 
1067                     if (buffer.isAtEnd()) {
1068                         return null;
1069                     }
1070                 }
1071                       
1072                 assert !buffer.isAtEnd();
1073                 NumberedLink result = new NumberedLink();
1074                 
1075                 result.destination = buffer.getDestination();
1076                 result.source = buffer.getSource();
1077                 
1078                 buffer.incrementTuple();
1079                 buffer.autoIncrementDestination();
1080                 
1081                 return result;
1082             }           
1083             
1084             public final void fill() throws IOException {
1085                 try {   
1086                     buffer.reset();
1087                     
1088                     if (tupleCount != 0) {
1089                                                       
1090                         if(updateDestinationCount - tupleCount > 0) {
1091                             buffer.destinations.add(last.destination);
1092                             buffer.destinationTupleIdx.add((int) (updateDestinationCount - tupleCount));
1093                         }
1094                         bufferStartCount = tupleCount;
1095                     }
1096                     
1097                     while (!buffer.isFull()) {
1098                         updateDestination();
1099                         buffer.processTuple(input.readLong());
1100                         tupleCount++;
1101                     }
1102                 } catch(EOFException e) {}
1103             }
1104 
1105             public final void updateDestination() throws IOException {
1106                 if (updateDestinationCount > tupleCount)
1107                     return;
1108                      
1109                 last.destination = input.readLong();
1110                 updateDestinationCount = tupleCount + input.readInt();
1111                                       
1112                 buffer.processDestination(last.destination);
1113             }
1114 
1115             public void run() throws IOException {
1116                 while (true) {
1117                     fill();
1118                     
1119                     if (buffer.isAtEnd())
1120                         break;
1121                     
1122                     buffer.copyUntil(null, processor);
1123                 }      
1124                 processor.close();
1125             }
1126             
1127             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
1128                 if (processor instanceof ShreddedProcessor) {
1129                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
1130                 } else if (processor instanceof NumberedLink.Processor) {
1131                     this.processor = new DuplicateEliminator(new TupleUnshredder((NumberedLink.Processor) processor));
1132                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
1133                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<NumberedLink>) processor));
1134                 } else {
1135                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
1136                 }
1137             }                                
1138             
1139             public Class<NumberedLink> getOutputClass() {
1140                 return NumberedLink.class;
1141             }                
1142         }
1143         
1144         public static class DuplicateEliminator implements ShreddedProcessor {
1145             public ShreddedProcessor processor;
1146             NumberedLink last = new NumberedLink();
1147             boolean destinationProcess = true;
1148                                            
1149             public DuplicateEliminator() {}
1150             public DuplicateEliminator(ShreddedProcessor processor) {
1151                 this.processor = processor;
1152             }
1153             
1154             public void setShreddedProcessor(ShreddedProcessor processor) {
1155                 this.processor = processor;
1156             }
1157 
1158             public void processDestination(long destination) throws IOException {  
1159                 if (destinationProcess || Utility.compare(destination, last.destination) != 0) {
1160                     last.destination = destination;
1161                     processor.processDestination(destination);
1162                     destinationProcess = false;
1163                 }
1164             }  
1165             
1166             public void resetDestination() {
1167                  destinationProcess = true;
1168             }                                                
1169                                
1170             public void processTuple(long source) throws IOException {
1171                 processor.processTuple(source);
1172             } 
1173             
1174             public void close() throws IOException {
1175                 processor.close();
1176             }                    
1177         }
1178         public static class TupleUnshredder implements ShreddedProcessor {
1179             NumberedLink last = new NumberedLink();
1180             public org.galagosearch.tupleflow.Processor<NumberedLink> processor;                               
1181             
1182             public TupleUnshredder(NumberedLink.Processor processor) {
1183                 this.processor = processor;
1184             }         
1185             
1186             public TupleUnshredder(org.galagosearch.tupleflow.Processor<NumberedLink> processor) {
1187                 this.processor = processor;
1188             }
1189             
1190             public NumberedLink clone(NumberedLink object) {
1191                 NumberedLink result = new NumberedLink();
1192                 if (object == null) return result;
1193                 result.source = object.source; 
1194                 result.destination = object.destination; 
1195                 return result;
1196             }                 
1197             
1198             public void processDestination(long destination) throws IOException {
1199                 last.destination = destination;
1200             }   
1201                 
1202             
1203             public void processTuple(long source) throws IOException {
1204                 last.source = source;
1205                 processor.process(clone(last));
1206             }               
1207             
1208             public void close() throws IOException {
1209                 processor.close();
1210             }
1211         }     
1212         public static class TupleShredder implements Processor {
1213             NumberedLink last = new NumberedLink();
1214             public ShreddedProcessor processor;
1215             
1216             public TupleShredder(ShreddedProcessor processor) {
1217                 this.processor = processor;
1218             }                              
1219             
1220             public NumberedLink clone(NumberedLink object) {
1221                 NumberedLink result = new NumberedLink();
1222                 if (object == null) return result;
1223                 result.source = object.source; 
1224                 result.destination = object.destination; 
1225                 return result;
1226             }                 
1227             
1228             public void process(NumberedLink object) throws IOException {                                                                                                                                                   
1229                 boolean processAll = false;
1230                 if(last == null || Utility.compare(last.destination, object.destination) != 0 || processAll) { processor.processDestination(object.destination); processAll = true; }
1231                 processor.processTuple(object.source);                                         
1232             }
1233                           
1234             public Class<NumberedLink> getInputClass() {
1235                 return NumberedLink.class;
1236             }
1237             
1238             public void close() throws IOException {
1239                 processor.close();
1240             }                     
1241         }
1242     } 
1243 }