1
2 package org.galagosearch.core.parse;
3
4 import java.io.IOException;
5 import java.util.ArrayList;
6 import java.util.HashMap;
7 import java.util.List;
8 import org.galagosearch.core.types.WordCount;
9 import org.galagosearch.tupleflow.IncompatibleProcessorException;
10 import org.galagosearch.tupleflow.InputClass;
11 import org.galagosearch.tupleflow.Linkage;
12 import org.galagosearch.tupleflow.OutputClass;
13 import org.galagosearch.tupleflow.Processor;
14 import org.galagosearch.tupleflow.Reducer;
15 import org.galagosearch.tupleflow.Source;
16 import org.galagosearch.tupleflow.Step;
17 import org.galagosearch.tupleflow.execution.Verified;
18
19 /***
20 *
21 * @author trevor
22 */
23 @InputClass(className = "org.galagosearch.core.types.WordCount", order = {"+word"})
24 @OutputClass(className = "org.galagosearch.core.types.WordCount", order = {"+word"})
25 @Verified
26 public class WordCountReducer implements Processor<WordCount>, Source<WordCount>, Reducer<WordCount>,
27 WordCount.Processor {
28 public Processor<WordCount> processor;
29 private WordCount last = null;
30 private WordCount aggregate = null;
31 private long totalTerms = 0;
32
33 public void process(WordCount wordCount) throws IOException {
34 if (last != null) {
35 if (!wordCount.word.equals(last.word)) {
36 flush();
37 } else if (aggregate == null) {
38 aggregate = new WordCount(last.word, last.count + wordCount.count,
39 last.documents + wordCount.documents);
40 } else {
41 aggregate.count += wordCount.count;
42 aggregate.documents += wordCount.documents;
43 }
44 }
45
46 last = wordCount;
47 }
48
49 public void flush() throws IOException {
50 if (last != null) {
51 if (aggregate != null) {
52 assert aggregate != null;
53 processor.process(aggregate);
54 totalTerms += aggregate.count;
55 } else {
56 assert last != null;
57 processor.process(last);
58 totalTerms += last.count;
59 }
60
61 aggregate = null;
62 }
63 }
64
65 public void close() throws IOException {
66 flush();
67 processor.close();
68 }
69
70 public void setProcessor(Step processor) throws IncompatibleProcessorException {
71 Linkage.link(this, processor);
72 }
73
74 public ArrayList<WordCount> reduce(List<WordCount> input) throws IOException {
75 HashMap<String, WordCount> countObjects = new HashMap<String, WordCount>();
76
77 for (WordCount wordCount : input) {
78 WordCount original = countObjects.get(wordCount.word);
79
80 if (original == null) {
81 countObjects.put(wordCount.word, wordCount);
82 } else {
83 original.documents += wordCount.documents;
84 original.count += wordCount.count;
85 }
86 }
87
88 return new ArrayList<WordCount>(countObjects.values());
89 }
90
91 public long getTotalTerms() {
92 return totalTerms;
93 }
94
95 public Class<WordCount> getInputClass() {
96 return WordCount.class;
97 }
98
99 public Class<WordCount> getOutputClass() {
100 return WordCount.class;
101 }
102 }