View Javadoc

1   // This file was automatically generated with the command: 
2   //     java org.galagosearch.tupleflow.typebuilder.TypeBuilderMojo ...
3   package org.galagosearch.core.types;
4   
5   import org.galagosearch.tupleflow.Utility;
6   import org.galagosearch.tupleflow.ArrayInput;
7   import org.galagosearch.tupleflow.ArrayOutput;
8   import org.galagosearch.tupleflow.Order;   
9   import org.galagosearch.tupleflow.OrderedWriter;
10  import org.galagosearch.tupleflow.Type; 
11  import org.galagosearch.tupleflow.TypeReader;
12  import org.galagosearch.tupleflow.Step; 
13  import org.galagosearch.tupleflow.IncompatibleProcessorException;
14  import org.galagosearch.tupleflow.ReaderSource;
15  import java.io.IOException;             
16  import java.io.EOFException;
17  import java.io.UnsupportedEncodingException;
18  import java.util.ArrayList;
19  import java.util.Arrays;   
20  import java.util.Comparator;
21  import java.util.PriorityQueue;
22  import java.util.Collection;
23  
24  public class DocumentWordPosition implements Type<DocumentWordPosition> {
25      public String document;
26      public byte[] word;
27      public int position; 
28      
29      public DocumentWordPosition() {}
30      public DocumentWordPosition(String document, byte[] word, int position) {
31          this.document = document;
32          this.word = word;
33          this.position = position;
34      }  
35      
36      public String toString() {
37          try {
38              return String.format("%s,%s,%d",
39                                     document, new String(word, "UTF-8"), position);
40          } catch(UnsupportedEncodingException e) {
41              throw new RuntimeException("Couldn't convert string to UTF-8.");
42          }
43      } 
44  
45      public Order<DocumentWordPosition> getOrder(String... spec) {
46          if (Arrays.equals(spec, new String[] { "+document" })) {
47              return new DocumentOrder();
48          }
49          if (Arrays.equals(spec, new String[] { "+document", "+word", "+position" })) {
50              return new DocumentWordPositionOrder();
51          }
52          return null;
53      } 
54        
55      public interface Processor extends Step, org.galagosearch.tupleflow.Processor<DocumentWordPosition> {
56          public void process(DocumentWordPosition object) throws IOException;
57          public void close() throws IOException;
58      }                        
59      public interface Source extends Step {
60      }
61      public static class DocumentOrder implements Order<DocumentWordPosition> {
62          public int hash(DocumentWordPosition object) {
63              int h = 0;
64              h += Utility.hash(object.document);
65              return h;
66          } 
67          public Comparator<DocumentWordPosition> greaterThan() {
68              return new Comparator<DocumentWordPosition>() {
69                  public int compare(DocumentWordPosition one, DocumentWordPosition two) {
70                      int result = 0;
71                      do {
72                          result = + Utility.compare(one.document, two.document);
73                          if(result != 0) break;
74                      } while (false);
75                      return -result;
76                  }
77              };
78          }     
79          public Comparator<DocumentWordPosition> lessThan() {
80              return new Comparator<DocumentWordPosition>() {
81                  public int compare(DocumentWordPosition one, DocumentWordPosition two) {
82                      int result = 0;
83                      do {
84                          result = + Utility.compare(one.document, two.document);
85                          if(result != 0) break;
86                      } while (false);
87                      return result;
88                  }
89              };
90          }     
91          public TypeReader<DocumentWordPosition> orderedReader(ArrayInput _input) {
92              return new ShreddedReader(_input);
93          }    
94  
95          public TypeReader<DocumentWordPosition> orderedReader(ArrayInput _input, int bufferSize) {
96              return new ShreddedReader(_input, bufferSize);
97          }    
98          public OrderedWriter<DocumentWordPosition> orderedWriter(ArrayOutput _output) {
99              ShreddedWriter w = new ShreddedWriter(_output);
100             return new OrderedWriterClass(w); 
101         }                                    
102         public static class OrderedWriterClass extends OrderedWriter< DocumentWordPosition > {
103             DocumentWordPosition last = null;
104             ShreddedWriter shreddedWriter = null; 
105             
106             public OrderedWriterClass(ShreddedWriter s) {
107                 this.shreddedWriter = s;
108             }
109             
110             public void process(DocumentWordPosition object) throws IOException {
111                boolean processAll = false;
112                if (processAll || last == null || 0 != Utility.compare(object.document, last.document)) { processAll = true; shreddedWriter.processDocument(object.document); }
113                shreddedWriter.processTuple(object.word, object.position);
114                last = object;
115             }           
116                  
117             public void close() throws IOException {
118                 shreddedWriter.close();
119             }
120             
121             public Class<DocumentWordPosition> getInputClass() {
122                 return DocumentWordPosition.class;
123             }
124         } 
125         public ReaderSource<DocumentWordPosition> orderedCombiner(Collection<TypeReader<DocumentWordPosition>> readers, boolean closeOnExit) {
126             ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
127             
128             for (TypeReader<DocumentWordPosition> reader : readers) {
129                 shreddedReaders.add((ShreddedReader)reader);
130             }
131             
132             return new ShreddedCombiner(shreddedReaders, closeOnExit);
133         }                  
134         public DocumentWordPosition clone(DocumentWordPosition object) {
135             DocumentWordPosition result = new DocumentWordPosition();
136             if (object == null) return result;
137             result.document = object.document; 
138             result.word = object.word; 
139             result.position = object.position; 
140             return result;
141         }                 
142         public Class<DocumentWordPosition> getOrderedClass() {
143             return DocumentWordPosition.class;
144         }                           
145         public String[] getOrderSpec() {
146             return new String[] {"+document"};
147         }
148 
149         public static String getSpecString() {
150             return "+document";
151         }
152                            
153         public interface ShreddedProcessor extends Step {
154             public void processDocument(String document) throws IOException;
155             public void processTuple(byte[] word, int position) throws IOException;
156             public void close() throws IOException;
157         }    
158         public interface ShreddedSource extends Step {
159         }                                              
160         
161         public static class ShreddedWriter implements ShreddedProcessor {
162             ArrayOutput output;
163             ShreddedBuffer buffer = new ShreddedBuffer();
164             String lastDocument;
165             boolean lastFlush = false;
166             
167             public ShreddedWriter(ArrayOutput output) {
168                 this.output = output;
169             }                        
170             
171             public void close() throws IOException {
172                 flush();
173             }
174             
175             public void processDocument(String document) {
176                 lastDocument = document;
177                 buffer.processDocument(document);
178             }
179             public final void processTuple(byte[] word, int position) throws IOException {
180                 if (lastFlush) {
181                     if(buffer.documents.size() == 0) buffer.processDocument(lastDocument);
182                     lastFlush = false;
183                 }
184                 buffer.processTuple(word, position);
185                 if (buffer.isFull())
186                     flush();
187             }
188             public final void flushTuples(int pauseIndex) throws IOException {
189                 
190                 while (buffer.getReadIndex() < pauseIndex) {
191                            
192                     output.writeBytes(buffer.getWord());
193                     output.writeInt(buffer.getPosition());
194                     buffer.incrementTuple();
195                 }
196             }  
197             public final void flushDocument(int pauseIndex) throws IOException {
198                 while (buffer.getReadIndex() < pauseIndex) {
199                     int nextPause = buffer.getDocumentEndIndex();
200                     int count = nextPause - buffer.getReadIndex();
201                     
202                     output.writeString(buffer.getDocument());
203                     output.writeInt(count);
204                     buffer.incrementDocument();
205                       
206                     flushTuples(nextPause);
207                     assert nextPause == buffer.getReadIndex();
208                 }
209             }
210             public void flush() throws IOException { 
211                 flushDocument(buffer.getWriteIndex());
212                 buffer.reset(); 
213                 lastFlush = true;
214             }                           
215         }
216         public static class ShreddedBuffer {
217             ArrayList<String> documents = new ArrayList();
218             ArrayList<Integer> documentTupleIdx = new ArrayList();
219             int documentReadIdx = 0;
220                             
221             byte[][] words;
222             int[] positions;
223             int writeTupleIndex = 0;
224             int readTupleIndex = 0;
225             int batchSize;
226 
227             public ShreddedBuffer(int batchSize) {
228                 this.batchSize = batchSize;
229 
230                 words = new byte[batchSize][];
231                 positions = new int[batchSize];
232             }                              
233 
234             public ShreddedBuffer() {    
235                 this(10000);
236             }                                                                                                                    
237             
238             public void processDocument(String document) {
239                 documents.add(document);
240                 documentTupleIdx.add(writeTupleIndex);
241             }                                      
242             public void processTuple(byte[] word, int position) {
243                 assert documents.size() > 0;
244                 words[writeTupleIndex] = word;
245                 positions[writeTupleIndex] = position;
246                 writeTupleIndex++;
247             }
248             public void resetData() {
249                 documents.clear();
250                 documentTupleIdx.clear();
251                 writeTupleIndex = 0;
252             }                  
253                                  
254             public void resetRead() {
255                 readTupleIndex = 0;
256                 documentReadIdx = 0;
257             } 
258 
259             public void reset() {
260                 resetData();
261                 resetRead();
262             } 
263             public boolean isFull() {
264                 return writeTupleIndex >= batchSize;
265             }
266 
267             public boolean isEmpty() {
268                 return writeTupleIndex == 0;
269             }                          
270 
271             public boolean isAtEnd() {
272                 return readTupleIndex >= writeTupleIndex;
273             }           
274             public void incrementDocument() {
275                 documentReadIdx++;  
276             }                                                                                              
277 
278             public void autoIncrementDocument() {
279                 while (readTupleIndex >= getDocumentEndIndex() && readTupleIndex < writeTupleIndex)
280                     documentReadIdx++;
281             }                 
282             public void incrementTuple() {
283                 readTupleIndex++;
284             }                    
285             public int getDocumentEndIndex() {
286                 if ((documentReadIdx+1) >= documentTupleIdx.size())
287                     return writeTupleIndex;
288                 return documentTupleIdx.get(documentReadIdx+1);
289             }
290             public int getReadIndex() {
291                 return readTupleIndex;
292             }   
293 
294             public int getWriteIndex() {
295                 return writeTupleIndex;
296             } 
297             public String getDocument() {
298                 assert readTupleIndex < writeTupleIndex;
299                 assert documentReadIdx < documents.size();
300                 
301                 return documents.get(documentReadIdx);
302             }
303             public byte[] getWord() {
304                 assert readTupleIndex < writeTupleIndex;
305                 return words[readTupleIndex];
306             }                                         
307             public int getPosition() {
308                 assert readTupleIndex < writeTupleIndex;
309                 return positions[readTupleIndex];
310             }                                         
311             public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
312                 while (getReadIndex() < endIndex) {
313                    output.processTuple(getWord(), getPosition());
314                    incrementTuple();
315                 }
316             }                                                                           
317             public void copyUntilIndexDocument(int endIndex, ShreddedProcessor output) throws IOException {
318                 while (getReadIndex() < endIndex) {
319                     output.processDocument(getDocument());
320                     assert getDocumentEndIndex() <= endIndex;
321                     copyTuples(getDocumentEndIndex(), output);
322                     incrementDocument();
323                 }
324             }  
325             public void copyUntilDocument(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
326                 while (!isAtEnd()) {
327                     if (other != null) {   
328                         assert !other.isAtEnd();
329                         int c = + Utility.compare(getDocument(), other.getDocument());
330                     
331                         if (c > 0) {
332                             break;   
333                         }
334                         
335                         output.processDocument(getDocument());
336                                       
337                         copyTuples(getDocumentEndIndex(), output);
338                     } else {
339                         output.processDocument(getDocument());
340                         copyTuples(getDocumentEndIndex(), output);
341                     }
342                     incrementDocument();  
343                     
344                
345                 }
346             }
347             public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
348                 copyUntilDocument(other, output);
349             }
350             
351         }                         
352         public static class ShreddedCombiner implements ReaderSource<DocumentWordPosition>, ShreddedSource {   
353             public ShreddedProcessor processor;
354             Collection<ShreddedReader> readers;       
355             boolean closeOnExit = false;
356             boolean uninitialized = true;
357             PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
358             
359             public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
360                 this.readers = readers;                                                       
361                 this.closeOnExit = closeOnExit;
362             }
363                                   
364             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
365                 if (processor instanceof ShreddedProcessor) {
366                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
367                 } else if (processor instanceof DocumentWordPosition.Processor) {
368                     this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentWordPosition.Processor) processor));
369                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
370                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentWordPosition>) processor));
371                 } else {
372                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
373                 }
374             }                                
375             
376             public Class<DocumentWordPosition> getOutputClass() {
377                 return DocumentWordPosition.class;
378             }
379             
380             public void initialize() throws IOException {
381                 for (ShreddedReader reader : readers) {
382                     reader.fill();                                        
383                     
384                     if (!reader.getBuffer().isAtEnd())
385                         queue.add(reader);
386                 }   
387 
388                 uninitialized = false;
389             }
390 
391             public void run() throws IOException {
392                 initialize();
393                
394                 while (queue.size() > 0) {
395                     ShreddedReader top = queue.poll();
396                     ShreddedReader next = null;
397                     ShreddedBuffer nextBuffer = null; 
398                     
399                     assert !top.getBuffer().isAtEnd();
400                                                   
401                     if (queue.size() > 0) {
402                         next = queue.peek();
403                         nextBuffer = next.getBuffer();
404                         assert !nextBuffer.isAtEnd();
405                     }
406                     
407                     top.getBuffer().copyUntil(nextBuffer, processor);
408                     if (top.getBuffer().isAtEnd())
409                         top.fill();                 
410                         
411                     if (!top.getBuffer().isAtEnd())
412                         queue.add(top);
413                 }              
414                 
415                 if (closeOnExit)
416                     processor.close();
417             }
418 
419             public DocumentWordPosition read() throws IOException {
420                 if (uninitialized)
421                     initialize();
422 
423                 DocumentWordPosition result = null;
424 
425                 while (queue.size() > 0) {
426                     ShreddedReader top = queue.poll();
427                     result = top.read();
428 
429                     if (result != null) {
430                         if (top.getBuffer().isAtEnd())
431                             top.fill();
432 
433                         queue.offer(top);
434                         break;
435                     } 
436                 }
437 
438                 return result;
439             }
440         } 
441         public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<DocumentWordPosition>, ShreddedSource {      
442             public ShreddedProcessor processor;
443             ShreddedBuffer buffer;
444             DocumentWordPosition last = new DocumentWordPosition();         
445             long updateDocumentCount = -1;
446             long tupleCount = 0;
447             long bufferStartCount = 0;  
448             ArrayInput input;
449             
450             public ShreddedReader(ArrayInput input) {
451                 this.input = input; 
452                 this.buffer = new ShreddedBuffer();
453             }                               
454             
455             public ShreddedReader(ArrayInput input, int bufferSize) { 
456                 this.input = input;
457                 this.buffer = new ShreddedBuffer(bufferSize);
458             }
459                  
460             public final int compareTo(ShreddedReader other) {
461                 ShreddedBuffer otherBuffer = other.getBuffer();
462                 
463                 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
464                     return 0;                 
465                 } else if (buffer.isAtEnd()) {
466                     return -1;
467                 } else if (otherBuffer.isAtEnd()) {
468                     return 1;
469                 }
470                                    
471                 int result = 0;
472                 do {
473                     result = + Utility.compare(buffer.getDocument(), otherBuffer.getDocument());
474                     if(result != 0) break;
475                 } while (false);                                             
476                 
477                 return result;
478             }
479             
480             public final ShreddedBuffer getBuffer() {
481                 return buffer;
482             }                
483             
484             public final DocumentWordPosition read() throws IOException {
485                 if (buffer.isAtEnd()) {
486                     fill();             
487                 
488                     if (buffer.isAtEnd()) {
489                         return null;
490                     }
491                 }
492                       
493                 assert !buffer.isAtEnd();
494                 DocumentWordPosition result = new DocumentWordPosition();
495                 
496                 result.document = buffer.getDocument();
497                 result.word = buffer.getWord();
498                 result.position = buffer.getPosition();
499                 
500                 buffer.incrementTuple();
501                 buffer.autoIncrementDocument();
502                 
503                 return result;
504             }           
505             
506             public final void fill() throws IOException {
507                 try {   
508                     buffer.reset();
509                     
510                     if (tupleCount != 0) {
511                                                       
512                         if(updateDocumentCount - tupleCount > 0) {
513                             buffer.documents.add(last.document);
514                             buffer.documentTupleIdx.add((int) (updateDocumentCount - tupleCount));
515                         }
516                         bufferStartCount = tupleCount;
517                     }
518                     
519                     while (!buffer.isFull()) {
520                         updateDocument();
521                         buffer.processTuple(input.readBytes(), input.readInt());
522                         tupleCount++;
523                     }
524                 } catch(EOFException e) {}
525             }
526 
527             public final void updateDocument() throws IOException {
528                 if (updateDocumentCount > tupleCount)
529                     return;
530                      
531                 last.document = input.readString();
532                 updateDocumentCount = tupleCount + input.readInt();
533                                       
534                 buffer.processDocument(last.document);
535             }
536 
537             public void run() throws IOException {
538                 while (true) {
539                     fill();
540                     
541                     if (buffer.isAtEnd())
542                         break;
543                     
544                     buffer.copyUntil(null, processor);
545                 }      
546                 processor.close();
547             }
548             
549             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
550                 if (processor instanceof ShreddedProcessor) {
551                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
552                 } else if (processor instanceof DocumentWordPosition.Processor) {
553                     this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentWordPosition.Processor) processor));
554                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
555                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentWordPosition>) processor));
556                 } else {
557                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
558                 }
559             }                                
560             
561             public Class<DocumentWordPosition> getOutputClass() {
562                 return DocumentWordPosition.class;
563             }                
564         }
565         
566         public static class DuplicateEliminator implements ShreddedProcessor {
567             public ShreddedProcessor processor;
568             DocumentWordPosition last = new DocumentWordPosition();
569             boolean documentProcess = true;
570                                            
571             public DuplicateEliminator() {}
572             public DuplicateEliminator(ShreddedProcessor processor) {
573                 this.processor = processor;
574             }
575             
576             public void setShreddedProcessor(ShreddedProcessor processor) {
577                 this.processor = processor;
578             }
579 
580             public void processDocument(String document) throws IOException {  
581                 if (documentProcess || Utility.compare(document, last.document) != 0) {
582                     last.document = document;
583                     processor.processDocument(document);
584                     documentProcess = false;
585                 }
586             }  
587             
588             public void resetDocument() {
589                  documentProcess = true;
590             }                                                
591                                
592             public void processTuple(byte[] word, int position) throws IOException {
593                 processor.processTuple(word, position);
594             } 
595             
596             public void close() throws IOException {
597                 processor.close();
598             }                    
599         }
600         public static class TupleUnshredder implements ShreddedProcessor {
601             DocumentWordPosition last = new DocumentWordPosition();
602             public org.galagosearch.tupleflow.Processor<DocumentWordPosition> processor;                               
603             
604             public TupleUnshredder(DocumentWordPosition.Processor processor) {
605                 this.processor = processor;
606             }         
607             
608             public TupleUnshredder(org.galagosearch.tupleflow.Processor<DocumentWordPosition> processor) {
609                 this.processor = processor;
610             }
611             
612             public DocumentWordPosition clone(DocumentWordPosition object) {
613                 DocumentWordPosition result = new DocumentWordPosition();
614                 if (object == null) return result;
615                 result.document = object.document; 
616                 result.word = object.word; 
617                 result.position = object.position; 
618                 return result;
619             }                 
620             
621             public void processDocument(String document) throws IOException {
622                 last.document = document;
623             }   
624                 
625             
626             public void processTuple(byte[] word, int position) throws IOException {
627                 last.word = word;
628                 last.position = position;
629                 processor.process(clone(last));
630             }               
631             
632             public void close() throws IOException {
633                 processor.close();
634             }
635         }     
636         public static class TupleShredder implements Processor {
637             DocumentWordPosition last = new DocumentWordPosition();
638             public ShreddedProcessor processor;
639             
640             public TupleShredder(ShreddedProcessor processor) {
641                 this.processor = processor;
642             }                              
643             
644             public DocumentWordPosition clone(DocumentWordPosition object) {
645                 DocumentWordPosition result = new DocumentWordPosition();
646                 if (object == null) return result;
647                 result.document = object.document; 
648                 result.word = object.word; 
649                 result.position = object.position; 
650                 return result;
651             }                 
652             
653             public void process(DocumentWordPosition object) throws IOException {                                                                                                                                                   
654                 boolean processAll = false;
655                 if(last == null || Utility.compare(last.document, object.document) != 0 || processAll) { processor.processDocument(object.document); processAll = true; }
656                 processor.processTuple(object.word, object.position);                                         
657             }
658                           
659             public Class<DocumentWordPosition> getInputClass() {
660                 return DocumentWordPosition.class;
661             }
662             
663             public void close() throws IOException {
664                 processor.close();
665             }                     
666         }
667     } 
668     public static class DocumentWordPositionOrder implements Order<DocumentWordPosition> {
669         public int hash(DocumentWordPosition object) {
670             int h = 0;
671             h += Utility.hash(object.document);
672             h += Utility.hash(object.word);
673             h += Utility.hash(object.position);
674             return h;
675         } 
676         public Comparator<DocumentWordPosition> greaterThan() {
677             return new Comparator<DocumentWordPosition>() {
678                 public int compare(DocumentWordPosition one, DocumentWordPosition two) {
679                     int result = 0;
680                     do {
681                         result = + Utility.compare(one.document, two.document);
682                         if(result != 0) break;
683                         result = + Utility.compare(one.word, two.word);
684                         if(result != 0) break;
685                         result = + Utility.compare(one.position, two.position);
686                         if(result != 0) break;
687                     } while (false);
688                     return -result;
689                 }
690             };
691         }     
692         public Comparator<DocumentWordPosition> lessThan() {
693             return new Comparator<DocumentWordPosition>() {
694                 public int compare(DocumentWordPosition one, DocumentWordPosition two) {
695                     int result = 0;
696                     do {
697                         result = + Utility.compare(one.document, two.document);
698                         if(result != 0) break;
699                         result = + Utility.compare(one.word, two.word);
700                         if(result != 0) break;
701                         result = + Utility.compare(one.position, two.position);
702                         if(result != 0) break;
703                     } while (false);
704                     return result;
705                 }
706             };
707         }     
708         public TypeReader<DocumentWordPosition> orderedReader(ArrayInput _input) {
709             return new ShreddedReader(_input);
710         }    
711 
712         public TypeReader<DocumentWordPosition> orderedReader(ArrayInput _input, int bufferSize) {
713             return new ShreddedReader(_input, bufferSize);
714         }    
715         public OrderedWriter<DocumentWordPosition> orderedWriter(ArrayOutput _output) {
716             ShreddedWriter w = new ShreddedWriter(_output);
717             return new OrderedWriterClass(w); 
718         }                                    
719         public static class OrderedWriterClass extends OrderedWriter< DocumentWordPosition > {
720             DocumentWordPosition last = null;
721             ShreddedWriter shreddedWriter = null; 
722             
723             public OrderedWriterClass(ShreddedWriter s) {
724                 this.shreddedWriter = s;
725             }
726             
727             public void process(DocumentWordPosition object) throws IOException {
728                boolean processAll = false;
729                if (processAll || last == null || 0 != Utility.compare(object.document, last.document)) { processAll = true; shreddedWriter.processDocument(object.document); }
730                if (processAll || last == null || 0 != Utility.compare(object.word, last.word)) { processAll = true; shreddedWriter.processWord(object.word); }
731                if (processAll || last == null || 0 != Utility.compare(object.position, last.position)) { processAll = true; shreddedWriter.processPosition(object.position); }
732                shreddedWriter.processTuple();
733                last = object;
734             }           
735                  
736             public void close() throws IOException {
737                 shreddedWriter.close();
738             }
739             
740             public Class<DocumentWordPosition> getInputClass() {
741                 return DocumentWordPosition.class;
742             }
743         } 
744         public ReaderSource<DocumentWordPosition> orderedCombiner(Collection<TypeReader<DocumentWordPosition>> readers, boolean closeOnExit) {
745             ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
746             
747             for (TypeReader<DocumentWordPosition> reader : readers) {
748                 shreddedReaders.add((ShreddedReader)reader);
749             }
750             
751             return new ShreddedCombiner(shreddedReaders, closeOnExit);
752         }                  
753         public DocumentWordPosition clone(DocumentWordPosition object) {
754             DocumentWordPosition result = new DocumentWordPosition();
755             if (object == null) return result;
756             result.document = object.document; 
757             result.word = object.word; 
758             result.position = object.position; 
759             return result;
760         }                 
761         public Class<DocumentWordPosition> getOrderedClass() {
762             return DocumentWordPosition.class;
763         }                           
764         public String[] getOrderSpec() {
765             return new String[] {"+document", "+word", "+position"};
766         }
767 
768         public static String getSpecString() {
769             return "+document +word +position";
770         }
771                            
772         public interface ShreddedProcessor extends Step {
773             public void processDocument(String document) throws IOException;
774             public void processWord(byte[] word) throws IOException;
775             public void processPosition(int position) throws IOException;
776             public void processTuple() throws IOException;
777             public void close() throws IOException;
778         }    
779         public interface ShreddedSource extends Step {
780         }                                              
781         
782         public static class ShreddedWriter implements ShreddedProcessor {
783             ArrayOutput output;
784             ShreddedBuffer buffer = new ShreddedBuffer();
785             String lastDocument;
786             byte[] lastWord;
787             int lastPosition;
788             boolean lastFlush = false;
789             
790             public ShreddedWriter(ArrayOutput output) {
791                 this.output = output;
792             }                        
793             
794             public void close() throws IOException {
795                 flush();
796             }
797             
798             public void processDocument(String document) {
799                 lastDocument = document;
800                 buffer.processDocument(document);
801             }
802             public void processWord(byte[] word) {
803                 lastWord = word;
804                 buffer.processWord(word);
805             }
806             public void processPosition(int position) {
807                 lastPosition = position;
808                 buffer.processPosition(position);
809             }
810             public final void processTuple() throws IOException {
811                 if (lastFlush) {
812                     if(buffer.documents.size() == 0) buffer.processDocument(lastDocument);
813                     if(buffer.words.size() == 0) buffer.processWord(lastWord);
814                     if(buffer.positions.size() == 0) buffer.processPosition(lastPosition);
815                     lastFlush = false;
816                 }
817                 buffer.processTuple();
818                 if (buffer.isFull())
819                     flush();
820             }
821             public final void flushTuples(int pauseIndex) throws IOException {
822                 
823                 while (buffer.getReadIndex() < pauseIndex) {
824                            
825                     buffer.incrementTuple();
826                 }
827             }  
828             public final void flushDocument(int pauseIndex) throws IOException {
829                 while (buffer.getReadIndex() < pauseIndex) {
830                     int nextPause = buffer.getDocumentEndIndex();
831                     int count = nextPause - buffer.getReadIndex();
832                     
833                     output.writeString(buffer.getDocument());
834                     output.writeInt(count);
835                     buffer.incrementDocument();
836                       
837                     flushWord(nextPause);
838                     assert nextPause == buffer.getReadIndex();
839                 }
840             }
841             public final void flushWord(int pauseIndex) throws IOException {
842                 while (buffer.getReadIndex() < pauseIndex) {
843                     int nextPause = buffer.getWordEndIndex();
844                     int count = nextPause - buffer.getReadIndex();
845                     
846                     output.writeBytes(buffer.getWord());
847                     output.writeInt(count);
848                     buffer.incrementWord();
849                       
850                     flushPosition(nextPause);
851                     assert nextPause == buffer.getReadIndex();
852                 }
853             }
854             public final void flushPosition(int pauseIndex) throws IOException {
855                 while (buffer.getReadIndex() < pauseIndex) {
856                     int nextPause = buffer.getPositionEndIndex();
857                     int count = nextPause - buffer.getReadIndex();
858                     
859                     output.writeInt(buffer.getPosition());
860                     output.writeInt(count);
861                     buffer.incrementPosition();
862                       
863                     flushTuples(nextPause);
864                     assert nextPause == buffer.getReadIndex();
865                 }
866             }
867             public void flush() throws IOException { 
868                 flushDocument(buffer.getWriteIndex());
869                 buffer.reset(); 
870                 lastFlush = true;
871             }                           
872         }
873         public static class ShreddedBuffer {
874             ArrayList<String> documents = new ArrayList();
875             ArrayList<byte[]> words = new ArrayList();
876             ArrayList<Integer> positions = new ArrayList();
877             ArrayList<Integer> documentTupleIdx = new ArrayList();
878             ArrayList<Integer> wordTupleIdx = new ArrayList();
879             ArrayList<Integer> positionTupleIdx = new ArrayList();
880             int documentReadIdx = 0;
881             int wordReadIdx = 0;
882             int positionReadIdx = 0;
883                             
884             int writeTupleIndex = 0;
885             int readTupleIndex = 0;
886             int batchSize;
887 
888             public ShreddedBuffer(int batchSize) {
889                 this.batchSize = batchSize;
890 
891             }                              
892 
893             public ShreddedBuffer() {    
894                 this(10000);
895             }                                                                                                                    
896             
897             public void processDocument(String document) {
898                 documents.add(document);
899                 documentTupleIdx.add(writeTupleIndex);
900             }                                      
901             public void processWord(byte[] word) {
902                 words.add(word);
903                 wordTupleIdx.add(writeTupleIndex);
904             }                                      
905             public void processPosition(int position) {
906                 positions.add(position);
907                 positionTupleIdx.add(writeTupleIndex);
908             }                                      
909             public void processTuple() {
910                 assert documents.size() > 0;
911                 assert words.size() > 0;
912                 assert positions.size() > 0;
913                 writeTupleIndex++;
914             }
915             public void resetData() {
916                 documents.clear();
917                 words.clear();
918                 positions.clear();
919                 documentTupleIdx.clear();
920                 wordTupleIdx.clear();
921                 positionTupleIdx.clear();
922                 writeTupleIndex = 0;
923             }                  
924                                  
925             public void resetRead() {
926                 readTupleIndex = 0;
927                 documentReadIdx = 0;
928                 wordReadIdx = 0;
929                 positionReadIdx = 0;
930             } 
931 
932             public void reset() {
933                 resetData();
934                 resetRead();
935             } 
936             public boolean isFull() {
937                 return writeTupleIndex >= batchSize;
938             }
939 
940             public boolean isEmpty() {
941                 return writeTupleIndex == 0;
942             }                          
943 
944             public boolean isAtEnd() {
945                 return readTupleIndex >= writeTupleIndex;
946             }           
947             public void incrementDocument() {
948                 documentReadIdx++;  
949             }                                                                                              
950 
951             public void autoIncrementDocument() {
952                 while (readTupleIndex >= getDocumentEndIndex() && readTupleIndex < writeTupleIndex)
953                     documentReadIdx++;
954             }                 
955             public void incrementWord() {
956                 wordReadIdx++;  
957             }                                                                                              
958 
959             public void autoIncrementWord() {
960                 while (readTupleIndex >= getWordEndIndex() && readTupleIndex < writeTupleIndex)
961                     wordReadIdx++;
962             }                 
963             public void incrementPosition() {
964                 positionReadIdx++;  
965             }                                                                                              
966 
967             public void autoIncrementPosition() {
968                 while (readTupleIndex >= getPositionEndIndex() && readTupleIndex < writeTupleIndex)
969                     positionReadIdx++;
970             }                 
971             public void incrementTuple() {
972                 readTupleIndex++;
973             }                    
974             public int getDocumentEndIndex() {
975                 if ((documentReadIdx+1) >= documentTupleIdx.size())
976                     return writeTupleIndex;
977                 return documentTupleIdx.get(documentReadIdx+1);
978             }
979 
980             public int getWordEndIndex() {
981                 if ((wordReadIdx+1) >= wordTupleIdx.size())
982                     return writeTupleIndex;
983                 return wordTupleIdx.get(wordReadIdx+1);
984             }
985 
986             public int getPositionEndIndex() {
987                 if ((positionReadIdx+1) >= positionTupleIdx.size())
988                     return writeTupleIndex;
989                 return positionTupleIdx.get(positionReadIdx+1);
990             }
991             public int getReadIndex() {
992                 return readTupleIndex;
993             }   
994 
995             public int getWriteIndex() {
996                 return writeTupleIndex;
997             } 
998             public String getDocument() {
999                 assert readTupleIndex < writeTupleIndex;
1000                 assert documentReadIdx < documents.size();
1001                 
1002                 return documents.get(documentReadIdx);
1003             }
1004             public byte[] getWord() {
1005                 assert readTupleIndex < writeTupleIndex;
1006                 assert wordReadIdx < words.size();
1007                 
1008                 return words.get(wordReadIdx);
1009             }
1010             public int getPosition() {
1011                 assert readTupleIndex < writeTupleIndex;
1012                 assert positionReadIdx < positions.size();
1013                 
1014                 return positions.get(positionReadIdx);
1015             }
1016 
1017             public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
1018                 while (getReadIndex() < endIndex) {
1019                    output.processTuple();
1020                    incrementTuple();
1021                 }
1022             }                                                                           
1023             public void copyUntilIndexDocument(int endIndex, ShreddedProcessor output) throws IOException {
1024                 while (getReadIndex() < endIndex) {
1025                     output.processDocument(getDocument());
1026                     assert getDocumentEndIndex() <= endIndex;
1027                     copyUntilIndexWord(getDocumentEndIndex(), output);
1028                     incrementDocument();
1029                 }
1030             } 
1031             public void copyUntilIndexWord(int endIndex, ShreddedProcessor output) throws IOException {
1032                 while (getReadIndex() < endIndex) {
1033                     output.processWord(getWord());
1034                     assert getWordEndIndex() <= endIndex;
1035                     copyUntilIndexPosition(getWordEndIndex(), output);
1036                     incrementWord();
1037                 }
1038             } 
1039             public void copyUntilIndexPosition(int endIndex, ShreddedProcessor output) throws IOException {
1040                 while (getReadIndex() < endIndex) {
1041                     output.processPosition(getPosition());
1042                     assert getPositionEndIndex() <= endIndex;
1043                     copyTuples(getPositionEndIndex(), output);
1044                     incrementPosition();
1045                 }
1046             }  
1047             public void copyUntilDocument(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
1048                 while (!isAtEnd()) {
1049                     if (other != null) {   
1050                         assert !other.isAtEnd();
1051                         int c = + Utility.compare(getDocument(), other.getDocument());
1052                     
1053                         if (c > 0) {
1054                             break;   
1055                         }
1056                         
1057                         output.processDocument(getDocument());
1058                                       
1059                         if (c < 0) {
1060                             copyUntilIndexWord(getDocumentEndIndex(), output);
1061                         } else if (c == 0) {
1062                             copyUntilWord(other, output);
1063                             autoIncrementDocument();
1064                             break;
1065                         }
1066                     } else {
1067                         output.processDocument(getDocument());
1068                         copyUntilIndexWord(getDocumentEndIndex(), output);
1069                     }
1070                     incrementDocument();  
1071                     
1072                
1073                 }
1074             }
1075             public void copyUntilWord(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
1076                 while (!isAtEnd()) {
1077                     if (other != null) {   
1078                         assert !other.isAtEnd();
1079                         int c = + Utility.compare(getWord(), other.getWord());
1080                     
1081                         if (c > 0) {
1082                             break;   
1083                         }
1084                         
1085                         output.processWord(getWord());
1086                                       
1087                         if (c < 0) {
1088                             copyUntilIndexPosition(getWordEndIndex(), output);
1089                         } else if (c == 0) {
1090                             copyUntilPosition(other, output);
1091                             autoIncrementWord();
1092                             break;
1093                         }
1094                     } else {
1095                         output.processWord(getWord());
1096                         copyUntilIndexPosition(getWordEndIndex(), output);
1097                     }
1098                     incrementWord();  
1099                     
1100                     if (getDocumentEndIndex() <= readTupleIndex)
1101                         break;   
1102                 }
1103             }
1104             public void copyUntilPosition(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
1105                 while (!isAtEnd()) {
1106                     if (other != null) {   
1107                         assert !other.isAtEnd();
1108                         int c = + Utility.compare(getPosition(), other.getPosition());
1109                     
1110                         if (c > 0) {
1111                             break;   
1112                         }
1113                         
1114                         output.processPosition(getPosition());
1115                                       
1116                         copyTuples(getPositionEndIndex(), output);
1117                     } else {
1118                         output.processPosition(getPosition());
1119                         copyTuples(getPositionEndIndex(), output);
1120                     }
1121                     incrementPosition();  
1122                     
1123                     if (getWordEndIndex() <= readTupleIndex)
1124                         break;   
1125                 }
1126             }
1127             public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
1128                 copyUntilDocument(other, output);
1129             }
1130             
1131         }                         
1132         public static class ShreddedCombiner implements ReaderSource<DocumentWordPosition>, ShreddedSource {   
1133             public ShreddedProcessor processor;
1134             Collection<ShreddedReader> readers;       
1135             boolean closeOnExit = false;
1136             boolean uninitialized = true;
1137             PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
1138             
1139             public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
1140                 this.readers = readers;                                                       
1141                 this.closeOnExit = closeOnExit;
1142             }
1143                                   
1144             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
1145                 if (processor instanceof ShreddedProcessor) {
1146                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
1147                 } else if (processor instanceof DocumentWordPosition.Processor) {
1148                     this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentWordPosition.Processor) processor));
1149                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
1150                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentWordPosition>) processor));
1151                 } else {
1152                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
1153                 }
1154             }                                
1155             
1156             public Class<DocumentWordPosition> getOutputClass() {
1157                 return DocumentWordPosition.class;
1158             }
1159             
1160             public void initialize() throws IOException {
1161                 for (ShreddedReader reader : readers) {
1162                     reader.fill();                                        
1163                     
1164                     if (!reader.getBuffer().isAtEnd())
1165                         queue.add(reader);
1166                 }   
1167 
1168                 uninitialized = false;
1169             }
1170 
1171             public void run() throws IOException {
1172                 initialize();
1173                
1174                 while (queue.size() > 0) {
1175                     ShreddedReader top = queue.poll();
1176                     ShreddedReader next = null;
1177                     ShreddedBuffer nextBuffer = null; 
1178                     
1179                     assert !top.getBuffer().isAtEnd();
1180                                                   
1181                     if (queue.size() > 0) {
1182                         next = queue.peek();
1183                         nextBuffer = next.getBuffer();
1184                         assert !nextBuffer.isAtEnd();
1185                     }
1186                     
1187                     top.getBuffer().copyUntil(nextBuffer, processor);
1188                     if (top.getBuffer().isAtEnd())
1189                         top.fill();                 
1190                         
1191                     if (!top.getBuffer().isAtEnd())
1192                         queue.add(top);
1193                 }              
1194                 
1195                 if (closeOnExit)
1196                     processor.close();
1197             }
1198 
1199             public DocumentWordPosition read() throws IOException {
1200                 if (uninitialized)
1201                     initialize();
1202 
1203                 DocumentWordPosition result = null;
1204 
1205                 while (queue.size() > 0) {
1206                     ShreddedReader top = queue.poll();
1207                     result = top.read();
1208 
1209                     if (result != null) {
1210                         if (top.getBuffer().isAtEnd())
1211                             top.fill();
1212 
1213                         queue.offer(top);
1214                         break;
1215                     } 
1216                 }
1217 
1218                 return result;
1219             }
1220         } 
1221         public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<DocumentWordPosition>, ShreddedSource {      
1222             public ShreddedProcessor processor;
1223             ShreddedBuffer buffer;
1224             DocumentWordPosition last = new DocumentWordPosition();         
1225             long updateDocumentCount = -1;
1226             long updateWordCount = -1;
1227             long updatePositionCount = -1;
1228             long tupleCount = 0;
1229             long bufferStartCount = 0;  
1230             ArrayInput input;
1231             
1232             public ShreddedReader(ArrayInput input) {
1233                 this.input = input; 
1234                 this.buffer = new ShreddedBuffer();
1235             }                               
1236             
1237             public ShreddedReader(ArrayInput input, int bufferSize) { 
1238                 this.input = input;
1239                 this.buffer = new ShreddedBuffer(bufferSize);
1240             }
1241                  
1242             public final int compareTo(ShreddedReader other) {
1243                 ShreddedBuffer otherBuffer = other.getBuffer();
1244                 
1245                 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
1246                     return 0;                 
1247                 } else if (buffer.isAtEnd()) {
1248                     return -1;
1249                 } else if (otherBuffer.isAtEnd()) {
1250                     return 1;
1251                 }
1252                                    
1253                 int result = 0;
1254                 do {
1255                     result = + Utility.compare(buffer.getDocument(), otherBuffer.getDocument());
1256                     if(result != 0) break;
1257                     result = + Utility.compare(buffer.getWord(), otherBuffer.getWord());
1258                     if(result != 0) break;
1259                     result = + Utility.compare(buffer.getPosition(), otherBuffer.getPosition());
1260                     if(result != 0) break;
1261                 } while (false);                                             
1262                 
1263                 return result;
1264             }
1265             
1266             public final ShreddedBuffer getBuffer() {
1267                 return buffer;
1268             }                
1269             
1270             public final DocumentWordPosition read() throws IOException {
1271                 if (buffer.isAtEnd()) {
1272                     fill();             
1273                 
1274                     if (buffer.isAtEnd()) {
1275                         return null;
1276                     }
1277                 }
1278                       
1279                 assert !buffer.isAtEnd();
1280                 DocumentWordPosition result = new DocumentWordPosition();
1281                 
1282                 result.document = buffer.getDocument();
1283                 result.word = buffer.getWord();
1284                 result.position = buffer.getPosition();
1285                 
1286                 buffer.incrementTuple();
1287                 buffer.autoIncrementDocument();
1288                 buffer.autoIncrementWord();
1289                 buffer.autoIncrementPosition();
1290                 
1291                 return result;
1292             }           
1293             
1294             public final void fill() throws IOException {
1295                 try {   
1296                     buffer.reset();
1297                     
1298                     if (tupleCount != 0) {
1299                                                       
1300                         if(updateDocumentCount - tupleCount > 0) {
1301                             buffer.documents.add(last.document);
1302                             buffer.documentTupleIdx.add((int) (updateDocumentCount - tupleCount));
1303                         }                              
1304                         if(updateWordCount - tupleCount > 0) {
1305                             buffer.words.add(last.word);
1306                             buffer.wordTupleIdx.add((int) (updateWordCount - tupleCount));
1307                         }                              
1308                         if(updatePositionCount - tupleCount > 0) {
1309                             buffer.positions.add(last.position);
1310                             buffer.positionTupleIdx.add((int) (updatePositionCount - tupleCount));
1311                         }
1312                         bufferStartCount = tupleCount;
1313                     }
1314                     
1315                     while (!buffer.isFull()) {
1316                         updatePosition();
1317                         buffer.processTuple();
1318                         tupleCount++;
1319                     }
1320                 } catch(EOFException e) {}
1321             }
1322 
1323             public final void updateDocument() throws IOException {
1324                 if (updateDocumentCount > tupleCount)
1325                     return;
1326                      
1327                 last.document = input.readString();
1328                 updateDocumentCount = tupleCount + input.readInt();
1329                                       
1330                 buffer.processDocument(last.document);
1331             }
1332             public final void updateWord() throws IOException {
1333                 if (updateWordCount > tupleCount)
1334                     return;
1335                      
1336                 updateDocument();
1337                 last.word = input.readBytes();
1338                 updateWordCount = tupleCount + input.readInt();
1339                                       
1340                 buffer.processWord(last.word);
1341             }
1342             public final void updatePosition() throws IOException {
1343                 if (updatePositionCount > tupleCount)
1344                     return;
1345                      
1346                 updateWord();
1347                 last.position = input.readInt();
1348                 updatePositionCount = tupleCount + input.readInt();
1349                                       
1350                 buffer.processPosition(last.position);
1351             }
1352 
1353             public void run() throws IOException {
1354                 while (true) {
1355                     fill();
1356                     
1357                     if (buffer.isAtEnd())
1358                         break;
1359                     
1360                     buffer.copyUntil(null, processor);
1361                 }      
1362                 processor.close();
1363             }
1364             
1365             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
1366                 if (processor instanceof ShreddedProcessor) {
1367                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
1368                 } else if (processor instanceof DocumentWordPosition.Processor) {
1369                     this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentWordPosition.Processor) processor));
1370                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
1371                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentWordPosition>) processor));
1372                 } else {
1373                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
1374                 }
1375             }                                
1376             
1377             public Class<DocumentWordPosition> getOutputClass() {
1378                 return DocumentWordPosition.class;
1379             }                
1380         }
1381         
1382         public static class DuplicateEliminator implements ShreddedProcessor {
1383             public ShreddedProcessor processor;
1384             DocumentWordPosition last = new DocumentWordPosition();
1385             boolean documentProcess = true;
1386             boolean wordProcess = true;
1387             boolean positionProcess = true;
1388                                            
1389             public DuplicateEliminator() {}
1390             public DuplicateEliminator(ShreddedProcessor processor) {
1391                 this.processor = processor;
1392             }
1393             
1394             public void setShreddedProcessor(ShreddedProcessor processor) {
1395                 this.processor = processor;
1396             }
1397 
1398             public void processDocument(String document) throws IOException {  
1399                 if (documentProcess || Utility.compare(document, last.document) != 0) {
1400                     last.document = document;
1401                     processor.processDocument(document);
1402             resetWord();
1403                     documentProcess = false;
1404                 }
1405             }
1406             public void processWord(byte[] word) throws IOException {  
1407                 if (wordProcess || Utility.compare(word, last.word) != 0) {
1408                     last.word = word;
1409                     processor.processWord(word);
1410             resetPosition();
1411                     wordProcess = false;
1412                 }
1413             }
1414             public void processPosition(int position) throws IOException {  
1415                 if (positionProcess || Utility.compare(position, last.position) != 0) {
1416                     last.position = position;
1417                     processor.processPosition(position);
1418                     positionProcess = false;
1419                 }
1420             }  
1421             
1422             public void resetDocument() {
1423                  documentProcess = true;
1424             resetWord();
1425             }                                                
1426             public void resetWord() {
1427                  wordProcess = true;
1428             resetPosition();
1429             }                                                
1430             public void resetPosition() {
1431                  positionProcess = true;
1432             }                                                
1433                                
1434             public void processTuple() throws IOException {
1435                 processor.processTuple();
1436             } 
1437             
1438             public void close() throws IOException {
1439                 processor.close();
1440             }                    
1441         }
1442         public static class TupleUnshredder implements ShreddedProcessor {
1443             DocumentWordPosition last = new DocumentWordPosition();
1444             public org.galagosearch.tupleflow.Processor<DocumentWordPosition> processor;                               
1445             
1446             public TupleUnshredder(DocumentWordPosition.Processor processor) {
1447                 this.processor = processor;
1448             }         
1449             
1450             public TupleUnshredder(org.galagosearch.tupleflow.Processor<DocumentWordPosition> processor) {
1451                 this.processor = processor;
1452             }
1453             
1454             public DocumentWordPosition clone(DocumentWordPosition object) {
1455                 DocumentWordPosition result = new DocumentWordPosition();
1456                 if (object == null) return result;
1457                 result.document = object.document; 
1458                 result.word = object.word; 
1459                 result.position = object.position; 
1460                 return result;
1461             }                 
1462             
1463             public void processDocument(String document) throws IOException {
1464                 last.document = document;
1465             }   
1466                 
1467             public void processWord(byte[] word) throws IOException {
1468                 last.word = word;
1469             }   
1470                 
1471             public void processPosition(int position) throws IOException {
1472                 last.position = position;
1473             }   
1474                 
1475             
1476             public void processTuple() throws IOException {
1477                 processor.process(clone(last));
1478             }               
1479             
1480             public void close() throws IOException {
1481                 processor.close();
1482             }
1483         }     
1484         public static class TupleShredder implements Processor {
1485             DocumentWordPosition last = new DocumentWordPosition();
1486             public ShreddedProcessor processor;
1487             
1488             public TupleShredder(ShreddedProcessor processor) {
1489                 this.processor = processor;
1490             }                              
1491             
1492             public DocumentWordPosition clone(DocumentWordPosition object) {
1493                 DocumentWordPosition result = new DocumentWordPosition();
1494                 if (object == null) return result;
1495                 result.document = object.document; 
1496                 result.word = object.word; 
1497                 result.position = object.position; 
1498                 return result;
1499             }                 
1500             
1501             public void process(DocumentWordPosition object) throws IOException {                                                                                                                                                   
1502                 boolean processAll = false;
1503                 if(last == null || Utility.compare(last.document, object.document) != 0 || processAll) { processor.processDocument(object.document); processAll = true; }
1504                 if(last == null || Utility.compare(last.word, object.word) != 0 || processAll) { processor.processWord(object.word); processAll = true; }
1505                 if(last == null || Utility.compare(last.position, object.position) != 0 || processAll) { processor.processPosition(object.position); processAll = true; }
1506                 processor.processTuple();                                         
1507             }
1508                           
1509             public Class<DocumentWordPosition> getInputClass() {
1510                 return DocumentWordPosition.class;
1511             }
1512             
1513             public void close() throws IOException {
1514                 processor.close();
1515             }                     
1516         }
1517     } 
1518 }