aboutsummaryrefslogtreecommitdiff
path: root/scripts/process_rqtl2_bundle.py
blob: 105f787c7cc92a93aab5f0314a18fe3466935263 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
"""Entry point for processing R/qtl2 bundles."""
import sys
import uuid
import json
import logging
import traceback
from typing import Any
from pathlib import Path

import MySQLdb as mdb
from redis import Redis

from qc_app import jobs
from qc_app.db_utils import database_connection
from qc_app.check_connections import check_db, check_redis

from scripts.cli_parser import init_cli_parser
from scripts.redis_logger import setup_redis_logger

from scripts.rqtl2.install_phenos import install_pheno_files
from scripts.rqtl2.install_genotypes import install_genotypes

# Module logger for this script: every record at DEBUG level and above is
# echoed to standard error. Further handlers may be attached at runtime.
logger = logging.getLogger("process_rqtl2_bundle")
logger.setLevel(logging.DEBUG)
stderr_handler = logging.StreamHandler(stream=sys.stderr)
logger.addHandler(stderr_handler)

def safe_json_decode(value: str) -> Any:
    """Decode `value` as JSON, falling back to the raw string.

    Strings that are not valid JSON are returned unchanged instead of
    raising `json.JSONDecodeError`.
    """
    try:
        decoded = json.loads(value)
    except json.JSONDecodeError:
        # Not JSON (e.g. a plain string field): keep the original text.
        decoded = value
    return decoded

def parse_job(rconn: Redis, jobid: uuid.UUID) -> dict:
    """Fetch job `jobid` from Redis and JSON-decode each of its fields.

    Fields that are not valid JSON are kept as their raw values (see
    `safe_json_decode`).
    """
    raw_fields = jobs.job(rconn, jobid)
    decoded_job = {}
    for field_name, raw_value in raw_fields.items():
        decoded_job[field_name] = safe_json_decode(raw_value)
    return decoded_job

def has_geno_file(job: dict) -> bool:
    """Return True when the job's bundle metadata names a genotype dataset.

    A missing "bundle-metadata" entry, a missing "geno-dataset-id" key, or a
    falsy id value all yield False.
    """
    metadata = job.get("bundle-metadata", {})
    dataset_id = metadata.get("geno-dataset-id")
    return True if dataset_id else False

def has_pheno_file(job: dict) -> bool:
    """Return True when the bundle metadata names both a probe study and a
    probe dataset; either being missing or falsy yields False."""
    metadata = job.get("bundle-metadata", {})
    required_fields = ("probe-study-id", "probe-dataset-id")
    return all(metadata.get(field) for field in required_fields)

def percent_completion(geno: float, pheno: float) -> float:
    """Return overall completion as the mean of the geno and pheno percents."""
    combined = geno + pheno
    return combined * 0.5

def process_bundle(dbconn: mdb.Connection, rconn: Redis, jobid: uuid.UUID) -> int:
    """Process the R/qtl2 bundle described by job `jobid`.

    Loads the job from Redis and reads its "bundle-metadata". It then
    installs genotype data (when `has_geno_file` is true) and phenotype data
    (when `has_pheno_file` is true) from the bundle file named in that
    metadata. Progress is written to the job's Redis hash as the
    "geno-percent" and "pheno-percent" fields: "0" at the start, and "100"
    for each part that completes.

    Parameters:
        dbconn: Database connection handed to the installer functions.
        rconn: Redis connection holding the job. The fields are presumably
            `str` (the `__main__` caller uses `decode_responses=True`).
        jobid: Identifier of the job to process.

    Returns:
        0 on success; 1 if the job is missing or any step fails. Errors are
        logged, not raised.
    """
    jobkey = str(jobid)
    try:
        thejob = parse_job(rconn, jobid)
        meta = thejob["bundle-metadata"]
        logger.debug("The metadata: %s", meta)
        # Initialise progress for both parts before any work starts.
        rconn.hset(jobkey, "geno-percent", "0")
        rconn.hset(jobkey, "pheno-percent", "0")

        if has_geno_file(thejob):
            logger.info("Processing geno files.")
            genoexit = install_genotypes(
                dbconn,
                meta["speciesid"],
                meta["populationid"],
                meta["geno-dataset-id"],
                Path(meta["rqtl2-bundle-file"]))
            if genoexit != 0:
                # Caught by the generic handler below, which logs the traceback.
                raise Exception("Processing 'geno' file failed.")
            logger.debug(
                "geno file processing completed successfully. (ExitCode: %s)",
                genoexit)
            rconn.hset(jobkey, "geno-percent", "100")

        if has_pheno_file(thejob):
            logger.info("Processing pheno files.")
            phenoexit = install_pheno_files(
                dbconn,
                meta["speciesid"],
                meta["platformid"],
                meta["probe-dataset-id"],
                Path(meta["rqtl2-bundle-file"]))
            if phenoexit != 0:
                # Caught by the generic handler below, which logs the traceback.
                raise Exception("Processing 'pheno' file failed.")
            logger.debug(
                "pheno file processing completed successfully. (ExitCode: %s)",
                phenoexit)
            rconn.hset(jobkey, "pheno-percent", "100")

        return 0
    except jobs.JobNotFound as exc:
        # Log the exception's message rather than the raw `args` tuple repr.
        logger.error("%s", exc)
    except Exception as _exc:#pylint: disable=[broad-except]
        # Top-level boundary: record the full traceback and report failure
        # through the exit code instead of propagating.
        logger.error("Exiting with generic error: %s", traceback.format_exc())

    return 1

if __name__ == "__main__":
    def main():
        """Run the `process_rqtl2_bundle` script.

        Parses the command line and runs `check_db`/`check_redis` on the given
        URIs. It then opens the database and Redis connections, attaches a
        Redis log handler for the job, and processes the bundle. Finally it
        sets the job's "percent" field to "100".

        Returns the exit code from `process_bundle` (0 on success, 1 on
        failure).
        """
        # NOTE(review): the parser is given the name "upload_rqtl2_bundle",
        # but this script is `process_rqtl2_bundle`. Confirm which is intended.
        args = init_cli_parser(
            "upload_rqtl2_bundle",
            "Launch this to control the processing of R/qtl2 zip bundles."
        ).parse_args()
        check_db(args.databaseuri)
        check_redis(args.redisuri)

        jobid = args.jobid
        jobkey = str(jobid)
        with (database_connection(args.databaseuri) as dbconn,
              Redis.from_url(args.redisuri, decode_responses=True) as rconn):
            # Presumably forwards this logger's records to the
            # "<jobid>:log-messages" Redis key. See `setup_redis_logger`.
            logger.addHandler(setup_redis_logger(
                rconn, jobid, f"{jobkey}:log-messages", args.redisexpiry))

            exitcode = process_bundle(dbconn, rconn, jobid)
            # Set unconditionally: "percent" reaches "100" whether processing
            # succeeded or failed. The exit code carries the outcome.
            rconn.hset(jobkey, "percent", "100")
            return exitcode

    sys.exit(main())