View Javadoc

1   // BSD License (http://www.galagosearch.org/license)
2   
3   package org.galagosearch.core.index;
4   
5   import java.io.ByteArrayInputStream;
6   import java.io.File;
7   import java.io.FileInputStream;
8   import java.io.FileOutputStream;
9   import java.io.IOException;
10  import java.io.InputStream;
11  import java.io.OutputStream;
12  import java.util.ArrayList;
13  import org.galagosearch.tupleflow.Utility;
14  
15  /***
16   * A BackedCompressedByteBuffer is like a CompressedByteBuffer,
17   * but it can overflow into disk storage if necessary.  Unlike a 
18   * CompressedByteBuffer, there's no getBytes() method since that 
19   * wouldn't makes sense if all the data is on disk.
20   *
21   * @author trevor
22   */
23  public class BackedCompressedByteBuffer {
24      ArrayList<File> segments;
25      CompressedByteBuffer buffer;
26      long diskLength;
27      long threshold;
28  
29      /*** Creates a new instance of BackedCompressedByteBuffer */
30      public BackedCompressedByteBuffer(long threshold) {
31          buffer = new CompressedByteBuffer();
32          segments = new ArrayList<File>();
33          this.threshold = threshold;
34      }
35  
36      public BackedCompressedByteBuffer() {
37          this(1024 * 1024);
38      }
39  
40      public void add(long value) throws IOException {
41          buffer.add(value);
42  
43          if (buffer.length() > threshold) {
44              flush();
45          }
46      }
47  
48      public void add(CompressedByteBuffer other) throws IOException {
49          if (other.length() > threshold) {
50              flush();
51              flushBuffer(other);
52          } else {
53              buffer.add(other);
54          }
55      }
56  
57      public void addFloat(float f) throws IOException {
58          buffer.addFloat(f);
59  
60          if (buffer.length() > threshold) {
61              flush();
62          }
63      }
64  
65      public void addRaw(int b) throws IOException {
66          buffer.addRaw(b);
67  
68          if (buffer.length() > threshold) {
69              flush();
70          }
71      }
72  
73      public void write(OutputStream stream) throws IOException {
74          for (File f : segments) {
75              Utility.copyFileToStream(f, stream);
76          }
77  
78          buffer.write(stream);
79      }
80  
81      public void flush() throws IOException {
82          flushBuffer(buffer);
83          buffer.clear();
84      }
85  
86      void flushBuffer(CompressedByteBuffer other) throws IOException {
87          File file = Utility.createTemporary();
88          FileOutputStream stream = new FileOutputStream(file);
89          other.write(stream);
90          stream.close();
91          diskLength += buffer.length();
92          segments.add(file);
93      }
94  
95      public long length() {
96          return diskLength + buffer.length();
97      }
98  
99      public void clear() {
100         for (File f : segments) {
101             f.delete();
102         }
103         segments.clear();
104         buffer.clear();
105         diskLength = 0;
106     }
107 
108     public BufferInputStream getInputStream() throws IOException {
109         return new BufferInputStream();
110     }
111 
112     public class BufferInputStream extends InputStream {
113         InputStream current = null;
114         int fileSegment = -1;
115 
116         BufferInputStream() throws IOException {
117             this.nextStream();
118         }
119 
120         private boolean nextStream() throws IOException {
121             if (fileSegment < segments.size() - 1) {
122                 if (current != null) {
123                     current.close();
124                 }
125                 fileSegment++;
126                 current = new FileInputStream(segments.get(fileSegment));
127                 return true;
128             } else if (fileSegment == segments.size() - 1) {
129                 current = new ByteArrayInputStream(buffer.getBytes(), 0, buffer.length());
130                 fileSegment++;
131                 return true;
132             } else {
133                 current = null;
134                 return false;
135             }
136         }
137 
138         @Override
139         public int available() throws IOException {
140             if (current == null) {
141                 return 0;
142             }
143             if (current instanceof ByteArrayInputStream) {
144                 return current.available();
145             }
146             long total = 0;
147             for (int i = fileSegment + 1; i < segments.size(); i++) {
148                 File f = segments.get(i);
149                 total += f.length();
150             }
151 
152             total += buffer.length();
153             total += current.available();
154 
155             if (total > Integer.MAX_VALUE) {
156                 return Integer.MAX_VALUE;
157             }
158             return (int) total;
159         }
160 
161         @Override
162         public void close() throws IOException {
163             if (current != null) {
164                 current.close();
165             }
166         }
167 
168         @Override
169         public int read(byte[] arg) throws IOException {
170             return read(arg, 0, arg.length);
171         }
172 
173         @Override
174         public int read(byte[] arg, int offset, int length) throws IOException {
175             if (current == null) {
176                 return -1;
177             }
178             int result = current.read(arg, offset, length);
179             int total = 0;
180 
181             while (total < length) {
182                 if (result >= 0) {
183                     total += result;
184                 }
185 
186                 if (nextStream() == false) {
187                     if (total > 0) {
188                         return total;
189                     } else {
190                         return result;
191                     }
192                 }
193 
194                 result = current.read(arg, offset + total, length - total);
195             }
196 
197             return total;
198         }
199 
200         public int read() throws IOException {
201             if (current == null) {
202                 return -1;
203             }
204             int result = current.read();
205 
206             while (result < 0 && nextStream()) {
207                 result = current.read();
208             }
209             return result;
210         }
211     }
212 }