1
2
3 package org.galagosearch.core.types;
4
5 import org.galagosearch.tupleflow.Utility;
6 import org.galagosearch.tupleflow.ArrayInput;
7 import org.galagosearch.tupleflow.ArrayOutput;
8 import org.galagosearch.tupleflow.Order;
9 import org.galagosearch.tupleflow.OrderedWriter;
10 import org.galagosearch.tupleflow.Type;
11 import org.galagosearch.tupleflow.TypeReader;
12 import org.galagosearch.tupleflow.Step;
13 import org.galagosearch.tupleflow.IncompatibleProcessorException;
14 import org.galagosearch.tupleflow.ReaderSource;
15 import java.io.IOException;
16 import java.io.EOFException;
17 import java.io.UnsupportedEncodingException;
18 import java.util.ArrayList;
19 import java.util.Arrays;
20 import java.util.Comparator;
21 import java.util.PriorityQueue;
22 import java.util.Collection;
23
24 public class KeyValuePair implements Type<KeyValuePair> {
/** Key bytes; ordered with {@code Utility.compare} by {@link KeyOrder}. */
public byte[] key;
/** Value bytes paired with {@link #key}. */
public byte[] value;

/** Creates a pair whose {@code key} and {@code value} are both {@code null}. */
public KeyValuePair() {}

/**
 * Creates a pair that stores the given arrays by reference (no copy is made).
 *
 * @param key   key bytes
 * @param value value bytes
 */
public KeyValuePair(byte[] key, byte[] value) {
    this.value = value;
    this.key = key;
}
33
34 public String toString() {
35 try {
36 return String.format("%s,%s",
37 new String(key, "UTF-8"), new String(value, "UTF-8"));
38 } catch(UnsupportedEncodingException e) {
39 throw new RuntimeException("Couldn't convert string to UTF-8.");
40 }
41 }
42
43 public Order<KeyValuePair> getOrder(String... spec) {
44 if (Arrays.equals(spec, new String[] { "+key" })) {
45 return new KeyOrder();
46 }
47 return null;
48 }
49
50 public interface Processor extends Step, org.galagosearch.tupleflow.Processor<KeyValuePair> {
51 public void process(KeyValuePair object) throws IOException;
52 public void close() throws IOException;
53 }
54 public interface Source extends Step {
55 }
56 public static class KeyOrder implements Order<KeyValuePair> {
57 public int hash(KeyValuePair object) {
58 int h = 0;
59 h += Utility.hash(object.key);
60 return h;
61 }
62 public Comparator<KeyValuePair> greaterThan() {
63 return new Comparator<KeyValuePair>() {
64 public int compare(KeyValuePair one, KeyValuePair two) {
65 int result = 0;
66 do {
67 result = + Utility.compare(one.key, two.key);
68 if(result != 0) break;
69 } while (false);
70 return -result;
71 }
72 };
73 }
74 public Comparator<KeyValuePair> lessThan() {
75 return new Comparator<KeyValuePair>() {
76 public int compare(KeyValuePair one, KeyValuePair two) {
77 int result = 0;
78 do {
79 result = + Utility.compare(one.key, two.key);
80 if(result != 0) break;
81 } while (false);
82 return result;
83 }
84 };
85 }
86 public TypeReader<KeyValuePair> orderedReader(ArrayInput _input) {
87 return new ShreddedReader(_input);
88 }
89
90 public TypeReader<KeyValuePair> orderedReader(ArrayInput _input, int bufferSize) {
91 return new ShreddedReader(_input, bufferSize);
92 }
93 public OrderedWriter<KeyValuePair> orderedWriter(ArrayOutput _output) {
94 ShreddedWriter w = new ShreddedWriter(_output);
95 return new OrderedWriterClass(w);
96 }
97 public static class OrderedWriterClass extends OrderedWriter< KeyValuePair > {
98 KeyValuePair last = null;
99 ShreddedWriter shreddedWriter = null;
100
101 public OrderedWriterClass(ShreddedWriter s) {
102 this.shreddedWriter = s;
103 }
104
105 public void process(KeyValuePair object) throws IOException {
106 boolean processAll = false;
107 if (processAll || last == null || 0 != Utility.compare(object.key, last.key)) { processAll = true; shreddedWriter.processKey(object.key); }
108 shreddedWriter.processTuple(object.value);
109 last = object;
110 }
111
112 public void close() throws IOException {
113 shreddedWriter.close();
114 }
115
116 public Class<KeyValuePair> getInputClass() {
117 return KeyValuePair.class;
118 }
119 }
120 public ReaderSource<KeyValuePair> orderedCombiner(Collection<TypeReader<KeyValuePair>> readers, boolean closeOnExit) {
121 ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
122
123 for (TypeReader<KeyValuePair> reader : readers) {
124 shreddedReaders.add((ShreddedReader)reader);
125 }
126
127 return new ShreddedCombiner(shreddedReaders, closeOnExit);
128 }
129 public KeyValuePair clone(KeyValuePair object) {
130 KeyValuePair result = new KeyValuePair();
131 if (object == null) return result;
132 result.key = object.key;
133 result.value = object.value;
134 return result;
135 }
136 public Class<KeyValuePair> getOrderedClass() {
137 return KeyValuePair.class;
138 }
139 public String[] getOrderSpec() {
140 return new String[] {"+key"};
141 }
142
143 public static String getSpecString() {
144 return "+key";
145 }
146
147 public interface ShreddedProcessor extends Step {
148 public void processKey(byte[] key) throws IOException;
149 public void processTuple(byte[] value) throws IOException;
150 public void close() throws IOException;
151 }
152 public interface ShreddedSource extends Step {
153 }
154
155 public static class ShreddedWriter implements ShreddedProcessor {
156 ArrayOutput output;
157 ShreddedBuffer buffer = new ShreddedBuffer();
158 byte[] lastKey;
159 boolean lastFlush = false;
160
161 public ShreddedWriter(ArrayOutput output) {
162 this.output = output;
163 }
164
165 public void close() throws IOException {
166 flush();
167 }
168
169 public void processKey(byte[] key) {
170 lastKey = key;
171 buffer.processKey(key);
172 }
173 public final void processTuple(byte[] value) throws IOException {
174 if (lastFlush) {
175 if(buffer.keys.size() == 0) buffer.processKey(lastKey);
176 lastFlush = false;
177 }
178 buffer.processTuple(value);
179 if (buffer.isFull())
180 flush();
181 }
182 public final void flushTuples(int pauseIndex) throws IOException {
183
184 while (buffer.getReadIndex() < pauseIndex) {
185
186 output.writeBytes(buffer.getValue());
187 buffer.incrementTuple();
188 }
189 }
190 public final void flushKey(int pauseIndex) throws IOException {
191 while (buffer.getReadIndex() < pauseIndex) {
192 int nextPause = buffer.getKeyEndIndex();
193 int count = nextPause - buffer.getReadIndex();
194
195 output.writeBytes(buffer.getKey());
196 output.writeInt(count);
197 buffer.incrementKey();
198
199 flushTuples(nextPause);
200 assert nextPause == buffer.getReadIndex();
201 }
202 }
203 public void flush() throws IOException {
204 flushKey(buffer.getWriteIndex());
205 buffer.reset();
206 lastFlush = true;
207 }
208 }
209 public static class ShreddedBuffer {
210 ArrayList<byte[]> keys = new ArrayList();
211 ArrayList<Integer> keyTupleIdx = new ArrayList();
212 int keyReadIdx = 0;
213
214 byte[][] values;
215 int writeTupleIndex = 0;
216 int readTupleIndex = 0;
217 int batchSize;
218
219 public ShreddedBuffer(int batchSize) {
220 this.batchSize = batchSize;
221
222 values = new byte[batchSize][];
223 }
224
225 public ShreddedBuffer() {
226 this(10000);
227 }
228
229 public void processKey(byte[] key) {
230 keys.add(key);
231 keyTupleIdx.add(writeTupleIndex);
232 }
233 public void processTuple(byte[] value) {
234 assert keys.size() > 0;
235 values[writeTupleIndex] = value;
236 writeTupleIndex++;
237 }
238 public void resetData() {
239 keys.clear();
240 keyTupleIdx.clear();
241 writeTupleIndex = 0;
242 }
243
244 public void resetRead() {
245 readTupleIndex = 0;
246 keyReadIdx = 0;
247 }
248
249 public void reset() {
250 resetData();
251 resetRead();
252 }
253 public boolean isFull() {
254 return writeTupleIndex >= batchSize;
255 }
256
257 public boolean isEmpty() {
258 return writeTupleIndex == 0;
259 }
260
261 public boolean isAtEnd() {
262 return readTupleIndex >= writeTupleIndex;
263 }
264 public void incrementKey() {
265 keyReadIdx++;
266 }
267
268 public void autoIncrementKey() {
269 while (readTupleIndex >= getKeyEndIndex() && readTupleIndex < writeTupleIndex)
270 keyReadIdx++;
271 }
272 public void incrementTuple() {
273 readTupleIndex++;
274 }
275 public int getKeyEndIndex() {
276 if ((keyReadIdx+1) >= keyTupleIdx.size())
277 return writeTupleIndex;
278 return keyTupleIdx.get(keyReadIdx+1);
279 }
280 public int getReadIndex() {
281 return readTupleIndex;
282 }
283
284 public int getWriteIndex() {
285 return writeTupleIndex;
286 }
287 public byte[] getKey() {
288 assert readTupleIndex < writeTupleIndex;
289 assert keyReadIdx < keys.size();
290
291 return keys.get(keyReadIdx);
292 }
293 public byte[] getValue() {
294 assert readTupleIndex < writeTupleIndex;
295 return values[readTupleIndex];
296 }
297 public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
298 while (getReadIndex() < endIndex) {
299 output.processTuple(getValue());
300 incrementTuple();
301 }
302 }
303 public void copyUntilIndexKey(int endIndex, ShreddedProcessor output) throws IOException {
304 while (getReadIndex() < endIndex) {
305 output.processKey(getKey());
306 assert getKeyEndIndex() <= endIndex;
307 copyTuples(getKeyEndIndex(), output);
308 incrementKey();
309 }
310 }
311 public void copyUntilKey(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
312 while (!isAtEnd()) {
313 if (other != null) {
314 assert !other.isAtEnd();
315 int c = + Utility.compare(getKey(), other.getKey());
316
317 if (c > 0) {
318 break;
319 }
320
321 output.processKey(getKey());
322
323 copyTuples(getKeyEndIndex(), output);
324 } else {
325 output.processKey(getKey());
326 copyTuples(getKeyEndIndex(), output);
327 }
328 incrementKey();
329
330
331 }
332 }
333 public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
334 copyUntilKey(other, output);
335 }
336
337 }
338 public static class ShreddedCombiner implements ReaderSource<KeyValuePair>, ShreddedSource {
339 public ShreddedProcessor processor;
340 Collection<ShreddedReader> readers;
341 boolean closeOnExit = false;
342 boolean uninitialized = true;
343 PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
344
345 public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
346 this.readers = readers;
347 this.closeOnExit = closeOnExit;
348 }
349
350 public void setProcessor(Step processor) throws IncompatibleProcessorException {
351 if (processor instanceof ShreddedProcessor) {
352 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
353 } else if (processor instanceof KeyValuePair.Processor) {
354 this.processor = new DuplicateEliminator(new TupleUnshredder((KeyValuePair.Processor) processor));
355 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
356 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<KeyValuePair>) processor));
357 } else {
358 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
359 }
360 }
361
362 public Class<KeyValuePair> getOutputClass() {
363 return KeyValuePair.class;
364 }
365
366 public void initialize() throws IOException {
367 for (ShreddedReader reader : readers) {
368 reader.fill();
369
370 if (!reader.getBuffer().isAtEnd())
371 queue.add(reader);
372 }
373
374 uninitialized = false;
375 }
376
377 public void run() throws IOException {
378 initialize();
379
380 while (queue.size() > 0) {
381 ShreddedReader top = queue.poll();
382 ShreddedReader next = null;
383 ShreddedBuffer nextBuffer = null;
384
385 assert !top.getBuffer().isAtEnd();
386
387 if (queue.size() > 0) {
388 next = queue.peek();
389 nextBuffer = next.getBuffer();
390 assert !nextBuffer.isAtEnd();
391 }
392
393 top.getBuffer().copyUntil(nextBuffer, processor);
394 if (top.getBuffer().isAtEnd())
395 top.fill();
396
397 if (!top.getBuffer().isAtEnd())
398 queue.add(top);
399 }
400
401 if (closeOnExit)
402 processor.close();
403 }
404
405 public KeyValuePair read() throws IOException {
406 if (uninitialized)
407 initialize();
408
409 KeyValuePair result = null;
410
411 while (queue.size() > 0) {
412 ShreddedReader top = queue.poll();
413 result = top.read();
414
415 if (result != null) {
416 if (top.getBuffer().isAtEnd())
417 top.fill();
418
419 queue.offer(top);
420 break;
421 }
422 }
423
424 return result;
425 }
426 }
427 public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<KeyValuePair>, ShreddedSource {
428 public ShreddedProcessor processor;
429 ShreddedBuffer buffer;
430 KeyValuePair last = new KeyValuePair();
431 long updateKeyCount = -1;
432 long tupleCount = 0;
433 long bufferStartCount = 0;
434 ArrayInput input;
435
436 public ShreddedReader(ArrayInput input) {
437 this.input = input;
438 this.buffer = new ShreddedBuffer();
439 }
440
441 public ShreddedReader(ArrayInput input, int bufferSize) {
442 this.input = input;
443 this.buffer = new ShreddedBuffer(bufferSize);
444 }
445
446 public final int compareTo(ShreddedReader other) {
447 ShreddedBuffer otherBuffer = other.getBuffer();
448
449 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
450 return 0;
451 } else if (buffer.isAtEnd()) {
452 return -1;
453 } else if (otherBuffer.isAtEnd()) {
454 return 1;
455 }
456
457 int result = 0;
458 do {
459 result = + Utility.compare(buffer.getKey(), otherBuffer.getKey());
460 if(result != 0) break;
461 } while (false);
462
463 return result;
464 }
465
466 public final ShreddedBuffer getBuffer() {
467 return buffer;
468 }
469
470 public final KeyValuePair read() throws IOException {
471 if (buffer.isAtEnd()) {
472 fill();
473
474 if (buffer.isAtEnd()) {
475 return null;
476 }
477 }
478
479 assert !buffer.isAtEnd();
480 KeyValuePair result = new KeyValuePair();
481
482 result.key = buffer.getKey();
483 result.value = buffer.getValue();
484
485 buffer.incrementTuple();
486 buffer.autoIncrementKey();
487
488 return result;
489 }
490
491 public final void fill() throws IOException {
492 try {
493 buffer.reset();
494
495 if (tupleCount != 0) {
496
497 if(updateKeyCount - tupleCount > 0) {
498 buffer.keys.add(last.key);
499 buffer.keyTupleIdx.add((int) (updateKeyCount - tupleCount));
500 }
501 bufferStartCount = tupleCount;
502 }
503
504 while (!buffer.isFull()) {
505 updateKey();
506 buffer.processTuple(input.readBytes());
507 tupleCount++;
508 }
509 } catch(EOFException e) {}
510 }
511
512 public final void updateKey() throws IOException {
513 if (updateKeyCount > tupleCount)
514 return;
515
516 last.key = input.readBytes();
517 updateKeyCount = tupleCount + input.readInt();
518
519 buffer.processKey(last.key);
520 }
521
522 public void run() throws IOException {
523 while (true) {
524 fill();
525
526 if (buffer.isAtEnd())
527 break;
528
529 buffer.copyUntil(null, processor);
530 }
531 processor.close();
532 }
533
534 public void setProcessor(Step processor) throws IncompatibleProcessorException {
535 if (processor instanceof ShreddedProcessor) {
536 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
537 } else if (processor instanceof KeyValuePair.Processor) {
538 this.processor = new DuplicateEliminator(new TupleUnshredder((KeyValuePair.Processor) processor));
539 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
540 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<KeyValuePair>) processor));
541 } else {
542 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
543 }
544 }
545
546 public Class<KeyValuePair> getOutputClass() {
547 return KeyValuePair.class;
548 }
549 }
550
551 public static class DuplicateEliminator implements ShreddedProcessor {
552 public ShreddedProcessor processor;
553 KeyValuePair last = new KeyValuePair();
554 boolean keyProcess = true;
555
556 public DuplicateEliminator() {}
557 public DuplicateEliminator(ShreddedProcessor processor) {
558 this.processor = processor;
559 }
560
561 public void setShreddedProcessor(ShreddedProcessor processor) {
562 this.processor = processor;
563 }
564
565 public void processKey(byte[] key) throws IOException {
566 if (keyProcess || Utility.compare(key, last.key) != 0) {
567 last.key = key;
568 processor.processKey(key);
569 keyProcess = false;
570 }
571 }
572
573 public void resetKey() {
574 keyProcess = true;
575 }
576
577 public void processTuple(byte[] value) throws IOException {
578 processor.processTuple(value);
579 }
580
581 public void close() throws IOException {
582 processor.close();
583 }
584 }
585 public static class TupleUnshredder implements ShreddedProcessor {
586 KeyValuePair last = new KeyValuePair();
587 public org.galagosearch.tupleflow.Processor<KeyValuePair> processor;
588
589 public TupleUnshredder(KeyValuePair.Processor processor) {
590 this.processor = processor;
591 }
592
593 public TupleUnshredder(org.galagosearch.tupleflow.Processor<KeyValuePair> processor) {
594 this.processor = processor;
595 }
596
597 public KeyValuePair clone(KeyValuePair object) {
598 KeyValuePair result = new KeyValuePair();
599 if (object == null) return result;
600 result.key = object.key;
601 result.value = object.value;
602 return result;
603 }
604
605 public void processKey(byte[] key) throws IOException {
606 last.key = key;
607 }
608
609
610 public void processTuple(byte[] value) throws IOException {
611 last.value = value;
612 processor.process(clone(last));
613 }
614
615 public void close() throws IOException {
616 processor.close();
617 }
618 }
619 public static class TupleShredder implements Processor {
620 KeyValuePair last = new KeyValuePair();
621 public ShreddedProcessor processor;
622
623 public TupleShredder(ShreddedProcessor processor) {
624 this.processor = processor;
625 }
626
627 public KeyValuePair clone(KeyValuePair object) {
628 KeyValuePair result = new KeyValuePair();
629 if (object == null) return result;
630 result.key = object.key;
631 result.value = object.value;
632 return result;
633 }
634
635 public void process(KeyValuePair object) throws IOException {
636 boolean processAll = false;
637 if(last == null || Utility.compare(last.key, object.key) != 0 || processAll) { processor.processKey(object.key); processAll = true; }
638 processor.processTuple(object.value);
639 }
640
641 public Class<KeyValuePair> getInputClass() {
642 return KeyValuePair.class;
643 }
644
645 public void close() throws IOException {
646 processor.close();
647 }
648 }
649 }
650 }