1
2
3 package org.galagosearch.tupleflow;
4
5 import java.io.IOException;
6 import java.util.Arrays;
7 import java.util.Comparator;
8 import java.util.List;
9 import java.util.logging.Logger;
10
11
12 /***
13 *
14 * @author trevor
15 */
16 public class OrderedCombiner<T> implements ReaderSource<T> {
17 TypeReader<T>[] inputs;
18 FileOrderedReader<T>[] files;
19 Order<T> order;
20 public Step processor;
21 boolean closeOnExit;
22 static int defaultBufferSize = 1000;
23 boolean initialized = false;
24 ReaderSource<T> source = null;
25 T last = null;
26
27 public static class SortPair<T> {
28 public SortPair(T object, TypeReader<T> more) {
29 this.object = object;
30 this.more = more;
31 }
32
33 public T object;
34 public TypeReader<T> more;
35 }
36
37 private Comparator<SortPair<T>> sortComparator(Comparator<T> compare) {
38 final Comparator<T> c = compare;
39 return new Comparator<SortPair<T>>() {
40 public int compare(SortPair<T> one, SortPair<T> two) {
41 return c.compare(one.object, two.object);
42 }
43 };
44 }
45
46 public OrderedCombiner(TypeReader<T>[] inputs, FileOrderedReader<T>[] files, Order<T> order, Processor<T> processor, boolean closeOnExit) {
47 this.inputs = inputs;
48 this.files = files;
49 this.order = order;
50 this.processor = processor;
51 this.closeOnExit = closeOnExit;
52 }
53
54 @SuppressWarnings(value = "unchecked")
55 public OrderedCombiner(TypeReader<T>[] inputs, Order<T> order, Processor<T> processor) {
56 this(inputs, new FileOrderedReader[0], order, processor, true);
57 }
58
59 @SuppressWarnings(value = "unchecked")
60 public OrderedCombiner(TypeReader<T>[] inputs, Order<T> order) {
61 this(inputs, new FileOrderedReader[0], order, null, true);
62 }
63
64 public Class<T> getOutputClass() {
65 return order.getOrderedClass();
66 }
67
68 public void setProcessor(final Step processor) throws IncompatibleProcessorException {
69 this.processor = processor;
70 }
71
72 public static <S> OrderedCombiner combineFromFiles(List<String> filenames, Order<S> order) throws IOException {
73 return combineFromFiles(filenames, order, null, true, defaultBufferSize);
74 }
75
76 @SuppressWarnings(value = "unchecked")
77 public static <S> OrderedCombiner combineFromFiles(List<String> filenames, Order<S> order, Processor<S> processor, boolean closeOnExit, int bufferSize) throws IOException {
78 TypeReader[] inputs = new TypeReader[filenames.size()];
79 FileOrderedReader[] readers = new FileOrderedReader[filenames.size()];
80
81 for (int i = 0; i < filenames.size(); i++) {
82 readers[i] = new FileOrderedReader<S>(filenames.get(i), order, bufferSize / filenames.size());
83 inputs[i] = readers[i].getOrderedReader();
84 }
85
86 return new OrderedCombiner<S>(inputs, readers, order, processor, closeOnExit);
87 }
88
89 @SuppressWarnings(value = "unchecked")
90 public static <S> OrderedCombiner combineFromFiles(List<String> filenames) throws IOException {
91 TypeReader[] inputs = new TypeReader[filenames.size()];
92 FileOrderedReader[] readers = new FileOrderedReader[filenames.size()];
93 assert filenames.size() > 0;
94
95 for (int i = 0; i < filenames.size(); i++) {
96 readers[i] = new FileOrderedReader<S>(filenames.get(i));
97 inputs[i] = readers[i].getOrderedReader();
98 }
99
100 return new OrderedCombiner<S>(inputs, readers, readers[0].getOrder(), null, true);
101 }
102
103 public static <S> OrderedCombiner combineFromFiles(List<String> filenames, Order<S> order, Processor<S> processor) throws IOException {
104 return combineFromFiles(filenames, order, processor, true, defaultBufferSize);
105 }
106
107 public T read() throws IOException {
108 if (source == null) {
109 source = order.orderedCombiner(Arrays.asList(inputs), false);
110 }
111
112 T result = source.read();
113
114 if(result == null)
115 close();
116
117 return result;
118 }
119
120 @SuppressWarnings("unchecked")
121 public void close() throws IOException {
122 for (FileOrderedReader reader : files) {
123 reader.close();
124 }
125
126 files = (FileOrderedReader<T>[]) new FileOrderedReader[0];
127 }
128
129 public void run() throws IOException {
130 if (inputs.length == 0) {
131 return;
132 }
133 source = order.orderedCombiner(Arrays.asList(inputs), false);
134
135 try {
136 source.setProcessor(processor);
137 } catch (IncompatibleProcessorException e) {
138 throw (IOException) new IOException("Wasn't able to link to this processor object.").initCause(e);
139 }
140
141 source.run();
142
143 if (closeOnExit) {
144 Linkage.close(processor);
145 }
146
147 close();
148 }
149 }