"""Entry point for processing R/qtl2 bundles."""
import sys
import uuid
import json
import logging
import traceback
from typing import Any
from pathlib import Path

import MySQLdb as mdb
from redis import Redis

from qc_app import jobs
from qc_app.db_utils import database_connection
from qc_app.check_connections import check_db, check_redis

from scripts.cli_parser import init_cli_parser
from scripts.redis_logger import setup_redis_logger
from scripts.rqtl2.install_phenos import install_pheno_files
from scripts.rqtl2.install_genotypes import install_genotypes

# Log to stderr by default; `main` additionally attaches a Redis-backed
# handler once a connection is available.
stderr_handler = logging.StreamHandler(stream=sys.stderr)
logger = logging.getLogger("process_rqtl2_bundle")
logger.setLevel("DEBUG")
logger.addHandler(stderr_handler)


def safe_json_decode(value: str) -> Any:
    """Decode `value` as JSON, falling back to the value itself.

    Returns the decoded object when `value` is a valid JSON document.
    Returns `value` unchanged when it is not valid JSON, or when it is not
    a type `json.loads` accepts (e.g. `None`), so the helper never raises
    on unexpected field values.
    """
    try:
        return json.loads(value)
    except (TypeError, json.JSONDecodeError):
        return value


def parse_job(rconn: Redis, jobid: uuid.UUID) -> dict:
    """Load the job identified by `jobid` and JSON-decode each of its values.

    Fields whose values are not valid JSON are kept as they were fetched.
    Exceptions raised by `jobs.job` are propagated to the caller.
    """
    raw_job = jobs.job(rconn, jobid)
    return {field: safe_json_decode(raw) for field, raw in raw_job.items()}


def has_geno_file(job: dict) -> bool:
    """Check whether to trigger processing of geno file(s).

    True when the job's "bundle-metadata" has a truthy "geno-dataset-id".
    """
    metadata = job.get("bundle-metadata", {})
    return bool(metadata.get("geno-dataset-id"))


def has_pheno_file(job: dict) -> bool:
    """Check whether to trigger processing of pheno file(s).

    True only when the job's "bundle-metadata" has truthy values for both
    "probe-study-id" and "probe-dataset-id".
    """
    metadata = job.get("bundle-metadata", {})
    return all(
        bool(metadata.get(key))
        for key in ("probe-study-id", "probe-dataset-id"))


def percent_completion(geno: float, pheno: float) -> float:
    """Compute the total completion percent.

    The result is the arithmetic mean of the `geno` and `pheno` percentages.
    """
    return 0.5 * (geno + pheno)


def process_bundle(dbconn: mdb.Connection, rconn: Redis, jobid: uuid.UUID) -> int:
    """Process the R/qtl2 bundle.

    Loads the job from Redis, resets the "geno-percent" and "pheno-percent"
    fields on the job's hash to "0", then runs the genotype and/or phenotype
    installers depending on what the bundle metadata declares. Each step sets
    its percent field to "100" on success.

    Returns 0 when every requested step succeeded, and 1 if the job could not
    be found or any error occurred. Errors are logged rather than raised.
    """
    try:
        thejob = parse_job(rconn, jobid)
        meta = thejob["bundle-metadata"]
        job_key = str(jobid)
        rconn.hset(job_key, "geno-percent", "0")
        rconn.hset(job_key, "pheno-percent", "0")

        if has_geno_file(thejob):
            logger.info("Processing geno files.")
            genoexit = install_genotypes(
                dbconn,
                meta["speciesid"],
                meta["populationid"],
                meta["geno-dataset-id"],
                Path(meta["rqtl2-bundle-file"]))
            if genoexit != 0:
                # Caught by the generic handler below, which logs the
                # traceback and lets the function return 1.
                raise Exception("Processing 'geno' file failed.")
            logger.debug(
                "geno file processing completed successfully. (ExitCode: %s)",
                genoexit)
            rconn.hset(job_key, "geno-percent", "100")

        if has_pheno_file(thejob):
            # Mirror the geno branch so the job log shows when this step starts.
            logger.info("Processing pheno files.")
            phenoexit = install_pheno_files(
                dbconn,
                meta["speciesid"],
                meta["platformid"],
                meta["probe-dataset-id"],
                Path(meta["rqtl2-bundle-file"]))
            if phenoexit != 0:
                raise Exception("Processing 'pheno' file failed.")
            logger.debug(
                "pheno file processing completed successfully. (ExitCode: %s)",
                phenoexit)
            rconn.hset(job_key, "pheno-percent", "100")

        return 0
    except jobs.JobNotFound as exc:
        logger.error("%s", exc.args)
    except Exception:  # pylint: disable=[broad-except]
        logger.error("Exiting with generic error: %s", traceback.format_exc())

    return 1


if __name__ == "__main__":
    def main():
        """Run the `process_rqtl2_bundle` script.

        Parses the command-line arguments, verifies the database and Redis
        connections, processes the bundle for the given job and returns the
        exit code from `process_bundle`.
        """
        args = init_cli_parser(
            "upload_rqtl2_bundle",
            "Launch this to control the processing of R/qtl2 zip bundles."
        ).parse_args()
        check_db(args.databaseuri)
        check_redis(args.redisuri)

        jobid = args.jobid
        with (database_connection(args.databaseuri) as dbconn,
              Redis.from_url(args.redisuri, decode_responses=True) as rconn):
            # Also send log records to Redis under "<jobid>:log-messages".
            logger.addHandler(setup_redis_logger(
                rconn, jobid, f"{str(jobid)}:log-messages", args.redisexpiry))

            exitcode = process_bundle(dbconn, rconn, jobid)
            # NOTE(review): "percent" is set to "100" regardless of the exit
            # code -- presumably it signals that the job has finished rather
            # than that it succeeded; confirm with whatever reads this field.
            rconn.hset(str(jobid), "percent", "100")
            return exitcode

    sys.exit(main())