View Javadoc

1   // This file was automatically generated with the command: 
2   //     java org.galagosearch.tupleflow.typebuilder.TypeBuilderMojo ...
3   package org.galagosearch.core.types;
4   
5   import org.galagosearch.tupleflow.Utility;
6   import org.galagosearch.tupleflow.ArrayInput;
7   import org.galagosearch.tupleflow.ArrayOutput;
8   import org.galagosearch.tupleflow.Order;   
9   import org.galagosearch.tupleflow.OrderedWriter;
10  import org.galagosearch.tupleflow.Type; 
11  import org.galagosearch.tupleflow.TypeReader;
12  import org.galagosearch.tupleflow.Step; 
13  import org.galagosearch.tupleflow.IncompatibleProcessorException;
14  import org.galagosearch.tupleflow.ReaderSource;
15  import java.io.IOException;             
16  import java.io.EOFException;
17  import java.io.UnsupportedEncodingException;
18  import java.util.ArrayList;
19  import java.util.Arrays;   
20  import java.util.Comparator;
21  import java.util.PriorityQueue;
22  import java.util.Collection;
23  
24  public class AdditionalDocumentText implements Type<AdditionalDocumentText> {
25      public String identifier;
26      public String text; 
27      
/** Creates a tuple whose {@code identifier} and {@code text} are both left unset. */
public AdditionalDocumentText() {
}

/**
 * Creates a tuple holding the supplied values.
 *
 * @param identifier value stored in the {@code identifier} field
 * @param text       value stored in the {@code text} field
 */
public AdditionalDocumentText(String identifier, String text) {
    this.text = text;
    this.identifier = identifier;
}
33      
34      public String toString() {
35              return String.format("%s,%s",
36                                     identifier, text);
37      } 
38  
39      public Order<AdditionalDocumentText> getOrder(String... spec) {
40          if (Arrays.equals(spec, new String[] {  })) {
41              return new Unordered();
42          }
43          if (Arrays.equals(spec, new String[] { "+identifier" })) {
44              return new IdentifierOrder();
45          }
46          return null;
47      } 
48        
49      public interface Processor extends Step, org.galagosearch.tupleflow.Processor<AdditionalDocumentText> {
50          public void process(AdditionalDocumentText object) throws IOException;
51          public void close() throws IOException;
52      }                        
53      public interface Source extends Step {
54      }
55      public static class Unordered implements Order<AdditionalDocumentText> {
56          public int hash(AdditionalDocumentText object) {
57              int h = 0;
58              return h;
59          } 
60          public Comparator<AdditionalDocumentText> greaterThan() {
61              return new Comparator<AdditionalDocumentText>() {
62                  public int compare(AdditionalDocumentText one, AdditionalDocumentText two) {
63                      int result = 0;
64                      do {
65                      } while (false);
66                      return -result;
67                  }
68              };
69          }     
70          public Comparator<AdditionalDocumentText> lessThan() {
71              return new Comparator<AdditionalDocumentText>() {
72                  public int compare(AdditionalDocumentText one, AdditionalDocumentText two) {
73                      int result = 0;
74                      do {
75                      } while (false);
76                      return result;
77                  }
78              };
79          }     
80          public TypeReader<AdditionalDocumentText> orderedReader(ArrayInput _input) {
81              return new ShreddedReader(_input);
82          }    
83  
84          public TypeReader<AdditionalDocumentText> orderedReader(ArrayInput _input, int bufferSize) {
85              return new ShreddedReader(_input, bufferSize);
86          }    
87          public OrderedWriter<AdditionalDocumentText> orderedWriter(ArrayOutput _output) {
88              ShreddedWriter w = new ShreddedWriter(_output);
89              return new OrderedWriterClass(w); 
90          }                                    
91          public static class OrderedWriterClass extends OrderedWriter< AdditionalDocumentText > {
92              AdditionalDocumentText last = null;
93              ShreddedWriter shreddedWriter = null; 
94              
95              public OrderedWriterClass(ShreddedWriter s) {
96                  this.shreddedWriter = s;
97              }
98              
99              public void process(AdditionalDocumentText object) throws IOException {
100                boolean processAll = false;
101                shreddedWriter.processTuple(object.identifier, object.text);
102                last = object;
103             }           
104                  
105             public void close() throws IOException {
106                 shreddedWriter.close();
107             }
108             
109             public Class<AdditionalDocumentText> getInputClass() {
110                 return AdditionalDocumentText.class;
111             }
112         } 
113         public ReaderSource<AdditionalDocumentText> orderedCombiner(Collection<TypeReader<AdditionalDocumentText>> readers, boolean closeOnExit) {
114             ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
115             
116             for (TypeReader<AdditionalDocumentText> reader : readers) {
117                 shreddedReaders.add((ShreddedReader)reader);
118             }
119             
120             return new ShreddedCombiner(shreddedReaders, closeOnExit);
121         }                  
122         public AdditionalDocumentText clone(AdditionalDocumentText object) {
123             AdditionalDocumentText result = new AdditionalDocumentText();
124             if (object == null) return result;
125             result.identifier = object.identifier; 
126             result.text = object.text; 
127             return result;
128         }                 
129         public Class<AdditionalDocumentText> getOrderedClass() {
130             return AdditionalDocumentText.class;
131         }                           
132         public String[] getOrderSpec() {
133             return new String[] {};
134         }
135 
136         public static String getSpecString() {
137             return "";
138         }
139                            
140         public interface ShreddedProcessor extends Step {
141             public void processTuple(String identifier, String text) throws IOException;
142             public void close() throws IOException;
143         }    
144         public interface ShreddedSource extends Step {
145         }                                              
146         
147         public static class ShreddedWriter implements ShreddedProcessor {
148             ArrayOutput output;
149             ShreddedBuffer buffer = new ShreddedBuffer();
150             boolean lastFlush = false;
151             
152             public ShreddedWriter(ArrayOutput output) {
153                 this.output = output;
154             }                        
155             
156             public void close() throws IOException {
157                 flush();
158             }
159             
160             public final void processTuple(String identifier, String text) throws IOException {
161                 if (lastFlush) {
162                     lastFlush = false;
163                 }
164                 buffer.processTuple(identifier, text);
165                 if (buffer.isFull())
166                     flush();
167             }
168             public final void flushTuples(int pauseIndex) throws IOException {
169                 
170                 while (buffer.getReadIndex() < pauseIndex) {
171                            
172                     output.writeString(buffer.getIdentifier());
173                     output.writeString(buffer.getText());
174                     buffer.incrementTuple();
175                 }
176             }  
177             public void flush() throws IOException { 
178                 flushTuples(buffer.getWriteIndex());
179                 buffer.reset(); 
180                 lastFlush = true;
181             }                           
182         }
183         public static class ShreddedBuffer {
184                             
185             String[] identifiers;
186             String[] texts;
187             int writeTupleIndex = 0;
188             int readTupleIndex = 0;
189             int batchSize;
190 
191             public ShreddedBuffer(int batchSize) {
192                 this.batchSize = batchSize;
193 
194                 identifiers = new String[batchSize];
195                 texts = new String[batchSize];
196             }                              
197 
198             public ShreddedBuffer() {    
199                 this(10000);
200             }                                                                                                                    
201             
202             public void processTuple(String identifier, String text) {
203                 identifiers[writeTupleIndex] = identifier;
204                 texts[writeTupleIndex] = text;
205                 writeTupleIndex++;
206             }
207             public void resetData() {
208                 writeTupleIndex = 0;
209             }                  
210                                  
211             public void resetRead() {
212                 readTupleIndex = 0;
213             } 
214 
215             public void reset() {
216                 resetData();
217                 resetRead();
218             } 
219             public boolean isFull() {
220                 return writeTupleIndex >= batchSize;
221             }
222 
223             public boolean isEmpty() {
224                 return writeTupleIndex == 0;
225             }                          
226 
227             public boolean isAtEnd() {
228                 return readTupleIndex >= writeTupleIndex;
229             }           
230             public void incrementTuple() {
231                 readTupleIndex++;
232             }                    
233             public int getReadIndex() {
234                 return readTupleIndex;
235             }   
236 
237             public int getWriteIndex() {
238                 return writeTupleIndex;
239             } 
240             public String getIdentifier() {
241                 assert readTupleIndex < writeTupleIndex;
242                 return identifiers[readTupleIndex];
243             }                                         
244             public String getText() {
245                 assert readTupleIndex < writeTupleIndex;
246                 return texts[readTupleIndex];
247             }                                         
248             public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
249                 while (getReadIndex() < endIndex) {
250                    output.processTuple(getIdentifier(), getText());
251                    incrementTuple();
252                 }
253             }                                                                           
254              
255             public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
256             }
257             
258         }                         
259         public static class ShreddedCombiner implements ReaderSource<AdditionalDocumentText>, ShreddedSource {   
260             public ShreddedProcessor processor;
261             Collection<ShreddedReader> readers;       
262             boolean closeOnExit = false;
263             boolean uninitialized = true;
264             PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
265             
266             public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
267                 this.readers = readers;                                                       
268                 this.closeOnExit = closeOnExit;
269             }
270                                   
271             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
272                 if (processor instanceof ShreddedProcessor) {
273                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
274                 } else if (processor instanceof AdditionalDocumentText.Processor) {
275                     this.processor = new DuplicateEliminator(new TupleUnshredder((AdditionalDocumentText.Processor) processor));
276                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
277                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<AdditionalDocumentText>) processor));
278                 } else {
279                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
280                 }
281             }                                
282             
283             public Class<AdditionalDocumentText> getOutputClass() {
284                 return AdditionalDocumentText.class;
285             }
286             
287             public void initialize() throws IOException {
288                 for (ShreddedReader reader : readers) {
289                     reader.fill();                                        
290                     
291                     if (!reader.getBuffer().isAtEnd())
292                         queue.add(reader);
293                 }   
294 
295                 uninitialized = false;
296             }
297 
298             public void run() throws IOException {
299                 initialize();
300                
301                 while (queue.size() > 0) {
302                     ShreddedReader top = queue.poll();
303                     ShreddedReader next = null;
304                     ShreddedBuffer nextBuffer = null; 
305                     
306                     assert !top.getBuffer().isAtEnd();
307                                                   
308                     if (queue.size() > 0) {
309                         next = queue.peek();
310                         nextBuffer = next.getBuffer();
311                         assert !nextBuffer.isAtEnd();
312                     }
313                     
314                     top.getBuffer().copyUntil(nextBuffer, processor);
315                     if (top.getBuffer().isAtEnd())
316                         top.fill();                 
317                         
318                     if (!top.getBuffer().isAtEnd())
319                         queue.add(top);
320                 }              
321                 
322                 if (closeOnExit)
323                     processor.close();
324             }
325 
326             public AdditionalDocumentText read() throws IOException {
327                 if (uninitialized)
328                     initialize();
329 
330                 AdditionalDocumentText result = null;
331 
332                 while (queue.size() > 0) {
333                     ShreddedReader top = queue.poll();
334                     result = top.read();
335 
336                     if (result != null) {
337                         if (top.getBuffer().isAtEnd())
338                             top.fill();
339 
340                         queue.offer(top);
341                         break;
342                     } 
343                 }
344 
345                 return result;
346             }
347         } 
348         public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<AdditionalDocumentText>, ShreddedSource {      
349             public ShreddedProcessor processor;
350             ShreddedBuffer buffer;
351             AdditionalDocumentText last = new AdditionalDocumentText();         
352             long tupleCount = 0;
353             long bufferStartCount = 0;  
354             ArrayInput input;
355             
356             public ShreddedReader(ArrayInput input) {
357                 this.input = input; 
358                 this.buffer = new ShreddedBuffer();
359             }                               
360             
361             public ShreddedReader(ArrayInput input, int bufferSize) { 
362                 this.input = input;
363                 this.buffer = new ShreddedBuffer(bufferSize);
364             }
365                  
366             public final int compareTo(ShreddedReader other) {
367                 ShreddedBuffer otherBuffer = other.getBuffer();
368                 
369                 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
370                     return 0;                 
371                 } else if (buffer.isAtEnd()) {
372                     return -1;
373                 } else if (otherBuffer.isAtEnd()) {
374                     return 1;
375                 }
376                                    
377                 int result = 0;
378                 do {
379                 } while (false);                                             
380                 
381                 return result;
382             }
383             
384             public final ShreddedBuffer getBuffer() {
385                 return buffer;
386             }                
387             
388             public final AdditionalDocumentText read() throws IOException {
389                 if (buffer.isAtEnd()) {
390                     fill();             
391                 
392                     if (buffer.isAtEnd()) {
393                         return null;
394                     }
395                 }
396                       
397                 assert !buffer.isAtEnd();
398                 AdditionalDocumentText result = new AdditionalDocumentText();
399                 
400                 result.identifier = buffer.getIdentifier();
401                 result.text = buffer.getText();
402                 
403                 buffer.incrementTuple();
404                 
405                 return result;
406             }           
407             
408             public final void fill() throws IOException {
409                 try {   
410                     buffer.reset();
411                     
412                     if (tupleCount != 0) {
413                         bufferStartCount = tupleCount;
414                     }
415                     
416                     while (!buffer.isFull()) {
417                         buffer.processTuple(input.readString(), input.readString());
418                         tupleCount++;
419                     }
420                 } catch(EOFException e) {}
421             }
422 
423 
424             public void run() throws IOException {
425                 while (true) {
426                     fill();
427                     
428                     if (buffer.isAtEnd())
429                         break;
430                     
431                     buffer.copyUntil(null, processor);
432                 }      
433                 processor.close();
434             }
435             
436             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
437                 if (processor instanceof ShreddedProcessor) {
438                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
439                 } else if (processor instanceof AdditionalDocumentText.Processor) {
440                     this.processor = new DuplicateEliminator(new TupleUnshredder((AdditionalDocumentText.Processor) processor));
441                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
442                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<AdditionalDocumentText>) processor));
443                 } else {
444                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
445                 }
446             }                                
447             
448             public Class<AdditionalDocumentText> getOutputClass() {
449                 return AdditionalDocumentText.class;
450             }                
451         }
452         
453         public static class DuplicateEliminator implements ShreddedProcessor {
454             public ShreddedProcessor processor;
455             AdditionalDocumentText last = new AdditionalDocumentText();
456                                            
457             public DuplicateEliminator() {}
458             public DuplicateEliminator(ShreddedProcessor processor) {
459                 this.processor = processor;
460             }
461             
462             public void setShreddedProcessor(ShreddedProcessor processor) {
463                 this.processor = processor;
464             }
465 
466           
467             
468                                
469             public void processTuple(String identifier, String text) throws IOException {
470                 processor.processTuple(identifier, text);
471             } 
472             
473             public void close() throws IOException {
474                 processor.close();
475             }                    
476         }
477         public static class TupleUnshredder implements ShreddedProcessor {
478             AdditionalDocumentText last = new AdditionalDocumentText();
479             public org.galagosearch.tupleflow.Processor<AdditionalDocumentText> processor;                               
480             
481             public TupleUnshredder(AdditionalDocumentText.Processor processor) {
482                 this.processor = processor;
483             }         
484             
485             public TupleUnshredder(org.galagosearch.tupleflow.Processor<AdditionalDocumentText> processor) {
486                 this.processor = processor;
487             }
488             
489             public AdditionalDocumentText clone(AdditionalDocumentText object) {
490                 AdditionalDocumentText result = new AdditionalDocumentText();
491                 if (object == null) return result;
492                 result.identifier = object.identifier; 
493                 result.text = object.text; 
494                 return result;
495             }                 
496             
497             
498             public void processTuple(String identifier, String text) throws IOException {
499                 last.identifier = identifier;
500                 last.text = text;
501                 processor.process(clone(last));
502             }               
503             
504             public void close() throws IOException {
505                 processor.close();
506             }
507         }     
508         public static class TupleShredder implements Processor {
509             AdditionalDocumentText last = new AdditionalDocumentText();
510             public ShreddedProcessor processor;
511             
512             public TupleShredder(ShreddedProcessor processor) {
513                 this.processor = processor;
514             }                              
515             
516             public AdditionalDocumentText clone(AdditionalDocumentText object) {
517                 AdditionalDocumentText result = new AdditionalDocumentText();
518                 if (object == null) return result;
519                 result.identifier = object.identifier; 
520                 result.text = object.text; 
521                 return result;
522             }                 
523             
524             public void process(AdditionalDocumentText object) throws IOException {                                                                                                                                                   
525                 boolean processAll = false;
526                 processor.processTuple(object.identifier, object.text);                                         
527             }
528                           
529             public Class<AdditionalDocumentText> getInputClass() {
530                 return AdditionalDocumentText.class;
531             }
532             
533             public void close() throws IOException {
534                 processor.close();
535             }                     
536         }
537     } 
538     public static class IdentifierOrder implements Order<AdditionalDocumentText> {
539         public int hash(AdditionalDocumentText object) {
540             int h = 0;
541             h += Utility.hash(object.identifier);
542             return h;
543         } 
544         public Comparator<AdditionalDocumentText> greaterThan() {
545             return new Comparator<AdditionalDocumentText>() {
546                 public int compare(AdditionalDocumentText one, AdditionalDocumentText two) {
547                     int result = 0;
548                     do {
549                         result = + Utility.compare(one.identifier, two.identifier);
550                         if(result != 0) break;
551                     } while (false);
552                     return -result;
553                 }
554             };
555         }     
556         public Comparator<AdditionalDocumentText> lessThan() {
557             return new Comparator<AdditionalDocumentText>() {
558                 public int compare(AdditionalDocumentText one, AdditionalDocumentText two) {
559                     int result = 0;
560                     do {
561                         result = + Utility.compare(one.identifier, two.identifier);
562                         if(result != 0) break;
563                     } while (false);
564                     return result;
565                 }
566             };
567         }     
568         public TypeReader<AdditionalDocumentText> orderedReader(ArrayInput _input) {
569             return new ShreddedReader(_input);
570         }    
571 
572         public TypeReader<AdditionalDocumentText> orderedReader(ArrayInput _input, int bufferSize) {
573             return new ShreddedReader(_input, bufferSize);
574         }    
575         public OrderedWriter<AdditionalDocumentText> orderedWriter(ArrayOutput _output) {
576             ShreddedWriter w = new ShreddedWriter(_output);
577             return new OrderedWriterClass(w); 
578         }                                    
579         public static class OrderedWriterClass extends OrderedWriter< AdditionalDocumentText > {
580             AdditionalDocumentText last = null;
581             ShreddedWriter shreddedWriter = null; 
582             
583             public OrderedWriterClass(ShreddedWriter s) {
584                 this.shreddedWriter = s;
585             }
586             
587             public void process(AdditionalDocumentText object) throws IOException {
588                boolean processAll = false;
589                if (processAll || last == null || 0 != Utility.compare(object.identifier, last.identifier)) { processAll = true; shreddedWriter.processIdentifier(object.identifier); }
590                shreddedWriter.processTuple(object.text);
591                last = object;
592             }           
593                  
594             public void close() throws IOException {
595                 shreddedWriter.close();
596             }
597             
598             public Class<AdditionalDocumentText> getInputClass() {
599                 return AdditionalDocumentText.class;
600             }
601         } 
602         public ReaderSource<AdditionalDocumentText> orderedCombiner(Collection<TypeReader<AdditionalDocumentText>> readers, boolean closeOnExit) {
603             ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
604             
605             for (TypeReader<AdditionalDocumentText> reader : readers) {
606                 shreddedReaders.add((ShreddedReader)reader);
607             }
608             
609             return new ShreddedCombiner(shreddedReaders, closeOnExit);
610         }                  
611         public AdditionalDocumentText clone(AdditionalDocumentText object) {
612             AdditionalDocumentText result = new AdditionalDocumentText();
613             if (object == null) return result;
614             result.identifier = object.identifier; 
615             result.text = object.text; 
616             return result;
617         }                 
618         public Class<AdditionalDocumentText> getOrderedClass() {
619             return AdditionalDocumentText.class;
620         }                           
621         public String[] getOrderSpec() {
622             return new String[] {"+identifier"};
623         }
624 
625         public static String getSpecString() {
626             return "+identifier";
627         }
628                            
629         public interface ShreddedProcessor extends Step {
630             public void processIdentifier(String identifier) throws IOException;
631             public void processTuple(String text) throws IOException;
632             public void close() throws IOException;
633         }    
634         public interface ShreddedSource extends Step {
635         }                                              
636         
637         public static class ShreddedWriter implements ShreddedProcessor {
638             ArrayOutput output;
639             ShreddedBuffer buffer = new ShreddedBuffer();
640             String lastIdentifier;
641             boolean lastFlush = false;
642             
643             public ShreddedWriter(ArrayOutput output) {
644                 this.output = output;
645             }                        
646             
647             public void close() throws IOException {
648                 flush();
649             }
650             
651             public void processIdentifier(String identifier) {
652                 lastIdentifier = identifier;
653                 buffer.processIdentifier(identifier);
654             }
655             public final void processTuple(String text) throws IOException {
656                 if (lastFlush) {
657                     if(buffer.identifiers.size() == 0) buffer.processIdentifier(lastIdentifier);
658                     lastFlush = false;
659                 }
660                 buffer.processTuple(text);
661                 if (buffer.isFull())
662                     flush();
663             }
664             public final void flushTuples(int pauseIndex) throws IOException {
665                 
666                 while (buffer.getReadIndex() < pauseIndex) {
667                            
668                     output.writeString(buffer.getText());
669                     buffer.incrementTuple();
670                 }
671             }  
672             public final void flushIdentifier(int pauseIndex) throws IOException {
673                 while (buffer.getReadIndex() < pauseIndex) {
674                     int nextPause = buffer.getIdentifierEndIndex();
675                     int count = nextPause - buffer.getReadIndex();
676                     
677                     output.writeString(buffer.getIdentifier());
678                     output.writeInt(count);
679                     buffer.incrementIdentifier();
680                       
681                     flushTuples(nextPause);
682                     assert nextPause == buffer.getReadIndex();
683                 }
684             }
685             public void flush() throws IOException { 
686                 flushIdentifier(buffer.getWriteIndex());
687                 buffer.reset(); 
688                 lastFlush = true;
689             }                           
690         }
691         public static class ShreddedBuffer {
692             ArrayList<String> identifiers = new ArrayList();
693             ArrayList<Integer> identifierTupleIdx = new ArrayList();
694             int identifierReadIdx = 0;
695                             
696             String[] texts;
697             int writeTupleIndex = 0;
698             int readTupleIndex = 0;
699             int batchSize;
700 
701             public ShreddedBuffer(int batchSize) {
702                 this.batchSize = batchSize;
703 
704                 texts = new String[batchSize];
705             }                              
706 
707             public ShreddedBuffer() {    
708                 this(10000);
709             }                                                                                                                    
710             
711             public void processIdentifier(String identifier) {
712                 identifiers.add(identifier);
713                 identifierTupleIdx.add(writeTupleIndex);
714             }                                      
715             public void processTuple(String text) {
716                 assert identifiers.size() > 0;
717                 texts[writeTupleIndex] = text;
718                 writeTupleIndex++;
719             }
720             public void resetData() {
721                 identifiers.clear();
722                 identifierTupleIdx.clear();
723                 writeTupleIndex = 0;
724             }                  
725                                  
726             public void resetRead() {
727                 readTupleIndex = 0;
728                 identifierReadIdx = 0;
729             } 
730 
731             public void reset() {
732                 resetData();
733                 resetRead();
734             } 
735             public boolean isFull() {
736                 return writeTupleIndex >= batchSize;
737             }
738 
739             public boolean isEmpty() {
740                 return writeTupleIndex == 0;
741             }                          
742 
743             public boolean isAtEnd() {
744                 return readTupleIndex >= writeTupleIndex;
745             }           
746             public void incrementIdentifier() {
747                 identifierReadIdx++;  
748             }                                                                                              
749 
750             public void autoIncrementIdentifier() {
751                 while (readTupleIndex >= getIdentifierEndIndex() && readTupleIndex < writeTupleIndex)
752                     identifierReadIdx++;
753             }                 
754             public void incrementTuple() {
755                 readTupleIndex++;
756             }                    
757             public int getIdentifierEndIndex() {
758                 if ((identifierReadIdx+1) >= identifierTupleIdx.size())
759                     return writeTupleIndex;
760                 return identifierTupleIdx.get(identifierReadIdx+1);
761             }
762             public int getReadIndex() {
763                 return readTupleIndex;
764             }   
765 
766             public int getWriteIndex() {
767                 return writeTupleIndex;
768             } 
769             public String getIdentifier() {
770                 assert readTupleIndex < writeTupleIndex;
771                 assert identifierReadIdx < identifiers.size();
772                 
773                 return identifiers.get(identifierReadIdx);
774             }
775             public String getText() {
776                 assert readTupleIndex < writeTupleIndex;
777                 return texts[readTupleIndex];
778             }                                         
779             public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
780                 while (getReadIndex() < endIndex) {
781                    output.processTuple(getText());
782                    incrementTuple();
783                 }
784             }                                                                           
785             public void copyUntilIndexIdentifier(int endIndex, ShreddedProcessor output) throws IOException {
786                 while (getReadIndex() < endIndex) {
787                     output.processIdentifier(getIdentifier());
788                     assert getIdentifierEndIndex() <= endIndex;
789                     copyTuples(getIdentifierEndIndex(), output);
790                     incrementIdentifier();
791                 }
792             }  
793             public void copyUntilIdentifier(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
794                 while (!isAtEnd()) {
795                     if (other != null) {   
796                         assert !other.isAtEnd();
797                         int c = + Utility.compare(getIdentifier(), other.getIdentifier());
798                     
799                         if (c > 0) {
800                             break;   
801                         }
802                         
803                         output.processIdentifier(getIdentifier());
804                                       
805                         copyTuples(getIdentifierEndIndex(), output);
806                     } else {
807                         output.processIdentifier(getIdentifier());
808                         copyTuples(getIdentifierEndIndex(), output);
809                     }
810                     incrementIdentifier();  
811                     
812                
813                 }
814             }
815             public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
816                 copyUntilIdentifier(other, output);
817             }
818             
819         }                         
820         public static class ShreddedCombiner implements ReaderSource<AdditionalDocumentText>, ShreddedSource {   
821             public ShreddedProcessor processor;
822             Collection<ShreddedReader> readers;       
823             boolean closeOnExit = false;
824             boolean uninitialized = true;
825             PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
826             
827             public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
828                 this.readers = readers;                                                       
829                 this.closeOnExit = closeOnExit;
830             }
831                                   
832             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
833                 if (processor instanceof ShreddedProcessor) {
834                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
835                 } else if (processor instanceof AdditionalDocumentText.Processor) {
836                     this.processor = new DuplicateEliminator(new TupleUnshredder((AdditionalDocumentText.Processor) processor));
837                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
838                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<AdditionalDocumentText>) processor));
839                 } else {
840                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
841                 }
842             }                                
843             
844             public Class<AdditionalDocumentText> getOutputClass() {
845                 return AdditionalDocumentText.class;
846             }
847             
848             public void initialize() throws IOException {
849                 for (ShreddedReader reader : readers) {
850                     reader.fill();                                        
851                     
852                     if (!reader.getBuffer().isAtEnd())
853                         queue.add(reader);
854                 }   
855 
856                 uninitialized = false;
857             }
858 
859             public void run() throws IOException {
860                 initialize();
861                
862                 while (queue.size() > 0) {
863                     ShreddedReader top = queue.poll();
864                     ShreddedReader next = null;
865                     ShreddedBuffer nextBuffer = null; 
866                     
867                     assert !top.getBuffer().isAtEnd();
868                                                   
869                     if (queue.size() > 0) {
870                         next = queue.peek();
871                         nextBuffer = next.getBuffer();
872                         assert !nextBuffer.isAtEnd();
873                     }
874                     
875                     top.getBuffer().copyUntil(nextBuffer, processor);
876                     if (top.getBuffer().isAtEnd())
877                         top.fill();                 
878                         
879                     if (!top.getBuffer().isAtEnd())
880                         queue.add(top);
881                 }              
882                 
883                 if (closeOnExit)
884                     processor.close();
885             }
886 
887             public AdditionalDocumentText read() throws IOException {
888                 if (uninitialized)
889                     initialize();
890 
891                 AdditionalDocumentText result = null;
892 
893                 while (queue.size() > 0) {
894                     ShreddedReader top = queue.poll();
895                     result = top.read();
896 
897                     if (result != null) {
898                         if (top.getBuffer().isAtEnd())
899                             top.fill();
900 
901                         queue.offer(top);
902                         break;
903                     } 
904                 }
905 
906                 return result;
907             }
908         } 
909         public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<AdditionalDocumentText>, ShreddedSource {      
910             public ShreddedProcessor processor;
911             ShreddedBuffer buffer;
912             AdditionalDocumentText last = new AdditionalDocumentText();         
913             long updateIdentifierCount = -1;
914             long tupleCount = 0;
915             long bufferStartCount = 0;  
916             ArrayInput input;
917             
918             public ShreddedReader(ArrayInput input) {
919                 this.input = input; 
920                 this.buffer = new ShreddedBuffer();
921             }                               
922             
923             public ShreddedReader(ArrayInput input, int bufferSize) { 
924                 this.input = input;
925                 this.buffer = new ShreddedBuffer(bufferSize);
926             }
927                  
928             public final int compareTo(ShreddedReader other) {
929                 ShreddedBuffer otherBuffer = other.getBuffer();
930                 
931                 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
932                     return 0;                 
933                 } else if (buffer.isAtEnd()) {
934                     return -1;
935                 } else if (otherBuffer.isAtEnd()) {
936                     return 1;
937                 }
938                                    
939                 int result = 0;
940                 do {
941                     result = + Utility.compare(buffer.getIdentifier(), otherBuffer.getIdentifier());
942                     if(result != 0) break;
943                 } while (false);                                             
944                 
945                 return result;
946             }
947             
948             public final ShreddedBuffer getBuffer() {
949                 return buffer;
950             }                
951             
952             public final AdditionalDocumentText read() throws IOException {
953                 if (buffer.isAtEnd()) {
954                     fill();             
955                 
956                     if (buffer.isAtEnd()) {
957                         return null;
958                     }
959                 }
960                       
961                 assert !buffer.isAtEnd();
962                 AdditionalDocumentText result = new AdditionalDocumentText();
963                 
964                 result.identifier = buffer.getIdentifier();
965                 result.text = buffer.getText();
966                 
967                 buffer.incrementTuple();
968                 buffer.autoIncrementIdentifier();
969                 
970                 return result;
971             }           
972             
973             public final void fill() throws IOException {
974                 try {   
975                     buffer.reset();
976                     
977                     if (tupleCount != 0) {
978                                                       
979                         if(updateIdentifierCount - tupleCount > 0) {
980                             buffer.identifiers.add(last.identifier);
981                             buffer.identifierTupleIdx.add((int) (updateIdentifierCount - tupleCount));
982                         }
983                         bufferStartCount = tupleCount;
984                     }
985                     
986                     while (!buffer.isFull()) {
987                         updateIdentifier();
988                         buffer.processTuple(input.readString());
989                         tupleCount++;
990                     }
991                 } catch(EOFException e) {}
992             }
993 
994             public final void updateIdentifier() throws IOException {
995                 if (updateIdentifierCount > tupleCount)
996                     return;
997                      
998                 last.identifier = input.readString();
999                 updateIdentifierCount = tupleCount + input.readInt();
1000                                       
1001                 buffer.processIdentifier(last.identifier);
1002             }
1003 
1004             public void run() throws IOException {
1005                 while (true) {
1006                     fill();
1007                     
1008                     if (buffer.isAtEnd())
1009                         break;
1010                     
1011                     buffer.copyUntil(null, processor);
1012                 }      
1013                 processor.close();
1014             }
1015             
1016             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
1017                 if (processor instanceof ShreddedProcessor) {
1018                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
1019                 } else if (processor instanceof AdditionalDocumentText.Processor) {
1020                     this.processor = new DuplicateEliminator(new TupleUnshredder((AdditionalDocumentText.Processor) processor));
1021                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
1022                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<AdditionalDocumentText>) processor));
1023                 } else {
1024                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
1025                 }
1026             }                                
1027             
1028             public Class<AdditionalDocumentText> getOutputClass() {
1029                 return AdditionalDocumentText.class;
1030             }                
1031         }
1032         
1033         public static class DuplicateEliminator implements ShreddedProcessor {
1034             public ShreddedProcessor processor;
1035             AdditionalDocumentText last = new AdditionalDocumentText();
1036             boolean identifierProcess = true;
1037                                            
1038             public DuplicateEliminator() {}
1039             public DuplicateEliminator(ShreddedProcessor processor) {
1040                 this.processor = processor;
1041             }
1042             
1043             public void setShreddedProcessor(ShreddedProcessor processor) {
1044                 this.processor = processor;
1045             }
1046 
1047             public void processIdentifier(String identifier) throws IOException {  
1048                 if (identifierProcess || Utility.compare(identifier, last.identifier) != 0) {
1049                     last.identifier = identifier;
1050                     processor.processIdentifier(identifier);
1051                     identifierProcess = false;
1052                 }
1053             }  
1054             
1055             public void resetIdentifier() {
1056                  identifierProcess = true;
1057             }                                                
1058                                
1059             public void processTuple(String text) throws IOException {
1060                 processor.processTuple(text);
1061             } 
1062             
1063             public void close() throws IOException {
1064                 processor.close();
1065             }                    
1066         }
1067         public static class TupleUnshredder implements ShreddedProcessor {
1068             AdditionalDocumentText last = new AdditionalDocumentText();
1069             public org.galagosearch.tupleflow.Processor<AdditionalDocumentText> processor;                               
1070             
1071             public TupleUnshredder(AdditionalDocumentText.Processor processor) {
1072                 this.processor = processor;
1073             }         
1074             
1075             public TupleUnshredder(org.galagosearch.tupleflow.Processor<AdditionalDocumentText> processor) {
1076                 this.processor = processor;
1077             }
1078             
1079             public AdditionalDocumentText clone(AdditionalDocumentText object) {
1080                 AdditionalDocumentText result = new AdditionalDocumentText();
1081                 if (object == null) return result;
1082                 result.identifier = object.identifier; 
1083                 result.text = object.text; 
1084                 return result;
1085             }                 
1086             
1087             public void processIdentifier(String identifier) throws IOException {
1088                 last.identifier = identifier;
1089             }   
1090                 
1091             
1092             public void processTuple(String text) throws IOException {
1093                 last.text = text;
1094                 processor.process(clone(last));
1095             }               
1096             
1097             public void close() throws IOException {
1098                 processor.close();
1099             }
1100         }     
1101         public static class TupleShredder implements Processor {
1102             AdditionalDocumentText last = new AdditionalDocumentText();
1103             public ShreddedProcessor processor;
1104             
1105             public TupleShredder(ShreddedProcessor processor) {
1106                 this.processor = processor;
1107             }                              
1108             
1109             public AdditionalDocumentText clone(AdditionalDocumentText object) {
1110                 AdditionalDocumentText result = new AdditionalDocumentText();
1111                 if (object == null) return result;
1112                 result.identifier = object.identifier; 
1113                 result.text = object.text; 
1114                 return result;
1115             }                 
1116             
1117             public void process(AdditionalDocumentText object) throws IOException {                                                                                                                                                   
1118                 boolean processAll = false;
1119                 if(last == null || Utility.compare(last.identifier, object.identifier) != 0 || processAll) { processor.processIdentifier(object.identifier); processAll = true; }
1120                 processor.processTuple(object.text);                                         
1121             }
1122                           
1123             public Class<AdditionalDocumentText> getInputClass() {
1124                 return AdditionalDocumentText.class;
1125             }
1126             
1127             public void close() throws IOException {
1128                 processor.close();
1129             }                     
1130         }
1131     } 
1132 }