1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
|
"""Run Quality Control checks on R/qtl2 bundle."""
import sys
import json
from typing import Sequence
from zipfile import ZipFile
from argparse import Namespace
from logging import Logger, getLogger, StreamHandler
from redis import Redis
from qc_app import jobs
from qc_app.db_utils import database_connection
from qc_app.check_connections import check_db, check_redis
from r_qtl import r_qtl2_qc as rqc
from r_qtl import fileerrors as rqfe
from scripts.cli_parser import init_cli_parser
from scripts.process_rqtl2_bundle import parse_job
from scripts.redis_logger import setup_redis_logger
def add_to_errors(rconn: Redis, fqjobid: str, key: str, errors: Sequence[rqfe.MissingFile]):
"""Add `errors` to a given list of errors"""
errs = tuple(set(
json.loads(rconn.hget(fqjobid, key) or "[]") +
[error.message for error in errors]))
rconn.hset(fqjobid, key, json.dumps(errs))
def run_qc(rconn: Redis, args: Namespace, logger: Logger) -> int:
"""Run the QC programs."""
fqjobid = jobs.job_key(args.redisprefix, args.jobid)
thejob = parse_job(rconn, args.redisprefix, args.jobid)
meta = thejob["job-metadata"]
with ZipFile(meta["rqtl2-bundle-file"], "r") as zfile:
missing = rqc.missing_files(zfile)
add_to_errors(rconn, fqjobid, "errors-generic", tuple(
rqfe.MissingFile(
mfile[0], mfile[1], (
f"File '{mfile[1]}' is listed in the control file under "
f"the '{mfile[0]}' key, but it does not actually exist in "
"the bundle."))
for mfile in missing))
if len(missing) > 0:
logger.error("Missing files in the bundle!")
return 1
return 0
if __name__ == "__main__":
def main():
"""Enter R/qtl2 bundle QC runner."""
args = init_cli_parser(
"qc-on-rqtl2-bundle", "Run QC on R/qtl2 bundle.").parse_args()
check_redis(args.redisuri)
check_db(args.databaseuri)
logger = getLogger("qc-on-rqtl2-bundle")
logger.addHandler(StreamHandler(stream=sys.stderr))
logger.setLevel("DEBUG")
fqjobid = jobs.job_key(args.redisprefix, args.jobid)
with (database_connection(args.databaseuri) as _dbconn,
Redis.from_url(args.redisuri, decode_responses=True) as rconn):
logger.addHandler(setup_redis_logger(
rconn, fqjobid, f"{fqjobid}:log-messages",
args.redisexpiry))
exitcode = run_qc(rconn, args, logger)
rconn.hset(
jobs.job_key(args.redisprefix, args.jobid), "exitcode", exitcode)
return exitcode
sys.exit(main())
|