Browse Source

refactored random access a bit

remotes/georgeg/no_streams
lomereiter 10 years ago
parent
commit
7a41ba7b00
  1. 9
      bio/bam/multireader.d
  2. 95
      bio/bam/randomaccessmanager.d
  3. 4
      bio/bam/reader.d

9
bio/bam/multireader.d

@ -24,10 +24,11 @@ import bio.bam.reader;
import bio.bam.read;
import bio.bam.referenceinfo;
import bio.bam.utils.samheadermerger;
import bio.core.utils.range;
import std.algorithm;
import std.range;
import std.conv;
import std.parallelism;
import std.array;
import std.exception;
import std.typecons;
@ -149,6 +150,11 @@ class MultiBamReader {
this(filenames.map!(fn => new BamReader(fn)).array());
}
///
this(string[] filenames, std.parallelism.TaskPool task_pool = taskPool) {
this(filenames.map!(fn => new BamReader(fn, task_pool)).array());
}
///
BamReader[] readers() @property {
return _readers;
@ -204,6 +210,7 @@ class MultiBamReader {
}
}
///
struct MultiBamReference {
private {
BamReader[] _readers;

95
bio/bam/randomaccessmanager.d

@ -45,23 +45,52 @@ import std.exception;
import std.parallelism;
// made public to be accessible from utils.memoize module
auto decompressTask(BgzfBlock block) {
auto t = task!decompressBgzfBlock(block);
taskPool.put(t);
// keeps task pool together with block
struct BgzfBlockAux {
TaskPool task_pool;
BgzfBlock block;
alias block this;
hash_t toHash() const pure @safe nothrow { return block.toHash(); }
bool opEquals(const ref BgzfBlockAux other) pure @safe nothrow {
return block == other.block;
}
int opCmp(const ref BgzfBlockAux other) const pure @safe nothrow {
return block.opCmp(other.block);
}
}
// BgzfBlockAux -> Task
auto decompressTask(BgzfBlockAux b) {
auto t = task!decompressBgzfBlock(b.block);
b.task_pool.put(t);
return t;
}
private alias memoize!(decompressTask, 512, FifoCache, BgzfBlock) memDecompressTask;
// BgzfBlockAux -> Task
private alias memoize!(decompressTask, 512,
FifoCache, BgzfBlockAux) memDecompressTask;
// made public only to be accessible from std.algorithm
auto decompressSerial(BgzfBlock block) {
return decompress(block).yieldForce();
// (BgzfBlock, TaskPool) -> DecompressedBgzfBlock
auto decompressSerial(BT)(BT block_and_pool) {
return decompress(block_and_pool).yieldForce();
}
// ditto
auto decompress(BgzfBlock block) {
return memDecompressTask(block);
// (BgzfBlock, TaskPool) -> Task
auto decompress(BT)(BT block_and_pool) {
auto data = BgzfBlockAux(block_and_pool[1], block_and_pool[0]);
return memDecompressTask(data);
}
// ([BgzfBlock], TaskPool) -> [DecompressedBgzfBlock]
auto parallelUnpack(BR)(BR bgzf_range, TaskPool pool, size_t n_threads = 0) {
if (n_threads == 0)
n_threads = max(pool.size(), 1);
auto tasks = bgzf_range.zip(repeat(pool)).map!decompress();
return tasks.prefetch(n_threads).map!"a.yieldForce()"();
}
debug {
@ -71,6 +100,10 @@ debug {
/// Class which random access tasks are delegated to.
class RandomAccessManager {
void setTaskPool(TaskPool task_pool) {
_task_pool = task_pool;
}
/// Constructs new manager for BAM file
this(string filename) {
_filename = filename;
@ -132,13 +165,14 @@ class RandomAccessManager {
}
/// Get new IChunkInputStream starting from specified virtual offset.
IChunkInputStream createStreamStartingFrom(VirtualOffset offset, TaskPool task_pool=null) {
IChunkInputStream createStreamStartingFrom(VirtualOffset offset, bool parallel=true) {
auto _stream = new bio.core.utils.stream.File(_filename);
auto _compressed_stream = new EndianStream(_stream, Endian.littleEndian);
_compressed_stream.seekSet(cast(size_t)(offset.coffset));
auto bgzf_range = BgzfRange(_compressed_stream);
auto n_threads = parallel ? max(task_pool.size, 1) : 1;
auto blocks = BgzfRange(_compressed_stream).parallelUnpack(task_pool, n_threads);
static auto helper(R)(R decompressed_range, VirtualOffset offset) {
@ -151,11 +185,7 @@ class RandomAccessManager {
return cast(IChunkInputStream)makeChunkInputStream(adjusted_range);
}
if (task_pool is null) {
return helper(map!decompressSerial(bgzf_range), offset);
} else {
return helper(task_pool.map!(bio.core.bgzf.blockrange.decompressBgzfBlock)(bgzf_range), offset);
}
return helper(blocks, offset);
}
/// Get single read at a given virtual offset.
@ -176,8 +206,8 @@ class RandomAccessManager {
/// to a start of an alignment record.
///
/// If $(D task_pool) is not null, it is used for parallel decompression. Otherwise, decompression is serial.
auto getReadsBetween(VirtualOffset from, VirtualOffset to, TaskPool task_pool=null) {
IChunkInputStream stream = createStreamStartingFrom(from, task_pool);
auto getReadsBetween(VirtualOffset from, VirtualOffset to) {
IChunkInputStream stream = createStreamStartingFrom(from);
static bool offsetTooBig(BamReadBlock record, VirtualOffset vo) {
return record.end_virtual_offset > vo;
@ -267,7 +297,7 @@ class RandomAccessManager {
// that's it!
auto bgzf_range = getJoinedBgzfRange(chunks); // (2)
auto decompressed_blocks = getUnpackedBlocks(bgzf_range); // (3)
auto decompressed_blocks = getUnpackedBlocks(bgzf_range, task_pool); // (3)
auto augmented_blocks = getAugmentedBlocks(decompressed_blocks, chunks); // (4)
IChunkInputStream stream = makeChunkInputStream(augmented_blocks); // (5)
auto reads = bamReadRange!IteratePolicy(stream, _reader);
@ -279,7 +309,14 @@ private:
string _filename;
BaiFile _bai;
BamReader _reader;
TaskPool _task_pool;
TaskPool task_pool() @property {
if (_task_pool is null)
_task_pool = taskPool;
return _task_pool;
}
public:
// Let's implement the plan described above!
@ -309,20 +346,12 @@ public:
return bgzf_blocks;
}
// (3) : [BgzfBlock] -> [DecompressedBgzfBlock]
static auto getUnpackedBlocks(R)(R bgzf_range) {
// (3) : ([BgzfBlock], TaskPool) -> [DecompressedBgzfBlock]
static auto getUnpackedBlocks(R)(R bgzf_range, TaskPool pool) {
version(serial) {
return map!decompressBgzfBlock(bgzf_range);
return bgzf_range.parallelUnpack(pool, 1);
} else {
InputRange!DecompressedBgzfBlock result;
if (taskPool.size < 2) {
result = inputRangeObject(map!decompressBgzfBlock(bgzf_range));
} else {
// up to (taskPool.size) tasks are being executed at every moment
auto prefetched_range = prefetch(map!decompress(bgzf_range), taskPool.size);
result = inputRangeObject(map!"a.yieldForce()"(prefetched_range));
}
return result;
return bgzf_range.parallelUnpack(pool);
}
}

4
bio/bam/reader.d

@ -309,7 +309,7 @@ class BamReader : IBamSamReader {
enforce(from < to, "First offset must be strictly less than second");
enforce(_stream_is_seekable, "Stream is not seekable");
return _random_access_manager.getReadsBetween(from, to, _task_pool);
return _random_access_manager.getReadsBetween(from, to);
}
/**
@ -413,6 +413,8 @@ private:
} else {
_rndaccssmgr = new RandomAccessManager(this);
}
_rndaccssmgr.setTaskPool(_task_pool);
}
}
return _rndaccssmgr;

Loading…
Cancel
Save