From 53b1e7cb181380a24aab4cbc7a9634b2d8dd2d29 Mon Sep 17 00:00:00 2001 From: Frederick Muriuki Muriithi Date: Fri, 19 Jan 2024 08:29:04 +0300 Subject: scripts: Process R/qtl2 bundle Build script to start the processing of the R/qtl2 bundle. --- scripts/cli_parser.py | 22 ++++--- scripts/process_rqtl2_bundle.py | 123 +++++++++++++++++++++++++++++++++++++ scripts/rqtl2/entry.py | 3 +- scripts/rqtl2/install_genotypes.py | 8 +-- scripts/rqtl2/install_phenos.py | 8 +-- 5 files changed, 145 insertions(+), 19 deletions(-) create mode 100644 scripts/process_rqtl2_bundle.py (limited to 'scripts') diff --git a/scripts/cli_parser.py b/scripts/cli_parser.py index bceb3f4..e8f030c 100644 --- a/scripts/cli_parser.py +++ b/scripts/cli_parser.py @@ -7,16 +7,20 @@ def init_cli_parser(program: str, description: Optional[str] = None) -> Argument """Initialise the CLI arguments parser.""" parser = ArgumentParser(prog=program, description=description) - parser.add_argument("databaseuri", help="URL to MariaDB") - parser.add_argument("redisuri", help="URL to Redis") - parser.add_argument("jobid", - help="Job ID that this belongs to.", - type=UUID) - parser.add_argument("--redisexpiry", - help="How long to keep any redis keys around.", - type=int, - default=86400) + parser.add_argument( + "databaseuri", type=str, help="URI to connect to MariaDB") + parser.add_argument( + "redisuri", type=str, help="URI to connect to the redis server.") + parser.add_argument("jobid", type=UUID, help="Job ID that this belongs to") + parser.add_argument( + "--redisexpiry", + type=int, + default=86400, + help="How long to keep any redis keys around.") + return parser +def add_global_data_arguments(parser: ArgumentParser) -> ArgumentParser: + """Add the global (present in nearly ALL scripts) CLI arguments.""" parser.add_argument("speciesid", type=int, help="Species to which bundle relates.") diff --git a/scripts/process_rqtl2_bundle.py b/scripts/process_rqtl2_bundle.py new file mode 100644 index 0000000..105f787 --- /dev/null +++ b/scripts/process_rqtl2_bundle.py @@ -0,0 +1,123 @@ +"""Entry point for processing R/qtl2 bundles.""" +import sys +import uuid +import json +import logging +import traceback +from typing import Any +from pathlib import Path + +import MySQLdb as mdb +from redis import Redis + +from qc_app import jobs +from qc_app.db_utils import database_connection +from qc_app.check_connections import check_db, check_redis + +from scripts.cli_parser import init_cli_parser +from scripts.redis_logger import setup_redis_logger + +from scripts.rqtl2.install_phenos import install_pheno_files +from scripts.rqtl2.install_genotypes import install_genotypes + +stderr_handler = logging.StreamHandler(stream=sys.stderr) +logger = logging.getLogger("process_rqtl2_bundle") +logger.setLevel("DEBUG") +logger.addHandler(stderr_handler) + +def safe_json_decode(value: str) -> Any: + """Safely decode the string values into JSON.""" + try: + return json.loads(value) + except json.decoder.JSONDecodeError: + return value + +def parse_job(rconn: Redis, jobid: uuid.UUID) -> dict: + """Load and parse job from Redis""" + return { + key: safe_json_decode(value) + for key, value + in jobs.job(rconn, jobid).items() + } + +def has_geno_file(job: dict) -> bool: + """Check whether to trigger processing of geno file(s).""" + return bool(job.get("bundle-metadata", {}).get("geno-dataset-id")) + +def has_pheno_file(job: dict) -> bool: + """Check whether to trigger processing of pheno file(s).""" + meta = job.get("bundle-metadata", {}) + return (bool(meta.get("probe-study-id")) + and bool(meta.get("probe-dataset-id"))) + +def percent_completion(geno: float, pheno: float) -> float: + """Compute the total completion percent.""" + return 0.5 * (geno + pheno) + +def process_bundle(dbconn: mdb.Connection, rconn: Redis, jobid: uuid.UUID) -> int: + """Process the R/qtl2 bundle.""" + try: + thejob = parse_job(rconn, jobid) + meta = thejob["bundle-metadata"] + logger.debug("The metadata: %s", meta) + rconn.hset(str(jobid), "geno-percent", "0") + rconn.hset(str(jobid), "pheno-percent", "0") + + if has_geno_file(thejob): + logger.info("Processing geno files.") + genoexit = install_genotypes( + dbconn, + meta["speciesid"], + meta["populationid"], + meta["geno-dataset-id"], + Path(meta["rqtl2-bundle-file"])) + if genoexit != 0: + raise Exception("Processing 'geno' file failed.") + logger.debug( + "geno file processing completed successfully. (ExitCode: %s)", + genoexit) + rconn.hset(str(jobid), "geno-percent", "100") + + if has_pheno_file(thejob): + phenoexit = install_pheno_files( + dbconn, + meta["speciesid"], + meta["platformid"], + meta["probe-dataset-id"], + Path(meta["rqtl2-bundle-file"])) + if phenoexit != 0: + raise Exception("Processing 'pheno' file failed.") + logger.debug( + "pheno file processing completed successfully. (ExitCode: %s)", + phenoexit) + rconn.hset(str(jobid), "pheno-percent", "100") + + return 0 + except jobs.JobNotFound as exc: + logger.error("%s", exc.args) + except Exception as _exc:#pylint: disable=[broad-except] + logger.error("Exiting with generic error: %s", traceback.format_exc()) + + return 1 + +if __name__ == "__main__": + def main(): + """Run the `process_rqtl2_bundle` script.""" + args = init_cli_parser( + "upload_rqtl2_bundle", + "Launch this to control the processing of R/qtl2 zip bundles." + ).parse_args() + check_db(args.databaseuri) + check_redis(args.redisuri) + + jobid = args.jobid + with (database_connection(args.databaseuri) as dbconn, + Redis.from_url(args.redisuri, decode_responses=True) as rconn): + logger.addHandler(setup_redis_logger( + rconn, jobid, f"{str(jobid)}:log-messages", args.redisexpiry)) + + exitcode = process_bundle(dbconn, rconn, args.jobid) + rconn.hset(str(args.jobid), "percent", "100") + return exitcode + + sys.exit(main()) diff --git a/scripts/rqtl2/entry.py b/scripts/rqtl2/entry.py index e2d70a6..eccc19d 100644 --- a/scripts/rqtl2/entry.py +++ b/scripts/rqtl2/entry.py @@ -11,13 +11,12 @@ from qc_app.check_connections import check_db, check_redis from scripts.redis_logger import setup_redis_logger -def build_main(cli_args: Callable[[], Namespace], +def build_main(args: Namespace, run_fn: Callable[[Connection, Namespace], int], logger: Logger, loglevel: str = "INFO") -> Callable[[],int]: """Build a function to be used as an entry-point for scripts.""" def main(): - args = cli_args() check_db(args.databaseuri) check_redis(args.redisuri) if not args.rqtl2bundle.exists(): diff --git a/scripts/rqtl2/install_genotypes.py b/scripts/rqtl2/install_genotypes.py index 77e7163..733ccf9 100644 --- a/scripts/rqtl2/install_genotypes.py +++ b/scripts/rqtl2/install_genotypes.py @@ -15,8 +15,8 @@ from r_qtl import r_qtl2 as rqtl2 from functional_tools import take from scripts.rqtl2.entry import build_main -from scripts.cli_parser import init_cli_parser from scripts.rqtl2.cli_parser import add_common_arguments +from scripts.cli_parser import init_cli_parser, add_global_data_arguments stderr_handler = logging.StreamHandler(stream=sys.stderr) logger = logging.getLogger("install_genotypes") @@ -217,14 +217,14 @@ if __name__ == "__main__": def cli_args(): """Process command-line arguments for install_genotypes""" - parser = add_common_arguments(init_cli_parser( + parser = add_common_arguments(add_global_data_arguments(init_cli_parser( "install_genotypes", - "Parse genotypes from R/qtl2 bundle into the database.")) + "Parse genotypes from R/qtl2 bundle into the database."))) return parser.parse_args() main = build_main( - cli_args, + cli_args(), lambda dbconn, args: install_genotypes(dbconn, args.speciesid, args.populationid, diff --git a/scripts/rqtl2/install_phenos.py b/scripts/rqtl2/install_phenos.py index 5c25866..21b5f00 100644 --- a/scripts/rqtl2/install_phenos.py +++ b/scripts/rqtl2/install_phenos.py @@ -10,8 +10,8 @@ import MySQLdb as mdb from MySQLdb.cursors import DictCursor from scripts.rqtl2.entry import build_main -from scripts.cli_parser import init_cli_parser from scripts.rqtl2.cli_parser import add_common_arguments +from scripts.cli_parser import init_cli_parser, add_global_data_arguments from r_qtl import r_qtl2 as rqtl2 @@ -143,9 +143,9 @@ if __name__ == "__main__": def cli_args(): """Process command-line arguments for `install_phenos`""" - parser = init_cli_parser( + parser = add_global_data_arguments(init_cli_parser( "install_genotypes", - "Parse genotypes from R/qtl2 bundle into the database.") + "Parse genotypes from R/qtl2 bundle into the database.")) parser.add_argument( "platformid", @@ -156,7 +156,7 @@ if __name__ == "__main__": return parser.parse_args() main = build_main( - cli_args, + cli_args(), lambda dbconn, args: install_pheno_files(dbconn, args.speciesid, args.platformid, -- cgit v1.2.3