From 53b1e7cb181380a24aab4cbc7a9634b2d8dd2d29 Mon Sep 17 00:00:00 2001 From: Frederick Muriuki Muriithi Date: Fri, 19 Jan 2024 08:29:04 +0300 Subject: scripts: Process R/qtl2 bundle Build script to start the processing of the R/qtl2 bundle. --- qc_app/jobs.py | 15 ++++- qc_app/upload/rqtl2.py | 44 ++++++++++++- scripts/cli_parser.py | 22 ++++--- scripts/process_rqtl2_bundle.py | 123 +++++++++++++++++++++++++++++++++++++ scripts/rqtl2/entry.py | 3 +- scripts/rqtl2/install_genotypes.py | 8 +-- scripts/rqtl2/install_phenos.py | 8 +-- 7 files changed, 200 insertions(+), 23 deletions(-) create mode 100644 scripts/process_rqtl2_bundle.py diff --git a/qc_app/jobs.py b/qc_app/jobs.py index c5bf5e5..a8257a3 100644 --- a/qc_app/jobs.py +++ b/qc_app/jobs.py @@ -3,11 +3,19 @@ import os import sys import shlex import subprocess -from uuid import uuid4 +from typing import Union +from uuid import UUID, uuid4 from datetime import timedelta from redis import Redis +class JobNotFound(Exception): + """Raised if we try to retrieve a non-existent job.""" + +def raise_jobnotfound(jobid: Union[str,UUID]): + """Utility to raise a `JobNotFound`""" + raise JobNotFound(f"Could not retrieve job '{jobid}'.") + def error_filename(job_id, error_dir): "Compute the path of the file where errors will be dumped." 
return f"{error_dir}/job_{job_id}.error" @@ -70,6 +78,7 @@ def launch_job(the_job: dict, redisurl: str, error_dir): return the_job -def job(redis_conn, job_id: str): +def job(redis_conn, job_id: Union[str,UUID]): "Retrieve the job" - return redis_conn.hgetall(job_id) + thejob = redis_conn.hgetall(str(job_id)) or raise_jobnotfound(job_id) + return thejob diff --git a/qc_app/upload/rqtl2.py b/qc_app/upload/rqtl2.py index 7609fa9..7ba90c2 100644 --- a/qc_app/upload/rqtl2.py +++ b/qc_app/upload/rqtl2.py @@ -1,8 +1,12 @@ """Module to handle uploading of R/qtl2 bundles.""" +import sys +import json +from uuid import uuid4 from pathlib import Path from datetime import date from zipfile import ZipFile, is_zipfile +from redis import Redis from MySQLdb.cursors import DictCursor from flask import ( flash, @@ -16,6 +20,7 @@ from flask import ( from r_qtl import r_qtl2 from r_qtl.errors import InvalidFormat +from qc_app import jobs from qc_app.files import save_file, fullpath from qc_app.dbinsert import species as all_species from qc_app.db_utils import with_db_connection, database_connection @@ -521,6 +526,7 @@ def select_dataset_info(species_id: int, population_id: int): return render_template("rqtl2/summary-info.html", species=species, population=population, + rqtl2_bundle_file=thefile.name, geno_dataset=geno_dataset, probe_study=probeset_study, probe_dataset=probeset_dataset) @@ -532,11 +538,47 @@ def select_dataset_info(species_id: int, population_id: int): methods=["POST"]) def confirm_bundle_details(species_id: int, population_id: int): """Confirm the details and trigger R/qtl2 bundle processing...""" - with database_connection(app.config["SQL_URI"]) as conn: + redisuri = app.config["REDIS_URL"] + with (database_connection(app.config["SQL_URI"]) as conn, + Redis.from_url(redisuri, decode_responses=True) as rconn): error = check_errors( conn, "species", "population", "rqtl2_bundle_file", "geno-dataset", "probe-study-id", "probe-dataset-id") if bool(error): return error + 
redis_ttl_seconds = app.config["JOBS_TTL_SECONDS"] + jobid = str(uuid4()) + _job = jobs.launch_job( + jobs.initialise_job( + rconn, + jobid, + [ + sys.executable, "-m", "scripts.process_rqtl2_bundle", + app.config["SQL_URI"], app.config["REDIS_URL"], jobid, + "--redisexpiry", str(redis_ttl_seconds)], + "R/qtl2 Bundle Upload", + redis_ttl_seconds, + { + "bundle-metadata": json.dumps({ + "speciesid": species_id, + "populationid": population_id, + "rqtl2-bundle-file": str(fullpath( + request.form["rqtl2_bundle_file"])), + "geno-dataset-id": request.form.get( + "geno-dataset-id", ""), + "probe-study-id": request.form.get( + "probe-study-id", ""), + "probe-dataset-id": request.form.get( + "probe-dataset-id", ""), + **({ + "platformid": probeset_study_by_id( + conn, + int(request.form["probe-study-id"]))["ChipId"] + } if bool(request.form.get("probe-study-id")) else {}) + }) + }), + redisuri, + f"{app.config['UPLOAD_FOLDER']}/job_errors") + raise NotImplementedError diff --git a/scripts/cli_parser.py b/scripts/cli_parser.py index bceb3f4..e8f030c 100644 --- a/scripts/cli_parser.py +++ b/scripts/cli_parser.py @@ -7,16 +7,20 @@ def init_cli_parser(program: str, description: Optional[str] = None) -> Argument """Initialise the CLI arguments parser.""" parser = ArgumentParser(prog=program, description=description) - parser.add_argument("databaseuri", help="URL to MariaDB") - parser.add_argument("redisuri", help="URL to Redis") - parser.add_argument("jobid", - help="Job ID that this belongs to.", - type=UUID) - parser.add_argument("--redisexpiry", - help="How long to keep any redis keys around.", - type=int, - default=86400) + parser.add_argument( + "databaseuri", type=str, help="URI to connect to MariaDB") + parser.add_argument( + "redisuri", type=str, help="URI to connect to the redis server.") + parser.add_argument("jobid", type=UUID, help="Job ID that this belongs to") + parser.add_argument( + "--redisexpiry", + type=int, + default=86400, + help="How long to keep any redis 
keys around.") + return parser +def add_global_data_arguments(parser: ArgumentParser) -> ArgumentParser: + """Add the global (present in nearly ALL scripts) CLI arguments.""" parser.add_argument("speciesid", type=int, help="Species to which bundle relates.") diff --git a/scripts/process_rqtl2_bundle.py b/scripts/process_rqtl2_bundle.py new file mode 100644 index 0000000..105f787 --- /dev/null +++ b/scripts/process_rqtl2_bundle.py @@ -0,0 +1,123 @@ +"""Entry point for processing R/qtl2 bundles.""" +import sys +import uuid +import json +import logging +import traceback +from typing import Any +from pathlib import Path + +import MySQLdb as mdb +from redis import Redis + +from qc_app import jobs +from qc_app.db_utils import database_connection +from qc_app.check_connections import check_db, check_redis + +from scripts.cli_parser import init_cli_parser +from scripts.redis_logger import setup_redis_logger + +from scripts.rqtl2.install_phenos import install_pheno_files +from scripts.rqtl2.install_genotypes import install_genotypes + +stderr_handler = logging.StreamHandler(stream=sys.stderr) +logger = logging.getLogger("process_rqtl2_bundle") +logger.setLevel("DEBUG") +logger.addHandler(stderr_handler) + +def safe_json_decode(value: str) -> Any: + """Safely decode the string values into JSON.""" + try: + return json.loads(value) + except json.decoder.JSONDecodeError: + return value + +def parse_job(rconn: Redis, jobid: uuid.UUID) -> dict: + """Load and parse job from Redis""" + return { + key: safe_json_decode(value) + for key, value + in jobs.job(rconn, jobid).items() + } + +def has_geno_file(job: dict) -> bool: + """Check whether to trigger processing of geno file(s).""" + return bool(job.get("bundle-metadata", {}).get("geno-dataset-id")) + +def has_pheno_file(job: dict) -> bool: + """Check whether to trigger processing of pheno file(s).""" + meta = job.get("bundle-metadata", {}) + return (bool(meta.get("probe-study-id")) + and bool(meta.get("probe-dataset-id"))) + +def 
percent_completion(geno: float, pheno: float) -> float: + """Compute the total completion percent.""" + return 0.5 * (geno + pheno) + +def process_bundle(dbconn: mdb.Connection, rconn: Redis, jobid: uuid.UUID) -> int: + """Process the R/qtl2 bundle.""" + try: + thejob = parse_job(rconn, jobid) + meta = thejob["bundle-metadata"] + logger.debug("The metadata: %s", meta) + rconn.hset(str(jobid), "geno-percent", "0") + rconn.hset(str(jobid), "pheno-percent", "0") + + if has_geno_file(thejob): + logger.info("Processing geno files.") + genoexit = install_genotypes( + dbconn, + meta["speciesid"], + meta["populationid"], + meta["geno-dataset-id"], + Path(meta["rqtl2-bundle-file"])) + if genoexit != 0: + raise Exception("Processing 'geno' file failed.") + logger.debug( + "geno file processing completed successfully. (ExitCode: %s)", + genoexit) + rconn.hset(str(jobid), "geno-percent", "100") + + if has_pheno_file(thejob): + phenoexit = install_pheno_files( + dbconn, + meta["speciesid"], + meta["platformid"], + meta["probe-dataset-id"], + Path(meta["rqtl2-bundle-file"])) + if phenoexit != 0: + raise Exception("Processing 'pheno' file failed.") + logger.debug( + "pheno file processing completed successfully. (ExitCode: %s)", + phenoexit) + rconn.hset(str(jobid), "pheno-percent", "100") + + return 0 + except jobs.JobNotFound as exc: + logger.error("%s", exc.args) + except Exception as _exc:#pylint: disable=[broad-except] + logger.error("Exiting with generic error: %s", traceback.format_exc()) + + return 1 + +if __name__ == "__main__": + def main(): + """Run the `process_rqtl2_bundle` script.""" + args = init_cli_parser( + "upload_rqtl2_bundle", + "Launch this to control the processing of R/qtl2 zip bundles." 
+ ).parse_args() + check_db(args.databaseuri) + check_redis(args.redisuri) + + jobid = args.jobid + with (database_connection(args.databaseuri) as dbconn, + Redis.from_url(args.redisuri, decode_responses=True) as rconn): + logger.addHandler(setup_redis_logger( + rconn, jobid, f"{str(jobid)}:log-messages", args.redisexpiry)) + + exitcode = process_bundle(dbconn, rconn, args.jobid) + rconn.hset(str(args.jobid), "percent", "100") + return exitcode + + sys.exit(main()) diff --git a/scripts/rqtl2/entry.py b/scripts/rqtl2/entry.py index e2d70a6..eccc19d 100644 --- a/scripts/rqtl2/entry.py +++ b/scripts/rqtl2/entry.py @@ -11,13 +11,12 @@ from qc_app.check_connections import check_db, check_redis from scripts.redis_logger import setup_redis_logger -def build_main(cli_args: Callable[[], Namespace], +def build_main(args: Namespace, run_fn: Callable[[Connection, Namespace], int], logger: Logger, loglevel: str = "INFO") -> Callable[[],int]: """Build a function to be used as an entry-point for scripts.""" def main(): - args = cli_args() check_db(args.databaseuri) check_redis(args.redisuri) if not args.rqtl2bundle.exists(): diff --git a/scripts/rqtl2/install_genotypes.py b/scripts/rqtl2/install_genotypes.py index 77e7163..733ccf9 100644 --- a/scripts/rqtl2/install_genotypes.py +++ b/scripts/rqtl2/install_genotypes.py @@ -15,8 +15,8 @@ from r_qtl import r_qtl2 as rqtl2 from functional_tools import take from scripts.rqtl2.entry import build_main -from scripts.cli_parser import init_cli_parser from scripts.rqtl2.cli_parser import add_common_arguments +from scripts.cli_parser import init_cli_parser, add_global_data_arguments stderr_handler = logging.StreamHandler(stream=sys.stderr) logger = logging.getLogger("install_genotypes") @@ -217,14 +217,14 @@ if __name__ == "__main__": def cli_args(): """Process command-line arguments for install_genotypes""" - parser = add_common_arguments(init_cli_parser( + parser = add_common_arguments(add_global_data_arguments(init_cli_parser( 
"install_genotypes", - "Parse genotypes from R/qtl2 bundle into the database.")) + "Parse genotypes from R/qtl2 bundle into the database."))) return parser.parse_args() main = build_main( - cli_args, + cli_args(), lambda dbconn, args: install_genotypes(dbconn, args.speciesid, args.populationid, diff --git a/scripts/rqtl2/install_phenos.py b/scripts/rqtl2/install_phenos.py index 5c25866..21b5f00 100644 --- a/scripts/rqtl2/install_phenos.py +++ b/scripts/rqtl2/install_phenos.py @@ -10,8 +10,8 @@ import MySQLdb as mdb from MySQLdb.cursors import DictCursor from scripts.rqtl2.entry import build_main -from scripts.cli_parser import init_cli_parser from scripts.rqtl2.cli_parser import add_common_arguments +from scripts.cli_parser import init_cli_parser, add_global_data_arguments from r_qtl import r_qtl2 as rqtl2 @@ -143,9 +143,9 @@ if __name__ == "__main__": def cli_args(): """Process command-line arguments for `install_phenos`""" - parser = init_cli_parser( + parser = add_global_data_arguments(init_cli_parser( "install_genotypes", - "Parse genotypes from R/qtl2 bundle into the database.") + "Parse genotypes from R/qtl2 bundle into the database.")) parser.add_argument( "platformid", @@ -156,7 +156,7 @@ if __name__ == "__main__": return parser.parse_args() main = build_main( - cli_args, + cli_args(), lambda dbconn, args: install_pheno_files(dbconn, args.speciesid, args.platformid, -- cgit v1.2.3