You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

387 lines
12 KiB

  1. /*
  2. This file is part of BioD.
  3. Copyright (C) 2012 Artem Tarasov <lomereiter@gmail.com>
  4. Permission is hereby granted, free of charge, to any person obtaining a
  5. copy of this software and associated documentation files (the "Software"),
  6. to deal in the Software without restriction, including without limitation
  7. the rights to use, copy, modify, merge, publish, distribute, sublicense,
  8. and/or sell copies of the Software, and to permit persons to whom the
  9. Software is furnished to do so, subject to the following conditions:
  10. The above copyright notice and this permission notice shall be included in
  11. all copies or substantial portions of the Software.
  12. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  13. IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  14. FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  15. AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  16. LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
  17. FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
  18. DEALINGS IN THE SOFTWARE.
  19. */
  20. module bio.core.bgzf.inputstream;
  21. import bio.core.bgzf.block;
  22. import bio.core.bgzf.blockrange;
  23. import bio.core.bgzf.virtualoffset;
  24. import std.stream;
  25. import std.range;
  26. import std.array;
  27. import std.traits;
  28. import std.algorithm;
  29. import std.parallelism;
/// Interface on top of Stream, which glues decompressed blocks
/// together and provides virtualTell() method for determining
/// virtual offset in the stream.
///
/// NOTE: the class has no virtualSeek method. The reason is
/// that an implementing class should have no direct access
/// to any of underlying streams, (which provide chunks
/// and their start virtual offsets), the reason being
/// the single responsibility principle.
///
/// If you need to do "virtualSeek", seek to coffset
/// in stream providing compressed blocks, create new
/// stream for decompressing, and skip uoffset bytes
/// in the decompressed stream. In general, you probably
/// will want to use different decompression function
/// for each use case. (For serial iteration - without any
/// caching, for random access - with caching; and maybe
/// you'll want several types of caching.)
///
abstract class IChunkInputStream : Stream {

    /// Returns: current virtual offset
    ///
    /// (For details about what virtual offset is,
    /// see bgzfrange module documentation.)
    ///
    abstract VirtualOffset virtualTell();

    /// Read a slice from chunk stream.
    ///
    /// Params:
    ///     n = number of bytes to read
    abstract ubyte[] readSlice(size_t n);

    /// Average compression so far
    ///
    /// Returns: ratio of uncompressed bytes to compressed bytes
    ///          seen so far (implementation-defined when nothing
    ///          has been read yet)
    abstract float average_compression_ratio() @property const;

    /// Size of underlying BAM file (0 if not available).
    abstract size_t compressed_file_size() @property const;
}
/// Extends DecompressedBgzfBlock with skip_start and skip_end members.
///
/// The skip margins allow a ChunkInputStream to expose only a part of
/// a decompressed block, which is what random access into a BGZF file
/// needs (start/stop in the middle of a block).
struct AugmentedDecompressedBgzfBlock {
    DecompressedBgzfBlock block; // the wrapped block
    alias block this;            // subtyping: members of `block` are accessible directly

    ushort skip_start; /// how many bytes to skip from start
    ushort skip_end;   /// how many bytes to skip from end
}
  70. /// Convenience function for making decompressed blocks compatible with ChunkInputStream.
  71. AugmentedDecompressedBgzfBlock makeAugmentedBlock(DecompressedBgzfBlock block) {
  72. return AugmentedDecompressedBgzfBlock(block, 0, 0);
  73. }
/**
Class for turning range of AugmentedDecompressedBgzfBlock objects
into a proper InputStream

NOTE: an assumption is made that calling empty() on range is cheap.
However, front() gets called only once for each element. That fits
the philosophy of std.algorithm.map.
*/
final class ChunkInputStream(ChunkRange) : IChunkInputStream
{
    // The range must yield augmented blocks; checked at compile time.
    static assert(is(ElementType!ChunkRange == AugmentedDecompressedBgzfBlock));

    /// Construct class instance from range of chunks.
    ///
    /// Params:
    ///     range                = range of AugmentedDecompressedBgzfBlock
    ///     compressed_file_size = size of the underlying compressed file,
    ///                            0 if not available
    this(ChunkRange range, size_t compressed_file_size=0)
    {
        _range = range;
        readable = true;
        writeable = false;
        seekable = false;
        if (_range.empty) {
            setEOF();
        }
        _its_time_to_get_next_chunk = true; // defer getting first chunk
        _compressed_file_size = compressed_file_size;
    }

    /// Read a slice from chunk stream.
    ///
    /// Behaviour: when current chunk contains >= n bytes,
    /// a chunk slice is returned.
    /// Otherwise, memory copying occurs.
    override ubyte[] readSlice(size_t n) {
        if (_range.empty()) {
            // no more chunks; report EOF and leave offsets at end-of-stream
            _start_offset = _end_offset;
            _cur = 0;
            setEOF();
            return null;
        }

        if (_its_time_to_get_next_chunk) {
            setupStream();
        }

        if (_cur + n < _len) {
            // can return a slice,
            // remaining in the current chunk
            auto slice = _buf[_cur .. _cur + n];
            _cur += n;
            return slice;
        } else if (_cur + n == _len) {
            // end of current chunk: return the tail slice and
            // schedule fetching of the next chunk for the next call
            auto slice = _buf[_cur .. _len];
            _range.popFront();
            _its_time_to_get_next_chunk = true;
            return slice;
        } else {
            // request spans chunk boundary: fall back to copying
            // via readExact (which loops over readBlock).
            // This branch will be executed rarely
            // (wish the compiler could understand that...)
            auto slice = uninitializedArray!(ubyte[])(n);
            readExact(slice.ptr, n);
            return slice;
        }
    }

    /// std.stream.Stream primitive: copy up to `size` bytes into `buffer`.
    ///
    /// Returns: number of bytes actually copied (0 at EOF).
    override size_t readBlock(void* buffer, size_t size) {
        if (_range.empty()) {
            // no more chunks
            _start_offset = _end_offset;
            _cur = 0;
            setEOF();
            return 0;
        }

        if (_its_time_to_get_next_chunk) {
            setupStream();
        }

        size_t read = readBlockHelper(buffer, size);

        if (read < size) {
            _range.popFront(); // get next chunk
            setupStream();
        }

        if (read == 0) { // try with next chunk
            read = readBlockHelper(buffer, size);
        }

        if (_cur == _len) { // end of current chunk
            _range.popFront();
            _its_time_to_get_next_chunk = true;
        }

        return read;
    }

    /// Always throws: this stream is read-only.
    override size_t writeBlock(const void* buffer, size_t size) {
        throw new WriteException("Stream is not writeable");
    }

    /// Always throws: see the class-level note on why there is no seeking.
    override ulong seek(long offset, SeekPos whence) {
        throw new SeekException("Stream is not seekable");
    }

    /// Returns: current virtual offset
    override VirtualOffset virtualTell() {
        // uoffset part of a virtual offset is 16 bits wide
        assert(_cur < (1<<16));
        if (_its_time_to_get_next_chunk) {
            setupStream();
        }
        return VirtualOffset(_start_offset, cast(ushort)_cur);
    }

    /// Ratio of uncompressed to compressed bytes seen so far
    /// (0.0 before anything has been read).
    override float average_compression_ratio() @property const {
        if (_total_compressed == 0) return 0.0;
        return cast(float)_total_uncompressed/_total_compressed;
    }

    /// Size of underlying compressed file as given to the constructor.
    override size_t compressed_file_size() @property const {
        return _compressed_file_size;
    }

private:
    ChunkRange _range;
    typeof(_range.front.start_offset) _start_offset;
    typeof(_range.front.end_offset) _end_offset;
    typeof(_range.front.decompressed_data) _buf;

    size_t _compressed_file_size;

    size_t _len;  // current data length
    size_t _cur;  // current position

    size_t _total_compressed;   // compressed bytes read so far
    size_t _total_uncompressed; // uncompressed size of blocks read so far

    // Lazily-fetch flag: when true, the next read/tell must call
    // setupStream() to pull the next chunk from _range.
    bool _its_time_to_get_next_chunk;

    // Effect:
    //
    //   _range is untouched (only empty() and front() are used)
    //
    //   _buf is filled with the contents of _range.front.decompressed_data
    //
    //   _cur, _start_offset, _end_offset, _total_uncompressed, and
    //   _total_compressed variables are updated appropriately
    //
    //   If _range.front.decompressed_data.length == 0, readEOF is set
    //   (the reason is that this means we've just encountered a special
    //   BGZF block, indicating end-of-file).
    //
    //   _its_time_to_get_next_chunk is set back to false
    void setupStream() {

        _its_time_to_get_next_chunk = false;

        _cur = 0;

        // we don't rely on the caller that it will set _start_offset and
        // _end_offset appropriately
        if (_range.empty) {
            _start_offset = _end_offset;
            setEOF();
            return;
        }

        auto tmp = _range.front; /// _range might be lazy,
                                 /// so extra front() calls
                                 /// can cost a lot
        debug {
            /*
            import std.stdio;
            if (tmp.skip_start != 0 || tmp.skip_end != 0) {
                writeln("reading partial decompressed block: ");
                writeln("   skip_start   = ", tmp.skip_start);
                writeln("   skip_end     = ", tmp.skip_end);
                writeln("   start_offset = ", tmp.start_offset);
                writeln("   end_offset   = ", tmp.end_offset);
            }
            */
        }

        // start reading after the skipped prefix
        _cur = tmp.skip_start;
        _start_offset = tmp.start_offset;
        _end_offset = tmp.end_offset;

        // NOTE(review): skip_start/skip_end are uncompressed-byte counts but
        // are subtracted from a compressed-size delta here; kept as-is since
        // the ratio is only informational.
        _total_compressed += _end_offset - _start_offset - tmp.skip_start - tmp.skip_end;

        // trim the skipped suffix off the buffer
        _buf = tmp.decompressed_data[0 .. $ - tmp.skip_end];
        _len = _buf.length;
        _total_uncompressed += _len;

        if (_len == 0) {
            // empty block == special BGZF EOF marker
            setEOF();
        }

        return;
    }

    version(development)
    {
        final void setEOF() {
            import std.stdio;
            std.stdio.stderr.writeln("[info][chunkinputstream] len == 0, start offset = ", _start_offset,
                                     ", end offset = ", _end_offset);
            readEOF = true;
        }
    } else {
        final void setEOF() { readEOF = true; }
    }

    // Copy up to `size` bytes out of the current chunk buffer into `buffer`,
    // clamping to the bytes remaining in the chunk. Advances _cur.
    size_t readBlockHelper(void* buffer, size_t size) {
        ubyte* cbuf = cast(ubyte*) buffer;
        if (size + _cur > _len)
            size = cast(size_t)(_len - _cur);
        ubyte[] ubuf = cast(ubyte[])_buf[cast(size_t)_cur .. cast(size_t)(_cur + size)];
        cbuf[0 .. size] = ubuf[];
        _cur += size;
        return size;
    }
}
  261. /**
  262. Returns: input stream wrapping given range of memory chunks
  263. */
  264. auto makeChunkInputStream(R)(R range, size_t compressed_file_size=0)
  265. if (is(Unqual!(ElementType!R) == AugmentedDecompressedBgzfBlock))
  266. {
  267. return new ChunkInputStream!R(range, compressed_file_size);
  268. }
  269. /// ditto
  270. auto makeChunkInputStream(R)(R range, size_t compressed_file_size=0)
  271. if (is(Unqual!(ElementType!R) == DecompressedBgzfBlock))
  272. {
  273. return makeChunkInputStream(map!makeAugmentedBlock(range), compressed_file_size);
  274. }
  275. /// Convenient decorator for input streams.
  276. final class BgzfInputStream : IChunkInputStream {
  277. private IChunkInputStream _stream;
  278. override size_t readBlock(void* buffer, size_t size) {
  279. return _stream.readBlock(buffer, size);
  280. }
  281. override ulong seek(long offset, SeekPos whence) {
  282. throw new SeekException("Stream is not seekable");
  283. }
  284. override size_t writeBlock(const void* buf, size_t size) {
  285. throw new WriteException("Stream is not writeable");
  286. }
  287. /// Methods implementing $(D IChunkInputStream)
  288. override VirtualOffset virtualTell() {
  289. return _stream.virtualTell();
  290. }
  291. /// ditto
  292. override ubyte[] readSlice(size_t n) {
  293. return _stream.readSlice(n);
  294. }
  295. /// ditto
  296. override float average_compression_ratio() @property const {
  297. return _stream.average_compression_ratio;
  298. }
  299. /// ditto
  300. override size_t compressed_file_size() @property const {
  301. return _stream.compressed_file_size;
  302. }
  303. bool readEOF() @property {
  304. return _stream.readEOF;
  305. }
  306. /// Read BGZF blocks from supplied $(D stream) using $(D task_pool)
  307. /// to do parallel decompression
  308. this(Stream stream, TaskPool task_pool=taskPool) {
  309. auto stream_size = stream.seekable ? stream.size : 0;
  310. auto bgzf_range = BgzfRange(stream);
  311. auto decompressed_blocks = task_pool.map!decompressBgzfBlock(bgzf_range, 24);
  312. _stream = makeChunkInputStream(decompressed_blocks, cast(size_t)stream_size);
  313. readable = true;
  314. writeable = false;
  315. seekable = false;
  316. }
  317. }