1
2
3 package org.galagosearch.core.types;
4
5 import org.galagosearch.tupleflow.Utility;
6 import org.galagosearch.tupleflow.ArrayInput;
7 import org.galagosearch.tupleflow.ArrayOutput;
8 import org.galagosearch.tupleflow.Order;
9 import org.galagosearch.tupleflow.OrderedWriter;
10 import org.galagosearch.tupleflow.Type;
11 import org.galagosearch.tupleflow.TypeReader;
12 import org.galagosearch.tupleflow.Step;
13 import org.galagosearch.tupleflow.IncompatibleProcessorException;
14 import org.galagosearch.tupleflow.ReaderSource;
15 import java.io.IOException;
16 import java.io.EOFException;
17 import java.io.UnsupportedEncodingException;
18 import java.util.ArrayList;
19 import java.util.Arrays;
20 import java.util.Comparator;
21 import java.util.PriorityQueue;
22 import java.util.Collection;
23
24 public class WordCount implements Type<WordCount> {
/** The word string this record is keyed on (the only field used for ordering). */
public String word;
/** First numeric payload stored with the word. */
public long count;
/** Second numeric payload stored with the word. */
public long documents;

/** Creates an empty record: {@code word} is null and both longs are zero. */
public WordCount() {
}

/**
 * Creates a fully populated record.
 *
 * @param word      value for {@link #word}
 * @param count     value for {@link #count}
 * @param documents value for {@link #documents}
 */
public WordCount(String word, long count, long documents) {
    this.documents = documents;
    this.count = count;
    this.word = word;
}
35
36 public String toString() {
37 return String.format("%s,%d,%d",
38 word, count, documents);
39 }
40
41 public Order<WordCount> getOrder(String... spec) {
42 if (Arrays.equals(spec, new String[] { "+word" })) {
43 return new WordOrder();
44 }
45 return null;
46 }
47
48 public interface Processor extends Step, org.galagosearch.tupleflow.Processor<WordCount> {
49 public void process(WordCount object) throws IOException;
50 public void close() throws IOException;
51 }
52 public interface Source extends Step {
53 }
54 public static class WordOrder implements Order<WordCount> {
55 public int hash(WordCount object) {
56 int h = 0;
57 h += Utility.hash(object.word);
58 return h;
59 }
60 public Comparator<WordCount> greaterThan() {
61 return new Comparator<WordCount>() {
62 public int compare(WordCount one, WordCount two) {
63 int result = 0;
64 do {
65 result = + Utility.compare(one.word, two.word);
66 if(result != 0) break;
67 } while (false);
68 return -result;
69 }
70 };
71 }
72 public Comparator<WordCount> lessThan() {
73 return new Comparator<WordCount>() {
74 public int compare(WordCount one, WordCount two) {
75 int result = 0;
76 do {
77 result = + Utility.compare(one.word, two.word);
78 if(result != 0) break;
79 } while (false);
80 return result;
81 }
82 };
83 }
84 public TypeReader<WordCount> orderedReader(ArrayInput _input) {
85 return new ShreddedReader(_input);
86 }
87
88 public TypeReader<WordCount> orderedReader(ArrayInput _input, int bufferSize) {
89 return new ShreddedReader(_input, bufferSize);
90 }
91 public OrderedWriter<WordCount> orderedWriter(ArrayOutput _output) {
92 ShreddedWriter w = new ShreddedWriter(_output);
93 return new OrderedWriterClass(w);
94 }
95 public static class OrderedWriterClass extends OrderedWriter< WordCount > {
96 WordCount last = null;
97 ShreddedWriter shreddedWriter = null;
98
99 public OrderedWriterClass(ShreddedWriter s) {
100 this.shreddedWriter = s;
101 }
102
103 public void process(WordCount object) throws IOException {
104 boolean processAll = false;
105 if (processAll || last == null || 0 != Utility.compare(object.word, last.word)) { processAll = true; shreddedWriter.processWord(object.word); }
106 shreddedWriter.processTuple(object.count, object.documents);
107 last = object;
108 }
109
110 public void close() throws IOException {
111 shreddedWriter.close();
112 }
113
114 public Class<WordCount> getInputClass() {
115 return WordCount.class;
116 }
117 }
118 public ReaderSource<WordCount> orderedCombiner(Collection<TypeReader<WordCount>> readers, boolean closeOnExit) {
119 ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
120
121 for (TypeReader<WordCount> reader : readers) {
122 shreddedReaders.add((ShreddedReader)reader);
123 }
124
125 return new ShreddedCombiner(shreddedReaders, closeOnExit);
126 }
127 public WordCount clone(WordCount object) {
128 WordCount result = new WordCount();
129 if (object == null) return result;
130 result.word = object.word;
131 result.count = object.count;
132 result.documents = object.documents;
133 return result;
134 }
135 public Class<WordCount> getOrderedClass() {
136 return WordCount.class;
137 }
138 public String[] getOrderSpec() {
139 return new String[] {"+word"};
140 }
141
142 public static String getSpecString() {
143 return "+word";
144 }
145
146 public interface ShreddedProcessor extends Step {
147 public void processWord(String word) throws IOException;
148 public void processTuple(long count, long documents) throws IOException;
149 public void close() throws IOException;
150 }
151 public interface ShreddedSource extends Step {
152 }
153
154 public static class ShreddedWriter implements ShreddedProcessor {
155 ArrayOutput output;
156 ShreddedBuffer buffer = new ShreddedBuffer();
157 String lastWord;
158 boolean lastFlush = false;
159
160 public ShreddedWriter(ArrayOutput output) {
161 this.output = output;
162 }
163
164 public void close() throws IOException {
165 flush();
166 }
167
168 public void processWord(String word) {
169 lastWord = word;
170 buffer.processWord(word);
171 }
172 public final void processTuple(long count, long documents) throws IOException {
173 if (lastFlush) {
174 if(buffer.words.size() == 0) buffer.processWord(lastWord);
175 lastFlush = false;
176 }
177 buffer.processTuple(count, documents);
178 if (buffer.isFull())
179 flush();
180 }
181 public final void flushTuples(int pauseIndex) throws IOException {
182
183 while (buffer.getReadIndex() < pauseIndex) {
184
185 output.writeLong(buffer.getCount());
186 output.writeLong(buffer.getDocuments());
187 buffer.incrementTuple();
188 }
189 }
190 public final void flushWord(int pauseIndex) throws IOException {
191 while (buffer.getReadIndex() < pauseIndex) {
192 int nextPause = buffer.getWordEndIndex();
193 int count = nextPause - buffer.getReadIndex();
194
195 output.writeString(buffer.getWord());
196 output.writeInt(count);
197 buffer.incrementWord();
198
199 flushTuples(nextPause);
200 assert nextPause == buffer.getReadIndex();
201 }
202 }
203 public void flush() throws IOException {
204 flushWord(buffer.getWriteIndex());
205 buffer.reset();
206 lastFlush = true;
207 }
208 }
209 public static class ShreddedBuffer {
210 ArrayList<String> words = new ArrayList();
211 ArrayList<Integer> wordTupleIdx = new ArrayList();
212 int wordReadIdx = 0;
213
214 long[] counts;
215 long[] documentss;
216 int writeTupleIndex = 0;
217 int readTupleIndex = 0;
218 int batchSize;
219
220 public ShreddedBuffer(int batchSize) {
221 this.batchSize = batchSize;
222
223 counts = new long[batchSize];
224 documentss = new long[batchSize];
225 }
226
227 public ShreddedBuffer() {
228 this(10000);
229 }
230
231 public void processWord(String word) {
232 words.add(word);
233 wordTupleIdx.add(writeTupleIndex);
234 }
235 public void processTuple(long count, long documents) {
236 assert words.size() > 0;
237 counts[writeTupleIndex] = count;
238 documentss[writeTupleIndex] = documents;
239 writeTupleIndex++;
240 }
241 public void resetData() {
242 words.clear();
243 wordTupleIdx.clear();
244 writeTupleIndex = 0;
245 }
246
247 public void resetRead() {
248 readTupleIndex = 0;
249 wordReadIdx = 0;
250 }
251
252 public void reset() {
253 resetData();
254 resetRead();
255 }
256 public boolean isFull() {
257 return writeTupleIndex >= batchSize;
258 }
259
260 public boolean isEmpty() {
261 return writeTupleIndex == 0;
262 }
263
264 public boolean isAtEnd() {
265 return readTupleIndex >= writeTupleIndex;
266 }
267 public void incrementWord() {
268 wordReadIdx++;
269 }
270
271 public void autoIncrementWord() {
272 while (readTupleIndex >= getWordEndIndex() && readTupleIndex < writeTupleIndex)
273 wordReadIdx++;
274 }
275 public void incrementTuple() {
276 readTupleIndex++;
277 }
278 public int getWordEndIndex() {
279 if ((wordReadIdx+1) >= wordTupleIdx.size())
280 return writeTupleIndex;
281 return wordTupleIdx.get(wordReadIdx+1);
282 }
283 public int getReadIndex() {
284 return readTupleIndex;
285 }
286
287 public int getWriteIndex() {
288 return writeTupleIndex;
289 }
290 public String getWord() {
291 assert readTupleIndex < writeTupleIndex;
292 assert wordReadIdx < words.size();
293
294 return words.get(wordReadIdx);
295 }
296 public long getCount() {
297 assert readTupleIndex < writeTupleIndex;
298 return counts[readTupleIndex];
299 }
300 public long getDocuments() {
301 assert readTupleIndex < writeTupleIndex;
302 return documentss[readTupleIndex];
303 }
304 public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
305 while (getReadIndex() < endIndex) {
306 output.processTuple(getCount(), getDocuments());
307 incrementTuple();
308 }
309 }
310 public void copyUntilIndexWord(int endIndex, ShreddedProcessor output) throws IOException {
311 while (getReadIndex() < endIndex) {
312 output.processWord(getWord());
313 assert getWordEndIndex() <= endIndex;
314 copyTuples(getWordEndIndex(), output);
315 incrementWord();
316 }
317 }
318 public void copyUntilWord(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
319 while (!isAtEnd()) {
320 if (other != null) {
321 assert !other.isAtEnd();
322 int c = + Utility.compare(getWord(), other.getWord());
323
324 if (c > 0) {
325 break;
326 }
327
328 output.processWord(getWord());
329
330 copyTuples(getWordEndIndex(), output);
331 } else {
332 output.processWord(getWord());
333 copyTuples(getWordEndIndex(), output);
334 }
335 incrementWord();
336
337
338 }
339 }
340 public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
341 copyUntilWord(other, output);
342 }
343
344 }
345 public static class ShreddedCombiner implements ReaderSource<WordCount>, ShreddedSource {
346 public ShreddedProcessor processor;
347 Collection<ShreddedReader> readers;
348 boolean closeOnExit = false;
349 boolean uninitialized = true;
350 PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
351
352 public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
353 this.readers = readers;
354 this.closeOnExit = closeOnExit;
355 }
356
357 public void setProcessor(Step processor) throws IncompatibleProcessorException {
358 if (processor instanceof ShreddedProcessor) {
359 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
360 } else if (processor instanceof WordCount.Processor) {
361 this.processor = new DuplicateEliminator(new TupleUnshredder((WordCount.Processor) processor));
362 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
363 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<WordCount>) processor));
364 } else {
365 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
366 }
367 }
368
369 public Class<WordCount> getOutputClass() {
370 return WordCount.class;
371 }
372
373 public void initialize() throws IOException {
374 for (ShreddedReader reader : readers) {
375 reader.fill();
376
377 if (!reader.getBuffer().isAtEnd())
378 queue.add(reader);
379 }
380
381 uninitialized = false;
382 }
383
384 public void run() throws IOException {
385 initialize();
386
387 while (queue.size() > 0) {
388 ShreddedReader top = queue.poll();
389 ShreddedReader next = null;
390 ShreddedBuffer nextBuffer = null;
391
392 assert !top.getBuffer().isAtEnd();
393
394 if (queue.size() > 0) {
395 next = queue.peek();
396 nextBuffer = next.getBuffer();
397 assert !nextBuffer.isAtEnd();
398 }
399
400 top.getBuffer().copyUntil(nextBuffer, processor);
401 if (top.getBuffer().isAtEnd())
402 top.fill();
403
404 if (!top.getBuffer().isAtEnd())
405 queue.add(top);
406 }
407
408 if (closeOnExit)
409 processor.close();
410 }
411
412 public WordCount read() throws IOException {
413 if (uninitialized)
414 initialize();
415
416 WordCount result = null;
417
418 while (queue.size() > 0) {
419 ShreddedReader top = queue.poll();
420 result = top.read();
421
422 if (result != null) {
423 if (top.getBuffer().isAtEnd())
424 top.fill();
425
426 queue.offer(top);
427 break;
428 }
429 }
430
431 return result;
432 }
433 }
434 public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<WordCount>, ShreddedSource {
435 public ShreddedProcessor processor;
436 ShreddedBuffer buffer;
437 WordCount last = new WordCount();
438 long updateWordCount = -1;
439 long tupleCount = 0;
440 long bufferStartCount = 0;
441 ArrayInput input;
442
443 public ShreddedReader(ArrayInput input) {
444 this.input = input;
445 this.buffer = new ShreddedBuffer();
446 }
447
448 public ShreddedReader(ArrayInput input, int bufferSize) {
449 this.input = input;
450 this.buffer = new ShreddedBuffer(bufferSize);
451 }
452
453 public final int compareTo(ShreddedReader other) {
454 ShreddedBuffer otherBuffer = other.getBuffer();
455
456 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
457 return 0;
458 } else if (buffer.isAtEnd()) {
459 return -1;
460 } else if (otherBuffer.isAtEnd()) {
461 return 1;
462 }
463
464 int result = 0;
465 do {
466 result = + Utility.compare(buffer.getWord(), otherBuffer.getWord());
467 if(result != 0) break;
468 } while (false);
469
470 return result;
471 }
472
473 public final ShreddedBuffer getBuffer() {
474 return buffer;
475 }
476
477 public final WordCount read() throws IOException {
478 if (buffer.isAtEnd()) {
479 fill();
480
481 if (buffer.isAtEnd()) {
482 return null;
483 }
484 }
485
486 assert !buffer.isAtEnd();
487 WordCount result = new WordCount();
488
489 result.word = buffer.getWord();
490 result.count = buffer.getCount();
491 result.documents = buffer.getDocuments();
492
493 buffer.incrementTuple();
494 buffer.autoIncrementWord();
495
496 return result;
497 }
498
499 public final void fill() throws IOException {
500 try {
501 buffer.reset();
502
503 if (tupleCount != 0) {
504
505 if(updateWordCount - tupleCount > 0) {
506 buffer.words.add(last.word);
507 buffer.wordTupleIdx.add((int) (updateWordCount - tupleCount));
508 }
509 bufferStartCount = tupleCount;
510 }
511
512 while (!buffer.isFull()) {
513 updateWord();
514 buffer.processTuple(input.readLong(), input.readLong());
515 tupleCount++;
516 }
517 } catch(EOFException e) {}
518 }
519
520 public final void updateWord() throws IOException {
521 if (updateWordCount > tupleCount)
522 return;
523
524 last.word = input.readString();
525 updateWordCount = tupleCount + input.readInt();
526
527 buffer.processWord(last.word);
528 }
529
530 public void run() throws IOException {
531 while (true) {
532 fill();
533
534 if (buffer.isAtEnd())
535 break;
536
537 buffer.copyUntil(null, processor);
538 }
539 processor.close();
540 }
541
542 public void setProcessor(Step processor) throws IncompatibleProcessorException {
543 if (processor instanceof ShreddedProcessor) {
544 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
545 } else if (processor instanceof WordCount.Processor) {
546 this.processor = new DuplicateEliminator(new TupleUnshredder((WordCount.Processor) processor));
547 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
548 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<WordCount>) processor));
549 } else {
550 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
551 }
552 }
553
554 public Class<WordCount> getOutputClass() {
555 return WordCount.class;
556 }
557 }
558
559 public static class DuplicateEliminator implements ShreddedProcessor {
560 public ShreddedProcessor processor;
561 WordCount last = new WordCount();
562 boolean wordProcess = true;
563
564 public DuplicateEliminator() {}
565 public DuplicateEliminator(ShreddedProcessor processor) {
566 this.processor = processor;
567 }
568
569 public void setShreddedProcessor(ShreddedProcessor processor) {
570 this.processor = processor;
571 }
572
573 public void processWord(String word) throws IOException {
574 if (wordProcess || Utility.compare(word, last.word) != 0) {
575 last.word = word;
576 processor.processWord(word);
577 wordProcess = false;
578 }
579 }
580
581 public void resetWord() {
582 wordProcess = true;
583 }
584
585 public void processTuple(long count, long documents) throws IOException {
586 processor.processTuple(count, documents);
587 }
588
589 public void close() throws IOException {
590 processor.close();
591 }
592 }
593 public static class TupleUnshredder implements ShreddedProcessor {
594 WordCount last = new WordCount();
595 public org.galagosearch.tupleflow.Processor<WordCount> processor;
596
597 public TupleUnshredder(WordCount.Processor processor) {
598 this.processor = processor;
599 }
600
601 public TupleUnshredder(org.galagosearch.tupleflow.Processor<WordCount> processor) {
602 this.processor = processor;
603 }
604
605 public WordCount clone(WordCount object) {
606 WordCount result = new WordCount();
607 if (object == null) return result;
608 result.word = object.word;
609 result.count = object.count;
610 result.documents = object.documents;
611 return result;
612 }
613
614 public void processWord(String word) throws IOException {
615 last.word = word;
616 }
617
618
619 public void processTuple(long count, long documents) throws IOException {
620 last.count = count;
621 last.documents = documents;
622 processor.process(clone(last));
623 }
624
625 public void close() throws IOException {
626 processor.close();
627 }
628 }
629 public static class TupleShredder implements Processor {
630 WordCount last = new WordCount();
631 public ShreddedProcessor processor;
632
633 public TupleShredder(ShreddedProcessor processor) {
634 this.processor = processor;
635 }
636
637 public WordCount clone(WordCount object) {
638 WordCount result = new WordCount();
639 if (object == null) return result;
640 result.word = object.word;
641 result.count = object.count;
642 result.documents = object.documents;
643 return result;
644 }
645
646 public void process(WordCount object) throws IOException {
647 boolean processAll = false;
648 if(last == null || Utility.compare(last.word, object.word) != 0 || processAll) { processor.processWord(object.word); processAll = true; }
649 processor.processTuple(object.count, object.documents);
650 }
651
652 public Class<WordCount> getInputClass() {
653 return WordCount.class;
654 }
655
656 public void close() throws IOException {
657 processor.close();
658 }
659 }
660 }
661 }