Coverage Report - org.galagosearch.tupleflow.Sorter
 
Classes in this File Line Coverage Branch Coverage Complexity
Sorter
43%
84/197
30%
21/70
0
Sorter$1
0%
0/6
N/A
0
Sorter$2
0%
0/8
0%
0/4
0
Sorter$RunWrapper
64%
9/14
100%
2/2
0
 
 1  
 // BSD License (http://www.galagosearch.org/license)
 2  
 package org.galagosearch.tupleflow;
 3  
 
 4  
 import java.io.File;
 5  
 import java.io.FileNotFoundException;
 6  
 import java.io.IOException;
 7  
 import java.lang.management.ManagementFactory;
 8  
 import java.lang.management.MemoryMXBean;
 9  
 import java.lang.management.MemoryNotificationInfo;
 10  
 import java.lang.management.MemoryPoolMXBean;
 11  
 import java.lang.management.MemoryType;
 12  
 import java.lang.management.MemoryUsage;
 13  
 import java.util.ArrayList;
 14  
 import java.util.Arrays;
 15  
 import java.util.Collections;
 16  
 import java.util.Comparator;
 17  
 import java.util.Iterator;
 18  
 import java.util.List;
 19  
 import java.util.PriorityQueue;
 20  
 import java.util.logging.Logger;
 21  
 import javax.management.ListenerNotFoundException;
 22  
 import javax.management.Notification;
 23  
 import javax.management.NotificationEmitter;
 24  
 import javax.management.NotificationListener;
 25  
 import org.galagosearch.tupleflow.execution.ErrorHandler;
 26  
 import org.galagosearch.tupleflow.execution.Verification;
 27  
 
 28  
 /**
 29  
  * <p>
 30  
  * This class sorts an incoming stream of objects in some specified order.
 31  
  * When this object is closed (by calling the close method), the sorted
 32  
  * objects are then sent in sorted order to the next stage of the 
 33  
  * Processor chain.
 34  
  * </p>
 35  
  *
 36  
  * <p>
 37  
  * Since there may be many objects submitted to the Sorter (more than
 38  
  * will fit in main memory), the object may create temporary files
 39  
  * to store partially sorted results.  The path used for these temporary
 40  
  * files is specified in the TempPath Java preferences variable.
 41  
  * </p>
 42  
  *
 43  
  * <p>
 44  
  * In many instances, Sorters are used to generate streams of data that
 45  
  * are then used to create aggregate statistics.  For instance, suppose
 46  
  * we want to compute the monthly sales of a particular corporation, separated
 47  
  * by region.  We can feed a set of transactions to the Sorter, each containing
 48  
  * a dollar amount and the region it came from, e.g.:
 49  
  * <ul>
 50  
  *      <li>($5.39, South)</li>
 51  
  *      <li>($2.24, North)</li>
 52  
  *      <li>($1.50, South)</li>
 53  
  * </ul>
 54  
  * If we sort this list by region name:
 55  
  * <ul>
 56  
  *      <li>($2.24, North)</li>
 57  
  *      <li>($5.39, South)</li>
 58  
  *      <li>($1.50, South)</li>
 59  
  * </ul>
 60  
  * It's now very easy to add up totals for each region (since all data for each
 61  
  * region is adjacent in the list).</p>
 62  
  *
 63  
  * <p>
 64  
  * In these kinds of aggregate applications, it may be more efficient to 
 65  
  * provide the Sorter with a Reducer object.  A Reducer is an object that
 66  
  * transforms <i>n</i> sorted objects of type T into some (hopefully)
 67  
  * smaller number of objects, also of type T.  In the example above, we could
 68  
  * write a reducer that turned those three transactions into:
 69  
  * <ul>
 70  
  *      <li>($2.24, North)</li>
 71  
  *      <li>($6.89, South)</li>
 72  
  * </ul>
 73  
  * which would be equivalent for this application.  Using a Reducer allows
 74  
  * the application to buffer fewer items and hopefully reduce the reliance
 75  
  * on the disk during sorting.</p>
 76  
  *
 77  
  * @author Trevor Strohman
 78  
  */
 79  2
 public class Sorter<T> extends StandardStep<T, T> implements NotificationListener {
 80  
     private int limit;
 81  10
     private int fileLimit = 20;
 82  10
     private volatile boolean flushRequested = false;
 83  
     private ArrayList<T> objects;
 84  
     private ArrayList<List<T>> runs;
 85  10
     private long runsCount = 0;
 86  10
     private Logger logger = Logger.getLogger(Sorter.class.toString());
 87  
     private ArrayList<File> temporaryFiles;
 88  
     private Order<T> order;
 89  
     private Comparator<T> lessThanCompare;
 90  
     private Reducer<T> reducer;
 91  2
     private static int reduceInterval = 100 * 1000;
 92  2
     private static int combineBufferSize = 100 * 1000;
 93  2
     private static int defaultObjectLimit = 50 * 1000 * 1000;
 94  10
     private Counter filesWritten = null;
 95  
     private Counter sorterCombineSteps;
 96  
 
 97  
     public Sorter(Order<T> order) {
 98  10
         this(defaultObjectLimit, order, null, null);
 99  10
     }
 100  
 
 101  
     public Sorter(Order<T> order, Reducer<T> reducer) {
 102  0
         this(defaultObjectLimit, order, reducer, null);
 103  0
     }
 104  
 
 105  
     public Sorter(int limit, Order<T> order) {
 106  0
         this(limit, order, null, null);
 107  0
     }
 108  
 
 109  
     public Sorter(int limit, Order<T> order, Reducer<T> reducer) {
 110  0
         this(limit, order, reducer, null);
 111  0
     }
 112  
 
 113  10
     public Sorter(int limit, Order<T> order, Reducer<T> reducer, Processor<T> processor) {
 114  10
         this.limit = limit;
 115  10
         this.order = order;
 116  10
         this.processor = processor;
 117  10
         this.reducer = reducer;
 118  10
         this.objects = new ArrayList<T>();
 119  10
         this.runs = new ArrayList<List<T>>();
 120  10
         this.temporaryFiles = new ArrayList<File>();
 121  10
         this.lessThanCompare = order.lessThan();
 122  10
         this.flushRequested = false;
 123  
 
 124  10
         requestMemoryWarnings();
 125  10
     }
 126  
 
 127  
     @SuppressWarnings("unchecked")
 128  
     public Sorter(TupleFlowParameters parameters)
 129  
             throws ClassNotFoundException, InstantiationException,
 130  0
                    IllegalAccessException, IOException {
 131  0
         String className = parameters.getXML().get("class");
 132  0
         String[] orderSpec = parameters.getXML().get("order").split(" ");
 133  
 
 134  0
         Class clazz = Class.forName(className);
 135  0
         Type<T> typeInstance = (Type<T>) clazz.newInstance();
 136  0
         this.order = typeInstance.getOrder(orderSpec);
 137  0
         this.limit = (int) parameters.getXML().get("object-limit", defaultObjectLimit);
 138  0
         this.reducer = null;
 139  0
         this.flushRequested = false;
 140  
 
 141  0
         if (parameters.getXML().containsKey("reducer")) {
 142  0
             Class reducerClass = Class.forName(parameters.getXML().get("reducer"));
 143  0
             this.reducer = (Reducer<T>) reducerClass.newInstance();
 144  
         }
 145  
 
 146  0
         this.processor = null;
 147  0
         this.objects = new ArrayList<T>();
 148  0
         this.runs = new ArrayList<List<T>>();
 149  0
         this.temporaryFiles = new ArrayList<File>();
 150  0
         this.lessThanCompare = order.lessThan();
 151  
 
 152  0
         this.filesWritten = parameters.getCounter("Sorter Files Written");
 153  0
         this.sorterCombineSteps = parameters.getCounter("Sorter Combine Steps");
 154  
 
 155  0
         requestMemoryWarnings();
 156  0
     }
 157  
 
 158  
     public void requestMemoryWarnings() {
 159  10
         List<MemoryPoolMXBean> pools = ManagementFactory.getMemoryPoolMXBeans();
 160  10
         long maxPoolSize = 0;
 161  10
         MemoryPoolMXBean biggestPool = null;
 162  
 
 163  10
         for (MemoryPoolMXBean pool : pools) {
 164  70
             if (pool.getType() != MemoryType.HEAP) {
 165  40
                 continue;
 166  
             }
 167  30
             MemoryUsage usage = pool.getUsage();
 168  
 
 169  30
             if (pool.isUsageThresholdSupported() &&
 170  
                     usage.getMax() > maxPoolSize) {
 171  10
                 maxPoolSize = usage.getMax();
 172  10
                 biggestPool = pool;
 173  
             }
 174  30
         }
 175  
 
 176  10
         if (biggestPool != null) {
 177  10
             biggestPool.setUsageThreshold((long) (maxPoolSize * 0.7));
 178  10
             MemoryMXBean memoryBean = ManagementFactory.getMemoryMXBean();
 179  10
             NotificationEmitter emitter = (NotificationEmitter) memoryBean;
 180  10
             emitter.addNotificationListener(this, null, null);
 181  10
         } else {
 182  0
             throw new RuntimeException("Memory monitoring is not supported.");
 183  
         }
 184  10
     }
 185  
 
 186  
     public void removeMemoryWarnings() {
 187  
         try {
 188  2
             MemoryMXBean memoryBean = ManagementFactory.getMemoryMXBean();
 189  2
             NotificationEmitter emitter = (NotificationEmitter) memoryBean;
 190  2
             emitter.removeNotificationListener(this);
 191  0
         } catch (ListenerNotFoundException e) {
 192  
             // do nothing
 193  2
         }
 194  2
     }
 195  
 
 196  
     public void handleNotification(Notification notification, Object handback) {
 197  0
         if (notification.getType().equals(MemoryNotificationInfo.MEMORY_THRESHOLD_EXCEEDED)) {
 198  0
             flushRequested = true;
 199  0
             final Sorter f = this;
 200  
 
 201  0
             Thread t = new Thread() {
 202  
                 @Override
 203  0
                 public void run() {
 204  
                     try {
 205  0
                         f.flush();
 206  0
                     } catch (IOException e) {
 207  0
                         logger.severe(e.toString());
 208  0
                     }
 209  0
                 }
 210  
             };
 211  
 
 212  0
             t.start();
 213  
         }
 214  0
     }
 215  
 
 216  
     public static void verify(TupleFlowParameters fullParameters, ErrorHandler handler) {
 217  0
         Parameters parameters = fullParameters.getXML();
 218  0
         String[] requiredParameters = {"order", "class"};
 219  
 
 220  0
         if (!Verification.requireParameters(requiredParameters, parameters, handler)) {
 221  0
             return;
 222  
         }
 223  0
         String className = parameters.get("class");
 224  0
         String[] orderSpec = parameters.get("order").split(" ");
 225  
 
 226  0
         Verification.requireClass(className, handler);
 227  0
         Verification.requireOrder(className, orderSpec, handler);
 228  
 
 229  0
         if (parameters.containsKey("reducer")) {
 230  0
             String reducerType = parameters.get("reducer");
 231  0
             Verification.requireClass(reducerType, handler);
 232  
         }
 233  0
     }
 234  
 
 235  
     public static String getInputClass(TupleFlowParameters parameters) {
 236  2
         return parameters.getXML().get("class", "");
 237  
     }
 238  
 
 239  
     public static String getOutputClass(TupleFlowParameters parameters) {
 240  2
         return parameters.getXML().get("class", "");
 241  
     }
 242  
 
 243  
     public static String[] getOutputOrder(TupleFlowParameters parameters) {
 244  0
         String[] orderSpec = parameters.getXML().get("order", "").split(" ");
 245  0
         return orderSpec;
 246  
     }
 247  
 
 248  
     @Override
 249  
     public String toString() {
 250  0
         return order.getOrderedClass().getName() + " " + Arrays.asList(order.getOrderSpec());
 251  
     }
 252  
 
 253  
     public boolean needsFlush() {
 254  4
         if (flushRequested) {
 255  0
             return true;
 256  
         }
 257  4
         return objects.size() > reduceInterval ||
 258  
                 objects.size() + runsCount > limit;
 259  
     }
 260  
 
 261  
     public synchronized void process(T object) throws IOException {
 262  4
         objects.add(object);
 263  4
         flushIfNecessary();
 264  4
     }
 265  
 
 266  
     public synchronized void flushIfNecessary() throws IOException {
 267  4
         if (needsFlush()) {
 268  0
             reduce();
 269  
 
 270  0
             if (needsFlush()) {
 271  0
                 flush();
 272  
             }
 273  
         }
 274  4
     }
 275  
 
 276  
     @Override
 277  
     /**
 278  
      * <p>Finishes sorting, then sends sorted output to later stages.</p>
 279  
      * 
 280  
      * <p>This method is intentionally unsynchronized, as synchronizing it 
 281  
      * tends to cause deadlock problems (since this method calls process on
 282  
      * later stages).</p>
 283  
      */
 284  
     public synchronized void close() throws IOException {
 285  
         // remove this object as quickly as possible from the alert queue
 286  
         // since we don't need to flush anymore
 287  2
         removeMemoryWarnings();
 288  
 
 289  2
         if (temporaryFiles.size() > 0) {
 290  0
             combine();
 291  
         } else {
 292  2
             reduce();
 293  2
             combineRuns(processor);
 294  
         }
 295  2
         processor.close();
 296  2
     }
 297  
 
 298  
     /**
 299  
      * Reduces the number of buffered objects.  The recently buffered
 300  
      * objects are processed by a Reducer, if one was specified in the
 301  
      * constructor.  The resulting objects from this reduction are then
 302  
      * copied into a "reduced" set of buffered objects.  The process
 303  
      * resembles a generational garbage collector.
 304  
      * 
 305  
      * Even if no reducer exists, this sorts all the current objects and
 306  
      * sets them aside.  We perform this initial sort while the objects 
 307  
      * are still warm in the cache to get improved throughput overall.
 308  
      * 
 309  
      * Another benefit to reducing is the speed that we can respond to 
 310  
      * low memory events.  If a low memory event happens and the objects 
 311  
      * aren't sorted, we have to sort them first before writing them to disk.
 312  
      * The sorting process can take extra memory and time, which makes us
 313  
      * risk running out of RAM while trying to get data out onto the disk.
 314  
      */
 315  
     private synchronized void reduce() throws IOException {
 316  2
         if (size() == 0) {
 317  0
             return;
 318  
         }
 319  2
         List<T> results = objects;
 320  
 
 321  2
         if (reducer != null) {
 322  0
             results = reducer.reduce(objects);
 323  
         }
 324  2
         Collections.sort(results, lessThanCompare);
 325  2
         runs.add(results);
 326  2
         runsCount += results.size();
 327  
 
 328  2
         objects = new ArrayList<T>();
 329  2
     }
 330  
 
 331  
     /**
 332  
      * Returns the number of currently buffered objects.
 333  
      */
 334  
     private long size() {
 335  2
         return runsCount + objects.size();
 336  
     }
 337  
 
 338  
     public synchronized void flush() throws IOException {
 339  0
         if (size() == 0) {
 340  0
             return;
 341  
         }
 342  0
         reduce();
 343  0
         assert objects.size() == 0;
 344  
 
 345  0
         FileOrderedWriter<T> writer = getTemporaryWriter();
 346  0
         combineRuns(writer);
 347  0
         writer.close();
 348  0
         if (filesWritten != null) filesWritten.increment();
 349  
 
 350  0
         flushRequested = false;
 351  0
     }
 352  
 
 353  0
     private class RunWrapper<T> implements Comparable<RunWrapper<T>> {
 354  
         public Iterator<T> iterator;
 355  
         public T top;
 356  
         Comparator<T> lessThan;
 357  
 
 358  2
         public RunWrapper(List<T> list, Comparator<T> lessThan) {
 359  2
             iterator = list.iterator();
 360  2
             this.lessThan = lessThan;
 361  2
         }
 362  
 
 363  
         public int compareTo(RunWrapper<T> other) {
 364  0
             T one = top;
 365  0
             T two = other.top;
 366  
 
 367  0
             int result = lessThan.compare(one, two);
 368  0
             return result;
 369  
         }
 370  
 
 371  
         public boolean next() {
 372  4
             if (iterator.hasNext()) {
 373  2
                 top = iterator.next();
 374  2
                 return true;
 375  
             } else {
 376  2
                 top = null;
 377  2
                 return false;
 378  
             }
 379  
         }
 380  
     }
 381  
 
 382  
     /**
 383  
      * Takes the sorted runs in the runs array, and combines them into a
 384  
      * single sorted list, which is processed by the processor called output.
 385  
      */
 386  
     private synchronized void combineRuns(Processor<T> output) throws IOException {
 387  2
         PriorityQueue<RunWrapper<T>> queue = new PriorityQueue<RunWrapper<T>>();
 388  
 
 389  
         // make a run wrapper for each run we've got buffered, 
 390  
         // put it in the priority queue
 391  2
         for (List<T> run : runs) {
 392  2
             RunWrapper<T> wrapper = new RunWrapper<T>(run, lessThanCompare);
 393  2
             if (wrapper.next()) {
 394  2
                 queue.offer(wrapper);
 395  
             }
 396  2
         }
 397  
 
 398  
         // we expect that some runs will have lots of contiguous tuples,
 399  
         // in the case where the input is already almost sorted.  This loop
 400  
         // is optimized for that case.
 401  
 
 402  2
         while (queue.size() > 1) {
 403  0
             RunWrapper<T> wrapper = queue.poll();
 404  0
             RunWrapper<T> next = queue.peek();
 405  
 
 406  0
             output.process(wrapper.top);
 407  0
             wrapper.next();
 408  
 
 409  
             while (wrapper.top != null &&
 410  0
                     lessThanCompare.compare(wrapper.top, next.top) <= 0) {
 411  0
                 output.process(wrapper.top);
 412  0
                 wrapper.next();
 413  
             }
 414  
 
 415  0
             if (wrapper.top != null) {
 416  0
                 queue.offer(wrapper);
 417  
             }
 418  0
         }
 419  
 
 420  
         // process all objects from the final run
 421  2
         if (queue.size() == 1) {
 422  2
             RunWrapper<T> wrapper = queue.poll();
 423  
 
 424  
             do {
 425  2
                 output.process(wrapper.top);
 426  2
             } while (wrapper.next());
 427  
         }
 428  
 
 429  2
         runs.clear();
 430  2
         runsCount = 0;
 431  2
     }
 432  
 
 433  
     private synchronized FileOrderedWriter<T> getTemporaryWriter(long fileSize) throws IOException, FileNotFoundException {
 434  0
         File temporary = Utility.createTemporary(fileSize * 4);
 435  0
         FileOrderedWriter<T> writer = new FileOrderedWriter<T>(temporary, order);
 436  0
         temporaryFiles.add(temporary);
 437  0
         return writer;
 438  
     }
 439  
 
 440  
     private synchronized FileOrderedWriter<T> getTemporaryWriter() throws IOException, FileNotFoundException {
 441  0
         File temporary = Utility.createTemporary();
 442  0
         FileOrderedWriter<T> writer = new FileOrderedWriter<T>(temporary, order);
 443  0
         temporaryFiles.add(temporary);
 444  0
         return writer;
 445  
     }
 446  
 
 447  
     private synchronized void combine() throws IOException {
 448  0
         flush();
 449  
 
 450  0
         if (temporaryFiles.size() == 0) {
 451  0
             return;
 452  
         }
 453  0
         while (temporaryFiles.size() > fileLimit) {
 454  
             // sort all the files so that small ones come first, since those
 455  
             // are the ones we want to combine together.
 456  0
             Collections.sort(temporaryFiles, new Comparator<File>() {
 457  0
                          public int compare(File one, File two) {
 458  0
                              long oneLength = one.length();
 459  0
                              long twoLength = two.length();
 460  
 
 461  0
                              if (oneLength > twoLength) {
 462  0
                                  return 1;
 463  0
                              } else if (oneLength < twoLength) {
 464  0
                                  return -1;
 465  
                              }
 466  0
                              return 0;
 467  
                          }
 468  
                      });
 469  
 
 470  
             // pick a set of files to merge and remove them from the regular file set
 471  0
             ArrayList<File> temporaryFileSet = new ArrayList<File>(temporaryFiles.subList(0,
 472  
                                                                                           fileLimit));
 473  0
             temporaryFiles.subList(0, fileLimit).clear();
 474  
 
 475  
             // calculate the total amount of space we'll need for this
 476  0
             long totalFileSize = 0;
 477  
 
 478  0
             for (File f : temporaryFiles) {
 479  0
                 totalFileSize += f.length();
 480  
             }
 481  
 
 482  
             // get a temporary writer that's big enough to handle all this data.
 483  
             // this adds the File to the end of the list of temporary files
 484  0
             long oneGigabyte = 1024 * 1024 * 1024;
 485  0
             FileOrderedWriter<T> writer = getTemporaryWriter(totalFileSize * 2 + oneGigabyte);
 486  
 
 487  
             // do the actual combination work
 488  0
             combineStep(temporaryFileSet, writer);
 489  
 
 490  0
             writer.close();
 491  0
             temporaryFileSet.clear();
 492  0
         }
 493  
 
 494  0
         combineStep(temporaryFiles, processor);
 495  0
         temporaryFiles.clear();
 496  0
     }
 497  
 
 498  
     private synchronized void combineStep(List<File> files, Processor<T> output) throws FileNotFoundException, IOException {
 499  0
         if (sorterCombineSteps != null) sorterCombineSteps.increment();
 500  
 
 501  0
         ArrayList<String> filenames = new ArrayList<String>();
 502  
 
 503  0
         for (File f : files) {
 504  0
             filenames.add(f.getPath());
 505  
         }
 506  0
         OrderedCombiner combiner = OrderedCombiner.combineFromFiles(filenames, order, output, false,
 507  
                                                                     combineBufferSize);
 508  0
         combiner.run();
 509  
 
 510  0
         for (File file : files) {
 511  0
             file.delete();
 512  
         }
 513  0
     }
 514  
 }