"""Quality control checks for R/qtl2 data bundles."""
from pathlib import Path
from functools import reduce, partial
from zipfile import ZipFile, is_zipfile
from typing import Union, Iterator, Optional, Callable

from r_qtl import r_qtl2 as rqtl2
from r_qtl import exceptions as rqe
from r_qtl.r_qtl2 import FILE_TYPES
from r_qtl.fileerrors import MissingFile
from r_qtl.exceptions import InvalidFormat

from quality_control.errors import InvalidValue
from quality_control.checks import decimal_points_error

def bundle_files_list(cdata: dict) -> tuple[tuple[str, str], ...]:
    """
    Retrieve files listed in control file.

    Returns a tuple of `(key, filename)` pairs, where `key` is the control
    data member the file was listed under. Top-level members are those whose
    key is in `FILE_TYPES`; files nested under the "file" entry of the "sex"
    and "cross_info" members are reported with the keys "sex.file" and
    "cross_info.file" respectively. A member may name a single file (a
    string) or a collection of files.
    """
    listed: list[tuple[str, str]] = []

    # Top-level members, in the order they appear in the control data.
    for member in cdata:
        if member not in FILE_TYPES:
            continue
        names = cdata[member]
        if isinstance(names, str):
            listed.append((member, str(names)))
        else:
            listed.extend((member, str(name)) for name in names)

    # Members that keep their file(s) under a nested "file" entry.
    for section, label in (("sex", "sex.file"),
                           ("cross_info", "cross_info.file")):
        if "file" not in cdata.get(section, {}):
            continue
        nested = cdata[section]["file"]
        if isinstance(nested, str):
            listed.append((label, nested))
        else:
            listed.extend((label, name) for name in nested)

    return tuple(listed)


def __missing_from_zipfile__(
        zfile: ZipFile, cdata: dict) -> tuple[tuple[str, str], ...]:
    """
    Check for missing files from a still-compressed zip file.

    Returns the `(key, filename)` pairs from `bundle_files_list(cdata)` whose
    filename is not a member of `zfile`.
    """
    absent = []
    for entry in bundle_files_list(cdata):
        _cfkey, membername = entry
        try:
            # `getinfo` raises KeyError when the archive has no such member.
            zfile.getinfo(membername)
        except KeyError:
            absent.append(entry)
    return tuple(absent)


def __missing_from_dirpath__(
        dirpath: Path, cdata: dict) -> tuple[tuple[str, str], ...]:
    """
    Check for missing files from an extracted bundle.

    Returns the `(key, filename)` pairs from `bundle_files_list(cdata)` whose
    filename does not match the name of any immediate entry of `dirpath`.
    """
    # Only the direct children of the directory are considered.
    present = tuple(entry.name for entry in dirpath.iterdir())
    absent = []
    for listed in bundle_files_list(cdata):
        if listed[1] in present:
            continue
        absent.append(listed)
    return tuple(absent)


def missing_files(bundlesrc: Union[Path, ZipFile]) -> tuple[tuple[str, str], ...]:
    """
    Retrieve a list of files listed in the control file that do not exist in the
    bundle.

    Parameters
    ----------
    bundlesrc: Path object or ZipFile object: This is the bundle under check.
        A Path may point either to a zip file or to a directory containing
        the extracted bundle.

    Returns
    -------
    A tuple of `(control-file key, file name)` pairs, one for each file listed
    in the control file that does not exist in the bundle.

    Raises
    ------
    r_qtl.exceptions.InvalidFormat
        If `bundlesrc` is neither a ZipFile object, a path to a zip file, nor
        a path to a directory.
    """
    cdata = rqtl2.control_data(bundlesrc)
    if isinstance(bundlesrc, ZipFile):
        # The caller owns this archive; it is left open for them.
        return __missing_from_zipfile__(bundlesrc, cdata)
    if isinstance(bundlesrc, Path):
        if is_zipfile(bundlesrc):
            # This archive is opened here, so it must be closed here too. The
            # helper returns a fully-built tuple, so nothing reads from the
            # archive after the `with` block exits.
            with ZipFile(bundlesrc) as zfile:
                return __missing_from_zipfile__(zfile, cdata)
        if bundlesrc.is_dir():
            return __missing_from_dirpath__(bundlesrc, cdata)
    raise InvalidFormat(
        "Expects either a zipfile.ZipFile object or a pathlib.Path object "
        "pointing to a directory containing the R/qtl2 bundle.")


def validate_bundle(zfile: Union[Path, ZipFile]):
    """
    Ensure the R/qtl2 bundle is valid.

    Raises `r_qtl.exceptions.MissingFileException`, naming the absent files,
    when any file listed in the control file does not exist in the bundle.
    Returns `None` otherwise.
    """
    absent = missing_files(zfile)
    if not absent:
        return
    names = ", ".join(entry[1] for entry in absent)
    raise rqe.MissingFileException(
        "The following files do not exist in the bundle: " + names)

def make_genocode_checker(genocode: dict, filename: str) -> Callable[
        [int, str, str], Optional[InvalidValue]]:
    """
    Make a checker from the genotypes in the control data.

    Parameters
    ----------
    genocode: The genotype codes from the control data. Only its keys are
        used: they are the values accepted by the checker.
    filename: The name recorded in any `InvalidValue` the checker returns.

    Returns
    -------
    A function taking `lineno`, `field` and `value` that returns `None` when
    `value` is one of the keys of `genocode`, and an `InvalidValue` otherwise.
    """
    # Built once here, not on every call: the checker runs for each value in
    # a file while the accepted genotypes stay the same.
    genotypes = tuple(genocode.keys())

    def __checker__(lineno: int, field: str, value: str) -> Optional[InvalidValue]:
        if value not in genotypes:
            return InvalidValue(filename, lineno, field, value, (
                f"Invalid value '{value}'. Expected one of {genotypes}."))
        return None
    return __checker__

def geno_errors(zfile: ZipFile) -> Iterator[Union[InvalidValue, MissingFile]]:
    """
    Check for and retrieve geno errors.

    Values in the bundle's "geno" file(s) are checked against the keys of the
    control data's "genotypes" member (an empty mapping if absent). Results
    that are `None` are filtered out.
    """
    genocode = rqtl2.control_data(zfile).get("genotypes", {})
    checkers = (make_genocode_checker(genocode, "geno"),)
    results = retrieve_errors(zfile, "geno", checkers)
    return (result for result in results if result is not None)

def pheno_errors(zfile: ZipFile) -> Iterator[Union[InvalidValue, MissingFile]]:
    """
    Check for and retrieve pheno errors.

    Values in the bundle's "pheno" file(s) are passed to
    `decimal_points_error` with `mini=3` and `filename="pheno"`. Results that
    are `None` are filtered out.
    """
    check_decimals = partial(decimal_points_error, mini=3, filename="pheno")
    results = retrieve_errors(zfile, "pheno", (check_decimals,))
    return (result for result in results if result is not None)

def phenose_errors(zfile: ZipFile) -> Iterator[Union[InvalidValue, MissingFile]]:
    """
    Check for and retrieve phenose errors.

    Values in the bundle's "phenose" file(s) are passed to
    `decimal_points_error` with `mini=6` and `filename="phenose"`. Results
    that are `None` are filtered out.
    """
    return (
        error for error in retrieve_errors(
            zfile, "phenose", (partial(
                # `filename` is supplied here, as it is for the "pheno"
                # checker: `retrieve_errors` only passes `lineno`, `field`
                # and `value` to its checkers.
                decimal_points_error, mini=6, filename="phenose"),))
        if error is not None)

def retrieve_errors(zfile: ZipFile, filetype: str, checkers: tuple[Callable]) -> Iterator[
        Union[InvalidValue, MissingFile]]:
    """
    Check for and retrieve errors from files of type `filetype`.

    Each checker in `checkers` is called with the keyword arguments `lineno`,
    `field` and `value` for every value that is not `None`, skipping the "id"
    field. Line numbers count the rows produced by `rqtl2.file_data`, starting
    at 1. Whatever a checker returns is yielded as-is, including `None`, so
    callers are expected to filter those out.

    If reading the data raises `r_qtl.exceptions.MissingFileException`, a
    single `MissingFile` is yielded and iteration ends.
    """
    assert filetype in FILE_TYPES, f"Invalid file type {filetype}."
    cdata = rqtl2.control_data(zfile)
    try:
        rows = rqtl2.file_data(zfile, filetype, cdata)
        for lineno, row in enumerate(rows, start=1):
            for field, value in row.items():
                # The "id" field and empty values are never checked.
                if field == "id" or value is None:
                    continue
                for checker in checkers:
                    yield checker(lineno=lineno, field=field, value=value)
    except rqe.MissingFileException:
        fname = cdata.get(filetype)
        yield MissingFile(filetype, fname, f"Missing '{filetype}' file '{fname}'.")