about summary refs log tree commit diff
path: root/scripts
diff options
context:
space:
mode:
author: Frederick Muriuki Muriithi, 2024-02-13 13:06:29 +0300
committer: Frederick Muriuki Muriithi, 2024-02-13 13:06:29 +0300
commit: 1fda6924b4ac792e4fea42179f8e2242c1cd6dd5 (patch)
tree: 516ffd417017e62bc951b5b02b8efb1b5f1f5f0d /scripts
parent: 971d1383aa81947a1d43725150bcfa6eceec24f0 (diff)
download: gn-uploader-1fda6924b4ac792e4fea42179f8e2242c1cd6dd5.tar.gz
Add some parallelism to the QC Checks
Diffstat (limited to 'scripts')
-rw-r--r-- scripts/qc_on_rqtl2_bundle.py 41
1 file changed, 35 insertions, 6 deletions
diff --git a/scripts/qc_on_rqtl2_bundle.py b/scripts/qc_on_rqtl2_bundle.py
index b5b2059..dd5e73e 100644
--- a/scripts/qc_on_rqtl2_bundle.py
+++ b/scripts/qc_on_rqtl2_bundle.py
@@ -1,9 +1,11 @@
 """Run Quality Control checks on R/qtl2 bundle."""
 import sys
 import json
+from time import sleep
 from zipfile import ZipFile
 from functools import partial
 from argparse import Namespace
+import multiprocessing as mproc
 from logging import Logger, getLogger, StreamHandler
 from typing import Union, Sequence, Callable, Iterator
 
@@ -180,12 +182,39 @@ def run_qc(rconn: Redis,
         if qc_missing_files(rconn, fqjobid, zfile, logger):
             return 1
 
-        return (
-            1 if any((
-                qc_geno_errors(rconn, fqjobid, zfile, logger),
-                qc_pheno_errors(rconn, fqjobid, zfile, logger),
-                qc_phenose_errors(rconn, fqjobid, zfile, logger),
-                qc_phenocovar_errors(rconn, fqjobid, zfile, logger)))
+    def with_zipfile(rconn, fqjobid, filename, logger, func):
+        with ZipFile(filename, "r") as zfile:
+            return func(rconn, fqjobid, zfile, logger)
+
+    def buildargs(func):
+        return (rconn, fqjobid, jobmeta["rqtl2-bundle-file"], logger, func)
+    processes = [
+        mproc.Process(target=with_zipfile, args=buildargs(qc_geno_errors,)),
+        mproc.Process(target=with_zipfile, args=buildargs(qc_pheno_errors,)),
+        mproc.Process(target=with_zipfile, args=buildargs(qc_phenose_errors,)),
+        mproc.Process(target=with_zipfile, args=buildargs(qc_phenocovar_errors,))
+    ]
+    for process in processes:
+        process.start()
+
+    while True:
+        processes_running = any(
+            (process.is_alive() for process in processes))
+        if not processes_running:
+            break
+        sleep(2)
+
+    if any((process.exitcode for process in processes)):
+        # at least one process failed for some reason...
+        return 1
+
+    def __fetch_errors__(rkey: str) -> tuple:
+        return tuple(json.loads(rconn.hget(fqjobid, rkey) or "[]"))
+
+    return (1 if any((
+        bool(__fetch_errors__(key))
+        for key in
+        ("errors-geno", "errors-pheno", "errors-phenos", "errors-phenocovar")))
             else 0)
 
 if __name__ == "__main__":