init

master | Pjotr Prins | 10 months ago | commit 2c731763ac
8 changed files with 2457 additions and 0 deletions
  1. dub.json (+34 -0)
  2. header.d (+122 -0)
  3. snappy.d (+82 -0)
  4. source/app.d (+47 -0)
  5. source/reader.d (+871 -0)
  6. source/writer.d (+555 -0)
  7. utils/bam/reader.d (+675 -0)
  8. utils/markdup.d (+71 -0)

dub.json (+34 -0)

@@ -0,0 +1,34 @@
{
"authors": [
"NickRoz"
],
"buildOptions": [
"debugMode"
],
"copyright": "Copyright © 2019, NickRoz",
"dependencies": {
"gperftools_d": "~>0.1.0",
"biod": "~>0.2.2",
"dlang-snappy": "~>0.0.2",
"emsi_containers": "~>0.7.0"
},
"description": "columnar BAM",
"dflags": [
"-version=BioDlegacy",
"-m64",
"-g"
],
"importPaths": [
"/home/nickroz/.dub/packages/biod-0.2.3/biod",
"/home/nickroz/.dub/packages/gperftools_d-0.1.0/gperftools_d/source",
"/home/nickroz/.dub/packages/dlang-snappy-0.0.2/dlang-snappy/source",
"/home/nickroz/.dub/packages/emsi_containers-0.7.0/emsi_containers/src"
],
"license": "proprietary",
"name": "cbam",
"sourceFiles": [
"/home/nickroz/Documents/CBAM/utils/bam/reader.d"
],
"targetType": "executable"
}

header.d (+122 -0)

@@ -0,0 +1,122 @@
/*
New style BAM reader. This file is part of Sambamba.
Copyright (C) 2017,2018 Pjotr Prins <pjotr.prins@thebird.nl>

Sambamba is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
by the Free Software Foundation; either version 2 of the License,
or (at your option) any later version.

Sambamba is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
02111-1307 USA

*/

// This is a complete rewrite of Artem Tarasov's original reader.

module bio.std.experimental.hts.bam.header;

/*
import std.conv;
import core.stdc.stdio: fopen, fread, fclose;
import std.typecons;
import std.bitmanip;

import bio.bam.cigar;
*/

import std.exception;
import std.file;
import std.stdio;
import std.string;

// Why is this imported from the old bio.bam?
// TODO: check whether it depends on undead.
import bio.std.hts.bam.constants;

import bio.std.hts.bam.referenceinfo; // provides ReferenceSequenceInfo used in BamHeader's constructor
import bio.std.hts.sam.header;
import bio.std.experimental.hts.bgzf;
import bio.std.experimental.hts.bgzf_writer;

// What is the difference between these constants and the ones from bio.std.hts.bam.constants?
import bio.std.experimental.hts.constants;

struct RefSequence {
size_d length;
string name;
}

struct BamHeader {
string id;
string text;
RefSequence[] refs;

this(SamHeader header, const(bio.std.hts.bam.referenceinfo.ReferenceSequenceInfo)[] bamRefs){
id = BAM_MAGIC;
text = header.text;
refs.length = bamRefs.length;
int i = 0;
foreach(seq; bamRefs){
refs[i].name = seq.name;
refs[i].length = seq.length;
i++;
}
}
this(string bam_magic, string header_text){
id = bam_magic;
text = header_text;
}

//@disable this(this); // disable copy semantics;
}

void fetch_bam_header(ref BamHeader header, ref BgzfStream stream) {
// stderr.writeln("Fetching BAM header");
ubyte[4] ubyte4;
stream.read(ubyte4);
enforce(ubyte4 == BAM_MAGIC,"Invalid file format: expected BAM magic number");
immutable text_size = stream.read!int();
// stderr.writeln("Text size ",text_size.sizeof," ",text_size);
immutable text = stream.read!string(text_size);
header = BamHeader(BAM_MAGIC,text);
immutable n_refs = stream.read!int();
// stderr.writeln("Fetching ",n_refs," references");
foreach(int n_ref; 0..n_refs) {
immutable l_name = stream.read!int();
// stderr.writeln("!!",l_name);
auto ref_name = stream.read!string(l_name);
immutable l_ref = stream.read!int(); // length of reference sequence (bps)
// stderr.writeln(l_name," ",ref_name," ",l_ref);
header.refs ~= RefSequence(l_ref,ref_name[0..l_name-1]); // drop zero terminator
}
}

void write_bam_header(ref BgzfWriter bw, ref BamHeader header) {
// stderr.writeln("Writing BAM header");
ubyte[4] magic = cast(ubyte[])BAM_MAGIC;
bw.write(magic);
// stderr.writeln("Text size ",int.sizeof," ",header.text.length);
bw.write!int(cast(int)header.text.length);
bw.write(header.text);
auto n_refs = cast(int)header.refs.length;
bw.write!int(cast(int)header.refs.length);
// stderr.writeln("Writing ",n_refs," references");
foreach(int n_ref; 0..n_refs) {
immutable refseq = header.refs[n_ref];
bw.write!int(cast(int)(refseq.name.length+1)); // incl. zero terminator
// stderr.writeln("!!",refseq.name.length+1);
bw.write(refseq.name);
bw.write!ubyte(cast(ubyte)'\0');
bw.write!int(cast(int)refseq.length);
// stderr.writeln(refseq.name.length+1," ",refseq.name," ",refseq.length);
}
// stderr.writeln("!!");
bw.flush_block();
}
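
For reference, the byte layout that fetch_bam_header and write_bam_header walk is fixed by the BAM spec: magic "BAM\1", l_text (int32, little-endian), the SAM header text, n_ref (int32), then per reference an l_name (int32), a NUL-terminated name, and l_ref (int32). A minimal self-contained sketch of the same walk over an in-memory ubyte[] (parseRefs and PlainRef are hypothetical and not part of this commit; BGZF decompression is assumed to have happened already):

import std.bitmanip : read;
import std.exception : enforce;
import std.system : Endian;

struct PlainRef { string name; int length; }

// Hypothetical helper: decode the reference dictionary of a raw,
// already-decompressed BAM header block.
PlainRef[] parseRefs(ubyte[] buf)
{
    enforce(buf[0 .. 4] == cast(const(ubyte)[]) "BAM\1", "expected BAM magic");
    buf = buf[4 .. $];
    immutable l_text = read!(int, Endian.littleEndian, ubyte[])(buf);
    buf = buf[l_text .. $]; // skip the SAM header text
    immutable n_ref = read!(int, Endian.littleEndian, ubyte[])(buf);
    auto refs = new PlainRef[n_ref];
    foreach (ref r; refs)
    {
        immutable l_name = read!(int, Endian.littleEndian, ubyte[])(buf);
        r.name = cast(string) buf[0 .. l_name - 1].idup; // drop the NUL terminator
        buf = buf[l_name .. $];
        r.length = read!(int, Endian.littleEndian, ubyte[])(buf);
    }
    return refs;
}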

snappy.d (+82 -0)

@@ -0,0 +1,82 @@
module snappy.snappy;

import std.conv;
import std.stdio;

// import gperftools_d.profiler;

extern (C) {
enum snappy_status {
SNAPPY_OK = 0,
SNAPPY_INVALID_INPUT = 1,
SNAPPY_BUFFER_TOO_SMALL = 2,
};

snappy_status snappy_uncompressed_length(const byte* compressed,
size_t compressed_length,
size_t* result);

snappy_status snappy_uncompress(const byte* compressed,
size_t compressed_length,
byte* uncompressed,
size_t* uncompressed_length);
snappy_status snappy_compress(const byte* input,
size_t input_length,
byte* compressed,
size_t* compressed_length);
size_t snappy_max_compressed_length(size_t source_length);
}

class Snappy {
static byte[] uncompress(byte[] compressed, byte[] res) { //, byte[] res
size_t uncompressedPrediction;
snappy_status ok = snappy_uncompressed_length(compressed.ptr, compressed.length, &uncompressedPrediction);
if (ok != snappy_status.SNAPPY_OK) {
throw new Exception(to!(string)(ok));
}
// auto res = new byte[uncompressedPrediction];
size_t uncompressed = uncompressedPrediction;
ok = snappy_uncompress(compressed.ptr, compressed.length, res.ptr, &uncompressed);
if (ok != snappy_status.SNAPPY_OK) {
throw new Exception(to!(string)(ok));
}
if (uncompressed != uncompressedPrediction) {
throw new Exception("uncompressedPrediction " ~ to!(string)(uncompressedPrediction) ~ " != " ~ "uncompressed " ~ to!(string)(uncompressed));
}
return res;
}



static byte[] compress(byte[] uncompressed) {
// ProfilerStart();

size_t maxCompressedSize = snappy_max_compressed_length(uncompressed.length);
byte[] res = new byte[maxCompressedSize];
size_t compressedSize = maxCompressedSize;
snappy_status ok = snappy_compress(uncompressed.ptr, uncompressed.length, res.ptr, &compressedSize);
if (ok != snappy_status.SNAPPY_OK) {
throw new Exception(to!(string)(ok));
}
// ProfilerStop();
return res[0..compressedSize];
}

}


unittest{
import std.stdio;

byte[] data = cast(byte[])"ffdsffffffffffffffffaaaaaaaaaaaaaaaaaaccccccccccccccccccccccccdddddddddddddddddddeeeeeeeeeeeeeeeeeeeeeee".dup;
writeln("-------------------------------------------------");
writefln("start test: compress data, length: %s", data.length);

byte[] cprData = Snappy.compress(data);
writefln("compressed data, length: %s, data: %s", cprData.length, cprData);

byte[] unData = new byte[data.length]; // uncompress expects a caller-sized destination buffer
Snappy.uncompress(cprData, unData);
writefln("uncompressed data, length: %s, data: %s", unData.length, unData);
assert(unData == data);
}
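
If the uncompressed size is not known up front (CBAM stores it in its row-group metadata), the C bindings above can report it. A hypothetical direct use of the bindings (uncompressSized is not part of this commit):

// Size the destination via snappy_uncompressed_length, then decode.
byte[] uncompressSized(byte[] packed)
{
    size_t n;
    auto st = snappy_uncompressed_length(packed.ptr, packed.length, &n);
    if (st != snappy_status.SNAPPY_OK)
        throw new Exception(to!string(st));
    auto dst = new byte[n];
    st = snappy_uncompress(packed.ptr, packed.length, dst.ptr, &n);
    if (st != snappy_status.SNAPPY_OK)
        throw new Exception(to!string(st));
    return dst[0 .. n];
}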

source/app.d (+47 -0)

@@ -0,0 +1,47 @@
module app;
import source.writer;
import reader;
import utils.bam.reader;
import std.stdio;
import std.datetime.stopwatch;
import bio.std.hts.bam.reader;
import std.parallelism;
import std.file;
import std.getopt;
import std.process;
//import gperftools_d.profiler;


void main(string[] args)
{
// Drop disk caches, requires password
executeShell("echo 3 | sudo tee /proc/sys/vm/drop_caches");
auto fn1 = getcwd() ~ "Path to cbam file"; // placeholder: substitute the actual .cbam path
File file = File(fn1, "r");
FileReader fileR = new FileReader(file);
alias bamFields = FileReader.bamFields;
// auto test = FileReader.VariadicReader!(FileReader.bamFields.bin, bamFields.flag, bamFields.raw_qual, bamFields.pos, bamFields.mapq)(
// fileR.fileMeta, file);//

auto test = fileR.getColumn!(bamFields.raw_qual)();
auto sw = StopWatch(AutoStart.no);
sw.start();
//ProfilerStart();
foreach(raw_qual; test){
}
//ProfilerStop();
sw.stop();
writeln(sw.peek.total!"msecs");
import std.algorithm.iteration : reduce;

writeln("UNCOMPRESSING TIME: ",
reduce!"a+b"(GLOBAL_UNCOMPRESS));

}

source/reader.d (+871 -0)

@@ -0,0 +1,871 @@
module reader;
int[] GLOBAL_BENCH;
int[] GLOBAL_UNCOMPRESS;
import std.stdio;
import std.exception;
import std.bitmanip;
import std.conv;
import std.traits;
import std.algorithm.iteration;
import std.range.primitives;
import std.file;
import std.algorithm.searching;
import std.datetime.stopwatch;
import std.algorithm.comparison;
import std.parallelism;
import std.meta;
import std.format;

import source.writer;
import bio.std.experimental.hts.bam.header;
import utils.bam.reader;
import snappy.snappy;

//import gperftools_d.profiler;

class FileReader
{
File file;
FileMeta fileMeta;
BamHeader bamHeader;
///
this(File f)
{
file = f;
parse_meta();
}

~this()
{
if (!file.isOpen())
return;
file.close();
}
///
void close()
{
file.close();
}

/// Maps column types to Offsets
enum colTypeToOff = [
"_bin_mq_nl" : "Offset.bin_mq_nl",
"_flag_nc" : "Offset.flag_nc",
"sequence_length" : "Offset.l_seq",
"_next_refID" : "Offset.next_refID",
"_next_pos" : "Offset.next_pos",
"_tlen" : "Offset.tlen",
"read_name" : "_read_name_offset",
"raw_cigar" : "_cigar_offset",
"raw_sequence" : "_seq_offset",
"raw_qual" : "_qual_offset",
"raw_tags" : "_tags_offset"
];

/// Returns an array of RawReadBlobs representing row group i.
RawReadBlob[] readRowGroup(const int i)
{
enforce(i >= 0 && i < fileMeta.rowGroups.length, "No such rowgroup " ~ to!string(i));

auto sizesCol = getColumn!(bamFields.blob_size).getChunk(i);

RawReadBlob[] readsBuf;
readsBuf.length = sizesCol.length;

/// Preallocates blob size to avoid subsequent allocations
foreach (blob_n, ref elem; readsBuf)
{
elem._data.length = sizesCol[blob_n];
}

template GenFiller(ColumnTypes colType)
{
static if (colType == ColumnTypes._refID || colType == ColumnTypes._pos)
{
enum ternar = [ColumnTypes._refID : "refid", ColumnTypes._pos : "pos"];
const char[] GenFiller = "
foreach(read_n, ref blob; readsBuf){
blob."
~ ternar[colType] ~ " = *(cast( int*)(col.ptr + int.sizeof*read_n));
}";
}
else static if (colType == ColumnTypes.read_name ||
colType == ColumnTypes.raw_cigar ||
colType == ColumnTypes.raw_sequence ||
colType == ColumnTypes.raw_qual ||
colType == ColumnTypes.raw_tags )
{
enum offset = "blob." ~ colTypeToOff[to!string(colType)];
//pragma(msg, colType);
//pragma(msg, offset);
const char[] GenFiller = "
foreach(read_n, ref blob; readsBuf){
blob._data["
~ offset ~ ".." ~ offset ~ "+col[read_n].length] = col[read_n];
}";
}
else
{
enum offset = colTypeToOff[to!string(colType)];
const char[] GenFiller = "
foreach(read_n, ref blob; readsBuf){
blob._data[" ~ offset ~ ".." ~ offset
~ "+int.sizeof] = col[read_n*int.sizeof..(read_n+1)*int.sizeof];
}";
}
}
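// GenFiller expands at compile time into one of three fill loops:
// a direct int copy into blob.refid/blob.pos, a slice copy into each
// read's field offset for the variable-length columns, and a 4-byte
// word copy for the remaining fixed-size columns.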

void localReader(ColumnTypes colType)()
{
auto col = getBaseColumn!(colType).getRawChunk(i);
mixin(GenFiller!(colType));
}

static foreach (member; EnumMembers!ColumnTypes)
{
static if (member != ColumnTypes._blob_size)
{
localReader!(member);
}
}

return readsBuf;
}

/// enum of BAM fields held in CBAM file
enum bamFields
{
refID,
pos,
bin,
mapq,
flag,
nextRefID,
nextPos,
tlen,
read_name,
cigar,
raw_sequence,
raw_qual,
blob_size,
tags
}

/// Maps BAM fields to Columns (some BAM fields packed together)
enum bamfToCbamf = [
bamFields.refID : ColumnTypes._refID,
bamFields.pos : ColumnTypes._pos,
bamFields.bin : ColumnTypes._bin_mq_nl,
bamFields.mapq : ColumnTypes._bin_mq_nl,
bamFields.flag : ColumnTypes._flag_nc,
bamFields.nextRefID : ColumnTypes._next_refID,
bamFields.nextPos : ColumnTypes._next_pos,
bamFields.tlen : ColumnTypes._tlen,
bamFields.read_name : ColumnTypes.read_name,
bamFields.cigar : ColumnTypes.raw_cigar,
bamFields.raw_qual : ColumnTypes.raw_qual,
bamFields.raw_sequence : ColumnTypes.raw_sequence,
bamFields.blob_size : ColumnTypes._blob_size,
bamFields.tags : ColumnTypes.raw_tags
];

/**
Generates the Column type for the requested BAM field.

Alias `colT` contains the type

Example:
---
FileReader cbam_reader = new FileReader(file);

cbam_reader.getColType!(bfield).colT(fileMeta, file);
---
*/

template getColType(bamFields bfield)
{
static if (bfield == bamFields.refID || bfield == bamFields.pos || bfield == bamFields.nextRefID
|| bfield == bamFields.nextPos || bfield == bamFields.tlen
|| bfield == bamFields.blob_size)
{
alias colT = Column!(int[], bfield); // swap colT to getColType to make usage easier
}
else static if (bfield == bamFields.bin || bfield == bamFields.flag)
{
alias colT = Column!(short[], bfield);
}
else static if (bfield == bamFields.mapq)
alias colT = Column!(ubyte[], bfield);
else static if (bfield == bamFields.read_name || bfield == bamFields.cigar
|| bfield == bamFields.raw_qual || bfield == bamFields.raw_sequence ||
bfield == bamFields.tags)
{
alias colT = Column!(ubyte[][], bfield);
}
else
static assert(false, "This BAM field is not supported yet.");
}

auto getVarReader(BFields...)()
{
return VariadicReader!(BFields)(fileMeta, file);
}

struct VariadicReader(Cols...) if (Cols.length > 0)
{

template genSeq(Args...)
{
static if (Args.length == 0)
{
alias genSeq = AliasSeq!();
}
else
{
alias genSeq = AliasSeq!(getColType!(Args[0]).colT, genSeq!(Args[1 .. $]));
}
}

template extractType(Args...)
{
static if (Args.length == 0)
{
alias extractType = AliasSeq!();
}
else
{
alias extractType = AliasSeq!(ElementType!(TemplateArgsOf!(Args[0])[0]),
extractType!(Args[1 .. $]));
}
}

alias colsT = genSeq!(Cols);
colsT columns;
alias readT = extractType!(colsT);

this(FileMeta meta, File f)
{
foreach (i, ref col; columns)
{
col = colsT[i](meta, f);
}
}
int counterForTest;
void getFields(ref readT fields)
{
foreach (i, ref field; fields)
{
field = columns[i].front;
columns[i].popFront();
}
}

int opApply(int delegate(readT) dg)
{
int result = 0;

import std.range;

readT fields;

for (getFields(fields); !columns[0].empty; getFields(fields))
{
counterForTest++;
result = dg(fields);
if (result)
break;
}

return result;
}
}

unittest
{
auto fn1 = getcwd() ~ "/source/tests/test3.cbam";
auto fn2 = getcwd() ~ "/source/tests/test1.bam";

File file = File(fn1, "r");
FileReader fileR = new FileReader(file);
BamReadBlobStream BAMreader = BamReadBlobStream(fn2);

auto test = VariadicReader!(bamFields.bin, bamFields.flag, bamFields.raw_qual)(
fileR.fileMeta, file);

int counter = 0;
foreach (a, b, c; test)
{
auto temp = BAMreader.front();
auto test = temp._bin;
auto test2 = temp._flag;
// writefln("{%s}: {%s} _ {%s}", counter, test, a);
// writefln("{%s}: {%s} _ {%s}", counter, test2, b);
assert(a == temp._bin);
assert(b == temp._flag);
assert(c == temp.raw_qual);
BAMreader.popFront();
counter++;
}
}

/// Get field specified by enum ColumnTypes.
auto getBaseColumn(ColumnTypes colType)()
{
static if (colType == ColumnTypes._refID || colType == ColumnTypes._pos || colType == ColumnTypes._blob_size
|| colType == ColumnTypes._bin_mq_nl || colType == ColumnTypes._flag_nc
|| colType == ColumnTypes.sequence_length
|| colType == ColumnTypes._next_refID
|| colType == ColumnTypes._next_pos || colType == ColumnTypes._tlen)
{
// refID as a default field, simple plug
return new Column!(int[], bamFields.refID, colType)(fileMeta, file);
}
else
{ // FIX: silently falls through when the user passes an unsupported colType; should throw an error instead
pragma(msg, colType);
return new Column!(ubyte[][], bamFields.refID, colType)(fileMeta, file);
}
}

/**
Returns a constructed Column struct for reading the requested BAM field.

Example:
---
FileReader cbam_reader = new FileReader(file);

auto colRefID = cbam_reader.getColumn!(bamFields.refID)();
---
*/
auto getColumn(bamFields bfield)()
{
return getColType!(bfield).colT(fileMeta, file);
}

/**
Column struct.

Gives access to CBAM columns.

Examples:
---
FileReader cbam_reader = new FileReader(file);

auto col = cbam_reader.getColumn!(bamFields.refID)();

/// Print refID of every BAM record
foreach(elem; col){
writeln(elem);
}

/// With index
foreach(i, elem; col){
writefln("%s: %s", i, elem);
}

/// Get array of refID of every BAM record
auto values = col.fullColumn();
---
*/
struct Column(T, bamFields bfield, ColumnTypes columnType = bamfToCbamf[bfield])
if (is(T == ubyte[][]) || is(T == int[]) || is(T == short[]) || is(T == ubyte[]))
{

this(FileMeta meta, File f)
{
enforce(f.isOpen());
import std.algorithm.searching : maxElement;
uncompressedBuffer.length = map!(rowgroup => rowgroup.uncompressedColSizes[columnType])(meta.rowGroups).maxElement;
fileMeta = meta;
buffer.length = fileMeta.rowGroups[0].num_rows;
file = f;
enforce(buffer.length != 0, ("File is empty"));
rangeStatus = RangeStatus(meta.rowGroups);
fetchBuffer(fileMeta.rowGroups[0]);
popFront();
}

/// Returns number of records in whole CBAM file
public @property long size()
{
return reduce!((a, b) => a + b)(
map!(row_group => row_group.num_rows)(fileMeta.rowGroups));
}

/// Returns array representing whole CBAM column
public T fullColumn()
{
T column;
column.length = this.size;

foreach (i, elem; this)
{
column[i] = elem;
}

return column;
}

/// Parses value from byte buffer
private static ElementType!T parse(ref byte[] bytes, ref long offset)
{
// variable size fields require additional parsing step
static if (is(ElementType!T == ubyte[]))
{
auto length = *(cast(int*)(bytes.ptr + offset));
offset += int.sizeof;
ElementType!T value = cast(ubyte[]) bytes[offset .. offset + length];
offset += length;
}
else
{
static if (bfield == bamFields.bin || bfield == bamFields.flag)
{
int localOffset = 2;
}
else static if (bfield == bamFields.mapq)
{
int localOffset = 1;
}
else
{
int localOffset = 0;
}
ElementType!T value = *(cast(ElementType!T*)(bytes.ptr + offset + localOffset));
offset += int.sizeof; // fields smaller than int packed into one
}
return value;
}
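// Layout note: bin, mapq and flag live packed inside the little-endian
// words bin_mq_nl (bin << 16 | mapq << 8 | l_read_name) and
// flag_nc (flag << 16 | n_cigar_op), hence localOffset above is
// 2 bytes for bin and flag, 1 for mapq, while the stride stays int.sizeof.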

@property ElementType!T front()
{
return rangeStatus.frontElem;
}
int counterTest;
@property bool empty()
{
counterTest++;
// if(rangeStatus.empty){
// writeln("CounterTest: ", counterTest);
// }
return rangeStatus.empty;
}

void popFront()
{
//enforce(!rangeStatus.empty);
counterTest++;
if (rangeStatus.bufOver)
{
rangeStatus.advance();
auto sw = StopWatch(AutoStart.no);
sw.start();
fetchBuffer(fileMeta.rowGroups[rangeStatus.rowGroupNum]);
auto elapsed = sw.peek.total!"msecs";
// writeln("TIME ON BUFFER FETCH: ", elapsed);
}

rangeStatus.frontElem = buffer[rangeStatus.curElem];
rangeStatus.curElem++;
}

/// Iterates over CBAM column
int opApply(int delegate(ElementType!T) dg)
{
int result = 0;
foreach (rowGroup; fileMeta.rowGroups)
{
fetchBuffer(rowGroup);
foreach (ref ElementType!T elem; buffer)
{
result = dg(elem);
if (result)
break;
}
if (result)
break;
}

return result;
}

/// Iterates over CBAM column with index
int opApply(int delegate(ref size_t i, ElementType!T) dg)
{
int result = 0;
size_t i = 0;
auto sw = StopWatch(AutoStart.no);
sw.start();
foreach (rowGroup; fileMeta.rowGroups)
{

fetchBuffer(rowGroup);

foreach (ElementType!T elem; buffer)
{
result = dg(i, elem);
++i;
if (result)
break;
}
if (result)
break;
}
sw.stop();
//writeln("OPAPPLY ELAPSED : ", sw.peek.total!"msecs");
return result;
}

byte[] rawInputBuffer;
private void fetchBuffer(bool isRaw = false)(RowGroupMeta rowGroup)
{
// ProfilerStart();
enforce(file.isOpen());
if(rawInputBuffer.ptr == null){
// ProfilerStart();
auto rowgroupSizes = map!(a => a.columnsSizes[columnType])(fileMeta.rowGroups);
auto maxSize = rowgroupSizes.maxElement;
rawInputBuffer.length = 2*maxSize;
// ProfilerFlush();
}
buffer.length = rowGroup.num_rows;
ulong size = rowGroup.columnsSizes[columnType];

rawInputBuffer.length = size;
file.seek(rowGroup.columnsOffsets[columnType]);
auto sw2 = StopWatch(AutoStart.no);
sw2.start();
file.rawRead(rawInputBuffer);
sw2.stop();
GLOBAL_UNCOMPRESS ~= cast(int) sw2.peek.total!"msecs";

alias plainBuffer = uncompressedBuffer;
plainBuffer.length = rowGroup.uncompressedColSizes[columnType];
// auto sw2 = StopWatch(AutoStart.no);
// sw2.start();
// writeln("test");
// auto plainBuffer = Snappy.uncompress(rawInputBuffer);
Snappy.uncompress(rawInputBuffer, plainBuffer);
// ProfilerFlush();
// sw2.stop();
//writeln("RAWUNCOMPRESS : ", sw2.peek.total!"usecs");
// GLOBAL_UNCOMPRESS ~= cast(int) sw2.peek.total!"msecs";
// ProfilerStop();
static if (isRaw && !is(T == ubyte[][]))
{
rawBuffer = cast(ubyte[]) plainBuffer;
}
else
{

void fillBuffer(byte[] plainBuf, T buf)
{
long offset = 0;
foreach (i, ref elem; buf)
{
elem = parse(plainBuf, offset);
}
}

struct inputWrapper
{
byte[] bytes;
long offset;
}

void fillIntegralBuffer(inputWrapper wrapper, shared(T) buf)
{
import core.atomic : atomicStore;

long offset = 0;
while (offset < wrapper.bytes.length)
{
// atomicStore(buf[wrapper.offset++],
// cast(shared ElementType!T) parse(wrapper.bytes, offset));
buf[wrapper.offset++] = cast(shared ElementType!T) parse(wrapper.bytes,
offset);
}
}

if (is(T == ubyte[][])){
fillBuffer(plainBuffer, buffer);
}

else
{
auto sw = StopWatch(AutoStart.no);
sw.start();
auto pool = new TaskPool(1);
scope (exit)
pool.finish();

auto chunkSize = plainBuffer.length / pool.size();
inputWrapper[] inputWrappers;
if (chunkSize * pool.size() == plainBuffer.length && chunkSize % 64 == 0)
{
inputWrappers.length = pool.size();
}
else
{
inputWrappers.length = pool.size() + 1;
}

while (chunkSize % 64 != 0)
chunkSize--;

foreach (i, ref wrapper; inputWrappers)
{
wrapper.offset = i * chunkSize / 4; // each fixed-size field is 4 bytes wide
size_t offset = i * chunkSize;

if (offset + chunkSize > plainBuffer.length)
{
wrapper.bytes = plainBuffer[offset .. $];
}
else
{
wrapper.bytes = plainBuffer[offset .. offset + chunkSize];
}
}
auto tempShared = cast(shared T) buffer;

foreach (wrapper; parallel(inputWrappers))
{
fillIntegralBuffer(wrapper, tempShared);
}
sw.stop();
//writeln("BUFFER FETCH ELAPSED: ", sw.peek.total!"msecs");
GLOBAL_BENCH ~= cast(int) sw.peek.total!"msecs";
}

}

}

T getChunk(int i)
{
fetchBuffer(fileMeta.rowGroups[i]);
return buffer;
}

auto getRawChunk(int i)
{
fetchBuffer!(true)(fileMeta.rowGroups[i]);
static if (columnType == ColumnTypes.read_name || columnType == ColumnTypes.raw_cigar
|| columnType == ColumnTypes.raw_qual || columnType == ColumnTypes.raw_sequence
|| columnType == ColumnTypes.raw_tags)
{
return buffer;
}
else
{
return rawBuffer;
}
}

string toString()
{
return format("COLUMN TYPE : %s\nCOLUMN bfield : %s\nCOLUMN colType: %s\nCOLUMN fileno : %s\n",
typeid(buffer), to!string(bfield), to!string(columnType), file.fileno);
}

private:
byte[] uncompressedBuffer;
ubyte[] rawBuffer;
T buffer;
pragma(msg, T);

struct RangeStatus
{
this(RowGroupMeta[] groups)
{
//writeln("INIT");
rowGroups = groups;
rowGroupNum = 0;
curElem = 0;
}

RowGroupMeta[] rowGroups;
ElementType!T frontElem;
int rowGroupNum;
int curElem;
private int processedTest;
@property bool empty()
{
processedTest++;
if(bufOver && exhausted){
//writeln("processedTest: ", processedTest);
}
return bufOver && exhausted;
}

@property bool bufOver()
{
return curElem == rowGroups[rowGroupNum].num_rows;
}

@property bool exhausted()
{
return rowGroupNum == rowGroups.length - 1;
}

void advance()
{
rowGroupNum++;
curElem = 0;
}
}

RangeStatus rangeStatus;

FileMeta fileMeta;
public File file;
public int status;
}

unittest
{
auto fn1 = getcwd() ~ "/source/tests/test3.cbam";
auto fn2 = getcwd() ~ "/source/tests/test1.bam";

File file = File(fn1, "r");
FileReader fileR = new FileReader(file);

auto CBAMpos = fileR.getColumn!(bamFields.pos); //fileR.getColumn!(int[], bamFields.pos);
auto CBAMflag = fileR.getColumn!(bamFields.flag);
auto CBAMmapq = fileR.getColumn!(bamFields.mapq);
auto CBAMname = fileR.getColumn!(bamFields.read_name);
auto CBAMcig = fileR.getColumn!(bamFields.cigar);
auto CBAMseq = fileR.getColumn!(bamFields.raw_sequence);
auto CBAMqual = fileR.getColumn!(bamFields.raw_qual);
auto CBAMtags = fileR.getColumn!(bamFields.tags);

auto posCol = CBAMpos.fullColumn();
auto flagCol = CBAMflag.fullColumn();
auto mapqCol = CBAMmapq.fullColumn();
auto readNam = CBAMname.fullColumn();
auto cigCol = CBAMcig.fullColumn();
auto seqCol = CBAMseq.fullColumn();
auto qualCol = CBAMqual.fullColumn();
auto tagsCol = CBAMtags.fullColumn();

BamReadBlobStream reader = BamReadBlobStream(fn2);
int i = 0;

auto rangeFlagCol = fileR.getColumn!(bamFields.flag);
while (!reader.empty())
{
auto temp = reader.front();

assert(rangeFlagCol.front() == temp._flag);
if (!rangeFlagCol.empty)
rangeFlagCol.popFront();

// writefln("Pos: {%s} _ {%s}", posCol[i], temp.pos);
assert(posCol[i] == temp.pos);
// writefln("Flag: {%s} _ {%s}", flagCol[i], temp._flag);
// writeln(i);
assert(flagCol[i] == temp._flag);
assert(mapqCol[i] == temp._mapq);
assert(equal(readNam[i], temp.read_name));
assert(equal(cigCol[i], temp.raw_cigar));
assert(equal(seqCol[i], temp.raw_sequence));
assert(equal(qualCol[i], temp.raw_qual));
assert(equal(tagsCol[i], temp.raw_tags));
reader.popFront();
++i;
}
assert(posCol.length - 1 == i);
}

/// Parse FileMeta
void parse_meta()
{
ulong file_size = file.size();
enforce(file_size >= CBAM_MAGIC.sizeof, "File size is smaller than MAGIC string");

file.seek(-1 * (2 * uint.sizeof + ulong.sizeof), SEEK_END);
ubyte[] buffer;
buffer.length = 16;
file.rawRead(buffer);

enforce(buffer[$ - uint.sizeof .. $] == CBAM_MAGIC, "Invalid file");

ulong meta_offset = read!(ulong, Endian.littleEndian, ubyte[])(buffer);
uint meta_size = read!(uint, Endian.littleEndian, ubyte[])(buffer);

file.seek(meta_offset);
if (meta_size > 0)
{
buffer.length = meta_size;
file.rawRead(buffer);

uint rowGroupsAmount = read!(int, Endian.littleEndian, ubyte[])(buffer);

fileMeta.rowGroups.length = rowGroupsAmount;
foreach (ref rowGroup; fileMeta.rowGroups)
{
foreach (ref columnOffset; rowGroup.columnsOffsets)
{
//writeln("BUFFER SIZE: ", buffer.length);
columnOffset = read!(ulong, Endian.bigEndian, ubyte[])(buffer);
//writeln(columnOffset);
}

foreach (ref columnSize; rowGroup.columnsSizes)
{
columnSize = read!(ulong, Endian.bigEndian, ubyte[])(buffer);
}

foreach (ref uncompressedColSize; rowGroup.uncompressedColSizes){
uncompressedColSize = read!(ulong, Endian.bigEndian, ubyte[])(buffer);
}
rowGroup.total_byte_size = read!(ulong, Endian.bigEndian, ubyte[])(buffer);

rowGroup.num_rows = read!(uint, Endian.bigEndian, ubyte[])(buffer);

}
}
parseBamHeader();
}

void parseBamHeader()
{
ubyte[] buffer;
buffer.length = uint.sizeof;
file.rawRead(buffer);
uint header_text_size = read!(uint, Endian.littleEndian, ubyte[])(buffer);

if (header_text_size > 0)
{
buffer.length = header_text_size;
auto test = file.rawRead(buffer);
bamHeader.text = cast(string) buffer.dup;
}

buffer.length = uint.sizeof;
file.rawRead(buffer);
uint num_ref_seq = read!(uint, Endian.littleEndian, ubyte[])(buffer);
bamHeader.refs.length = num_ref_seq;

foreach (ref ref_seq; bamHeader.refs)
{
buffer.length = uint.sizeof;

file.rawRead(buffer);
uint name_length = read!(uint, Endian.littleEndian, ubyte[])(buffer);
if (name_length > 0)
{
buffer.length = name_length;
file.rawRead(buffer);
ref_seq.name = cast(string) buffer.dup;
}

buffer.length = ulong.sizeof;
file.rawRead(buffer);
ref_seq.length = read!(ulong, Endian.littleEndian, ubyte[])(buffer);
}
}

}
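
Putting the reader API together, a short hypothetical driver (dumpColumnStats is not part of this commit and the path is a placeholder) that materializes one row group and then pulls a full column:

import std.stdio : File, writefln;
import reader : FileReader;

void dumpColumnStats(string path)
{
    auto cbam = new FileReader(File(path, "r"));
    auto blobs = cbam.readRowGroup(0); // RawReadBlob[] of row group 0
    writefln("row group 0 holds %s reads", blobs.length);
    writefln("first read: refid=%s pos=%s", blobs[0].refid, blobs[0].pos);

    // Or materialize one whole column across all row groups.
    auto positions = cbam.getColumn!(FileReader.bamFields.pos)().fullColumn();
    writefln("total records: %s", positions.length);
}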

source/writer.d (+555 -0)

@@ -0,0 +1,555 @@
module source.writer;
import std.stdio;
import std.string;
import std.traits;
import std.conv;
import std.file;
import std.bitmanip;
import std.algorithm;
import std.algorithm.iteration;
import std.exception;
import reader;

//import sambamba.utils.lz4;

import utils.bam.reader;

//import utils.bam.header;
import bio.std.experimental.hts.bam.header;

import snappy.snappy;

const ubyte[4] CBAM_MAGIC = ['C', 'B', 'A', 'M'];
const uint simple_field_size = uint.sizeof;
const uint DEFAULT_SIZE = 20_000; // FIX: crashes when the default size exceeds the number of records in the file

int bamToCbam(const string fn, const string ofn)
{
BamReadBlobStream reader = BamReadBlobStream(fn);
File file = File(ofn, "w");
FileWriter fileWriter = new FileWriter(file, reader.header);

while (!reader.empty())
{
fileWriter.addRecord(reader.front());
reader.popFront();
}
fileWriter.addRecord(reader.front()); // FIXME: called again after the loop has drained the stream; likely duplicates the final record (see the posCol.length - 1 == i assert in reader.d's unittest)
fileWriter.close();

return fileWriter.totalRows;
}

struct RowGroupMeta
{
ulong[EnumMembers!ColumnTypes.length] columnsOffsets;
ulong[EnumMembers!ColumnTypes.length] columnsSizes;
ulong[EnumMembers!ColumnTypes.length] uncompressedColSizes;
ulong total_byte_size;
uint num_rows;
}

struct FileMeta
{
RowGroupMeta[] rowGroups;
}

// Order matters
enum ColumnTypes
{
_refID,
_pos,
_blob_size,
_bin_mq_nl,
_flag_nc,
sequence_length,
_next_refID,
_next_pos,
_tlen,
read_name,
raw_cigar,
raw_sequence,
raw_qual,
raw_tags
}

class FileWriter
{
File file;
FileMeta fileMeta;
BamHeader bamHeader;
int rowGroupSize;
RawReadBlob[] buffer;
int numRows;
int totalRows;

this(File fn, BamHeader bamHead)
{
this(fn, bamHead, DEFAULT_SIZE);
}

this(File fn, BamHeader bamHead, int groupSize)
{
rowGroupSize = groupSize;
buffer.length = rowGroupSize;
file = fn;
file.rawWrite(CBAM_MAGIC);
bamHeader = bamHead;
numRows = 0;
totalRows = 0;
}

~this()
{
close();
}

void close()
{
if (!file.isOpen)
return;
flushBuf();
writeMeta();
file.close();
}

void addRecord(RawReadBlob record)
{ // blob is struct, passed by value
if (numRows == rowGroupSize)
{
flushBuf();
numRows = 0;
}
buffer[numRows++] = record;
}

void flushBuf()
{
writeRowGroup(buffer[0..numRows], numRows); // when buffer.length > numRows
}

void writeRowGroup(RawReadBlob[] recordBuf, uint num_rows)
{
assert(file.isOpen());

RowGroupMeta rowGroupMeta;
rowGroupMeta.num_rows = num_rows;
totalRows += numRows;

uint total_size = 0;
ubyte[] buf;
buf.length = num_rows * int.sizeof; // biggest field is int32

foreach (columnType; EnumMembers!ColumnTypes)
{
if (columnType < ColumnTypes.read_name)
{ // Types which memory size is known
rowGroupMeta.columnsOffsets[columnType] = file.tell(); // may be wrong, check
for (int i = 0; i < num_rows; ++i)
{
writeFieldToBuf(buf, columnType, recordBuf[i], i * simple_field_size);
}
rowGroupMeta.uncompressedColSizes[columnType] = buf.length;
rowGroupMeta.columnsSizes[columnType] = writeColumn(
buf[0 .. num_rows * simple_field_size]);
}
else
{
buf.length = calcBufSize(columnType, recordBuf) + int.sizeof * num_rows;
rowGroupMeta.columnsOffsets[columnType] = file.tell();

int currentPos = 0;
for (int i = 0; i < num_rows; ++i)
{
writeVarFieldToBuf(buf, columnType, recordBuf[i], currentPos);
}
rowGroupMeta.uncompressedColSizes[columnType] = buf.length;
rowGroupMeta.columnsSizes[columnType] = writeColumn(buf[0 .. currentPos]);
}
}

rowGroupMeta.total_byte_size = reduce!((a, b) => a + b)(rowGroupMeta.columnsSizes);
fileMeta.rowGroups ~= rowGroupMeta;
}

unittest
{
auto fn = getcwd() ~ "/source/tests/test1.bam";

File testFile = File.tmpfile();
scope (exit)
testFile.close();

BamBlobReader reader = BamBlobReader(fn);

const int BATCH_SIZE = 100_000;
FileWriter fileWriter = new FileWriter(testFile, reader.header, BATCH_SIZE);

while (!reader.empty())
{
fileWriter.addRecord(reader.fetch());
}
fileWriter.writeMeta();
testFile.flush();
testFile.rewind();

FileReader fileReader = new FileReader(testFile);
BamBlobReader reader2 = BamBlobReader(fn);
int rowGroupNum = 0;
RawReadBlob[] recordBuf;
recordBuf.length = BATCH_SIZE;
int num_rows = 0;
while (!reader.empty())
{
while (num_rows < BATCH_SIZE && !reader.empty())
{
recordBuf[num_rows++] = reader.fetch();
}
auto testBuf = fileReader.readRowGroup(rowGroupNum);
for (int i = 0; i < num_rows; i++)
{
assert(recordBuf[i]._bin_mq_nl == testBuf[i]._bin_mq_nl);
assert(recordBuf[i]._flag_nc == testBuf[i]._flag_nc);
assert(recordBuf[i].sequence_length == testBuf[i].sequence_length);
assert(recordBuf[i]._next_pos == testBuf[i]._next_pos);
assert(recordBuf[i]._next_refID == testBuf[i]._next_refID);
assert(recordBuf[i]._mapq == testBuf[i]._mapq);
assert(recordBuf[i]._flag == testBuf[i]._flag);
assert(equal(recordBuf[i].read_name, testBuf[i].read_name));
assert(equal(recordBuf[i].raw_cigar, testBuf[i].raw_cigar));
assert(equal(recordBuf[i].raw_qual, testBuf[i].raw_qual));
assert(equal(recordBuf[i].raw_sequence, testBuf[i].raw_sequence));
}
rowGroupNum++;
}
}

void writeFieldToBuf(ubyte[] buf, ColumnTypes columnType, RawReadBlob readBlob, int offset)
{
//pragma(inline, true);

switch (columnType)
{
case ColumnTypes._refID:
{
std.bitmanip.write!(int, Endian.littleEndian, ubyte[])(buf,
readBlob.refid, offset);
break;
}
case ColumnTypes._pos:
{
std.bitmanip.write!(int, Endian.littleEndian, ubyte[])(buf, readBlob.pos, offset);
break;
}
case ColumnTypes._blob_size:
{
uint blob_size = cast(int) readBlob._data.length;
std.bitmanip.write(buf, blob_size, offset);
break;
}
case ColumnTypes._bin_mq_nl:
{
buf[offset .. offset + simple_field_size] = readBlob.raw_bin_mq_nl;
break;
}
case ColumnTypes.sequence_length:
{
buf[offset .. offset + simple_field_size] = readBlob.raw_sequence_length;
break;
}
case ColumnTypes._flag_nc:
{
buf[offset .. offset + simple_field_size] = readBlob.raw_flag_nc;
break;
}
case ColumnTypes._next_pos:
{
buf[offset .. offset + simple_field_size] = readBlob.raw_next_pos;
break;
}
case ColumnTypes._next_refID:
{
buf[offset .. offset + simple_field_size] = readBlob.raw_next_refID;
break;
}
case ColumnTypes._tlen:
{
buf[offset .. offset + simple_field_size] = readBlob.raw_tlen;
break;
}
default:
{
assert(false, "No such type exists");
} //should throw
}
}

void writeVarFieldToBuf(ubyte[] buf, ColumnTypes columnType, RawReadBlob readBlob, ref int offset)
{
//pragma(inline, true);
//if(!(offset%10000)) writeln(offset);

switch (columnType)
{
case ColumnTypes.read_name:
{
int fieldSize = cast(int) readBlob.read_name.length;
write!(int, Endian.littleEndian, ubyte[])(buf, fieldSize, offset);
offset += int.sizeof;
buf[offset .. offset + readBlob.read_name.length] = readBlob.read_name;
offset += readBlob.read_name.length;
break;
}
case ColumnTypes.raw_cigar:
{
int fieldSize = cast(int) readBlob.raw_cigar.length;
write!(int, Endian.littleEndian, ubyte[])(buf, fieldSize, offset);
offset += int.sizeof;
buf[offset .. offset + readBlob.raw_cigar.length] = readBlob.raw_cigar;
offset += readBlob.raw_cigar.length;
break;
}
case ColumnTypes.raw_qual:
{
int fieldSize = cast(int) readBlob.raw_qual.length;
write!(int, Endian.littleEndian, ubyte[])(buf, fieldSize, offset);
offset += int.sizeof;
buf[offset .. offset + readBlob.raw_qual.length] = readBlob.raw_qual;
assert(buf[offset .. offset + readBlob.raw_qual.length] == readBlob.raw_qual);
offset += readBlob.raw_qual.length;
break;
}
case ColumnTypes.raw_sequence:
{
int fieldSize = cast(int) readBlob.raw_sequence.length;
write!(int, Endian.littleEndian, ubyte[])(buf, fieldSize, offset);
offset += int.sizeof;
buf[offset .. offset + readBlob.raw_sequence.length] = readBlob.raw_sequence;
offset += readBlob.raw_sequence.length;
break;
}

case ColumnTypes.raw_tags:
{
auto length = readBlob.raw_tags.length;
write!(int, Endian.littleEndian, ubyte[])(buf, cast(int)length, offset);
offset += int.sizeof;
buf[offset .. offset + length] = readBlob.raw_tags;
offset += length;
break;
}

default:
{
assert(false, "No such type exists");
} //should throw
}
}

/// Returns size of buffer to allocate to hold columnType field of all records
uint calcBufSize(ColumnTypes columnType, RawReadBlob[] recordBuf)
{
//pragma(inline, true);
//writeln("RECORDBUF SIZE: ", recordBuf.length);
switch (columnType)
{
case ColumnTypes.read_name:
return reduce!((a, b) => a + b)(map!(a => a._l_read_name)(recordBuf));
case ColumnTypes.raw_cigar:
return cast(uint)(int.sizeof * reduce!((a, b) => a + b)(map!(a => a._n_cigar_op)(recordBuf)));
case ColumnTypes.raw_sequence:
return reduce!((a, b) => a + b)(map!(a => (a.sequence_length + 1) / 2)(recordBuf));
case ColumnTypes.raw_qual:
return reduce!((a, b) => a + b)(map!(a => a.sequence_length)(recordBuf));
case ColumnTypes.raw_tags:
return reduce!((a, b) => a + b)(map!(a => a._tags_length)(recordBuf));
default:
{
assert(false);
}
}
}

ulong writeColumn(ubyte[] column)
{
//pragma(inline, true);

byte[] buf = Snappy.compress(cast(byte[]) column);
file.rawWrite(buf);
return buf.length;
}

// ...|fileMeta|bamHeader| meta_offset |meta_size| MAGIC |
// ...| | | 8 bytes | 4 bytes |4 bytes|
// ...| | | | | |

/// Write meta to file
void writeMeta()
{
ulong meta_offset = file.tell();

ubyte[] buf;
ulong offset = 0;
uint rowGroupsAmount = cast(uint) fileMeta.rowGroups.length;
buf.length = int.sizeof + rowGroupsAmount * RowGroupMeta.sizeof;
write!(int, Endian.littleEndian, ubyte[])(buf, rowGroupsAmount, offset);
offset += int.sizeof;
foreach (rowGroup; fileMeta.rowGroups)
{
foreach (columnOffset; rowGroup.columnsOffsets)
{
std.bitmanip.write(buf, columnOffset, offset);
offset += ulong.sizeof;
}
foreach (columnSize; rowGroup.columnsSizes)
{
std.bitmanip.write(buf, columnSize, offset);
offset += ulong.sizeof;
}
foreach (uncompressedColSize; rowGroup.uncompressedColSizes){
std.bitmanip.write(buf, uncompressedColSize, offset);
offset += ulong.sizeof;
}
std.bitmanip.write(buf, rowGroup.total_byte_size, offset);
offset += ulong.sizeof;

std.bitmanip.write(buf, rowGroup.num_rows, offset);
offset += uint.sizeof;
}

file.rawWrite(buf);
uint meta_size = cast(uint) buf.length;

writeBamHeader(bamHeader, file);

buf.length = ulong.sizeof;
writeToFile(meta_offset, file, buf);
writeToFile(meta_size, file, buf);
file.rawWrite(CBAM_MAGIC);
}
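
/*
Reading the trailer back mirrors the diagram above writeMeta: the last 16
bytes of the file are meta_offset (8 bytes, LE), meta_size (4 bytes, LE) and
the CBAM magic (4 bytes). A hypothetical standalone probe of that fixed
footer (not part of this commit; FileReader.parse_meta does the same thing):
*/
static void probeFooter(File f, out ulong metaOffset, out uint metaSize)
{
    enum footerSize = ulong.sizeof + uint.sizeof + CBAM_MAGIC.sizeof;
    f.seek(-cast(long) footerSize, SEEK_END);
    ubyte[] buf = new ubyte[footerSize];
    f.rawRead(buf);
    enforce(buf[$ - CBAM_MAGIC.sizeof .. $] == CBAM_MAGIC, "not a CBAM file");
    metaOffset = read!(ulong, Endian.littleEndian, ubyte[])(buf); // consumes 8 bytes
    metaSize = read!(uint, Endian.littleEndian, ubyte[])(buf);
}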

unittest
{
auto fn = getcwd() ~ "/source/tests/test1.bam";

File testFile = File.tmpfile();
scope (exit)
testFile.close();

BamBlobReader reader = BamBlobReader(fn);

FileWriter fileWriter = new FileWriter(testFile, reader.header);
while (!reader.empty())
{
fileWriter.addRecord(reader.fetch());
}

fileWriter.writeMeta();
testFile.flush();

testFile.rewind();
FileReader fileReader = new FileReader(testFile);

assert(fileReader.fileMeta == fileWriter.fileMeta);
assert(equal!"a == b"(fileReader.bamHeader.refs, fileWriter.bamHeader.refs));
assert(equal(fileReader.bamHeader.text, fileWriter.bamHeader.text));
}

/// Writes BamHeader to the file
static void writeBamHeader(BamHeader bamHeader, File file)
{
//pragma(inline, true);
ubyte[] buf;
buf.length = ulong.sizeof;

auto text_size = cast(uint) bamHeader.text.length;

writeToFile(text_size, file, buf);
file.rawWrite(cast(ubyte[]) bamHeader.text);
writeToFile(cast(uint) bamHeader.refs.length, file, buf);

foreach (ref_seq; bamHeader.refs)
{
ubyte[] ref_name = cast(ubyte[]) ref_seq.name.dup;
auto ref_name_length = cast(uint) ref_name.length;

writeToFile(ref_name_length, file, buf);
file.rawWrite(ref_name);
writeToFile(ref_seq.length, file, buf);
}
}

unittest
{
File testFile = File.tmpfile();
scope (exit)
testFile.close();

auto fn = getcwd() ~ "/source/tests/test1.bam";
auto bamReader = BamBlobReader(fn);
BamHeader bamHeader = bamReader.header;

writeBamHeader(bamHeader, testFile);

ubyte[] buf;
buf.length = ulong.sizeof;
writeToFile(cast(ulong) 0, testFile, buf);
writeToFile(cast(uint) 0, testFile, buf);
testFile.rawWrite(CBAM_MAGIC);
testFile.flush();

testFile.rewind();
auto fileReader = new FileReader(testFile);

// writeln(bamHeader.refs.ptr);
// writeln(fileReader.bamHeader.refs.ptr);
assert(equal!"a == b"(fileReader.bamHeader.refs, bamHeader.refs), "Refs are corrupted");
assert(equal(fileReader.bamHeader.text, bamHeader.text), "Text is corrupted");
}

/// Write T obj to file. buf.length must be <= ulong.sizeof
static void writeToFile(T)(T obj, File file, ubyte[] buf)
{
enforce(T.sizeof <= ulong.sizeof);
enforce(buf.length <= ulong.sizeof);
enforce(T.sizeof <= buf.length, "No types longer than ulong");

write!(T, Endian.littleEndian, ubyte[])(buf, obj, 0);
file.rawWrite(buf[0 .. T.sizeof]);
}

unittest
{
File testFile = File.tmpfile();
scope (exit)
testFile.close();

ubyte[] buf;
buf.length = ulong.sizeof;

const ubyte ub = 10;
const ushort us = 12;
const uint ui = 14;
const ulong ul = 16;

writeToFile(ub, testFile, buf);
writeToFile(us, testFile, buf);
writeToFile(ui, testFile, buf);
writeToFile(ul, testFile, buf);
testFile.flush();

testFile.rewind();
buf.length = ubyte.sizeof + ushort.sizeof + uint.sizeof + ulong.sizeof;
testFile.rawRead(buf);

assert(read!(ubyte, Endian.littleEndian, ubyte[])(buf) == ub);
assert(read!(ushort, Endian.littleEndian, ubyte[])(buf) == us);
assert(read!(uint, Endian.littleEndian, ubyte[])(buf) == ui);
assert(read!(ulong, Endian.littleEndian, ubyte[])(buf) == ul);
}
}
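
A minimal conversion driver built on the bamToCbam entry point above (hypothetical; the paths are placeholders):

import std.stdio : writefln;
import source.writer : bamToCbam;

void main()
{
    immutable rows = bamToCbam("input.bam", "output.cbam");
    writefln("converted %s records to CBAM", rows);
}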

utils/bam/reader.d (+675 -0)

@@ -0,0 +1,675 @@
/*
New style BAM reader. This file is part of Sambamba.
Copyright (C) 2017 Pjotr Prins <pjotr.prins@thebird.nl>

Sambamba is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
by the Free Software Foundation; either version 2 of the License,
or (at your option) any later version.

Sambamba is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
02111-1307 USA

*/

// This is a complete rewrite of Artem Tarasov's original reader.

module utils.bam.reader;

import std.conv;
import core.stdc.stdio: fopen, fread, fclose;
import std.exception;
import std.file;
import std.format : format;
import std.stdio;
import std.string;
import std.typecons;
import std.bitmanip;

//TODO remove these dependencies
import bio.std.hts.bam.cigar;
import bio.std.hts.bam.constants;

import bio.core.utils.exception;

import bio.std.experimental.hts.bgzf;
import bio.std.experimental.hts.constants;

//import utils.bam.header;
import bio.std.experimental.hts.bam.header;


template ReadFlags(alias flag) {
// 0x01: template having multiple segments in sequencing
@property bool is_paired() nothrow { return cast(bool)(flag & 0x1); }
// 0x2: Each segment properly aligned according to the aligner
@property bool is_proper_pair() nothrow { return cast(bool)(flag & 0x2); }
// 0x4: Segment unmapped
@property bool is_unmapped_raw() nothrow { return cast(bool)(flag & 0x4); }
@property bool is_mapped_raw() nothrow { return cast(bool)(!(flag & 0x4)); }
// 0x8: Next segment in template unmapped
@property bool mate_is_unmapped() nothrow { return cast(bool)(flag & 0x8); }
// 0x10: SEQ being reverse complemented
@property bool is_reverse_strand() nothrow { return cast(bool)(flag & 0x10); }
// 0x20: SEQ of the next segment in the template being reverse complemented
@property bool mate_is_reverse_strand() nothrow { return cast(bool)(flag & 0x20); }
// 0x40: The first segment in the template
@property bool is_first_of_pair() nothrow { return cast(bool)(flag & 0x40); }
// 0x80: The last segment in the template
@property bool is_second_of_pair() nothrow { return cast(bool)(flag & 0x80); }
// 0x100: Secondary segment
@property bool is_secondary_alignment() nothrow { return cast(bool)(flag & 0x100); }
// 0x200: Not passing filters, such as platform/vendor quality controls
@property bool is_qc_fail() {
assert(is_mapped_raw,to!string(this));
return cast(bool)(flag & 0x200); }
alias is_qc_fail failed_quality_control;
/// 0x400: PCR or optical duplicate
@property bool is_duplicate() nothrow { return cast(bool)(flag & 0x400); }
/// 0x800: Supplementary alignment
@property bool is_supplementary() nothrow { return cast(bool)(flag & 0x800); }
@property string show_flags() {
string res = format("b%b-%d",flag,flag);
if (is_paired) res ~= " pair";
if (is_proper_pair) res ~= " proper";
if (is_mapped_raw) res ~= " mapped";
if (is_unmapped_raw) res ~= " unmapped";
if (mate_is_unmapped) res ~= " mate_unmapped";
if (is_reverse_strand) res ~= " rev_strand";
if (mate_is_reverse_strand) res ~= " mate_rev_strand";
if (is_first_of_pair) res ~= " first_of_pair";
if (is_second_of_pair) res ~= " second_of_pair";
if (is_secondary_alignment) res ~= " secondary_aln";
if (is_mapped_raw && is_qc_fail) res ~= " qc_fail";
if (is_duplicate) res ~= " duplicate";
if (is_supplementary) res ~= " suppl";
return res;
}
}
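
/*
ReadFlags only needs a symbol named `flag` in scope at the mixin site; a
hypothetical minimal host type (not part of this commit) showing the mixin
in isolation:
*/
struct FlagView
{
    ushort flag;
    mixin ReadFlags!(flag);
}

unittest
{
    auto v = FlagView(0x1 | 0x2 | 0x40);
    assert(v.is_paired && v.is_proper_pair && v.is_first_of_pair);
    assert(!v.is_duplicate);
}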

template CheckMapped(alias refid) {
@property nothrow bool is_unmapped() {
return is_unmapped_raw;
}
@property bool is_mapped() {
debug {
if (is_mapped_raw) {
assert(refid != -1, "ref_id can not be -1 for mapped read"); // BAM spec
}
}
return !is_unmapped_raw;
}
}

enum Offset {
bin_mq_nl=0, flag_nc=4, flag=6, l_seq=8, next_refID=12, next_pos=16, tlen=20, read_name=24
};
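// Byte offsets into the fixed-size prefix of a raw BAM alignment record
// as stored in _data (refID and pos are hoisted into the blob structs, so
// bin_mq_nl starts at 0); read_name marks where the variable-length tail
// begins.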

/**
Raw Read buffer containing unparsed data. It should be considered
read-only.

The current implementation is a cluct (class-struct hybrid). The _data
pointer is shared when ReadBlob is assigned to another variable
(i.e., there is a remote dependency). The advantage is that the
data for each Read gets allocated on the heap only once.

All offsets are indexed on init (except for tags). When using
fields beyond refid/pos, use ProcessReadBlob instead because it
caches values.
*/

struct ReadBlob {
RefId refid; // -1 is invalid (BAM Spec)
GenomePos pos; // 0 coordinate based (BAM spec)
private ubyte[] _data;
uint offset_cigar=int.max, offset_seq=int.max, offset_qual=int.max;

mixin ReadFlags!(_flag);
mixin CheckMapped!(refid);

/*
this(RefId ref_id, GenomePos read_pos, ubyte[] buf) {
refid = ref_id;
pos = read_pos;
_data = buf;
}
*/

@property void cleanup() {
destroy(_data);
_data = null;
}

// Turn ReadBlob into class-struct hybrid or a cluct ;)
// @disable this(this); // disable copy semantics;

@property @trusted nothrow private const T fetch(T)(uint raw_offset) {
ubyte[] buf = cast(ubyte[])_data[raw_offset..raw_offset+T.sizeof];
return cast(const(T))buf.read!(T,Endian.littleEndian)();
}

@property @trusted nothrow private const
uint _bin_mq_nl() { return fetch!uint(Offset.bin_mq_nl); }
@property @trusted nothrow private const
uint _flag_nc() { return fetch!uint(Offset.flag_nc); }
@property @trusted nothrow private const
int sequence_length() { return fetch!int(Offset.l_seq); }
@property @trusted nothrow private const
int _next_refID() { return fetch!int(Offset.next_refID); }
@property @trusted nothrow private const
int _next_pos() { return fetch!int(Offset.next_pos); }
@property @trusted nothrow private const
int _tlen() { return fetch!int(Offset.tlen); } // avoid using TLEN
@property @trusted nothrow private const
ushort _bin() { return _bin_mq_nl >> 16; }
@property @trusted nothrow private const
ubyte _mapq() { return (_bin_mq_nl >> 8) & 0xFF; }
@property @trusted nothrow private const
ubyte _l_read_name() { return _bin_mq_nl & 0xFF; }
@property @trusted nothrow private const
ushort _flag() { return fetch!ushort(Offset.flag); }
@property @trusted nothrow private const
ushort _n_cigar_op() { return _flag_nc & 0xFFFF; }
@property @trusted nothrow private const
uint _read_name_offset() { return Offset.read_name; }
@property @trusted nothrow private
uint _cigar_offset() {
if (offset_cigar == int.max)
offset_cigar = Offset.read_name + cast(uint)(_l_read_name * char.sizeof);
return offset_cigar;
}
@property @trusted nothrow private
uint _seq_offset() {
if (offset_seq == int.max)
offset_seq = _cigar_offset + cast(uint)(_n_cigar_op * uint.sizeof);
return offset_seq;
}
@property @trusted nothrow private
uint _qual_offset() {
if (offset_qual == int.max)
offset_qual = _seq_offset + (sequence_length + 1)/2;
return offset_qual;
}
@property @trusted nothrow private
uint _tags_offset() { return _qual_offset + sequence_length; }
@property @trusted nothrow private
ubyte[] read_name() { return _data[_read_name_offset.._cigar_offset]; }
@property @trusted nothrow private
ubyte[] raw_cigar() { return _data[_cigar_offset.._seq_offset]; }
@property @trusted nothrow private
ubyte[] raw_sequence() { return _data[_seq_offset.._qual_offset]; }

alias sequence_length _l_seq;
alias _mapq mapping_quality; // MAPQ

string toString() {
return "<** " ~ ReadBlob.stringof ~ " (data size " ~ to!string(_data.length) ~ ") " ~ to!string(refid) ~ ":" ~ to!string(pos) ~ " length " ~ to!string(sequence_length) ~ " flags " ~ show_flags() ~ ">";
}

}

struct RawReadBlob {
RefId refid; // -1 is invalid (BAM Spec)
GenomePos pos; // 0 coordinate based (BAM spec)
ubyte[] _data;
uint offset_cigar=int.max, offset_seq=int.max, offset_qual=int.max;

mixin ReadFlags!(_flag);
mixin CheckMapped!(refid);

this(RefId ref_id, GenomePos read_pos, ubyte[] buf) {
refid = ref_id;
pos = read_pos;
_data = buf;
}

@property void cleanup() {
destroy(_data);
_data = null;
}

// Turn ReadBlob into class-struct hybrid or a cluct ;)
// @disable this(this); // disable copy semantics;

@property @trusted nothrow private const ubyte[] raw_fetch(T)(uint raw_offset) {
ubyte[] buf = cast(ubyte[])_data[raw_offset..raw_offset+T.sizeof];
return buf;
}

@property @trusted nothrow public const
ubyte[] raw_bin_mq_nl() { return raw_fetch!uint(Offset.bin_mq_nl); }
@property @trusted nothrow public const
ubyte[] raw_flag_nc() { return raw_fetch!uint(Offset.flag_nc); }
@property @trusted nothrow public const
ubyte[] raw_sequence_length() { return raw_fetch!int(Offset.l_seq); }
@property @trusted nothrow public const
ubyte[] raw_next_refID() { return raw_fetch!int(Offset.next_refID); }
@property @trusted nothrow public const
ubyte[] raw_next_pos() { return raw_fetch!int(Offset.next_pos); }
@property @trusted nothrow public const
ubyte[] raw_tlen() { return raw_fetch!int(Offset.tlen); } // avoid using TLEN
@property @trusted nothrow public const
ubyte[] raw_flag() { return raw_fetch!ushort(Offset.flag); }



@property @trusted nothrow private const T fetch(T)(uint raw_offset) {
//debug printf(cast(char*)to!string(raw_offset));
ubyte[] buf = cast(ubyte[])_data[raw_offset..raw_offset+T.sizeof];
return cast(const(T))buf.read!(T,Endian.littleEndian)();
}

@property @trusted nothrow public const
uint _bin_mq_nl() { return fetch!uint(Offset.bin_mq_nl); }
@property @trusted nothrow public const
uint _flag_nc() { return fetch!uint(Offset.flag_nc); }
@property @trusted nothrow public const
int sequence_length() { return fetch!int(Offset.l_seq); }
@property @trusted nothrow public const
int _next_refID() { return fetch!int(Offset.next_refID); }
@property @trusted nothrow public const
int _next_pos() { return fetch!int(Offset.next_pos); }
@property @trusted nothrow public const
int _tlen() { return fetch!int(Offset.tlen); } // avoid using TLEN
@property @trusted nothrow public const
ushort _bin() { return _bin_mq_nl >> 16; }
@property @trusted nothrow public const
ubyte _mapq() { return (_bin_mq_nl >> 8) & 0xFF; }
@property @trusted nothrow public const
ubyte _l_read_name() { return _bin_mq_nl & 0xFF; }
@property @trusted nothrow public const
ushort _flag() { return fetch!ushort(Offset.flag); }
@property @trusted nothrow public const
ushort _n_cigar_op() { return _flag_nc & 0xFFFF; }
@property @trusted nothrow public const
uint _read_name_offset() { return Offset.read_name; }
@property @trusted nothrow public
uint _cigar_offset() {
if (offset_cigar == int.max)
offset_cigar = Offset.read_name + cast(uint)(_l_read_name * char.sizeof);
return offset_cigar;
}
@property @trusted nothrow public
uint _seq_offset() {
if (offset_seq == int.max)
offset_seq = _cigar_offset + cast(uint)(_n_cigar_op * uint.sizeof);
return offset_seq;
}
@property @trusted nothrow public
uint _qual_offset() {
if (offset_qual == int.max)
offset_qual = _seq_offset + (sequence_length + 1)/2;
return offset_qual;
}
@property @trusted nothrow public
uint _tags_offset() { return _qual_offset + sequence_length; }
@property @trusted nothrow public
int _tags_length() { return cast(int) (_data.length - _tags_offset); }
@property @trusted nothrow public
ubyte[] read_name() { return _data[_read_name_offset.._cigar_offset]; }
@property @trusted nothrow public
ubyte[] raw_cigar() { return _data[_cigar_offset.._seq_offset]; }
@property @trusted nothrow public
ubyte[] raw_sequence() { return _data[_seq_offset.._qual_offset]; }
@property @trusted nothrow public
ubyte[] raw_qual() { return _data[_qual_offset.._qual_offset + sequence_length];}
@property @trusted nothrow public
ubyte[] raw_tags() { return _data[_tags_offset..$];}

string toString() {
return "<** " ~ RawReadBlob.stringof ~ " (data size " ~ to!string(_data.length) ~ ") " ~ to!string(refid) ~ ":" ~ to!string(pos) ~ " length " ~ to!string(sequence_length) ~ " flags " ~ show_flags() ~ ">";
}
}
/**
ProcessReadBlob provides a caching mechanism for ReadBlob fields. Use
this when you need to access field/elements multiple times. Note
that ProcessReadBlob becomes invalid when ReadBlob goes out of scope.
*/
struct ProcessReadBlob {
private Nullable!ReadBlob _read2;
Nullable!int sequence_length2;
private Nullable!string sequence2, read_name2;
private Nullable!CigarOperations cigar2;
private Nullable!GenomePos consumed_reference_bases2;

mixin ReadFlags!(_flag);
mixin CheckMapped!(ref_id);

this(Nullable!ReadBlob _r) {
_read2 = _r;
}

@property void cleanup() {
_read2.cleanup;
}

@property nothrow bool isNull() {
return _read2.isNull;
}

@property RefId ref_id() {
enforce(_read2.is_mapped,"Trying to get ref_id on an unmapped read " ~ to!string(_read2));
return _read2.refid;
}

@property RefId raw_ref_id() {
return _read2.refid;
}

@property nothrow uint _flag_nc() {
return _read2._flag_nc;
}

@property nothrow ushort _flag() {
return _read2._flag;
}

alias ref_id refid;

/// Get the start position on the reference sequence (better use
/// start_loc), i.e., the first base that gets consumed in the
/// CIGAR.
@property GenomePos start_pos() {
assert(_read2.is_mapped,"Trying to get pos on an unmapped read"); // BAM spec
asserte(_read2.pos < GenomePos.max);
return cast(GenomePos)_read2.pos;
}

@property GenomePos raw_start_pos() {
return cast(GenomePos)_read2.pos;
}

/// Get the end position on the reference sequence (better use end_loc)
@property GenomePos end_pos() {
assert(sequence_length > 0, "Trying to get end_pos on an empty read sequence");
assert(!consumed_reference_bases.isNull);
return start_pos + consumed_reference_bases;
}

@property GenomePos raw_end_pos() {
return raw_start_pos + consumed_reference_bases;
}

@property GenomeLocation start_loc() {
return GenomeLocation(ref_id,start_pos);
}

@property GenomeLocation end_loc() {
return GenomeLocation(ref_id,end_pos);
}

@property @trusted MappingQuality mapping_quality() { // MAPQ
assert(_read2.is_mapped,"Trying to get MAPQ on an unmapped read"); // BAM spec
return MappingQuality(_read2.mapping_quality);
}

@property @trusted int tlen() { // do not use
return _read2._tlen;
}

@property @trusted GenomePos sequence_length() {
if (sequence_length2.isNull)
sequence_length2 = _read2.sequence_length;
return sequence_length2;
}

/// Counts and caches consumed reference bases. Uses raw_cigar to
/// avoid a heap allocation.
@property @trusted Nullable!GenomePos consumed_reference_bases() {
if (consumed_reference_bases2.isNull) {
assert(_read2.is_mapped,"Trying to get consumed bases on an unmapped read"); // BAM spec
assert(!read_name.isNull,"Trying to get CIGAR when RNAME is '*'"); // BAM spec
auto raw = cast(uint[]) _read2.raw_cigar();
if (raw.length==1 && raw[0] == '*')
return consumed_reference_bases2; // null
else {
GenomePos bases = 0;
for (size_t i = 0; i < raw.length; i++) {
auto cigarop = CigarOperation(raw[i]);
if (cigarop.is_reference_consuming)
bases += cigarop.length;
}
consumed_reference_bases2 = bases;
}
}
return consumed_reference_bases2;
}

/// Counts consumed query bases. Uses raw_cigar to avoid a heap
/// allocation.
@property @trusted GenomePos consumed_query_bases() {
assert(_read2.is_mapped,"Trying to get consumed bases on an unmapped read"); // BAM spec
assert(!read_name.isNull,"Trying to get CIGAR when RNAME is '*'"); // BAM spec
auto raw = cast(uint[]) _read2.raw_cigar();
if (raw.length==1 && raw[0] == '*')
return consumed_reference_bases2; // null
else {
GenomePos bases = 0;