1
2
3 package org.galagosearch.tupleflow.execution;
4
5 import java.io.Serializable;
6 import java.util.ArrayList;
7 import java.util.Collection;
8 import java.util.HashMap;
9 import java.util.HashSet;
10 import java.util.Map;
11 import java.util.TreeMap;
12
13 /***
14 * A Job object specifies a TupleFlow execution: the objects used, their parameters,
15 * and how they communicate. A Job can be specified in XML (and parsed with JobConstructor),
16 * or in code.
17 *
18 * @author trevor
19 */
20 public class Job implements Serializable {
21 public TreeMap<String, Stage> stages = new TreeMap<String, Stage>();
22 public ArrayList<Connection> connections = new ArrayList<Connection>();
23 public HashMap<String, ConnectionEndPoint> exports = new HashMap<String, ConnectionEndPoint>();
24 public HashMap<String, String> properties = new HashMap<String, String>();
25
26 private String orderString(String[] order) {
27 StringBuilder builder = new StringBuilder();
28 for (String o : order) {
29 if (builder.length() > 0) {
30 builder.append(" ");
31 }
32 builder.append(o);
33 }
34 return builder.toString();
35 }
36
37 /***
38 * Sometimes its convenient to specify a group of stages as its own job,
39 * with connections that flow between stages specified in the job. The
40 * add method allows you to add another job to this job. To avoid name
41 * conflicts, all stages in the job are renamed from <tt>stageName</tt>
42 * to <tt>jobName.stageName</tt>.
43 */
44 public void add(String jobName, Job group) {
45 assert this != group;
46
47 for (Stage s : group.stages.values()) {
48 Stage copy = s.clone();
49 copy.name = jobName + "." + s.name;
50 add(copy);
51 }
52
53 for (Connection c : group.connections) {
54 Connection copy = c.clone();
55
56 for (ConnectionEndPoint input : copy.inputs) {
57 input.setStageName(jobName + "." + input.getStageName());
58 }
59
60 for (ConnectionEndPoint output : copy.outputs) {
61 output.setStageName(jobName + "." + output.getStageName());
62 }
63
64 connections.add(copy);
65 }
66 }
67
68 /***
69 * Adds a stage to the current job.
70 */
71 public void add(Stage s) {
72 stages.put(s.name, s);
73 }
74
75 Map<String, Stage> findStagesWithPrefix(String prefix) {
76 Map<String, Stage> result;
77 if (stages.containsKey(prefix)) {
78 result = new HashMap<String, Stage>();
79 result.put(prefix, stages.get(prefix));
80 } else {
81 result = stages.subMap(prefix + '.', prefix + ('.' + 1));
82 }
83
84 return result;
85 }
86
87 public static class StagePoint implements Comparable<StagePoint> {
88 String stageName;
89 String pointName;
90 private StageConnectionPoint point;
91
92 public StagePoint(String stageName, String pointName) {
93 this(stageName, pointName, null);
94 }
95
96 public StagePoint(String stageName, String pointName, StageConnectionPoint point) {
97 this.stageName = stageName;
98 this.pointName = pointName;
99 this.point = point;
100 }
101
102 public boolean equals(StagePoint other) {
103 return stageName.equals(other.stageName) && pointName.equals(other.pointName);
104 }
105
106 public int compareTo(StagePoint other) {
107 int result = stageName.compareTo(other.stageName);
108 if (result != 0) {
109 return result;
110 }
111 return pointName.compareTo(other.pointName);
112 }
113
114 @Override
115 public int hashCode() {
116 return stageName.hashCode() + pointName.hashCode() * 3;
117 }
118
119 @Override
120 public boolean equals(Object obj) {
121 if (obj == null) {
122 return false;
123 }
124 if (getClass() != obj.getClass()) {
125 return false;
126 }
127
128 final StagePoint other = (StagePoint) obj;
129 if (this.stageName != other.stageName && (this.stageName == null || !this.stageName.
130 equals(other.stageName))) {
131 return false;
132 }
133 if (this.pointName != other.pointName && (this.pointName == null || !this.pointName.
134 equals(other.pointName))) {
135 return false;
136 }
137
138 return true;
139 }
140
141 public StageConnectionPoint getPoint() {
142 return point;
143 }
144
145 public void setPoint(StageConnectionPoint point) {
146 this.point = point;
147 }
148
149 @Override
150 public String toString() {
151 return String.format("%s:%s", stageName, pointName);
152 }
153 }
154
155 HashSet<StagePoint> extractStagePoints(Collection<Stage> allStages, ConnectionPointType type) {
156 HashSet<StagePoint> result = new HashSet<StagePoint>();
157
158 for (Stage s : allStages) {
159 for (Map.Entry<String, StageConnectionPoint> e : s.connections.entrySet()) {
160 String pointName = e.getKey();
161 String stageName = s.name;
162
163 if (e.getValue().type == type) {
164 result.add(new StagePoint(stageName, pointName, e.getValue()));
165 }
166 }
167 }
168
169 return result;
170 }
171
172 /***
173 * Connects outputs from stage sourceName to inputs from stage
174 * destinationName.
175 *
176 * Connect can make connections between any stage with the name
177 * sourceName, or that starts with sourceName (same goes for
178 * destinationName), which makes this particularly useful for
179 * making connections between sub-jobs.
180 */
181 public void connect(String sourceName, String destinationName, ConnectionAssignmentType assignment) {
182
183 Map<String, Stage> sources = findStagesWithPrefix(sourceName);
184 Map<String, Stage> destinations = findStagesWithPrefix(destinationName);
185
186
187 HashSet<StagePoint> outputs = extractStagePoints(sources.values(),
188 ConnectionPointType.Output);
189 HashSet<StagePoint> inputs = extractStagePoints(destinations.values(),
190 ConnectionPointType.Input);
191
192
193 for (Connection c : connections) {
194 for (ConnectionEndPoint p : c.outputs) {
195 StagePoint point = new StagePoint(p.getStageName(), p.getPointName());
196 inputs.remove(point);
197 }
198 }
199
200
201 HashMap<String, ArrayList<StagePoint>> outputMap = new HashMap<String, ArrayList<StagePoint>>();
202 for (StagePoint point : outputs) {
203 if (!outputMap.containsKey(point.pointName)) {
204 outputMap.put(point.pointName, new ArrayList<StagePoint>());
205 }
206 outputMap.get(point.pointName).add(point);
207 }
208
209 for (StagePoint destinationPoint : inputs) {
210 if (outputMap.containsKey(destinationPoint.pointName)) {
211 assert outputMap.get(destinationPoint.pointName).size() == 1;
212 StagePoint sourcePoint = outputMap.get(destinationPoint.pointName).get(0);
213
214 connect(sourcePoint, destinationPoint, assignment);
215 }
216 }
217 }
218
219 public void connect(StagePoint source, StagePoint destination, ConnectionAssignmentType assignment) {
220 int hashCount = -1;
221 String[] hashType = null;
222
223 if (assignment != ConnectionAssignmentType.Combined) {
224 hashType = source.point.getOrder();
225 }
226 connect(source, destination, assignment, hashType, hashCount);
227 }
228
229 public void connect(StagePoint source, StagePoint destination, ConnectionAssignmentType assignment, String[] hashType, int hashCount) {
230
231 Connection connection = null;
232
233 if (source.getPoint() == null) {
234 Stage sourceStage = stages.get(source.stageName);
235 StageConnectionPoint sourcePoint = sourceStage.getConnection(source.pointName);
236 source.point = sourcePoint;
237 }
238
239 for (Connection c : connections) {
240 if (c.inputs.size() < 1) {
241 continue;
242 }
243 ConnectionEndPoint connectionInput = c.inputs.get(0);
244
245 if (connectionInput.getPointName().equals(source.pointName) &&
246 connectionInput.getStageName().equals(source.stageName)) {
247 connection = c;
248 break;
249 }
250 }
251
252
253 if (connection == null) {
254 connection = new Connection(null, source.getPoint().getClassName(), source.getPoint().
255 getOrder(),
256 hashType,
257 hashCount);
258 ConnectionEndPoint input = new ConnectionEndPoint(null,
259 source.stageName,
260 source.pointName,
261 ConnectionPointType.Input);
262 connection.inputs.add(input);
263 connections.add(connection);
264 }
265
266 ConnectionEndPoint output = new ConnectionEndPoint(null,
267 destination.stageName,
268 destination.pointName,
269 assignment,
270 ConnectionPointType.Output);
271 connection.outputs.add(output);
272 }
273
274 /***
275 * Returns this job as a graph in the DOT language. This DOT text can be
276 * used with GraphViz (http://www.graphviz.org) to display a picture of
277 * the job.
278 */
279 public String toDotString() {
280 StringBuilder builder = new StringBuilder();
281 builder.append("digraph {\n");
282
283 for (Connection connection : connections) {
284 for (ConnectionEndPoint input : connection.inputs) {
285 for (ConnectionEndPoint output : connection.outputs) {
286 String edge = String.format(" %s -> %s [label=\"%s\"];\n",
287 input.getStageName(), output.getStageName(), connection.getName());
288 builder.append(edge);
289 }
290 }
291 }
292
293 for (Stage stage : stages.values()) {
294 builder.append(String.format(" %s;\n", stage.name));
295 }
296
297 builder.append("}\n");
298 return builder.toString();
299 }
300
301 @Override
302 public String toString() {
303 StringBuilder builder = new StringBuilder();
304 builder.append("<job>\n");
305
306
307 for (Map.Entry<String, String> entry : properties.entrySet()) {
308 builder.append(String.format(" <property name=\"%s\" value=\"%s\" />\n",
309 entry.getKey(), entry.getValue()));
310 }
311 builder.append("\n");
312
313
314 builder.append(" <connections>\n");
315 for (Connection connection : connections) {
316 if (connection.getHash() != null) {
317 String connectionHeader = String.format(
318 " <connection id=\"%s\" \n" +
319 " class=\"%s\" \n" +
320 " order=\"%s\" \n" +
321 " hash=\"%s\" \n" +
322 " hashCount=\"%d\"> \n",
323 connection.getName(),
324 connection.getClassName(),
325 orderString(connection.getOrder()),
326 orderString(connection.getHash()),
327 connection.getHashCount());
328 builder.append(connectionHeader);
329 } else {
330 String connectionHeader = String.format(
331 " <connection id=\"%s\" \n" +
332 " class=\"%s\" \n" +
333 " order=\"%s\"> \n",
334 connection.getName(),
335 connection.getClassName(),
336 orderString(connection.getOrder()));
337 builder.append(connectionHeader);
338 }
339
340 for (ConnectionEndPoint point : connection.inputs) {
341 String endPointString = String.format(
342 " <input stage=\"%s\" \n" +
343 " endpoint=\"%s\" /> \n",
344 point.getStageName(),
345 point.getPointName());
346 builder.append(endPointString);
347 }
348
349 for (ConnectionEndPoint point : connection.outputs) {
350 String endPointString = String.format(
351 " <output stage=\"%s\" \n" +
352 " endpoint=\"%s\" \n" +
353 " assignment=\"%s\" /> \n",
354 point.getStageName(),
355 point.getPointName(),
356 point.getAssignment());
357 builder.append(endPointString);
358 }
359
360 builder.append(" </connection>\n");
361 }
362 builder.append(" </connections>\n");
363 builder.append("\n");
364
365
366 builder.append(" <stages>\n");
367 for (Stage s : stages.values()) {
368 String stageHeader =
369 String.format(" <stage id=\"%s\">\n", s.name);
370 builder.append(stageHeader);
371
372 builder.append(" <connections>\n");
373
374 for (StageConnectionPoint point : s.connections.values()) {
375 String pointString = String.format(
376 " <%s id=\"%s\" as=\"%s\" class=\"%s\" order=\"%s\" />\n",
377 point.type, point.externalName, point.internalName, point.getClassName(),
378 orderString(point.getOrder()));
379 builder.append(pointString);
380 }
381
382 builder.append(" </connections>\n");
383 printSteps(builder, s.steps, "steps");
384
385 builder.append(" </stage>\n");
386 }
387 builder.append(" </stages>\n");
388
389 builder.append("</job>\n");
390 return builder.toString();
391 }
392
393 private void printSteps(final StringBuilder builder, final ArrayList<Step> steps, final String tag) {
394 builder.append(String.format(" <%s>\n", tag));
395 for (Step step : steps) {
396 if (step instanceof InputStep) {
397 InputStep input = (InputStep) step;
398 String line = String.format(" <input id=\"%s\" />\n", input.getId());
399 builder.append(line);
400 } else if (step instanceof OutputStep) {
401 OutputStep output = (OutputStep) step;
402 String line = String.format(" <output id=\"%s\" />\n", output.getId());
403 builder.append(line);
404 } else if (step instanceof MultiStep) {
405 MultiStep multi = (MultiStep) step;
406 builder.append(" <multi>\n");
407 for (ArrayList<Step> group : multi.groups) {
408 printSteps(builder, group, "group");
409 }
410 builder.append(" </multi>\n");
411 } else if (step.getParameters() == null || step.getParameters().isEmpty()) {
412 String stepHeader = String.format(" <step class=\"%s\" />\n", step.
413 getClassName());
414 builder.append(stepHeader);
415 } else {
416 String stepHeader = String.format(" <step class=\"%s\">\n", step.
417 getClassName());
418 builder.append(stepHeader);
419 String parametersString = step.getParameters().toString();
420
421
422 int start = parametersString.indexOf("<parameters>") + "<parameters>".length();
423 int end = parametersString.lastIndexOf("</parameters>");
424 parametersString = parametersString.substring(start, end);
425
426 builder.append(parametersString);
427 builder.append(" </step>\n");
428 }
429 }
430 builder.append(String.format(" </%s>\n", tag));
431 }
432 }
433