View Javadoc

1   // BSD License (http://www.galagosearch.org/license)
2   
3   package org.galagosearch.tupleflow.execution;
4   
5   import java.util.ArrayList;
6   import java.util.HashMap;
7   import java.util.List;
8   import java.util.Map;
9   import org.galagosearch.tupleflow.execution.StageInstanceDescription.PipeInput;
10  import org.galagosearch.tupleflow.execution.StageInstanceDescription.PipeOutput;
11  
12  /***
13   *
14   * @author trevor
15   */
16  public class StageGroupDescription {
17      /// A stage object (probably genereated by the JobConstructor parser)
18      Stage stage;
19      
20      public HashMap<String, DataPipeRegion> inputs;
21      public HashMap<String, DataPipeRegion> outputs;
22      
23      /// Count of instances of this stage
24      public int instanceCount;
25  
26      /// URL of the Master for this job.
27      String masterURL;
28      
29      public static class DataPipeRegion {
30          DataPipe pipe;
31          int start;
32          int end;
33          ConnectionPointType direction;
34          
35          public DataPipeRegion(DataPipe pipe, int start, int end, ConnectionPointType direction) {
36              this.pipe = pipe;
37              this.start = start;
38              this.end = end;
39              this.direction = direction;
40          }
41          
42          public int fileCount() {
43              int count = 0;
44              
45              for(int i=start; i<end; i++) {
46                  String[] filenames = pipe.getOutputFileNames(i);
47                  count += filenames.length;
48              }
49              
50              return count;
51          }
52      }
53      
54      public StageGroupDescription(Stage stage) {
55          this(stage, 1, "");
56      }
57      
58      /*** Creates a new instance of StageGroupDescription */
59      public StageGroupDescription(Stage stage, int instanceCount, String masterURL) {
60          this.stage = stage;
61          this.inputs = new HashMap<String, DataPipeRegion>();
62          this.outputs = new HashMap<String, DataPipeRegion>();
63          this.instanceCount = instanceCount;
64          this.masterURL = masterURL;
65      }
66      
67      public String getName() {
68          return stage.name;
69      }
70  
71      public String getMasterURL() {
72          return masterURL;
73      }
74  
75      public void setMasterURL(String masterURL) {
76          this.masterURL = masterURL;
77      }
78  
79      public boolean containsInput(String name) {
80          return inputs.containsKey(name);
81      }
82      
83      public boolean containsOutput(String name) {
84          return outputs.containsKey(name);
85      }
86  
87      public Stage getStage() {
88          return stage;
89      }
90  
91      public int getInstanceCount() {
92          return instanceCount;
93      }
94  
95      @Override
96      public String toString() {
97          return stage.toString();
98      }
99  
100 
101     public List<StageInstanceDescription> getInstances() {
102         ArrayList<StageInstanceDescription> instances = new ArrayList();
103         
104         for(int i=0; i<instanceCount; i++) {
105             Map<String, PipeInput> instanceOutputs = new HashMap<String, PipeInput>();
106             
107             for(String key : outputs.keySet()) {
108                 DataPipeRegion region = outputs.get(key);
109                 
110                 if(region.end - region.start <= 1) {
111                     instanceOutputs.put(key, new PipeInput(region.pipe, 0));
112                 } else {
113                     assert region.end - region.start == instanceCount;
114                     instanceOutputs.put(key, new PipeInput(region.pipe, region.start + i));
115                 }
116             }
117             
118             Map<String, PipeOutput> instanceInputs = new HashMap<String, PipeOutput>();
119 
120             for(String key : inputs.keySet()) {
121                 DataPipeRegion region = inputs.get(key);
122                 
123                 if(region.end - region.start <= 1) {
124                     instanceInputs.put(key, new PipeOutput(region.pipe, 0));
125                 } else if(instanceCount == 1 && region.end - region.start > 1) {
126                     // assignment == "combined"
127                     instanceInputs.put(key, new PipeOutput(region.pipe, region.start, region.end));
128                 } else {
129                     assert region.end - region.start == instanceCount;
130                     instanceInputs.put(key, new PipeOutput(region.pipe, region.start + i));
131                 }
132             }
133 
134             StageInstanceDescription instance =
135                     new StageInstanceDescription(
136                             stage, i, instanceOutputs, instanceInputs, masterURL);
137             instances.add(instance);
138         }
139         
140         return instances;
141     }
142 }