1
2 package org.galagosearch.tupleflow.execution;
3
4 import java.lang.reflect.Method;
5 import java.util.ArrayList;
6 import java.util.HashMap;
7 import java.util.TreeMap;
8 import java.util.regex.Matcher;
9 import java.util.regex.Pattern;
10 import org.galagosearch.tupleflow.Parameters.Parser;
11 import org.galagosearch.tupleflow.Parameters;
12 import org.galagosearch.tupleflow.TupleFlowParameters;
13 import org.xml.sax.Attributes;
14 import org.xml.sax.Locator;
15 import org.xml.sax.SAXException;
16 import org.xml.sax.SAXParseException;
17 import org.xml.sax.helpers.DefaultHandler;
18
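/**
 * SAX content handler that builds a {@link Job} from an XML job description,
 * recording any errors and warnings it finds in an {@link ErrorStore}.
 *
 * @author trevor
 */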
23 public class JobConstructor extends DefaultHandler implements ErrorHandler {
24 Pattern propertyPattern = Pattern.compile("//$//{([^}]+)//}");
25 HashMap<String, String> properties = new HashMap<String, String>();
26 ArrayList<String> errors = new ArrayList<String>();
27 Locator locator;
28 JobHandler jobHandler;
29 ErrorStore store;
30 String fileName;
31
/**
 * Creates a JobConstructor that reports problems against the given file name.
 *
 * @param fileName name recorded in the locations of errors and warnings
 * @param store    receiver of every error and warning raised while parsing
 */
public JobConstructor(String fileName, ErrorStore store) {
    this.store = store;
    this.fileName = fileName;
    this.jobHandler = new JobHandler();
}
40
/**
 * Creates a JobConstructor for input that has no file name; the placeholder
 * name "none" is used in reported locations instead.
 *
 * @param store receiver of every error and warning raised while parsing
 */
public JobConstructor(ErrorStore store) {
    this("none", store);
}
46
47 public Job getJob() {
48 return jobHandler.getJob();
49 }
50
51 public ErrorStore getErrorStore() {
52 return store;
53 }
54
55 @Override
56 public void setDocumentLocator(Locator locator) {
57 this.locator = locator;
58 }
59
60 public void addError(String filename, SAXParseException exception) {
61 store.addError(new FileLocation(filename, exception.getLineNumber(), exception.
62 getColumnNumber()), exception.getMessage());
63 }
64
65 public void addError(String errorString) {
66 store.addError(location(), errorString);
67 }
68
69 public void addWarning(String warning) {
70 store.addWarning(location(), warning);
71 }
72
73 @Override
74 public void endElement(String uri, String localName, String qName) throws SAXException {
75 jobHandler.endElement(uri, localName, qName);
76 }
77
78 @Override
79 public void startElement(String uri, String localName, String qName, Attributes attributes) throws SAXException {
80 jobHandler.startElement(uri, localName, qName, attributes);
81 }
82
83 private void verifyData(final Attributes attributes) {
84 String className = attributes.getValue("class");
85 String order = attributes.getValue("order");
86 String type = attributes.getValue("type");
87 String hash = attributes.getValue("hash");
88
89 if (className == null) {
90 addError("The data tag requires a class argument.");
91 return;
92 }
93
94 if (type != null && !(type.equals("many") || type.equals("split"))) {
95 addError("The type parameter, if specified, must be 'many' or 'split'.");
96 return;
97 }
98
99 if (!Verification.isClassAvailable(className)) {
100 addError("Couldn't find class: " + className);
101 return;
102 }
103
104 if (order != null && !Verification.isOrderAvailable(className, order.split(" "))) {
105 addError("Couldn't find order: " + order);
106 }
107
108 if (hash != null && !Verification.isOrderAvailable(className, hash.split(" "))) {
109 addError("Couldn't find order: " + hash);
110 }
111 }
112
113 private void verifyStep(final Attributes attributes, final TupleFlowParameters parameters) {
114 String className = attributes.getValue("class");
115
116 if (className == null) {
117 addError("The step tag requires a class argument.");
118 return;
119 }
120
121 if (!Verification.isClassAvailable(className)) {
122 addError("Couldn't find a class named " + className);
123 }
124
125 try {
126 Class c = Class.forName(className);
127 Class[] parameterTypes = new Class[]{TupleFlowParameters.class, ErrorHandler.class};
128 Method m = c.getDeclaredMethod("verify", parameterTypes);
129 m.invoke(null, parameters, this);
130 } catch (Exception e) {
131 addError("Exception thrown during step verification: " + className + " " + e.getCause().
132 getMessage());
133 }
134 }
135
136 @Override
137 public void characters(char[] buffer, int offset, int length) throws SAXException {
138 jobHandler.characters(buffer, offset, length);
139 }
140
141 public FileLocation location() {
142 return new FileLocation(fileName, locator);
143 }
144
145 public class StageHandler extends StackHandler {
146 Stage stage = new Stage();
147
148 @Override
149 public void startHandler(String uri, String localName, String qName, Attributes attributes) throws SAXException {
150 if (attributes.getValue("id") == null) {
151 addError(
152 "'id' is a required attribute of 'stage'.");
153 }
154 stage.name = attributes.getValue("id");
155 stage.location = location();
156 }
157
158 @Override
159 public void endChild(StackHandler handler, String uri, String localName, String qName) throws SAXException {
160 if (handler instanceof StageConnectionsHandler) {
161 stage.connections = ((StageConnectionsHandler) handler).getConnectionPoints();
162 } else if (handler instanceof StepsHandler) {
163 stage.steps = ((StepsHandler) handler).getSteps();
164 }
165 }
166
167 @Override
168 public void unhandledStartElement(String uri, String localName, String qName, Attributes attributes) throws SAXException {
169 if (qName.equals("steps")) {
170 addHandler(new StepsHandler(), uri, localName, qName, attributes);
171 } else if (qName.equals("connections")) {
172 addHandler(new StageConnectionsHandler(), uri, localName, qName, attributes);
173 } else {
174 addError("Unrecognized tag: '" + qName + "', expecting 'steps' or 'connections'.");
175 addHandler(new StackHandler());
176 }
177 }
178
179 private Stage getStage() {
180 return stage;
181 }
182 }
183
184 public class StageConnectionsHandler extends StackHandler {
185 HashMap<String, StageConnectionPoint> connectionPoints = new HashMap<String, StageConnectionPoint>();
186
187 public HashMap<String, StageConnectionPoint> getConnectionPoints() {
188 return connectionPoints;
189 }
190
191 @Override
192 public void unhandledStartElement(String uri, String localName, String qName, Attributes attributes) throws SAXException {
193 String id = attributes.getValue("id");
194 String clazz = attributes.getValue("class");
195 String orderSpec = attributes.getValue("order");
196 String[] order = new String[0];
197
198 if (!(qName.equals("input") || qName.equals("output"))) {
199 addError("Expected 'input' or 'output', not " + qName);
200 return;
201 }
202
203 if (id == null) {
204 addError("'id' is a required attribute of '" + qName + "'.");
205 return;
206 }
207
208 if (clazz == null) {
209 addError("'class' is a required attribute of '" + qName + "'.");
210 return;
211 }
212
213 if (orderSpec != null) {
214 order = orderSpec.split(" ");
215 }
216 if (qName.equals("input")) {
217 connectionPoints.put(id,
218 new StageConnectionPoint(ConnectionPointType.Input,
219 id, clazz, order,
220 location()));
221 } else if (qName.equals("output")) {
222 connectionPoints.put(id,
223 new StageConnectionPoint(ConnectionPointType.Output,
224 id, clazz, order,
225 location()));
226 } else {
227 addError("Tag '" + qName + "' isn't legal in the connections section of a stage.");
228 }
229 }
230 }
231
232 public class JobHandler extends StackHandler {
233 Job job = new Job();
234
235 @Override
236 public void endChild(StackHandler handler, String uri, String localName, String qName) throws SAXException {
237 if (handler instanceof ConnectionsHandler) {
238 job.connections = ((ConnectionsHandler) handler).getConnections();
239 } else if (handler instanceof StagesHandler) {
240 job.stages = ((StagesHandler) handler).getStages();
241 }
242 }
243
244 @Override
245 public void unhandledStartElement(String uri, String localName, String qName, Attributes attributes) throws SAXException {
246 if (qName.equals("job")) {
247
248 } else if (qName.equals("connections")) {
249 addHandler(new ConnectionsHandler(), uri, localName, qName, attributes);
250 } else if (qName.equals("stages")) {
251 addHandler(new StagesHandler(), uri, localName, qName, attributes);
252 } else if (qName.equals("property")) {
253 String name = attributes.getValue("name");
254 String value = attributes.getValue("value");
255
256 if (name == null || value == null) {
257 addError("The 'property' tag requries 'name' and 'value' attributes.");
258 } else {
259 job.properties.put(name, value);
260 properties.put(name, value);
261 }
262 } else {
263 addError("Unrecognized tag: '" + qName + "', expecting 'connections' or 'stages'.");
264 addHandler(new StackHandler());
265 }
266 }
267
268 public Job getJob() {
269 return job;
270 }
271 }
272
273 public class StagesHandler extends StackHandler {
274 TreeMap<String, Stage> stages = new TreeMap<String, Stage>();
275
276 @Override
277 public void endChild(StackHandler handler, String uri, String localName, String qName) throws SAXException {
278 if (handler instanceof StageHandler) {
279 Stage s = ((StageHandler) handler).getStage();
280 if (s.name != null) {
281 stages.put(s.name, s);
282 }
283 }
284 }
285
286 @Override
287 public void unhandledStartElement(String uri, String localName, String qName, Attributes attributes) throws SAXException {
288 if (qName.equals("stage")) {
289 addHandler(new StageHandler(), uri, localName, qName, attributes);
290 } else {
291 addError("Unrecognized tag: '" + qName + "', expecting 'stage'.");
292 addHandler(new StackHandler());
293 }
294 }
295
296 private TreeMap<String, Stage> getStages() {
297 return stages;
298 }
299 }
300
301 public class StepsHandler extends StackHandler {
302 ArrayList<Step> steps = new ArrayList<Step>();
303
304 public ArrayList<Step> getSteps() {
305 return steps;
306 }
307
308 @Override
309 public void endChild(StackHandler handler, String uri, String localName, String qName) throws SAXException {
310 if (handler instanceof StepHandler) {
311 steps.add(((StepHandler) handler).getStep());
312 } else if (handler instanceof MultiHandler) {
313 steps.add(((MultiHandler) handler).getStep());
314 } else {
315 addError("Unknown handler type: " + handler.getClass().toString());
316 }
317 }
318
319 @Override
320 public void unhandledStartElement(String uri, String localName, String qName, Attributes attributes) throws SAXException {
321 if (qName.equals("step")) {
322 addHandler(new StepHandler(), uri, localName, qName, attributes);
323 } else if (qName.equals("multi")) {
324 addHandler(new MultiHandler(), uri, localName, qName, attributes);
325 } else if (qName.equals("output")) {
326 if (attributes.getValue("id") == null) {
327 addError(
328 "'output' requires an 'id' attribute.");
329 }
330 steps.add(new OutputStep(location(), attributes.getValue("id")));
331 } else if (qName.equals("input")) {
332 if (attributes.getValue("id") == null) {
333 addError(
334 "'input' requires an 'id' attribute.");
335 }
336 steps.add(new InputStep(location(), attributes.getValue("id")));
337 } else {
338 addError(
339 "Found '" + qName + "', but was expecting 'step', 'multi', 'input' or 'output'.");
340 }
341 }
342 }
343
344 public class StepHandler extends StackHandler {
345 Step step;
346 Parameters parameters = new Parameters();
347 Parser parametersHandler;
348
349 public StepHandler() throws SAXException {
350 parametersHandler = parameters.getParseHandler();
351 parametersHandler.startElement("", "", "parameters", null);
352 }
353
354 @Override
355 public void startHandler(String uri, String localName, String qName, Attributes attributes) throws SAXException {
356 String className = attributes.getValue("class");
357
358 if (className == null) {
359 addError("'class' is a required attribute of 'step'.");
360 }
361
362 step = new Step(location(), className, parameters);
363 }
364
365 @Override
366 public void unhandledCharacters(char[] buffer, int offset, int length) throws SAXException {
367 String item = new String(buffer, offset, length);
368 Matcher m = propertyPattern.matcher(item);
369 StringBuffer stringBuffer = new StringBuffer();
370
371 while (m.find()) {
372 String key = m.group(1);
373 String value = properties.get(key);
374
375 if (value == null) {
376 addError("Couldn't find a property named '" + key + "'");
377 m.appendReplacement(stringBuffer, "");
378 } else {
379 m.appendReplacement(stringBuffer, value);
380 }
381 }
382 m.appendTail(stringBuffer);
383
384 buffer = stringBuffer.toString().toCharArray();
385 parametersHandler.characters(buffer, 0, buffer.length);
386 }
387
388 @Override
389 public void unhandledStartElement(String uri, String localName, String qName, Attributes attributes) throws SAXException {
390 parametersHandler.startElement(uri, localName, qName, attributes);
391 }
392
393 @Override
394 public void unhandledEndElement(String uri, String localName, String qName) throws SAXException {
395 parametersHandler.endElement(uri, localName, qName);
396 }
397
398 public Step getStep() {
399 return step;
400 }
401 }
402
403 public class MultiHandler extends StackHandler {
404 MultiStep multi = new MultiStep();
405
406 public MultiStep getStep() {
407 return multi;
408 }
409
410 @Override
411 public void endChild(StackHandler handler, String uri, String localName, String qName) throws SAXException {
412 ArrayList<Step> steps = ((StepsHandler) handler).getSteps();
413 multi.groups.add(steps);
414 }
415
416 @Override
417 public void unhandledStartElement(String uri, String localName, String qName, Attributes attributes) throws SAXException {
418 if (!qName.equals("group")) {
419 addError("Found '" + qName + "' but was expecting 'group'.");
420 return;
421 }
422
423 addHandler(new StepsHandler());
424 }
425 }
426
427 public class ConnectionsHandler extends StackHandler {
428 public ArrayList<Connection> connections = new ArrayList<Connection>();
429
430 public ArrayList<Connection> getConnections() {
431 return connections;
432 }
433
434 @Override
435 public void unhandledStartElement(String uri, String localName, String qName, Attributes attributes) throws SAXException {
436 if (!qName.equals("connection")) {
437 addError("Found '" + qName + "' but expected 'connection'");
438 }
439
440 addHandler(new ConnectionHandler(), uri, localName, qName, attributes);
441 }
442
443 @Override
444 public void endChild(StackHandler handler, String uri, String localName, String qName) throws SAXException {
445 connections.add(((ConnectionHandler) handler).getConnection());
446 }
447 }
448
449 public class ConnectionHandler extends StackHandler {
450 public Connection connection;
451
452 public Connection getConnection() {
453 return connection;
454 }
455
456 @Override
457 public void startHandler(String uri, String localName, String qName, Attributes attributes) throws SAXException {
458 String className = attributes.getValue("class");
459
460 if (className == null) {
461 addError("'class' is a required attribute of a connection.");
462 }
463
464 String orderSpec = attributes.getValue("order");
465 String[] order = new String[0];
466
467 if (orderSpec == null) {
468 addError("'order' is a required attribute of a connection.");
469 } else {
470 order = orderSpec.split(" ");
471 }
472
473 String hashSpec = attributes.getValue("hash");
474 String[] hash = null;
475
476 if (hashSpec != null) {
477 hash = hashSpec.split(" ");
478 }
479 String hashCount = attributes.getValue("hashCount");
480 int count = -1;
481
482 if (hashCount != null) {
483 try {
484 count = Integer.parseInt(hashCount);
485 } catch (NumberFormatException e) {
486 addError(
487 "Expected a numeric argument for 'count', but saw '" + hashCount + "' instead.");
488 }
489 }
490
491 Verification.requireClass(className, JobConstructor.this);
492 connection = new Connection(location(), className, order, hash, count);
493 }
494
495 @Override
496 public void unhandledStartElement(String uri, String localName, String qName, Attributes attributes) throws SAXException {
497 String stageName = attributes.getValue("stage");
498 String pointName = attributes.getValue("endpoint");
499
500 if (!qName.equals("input") && !qName.equals("output")) {
501 addError("Found '" + qName + "' but expected 'input' or 'output'.");
502 return;
503 }
504
505 if (stageName == null) {
506 addError("'stage' is a required attribute of '" + qName + "'.");
507 return;
508 }
509
510 if (pointName == null) {
511 addError("'endpoint' is a required attribute of '" + qName + "'.");
512 return;
513 }
514
515 if (qName.equals("input")) {
516 connection.inputs.add(new ConnectionEndPoint(location(), stageName, pointName,
517 ConnectionPointType.Input));
518 } else if (qName.equals("output")) {
519 String assignmentString = attributes.getValue("assignment");
520
521 if (assignmentString == null) {
522 addError("'assignment' is a required attribute of 'output'.");
523 return;
524 }
525
526 ConnectionAssignmentType assignment;
527
528 if (assignmentString.equals("one")) {
529 assignment = ConnectionAssignmentType.One;
530 } else if (assignmentString.equals("each")) {
531 assignment = ConnectionAssignmentType.Each;
532 } else if (assignmentString.equals("combined")) {
533 assignment = ConnectionAssignmentType.Combined;
534 } else {
535 addError("'assignment' needs to be either 'one', 'each', or 'combined' (not '" +
536 assignmentString + "').");
537 return;
538 }
539
540 ConnectionEndPoint point = new ConnectionEndPoint(location(),
541 stageName,
542 pointName,
543 assignment,
544 ConnectionPointType.Output);
545
546 connection.outputs.add(point);
547 }
548 }
549 }
550 }