"""Run Quality Control checks on R/qtl2 bundle."""
import sys
import json
from typing import Sequence
from zipfile import ZipFile
from argparse import Namespace
from logging import Logger, getLogger, StreamHandler

from redis import Redis

from qc_app import jobs
from qc_app.db_utils import database_connection
from qc_app.check_connections import check_db, check_redis

from r_qtl import r_qtl2_qc as rqc
from r_qtl import fileerrors as rqfe

from scripts.cli_parser import init_cli_parser
from scripts.process_rqtl2_bundle import parse_job
from scripts.redis_logger import setup_redis_logger


def add_to_errors(rconn: Redis,
                  fqjobid: str,
                  key: str,
                  errors: Sequence[rqfe.MissingFile]):
    """Merge the messages of `errors` into the JSON list stored for the job.

    The field `key` of the redis hash `fqjobid` is read as a JSON list (a
    missing/empty field is treated as an empty list), the `message` of each
    item in `errors` is appended, duplicates are dropped, and the result is
    written back to the same field as a JSON array.

    Parameters:
        rconn: The redis connection to read from and write to.
        fqjobid: The name of the redis hash holding the job.
        key: The field within the hash that holds the list of errors.
        errors: The errors whose `message` attributes are to be recorded.
    """
    existing = json.loads(rconn.hget(fqjobid, key) or "[]")
    incoming = [error.message for error in errors]
    # `dict.fromkeys` removes duplicates while keeping first-seen order, so
    # the stored list is stable from run to run (a `set` gives no such
    # guarantee for strings).
    merged = list(dict.fromkeys(existing + incoming))
    rconn.hset(fqjobid, key, json.dumps(merged))


def run_qc(rconn: Redis, args: Namespace, logger: Logger) -> int:
    """Run the QC programs.

    Opens the bundle named by the job's "rqtl2-bundle-file" metadata entry,
    asks `rqc.missing_files` for the files that are missing from it, and
    records a message for each one under the job's "errors-generic" field.

    Parameters:
        rconn: The redis connection holding the job.
        args: Parsed CLI arguments; `redisprefix` and `jobid` are read.
        logger: Logger used to report that files are missing.

    Returns:
        1 if any files are missing from the bundle, otherwise 0.
    """
    fqjobid = jobs.job_key(args.redisprefix, args.jobid)
    thejob = parse_job(rconn, args.redisprefix, args.jobid)
    meta = thejob["job-metadata"]

    with ZipFile(meta["rqtl2-bundle-file"], "r") as zfile:
        missing = rqc.missing_files(zfile)

    # Each item in `missing` is indexed as (control-file key, file name) by
    # the message below. The field is written even when nothing is missing.
    add_to_errors(rconn, fqjobid, "errors-generic", tuple(
        rqfe.MissingFile(
            mfile[0], mfile[1], (
                f"File '{mfile[1]}' is listed in the control file under "
                f"the '{mfile[0]}' key, but it does not actually exist in "
                "the bundle."))
        for mfile in missing))

    if missing:
        logger.error("Missing files in the bundle!")
        return 1

    return 0


if __name__ == "__main__":
    def main():
        """Enter R/qtl2 bundle QC runner.

        Parses the CLI arguments, checks the redis and database connections,
        sets up logging to stderr and to redis, runs the QC and stores the
        resulting exit code in the job's "exitcode" field.

        Returns:
            The exit code produced by `run_qc`.
        """
        args = init_cli_parser(
            "qc-on-rqtl2-bundle", "Run QC on R/qtl2 bundle.").parse_args()
        check_redis(args.redisuri)
        check_db(args.databaseuri)

        logger = getLogger("qc-on-rqtl2-bundle")
        logger.addHandler(StreamHandler(stream=sys.stderr))
        logger.setLevel("DEBUG")

        fqjobid = jobs.job_key(args.redisprefix, args.jobid)
        with (database_connection(args.databaseuri) as _dbconn,
              Redis.from_url(args.redisuri, decode_responses=True) as rconn):
            logger.addHandler(setup_redis_logger(
                rconn, fqjobid, f"{fqjobid}:log-messages",
                args.redisexpiry))

            exitcode = run_qc(rconn, args, logger)
            # Reuse the key computed above rather than rebuilding it.
            rconn.hset(fqjobid, "exitcode", exitcode)
            return exitcode

    sys.exit(main())