View Javadoc

1   // BSD License (http://www.galagosearch.org/license)
2   package org.galagosearch.tupleflow.execution;
3   
4   import java.util.ArrayList;
5   import java.util.Collections;
6   import java.util.List;
7   import java.util.concurrent.CountDownLatch;
8   import java.util.concurrent.ExecutionException;
9   import java.util.concurrent.ExecutorService;
10  import java.util.concurrent.Executors;
11  import java.util.concurrent.Future;
12  import java.util.concurrent.LinkedBlockingQueue;
13  import java.util.concurrent.TimeUnit;
14  
15  /***
16   * This will eventually allow you to run TupleFlow workers on other machines using
17   * SSH.  For now, it doesn't work.  If you really need job distribution, try the Grid
18   * Engine plugin.
19   * 
20   * @author trevor
21   */
22  public class SSHStageExecutor extends RemoteStageExecutor {
23      private static String machineEndMarker = "STOP#MACHINE#SHUTDOWN";
24      private LinkedBlockingQueue<String> machines = new LinkedBlockingQueue();
25      private LinkedBlockingQueue<StageTask> tasks = new LinkedBlockingQueue();
26      private ExecutorService pool = null;
27      private String commandName;
28  
29      public SSHStageExecutor(String commandName, List<String> machines) {
30          this.commandName = commandName;
31          this.machines.addAll(machines);
32          this.pool = Executors.newCachedThreadPool();
33      }
34  
35      public class SSHStageExecutionContext implements Runnable, StageExecutionStatus {
36          String name;
37          List<String> jobPaths;
38          boolean done;
39  
40          public SSHStageExecutionContext(String name, List<String> jobPaths) {
41              this.name = name;
42              this.jobPaths = jobPaths;
43              this.done = false;
44          }
45  
46          public void run() {
47              CountDownLatch latch = new CountDownLatch(jobPaths.size());
48  
49              for (String jobPath : jobPaths) {
50                  // submit this job to the queue
51                  StageTask task = new StageTask(commandName, jobPath, latch);
52                  pool.execute(task);
53              }
54  
55              synchronized(this) { done = true; }
56          }
57  
58          public synchronized boolean isDone() {
59              return done;
60          }
61  
62          public String getName() {
63              return name;
64          }
65  
66          public int getBlockedInstances() {
67              return 0;
68          }
69  
70          public int getQueuedInstances() {
71              // FIXME
72              return jobPaths.size();
73          }
74  
75          public int getRunningInstances() {
76              // FIXME
77              return 0;
78          }
79  
80          public int getCompletedInstances() {
81              // FIXME
82              return 0;
83          }
84  
85          public List<Exception> getExceptions() {
86              // FIXME
87              return Collections.EMPTY_LIST;
88          }
89      }
90  
91      public class StageTask implements Runnable {
92          private String commandName;
93          private String jobFileArgument;
94          private CountDownLatch latch;
95  
96          public StageTask(String commandName, String jobFileArgument, CountDownLatch latch) {
97              this.commandName = commandName;
98              this.jobFileArgument = jobFileArgument;
99              this.latch = latch;
100         }
101 
102         public void run() {
103             String machineName;
104 
105             try {
106                 // first, wait for a machine reservation
107                 machineName = machines.poll();
108 
109                 // if we see a shutdown marker, quit, but put the marker back
110                 if (machineName.equals(machineEndMarker)) {
111                     machines.offer(machineEndMarker);
112                     return;
113                 }
114 
115                 String[] arguments = new String[]{machineName, jobFileArgument};
116                 Process process = Runtime.getRuntime().exec(commandName, arguments);
117 
118                 // close the process stdin
119                 process.getOutputStream().close();
120                 // BUGBUG: someday we need to trap process stdout/stderr here
121 
122                 process.waitFor();
123             } catch (Exception e) {
124                 // BUGBUG: fix this too
125                 e.printStackTrace();
126             } finally {
127                 latch.countDown();
128             }
129         }
130 
131         public boolean isNullTask() {
132             return commandName == null && jobFileArgument == null && latch == null;
133         }
134     }
135 
136     public StageExecutionStatus submit(String stageName, ArrayList<String> jobPaths, String temporary) {
137         SSHStageExecutionContext context = new SSHStageExecutionContext(stageName, jobPaths);
138         new Thread(context).start();
139         return context;
140     }
141 
142     public void shutdown() {
143         machines.add(machineEndMarker);
144         pool.shutdown();
145     }
146 }