1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
|
"""Entry point for processing R/qtl2 bundles."""
import sys
import uuid
import json
import logging
import traceback
from typing import Any
from pathlib import Path
import MySQLdb as mdb
from redis import Redis
from qc_app import jobs
from qc_app.db_utils import database_connection
from qc_app.check_connections import check_db, check_redis
from scripts.cli_parser import init_cli_parser
from scripts.redis_logger import setup_redis_logger
from scripts.rqtl2.install_phenos import install_pheno_files
from scripts.rqtl2.install_genotypes import install_genotypes
# Module logger for this script: emits every record from DEBUG upwards and
# echoes it to standard error through `stderr_handler`.
logger = logging.getLogger("process_rqtl2_bundle")
stderr_handler = logging.StreamHandler(stream=sys.stderr)
logger.addHandler(stderr_handler)
logger.setLevel("DEBUG")
def safe_json_decode(value: str) -> Any:
    """Decode `value` as JSON, returning it unchanged if it is not valid JSON.

    Only `json.decoder.JSONDecodeError` is treated as "not JSON"; any other
    error raised by `json.loads` propagates to the caller.
    """
    try:
        decoded = json.loads(value)
    except json.decoder.JSONDecodeError:
        # Not JSON: keep the raw string as-is.
        decoded = value
    return decoded
def parse_job(rconn: Redis, jobid: uuid.UUID) -> dict:
    """Fetch the job `jobid` via `jobs.job` and JSON-decode each of its values.

    Values that are not valid JSON are kept as the raw strings (see
    `safe_json_decode`). Any exception raised by `jobs.job` propagates.
    """
    raw_fields = jobs.job(rconn, jobid)
    return {
        field_name: safe_json_decode(raw_value)
        for field_name, raw_value in raw_fields.items()
    }
def has_geno_file(job: dict) -> bool:
    """Return True when the job's bundle metadata has a truthy "geno-dataset-id".

    A job without "bundle-metadata" is treated as having no genotype file.
    """
    metadata = job.get("bundle-metadata", {})
    if metadata.get("geno-dataset-id"):
        return True
    return False
def has_pheno_file(job: dict) -> bool:
    """Return True when the bundle metadata has both a probe study and dataset.

    Both "probe-study-id" and "probe-dataset-id" must be present and truthy;
    a job without "bundle-metadata" is treated as having no phenotype file.
    """
    metadata = job.get("bundle-metadata", {})
    required_keys = ("probe-study-id", "probe-dataset-id")
    return all(bool(metadata.get(key)) for key in required_keys)
def percent_completion(geno: float, pheno: float) -> float:
    """Return the overall completion: the mean of the geno and pheno percents."""
    combined = geno + pheno
    return combined * 0.5
def process_bundle(dbconn: mdb.Connection, rconn: Redis, jobid: uuid.UUID) -> int:
    """Process the R/qtl2 bundle for the job identified by `jobid`.

    Loads the job (via `parse_job`) and reads its "bundle-metadata". The job's
    "geno-percent" and "pheno-percent" fields are first reset to "0". Then:

    * if `has_geno_file` holds, the genotypes are installed with
      `install_genotypes` and "geno-percent" is set to "100" on success;
    * if `has_pheno_file` holds, the phenotype files are installed with
      `install_pheno_files` and "pheno-percent" is set to "100" on success.

    Returns:
        0 when every applicable step succeeded (also when no step applies),
        1 when the job is missing, an install step returns a non-zero exit
        code, or any other exception occurs. Failures are logged, not raised.
    """
    jobkey = str(jobid)
    try:
        thejob = parse_job(rconn, jobid)
        meta = thejob["bundle-metadata"]
        logger.debug("The metadata: %s", meta)
        rconn.hset(jobkey, "geno-percent", "0")
        rconn.hset(jobkey, "pheno-percent", "0")

        if has_geno_file(thejob):
            logger.info("Processing geno files.")
            genoexit = install_genotypes(
                dbconn,
                meta["speciesid"],
                meta["populationid"],
                meta["geno-dataset-id"],
                Path(meta["rqtl2-bundle-file"]))
            if genoexit != 0:
                # A failed step is an expected outcome: report its exit code
                # directly rather than raising into the generic handler, which
                # would log a misleading traceback and lose the code.
                logger.error(
                    "Processing 'geno' file failed. (ExitCode: %s)", genoexit)
                return 1
            logger.debug(
                "geno file processing completed successfully. (ExitCode: %s)",
                genoexit)
            rconn.hset(jobkey, "geno-percent", "100")

        if has_pheno_file(thejob):
            logger.info("Processing pheno files.")
            phenoexit = install_pheno_files(
                dbconn,
                meta["speciesid"],
                meta["platformid"],
                meta["probe-dataset-id"],
                Path(meta["rqtl2-bundle-file"]))
            if phenoexit != 0:
                logger.error(
                    "Processing 'pheno' file failed. (ExitCode: %s)",
                    phenoexit)
                return 1
            logger.debug(
                "pheno file processing completed successfully. (ExitCode: %s)",
                phenoexit)
            rconn.hset(jobkey, "pheno-percent", "100")

        return 0
    except jobs.JobNotFound as exc:
        # Log the exception's message rather than its raw `args` tuple.
        logger.error("%s", exc)
    except Exception:#pylint: disable=[broad-except]
        # Top-level boundary of the script: record the full traceback.
        logger.error("Exiting with generic error: %s", traceback.format_exc())

    return 1
def main():
    """Run the `process_rqtl2_bundle` script.

    Parses the command-line arguments, calls `check_db` and `check_redis` on
    the configured URIs, and opens the database and Redis connections. It then
    attaches a Redis-backed log handler for the job, processes the bundle, and
    sets the job's "percent" field to "100".

    Returns:
        The exit code returned by `process_bundle`.
    """
    args = init_cli_parser(
        "upload_rqtl2_bundle",
        "Launch this to control the processing of R/qtl2 zip bundles."
    ).parse_args()
    check_db(args.databaseuri)
    check_redis(args.redisuri)

    jobid = args.jobid
    with (database_connection(args.databaseuri) as dbconn,
          Redis.from_url(args.redisuri, decode_responses=True) as rconn):
        logger.addHandler(setup_redis_logger(
            rconn, jobid, f"{str(jobid)}:log-messages", args.redisexpiry))

        exitcode = process_bundle(dbconn, rconn, jobid)

        # NOTE(review): "percent" is set to "100" whether or not processing
        # succeeded; it seems to mark the job as finished rather than
        # successful. Confirm with consumers of this field.
        rconn.hset(str(jobid), "percent", "100")
        return exitcode


if __name__ == "__main__":
    sys.exit(main())
|