View Javadoc

1   // This file was automatically generated with the command: 
2   //     java org.galagosearch.tupleflow.typebuilder.TypeBuilderMojo ...
3   package org.galagosearch.core.types;
4   
5   import org.galagosearch.tupleflow.Utility;
6   import org.galagosearch.tupleflow.ArrayInput;
7   import org.galagosearch.tupleflow.ArrayOutput;
8   import org.galagosearch.tupleflow.Order;   
9   import org.galagosearch.tupleflow.OrderedWriter;
10  import org.galagosearch.tupleflow.Type; 
11  import org.galagosearch.tupleflow.TypeReader;
12  import org.galagosearch.tupleflow.Step; 
13  import org.galagosearch.tupleflow.IncompatibleProcessorException;
14  import org.galagosearch.tupleflow.ReaderSource;
15  import java.io.IOException;             
16  import java.io.EOFException;
17  import java.io.UnsupportedEncodingException;
18  import java.util.ArrayList;
19  import java.util.Arrays;   
20  import java.util.Comparator;
21  import java.util.PriorityQueue;
22  import java.util.Collection;
23  
24  public class DocumentWordProbability implements Type<DocumentWordProbability> {
25      public String document;
26      public byte[] word;
27      public double probability; 
28      
29      public DocumentWordProbability() {}
30      public DocumentWordProbability(String document, byte[] word, double probability) {
31          this.document = document;
32          this.word = word;
33          this.probability = probability;
34      }  
35      
36      public String toString() {
37          try {
38              return String.format("%s,%s,%f",
39                                     document, new String(word, "UTF-8"), probability);
40          } catch(UnsupportedEncodingException e) {
41              throw new RuntimeException("Couldn't convert string to UTF-8.");
42          }
43      } 
44  
45      public Order<DocumentWordProbability> getOrder(String... spec) {
46          if (Arrays.equals(spec, new String[] { "+document", "+word" })) {
47              return new DocumentWordOrder();
48          }
49          if (Arrays.equals(spec, new String[] { "+word" })) {
50              return new WordOrder();
51          }
52          if (Arrays.equals(spec, new String[] { "+document" })) {
53              return new DocumentOrder();
54          }
55          return null;
56      } 
57        
58      public interface Processor extends Step, org.galagosearch.tupleflow.Processor<DocumentWordProbability> {
59          public void process(DocumentWordProbability object) throws IOException;
60          public void close() throws IOException;
61      }                        
62      public interface Source extends Step {
63      }
64      public static class DocumentWordOrder implements Order<DocumentWordProbability> {
65          public int hash(DocumentWordProbability object) {
66              int h = 0;
67              h += Utility.hash(object.document);
68              h += Utility.hash(object.word);
69              return h;
70          } 
71          public Comparator<DocumentWordProbability> greaterThan() {
72              return new Comparator<DocumentWordProbability>() {
73                  public int compare(DocumentWordProbability one, DocumentWordProbability two) {
74                      int result = 0;
75                      do {
76                          result = + Utility.compare(one.document, two.document);
77                          if(result != 0) break;
78                          result = + Utility.compare(one.word, two.word);
79                          if(result != 0) break;
80                      } while (false);
81                      return -result;
82                  }
83              };
84          }     
85          public Comparator<DocumentWordProbability> lessThan() {
86              return new Comparator<DocumentWordProbability>() {
87                  public int compare(DocumentWordProbability one, DocumentWordProbability two) {
88                      int result = 0;
89                      do {
90                          result = + Utility.compare(one.document, two.document);
91                          if(result != 0) break;
92                          result = + Utility.compare(one.word, two.word);
93                          if(result != 0) break;
94                      } while (false);
95                      return result;
96                  }
97              };
98          }     
99          public TypeReader<DocumentWordProbability> orderedReader(ArrayInput _input) {
100             return new ShreddedReader(_input);
101         }    
102 
103         public TypeReader<DocumentWordProbability> orderedReader(ArrayInput _input, int bufferSize) {
104             return new ShreddedReader(_input, bufferSize);
105         }    
106         public OrderedWriter<DocumentWordProbability> orderedWriter(ArrayOutput _output) {
107             ShreddedWriter w = new ShreddedWriter(_output);
108             return new OrderedWriterClass(w); 
109         }                                    
110         public static class OrderedWriterClass extends OrderedWriter< DocumentWordProbability > {
111             DocumentWordProbability last = null;
112             ShreddedWriter shreddedWriter = null; 
113             
114             public OrderedWriterClass(ShreddedWriter s) {
115                 this.shreddedWriter = s;
116             }
117             
118             public void process(DocumentWordProbability object) throws IOException {
119                boolean processAll = false;
120                if (processAll || last == null || 0 != Utility.compare(object.document, last.document)) { processAll = true; shreddedWriter.processDocument(object.document); }
121                if (processAll || last == null || 0 != Utility.compare(object.word, last.word)) { processAll = true; shreddedWriter.processWord(object.word); }
122                shreddedWriter.processTuple(object.probability);
123                last = object;
124             }           
125                  
126             public void close() throws IOException {
127                 shreddedWriter.close();
128             }
129             
130             public Class<DocumentWordProbability> getInputClass() {
131                 return DocumentWordProbability.class;
132             }
133         } 
134         public ReaderSource<DocumentWordProbability> orderedCombiner(Collection<TypeReader<DocumentWordProbability>> readers, boolean closeOnExit) {
135             ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
136             
137             for (TypeReader<DocumentWordProbability> reader : readers) {
138                 shreddedReaders.add((ShreddedReader)reader);
139             }
140             
141             return new ShreddedCombiner(shreddedReaders, closeOnExit);
142         }                  
143         public DocumentWordProbability clone(DocumentWordProbability object) {
144             DocumentWordProbability result = new DocumentWordProbability();
145             if (object == null) return result;
146             result.document = object.document; 
147             result.word = object.word; 
148             result.probability = object.probability; 
149             return result;
150         }                 
151         public Class<DocumentWordProbability> getOrderedClass() {
152             return DocumentWordProbability.class;
153         }                           
154         public String[] getOrderSpec() {
155             return new String[] {"+document", "+word"};
156         }
157 
158         public static String getSpecString() {
159             return "+document +word";
160         }
161                            
162         public interface ShreddedProcessor extends Step {
163             public void processDocument(String document) throws IOException;
164             public void processWord(byte[] word) throws IOException;
165             public void processTuple(double probability) throws IOException;
166             public void close() throws IOException;
167         }    
168         public interface ShreddedSource extends Step {
169         }                                              
170         
        /**
         * Buffers shredded tuples in a {@link ShreddedBuffer} and serializes them to
         * {@code output} one batch at a time: when the buffer fills and on {@link #close()}.
         *
         * <p>Per batch, the format is, for each document: {@code writeString(document)},
         * {@code writeInt(tuple count)}, then for each of its words: {@code writeBytes(word)},
         * {@code writeInt(tuple count)}, then one {@code writeDouble} per tuple. Counts cover
         * only the current batch, so a document or word spanning a flush is written again at
         * the start of the next batch.
         */
        public static class ShreddedWriter implements ShreddedProcessor {
            ArrayOutput output;
            ShreddedBuffer buffer = new ShreddedBuffer();
            // Most recent document/word, re-added to the buffer after a flush if the caller
            // continues sending tuples without re-sending them.
            String lastDocument;
            byte[] lastWord;
            // True right after flush() has emptied the buffer.
            boolean lastFlush = false;

            public ShreddedWriter(ArrayOutput output) {
                this.output = output;
            }

            /** Writes any buffered tuples. Note: does not close {@code output} itself. */
            public void close() throws IOException {
                flush();
            }

            public void processDocument(String document) {
                lastDocument = document;
                buffer.processDocument(document);
            }
            public void processWord(byte[] word) {
                lastWord = word;
                buffer.processWord(word);
            }
            /** Buffers one probability; flushes the batch once the buffer is full. */
            public final void processTuple(double probability) throws IOException {
                if (lastFlush) {
                    // The buffer was reset by flush(): reopen the current document/word unless
                    // the caller already supplied new ones after the flush.
                    if(buffer.documents.size() == 0) buffer.processDocument(lastDocument);
                    if(buffer.words.size() == 0) buffer.processWord(lastWord);
                    lastFlush = false;
                }
                buffer.processTuple(probability);
                if (buffer.isFull())
                    flush();
            }
            /** Writes buffered probabilities until the read index reaches {@code pauseIndex}. */
            public final void flushTuples(int pauseIndex) throws IOException {

                while (buffer.getReadIndex() < pauseIndex) {

                    output.writeDouble(buffer.getProbability());
                    buffer.incrementTuple();
                }
            }
            /** Writes document groups (header, count, nested words) up to {@code pauseIndex}. */
            public final void flushDocument(int pauseIndex) throws IOException {
                while (buffer.getReadIndex() < pauseIndex) {
                    // Exclusive end of the current document's tuple range in this batch.
                    int nextPause = buffer.getDocumentEndIndex();
                    int count = nextPause - buffer.getReadIndex();

                    output.writeString(buffer.getDocument());
                    output.writeInt(count);
                    buffer.incrementDocument();

                    flushWord(nextPause);
                    assert nextPause == buffer.getReadIndex();
                }
            }
            /** Writes word groups (header, count, tuples) up to {@code pauseIndex}. */
            public final void flushWord(int pauseIndex) throws IOException {
                while (buffer.getReadIndex() < pauseIndex) {
                    // Exclusive end of the current word's tuple range in this batch.
                    int nextPause = buffer.getWordEndIndex();
                    int count = nextPause - buffer.getReadIndex();

                    output.writeBytes(buffer.getWord());
                    output.writeInt(count);
                    buffer.incrementWord();

                    flushTuples(nextPause);
                    assert nextPause == buffer.getReadIndex();
                }
            }
            /** Serializes the whole buffered batch, then empties the buffer. */
            public void flush() throws IOException {
                flushDocument(buffer.getWriteIndex());
                buffer.reset();
                lastFlush = true;
            }
        }
244         public static class ShreddedBuffer {
245             ArrayList<String> documents = new ArrayList();
246             ArrayList<byte[]> words = new ArrayList();
247             ArrayList<Integer> documentTupleIdx = new ArrayList();
248             ArrayList<Integer> wordTupleIdx = new ArrayList();
249             int documentReadIdx = 0;
250             int wordReadIdx = 0;
251                             
252             double[] probabilitys;
253             int writeTupleIndex = 0;
254             int readTupleIndex = 0;
255             int batchSize;
256 
257             public ShreddedBuffer(int batchSize) {
258                 this.batchSize = batchSize;
259 
260                 probabilitys = new double[batchSize];
261             }                              
262 
263             public ShreddedBuffer() {    
264                 this(10000);
265             }                                                                                                                    
266             
267             public void processDocument(String document) {
268                 documents.add(document);
269                 documentTupleIdx.add(writeTupleIndex);
270             }                                      
271             public void processWord(byte[] word) {
272                 words.add(word);
273                 wordTupleIdx.add(writeTupleIndex);
274             }                                      
275             public void processTuple(double probability) {
276                 assert documents.size() > 0;
277                 assert words.size() > 0;
278                 probabilitys[writeTupleIndex] = probability;
279                 writeTupleIndex++;
280             }
281             public void resetData() {
282                 documents.clear();
283                 words.clear();
284                 documentTupleIdx.clear();
285                 wordTupleIdx.clear();
286                 writeTupleIndex = 0;
287             }                  
288                                  
289             public void resetRead() {
290                 readTupleIndex = 0;
291                 documentReadIdx = 0;
292                 wordReadIdx = 0;
293             } 
294 
295             public void reset() {
296                 resetData();
297                 resetRead();
298             } 
299             public boolean isFull() {
300                 return writeTupleIndex >= batchSize;
301             }
302 
303             public boolean isEmpty() {
304                 return writeTupleIndex == 0;
305             }                          
306 
307             public boolean isAtEnd() {
308                 return readTupleIndex >= writeTupleIndex;
309             }           
310             public void incrementDocument() {
311                 documentReadIdx++;  
312             }                                                                                              
313 
314             public void autoIncrementDocument() {
315                 while (readTupleIndex >= getDocumentEndIndex() && readTupleIndex < writeTupleIndex)
316                     documentReadIdx++;
317             }                 
318             public void incrementWord() {
319                 wordReadIdx++;  
320             }                                                                                              
321 
322             public void autoIncrementWord() {
323                 while (readTupleIndex >= getWordEndIndex() && readTupleIndex < writeTupleIndex)
324                     wordReadIdx++;
325             }                 
326             public void incrementTuple() {
327                 readTupleIndex++;
328             }                    
329             public int getDocumentEndIndex() {
330                 if ((documentReadIdx+1) >= documentTupleIdx.size())
331                     return writeTupleIndex;
332                 return documentTupleIdx.get(documentReadIdx+1);
333             }
334 
335             public int getWordEndIndex() {
336                 if ((wordReadIdx+1) >= wordTupleIdx.size())
337                     return writeTupleIndex;
338                 return wordTupleIdx.get(wordReadIdx+1);
339             }
340             public int getReadIndex() {
341                 return readTupleIndex;
342             }   
343 
344             public int getWriteIndex() {
345                 return writeTupleIndex;
346             } 
347             public String getDocument() {
348                 assert readTupleIndex < writeTupleIndex;
349                 assert documentReadIdx < documents.size();
350                 
351                 return documents.get(documentReadIdx);
352             }
353             public byte[] getWord() {
354                 assert readTupleIndex < writeTupleIndex;
355                 assert wordReadIdx < words.size();
356                 
357                 return words.get(wordReadIdx);
358             }
359             public double getProbability() {
360                 assert readTupleIndex < writeTupleIndex;
361                 return probabilitys[readTupleIndex];
362             }                                         
363             public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
364                 while (getReadIndex() < endIndex) {
365                    output.processTuple(getProbability());
366                    incrementTuple();
367                 }
368             }                                                                           
369             public void copyUntilIndexDocument(int endIndex, ShreddedProcessor output) throws IOException {
370                 while (getReadIndex() < endIndex) {
371                     output.processDocument(getDocument());
372                     assert getDocumentEndIndex() <= endIndex;
373                     copyUntilIndexWord(getDocumentEndIndex(), output);
374                     incrementDocument();
375                 }
376             } 
377             public void copyUntilIndexWord(int endIndex, ShreddedProcessor output) throws IOException {
378                 while (getReadIndex() < endIndex) {
379                     output.processWord(getWord());
380                     assert getWordEndIndex() <= endIndex;
381                     copyTuples(getWordEndIndex(), output);
382                     incrementWord();
383                 }
384             }  
385             public void copyUntilDocument(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
386                 while (!isAtEnd()) {
387                     if (other != null) {   
388                         assert !other.isAtEnd();
389                         int c = + Utility.compare(getDocument(), other.getDocument());
390                     
391                         if (c > 0) {
392                             break;   
393                         }
394                         
395                         output.processDocument(getDocument());
396                                       
397                         if (c < 0) {
398                             copyUntilIndexWord(getDocumentEndIndex(), output);
399                         } else if (c == 0) {
400                             copyUntilWord(other, output);
401                             autoIncrementDocument();
402                             break;
403                         }
404                     } else {
405                         output.processDocument(getDocument());
406                         copyUntilIndexWord(getDocumentEndIndex(), output);
407                     }
408                     incrementDocument();  
409                     
410                
411                 }
412             }
413             public void copyUntilWord(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
414                 while (!isAtEnd()) {
415                     if (other != null) {   
416                         assert !other.isAtEnd();
417                         int c = + Utility.compare(getWord(), other.getWord());
418                     
419                         if (c > 0) {
420                             break;   
421                         }
422                         
423                         output.processWord(getWord());
424                                       
425                         copyTuples(getWordEndIndex(), output);
426                     } else {
427                         output.processWord(getWord());
428                         copyTuples(getWordEndIndex(), output);
429                     }
430                     incrementWord();  
431                     
432                     if (getDocumentEndIndex() <= readTupleIndex)
433                         break;   
434                 }
435             }
436             public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
437                 copyUntilDocument(other, output);
438             }
439             
440         }                         
441         public static class ShreddedCombiner implements ReaderSource<DocumentWordProbability>, ShreddedSource {   
442             public ShreddedProcessor processor;
443             Collection<ShreddedReader> readers;       
444             boolean closeOnExit = false;
445             boolean uninitialized = true;
446             PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
447             
448             public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
449                 this.readers = readers;                                                       
450                 this.closeOnExit = closeOnExit;
451             }
452                                   
453             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
454                 if (processor instanceof ShreddedProcessor) {
455                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
456                 } else if (processor instanceof DocumentWordProbability.Processor) {
457                     this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentWordProbability.Processor) processor));
458                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
459                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentWordProbability>) processor));
460                 } else {
461                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
462                 }
463             }                                
464             
465             public Class<DocumentWordProbability> getOutputClass() {
466                 return DocumentWordProbability.class;
467             }
468             
469             public void initialize() throws IOException {
470                 for (ShreddedReader reader : readers) {
471                     reader.fill();                                        
472                     
473                     if (!reader.getBuffer().isAtEnd())
474                         queue.add(reader);
475                 }   
476 
477                 uninitialized = false;
478             }
479 
480             public void run() throws IOException {
481                 initialize();
482                
483                 while (queue.size() > 0) {
484                     ShreddedReader top = queue.poll();
485                     ShreddedReader next = null;
486                     ShreddedBuffer nextBuffer = null; 
487                     
488                     assert !top.getBuffer().isAtEnd();
489                                                   
490                     if (queue.size() > 0) {
491                         next = queue.peek();
492                         nextBuffer = next.getBuffer();
493                         assert !nextBuffer.isAtEnd();
494                     }
495                     
496                     top.getBuffer().copyUntil(nextBuffer, processor);
497                     if (top.getBuffer().isAtEnd())
498                         top.fill();                 
499                         
500                     if (!top.getBuffer().isAtEnd())
501                         queue.add(top);
502                 }              
503                 
504                 if (closeOnExit)
505                     processor.close();
506             }
507 
508             public DocumentWordProbability read() throws IOException {
509                 if (uninitialized)
510                     initialize();
511 
512                 DocumentWordProbability result = null;
513 
514                 while (queue.size() > 0) {
515                     ShreddedReader top = queue.poll();
516                     result = top.read();
517 
518                     if (result != null) {
519                         if (top.getBuffer().isAtEnd())
520                             top.fill();
521 
522                         queue.offer(top);
523                         break;
524                     } 
525                 }
526 
527                 return result;
528             }
529         } 
        /**
         * Decodes the batch format written by {@link ShreddedWriter} back into a
         * {@link ShreddedBuffer}, one buffer-full at a time. Tuples can be pulled with
         * {@link #read()} or pushed to a processor with {@link #run()}.
         */
        public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<DocumentWordProbability>, ShreddedSource {
            public ShreddedProcessor processor;
            ShreddedBuffer buffer;
            // Current document/word, kept so they can be carried into the next batch.
            DocumentWordProbability last = new DocumentWordProbability();
            // Absolute tuple number at which the current document / word group ends
            // (tupleCount at header time + the count read from input).
            long updateDocumentCount = -1;
            long updateWordCount = -1;
            // Number of tuples read from input so far.
            long tupleCount = 0;
            // Assigned in fill(); not read anywhere in the code visible here.
            long bufferStartCount = 0;
            ArrayInput input;

            public ShreddedReader(ArrayInput input) {
                this.input = input;
                this.buffer = new ShreddedBuffer();
            }

            public ShreddedReader(ArrayInput input, int bufferSize) {
                this.input = input;
                this.buffer = new ShreddedBuffer(bufferSize);
            }

            /**
             * Orders readers by the key at their current buffer position. A reader whose
             * buffer is exhausted compares as smaller than any reader that still has data.
             */
            public final int compareTo(ShreddedReader other) {
                ShreddedBuffer otherBuffer = other.getBuffer();

                if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
                    return 0;
                } else if (buffer.isAtEnd()) {
                    return -1;
                } else if (otherBuffer.isAtEnd()) {
                    return 1;
                }

                int result = 0;
                do {
                    result = + Utility.compare(buffer.getDocument(), otherBuffer.getDocument());
                    if(result != 0) break;
                    result = + Utility.compare(buffer.getWord(), otherBuffer.getWord());
                    if(result != 0) break;
                } while (false);

                return result;
            }

            public final ShreddedBuffer getBuffer() {
                return buffer;
            }

            /**
             * Returns the next tuple as a new object, refilling the buffer when needed, or
             * {@code null} when the input is exhausted.
             */
            public final DocumentWordProbability read() throws IOException {
                if (buffer.isAtEnd()) {
                    fill();

                    if (buffer.isAtEnd()) {
                        return null;
                    }
                }

                assert !buffer.isAtEnd();
                DocumentWordProbability result = new DocumentWordProbability();

                result.document = buffer.getDocument();
                result.word = buffer.getWord();
                result.probability = buffer.getProbability();

                buffer.incrementTuple();
                // Move the document/word cursors onto the group containing the next tuple.
                buffer.autoIncrementDocument();
                buffer.autoIncrementWord();

                return result;
            }

            /**
             * Discards the current batch and reads the next one from input, until the buffer is
             * full or the input ends.
             */
            public final void fill() throws IOException {
                try {
                    buffer.reset();

                    if (tupleCount != 0) {
                        // Carry over a document/word whose group continues past the previous batch.
                        // NOTE(review): the stored index is the remaining count, not the start (0);
                        // entry 0 is never consulted by getDocumentEndIndex/getWordEndIndex (they
                        // look up readIdx+1), so the value has no effect.
                        if(updateDocumentCount - tupleCount > 0) {
                            buffer.documents.add(last.document);
                            buffer.documentTupleIdx.add((int) (updateDocumentCount - tupleCount));
                        }
                        if(updateWordCount - tupleCount > 0) {
                            buffer.words.add(last.word);
                            buffer.wordTupleIdx.add((int) (updateWordCount - tupleCount));
                        }
                        bufferStartCount = tupleCount;
                    }

                    while (!buffer.isFull()) {
                        updateWord();
                        buffer.processTuple(input.readDouble());
                        tupleCount++;
                    }
                } catch(EOFException e) {
                    // End of input ends the batch; tuples read so far stay in the buffer. A
                    // record cut off mid-way is silently dropped.
                }
            }

            /** Reads the next document header if the current document's tuples are used up. */
            public final void updateDocument() throws IOException {
                if (updateDocumentCount > tupleCount)
                    return;

                last.document = input.readString();
                updateDocumentCount = tupleCount + input.readInt();

                buffer.processDocument(last.document);
            }
            /**
             * Reads the next word header (and a document header, if one is due) when the
             * current word's tuples are used up.
             */
            public final void updateWord() throws IOException {
                if (updateWordCount > tupleCount)
                    return;

                updateDocument();
                last.word = input.readBytes();
                updateWordCount = tupleCount + input.readInt();

                buffer.processWord(last.word);
            }

            /** Pushes every remaining tuple to {@link #processor}, then closes it. */
            public void run() throws IOException {
                while (true) {
                    fill();

                    if (buffer.isAtEnd())
                        break;

                    buffer.copyUntil(null, processor);
                }
                processor.close();
            }

            /**
             * Sets the downstream step, wrapped in a {@link DuplicateEliminator}. Non-shredded
             * processors are adapted through {@code TupleUnshredder}.
             */
            public void setProcessor(Step processor) throws IncompatibleProcessorException {
                if (processor instanceof ShreddedProcessor) {
                    this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
                } else if (processor instanceof DocumentWordProbability.Processor) {
                    this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentWordProbability.Processor) processor));
                } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
                    this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentWordProbability>) processor));
                } else {
                    throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
                }
            }

            public Class<DocumentWordProbability> getOutputClass() {
                return DocumentWordProbability.class;
            }
        }
672         
673         public static class DuplicateEliminator implements ShreddedProcessor {
674             public ShreddedProcessor processor;
675             DocumentWordProbability last = new DocumentWordProbability();
676             boolean documentProcess = true;
677             boolean wordProcess = true;
678                                            
679             public DuplicateEliminator() {}
680             public DuplicateEliminator(ShreddedProcessor processor) {
681                 this.processor = processor;
682             }
683             
684             public void setShreddedProcessor(ShreddedProcessor processor) {
685                 this.processor = processor;
686             }
687 
688             public void processDocument(String document) throws IOException {  
689                 if (documentProcess || Utility.compare(document, last.document) != 0) {
690                     last.document = document;
691                     processor.processDocument(document);
692             resetWord();
693                     documentProcess = false;
694                 }
695             }
696             public void processWord(byte[] word) throws IOException {  
697                 if (wordProcess || Utility.compare(word, last.word) != 0) {
698                     last.word = word;
699                     processor.processWord(word);
700                     wordProcess = false;
701                 }
702             }  
703             
704             public void resetDocument() {
705                  documentProcess = true;
706             resetWord();
707             }                                                
708             public void resetWord() {
709                  wordProcess = true;
710             }                                                
711                                
712             public void processTuple(double probability) throws IOException {
713                 processor.processTuple(probability);
714             } 
715             
716             public void close() throws IOException {
717                 processor.close();
718             }                    
719         }
720         public static class TupleUnshredder implements ShreddedProcessor {
721             DocumentWordProbability last = new DocumentWordProbability();
722             public org.galagosearch.tupleflow.Processor<DocumentWordProbability> processor;                               
723             
724             public TupleUnshredder(DocumentWordProbability.Processor processor) {
725                 this.processor = processor;
726             }         
727             
728             public TupleUnshredder(org.galagosearch.tupleflow.Processor<DocumentWordProbability> processor) {
729                 this.processor = processor;
730             }
731             
732             public DocumentWordProbability clone(DocumentWordProbability object) {
733                 DocumentWordProbability result = new DocumentWordProbability();
734                 if (object == null) return result;
735                 result.document = object.document; 
736                 result.word = object.word; 
737                 result.probability = object.probability; 
738                 return result;
739             }                 
740             
741             public void processDocument(String document) throws IOException {
742                 last.document = document;
743             }   
744                 
745             public void processWord(byte[] word) throws IOException {
746                 last.word = word;
747             }   
748                 
749             
750             public void processTuple(double probability) throws IOException {
751                 last.probability = probability;
752                 processor.process(clone(last));
753             }               
754             
755             public void close() throws IOException {
756                 processor.close();
757             }
758         }     
759         public static class TupleShredder implements Processor {
760             DocumentWordProbability last = new DocumentWordProbability();
761             public ShreddedProcessor processor;
762             
763             public TupleShredder(ShreddedProcessor processor) {
764                 this.processor = processor;
765             }                              
766             
767             public DocumentWordProbability clone(DocumentWordProbability object) {
768                 DocumentWordProbability result = new DocumentWordProbability();
769                 if (object == null) return result;
770                 result.document = object.document; 
771                 result.word = object.word; 
772                 result.probability = object.probability; 
773                 return result;
774             }                 
775             
776             public void process(DocumentWordProbability object) throws IOException {                                                                                                                                                   
777                 boolean processAll = false;
778                 if(last == null || Utility.compare(last.document, object.document) != 0 || processAll) { processor.processDocument(object.document); processAll = true; }
779                 if(last == null || Utility.compare(last.word, object.word) != 0 || processAll) { processor.processWord(object.word); processAll = true; }
780                 processor.processTuple(object.probability);                                         
781             }
782                           
783             public Class<DocumentWordProbability> getInputClass() {
784                 return DocumentWordProbability.class;
785             }
786             
787             public void close() throws IOException {
788                 processor.close();
789             }                     
790         }
791     } 
792     public static class WordOrder implements Order<DocumentWordProbability> {
793         public int hash(DocumentWordProbability object) {
794             int h = 0;
795             h += Utility.hash(object.word);
796             return h;
797         } 
798         public Comparator<DocumentWordProbability> greaterThan() {
799             return new Comparator<DocumentWordProbability>() {
800                 public int compare(DocumentWordProbability one, DocumentWordProbability two) {
801                     int result = 0;
802                     do {
803                         result = + Utility.compare(one.word, two.word);
804                         if(result != 0) break;
805                     } while (false);
806                     return -result;
807                 }
808             };
809         }     
810         public Comparator<DocumentWordProbability> lessThan() {
811             return new Comparator<DocumentWordProbability>() {
812                 public int compare(DocumentWordProbability one, DocumentWordProbability two) {
813                     int result = 0;
814                     do {
815                         result = + Utility.compare(one.word, two.word);
816                         if(result != 0) break;
817                     } while (false);
818                     return result;
819                 }
820             };
821         }     
822         public TypeReader<DocumentWordProbability> orderedReader(ArrayInput _input) {
823             return new ShreddedReader(_input);
824         }    
825 
826         public TypeReader<DocumentWordProbability> orderedReader(ArrayInput _input, int bufferSize) {
827             return new ShreddedReader(_input, bufferSize);
828         }    
829         public OrderedWriter<DocumentWordProbability> orderedWriter(ArrayOutput _output) {
830             ShreddedWriter w = new ShreddedWriter(_output);
831             return new OrderedWriterClass(w); 
832         }                                    
833         public static class OrderedWriterClass extends OrderedWriter< DocumentWordProbability > {
834             DocumentWordProbability last = null;
835             ShreddedWriter shreddedWriter = null; 
836             
837             public OrderedWriterClass(ShreddedWriter s) {
838                 this.shreddedWriter = s;
839             }
840             
841             public void process(DocumentWordProbability object) throws IOException {
842                boolean processAll = false;
843                if (processAll || last == null || 0 != Utility.compare(object.word, last.word)) { processAll = true; shreddedWriter.processWord(object.word); }
844                shreddedWriter.processTuple(object.document, object.probability);
845                last = object;
846             }           
847                  
848             public void close() throws IOException {
849                 shreddedWriter.close();
850             }
851             
852             public Class<DocumentWordProbability> getInputClass() {
853                 return DocumentWordProbability.class;
854             }
855         } 
856         public ReaderSource<DocumentWordProbability> orderedCombiner(Collection<TypeReader<DocumentWordProbability>> readers, boolean closeOnExit) {
857             ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
858             
859             for (TypeReader<DocumentWordProbability> reader : readers) {
860                 shreddedReaders.add((ShreddedReader)reader);
861             }
862             
863             return new ShreddedCombiner(shreddedReaders, closeOnExit);
864         }                  
865         public DocumentWordProbability clone(DocumentWordProbability object) {
866             DocumentWordProbability result = new DocumentWordProbability();
867             if (object == null) return result;
868             result.document = object.document; 
869             result.word = object.word; 
870             result.probability = object.probability; 
871             return result;
872         }                 
873         public Class<DocumentWordProbability> getOrderedClass() {
874             return DocumentWordProbability.class;
875         }                           
876         public String[] getOrderSpec() {
877             return new String[] {"+word"};
878         }
879 
880         public static String getSpecString() {
881             return "+word";
882         }
883                            
884         public interface ShreddedProcessor extends Step {
885             public void processWord(byte[] word) throws IOException;
886             public void processTuple(String document, double probability) throws IOException;
887             public void close() throws IOException;
888         }    
889         public interface ShreddedSource extends Step {
890         }                                              
891         
892         public static class ShreddedWriter implements ShreddedProcessor {
893             ArrayOutput output;
894             ShreddedBuffer buffer = new ShreddedBuffer();
895             byte[] lastWord;
896             boolean lastFlush = false;
897             
898             public ShreddedWriter(ArrayOutput output) {
899                 this.output = output;
900             }                        
901             
902             public void close() throws IOException {
903                 flush();
904             }
905             
906             public void processWord(byte[] word) {
907                 lastWord = word;
908                 buffer.processWord(word);
909             }
910             public final void processTuple(String document, double probability) throws IOException {
911                 if (lastFlush) {
912                     if(buffer.words.size() == 0) buffer.processWord(lastWord);
913                     lastFlush = false;
914                 }
915                 buffer.processTuple(document, probability);
916                 if (buffer.isFull())
917                     flush();
918             }
919             public final void flushTuples(int pauseIndex) throws IOException {
920                 
921                 while (buffer.getReadIndex() < pauseIndex) {
922                            
923                     output.writeString(buffer.getDocument());
924                     output.writeDouble(buffer.getProbability());
925                     buffer.incrementTuple();
926                 }
927             }  
928             public final void flushWord(int pauseIndex) throws IOException {
929                 while (buffer.getReadIndex() < pauseIndex) {
930                     int nextPause = buffer.getWordEndIndex();
931                     int count = nextPause - buffer.getReadIndex();
932                     
933                     output.writeBytes(buffer.getWord());
934                     output.writeInt(count);
935                     buffer.incrementWord();
936                       
937                     flushTuples(nextPause);
938                     assert nextPause == buffer.getReadIndex();
939                 }
940             }
941             public void flush() throws IOException { 
942                 flushWord(buffer.getWriteIndex());
943                 buffer.reset(); 
944                 lastFlush = true;
945             }                           
946         }
947         public static class ShreddedBuffer {
948             ArrayList<byte[]> words = new ArrayList();
949             ArrayList<Integer> wordTupleIdx = new ArrayList();
950             int wordReadIdx = 0;
951                             
952             String[] documents;
953             double[] probabilitys;
954             int writeTupleIndex = 0;
955             int readTupleIndex = 0;
956             int batchSize;
957 
958             public ShreddedBuffer(int batchSize) {
959                 this.batchSize = batchSize;
960 
961                 documents = new String[batchSize];
962                 probabilitys = new double[batchSize];
963             }                              
964 
965             public ShreddedBuffer() {    
966                 this(10000);
967             }                                                                                                                    
968             
969             public void processWord(byte[] word) {
970                 words.add(word);
971                 wordTupleIdx.add(writeTupleIndex);
972             }                                      
973             public void processTuple(String document, double probability) {
974                 assert words.size() > 0;
975                 documents[writeTupleIndex] = document;
976                 probabilitys[writeTupleIndex] = probability;
977                 writeTupleIndex++;
978             }
979             public void resetData() {
980                 words.clear();
981                 wordTupleIdx.clear();
982                 writeTupleIndex = 0;
983             }                  
984                                  
985             public void resetRead() {
986                 readTupleIndex = 0;
987                 wordReadIdx = 0;
988             } 
989 
990             public void reset() {
991                 resetData();
992                 resetRead();
993             } 
994             public boolean isFull() {
995                 return writeTupleIndex >= batchSize;
996             }
997 
998             public boolean isEmpty() {
999                 return writeTupleIndex == 0;
1000             }                          
1001 
1002             public boolean isAtEnd() {
1003                 return readTupleIndex >= writeTupleIndex;
1004             }           
1005             public void incrementWord() {
1006                 wordReadIdx++;  
1007             }                                                                                              
1008 
1009             public void autoIncrementWord() {
1010                 while (readTupleIndex >= getWordEndIndex() && readTupleIndex < writeTupleIndex)
1011                     wordReadIdx++;
1012             }                 
1013             public void incrementTuple() {
1014                 readTupleIndex++;
1015             }                    
1016             public int getWordEndIndex() {
1017                 if ((wordReadIdx+1) >= wordTupleIdx.size())
1018                     return writeTupleIndex;
1019                 return wordTupleIdx.get(wordReadIdx+1);
1020             }
1021             public int getReadIndex() {
1022                 return readTupleIndex;
1023             }   
1024 
1025             public int getWriteIndex() {
1026                 return writeTupleIndex;
1027             } 
1028             public byte[] getWord() {
1029                 assert readTupleIndex < writeTupleIndex;
1030                 assert wordReadIdx < words.size();
1031                 
1032                 return words.get(wordReadIdx);
1033             }
1034             public String getDocument() {
1035                 assert readTupleIndex < writeTupleIndex;
1036                 return documents[readTupleIndex];
1037             }                                         
1038             public double getProbability() {
1039                 assert readTupleIndex < writeTupleIndex;
1040                 return probabilitys[readTupleIndex];
1041             }                                         
1042             public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
1043                 while (getReadIndex() < endIndex) {
1044                    output.processTuple(getDocument(), getProbability());
1045                    incrementTuple();
1046                 }
1047             }                                                                           
1048             public void copyUntilIndexWord(int endIndex, ShreddedProcessor output) throws IOException {
1049                 while (getReadIndex() < endIndex) {
1050                     output.processWord(getWord());
1051                     assert getWordEndIndex() <= endIndex;
1052                     copyTuples(getWordEndIndex(), output);
1053                     incrementWord();
1054                 }
1055             }  
1056             public void copyUntilWord(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
1057                 while (!isAtEnd()) {
1058                     if (other != null) {   
1059                         assert !other.isAtEnd();
1060                         int c = + Utility.compare(getWord(), other.getWord());
1061                     
1062                         if (c > 0) {
1063                             break;   
1064                         }
1065                         
1066                         output.processWord(getWord());
1067                                       
1068                         copyTuples(getWordEndIndex(), output);
1069                     } else {
1070                         output.processWord(getWord());
1071                         copyTuples(getWordEndIndex(), output);
1072                     }
1073                     incrementWord();  
1074                     
1075                
1076                 }
1077             }
1078             public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
1079                 copyUntilWord(other, output);
1080             }
1081             
1082         }                         
1083         public static class ShreddedCombiner implements ReaderSource<DocumentWordProbability>, ShreddedSource {   
1084             public ShreddedProcessor processor;
1085             Collection<ShreddedReader> readers;       
1086             boolean closeOnExit = false;
1087             boolean uninitialized = true;
1088             PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
1089             
1090             public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
1091                 this.readers = readers;                                                       
1092                 this.closeOnExit = closeOnExit;
1093             }
1094                                   
1095             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
1096                 if (processor instanceof ShreddedProcessor) {
1097                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
1098                 } else if (processor instanceof DocumentWordProbability.Processor) {
1099                     this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentWordProbability.Processor) processor));
1100                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
1101                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentWordProbability>) processor));
1102                 } else {
1103                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
1104                 }
1105             }                                
1106             
1107             public Class<DocumentWordProbability> getOutputClass() {
1108                 return DocumentWordProbability.class;
1109             }
1110             
1111             public void initialize() throws IOException {
1112                 for (ShreddedReader reader : readers) {
1113                     reader.fill();                                        
1114                     
1115                     if (!reader.getBuffer().isAtEnd())
1116                         queue.add(reader);
1117                 }   
1118 
1119                 uninitialized = false;
1120             }
1121 
1122             public void run() throws IOException {
1123                 initialize();
1124                
1125                 while (queue.size() > 0) {
1126                     ShreddedReader top = queue.poll();
1127                     ShreddedReader next = null;
1128                     ShreddedBuffer nextBuffer = null; 
1129                     
1130                     assert !top.getBuffer().isAtEnd();
1131                                                   
1132                     if (queue.size() > 0) {
1133                         next = queue.peek();
1134                         nextBuffer = next.getBuffer();
1135                         assert !nextBuffer.isAtEnd();
1136                     }
1137                     
1138                     top.getBuffer().copyUntil(nextBuffer, processor);
1139                     if (top.getBuffer().isAtEnd())
1140                         top.fill();                 
1141                         
1142                     if (!top.getBuffer().isAtEnd())
1143                         queue.add(top);
1144                 }              
1145                 
1146                 if (closeOnExit)
1147                     processor.close();
1148             }
1149 
1150             public DocumentWordProbability read() throws IOException {
1151                 if (uninitialized)
1152                     initialize();
1153 
1154                 DocumentWordProbability result = null;
1155 
1156                 while (queue.size() > 0) {
1157                     ShreddedReader top = queue.poll();
1158                     result = top.read();
1159 
1160                     if (result != null) {
1161                         if (top.getBuffer().isAtEnd())
1162                             top.fill();
1163 
1164                         queue.offer(top);
1165                         break;
1166                     } 
1167                 }
1168 
1169                 return result;
1170             }
1171         } 
1172         public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<DocumentWordProbability>, ShreddedSource {      
1173             public ShreddedProcessor processor;
1174             ShreddedBuffer buffer;
1175             DocumentWordProbability last = new DocumentWordProbability();         
1176             long updateWordCount = -1;
1177             long tupleCount = 0;
1178             long bufferStartCount = 0;  
1179             ArrayInput input;
1180             
1181             public ShreddedReader(ArrayInput input) {
1182                 this.input = input; 
1183                 this.buffer = new ShreddedBuffer();
1184             }                               
1185             
1186             public ShreddedReader(ArrayInput input, int bufferSize) { 
1187                 this.input = input;
1188                 this.buffer = new ShreddedBuffer(bufferSize);
1189             }
1190                  
1191             public final int compareTo(ShreddedReader other) {
1192                 ShreddedBuffer otherBuffer = other.getBuffer();
1193                 
1194                 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
1195                     return 0;                 
1196                 } else if (buffer.isAtEnd()) {
1197                     return -1;
1198                 } else if (otherBuffer.isAtEnd()) {
1199                     return 1;
1200                 }
1201                                    
1202                 int result = 0;
1203                 do {
1204                     result = + Utility.compare(buffer.getWord(), otherBuffer.getWord());
1205                     if(result != 0) break;
1206                 } while (false);                                             
1207                 
1208                 return result;
1209             }
1210             
1211             public final ShreddedBuffer getBuffer() {
1212                 return buffer;
1213             }                
1214             
1215             public final DocumentWordProbability read() throws IOException {
1216                 if (buffer.isAtEnd()) {
1217                     fill();             
1218                 
1219                     if (buffer.isAtEnd()) {
1220                         return null;
1221                     }
1222                 }
1223                       
1224                 assert !buffer.isAtEnd();
1225                 DocumentWordProbability result = new DocumentWordProbability();
1226                 
1227                 result.word = buffer.getWord();
1228                 result.document = buffer.getDocument();
1229                 result.probability = buffer.getProbability();
1230                 
1231                 buffer.incrementTuple();
1232                 buffer.autoIncrementWord();
1233                 
1234                 return result;
1235             }           
1236             
1237             public final void fill() throws IOException {
1238                 try {   
1239                     buffer.reset();
1240                     
1241                     if (tupleCount != 0) {
1242                                                       
1243                         if(updateWordCount - tupleCount > 0) {
1244                             buffer.words.add(last.word);
1245                             buffer.wordTupleIdx.add((int) (updateWordCount - tupleCount));
1246                         }
1247                         bufferStartCount = tupleCount;
1248                     }
1249                     
1250                     while (!buffer.isFull()) {
1251                         updateWord();
1252                         buffer.processTuple(input.readString(), input.readDouble());
1253                         tupleCount++;
1254                     }
1255                 } catch(EOFException e) {}
1256             }
1257 
1258             public final void updateWord() throws IOException {
1259                 if (updateWordCount > tupleCount)
1260                     return;
1261                      
1262                 last.word = input.readBytes();
1263                 updateWordCount = tupleCount + input.readInt();
1264                                       
1265                 buffer.processWord(last.word);
1266             }
1267 
1268             public void run() throws IOException {
1269                 while (true) {
1270                     fill();
1271                     
1272                     if (buffer.isAtEnd())
1273                         break;
1274                     
1275                     buffer.copyUntil(null, processor);
1276                 }      
1277                 processor.close();
1278             }
1279             
1280             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
1281                 if (processor instanceof ShreddedProcessor) {
1282                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
1283                 } else if (processor instanceof DocumentWordProbability.Processor) {
1284                     this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentWordProbability.Processor) processor));
1285                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
1286                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentWordProbability>) processor));
1287                 } else {
1288                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
1289                 }
1290             }                                
1291             
1292             public Class<DocumentWordProbability> getOutputClass() {
1293                 return DocumentWordProbability.class;
1294             }                
1295         }
1296         
1297         public static class DuplicateEliminator implements ShreddedProcessor {
1298             public ShreddedProcessor processor;
1299             DocumentWordProbability last = new DocumentWordProbability();
1300             boolean wordProcess = true;
1301                                            
1302             public DuplicateEliminator() {}
1303             public DuplicateEliminator(ShreddedProcessor processor) {
1304                 this.processor = processor;
1305             }
1306             
1307             public void setShreddedProcessor(ShreddedProcessor processor) {
1308                 this.processor = processor;
1309             }
1310 
1311             public void processWord(byte[] word) throws IOException {  
1312                 if (wordProcess || Utility.compare(word, last.word) != 0) {
1313                     last.word = word;
1314                     processor.processWord(word);
1315                     wordProcess = false;
1316                 }
1317             }  
1318             
1319             public void resetWord() {
1320                  wordProcess = true;
1321             }                                                
1322                                
1323             public void processTuple(String document, double probability) throws IOException {
1324                 processor.processTuple(document, probability);
1325             } 
1326             
1327             public void close() throws IOException {
1328                 processor.close();
1329             }                    
1330         }
1331         public static class TupleUnshredder implements ShreddedProcessor {
1332             DocumentWordProbability last = new DocumentWordProbability();
1333             public org.galagosearch.tupleflow.Processor<DocumentWordProbability> processor;                               
1334             
1335             public TupleUnshredder(DocumentWordProbability.Processor processor) {
1336                 this.processor = processor;
1337             }         
1338             
1339             public TupleUnshredder(org.galagosearch.tupleflow.Processor<DocumentWordProbability> processor) {
1340                 this.processor = processor;
1341             }
1342             
1343             public DocumentWordProbability clone(DocumentWordProbability object) {
1344                 DocumentWordProbability result = new DocumentWordProbability();
1345                 if (object == null) return result;
1346                 result.document = object.document; 
1347                 result.word = object.word; 
1348                 result.probability = object.probability; 
1349                 return result;
1350             }                 
1351             
1352             public void processWord(byte[] word) throws IOException {
1353                 last.word = word;
1354             }   
1355                 
1356             
1357             public void processTuple(String document, double probability) throws IOException {
1358                 last.document = document;
1359                 last.probability = probability;
1360                 processor.process(clone(last));
1361             }               
1362             
1363             public void close() throws IOException {
1364                 processor.close();
1365             }
1366         }     
1367         public static class TupleShredder implements Processor {
1368             DocumentWordProbability last = new DocumentWordProbability();
1369             public ShreddedProcessor processor;
1370             
1371             public TupleShredder(ShreddedProcessor processor) {
1372                 this.processor = processor;
1373             }                              
1374             
1375             public DocumentWordProbability clone(DocumentWordProbability object) {
1376                 DocumentWordProbability result = new DocumentWordProbability();
1377                 if (object == null) return result;
1378                 result.document = object.document; 
1379                 result.word = object.word; 
1380                 result.probability = object.probability; 
1381                 return result;
1382             }                 
1383             
1384             public void process(DocumentWordProbability object) throws IOException {                                                                                                                                                   
1385                 boolean processAll = false;
1386                 if(last == null || Utility.compare(last.word, object.word) != 0 || processAll) { processor.processWord(object.word); processAll = true; }
1387                 processor.processTuple(object.document, object.probability);                                         
1388             }
1389                           
1390             public Class<DocumentWordProbability> getInputClass() {
1391                 return DocumentWordProbability.class;
1392             }
1393             
1394             public void close() throws IOException {
1395                 processor.close();
1396             }                     
1397         }
1398     } 
1399     public static class DocumentOrder implements Order<DocumentWordProbability> {
1400         public int hash(DocumentWordProbability object) {
1401             int h = 0;
1402             h += Utility.hash(object.document);
1403             return h;
1404         } 
1405         public Comparator<DocumentWordProbability> greaterThan() {
1406             return new Comparator<DocumentWordProbability>() {
1407                 public int compare(DocumentWordProbability one, DocumentWordProbability two) {
1408                     int result = 0;
1409                     do {
1410                         result = + Utility.compare(one.document, two.document);
1411                         if(result != 0) break;
1412                     } while (false);
1413                     return -result;
1414                 }
1415             };
1416         }     
1417         public Comparator<DocumentWordProbability> lessThan() {
1418             return new Comparator<DocumentWordProbability>() {
1419                 public int compare(DocumentWordProbability one, DocumentWordProbability two) {
1420                     int result = 0;
1421                     do {
1422                         result = + Utility.compare(one.document, two.document);
1423                         if(result != 0) break;
1424                     } while (false);
1425                     return result;
1426                 }
1427             };
1428         }     
1429         public TypeReader<DocumentWordProbability> orderedReader(ArrayInput _input) {
1430             return new ShreddedReader(_input);
1431         }    
1432 
1433         public TypeReader<DocumentWordProbability> orderedReader(ArrayInput _input, int bufferSize) {
1434             return new ShreddedReader(_input, bufferSize);
1435         }    
1436         public OrderedWriter<DocumentWordProbability> orderedWriter(ArrayOutput _output) {
1437             ShreddedWriter w = new ShreddedWriter(_output);
1438             return new OrderedWriterClass(w); 
1439         }                                    
1440         public static class OrderedWriterClass extends OrderedWriter< DocumentWordProbability > {
1441             DocumentWordProbability last = null;
1442             ShreddedWriter shreddedWriter = null; 
1443             
1444             public OrderedWriterClass(ShreddedWriter s) {
1445                 this.shreddedWriter = s;
1446             }
1447             
1448             public void process(DocumentWordProbability object) throws IOException {
1449                boolean processAll = false;
1450                if (processAll || last == null || 0 != Utility.compare(object.document, last.document)) { processAll = true; shreddedWriter.processDocument(object.document); }
1451                shreddedWriter.processTuple(object.word, object.probability);
1452                last = object;
1453             }           
1454                  
1455             public void close() throws IOException {
1456                 shreddedWriter.close();
1457             }
1458             
1459             public Class<DocumentWordProbability> getInputClass() {
1460                 return DocumentWordProbability.class;
1461             }
1462         } 
1463         public ReaderSource<DocumentWordProbability> orderedCombiner(Collection<TypeReader<DocumentWordProbability>> readers, boolean closeOnExit) {
1464             ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
1465             
1466             for (TypeReader<DocumentWordProbability> reader : readers) {
1467                 shreddedReaders.add((ShreddedReader)reader);
1468             }
1469             
1470             return new ShreddedCombiner(shreddedReaders, closeOnExit);
1471         }                  
1472         public DocumentWordProbability clone(DocumentWordProbability object) {
1473             DocumentWordProbability result = new DocumentWordProbability();
1474             if (object == null) return result;
1475             result.document = object.document; 
1476             result.word = object.word; 
1477             result.probability = object.probability; 
1478             return result;
1479         }                 
1480         public Class<DocumentWordProbability> getOrderedClass() {
1481             return DocumentWordProbability.class;
1482         }                           
1483         public String[] getOrderSpec() {
1484             return new String[] {"+document"};
1485         }
1486 
1487         public static String getSpecString() {
1488             return "+document";
1489         }
1490                            
1491         public interface ShreddedProcessor extends Step {
1492             public void processDocument(String document) throws IOException;
1493             public void processTuple(byte[] word, double probability) throws IOException;
1494             public void close() throws IOException;
1495         }    
1496         public interface ShreddedSource extends Step {
1497         }                                              
1498         
        /**
         * Encodes a shredded document-ordered stream onto an ArrayOutput.
         * Tuples are staged in a ShreddedBuffer. Whenever the buffer fills, and on
         * close(), it is flushed as a sequence of runs. Each run is encoded as
         * writeString(document), writeInt(count), then count repetitions of
         * (writeBytes(word), writeDouble(probability)).
         * One document may therefore be written as several consecutive runs when
         * it straddles a flush.
         */
        public static class ShreddedWriter implements ShreddedProcessor {
            ArrayOutput output;
            ShreddedBuffer buffer = new ShreddedBuffer();
            // Most recent document passed to processDocument.
            String lastDocument;
            // Set by flush(); tells processTuple that the buffer was reset and may
            // no longer hold the current document.
            boolean lastFlush = false;

            public ShreddedWriter(ArrayOutput output) {
                this.output = output;
            }

            /**
             * Flushes any buffered tuples.
             * NOTE(review): does not close {@code output}; presumably the caller owns it.
             */
            public void close() throws IOException {
                flush();
            }

            /** Starts a new document run in the buffer. */
            public void processDocument(String document) {
                lastDocument = document;
                buffer.processDocument(document);
            }
            /**
             * Buffers one tuple for the current document, flushing when the buffer is full.
             */
            public final void processTuple(byte[] word, double probability) throws IOException {
                if (lastFlush) {
                    // flush() reset the buffer. If no document was announced since,
                    // re-register the last one so this tuple stays attributed to it.
                    if(buffer.documents.size() == 0) buffer.processDocument(lastDocument);
                    lastFlush = false;
                }
                buffer.processTuple(word, probability);
                if (buffer.isFull())
                    flush();
            }
            /** Writes buffered (word, probability) pairs until the read index reaches pauseIndex. */
            public final void flushTuples(int pauseIndex) throws IOException {

                while (buffer.getReadIndex() < pauseIndex) {

                    output.writeBytes(buffer.getWord());
                    output.writeDouble(buffer.getProbability());
                    buffer.incrementTuple();
                }
            }
            /**
             * Writes whole runs (header, then tuples) until the read index reaches pauseIndex.
             */
            public final void flushDocument(int pauseIndex) throws IOException {
                while (buffer.getReadIndex() < pauseIndex) {
                    // The current run ends where the next document's run begins.
                    int nextPause = buffer.getDocumentEndIndex();
                    int count = nextPause - buffer.getReadIndex();

                    output.writeString(buffer.getDocument());
                    output.writeInt(count);
                    buffer.incrementDocument();

                    flushTuples(nextPause);
                    assert nextPause == buffer.getReadIndex();
                }
            }
            /** Writes every buffered run, then empties the buffer. */
            public void flush() throws IOException {
                flushDocument(buffer.getWriteIndex());
                buffer.reset();
                lastFlush = true;
            }
        }
1554         public static class ShreddedBuffer {
1555             ArrayList<String> documents = new ArrayList();
1556             ArrayList<Integer> documentTupleIdx = new ArrayList();
1557             int documentReadIdx = 0;
1558                             
1559             byte[][] words;
1560             double[] probabilitys;
1561             int writeTupleIndex = 0;
1562             int readTupleIndex = 0;
1563             int batchSize;
1564 
1565             public ShreddedBuffer(int batchSize) {
1566                 this.batchSize = batchSize;
1567 
1568                 words = new byte[batchSize][];
1569                 probabilitys = new double[batchSize];
1570             }                              
1571 
1572             public ShreddedBuffer() {    
1573                 this(10000);
1574             }                                                                                                                    
1575             
1576             public void processDocument(String document) {
1577                 documents.add(document);
1578                 documentTupleIdx.add(writeTupleIndex);
1579             }                                      
1580             public void processTuple(byte[] word, double probability) {
1581                 assert documents.size() > 0;
1582                 words[writeTupleIndex] = word;
1583                 probabilitys[writeTupleIndex] = probability;
1584                 writeTupleIndex++;
1585             }
1586             public void resetData() {
1587                 documents.clear();
1588                 documentTupleIdx.clear();
1589                 writeTupleIndex = 0;
1590             }                  
1591                                  
1592             public void resetRead() {
1593                 readTupleIndex = 0;
1594                 documentReadIdx = 0;
1595             } 
1596 
1597             public void reset() {
1598                 resetData();
1599                 resetRead();
1600             } 
1601             public boolean isFull() {
1602                 return writeTupleIndex >= batchSize;
1603             }
1604 
1605             public boolean isEmpty() {
1606                 return writeTupleIndex == 0;
1607             }                          
1608 
1609             public boolean isAtEnd() {
1610                 return readTupleIndex >= writeTupleIndex;
1611             }           
1612             public void incrementDocument() {
1613                 documentReadIdx++;  
1614             }                                                                                              
1615 
1616             public void autoIncrementDocument() {
1617                 while (readTupleIndex >= getDocumentEndIndex() && readTupleIndex < writeTupleIndex)
1618                     documentReadIdx++;
1619             }                 
1620             public void incrementTuple() {
1621                 readTupleIndex++;
1622             }                    
1623             public int getDocumentEndIndex() {
1624                 if ((documentReadIdx+1) >= documentTupleIdx.size())
1625                     return writeTupleIndex;
1626                 return documentTupleIdx.get(documentReadIdx+1);
1627             }
1628             public int getReadIndex() {
1629                 return readTupleIndex;
1630             }   
1631 
1632             public int getWriteIndex() {
1633                 return writeTupleIndex;
1634             } 
1635             public String getDocument() {
1636                 assert readTupleIndex < writeTupleIndex;
1637                 assert documentReadIdx < documents.size();
1638                 
1639                 return documents.get(documentReadIdx);
1640             }
1641             public byte[] getWord() {
1642                 assert readTupleIndex < writeTupleIndex;
1643                 return words[readTupleIndex];
1644             }                                         
1645             public double getProbability() {
1646                 assert readTupleIndex < writeTupleIndex;
1647                 return probabilitys[readTupleIndex];
1648             }                                         
1649             public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
1650                 while (getReadIndex() < endIndex) {
1651                    output.processTuple(getWord(), getProbability());
1652                    incrementTuple();
1653                 }
1654             }                                                                           
1655             public void copyUntilIndexDocument(int endIndex, ShreddedProcessor output) throws IOException {
1656                 while (getReadIndex() < endIndex) {
1657                     output.processDocument(getDocument());
1658                     assert getDocumentEndIndex() <= endIndex;
1659                     copyTuples(getDocumentEndIndex(), output);
1660                     incrementDocument();
1661                 }
1662             }  
1663             public void copyUntilDocument(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
1664                 while (!isAtEnd()) {
1665                     if (other != null) {   
1666                         assert !other.isAtEnd();
1667                         int c = + Utility.compare(getDocument(), other.getDocument());
1668                     
1669                         if (c > 0) {
1670                             break;   
1671                         }
1672                         
1673                         output.processDocument(getDocument());
1674                                       
1675                         copyTuples(getDocumentEndIndex(), output);
1676                     } else {
1677                         output.processDocument(getDocument());
1678                         copyTuples(getDocumentEndIndex(), output);
1679                     }
1680                     incrementDocument();  
1681                     
1682                
1683                 }
1684             }
1685             public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
1686                 copyUntilDocument(other, output);
1687             }
1688             
1689         }                         
        /**
         * Merges several ShreddedReaders into one document-ordered stream.
         * Readers are kept in a PriorityQueue ordered by ShreddedReader.compareTo,
         * i.e. by each reader's current document. Each reader's stream is
         * presumably already in document order (TODO confirm with callers).
         * The downstream processor is wrapped in a DuplicateEliminator, so
         * consecutive identical document headers from different readers are collapsed.
         */
        public static class ShreddedCombiner implements ReaderSource<DocumentWordProbability>, ShreddedSource {
            public ShreddedProcessor processor;
            Collection<ShreddedReader> readers;
            boolean closeOnExit = false;
            boolean uninitialized = true;
            // Readers that still have buffered data, smallest current document first.
            PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();

            public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
                this.readers = readers;
                this.closeOnExit = closeOnExit;
            }

            /**
             * Accepts a ShreddedProcessor, a DocumentWordProbability.Processor, or a generic
             * tupleflow Processor; the latter two are fed through a TupleUnshredder.
             *
             * @throws IncompatibleProcessorException for any other step type
             */
            public void setProcessor(Step processor) throws IncompatibleProcessorException {
                if (processor instanceof ShreddedProcessor) {
                    this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
                } else if (processor instanceof DocumentWordProbability.Processor) {
                    this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentWordProbability.Processor) processor));
                } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
                    this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentWordProbability>) processor));
                } else {
                    throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
                }
            }

            public Class<DocumentWordProbability> getOutputClass() {
                return DocumentWordProbability.class;
            }

            /** Loads the first batch of every reader and enqueues the non-empty ones. */
            public void initialize() throws IOException {
                for (ShreddedReader reader : readers) {
                    reader.fill();

                    if (!reader.getBuffer().isAtEnd())
                        queue.add(reader);
                }

                uninitialized = false;
            }

            /**
             * Push-mode merge. It repeatedly takes the reader with the smallest current
             * document and copies whole runs from it while its document is not greater
             * than the next reader's. Drained readers are refilled and re-enqueued while
             * they still have data. The processor is closed at the end if closeOnExit.
             * NOTE(review): initialize() is called unconditionally here, so calling run()
             * after read() would refill (reset) every reader and enqueue readers twice.
             */
            public void run() throws IOException {
                initialize();

                while (queue.size() > 0) {
                    ShreddedReader top = queue.poll();
                    ShreddedReader next = null;
                    ShreddedBuffer nextBuffer = null;

                    assert !top.getBuffer().isAtEnd();

                    if (queue.size() > 0) {
                        next = queue.peek();
                        nextBuffer = next.getBuffer();
                        assert !nextBuffer.isAtEnd();
                    }

                    // With nextBuffer == null (last reader), copyUntil drains the whole buffer.
                    top.getBuffer().copyUntil(nextBuffer, processor);
                    if (top.getBuffer().isAtEnd())
                        top.fill();

                    if (!top.getBuffer().isAtEnd())
                        queue.add(top);
                }

                if (closeOnExit)
                    processor.close();
            }

            /**
             * Pull-mode merge: returns the next tuple from the reader with the smallest
             * current document, or null once every reader is exhausted. A reader whose
             * refill yields nothing is still re-offered; it is dropped on a later call
             * when its read() returns null.
             */
            public DocumentWordProbability read() throws IOException {
                if (uninitialized)
                    initialize();

                DocumentWordProbability result = null;

                while (queue.size() > 0) {
                    ShreddedReader top = queue.poll();
                    result = top.read();

                    if (result != null) {
                        if (top.getBuffer().isAtEnd())
                            top.fill();

                        queue.offer(top);
                        break;
                    }
                }

                return result;
            }
        }
1779         public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<DocumentWordProbability>, ShreddedSource {      
1780             public ShreddedProcessor processor;
1781             ShreddedBuffer buffer;
1782             DocumentWordProbability last = new DocumentWordProbability();         
1783             long updateDocumentCount = -1;
1784             long tupleCount = 0;
1785             long bufferStartCount = 0;  
1786             ArrayInput input;
1787             
1788             public ShreddedReader(ArrayInput input) {
1789                 this.input = input; 
1790                 this.buffer = new ShreddedBuffer();
1791             }                               
1792             
1793             public ShreddedReader(ArrayInput input, int bufferSize) { 
1794                 this.input = input;
1795                 this.buffer = new ShreddedBuffer(bufferSize);
1796             }
1797                  
1798             public final int compareTo(ShreddedReader other) {
1799                 ShreddedBuffer otherBuffer = other.getBuffer();
1800                 
1801                 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
1802                     return 0;                 
1803                 } else if (buffer.isAtEnd()) {
1804                     return -1;
1805                 } else if (otherBuffer.isAtEnd()) {
1806                     return 1;
1807                 }
1808                                    
1809                 int result = 0;
1810                 do {
1811                     result = + Utility.compare(buffer.getDocument(), otherBuffer.getDocument());
1812                     if(result != 0) break;
1813                 } while (false);                                             
1814                 
1815                 return result;
1816             }
1817             
1818             public final ShreddedBuffer getBuffer() {
1819                 return buffer;
1820             }                
1821             
1822             public final DocumentWordProbability read() throws IOException {
1823                 if (buffer.isAtEnd()) {
1824                     fill();             
1825                 
1826                     if (buffer.isAtEnd()) {
1827                         return null;
1828                     }
1829                 }
1830                       
1831                 assert !buffer.isAtEnd();
1832                 DocumentWordProbability result = new DocumentWordProbability();
1833                 
1834                 result.document = buffer.getDocument();
1835                 result.word = buffer.getWord();
1836                 result.probability = buffer.getProbability();
1837                 
1838                 buffer.incrementTuple();
1839                 buffer.autoIncrementDocument();
1840                 
1841                 return result;
1842             }           
1843             
1844             public final void fill() throws IOException {
1845                 try {   
1846                     buffer.reset();
1847                     
1848                     if (tupleCount != 0) {
1849                                                       
1850                         if(updateDocumentCount - tupleCount > 0) {
1851                             buffer.documents.add(last.document);
1852                             buffer.documentTupleIdx.add((int) (updateDocumentCount - tupleCount));
1853                         }
1854                         bufferStartCount = tupleCount;
1855                     }
1856                     
1857                     while (!buffer.isFull()) {
1858                         updateDocument();
1859                         buffer.processTuple(input.readBytes(), input.readDouble());
1860                         tupleCount++;
1861                     }
1862                 } catch(EOFException e) {}
1863             }
1864 
1865             public final void updateDocument() throws IOException {
1866                 if (updateDocumentCount > tupleCount)
1867                     return;
1868                      
1869                 last.document = input.readString();
1870                 updateDocumentCount = tupleCount + input.readInt();
1871                                       
1872                 buffer.processDocument(last.document);
1873             }
1874 
1875             public void run() throws IOException {
1876                 while (true) {
1877                     fill();
1878                     
1879                     if (buffer.isAtEnd())
1880                         break;
1881                     
1882                     buffer.copyUntil(null, processor);
1883                 }      
1884                 processor.close();
1885             }
1886             
1887             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
1888                 if (processor instanceof ShreddedProcessor) {
1889                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
1890                 } else if (processor instanceof DocumentWordProbability.Processor) {
1891                     this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentWordProbability.Processor) processor));
1892                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
1893                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentWordProbability>) processor));
1894                 } else {
1895                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
1896                 }
1897             }                                
1898             
1899             public Class<DocumentWordProbability> getOutputClass() {
1900                 return DocumentWordProbability.class;
1901             }                
1902         }
1903         
1904         public static class DuplicateEliminator implements ShreddedProcessor {
1905             public ShreddedProcessor processor;
1906             DocumentWordProbability last = new DocumentWordProbability();
1907             boolean documentProcess = true;
1908                                            
1909             public DuplicateEliminator() {}
1910             public DuplicateEliminator(ShreddedProcessor processor) {
1911                 this.processor = processor;
1912             }
1913             
1914             public void setShreddedProcessor(ShreddedProcessor processor) {
1915                 this.processor = processor;
1916             }
1917 
1918             public void processDocument(String document) throws IOException {  
1919                 if (documentProcess || Utility.compare(document, last.document) != 0) {
1920                     last.document = document;
1921                     processor.processDocument(document);
1922                     documentProcess = false;
1923                 }
1924             }  
1925             
1926             public void resetDocument() {
1927                  documentProcess = true;
1928             }                                                
1929                                
1930             public void processTuple(byte[] word, double probability) throws IOException {
1931                 processor.processTuple(word, probability);
1932             } 
1933             
1934             public void close() throws IOException {
1935                 processor.close();
1936             }                    
1937         }
1938         public static class TupleUnshredder implements ShreddedProcessor {
1939             DocumentWordProbability last = new DocumentWordProbability();
1940             public org.galagosearch.tupleflow.Processor<DocumentWordProbability> processor;                               
1941             
1942             public TupleUnshredder(DocumentWordProbability.Processor processor) {
1943                 this.processor = processor;
1944             }         
1945             
1946             public TupleUnshredder(org.galagosearch.tupleflow.Processor<DocumentWordProbability> processor) {
1947                 this.processor = processor;
1948             }
1949             
1950             public DocumentWordProbability clone(DocumentWordProbability object) {
1951                 DocumentWordProbability result = new DocumentWordProbability();
1952                 if (object == null) return result;
1953                 result.document = object.document; 
1954                 result.word = object.word; 
1955                 result.probability = object.probability; 
1956                 return result;
1957             }                 
1958             
1959             public void processDocument(String document) throws IOException {
1960                 last.document = document;
1961             }   
1962                 
1963             
1964             public void processTuple(byte[] word, double probability) throws IOException {
1965                 last.word = word;
1966                 last.probability = probability;
1967                 processor.process(clone(last));
1968             }               
1969             
1970             public void close() throws IOException {
1971                 processor.close();
1972             }
1973         }     
1974         public static class TupleShredder implements Processor {
1975             DocumentWordProbability last = new DocumentWordProbability();
1976             public ShreddedProcessor processor;
1977             
1978             public TupleShredder(ShreddedProcessor processor) {
1979                 this.processor = processor;
1980             }                              
1981             
1982             public DocumentWordProbability clone(DocumentWordProbability object) {
1983                 DocumentWordProbability result = new DocumentWordProbability();
1984                 if (object == null) return result;
1985                 result.document = object.document; 
1986                 result.word = object.word; 
1987                 result.probability = object.probability; 
1988                 return result;
1989             }                 
1990             
1991             public void process(DocumentWordProbability object) throws IOException {                                                                                                                                                   
1992                 boolean processAll = false;
1993                 if(last == null || Utility.compare(last.document, object.document) != 0 || processAll) { processor.processDocument(object.document); processAll = true; }
1994                 processor.processTuple(object.word, object.probability);                                         
1995             }
1996                           
1997             public Class<DocumentWordProbability> getInputClass() {
1998                 return DocumentWordProbability.class;
1999             }
2000             
2001             public void close() throws IOException {
2002                 processor.close();
2003             }                     
2004         }
2005     } 
2006 }