From 0b6d924ad8615c2a6c9739c77f5001c96ea3553d Mon Sep 17 00:00:00 2001
From: Frederick Muriuki Muriithi
Date: Mon, 12 Aug 2024 16:26:28 -0500
Subject: Rewrite the QC code for R/qtl2

---
 scripts/qc_on_rqtl2_bundle2.py | 307 +++++++++++++++++++++++++++++++++++++++++
 1 file changed, 307 insertions(+)
 create mode 100644 scripts/qc_on_rqtl2_bundle2.py

(limited to 'scripts')

diff --git a/scripts/qc_on_rqtl2_bundle2.py b/scripts/qc_on_rqtl2_bundle2.py
new file mode 100644
index 0000000..f39689e
--- /dev/null
+++ b/scripts/qc_on_rqtl2_bundle2.py
@@ -0,0 +1,307 @@
+"""Run Quality Control checks on R/qtl2 bundle."""
+import os
+import sys
+import json
+from time import sleep
+from pathlib import Path
+from zipfile import ZipFile
+from argparse import Namespace
+import multiprocessing as mproc
+from functools import reduce, partial
+from logging import Logger, getLogger, StreamHandler
+from typing import Union, Sequence, Callable, Iterator
+
+import MySQLdb as mdb
+from redis import Redis
+
+from quality_control.errors import InvalidValue
+from quality_control.checks import decimal_points_error
+
+from uploader import jobs
+from uploader.db_utils import database_connection
+from uploader.check_connections import check_db, check_redis
+
+from r_qtl import r_qtl2 as rqtl2
+from r_qtl import r_qtl2_qc as rqc
+from r_qtl import exceptions as rqe
+from r_qtl import fileerrors as rqfe
+
+from scripts.process_rqtl2_bundle import parse_job
+from scripts.redis_logger import setup_redis_logger
+from scripts.cli_parser import init_cli_parser, add_global_data_arguments
+
+
+def build_line_splitter(cdata: dict) -> Callable[[str], tuple[Union[str, None], ...]]:
+    """Build and return a function to use to split data in the files.
+
+    Parameters
+    ----------
+    cdata: A dict holding the control information included with the R/qtl2
+        bundle.
+
+    Returns
+    -------
+    A function that takes a string and return a tuple of strings.
+    """
+    separator = cdata["sep"]
+    na_strings = cdata["na.strings"]
+    def __splitter__(line: str) -> tuple[Union[str, None], ...]:
+        return tuple(
+            item if item not in na_strings else None
+            for item in
+            (field.strip() for field in line.strip().split(separator)))
+    return __splitter__
+
+
+def build_line_joiner(cdata: dict) -> Callable[[tuple[Union[str, None], ...]], str]:
+    """Build and return a function to use to split data in the files.
+
+    Parameters
+    ----------
+    cdata: A dict holding the control information included with the R/qtl2
+        bundle.
+
+    Returns
+    -------
+    A function that takes a string and return a tuple of strings.
+    """
+    separator = cdata["sep"]
+    na_strings = cdata["na.strings"]
+    def __joiner__(row: tuple[Union[str, None], ...]) -> str:
+        return separator.join(
+            (na_strings[0] if item is None else item)
+            for item in row)
+    return __joiner__
+
+
+def check_for_missing_files(
+        rconn: Redis, fqjobid: str, extractpath: Path, logger: Logger) -> bool:
+    """Check that all files listed in the control file do actually exist."""
+    logger.info("Checking for missing files.")
+    missing = rqc.missing_files(extractpath)
+    # add_to_errors(rconn, fqjobid, "errors-generic", tuple(
+    #     rqfe.MissingFile(
+    #         mfile[0], mfile[1], (
+    #             f"File '{mfile[1]}' is listed in the control file under "
+    #             f"the '{mfile[0]}' key, but it does not actually exist in "
+    #             "the bundle."))
+    #     for mfile in missing))
+    if len(missing) > 0:
+        logger.error(f"Missing files in the bundle!")
+        return True
+    return False
+
+
+def open_file(file_: Path) -> Iterator:
+    """Open file and return one line at a time."""
+    with open(file_, "r", encoding="utf8") as infile:
+        for line in infile:
+            yield line
+
+
+def check_markers(filename: str, row: tuple[str, ...]) -> tuple[rqfe.InvalidValue]:
+    """Check that the markers are okay"""
+    errors = tuple()
+    counts = {}
+    for marker in row:
+        counts = {**counts, marker: counts.get(marker, 0) + 1}
+        if marker is None or marker == "":
+            errors = errors + (rqfe.InvalidValue(
+                filename,
+                "markers"
+                "-",
+                marker,
+                "A marker MUST be a valid value."),)
+
+    return errors + tuple(
+        rqfe.InvalidValue(filename,
+                          "markers",
+                          key,
+                          f"Marker '{key}' was repeated {value} times")
+        for key,value in counts.items() if value > 1)
+
+
+def check_geno_line(
+        filename: str,
+        headers: tuple[str, ...],
+        row: tuple[Union[str, None]],
+        cdata: dict
+) -> tuple[rqfe.InvalidValue]:
+    """Check that the geno line is correct."""
+    errors = tuple()
+    # Verify that line has same number of columns as headers
+    if len(headers) != len(row):
+        errors = errors + (rqfe.InvalidValue(
+            filename,
+            headers[0],
+            row[0],
+            row[0],
+            "Every line MUST have the same number of columns."),)
+
+    # First column is the individuals/cases/samples
+    if not bool(row[0]):
+        errors = errors + (rqfe.InvalidValue(
+            filename,
+            headers[0],
+            row[0],
+            row[0],
+            "The sample/case MUST be a valid value."),)
+
+    def __process_value__(val):
+        if val in cdata["na.strings"]:
+            return None
+        if val in cdata["alleles"]:
+            return cdata["genotypes"][val]
+
+    genocode = cdata.get("genotypes", {})
+    for coltitle, cellvalue in zip(headers[1:],row[1:]):
+        if (
+                bool(genocode) and
+                cellvalue is not None and
+                cellvalue not in genocode.keys()
+        ):
+            errors = errors + (rqfe.InvalidValue(
+                filename, row[0], coltitle, cellvalue,
+                f"Value '{cellvalue}' is invalid. Expected one of "
+                f"'{', '.join(genocode.keys())}'."))
+
+    return errors
+
+
+def file_errors_and_details(
+        file_: Path,
+        filetype: str,
+        cdata: dict,
+        linesplitterfn: Callable,
+        linejoinerfn: Callable,
+        headercheckers: tuple[Callable, ...],
+        bodycheckers: tuple[Callable, ...]
+) -> dict:
+    """Compute errors, and other file metadata."""
+    errors = tuple()
+    if cdata[f"{filetype}_transposed"]:
+        rqtl2.transpose_csv_with_rename(file_, linesplitterfn, linejoinerfn)
+
+    for lineno, line in enumerate(open_file(file_), start=1):
+        row = linesplitterfn(line)
+        if lineno == 1:
+            headers = tuple(row)
+            errors = errors + reduce(
+                lambda errs, fnct: errs + fnct(file_.name, row[1:]),
+                headercheckers,
+                tuple())
+            continue
+
+        errors = errors + reduce(
+            lambda errs, fnct: errs + fnct(file_.name, headers, row, cdata),
+            bodycheckers,
+            tuple())
+            
+    return {
+        "filename": file_.name,
+        "filesize": os.stat(file_).st_size,
+        "linecount": lineno,
+        # TOD0: Maybe put the errors in a redis list and return the name of the
+        #     redis list rather than the errors. Maybe also replace the errors
+        #     key with a flag e.g. "errors-found": True/False
+        "errors": errors
+    }
+
+
+def check_for_geno_errors(
+        extractdir: Path,
+        cdata: dict,
+        linesplitterfn: Callable[[str], tuple[Union[str, None]]],
+        linejoinerfn: Callable[[tuple[Union[str, None], ...]], str],
+        logger: Logger) -> bool:
+    """Check for errors in genotype files."""
+    if "geno" in cdata:
+        genofiles = tuple(extractdir.joinpath(fname) for fname in cdata["geno"])
+        ## Run code below in multiprocessing once you verify it works.
+        gerrs = tuple(file_errors_and_details(
+            file_,
+            filetype="geno",
+            cdata=cdata,
+            linesplitterfn=linesplitterfn,
+            linejoinerfn=linejoinerfn,
+            headercheckers=(check_markers,),
+            bodycheckers=(check_geno_line,)) for file_ in genofiles)
+        # TOD0: Add the errors to redis
+        if len(gerrs) > 0:
+            logger.error("At least one of the 'geno' files has (an) error(s).")
+            return True
+        logger.info("No error(s) found in any of the 'geno' files.")
+    return False
+
+
+# def check_for_pheno_errors(...):
+#     """Check for errors in phenotype files."""
+#     pass
+
+
+# def check_for_phenose_errors(...):
+#     """Check for errors in phenotype, standard-error files."""
+#     pass
+
+
+# def check_for_phenocovar_errors(...):
+#     """Check for errors in phenotype-covariates files."""
+#     pass
+
+
+def run_qc(rconn: Redis, args: Namespace, logger: Logger) -> int:
+    """Run quality control checks on R/qtl2 bundles."""
+    fqjobid = jobs.job_key(args.redisprefix, args.jobid)
+    thejob = parse_job(rconn, args.redisprefix, args.jobid)
+    print(f"THE JOB =================> {thejob}")
+    jobmeta = thejob["job-metadata"]
+    inpath = Path(jobmeta["rqtl2-bundle-file"])
+    extractdir = inpath.parent.joinpath(f"{inpath.name}__extraction_dir")
+    with ZipFile(inpath, "r") as zfile:
+        rqtl2.extract(zfile, extractdir)
+
+    ### BEGIN: The quality control checks ###
+    cdata = rqtl2.control_data(extractdir)
+    splitter = build_line_splitter(cdata)
+    joiner = build_line_joiner(cdata)
+    check_for_missing_files(rconn, fqjobid, extractdir, logger)
+    check_for_geno_errors(extractdir, cdata, splitter, joiner, logger)
+    # check_for_pheno_errors(...)
+    # check_for_phenose_errors(...)
+    # check_for_phenocovar_errors(...)
+    ### END: The quality control checks ###
+
+    def __fetch_errors__(rkey: str) -> tuple:
+        return tuple(json.loads(rconn.hget(fqjobid, rkey) or "[]"))
+
+    return (1 if any((
+        bool(__fetch_errors__(key))
+        for key in
+        ("errors-geno", "errors-pheno", "errors-phenos", "errors-phenocovar")))
+            else 0)
+
+
+if __name__ == "__main__":
+    def main():
+        """Enter R/qtl2 bundle QC runner."""
+        args = add_global_data_arguments(init_cli_parser(
+            "qc-on-rqtl2-bundle", "Run QC on R/qtl2 bundle.")).parse_args()
+        check_redis(args.redisuri)
+        check_db(args.databaseuri)
+
+        logger = getLogger("qc-on-rqtl2-bundle")
+        logger.addHandler(StreamHandler(stream=sys.stderr))
+        logger.setLevel("DEBUG")
+
+        fqjobid = jobs.job_key(args.redisprefix, args.jobid)
+        with Redis.from_url(args.redisuri, decode_responses=True) as rconn:
+            logger.addHandler(setup_redis_logger(
+                rconn, fqjobid, f"{fqjobid}:log-messages",
+                args.redisexpiry))
+
+            exitcode = run_qc(rconn, args, logger)
+            rconn.hset(
+                jobs.job_key(args.redisprefix, args.jobid), "exitcode", exitcode)
+            return exitcode
+
+    sys.exit(main())
-- 
cgit v1.2.3