From dd369b846524fed0c08d1b7318fd73478506c3ee Mon Sep 17 00:00:00 2001 From: Frederick Muriuki Muriithi Date: Fri, 9 Feb 2024 03:10:06 +0300 Subject: Provide the key for each file listed in the control file. --- r_qtl/r_qtl2_qc.py | 32 ++++++++++++++++++-------------- 1 file changed, 18 insertions(+), 14 deletions(-) (limited to 'r_qtl') diff --git a/r_qtl/r_qtl2_qc.py b/r_qtl/r_qtl2_qc.py index 4b3e184..a41c442 100644 --- a/r_qtl/r_qtl2_qc.py +++ b/r_qtl/r_qtl2_qc.py @@ -2,7 +2,7 @@ import re from zipfile import ZipFile from functools import reduce -from typing import Union, Sequence, Iterator, Optional, Callable +from typing import Union, Iterator, Optional, Callable from r_qtl import errors as rqe from r_qtl import r_qtl2 as rqtl2 @@ -11,14 +11,15 @@ from r_qtl.fileerrors import MissingFile from quality_control.errors import InvalidValue -def bundle_files_list(cdata: dict) -> tuple[str, ...]: +def bundle_files_list(cdata: dict) -> tuple[tuple[str, str], ...]: """Retrieve files listed in control file.""" - def __merge__(alist: tuple[str, ...], member: Union[str, Sequence[str]]) -> tuple[str, ...]: + def __merge__(alist: tuple[tuple[str, str], ...], member: str) -> tuple[ + tuple[str, str], ...]: if isinstance(cdata[member], str): - return alist + (cdata[member],) - return alist + tuple(cdata[member]) + return alist + ((member, str(cdata[member])),) + return alist + tuple((member, str(afile)) for afile in cdata[member]) - fileslist: tuple[str, ...] = reduce( + fileslist: tuple[tuple[str, str], ...] = reduce( __merge__, (key for key in cdata.keys() if key in __FILE_TYPES__), tuple()) @@ -26,30 +27,33 @@ def bundle_files_list(cdata: dict) -> tuple[str, ...]: if "file" in cdata.get("sex", {}): sexfile = cdata["sex"]["file"] fileslist = fileslist + ( - (sexfile,) if isinstance(sexfile, str) else tuple(sexfile)) + (("sex.file", sexfile),) if isinstance(sexfile, str) + else tuple(("sex.file", afile) for afile in sexfile)) if "file" in cdata.get("cross_info", {}): crossinfofile = cdata["cross_info"]["file"] fileslist = fileslist + ( - (crossinfofile,) if isinstance(crossinfofile, str) - else tuple(crossinfofile)) + (("cross_info.file", crossinfofile),) + if isinstance(crossinfofile, str) + else tuple(("cross_info.file", afile) for afile in crossinfofile)) return fileslist -def missing_files(zfile: ZipFile) -> tuple[str, ...]: +def missing_files(zfile: ZipFile) -> tuple[tuple[str, str], ...]: """ Retrieve a list of files listed in the control file that do not exist in the bundle. """ - def __missing_p__(thefile): + def __missing_p__(filedetails: tuple[str, str]): + _cfkey, thefile = filedetails try: zfile.getinfo(thefile) return False except KeyError: return True - return tuple(filter(__missing_p__, - bundle_files_list(rqtl2.control_data(zfile)))) + return tuple(afile for afile in bundle_files_list(rqtl2.control_data(zfile)) + if __missing_p__(afile)) def validate_bundle(zfile: ZipFile): """Ensure the R/qtl2 bundle is valid.""" @@ -57,7 +61,7 @@ def validate_bundle(zfile: ZipFile): if len(missing) > 0: raise rqe.MissingFileError( "The following files do not exist in the bundle: " + - ", ".join(missing)) + ", ".join(mfile[1] for mfile in missing)) def make_genocode_checker(genocode: dict) -> Callable[[int, str, str], Optional[InvalidValue]]: """Make a checker from the genotypes in the control data""" -- cgit v1.2.3