aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrederick Muriuki Muriithi2024-02-13 13:06:29 +0300
committerFrederick Muriuki Muriithi2024-02-13 13:06:29 +0300
commit1fda6924b4ac792e4fea42179f8e2242c1cd6dd5 (patch)
tree516ffd417017e62bc951b5b02b8efb1b5f1f5f0d
parent971d1383aa81947a1d43725150bcfa6eceec24f0 (diff)
downloadgn-uploader-1fda6924b4ac792e4fea42179f8e2242c1cd6dd5.tar.gz
Add some parallelism to the QC Checks
-rw-r--r--scripts/qc_on_rqtl2_bundle.py41
1 files changed, 35 insertions, 6 deletions
diff --git a/scripts/qc_on_rqtl2_bundle.py b/scripts/qc_on_rqtl2_bundle.py
index b5b2059..dd5e73e 100644
--- a/scripts/qc_on_rqtl2_bundle.py
+++ b/scripts/qc_on_rqtl2_bundle.py
@@ -1,9 +1,11 @@
"""Run Quality Control checks on R/qtl2 bundle."""
import sys
import json
+from time import sleep
from zipfile import ZipFile
from functools import partial
from argparse import Namespace
+import multiprocessing as mproc
from logging import Logger, getLogger, StreamHandler
from typing import Union, Sequence, Callable, Iterator
@@ -180,12 +182,39 @@ def run_qc(rconn: Redis,
if qc_missing_files(rconn, fqjobid, zfile, logger):
return 1
- return (
- 1 if any((
- qc_geno_errors(rconn, fqjobid, zfile, logger),
- qc_pheno_errors(rconn, fqjobid, zfile, logger),
- qc_phenose_errors(rconn, fqjobid, zfile, logger),
- qc_phenocovar_errors(rconn, fqjobid, zfile, logger)))
+ def with_zipfile(rconn, fqjobid, filename, logger, func):
+ with ZipFile(filename, "r") as zfile:
+ return func(rconn, fqjobid, zfile, logger)
+
+ def buildargs(func):
+ return (rconn, fqjobid, jobmeta["rqtl2-bundle-file"], logger, func)
+ processes = [
+ mproc.Process(target=with_zipfile, args=buildargs(qc_geno_errors,)),
+ mproc.Process(target=with_zipfile, args=buildargs(qc_pheno_errors,)),
+ mproc.Process(target=with_zipfile, args=buildargs(qc_phenose_errors,)),
+ mproc.Process(target=with_zipfile, args=buildargs(qc_phenocovar_errors,))
+ ]
+ for process in processes:
+ process.start()
+
+ while True:
+ processes_running = any(
+ (process.is_alive() for process in processes))
+ if not processes_running:
+ break
+ sleep(2)
+
+ if any((process.exitcode for process in processes)):
+ # at least one process failed for some reason...
+ return 1
+
+ def __fetch_errors__(rkey: str) -> tuple:
+ return tuple(json.loads(rconn.hget(fqjobid, rkey) or "[]"))
+
+ return (1 if any((
+ bool(__fetch_errors__(key))
+ for key in
+ ("errors-geno", "errors-pheno", "errors-phenos", "errors-phenocovar")))
else 0)
if __name__ == "__main__":