 scripts/rqtl2/phenotypes_qc.py | 519 +++++++++++++++++++++++++++++++++++++++
 1 file changed, 519 insertions(+)
diff --git a/scripts/rqtl2/phenotypes_qc.py b/scripts/rqtl2/phenotypes_qc.py
new file mode 100644
index 0000000..9f11f57
--- /dev/null
+++ b/scripts/rqtl2/phenotypes_qc.py
@@ -0,0 +1,519 @@
+"""Run quality control on phenotypes-specific files in the bundle."""
+import sys
+import uuid
+import json
+import shutil
+import logging
+import tempfile
+import contextlib
+from pathlib import Path
+from logging import Logger
+from zipfile import ZipFile
+from argparse import Namespace
+import multiprocessing as mproc
+from functools import reduce, partial
+from typing import Union, Iterator, Callable, Optional, Sequence
+
+import MySQLdb as mdb
+from redis import Redis
+
+from r_qtl import r_qtl2 as rqtl2
+from r_qtl import r_qtl2_qc as rqc
+from r_qtl import exceptions as rqe
+from r_qtl.fileerrors import InvalidValue
+
+from functional_tools import chain
+
+from quality_control.checks import decimal_places_pattern
+
+from uploader.files import sha256_digest_over_file
+from uploader.samples.models import samples_by_species_and_population
+
+from scripts.rqtl2.entry import build_main
+from scripts.redis_logger import RedisMessageListHandler
+from scripts.rqtl2.cli_parser import add_bundle_argument
+from scripts.cli_parser import init_cli_parser, add_global_data_arguments
+from scripts.rqtl2.bundleutils import build_line_joiner, build_line_splitter
+
+__MODULE__ = "scripts.rqtl2.phenotypes_qc"
+logging.basicConfig(
+    format=("%(asctime)s - %(levelname)s %(name)s: "
+            "(%(pathname)s: %(lineno)d) %(message)s"))
+logger = logging.getLogger(__MODULE__)
+
+def validate(
+        phenobundle: Path,
+        logger: Logger# pylint: disable=[redefined-outer-name]
+) -> dict:
+    """Check that the bundle is generally valid"""
+    try:
+        rqc.validate_bundle(phenobundle)
+    except rqe.RQTLError as rqtlerr:
+        # logger.error("Bundle file validation failed!", exc_info=True)
+        return {
+            "skip": True,
+            "logger": logger,
+            "phenobundle": phenobundle,
+            "errors": (" ".join(rqtlerr.args),)
+        }
+    return {
+        "errors": tuple(),
+        "skip": False,
+        "phenobundle": phenobundle,
+        "logger": logger
+    }
+
+
+def check_for_mandatory_pheno_keys(
+        phenobundle: Path,
+        logger: Logger,# pylint: disable=[redefined-outer-name]
+        **kwargs
+) -> dict:
+    """Check that the mandatory keys exist for phenotypes."""
+    if kwargs.get("skip", False):
+        return {
+            **kwargs,
+            "logger": logger,
+            "phenobundle": phenobundle
+        }
+
+    _mandatory_keys = ("pheno", "phenocovar")
+    _cdata = rqtl2.read_control_file(phenobundle)
+    _errors = kwargs.get("errors", tuple()) + tuple(
+        f"Expected '{key}' file(s) are not declared in the bundle."
+        for key in _mandatory_keys if key not in _cdata.keys())
+    return {
+        **kwargs,
+        "logger": logger,
+        "phenobundle": phenobundle,
+        "errors": _errors,
+        "skip": len(_errors) > 0
+    }
+
+
+def check_for_averages_files(
+        phenobundle: Path,
+        logger: Logger,# pylint: disable=[redefined-outer-name]
+        **kwargs
+) -> dict:
+    """Check that averages files appear together"""
+    if kwargs.get("skip", False):
+        return {
+            **kwargs,
+            "logger": logger,
+            "phenobundle": phenobundle
+        }
+
+    _together = (("phenose", "phenonum"), ("phenonum", "phenose"))
+    _cdata = rqtl2.read_control_file(phenobundle)
+    _errors = kwargs.get("errors", tuple()) + tuple(
+        f"'{first}' is defined in the control file but there is no "
+        f"corresponding '{second}'"
+        for first, second in _together
+        if ((first in _cdata.keys()) and (second not in _cdata.keys())))
+    return {
+        **kwargs,
+        "logger": logger,
+        "phenobundle": phenobundle,
+        "errors": _errors,
+        "skip": len(_errors) > 0
+    }
+
+
+def extract_bundle(
+        bundle: Path, workdir: Path, jobid: uuid.UUID
+) -> tuple[Path, tuple[Path, ...]]:
+    """Extract the bundle."""
+    with ZipFile(bundle) as zfile:
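+        # Name the extraction directory after the job id and the bundle's
+        # SHA256 digest so re-runs with a modified bundle do not collide
+        # with stale extractions.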
+        extractiondir = workdir.joinpath(
+            f"{str(jobid)}-{sha256_digest_over_file(bundle)}-{bundle.name}")
+        return extractiondir, rqtl2.extract(zfile, extractiondir)
+
+
+def undo_transpose(filetype: str, cdata: dict, extractiondir: Path) -> None:
+    """Undo transposition of all files of type `filetype` in the bundle."""
+    if len(cdata.get(filetype, [])) > 0 and cdata.get(f"{filetype}_transposed", False):
+        files = (extractiondir.joinpath(_file) for _file in cdata[filetype])
+        for _file in files:
+            rqtl2.transpose_csv_with_rename(
+                _file,
+                build_line_splitter(cdata),
+                build_line_joiner(cdata))
+
+
+@contextlib.contextmanager
+def redis_logger(
+        redisuri: str, loggername: str, filename: str, fqkey: str
+) -> Iterator[logging.Logger]:
+    """Build a Redis message-list logger."""
+    rconn = Redis.from_url(redisuri, decode_responses=True)
+    _logger = logging.getLogger(loggername)
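+    # Keep these messages out of the root logger: they are destined for the
+    # Redis message list, not the console.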
+    _logger.propagate = False
+    handler = RedisMessageListHandler(
+        rconn,
+        fullyqualifiedkey(fqkey, filename))#type: ignore[arg-type]
+    handler.setFormatter(logging.getLogger().handlers[0].formatter)
+    _logger.addHandler(handler)
+    try:
+        yield _logger
+    finally:
+        rconn.close()
+
+
+def push_error(rconn: Redis, fqkey: str, error: InvalidValue) -> InvalidValue:
+    """Persist the error in redis."""
+    rconn.rpush(fqkey, json.dumps(error._asdict()))
+    return error
+
+
+def file_fqkey(prefix: str, section: str, filepath: Path) -> str:
+    """Build a files fully-qualified key in a consistent manner"""
+    return f"{prefix}:{section}:{filepath.name}"
+
+
+def qc_phenocovar_file(
+        filepath: Path,
+        redisuri: str,
+        fqkey: str,
+        separator: str,
+        comment_char: str):
+    """Check that `phenocovar` files are structured correctly."""
+    with (redis_logger(
+            redisuri,
+            f"{__MODULE__}.qc_phenocovar_file",
+            filepath.name,
+            f"{fqkey}:logs") as _logger,
+          Redis.from_url(redisuri, decode_responses=True) as rconn):
+        print("Running QC on file: ", filepath.name)
+        _csvfile = rqtl2.read_csv_file(filepath, separator, comment_char)
+        _headings = tuple(heading.lower() for heading in next(_csvfile))
+        _errors: tuple[InvalidValue, ...] = tuple()
+        save_error = partial(
+            push_error, rconn, file_fqkey(fqkey, "errors", filepath))
+        for heading in ("description", "units"):
+            if heading not in _headings:
+                _errors = _errors + (save_error(InvalidValue(
+                    filepath.name,
+                    "header row",
+                    "-",
+                    "-",
+                    (f"File {filepath.name} is missing the {heading} heading "
+                     "in the header line."))),)
+
+        def collect_errors(errors_and_linecount, line):
+            _errs, _lc = errors_and_linecount
+            _logger.info("Testing record '%s'", line[0])
+            if len(line) != len(_headings):
+                _errs = _errs + (save_error(InvalidValue(
+                    filepath.name,
+                    line[0],
+                    "-",
+                    "-",
+                    (f"Record {_lc} in file {filepath.name} has a different "
+                     "number of columns than the number of headings"))),)
+            _line = dict(zip(_headings, line))
+            if not bool(_line.get("description")):
+                _errs = _errs + (
+                    save_error(InvalidValue(filepath.name,
+                                            _line[_headings[0]],
+                                            "description",
+                                            _line.get("description"),
+                                            "The description is not provided!")),)
+
+            rconn.hset(file_fqkey(fqkey, "metadata", filepath),
+                       mapping={
+                           "status": "checking",
+                           "linecount": _lc+1,
+                           "total-errors": len(_errs)
+                       })
+            return _errs, _lc+1
+
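+        # The header row was already consumed from _csvfile above, so the
+        # count starts at 1 and ends up equal to the file's total line count.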
+        _errors, _linecount = reduce(collect_errors, _csvfile, (_errors, 1))
+        rconn.hset(file_fqkey(fqkey, "metadata", filepath),
+                   mapping={
+                       "status": "completed",
+                       "linecount": _linecount,
+                       "total-errors": len(_errors)
+                   })
+        return {filepath.name: {"errors": _errors, "linecount": _linecount}}
+
+
+def merge_dicts(*dicts):
+    """Merge multiple dicts into a single one."""
+    return reduce(lambda merged, dct: {**merged, **dct}, dicts, {})
+
+
+def decimal_points_error(# pylint: disable=[too-many-arguments,too-many-positional-arguments]
+        filename: str,
+        rowtitle: str,
+        coltitle: str,
+        cellvalue: str,
+        message: str,
+        decimal_places: int = 1
+) -> Optional[InvalidValue]:
+    """Returns an error if the value does not meet the checks."""
+    if not bool(decimal_places_pattern(decimal_places).match(cellvalue)):
+        return InvalidValue(filename, rowtitle, coltitle, cellvalue, message)
+    return None
+
+
+def integer_error(
+        filename: str,
+        rowtitle: str,
+        coltitle: str,
+        cellvalue: str,
+        message: str
+) -> Optional[InvalidValue]:
+    """Returns an error if the value does not meet the checks."""
+    try:
+        value = int(cellvalue)
+        if value <= 0:
+            raise ValueError("Must be a non-zero, positive number.")
+        return None
+    except ValueError as _verr:
+        return InvalidValue(filename, rowtitle, coltitle, cellvalue, message)
+
+
+def qc_pheno_file(# pylint: disable=[too-many-locals, too-many-arguments, too-many-positional-arguments]
+        filepath: Path,
+        redisuri: str,
+        fqkey: str,
+        samples: tuple[str, ...],
+        phenonames: tuple[str, ...],
+        separator: str,
+        comment_char: str,
+        na_strings: Sequence[str],
+        error_fn: Callable = decimal_points_error
+):
+    """Run QC/QA on a `pheno` file."""
+    with (redis_logger(
+            redisuri,
+            f"{__MODULE__}.qc_pheno_file",
+            filepath.name,
+            f"{fqkey}:logs") as _logger,
+          Redis.from_url(redisuri, decode_responses=True) as rconn):
+        print("Running QC on file: ", filepath.name)
+        save_error = partial(
+            push_error, rconn, file_fqkey(fqkey, "errors", filepath))
+        _csvfile = rqtl2.read_csv_file(filepath, separator, comment_char)
+        _headings: tuple[str, ...] = tuple(
+            # select lowercase for comparison purposes
+            heading.lower() for heading in next(_csvfile))
+        _errors: tuple[InvalidValue, ...] = tuple()
+
+        _absent = tuple(pheno for pheno in _headings[1:] if pheno
+                        not in tuple(
+                            # lower to have consistent case with headings for
+                            # comparison
+                            phe.lower() for phe in phenonames))
+        if len(_absent) > 0:
+            _errors = _errors + (save_error(InvalidValue(
+                filepath.name,
+                "header row",
+                "-",
+                ", ".join(_absent),
+                ("The following phenotype names do not exist in any of the "
+                 f"provided phenocovar files: ({', '.join(_absent)})"))),)
+
+        def collect_errors(errors_and_linecount, line):
+            _errs, _lc = errors_and_linecount
+            _logger.debug("Checking row %s", line[0])
+            if line[0] not in samples:
+                _errs = _errs + (save_error(InvalidValue(
+                    filepath.name,
+                    line[0],
+                    _headings[0],
+                    line[0],
+                    (f"The sample named '{line[0]}' does not exist in the database. "
+                     "You will need to upload that first."))),)
+
+            for field, value in zip(_headings[1:], line[1:]):
+                if value in na_strings:
+                    continue
+                _err = error_fn(
+                    filepath.name,
+                    line[0],
+                    field,
+                    value)
+                _errs = _errs + ((save_error(_err),) if bool(_err) else tuple())
+
+            rconn.hset(file_fqkey(fqkey, "metadata", filepath),
+                       mapping={
+                           "status": "checking",
+                           "linecount": _lc+1,
+                           "total-errors": len(_errs)
+                       })
+            return _errs, _lc+1
+
+        _errors, _linecount = reduce(collect_errors, _csvfile, (_errors, 1))
+        rconn.hset(file_fqkey(fqkey, "metadata", filepath),
+                   mapping={
+                       "status": "completed",
+                       "linecount": _linecount,
+                       "total-errors": len(_errors)
+                   })
+        return {filepath.name: {"errors": _errors, "linecount": _linecount}}
+
+
+def phenotype_names(filepath: Path,
+                    separator: str,
+                    comment_char: str) -> tuple[str, ...]:
+    """Read phenotype names from `phenocovar` file."""
+    return reduce(lambda tpl, line: tpl + (line[0],),#type: ignore[arg-type, return-value]
+                  rqtl2.read_csv_file(filepath, separator, comment_char),
+                  tuple())[1:]
+
+def fullyqualifiedkey(
+        prefix: str,
+        rest: Optional[str] = None
+) -> Union[Callable[[str], str], str]:
+    """Compute fully qualified Redis key."""
+    if not bool(rest):
+        return lambda _rest: f"{prefix}:{_rest}"
+    return f"{prefix}:{rest}"
+
+def run_qc(# pylint: disable=[too-many-locals]
+        rconn: Redis,
+        dbconn: mdb.Connection,
+        fullyqualifiedjobid: str,
+        args: Namespace
+) -> int:
+    """Run quality control checks on the bundle."""
+    print("Beginning the quality assurance checks.")
+    results = check_for_averages_files(
+        **check_for_mandatory_pheno_keys(
+            **validate(args.rqtl2bundle, logger)))
+    errors = results.get("errors", tuple())
+    if len(errors) > 0:
+        logger.error("We found the following errors:\n%s",
+                     "\n".join(f" - {error}" for error in errors))
+        return 1
+    # Run QC on actual values
+    #       Steps:
+    #       - Extract file to specific directory
+    extractiondir, *_bundlefiles = extract_bundle(
+        args.rqtl2bundle, args.workingdir, args.jobid)
+
+    #       - For every pheno, phenocovar, phenose, phenonum file, undo
+    #         transposition where relevant
+    cdata = rqtl2.control_data(extractiondir)
+    with mproc.Pool(max(1, mproc.cpu_count() - 1)) as pool:
+        pool.starmap(
+            undo_transpose,
+            ((ftype, cdata, extractiondir)
+             for ftype in ("pheno", "phenocovar", "phenose", "phenonum")))
+
+    #       - Fetch samples/individuals from database.
+    print("Fetching samples/individuals from the database.")
+    samples = tuple(#type: ignore[var-annotated]
+        item for item in set(reduce(
+            lambda acc, item: acc + (
+                item["Name"], item["Name2"], item["Symbol"], item["Alias"]),
+            samples_by_species_and_population(
+                dbconn, args.speciesid, args.populationid),
+            tuple()))
+        if bool(item))
+
+    #       - Check that `description` and `units` is present in phenocovar for
+    #         all phenotypes
+    rconn.hset(fullyqualifiedjobid,
+               "fully-qualified-keys:phenocovar",
+               json.dumps(tuple(f"{fullyqualifiedjobid}:phenocovar:{_file}"
+                                for _file in cdata.get("phenocovar", []))))
+    with mproc.Pool(max(1, mproc.cpu_count() - 1)) as pool:
+        print("Check for errors in 'phenocovar' file(s).")
+        _phenocovar_qc_res = merge_dicts(*pool.starmap(qc_phenocovar_file, tuple(
+            (extractiondir.joinpath(_file),
+             args.redisuri,
+             f"{fullyqualifiedjobid}:phenocovar",
+             cdata["sep"],
+             cdata["comment.char"])
+            for _file in cdata.get("phenocovar", []))))
+
+        #       - Check all samples in pheno files exist in database
+        #       - Check all phenotypes in pheno files exist in phenocovar files
+        #       - Check all numeric values in pheno files
+        phenonames = tuple(set(
+            name for names in pool.starmap(phenotype_names, tuple(
+                (extractiondir.joinpath(_file), cdata["sep"], cdata["comment.char"])
+                for _file in cdata.get("phenocovar", [])))
+            for name in names))
+
+        dec_err_fn = partial(decimal_points_error, message=(
+            "Expected a non-negative number with at least one decimal "
+            "place."))
+
+        print("Check for errors in 'pheno' file(s).")
+        _pheno_qc_res = merge_dicts(*pool.starmap(qc_pheno_file, tuple((
+            extractiondir.joinpath(_file),
+            args.redisuri,
+            chain(
+                "pheno",
+                fullyqualifiedkey(args.jobid),
+                fullyqualifiedkey(args.redisprefix)),
+            samples,
+            phenonames,
+            cdata["sep"],
+            cdata["comment.char"],
+            cdata["na.strings"],
+            dec_err_fn
+        ) for _file in cdata.get("pheno", []))))
+
+        #       - Check the 3 checks above for phenose and phenonum values too
+        # qc_phenose_files(…)
+        # qc_phenonum_files(…)
+        print("Check for errors in 'phenose' file(s).")
+        _phenose_qc_res = merge_dicts(*pool.starmap(qc_pheno_file, tuple((
+            extractiondir.joinpath(_file),
+            args.redisuri,
+            chain(
+                "phenose",
+                fullyqualifiedkey(args.jobid),
+                fullyqualifiedkey(args.redisprefix)),
+            samples,
+            phenonames,
+            cdata["sep"],
+            cdata["comment.char"],
+            cdata["na.strings"],
+            dec_err_fn
+        ) for _file in cdata.get("phenose", []))))
+
+        print("Check for errors in 'phenonum' file(s).")
+        _phenonum_qc_res = merge_dicts(*pool.starmap(qc_pheno_file, tuple((
+            extractiondir.joinpath(_file),
+            args.redisuri,
+            chain(
+                "phenonum",
+                fullyqualifiedkey(args.jobid),
+                fullyqualifiedkey(args.redisprefix)),
+            samples,
+            phenonames,
+            cdata["sep"],
+            cdata["comment.char"],
+            cdata["na.strings"],
+            partial(integer_error, message=(
+                "Expected a non-negative, non-zero integer value."))
+        ) for _file in cdata.get("phenonum", []))))
+
+    #       - Delete all extracted files
+    shutil.rmtree(extractiondir)
+    return 0
+
+
+if __name__ == "__main__":
+    def cli_args():
+        """Process command-line arguments for `install_phenos`"""
+        parser = add_bundle_argument(add_global_data_arguments(init_cli_parser(
+            program="PhenotypesQC",
+            description=(
+                "Perform Quality Control checks on a phenotypes bundle file"))))
+        parser.add_argument(
+            "--workingdir",
+            default=f"{tempfile.gettempdir()}/phenotypes_qc",
+            help=("The directory where this script will put its intermediate "
+                  "files."),
+            type=Path)
+        return parser.parse_args()
+
+    main = build_main(cli_args(), run_qc, logger)
+    sys.exit(main())