Coverage Report - org.galagosearch.tupleflow.OrderedCombiner
 
Classes in this File Line Coverage Branch Coverage Complexity
OrderedCombiner
29%
17/58
6%
1/18
0
OrderedCombiner$1
0%
0/2
N/A
0
OrderedCombiner$SortPair
0%
0/4
N/A
0
 
 1  
 // BSD License (http://www.galagosearch.org/license)
 2  
 
 3  
 package org.galagosearch.tupleflow;
 4  
 
 5  
 import java.io.IOException;
 6  
 import java.util.Arrays;
 7  
 import java.util.Comparator;
 8  
 import java.util.List;
 9  
 import java.util.logging.Logger;
 10  
 
 11  
 
 12  
 /**
 13  
  *
 14  
  * @author trevor
 15  
  */
 16  2
 public class OrderedCombiner<T> implements ReaderSource<T> {
 17  
     TypeReader<T>[] inputs;
 18  
     FileOrderedReader<T>[] files;
 19  
     Order<T> order;
 20  
     public Step processor;
 21  
     boolean closeOnExit;
 22  2
     static int defaultBufferSize = 1000;
 23  4
     boolean initialized = false;
 24  4
     ReaderSource<T> source = null;
 25  4
     T last = null;
 26  
 
 27  
     public static class SortPair<T> {
 28  0
         public SortPair(T object, TypeReader<T> more) {
 29  0
             this.object = object;
 30  0
             this.more = more;
 31  0
         }
 32  
 
 33  
         public T object;
 34  
         public TypeReader<T> more;
 35  
     }
 36  
 
 37  
     private Comparator<SortPair<T>> sortComparator(Comparator<T> compare) {
 38  0
         final Comparator<T> c = compare;
 39  0
         return new Comparator<SortPair<T>>() {
 40  0
             public int compare(SortPair<T> one, SortPair<T> two) {
 41  0
                 return c.compare(one.object, two.object);
 42  
             }
 43  
         };
 44  
     }
 45  
 
 46  4
     public OrderedCombiner(TypeReader<T>[] inputs, FileOrderedReader<T>[] files, Order<T> order, Processor<T> processor, boolean closeOnExit) {
 47  4
         this.inputs = inputs;
 48  4
         this.files = files;
 49  4
         this.order = order;
 50  4
         this.processor = processor;
 51  4
         this.closeOnExit = closeOnExit;
 52  4
     }
 53  
 
 54  
     @SuppressWarnings(value = "unchecked")
 55  
     public OrderedCombiner(TypeReader<T>[] inputs, Order<T> order, Processor<T> processor) {
 56  0
         this(inputs, new FileOrderedReader[0], order, processor, true);
 57  0
     }
 58  
 
 59  
     @SuppressWarnings(value = "unchecked")
 60  
     public OrderedCombiner(TypeReader<T>[] inputs, Order<T> order) {
 61  4
         this(inputs, new FileOrderedReader[0], order, null, true);
 62  4
     }
 63  
 
 64  
     public Class<T> getOutputClass() {
 65  2
         return order.getOrderedClass();
 66  
     }
 67  
 
 68  
     public void setProcessor(final Step processor) throws IncompatibleProcessorException {
 69  0
         this.processor = processor;
 70  0
     }
 71  
 
 72  
     public static <S> OrderedCombiner combineFromFiles(List<String> filenames, Order<S> order) throws IOException {
 73  0
         return combineFromFiles(filenames, order, null, true, defaultBufferSize);
 74  
     }
 75  
 
 76  
     @SuppressWarnings(value = "unchecked")
 77  
     public static <S> OrderedCombiner combineFromFiles(List<String> filenames, Order<S> order, Processor<S> processor, boolean closeOnExit, int bufferSize) throws IOException {
 78  0
         TypeReader[] inputs = new TypeReader[filenames.size()];
 79  0
         FileOrderedReader[] readers = new FileOrderedReader[filenames.size()];
 80  
 
 81  0
         for (int i = 0; i < filenames.size(); i++) {
 82  0
             readers[i] = new FileOrderedReader<S>(filenames.get(i), order, bufferSize / filenames.size());
 83  0
             inputs[i] = readers[i].getOrderedReader();
 84  
         }
 85  
 
 86  0
         return new OrderedCombiner<S>(inputs, readers, order, processor, closeOnExit);
 87  
     } 
 88  
     
 89  
     @SuppressWarnings(value = "unchecked")
 90  
     public static <S> OrderedCombiner combineFromFiles(List<String> filenames) throws IOException {
 91  0
         TypeReader[] inputs = new TypeReader[filenames.size()];
 92  0
         FileOrderedReader[] readers = new FileOrderedReader[filenames.size()];
 93  0
         assert filenames.size() > 0;
 94  
 
 95  0
         for (int i = 0; i < filenames.size(); i++) {
 96  0
             readers[i] = new FileOrderedReader<S>(filenames.get(i));
 97  0
             inputs[i] = readers[i].getOrderedReader();
 98  
         }
 99  
 
 100  0
         return new OrderedCombiner<S>(inputs, readers, readers[0].getOrder(), null, true);
 101  
     }
 102  
 
 103  
     public static <S> OrderedCombiner combineFromFiles(List<String> filenames, Order<S> order, Processor<S> processor) throws IOException {
 104  0
         return combineFromFiles(filenames, order, processor, true, defaultBufferSize);
 105  
     }
 106  
 
 107  
     public T read() throws IOException {
 108  0
         if (source == null) {
 109  0
             source = order.orderedCombiner(Arrays.asList(inputs), false);
 110  
         }
 111  
         
 112  0
         T result = source.read();
 113  
 
 114  0
         if(result == null)
 115  0
             close();
 116  
         
 117  0
         return result;
 118  
     }
 119  
 
 120  
     @SuppressWarnings("unchecked")
 121  
     public void close() throws IOException {
 122  0
         for (FileOrderedReader reader : files) {
 123  0
             reader.close();
 124  
         }
 125  
         
 126  0
         files = (FileOrderedReader<T>[]) new FileOrderedReader[0];
 127  0
     }
 128  
     
 129  
     public void run() throws IOException {
 130  2
         if (inputs.length == 0) {
 131  2
             return;
 132  
         }
 133  0
         source = order.orderedCombiner(Arrays.asList(inputs), false);
 134  
 
 135  
         try {
 136  0
             source.setProcessor(processor);
 137  0
         } catch (IncompatibleProcessorException e) {
 138  0
             throw (IOException) new IOException("Wasn't able to link to this processor object.").initCause(e);
 139  0
         }
 140  
 
 141  0
         source.run();
 142  
 
 143  0
         if (closeOnExit) {
 144  0
             Linkage.close(processor);
 145  
         }
 146  
 
 147  0
         close();
 148  0
     }
 149  
 }