1
2
3 package org.galagosearch.tupleflow.types;
4
5 import org.galagosearch.tupleflow.Utility;
6 import org.galagosearch.tupleflow.ArrayInput;
7 import org.galagosearch.tupleflow.ArrayOutput;
8 import org.galagosearch.tupleflow.Order;
9 import org.galagosearch.tupleflow.OrderedWriter;
10 import org.galagosearch.tupleflow.Type;
11 import org.galagosearch.tupleflow.TypeReader;
12 import org.galagosearch.tupleflow.Step;
13 import org.galagosearch.tupleflow.IncompatibleProcessorException;
14 import org.galagosearch.tupleflow.ReaderSource;
15 import java.io.IOException;
16 import java.io.EOFException;
17 import java.io.UnsupportedEncodingException;
18 import java.util.ArrayList;
19 import java.util.Arrays;
20 import java.util.Comparator;
21 import java.util.PriorityQueue;
22 import java.util.Collection;
23
24 public class XMLFragment implements Type<XMLFragment> {
25 public String nodePath;
26 public String innerText;
27
28 public XMLFragment() {}
/**
 * Creates a fragment holding the given values.
 *
 * @param nodePath value stored in the {@code nodePath} field
 * @param innerText value stored in the {@code innerText} field
 */
public XMLFragment(String nodePath, String innerText) {
    this.innerText = innerText;
    this.nodePath = nodePath;
}
33
34 public String toString() {
35 return String.format("%s,%s",
36 nodePath, innerText);
37 }
38
39 public Order<XMLFragment> getOrder(String... spec) {
40 if (Arrays.equals(spec, new String[] { "+nodePath" })) {
41 return new NodePathOrder();
42 }
43 return null;
44 }
45
46 public interface Processor extends Step, org.galagosearch.tupleflow.Processor<XMLFragment> {
47 public void process(XMLFragment object) throws IOException;
48 public void close() throws IOException;
49 }
50 public interface Source extends Step {
51 }
52 public static class NodePathOrder implements Order<XMLFragment> {
53 public int hash(XMLFragment object) {
54 int h = 0;
55 h += Utility.hash(object.nodePath);
56 return h;
57 }
58 public Comparator<XMLFragment> greaterThan() {
59 return new Comparator<XMLFragment>() {
60 public int compare(XMLFragment one, XMLFragment two) {
61 int result = 0;
62 do {
63 result = + Utility.compare(one.nodePath, two.nodePath);
64 if(result != 0) break;
65 } while (false);
66 return -result;
67 }
68 };
69 }
70 public Comparator<XMLFragment> lessThan() {
71 return new Comparator<XMLFragment>() {
72 public int compare(XMLFragment one, XMLFragment two) {
73 int result = 0;
74 do {
75 result = + Utility.compare(one.nodePath, two.nodePath);
76 if(result != 0) break;
77 } while (false);
78 return result;
79 }
80 };
81 }
82 public TypeReader<XMLFragment> orderedReader(ArrayInput _input) {
83 return new ShreddedReader(_input);
84 }
85
86 public TypeReader<XMLFragment> orderedReader(ArrayInput _input, int bufferSize) {
87 return new ShreddedReader(_input, bufferSize);
88 }
89 public OrderedWriter<XMLFragment> orderedWriter(ArrayOutput _output) {
90 ShreddedWriter w = new ShreddedWriter(_output);
91 return new OrderedWriterClass(w);
92 }
93 public static class OrderedWriterClass extends OrderedWriter< XMLFragment > {
94 XMLFragment last = null;
95 ShreddedWriter shreddedWriter = null;
96
97 public OrderedWriterClass(ShreddedWriter s) {
98 this.shreddedWriter = s;
99 }
100
101 public void process(XMLFragment object) throws IOException {
102 boolean processAll = false;
103 if (processAll || last == null || 0 != Utility.compare(object.nodePath, last.nodePath)) { processAll = true; shreddedWriter.processNodePath(object.nodePath); }
104 shreddedWriter.processTuple(object.innerText);
105 last = object;
106 }
107
108 public void close() throws IOException {
109 shreddedWriter.close();
110 }
111
112 public Class<XMLFragment> getInputClass() {
113 return XMLFragment.class;
114 }
115 }
116 public ReaderSource<XMLFragment> orderedCombiner(Collection<TypeReader<XMLFragment>> readers, boolean closeOnExit) {
117 ArrayList<ShreddedReader> shreddedReaders = new ArrayList();
118
119 for (TypeReader<XMLFragment> reader : readers) {
120 shreddedReaders.add((ShreddedReader)reader);
121 }
122
123 return new ShreddedCombiner(shreddedReaders, closeOnExit);
124 }
125 public XMLFragment clone(XMLFragment object) {
126 XMLFragment result = new XMLFragment();
127 if (object == null) return result;
128 result.nodePath = object.nodePath;
129 result.innerText = object.innerText;
130 return result;
131 }
132 public Class<XMLFragment> getOrderedClass() {
133 return XMLFragment.class;
134 }
135 public String[] getOrderSpec() {
136 return new String[] {"+nodePath"};
137 }
138
139 public static String getSpecString() {
140 return "+nodePath";
141 }
142
143 public interface ShreddedProcessor extends Step {
144 public void processNodePath(String nodePath) throws IOException;
145 public void processTuple(String innerText) throws IOException;
146 public void close() throws IOException;
147 }
148 public interface ShreddedSource extends Step {
149 }
150
151 public static class ShreddedWriter implements ShreddedProcessor {
152 ArrayOutput output;
153 ShreddedBuffer buffer = new ShreddedBuffer();
154 String lastNodePath;
155 boolean lastFlush = false;
156
157 public ShreddedWriter(ArrayOutput output) {
158 this.output = output;
159 }
160
161 public void close() throws IOException {
162 flush();
163 }
164
165 public void processNodePath(String nodePath) {
166 lastNodePath = nodePath;
167 buffer.processNodePath(nodePath);
168 }
169 public final void processTuple(String innerText) throws IOException {
170 if (lastFlush) {
171 if(buffer.nodePaths.size() == 0) buffer.processNodePath(lastNodePath);
172 lastFlush = false;
173 }
174 buffer.processTuple(innerText);
175 if (buffer.isFull())
176 flush();
177 }
178 public final void flushTuples(int pauseIndex) throws IOException {
179
180 while (buffer.getReadIndex() < pauseIndex) {
181
182 output.writeString(buffer.getInnerText());
183 buffer.incrementTuple();
184 }
185 }
186 public final void flushNodePath(int pauseIndex) throws IOException {
187 while (buffer.getReadIndex() < pauseIndex) {
188 int nextPause = buffer.getNodePathEndIndex();
189 int count = nextPause - buffer.getReadIndex();
190
191 output.writeString(buffer.getNodePath());
192 output.writeInt(count);
193 buffer.incrementNodePath();
194
195 flushTuples(nextPause);
196 assert nextPause == buffer.getReadIndex();
197 }
198 }
199 public void flush() throws IOException {
200 flushNodePath(buffer.getWriteIndex());
201 buffer.reset();
202 lastFlush = true;
203 }
204 }
205 public static class ShreddedBuffer {
206 ArrayList<String> nodePaths = new ArrayList();
207 ArrayList<Integer> nodePathTupleIdx = new ArrayList();
208 int nodePathReadIdx = 0;
209
210 String[] innerTexts;
211 int writeTupleIndex = 0;
212 int readTupleIndex = 0;
213 int batchSize;
214
215 public ShreddedBuffer(int batchSize) {
216 this.batchSize = batchSize;
217
218 innerTexts = new String[batchSize];
219 }
220
221 public ShreddedBuffer() {
222 this(10000);
223 }
224
225 public void processNodePath(String nodePath) {
226 nodePaths.add(nodePath);
227 nodePathTupleIdx.add(writeTupleIndex);
228 }
229 public void processTuple(String innerText) {
230 assert nodePaths.size() > 0;
231 innerTexts[writeTupleIndex] = innerText;
232 writeTupleIndex++;
233 }
234 public void resetData() {
235 nodePaths.clear();
236 nodePathTupleIdx.clear();
237 writeTupleIndex = 0;
238 }
239
240 public void resetRead() {
241 readTupleIndex = 0;
242 nodePathReadIdx = 0;
243 }
244
245 public void reset() {
246 resetData();
247 resetRead();
248 }
249 public boolean isFull() {
250 return writeTupleIndex >= batchSize;
251 }
252
253 public boolean isEmpty() {
254 return writeTupleIndex == 0;
255 }
256
257 public boolean isAtEnd() {
258 return readTupleIndex >= writeTupleIndex;
259 }
260 public void incrementNodePath() {
261 nodePathReadIdx++;
262 }
263
264 public void autoIncrementNodePath() {
265 while (readTupleIndex >= getNodePathEndIndex() && readTupleIndex < writeTupleIndex)
266 nodePathReadIdx++;
267 }
268 public void incrementTuple() {
269 readTupleIndex++;
270 }
271 public int getNodePathEndIndex() {
272 if ((nodePathReadIdx+1) >= nodePathTupleIdx.size())
273 return writeTupleIndex;
274 return nodePathTupleIdx.get(nodePathReadIdx+1);
275 }
276 public int getReadIndex() {
277 return readTupleIndex;
278 }
279
280 public int getWriteIndex() {
281 return writeTupleIndex;
282 }
283 public String getNodePath() {
284 assert readTupleIndex < writeTupleIndex;
285 assert nodePathReadIdx < nodePaths.size();
286
287 return nodePaths.get(nodePathReadIdx);
288 }
289 public String getInnerText() {
290 assert readTupleIndex < writeTupleIndex;
291 return innerTexts[readTupleIndex];
292 }
293 public void copyTuples(int endIndex, ShreddedProcessor output) throws IOException {
294 while (getReadIndex() < endIndex) {
295 output.processTuple(getInnerText());
296 incrementTuple();
297 }
298 }
299 public void copyUntilIndexNodePath(int endIndex, ShreddedProcessor output) throws IOException {
300 while (getReadIndex() < endIndex) {
301 output.processNodePath(getNodePath());
302 assert getNodePathEndIndex() <= endIndex;
303 copyTuples(getNodePathEndIndex(), output);
304 incrementNodePath();
305 }
306 }
307 public void copyUntilNodePath(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
308 while (!isAtEnd()) {
309 if (other != null) {
310 assert !other.isAtEnd();
311 int c = + Utility.compare(getNodePath(), other.getNodePath());
312
313 if (c > 0) {
314 break;
315 }
316
317 output.processNodePath(getNodePath());
318
319 copyTuples(getNodePathEndIndex(), output);
320 } else {
321 output.processNodePath(getNodePath());
322 copyTuples(getNodePathEndIndex(), output);
323 }
324 incrementNodePath();
325
326
327 }
328 }
329 public void copyUntil(ShreddedBuffer other, ShreddedProcessor output) throws IOException {
330 copyUntilNodePath(other, output);
331 }
332
333 }
334 public static class ShreddedCombiner implements ReaderSource<XMLFragment>, ShreddedSource {
335 public ShreddedProcessor processor;
336 Collection<ShreddedReader> readers;
337 boolean closeOnExit = false;
338 boolean uninitialized = true;
339 PriorityQueue<ShreddedReader> queue = new PriorityQueue<ShreddedReader>();
340
341 public ShreddedCombiner(Collection<ShreddedReader> readers, boolean closeOnExit) {
342 this.readers = readers;
343 this.closeOnExit = closeOnExit;
344 }
345
346 public void setProcessor(Step processor) throws IncompatibleProcessorException {
347 if (processor instanceof ShreddedProcessor) {
348 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
349 } else if (processor instanceof XMLFragment.Processor) {
350 this.processor = new DuplicateEliminator(new TupleUnshredder((XMLFragment.Processor) processor));
351 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
352 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<XMLFragment>) processor));
353 } else {
354 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
355 }
356 }
357
358 public Class<XMLFragment> getOutputClass() {
359 return XMLFragment.class;
360 }
361
362 public void initialize() throws IOException {
363 for (ShreddedReader reader : readers) {
364 reader.fill();
365
366 if (!reader.getBuffer().isAtEnd())
367 queue.add(reader);
368 }
369
370 uninitialized = false;
371 }
372
373 public void run() throws IOException {
374 initialize();
375
376 while (queue.size() > 0) {
377 ShreddedReader top = queue.poll();
378 ShreddedReader next = null;
379 ShreddedBuffer nextBuffer = null;
380
381 assert !top.getBuffer().isAtEnd();
382
383 if (queue.size() > 0) {
384 next = queue.peek();
385 nextBuffer = next.getBuffer();
386 assert !nextBuffer.isAtEnd();
387 }
388
389 top.getBuffer().copyUntil(nextBuffer, processor);
390 if (top.getBuffer().isAtEnd())
391 top.fill();
392
393 if (!top.getBuffer().isAtEnd())
394 queue.add(top);
395 }
396
397 if (closeOnExit)
398 processor.close();
399 }
400
401 public XMLFragment read() throws IOException {
402 if (uninitialized)
403 initialize();
404
405 XMLFragment result = null;
406
407 while (queue.size() > 0) {
408 ShreddedReader top = queue.poll();
409 result = top.read();
410
411 if (result != null) {
412 if (top.getBuffer().isAtEnd())
413 top.fill();
414
415 queue.offer(top);
416 break;
417 }
418 }
419
420 return result;
421 }
422 }
423 public static class ShreddedReader implements Step, Comparable<ShreddedReader>, TypeReader<XMLFragment>, ShreddedSource {
424 public ShreddedProcessor processor;
425 ShreddedBuffer buffer;
426 XMLFragment last = new XMLFragment();
427 long updateNodePathCount = -1;
428 long tupleCount = 0;
429 long bufferStartCount = 0;
430 ArrayInput input;
431
432 public ShreddedReader(ArrayInput input) {
433 this.input = input;
434 this.buffer = new ShreddedBuffer();
435 }
436
437 public ShreddedReader(ArrayInput input, int bufferSize) {
438 this.input = input;
439 this.buffer = new ShreddedBuffer(bufferSize);
440 }
441
442 public final int compareTo(ShreddedReader other) {
443 ShreddedBuffer otherBuffer = other.getBuffer();
444
445 if (buffer.isAtEnd() && otherBuffer.isAtEnd()) {
446 return 0;
447 } else if (buffer.isAtEnd()) {
448 return -1;
449 } else if (otherBuffer.isAtEnd()) {
450 return 1;
451 }
452
453 int result = 0;
454 do {
455 result = + Utility.compare(buffer.getNodePath(), otherBuffer.getNodePath());
456 if(result != 0) break;
457 } while (false);
458
459 return result;
460 }
461
462 public final ShreddedBuffer getBuffer() {
463 return buffer;
464 }
465
466 public final XMLFragment read() throws IOException {
467 if (buffer.isAtEnd()) {
468 fill();
469
470 if (buffer.isAtEnd()) {
471 return null;
472 }
473 }
474
475 assert !buffer.isAtEnd();
476 XMLFragment result = new XMLFragment();
477
478 result.nodePath = buffer.getNodePath();
479 result.innerText = buffer.getInnerText();
480
481 buffer.incrementTuple();
482 buffer.autoIncrementNodePath();
483
484 return result;
485 }
486
487 public final void fill() throws IOException {
488 try {
489 buffer.reset();
490
491 if (tupleCount != 0) {
492
493 if(updateNodePathCount - tupleCount > 0) {
494 buffer.nodePaths.add(last.nodePath);
495 buffer.nodePathTupleIdx.add((int) (updateNodePathCount - tupleCount));
496 }
497 bufferStartCount = tupleCount;
498 }
499
500 while (!buffer.isFull()) {
501 updateNodePath();
502 buffer.processTuple(input.readString());
503 tupleCount++;
504 }
505 } catch(EOFException e) {}
506 }
507
508 public final void updateNodePath() throws IOException {
509 if (updateNodePathCount > tupleCount)
510 return;
511
512 last.nodePath = input.readString();
513 updateNodePathCount = tupleCount + input.readInt();
514
515 buffer.processNodePath(last.nodePath);
516 }
517
518 public void run() throws IOException {
519 while (true) {
520 fill();
521
522 if (buffer.isAtEnd())
523 break;
524
525 buffer.copyUntil(null, processor);
526 }
527 processor.close();
528 }
529
530 public void setProcessor(Step processor) throws IncompatibleProcessorException {
531 if (processor instanceof ShreddedProcessor) {
532 this.processor = new DuplicateEliminator((ShreddedProcessor) processor);
533 } else if (processor instanceof XMLFragment.Processor) {
534 this.processor = new DuplicateEliminator(new TupleUnshredder((XMLFragment.Processor) processor));
535 } else if (processor instanceof org.galagosearch.tupleflow.Processor) {
536 this.processor = new DuplicateEliminator(new TupleUnshredder((org.galagosearch.tupleflow.Processor<XMLFragment>) processor));
537 } else {
538 throw new IncompatibleProcessorException(processor.getClass().getName() + " is not supported by " + this.getClass().getName());
539 }
540 }
541
542 public Class<XMLFragment> getOutputClass() {
543 return XMLFragment.class;
544 }
545 }
546
547 public static class DuplicateEliminator implements ShreddedProcessor {
548 public ShreddedProcessor processor;
549 XMLFragment last = new XMLFragment();
550 boolean nodePathProcess = true;
551
552 public DuplicateEliminator() {}
553 public DuplicateEliminator(ShreddedProcessor processor) {
554 this.processor = processor;
555 }
556
557 public void setShreddedProcessor(ShreddedProcessor processor) {
558 this.processor = processor;
559 }
560
561 public void processNodePath(String nodePath) throws IOException {
562 if (nodePathProcess || Utility.compare(nodePath, last.nodePath) != 0) {
563 last.nodePath = nodePath;
564 processor.processNodePath(nodePath);
565 nodePathProcess = false;
566 }
567 }
568
569 public void resetNodePath() {
570 nodePathProcess = true;
571 }
572
573 public void processTuple(String innerText) throws IOException {
574 processor.processTuple(innerText);
575 }
576
577 public void close() throws IOException {
578 processor.close();
579 }
580 }
581 public static class TupleUnshredder implements ShreddedProcessor {
582 XMLFragment last = new XMLFragment();
583 public org.galagosearch.tupleflow.Processor<XMLFragment> processor;
584
585 public TupleUnshredder(XMLFragment.Processor processor) {
586 this.processor = processor;
587 }
588
589 public TupleUnshredder(org.galagosearch.tupleflow.Processor<XMLFragment> processor) {
590 this.processor = processor;
591 }
592
593 public XMLFragment clone(XMLFragment object) {
594 XMLFragment result = new XMLFragment();
595 if (object == null) return result;
596 result.nodePath = object.nodePath;
597 result.innerText = object.innerText;
598 return result;
599 }
600
601 public void processNodePath(String nodePath) throws IOException {
602 last.nodePath = nodePath;
603 }
604
605
606 public void processTuple(String innerText) throws IOException {
607 last.innerText = innerText;
608 processor.process(clone(last));
609 }
610
611 public void close() throws IOException {
612 processor.close();
613 }
614 }
615 public static class TupleShredder implements Processor {
616 XMLFragment last = new XMLFragment();
617 public ShreddedProcessor processor;
618
619 public TupleShredder(ShreddedProcessor processor) {
620 this.processor = processor;
621 }
622
623 public XMLFragment clone(XMLFragment object) {
624 XMLFragment result = new XMLFragment();
625 if (object == null) return result;
626 result.nodePath = object.nodePath;
627 result.innerText = object.innerText;
628 return result;
629 }
630
631 public void process(XMLFragment object) throws IOException {
632 boolean processAll = false;
633 if(last == null || Utility.compare(last.nodePath, object.nodePath) != 0 || processAll) { processor.processNodePath(object.nodePath); processAll = true; }
634 processor.processTuple(object.innerText);
635 }
636
637 public Class<XMLFragment> getInputClass() {
638 return XMLFragment.class;
639 }
640
641 public void close() throws IOException {
642 processor.close();
643 }
644 }
645 }
646 }