1
2
3 package org.galagosearch.core.types;
4
5 import org.galagosearch.tupleflow.Utility;
6 import org.galagosearch.tupleflow.ArrayInput;
7 import org.galagosearch.tupleflow.ArrayOutput;
8 import org.galagosearch.tupleflow.Order;
9 import org.galagosearch.tupleflow.OrderedWriter;
10 import org.galagosearch.tupleflow.Type;
11 import org.galagosearch.tupleflow.TypeReader;
12 import org.galagosearch.tupleflow.Step;
13 import org.galagosearch.tupleflow.IncompatibleProcessorException;
14 import org.galagosearch.tupleflow.ReaderSource;
15 import java.io.IOException;
16 import java.io.EOFException;
17 import java.io.UnsupportedEncodingException;
18 import java.util.ArrayList;
19 import java.util.Arrays;
20 import java.util.Comparator;
21 import java.util.PriorityQueue;
22 import java.util.Collection;
23
24 public class WordOffset implements Type<WordOffset> {
/** Raw bytes of the word; this is the field {@link WordOrder} compares and hashes. */
public byte[] word;

/** Offset value carried alongside the word. */
public long offset;

/** Creates an empty tuple ({@code word} is null, {@code offset} is 0). */
public WordOffset() {
}

/**
 * Creates a tuple holding the given values.
 * The byte array is stored by reference; no copy is made.
 *
 * @param word   word bytes
 * @param offset offset paired with the word
 */
public WordOffset(byte[] word, long offset) {
    this.offset = offset;
    this.word = word;
}
33
34 public String toString() {
35 try {
36 return String.format("%s,%d",
37 new String(word, "UTF-8"), offset);
38 } catch(UnsupportedEncodingException e) {
39 throw new RuntimeException("Couldn't convert string to UTF-8.");
40 }
41 }
42
43 public Order<WordOffset> getOrder(String... spec) {
44 if (Arrays.equals(spec, new String[] { "+word" })) {
45 return new WordOrder();
46 }
47 return null;
48 }
49
50 public interface Processor extends Step, org.galagosearch.tupleflow.Processor<WordOffset> {
51 public void process(WordOffset object) throws IOException;
52 public void close() throws IOException;
53 }
54 public interface Source extends Step {
55 }
56 public static class WordOrder implements Order<WordOffset> {
57 public int hash(WordOffset object) {
58 int h = 0;
59 h += Utility.hash(object.word);
60 return h;
61 }
62 public Comparator<WordOffset> greaterThan() {
63 return new Comparator<WordOffset>() {
64 public int compare(WordOffset one, WordOffset two) {
65 int result = 0;
66 do {
67 result = + Utility.compare(one.word, two.word);
68 if(result != 0) break;
69 } while (false);
70 return -result;
71 }
72 };
73 }
74 public Comparator<WordOffset> lessThan() {
75 return new Comparator<WordOffset>() {
76 public int compare(WordOffset one, WordOffset two) {
77 int result = 0;
78 do {
79 result = + Utility.compare(one.word, two.word);
80 if(result != 0) break;
81 } while (false);
82 return result;
83 }
84 };
85 }
86 public TypeReader<WordOffset> orderedReader(ArrayInput _input) {
87 return new ShreddedReader(_input);
88 }
89
90 public TypeReader<WordOffset> orderedReader(ArrayInput _input, int bufferSize) {
91 return new ShreddedReader(_input, bufferSize);
92 }
93 public OrderedWriter<WordOffset> orderedWriter(ArrayOutput _output) {
94 ShreddedWriter w = new ShreddedWriter(_output);
95 return new OrderedWriterClass(w);
96 }
97 public static class OrderedWriterClass extends OrderedWriter< WordOffset > {
98 WordOffset last = null;
99 ShreddedWriter shreddedWriter = null;
100
101 public OrderedWriterClass(ShreddedWriter s) {
102 this.shreddedWriter = s;
103 }
104
105 public void process(WordOffset object) throws IOException {
106 boolean processAll = false;
107 if (processAll || last == null || 0 != Utility.compare(object.word, last.word)) { processAll = true; shreddedWriter.processWord(object.word); }
108 shreddedWriter.processTuple(object.offset);
109 last = object;
110 }
111
112 public void close() throws IOException {
113 shreddedWriter.close();
114 }
115
116 public Class<WordOffset> getInputClass() {
117 return WordOffset.class;
118 }
119 }
120 public ReaderSource<WordOffset> orderedCombiner(Collection<TypeReader<WordOffset>> readers, boolean closeOnExit) {
121 ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
122
123 for (TypeReader<WordOffset> reader : readers) {
124 shreddedReaders.add((ShreddedReader)reader);
125 }
126
127 return new ShreddedCombiner(shreddedReaders, closeOnExit);
128 }
129 public WordOffset clone(WordOffset object) {
130 WordOffset result = new WordOffset();
131 if (object == null) return result;
132 result.word = object.word;
133 result.offset = object.offset;
134 return result;
135 }
136 public Class<WordOffset> getOrderedClass() {
137 return WordOffset.class;
138 }
139 public String[] getOrderSpec() {
140 return new String[] {"+word"};
141 }
142
143 public static String getSpecString() {
144 return "+word";
145 }
146
147 public interface ShreddedProcessor extends Step {
148 public void processWord(byte[] word) throws IOException;
149 public void processTuple(long offset) throws IOException;
150 public void close() throws IOException;
151 }
152 public interface ShreddedSource extends Step {
153 }
154
155 public static class ShreddedWriter implements ShreddedProcessor {
156 ArrayOutput output;
157 ShreddedBuffer buffer = new ShreddedBuffer();
158 byte[] lastWord;
159 boolean lastFlush = false;
160
161 public ShreddedWriter(ArrayOutput output) {
162 this.output = output;
163 }
164
165 public void close() throws IOException {
166 flush();
167 }
168
169 public void processWord(byte[] word) {
170 lastWord = word;
171 buffer.processWord(word);
172 }
173 public final void processTuple(long offset) throws IOException {
174 if (lastFlush) {
175 if(buffer.words.size() == 0) buffer.processWord(lastWord);
176 lastFlush = false;
177 }
178 buffer.processTuple(offset);
179 if (buffer.isFull())
180 flush();
181 }
182 public final void flushTuples(int pauseIndex) throws IOException {
183
184 while (buffer.getReadIndex() < pauseIndex) {
185
186 output.writeLong(buffer.getOffset());
187 buffer.incrementTuple();
188 }
189 }
190 public final void flushWord(int pauseIndex) throws IOException {
191 while (buffer.getReadIndex() < pauseIndex) {
192 int nextPause = buffer.getWordEndIndex();
193 int count = nextPause - buffer.getReadIndex();
194
195 output.writeBytes(buffer.getWord());
196 output.writeInt(count);
197 buffer.incrementWord();
198
199 flushTuples(nextPause);
200 assert nextPause == buffer.getReadIndex();
201 }
202 }
203 public void flush() throws IOException {
204 flushWord(buffer.getWriteIndex());
205 buffer.reset();
206 lastFlush = true;
207 }
208 }
209 public static class ShreddedBuffer {
210 ArrayList<byte[]> words = new ArrayList();
211 ArrayList<Integer> wordTupleIdx = new ArrayList();
212 int wordReadIdx = 0;
213
214 long[] offsets;
215 int writeTupleIndex = 0;
216 int readTupleIndex = 0;
217 int batchSize;
218
219 public ShreddedBuffer(int batchSize) {
220 this.batchSize = batchSize;
221
222 offsets = new long[batchSize];
223 }
224
225 public ShreddedBuffer() {
226 this(10000);
227 }
228
229 public void processWord(byte[] word) {
230 words.add(word);
231 wordTupleIdx.add(writeTupleIndex);
232 }
233 public void processTuple(long offset) {
234 assert words.size() > 0;
235 offsets[writeTupleIndex] = offset;
236 writeTupleIndex++;
237 }
238 public void resetData() {
239 words.clear();
240 wordTupleIdx.clear();
241 writeTupleIndex = 0;
242 }
243
244 public void resetRead() {
245 readTupleIndex = 0;
246 wordReadIdx = 0;
247 }
248
249 public void reset() {
250 resetData();
251 resetRead();
252 }
253 public boolean isFull() {
254 return writeTupleIndex >= batchSize;
255 }
256
257 public boolean isEmpty() {
258 return writeTupleIndex == 0;
259 }
260
261 public boolean isAtEnd() {
262 return readTupleIndex >= writeTupleIndex;
263 }
264 public void incrementWord() {
265 wordReadIdx++;
266 }
267
268 public void autoIncrementWord() {
269 while (readTupleIndex >= getWordEndIndex() && readTupleIndex < writeTupleIndex)
270 wordReadIdx++;
271 }
272 public void incrementTuple() {
273 readTupleIndex++;
274 }
275 public int getWordEndIndex() {
276 if ((wordReadIdx+1) >= wordTupleIdx.size())
277 return writeTupleIndex;
278 return wordTupleIdx.get(wordReadIdx+1);
279 }
280 public int getReadIndex() {
281 return readTupleIndex;
282 }
283
284 public int getWriteIndex() {
285 return writeTupleIndex;
286 }
287 public byte[] getWord() {
288 assert readTupleIndex < writeTupleIndex;
289 assert wordReadIdx < words.size();
290
291 return words.get(wordReadIdx);
292 }
293 public long getOffset() {
294 assert readTupleIndex < writeTupleIndex;
295 return offsets[readTupleIndex];
296 }
297 public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
298 while (getReadIndex() < endIndex) {
299 output.processTuple(getOffset());
300 incrementTuple();
301 }
302 }
303 public void copyUntilIndexWord(int endIndex, ShreddedProcessor output) throws IOException {
304 while (getReadIndex() < endIndex) {
305 output.processWord(getWord());
306 assert getWordEndIndex() <= endIndex;
307 copyTuples(getWordEndIndex(), output);
308 incrementWord();
309 }
310 }
311 public void copyUntilWord(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
312 while (!isAtEnd()) {
313 if (other != null) {
314 assert !other.isAtEnd();
315 int c = + Utility.compare(getWord(), other.getWord());
316
317 if (c > 0) {
318 break;
319 }
320
321 output.processWord(getWord());
322
323 copyTuples(getWordEndIndex(), output);
324 } else {
325 output.processWord(getWord());
326 copyTuples(getWordEndIndex(), output);
327 }
328 incrementWord();
329
330
331 }
332 }
333 public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
334 copyUntilWord(other, output);
335 }
336
337 }
338 public static class ShreddedCombiner implements ReaderSource<WordOffset>, ShreddedSource {
339 public ShreddedProcessor processor;
340 Collection<ShreddedReader> readers;
341 boolean closeOnExit = false;
342 boolean uninitialized = true;
343 PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
344
345 public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
346 this.readers = readers;
347 this.closeOnExit = closeOnExit;
348 }
349
350 public void setProcessor(Step processor) throws IncompatibleProcessorException {
351 if (processor instanceof ShreddedProcessor) {
352 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
353 } else if (processor instanceof WordOffset.Processor) {
354 this.processor = new DuplicateEliminator(new TupleUnshredder((WordOffset.Processor) processor));
355 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
356 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<WordOffset>) processor));
357 } else {
358 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
359 }
360 }
361
362 public Class<WordOffset> getOutputClass() {
363 return WordOffset.class;
364 }
365
366 public void initialize() throws IOException {
367 for (ShreddedReader reader : readers) {
368 reader.fill();
369
370 if (!reader.getBuffer().isAtEnd())
371 queue.add(reader);
372 }
373
374 uninitialized = false;
375 }
376
377 public void run() throws IOException {
378 initialize();
379
380 while (queue.size() > 0) {
381 ShreddedReader top = queue.poll();
382 ShreddedReader next = null;
383 ShreddedBuffer nextBuffer = null;
384
385 assert !top.getBuffer().isAtEnd();
386
387 if (queue.size() > 0) {
388 next = queue.peek();
389 nextBuffer = next.getBuffer();
390 assert !nextBuffer.isAtEnd();
391 }
392
393 top.getBuffer().copyUntil(nextBuffer, processor);
394 if (top.getBuffer().isAtEnd())
395 top.fill();
396
397 if (!top.getBuffer().isAtEnd())
398 queue.add(top);
399 }
400
401 if (closeOnExit)
402 processor.close();
403 }
404
405 public WordOffset read() throws IOException {
406 if (uninitialized)
407 initialize();
408
409 WordOffset result = null;
410
411 while (queue.size() > 0) {
412 ShreddedReader top = queue.poll();
413 result = top.read();
414
415 if (result != null) {
416 if (top.getBuffer().isAtEnd())
417 top.fill();
418
419 queue.offer(top);
420 break;
421 }
422 }
423
424 return result;
425 }
426 }
427 public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<WordOffset>, ShreddedSource {
428 public ShreddedProcessor processor;
429 ShreddedBuffer buffer;
430 WordOffset last = new WordOffset();
431 long updateWordCount = -1;
432 long tupleCount = 0;
433 long bufferStartCount = 0;
434 ArrayInput input;
435
436 public ShreddedReader(ArrayInput input) {
437 this.input = input;
438 this.buffer = new ShreddedBuffer();
439 }
440
441 public ShreddedReader(ArrayInput input, int bufferSize) {
442 this.input = input;
443 this.buffer = new ShreddedBuffer(bufferSize);
444 }
445
446 public final int compareTo(ShreddedReader other) {
447 ShreddedBuffer otherBuffer = other.getBuffer();
448
449 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
450 return 0;
451 } else if (buffer.isAtEnd()) {
452 return -1;
453 } else if (otherBuffer.isAtEnd()) {
454 return 1;
455 }
456
457 int result = 0;
458 do {
459 result = + Utility.compare(buffer.getWord(), otherBuffer.getWord());
460 if(result != 0) break;
461 } while (false);
462
463 return result;
464 }
465
466 public final ShreddedBuffer getBuffer() {
467 return buffer;
468 }
469
470 public final WordOffset read() throws IOException {
471 if (buffer.isAtEnd()) {
472 fill();
473
474 if (buffer.isAtEnd()) {
475 return null;
476 }
477 }
478
479 assert !buffer.isAtEnd();
480 WordOffset result = new WordOffset();
481
482 result.word = buffer.getWord();
483 result.offset = buffer.getOffset();
484
485 buffer.incrementTuple();
486 buffer.autoIncrementWord();
487
488 return result;
489 }
490
491 public final void fill() throws IOException {
492 try {
493 buffer.reset();
494
495 if (tupleCount != 0) {
496
497 if(updateWordCount - tupleCount > 0) {
498 buffer.words.add(last.word);
499 buffer.wordTupleIdx.add((int) (updateWordCount - tupleCount));
500 }
501 bufferStartCount = tupleCount;
502 }
503
504 while (!buffer.isFull()) {
505 updateWord();
506 buffer.processTuple(input.readLong());
507 tupleCount++;
508 }
509 } catch(EOFException e) {}
510 }
511
512 public final void updateWord() throws IOException {
513 if (updateWordCount > tupleCount)
514 return;
515
516 last.word = input.readBytes();
517 updateWordCount = tupleCount + input.readInt();
518
519 buffer.processWord(last.word);
520 }
521
522 public void run() throws IOException {
523 while (true) {
524 fill();
525
526 if (buffer.isAtEnd())
527 break;
528
529 buffer.copyUntil(null, processor);
530 }
531 processor.close();
532 }
533
534 public void setProcessor(Step processor) throws IncompatibleProcessorException {
535 if (processor instanceof ShreddedProcessor) {
536 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
537 } else if (processor instanceof WordOffset.Processor) {
538 this.processor = new DuplicateEliminator(new TupleUnshredder((WordOffset.Processor) processor));
539 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
540 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<WordOffset>) processor));
541 } else {
542 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
543 }
544 }
545
546 public Class<WordOffset> getOutputClass() {
547 return WordOffset.class;
548 }
549 }
550
551 public static class DuplicateEliminator implements ShreddedProcessor {
552 public ShreddedProcessor processor;
553 WordOffset last = new WordOffset();
554 boolean wordProcess = true;
555
556 public DuplicateEliminator() {}
557 public DuplicateEliminator(ShreddedProcessor processor) {
558 this.processor = processor;
559 }
560
561 public void setShreddedProcessor(ShreddedProcessor processor) {
562 this.processor = processor;
563 }
564
565 public void processWord(byte[] word) throws IOException {
566 if (wordProcess || Utility.compare(word, last.word) != 0) {
567 last.word = word;
568 processor.processWord(word);
569 wordProcess = false;
570 }
571 }
572
573 public void resetWord() {
574 wordProcess = true;
575 }
576
577 public void processTuple(long offset) throws IOException {
578 processor.processTuple(offset);
579 }
580
581 public void close() throws IOException {
582 processor.close();
583 }
584 }
585 public static class TupleUnshredder implements ShreddedProcessor {
586 WordOffset last = new WordOffset();
587 public org.galagosearch.tupleflow.Processor<WordOffset> processor;
588
589 public TupleUnshredder(WordOffset.Processor processor) {
590 this.processor = processor;
591 }
592
593 public TupleUnshredder(org.galagosearch.tupleflow.Processor<WordOffset> processor) {
594 this.processor = processor;
595 }
596
597 public WordOffset clone(WordOffset object) {
598 WordOffset result = new WordOffset();
599 if (object == null) return result;
600 result.word = object.word;
601 result.offset = object.offset;
602 return result;
603 }
604
605 public void processWord(byte[] word) throws IOException {
606 last.word = word;
607 }
608
609
610 public void processTuple(long offset) throws IOException {
611 last.offset = offset;
612 processor.process(clone(last));
613 }
614
615 public void close() throws IOException {
616 processor.close();
617 }
618 }
619 public static class TupleShredder implements Processor {
620 WordOffset last = new WordOffset();
621 public ShreddedProcessor processor;
622
623 public TupleShredder(ShreddedProcessor processor) {
624 this.processor = processor;
625 }
626
627 public WordOffset clone(WordOffset object) {
628 WordOffset result = new WordOffset();
629 if (object == null) return result;
630 result.word = object.word;
631 result.offset = object.offset;
632 return result;
633 }
634
635 public void process(WordOffset object) throws IOException {
636 boolean processAll = false;
637 if(last == null || Utility.compare(last.word, object.word) != 0 || processAll) { processor.processWord(object.word); processAll = true; }
638 processor.processTuple(object.offset);
639 }
640
641 public Class<WordOffset> getInputClass() {
642 return WordOffset.class;
643 }
644
645 public void close() throws IOException {
646 processor.close();
647 }
648 }
649 }
650 }