From 1fda6924b4ac792e4fea42179f8e2242c1cd6dd5 Mon Sep 17 00:00:00 2001 From: Frederick Muriuki Muriithi Date: Tue, 13 Feb 2024 13:06:29 +0300 Subject: Add some parallelism to the QC Checks --- scripts/qc_on_rqtl2_bundle.py | 41 +++++++++++++++++++++++++++++++++++------ 1 file changed, 35 insertions(+), 6 deletions(-) (limited to 'scripts') diff --git a/scripts/qc_on_rqtl2_bundle.py b/scripts/qc_on_rqtl2_bundle.py index b5b2059..dd5e73e 100644 --- a/scripts/qc_on_rqtl2_bundle.py +++ b/scripts/qc_on_rqtl2_bundle.py @@ -1,9 +1,11 @@ """Run Quality Control checks on R/qtl2 bundle.""" import sys import json +from time import sleep from zipfile import ZipFile from functools import partial from argparse import Namespace +import multiprocessing as mproc from logging import Logger, getLogger, StreamHandler from typing import Union, Sequence, Callable, Iterator @@ -180,12 +182,39 @@ def run_qc(rconn: Redis, if qc_missing_files(rconn, fqjobid, zfile, logger): return 1 - return ( - 1 if any(( - qc_geno_errors(rconn, fqjobid, zfile, logger), - qc_pheno_errors(rconn, fqjobid, zfile, logger), - qc_phenose_errors(rconn, fqjobid, zfile, logger), - qc_phenocovar_errors(rconn, fqjobid, zfile, logger))) + def with_zipfile(rconn, fqjobid, filename, logger, func): + with ZipFile(filename, "r") as zfile: + return func(rconn, fqjobid, zfile, logger) + + def buildargs(func): + return (rconn, fqjobid, jobmeta["rqtl2-bundle-file"], logger, func) + processes = [ + mproc.Process(target=with_zipfile, args=buildargs(qc_geno_errors,)), + mproc.Process(target=with_zipfile, args=buildargs(qc_pheno_errors,)), + mproc.Process(target=with_zipfile, args=buildargs(qc_phenose_errors,)), + mproc.Process(target=with_zipfile, args=buildargs(qc_phenocovar_errors,)) + ] + for process in processes: + process.start() + + while True: + processes_running = any( + (process.is_alive() for process in processes)) + if not processes_running: + break + sleep(2) + + if any((process.exitcode for process in processes)): + # at least one process failed for some reason... + return 1 + + def __fetch_errors__(rkey: str) -> tuple: + return tuple(json.loads(rconn.hget(fqjobid, rkey) or "[]")) + + return (1 if any(( + bool(__fetch_errors__(key)) + for key in + ("errors-geno", "errors-pheno", "errors-phenos", "errors-phenocovar"))) else 0) if __name__ == "__main__": -- cgit v1.2.3