1
2
3 package org.galagosearch.core.types;
4
5 import org.galagosearch.tupleflow.Utility;
6 import org.galagosearch.tupleflow.ArrayInput;
7 import org.galagosearch.tupleflow.ArrayOutput;
8 import org.galagosearch.tupleflow.Order;
9 import org.galagosearch.tupleflow.OrderedWriter;
10 import org.galagosearch.tupleflow.Type;
11 import org.galagosearch.tupleflow.TypeReader;
12 import org.galagosearch.tupleflow.Step;
13 import org.galagosearch.tupleflow.IncompatibleProcessorException;
14 import org.galagosearch.tupleflow.ReaderSource;
15 import java.io.IOException;
16 import java.io.EOFException;
17 import java.io.UnsupportedEncodingException;
18 import java.util.ArrayList;
19 import java.util.Arrays;
20 import java.util.Comparator;
21 import java.util.PriorityQueue;
22 import java.util.Collection;
23
24 public class IdentifiedLink implements Type<IdentifiedLink> {
/** Identifier component of the tuple; serialized per tuple, after the URL group header. */
public String identifier;
/** URL component of the tuple; the only field the {@code +url} order hashes and compares. */
public String url;
/** Anchor-text component of the tuple; serialized per tuple, after the identifier. */
public String anchorText;

/** Creates a tuple whose three fields are all {@code null}. */
public IdentifiedLink() {
}

/**
 * Creates a fully populated tuple.
 *
 * @param identifier value stored in {@link #identifier}
 * @param url        value stored in {@link #url}
 * @param anchorText value stored in {@link #anchorText}
 */
public IdentifiedLink(String identifier, String url, String anchorText) {
    this.anchorText = anchorText;
    this.url = url;
    this.identifier = identifier;
}
35
36 public String toString() {
37 return String.format("%s,%s,%s",
38 identifier, url, anchorText);
39 }
40
41 public Order<IdentifiedLink> getOrder(String... spec) {
42 if (Arrays.equals(spec, new String[] { "+url" })) {
43 return new UrlOrder();
44 }
45 return null;
46 }
47
48 public interface Processor extends Step, org.galagosearch.tupleflow.Processor<IdentifiedLink> {
49 public void process(IdentifiedLink object) throws IOException;
50 public void close() throws IOException;
51 }
52 public interface Source extends Step {
53 }
54 public static class UrlOrder implements Order<IdentifiedLink> {
55 public int hash(IdentifiedLink object) {
56 int h = 0;
57 h += Utility.hash(object.url);
58 return h;
59 }
60 public Comparator<IdentifiedLink> greaterThan() {
61 return new Comparator<IdentifiedLink>() {
62 public int compare(IdentifiedLink one, IdentifiedLink two) {
63 int result = 0;
64 do {
65 result = + Utility.compare(one.url, two.url);
66 if(result != 0) break;
67 } while (false);
68 return -result;
69 }
70 };
71 }
72 public Comparator<IdentifiedLink> lessThan() {
73 return new Comparator<IdentifiedLink>() {
74 public int compare(IdentifiedLink one, IdentifiedLink two) {
75 int result = 0;
76 do {
77 result = + Utility.compare(one.url, two.url);
78 if(result != 0) break;
79 } while (false);
80 return result;
81 }
82 };
83 }
84 public TypeReader<IdentifiedLink> orderedReader(ArrayInput _input) {
85 return new ShreddedReader(_input);
86 }
87
88 public TypeReader<IdentifiedLink> orderedReader(ArrayInput _input, int bufferSize) {
89 return new ShreddedReader(_input, bufferSize);
90 }
91 public OrderedWriter<IdentifiedLink> orderedWriter(ArrayOutput _output) {
92 ShreddedWriter w = new ShreddedWriter(_output);
93 return new OrderedWriterClass(w);
94 }
95 public static class OrderedWriterClass extends OrderedWriter< IdentifiedLink > {
96 IdentifiedLink last = null;
97 ShreddedWriter shreddedWriter = null;
98
99 public OrderedWriterClass(ShreddedWriter s) {
100 this.shreddedWriter = s;
101 }
102
103 public void process(IdentifiedLink object) throws IOException {
104 boolean processAll = false;
105 if (processAll || last == null || 0 != Utility.compare(object.url, last.url)) { processAll = true; shreddedWriter.processUrl(object.url); }
106 shreddedWriter.processTuple(object.identifier, object.anchorText);
107 last = object;
108 }
109
110 public void close() throws IOException {
111 shreddedWriter.close();
112 }
113
114 public Class<IdentifiedLink> getInputClass() {
115 return IdentifiedLink.class;
116 }
117 }
118 public ReaderSource<IdentifiedLink> orderedCombiner(Collection<TypeReader<IdentifiedLink>> readers, boolean closeOnExit) {
119 ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
120
121 for (TypeReader<IdentifiedLink> reader : readers) {
122 shreddedReaders.add((ShreddedReader)reader);
123 }
124
125 return new ShreddedCombiner(shreddedReaders, closeOnExit);
126 }
127 public IdentifiedLink clone(IdentifiedLink object) {
128 IdentifiedLink result = new IdentifiedLink();
129 if (object == null) return result;
130 result.identifier = object.identifier;
131 result.url = object.url;
132 result.anchorText = object.anchorText;
133 return result;
134 }
135 public Class<IdentifiedLink> getOrderedClass() {
136 return IdentifiedLink.class;
137 }
138 public String[] getOrderSpec() {
139 return new String[] {"+url"};
140 }
141
142 public static String getSpecString() {
143 return "+url";
144 }
145
146 public interface ShreddedProcessor extends Step {
147 public void processUrl(String url) throws IOException;
148 public void processTuple(String identifier, String anchorText) throws IOException;
149 public void close() throws IOException;
150 }
151 public interface ShreddedSource extends Step {
152 }
153
154 public static class ShreddedWriter implements ShreddedProcessor {
155 ArrayOutput output;
156 ShreddedBuffer buffer = new ShreddedBuffer();
157 String lastUrl;
158 boolean lastFlush = false;
159
160 public ShreddedWriter(ArrayOutput output) {
161 this.output = output;
162 }
163
164 public void close() throws IOException {
165 flush();
166 }
167
168 public void processUrl(String url) {
169 lastUrl = url;
170 buffer.processUrl(url);
171 }
172 public final void processTuple(String identifier, String anchorText) throws IOException {
173 if (lastFlush) {
174 if(buffer.urls.size() == 0) buffer.processUrl(lastUrl);
175 lastFlush = false;
176 }
177 buffer.processTuple(identifier, anchorText);
178 if (buffer.isFull())
179 flush();
180 }
181 public final void flushTuples(int pauseIndex) throws IOException {
182
183 while (buffer.getReadIndex() < pauseIndex) {
184
185 output.writeString(buffer.getIdentifier());
186 output.writeString(buffer.getAnchorText());
187 buffer.incrementTuple();
188 }
189 }
190 public final void flushUrl(int pauseIndex) throws IOException {
191 while (buffer.getReadIndex() < pauseIndex) {
192 int nextPause = buffer.getUrlEndIndex();
193 int count = nextPause - buffer.getReadIndex();
194
195 output.writeString(buffer.getUrl());
196 output.writeInt(count);
197 buffer.incrementUrl();
198
199 flushTuples(nextPause);
200 assert nextPause == buffer.getReadIndex();
201 }
202 }
203 public void flush() throws IOException {
204 flushUrl(buffer.getWriteIndex());
205 buffer.reset();
206 lastFlush = true;
207 }
208 }
209 public static class ShreddedBuffer {
210 ArrayList<String> urls = new ArrayList();
211 ArrayList<Integer> urlTupleIdx = new ArrayList();
212 int urlReadIdx = 0;
213
214 String[] identifiers;
215 String[] anchorTexts;
216 int writeTupleIndex = 0;
217 int readTupleIndex = 0;
218 int batchSize;
219
220 public ShreddedBuffer(int batchSize) {
221 this.batchSize = batchSize;
222
223 identifiers = new String[batchSize];
224 anchorTexts = new String[batchSize];
225 }
226
227 public ShreddedBuffer() {
228 this(10000);
229 }
230
231 public void processUrl(String url) {
232 urls.add(url);
233 urlTupleIdx.add(writeTupleIndex);
234 }
235 public void processTuple(String identifier, String anchorText) {
236 assert urls.size() > 0;
237 identifiers[writeTupleIndex] = identifier;
238 anchorTexts[writeTupleIndex] = anchorText;
239 writeTupleIndex++;
240 }
241 public void resetData() {
242 urls.clear();
243 urlTupleIdx.clear();
244 writeTupleIndex = 0;
245 }
246
247 public void resetRead() {
248 readTupleIndex = 0;
249 urlReadIdx = 0;
250 }
251
252 public void reset() {
253 resetData();
254 resetRead();
255 }
256 public boolean isFull() {
257 return writeTupleIndex >= batchSize;
258 }
259
260 public boolean isEmpty() {
261 return writeTupleIndex == 0;
262 }
263
264 public boolean isAtEnd() {
265 return readTupleIndex >= writeTupleIndex;
266 }
267 public void incrementUrl() {
268 urlReadIdx++;
269 }
270
271 public void autoIncrementUrl() {
272 while (readTupleIndex >= getUrlEndIndex() && readTupleIndex < writeTupleIndex)
273 urlReadIdx++;
274 }
275 public void incrementTuple() {
276 readTupleIndex++;
277 }
278 public int getUrlEndIndex() {
279 if ((urlReadIdx+1) >= urlTupleIdx.size())
280 return writeTupleIndex;
281 return urlTupleIdx.get(urlReadIdx+1);
282 }
283 public int getReadIndex() {
284 return readTupleIndex;
285 }
286
287 public int getWriteIndex() {
288 return writeTupleIndex;
289 }
290 public String getUrl() {
291 assert readTupleIndex < writeTupleIndex;
292 assert urlReadIdx < urls.size();
293
294 return urls.get(urlReadIdx);
295 }
296 public String getIdentifier() {
297 assert readTupleIndex < writeTupleIndex;
298 return identifiers[readTupleIndex];
299 }
300 public String getAnchorText() {
301 assert readTupleIndex < writeTupleIndex;
302 return anchorTexts[readTupleIndex];
303 }
304 public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
305 while (getReadIndex() < endIndex) {
306 output.processTuple(getIdentifier(), getAnchorText());
307 incrementTuple();
308 }
309 }
310 public void copyUntilIndexUrl(int endIndex, ShreddedProcessor output) throws IOException {
311 while (getReadIndex() < endIndex) {
312 output.processUrl(getUrl());
313 assert getUrlEndIndex() <= endIndex;
314 copyTuples(getUrlEndIndex(), output);
315 incrementUrl();
316 }
317 }
318 public void copyUntilUrl(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
319 while (!isAtEnd()) {
320 if (other != null) {
321 assert !other.isAtEnd();
322 int c = + Utility.compare(getUrl(), other.getUrl());
323
324 if (c > 0) {
325 break;
326 }
327
328 output.processUrl(getUrl());
329
330 copyTuples(getUrlEndIndex(), output);
331 } else {
332 output.processUrl(getUrl());
333 copyTuples(getUrlEndIndex(), output);
334 }
335 incrementUrl();
336
337
338 }
339 }
340 public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
341 copyUntilUrl(other, output);
342 }
343
344 }
345 public static class ShreddedCombiner implements ReaderSource<IdentifiedLink>, ShreddedSource {
346 public ShreddedProcessor processor;
347 Collection<ShreddedReader> readers;
348 boolean closeOnExit = false;
349 boolean uninitialized = true;
350 PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
351
352 public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
353 this.readers = readers;
354 this.closeOnExit = closeOnExit;
355 }
356
357 public void setProcessor(Step processor) throws IncompatibleProcessorException {
358 if (processor instanceof ShreddedProcessor) {
359 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
360 } else if (processor instanceof IdentifiedLink.Processor) {
361 this.processor = new DuplicateEliminator(new TupleUnshredder((IdentifiedLink.Processor) processor));
362 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
363 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<IdentifiedLink>) processor));
364 } else {
365 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
366 }
367 }
368
369 public Class<IdentifiedLink> getOutputClass() {
370 return IdentifiedLink.class;
371 }
372
373 public void initialize() throws IOException {
374 for (ShreddedReader reader : readers) {
375 reader.fill();
376
377 if (!reader.getBuffer().isAtEnd())
378 queue.add(reader);
379 }
380
381 uninitialized = false;
382 }
383
384 public void run() throws IOException {
385 initialize();
386
387 while (queue.size() > 0) {
388 ShreddedReader top = queue.poll();
389 ShreddedReader next = null;
390 ShreddedBuffer nextBuffer = null;
391
392 assert !top.getBuffer().isAtEnd();
393
394 if (queue.size() > 0) {
395 next = queue.peek();
396 nextBuffer = next.getBuffer();
397 assert !nextBuffer.isAtEnd();
398 }
399
400 top.getBuffer().copyUntil(nextBuffer, processor);
401 if (top.getBuffer().isAtEnd())
402 top.fill();
403
404 if (!top.getBuffer().isAtEnd())
405 queue.add(top);
406 }
407
408 if (closeOnExit)
409 processor.close();
410 }
411
412 public IdentifiedLink read() throws IOException {
413 if (uninitialized)
414 initialize();
415
416 IdentifiedLink result = null;
417
418 while (queue.size() > 0) {
419 ShreddedReader top = queue.poll();
420 result = top.read();
421
422 if (result != null) {
423 if (top.getBuffer().isAtEnd())
424 top.fill();
425
426 queue.offer(top);
427 break;
428 }
429 }
430
431 return result;
432 }
433 }
434 public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<IdentifiedLink>, ShreddedSource {
435 public ShreddedProcessor processor;
436 ShreddedBuffer buffer;
437 IdentifiedLink last = new IdentifiedLink();
438 long updateUrlCount = -1;
439 long tupleCount = 0;
440 long bufferStartCount = 0;
441 ArrayInput input;
442
443 public ShreddedReader(ArrayInput input) {
444 this.input = input;
445 this.buffer = new ShreddedBuffer();
446 }
447
448 public ShreddedReader(ArrayInput input, int bufferSize) {
449 this.input = input;
450 this.buffer = new ShreddedBuffer(bufferSize);
451 }
452
453 public final int compareTo(ShreddedReader other) {
454 ShreddedBuffer otherBuffer = other.getBuffer();
455
456 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
457 return 0;
458 } else if (buffer.isAtEnd()) {
459 return -1;
460 } else if (otherBuffer.isAtEnd()) {
461 return 1;
462 }
463
464 int result = 0;
465 do {
466 result = + Utility.compare(buffer.getUrl(), otherBuffer.getUrl());
467 if(result != 0) break;
468 } while (false);
469
470 return result;
471 }
472
473 public final ShreddedBuffer getBuffer() {
474 return buffer;
475 }
476
477 public final IdentifiedLink read() throws IOException {
478 if (buffer.isAtEnd()) {
479 fill();
480
481 if (buffer.isAtEnd()) {
482 return null;
483 }
484 }
485
486 assert !buffer.isAtEnd();
487 IdentifiedLink result = new IdentifiedLink();
488
489 result.url = buffer.getUrl();
490 result.identifier = buffer.getIdentifier();
491 result.anchorText = buffer.getAnchorText();
492
493 buffer.incrementTuple();
494 buffer.autoIncrementUrl();
495
496 return result;
497 }
498
499 public final void fill() throws IOException {
500 try {
501 buffer.reset();
502
503 if (tupleCount != 0) {
504
505 if(updateUrlCount - tupleCount > 0) {
506 buffer.urls.add(last.url);
507 buffer.urlTupleIdx.add((int) (updateUrlCount - tupleCount));
508 }
509 bufferStartCount = tupleCount;
510 }
511
512 while (!buffer.isFull()) {
513 updateUrl();
514 buffer.processTuple(input.readString(), input.readString());
515 tupleCount++;
516 }
517 } catch(EOFException e) {}
518 }
519
520 public final void updateUrl() throws IOException {
521 if (updateUrlCount > tupleCount)
522 return;
523
524 last.url = input.readString();
525 updateUrlCount = tupleCount + input.readInt();
526
527 buffer.processUrl(last.url);
528 }
529
530 public void run() throws IOException {
531 while (true) {
532 fill();
533
534 if (buffer.isAtEnd())
535 break;
536
537 buffer.copyUntil(null, processor);
538 }
539 processor.close();
540 }
541
542 public void setProcessor(Step processor) throws IncompatibleProcessorException {
543 if (processor instanceof ShreddedProcessor) {
544 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
545 } else if (processor instanceof IdentifiedLink.Processor) {
546 this.processor = new DuplicateEliminator(new TupleUnshredder((IdentifiedLink.Processor) processor));
547 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
548 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<IdentifiedLink>) processor));
549 } else {
550 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
551 }
552 }
553
554 public Class<IdentifiedLink> getOutputClass() {
555 return IdentifiedLink.class;
556 }
557 }
558
559 public static class DuplicateEliminator implements ShreddedProcessor {
560 public ShreddedProcessor processor;
561 IdentifiedLink last = new IdentifiedLink();
562 boolean urlProcess = true;
563
564 public DuplicateEliminator() {}
565 public DuplicateEliminator(ShreddedProcessor processor) {
566 this.processor = processor;
567 }
568
569 public void setShreddedProcessor(ShreddedProcessor processor) {
570 this.processor = processor;
571 }
572
573 public void processUrl(String url) throws IOException {
574 if (urlProcess || Utility.compare(url, last.url) != 0) {
575 last.url = url;
576 processor.processUrl(url);
577 urlProcess = false;
578 }
579 }
580
581 public void resetUrl() {
582 urlProcess = true;
583 }
584
585 public void processTuple(String identifier, String anchorText) throws IOException {
586 processor.processTuple(identifier, anchorText);
587 }
588
589 public void close() throws IOException {
590 processor.close();
591 }
592 }
593 public static class TupleUnshredder implements ShreddedProcessor {
594 IdentifiedLink last = new IdentifiedLink();
595 public org.galagosearch.tupleflow.Processor<IdentifiedLink> processor;
596
597 public TupleUnshredder(IdentifiedLink.Processor processor) {
598 this.processor = processor;
599 }
600
601 public TupleUnshredder(org.galagosearch.tupleflow.Processor<IdentifiedLink> processor) {
602 this.processor = processor;
603 }
604
605 public IdentifiedLink clone(IdentifiedLink object) {
606 IdentifiedLink result = new IdentifiedLink();
607 if (object == null) return result;
608 result.identifier = object.identifier;
609 result.url = object.url;
610 result.anchorText = object.anchorText;
611 return result;
612 }
613
614 public void processUrl(String url) throws IOException {
615 last.url = url;
616 }
617
618
619 public void processTuple(String identifier, String anchorText) throws IOException {
620 last.identifier = identifier;
621 last.anchorText = anchorText;
622 processor.process(clone(last));
623 }
624
625 public void close() throws IOException {
626 processor.close();
627 }
628 }
629 public static class TupleShredder implements Processor {
630 IdentifiedLink last = new IdentifiedLink();
631 public ShreddedProcessor processor;
632
633 public TupleShredder(ShreddedProcessor processor) {
634 this.processor = processor;
635 }
636
637 public IdentifiedLink clone(IdentifiedLink object) {
638 IdentifiedLink result = new IdentifiedLink();
639 if (object == null) return result;
640 result.identifier = object.identifier;
641 result.url = object.url;
642 result.anchorText = object.anchorText;
643 return result;
644 }
645
646 public void process(IdentifiedLink object) throws IOException {
647 boolean processAll = false;
648 if(last == null || Utility.compare(last.url, object.url) != 0 || processAll) { processor.processUrl(object.url); processAll = true; }
649 processor.processTuple(object.identifier, object.anchorText);
650 }
651
652 public Class<IdentifiedLink> getInputClass() {
653 return IdentifiedLink.class;
654 }
655
656 public void close() throws IOException {
657 processor.close();
658 }
659 }
660 }
661 }