1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
|
"""This scripts reads and store genotype files to an LMDB store.
Similarly, it can be use to read this data.
Example:
guix shell python-click python-lmdb python-wrapper python-numpy -- \
python lmdb_matrix.py import-genotype \
<path-to-genotype-file> <path-to-lmdb-store>
guix shell python-click python-lmdb python-wrapper python-numpy -- \
python lmdb_matrix.py print-current-matrix \
<path-to-lmdb-store>
"""
from pathlib import Path
from subprocess import check_output
import lmdb
import json
import click
import numpy as np
from dataclasses import dataclass
@dataclass
class GenotypeMatrix:
"""Store the actual Genotype Matrix"""
matrix: np.ndarray
metadata: dict
file_info: dict
def count_trailing_newlines(file_path):
with open(file_path, 'rb') as stream:
stream.seek(0, 2) # Move to the end of the file
file_size = stream.tell()
if file_size == 0:
return 0
chunk_size = 1024 # Read in small chunks
empty_lines = 0
_buffer = b""
# Read chunks from the end backward
while stream.tell() > 0:
# Don't read beyond start
chunk_size = min(chunk_size, stream.tell())
stream.seek(-chunk_size, 1) # Move backward
chunk = stream.read(chunk_size) + _buffer
stream.seek(-chunk_size, 1) # Move back to start of chunk
# Decode chunk to text
try:
chunk_text = chunk.decode('utf-8', errors='ignore')
except UnicodeDecodeError:
# If decoding fails, keep some bytes for the next
# chunk Keep last 16 bytes to avoid splitting
# characters
_buffer = chunk[-16:]
continue
# Split into lines from the end
lines = chunk_text.splitlines()
# Count empty lines from the end
for line in reversed(lines):
if line.strip() != "":
return empty_lines
empty_lines += 1
if stream.tell() == 0:
return empty_lines
return empty_lines
def wc(filename):
"""Get total file count of a file"""
return int(check_output(["wc", "-l", filename]).split()[0]) - \
count_trailing_newlines(filename)
def get_genotype_metadata(genotype_file: str) -> tuple[dict, dict]:
"""Parse metadata from a genotype file, separating '@'-prefixed
and '#'-prefixed entries.
This function reads a genotype file and extracts two types of metadata:
- '@'-prefixed metadata (e.g., '@name:BXD'), stored as key-value
pairs for dataset attributes.
- '#'-prefixed metadata (e.g., '# File name: BXD_Geno...'), stored
as key-value pairs for file information. Lines starting with
'#' without a colon are skipped as comments. Parsing stops at
the first non-metadata line.
Args:
genotype_file (str): Path to the genotype file to be parsed.
Returns:
tuple[dict, dict]: A tuple containing two dictionaries:
- First dict: '@'-prefixed metadata (e.g., {'name': 'BXD',
'type': 'riset'}).
- Second dict: '#'-prefixed metadata with colons (e.g.,
{'File name': 'BXD_Geno...', 'Citation': '...'}).
Example:
>>> meta, file_info = get_genotype_metadata("BXD.small.geno")
>>> print(meta)
{'name': 'BXD', 'type': 'riset', 'mat': 'B', 'pat': 'D',
'het': 'H', 'unk': 'U'}
>>> print(file_info)
{'File name': 'BXD_Geno-19Jan2017b_forGN.xls', 'Metadata':
'Please retain...'}
"""
metadata = {}
file_metadata = {}
with open(genotype_file, "r") as stream:
while True:
line = stream.readline().strip()
match line:
case meta if line.startswith("#"):
if ":" in meta:
key, value = meta[2:].split(":", 1)
file_metadata[key] = value
case meta if line.startswith("#"):
continue
case meta if line.startswith("@") and ":" in line:
key, value = meta[1:].split(":", 1)
if value:
metadata[key] = value.strip()
case _:
break
return metadata, file_metadata
def get_genotype_dimensions(genotype_file: str) -> tuple[int, int]:
"""Calculate the dimensions of the data section in a genotype
file.
This function determines the number of data rows and columns in a
genotype file. It skips metadata lines (starting with '#' or '@')
and uses the first non-metadata line as the header to count
columns. The total number of lines is counted in binary mode to
efficiently handle large files, and the number of data rows is
calculated by subtracting metadata and header lines. Accounts for
a potential trailing newline.
Args:
genotype_file (str): Path to the genotype file to be analyzed.
Returns:
tuple[int, int]: A tuple containing:
- First int: Number of data rows (excluding metadata and
header).
- Second int: Number of columns (based on the header row).
Example:
>>> rows, cols = get_genotype_dimensions("BXD.small.geno")
>>> print(rows, cols)
2, 202 # Example: 2 data rows, 202 columns (from header)
Note:
Assumes the first non-metadata line is the header row, split
by whitespace. A trailing newline may be included in the line
count but is accounted for in the returned row count.
"""
counter = 0
rows = []
with open(genotype_file, "r") as stream:
while True:
line = stream.readline()
counter += 1
match line:
case "" | _ if line.startswith(("#", "@")):
continue
case _:
rows = line.split()
break
return wc(genotype_file) - counter, len(rows)
def read_genotype_headers(genotype_file: str) -> list[str]:
"""Extract the header row from a genotype file.
This function reads a genotype file and returns the first
non-metadata line as a list of column headers. It skips lines
starting with '#' (comments), '@' (metadata), or empty lines,
assuming the first non-skipped line contains the header (e.g.,
'Chr', 'Locus', 'cM', 'Mb', followed by strain names like 'BXD1',
'BXD2', etc.). The header is split by whitespace to create the
list of column names.
Args:
genotype_file (str): Path to the genotype file to be parsed.
Returns:
list[str]: A list of column headers from the first non-metadata line.
Example:
>>> headers = read_genotype_headers("BXD.small.geno")
>>> print(headers)
['Chr', 'Locus', 'cM', 'Mb', 'BXD1', 'BXD2', ..., 'BXD220']
"""
rows = []
with open(genotype_file, "r") as stream:
while True:
line = stream.readline()
match line:
case _ if line.startswith("#") or line.startswith("@") or line == "":
continue
case _:
rows = line.split()
break
return rows
def read_genotype_file(genotype_file: str) -> GenotypeMatrix:
"""Read a genotype file and construct a GenotypeMatrix object.
This function parses a genotype file to extract metadata and
genotype data, creating a numerical matrix of genotype values and
associated metadata. It processes:
- '@'-prefixed metadata (e.g., '@mat:B') for dataset attributes
like maternal/paternal alleles.
- '#'-prefixed metadata (e.g., '# File name:...') for file
information.
- Header row for column names (e.g., 'Chr', 'Locus', BXD strains).
- Data rows, converting genotype symbols (e.g., 'B', 'D', 'H',
'U') to numeric values (0, 1, 2, 3) based on metadata mappings.
The function skips comment lines ('#'), metadata lines ('@'), and
empty lines, and organizes the data into a GenotypeMatrix with a
numpy array and metadata dictionaries.
Args:
genotype_file (str): Path to the genotype file to be parsed.
Returns:
GenotypeMatrix: An object containing:
- matrix: A numpy array (nrows x ncols) with genotype values (0:
maternal, 1: paternal, 2: heterozygous, 3: unknown).
- metadata: A dictionary with '@'-prefixed metadata, row/column
counts, individuals (BXD strains), metadata columns (e.g., 'Chr',
'Locus'), and lists of metadata values per row.
- file_info: A dictionary with '#'-prefixed metadata (e.g., 'File
name', 'Citation').
Raises:
ValueError: If an unrecognized genotype symbol is encountered in
the data.
Example:
>>> geno_matrix = read_genotype_file("BXD.small.geno")
>>> print(geno_matrix.matrix.shape)
(2, 198) # Example: 2 rows, 198 BXD strains
>>> print(geno_matrix.metadata["name"])
'BXD'
>>> print(geno_matrix.file_info["File name"])
'BXD_Geno-19Jan2017b_forGN.xls'
"""
header = read_genotype_headers(genotype_file)
counter = 0
for i, el in enumerate(header):
if el in ["Chr", "Locus", "cM", "Mb"]:
continue
else:
counter = i
break
metadata_columns, individuals = header[:counter], header[counter:]
nrows, ncols = get_genotype_dimensions(genotype_file)
ncols -= len(metadata_columns)
matrix = np.zeros((nrows, ncols), dtype=np.uint8)
metadata, file_metadata = get_genotype_metadata(genotype_file)
metadata = metadata | {
"nrows": nrows,
"ncols": ncols,
"individuals": individuals,
"metadata_columns": metadata_columns
}
for key in metadata_columns:
metadata[key] = []
maternal = metadata.get("mat")
paternal = metadata.get("pat")
heterozygous = metadata.get("het")
unknown = metadata.get("unk")
locus, chromosomes = [], []
i = 0
sentinel = True
with open(genotype_file, "r") as stream:
while True:
if i == nrows:
break
line = stream.readline().split()
meta, data = [], []
if line and line[0] in metadata_columns:
# Skip the metadata column
line = stream.readline().split()
sentinel = False
if len(line) == 0 or (line[0].startswith("#") and sentinel) \
or line[0].startswith("@"):
continue
meta, data = line[:len(metadata_columns)
], line[len(metadata_columns):]
for j, el in enumerate(data):
match el:
case _ if el.isdigit():
matrix[i, j] = int(el)
case _ if maternal == el:
matrix[i, j] = 0
case _ if paternal == el:
matrix[i, j] = 1
case _ if heterozygous == el:
matrix[i, j] = 2
case _ if unknown == el:
matrix[i, j] = 3
case _:
raise ValueError
i += 1
__map = dict(zip(metadata_columns, meta))
for key in metadata_columns:
metadata[key].append(__map.get(key))
genotype_matrix = GenotypeMatrix(
matrix=matrix,
metadata=metadata,
file_info=file_metadata
)
return genotype_matrix
def create_database(db_path: str) -> lmdb.Environment:
"""Create or open an LMDB environment."""
return lmdb.open(db_path, map_size=100 * 1024 * 1024, create=True)
def genotype_db_put(db: lmdb.Environment, genotype: GenotypeMatrix) -> bool:
"""Put genotype GENOTYPEMATRIX from DB environment"""
metadata = json.dumps(genotype.metadata).encode("utf-8")
file_info = json.dumps(genotype.file_info).encode("utf-8")
with db.begin(write=True) as txn:
txn.put(b"matrix", genotype.matrix.tobytes())
txn.put(b"metadata", metadata)
# XXXX: KLUDGE: Put this in RDF instead
txn.put(b"file_info", file_info)
return True
def genotype_db_get(db: lmdb.Environment) -> GenotypeMatrix:
"""Get genotype GENOTYPEMATRIX from DB environment"""
with db.begin() as txn:
metadata = json.loads(txn.get(b"metadata").decode("utf-8"))
nrows, ncols = metadata.get("nrows"), metadata.get("ncols")
matrix = np.frombuffer(
txn.get(b"matrix"), dtype=np.uint8).reshape(nrows, ncols)
return GenotypeMatrix(
matrix=matrix,
metadata=metadata,
file_info=json.loads(txn.get(b"file_info"))
)
def get_genotype_files(directory: str) -> list[tuple[str, int]]:
geno_files = [
(_file.as_posix(), _file.stat().st_size)
for _file in Path(directory).glob("*.geno") if _file.is_file()]
return sorted(geno_files, key=lambda x: x[1])
def __import_directory(directory: str, lmdb_path: str):
for genofile, file_size in get_genotype_files(directory):
genofile = Path(genofile)
size_mb = file_size / (1024 ** 2)
lmdb_store = (Path(lmdb_path) / genofile.stem).as_posix()
print(f"Processing file: {genofile.name}")
with create_database(lmdb_store) as db:
genotype_db_put(
db=db, genotype=read_genotype_file(genofile.as_posix()))
print(f"\nSuccessfuly created: [{size_mb:.2f} MB] {genofile.stem}")
@click.command(help="Import the genotype directory")
@click.argument("genotype_directory")
@click.argument("lmdb_path")
def import_directory(genotype_directory: str, lmdb_path: str):
"Import a genotype directory into genotype_database path"
__import_directory(directory=genotype_directory, lmdb_path=lmdb_path)
@click.command(help="Import the genotype file")
@click.argument("geno_file")
@click.argument("genotype_database")
def import_genotype(geno_file: str, genotype_database: str):
"Import a genotype file into genotype_database path"
with create_database(genotype_database) as db:
genotype_db_put(db=db, genotype=read_genotype_file(geno_file))
@click.command(help="Print the current matrix")
@click.argument("database_directory")
def print_current_matrix(database_directory: str):
"""Print the current matrix in the database."""
with create_database(database_directory) as db:
current = genotype_db_get(db)
print(f"Matrix: {current.matrix}")
print(f"Metadata: {current.metadata}")
print(f"File Info: {current.file_info}")
@click.group()
def cli():
pass
cli.add_command(print_current_matrix)
cli.add_command(import_genotype)
cli.add_command(import_directory)
if __name__ == "__main__":
cli()
|