Browse Source

Fixes buffer offsets

segfaults
Pjotr Prins 3 years ago
parent
commit
1a2383cde6
  1. 26
      bio/core/bgzf/block.d
  2. 15
      bio/core/bgzf/inputstream.d
  3. 42
      bio/std/hts/bam/randomaccessmanager.d
  4. 4
      bio/std/hts/bam/reader.d

26
bio/core/bgzf/block.d

@ -24,7 +24,7 @@
module bio.core.bgzf.block;
import bio.std.hts.bam.constants;
import bio.core.utils.memoize;
// import bio.core.utils.memoize;
import bio.core.utils.zlib;
import std.array;
@ -35,6 +35,9 @@ import std.exception;
/**
Structure representing BGZF block.
In general, users shouldn't use it, as it is EXTREMELY low-level.
Note it is a struct that has support for comparison based
on its crc32 value.
*/
struct BgzfBlock {
// field types are as in the SAM/BAM specification
@ -97,7 +100,12 @@ import std.stdio;
Start offset is needed to be able to tell current virtual offset,
and yet be able to decompress blocks in parallel.
*/
struct DecompressedBgzfBlock {
class DecompressedBgzfBlock {
/// Builds a decompressed block from its start offset, end offset and
/// the buffer holding the decompressed bytes. The three arguments are
/// stored as-is in the corresponding fields.
this(ulong start, ulong end, ubyte[] buf) {
    this.decompressed_data = buf;
    this.end_offset = end;
    this.start_offset = start;
}
/// Destructor: writes a trace line to stderr with the block's
/// start/end offsets and the number of decompressed bytes it holds.
/// NOTE(review): this looks like debug tracing; if instances are
/// finalized by the GC, allocating inside writeln may be unsafe —
/// confirm whether the trace should stay.
~this() {
    // Use `.length` (element count of the slice). `.sizeof` on a dynamic
    // array is the fixed size of the slice descriptor itself, so it would
    // print the same number for every block.
    stderr.writeln("destroy DecompressedBgzfBlock ",start_offset,":",end_offset," ",decompressed_data.length);
}
@ -108,7 +116,7 @@ struct DecompressedBgzfBlock {
}
///
alias Cache!(BgzfBlock, DecompressedBgzfBlock) BgzfBlockCache;
// alias Cache!(BgzfBlock, DecompressedBgzfBlock) BgzfBlockCache;
/// Function for BGZF block decompression.
/// Reuses buffer allocated for storing compressed data,
@ -117,7 +125,7 @@ alias Cache!(BgzfBlock, DecompressedBgzfBlock) BgzfBlockCache;
DecompressedBgzfBlock decompressBgzfBlock(BgzfBlock block)
{
if (block.input_size == 0) {
return DecompressedBgzfBlock(block.start_offset,
return new DecompressedBgzfBlock(block.start_offset,
block.start_offset + block.bsize + 1,
cast(ubyte[])[]); // EOF marker
// TODO: add check for correctness of EOF marker
@ -199,8 +207,14 @@ DecompressedBgzfBlock decompressBgzfBlock(BgzfBlock block)
" - ", block._buffer.ptr + block.input_size);
}
block._buffer[0 .. block.input_size] = uncompressed[];
auto buf2 = (block._buffer[0 .. block.input_size]).dup;
assert(&buf2 != &block._buffer);
assert(buf2.length == block.input_size);
block._buffer = buf2;
block.dirty = true;
// block.start_offset = 0;
// block.bsize = cast(ushort)(buf2.length-1);
return DecompressedBgzfBlock(block.start_offset, block.end_offset,
block._buffer[0 .. block.input_size]);
auto decompressed = new DecompressedBgzfBlock(block.start_offset, block.end_offset, buf2);
return decompressed;
}

15
bio/core/bgzf/inputstream.d

@ -42,6 +42,15 @@ class BgzfException : Exception {
this(string msg) { super(msg); }
}
/*
Called by
randomaccessmanager.d: fillBgzfBufferFromStream(stream, true, &block, buf.ptr);
inputstream.d: auto result = fillBgzfBufferFromStream(_stream, _seekable, block, buffer,
*/
bool fillBgzfBufferFromStream(Stream stream, bool is_seekable,
BgzfBlock* block, ubyte* buffer,
size_t *number_of_bytes_read=null)
@ -342,7 +351,7 @@ class BgzfInputStream : Stream {
BgzfBlockSupplier _supplier;
ubyte[] _data;
BgzfBlockCache _cache;
// BgzfBlockCache _cache;
ubyte[] _read_buffer;
VirtualOffset _current_vo;
@ -444,13 +453,13 @@ class BgzfInputStream : Stream {
this(BgzfBlockSupplier supplier,
TaskPool pool=taskPool,
BgzfBlockCache cache=null,
// BgzfBlockCache cache=null,
size_t buffer_size=0)
{
_supplier = supplier;
_compressed_size = _supplier.totalCompressedSize();
_pool = pool;
_cache = cache;
// _cache = cache;
size_t n_tasks = max(pool.size, 1) * 2;
if (buffer_size > 0)

42
bio/std/hts/bam/randomaccessmanager.d

@ -8,10 +8,10 @@
the rights to use, copy, modify, merge, publish, distribute, sublicense,
and/or sell copies of the Software, and to permit persons to whom the
Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
@ -67,10 +67,10 @@ private {
/// Class which random access tasks are delegated to.
class RandomAccessManager {
void setCache(BgzfBlockCache cache) {
_cache = cache;
}
// void setCache(BgzfBlockCache cache) {
// _cache = cache;
//}
/// Stores `task_pool` in `_task_pool`, replacing any previous value.
void setTaskPool(TaskPool task_pool) {
    this._task_pool = task_pool;
}
@ -146,7 +146,8 @@ class RandomAccessManager {
auto _compressed_stream = new EndianStream(_stream, Endian.littleEndian);
_compressed_stream.seekSet(cast(size_t)(offset.coffset));
auto supplier = new StreamSupplier(_compressed_stream, offset.uoffset);
auto bgzf_stream = new BgzfInputStream(supplier, _task_pool, _cache);
// auto bgzf_stream = new BgzfInputStream(supplier, _task_pool, _cache);
auto bgzf_stream = new BgzfInputStream(supplier, _task_pool);
return bgzf_stream;
}
@ -154,7 +155,7 @@ class RandomAccessManager {
/// Every time new stream is used.
BamRead getReadAt(VirtualOffset offset) {
auto stream = createStreamStartingFrom(offset);
bool old_mode = _reader._seqprocmode;
_reader._seqprocmode = true;
auto read = bamReadRange(stream, _reader).front.dup;
@ -303,7 +304,7 @@ class RandomAccessManager {
auto reads = readsFromChunks!IteratePolicy(chunks);
return filterBamReads(reads, regions);
}
/// Fetch alignments with given reference sequence id, overlapping [beg..end)
auto getReads(alias IteratePolicy=withOffsets)(BamRegion region)
{
@ -325,7 +326,7 @@ class RandomAccessManager {
last_ref_id = region.ref_id;
}
}
/// Accessor returning, by reference, the `start` field of `region`.
static ref auto regB(ref BamRegion region)
{
    return region.start;
}
/// Accessor returning, by reference, the `end` field of `region`.
static ref auto regE(ref BamRegion region)
{
    return region.end;
}
foreach (ref group; regions_by_ref)
@ -341,24 +342,25 @@ private:
auto fstream = new bio.core.utils.stream.File(_filename);
auto compressed_stream = new EndianStream(fstream, Endian.littleEndian);
auto supplier = new StreamChunksSupplier(compressed_stream, chunks);
auto stream = new BgzfInputStream(supplier, _task_pool, _cache);
// auto stream = new BgzfInputStream(supplier, _task_pool, _cache);
auto stream = new BgzfInputStream(supplier, _task_pool);
return bamReadRange!IteratePolicy(stream, _reader);
}
string _filename;
BaiFile _bai;
BamReader _reader;
TaskPool _task_pool;
size_t _buffer_size;
BgzfBlockCache _cache;
// BgzfBlockCache _cache;
/// Returns the task pool held in `_task_pool`. If none has been set
/// yet, `taskPool` is stored in `_task_pool` first and then returned.
TaskPool task_pool() @property {
    if (_task_pool !is null)
        return _task_pool;
    _task_pool = taskPool;
    return _task_pool;
}
public:
static struct BamReadFilter(R) {
@ -378,13 +380,13 @@ public:
/// Range primitive: yields the read currently held in `_current_read`.
ElementType!R front() @property {
    return this._current_read;
}
/// Range primitive: steps the wrapped range forward by one element,
/// then calls `findNext()`.
void popFront() {
    this._range.popFront();
    this.findNext();
}
private:
private:
R _range;
uint _ref_id;
BamRegion _region;
@ -417,9 +419,9 @@ public:
if (_current_read.position >= _region.end) {
// As reads are sorted by leftmost coordinate,
// all remaining alignments in _range
// all remaining alignments in _range
// will not overlap the current interval as well.
//
//
// [-----)
// . [-----------)
// . [---)
@ -443,7 +445,7 @@ public:
}
if (_current_read.position +
_current_read.basesCovered() <= _region.start)
_current_read.basesCovered() <= _region.start)
{
/// ends before beginning of the region
/// [-----------)
@ -455,7 +457,7 @@ public:
return; /// _current_read overlaps the region
}
}
_empty = true;
_empty = true;
}
}

4
bio/std/hts/bam/reader.d

@ -546,7 +546,7 @@ private:
compressed_stream);
return new BgzfInputStream(block_supplier, _task_pool,
null, _buffer_size);
_buffer_size);
}
// get decompressed stream starting from the first alignment record
@ -559,7 +559,7 @@ private:
compressed_stream.seekCur(_reads_start_voffset.coffset);
auto block_supplier = new StreamSupplier(compressed_stream);
auto stream = new BgzfInputStream(block_supplier, _task_pool,
null, _buffer_size);
_buffer_size);
stream.readString(_reads_start_voffset.uoffset);
return stream;
} else {

Loading…
Cancel
Save