View Javadoc

1   // BSD License (http://www.galagosearch.org/license)
2   package org.galagosearch.tupleflow;
3   
4   import java.io.File;
5   import java.io.FileNotFoundException;
6   import java.io.IOException;
7   import java.lang.management.ManagementFactory;
8   import java.lang.management.MemoryMXBean;
9   import java.lang.management.MemoryNotificationInfo;
10  import java.lang.management.MemoryPoolMXBean;
11  import java.lang.management.MemoryType;
12  import java.lang.management.MemoryUsage;
13  import java.util.ArrayList;
14  import java.util.Arrays;
15  import java.util.Collections;
16  import java.util.Comparator;
17  import java.util.Iterator;
18  import java.util.List;
19  import java.util.PriorityQueue;
20  import java.util.logging.Logger;
21  import javax.management.ListenerNotFoundException;
22  import javax.management.Notification;
23  import javax.management.NotificationEmitter;
24  import javax.management.NotificationListener;
25  import org.galagosearch.tupleflow.execution.ErrorHandler;
26  import org.galagosearch.tupleflow.execution.Verification;
27  
28  /***
29   * <p>
30   * This class sorts an incoming stream of objects in some specified order.
31   * When this object is closed (by calling the close method), the sorted
32   * objects are then sent in sorted order to the next stage of the 
33   * Processor chain.
34   * </p>
35   *
36   * <p>
37   * Since there may be many objects submitted to the Sorter (more than
38   * will fit in main memory), the object may create temporary files
39   * to store partially sorted results.  The path used for these temporary
40   * files is specified in the TempPath Java preferences variable.
41   * </p>
42   *
43   * <p>
44   * In many instances, Sorters are used to generate streams of data that
45   * are then used to create aggregate statistics.  For instance, suppose
46   * we want to compute the monthly sales of a particular corporation, separated
47   * by region.  We can feed a set of transactions to the Sorter, each containing
48   * a dollar amount and the region it came from, e.g.:
49   * <ul>
50   *      <li>($5.39, South)</li>
51   *      <li>($2.24, North)</li>
52   *      <li>($1.50, South)</li>
53   * </ul>
54   * If we sort this list by region name:
55   * <ul>
56   *      <li>($2.24, North)</li>
57   *      <li>($5.39, South)</li>
58   *      <li>($1.50, South)</li>
59   * </ul>
60   * It's now very easy to add up totals for each region (since all data for each
61   * region is adjacent in the list).</p>
62   *
63   * <p>
64   * In these kinds of aggregate applications, it may be more efficient to 
65   * provide the Sorter with a Reducer object.  A Reducer is an object that
66   * transforms <i>n</i> sorted objects of type T into some (hopefully)
67   * smaller number of objects, also of type T.  In the example above, we could
68   * write a reducer that turned those three transactions into:
69   * <ul>
70   *      <li>($2.24, North)</li>
71   *      <li>($6.89, South)</li>
72   * </ul>
73   * which would be equivalent for this application.  Using a Reducer allows
74   * the application to buffer fewer items and hopefully reduce the reliance
75   * on the disk during sorting.</p>
76   *
77   * @author Trevor Strohman
78   */
79  public class Sorter<T> extends StandardStep<T, T> implements NotificationListener {
        // Maximum number of buffered objects (pending objects plus objects in
        // sorted runs); needsFlush() reports true once this is exceeded.
        private int limit;
        // Largest number of temporary files merged in a single pass of combine().
        private int fileLimit = 20;
        // Set by handleNotification() on a memory-threshold notification and
        // cleared at the end of flush(); volatile because needsFlush() reads it
        // without holding the monitor.
        private volatile boolean flushRequested = false;
        // Objects received by process() that have not yet been reduced/sorted.
        private ArrayList<T> objects;
        // Sorted in-memory runs produced by reduce().
        private ArrayList<List<T>> runs;
        // Total number of objects currently held across all lists in runs.
        private long runsCount = 0;
        // NOTE(review): Class.toString() yields "class <name>", so the logger
        // name carries a "class " prefix; Class.getName() may have been intended.
        private Logger logger = Logger.getLogger(Sorter.class.toString());
        // Temporary files created by getTemporaryWriter() and not yet merged.
        private ArrayList<File> temporaryFiles;
        // The sort order requested by the caller.
        private Order<T> order;
        // Comparator obtained from order.lessThan(); used for sorting and merging.
        private Comparator<T> lessThanCompare;
        // Optional reducer applied in reduce(); null when none was supplied.
        private Reducer<T> reducer;
        // needsFlush() reports true once more than this many objects are pending.
        private static int reduceInterval = 100 * 1000;
        // Passed as the last argument to OrderedCombiner.combineFromFiles().
        private static int combineBufferSize = 100 * 1000;
        // Object limit used by constructors that do not take an explicit limit.
        private static int defaultObjectLimit = 50 * 1000 * 1000;
        // Incremented once per flush(); stays null unless the
        // TupleFlowParameters constructor assigns it.
        private Counter filesWritten = null;
        // Incremented once per combineStep(); assigned only by the
        // TupleFlowParameters constructor.
        private Counter sorterCombineSteps;
96  
        /**
         * Creates a Sorter with the default object limit, no reducer and no
         * downstream processor.
         *
         * @param order the order in which objects are emitted
         */
        public Sorter(Order<T> order) {
            this(defaultObjectLimit, order, null, null);
        }

        /**
         * Creates a Sorter with the default object limit and no downstream
         * processor.
         *
         * @param order the order in which objects are emitted
         * @param reducer reducer applied to buffered objects; may be null
         */
        public Sorter(Order<T> order, Reducer<T> reducer) {
            this(defaultObjectLimit, order, reducer, null);
        }

        /**
         * Creates a Sorter with an explicit object limit, no reducer and no
         * downstream processor.
         *
         * @param limit maximum number of objects buffered in memory
         * @param order the order in which objects are emitted
         */
        public Sorter(int limit, Order<T> order) {
            this(limit, order, null, null);
        }

        /**
         * Creates a Sorter with an explicit object limit and no downstream
         * processor.
         *
         * @param limit maximum number of objects buffered in memory
         * @param order the order in which objects are emitted
         * @param reducer reducer applied to buffered objects; may be null
         */
        public Sorter(int limit, Order<T> order, Reducer<T> reducer) {
            this(limit, order, reducer, null);
        }
112 
113     public Sorter(int limit, Order<T> order, Reducer<T> reducer, Processor<T> processor) {
114         this.limit = limit;
115         this.order = order;
116         this.processor = processor;
117         this.reducer = reducer;
118         this.objects = new ArrayList<T>();
119         this.runs = new ArrayList<List<T>>();
120         this.temporaryFiles = new ArrayList<File>();
121         this.lessThanCompare = order.lessThan();
122         this.flushRequested = false;
123 
124         requestMemoryWarnings();
125     }
126 
127     @SuppressWarnings("unchecked")
128     public Sorter(TupleFlowParameters parameters)
129             throws ClassNotFoundException, InstantiationException,
130                    IllegalAccessException, IOException {
131         String className = parameters.getXML().get("class");
132         String[] orderSpec = parameters.getXML().get("order").split(" ");
133 
134         Class clazz = Class.forName(className);
135         Type<T> typeInstance = (Type<T>) clazz.newInstance();
136         this.order = typeInstance.getOrder(orderSpec);
137         this.limit = (int) parameters.getXML().get("object-limit", defaultObjectLimit);
138         this.reducer = null;
139         this.flushRequested = false;
140 
141         if (parameters.getXML().containsKey("reducer")) {
142             Class reducerClass = Class.forName(parameters.getXML().get("reducer"));
143             this.reducer = (Reducer<T>) reducerClass.newInstance();
144         }
145 
146         this.processor = null;
147         this.objects = new ArrayList<T>();
148         this.runs = new ArrayList<List<T>>();
149         this.temporaryFiles = new ArrayList<File>();
150         this.lessThanCompare = order.lessThan();
151 
152         this.filesWritten = parameters.getCounter("Sorter Files Written");
153         this.sorterCombineSteps = parameters.getCounter("Sorter Combine Steps");
154 
155         requestMemoryWarnings();
156     }
157 
158     public void requestMemoryWarnings() {
159         List<MemoryPoolMXBean> pools = ManagementFactory.getMemoryPoolMXBeans();
160         long maxPoolSize = 0;
161         MemoryPoolMXBean biggestPool = null;
162 
163         for (MemoryPoolMXBean pool : pools) {
164             if (pool.getType() != MemoryType.HEAP) {
165                 continue;
166             }
167             MemoryUsage usage = pool.getUsage();
168 
169             if (pool.isUsageThresholdSupported() &&
170                     usage.getMax() > maxPoolSize) {
171                 maxPoolSize = usage.getMax();
172                 biggestPool = pool;
173             }
174         }
175 
176         if (biggestPool != null) {
177             biggestPool.setUsageThreshold((long) (maxPoolSize * 0.7));
178             MemoryMXBean memoryBean = ManagementFactory.getMemoryMXBean();
179             NotificationEmitter emitter = (NotificationEmitter) memoryBean;
180             emitter.addNotificationListener(this, null, null);
181         } else {
182             throw new RuntimeException("Memory monitoring is not supported.");
183         }
184     }
185 
186     public void removeMemoryWarnings() {
187         try {
188             MemoryMXBean memoryBean = ManagementFactory.getMemoryMXBean();
189             NotificationEmitter emitter = (NotificationEmitter) memoryBean;
190             emitter.removeNotificationListener(this);
191         } catch (ListenerNotFoundException e) {
192             // do nothing
193         }
194     }
195 
196     public void handleNotification(Notification notification, Object handback) {
197         if (notification.getType().equals(MemoryNotificationInfo.MEMORY_THRESHOLD_EXCEEDED)) {
198             flushRequested = true;
199             final Sorter f = this;
200 
201             Thread t = new Thread() {
202                 @Override
203                 public void run() {
204                     try {
205                         f.flush();
206                     } catch (IOException e) {
207                         logger.severe(e.toString());
208                     }
209                 }
210             };
211 
212             t.start();
213         }
214     }
215 
216     public static void verify(TupleFlowParameters fullParameters, ErrorHandler handler) {
217         Parameters parameters = fullParameters.getXML();
218         String[] requiredParameters = {"order", "class"};
219 
220         if (!Verification.requireParameters(requiredParameters, parameters, handler)) {
221             return;
222         }
223         String className = parameters.get("class");
224         String[] orderSpec = parameters.get("order").split(" ");
225 
226         Verification.requireClass(className, handler);
227         Verification.requireOrder(className, orderSpec, handler);
228 
229         if (parameters.containsKey("reducer")) {
230             String reducerType = parameters.get("reducer");
231             Verification.requireClass(reducerType, handler);
232         }
233     }
234 
235     public static String getInputClass(TupleFlowParameters parameters) {
236         return parameters.getXML().get("class", "");
237     }
238 
239     public static String getOutputClass(TupleFlowParameters parameters) {
240         return parameters.getXML().get("class", "");
241     }
242 
243     public static String[] getOutputOrder(TupleFlowParameters parameters) {
244         String[] orderSpec = parameters.getXML().get("order", "").split(" ");
245         return orderSpec;
246     }
247 
248     @Override
249     public String toString() {
250         return order.getOrderedClass().getName() + " " + Arrays.asList(order.getOrderSpec());
251     }
252 
253     public boolean needsFlush() {
254         if (flushRequested) {
255             return true;
256         }
257         return objects.size() > reduceInterval ||
258                 objects.size() + runsCount > limit;
259     }
260 
        /**
         * Buffers one object, then reduces or flushes the buffer if
         * flushIfNecessary() decides that is needed.
         *
         * @param object the object to add to the sort
         * @throws IOException if reducing or flushing fails
         */
        public synchronized void process(T object) throws IOException {
            objects.add(object);
            flushIfNecessary();
        }
265 
266     public synchronized void flushIfNecessary() throws IOException {
267         if (needsFlush()) {
268             reduce();
269 
270             if (needsFlush()) {
271                 flush();
272             }
273         }
274     }
275 
        /**
         * <p>Finishes sorting, then sends sorted output to later stages.</p>
         *
         * <p>Memory-warning notifications are unregistered first.  If any
         * temporary files exist, combine() flushes the remaining buffered
         * objects and merges the files into the processor; otherwise the
         * buffered objects are reduced and the in-memory runs are merged
         * directly into the processor.  The processor is closed last.</p>
         *
         * <p>NOTE(review): an earlier version of this comment described the
         * method as intentionally unsynchronized to avoid deadlocks, but the
         * declaration is synchronized, so this Sorter's monitor is held while
         * calling process and close on later stages.  Confirm which is
         * intended.</p>
         *
         * @throws IOException if merging or closing the processor fails
         */
        @Override
        public synchronized void close() throws IOException {
            // remove this object as quickly as possible from the alert queue
            // since we don't need to flush anymore
            removeMemoryWarnings();

            if (temporaryFiles.size() > 0) {
                combine();
            } else {
                reduce();
                combineRuns(processor);
            }
            processor.close();
        }
297 
298     /***
299      * Reduces the number of buffered objects.  The recently buffered
300      * objects are processed by a Reducer, if one was specified in the
301      * constructor.  The resulting objects from this reduction are then
302      * copied into a "reduced" set of buffered objects.  The process
303      * resembles a generational garbage collector.
304      * 
305      * Even if no reducer exists, this sorts all the current objects and
306      * sets them aside.  We perform this initial sort while the objects 
307      * are still warm in the cache to get improved throughput overall.
308      * 
309      * Another benefit to reducing is the speed that we can respond to 
310      * low memory events.  If a low memory event happens and the objects 
311      * aren't sorted, we have to sort them first before writing them to disk.
312      * The sorting process can take extra memory and time, which makes us
313      * risk running out of RAM while trying to get data out onto the disk.
314      */
315     private synchronized void reduce() throws IOException {
316         if (size() == 0) {
317             return;
318         }
319         List<T> results = objects;
320 
321         if (reducer != null) {
322             results = reducer.reduce(objects);
323         }
324         Collections.sort(results, lessThanCompare);
325         runs.add(results);
326         runsCount += results.size();
327 
328         objects = new ArrayList<T>();
329     }
330 
331     /***
332      * Returns the number of currently buffered objects.
333      */
334     private long size() {
335         return runsCount + objects.size();
336     }
337 
338     public synchronized void flush() throws IOException {
339         if (size() == 0) {
340             return;
341         }
342         reduce();
343         assert objects.size() == 0;
344 
345         FileOrderedWriter<T> writer = getTemporaryWriter();
346         combineRuns(writer);
347         writer.close();
348         if (filesWritten != null) filesWritten.increment();
349 
350         flushRequested = false;
351     }
352 
353     private class RunWrapper<T> implements Comparable<RunWrapper<T>> {
354         public Iterator<T> iterator;
355         public T top;
356         Comparator<T> lessThan;
357 
358         public RunWrapper(List<T> list, Comparator<T> lessThan) {
359             iterator = list.iterator();
360             this.lessThan = lessThan;
361         }
362 
363         public int compareTo(RunWrapper<T> other) {
364             T one = top;
365             T two = other.top;
366 
367             int result = lessThan.compare(one, two);
368             return result;
369         }
370 
371         public boolean next() {
372             if (iterator.hasNext()) {
373                 top = iterator.next();
374                 return true;
375             } else {
376                 top = null;
377                 return false;
378             }
379         }
380     }
381 
382     /***
383      * Takes the sorted runs in the runs array, and combines them into a
384      * single sorted list, which is processed by the processor called output.
385      */
386     private synchronized void combineRuns(Processor<T> output) throws IOException {
387         PriorityQueue<RunWrapper<T>> queue = new PriorityQueue<RunWrapper<T>>();
388 
389         // make a run wrapper for each run we've got buffered, 
390         // put it in the priority queue
391         for (List<T> run : runs) {
392             RunWrapper<T> wrapper = new RunWrapper<T>(run, lessThanCompare);
393             if (wrapper.next()) {
394                 queue.offer(wrapper);
395             }
396         }
397 
398         // we expect that some runs will have lots of contiguous tuples,
399         // in the case where the input is already almost sorted.  This loop
400         // is optimized for that case.
401 
402         while (queue.size() > 1) {
403             RunWrapper<T> wrapper = queue.poll();
404             RunWrapper<T> next = queue.peek();
405 
406             output.process(wrapper.top);
407             wrapper.next();
408 
409             while (wrapper.top != null &&
410                     lessThanCompare.compare(wrapper.top, next.top) <= 0) {
411                 output.process(wrapper.top);
412                 wrapper.next();
413             }
414 
415             if (wrapper.top != null) {
416                 queue.offer(wrapper);
417             }
418         }
419 
420         // process all objects from the final run
421         if (queue.size() == 1) {
422             RunWrapper<T> wrapper = queue.poll();
423 
424             do {
425                 output.process(wrapper.top);
426             } while (wrapper.next());
427         }
428 
429         runs.clear();
430         runsCount = 0;
431     }
432 
433     private synchronized FileOrderedWriter<T> getTemporaryWriter(long fileSize) throws IOException, FileNotFoundException {
434         File temporary = Utility.createTemporary(fileSize * 4);
435         FileOrderedWriter<T> writer = new FileOrderedWriter<T>(temporary, order);
436         temporaryFiles.add(temporary);
437         return writer;
438     }
439 
440     private synchronized FileOrderedWriter<T> getTemporaryWriter() throws IOException, FileNotFoundException {
441         File temporary = Utility.createTemporary();
442         FileOrderedWriter<T> writer = new FileOrderedWriter<T>(temporary, order);
443         temporaryFiles.add(temporary);
444         return writer;
445     }
446 
447     private synchronized void combine() throws IOException {
448         flush();
449 
450         if (temporaryFiles.size() == 0) {
451             return;
452         }
453         while (temporaryFiles.size() > fileLimit) {
454             // sort all the files so that small ones come first, since those
455             // are the ones we want to combine together.
456             Collections.sort(temporaryFiles, new Comparator<File>() {
457                          public int compare(File one, File two) {
458                              long oneLength = one.length();
459                              long twoLength = two.length();
460 
461                              if (oneLength > twoLength) {
462                                  return 1;
463                              } else if (oneLength < twoLength) {
464                                  return -1;
465                              }
466                              return 0;
467                          }
468                      });
469 
470             // pick a set of files to merge and remove them from the regular file set
471             ArrayList<File> temporaryFileSet = new ArrayList<File>(temporaryFiles.subList(0,
472                                                                                           fileLimit));
473             temporaryFiles.subList(0, fileLimit).clear();
474 
475             // calculate the total amount of space we'll need for this
476             long totalFileSize = 0;
477 
478             for (File f : temporaryFiles) {
479                 totalFileSize += f.length();
480             }
481 
482             // get a temporary writer that's big enough to handle all this data.
483             // this adds the File to the end of the list of temporary files
484             long oneGigabyte = 1024 * 1024 * 1024;
485             FileOrderedWriter<T> writer = getTemporaryWriter(totalFileSize * 2 + oneGigabyte);
486 
487             // do the actual combination work
488             combineStep(temporaryFileSet, writer);
489 
490             writer.close();
491             temporaryFileSet.clear();
492         }
493 
494         combineStep(temporaryFiles, processor);
495         temporaryFiles.clear();
496     }
497 
498     private synchronized void combineStep(List<File> files, Processor<T> output) throws FileNotFoundException, IOException {
499         if (sorterCombineSteps != null) sorterCombineSteps.increment();
500 
501         ArrayList<String> filenames = new ArrayList<String>();
502 
503         for (File f : files) {
504             filenames.add(f.getPath());
505         }
506         OrderedCombiner combiner = OrderedCombiner.combineFromFiles(filenames, order, output, false,
507                                                                     combineBufferSize);
508         combiner.run();
509 
510         for (File file : files) {
511             file.delete();
512         }
513     }
514 }