View Javadoc

1   // This file was automatically generated with the command: 
2   //     java org.galagosearch.tupleflow.typebuilder.TypeBuilderMojo ...
3   package org.galagosearch.core.types;
4   
5   import org.galagosearch.tupleflow.Utility;
6   import org.galagosearch.tupleflow.ArrayInput;
7   import org.galagosearch.tupleflow.ArrayOutput;
8   import org.galagosearch.tupleflow.Order;   
9   import org.galagosearch.tupleflow.OrderedWriter;
10  import org.galagosearch.tupleflow.Type; 
11  import org.galagosearch.tupleflow.TypeReader;
12  import org.galagosearch.tupleflow.Step; 
13  import org.galagosearch.tupleflow.IncompatibleProcessorException;
14  import org.galagosearch.tupleflow.ReaderSource;
15  import java.io.IOException;             
16  import java.io.EOFException;
17  import java.io.UnsupportedEncodingException;
18  import java.util.ArrayList;
19  import java.util.Arrays;   
20  import java.util.Comparator;
21  import java.util.PriorityQueue;
22  import java.util.Collection;
23  
24  public class DocumentData implements Type<DocumentData> {
25      public String identifier;
26      public String url;
27      public int textLength; 
28      
29      public DocumentData() {}
30      public DocumentData(String identifier, String url, int textLength) {
31          this.identifier = identifier;
32          this.url = url;
33          this.textLength = textLength;
34      }  
35      
36      public String toString() {
37              return String.format("%s,%s,%d",
38                                     identifier, url, textLength);
39      } 
40  
41      public Order<DocumentData> getOrder(String... spec) {
42          if (Arrays.equals(spec, new String[] {  })) {
43              return new Unordered();
44          }
45          if (Arrays.equals(spec, new String[] { "+url" })) {
46              return new UrlOrder();
47          }
48          if (Arrays.equals(spec, new String[] { "+identifier" })) {
49              return new IdentifierOrder();
50          }
51          return null;
52      } 
53        
54      public interface Processor extends Step, org.galagosearch.tupleflow.Processor<DocumentData> {
55          public void process(DocumentData object) throws IOException;
56          public void close() throws IOException;
57      }                        
58      public interface Source extends Step {
59      }
60      public static class Unordered implements Order<DocumentData> {
61          public int hash(DocumentData object) {
62              int h = 0;
63              return h;
64          } 
65          public Comparator<DocumentData> greaterThan() {
66              return new Comparator<DocumentData>() {
67                  public int compare(DocumentData one, DocumentData two) {
68                      int result = 0;
69                      do {
70                      } while (false);
71                      return -result;
72                  }
73              };
74          }     
75          public Comparator<DocumentData> lessThan() {
76              return new Comparator<DocumentData>() {
77                  public int compare(DocumentData one, DocumentData two) {
78                      int result = 0;
79                      do {
80                      } while (false);
81                      return result;
82                  }
83              };
84          }     
85          public TypeReader<DocumentData> orderedReader(ArrayInput _input) {
86              return new ShreddedReader(_input);
87          }    
88  
89          public TypeReader<DocumentData> orderedReader(ArrayInput _input, int bufferSize) {
90              return new ShreddedReader(_input, bufferSize);
91          }    
92          public OrderedWriter<DocumentData> orderedWriter(ArrayOutput _output) {
93              ShreddedWriter w = new ShreddedWriter(_output);
94              return new OrderedWriterClass(w); 
95          }                                    
96          public static class OrderedWriterClass extends OrderedWriter< DocumentData > {
97              DocumentData last = null;
98              ShreddedWriter shreddedWriter = null; 
99              
100             public OrderedWriterClass(ShreddedWriter s) {
101                 this.shreddedWriter = s;
102             }
103             
104             public void process(DocumentData object) throws IOException {
105                boolean processAll = false;
106                shreddedWriter.processTuple(object.identifier, object.url, object.textLength);
107                last = object;
108             }           
109                  
110             public void close() throws IOException {
111                 shreddedWriter.close();
112             }
113             
114             public Class<DocumentData> getInputClass() {
115                 return DocumentData.class;
116             }
117         } 
118         public ReaderSource<DocumentData> orderedCombiner(Collection<TypeReader<DocumentData>> readers, boolean closeOnExit) {
119             ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
120             
121             for (TypeReader<DocumentData> reader : readers) {
122                 shreddedReaders.add((ShreddedReader)reader);
123             }
124             
125             return new ShreddedCombiner(shreddedReaders, closeOnExit);
126         }                  
127         public DocumentData clone(DocumentData object) {
128             DocumentData result = new DocumentData();
129             if (object == null) return result;
130             result.identifier = object.identifier; 
131             result.url = object.url; 
132             result.textLength = object.textLength; 
133             return result;
134         }                 
135         public Class<DocumentData> getOrderedClass() {
136             return DocumentData.class;
137         }                           
138         public String[] getOrderSpec() {
139             return new String[] {};
140         }
141 
142         public static String getSpecString() {
143             return "";
144         }
145                            
146         public interface ShreddedProcessor extends Step {
147             public void processTuple(String identifier, String url, int textLength) throws IOException;
148             public void close() throws IOException;
149         }    
150         public interface ShreddedSource extends Step {
151         }                                              
152         
153         public static class ShreddedWriter implements ShreddedProcessor {
154             ArrayOutput output;
155             ShreddedBuffer buffer = new ShreddedBuffer();
156             boolean lastFlush = false;
157             
158             public ShreddedWriter(ArrayOutput output) {
159                 this.output = output;
160             }                        
161             
162             public void close() throws IOException {
163                 flush();
164             }
165             
166             public final void processTuple(String identifier, String url, int textLength) throws IOException {
167                 if (lastFlush) {
168                     lastFlush = false;
169                 }
170                 buffer.processTuple(identifier, url, textLength);
171                 if (buffer.isFull())
172                     flush();
173             }
174             public final void flushTuples(int pauseIndex) throws IOException {
175                 
176                 while (buffer.getReadIndex() < pauseIndex) {
177                            
178                     output.writeString(buffer.getIdentifier());
179                     output.writeString(buffer.getUrl());
180                     output.writeInt(buffer.getTextLength());
181                     buffer.incrementTuple();
182                 }
183             }  
184             public void flush() throws IOException { 
185                 flushTuples(buffer.getWriteIndex());
186                 buffer.reset(); 
187                 lastFlush = true;
188             }                           
189         }
190         public static class ShreddedBuffer {
191                             
192             String[] identifiers;
193             String[] urls;
194             int[] textLengths;
195             int writeTupleIndex = 0;
196             int readTupleIndex = 0;
197             int batchSize;
198 
199             public ShreddedBuffer(int batchSize) {
200                 this.batchSize = batchSize;
201 
202                 identifiers = new String[batchSize];
203                 urls = new String[batchSize];
204                 textLengths = new int[batchSize];
205             }                              
206 
207             public ShreddedBuffer() {    
208                 this(10000);
209             }                                                                                                                    
210             
211             public void processTuple(String identifier, String url, int textLength) {
212                 identifiers[writeTupleIndex] = identifier;
213                 urls[writeTupleIndex] = url;
214                 textLengths[writeTupleIndex] = textLength;
215                 writeTupleIndex++;
216             }
217             public void resetData() {
218                 writeTupleIndex = 0;
219             }                  
220                                  
221             public void resetRead() {
222                 readTupleIndex = 0;
223             } 
224 
225             public void reset() {
226                 resetData();
227                 resetRead();
228             } 
229             public boolean isFull() {
230                 return writeTupleIndex >= batchSize;
231             }
232 
233             public boolean isEmpty() {
234                 return writeTupleIndex == 0;
235             }                          
236 
237             public boolean isAtEnd() {
238                 return readTupleIndex >= writeTupleIndex;
239             }           
240             public void incrementTuple() {
241                 readTupleIndex++;
242             }                    
243             public int getReadIndex() {
244                 return readTupleIndex;
245             }   
246 
247             public int getWriteIndex() {
248                 return writeTupleIndex;
249             } 
250             public String getIdentifier() {
251                 assert readTupleIndex < writeTupleIndex;
252                 return identifiers[readTupleIndex];
253             }                                         
254             public String getUrl() {
255                 assert readTupleIndex < writeTupleIndex;
256                 return urls[readTupleIndex];
257             }                                         
258             public int getTextLength() {
259                 assert readTupleIndex < writeTupleIndex;
260                 return textLengths[readTupleIndex];
261             }                                         
262             public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
263                 while (getReadIndex() < endIndex) {
264                    output.processTuple(getIdentifier(), getUrl(), getTextLength());
265                    incrementTuple();
266                 }
267             }                                                                           
268              
269             public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
270             }
271             
272         }                         
273         public static class ShreddedCombiner implements ReaderSource<DocumentData>, ShreddedSource {   
274             public ShreddedProcessor processor;
275             Collection<ShreddedReader> readers;       
276             boolean closeOnExit = false;
277             boolean uninitialized = true;
278             PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
279             
280             public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
281                 this.readers = readers;                                                       
282                 this.closeOnExit = closeOnExit;
283             }
284                                   
285             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
286                 if (processor instanceof ShreddedProcessor) {
287                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
288                 } else if (processor instanceof DocumentData.Processor) {
289                     this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentData.Processor) processor));
290                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
291                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentData>) processor));
292                 } else {
293                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
294                 }
295             }                                
296             
297             public Class<DocumentData> getOutputClass() {
298                 return DocumentData.class;
299             }
300             
301             public void initialize() throws IOException {
302                 for (ShreddedReader reader : readers) {
303                     reader.fill();                                        
304                     
305                     if (!reader.getBuffer().isAtEnd())
306                         queue.add(reader);
307                 }   
308 
309                 uninitialized = false;
310             }
311 
312             public void run() throws IOException {
313                 initialize();
314                
315                 while (queue.size() > 0) {
316                     ShreddedReader top = queue.poll();
317                     ShreddedReader next = null;
318                     ShreddedBuffer nextBuffer = null; 
319                     
320                     assert !top.getBuffer().isAtEnd();
321                                                   
322                     if (queue.size() > 0) {
323                         next = queue.peek();
324                         nextBuffer = next.getBuffer();
325                         assert !nextBuffer.isAtEnd();
326                     }
327                     
328                     top.getBuffer().copyUntil(nextBuffer, processor);
329                     if (top.getBuffer().isAtEnd())
330                         top.fill();                 
331                         
332                     if (!top.getBuffer().isAtEnd())
333                         queue.add(top);
334                 }              
335                 
336                 if (closeOnExit)
337                     processor.close();
338             }
339 
340             public DocumentData read() throws IOException {
341                 if (uninitialized)
342                     initialize();
343 
344                 DocumentData result = null;
345 
346                 while (queue.size() > 0) {
347                     ShreddedReader top = queue.poll();
348                     result = top.read();
349 
350                     if (result != null) {
351                         if (top.getBuffer().isAtEnd())
352                             top.fill();
353 
354                         queue.offer(top);
355                         break;
356                     } 
357                 }
358 
359                 return result;
360             }
361         } 
362         public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<DocumentData>, ShreddedSource {      
363             public ShreddedProcessor processor;
364             ShreddedBuffer buffer;
365             DocumentData last = new DocumentData();         
366             long tupleCount = 0;
367             long bufferStartCount = 0;  
368             ArrayInput input;
369             
370             public ShreddedReader(ArrayInput input) {
371                 this.input = input; 
372                 this.buffer = new ShreddedBuffer();
373             }                               
374             
375             public ShreddedReader(ArrayInput input, int bufferSize) { 
376                 this.input = input;
377                 this.buffer = new ShreddedBuffer(bufferSize);
378             }
379                  
380             public final int compareTo(ShreddedReader other) {
381                 ShreddedBuffer otherBuffer = other.getBuffer();
382                 
383                 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
384                     return 0;                 
385                 } else if (buffer.isAtEnd()) {
386                     return -1;
387                 } else if (otherBuffer.isAtEnd()) {
388                     return 1;
389                 }
390                                    
391                 int result = 0;
392                 do {
393                 } while (false);                                             
394                 
395                 return result;
396             }
397             
398             public final ShreddedBuffer getBuffer() {
399                 return buffer;
400             }                
401             
402             public final DocumentData read() throws IOException {
403                 if (buffer.isAtEnd()) {
404                     fill();             
405                 
406                     if (buffer.isAtEnd()) {
407                         return null;
408                     }
409                 }
410                       
411                 assert !buffer.isAtEnd();
412                 DocumentData result = new DocumentData();
413                 
414                 result.identifier = buffer.getIdentifier();
415                 result.url = buffer.getUrl();
416                 result.textLength = buffer.getTextLength();
417                 
418                 buffer.incrementTuple();
419                 
420                 return result;
421             }           
422             
423             public final void fill() throws IOException {
424                 try {   
425                     buffer.reset();
426                     
427                     if (tupleCount != 0) {
428                         bufferStartCount = tupleCount;
429                     }
430                     
431                     while (!buffer.isFull()) {
432                         buffer.processTuple(input.readString(), input.readString(), input.readInt());
433                         tupleCount++;
434                     }
435                 } catch(EOFException e) {}
436             }
437 
438 
439             public void run() throws IOException {
440                 while (true) {
441                     fill();
442                     
443                     if (buffer.isAtEnd())
444                         break;
445                     
446                     buffer.copyUntil(null, processor);
447                 }      
448                 processor.close();
449             }
450             
451             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
452                 if (processor instanceof ShreddedProcessor) {
453                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
454                 } else if (processor instanceof DocumentData.Processor) {
455                     this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentData.Processor) processor));
456                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
457                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentData>) processor));
458                 } else {
459                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
460                 }
461             }                                
462             
463             public Class<DocumentData> getOutputClass() {
464                 return DocumentData.class;
465             }                
466         }
467         
468         public static class DuplicateEliminator implements ShreddedProcessor {
469             public ShreddedProcessor processor;
470             DocumentData last = new DocumentData();
471                                            
472             public DuplicateEliminator() {}
473             public DuplicateEliminator(ShreddedProcessor processor) {
474                 this.processor = processor;
475             }
476             
477             public void setShreddedProcessor(ShreddedProcessor processor) {
478                 this.processor = processor;
479             }
480 
481           
482             
483                                
484             public void processTuple(String identifier, String url, int textLength) throws IOException {
485                 processor.processTuple(identifier, url, textLength);
486             } 
487             
488             public void close() throws IOException {
489                 processor.close();
490             }                    
491         }
492         public static class TupleUnshredder implements ShreddedProcessor {
493             DocumentData last = new DocumentData();
494             public org.galagosearch.tupleflow.Processor<DocumentData> processor;                               
495             
496             public TupleUnshredder(DocumentData.Processor processor) {
497                 this.processor = processor;
498             }         
499             
500             public TupleUnshredder(org.galagosearch.tupleflow.Processor<DocumentData> processor) {
501                 this.processor = processor;
502             }
503             
504             public DocumentData clone(DocumentData object) {
505                 DocumentData result = new DocumentData();
506                 if (object == null) return result;
507                 result.identifier = object.identifier; 
508                 result.url = object.url; 
509                 result.textLength = object.textLength; 
510                 return result;
511             }                 
512             
513             
514             public void processTuple(String identifier, String url, int textLength) throws IOException {
515                 last.identifier = identifier;
516                 last.url = url;
517                 last.textLength = textLength;
518                 processor.process(clone(last));
519             }               
520             
521             public void close() throws IOException {
522                 processor.close();
523             }
524         }     
525         public static class TupleShredder implements Processor {
526             DocumentData last = new DocumentData();
527             public ShreddedProcessor processor;
528             
529             public TupleShredder(ShreddedProcessor processor) {
530                 this.processor = processor;
531             }                              
532             
533             public DocumentData clone(DocumentData object) {
534                 DocumentData result = new DocumentData();
535                 if (object == null) return result;
536                 result.identifier = object.identifier; 
537                 result.url = object.url; 
538                 result.textLength = object.textLength; 
539                 return result;
540             }                 
541             
542             public void process(DocumentData object) throws IOException {                                                                                                                                                   
543                 boolean processAll = false;
544                 processor.processTuple(object.identifier, object.url, object.textLength);                                         
545             }
546                           
547             public Class<DocumentData> getInputClass() {
548                 return DocumentData.class;
549             }
550             
551             public void close() throws IOException {
552                 processor.close();
553             }                     
554         }
555     } 
556     public static class UrlOrder implements Order<DocumentData> {
557         public int hash(DocumentData object) {
558             int h = 0;
559             h += Utility.hash(object.url);
560             return h;
561         } 
562         public Comparator<DocumentData> greaterThan() {
563             return new Comparator<DocumentData>() {
564                 public int compare(DocumentData one, DocumentData two) {
565                     int result = 0;
566                     do {
567                         result = + Utility.compare(one.url, two.url);
568                         if(result != 0) break;
569                     } while (false);
570                     return -result;
571                 }
572             };
573         }     
574         public Comparator<DocumentData> lessThan() {
575             return new Comparator<DocumentData>() {
576                 public int compare(DocumentData one, DocumentData two) {
577                     int result = 0;
578                     do {
579                         result = + Utility.compare(one.url, two.url);
580                         if(result != 0) break;
581                     } while (false);
582                     return result;
583                 }
584             };
585         }     
586         public TypeReader<DocumentData> orderedReader(ArrayInput _input) {
587             return new ShreddedReader(_input);
588         }    
589 
590         public TypeReader<DocumentData> orderedReader(ArrayInput _input, int bufferSize) {
591             return new ShreddedReader(_input, bufferSize);
592         }    
593         public OrderedWriter<DocumentData> orderedWriter(ArrayOutput _output) {
594             ShreddedWriter w = new ShreddedWriter(_output);
595             return new OrderedWriterClass(w); 
596         }                                    
597         public static class OrderedWriterClass extends OrderedWriter< DocumentData > {
598             DocumentData last = null;
599             ShreddedWriter shreddedWriter = null; 
600             
601             public OrderedWriterClass(ShreddedWriter s) {
602                 this.shreddedWriter = s;
603             }
604             
605             public void process(DocumentData object) throws IOException {
606                boolean processAll = false;
607                if (processAll || last == null || 0 != Utility.compare(object.url, last.url)) { processAll = true; shreddedWriter.processUrl(object.url); }
608                shreddedWriter.processTuple(object.identifier, object.textLength);
609                last = object;
610             }           
611                  
612             public void close() throws IOException {
613                 shreddedWriter.close();
614             }
615             
616             public Class<DocumentData> getInputClass() {
617                 return DocumentData.class;
618             }
619         } 
620         public ReaderSource<DocumentData> orderedCombiner(Collection<TypeReader<DocumentData>> readers, boolean closeOnExit) {
621             ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
622             
623             for (TypeReader<DocumentData> reader : readers) {
624                 shreddedReaders.add((ShreddedReader)reader);
625             }
626             
627             return new ShreddedCombiner(shreddedReaders, closeOnExit);
628         }                  
629         public DocumentData clone(DocumentData object) {
630             DocumentData result = new DocumentData();
631             if (object == null) return result;
632             result.identifier = object.identifier; 
633             result.url = object.url; 
634             result.textLength = object.textLength; 
635             return result;
636         }                 
637         public Class<DocumentData> getOrderedClass() {
638             return DocumentData.class;
639         }                           
640         public String[] getOrderSpec() {
641             return new String[] {"+url"};
642         }
643 
644         public static String getSpecString() {
645             return "+url";
646         }
647                            
648         public interface ShreddedProcessor extends Step {
649             public void processUrl(String url) throws IOException;
650             public void processTuple(String identifier, int textLength) throws IOException;
651             public void close() throws IOException;
652         }    
653         public interface ShreddedSource extends Step {
654         }                                              
655         
656         public static class ShreddedWriter implements ShreddedProcessor {
657             ArrayOutput output;
658             ShreddedBuffer buffer = new ShreddedBuffer();
659             String lastUrl;
660             boolean lastFlush = false;
661             
662             public ShreddedWriter(ArrayOutput output) {
663                 this.output = output;
664             }                        
665             
666             public void close() throws IOException {
667                 flush();
668             }
669             
670             public void processUrl(String url) {
671                 lastUrl = url;
672                 buffer.processUrl(url);
673             }
674             public final void processTuple(String identifier, int textLength) throws IOException {
675                 if (lastFlush) {
676                     if(buffer.urls.size() == 0) buffer.processUrl(lastUrl);
677                     lastFlush = false;
678                 }
679                 buffer.processTuple(identifier, textLength);
680                 if (buffer.isFull())
681                     flush();
682             }
683             public final void flushTuples(int pauseIndex) throws IOException {
684                 
685                 while (buffer.getReadIndex() < pauseIndex) {
686                            
687                     output.writeString(buffer.getIdentifier());
688                     output.writeInt(buffer.getTextLength());
689                     buffer.incrementTuple();
690                 }
691             }  
692             public final void flushUrl(int pauseIndex) throws IOException {
693                 while (buffer.getReadIndex() < pauseIndex) {
694                     int nextPause = buffer.getUrlEndIndex();
695                     int count = nextPause - buffer.getReadIndex();
696                     
697                     output.writeString(buffer.getUrl());
698                     output.writeInt(count);
699                     buffer.incrementUrl();
700                       
701                     flushTuples(nextPause);
702                     assert nextPause == buffer.getReadIndex();
703                 }
704             }
705             public void flush() throws IOException { 
706                 flushUrl(buffer.getWriteIndex());
707                 buffer.reset(); 
708                 lastFlush = true;
709             }                           
710         }
711         public static class ShreddedBuffer {
712             ArrayList<String> urls = new ArrayList();
713             ArrayList<Integer> urlTupleIdx = new ArrayList();
714             int urlReadIdx = 0;
715                             
716             String[] identifiers;
717             int[] textLengths;
718             int writeTupleIndex = 0;
719             int readTupleIndex = 0;
720             int batchSize;
721 
722             public ShreddedBuffer(int batchSize) {
723                 this.batchSize = batchSize;
724 
725                 identifiers = new String[batchSize];
726                 textLengths = new int[batchSize];
727             }                              
728 
729             public ShreddedBuffer() {    
730                 this(10000);
731             }                                                                                                                    
732             
733             public void processUrl(String url) {
734                 urls.add(url);
735                 urlTupleIdx.add(writeTupleIndex);
736             }                                      
737             public void processTuple(String identifier, int textLength) {
738                 assert urls.size() > 0;
739                 identifiers[writeTupleIndex] = identifier;
740                 textLengths[writeTupleIndex] = textLength;
741                 writeTupleIndex++;
742             }
743             public void resetData() {
744                 urls.clear();
745                 urlTupleIdx.clear();
746                 writeTupleIndex = 0;
747             }                  
748                                  
749             public void resetRead() {
750                 readTupleIndex = 0;
751                 urlReadIdx = 0;
752             } 
753 
754             public void reset() {
755                 resetData();
756                 resetRead();
757             } 
758             public boolean isFull() {
759                 return writeTupleIndex >= batchSize;
760             }
761 
762             public boolean isEmpty() {
763                 return writeTupleIndex == 0;
764             }                          
765 
766             public boolean isAtEnd() {
767                 return readTupleIndex >= writeTupleIndex;
768             }           
769             public void incrementUrl() {
770                 urlReadIdx++;  
771             }                                                                                              
772 
773             public void autoIncrementUrl() {
774                 while (readTupleIndex >= getUrlEndIndex() && readTupleIndex < writeTupleIndex)
775                     urlReadIdx++;
776             }                 
777             public void incrementTuple() {
778                 readTupleIndex++;
779             }                    
780             public int getUrlEndIndex() {
781                 if ((urlReadIdx+1) >= urlTupleIdx.size())
782                     return writeTupleIndex;
783                 return urlTupleIdx.get(urlReadIdx+1);
784             }
785             public int getReadIndex() {
786                 return readTupleIndex;
787             }   
788 
789             public int getWriteIndex() {
790                 return writeTupleIndex;
791             } 
792             public String getUrl() {
793                 assert readTupleIndex < writeTupleIndex;
794                 assert urlReadIdx < urls.size();
795                 
796                 return urls.get(urlReadIdx);
797             }
798             public String getIdentifier() {
799                 assert readTupleIndex < writeTupleIndex;
800                 return identifiers[readTupleIndex];
801             }                                         
802             public int getTextLength() {
803                 assert readTupleIndex < writeTupleIndex;
804                 return textLengths[readTupleIndex];
805             }                                         
806             public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
807                 while (getReadIndex() < endIndex) {
808                    output.processTuple(getIdentifier(), getTextLength());
809                    incrementTuple();
810                 }
811             }                                                                           
812             public void copyUntilIndexUrl(int endIndex, ShreddedProcessor output) throws IOException {
813                 while (getReadIndex() < endIndex) {
814                     output.processUrl(getUrl());
815                     assert getUrlEndIndex() <= endIndex;
816                     copyTuples(getUrlEndIndex(), output);
817                     incrementUrl();
818                 }
819             }  
820             public void copyUntilUrl(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
821                 while (!isAtEnd()) {
822                     if (other != null) {   
823                         assert !other.isAtEnd();
824                         int c = + Utility.compare(getUrl(), other.getUrl());
825                     
826                         if (c > 0) {
827                             break;   
828                         }
829                         
830                         output.processUrl(getUrl());
831                                       
832                         copyTuples(getUrlEndIndex(), output);
833                     } else {
834                         output.processUrl(getUrl());
835                         copyTuples(getUrlEndIndex(), output);
836                     }
837                     incrementUrl();  
838                     
839                
840                 }
841             }
842             public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
843                 copyUntilUrl(other, output);
844             }
845             
846         }                         
847         public static class ShreddedCombiner implements ReaderSource<DocumentData>, ShreddedSource {   
848             public ShreddedProcessor processor;
849             Collection<ShreddedReader> readers;       
850             boolean closeOnExit = false;
851             boolean uninitialized = true;
852             PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
853             
854             public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
855                 this.readers = readers;                                                       
856                 this.closeOnExit = closeOnExit;
857             }
858                                   
859             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
860                 if (processor instanceof ShreddedProcessor) {
861                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
862                 } else if (processor instanceof DocumentData.Processor) {
863                     this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentData.Processor) processor));
864                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
865                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentData>) processor));
866                 } else {
867                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
868                 }
869             }                                
870             
871             public Class<DocumentData> getOutputClass() {
872                 return DocumentData.class;
873             }
874             
875             public void initialize() throws IOException {
876                 for (ShreddedReader reader : readers) {
877                     reader.fill();                                        
878                     
879                     if (!reader.getBuffer().isAtEnd())
880                         queue.add(reader);
881                 }   
882 
883                 uninitialized = false;
884             }
885 
886             public void run() throws IOException {
887                 initialize();
888                
889                 while (queue.size() > 0) {
890                     ShreddedReader top = queue.poll();
891                     ShreddedReader next = null;
892                     ShreddedBuffer nextBuffer = null; 
893                     
894                     assert !top.getBuffer().isAtEnd();
895                                                   
896                     if (queue.size() > 0) {
897                         next = queue.peek();
898                         nextBuffer = next.getBuffer();
899                         assert !nextBuffer.isAtEnd();
900                     }
901                     
902                     top.getBuffer().copyUntil(nextBuffer, processor);
903                     if (top.getBuffer().isAtEnd())
904                         top.fill();                 
905                         
906                     if (!top.getBuffer().isAtEnd())
907                         queue.add(top);
908                 }              
909                 
910                 if (closeOnExit)
911                     processor.close();
912             }
913 
914             public DocumentData read() throws IOException {
915                 if (uninitialized)
916                     initialize();
917 
918                 DocumentData result = null;
919 
920                 while (queue.size() > 0) {
921                     ShreddedReader top = queue.poll();
922                     result = top.read();
923 
924                     if (result != null) {
925                         if (top.getBuffer().isAtEnd())
926                             top.fill();
927 
928                         queue.offer(top);
929                         break;
930                     } 
931                 }
932 
933                 return result;
934             }
935         } 
        /**
         * Reads DocumentData tuples in url order from an ArrayInput in shredded form.
         * Each url group is stored as the url string and an int tuple count, followed
         * by that many (identifier string, textLength int) tuples. fill() loads tuples
         * into a ShreddedBuffer in batches.
         */
        public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<DocumentData>, ShreddedSource {
            public ShreddedProcessor processor;
            ShreddedBuffer buffer;
            // Holds the url of the group currently being read from the input.
            DocumentData last = new DocumentData();
            // Value of tupleCount at which the current url group ends. The initial -1
            // makes the first updateUrl() call read a url header.
            long updateUrlCount = -1;
            // Total number of tuples read from the input so far.
            long tupleCount = 0;
            // tupleCount at the start of the current buffer (assigned but not read in this class).
            long bufferStartCount = 0;
            ArrayInput input;

            /** Creates a reader backed by a buffer of the default size. */
            public ShreddedReader(ArrayInput input) {
                this.input = input;
                this.buffer = new ShreddedBuffer();
            }

            /** Creates a reader whose buffer holds at most {@code bufferSize} tuples. */
            public ShreddedReader(ArrayInput input, int bufferSize) {
                this.input = input;
                this.buffer = new ShreddedBuffer(bufferSize);
            }

            /**
             * Orders readers by the url at their current read position. A reader whose
             * buffer is exhausted sorts before any reader that still has tuples.
             */
            public final int compareTo(ShreddedReader other) {
                ShreddedBuffer otherBuffer = other.getBuffer();

                if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
                    return 0;
                } else if (buffer.isAtEnd()) {
                    return -1;
                } else if (otherBuffer.isAtEnd()) {
                    return 1;
                }

                int result = 0;
                // The do/while(false) wrapper is a no-op here: only one key field is compared.
                do {
                    result = + Utility.compare(buffer.getUrl(), otherBuffer.getUrl());
                    if(result != 0) break;
                } while (false);

                return result;
            }

            public final ShreddedBuffer getBuffer() {
                return buffer;
            }

            /**
             * Returns the next tuple as a new DocumentData, refilling the buffer as
             * needed. Returns null once the input is exhausted.
             */
            public final DocumentData read() throws IOException {
                if (buffer.isAtEnd()) {
                    fill();

                    if (buffer.isAtEnd()) {
                        return null;
                    }
                }

                assert !buffer.isAtEnd();
                DocumentData result = new DocumentData();

                result.url = buffer.getUrl();
                result.identifier = buffer.getIdentifier();
                result.textLength = buffer.getTextLength();

                buffer.incrementTuple();
                // Move the url cursor onto the next group once the current one is consumed.
                buffer.autoIncrementUrl();

                return result;
            }

            /**
             * Discards the current buffer contents and loads up to batchSize tuples from
             * the input. If the previous batch ended partway through a url group, that
             * url is re-registered as the first group of the new batch. Reading stops
             * silently when the input throws EOFException.
             */
            public final void fill() throws IOException {
                try {
                    buffer.reset();

                    if (tupleCount != 0) {
                        // Tuples remain in the url group that spans the batch boundary.
                        // NOTE(review): this stores the remaining tuple count, not a start
                        // index (processUrl stores start indices). ShreddedBuffer.getUrlEndIndex
                        // only reads entries at index >= 1, so the value appears unused. Confirm.
                        if(updateUrlCount - tupleCount > 0) {
                            buffer.urls.add(last.url);
                            buffer.urlTupleIdx.add((int) (updateUrlCount - tupleCount));
                        }
                        bufferStartCount = tupleCount;
                    }

                    while (!buffer.isFull()) {
                        updateUrl();
                        buffer.processTuple(input.readString(), input.readInt());
                        tupleCount++;
                    }
                    // EOF is treated as the normal end of input.
                    // NOTE(review): an EOF between a tuple's identifier and its textLength
                    // drops that partial tuple silently, so truncated input is indistinguishable
                    // from a clean end. Confirm this is acceptable.
                } catch(EOFException e) {}
            }

            /**
             * When the current url group is exhausted, reads the next url header (url
             * string, tuple count) and registers the url in the buffer.
             */
            public final void updateUrl() throws IOException {
                if (updateUrlCount > tupleCount)
                    return;

                last.url = input.readString();
                updateUrlCount = tupleCount + input.readInt();

                buffer.processUrl(last.url);
            }

            /** Streams the entire input into {@code processor} in batches, then closes it. */
            public void run() throws IOException {
                while (true) {
                    fill();

                    if (buffer.isAtEnd())
                        break;

                    buffer.copyUntil(null, processor);
                }
                processor.close();
            }

            /**
             * Installs the downstream step, wrapped in a DuplicateEliminator. A non-shredded
             * DocumentData processor is additionally wrapped in a TupleUnshredder.
             *
             * @throws IncompatibleProcessorException if the step type is not supported
             */
            public void setProcessor(Step processor) throws IncompatibleProcessorException {
                if (processor instanceof ShreddedProcessor) {
                    this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
                } else if (processor instanceof DocumentData.Processor) {
                    this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentData.Processor) processor));
                } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
                    this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentData>) processor));
                } else {
                    throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
                }
            }

            public Class<DocumentData> getOutputClass() {
                return DocumentData.class;
            }
        }
1060         
1061         public static class DuplicateEliminator implements ShreddedProcessor {
1062             public ShreddedProcessor processor;
1063             DocumentData last = new DocumentData();
1064             boolean urlProcess = true;
1065                                            
1066             public DuplicateEliminator() {}
1067             public DuplicateEliminator(ShreddedProcessor processor) {
1068                 this.processor = processor;
1069             }
1070             
1071             public void setShreddedProcessor(ShreddedProcessor processor) {
1072                 this.processor = processor;
1073             }
1074 
1075             public void processUrl(String url) throws IOException {  
1076                 if (urlProcess || Utility.compare(url, last.url) != 0) {
1077                     last.url = url;
1078                     processor.processUrl(url);
1079                     urlProcess = false;
1080                 }
1081             }  
1082             
1083             public void resetUrl() {
1084                  urlProcess = true;
1085             }                                                
1086                                
1087             public void processTuple(String identifier, int textLength) throws IOException {
1088                 processor.processTuple(identifier, textLength);
1089             } 
1090             
1091             public void close() throws IOException {
1092                 processor.close();
1093             }                    
1094         }
1095         public static class TupleUnshredder implements ShreddedProcessor {
1096             DocumentData last = new DocumentData();
1097             public org.galagosearch.tupleflow.Processor<DocumentData> processor;                               
1098             
1099             public TupleUnshredder(DocumentData.Processor processor) {
1100                 this.processor = processor;
1101             }         
1102             
1103             public TupleUnshredder(org.galagosearch.tupleflow.Processor<DocumentData> processor) {
1104                 this.processor = processor;
1105             }
1106             
1107             public DocumentData clone(DocumentData object) {
1108                 DocumentData result = new DocumentData();
1109                 if (object == null) return result;
1110                 result.identifier = object.identifier; 
1111                 result.url = object.url; 
1112                 result.textLength = object.textLength; 
1113                 return result;
1114             }                 
1115             
1116             public void processUrl(String url) throws IOException {
1117                 last.url = url;
1118             }   
1119                 
1120             
1121             public void processTuple(String identifier, int textLength) throws IOException {
1122                 last.identifier = identifier;
1123                 last.textLength = textLength;
1124                 processor.process(clone(last));
1125             }               
1126             
1127             public void close() throws IOException {
1128                 processor.close();
1129             }
1130         }     
1131         public static class TupleShredder implements Processor {
1132             DocumentData last = new DocumentData();
1133             public ShreddedProcessor processor;
1134             
1135             public TupleShredder(ShreddedProcessor processor) {
1136                 this.processor = processor;
1137             }                              
1138             
1139             public DocumentData clone(DocumentData object) {
1140                 DocumentData result = new DocumentData();
1141                 if (object == null) return result;
1142                 result.identifier = object.identifier; 
1143                 result.url = object.url; 
1144                 result.textLength = object.textLength; 
1145                 return result;
1146             }                 
1147             
1148             public void process(DocumentData object) throws IOException {                                                                                                                                                   
1149                 boolean processAll = false;
1150                 if(last == null || Utility.compare(last.url, object.url) != 0 || processAll) { processor.processUrl(object.url); processAll = true; }
1151                 processor.processTuple(object.identifier, object.textLength);                                         
1152             }
1153                           
1154             public Class<DocumentData> getInputClass() {
1155                 return DocumentData.class;
1156             }
1157             
1158             public void close() throws IOException {
1159                 processor.close();
1160             }                     
1161         }
1162     } 
1163     public static class IdentifierOrder implements Order<DocumentData> {
1164         public int hash(DocumentData object) {
1165             int h = 0;
1166             h += Utility.hash(object.identifier);
1167             return h;
1168         } 
1169         public Comparator<DocumentData> greaterThan() {
1170             return new Comparator<DocumentData>() {
1171                 public int compare(DocumentData one, DocumentData two) {
1172                     int result = 0;
1173                     do {
1174                         result = + Utility.compare(one.identifier, two.identifier);
1175                         if(result != 0) break;
1176                     } while (false);
1177                     return -result;
1178                 }
1179             };
1180         }     
1181         public Comparator<DocumentData> lessThan() {
1182             return new Comparator<DocumentData>() {
1183                 public int compare(DocumentData one, DocumentData two) {
1184                     int result = 0;
1185                     do {
1186                         result = + Utility.compare(one.identifier, two.identifier);
1187                         if(result != 0) break;
1188                     } while (false);
1189                     return result;
1190                 }
1191             };
1192         }     
1193         public TypeReader<DocumentData> orderedReader(ArrayInput _input) {
1194             return new ShreddedReader(_input);
1195         }    
1196 
1197         public TypeReader<DocumentData> orderedReader(ArrayInput _input, int bufferSize) {
1198             return new ShreddedReader(_input, bufferSize);
1199         }    
1200         public OrderedWriter<DocumentData> orderedWriter(ArrayOutput _output) {
1201             ShreddedWriter w = new ShreddedWriter(_output);
1202             return new OrderedWriterClass(w); 
1203         }                                    
1204         public static class OrderedWriterClass extends OrderedWriter< DocumentData > {
1205             DocumentData last = null;
1206             ShreddedWriter shreddedWriter = null; 
1207             
1208             public OrderedWriterClass(ShreddedWriter s) {
1209                 this.shreddedWriter = s;
1210             }
1211             
1212             public void process(DocumentData object) throws IOException {
1213                boolean processAll = false;
1214                if (processAll || last == null || 0 != Utility.compare(object.identifier, last.identifier)) { processAll = true; shreddedWriter.processIdentifier(object.identifier); }
1215                shreddedWriter.processTuple(object.url, object.textLength);
1216                last = object;
1217             }           
1218                  
1219             public void close() throws IOException {
1220                 shreddedWriter.close();
1221             }
1222             
1223             public Class<DocumentData> getInputClass() {
1224                 return DocumentData.class;
1225             }
1226         } 
1227         public ReaderSource<DocumentData> orderedCombiner(Collection<TypeReader<DocumentData>> readers, boolean closeOnExit) {
1228             ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
1229             
1230             for (TypeReader<DocumentData> reader : readers) {
1231                 shreddedReaders.add((ShreddedReader)reader);
1232             }
1233             
1234             return new ShreddedCombiner(shreddedReaders, closeOnExit);
1235         }                  
1236         public DocumentData clone(DocumentData object) {
1237             DocumentData result = new DocumentData();
1238             if (object == null) return result;
1239             result.identifier = object.identifier; 
1240             result.url = object.url; 
1241             result.textLength = object.textLength; 
1242             return result;
1243         }                 
1244         public Class<DocumentData> getOrderedClass() {
1245             return DocumentData.class;
1246         }                           
1247         public String[] getOrderSpec() {
1248             return new String[] {"+identifier"};
1249         }
1250 
1251         public static String getSpecString() {
1252             return "+identifier";
1253         }
1254                            
1255         public interface ShreddedProcessor extends Step {
1256             public void processIdentifier(String identifier) throws IOException;
1257             public void processTuple(String url, int textLength) throws IOException;
1258             public void close() throws IOException;
1259         }    
1260         public interface ShreddedSource extends Step {
1261         }                                              
1262         
1263         public static class ShreddedWriter implements ShreddedProcessor {
1264             ArrayOutput output;
1265             ShreddedBuffer buffer = new ShreddedBuffer();
1266             String lastIdentifier;
1267             boolean lastFlush = false;
1268             
1269             public ShreddedWriter(ArrayOutput output) {
1270                 this.output = output;
1271             }                        
1272             
1273             public void close() throws IOException {
1274                 flush();
1275             }
1276             
1277             public void processIdentifier(String identifier) {
1278                 lastIdentifier = identifier;
1279                 buffer.processIdentifier(identifier);
1280             }
1281             public final void processTuple(String url, int textLength) throws IOException {
1282                 if (lastFlush) {
1283                     if(buffer.identifiers.size() == 0) buffer.processIdentifier(lastIdentifier);
1284                     lastFlush = false;
1285                 }
1286                 buffer.processTuple(url, textLength);
1287                 if (buffer.isFull())
1288                     flush();
1289             }
1290             public final void flushTuples(int pauseIndex) throws IOException {
1291                 
1292                 while (buffer.getReadIndex() < pauseIndex) {
1293                            
1294                     output.writeString(buffer.getUrl());
1295                     output.writeInt(buffer.getTextLength());
1296                     buffer.incrementTuple();
1297                 }
1298             }  
1299             public final void flushIdentifier(int pauseIndex) throws IOException {
1300                 while (buffer.getReadIndex() < pauseIndex) {
1301                     int nextPause = buffer.getIdentifierEndIndex();
1302                     int count = nextPause - buffer.getReadIndex();
1303                     
1304                     output.writeString(buffer.getIdentifier());
1305                     output.writeInt(count);
1306                     buffer.incrementIdentifier();
1307                       
1308                     flushTuples(nextPause);
1309                     assert nextPause == buffer.getReadIndex();
1310                 }
1311             }
1312             public void flush() throws IOException { 
1313                 flushIdentifier(buffer.getWriteIndex());
1314                 buffer.reset(); 
1315                 lastFlush = true;
1316             }                           
1317         }
1318         public static class ShreddedBuffer {
1319             ArrayList<String> identifiers = new ArrayList();
1320             ArrayList<Integer> identifierTupleIdx = new ArrayList();
1321             int identifierReadIdx = 0;
1322                             
1323             String[] urls;
1324             int[] textLengths;
1325             int writeTupleIndex = 0;
1326             int readTupleIndex = 0;
1327             int batchSize;
1328 
1329             public ShreddedBuffer(int batchSize) {
1330                 this.batchSize = batchSize;
1331 
1332                 urls = new String[batchSize];
1333                 textLengths = new int[batchSize];
1334             }                              
1335 
1336             public ShreddedBuffer() {    
1337                 this(10000);
1338             }                                                                                                                    
1339             
1340             public void processIdentifier(String identifier) {
1341                 identifiers.add(identifier);
1342                 identifierTupleIdx.add(writeTupleIndex);
1343             }                                      
1344             public void processTuple(String url, int textLength) {
1345                 assert identifiers.size() > 0;
1346                 urls[writeTupleIndex] = url;
1347                 textLengths[writeTupleIndex] = textLength;
1348                 writeTupleIndex++;
1349             }
1350             public void resetData() {
1351                 identifiers.clear();
1352                 identifierTupleIdx.clear();
1353                 writeTupleIndex = 0;
1354             }                  
1355                                  
1356             public void resetRead() {
1357                 readTupleIndex = 0;
1358                 identifierReadIdx = 0;
1359             } 
1360 
1361             public void reset() {
1362                 resetData();
1363                 resetRead();
1364             } 
1365             public boolean isFull() {
1366                 return writeTupleIndex >= batchSize;
1367             }
1368 
1369             public boolean isEmpty() {
1370                 return writeTupleIndex == 0;
1371             }                          
1372 
1373             public boolean isAtEnd() {
1374                 return readTupleIndex >= writeTupleIndex;
1375             }           
1376             public void incrementIdentifier() {
1377                 identifierReadIdx++;  
1378             }                                                                                              
1379 
1380             public void autoIncrementIdentifier() {
1381                 while (readTupleIndex >= getIdentifierEndIndex() && readTupleIndex < writeTupleIndex)
1382                     identifierReadIdx++;
1383             }                 
1384             public void incrementTuple() {
1385                 readTupleIndex++;
1386             }                    
1387             public int getIdentifierEndIndex() {
1388                 if ((identifierReadIdx+1) >= identifierTupleIdx.size())
1389                     return writeTupleIndex;
1390                 return identifierTupleIdx.get(identifierReadIdx+1);
1391             }
1392             public int getReadIndex() {
1393                 return readTupleIndex;
1394             }   
1395 
1396             public int getWriteIndex() {
1397                 return writeTupleIndex;
1398             } 
1399             public String getIdentifier() {
1400                 assert readTupleIndex < writeTupleIndex;
1401                 assert identifierReadIdx < identifiers.size();
1402                 
1403                 return identifiers.get(identifierReadIdx);
1404             }
1405             public String getUrl() {
1406                 assert readTupleIndex < writeTupleIndex;
1407                 return urls[readTupleIndex];
1408             }                                         
1409             public int getTextLength() {
1410                 assert readTupleIndex < writeTupleIndex;
1411                 return textLengths[readTupleIndex];
1412             }                                         
1413             public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
1414                 while (getReadIndex() < endIndex) {
1415                    output.processTuple(getUrl(), getTextLength());
1416                    incrementTuple();
1417                 }
1418             }                                                                           
1419             public void copyUntilIndexIdentifier(int endIndex, ShreddedProcessor output) throws IOException {
1420                 while (getReadIndex() < endIndex) {
1421                     output.processIdentifier(getIdentifier());
1422                     assert getIdentifierEndIndex() <= endIndex;
1423                     copyTuples(getIdentifierEndIndex(), output);
1424                     incrementIdentifier();
1425                 }
1426             }  
1427             public void copyUntilIdentifier(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
1428                 while (!isAtEnd()) {
1429                     if (other != null) {   
1430                         assert !other.isAtEnd();
1431                         int c = + Utility.compare(getIdentifier(), other.getIdentifier());
1432                     
1433                         if (c > 0) {
1434                             break;   
1435                         }
1436                         
1437                         output.processIdentifier(getIdentifier());
1438                                       
1439                         copyTuples(getIdentifierEndIndex(), output);
1440                     } else {
1441                         output.processIdentifier(getIdentifier());
1442                         copyTuples(getIdentifierEndIndex(), output);
1443                     }
1444                     incrementIdentifier();  
1445                     
1446                
1447                 }
1448             }
1449             public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
1450                 copyUntilIdentifier(other, output);
1451             }
1452             
1453         }                         
1454         public static class ShreddedCombiner implements ReaderSource<DocumentData>, ShreddedSource {   
1455             public ShreddedProcessor processor;
1456             Collection<ShreddedReader> readers;       
1457             boolean closeOnExit = false;
1458             boolean uninitialized = true;
1459             PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
1460             
1461             public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
1462                 this.readers = readers;                                                       
1463                 this.closeOnExit = closeOnExit;
1464             }
1465                                   
1466             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
1467                 if (processor instanceof ShreddedProcessor) {
1468                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
1469                 } else if (processor instanceof DocumentData.Processor) {
1470                     this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentData.Processor) processor));
1471                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
1472                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentData>) processor));
1473                 } else {
1474                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
1475                 }
1476             }                                
1477             
1478             public Class<DocumentData> getOutputClass() {
1479                 return DocumentData.class;
1480             }
1481             
1482             public void initialize() throws IOException {
1483                 for (ShreddedReader reader : readers) {
1484                     reader.fill();                                        
1485                     
1486                     if (!reader.getBuffer().isAtEnd())
1487                         queue.add(reader);
1488                 }   
1489 
1490                 uninitialized = false;
1491             }
1492 
1493             public void run() throws IOException {
1494                 initialize();
1495                
1496                 while (queue.size() > 0) {
1497                     ShreddedReader top = queue.poll();
1498                     ShreddedReader next = null;
1499                     ShreddedBuffer nextBuffer = null; 
1500                     
1501                     assert !top.getBuffer().isAtEnd();
1502                                                   
1503                     if (queue.size() > 0) {
1504                         next = queue.peek();
1505                         nextBuffer = next.getBuffer();
1506                         assert !nextBuffer.isAtEnd();
1507                     }
1508                     
1509                     top.getBuffer().copyUntil(nextBuffer, processor);
1510                     if (top.getBuffer().isAtEnd())
1511                         top.fill();                 
1512                         
1513                     if (!top.getBuffer().isAtEnd())
1514                         queue.add(top);
1515                 }              
1516                 
1517                 if (closeOnExit)
1518                     processor.close();
1519             }
1520 
1521             public DocumentData read() throws IOException {
1522                 if (uninitialized)
1523                     initialize();
1524 
1525                 DocumentData result = null;
1526 
1527                 while (queue.size() > 0) {
1528                     ShreddedReader top = queue.poll();
1529                     result = top.read();
1530 
1531                     if (result != null) {
1532                         if (top.getBuffer().isAtEnd())
1533                             top.fill();
1534 
1535                         queue.offer(top);
1536                         break;
1537                     } 
1538                 }
1539 
1540                 return result;
1541             }
1542         } 
1543         public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<DocumentData>, ShreddedSource {      
1544             public ShreddedProcessor processor;
1545             ShreddedBuffer buffer;
1546             DocumentData last = new DocumentData();         
1547             long updateIdentifierCount = -1;
1548             long tupleCount = 0;
1549             long bufferStartCount = 0;  
1550             ArrayInput input;
1551             
1552             public ShreddedReader(ArrayInput input) {
1553                 this.input = input; 
1554                 this.buffer = new ShreddedBuffer();
1555             }                               
1556             
1557             public ShreddedReader(ArrayInput input, int bufferSize) { 
1558                 this.input = input;
1559                 this.buffer = new ShreddedBuffer(bufferSize);
1560             }
1561                  
1562             public final int compareTo(ShreddedReader other) {
1563                 ShreddedBuffer otherBuffer = other.getBuffer();
1564                 
1565                 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
1566                     return 0;                 
1567                 } else if (buffer.isAtEnd()) {
1568                     return -1;
1569                 } else if (otherBuffer.isAtEnd()) {
1570                     return 1;
1571                 }
1572                                    
1573                 int result = 0;
1574                 do {
1575                     result = + Utility.compare(buffer.getIdentifier(), otherBuffer.getIdentifier());
1576                     if(result != 0) break;
1577                 } while (false);                                             
1578                 
1579                 return result;
1580             }
1581             
1582             public final ShreddedBuffer getBuffer() {
1583                 return buffer;
1584             }                
1585             
1586             public final DocumentData read() throws IOException {
1587                 if (buffer.isAtEnd()) {
1588                     fill();             
1589                 
1590                     if (buffer.isAtEnd()) {
1591                         return null;
1592                     }
1593                 }
1594                       
1595                 assert !buffer.isAtEnd();
1596                 DocumentData result = new DocumentData();
1597                 
1598                 result.identifier = buffer.getIdentifier();
1599                 result.url = buffer.getUrl();
1600                 result.textLength = buffer.getTextLength();
1601                 
1602                 buffer.incrementTuple();
1603                 buffer.autoIncrementIdentifier();
1604                 
1605                 return result;
1606             }           
1607             
1608             public final void fill() throws IOException {
1609                 try {   
1610                     buffer.reset();
1611                     
1612                     if (tupleCount != 0) {
1613                                                       
1614                         if(updateIdentifierCount - tupleCount > 0) {
1615                             buffer.identifiers.add(last.identifier);
1616                             buffer.identifierTupleIdx.add((int) (updateIdentifierCount - tupleCount));
1617                         }
1618                         bufferStartCount = tupleCount;
1619                     }
1620                     
1621                     while (!buffer.isFull()) {
1622                         updateIdentifier();
1623                         buffer.processTuple(input.readString(), input.readInt());
1624                         tupleCount++;
1625                     }
1626                 } catch(EOFException e) {}
1627             }
1628 
1629             public final void updateIdentifier() throws IOException {
1630                 if (updateIdentifierCount > tupleCount)
1631                     return;
1632                      
1633                 last.identifier = input.readString();
1634                 updateIdentifierCount = tupleCount + input.readInt();
1635                                       
1636                 buffer.processIdentifier(last.identifier);
1637             }
1638 
1639             public void run() throws IOException {
1640                 while (true) {
1641                     fill();
1642                     
1643                     if (buffer.isAtEnd())
1644                         break;
1645                     
1646                     buffer.copyUntil(null, processor);
1647                 }      
1648                 processor.close();
1649             }
1650             
1651             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
1652                 if (processor instanceof ShreddedProcessor) {
1653                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
1654                 } else if (processor instanceof DocumentData.Processor) {
1655                     this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentData.Processor) processor));
1656                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
1657                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentData>) processor));
1658                 } else {
1659                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
1660                 }
1661             }                                
1662             
1663             public Class<DocumentData> getOutputClass() {
1664                 return DocumentData.class;
1665             }                
1666         }
1667         
1668         public static class DuplicateEliminator implements ShreddedProcessor {
1669             public ShreddedProcessor processor;
1670             DocumentData last = new DocumentData();
1671             boolean identifierProcess = true;
1672                                            
1673             public DuplicateEliminator() {}
1674             public DuplicateEliminator(ShreddedProcessor processor) {
1675                 this.processor = processor;
1676             }
1677             
1678             public void setShreddedProcessor(ShreddedProcessor processor) {
1679                 this.processor = processor;
1680             }
1681 
1682             public void processIdentifier(String identifier) throws IOException {  
1683                 if (identifierProcess || Utility.compare(identifier, last.identifier) != 0) {
1684                     last.identifier = identifier;
1685                     processor.processIdentifier(identifier);
1686                     identifierProcess = false;
1687                 }
1688             }  
1689             
1690             public void resetIdentifier() {
1691                  identifierProcess = true;
1692             }                                                
1693                                
1694             public void processTuple(String url, int textLength) throws IOException {
1695                 processor.processTuple(url, textLength);
1696             } 
1697             
1698             public void close() throws IOException {
1699                 processor.close();
1700             }                    
1701         }
1702         public static class TupleUnshredder implements ShreddedProcessor {
1703             DocumentData last = new DocumentData();
1704             public org.galagosearch.tupleflow.Processor<DocumentData> processor;                               
1705             
1706             public TupleUnshredder(DocumentData.Processor processor) {
1707                 this.processor = processor;
1708             }         
1709             
1710             public TupleUnshredder(org.galagosearch.tupleflow.Processor<DocumentData> processor) {
1711                 this.processor = processor;
1712             }
1713             
1714             public DocumentData clone(DocumentData object) {
1715                 DocumentData result = new DocumentData();
1716                 if (object == null) return result;
1717                 result.identifier = object.identifier; 
1718                 result.url = object.url; 
1719                 result.textLength = object.textLength; 
1720                 return result;
1721             }                 
1722             
1723             public void processIdentifier(String identifier) throws IOException {
1724                 last.identifier = identifier;
1725             }   
1726                 
1727             
1728             public void processTuple(String url, int textLength) throws IOException {
1729                 last.url = url;
1730                 last.textLength = textLength;
1731                 processor.process(clone(last));
1732             }               
1733             
1734             public void close() throws IOException {
1735                 processor.close();
1736             }
1737         }     
1738         public static class TupleShredder implements Processor {
1739             DocumentData last = new DocumentData();
1740             public ShreddedProcessor processor;
1741             
1742             public TupleShredder(ShreddedProcessor processor) {
1743                 this.processor = processor;
1744             }                              
1745             
1746             public DocumentData clone(DocumentData object) {
1747                 DocumentData result = new DocumentData();
1748                 if (object == null) return result;
1749                 result.identifier = object.identifier; 
1750                 result.url = object.url; 
1751                 result.textLength = object.textLength; 
1752                 return result;
1753             }                 
1754             
1755             public void process(DocumentData object) throws IOException {                                                                                                                                                   
1756                 boolean processAll = false;
1757                 if(last == null || Utility.compare(last.identifier, object.identifier) != 0 || processAll) { processor.processIdentifier(object.identifier); processAll = true; }
1758                 processor.processTuple(object.url, object.textLength);                                         
1759             }
1760                           
1761             public Class<DocumentData> getInputClass() {
1762                 return DocumentData.class;
1763             }
1764             
1765             public void close() throws IOException {
1766                 processor.close();
1767             }                     
1768         }
1769     } 
1770 }