View Javadoc

1   // BSD License (http://www.galagosearch.org/license)
2   
3   package org.galagosearch.tupleflow.execution;
4   
5   import java.io.File;
6   import java.io.Serializable;
7   import java.util.ArrayList;
8   import java.util.Arrays;
9   import java.util.Map;
10  
11  /***
12   *
13   * @author trevor
14   */
15  public class StageInstanceDescription implements Serializable {
16  
17      /// A stage object (probably genereated by the JobConstructor parser)
18      Stage stage;
19      /// The index of this job (between 0 and n-1 if there are n instances)
20      int index;
21  
22      /// A map from stage input names to data pipe structures (note that the input to a pipe connects to a stage output)
23      private Map<String, PipeOutput> readers;
24      private /// A map from stage input names to data pipe structures (note that the input to a pipe connects to a stage output)
25      Map<String, PipeInput> writers;
26  
27      /// The URL of the Master server for this job.
28      String masterURL;
29  
30      public StageInstanceDescription(
31              Stage stage, int index,
32              Map<String, PipeInput> pipeInputs,
33              Map<String, PipeOutput> pipeOutputs,
34              String masterURL) {
35          this.stage = stage;
36          this.index = index;
37          this.writers = pipeInputs;
38          this.readers = pipeOutputs;
39          this.masterURL = masterURL;
40      }
41  
42      /***
43       * @return the readers
44       */
45      public Map<String, PipeOutput> getReaders() {
46          return readers;
47      }
48  
49      /***
50       * @return the writers
51       */
52      public Map<String, PipeInput> getWriters() {
53          return writers;
54      }
55  
56      public static class PipeInput implements Serializable {
57          private int index;
58          private DataPipe pipe;
59  
60          public PipeInput(DataPipe pipe, int index) {
61              this.index = index;
62              this.pipe = pipe;
63          }
64  
65          public String[] getFileNames() {
66              return pipe.getInputFileNames(index);
67          }
68  
69          public DataPipe getPipe() {
70              return pipe;
71          }
72      }
73  
74      public static class PipeOutput implements Serializable {
75          private int start;
76          private int stop;
77          private DataPipe pipe;
78  
79          public PipeOutput(DataPipe pipe, int index) {
80              this(pipe, index, index + 1);
81          }
82  
83          public PipeOutput(DataPipe pipe, int start, int stop) {
84              this.start = start;
85              this.stop = stop;
86              this.pipe = pipe;
87          }
88  
89          public String[] getFileNames() {
90              ArrayList<String[]> allFilenames = new ArrayList();
91              int totalNames = 0;
92  
93              for (int i = start; i < stop; i++) {
94                  String[] batch = pipe.getOutputFileNames(i);
95                  totalNames += batch.length;
96                  allFilenames.add(batch);
97              }
98  
99              String[] result = new String[totalNames];
100             int spot = 0;
101 
102             for (String[] batch : allFilenames) {
103                 System.arraycopy(batch, 0, result, spot, batch.length);
104                 spot += batch.length;
105             }
106 
107             return result;
108         }
109 
110         public DataPipe getPipe() {
111             return pipe;
112         }
113     }
114 
115     public String getName() {
116         return stage.name;
117     }
118 
119     public int getIndex() {
120         return index;
121     }
122 
123     public String getPath() {
124         return getName() + File.separator + getIndex();
125     }
126 
127     public String getMasterURL() {
128         return masterURL;
129     }
130 
131     public boolean writerExists(String specification, String className, String[] order) {
132         PipeInput input = getWriters().get(specification);
133         if (input == null) return false;
134         return input.getPipe().getClassName().equals(className) &&
135                Arrays.equals(input.getPipe().getOrder(), order);
136     }
137 
138     public boolean readerExists(String specification, String className, String[] order) {
139         PipeOutput output = getReaders().get(specification);
140         if (output == null) return false;
141         return output.getPipe().getClassName().equals(className) &&
142                Arrays.equals(output.getPipe().getOrder(), order);
143     }
144 
145     public Stage getStage() {
146         return stage;
147     }
148 }