Coverage Report - org.galagosearch.tupleflow.Splitter
 
Classes in this File Line Coverage Branch Coverage Complexity
Splitter
0%
0/48
0%
0/34
0
 
 1  
 // BSD License (http://www.galagosearch.org/license)
 2  
 package org.galagosearch.tupleflow;
 3  
 
 4  
 import java.io.FileNotFoundException;
 5  
 import java.io.IOException;
 6  
 import java.lang.reflect.InvocationTargetException;
 7  
 
 8  0
 public class Splitter<T> implements Processor<T> {
 9  
     private String prefix;
 10  
     private Processor<T>[] processors;
 11  
     private Order<T> typeOrder;
 12  
 
 13  0
     public Splitter(Processor<T>[] processors, Order<T> order) {
 14  0
         this.processors = processors;
 15  0
         this.typeOrder = order;
 16  0
     }
 17  
 
 18  
     public static <S> Splitter<S> splitToFiles(String[] filenames, Order<S> sortOrder, Order<S> hashOrder) throws IOException, IncompatibleProcessorException {
 19  0
         return splitToFiles(filenames, sortOrder, hashOrder, null);
 20  
     }
 21  
 
 22  
     @SuppressWarnings("unchecked")
 23  
     public static <S> Splitter<S> splitToFiles(String[] filenames, Order<S> sortOrder, Order<S> hashOrder, Class reducerClass) throws IOException, IncompatibleProcessorException {
 24  0
         assert sortOrder != null;
 25  0
         assert hashOrder != null;
 26  
 
 27  0
         Processor[] processors = new Processor[filenames.length];
 28  
 
 29  
         try {
 30  0
             for (int i = 0; i < filenames.length; i++) {
 31  0
                 FileOrderedWriter<S> writer = new FileOrderedWriter<S>(filenames[i], sortOrder);
 32  
                 Sorter sorter;
 33  0
                 if (reducerClass != null) {
 34  0
                     sorter = new Sorter<S>(sortOrder, (Reducer<S>) reducerClass.getConstructor().
 35  
                                            newInstance());
 36  
                 } else {
 37  0
                     sorter = new Sorter<S>(sortOrder);
 38  
                 }
 39  0
                 sorter.setProcessor(writer);
 40  0
                 processors[i] = sorter;
 41  
             }
 42  0
         } catch (NoSuchMethodException e) {
 43  0
             throw new IOException(e.getMessage());
 44  0
         } catch (InvocationTargetException e) {
 45  0
             throw new IOException(e.getMessage());
 46  0
         } catch (InstantiationException e) {
 47  0
             throw new IOException(e.getMessage());
 48  0
         } catch (IllegalAccessException e) {
 49  0
             throw new IOException(e.getMessage());
 50  0
         }
 51  
 
 52  0
         return new Splitter<S>(processors, hashOrder);
 53  
     }
 54  
 
 55  
     @SuppressWarnings("unchecked")
 56  
     public static <S> Splitter splitToFiles(String prefix, int limit, Type<S> type, Order<S> order, int count) throws IOException, FileNotFoundException, IncompatibleProcessorException {
 57  0
         assert type != null;
 58  0
         assert order != null;
 59  
 
 60  0
         Processor[] processors = new Processor[count];
 61  
 
 62  0
         for (int i = 0; i < count; i++) {
 63  0
             String filename = prefix + i;
 64  0
             FileOrderedWriter<S> writer = new FileOrderedWriter<S>(filename, order);
 65  0
             Sorter<S> sorter = new Sorter<S>(limit, order);
 66  0
             sorter.setProcessor(writer);
 67  0
             processors[i] = sorter;
 68  
         }
 69  
 
 70  0
         return new Splitter<S>(processors, order);
 71  
     }
 72  
 
 73  
     public void process(T object) throws IOException {
 74  0
         int hash = typeOrder.hash(object);
 75  0
         if (hash < 0) {
 76  0
             hash = ~hash; // using bitwise complement, because -Integer.MIN_VALUE is still negative
 77  
         }
 78  0
         assert hash >= 0 : "Just absed the hash value, so this should always be true";
 79  0
         hash = hash % processors.length;
 80  0
         assert hash >= 0 : "Mod operation made it negative!";
 81  0
         processors[hash].process(object);
 82  0
     }
 83  
 
 84  
     public void close() throws IOException {
 85  0
         for (Processor<T> processor : processors) {
 86  0
             processor.close();
 87  
         }
 88  0
         processors = null;
 89  0
     }
 90  
 }