1
2
3 package org.galagosearch.core.index;
4
5 import java.io.ByteArrayInputStream;
6 import java.io.File;
7 import java.io.FileInputStream;
8 import java.io.FileOutputStream;
9 import java.io.IOException;
10 import java.io.InputStream;
11 import java.io.OutputStream;
12 import java.util.ArrayList;
13 import org.galagosearch.tupleflow.Utility;
14
15 /***
16 * A BackedCompressedByteBuffer is like a CompressedByteBuffer,
17 * but it can overflow into disk storage if necessary. Unlike a
18 * CompressedByteBuffer, there's no getBytes() method since that
19 * wouldn't make sense if all the data is on disk.
20 *
21 * @author trevor
22 */
23 public class BackedCompressedByteBuffer {
24 ArrayList<File> segments;
25 CompressedByteBuffer buffer;
26 long diskLength;
27 long threshold;
28
29 /*** Creates a new instance of BackedCompressedByteBuffer */
30 public BackedCompressedByteBuffer(long threshold) {
31 buffer = new CompressedByteBuffer();
32 segments = new ArrayList<File>();
33 this.threshold = threshold;
34 }
35
36 public BackedCompressedByteBuffer() {
37 this(1024 * 1024);
38 }
39
40 public void add(long value) throws IOException {
41 buffer.add(value);
42
43 if (buffer.length() > threshold) {
44 flush();
45 }
46 }
47
48 public void add(CompressedByteBuffer other) throws IOException {
49 if (other.length() > threshold) {
50 flush();
51 flushBuffer(other);
52 } else {
53 buffer.add(other);
54 }
55 }
56
57 public void addFloat(float f) throws IOException {
58 buffer.addFloat(f);
59
60 if (buffer.length() > threshold) {
61 flush();
62 }
63 }
64
65 public void addRaw(int b) throws IOException {
66 buffer.addRaw(b);
67
68 if (buffer.length() > threshold) {
69 flush();
70 }
71 }
72
73 public void write(OutputStream stream) throws IOException {
74 for (File f : segments) {
75 Utility.copyFileToStream(f, stream);
76 }
77
78 buffer.write(stream);
79 }
80
81 public void flush() throws IOException {
82 flushBuffer(buffer);
83 buffer.clear();
84 }
85
86 void flushBuffer(CompressedByteBuffer other) throws IOException {
87 File file = Utility.createTemporary();
88 FileOutputStream stream = new FileOutputStream(file);
89 other.write(stream);
90 stream.close();
91 diskLength += buffer.length();
92 segments.add(file);
93 }
94
95 public long length() {
96 return diskLength + buffer.length();
97 }
98
99 public void clear() {
100 for (File f : segments) {
101 f.delete();
102 }
103 segments.clear();
104 buffer.clear();
105 diskLength = 0;
106 }
107
108 public BufferInputStream getInputStream() throws IOException {
109 return new BufferInputStream();
110 }
111
112 public class BufferInputStream extends InputStream {
113 InputStream current = null;
114 int fileSegment = -1;
115
116 BufferInputStream() throws IOException {
117 this.nextStream();
118 }
119
120 private boolean nextStream() throws IOException {
121 if (fileSegment < segments.size() - 1) {
122 if (current != null) {
123 current.close();
124 }
125 fileSegment++;
126 current = new FileInputStream(segments.get(fileSegment));
127 return true;
128 } else if (fileSegment == segments.size() - 1) {
129 current = new ByteArrayInputStream(buffer.getBytes(), 0, buffer.length());
130 fileSegment++;
131 return true;
132 } else {
133 current = null;
134 return false;
135 }
136 }
137
138 @Override
139 public int available() throws IOException {
140 if (current == null) {
141 return 0;
142 }
143 if (current instanceof ByteArrayInputStream) {
144 return current.available();
145 }
146 long total = 0;
147 for (int i = fileSegment + 1; i < segments.size(); i++) {
148 File f = segments.get(i);
149 total += f.length();
150 }
151
152 total += buffer.length();
153 total += current.available();
154
155 if (total > Integer.MAX_VALUE) {
156 return Integer.MAX_VALUE;
157 }
158 return (int) total;
159 }
160
161 @Override
162 public void close() throws IOException {
163 if (current != null) {
164 current.close();
165 }
166 }
167
168 @Override
169 public int read(byte[] arg) throws IOException {
170 return read(arg, 0, arg.length);
171 }
172
173 @Override
174 public int read(byte[] arg, int offset, int length) throws IOException {
175 if (current == null) {
176 return -1;
177 }
178 int result = current.read(arg, offset, length);
179 int total = 0;
180
181 while (total < length) {
182 if (result >= 0) {
183 total += result;
184 }
185
186 if (nextStream() == false) {
187 if (total > 0) {
188 return total;
189 } else {
190 return result;
191 }
192 }
193
194 result = current.read(arg, offset + total, length - total);
195 }
196
197 return total;
198 }
199
200 public int read() throws IOException {
201 if (current == null) {
202 return -1;
203 }
204 int result = current.read();
205
206 while (result < 0 && nextStream()) {
207 result = current.read();
208 }
209 return result;
210 }
211 }
212 }