1
2 package org.galagosearch.tupleflow;
3
4 import java.io.FileNotFoundException;
5 import java.io.IOException;
6 import java.lang.reflect.InvocationTargetException;
7
8 public class Splitter<T> implements Processor<T> {
9 private String prefix;
10 private Processor<T>[] processors;
11 private Order<T> typeOrder;
12
13 public Splitter(Processor<T>[] processors, Order<T> order) {
14 this.processors = processors;
15 this.typeOrder = order;
16 }
17
18 public static <S> Splitter<S> splitToFiles(String[] filenames, Order<S> sortOrder, Order<S> hashOrder) throws IOException, IncompatibleProcessorException {
19 return splitToFiles(filenames, sortOrder, hashOrder, null);
20 }
21
22 @SuppressWarnings("unchecked")
23 public static <S> Splitter<S> splitToFiles(String[] filenames, Order<S> sortOrder, Order<S> hashOrder, Class reducerClass) throws IOException, IncompatibleProcessorException {
24 assert sortOrder != null;
25 assert hashOrder != null;
26
27 Processor[] processors = new Processor[filenames.length];
28
29 try {
30 for (int i = 0; i < filenames.length; i++) {
31 FileOrderedWriter<S> writer = new FileOrderedWriter<S>(filenames[i], sortOrder);
32 Sorter sorter;
33 if (reducerClass != null) {
34 sorter = new Sorter<S>(sortOrder, (Reducer<S>) reducerClass.getConstructor().
35 newInstance());
36 } else {
37 sorter = new Sorter<S>(sortOrder);
38 }
39 sorter.setProcessor(writer);
40 processors[i] = sorter;
41 }
42 } catch (NoSuchMethodException e) {
43 throw new IOException(e.getMessage());
44 } catch (InvocationTargetException e) {
45 throw new IOException(e.getMessage());
46 } catch (InstantiationException e) {
47 throw new IOException(e.getMessage());
48 } catch (IllegalAccessException e) {
49 throw new IOException(e.getMessage());
50 }
51
52 return new Splitter<S>(processors, hashOrder);
53 }
54
55 @SuppressWarnings("unchecked")
56 public static <S> Splitter splitToFiles(String prefix, int limit, Type<S> type, Order<S> order, int count) throws IOException, FileNotFoundException, IncompatibleProcessorException {
57 assert type != null;
58 assert order != null;
59
60 Processor[] processors = new Processor[count];
61
62 for (int i = 0; i < count; i++) {
63 String filename = prefix + i;
64 FileOrderedWriter<S> writer = new FileOrderedWriter<S>(filename, order);
65 Sorter<S> sorter = new Sorter<S>(limit, order);
66 sorter.setProcessor(writer);
67 processors[i] = sorter;
68 }
69
70 return new Splitter<S>(processors, order);
71 }
72
73 public void process(T object) throws IOException {
74 int hash = typeOrder.hash(object);
75 if (hash < 0) {
76 hash = ~hash;
77 }
78 assert hash >= 0 : "Just absed the hash value, so this should always be true";
79 hash = hash % processors.length;
80 assert hash >= 0 : "Mod operation made it negative!";
81 processors[hash].process(object);
82 }
83
84 public void close() throws IOException {
85 for (Processor<T> processor : processors) {
86 processor.close();
87 }
88 processors = null;
89 }
90 }