Coverage Report - org.galagosearch.core.types.ExtractedLink
 
Classes in this File Line Coverage Branch Coverage Complexity
ExtractedLink
23%
3/13
25%
1/4
0
ExtractedLink$DestUrlOrder
16%
4/25
0%
0/4
0
ExtractedLink$DestUrlOrder$1
0%
0/5
0%
0/2
0
ExtractedLink$DestUrlOrder$2
0%
0/5
0%
0/2
0
ExtractedLink$DestUrlOrder$DuplicateEliminator
0%
0/19
0%
0/4
0
ExtractedLink$DestUrlOrder$OrderedWriterClass
0%
0/14
0%
0/6
0
ExtractedLink$DestUrlOrder$ShreddedBuffer
0%
0/82
0%
0/54
0
ExtractedLink$DestUrlOrder$ShreddedCombiner
0%
0/55
0%
0/36
0
ExtractedLink$DestUrlOrder$ShreddedProcessor
N/A
N/A
0
ExtractedLink$DestUrlOrder$ShreddedReader
0%
0/71
0%
0/34
0
ExtractedLink$DestUrlOrder$ShreddedSource
N/A
N/A
0
ExtractedLink$DestUrlOrder$ShreddedWriter
0%
0/38
0%
0/14
0
ExtractedLink$DestUrlOrder$TupleShredder
0%
0/19
0%
0/8
0
ExtractedLink$DestUrlOrder$TupleUnshredder
0%
0/23
0%
0/2
0
ExtractedLink$Processor
N/A
N/A
0
ExtractedLink$Source
N/A
N/A
0
ExtractedLink$SrcUrlOrder
0%
0/25
0%
0/4
0
ExtractedLink$SrcUrlOrder$1
0%
0/5
0%
0/2
0
ExtractedLink$SrcUrlOrder$2
0%
0/5
0%
0/2
0
ExtractedLink$SrcUrlOrder$DuplicateEliminator
0%
0/19
0%
0/4
0
ExtractedLink$SrcUrlOrder$OrderedWriterClass
0%
0/14
0%
0/6
0
ExtractedLink$SrcUrlOrder$ShreddedBuffer
0%
0/82
0%
0/54
0
ExtractedLink$SrcUrlOrder$ShreddedCombiner
0%
0/55
0%
0/36
0
ExtractedLink$SrcUrlOrder$ShreddedProcessor
N/A
N/A
0
ExtractedLink$SrcUrlOrder$ShreddedReader
0%
0/71
0%
0/34
0
ExtractedLink$SrcUrlOrder$ShreddedSource
N/A
N/A
0
ExtractedLink$SrcUrlOrder$ShreddedWriter
0%
0/38
0%
0/14
0
ExtractedLink$SrcUrlOrder$TupleShredder
0%
0/19
0%
0/8
0
ExtractedLink$SrcUrlOrder$TupleUnshredder
0%
0/23
0%
0/2
0
 
 1  
 // This file was automatically generated with the command: 
 2  
 //     java org.galagosearch.tupleflow.typebuilder.TypeBuilderMojo ...
 3  
 package org.galagosearch.core.types;
 4  
 
 5  
 import org.galagosearch.tupleflow.Utility;
 6  
 import org.galagosearch.tupleflow.ArrayInput;
 7  
 import org.galagosearch.tupleflow.ArrayOutput;
 8  
 import org.galagosearch.tupleflow.Order;   
 9  
 import org.galagosearch.tupleflow.OrderedWriter;
 10  
 import org.galagosearch.tupleflow.Type; 
 11  
 import org.galagosearch.tupleflow.TypeReader;
 12  
 import org.galagosearch.tupleflow.Step; 
 13  
 import org.galagosearch.tupleflow.IncompatibleProcessorException;
 14  
 import org.galagosearch.tupleflow.ReaderSource;
 15  
 import java.io.IOException;             
 16  
 import java.io.EOFException;
 17  
 import java.io.UnsupportedEncodingException;
 18  
 import java.util.ArrayList;
 19  
 import java.util.Arrays;   
 20  
 import java.util.Comparator;
 21  
 import java.util.PriorityQueue;
 22  
 import java.util.Collection;
 23  
 
 24  
 public class ExtractedLink implements Type<ExtractedLink> {
 25  
     public String srcUrl;
 26  
     public String destUrl;
 27  
     public String anchorText;
 28  
     public boolean noFollow; 
 29  
     
 30  8
     public ExtractedLink() {}
 31  0
     public ExtractedLink(String srcUrl, String destUrl, String anchorText, boolean noFollow) {
 32  0
         this.srcUrl = srcUrl;
 33  0
         this.destUrl = destUrl;
 34  0
         this.anchorText = anchorText;
 35  0
         this.noFollow = noFollow;
 36  0
     }  
 37  
     
 38  
     public String toString() {
 39  0
             return String.format("%s,%s,%s,%b",
 40  
                                    srcUrl, destUrl, anchorText, noFollow);
 41  
     } 
 42  
 
 43  
     public Order<ExtractedLink> getOrder(String... spec) {
 44  8
         if (Arrays.equals(spec, new String[] { "+destUrl" })) {
 45  8
             return new DestUrlOrder();
 46  
         }
 47  0
         if (Arrays.equals(spec, new String[] { "+srcUrl" })) {
 48  0
             return new SrcUrlOrder();
 49  
         }
 50  0
         return null;
 51  
     } 
 52  
       
 53  
     public interface Processor extends Step, org.galagosearch.tupleflow.Processor<ExtractedLink> {
 54  
         public void process(ExtractedLink object) throws IOException;
 55  
         public void close() throws IOException;
 56  
     }                        
 57  
     public interface Source extends Step {
 58  
     }
 59  36
     public static class DestUrlOrder implements Order<ExtractedLink> {
 60  
         public int hash(ExtractedLink object) {
 61  0
             int h = 0;
 62  0
             h += Utility.hash(object.destUrl);
 63  0
             return h;
 64  
         } 
 65  
         public Comparator<ExtractedLink> greaterThan() {
 66  0
             return new Comparator<ExtractedLink>() {
 67  0
                 public int compare(ExtractedLink one, ExtractedLink two) {
 68  0
                     int result = 0;
 69  
                     do {
 70  0
                         result = + Utility.compare(one.destUrl, two.destUrl);
 71  0
                         if(result != 0) break;
 72  
                     } while (false);
 73  0
                     return -result;
 74  
                 }
 75  
             };
 76  
         }     
 77  
         public Comparator<ExtractedLink> lessThan() {
 78  0
             return new Comparator<ExtractedLink>() {
 79  0
                 public int compare(ExtractedLink one, ExtractedLink two) {
 80  0
                     int result = 0;
 81  
                     do {
 82  0
                         result = + Utility.compare(one.destUrl, two.destUrl);
 83  0
                         if(result != 0) break;
 84  
                     } while (false);
 85  0
                     return result;
 86  
                 }
 87  
             };
 88  
         }     
 89  
         public TypeReader<ExtractedLink> orderedReader(ArrayInput _input) {
 90  0
             return new ShreddedReader(_input);
 91  
         }    
 92  
 
 93  
         public TypeReader<ExtractedLink> orderedReader(ArrayInput _input, int bufferSize) {
 94  0
             return new ShreddedReader(_input, bufferSize);
 95  
         }    
 96  
         public OrderedWriter<ExtractedLink> orderedWriter(ArrayOutput _output) {
 97  0
             ShreddedWriter w = new ShreddedWriter(_output);
 98  0
             return new OrderedWriterClass(w); 
 99  
         }                                    
 100  0
         public static class OrderedWriterClass extends OrderedWriter< ExtractedLink > {
 101  0
             ExtractedLink last = null;
 102  0
             ShreddedWriter shreddedWriter = null; 
 103  
             
 104  0
             public OrderedWriterClass(ShreddedWriter s) {
 105  0
                 this.shreddedWriter = s;
 106  0
             }
 107  
             
 108  
             public void process(ExtractedLink object) throws IOException {
 109  0
                boolean processAll = false;
 110  0
                if (processAll || last == null || 0 != Utility.compare(object.destUrl, last.destUrl)) { processAll = true; shreddedWriter.processDestUrl(object.destUrl); }
 111  0
                shreddedWriter.processTuple(object.srcUrl, object.anchorText, object.noFollow);
 112  0
                last = object;
 113  0
             }           
 114  
                  
 115  
             public void close() throws IOException {
 116  0
                 shreddedWriter.close();
 117  0
             }
 118  
             
 119  
             public Class<ExtractedLink> getInputClass() {
 120  0
                 return ExtractedLink.class;
 121  
             }
 122  
         } 
 123  
         public ReaderSource<ExtractedLink> orderedCombiner(Collection<TypeReader<ExtractedLink>> readers, boolean closeOnExit) {
 124  0
             ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
 125  
             
 126  0
             for (TypeReader<ExtractedLink> reader : readers) {
 127  0
                 shreddedReaders.add((ShreddedReader)reader);
 128  
             }
 129  
             
 130  0
             return new ShreddedCombiner(shreddedReaders, closeOnExit);
 131  
         }                  
 132  
         public ExtractedLink clone(ExtractedLink object) {
 133  0
             ExtractedLink result = new ExtractedLink();
 134  0
             if (object == null) return result;
 135  0
             result.srcUrl = object.srcUrl; 
 136  0
             result.destUrl = object.destUrl; 
 137  0
             result.anchorText = object.anchorText; 
 138  0
             result.noFollow = object.noFollow; 
 139  0
             return result;
 140  
         }                 
 141  
         public Class<ExtractedLink> getOrderedClass() {
 142  28
             return ExtractedLink.class;
 143  
         }                           
 144  
         public String[] getOrderSpec() {
 145  28
             return new String[] {"+destUrl"};
 146  
         }
 147  
 
 148  
         public static String getSpecString() {
 149  0
             return "+destUrl";
 150  
         }
 151  
                            
 152  
         public interface ShreddedProcessor extends Step {
 153  
             public void processDestUrl(String destUrl) throws IOException;
 154  
             public void processTuple(String srcUrl, String anchorText, boolean noFollow) throws IOException;
 155  
             public void close() throws IOException;
 156  
         }    
 157  
         public interface ShreddedSource extends Step {
 158  
         }                                              
 159  
         
 160  0
         public static class ShreddedWriter implements ShreddedProcessor {
 161  
             ArrayOutput output;
 162  0
             ShreddedBuffer buffer = new ShreddedBuffer();
 163  
             String lastDestUrl;
 164  0
             boolean lastFlush = false;
 165  
             
 166  0
             public ShreddedWriter(ArrayOutput output) {
 167  0
                 this.output = output;
 168  0
             }                        
 169  
             
 170  
             public void close() throws IOException {
 171  0
                 flush();
 172  0
             }
 173  
             
 174  
             public void processDestUrl(String destUrl) {
 175  0
                 lastDestUrl = destUrl;
 176  0
                 buffer.processDestUrl(destUrl);
 177  0
             }
 178  
             public final void processTuple(String srcUrl, String anchorText, boolean noFollow) throws IOException {
 179  0
                 if (lastFlush) {
 180  0
                     if(buffer.destUrls.size() == 0) buffer.processDestUrl(lastDestUrl);
 181  0
                     lastFlush = false;
 182  
                 }
 183  0
                 buffer.processTuple(srcUrl, anchorText, noFollow);
 184  0
                 if (buffer.isFull())
 185  0
                     flush();
 186  0
             }
 187  
             public final void flushTuples(int pauseIndex) throws IOException {
 188  
                 
 189  0
                 while (buffer.getReadIndex() < pauseIndex) {
 190  
                            
 191  0
                     output.writeString(buffer.getSrcUrl());
 192  0
                     output.writeString(buffer.getAnchorText());
 193  0
                     output.writeBoolean(buffer.getNoFollow());
 194  0
                     buffer.incrementTuple();
 195  
                 }
 196  0
             }  
 197  
             public final void flushDestUrl(int pauseIndex) throws IOException {
 198  0
                 while (buffer.getReadIndex() < pauseIndex) {
 199  0
                     int nextPause = buffer.getDestUrlEndIndex();
 200  0
                     int count = nextPause - buffer.getReadIndex();
 201  
                     
 202  0
                     output.writeString(buffer.getDestUrl());
 203  0
                     output.writeInt(count);
 204  0
                     buffer.incrementDestUrl();
 205  
                       
 206  0
                     flushTuples(nextPause);
 207  0
                     assert nextPause == buffer.getReadIndex();
 208  0
                 }
 209  0
             }
 210  
             public void flush() throws IOException { 
 211  0
                 flushDestUrl(buffer.getWriteIndex());
 212  0
                 buffer.reset(); 
 213  0
                 lastFlush = true;
 214  0
             }                           
 215  
         }
 216  0
         public static class ShreddedBuffer {
 217  0
             ArrayList<String> destUrls = new ArrayList();
 218  0
             ArrayList<Integer> destUrlTupleIdx = new ArrayList();
 219  0
             int destUrlReadIdx = 0;
 220  
                             
 221  
             String[] srcUrls;
 222  
             String[] anchorTexts;
 223  
             boolean[] noFollows;
 224  0
             int writeTupleIndex = 0;
 225  0
             int readTupleIndex = 0;
 226  
             int batchSize;
 227  
 
 228  0
             public ShreddedBuffer(int batchSize) {
 229  0
                 this.batchSize = batchSize;
 230  
 
 231  0
                 srcUrls = new String[batchSize];
 232  0
                 anchorTexts = new String[batchSize];
 233  0
                 noFollows = new boolean[batchSize];
 234  0
             }                              
 235  
 
 236  
             public ShreddedBuffer() {    
 237  0
                 this(10000);
 238  0
             }                                                                                                                    
 239  
             
 240  
             public void processDestUrl(String destUrl) {
 241  0
                 destUrls.add(destUrl);
 242  0
                 destUrlTupleIdx.add(writeTupleIndex);
 243  0
             }                                      
 244  
             public void processTuple(String srcUrl, String anchorText, boolean noFollow) {
 245  0
                 assert destUrls.size() > 0;
 246  0
                 srcUrls[writeTupleIndex] = srcUrl;
 247  0
                 anchorTexts[writeTupleIndex] = anchorText;
 248  0
                 noFollows[writeTupleIndex] = noFollow;
 249  0
                 writeTupleIndex++;
 250  0
             }
 251  
             public void resetData() {
 252  0
                 destUrls.clear();
 253  0
                 destUrlTupleIdx.clear();
 254  0
                 writeTupleIndex = 0;
 255  0
             }                  
 256  
                                  
 257  
             public void resetRead() {
 258  0
                 readTupleIndex = 0;
 259  0
                 destUrlReadIdx = 0;
 260  0
             } 
 261  
 
 262  
             public void reset() {
 263  0
                 resetData();
 264  0
                 resetRead();
 265  0
             } 
 266  
             public boolean isFull() {
 267  0
                 return writeTupleIndex >= batchSize;
 268  
             }
 269  
 
 270  
             public boolean isEmpty() {
 271  0
                 return writeTupleIndex == 0;
 272  
             }                          
 273  
 
 274  
             public boolean isAtEnd() {
 275  0
                 return readTupleIndex >= writeTupleIndex;
 276  
             }           
 277  
             public void incrementDestUrl() {
 278  0
                 destUrlReadIdx++;  
 279  0
             }                                                                                              
 280  
 
 281  
             public void autoIncrementDestUrl() {
 282  0
                 while (readTupleIndex >= getDestUrlEndIndex() && readTupleIndex < writeTupleIndex)
 283  0
                     destUrlReadIdx++;
 284  0
             }                 
 285  
             public void incrementTuple() {
 286  0
                 readTupleIndex++;
 287  0
             }                    
 288  
             public int getDestUrlEndIndex() {
 289  0
                 if ((destUrlReadIdx+1) >= destUrlTupleIdx.size())
 290  0
                     return writeTupleIndex;
 291  0
                 return destUrlTupleIdx.get(destUrlReadIdx+1);
 292  
             }
 293  
             public int getReadIndex() {
 294  0
                 return readTupleIndex;
 295  
             }   
 296  
 
 297  
             public int getWriteIndex() {
 298  0
                 return writeTupleIndex;
 299  
             } 
 300  
             public String getDestUrl() {
 301  0
                 assert readTupleIndex < writeTupleIndex;
 302  0
                 assert destUrlReadIdx < destUrls.size();
 303  
                 
 304  0
                 return destUrls.get(destUrlReadIdx);
 305  
             }
 306  
             public String getSrcUrl() {
 307  0
                 assert readTupleIndex < writeTupleIndex;
 308  0
                 return srcUrls[readTupleIndex];
 309  
             }                                         
 310  
             public String getAnchorText() {
 311  0
                 assert readTupleIndex < writeTupleIndex;
 312  0
                 return anchorTexts[readTupleIndex];
 313  
             }                                         
 314  
             public boolean getNoFollow() {
 315  0
                 assert readTupleIndex < writeTupleIndex;
 316  0
                 return noFollows[readTupleIndex];
 317  
             }                                         
 318  
             public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
 319  0
                 while (getReadIndex() < endIndex) {
 320  0
                    output.processTuple(getSrcUrl(), getAnchorText(), getNoFollow());
 321  0
                    incrementTuple();
 322  
                 }
 323  0
             }                                                                           
 324  
             public void copyUntilIndexDestUrl(int endIndex, ShreddedProcessor output) throws IOException {
 325  0
                 while (getReadIndex() < endIndex) {
 326  0
                     output.processDestUrl(getDestUrl());
 327  0
                     assert getDestUrlEndIndex() <= endIndex;
 328  0
                     copyTuples(getDestUrlEndIndex(), output);
 329  0
                     incrementDestUrl();
 330  
                 }
 331  0
             }  
 332  
             public void copyUntilDestUrl(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
 333  0
                 while (!isAtEnd()) {
 334  0
                     if (other != null) {   
 335  0
                         assert !other.isAtEnd();
 336  0
                         int c = + Utility.compare(getDestUrl(), other.getDestUrl());
 337  
                     
 338  0
                         if (c > 0) {
 339  0
                             break;   
 340  
                         }
 341  
                         
 342  0
                         output.processDestUrl(getDestUrl());
 343  
                                       
 344  0
                         copyTuples(getDestUrlEndIndex(), output);
 345  0
                     } else {
 346  0
                         output.processDestUrl(getDestUrl());
 347  0
                         copyTuples(getDestUrlEndIndex(), output);
 348  
                     }
 349  0
                     incrementDestUrl();  
 350  
                     
 351  
                
 352  
                 }
 353  0
             }
 354  
             public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
 355  0
                 copyUntilDestUrl(other, output);
 356  0
             }
 357  
             
 358  
         }                         
 359  0
         public static class ShreddedCombiner implements ReaderSource<ExtractedLink>, ShreddedSource {   
 360  
             public ShreddedProcessor processor;
 361  
             Collection<ShreddedReader> readers;       
 362  0
             boolean closeOnExit = false;
 363  0
             boolean uninitialized = true;
 364  0
             PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
 365  
             
 366  0
             public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
 367  0
                 this.readers = readers;                                                       
 368  0
                 this.closeOnExit = closeOnExit;
 369  0
             }
 370  
                                   
 371  
             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
 372  0
                 if (processor instanceof ShreddedProcessor) {
 373  0
                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
 374  0
                 } else if (processor instanceof ExtractedLink.Processor) {
 375  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((ExtractedLink.Processor) processor));
 376  0
                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
 377  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<ExtractedLink>) processor));
 378  
                 } else {
 379  0
                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
 380  
                 }
 381  0
             }                                
 382  
             
 383  
             public Class<ExtractedLink> getOutputClass() {
 384  0
                 return ExtractedLink.class;
 385  
             }
 386  
             
 387  
             public void initialize() throws IOException {
 388  0
                 for (ShreddedReader reader : readers) {
 389  0
                     reader.fill();                                        
 390  
                     
 391  0
                     if (!reader.getBuffer().isAtEnd())
 392  0
                         queue.add(reader);
 393  
                 }   
 394  
 
 395  0
                 uninitialized = false;
 396  0
             }
 397  
 
 398  
             public void run() throws IOException {
 399  0
                 initialize();
 400  
                
 401  0
                 while (queue.size() > 0) {
 402  0
                     ShreddedReader top = queue.poll();
 403  0
                     ShreddedReader next = null;
 404  0
                     ShreddedBuffer nextBuffer = null; 
 405  
                     
 406  0
                     assert !top.getBuffer().isAtEnd();
 407  
                                                   
 408  0
                     if (queue.size() > 0) {
 409  0
                         next = queue.peek();
 410  0
                         nextBuffer = next.getBuffer();
 411  0
                         assert !nextBuffer.isAtEnd();
 412  
                     }
 413  
                     
 414  0
                     top.getBuffer().copyUntil(nextBuffer, processor);
 415  0
                     if (top.getBuffer().isAtEnd())
 416  0
                         top.fill();                 
 417  
                         
 418  0
                     if (!top.getBuffer().isAtEnd())
 419  0
                         queue.add(top);
 420  0
                 }              
 421  
                 
 422  0
                 if (closeOnExit)
 423  0
                     processor.close();
 424  0
             }
 425  
 
 426  
             public ExtractedLink read() throws IOException {
 427  0
                 if (uninitialized)
 428  0
                     initialize();
 429  
 
 430  0
                 ExtractedLink result = null;
 431  
 
 432  0
                 while (queue.size() > 0) {
 433  0
                     ShreddedReader top = queue.poll();
 434  0
                     result = top.read();
 435  
 
 436  0
                     if (result != null) {
 437  0
                         if (top.getBuffer().isAtEnd())
 438  0
                             top.fill();
 439  
 
 440  0
                         queue.offer(top);
 441  0
                         break;
 442  
                     } 
 443  0
                 }
 444  
 
 445  0
                 return result;
 446  
             }
 447  
         } 
 448  0
         public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<ExtractedLink>, ShreddedSource {      
 449  
             public ShreddedProcessor processor;
 450  
             ShreddedBuffer buffer;
 451  0
             ExtractedLink last = new ExtractedLink();         
 452  0
             long updateDestUrlCount = -1;
 453  0
             long tupleCount = 0;
 454  0
             long bufferStartCount = 0;  
 455  
             ArrayInput input;
 456  
             
 457  0
             public ShreddedReader(ArrayInput input) {
 458  0
                 this.input = input; 
 459  0
                 this.buffer = new ShreddedBuffer();
 460  0
             }                               
 461  
             
 462  0
             public ShreddedReader(ArrayInput input, int bufferSize) { 
 463  0
                 this.input = input;
 464  0
                 this.buffer = new ShreddedBuffer(bufferSize);
 465  0
             }
 466  
                  
 467  
             public final int compareTo(ShreddedReader other) {
 468  0
                 ShreddedBuffer otherBuffer = other.getBuffer();
 469  
                 
 470  0
                 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
 471  0
                     return 0;                 
 472  0
                 } else if (buffer.isAtEnd()) {
 473  0
                     return -1;
 474  0
                 } else if (otherBuffer.isAtEnd()) {
 475  0
                     return 1;
 476  
                 }
 477  
                                    
 478  0
                 int result = 0;
 479  
                 do {
 480  0
                     result = + Utility.compare(buffer.getDestUrl(), otherBuffer.getDestUrl());
 481  0
                     if(result != 0) break;
 482  
                 } while (false);                                             
 483  
                 
 484  0
                 return result;
 485  
             }
 486  
             
 487  
             public final ShreddedBuffer getBuffer() {
 488  0
                 return buffer;
 489  
             }                
 490  
             
 491  
             public final ExtractedLink read() throws IOException {
 492  0
                 if (buffer.isAtEnd()) {
 493  0
                     fill();             
 494  
                 
 495  0
                     if (buffer.isAtEnd()) {
 496  0
                         return null;
 497  
                     }
 498  
                 }
 499  
                       
 500  0
                 assert !buffer.isAtEnd();
 501  0
                 ExtractedLink result = new ExtractedLink();
 502  
                 
 503  0
                 result.destUrl = buffer.getDestUrl();
 504  0
                 result.srcUrl = buffer.getSrcUrl();
 505  0
                 result.anchorText = buffer.getAnchorText();
 506  0
                 result.noFollow = buffer.getNoFollow();
 507  
                 
 508  0
                 buffer.incrementTuple();
 509  0
                 buffer.autoIncrementDestUrl();
 510  
                 
 511  0
                 return result;
 512  
             }           
 513  
             
 514  
             public final void fill() throws IOException {
 515  
                 try {   
 516  0
                     buffer.reset();
 517  
                     
 518  0
                     if (tupleCount != 0) {
 519  
                                                       
 520  0
                         if(updateDestUrlCount - tupleCount > 0) {
 521  0
                             buffer.destUrls.add(last.destUrl);
 522  0
                             buffer.destUrlTupleIdx.add((int) (updateDestUrlCount - tupleCount));
 523  
                         }
 524  0
                         bufferStartCount = tupleCount;
 525  
                     }
 526  
                     
 527  0
                     while (!buffer.isFull()) {
 528  0
                         updateDestUrl();
 529  0
                         buffer.processTuple(input.readString(), input.readString(), input.readBoolean());
 530  0
                         tupleCount++;
 531  
                     }
 532  0
                 } catch(EOFException e) {}
 533  0
             }
 534  
 
 535  
             public final void updateDestUrl() throws IOException {
 536  0
                 if (updateDestUrlCount > tupleCount)
 537  0
                     return;
 538  
                      
 539  0
                 last.destUrl = input.readString();
 540  0
                 updateDestUrlCount = tupleCount + input.readInt();
 541  
                                       
 542  0
                 buffer.processDestUrl(last.destUrl);
 543  0
             }
 544  
 
 545  
             public void run() throws IOException {
 546  
                 while (true) {
 547  0
                     fill();
 548  
                     
 549  0
                     if (buffer.isAtEnd())
 550  0
                         break;
 551  
                     
 552  0
                     buffer.copyUntil(null, processor);
 553  
                 }      
 554  0
                 processor.close();
 555  0
             }
 556  
             
 557  
             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
 558  0
                 if (processor instanceof ShreddedProcessor) {
 559  0
                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
 560  0
                 } else if (processor instanceof ExtractedLink.Processor) {
 561  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((ExtractedLink.Processor) processor));
 562  0
                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
 563  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<ExtractedLink>) processor));
 564  
                 } else {
 565  0
                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
 566  
                 }
 567  0
             }                                
 568  
             
 569  
             public Class<ExtractedLink> getOutputClass() {
 570  0
                 return ExtractedLink.class;
 571  
             }                
 572  
         }
 573  
         
 574  
         public static class DuplicateEliminator implements ShreddedProcessor {
 575  
             public ShreddedProcessor processor;
 576  0
             ExtractedLink last = new ExtractedLink();
 577  0
             boolean destUrlProcess = true;
 578  
                                            
 579  0
             public DuplicateEliminator() {}
 580  0
             public DuplicateEliminator(ShreddedProcessor processor) {
 581  0
                 this.processor = processor;
 582  0
             }
 583  
             
 584  
             public void setShreddedProcessor(ShreddedProcessor processor) {
 585  0
                 this.processor = processor;
 586  0
             }
 587  
 
 588  
             public void processDestUrl(String destUrl) throws IOException {  
 589  0
                 if (destUrlProcess || Utility.compare(destUrl, last.destUrl) != 0) {
 590  0
                     last.destUrl = destUrl;
 591  0
                     processor.processDestUrl(destUrl);
 592  0
                     destUrlProcess = false;
 593  
                 }
 594  0
             }  
 595  
             
 596  
             public void resetDestUrl() {
 597  0
                  destUrlProcess = true;
 598  0
             }                                                
 599  
                                
 600  
             public void processTuple(String srcUrl, String anchorText, boolean noFollow) throws IOException {
 601  0
                 processor.processTuple(srcUrl, anchorText, noFollow);
 602  0
             } 
 603  
             
 604  
             public void close() throws IOException {
 605  0
                 processor.close();
 606  0
             }                    
 607  
         }
 608  
         public static class TupleUnshredder implements ShreddedProcessor {
 609  0
             ExtractedLink last = new ExtractedLink();
 610  
             public org.galagosearch.tupleflow.Processor<ExtractedLink> processor;                               
 611  
             
 612  0
             public TupleUnshredder(ExtractedLink.Processor processor) {
 613  0
                 this.processor = processor;
 614  0
             }         
 615  
             
 616  0
             public TupleUnshredder(org.galagosearch.tupleflow.Processor<ExtractedLink> processor) {
 617  0
                 this.processor = processor;
 618  0
             }
 619  
             
 620  
             public ExtractedLink clone(ExtractedLink object) {
 621  0
                 ExtractedLink result = new ExtractedLink();
 622  0
                 if (object == null) return result;
 623  0
                 result.srcUrl = object.srcUrl; 
 624  0
                 result.destUrl = object.destUrl; 
 625  0
                 result.anchorText = object.anchorText; 
 626  0
                 result.noFollow = object.noFollow; 
 627  0
                 return result;
 628  
             }                 
 629  
             
 630  
             public void processDestUrl(String destUrl) throws IOException {
 631  0
                 last.destUrl = destUrl;
 632  0
             }   
 633  
                 
 634  
             
 635  
             public void processTuple(String srcUrl, String anchorText, boolean noFollow) throws IOException {
 636  0
                 last.srcUrl = srcUrl;
 637  0
                 last.anchorText = anchorText;
 638  0
                 last.noFollow = noFollow;
 639  0
                 processor.process(clone(last));
 640  0
             }               
 641  
             
 642  
             public void close() throws IOException {
 643  0
                 processor.close();
 644  0
             }
 645  
         }     
 646  36
         public static class TupleShredder implements Processor {
 647  0
             ExtractedLink last = new ExtractedLink();
 648  
             public ShreddedProcessor processor;
 649  
             
 650  0
             public TupleShredder(ShreddedProcessor processor) {
 651  0
                 this.processor = processor;
 652  0
             }                              
 653  
             
 654  
             public ExtractedLink clone(ExtractedLink object) {
 655  0
                 ExtractedLink result = new ExtractedLink();
 656  0
                 if (object == null) return result;
 657  0
                 result.srcUrl = object.srcUrl; 
 658  0
                 result.destUrl = object.destUrl; 
 659  0
                 result.anchorText = object.anchorText; 
 660  0
                 result.noFollow = object.noFollow; 
 661  0
                 return result;
 662  
             }                 
 663  
             
 664  
             public void process(ExtractedLink object) throws IOException {                                                                                                                                                   
 665  0
                 boolean processAll = false;
 666  0
                 if(last == null || Utility.compare(last.destUrl, object.destUrl) != 0 || processAll) { processor.processDestUrl(object.destUrl); processAll = true; }
 667  0
                 processor.processTuple(object.srcUrl, object.anchorText, object.noFollow);                                         
 668  0
             }
 669  
                           
 670  
             public Class<ExtractedLink> getInputClass() {
 671  0
                 return ExtractedLink.class;
 672  
             }
 673  
             
 674  
             public void close() throws IOException {
 675  0
                 processor.close();
 676  0
             }                     
 677  
         }
 678  
     } 
 679  0
     public static class SrcUrlOrder implements Order<ExtractedLink> {
 680  
         public int hash(ExtractedLink object) {
 681  0
             int h = 0;
 682  0
             h += Utility.hash(object.srcUrl);
 683  0
             return h;
 684  
         } 
 685  
         public Comparator<ExtractedLink> greaterThan() {
 686  0
             return new Comparator<ExtractedLink>() {
 687  0
                 public int compare(ExtractedLink one, ExtractedLink two) {
 688  0
                     int result = 0;
 689  
                     do {
 690  0
                         result = + Utility.compare(one.srcUrl, two.srcUrl);
 691  0
                         if(result != 0) break;
 692  
                     } while (false);
 693  0
                     return -result;
 694  
                 }
 695  
             };
 696  
         }     
 697  
         public Comparator<ExtractedLink> lessThan() {
 698  0
             return new Comparator<ExtractedLink>() {
 699  0
                 public int compare(ExtractedLink one, ExtractedLink two) {
 700  0
                     int result = 0;
 701  
                     do {
 702  0
                         result = + Utility.compare(one.srcUrl, two.srcUrl);
 703  0
                         if(result != 0) break;
 704  
                     } while (false);
 705  0
                     return result;
 706  
                 }
 707  
             };
 708  
         }     
 709  
         public TypeReader<ExtractedLink> orderedReader(ArrayInput _input) {
 710  0
             return new ShreddedReader(_input);
 711  
         }    
 712  
 
 713  
         public TypeReader<ExtractedLink> orderedReader(ArrayInput _input, int bufferSize) {
 714  0
             return new ShreddedReader(_input, bufferSize);
 715  
         }    
 716  
         public OrderedWriter<ExtractedLink> orderedWriter(ArrayOutput _output) {
 717  0
             ShreddedWriter w = new ShreddedWriter(_output);
 718  0
             return new OrderedWriterClass(w); 
 719  
         }                                    
 720  0
         public static class OrderedWriterClass extends OrderedWriter< ExtractedLink > {
 721  0
             ExtractedLink last = null;
 722  0
             ShreddedWriter shreddedWriter = null; 
 723  
             
 724  0
             public OrderedWriterClass(ShreddedWriter s) {
 725  0
                 this.shreddedWriter = s;
 726  0
             }
 727  
             
 728  
             public void process(ExtractedLink object) throws IOException {
 729  0
                boolean processAll = false;
 730  0
                if (processAll || last == null || 0 != Utility.compare(object.srcUrl, last.srcUrl)) { processAll = true; shreddedWriter.processSrcUrl(object.srcUrl); }
 731  0
                shreddedWriter.processTuple(object.destUrl, object.anchorText, object.noFollow);
 732  0
                last = object;
 733  0
             }           
 734  
                  
 735  
             public void close() throws IOException {
 736  0
                 shreddedWriter.close();
 737  0
             }
 738  
             
 739  
             public Class<ExtractedLink> getInputClass() {
 740  0
                 return ExtractedLink.class;
 741  
             }
 742  
         } 
 743  
         public ReaderSource<ExtractedLink> orderedCombiner(Collection<TypeReader<ExtractedLink>> readers, boolean closeOnExit) {
 744  0
             ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
 745  
             
 746  0
             for (TypeReader<ExtractedLink> reader : readers) {
 747  0
                 shreddedReaders.add((ShreddedReader)reader);
 748  
             }
 749  
             
 750  0
             return new ShreddedCombiner(shreddedReaders, closeOnExit);
 751  
         }                  
 752  
         public ExtractedLink clone(ExtractedLink object) {
 753  0
             ExtractedLink result = new ExtractedLink();
 754  0
             if (object == null) return result;
 755  0
             result.srcUrl = object.srcUrl; 
 756  0
             result.destUrl = object.destUrl; 
 757  0
             result.anchorText = object.anchorText; 
 758  0
             result.noFollow = object.noFollow; 
 759  0
             return result;
 760  
         }                 
 761  
         public Class<ExtractedLink> getOrderedClass() {
 762  0
             return ExtractedLink.class;
 763  
         }                           
 764  
         public String[] getOrderSpec() {
 765  0
             return new String[] {"+srcUrl"};
 766  
         }
 767  
 
 768  
         public static String getSpecString() {
 769  0
             return "+srcUrl";
 770  
         }
 771  
                            
 772  
         public interface ShreddedProcessor extends Step {
 773  
             public void processSrcUrl(String srcUrl) throws IOException;
 774  
             public void processTuple(String destUrl, String anchorText, boolean noFollow) throws IOException;
 775  
             public void close() throws IOException;
 776  
         }    
 777  
         public interface ShreddedSource extends Step {
 778  
         }                                              
 779  
         
 780  0
         public static class ShreddedWriter implements ShreddedProcessor {
 781  
             ArrayOutput output;
 782  0
             ShreddedBuffer buffer = new ShreddedBuffer();
 783  
             String lastSrcUrl;
 784  0
             boolean lastFlush = false;
 785  
             
 786  0
             public ShreddedWriter(ArrayOutput output) {
 787  0
                 this.output = output;
 788  0
             }                        
 789  
             
 790  
             public void close() throws IOException {
 791  0
                 flush();
 792  0
             }
 793  
             
 794  
             public void processSrcUrl(String srcUrl) {
 795  0
                 lastSrcUrl = srcUrl;
 796  0
                 buffer.processSrcUrl(srcUrl);
 797  0
             }
 798  
             public final void processTuple(String destUrl, String anchorText, boolean noFollow) throws IOException {
 799  0
                 if (lastFlush) {
 800  0
                     if(buffer.srcUrls.size() == 0) buffer.processSrcUrl(lastSrcUrl);
 801  0
                     lastFlush = false;
 802  
                 }
 803  0
                 buffer.processTuple(destUrl, anchorText, noFollow);
 804  0
                 if (buffer.isFull())
 805  0
                     flush();
 806  0
             }
 807  
             public final void flushTuples(int pauseIndex) throws IOException {
 808  
                 
 809  0
                 while (buffer.getReadIndex() < pauseIndex) {
 810  
                            
 811  0
                     output.writeString(buffer.getDestUrl());
 812  0
                     output.writeString(buffer.getAnchorText());
 813  0
                     output.writeBoolean(buffer.getNoFollow());
 814  0
                     buffer.incrementTuple();
 815  
                 }
 816  0
             }  
 817  
             public final void flushSrcUrl(int pauseIndex) throws IOException {
 818  0
                 while (buffer.getReadIndex() < pauseIndex) {
 819  0
                     int nextPause = buffer.getSrcUrlEndIndex();
 820  0
                     int count = nextPause - buffer.getReadIndex();
 821  
                     
 822  0
                     output.writeString(buffer.getSrcUrl());
 823  0
                     output.writeInt(count);
 824  0
                     buffer.incrementSrcUrl();
 825  
                       
 826  0
                     flushTuples(nextPause);
 827  0
                     assert nextPause == buffer.getReadIndex();
 828  0
                 }
 829  0
             }
 830  
             public void flush() throws IOException { 
 831  0
                 flushSrcUrl(buffer.getWriteIndex());
 832  0
                 buffer.reset(); 
 833  0
                 lastFlush = true;
 834  0
             }                           
 835  
         }
 836  0
         public static class ShreddedBuffer {
 837  0
             ArrayList<String> srcUrls = new ArrayList();
 838  0
             ArrayList<Integer> srcUrlTupleIdx = new ArrayList();
 839  0
             int srcUrlReadIdx = 0;
 840  
                             
 841  
             String[] destUrls;
 842  
             String[] anchorTexts;
 843  
             boolean[] noFollows;
 844  0
             int writeTupleIndex = 0;
 845  0
             int readTupleIndex = 0;
 846  
             int batchSize;
 847  
 
 848  0
             public ShreddedBuffer(int batchSize) {
 849  0
                 this.batchSize = batchSize;
 850  
 
 851  0
                 destUrls = new String[batchSize];
 852  0
                 anchorTexts = new String[batchSize];
 853  0
                 noFollows = new boolean[batchSize];
 854  0
             }                              
 855  
 
 856  
             public ShreddedBuffer() {    
 857  0
                 this(10000);
 858  0
             }                                                                                                                    
 859  
             
 860  
             public void processSrcUrl(String srcUrl) {
 861  0
                 srcUrls.add(srcUrl);
 862  0
                 srcUrlTupleIdx.add(writeTupleIndex);
 863  0
             }                                      
 864  
             public void processTuple(String destUrl, String anchorText, boolean noFollow) {
 865  0
                 assert srcUrls.size() > 0;
 866  0
                 destUrls[writeTupleIndex] = destUrl;
 867  0
                 anchorTexts[writeTupleIndex] = anchorText;
 868  0
                 noFollows[writeTupleIndex] = noFollow;
 869  0
                 writeTupleIndex++;
 870  0
             }
 871  
             public void resetData() {
 872  0
                 srcUrls.clear();
 873  0
                 srcUrlTupleIdx.clear();
 874  0
                 writeTupleIndex = 0;
 875  0
             }                  
 876  
                                  
 877  
             public void resetRead() {
 878  0
                 readTupleIndex = 0;
 879  0
                 srcUrlReadIdx = 0;
 880  0
             } 
 881  
 
 882  
             public void reset() {
 883  0
                 resetData();
 884  0
                 resetRead();
 885  0
             } 
 886  
             public boolean isFull() {
 887  0
                 return writeTupleIndex >= batchSize;
 888  
             }
 889  
 
 890  
             public boolean isEmpty() {
 891  0
                 return writeTupleIndex == 0;
 892  
             }                          
 893  
 
 894  
             public boolean isAtEnd() {
 895  0
                 return readTupleIndex >= writeTupleIndex;
 896  
             }           
 897  
             public void incrementSrcUrl() {
 898  0
                 srcUrlReadIdx++;  
 899  0
             }                                                                                              
 900  
 
 901  
             public void autoIncrementSrcUrl() {
 902  0
                 while (readTupleIndex >= getSrcUrlEndIndex() && readTupleIndex < writeTupleIndex)
 903  0
                     srcUrlReadIdx++;
 904  0
             }                 
 905  
             public void incrementTuple() {
 906  0
                 readTupleIndex++;
 907  0
             }                    
 908  
             public int getSrcUrlEndIndex() {
 909  0
                 if ((srcUrlReadIdx+1) >= srcUrlTupleIdx.size())
 910  0
                     return writeTupleIndex;
 911  0
                 return srcUrlTupleIdx.get(srcUrlReadIdx+1);
 912  
             }
 913  
             public int getReadIndex() {
 914  0
                 return readTupleIndex;
 915  
             }   
 916  
 
 917  
             public int getWriteIndex() {
 918  0
                 return writeTupleIndex;
 919  
             } 
 920  
             public String getSrcUrl() {
 921  0
                 assert readTupleIndex < writeTupleIndex;
 922  0
                 assert srcUrlReadIdx < srcUrls.size();
 923  
                 
 924  0
                 return srcUrls.get(srcUrlReadIdx);
 925  
             }
 926  
             public String getDestUrl() {
 927  0
                 assert readTupleIndex < writeTupleIndex;
 928  0
                 return destUrls[readTupleIndex];
 929  
             }                                         
 930  
             public String getAnchorText() {
 931  0
                 assert readTupleIndex < writeTupleIndex;
 932  0
                 return anchorTexts[readTupleIndex];
 933  
             }                                         
 934  
             public boolean getNoFollow() {
 935  0
                 assert readTupleIndex < writeTupleIndex;
 936  0
                 return noFollows[readTupleIndex];
 937  
             }                                         
 938  
             public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
 939  0
                 while (getReadIndex() < endIndex) {
 940  0
                    output.processTuple(getDestUrl(), getAnchorText(), getNoFollow());
 941  0
                    incrementTuple();
 942  
                 }
 943  0
             }                                                                           
 944  
             public void copyUntilIndexSrcUrl(int endIndex, ShreddedProcessor output) throws IOException {
 945  0
                 while (getReadIndex() < endIndex) {
 946  0
                     output.processSrcUrl(getSrcUrl());
 947  0
                     assert getSrcUrlEndIndex() <= endIndex;
 948  0
                     copyTuples(getSrcUrlEndIndex(), output);
 949  0
                     incrementSrcUrl();
 950  
                 }
 951  0
             }  
 952  
             public void copyUntilSrcUrl(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
 953  0
                 while (!isAtEnd()) {
 954  0
                     if (other != null) {   
 955  0
                         assert !other.isAtEnd();
 956  0
                         int c = + Utility.compare(getSrcUrl(), other.getSrcUrl());
 957  
                     
 958  0
                         if (c > 0) {
 959  0
                             break;   
 960  
                         }
 961  
                         
 962  0
                         output.processSrcUrl(getSrcUrl());
 963  
                                       
 964  0
                         copyTuples(getSrcUrlEndIndex(), output);
 965  0
                     } else {
 966  0
                         output.processSrcUrl(getSrcUrl());
 967  0
                         copyTuples(getSrcUrlEndIndex(), output);
 968  
                     }
 969  0
                     incrementSrcUrl();  
 970  
                     
 971  
                
 972  
                 }
 973  0
             }
 974  
             public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
 975  0
                 copyUntilSrcUrl(other, output);
 976  0
             }
 977  
             
 978  
         }                         
 979  0
         public static class ShreddedCombiner implements ReaderSource<ExtractedLink>, ShreddedSource {   
 980  
             public ShreddedProcessor processor;
 981  
             Collection<ShreddedReader> readers;       
 982  0
             boolean closeOnExit = false;
 983  0
             boolean uninitialized = true;
 984  0
             PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
 985  
             
 986  0
             public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
 987  0
                 this.readers = readers;                                                       
 988  0
                 this.closeOnExit = closeOnExit;
 989  0
             }
 990  
                                   
 991  
             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
 992  0
                 if (processor instanceof ShreddedProcessor) {
 993  0
                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
 994  0
                 } else if (processor instanceof ExtractedLink.Processor) {
 995  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((ExtractedLink.Processor) processor));
 996  0
                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
 997  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<ExtractedLink>) processor));
 998  
                 } else {
 999  0
                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
 1000  
                 }
 1001  0
             }                                
 1002  
             
 1003  
             public Class<ExtractedLink> getOutputClass() {
 1004  0
                 return ExtractedLink.class;
 1005  
             }
 1006  
             
 1007  
             public void initialize() throws IOException {
 1008  0
                 for (ShreddedReader reader : readers) {
 1009  0
                     reader.fill();                                        
 1010  
                     
 1011  0
                     if (!reader.getBuffer().isAtEnd())
 1012  0
                         queue.add(reader);
 1013  
                 }   
 1014  
 
 1015  0
                 uninitialized = false;
 1016  0
             }
 1017  
 
 1018  
             public void run() throws IOException {
 1019  0
                 initialize();
 1020  
                
 1021  0
                 while (queue.size() > 0) {
 1022  0
                     ShreddedReader top = queue.poll();
 1023  0
                     ShreddedReader next = null;
 1024  0
                     ShreddedBuffer nextBuffer = null; 
 1025  
                     
 1026  0
                     assert !top.getBuffer().isAtEnd();
 1027  
                                                   
 1028  0
                     if (queue.size() > 0) {
 1029  0
                         next = queue.peek();
 1030  0
                         nextBuffer = next.getBuffer();
 1031  0
                         assert !nextBuffer.isAtEnd();
 1032  
                     }
 1033  
                     
 1034  0
                     top.getBuffer().copyUntil(nextBuffer, processor);
 1035  0
                     if (top.getBuffer().isAtEnd())
 1036  0
                         top.fill();                 
 1037  
                         
 1038  0
                     if (!top.getBuffer().isAtEnd())
 1039  0
                         queue.add(top);
 1040  0
                 }              
 1041  
                 
 1042  0
                 if (closeOnExit)
 1043  0
                     processor.close();
 1044  0
             }
 1045  
 
 1046  
             public ExtractedLink read() throws IOException {
 1047  0
                 if (uninitialized)
 1048  0
                     initialize();
 1049  
 
 1050  0
                 ExtractedLink result = null;
 1051  
 
 1052  0
                 while (queue.size() > 0) {
 1053  0
                     ShreddedReader top = queue.poll();
 1054  0
                     result = top.read();
 1055  
 
 1056  0
                     if (result != null) {
 1057  0
                         if (top.getBuffer().isAtEnd())
 1058  0
                             top.fill();
 1059  
 
 1060  0
                         queue.offer(top);
 1061  0
                         break;
 1062  
                     } 
 1063  0
                 }
 1064  
 
 1065  0
                 return result;
 1066  
             }
 1067  
         } 
 1068  0
         public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<ExtractedLink>, ShreddedSource {      
 1069  
             public ShreddedProcessor processor;
 1070  
             ShreddedBuffer buffer;
 1071  0
             ExtractedLink last = new ExtractedLink();         
 1072  0
             long updateSrcUrlCount = -1;
 1073  0
             long tupleCount = 0;
 1074  0
             long bufferStartCount = 0;  
 1075  
             ArrayInput input;
 1076  
             
 1077  0
             public ShreddedReader(ArrayInput input) {
 1078  0
                 this.input = input; 
 1079  0
                 this.buffer = new ShreddedBuffer();
 1080  0
             }                               
 1081  
             
 1082  0
             public ShreddedReader(ArrayInput input, int bufferSize) { 
 1083  0
                 this.input = input;
 1084  0
                 this.buffer = new ShreddedBuffer(bufferSize);
 1085  0
             }
 1086  
                  
 1087  
             public final int compareTo(ShreddedReader other) {
 1088  0
                 ShreddedBuffer otherBuffer = other.getBuffer();
 1089  
                 
 1090  0
                 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
 1091  0
                     return 0;                 
 1092  0
                 } else if (buffer.isAtEnd()) {
 1093  0
                     return -1;
 1094  0
                 } else if (otherBuffer.isAtEnd()) {
 1095  0
                     return 1;
 1096  
                 }
 1097  
                                    
 1098  0
                 int result = 0;
 1099  
                 do {
 1100  0
                     result = + Utility.compare(buffer.getSrcUrl(), otherBuffer.getSrcUrl());
 1101  0
                     if(result != 0) break;
 1102  
                 } while (false);                                             
 1103  
                 
 1104  0
                 return result;
 1105  
             }
 1106  
             
 1107  
             public final ShreddedBuffer getBuffer() {
 1108  0
                 return buffer;
 1109  
             }                
 1110  
             
 1111  
             public final ExtractedLink read() throws IOException {
 1112  0
                 if (buffer.isAtEnd()) {
 1113  0
                     fill();             
 1114  
                 
 1115  0
                     if (buffer.isAtEnd()) {
 1116  0
                         return null;
 1117  
                     }
 1118  
                 }
 1119  
                       
 1120  0
                 assert !buffer.isAtEnd();
 1121  0
                 ExtractedLink result = new ExtractedLink();
 1122  
                 
 1123  0
                 result.srcUrl = buffer.getSrcUrl();
 1124  0
                 result.destUrl = buffer.getDestUrl();
 1125  0
                 result.anchorText = buffer.getAnchorText();
 1126  0
                 result.noFollow = buffer.getNoFollow();
 1127  
                 
 1128  0
                 buffer.incrementTuple();
 1129  0
                 buffer.autoIncrementSrcUrl();
 1130  
                 
 1131  0
                 return result;
 1132  
             }           
 1133  
             
 1134  
             public final void fill() throws IOException {
 1135  
                 try {   
 1136  0
                     buffer.reset();
 1137  
                     
 1138  0
                     if (tupleCount != 0) {
 1139  
                                                       
 1140  0
                         if(updateSrcUrlCount - tupleCount > 0) {
 1141  0
                             buffer.srcUrls.add(last.srcUrl);
 1142  0
                             buffer.srcUrlTupleIdx.add((int) (updateSrcUrlCount - tupleCount));
 1143  
                         }
 1144  0
                         bufferStartCount = tupleCount;
 1145  
                     }
 1146  
                     
 1147  0
                     while (!buffer.isFull()) {
 1148  0
                         updateSrcUrl();
 1149  0
                         buffer.processTuple(input.readString(), input.readString(), input.readBoolean());
 1150  0
                         tupleCount++;
 1151  
                     }
 1152  0
                 } catch(EOFException e) {}
 1153  0
             }
 1154  
 
 1155  
             public final void updateSrcUrl() throws IOException {
 1156  0
                 if (updateSrcUrlCount > tupleCount)
 1157  0
                     return;
 1158  
                      
 1159  0
                 last.srcUrl = input.readString();
 1160  0
                 updateSrcUrlCount = tupleCount + input.readInt();
 1161  
                                       
 1162  0
                 buffer.processSrcUrl(last.srcUrl);
 1163  0
             }
 1164  
 
 1165  
             public void run() throws IOException {
 1166  
                 while (true) {
 1167  0
                     fill();
 1168  
                     
 1169  0
                     if (buffer.isAtEnd())
 1170  0
                         break;
 1171  
                     
 1172  0
                     buffer.copyUntil(null, processor);
 1173  
                 }      
 1174  0
                 processor.close();
 1175  0
             }
 1176  
             
 1177  
             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
 1178  0
                 if (processor instanceof ShreddedProcessor) {
 1179  0
                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
 1180  0
                 } else if (processor instanceof ExtractedLink.Processor) {
 1181  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((ExtractedLink.Processor) processor));
 1182  0
                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
 1183  0
                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<ExtractedLink>) processor));
 1184  
                 } else {
 1185  0
                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
 1186  
                 }
 1187  0
             }                                
 1188  
             
 1189  
             public Class<ExtractedLink> getOutputClass() {
 1190  0
                 return ExtractedLink.class;
 1191  
             }                
 1192  
         }
 1193  
         
 1194  
         public static class DuplicateEliminator implements ShreddedProcessor {
 1195  
             public ShreddedProcessor processor;
 1196  0
             ExtractedLink last = new ExtractedLink();
 1197  0
             boolean srcUrlProcess = true;
 1198  
                                            
 1199  0
             public DuplicateEliminator() {}
 1200  0
             public DuplicateEliminator(ShreddedProcessor processor) {
 1201  0
                 this.processor = processor;
 1202  0
             }
 1203  
             
 1204  
             public void setShreddedProcessor(ShreddedProcessor processor) {
 1205  0
                 this.processor = processor;
 1206  0
             }
 1207  
 
 1208  
             public void processSrcUrl(String srcUrl) throws IOException {  
 1209  0
                 if (srcUrlProcess || Utility.compare(srcUrl, last.srcUrl) != 0) {
 1210  0
                     last.srcUrl = srcUrl;
 1211  0
                     processor.processSrcUrl(srcUrl);
 1212  0
                     srcUrlProcess = false;
 1213  
                 }
 1214  0
             }  
 1215  
             
 1216  
             public void resetSrcUrl() {
 1217  0
                  srcUrlProcess = true;
 1218  0
             }                                                
 1219  
                                
 1220  
             public void processTuple(String destUrl, String anchorText, boolean noFollow) throws IOException {
 1221  0
                 processor.processTuple(destUrl, anchorText, noFollow);
 1222  0
             } 
 1223  
             
 1224  
             public void close() throws IOException {
 1225  0
                 processor.close();
 1226  0
             }                    
 1227  
         }
 1228  
         public static class TupleUnshredder implements ShreddedProcessor {
 1229  0
             ExtractedLink last = new ExtractedLink();
 1230  
             public org.galagosearch.tupleflow.Processor<ExtractedLink> processor;                               
 1231  
             
 1232  0
             public TupleUnshredder(ExtractedLink.Processor processor) {
 1233  0
                 this.processor = processor;
 1234  0
             }         
 1235  
             
 1236  0
             public TupleUnshredder(org.galagosearch.tupleflow.Processor<ExtractedLink> processor) {
 1237  0
                 this.processor = processor;
 1238  0
             }
 1239  
             
 1240  
             public ExtractedLink clone(ExtractedLink object) {
 1241  0
                 ExtractedLink result = new ExtractedLink();
 1242  0
                 if (object == null) return result;
 1243  0
                 result.srcUrl = object.srcUrl; 
 1244  0
                 result.destUrl = object.destUrl; 
 1245  0
                 result.anchorText = object.anchorText; 
 1246  0
                 result.noFollow = object.noFollow; 
 1247  0
                 return result;
 1248  
             }                 
 1249  
             
 1250  
             public void processSrcUrl(String srcUrl) throws IOException {
 1251  0
                 last.srcUrl = srcUrl;
 1252  0
             }   
 1253  
                 
 1254  
             
 1255  
             public void processTuple(String destUrl, String anchorText, boolean noFollow) throws IOException {
 1256  0
                 last.destUrl = destUrl;
 1257  0
                 last.anchorText = anchorText;
 1258  0
                 last.noFollow = noFollow;
 1259  0
                 processor.process(clone(last));
 1260  0
             }               
 1261  
             
 1262  
             public void close() throws IOException {
 1263  0
                 processor.close();
 1264  0
             }
 1265  
         }     
 1266  0
         public static class TupleShredder implements Processor {
 1267  0
             ExtractedLink last = new ExtractedLink();
 1268  
             public ShreddedProcessor processor;
 1269  
             
 1270  0
             public TupleShredder(ShreddedProcessor processor) {
 1271  0
                 this.processor = processor;
 1272  0
             }                              
 1273  
             
 1274  
             public ExtractedLink clone(ExtractedLink object) {
 1275  0
                 ExtractedLink result = new ExtractedLink();
 1276  0
                 if (object == null) return result;
 1277  0
                 result.srcUrl = object.srcUrl; 
 1278  0
                 result.destUrl = object.destUrl; 
 1279  0
                 result.anchorText = object.anchorText; 
 1280  0
                 result.noFollow = object.noFollow; 
 1281  0
                 return result;
 1282  
             }                 
 1283  
             
 1284  
             public void process(ExtractedLink object) throws IOException {                                                                                                                                                   
 1285  0
                 boolean processAll = false;
 1286  0
                 if(last == null || Utility.compare(last.srcUrl, object.srcUrl) != 0 || processAll) { processor.processSrcUrl(object.srcUrl); processAll = true; }
 1287  0
                 processor.processTuple(object.destUrl, object.anchorText, object.noFollow);                                         
 1288  0
             }
 1289  
                           
 1290  
             public Class<ExtractedLink> getInputClass() {
 1291  0
                 return ExtractedLink.class;
 1292  
             }
 1293  
             
 1294  
             public void close() throws IOException {
 1295  0
                 processor.close();
 1296  0
             }                     
 1297  
         }
 1298  
     } 
 1299  
 }