View Javadoc

1   // BSD License (http://www.galagosearch.org/license)
2   
3   package org.galagosearch.tupleflow.execution;
4   
5   import java.util.ArrayList;
6   import java.util.List;
7   import java.util.concurrent.CountDownLatch;
8   import java.util.concurrent.ExecutionException;
9   import java.util.concurrent.Executor;
10  import java.util.concurrent.ExecutorService;
11  import java.util.concurrent.Executors;
12  import java.util.concurrent.Future;
13  import java.util.concurrent.ThreadPoolExecutor;
14  import java.util.concurrent.TimeUnit;
15  import java.util.concurrent.TimeoutException;
16  import org.galagosearch.tupleflow.ExNihiloSource;
17  
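18  /**
19   * Stage executor that runs every instance of a stage group on a local,
20   * fixed-size thread pool sized to the number of available processors.
21   *
22   * @author trevor
23   */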
22  public class ThreadedStageExecutor implements StageExecutor {
23      public static class InstanceRunnable implements Runnable {
24          StageInstanceDescription description;
25          Exception exception;
26          boolean isRunning;
27          boolean isQueued;
28          NetworkedCounterManager counterManager;
29          CountDownLatch latch;
30          
31          public InstanceRunnable(StageInstanceDescription description,
32                  NetworkedCounterManager manager,
33                  CountDownLatch latch) {
34              this.isRunning = false;
35              this.isQueued = true;
36              this.description = description;
37              this.exception = null;
38              this.counterManager = manager;
39              this.latch = latch;
40          }
41  
42          public synchronized Exception getException() { return exception; }
43          public synchronized boolean isQueued() { return isQueued; }
44          public synchronized boolean isRunning() { return isRunning; }
45          public synchronized boolean isDone() { return !isQueued && !isRunning; }
46  
47          synchronized void setException(Exception e) { this.exception = e; }
48          synchronized void setIsRunning(boolean isRunning) { this.isRunning = isRunning; }
49          synchronized void setIsQueued(boolean isQueued) { this.isQueued = isQueued; }
50  
51          public void run() {
52              try {
53                  setIsQueued(false);
54                  setIsRunning(true);
55                  StageInstanceFactory factory = new StageInstanceFactory(counterManager);
56                  ExNihiloSource source = factory.instantiate(description);
57                  source.run();
58              } catch(Exception e) {
59                  setException(e);
60              } finally {
61                  latch.countDown();
62                  setIsRunning(false);
63              }
64          }
65      }
66      
67      public class ThreadedStageContext implements StageExecutionStatus, Runnable {
68          StageGroupDescription stage;
69          String temporaryDirectory;
70          boolean done = false;
71          ArrayList<InstanceRunnable> runnables = new ArrayList();
72          List<StageInstanceDescription> instances;
73          CountDownLatch latch;
74          NetworkedCounterManager counterManager;
75  
76          ThreadedStageContext(StageGroupDescription stage, String temporaryDirectory) {
77              this.stage = stage;
78              this.counterManager = new NetworkedCounterManager();
79              this.temporaryDirectory = temporaryDirectory;
80              this.instances = stage.getInstances();
81              this.latch = new CountDownLatch(instances.size());
82              counterManager.start();
83  
84              for(StageInstanceDescription instance : instances) {
85                  InstanceRunnable runnable = new InstanceRunnable(instance, counterManager, latch);
86                  runnables.add(runnable);
87              }
88          }
89  
90          public synchronized boolean isDone() {
91              return done;
92          }
93  
94          public void run() {
95              for(InstanceRunnable instance : runnables) {
96                  threadPool.execute(instance);
97              }
98  
99              while (latch.getCount() > 0) {
100                 try {
101                     latch.await();
102                 } catch(InterruptedException e) {
103                     // do nothing
104                 }
105             }
106 
107             synchronized(this) {
108                 counterManager.stop();
109                 done = true;
110             }
111         }
112 
113         public String getName() {
114             return stage.getName();
115         }
116 
117         public int getBlockedInstances() {
118             return 0;
119         }
120 
121         public synchronized int getQueuedInstances() {
122             int queuedInstances = 0;
123 
124             for (InstanceRunnable instance : runnables) {
125                 if (instance.isQueued())
126                     queuedInstances++;
127             }
128 
129             return queuedInstances;
130         }
131 
132         public int getRunningInstances() {
133             int runningInstances = 0;
134 
135             for (InstanceRunnable instance : runnables) {
136                 if (instance.isRunning())
137                     runningInstances++;
138             }
139 
140             return runningInstances;
141         }
142 
143         public int getCompletedInstances() {
144             int completedInstances = 0;
145 
146             for (InstanceRunnable instance : runnables) {
147                 if (instance.isDone())
148                     completedInstances++;
149             }
150 
151             return completedInstances;
152         }
153 
154         public synchronized List<Exception> getExceptions() {
155             ArrayList<Exception> exceptions = new ArrayList();
156 
157             for (InstanceRunnable instance : runnables) {
158                 Exception e = instance.getException();
159                 if (e != null)
160                     exceptions.add(e);
161             }
162 
163             return exceptions;
164         }
165     }
166 
167     public ThreadedStageExecutor() {
168         threadPool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
169     }
170 
171     public ThreadedStageContext execute(StageGroupDescription stage, String temporary) {
172         ThreadedStageContext result = new ThreadedStageContext(stage, temporary);
173         new Thread(result).start();
174         return result;
175     }
176 
177     public void shutdown() {
178         threadPool.shutdown();
179     }
180 
181     ExecutorService threadPool;
182 }