# Provenance (from the patch header this file was extracted from):
#   commit:  53b1e7cb181380a24aab4cbc7a9634b2d8dd2d29
#   author:  Frederick Muriuki Muriithi
#   date:    Fri, 19 Jan 2024 08:29:04 +0300
#   subject: scripts: Process R/qtl2 bundle
#            Build script to start the processing of the R/qtl2 bundle.
#   file:    scripts/process_rqtl2_bundle.py (new file, 123 insertions)
"""Entry point for processing R/qtl2 bundles."""
import sys
import uuid
import json
import logging
import traceback
from typing import Any
from pathlib import Path

import MySQLdb as mdb
from redis import Redis

from qc_app import jobs
from qc_app.db_utils import database_connection
from qc_app.check_connections import check_db, check_redis

from scripts.cli_parser import init_cli_parser
from scripts.redis_logger import setup_redis_logger

from scripts.rqtl2.install_phenos import install_pheno_files
from scripts.rqtl2.install_genotypes import install_genotypes

# Log everything at DEBUG level and above to stderr; `main` additionally
# attaches a Redis-backed handler once a Redis connection is available.
stderr_handler = logging.StreamHandler(stream=sys.stderr)
logger = logging.getLogger("process_rqtl2_bundle")
logger.setLevel("DEBUG")
logger.addHandler(stderr_handler)


def safe_json_decode(value: str) -> Any:
    """Safely decode the string values into JSON.

    Returns the decoded object when `value` is valid JSON. When it is not
    valid JSON, or is not a type `json.loads` accepts at all (e.g. `None`),
    `value` is returned unchanged rather than raising.
    """
    try:
        return json.loads(value)
    except (json.decoder.JSONDecodeError, TypeError):
        # JSONDecodeError: a plain (non-JSON) string.
        # TypeError: not a str/bytes/bytearray, so there is nothing to decode.
        return value


def parse_job(rconn: Redis, jobid: uuid.UUID) -> dict:
    """Load and parse job from Redis.

    Fetches the job with `jobs.job(rconn, jobid)` and passes every field
    value through `safe_json_decode`, so JSON-encoded fields become Python
    objects while all other fields are kept as they are.
    """
    return {
        key: safe_json_decode(value)
        for key, value
        in jobs.job(rconn, jobid).items()
    }


def has_geno_file(job: dict) -> bool:
    """Check whether to trigger processing of geno file(s).

    True when the job's "bundle-metadata" has a truthy "geno-dataset-id".
    """
    return bool(job.get("bundle-metadata", {}).get("geno-dataset-id"))


def has_pheno_file(job: dict) -> bool:
    """Check whether to trigger processing of pheno file(s).

    True when the job's "bundle-metadata" has truthy values for both
    "probe-study-id" and "probe-dataset-id".
    """
    meta = job.get("bundle-metadata", {})
    return (bool(meta.get("probe-study-id"))
            and bool(meta.get("probe-dataset-id")))


def percent_completion(geno: float, pheno: float) -> float:
    """Compute the total completion percent.

    The result is the mean of the `geno` and `pheno` percentages.
    """
    return 0.5 * (geno + pheno)


def process_bundle(dbconn: mdb.Connection, rconn: Redis, jobid: uuid.UUID) -> int:
    """Process the R/qtl2 bundle.

    Loads the job identified by `jobid` from Redis, then installs the
    genotype and/or phenotype data from the bundle, depending on what the
    job's "bundle-metadata" requests. Progress is written to the
    "geno-percent" and "pheno-percent" fields of the Redis hash keyed by
    `str(jobid)`.

    Returns 0 on success. Returns 1 if the job is not found or if any other
    error occurs; the error is logged and not propagated to the caller.
    """
    try:
        thejob = parse_job(rconn, jobid)
        meta = thejob["bundle-metadata"]
        logger.debug("The metadata: %s", meta)
        rconn.hset(str(jobid), "geno-percent", "0")
        rconn.hset(str(jobid), "pheno-percent", "0")

        if has_geno_file(thejob):
            logger.info("Processing geno files.")
            genoexit = install_genotypes(
                dbconn,
                meta["speciesid"],
                meta["populationid"],
                meta["geno-dataset-id"],
                Path(meta["rqtl2-bundle-file"]))
            if genoexit != 0:
                raise Exception("Processing 'geno' file failed.")
            logger.debug(
                "geno file processing completed successfully. (ExitCode: %s)",
                genoexit)
            rconn.hset(str(jobid), "geno-percent", "100")

        if has_pheno_file(thejob):
            # Mirror the geno branch so the log shows when this step starts.
            logger.info("Processing pheno files.")
            phenoexit = install_pheno_files(
                dbconn,
                meta["speciesid"],
                meta["platformid"],
                meta["probe-dataset-id"],
                Path(meta["rqtl2-bundle-file"]))
            if phenoexit != 0:
                raise Exception("Processing 'pheno' file failed.")
            logger.debug(
                "pheno file processing completed successfully. (ExitCode: %s)",
                phenoexit)
            rconn.hset(str(jobid), "pheno-percent", "100")

        return 0
    except jobs.JobNotFound as exc:
        logger.error("%s", exc.args)
    except Exception:#pylint: disable=[broad-except]
        # Top-level boundary of the script: log the full traceback and fall
        # through to the failure exit code instead of crashing.
        logger.error("Exiting with generic error: %s", traceback.format_exc())

    return 1


if __name__ == "__main__":
    def main():
        """Run the `process_rqtl2_bundle` script.

        Parses the command line, checks the database and Redis connections,
        processes the bundle and returns the exit code from
        `process_bundle`.
        """
        # NOTE(review): the parser is named "upload_rqtl2_bundle" although
        # this script is `process_rqtl2_bundle` -- confirm whether intended.
        args = init_cli_parser(
            "upload_rqtl2_bundle",
            "Launch this to control the processing of R/qtl2 zip bundles."
        ).parse_args()
        check_db(args.databaseuri)
        check_redis(args.redisuri)

        jobid = args.jobid
        with (database_connection(args.databaseuri) as dbconn,
              Redis.from_url(args.redisuri, decode_responses=True) as rconn):
            logger.addHandler(setup_redis_logger(
                rconn, jobid, f"{str(jobid)}:log-messages", args.redisexpiry))

            exitcode = process_bundle(dbconn, rconn, jobid)
            # "percent" is set to 100 regardless of `exitcode`, i.e. also
            # when processing failed.
            rconn.hset(str(jobid), "percent", "100")
            return exitcode

    sys.exit(main())