1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
|
"""Quality control checks for R/qtl2 data bundles."""
from zipfile import ZipFile
from functools import reduce
from typing import Union, Sequence
from r_qtl import errors as rqe
from r_qtl import r_qtl2 as rqtl2
from r_qtl.r_qtl2 import __FILE_TYPES__
def bundle_files_list(cdata: dict) -> tuple[str, ...]:
"""Retrieve files listed in control file."""
def __merge__(alist: tuple[str, ...], member: Union[str, Sequence[str]]) -> tuple[str, ...]:
if isinstance(cdata[member], str):
return alist + (cdata[member],)
return alist + tuple(cdata[member])
fileslist: tuple[str, ...] = reduce(
__merge__,
(key for key in cdata.keys() if key in __FILE_TYPES__),
tuple())
if "file" in cdata.get("sex", {}):
sexfile = cdata["sex"]["file"]
fileslist = fileslist + (
(sexfile,) if isinstance(sexfile, str) else tuple(sexfile))
if "file" in cdata.get("cross_info", {}):
crossinfofile = cdata["cross_info"]["file"]
fileslist = fileslist + (
(crossinfofile,) if isinstance(crossinfofile, str)
else tuple(crossinfofile))
return fileslist
def missing_files(zfile: ZipFile) -> tuple[str, ...]:
"""
Retrieve a list of files listed in the control file that do not exist in the
bundle.
"""
def __missing_p__(thefile):
try:
zfile.getinfo(thefile)
return False
except KeyError:
return True
return tuple(filter(__missing_p__,
bundle_files_list(rqtl2.control_data(zfile))))
def validate_bundle(zfile: ZipFile):
"""Ensure the R/qtl2 bundle is valid."""
missing = missing_files(zfile)
if len(missing) > 0:
raise rqe.MissingFileError(
"The following files do not exist in the bundle: " +
", ".join(missing))
|