View Javadoc

1   // BSD License (http://www.galagosearch.org/license)
2   
3   package org.galagosearch.tupleflow.execution;
4   
5   import java.net.UnknownHostException;
6   import org.mortbay.jetty.Server;
7   import org.galagosearch.tupleflow.Utility;
8   import org.galagosearch.tupleflow.Processor;
9   import org.galagosearch.tupleflow.TypeReader;
10  import org.galagosearch.tupleflow.execution.Job.StagePoint;
11  import org.galagosearch.tupleflow.execution.StageGroupDescription.DataPipeRegion;
12  import org.galagosearch.tupleflow.execution.StageInstanceDescription.PipeInput;
13  import org.galagosearch.tupleflow.execution.StageInstanceDescription.PipeOutput;
14  import java.io.File;
15  import java.io.IOException;
16  import java.io.StringReader;
17  import java.net.InetAddress;
18  import java.util.ArrayList;
19  import java.util.Arrays;
20  import java.util.Collection;
21  import java.util.Collections;
22  import java.util.Date;
23  import java.util.HashMap;
24  import java.util.HashSet;
25  import java.util.Iterator;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.TreeMap;
29  import java.util.TreeSet;
30  import java.util.concurrent.ExecutionException;
31  import java.util.concurrent.Future;
32  import javax.xml.parsers.ParserConfigurationException;
33  import javax.xml.parsers.SAXParser;
34  import javax.xml.parsers.SAXParserFactory;
35  import java.util.logging.Level;
36  import java.util.logging.Logger;
37  import org.mortbay.jetty.bio.SocketConnector;
38  import org.xml.sax.InputSource;
39  import org.xml.sax.SAXException;
40  import org.xml.sax.SAXParseException;
41  
42  /***
43   * <p>This class is responsible for executing TupleFlow jobs.</p>
44   *
45   * <p>A job is specified using the TupleFlow Job class.  The Job class has an XML form
46   * which can be parsed by JobConstructor, but you can create one programmatically as well.</p>
47   *
 * <p>Before the job is executed, it is verified.  JobExecutor verifies that all the
 * classes referenced by the Job object actually exist, and that the connections point
 * to sensible places.  Once it has been verified, the JobExecutor builds an execution plan
 * that will execute the job with as much parallelism as possible without violating any
 * ordering constraints dictated by stage connections.  After the plan is generated,
 * the job is sent to a StageExecutor for the low-level details of execution.</p>
54   *
55   * <p>TupleFlow has many different kinds of StageExecutors you can use.  To get started
56   * and to debug your code, use the LocalStageExecutor or ThreadedExecutor.  To harness
57   * more parallelism, use the SSHStageExecutor or the DRMAAStageExecutor.</p>
58   * 
59   * @author trevor
60   */
61  public class JobExecutor {
62      ErrorStore store;
63      Job job;
64      String temporaryStorage;
65      int defaultHashCount = 10;
66      long maximumFileInputs = 200;
67      HashMap<String, ConnectionDescription> connections = new HashMap<String, ConnectionDescription>();
68      HashMap<String, StageGroupDescription> stages = new HashMap<String, StageGroupDescription>();
69      ArrayList<String> stageOrder = new ArrayList<String>();
70      HashMap<String, HashSet<String>> stageChildren = new HashMap<String, HashSet<String>>();
71      HashMap<String, HashSet<String>> stageParents = new HashMap<String, HashSet<String>>();
72      ArrayList<DataPipe> pipes = new ArrayList<DataPipe>();
73  
/**
 * Creates an executor for a job.  Nothing is verified until prepare() is called.
 *
 * @param job  the job to verify and run
 * @param temporaryStorage  path used for temporary storage
 * @param store  receives every error discovered while preparing the job
 */
public JobExecutor(Job job, String temporaryStorage, ErrorStore store) {
    this.job = job;
    this.temporaryStorage = temporaryStorage;
    this.store = store;
}
79  
80      /***
81       * Parses the XML text in the file specified by the filename parameter into a Job.
82       * 
83       * @param filename A path to the XML file to parse.
84       * @param store An ErrorStore object where all error information should be sent.
85       * @return A job instance based on the XML description in the text parameter.
86       * @throws org.xml.sax.SAXException
87       * @throws javax.xml.parsers.ParserConfigurationException
88       * @throws java.io.IOException
89       */
90      public static Job parseFile(String filename, ErrorStore store) throws SAXException, ParserConfigurationException, IOException {
91          // Parse the file
92          JobConstructor jobParseSaxHandler = new JobConstructor(filename, store);
93          SAXParser parser = SAXParserFactory.newInstance().newSAXParser();
94  
95          try {
96              parser.parse(new File(filename), jobParseSaxHandler);
97          } catch (SAXParseException e) {
98              store.addError(filename, e);
99          }
100 
101         return jobParseSaxHandler.getJob();
102     }
103 
104     /***
105      * Parses the XML text in the text parameter into a Job.
106      * 
107      * @param text XML text to parse into a Job.
108      * @param store An ErrorStore object where all error information should be sent.
109      * @return A job instance based on the XML description in the text parameter.
110      * @throws org.xml.sax.SAXException
111      * @throws javax.xml.parsers.ParserConfigurationException
112      * @throws java.io.IOException
113      */
114     public static Job parseText(String text, ErrorStore store) throws SAXException, ParserConfigurationException, IOException {
115         // Parse the file
116         JobConstructor jobParseSaxHandler = new JobConstructor(store);
117         SAXParser parser = SAXParserFactory.newInstance().newSAXParser();
118 
119         try {
120             StringReader reader = new StringReader(text);
121             parser.parse(new InputSource(reader), jobParseSaxHandler);
122         } catch (SAXParseException e) {
123             store.addError("none", e);
124         }
125 
126         return jobParseSaxHandler.getJob();
127     }
128 
129     /***
130      * This method tries to combine stages together to reduce overhead.
131      * 
132      * In particular, this method looks for two stages, A and B, where each
133      * copy of B takes input from only one copy of stage A.  In this case, 
134      * all the steps from B are moved into stage A, saving a lot of file overhead
135      * in transferring tuples from A to B.
136      * 
137      * This method is particularly useful for jobs that are created
138      * automatically.
139      * 
140      * @param job  The job instance to optimize.
141      * @return A new job instance, perhaps with fewer stages.
142      */
143     public static Job optimize(Job job) {
144         // First, figure out what source gets output from the stage, if any.
145         HashMap<String, String> outputs = new HashMap<String, String>();
146         HashMap<String, String> inputs = new HashMap<String, String>();
147 
148         for (Stage stage : job.stages.values()) {
149             // Output mapping
150             if (stage.steps.size() > 0) {
151                 Step lastStep = stage.steps.get(stage.steps.size() - 1);
152                 if (lastStep instanceof OutputStep) {
153                     OutputStep output = (OutputStep) lastStep;
154                     outputs.put(stage.name, output.getId());
155                 }
156 
157                 Step firstStep = stage.steps.get(0);
158                 if (firstStep instanceof InputStep) {
159                     InputStep input = (InputStep) firstStep;
160                     inputs.put(stage.name, input.getId());
161                 }
162             }
163         }
164 
165         // Create a mapping from String -> Stage
166         HashMap<String, Stage> stages = new HashMap<String, Stage>(job.stages);
167 
168         // Now, rip through the connections, and try to find a connection that links
169         // one of these inputs to one of the outputs.
170 
171         Iterator<Connection> iterator = job.connections.iterator();
172         Connection connection;
173 
174         innerLoop:
175         while (iterator.hasNext()) {
176             connection = iterator.next();
177 
178             // For simplicity, find just connections with single inputs and outputs and no hashing
179             if (connection.inputs.size() != 1) {
180                 continue;
181             }
182             if (connection.outputs.size() < 1) {
183                 continue;
184             }
185             if (connection.getHash() != null) {
186                 continue;
187             }
188             ConnectionEndPoint connectionInput = connection.inputs.get(0);
189             String stageOutputPointId = outputs.get(connectionInput.getStageName());
190 
191             // if the input to this connection is not the main output of a stage, skip
192             if (!connectionInput.getPointName().equals(stageOutputPointId)) {
193                 continue;            // make sure all of the outputs are the inputs of stages
194             }
195             for (ConnectionEndPoint connectionOutput : connection.outputs) {
196                 String stageInputPointId = inputs.get(connectionOutput.getStageName());
197 
198                 if (!connectionOutput.getPointName().equals(stageInputPointId)) {
199                     continue innerLoop;
200                 }
201                 if (connectionOutput.getAssignment() == ConnectionAssignmentType.Combined) {
202                     continue innerLoop;
203                 }
204             }
205 
206             // now we've verified that these stages can be combined together.
207             Stage source = stages.get(connectionInput.getStageName());
208 
209             MultiStep multi = new MultiStep();
210             multi.groups = new ArrayList<ArrayList<Step>>();
211 
212             for (ConnectionEndPoint connectionOutput : connection.outputs) {
213                 Stage destination = stages.get(connectionOutput.getStageName());
214                 // getting ready: remove the first step, add on to the multi
215                 int length = destination.steps.size();
216                 ArrayList<Step> steps = new ArrayList<Step>(destination.steps.subList(1, length));
217                 multi.groups.add(steps);
218 
219                 renameConnections(job, source, destination);
220 
221                 // combine dependence information
222                 source.connections.remove(connectionInput.getPointName());
223                 destination.connections.remove(connectionOutput.getPointName());
224                 source.connections.putAll(destination.connections);
225 
226                 // remove the destination stage
227                 job.stages.remove(destination);
228             }
229 
230             source.steps.remove(source.steps.size() - 1);
231 
232             // only add a multi step if there were multiple outputs
233             if (multi.groups.size() == 1) {
234                 source.steps.addAll(multi.groups.get(0));
235             } else {
236                 source.steps.add(multi);
237             }
238 
239             // remove this connection
240             iterator.remove();
241 
242             // recurse to remove other connections
243             return optimize(job);
244         }
245 
246         return job;
247     }
248 
249     public static void renameConnections(Job job, Stage source, Stage destination) {
250         // for each connection, rename dest -> source.
251         for (Connection connection : job.connections) {
252             for (ConnectionEndPoint input : connection.inputs) {
253                 if (input.getStageName().equals(destination.name)) {
254                     input.setStageName(source.name);
255                 }
256             }
257 
258             for (ConnectionEndPoint output : connection.outputs) {
259                 if (output.getStageName().equals(destination.name)) {
260                     output.setStageName(source.name);
261                 }
262             }
263         }
264     }
265 
266     public void prepare() {
267         boolean successful = constructAndVerify();
268 
269         if (!successful) {
270             return;
271         }
272 
273         // look through each stage to see how many open files it will have.
274         // if a particular stage will have a lot of open files, add some
275         // intermediate merge stages.
276         if (needsMergeStages()) {
277             addMergeStages(job);
278             constructAndVerify();
279         }
280 
281         for (DataPipe pipe : pipes) {
282             pipe.makeDirectories();
283         }
284     }
285 
286     public void clear() {
287         connections.clear();
288         stageChildren.clear();
289         stageOrder.clear();
290         stageParents.clear();
291         stages.clear();
292         pipes.clear();
293     }
294 
295     /***
296      * Checks to see if any stage has too many inputs.
297      * 
298      * @return true, if there is a stage with more than maximumFileInputs.
299      */
300     public boolean needsMergeStages() {
301         for (StageGroupDescription stage : stages.values()) {
302             long totalInputs = 0;
303 
304             if (stage.getName().endsWith("mergeStage")) {
305                 continue;
306             }
307             for (DataPipeRegion region : stage.inputs.values()) {
308                 totalInputs += region.fileCount();
309             }
310 
311             if (totalInputs > this.maximumFileInputs) {
312                 return true;
313             }
314         }
315 
316         return false;
317     }
318 
319     /***
320      * Add a merge stage to this job that merges the output of
321      * stageName called pointName.
322      * 
323      * @param job  The job that should get the new merge stage.
324      * @param stageName  The stage that contains the output that needs merging.
325      * @param pointName  The output point that needs merging in the stage stageName.
326      */
327     public void addMergeStage(Job job, String stageName, String pointName) {
328         // find the stage and the point, initialize class/order information
329         Stage inputStage = job.stages.get(stageName);
330         StageConnectionPoint inputPoint = inputStage.getConnection(pointName);
331 
332         String className = inputPoint.getClassName();
333         String[] typeOrder = inputPoint.getOrder();
334         String mergedStageName = stageName + "-" + pointName + "-mergeStage";
335         String mergedPointName = pointName + "-merged";
336 
337         // if this merge stage has already been added, don't add it again
338         if (job.stages.containsKey(mergedStageName)) {
339             return;        // create the stage itself
340         }
341         Stage s = new Stage(mergedStageName);
342         s.add(new StageConnectionPoint(ConnectionPointType.Input,
343                                        pointName,
344                                        className,
345                                        typeOrder,
346                                        null));
347         s.add(new StageConnectionPoint(ConnectionPointType.Output,
348                                        pointName + "-merged",
349                                        className,
350                                        typeOrder,
351                                        null));
352 
353         s.add(new InputStep(pointName));
354         s.add(new OutputStep(mergedPointName));
355         job.add(s);
356 
357         String[] hash = null;
358         int hashCount = -1;
359 
360         // run through the connections list, find all inputs for the previous data
361         for (Connection connection : job.connections) {
362             for (ConnectionEndPoint input : connection.inputs) {
363                 if (input.getStageName().equals(stageName) &&
364                         input.getPointName().equals(pointName)) {
365                     if (hash != null && connection.hash != null &&
366                         !Arrays.equals(hash,connection.hash)) {
367                         continue;
368                     }
369                     if (connection.hash != null) {
370                         hash = connection.hash;
371                         connection.hash = null;
372                     }
373 
374                     input.setStageName(mergedStageName);
375                     input.setPointName(mergedPointName);
376                 }
377             }
378         }
379 
380         // now, add a connection between the producing stage and the merge stage
381         job.connect(new StagePoint(stageName, pointName),
382                     new StagePoint(mergedStageName, pointName),
383                     ConnectionAssignmentType.Each,
384                     hash,
385                     hashCount);
386     }
387 
388     /***
389      * Find stages that need to open a lot of files for reading when running,
390      * and add some intermediate merge stages to reduce problems with open files.
391      * 
392      * @param job
393      */
394     public void addMergeStages(Job job) {
395         // look at each stage in the job
396         for (StageGroupDescription stage : stages.values()) {
397             long totalInputs = 0;
398 
399             // add up every file this stage will need to open
400             for (DataPipeRegion region : stage.inputs.values()) {
401                 totalInputs += region.fileCount();
402             }
403 
404             // if this stage needs to open too many files, it might crash.
405             // therefore, we want to add in an extra merge stage for every input
406             // that contains a lot of files.
407             if (totalInputs > this.maximumFileInputs) {
408                 Stage s = stage.getStage();
409                 ArrayList<Connection> relevantConnections = new ArrayList<Connection>();
410 
411                 // look for connections that point to this stage and store them
412                 for (Connection connection : job.connections) {
413                     for (ConnectionEndPoint point : connection.outputs) {
414                         if (point.getStageName().equals(s.name)) {
415                             relevantConnections.add(connection);
416                         }
417                     }
418                 }
419 
420                 // for each stage we marked in the last loop, find ones where all
421                 // the inputs come from a single stage, and that stage has a large
422                 // instanceCount (it generates a lot of files), and make a merge stage.
423                 for (Connection connection : relevantConnections) {
424                     if (connection.inputs.size() != 1) {
425                         continue;
426                     }
427                     ConnectionEndPoint endPoint = connection.inputs.get(0);
428                     String inputStageName = endPoint.getStageName();
429                     String inputPointName = endPoint.getPointName();
430                     StageGroupDescription inputStageDesc = stages.get(inputStageName);
431 
432                     // if there's no description, that means we just added it
433                     if (inputStageDesc == null ||
434                             inputStageName.endsWith("mergeStage")) {
435                         continue;
436                     }
437                     if (inputStageDesc.instanceCount > 1) {
438                         addMergeStage(job, inputStageName, inputPointName);
439                     }
440                 }
441             }
442         }
443     }
444 
445     private static class EndPointName implements Comparable<EndPointName> {
446         public String stageName;
447         public String pointName;
448         public ConnectionPointType type;
449         public FileLocation location;
450 
451         public EndPointName(String stageName, String pointName, ConnectionPointType type, FileLocation location) {
452             this.stageName = stageName;
453             this.pointName = pointName;
454             this.type = type;
455             this.location = location;
456         }
457 
458         public EndPointName(String stageName, String pointName, ConnectionPointType type) {
459             this(stageName, pointName, type, null);
460         }
461 
462         public int compareTo(EndPointName other) {
463             int result = stageName.compareTo(other.stageName);
464             if (result == 0) {
465                 result = pointName.compareTo(other.pointName);
466             }
467             if (result == 0) {
468                 result = type.compareTo(other.type);
469             }
470             return result;
471         }
472 
473         @Override
474         public int hashCode() {
475             return stageName.hashCode() + 7 * pointName.hashCode() + 15 * type.toString().hashCode();
476         }
477 
478         @Override
479         public String toString() {
480             return String.format("%s %s %s", stageName, pointName, type.toString());
481         }
482     }
483 
484     /***
485      * In the parameter file, each stage has a connections section that describes a set of
486      * connection endpoints for the stage (inputs and outputs).  This method verifies that
487      * all of those endpoints are connected to valid connections, defined under the 
488      * connections tag in the job.  If the method finds an dangling (unconnected) endpoint, 
489      * an error message is added to the ErrorStore.
490      */
491     public void findDanglingEndpoints(final Job job) {
492         TreeSet<EndPointName> endPointNames = new TreeSet();
493 
494         // First, make a list of all endpoints referenced in all stages.
495         for (Stage stage : job.stages.values()) {
496             // find the corresponding description object
497             StageGroupDescription description = stages.get(stage.name);
498 
499             // add all connection points to the set
500             for (StageConnectionPoint point : stage.connections.values()) {
501                 EndPointName ep = new EndPointName(stage.name, point.getExternalName(), point.
502                                                    getType(), point.location);
503                 endPointNames.add(ep);
504             }
505         }
506 
507         // Now we have a list of referenced names.  We now remove every endpoint that
508         // is referenced in the connections section.
509         for (ConnectionDescription connection : connections.values()) {
510             for (EndPointDescription input : connection.inputs) {
511                 EndPointName ep = new EndPointName(input.stage.getName(),
512                                                    input.stagePoint.getExternalName(),
513                                                    input.stagePoint.getType());
514                 endPointNames.remove(ep);
515             }
516             for (EndPointDescription output : connection.outputs) {
517                 EndPointName ep = new EndPointName(output.stage.getName(),
518                                                    output.stagePoint.getExternalName(),
519                                                    output.stagePoint.getType());
520                 endPointNames.remove(ep);
521             }
522         }
523 
524 
525         for (EndPointName ep : endPointNames) {
526             store.addError(ep.location,
527                            ep.stageName + ": No connection references the " + ep.type +
528                            " with the name '" + ep.pointName + "'.");
529         }
530     }
531 
532     private boolean constructAndVerify() {
533         clear();
534 
535         // first, we make stage group descriptions, getting ready to add connections in
536         for (Stage stage : job.stages.values()) {
537             stages.put(stage.name, new StageGroupDescription(stage));
538         }
539 
540         // verify each stage in the job to make sure that
541         // the connections between individual steps are typesafe, that
542         // step classes exist, etc.
543         Verification.verify(job, store);
544         if (store.getErrors().size() > 0) {
545             return false;
546         }
547 
548         // build data about connections between stages, while verifying
549         // type safety between stage connections.
550         buildConnections(job);
551         if (store.getErrors().size() > 0) {
552             return false;
553         }
554 
555         findDanglingEndpoints(job);
556         if (store.getErrors().size() > 0) {
557             return false;
558         }
559 
560         generateDependencies();
561         if (store.getErrors().size() > 0) {
562             return false;
563         }
564 
565         determineStageOrder();
566         if (store.getErrors().size() > 0) {
567             return false;
568         }
569 
570         countStages();
571         if (store.getErrors().size() > 0) {
572             return false;
573         }
574 
575         createPipeObjects();
576         return true;
577     }
578 
579     private void generateDependencies() {
580         // generate a list of stages
581         for (StageGroupDescription stage : stages.values()) {
582             stageChildren.put(stage.getName(), new HashSet());
583             stageParents.put(stage.getName(), new HashSet());
584         }
585 
586         for (ConnectionDescription connection : connections.values()) {
587             for (EndPointDescription input : connection.inputs) {
588                 for (EndPointDescription output : connection.outputs) {
589                     stageChildren.get(input.getStageName()).add(output.getStageName());
590                     stageParents.get(output.getStageName()).add(input.getStageName());
591                 }
592             }
593         }
594     }
595 
596     private void determineStageOrder() {
597         ArrayList<String> result = new ArrayList();
598         HashSet<String> used = new HashSet();
599         HashSet<String> batch = new HashSet();
600 
601         for (String stageName : stageParents.keySet()) {
602             if (stageParents.get(stageName).size() == 0) {
603                 batch.add(stageName);
604             }
605         }
606 
607         while (batch.size() > 0) {
608             HashSet<String> nextBatch = new HashSet();
609 
610             for (String stageName : batch) {
611                 result.add(stageName);
612                 used.add(stageName);
613 
614                 HashSet<String> children = stageChildren.get(stageName);
615 
616                 // adding this stage to the list may have unblocked a child
617                 for (String child : children) {
618                     HashSet<String> childParents = stageParents.get(child);
619 
620                     if (!used.contains(child) && used.containsAll(childParents)) {
621                         nextBatch.add(child);
622                     }
623                 }
624             }
625 
626             batch = nextBatch;
627             nextBatch = new HashSet();
628         }
629 
630         assert result.size() == stages.size();
631         stageOrder = result;
632     }
633 
634     private static class EndPointDescription {
635         public EndPointDescription(ConnectionDescription connection,
636                                    StageGroupDescription stage,
637                                    ConnectionEndPoint connectionPoint,
638                                    StageConnectionPoint stagePoint) {
639             this.connectionPoint = connectionPoint;
640             this.stagePoint = stagePoint;
641             this.stage = stage;
642             this.connection = connection;
643         }
644 
645         public String getStageName() {
646             return stage.getName();
647         }
648 
649         public StageConnectionPoint getStagePoint() {
650             return stagePoint;
651         }
652 
653         public ConnectionEndPoint getConnectionPoint() {
654             return connectionPoint;
655         }
656 
657         public ConnectionDescription getConnection() {
658             return connection;
659         }
660         public StageGroupDescription stage;
661         public StageConnectionPoint stagePoint;
662         public ConnectionEndPoint connectionPoint;
663         public ConnectionDescription connection;
664     }
665 
666     private class ConnectionDescription {
667         public ConnectionDescription(Connection connection) {
668             this.connection = connection;
669             this.inputs = new ArrayList();
670             this.outputs = new ArrayList();
671         }
672 
673         public boolean isHashed() {
674             return connection.hash != null;
675         }
676 
677         public int getOutputCount() {
678             int result = 1;
679 
680             if (isHashed()) {
681                 String hashCount = job.properties.get("hashCount");
682 
683                 if (connection.getHashCount() > 0) {
684                     result = connection.getHashCount();
685                 } else if (hashCount != null &&
686                         Utility.isInteger(hashCount)) {
687                     result = Integer.parseInt(hashCount);
688                 } else {
689                     result = defaultHashCount;
690                 }
691             } else {
692                 result = getInputCount();
693             }
694 
695             return result;
696         }
697 
698         public int getInputCount() {
699             int total = 0;
700 
701             for (EndPointDescription input : inputs) {
702                 total += input.stage.instanceCount;
703             }
704 
705             return total;
706         }
707 
708         public String getName() {
709             return connection.getName();
710         }
711 
712         public String[] getOrder() {
713             return connection.getOrder();
714         }
715 
716         public String[] getHash() {
717             return connection.getHash();
718         }
719 
720         private String getClassName() {
721             return connection.getClassName();
722         }
723 
724         public DataPipe getPipe() {
725             return pipe;
726         }
727 
728         public void setPipe(DataPipe pipe) {
729             this.pipe = pipe;
730         }
731 
732         @Override
733         public String toString() {
734             return String.format("%s %s", getClassName(), getName());
735         }
736         public Connection connection;
737         public ArrayList<EndPointDescription> inputs;
738         public ArrayList<EndPointDescription> outputs;
739         public DataPipe pipe;
740     }
741 
742     private EndPointDescription createEndPoint(ConnectionDescription connection,
743                                                ConnectionEndPoint endPoint) {
744         StageGroupDescription stageDescription = stages.get(endPoint.getStageName());
745 
746         if (stageDescription == null) {
747             store.addError(endPoint.location,
748                            "The stage '" + endPoint.getStageName() + "' was not found.");
749         } else {
750             Stage stage = stageDescription.getStage();
751             StageConnectionPoint point = stage.getConnection(endPoint.getPointName());
752 
753             if (point == null) {
754                 store.addError(endPoint.location, "The endpoint '" + endPoint.getPointName() + "' wasn't found in this stage, " +
755                                "even though there is a connection to it.");
756             } else if (!ConnectionPointType.connectable(endPoint.getType(), point.getType())) {
757                 store.addError(endPoint.location,
758                                "The endpoint '" + endPoint.getPointName() + "' is in this stage, but it's going the wrong direction.");
759             } else if (!point.getClassName().equals(connection.connection.getClassName())) {
760                 store.addError(endPoint.location, "This " + point.getType() + " has a different class name '" + point.getClassName() +
761                                " than the connection that connects to it: " + connection.connection.
762                                getClassName() + ".");
763             } else if (!Arrays.equals(point.getOrder(), connection.connection.getOrder())) {
764                 store.addError(endPoint.location, "This " + point.getType() + " has a different order " + Arrays.toString(point.getOrder()) +
765                                " than the connection that connects to it: " + Arrays.toString(
766                                connection.connection.getOrder()));
767             } else {
768                 return new EndPointDescription(connection, stageDescription, endPoint, point);
769             }
770         }
771 
772         return null;
773     }
774 
775     private ArrayList<EndPointDescription> createEndPoints(ConnectionDescription connection,
776                                                            ArrayList<ConnectionEndPoint> endPoints) {
777         ArrayList<EndPointDescription> results = new ArrayList<EndPointDescription>();
778 
779         for (ConnectionEndPoint endPoint : endPoints) {
780             EndPointDescription epd = createEndPoint(connection, endPoint);
781 
782             if (epd != null) {
783                 results.add(epd);
784             }
785         }
786 
787         return results;
788     }
789 
790     /***
791      * Creates ConnectionDescription objects for each connection listed in
792      * the Job object.  The ConnectionDescription objects combine information
793      * from the StageConnectionPoints (in the Stages) and the ConnectionEndPoints
794      * (in the Job.Connection objects) to make them easier to access.  In the process
795      * of making these objects, this method typechecks all of the connections.
796      */
797     private void buildConnections(final Job job) {
798         for (Connection connection : job.connections) {
799             ConnectionDescription description = new ConnectionDescription(connection);
800 
801             // verify that the class, order, and hash exist
802             ErrorHandler handler = store.getErrorHandler(connection.location);
803             Verification.requireClass(connection.getClassName(), handler);
804             Verification.requireOrder(connection.getClassName(), connection.getOrder(), handler);
805 
806             if (connection.getHash() != null) {
807                 Verification.requireOrder(connection.getClassName(),
808                                                                         connection.getHash(),
809                                                                         handler);
810             }
811             description.inputs = createEndPoints(description, connection.inputs);
812             description.outputs = createEndPoints(description, connection.outputs);
813             connections.put(connection.getName(), description);
814         }
815     }
816 
817     /***
818      * This method computes the number of copies of each stage to run.  
819      * The execution count of stage depends on its inputs.  If the input
820      * for a stage is hashed 200 ways, for instance, then there will need
821      * to be 200 copies of the stage. 
822      */
823     private void countStages() {
824         HashMap<String, HashSet<EndPointDescription>> stageInputs = new HashMap();
825         HashMap<String, HashSet<EndPointDescription>> stageOutputs = new HashMap();
826 
827         for (String stageName : stages.keySet()) {
828             stageInputs.put(stageName, new HashSet());
829             stageOutputs.put(stageName, new HashSet());
830         }
831 
832         for (ConnectionDescription connection : connections.values()) {
833             for (EndPointDescription endPoint : connection.inputs) {
834                 stageOutputs.get(endPoint.stage.getName()).add(endPoint);
835             }
836             for (EndPointDescription endPoint : connection.outputs) {
837                 stageInputs.get(endPoint.stage.getName()).add(endPoint);
838             }
839         }
840 
841         for (String stageName : stageOrder) {
842             StageGroupDescription stage = stages.get(stageName);
843 
844             // if stage has no inputs, then we store 1 in stageCounts
845             if (stageInputs.get(stageName).size() == 0) {
846                 stage.instanceCount = 1;
847             } else {
848                 // find out what the assignment is for this connection.
849                 int instanceCount = 1;
850                 boolean unknown = true;
851 
852                 HashSet<EndPointDescription> inputs = stageInputs.get(stageName);
853 
854                 for (EndPointDescription description : inputs) {
855                     ConnectionEndPoint point = description.getConnectionPoint();
856                     ConnectionAssignmentType assignment = point.getAssignment();
857 
858                     switch (assignment) {
859                         case One:
860                             store.addError(point.location,
861                                            "The 'one' mode is not currently supported.");
862                             break;
863 
864                         case Each:
865                             int inputCount = description.connection.getOutputCount();
866 
867                             if (unknown) {
868                                 instanceCount = inputCount;
869                                 unknown = false;
870                             } else if (!unknown && instanceCount != inputCount) {
871                                 store.addError(point.location, "The number of stage instances for '" +
872                                                stageName + "' is ambiguous (" + inputCount +
873                                                " or " + instanceCount + ")");
874                             }
875                             break;
876 
877                         case Combined:
878                             // this doesn't affect the number of stages.
879                             break;
880                     }
881                 }
882 
883                 if (unknown) {
884                     instanceCount = 1;
885                 }
886                 stage.instanceCount = instanceCount;
887             }
888         }
889     }
890 
891     /***
892      * Creates the directory structures to hold the files for this job.
893      */
894     private void createPipeObjects() {
895         // Now, we need to create the connections
896         for (ConnectionDescription connection : connections.values()) {
897             // Make the parent directory
898             String directoryName = temporaryStorage + File.separator + connection.getName();
899             new File(directoryName).mkdir();
900 
901             DataPipe pipe = new DataPipe(directoryName,
902                                          connection.getName(),
903                                          connection.getClassName(),
904                                          connection.getOrder(),
905                                          connection.getHash(),
906                                          connection.getInputCount(),
907                                          connection.getOutputCount());
908 
909             int startIndex = 0;
910             connection.setPipe(pipe);
911 
912             for (EndPointDescription input : connection.inputs) {
913                 StageGroupDescription description = stages.get(input.getStageName());
914                 description.outputs.put(input.getStagePoint().getInternalName(),
915                                         new DataPipeRegion(pipe,
916                                                            startIndex,
917                                                            startIndex + description.getInstanceCount(),
918                                                            ConnectionPointType.Input));
919                 startIndex += description.getInstanceCount();
920             }
921 
922             for (EndPointDescription output : connection.outputs) {
923                 StageGroupDescription description = stages.get(output.getStageName());
924                 description.inputs.put(output.getStagePoint().getInternalName(),
925                                        new DataPipeRegion(pipe,
926                                                           0,
927                                                           connection.getOutputCount(),
928                                                           ConnectionPointType.Output));
929             }
930 
931             pipes.add(pipe);
932         }
933     }
934 
935     public static class JobExecutionStatus {
936         // these are the names of all stages that have completed
937         HashMap<String, StageExecutionStatus> completedStages = new HashMap<String, StageExecutionStatus>();
938         // named of all stages that have been launched (contains all completed stages too)
939         HashSet<String> launchedStages = new HashSet<String>();
940         // all stages that are currently running.  Includes a Future object that can be used
941         // to wait for the stage to complete (and to get exceptions thrown by the stage)
942         HashMap<String, StageExecutionStatus> runningStages = new HashMap<String, StageExecutionStatus>();
943         // names of connections that are complete, meaning that all their inputs have been created
944         HashSet<String> completedConnections = new HashSet<String>();
945         // map from connection names to the names of stages that provide inputs to the connection
946         HashMap<String, HashSet<String>> connectionDependencies = new HashMap<String, HashSet<String>>();
947 
948         // reference to the parent class.
949         HashMap<String, StageGroupDescription> stages;
950         // reference to the parent class.
951         String temporaryStorage;
952         StageExecutor executor;
953         Date startDate;
954 
955         public JobExecutionStatus(HashMap<String, StageGroupDescription> stages,
956                 String temporaryStorage, StageExecutor executor, String masterURL) {
957             this.stages = stages;
958             this.temporaryStorage = temporaryStorage;
959             this.executor = executor;
960             this.startDate = new Date();
961 
962             for (StageGroupDescription description : stages.values()) {
963                 // build a list of dependencies from pipe inputs to stage names
964                 for (DataPipeRegion region : description.outputs.values()) {
965                     String pipeName = region.pipe.pipeName;
966 
967                     if (!connectionDependencies.containsKey(pipeName)) {
968                         connectionDependencies.put(
969                                 pipeName, new HashSet<String>());
970                     }
971                     connectionDependencies.get(pipeName).add(description.getName());
972                 }
973 
974                 description.setMasterURL(masterURL);
975             }
976         }
977 
978         class BlockedExecutionStatus implements StageExecutionStatus {
979             String name;
980             int instances;
981             
982             BlockedExecutionStatus(String name, int instances) {
983                 this.name = name;
984                 this.instances = instances;
985             }
986 
987             public String getName() { return name; }
988             public int getBlockedInstances() { return instances; }
989             public int getQueuedInstances() { return 0; }
990             public int getRunningInstances() { return 0; }
991             public int getCompletedInstances() { return 0; }
992             public boolean isDone() { return false; }
993             public List<Exception> getExceptions() { return Collections.EMPTY_LIST; }
994         }
995 
996         public synchronized boolean isComplete() {
997             return stages.size() == completedStages.size();
998         }
999 
1000         public synchronized Map<String, StageExecutionStatus> getStageStatus() {
1001             Map<String, StageExecutionStatus> result = new TreeMap();
1002 
1003             for (String stageName : stages.keySet()) {
1004                 int instanceCount = stages.get(stageName).getInstanceCount();
1005                 if (completedStages.containsKey(stageName)) {
1006                     result.put(stageName, completedStages.get(stageName));
1007                 } else if (runningStages.containsKey(stageName)) {
1008                     result.put(stageName, runningStages.get(stageName));
1009                 } else {
1010                     result.put(stageName, new BlockedExecutionStatus(stageName, instanceCount));
1011                 }
1012             }
1013 
1014             return result;
1015         }
1016 
1017         /***
1018          * Returns the start date for this job.
1019          */
1020 
1021         public Date getStartDate() {
1022             return startDate;
1023         }
1024 
1025         /***
1026          * Returns the total amount of free memory in this JVM.
1027          */
1028 
1029         public long getFreeMemory() {
1030             return Runtime.getRuntime().freeMemory();
1031         }
1032 
1033         /***
1034          * Returns the maximum amount of memory that can be used by this
1035          * Java virtual machine.
1036          */
1037 
1038         public long getMaxMemory() {
1039             return Runtime.getRuntime().maxMemory();
1040         }
1041 
1042         public void run() throws InterruptedException, ExecutionException {
1043             // while there are incomplete stages, choose one to execute
1044             while (launchedStages.size() < stages.size()) {
1045                 // look for stages where all of their inputs are complete
1046                 StageGroupDescription description = findRunnableStage(stages.values(), launchedStages,
1047                                                                       completedConnections);
1048 
1049                 // didn't find any runnable stages, so we need to check to
1050                 // see if any other stages have finished recently that might have
1051                 // generated interesting input for pending stages.
1052                 while (description == null) {
1053                     // wait for at least one stage to complete
1054                     waitForStages(runningStages, completedStages);
1055                     updateCompletedConnections(completedStages, completedConnections,
1056                                                connectionDependencies);
1057 
1058                     // now, try again to find a runnable stage
1059                     description = findRunnableStage(stages.values(), launchedStages,
1060                                                     completedConnections);
1061                 }
1062 
1063                 StageExecutionStatus result = executor.execute(description, temporaryStorage);
1064 
1065                 synchronized(this) {
1066                     launchedStages.add(description.stage.name);
1067                     runningStages.put(description.stage.name, result);
1068                 }
1069             }
1070 
1071             // wait for everything to complete
1072             while (runningStages.size() > 0) {
1073                 waitForStages(runningStages, completedStages);
1074             }
1075         }
1076 
1077         /***
1078          * Finds a stage that is ready to run by checking stage dependencies.
1079          *
1080          * @param descriptions
1081          * @param launchedJobs
1082          * @param completedConnections
1083          */
1084         private synchronized StageGroupDescription findRunnableStage(
1085                 Collection<StageGroupDescription> descriptions,
1086                 HashSet<String> launchedJobs,
1087                 HashSet<String> completedConnections) {
1088             // for each job we might want to launch
1089             for (StageGroupDescription description : descriptions) {
1090                 // if it has already been launched, we don't need to do it again
1091                 if (launchedJobs.contains(description.getName())) {
1092                     continue;            // are the inputs to this stage ready?
1093                 }
1094                 boolean allComplete = true;
1095 
1096                 for (DataPipeRegion region : description.inputs.values()) {
1097                     // if this input is incomplete, we can't run this stage yet
1098                     if (!completedConnections.contains(region.pipe.pipeName)) {
1099                         allComplete = false;
1100                         break;
1101                     }
1102                 }
1103 
1104                 if (allComplete) {
1105                     return description;
1106                 }
1107             }
1108 
1109             // there are no stages ready to run
1110             return null;
1111         }
1112         /***
1113          * Polls all the running stages to see if they've completed.  When one completes,
1114          * it is added to completedStages and the method returns.
1115          * 
1116          * @param runningStages
1117          * @param completedStages
1118          * @throws java.lang.InterruptedException
1119          * @throws java.util.concurrent.ExecutionException
1120          */
1121         private void waitForStages(
1122                 HashMap<String, StageExecutionStatus> runningStages,
1123                 final HashMap<String, StageExecutionStatus> completedStages)
1124                 throws InterruptedException, ExecutionException {
1125             long delay = 1;
1126 
1127             while (runningStages.size() > 0) {
1128                 synchronized(this) {
1129                     for (String name : runningStages.keySet()) {
1130                         StageExecutionStatus status = runningStages.get(name);
1131                         if (status.isDone()) {
1132                             // force the exception to throw
1133                             List<Exception> exceptions = status.getExceptions();
1134                             if (exceptions.size() > 0) {
1135                                 throw new ExecutionException("Stage threw an exception: ", exceptions.get(0));
1136                             }
1137                             completedStages.put(name, status);
1138                             runningStages.remove(name);
1139                             return;
1140                         }
1141                     }
1142                 }
1143 
1144                 // check at least once a second, but poll faster at first
1145                 delay = Math.min(delay * 2, 1000);
1146                 Thread.sleep(delay);
1147             }
1148         }
1149 
1150         /***
1151          * Reads through all completed stages, trying to find any connections
1152          * that have been satisfied.  By satisfied, we mean that all the input
1153          * has been generated for a particular connection.  Once a connection
1154          * is satisfied, stages that require that read input from that connection
1155          * can start running.
1156          *
1157          * @param completedStages
1158          * @param completedConnections
1159          * @param connectionDependencies
1160          */
1161         private synchronized void updateCompletedConnections(
1162                 final HashMap<String, StageExecutionStatus> completedStages,
1163                 final HashSet<String> completedConnections,
1164                 final HashMap<String, HashSet<String>> connectionDependencies) {
1165             // Loop over all connections:
1166             for (String pipeName : connectionDependencies.keySet()) {
1167                 // These are all the stages that need to be completed before
1168                 // the connection pipeName is satisfied.
1169                 HashSet<String> pipeInputStages = connectionDependencies.get(pipeName);
1170 
1171                 if (completedStages.keySet().containsAll(pipeInputStages)) {
1172                     completedConnections.add(pipeName);
1173                 }
1174             }
1175         }
1176     }
1177 
1178     public void run(StageExecutor executor) throws InterruptedException, ExecutionException, UnknownHostException, IOException {
1179         Server server = new Server(Utility.getFreePort());
1180         runWithServer(executor, server);
1181     }
1182 
1183     public void runWithServer(StageExecutor executor, Server server) throws ExecutionException, InterruptedException, UnknownHostException {
1184         // FIXME: all of this needs to be refactored.
1185         InetAddress address = java.net.InetAddress.getLocalHost();
1186         int port = server.getConnectors()[0].getPort();
1187         String masterURL = String.format("http://%s:%d", address.getHostAddress(), port);
1188         JobExecutionStatus status = new JobExecutionStatus(stages, temporaryStorage, executor, masterURL);
1189         MasterWebHandler handler = new MasterWebHandler(status);
1190         server.addHandler(handler);
1191         status.run();
1192         handler.waitForFinalPage();
1193         server.removeHandler(handler);
1194     }
1195 
1196     public static boolean runLocally(Job job, ErrorStore store) throws IOException,
1197             InterruptedException, ExecutionException, Exception {
1198         StageExecutor executor = StageExecutorFactory.newInstance("local", new String[] {});
1199         File tempFile = Utility.createTemporary();
1200         tempFile.delete();
1201         tempFile.mkdir();
1202         
1203         JobExecutor jobExecutor = new JobExecutor(job, tempFile.getAbsolutePath(), store);
1204         jobExecutor.prepare();
1205 
1206         if (store.hasStatements()) {
1207             return false;
1208         }
1209 
1210         int port = Utility.getFreePort();
1211         Server server = new Server(port);
1212         server.start();
1213         System.out.println("Status: http://localhost:" + port);
1214         try {
1215             jobExecutor.runWithServer(executor, server);
1216         } finally {
1217             server.stop();
1218             executor.shutdown();
1219         }
1220         
1221         return !store.hasStatements();
1222     }
1223 
1224     public static void main(String[] args) throws ParserConfigurationException, SAXException,
1225             IOException, InterruptedException, ExecutionException {
1226         if (args.length < 3) {
1227             System.out.println("usage: executionModel parameterFile temporaryStorage");
1228             System.out.println(
1229                     "   where executionModel is one of: local, drmaa, ssh, debug, remotedebug");
1230             System.exit(-1);
1231         }
1232 
1233         String executionModel = args[0];
1234         String parameterFile = args[1];
1235         String temporaryStorage = args[2];
1236         String[] remaining = Utility.subarray(args, 3);
1237         StageExecutor executor = null;
1238 
1239         Logger logger = Logger.getLogger(JobExecutor.class.toString());
1240         logger.setLevel(Level.INFO);
1241 
1242         executor = StageExecutorFactory.newInstance(executionModel, remaining);
1243         ErrorStore store = new ErrorStore();
1244 
1245         // First, parse the job
1246         Job job = parseFile(parameterFile, store);
1247 
1248         if (store.hasStatements()) {
1249             System.out.println(store.toString());
1250             System.exit(-1);
1251         }
1252 
1253         JobExecutor jobExecutor = new JobExecutor(job, temporaryStorage, store);
1254         jobExecutor.prepare();
1255 
1256         if (store.hasStatements()) {
1257             System.out.println(store.toString());
1258             System.exit(-1);
1259         }
1260 
1261         jobExecutor.run(executor);
1262         logger.info("Job complete");
1263         executor.shutdown();
1264     }
1265 }