1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
|
"""The R/qtl2 parsing and processing code."""
import io
import csv
import json
from pathlib import Path
from zipfile import ZipFile
from functools import reduce, partial
from typing import Union, Iterator, Iterable, Callable, Optional
import yaml
from functional_tools import take, chain
from r_qtl.errors import InvalidFormat, MissingFileError
FILE_TYPES = (
"geno", "founder_geno", "pheno", "covar", "phenocovar", "gmap", "pmap",
"phenose")
def control_data(zfile: ZipFile) -> dict:
"""Retrieve the control file from the zip file info."""
files = tuple(filename
for filename in zfile.namelist()
if (filename.endswith(".yaml") or filename.endswith(".json")))
num_files = len(files)
if num_files == 0:
raise InvalidFormat("Expected a json or yaml control file.")
if num_files > 1:
raise InvalidFormat("Found more than one possible control file.")
return {
"na.strings": ["NA"],
"comment.char": "#",
"sep": ",",
**{
f"{key}_transposed": False for key in FILE_TYPES
},
**(json.loads(zfile.read(files[0]))
if files[0].endswith(".json")
else yaml.safe_load(zfile.read(files[0])))
}
def replace_na_strings(cdata, val):
"""Replace values indicated in `na.strings` with `None`."""
return (None if val in cdata.get("na.strings", ["NA"]) else val)
def with_non_transposed(zfile: ZipFile,
member_key: str,
cdata: dict,
process_value: Callable[
[dict], dict] = lambda val: val) -> Iterator[dict]:
"""Process non-transposed file values
Arguments:
zfile: A zipfile object from opening a R/qtl2 bundle.
member_key: A key to retrieve the member file to process from the file.
cdata: The control data from the R/qtl2 bundle read from the JSON/YAML file.
process_value: A function to process the values from the file.
"""
def not_comment_line(line):
return not line.startswith(cdata.get("comment.char", "#"))
sep = cdata.get("sep", ",")
with zfile.open(cdata[member_key]) as innerfile:
try:
wrapped_file = io.TextIOWrapper(innerfile)
firstrow = tuple(
field.strip() for field in
next(filter(not_comment_line, wrapped_file)).strip().split(sep))
id_key = firstrow[0]
wrapped_file.seek(0)
reader = csv.DictReader(filter(not_comment_line, wrapped_file),
delimiter=sep)
for row in reader:
processed = process_value(row)
yield {
"id": processed[id_key],
**{
key: value
for key, value in processed.items()
if key != id_key
}
}
except StopIteration as exc:
raise InvalidFormat("The file has no rows!") from exc
def __make_organise_by_id__(id_key):
"""Return a function to use with `reduce` to organise values by some
identifier."""
def __organiser__(acc, item):
row = acc.get(item[id_key], {})
return {**acc, item[id_key]: {**row, **item}}
return __organiser__
def __batch_of_n__(iterable: Iterable, num):
"""Return a batch of `num` items or less from the `iterable`."""
while True:
items = take(iterable, num)
if len(items) <= 0:
break
yield items
def with_transposed(zfile: ZipFile,
member_key: str,
cdata: dict,
process_value: Callable[
[str, tuple[str, ...], tuple[str, ...]],
tuple[dict, ...]]) -> Iterator[dict]:
"""Process transposed file values
Arguments:
zfile: A zipfile object from opening a R/qtl2 bundle.
member_key: A key to retrieve the member file to process from the file.
cdata: The control data from the R/qtl2 bundle read from the JSON/YAML file.
process_value: A function to process the values from the file.
"""
with zfile.open(cdata[member_key]) as innerfile:
lines = (tuple(field.strip() for field in
line.strip().split(cdata.get("sep", ",")))
for line in
filter(lambda line: not line.startswith("#"),
io.TextIOWrapper(innerfile)))
try:
id_line = next(lines)
id_key, headers = id_line[0], id_line[1:]
for _key, row in reduce(# type: ignore[var-annotated]
__make_organise_by_id__(id_key),
(row
for batch in __batch_of_n__(lines, 300)
for line in batch
for row in process_value(id_key, headers, line)),
{}).items():
yield {
"id": row[id_key],
**{
key: value
for key, value in row.items()
if key != id_key
}}
except StopIteration:
pass
def make_process_data_geno(cdata) -> tuple[
Callable[[dict], dict],
Callable[[str, tuple[str, ...], tuple[str, ...]],
tuple[dict, ...]]]:
"""Build functions to process genotype data."""
def replace_genotype_codes(val):#pylint: disable=[redefined-outer-name]
# The rewrite will probably make this obsolete.
return cdata["genotypes"].get(val, val)
def __non_transposed__(row: dict) -> dict:
return {
key: chain(value, replace_genotype_codes,
partial(replace_na_strings, cdata))
for key,value in row.items()
}
def __transposed__(id_key: str,
ids: tuple[str, ...],
vals: tuple[str, ...]) -> tuple[dict, ...]:
return tuple(
dict(zip(
[id_key, vals[0]],
(chain(item, replace_genotype_codes, partial(replace_na_strings, cdata))
for item in items)))
for items in zip(ids, vals[1:]))
return (__non_transposed__, __transposed__)
def replace_sex_info(val, cdata: dict):
"""Replace sex information in files with values in the control data."""
sex_info = cdata.get("sex", False)
if bool(sex_info):
return sex_info.get(val, val)
return val
def replace_cross_info(val, cdata: dict):
"""
Replace cross information in files with the values in the control data.
"""
cross_info = cdata.get("cross_info", False)
if bool(cross_info):
return cross_info.get(val, val)
return val
def make_process_data_covar(cdata) -> tuple[
Callable[[dict], dict],
Callable[[str, tuple[str, ...], tuple[str, ...]],
tuple[dict, ...]]]:
"""Build functions to process sex and cross information in covar files."""
rep_sex_info = partial(replace_sex_info, cdata=cdata)
rep_cross_info = partial(replace_cross_info, cdata=cdata)
def non_transposed(row: dict) -> dict:
return {
key: chain(value, rep_sex_info, rep_cross_info)
for key,value in row.items()
}
def transposed(id_key: str,
ids: tuple[str, ...],
vals: tuple[str, ...]) -> tuple[dict, ...]:
return tuple(
dict(zip(
[id_key, vals[0]],
(chain(item, rep_sex_info, rep_cross_info)
for item in items)))
for items in zip(ids, vals[1:]))
return (non_transposed, transposed)
def file_data(zfile: ZipFile,
member_key: str,
cdata: dict,
process_value: Optional[Callable[[dict], dict]] = None,
process_transposed_value: Optional[Callable[
[str, tuple[str, ...], tuple[str, ...]],
tuple[dict, ...]]] = None) -> Iterator[dict]:
"""Load data from files in R/qtl2 zip bundle."""
def __default_process_value_non_transposed__(val: dict) -> dict:
return {
key: replace_na_strings(cdata, value) for key,value in val.items()
}
def __default_process_value_transposed__(
id_key: str,
ids: tuple[str, ...],
vals: tuple[str, ...]) -> tuple[dict, ...]:
"""Default values processor for transposed files."""
return tuple(
dict(zip([id_key, replace_na_strings(cdata, vals[0])], items))
for items in zip(
ids, (replace_na_strings(cdata, val) for val in vals[1:])))
process_value = process_value or __default_process_value_non_transposed__
process_transposed_value = (
process_transposed_value or __default_process_value_transposed__)
try:
if isinstance(cdata[member_key], list):
for row in (line for lines in
(file_data(
zfile, member_key, {**cdata, member_key: innerfile},
process_value, process_transposed_value)
for innerfile in cdata[member_key])
for line in lines):
yield row
return
if not cdata.get(f"{member_key}_transposed", False):
for row in with_non_transposed(zfile, member_key, cdata, process_value):
yield row
return
for row in with_transposed(
zfile, member_key, cdata, process_transposed_value):
yield row
except KeyError as exc:
raise MissingFileError(*exc.args) from exc
def cross_information(zfile: ZipFile, cdata: dict) -> Iterator[dict]:
"""Load cross information where present."""
cdata_cross_info = cdata.get("cross_info", {})
cross_info_file_key = "covar"
new_cdata = {**cdata}
sex_fields = (cdata.get("sex",{}).get("covar",""),)
if "file" in cdata_cross_info:
cross_info_file_key = "gnqc_cross_info_file"
new_cdata = {**cdata, "gnqc_cross_info_file": cdata_cross_info["file"]}
for row in file_data(zfile,
cross_info_file_key,
new_cdata,
*make_process_data_covar(cdata)):
yield {
key: chain(value, partial(replace_cross_info, cdata=cdata))
for key, value in row.items() if key not in sex_fields}
def sex_information(zfile: ZipFile, cdata: dict) -> Iterator[dict]:
"""Load cross information where present."""
cdata_sex_info = cdata.get("sex", {})
sex_info_file_key = "covar"
new_cdata = {**cdata}
ci_fields = (cdata.get("cross_info",{}).get("covar",""),)
if "file" in cdata_sex_info:
sex_info_file_key = "gnqc_sex_info_file"
new_cdata = {**cdata, "gnqc_sex_info_file": cdata_sex_info["file"]}
for row in file_data(zfile,
sex_info_file_key,
new_cdata,
*make_process_data_covar(cdata)):
yield {
key: chain(value, partial(replace_sex_info, cdata=cdata))
for key, value in row.items() if key not in ci_fields}
def genotype_data(zfile: ZipFile):
"""Convenience function to genotype data from R/qtl2 bundle."""
cdata = control_data(zfile)
return file_data(zfile, "geno", cdata, *make_process_data_geno(cdata))
def raw_file_data(zipfilepath: Union[str, Path],
memberfilename: str) -> Iterator[str]:
"""Read the raw text from a file in the R/qtl2 bundle."""
with (ZipFile(str(zipfilepath), "r") as zfile,
zfile.open(memberfilename) as innerfile):
wrappedfile = io.TextIOWrapper(innerfile)
for line in wrappedfile:
yield line
def strip_comments(rawdata: Iterator[str], commentchar) -> Iterator[str]:
"""Remove comments from raw text."""
return (line for line in rawdata if not line.startswith(commentchar))
def missing_value_codes_to_none(value: str,
nastrings: tuple[str, ...]) -> Optional[str]:
"""
If 'value' is a missing value code, return `None`, otherwise return 'value'.
"""
return value if value not in nastrings else None
def replace_genotype_codes(value: str, genocodes: dict):
"""Convert genotype codes into values specified in control file."""
return genocodes.get(value, value)
def read_control_file(zipfilepath: Union[str, Path]) -> dict:
"""Read control data."""
with ZipFile(str(zipfilepath), "r") as zfile:
# move `control_data` code here and replace existing function.
cdata = control_data(zfile)
return {
**cdata,
**{
ftype: ([cdata[ftype]]
if isinstance(cdata[ftype], str)
else cdata[ftype])
for ftype in FILE_TYPES
if bool(cdata.get(ftype))
}
}
def read_file_data(
zipfilepath: Union[str, Path],
memberfilename: str,
processfile: Callable[[Iterator[str]], Iterator[str]] = lambda itr: itr,
processline: Callable[[str], str] = lambda line: line,
processfield: Callable[
[Optional[str]], Optional[str]] = lambda val: val) -> Iterator[
tuple[Optional[str], ...]]:
"""Read a single file from the bundle processing each field."""
cdata = read_control_file(zipfilepath)
return (
tuple(processfield(field.strip())
for field in processline(row.strip()).split(cdata["sep"]))
for row in
processfile(
strip_comments(
raw_file_data(zipfilepath, memberfilename),
cdata["comment.char"])))
def read_geno_file_data(
zipfilepath: Union[str, Path],
memberfilename: str) -> Iterator[tuple[Optional[str], ...]]:
"""Read a 'geno' file from the R/qtl2 bundle."""
cdata = read_control_file(zipfilepath)
return read_file_data(
zipfilepath,
memberfilename,
processfield=partial(
replace_genotype_codes, genocodes=cdata.get("genotypes", {})))
def load_samples(
zipfilepath: Union[str, Path], filetype: str) -> tuple[str, ...]:
"""Load the samples/cases/individuals from file(s) of type 'filetype'."""
cdata = read_control_file(zipfilepath)
samples: set[str] = set()
for afile in cdata.get(filetype, []):
filedata = read_geno_file_data(zipfilepath, afile)
if cdata.get(f"{filetype}_transposed", False):
samples.update(
item for item in next(filedata)[1:] if item is not None)
else:
try:
next(filedata)# Ignore first row.
samples.update(
line[0] for line in filedata if line[0] is not None)
except StopIteration:# Empty file.
pass
return tuple(samples)
load_geno_samples = partial(load_samples, filetype="geno")
load_founder_geno_samples = partial(load_samples, filetype="founder_geno")
load_pheno_samples = partial(load_samples, filetype="pheno")
|