View Javadoc

1   // BSD License (http://www.galagosearch.org/license)
2   package org.galagosearch.core.parse;
3   
4   import java.io.IOException;
5   import java.util.ArrayList;
6   import java.util.HashMap;
7   import java.util.List;
8   import org.galagosearch.core.types.WordCount;
9   import org.galagosearch.tupleflow.IncompatibleProcessorException;
10  import org.galagosearch.tupleflow.InputClass;
11  import org.galagosearch.tupleflow.Linkage;
12  import org.galagosearch.tupleflow.OutputClass;
13  import org.galagosearch.tupleflow.Processor;
14  import org.galagosearch.tupleflow.Reducer;
15  import org.galagosearch.tupleflow.Source;
16  import org.galagosearch.tupleflow.Step;
17  import org.galagosearch.tupleflow.execution.Verified;
18  
19  /***
20   *
21   * @author trevor
22   */
23  @InputClass(className = "org.galagosearch.core.types.WordCount", order = {"+word"})
24  @OutputClass(className = "org.galagosearch.core.types.WordCount", order = {"+word"})
25  @Verified
26  public class WordCountReducer implements Processor<WordCount>, Source<WordCount>, Reducer<WordCount>,
27          WordCount.Processor {
28      public Processor<WordCount> processor;
29      private WordCount last = null;
30      private WordCount aggregate = null;
31      private long totalTerms = 0;
32  
33      public void process(WordCount wordCount) throws IOException {
34          if (last != null) {
35              if (!wordCount.word.equals(last.word)) {
36                  flush();
37              } else if (aggregate == null) {
38                  aggregate = new WordCount(last.word, last.count + wordCount.count,
39                                            last.documents + wordCount.documents);
40              } else {
41                  aggregate.count += wordCount.count;
42                  aggregate.documents += wordCount.documents;
43              }
44          }
45  
46          last = wordCount;
47      }
48  
49      public void flush() throws IOException {
50          if (last != null) {
51              if (aggregate != null) {
52                  assert aggregate != null;
53                  processor.process(aggregate);
54                  totalTerms += aggregate.count;
55              } else {
56                  assert last != null;
57                  processor.process(last);
58                  totalTerms += last.count;
59              }
60  
61              aggregate = null;
62          }
63      }
64  
65      public void close() throws IOException {
66          flush();
67          processor.close();
68      }
69  
70      public void setProcessor(Step processor) throws IncompatibleProcessorException {
71          Linkage.link(this, processor);
72      }
73  
74      public ArrayList<WordCount> reduce(List<WordCount> input) throws IOException {
75          HashMap<String, WordCount> countObjects = new HashMap<String, WordCount>();
76  
77          for (WordCount wordCount : input) {
78              WordCount original = countObjects.get(wordCount.word);
79  
80              if (original == null) {
81                  countObjects.put(wordCount.word, wordCount);
82              } else {
83                  original.documents += wordCount.documents;
84                  original.count += wordCount.count;
85              }
86          }
87  
88          return new ArrayList<WordCount>(countObjects.values());
89      }
90  
91      public long getTotalTerms() {
92          return totalTerms;
93      }
94  
95      public Class<WordCount> getInputClass() {
96          return WordCount.class;
97      }
98  
99      public Class<WordCount> getOutputClass() {
100         return WordCount.class;
101     }
102 }