"""Quality control checks for R/qtl2 data bundles."""
import re
from zipfile import ZipFile
from functools import reduce
from typing import Union, Sequence, Iterator

from r_qtl import errors as rqe
from r_qtl import r_qtl2 as rqtl2
from r_qtl.r_qtl2 import __FILE_TYPES__
from r_qtl.fileerrors import MissingFile

from quality_control.errors import InvalidValue

# Accepted textual forms for a phenotype value:
#   * digits, a dot, then three or more decimal digits   (e.g. "12.345")
#   * digits, an optional dot, then only zeros            (e.g. "12", "12.00")
# All-zero forms such as "0", "000" and "0.0" are covered by the second
# alternative, so no separate patterns are needed for them.
# NOTE(review): the pattern has no sign or exponent support, so values such
# as "-1.234" are reported as invalid -- confirm this is intended.
_PHENO_VALUE_RE = re.compile(r"^([0-9]+\.[0-9]{3,}|[0-9]+\.?0*)$")


def _as_filenames(entry: Union[str, Sequence[str]]) -> tuple[str, ...]:
    """Normalise a control-file entry (one name or several) into a tuple."""
    return (entry,) if isinstance(entry, str) else tuple(entry)


def bundle_files_list(cdata: dict) -> tuple[str, ...]:
    """
    Retrieve files listed in control file.

    Collects, in the order the keys appear in `cdata`, the file name(s) of
    every key that is one of `__FILE_TYPES__`, followed by the "file" entry
    of the "sex" section and then of the "cross_info" section, when those
    entries exist. Each entry may be a single name or a sequence of names.
    """
    def __merge__(alist: tuple[str, ...], key: str) -> tuple[str, ...]:
        return alist + _as_filenames(cdata[key])

    fileslist: tuple[str, ...] = reduce(
        __merge__,
        (key for key in cdata if key in __FILE_TYPES__),
        tuple())

    # "sex" and "cross_info" are not plain file entries: the file name, if
    # there is one, is nested under their "file" key.
    for section in ("sex", "cross_info"):
        if "file" in cdata.get(section, {}):
            fileslist = fileslist + _as_filenames(cdata[section]["file"])

    return fileslist


def missing_files(zfile: ZipFile) -> tuple[str, ...]:
    """
    Retrieve a list of files listed in the control file that do not exist in
    the bundle.

    The control data is read from `zfile` with `rqtl2.control_data`; a file
    counts as missing when `zfile.getinfo` raises `KeyError` for its name.
    """
    def __missing_p__(thefile: str) -> bool:
        try:
            zfile.getinfo(thefile)
        except KeyError:
            return True
        return False

    listed = bundle_files_list(rqtl2.control_data(zfile))
    return tuple(filter(__missing_p__, listed))


def validate_bundle(zfile: ZipFile):
    """
    Ensure the R/qtl2 bundle is valid.

    Raises `rqe.MissingFileError`, naming every absent file, when at least
    one file listed in the control file is not present in `zfile`.
    """
    missing = missing_files(zfile)
    if missing:
        raise rqe.MissingFileError(
            "The following files do not exist in the bundle: " +
            ", ".join(missing))


def geno_errors(zfile: ZipFile) -> Iterator[
        Union[InvalidValue, MissingFile]]:
    """
    Check for and retrieve geno errors.

    Yields an `InvalidValue` for every field other than "id" whose value is
    not None and is not one of the keys of the control file's "genotypes"
    mapping. Rows are numbered from 1 in the order `rqtl2.file_data` yields
    them. If reading the data raises `rqe.MissingFileError`, a single
    `MissingFile` is yielded instead and iteration ends.
    """
    cdata = rqtl2.control_data(zfile)
    genotypes = tuple(cdata.get("genotypes", {}).keys())
    try:
        # The whole loop sits inside the `try` because the error can surface
        # at any point while the rows are being consumed.
        for lineno, row in enumerate(
                rqtl2.file_data(zfile, "geno", cdata), start=1):
            for field, value in row.items():
                if field == "id":
                    continue
                if value is not None and value not in genotypes:
                    yield InvalidValue(lineno, field, value, (
                        f"Invalid value '{value}'. Expected one of "
                        f"{genotypes}."))
    except rqe.MissingFileError:
        fname = cdata.get("geno")
        yield MissingFile("geno", fname, f"Missing 'geno' file '{fname}'.")


def pheno_errors(zfile: ZipFile) -> Iterator[
        Union[InvalidValue, MissingFile]]:
    """
    Check for and retrieve pheno errors.

    Yields an `InvalidValue` for every field other than "id" whose value is
    not None and does not match `_PHENO_VALUE_RE`. Rows are numbered from 1
    in the order `rqtl2.file_data` yields them. If reading the data raises
    `rqe.MissingFileError`, a single `MissingFile` is yielded instead and
    iteration ends.
    """
    cdata = rqtl2.control_data(zfile)
    try:
        # The whole loop sits inside the `try` because the error can surface
        # at any point while the rows are being consumed.
        for lineno, row in enumerate(
                rqtl2.file_data(zfile, "pheno", cdata), start=1):
            for field, value in row.items():
                if field == "id":
                    continue
                if value is not None and not _PHENO_VALUE_RE.search(value):
                    yield InvalidValue(lineno, field, value, (
                        f"Invalid value '{value}'. Expected numerical value "
                        "with at least 3 decimal places."))
    except rqe.MissingFileError:
        fname = cdata.get("pheno")
        yield MissingFile("pheno", fname, f"Missing 'pheno' file '{fname}'.")