View Javadoc

1   // BSD License (http://www.galagosearch.org/license)
2   
3   package org.galagosearch.core.parse;
4   
5   import java.io.File;
6   import java.io.IOException;
7   import java.util.ArrayList;
8   import java.util.List;
9   import org.galagosearch.core.index.VocabularyReader;
10  import org.galagosearch.core.index.VocabularyReader.TermSlot;
11  import org.galagosearch.core.index.IndexReader;
12  import org.galagosearch.tupleflow.ExNihiloSource;
13  import org.galagosearch.tupleflow.FileSource;
14  import org.galagosearch.tupleflow.IncompatibleProcessorException;
15  import org.galagosearch.tupleflow.Linkage;
16  import org.galagosearch.tupleflow.OutputClass;
17  import org.galagosearch.tupleflow.Parameters.Value;
18  import org.galagosearch.tupleflow.Processor;
19  import org.galagosearch.tupleflow.Step;
20  import org.galagosearch.tupleflow.TupleFlowParameters;
21  import org.galagosearch.tupleflow.execution.ErrorHandler;
22  import org.galagosearch.tupleflow.execution.Verified;
23  import org.galagosearch.core.types.DocumentSplit;
24  
25  /***
26   * From a set of inputs, splits the input into many DocumentSplit records.
27   * This will usually be in a stage by itself at the beginning of a Galago pipeline.
28   * This is somewhat similar to FileSource, except that it can autodetect file formats.
29   * This splitter can detect ARC, TREC, TRECWEB and corpus files.
30   * 
31   * @author trevor
32   */
33  
34  @Verified
35  @OutputClass(className = "org.galagosearch.core.types.DocumentSplit")
36  public class DocumentSource implements ExNihiloSource<DocumentSplit> {
37      public Processor processor;
38      TupleFlowParameters parameters;
39      
40      public DocumentSource(TupleFlowParameters parameters) {
41          this.parameters = parameters;
42      }
43  
44      private String getExtension(String fileName) {
45          String[] fields = fileName.split("//.");
46          
47          // A filename needs to have a period to have an extension.
48          if (fields.length <= 1) {
49              return "";
50          }
51          
52          // If the last chunk of the filename is gz, we'll ignore it.
53          // The second-to-last bit is the type extension (but only if
54          // there are at least three parts to the name).
55          if (fields[fields.length-1].equals("gz")) {
56              if (fields.length > 2) {
57                  return fields[fields.length-2];
58              } else {
59                  return "";
60              }
61          }
62          
63          // No 'gz' extension, so just return the last part.
64          return fields[fields.length-1];
65      }
66  
67      private void processCorpusFile(String fileName, String fileType) throws IOException {
68          // If this is a big file, we'll split it into roughly 100MB pieces.
69          long fileLength = new File(fileName).length();
70          long chunkSize = 100 * 1024 * 1024;
71          
72          IndexReader reader = new IndexReader(fileName);
73          VocabularyReader vocabulary = reader.getVocabulary();
74          List<TermSlot> slots = vocabulary.getSlots();
75          int pieces = Math.max(1, (int) (fileLength / chunkSize));
76          ArrayList<byte[]> keys = new ArrayList<byte[]>();
77  
78          for (int i = 1; i < pieces; ++i) {
79              float fraction = (float) i / pieces;
80              int slot = (int) (fraction * slots.size());
81              keys.add(slots.get(slot).termData);
82          }
83  
84          for (int i = 0; i < pieces; ++i) {
85              byte[] firstKey = new byte[0];
86              byte[] lastKey = new byte[0];
87  
88              if (i > 0) {
89                  firstKey = keys.get(i - 1);
90              }
91              if (i < pieces - 1) {
92                  lastKey = keys.get(i);
93              }
94              DocumentSplit split = new DocumentSplit(fileName, fileType, false, firstKey, lastKey);
95              processor.process(split);
96          }
97      }
98      
99      private void processFile(String fileName) throws IOException {
100         // First, try to detect what kind of file this is:
101         boolean isCompressed = fileName.endsWith(".gz");
102         String fileType = null;
103         
104         // We'll try to detect by extension first, so we don't have to open the file
105         String extension = getExtension(fileName);
106         if (extension.equals("corpus") ||
107             extension.equals("trecweb") ||
108             extension.equals("trectext") ||
109             extension.equals("arc") ||
110             extension.equals("txt") ||
111             extension.equals("html") ||
112             extension.equals("xml")) {
113             fileType = extension;
114         } else {
115             // Oh well, we need to autodetect the file type.
116             if (IndexReader.isIndexFile(fileName)) {
117                 fileType = "corpus";
118             } else {
119                 // Eventually it'd be nice to do more format detection here.
120                 throw new IOException("Couldn't determine file type for: " + fileName);
121             }
122         }
123         
124         if (fileType.equals("corpus")) {
125             processCorpusFile(fileName, fileType);            
126         } else {
127             processSplit(fileName, fileType, isCompressed);
128         }
129     }
130     
131     private void processDirectory(File root) throws IOException {
132         for (File file : root.listFiles()) {
133             if (file.isHidden()) {
134                 continue;
135             }
136             if (file.isDirectory()) {
137                 processDirectory(file);
138             } else {
139                 processFile(file.getAbsolutePath());
140             }
141         }
142     }
143     
144     public void run() throws IOException {
145         if (parameters.getXML().containsKey("directory")) {
146             List<Value> directories = parameters.getXML().list("directory");
147 
148             for (Value directory : directories) {
149                 File directoryFile = new File(directory.toString());
150                 processDirectory(directoryFile);
151             }
152         } else if (parameters.getXML().containsKey("filename")) {
153             List<Value> files = parameters.getXML().list("filename");
154 
155             for (Value file : files) {
156                 processFile(file.toString());
157             }
158         }
159 
160         processor.close();
161     }
162 
163     public void setProcessor(Step processor) throws IncompatibleProcessorException {
164         Linkage.link(this, processor);
165     }
166 
167     public static void verify(TupleFlowParameters parameters, ErrorHandler handler) {
168         FileSource.verify(parameters, handler);
169     }
170 
171     private void processSplit(String fileName, String fileType, boolean isCompressed) throws IOException {
172         DocumentSplit split = new DocumentSplit(fileName, fileType, isCompressed, new byte[0], new byte[0]);
173         processor.process(split);
174     }
175 }