Coverage Report - org.galagosearch.tupleflow.execution.RemoteStageExecutor
 
Classes in this File Line Coverage Branch Coverage Complexity
RemoteStageExecutor
0%
0/25
0%
0/4
0
 
 1  
 // BSD License (http://www.galagosearch.org/license)
 2  
 package org.galagosearch.tupleflow.execution;
 3  
 
 4  
 import java.io.File;
 5  
 import java.io.FileOutputStream;
 6  
 import java.io.ObjectOutputStream;
 7  
 import java.util.ArrayList;
 8  
 import java.util.List;
 9  
 import java.util.concurrent.ExecutionException;
 10  
 import java.util.concurrent.Future;
 11  
 import java.util.concurrent.TimeUnit;
 12  
 
 13  
 /**
 14  
  *
 15  
  * @author trevor
 16  
  */
 17  0
 public abstract class RemoteStageExecutor implements StageExecutor {
 18  
     public abstract StageExecutionStatus submit(String stageName, ArrayList<String> jobPaths, String temporary);
 19  
     public abstract void shutdown();
 20  
 
 21  
     public StageExecutionStatus execute(StageGroupDescription stage, String temporary) {
 22  0
         ArrayList<String> jobPaths = new ArrayList<String>();
 23  
 
 24  
         try {
 25  0
             String output = temporary + File.separator + "stdout";
 26  0
             new File(output).mkdir();
 27  0
             String stderr = temporary + File.separator + "stderr";
 28  0
             new File(stderr).mkdir();
 29  0
             String jobsDirectory = temporary + File.separator + "jobs";
 30  0
             new File(jobsDirectory).mkdir();
 31  0
             String stageJobsDirectory = jobsDirectory + File.separator + stage.getName();
 32  0
             new File(stageJobsDirectory).mkdir();
 33  
 
 34  0
             List<StageInstanceDescription> instances = stage.getInstances();
 35  
 
 36  0
             for (int i = 0; i < instances.size(); i++) {
 37  0
                 File instanceJobFile = new File(stageJobsDirectory + File.separator + i);
 38  0
                 File instanceCheckpoint = new File(
 39  
                         stageJobsDirectory + File.separator + i + ".complete");
 40  
 
 41  0
                 if (instanceCheckpoint.exists()) {
 42  0
                     continue;
 43  
                 }
 44  0
                 ObjectOutputStream instanceJobStream = new ObjectOutputStream(new FileOutputStream(
 45  
                                                                               instanceJobFile));
 46  0
                 StageInstanceDescription instance = instances.get(i);
 47  0
                 instanceJobStream.writeObject(instance);
 48  0
                 instanceJobStream.close();
 49  
 
 50  0
                 jobPaths.add(instanceJobFile.toString());
 51  
             }
 52  0
         } catch (Exception e) {
 53  0
             return new ErrorExecutionStatus(stage.getName(), e);
 54  0
         }
 55  
 
 56  0
         return submit(stage.getName(), jobPaths, temporary);
 57  
     }
 58  
 }