View Javadoc

1   // BSD License (http://www.galagosearch.org/license)
2   package org.galagosearch.tupleflow.execution;
3   
4   import java.io.BufferedWriter;
5   import java.io.File;
6   import java.io.FileInputStream;
7   import java.io.FileWriter;
8   import java.io.IOException;
9   import java.io.ObjectInputStream;
10  import java.io.UnsupportedEncodingException;
11  import java.util.ArrayList;
12  import java.util.Collections;
13  import java.util.List;
14  import java.util.logging.Logger;
15  import javax.xml.parsers.ParserConfigurationException;
16  import org.galagosearch.tupleflow.ExNihiloSource;
17  import org.xml.sax.SAXException;
18  
19  /***
20   *
21   * @author trevor
22   */
23  public class LocalStageExecutor implements StageExecutor {
24      public static class SequentialExecutionContext implements StageExecutionStatus, Runnable {
25          String name;
26          List<StageInstanceDescription> instances;
27          ArrayList<Exception> exceptions = new ArrayList();
28  
29          int queuedInstances = 0;
30          int runningInstances = 0;
31          int completedInstances = 0;
32          boolean done = false;
33  
34          SequentialExecutionContext(String name, List<StageInstanceDescription> instances) {
35              this.name = name;
36              this.instances = instances;
37              this.queuedInstances = instances.size();
38          }
39  
40          public synchronized void markDone() {
41              completedInstances = instances.size();
42              runningInstances = 0;
43              completedInstances = 0;
44              done = true;
45          }
46  
47          public void run() {
48              try {
49                  for (StageInstanceDescription instance : instances) {
50                      NetworkedCounterManager counterManager = new NetworkedCounterManager();
51                      counterManager.start();
52                      synchronized(this) {
53                          runningInstances++;
54                          queuedInstances--;
55                      }
56                      NetworkedCounterManager manager = new NetworkedCounterManager();
57                      StageInstanceFactory factory = new StageInstanceFactory(manager);
58                      manager.start();
59                      ExNihiloSource source = factory.instantiate(instance);
60                      source.run();
61                      manager.stop();
62                      synchronized(this) {
63                          runningInstances--;
64                          completedInstances++;
65                      }
66                  }
67              } catch(Exception e) {
68                  synchronized(this) {
69                      exceptions.add(e);
70                  }
71              } finally {
72                  synchronized(this) {
73                      done = true;
74                  }
75              }
76          }
77  
78          public String getName() {
79              return name;
80          }
81  
82          public int getBlockedInstances() {
83              return 0;
84          }
85  
86          public synchronized int getQueuedInstances() {
87              return queuedInstances;
88          }
89  
90          public synchronized int getRunningInstances() {
91              return runningInstances;
92          }
93  
94          public int getCompletedInstances() {
95              return completedInstances;
96          }
97  
98          public synchronized boolean isDone() {
99              return done;
100         }
101 
102         public synchronized List<Exception> getExceptions() {
103             return exceptions;
104         }
105 
106         private synchronized void addException(Exception e) {
107             exceptions.add(e);
108         }
109     }
110 
111     public SequentialExecutionContext execute(StageGroupDescription stage, String temporary) {
112         SequentialExecutionContext context =
113                 new SequentialExecutionContext(stage.getName(), stage.getInstances());
114         context.run();
115         return context;
116     }
117 
118     public SequentialExecutionContext execute(StageInstanceDescription stage) {
119         SequentialExecutionContext context =
120                 new SequentialExecutionContext(stage.getName(), Collections.singletonList(stage));
121         context.run();
122         return context;
123     }
124 
125     public StageExecutionStatus execute(String descriptionFile) {
126         File errorFile = new File(descriptionFile + ".error");
127         File completeFile = new File(descriptionFile + ".complete");
128         SequentialExecutionContext result = null;
129 
130         Logger logger = Logger.getLogger(JobExecutor.class.toString());
131         StageInstanceDescription stage = null;
132 
133         // try to parse the stage description from disk
134         try {
135             ObjectInputStream stream =
136                     new ObjectInputStream(new FileInputStream(new File(descriptionFile)));
137             stage = (StageInstanceDescription) stream.readObject();
138         } catch(Exception e) {
139             return new ErrorExecutionStatus("unknown", e);
140         }
141         
142         // check for a checkpoint file.  If one exists, we quit.
143         if (completeFile.exists()) {
144             logger.info("Exiting early because a complete checkpoint was found.");
145             result = new SequentialExecutionContext(stage.getName(),
146                     Collections.singletonList(stage));
147             result.markDone();
148             return result;
149         }
150 
151         // get rid of any error checkpoints
152         if (errorFile.exists()) {
153             errorFile.delete();
154         }
155 
156         result = execute(stage);
157 
158         try {
159             if (result.getExceptions().size() > 0) {
160                 Throwable e = result.getExceptions().get(0);
161                 BufferedWriter writer = new BufferedWriter(new FileWriter(errorFile));
162                 writer.write(e.toString());
163                 writer.close();
164             } else {
165                 BufferedWriter writer = new BufferedWriter(new FileWriter(completeFile));
166                 writer.close();
167             }
168         } catch(Exception e) {
169             logger.warning("Trouble writing completion/error files: " + errorFile.toString());
170         }
171 
172         return result;
173     }
174 
175     public void shutdown() {
176     }
177 
178     public static void main(String[] args) throws UnsupportedEncodingException, ParserConfigurationException, SAXException, IOException, ClassNotFoundException {
179         Logger logger = Logger.getLogger(JobExecutor.class.toString());
180 
181         String stageDescriptionFile = args[0];
182         logger.info("Initializing: " + stageDescriptionFile);
183         StageExecutionStatus context = new LocalStageExecutor().execute(stageDescriptionFile);
184 
185         if (context.getExceptions().size() > 0) {
186             logger.severe("Exception thrown: " + context.getExceptions().get(0).toString());
187         } else {
188             logger.info("Job complete");
189         }
190     }
191 }