1
2 package org.galagosearch.tupleflow;
3
4 import java.io.IOException;
5 import java.util.ArrayList;
6 import java.util.Comparator;
7 import java.util.List;
8 import java.util.PriorityQueue;
9 import org.galagosearch.tupleflow.execution.ErrorHandler;
10 import org.galagosearch.tupleflow.execution.StageInstanceFactory;
11 import org.galagosearch.tupleflow.execution.Verification;
12 import org.galagosearch.tupleflow.execution.Verified;
13
14 /***
15 * Combines many streams of data together, sequentially. This is a
16 * replacement for a typical input step.
17 *
18 * @author trevor
19 */
20 @Verified
21 public class StreamCombiner<T> implements ExNihiloSource<T> {
22 ArrayList<TypeReader<T>> readers;
23 Comparator<T> comparator;
24 public Processor<T> processor;
25
26 class ReaderWrapper implements Comparable<ReaderWrapper> {
27 public TypeReader<T> reader;
28 public T top;
29 public T last;
30
31 public ReaderWrapper(TypeReader<T> r) {
32 reader = r;
33 top = null;
34 last = null;
35 }
36
37 public int compareTo(ReaderWrapper other) {
38 if (top == null && other.top == null) {
39 return 0;
40 }
41 if (top == null) {
42 return 1;
43 }
44 if (other.top == null) {
45 return -1;
46 }
47 return comparator.compare(this.top, other.top);
48 }
49
50 public boolean read() throws IOException {
51 last = top;
52 top = reader.read();
53 assert last == null || top == null || comparator.compare(last, top) <= 0 : last.toString() + " " + top.
54 toString() + " " + reader.toString() + " " + reader.hashCode();
55
56 return top != null;
57 }
58 }
59
60 @SuppressWarnings("unchecked")
61 public StreamCombiner(TupleFlowParameters parameters) throws IOException {
62 List<String> inputs = parameters.getXML().stringList("input");
63 String className = parameters.getXML().get("class");
64 String[] orderSpec = parameters.getXML().get("order", "").split(" ");
65 Order<T> order = StageInstanceFactory.createOrder(className, orderSpec);
66 comparator = order.lessThan();
67 readers = new ArrayList<TypeReader<T>>();
68
69 for (String input : inputs) {
70 readers.add((TypeReader<T>) parameters.getTypeReader(input));
71 }
72 }
73
74 public void run() throws IOException {
75 PriorityQueue<ReaderWrapper> wrappers = new PriorityQueue<ReaderWrapper>();
76
77 for (TypeReader<T> reader : readers) {
78 ReaderWrapper rw = new ReaderWrapper(reader);
79 if (rw.read()) {
80 wrappers.add(rw);
81 }
82 }
83
84 while (wrappers.size() > 0) {
85 ReaderWrapper rw = wrappers.poll();
86 T top = rw.top;
87
88 processor.process(rw.top);
89 if (rw.read()) {
90 wrappers.add(rw);
91 }
92 }
93
94 processor.close();
95 }
96
97 public void setProcessor(Step processor) throws IncompatibleProcessorException {
98 Linkage.link(this, processor);
99 }
100
101 public static String getInputClass(TupleFlowParameters parameters) {
102 return parameters.getXML().get("class");
103 }
104
105 public static String getOutputClass(TupleFlowParameters parameters) {
106 return parameters.getXML().get("class");
107 }
108
109 public static String[] getOutputOrder(TupleFlowParameters parameters) {
110 String[] orderSpec = parameters.getXML().get("order", "").split(" ");
111 return orderSpec;
112 }
113
114 public static String[] getInputOrder(TupleFlowParameters parameters) {
115 String[] orderSpec = parameters.getXML().get("order", "").split(" ");
116 return orderSpec;
117 }
118
119 public static void verify(TupleFlowParameters parameters, ErrorHandler handler) throws ClassNotFoundException {
120 Verification.requireParameters(new String[]{"input", "class"}, parameters.getXML(), handler);
121
122 List<String> inputs = parameters.getXML().stringList("input");
123 String cls = parameters.getXML().get("class");
124
125 for (String input : inputs) {
126 Verification.verifyTypeReader(input, Class.forName(cls), parameters, handler);
127 }
128 }
129 }