about summary refs log tree commit diff
path: root/scripts/rqtl2
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/rqtl2')
-rw-r--r--scripts/rqtl2/entry.py46
-rw-r--r--scripts/rqtl2/install_genotypes.py5
-rw-r--r--scripts/rqtl2/install_phenos.py5
-rw-r--r--scripts/rqtl2/phenotypes_qc.py167
4 files changed, 140 insertions, 83 deletions
diff --git a/scripts/rqtl2/entry.py b/scripts/rqtl2/entry.py
index 2a18aa3..e0e00e7 100644
--- a/scripts/rqtl2/entry.py
+++ b/scripts/rqtl2/entry.py
@@ -16,41 +16,41 @@ from scripts.redis_logger import setup_redis_logger
 
 def build_main(
         args: Namespace,
-        run_fn: Callable[[Connection, Namespace, logging.Logger], int],
-        loggername: str
+        run_fn: Callable[
+            [Redis, Connection, str, Namespace, logging.Logger],
+            int
+        ],
+        logger: logging.Logger
 ) -> Callable[[],int]:
     """Build a function to be used as an entry-point for scripts."""
     def main():
-        logging.basicConfig(
-            format=(
-                "%(asctime)s - %(levelname)s %(name)s: "
-                "(%(pathname)s: %(lineno)d) %(message)s"),
-            level=args.loglevel)
-        logger = logging.getLogger(loggername)
-        check_db(args.databaseuri)
-        check_redis(args.redisuri)
-        if not args.rqtl2bundle.exists():
-            logger.error("File not found: '%s'.", args.rqtl2bundle)
-            return 2
-
         with (Redis.from_url(args.redisuri, decode_responses=True) as rconn,
               database_connection(args.databaseuri) as dbconn):
+            logger.setLevel(args.loglevel.upper())
             fqjobid = jobs.job_key(args.redisprefix, args.jobid)
-            rconn.hset(fqjobid, "status", "started")
-            logger.addHandler(setup_redis_logger(
-                rconn,
-                fqjobid,
-                f"{fqjobid}:log-messages",
-                args.redisexpiry))
-            logger.addHandler(StreamHandler(stream=sys.stdout))
+
             try:
-                returncode = run_fn(dbconn, args, logger)
+                rconn.hset(fqjobid, "status", "started")
+                logger.addHandler(setup_redis_logger(
+                    rconn,
+                    fqjobid,
+                    f"{fqjobid}:log-messages",
+                    args.redisexpiry))
+                logger.addHandler(StreamHandler(stream=sys.stderr))
+
+                check_db(args.databaseuri)
+                check_redis(args.redisuri)
+                if not args.rqtl2bundle.exists():
+                    logger.error("File not found: '%s'.", args.rqtl2bundle)
+                    return 2
+
+                returncode = run_fn(rconn, dbconn, fqjobid, args)
                 if returncode == 0:
                     rconn.hset(fqjobid, "status", "completed:success")
                     return returncode
                 rconn.hset(fqjobid, "status", "completed:error")
                 return returncode
-            except Exception as _exc:
+            except Exception as _exc:# pylint: disable=[broad-except]
                 logger.error("The process failed!", exc_info=True)
                 rconn.hset(fqjobid, "status", "completed:error")
                 return 4
diff --git a/scripts/rqtl2/install_genotypes.py b/scripts/rqtl2/install_genotypes.py
index 20a19da..8762655 100644
--- a/scripts/rqtl2/install_genotypes.py
+++ b/scripts/rqtl2/install_genotypes.py
@@ -7,6 +7,7 @@ from functools import reduce
 from typing import Iterator, Optional
 from logging import Logger, getLogger
 
+from redis import Redis
 import MySQLdb as mdb
 from MySQLdb.cursors import DictCursor
 
@@ -185,8 +186,10 @@ def cross_reference_genotypes(
         cursor.executemany(insertquery, insertparams)
         return cursor.rowcount
 
-def install_genotypes(#pylint: disable=[too-many-arguments, too-many-locals]
+def install_genotypes(#pylint: disable=[too-many-locals]
+        rconn: Redis,#pylint: disable=[unused-argument]
         dbconn: mdb.Connection,
+        fullyqualifiedjobid: str,#pylint: disable=[unused-argument]
         args: argparse.Namespace,
         logger: Logger = getLogger(__name__)
 ) -> int:
diff --git a/scripts/rqtl2/install_phenos.py b/scripts/rqtl2/install_phenos.py
index a6e9fb2..9059cd6 100644
--- a/scripts/rqtl2/install_phenos.py
+++ b/scripts/rqtl2/install_phenos.py
@@ -6,6 +6,7 @@ from zipfile import ZipFile
 from functools import reduce
 from logging import Logger, getLogger
 
+from redis import Redis
 import MySQLdb as mdb
 from MySQLdb.cursors import DictCursor
 
@@ -95,8 +96,10 @@ def cross_reference_probeset_data(dbconn: mdb.Connection,
             } for row in dataids))
         return cursor.rowcount
 
-def install_pheno_files(#pylint: disable=[too-many-arguments, too-many-locals]
+def install_pheno_files(#pylint: disable=[too-many-locals]
+        rconn: Redis,#pylint: disable=[unused-argument]
         dbconn: mdb.Connection,
+        fullyqualifiedjobid: str,#pylint: disable=[unused-argument]
         args: argparse.Namespace,
         logger: Logger = getLogger()) -> int:
     """Load data in `pheno` files and other related files into the database."""
diff --git a/scripts/rqtl2/phenotypes_qc.py b/scripts/rqtl2/phenotypes_qc.py
index 4f55e40..9f11f57 100644
--- a/scripts/rqtl2/phenotypes_qc.py
+++ b/scripts/rqtl2/phenotypes_qc.py
@@ -1,6 +1,7 @@
 """Run quality control on phenotypes-specific files in the bundle."""
 import sys
 import uuid
+import json
 import shutil
 import logging
 import tempfile
@@ -35,8 +36,15 @@ from scripts.cli_parser import init_cli_parser, add_global_data_arguments
 from scripts.rqtl2.bundleutils import build_line_joiner, build_line_splitter
 
 __MODULE__ = "scripts.rqtl2.phenotypes_qc"
+logging.basicConfig(
+    format=("%(asctime)s - %(levelname)s %(name)s: "
+            "(%(pathname)s: %(lineno)d) %(message)s"))
+logger = logging.getLogger(__MODULE__)
 
-def validate(phenobundle: Path, logger: Logger) -> dict:
+def validate(
+        phenobundle: Path,
+        logger: Logger# pylint: disable=[redefined-outer-name]
+) -> dict:
     """Check that the bundle is generally valid"""
     try:
         rqc.validate_bundle(phenobundle)
@@ -58,7 +66,7 @@ def validate(phenobundle: Path, logger: Logger) -> dict:
 
 def check_for_mandatory_pheno_keys(
         phenobundle: Path,
-        logger: Logger,
+        logger: Logger,# pylint: disable=[redefined-outer-name]
         **kwargs
 ) -> dict:
     """Check that the mandatory keys exist for phenotypes."""
@@ -85,7 +93,7 @@ def check_for_mandatory_pheno_keys(
 
 def check_for_averages_files(
         phenobundle: Path,
-        logger: Logger,
+        logger: Logger,# pylint: disable=[redefined-outer-name]
         **kwargs
 ) -> dict:
     """Check that averages files appear together"""
@@ -139,19 +147,30 @@ def redis_logger(
 ) -> Iterator[logging.Logger]:
     """Build a Redis message-list logger."""
     rconn = Redis.from_url(redisuri, decode_responses=True)
-    logger = logging.getLogger(loggername)
-    logger.propagate = False
+    _logger = logging.getLogger(loggername)
+    _logger.propagate = False
     handler = RedisMessageListHandler(
         rconn,
         fullyqualifiedkey(fqkey, filename))#type: ignore[arg-type]
     handler.setFormatter(logging.getLogger().handlers[0].formatter)
-    logger.addHandler(handler)
+    _logger.addHandler(handler)
     try:
-        yield logger
+        yield _logger
     finally:
         rconn.close()
 
 
+def push_error(rconn: Redis, fqkey: str, error: InvalidValue) -> InvalidValue:
+    """Persist the error in redis."""
+    rconn.rpush(fqkey, json.dumps(error._asdict()))
+    return error
+
+
+def file_fqkey(prefix: str, section: str, filepath: Path) -> str:
+    """Build a file's fully-qualified Redis key in a consistent manner."""
+    return f"{prefix}:{section}:{filepath.name}"
+
+
 def qc_phenocovar_file(
         filepath: Path,
         redisuri,
@@ -159,52 +178,64 @@ def qc_phenocovar_file(
         separator: str,
         comment_char: str):
     """Check that `phenocovar` files are structured correctly."""
-    with redis_logger(
+    with (redis_logger(
             redisuri,
             f"{__MODULE__}.qc_phenocovar_file",
             filepath.name,
-            fqkey) as logger:
-        logger.info("Running QC on file: %s", filepath.name)
+            f"{fqkey}:logs") as _logger,
+          Redis.from_url(redisuri, decode_responses=True) as rconn):
+        print("Running QC on file: ", filepath.name)
         _csvfile = rqtl2.read_csv_file(filepath, separator, comment_char)
         _headings = tuple(heading.lower() for heading in next(_csvfile))
         _errors: tuple[InvalidValue, ...] = tuple()
+        save_error = partial(
+            push_error, rconn, file_fqkey(fqkey, "errors", filepath))
         for heading in ("description", "units"):
             if heading not in _headings:
-                _errors = (InvalidValue(
+                _errors = (save_error(InvalidValue(
                     filepath.name,
                     "header row",
                     "-",
                     "-",
                     (f"File {filepath.name} is missing the {heading} heading "
-                     "in the header line.")),)
+                     "in the header line."))),)
 
         def collect_errors(errors_and_linecount, line):
             _errs, _lc = errors_and_linecount
-            logger.info("Testing record '%s'", line[0])
+            _logger.info("Testing record '%s'", line[0])
             if len(line) != len(_headings):
-                _errs = _errs + (InvalidValue(
+                _errs = _errs + (save_error(InvalidValue(
                     filepath.name,
                     line[0],
                     "-",
                     "-",
                     (f"Record {_lc} in file {filepath.name} has a different "
-                        "number of columns than the number of headings")),)
+                     "number of columns than the number of headings"))),)
             _line = dict(zip(_headings, line))
-            if not bool(_line["description"]):
+            if not bool(_line.get("description")):
                 _errs = _errs + (
-                    InvalidValue(filepath.name,
-                                 _line[_headings[0]],
-                                 "description",
-                                 _line["description"],
-                                 "The description is not provided!"),)
-
+                    save_error(InvalidValue(filepath.name,
+                                            _line[_headings[0]],
+                                            "description",
+                                            _line.get("description"),
+                                            "The description is not provided!")),)
+
+            rconn.hset(file_fqkey(fqkey, "metadata", filepath),
+                       mapping={
+                           "status": "checking",
+                           "linecount": _lc+1,
+                           "total-errors": len(_errs)
+                       })
             return _errs, _lc+1
 
-        return {
-            filepath.name: dict(zip(
-                ("errors", "linecount"),
-                reduce(collect_errors, _csvfile, (_errors, 1))))
-        }
+        _errors, _linecount = reduce(collect_errors, _csvfile, (_errors, 1))
+        rconn.hset(file_fqkey(fqkey, "metadata", filepath),
+                   mapping={
+                       "status": "completed",
+                       "linecount": _linecount,
+                       "total-errors": len(_errors)
+                   })
+        return {filepath.name: {"errors": _errors, "linecount": _linecount}}
 
 
 def merge_dicts(*dicts):
@@ -212,7 +243,7 @@ def merge_dicts(*dicts):
     return reduce(lambda merged, dct: {**merged, **dct}, dicts, {})
 
 
-def decimal_points_error(# pylint: disable=[too-many-arguments]
+def decimal_points_error(# pylint: disable=[too-many-arguments,too-many-positional-arguments]
         filename: str,
         rowtitle: str,
         coltitle: str,
@@ -243,7 +274,7 @@ def integer_error(
         return InvalidValue(filename, rowtitle, coltitle, cellvalue, message)
 
 
-def qc_pheno_file(# pylint: disable=[too-many-arguments]
+def qc_pheno_file(# pylint: disable=[too-many-locals, too-many-arguments, too-many-positional-arguments]
         filepath: Path,
         redisuri: str,
         fqkey: str,
@@ -255,37 +286,46 @@ def qc_pheno_file(# pylint: disable=[too-many-arguments]
         error_fn: Callable = decimal_points_error
 ):
     """Run QC/QA on a `pheno` file."""
-    with redis_logger(
+    with (redis_logger(
             redisuri,
             f"{__MODULE__}.qc_pheno_file",
             filepath.name,
-            fqkey) as logger:
-        logger.info("Running QC on file: %s", filepath.name)
+            f"{fqkey}:logs") as _logger,
+          Redis.from_url(redisuri, decode_responses=True) as rconn):
+        print("Running QC on file: ", filepath.name)
+        save_error = partial(
+            push_error, rconn, file_fqkey(fqkey, "errors", filepath))
         _csvfile = rqtl2.read_csv_file(filepath, separator, comment_char)
         _headings: tuple[str, ...] = tuple(
+            # select lowercase for comparison purposes
             heading.lower() for heading in next(_csvfile))
         _errors: tuple[InvalidValue, ...] = tuple()
 
-        _absent = tuple(pheno for pheno in _headings[1:] if pheno not in phenonames)
+        _absent = tuple(pheno for pheno in _headings[1:] if pheno
+                        not in tuple(
+                            # lower to have consistent case with headings for
+                            # comparison
+                            phe.lower() for phe in phenonames))
         if len(_absent) > 0:
-            _errors = _errors + (InvalidValue(
+            _errors = _errors + (save_error(InvalidValue(
                 filepath.name,
                 "header row",
                 "-",
                 ", ".join(_absent),
-                (f"The phenotype names ({', '.join(samples)}) do not exist in any "
-                 "of the provided phenocovar files.")),)
+                ("The following phenotype names do not exist in any of the "
+                 f"provided phenocovar files: ({', '.join(_absent)})"))),)
 
         def collect_errors(errors_and_linecount, line):
             _errs, _lc = errors_and_linecount
+            _logger.debug("Checking row %s", line[0])
             if line[0] not in samples:
-                _errs = _errs + (InvalidValue(
+                _errs = _errs + (save_error(InvalidValue(
                 filepath.name,
                 line[0],
                 _headings[0],
                 line[0],
                 (f"The sample named '{line[0]}' does not exist in the database. "
-                 "You will need to upload that first.")),)
+                 "You will need to upload that first."))),)
 
             for field, value in zip(_headings[1:], line[1:]):
                 if value in na_strings:
@@ -295,15 +335,24 @@ def qc_pheno_file(# pylint: disable=[too-many-arguments]
                     line[0],
                     field,
                     value)
-                _errs = _errs + ((_err,) if bool(_err) else tuple())
-
+                _errs = _errs + ((save_error(_err),) if bool(_err) else tuple())
+
+            rconn.hset(file_fqkey(fqkey, "metadata", filepath),
+                       mapping={
+                           "status": "checking",
+                           "linecount": _lc+1,
+                           "total-errors": len(_errs)
+                       })
             return _errs, _lc+1
 
-        return {
-            filepath.name: dict(zip(
-                ("errors", "linecount"),
-                reduce(collect_errors, _csvfile, (_errors, 1))))
-        }
+        _errors, _linecount = reduce(collect_errors, _csvfile, (_errors, 1))
+        rconn.hset(file_fqkey(fqkey, "metadata", filepath),
+                   mapping={
+                       "status": "completed",
+                       "linecount": _linecount,
+                       "total-errors": len(_errors)
+                   })
+        return {filepath.name: {"errors": _errors, "linecount": _linecount}}
 
 
 def phenotype_names(filepath: Path,
@@ -324,12 +373,13 @@ def fullyqualifiedkey(
     return f"{prefix}:{rest}"
 
 def run_qc(# pylint: disable=[too-many-locals]
+        rconn: Redis,
         dbconn: mdb.Connection,
-        args: Namespace,
-        logger: Logger
+        fullyqualifiedjobid: str,
+        args: Namespace
 ) -> int:
     """Run quality control checks on the bundle."""
-    logger.debug("Beginning the quality assurance checks.")
+    print("Beginning the quality assurance checks.")
     results = check_for_averages_files(
         **check_for_mandatory_pheno_keys(
             **validate(args.rqtl2bundle, logger)))
@@ -354,7 +404,7 @@ def run_qc(# pylint: disable=[too-many-locals]
              for ftype in ("pheno", "phenocovar", "phenose", "phenonum")))
 
     #       - Fetch samples/individuals from database.
-    logger.debug("Fetching samples/individuals from the database.")
+    print("Fetching samples/individuals from the database.")
     samples = tuple(#type: ignore[var-annotated]
         item for item in set(reduce(
             lambda acc, item: acc + (
@@ -366,15 +416,16 @@ def run_qc(# pylint: disable=[too-many-locals]
 
     #       - Check that `description` and `units` is present in phenocovar for
     #         all phenotypes
+    rconn.hset(fullyqualifiedjobid,
+               "fully-qualified-keys:phenocovar",
+               json.dumps(tuple(f"{fullyqualifiedjobid}:phenocovar:{_file}"
+                                for _file in cdata.get("phenocovar", []))))
     with mproc.Pool(mproc.cpu_count() - 1) as pool:
-        logger.debug("Check for errors in 'phenocovar' file(s).")
+        print("Check for errors in 'phenocovar' file(s).")
         _phenocovar_qc_res = merge_dicts(*pool.starmap(qc_phenocovar_file, tuple(
             (extractiondir.joinpath(_file),
              args.redisuri,
-             chain(
-                 "phenocovar",
-                 fullyqualifiedkey(args.jobid),
-                 fullyqualifiedkey(args.redisprefix)),
+             f"{fullyqualifiedjobid}:phenocovar",
              cdata["sep"],
              cdata["comment.char"])
             for _file in cdata.get("phenocovar", []))))
@@ -392,7 +443,7 @@ def run_qc(# pylint: disable=[too-many-locals]
             "Expected a non-negative number with at least one decimal "
             "place."))
 
-        logger.debug("Check for errors in 'pheno' file(s).")
+        print("Check for errors in 'pheno' file(s).")
         _pheno_qc_res = merge_dicts(*pool.starmap(qc_pheno_file, tuple((
             extractiondir.joinpath(_file),
             args.redisuri,
@@ -411,7 +462,7 @@ def run_qc(# pylint: disable=[too-many-locals]
         #       - Check the 3 checks above for phenose and phenonum values too
         # qc_phenose_files(…)
         # qc_phenonum_files(…)
-        logger.debug("Check for errors in 'phenose' file(s).")
+        print("Check for errors in 'phenose' file(s).")
         _phenose_qc_res = merge_dicts(*pool.starmap(qc_pheno_file, tuple((
             extractiondir.joinpath(_file),
             args.redisuri,
@@ -427,7 +478,7 @@ def run_qc(# pylint: disable=[too-many-locals]
             dec_err_fn
         ) for _file in cdata.get("phenose", []))))
 
-        logger.debug("Check for errors in 'phenonum' file(s).")
+        print("Check for errors in 'phenonum' file(s).")
         _phenonum_qc_res = merge_dicts(*pool.starmap(qc_pheno_file, tuple((
             extractiondir.joinpath(_file),
             args.redisuri,
@@ -464,5 +515,5 @@ if __name__ == "__main__":
             type=Path)
         return parser.parse_args()
 
-    main = build_main(cli_args(), run_qc, __MODULE__)
+    main = build_main(cli_args(), run_qc, logger)
     sys.exit(main())