"""Quality control checks for R/qtl2 data bundles.""" from pathlib import Path from functools import reduce, partial from zipfile import ZipFile, is_zipfile from typing import Union, Iterator, Optional, Callable from r_qtl import r_qtl2 as rqtl2 from r_qtl import exceptions as rqe from r_qtl.r_qtl2 import FILE_TYPES from r_qtl.fileerrors import MissingFile from r_qtl.exceptions import InvalidFormat from quality_control.errors import InvalidValue from quality_control.checks import decimal_points_error def bundle_files_list(cdata: dict) -> tuple[tuple[str, str], ...]: """Retrieve files listed in control file.""" def __merge__(alist: tuple[tuple[str, str], ...], member: str) -> tuple[ tuple[str, str], ...]: if isinstance(cdata[member], str): return alist + ((member, str(cdata[member])),) return alist + tuple((member, str(afile)) for afile in cdata[member]) fileslist: tuple[tuple[str, str], ...] = reduce( __merge__, (key for key in cdata.keys() if key in FILE_TYPES), tuple()) if "file" in cdata.get("sex", {}): sexfile = cdata["sex"]["file"] fileslist = fileslist + ( (("sex.file", sexfile),) if isinstance(sexfile, str) else tuple(("sex.file", afile) for afile in sexfile)) if "file" in cdata.get("cross_info", {}): crossinfofile = cdata["cross_info"]["file"] fileslist = fileslist + ( (("cross_info.file", crossinfofile),) if isinstance(crossinfofile, str) else tuple(("cross_info.file", afile) for afile in crossinfofile)) return fileslist def __missing_from_zipfile__( zfile: ZipFile, cdata: dict) -> tuple[tuple[str, str], ...]: """Check for missing files from a still-compressed zip file.""" def __missing_p__(filedetails: tuple[str, str]): _cfkey, thefile = filedetails try: zfile.getinfo(thefile) return False except KeyError: return True return tuple(afile for afile in bundle_files_list(cdata) if __missing_p__(afile)) def __missing_from_dirpath__( dirpath: Path, cdata: dict) -> tuple[tuple[str, str], ...]: """Check for missing files from an extracted bundle.""" allfiles = tuple(_file.name for _file in dirpath.iterdir()) return tuple(afile for afile in bundle_files_list(cdata) if afile[1] not in allfiles) def missing_files(bundlesrc: Union[Path, ZipFile]) -> tuple[tuple[str, str], ...]: """ Retrieve a list of files listed in the control file that do not exist in the bundle. Parameters ---------- bundlesrc: Path object of ZipFile object: This is the bundle under check. Returns ------- A tuple of names listed in the control file that do not exist in the bundle. Raises ------ r_qtl.exceptions.InvalidFormat """ cdata = rqtl2.control_data(bundlesrc) if isinstance(bundlesrc, ZipFile): return __missing_from_zipfile__(bundlesrc, cdata) if isinstance(bundlesrc, Path): if is_zipfile(bundlesrc): return __missing_from_zipfile__(ZipFile(bundlesrc), cdata) if bundlesrc.is_dir(): return __missing_from_dirpath__(bundlesrc, cdata) raise InvalidFormat( "Expects either a zipfile.ZipFile object or a pathlib.Path object " "pointing to a directory containing the R/qtl2 bundle.") def validate_bundle(zfile: Union[Path, ZipFile]): """Ensure the R/qtl2 bundle is valid.""" missing = missing_files(zfile) if len(missing) > 0: raise rqe.MissingFileException( "The following files do not exist in the bundle: " + ", ".join(mfile[1] for mfile in missing)) def make_genocode_checker(genocode: dict, filename: str) -> Callable[ [int, str, str], Optional[InvalidValue]]: """Make a checker from the genotypes in the control data""" def __checker__(lineno: int, field: str, value: str) -> Optional[InvalidValue]: genotypes = tuple(genocode.keys()) if value not in genotypes: return InvalidValue(filename, lineno, field, value, ( f"Invalid value '{value}'. Expected one of {genotypes}.")) return None return __checker__ def geno_errors(zfile: ZipFile) -> Iterator[Union[InvalidValue, MissingFile]]: """Check for and retrieve geno errors.""" cdata = rqtl2.control_data(zfile) return ( error for error in retrieve_errors( zfile, "geno", (make_genocode_checker(cdata.get("genotypes", {}), "geno"),)) if error is not None) def pheno_errors(zfile: ZipFile) -> Iterator[Union[InvalidValue, MissingFile]]: """Check for and retrieve pheno errors.""" return ( error for error in retrieve_errors( zfile, "pheno", (partial( decimal_points_error, mini=3, filename="pheno"),)) if error is not None) def phenose_errors(zfile: ZipFile) -> Iterator[Union[InvalidValue, MissingFile]]: """Check for and retrieve phenose errors.""" return ( error for error in retrieve_errors( zfile, "phenose", (partial(decimal_points_error, mini=6),)) if error is not None) def retrieve_errors(zfile: ZipFile, filetype: str, checkers: tuple[Callable]) -> Iterator[ Union[InvalidValue, MissingFile]]: """Check for and retrieve errors from files of type `filetype`.""" assert filetype in FILE_TYPES, f"Invalid file type {filetype}." cdata = rqtl2.control_data(zfile) try: for lineno, row in enumerate( rqtl2.file_data(zfile, filetype, cdata), start=1): for field, value in row.items(): if field == "id": continue if value is not None: for checker in checkers: yield checker(lineno=lineno, field=field, value=value) except rqe.MissingFileException: fname = cdata.get(filetype) yield MissingFile(filetype, fname, f"Missing '{filetype}' file '{fname}'.")