View Javadoc

1   // This file was automatically generated with the command: 
2   //     java org.galagosearch.tupleflow.typebuilder.TypeBuilderMojo ...
3   package org.galagosearch.core.types;
4   
5   import org.galagosearch.tupleflow.Utility;
6   import org.galagosearch.tupleflow.ArrayInput;
7   import org.galagosearch.tupleflow.ArrayOutput;
8   import org.galagosearch.tupleflow.Order;   
9   import org.galagosearch.tupleflow.OrderedWriter;
10  import org.galagosearch.tupleflow.Type; 
11  import org.galagosearch.tupleflow.TypeReader;
12  import org.galagosearch.tupleflow.Step; 
13  import org.galagosearch.tupleflow.IncompatibleProcessorException;
14  import org.galagosearch.tupleflow.ReaderSource;
15  import java.io.IOException;             
16  import java.io.EOFException;
17  import java.io.UnsupportedEncodingException;
18  import java.util.ArrayList;
19  import java.util.Arrays;   
20  import java.util.Comparator;
21  import java.util.PriorityQueue;
22  import java.util.Collection;
23  
24  public class WordCount implements Type<WordCount> {
25      public String word;
26      public long count;
27      public long documents; 
28      
29      public WordCount() {}
30      public WordCount(String word, long count, long documents) {
31          this.word = word;
32          this.count = count;
33          this.documents = documents;
34      }  
35      
36      public String toString() {
37              return String.format("%s,%d,%d",
38                                     word, count, documents);
39      } 
40  
41      public Order<WordCount> getOrder(String... spec) {
42          if (Arrays.equals(spec, new String[] { "+word" })) {
43              return new WordOrder();
44          }
45          return null;
46      } 
47        
48      public interface Processor extends Step, org.galagosearch.tupleflow.Processor<WordCount> {
49          public void process(WordCount object) throws IOException;
50          public void close() throws IOException;
51      }                        
52      public interface Source extends Step {
53      }
54      public static class WordOrder implements Order<WordCount> {
55          public int hash(WordCount object) {
56              int h = 0;
57              h += Utility.hash(object.word);
58              return h;
59          } 
60          public Comparator<WordCount> greaterThan() {
61              return new Comparator<WordCount>() {
62                  public int compare(WordCount one, WordCount two) {
63                      int result = 0;
64                      do {
65                          result = + Utility.compare(one.word, two.word);
66                          if(result != 0) break;
67                      } while (false);
68                      return -result;
69                  }
70              };
71          }     
72          public Comparator<WordCount> lessThan() {
73              return new Comparator<WordCount>() {
74                  public int compare(WordCount one, WordCount two) {
75                      int result = 0;
76                      do {
77                          result = + Utility.compare(one.word, two.word);
78                          if(result != 0) break;
79                      } while (false);
80                      return result;
81                  }
82              };
83          }     
84          public TypeReader<WordCount> orderedReader(ArrayInput _input) {
85              return new ShreddedReader(_input);
86          }    
87  
88          public TypeReader<WordCount> orderedReader(ArrayInput _input, int bufferSize) {
89              return new ShreddedReader(_input, bufferSize);
90          }    
91          public OrderedWriter<WordCount> orderedWriter(ArrayOutput _output) {
92              ShreddedWriter w = new ShreddedWriter(_output);
93              return new OrderedWriterClass(w); 
94          }                                    
95          public static class OrderedWriterClass extends OrderedWriter< WordCount > {
96              WordCount last = null;
97              ShreddedWriter shreddedWriter = null; 
98              
99              public OrderedWriterClass(ShreddedWriter s) {
100                 this.shreddedWriter = s;
101             }
102             
103             public void process(WordCount object) throws IOException {
104                boolean processAll = false;
105                if (processAll || last == null || 0 != Utility.compare(object.word, last.word)) { processAll = true; shreddedWriter.processWord(object.word); }
106                shreddedWriter.processTuple(object.count, object.documents);
107                last = object;
108             }           
109                  
110             public void close() throws IOException {
111                 shreddedWriter.close();
112             }
113             
114             public Class<WordCount> getInputClass() {
115                 return WordCount.class;
116             }
117         } 
118         public ReaderSource<WordCount> orderedCombiner(Collection<TypeReader<WordCount>> readers, boolean closeOnExit) {
119             ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
120             
121             for (TypeReader<WordCount> reader : readers) {
122                 shreddedReaders.add((ShreddedReader)reader);
123             }
124             
125             return new ShreddedCombiner(shreddedReaders, closeOnExit);
126         }                  
127         public WordCount clone(WordCount object) {
128             WordCount result = new WordCount();
129             if (object == null) return result;
130             result.word = object.word; 
131             result.count = object.count; 
132             result.documents = object.documents; 
133             return result;
134         }                 
135         public Class<WordCount> getOrderedClass() {
136             return WordCount.class;
137         }                           
138         public String[] getOrderSpec() {
139             return new String[] {"+word"};
140         }
141 
142         public static String getSpecString() {
143             return "+word";
144         }
145                            
146         public interface ShreddedProcessor extends Step {
147             public void processWord(String word) throws IOException;
148             public void processTuple(long count, long documents) throws IOException;
149             public void close() throws IOException;
150         }    
151         public interface ShreddedSource extends Step {
152         }                                              
153         
154         public static class ShreddedWriter implements ShreddedProcessor {
155             ArrayOutput output;
156             ShreddedBuffer buffer = new ShreddedBuffer();
157             String lastWord;
158             boolean lastFlush = false;
159             
160             public ShreddedWriter(ArrayOutput output) {
161                 this.output = output;
162             }                        
163             
164             public void close() throws IOException {
165                 flush();
166             }
167             
168             public void processWord(String word) {
169                 lastWord = word;
170                 buffer.processWord(word);
171             }
172             public final void processTuple(long count, long documents) throws IOException {
173                 if (lastFlush) {
174                     if(buffer.words.size() == 0) buffer.processWord(lastWord);
175                     lastFlush = false;
176                 }
177                 buffer.processTuple(count, documents);
178                 if (buffer.isFull())
179                     flush();
180             }
181             public final void flushTuples(int pauseIndex) throws IOException {
182                 
183                 while (buffer.getReadIndex() < pauseIndex) {
184                            
185                     output.writeLong(buffer.getCount());
186                     output.writeLong(buffer.getDocuments());
187                     buffer.incrementTuple();
188                 }
189             }  
190             public final void flushWord(int pauseIndex) throws IOException {
191                 while (buffer.getReadIndex() < pauseIndex) {
192                     int nextPause = buffer.getWordEndIndex();
193                     int count = nextPause - buffer.getReadIndex();
194                     
195                     output.writeString(buffer.getWord());
196                     output.writeInt(count);
197                     buffer.incrementWord();
198                       
199                     flushTuples(nextPause);
200                     assert nextPause == buffer.getReadIndex();
201                 }
202             }
203             public void flush() throws IOException { 
204                 flushWord(buffer.getWriteIndex());
205                 buffer.reset(); 
206                 lastFlush = true;
207             }                           
208         }
209         public static class ShreddedBuffer {
210             ArrayList<String> words = new ArrayList();
211             ArrayList<Integer> wordTupleIdx = new ArrayList();
212             int wordReadIdx = 0;
213                             
214             long[] counts;
215             long[] documentss;
216             int writeTupleIndex = 0;
217             int readTupleIndex = 0;
218             int batchSize;
219 
220             public ShreddedBuffer(int batchSize) {
221                 this.batchSize = batchSize;
222 
223                 counts = new long[batchSize];
224                 documentss = new long[batchSize];
225             }                              
226 
227             public ShreddedBuffer() {    
228                 this(10000);
229             }                                                                                                                    
230             
231             public void processWord(String word) {
232                 words.add(word);
233                 wordTupleIdx.add(writeTupleIndex);
234             }                                      
235             public void processTuple(long count, long documents) {
236                 assert words.size() > 0;
237                 counts[writeTupleIndex] = count;
238                 documentss[writeTupleIndex] = documents;
239                 writeTupleIndex++;
240             }
241             public void resetData() {
242                 words.clear();
243                 wordTupleIdx.clear();
244                 writeTupleIndex = 0;
245             }                  
246                                  
247             public void resetRead() {
248                 readTupleIndex = 0;
249                 wordReadIdx = 0;
250             } 
251 
252             public void reset() {
253                 resetData();
254                 resetRead();
255             } 
256             public boolean isFull() {
257                 return writeTupleIndex >= batchSize;
258             }
259 
260             public boolean isEmpty() {
261                 return writeTupleIndex == 0;
262             }                          
263 
264             public boolean isAtEnd() {
265                 return readTupleIndex >= writeTupleIndex;
266             }           
267             public void incrementWord() {
268                 wordReadIdx++;  
269             }                                                                                              
270 
271             public void autoIncrementWord() {
272                 while (readTupleIndex >= getWordEndIndex() && readTupleIndex < writeTupleIndex)
273                     wordReadIdx++;
274             }                 
275             public void incrementTuple() {
276                 readTupleIndex++;
277             }                    
278             public int getWordEndIndex() {
279                 if ((wordReadIdx+1) >= wordTupleIdx.size())
280                     return writeTupleIndex;
281                 return wordTupleIdx.get(wordReadIdx+1);
282             }
283             public int getReadIndex() {
284                 return readTupleIndex;
285             }   
286 
287             public int getWriteIndex() {
288                 return writeTupleIndex;
289             } 
290             public String getWord() {
291                 assert readTupleIndex < writeTupleIndex;
292                 assert wordReadIdx < words.size();
293                 
294                 return words.get(wordReadIdx);
295             }
296             public long getCount() {
297                 assert readTupleIndex < writeTupleIndex;
298                 return counts[readTupleIndex];
299             }                                         
300             public long getDocuments() {
301                 assert readTupleIndex < writeTupleIndex;
302                 return documentss[readTupleIndex];
303             }                                         
304             public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
305                 while (getReadIndex() < endIndex) {
306                    output.processTuple(getCount(), getDocuments());
307                    incrementTuple();
308                 }
309             }                                                                           
310             public void copyUntilIndexWord(int endIndex, ShreddedProcessor output) throws IOException {
311                 while (getReadIndex() < endIndex) {
312                     output.processWord(getWord());
313                     assert getWordEndIndex() <= endIndex;
314                     copyTuples(getWordEndIndex(), output);
315                     incrementWord();
316                 }
317             }  
318             public void copyUntilWord(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
319                 while (!isAtEnd()) {
320                     if (other != null) {   
321                         assert !other.isAtEnd();
322                         int c = + Utility.compare(getWord(), other.getWord());
323                     
324                         if (c > 0) {
325                             break;   
326                         }
327                         
328                         output.processWord(getWord());
329                                       
330                         copyTuples(getWordEndIndex(), output);
331                     } else {
332                         output.processWord(getWord());
333                         copyTuples(getWordEndIndex(), output);
334                     }
335                     incrementWord();  
336                     
337                
338                 }
339             }
340             public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
341                 copyUntilWord(other, output);
342             }
343             
344         }                         
345         public static class ShreddedCombiner implements ReaderSource<WordCount>, ShreddedSource {   
346             public ShreddedProcessor processor;
347             Collection<ShreddedReader> readers;       
348             boolean closeOnExit = false;
349             boolean uninitialized = true;
350             PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
351             
352             public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
353                 this.readers = readers;                                                       
354                 this.closeOnExit = closeOnExit;
355             }
356                                   
357             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
358                 if (processor instanceof ShreddedProcessor) {
359                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
360                 } else if (processor instanceof WordCount.Processor) {
361                     this.processor = new DuplicateEliminator(new TupleUnshredder((WordCount.Processor) processor));
362                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
363                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<WordCount>) processor));
364                 } else {
365                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
366                 }
367             }                                
368             
369             public Class<WordCount> getOutputClass() {
370                 return WordCount.class;
371             }
372             
373             public void initialize() throws IOException {
374                 for (ShreddedReader reader : readers) {
375                     reader.fill();                                        
376                     
377                     if (!reader.getBuffer().isAtEnd())
378                         queue.add(reader);
379                 }   
380 
381                 uninitialized = false;
382             }
383 
384             public void run() throws IOException {
385                 initialize();
386                
387                 while (queue.size() > 0) {
388                     ShreddedReader top = queue.poll();
389                     ShreddedReader next = null;
390                     ShreddedBuffer nextBuffer = null; 
391                     
392                     assert !top.getBuffer().isAtEnd();
393                                                   
394                     if (queue.size() > 0) {
395                         next = queue.peek();
396                         nextBuffer = next.getBuffer();
397                         assert !nextBuffer.isAtEnd();
398                     }
399                     
400                     top.getBuffer().copyUntil(nextBuffer, processor);
401                     if (top.getBuffer().isAtEnd())
402                         top.fill();                 
403                         
404                     if (!top.getBuffer().isAtEnd())
405                         queue.add(top);
406                 }              
407                 
408                 if (closeOnExit)
409                     processor.close();
410             }
411 
412             public WordCount read() throws IOException {
413                 if (uninitialized)
414                     initialize();
415 
416                 WordCount result = null;
417 
418                 while (queue.size() > 0) {
419                     ShreddedReader top = queue.poll();
420                     result = top.read();
421 
422                     if (result != null) {
423                         if (top.getBuffer().isAtEnd())
424                             top.fill();
425 
426                         queue.offer(top);
427                         break;
428                     } 
429                 }
430 
431                 return result;
432             }
433         } 
434         public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<WordCount>, ShreddedSource {      
435             public ShreddedProcessor processor;
436             ShreddedBuffer buffer;
437             WordCount last = new WordCount();         
438             long updateWordCount = -1;
439             long tupleCount = 0;
440             long bufferStartCount = 0;  
441             ArrayInput input;
442             
443             public ShreddedReader(ArrayInput input) {
444                 this.input = input; 
445                 this.buffer = new ShreddedBuffer();
446             }                               
447             
448             public ShreddedReader(ArrayInput input, int bufferSize) { 
449                 this.input = input;
450                 this.buffer = new ShreddedBuffer(bufferSize);
451             }
452                  
453             public final int compareTo(ShreddedReader other) {
454                 ShreddedBuffer otherBuffer = other.getBuffer();
455                 
456                 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
457                     return 0;                 
458                 } else if (buffer.isAtEnd()) {
459                     return -1;
460                 } else if (otherBuffer.isAtEnd()) {
461                     return 1;
462                 }
463                                    
464                 int result = 0;
465                 do {
466                     result = + Utility.compare(buffer.getWord(), otherBuffer.getWord());
467                     if(result != 0) break;
468                 } while (false);                                             
469                 
470                 return result;
471             }
472             
473             public final ShreddedBuffer getBuffer() {
474                 return buffer;
475             }                
476             
477             public final WordCount read() throws IOException {
478                 if (buffer.isAtEnd()) {
479                     fill();             
480                 
481                     if (buffer.isAtEnd()) {
482                         return null;
483                     }
484                 }
485                       
486                 assert !buffer.isAtEnd();
487                 WordCount result = new WordCount();
488                 
489                 result.word = buffer.getWord();
490                 result.count = buffer.getCount();
491                 result.documents = buffer.getDocuments();
492                 
493                 buffer.incrementTuple();
494                 buffer.autoIncrementWord();
495                 
496                 return result;
497             }           
498             
499             public final void fill() throws IOException {
500                 try {   
501                     buffer.reset();
502                     
503                     if (tupleCount != 0) {
504                                                       
505                         if(updateWordCount - tupleCount > 0) {
506                             buffer.words.add(last.word);
507                             buffer.wordTupleIdx.add((int) (updateWordCount - tupleCount));
508                         }
509                         bufferStartCount = tupleCount;
510                     }
511                     
512                     while (!buffer.isFull()) {
513                         updateWord();
514                         buffer.processTuple(input.readLong(), input.readLong());
515                         tupleCount++;
516                     }
517                 } catch(EOFException e) {}
518             }
519 
520             public final void updateWord() throws IOException {
521                 if (updateWordCount > tupleCount)
522                     return;
523                      
524                 last.word = input.readString();
525                 updateWordCount = tupleCount + input.readInt();
526                                       
527                 buffer.processWord(last.word);
528             }
529 
530             public void run() throws IOException {
531                 while (true) {
532                     fill();
533                     
534                     if (buffer.isAtEnd())
535                         break;
536                     
537                     buffer.copyUntil(null, processor);
538                 }      
539                 processor.close();
540             }
541             
542             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
543                 if (processor instanceof ShreddedProcessor) {
544                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
545                 } else if (processor instanceof WordCount.Processor) {
546                     this.processor = new DuplicateEliminator(new TupleUnshredder((WordCount.Processor) processor));
547                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
548                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<WordCount>) processor));
549                 } else {
550                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
551                 }
552             }                                
553             
554             public Class<WordCount> getOutputClass() {
555                 return WordCount.class;
556             }                
557         }
558         
559         public static class DuplicateEliminator implements ShreddedProcessor {
560             public ShreddedProcessor processor;
561             WordCount last = new WordCount();
562             boolean wordProcess = true;
563                                            
564             public DuplicateEliminator() {}
565             public DuplicateEliminator(ShreddedProcessor processor) {
566                 this.processor = processor;
567             }
568             
569             public void setShreddedProcessor(ShreddedProcessor processor) {
570                 this.processor = processor;
571             }
572 
573             public void processWord(String word) throws IOException {  
574                 if (wordProcess || Utility.compare(word, last.word) != 0) {
575                     last.word = word;
576                     processor.processWord(word);
577                     wordProcess = false;
578                 }
579             }  
580             
581             public void resetWord() {
582                  wordProcess = true;
583             }                                                
584                                
585             public void processTuple(long count, long documents) throws IOException {
586                 processor.processTuple(count, documents);
587             } 
588             
589             public void close() throws IOException {
590                 processor.close();
591             }                    
592         }
593         public static class TupleUnshredder implements ShreddedProcessor {
594             WordCount last = new WordCount();
595             public org.galagosearch.tupleflow.Processor<WordCount> processor;                               
596             
597             public TupleUnshredder(WordCount.Processor processor) {
598                 this.processor = processor;
599             }         
600             
601             public TupleUnshredder(org.galagosearch.tupleflow.Processor<WordCount> processor) {
602                 this.processor = processor;
603             }
604             
605             public WordCount clone(WordCount object) {
606                 WordCount result = new WordCount();
607                 if (object == null) return result;
608                 result.word = object.word; 
609                 result.count = object.count; 
610                 result.documents = object.documents; 
611                 return result;
612             }                 
613             
614             public void processWord(String word) throws IOException {
615                 last.word = word;
616             }   
617                 
618             
619             public void processTuple(long count, long documents) throws IOException {
620                 last.count = count;
621                 last.documents = documents;
622                 processor.process(clone(last));
623             }               
624             
625             public void close() throws IOException {
626                 processor.close();
627             }
628         }     
629         public static class TupleShredder implements Processor {
630             WordCount last = new WordCount();
631             public ShreddedProcessor processor;
632             
633             public TupleShredder(ShreddedProcessor processor) {
634                 this.processor = processor;
635             }                              
636             
637             public WordCount clone(WordCount object) {
638                 WordCount result = new WordCount();
639                 if (object == null) return result;
640                 result.word = object.word; 
641                 result.count = object.count; 
642                 result.documents = object.documents; 
643                 return result;
644             }                 
645             
646             public void process(WordCount object) throws IOException {                                                                                                                                                   
647                 boolean processAll = false;
648                 if(last == null || Utility.compare(last.word, object.word) != 0 || processAll) { processor.processWord(object.word); processAll = true; }
649                 processor.processTuple(object.count, object.documents);                                         
650             }
651                           
652             public Class<WordCount> getInputClass() {
653                 return WordCount.class;
654             }
655             
656             public void close() throws IOException {
657                 processor.close();
658             }                     
659         }
660     } 
661 }