"""Quality control checks for R/qtl2 data bundles.""" from zipfile import ZipFile from functools import reduce, partial from typing import Union, Iterator, Optional, Callable from r_qtl import errors as rqe from r_qtl import r_qtl2 as rqtl2 from r_qtl.r_qtl2 import FILE_TYPES from r_qtl.fileerrors import MissingFile from quality_control.errors import InvalidValue from quality_control.checks import decimal_points_error def bundle_files_list(cdata: dict) -> tuple[tuple[str, str], ...]: """Retrieve files listed in control file.""" def __merge__(alist: tuple[tuple[str, str], ...], member: str) -> tuple[ tuple[str, str], ...]: if isinstance(cdata[member], str): return alist + ((member, str(cdata[member])),) return alist + tuple((member, str(afile)) for afile in cdata[member]) fileslist: tuple[tuple[str, str], ...] = reduce( __merge__, (key for key in cdata.keys() if key in FILE_TYPES), tuple()) if "file" in cdata.get("sex", {}): sexfile = cdata["sex"]["file"] fileslist = fileslist + ( (("sex.file", sexfile),) if isinstance(sexfile, str) else tuple(("sex.file", afile) for afile in sexfile)) if "file" in cdata.get("cross_info", {}): crossinfofile = cdata["cross_info"]["file"] fileslist = fileslist + ( (("cross_info.file", crossinfofile),) if isinstance(crossinfofile, str) else tuple(("cross_info.file", afile) for afile in crossinfofile)) return fileslist def missing_files(zfile: ZipFile) -> tuple[tuple[str, str], ...]: """ Retrieve a list of files listed in the control file that do not exist in the bundle. """ def __missing_p__(filedetails: tuple[str, str]): _cfkey, thefile = filedetails try: zfile.getinfo(thefile) return False except KeyError: return True return tuple(afile for afile in bundle_files_list(rqtl2.control_data(zfile)) if __missing_p__(afile)) def validate_bundle(zfile: ZipFile): """Ensure the R/qtl2 bundle is valid.""" missing = missing_files(zfile) if len(missing) > 0: raise rqe.MissingFileError( "The following files do not exist in the bundle: " + ", ".join(mfile[1] for mfile in missing)) def make_genocode_checker(genocode: dict) -> Callable[[int, str, str], Optional[InvalidValue]]: """Make a checker from the genotypes in the control data""" def __checker__(lineno: int, field: str, value: str) -> Optional[InvalidValue]: genotypes = tuple(genocode.keys()) if value not in genotypes: return InvalidValue(lineno, field, value, ( f"Invalid value '{value}'. Expected one of {genotypes}.")) return None return __checker__ def geno_errors(zfile: ZipFile) -> Iterator[Union[InvalidValue, MissingFile]]: """Check for and retrieve geno errors.""" cdata = rqtl2.control_data(zfile) return ( error for error in retrieve_errors( zfile, "geno", (make_genocode_checker(cdata.get("genotypes", {})),)) if error is not None) def pheno_errors(zfile: ZipFile) -> Iterator[Union[InvalidValue, MissingFile]]: """Check for and retrieve pheno errors.""" return ( error for error in retrieve_errors( zfile, "pheno", (partial(decimal_points_error, mini=3),)) if error is not None) def phenose_errors(zfile: ZipFile) -> Iterator[Union[InvalidValue, MissingFile]]: """Check for and retrieve phenose errors.""" return ( error for error in retrieve_errors( zfile, "phenose", (partial(decimal_points_error, mini=6),)) if error is not None) def retrieve_errors(zfile: ZipFile, filetype: str, checkers: tuple[Callable]) -> Iterator[ Union[InvalidValue, MissingFile]]: """Check for and retrieve errors from files of type `filetype`.""" assert filetype in FILE_TYPES, f"Invalid file type {filetype}." cdata = rqtl2.control_data(zfile) try: for lineno, row in enumerate( rqtl2.file_data(zfile, filetype, cdata), start=1): for field, value in row.items(): if field == "id": continue if value is not None: for checker in checkers: yield checker(lineno, field, value) except rqe.MissingFileError: fname = cdata.get(filetype) yield MissingFile(filetype, fname, f"Missing '{filetype}' file '{fname}'.")