about summary refs log tree commit diff
path: root/scripts/process_rqtl2_bundle.py
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/process_rqtl2_bundle.py')
-rw-r--r--scripts/process_rqtl2_bundle.py123
1 files changed, 123 insertions, 0 deletions
diff --git a/scripts/process_rqtl2_bundle.py b/scripts/process_rqtl2_bundle.py
new file mode 100644
index 0000000..105f787
--- /dev/null
+++ b/scripts/process_rqtl2_bundle.py
@@ -0,0 +1,123 @@
+"""Entry point for processing R/qtl2 bundles."""
+import sys
+import uuid
+import json
+import logging
+import traceback
+from typing import Any
+from pathlib import Path
+
+import MySQLdb as mdb
+from redis import Redis
+
+from qc_app import jobs
+from qc_app.db_utils import database_connection
+from qc_app.check_connections import check_db, check_redis
+
+from scripts.cli_parser import init_cli_parser
+from scripts.redis_logger import setup_redis_logger
+
+from scripts.rqtl2.install_phenos import install_pheno_files
+from scripts.rqtl2.install_genotypes import install_genotypes
+
+stderr_handler = logging.StreamHandler(stream=sys.stderr)
+logger = logging.getLogger("process_rqtl2_bundle")
+logger.setLevel("DEBUG")
+logger.addHandler(stderr_handler)
+
+def safe_json_decode(value: str) -> Any:
+    """Safely decode the string values into JSON."""
+    try:
+        return json.loads(value)
+    except json.decoder.JSONDecodeError:
+        return value
+
+def parse_job(rconn: Redis, jobid: uuid.UUID) -> dict:
+    """Load and parse job from Redis"""
+    return {
+        key: safe_json_decode(value)
+            for key, value
+        in jobs.job(rconn, jobid).items()
+    }
+
+def has_geno_file(job: dict) -> bool:
+    """Check whether to trigger processing of geno file(s)."""
+    return bool(job.get("bundle-metadata", {}).get("geno-dataset-id"))
+
+def has_pheno_file(job: dict) -> bool:
+    """Check whether to trigger processing of pheno file(s)."""
+    meta = job.get("bundle-metadata", {})
+    return (bool(meta.get("probe-study-id"))
+            and bool(meta.get("probe-dataset-id")))
+
+def percent_completion(geno: float, pheno: float) -> float:
+    """Compute the total completion percent."""
+    return 0.5 * (geno + pheno)
+
+def process_bundle(dbconn: mdb.Connection, rconn: Redis, jobid: uuid.UUID) -> int:
+    """Process the R/qtl2 bundle."""
+    try:
+        thejob = parse_job(rconn, jobid)
+        meta = thejob["bundle-metadata"]
+        logger.debug("The metadata: %s", meta)
+        rconn.hset(str(jobid), "geno-percent", "0")
+        rconn.hset(str(jobid), "pheno-percent", "0")
+
+        if has_geno_file(thejob):
+            logger.info("Processing geno files.")
+            genoexit = install_genotypes(
+                dbconn,
+                meta["speciesid"],
+                meta["populationid"],
+                meta["geno-dataset-id"],
+                Path(meta["rqtl2-bundle-file"]))
+            if genoexit != 0:
+                raise Exception("Processing 'geno' file failed.")
+            logger.debug(
+                "geno file processing completed successfully. (ExitCode: %s)",
+                genoexit)
+            rconn.hset(str(jobid), "geno-percent", "100")
+
+        if has_pheno_file(thejob):
+            phenoexit = install_pheno_files(
+                dbconn,
+                meta["speciesid"],
+                meta["platformid"],
+                meta["probe-dataset-id"],
+                Path(meta["rqtl2-bundle-file"]))
+            if phenoexit != 0:
+                raise Exception("Processing 'pheno' file failed.")
+            logger.debug(
+                "pheno file processing completed successfully. (ExitCode: %s)",
+                phenoexit)
+            rconn.hset(str(jobid), "pheno-percent", "100")
+
+        return 0
+    except jobs.JobNotFound as exc:
+        logger.error("%s", exc.args)
+    except Exception as _exc:#pylint: disable=[broad-except]
+        logger.error("Exiting with generic error: %s", traceback.format_exc())
+
+    return 1
+
+if __name__ == "__main__":
+    def main():
+        """Run the `process_rqtl2_bundle` script."""
+        args = init_cli_parser(
+            "upload_rqtl2_bundle",
+            "Launch this to control the processing of R/qtl2 zip bundles."
+        ).parse_args()
+        check_db(args.databaseuri)
+        check_redis(args.redisuri)
+
+        jobid = args.jobid
+        with (database_connection(args.databaseuri) as dbconn,
+              Redis.from_url(args.redisuri, decode_responses=True) as rconn):
+            logger.addHandler(setup_redis_logger(
+                rconn, jobid, f"{str(jobid)}:log-messages", args.redisexpiry))
+
+            exitcode = process_bundle(dbconn, rconn, args.jobid)
+            rconn.hset(str(args.jobid), "percent", "100")
+            return exitcode
+
+    sys.exit(main())