1
2
3 package org.galagosearch.tupleflow.execution;
4
5 import java.util.ArrayList;
6 import java.util.List;
7 import java.util.concurrent.CountDownLatch;
8 import java.util.concurrent.ExecutionException;
9 import java.util.concurrent.Executor;
10 import java.util.concurrent.ExecutorService;
11 import java.util.concurrent.Executors;
12 import java.util.concurrent.Future;
13 import java.util.concurrent.ThreadPoolExecutor;
14 import java.util.concurrent.TimeUnit;
15 import java.util.concurrent.TimeoutException;
16 import org.galagosearch.tupleflow.ExNihiloSource;
17
18 /***
19 *
20 * @author trevor
21 */
22 public class ThreadedStageExecutor implements StageExecutor {
23 public static class InstanceRunnable implements Runnable {
24 StageInstanceDescription description;
25 Exception exception;
26 boolean isRunning;
27 boolean isQueued;
28 NetworkedCounterManager counterManager;
29 CountDownLatch latch;
30
31 public InstanceRunnable(StageInstanceDescription description,
32 NetworkedCounterManager manager,
33 CountDownLatch latch) {
34 this.isRunning = false;
35 this.isQueued = true;
36 this.description = description;
37 this.exception = null;
38 this.counterManager = manager;
39 this.latch = latch;
40 }
41
42 public synchronized Exception getException() { return exception; }
43 public synchronized boolean isQueued() { return isQueued; }
44 public synchronized boolean isRunning() { return isRunning; }
45 public synchronized boolean isDone() { return !isQueued && !isRunning; }
46
47 synchronized void setException(Exception e) { this.exception = e; }
48 synchronized void setIsRunning(boolean isRunning) { this.isRunning = isRunning; }
49 synchronized void setIsQueued(boolean isQueued) { this.isQueued = isQueued; }
50
51 public void run() {
52 try {
53 setIsQueued(false);
54 setIsRunning(true);
55 StageInstanceFactory factory = new StageInstanceFactory(counterManager);
56 ExNihiloSource source = factory.instantiate(description);
57 source.run();
58 } catch(Exception e) {
59 setException(e);
60 } finally {
61 latch.countDown();
62 setIsRunning(false);
63 }
64 }
65 }
66
67 public class ThreadedStageContext implements StageExecutionStatus, Runnable {
68 StageGroupDescription stage;
69 String temporaryDirectory;
70 boolean done = false;
71 ArrayList<InstanceRunnable> runnables = new ArrayList();
72 List<StageInstanceDescription> instances;
73 CountDownLatch latch;
74 NetworkedCounterManager counterManager;
75
76 ThreadedStageContext(StageGroupDescription stage, String temporaryDirectory) {
77 this.stage = stage;
78 this.counterManager = new NetworkedCounterManager();
79 this.temporaryDirectory = temporaryDirectory;
80 this.instances = stage.getInstances();
81 this.latch = new CountDownLatch(instances.size());
82 counterManager.start();
83
84 for(StageInstanceDescription instance : instances) {
85 InstanceRunnable runnable = new InstanceRunnable(instance, counterManager, latch);
86 runnables.add(runnable);
87 }
88 }
89
90 public synchronized boolean isDone() {
91 return done;
92 }
93
94 public void run() {
95 for(InstanceRunnable instance : runnables) {
96 threadPool.execute(instance);
97 }
98
99 while (latch.getCount() > 0) {
100 try {
101 latch.await();
102 } catch(InterruptedException e) {
103
104 }
105 }
106
107 synchronized(this) {
108 counterManager.stop();
109 done = true;
110 }
111 }
112
113 public String getName() {
114 return stage.getName();
115 }
116
117 public int getBlockedInstances() {
118 return 0;
119 }
120
121 public synchronized int getQueuedInstances() {
122 int queuedInstances = 0;
123
124 for (InstanceRunnable instance : runnables) {
125 if (instance.isQueued())
126 queuedInstances++;
127 }
128
129 return queuedInstances;
130 }
131
132 public int getRunningInstances() {
133 int runningInstances = 0;
134
135 for (InstanceRunnable instance : runnables) {
136 if (instance.isRunning())
137 runningInstances++;
138 }
139
140 return runningInstances;
141 }
142
143 public int getCompletedInstances() {
144 int completedInstances = 0;
145
146 for (InstanceRunnable instance : runnables) {
147 if (instance.isDone())
148 completedInstances++;
149 }
150
151 return completedInstances;
152 }
153
154 public synchronized List<Exception> getExceptions() {
155 ArrayList<Exception> exceptions = new ArrayList();
156
157 for (InstanceRunnable instance : runnables) {
158 Exception e = instance.getException();
159 if (e != null)
160 exceptions.add(e);
161 }
162
163 return exceptions;
164 }
165 }
166
167 public ThreadedStageExecutor() {
168 threadPool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
169 }
170
171 public ThreadedStageContext execute(StageGroupDescription stage, String temporary) {
172 ThreadedStageContext result = new ThreadedStageContext(stage, temporary);
173 new Thread(result).start();
174 return result;
175 }
176
177 public void shutdown() {
178 threadPool.shutdown();
179 }
180
181 ExecutorService threadPool;
182 }