View Javadoc

1   // BSD License (http://www.galagosearch.org/license)
2   package org.galagosearch.tupleflow;
3   
4   import java.io.IOException;
5   import java.util.ArrayList;
6   import java.util.Comparator;
7   import java.util.List;
8   import java.util.PriorityQueue;
9   import org.galagosearch.tupleflow.execution.ErrorHandler;
10  import org.galagosearch.tupleflow.execution.StageInstanceFactory;
11  import org.galagosearch.tupleflow.execution.Verification;
12  import org.galagosearch.tupleflow.execution.Verified;
13  
14  /***
15   * Combines many streams of data together, sequentially.  This is a
16   * replacement for a typical input step.
17   * 
18   * @author trevor
19   */
20  @Verified
21  public class StreamCombiner<T> implements ExNihiloSource<T> {
22      ArrayList<TypeReader<T>> readers;
23      Comparator<T> comparator;
24      public Processor<T> processor;
25  
26      class ReaderWrapper implements Comparable<ReaderWrapper> {
27          public TypeReader<T> reader;
28          public T top;
29          public T last;
30  
31          public ReaderWrapper(TypeReader<T> r) {
32              reader = r;
33              top = null;
34              last = null;
35          }
36  
37          public int compareTo(ReaderWrapper other) {
38              if (top == null && other.top == null) {
39                  return 0;
40              }
41              if (top == null) {
42                  return 1;
43              }
44              if (other.top == null) {
45                  return -1;
46              }
47              return comparator.compare(this.top, other.top);
48          }
49  
50          public boolean read() throws IOException {
51              last = top;
52              top = reader.read();
53              assert last == null || top == null || comparator.compare(last, top) <= 0 : last.toString() + " " + top.
54                      toString() + " " + reader.toString() + " " + reader.hashCode();
55  
56              return top != null;
57          }
58      }
59  
60      @SuppressWarnings("unchecked")
61      public StreamCombiner(TupleFlowParameters parameters) throws IOException {
62          List<String> inputs = parameters.getXML().stringList("input");
63          String className = parameters.getXML().get("class");
64          String[] orderSpec = parameters.getXML().get("order", "").split(" ");
65          Order<T> order = StageInstanceFactory.createOrder(className, orderSpec);
66          comparator = order.lessThan();
67          readers = new ArrayList<TypeReader<T>>();
68  
69          for (String input : inputs) {
70              readers.add((TypeReader<T>) parameters.getTypeReader(input));
71          }
72      }
73  
74      public void run() throws IOException {
75          PriorityQueue<ReaderWrapper> wrappers = new PriorityQueue<ReaderWrapper>();
76  
77          for (TypeReader<T> reader : readers) {
78              ReaderWrapper rw = new ReaderWrapper(reader);
79              if (rw.read()) {
80                  wrappers.add(rw);
81              }
82          }
83  
84          while (wrappers.size() > 0) {
85              ReaderWrapper rw = wrappers.poll();
86              T top = rw.top;
87  
88              processor.process(rw.top);
89              if (rw.read()) {
90                  wrappers.add(rw);
91              }
92          }
93  
94          processor.close();
95      }
96  
97      public void setProcessor(Step processor) throws IncompatibleProcessorException {
98          Linkage.link(this, processor);
99      }
100 
101     public static String getInputClass(TupleFlowParameters parameters) {
102         return parameters.getXML().get("class");
103     }
104 
105     public static String getOutputClass(TupleFlowParameters parameters) {
106         return parameters.getXML().get("class");
107     }
108 
109     public static String[] getOutputOrder(TupleFlowParameters parameters) {
110         String[] orderSpec = parameters.getXML().get("order", "").split(" ");
111         return orderSpec;
112     }
113 
114     public static String[] getInputOrder(TupleFlowParameters parameters) {
115         String[] orderSpec = parameters.getXML().get("order", "").split(" ");
116         return orderSpec;
117     }
118 
119     public static void verify(TupleFlowParameters parameters, ErrorHandler handler) throws ClassNotFoundException {
120         Verification.requireParameters(new String[]{"input", "class"}, parameters.getXML(), handler);
121 
122         List<String> inputs = parameters.getXML().stringList("input");
123         String cls = parameters.getXML().get("class");
124 
125         for (String input : inputs) {
126             Verification.verifyTypeReader(input, Class.forName(cls), parameters, handler);
127         }
128     }
129 }