1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
|
"""Quality control checks for R/qtl2 data bundles."""
from zipfile import ZipFile
from functools import reduce, partial
from typing import Union, Iterator, Optional, Callable
from r_qtl import errors as rqe
from r_qtl import r_qtl2 as rqtl2
from r_qtl.r_qtl2 import __FILE_TYPES__
from r_qtl.fileerrors import MissingFile
from quality_control.errors import InvalidValue
from quality_control.checks import decimal_points_error
def bundle_files_list(cdata: dict) -> tuple[tuple[str, str], ...]:
    """
    List every file that the control data refers to.

    Each item of the result is a `(control-file key, file name)` pair. Keys of
    `cdata` that are in `__FILE_TYPES__` come first, in the order they appear
    in `cdata`, followed by any `sex.file` entries and then any
    `cross_info.file` entries. A key may hold a single file name (a string) or
    a collection of file names; a collection contributes one pair per file.
    """
    def __each__(entry):
        # A bare string is one file name, not a sequence of characters.
        return (entry,) if isinstance(entry, str) else tuple(entry)

    listed: tuple[tuple[str, str], ...] = tuple(
        (key, str(fname))
        for key in cdata
        if key in __FILE_TYPES__
        for fname in __each__(cdata[key]))

    # These two sections keep their file name(s) under a nested "file" key.
    nested = (("sex", "sex.file"), ("cross_info", "cross_info.file"))
    for section, label in nested:
        if "file" in cdata.get(section, {}):
            listed = listed + tuple(
                (label, fname) for fname in __each__(cdata[section]["file"]))

    return listed
def missing_files(zfile: ZipFile) -> tuple[tuple[str, str], ...]:
    """
    Find the files that the control file lists but the bundle lacks.

    Returns the `(control-file key, file name)` pairs produced by
    `bundle_files_list` for which `zfile` has no member of that name, in the
    same order.
    """
    def __in_bundle__(fname: str) -> bool:
        # `ZipFile.getinfo` raises `KeyError` for a name not in the archive.
        try:
            zfile.getinfo(fname)
        except KeyError:
            return False
        return True

    listed = bundle_files_list(rqtl2.control_data(zfile))
    return tuple(entry for entry in listed if not __in_bundle__(entry[1]))
def validate_bundle(zfile: ZipFile):
    """
    Check that the R/qtl2 bundle holds every file its control file lists.

    Returns `None` when nothing is missing. Otherwise raises
    `rqe.MissingFileError` with a message naming each absent file.
    """
    absent = missing_files(zfile)
    if not absent:
        return
    names = ", ".join(fname for _cfkey, fname in absent)
    raise rqe.MissingFileError(
        "The following files do not exist in the bundle: " + names)
def make_genocode_checker(genocode: dict) -> Callable[[int, str, str], Optional[InvalidValue]]:
    """
    Make a checker from the genotypes in the control data.

    The keys of `genocode` are the only values the returned checker accepts.
    The keys are captured once, when the checker is made, so later changes to
    `genocode` do not affect an existing checker.

    The checker takes `(lineno, field, value)` and returns `None` when `value`
    is one of the known genotypes, or an `InvalidValue` describing the problem
    otherwise.
    """
    # Built once here rather than on every call: the checker runs for every
    # checked value, and the set of valid genotypes does not change between
    # calls.
    genotypes = tuple(genocode.keys())

    def __checker__(lineno: int, field: str, value: str) -> Optional[InvalidValue]:
        if value not in genotypes:
            return InvalidValue(lineno, field, value, (
                f"Invalid value '{value}'. Expected one of {genotypes}."))
        return None

    return __checker__
def geno_errors(zfile: ZipFile) -> Iterator[Union[InvalidValue, MissingFile]]:
    """
    Retrieve the errors found in the bundle's "geno" data.

    Values are checked against the keys of the control data's "genotypes"
    entry (an empty mapping if the control data has no such entry). The
    control data is read immediately; the checks themselves run lazily as the
    returned iterator is consumed.
    """
    genocodes = rqtl2.control_data(zfile).get("genotypes", {})
    results = retrieve_errors(
        zfile, "geno", (make_genocode_checker(genocodes),))
    # `retrieve_errors` passes through whatever each checker returns, which is
    # `None` for an acceptable value; keep only the actual errors.
    return (result for result in results if result is not None)
def pheno_errors(zfile: ZipFile) -> Iterator[Union[InvalidValue, MissingFile]]:
    """
    Retrieve the errors found in the bundle's "pheno" data.

    Every value is run through `decimal_points_error` with `mini=3`. The
    checks run lazily as the returned iterator is consumed.
    """
    checker = partial(decimal_points_error, mini=3)
    results = retrieve_errors(zfile, "pheno", (checker,))
    # `retrieve_errors` yields each checker's raw result; drop the `None`s so
    # that only actual errors remain.
    return (result for result in results if result is not None)
def phenose_errors(zfile: ZipFile) -> Iterator[Union[InvalidValue, MissingFile]]:
    """
    Retrieve the errors found in the bundle's "phenose" data.

    Every value is run through `decimal_points_error` with `mini=6`. The
    checks run lazily as the returned iterator is consumed.
    """
    checker = partial(decimal_points_error, mini=6)
    results = retrieve_errors(zfile, "phenose", (checker,))
    # `retrieve_errors` yields each checker's raw result; drop the `None`s so
    # that only actual errors remain.
    return (result for result in results if result is not None)
def retrieve_errors(
        zfile: ZipFile, filetype: str, checkers: tuple[Callable, ...]
) -> Iterator[Optional[Union[InvalidValue, MissingFile]]]:
    """
    Check for and retrieve errors from files of type `filetype`.

    This is a generator: nothing is read or checked until it is iterated.

    Parameters:
        zfile: The R/qtl2 bundle to check.
        filetype: The kind of file to check; must be in `__FILE_TYPES__`.
        checkers: Any number of callables, each called as
            `checker(lineno, field, value)`.

    Yields:
        The result of every checker for every non-`None` value outside the
        "id" field. Results are passed through unfiltered, so a checker that
        returns `None` for an acceptable value makes this yield `None`;
        callers are expected to drop those. If reading the data raises
        `rqe.MissingFileError`, a single `MissingFile` is yielded and
        iteration ends.

    Raises:
        AssertionError: If `filetype` is not in `__FILE_TYPES__` (raised on
            first iteration; assertions are skipped when Python runs with
            `-O`).
    """
    assert filetype in __FILE_TYPES__, f"Invalid file type {filetype}."
    cdata = rqtl2.control_data(zfile)
    try:
        # `lineno` counts the rows yielded by `rqtl2.file_data`, from 1.
        for lineno, row in enumerate(
                rqtl2.file_data(zfile, filetype, cdata), start=1):
            for field, value in row.items():
                # The "id" field and absent values are not checked.
                if field == "id" or value is None:
                    continue
                for checker in checkers:
                    yield checker(lineno, field, value)
    except rqe.MissingFileError:
        fname = cdata.get(filetype)
        yield MissingFile(filetype, fname, f"Missing '{filetype}' file '{fname}'.")
|