"""Quality control checks for R/qtl2 data bundles."""
from zipfile import ZipFile
from functools import reduce
from collections.abc import Mapping
from typing import Union, Sequence

from r_qtl import errors as rqe
from r_qtl import r_qtl2 as rqtl2
from r_qtl.r_qtl2 import __FILE_TYPES__


def _as_tuple(value: Union[str, Sequence[str]]) -> tuple[str, ...]:
    """Normalise a control-file entry into a tuple of file names.

    A lone string is treated as a single file name (rather than being split
    into its characters); any other sequence is converted item by item.
    """
    if isinstance(value, str):
        return (value,)
    return tuple(value)


def _section_files(cdata: dict, section: str) -> tuple[str, ...]:
    """Return the file(s) named under ``cdata[section]["file"]``, if any.

    An empty tuple is returned when the section is absent, is not a mapping,
    or has no "file" entry.
    """
    details = cdata.get(section)
    # Only a mapping can carry a "file" entry. Checking the type first avoids
    # a TypeError when the section is null, and a bogus substring match (then
    # a crash on indexing) when the section is a plain string that happens to
    # contain the text "file".
    if not isinstance(details, Mapping) or "file" not in details:
        return ()
    return _as_tuple(details["file"])


def bundle_files_list(zfile, cdata: dict) -> tuple[str, ...]:
    """Retrieve files listed in control file.

    The result contains, in order:

    1. the value(s) of every key of `cdata` that is in `__FILE_TYPES__`,
       following the iteration order of `cdata`;
    2. the "file" entry of the "sex" section, when present;
    3. the "file" entry of the "cross_info" section, when present.

    Each value may be a single file name or a sequence of file names.

    Args:
        zfile: Not used by this function; kept so that existing callers that
            pass the bundle keep working.
        cdata: The control data to inspect.

    Returns:
        A flat tuple of the file names found.
    """
    listed = reduce(
        lambda acc, key: acc + _as_tuple(cdata[key]),
        (key for key in cdata if key in __FILE_TYPES__),
        tuple())
    return (listed
            + _section_files(cdata, "sex")
            + _section_files(cdata, "cross_info"))


def missing_files(zfile: ZipFile) -> tuple[str, ...]:
    """
    Retrieve a list of files listed in the control file that do not exist in
    the bundle.

    Args:
        zfile: The bundle to check. Its control data is obtained with
            `rqtl2.control_data`.

    Returns:
        A tuple of the listed file names for which `zfile.getinfo` raises
        `KeyError`, in the order `bundle_files_list` reports them.
    """
    def __missing_p__(thefile: str) -> bool:
        # `ZipFile.getinfo` raises KeyError for a name that is not a member of
        # the archive; keep the `try` limited to that single call.
        try:
            zfile.getinfo(thefile)
        except KeyError:
            return True
        return False

    listed = bundle_files_list(zfile, rqtl2.control_data(zfile))
    return tuple(filter(__missing_p__, listed))


def validate_bundle(zfile: ZipFile):
    """Ensure the R/qtl2 bundle is valid.

    Args:
        zfile: The bundle to validate.

    Raises:
        rqe.MissingFileError: If any file reported by `missing_files` is
            absent from the bundle; the message names every such file.
    """
    missing = missing_files(zfile)
    if missing:
        raise rqe.MissingFileError(
            "The following files do not exist in the bundle: " +
            ", ".join(missing))