about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--scripts/rqtl2/phenotypes_qc.py173
1 files changed, 117 insertions, 56 deletions
diff --git a/scripts/rqtl2/phenotypes_qc.py b/scripts/rqtl2/phenotypes_qc.py
index 3448790..4c02578 100644
--- a/scripts/rqtl2/phenotypes_qc.py
+++ b/scripts/rqtl2/phenotypes_qc.py
@@ -1,31 +1,40 @@
 """Run quality control on phenotypes-specific files in the bundle."""
 import sys
 import shutil
+import logging
 import tempfile
+import contextlib
 from pathlib import Path
+from logging import Logger
 from zipfile import ZipFile
+from argparse import Namespace
 import multiprocessing as mproc
 from functools import reduce, partial
-from typing import Callable, Optional, Sequence
-from logging import Logger, getLogger, StreamHandler
+from typing import Union, Iterator, Callable, Optional, Sequence
 
 import MySQLdb as mdb
+from redis import Redis
 
 from r_qtl import r_qtl2 as rqtl2
 from r_qtl import r_qtl2_qc as rqc
 from r_qtl import exceptions as rqe
 from r_qtl.fileerrors import InvalidValue
 
+from functional_tools import chain
+
 from quality_control.checks import decimal_places_pattern
 
 from uploader.files import sha256_digest_over_file
 from uploader.samples.models import samples_by_species_and_population
 
 from scripts.rqtl2.entry import build_main
+from scripts.redis_logger import RedisMessageListHandler
 from scripts.rqtl2.cli_parser import add_bundle_argument
 from scripts.cli_parser import init_cli_parser, add_global_data_arguments
 from scripts.rqtl2.bundleutils import build_line_joiner, build_line_splitter
 
+__MODULE__ = "scripts.rqtl2.phenotypes_qc"
+
 def validate(phenobundle: Path, logger: Logger) -> dict:
     """Check that the bundle is generally valid"""
     try:
@@ -121,47 +130,78 @@ def undo_transpose(filetype: str, cdata: dict, extractiondir):
                 build_line_joiner(cdata))
 
 
-def qc_phenocovar_file(phenocovarfile: Path, separator: str, comment_char: str):
-    """Check that `phenocovar` files are structured correctly."""
-    _csvfile = rqtl2.read_csv_file(phenocovarfile, separator, comment_char)
-    _headings = tuple(heading.lower() for heading in next(_csvfile))
-    _errors = tuple()
-    for heading in ("description", "units"):
-        if heading not in _headings:
-            _errors = (InvalidValue(
-                phenocovarfile.name,
-                "header row",
-                "-",
-                "-",
-                (f"File {phenocovarfile.name} is missing the {heading} heading "
-                 "in the header line.")),)
+@contextlib.contextmanager
+def redis_logger(
+        redisuri: str, loggername: str, filename: str, fqkey: str
+) -> Iterator[logging.Logger]:
+    """Build a Redis message-list logger."""
+    rconn = Redis.from_url(redisuri, decode_responses=True)
+    logger = logging.getLogger(loggername)
+    logger.propagate = False
+    handler = RedisMessageListHandler(
+        rconn,
+        fullyqualifiedkey(fqkey, filename))#type: ignore[arg-type]
+    handler.setFormatter(logging.getLogger().handlers[0].formatter)
+    logger.addHandler(handler)
+    try:
+        yield logger
+    finally:
+        rconn.close()
 
-    def collect_errors(errors_and_linecount, line):
-        _errs, _lc = errors_and_linecount
-        if len(line) != len(_headings):
-            _errs = _errs + (InvalidValue(
-                phenocovarfile.name,
-                line[0],
-                "-",
-                "-",
-                (f"Record {_lc} in file {phenocovarfile.name} has a different "
-                    "number of columns than the number of headings")),)
-        _line = dict(zip(_headings, line))
-        if not bool(_line["description"]):
-            _errs = _errs + (
-                InvalidValue(phenocovarfile.name,
-                             _line[_headings[0]],
-                             "description",
-                             _line["description"],
-                             "The description is not provided!"),)
 
-        return _errs, _lc+1
+def qc_phenocovar_file(
+        filename: Path,
+        redisuri,
+        fqkey: str,
+        separator: str,
+        comment_char: str):
+    """Check that `phenocovar` files are structured correctly."""
+    with redis_logger(
+            redisuri,
+            f"{__MODULE__}.qc_phenocovar_file",
+            filename.name,
+            fqkey) as logger:
+        logger.info("Running QC on file: %s", filename.name)
+        _csvfile = rqtl2.read_csv_file(filename, separator, comment_char)
+        _headings = tuple(heading.lower() for heading in next(_csvfile))
+        _errors: tuple[InvalidValue, ...] = tuple()
+        for heading in ("description", "units"):
+            if heading not in _headings:
+                _errors = (InvalidValue(
+                    filename.name,
+                    "header row",
+                    "-",
+                    "-",
+                    (f"File {filename.name} is missing the {heading} heading "
+                     "in the header line.")),)
+
+        def collect_errors(errors_and_linecount, line):
+            _errs, _lc = errors_and_linecount
+            logger.info("Testing record '%s'", line[0])
+            if len(line) != len(_headings):
+                _errs = _errs + (InvalidValue(
+                    filename.name,
+                    line[0],
+                    "-",
+                    "-",
+                    (f"Record {_lc} in file {filename.name} has a different "
+                        "number of columns than the number of headings")),)
+            _line = dict(zip(_headings, line))
+            if not bool(_line["description"]):
+                _errs = _errs + (
+                    InvalidValue(filename.name,
+                                 _line[_headings[0]],
+                                 "description",
+                                 _line["description"],
+                                 "The description is not provided!"),)
+
+            return _errs, _lc+1
 
-    return {
-        phenocovarfile.name: dict(zip(
-            ("errors", "linecount"),
-            reduce(collect_errors, _csvfile, (_errors, 1))))
-    }
+        return {
+            filename.name: dict(zip(
+                ("errors", "linecount"),
+                reduce(collect_errors, _csvfile, (_errors, 1))))
+        }
 
 
 def merge_dicts(*dicts):
@@ -211,8 +251,9 @@ def qc_pheno_file(# pylint: disable=[too-many-arguments]
 ):
     """Run QC/QA on a `pheno` file."""
     _csvfile = rqtl2.read_csv_file(filepath, separator, comment_char)
-    _headings = tuple(heading.lower() for heading in next(_csvfile))
-    _errors = tuple()
+    _headings: tuple[str, ...] = tuple(
+        heading.lower() for heading in next(_csvfile))
+    _errors: tuple[InvalidValue, ...] = tuple()
 
     _absent = tuple(pheno for pheno in _headings[1:] if pheno not in phenonames)
     if len(_absent) > 0:
@@ -258,20 +299,28 @@ def phenotype_names(filepath: Path,
                     separator: str,
                     comment_char: str) -> tuple[str, ...]:
     """Read phenotype names from `phenocovar` file."""
-    return reduce(lambda tpl, line: tpl + (line[0],),
+    return reduce(lambda tpl, line: tpl + (line[0],),#type: ignore[arg-type, return-value]
                   rqtl2.read_csv_file(filepath, separator, comment_char),
                   tuple())[1:]
 
+def fullyqualifiedkey(
+        prefix: str,
+        rest: Optional[str] = None
+) -> Union[Callable[[str], str], str]:
+    """Compute fully qualified Redis key."""
+    if not bool(rest):
+        return lambda _rest: f"{prefix}:{_rest}"
+    return f"{prefix}:{rest}"
 
-def run_qc(# pylint: disable=[too-many-arguments]
+def run_qc(# pylint: disable=[too-many-locals]
         dbconn: mdb.Connection,
-        phenobundle: Path,
-        workingdir: Path,
-        speciesid: int,
-        populationid: int,
+        args: Namespace,
         logger: Logger
 ) -> int:
     """Run quality control checks on the bundle."""
+    (phenobundle, workingdir, speciesid, populationid) = (
+        args.rqtl2bundle, args.workingdir, args.speciesid, args.populationid)
+    logger.debug("Beginning the quality assuarance checks.")
     results = check_for_averages_files(
         **check_for_mandatory_pheno_keys(
             **validate(phenobundle, logger)))
@@ -295,7 +344,8 @@ def run_qc(# pylint: disable=[too-many-arguments]
              for ftype in ("pheno", "phenocovar", "phenose", "phenonum")))
 
     #       - Fetch samples/individuals from database.
-    samples = tuple(
+    logger.debug("Fetching samples/individuals from the database.")
+    samples = tuple(#type: ignore[var-annotated]
         item for item in set(reduce(
             lambda acc, item: acc + (
                 item["Name"], item["Name2"], item["Symbol"], item["Alias"]),
@@ -306,8 +356,16 @@ def run_qc(# pylint: disable=[too-many-arguments]
     #       - Check that `description` and `units` is present in phenocovar for
     #         all phenotypes
     with mproc.Pool(mproc.cpu_count() - 1) as pool:
-        phenocovar_qc_res = merge_dicts(*pool.starmap(qc_phenocovar_file, tuple(
-            (extractiondir.joinpath(_file), cdata["sep"], cdata["comment.char"])
+        logger.debug("Check for errors in 'phenocovar' file(s).")
+        _phenocovar_qc_res = merge_dicts(*pool.starmap(qc_phenocovar_file, tuple(
+            (extractiondir.joinpath(_file),
+             args.redisuri,
+             chain(
+                 "phenocovar",
+                 fullyqualifiedkey(args.jobid),
+                 fullyqualifiedkey(args.redisprefix)),
+             cdata["sep"],
+             cdata["comment.char"])
             for _file in cdata.get("phenocovar", []))))
 
         #       - Check all samples in pheno files exist in database
@@ -322,7 +380,9 @@ def run_qc(# pylint: disable=[too-many-arguments]
         dec_err_fn = partial(decimal_points_error, message=(
             "Expected a non-negative number with at least one decimal "
             "place."))
-        pheno_qc_res = merge_dicts(*pool.starmap(qc_pheno_file, tuple((
+
+        logger.debug("Check for errors in 'pheno' file(s).")
+        _pheno_qc_res = merge_dicts(*pool.starmap(qc_pheno_file, tuple((
             extractiondir.joinpath(_file),
             samples,
             phenonames,
@@ -335,7 +395,8 @@ def run_qc(# pylint: disable=[too-many-arguments]
         #       - Check the 3 checks above for phenose and phenonum values too
         # qc_phenose_files(…)
         # qc_phenonum_files(…)
-        phenose_qc_res = merge_dicts(*pool.starmap(qc_pheno_file, tuple((
+        logger.debug("Check for errors in 'phenose' file(s).")
+        _phenose_qc_res = merge_dicts(*pool.starmap(qc_pheno_file, tuple((
             extractiondir.joinpath(_file),
             samples,
             phenonames,
@@ -345,7 +406,8 @@ def run_qc(# pylint: disable=[too-many-arguments]
             dec_err_fn
         ) for _file in cdata.get("phenose", []))))
 
-        phenonum_qc_res = merge_dicts(*pool.starmap(qc_pheno_file, tuple((
+        logger.debug("Check for errors in 'phenonum' file(s).")
+        _phenonum_qc_res = merge_dicts(*pool.starmap(qc_pheno_file, tuple((
             extractiondir.joinpath(_file),
             samples,
             phenonames,
@@ -362,7 +424,6 @@ def run_qc(# pylint: disable=[too-many-arguments]
 
 
 if __name__ == "__main__":
-
     def cli_args():
         """Process command-line arguments for `install_phenos`"""
         parser = add_bundle_argument(add_global_data_arguments(init_cli_parser(
@@ -370,7 +431,7 @@ if __name__ == "__main__":
             description=(
                 "Perform Quality Control checks on a phenotypes bundle file"))))
         parser.add_argument(
-            "--working-dir",
+            "--workingdir",
             default=f"{tempfile.gettempdir()}/phenotypes_qc",
             help=("The directory where this script will put its intermediate "
                   "files."),