aboutsummaryrefslogtreecommitdiff
path: root/scripts/process_rqtl2_bundle.py
blob: 4efc3e0744e7fa84edf686bbb4594de59ca2918f (about) (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
"""Entry point for processing R/qtl2 bundles."""
import sys
import uuid
import json
import argparse
import traceback
from typing import Any
from pathlib import Path
from zipfile import ZipFile
from logging import Logger, getLogger, StreamHandler

import MySQLdb as mdb
from redis import Redis
from gn_libs.mysqldb import database_connection

from functional_tools import take

import r_qtl.r_qtl2 as rqtl2
import r_qtl.r_qtl2_qc as rqc
import r_qtl.exceptions as rqe

from uploader import jobs
from uploader.check_connections import check_db, check_redis

from scripts.cli_parser import init_cli_parser
from scripts.redis_logger import setup_redis_logger

from scripts.rqtl2.install_phenos import install_pheno_files
from scripts.rqtl2.install_genotypes import install_genotypes

def safe_json_decode(value: str) -> Any:
    """Decode `value` as JSON, or return it unchanged if it is not valid JSON.

    Args:
        value: The text to decode.

    Returns:
        The decoded Python object, or `value` itself when decoding fails
        with a JSON decode error.
    """
    try:
        decoded = json.loads(value)
    except json.JSONDecodeError:
        # Not JSON text: hand the original value back untouched.
        return value
    return decoded

def parse_job(rconn: Redis, rprefix: str, jobid: uuid.UUID) -> dict:
    """Fetch job `jobid` from Redis and JSON-decode each of its fields.

    Fields whose values are not valid JSON are kept as-is
    (see `safe_json_decode`).
    """
    raw_job = jobs.job(rconn, rprefix, jobid)
    return dict(
        (field, safe_json_decode(raw_value))
        for field, raw_value in raw_job.items())

def has_geno_file(job: dict) -> bool:
    """Return True when the job's bundle metadata names a genotype dataset.

    A missing "bundle-metadata" entry, or a missing/falsy "geno-dataset-id",
    means there is no genotype processing to trigger.
    """
    metadata = job.get("bundle-metadata", {})
    geno_dataset = metadata.get("geno-dataset-id")
    return bool(geno_dataset)

def has_pheno_file(job: dict) -> bool:
    """Return True when the job's bundle metadata names a probe study AND dataset.

    Both "probe-study-id" and "probe-dataset-id" must be present and truthy
    in the job's "bundle-metadata" for phenotype processing to run.
    """
    metadata = job.get("bundle-metadata", {})
    required_keys = ("probe-study-id", "probe-dataset-id")
    return all(bool(metadata.get(key)) for key in required_keys)

def percent_completion(geno: float, pheno: float) -> float:
    """Return the overall completion as the mean of the two stage percentages.

    Args:
        geno: Completion percentage of the genotype stage.
        pheno: Completion percentage of the phenotype stage.
    """
    combined = geno + pheno
    return combined * 0.5

def qc_r_qtl2_bundle(bundlefilepath, logger):
    """Run quality-control checks on the R/qtl2 bundle at `bundlefilepath`.

    Opens the bundle as a zip archive and validates it with
    `rqc.validate_bundle`. If the bundle's control data has a "geno" entry,
    errors from `rqc.geno_errors` are logged (capped via `take(..., 10)`).

    Raises:
        rqe.InvalidFormat: if at least one 'geno' error was reported.
        Exceptions from `rqc.validate_bundle` / `rqtl2.control_data`
        propagate unchanged.
    """
    with ZipFile(bundlefilepath, "r") as bundle:
        logger.info("Validating the bundle ...")
        rqc.validate_bundle(bundle)
        logger.info(
            "Bundle successfully validated. All listed files are present.")

        # Nothing more to check when the bundle declares no 'geno' file.
        if "geno" not in rqtl2.control_data(bundle):
            return

        found_errors = False
        logger.info("Validating 'geno' file.")
        # `take(..., 10)` presumably yields at most the first 10 errors.
        for err in take(rqc.geno_errors(bundle), 10):
            found_errors = True
            logger.error("%s: [Line %s, Field %s]", err[2], err[0], err[1])
        if found_errors:
            # NOTE(review): "... more" is logged whenever any error exists,
            # even if fewer than 10 were found.
            logger.error("... more")
            raise rqe.InvalidFormat("'geno' file content contains errors.")
        logger.info("'geno' file validation was successful.")

def process_bundle(dbconn: mdb.Connection,
                   rconn: Redis,
                   rprefix: str,
                   jobid: uuid.UUID,
                   logger: Logger) -> int:
    """Process the R/qtl2 bundle attached to the job `jobid`.

    Loads the job from Redis and runs QC on the bundle file named in its
    "bundle-metadata". It then installs genotypes (when `has_geno_file`)
    and/or phenotypes (when `has_pheno_file`). Progress is written to the
    job's Redis hash as the "geno-percent" and "pheno-percent" fields. Both
    are set to "0" before installation, and each is set to "100" once its
    stage succeeds.

    Args:
        dbconn: Open MySQL connection handed to the installers.
        rconn: Redis connection holding the job.
        rprefix: Redis key prefix under which jobs are stored.
        jobid: Identifier of the job to process.
        logger: Logger receiving progress and error messages.

    Returns:
        0 on success; 1 if any `Exception` was raised. Errors are logged
        rather than propagated.
    """
    try:
        thejob = parse_job(rconn, rprefix, jobid)
        meta = thejob["bundle-metadata"]
        qc_r_qtl2_bundle(meta["rqtl2-bundle-file"], logger)

        jobkey = jobs.job_key(rprefix, jobid)
        bundlepath = Path(meta["rqtl2-bundle-file"])
        rconn.hset(jobkey, "geno-percent", "0")
        rconn.hset(jobkey, "pheno-percent", "0")

        if has_geno_file(thejob):
            logger.info("Processing geno files.")
            # NOTE(review): the genotype installer gets `datasetid` while the
            # phenotype installer gets `dataset_id`; presumably each matches
            # what its installer reads — confirm.
            genoexit = install_genotypes(
                dbconn,
                argparse.Namespace(
                    speciesid=meta["speciesid"],
                    populationid=meta["populationid"],
                    datasetid=meta["geno-dataset-id"],
                    rqtl2bundle=bundlepath),
                logger)
            if genoexit != 0:
                # Caught by the generic handler below, which logs and returns 1.
                raise Exception("Processing 'geno' file failed.")
            logger.debug(
                "geno file processing completed successfully. (ExitCode: %s)",
                genoexit)
            rconn.hset(jobkey, "geno-percent", "100")

        if has_pheno_file(thejob):
            logger.info("Processing pheno files.")
            phenoexit = install_pheno_files(
                dbconn,
                argparse.Namespace(
                    speciesid=meta["speciesid"],
                    platformid=meta["platformid"],
                    dataset_id=meta["probe-dataset-id"],
                    rqtl2bundle=bundlepath),
                logger)
            if phenoexit != 0:
                # Caught by the generic handler below, which logs and returns 1.
                raise Exception("Processing 'pheno' file failed.")
            logger.debug(
                "pheno file processing completed successfully. (ExitCode: %s)",
                phenoexit)
            rconn.hset(jobkey, "pheno-percent", "100")

        return 0
    except jobs.JobNotFound as exc:
        # Log the exception's message, not its raw `args` tuple.
        logger.error("%s", exc)
    except Exception:#pylint: disable=[broad-except]
        logger.error("Exiting with generic error: %s", traceback.format_exc())

    return 1

if __name__ == "__main__":
    def main():
        """Command-line entry point for processing an R/qtl2 bundle job.

        Parses the CLI arguments and checks the database and Redis URIs. It
        then configures a DEBUG-level logger writing to stderr and runs
        `process_bundle` for the given job. Afterwards it sets the job's
        "percent" field to "100", whatever the outcome.

        Returns:
            The exit code produced by `process_bundle`.
        """
        # NOTE(review): the parser is labelled "upload_rqtl2_bundle" although
        # this script processes bundles; confirm the name is intended.
        cli_args = init_cli_parser(
            "upload_rqtl2_bundle",
            "Launch this to control the processing of R/qtl2 zip bundles."
        ).parse_args()
        check_db(cli_args.databaseuri)
        check_redis(cli_args.redisuri)

        logger = getLogger("process_rqtl2_bundle")
        logger.addHandler(StreamHandler(stream=sys.stderr))
        logger.setLevel("DEBUG")

        fqjobid = jobs.job_key(cli_args.redisprefix, cli_args.jobid)
        with (database_connection(cli_args.databaseuri) as dbconn,
              Redis.from_url(cli_args.redisuri, decode_responses=True) as rconn):
            # Extra handler from `setup_redis_logger`; presumably it stores log
            # records under "<job key>:log-messages" in Redis — confirm.
            redis_handler = setup_redis_logger(
                rconn, fqjobid, f"{fqjobid}:log-messages",
                cli_args.redisexpiry)
            logger.addHandler(redis_handler)

            exitcode = process_bundle(
                dbconn, rconn, cli_args.redisprefix, cli_args.jobid, logger)
            # The job is marked fully complete whether or not processing
            # succeeded; the exit code carries the outcome.
            rconn.hset(
                jobs.job_key(cli_args.redisprefix, cli_args.jobid),
                "percent",
                "100")
            return exitcode

    sys.exit(main())