1
2
3 package org.galagosearch.tupleflow.execution;
4
5 import java.io.File;
6 import java.io.Serializable;
7 import java.util.ArrayList;
8 import java.util.Arrays;
9 import java.util.Map;
10
11 /***
12 *
13 * @author trevor
14 */
15 public class StageInstanceDescription implements Serializable {
16
17
18 Stage stage;
19
20 int index;
21
22
23 private Map<String, PipeOutput> readers;
24 private
25 Map<String, PipeInput> writers;
26
27
28 String masterURL;
29
30 public StageInstanceDescription(
31 Stage stage, int index,
32 Map<String, PipeInput> pipeInputs,
33 Map<String, PipeOutput> pipeOutputs,
34 String masterURL) {
35 this.stage = stage;
36 this.index = index;
37 this.writers = pipeInputs;
38 this.readers = pipeOutputs;
39 this.masterURL = masterURL;
40 }
41
42 /***
43 * @return the readers
44 */
45 public Map<String, PipeOutput> getReaders() {
46 return readers;
47 }
48
49 /***
50 * @return the writers
51 */
52 public Map<String, PipeInput> getWriters() {
53 return writers;
54 }
55
56 public static class PipeInput implements Serializable {
57 private int index;
58 private DataPipe pipe;
59
60 public PipeInput(DataPipe pipe, int index) {
61 this.index = index;
62 this.pipe = pipe;
63 }
64
65 public String[] getFileNames() {
66 return pipe.getInputFileNames(index);
67 }
68
69 public DataPipe getPipe() {
70 return pipe;
71 }
72 }
73
74 public static class PipeOutput implements Serializable {
75 private int start;
76 private int stop;
77 private DataPipe pipe;
78
79 public PipeOutput(DataPipe pipe, int index) {
80 this(pipe, index, index + 1);
81 }
82
83 public PipeOutput(DataPipe pipe, int start, int stop) {
84 this.start = start;
85 this.stop = stop;
86 this.pipe = pipe;
87 }
88
89 public String[] getFileNames() {
90 ArrayList<String[]> allFilenames = new ArrayList();
91 int totalNames = 0;
92
93 for (int i = start; i < stop; i++) {
94 String[] batch = pipe.getOutputFileNames(i);
95 totalNames += batch.length;
96 allFilenames.add(batch);
97 }
98
99 String[] result = new String[totalNames];
100 int spot = 0;
101
102 for (String[] batch : allFilenames) {
103 System.arraycopy(batch, 0, result, spot, batch.length);
104 spot += batch.length;
105 }
106
107 return result;
108 }
109
110 public DataPipe getPipe() {
111 return pipe;
112 }
113 }
114
115 public String getName() {
116 return stage.name;
117 }
118
119 public int getIndex() {
120 return index;
121 }
122
123 public String getPath() {
124 return getName() + File.separator + getIndex();
125 }
126
127 public String getMasterURL() {
128 return masterURL;
129 }
130
131 public boolean writerExists(String specification, String className, String[] order) {
132 PipeInput input = getWriters().get(specification);
133 if (input == null) return false;
134 return input.getPipe().getClassName().equals(className) &&
135 Arrays.equals(input.getPipe().getOrder(), order);
136 }
137
138 public boolean readerExists(String specification, String className, String[] order) {
139 PipeOutput output = getReaders().get(specification);
140 if (output == null) return false;
141 return output.getPipe().getClassName().equals(className) &&
142 Arrays.equals(output.getPipe().getOrder(), order);
143 }
144
145 public Stage getStage() {
146 return stage;
147 }
148 }