View Javadoc

1   // BSD License (http://www.galagosearch.org/license)
2   package org.galagosearch.tupleflow.execution;
3   
4   import java.lang.reflect.Method;
5   import java.util.ArrayList;
6   import java.util.HashMap;
7   import java.util.TreeMap;
8   import java.util.regex.Matcher;
9   import java.util.regex.Pattern;
10  import org.galagosearch.tupleflow.Parameters.Parser;
11  import org.galagosearch.tupleflow.Parameters;
12  import org.galagosearch.tupleflow.TupleFlowParameters;
13  import org.xml.sax.Attributes;
14  import org.xml.sax.Locator;
15  import org.xml.sax.SAXException;
16  import org.xml.sax.SAXParseException;
17  import org.xml.sax.helpers.DefaultHandler;
18  
19  /***
20   *
21   * @author trevor
22   */
23  public class JobConstructor extends DefaultHandler implements ErrorHandler {
24      Pattern propertyPattern = Pattern.compile("//$//{([^}]+)//}");
25      HashMap<String, String> properties = new HashMap<String, String>();
26      ArrayList<String> errors = new ArrayList<String>();
27      Locator locator;
28      JobHandler jobHandler;
29      ErrorStore store;
30      String fileName;
31  
32      /***
33       * Creates a new instance of JobConstructor
34       */
35      public JobConstructor(String fileName, ErrorStore store) {
36          this.fileName = fileName;
37          this.jobHandler = new JobHandler();
38          this.store = store;
39      }
40  
41      public JobConstructor(ErrorStore store) {
42          this.fileName = "none";
43          this.jobHandler = new JobHandler();
44          this.store = store;
45      }
46  
47      public Job getJob() {
48          return jobHandler.getJob();
49      }
50  
51      public ErrorStore getErrorStore() {
52          return store;
53      }
54  
55      @Override
56      public void setDocumentLocator(Locator locator) {
57          this.locator = locator;
58      }
59  
60      public void addError(String filename, SAXParseException exception) {
61          store.addError(new FileLocation(filename, exception.getLineNumber(), exception.
62                                          getColumnNumber()), exception.getMessage());
63      }
64  
65      public void addError(String errorString) {
66          store.addError(location(), errorString);
67      }
68  
69      public void addWarning(String warning) {
70          store.addWarning(location(), warning);
71      }
72  
73      @Override
74      public void endElement(String uri, String localName, String qName) throws SAXException {
75          jobHandler.endElement(uri, localName, qName);
76      }
77  
78      @Override
79      public void startElement(String uri, String localName, String qName, Attributes attributes) throws SAXException {
80          jobHandler.startElement(uri, localName, qName, attributes);
81      }
82  
83      private void verifyData(final Attributes attributes) {
84          String className = attributes.getValue("class");
85          String order = attributes.getValue("order");
86          String type = attributes.getValue("type");
87          String hash = attributes.getValue("hash");
88  
89          if (className == null) {
90              addError("The data tag requires a class argument.");
91              return;
92          }
93  
94          if (type != null && !(type.equals("many") || type.equals("split"))) {
95              addError("The type parameter, if specified, must be 'many' or 'split'.");
96              return;
97          }
98  
99          if (!Verification.isClassAvailable(className)) {
100             addError("Couldn't find class: " + className);
101             return;
102         }
103 
104         if (order != null && !Verification.isOrderAvailable(className, order.split(" "))) {
105             addError("Couldn't find order: " + order);
106         }
107 
108         if (hash != null && !Verification.isOrderAvailable(className, hash.split(" "))) {
109             addError("Couldn't find order: " + hash);
110         }
111     }
112 
113     private void verifyStep(final Attributes attributes, final TupleFlowParameters parameters) {
114         String className = attributes.getValue("class");
115 
116         if (className == null) {
117             addError("The step tag requires a class argument.");
118             return;
119         }
120 
121         if (!Verification.isClassAvailable(className)) {
122             addError("Couldn't find a class named " + className);
123         }
124 
125         try {
126             Class c = Class.forName(className);
127             Class[] parameterTypes = new Class[]{TupleFlowParameters.class, ErrorHandler.class};
128             Method m = c.getDeclaredMethod("verify", parameterTypes);
129             m.invoke(null, parameters, this);
130         } catch (Exception e) {
131             addError("Exception thrown during step verification: " + className + " " + e.getCause().
132                      getMessage());
133         }
134     }
135 
136     @Override
137     public void characters(char[] buffer, int offset, int length) throws SAXException {
138         jobHandler.characters(buffer, offset, length);
139     }
140 
141     public FileLocation location() {
142         return new FileLocation(fileName, locator);
143     }
144     // Handlers section
145     public class StageHandler extends StackHandler {
146         Stage stage = new Stage();
147 
148         @Override
149         public void startHandler(String uri, String localName, String qName, Attributes attributes) throws SAXException {
150             if (attributes.getValue("id") == null) {
151                 addError(
152                         "'id' is a required attribute of 'stage'.");
153             }
154             stage.name = attributes.getValue("id");
155             stage.location = location();
156         }
157 
158         @Override
159         public void endChild(StackHandler handler, String uri, String localName, String qName) throws SAXException {
160             if (handler instanceof StageConnectionsHandler) {
161                 stage.connections = ((StageConnectionsHandler) handler).getConnectionPoints();
162             } else if (handler instanceof StepsHandler) {
163                 stage.steps = ((StepsHandler) handler).getSteps();
164             }
165         }
166 
167         @Override
168         public void unhandledStartElement(String uri, String localName, String qName, Attributes attributes) throws SAXException {
169             if (qName.equals("steps")) {
170                 addHandler(new StepsHandler(), uri, localName, qName, attributes);
171             } else if (qName.equals("connections")) {
172                 addHandler(new StageConnectionsHandler(), uri, localName, qName, attributes);
173             } else {
174                 addError("Unrecognized tag: '" + qName + "', expecting 'steps' or 'connections'.");
175                 addHandler(new StackHandler()); // ignore subtags
176             }
177         }
178 
179         private Stage getStage() {
180             return stage;
181         }
182     }
183 
184     public class StageConnectionsHandler extends StackHandler {
185         HashMap<String, StageConnectionPoint> connectionPoints = new HashMap<String, StageConnectionPoint>();
186 
187         public HashMap<String, StageConnectionPoint> getConnectionPoints() {
188             return connectionPoints;
189         }
190 
191         @Override
192         public void unhandledStartElement(String uri, String localName, String qName, Attributes attributes) throws SAXException {
193             String id = attributes.getValue("id");
194             String clazz = attributes.getValue("class");
195             String orderSpec = attributes.getValue("order");
196             String[] order = new String[0];
197 
198             if (!(qName.equals("input") || qName.equals("output"))) {
199                 addError("Expected 'input' or 'output', not " + qName);
200                 return;
201             }
202 
203             if (id == null) {
204                 addError("'id' is a required attribute of '" + qName + "'.");
205                 return;
206             }
207 
208             if (clazz == null) {
209                 addError("'class' is a required attribute of '" + qName + "'.");
210                 return;
211             }
212 
213             if (orderSpec != null) {
214                 order = orderSpec.split(" ");
215             }
216             if (qName.equals("input")) {
217                 connectionPoints.put(id,
218                                      new StageConnectionPoint(ConnectionPointType.Input,
219                                                               id, clazz, order,
220                                                               location()));
221             } else if (qName.equals("output")) {
222                 connectionPoints.put(id,
223                                      new StageConnectionPoint(ConnectionPointType.Output,
224                                                               id, clazz, order,
225                                                               location()));
226             } else {
227                 addError("Tag '" + qName + "' isn't legal in the connections section of a stage.");
228             }
229         }
230     }
231 
232     public class JobHandler extends StackHandler {
233         Job job = new Job();
234 
235         @Override
236         public void endChild(StackHandler handler, String uri, String localName, String qName) throws SAXException {
237             if (handler instanceof ConnectionsHandler) {
238                 job.connections = ((ConnectionsHandler) handler).getConnections();
239             } else if (handler instanceof StagesHandler) {
240                 job.stages = ((StagesHandler) handler).getStages();
241             }
242         }
243 
244         @Override
245         public void unhandledStartElement(String uri, String localName, String qName, Attributes attributes) throws SAXException {
246             if (qName.equals("job")) {
247                 // do nothing
248             } else if (qName.equals("connections")) {
249                 addHandler(new ConnectionsHandler(), uri, localName, qName, attributes);
250             } else if (qName.equals("stages")) {
251                 addHandler(new StagesHandler(), uri, localName, qName, attributes);
252             } else if (qName.equals("property")) {
253                 String name = attributes.getValue("name");
254                 String value = attributes.getValue("value");
255 
256                 if (name == null || value == null) {
257                     addError("The 'property' tag requries 'name' and 'value' attributes.");
258                 } else {
259                     job.properties.put(name, value);
260                     properties.put(name, value);
261                 }
262             } else {
263                 addError("Unrecognized tag: '" + qName + "', expecting 'connections' or 'stages'.");
264                 addHandler(new StackHandler()); // ignore subtags
265             }
266         }
267 
268         public Job getJob() {
269             return job;
270         }
271     }
272 
273     public class StagesHandler extends StackHandler {
274         TreeMap<String, Stage> stages = new TreeMap<String, Stage>();
275 
276         @Override
277         public void endChild(StackHandler handler, String uri, String localName, String qName) throws SAXException {
278             if (handler instanceof StageHandler) {
279                 Stage s = ((StageHandler) handler).getStage();
280                 if (s.name != null) {
281                     stages.put(s.name, s);
282                 }
283             }
284         }
285 
286         @Override
287         public void unhandledStartElement(String uri, String localName, String qName, Attributes attributes) throws SAXException {
288             if (qName.equals("stage")) {
289                 addHandler(new StageHandler(), uri, localName, qName, attributes);
290             } else {
291                 addError("Unrecognized tag: '" + qName + "', expecting 'stage'.");
292                 addHandler(new StackHandler()); // ignore subtags
293             }
294         }
295 
296         private TreeMap<String, Stage> getStages() {
297             return stages;
298         }
299     }
300 
301     public class StepsHandler extends StackHandler {
302         ArrayList<Step> steps = new ArrayList<Step>();
303 
304         public ArrayList<Step> getSteps() {
305             return steps;
306         }
307 
308         @Override
309         public void endChild(StackHandler handler, String uri, String localName, String qName) throws SAXException {
310             if (handler instanceof StepHandler) {
311                 steps.add(((StepHandler) handler).getStep());
312             } else if (handler instanceof MultiHandler) {
313                 steps.add(((MultiHandler) handler).getStep());
314             } else {
315                 addError("Unknown handler type: " + handler.getClass().toString());
316             }
317         }
318 
319         @Override
320         public void unhandledStartElement(String uri, String localName, String qName, Attributes attributes) throws SAXException {
321             if (qName.equals("step")) {
322                 addHandler(new StepHandler(), uri, localName, qName, attributes);
323             } else if (qName.equals("multi")) {
324                 addHandler(new MultiHandler(), uri, localName, qName, attributes);
325             } else if (qName.equals("output")) {
326                 if (attributes.getValue("id") == null) {
327                     addError(
328                             "'output' requires an 'id' attribute.");
329                 }
330                 steps.add(new OutputStep(location(), attributes.getValue("id")));
331             } else if (qName.equals("input")) {
332                 if (attributes.getValue("id") == null) {
333                     addError(
334                             "'input' requires an 'id' attribute.");
335                 }
336                 steps.add(new InputStep(location(), attributes.getValue("id")));
337             } else {
338                 addError(
339                         "Found '" + qName + "', but was expecting 'step', 'multi', 'input' or 'output'.");
340             }
341         }
342     }
343 
344     public class StepHandler extends StackHandler {
345         Step step;
346         Parameters parameters = new Parameters();
347         Parser parametersHandler;
348 
349         public StepHandler() throws SAXException {
350             parametersHandler = parameters.getParseHandler();
351             parametersHandler.startElement("", "", "parameters", null);
352         }
353 
354         @Override
355         public void startHandler(String uri, String localName, String qName, Attributes attributes) throws SAXException {
356             String className = attributes.getValue("class");
357 
358             if (className == null) {
359                 addError("'class' is a required attribute of 'step'.");
360             }
361 
362             step = new Step(location(), className, parameters);
363         }
364 
365         @Override
366         public void unhandledCharacters(char[] buffer, int offset, int length) throws SAXException {
367             String item = new String(buffer, offset, length);
368             Matcher m = propertyPattern.matcher(item);
369             StringBuffer stringBuffer = new StringBuffer();
370 
371             while (m.find()) {
372                 String key = m.group(1);
373                 String value = properties.get(key);
374 
375                 if (value == null) {
376                     addError("Couldn't find a property named '" + key + "'");
377                     m.appendReplacement(stringBuffer, "");
378                 } else {
379                     m.appendReplacement(stringBuffer, value);
380                 }
381             }
382             m.appendTail(stringBuffer);
383 
384             buffer = stringBuffer.toString().toCharArray();
385             parametersHandler.characters(buffer, 0, buffer.length);
386         }
387 
388         @Override
389         public void unhandledStartElement(String uri, String localName, String qName, Attributes attributes) throws SAXException {
390             parametersHandler.startElement(uri, localName, qName, attributes);
391         }
392 
393         @Override
394         public void unhandledEndElement(String uri, String localName, String qName) throws SAXException {
395             parametersHandler.endElement(uri, localName, qName);
396         }
397 
398         public Step getStep() {
399             return step;
400         }
401     }
402 
403     public class MultiHandler extends StackHandler {
404         MultiStep multi = new MultiStep();
405 
406         public MultiStep getStep() {
407             return multi;
408         }
409 
410         @Override
411         public void endChild(StackHandler handler, String uri, String localName, String qName) throws SAXException {
412             ArrayList<Step> steps = ((StepsHandler) handler).getSteps();
413             multi.groups.add(steps);
414         }
415 
416         @Override
417         public void unhandledStartElement(String uri, String localName, String qName, Attributes attributes) throws SAXException {
418             if (!qName.equals("group")) {
419                 addError("Found '" + qName + "' but was expecting 'group'.");
420                 return;
421             }
422 
423             addHandler(new StepsHandler());
424         }
425     }
426 
427     public class ConnectionsHandler extends StackHandler {
428         public ArrayList<Connection> connections = new ArrayList<Connection>();
429 
430         public ArrayList<Connection> getConnections() {
431             return connections;
432         }
433 
434         @Override
435         public void unhandledStartElement(String uri, String localName, String qName, Attributes attributes) throws SAXException {
436             if (!qName.equals("connection")) {
437                 addError("Found '" + qName + "' but expected 'connection'");
438             }
439 
440             addHandler(new ConnectionHandler(), uri, localName, qName, attributes);
441         }
442 
443         @Override
444         public void endChild(StackHandler handler, String uri, String localName, String qName) throws SAXException {
445             connections.add(((ConnectionHandler) handler).getConnection());
446         }
447     }
448 
449     public class ConnectionHandler extends StackHandler {
450         public Connection connection;
451 
452         public Connection getConnection() {
453             return connection;
454         }
455 
456         @Override
457         public void startHandler(String uri, String localName, String qName, Attributes attributes) throws SAXException {
458             String className = attributes.getValue("class");
459 
460             if (className == null) {
461                 addError("'class' is a required attribute of a connection.");
462             }
463 
464             String orderSpec = attributes.getValue("order");
465             String[] order = new String[0];
466 
467             if (orderSpec == null) {
468                 addError("'order' is a required attribute of a connection.");
469             } else {
470                 order = orderSpec.split(" ");
471             }
472 
473             String hashSpec = attributes.getValue("hash");
474             String[] hash = null;
475 
476             if (hashSpec != null) {
477                 hash = hashSpec.split(" ");
478             }
479             String hashCount = attributes.getValue("hashCount");
480             int count = -1;
481 
482             if (hashCount != null) {
483                 try {
484                     count = Integer.parseInt(hashCount);
485                 } catch (NumberFormatException e) {
486                     addError(
487                             "Expected a numeric argument for 'count', but saw '" + hashCount + "' instead.");
488                 }
489             }
490 
491             Verification.requireClass(className, JobConstructor.this);
492             connection = new Connection(location(), className, order, hash, count);
493         }
494 
495         @Override
496         public void unhandledStartElement(String uri, String localName, String qName, Attributes attributes) throws SAXException {
497             String stageName = attributes.getValue("stage");
498             String pointName = attributes.getValue("endpoint");
499 
500             if (!qName.equals("input") && !qName.equals("output")) {
501                 addError("Found '" + qName + "' but expected 'input' or 'output'.");
502                 return;
503             }
504 
505             if (stageName == null) {
506                 addError("'stage' is a required attribute of '" + qName + "'.");
507                 return;
508             }
509 
510             if (pointName == null) {
511                 addError("'endpoint' is a required attribute of '" + qName + "'.");
512                 return;
513             }
514 
515             if (qName.equals("input")) {
516                 connection.inputs.add(new ConnectionEndPoint(location(), stageName, pointName,
517                                                              ConnectionPointType.Input));
518             } else if (qName.equals("output")) {
519                 String assignmentString = attributes.getValue("assignment");
520 
521                 if (assignmentString == null) {
522                     addError("'assignment' is a required attribute of 'output'.");
523                     return;
524                 }
525 
526                 ConnectionAssignmentType assignment;
527 
528                 if (assignmentString.equals("one")) {
529                     assignment = ConnectionAssignmentType.One;
530                 } else if (assignmentString.equals("each")) {
531                     assignment = ConnectionAssignmentType.Each;
532                 } else if (assignmentString.equals("combined")) {
533                     assignment = ConnectionAssignmentType.Combined;
534                 } else {
535                     addError("'assignment' needs to be either 'one', 'each', or 'combined' (not '" +
536                              assignmentString + "').");
537                     return;
538                 }
539 
540                 ConnectionEndPoint point = new ConnectionEndPoint(location(),
541                                                                   stageName,
542                                                                   pointName,
543                                                                   assignment,
544                                                                   ConnectionPointType.Output);
545 
546                 connection.outputs.add(point);
547             }
548         }
549     }
550 }