Browse Source

bgzf input/output streams

remotes/georgeg/bam_output_redesign
lomereiter 10 years ago
parent
commit
d38d2b17da
  1. 12
      bio/bam/randomaccessmanager.d
  2. 6
      bio/bam/reader.d
  3. 8
      bio/bam/readrange.d
  4. 2
      bio/core/bgzf/compress.d
  5. 62
      bio/core/bgzf/inputstream.d
  6. 155
      bio/core/bgzf/outputstream.d
  7. 30
      bio/core/utils/range.d
  8. 92
      bio/core/utils/roundbuf.d
  9. 4
      test/unittests.d

12
bio/bam/randomaccessmanager.d

@ -117,8 +117,8 @@ class RandomAccessManager {
return true;
}
/// Get new BgzfInputStream starting from specified virtual offset.
BgzfInputStream createStreamStartingFrom(VirtualOffset offset, TaskPool task_pool=null) {
/// Get new IChunkInputStream starting from specified virtual offset.
IChunkInputStream createStreamStartingFrom(VirtualOffset offset, TaskPool task_pool=null) {
auto _stream = new bio.core.utils.stream.File(_filename);
auto _compressed_stream = new EndianStream(_stream, Endian.littleEndian);
@ -134,7 +134,7 @@ class RandomAccessManager {
auto adjusted_range = chain(repeat(adjusted_front, 1),
map!makeAugmentedBlock(decompressed_range));
return cast(BgzfInputStream)makeChunkInputStream(adjusted_range);
return cast(IChunkInputStream)makeChunkInputStream(adjusted_range);
}
if (task_pool is null) {
@ -163,7 +163,7 @@ class RandomAccessManager {
///
/// If $(D task_pool) is not null, it is used for parallel decompression. Otherwise, decompression is serial.
auto getReadsBetween(VirtualOffset from, VirtualOffset to, TaskPool task_pool=null) {
BgzfInputStream stream = createStreamStartingFrom(from, task_pool);
IChunkInputStream stream = createStreamStartingFrom(from, task_pool);
static bool offsetTooBig(BamReadBlock record, VirtualOffset vo) {
return record.end_virtual_offset > vo;
@ -244,7 +244,7 @@ class RandomAccessManager {
// (modified unpacked blocks)
// |
// V (5)
// BgzfInputStream
// IChunkInputStream
// |
// V (6)
// filter out non-overlapping records
@ -255,7 +255,7 @@ class RandomAccessManager {
auto bgzf_range = getJoinedBgzfRange(chunks); // (2)
auto decompressed_blocks = getUnpackedBlocks(bgzf_range); // (3)
auto augmented_blocks = getAugmentedBlocks(decompressed_blocks, chunks); // (4)
BgzfInputStream stream = makeChunkInputStream(augmented_blocks); // (5)
IChunkInputStream stream = makeChunkInputStream(augmented_blocks); // (5)
auto reads = bamReadRange!IteratePolicy(stream);
return filterBamReads(reads, ref_id, beg, end); // (6)
}

6
bio/bam/reader.d

@ -269,7 +269,7 @@ private:
string _filename; // filename (if available)
Stream _source_stream; // compressed
BgzfInputStream _decompressed_stream; // decompressed
IChunkInputStream _decompressed_stream; // decompressed
Stream _bam; // decompressed + endian conversion
bool _stream_is_seekable;
@ -349,7 +349,7 @@ private:
}
// get decompressed stream out of compressed BAM file
BgzfInputStream getDecompressedStream() {
IChunkInputStream getDecompressedStream() {
auto compressed_stream = getSeekableCompressedStream();
@ -370,7 +370,7 @@ private:
// get decompressed stream starting from the first alignment record
BgzfInputStream getDecompressedBamReadStream() {
IChunkInputStream getDecompressedBamReadStream() {
auto compressed_stream = getSeekableCompressedStream();
if (compressed_stream !is null) {

8
bio/bam/readrange.d

@ -74,8 +74,8 @@ mixin template withoutOffsets() {
class BamReadRange(alias IteratePolicy)
{
/// Create new range from BgzfInputStream.
this(ref BgzfInputStream stream) {
/// Create new range from IChunkInputStream.
this(ref IChunkInputStream stream) {
_stream = stream;
_endian_stream = new EndianStream(_stream, Endian.littleEndian);
readNext();
@ -92,7 +92,7 @@ class BamReadRange(alias IteratePolicy)
}
private:
BgzfInputStream _stream;
IChunkInputStream _stream;
EndianStream _endian_stream;
BamRead _current_record;
@ -143,6 +143,6 @@ private:
}
/// Returns: lazy range of BamRead structs constructed from a given stream.
auto bamReadRange(alias IteratePolicy=withoutOffsets)(ref BgzfInputStream stream) {
auto bamReadRange(alias IteratePolicy=withoutOffsets)(ref IChunkInputStream stream) {
return new BamReadRange!IteratePolicy(stream);
}

2
bio/core/bgzf/compress.d

@ -38,7 +38,7 @@ import core.bitop;
/// decompressed data
///
/// For uncompressed BAM output, use level = 0.
ubyte[] bgzfCompress(ubyte[] chunk, int level, ubyte[] buffer=null)
ubyte[] bgzfCompress(ubyte[] chunk, int level=-1, ubyte[] buffer=null)
in
{
assert(-1 <= level && level <= 9);

62
bio/core/bgzf/inputstream.d

@ -20,6 +20,7 @@
module bio.core.bgzf.inputstream;
import bio.core.bgzf.block;
import bio.core.bgzf.blockrange;
import bio.core.bgzf.virtualoffset;
import std.stream;
@ -27,6 +28,7 @@ import std.range;
import std.array;
import std.traits;
import std.algorithm;
import std.parallelism;
/// Interface on top of Stream, which glues decompressed blocks
/// together and provides virtualTell() method for determining
@ -47,7 +49,7 @@ import std.algorithm;
/// caching, for random access - with caching; and maybe
/// you'll want several types of caching.)
///
abstract class BgzfInputStream : Stream {
abstract class IChunkInputStream : Stream {
/// Returns: current virtual offset
///
/// (For details about what virtual offset is,
@ -86,7 +88,7 @@ AugmentedDecompressedBgzfBlock makeAugmentedBlock(DecompressedBgzfBlock block) {
However, front() gets called only once for each element. That fits
the philosophy of std.algorithm.map.
*/
final class ChunkInputStream(ChunkRange) : BgzfInputStream
final class ChunkInputStream(ChunkRange) : IChunkInputStream
{
static assert(is(ElementType!ChunkRange == AugmentedDecompressedBgzfBlock));
@ -322,3 +324,59 @@ auto makeChunkInputStream(R)(R range, size_t compressed_file_size=0)
{
return makeChunkInputStream(map!makeAugmentedBlock(range), compressed_file_size);
}
/// Convenient decorator for input streams: hides the concrete
/// ChunkInputStream template instantiation behind the IChunkInputStream
/// interface, wrapping a raw compressed $(D Stream) and serving
/// decompressed BGZF data from it.
final class BgzfInputStream : IChunkInputStream {
    private IChunkInputStream _stream;  // the wrapped chunk stream; all calls delegate to it

    /// Reads up to $(D size) bytes of decompressed data into $(D buffer).
    /// Returns: number of bytes actually read.
    override size_t readBlock(void* buffer, size_t size) {
        return _stream.readBlock(buffer, size);
    }

    /// Always throws: random access is not supported by this decorator.
    override ulong seek(long offset, SeekPos whence) {
        throw new SeekException("Stream is not seekable");
    }

    /// Always throws: this is a read-only stream.
    override ulong writeBlock(const void* buf, size_t size) {
        throw new WriteException("Stream is not writeable");
    }

    /// Methods implementing $(D IChunkInputStream)
    override VirtualOffset virtualTell() {
        return _stream.virtualTell();
    }

    /// ditto
    override ubyte[] readSlice(size_t n) {
        return _stream.readSlice(n);
    }

    /// ditto
    override float average_compression_ratio() @property const {
        return _stream.average_compression_ratio;
    }

    /// ditto
    override size_t compressed_file_size() @property const {
        return _stream.compressed_file_size;
    }

    // NOTE(review): declared without 'override' — presumably readEOF is
    // introduced by this class rather than inherited from IChunkInputStream;
    // confirm against the abstract base declaration.
    bool readEOF() @property {
        return _stream.readEOF;
    }

    /// Read BGZF blocks from supplied $(D stream) using $(D task_pool)
    /// to do parallel decompression
    this(Stream stream, TaskPool task_pool=taskPool) {
        // Stream size (when seekable) is passed along so that
        // compressed_file_size can be reported; 0 means "unknown".
        auto stream_size = stream.seekable ? stream.size : 0;

        auto bgzf_range = BgzfRange(stream);
        // 24 is the map buffer size: how many blocks may be decompressed
        // ahead of the consumer by the pool workers.
        auto decompressed_blocks = task_pool.map!decompressBgzfBlock(bgzf_range, 24);
        _stream = makeChunkInputStream(decompressed_blocks, stream_size);

        readable = true;
        writeable = false;
        seekable = false;
    }
}

155
bio/core/bgzf/outputstream.d

@@ -0,0 +1,155 @@
/*
This file is part of BioD.
Copyright (C) 2012 Artem Tarasov <lomereiter@gmail.com>
BioD is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
BioD is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
module bio.core.bgzf.outputstream;
import bio.core.bgzf.constants;
import bio.core.bgzf.compress;
import bio.core.utils.roundbuf;
import std.stream;
import std.parallelism;
import std.array;
import std.algorithm : max;
/// Class for BGZF compression
///
/// Buffers written bytes into blocks of at most $(D max_block_size)
/// uncompressed bytes; each full block is handed to the task pool for
/// parallel compression and the compressed blocks are written to the
/// underlying stream in FIFO order.
class BgzfOutputStream : Stream {

    private {
        Stream _stream = void;          // sink that receives compressed blocks
        TaskPool _task_pool = void;     // workers performing the compression

        ubyte[] _buffer = void;         // accumulates data for the current block
        size_t _current_size;           // number of valid bytes in _buffer

        int _compression_level = void;  // zlib level, -1 .. 9 (see bgzfCompress contract)

        alias Task!(bgzfCompress, ubyte[], int) CompressionTask;
        // In-flight compression tasks, oldest first; capacity is bounded by
        // the pool size so no more blocks are queued than can be worked on.
        RoundBuf!(CompressionTask*) _compression_tasks = void;
    }

    /// Create new BGZF output stream which will use
    /// provided $(D task_pool) to do multithreaded compression.
    ///
    /// Params:
    ///   output_stream     = underlying stream compressed blocks are written to
    ///   compression_level = zlib compression level (-1 = default; 0 = uncompressed)
    ///   task_pool         = pool used for parallel block compression
    ///   max_block_size    = upper bound on uncompressed bytes per BGZF block
    this(Stream output_stream,
         int compression_level=-1,
         TaskPool task_pool=taskPool,
         size_t max_block_size=BGZF_BLOCK_SIZE)
    {
        _stream = output_stream;
        _task_pool = task_pool;
        _compression_level = compression_level;

        _buffer = uninitializedArray!(ubyte[])(max_block_size);
        // max(.., 1): the pool may report size 0 (no worker threads), but the
        // round buffer still needs room for at least one pending task.
        _compression_tasks = RoundBuf!(CompressionTask*)(max(task_pool.size, 1));

        readable = false;
        writeable = true;
        seekable = false;
    }

    /// Always throws: this stream is write-only.
    override size_t readBlock(void* buffer, size_t size) {
        throw new ReadException("Stream is not readable");
    }

    /// Always throws: BGZF output is strictly sequential.
    override ulong seek(long offset, SeekPos whence) {
        throw new SeekException("Stream is not seekable");
    }

    /// Buffers $(D size) bytes from $(D buf), queueing a compression task
    /// for every block that fills up along the way.
    /// Returns: $(D size) — all bytes are always accepted.
    override size_t writeBlock(const void* buf, size_t size) {
        if (size + _current_size >= _buffer.length) {
            // Write doesn't fit into the current block: fill and flush
            // complete blocks until only a sub-block tail remains.
            size_t room;
            ubyte[] data = (cast(ubyte*)buf)[0 .. size];

            while (data.length + _current_size >= _buffer.length) {
                room = _buffer.length - _current_size;
                _buffer[$ - room .. $] = data[0 .. room];
                data = data[room .. $];

                _current_size = _buffer.length;

                flushCurrentBlock();
            }

            // Tail is strictly smaller than one block; flush left _current_size at 0.
            _buffer[0 .. data.length] = data[];
            _current_size = data.length;
        } else {
            _buffer[_current_size .. _current_size + size] = (cast(ubyte*)buf)[0 .. size];
            _current_size += size;
        }

        return size;
    }

    /// Force flushing current block, even if it is not yet filled.
    /// Should be used when it's not desired to have records crossing block borders.
    void flushCurrentBlock() {
        if (_current_size == 0)
            return;

        if (_compression_tasks.full) {
            // Make room: wait for the oldest task and write its block out,
            // preserving block order.
            auto block = _compression_tasks.front.yieldForce();
            _stream.writeExact(block.ptr, block.length);
            _compression_tasks.popFront();
        }

        auto compression_task = task!bgzfCompress(_buffer[0 .. _current_size],
                                                  _compression_level);
        _compression_tasks.put(compression_task);
        _task_pool.put(compression_task);

        // The queued task still references the old buffer, so a fresh one is
        // allocated instead of reusing it.
        _buffer = uninitializedArray!(ubyte[])(_buffer.length);
        _current_size = 0;
    }

    /// Flush all remaining BGZF blocks and close source stream.
    override void close() {
        flushCurrentBlock();

        // Drain pending tasks in FIFO order so compressed blocks appear in
        // the same order the data was written.
        foreach (task; _compression_tasks) {
            auto block = task.yieldForce();
            _stream.writeExact(block.ptr, block.length);
        }

        // NOTE(review): no empty BGZF EOF block is appended here — confirm
        // whether callers are expected to write the EOF marker themselves.
        _stream.close();

        super.close();
    }
}
// Round-trip check: bytes pushed through BgzfOutputStream must come back
// unchanged when read back through BgzfInputStream.
unittest {
    import bio.core.bgzf.inputstream;
    import bio.core.bgzf.blockrange;
    import std.array, std.range, std.stdio;

    // Payload is larger than a single BGZF block, so several flushes occur.
    char[] payload = "my very l" ~ array(repeat('o', 1000000)) ~ "ng string";

    auto sink = new MemoryStream();
    auto writer = new BgzfOutputStream(sink);
    writer.writeExact(payload.ptr, payload.length);
    writer.close();

    // Rewind over the compressed bytes and decompress them again.
    auto source = new MemoryStream(sink.data);
    source.seekSet(0);
    auto reader = new BgzfInputStream(source);

    char[] restored = new char[payload.length];
    reader.readExact(restored.ptr, payload.length);
    assert(restored == payload);
}

30
bio/core/utils/range.d

@ -19,6 +19,8 @@
*/
module bio.core.utils.range;
import bio.core.utils.roundbuf;
import std.range;
import std.exception;
import std.algorithm;
@ -43,45 +45,41 @@ auto prefetch(Range)(Range r, size_t amount) {
alias ElementType!Range E;
this(Range range, size_t amount) {
_elements = new E[amount];
_roundbuf = RoundBuf!E(amount);
_range = range;
_amount = amount;
foreach (i; 0 .. _amount) {
foreach (i; 0 .. amount) {
if (_range.empty) {
break;
}
_elements[i] = _range.front;
++ _read;
_roundbuf.put(_range.front);
_range.popFront();
}
}
bool empty() @property {
return _range.empty() && _read == _consumed;
return _range.empty && _roundbuf.empty;
}
E front() @property {
return _elements[_consumed % _amount];
auto ref E front() @property {
return _roundbuf.front;
}
void popFront() @property {
assert(!_roundbuf.empty);
if (_range.empty) {
++ _consumed;
_roundbuf.popFront();
return;
}
_elements[_consumed % _amount] = _range.front;
++ _consumed;
_roundbuf.popFront();
_roundbuf.put(_range.front);
_range.popFront();
++ _read;
}
private:
Range _range;
size_t _read = 0;
size_t _consumed = 0;
size_t _amount;
E[] _elements = void;
RoundBuf!E _roundbuf;
}
return Result(r, amount);

92
bio/core/utils/roundbuf.d

@@ -0,0 +1,92 @@
/*
This file is part of BioD.
Copyright (C) 2012 Artem Tarasov <lomereiter@gmail.com>
BioD is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
BioD is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
module bio.core.utils.roundbuf;
import std.exception;
/// Cyclic buffer
///
/// Fixed-capacity FIFO: elements are appended with $(D put) and consumed
/// through the input-range primitives. Monotonic counters are kept for both
/// ends; their difference is the current fill level.
struct RoundBuf(T) {

    private {
        T[] _items = void;   // backing storage, fixed at construction
        size_t _appended;    // total count of elements ever put
        size_t _removed;     // total count of elements ever popped
    }

    /** initializes round buffer of size $(D n) */
    this(size_t n) {
        _items = new T[n];
    }

    /// Input range primitives
    bool empty() @property const {
        return length == 0;
    }

    /// ditto
    auto ref T front() @property {
        enforce(!empty, "buffer is empty");
        return _items[_removed % _items.length];
    }

    /// ditto
    void popFront() {
        ++_removed;
    }

    /// Output range primitive
    void put(T item) {
        enforce(!full, "buffer is full");
        _items[_appended % _items.length] = item;
        ++_appended;
    }

    /// Check if buffer is full
    bool full() @property const {
        return length == _items.length;
    }

    /// Current number of elements
    size_t length() @property const {
        return _appended - _removed;
    }
}
// Exercises RoundBuf through a full wrap-around of its backing array.
unittest {
    auto rb = RoundBuf!int(4);
    assert(rb.empty);

    // Partially fill and inspect.
    rb.put(1);
    rb.put(2);
    assert(rb.length == 2);
    assert(rb.front == 1);
    rb.popFront();

    // Top up until the capacity of 4 is reached.
    rb.put(1);
    rb.put(0);
    rb.put(3);
    assert(rb.full);

    // Consume one, append one — indices now wrap past the array end.
    rb.popFront();
    rb.put(4);
    rb.popFront();
    rb.popFront();
    assert(rb.front == 3);
    rb.popFront();
    assert(rb.front == 4);
}

4
test/unittests.d

@ -31,7 +31,9 @@ import bio.bam.validation.alignment;
import bio.bam.utils.samheadermerger;
import bio.sam.utils.recordparser;
import bio.bam.serialization.sam;
import bio.core.bgzf.outputstream;
import bio.core.utils.roundbuf;
import bio.core.utils.range;
import bio.core.utils.tmpfile;
import std.path;

Loading…
Cancel
Save