aboutsummaryrefslogtreecommitdiff
path: root/qc_app/jobs.py
diff options
context:
space:
mode:
Diffstat (limited to 'qc_app/jobs.py')
-rw-r--r--qc_app/jobs.py36
1 files changed, 27 insertions, 9 deletions
diff --git a/qc_app/jobs.py b/qc_app/jobs.py
index f5e5173..dc1f967 100644
--- a/qc_app/jobs.py
+++ b/qc_app/jobs.py
@@ -3,15 +3,32 @@ import os
import sys
import shlex
import subprocess
-from typing import Union
from uuid import UUID, uuid4
from datetime import timedelta
+from typing import Union, Optional
from redis import Redis
+from flask import current_app as app
+
+JOBS_PREFIX = "JOBS"
class JobNotFound(Exception):
"""Raised if we try to retrieve a non-existent job."""
+def jobsnamespace():
+ """
+ Return the jobs namespace prefix. It depends on app configuration.
+
+ Calling this function outside of an application context will cause an
+ exception to be raised. It is mostly a convenience utility to use within the
+ application.
+ """
+ return f"{app.config['GNQC_REDIS_PREFIX']}:{JOBS_PREFIX}"
+
+def job_key(namespaceprefix: str, jobid: Union[str, UUID]) -> str:
+ """Build the key by appending it to the namespace prefix."""
+ return f"{namespaceprefix}:{jobid}"
+
def raise_jobnotfound(jobid: Union[str,UUID]):
"""Utility to raise a `NoSuchJobError`"""
raise JobNotFound(f"Could not retrieve job '{jobid}'.")
@@ -21,15 +38,16 @@ def error_filename(job_id, error_dir):
return f"{error_dir}/job_{job_id}.error"
def initialise_job(# pylint: disable=[too-many-arguments]
- redis_conn: Redis, job_id: str, command: list, job_type: str,
- ttl_seconds: int, extra_meta: dict) -> dict:
+ rconn: Redis, rprefix: str, jobid: str, command: list, job_type: str,
+ ttl_seconds: int = 86400, extra_meta: Optional[dict] = None) -> dict:
"Initialise a job 'object' and put in on redis"
the_job = {
- "job_id": job_id, "command": shlex.join(command), "status": "pending",
- "percent": 0, "job-type": job_type, **extra_meta
+ "job_id": jobid, "command": shlex.join(command), "status": "pending",
+ "percent": 0, "job-type": job_type, **(extra_meta or {})
}
- redis_conn.hset(name=the_job["job_id"], mapping=the_job)
- redis_conn.expire(name=the_job["job_id"], time=timedelta(seconds=ttl_seconds))
+ rconn.hset(job_key(rprefix, jobid), mapping=the_job)
+ rconn.expire(
+ name=job_key(rprefix, jobid), time=timedelta(seconds=ttl_seconds))
return the_job
def build_file_verification_job(#pylint: disable=[too-many-arguments]
@@ -85,7 +103,7 @@ def launch_job(the_job: dict, redisurl: str, error_dir):
return the_job
-def job(redis_conn, job_id: Union[str,UUID]):
+def job(rconn: Redis, rprefix: str, jobid: Union[str,UUID]):
"Retrieve the job"
- thejob = redis_conn.hgetall(str(job_id)) or raise_jobnotfound(job_id)
+ thejob = rconn.hgetall(job_key(rprefix, jobid)) or raise_jobnotfound(jobid)
return thejob