Coverage Report - org.galagosearch.tupleflow.execution.JobExecutor
 
Classes in this File Line Coverage Branch Coverage Complexity
JobExecutor
0%
0/390
0%
0/204
0
JobExecutor$1
0%
0/1
N/A
0
JobExecutor$ConnectionDescription
0%
0/30
0%
0/12
0
JobExecutor$EndPointDescription
0%
0/10
N/A
0
JobExecutor$EndPointName
0%
0/17
0%
0/4
0
JobExecutor$JobExecutionStatus
0%
0/83
0%
0/42
0
JobExecutor$JobExecutionStatus$BlockedExecutionStatus
0%
0/11
N/A
0
 
 1  
 // BSD License (http://www.galagosearch.org/license)
 2  
 
 3  
 package org.galagosearch.tupleflow.execution;
 4  
 
 5  
 import java.net.UnknownHostException;
 6  
 import org.mortbay.jetty.Server;
 7  
 import org.galagosearch.tupleflow.Utility;
 8  
 import org.galagosearch.tupleflow.Processor;
 9  
 import org.galagosearch.tupleflow.TypeReader;
 10  
 import org.galagosearch.tupleflow.execution.Job.StagePoint;
 11  
 import org.galagosearch.tupleflow.execution.StageGroupDescription.DataPipeRegion;
 12  
 import org.galagosearch.tupleflow.execution.StageInstanceDescription.PipeInput;
 13  
 import org.galagosearch.tupleflow.execution.StageInstanceDescription.PipeOutput;
 14  
 import java.io.File;
 15  
 import java.io.IOException;
 16  
 import java.io.StringReader;
 17  
 import java.net.InetAddress;
 18  
 import java.util.ArrayList;
 19  
 import java.util.Arrays;
 20  
 import java.util.Collection;
 21  
 import java.util.Collections;
 22  
 import java.util.Date;
 23  
 import java.util.HashMap;
 24  
 import java.util.HashSet;
 25  
 import java.util.Iterator;
 26  
 import java.util.List;
 27  
 import java.util.Map;
 28  
 import java.util.TreeMap;
 29  
 import java.util.TreeSet;
 30  
 import java.util.concurrent.ExecutionException;
 31  
 import java.util.concurrent.Future;
 32  
 import javax.xml.parsers.ParserConfigurationException;
 33  
 import javax.xml.parsers.SAXParser;
 34  
 import javax.xml.parsers.SAXParserFactory;
 35  
 import java.util.logging.Level;
 36  
 import java.util.logging.Logger;
 37  
 import org.mortbay.jetty.bio.SocketConnector;
 38  
 import org.xml.sax.InputSource;
 39  
 import org.xml.sax.SAXException;
 40  
 import org.xml.sax.SAXParseException;
 41  
 
 42  
 /**
 43  
  * <p>This class is responsible for executing TupleFlow jobs.</p>
 44  
  *
 45  
  * <p>A job is specified using the TupleFlow Job class.  The Job class has an XML form
 46  
  * which can be parsed by JobConstructor, but you can create one programmatically as well.</p>
 47  
  *
 48  
  * <p>Before the job is executed, it is verified.  JobExecutor verifies that all the
 49  
  * classes referenced by the Job object actually exist, and that the connections point
 50  
  * to sensible places.  Once it has been verified, the JobExecutor builds an execution plan
 51  
  * that will execute the job with as much parallelism as possible without violating any
 52  
  * ordering constraints dictated by stage connections.  After the plan is generated,
 53  
  * the job is sent to a StageExecutor for the low-level details of execution.</p>
 54  
  *
 55  
  * <p>TupleFlow has many different kinds of StageExecutors you can use.  To get started
 56  
  * and to debug your code, use the LocalStageExecutor or ThreadedExecutor.  To harness
 57  
  * more parallelism, use the SSHStageExecutor or the DRMAAStageExecutor.</p>
 58  
  * 
 59  
  * @author trevor
 60  
  */
 61  0
 public class JobExecutor {
 62  
     ErrorStore store;
 63  
     Job job;
 64  
     String temporaryStorage;
 65  0
     int defaultHashCount = 10;
 66  0
     long maximumFileInputs = 200;
 67  0
     HashMap<String, ConnectionDescription> connections = new HashMap<String, ConnectionDescription>();
 68  0
     HashMap<String, StageGroupDescription> stages = new HashMap<String, StageGroupDescription>();
 69  0
     ArrayList<String> stageOrder = new ArrayList<String>();
 70  0
     HashMap<String, HashSet<String>> stageChildren = new HashMap<String, HashSet<String>>();
 71  0
     HashMap<String, HashSet<String>> stageParents = new HashMap<String, HashSet<String>>();
 72  0
     ArrayList<DataPipe> pipes = new ArrayList<DataPipe>();
 73  
 
 74  0
     public JobExecutor(Job job, String temporaryStorage, ErrorStore store) {
 75  0
         this.store = store;
 76  0
         this.temporaryStorage = temporaryStorage;
 77  0
         this.job = job;
 78  0
     }
 79  
 
 80  
     /**
 81  
      * Parses the XML text in the file specified by the filename parameter into a Job.
 82  
      * 
 83  
      * @param filename A path to the XML file to parse.
 84  
      * @param store An ErrorStore object where all error information should be sent.
 85  
      * @return A job instance based on the XML description in the text parameter.
 86  
      * @throws org.xml.sax.SAXException
 87  
      * @throws javax.xml.parsers.ParserConfigurationException
 88  
      * @throws java.io.IOException
 89  
      */
 90  
     public static Job parseFile(String filename, ErrorStore store) throws SAXException, ParserConfigurationException, IOException {
 91  
         // Parse the file
 92  0
         JobConstructor jobParseSaxHandler = new JobConstructor(filename, store);
 93  0
         SAXParser parser = SAXParserFactory.newInstance().newSAXParser();
 94  
 
 95  
         try {
 96  0
             parser.parse(new File(filename), jobParseSaxHandler);
 97  0
         } catch (SAXParseException e) {
 98  0
             store.addError(filename, e);
 99  0
         }
 100  
 
 101  0
         return jobParseSaxHandler.getJob();
 102  
     }
 103  
 
 104  
     /**
 105  
      * Parses the XML text in the text parameter into a Job.
 106  
      * 
 107  
      * @param text XML text to parse into a Job.
 108  
      * @param store An ErrorStore object where all error information should be sent.
 109  
      * @return A job instance based on the XML description in the text parameter.
 110  
      * @throws org.xml.sax.SAXException
 111  
      * @throws javax.xml.parsers.ParserConfigurationException
 112  
      * @throws java.io.IOException
 113  
      */
 114  
     public static Job parseText(String text, ErrorStore store) throws SAXException, ParserConfigurationException, IOException {
 115  
         // Parse the file
 116  0
         JobConstructor jobParseSaxHandler = new JobConstructor(store);
 117  0
         SAXParser parser = SAXParserFactory.newInstance().newSAXParser();
 118  
 
 119  
         try {
 120  0
             StringReader reader = new StringReader(text);
 121  0
             parser.parse(new InputSource(reader), jobParseSaxHandler);
 122  0
         } catch (SAXParseException e) {
 123  0
             store.addError("none", e);
 124  0
         }
 125  
 
 126  0
         return jobParseSaxHandler.getJob();
 127  
     }
 128  
 
 129  
     /**
 130  
      * This method tries to combine stages together to reduce overhead.
 131  
      * 
 132  
      * In particular, this method looks for two stages, A and B, where each
 133  
      * copy of B takes input from only one copy of stage A.  In this case, 
 134  
      * all the steps from B are moved into stage A, saving a lot of file overhead
 135  
      * in transferring tuples from A to B.
 136  
      * 
 137  
      * This method is particularly useful for jobs that are created
 138  
      * automatically.
 139  
      * 
 140  
      * @param job  The job instance to optimize.
 141  
      * @return A new job instance, perhaps with fewer stages.
 142  
      */
 143  
     public static Job optimize(Job job) {
 144  
         // First, figure out what source gets output from the stage, if any.
 145  0
         HashMap<String, String> outputs = new HashMap<String, String>();
 146  0
         HashMap<String, String> inputs = new HashMap<String, String>();
 147  
 
 148  0
         for (Stage stage : job.stages.values()) {
 149  
             // Output mapping
 150  0
             if (stage.steps.size() > 0) {
 151  0
                 Step lastStep = stage.steps.get(stage.steps.size() - 1);
 152  0
                 if (lastStep instanceof OutputStep) {
 153  0
                     OutputStep output = (OutputStep) lastStep;
 154  0
                     outputs.put(stage.name, output.getId());
 155  
                 }
 156  
 
 157  0
                 Step firstStep = stage.steps.get(0);
 158  0
                 if (firstStep instanceof InputStep) {
 159  0
                     InputStep input = (InputStep) firstStep;
 160  0
                     inputs.put(stage.name, input.getId());
 161  
                 }
 162  0
             }
 163  
         }
 164  
 
 165  
         // Create a mapping from String -> Stage
 166  0
         HashMap<String, Stage> stages = new HashMap<String, Stage>(job.stages);
 167  
 
 168  
         // Now, rip through the connections, and try to find a connection that links
 169  
         // one of these inputs to one of the outputs.
 170  
 
 171  0
         Iterator<Connection> iterator = job.connections.iterator();
 172  
         Connection connection;
 173  
 
 174  
         innerLoop:
 175  0
         while (iterator.hasNext()) {
 176  0
             connection = iterator.next();
 177  
 
 178  
             // For simplicity, find just connections with single inputs and outputs and no hashing
 179  0
             if (connection.inputs.size() != 1) {
 180  0
                 continue;
 181  
             }
 182  0
             if (connection.outputs.size() < 1) {
 183  0
                 continue;
 184  
             }
 185  0
             if (connection.getHash() != null) {
 186  0
                 continue;
 187  
             }
 188  0
             ConnectionEndPoint connectionInput = connection.inputs.get(0);
 189  0
             String stageOutputPointId = outputs.get(connectionInput.getStageName());
 190  
 
 191  
             // if the input to this connection is not the main output of a stage, skip
 192  0
             if (!connectionInput.getPointName().equals(stageOutputPointId)) {
 193  0
                 continue;            // make sure all of the outputs are the inputs of stages
 194  
             }
 195  0
             for (ConnectionEndPoint connectionOutput : connection.outputs) {
 196  0
                 String stageInputPointId = inputs.get(connectionOutput.getStageName());
 197  
 
 198  0
                 if (!connectionOutput.getPointName().equals(stageInputPointId)) {
 199  0
                     continue innerLoop;
 200  
                 }
 201  0
                 if (connectionOutput.getAssignment() == ConnectionAssignmentType.Combined) {
 202  0
                     continue innerLoop;
 203  
                 }
 204  0
             }
 205  
 
 206  
             // now we've verified that these stages can be combined together.
 207  0
             Stage source = stages.get(connectionInput.getStageName());
 208  
 
 209  0
             MultiStep multi = new MultiStep();
 210  0
             multi.groups = new ArrayList<ArrayList<Step>>();
 211  
 
 212  0
             for (ConnectionEndPoint connectionOutput : connection.outputs) {
 213  0
                 Stage destination = stages.get(connectionOutput.getStageName());
 214  
                 // getting ready: remove the first step, add on to the multi
 215  0
                 int length = destination.steps.size();
 216  0
                 ArrayList<Step> steps = new ArrayList<Step>(destination.steps.subList(1, length));
 217  0
                 multi.groups.add(steps);
 218  
 
 219  0
                 renameConnections(job, source, destination);
 220  
 
 221  
                 // combine dependence information
 222  0
                 source.connections.remove(connectionInput.getPointName());
 223  0
                 destination.connections.remove(connectionOutput.getPointName());
 224  0
                 source.connections.putAll(destination.connections);
 225  
 
 226  
                 // remove the destination stage
 227  0
                 job.stages.remove(destination);
 228  0
             }
 229  
 
 230  0
             source.steps.remove(source.steps.size() - 1);
 231  
 
 232  
             // only add a multi step if there were multiple outputs
 233  0
             if (multi.groups.size() == 1) {
 234  0
                 source.steps.addAll(multi.groups.get(0));
 235  
             } else {
 236  0
                 source.steps.add(multi);
 237  
             }
 238  
 
 239  
             // remove this connection
 240  0
             iterator.remove();
 241  
 
 242  
             // recurse to remove other connections
 243  0
             return optimize(job);
 244  
         }
 245  
 
 246  0
         return job;
 247  
     }
 248  
 
 249  
     public static void renameConnections(Job job, Stage source, Stage destination) {
 250  
         // for each connection, rename dest -> source.
 251  0
         for (Connection connection : job.connections) {
 252  0
             for (ConnectionEndPoint input : connection.inputs) {
 253  0
                 if (input.getStageName().equals(destination.name)) {
 254  0
                     input.setStageName(source.name);
 255  
                 }
 256  
             }
 257  
 
 258  0
             for (ConnectionEndPoint output : connection.outputs) {
 259  0
                 if (output.getStageName().equals(destination.name)) {
 260  0
                     output.setStageName(source.name);
 261  
                 }
 262  
             }
 263  
         }
 264  0
     }
 265  
 
 266  
     public void prepare() {
 267  0
         boolean successful = constructAndVerify();
 268  
 
 269  0
         if (!successful) {
 270  0
             return;
 271  
         }
 272  
 
 273  
         // look through each stage to see how many open files it will have.
 274  
         // if a particular stage will have a lot of open files, add some
 275  
         // intermediate merge stages.
 276  0
         if (needsMergeStages()) {
 277  0
             addMergeStages(job);
 278  0
             constructAndVerify();
 279  
         }
 280  
 
 281  0
         for (DataPipe pipe : pipes) {
 282  0
             pipe.makeDirectories();
 283  
         }
 284  0
     }
 285  
 
 286  
     public void clear() {
 287  0
         connections.clear();
 288  0
         stageChildren.clear();
 289  0
         stageOrder.clear();
 290  0
         stageParents.clear();
 291  0
         stages.clear();
 292  0
         pipes.clear();
 293  0
     }
 294  
 
 295  
     /**
 296  
      * Checks to see if any stage has too many inputs.
 297  
      * 
 298  
      * @return true, if there is a stage with more than maximumFileInputs.
 299  
      */
 300  
     public boolean needsMergeStages() {
 301  0
         for (StageGroupDescription stage : stages.values()) {
 302  0
             long totalInputs = 0;
 303  
 
 304  0
             if (stage.getName().endsWith("mergeStage")) {
 305  0
                 continue;
 306  
             }
 307  0
             for (DataPipeRegion region : stage.inputs.values()) {
 308  0
                 totalInputs += region.fileCount();
 309  
             }
 310  
 
 311  0
             if (totalInputs > this.maximumFileInputs) {
 312  0
                 return true;
 313  
             }
 314  0
         }
 315  
 
 316  0
         return false;
 317  
     }
 318  
 
 319  
     /**
 320  
      * Add a merge stage to this job that merges the output of
 321  
      * stageName called pointName.
 322  
      * 
 323  
      * @param job  The job that should get the new merge stage.
 324  
      * @param stageName  The stage that contains the output that needs merging.
 325  
      * @param pointName  The output point that needs merging in the stage stageName.
 326  
      */
 327  
     public void addMergeStage(Job job, String stageName, String pointName) {
 328  
         // find the stage and the point, initialize class/order information
 329  0
         Stage inputStage = job.stages.get(stageName);
 330  0
         StageConnectionPoint inputPoint = inputStage.getConnection(pointName);
 331  
 
 332  0
         String className = inputPoint.getClassName();
 333  0
         String[] typeOrder = inputPoint.getOrder();
 334  0
         String mergedStageName = stageName + "-" + pointName + "-mergeStage";
 335  0
         String mergedPointName = pointName + "-merged";
 336  
 
 337  
         // if this merge stage has already been added, don't add it again
 338  0
         if (job.stages.containsKey(mergedStageName)) {
 339  0
             return;        // create the stage itself
 340  
         }
 341  0
         Stage s = new Stage(mergedStageName);
 342  0
         s.add(new StageConnectionPoint(ConnectionPointType.Input,
 343  
                                        pointName,
 344  
                                        className,
 345  
                                        typeOrder,
 346  
                                        null));
 347  0
         s.add(new StageConnectionPoint(ConnectionPointType.Output,
 348  
                                        pointName + "-merged",
 349  
                                        className,
 350  
                                        typeOrder,
 351  
                                        null));
 352  
 
 353  0
         s.add(new InputStep(pointName));
 354  0
         s.add(new OutputStep(mergedPointName));
 355  0
         job.add(s);
 356  
 
 357  0
         String[] hash = null;
 358  0
         int hashCount = -1;
 359  
 
 360  
         // run through the connections list, find all inputs for the previous data
 361  0
         for (Connection connection : job.connections) {
 362  0
             for (ConnectionEndPoint input : connection.inputs) {
 363  0
                 if (input.getStageName().equals(stageName) &&
 364  
                         input.getPointName().equals(pointName)) {
 365  0
                     if (hash != null && connection.hash != null &&
 366  
                         !Arrays.equals(hash,connection.hash)) {
 367  0
                         continue;
 368  
                     }
 369  0
                     if (connection.hash != null) {
 370  0
                         hash = connection.hash;
 371  0
                         connection.hash = null;
 372  
                     }
 373  
 
 374  0
                     input.setStageName(mergedStageName);
 375  0
                     input.setPointName(mergedPointName);
 376  
                 }
 377  
             }
 378  
         }
 379  
 
 380  
         // now, add a connection between the producing stage and the merge stage
 381  0
         job.connect(new StagePoint(stageName, pointName),
 382  
                     new StagePoint(mergedStageName, pointName),
 383  
                     ConnectionAssignmentType.Each,
 384  
                     hash,
 385  
                     hashCount);
 386  0
     }
 387  
 
 388  
     /**
 389  
      * Find stages that need to open a lot of files for reading when running,
 390  
      * and add some intermediate merge stages to reduce problems with open files.
 391  
      * 
 392  
      * @param job
 393  
      */
 394  
     public void addMergeStages(Job job) {
 395  
         // look at each stage in the job
 396  0
         for (StageGroupDescription stage : stages.values()) {
 397  0
             long totalInputs = 0;
 398  
 
 399  
             // add up every file this stage will need to open
 400  0
             for (DataPipeRegion region : stage.inputs.values()) {
 401  0
                 totalInputs += region.fileCount();
 402  
             }
 403  
 
 404  
             // if this stage needs to open too many files, it might crash.
 405  
             // therefore, we want to add in an extra merge stage for every input
 406  
             // that contains a lot of files.
 407  0
             if (totalInputs > this.maximumFileInputs) {
 408  0
                 Stage s = stage.getStage();
 409  0
                 ArrayList<Connection> relevantConnections = new ArrayList<Connection>();
 410  
 
 411  
                 // look for connections that point to this stage and store them
 412  0
                 for (Connection connection : job.connections) {
 413  0
                     for (ConnectionEndPoint point : connection.outputs) {
 414  0
                         if (point.getStageName().equals(s.name)) {
 415  0
                             relevantConnections.add(connection);
 416  
                         }
 417  
                     }
 418  
                 }
 419  
 
 420  
                 // for each stage we marked in the last loop, find ones where all
 421  
                 // the inputs come from a single stage, and that stage has a large
 422  
                 // instanceCount (it generates a lot of files), and make a merge stage.
 423  0
                 for (Connection connection : relevantConnections) {
 424  0
                     if (connection.inputs.size() != 1) {
 425  0
                         continue;
 426  
                     }
 427  0
                     ConnectionEndPoint endPoint = connection.inputs.get(0);
 428  0
                     String inputStageName = endPoint.getStageName();
 429  0
                     String inputPointName = endPoint.getPointName();
 430  0
                     StageGroupDescription inputStageDesc = stages.get(inputStageName);
 431  
 
 432  
                     // if there's no description, that means we just added it
 433  0
                     if (inputStageDesc == null ||
 434  
                             inputStageName.endsWith("mergeStage")) {
 435  0
                         continue;
 436  
                     }
 437  0
                     if (inputStageDesc.instanceCount > 1) {
 438  0
                         addMergeStage(job, inputStageName, inputPointName);
 439  
                     }
 440  0
                 }
 441  
             }
 442  0
         }
 443  0
     }
 444  
 
 445  0
     private static class EndPointName implements Comparable<EndPointName> {
 446  
         public String stageName;
 447  
         public String pointName;
 448  
         public ConnectionPointType type;
 449  
         public FileLocation location;
 450  
 
 451  0
         public EndPointName(String stageName, String pointName, ConnectionPointType type, FileLocation location) {
 452  0
             this.stageName = stageName;
 453  0
             this.pointName = pointName;
 454  0
             this.type = type;
 455  0
             this.location = location;
 456  0
         }
 457  
 
 458  
         public EndPointName(String stageName, String pointName, ConnectionPointType type) {
 459  0
             this(stageName, pointName, type, null);
 460  0
         }
 461  
 
 462  
         public int compareTo(EndPointName other) {
 463  0
             int result = stageName.compareTo(other.stageName);
 464  0
             if (result == 0) {
 465  0
                 result = pointName.compareTo(other.pointName);
 466  
             }
 467  0
             if (result == 0) {
 468  0
                 result = type.compareTo(other.type);
 469  
             }
 470  0
             return result;
 471  
         }
 472  
 
 473  
         @Override
 474  
         public int hashCode() {
 475  0
             return stageName.hashCode() + 7 * pointName.hashCode() + 15 * type.toString().hashCode();
 476  
         }
 477  
 
 478  
         @Override
 479  
         public String toString() {
 480  0
             return String.format("%s %s %s", stageName, pointName, type.toString());
 481  
         }
 482  
     }
 483  
 
 484  
     /**
 485  
      * In the parameter file, each stage has a connections section that describes a set of
 486  
      * connection endpoints for the stage (inputs and outputs).  This method verifies that
 487  
      * all of those endpoints are connected to valid connections, defined under the 
 488  
      * connections tag in the job.  If the method finds an dangling (unconnected) endpoint, 
 489  
      * an error message is added to the ErrorStore.
 490  
      */
 491  
     public void findDanglingEndpoints(final Job job) {
 492  0
         TreeSet<EndPointName> endPointNames = new TreeSet();
 493  
 
 494  
         // First, make a list of all endpoints referenced in all stages.
 495  0
         for (Stage stage : job.stages.values()) {
 496  
             // find the corresponding description object
 497  0
             StageGroupDescription description = stages.get(stage.name);
 498  
 
 499  
             // add all connection points to the set
 500  0
             for (StageConnectionPoint point : stage.connections.values()) {
 501  0
                 EndPointName ep = new EndPointName(stage.name, point.getExternalName(), point.
 502  
                                                    getType(), point.location);
 503  0
                 endPointNames.add(ep);
 504  0
             }
 505  0
         }
 506  
 
 507  
         // Now we have a list of referenced names.  We now remove every endpoint that
 508  
         // is referenced in the connections section.
 509  0
         for (ConnectionDescription connection : connections.values()) {
 510  0
             for (EndPointDescription input : connection.inputs) {
 511  0
                 EndPointName ep = new EndPointName(input.stage.getName(),
 512  
                                                    input.stagePoint.getExternalName(),
 513  
                                                    input.stagePoint.getType());
 514  0
                 endPointNames.remove(ep);
 515  0
             }
 516  0
             for (EndPointDescription output : connection.outputs) {
 517  0
                 EndPointName ep = new EndPointName(output.stage.getName(),
 518  
                                                    output.stagePoint.getExternalName(),
 519  
                                                    output.stagePoint.getType());
 520  0
                 endPointNames.remove(ep);
 521  0
             }
 522  
         }
 523  
 
 524  
 
 525  0
         for (EndPointName ep : endPointNames) {
 526  0
             store.addError(ep.location,
 527  
                            ep.stageName + ": No connection references the " + ep.type +
 528  
                            " with the name '" + ep.pointName + "'.");
 529  
         }
 530  0
     }
 531  
 
 532  
     private boolean constructAndVerify() {
 533  0
         clear();
 534  
 
 535  
         // first, we make stage group descriptions, getting ready to add connections in
 536  0
         for (Stage stage : job.stages.values()) {
 537  0
             stages.put(stage.name, new StageGroupDescription(stage));
 538  
         }
 539  
 
 540  
         // verify each stage in the job to make sure that
 541  
         // the connections between individual steps are typesafe, that
 542  
         // step classes exist, etc.
 543  0
         Verification.verify(job, store);
 544  0
         if (store.getErrors().size() > 0) {
 545  0
             return false;
 546  
         }
 547  
 
 548  
         // build data about connections between stages, while verifying
 549  
         // type safety between stage connections.
 550  0
         buildConnections(job);
 551  0
         if (store.getErrors().size() > 0) {
 552  0
             return false;
 553  
         }
 554  
 
 555  0
         findDanglingEndpoints(job);
 556  0
         if (store.getErrors().size() > 0) {
 557  0
             return false;
 558  
         }
 559  
 
 560  0
         generateDependencies();
 561  0
         if (store.getErrors().size() > 0) {
 562  0
             return false;
 563  
         }
 564  
 
 565  0
         determineStageOrder();
 566  0
         if (store.getErrors().size() > 0) {
 567  0
             return false;
 568  
         }
 569  
 
 570  0
         countStages();
 571  0
         if (store.getErrors().size() > 0) {
 572  0
             return false;
 573  
         }
 574  
 
 575  0
         createPipeObjects();
 576  0
         return true;
 577  
     }
 578  
 
 579  
     private void generateDependencies() {
 580  
         // generate a list of stages
 581  0
         for (StageGroupDescription stage : stages.values()) {
 582  0
             stageChildren.put(stage.getName(), new HashSet());
 583  0
             stageParents.put(stage.getName(), new HashSet());
 584  
         }
 585  
 
 586  0
         for (ConnectionDescription connection : connections.values()) {
 587  0
             for (EndPointDescription input : connection.inputs) {
 588  0
                 for (EndPointDescription output : connection.outputs) {
 589  0
                     stageChildren.get(input.getStageName()).add(output.getStageName());
 590  0
                     stageParents.get(output.getStageName()).add(input.getStageName());
 591  
                 }
 592  
             }
 593  
         }
 594  0
     }
 595  
 
 596  
     private void determineStageOrder() {
 597  0
         ArrayList<String> result = new ArrayList();
 598  0
         HashSet<String> used = new HashSet();
 599  0
         HashSet<String> batch = new HashSet();
 600  
 
 601  0
         for (String stageName : stageParents.keySet()) {
 602  0
             if (stageParents.get(stageName).size() == 0) {
 603  0
                 batch.add(stageName);
 604  
             }
 605  
         }
 606  
 
 607  0
         while (batch.size() > 0) {
 608  0
             HashSet<String> nextBatch = new HashSet();
 609  
 
 610  0
             for (String stageName : batch) {
 611  0
                 result.add(stageName);
 612  0
                 used.add(stageName);
 613  
 
 614  0
                 HashSet<String> children = stageChildren.get(stageName);
 615  
 
 616  
                 // adding this stage to the list may have unblocked a child
 617  0
                 for (String child : children) {
 618  0
                     HashSet<String> childParents = stageParents.get(child);
 619  
 
 620  0
                     if (!used.contains(child) && used.containsAll(childParents)) {
 621  0
                         nextBatch.add(child);
 622  
                     }
 623  0
                 }
 624  0
             }
 625  
 
 626  0
             batch = nextBatch;
 627  0
             nextBatch = new HashSet();
 628  0
         }
 629  
 
 630  0
         assert result.size() == stages.size();
 631  0
         stageOrder = result;
 632  0
     }
 633  
 
 634  
     private static class EndPointDescription {
 635  
         public EndPointDescription(ConnectionDescription connection,
 636  
                                    StageGroupDescription stage,
 637  
                                    ConnectionEndPoint connectionPoint,
 638  0
                                    StageConnectionPoint stagePoint) {
 639  0
             this.connectionPoint = connectionPoint;
 640  0
             this.stagePoint = stagePoint;
 641  0
             this.stage = stage;
 642  0
             this.connection = connection;
 643  0
         }
 644  
 
 645  
         public String getStageName() {
 646  0
             return stage.getName();
 647  
         }
 648  
 
 649  
         public StageConnectionPoint getStagePoint() {
 650  0
             return stagePoint;
 651  
         }
 652  
 
 653  
         public ConnectionEndPoint getConnectionPoint() {
 654  0
             return connectionPoint;
 655  
         }
 656  
 
 657  
         public ConnectionDescription getConnection() {
 658  0
             return connection;
 659  
         }
 660  
         public StageGroupDescription stage;
 661  
         public StageConnectionPoint stagePoint;
 662  
         public ConnectionEndPoint connectionPoint;
 663  
         public ConnectionDescription connection;
 664  
     }
 665  
 
 666  0
     private class ConnectionDescription {
 667  0
         public ConnectionDescription(Connection connection) {
 668  0
             this.connection = connection;
 669  0
             this.inputs = new ArrayList();
 670  0
             this.outputs = new ArrayList();
 671  0
         }
 672  
 
 673  
         public boolean isHashed() {
 674  0
             return connection.hash != null;
 675  
         }
 676  
 
 677  
         public int getOutputCount() {
 678  0
             int result = 1;
 679  
 
 680  0
             if (isHashed()) {
 681  0
                 String hashCount = job.properties.get("hashCount");
 682  
 
 683  0
                 if (connection.getHashCount() > 0) {
 684  0
                     result = connection.getHashCount();
 685  0
                 } else if (hashCount != null &&
 686  
                         Utility.isInteger(hashCount)) {
 687  0
                     result = Integer.parseInt(hashCount);
 688  
                 } else {
 689  0
                     result = defaultHashCount;
 690  
                 }
 691  0
             } else {
 692  0
                 result = getInputCount();
 693  
             }
 694  
 
 695  0
             return result;
 696  
         }
 697  
 
 698  
         public int getInputCount() {
 699  0
             int total = 0;
 700  
 
 701  0
             for (EndPointDescription input : inputs) {
 702  0
                 total += input.stage.instanceCount;
 703  
             }
 704  
 
 705  0
             return total;
 706  
         }
 707  
 
 708  
         public String getName() {
 709  0
             return connection.getName();
 710  
         }
 711  
 
 712  
         public String[] getOrder() {
 713  0
             return connection.getOrder();
 714  
         }
 715  
 
 716  
         public String[] getHash() {
 717  0
             return connection.getHash();
 718  
         }
 719  
 
 720  
         private String getClassName() {
 721  0
             return connection.getClassName();
 722  
         }
 723  
 
 724  
         public DataPipe getPipe() {
 725  0
             return pipe;
 726  
         }
 727  
 
 728  
         public void setPipe(DataPipe pipe) {
 729  0
             this.pipe = pipe;
 730  0
         }
 731  
 
 732  
         @Override
 733  
         public String toString() {
 734  0
             return String.format("%s %s", getClassName(), getName());
 735  
         }
 736  
         public Connection connection;
 737  
         public ArrayList<EndPointDescription> inputs;
 738  
         public ArrayList<EndPointDescription> outputs;
 739  
         public DataPipe pipe;
 740  
     }
 741  
 
 742  
     private EndPointDescription createEndPoint(ConnectionDescription connection,
 743  
                                                ConnectionEndPoint endPoint) {
 744  0
         StageGroupDescription stageDescription = stages.get(endPoint.getStageName());
 745  
 
 746  0
         if (stageDescription == null) {
 747  0
             store.addError(endPoint.location,
 748  
                            "The stage '" + endPoint.getStageName() + "' was not found.");
 749  
         } else {
 750  0
             Stage stage = stageDescription.getStage();
 751  0
             StageConnectionPoint point = stage.getConnection(endPoint.getPointName());
 752  
 
 753  0
             if (point == null) {
 754  0
                 store.addError(endPoint.location, "The endpoint '" + endPoint.getPointName() + "' wasn't found in this stage, " +
 755  
                                "even though there is a connection to it.");
 756  0
             } else if (!ConnectionPointType.connectable(endPoint.getType(), point.getType())) {
 757  0
                 store.addError(endPoint.location,
 758  
                                "The endpoint '" + endPoint.getPointName() + "' is in this stage, but it's going the wrong direction.");
 759  0
             } else if (!point.getClassName().equals(connection.connection.getClassName())) {
 760  0
                 store.addError(endPoint.location, "This " + point.getType() + " has a different class name '" + point.getClassName() +
 761  
                                " than the connection that connects to it: " + connection.connection.
 762  
                                getClassName() + ".");
 763  0
             } else if (!Arrays.equals(point.getOrder(), connection.connection.getOrder())) {
 764  0
                 store.addError(endPoint.location, "This " + point.getType() + " has a different order " + Arrays.toString(point.getOrder()) +
 765  
                                " than the connection that connects to it: " + Arrays.toString(
 766  
                                connection.connection.getOrder()));
 767  
             } else {
 768  0
                 return new EndPointDescription(connection, stageDescription, endPoint, point);
 769  
             }
 770  
         }
 771  
 
 772  0
         return null;
 773  
     }
 774  
 
 775  
     private ArrayList<EndPointDescription> createEndPoints(ConnectionDescription connection,
 776  
                                                            ArrayList<ConnectionEndPoint> endPoints) {
 777  0
         ArrayList<EndPointDescription> results = new ArrayList<EndPointDescription>();
 778  
 
 779  0
         for (ConnectionEndPoint endPoint : endPoints) {
 780  0
             EndPointDescription epd = createEndPoint(connection, endPoint);
 781  
 
 782  0
             if (epd != null) {
 783  0
                 results.add(epd);
 784  
             }
 785  0
         }
 786  
 
 787  0
         return results;
 788  
     }
 789  
 
 790  
     /**
 791  
      * Creates ConnectionDescription objects for each connection listed in
 792  
      * the Job object.  The ConnectionDescription objects combine information
 793  
      * from the StageConnectionPoints (in the Stages) and the ConnectionEndPoints
 794  
      * (in the Job.Connection objects) to make them easier to access.  In the process
 795  
      * of making these objects, this method typechecks all of the connections.
 796  
      */
 797  
     private void buildConnections(final Job job) {
 798  0
         for (Connection connection : job.connections) {
 799  0
             ConnectionDescription description = new ConnectionDescription(connection);
 800  
 
 801  
             // verify that the class, order, and hash exist
 802  0
             ErrorHandler handler = store.getErrorHandler(connection.location);
 803  0
             Verification.requireClass(connection.getClassName(), handler);
 804  0
             Verification.requireOrder(connection.getClassName(), connection.getOrder(), handler);
 805  
 
 806  0
             if (connection.getHash() != null) {
 807  0
                 Verification.requireOrder(connection.getClassName(),
 808  
                                                                         connection.getHash(),
 809  
                                                                         handler);
 810  
             }
 811  0
             description.inputs = createEndPoints(description, connection.inputs);
 812  0
             description.outputs = createEndPoints(description, connection.outputs);
 813  0
             connections.put(connection.getName(), description);
 814  0
         }
 815  0
     }
 816  
 
 817  
     /**
 818  
      * This method computes the number of copies of each stage to run.  
 819  
      * The execution count of stage depends on its inputs.  If the input
 820  
      * for a stage is hashed 200 ways, for instance, then there will need
 821  
      * to be 200 copies of the stage. 
 822  
      */
 823  
     private void countStages() {
 824  0
         HashMap<String, HashSet<EndPointDescription>> stageInputs = new HashMap();
 825  0
         HashMap<String, HashSet<EndPointDescription>> stageOutputs = new HashMap();
 826  
 
 827  0
         for (String stageName : stages.keySet()) {
 828  0
             stageInputs.put(stageName, new HashSet());
 829  0
             stageOutputs.put(stageName, new HashSet());
 830  
         }
 831  
 
 832  0
         for (ConnectionDescription connection : connections.values()) {
 833  0
             for (EndPointDescription endPoint : connection.inputs) {
 834  0
                 stageOutputs.get(endPoint.stage.getName()).add(endPoint);
 835  
             }
 836  0
             for (EndPointDescription endPoint : connection.outputs) {
 837  0
                 stageInputs.get(endPoint.stage.getName()).add(endPoint);
 838  
             }
 839  
         }
 840  
 
 841  0
         for (String stageName : stageOrder) {
 842  0
             StageGroupDescription stage = stages.get(stageName);
 843  
 
 844  
             // if stage has no inputs, then we store 1 in stageCounts
 845  0
             if (stageInputs.get(stageName).size() == 0) {
 846  0
                 stage.instanceCount = 1;
 847  
             } else {
 848  
                 // find out what the assignment is for this connection.
 849  0
                 int instanceCount = 1;
 850  0
                 boolean unknown = true;
 851  
 
 852  0
                 HashSet<EndPointDescription> inputs = stageInputs.get(stageName);
 853  
 
 854  0
                 for (EndPointDescription description : inputs) {
 855  0
                     ConnectionEndPoint point = description.getConnectionPoint();
 856  0
                     ConnectionAssignmentType assignment = point.getAssignment();
 857  
 
 858  0
                     switch (assignment) {
 859  
                         case One:
 860  0
                             store.addError(point.location,
 861  
                                            "The 'one' mode is not currently supported.");
 862  0
                             break;
 863  
 
 864  
                         case Each:
 865  0
                             int inputCount = description.connection.getOutputCount();
 866  
 
 867  0
                             if (unknown) {
 868  0
                                 instanceCount = inputCount;
 869  0
                                 unknown = false;
 870  0
                             } else if (!unknown && instanceCount != inputCount) {
 871  0
                                 store.addError(point.location, "The number of stage instances for '" +
 872  
                                                stageName + "' is ambiguous (" + inputCount +
 873  
                                                " or " + instanceCount + ")");
 874  
                             }
 875  
                             break;
 876  
 
 877  
                         case Combined:
 878  
                             // this doesn't affect the number of stages.
 879  
                             break;
 880  
                     }
 881  0
                 }
 882  
 
 883  0
                 if (unknown) {
 884  0
                     instanceCount = 1;
 885  
                 }
 886  0
                 stage.instanceCount = instanceCount;
 887  
             }
 888  0
         }
 889  0
     }
 890  
 
 891  
     /**
 892  
      * Creates the directory structures to hold the files for this job.
 893  
      */
 894  
     private void createPipeObjects() {
 895  
         // Now, we need to create the connections
 896  0
         for (ConnectionDescription connection : connections.values()) {
 897  
             // Make the parent directory
 898  0
             String directoryName = temporaryStorage + File.separator + connection.getName();
 899  0
             new File(directoryName).mkdir();
 900  
 
 901  0
             DataPipe pipe = new DataPipe(directoryName,
 902  
                                          connection.getName(),
 903  
                                          connection.getClassName(),
 904  
                                          connection.getOrder(),
 905  
                                          connection.getHash(),
 906  
                                          connection.getInputCount(),
 907  
                                          connection.getOutputCount());
 908  
 
 909  0
             int startIndex = 0;
 910  0
             connection.setPipe(pipe);
 911  
 
 912  0
             for (EndPointDescription input : connection.inputs) {
 913  0
                 StageGroupDescription description = stages.get(input.getStageName());
 914  0
                 description.outputs.put(input.getStagePoint().getInternalName(),
 915  
                                         new DataPipeRegion(pipe,
 916  
                                                            startIndex,
 917  
                                                            startIndex + description.getInstanceCount(),
 918  
                                                            ConnectionPointType.Input));
 919  0
                 startIndex += description.getInstanceCount();
 920  0
             }
 921  
 
 922  0
             for (EndPointDescription output : connection.outputs) {
 923  0
                 StageGroupDescription description = stages.get(output.getStageName());
 924  0
                 description.inputs.put(output.getStagePoint().getInternalName(),
 925  
                                        new DataPipeRegion(pipe,
 926  
                                                           0,
 927  
                                                           connection.getOutputCount(),
 928  
                                                           ConnectionPointType.Output));
 929  0
             }
 930  
 
 931  0
             pipes.add(pipe);
 932  0
         }
 933  0
     }
 934  
 
 935  
     public static class JobExecutionStatus {
 936  
         // these are the names of all stages that have completed
 937  0
         HashMap<String, StageExecutionStatus> completedStages = new HashMap<String, StageExecutionStatus>();
 938  
         // named of all stages that have been launched (contains all completed stages too)
 939  0
         HashSet<String> launchedStages = new HashSet<String>();
 940  
         // all stages that are currently running.  Includes a Future object that can be used
 941  
         // to wait for the stage to complete (and to get exceptions thrown by the stage)
 942  0
         HashMap<String, StageExecutionStatus> runningStages = new HashMap<String, StageExecutionStatus>();
 943  
         // names of connections that are complete, meaning that all their inputs have been created
 944  0
         HashSet<String> completedConnections = new HashSet<String>();
 945  
         // map from connection names to the names of stages that provide inputs to the connection
 946  0
         HashMap<String, HashSet<String>> connectionDependencies = new HashMap<String, HashSet<String>>();
 947  
 
 948  
         // reference to the parent class.
 949  
         HashMap<String, StageGroupDescription> stages;
 950  
         // reference to the parent class.
 951  
         String temporaryStorage;
 952  
         StageExecutor executor;
 953  
         Date startDate;
 954  
 
 955  
         public JobExecutionStatus(HashMap<String, StageGroupDescription> stages,
 956  0
                 String temporaryStorage, StageExecutor executor, String masterURL) {
 957  0
             this.stages = stages;
 958  0
             this.temporaryStorage = temporaryStorage;
 959  0
             this.executor = executor;
 960  0
             this.startDate = new Date();
 961  
 
 962  0
             for (StageGroupDescription description : stages.values()) {
 963  
                 // build a list of dependencies from pipe inputs to stage names
 964  0
                 for (DataPipeRegion region : description.outputs.values()) {
 965  0
                     String pipeName = region.pipe.pipeName;
 966  
 
 967  0
                     if (!connectionDependencies.containsKey(pipeName)) {
 968  0
                         connectionDependencies.put(
 969  
                                 pipeName, new HashSet<String>());
 970  
                     }
 971  0
                     connectionDependencies.get(pipeName).add(description.getName());
 972  0
                 }
 973  
 
 974  0
                 description.setMasterURL(masterURL);
 975  
             }
 976  0
         }
 977  
 
 978  
         class BlockedExecutionStatus implements StageExecutionStatus {
 979  
             String name;
 980  
             int instances;
 981  
             
 982  0
             BlockedExecutionStatus(String name, int instances) {
 983  0
                 this.name = name;
 984  0
                 this.instances = instances;
 985  0
             }
 986  
 
 987  0
             public String getName() { return name; }
 988  0
             public int getBlockedInstances() { return instances; }
 989  0
             public int getQueuedInstances() { return 0; }
 990  0
             public int getRunningInstances() { return 0; }
 991  0
             public int getCompletedInstances() { return 0; }
 992  0
             public boolean isDone() { return false; }
 993  0
             public List<Exception> getExceptions() { return Collections.EMPTY_LIST; }
 994  
         }
 995  
 
 996  
         public synchronized boolean isComplete() {
 997  0
             return stages.size() == completedStages.size();
 998  
         }
 999  
 
 1000  
         public synchronized Map<String, StageExecutionStatus> getStageStatus() {
 1001  0
             Map<String, StageExecutionStatus> result = new TreeMap();
 1002  
 
 1003  0
             for (String stageName : stages.keySet()) {
 1004  0
                 int instanceCount = stages.get(stageName).getInstanceCount();
 1005  0
                 if (completedStages.containsKey(stageName)) {
 1006  0
                     result.put(stageName, completedStages.get(stageName));
 1007  0
                 } else if (runningStages.containsKey(stageName)) {
 1008  0
                     result.put(stageName, runningStages.get(stageName));
 1009  
                 } else {
 1010  0
                     result.put(stageName, new BlockedExecutionStatus(stageName, instanceCount));
 1011  
                 }
 1012  0
             }
 1013  
 
 1014  0
             return result;
 1015  
         }
 1016  
 
 1017  
         /**
 1018  
          * Returns the start date for this job.
 1019  
          */
 1020  
 
 1021  
         public Date getStartDate() {
 1022  0
             return startDate;
 1023  
         }
 1024  
 
 1025  
         /**
 1026  
          * Returns the total amount of free memory in this JVM.
 1027  
          */
 1028  
 
 1029  
         public long getFreeMemory() {
 1030  0
             return Runtime.getRuntime().freeMemory();
 1031  
         }
 1032  
 
 1033  
         /**
 1034  
          * Returns the maximum amount of memory that can be used by this
 1035  
          * Java virtual machine.
 1036  
          */
 1037  
 
 1038  
         public long getMaxMemory() {
 1039  0
             return Runtime.getRuntime().maxMemory();
 1040  
         }
 1041  
 
 1042  
         public void run() throws InterruptedException, ExecutionException {
 1043  
             // while there are incomplete stages, choose one to execute
 1044  0
             while (launchedStages.size() < stages.size()) {
 1045  
                 // look for stages where all of their inputs are complete
 1046  0
                 StageGroupDescription description = findRunnableStage(stages.values(), launchedStages,
 1047  
                                                                       completedConnections);
 1048  
 
 1049  
                 // didn't find any runnable stages, so we need to check to
 1050  
                 // see if any other stages have finished recently that might have
 1051  
                 // generated interesting input for pending stages.
 1052  0
                 while (description == null) {
 1053  
                     // wait for at least one stage to complete
 1054  0
                     waitForStages(runningStages, completedStages);
 1055  0
                     updateCompletedConnections(completedStages, completedConnections,
 1056  
                                                connectionDependencies);
 1057  
 
 1058  
                     // now, try again to find a runnable stage
 1059  0
                     description = findRunnableStage(stages.values(), launchedStages,
 1060  
                                                     completedConnections);
 1061  
                 }
 1062  
 
 1063  0
                 StageExecutionStatus result = executor.execute(description, temporaryStorage);
 1064  
 
 1065  0
                 synchronized(this) {
 1066  0
                     launchedStages.add(description.stage.name);
 1067  0
                     runningStages.put(description.stage.name, result);
 1068  0
                 }
 1069  0
             }
 1070  
 
 1071  
             // wait for everything to complete
 1072  0
             while (runningStages.size() > 0) {
 1073  0
                 waitForStages(runningStages, completedStages);
 1074  
             }
 1075  0
         }
 1076  
 
 1077  
         /**
 1078  
          * Finds a stage that is ready to run by checking stage dependencies.
 1079  
          *
 1080  
          * @param descriptions
 1081  
          * @param launchedJobs
 1082  
          * @param completedConnections
 1083  
          */
 1084  
         private synchronized StageGroupDescription findRunnableStage(
 1085  
                 Collection<StageGroupDescription> descriptions,
 1086  
                 HashSet<String> launchedJobs,
 1087  
                 HashSet<String> completedConnections) {
 1088  
             // for each job we might want to launch
 1089  0
             for (StageGroupDescription description : descriptions) {
 1090  
                 // if it has already been launched, we don't need to do it again
 1091  0
                 if (launchedJobs.contains(description.getName())) {
 1092  0
                     continue;            // are the inputs to this stage ready?
 1093  
                 }
 1094  0
                 boolean allComplete = true;
 1095  
 
 1096  0
                 for (DataPipeRegion region : description.inputs.values()) {
 1097  
                     // if this input is incomplete, we can't run this stage yet
 1098  0
                     if (!completedConnections.contains(region.pipe.pipeName)) {
 1099  0
                         allComplete = false;
 1100  0
                         break;
 1101  
                     }
 1102  
                 }
 1103  
 
 1104  0
                 if (allComplete) {
 1105  0
                     return description;
 1106  
                 }
 1107  0
             }
 1108  
 
 1109  
             // there are no stages ready to run
 1110  0
             return null;
 1111  
         }
 1112  
         /**
 1113  
          * Polls all the running stages to see if they've completed.  When one completes,
 1114  
          * it is added to completedStages and the method returns.
 1115  
          * 
 1116  
          * @param runningStages
 1117  
          * @param completedStages
 1118  
          * @throws java.lang.InterruptedException
 1119  
          * @throws java.util.concurrent.ExecutionException
 1120  
          */
 1121  
         private void waitForStages(
 1122  
                 HashMap<String, StageExecutionStatus> runningStages,
 1123  
                 final HashMap<String, StageExecutionStatus> completedStages)
 1124  
                 throws InterruptedException, ExecutionException {
 1125  0
             long delay = 1;
 1126  
 
 1127  0
             while (runningStages.size() > 0) {
 1128  0
                 synchronized(this) {
 1129  0
                     for (String name : runningStages.keySet()) {
 1130  0
                         StageExecutionStatus status = runningStages.get(name);
 1131  0
                         if (status.isDone()) {
 1132  
                             // force the exception to throw
 1133  0
                             List<Exception> exceptions = status.getExceptions();
 1134  0
                             if (exceptions.size() > 0) {
 1135  0
                                 throw new ExecutionException("Stage threw an exception: ", exceptions.get(0));
 1136  
                             }
 1137  0
                             completedStages.put(name, status);
 1138  0
                             runningStages.remove(name);
 1139  0
                             return;
 1140  
                         }
 1141  0
                     }
 1142  0
                 }
 1143  
 
 1144  
                 // check at least once a second, but poll faster at first
 1145  0
                 delay = Math.min(delay * 2, 1000);
 1146  0
                 Thread.sleep(delay);
 1147  
             }
 1148  0
         }
 1149  
 
 1150  
         /**
 1151  
          * Reads through all completed stages, trying to find any connections
 1152  
          * that have been satisfied.  By satisfied, we mean that all the input
 1153  
          * has been generated for a particular connection.  Once a connection
 1154  
          * is satisfied, stages that require that read input from that connection
 1155  
          * can start running.
 1156  
          *
 1157  
          * @param completedStages
 1158  
          * @param completedConnections
 1159  
          * @param connectionDependencies
 1160  
          */
 1161  
         private synchronized void updateCompletedConnections(
 1162  
                 final HashMap<String, StageExecutionStatus> completedStages,
 1163  
                 final HashSet<String> completedConnections,
 1164  
                 final HashMap<String, HashSet<String>> connectionDependencies) {
 1165  
             // Loop over all connections:
 1166  0
             for (String pipeName : connectionDependencies.keySet()) {
 1167  
                 // These are all the stages that need to be completed before
 1168  
                 // the connection pipeName is satisfied.
 1169  0
                 HashSet<String> pipeInputStages = connectionDependencies.get(pipeName);
 1170  
 
 1171  0
                 if (completedStages.keySet().containsAll(pipeInputStages)) {
 1172  0
                     completedConnections.add(pipeName);
 1173  
                 }
 1174  0
             }
 1175  0
         }
 1176  
     }
 1177  
 
 1178  
     public void run(StageExecutor executor) throws InterruptedException, ExecutionException, UnknownHostException, IOException {
 1179  0
         Server server = new Server(Utility.getFreePort());
 1180  0
         runWithServer(executor, server);
 1181  0
     }
 1182  
 
 1183  
     public void runWithServer(StageExecutor executor, Server server) throws ExecutionException, InterruptedException, UnknownHostException {
 1184  
         // FIXME: all of this needs to be refactored.
 1185  0
         InetAddress address = java.net.InetAddress.getLocalHost();
 1186  0
         int port = server.getConnectors()[0].getPort();
 1187  0
         String masterURL = String.format("http://%s:%d", address.getHostAddress(), port);
 1188  0
         JobExecutionStatus status = new JobExecutionStatus(stages, temporaryStorage, executor, masterURL);
 1189  0
         MasterWebHandler handler = new MasterWebHandler(status);
 1190  0
         server.addHandler(handler);
 1191  0
         status.run();
 1192  0
         handler.waitForFinalPage();
 1193  0
         server.removeHandler(handler);
 1194  0
     }
 1195  
 
 1196  
     public static boolean runLocally(Job job, ErrorStore store) throws IOException,
 1197  
             InterruptedException, ExecutionException, Exception {
 1198  0
         StageExecutor executor = StageExecutorFactory.newInstance("local", new String[] {});
 1199  0
         File tempFile = Utility.createTemporary();
 1200  0
         tempFile.delete();
 1201  0
         tempFile.mkdir();
 1202  
         
 1203  0
         JobExecutor jobExecutor = new JobExecutor(job, tempFile.getAbsolutePath(), store);
 1204  0
         jobExecutor.prepare();
 1205  
 
 1206  0
         if (store.hasStatements()) {
 1207  0
             return false;
 1208  
         }
 1209  
 
 1210  0
         int port = Utility.getFreePort();
 1211  0
         Server server = new Server(port);
 1212  0
         server.start();
 1213  0
         System.out.println("Status: http://localhost:" + port);
 1214  
         try {
 1215  0
             jobExecutor.runWithServer(executor, server);
 1216  
         } finally {
 1217  0
             server.stop();
 1218  0
             executor.shutdown();
 1219  0
         }
 1220  
         
 1221  0
         return !store.hasStatements();
 1222  
     }
 1223  
 
 1224  
     public static void main(String[] args) throws ParserConfigurationException, SAXException,
 1225  
             IOException, InterruptedException, ExecutionException {
 1226  0
         if (args.length < 3) {
 1227  0
             System.out.println("usage: executionModel parameterFile temporaryStorage");
 1228  0
             System.out.println(
 1229  
                     "   where executionModel is one of: local, drmaa, ssh, debug, remotedebug");
 1230  0
             System.exit(-1);
 1231  
         }
 1232  
 
 1233  0
         String executionModel = args[0];
 1234  0
         String parameterFile = args[1];
 1235  0
         String temporaryStorage = args[2];
 1236  0
         String[] remaining = Utility.subarray(args, 3);
 1237  0
         StageExecutor executor = null;
 1238  
 
 1239  0
         Logger logger = Logger.getLogger(JobExecutor.class.toString());
 1240  0
         logger.setLevel(Level.INFO);
 1241  
 
 1242  0
         executor = StageExecutorFactory.newInstance(executionModel, remaining);
 1243  0
         ErrorStore store = new ErrorStore();
 1244  
 
 1245  
         // First, parse the job
 1246  0
         Job job = parseFile(parameterFile, store);
 1247  
 
 1248  0
         if (store.hasStatements()) {
 1249  0
             System.out.println(store.toString());
 1250  0
             System.exit(-1);
 1251  
         }
 1252  
 
 1253  0
         JobExecutor jobExecutor = new JobExecutor(job, temporaryStorage, store);
 1254  0
         jobExecutor.prepare();
 1255  
 
 1256  0
         if (store.hasStatements()) {
 1257  0
             System.out.println(store.toString());
 1258  0
             System.exit(-1);
 1259  
         }
 1260  
 
 1261  0
         jobExecutor.run(executor);
 1262  0
         logger.info("Job complete");
 1263  0
         executor.shutdown();
 1264  0
     }
 1265  
 }