View Javadoc

1   // BSD License (http://www.galagosearch.org/license)
2   package org.galagosearch.tupleflow;
3   
4   import java.io.IOException;
5   import java.util.ArrayList;
6   import java.util.List;
7   import java.util.logging.Logger;
8   
9   /***
10   *
11   * @author trevor
12   */
13  public class SequentialCombiner<T> implements ExNihiloSource<T>, TypeReader<T> {
14      ArrayList<String> filenames;
15      Order<T> order;
16      FileOrderedReader<T> active;
17      public Processor<T> processor;
18      boolean closeOnExit = true;
19      Logger logger;
20  
21      /*** Creates a new instance of SequentialCombiner */
22      public SequentialCombiner(List<String> filenames, Order<T> order) {
23          this.filenames = new ArrayList<String>(filenames);
24          this.order = order;
25          this.logger = Logger.getLogger(SequentialCombiner.class.toString());
26      }
27  
28      public Class<T> getOutputClass() {
29          return order.getOrderedClass();
30      }
31  
32      public void setProcessor(Step processor) throws IncompatibleProcessorException {
33          Linkage.link(this, processor);
34      }
35  
36      public void run() throws IOException {
37          logger.info("Starting");
38  
39          for (String filename : filenames) {
40              logger.info("Opening: " + filename);
41              FileOrderedReader<T> reader = new FileOrderedReader<T>(filename, order);
42              T object;
43  
44              while ((object = reader.read()) != null) {
45                  processor.process(object);
46              }
47  
48              reader.close();
49              logger.info("Closing: " + filename);
50          }
51  
52          logger.info("Finished");
53          if (closeOnExit) {
54              processor.close();
55          }
56      }
57  
58      public static <S> SequentialCombiner<S> combineFromFiles(
59              List<String> filenames,
60              Order<S> order) throws IOException {
61          return new SequentialCombiner<S>(filenames, order);
62      }
63  
64      public T read() throws IOException {
65          if (active == null) {
66              if (filenames.size() == 0) {
67                  logger.info("Complete");
68                  return null;
69              } else {
70                  logger.info("Opening: " + filenames.get(0));
71                  active = new FileOrderedReader<T>(filenames.get(0), order);
72                  filenames.remove(0);
73              }
74          }
75  
76          T object = active.read();
77  
78          while (object == null && filenames.size() > 0) {
79              logger.info("Opening: " + filenames.get(0));
80              active = new FileOrderedReader<T>(filenames.get(0), order);
81              filenames.remove(0);
82              object = active.read();
83          }
84  
85          return object;
86      }
87  }