1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
|
"""The R/qtl2 parsing and processing code."""
import io
import csv
import json
import yaml
from pathlib import Path
from functools import reduce
from typing import Any, List, Union, Iterator
from zipfile import ZipFile, ZipInfo, is_zipfile
from quality_control.debug import __pk__
from r_qtl.errors import InvalidFormat
def thread_op(value, *functions):
"""Thread the `value` through the sequence of `functions`."""
return reduce(lambda result, func: func(result), functions, value)
def control_data(zfile: ZipFile) -> dict:
"""Retrieve the control file from the zip file info."""
files = tuple(filename
for filename in zfile.namelist()
if (filename.endswith(".yaml") or filename.endswith(".json")))
num_files = len(files)
if num_files == 0:
raise InvalidFormat("Expected a json or yaml control file.")
if num_files > 1:
raise InvalidFormat("Found more than one possible control file.")
return (json.loads(zfile.read(files[0]))
if files[0].endswith(".json")
else yaml.safe_load(zfile.read(files[0])))
def genotype_metadata(zfile: ZipFile, cdata: dict) -> dict:
"""Read Individual ID key and the marker names."""
# TODO: Handle transposed files
line_num = 0
with zfile.open(cdata["geno"]) as genofile:
for line in filter(lambda line: not line.startswith("#"),
io.TextIOWrapper(genofile)):
line_parts = line.strip().split(cdata.get("sep", ","))
return {
"individual_id_key": line_parts[0].strip(),
"markers": tuple(marker.strip() for marker in line_parts[1:])
}
def genotype_data(zfile: ZipFile, cdata: dict) -> Iterator[dict]:
"""Load the genotype file, making use of the control data."""
# TODO: Handle transposed files
with zfile.open(cdata["geno"]) as genofile:
reader = csv.DictReader(filter(lambda line: not line.startswith("#"),
io.TextIOWrapper(genofile)),
delimiter=cdata.get("sep", ","))
for row in reader:
yield {
key: thread_op(
value,
# replace genotype codes
lambda val: cdata["genotypes"].get(val, val),
# replace N/A strings
lambda val: (None if val in cdata["na.strings"] else val))
for key,value
in row.items()
}
def read_r_qtl2_files(filepath: Path):
"""Read R/qtl2 format zip files."""
with ZipFile(filepath, "r") as zfile:
cf = control_data(zfile)
raise NotImplementedError("Implementation is incomplete.")
|