aboutsummaryrefslogtreecommitdiff
path: root/r_qtl/r_qtl2_qc.py
diff options
context:
space:
mode:
Diffstat (limited to 'r_qtl/r_qtl2_qc.py')
-rw-r--r--r_qtl/r_qtl2_qc.py32
1 files changed, 18 insertions, 14 deletions
diff --git a/r_qtl/r_qtl2_qc.py b/r_qtl/r_qtl2_qc.py
index 4b3e184..a41c442 100644
--- a/r_qtl/r_qtl2_qc.py
+++ b/r_qtl/r_qtl2_qc.py
@@ -2,7 +2,7 @@
import re
from zipfile import ZipFile
from functools import reduce
-from typing import Union, Sequence, Iterator, Optional, Callable
+from typing import Union, Iterator, Optional, Callable
from r_qtl import errors as rqe
from r_qtl import r_qtl2 as rqtl2
@@ -11,14 +11,15 @@ from r_qtl.fileerrors import MissingFile
from quality_control.errors import InvalidValue
-def bundle_files_list(cdata: dict) -> tuple[str, ...]:
+def bundle_files_list(cdata: dict) -> tuple[tuple[str, str], ...]:
"""Retrieve files listed in control file."""
- def __merge__(alist: tuple[str, ...], member: Union[str, Sequence[str]]) -> tuple[str, ...]:
+ def __merge__(alist: tuple[tuple[str, str], ...], member: str) -> tuple[
+ tuple[str, str], ...]:
if isinstance(cdata[member], str):
- return alist + (cdata[member],)
- return alist + tuple(cdata[member])
+ return alist + ((member, str(cdata[member])),)
+ return alist + tuple((member, str(afile)) for afile in cdata[member])
- fileslist: tuple[str, ...] = reduce(
+ fileslist: tuple[tuple[str, str], ...] = reduce(
__merge__,
(key for key in cdata.keys() if key in __FILE_TYPES__),
tuple())
@@ -26,30 +27,33 @@ def bundle_files_list(cdata: dict) -> tuple[str, ...]:
if "file" in cdata.get("sex", {}):
sexfile = cdata["sex"]["file"]
fileslist = fileslist + (
- (sexfile,) if isinstance(sexfile, str) else tuple(sexfile))
+ (("sex.file", sexfile),) if isinstance(sexfile, str)
+ else tuple(("sex.file", afile) for afile in sexfile))
if "file" in cdata.get("cross_info", {}):
crossinfofile = cdata["cross_info"]["file"]
fileslist = fileslist + (
- (crossinfofile,) if isinstance(crossinfofile, str)
- else tuple(crossinfofile))
+ (("cross_info.file", crossinfofile),)
+ if isinstance(crossinfofile, str)
+ else tuple(("cross_info.file", afile) for afile in crossinfofile))
return fileslist
-def missing_files(zfile: ZipFile) -> tuple[str, ...]:
+def missing_files(zfile: ZipFile) -> tuple[tuple[str, str], ...]:
"""
Retrieve a list of files listed in the control file that do not exist in the
bundle.
"""
- def __missing_p__(thefile):
+ def __missing_p__(filedetails: tuple[str, str]):
+ _cfkey, thefile = filedetails
try:
zfile.getinfo(thefile)
return False
except KeyError:
return True
- return tuple(filter(__missing_p__,
- bundle_files_list(rqtl2.control_data(zfile))))
+ return tuple(afile for afile in bundle_files_list(rqtl2.control_data(zfile))
+ if __missing_p__(afile))
def validate_bundle(zfile: ZipFile):
"""Ensure the R/qtl2 bundle is valid."""
@@ -57,7 +61,7 @@ def validate_bundle(zfile: ZipFile):
if len(missing) > 0:
raise rqe.MissingFileError(
"The following files do not exist in the bundle: " +
- ", ".join(missing))
+ ", ".join(mfile[1] for mfile in missing))
def make_genocode_checker(genocode: dict) -> Callable[[int, str, str], Optional[InvalidValue]]:
"""Make a checker from the genotypes in the control data"""