1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
|
"""The R/qtl2 parsing and processing code."""
import io
import csv
import json
import yaml
from pathlib import Path
from functools import reduce
from zipfile import ZipFile, ZipInfo, is_zipfile
from typing import Any, List, Union, Iterator, Iterable
from r_qtl.errors import InvalidFormat
from quality_control.parsing import take
def thread_op(value, *functions):
"""Thread the `value` through the sequence of `functions`."""
return reduce(lambda result, func: func(result), functions, value)
def control_data(zfile: ZipFile) -> dict:
"""Retrieve the control file from the zip file info."""
files = tuple(filename
for filename in zfile.namelist()
if (filename.endswith(".yaml") or filename.endswith(".json")))
num_files = len(files)
if num_files == 0:
raise InvalidFormat("Expected a json or yaml control file.")
if num_files > 1:
raise InvalidFormat("Found more than one possible control file.")
return (json.loads(zfile.read(files[0]))
if files[0].endswith(".json")
else yaml.safe_load(zfile.read(files[0])))
def genotype_data(zfile: ZipFile, cdata: dict) -> Iterator[dict]:
"""Load the genotype file, making use of the control data."""
def replace_genotype_codes(val):
return cdata["genotypes"].get(val, val)
def replace_na_strings(val):
nastrings = cdata.get("na.strings")
if bool(nastrings):
return (None if val in nastrings else val)
return val
if not cdata.get("geno_transposed", False):
with zfile.open(cdata["geno"]) as genofile:
reader = csv.DictReader(
filter(lambda line: not line.startswith("#"),
io.TextIOWrapper(genofile)),
delimiter=cdata.get("sep", ","))
for row in reader:
yield {
key: thread_op(
value,
replace_genotype_codes,
replace_na_strings)
for key,value
in row.items()
}
def __merge__(key, samples, line):
marker = line[0]
return tuple(
dict(zip(
[key, marker],
(thread_op(item, replace_genotype_codes, replace_na_strings)
for item in items)))
for items in zip(samples, line[1:]))
def __n_batch__(iterable: Iterable, num):
while True:
items = take(iterable, num)
if len(items) <= 0:
break
yield items
if cdata.get("geno_transposed", False):
with zfile.open(cdata["geno"]) as genofile:
lines = (line.strip().split(cdata.get("sep", ","))
for line in filter(lambda line: not line.startswith("#"),
io.TextIOWrapper(genofile)))
id_line = next(lines)
id_key, samples = id_line[0], id_line[1:]
def __organise_by_id__(acc, item):
row = acc.get(item[id_key], {})
return {**acc, item[id_key]: {**row, **item}}
for _key, row in reduce(# type: ignore[var-annotated]
__organise_by_id__,
(row
for batch in __n_batch__(lines, 300)
for line in batch
for row in __merge__(id_key, samples, line)),
{}).items():
yield row
def map_data(zfile: ZipFile, map_type: str, cdata: dict) -> tuple[dict, ...]:
"""Read gmap files to get the genome mapping data"""
assert map_type in ("genetic-map", "physical-map"), "Invalid map type"
map_file = cdata[{
"genetic-map": "gmap",
"physical-map": "pmap"
}[map_type]]
transposed_dict = {
"genetic-map": "gmap_transposed",
"physical-map": "pmap_transposed"
}
if not cdata.get(transposed_dict[map_type], False):
with zfile.open(map_file) as gmapfile:
reader = csv.DictReader(
filter(lambda line: not line.startswith("#"),
io.TextIOWrapper(gmapfile)),
delimiter=cdata.get("sep", ","))
return tuple(row for row in reader)
with zfile.open(map_file) as gmapfile:
lines = [[field.strip() for field in
line.strip().split(cdata.get("sep", ","))]
for line in
filter(lambda line: not line.startswith("#"),
io.TextIOWrapper(gmapfile))]
headers = tuple(line[0] for line in lines)
return reduce(
lambda gmap, row: gmap + (dict(zip(headers, row)),),
zip(*(line[1:] for line in lines)),
tuple())
def read_r_qtl2_files(filepath: Path):
"""Read R/qtl2 format zip files."""
with ZipFile(filepath, "r") as zfile:
cf = control_data(zfile)
raise NotImplementedError("Implementation is incomplete.")
|