1
2
3 package org.galagosearch.core.parse;
4
5 import java.io.File;
6 import java.io.IOException;
7 import java.util.ArrayList;
8 import java.util.List;
9 import org.galagosearch.core.index.VocabularyReader;
10 import org.galagosearch.core.index.VocabularyReader.TermSlot;
11 import org.galagosearch.core.index.IndexReader;
12 import org.galagosearch.tupleflow.ExNihiloSource;
13 import org.galagosearch.tupleflow.FileSource;
14 import org.galagosearch.tupleflow.IncompatibleProcessorException;
15 import org.galagosearch.tupleflow.Linkage;
16 import org.galagosearch.tupleflow.OutputClass;
17 import org.galagosearch.tupleflow.Parameters.Value;
18 import org.galagosearch.tupleflow.Processor;
19 import org.galagosearch.tupleflow.Step;
20 import org.galagosearch.tupleflow.TupleFlowParameters;
21 import org.galagosearch.tupleflow.execution.ErrorHandler;
22 import org.galagosearch.tupleflow.execution.Verified;
23 import org.galagosearch.core.types.DocumentSplit;
24
25 /***
26 * From a set of inputs, splits the input into many DocumentSplit records.
27 * This will usually be in a stage by itself at the beginning of a Galago pipeline.
28 * This is somewhat similar to FileSource, except that it can autodetect file formats.
29 * This splitter can detect ARC, TREC, TRECWEB and corpus files.
30 *
31 * @author trevor
32 */
33
34 @Verified
35 @OutputClass(className = "org.galagosearch.core.types.DocumentSplit")
36 public class DocumentSource implements ExNihiloSource<DocumentSplit> {
37 public Processor processor;
38 TupleFlowParameters parameters;
39
40 public DocumentSource(TupleFlowParameters parameters) {
41 this.parameters = parameters;
42 }
43
44 private String getExtension(String fileName) {
45 String[] fields = fileName.split("//.");
46
47
48 if (fields.length <= 1) {
49 return "";
50 }
51
52
53
54
55 if (fields[fields.length-1].equals("gz")) {
56 if (fields.length > 2) {
57 return fields[fields.length-2];
58 } else {
59 return "";
60 }
61 }
62
63
64 return fields[fields.length-1];
65 }
66
67 private void processCorpusFile(String fileName, String fileType) throws IOException {
68
69 long fileLength = new File(fileName).length();
70 long chunkSize = 100 * 1024 * 1024;
71
72 IndexReader reader = new IndexReader(fileName);
73 VocabularyReader vocabulary = reader.getVocabulary();
74 List<TermSlot> slots = vocabulary.getSlots();
75 int pieces = Math.max(1, (int) (fileLength / chunkSize));
76 ArrayList<byte[]> keys = new ArrayList<byte[]>();
77
78 for (int i = 1; i < pieces; ++i) {
79 float fraction = (float) i / pieces;
80 int slot = (int) (fraction * slots.size());
81 keys.add(slots.get(slot).termData);
82 }
83
84 for (int i = 0; i < pieces; ++i) {
85 byte[] firstKey = new byte[0];
86 byte[] lastKey = new byte[0];
87
88 if (i > 0) {
89 firstKey = keys.get(i - 1);
90 }
91 if (i < pieces - 1) {
92 lastKey = keys.get(i);
93 }
94 DocumentSplit split = new DocumentSplit(fileName, fileType, false, firstKey, lastKey);
95 processor.process(split);
96 }
97 }
98
99 private void processFile(String fileName) throws IOException {
100
101 boolean isCompressed = fileName.endsWith(".gz");
102 String fileType = null;
103
104
105 String extension = getExtension(fileName);
106 if (extension.equals("corpus") ||
107 extension.equals("trecweb") ||
108 extension.equals("trectext") ||
109 extension.equals("arc") ||
110 extension.equals("txt") ||
111 extension.equals("html") ||
112 extension.equals("xml")) {
113 fileType = extension;
114 } else {
115
116 if (IndexReader.isIndexFile(fileName)) {
117 fileType = "corpus";
118 } else {
119
120 throw new IOException("Couldn't determine file type for: " + fileName);
121 }
122 }
123
124 if (fileType.equals("corpus")) {
125 processCorpusFile(fileName, fileType);
126 } else {
127 processSplit(fileName, fileType, isCompressed);
128 }
129 }
130
131 private void processDirectory(File root) throws IOException {
132 for (File file : root.listFiles()) {
133 if (file.isHidden()) {
134 continue;
135 }
136 if (file.isDirectory()) {
137 processDirectory(file);
138 } else {
139 processFile(file.getAbsolutePath());
140 }
141 }
142 }
143
144 public void run() throws IOException {
145 if (parameters.getXML().containsKey("directory")) {
146 List<Value> directories = parameters.getXML().list("directory");
147
148 for (Value directory : directories) {
149 File directoryFile = new File(directory.toString());
150 processDirectory(directoryFile);
151 }
152 } else if (parameters.getXML().containsKey("filename")) {
153 List<Value> files = parameters.getXML().list("filename");
154
155 for (Value file : files) {
156 processFile(file.toString());
157 }
158 }
159
160 processor.close();
161 }
162
163 public void setProcessor(Step processor) throws IncompatibleProcessorException {
164 Linkage.link(this, processor);
165 }
166
167 public static void verify(TupleFlowParameters parameters, ErrorHandler handler) {
168 FileSource.verify(parameters, handler);
169 }
170
171 private void processSplit(String fileName, String fileType, boolean isCompressed) throws IOException {
172 DocumentSplit split = new DocumentSplit(fileName, fileType, isCompressed, new byte[0], new byte[0]);
173 processor.process(split);
174 }
175 }