aboutsummaryrefslogtreecommitdiff
path: root/scripts/process_rqtl2_bundle.py
diff options
context:
space:
mode:
authorFrederick Muriuki Muriithi2024-01-19 08:29:04 +0300
committerFrederick Muriuki Muriithi2024-01-19 08:29:04 +0300
commit53b1e7cb181380a24aab4cbc7a9634b2d8dd2d29 (patch)
tree284a170506ba6557f8a163b3464487346cf40b9c /scripts/process_rqtl2_bundle.py
parent028e84f8a179f43e092cfb35975ef30d47aca82a (diff)
downloadgn-uploader-53b1e7cb181380a24aab4cbc7a9634b2d8dd2d29.tar.gz
scripts: Process R/qtl2 bundle
Build script to start the processing of the R/qtl2 bundle.
Diffstat (limited to 'scripts/process_rqtl2_bundle.py')
-rw-r--r--scripts/process_rqtl2_bundle.py123
1 files changed, 123 insertions, 0 deletions
diff --git a/scripts/process_rqtl2_bundle.py b/scripts/process_rqtl2_bundle.py
new file mode 100644
index 0000000..105f787
--- /dev/null
+++ b/scripts/process_rqtl2_bundle.py
@@ -0,0 +1,123 @@
+"""Entry point for processing R/qtl2 bundles."""
+import sys
+import uuid
+import json
+import logging
+import traceback
+from typing import Any
+from pathlib import Path
+
+import MySQLdb as mdb
+from redis import Redis
+
+from qc_app import jobs
+from qc_app.db_utils import database_connection
+from qc_app.check_connections import check_db, check_redis
+
+from scripts.cli_parser import init_cli_parser
+from scripts.redis_logger import setup_redis_logger
+
+from scripts.rqtl2.install_phenos import install_pheno_files
+from scripts.rqtl2.install_genotypes import install_genotypes
+
+stderr_handler = logging.StreamHandler(stream=sys.stderr)
+logger = logging.getLogger("process_rqtl2_bundle")
+logger.setLevel("DEBUG")
+logger.addHandler(stderr_handler)
+
+def safe_json_decode(value: str) -> Any:
+ """Safely decode the string values into JSON."""
+ try:
+ return json.loads(value)
+ except json.decoder.JSONDecodeError:
+ return value
+
+def parse_job(rconn: Redis, jobid: uuid.UUID) -> dict:
+ """Load and parse job from Redis"""
+ return {
+ key: safe_json_decode(value)
+ for key, value
+ in jobs.job(rconn, jobid).items()
+ }
+
+def has_geno_file(job: dict) -> bool:
+ """Check whether to trigger processing of geno file(s)."""
+ return bool(job.get("bundle-metadata", {}).get("geno-dataset-id"))
+
+def has_pheno_file(job: dict) -> bool:
+ """Check whether to trigger processing of pheno file(s)."""
+ meta = job.get("bundle-metadata", {})
+ return (bool(meta.get("probe-study-id"))
+ and bool(meta.get("probe-dataset-id")))
+
+def percent_completion(geno: float, pheno: float) -> float:
+ """Compute the total completion percent."""
+ return 0.5 * (geno + pheno)
+
+def process_bundle(dbconn: mdb.Connection, rconn: Redis, jobid: uuid.UUID) -> int:
+ """Process the R/qtl2 bundle."""
+ try:
+ thejob = parse_job(rconn, jobid)
+ meta = thejob["bundle-metadata"]
+ logger.debug("The metadata: %s", meta)
+ rconn.hset(str(jobid), "geno-percent", "0")
+ rconn.hset(str(jobid), "pheno-percent", "0")
+
+ if has_geno_file(thejob):
+ logger.info("Processing geno files.")
+ genoexit = install_genotypes(
+ dbconn,
+ meta["speciesid"],
+ meta["populationid"],
+ meta["geno-dataset-id"],
+ Path(meta["rqtl2-bundle-file"]))
+ if genoexit != 0:
+ raise Exception("Processing 'geno' file failed.")
+ logger.debug(
+ "geno file processing completed successfully. (ExitCode: %s)",
+ genoexit)
+ rconn.hset(str(jobid), "geno-percent", "100")
+
+ if has_pheno_file(thejob):
+ phenoexit = install_pheno_files(
+ dbconn,
+ meta["speciesid"],
+ meta["platformid"],
+ meta["probe-dataset-id"],
+ Path(meta["rqtl2-bundle-file"]))
+ if phenoexit != 0:
+ raise Exception("Processing 'pheno' file failed.")
+ logger.debug(
+ "pheno file processing completed successfully. (ExitCode: %s)",
+ phenoexit)
+ rconn.hset(str(jobid), "pheno-percent", "100")
+
+ return 0
+ except jobs.JobNotFound as exc:
+ logger.error("%s", exc.args)
+ except Exception as _exc:#pylint: disable=[broad-except]
+ logger.error("Exiting with generic error: %s", traceback.format_exc())
+
+ return 1
+
+if __name__ == "__main__":
+ def main():
+ """Run the `process_rqtl2_bundle` script."""
+ args = init_cli_parser(
+ "upload_rqtl2_bundle",
+ "Launch this to control the processing of R/qtl2 zip bundles."
+ ).parse_args()
+ check_db(args.databaseuri)
+ check_redis(args.redisuri)
+
+ jobid = args.jobid
+ with (database_connection(args.databaseuri) as dbconn,
+ Redis.from_url(args.redisuri, decode_responses=True) as rconn):
+ logger.addHandler(setup_redis_logger(
+ rconn, jobid, f"{str(jobid)}:log-messages", args.redisexpiry))
+
+ exitcode = process_bundle(dbconn, rconn, args.jobid)
+ rconn.hset(str(args.jobid), "percent", "100")
+ return exitcode
+
+ sys.exit(main())