diff options
author | Frederick Muriuki Muriithi | 2024-02-13 13:06:29 +0300 |
---|---|---|
committer | Frederick Muriuki Muriithi | 2024-02-13 13:06:29 +0300 |
commit | 1fda6924b4ac792e4fea42179f8e2242c1cd6dd5 (patch) | |
tree | 516ffd417017e62bc951b5b02b8efb1b5f1f5f0d | |
parent | 971d1383aa81947a1d43725150bcfa6eceec24f0 (diff) | |
download | gn-uploader-1fda6924b4ac792e4fea42179f8e2242c1cd6dd5.tar.gz |
Add some parallelism to the QC Checks
-rw-r--r-- | scripts/qc_on_rqtl2_bundle.py | 41 |
1 file changed, 35 insertions, 6 deletions
diff --git a/scripts/qc_on_rqtl2_bundle.py b/scripts/qc_on_rqtl2_bundle.py index b5b2059..dd5e73e 100644 --- a/scripts/qc_on_rqtl2_bundle.py +++ b/scripts/qc_on_rqtl2_bundle.py @@ -1,9 +1,11 @@ """Run Quality Control checks on R/qtl2 bundle.""" import sys import json +from time import sleep from zipfile import ZipFile from functools import partial from argparse import Namespace +import multiprocessing as mproc from logging import Logger, getLogger, StreamHandler from typing import Union, Sequence, Callable, Iterator @@ -180,12 +182,39 @@ def run_qc(rconn: Redis, if qc_missing_files(rconn, fqjobid, zfile, logger): return 1 - return ( - 1 if any(( - qc_geno_errors(rconn, fqjobid, zfile, logger), - qc_pheno_errors(rconn, fqjobid, zfile, logger), - qc_phenose_errors(rconn, fqjobid, zfile, logger), - qc_phenocovar_errors(rconn, fqjobid, zfile, logger))) + def with_zipfile(rconn, fqjobid, filename, logger, func): + with ZipFile(filename, "r") as zfile: + return func(rconn, fqjobid, zfile, logger) + + def buildargs(func): + return (rconn, fqjobid, jobmeta["rqtl2-bundle-file"], logger, func) + processes = [ + mproc.Process(target=with_zipfile, args=buildargs(qc_geno_errors,)), + mproc.Process(target=with_zipfile, args=buildargs(qc_pheno_errors,)), + mproc.Process(target=with_zipfile, args=buildargs(qc_phenose_errors,)), + mproc.Process(target=with_zipfile, args=buildargs(qc_phenocovar_errors,)) + ] + for process in processes: + process.start() + + while True: + processes_running = any( + (process.is_alive() for process in processes)) + if not processes_running: + break + sleep(2) + + if any((process.exitcode for process in processes)): + # at least one process failed for some reason... 
+ return 1 + + def __fetch_errors__(rkey: str) -> tuple: + return tuple(json.loads(rconn.hget(fqjobid, rkey) or "[]")) + + return (1 if any(( + bool(__fetch_errors__(key)) + for key in + ("errors-geno", "errors-pheno", "errors-phenos", "errors-phenocovar"))) else 0) if __name__ == "__main__": |