1
2
3 package org.galagosearch.tupleflow.execution;
4
5 import java.net.UnknownHostException;
6 import org.mortbay.jetty.Server;
7 import org.galagosearch.tupleflow.Utility;
8 import org.galagosearch.tupleflow.Processor;
9 import org.galagosearch.tupleflow.TypeReader;
10 import org.galagosearch.tupleflow.execution.Job.StagePoint;
11 import org.galagosearch.tupleflow.execution.StageGroupDescription.DataPipeRegion;
12 import org.galagosearch.tupleflow.execution.StageInstanceDescription.PipeInput;
13 import org.galagosearch.tupleflow.execution.StageInstanceDescription.PipeOutput;
14 import java.io.File;
15 import java.io.IOException;
16 import java.io.StringReader;
17 import java.net.InetAddress;
18 import java.util.ArrayList;
19 import java.util.Arrays;
20 import java.util.Collection;
21 import java.util.Collections;
22 import java.util.Date;
23 import java.util.HashMap;
24 import java.util.HashSet;
25 import java.util.Iterator;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.TreeMap;
29 import java.util.TreeSet;
30 import java.util.concurrent.ExecutionException;
31 import java.util.concurrent.Future;
32 import javax.xml.parsers.ParserConfigurationException;
33 import javax.xml.parsers.SAXParser;
34 import javax.xml.parsers.SAXParserFactory;
35 import java.util.logging.Level;
36 import java.util.logging.Logger;
37 import org.mortbay.jetty.bio.SocketConnector;
38 import org.xml.sax.InputSource;
39 import org.xml.sax.SAXException;
40 import org.xml.sax.SAXParseException;
41
42 /***
43 * <p>This class is responsible for executing TupleFlow jobs.</p>
44 *
45 * <p>A job is specified using the TupleFlow Job class. The Job class has an XML form
46 * which can be parsed by JobConstructor, but you can create one programmatically as well.</p>
47 *
48 * <p>Before the job is executed, it is verified. JobExecutor verifies that all the
49 * classes referenced by the Job object actually exist, and that the connections point
50 * to sensible places. Once it has been verified, the JobExecutor builds an execution plan
51 * that will execute the job with as much parallelism as possible while not violating any
52 * ordering constraints dictated by stage connections. After the plan is generated,
53 * the job is sent to a StageExecutor for the low-level details of execution.</p>
54 *
55 * <p>TupleFlow has many different kinds of StageExecutors you can use. To get started
56 * and to debug your code, use the LocalStageExecutor or ThreadedExecutor. To harness
57 * more parallelism, use the SSHStageExecutor or the DRMAAStageExecutor.</p>
58 *
59 * @author trevor
60 */
61 public class JobExecutor {
// Receives every error reported while parsing, verifying and planning the job.
62 ErrorStore store;
// The job this executor verifies and plans.
63 Job job;
// Base directory under which one directory per connection is created.
64 String temporaryStorage;
// Fallback fan-out for a hashed connection that has no positive hash count of its own
// when the job has no integer "hashCount" property.
65 int defaultHashCount = 10;
// A stage whose inputs total more files than this triggers the insertion of merge stages.
66 long maximumFileInputs = 200;
// Connection name -> description built by buildConnections().
67 HashMap<String, ConnectionDescription> connections = new HashMap<String, ConnectionDescription>();
// Stage name -> description built by constructAndVerify().
68 HashMap<String, StageGroupDescription> stages = new HashMap<String, StageGroupDescription>();
// Stage names in dependency order, as computed by determineStageOrder().
69 ArrayList<String> stageOrder = new ArrayList<String>();
// Stage name -> names of the stages that consume its output.
70 HashMap<String, HashSet<String>> stageChildren = new HashMap<String, HashSet<String>>();
// Stage name -> names of the stages whose output it consumes.
71 HashMap<String, HashSet<String>> stageParents = new HashMap<String, HashSet<String>>();
// One DataPipe per connection, filled in by createPipeObjects().
72 ArrayList<DataPipe> pipes = new ArrayList<DataPipe>();
73
/**
 * Creates an executor for the given job. Nothing is verified or planned
 * here; the arguments are only stored.
 *
 * @param job The job to execute.
 * @param temporaryStorage Directory path used as the root for per-connection directories.
 * @param store Destination for all error reports.
 */
public JobExecutor(Job job, String temporaryStorage, ErrorStore store) {
    this.job = job;
    this.temporaryStorage = temporaryStorage;
    this.store = store;
}
79
80 /***
81 * Parses the XML text in the file specified by the filename parameter into a Job.
82 *
83 * @param filename A path to the XML file to parse.
84 * @param store An ErrorStore object where all error information should be sent.
85 * @return A job instance based on the XML description in the file named by the filename parameter.
86 * @throws org.xml.sax.SAXException
87 * @throws javax.xml.parsers.ParserConfigurationException
88 * @throws java.io.IOException
89 */
90 public static Job parseFile(String filename, ErrorStore store) throws SAXException, ParserConfigurationException, IOException {
91
92 JobConstructor jobParseSaxHandler = new JobConstructor(filename, store);
93 SAXParser parser = SAXParserFactory.newInstance().newSAXParser();
94
95 try {
96 parser.parse(new File(filename), jobParseSaxHandler);
97 } catch (SAXParseException e) {
98 store.addError(filename, e);
99 }
100
101 return jobParseSaxHandler.getJob();
102 }
103
104 /***
105 * Parses the XML text in the text parameter into a Job.
106 *
107 * @param text XML text to parse into a Job.
108 * @param store An ErrorStore object where all error information should be sent.
109 * @return A job instance based on the XML description in the text parameter.
110 * @throws org.xml.sax.SAXException
111 * @throws javax.xml.parsers.ParserConfigurationException
112 * @throws java.io.IOException
113 */
114 public static Job parseText(String text, ErrorStore store) throws SAXException, ParserConfigurationException, IOException {
115
116 JobConstructor jobParseSaxHandler = new JobConstructor(store);
117 SAXParser parser = SAXParserFactory.newInstance().newSAXParser();
118
119 try {
120 StringReader reader = new StringReader(text);
121 parser.parse(new InputSource(reader), jobParseSaxHandler);
122 } catch (SAXParseException e) {
123 store.addError("none", e);
124 }
125
126 return jobParseSaxHandler.getJob();
127 }
128
129 /***
130 * This method tries to combine stages together to reduce overhead.
131 *
132 * In particular, this method looks for two stages, A and B, where each
133 * copy of B takes input from only one copy of stage A. In this case,
134 * all the steps from B are moved into stage A, saving a lot of file overhead
135 * in transferring tuples from A to B.
136 *
137 * This method is particularly useful for jobs that are created
138 * automatically.
139 *
140 * @param job The job instance to optimize.
141 * @return A new job instance, perhaps with fewer stages.
142 */
143 public static Job optimize(Job job) {
144
145 HashMap<String, String> outputs = new HashMap<String, String>();
146 HashMap<String, String> inputs = new HashMap<String, String>();
147
148 for (Stage stage : job.stages.values()) {
149
150 if (stage.steps.size() > 0) {
151 Step lastStep = stage.steps.get(stage.steps.size() - 1);
152 if (lastStep instanceof OutputStep) {
153 OutputStep output = (OutputStep) lastStep;
154 outputs.put(stage.name, output.getId());
155 }
156
157 Step firstStep = stage.steps.get(0);
158 if (firstStep instanceof InputStep) {
159 InputStep input = (InputStep) firstStep;
160 inputs.put(stage.name, input.getId());
161 }
162 }
163 }
164
165
166 HashMap<String, Stage> stages = new HashMap<String, Stage>(job.stages);
167
168
169
170
171 Iterator<Connection> iterator = job.connections.iterator();
172 Connection connection;
173
174 innerLoop:
175 while (iterator.hasNext()) {
176 connection = iterator.next();
177
178
179 if (connection.inputs.size() != 1) {
180 continue;
181 }
182 if (connection.outputs.size() < 1) {
183 continue;
184 }
185 if (connection.getHash() != null) {
186 continue;
187 }
188 ConnectionEndPoint connectionInput = connection.inputs.get(0);
189 String stageOutputPointId = outputs.get(connectionInput.getStageName());
190
191
192 if (!connectionInput.getPointName().equals(stageOutputPointId)) {
193 continue;
194 }
195 for (ConnectionEndPoint connectionOutput : connection.outputs) {
196 String stageInputPointId = inputs.get(connectionOutput.getStageName());
197
198 if (!connectionOutput.getPointName().equals(stageInputPointId)) {
199 continue innerLoop;
200 }
201 if (connectionOutput.getAssignment() == ConnectionAssignmentType.Combined) {
202 continue innerLoop;
203 }
204 }
205
206
207 Stage source = stages.get(connectionInput.getStageName());
208
209 MultiStep multi = new MultiStep();
210 multi.groups = new ArrayList<ArrayList<Step>>();
211
212 for (ConnectionEndPoint connectionOutput : connection.outputs) {
213 Stage destination = stages.get(connectionOutput.getStageName());
214
215 int length = destination.steps.size();
216 ArrayList<Step> steps = new ArrayList<Step>(destination.steps.subList(1, length));
217 multi.groups.add(steps);
218
219 renameConnections(job, source, destination);
220
221
222 source.connections.remove(connectionInput.getPointName());
223 destination.connections.remove(connectionOutput.getPointName());
224 source.connections.putAll(destination.connections);
225
226
227 job.stages.remove(destination);
228 }
229
230 source.steps.remove(source.steps.size() - 1);
231
232
233 if (multi.groups.size() == 1) {
234 source.steps.addAll(multi.groups.get(0));
235 } else {
236 source.steps.add(multi);
237 }
238
239
240 iterator.remove();
241
242
243 return optimize(job);
244 }
245
246 return job;
247 }
248
249 public static void renameConnections(Job job, Stage source, Stage destination) {
250
251 for (Connection connection : job.connections) {
252 for (ConnectionEndPoint input : connection.inputs) {
253 if (input.getStageName().equals(destination.name)) {
254 input.setStageName(source.name);
255 }
256 }
257
258 for (ConnectionEndPoint output : connection.outputs) {
259 if (output.getStageName().equals(destination.name)) {
260 output.setStageName(source.name);
261 }
262 }
263 }
264 }
265
266 public void prepare() {
267 boolean successful = constructAndVerify();
268
269 if (!successful) {
270 return;
271 }
272
273
274
275
276 if (needsMergeStages()) {
277 addMergeStages(job);
278 constructAndVerify();
279 }
280
281 for (DataPipe pipe : pipes) {
282 pipe.makeDirectories();
283 }
284 }
285
286 public void clear() {
287 connections.clear();
288 stageChildren.clear();
289 stageOrder.clear();
290 stageParents.clear();
291 stages.clear();
292 pipes.clear();
293 }
294
295 /***
296 * Checks to see if any stage has too many inputs.
297 *
298 * @return true, if there is a stage with more than maximumFileInputs.
299 */
300 public boolean needsMergeStages() {
301 for (StageGroupDescription stage : stages.values()) {
302 long totalInputs = 0;
303
304 if (stage.getName().endsWith("mergeStage")) {
305 continue;
306 }
307 for (DataPipeRegion region : stage.inputs.values()) {
308 totalInputs += region.fileCount();
309 }
310
311 if (totalInputs > this.maximumFileInputs) {
312 return true;
313 }
314 }
315
316 return false;
317 }
318
319 /***
320 * Add a merge stage to this job that merges the output of
321 * stageName called pointName.
322 *
323 * @param job The job that should get the new merge stage.
324 * @param stageName The stage that contains the output that needs merging.
325 * @param pointName The output point that needs merging in the stage stageName.
326 */
327 public void addMergeStage(Job job, String stageName, String pointName) {
328
329 Stage inputStage = job.stages.get(stageName);
330 StageConnectionPoint inputPoint = inputStage.getConnection(pointName);
331
332 String className = inputPoint.getClassName();
333 String[] typeOrder = inputPoint.getOrder();
334 String mergedStageName = stageName + "-" + pointName + "-mergeStage";
335 String mergedPointName = pointName + "-merged";
336
337
338 if (job.stages.containsKey(mergedStageName)) {
339 return;
340 }
341 Stage s = new Stage(mergedStageName);
342 s.add(new StageConnectionPoint(ConnectionPointType.Input,
343 pointName,
344 className,
345 typeOrder,
346 null));
347 s.add(new StageConnectionPoint(ConnectionPointType.Output,
348 pointName + "-merged",
349 className,
350 typeOrder,
351 null));
352
353 s.add(new InputStep(pointName));
354 s.add(new OutputStep(mergedPointName));
355 job.add(s);
356
357 String[] hash = null;
358 int hashCount = -1;
359
360
361 for (Connection connection : job.connections) {
362 for (ConnectionEndPoint input : connection.inputs) {
363 if (input.getStageName().equals(stageName) &&
364 input.getPointName().equals(pointName)) {
365 if (hash != null && connection.hash != null &&
366 !Arrays.equals(hash,connection.hash)) {
367 continue;
368 }
369 if (connection.hash != null) {
370 hash = connection.hash;
371 connection.hash = null;
372 }
373
374 input.setStageName(mergedStageName);
375 input.setPointName(mergedPointName);
376 }
377 }
378 }
379
380
381 job.connect(new StagePoint(stageName, pointName),
382 new StagePoint(mergedStageName, pointName),
383 ConnectionAssignmentType.Each,
384 hash,
385 hashCount);
386 }
387
388 /***
389 * Find stages that need to open a lot of files for reading when running,
390 * and add some intermediate merge stages to reduce problems with open files.
391 *
392 * @param job
393 */
394 public void addMergeStages(Job job) {
395
396 for (StageGroupDescription stage : stages.values()) {
397 long totalInputs = 0;
398
399
400 for (DataPipeRegion region : stage.inputs.values()) {
401 totalInputs += region.fileCount();
402 }
403
404
405
406
407 if (totalInputs > this.maximumFileInputs) {
408 Stage s = stage.getStage();
409 ArrayList<Connection> relevantConnections = new ArrayList<Connection>();
410
411
412 for (Connection connection : job.connections) {
413 for (ConnectionEndPoint point : connection.outputs) {
414 if (point.getStageName().equals(s.name)) {
415 relevantConnections.add(connection);
416 }
417 }
418 }
419
420
421
422
423 for (Connection connection : relevantConnections) {
424 if (connection.inputs.size() != 1) {
425 continue;
426 }
427 ConnectionEndPoint endPoint = connection.inputs.get(0);
428 String inputStageName = endPoint.getStageName();
429 String inputPointName = endPoint.getPointName();
430 StageGroupDescription inputStageDesc = stages.get(inputStageName);
431
432
433 if (inputStageDesc == null ||
434 inputStageName.endsWith("mergeStage")) {
435 continue;
436 }
437 if (inputStageDesc.instanceCount > 1) {
438 addMergeStage(job, inputStageName, inputPointName);
439 }
440 }
441 }
442 }
443 }
444
445 private static class EndPointName implements Comparable<EndPointName> {
446 public String stageName;
447 public String pointName;
448 public ConnectionPointType type;
449 public FileLocation location;
450
451 public EndPointName(String stageName, String pointName, ConnectionPointType type, FileLocation location) {
452 this.stageName = stageName;
453 this.pointName = pointName;
454 this.type = type;
455 this.location = location;
456 }
457
458 public EndPointName(String stageName, String pointName, ConnectionPointType type) {
459 this(stageName, pointName, type, null);
460 }
461
462 public int compareTo(EndPointName other) {
463 int result = stageName.compareTo(other.stageName);
464 if (result == 0) {
465 result = pointName.compareTo(other.pointName);
466 }
467 if (result == 0) {
468 result = type.compareTo(other.type);
469 }
470 return result;
471 }
472
473 @Override
474 public int hashCode() {
475 return stageName.hashCode() + 7 * pointName.hashCode() + 15 * type.toString().hashCode();
476 }
477
478 @Override
479 public String toString() {
480 return String.format("%s %s %s", stageName, pointName, type.toString());
481 }
482 }
483
484 /***
485 * In the parameter file, each stage has a connections section that describes a set of
486 * connection endpoints for the stage (inputs and outputs). This method verifies that
487 * all of those endpoints are connected to valid connections, defined under the
488 * connections tag in the job. If the method finds a dangling (unconnected) endpoint,
489 * an error message is added to the ErrorStore.
490 */
491 public void findDanglingEndpoints(final Job job) {
492 TreeSet<EndPointName> endPointNames = new TreeSet();
493
494
495 for (Stage stage : job.stages.values()) {
496
497 StageGroupDescription description = stages.get(stage.name);
498
499
500 for (StageConnectionPoint point : stage.connections.values()) {
501 EndPointName ep = new EndPointName(stage.name, point.getExternalName(), point.
502 getType(), point.location);
503 endPointNames.add(ep);
504 }
505 }
506
507
508
509 for (ConnectionDescription connection : connections.values()) {
510 for (EndPointDescription input : connection.inputs) {
511 EndPointName ep = new EndPointName(input.stage.getName(),
512 input.stagePoint.getExternalName(),
513 input.stagePoint.getType());
514 endPointNames.remove(ep);
515 }
516 for (EndPointDescription output : connection.outputs) {
517 EndPointName ep = new EndPointName(output.stage.getName(),
518 output.stagePoint.getExternalName(),
519 output.stagePoint.getType());
520 endPointNames.remove(ep);
521 }
522 }
523
524
525 for (EndPointName ep : endPointNames) {
526 store.addError(ep.location,
527 ep.stageName + ": No connection references the " + ep.type +
528 " with the name '" + ep.pointName + "'.");
529 }
530 }
531
532 private boolean constructAndVerify() {
533 clear();
534
535
536 for (Stage stage : job.stages.values()) {
537 stages.put(stage.name, new StageGroupDescription(stage));
538 }
539
540
541
542
543 Verification.verify(job, store);
544 if (store.getErrors().size() > 0) {
545 return false;
546 }
547
548
549
550 buildConnections(job);
551 if (store.getErrors().size() > 0) {
552 return false;
553 }
554
555 findDanglingEndpoints(job);
556 if (store.getErrors().size() > 0) {
557 return false;
558 }
559
560 generateDependencies();
561 if (store.getErrors().size() > 0) {
562 return false;
563 }
564
565 determineStageOrder();
566 if (store.getErrors().size() > 0) {
567 return false;
568 }
569
570 countStages();
571 if (store.getErrors().size() > 0) {
572 return false;
573 }
574
575 createPipeObjects();
576 return true;
577 }
578
579 private void generateDependencies() {
580
581 for (StageGroupDescription stage : stages.values()) {
582 stageChildren.put(stage.getName(), new HashSet());
583 stageParents.put(stage.getName(), new HashSet());
584 }
585
586 for (ConnectionDescription connection : connections.values()) {
587 for (EndPointDescription input : connection.inputs) {
588 for (EndPointDescription output : connection.outputs) {
589 stageChildren.get(input.getStageName()).add(output.getStageName());
590 stageParents.get(output.getStageName()).add(input.getStageName());
591 }
592 }
593 }
594 }
595
596 private void determineStageOrder() {
597 ArrayList<String> result = new ArrayList();
598 HashSet<String> used = new HashSet();
599 HashSet<String> batch = new HashSet();
600
601 for (String stageName : stageParents.keySet()) {
602 if (stageParents.get(stageName).size() == 0) {
603 batch.add(stageName);
604 }
605 }
606
607 while (batch.size() > 0) {
608 HashSet<String> nextBatch = new HashSet();
609
610 for (String stageName : batch) {
611 result.add(stageName);
612 used.add(stageName);
613
614 HashSet<String> children = stageChildren.get(stageName);
615
616
617 for (String child : children) {
618 HashSet<String> childParents = stageParents.get(child);
619
620 if (!used.contains(child) && used.containsAll(childParents)) {
621 nextBatch.add(child);
622 }
623 }
624 }
625
626 batch = nextBatch;
627 nextBatch = new HashSet();
628 }
629
630 assert result.size() == stages.size();
631 stageOrder = result;
632 }
633
634 private static class EndPointDescription {
635 public EndPointDescription(ConnectionDescription connection,
636 StageGroupDescription stage,
637 ConnectionEndPoint connectionPoint,
638 StageConnectionPoint stagePoint) {
639 this.connectionPoint = connectionPoint;
640 this.stagePoint = stagePoint;
641 this.stage = stage;
642 this.connection = connection;
643 }
644
645 public String getStageName() {
646 return stage.getName();
647 }
648
649 public StageConnectionPoint getStagePoint() {
650 return stagePoint;
651 }
652
653 public ConnectionEndPoint getConnectionPoint() {
654 return connectionPoint;
655 }
656
657 public ConnectionDescription getConnection() {
658 return connection;
659 }
660 public StageGroupDescription stage;
661 public StageConnectionPoint stagePoint;
662 public ConnectionEndPoint connectionPoint;
663 public ConnectionDescription connection;
664 }
665
666 private class ConnectionDescription {
667 public ConnectionDescription(Connection connection) {
668 this.connection = connection;
669 this.inputs = new ArrayList();
670 this.outputs = new ArrayList();
671 }
672
673 public boolean isHashed() {
674 return connection.hash != null;
675 }
676
677 public int getOutputCount() {
678 int result = 1;
679
680 if (isHashed()) {
681 String hashCount = job.properties.get("hashCount");
682
683 if (connection.getHashCount() > 0) {
684 result = connection.getHashCount();
685 } else if (hashCount != null &&
686 Utility.isInteger(hashCount)) {
687 result = Integer.parseInt(hashCount);
688 } else {
689 result = defaultHashCount;
690 }
691 } else {
692 result = getInputCount();
693 }
694
695 return result;
696 }
697
698 public int getInputCount() {
699 int total = 0;
700
701 for (EndPointDescription input : inputs) {
702 total += input.stage.instanceCount;
703 }
704
705 return total;
706 }
707
708 public String getName() {
709 return connection.getName();
710 }
711
712 public String[] getOrder() {
713 return connection.getOrder();
714 }
715
716 public String[] getHash() {
717 return connection.getHash();
718 }
719
720 private String getClassName() {
721 return connection.getClassName();
722 }
723
724 public DataPipe getPipe() {
725 return pipe;
726 }
727
728 public void setPipe(DataPipe pipe) {
729 this.pipe = pipe;
730 }
731
732 @Override
733 public String toString() {
734 return String.format("%s %s", getClassName(), getName());
735 }
736 public Connection connection;
737 public ArrayList<EndPointDescription> inputs;
738 public ArrayList<EndPointDescription> outputs;
739 public DataPipe pipe;
740 }
741
742 private EndPointDescription createEndPoint(ConnectionDescription connection,
743 ConnectionEndPoint endPoint) {
744 StageGroupDescription stageDescription = stages.get(endPoint.getStageName());
745
746 if (stageDescription == null) {
747 store.addError(endPoint.location,
748 "The stage '" + endPoint.getStageName() + "' was not found.");
749 } else {
750 Stage stage = stageDescription.getStage();
751 StageConnectionPoint point = stage.getConnection(endPoint.getPointName());
752
753 if (point == null) {
754 store.addError(endPoint.location, "The endpoint '" + endPoint.getPointName() + "' wasn't found in this stage, " +
755 "even though there is a connection to it.");
756 } else if (!ConnectionPointType.connectable(endPoint.getType(), point.getType())) {
757 store.addError(endPoint.location,
758 "The endpoint '" + endPoint.getPointName() + "' is in this stage, but it's going the wrong direction.");
759 } else if (!point.getClassName().equals(connection.connection.getClassName())) {
760 store.addError(endPoint.location, "This " + point.getType() + " has a different class name '" + point.getClassName() +
761 " than the connection that connects to it: " + connection.connection.
762 getClassName() + ".");
763 } else if (!Arrays.equals(point.getOrder(), connection.connection.getOrder())) {
764 store.addError(endPoint.location, "This " + point.getType() + " has a different order " + Arrays.toString(point.getOrder()) +
765 " than the connection that connects to it: " + Arrays.toString(
766 connection.connection.getOrder()));
767 } else {
768 return new EndPointDescription(connection, stageDescription, endPoint, point);
769 }
770 }
771
772 return null;
773 }
774
775 private ArrayList<EndPointDescription> createEndPoints(ConnectionDescription connection,
776 ArrayList<ConnectionEndPoint> endPoints) {
777 ArrayList<EndPointDescription> results = new ArrayList<EndPointDescription>();
778
779 for (ConnectionEndPoint endPoint : endPoints) {
780 EndPointDescription epd = createEndPoint(connection, endPoint);
781
782 if (epd != null) {
783 results.add(epd);
784 }
785 }
786
787 return results;
788 }
789
790 /***
791 * Creates ConnectionDescription objects for each connection listed in
792 * the Job object. The ConnectionDescription objects combine information
793 * from the StageConnectionPoints (in the Stages) and the ConnectionEndPoints
794 * (in the Job.Connection objects) to make them easier to access. In the process
795 * of making these objects, this method typechecks all of the connections.
796 */
797 private void buildConnections(final Job job) {
798 for (Connection connection : job.connections) {
799 ConnectionDescription description = new ConnectionDescription(connection);
800
801
802 ErrorHandler handler = store.getErrorHandler(connection.location);
803 Verification.requireClass(connection.getClassName(), handler);
804 Verification.requireOrder(connection.getClassName(), connection.getOrder(), handler);
805
806 if (connection.getHash() != null) {
807 Verification.requireOrder(connection.getClassName(),
808 connection.getHash(),
809 handler);
810 }
811 description.inputs = createEndPoints(description, connection.inputs);
812 description.outputs = createEndPoints(description, connection.outputs);
813 connections.put(connection.getName(), description);
814 }
815 }
816
817 /***
818 * This method computes the number of copies of each stage to run.
819 * The execution count of a stage depends on its inputs. If the input
820 * for a stage is hashed 200 ways, for instance, then there will need
821 * to be 200 copies of the stage.
822 */
823 private void countStages() {
824 HashMap<String, HashSet<EndPointDescription>> stageInputs = new HashMap();
825 HashMap<String, HashSet<EndPointDescription>> stageOutputs = new HashMap();
826
827 for (String stageName : stages.keySet()) {
828 stageInputs.put(stageName, new HashSet());
829 stageOutputs.put(stageName, new HashSet());
830 }
831
832 for (ConnectionDescription connection : connections.values()) {
833 for (EndPointDescription endPoint : connection.inputs) {
834 stageOutputs.get(endPoint.stage.getName()).add(endPoint);
835 }
836 for (EndPointDescription endPoint : connection.outputs) {
837 stageInputs.get(endPoint.stage.getName()).add(endPoint);
838 }
839 }
840
841 for (String stageName : stageOrder) {
842 StageGroupDescription stage = stages.get(stageName);
843
844
845 if (stageInputs.get(stageName).size() == 0) {
846 stage.instanceCount = 1;
847 } else {
848
849 int instanceCount = 1;
850 boolean unknown = true;
851
852 HashSet<EndPointDescription> inputs = stageInputs.get(stageName);
853
854 for (EndPointDescription description : inputs) {
855 ConnectionEndPoint point = description.getConnectionPoint();
856 ConnectionAssignmentType assignment = point.getAssignment();
857
858 switch (assignment) {
859 case One:
860 store.addError(point.location,
861 "The 'one' mode is not currently supported.");
862 break;
863
864 case Each:
865 int inputCount = description.connection.getOutputCount();
866
867 if (unknown) {
868 instanceCount = inputCount;
869 unknown = false;
870 } else if (!unknown && instanceCount != inputCount) {
871 store.addError(point.location, "The number of stage instances for '" +
872 stageName + "' is ambiguous (" + inputCount +
873 " or " + instanceCount + ")");
874 }
875 break;
876
877 case Combined:
878
879 break;
880 }
881 }
882
883 if (unknown) {
884 instanceCount = 1;
885 }
886 stage.instanceCount = instanceCount;
887 }
888 }
889 }
890
891 /***
892 * Creates the directory structures to hold the files for this job.
893 */
894 private void createPipeObjects() {
895
896 for (ConnectionDescription connection : connections.values()) {
897
898 String directoryName = temporaryStorage + File.separator + connection.getName();
899 new File(directoryName).mkdir();
900
901 DataPipe pipe = new DataPipe(directoryName,
902 connection.getName(),
903 connection.getClassName(),
904 connection.getOrder(),
905 connection.getHash(),
906 connection.getInputCount(),
907 connection.getOutputCount());
908
909 int startIndex = 0;
910 connection.setPipe(pipe);
911
912 for (EndPointDescription input : connection.inputs) {
913 StageGroupDescription description = stages.get(input.getStageName());
914 description.outputs.put(input.getStagePoint().getInternalName(),
915 new DataPipeRegion(pipe,
916 startIndex,
917 startIndex + description.getInstanceCount(),
918 ConnectionPointType.Input));
919 startIndex += description.getInstanceCount();
920 }
921
922 for (EndPointDescription output : connection.outputs) {
923 StageGroupDescription description = stages.get(output.getStageName());
924 description.inputs.put(output.getStagePoint().getInternalName(),
925 new DataPipeRegion(pipe,
926 0,
927 connection.getOutputCount(),
928 ConnectionPointType.Output));
929 }
930
931 pipes.add(pipe);
932 }
933 }
934
935 public static class JobExecutionStatus {
936
// Stage name -> status for stages counted as complete; isComplete() compares its size with stages.
937 HashMap<String, StageExecutionStatus> completedStages = new HashMap<String, StageExecutionStatus>();
938
// Names of the stages already handed to the executor by run().
939 HashSet<String> launchedStages = new HashSet<String>();
940
941
// Stage name -> status object returned by the executor when the stage was launched.
942 HashMap<String, StageExecutionStatus> runningStages = new HashMap<String, StageExecutionStatus>();
943
// Pipe names that findRunnableStage() treats as ready to read.
// NOTE(review): populated by updateCompletedConnections(), which is not visible here - confirm.
944 HashSet<String> completedConnections = new HashSet<String>();
945
// Pipe name -> names of the stages that have that pipe among their outputs.
946 HashMap<String, HashSet<String>> connectionDependencies = new HashMap<String, HashSet<String>>();
947
948
// Stage name -> description of every stage in the job, as passed to the constructor.
949 HashMap<String, StageGroupDescription> stages;
950
// Passed unchanged to the executor for every stage launch.
951 String temporaryStorage;
// Launches stages and returns their status objects.
952 StageExecutor executor;
// Moment this status object was constructed.
953 Date startDate;
954
955 public JobExecutionStatus(HashMap<String, StageGroupDescription> stages,
956 String temporaryStorage, StageExecutor executor, String masterURL) {
957 this.stages = stages;
958 this.temporaryStorage = temporaryStorage;
959 this.executor = executor;
960 this.startDate = new Date();
961
962 for (StageGroupDescription description : stages.values()) {
963
964 for (DataPipeRegion region : description.outputs.values()) {
965 String pipeName = region.pipe.pipeName;
966
967 if (!connectionDependencies.containsKey(pipeName)) {
968 connectionDependencies.put(
969 pipeName, new HashSet<String>());
970 }
971 connectionDependencies.get(pipeName).add(description.getName());
972 }
973
974 description.setMasterURL(masterURL);
975 }
976 }
977
978 class BlockedExecutionStatus implements StageExecutionStatus {
979 String name;
980 int instances;
981
982 BlockedExecutionStatus(String name, int instances) {
983 this.name = name;
984 this.instances = instances;
985 }
986
987 public String getName() { return name; }
988 public int getBlockedInstances() { return instances; }
989 public int getQueuedInstances() { return 0; }
990 public int getRunningInstances() { return 0; }
991 public int getCompletedInstances() { return 0; }
992 public boolean isDone() { return false; }
993 public List<Exception> getExceptions() { return Collections.EMPTY_LIST; }
994 }
995
996 public synchronized boolean isComplete() {
997 return stages.size() == completedStages.size();
998 }
999
1000 public synchronized Map<String, StageExecutionStatus> getStageStatus() {
1001 Map<String, StageExecutionStatus> result = new TreeMap();
1002
1003 for (String stageName : stages.keySet()) {
1004 int instanceCount = stages.get(stageName).getInstanceCount();
1005 if (completedStages.containsKey(stageName)) {
1006 result.put(stageName, completedStages.get(stageName));
1007 } else if (runningStages.containsKey(stageName)) {
1008 result.put(stageName, runningStages.get(stageName));
1009 } else {
1010 result.put(stageName, new BlockedExecutionStatus(stageName, instanceCount));
1011 }
1012 }
1013
1014 return result;
1015 }
1016
1017 /***
1018 * Returns the start date for this job.
1019 */
1020
1021 public Date getStartDate() {
1022 return startDate;
1023 }
1024
1025 /***
1026 * Returns the total amount of free memory in this JVM.
1027 */
1028
1029 public long getFreeMemory() {
1030 return Runtime.getRuntime().freeMemory();
1031 }
1032
1033 /***
1034 * Returns the maximum amount of memory that can be used by this
1035 * Java virtual machine.
1036 */
1037
1038 public long getMaxMemory() {
1039 return Runtime.getRuntime().maxMemory();
1040 }
1041
1042 public void run() throws InterruptedException, ExecutionException {
1043
1044 while (launchedStages.size() < stages.size()) {
1045
1046 StageGroupDescription description = findRunnableStage(stages.values(), launchedStages,
1047 completedConnections);
1048
1049
1050
1051
1052 while (description == null) {
1053
1054 waitForStages(runningStages, completedStages);
1055 updateCompletedConnections(completedStages, completedConnections,
1056 connectionDependencies);
1057
1058
1059 description = findRunnableStage(stages.values(), launchedStages,
1060 completedConnections);
1061 }
1062
1063 StageExecutionStatus result = executor.execute(description, temporaryStorage);
1064
1065 synchronized(this) {
1066 launchedStages.add(description.stage.name);
1067 runningStages.put(description.stage.name, result);
1068 }
1069 }
1070
1071
1072 while (runningStages.size() > 0) {
1073 waitForStages(runningStages, completedStages);
1074 }
1075 }
1076
1077 /***
1078 * Finds a stage that is ready to run by checking stage dependencies.
1079 *
1080 * @param descriptions
1081 * @param launchedJobs
1082 * @param completedConnections
1083 */
1084 private synchronized StageGroupDescription findRunnableStage(
1085 Collection<StageGroupDescription> descriptions,
1086 HashSet<String> launchedJobs,
1087 HashSet<String> completedConnections) {
1088
1089 for (StageGroupDescription description : descriptions) {
1090
1091 if (launchedJobs.contains(description.getName())) {
1092 continue;
1093 }
1094 boolean allComplete = true;
1095
1096 for (DataPipeRegion region : description.inputs.values()) {
1097
1098 if (!completedConnections.contains(region.pipe.pipeName)) {
1099 allComplete = false;
1100 break;
1101 }
1102 }
1103
1104 if (allComplete) {
1105 return description;
1106 }
1107 }
1108
1109
1110 return null;
1111 }
1112 /***
1113 * Polls all the running stages to see if they've completed. When one completes,
1114 * it is added to completedStages and the method returns.
1115 *
1116 * @param runningStages
1117 * @param completedStages
1118 * @throws java.lang.InterruptedException
1119 * @throws java.util.concurrent.ExecutionException
1120 */
1121 private void waitForStages(
1122 HashMap<String, StageExecutionStatus> runningStages,
1123 final HashMap<String, StageExecutionStatus> completedStages)
1124 throws InterruptedException, ExecutionException {
1125 long delay = 1;
1126
1127 while (runningStages.size() > 0) {
1128 synchronized(this) {
1129 for (String name : runningStages.keySet()) {
1130 StageExecutionStatus status = runningStages.get(name);
1131 if (status.isDone()) {
1132
1133 List<Exception> exceptions = status.getExceptions();
1134 if (exceptions.size() > 0) {
1135 throw new ExecutionException("Stage threw an exception: ", exceptions.get(0));
1136 }
1137 completedStages.put(name, status);
1138 runningStages.remove(name);
1139 return;
1140 }
1141 }
1142 }
1143
1144
1145 delay = Math.min(delay * 2, 1000);
1146 Thread.sleep(delay);
1147 }
1148 }
1149
1150 /***
1151 * Reads through all completed stages, trying to find any connections
1152 * that have been satisfied. By satisfied, we mean that all the input
1153 * has been generated for a particular connection. Once a connection
1154 * is satisfied, stages that require that read input from that connection
1155 * can start running.
1156 *
1157 * @param completedStages
1158 * @param completedConnections
1159 * @param connectionDependencies
1160 */
1161 private synchronized void updateCompletedConnections(
1162 final HashMap<String, StageExecutionStatus> completedStages,
1163 final HashSet<String> completedConnections,
1164 final HashMap<String, HashSet<String>> connectionDependencies) {
1165
1166 for (String pipeName : connectionDependencies.keySet()) {
1167
1168
1169 HashSet<String> pipeInputStages = connectionDependencies.get(pipeName);
1170
1171 if (completedStages.keySet().containsAll(pipeInputStages)) {
1172 completedConnections.add(pipeName);
1173 }
1174 }
1175 }
1176 }
1177
1178 public void run(StageExecutor executor) throws InterruptedException, ExecutionException, UnknownHostException, IOException {
1179 Server server = new Server(Utility.getFreePort());
1180 runWithServer(executor, server);
1181 }
1182
1183 public void runWithServer(StageExecutor executor, Server server) throws ExecutionException, InterruptedException, UnknownHostException {
1184
1185 InetAddress address = java.net.InetAddress.getLocalHost();
1186 int port = server.getConnectors()[0].getPort();
1187 String masterURL = String.format("http://%s:%d", address.getHostAddress(), port);
1188 JobExecutionStatus status = new JobExecutionStatus(stages, temporaryStorage, executor, masterURL);
1189 MasterWebHandler handler = new MasterWebHandler(status);
1190 server.addHandler(handler);
1191 status.run();
1192 handler.waitForFinalPage();
1193 server.removeHandler(handler);
1194 }
1195
1196 public static boolean runLocally(Job job, ErrorStore store) throws IOException,
1197 InterruptedException, ExecutionException, Exception {
1198 StageExecutor executor = StageExecutorFactory.newInstance("local", new String[] {});
1199 File tempFile = Utility.createTemporary();
1200 tempFile.delete();
1201 tempFile.mkdir();
1202
1203 JobExecutor jobExecutor = new JobExecutor(job, tempFile.getAbsolutePath(), store);
1204 jobExecutor.prepare();
1205
1206 if (store.hasStatements()) {
1207 return false;
1208 }
1209
1210 int port = Utility.getFreePort();
1211 Server server = new Server(port);
1212 server.start();
1213 System.out.println("Status: http://localhost:" + port);
1214 try {
1215 jobExecutor.runWithServer(executor, server);
1216 } finally {
1217 server.stop();
1218 executor.shutdown();
1219 }
1220
1221 return !store.hasStatements();
1222 }
1223
1224 public static void main(String[] args) throws ParserConfigurationException, SAXException,
1225 IOException, InterruptedException, ExecutionException {
1226 if (args.length < 3) {
1227 System.out.println("usage: executionModel parameterFile temporaryStorage");
1228 System.out.println(
1229 " where executionModel is one of: local, drmaa, ssh, debug, remotedebug");
1230 System.exit(-1);
1231 }
1232
1233 String executionModel = args[0];
1234 String parameterFile = args[1];
1235 String temporaryStorage = args[2];
1236 String[] remaining = Utility.subarray(args, 3);
1237 StageExecutor executor = null;
1238
1239 Logger logger = Logger.getLogger(JobExecutor.class.toString());
1240 logger.setLevel(Level.INFO);
1241
1242 executor = StageExecutorFactory.newInstance(executionModel, remaining);
1243 ErrorStore store = new ErrorStore();
1244
1245
1246 Job job = parseFile(parameterFile, store);
1247
1248 if (store.hasStatements()) {
1249 System.out.println(store.toString());
1250 System.exit(-1);
1251 }
1252
1253 JobExecutor jobExecutor = new JobExecutor(job, temporaryStorage, store);
1254 jobExecutor.prepare();
1255
1256 if (store.hasStatements()) {
1257 System.out.println(store.toString());
1258 System.exit(-1);
1259 }
1260
1261 jobExecutor.run(executor);
1262 logger.info("Job complete");
1263 executor.shutdown();
1264 }
1265 }