1
2
3 package org.galagosearch.tupleflow.execution;
4
5 import java.io.File;
6 import java.io.IOException;
7 import java.lang.reflect.Constructor;
8 import java.util.ArrayList;
9 import java.util.Arrays;
10 import java.util.List;
11 import org.galagosearch.tupleflow.Counter;
12 import org.galagosearch.tupleflow.ExNihiloSource;
13 import org.galagosearch.tupleflow.FileOrderedReader;
14 import org.galagosearch.tupleflow.FileOrderedWriter;
15 import org.galagosearch.tupleflow.IncompatibleProcessorException;
16 import org.galagosearch.tupleflow.Order;
17 import org.galagosearch.tupleflow.OrderedCombiner;
18 import org.galagosearch.tupleflow.Parameters;
19 import org.galagosearch.tupleflow.Processor;
20 import org.galagosearch.tupleflow.ReaderSource;
21 import org.galagosearch.tupleflow.Source;
22 import org.galagosearch.tupleflow.Splitter;
23 import org.galagosearch.tupleflow.TupleFlowParameters;
24 import org.galagosearch.tupleflow.TypeReader;
25 import org.galagosearch.tupleflow.Utility;
26 import org.galagosearch.tupleflow.execution.StageInstanceDescription.PipeInput;
27 import org.galagosearch.tupleflow.execution.StageInstanceDescription.PipeOutput;
28
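/**
 * Builds the runnable TupleFlow step objects (readers, writers and
 * processing steps) described by a {@link StageInstanceDescription}.
 *
 * @author trevor
 */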
33 public class StageInstanceFactory {
34 NetworkedCounterManager counterManager;
35
36 public StageInstanceFactory(NetworkedCounterManager counterManager) {
37 this.counterManager = counterManager;
38 }
39
40 public class StepParameters implements TupleFlowParameters {
41 Parameters xml;
42 StageInstanceDescription instance;
43
44 public StepParameters(Step o, StageInstanceDescription instance) {
45 this.xml = o.getParameters();
46 this.instance = instance;
47 }
48
49 public Counter getCounter(String name) {
50 return counterManager.newCounter(
51 name, instance.getName(),
52 new Integer(instance.getIndex()).toString(), instance.getMasterURL());
53 }
54
55 public TypeReader getTypeReader(String specification) throws IOException {
56 PipeOutput pipeOutput = instance.getReaders().get(specification);
57 return StageInstanceFactory.getTypeReader(pipeOutput);
58 }
59
60 public Processor getTypeWriter(String specification) throws IOException {
61 PipeInput pipeInput = instance.getWriters().get(specification);
62 return StageInstanceFactory.getTypeWriter(pipeInput);
63 }
64
65 public boolean readerExists(String specification, String className, String[] order) {
66 return instance.readerExists(specification, className, order);
67 }
68
69 public boolean writerExists(String specification, String className, String[] order) {
70 return instance.writerExists(specification, className, order);
71 }
72
73 public Parameters getXML() {
74 return xml;
75 }
76 }
77
78 public ExNihiloSource instantiate(StageInstanceDescription instance)
79 throws IncompatibleProcessorException, IOException {
80 return (ExNihiloSource) instantiate(instance, instance.getStage().getSteps());
81 }
82
83 public org.galagosearch.tupleflow.Step instantiate(
84 StageInstanceDescription instance,
85 ArrayList<Step> steps)
86 throws IncompatibleProcessorException, IOException {
87 org.galagosearch.tupleflow.Step previous = null;
88 org.galagosearch.tupleflow.Step first = null;
89
90 for (Step step : steps) {
91 org.galagosearch.tupleflow.Step current;
92
93 if (step instanceof MultiStep) {
94 current = instantiateMulti(instance, step);
95 } else if (step instanceof InputStep) {
96 current = instantiateInput(instance, (InputStep) step);
97 } else if (step instanceof OutputStep) {
98 current = instantiateOutput(instance, (OutputStep) step);
99 } else {
100 current = instantiateStep(instance, step);
101 }
102
103 if (first == null) {
104 first = current;
105 }
106 if (previous != null) {
107 ((Source) previous).setProcessor(current);
108 }
109
110 previous = current;
111 }
112
113 return first;
114 }
115
116 public org.galagosearch.tupleflow.Step instantiateStep(
117 StageInstanceDescription instance,
118 final Step step) throws IOException {
119 org.galagosearch.tupleflow.Step object;
120
121 try {
122 Class objectClass = Class.forName(step.getClassName());
123 Constructor parameterArgumentConstructor = null;
124 Constructor noArgumentConstructor = null;
125
126 for (Constructor c : objectClass.getConstructors()) {
127 java.lang.reflect.Type[] parameters = c.getGenericParameterTypes();
128
129 if (parameters.length == 0) {
130 noArgumentConstructor = c;
131 } else if (parameters.length == 1 && parameters[0] == TupleFlowParameters.class) {
132 parameterArgumentConstructor = c;
133 }
134 }
135
136 if (parameterArgumentConstructor != null) {
137 object = (org.galagosearch.tupleflow.Step) parameterArgumentConstructor.newInstance(
138 new StepParameters(step, instance));
139 } else if (noArgumentConstructor != null) {
140 object = (org.galagosearch.tupleflow.Step) noArgumentConstructor.newInstance();
141 } else {
142 throw new IncompatibleProcessorException(
143 "Couldn't instantiate this class because " +
144 "no compatible constructor was found: " + step.getClassName());
145 }
146 } catch (Exception e) {
147 throw (IOException) new IOException(
148 "Couldn't instantiate a step object: " + step.getClassName()).initCause(e);
149 }
150
151 return object;
152 }
153
154 public org.galagosearch.tupleflow.Step instantiateInput(
155 StageInstanceDescription instance,
156 InputStep step) throws IOException {
157 PipeOutput pipeOutput = instance.getReaders().get(step.getId());
158 return getTypeReaderSource(pipeOutput);
159 }
160
161 public org.galagosearch.tupleflow.Step instantiateOutput(
162 StageInstanceDescription instance,
163 final OutputStep step) throws IOException {
164 PipeInput pipeInput = instance.getWriters().get(step.getId());
165 return getTypeWriter(pipeInput);
166 }
167
168 private org.galagosearch.tupleflow.Step instantiateMulti(
169 StageInstanceDescription instance,
170 final Step step) throws IncompatibleProcessorException, IOException {
171 MultiStep multiStep = (MultiStep) step;
172 Processor[] processors = new Processor[multiStep.groups.size()];
173
174 for (int i = 0; i < multiStep.groups.size(); i++) {
175 ArrayList<Step> group = multiStep.groups.get(i);
176 processors[i] = (org.galagosearch.tupleflow.Processor) instantiate(instance, group);
177 }
178
179 return new org.galagosearch.tupleflow.Multi(processors);
180 }
181
182 protected static Order createOrder(final DataPipe pipe) throws IOException {
183 return createOrder(pipe.className, pipe.order);
184 }
185
186 public static Order createOrder(String className, String[] orderSpec) throws IOException {
187 Order order;
188
189 try {
190 Class typeClass = Class.forName(className);
191 org.galagosearch.tupleflow.Type type = (org.galagosearch.tupleflow.Type) typeClass.
192 getConstructor().newInstance();
193 order = type.getOrder(orderSpec);
194 } catch (Exception e) {
195 throw (IOException) new IOException(
196 "Couldn't create an order object for type: " + className).initCause(e);
197 }
198
199 return order;
200 }
201
202 public ReaderSource getTypeReaderSource(PipeOutput pipeOutput) throws IOException {
203 ReaderSource reader;
204
205 if (pipeOutput == null) {
206 return null;
207 }
208
209 Order order = createOrder(pipeOutput.getPipe());
210 String[] fileNames = pipeOutput.getFileNames();
211
212 if (fileNames.length > 1) {
213 reader = OrderedCombiner.combineFromFiles(Arrays.asList(fileNames), order);
214 } else {
215 reader = new FileOrderedReader(fileNames[0], order);
216 }
217 return reader;
218 }
219
220 @SuppressWarnings(value = "unchecked")
221 public static <T> ReaderSource<T> getTypeReader(final PipeOutput pipeOutput) throws IOException {
222 ReaderSource<T> reader;
223
224 if (pipeOutput == null) {
225 return null;
226 }
227
228 Order order = createOrder(pipeOutput.getPipe());
229 String[] fileNames = pipeOutput.getFileNames();
230
231 if (fileNames.length > 100) {
232 List<String> names = Arrays.asList(fileNames);
233 ArrayList<String> reduced = new ArrayList<String>();
234
235
236 for (int i = 0; i < names.size(); i += 20) {
237 int start = i;
238 int end = Math.min(names.size(), i + 20);
239 List<String> toCombine = names.subList(start, end);
240
241 reader = OrderedCombiner.combineFromFiles(toCombine, order);
242 File temporary = Utility.createTemporary();
243 FileOrderedWriter<T> writer = new FileOrderedWriter<T>(temporary, order);
244
245 try {
246 reader.setProcessor(writer);
247 } catch (IncompatibleProcessorException e) {
248 throw (IOException) new IOException("Incompatible processor for reader tuples").
249 initCause(e);
250 }
251
252 reader.run();
253
254 reduced.add(temporary.toString());
255 temporary.deleteOnExit();
256 }
257
258 reader = OrderedCombiner.combineFromFiles(reduced, order);
259 } else if (fileNames.length > 1) {
260 reader = OrderedCombiner.combineFromFiles(Arrays.asList(fileNames), order);
261 } else {
262 reader = new FileOrderedReader(fileNames[0], order);
263 }
264 return reader;
265 }
266
267 public static Processor getTypeWriter(final PipeInput pipeInput) throws IOException, IOException {
268 Processor writer;
269
270 if (pipeInput == null) {
271 return null;
272 }
273 String[] fileNames = pipeInput.getFileNames();
274 Order order = createOrder(pipeInput.getPipe());
275 Order hashOrder = createOrder(pipeInput.getPipe().getClassName(), pipeInput.getPipe().getHash());
276
277 assert order != null : "Order not found: " + Arrays.toString(pipeInput.getPipe().getOrder());
278
279 try {
280 if (fileNames.length == 1) {
281 writer = new FileOrderedWriter(fileNames[0], order);
282 } else {
283 assert hashOrder != null : "Hash order not found: " + pipeInput.getPipe().getPipeName() + " " + pipeInput.getPipe().getHash();
284 writer = Splitter.splitToFiles(fileNames, order, hashOrder);
285 }
286 } catch (IncompatibleProcessorException e) {
287 throw (IOException) new IOException("Failed to create a typeWriter").initCause(e);
288 }
289
290 return writer;
291 }
292 }