1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
|
"""Quality control checks for R/qtl2 data bundles."""
from pathlib import Path
from functools import reduce, partial
from zipfile import ZipFile, is_zipfile
from typing import Union, Iterator, Optional, Callable
from r_qtl import r_qtl2 as rqtl2
from r_qtl import exceptions as rqe
from r_qtl.r_qtl2 import FILE_TYPES
from r_qtl.fileerrors import MissingFile
from quality_control.errors import InvalidValue
from quality_control.checks import decimal_points_error
def bundle_files_list(cdata: dict) -> tuple[tuple[str, str], ...]:
"""Retrieve files listed in control file."""
def __merge__(alist: tuple[tuple[str, str], ...], member: str) -> tuple[
tuple[str, str], ...]:
if isinstance(cdata[member], str):
return alist + ((member, str(cdata[member])),)
return alist + tuple((member, str(afile)) for afile in cdata[member])
fileslist: tuple[tuple[str, str], ...] = reduce(
__merge__,
(key for key in cdata.keys() if key in FILE_TYPES),
tuple())
if "file" in cdata.get("sex", {}):
sexfile = cdata["sex"]["file"]
fileslist = fileslist + (
(("sex.file", sexfile),) if isinstance(sexfile, str)
else tuple(("sex.file", afile) for afile in sexfile))
if "file" in cdata.get("cross_info", {}):
crossinfofile = cdata["cross_info"]["file"]
fileslist = fileslist + (
(("cross_info.file", crossinfofile),)
if isinstance(crossinfofile, str)
else tuple(("cross_info.file", afile) for afile in crossinfofile))
return fileslist
def __missing_from_zipfile__(
zfile: ZipFile, cdata: dict) -> tuple[tuple[str, str], ...]:
"""Check for missing files from a still-compressed zip file."""
def __missing_p__(filedetails: tuple[str, str]):
_cfkey, thefile = filedetails
try:
zfile.getinfo(thefile)
return False
except KeyError:
return True
return tuple(afile for afile in bundle_files_list(cdata)
if __missing_p__(afile))
def __missing_from_dirpath__(
dirpath: Path, cdata: dict) -> tuple[tuple[str, str], ...]:
"""Check for missing files from an extracted bundle."""
allfiles = tuple(_file.name for _file in dirpath.iterdir())
return tuple(afile for afile in bundle_files_list(cdata)
if afile[1] not in allfiles)
def missing_files(bundlesrc: Union[Path, ZipFile]) -> tuple[tuple[str, str], ...]:
"""
Retrieve a list of files listed in the control file that do not exist in the
bundle.
Parameters
----------
bundlesrc: Path object of ZipFile object: This is the bundle under check.
Returns
-------
A tuple of names listed in the control file that do not exist in the bundle.
Raises
------
r_qtl.exceptions.InvalidFormat
"""
cdata = rqtl2.control_data(bundlesrc)
if isinstance(bundlesrc, ZipFile):
return __missing_from_zipfile__(bundlesrc, cdata)
if isinstance(bundlesrc, Path):
if is_zipfile(bundlesrc):
return __missing_from_zipfile__(ZipFile(bundlesrc, cdata))
if bundlesrc.is_dir():
return __missing_from_dirpath__(bundlesrc, cdata)
raise InvalidFormat(
"Expects either a zipfile.ZipFile object or a pathlib.Path object "
"pointing to a directory containing the R/qtl2 bundle.")
def validate_bundle(zfile: ZipFile):
"""Ensure the R/qtl2 bundle is valid."""
missing = missing_files(zfile)
if len(missing) > 0:
raise rqe.MissingFileException(
"The following files do not exist in the bundle: " +
", ".join(mfile[1] for mfile in missing))
def make_genocode_checker(genocode: dict, filename: str) -> Callable[
[int, str, str], Optional[InvalidValue]]:
"""Make a checker from the genotypes in the control data"""
def __checker__(lineno: int, field: str, value: str) -> Optional[InvalidValue]:
genotypes = tuple(genocode.keys())
if value not in genotypes:
return InvalidValue(filename, lineno, field, value, (
f"Invalid value '{value}'. Expected one of {genotypes}."))
return None
return __checker__
def geno_errors(zfile: ZipFile) -> Iterator[Union[InvalidValue, MissingFile]]:
"""Check for and retrieve geno errors."""
cdata = rqtl2.control_data(zfile)
return (
error for error in retrieve_errors(
zfile, "geno", (make_genocode_checker(cdata.get("genotypes", {}), "geno"),))
if error is not None)
def pheno_errors(zfile: ZipFile) -> Iterator[Union[InvalidValue, MissingFile]]:
"""Check for and retrieve pheno errors."""
return (
error for error in retrieve_errors(
zfile, "pheno", (partial(
decimal_points_error, mini=3, filename="pheno"),))
if error is not None)
def phenose_errors(zfile: ZipFile) -> Iterator[Union[InvalidValue, MissingFile]]:
"""Check for and retrieve phenose errors."""
return (
error for error in retrieve_errors(
zfile, "phenose", (partial(decimal_points_error, mini=6),))
if error is not None)
def retrieve_errors(zfile: ZipFile, filetype: str, checkers: tuple[Callable]) -> Iterator[
Union[InvalidValue, MissingFile]]:
"""Check for and retrieve errors from files of type `filetype`."""
assert filetype in FILE_TYPES, f"Invalid file type {filetype}."
cdata = rqtl2.control_data(zfile)
try:
for lineno, row in enumerate(
rqtl2.file_data(zfile, filetype, cdata), start=1):
for field, value in row.items():
if field == "id":
continue
if value is not None:
for checker in checkers:
yield checker(lineno=lineno, field=field, value=value)
except rqe.MissingFileException:
fname = cdata.get(filetype)
yield MissingFile(filetype, fname, f"Missing '{filetype}' file '{fname}'.")
|