1
2
3 package org.galagosearch.core.types;
4
5 import org.galagosearch.tupleflow.Utility;
6 import org.galagosearch.tupleflow.ArrayInput;
7 import org.galagosearch.tupleflow.ArrayOutput;
8 import org.galagosearch.tupleflow.Order;
9 import org.galagosearch.tupleflow.OrderedWriter;
10 import org.galagosearch.tupleflow.Type;
11 import org.galagosearch.tupleflow.TypeReader;
12 import org.galagosearch.tupleflow.Step;
13 import org.galagosearch.tupleflow.IncompatibleProcessorException;
14 import org.galagosearch.tupleflow.ReaderSource;
15 import java.io.IOException;
16 import java.io.EOFException;
17 import java.io.UnsupportedEncodingException;
18 import java.util.ArrayList;
19 import java.util.Arrays;
20 import java.util.Comparator;
21 import java.util.PriorityQueue;
22 import java.util.Collection;
23
24 public class DocumentExtent implements Type<DocumentExtent> {
25 public String extentName;
26 public String identifier;
27 public int begin;
28 public int end;
29
/** Creates an extent whose fields keep their Java default values. */
public DocumentExtent() {
}

/**
 * Creates an extent with every field supplied.
 *
 * @param extentName value stored in {@link #extentName}
 * @param identifier value stored in {@link #identifier}
 * @param begin value stored in {@link #begin}
 * @param end value stored in {@link #end}
 */
public DocumentExtent(String extentName, String identifier, int begin, int end) {
    this.identifier = identifier;
    this.extentName = extentName;
    this.end = end;
    this.begin = begin;
}
37
38 public String toString() {
39 return String.format("%s,%s,%d,%d",
40 extentName, identifier, begin, end);
41 }
42
43 public Order<DocumentExtent> getOrder(String... spec) {
44 if (Arrays.equals(spec, new String[] { "+identifier" })) {
45 return new IdentifierOrder();
46 }
47 return null;
48 }
49
50 public interface Processor extends Step, org.galagosearch.tupleflow.Processor<DocumentExtent> {
51 public void process(DocumentExtent object) throws IOException;
52 public void close() throws IOException;
53 }
54 public interface Source extends Step {
55 }
56 public static class IdentifierOrder implements Order<DocumentExtent> {
57 public int hash(DocumentExtent object) {
58 int h = 0;
59 h += Utility.hash(object.identifier);
60 return h;
61 }
62 public Comparator<DocumentExtent> greaterThan() {
63 return new Comparator<DocumentExtent>() {
64 public int compare(DocumentExtent one, DocumentExtent two) {
65 int result = 0;
66 do {
67 result = + Utility.compare(one.identifier, two.identifier);
68 if(result != 0) break;
69 } while (false);
70 return -result;
71 }
72 };
73 }
74 public Comparator<DocumentExtent> lessThan() {
75 return new Comparator<DocumentExtent>() {
76 public int compare(DocumentExtent one, DocumentExtent two) {
77 int result = 0;
78 do {
79 result = + Utility.compare(one.identifier, two.identifier);
80 if(result != 0) break;
81 } while (false);
82 return result;
83 }
84 };
85 }
86 public TypeReader<DocumentExtent> orderedReader(ArrayInput _input) {
87 return new ShreddedReader(_input);
88 }
89
90 public TypeReader<DocumentExtent> orderedReader(ArrayInput _input, int bufferSize) {
91 return new ShreddedReader(_input, bufferSize);
92 }
93 public OrderedWriter<DocumentExtent> orderedWriter(ArrayOutput _output) {
94 ShreddedWriter w = new ShreddedWriter(_output);
95 return new OrderedWriterClass(w);
96 }
97 public static class OrderedWriterClass extends OrderedWriter< DocumentExtent > {
98 DocumentExtent last = null;
99 ShreddedWriter shreddedWriter = null;
100
101 public OrderedWriterClass(ShreddedWriter s) {
102 this.shreddedWriter = s;
103 }
104
105 public void process(DocumentExtent object) throws IOException {
106 boolean processAll = false;
107 if (processAll || last == null || 0 != Utility.compare(object.identifier, last.identifier)) { processAll = true; shreddedWriter.processIdentifier(object.identifier); }
108 shreddedWriter.processTuple(object.extentName, object.begin, object.end);
109 last = object;
110 }
111
112 public void close() throws IOException {
113 shreddedWriter.close();
114 }
115
116 public Class<DocumentExtent> getInputClass() {
117 return DocumentExtent.class;
118 }
119 }
120 public ReaderSource<DocumentExtent> orderedCombiner(Collection<TypeReader<DocumentExtent>> readers, boolean closeOnExit) {
121 ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
122
123 for (TypeReader<DocumentExtent> reader : readers) {
124 shreddedReaders.add((ShreddedReader)reader);
125 }
126
127 return new ShreddedCombiner(shreddedReaders, closeOnExit);
128 }
129 public DocumentExtent clone(DocumentExtent object) {
130 DocumentExtent result = new DocumentExtent();
131 if (object == null) return result;
132 result.extentName = object.extentName;
133 result.identifier = object.identifier;
134 result.begin = object.begin;
135 result.end = object.end;
136 return result;
137 }
138 public Class<DocumentExtent> getOrderedClass() {
139 return DocumentExtent.class;
140 }
141 public String[] getOrderSpec() {
142 return new String[] {"+identifier"};
143 }
144
145 public static String getSpecString() {
146 return "+identifier";
147 }
148
149 public interface ShreddedProcessor extends Step {
150 public void processIdentifier(String identifier) throws IOException;
151 public void processTuple(String extentName, int begin, int end) throws IOException;
152 public void close() throws IOException;
153 }
154 public interface ShreddedSource extends Step {
155 }
156
157 public static class ShreddedWriter implements ShreddedProcessor {
158 ArrayOutput output;
159 ShreddedBuffer buffer = new ShreddedBuffer();
160 String lastIdentifier;
161 boolean lastFlush = false;
162
163 public ShreddedWriter(ArrayOutput output) {
164 this.output = output;
165 }
166
167 public void close() throws IOException {
168 flush();
169 }
170
171 public void processIdentifier(String identifier) {
172 lastIdentifier = identifier;
173 buffer.processIdentifier(identifier);
174 }
175 public final void processTuple(String extentName, int begin, int end) throws IOException {
176 if (lastFlush) {
177 if(buffer.identifiers.size() == 0) buffer.processIdentifier(lastIdentifier);
178 lastFlush = false;
179 }
180 buffer.processTuple(extentName, begin, end);
181 if (buffer.isFull())
182 flush();
183 }
184 public final void flushTuples(int pauseIndex) throws IOException {
185
186 while (buffer.getReadIndex() < pauseIndex) {
187
188 output.writeString(buffer.getExtentName());
189 output.writeInt(buffer.getBegin());
190 output.writeInt(buffer.getEnd());
191 buffer.incrementTuple();
192 }
193 }
194 public final void flushIdentifier(int pauseIndex) throws IOException {
195 while (buffer.getReadIndex() < pauseIndex) {
196 int nextPause = buffer.getIdentifierEndIndex();
197 int count = nextPause - buffer.getReadIndex();
198
199 output.writeString(buffer.getIdentifier());
200 output.writeInt(count);
201 buffer.incrementIdentifier();
202
203 flushTuples(nextPause);
204 assert nextPause == buffer.getReadIndex();
205 }
206 }
207 public void flush() throws IOException {
208 flushIdentifier(buffer.getWriteIndex());
209 buffer.reset();
210 lastFlush = true;
211 }
212 }
213 public static class ShreddedBuffer {
214 ArrayList<String> identifiers = new ArrayList();
215 ArrayList<Integer> identifierTupleIdx = new ArrayList();
216 int identifierReadIdx = 0;
217
218 String[] extentNames;
219 int[] begins;
220 int[] ends;
221 int writeTupleIndex = 0;
222 int readTupleIndex = 0;
223 int batchSize;
224
225 public ShreddedBuffer(int batchSize) {
226 this.batchSize = batchSize;
227
228 extentNames = new String[batchSize];
229 begins = new int[batchSize];
230 ends = new int[batchSize];
231 }
232
233 public ShreddedBuffer() {
234 this(10000);
235 }
236
237 public void processIdentifier(String identifier) {
238 identifiers.add(identifier);
239 identifierTupleIdx.add(writeTupleIndex);
240 }
241 public void processTuple(String extentName, int begin, int end) {
242 assert identifiers.size() > 0;
243 extentNames[writeTupleIndex] = extentName;
244 begins[writeTupleIndex] = begin;
245 ends[writeTupleIndex] = end;
246 writeTupleIndex++;
247 }
248 public void resetData() {
249 identifiers.clear();
250 identifierTupleIdx.clear();
251 writeTupleIndex = 0;
252 }
253
254 public void resetRead() {
255 readTupleIndex = 0;
256 identifierReadIdx = 0;
257 }
258
259 public void reset() {
260 resetData();
261 resetRead();
262 }
263 public boolean isFull() {
264 return writeTupleIndex >= batchSize;
265 }
266
267 public boolean isEmpty() {
268 return writeTupleIndex == 0;
269 }
270
271 public boolean isAtEnd() {
272 return readTupleIndex >= writeTupleIndex;
273 }
274 public void incrementIdentifier() {
275 identifierReadIdx++;
276 }
277
278 public void autoIncrementIdentifier() {
279 while (readTupleIndex >= getIdentifierEndIndex() && readTupleIndex < writeTupleIndex)
280 identifierReadIdx++;
281 }
282 public void incrementTuple() {
283 readTupleIndex++;
284 }
285 public int getIdentifierEndIndex() {
286 if ((identifierReadIdx+1) >= identifierTupleIdx.size())
287 return writeTupleIndex;
288 return identifierTupleIdx.get(identifierReadIdx+1);
289 }
290 public int getReadIndex() {
291 return readTupleIndex;
292 }
293
294 public int getWriteIndex() {
295 return writeTupleIndex;
296 }
297 public String getIdentifier() {
298 assert readTupleIndex < writeTupleIndex;
299 assert identifierReadIdx < identifiers.size();
300
301 return identifiers.get(identifierReadIdx);
302 }
303 public String getExtentName() {
304 assert readTupleIndex < writeTupleIndex;
305 return extentNames[readTupleIndex];
306 }
307 public int getBegin() {
308 assert readTupleIndex < writeTupleIndex;
309 return begins[readTupleIndex];
310 }
311 public int getEnd() {
312 assert readTupleIndex < writeTupleIndex;
313 return ends[readTupleIndex];
314 }
315 public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
316 while (getReadIndex() < endIndex) {
317 output.processTuple(getExtentName(), getBegin(), getEnd());
318 incrementTuple();
319 }
320 }
321 public void copyUntilIndexIdentifier(int endIndex, ShreddedProcessor output) throws IOException {
322 while (getReadIndex() < endIndex) {
323 output.processIdentifier(getIdentifier());
324 assert getIdentifierEndIndex() <= endIndex;
325 copyTuples(getIdentifierEndIndex(), output);
326 incrementIdentifier();
327 }
328 }
329 public void copyUntilIdentifier(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
330 while (!isAtEnd()) {
331 if (other != null) {
332 assert !other.isAtEnd();
333 int c = + Utility.compare(getIdentifier(), other.getIdentifier());
334
335 if (c > 0) {
336 break;
337 }
338
339 output.processIdentifier(getIdentifier());
340
341 copyTuples(getIdentifierEndIndex(), output);
342 } else {
343 output.processIdentifier(getIdentifier());
344 copyTuples(getIdentifierEndIndex(), output);
345 }
346 incrementIdentifier();
347
348
349 }
350 }
351 public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
352 copyUntilIdentifier(other, output);
353 }
354
355 }
356 public static class ShreddedCombiner implements ReaderSource<DocumentExtent>, ShreddedSource {
357 public ShreddedProcessor processor;
358 Collection<ShreddedReader> readers;
359 boolean closeOnExit = false;
360 boolean uninitialized = true;
361 PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
362
363 public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
364 this.readers = readers;
365 this.closeOnExit = closeOnExit;
366 }
367
368 public void setProcessor(Step processor) throws IncompatibleProcessorException {
369 if (processor instanceof ShreddedProcessor) {
370 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
371 } else if (processor instanceof DocumentExtent.Processor) {
372 this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentExtent.Processor) processor));
373 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
374 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentExtent>) processor));
375 } else {
376 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
377 }
378 }
379
380 public Class<DocumentExtent> getOutputClass() {
381 return DocumentExtent.class;
382 }
383
384 public void initialize() throws IOException {
385 for (ShreddedReader reader : readers) {
386 reader.fill();
387
388 if (!reader.getBuffer().isAtEnd())
389 queue.add(reader);
390 }
391
392 uninitialized = false;
393 }
394
395 public void run() throws IOException {
396 initialize();
397
398 while (queue.size() > 0) {
399 ShreddedReader top = queue.poll();
400 ShreddedReader next = null;
401 ShreddedBuffer nextBuffer = null;
402
403 assert !top.getBuffer().isAtEnd();
404
405 if (queue.size() > 0) {
406 next = queue.peek();
407 nextBuffer = next.getBuffer();
408 assert !nextBuffer.isAtEnd();
409 }
410
411 top.getBuffer().copyUntil(nextBuffer, processor);
412 if (top.getBuffer().isAtEnd())
413 top.fill();
414
415 if (!top.getBuffer().isAtEnd())
416 queue.add(top);
417 }
418
419 if (closeOnExit)
420 processor.close();
421 }
422
423 public DocumentExtent read() throws IOException {
424 if (uninitialized)
425 initialize();
426
427 DocumentExtent result = null;
428
429 while (queue.size() > 0) {
430 ShreddedReader top = queue.poll();
431 result = top.read();
432
433 if (result != null) {
434 if (top.getBuffer().isAtEnd())
435 top.fill();
436
437 queue.offer(top);
438 break;
439 }
440 }
441
442 return result;
443 }
444 }
445 public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<DocumentExtent>, ShreddedSource {
446 public ShreddedProcessor processor;
447 ShreddedBuffer buffer;
448 DocumentExtent last = new DocumentExtent();
449 long updateIdentifierCount = -1;
450 long tupleCount = 0;
451 long bufferStartCount = 0;
452 ArrayInput input;
453
454 public ShreddedReader(ArrayInput input) {
455 this.input = input;
456 this.buffer = new ShreddedBuffer();
457 }
458
459 public ShreddedReader(ArrayInput input, int bufferSize) {
460 this.input = input;
461 this.buffer = new ShreddedBuffer(bufferSize);
462 }
463
464 public final int compareTo(ShreddedReader other) {
465 ShreddedBuffer otherBuffer = other.getBuffer();
466
467 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
468 return 0;
469 } else if (buffer.isAtEnd()) {
470 return -1;
471 } else if (otherBuffer.isAtEnd()) {
472 return 1;
473 }
474
475 int result = 0;
476 do {
477 result = + Utility.compare(buffer.getIdentifier(), otherBuffer.getIdentifier());
478 if(result != 0) break;
479 } while (false);
480
481 return result;
482 }
483
484 public final ShreddedBuffer getBuffer() {
485 return buffer;
486 }
487
488 public final DocumentExtent read() throws IOException {
489 if (buffer.isAtEnd()) {
490 fill();
491
492 if (buffer.isAtEnd()) {
493 return null;
494 }
495 }
496
497 assert !buffer.isAtEnd();
498 DocumentExtent result = new DocumentExtent();
499
500 result.identifier = buffer.getIdentifier();
501 result.extentName = buffer.getExtentName();
502 result.begin = buffer.getBegin();
503 result.end = buffer.getEnd();
504
505 buffer.incrementTuple();
506 buffer.autoIncrementIdentifier();
507
508 return result;
509 }
510
511 public final void fill() throws IOException {
512 try {
513 buffer.reset();
514
515 if (tupleCount != 0) {
516
517 if(updateIdentifierCount - tupleCount > 0) {
518 buffer.identifiers.add(last.identifier);
519 buffer.identifierTupleIdx.add((int) (updateIdentifierCount - tupleCount));
520 }
521 bufferStartCount = tupleCount;
522 }
523
524 while (!buffer.isFull()) {
525 updateIdentifier();
526 buffer.processTuple(input.readString(), input.readInt(), input.readInt());
527 tupleCount++;
528 }
529 } catch(EOFException e) {}
530 }
531
532 public final void updateIdentifier() throws IOException {
533 if (updateIdentifierCount > tupleCount)
534 return;
535
536 last.identifier = input.readString();
537 updateIdentifierCount = tupleCount + input.readInt();
538
539 buffer.processIdentifier(last.identifier);
540 }
541
542 public void run() throws IOException {
543 while (true) {
544 fill();
545
546 if (buffer.isAtEnd())
547 break;
548
549 buffer.copyUntil(null, processor);
550 }
551 processor.close();
552 }
553
554 public void setProcessor(Step processor) throws IncompatibleProcessorException {
555 if (processor instanceof ShreddedProcessor) {
556 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
557 } else if (processor instanceof DocumentExtent.Processor) {
558 this.processor = new DuplicateEliminator(new TupleUnshredder((DocumentExtent.Processor) processor));
559 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
560 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<DocumentExtent>) processor));
561 } else {
562 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
563 }
564 }
565
566 public Class<DocumentExtent> getOutputClass() {
567 return DocumentExtent.class;
568 }
569 }
570
571 public static class DuplicateEliminator implements ShreddedProcessor {
572 public ShreddedProcessor processor;
573 DocumentExtent last = new DocumentExtent();
574 boolean identifierProcess = true;
575
576 public DuplicateEliminator() {}
577 public DuplicateEliminator(ShreddedProcessor processor) {
578 this.processor = processor;
579 }
580
581 public void setShreddedProcessor(ShreddedProcessor processor) {
582 this.processor = processor;
583 }
584
585 public void processIdentifier(String identifier) throws IOException {
586 if (identifierProcess || Utility.compare(identifier, last.identifier) != 0) {
587 last.identifier = identifier;
588 processor.processIdentifier(identifier);
589 identifierProcess = false;
590 }
591 }
592
593 public void resetIdentifier() {
594 identifierProcess = true;
595 }
596
597 public void processTuple(String extentName, int begin, int end) throws IOException {
598 processor.processTuple(extentName, begin, end);
599 }
600
601 public void close() throws IOException {
602 processor.close();
603 }
604 }
605 public static class TupleUnshredder implements ShreddedProcessor {
606 DocumentExtent last = new DocumentExtent();
607 public org.galagosearch.tupleflow.Processor<DocumentExtent> processor;
608
609 public TupleUnshredder(DocumentExtent.Processor processor) {
610 this.processor = processor;
611 }
612
613 public TupleUnshredder(org.galagosearch.tupleflow.Processor<DocumentExtent> processor) {
614 this.processor = processor;
615 }
616
617 public DocumentExtent clone(DocumentExtent object) {
618 DocumentExtent result = new DocumentExtent();
619 if (object == null) return result;
620 result.extentName = object.extentName;
621 result.identifier = object.identifier;
622 result.begin = object.begin;
623 result.end = object.end;
624 return result;
625 }
626
627 public void processIdentifier(String identifier) throws IOException {
628 last.identifier = identifier;
629 }
630
631
632 public void processTuple(String extentName, int begin, int end) throws IOException {
633 last.extentName = extentName;
634 last.begin = begin;
635 last.end = end;
636 processor.process(clone(last));
637 }
638
639 public void close() throws IOException {
640 processor.close();
641 }
642 }
643 public static class TupleShredder implements Processor {
644 DocumentExtent last = new DocumentExtent();
645 public ShreddedProcessor processor;
646
647 public TupleShredder(ShreddedProcessor processor) {
648 this.processor = processor;
649 }
650
651 public DocumentExtent clone(DocumentExtent object) {
652 DocumentExtent result = new DocumentExtent();
653 if (object == null) return result;
654 result.extentName = object.extentName;
655 result.identifier = object.identifier;
656 result.begin = object.begin;
657 result.end = object.end;
658 return result;
659 }
660
661 public void process(DocumentExtent object) throws IOException {
662 boolean processAll = false;
663 if(last == null || Utility.compare(last.identifier, object.identifier) != 0 || processAll) { processor.processIdentifier(object.identifier); processAll = true; }
664 processor.processTuple(object.extentName, object.begin, object.end);
665 }
666
667 public Class<DocumentExtent> getInputClass() {
668 return DocumentExtent.class;
669 }
670
671 public void close() throws IOException {
672 processor.close();
673 }
674 }
675 }
676 }