View Javadoc

1   // BSD License (http://www.galagosearch.org/license)
2   package org.galagosearch.tupleflow;
3   
4   import java.io.FileNotFoundException;
5   import java.io.IOException;
6   import java.lang.reflect.InvocationTargetException;
7   
8   public class Splitter<T> implements Processor<T> {
9       private String prefix;
10      private Processor<T>[] processors;
11      private Order<T> typeOrder;
12  
13      public Splitter(Processor<T>[] processors, Order<T> order) {
14          this.processors = processors;
15          this.typeOrder = order;
16      }
17  
18      public static <S> Splitter<S> splitToFiles(String[] filenames, Order<S> sortOrder, Order<S> hashOrder) throws IOException, IncompatibleProcessorException {
19          return splitToFiles(filenames, sortOrder, hashOrder, null);
20      }
21  
22      @SuppressWarnings("unchecked")
23      public static <S> Splitter<S> splitToFiles(String[] filenames, Order<S> sortOrder, Order<S> hashOrder, Class reducerClass) throws IOException, IncompatibleProcessorException {
24          assert sortOrder != null;
25          assert hashOrder != null;
26  
27          Processor[] processors = new Processor[filenames.length];
28  
29          try {
30              for (int i = 0; i < filenames.length; i++) {
31                  FileOrderedWriter<S> writer = new FileOrderedWriter<S>(filenames[i], sortOrder);
32                  Sorter sorter;
33                  if (reducerClass != null) {
34                      sorter = new Sorter<S>(sortOrder, (Reducer<S>) reducerClass.getConstructor().
35                                             newInstance());
36                  } else {
37                      sorter = new Sorter<S>(sortOrder);
38                  }
39                  sorter.setProcessor(writer);
40                  processors[i] = sorter;
41              }
42          } catch (NoSuchMethodException e) {
43              throw new IOException(e.getMessage());
44          } catch (InvocationTargetException e) {
45              throw new IOException(e.getMessage());
46          } catch (InstantiationException e) {
47              throw new IOException(e.getMessage());
48          } catch (IllegalAccessException e) {
49              throw new IOException(e.getMessage());
50          }
51  
52          return new Splitter<S>(processors, hashOrder);
53      }
54  
55      @SuppressWarnings("unchecked")
56      public static <S> Splitter splitToFiles(String prefix, int limit, Type<S> type, Order<S> order, int count) throws IOException, FileNotFoundException, IncompatibleProcessorException {
57          assert type != null;
58          assert order != null;
59  
60          Processor[] processors = new Processor[count];
61  
62          for (int i = 0; i < count; i++) {
63              String filename = prefix + i;
64              FileOrderedWriter<S> writer = new FileOrderedWriter<S>(filename, order);
65              Sorter<S> sorter = new Sorter<S>(limit, order);
66              sorter.setProcessor(writer);
67              processors[i] = sorter;
68          }
69  
70          return new Splitter<S>(processors, order);
71      }
72  
73      public void process(T object) throws IOException {
74          int hash = typeOrder.hash(object);
75          if (hash < 0) {
76              hash = ~hash; // using bitwise complement, because -Integer.MIN_VALUE is still negative
77          }
78          assert hash >= 0 : "Just absed the hash value, so this should always be true";
79          hash = hash % processors.length;
80          assert hash >= 0 : "Mod operation made it negative!";
81          processors[hash].process(object);
82      }
83  
84      public void close() throws IOException {
85          for (Processor<T> processor : processors) {
86              processor.close();
87          }
88          processors = null;
89      }
90  }