"""Entry point for processing R/qtl2 bundles."""
import sys
import uuid
import json
import argparse
import traceback
from typing import Any
from pathlib import Path
from zipfile import ZipFile
from logging import Logger, getLogger, StreamHandler

import MySQLdb as mdb
from redis import Redis

from functional_tools import take

import r_qtl.r_qtl2 as rqtl2
import r_qtl.r_qtl2_qc as rqc
import r_qtl.exceptions as rqe

from uploader import jobs
from uploader.db_utils import database_connection
from uploader.check_connections import check_db, check_redis

from scripts.cli_parser import init_cli_parser
from scripts.redis_logger import setup_redis_logger
from scripts.rqtl2.install_phenos import install_pheno_files
from scripts.rqtl2.install_genotypes import install_genotypes


def safe_json_decode(value: str) -> Any:
    """Decode `value` as JSON, falling back to `value` itself.

    Returns the decoded object when `value` is valid JSON. When decoding
    fails -- either because the text is not valid JSON, or because `value`
    is not something `json.loads` accepts at all (e.g. `None`) -- the
    original `value` is returned unchanged rather than raising.
    """
    try:
        return json.loads(value)
    except (json.decoder.JSONDecodeError, TypeError):
        # `TypeError` covers non-string inputs such as `None`.
        return value


def parse_job(rconn: Redis, rprefix: str, jobid: uuid.UUID) -> dict:
    """Load the job from Redis and JSON-decode each of its values.

    The job is fetched with `jobs.job(rconn, rprefix, jobid)`; every value
    in the resulting mapping is passed through `safe_json_decode`, so
    values that are not valid JSON are kept as they were stored.
    """
    return {
        key: safe_json_decode(value)
        for key, value in jobs.job(rconn, rprefix, jobid).items()
    }


def _bundle_metadata(job: dict) -> dict:
    """Return the job's "bundle-metadata" mapping, or `{}` if unusable.

    After `safe_json_decode` the value may be something other than a dict
    (for example `None` from JSON `null`, or a plain string that failed to
    decode); those cases are treated as "no metadata".
    """
    meta = job.get("bundle-metadata")
    return meta if isinstance(meta, dict) else {}


def has_geno_file(job: dict) -> bool:
    """Check whether to trigger processing of geno file(s).

    True when the job's bundle metadata has a truthy "geno-dataset-id".
    """
    return bool(_bundle_metadata(job).get("geno-dataset-id"))


def has_pheno_file(job: dict) -> bool:
    """Check whether to trigger processing of pheno file(s).

    True when the job's bundle metadata has truthy values for both
    "probe-study-id" and "probe-dataset-id".
    """
    meta = _bundle_metadata(job)
    return (bool(meta.get("probe-study-id"))
            and bool(meta.get("probe-dataset-id")))


def percent_completion(geno: float, pheno: float) -> float:
    """Compute the total completion percent.

    The result is the mean of the `geno` and `pheno` percentages.
    """
    return 0.5 * (geno + pheno)


def qc_r_qtl2_bundle(bundlefilepath, logger):
    """Run QC checks on the R/qtl2 bundle.

    Opens the zip file at `bundlefilepath`, validates it with
    `rqc.validate_bundle` and, if the control data has a "geno" entry,
    logs up to 10 of the errors yielded by `rqc.geno_errors`.

    Raises:
        rqe.InvalidFormat: if at least one 'geno' error was found.
        Any exception raised by `rqc.validate_bundle` or
        `rqtl2.control_data` propagates unchanged.
    """
    with ZipFile(bundlefilepath, "r") as zfile:
        logger.info("Validating the bundle ...")
        rqc.validate_bundle(zfile)
        logger.info(
            "Bundle successfully validated. All listed files are present.")

        cdata = rqtl2.control_data(zfile)
        if "geno" in cdata:
            gerr = False
            logger.info("Validating 'geno' file.")
            # Only the first 10 errors are reported to keep the log short.
            for error in take(rqc.geno_errors(zfile), 10):
                gerr = True
                logger.error("%s: [Line %s, Field %s]",
                             error[2], error[0], error[1])
            if gerr:
                # Logged whenever any error was found, even if fewer than
                # 10 were reported above.
                logger.error("... more")
                raise rqe.InvalidFormat(
                    "'geno' file content contains errors.")
            logger.info("'geno' file validation was successful.")


def process_bundle(dbconn: mdb.Connection,
                   rconn: Redis,
                   rprefix: str,
                   jobid: uuid.UUID,
                   logger: Logger) -> int:
    """Process the R/qtl2 bundle.

    Loads the job from Redis, runs the QC checks on the bundle file named
    in the job's "bundle-metadata", then installs the genotype and/or
    phenotype data depending on `has_geno_file` / `has_pheno_file`. The
    "geno-percent" and "pheno-percent" fields of the job hash are set to
    "0" before installation starts and to "100" as each part completes.

    Returns:
        0 on success; 1 if the job was not found or any other exception
        occurred (the error is logged, not re-raised).
    """
    try:
        thejob = parse_job(rconn, rprefix, jobid)
        meta = thejob["bundle-metadata"]
        qc_r_qtl2_bundle(meta["rqtl2-bundle-file"], logger)

        jobkey = jobs.job_key(rprefix, jobid)
        rconn.hset(jobkey, "geno-percent", "0")
        rconn.hset(jobkey, "pheno-percent", "0")

        if has_geno_file(thejob):
            logger.info("Processing geno files.")
            genoexit = install_genotypes(
                dbconn,
                argparse.Namespace(
                    speciesid=meta["speciesid"],
                    populationid=meta["populationid"],
                    datasetid=meta["geno-dataset-id"],
                    rqtl2bundle=Path(meta["rqtl2-bundle-file"])),
                logger)
            if genoexit != 0:
                raise Exception("Processing 'geno' file failed.")
            logger.debug(
                "geno file processing completed successfully. (ExitCode: %s)",
                genoexit)
            rconn.hset(jobkey, "geno-percent", "100")

        if has_pheno_file(thejob):
            logger.info("Processing pheno files.")
            phenoexit = install_pheno_files(
                dbconn,
                argparse.Namespace(
                    speciesid=meta["speciesid"],
                    platformid=meta["platformid"],
                    dataset_id=meta["probe-dataset-id"],
                    rqtl2bundle=Path(meta["rqtl2-bundle-file"])),
                logger)
            if phenoexit != 0:
                raise Exception("Processing 'pheno' file failed.")
            logger.debug(
                "pheno file processing completed successfully. (ExitCode: %s)",
                phenoexit)
            rconn.hset(jobkey, "pheno-percent", "100")

        return 0
    except jobs.JobNotFound as exc:
        logger.error("%s", exc.args)
    except Exception:#pylint: disable=[broad-except]
        # Top-level boundary: log the full traceback and report failure
        # through the exit code instead of crashing the script.
        logger.error("Exiting with generic error: %s", traceback.format_exc())

    return 1


if __name__ == "__main__":
    def main():
        """Run the `process_rqtl2_bundle` script.

        Parses the CLI arguments, checks the database and Redis
        connections, sets up logging to stderr and to Redis, processes
        the bundle and returns the exit code from `process_bundle`. The
        job's "percent" field is set to "100" whatever the exit code is.
        """
        args = init_cli_parser(
            "upload_rqtl2_bundle",
            "Launch this to control the processing of R/qtl2 zip bundles."
        ).parse_args()
        check_db(args.databaseuri)
        check_redis(args.redisuri)

        logger = getLogger("process_rqtl2_bundle")
        logger.addHandler(StreamHandler(stream=sys.stderr))
        logger.setLevel("DEBUG")

        fqjobid = jobs.job_key(args.redisprefix, args.jobid)
        with (database_connection(args.databaseuri) as dbconn,
              Redis.from_url(args.redisuri, decode_responses=True) as rconn):
            logger.addHandler(setup_redis_logger(
                rconn, fqjobid, f"{fqjobid}:log-messages", args.redisexpiry))

            exitcode = process_bundle(
                dbconn, rconn, args.redisprefix, args.jobid, logger)
            rconn.hset(fqjobid, "percent", "100")
            return exitcode

    sys.exit(main())