aboutsummaryrefslogtreecommitdiff
path: root/qc_app
diff options
context:
space:
mode:
Diffstat (limited to 'qc_app')
-rw-r--r--qc_app/jobs.py19
-rw-r--r--qc_app/upload/rqtl2.py37
2 files changed, 38 insertions, 18 deletions
diff --git a/qc_app/jobs.py b/qc_app/jobs.py
index dc1f967..cf4e4ef 100644
--- a/qc_app/jobs.py
+++ b/qc_app/jobs.py
@@ -97,7 +97,8 @@ def launch_job(the_job: dict, redisurl: str, error_dir):
"w",
encoding="utf-8") as errorfile:
subprocess.Popen( # pylint: disable=[consider-using-with]
- [sys.executable, "-m", "scripts.worker", redisurl, job_id],
+ [sys.executable, "-m", "scripts.worker", redisurl, jobsnamespace(),
+ job_id],
stderr=errorfile,
env={"PYTHONPATH": ":".join(sys.path)})
@@ -107,3 +108,19 @@ def job(rconn: Redis, rprefix: str, jobid: Union[str,UUID]):
"Retrieve the job"
thejob = rconn.hgetall(job_key(rprefix, jobid)) or raise_jobnotfound(jobid)
return thejob
+
+def update_status(
+ rconn: Redis, rprefix: str, jobid: Union[str, UUID], status: str):
+ """Update status of job in redis."""
+ rconn.hset(name=job_key(rprefix, jobid), key="status", value=status)
+
+def update_stdout_stderr(rconn: Redis,
+ rprefix: str,
+ jobid: Union[str, UUID],
+ bytes_read: bytes,
+ stream: str):
+ "Update the stdout/stderr keys according to the value of `stream`."
+ thejob = job(rconn, rprefix, jobid)
+ contents = thejob.get(stream, '')
+ new_contents = contents + bytes_read.decode("utf-8")
+ rconn.hset(name=job_key(rprefix, jobid), key=stream, value=new_contents)
diff --git a/qc_app/upload/rqtl2.py b/qc_app/upload/rqtl2.py
index 7805114..eea49e1 100644
--- a/qc_app/upload/rqtl2.py
+++ b/qc_app/upload/rqtl2.py
@@ -552,11 +552,13 @@ def confirm_bundle_details(species_id: int, population_id: int):
_job = jobs.launch_job(
jobs.initialise_job(
rconn,
+ jobs.jobsnamespace(),
jobid,
[
sys.executable, "-m", "scripts.process_rqtl2_bundle",
- app.config["SQL_URI"], app.config["REDIS_URL"], jobid,
- "--redisexpiry", str(redis_ttl_seconds)],
+ app.config["SQL_URI"], app.config["REDIS_URL"],
+ jobs.jobsnamespace(), jobid, "--redisexpiry",
+ str(redis_ttl_seconds)],
"R/qtl2 Bundle Upload",
redis_ttl_seconds,
{
@@ -588,21 +590,22 @@ def confirm_bundle_details(species_id: int, population_id: int):
def rqtl2_processing_status(jobid: UUID):
"""Retrieve the status of the job processing the uploaded R/qtl2 bundle."""
with Redis.from_url(app.config["REDIS_URL"], decode_responses=True) as rconn:
- thejob = jobs.job(rconn, jobid)
- if not bool(thejob):
- return render_template("rqtl2/no-such-job.html", jobid=jobid)
+ try:
+ thejob = jobs.job(rconn, jobs.jobsnamespace(), jobid)
- messagelistname = thejob.get("log-messagelist")
- logmessages = (rconn.lrange(messagelistname, 0, -1)
- if bool(messagelistname) else [])
+ messagelistname = thejob.get("log-messagelist")
+ logmessages = (rconn.lrange(messagelistname, 0, -1)
+ if bool(messagelistname) else [])
- if thejob["status"] == "error":
- return render_template(
- "rqtl2/rqtl2-job-error.html", job=thejob, messages=logmessages)
- if thejob["status"] == "success":
- return render_template("rqtl2/rqtl2-job-results.html",
- job=thejob,
- messages=logmessages)
+ if thejob["status"] == "error":
+ return render_template(
+ "rqtl2/rqtl2-job-error.html", job=thejob, messages=logmessages)
+ if thejob["status"] == "success":
+ return render_template("rqtl2/rqtl2-job-results.html",
+ job=thejob,
+ messages=logmessages)
- return render_template(
- "rqtl2/rqtl2-job-status.html", job=thejob, messages=logmessages)
+ return render_template(
+ "rqtl2/rqtl2-job-status.html", job=thejob, messages=logmessages)
+ except jobs.JobNotFound as _exc:
+ return render_template("rqtl2/no-such-job.html", jobid=jobid)