Coverage Report - org.galagosearch.tupleflow.execution.SSHStageExecutor
 
Classes in this File Line Coverage Branch Coverage Complexity
SSHStageExecutor
0%
0/16
N/A
0
SSHStageExecutor$SSHStageExecutionContext
0%
0/19
0%
0/2
0
SSHStageExecutor$StageTask
0%
0/18
0%
0/8
0
 
 1  
 // BSD License (http://www.galagosearch.org/license)
 2  
 package org.galagosearch.tupleflow.execution;
 3  
 
 4  
 import java.util.ArrayList;
 5  
 import java.util.Collections;
 6  
 import java.util.List;
 7  
 import java.util.concurrent.CountDownLatch;
 8  
 import java.util.concurrent.ExecutionException;
 9  
 import java.util.concurrent.ExecutorService;
 10  
 import java.util.concurrent.Executors;
 11  
 import java.util.concurrent.Future;
 12  
 import java.util.concurrent.LinkedBlockingQueue;
 13  
 import java.util.concurrent.TimeUnit;
 14  
 
 15  
 /**
 16  
  * This will eventually allow you to run TupleFlow workers on other machines using
 17  
  * SSH.  For now, it doesn't work.  If you really need job distribution, try the Grid
 18  
  * Engine plugin.
 19  
  * 
 20  
  * @author trevor
 21  
  */
 22  0
 public class SSHStageExecutor extends RemoteStageExecutor {
 23  0
     private static String machineEndMarker = "STOP#MACHINE#SHUTDOWN";
 24  0
     private LinkedBlockingQueue<String> machines = new LinkedBlockingQueue();
 25  0
     private LinkedBlockingQueue<StageTask> tasks = new LinkedBlockingQueue();
 26  0
     private ExecutorService pool = null;
 27  
     private String commandName;
 28  
 
 29  0
     public SSHStageExecutor(String commandName, List<String> machines) {
 30  0
         this.commandName = commandName;
 31  0
         this.machines.addAll(machines);
 32  0
         this.pool = Executors.newCachedThreadPool();
 33  0
     }
 34  
 
 35  
     public class SSHStageExecutionContext implements Runnable, StageExecutionStatus {
 36  
         String name;
 37  
         List<String> jobPaths;
 38  
         boolean done;
 39  
 
 40  0
         public SSHStageExecutionContext(String name, List<String> jobPaths) {
 41  0
             this.name = name;
 42  0
             this.jobPaths = jobPaths;
 43  0
             this.done = false;
 44  0
         }
 45  
 
 46  
         public void run() {
 47  0
             CountDownLatch latch = new CountDownLatch(jobPaths.size());
 48  
 
 49  0
             for (String jobPath : jobPaths) {
 50  
                 // submit this job to the queue
 51  0
                 StageTask task = new StageTask(commandName, jobPath, latch);
 52  0
                 pool.execute(task);
 53  0
             }
 54  
 
 55  0
             synchronized(this) { done = true; }
 56  0
         }
 57  
 
 58  
         public synchronized boolean isDone() {
 59  0
             return done;
 60  
         }
 61  
 
 62  
         public String getName() {
 63  0
             return name;
 64  
         }
 65  
 
 66  
         public int getBlockedInstances() {
 67  0
             return 0;
 68  
         }
 69  
 
 70  
         public int getQueuedInstances() {
 71  
             // FIXME
 72  0
             return jobPaths.size();
 73  
         }
 74  
 
 75  
         public int getRunningInstances() {
 76  
             // FIXME
 77  0
             return 0;
 78  
         }
 79  
 
 80  
         public int getCompletedInstances() {
 81  
             // FIXME
 82  0
             return 0;
 83  
         }
 84  
 
 85  
         public List<Exception> getExceptions() {
 86  
             // FIXME
 87  0
             return Collections.EMPTY_LIST;
 88  
         }
 89  
     }
 90  
 
 91  
     public class StageTask implements Runnable {
 92  
         private String commandName;
 93  
         private String jobFileArgument;
 94  
         private CountDownLatch latch;
 95  
 
 96  0
         public StageTask(String commandName, String jobFileArgument, CountDownLatch latch) {
 97  0
             this.commandName = commandName;
 98  0
             this.jobFileArgument = jobFileArgument;
 99  0
             this.latch = latch;
 100  0
         }
 101  
 
 102  
         public void run() {
 103  
             String machineName;
 104  
 
 105  
             try {
 106  
                 // first, wait for a machine reservation
 107  0
                 machineName = machines.poll();
 108  
 
 109  
                 // if we see a shutdown marker, quit, but put the marker back
 110  0
                 if (machineName.equals(machineEndMarker)) {
 111  0
                     machines.offer(machineEndMarker);
 112  
                     return;
 113  
                 }
 114  
 
 115  0
                 String[] arguments = new String[]{machineName, jobFileArgument};
 116  0
                 Process process = Runtime.getRuntime().exec(commandName, arguments);
 117  
 
 118  
                 // close the process stdin
 119  0
                 process.getOutputStream().close();
 120  
                 // BUGBUG: someday we need to trap process stdout/stderr here
 121  
 
 122  0
                 process.waitFor();
 123  0
             } catch (Exception e) {
 124  
                 // BUGBUG: fix this too
 125  0
                 e.printStackTrace();
 126  
             } finally {
 127  0
                 latch.countDown();
 128  0
             }
 129  0
         }
 130  
 
 131  
         public boolean isNullTask() {
 132  0
             return commandName == null && jobFileArgument == null && latch == null;
 133  
         }
 134  
     }
 135  
 
 136  
     public StageExecutionStatus submit(String stageName, ArrayList<String> jobPaths, String temporary) {
 137  0
         SSHStageExecutionContext context = new SSHStageExecutionContext(stageName, jobPaths);
 138  0
         new Thread(context).start();
 139  0
         return context;
 140  
     }
 141  
 
 142  
     public void shutdown() {
 143  0
         machines.add(machineEndMarker);
 144  0
         pool.shutdown();
 145  0
     }
 146  
 }