View Javadoc

1   // This file was automatically generated with the command: 
2   //     java org.galagosearch.tupleflow.typebuilder.TypeBuilderMojo ...
3   package org.galagosearch.core.types;
4   
5   import org.galagosearch.tupleflow.Utility;
6   import org.galagosearch.tupleflow.ArrayInput;
7   import org.galagosearch.tupleflow.ArrayOutput;
8   import org.galagosearch.tupleflow.Order;   
9   import org.galagosearch.tupleflow.OrderedWriter;
10  import org.galagosearch.tupleflow.Type; 
11  import org.galagosearch.tupleflow.TypeReader;
12  import org.galagosearch.tupleflow.Step; 
13  import org.galagosearch.tupleflow.IncompatibleProcessorException;
14  import org.galagosearch.tupleflow.ReaderSource;
15  import java.io.IOException;             
16  import java.io.EOFException;
17  import java.io.UnsupportedEncodingException;
18  import java.util.ArrayList;
19  import java.util.Arrays;   
20  import java.util.Comparator;
21  import java.util.PriorityQueue;
22  import java.util.Collection;
23  
24  public class KeyValuePair implements Type<KeyValuePair> {
25      public byte[] key;
26      public byte[] value; 
27      
28      public KeyValuePair() {}
29      public KeyValuePair(byte[] key, byte[] value) {
30          this.key = key;
31          this.value = value;
32      }  
33      
34      public String toString() {
35          try {
36              return String.format("%s,%s",
37                                     new String(key, "UTF-8"), new String(value, "UTF-8"));
38          } catch(UnsupportedEncodingException e) {
39              throw new RuntimeException("Couldn't convert string to UTF-8.");
40          }
41      } 
42  
43      public Order<KeyValuePair> getOrder(String... spec) {
44          if (Arrays.equals(spec, new String[] { "+key" })) {
45              return new KeyOrder();
46          }
47          return null;
48      } 
49        
50      public interface Processor extends Step, org.galagosearch.tupleflow.Processor<KeyValuePair> {
51          public void process(KeyValuePair object) throws IOException;
52          public void close() throws IOException;
53      }                        
54      public interface Source extends Step {
55      }
56      public static class KeyOrder implements Order<KeyValuePair> {
57          public int hash(KeyValuePair object) {
58              int h = 0;
59              h += Utility.hash(object.key);
60              return h;
61          } 
62          public Comparator<KeyValuePair> greaterThan() {
63              return new Comparator<KeyValuePair>() {
64                  public int compare(KeyValuePair one, KeyValuePair two) {
65                      int result = 0;
66                      do {
67                          result = + Utility.compare(one.key, two.key);
68                          if(result != 0) break;
69                      } while (false);
70                      return -result;
71                  }
72              };
73          }     
74          public Comparator<KeyValuePair> lessThan() {
75              return new Comparator<KeyValuePair>() {
76                  public int compare(KeyValuePair one, KeyValuePair two) {
77                      int result = 0;
78                      do {
79                          result = + Utility.compare(one.key, two.key);
80                          if(result != 0) break;
81                      } while (false);
82                      return result;
83                  }
84              };
85          }     
86          public TypeReader<KeyValuePair> orderedReader(ArrayInput _input) {
87              return new ShreddedReader(_input);
88          }    
89  
90          public TypeReader<KeyValuePair> orderedReader(ArrayInput _input, int bufferSize) {
91              return new ShreddedReader(_input, bufferSize);
92          }    
93          public OrderedWriter<KeyValuePair> orderedWriter(ArrayOutput _output) {
94              ShreddedWriter w = new ShreddedWriter(_output);
95              return new OrderedWriterClass(w); 
96          }                                    
97          public static class OrderedWriterClass extends OrderedWriter< KeyValuePair > {
98              KeyValuePair last = null;
99              ShreddedWriter shreddedWriter = null; 
100             
101             public OrderedWriterClass(ShreddedWriter s) {
102                 this.shreddedWriter = s;
103             }
104             
105             public void process(KeyValuePair object) throws IOException {
106                boolean processAll = false;
107                if (processAll || last == null || 0 != Utility.compare(object.key, last.key)) { processAll = true; shreddedWriter.processKey(object.key); }
108                shreddedWriter.processTuple(object.value);
109                last = object;
110             }           
111                  
112             public void close() throws IOException {
113                 shreddedWriter.close();
114             }
115             
116             public Class<KeyValuePair> getInputClass() {
117                 return KeyValuePair.class;
118             }
119         } 
120         public ReaderSource<KeyValuePair> orderedCombiner(Collection<TypeReader<KeyValuePair>> readers, boolean closeOnExit) {
121             ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
122             
123             for (TypeReader<KeyValuePair> reader : readers) {
124                 shreddedReaders.add((ShreddedReader)reader);
125             }
126             
127             return new ShreddedCombiner(shreddedReaders, closeOnExit);
128         }                  
129         public KeyValuePair clone(KeyValuePair object) {
130             KeyValuePair result = new KeyValuePair();
131             if (object == null) return result;
132             result.key = object.key; 
133             result.value = object.value; 
134             return result;
135         }                 
136         public Class<KeyValuePair> getOrderedClass() {
137             return KeyValuePair.class;
138         }                           
139         public String[] getOrderSpec() {
140             return new String[] {"+key"};
141         }
142 
143         public static String getSpecString() {
144             return "+key";
145         }
146                            
147         public interface ShreddedProcessor extends Step {
148             public void processKey(byte[] key) throws IOException;
149             public void processTuple(byte[] value) throws IOException;
150             public void close() throws IOException;
151         }    
152         public interface ShreddedSource extends Step {
153         }                                              
154         
155         public static class ShreddedWriter implements ShreddedProcessor {
156             ArrayOutput output;
157             ShreddedBuffer buffer = new ShreddedBuffer();
158             byte[] lastKey;
159             boolean lastFlush = false;
160             
161             public ShreddedWriter(ArrayOutput output) {
162                 this.output = output;
163             }                        
164             
165             public void close() throws IOException {
166                 flush();
167             }
168             
169             public void processKey(byte[] key) {
170                 lastKey = key;
171                 buffer.processKey(key);
172             }
173             public final void processTuple(byte[] value) throws IOException {
174                 if (lastFlush) {
175                     if(buffer.keys.size() == 0) buffer.processKey(lastKey);
176                     lastFlush = false;
177                 }
178                 buffer.processTuple(value);
179                 if (buffer.isFull())
180                     flush();
181             }
182             public final void flushTuples(int pauseIndex) throws IOException {
183                 
184                 while (buffer.getReadIndex() < pauseIndex) {
185                            
186                     output.writeBytes(buffer.getValue());
187                     buffer.incrementTuple();
188                 }
189             }  
190             public final void flushKey(int pauseIndex) throws IOException {
191                 while (buffer.getReadIndex() < pauseIndex) {
192                     int nextPause = buffer.getKeyEndIndex();
193                     int count = nextPause - buffer.getReadIndex();
194                     
195                     output.writeBytes(buffer.getKey());
196                     output.writeInt(count);
197                     buffer.incrementKey();
198                       
199                     flushTuples(nextPause);
200                     assert nextPause == buffer.getReadIndex();
201                 }
202             }
203             public void flush() throws IOException { 
204                 flushKey(buffer.getWriteIndex());
205                 buffer.reset(); 
206                 lastFlush = true;
207             }                           
208         }
209         public static class ShreddedBuffer {
210             ArrayList<byte[]> keys = new ArrayList();
211             ArrayList<Integer> keyTupleIdx = new ArrayList();
212             int keyReadIdx = 0;
213                             
214             byte[][] values;
215             int writeTupleIndex = 0;
216             int readTupleIndex = 0;
217             int batchSize;
218 
219             public ShreddedBuffer(int batchSize) {
220                 this.batchSize = batchSize;
221 
222                 values = new byte[batchSize][];
223             }                              
224 
225             public ShreddedBuffer() {    
226                 this(10000);
227             }                                                                                                                    
228             
229             public void processKey(byte[] key) {
230                 keys.add(key);
231                 keyTupleIdx.add(writeTupleIndex);
232             }                                      
233             public void processTuple(byte[] value) {
234                 assert keys.size() > 0;
235                 values[writeTupleIndex] = value;
236                 writeTupleIndex++;
237             }
238             public void resetData() {
239                 keys.clear();
240                 keyTupleIdx.clear();
241                 writeTupleIndex = 0;
242             }                  
243                                  
244             public void resetRead() {
245                 readTupleIndex = 0;
246                 keyReadIdx = 0;
247             } 
248 
249             public void reset() {
250                 resetData();
251                 resetRead();
252             } 
253             public boolean isFull() {
254                 return writeTupleIndex >= batchSize;
255             }
256 
257             public boolean isEmpty() {
258                 return writeTupleIndex == 0;
259             }                          
260 
261             public boolean isAtEnd() {
262                 return readTupleIndex >= writeTupleIndex;
263             }           
264             public void incrementKey() {
265                 keyReadIdx++;  
266             }                                                                                              
267 
268             public void autoIncrementKey() {
269                 while (readTupleIndex >= getKeyEndIndex() && readTupleIndex < writeTupleIndex)
270                     keyReadIdx++;
271             }                 
272             public void incrementTuple() {
273                 readTupleIndex++;
274             }                    
275             public int getKeyEndIndex() {
276                 if ((keyReadIdx+1) >= keyTupleIdx.size())
277                     return writeTupleIndex;
278                 return keyTupleIdx.get(keyReadIdx+1);
279             }
280             public int getReadIndex() {
281                 return readTupleIndex;
282             }   
283 
284             public int getWriteIndex() {
285                 return writeTupleIndex;
286             } 
287             public byte[] getKey() {
288                 assert readTupleIndex < writeTupleIndex;
289                 assert keyReadIdx < keys.size();
290                 
291                 return keys.get(keyReadIdx);
292             }
293             public byte[] getValue() {
294                 assert readTupleIndex < writeTupleIndex;
295                 return values[readTupleIndex];
296             }                                         
297             public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
298                 while (getReadIndex() < endIndex) {
299                    output.processTuple(getValue());
300                    incrementTuple();
301                 }
302             }                                                                           
303             public void copyUntilIndexKey(int endIndex, ShreddedProcessor output) throws IOException {
304                 while (getReadIndex() < endIndex) {
305                     output.processKey(getKey());
306                     assert getKeyEndIndex() <= endIndex;
307                     copyTuples(getKeyEndIndex(), output);
308                     incrementKey();
309                 }
310             }  
311             public void copyUntilKey(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
312                 while (!isAtEnd()) {
313                     if (other != null) {   
314                         assert !other.isAtEnd();
315                         int c = + Utility.compare(getKey(), other.getKey());
316                     
317                         if (c > 0) {
318                             break;   
319                         }
320                         
321                         output.processKey(getKey());
322                                       
323                         copyTuples(getKeyEndIndex(), output);
324                     } else {
325                         output.processKey(getKey());
326                         copyTuples(getKeyEndIndex(), output);
327                     }
328                     incrementKey();  
329                     
330                
331                 }
332             }
333             public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
334                 copyUntilKey(other, output);
335             }
336             
337         }                         
338         public static class ShreddedCombiner implements ReaderSource<KeyValuePair>, ShreddedSource {   
339             public ShreddedProcessor processor;
340             Collection<ShreddedReader> readers;       
341             boolean closeOnExit = false;
342             boolean uninitialized = true;
343             PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
344             
345             public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
346                 this.readers = readers;                                                       
347                 this.closeOnExit = closeOnExit;
348             }
349                                   
350             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
351                 if (processor instanceof ShreddedProcessor) {
352                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
353                 } else if (processor instanceof KeyValuePair.Processor) {
354                     this.processor = new DuplicateEliminator(new TupleUnshredder((KeyValuePair.Processor) processor));
355                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
356                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<KeyValuePair>) processor));
357                 } else {
358                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
359                 }
360             }                                
361             
362             public Class<KeyValuePair> getOutputClass() {
363                 return KeyValuePair.class;
364             }
365             
366             public void initialize() throws IOException {
367                 for (ShreddedReader reader : readers) {
368                     reader.fill();                                        
369                     
370                     if (!reader.getBuffer().isAtEnd())
371                         queue.add(reader);
372                 }   
373 
374                 uninitialized = false;
375             }
376 
377             public void run() throws IOException {
378                 initialize();
379                
380                 while (queue.size() > 0) {
381                     ShreddedReader top = queue.poll();
382                     ShreddedReader next = null;
383                     ShreddedBuffer nextBuffer = null; 
384                     
385                     assert !top.getBuffer().isAtEnd();
386                                                   
387                     if (queue.size() > 0) {
388                         next = queue.peek();
389                         nextBuffer = next.getBuffer();
390                         assert !nextBuffer.isAtEnd();
391                     }
392                     
393                     top.getBuffer().copyUntil(nextBuffer, processor);
394                     if (top.getBuffer().isAtEnd())
395                         top.fill();                 
396                         
397                     if (!top.getBuffer().isAtEnd())
398                         queue.add(top);
399                 }              
400                 
401                 if (closeOnExit)
402                     processor.close();
403             }
404 
405             public KeyValuePair read() throws IOException {
406                 if (uninitialized)
407                     initialize();
408 
409                 KeyValuePair result = null;
410 
411                 while (queue.size() > 0) {
412                     ShreddedReader top = queue.poll();
413                     result = top.read();
414 
415                     if (result != null) {
416                         if (top.getBuffer().isAtEnd())
417                             top.fill();
418 
419                         queue.offer(top);
420                         break;
421                     } 
422                 }
423 
424                 return result;
425             }
426         } 
427         public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<KeyValuePair>, ShreddedSource {      
428             public ShreddedProcessor processor;
429             ShreddedBuffer buffer;
430             KeyValuePair last = new KeyValuePair();         
431             long updateKeyCount = -1;
432             long tupleCount = 0;
433             long bufferStartCount = 0;  
434             ArrayInput input;
435             
436             public ShreddedReader(ArrayInput input) {
437                 this.input = input; 
438                 this.buffer = new ShreddedBuffer();
439             }                               
440             
441             public ShreddedReader(ArrayInput input, int bufferSize) { 
442                 this.input = input;
443                 this.buffer = new ShreddedBuffer(bufferSize);
444             }
445                  
446             public final int compareTo(ShreddedReader other) {
447                 ShreddedBuffer otherBuffer = other.getBuffer();
448                 
449                 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
450                     return 0;                 
451                 } else if (buffer.isAtEnd()) {
452                     return -1;
453                 } else if (otherBuffer.isAtEnd()) {
454                     return 1;
455                 }
456                                    
457                 int result = 0;
458                 do {
459                     result = + Utility.compare(buffer.getKey(), otherBuffer.getKey());
460                     if(result != 0) break;
461                 } while (false);                                             
462                 
463                 return result;
464             }
465             
466             public final ShreddedBuffer getBuffer() {
467                 return buffer;
468             }                
469             
470             public final KeyValuePair read() throws IOException {
471                 if (buffer.isAtEnd()) {
472                     fill();             
473                 
474                     if (buffer.isAtEnd()) {
475                         return null;
476                     }
477                 }
478                       
479                 assert !buffer.isAtEnd();
480                 KeyValuePair result = new KeyValuePair();
481                 
482                 result.key = buffer.getKey();
483                 result.value = buffer.getValue();
484                 
485                 buffer.incrementTuple();
486                 buffer.autoIncrementKey();
487                 
488                 return result;
489             }           
490             
491             public final void fill() throws IOException {
492                 try {   
493                     buffer.reset();
494                     
495                     if (tupleCount != 0) {
496                                                       
497                         if(updateKeyCount - tupleCount > 0) {
498                             buffer.keys.add(last.key);
499                             buffer.keyTupleIdx.add((int) (updateKeyCount - tupleCount));
500                         }
501                         bufferStartCount = tupleCount;
502                     }
503                     
504                     while (!buffer.isFull()) {
505                         updateKey();
506                         buffer.processTuple(input.readBytes());
507                         tupleCount++;
508                     }
509                 } catch(EOFException e) {}
510             }
511 
512             public final void updateKey() throws IOException {
513                 if (updateKeyCount > tupleCount)
514                     return;
515                      
516                 last.key = input.readBytes();
517                 updateKeyCount = tupleCount + input.readInt();
518                                       
519                 buffer.processKey(last.key);
520             }
521 
522             public void run() throws IOException {
523                 while (true) {
524                     fill();
525                     
526                     if (buffer.isAtEnd())
527                         break;
528                     
529                     buffer.copyUntil(null, processor);
530                 }      
531                 processor.close();
532             }
533             
534             public void setProcessor(Step processor) throws IncompatibleProcessorException {  
535                 if (processor instanceof ShreddedProcessor) {
536                     this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
537                 } else if (processor instanceof KeyValuePair.Processor) {
538                     this.processor = new DuplicateEliminator(new TupleUnshredder((KeyValuePair.Processor) processor));
539                 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
540                     this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<KeyValuePair>) processor));
541                 } else {
542                     throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());                                                                       
543                 }
544             }                                
545             
546             public Class<KeyValuePair> getOutputClass() {
547                 return KeyValuePair.class;
548             }                
549         }
550         
551         public static class DuplicateEliminator implements ShreddedProcessor {
552             public ShreddedProcessor processor;
553             KeyValuePair last = new KeyValuePair();
554             boolean keyProcess = true;
555                                            
556             public DuplicateEliminator() {}
557             public DuplicateEliminator(ShreddedProcessor processor) {
558                 this.processor = processor;
559             }
560             
561             public void setShreddedProcessor(ShreddedProcessor processor) {
562                 this.processor = processor;
563             }
564 
565             public void processKey(byte[] key) throws IOException {  
566                 if (keyProcess || Utility.compare(key, last.key) != 0) {
567                     last.key = key;
568                     processor.processKey(key);
569                     keyProcess = false;
570                 }
571             }  
572             
573             public void resetKey() {
574                  keyProcess = true;
575             }                                                
576                                
577             public void processTuple(byte[] value) throws IOException {
578                 processor.processTuple(value);
579             } 
580             
581             public void close() throws IOException {
582                 processor.close();
583             }                    
584         }
585         public static class TupleUnshredder implements ShreddedProcessor {
586             KeyValuePair last = new KeyValuePair();
587             public org.galagosearch.tupleflow.Processor<KeyValuePair> processor;                               
588             
589             public TupleUnshredder(KeyValuePair.Processor processor) {
590                 this.processor = processor;
591             }         
592             
593             public TupleUnshredder(org.galagosearch.tupleflow.Processor<KeyValuePair> processor) {
594                 this.processor = processor;
595             }
596             
597             public KeyValuePair clone(KeyValuePair object) {
598                 KeyValuePair result = new KeyValuePair();
599                 if (object == null) return result;
600                 result.key = object.key; 
601                 result.value = object.value; 
602                 return result;
603             }                 
604             
605             public void processKey(byte[] key) throws IOException {
606                 last.key = key;
607             }   
608                 
609             
610             public void processTuple(byte[] value) throws IOException {
611                 last.value = value;
612                 processor.process(clone(last));
613             }               
614             
615             public void close() throws IOException {
616                 processor.close();
617             }
618         }     
619         public static class TupleShredder implements Processor {
620             KeyValuePair last = new KeyValuePair();
621             public ShreddedProcessor processor;
622             
623             public TupleShredder(ShreddedProcessor processor) {
624                 this.processor = processor;
625             }                              
626             
627             public KeyValuePair clone(KeyValuePair object) {
628                 KeyValuePair result = new KeyValuePair();
629                 if (object == null) return result;
630                 result.key = object.key; 
631                 result.value = object.value; 
632                 return result;
633             }                 
634             
635             public void process(KeyValuePair object) throws IOException {                                                                                                                                                   
636                 boolean processAll = false;
637                 if(last == null || Utility.compare(last.key, object.key) != 0 || processAll) { processor.processKey(object.key); processAll = true; }
638                 processor.processTuple(object.value);                                         
639             }
640                           
641             public Class<KeyValuePair> getInputClass() {
642                 return KeyValuePair.class;
643             }
644             
645             public void close() throws IOException {
646                 processor.close();
647             }                     
648         }
649     } 
650 }