Coverage Report - org.galagosearch.tupleflow.execution.ThreadedStageExecutor
 
Classes in this File Line Coverage Branch Coverage Complexity
ThreadedStageExecutor
0%
0/9
N/A
0
ThreadedStageExecutor$InstanceRunnable
0%
0/26
0%
0/4
0
ThreadedStageExecutor$ThreadedStageContext
0%
0/50
0%
0/22
0
 
 1  
 // BSD License (http://www.galagosearch.org/license)
 2  
 
 3  
 package org.galagosearch.tupleflow.execution;
 4  
 
 5  
 import java.util.ArrayList;
 6  
 import java.util.List;
 7  
 import java.util.concurrent.CountDownLatch;
 8  
 import java.util.concurrent.ExecutionException;
 9  
 import java.util.concurrent.Executor;
 10  
 import java.util.concurrent.ExecutorService;
 11  
 import java.util.concurrent.Executors;
 12  
 import java.util.concurrent.Future;
 13  
 import java.util.concurrent.ThreadPoolExecutor;
 14  
 import java.util.concurrent.TimeUnit;
 15  
 import java.util.concurrent.TimeoutException;
 16  
 import org.galagosearch.tupleflow.ExNihiloSource;
 17  
 
 18  
 /**
 19  
  *
 20  
  * @author trevor
 21  
  */
 22  0
 public class ThreadedStageExecutor implements StageExecutor {
 23  
     public static class InstanceRunnable implements Runnable {
 24  
         StageInstanceDescription description;
 25  
         Exception exception;
 26  
         boolean isRunning;
 27  
         boolean isQueued;
 28  
         NetworkedCounterManager counterManager;
 29  
         CountDownLatch latch;
 30  
         
 31  
         public InstanceRunnable(StageInstanceDescription description,
 32  
                 NetworkedCounterManager manager,
 33  0
                 CountDownLatch latch) {
 34  0
             this.isRunning = false;
 35  0
             this.isQueued = true;
 36  0
             this.description = description;
 37  0
             this.exception = null;
 38  0
             this.counterManager = manager;
 39  0
             this.latch = latch;
 40  0
         }
 41  
 
 42  0
         public synchronized Exception getException() { return exception; }
 43  0
         public synchronized boolean isQueued() { return isQueued; }
 44  0
         public synchronized boolean isRunning() { return isRunning; }
 45  0
         public synchronized boolean isDone() { return !isQueued && !isRunning; }
 46  
 
 47  0
         synchronized void setException(Exception e) { this.exception = e; }
 48  0
         synchronized void setIsRunning(boolean isRunning) { this.isRunning = isRunning; }
 49  0
         synchronized void setIsQueued(boolean isQueued) { this.isQueued = isQueued; }
 50  
 
 51  
         public void run() {
 52  
             try {
 53  0
                 setIsQueued(false);
 54  0
                 setIsRunning(true);
 55  0
                 StageInstanceFactory factory = new StageInstanceFactory(counterManager);
 56  0
                 ExNihiloSource source = factory.instantiate(description);
 57  0
                 source.run();
 58  0
             } catch(Exception e) {
 59  0
                 setException(e);
 60  
             } finally {
 61  0
                 latch.countDown();
 62  0
                 setIsRunning(false);
 63  0
             }
 64  0
         }
 65  
     }
 66  
     
 67  
     public class ThreadedStageContext implements StageExecutionStatus, Runnable {
 68  
         StageGroupDescription stage;
 69  
         String temporaryDirectory;
 70  0
         boolean done = false;
 71  0
         ArrayList<InstanceRunnable> runnables = new ArrayList();
 72  
         List<StageInstanceDescription> instances;
 73  
         CountDownLatch latch;
 74  
         NetworkedCounterManager counterManager;
 75  
 
 76  0
         ThreadedStageContext(StageGroupDescription stage, String temporaryDirectory) {
 77  0
             this.stage = stage;
 78  0
             this.counterManager = new NetworkedCounterManager();
 79  0
             this.temporaryDirectory = temporaryDirectory;
 80  0
             this.instances = stage.getInstances();
 81  0
             this.latch = new CountDownLatch(instances.size());
 82  0
             counterManager.start();
 83  
 
 84  0
             for(StageInstanceDescription instance : instances) {
 85  0
                 InstanceRunnable runnable = new InstanceRunnable(instance, counterManager, latch);
 86  0
                 runnables.add(runnable);
 87  0
             }
 88  0
         }
 89  
 
 90  
         public synchronized boolean isDone() {
 91  0
             return done;
 92  
         }
 93  
 
 94  
         public void run() {
 95  0
             for(InstanceRunnable instance : runnables) {
 96  0
                 threadPool.execute(instance);
 97  
             }
 98  
 
 99  0
             while (latch.getCount() > 0) {
 100  
                 try {
 101  0
                     latch.await();
 102  0
                 } catch(InterruptedException e) {
 103  
                     // do nothing
 104  0
                 }
 105  
             }
 106  
 
 107  0
             synchronized(this) {
 108  0
                 counterManager.stop();
 109  0
                 done = true;
 110  0
             }
 111  0
         }
 112  
 
 113  
         public String getName() {
 114  0
             return stage.getName();
 115  
         }
 116  
 
 117  
         public int getBlockedInstances() {
 118  0
             return 0;
 119  
         }
 120  
 
 121  
         public synchronized int getQueuedInstances() {
 122  0
             int queuedInstances = 0;
 123  
 
 124  0
             for (InstanceRunnable instance : runnables) {
 125  0
                 if (instance.isQueued())
 126  0
                     queuedInstances++;
 127  
             }
 128  
 
 129  0
             return queuedInstances;
 130  
         }
 131  
 
 132  
         public int getRunningInstances() {
 133  0
             int runningInstances = 0;
 134  
 
 135  0
             for (InstanceRunnable instance : runnables) {
 136  0
                 if (instance.isRunning())
 137  0
                     runningInstances++;
 138  
             }
 139  
 
 140  0
             return runningInstances;
 141  
         }
 142  
 
 143  
         public int getCompletedInstances() {
 144  0
             int completedInstances = 0;
 145  
 
 146  0
             for (InstanceRunnable instance : runnables) {
 147  0
                 if (instance.isDone())
 148  0
                     completedInstances++;
 149  
             }
 150  
 
 151  0
             return completedInstances;
 152  
         }
 153  
 
 154  
         public synchronized List<Exception> getExceptions() {
 155  0
             ArrayList<Exception> exceptions = new ArrayList();
 156  
 
 157  0
             for (InstanceRunnable instance : runnables) {
 158  0
                 Exception e = instance.getException();
 159  0
                 if (e != null)
 160  0
                     exceptions.add(e);
 161  0
             }
 162  
 
 163  0
             return exceptions;
 164  
         }
 165  
     }
 166  
 
 167  0
     public ThreadedStageExecutor() {
 168  0
         threadPool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
 169  0
     }
 170  
 
 171  
     public ThreadedStageContext execute(StageGroupDescription stage, String temporary) {
 172  0
         ThreadedStageContext result = new ThreadedStageContext(stage, temporary);
 173  0
         new Thread(result).start();
 174  0
         return result;
 175  
     }
 176  
 
 177  
     public void shutdown() {
 178  0
         threadPool.shutdown();
 179  0
     }
 180  
 
 181  
     ExecutorService threadPool;
 182  
 }