1
2
3 package org.galagosearch.core.types;
4
5 import org.galagosearch.tupleflow.Utility;
6 import org.galagosearch.tupleflow.ArrayInput;
7 import org.galagosearch.tupleflow.ArrayOutput;
8 import org.galagosearch.tupleflow.Order;
9 import org.galagosearch.tupleflow.OrderedWriter;
10 import org.galagosearch.tupleflow.Type;
11 import org.galagosearch.tupleflow.TypeReader;
12 import org.galagosearch.tupleflow.Step;
13 import org.galagosearch.tupleflow.IncompatibleProcessorException;
14 import org.galagosearch.tupleflow.ReaderSource;
15 import java.io.IOException;
16 import java.io.EOFException;
17 import java.io.UnsupportedEncodingException;
18 import java.util.ArrayList;
19 import java.util.Arrays;
20 import java.util.Comparator;
21 import java.util.PriorityQueue;
22 import java.util.Collection;
23
24 public class DocumentValuedExtent implements Type<DocumentValuedExtent> {
// Name of the extent; printed first by toString().
public String extentName;
// Identifier of the tuple; the only field used by the "+identifier" order
// (hashing, comparison and grouping in the shredded writer/reader).
public String identifier;
// Start of the extent (presumably a position inside the document — TODO confirm units).
public int begin;
// End of the extent (NOTE(review): inclusive vs. exclusive is not visible here — confirm).
public int end;
// Long payload attached to the extent; its meaning is not established by this file.
public long value;
30
/** Creates a tuple whose fields keep their Java defaults (null strings, zero numbers). */
public DocumentValuedExtent() {
}

/**
 * Creates a tuple with every field supplied by the caller.
 *
 * @param extentName value stored in {@code extentName}
 * @param identifier value stored in {@code identifier}
 * @param begin value stored in {@code begin}
 * @param end value stored in {@code end}
 * @param value value stored in {@code value}
 */
public DocumentValuedExtent(String extentName, String identifier, int begin, int end, long value) {
    this.identifier = identifier;
    this.extentName = extentName;
    this.value = value;
    this.end = end;
    this.begin = begin;
}
39
40 public String toString() {
41 return String.format("%s,%s,%d,%d,%d",
42 extentName, identifier, begin, end, value);
43 }
44
45 public Order<DocumentValuedExtent> getOrder(String... spec) {
46 if (Arrays.equals(spec, new String[] { "+identifier" })) {
47 return new IdentifierOrder();
48 }
49 return null;
50 }
51
52 public interface Processor extends Step, org.galagosearch.tupleflow.Processor<DocumentValuedExtent> {
53 public void process(DocumentValuedExtent object) throws IOException;
54 public void close() throws IOException;
55 }
56 public interface Source extends Step {
57 }
58 public static class IdentifierOrder implements Order<DocumentValuedExtent> {
59 public int hash(DocumentValuedExtent object) {
60 int h = 0;
61 h += Utility.hash(object.identifier);
62 return h;
63 }
64 public Comparator<DocumentValuedExtent> greaterThan() {
65 return new Comparator<DocumentValuedExtent>() {
66 public int compare(DocumentValuedExtent one, DocumentValuedExtent two) {
67 int result = 0;
68 do {
69 result = + Utility.compare(one.identifier, two.identifier);
70 if(result != 0) break;
71 } while (false);
72 return -result;
73 }
74 };
75 }
76 public Comparator<DocumentValuedExtent> lessThan() {
77 return new Comparator<DocumentValuedExtent>() {
78 public int compare(DocumentValuedExtent one, DocumentValuedExtent two) {
79 int result = 0;
80 do {
81 result = + Utility.compare(one.identifier, two.identifier);
82 if(result != 0) break;
83 } while (false);
84 return result;
85 }
86 };
87 }
88 public TypeReader<DocumentValuedExtent> orderedReader(ArrayInput _input) {
89 return new ShreddedReader(_input);
90 }
91
92 public TypeReader<DocumentValuedExtent> orderedReader(ArrayInput _input, int bufferSize) {
93 return new ShreddedReader(_input, bufferSize);
94 }
95 public OrderedWriter<DocumentValuedExtent> orderedWriter(ArrayOutput _output) {
96 ShreddedWriter w = new ShreddedWriter(_output);
97 return new OrderedWriterClass(w);
98 }
99 public static class OrderedWriterClass extends OrderedWriter< DocumentValuedExtent > {
100 DocumentValuedExtent last = null;
101 ShreddedWriter shreddedWriter = null;
102
103 public OrderedWriterClass(ShreddedWriter s) {
104 this.shreddedWriter = s;
105 }
106
107 public void process(DocumentValuedExtent object) throws IOException {
108 boolean processAll = false;
109 if (processAll || last == null || 0 != Utility.compare(object.identifier, last.identifier)) { processAll = true; shreddedWriter.processIdentifier(object.identifier); }
110 shreddedWriter.processTuple(object.extentName, object.begin, object.end, object.value);
111 last = object;
112 }
113
114 public void close() throws IOException {
115 shreddedWriter.close();
116 }
117
118 public Class<DocumentValuedExtent> getInputClass() {
119 return DocumentValuedExtent.class;
120 }
121 }
122 public ReaderSource<DocumentValuedExtent> orderedCombiner(Collection<TypeReader<DocumentValuedExtent>> readers, boolean closeOnExit) {
123 ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
124
125 for (TypeReader<DocumentValuedExtent> reader : readers) {
126 shreddedReaders.add((ShreddedReader)reader);
127 }
128
129 return new ShreddedCombiner(shreddedReaders, closeOnExit);
130 }
131 public DocumentValuedExtent clone(DocumentValuedExtent object) {
132 DocumentValuedExtent result = new DocumentValuedExtent();
133 if (object == null) return result;
134 result.extentName = object.extentName;
135 result.identifier = object.identifier;
136 result.begin = object.begin;
137 result.end = object.end;
138 result.value = object.value;
139 return result;
140 }
141 public Class<DocumentValuedExtent> getOrderedClass() {
142 return DocumentValuedExtent.class;
143 }
144 public String[] getOrderSpec() {
145 return new String[] {"+identifier"};
146 }
147
148 public static String getSpecString() {
149 return "+identifier";
150 }
151
152 public interface ShreddedProcessor extends Step {
153 public void processIdentifier(String identifier) throws IOException;
154 public void processTuple(String extentName, int begin, int end, long value) throws IOException;
155 public void close() throws IOException;
156 }
157 public interface ShreddedSource extends Step {
158 }
159
160 public static class ShreddedWriter implements ShreddedProcessor {
161 ArrayOutput output;
162 ShreddedBuffer buffer = new ShreddedBuffer();
163 String lastIdentifier;
164 boolean lastFlush = false;
165
166 public ShreddedWriter(ArrayOutput output) {
167 this.output = output;
168 }
169
170 public void close() throws IOException {
171 flush();
172 }
173
174 public void processIdentifier(String identifier) {
175 lastIdentifier = identifier;
176 buffer.processIdentifier(identifier);
177 }
178 public final void processTuple(String extentName, int begin, int end, long value) throws IOException {
179 if (lastFlush) {
180 if(buffer.identifiers.size() == 0) buffer.processIdentifier(lastIdentifier);
181 lastFlush = false;
182 }
183 buffer.processTuple(extentName, begin, end, value);
184 if (buffer.isFull())
185 flush();
186 }
187 public final void flushTuples(int pauseIndex) throws IOException {
188
189 while (buffer.getReadIndex() < pauseIndex) {
190
191 output.writeString(buffer.getExtentName());
192 output.writeInt(buffer.getBegin());
193 output.writeInt(buffer.getEnd());
194 output.writeLong(buffer.getValue());
195 buffer.incrementTuple();
196 }
197 }
198 public final void flushIdentifier(int pauseIndex) throws IOException {
199 while (buffer.getReadIndex() < pauseIndex) {
200 int nextPause = buffer.getIdentifierEndIndex();
201 int count = nextPause - buffer.getReadIndex();
202
203 output.writeString(buffer.getIdentifier());
204 output.writeInt(count);
205 buffer.incrementIdentifier();
206
207 flushTuples(nextPause);
208 assert nextPause == buffer.getReadIndex();
209 }
210 }
211 public void flush() throws IOException {
212 flushIdentifier(buffer.getWriteIndex());
213 buffer.reset();
214 lastFlush = true;
215 }
216 }
217 public static class ShreddedBuffer {
218 ArrayList<String> identifiers = new ArrayList();
219 ArrayList<Integer> identifierTupleIdx = new ArrayList();
220 int identifierReadIdx = 0;
221
222 String[] extentNames;
223 int[] begins;
224 int[] ends;
225 long[] values;
226 int writeTupleIndex = 0;
227 int readTupleIndex = 0;
228 int batchSize;
229
230 public ShreddedBuffer(int batchSize) {
231 this.batchSize = batchSize;
232
233 extentNames = new String[batchSize];
234 begins = new int[batchSize];
235 ends = new int[batchSize];
236 values = new long[batchSize];
237 }
238
239 public ShreddedBuffer() {
240 this(10000);
241 }
242
243 public void processIdentifier(String identifier) {
244 identifiers.add(identifier);
245 identifierTupleIdx.add(writeTupleIndex);
246 }
247 public void processTuple(String extentName, int begin, int end, long value) {
248 assert identifiers.size() > 0;
249 extentNames[writeTupleIndex] = extentName;
250 begins[writeTupleIndex] = begin;
251 ends[writeTupleIndex] = end;
252 values[writeTupleIndex] = value;
253 writeTupleIndex++;
254 }
255 public void resetData() {
256 identifiers.clear();
257 identifierTupleIdx.clear();
258 writeTupleIndex = 0;
259 }
260
261 public void resetRead() {
262 readTupleIndex = 0;
263 identifierReadIdx = 0;
264 }
265
266 public void reset() {
267 resetData();
268 resetRead();
269 }
270 public boolean isFull() {
271 return writeTupleIndex >= batchSize;
272 }
273
274 public boolean isEmpty() {
275 return writeTupleIndex == 0;
276 }
277
278 public boolean isAtEnd() {
279 return readTupleIndex >= writeTupleIndex;
280 }
281 public void incrementIdentifier() {
282 identifierReadIdx++;
283 }
284
285 public void autoIncrementIdentifier() {
286 while (readTupleIndex >= getIdentifierEndIndex() && readTupleIndex < writeTupleIndex)
287 identifierReadIdx++;
288 }
289 public void incrementTuple() {
290 readTupleIndex++;
291 }
292 public int getIdentifierEndIndex() {
293 if ((identifierReadIdx+1) >= identifierTupleIdx.size())
294 return writeTupleIndex;
295 return identifierTupleIdx.get(identifierReadIdx+1);
296 }
297 public int getReadIndex() {
298 return readTupleIndex;
299 }
300
301 public int getWriteIndex() {
302 return writeTupleIndex;
303 }
304 public String getIdentifier() {
305 assert readTupleIndex < writeTupleIndex;
306 assert identifierReadIdx < identifiers.size();
307
308 return identifiers.get(identifierReadIdx);
309 }
310 public String getExtentName() {
311 assert readTupleIndex < writeTupleIndex;
312 return extentNames[readTupleIndex];
313 }
314 public int getBegin() {
315 assert readTupleIndex < writeTupleIndex;
316 return begins[readTupleIndex];
317 }
318 public int getEnd() {
319 assert readTupleIndex < writeTupleIndex;
320 return ends[readTupleIndex];
321 }
322 public long getValue() {
323 assert readTupleIndex < writeTupleIndex;
324 return values[readTupleIndex];
325 }
326 public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
327 while (getReadIndex() < endIndex) {
328 output.processTuple(getExtentName(), getBegin(), getEnd(), getValue());
329 incrementTuple();
330 }
331 }
332 public void copyUntilIndexIdentifier(int endIndex, ShreddedProcessor output) throws IOException {
333 while (getReadIndex() < endIndex) {
334 output.processIdentifier(getIdentifier());
335 assert getIdentifierEndIndex() <= endIndex;
336 copyTuples(getIdentifierEndIndex(), output);
337 incrementIdentifier();
338 }
339 }
340 public void copyUntilIdentifier(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
341 while (!isAtEnd()) {
342 if (other != null) {
343 assert !other.isAtEnd();
344 int c = + Utility.compare(getIdentifier(), other.getIdentifier());
345
346 if (c > 0) {
347 break;
348 }
349
350 output.processIdentifier(getIdentifier());
351
352 copyTuples(getIdentifierEndIndex(), output);
353 } else {
354 output.processIdentifier(getIdentifier());
355 copyTuples(getIdentifierEndIndex(), output);
356 }
357 incrementIdentifier();
358
359
360 }
361 }
362 public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
363 copyUntilIdentifier(other, output);
364 }
365
366 }
367 public static class ShreddedCombiner implements ReaderSource<DocumentValuedExtent>, ShreddedSource {
368 public ShreddedProcessor processor;
369 Collection<ShreddedReader> readers;
370 boolean closeOnExit = false;
371 boolean uninitialized = true;
372 PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
373
374 public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
375 this.readers = readers;
376 this.closeOnExit = closeOnExit;
377 }
378
379 public void setProcessor(Step processor) throws IncompatibleProcessorException {
380 if (processor instanceof ShreddedProcessor) {
381 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
382 } else if (processor instanceof DocumentValuedExtent.Processor) {
383 this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentValuedExtent.Processor) processor));
384 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
385 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentValuedExtent>) processor));
386 } else {
387 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
388 }
389 }
390
391 public Class<DocumentValuedExtent> getOutputClass() {
392 return DocumentValuedExtent.class;
393 }
394
395 public void initialize() throws IOException {
396 for (ShreddedReader reader : readers) {
397 reader.fill();
398
399 if (!reader.getBuffer().isAtEnd())
400 queue.add(reader);
401 }
402
403 uninitialized = false;
404 }
405
406 public void run() throws IOException {
407 initialize();
408
409 while (queue.size() > 0) {
410 ShreddedReader top = queue.poll();
411 ShreddedReader next = null;
412 ShreddedBuffer nextBuffer = null;
413
414 assert !top.getBuffer().isAtEnd();
415
416 if (queue.size() > 0) {
417 next = queue.peek();
418 nextBuffer = next.getBuffer();
419 assert !nextBuffer.isAtEnd();
420 }
421
422 top.getBuffer().copyUntil(nextBuffer, processor);
423 if (top.getBuffer().isAtEnd())
424 top.fill();
425
426 if (!top.getBuffer().isAtEnd())
427 queue.add(top);
428 }
429
430 if (closeOnExit)
431 processor.close();
432 }
433
434 public DocumentValuedExtent read() throws IOException {
435 if (uninitialized)
436 initialize();
437
438 DocumentValuedExtent result = null;
439
440 while (queue.size() > 0) {
441 ShreddedReader top = queue.poll();
442 result = top.read();
443
444 if (result != null) {
445 if (top.getBuffer().isAtEnd())
446 top.fill();
447
448 queue.offer(top);
449 break;
450 }
451 }
452
453 return result;
454 }
455 }
456 public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<DocumentValuedExtent>, ShreddedSource {
457 public ShreddedProcessor processor;
458 ShreddedBuffer buffer;
459 DocumentValuedExtent last = new DocumentValuedExtent();
460 long updateIdentifierCount = -1;
461 long tupleCount = 0;
462 long bufferStartCount = 0;
463 ArrayInput input;
464
465 public ShreddedReader(ArrayInput input) {
466 this.input = input;
467 this.buffer = new ShreddedBuffer();
468 }
469
470 public ShreddedReader(ArrayInput input, int bufferSize) {
471 this.input = input;
472 this.buffer = new ShreddedBuffer(bufferSize);
473 }
474
475 public final int compareTo(ShreddedReader other) {
476 ShreddedBuffer otherBuffer = other.getBuffer();
477
478 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
479 return 0;
480 } else if (buffer.isAtEnd()) {
481 return -1;
482 } else if (otherBuffer.isAtEnd()) {
483 return 1;
484 }
485
486 int result = 0;
487 do {
488 result = + Utility.compare(buffer.getIdentifier(), otherBuffer.getIdentifier());
489 if(result != 0) break;
490 } while (false);
491
492 return result;
493 }
494
495 public final ShreddedBuffer getBuffer() {
496 return buffer;
497 }
498
499 public final DocumentValuedExtent read() throws IOException {
500 if (buffer.isAtEnd()) {
501 fill();
502
503 if (buffer.isAtEnd()) {
504 return null;
505 }
506 }
507
508 assert !buffer.isAtEnd();
509 DocumentValuedExtent result = new DocumentValuedExtent();
510
511 result.identifier = buffer.getIdentifier();
512 result.extentName = buffer.getExtentName();
513 result.begin = buffer.getBegin();
514 result.end = buffer.getEnd();
515 result.value = buffer.getValue();
516
517 buffer.incrementTuple();
518 buffer.autoIncrementIdentifier();
519
520 return result;
521 }
522
523 public final void fill() throws IOException {
524 try {
525 buffer.reset();
526
527 if (tupleCount != 0) {
528
529 if(updateIdentifierCount - tupleCount > 0) {
530 buffer.identifiers.add(last.identifier);
531 buffer.identifierTupleIdx.add((int) (updateIdentifierCount - tupleCount));
532 }
533 bufferStartCount = tupleCount;
534 }
535
536 while (!buffer.isFull()) {
537 updateIdentifier();
538 buffer.processTuple(input.readString(), input.readInt(), input.readInt(), input.readLong());
539 tupleCount++;
540 }
541 } catch(EOFException e) {}
542 }
543
544 public final void updateIdentifier() throws IOException {
545 if (updateIdentifierCount > tupleCount)
546 return;
547
548 last.identifier = input.readString();
549 updateIdentifierCount = tupleCount + input.readInt();
550
551 buffer.processIdentifier(last.identifier);
552 }
553
554 public void run() throws IOException {
555 while (true) {
556 fill();
557
558 if (buffer.isAtEnd())
559 break;
560
561 buffer.copyUntil(null, processor);
562 }
563 processor.close();
564 }
565
566 public void setProcessor(Step processor) throws IncompatibleProcessorException {
567 if (processor instanceof ShreddedProcessor) {
568 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
569 } else if (processor instanceof DocumentValuedExtent.Processor) {
570 this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentValuedExtent.Processor) processor));
571 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
572 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentValuedExtent>) processor));
573 } else {
574 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
575 }
576 }
577
578 public Class<DocumentValuedExtent> getOutputClass() {
579 return DocumentValuedExtent.class;
580 }
581 }
582
583 public static class DuplicateEliminator implements ShreddedProcessor {
584 public ShreddedProcessor processor;
585 DocumentValuedExtent last = new DocumentValuedExtent();
586 boolean identifierProcess = true;
587
588 public DuplicateEliminator() {}
589 public DuplicateEliminator(ShreddedProcessor processor) {
590 this.processor = processor;
591 }
592
593 public void setShreddedProcessor(ShreddedProcessor processor) {
594 this.processor = processor;
595 }
596
597 public void processIdentifier(String identifier) throws IOException {
598 if (identifierProcess || Utility.compare(identifier, last.identifier) != 0) {
599 last.identifier = identifier;
600 processor.processIdentifier(identifier);
601 identifierProcess = false;
602 }
603 }
604
605 public void resetIdentifier() {
606 identifierProcess = true;
607 }
608
609 public void processTuple(String extentName, int begin, int end, long value) throws IOException {
610 processor.processTuple(extentName, begin, end, value);
611 }
612
613 public void close() throws IOException {
614 processor.close();
615 }
616 }
617 public static class TupleUnshredder implements ShreddedProcessor {
618 DocumentValuedExtent last = new DocumentValuedExtent();
619 public org.galagosearch.tupleflow.Processor<DocumentValuedExtent> processor;
620
621 public TupleUnshredder(DocumentValuedExtent.Processor processor) {
622 this.processor = processor;
623 }
624
625 public TupleUnshredder(org.galagosearch.tupleflow.Processor<DocumentValuedExtent> processor) {
626 this.processor = processor;
627 }
628
629 public DocumentValuedExtent clone(DocumentValuedExtent object) {
630 DocumentValuedExtent result = new DocumentValuedExtent();
631 if (object == null) return result;
632 result.extentName = object.extentName;
633 result.identifier = object.identifier;
634 result.begin = object.begin;
635 result.end = object.end;
636 result.value = object.value;
637 return result;
638 }
639
640 public void processIdentifier(String identifier) throws IOException {
641 last.identifier = identifier;
642 }
643
644
645 public void processTuple(String extentName, int begin, int end, long value) throws IOException {
646 last.extentName = extentName;
647 last.begin = begin;
648 last.end = end;
649 last.value = value;
650 processor.process(clone(last));
651 }
652
653 public void close() throws IOException {
654 processor.close();
655 }
656 }
657 public static class TupleShredder implements Processor {
658 DocumentValuedExtent last = new DocumentValuedExtent();
659 public ShreddedProcessor processor;
660
661 public TupleShredder(ShreddedProcessor processor) {
662 this.processor = processor;
663 }
664
665 public DocumentValuedExtent clone(DocumentValuedExtent object) {
666 DocumentValuedExtent result = new DocumentValuedExtent();
667 if (object == null) return result;
668 result.extentName = object.extentName;
669 result.identifier = object.identifier;
670 result.begin = object.begin;
671 result.end = object.end;
672 result.value = object.value;
673 return result;
674 }
675
676 public void process(DocumentValuedExtent object) throws IOException {
677 boolean processAll = false;
678 if(last == null || Utility.compare(last.identifier, object.identifier) != 0 || processAll) { processor.processIdentifier(object.identifier); processAll = true; }
679 processor.processTuple(object.extentName, object.begin, object.end, object.value);
680 }
681
682 public Class<DocumentValuedExtent> getInputClass() {
683 return DocumentValuedExtent.class;
684 }
685
686 public void close() throws IOException {
687 processor.close();
688 }
689 }
690 }
691 }