1
2 package org.galagosearch.tupleflow.execution;
3
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
14
/**
 * This will eventually allow you to run TupleFlow workers on other machines using
 * SSH.  For now, it doesn't work.  If you really need job distribution, try the Grid
 * Engine plugin.
 *
 * @author trevor
 */
22 public class SSHStageExecutor extends RemoteStageExecutor {
23 private static String machineEndMarker = "STOP#MACHINE#SHUTDOWN";
24 private LinkedBlockingQueue<String> machines = new LinkedBlockingQueue();
25 private LinkedBlockingQueue<StageTask> tasks = new LinkedBlockingQueue();
26 private ExecutorService pool = null;
27 private String commandName;
28
29 public SSHStageExecutor(String commandName, List<String> machines) {
30 this.commandName = commandName;
31 this.machines.addAll(machines);
32 this.pool = Executors.newCachedThreadPool();
33 }
34
35 public class SSHStageExecutionContext implements Runnable, StageExecutionStatus {
36 String name;
37 List<String> jobPaths;
38 boolean done;
39
40 public SSHStageExecutionContext(String name, List<String> jobPaths) {
41 this.name = name;
42 this.jobPaths = jobPaths;
43 this.done = false;
44 }
45
46 public void run() {
47 CountDownLatch latch = new CountDownLatch(jobPaths.size());
48
49 for (String jobPath : jobPaths) {
50
51 StageTask task = new StageTask(commandName, jobPath, latch);
52 pool.execute(task);
53 }
54
55 synchronized(this) { done = true; }
56 }
57
58 public synchronized boolean isDone() {
59 return done;
60 }
61
62 public String getName() {
63 return name;
64 }
65
66 public int getBlockedInstances() {
67 return 0;
68 }
69
70 public int getQueuedInstances() {
71
72 return jobPaths.size();
73 }
74
75 public int getRunningInstances() {
76
77 return 0;
78 }
79
80 public int getCompletedInstances() {
81
82 return 0;
83 }
84
85 public List<Exception> getExceptions() {
86
87 return Collections.EMPTY_LIST;
88 }
89 }
90
91 public class StageTask implements Runnable {
92 private String commandName;
93 private String jobFileArgument;
94 private CountDownLatch latch;
95
96 public StageTask(String commandName, String jobFileArgument, CountDownLatch latch) {
97 this.commandName = commandName;
98 this.jobFileArgument = jobFileArgument;
99 this.latch = latch;
100 }
101
102 public void run() {
103 String machineName;
104
105 try {
106
107 machineName = machines.poll();
108
109
110 if (machineName.equals(machineEndMarker)) {
111 machines.offer(machineEndMarker);
112 return;
113 }
114
115 String[] arguments = new String[]{machineName, jobFileArgument};
116 Process process = Runtime.getRuntime().exec(commandName, arguments);
117
118
119 process.getOutputStream().close();
120
121
122 process.waitFor();
123 } catch (Exception e) {
124
125 e.printStackTrace();
126 } finally {
127 latch.countDown();
128 }
129 }
130
131 public boolean isNullTask() {
132 return commandName == null && jobFileArgument == null && latch == null;
133 }
134 }
135
136 public StageExecutionStatus submit(String stageName, ArrayList<String> jobPaths, String temporary) {
137 SSHStageExecutionContext context = new SSHStageExecutionContext(stageName, jobPaths);
138 new Thread(context).start();
139 return context;
140 }
141
142 public void shutdown() {
143 machines.add(machineEndMarker);
144 pool.shutdown();
145 }
146 }