1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
|
'''Genotype database reader
This module is a tiny Python library to read a GeneNetwork genotype
database. It exports the following functions.
* open - Open a genotype database
* matrix - Get current matrix
* nparray - Get matrix as a 2D numpy array
* row - Get row of matrix
* column - Get column of matrix
Here is a typical invocation to read row 17 and column 13 from a genotype
database at `/tmp/bxd`.
from gn3 import genodb
with genodb.open('/tmp/bxd') as db:
matrix = genodb.matrix(db)
print(genodb.nparray(matrix))
print(genodb.row(matrix, 17))
print(genodb.column(matrix, 13))
'''
from collections import namedtuple
from contextlib import contextmanager
import lmdb
import numpy as np
# pylint: disable=invalid-name,redefined-builtin
GenotypeDatabase = namedtuple('GenotypeDatabase', 'txn hash_length')
Matrix = namedtuple('Matrix', 'db nrows ncols array transpose')
@contextmanager
def open(path):
'''Open genotype database.'''
env = lmdb.open(path, readonly=True, create=False)
txn = env.begin()
yield GenotypeDatabase(txn, 32) # 32 bytes in a SHA256 hash
txn.abort()
env.close()
def get(db, key):
'''Get value associated with key in genotype database.'''
return db.txn.get(key)
def get_metadata(db, hash, metadata):
'''Get metadata associated with hash in genotype database.'''
return db.txn.get(hash + b':' + metadata.encode())
def matrix(db):
'''Get current matrix from genotype database.'''
hash = get(db, b'versions')[0:db.hash_length]
read_optimized_blob = get(db, get(db, b'current'))
nrows = int.from_bytes(get_metadata(db, hash, 'nrows'), byteorder='little')
ncols = int.from_bytes(get_metadata(db, hash, 'ncols'), byteorder='little')
return Matrix(db, nrows, ncols,
np.reshape(np.frombuffer(read_optimized_blob[0 : nrows*ncols], dtype=np.uint8),
(nrows, ncols)),
np.reshape(np.frombuffer(read_optimized_blob[nrows*ncols :], dtype=np.uint8),
(nrows, ncols)))
def nparray(matrix):
'''Get matrix as a 2D numpy array.'''
# pylint: disable=redefined-outer-name
return matrix.array
def row(matrix, index):
'''Get row of matrix.'''
# pylint: disable=redefined-outer-name
return matrix.array[index,:]
def column(matrix, index):
'''Get column of matrix.'''
# pylint: disable=redefined-outer-name
return matrix.transpose[index,:]
|