View Javadoc

1   // This file was automatically generated with the command: 
2   //     java org.galagosearch.tupleflow.typebuilder.TypeBuilderMojo ...
3   package org.galagosearch.core.types;
4   
5   import org.galagosearch.tupleflow.Utility;
6   import org.galagosearch.tupleflow.ArrayInput;
7   import org.galagosearch.tupleflow.ArrayOutput;
8   import org.galagosearch.tupleflow.Order;   
9   import org.galagosearch.tupleflow.OrderedWriter;
10  import org.galagosearch.tupleflow.Type; 
11  import org.galagosearch.tupleflow.TypeReader;
12  import org.galagosearch.tupleflow.Step; 
13  import org.galagosearch.tupleflow.IncompatibleProcessorException;
14  import org.galagosearch.tupleflow.ReaderSource;
15  import java.io.IOException;             
16  import java.io.EOFException;
17  import java.io.UnsupportedEncodingException;
18  import java.util.ArrayList;
19  import java.util.Arrays;   
20  import java.util.Comparator;
21  import java.util.PriorityQueue;
22  import java.util.Collection;
23  
24  public class NumberedExtent implements Type<NumberedExtent> {
25      public byte[] extentName;
26      public long number;
27      public int begin;
28      public int end; 
29      
30      public NumberedExtent() {}
31      public NumberedExtent(byte[] extentName, long number, int begin, int end) {
32          this.extentName = extentName;
33          this.number = number;
34          this.begin = begin;
35          this.end = end;
36      }  
37      
38      public String toString() {
39          try {
40              return String.format("%s,%d,%d,%d",
41                                     new String(extentName, "UTF-8"), number, begin, end);
42          } catch(UnsupportedEncodingException e) {
43              throw new RuntimeException("Couldn't convert string to UTF-8.");
44          }
45      } 
46  
47      public Order<NumberedExtent> getOrder(String... spec) {
48          if (Arrays.equals(spec, new String[] { "+extentName", "+number", "+begin" })) {
49              return new ExtentNameNumberBeginOrder();
50          }
51          return null;
52      } 
53        
54      public interface Processor extends Step, org.galagosearch.tupleflow.Processor<NumberedExtent> {
55          public void process(NumberedExtent object) throws IOException;
56          public void close() throws IOException;
57      }                        
58      public interface Source extends Step {
59      }
60      public static class ExtentNameNumberBeginOrder implements Order<NumberedExtent> {
61          public int hash(NumberedExtent object) {
62              int h = 0;
63              h += Utility.hash(object.extentName);
64              h += Utility.hash(object.number);
65              h += Utility.hash(object.begin);
66              return h;
67          } 
68          public Comparator<NumberedExtent> greaterThan() {
69              return new Comparator<NumberedExtent>() {
70                  public int compare(NumberedExtent one, NumberedExtent two) {
71                      int result = 0;
72                      do {
73                          result = + Utility.compare(one.extentName, two.extentName);
74                          if(result != 0) break;
75                          result = + Utility.compare(one.number, two.number);
76                          if(result != 0) break;
77                          result = + Utility.compare(one.begin, two.begin);
78                          if(result != 0) break;
79                      } while (false);
80                      return -result;
81                  }
82              };
83          }     
84          public Comparator<NumberedExtent> lessThan() {
85              return new Comparator<NumberedExtent>() {
86                  public int compare(NumberedExtent one, NumberedExtent two) {
87                      int result = 0;
88                      do {
89                          result = + Utility.compare(one.extentName, two.extentName);
90                          if(result != 0) break;
91                          result = + Utility.compare(one.number, two.number);
92                          if(result != 0) break;
93                          result = + Utility.compare(one.begin, two.begin);
94                          if(result != 0) break;
95                      } while (false);
96                      return result;
97                  }
98              };
99          }     
100         public TypeReader<NumberedExtent> orderedReader(ArrayInput _input) {
101             return new ShreddedReader(_input);
102         }    
103 
104         public TypeReader<NumberedExtent> orderedReader(ArrayInput _input, int bufferSize) {
105             return new ShreddedReader(_input, bufferSize);
106         }    
107         public OrderedWriter<NumberedExtent> orderedWriter(ArrayOutput _output) {
108             ShreddedWriter w = new ShreddedWriter(_output);
109             return new OrderedWriterClass(w); 
110         }                                    
111         public static class OrderedWriterClass extends OrderedWriter< NumberedExtent > {
112             NumberedExtent last = null;
113             ShreddedWriter shreddedWriter = null; 
114             
115             public OrderedWriterClass(ShreddedWriter s) {
116                 this.shreddedWriter = s;
117             }
118             
119             public void process(NumberedExtent object) throws IOException {
120                boolean processAll = false;
121                if (processAll || last == null || 0 != Utility.compare(object.extentName, last.extentName)) { processAll = true; shreddedWriter.processExtentName(object.extentName); }
122                if (processAll || last == null || 0 != Utility.compare(object.number, last.number)) { processAll = true; shreddedWriter.processNumber(object.number); }
123                if (processAll || last == null || 0 != Utility.compare(object.begin, last.begin)) { processAll = true; shreddedWriter.processBegin(object.begin); }
124                shreddedWriter.processTuple(object.end);
125                last = object;
126             }           
127                  
128             public void close() throws IOException {
129                 shreddedWriter.close();
130             }
131             
132             public Class<NumberedExtent> getInputClass() {
133                 return NumberedExtent.class;
134             }
135         } 
136         public ReaderSource<NumberedExtent> orderedCombiner(Collection<TypeReader<NumberedExtent>> readers, boolean closeOnExit) {
137             ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
138             
139             for (TypeReader<NumberedExtent> reader : readers) {
140                 shreddedReaders.add((ShreddedReader)reader);
141             }
142             
143             return new ShreddedCombiner(shreddedReaders, closeOnExit);
144         }                  
145         public NumberedExtent clone(NumberedExtent object) {
146             NumberedExtent result = new NumberedExtent();
147             if (object == null) return result;
148             result.extentName = object.extentName; 
149             result.number = object.number; 
150             result.begin = object.begin; 
151             result.end = object.end; 
152             return result;
153         }                 
154         public Class<NumberedExtent> getOrderedClass() {
155             return NumberedExtent.class;
156         }                           
157         public String[] getOrderSpec() {
158             return new String[] {"+extentName", "+number", "+begin"};
159         }
160 
161         public static String getSpecString() {
162             return "+extentName +number +begin";
163         }
164                            
165         public interface ShreddedProcessor extends Step {
166             public void processExtentName(byte[] extentName) throws IOException;
167             public void processNumber(long number) throws IOException;
168             public void processBegin(int begin) throws IOException;
169             public void processTuple(int end) throws IOException;
170             public void close() throws IOException;
171         }    
172         public interface ShreddedSource extends Step {
173         }                                              
174         
175         public static class ShreddedWriter implements ShreddedProcessor {
176             ArrayOutput output;
177             ShreddedBuffer buffer = new ShreddedBuffer();
178             byte[] lastExtentName;
179             long lastNumber;
180             int lastBegin;
181             boolean lastFlush = false;
182             
183             public ShreddedWriter(ArrayOutput output) {
184                 this.output = output;
185             }                        
186             
187             public void close() throws IOException {
188                 flush();
189             }
190             
191             public void processExtentName(byte[] extentName) {
192                 lastExtentName = extentName;
193                 buffer.processExtentName(extentName);
194             }
195             public void processNumber(long number) {
196                 lastNumber = number;
197                 buffer.processNumber(number);
198             }
199             public void processBegin(int begin) {
200                 lastBegin = begin;
201                 buffer.processBegin(begin);
202             }
203             public final void processTuple(int end) throws IOException {
204                 if (lastFlush) {
205                     if(buffer.extentNames.size() == 0) buffer.processExtentName(lastExtentName);
206                     if(buffer.numbers.size() == 0) buffer.processNumber(lastNumber);
207                     if(buffer.begins.size() == 0) buffer.processBegin(lastBegin);
208                     lastFlush = false;
209                 }
210                 buffer.processTuple(end);
211                 if (buffer.isFull())
212                     flush();
213             }
214             public final void flushTuples(int pauseIndex) throws IOException {
215                 
216                 while (buffer.getReadIndex() < pauseIndex) {
217                            
218                     output.writeInt(buffer.getEnd());
219                     buffer.incrementTuple();
220                 }
221             }  
222             public final void flushExtentName(int pauseIndex) throws IOException {
223                 while (buffer.getReadIndex() < pauseIndex) {
224                     int nextPause = buffer.getExtentNameEndIndex();
225                     int count = nextPause - buffer.getReadIndex();
226                     
227                     output.writeBytes(buffer.getExtentName());
228                     output.writeInt(count);
229                     buffer.incrementExtentName();
230                       
231                     flushNumber(nextPause);
232                     assert nextPause == buffer.getReadIndex();
233                 }
234             }
235             public final void flushNumber(int pauseIndex) throws IOException {
236                 while (buffer.getReadIndex() < pauseIndex) {
237                     int nextPause = buffer.getNumberEndIndex();
238                     int count = nextPause - buffer.getReadIndex();
239                     
240                     output.writeLong(buffer.getNumber());
241                     output.writeInt(count);
242                     buffer.incrementNumber();
243                       
244                     flushBegin(nextPause);
245                     assert nextPause == buffer.getReadIndex();
246                 }
247             }
248             public final void flushBegin(int pauseIndex) throws IOException {
249                 while (buffer.getReadIndex() < pauseIndex) {
250                     int nextPause = buffer.getBeginEndIndex();
251                     int count = nextPause - buffer.getReadIndex();
252                     
253                     output.writeInt(buffer.getBegin());
254                     output.writeInt(count);
255                     buffer.incrementBegin();
256                       
257                     flushTuples(nextPause);
258                     assert nextPause == buffer.getReadIndex();
259                 }
260             }
261             public void flush() throws IOException { 
262                 flushExtentName(buffer.getWriteIndex());
263                 buffer.reset(); 
264                 lastFlush = true;
265             }                           
266         }
267         public static class ShreddedBuffer {
268             ArrayList<byte[]> extentNames = new ArrayList();
269             ArrayList<Long> numbers = new ArrayList();
270             ArrayList<Integer> begins = new ArrayList();
271             ArrayList<Integer> extentNameTupleIdx = new ArrayList();
272             ArrayList<Integer> numberTupleIdx = new ArrayList();
273             ArrayList<Integer> beginTupleIdx = new ArrayList();
274             int extentNameReadIdx = 0;
275             int numberReadIdx = 0;
276             int beginReadIdx = 0;
277                             
278             int[] ends;
279             int writeTupleIndex = 0;
280             int readTupleIndex = 0;
281             int batchSize;
282 
283             public ShreddedBuffer(int batchSize) {
284                 this.batchSize = batchSize;
285 
286                 ends = new int[batchSize];
287             }                              
288 
289             public ShreddedBuffer() {    
290                 this(10000);
291             }                                                                                                                    
292             
293             public void processExtentName(byte[] extentName) {
294                 extentNames.add(extentName);
295                 extentNameTupleIdx.add(writeTupleIndex);
296             }                                      
297             public void processNumber(long number) {
298                 numbers.add(number);
299                 numberTupleIdx.add(writeTupleIndex);
300             }                                      
301             public void processBegin(int begin) {
302                 begins.add(begin);
303                 beginTupleIdx.add(writeTupleIndex);
304             }                                      
305             public void processTuple(int end) {
306                 assert extentNames.size() > 0;
307                 assert numbers.size() > 0;
308                 assert begins.size() > 0;
309                 ends[writeTupleIndex] = end;
310                 writeTupleIndex++;
311             }
312             public void resetData() {
313                 extentNames.clear();
314                 numbers.clear();
315                 begins.clear();
316                 extentNameTupleIdx.clear();
317                 numberTupleIdx.clear();
318                 beginTupleIdx.clear();
319                 writeTupleIndex = 0;
320             }                  
321                                  
322             public void resetRead() {
323                 readTupleIndex = 0;
324                 extentNameReadIdx = 0;
325                 numberReadIdx = 0;
326                 beginReadIdx = 0;
327             } 
328 
329             public void reset() {
330                 resetData();
331                 resetRead();
332             } 
333             public boolean isFull() {
334                 return writeTupleIndex >= batchSize;
335             }
336 
337             public boolean isEmpty() {
338                 return writeTupleIndex == 0;
339             }                          
340 
341             public boolean isAtEnd() {
342                 return readTupleIndex >= writeTupleIndex;
343             }           
344             public void incrementExtentName() {
345                 extentNameReadIdx++;  
346             }                                                                                              
347 
348             public void autoIncrementExtentName() {
349                 while (readTupleIndex >= getExtentNameEndIndex() && readTupleIndex < writeTupleIndex)
350                     extentNameReadIdx++;
351             }                 
352             public void incrementNumber() {
353                 numberReadIdx++;  
354             }                                                                                              
355 
356             public void autoIncrementNumber() {
357                 while (readTupleIndex >= getNumberEndIndex() && readTupleIndex < writeTupleIndex)
358                     numberReadIdx++;
359             }                 
360             public void incrementBegin() {
361                 beginReadIdx++;  
362             }                                                                                              
363 
364             public void autoIncrementBegin() {
365                 while (readTupleIndex >= getBeginEndIndex() && readTupleIndex < writeTupleIndex)
366                     beginReadIdx++;
367             }                 
368             public void incrementTuple() {
369                 readTupleIndex++;
370             }                    
371             public int getExtentNameEndIndex() {
372                 if ((extentNameReadIdx+1) >= extentNameTupleIdx.size())
373                     return writeTupleIndex;
374                 return extentNameTupleIdx.get(extentNameReadIdx+1);
375             }
376 
377             public int getNumberEndIndex() {
378                 if ((numberReadIdx+1) >= numberTupleIdx.size())
379                     return writeTupleIndex;
380                 return numberTupleIdx.get(numberReadIdx+1);
381             }
382 
383             public int getBeginEndIndex() {
384                 if ((beginReadIdx+1) >= beginTupleIdx.size())
385                     return writeTupleIndex;
386                 return beginTupleIdx.get(beginReadIdx+1);
387             }
388             public int getReadIndex() {
389                 return readTupleIndex;
390             }   
391 
392             public int getWriteIndex() {
393                 return writeTupleIndex;
394             } 
395             public byte[] getExtentName() {
396                 assert readTupleIndex < writeTupleIndex;
397                 assert extentNameReadIdx < extentNames.size();
398                 
399                 return extentNames.get(extentNameReadIdx);
400             }
401             public long getNumber() {
402                 assert readTupleIndex < writeTupleIndex;
403                 assert numberReadIdx < numbers.size();
404                 
405                 return numbers.get(numberReadIdx);
406             }
407             public int getBegin() {
408                 assert readTupleIndex < writeTupleIndex;
409                 assert beginReadIdx < begins.size();
410                 
411                 return begins.get(beginReadIdx);
412             }
413             public int getEnd() {
414                 assert readTupleIndex < writeTupleIndex;
415                 return ends[readTupleIndex];
416             }                                         
417             public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
418                 while (getReadIndex() < endIndex) {
419                    output.processTuple(getEnd());
420                    incrementTuple();
421                 }
422             }                                                                           
423             public void copyUntilIndexExtentName(int endIndex, ShreddedProcessor output) throws IOException {
424                 while (getReadIndex() < endIndex) {
425                     output.processExtentName(getExtentName());
426                     assert getExtentNameEndIndex() <= endIndex;
427                     copyUntilIndexNumber(getExtentNameEndIndex(), output);
428                     incrementExtentName();
429                 }
430             } 
431             public void copyUntilIndexNumber(int endIndex, ShreddedProcessor output) throws IOException {
432                 while (getReadIndex() < endIndex) {
433                     output.processNumber(getNumber());
434                     assert getNumberEndIndex() <= endIndex;
435                     copyUntilIndexBegin(getNumberEndIndex(), output);
436                     incrementNumber();
437                 }
438             } 
439             public void copyUntilIndexBegin(int endIndex, ShreddedProcessor output) throws IOException {
440                 while (getReadIndex() < endIndex) {
441                     output.processBegin(getBegin());
442                     assert getBeginEndIndex() <= endIndex;
443                     copyTuples(getBeginEndIndex(), output);
444                     incrementBegin();
445                 }
446             }  
447             public void copyUntilExtentName(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
448                 while (!isAtEnd()) {
449                     if (other != null) {   
450                         assert !other.isAtEnd();
451                         int c = + Utility.compare(getExtentName(), other.getExtentName());
452                     
453                         if (c > 0) {
454                             break;   
455                         }
456                         
457                         output.processExtentName(getExtentName());
458                                       
459                         if (c < 0) {
460                             copyUntilIndexNumber(getExtentNameEndIndex(), output);
461                         } else if (c == 0) {
462                             copyUntilNumber(other, output);
463                             autoIncrementExtentName();
464                             break;
465                         }
466                     } else {
467                         output.processExtentName(getExtentName());
468                         copyUntilIndexNumber(getExtentNameEndIndex(), output);
469                     }
470                     incrementExtentName();  
471                     
472                
473                 }
474             }
475             public void copyUntilNumber(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
476                 while (!isAtEnd()) {
477                     if (other != null) {   
478                         assert !other.isAtEnd();
479                         int c = + Utility.compare(getNumber(), other.getNumber());
480                     
481                         if (c > 0) {
482                             break;   
483                         }
484                         
485                         output.processNumber(getNumber());
486                                       
487                         if (c < 0) {
488                             copyUntilIndexBegin(getNumberEndIndex(), output);
489                         } else if (c == 0) {
490                             copyUntilBegin(other, output);
491                             autoIncrementNumber();
492                             break;
493                         }
494                     } else {
495                         output.processNumber(getNumber());
496                         copyUntilIndexBegin(getNumberEndIndex(), output);
497                     }
498                     incrementNumber();  
499                     
500                     if (getExtentNameEndIndex() <= readTupleIndex)
501                         break;   
502                 }
503             }
504             public void copyUntilBegin(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
505                 while (!isAtEnd()) {
506                     if (other != null) {   
507                         assert !other.isAtEnd();
508                         int c = + Utility.compare(getBegin(), other.getBegin());
509                     
510                         if (c > 0) {
511                             break;   
512                         }
513                         
514                         output.processBegin(getBegin());
515                                       
516                         copyTuples(getBeginEndIndex(), output);
517                     } else {
518                         output.processBegin(getBegin());
519                         copyTuples(getBeginEndIndex(), output);
520                     }
521                     incrementBegin();  
522                     
523                     if (getNumberEndIndex() <= readTupleIndex)
524                         break;   
525                 }
526             }
527             public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
528                 copyUntilExtentName(other, output);
529             }
530             
531         }                         
532         public static class ShreddedCombiner implements ReaderSource<NumberedExtent>, ShreddedSource {   
533             public ShreddedProcessor processor;
534             Collection<ShreddedReader> readers;       
535             boolean closeOnExit = false;
536             boolean uninitialized = true;
537             PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
538             
539             public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
540                 this.readers = readers;                                                       
541                 this.closeOnExit = closeOnExit;
542             }
543                                   
544             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
545                 if (processor instanceof ShreddedProcessor) {
546                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
547                 } else if (processor instanceof NumberedExtent.Processor) {
548                     this.processor = new DuplicateEliminator(new TupleUnshredder((NumberedExtent.Processor) processor));
549                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
550                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<NumberedExtent>) processor));
551                 } else {
552                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
553                 }
554             }                                
555             
556             public Class<NumberedExtent> getOutputClass() {
557                 return NumberedExtent.class;
558             }
559             
560             public void initialize() throws IOException {
561                 for (ShreddedReader reader : readers) {
562                     reader.fill();                                        
563                     
564                     if (!reader.getBuffer().isAtEnd())
565                         queue.add(reader);
566                 }   
567 
568                 uninitialized = false;
569             }
570 
571             public void run() throws IOException {
572                 initialize();
573                
574                 while (queue.size() > 0) {
575                     ShreddedReader top = queue.poll();
576                     ShreddedReader next = null;
577                     ShreddedBuffer nextBuffer = null; 
578                     
579                     assert !top.getBuffer().isAtEnd();
580                                                   
581                     if (queue.size() > 0) {
582                         next = queue.peek();
583                         nextBuffer = next.getBuffer();
584                         assert !nextBuffer.isAtEnd();
585                     }
586                     
587                     top.getBuffer().copyUntil(nextBuffer, processor);
588                     if (top.getBuffer().isAtEnd())
589                         top.fill();                 
590                         
591                     if (!top.getBuffer().isAtEnd())
592                         queue.add(top);
593                 }              
594                 
595                 if (closeOnExit)
596                     processor.close();
597             }
598 
599             public NumberedExtent read() throws IOException {
600                 if (uninitialized)
601                     initialize();
602 
603                 NumberedExtent result = null;
604 
605                 while (queue.size() > 0) {
606                     ShreddedReader top = queue.poll();
607                     result = top.read();
608 
609                     if (result != null) {
610                         if (top.getBuffer().isAtEnd())
611                             top.fill();
612 
613                         queue.offer(top);
614                         break;
615                     } 
616                 }
617 
618                 return result;
619             }
620         } 
621         public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<NumberedExtent>, ShreddedSource {      
622             public ShreddedProcessor processor;
623             ShreddedBuffer buffer;
624             NumberedExtent last = new NumberedExtent();         
625             long updateExtentNameCount = -1;
626             long updateNumberCount = -1;
627             long updateBeginCount = -1;
628             long tupleCount = 0;
629             long bufferStartCount = 0;  
630             ArrayInput input;
631             
632             public ShreddedReader(ArrayInput input) {
633                 this.input = input; 
634                 this.buffer = new ShreddedBuffer();
635             }                               
636             
637             public ShreddedReader(ArrayInput input, int bufferSize) { 
638                 this.input = input;
639                 this.buffer = new ShreddedBuffer(bufferSize);
640             }
641                  
642             public final int compareTo(ShreddedReader other) {
643                 ShreddedBuffer otherBuffer = other.getBuffer();
644                 
645                 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
646                     return 0;                 
647                 } else if (buffer.isAtEnd()) {
648                     return -1;
649                 } else if (otherBuffer.isAtEnd()) {
650                     return 1;
651                 }
652                                    
653                 int result = 0;
654                 do {
655                     result = + Utility.compare(buffer.getExtentName(), otherBuffer.getExtentName());
656                     if(result != 0) break;
657                     result = + Utility.compare(buffer.getNumber(), otherBuffer.getNumber());
658                     if(result != 0) break;
659                     result = + Utility.compare(buffer.getBegin(), otherBuffer.getBegin());
660                     if(result != 0) break;
661                 } while (false);                                             
662                 
663                 return result;
664             }
665             
666             public final ShreddedBuffer getBuffer() {
667                 return buffer;
668             }                
669             
670             public final NumberedExtent read() throws IOException {
671                 if (buffer.isAtEnd()) {
672                     fill();             
673                 
674                     if (buffer.isAtEnd()) {
675                         return null;
676                     }
677                 }
678                       
679                 assert !buffer.isAtEnd();
680                 NumberedExtent result = new NumberedExtent();
681                 
682                 result.extentName = buffer.getExtentName();
683                 result.number = buffer.getNumber();
684                 result.begin = buffer.getBegin();
685                 result.end = buffer.getEnd();
686                 
687                 buffer.incrementTuple();
688                 buffer.autoIncrementExtentName();
689                 buffer.autoIncrementNumber();
690                 buffer.autoIncrementBegin();
691                 
692                 return result;
693             }           
694             
695             public final void fill() throws IOException {
696                 try {   
697                     buffer.reset();
698                     
699                     if (tupleCount != 0) {
700                                                       
701                         if(updateExtentNameCount - tupleCount > 0) {
702                             buffer.extentNames.add(last.extentName);
703                             buffer.extentNameTupleIdx.add((int) (updateExtentNameCount - tupleCount));
704                         }                              
705                         if(updateNumberCount - tupleCount > 0) {
706                             buffer.numbers.add(last.number);
707                             buffer.numberTupleIdx.add((int) (updateNumberCount - tupleCount));
708                         }                              
709                         if(updateBeginCount - tupleCount > 0) {
710                             buffer.begins.add(last.begin);
711                             buffer.beginTupleIdx.add((int) (updateBeginCount - tupleCount));
712                         }
713                         bufferStartCount = tupleCount;
714                     }
715                     
716                     while (!buffer.isFull()) {
717                         updateBegin();
718                         buffer.processTuple(input.readInt());
719                         tupleCount++;
720                     }
721                 } catch(EOFException e) {}
722             }
723 
724             public final void updateExtentName() throws IOException {
725                 if (updateExtentNameCount > tupleCount)
726                     return;
727                      
728                 last.extentName = input.readBytes();
729                 updateExtentNameCount = tupleCount + input.readInt();
730                                       
731                 buffer.processExtentName(last.extentName);
732             }
733             public final void updateNumber() throws IOException {
734                 if (updateNumberCount > tupleCount)
735                     return;
736                      
737                 updateExtentName();
738                 last.number = input.readLong();
739                 updateNumberCount = tupleCount + input.readInt();
740                                       
741                 buffer.processNumber(last.number);
742             }
743             public final void updateBegin() throws IOException {
744                 if (updateBeginCount > tupleCount)
745                     return;
746                      
747                 updateNumber();
748                 last.begin = input.readInt();
749                 updateBeginCount = tupleCount + input.readInt();
750                                       
751                 buffer.processBegin(last.begin);
752             }
753 
754             public void run() throws IOException {
755                 while (true) {
756                     fill();
757                     
758                     if (buffer.isAtEnd())
759                         break;
760                     
761                     buffer.copyUntil(null, processor);
762                 }      
763                 processor.close();
764             }
765             
766             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
767                 if (processor instanceof ShreddedProcessor) {
768                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
769                 } else if (processor instanceof NumberedExtent.Processor) {
770                     this.processor = new DuplicateEliminator(new TupleUnshredder((NumberedExtent.Processor) processor));
771                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
772                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<NumberedExtent>) processor));
773                 } else {
774                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
775                 }
776             }                                
777             
778             public Class<NumberedExtent> getOutputClass() {
779                 return NumberedExtent.class;
780             }                
781         }
782         
783         public static class DuplicateEliminator implements ShreddedProcessor {
784             public ShreddedProcessor processor;
785             NumberedExtent last = new NumberedExtent();
786             boolean extentNameProcess = true;
787             boolean numberProcess = true;
788             boolean beginProcess = true;
789                                            
790             public DuplicateEliminator() {}
791             public DuplicateEliminator(ShreddedProcessor processor) {
792                 this.processor = processor;
793             }
794             
795             public void setShreddedProcessor(ShreddedProcessor processor) {
796                 this.processor = processor;
797             }
798 
799             public void processExtentName(byte[] extentName) throws IOException {  
800                 if (extentNameProcess || Utility.compare(extentName, last.extentName) != 0) {
801                     last.extentName = extentName;
802                     processor.processExtentName(extentName);
803             resetNumber();
804                     extentNameProcess = false;
805                 }
806             }
807             public void processNumber(long number) throws IOException {  
808                 if (numberProcess || Utility.compare(number, last.number) != 0) {
809                     last.number = number;
810                     processor.processNumber(number);
811             resetBegin();
812                     numberProcess = false;
813                 }
814             }
815             public void processBegin(int begin) throws IOException {  
816                 if (beginProcess || Utility.compare(begin, last.begin) != 0) {
817                     last.begin = begin;
818                     processor.processBegin(begin);
819                     beginProcess = false;
820                 }
821             }  
822             
823             public void resetExtentName() {
824                  extentNameProcess = true;
825             resetNumber();
826             }                                                
827             public void resetNumber() {
828                  numberProcess = true;
829             resetBegin();
830             }                                                
831             public void resetBegin() {
832                  beginProcess = true;
833             }                                                
834                                
835             public void processTuple(int end) throws IOException {
836                 processor.processTuple(end);
837             } 
838             
839             public void close() throws IOException {
840                 processor.close();
841             }                    
842         }
843         public static class TupleUnshredder implements ShreddedProcessor {
844             NumberedExtent last = new NumberedExtent();
845             public org.galagosearch.tupleflow.Processor<NumberedExtent> processor;                               
846             
847             public TupleUnshredder(NumberedExtent.Processor processor) {
848                 this.processor = processor;
849             }         
850             
851             public TupleUnshredder(org.galagosearch.tupleflow.Processor<NumberedExtent> processor) {
852                 this.processor = processor;
853             }
854             
855             public NumberedExtent clone(NumberedExtent object) {
856                 NumberedExtent result = new NumberedExtent();
857                 if (object == null) return result;
858                 result.extentName = object.extentName; 
859                 result.number = object.number; 
860                 result.begin = object.begin; 
861                 result.end = object.end; 
862                 return result;
863             }                 
864             
865             public void processExtentName(byte[] extentName) throws IOException {
866                 last.extentName = extentName;
867             }   
868                 
869             public void processNumber(long number) throws IOException {
870                 last.number = number;
871             }   
872                 
873             public void processBegin(int begin) throws IOException {
874                 last.begin = begin;
875             }   
876                 
877             
878             public void processTuple(int end) throws IOException {
879                 last.end = end;
880                 processor.process(clone(last));
881             }               
882             
883             public void close() throws IOException {
884                 processor.close();
885             }
886         }     
887         public static class TupleShredder implements Processor {
888             NumberedExtent last = new NumberedExtent();
889             public ShreddedProcessor processor;
890             
891             public TupleShredder(ShreddedProcessor processor) {
892                 this.processor = processor;
893             }                              
894             
895             public NumberedExtent clone(NumberedExtent object) {
896                 NumberedExtent result = new NumberedExtent();
897                 if (object == null) return result;
898                 result.extentName = object.extentName; 
899                 result.number = object.number; 
900                 result.begin = object.begin; 
901                 result.end = object.end; 
902                 return result;
903             }                 
904             
905             public void process(NumberedExtent object) throws IOException {                                                                                                                                                   
906                 boolean processAll = false;
907                 if(last == null || Utility.compare(last.extentName, object.extentName) != 0 || processAll) { processor.processExtentName(object.extentName); processAll = true; }
908                 if(last == null || Utility.compare(last.number, object.number) != 0 || processAll) { processor.processNumber(object.number); processAll = true; }
909                 if(last == null || Utility.compare(last.begin, object.begin) != 0 || processAll) { processor.processBegin(object.begin); processAll = true; }
910                 processor.processTuple(object.end);                                         
911             }
912                           
913             public Class<NumberedExtent> getInputClass() {
914                 return NumberedExtent.class;
915             }
916             
917             public void close() throws IOException {
918                 processor.close();
919             }                     
920         }
921     } 
922 }