Browse Source

multiple BAM reader (2.061+)

remotes/georgeg/no_streams
lomereiter 10 years ago
parent
commit
9d4ef7ca9d
  1. 246
      bio/bam/multireader.d
  2. 8
      bio/bam/reader.d
  3. 11
      bio/bam/utils/samheadermerger.d

246
bio/bam/multireader.d

@ -0,0 +1,246 @@
/*
This file is part of BioD.
Copyright (C) 2012-2013 Artem Tarasov <lomereiter@gmail.com>
BioD is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
BioD is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
module bio.bam.multireader;
import bio.sam.header;
import bio.bam.reader;
import bio.bam.read;
import bio.bam.referenceinfo;
import bio.bam.utils.samheadermerger;
import bio.core.utils.range;
import std.algorithm;
import std.range;
import std.array;
import std.exception;
import std.typecons;
alias size_t FileId;
// ((BamRead, SamHeaderMerger), (BamRead, SamHeaderMerger)) -> bool
bool compare(T)(const auto ref T r1, const auto ref T r2) {
assert(r1[1] == r2[1]);
auto sorting_order = r1[1].merged_header.sorting_order;
if (sorting_order == SortingOrder.coordinate)
return compareCoordinates(r1[0], r2[0]);
else if (sorting_order == SortingOrder.queryname)
return compareReadNames(r1[0], r2[0]);
else
assert(0);
}
// (BamReader, SamHeaderMerger, FileId) -> [(BamRead, SamHeaderMerger, FileId)]
auto readRange(BamReader reader, SamHeaderMerger merger, FileId index) {
return zip(reader.reads, repeat(merger), repeat(index));
}
// (BamReader, SamHeaderMerger, FileId, int, uint, uint) ->
// [(BamRead, SamHeaderMerger, FileId)]
auto readRange(BamReader reader, SamHeaderMerger merger, FileId index,
int ref_id, uint start, uint end)
{
auto old_ref_id = cast(int)merger.ref_id_reverse_map[index][ref_id];
auto reads = reader.reference(old_ref_id)[start .. end];
return zip(reads, repeat(merger), repeat(index));
}
// ([BamReader], SamHeaderMerger) -> [[(BamRead, SamHeaderMerger, FileId)]]
auto readRanges(BamReader[] readers, SamHeaderMerger merger) {
return readers.zip(repeat(merger), iota(readers.length))
.map!(t => readRange(t[0], t[1], t[2]))();
}
// ([BamReader], SamHeaderMerger, int, uint, uint) ->
// [[BamRead, SamHeaderMerger, FileId)]]
auto readRanges(BamReader[] readers, SamHeaderMerger merger,
int ref_id, uint start, uint end)
{
return readers.zip(repeat(merger), iota(readers.length),
repeat(ref_id), repeat(start), repeat(end))
.map!(t => readRange(t[0], t[1], t[2], t[3], t[4], t[5]))();
}
// tweaks RG and PG tags, and reference sequence ID
// [(BamRead, SamHeaderMerger, size_t)] -> [BamRead]
// [[(BamRead, SamHeaderMerger, size_t)]] -> [[BamRead]]
auto adjustTags(R)(R reads_with_aux_info) if (isInputRange!R) {
return map!adjustTags1(reads_with_aux_info).array();
}
auto adjustTags1(R)(R reads_with_aux_info) if (isInputRange!R) {
return map!adjustTags(reads_with_aux_info);
}
// (BamRead, SamHeaderMerger, size_t) -> (BamRead, SamHeaderMerger)
auto adjustTags(R)(R read_with_aux_info) if (!isInputRange!R) {
auto read = read_with_aux_info[0];
auto merger = read_with_aux_info[1];
auto file_id = read_with_aux_info[2];
with (merger) {
assert(file_id < ref_id_map.length);
auto old_ref_id = read.ref_id;
if (old_ref_id != -1 && old_ref_id in ref_id_map[file_id]) {
auto new_ref_id = to!int(ref_id_map[file_id][old_ref_id]);
if (new_ref_id != old_ref_id)
read.ref_id = new_ref_id;
}
auto program = read["PG"];
if (!program.is_nothing) {
auto pg_str = *(cast(string*)(&program));
if (pg_str in program_id_map[file_id]) {
auto new_pg = program_id_map[file_id][pg_str];
if (new_pg != pg_str)
read["PG"] = new_pg;
}
}
auto read_group = read["RG"];
if (!read_group.is_nothing) {
auto rg_str = *(cast(string*)(&read_group));
if (rg_str in readgroup_id_map[file_id]) {
auto new_rg = readgroup_id_map[file_id][rg_str];
if (new_rg != rg_str)
read["RG"] = new_rg;
}
}
}
return tuple(read, merger);
}
///
class MultiBamReader {
///
this(BamReader[] readers) {
_readers = readers;
_merger = new SamHeaderMerger(readers.map!(r => r.header).array());
auto n_references = _merger.merged_header.sequences.length;
_reference_sequences = new ReferenceSequenceInfo[n_references];
size_t i;
foreach (line; _merger.merged_header.sequences) {
_reference_sequences[i] = ReferenceSequenceInfo(line.name, line.length);
_reference_sequence_dict[line.name] = i++;
}
}
///
this(string[] filenames) {
this(filenames.map!(fn => new BamReader(fn)).array());
}
///
BamReader[] readers() @property {
return _readers;
}
///
SamHeader header() @property {
return _merger.merged_header;
}
///
auto reads() @property {
return readRanges(_readers, _merger).adjustTags().nWayUnion!compare();
}
///
const(ReferenceSequenceInfo)[] reference_sequences() @property const {
return _reference_sequences;
}
/**
Check if reference named $(I ref_name) is presented in BAM header.
*/
bool hasReference(string ref_name) {
return null != (ref_name in _reference_sequence_dict);
}
/**
Returns reference sequence with id $(I ref_id).
*/
MultiBamReference reference(int ref_id) {
enforce(ref_id >= 0, "Invalid reference index");
enforce(ref_id < _reference_sequences.length, "Invalid reference index");
return MultiBamReference(_readers, _merger,
_reference_sequences[ref_id], ref_id);
}
/**
Returns reference sequence named $(I ref_name).
*/
MultiBamReference opIndex(string ref_name) {
enforce(hasReference(ref_name),
"Reference with name " ~ ref_name ~ " does not exist");
auto ref_id = cast(int)_reference_sequence_dict[ref_name];
return reference(ref_id);
}
private {
BamReader[] _readers;
SamHeaderMerger _merger;
ReferenceSequenceInfo[] _reference_sequences;
size_t[string] _reference_sequence_dict;
}
}
struct MultiBamReference {
private {
BamReader[] _readers;
SamHeaderMerger _merger;
int _ref_id;
ReferenceSequenceInfo _info;
}
this(BamReader[] readers, SamHeaderMerger merger,
ReferenceSequenceInfo info, int ref_id)
{
_readers = readers;
_merger = merger;
_ref_id = ref_id;
_info = info;
}
///
string name() @property const { return _info.name; }
///
int length() @property const { return _info.length; }
///
int ref_id() @property const { return _ref_id; }
/// Get alignments overlapping [start, end) region.
/// $(BR)
/// Coordinates are 0-based.
auto opSlice(uint start, uint end) {
enforce(start < end, "start must be less than end");
auto ranges = readRanges(_readers, _merger, ref_id, start, end);
return ranges.adjustTags().nWayUnion!compare();
}
///
auto opSlice() {
return opSlice(0, length);
}
}

8
bio/bam/reader.d

@ -116,13 +116,17 @@ class BamReader : IBamSamReader {
}
/// ditto
this(string filename,
std.parallelism.TaskPool task_pool = std.parallelism.taskPool) {
this(string filename, std.parallelism.TaskPool task_pool) {
_filename = filename;
_source_stream = getNativeEndianSourceStream();
this(_source_stream, task_pool);
}
/// ditto
this(string filename) {
this(filename, std.parallelism.taskPool);
}
/**
True if BAI file was found for this BAM file.

11
bio/bam/utils/samheadermerger.d

@ -37,14 +37,13 @@ import bio.bam.utils.graph;
class SamHeaderMerger {
/// Takes array of SAM headers as an input.
///
/// WARNING: merger might modify the passed array for better performance.
this(SamHeader[] headers, bool validate_headers=false) {
_headers = headers;
_len = _headers.length;
merged_header = new SamHeader();
ref_id_map = new size_t[size_t][_len];
ref_id_reverse_map = new size_t[size_t][_len];
program_id_map = new string[string][_len];
readgroup_id_map = new string[string][_len];
@ -86,10 +85,13 @@ class SamHeaderMerger {
/// the same for program record identifiers
string[string][] program_id_map;
/// Map: index of SamHeader in input array of headers -> new refID -> old refID
size_t[size_t][] ref_id_reverse_map;
private:
// NOTE FOR DEVELOPER:
// for more info on what's going on here, read comments in CLItools/sambamba-merge/merge.d
// for more info on what's going on here, read comments in sambamba/sambamba/merge.d
SamHeader[] _headers;
size_t _len; // number of headers
@ -137,11 +139,12 @@ private:
merged_header.sequences.add(dict[v]);
}
// make mapping
// make mappings
foreach (size_t i, header; _headers) {
foreach (size_t j, SqLine sq; header.sequences) {
auto new_index = merged_header.sequences.getSequenceIndex(sq.name);
ref_id_map[i][j] = to!size_t(new_index);
ref_id_reverse_map[i][to!size_t(new_index)] = j;
}
}
}

Loading…
Cancel
Save