View Javadoc

1   // BSD License (http://www.galagosearch.org/license)
2   
3   package org.galagosearch.tupleflow.execution;
4   
5   import java.io.Serializable;
6   import java.util.ArrayList;
7   import java.util.Collection;
8   import java.util.HashMap;
9   import java.util.HashSet;
10  import java.util.Map;
11  import java.util.TreeMap;
12  
13  /***
14   * A Job object specifies a TupleFlow execution: the objects used, their parameters,
15   * and how they communicate.  A Job can be specified in XML (and parsed with JobConstructor),
16   * or in code.
17   * 
18   * @author trevor
19   */
20  public class Job implements Serializable {
21      public TreeMap<String, Stage> stages = new TreeMap<String, Stage>();
22      public ArrayList<Connection> connections = new ArrayList<Connection>();
23      public HashMap<String, ConnectionEndPoint> exports = new HashMap<String, ConnectionEndPoint>();
24      public HashMap<String, String> properties = new HashMap<String, String>();
25  
26      private String orderString(String[] order) {
27          StringBuilder builder = new StringBuilder();
28          for (String o : order) {
29              if (builder.length() > 0) {
30                  builder.append(" ");
31              }
32              builder.append(o);
33          }
34          return builder.toString();
35      }
36  
37      /***
38       * Sometimes its convenient to specify a group of stages as its own job,
39       * with connections that flow between stages specified in the job.  The
40       * add method allows you to add another job to this job.  To avoid name
41       * conflicts, all stages in the job are renamed from <tt>stageName</tt>
42       * to <tt>jobName.stageName</tt>.
43       */
44      public void add(String jobName, Job group) {
45          assert this != group;
46  
47          for (Stage s : group.stages.values()) {
48              Stage copy = s.clone();
49              copy.name = jobName + "." + s.name;
50              add(copy);
51          }
52  
53          for (Connection c : group.connections) {
54              Connection copy = c.clone();
55  
56              for (ConnectionEndPoint input : copy.inputs) {
57                  input.setStageName(jobName + "." + input.getStageName());
58              }
59  
60              for (ConnectionEndPoint output : copy.outputs) {
61                  output.setStageName(jobName + "." + output.getStageName());
62              }
63  
64              connections.add(copy);
65          }
66      }
67  
68      /***
69       * Adds a stage to the current job.
70       */
71      public void add(Stage s) {
72          stages.put(s.name, s);
73      }
74  
75      Map<String, Stage> findStagesWithPrefix(String prefix) {
76          Map<String, Stage> result;
77          if (stages.containsKey(prefix)) {
78              result = new HashMap<String, Stage>();
79              result.put(prefix, stages.get(prefix));
80          } else {
81              result = stages.subMap(prefix + '.', prefix + ('.' + 1));
82          }
83  
84          return result;
85      }
86  
87      public static class StagePoint implements Comparable<StagePoint> {
88          String stageName;
89          String pointName;
90          private StageConnectionPoint point;
91  
92          public StagePoint(String stageName, String pointName) {
93              this(stageName, pointName, null);
94          }
95  
96          public StagePoint(String stageName, String pointName, StageConnectionPoint point) {
97              this.stageName = stageName;
98              this.pointName = pointName;
99              this.point = point;
100         }
101 
102         public boolean equals(StagePoint other) {
103             return stageName.equals(other.stageName) && pointName.equals(other.pointName);
104         }
105 
106         public int compareTo(StagePoint other) {
107             int result = stageName.compareTo(other.stageName);
108             if (result != 0) {
109                 return result;
110             }
111             return pointName.compareTo(other.pointName);
112         }
113 
114         @Override
115         public int hashCode() {
116             return stageName.hashCode() + pointName.hashCode() * 3;
117         }
118 
119         @Override
120         public boolean equals(Object obj) {
121             if (obj == null) {
122                 return false;
123             }
124             if (getClass() != obj.getClass()) {
125                 return false;
126             }
127 
128             final StagePoint other = (StagePoint) obj;
129             if (this.stageName != other.stageName && (this.stageName == null || !this.stageName.
130                     equals(other.stageName))) {
131                 return false;
132             }
133             if (this.pointName != other.pointName && (this.pointName == null || !this.pointName.
134                     equals(other.pointName))) {
135                 return false;
136             }
137 
138             return true;
139         }
140 
141         public StageConnectionPoint getPoint() {
142             return point;
143         }
144 
145         public void setPoint(StageConnectionPoint point) {
146             this.point = point;
147         }
148 
149         @Override
150         public String toString() {
151             return String.format("%s:%s", stageName, pointName);
152         }
153     }
154 
155     HashSet<StagePoint> extractStagePoints(Collection<Stage> allStages, ConnectionPointType type) {
156         HashSet<StagePoint> result = new HashSet<StagePoint>();
157 
158         for (Stage s : allStages) {
159             for (Map.Entry<String, StageConnectionPoint> e : s.connections.entrySet()) {
160                 String pointName = e.getKey();
161                 String stageName = s.name;
162 
163                 if (e.getValue().type == type) {
164                     result.add(new StagePoint(stageName, pointName, e.getValue()));
165                 }
166             }
167         }
168 
169         return result;
170     }
171 
172     /***
173      * Connects outputs from stage sourceName to inputs from stage
174      * destinationName.
175      * 
176      * Connect can make connections between any stage with the name
177      * sourceName, or that starts with sourceName (same goes for
178      * destinationName), which makes this particularly useful for 
179      * making connections between sub-jobs.
180      */
181     public void connect(String sourceName, String destinationName, ConnectionAssignmentType assignment) {
182         // scan the stages, looking for sources
183         Map<String, Stage> sources = findStagesWithPrefix(sourceName);
184         Map<String, Stage> destinations = findStagesWithPrefix(destinationName);
185 
186         // find all inputs and outputs in these stages
187         HashSet<StagePoint> outputs = extractStagePoints(sources.values(),
188                                                          ConnectionPointType.Output);
189         HashSet<StagePoint> inputs = extractStagePoints(destinations.values(),
190                                                         ConnectionPointType.Input);
191 
192         // remove any inputs that are already referenced in job connections
193         for (Connection c : connections) {
194             for (ConnectionEndPoint p : c.outputs) {
195                 StagePoint point = new StagePoint(p.getStageName(), p.getPointName());
196                 inputs.remove(point);
197             }
198         }
199 
200         // now we have a list of all dangling inputs.  try to match them with outputs
201         HashMap<String, ArrayList<StagePoint>> outputMap = new HashMap<String, ArrayList<StagePoint>>();
202         for (StagePoint point : outputs) {
203             if (!outputMap.containsKey(point.pointName)) {
204                 outputMap.put(point.pointName, new ArrayList<StagePoint>());
205             }
206             outputMap.get(point.pointName).add(point);
207         }
208 
209         for (StagePoint destinationPoint : inputs) {
210             if (outputMap.containsKey(destinationPoint.pointName)) {
211                 assert outputMap.get(destinationPoint.pointName).size() == 1;
212                 StagePoint sourcePoint = outputMap.get(destinationPoint.pointName).get(0);
213 
214                 connect(sourcePoint, destinationPoint, assignment);
215             }
216         }
217     }
218 
219     public void connect(StagePoint source, StagePoint destination, ConnectionAssignmentType assignment) {
220         int hashCount = -1;
221         String[] hashType = null;
222 
223         if (assignment != ConnectionAssignmentType.Combined) {
224             hashType = source.point.getOrder();
225         }
226         connect(source, destination, assignment, hashType, hashCount);
227     }
228 
229     public void connect(StagePoint source, StagePoint destination, ConnectionAssignmentType assignment, String[] hashType, int hashCount) {
230         // first, try to find a usable connection
231         Connection connection = null;
232 
233         if (source.getPoint() == null) {
234             Stage sourceStage = stages.get(source.stageName);
235             StageConnectionPoint sourcePoint = sourceStage.getConnection(source.pointName);
236             source.point = sourcePoint;
237         }
238 
239         for (Connection c : connections) {
240             if (c.inputs.size() < 1) {
241                 continue;
242             }
243             ConnectionEndPoint connectionInput = c.inputs.get(0);
244 
245             if (connectionInput.getPointName().equals(source.pointName) &&
246                     connectionInput.getStageName().equals(source.stageName)) {
247                 connection = c;
248                 break;
249             }
250         }
251 
252         // coudln't find a connection that has this input, so we'll make one
253         if (connection == null) {
254             connection = new Connection(null, source.getPoint().getClassName(), source.getPoint().
255                                         getOrder(),
256                                         hashType,
257                                         hashCount);
258             ConnectionEndPoint input = new ConnectionEndPoint(null,
259                                                               source.stageName,
260                                                               source.pointName,
261                                                               ConnectionPointType.Input);
262             connection.inputs.add(input);
263             connections.add(connection);
264         }
265 
266         ConnectionEndPoint output = new ConnectionEndPoint(null,
267                                                            destination.stageName,
268                                                            destination.pointName,
269                                                            assignment,
270                                                            ConnectionPointType.Output);
271         connection.outputs.add(output);
272     }
273 
274     /***
275      * Returns this job as a graph in the DOT language.  This DOT text can be
276      * used with GraphViz (http://www.graphviz.org) to display a picture of
277      * the job.
278      */
279     public String toDotString() {
280         StringBuilder builder = new StringBuilder();
281         builder.append("digraph {\n");
282 
283         for (Connection connection : connections) {
284             for (ConnectionEndPoint input : connection.inputs) {
285                 for (ConnectionEndPoint output : connection.outputs) {
286                     String edge = String.format("  %s -> %s [label=\"%s\"];\n",
287                             input.getStageName(), output.getStageName(), connection.getName());
288                     builder.append(edge);
289                 }
290             }
291         }
292 
293         for (Stage stage : stages.values()) {
294             builder.append(String.format("  %s;\n", stage.name));
295         }
296 
297         builder.append("}\n");
298         return builder.toString();
299     }
300 
301     @Override
302     public String toString() {
303         StringBuilder builder = new StringBuilder();
304         builder.append("<job>\n");
305 
306         // Properties block
307         for (Map.Entry<String, String> entry : properties.entrySet()) {
308             builder.append(String.format("    <property name=\"%s\" value=\"%s\" />\n",
309                                          entry.getKey(), entry.getValue()));
310         }
311         builder.append("\n");
312 
313         // Connections block
314         builder.append("    <connections>\n");
315         for (Connection connection : connections) {
316             if (connection.getHash() != null) {
317                 String connectionHeader = String.format(
318                         "        <connection id=\"%s\"           \n" +
319                         "                    class=\"%s\"        \n" +
320                         "                    order=\"%s\"        \n" +
321                         "                    hash=\"%s\"         \n" +
322                         "                    hashCount=\"%d\">   \n",
323                         connection.getName(),
324                         connection.getClassName(),
325                         orderString(connection.getOrder()),
326                         orderString(connection.getHash()),
327                         connection.getHashCount());
328                 builder.append(connectionHeader);
329             } else {
330                 String connectionHeader = String.format(
331                         "        <connection id=\"%s\"         \n" +
332                         "                    class=\"%s\"        \n" +
333                         "                    order=\"%s\">       \n",
334                         connection.getName(),
335                         connection.getClassName(),
336                         orderString(connection.getOrder()));
337                 builder.append(connectionHeader);
338             }
339 
340             for (ConnectionEndPoint point : connection.inputs) {
341                 String endPointString = String.format(
342                         "            <input stage=\"%s\"         \n" +
343                         "                   endpoint=\"%s\" />   \n",
344                         point.getStageName(),
345                         point.getPointName());
346                 builder.append(endPointString);
347             }
348 
349             for (ConnectionEndPoint point : connection.outputs) {
350                 String endPointString = String.format(
351                         "            <output stage=\"%s\"         \n" +
352                         "                    endpoint=\"%s\"      \n" +
353                         "                    assignment=\"%s\" /> \n",
354                         point.getStageName(),
355                         point.getPointName(),
356                         point.getAssignment());
357                 builder.append(endPointString);
358             }
359 
360             builder.append("        </connection>\n");
361         }
362         builder.append("    </connections>\n");
363         builder.append("\n");
364 
365         // Stages block
366         builder.append("    <stages>\n");
367         for (Stage s : stages.values()) {
368             String stageHeader =
369                     String.format("        <stage id=\"%s\">\n", s.name);
370             builder.append(stageHeader);
371 
372             builder.append("            <connections>\n");
373 
374             for (StageConnectionPoint point : s.connections.values()) {
375                 String pointString = String.format(
376                         "                <%s id=\"%s\" as=\"%s\" class=\"%s\" order=\"%s\" />\n",
377                         point.type, point.externalName, point.internalName, point.getClassName(),
378                         orderString(point.getOrder()));
379                 builder.append(pointString);
380             }
381 
382             builder.append("            </connections>\n");
383             printSteps(builder, s.steps, "steps");
384 
385             builder.append("        </stage>\n");
386         }
387         builder.append("    </stages>\n");
388 
389         builder.append("</job>\n");
390         return builder.toString();
391     }
392 
393     private void printSteps(final StringBuilder builder, final ArrayList<Step> steps, final String tag) {
394         builder.append(String.format("            <%s>\n", tag));
395         for (Step step : steps) {
396             if (step instanceof InputStep) {
397                 InputStep input = (InputStep) step;
398                 String line = String.format("                <input id=\"%s\" />\n", input.getId());
399                 builder.append(line);
400             } else if (step instanceof OutputStep) {
401                 OutputStep output = (OutputStep) step;
402                 String line = String.format("                <output id=\"%s\" />\n", output.getId());
403                 builder.append(line);
404             } else if (step instanceof MultiStep) {
405                 MultiStep multi = (MultiStep) step;
406                 builder.append("                <multi>\n");
407                 for (ArrayList<Step> group : multi.groups) {
408                     printSteps(builder, group, "group");
409                 }
410                 builder.append("                </multi>\n");
411             } else if (step.getParameters() == null || step.getParameters().isEmpty()) {
412                 String stepHeader = String.format("                <step class=\"%s\" />\n", step.
413                                                   getClassName());
414                 builder.append(stepHeader);
415             } else {
416                 String stepHeader = String.format("                <step class=\"%s\">\n", step.
417                                                   getClassName());
418                 builder.append(stepHeader);
419                 String parametersString = step.getParameters().toString();
420 
421                 // strip out the beginning and end parts
422                 int start = parametersString.indexOf("<parameters>") + "<parameters>".length();
423                 int end = parametersString.lastIndexOf("</parameters>");
424                 parametersString = parametersString.substring(start, end);
425 
426                 builder.append(parametersString);
427                 builder.append("                </step>\n");
428             }
429         }
430         builder.append(String.format("                </%s>\n", tag));
431     }
432 }
433