diff options
Diffstat (limited to 'scripts/rqtl2')
-rw-r--r-- | scripts/rqtl2/entry.py | 60 | ||||
-rw-r--r-- | scripts/rqtl2/install_genotypes.py | 5 | ||||
-rw-r--r-- | scripts/rqtl2/install_phenos.py | 5 | ||||
-rw-r--r-- | scripts/rqtl2/phenotypes_qc.py | 112 |
4 files changed, 116 insertions, 66 deletions
diff --git a/scripts/rqtl2/entry.py b/scripts/rqtl2/entry.py index 2a18aa3..327ed2c 100644 --- a/scripts/rqtl2/entry.py +++ b/scripts/rqtl2/entry.py @@ -16,43 +16,47 @@ from scripts.redis_logger import setup_redis_logger def build_main( args: Namespace, - run_fn: Callable[[Connection, Namespace, logging.Logger], int], + run_fn: Callable[ + [Redis, Connection, str, Namespace, logging.Logger], + int + ], loggername: str ) -> Callable[[],int]: """Build a function to be used as an entry-point for scripts.""" def main(): - logging.basicConfig( - format=( - "%(asctime)s - %(levelname)s %(name)s: " - "(%(pathname)s: %(lineno)d) %(message)s"), - level=args.loglevel) - logger = logging.getLogger(loggername) - check_db(args.databaseuri) - check_redis(args.redisuri) - if not args.rqtl2bundle.exists(): - logger.error("File not found: '%s'.", args.rqtl2bundle) - return 2 + try: + logging.basicConfig( + format=( + "%(asctime)s - %(levelname)s %(name)s: " + "(%(pathname)s: %(lineno)d) %(message)s"), + level=args.loglevel) + logger = logging.getLogger(loggername) + with (Redis.from_url(args.redisuri, decode_responses=True) as rconn, + database_connection(args.databaseuri) as dbconn): + fqjobid = jobs.job_key(args.redisprefix, args.jobid) + rconn.hset(fqjobid, "status", "started") + logger.addHandler(setup_redis_logger( + rconn, + fqjobid, + f"{fqjobid}:log-messages", + args.redisexpiry)) + logger.addHandler(StreamHandler(stream=sys.stdout)) - with (Redis.from_url(args.redisuri, decode_responses=True) as rconn, - database_connection(args.databaseuri) as dbconn): - fqjobid = jobs.job_key(args.redisprefix, args.jobid) - rconn.hset(fqjobid, "status", "started") - logger.addHandler(setup_redis_logger( - rconn, - fqjobid, - f"{fqjobid}:log-messages", - args.redisexpiry)) - logger.addHandler(StreamHandler(stream=sys.stdout)) - try: - returncode = run_fn(dbconn, args, logger) + check_db(args.databaseuri) + check_redis(args.redisuri) + if not args.rqtl2bundle.exists(): + logger.error("File not found: '%s'.", args.rqtl2bundle) + return 2 + + returncode = run_fn(rconn, dbconn, fqjobid, args, logger) if returncode == 0: rconn.hset(fqjobid, "status", "completed:success") return returncode rconn.hset(fqjobid, "status", "completed:error") return returncode - except Exception as _exc: - logger.error("The process failed!", exc_info=True) - rconn.hset(fqjobid, "status", "completed:error") - return 4 + except Exception as _exc:# pylint: disable=[broad-except] + logger.error("The process failed!", exc_info=True) + rconn.hset(fqjobid, "status", "completed:error") + return 4 return main diff --git a/scripts/rqtl2/install_genotypes.py b/scripts/rqtl2/install_genotypes.py index 20a19da..8762655 100644 --- a/scripts/rqtl2/install_genotypes.py +++ b/scripts/rqtl2/install_genotypes.py @@ -7,6 +7,7 @@ from functools import reduce from typing import Iterator, Optional from logging import Logger, getLogger +from redis import Redis import MySQLdb as mdb from MySQLdb.cursors import DictCursor @@ -185,8 +186,10 @@ def cross_reference_genotypes( cursor.executemany(insertquery, insertparams) return cursor.rowcount -def install_genotypes(#pylint: disable=[too-many-arguments, too-many-locals] +def install_genotypes(#pylint: disable=[too-many-locals] + rconn: Redis,#pylint: disable=[unused-argument] dbconn: mdb.Connection, + fullyqualifiedjobid: str,#pylint: disable=[unused-argument] args: argparse.Namespace, logger: Logger = getLogger(__name__) ) -> int: diff --git a/scripts/rqtl2/install_phenos.py b/scripts/rqtl2/install_phenos.py index a6e9fb2..9059cd6 100644 --- a/scripts/rqtl2/install_phenos.py +++ b/scripts/rqtl2/install_phenos.py @@ -6,6 +6,7 @@ from zipfile import ZipFile from functools import reduce from logging import Logger, getLogger +from redis import Redis import MySQLdb as mdb from MySQLdb.cursors import DictCursor @@ -95,8 +96,10 @@ def cross_reference_probeset_data(dbconn: mdb.Connection, } for row in dataids)) return cursor.rowcount -def install_pheno_files(#pylint: disable=[too-many-arguments, too-many-locals] +def install_pheno_files(#pylint: disable=[too-many-locals] + rconn: Redis,#pylint: disable=[unused-argument] dbconn: mdb.Connection, + fullyqualifiedjobid: str,#pylint: disable=[unused-argument] args: argparse.Namespace, logger: Logger = getLogger()) -> int: """Load data in `pheno` files and other related files into the database.""" diff --git a/scripts/rqtl2/phenotypes_qc.py b/scripts/rqtl2/phenotypes_qc.py index 4f55e40..ba28ed0 100644 --- a/scripts/rqtl2/phenotypes_qc.py +++ b/scripts/rqtl2/phenotypes_qc.py @@ -1,6 +1,7 @@ """Run quality control on phenotypes-specific files in the bundle.""" import sys import uuid +import json import shutil import logging import tempfile @@ -152,6 +153,17 @@ def redis_logger( rconn.close() +def push_error(rconn: Redis, fqkey: str, error: InvalidValue) -> InvalidValue: + """Persist the error in redis.""" + rconn.rpush(fqkey, json.dumps(error._asdict())) + return error + + +def file_fqkey(prefix: str, section: str, filepath: Path) -> str: + """Build a files fully-qualified key in a consistent manner""" + return f"{prefix}:{section}:{filepath.name}" + + def qc_phenocovar_file( filepath: Path, redisuri, @@ -159,52 +171,64 @@ def qc_phenocovar_file( separator: str, comment_char: str): """Check that `phenocovar` files are structured correctly.""" - with redis_logger( + with (redis_logger( redisuri, f"{__MODULE__}.qc_phenocovar_file", filepath.name, - fqkey) as logger: + f"{fqkey}:logs") as logger, + Redis.from_url(redisuri, decode_responses=True) as rconn): logger.info("Running QC on file: %s", filepath.name) _csvfile = rqtl2.read_csv_file(filepath, separator, comment_char) _headings = tuple(heading.lower() for heading in next(_csvfile)) _errors: tuple[InvalidValue, ...] = tuple() + save_error = partial( + push_error, rconn, file_fqkey(fqkey, "errors", filepath)) for heading in ("description", "units"): if heading not in _headings: - _errors = (InvalidValue( + _errors = (save_error(InvalidValue( filepath.name, "header row", "-", "-", (f"File {filepath.name} is missing the {heading} heading " - "in the header line.")),) + "in the header line."))),) def collect_errors(errors_and_linecount, line): _errs, _lc = errors_and_linecount logger.info("Testing record '%s'", line[0]) if len(line) != len(_headings): - _errs = _errs + (InvalidValue( + _errs = _errs + (save_error(InvalidValue( filepath.name, line[0], "-", "-", (f"Record {_lc} in file {filepath.name} has a different " - "number of columns than the number of headings")),) + "number of columns than the number of headings"))),) _line = dict(zip(_headings, line)) if not bool(_line["description"]): _errs = _errs + ( - InvalidValue(filepath.name, - _line[_headings[0]], - "description", - _line["description"], - "The description is not provided!"),) - + save_error(InvalidValue(filepath.name, + _line[_headings[0]], + "description", + _line["description"], + "The description is not provided!")),) + + rconn.hset(file_fqkey(fqkey, "metadata", filepath), + mapping={ + "status": "checking", + "linecount": _lc+1, + "total-errors": len(_errs) + }) return _errs, _lc+1 - return { - filepath.name: dict(zip( - ("errors", "linecount"), - reduce(collect_errors, _csvfile, (_errors, 1)))) - } + _errors, _linecount = reduce(collect_errors, _csvfile, (_errors, 1)) + rconn.hset(file_fqkey(fqkey, "metadata", filepath), + mapping={ + "status": "completed", + "linecount": _linecount, + "total-errors": len(_errors) + }) + return {filepath.name: {"errors": _errors, "linecount": _linecount}} def merge_dicts(*dicts): @@ -243,7 +267,7 @@ def integer_error( return InvalidValue(filename, rowtitle, coltitle, cellvalue, message) -def qc_pheno_file(# pylint: disable=[too-many-arguments] +def qc_pheno_file(# pylint: disable=[too-many-locals, too-many-arguments] filepath: Path, redisuri: str, fqkey: str, @@ -255,12 +279,15 @@ def qc_pheno_file(# pylint: disable=[too-many-arguments] error_fn: Callable = decimal_points_error ): """Run QC/QA on a `pheno` file.""" - with redis_logger( + with (redis_logger( redisuri, f"{__MODULE__}.qc_pheno_file", filepath.name, - fqkey) as logger: + f"{fqkey}:logs") as logger, + Redis.from_url(redisuri, decode_responses=True) as rconn): logger.info("Running QC on file: %s", filepath.name) + save_error = partial( + push_error, rconn, file_fqkey(fqkey, "errors", filepath)) _csvfile = rqtl2.read_csv_file(filepath, separator, comment_char) _headings: tuple[str, ...] = tuple( heading.lower() for heading in next(_csvfile)) @@ -268,24 +295,25 @@ def qc_pheno_file(# pylint: disable=[too-many-arguments] _absent = tuple(pheno for pheno in _headings[1:] if pheno not in phenonames) if len(_absent) > 0: - _errors = _errors + (InvalidValue( + _errors = _errors + (save_error(InvalidValue( filepath.name, "header row", "-", ", ".join(_absent), - (f"The phenotype names ({', '.join(samples)}) do not exist in any " - "of the provided phenocovar files.")),) + ("The following phenotype names do not exist in any of the " + f"provided phenocovar files: ({', '.join(_absent)})"))),) def collect_errors(errors_and_linecount, line): _errs, _lc = errors_and_linecount + logger.debug("Checking row %s", line[0]) if line[0] not in samples: - _errs = _errs + (InvalidValue( + _errs = _errs + (save_error(InvalidValue( filepath.name, line[0], _headings[0], line[0], (f"The sample named '{line[0]}' does not exist in the database. " - "You will need to upload that first.")),) + "You will need to upload that first."))),) for field, value in zip(_headings[1:], line[1:]): if value in na_strings: @@ -295,15 +323,24 @@ def qc_pheno_file(# pylint: disable=[too-many-arguments] line[0], field, value) - _errs = _errs + ((_err,) if bool(_err) else tuple()) - + _errs = _errs + ((save_error(_err),) if bool(_err) else tuple()) + + rconn.hset(file_fqkey(fqkey, "metadata", filepath), + mapping={ + "status": "checking", + "linecount": _lc+1, + "total-errors": len(_errs) + }) return _errs, _lc+1 - return { - filepath.name: dict(zip( - ("errors", "linecount"), - reduce(collect_errors, _csvfile, (_errors, 1)))) - } + _errors, _linecount = reduce(collect_errors, _csvfile, (_errors, 1)) + rconn.hset(file_fqkey(fqkey, "metadata", filepath), + mapping={ + "status": "completed", + "linecount": _linecount, + "total-errors": len(_errors) + }) + return {filepath.name: {"errors": _errors, "linecount": _linecount}} def phenotype_names(filepath: Path, @@ -324,7 +361,9 @@ def fullyqualifiedkey( return f"{prefix}:{rest}" def run_qc(# pylint: disable=[too-many-locals] + rconn: Redis, dbconn: mdb.Connection, + fullyqualifiedjobid: str, args: Namespace, logger: Logger ) -> int: @@ -366,15 +405,16 @@ def run_qc(# pylint: disable=[too-many-locals] # - Check that `description` and `units` is present in phenocovar for # all phenotypes + rconn.hset(fullyqualifiedjobid, + "fully-qualified-keys:phenocovar", + json.dumps(tuple(f"{fullyqualifiedjobid}:phenocovar:{_file}" + for _file in cdata.get("phenocovar", [])))) with mproc.Pool(mproc.cpu_count() - 1) as pool: logger.debug("Check for errors in 'phenocovar' file(s).") _phenocovar_qc_res = merge_dicts(*pool.starmap(qc_phenocovar_file, tuple( (extractiondir.joinpath(_file), args.redisuri, - chain( - "phenocovar", - fullyqualifiedkey(args.jobid), - fullyqualifiedkey(args.redisprefix)), + f"{fullyqualifiedjobid}:phenocovar", cdata["sep"], cdata["comment.char"]) for _file in cdata.get("phenocovar", [])))) |