Coverage Report - org.galagosearch.tupleflow.execution.LocalStageExecutor
 
Classes in this File Line Coverage Branch Coverage Complexity
LocalStageExecutor
0%
0/47
0%
0/8
0
LocalStageExecutor$SequentialExecutionContext
0%
0/51
0%
0/2
0
 
 1  
 // BSD License (http://www.galagosearch.org/license)
 2  
 package org.galagosearch.tupleflow.execution;
 3  
 
 4  
 import java.io.BufferedWriter;
 5  
 import java.io.File;
 6  
 import java.io.FileInputStream;
 7  
 import java.io.FileWriter;
 8  
 import java.io.IOException;
 9  
 import java.io.ObjectInputStream;
 10  
 import java.io.UnsupportedEncodingException;
 11  
 import java.util.ArrayList;
 12  
 import java.util.Collections;
 13  
 import java.util.List;
 14  
 import java.util.logging.Logger;
 15  
 import javax.xml.parsers.ParserConfigurationException;
 16  
 import org.galagosearch.tupleflow.ExNihiloSource;
 17  
 import org.xml.sax.SAXException;
 18  
 
 19  
 /**
 20  
  *
 21  
  * @author trevor
 22  
  */
 23  0
 public class LocalStageExecutor implements StageExecutor {
 24  0
     public static class SequentialExecutionContext implements StageExecutionStatus, Runnable {
 25  
         String name;
 26  
         List<StageInstanceDescription> instances;
 27  0
         ArrayList<Exception> exceptions = new ArrayList();
 28  
 
 29  0
         int queuedInstances = 0;
 30  0
         int runningInstances = 0;
 31  0
         int completedInstances = 0;
 32  0
         boolean done = false;
 33  
 
 34  0
         SequentialExecutionContext(String name, List<StageInstanceDescription> instances) {
 35  0
             this.name = name;
 36  0
             this.instances = instances;
 37  0
             this.queuedInstances = instances.size();
 38  0
         }
 39  
 
 40  
         public synchronized void markDone() {
 41  0
             completedInstances = instances.size();
 42  0
             runningInstances = 0;
 43  0
             completedInstances = 0;
 44  0
             done = true;
 45  0
         }
 46  
 
 47  
         public void run() {
 48  
             try {
 49  0
                 for (StageInstanceDescription instance : instances) {
 50  0
                     NetworkedCounterManager counterManager = new NetworkedCounterManager();
 51  0
                     counterManager.start();
 52  0
                     synchronized(this) {
 53  0
                         runningInstances++;
 54  0
                         queuedInstances--;
 55  0
                     }
 56  0
                     NetworkedCounterManager manager = new NetworkedCounterManager();
 57  0
                     StageInstanceFactory factory = new StageInstanceFactory(manager);
 58  0
                     manager.start();
 59  0
                     ExNihiloSource source = factory.instantiate(instance);
 60  0
                     source.run();
 61  0
                     manager.stop();
 62  0
                     synchronized(this) {
 63  0
                         runningInstances--;
 64  0
                         completedInstances++;
 65  0
                     }
 66  0
                 }
 67  0
             } catch(Exception e) {
 68  0
                 synchronized(this) {
 69  0
                     exceptions.add(e);
 70  0
                 }
 71  
             } finally {
 72  0
                 synchronized(this) {
 73  0
                     done = true;
 74  0
                 }
 75  0
             }
 76  0
         }
 77  
 
 78  
         public String getName() {
 79  0
             return name;
 80  
         }
 81  
 
 82  
         public int getBlockedInstances() {
 83  0
             return 0;
 84  
         }
 85  
 
 86  
         public synchronized int getQueuedInstances() {
 87  0
             return queuedInstances;
 88  
         }
 89  
 
 90  
         public synchronized int getRunningInstances() {
 91  0
             return runningInstances;
 92  
         }
 93  
 
 94  
         public int getCompletedInstances() {
 95  0
             return completedInstances;
 96  
         }
 97  
 
 98  
         public synchronized boolean isDone() {
 99  0
             return done;
 100  
         }
 101  
 
 102  
         public synchronized List<Exception> getExceptions() {
 103  0
             return exceptions;
 104  
         }
 105  
 
 106  
         private synchronized void addException(Exception e) {
 107  0
             exceptions.add(e);
 108  0
         }
 109  
     }
 110  
 
 111  
     public SequentialExecutionContext execute(StageGroupDescription stage, String temporary) {
 112  0
         SequentialExecutionContext context =
 113  
                 new SequentialExecutionContext(stage.getName(), stage.getInstances());
 114  0
         context.run();
 115  0
         return context;
 116  
     }
 117  
 
 118  
     public SequentialExecutionContext execute(StageInstanceDescription stage) {
 119  0
         SequentialExecutionContext context =
 120  
                 new SequentialExecutionContext(stage.getName(), Collections.singletonList(stage));
 121  0
         context.run();
 122  0
         return context;
 123  
     }
 124  
 
 125  
     public StageExecutionStatus execute(String descriptionFile) {
 126  0
         File errorFile = new File(descriptionFile + ".error");
 127  0
         File completeFile = new File(descriptionFile + ".complete");
 128  0
         SequentialExecutionContext result = null;
 129  
 
 130  0
         Logger logger = Logger.getLogger(JobExecutor.class.toString());
 131  0
         StageInstanceDescription stage = null;
 132  
 
 133  
         // try to parse the stage description from disk
 134  
         try {
 135  0
             ObjectInputStream stream =
 136  
                     new ObjectInputStream(new FileInputStream(new File(descriptionFile)));
 137  0
             stage = (StageInstanceDescription) stream.readObject();
 138  0
         } catch(Exception e) {
 139  0
             return new ErrorExecutionStatus("unknown", e);
 140  0
         }
 141  
         
 142  
         // check for a checkpoint file.  If one exists, we quit.
 143  0
         if (completeFile.exists()) {
 144  0
             logger.info("Exiting early because a complete checkpoint was found.");
 145  0
             result = new SequentialExecutionContext(stage.getName(),
 146  
                     Collections.singletonList(stage));
 147  0
             result.markDone();
 148  0
             return result;
 149  
         }
 150  
 
 151  
         // get rid of any error checkpoints
 152  0
         if (errorFile.exists()) {
 153  0
             errorFile.delete();
 154  
         }
 155  
 
 156  0
         result = execute(stage);
 157  
 
 158  
         try {
 159  0
             if (result.getExceptions().size() > 0) {
 160  0
                 Throwable e = result.getExceptions().get(0);
 161  0
                 BufferedWriter writer = new BufferedWriter(new FileWriter(errorFile));
 162  0
                 writer.write(e.toString());
 163  0
                 writer.close();
 164  0
             } else {
 165  0
                 BufferedWriter writer = new BufferedWriter(new FileWriter(completeFile));
 166  0
                 writer.close();
 167  
             }
 168  0
         } catch(Exception e) {
 169  0
             logger.warning("Trouble writing completion/error files: " + errorFile.toString());
 170  0
         }
 171  
 
 172  0
         return result;
 173  
     }
 174  
 
 175  
     public void shutdown() {
 176  0
     }
 177  
 
 178  
     public static void main(String[] args) throws UnsupportedEncodingException, ParserConfigurationException, SAXException, IOException, ClassNotFoundException {
 179  0
         Logger logger = Logger.getLogger(JobExecutor.class.toString());
 180  
 
 181  0
         String stageDescriptionFile = args[0];
 182  0
         logger.info("Initializing: " + stageDescriptionFile);
 183  0
         StageExecutionStatus context = new LocalStageExecutor().execute(stageDescriptionFile);
 184  
 
 185  0
         if (context.getExceptions().size() > 0) {
 186  0
             logger.severe("Exception thrown: " + context.getExceptions().get(0).toString());
 187  
         } else {
 188  0
             logger.info("Job complete");
 189  
         }
 190  0
     }
 191  
 }