View Javadoc

1   // BSD License (http://www.galagosearch.org/license)
2   
3   package org.galagosearch.tupleflow.execution;
4   
5   import java.io.File;
6   import java.io.IOException;
7   import java.lang.reflect.Constructor;
8   import java.util.ArrayList;
9   import java.util.Arrays;
10  import java.util.List;
11  import org.galagosearch.tupleflow.Counter;
12  import org.galagosearch.tupleflow.ExNihiloSource;
13  import org.galagosearch.tupleflow.FileOrderedReader;
14  import org.galagosearch.tupleflow.FileOrderedWriter;
15  import org.galagosearch.tupleflow.IncompatibleProcessorException;
16  import org.galagosearch.tupleflow.Order;
17  import org.galagosearch.tupleflow.OrderedCombiner;
18  import org.galagosearch.tupleflow.Parameters;
19  import org.galagosearch.tupleflow.Processor;
20  import org.galagosearch.tupleflow.ReaderSource;
21  import org.galagosearch.tupleflow.Source;
22  import org.galagosearch.tupleflow.Splitter;
23  import org.galagosearch.tupleflow.TupleFlowParameters;
24  import org.galagosearch.tupleflow.TypeReader;
25  import org.galagosearch.tupleflow.Utility;
26  import org.galagosearch.tupleflow.execution.StageInstanceDescription.PipeInput;
27  import org.galagosearch.tupleflow.execution.StageInstanceDescription.PipeOutput;
28  
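/**
 * Builds the executable TupleFlow objects (sources, processors and writers)
 * described by a {@link StageInstanceDescription}.
 *
 * @author trevor
 */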
33  public class StageInstanceFactory {
34      NetworkedCounterManager counterManager;
35  
36      public StageInstanceFactory(NetworkedCounterManager counterManager) {
37          this.counterManager = counterManager;
38      }
39  
40      public class StepParameters implements TupleFlowParameters {
41          Parameters xml;
42          StageInstanceDescription instance;
43  
44          public StepParameters(Step o, StageInstanceDescription instance) {
45              this.xml = o.getParameters();
46              this.instance = instance;
47          }
48  
49          public Counter getCounter(String name) {
50              return counterManager.newCounter(
51                      name, instance.getName(),
52                      new Integer(instance.getIndex()).toString(), instance.getMasterURL());
53          }
54  
55          public TypeReader getTypeReader(String specification) throws IOException {
56              PipeOutput pipeOutput = instance.getReaders().get(specification);
57              return StageInstanceFactory.getTypeReader(pipeOutput);
58          }
59  
60          public Processor getTypeWriter(String specification) throws IOException {
61              PipeInput pipeInput = instance.getWriters().get(specification);
62              return StageInstanceFactory.getTypeWriter(pipeInput);
63          }
64  
65          public boolean readerExists(String specification, String className, String[] order) {
66              return instance.readerExists(specification, className, order);
67          }
68  
69          public boolean writerExists(String specification, String className, String[] order) {
70              return instance.writerExists(specification, className, order);
71          }
72  
73          public Parameters getXML() {
74              return xml;
75          }
76      }
77  
78      public ExNihiloSource instantiate(StageInstanceDescription instance)
79              throws IncompatibleProcessorException, IOException {
80          return (ExNihiloSource) instantiate(instance, instance.getStage().getSteps());
81      }
82  
83      public org.galagosearch.tupleflow.Step instantiate(
84              StageInstanceDescription instance,
85              ArrayList<Step> steps)
86              throws IncompatibleProcessorException, IOException {
87          org.galagosearch.tupleflow.Step previous = null;
88          org.galagosearch.tupleflow.Step first = null;
89  
90          for (Step step : steps) {
91              org.galagosearch.tupleflow.Step current;
92  
93              if (step instanceof MultiStep) {
94                  current = instantiateMulti(instance, step);
95              } else if (step instanceof InputStep) {
96                  current = instantiateInput(instance, (InputStep) step);
97              } else if (step instanceof OutputStep) {
98                  current = instantiateOutput(instance, (OutputStep) step);
99              } else {
100                 current = instantiateStep(instance, step);
101             }
102 
103             if (first == null) {
104                 first = current;
105             }
106             if (previous != null) {
107                 ((Source) previous).setProcessor(current);
108             }
109 
110             previous = current;
111         }
112 
113         return first;
114     }
115 
116     public org.galagosearch.tupleflow.Step instantiateStep(
117             StageInstanceDescription instance,
118             final Step step) throws IOException {
119         org.galagosearch.tupleflow.Step object;
120 
121         try {
122             Class objectClass = Class.forName(step.getClassName());
123             Constructor parameterArgumentConstructor = null;
124             Constructor noArgumentConstructor = null;
125 
126             for (Constructor c : objectClass.getConstructors()) {
127                 java.lang.reflect.Type[] parameters = c.getGenericParameterTypes();
128 
129                 if (parameters.length == 0) {
130                     noArgumentConstructor = c;
131                 } else if (parameters.length == 1 && parameters[0] == TupleFlowParameters.class) {
132                     parameterArgumentConstructor = c;
133                 }
134             }
135 
136             if (parameterArgumentConstructor != null) {
137                 object = (org.galagosearch.tupleflow.Step) parameterArgumentConstructor.newInstance(
138                         new StepParameters(step, instance));
139             } else if (noArgumentConstructor != null) {
140                 object = (org.galagosearch.tupleflow.Step) noArgumentConstructor.newInstance();
141             } else {
142                 throw new IncompatibleProcessorException(
143                         "Couldn't instantiate this class because " +
144                         "no compatible constructor was found: " + step.getClassName());
145             }
146         } catch (Exception e) {
147             throw (IOException) new IOException(
148                     "Couldn't instantiate a step object: " + step.getClassName()).initCause(e);
149         }
150 
151         return object;
152     }
153 
154     public org.galagosearch.tupleflow.Step instantiateInput(
155             StageInstanceDescription instance,
156             InputStep step) throws IOException {
157         PipeOutput pipeOutput = instance.getReaders().get(step.getId());
158         return getTypeReaderSource(pipeOutput);
159     }
160 
161     public org.galagosearch.tupleflow.Step instantiateOutput(
162             StageInstanceDescription instance,
163             final OutputStep step) throws IOException {
164         PipeInput pipeInput = instance.getWriters().get(step.getId());
165         return getTypeWriter(pipeInput);
166     }
167 
168     private org.galagosearch.tupleflow.Step instantiateMulti(
169             StageInstanceDescription instance,
170             final Step step) throws IncompatibleProcessorException, IOException {
171         MultiStep multiStep = (MultiStep) step;
172         Processor[] processors = new Processor[multiStep.groups.size()];
173 
174         for (int i = 0; i < multiStep.groups.size(); i++) {
175             ArrayList<Step> group = multiStep.groups.get(i);
176             processors[i] = (org.galagosearch.tupleflow.Processor) instantiate(instance, group);
177         }
178 
179         return new org.galagosearch.tupleflow.Multi(processors);
180     }
181 
182     protected static Order createOrder(final DataPipe pipe) throws IOException {
183         return createOrder(pipe.className, pipe.order);
184     }
185 
186     public static Order createOrder(String className, String[] orderSpec) throws IOException {
187         Order order;
188 
189         try {
190             Class typeClass = Class.forName(className);
191             org.galagosearch.tupleflow.Type type = (org.galagosearch.tupleflow.Type) typeClass.
192                     getConstructor().newInstance();
193             order = type.getOrder(orderSpec);
194         } catch (Exception e) {
195             throw (IOException) new IOException(
196                     "Couldn't create an order object for type: " + className).initCause(e);
197         }
198 
199         return order;
200     }
201 
202     public ReaderSource getTypeReaderSource(PipeOutput pipeOutput) throws IOException {
203         ReaderSource reader;
204 
205         if (pipeOutput == null) {
206             return null;
207         }
208 
209         Order order = createOrder(pipeOutput.getPipe());
210         String[] fileNames = pipeOutput.getFileNames();
211 
212         if (fileNames.length > 1) {
213             reader = OrderedCombiner.combineFromFiles(Arrays.asList(fileNames), order);
214         } else {
215             reader = new FileOrderedReader(fileNames[0], order);
216         }
217         return reader;
218     }
219 
220     @SuppressWarnings(value = "unchecked")
221     public static <T> ReaderSource<T> getTypeReader(final PipeOutput pipeOutput) throws IOException {
222         ReaderSource<T> reader;
223 
224         if (pipeOutput == null) {
225             return null;
226         }
227 
228         Order order = createOrder(pipeOutput.getPipe());
229         String[] fileNames = pipeOutput.getFileNames();
230 
231         if (fileNames.length > 100) {
232             List<String> names = Arrays.asList(fileNames);
233             ArrayList<String> reduced = new ArrayList<String>();
234 
235             // combine 20 files at a time
236             for (int i = 0; i < names.size(); i += 20) {
237                 int start = i;
238                 int end = Math.min(names.size(), i + 20);
239                 List<String> toCombine = names.subList(start, end);
240 
241                 reader = OrderedCombiner.combineFromFiles(toCombine, order);
242                 File temporary = Utility.createTemporary();
243                 FileOrderedWriter<T> writer = new FileOrderedWriter<T>(temporary, order);
244 
245                 try {
246                     reader.setProcessor(writer);
247                 } catch (IncompatibleProcessorException e) {
248                     throw (IOException) new IOException("Incompatible processor for reader tuples").
249                             initCause(e);
250                 }
251 
252                 reader.run();
253 
254                 reduced.add(temporary.toString());
255                 temporary.deleteOnExit();
256             }
257 
258             reader = OrderedCombiner.combineFromFiles(reduced, order);
259         } else if (fileNames.length > 1) {
260             reader = OrderedCombiner.combineFromFiles(Arrays.asList(fileNames), order);
261         } else {
262             reader = new FileOrderedReader(fileNames[0], order);
263         }
264         return reader;
265     }
266 
267     public static Processor getTypeWriter(final PipeInput pipeInput) throws IOException, IOException {
268         Processor writer;
269 
270         if (pipeInput == null) {
271             return null;
272         }
273         String[] fileNames = pipeInput.getFileNames();
274         Order order = createOrder(pipeInput.getPipe());
275         Order hashOrder = createOrder(pipeInput.getPipe().getClassName(), pipeInput.getPipe().getHash());
276 
277         assert order != null : "Order not found: " + Arrays.toString(pipeInput.getPipe().getOrder());
278 
279         try {
280             if (fileNames.length == 1) {
281                 writer = new FileOrderedWriter(fileNames[0], order);
282             } else {
283                 assert hashOrder != null : "Hash order not found: " + pipeInput.getPipe().getPipeName() + " " + pipeInput.getPipe().getHash();
284                 writer = Splitter.splitToFiles(fileNames, order, hashOrder);
285             }
286         } catch (IncompatibleProcessorException e) {
287             throw (IOException) new IOException("Failed to create a typeWriter").initCause(e);
288         }
289 
290         return writer;
291     }
292 }