"""Run Quality Control checks on R/qtl2 bundle.""" import sys import json from pathlib import Path from zipfile import ZipFile from argparse import Namespace from typing import Union, Sequence from logging import Logger, getLogger, StreamHandler from redis import Redis from qc_app import jobs from qc_app.db_utils import database_connection from qc_app.check_connections import check_db, check_redis from r_qtl import r_qtl2_qc as rqc from r_qtl import fileerrors as rqfe from scripts.cli_parser import init_cli_parser from scripts.process_rqtl2_bundle import parse_job from scripts.redis_logger import setup_redis_logger def add_to_errors(rconn: Redis, fqjobid: str, key: str, errors: Sequence[rqfe.MissingFile]): """Add `errors` to a given list of errors""" errs = tuple(set( json.loads(rconn.hget(fqjobid, key) or "[]") + [error.message for error in errors])) rconn.hset(fqjobid, key, json.dumps(errs)) def qc_missing_files(rconn: Redis, fqjobid: str, bundlefilepath: Union[str, Path]) -> tuple[ tuple[str, str], ...]: """Run QC for files listed in control file that don't exist in bundle.""" with ZipFile(str(bundlefilepath), "r") as zfile: missing = rqc.missing_files(zfile) add_to_errors(rconn, fqjobid, "errors-generic", tuple( rqfe.MissingFile( mfile[0], mfile[1], ( f"File '{mfile[1]}' is listed in the control file under " f"the '{mfile[0]}' key, but it does not actually exist in " "the bundle.")) for mfile in missing)) return missing def run_qc(rconn: Redis, args: Namespace, logger: Logger) -> int: """Run the QC programs.""" fqjobid = jobs.job_key(args.redisprefix, args.jobid) thejob = parse_job(rconn, args.redisprefix, args.jobid) jobmeta = thejob["job-metadata"] if len(qc_missing_files(rconn, fqjobid, jobmeta["rqtl2-bundle-file"])) > 0: logger.error("Missing files in the bundle!") return 1 return 0 if __name__ == "__main__": def main(): """Enter R/qtl2 bundle QC runner.""" args = init_cli_parser( "qc-on-rqtl2-bundle", "Run QC on R/qtl2 bundle.").parse_args() check_redis(args.redisuri) check_db(args.databaseuri) logger = getLogger("qc-on-rqtl2-bundle") logger.addHandler(StreamHandler(stream=sys.stderr)) logger.setLevel("DEBUG") fqjobid = jobs.job_key(args.redisprefix, args.jobid) with (database_connection(args.databaseuri) as _dbconn, Redis.from_url(args.redisuri, decode_responses=True) as rconn): logger.addHandler(setup_redis_logger( rconn, fqjobid, f"{fqjobid}:log-messages", args.redisexpiry)) exitcode = run_qc(rconn, args, logger) rconn.hset( jobs.job_key(args.redisprefix, args.jobid), "exitcode", exitcode) return exitcode sys.exit(main())