-rw-r--r--  qc_app/jobs.py                       15
-rw-r--r--  qc_app/upload/rqtl2.py               44
-rw-r--r--  scripts/cli_parser.py                22
-rw-r--r--  scripts/process_rqtl2_bundle.py     123
-rw-r--r--  scripts/rqtl2/entry.py                3
-rw-r--r--  scripts/rqtl2/install_genotypes.py    8
-rw-r--r--  scripts/rqtl2/install_phenos.py       8
7 files changed, 200 insertions(+), 23 deletions(-)
diff --git a/qc_app/jobs.py b/qc_app/jobs.py
index c5bf5e5..a8257a3 100644
--- a/qc_app/jobs.py
+++ b/qc_app/jobs.py
@@ -3,11 +3,19 @@ import os
import sys
import shlex
import subprocess
-from uuid import uuid4
+from typing import Union
+from uuid import UUID, uuid4
from datetime import timedelta
from redis import Redis
+class JobNotFound(Exception):
+ """Raised if we try to retrieve a non-existent job."""
+
+def raise_jobnotfound(jobid: Union[str,UUID]):
+ """Utility to raise a `NoSuchJobError`"""
+ raise JobNotFound(f"Could not retrieve job '{jobid}'.")
+
def error_filename(job_id, error_dir):
"Compute the path of the file where errors will be dumped."
return f"{error_dir}/job_{job_id}.error"
@@ -70,6 +78,7 @@ def launch_job(the_job: dict, redisurl: str, error_dir):
return the_job
-def job(redis_conn, job_id: str):
+def job(redis_conn, job_id: Union[str,UUID]):
"Retrieve the job"
- return redis_conn.hgetall(job_id)
+ thejob = redis_conn.hgetall(str(job_id)) or raise_jobnotfound(job_id)
+ return thejob
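
For context, a minimal sketch of how the reworked `job()` helper behaves for a missing job; the Redis URL is a placeholder and the snippet is illustrative only:

    from redis import Redis
    from qc_app import jobs

    rconn = Redis.from_url("redis://localhost:6379", decode_responses=True)
    try:
        thejob = jobs.job(rconn, "no-such-job-id")
    except jobs.JobNotFound as exc:
        # hgetall() returns an empty dict for missing keys, so the
        # `or raise_jobnotfound(...)` branch raises here.
        print(exc)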
diff --git a/qc_app/upload/rqtl2.py b/qc_app/upload/rqtl2.py
index 7609fa9..7ba90c2 100644
--- a/qc_app/upload/rqtl2.py
+++ b/qc_app/upload/rqtl2.py
@@ -1,8 +1,12 @@
"""Module to handle uploading of R/qtl2 bundles."""
+import sys
+import json
+from uuid import uuid4
from pathlib import Path
from datetime import date
from zipfile import ZipFile, is_zipfile
+from redis import Redis
from MySQLdb.cursors import DictCursor
from flask import (
flash,
@@ -16,6 +20,7 @@ from flask import (
from r_qtl import r_qtl2
from r_qtl.errors import InvalidFormat
+from qc_app import jobs
from qc_app.files import save_file, fullpath
from qc_app.dbinsert import species as all_species
from qc_app.db_utils import with_db_connection, database_connection
@@ -521,6 +526,7 @@ def select_dataset_info(species_id: int, population_id: int):
return render_template("rqtl2/summary-info.html",
species=species,
population=population,
+ rqtl2_bundle_file=thefile.name,
geno_dataset=geno_dataset,
probe_study=probeset_study,
probe_dataset=probeset_dataset)
@@ -532,11 +538,47 @@ def select_dataset_info(species_id: int, population_id: int):
methods=["POST"])
def confirm_bundle_details(species_id: int, population_id: int):
"""Confirm the details and trigger R/qtl2 bundle processing..."""
- with database_connection(app.config["SQL_URI"]) as conn:
+ redisuri = app.config["REDIS_URL"]
+ with (database_connection(app.config["SQL_URI"]) as conn,
+ Redis.from_url(redisuri, decode_responses=True) as rconn):
error = check_errors(
conn, "species", "population", "rqtl2_bundle_file", "geno-dataset",
"probe-study-id", "probe-dataset-id")
if bool(error):
return error
+ redis_ttl_seconds = app.config["JOBS_TTL_SECONDS"]
+ jobid = str(uuid4())
+ _job = jobs.launch_job(
+ jobs.initialise_job(
+ rconn,
+ jobid,
+ [
+ sys.executable, "-m", "scripts.process_rqtl2_bundle",
+ app.config["SQL_URI"], app.config["REDIS_URL"], jobid,
+ "--redisexpiry", str(redis_ttl_seconds)],
+ "R/qtl2 Bundle Upload",
+ redis_ttl_seconds,
+ {
+ "bundle-metadata": json.dumps({
+ "speciesid": species_id,
+ "populationid": population_id,
+ "rqtl2-bundle-file": str(fullpath(
+ request.form["rqtl2_bundle_file"])),
+ "geno-dataset-id": request.form.get(
+ "geno-dataset-id", ""),
+ "probe-study-id": request.form.get(
+ "probe-study-id", ""),
+ "probe-dataset-id": request.form.get(
+ "probe-dataset-id", ""),
+ **({
+ "platformid": probeset_study_by_id(
+ conn,
+ int(request.form["probe-study-id"]))["ChipId"]
+ } if bool(request.form.get("probe-study-id")) else {})
+ })
+ }),
+ redisuri,
+ f"{app.config['UPLOAD_FOLDER']}/job_errors")
+
raise NotImplementedError
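
For illustration, the `bundle-metadata` field stored on the job hash decodes (via `json.loads`) to a mapping along these lines; all values below are placeholders, and `platformid` is only present when a probe study was selected:

    {
        "speciesid": 1,
        "populationid": 1,
        "rqtl2-bundle-file": "/path/to/uploads/bundle.zip",
        "geno-dataset-id": "5",
        "probe-study-id": "3",
        "probe-dataset-id": "7",
        "platformid": 2,
    }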
diff --git a/scripts/cli_parser.py b/scripts/cli_parser.py
index bceb3f4..e8f030c 100644
--- a/scripts/cli_parser.py
+++ b/scripts/cli_parser.py
@@ -7,16 +7,20 @@ def init_cli_parser(program: str, description: Optional[str] = None) -> Argument
"""Initialise the CLI arguments parser."""
parser = ArgumentParser(prog=program, description=description)
- parser.add_argument("databaseuri", help="URL to MariaDB")
- parser.add_argument("redisuri", help="URL to Redis")
- parser.add_argument("jobid",
- help="Job ID that this belongs to.",
- type=UUID)
- parser.add_argument("--redisexpiry",
- help="How long to keep any redis keys around.",
- type=int,
- default=86400)
+ parser.add_argument(
+ "databaseuri", type=str, help="URI to connect to MariaDB")
+ parser.add_argument(
+ "redisuri", type=str, help="URI to connect to the redis server.")
+ parser.add_argument("jobid", type=UUID, help="Job ID that this belongs to")
+ parser.add_argument(
+ "--redisexpiry",
+ type=int,
+ default=86400,
+ help="How long to keep any redis keys around.")
+ return parser
+def add_global_data_arguments(parser: ArgumentParser) -> ArgumentParser:
+ """Add the global (present in nearly ALL scripts) CLI arguments."""
parser.add_argument("speciesid",
type=int,
help="Species to which bundle relates.")
diff --git a/scripts/process_rqtl2_bundle.py b/scripts/process_rqtl2_bundle.py
new file mode 100644
index 0000000..105f787
--- /dev/null
+++ b/scripts/process_rqtl2_bundle.py
@@ -0,0 +1,123 @@
+"""Entry point for processing R/qtl2 bundles."""
+import sys
+import uuid
+import json
+import logging
+import traceback
+from typing import Any
+from pathlib import Path
+
+import MySQLdb as mdb
+from redis import Redis
+
+from qc_app import jobs
+from qc_app.db_utils import database_connection
+from qc_app.check_connections import check_db, check_redis
+
+from scripts.cli_parser import init_cli_parser
+from scripts.redis_logger import setup_redis_logger
+
+from scripts.rqtl2.install_phenos import install_pheno_files
+from scripts.rqtl2.install_genotypes import install_genotypes
+
+stderr_handler = logging.StreamHandler(stream=sys.stderr)
+logger = logging.getLogger("process_rqtl2_bundle")
+logger.setLevel("DEBUG")
+logger.addHandler(stderr_handler)
+
+def safe_json_decode(value: str) -> Any:
+ """Safely decode the string values into JSON."""
+ try:
+ return json.loads(value)
+ except json.decoder.JSONDecodeError:
+ return value
+
+def parse_job(rconn: Redis, jobid: uuid.UUID) -> dict:
+ """Load and parse job from Redis"""
+ return {
+ key: safe_json_decode(value)
+ for key, value
+ in jobs.job(rconn, jobid).items()
+ }
+
+def has_geno_file(job: dict) -> bool:
+ """Check whether to trigger processing of geno file(s)."""
+ return bool(job.get("bundle-metadata", {}).get("geno-dataset-id"))
+
+def has_pheno_file(job: dict) -> bool:
+ """Check whether to trigger processing of pheno file(s)."""
+ meta = job.get("bundle-metadata", {})
+ return (bool(meta.get("probe-study-id"))
+ and bool(meta.get("probe-dataset-id")))
+
+def percent_completion(geno: float, pheno: float) -> float:
+ """Compute the total completion percent."""
+ return 0.5 * (geno + pheno)
+
+def process_bundle(dbconn: mdb.Connection, rconn: Redis, jobid: uuid.UUID) -> int:
+ """Process the R/qtl2 bundle."""
+ try:
+ thejob = parse_job(rconn, jobid)
+ meta = thejob["bundle-metadata"]
+ logger.debug("The metadata: %s", meta)
+ rconn.hset(str(jobid), "geno-percent", "0")
+ rconn.hset(str(jobid), "pheno-percent", "0")
+
+ if has_geno_file(thejob):
+ logger.info("Processing geno files.")
+ genoexit = install_genotypes(
+ dbconn,
+ meta["speciesid"],
+ meta["populationid"],
+ meta["geno-dataset-id"],
+ Path(meta["rqtl2-bundle-file"]))
+ if genoexit != 0:
+ raise Exception("Processing 'geno' file failed.")
+ logger.debug(
+ "geno file processing completed successfully. (ExitCode: %s)",
+ genoexit)
+ rconn.hset(str(jobid), "geno-percent", "100")
+
+ if has_pheno_file(thejob):
+ phenoexit = install_pheno_files(
+ dbconn,
+ meta["speciesid"],
+ meta["platformid"],
+ meta["probe-dataset-id"],
+ Path(meta["rqtl2-bundle-file"]))
+ if phenoexit != 0:
+ raise Exception("Processing 'pheno' file failed.")
+ logger.debug(
+ "pheno file processing completed successfully. (ExitCode: %s)",
+ phenoexit)
+ rconn.hset(str(jobid), "pheno-percent", "100")
+
+ return 0
+ except jobs.JobNotFound as exc:
+ logger.error("%s", exc.args)
+ except Exception as _exc:#pylint: disable=[broad-except]
+ logger.error("Exiting with generic error: %s", traceback.format_exc())
+
+ return 1
+
+if __name__ == "__main__":
+ def main():
+ """Run the `process_rqtl2_bundle` script."""
+ args = init_cli_parser(
+ "upload_rqtl2_bundle",
+ "Launch this to control the processing of R/qtl2 zip bundles."
+ ).parse_args()
+ check_db(args.databaseuri)
+ check_redis(args.redisuri)
+
+ jobid = args.jobid
+ with (database_connection(args.databaseuri) as dbconn,
+ Redis.from_url(args.redisuri, decode_responses=True) as rconn):
+ logger.addHandler(setup_redis_logger(
+ rconn, jobid, f"{str(jobid)}:log-messages", args.redisexpiry))
+
+ exitcode = process_bundle(dbconn, rconn, args.jobid)
+ rconn.hset(str(args.jobid), "percent", "100")
+ return exitcode
+
+ sys.exit(main())
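
A minimal sketch of invoking the processor directly with the same command line the upload view composes; the URIs and job id are placeholders:

    import sys
    import uuid
    import subprocess

    jobid = str(uuid.uuid4())
    subprocess.run(
        [sys.executable, "-m", "scripts.process_rqtl2_bundle",
         "mysql://user:pass@localhost/db",  # databaseuri (placeholder)
         "redis://localhost:6379",          # redisuri (placeholder)
         jobid,  # the job hash for this id must already exist in Redis (see qc_app/jobs.py)
         "--redisexpiry", "86400"],
        check=True)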
diff --git a/scripts/rqtl2/entry.py b/scripts/rqtl2/entry.py
index e2d70a6..eccc19d 100644
--- a/scripts/rqtl2/entry.py
+++ b/scripts/rqtl2/entry.py
@@ -11,13 +11,12 @@ from qc_app.check_connections import check_db, check_redis
from scripts.redis_logger import setup_redis_logger
-def build_main(cli_args: Callable[[], Namespace],
+def build_main(args: Namespace,
run_fn: Callable[[Connection, Namespace], int],
logger: Logger,
loglevel: str = "INFO") -> Callable[[],int]:
"""Build a function to be used as an entry-point for scripts."""
def main():
- args = cli_args()
check_db(args.databaseuri)
check_redis(args.redisuri)
if not args.rqtl2bundle.exists():
diff --git a/scripts/rqtl2/install_genotypes.py b/scripts/rqtl2/install_genotypes.py
index 77e7163..733ccf9 100644
--- a/scripts/rqtl2/install_genotypes.py
+++ b/scripts/rqtl2/install_genotypes.py
@@ -15,8 +15,8 @@ from r_qtl import r_qtl2 as rqtl2
from functional_tools import take
from scripts.rqtl2.entry import build_main
-from scripts.cli_parser import init_cli_parser
from scripts.rqtl2.cli_parser import add_common_arguments
+from scripts.cli_parser import init_cli_parser, add_global_data_arguments
stderr_handler = logging.StreamHandler(stream=sys.stderr)
logger = logging.getLogger("install_genotypes")
@@ -217,14 +217,14 @@ if __name__ == "__main__":
def cli_args():
"""Process command-line arguments for install_genotypes"""
- parser = add_common_arguments(init_cli_parser(
+ parser = add_common_arguments(add_global_data_arguments(init_cli_parser(
"install_genotypes",
- "Parse genotypes from R/qtl2 bundle into the database."))
+ "Parse genotypes from R/qtl2 bundle into the database.")))
return parser.parse_args()
main = build_main(
- cli_args,
+ cli_args(),
lambda dbconn, args: install_genotypes(dbconn,
args.speciesid,
args.populationid,
diff --git a/scripts/rqtl2/install_phenos.py b/scripts/rqtl2/install_phenos.py
index 5c25866..21b5f00 100644
--- a/scripts/rqtl2/install_phenos.py
+++ b/scripts/rqtl2/install_phenos.py
@@ -10,8 +10,8 @@ import MySQLdb as mdb
from MySQLdb.cursors import DictCursor
from scripts.rqtl2.entry import build_main
-from scripts.cli_parser import init_cli_parser
from scripts.rqtl2.cli_parser import add_common_arguments
+from scripts.cli_parser import init_cli_parser, add_global_data_arguments
from r_qtl import r_qtl2 as rqtl2
@@ -143,9 +143,9 @@ if __name__ == "__main__":
def cli_args():
"""Process command-line arguments for `install_phenos`"""
- parser = init_cli_parser(
+ parser = add_global_data_arguments(init_cli_parser(
"install_genotypes",
- "Parse genotypes from R/qtl2 bundle into the database.")
+ "Parse genotypes from R/qtl2 bundle into the database."))
parser.add_argument(
"platformid",
@@ -156,7 +156,7 @@ if __name__ == "__main__":
return parser.parse_args()
main = build_main(
- cli_args,
+ cli_args(),
lambda dbconn, args: install_pheno_files(dbconn,
args.speciesid,
args.platformid,