Coverage Report - org.galagosearch.tupleflow.execution.Job
 
Classes in this File Line Coverage Branch Coverage Complexity
Job
38%
66/174
30%
27/90
0
Job$StagePoint
25%
7/28
0%
0/22
0
 
 1  
 // BSD License (http://www.galagosearch.org/license)
 2  
 
 3  
 package org.galagosearch.tupleflow.execution;
 4  
 
 5  
 import java.io.Serializable;
 6  
 import java.util.ArrayList;
 7  
 import java.util.Collection;
 8  
 import java.util.HashMap;
 9  
 import java.util.HashSet;
 10  
 import java.util.Map;
 11  
 import java.util.TreeMap;
 12  
 
 13  
 /**
 14  
  * A Job object specifies a TupleFlow execution: the objects used, their parameters,
 15  
  * and how they communicate.  A Job can be specified in XML (and parsed with JobConstructor),
 16  
  * or in code.
 17  
  * 
 18  
  * @author trevor
 19  
  */
 20  4
 public class Job implements Serializable {
 21  2
     public TreeMap<String, Stage> stages = new TreeMap<String, Stage>();
 22  2
     public ArrayList<Connection> connections = new ArrayList<Connection>();
 23  2
     public HashMap<String, ConnectionEndPoint> exports = new HashMap<String, ConnectionEndPoint>();
 24  2
     public HashMap<String, String> properties = new HashMap<String, String>();
 25  
 
 26  
     private String orderString(String[] order) {
 27  0
         StringBuilder builder = new StringBuilder();
 28  0
         for (String o : order) {
 29  0
             if (builder.length() > 0) {
 30  0
                 builder.append(" ");
 31  
             }
 32  0
             builder.append(o);
 33  
         }
 34  0
         return builder.toString();
 35  
     }
 36  
 
 37  
     /**
 38  
      * Sometimes its convenient to specify a group of stages as its own job,
 39  
      * with connections that flow between stages specified in the job.  The
 40  
      * add method allows you to add another job to this job.  To avoid name
 41  
      * conflicts, all stages in the job are renamed from <tt>stageName</tt>
 42  
      * to <tt>jobName.stageName</tt>.
 43  
      */
 44  
     public void add(String jobName, Job group) {
 45  0
         assert this != group;
 46  
 
 47  0
         for (Stage s : group.stages.values()) {
 48  0
             Stage copy = s.clone();
 49  0
             copy.name = jobName + "." + s.name;
 50  0
             add(copy);
 51  0
         }
 52  
 
 53  0
         for (Connection c : group.connections) {
 54  0
             Connection copy = c.clone();
 55  
 
 56  0
             for (ConnectionEndPoint input : copy.inputs) {
 57  0
                 input.setStageName(jobName + "." + input.getStageName());
 58  
             }
 59  
 
 60  0
             for (ConnectionEndPoint output : copy.outputs) {
 61  0
                 output.setStageName(jobName + "." + output.getStageName());
 62  
             }
 63  
 
 64  0
             connections.add(copy);
 65  0
         }
 66  0
     }
 67  
 
 68  
     /**
 69  
      * Adds a stage to the current job.
 70  
      */
 71  
     public void add(Stage s) {
 72  4
         stages.put(s.name, s);
 73  4
     }
 74  
 
 75  
     Map<String, Stage> findStagesWithPrefix(String prefix) {
 76  
         Map<String, Stage> result;
 77  4
         if (stages.containsKey(prefix)) {
 78  4
             result = new HashMap<String, Stage>();
 79  4
             result.put(prefix, stages.get(prefix));
 80  
         } else {
 81  0
             result = stages.subMap(prefix + '.', prefix + ('.' + 1));
 82  
         }
 83  
 
 84  4
         return result;
 85  
     }
 86  
 
 87  2
     public static class StagePoint implements Comparable<StagePoint> {
 88  
         String stageName;
 89  
         String pointName;
 90  
         private StageConnectionPoint point;
 91  
 
 92  
         public StagePoint(String stageName, String pointName) {
 93  0
             this(stageName, pointName, null);
 94  0
         }
 95  
 
 96  4
         public StagePoint(String stageName, String pointName, StageConnectionPoint point) {
 97  4
             this.stageName = stageName;
 98  4
             this.pointName = pointName;
 99  4
             this.point = point;
 100  4
         }
 101  
 
 102  
         public boolean equals(StagePoint other) {
 103  0
             return stageName.equals(other.stageName) && pointName.equals(other.pointName);
 104  
         }
 105  
 
 106  
         public int compareTo(StagePoint other) {
 107  0
             int result = stageName.compareTo(other.stageName);
 108  0
             if (result != 0) {
 109  0
                 return result;
 110  
             }
 111  0
             return pointName.compareTo(other.pointName);
 112  
         }
 113  
 
 114  
         @Override
 115  
         public int hashCode() {
 116  4
             return stageName.hashCode() + pointName.hashCode() * 3;
 117  
         }
 118  
 
 119  
         @Override
 120  
         public boolean equals(Object obj) {
 121  0
             if (obj == null) {
 122  0
                 return false;
 123  
             }
 124  0
             if (getClass() != obj.getClass()) {
 125  0
                 return false;
 126  
             }
 127  
 
 128  0
             final StagePoint other = (StagePoint) obj;
 129  0
             if (this.stageName != other.stageName && (this.stageName == null || !this.stageName.
 130  
                     equals(other.stageName))) {
 131  0
                 return false;
 132  
             }
 133  0
             if (this.pointName != other.pointName && (this.pointName == null || !this.pointName.
 134  
                     equals(other.pointName))) {
 135  0
                 return false;
 136  
             }
 137  
 
 138  0
             return true;
 139  
         }
 140  
 
 141  
         public StageConnectionPoint getPoint() {
 142  6
             return point;
 143  
         }
 144  
 
 145  
         public void setPoint(StageConnectionPoint point) {
 146  0
             this.point = point;
 147  0
         }
 148  
 
 149  
         @Override
 150  
         public String toString() {
 151  0
             return String.format("%s:%s", stageName, pointName);
 152  
         }
 153  
     }
 154  
 
 155  
     HashSet<StagePoint> extractStagePoints(Collection<Stage> allStages, ConnectionPointType type) {
 156  4
         HashSet<StagePoint> result = new HashSet<StagePoint>();
 157  
 
 158  4
         for (Stage s : allStages) {
 159  4
             for (Map.Entry<String, StageConnectionPoint> e : s.connections.entrySet()) {
 160  4
                 String pointName = e.getKey();
 161  4
                 String stageName = s.name;
 162  
 
 163  4
                 if (e.getValue().type == type) {
 164  4
                     result.add(new StagePoint(stageName, pointName, e.getValue()));
 165  
                 }
 166  4
             }
 167  
         }
 168  
 
 169  4
         return result;
 170  
     }
 171  
 
 172  
     /**
 173  
      * Connects outputs from stage sourceName to inputs from stage
 174  
      * destinationName.
 175  
      * 
 176  
      * Connect can make connections between any stage with the name
 177  
      * sourceName, or that starts with sourceName (same goes for
 178  
      * destinationName), which makes this particularly useful for 
 179  
      * making connections between sub-jobs.
 180  
      */
 181  
     public void connect(String sourceName, String destinationName, ConnectionAssignmentType assignment) {
 182  
         // scan the stages, looking for sources
 183  2
         Map<String, Stage> sources = findStagesWithPrefix(sourceName);
 184  2
         Map<String, Stage> destinations = findStagesWithPrefix(destinationName);
 185  
 
 186  
         // find all inputs and outputs in these stages
 187  2
         HashSet<StagePoint> outputs = extractStagePoints(sources.values(),
 188  
                                                          ConnectionPointType.Output);
 189  2
         HashSet<StagePoint> inputs = extractStagePoints(destinations.values(),
 190  
                                                         ConnectionPointType.Input);
 191  
 
 192  
         // remove any inputs that are already referenced in job connections
 193  2
         for (Connection c : connections) {
 194  0
             for (ConnectionEndPoint p : c.outputs) {
 195  0
                 StagePoint point = new StagePoint(p.getStageName(), p.getPointName());
 196  0
                 inputs.remove(point);
 197  0
             }
 198  
         }
 199  
 
 200  
         // now we have a list of all dangling inputs.  try to match them with outputs
 201  2
         HashMap<String, ArrayList<StagePoint>> outputMap = new HashMap<String, ArrayList<StagePoint>>();
 202  2
         for (StagePoint point : outputs) {
 203  2
             if (!outputMap.containsKey(point.pointName)) {
 204  2
                 outputMap.put(point.pointName, new ArrayList<StagePoint>());
 205  
             }
 206  2
             outputMap.get(point.pointName).add(point);
 207  
         }
 208  
 
 209  2
         for (StagePoint destinationPoint : inputs) {
 210  2
             if (outputMap.containsKey(destinationPoint.pointName)) {
 211  2
                 assert outputMap.get(destinationPoint.pointName).size() == 1;
 212  2
                 StagePoint sourcePoint = outputMap.get(destinationPoint.pointName).get(0);
 213  
 
 214  2
                 connect(sourcePoint, destinationPoint, assignment);
 215  2
             }
 216  
         }
 217  2
     }
 218  
 
 219  
     public void connect(StagePoint source, StagePoint destination, ConnectionAssignmentType assignment) {
 220  2
         int hashCount = -1;
 221  2
         String[] hashType = null;
 222  
 
 223  2
         if (assignment != ConnectionAssignmentType.Combined) {
 224  0
             hashType = source.point.getOrder();
 225  
         }
 226  2
         connect(source, destination, assignment, hashType, hashCount);
 227  2
     }
 228  
 
 229  
     public void connect(StagePoint source, StagePoint destination, ConnectionAssignmentType assignment, String[] hashType, int hashCount) {
 230  
         // first, try to find a usable connection
 231  2
         Connection connection = null;
 232  
 
 233  2
         if (source.getPoint() == null) {
 234  0
             Stage sourceStage = stages.get(source.stageName);
 235  0
             StageConnectionPoint sourcePoint = sourceStage.getConnection(source.pointName);
 236  0
             source.point = sourcePoint;
 237  
         }
 238  
 
 239  2
         for (Connection c : connections) {
 240  0
             if (c.inputs.size() < 1) {
 241  0
                 continue;
 242  
             }
 243  0
             ConnectionEndPoint connectionInput = c.inputs.get(0);
 244  
 
 245  0
             if (connectionInput.getPointName().equals(source.pointName) &&
 246  
                     connectionInput.getStageName().equals(source.stageName)) {
 247  0
                 connection = c;
 248  0
                 break;
 249  
             }
 250  0
         }
 251  
 
 252  
         // coudln't find a connection that has this input, so we'll make one
 253  2
         if (connection == null) {
 254  2
             connection = new Connection(null, source.getPoint().getClassName(), source.getPoint().
 255  
                                         getOrder(),
 256  
                                         hashType,
 257  
                                         hashCount);
 258  2
             ConnectionEndPoint input = new ConnectionEndPoint(null,
 259  
                                                               source.stageName,
 260  
                                                               source.pointName,
 261  
                                                               ConnectionPointType.Input);
 262  2
             connection.inputs.add(input);
 263  2
             connections.add(connection);
 264  
         }
 265  
 
 266  2
         ConnectionEndPoint output = new ConnectionEndPoint(null,
 267  
                                                            destination.stageName,
 268  
                                                            destination.pointName,
 269  
                                                            assignment,
 270  
                                                            ConnectionPointType.Output);
 271  2
         connection.outputs.add(output);
 272  2
     }
 273  
 
 274  
     /**
 275  
      * Returns this job as a graph in the DOT language.  This DOT text can be
 276  
      * used with GraphViz (http://www.graphviz.org) to display a picture of
 277  
      * the job.
 278  
      */
 279  
     public String toDotString() {
 280  2
         StringBuilder builder = new StringBuilder();
 281  2
         builder.append("digraph {\n");
 282  
 
 283  2
         for (Connection connection : connections) {
 284  2
             for (ConnectionEndPoint input : connection.inputs) {
 285  2
                 for (ConnectionEndPoint output : connection.outputs) {
 286  2
                     String edge = String.format("  %s -> %s [label=\"%s\"];\n",
 287  
                             input.getStageName(), output.getStageName(), connection.getName());
 288  2
                     builder.append(edge);
 289  2
                 }
 290  
             }
 291  
         }
 292  
 
 293  2
         for (Stage stage : stages.values()) {
 294  4
             builder.append(String.format("  %s;\n", stage.name));
 295  
         }
 296  
 
 297  2
         builder.append("}\n");
 298  2
         return builder.toString();
 299  
     }
 300  
 
 301  
     @Override
 302  
     public String toString() {
 303  0
         StringBuilder builder = new StringBuilder();
 304  0
         builder.append("<job>\n");
 305  
 
 306  
         // Properties block
 307  0
         for (Map.Entry<String, String> entry : properties.entrySet()) {
 308  0
             builder.append(String.format("    <property name=\"%s\" value=\"%s\" />\n",
 309  
                                          entry.getKey(), entry.getValue()));
 310  
         }
 311  0
         builder.append("\n");
 312  
 
 313  
         // Connections block
 314  0
         builder.append("    <connections>\n");
 315  0
         for (Connection connection : connections) {
 316  0
             if (connection.getHash() != null) {
 317  0
                 String connectionHeader = String.format(
 318  
                         "        <connection id=\"%s\"           \n" +
 319  
                         "                    class=\"%s\"        \n" +
 320  
                         "                    order=\"%s\"        \n" +
 321  
                         "                    hash=\"%s\"         \n" +
 322  
                         "                    hashCount=\"%d\">   \n",
 323  
                         connection.getName(),
 324  
                         connection.getClassName(),
 325  
                         orderString(connection.getOrder()),
 326  
                         orderString(connection.getHash()),
 327  
                         connection.getHashCount());
 328  0
                 builder.append(connectionHeader);
 329  0
             } else {
 330  0
                 String connectionHeader = String.format(
 331  
                         "        <connection id=\"%s\"         \n" +
 332  
                         "                    class=\"%s\"        \n" +
 333  
                         "                    order=\"%s\">       \n",
 334  
                         connection.getName(),
 335  
                         connection.getClassName(),
 336  
                         orderString(connection.getOrder()));
 337  0
                 builder.append(connectionHeader);
 338  
             }
 339  
 
 340  0
             for (ConnectionEndPoint point : connection.inputs) {
 341  0
                 String endPointString = String.format(
 342  
                         "            <input stage=\"%s\"         \n" +
 343  
                         "                   endpoint=\"%s\" />   \n",
 344  
                         point.getStageName(),
 345  
                         point.getPointName());
 346  0
                 builder.append(endPointString);
 347  0
             }
 348  
 
 349  0
             for (ConnectionEndPoint point : connection.outputs) {
 350  0
                 String endPointString = String.format(
 351  
                         "            <output stage=\"%s\"         \n" +
 352  
                         "                    endpoint=\"%s\"      \n" +
 353  
                         "                    assignment=\"%s\" /> \n",
 354  
                         point.getStageName(),
 355  
                         point.getPointName(),
 356  
                         point.getAssignment());
 357  0
                 builder.append(endPointString);
 358  0
             }
 359  
 
 360  0
             builder.append("        </connection>\n");
 361  
         }
 362  0
         builder.append("    </connections>\n");
 363  0
         builder.append("\n");
 364  
 
 365  
         // Stages block
 366  0
         builder.append("    <stages>\n");
 367  0
         for (Stage s : stages.values()) {
 368  0
             String stageHeader =
 369  
                     String.format("        <stage id=\"%s\">\n", s.name);
 370  0
             builder.append(stageHeader);
 371  
 
 372  0
             builder.append("            <connections>\n");
 373  
 
 374  0
             for (StageConnectionPoint point : s.connections.values()) {
 375  0
                 String pointString = String.format(
 376  
                         "                <%s id=\"%s\" as=\"%s\" class=\"%s\" order=\"%s\" />\n",
 377  
                         point.type, point.externalName, point.internalName, point.getClassName(),
 378  
                         orderString(point.getOrder()));
 379  0
                 builder.append(pointString);
 380  0
             }
 381  
 
 382  0
             builder.append("            </connections>\n");
 383  0
             printSteps(builder, s.steps, "steps");
 384  
 
 385  0
             builder.append("        </stage>\n");
 386  0
         }
 387  0
         builder.append("    </stages>\n");
 388  
 
 389  0
         builder.append("</job>\n");
 390  0
         return builder.toString();
 391  
     }
 392  
 
 393  
     private void printSteps(final StringBuilder builder, final ArrayList<Step> steps, final String tag) {
 394  0
         builder.append(String.format("            <%s>\n", tag));
 395  0
         for (Step step : steps) {
 396  0
             if (step instanceof InputStep) {
 397  0
                 InputStep input = (InputStep) step;
 398  0
                 String line = String.format("                <input id=\"%s\" />\n", input.getId());
 399  0
                 builder.append(line);
 400  0
             } else if (step instanceof OutputStep) {
 401  0
                 OutputStep output = (OutputStep) step;
 402  0
                 String line = String.format("                <output id=\"%s\" />\n", output.getId());
 403  0
                 builder.append(line);
 404  0
             } else if (step instanceof MultiStep) {
 405  0
                 MultiStep multi = (MultiStep) step;
 406  0
                 builder.append("                <multi>\n");
 407  0
                 for (ArrayList<Step> group : multi.groups) {
 408  0
                     printSteps(builder, group, "group");
 409  
                 }
 410  0
                 builder.append("                </multi>\n");
 411  0
             } else if (step.getParameters() == null || step.getParameters().isEmpty()) {
 412  0
                 String stepHeader = String.format("                <step class=\"%s\" />\n", step.
 413  
                                                   getClassName());
 414  0
                 builder.append(stepHeader);
 415  0
             } else {
 416  0
                 String stepHeader = String.format("                <step class=\"%s\">\n", step.
 417  
                                                   getClassName());
 418  0
                 builder.append(stepHeader);
 419  0
                 String parametersString = step.getParameters().toString();
 420  
 
 421  
                 // strip out the beginning and end parts
 422  0
                 int start = parametersString.indexOf("<parameters>") + "<parameters>".length();
 423  0
                 int end = parametersString.lastIndexOf("</parameters>");
 424  0
                 parametersString = parametersString.substring(start, end);
 425  
 
 426  0
                 builder.append(parametersString);
 427  0
                 builder.append("                </step>\n");
 428  0
             }
 429  
         }
 430  0
         builder.append(String.format("                </%s>\n", tag));
 431  0
     }
 432  
 }
 433