View Javadoc

1   // This file was automatically generated with the command: 
2   //     java org.galagosearch.tupleflow.typebuilder.TypeBuilderMojo ...
3   package org.galagosearch.core.types;
4   
5   import org.galagosearch.tupleflow.Utility;
6   import org.galagosearch.tupleflow.ArrayInput;
7   import org.galagosearch.tupleflow.ArrayOutput;
8   import org.galagosearch.tupleflow.Order;   
9   import org.galagosearch.tupleflow.OrderedWriter;
10  import org.galagosearch.tupleflow.Type; 
11  import org.galagosearch.tupleflow.TypeReader;
12  import org.galagosearch.tupleflow.Step; 
13  import org.galagosearch.tupleflow.IncompatibleProcessorException;
14  import org.galagosearch.tupleflow.ReaderSource;
15  import java.io.IOException;             
16  import java.io.EOFException;
17  import java.io.UnsupportedEncodingException;
18  import java.util.ArrayList;
19  import java.util.Arrays;   
20  import java.util.Comparator;
21  import java.util.PriorityQueue;
22  import java.util.Collection;
23  
24  public class ExtractedLink implements Type<ExtractedLink> {
25      public String srcUrl;
26      public String destUrl;
27      public String anchorText;
28      public boolean noFollow; 
29      
30      public ExtractedLink() {}
31      public ExtractedLink(String srcUrl, String destUrl, String anchorText, boolean noFollow) {
32          this.srcUrl = srcUrl;
33          this.destUrl = destUrl;
34          this.anchorText = anchorText;
35          this.noFollow = noFollow;
36      }  
37      
38      public String toString() {
39              return String.format("%s,%s,%s,%b",
40                                     srcUrl, destUrl, anchorText, noFollow);
41      } 
42  
43      public Order<ExtractedLink> getOrder(String... spec) {
44          if (Arrays.equals(spec, new String[] { "+destUrl" })) {
45              return new DestUrlOrder();
46          }
47          if (Arrays.equals(spec, new String[] { "+srcUrl" })) {
48              return new SrcUrlOrder();
49          }
50          return null;
51      } 
52        
53      public interface Processor extends Step, org.galagosearch.tupleflow.Processor<ExtractedLink> {
54          public void process(ExtractedLink object) throws IOException;
55          public void close() throws IOException;
56      }                        
57      public interface Source extends Step {
58      }
59      public static class DestUrlOrder implements Order<ExtractedLink> {
60          public int hash(ExtractedLink object) {
61              int h = 0;
62              h += Utility.hash(object.destUrl);
63              return h;
64          } 
65          public Comparator<ExtractedLink> greaterThan() {
66              return new Comparator<ExtractedLink>() {
67                  public int compare(ExtractedLink one, ExtractedLink two) {
68                      int result = 0;
69                      do {
70                          result = + Utility.compare(one.destUrl, two.destUrl);
71                          if(result != 0) break;
72                      } while (false);
73                      return -result;
74                  }
75              };
76          }     
77          public Comparator<ExtractedLink> lessThan() {
78              return new Comparator<ExtractedLink>() {
79                  public int compare(ExtractedLink one, ExtractedLink two) {
80                      int result = 0;
81                      do {
82                          result = + Utility.compare(one.destUrl, two.destUrl);
83                          if(result != 0) break;
84                      } while (false);
85                      return result;
86                  }
87              };
88          }     
89          public TypeReader<ExtractedLink> orderedReader(ArrayInput _input) {
90              return new ShreddedReader(_input);
91          }    
92  
93          public TypeReader<ExtractedLink> orderedReader(ArrayInput _input, int bufferSize) {
94              return new ShreddedReader(_input, bufferSize);
95          }    
96          public OrderedWriter<ExtractedLink> orderedWriter(ArrayOutput _output) {
97              ShreddedWriter w = new ShreddedWriter(_output);
98              return new OrderedWriterClass(w); 
99          }                                    
100         public static class OrderedWriterClass extends OrderedWriter< ExtractedLink > {
101             ExtractedLink last = null;
102             ShreddedWriter shreddedWriter = null; 
103             
104             public OrderedWriterClass(ShreddedWriter s) {
105                 this.shreddedWriter = s;
106             }
107             
108             public void process(ExtractedLink object) throws IOException {
109                boolean processAll = false;
110                if (processAll || last == null || 0 != Utility.compare(object.destUrl, last.destUrl)) { processAll = true; shreddedWriter.processDestUrl(object.destUrl); }
111                shreddedWriter.processTuple(object.srcUrl, object.anchorText, object.noFollow);
112                last = object;
113             }           
114                  
115             public void close() throws IOException {
116                 shreddedWriter.close();
117             }
118             
119             public Class<ExtractedLink> getInputClass() {
120                 return ExtractedLink.class;
121             }
122         } 
123         public ReaderSource<ExtractedLink> orderedCombiner(Collection<TypeReader<ExtractedLink>> readers, boolean closeOnExit) {
124             ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
125             
126             for (TypeReader<ExtractedLink> reader : readers) {
127                 shreddedReaders.add((ShreddedReader)reader);
128             }
129             
130             return new ShreddedCombiner(shreddedReaders, closeOnExit);
131         }                  
132         public ExtractedLink clone(ExtractedLink object) {
133             ExtractedLink result = new ExtractedLink();
134             if (object == null) return result;
135             result.srcUrl = object.srcUrl; 
136             result.destUrl = object.destUrl; 
137             result.anchorText = object.anchorText; 
138             result.noFollow = object.noFollow; 
139             return result;
140         }                 
141         public Class<ExtractedLink> getOrderedClass() {
142             return ExtractedLink.class;
143         }                           
144         public String[] getOrderSpec() {
145             return new String[] {"+destUrl"};
146         }
147 
148         public static String getSpecString() {
149             return "+destUrl";
150         }
151                            
152         public interface ShreddedProcessor extends Step {
153             public void processDestUrl(String destUrl) throws IOException;
154             public void processTuple(String srcUrl, String anchorText, boolean noFollow) throws IOException;
155             public void close() throws IOException;
156         }    
157         public interface ShreddedSource extends Step {
158         }                                              
159         
160         public static class ShreddedWriter implements ShreddedProcessor {
161             ArrayOutput output;
162             ShreddedBuffer buffer = new ShreddedBuffer();
163             String lastDestUrl;
164             boolean lastFlush = false;
165             
166             public ShreddedWriter(ArrayOutput output) {
167                 this.output = output;
168             }                        
169             
170             public void close() throws IOException {
171                 flush();
172             }
173             
174             public void processDestUrl(String destUrl) {
175                 lastDestUrl = destUrl;
176                 buffer.processDestUrl(destUrl);
177             }
178             public final void processTuple(String srcUrl, String anchorText, boolean noFollow) throws IOException {
179                 if (lastFlush) {
180                     if(buffer.destUrls.size() == 0) buffer.processDestUrl(lastDestUrl);
181                     lastFlush = false;
182                 }
183                 buffer.processTuple(srcUrl, anchorText, noFollow);
184                 if (buffer.isFull())
185                     flush();
186             }
187             public final void flushTuples(int pauseIndex) throws IOException {
188                 
189                 while (buffer.getReadIndex() < pauseIndex) {
190                            
191                     output.writeString(buffer.getSrcUrl());
192                     output.writeString(buffer.getAnchorText());
193                     output.writeBoolean(buffer.getNoFollow());
194                     buffer.incrementTuple();
195                 }
196             }  
197             public final void flushDestUrl(int pauseIndex) throws IOException {
198                 while (buffer.getReadIndex() < pauseIndex) {
199                     int nextPause = buffer.getDestUrlEndIndex();
200                     int count = nextPause - buffer.getReadIndex();
201                     
202                     output.writeString(buffer.getDestUrl());
203                     output.writeInt(count);
204                     buffer.incrementDestUrl();
205                       
206                     flushTuples(nextPause);
207                     assert nextPause == buffer.getReadIndex();
208                 }
209             }
210             public void flush() throws IOException { 
211                 flushDestUrl(buffer.getWriteIndex());
212                 buffer.reset(); 
213                 lastFlush = true;
214             }                           
215         }
216         public static class ShreddedBuffer {
217             ArrayList<String> destUrls = new ArrayList();
218             ArrayList<Integer> destUrlTupleIdx = new ArrayList();
219             int destUrlReadIdx = 0;
220                             
221             String[] srcUrls;
222             String[] anchorTexts;
223             boolean[] noFollows;
224             int writeTupleIndex = 0;
225             int readTupleIndex = 0;
226             int batchSize;
227 
228             public ShreddedBuffer(int batchSize) {
229                 this.batchSize = batchSize;
230 
231                 srcUrls = new String[batchSize];
232                 anchorTexts = new String[batchSize];
233                 noFollows = new boolean[batchSize];
234             }                              
235 
236             public ShreddedBuffer() {    
237                 this(10000);
238             }                                                                                                                    
239             
240             public void processDestUrl(String destUrl) {
241                 destUrls.add(destUrl);
242                 destUrlTupleIdx.add(writeTupleIndex);
243             }                                      
244             public void processTuple(String srcUrl, String anchorText, boolean noFollow) {
245                 assert destUrls.size() > 0;
246                 srcUrls[writeTupleIndex] = srcUrl;
247                 anchorTexts[writeTupleIndex] = anchorText;
248                 noFollows[writeTupleIndex] = noFollow;
249                 writeTupleIndex++;
250             }
251             public void resetData() {
252                 destUrls.clear();
253                 destUrlTupleIdx.clear();
254                 writeTupleIndex = 0;
255             }                  
256                                  
257             public void resetRead() {
258                 readTupleIndex = 0;
259                 destUrlReadIdx = 0;
260             } 
261 
262             public void reset() {
263                 resetData();
264                 resetRead();
265             } 
266             public boolean isFull() {
267                 return writeTupleIndex >= batchSize;
268             }
269 
270             public boolean isEmpty() {
271                 return writeTupleIndex == 0;
272             }                          
273 
274             public boolean isAtEnd() {
275                 return readTupleIndex >= writeTupleIndex;
276             }           
277             public void incrementDestUrl() {
278                 destUrlReadIdx++;  
279             }                                                                                              
280 
281             public void autoIncrementDestUrl() {
282                 while (readTupleIndex >= getDestUrlEndIndex() && readTupleIndex < writeTupleIndex)
283                     destUrlReadIdx++;
284             }                 
285             public void incrementTuple() {
286                 readTupleIndex++;
287             }                    
288             public int getDestUrlEndIndex() {
289                 if ((destUrlReadIdx+1) >= destUrlTupleIdx.size())
290                     return writeTupleIndex;
291                 return destUrlTupleIdx.get(destUrlReadIdx+1);
292             }
293             public int getReadIndex() {
294                 return readTupleIndex;
295             }   
296 
297             public int getWriteIndex() {
298                 return writeTupleIndex;
299             } 
300             public String getDestUrl() {
301                 assert readTupleIndex < writeTupleIndex;
302                 assert destUrlReadIdx < destUrls.size();
303                 
304                 return destUrls.get(destUrlReadIdx);
305             }
306             public String getSrcUrl() {
307                 assert readTupleIndex < writeTupleIndex;
308                 return srcUrls[readTupleIndex];
309             }                                         
310             public String getAnchorText() {
311                 assert readTupleIndex < writeTupleIndex;
312                 return anchorTexts[readTupleIndex];
313             }                                         
314             public boolean getNoFollow() {
315                 assert readTupleIndex < writeTupleIndex;
316                 return noFollows[readTupleIndex];
317             }                                         
318             public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
319                 while (getReadIndex() < endIndex) {
320                    output.processTuple(getSrcUrl(), getAnchorText(), getNoFollow());
321                    incrementTuple();
322                 }
323             }                                                                           
324             public void copyUntilIndexDestUrl(int endIndex, ShreddedProcessor output) throws IOException {
325                 while (getReadIndex() < endIndex) {
326                     output.processDestUrl(getDestUrl());
327                     assert getDestUrlEndIndex() <= endIndex;
328                     copyTuples(getDestUrlEndIndex(), output);
329                     incrementDestUrl();
330                 }
331             }  
332             public void copyUntilDestUrl(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
333                 while (!isAtEnd()) {
334                     if (other != null) {   
335                         assert !other.isAtEnd();
336                         int c = + Utility.compare(getDestUrl(), other.getDestUrl());
337                     
338                         if (c > 0) {
339                             break;   
340                         }
341                         
342                         output.processDestUrl(getDestUrl());
343                                       
344                         copyTuples(getDestUrlEndIndex(), output);
345                     } else {
346                         output.processDestUrl(getDestUrl());
347                         copyTuples(getDestUrlEndIndex(), output);
348                     }
349                     incrementDestUrl();  
350                     
351                
352                 }
353             }
354             public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
355                 copyUntilDestUrl(other, output);
356             }
357             
358         }                         
359         public static class ShreddedCombiner implements ReaderSource<ExtractedLink>, ShreddedSource {   
360             public ShreddedProcessor processor;
361             Collection<ShreddedReader> readers;       
362             boolean closeOnExit = false;
363             boolean uninitialized = true;
364             PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
365             
366             public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
367                 this.readers = readers;                                                       
368                 this.closeOnExit = closeOnExit;
369             }
370                                   
371             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
372                 if (processor instanceof ShreddedProcessor) {
373                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
374                 } else if (processor instanceof ExtractedLink.Processor) {
375                     this.processor = new DuplicateEliminator(new TupleUnshredder((ExtractedLink.Processor) processor));
376                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
377                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<ExtractedLink>) processor));
378                 } else {
379                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
380                 }
381             }                                
382             
383             public Class<ExtractedLink> getOutputClass() {
384                 return ExtractedLink.class;
385             }
386             
387             public void initialize() throws IOException {
388                 for (ShreddedReader reader : readers) {
389                     reader.fill();                                        
390                     
391                     if (!reader.getBuffer().isAtEnd())
392                         queue.add(reader);
393                 }   
394 
395                 uninitialized = false;
396             }
397 
398             public void run() throws IOException {
399                 initialize();
400                
401                 while (queue.size() > 0) {
402                     ShreddedReader top = queue.poll();
403                     ShreddedReader next = null;
404                     ShreddedBuffer nextBuffer = null; 
405                     
406                     assert !top.getBuffer().isAtEnd();
407                                                   
408                     if (queue.size() > 0) {
409                         next = queue.peek();
410                         nextBuffer = next.getBuffer();
411                         assert !nextBuffer.isAtEnd();
412                     }
413                     
414                     top.getBuffer().copyUntil(nextBuffer, processor);
415                     if (top.getBuffer().isAtEnd())
416                         top.fill();                 
417                         
418                     if (!top.getBuffer().isAtEnd())
419                         queue.add(top);
420                 }              
421                 
422                 if (closeOnExit)
423                     processor.close();
424             }
425 
426             public ExtractedLink read() throws IOException {
427                 if (uninitialized)
428                     initialize();
429 
430                 ExtractedLink result = null;
431 
432                 while (queue.size() > 0) {
433                     ShreddedReader top = queue.poll();
434                     result = top.read();
435 
436                     if (result != null) {
437                         if (top.getBuffer().isAtEnd())
438                             top.fill();
439 
440                         queue.offer(top);
441                         break;
442                     } 
443                 }
444 
445                 return result;
446             }
447         } 
448         public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<ExtractedLink>, ShreddedSource {      
449             public ShreddedProcessor processor;
450             ShreddedBuffer buffer;
451             ExtractedLink last = new ExtractedLink();         
452             long updateDestUrlCount = -1;
453             long tupleCount = 0;
454             long bufferStartCount = 0;  
455             ArrayInput input;
456             
457             public ShreddedReader(ArrayInput input) {
458                 this.input = input; 
459                 this.buffer = new ShreddedBuffer();
460             }                               
461             
462             public ShreddedReader(ArrayInput input, int bufferSize) { 
463                 this.input = input;
464                 this.buffer = new ShreddedBuffer(bufferSize);
465             }
466                  
467             public final int compareTo(ShreddedReader other) {
468                 ShreddedBuffer otherBuffer = other.getBuffer();
469                 
470                 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
471                     return 0;                 
472                 } else if (buffer.isAtEnd()) {
473                     return -1;
474                 } else if (otherBuffer.isAtEnd()) {
475                     return 1;
476                 }
477                                    
478                 int result = 0;
479                 do {
480                     result = + Utility.compare(buffer.getDestUrl(), otherBuffer.getDestUrl());
481                     if(result != 0) break;
482                 } while (false);                                             
483                 
484                 return result;
485             }
486             
487             public final ShreddedBuffer getBuffer() {
488                 return buffer;
489             }                
490             
491             public final ExtractedLink read() throws IOException {
492                 if (buffer.isAtEnd()) {
493                     fill();             
494                 
495                     if (buffer.isAtEnd()) {
496                         return null;
497                     }
498                 }
499                       
500                 assert !buffer.isAtEnd();
501                 ExtractedLink result = new ExtractedLink();
502                 
503                 result.destUrl = buffer.getDestUrl();
504                 result.srcUrl = buffer.getSrcUrl();
505                 result.anchorText = buffer.getAnchorText();
506                 result.noFollow = buffer.getNoFollow();
507                 
508                 buffer.incrementTuple();
509                 buffer.autoIncrementDestUrl();
510                 
511                 return result;
512             }           
513             
514             public final void fill() throws IOException {
515                 try {   
516                     buffer.reset();
517                     
518                     if (tupleCount != 0) {
519                                                       
520                         if(updateDestUrlCount - tupleCount > 0) {
521                             buffer.destUrls.add(last.destUrl);
522                             buffer.destUrlTupleIdx.add((int) (updateDestUrlCount - tupleCount));
523                         }
524                         bufferStartCount = tupleCount;
525                     }
526                     
527                     while (!buffer.isFull()) {
528                         updateDestUrl();
529                         buffer.processTuple(input.readString(), input.readString(), input.readBoolean());
530                         tupleCount++;
531                     }
532                 } catch(EOFException e) {}
533             }
534 
535             public final void updateDestUrl() throws IOException {
536                 if (updateDestUrlCount > tupleCount)
537                     return;
538                      
539                 last.destUrl = input.readString();
540                 updateDestUrlCount = tupleCount + input.readInt();
541                                       
542                 buffer.processDestUrl(last.destUrl);
543             }
544 
545             public void run() throws IOException {
546                 while (true) {
547                     fill();
548                     
549                     if (buffer.isAtEnd())
550                         break;
551                     
552                     buffer.copyUntil(null, processor);
553                 }      
554                 processor.close();
555             }
556             
557             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
558                 if (processor instanceof ShreddedProcessor) {
559                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
560                 } else if (processor instanceof ExtractedLink.Processor) {
561                     this.processor = new DuplicateEliminator(new TupleUnshredder((ExtractedLink.Processor) processor));
562                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
563                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<ExtractedLink>) processor));
564                 } else {
565                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
566                 }
567             }                                
568             
569             public Class<ExtractedLink> getOutputClass() {
570                 return ExtractedLink.class;
571             }                
572         }
573         
574         public static class DuplicateEliminator implements ShreddedProcessor {
575             public ShreddedProcessor processor;
576             ExtractedLink last = new ExtractedLink();
577             boolean destUrlProcess = true;
578                                            
579             public DuplicateEliminator() {}
580             public DuplicateEliminator(ShreddedProcessor processor) {
581                 this.processor = processor;
582             }
583             
584             public void setShreddedProcessor(ShreddedProcessor processor) {
585                 this.processor = processor;
586             }
587 
588             public void processDestUrl(String destUrl) throws IOException {  
589                 if (destUrlProcess || Utility.compare(destUrl, last.destUrl) != 0) {
590                     last.destUrl = destUrl;
591                     processor.processDestUrl(destUrl);
592                     destUrlProcess = false;
593                 }
594             }  
595             
596             public void resetDestUrl() {
597                  destUrlProcess = true;
598             }                                                
599                                
600             public void processTuple(String srcUrl, String anchorText, boolean noFollow) throws IOException {
601                 processor.processTuple(srcUrl, anchorText, noFollow);
602             } 
603             
604             public void close() throws IOException {
605                 processor.close();
606             }                    
607         }
608         public static class TupleUnshredder implements ShreddedProcessor {
609             ExtractedLink last = new ExtractedLink();
610             public org.galagosearch.tupleflow.Processor<ExtractedLink> processor;                               
611             
612             public TupleUnshredder(ExtractedLink.Processor processor) {
613                 this.processor = processor;
614             }         
615             
616             public TupleUnshredder(org.galagosearch.tupleflow.Processor<ExtractedLink> processor) {
617                 this.processor = processor;
618             }
619             
620             public ExtractedLink clone(ExtractedLink object) {
621                 ExtractedLink result = new ExtractedLink();
622                 if (object == null) return result;
623                 result.srcUrl = object.srcUrl; 
624                 result.destUrl = object.destUrl; 
625                 result.anchorText = object.anchorText; 
626                 result.noFollow = object.noFollow; 
627                 return result;
628             }                 
629             
630             public void processDestUrl(String destUrl) throws IOException {
631                 last.destUrl = destUrl;
632             }   
633                 
634             
635             public void processTuple(String srcUrl, String anchorText, boolean noFollow) throws IOException {
636                 last.srcUrl = srcUrl;
637                 last.anchorText = anchorText;
638                 last.noFollow = noFollow;
639                 processor.process(clone(last));
640             }               
641             
642             public void close() throws IOException {
643                 processor.close();
644             }
645         }     
646         public static class TupleShredder implements Processor {
647             ExtractedLink last = new ExtractedLink();
648             public ShreddedProcessor processor;
649             
650             public TupleShredder(ShreddedProcessor processor) {
651                 this.processor = processor;
652             }                              
653             
654             public ExtractedLink clone(ExtractedLink object) {
655                 ExtractedLink result = new ExtractedLink();
656                 if (object == null) return result;
657                 result.srcUrl = object.srcUrl; 
658                 result.destUrl = object.destUrl; 
659                 result.anchorText = object.anchorText; 
660                 result.noFollow = object.noFollow; 
661                 return result;
662             }                 
663             
664             public void process(ExtractedLink object) throws IOException {                                                                                                                                                   
665                 boolean processAll = false;
666                 if(last == null || Utility.compare(last.destUrl, object.destUrl) != 0 || processAll) { processor.processDestUrl(object.destUrl); processAll = true; }
667                 processor.processTuple(object.srcUrl, object.anchorText, object.noFollow);                                         
668             }
669                           
670             public Class<ExtractedLink> getInputClass() {
671                 return ExtractedLink.class;
672             }
673             
674             public void close() throws IOException {
675                 processor.close();
676             }                     
677         }
678     } 
679     public static class SrcUrlOrder implements Order<ExtractedLink> {
680         public int hash(ExtractedLink object) {
681             int h = 0;
682             h += Utility.hash(object.srcUrl);
683             return h;
684         } 
685         public Comparator<ExtractedLink> greaterThan() {
686             return new Comparator<ExtractedLink>() {
687                 public int compare(ExtractedLink one, ExtractedLink two) {
688                     int result = 0;
689                     do {
690                         result = + Utility.compare(one.srcUrl, two.srcUrl);
691                         if(result != 0) break;
692                     } while (false);
693                     return -result;
694                 }
695             };
696         }     
697         public Comparator<ExtractedLink> lessThan() {
698             return new Comparator<ExtractedLink>() {
699                 public int compare(ExtractedLink one, ExtractedLink two) {
700                     int result = 0;
701                     do {
702                         result = + Utility.compare(one.srcUrl, two.srcUrl);
703                         if(result != 0) break;
704                     } while (false);
705                     return result;
706                 }
707             };
708         }     
709         public TypeReader<ExtractedLink> orderedReader(ArrayInput _input) {
710             return new ShreddedReader(_input);
711         }    
712 
713         public TypeReader<ExtractedLink> orderedReader(ArrayInput _input, int bufferSize) {
714             return new ShreddedReader(_input, bufferSize);
715         }    
716         public OrderedWriter<ExtractedLink> orderedWriter(ArrayOutput _output) {
717             ShreddedWriter w = new ShreddedWriter(_output);
718             return new OrderedWriterClass(w); 
719         }                                    
720         public static class OrderedWriterClass extends OrderedWriter< ExtractedLink > {
721             ExtractedLink last = null;
722             ShreddedWriter shreddedWriter = null; 
723             
724             public OrderedWriterClass(ShreddedWriter s) {
725                 this.shreddedWriter = s;
726             }
727             
728             public void process(ExtractedLink object) throws IOException {
729                boolean processAll = false;
730                if (processAll || last == null || 0 != Utility.compare(object.srcUrl, last.srcUrl)) { processAll = true; shreddedWriter.processSrcUrl(object.srcUrl); }
731                shreddedWriter.processTuple(object.destUrl, object.anchorText, object.noFollow);
732                last = object;
733             }           
734                  
735             public void close() throws IOException {
736                 shreddedWriter.close();
737             }
738             
739             public Class<ExtractedLink> getInputClass() {
740                 return ExtractedLink.class;
741             }
742         } 
743         public ReaderSource<ExtractedLink> orderedCombiner(Collection<TypeReader<ExtractedLink>> readers, boolean closeOnExit) {
744             ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
745             
746             for (TypeReader<ExtractedLink> reader : readers) {
747                 shreddedReaders.add((ShreddedReader)reader);
748             }
749             
750             return new ShreddedCombiner(shreddedReaders, closeOnExit);
751         }                  
752         public ExtractedLink clone(ExtractedLink object) {
753             ExtractedLink result = new ExtractedLink();
754             if (object == null) return result;
755             result.srcUrl = object.srcUrl; 
756             result.destUrl = object.destUrl; 
757             result.anchorText = object.anchorText; 
758             result.noFollow = object.noFollow; 
759             return result;
760         }                 
761         public Class<ExtractedLink> getOrderedClass() {
762             return ExtractedLink.class;
763         }                           
764         public String[] getOrderSpec() {
765             return new String[] {"+srcUrl"};
766         }
767 
768         public static String getSpecString() {
769             return "+srcUrl";
770         }
771                            
772         public interface ShreddedProcessor extends Step {
773             public void processSrcUrl(String srcUrl) throws IOException;
774             public void processTuple(String destUrl, String anchorText, boolean noFollow) throws IOException;
775             public void close() throws IOException;
776         }    
777         public interface ShreddedSource extends Step {
778         }                                              
779         
780         public static class ShreddedWriter implements ShreddedProcessor {
781             ArrayOutput output;
782             ShreddedBuffer buffer = new ShreddedBuffer();
783             String lastSrcUrl;
784             boolean lastFlush = false;
785             
786             public ShreddedWriter(ArrayOutput output) {
787                 this.output = output;
788             }                        
789             
790             public void close() throws IOException {
791                 flush();
792             }
793             
794             public void processSrcUrl(String srcUrl) {
795                 lastSrcUrl = srcUrl;
796                 buffer.processSrcUrl(srcUrl);
797             }
798             public final void processTuple(String destUrl, String anchorText, boolean noFollow) throws IOException {
799                 if (lastFlush) {
800                     if(buffer.srcUrls.size() == 0) buffer.processSrcUrl(lastSrcUrl);
801                     lastFlush = false;
802                 }
803                 buffer.processTuple(destUrl, anchorText, noFollow);
804                 if (buffer.isFull())
805                     flush();
806             }
807             public final void flushTuples(int pauseIndex) throws IOException {
808                 
809                 while (buffer.getReadIndex() < pauseIndex) {
810                            
811                     output.writeString(buffer.getDestUrl());
812                     output.writeString(buffer.getAnchorText());
813                     output.writeBoolean(buffer.getNoFollow());
814                     buffer.incrementTuple();
815                 }
816             }  
817             public final void flushSrcUrl(int pauseIndex) throws IOException {
818                 while (buffer.getReadIndex() < pauseIndex) {
819                     int nextPause = buffer.getSrcUrlEndIndex();
820                     int count = nextPause - buffer.getReadIndex();
821                     
822                     output.writeString(buffer.getSrcUrl());
823                     output.writeInt(count);
824                     buffer.incrementSrcUrl();
825                       
826                     flushTuples(nextPause);
827                     assert nextPause == buffer.getReadIndex();
828                 }
829             }
830             public void flush() throws IOException { 
831                 flushSrcUrl(buffer.getWriteIndex());
832                 buffer.reset(); 
833                 lastFlush = true;
834             }                           
835         }
836         public static class ShreddedBuffer {
837             ArrayList<String> srcUrls = new ArrayList();
838             ArrayList<Integer> srcUrlTupleIdx = new ArrayList();
839             int srcUrlReadIdx = 0;
840                             
841             String[] destUrls;
842             String[] anchorTexts;
843             boolean[] noFollows;
844             int writeTupleIndex = 0;
845             int readTupleIndex = 0;
846             int batchSize;
847 
848             public ShreddedBuffer(int batchSize) {
849                 this.batchSize = batchSize;
850 
851                 destUrls = new String[batchSize];
852                 anchorTexts = new String[batchSize];
853                 noFollows = new boolean[batchSize];
854             }                              
855 
856             public ShreddedBuffer() {    
857                 this(10000);
858             }                                                                                                                    
859             
860             public void processSrcUrl(String srcUrl) {
861                 srcUrls.add(srcUrl);
862                 srcUrlTupleIdx.add(writeTupleIndex);
863             }                                      
864             public void processTuple(String destUrl, String anchorText, boolean noFollow) {
865                 assert srcUrls.size() > 0;
866                 destUrls[writeTupleIndex] = destUrl;
867                 anchorTexts[writeTupleIndex] = anchorText;
868                 noFollows[writeTupleIndex] = noFollow;
869                 writeTupleIndex++;
870             }
871             public void resetData() {
872                 srcUrls.clear();
873                 srcUrlTupleIdx.clear();
874                 writeTupleIndex = 0;
875             }                  
876                                  
877             public void resetRead() {
878                 readTupleIndex = 0;
879                 srcUrlReadIdx = 0;
880             } 
881 
882             public void reset() {
883                 resetData();
884                 resetRead();
885             } 
886             public boolean isFull() {
887                 return writeTupleIndex >= batchSize;
888             }
889 
890             public boolean isEmpty() {
891                 return writeTupleIndex == 0;
892             }                          
893 
894             public boolean isAtEnd() {
895                 return readTupleIndex >= writeTupleIndex;
896             }           
897             public void incrementSrcUrl() {
898                 srcUrlReadIdx++;  
899             }                                                                                              
900 
901             public void autoIncrementSrcUrl() {
902                 while (readTupleIndex >= getSrcUrlEndIndex() && readTupleIndex < writeTupleIndex)
903                     srcUrlReadIdx++;
904             }                 
905             public void incrementTuple() {
906                 readTupleIndex++;
907             }                    
908             public int getSrcUrlEndIndex() {
909                 if ((srcUrlReadIdx+1) >= srcUrlTupleIdx.size())
910                     return writeTupleIndex;
911                 return srcUrlTupleIdx.get(srcUrlReadIdx+1);
912             }
913             public int getReadIndex() {
914                 return readTupleIndex;
915             }   
916 
917             public int getWriteIndex() {
918                 return writeTupleIndex;
919             } 
920             public String getSrcUrl() {
921                 assert readTupleIndex < writeTupleIndex;
922                 assert srcUrlReadIdx < srcUrls.size();
923                 
924                 return srcUrls.get(srcUrlReadIdx);
925             }
926             public String getDestUrl() {
927                 assert readTupleIndex < writeTupleIndex;
928                 return destUrls[readTupleIndex];
929             }                                         
930             public String getAnchorText() {
931                 assert readTupleIndex < writeTupleIndex;
932                 return anchorTexts[readTupleIndex];
933             }                                         
934             public boolean getNoFollow() {
935                 assert readTupleIndex < writeTupleIndex;
936                 return noFollows[readTupleIndex];
937             }                                         
938             public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
939                 while (getReadIndex() < endIndex) {
940                    output.processTuple(getDestUrl(), getAnchorText(), getNoFollow());
941                    incrementTuple();
942                 }
943             }                                                                           
944             public void copyUntilIndexSrcUrl(int endIndex, ShreddedProcessor output) throws IOException {
945                 while (getReadIndex() < endIndex) {
946                     output.processSrcUrl(getSrcUrl());
947                     assert getSrcUrlEndIndex() <= endIndex;
948                     copyTuples(getSrcUrlEndIndex(), output);
949                     incrementSrcUrl();
950                 }
951             }  
952             public void copyUntilSrcUrl(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
953                 while (!isAtEnd()) {
954                     if (other != null) {   
955                         assert !other.isAtEnd();
956                         int c = + Utility.compare(getSrcUrl(), other.getSrcUrl());
957                     
958                         if (c > 0) {
959                             break;   
960                         }
961                         
962                         output.processSrcUrl(getSrcUrl());
963                                       
964                         copyTuples(getSrcUrlEndIndex(), output);
965                     } else {
966                         output.processSrcUrl(getSrcUrl());
967                         copyTuples(getSrcUrlEndIndex(), output);
968                     }
969                     incrementSrcUrl();  
970                     
971                
972                 }
973             }
974             public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
975                 copyUntilSrcUrl(other, output);
976             }
977             
978         }                         
979         public static class ShreddedCombiner implements ReaderSource<ExtractedLink>, ShreddedSource {   
980             public ShreddedProcessor processor;
981             Collection<ShreddedReader> readers;       
982             boolean closeOnExit = false;
983             boolean uninitialized = true;
984             PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
985             
986             public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
987                 this.readers = readers;                                                       
988                 this.closeOnExit = closeOnExit;
989             }
990                                   
991             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
992                 if (processor instanceof ShreddedProcessor) {
993                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
994                 } else if (processor instanceof ExtractedLink.Processor) {
995                     this.processor = new DuplicateEliminator(new TupleUnshredder((ExtractedLink.Processor) processor));
996                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
997                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<ExtractedLink>) processor));
998                 } else {
999                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
1000                 }
1001             }                                
1002             
1003             public Class<ExtractedLink> getOutputClass() {
1004                 return ExtractedLink.class;
1005             }
1006             
1007             public void initialize() throws IOException {
1008                 for (ShreddedReader reader : readers) {
1009                     reader.fill();                                        
1010                     
1011                     if (!reader.getBuffer().isAtEnd())
1012                         queue.add(reader);
1013                 }   
1014 
1015                 uninitialized = false;
1016             }
1017 
1018             public void run() throws IOException {
1019                 initialize();
1020                
1021                 while (queue.size() > 0) {
1022                     ShreddedReader top = queue.poll();
1023                     ShreddedReader next = null;
1024                     ShreddedBuffer nextBuffer = null; 
1025                     
1026                     assert !top.getBuffer().isAtEnd();
1027                                                   
1028                     if (queue.size() > 0) {
1029                         next = queue.peek();
1030                         nextBuffer = next.getBuffer();
1031                         assert !nextBuffer.isAtEnd();
1032                     }
1033                     
1034                     top.getBuffer().copyUntil(nextBuffer, processor);
1035                     if (top.getBuffer().isAtEnd())
1036                         top.fill();                 
1037                         
1038                     if (!top.getBuffer().isAtEnd())
1039                         queue.add(top);
1040                 }              
1041                 
1042                 if (closeOnExit)
1043                     processor.close();
1044             }
1045 
1046             public ExtractedLink read() throws IOException {
1047                 if (uninitialized)
1048                     initialize();
1049 
1050                 ExtractedLink result = null;
1051 
1052                 while (queue.size() > 0) {
1053                     ShreddedReader top = queue.poll();
1054                     result = top.read();
1055 
1056                     if (result != null) {
1057                         if (top.getBuffer().isAtEnd())
1058                             top.fill();
1059 
1060                         queue.offer(top);
1061                         break;
1062                     } 
1063                 }
1064 
1065                 return result;
1066             }
1067         } 
1068         public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<ExtractedLink>, ShreddedSource {      
1069             public ShreddedProcessor processor;
1070             ShreddedBuffer buffer;
1071             ExtractedLink last = new ExtractedLink();         
1072             long updateSrcUrlCount = -1;
1073             long tupleCount = 0;
1074             long bufferStartCount = 0;  
1075             ArrayInput input;
1076             
1077             public ShreddedReader(ArrayInput input) {
1078                 this.input = input; 
1079                 this.buffer = new ShreddedBuffer();
1080             }                               
1081             
1082             public ShreddedReader(ArrayInput input, int bufferSize) { 
1083                 this.input = input;
1084                 this.buffer = new ShreddedBuffer(bufferSize);
1085             }
1086                  
1087             public final int compareTo(ShreddedReader other) {
1088                 ShreddedBuffer otherBuffer = other.getBuffer();
1089                 
1090                 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
1091                     return 0;                 
1092                 } else if (buffer.isAtEnd()) {
1093                     return -1;
1094                 } else if (otherBuffer.isAtEnd()) {
1095                     return 1;
1096                 }
1097                                    
1098                 int result = 0;
1099                 do {
1100                     result = + Utility.compare(buffer.getSrcUrl(), otherBuffer.getSrcUrl());
1101                     if(result != 0) break;
1102                 } while (false);                                             
1103                 
1104                 return result;
1105             }
1106             
1107             public final ShreddedBuffer getBuffer() {
1108                 return buffer;
1109             }                
1110             
1111             public final ExtractedLink read() throws IOException {
1112                 if (buffer.isAtEnd()) {
1113                     fill();             
1114                 
1115                     if (buffer.isAtEnd()) {
1116                         return null;
1117                     }
1118                 }
1119                       
1120                 assert !buffer.isAtEnd();
1121                 ExtractedLink result = new ExtractedLink();
1122                 
1123                 result.srcUrl = buffer.getSrcUrl();
1124                 result.destUrl = buffer.getDestUrl();
1125                 result.anchorText = buffer.getAnchorText();
1126                 result.noFollow = buffer.getNoFollow();
1127                 
1128                 buffer.incrementTuple();
1129                 buffer.autoIncrementSrcUrl();
1130                 
1131                 return result;
1132             }           
1133             
1134             public final void fill() throws IOException {
1135                 try {   
1136                     buffer.reset();
1137                     
1138                     if (tupleCount != 0) {
1139                                                       
1140                         if(updateSrcUrlCount - tupleCount > 0) {
1141                             buffer.srcUrls.add(last.srcUrl);
1142                             buffer.srcUrlTupleIdx.add((int) (updateSrcUrlCount - tupleCount));
1143                         }
1144                         bufferStartCount = tupleCount;
1145                     }
1146                     
1147                     while (!buffer.isFull()) {
1148                         updateSrcUrl();
1149                         buffer.processTuple(input.readString(), input.readString(), input.readBoolean());
1150                         tupleCount++;
1151                     }
1152                 } catch(EOFException e) {}
1153             }
1154 
1155             public final void updateSrcUrl() throws IOException {
1156                 if (updateSrcUrlCount > tupleCount)
1157                     return;
1158                      
1159                 last.srcUrl = input.readString();
1160                 updateSrcUrlCount = tupleCount + input.readInt();
1161                                       
1162                 buffer.processSrcUrl(last.srcUrl);
1163             }
1164 
1165             public void run() throws IOException {
1166                 while (true) {
1167                     fill();
1168                     
1169                     if (buffer.isAtEnd())
1170                         break;
1171                     
1172                     buffer.copyUntil(null, processor);
1173                 }      
1174                 processor.close();
1175             }
1176             
1177             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
1178                 if (processor instanceof ShreddedProcessor) {
1179                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
1180                 } else if (processor instanceof ExtractedLink.Processor) {
1181                     this.processor = new DuplicateEliminator(new TupleUnshredder((ExtractedLink.Processor) processor));
1182                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
1183                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<ExtractedLink>) processor));
1184                 } else {
1185                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
1186                 }
1187             }                                
1188             
1189             public Class<ExtractedLink> getOutputClass() {
1190                 return ExtractedLink.class;
1191             }                
1192         }
1193         
1194         public static class DuplicateEliminator implements ShreddedProcessor {
1195             public ShreddedProcessor processor;
1196             ExtractedLink last = new ExtractedLink();
1197             boolean srcUrlProcess = true;
1198                                            
1199             public DuplicateEliminator() {}
1200             public DuplicateEliminator(ShreddedProcessor processor) {
1201                 this.processor = processor;
1202             }
1203             
1204             public void setShreddedProcessor(ShreddedProcessor processor) {
1205                 this.processor = processor;
1206             }
1207 
1208             public void processSrcUrl(String srcUrl) throws IOException {  
1209                 if (srcUrlProcess || Utility.compare(srcUrl, last.srcUrl) != 0) {
1210                     last.srcUrl = srcUrl;
1211                     processor.processSrcUrl(srcUrl);
1212                     srcUrlProcess = false;
1213                 }
1214             }  
1215             
1216             public void resetSrcUrl() {
1217                  srcUrlProcess = true;
1218             }                                                
1219                                
1220             public void processTuple(String destUrl, String anchorText, boolean noFollow) throws IOException {
1221                 processor.processTuple(destUrl, anchorText, noFollow);
1222             } 
1223             
1224             public void close() throws IOException {
1225                 processor.close();
1226             }                    
1227         }
1228         public static class TupleUnshredder implements ShreddedProcessor {
1229             ExtractedLink last = new ExtractedLink();
1230             public org.galagosearch.tupleflow.Processor<ExtractedLink> processor;                               
1231             
1232             public TupleUnshredder(ExtractedLink.Processor processor) {
1233                 this.processor = processor;
1234             }         
1235             
1236             public TupleUnshredder(org.galagosearch.tupleflow.Processor<ExtractedLink> processor) {
1237                 this.processor = processor;
1238             }
1239             
1240             public ExtractedLink clone(ExtractedLink object) {
1241                 ExtractedLink result = new ExtractedLink();
1242                 if (object == null) return result;
1243                 result.srcUrl = object.srcUrl; 
1244                 result.destUrl = object.destUrl; 
1245                 result.anchorText = object.anchorText; 
1246                 result.noFollow = object.noFollow; 
1247                 return result;
1248             }                 
1249             
1250             public void processSrcUrl(String srcUrl) throws IOException {
1251                 last.srcUrl = srcUrl;
1252             }   
1253                 
1254             
1255             public void processTuple(String destUrl, String anchorText, boolean noFollow) throws IOException {
1256                 last.destUrl = destUrl;
1257                 last.anchorText = anchorText;
1258                 last.noFollow = noFollow;
1259                 processor.process(clone(last));
1260             }               
1261             
1262             public void close() throws IOException {
1263                 processor.close();
1264             }
1265         }     
1266         public static class TupleShredder implements Processor {
1267             ExtractedLink last = new ExtractedLink();
1268             public ShreddedProcessor processor;
1269             
1270             public TupleShredder(ShreddedProcessor processor) {
1271                 this.processor = processor;
1272             }                              
1273             
1274             public ExtractedLink clone(ExtractedLink object) {
1275                 ExtractedLink result = new ExtractedLink();
1276                 if (object == null) return result;
1277                 result.srcUrl = object.srcUrl; 
1278                 result.destUrl = object.destUrl; 
1279                 result.anchorText = object.anchorText; 
1280                 result.noFollow = object.noFollow; 
1281                 return result;
1282             }                 
1283             
1284             public void process(ExtractedLink object) throws IOException {                                                                                                                                                   
1285                 boolean processAll = false;
1286                 if(last == null || Utility.compare(last.srcUrl, object.srcUrl) != 0 || processAll) { processor.processSrcUrl(object.srcUrl); processAll = true; }
1287                 processor.processTuple(object.destUrl, object.anchorText, object.noFollow);                                         
1288             }
1289                           
1290             public Class<ExtractedLink> getInputClass() {
1291                 return ExtractedLink.class;
1292             }
1293             
1294             public void close() throws IOException {
1295                 processor.close();
1296             }                     
1297         }
1298     } 
1299 }