View Javadoc

1   // BSD License (http://www.galagosearch.org/license)
2   package org.galagosearch.tupleflow.execution;
3   
4   import java.io.File;
5   import java.io.FileOutputStream;
6   import java.io.ObjectOutputStream;
7   import java.util.ArrayList;
8   import java.util.List;
9   import java.util.concurrent.ExecutionException;
10  import java.util.concurrent.Future;
11  import java.util.concurrent.TimeUnit;
12  
13  /***
14   *
15   * @author trevor
16   */
17  public abstract class RemoteStageExecutor implements StageExecutor {
18      public abstract StageExecutionStatus submit(String stageName, ArrayList<String> jobPaths, String temporary);
19      public abstract void shutdown();
20  
21      public StageExecutionStatus execute(StageGroupDescription stage, String temporary) {
22          ArrayList<String> jobPaths = new ArrayList<String>();
23  
24          try {
25              String output = temporary + File.separator + "stdout";
26              new File(output).mkdir();
27              String stderr = temporary + File.separator + "stderr";
28              new File(stderr).mkdir();
29              String jobsDirectory = temporary + File.separator + "jobs";
30              new File(jobsDirectory).mkdir();
31              String stageJobsDirectory = jobsDirectory + File.separator + stage.getName();
32              new File(stageJobsDirectory).mkdir();
33  
34              List<StageInstanceDescription> instances = stage.getInstances();
35  
36              for (int i = 0; i < instances.size(); i++) {
37                  File instanceJobFile = new File(stageJobsDirectory + File.separator + i);
38                  File instanceCheckpoint = new File(
39                          stageJobsDirectory + File.separator + i + ".complete");
40  
41                  if (instanceCheckpoint.exists()) {
42                      continue;
43                  }
44                  ObjectOutputStream instanceJobStream = new ObjectOutputStream(new FileOutputStream(
45                                                                                instanceJobFile));
46                  StageInstanceDescription instance = instances.get(i);
47                  instanceJobStream.writeObject(instance);
48                  instanceJobStream.close();
49  
50                  jobPaths.add(instanceJobFile.toString());
51              }
52          } catch (Exception e) {
53              return new ErrorExecutionStatus(stage.getName(), e);
54          }
55  
56          return submit(stage.getName(), jobPaths, temporary);
57      }
58  }