Coverage Report - org.galagosearch.tupleflow.StreamCombiner
 
Classes in this File Line Coverage Branch Coverage Complexity
StreamCombiner
0%
0/39
0%
0/12
0
StreamCombiner$ReaderWrapper
0%
0/17
0%
0/18
0
 
 1  
 // BSD License (http://www.galagosearch.org/license)
 2  
 package org.galagosearch.tupleflow;
 3  
 
 4  
 import java.io.IOException;
 5  
 import java.util.ArrayList;
 6  
 import java.util.Comparator;
 7  
 import java.util.List;
 8  
 import java.util.PriorityQueue;
 9  
 import org.galagosearch.tupleflow.execution.ErrorHandler;
 10  
 import org.galagosearch.tupleflow.execution.StageInstanceFactory;
 11  
 import org.galagosearch.tupleflow.execution.Verification;
 12  
 import org.galagosearch.tupleflow.execution.Verified;
 13  
 
 14  
 /**
 15  
  * Combines many streams of data together, sequentially.  This is a
 16  
  * replacement for a typical input step.
 17  
  * 
 18  
  * @author trevor
 19  
  */
 20  
 @Verified
 21  
 public class StreamCombiner<T> implements ExNihiloSource<T> {
 22  
     ArrayList<TypeReader<T>> readers;
 23  
     Comparator<T> comparator;
 24  
     public Processor<T> processor;
 25  
 
 26  0
     class ReaderWrapper implements Comparable<ReaderWrapper> {
 27  
         public TypeReader<T> reader;
 28  
         public T top;
 29  
         public T last;
 30  
 
 31  0
         public ReaderWrapper(TypeReader<T> r) {
 32  0
             reader = r;
 33  0
             top = null;
 34  0
             last = null;
 35  0
         }
 36  
 
 37  
         public int compareTo(ReaderWrapper other) {
 38  0
             if (top == null && other.top == null) {
 39  0
                 return 0;
 40  
             }
 41  0
             if (top == null) {
 42  0
                 return 1;
 43  
             }
 44  0
             if (other.top == null) {
 45  0
                 return -1;
 46  
             }
 47  0
             return comparator.compare(this.top, other.top);
 48  
         }
 49  
 
 50  
         public boolean read() throws IOException {
 51  0
             last = top;
 52  0
             top = reader.read();
 53  
             assert last == null || top == null || comparator.compare(last, top) <= 0 : last.toString() + " " + top.
 54  0
                     toString() + " " + reader.toString() + " " + reader.hashCode();
 55  
 
 56  0
             return top != null;
 57  
         }
 58  
     }
 59  
 
 60  
     @SuppressWarnings("unchecked")
 61  0
     public StreamCombiner(TupleFlowParameters parameters) throws IOException {
 62  0
         List<String> inputs = parameters.getXML().stringList("input");
 63  0
         String className = parameters.getXML().get("class");
 64  0
         String[] orderSpec = parameters.getXML().get("order", "").split(" ");
 65  0
         Order<T> order = StageInstanceFactory.createOrder(className, orderSpec);
 66  0
         comparator = order.lessThan();
 67  0
         readers = new ArrayList<TypeReader<T>>();
 68  
 
 69  0
         for (String input : inputs) {
 70  0
             readers.add((TypeReader<T>) parameters.getTypeReader(input));
 71  
         }
 72  0
     }
 73  
 
 74  
     public void run() throws IOException {
 75  0
         PriorityQueue<ReaderWrapper> wrappers = new PriorityQueue<ReaderWrapper>();
 76  
 
 77  0
         for (TypeReader<T> reader : readers) {
 78  0
             ReaderWrapper rw = new ReaderWrapper(reader);
 79  0
             if (rw.read()) {
 80  0
                 wrappers.add(rw);
 81  
             }
 82  0
         }
 83  
 
 84  0
         while (wrappers.size() > 0) {
 85  0
             ReaderWrapper rw = wrappers.poll();
 86  0
             T top = rw.top;
 87  
 
 88  0
             processor.process(rw.top);
 89  0
             if (rw.read()) {
 90  0
                 wrappers.add(rw);
 91  
             }
 92  0
         }
 93  
 
 94  0
         processor.close();
 95  0
     }
 96  
 
 97  
     public void setProcessor(Step processor) throws IncompatibleProcessorException {
 98  0
         Linkage.link(this, processor);
 99  0
     }
 100  
 
 101  
     public static String getInputClass(TupleFlowParameters parameters) {
 102  0
         return parameters.getXML().get("class");
 103  
     }
 104  
 
 105  
     public static String getOutputClass(TupleFlowParameters parameters) {
 106  0
         return parameters.getXML().get("class");
 107  
     }
 108  
 
 109  
     public static String[] getOutputOrder(TupleFlowParameters parameters) {
 110  0
         String[] orderSpec = parameters.getXML().get("order", "").split(" ");
 111  0
         return orderSpec;
 112  
     }
 113  
 
 114  
     public static String[] getInputOrder(TupleFlowParameters parameters) {
 115  0
         String[] orderSpec = parameters.getXML().get("order", "").split(" ");
 116  0
         return orderSpec;
 117  
     }
 118  
 
 119  
     public static void verify(TupleFlowParameters parameters, ErrorHandler handler) throws ClassNotFoundException {
 120  0
         Verification.requireParameters(new String[]{"input", "class"}, parameters.getXML(), handler);
 121  
 
 122  0
         List<String> inputs = parameters.getXML().stringList("input");
 123  0
         String cls = parameters.getXML().get("class");
 124  
 
 125  0
         for (String input : inputs) {
 126  0
             Verification.verifyTypeReader(input, Class.forName(cls), parameters, handler);
 127  
         }
 128  0
     }
 129  
 }