"""Entry point for processing R/qtl2 bundles.""" import sys import uuid import json import traceback from typing import Any from pathlib import Path from logging import Logger, getLogger, StreamHandler import MySQLdb as mdb from redis import Redis from qc_app import jobs from qc_app.db_utils import database_connection from qc_app.check_connections import check_db, check_redis from scripts.cli_parser import init_cli_parser from scripts.redis_logger import setup_redis_logger from scripts.rqtl2.install_phenos import install_pheno_files from scripts.rqtl2.install_genotypes import install_genotypes def safe_json_decode(value: str) -> Any: """Safely decode the string values into JSON.""" try: return json.loads(value) except json.decoder.JSONDecodeError: return value def parse_job(rconn: Redis, jobid: uuid.UUID) -> dict: """Load and parse job from Redis""" return { key: safe_json_decode(value) for key, value in jobs.job(rconn, jobid).items() } def has_geno_file(job: dict) -> bool: """Check whether to trigger processing of geno file(s).""" return bool(job.get("bundle-metadata", {}).get("geno-dataset-id")) def has_pheno_file(job: dict) -> bool: """Check whether to trigger processing of pheno file(s).""" meta = job.get("bundle-metadata", {}) return (bool(meta.get("probe-study-id")) and bool(meta.get("probe-dataset-id"))) def percent_completion(geno: float, pheno: float) -> float: """Compute the total completion percent.""" return 0.5 * (geno + pheno) def process_bundle(dbconn: mdb.Connection, rconn: Redis, jobid: uuid.UUID, logger: Logger) -> int: """Process the R/qtl2 bundle.""" try: thejob = parse_job(rconn, jobid) meta = thejob["bundle-metadata"] rconn.hset(str(jobid), "geno-percent", "0") rconn.hset(str(jobid), "pheno-percent", "0") if has_geno_file(thejob): logger.info("Processing geno files.") genoexit = install_genotypes( dbconn, meta["speciesid"], meta["populationid"], meta["geno-dataset-id"], Path(meta["rqtl2-bundle-file"]), logger) if genoexit != 0: raise Exception("Processing 'geno' file failed.") logger.debug( "geno file processing completed successfully. (ExitCode: %s)", genoexit) rconn.hset(str(jobid), "geno-percent", "100") if has_pheno_file(thejob): phenoexit = install_pheno_files( dbconn, meta["speciesid"], meta["platformid"], meta["probe-dataset-id"], Path(meta["rqtl2-bundle-file"]), logger) if phenoexit != 0: raise Exception("Processing 'pheno' file failed.") logger.debug( "pheno file processing completed successfully. (ExitCode: %s)", phenoexit) rconn.hset(str(jobid), "pheno-percent", "100") return 0 except jobs.JobNotFound as exc: logger.error("%s", exc.args) except Exception as _exc:#pylint: disable=[broad-except] logger.error("Exiting with generic error: %s", traceback.format_exc()) return 1 if __name__ == "__main__": def main(): """Run the `process_rqtl2_bundle` script.""" args = init_cli_parser( "upload_rqtl2_bundle", "Launch this to control the processing of R/qtl2 zip bundles." ).parse_args() check_db(args.databaseuri) check_redis(args.redisuri) logger = getLogger("process_rqtl2_bundle") logger.addHandler(StreamHandler(stream=sys.stderr)) logger.setLevel("DEBUG") jobid = args.jobid with (database_connection(args.databaseuri) as dbconn, Redis.from_url(args.redisuri, decode_responses=True) as rconn): logger.addHandler(setup_redis_logger( rconn, jobid, f"{str(jobid)}:log-messages", args.redisexpiry)) exitcode = process_bundle(dbconn, rconn, args.jobid, logger) rconn.hset(str(args.jobid), "percent", "100") return exitcode sys.exit(main())