// BSD License (http://www.galagosearch.org/license)
2   
3   package org.galagosearch.tupleflow;
4   
5   import java.io.IOException;
6   import java.util.Arrays;
7   import java.util.Comparator;
8   import java.util.List;
9   import java.util.logging.Logger;
10  
11  
12  /***
13   *
14   * @author trevor
15   */
16  public class OrderedCombiner<T> implements ReaderSource<T> {
17      TypeReader<T>[] inputs;
18      FileOrderedReader<T>[] files;
19      Order<T> order;
20      public Step processor;
21      boolean closeOnExit;
22      static int defaultBufferSize = 1000;
23      boolean initialized = false;
24      ReaderSource<T> source = null;
25      T last = null;
26  
27      public static class SortPair<T> {
28          public SortPair(T object, TypeReader<T> more) {
29              this.object = object;
30              this.more = more;
31          }
32  
33          public T object;
34          public TypeReader<T> more;
35      }
36  
37      private Comparator<SortPair<T>> sortComparator(Comparator<T> compare) {
38          final Comparator<T> c = compare;
39          return new Comparator<SortPair<T>>() {
40              public int compare(SortPair<T> one, SortPair<T> two) {
41                  return c.compare(one.object, two.object);
42              }
43          };
44      }
45  
46      public OrderedCombiner(TypeReader<T>[] inputs, FileOrderedReader<T>[] files, Order<T> order, Processor<T> processor, boolean closeOnExit) {
47          this.inputs = inputs;
48          this.files = files;
49          this.order = order;
50          this.processor = processor;
51          this.closeOnExit = closeOnExit;
52      }
53  
54      @SuppressWarnings(value = "unchecked")
55      public OrderedCombiner(TypeReader<T>[] inputs, Order<T> order, Processor<T> processor) {
56          this(inputs, new FileOrderedReader[0], order, processor, true);
57      }
58  
59      @SuppressWarnings(value = "unchecked")
60      public OrderedCombiner(TypeReader<T>[] inputs, Order<T> order) {
61          this(inputs, new FileOrderedReader[0], order, null, true);
62      }
63  
64      public Class<T> getOutputClass() {
65          return order.getOrderedClass();
66      }
67  
68      public void setProcessor(final Step processor) throws IncompatibleProcessorException {
69          this.processor = processor;
70      }
71  
72      public static <S> OrderedCombiner combineFromFiles(List<String> filenames, Order<S> order) throws IOException {
73          return combineFromFiles(filenames, order, null, true, defaultBufferSize);
74      }
75  
76      @SuppressWarnings(value = "unchecked")
77      public static <S> OrderedCombiner combineFromFiles(List<String> filenames, Order<S> order, Processor<S> processor, boolean closeOnExit, int bufferSize) throws IOException {
78          TypeReader[] inputs = new TypeReader[filenames.size()];
79          FileOrderedReader[] readers = new FileOrderedReader[filenames.size()];
80  
81          for (int i = 0; i < filenames.size(); i++) {
82              readers[i] = new FileOrderedReader<S>(filenames.get(i), order, bufferSize / filenames.size());
83              inputs[i] = readers[i].getOrderedReader();
84          }
85  
86          return new OrderedCombiner<S>(inputs, readers, order, processor, closeOnExit);
87      } 
88      
89      @SuppressWarnings(value = "unchecked")
90      public static <S> OrderedCombiner combineFromFiles(List<String> filenames) throws IOException {
91          TypeReader[] inputs = new TypeReader[filenames.size()];
92          FileOrderedReader[] readers = new FileOrderedReader[filenames.size()];
93          assert filenames.size() > 0;
94  
95          for (int i = 0; i < filenames.size(); i++) {
96              readers[i] = new FileOrderedReader<S>(filenames.get(i));
97              inputs[i] = readers[i].getOrderedReader();
98          }
99  
100         return new OrderedCombiner<S>(inputs, readers, readers[0].getOrder(), null, true);
101     }
102 
103     public static <S> OrderedCombiner combineFromFiles(List<String> filenames, Order<S> order, Processor<S> processor) throws IOException {
104         return combineFromFiles(filenames, order, processor, true, defaultBufferSize);
105     }
106 
107     public T read() throws IOException {
108         if (source == null) {
109             source = order.orderedCombiner(Arrays.asList(inputs), false);
110         }
111         
112         T result = source.read();
113 
114         if(result == null)
115             close();
116         
117         return result;
118     }
119 
120     @SuppressWarnings("unchecked")
121     public void close() throws IOException {
122         for (FileOrderedReader reader : files) {
123             reader.close();
124         }
125         
126         files = (FileOrderedReader<T>[]) new FileOrderedReader[0];
127     }
128     
129     public void run() throws IOException {
130         if (inputs.length == 0) {
131             return;
132         }
133         source = order.orderedCombiner(Arrays.asList(inputs), false);
134 
135         try {
136             source.setProcessor(processor);
137         } catch (IncompatibleProcessorException e) {
138             throw (IOException) new IOException("Wasn't able to link to this processor object.").initCause(e);
139         }
140 
141         source.run();
142 
143         if (closeOnExit) {
144             Linkage.close(processor);
145         }
146 
147         close();
148     }
149 }