Coverage Report - org.galagosearch.tupleflow.execution.StageInstanceFactory
 
Classes in this File Line Coverage Branch Coverage Complexity
StageInstanceFactory
0%
0/106
0%
0/50
0
StageInstanceFactory$StepParameters
0%
0/12
N/A
0
 
 1  
 // BSD License (http://www.galagosearch.org/license)
 2  
 
 3  
 package org.galagosearch.tupleflow.execution;
 4  
 
 5  
 import java.io.File;
 6  
 import java.io.IOException;
 7  
 import java.lang.reflect.Constructor;
 8  
 import java.util.ArrayList;
 9  
 import java.util.Arrays;
 10  
 import java.util.List;
 11  
 import org.galagosearch.tupleflow.Counter;
 12  
 import org.galagosearch.tupleflow.ExNihiloSource;
 13  
 import org.galagosearch.tupleflow.FileOrderedReader;
 14  
 import org.galagosearch.tupleflow.FileOrderedWriter;
 15  
 import org.galagosearch.tupleflow.IncompatibleProcessorException;
 16  
 import org.galagosearch.tupleflow.Order;
 17  
 import org.galagosearch.tupleflow.OrderedCombiner;
 18  
 import org.galagosearch.tupleflow.Parameters;
 19  
 import org.galagosearch.tupleflow.Processor;
 20  
 import org.galagosearch.tupleflow.ReaderSource;
 21  
 import org.galagosearch.tupleflow.Source;
 22  
 import org.galagosearch.tupleflow.Splitter;
 23  
 import org.galagosearch.tupleflow.TupleFlowParameters;
 24  
 import org.galagosearch.tupleflow.TypeReader;
 25  
 import org.galagosearch.tupleflow.Utility;
 26  
 import org.galagosearch.tupleflow.execution.StageInstanceDescription.PipeInput;
 27  
 import org.galagosearch.tupleflow.execution.StageInstanceDescription.PipeOutput;
 28  
 
 29  
 /**
 30  
  *
 31  
  * @author trevor
 32  
  */
 33  0
 public class StageInstanceFactory {
 34  
     NetworkedCounterManager counterManager;
 35  
 
 36  0
     public StageInstanceFactory(NetworkedCounterManager counterManager) {
 37  0
         this.counterManager = counterManager;
 38  0
     }
 39  
 
 40  
     public class StepParameters implements TupleFlowParameters {
 41  
         Parameters xml;
 42  
         StageInstanceDescription instance;
 43  
 
 44  0
         public StepParameters(Step o, StageInstanceDescription instance) {
 45  0
             this.xml = o.getParameters();
 46  0
             this.instance = instance;
 47  0
         }
 48  
 
 49  
         public Counter getCounter(String name) {
 50  0
             return counterManager.newCounter(
 51  
                     name, instance.getName(),
 52  
                     new Integer(instance.getIndex()).toString(), instance.getMasterURL());
 53  
         }
 54  
 
 55  
         public TypeReader getTypeReader(String specification) throws IOException {
 56  0
             PipeOutput pipeOutput = instance.getReaders().get(specification);
 57  0
             return StageInstanceFactory.getTypeReader(pipeOutput);
 58  
         }
 59  
 
 60  
         public Processor getTypeWriter(String specification) throws IOException {
 61  0
             PipeInput pipeInput = instance.getWriters().get(specification);
 62  0
             return StageInstanceFactory.getTypeWriter(pipeInput);
 63  
         }
 64  
 
 65  
         public boolean readerExists(String specification, String className, String[] order) {
 66  0
             return instance.readerExists(specification, className, order);
 67  
         }
 68  
 
 69  
         public boolean writerExists(String specification, String className, String[] order) {
 70  0
             return instance.writerExists(specification, className, order);
 71  
         }
 72  
 
 73  
         public Parameters getXML() {
 74  0
             return xml;
 75  
         }
 76  
     }
 77  
 
 78  
     public ExNihiloSource instantiate(StageInstanceDescription instance)
 79  
             throws IncompatibleProcessorException, IOException {
 80  0
         return (ExNihiloSource) instantiate(instance, instance.getStage().getSteps());
 81  
     }
 82  
 
 83  
     public org.galagosearch.tupleflow.Step instantiate(
 84  
             StageInstanceDescription instance,
 85  
             ArrayList<Step> steps)
 86  
             throws IncompatibleProcessorException, IOException {
 87  0
         org.galagosearch.tupleflow.Step previous = null;
 88  0
         org.galagosearch.tupleflow.Step first = null;
 89  
 
 90  0
         for (Step step : steps) {
 91  
             org.galagosearch.tupleflow.Step current;
 92  
 
 93  0
             if (step instanceof MultiStep) {
 94  0
                 current = instantiateMulti(instance, step);
 95  0
             } else if (step instanceof InputStep) {
 96  0
                 current = instantiateInput(instance, (InputStep) step);
 97  0
             } else if (step instanceof OutputStep) {
 98  0
                 current = instantiateOutput(instance, (OutputStep) step);
 99  
             } else {
 100  0
                 current = instantiateStep(instance, step);
 101  
             }
 102  
 
 103  0
             if (first == null) {
 104  0
                 first = current;
 105  
             }
 106  0
             if (previous != null) {
 107  0
                 ((Source) previous).setProcessor(current);
 108  
             }
 109  
 
 110  0
             previous = current;
 111  0
         }
 112  
 
 113  0
         return first;
 114  
     }
 115  
 
 116  
     public org.galagosearch.tupleflow.Step instantiateStep(
 117  
             StageInstanceDescription instance,
 118  
             final Step step) throws IOException {
 119  
         org.galagosearch.tupleflow.Step object;
 120  
 
 121  
         try {
 122  0
             Class objectClass = Class.forName(step.getClassName());
 123  0
             Constructor parameterArgumentConstructor = null;
 124  0
             Constructor noArgumentConstructor = null;
 125  
 
 126  0
             for (Constructor c : objectClass.getConstructors()) {
 127  0
                 java.lang.reflect.Type[] parameters = c.getGenericParameterTypes();
 128  
 
 129  0
                 if (parameters.length == 0) {
 130  0
                     noArgumentConstructor = c;
 131  0
                 } else if (parameters.length == 1 && parameters[0] == TupleFlowParameters.class) {
 132  0
                     parameterArgumentConstructor = c;
 133  
                 }
 134  
             }
 135  
 
 136  0
             if (parameterArgumentConstructor != null) {
 137  0
                 object = (org.galagosearch.tupleflow.Step) parameterArgumentConstructor.newInstance(
 138  
                         new StepParameters(step, instance));
 139  0
             } else if (noArgumentConstructor != null) {
 140  0
                 object = (org.galagosearch.tupleflow.Step) noArgumentConstructor.newInstance();
 141  
             } else {
 142  0
                 throw new IncompatibleProcessorException(
 143  
                         "Couldn't instantiate this class because " +
 144  
                         "no compatible constructor was found: " + step.getClassName());
 145  
             }
 146  0
         } catch (Exception e) {
 147  0
             throw (IOException) new IOException(
 148  
                     "Couldn't instantiate a step object: " + step.getClassName()).initCause(e);
 149  0
         }
 150  
 
 151  0
         return object;
 152  
     }
 153  
 
 154  
     public org.galagosearch.tupleflow.Step instantiateInput(
 155  
             StageInstanceDescription instance,
 156  
             InputStep step) throws IOException {
 157  0
         PipeOutput pipeOutput = instance.getReaders().get(step.getId());
 158  0
         return getTypeReaderSource(pipeOutput);
 159  
     }
 160  
 
 161  
     public org.galagosearch.tupleflow.Step instantiateOutput(
 162  
             StageInstanceDescription instance,
 163  
             final OutputStep step) throws IOException {
 164  0
         PipeInput pipeInput = instance.getWriters().get(step.getId());
 165  0
         return getTypeWriter(pipeInput);
 166  
     }
 167  
 
 168  
     private org.galagosearch.tupleflow.Step instantiateMulti(
 169  
             StageInstanceDescription instance,
 170  
             final Step step) throws IncompatibleProcessorException, IOException {
 171  0
         MultiStep multiStep = (MultiStep) step;
 172  0
         Processor[] processors = new Processor[multiStep.groups.size()];
 173  
 
 174  0
         for (int i = 0; i < multiStep.groups.size(); i++) {
 175  0
             ArrayList<Step> group = multiStep.groups.get(i);
 176  0
             processors[i] = (org.galagosearch.tupleflow.Processor) instantiate(instance, group);
 177  
         }
 178  
 
 179  0
         return new org.galagosearch.tupleflow.Multi(processors);
 180  
     }
 181  
 
 182  
     protected static Order createOrder(final DataPipe pipe) throws IOException {
 183  0
         return createOrder(pipe.className, pipe.order);
 184  
     }
 185  
 
 186  
     public static Order createOrder(String className, String[] orderSpec) throws IOException {
 187  
         Order order;
 188  
 
 189  
         try {
 190  0
             Class typeClass = Class.forName(className);
 191  0
             org.galagosearch.tupleflow.Type type = (org.galagosearch.tupleflow.Type) typeClass.
 192  
                     getConstructor().newInstance();
 193  0
             order = type.getOrder(orderSpec);
 194  0
         } catch (Exception e) {
 195  0
             throw (IOException) new IOException(
 196  
                     "Couldn't create an order object for type: " + className).initCause(e);
 197  0
         }
 198  
 
 199  0
         return order;
 200  
     }
 201  
 
 202  
     public ReaderSource getTypeReaderSource(PipeOutput pipeOutput) throws IOException {
 203  
         ReaderSource reader;
 204  
 
 205  0
         if (pipeOutput == null) {
 206  0
             return null;
 207  
         }
 208  
 
 209  0
         Order order = createOrder(pipeOutput.getPipe());
 210  0
         String[] fileNames = pipeOutput.getFileNames();
 211  
 
 212  0
         if (fileNames.length > 1) {
 213  0
             reader = OrderedCombiner.combineFromFiles(Arrays.asList(fileNames), order);
 214  
         } else {
 215  0
             reader = new FileOrderedReader(fileNames[0], order);
 216  
         }
 217  0
         return reader;
 218  
     }
 219  
 
 220  
     @SuppressWarnings(value = "unchecked")
 221  
     public static <T> ReaderSource<T> getTypeReader(final PipeOutput pipeOutput) throws IOException {
 222  
         ReaderSource<T> reader;
 223  
 
 224  0
         if (pipeOutput == null) {
 225  0
             return null;
 226  
         }
 227  
 
 228  0
         Order order = createOrder(pipeOutput.getPipe());
 229  0
         String[] fileNames = pipeOutput.getFileNames();
 230  
 
 231  0
         if (fileNames.length > 100) {
 232  0
             List<String> names = Arrays.asList(fileNames);
 233  0
             ArrayList<String> reduced = new ArrayList<String>();
 234  
 
 235  
             // combine 20 files at a time
 236  0
             for (int i = 0; i < names.size(); i += 20) {
 237  0
                 int start = i;
 238  0
                 int end = Math.min(names.size(), i + 20);
 239  0
                 List<String> toCombine = names.subList(start, end);
 240  
 
 241  0
                 reader = OrderedCombiner.combineFromFiles(toCombine, order);
 242  0
                 File temporary = Utility.createTemporary();
 243  0
                 FileOrderedWriter<T> writer = new FileOrderedWriter<T>(temporary, order);
 244  
 
 245  
                 try {
 246  0
                     reader.setProcessor(writer);
 247  0
                 } catch (IncompatibleProcessorException e) {
 248  0
                     throw (IOException) new IOException("Incompatible processor for reader tuples").
 249  
                             initCause(e);
 250  0
                 }
 251  
 
 252  0
                 reader.run();
 253  
 
 254  0
                 reduced.add(temporary.toString());
 255  0
                 temporary.deleteOnExit();
 256  
             }
 257  
 
 258  0
             reader = OrderedCombiner.combineFromFiles(reduced, order);
 259  0
         } else if (fileNames.length > 1) {
 260  0
             reader = OrderedCombiner.combineFromFiles(Arrays.asList(fileNames), order);
 261  
         } else {
 262  0
             reader = new FileOrderedReader(fileNames[0], order);
 263  
         }
 264  0
         return reader;
 265  
     }
 266  
 
 267  
     public static Processor getTypeWriter(final PipeInput pipeInput) throws IOException, IOException {
 268  
         Processor writer;
 269  
 
 270  0
         if (pipeInput == null) {
 271  0
             return null;
 272  
         }
 273  0
         String[] fileNames = pipeInput.getFileNames();
 274  0
         Order order = createOrder(pipeInput.getPipe());
 275  0
         Order hashOrder = createOrder(pipeInput.getPipe().getClassName(), pipeInput.getPipe().getHash());
 276  
 
 277  0
         assert order != null : "Order not found: " + Arrays.toString(pipeInput.getPipe().getOrder());
 278  
 
 279  
         try {
 280  0
             if (fileNames.length == 1) {
 281  0
                 writer = new FileOrderedWriter(fileNames[0], order);
 282  
             } else {
 283  0
                 assert hashOrder != null : "Hash order not found: " + pipeInput.getPipe().getPipeName() + " " + pipeInput.getPipe().getHash();
 284  0
                 writer = Splitter.splitToFiles(fileNames, order, hashOrder);
 285  
             }
 286  0
         } catch (IncompatibleProcessorException e) {
 287  0
             throw (IOException) new IOException("Failed to create a typeWriter").initCause(e);
 288  0
         }
 289  
 
 290  0
         return writer;
 291  
     }
 292  
 }