Coverage Report - org.galagosearch.core.parse.WordCountReducer
 
Classes in this File Line Coverage Branch Coverage Complexity
WordCountReducer
0%
0/40
0%
0/22
0
 
 1  
 // BSD License (http://www.galagosearch.org/license)
 2  
 package org.galagosearch.core.parse;
 3  
 
 4  
 import java.io.IOException;
 5  
 import java.util.ArrayList;
 6  
 import java.util.HashMap;
 7  
 import java.util.List;
 8  
 import org.galagosearch.core.types.WordCount;
 9  
 import org.galagosearch.tupleflow.IncompatibleProcessorException;
 10  
 import org.galagosearch.tupleflow.InputClass;
 11  
 import org.galagosearch.tupleflow.Linkage;
 12  
 import org.galagosearch.tupleflow.OutputClass;
 13  
 import org.galagosearch.tupleflow.Processor;
 14  
 import org.galagosearch.tupleflow.Reducer;
 15  
 import org.galagosearch.tupleflow.Source;
 16  
 import org.galagosearch.tupleflow.Step;
 17  
 import org.galagosearch.tupleflow.execution.Verified;
 18  
 
 19  
 /**
 20  
  *
 21  
  * @author trevor
 22  
  */
 23  
 @InputClass(className = "org.galagosearch.core.types.WordCount", order = {"+word"})
 24  
 @OutputClass(className = "org.galagosearch.core.types.WordCount", order = {"+word"})
 25  
 @Verified
 26  0
 public class WordCountReducer implements Processor<WordCount>, Source<WordCount>, Reducer<WordCount>,
 27  
         WordCount.Processor {
 28  
     public Processor<WordCount> processor;
 29  0
     private WordCount last = null;
 30  0
     private WordCount aggregate = null;
 31  0
     private long totalTerms = 0;
 32  
 
 33  
     public void process(WordCount wordCount) throws IOException {
 34  0
         if (last != null) {
 35  0
             if (!wordCount.word.equals(last.word)) {
 36  0
                 flush();
 37  0
             } else if (aggregate == null) {
 38  0
                 aggregate = new WordCount(last.word, last.count + wordCount.count,
 39  
                                           last.documents + wordCount.documents);
 40  
             } else {
 41  0
                 aggregate.count += wordCount.count;
 42  0
                 aggregate.documents += wordCount.documents;
 43  
             }
 44  
         }
 45  
 
 46  0
         last = wordCount;
 47  0
     }
 48  
 
 49  
     public void flush() throws IOException {
 50  0
         if (last != null) {
 51  0
             if (aggregate != null) {
 52  0
                 assert aggregate != null;
 53  0
                 processor.process(aggregate);
 54  0
                 totalTerms += aggregate.count;
 55  
             } else {
 56  0
                 assert last != null;
 57  0
                 processor.process(last);
 58  0
                 totalTerms += last.count;
 59  
             }
 60  
 
 61  0
             aggregate = null;
 62  
         }
 63  0
     }
 64  
 
 65  
     public void close() throws IOException {
 66  0
         flush();
 67  0
         processor.close();
 68  0
     }
 69  
 
 70  
     public void setProcessor(Step processor) throws IncompatibleProcessorException {
 71  0
         Linkage.link(this, processor);
 72  0
     }
 73  
 
 74  
     public ArrayList<WordCount> reduce(List<WordCount> input) throws IOException {
 75  0
         HashMap<String, WordCount> countObjects = new HashMap<String, WordCount>();
 76  
 
 77  0
         for (WordCount wordCount : input) {
 78  0
             WordCount original = countObjects.get(wordCount.word);
 79  
 
 80  0
             if (original == null) {
 81  0
                 countObjects.put(wordCount.word, wordCount);
 82  
             } else {
 83  0
                 original.documents += wordCount.documents;
 84  0
                 original.count += wordCount.count;
 85  
             }
 86  0
         }
 87  
 
 88  0
         return new ArrayList<WordCount>(countObjects.values());
 89  
     }
 90  
 
 91  
     public long getTotalTerms() {
 92  0
         return totalTerms;
 93  
     }
 94  
 
 95  
     public Class<WordCount> getInputClass() {
 96  0
         return WordCount.class;
 97  
     }
 98  
 
 99  
     public Class<WordCount> getOutputClass() {
 100  0
         return WordCount.class;
 101  
     }
 102  
 }