1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
|
"""Run Quality Control checks on R/qtl2 bundle."""
import sys
import json
from pathlib import Path
from zipfile import ZipFile
from argparse import Namespace
from typing import Union, Sequence
from logging import Logger, getLogger, StreamHandler
from redis import Redis
from qc_app import jobs
from qc_app.db_utils import database_connection
from qc_app.check_connections import check_db, check_redis
from r_qtl import r_qtl2_qc as rqc
from r_qtl import fileerrors as rqfe
from scripts.cli_parser import init_cli_parser
from scripts.process_rqtl2_bundle import parse_job
from scripts.redis_logger import setup_redis_logger
def add_to_errors(rconn: Redis, fqjobid: str, key: str, errors: Sequence[rqfe.MissingFile]):
    """Merge the messages of `errors` into the JSON list stored under `key`.

    The field `key` of the Redis hash `fqjobid` is read as a JSON-encoded
    list of error messages; a missing or empty field is treated as an empty
    list. The `message` of every item in `errors` is appended, duplicates are
    dropped, and the merged list is written back to the same field.

    Messages keep their first-seen order (already-stored messages first, then
    new ones in the order given), so the stored value is deterministic.
    """
    stored_messages = json.loads(rconn.hget(fqjobid, key) or "[]")
    new_messages = (error.message for error in errors)
    # `dict.fromkeys` removes duplicates while preserving insertion order; a
    # `set` would produce a different ordering from one run to the next
    # because string hashing is randomised per process.
    merged = list(dict.fromkeys((*stored_messages, *new_messages)))
    rconn.hset(fqjobid, key, json.dumps(merged))
def qc_missing_files(rconn: Redis, fqjobid: str,
                     bundlefilepath: Union[str, Path]) -> tuple[
                         tuple[str, str], ...]:
    """Report files named in the control file that are absent from the bundle.

    Opens the zip bundle at `bundlefilepath`, asks `rqc.missing_files` which
    files are missing, and records one `rqfe.MissingFile` error per entry
    under the "errors-generic" field of the job `fqjobid`.

    Returns the value produced by `rqc.missing_files` unchanged. Each entry
    is read as (control-file key, file name).
    """
    with ZipFile(str(bundlefilepath), "r") as bundle:
        missing = rqc.missing_files(bundle)
        file_errors = []
        for entry in missing:
            ctrl_key = entry[0]
            filename = entry[1]
            message = (
                f"File '{filename}' is listed in the control file under "
                f"the '{ctrl_key}' key, but it does not actually exist in "
                "the bundle.")
            file_errors.append(rqfe.MissingFile(ctrl_key, filename, message))
        add_to_errors(rconn, fqjobid, "errors-generic", tuple(file_errors))

    return missing
def qc_geno_errors(_rconn, _fqjobid, _job) -> bool:
    """Check the bundle's `geno` file(s) for errors.

    Placeholder: no checks are implemented yet, so all arguments are ignored
    and the function always reports "no errors" by returning `False`.
    """
    return False
def qc_pheno_errors(_rconn, _fqjobid, _job) -> bool:
    """Check the bundle's `pheno` file(s) for errors.

    Placeholder: no checks are implemented yet, so all arguments are ignored
    and the function always reports "no errors" by returning `False`.
    """
    return False
def qc_phenocovar_errors(_rconn, _fqjobid, _job) -> bool:
    """Check the bundle's `phenocovar` file(s) for errors.

    Placeholder: no checks are implemented yet, so all arguments are ignored
    and the function always reports "no errors" by returning `False`.
    """
    return False
def run_qc(rconn: Redis,
           args: Namespace,
           logger: Logger) -> int:
    """Run every QC step for the job identified by `args`.

    The missing-files check runs first; if it reports anything, an error is
    logged and 1 is returned without running the remaining checks. Otherwise
    the geno, pheno and phenocovar checks are all run, and the result is 1
    if any of them reports errors, else 0.
    """
    fqjobid = jobs.job_key(args.redisprefix, args.jobid)
    thejob = parse_job(rconn, args.redisprefix, args.jobid)
    bundlefile = thejob["job-metadata"]["rqtl2-bundle-file"]

    missing = qc_missing_files(rconn, fqjobid, bundlefile)
    if len(missing) > 0:
        logger.error("Missing files in the bundle!")
        return 1

    # Build the full list first so every check runs, even after one of them
    # has already reported errors.
    outcomes = [
        check(rconn, fqjobid, thejob)
        for check in (qc_geno_errors, qc_pheno_errors, qc_phenocovar_errors)]
    if any(outcomes):
        return 1
    return 0
if __name__ == "__main__":
    def main():
        """Enter R/qtl2 bundle QC runner.

        Parses the command line, verifies the Redis and database connections,
        sets up logging to stderr and to Redis, runs the QC checks, stores
        the resulting exit code in the job's Redis hash under "exitcode" and
        returns it.
        """
        args = init_cli_parser(
            "qc-on-rqtl2-bundle", "Run QC on R/qtl2 bundle.").parse_args()
        check_redis(args.redisuri)
        check_db(args.databaseuri)

        logger = getLogger("qc-on-rqtl2-bundle")
        logger.addHandler(StreamHandler(stream=sys.stderr))
        logger.setLevel("DEBUG")

        # Computed once and reused below so that the log-messages key and the
        # exit-code write always refer to the same job hash.
        fqjobid = jobs.job_key(args.redisprefix, args.jobid)
        with (database_connection(args.databaseuri) as _dbconn,
              Redis.from_url(args.redisuri, decode_responses=True) as rconn):
            logger.addHandler(setup_redis_logger(
                rconn, fqjobid, f"{fqjobid}:log-messages",
                args.redisexpiry))

            exitcode = run_qc(rconn, args, logger)
            rconn.hset(fqjobid, "exitcode", exitcode)
            return exitcode

    sys.exit(main())
|