Browse Source

improved sort performance (needs testing)

-O3 causes miscompilation, use -O2
master
Artem Tarasov 8 years ago
parent
commit
37b3c9798d
  1. 2
      BioD
  2. 8
      Makefile
  3. 143
      sambamba/sort.d
  4. 2
      thirdparty/mergesort.d

2
BioD

@ -1 +1 @@
Subproject commit ec30412d022db12068f2969908935b8eb71ab308
Subproject commit c664bfa7c3956e64df7d096514358efe93cfd12b

8
Makefile

@ -1,7 +1,7 @@
D_COMPILER=ldmd2
D_FLAGS=-IBioD #-O -release -inline -g# -version=serial
D_FLAGS=-IBioD -O2 #-release -inline -g# -version=serial
RDMD_FLAGS=--compiler=$(D_COMPILER) --build-only $(D_FLAGS)
RDMD_FLAGS=--force --compiler=$(D_COMPILER) --build-only $(D_FLAGS)
all:
mkdir -p build/
@ -13,11 +13,11 @@ release:
sambamba-ldmd2-64:
mkdir -p build/
rdmd --build-only --force --compiler=ldmd2 -O3 -m64 -noboundscheck -release -inline -IBioD/ -ofbuild/sambamba main.d
rdmd --build-only --force --compiler=ldmd2 -O2 -m64 -noboundscheck -release -inline -IBioD/ -ofbuild/sambamba main.d
sambamba-ldmd2-32:
mkdir -p build/
rdmd --build-only --force --compiler=ldmd2 -O3 -m32 -release -inline -IBioD/ -ofbuild/sambamba main.d
rdmd --build-only --force --compiler=ldmd2 -O2 -m32 -release -inline -IBioD/ -ofbuild/sambamba main.d
sambamba-flagstat:
mkdir -p build/

143
sambamba/sort.d

@ -40,6 +40,9 @@ import std.stream;
import std.stdio;
import core.atomic;
import std.c.stdlib;
import std.c.string;
import sambamba.utils.common.progressbar;
import thirdparty.mergesort;
@ -89,21 +92,79 @@ class Sorter {
string output_filename = null;
string filename = null;
private static auto chunks(R)(R reads, size_t size_in_bytes) {
struct UnsortedChunk {
size_t max_sz;
this(size_t max_total_size) {
max_sz = max_total_size;
read_storage = cast(ubyte*)std.c.stdlib.malloc(max_sz);
_reads_capa = 1024;
auto sz = BamRead.sizeof * _reads_capa;
_reads = cast(BamRead*)std.c.stdlib.malloc(sz);
}
void clear() {
_used = 0;
_n_reads = 0;
}
void fill(R)(R* reads) {
while (!reads.empty) {
auto read = reads.front;
auto len = read.raw_data.length;
if (len + _used > max_sz)
break;
std.c.string.memcpy(read_storage + _used, read.raw_data.ptr, len);
if (_n_reads == _reads_capa) {
_reads_capa *= 2;
_reads = cast(BamRead*)std.c.stdlib.realloc(_reads, _reads_capa * BamRead.sizeof);
}
_reads[_n_reads].raw_data = read_storage[_used .. _used + len];
_reads[_n_reads].associateWithReader(read.reader);
_n_reads += 1;
_used += len;
reads.popFront();
}
static size_t approxSize(BamRead read) {
return BamRead.sizeof * 3 / 2 + read.size_in_bytes;
if (_n_reads == 0) {
auto read = reads.front;
auto len = read.raw_data.length;
assert(len > max_sz);
_n_reads = 1;
read_storage = cast(ubyte*)std.c.stdlib.realloc(read_storage, len);
_used = len;
read_storage[0 .. len] = read.raw_data[];
_reads[0].raw_data = read_storage[0 .. _used];
_reads[0].associateWithReader(read.reader);
reads.popFront();
}
}
// false means that each chunk may contain reads with different ref. ID
return ReadRangeSplitter!(R, approxSize)(reads, size_in_bytes, false);
void free() {
std.c.stdlib.free(read_storage);
std.c.stdlib.free(_reads);
}
BamRead[] reads() @property { return _reads[0 .. _n_reads]; }
BamRead* _reads;
size_t _reads_capa;
size_t _n_reads;
ubyte* read_storage;
size_t _used;
}
static BamRead[] sortChunk(BamRead[] chunk, TaskPool task_pool) {
auto buf = cast(BamRead*)std.c.stdlib.malloc(chunk.length * BamRead.sizeof);
BamRead[] tmp = buf[0 .. chunk.length];
scope (exit) std.c.stdlib.free(buf);
if (!sort_by_name) {
mergeSort!compareCoordinates(chunk, task_pool);
mergeSort!(compareCoordinates, false)(chunk, task_pool, tmp);
} else {
mergeSort!compareReadNames(chunk, task_pool);
mergeSort!(compareReadNames, false)(chunk, task_pool);
}
return chunk;
}
@ -111,6 +172,7 @@ class Sorter {
void sort() {
createHeader();
bam.assumeSequentialProcessing();
if (show_progress) {
stderr.writeln("Writing sorted chunks to temporary directory...");
bar = new shared(ProgressBar)();
@ -144,31 +206,55 @@ class Sorter {
SortingOrder.coordinate;
}
private void writeSortedChunks(R)(R reads) {
auto unsorted_chunks = chunks(reads, memory_limit);
foreach (unsorted_chunk; unsorted_chunks)
{
auto chunk = sortChunk(unsorted_chunk, task_pool);
private void writeSortedChunks(R)(R reads_) {
auto buf1 = UnsortedChunk(memory_limit / 2);
auto buf2 = UnsortedChunk(memory_limit / 2);
scope(exit) { buf1.free(); buf2.free(); }
auto fn = tmpFile(chunkBaseName(filename, num_of_chunks), tmpdir);
tmpfiles ~= fn;
Task!(sortChunk, BamRead[], TaskPool)* sorting_task;
Stream stream = new BufferedFile(fn, FileMode.Out);
scope(failure) stream.close();
auto reads = reads_;
auto writer = new BamWriter(stream,
uncompressed_chunks ? 0 : 1,
task_pool);
scope(exit) writer.finish();
// 1) fill buf1
// 2) sort buf1 in parallel with filling buf2
// 3) dump buf1
// 4) sort buf2 in parallel with filling buf1
// ...
while (!reads.empty) {
buf1.clear();
buf1.fill(&reads);
writer.writeSamHeader(header);
writer.writeReferenceSequenceInfo(bam.reference_sequences);
if (sorting_task !is null)
dump(sorting_task.yieldForce());
foreach (read; chunk)
writer.writeRecord(read);
num_of_chunks += 1;
sorting_task = task!sortChunk(buf1.reads, task_pool);
task_pool.put(sorting_task);
swap(buf1, buf2);
}
if (sorting_task !is null)
dump(sorting_task.yieldForce());
}
private void dump(BamRead[] sorted_reads) {
auto fn = tmpFile(chunkBaseName(filename, num_of_chunks), tmpdir);
tmpfiles ~= fn;
Stream stream = new BufferedFile(fn, FileMode.Out);
scope(failure) stream.close();
auto writer = new BamWriter(stream,
uncompressed_chunks ? 0 : 1,
task_pool);
scope(exit) writer.finish();
writer.writeSamHeader(header);
writer.writeReferenceSequenceInfo(bam.reference_sequences);
foreach (read; sorted_reads)
writer.writeRecord(read);
num_of_chunks += 1;
}
private void mergeSortedChunks(alias comparator)() {
@ -206,6 +292,7 @@ class Sorter {
foreach (i; 0 .. num_of_chunks) {
auto bamfile = new BamReader(tmpfiles[i], task_pool);
bamfile.setBufferSize(memory_limit / 2 / num_of_chunks);
bamfile.assumeSequentialProcessing();
alignmentranges[i] = bamfile.readsWithProgress(
// WTF is going on here? See this thread:
// http://forum.dlang.org/thread/mailman.112.1341467786.31962.digitalmars-d@puremagic.com
@ -234,6 +321,7 @@ class Sorter {
foreach (i; 0 .. num_of_chunks) {
auto bamfile = new BamReader(tmpfiles[i]);
bamfile.setBufferSize(memory_limit / 2 / num_of_chunks);
bamfile.assumeSequentialProcessing();
alignmentranges[i] = bamfile.reads!withoutOffsets;
}
@ -287,8 +375,7 @@ int sort_main(string[] args) {
sorter.memory_limit = parseMemory(memory_limit_str);
}
// TODO: find a better formula
sorter.memory_limit /= 4;
sorter.memory_limit = (sorter.memory_limit * 2) / 3;
sorter.task_pool = new TaskPool(n_threads);
scope(exit) sorter.task_pool.finish();

2
thirdparty/mergesort.d

@ -41,7 +41,7 @@ import std.range, std.algorithm, std.functional, std.array, std.parallelism;
-----------------
++/
@trusted SortedRange!(R, less) mergeSort(alias less = "a < b", bool half = true, R)(R range, TaskPool task_pool=taskPool, ElementType!(R)[] temp = null)
@trusted SortedRange!(R, less) mergeSort(alias less = "a < b", bool half = true, R)(R range, TaskPool task_pool=taskPool, R temp = null)
{
static assert(isRandomAccessRange!R);
static assert(hasLength!R);

Loading…
Cancel
Save