1
2
3 package org.galagosearch.core.types;
4
5 import org.galagosearch.tupleflow.Utility;
6 import org.galagosearch.tupleflow.ArrayInput;
7 import org.galagosearch.tupleflow.ArrayOutput;
8 import org.galagosearch.tupleflow.Order;
9 import org.galagosearch.tupleflow.OrderedWriter;
10 import org.galagosearch.tupleflow.Type;
11 import org.galagosearch.tupleflow.TypeReader;
12 import org.galagosearch.tupleflow.Step;
13 import org.galagosearch.tupleflow.IncompatibleProcessorException;
14 import org.galagosearch.tupleflow.ReaderSource;
15 import java.io.IOException;
16 import java.io.EOFException;
17 import java.io.UnsupportedEncodingException;
18 import java.util.ArrayList;
19 import java.util.Arrays;
20 import java.util.Comparator;
21 import java.util.PriorityQueue;
22 import java.util.Collection;
23
24 public class ExtractedLink implements Type<ExtractedLink> {
/** Source URL of the link. */
public String srcUrl;
/** Destination URL of the link. */
public String destUrl;
/** Anchor text of the link. */
public String anchorText;
/** Whether the link carries a nofollow marker. */
public boolean noFollow;

/** Creates a link whose fields keep their Java defaults ({@code null} / {@code false}). */
public ExtractedLink() {
}

/**
 * Creates a link with every field populated.
 *
 * @param srcUrl source URL
 * @param destUrl destination URL
 * @param anchorText anchor text
 * @param noFollow nofollow flag
 */
public ExtractedLink(String srcUrl, String destUrl, String anchorText, boolean noFollow) {
    this.noFollow = noFollow;
    this.anchorText = anchorText;
    this.destUrl = destUrl;
    this.srcUrl = srcUrl;
}
37
38 public String toString() {
39 return String.format("%s,%s,%s,%b",
40 srcUrl, destUrl, anchorText, noFollow);
41 }
42
43 public Order<ExtractedLink> getOrder(String... spec) {
44 if (Arrays.equals(spec, new String[] { "+destUrl" })) {
45 return new DestUrlOrder();
46 }
47 if (Arrays.equals(spec, new String[] { "+srcUrl" })) {
48 return new SrcUrlOrder();
49 }
50 return null;
51 }
52
53 public interface Processor extends Step, org.galagosearch.tupleflow.Processor<ExtractedLink> {
54 public void process(ExtractedLink object) throws IOException;
55 public void close() throws IOException;
56 }
57 public interface Source extends Step {
58 }
59 public static class DestUrlOrder implements Order<ExtractedLink> {
60 public int hash(ExtractedLink object) {
61 int h = 0;
62 h += Utility.hash(object.destUrl);
63 return h;
64 }
65 public Comparator<ExtractedLink> greaterThan() {
66 return new Comparator<ExtractedLink>() {
67 public int compare(ExtractedLink one, ExtractedLink two) {
68 int result = 0;
69 do {
70 result = + Utility.compare(one.destUrl, two.destUrl);
71 if(result != 0) break;
72 } while (false);
73 return -result;
74 }
75 };
76 }
77 public Comparator<ExtractedLink> lessThan() {
78 return new Comparator<ExtractedLink>() {
79 public int compare(ExtractedLink one, ExtractedLink two) {
80 int result = 0;
81 do {
82 result = + Utility.compare(one.destUrl, two.destUrl);
83 if(result != 0) break;
84 } while (false);
85 return result;
86 }
87 };
88 }
89 public TypeReader<ExtractedLink> orderedReader(ArrayInput _input) {
90 return new ShreddedReader(_input);
91 }
92
93 public TypeReader<ExtractedLink> orderedReader(ArrayInput _input, int bufferSize) {
94 return new ShreddedReader(_input, bufferSize);
95 }
96 public OrderedWriter<ExtractedLink> orderedWriter(ArrayOutput _output) {
97 ShreddedWriter w = new ShreddedWriter(_output);
98 return new OrderedWriterClass(w);
99 }
100 public static class OrderedWriterClass extends OrderedWriter< ExtractedLink > {
101 ExtractedLink last = null;
102 ShreddedWriter shreddedWriter = null;
103
104 public OrderedWriterClass(ShreddedWriter s) {
105 this.shreddedWriter = s;
106 }
107
108 public void process(ExtractedLink object) throws IOException {
109 boolean processAll = false;
110 if (processAll || last == null || 0 != Utility.compare(object.destUrl, last.destUrl)) { processAll = true; shreddedWriter.processDestUrl(object.destUrl); }
111 shreddedWriter.processTuple(object.srcUrl, object.anchorText, object.noFollow);
112 last = object;
113 }
114
115 public void close() throws IOException {
116 shreddedWriter.close();
117 }
118
119 public Class<ExtractedLink> getInputClass() {
120 return ExtractedLink.class;
121 }
122 }
123 public ReaderSource<ExtractedLink> orderedCombiner(Collection<TypeReader<ExtractedLink>> readers, boolean closeOnExit) {
124 ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
125
126 for (TypeReader<ExtractedLink> reader : readers) {
127 shreddedReaders.add((ShreddedReader)reader);
128 }
129
130 return new ShreddedCombiner(shreddedReaders, closeOnExit);
131 }
132 public ExtractedLink clone(ExtractedLink object) {
133 ExtractedLink result = new ExtractedLink();
134 if (object == null) return result;
135 result.srcUrl = object.srcUrl;
136 result.destUrl = object.destUrl;
137 result.anchorText = object.anchorText;
138 result.noFollow = object.noFollow;
139 return result;
140 }
141 public Class<ExtractedLink> getOrderedClass() {
142 return ExtractedLink.class;
143 }
144 public String[] getOrderSpec() {
145 return new String[] {"+destUrl"};
146 }
147
148 public static String getSpecString() {
149 return "+destUrl";
150 }
151
152 public interface ShreddedProcessor extends Step {
153 public void processDestUrl(String destUrl) throws IOException;
154 public void processTuple(String srcUrl, String anchorText, boolean noFollow) throws IOException;
155 public void close() throws IOException;
156 }
157 public interface ShreddedSource extends Step {
158 }
159
160 public static class ShreddedWriter implements ShreddedProcessor {
161 ArrayOutput output;
162 ShreddedBuffer buffer = new ShreddedBuffer();
163 String lastDestUrl;
164 boolean lastFlush = false;
165
166 public ShreddedWriter(ArrayOutput output) {
167 this.output = output;
168 }
169
170 public void close() throws IOException {
171 flush();
172 }
173
174 public void processDestUrl(String destUrl) {
175 lastDestUrl = destUrl;
176 buffer.processDestUrl(destUrl);
177 }
178 public final void processTuple(String srcUrl, String anchorText, boolean noFollow) throws IOException {
179 if (lastFlush) {
180 if(buffer.destUrls.size() == 0) buffer.processDestUrl(lastDestUrl);
181 lastFlush = false;
182 }
183 buffer.processTuple(srcUrl, anchorText, noFollow);
184 if (buffer.isFull())
185 flush();
186 }
187 public final void flushTuples(int pauseIndex) throws IOException {
188
189 while (buffer.getReadIndex() < pauseIndex) {
190
191 output.writeString(buffer.getSrcUrl());
192 output.writeString(buffer.getAnchorText());
193 output.writeBoolean(buffer.getNoFollow());
194 buffer.incrementTuple();
195 }
196 }
197 public final void flushDestUrl(int pauseIndex) throws IOException {
198 while (buffer.getReadIndex() < pauseIndex) {
199 int nextPause = buffer.getDestUrlEndIndex();
200 int count = nextPause - buffer.getReadIndex();
201
202 output.writeString(buffer.getDestUrl());
203 output.writeInt(count);
204 buffer.incrementDestUrl();
205
206 flushTuples(nextPause);
207 assert nextPause == buffer.getReadIndex();
208 }
209 }
210 public void flush() throws IOException {
211 flushDestUrl(buffer.getWriteIndex());
212 buffer.reset();
213 lastFlush = true;
214 }
215 }
216 public static class ShreddedBuffer {
217 ArrayList<String> destUrls = new ArrayList();
218 ArrayList<Integer> destUrlTupleIdx = new ArrayList();
219 int destUrlReadIdx = 0;
220
221 String[] srcUrls;
222 String[] anchorTexts;
223 boolean[] noFollows;
224 int writeTupleIndex = 0;
225 int readTupleIndex = 0;
226 int batchSize;
227
228 public ShreddedBuffer(int batchSize) {
229 this.batchSize = batchSize;
230
231 srcUrls = new String[batchSize];
232 anchorTexts = new String[batchSize];
233 noFollows = new boolean[batchSize];
234 }
235
236 public ShreddedBuffer() {
237 this(10000);
238 }
239
240 public void processDestUrl(String destUrl) {
241 destUrls.add(destUrl);
242 destUrlTupleIdx.add(writeTupleIndex);
243 }
244 public void processTuple(String srcUrl, String anchorText, boolean noFollow) {
245 assert destUrls.size() > 0;
246 srcUrls[writeTupleIndex] = srcUrl;
247 anchorTexts[writeTupleIndex] = anchorText;
248 noFollows[writeTupleIndex] = noFollow;
249 writeTupleIndex++;
250 }
251 public void resetData() {
252 destUrls.clear();
253 destUrlTupleIdx.clear();
254 writeTupleIndex = 0;
255 }
256
257 public void resetRead() {
258 readTupleIndex = 0;
259 destUrlReadIdx = 0;
260 }
261
262 public void reset() {
263 resetData();
264 resetRead();
265 }
266 public boolean isFull() {
267 return writeTupleIndex >= batchSize;
268 }
269
270 public boolean isEmpty() {
271 return writeTupleIndex == 0;
272 }
273
274 public boolean isAtEnd() {
275 return readTupleIndex >= writeTupleIndex;
276 }
277 public void incrementDestUrl() {
278 destUrlReadIdx++;
279 }
280
281 public void autoIncrementDestUrl() {
282 while (readTupleIndex >= getDestUrlEndIndex() && readTupleIndex < writeTupleIndex)
283 destUrlReadIdx++;
284 }
285 public void incrementTuple() {
286 readTupleIndex++;
287 }
288 public int getDestUrlEndIndex() {
289 if ((destUrlReadIdx+1) >= destUrlTupleIdx.size())
290 return writeTupleIndex;
291 return destUrlTupleIdx.get(destUrlReadIdx+1);
292 }
293 public int getReadIndex() {
294 return readTupleIndex;
295 }
296
297 public int getWriteIndex() {
298 return writeTupleIndex;
299 }
300 public String getDestUrl() {
301 assert readTupleIndex < writeTupleIndex;
302 assert destUrlReadIdx < destUrls.size();
303
304 return destUrls.get(destUrlReadIdx);
305 }
306 public String getSrcUrl() {
307 assert readTupleIndex < writeTupleIndex;
308 return srcUrls[readTupleIndex];
309 }
310 public String getAnchorText() {
311 assert readTupleIndex < writeTupleIndex;
312 return anchorTexts[readTupleIndex];
313 }
314 public boolean getNoFollow() {
315 assert readTupleIndex < writeTupleIndex;
316 return noFollows[readTupleIndex];
317 }
318 public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
319 while (getReadIndex() < endIndex) {
320 output.processTuple(getSrcUrl(), getAnchorText(), getNoFollow());
321 incrementTuple();
322 }
323 }
324 public void copyUntilIndexDestUrl(int endIndex, ShreddedProcessor output) throws IOException {
325 while (getReadIndex() < endIndex) {
326 output.processDestUrl(getDestUrl());
327 assert getDestUrlEndIndex() <= endIndex;
328 copyTuples(getDestUrlEndIndex(), output);
329 incrementDestUrl();
330 }
331 }
332 public void copyUntilDestUrl(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
333 while (!isAtEnd()) {
334 if (other != null) {
335 assert !other.isAtEnd();
336 int c = + Utility.compare(getDestUrl(), other.getDestUrl());
337
338 if (c > 0) {
339 break;
340 }
341
342 output.processDestUrl(getDestUrl());
343
344 copyTuples(getDestUrlEndIndex(), output);
345 } else {
346 output.processDestUrl(getDestUrl());
347 copyTuples(getDestUrlEndIndex(), output);
348 }
349 incrementDestUrl();
350
351
352 }
353 }
354 public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
355 copyUntilDestUrl(other, output);
356 }
357
358 }
359 public static class ShreddedCombiner implements ReaderSource<ExtractedLink>, ShreddedSource {
360 public ShreddedProcessor processor;
361 Collection<ShreddedReader> readers;
362 boolean closeOnExit = false;
363 boolean uninitialized = true;
364 PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
365
366 public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
367 this.readers = readers;
368 this.closeOnExit = closeOnExit;
369 }
370
371 public void setProcessor(Step processor) throws IncompatibleProcessorException {
372 if (processor instanceof ShreddedProcessor) {
373 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
374 } else if (processor instanceof ExtractedLink.Processor) {
375 this.processor = new DuplicateEliminator(new TupleUnshredder((ExtractedLink.Processor) processor));
376 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
377 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<ExtractedLink>) processor));
378 } else {
379 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
380 }
381 }
382
383 public Class<ExtractedLink> getOutputClass() {
384 return ExtractedLink.class;
385 }
386
387 public void initialize() throws IOException {
388 for (ShreddedReader reader : readers) {
389 reader.fill();
390
391 if (!reader.getBuffer().isAtEnd())
392 queue.add(reader);
393 }
394
395 uninitialized = false;
396 }
397
398 public void run() throws IOException {
399 initialize();
400
401 while (queue.size() > 0) {
402 ShreddedReader top = queue.poll();
403 ShreddedReader next = null;
404 ShreddedBuffer nextBuffer = null;
405
406 assert !top.getBuffer().isAtEnd();
407
408 if (queue.size() > 0) {
409 next = queue.peek();
410 nextBuffer = next.getBuffer();
411 assert !nextBuffer.isAtEnd();
412 }
413
414 top.getBuffer().copyUntil(nextBuffer, processor);
415 if (top.getBuffer().isAtEnd())
416 top.fill();
417
418 if (!top.getBuffer().isAtEnd())
419 queue.add(top);
420 }
421
422 if (closeOnExit)
423 processor.close();
424 }
425
426 public ExtractedLink read() throws IOException {
427 if (uninitialized)
428 initialize();
429
430 ExtractedLink result = null;
431
432 while (queue.size() > 0) {
433 ShreddedReader top = queue.poll();
434 result = top.read();
435
436 if (result != null) {
437 if (top.getBuffer().isAtEnd())
438 top.fill();
439
440 queue.offer(top);
441 break;
442 }
443 }
444
445 return result;
446 }
447 }
448 public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<ExtractedLink>, ShreddedSource {
449 public ShreddedProcessor processor;
450 ShreddedBuffer buffer;
451 ExtractedLink last = new ExtractedLink();
452 long updateDestUrlCount = -1;
453 long tupleCount = 0;
454 long bufferStartCount = 0;
455 ArrayInput input;
456
457 public ShreddedReader(ArrayInput input) {
458 this.input = input;
459 this.buffer = new ShreddedBuffer();
460 }
461
462 public ShreddedReader(ArrayInput input, int bufferSize) {
463 this.input = input;
464 this.buffer = new ShreddedBuffer(bufferSize);
465 }
466
467 public final int compareTo(ShreddedReader other) {
468 ShreddedBuffer otherBuffer = other.getBuffer();
469
470 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
471 return 0;
472 } else if (buffer.isAtEnd()) {
473 return -1;
474 } else if (otherBuffer.isAtEnd()) {
475 return 1;
476 }
477
478 int result = 0;
479 do {
480 result = + Utility.compare(buffer.getDestUrl(), otherBuffer.getDestUrl());
481 if(result != 0) break;
482 } while (false);
483
484 return result;
485 }
486
487 public final ShreddedBuffer getBuffer() {
488 return buffer;
489 }
490
491 public final ExtractedLink read() throws IOException {
492 if (buffer.isAtEnd()) {
493 fill();
494
495 if (buffer.isAtEnd()) {
496 return null;
497 }
498 }
499
500 assert !buffer.isAtEnd();
501 ExtractedLink result = new ExtractedLink();
502
503 result.destUrl = buffer.getDestUrl();
504 result.srcUrl = buffer.getSrcUrl();
505 result.anchorText = buffer.getAnchorText();
506 result.noFollow = buffer.getNoFollow();
507
508 buffer.incrementTuple();
509 buffer.autoIncrementDestUrl();
510
511 return result;
512 }
513
514 public final void fill() throws IOException {
515 try {
516 buffer.reset();
517
518 if (tupleCount != 0) {
519
520 if(updateDestUrlCount - tupleCount > 0) {
521 buffer.destUrls.add(last.destUrl);
522 buffer.destUrlTupleIdx.add((int) (updateDestUrlCount - tupleCount));
523 }
524 bufferStartCount = tupleCount;
525 }
526
527 while (!buffer.isFull()) {
528 updateDestUrl();
529 buffer.processTuple(input.readString(), input.readString(), input.readBoolean());
530 tupleCount++;
531 }
532 } catch(EOFException e) {}
533 }
534
535 public final void updateDestUrl() throws IOException {
536 if (updateDestUrlCount > tupleCount)
537 return;
538
539 last.destUrl = input.readString();
540 updateDestUrlCount = tupleCount + input.readInt();
541
542 buffer.processDestUrl(last.destUrl);
543 }
544
545 public void run() throws IOException {
546 while (true) {
547 fill();
548
549 if (buffer.isAtEnd())
550 break;
551
552 buffer.copyUntil(null, processor);
553 }
554 processor.close();
555 }
556
557 public void setProcessor(Step processor) throws IncompatibleProcessorException {
558 if (processor instanceof ShreddedProcessor) {
559 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
560 } else if (processor instanceof ExtractedLink.Processor) {
561 this.processor = new DuplicateEliminator(new TupleUnshredder((ExtractedLink.Processor) processor));
562 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
563 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<ExtractedLink>) processor));
564 } else {
565 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
566 }
567 }
568
569 public Class<ExtractedLink> getOutputClass() {
570 return ExtractedLink.class;
571 }
572 }
573
574 public static class DuplicateEliminator implements ShreddedProcessor {
575 public ShreddedProcessor processor;
576 ExtractedLink last = new ExtractedLink();
577 boolean destUrlProcess = true;
578
579 public DuplicateEliminator() {}
580 public DuplicateEliminator(ShreddedProcessor processor) {
581 this.processor = processor;
582 }
583
584 public void setShreddedProcessor(ShreddedProcessor processor) {
585 this.processor = processor;
586 }
587
588 public void processDestUrl(String destUrl) throws IOException {
589 if (destUrlProcess || Utility.compare(destUrl, last.destUrl) != 0) {
590 last.destUrl = destUrl;
591 processor.processDestUrl(destUrl);
592 destUrlProcess = false;
593 }
594 }
595
596 public void resetDestUrl() {
597 destUrlProcess = true;
598 }
599
600 public void processTuple(String srcUrl, String anchorText, boolean noFollow) throws IOException {
601 processor.processTuple(srcUrl, anchorText, noFollow);
602 }
603
604 public void close() throws IOException {
605 processor.close();
606 }
607 }
608 public static class TupleUnshredder implements ShreddedProcessor {
609 ExtractedLink last = new ExtractedLink();
610 public org.galagosearch.tupleflow.Processor<ExtractedLink> processor;
611
612 public TupleUnshredder(ExtractedLink.Processor processor) {
613 this.processor = processor;
614 }
615
616 public TupleUnshredder(org.galagosearch.tupleflow.Processor<ExtractedLink> processor) {
617 this.processor = processor;
618 }
619
620 public ExtractedLink clone(ExtractedLink object) {
621 ExtractedLink result = new ExtractedLink();
622 if (object == null) return result;
623 result.srcUrl = object.srcUrl;
624 result.destUrl = object.destUrl;
625 result.anchorText = object.anchorText;
626 result.noFollow = object.noFollow;
627 return result;
628 }
629
630 public void processDestUrl(String destUrl) throws IOException {
631 last.destUrl = destUrl;
632 }
633
634
635 public void processTuple(String srcUrl, String anchorText, boolean noFollow) throws IOException {
636 last.srcUrl = srcUrl;
637 last.anchorText = anchorText;
638 last.noFollow = noFollow;
639 processor.process(clone(last));
640 }
641
642 public void close() throws IOException {
643 processor.close();
644 }
645 }
646 public static class TupleShredder implements Processor {
647 ExtractedLink last = new ExtractedLink();
648 public ShreddedProcessor processor;
649
650 public TupleShredder(ShreddedProcessor processor) {
651 this.processor = processor;
652 }
653
654 public ExtractedLink clone(ExtractedLink object) {
655 ExtractedLink result = new ExtractedLink();
656 if (object == null) return result;
657 result.srcUrl = object.srcUrl;
658 result.destUrl = object.destUrl;
659 result.anchorText = object.anchorText;
660 result.noFollow = object.noFollow;
661 return result;
662 }
663
664 public void process(ExtractedLink object) throws IOException {
665 boolean processAll = false;
666 if(last == null || Utility.compare(last.destUrl, object.destUrl) != 0 || processAll) { processor.processDestUrl(object.destUrl); processAll = true; }
667 processor.processTuple(object.srcUrl, object.anchorText, object.noFollow);
668 }
669
670 public Class<ExtractedLink> getInputClass() {
671 return ExtractedLink.class;
672 }
673
674 public void close() throws IOException {
675 processor.close();
676 }
677 }
678 }
679 public static class SrcUrlOrder implements Order<ExtractedLink> {
680 public int hash(ExtractedLink object) {
681 int h = 0;
682 h += Utility.hash(object.srcUrl);
683 return h;
684 }
685 public Comparator<ExtractedLink> greaterThan() {
686 return new Comparator<ExtractedLink>() {
687 public int compare(ExtractedLink one, ExtractedLink two) {
688 int result = 0;
689 do {
690 result = + Utility.compare(one.srcUrl, two.srcUrl);
691 if(result != 0) break;
692 } while (false);
693 return -result;
694 }
695 };
696 }
697 public Comparator<ExtractedLink> lessThan() {
698 return new Comparator<ExtractedLink>() {
699 public int compare(ExtractedLink one, ExtractedLink two) {
700 int result = 0;
701 do {
702 result = + Utility.compare(one.srcUrl, two.srcUrl);
703 if(result != 0) break;
704 } while (false);
705 return result;
706 }
707 };
708 }
709 public TypeReader<ExtractedLink> orderedReader(ArrayInput _input) {
710 return new ShreddedReader(_input);
711 }
712
713 public TypeReader<ExtractedLink> orderedReader(ArrayInput _input, int bufferSize) {
714 return new ShreddedReader(_input, bufferSize);
715 }
716 public OrderedWriter<ExtractedLink> orderedWriter(ArrayOutput _output) {
717 ShreddedWriter w = new ShreddedWriter(_output);
718 return new OrderedWriterClass(w);
719 }
720 public static class OrderedWriterClass extends OrderedWriter< ExtractedLink > {
721 ExtractedLink last = null;
722 ShreddedWriter shreddedWriter = null;
723
724 public OrderedWriterClass(ShreddedWriter s) {
725 this.shreddedWriter = s;
726 }
727
728 public void process(ExtractedLink object) throws IOException {
729 boolean processAll = false;
730 if (processAll || last == null || 0 != Utility.compare(object.srcUrl, last.srcUrl)) { processAll = true; shreddedWriter.processSrcUrl(object.srcUrl); }
731 shreddedWriter.processTuple(object.destUrl, object.anchorText, object.noFollow);
732 last = object;
733 }
734
735 public void close() throws IOException {
736 shreddedWriter.close();
737 }
738
739 public Class<ExtractedLink> getInputClass() {
740 return ExtractedLink.class;
741 }
742 }
743 public ReaderSource<ExtractedLink> orderedCombiner(Collection<TypeReader<ExtractedLink>> readers, boolean closeOnExit) {
744 ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
745
746 for (TypeReader<ExtractedLink> reader : readers) {
747 shreddedReaders.add((ShreddedReader)reader);
748 }
749
750 return new ShreddedCombiner(shreddedReaders, closeOnExit);
751 }
752 public ExtractedLink clone(ExtractedLink object) {
753 ExtractedLink result = new ExtractedLink();
754 if (object == null) return result;
755 result.srcUrl = object.srcUrl;
756 result.destUrl = object.destUrl;
757 result.anchorText = object.anchorText;
758 result.noFollow = object.noFollow;
759 return result;
760 }
761 public Class<ExtractedLink> getOrderedClass() {
762 return ExtractedLink.class;
763 }
764 public String[] getOrderSpec() {
765 return new String[] {"+srcUrl"};
766 }
767
768 public static String getSpecString() {
769 return "+srcUrl";
770 }
771
772 public interface ShreddedProcessor extends Step {
773 public void processSrcUrl(String srcUrl) throws IOException;
774 public void processTuple(String destUrl, String anchorText, boolean noFollow) throws IOException;
775 public void close() throws IOException;
776 }
777 public interface ShreddedSource extends Step {
778 }
779
780 public static class ShreddedWriter implements ShreddedProcessor {
781 ArrayOutput output;
782 ShreddedBuffer buffer = new ShreddedBuffer();
783 String lastSrcUrl;
784 boolean lastFlush = false;
785
786 public ShreddedWriter(ArrayOutput output) {
787 this.output = output;
788 }
789
790 public void close() throws IOException {
791 flush();
792 }
793
794 public void processSrcUrl(String srcUrl) {
795 lastSrcUrl = srcUrl;
796 buffer.processSrcUrl(srcUrl);
797 }
798 public final void processTuple(String destUrl, String anchorText, boolean noFollow) throws IOException {
799 if (lastFlush) {
800 if(buffer.srcUrls.size() == 0) buffer.processSrcUrl(lastSrcUrl);
801 lastFlush = false;
802 }
803 buffer.processTuple(destUrl, anchorText, noFollow);
804 if (buffer.isFull())
805 flush();
806 }
807 public final void flushTuples(int pauseIndex) throws IOException {
808
809 while (buffer.getReadIndex() < pauseIndex) {
810
811 output.writeString(buffer.getDestUrl());
812 output.writeString(buffer.getAnchorText());
813 output.writeBoolean(buffer.getNoFollow());
814 buffer.incrementTuple();
815 }
816 }
817 public final void flushSrcUrl(int pauseIndex) throws IOException {
818 while (buffer.getReadIndex() < pauseIndex) {
819 int nextPause = buffer.getSrcUrlEndIndex();
820 int count = nextPause - buffer.getReadIndex();
821
822 output.writeString(buffer.getSrcUrl());
823 output.writeInt(count);
824 buffer.incrementSrcUrl();
825
826 flushTuples(nextPause);
827 assert nextPause == buffer.getReadIndex();
828 }
829 }
830 public void flush() throws IOException {
831 flushSrcUrl(buffer.getWriteIndex());
832 buffer.reset();
833 lastFlush = true;
834 }
835 }
836 public static class ShreddedBuffer {
837 ArrayList<String> srcUrls = new ArrayList();
838 ArrayList<Integer> srcUrlTupleIdx = new ArrayList();
839 int srcUrlReadIdx = 0;
840
841 String[] destUrls;
842 String[] anchorTexts;
843 boolean[] noFollows;
844 int writeTupleIndex = 0;
845 int readTupleIndex = 0;
846 int batchSize;
847
848 public ShreddedBuffer(int batchSize) {
849 this.batchSize = batchSize;
850
851 destUrls = new String[batchSize];
852 anchorTexts = new String[batchSize];
853 noFollows = new boolean[batchSize];
854 }
855
856 public ShreddedBuffer() {
857 this(10000);
858 }
859
860 public void processSrcUrl(String srcUrl) {
861 srcUrls.add(srcUrl);
862 srcUrlTupleIdx.add(writeTupleIndex);
863 }
864 public void processTuple(String destUrl, String anchorText, boolean noFollow) {
865 assert srcUrls.size() > 0;
866 destUrls[writeTupleIndex] = destUrl;
867 anchorTexts[writeTupleIndex] = anchorText;
868 noFollows[writeTupleIndex] = noFollow;
869 writeTupleIndex++;
870 }
871 public void resetData() {
872 srcUrls.clear();
873 srcUrlTupleIdx.clear();
874 writeTupleIndex = 0;
875 }
876
877 public void resetRead() {
878 readTupleIndex = 0;
879 srcUrlReadIdx = 0;
880 }
881
882 public void reset() {
883 resetData();
884 resetRead();
885 }
886 public boolean isFull() {
887 return writeTupleIndex >= batchSize;
888 }
889
890 public boolean isEmpty() {
891 return writeTupleIndex == 0;
892 }
893
894 public boolean isAtEnd() {
895 return readTupleIndex >= writeTupleIndex;
896 }
897 public void incrementSrcUrl() {
898 srcUrlReadIdx++;
899 }
900
901 public void autoIncrementSrcUrl() {
902 while (readTupleIndex >= getSrcUrlEndIndex() && readTupleIndex < writeTupleIndex)
903 srcUrlReadIdx++;
904 }
905 public void incrementTuple() {
906 readTupleIndex++;
907 }
908 public int getSrcUrlEndIndex() {
909 if ((srcUrlReadIdx+1) >= srcUrlTupleIdx.size())
910 return writeTupleIndex;
911 return srcUrlTupleIdx.get(srcUrlReadIdx+1);
912 }
913 public int getReadIndex() {
914 return readTupleIndex;
915 }
916
917 public int getWriteIndex() {
918 return writeTupleIndex;
919 }
920 public String getSrcUrl() {
921 assert readTupleIndex < writeTupleIndex;
922 assert srcUrlReadIdx < srcUrls.size();
923
924 return srcUrls.get(srcUrlReadIdx);
925 }
926 public String getDestUrl() {
927 assert readTupleIndex < writeTupleIndex;
928 return destUrls[readTupleIndex];
929 }
930 public String getAnchorText() {
931 assert readTupleIndex < writeTupleIndex;
932 return anchorTexts[readTupleIndex];
933 }
934 public boolean getNoFollow() {
935 assert readTupleIndex < writeTupleIndex;
936 return noFollows[readTupleIndex];
937 }
938 public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
939 while (getReadIndex() < endIndex) {
940 output.processTuple(getDestUrl(), getAnchorText(), getNoFollow());
941 incrementTuple();
942 }
943 }
944 public void copyUntilIndexSrcUrl(int endIndex, ShreddedProcessor output) throws IOException {
945 while (getReadIndex() < endIndex) {
946 output.processSrcUrl(getSrcUrl());
947 assert getSrcUrlEndIndex() <= endIndex;
948 copyTuples(getSrcUrlEndIndex(), output);
949 incrementSrcUrl();
950 }
951 }
952 public void copyUntilSrcUrl(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
953 while (!isAtEnd()) {
954 if (other != null) {
955 assert !other.isAtEnd();
956 int c = + Utility.compare(getSrcUrl(), other.getSrcUrl());
957
958 if (c > 0) {
959 break;
960 }
961
962 output.processSrcUrl(getSrcUrl());
963
964 copyTuples(getSrcUrlEndIndex(), output);
965 } else {
966 output.processSrcUrl(getSrcUrl());
967 copyTuples(getSrcUrlEndIndex(), output);
968 }
969 incrementSrcUrl();
970
971
972 }
973 }
974 public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
975 copyUntilSrcUrl(other, output);
976 }
977
978 }
979 public static class ShreddedCombiner implements ReaderSource<ExtractedLink>, ShreddedSource {
980 public ShreddedProcessor processor;
981 Collection<ShreddedReader> readers;
982 boolean closeOnExit = false;
983 boolean uninitialized = true;
984 PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
985
986 public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
987 this.readers = readers;
988 this.closeOnExit = closeOnExit;
989 }
990
991 public void setProcessor(Step processor) throws IncompatibleProcessorException {
992 if (processor instanceof ShreddedProcessor) {
993 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
994 } else if (processor instanceof ExtractedLink.Processor) {
995 this.processor = new DuplicateEliminator(new TupleUnshredder((ExtractedLink.Processor) processor));
996 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
997 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<ExtractedLink>) processor));
998 } else {
999 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
1000 }
1001 }
1002
1003 public Class<ExtractedLink> getOutputClass() {
1004 return ExtractedLink.class;
1005 }
1006
1007 public void initialize() throws IOException {
1008 for (ShreddedReader reader : readers) {
1009 reader.fill();
1010
1011 if (!reader.getBuffer().isAtEnd())
1012 queue.add(reader);
1013 }
1014
1015 uninitialized = false;
1016 }
1017
1018 public void run() throws IOException {
1019 initialize();
1020
1021 while (queue.size() > 0) {
1022 ShreddedReader top = queue.poll();
1023 ShreddedReader next = null;
1024 ShreddedBuffer nextBuffer = null;
1025
1026 assert !top.getBuffer().isAtEnd();
1027
1028 if (queue.size() > 0) {
1029 next = queue.peek();
1030 nextBuffer = next.getBuffer();
1031 assert !nextBuffer.isAtEnd();
1032 }
1033
1034 top.getBuffer().copyUntil(nextBuffer, processor);
1035 if (top.getBuffer().isAtEnd())
1036 top.fill();
1037
1038 if (!top.getBuffer().isAtEnd())
1039 queue.add(top);
1040 }
1041
1042 if (closeOnExit)
1043 processor.close();
1044 }
1045
1046 public ExtractedLink read() throws IOException {
1047 if (uninitialized)
1048 initialize();
1049
1050 ExtractedLink result = null;
1051
1052 while (queue.size() > 0) {
1053 ShreddedReader top = queue.poll();
1054 result = top.read();
1055
1056 if (result != null) {
1057 if (top.getBuffer().isAtEnd())
1058 top.fill();
1059
1060 queue.offer(top);
1061 break;
1062 }
1063 }
1064
1065 return result;
1066 }
1067 }
1068 public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<ExtractedLink>, ShreddedSource {
1069 public ShreddedProcessor processor;
1070 ShreddedBuffer buffer;
1071 ExtractedLink last = new ExtractedLink();
1072 long updateSrcUrlCount = -1;
1073 long tupleCount = 0;
1074 long bufferStartCount = 0;
1075 ArrayInput input;
1076
/** Creates a reader over {@code input} with a default-sized buffer. */
public ShreddedReader(ArrayInput input) {
    this.buffer = new ShreddedBuffer();
    this.input = input;
}
1081
1082 public ShreddedReader(ArrayInput input, int bufferSize) {
1083 this.input = input;
1084 this.buffer = new ShreddedBuffer(bufferSize);
1085 }
1086
1087 public final int compareTo(ShreddedReader other) {
1088 ShreddedBuffer otherBuffer = other.getBuffer();
1089
1090 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
1091 return 0;
1092 } else if (buffer.isAtEnd()) {
1093 return -1;
1094 } else if (otherBuffer.isAtEnd()) {
1095 return 1;
1096 }
1097
1098 int result = 0;
1099 do {
1100 result = + Utility.compare(buffer.getSrcUrl(), otherBuffer.getSrcUrl());
1101 if(result != 0) break;
1102 } while (false);
1103
1104 return result;
1105 }
1106
1107 public final ShreddedBuffer getBuffer() {
1108 return buffer;
1109 }
1110
1111 public final ExtractedLink read() throws IOException {
1112 if (buffer.isAtEnd()) {
1113 fill();
1114
1115 if (buffer.isAtEnd()) {
1116 return null;
1117 }
1118 }
1119
1120 assert !buffer.isAtEnd();
1121 ExtractedLink result = new ExtractedLink();
1122
1123 result.srcUrl = buffer.getSrcUrl();
1124 result.destUrl = buffer.getDestUrl();
1125 result.anchorText = buffer.getAnchorText();
1126 result.noFollow = buffer.getNoFollow();
1127
1128 buffer.incrementTuple();
1129 buffer.autoIncrementSrcUrl();
1130
1131 return result;
1132 }
1133
1134 public final void fill() throws IOException {
1135 try {
1136 buffer.reset();
1137
1138 if (tupleCount != 0) {
1139
1140 if(updateSrcUrlCount - tupleCount > 0) {
1141 buffer.srcUrls.add(last.srcUrl);
1142 buffer.srcUrlTupleIdx.add((int) (updateSrcUrlCount - tupleCount));
1143 }
1144 bufferStartCount = tupleCount;
1145 }
1146
1147 while (!buffer.isFull()) {
1148 updateSrcUrl();
1149 buffer.processTuple(input.readString(), input.readString(), input.readBoolean());
1150 tupleCount++;
1151 }
1152 } catch(EOFException e) {}
1153 }
1154
1155 public final void updateSrcUrl() throws IOException {
1156 if (updateSrcUrlCount > tupleCount)
1157 return;
1158
1159 last.srcUrl = input.readString();
1160 updateSrcUrlCount = tupleCount + input.readInt();
1161
1162 buffer.processSrcUrl(last.srcUrl);
1163 }
1164
1165 public void run() throws IOException {
1166 while (true) {
1167 fill();
1168
1169 if (buffer.isAtEnd())
1170 break;
1171
1172 buffer.copyUntil(null, processor);
1173 }
1174 processor.close();
1175 }
1176
1177 public void setProcessor(Step processor) throws IncompatibleProcessorException {
1178 if (processor instanceof ShreddedProcessor) {
1179 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
1180 } else if (processor instanceof ExtractedLink.Processor) {
1181 this.processor = new DuplicateEliminator(new TupleUnshredder((ExtractedLink.Processor) processor));
1182 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
1183 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<ExtractedLink>) processor));
1184 } else {
1185 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
1186 }
1187 }
1188
1189 public Class<ExtractedLink> getOutputClass() {
1190 return ExtractedLink.class;
1191 }
1192 }
1193
1194 public static class DuplicateEliminator implements ShreddedProcessor {
1195 public ShreddedProcessor processor;
1196 ExtractedLink last = new ExtractedLink();
1197 boolean srcUrlProcess = true;
1198
1199 public DuplicateEliminator() {}
1200 public DuplicateEliminator(ShreddedProcessor processor) {
1201 this.processor = processor;
1202 }
1203
1204 public void setShreddedProcessor(ShreddedProcessor processor) {
1205 this.processor = processor;
1206 }
1207
1208 public void processSrcUrl(String srcUrl) throws IOException {
1209 if (srcUrlProcess || Utility.compare(srcUrl, last.srcUrl) != 0) {
1210 last.srcUrl = srcUrl;
1211 processor.processSrcUrl(srcUrl);
1212 srcUrlProcess = false;
1213 }
1214 }
1215
1216 public void resetSrcUrl() {
1217 srcUrlProcess = true;
1218 }
1219
1220 public void processTuple(String destUrl, String anchorText, boolean noFollow) throws IOException {
1221 processor.processTuple(destUrl, anchorText, noFollow);
1222 }
1223
1224 public void close() throws IOException {
1225 processor.close();
1226 }
1227 }
1228 public static class TupleUnshredder implements ShreddedProcessor {
1229 ExtractedLink last = new ExtractedLink();
1230 public org.galagosearch.tupleflow.Processor<ExtractedLink> processor;
1231
1232 public TupleUnshredder(ExtractedLink.Processor processor) {
1233 this.processor = processor;
1234 }
1235
1236 public TupleUnshredder(org.galagosearch.tupleflow.Processor<ExtractedLink> processor) {
1237 this.processor = processor;
1238 }
1239
1240 public ExtractedLink clone(ExtractedLink object) {
1241 ExtractedLink result = new ExtractedLink();
1242 if (object == null) return result;
1243 result.srcUrl = object.srcUrl;
1244 result.destUrl = object.destUrl;
1245 result.anchorText = object.anchorText;
1246 result.noFollow = object.noFollow;
1247 return result;
1248 }
1249
1250 public void processSrcUrl(String srcUrl) throws IOException {
1251 last.srcUrl = srcUrl;
1252 }
1253
1254
1255 public void processTuple(String destUrl, String anchorText, boolean noFollow) throws IOException {
1256 last.destUrl = destUrl;
1257 last.anchorText = anchorText;
1258 last.noFollow = noFollow;
1259 processor.process(clone(last));
1260 }
1261
1262 public void close() throws IOException {
1263 processor.close();
1264 }
1265 }
1266 public static class TupleShredder implements Processor {
1267 ExtractedLink last = new ExtractedLink();
1268 public ShreddedProcessor processor;
1269
1270 public TupleShredder(ShreddedProcessor processor) {
1271 this.processor = processor;
1272 }
1273
1274 public ExtractedLink clone(ExtractedLink object) {
1275 ExtractedLink result = new ExtractedLink();
1276 if (object == null) return result;
1277 result.srcUrl = object.srcUrl;
1278 result.destUrl = object.destUrl;
1279 result.anchorText = object.anchorText;
1280 result.noFollow = object.noFollow;
1281 return result;
1282 }
1283
1284 public void process(ExtractedLink object) throws IOException {
1285 boolean processAll = false;
1286 if(last == null || Utility.compare(last.srcUrl, object.srcUrl) != 0 || processAll) { processor.processSrcUrl(object.srcUrl); processAll = true; }
1287 processor.processTuple(object.destUrl, object.anchorText, object.noFollow);
1288 }
1289
1290 public Class<ExtractedLink> getInputClass() {
1291 return ExtractedLink.class;
1292 }
1293
1294 public void close() throws IOException {
1295 processor.close();
1296 }
1297 }
1298 }
1299 }