aboutsummaryrefslogtreecommitdiff
path: root/r_qtl/r_qtl2_qc.py
blob: b45c17a195bda84c7dc47a7e277027690409ddc4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
"""Quality control checks for R/qtl2 data bundles."""
import re
from zipfile import ZipFile
from functools import reduce
from typing import Union, Sequence, Iterator, Optional

from r_qtl import errors as rqe
from r_qtl import r_qtl2 as rqtl2
from r_qtl.r_qtl2 import __FILE_TYPES__

def bundle_files_list(cdata: dict) -> tuple[str, ...]:
    """
    Collect the names of all files referenced by the control data.

    Entries are gathered, in order, from:
      * every key of `cdata` that is listed in `__FILE_TYPES__`,
      * the "file" entry under "sex", if there is one,
      * the "file" entry under "cross_info", if there is one.

    Each entry may be a single name or a sequence of names; the result is
    one flat tuple.
    """
    def __as_tuple__(entry: Union[str, Sequence[str]]) -> tuple[str, ...]:
        # A lone string is one file name, not a sequence of characters.
        return (entry,) if isinstance(entry, str) else tuple(entry)

    collected: tuple[str, ...] = tuple()
    for key in cdata:
        if key in __FILE_TYPES__:
            collected += __as_tuple__(cdata[key])

    # These sections only contribute when they carry a "file" entry.
    for section in ("sex", "cross_info"):
        details = cdata.get(section, {})
        if "file" in details:
            collected += __as_tuple__(details["file"])

    return collected

def missing_files(zfile: ZipFile) -> tuple[str, ...]:
    """
    Return the files named in the bundle's control file that are absent from
    the zip archive, in the order the control file lists them.
    """
    listed = bundle_files_list(rqtl2.control_data(zfile))
    absent = []
    for name in listed:
        try:
            zfile.getinfo(name)
        except KeyError:
            # `getinfo` raises KeyError when the archive has no such member.
            absent.append(name)

    return tuple(absent)

def validate_bundle(zfile: ZipFile):
    """
    Check that every file listed in the control file exists in the bundle.

    Raises `rqe.MissingFileError`, naming the absent files, if any file is
    missing. Returns nothing otherwise.
    """
    absent = missing_files(zfile)
    if not absent:
        return

    listing = ", ".join(absent)
    raise rqe.MissingFileError(
        "The following files do not exist in the bundle: " + listing)

def geno_errors(zfile: ZipFile) -> Iterator[
        tuple[Optional[int], Optional[str], str]]:
    """
    Lazily yield the errors found in the bundle's 'geno' data.

    Each error is a `(row number, field name, message)` tuple. Row numbers
    count the rows produced by `rqtl2.file_data`, starting at 1. A value is
    an error when it is not `None` and is not one of the keys of the
    "genotypes" entry in the control data. The "id" field is never checked.

    If reading the data raises `rqe.MissingFileError`, a single
    `(None, None, message)` tuple is yielded instead.
    """
    control = rqtl2.control_data(zfile)
    allowed = tuple(control.get("genotypes", {}))
    try:
        rows = rqtl2.file_data(zfile, "geno", control)
        for rownum, record in enumerate(rows, start=1):
            for column, cell in record.items():
                # Nothing to report for the identifier column, empty cells,
                # or cells that hold a recognised genotype code.
                if column == "id" or cell is None or cell in allowed:
                    continue
                yield (rownum, column, (
                    f"Invalid value '{cell}'. Expected one of {allowed}"))
    except rqe.MissingFileError:
        yield (None, None, "Missing 'geno' file.")

def pheno_errors(zfile: ZipFile) -> Iterator[
        tuple[Optional[int], Optional[str], str]]:
    """
    Lazily yield the errors found in the bundle's 'pheno' data.

    Each error is a `(row number, field name, message)` tuple. Row numbers
    count the rows produced by `rqtl2.file_data`, starting at 1. The "id"
    field and `None` values are never checked. Any other value is an error
    unless it matches one of the accepted patterns: digits with three or
    more digits after the decimal point, or a whole number optionally
    followed by a point and/or zeros.

    If reading the data raises `rqe.MissingFileError`, a single
    `(None, None, message)` tuple is yielded instead.

    NOTE(review): none of the patterns allows a leading sign, so negative
    values are reported as invalid -- confirm this is intended.
    """
    # Compile once; the same patterns are tried against every cell.
    accepted = (
        re.compile(r"^([0-9]+\.[0-9]{3,}|[0-9]+\.?0*)$"),
        re.compile(r"^0\.0+$"),
        re.compile("^0+$"))
    control = rqtl2.control_data(zfile)
    try:
        rows = rqtl2.file_data(zfile, "pheno", control)
        for rownum, record in enumerate(rows, start=1):
            for column, cell in record.items():
                if column == "id" or cell is None:
                    continue
                if any(pattern.search(cell) for pattern in accepted):
                    continue
                yield (rownum, column, (
                    f"Invalid value '{cell}'. Expected numerical value "
                    "with at least 3 decimal places."))
    except rqe.MissingFileError:
        yield (None, None, "Missing 'pheno' file.")