1
2 package org.galagosearch.tupleflow.execution;
3
4 import java.io.File;
5 import java.io.FileOutputStream;
6 import java.io.ObjectOutputStream;
7 import java.util.ArrayList;
8 import java.util.List;
9 import java.util.concurrent.ExecutionException;
10 import java.util.concurrent.Future;
11 import java.util.concurrent.TimeUnit;
12
13 /***
14 *
15 * @author trevor
16 */
17 public abstract class RemoteStageExecutor implements StageExecutor {
18 public abstract StageExecutionStatus submit(String stageName, ArrayList<String> jobPaths, String temporary);
19 public abstract void shutdown();
20
21 public StageExecutionStatus execute(StageGroupDescription stage, String temporary) {
22 ArrayList<String> jobPaths = new ArrayList<String>();
23
24 try {
25 String output = temporary + File.separator + "stdout";
26 new File(output).mkdir();
27 String stderr = temporary + File.separator + "stderr";
28 new File(stderr).mkdir();
29 String jobsDirectory = temporary + File.separator + "jobs";
30 new File(jobsDirectory).mkdir();
31 String stageJobsDirectory = jobsDirectory + File.separator + stage.getName();
32 new File(stageJobsDirectory).mkdir();
33
34 List<StageInstanceDescription> instances = stage.getInstances();
35
36 for (int i = 0; i < instances.size(); i++) {
37 File instanceJobFile = new File(stageJobsDirectory + File.separator + i);
38 File instanceCheckpoint = new File(
39 stageJobsDirectory + File.separator + i + ".complete");
40
41 if (instanceCheckpoint.exists()) {
42 continue;
43 }
44 ObjectOutputStream instanceJobStream = new ObjectOutputStream(new FileOutputStream(
45 instanceJobFile));
46 StageInstanceDescription instance = instances.get(i);
47 instanceJobStream.writeObject(instance);
48 instanceJobStream.close();
49
50 jobPaths.add(instanceJobFile.toString());
51 }
52 } catch (Exception e) {
53 return new ErrorExecutionStatus(stage.getName(), e);
54 }
55
56 return submit(stage.getName(), jobPaths, temporary);
57 }
58 }