1
2 package org.galagosearch.tupleflow;
3
4 import java.io.IOException;
5 import java.util.ArrayList;
6 import java.util.List;
7 import java.util.logging.Logger;
8
9 /***
10 *
11 * @author trevor
12 */
13 public class SequentialCombiner<T> implements ExNihiloSource<T>, TypeReader<T> {
14 ArrayList<String> filenames;
15 Order<T> order;
16 FileOrderedReader<T> active;
17 public Processor<T> processor;
18 boolean closeOnExit = true;
19 Logger logger;
20
21 /*** Creates a new instance of SequentialCombiner */
22 public SequentialCombiner(List<String> filenames, Order<T> order) {
23 this.filenames = new ArrayList<String>(filenames);
24 this.order = order;
25 this.logger = Logger.getLogger(SequentialCombiner.class.toString());
26 }
27
28 public Class<T> getOutputClass() {
29 return order.getOrderedClass();
30 }
31
32 public void setProcessor(Step processor) throws IncompatibleProcessorException {
33 Linkage.link(this, processor);
34 }
35
36 public void run() throws IOException {
37 logger.info("Starting");
38
39 for (String filename : filenames) {
40 logger.info("Opening: " + filename);
41 FileOrderedReader<T> reader = new FileOrderedReader<T>(filename, order);
42 T object;
43
44 while ((object = reader.read()) != null) {
45 processor.process(object);
46 }
47
48 reader.close();
49 logger.info("Closing: " + filename);
50 }
51
52 logger.info("Finished");
53 if (closeOnExit) {
54 processor.close();
55 }
56 }
57
58 public static <S> SequentialCombiner<S> combineFromFiles(
59 List<String> filenames,
60 Order<S> order) throws IOException {
61 return new SequentialCombiner<S>(filenames, order);
62 }
63
64 public T read() throws IOException {
65 if (active == null) {
66 if (filenames.size() == 0) {
67 logger.info("Complete");
68 return null;
69 } else {
70 logger.info("Opening: " + filenames.get(0));
71 active = new FileOrderedReader<T>(filenames.get(0), order);
72 filenames.remove(0);
73 }
74 }
75
76 T object = active.read();
77
78 while (object == null && filenames.size() > 0) {
79 logger.info("Opening: " + filenames.get(0));
80 active = new FileOrderedReader<T>(filenames.get(0), order);
81 filenames.remove(0);
82 object = active.read();
83 }
84
85 return object;
86 }
87 }