1
2 package org.galagosearch.tupleflow.execution;
3
4 import java.io.BufferedWriter;
5 import java.io.File;
6 import java.io.FileInputStream;
7 import java.io.FileWriter;
8 import java.io.IOException;
9 import java.io.ObjectInputStream;
10 import java.io.UnsupportedEncodingException;
11 import java.util.ArrayList;
12 import java.util.Collections;
13 import java.util.List;
14 import java.util.logging.Logger;
15 import javax.xml.parsers.ParserConfigurationException;
16 import org.galagosearch.tupleflow.ExNihiloSource;
17 import org.xml.sax.SAXException;
18
19 /***
20 *
21 * @author trevor
22 */
23 public class LocalStageExecutor implements StageExecutor {
24 public static class SequentialExecutionContext implements StageExecutionStatus, Runnable {
25 String name;
26 List<StageInstanceDescription> instances;
27 ArrayList<Exception> exceptions = new ArrayList();
28
29 int queuedInstances = 0;
30 int runningInstances = 0;
31 int completedInstances = 0;
32 boolean done = false;
33
34 SequentialExecutionContext(String name, List<StageInstanceDescription> instances) {
35 this.name = name;
36 this.instances = instances;
37 this.queuedInstances = instances.size();
38 }
39
40 public synchronized void markDone() {
41 completedInstances = instances.size();
42 runningInstances = 0;
43 completedInstances = 0;
44 done = true;
45 }
46
47 public void run() {
48 try {
49 for (StageInstanceDescription instance : instances) {
50 NetworkedCounterManager counterManager = new NetworkedCounterManager();
51 counterManager.start();
52 synchronized(this) {
53 runningInstances++;
54 queuedInstances--;
55 }
56 NetworkedCounterManager manager = new NetworkedCounterManager();
57 StageInstanceFactory factory = new StageInstanceFactory(manager);
58 manager.start();
59 ExNihiloSource source = factory.instantiate(instance);
60 source.run();
61 manager.stop();
62 synchronized(this) {
63 runningInstances--;
64 completedInstances++;
65 }
66 }
67 } catch(Exception e) {
68 synchronized(this) {
69 exceptions.add(e);
70 }
71 } finally {
72 synchronized(this) {
73 done = true;
74 }
75 }
76 }
77
78 public String getName() {
79 return name;
80 }
81
82 public int getBlockedInstances() {
83 return 0;
84 }
85
86 public synchronized int getQueuedInstances() {
87 return queuedInstances;
88 }
89
90 public synchronized int getRunningInstances() {
91 return runningInstances;
92 }
93
94 public int getCompletedInstances() {
95 return completedInstances;
96 }
97
98 public synchronized boolean isDone() {
99 return done;
100 }
101
102 public synchronized List<Exception> getExceptions() {
103 return exceptions;
104 }
105
106 private synchronized void addException(Exception e) {
107 exceptions.add(e);
108 }
109 }
110
111 public SequentialExecutionContext execute(StageGroupDescription stage, String temporary) {
112 SequentialExecutionContext context =
113 new SequentialExecutionContext(stage.getName(), stage.getInstances());
114 context.run();
115 return context;
116 }
117
118 public SequentialExecutionContext execute(StageInstanceDescription stage) {
119 SequentialExecutionContext context =
120 new SequentialExecutionContext(stage.getName(), Collections.singletonList(stage));
121 context.run();
122 return context;
123 }
124
125 public StageExecutionStatus execute(String descriptionFile) {
126 File errorFile = new File(descriptionFile + ".error");
127 File completeFile = new File(descriptionFile + ".complete");
128 SequentialExecutionContext result = null;
129
130 Logger logger = Logger.getLogger(JobExecutor.class.toString());
131 StageInstanceDescription stage = null;
132
133
134 try {
135 ObjectInputStream stream =
136 new ObjectInputStream(new FileInputStream(new File(descriptionFile)));
137 stage = (StageInstanceDescription) stream.readObject();
138 } catch(Exception e) {
139 return new ErrorExecutionStatus("unknown", e);
140 }
141
142
143 if (completeFile.exists()) {
144 logger.info("Exiting early because a complete checkpoint was found.");
145 result = new SequentialExecutionContext(stage.getName(),
146 Collections.singletonList(stage));
147 result.markDone();
148 return result;
149 }
150
151
152 if (errorFile.exists()) {
153 errorFile.delete();
154 }
155
156 result = execute(stage);
157
158 try {
159 if (result.getExceptions().size() > 0) {
160 Throwable e = result.getExceptions().get(0);
161 BufferedWriter writer = new BufferedWriter(new FileWriter(errorFile));
162 writer.write(e.toString());
163 writer.close();
164 } else {
165 BufferedWriter writer = new BufferedWriter(new FileWriter(completeFile));
166 writer.close();
167 }
168 } catch(Exception e) {
169 logger.warning("Trouble writing completion/error files: " + errorFile.toString());
170 }
171
172 return result;
173 }
174
175 public void shutdown() {
176 }
177
178 public static void main(String[] args) throws UnsupportedEncodingException, ParserConfigurationException, SAXException, IOException, ClassNotFoundException {
179 Logger logger = Logger.getLogger(JobExecutor.class.toString());
180
181 String stageDescriptionFile = args[0];
182 logger.info("Initializing: " + stageDescriptionFile);
183 StageExecutionStatus context = new LocalStageExecutor().execute(stageDescriptionFile);
184
185 if (context.getExceptions().size() > 0) {
186 logger.severe("Exception thrown: " + context.getExceptions().get(0).toString());
187 } else {
188 logger.info("Job complete");
189 }
190 }
191 }