about | summary | refs | log | tree | commit | diff
path: root/r_qtl/r_qtl2_qc.py
blob: f62f142488ea5355de9780a690cd138be4b65d67 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
"""Quality control checks for R/qtl2 data bundles."""
import re
from zipfile import ZipFile
from functools import reduce
from typing import Union, Iterator, Optional, Callable

from r_qtl import errors as rqe
from r_qtl import r_qtl2 as rqtl2
from r_qtl.r_qtl2 import __FILE_TYPES__
from r_qtl.fileerrors import MissingFile

from quality_control.errors import InvalidValue

def bundle_files_list(cdata: dict) -> tuple[tuple[str, str], ...]:
    """
    List every file that the control data refers to.

    Each entry is a `(control-key, filename)` pair. Keys of `cdata` that are
    in `__FILE_TYPES__` come first, in the order they appear in `cdata`,
    followed by any "sex.file" and then any "cross_info.file" entries. A value
    that is a single string gives one pair; any other value gives one pair per
    item it contains.
    """
    entries: list[tuple[str, str]] = []
    for key, listed in cdata.items():
        if key not in __FILE_TYPES__:
            continue
        if isinstance(listed, str):
            entries.append((key, str(listed)))
        else:
            entries.extend((key, str(fname)) for fname in listed)

    # These two files sit one level down, under a "file" member.
    for section, label in (("sex", "sex.file"),
                           ("cross_info", "cross_info.file")):
        nested = cdata.get(section, {})
        if "file" not in nested:
            continue
        nestedfile = nested["file"]
        if isinstance(nestedfile, str):
            entries.append((label, nestedfile))
        else:
            entries.extend((label, fname) for fname in nestedfile)

    return tuple(entries)

def missing_files(zfile: ZipFile) -> tuple[tuple[str, str], ...]:
    """
    Find the files that the control file lists but the bundle does not contain.

    Returns the `(control-key, filename)` pairs, as given by
    `bundle_files_list`, for which `zfile` has no member called `filename`.
    """
    def __in_bundle__(member: str) -> bool:
        # `ZipFile.getinfo` raises `KeyError` for names not in the archive.
        try:
            zfile.getinfo(member)
        except KeyError:
            return False
        return True

    listed = bundle_files_list(rqtl2.control_data(zfile))
    return tuple(entry for entry in listed if not __in_bundle__(entry[1]))

def validate_bundle(zfile: ZipFile):
    """
    Check that the R/qtl2 bundle contains every file its control file lists.

    Raises `rqe.MissingFileError`, naming the absent files, if
    `missing_files` reports any. Returns `None` otherwise.
    """
    absent = missing_files(zfile)
    if not absent:
        return
    names = ", ".join(fname for _key, fname in absent)
    raise rqe.MissingFileError(
        "The following files do not exist in the bundle: " + names)

def make_genocode_checker(genocode: dict) -> Callable[[int, str, str], Optional[InvalidValue]]:
    """
    Build a field checker that only accepts the keys of `genocode`.

    The returned function takes `(lineno, field, value)`. It gives back `None`
    when `value` is one of the keys, and an `InvalidValue` describing the
    problem when it is not.
    """
    def __check_genotype__(
            lineno: int, field: str, value: str) -> Optional[InvalidValue]:
        # The keys are read on every call, so later changes to `genocode` are
        # picked up.
        allowed = tuple(genocode.keys())
        if value in allowed:
            return None
        message = f"Invalid value '{value}'. Expected one of {allowed}."
        return InvalidValue(lineno, field, value, message)

    return __check_genotype__

def geno_errors(zfile: ZipFile) -> Iterator[Union[InvalidValue, MissingFile]]:
    """
    Lazily produce the errors found in the bundle's files of type "geno".

    Values are checked against the keys of the control data's "genotypes"
    member, or against no keys at all if that member is absent. `None`
    results from `retrieve_errors` are dropped.
    """
    genocode = rqtl2.control_data(zfile).get("genotypes", {})
    checkers = (make_genocode_checker(genocode),)
    results = retrieve_errors(zfile, "geno", checkers)
    return (result for result in results if result is not None)

def pheno_errors(zfile: ZipFile) -> Iterator[Union[InvalidValue, MissingFile]]:
    """
    Lazily produce the errors found in the bundle's files of type "pheno".

    A value is accepted if it is made up of digits, a dot, and at least three
    more digits, or if it is made up of digits optionally followed by a dot
    and/or zeroes. Anything else is reported as an `InvalidValue`. `None`
    results from `retrieve_errors` are dropped.
    """
    accepted_forms = (
        r"^([0-9]+\.[0-9]{3,}|[0-9]+\.?0*)$",
        r"^0\.0+$",
        "^0+$")

    def __check_precision__(
            lineno: int, field: str, value: str) -> Optional[InvalidValue]:
        if any(re.search(form, value) for form in accepted_forms):
            return None
        return InvalidValue(lineno, field, value, (
            f"Invalid value '{value}'. Expected numerical value "
            "with at least 3 decimal places."))

    results = retrieve_errors(zfile, "pheno", (__check_precision__,))
    return (result for result in results if result is not None)

def phenose_errors(zfile: ZipFile) -> Iterator[Union[InvalidValue, MissingFile]]:
    """
    Lazily produce the errors found in the bundle's files of type "phenose".

    A value is accepted if it is made up of digits, a dot, and at least six
    more digits, or if it is made up of digits optionally followed by a dot
    and/or zeroes. Anything else is reported as an `InvalidValue`. `None`
    results from `retrieve_errors` are dropped.
    """
    accepted_forms = (
        r"^([0-9]+\.[0-9]{6,}|[0-9]+\.?0*)$",
        r"^0\.0+$",
        "^0+$")

    def __check_precision__(
            lineno: int, field: str, value: str) -> Optional[InvalidValue]:
        if any(re.search(form, value) for form in accepted_forms):
            return None
        return InvalidValue(lineno, field, value, (
            f"Invalid value '{value}'. Expected numerical value "
            "with at least 6 decimal places."))

    results = retrieve_errors(zfile, "phenose", (__check_precision__,))
    return (result for result in results if result is not None)

def retrieve_errors(zfile: ZipFile, filetype: str, checkers: tuple[Callable, ...]) -> Iterator[
        Union[InvalidValue, MissingFile]]:
    """
    Check for and retrieve errors from files of type `filetype`.

    Every value that is not `None`, in every field other than "id", of every
    row given by `rqtl2.file_data`, is passed to each of the `checkers` as
    `checker(lineno, field, value)`. `lineno` is the 1-based position of the
    row in the sequence of rows that `rqtl2.file_data` produces.

    Yields each error a checker returns; a checker signals "no error" by
    returning `None`, and such results are not yielded. If
    `rqe.MissingFileError` is raised while reading, a single `MissingFile` is
    yielded and iteration ends.

    `filetype` must be one of `__FILE_TYPES__`. Since this is a generator, that
    assertion only runs once iteration begins.
    """
    assert filetype in __FILE_TYPES__, f"Invalid file type {filetype}."
    cdata = rqtl2.control_data(zfile)
    try:
        for lineno, row in enumerate(
                rqtl2.file_data(zfile, filetype, cdata), start=1):
            for field, value in row.items():
                if field == "id" or value is None:
                    continue
                for checker in checkers:
                    error = checker(lineno, field, value)
                    # Keep the output in line with the declared return type:
                    # only actual errors are passed on.
                    if error is not None:
                        yield error
    except rqe.MissingFileError:
        fname = cdata.get(filetype)
        yield MissingFile(filetype, fname, f"Missing '{filetype}' file '{fname}'.")