1
2 package org.galagosearch.tupleflow;
3
4 import java.io.File;
5 import java.io.FileNotFoundException;
6 import java.io.IOException;
7 import java.lang.management.ManagementFactory;
8 import java.lang.management.MemoryMXBean;
9 import java.lang.management.MemoryNotificationInfo;
10 import java.lang.management.MemoryPoolMXBean;
11 import java.lang.management.MemoryType;
12 import java.lang.management.MemoryUsage;
13 import java.util.ArrayList;
14 import java.util.Arrays;
15 import java.util.Collections;
16 import java.util.Comparator;
17 import java.util.Iterator;
18 import java.util.List;
19 import java.util.PriorityQueue;
20 import java.util.logging.Logger;
21 import javax.management.ListenerNotFoundException;
22 import javax.management.Notification;
23 import javax.management.NotificationEmitter;
24 import javax.management.NotificationListener;
25 import org.galagosearch.tupleflow.execution.ErrorHandler;
26 import org.galagosearch.tupleflow.execution.Verification;
27
28 /***
29 * <p>
30 * This class sorts an incoming stream of objects in some specified order.
31 * When this object is closed (by calling the close method), the sorted
32 * objects are then sent in sorted order to the next stage of the
33 * Processor chain.
34 * </p>
35 *
36 * <p>
37 * Since there may be many objects submitted to the Sorter (more than
38 * will fit in main memory), the object may create temporary files
39 * to store partially sorted results. The path used for these temporary
40 * files is specified in the TempPath Java preferences variable.
41 * </p>
42 *
43 * <p>
44 * In many instances, Sorters are used to generate streams of data that
45 * are then used to create aggregate statistics. For instance, suppose
46 * we want to compute the monthly sales of a particular corporation, separated
47 * by region. We can feed a set of transactions to the Sorter, each containing
48 * a dollar amount and the region it came from, e.g.:
49 * <ul>
50 * <li>($5.39, South)</li>
51 * <li>($2.24, North)</li>
52 * <li>($1.50, South)</li>
53 * </ul>
54 * If we sort this list by region name:
55 * <ul>
56 * <li>($2.24, North)</li>
57 * <li>($5.39, South)</li>
58 * <li>($1.50, South)</li>
59 * </ul>
60 * It's now very easy to add up totals for each region (since all data for each
61 * region is adjacent in the list).</p>
62 *
63 * <p>
64 * In these kinds of aggregate applications, it may be more efficient to
65 * provide the Sorter with a Reducer object. A Reducer is an object that
66 * transforms <i>n</i> sorted objects of type T into some (hopefully)
67 * smaller number of objects, also of type T. In the example above, we could
68 * write a reducer that turned those three transactions into:
69 * <ul>
70 * <li>($2.24, North)</li>
71 * <li>($6.89, South)</li>
72 * </ul>
73 * which would be equivalent for this application. Using a Reducer allows
74 * the application to buffer fewer items and hopefully reduce the reliance
75 * on the disk during sorting.</p>
76 *
77 * @author Trevor Strohman
78 */
79 public class Sorter<T> extends StandardStep<T, T> implements NotificationListener {
80 private int limit;
81 private int fileLimit = 20;
82 private volatile boolean flushRequested = false;
83 private ArrayList<T> objects;
84 private ArrayList<List<T>> runs;
85 private long runsCount = 0;
86 private Logger logger = Logger.getLogger(Sorter.class.toString());
87 private ArrayList<File> temporaryFiles;
88 private Order<T> order;
89 private Comparator<T> lessThanCompare;
90 private Reducer<T> reducer;
91 private static int reduceInterval = 100 * 1000;
92 private static int combineBufferSize = 100 * 1000;
93 private static int defaultObjectLimit = 50 * 1000 * 1000;
94 private Counter filesWritten = null;
95 private Counter sorterCombineSteps;
96
/**
 * Creates a sorter with the default object limit and no reducer.
 *
 * @param order the ordering used to sort incoming objects
 */
public Sorter(Order<T> order) {
    this(order, null);
}

/**
 * Creates a sorter with the default object limit.
 *
 * @param order   the ordering used to sort incoming objects
 * @param reducer reducer applied to buffered objects, or null for none
 */
public Sorter(Order<T> order, Reducer<T> reducer) {
    this(defaultObjectLimit, order, reducer);
}

/**
 * Creates a sorter with an explicit object limit and no reducer.
 *
 * @param limit number of buffered objects above which needsFlush() reports true
 * @param order the ordering used to sort incoming objects
 */
public Sorter(int limit, Order<T> order) {
    this(limit, order, null);
}

/**
 * Creates a sorter with an explicit object limit and no downstream processor.
 *
 * @param limit   number of buffered objects above which needsFlush() reports true
 * @param order   the ordering used to sort incoming objects
 * @param reducer reducer applied to buffered objects, or null for none
 */
public Sorter(int limit, Order<T> order, Reducer<T> reducer) {
    this(limit, order, reducer, null);
}
112
/**
 * Creates a fully specified sorter and registers it for low-memory
 * notifications.
 *
 * <p>The counters used by flush() and combineStep() are not set by this
 * constructor and stay null, so no counting happens.</p>
 *
 * @param limit     number of buffered objects above which needsFlush() reports true
 * @param order     the ordering used to sort incoming objects; must not be null
 * @param reducer   reducer applied to buffered objects, or null for none
 * @param processor the stage that receives the sorted output
 */
public Sorter(int limit, Order<T> order, Reducer<T> reducer, Processor<T> processor) {
    // Caller-supplied configuration.
    this.limit = limit;
    this.order = order;
    this.reducer = reducer;
    this.processor = processor;
    this.lessThanCompare = order.lessThan();

    // Empty working state.
    this.flushRequested = false;
    this.objects = new ArrayList<T>();
    this.runs = new ArrayList<List<T>>();
    this.temporaryFiles = new ArrayList<File>();

    requestMemoryWarnings();
}
126
127 @SuppressWarnings("unchecked")
128 public Sorter(TupleFlowParameters parameters)
129 throws ClassNotFoundException, InstantiationException,
130 IllegalAccessException, IOException {
131 String className = parameters.getXML().get("class");
132 String[] orderSpec = parameters.getXML().get("order").split(" ");
133
134 Class clazz = Class.forName(className);
135 Type<T> typeInstance = (Type<T>) clazz.newInstance();
136 this.order = typeInstance.getOrder(orderSpec);
137 this.limit = (int) parameters.getXML().get("object-limit", defaultObjectLimit);
138 this.reducer = null;
139 this.flushRequested = false;
140
141 if (parameters.getXML().containsKey("reducer")) {
142 Class reducerClass = Class.forName(parameters.getXML().get("reducer"));
143 this.reducer = (Reducer<T>) reducerClass.newInstance();
144 }
145
146 this.processor = null;
147 this.objects = new ArrayList<T>();
148 this.runs = new ArrayList<List<T>>();
149 this.temporaryFiles = new ArrayList<File>();
150 this.lessThanCompare = order.lessThan();
151
152 this.filesWritten = parameters.getCounter("Sorter Files Written");
153 this.sorterCombineSteps = parameters.getCounter("Sorter Combine Steps");
154
155 requestMemoryWarnings();
156 }
157
158 public void requestMemoryWarnings() {
159 List<MemoryPoolMXBean> pools = ManagementFactory.getMemoryPoolMXBeans();
160 long maxPoolSize = 0;
161 MemoryPoolMXBean biggestPool = null;
162
163 for (MemoryPoolMXBean pool : pools) {
164 if (pool.getType() != MemoryType.HEAP) {
165 continue;
166 }
167 MemoryUsage usage = pool.getUsage();
168
169 if (pool.isUsageThresholdSupported() &&
170 usage.getMax() > maxPoolSize) {
171 maxPoolSize = usage.getMax();
172 biggestPool = pool;
173 }
174 }
175
176 if (biggestPool != null) {
177 biggestPool.setUsageThreshold((long) (maxPoolSize * 0.7));
178 MemoryMXBean memoryBean = ManagementFactory.getMemoryMXBean();
179 NotificationEmitter emitter = (NotificationEmitter) memoryBean;
180 emitter.addNotificationListener(this, null, null);
181 } else {
182 throw new RuntimeException("Memory monitoring is not supported.");
183 }
184 }
185
186 public void removeMemoryWarnings() {
187 try {
188 MemoryMXBean memoryBean = ManagementFactory.getMemoryMXBean();
189 NotificationEmitter emitter = (NotificationEmitter) memoryBean;
190 emitter.removeNotificationListener(this);
191 } catch (ListenerNotFoundException e) {
192
193 }
194 }
195
196 public void handleNotification(Notification notification, Object handback) {
197 if (notification.getType().equals(MemoryNotificationInfo.MEMORY_THRESHOLD_EXCEEDED)) {
198 flushRequested = true;
199 final Sorter f = this;
200
201 Thread t = new Thread() {
202 @Override
203 public void run() {
204 try {
205 f.flush();
206 } catch (IOException e) {
207 logger.severe(e.toString());
208 }
209 }
210 };
211
212 t.start();
213 }
214 }
215
216 public static void verify(TupleFlowParameters fullParameters, ErrorHandler handler) {
217 Parameters parameters = fullParameters.getXML();
218 String[] requiredParameters = {"order", "class"};
219
220 if (!Verification.requireParameters(requiredParameters, parameters, handler)) {
221 return;
222 }
223 String className = parameters.get("class");
224 String[] orderSpec = parameters.get("order").split(" ");
225
226 Verification.requireClass(className, handler);
227 Verification.requireOrder(className, orderSpec, handler);
228
229 if (parameters.containsKey("reducer")) {
230 String reducerType = parameters.get("reducer");
231 Verification.requireClass(reducerType, handler);
232 }
233 }
234
235 public static String getInputClass(TupleFlowParameters parameters) {
236 return parameters.getXML().get("class", "");
237 }
238
239 public static String getOutputClass(TupleFlowParameters parameters) {
240 return parameters.getXML().get("class", "");
241 }
242
243 public static String[] getOutputOrder(TupleFlowParameters parameters) {
244 String[] orderSpec = parameters.getXML().get("order", "").split(" ");
245 return orderSpec;
246 }
247
248 @Override
249 public String toString() {
250 return order.getOrderedClass().getName() + " " + Arrays.asList(order.getOrderSpec());
251 }
252
253 public boolean needsFlush() {
254 if (flushRequested) {
255 return true;
256 }
257 return objects.size() > reduceInterval ||
258 objects.size() + runsCount > limit;
259 }
260
261 public synchronized void process(T object) throws IOException {
262 objects.add(object);
263 flushIfNecessary();
264 }
265
266 public synchronized void flushIfNecessary() throws IOException {
267 if (needsFlush()) {
268 reduce();
269
270 if (needsFlush()) {
271 flush();
272 }
273 }
274 }
275
276 @Override
277 /***
278 * <p>Finishes sorting, then sends sorted output to later stages.</p>
279 *
280 * <p>This method is intentionally unsynchronized, as synchronizing it
281 * tends to cause deadlock problems (since this method calls process on
282 * later stages).</p>
283 */
284 public synchronized void close() throws IOException {
285
286
287 removeMemoryWarnings();
288
289 if (temporaryFiles.size() > 0) {
290 combine();
291 } else {
292 reduce();
293 combineRuns(processor);
294 }
295 processor.close();
296 }
297
298 /***
299 * Reduces the number of buffered objects. The recently buffered
300 * objects are processed by a Reducer, if one was specified in the
301 * constructor. The resulting objects from this reduction are then
302 * copied into a "reduced" set of buffered objects. The process
303 * resembles a generational garbage collector.
304 *
305 * Even if no reducer exists, this sorts all the current objects and
306 * sets them aside. We perform this initial sort while the objects
307 * are still warm in the cache to get improved throughput overall.
308 *
309 * Another benefit to reducing is the speed that we can respond to
310 * low memory events. If a low memory event happens and the objects
311 * aren't sorted, we have to sort them first before writing them to disk.
312 * The sorting process can take extra memory and time, which makes us
313 * risk running out of RAM while trying to get data out onto the disk.
314 */
315 private synchronized void reduce() throws IOException {
316 if (size() == 0) {
317 return;
318 }
319 List<T> results = objects;
320
321 if (reducer != null) {
322 results = reducer.reduce(objects);
323 }
324 Collections.sort(results, lessThanCompare);
325 runs.add(results);
326 runsCount += results.size();
327
328 objects = new ArrayList<T>();
329 }
330
331 /***
332 * Returns the number of currently buffered objects.
333 */
334 private long size() {
335 return runsCount + objects.size();
336 }
337
338 public synchronized void flush() throws IOException {
339 if (size() == 0) {
340 return;
341 }
342 reduce();
343 assert objects.size() == 0;
344
345 FileOrderedWriter<T> writer = getTemporaryWriter();
346 combineRuns(writer);
347 writer.close();
348 if (filesWritten != null) filesWritten.increment();
349
350 flushRequested = false;
351 }
352
353 private class RunWrapper<T> implements Comparable<RunWrapper<T>> {
354 public Iterator<T> iterator;
355 public T top;
356 Comparator<T> lessThan;
357
358 public RunWrapper(List<T> list, Comparator<T> lessThan) {
359 iterator = list.iterator();
360 this.lessThan = lessThan;
361 }
362
363 public int compareTo(RunWrapper<T> other) {
364 T one = top;
365 T two = other.top;
366
367 int result = lessThan.compare(one, two);
368 return result;
369 }
370
371 public boolean next() {
372 if (iterator.hasNext()) {
373 top = iterator.next();
374 return true;
375 } else {
376 top = null;
377 return false;
378 }
379 }
380 }
381
382 /***
383 * Takes the sorted runs in the runs array, and combines them into a
384 * single sorted list, which is processed by the processor called output.
385 */
386 private synchronized void combineRuns(Processor<T> output) throws IOException {
387 PriorityQueue<RunWrapper<T>> queue = new PriorityQueue<RunWrapper<T>>();
388
389
390
391 for (List<T> run : runs) {
392 RunWrapper<T> wrapper = new RunWrapper<T>(run, lessThanCompare);
393 if (wrapper.next()) {
394 queue.offer(wrapper);
395 }
396 }
397
398
399
400
401
402 while (queue.size() > 1) {
403 RunWrapper<T> wrapper = queue.poll();
404 RunWrapper<T> next = queue.peek();
405
406 output.process(wrapper.top);
407 wrapper.next();
408
409 while (wrapper.top != null &&
410 lessThanCompare.compare(wrapper.top, next.top) <= 0) {
411 output.process(wrapper.top);
412 wrapper.next();
413 }
414
415 if (wrapper.top != null) {
416 queue.offer(wrapper);
417 }
418 }
419
420
421 if (queue.size() == 1) {
422 RunWrapper<T> wrapper = queue.poll();
423
424 do {
425 output.process(wrapper.top);
426 } while (wrapper.next());
427 }
428
429 runs.clear();
430 runsCount = 0;
431 }
432
433 private synchronized FileOrderedWriter<T> getTemporaryWriter(long fileSize) throws IOException, FileNotFoundException {
434 File temporary = Utility.createTemporary(fileSize * 4);
435 FileOrderedWriter<T> writer = new FileOrderedWriter<T>(temporary, order);
436 temporaryFiles.add(temporary);
437 return writer;
438 }
439
440 private synchronized FileOrderedWriter<T> getTemporaryWriter() throws IOException, FileNotFoundException {
441 File temporary = Utility.createTemporary();
442 FileOrderedWriter<T> writer = new FileOrderedWriter<T>(temporary, order);
443 temporaryFiles.add(temporary);
444 return writer;
445 }
446
447 private synchronized void combine() throws IOException {
448 flush();
449
450 if (temporaryFiles.size() == 0) {
451 return;
452 }
453 while (temporaryFiles.size() > fileLimit) {
454
455
456 Collections.sort(temporaryFiles, new Comparator<File>() {
457 public int compare(File one, File two) {
458 long oneLength = one.length();
459 long twoLength = two.length();
460
461 if (oneLength > twoLength) {
462 return 1;
463 } else if (oneLength < twoLength) {
464 return -1;
465 }
466 return 0;
467 }
468 });
469
470
471 ArrayList<File> temporaryFileSet = new ArrayList<File>(temporaryFiles.subList(0,
472 fileLimit));
473 temporaryFiles.subList(0, fileLimit).clear();
474
475
476 long totalFileSize = 0;
477
478 for (File f : temporaryFiles) {
479 totalFileSize += f.length();
480 }
481
482
483
484 long oneGigabyte = 1024 * 1024 * 1024;
485 FileOrderedWriter<T> writer = getTemporaryWriter(totalFileSize * 2 + oneGigabyte);
486
487
488 combineStep(temporaryFileSet, writer);
489
490 writer.close();
491 temporaryFileSet.clear();
492 }
493
494 combineStep(temporaryFiles, processor);
495 temporaryFiles.clear();
496 }
497
498 private synchronized void combineStep(List<File> files, Processor<T> output) throws FileNotFoundException, IOException {
499 if (sorterCombineSteps != null) sorterCombineSteps.increment();
500
501 ArrayList<String> filenames = new ArrayList<String>();
502
503 for (File f : files) {
504 filenames.add(f.getPath());
505 }
506 OrderedCombiner combiner = OrderedCombiner.combineFromFiles(filenames, order, output, false,
507 combineBufferSize);
508 combiner.run();
509
510 for (File file : files) {
511 file.delete();
512 }
513 }
514 }