diff options
Diffstat (limited to 'qc_app/jobs.py')
-rw-r--r-- | qc_app/jobs.py | 36 |
1 files changed, 27 insertions, 9 deletions
diff --git a/qc_app/jobs.py b/qc_app/jobs.py index f5e5173..dc1f967 100644 --- a/qc_app/jobs.py +++ b/qc_app/jobs.py @@ -3,15 +3,32 @@ import os import sys import shlex import subprocess -from typing import Union from uuid import UUID, uuid4 from datetime import timedelta +from typing import Union, Optional from redis import Redis +from flask import current_app as app + +JOBS_PREFIX = "JOBS" class JobNotFound(Exception): """Raised if we try to retrieve a non-existent job.""" +def jobsnamespace(): + """ + Return the jobs namespace prefix. It depends on app configuration. + + Calling this function outside of an application context will cause an + exception to be raised. It is mostly a convenience utility to use within the + application. + """ + return f"{app.config['GNQC_REDIS_PREFIX']}:{JOBS_PREFIX}" + +def job_key(namespaceprefix: str, jobid: Union[str, UUID]) -> str: + """Build the key by appending it to the namespace prefix.""" + return f"{namespaceprefix}:{jobid}" + def raise_jobnotfound(jobid: Union[str,UUID]): """Utility to raise a `NoSuchJobError`""" raise JobNotFound(f"Could not retrieve job '{jobid}'.") @@ -21,15 +38,16 @@ def error_filename(job_id, error_dir): return f"{error_dir}/job_{job_id}.error" def initialise_job(# pylint: disable=[too-many-arguments] - redis_conn: Redis, job_id: str, command: list, job_type: str, - ttl_seconds: int, extra_meta: dict) -> dict: + rconn: Redis, rprefix: str, jobid: str, command: list, job_type: str, + ttl_seconds: int = 86400, extra_meta: Optional[dict] = None) -> dict: "Initialise a job 'object' and put in on redis" the_job = { - "job_id": job_id, "command": shlex.join(command), "status": "pending", - "percent": 0, "job-type": job_type, **extra_meta + "job_id": jobid, "command": shlex.join(command), "status": "pending", + "percent": 0, "job-type": job_type, **(extra_meta or {}) } - redis_conn.hset(name=the_job["job_id"], mapping=the_job) - redis_conn.expire(name=the_job["job_id"], time=timedelta(seconds=ttl_seconds)) + rconn.hset(job_key(rprefix, jobid), mapping=the_job) + rconn.expire( + name=job_key(rprefix, jobid), time=timedelta(seconds=ttl_seconds)) return the_job def build_file_verification_job(#pylint: disable=[too-many-arguments] @@ -85,7 +103,7 @@ def launch_job(the_job: dict, redisurl: str, error_dir): return the_job -def job(redis_conn, job_id: Union[str,UUID]): +def job(rconn: Redis, rprefix: str, jobid: Union[str,UUID]): "Retrieve the job" - thejob = redis_conn.hgetall(str(job_id)) or raise_jobnotfound(job_id) + thejob = rconn.hgetall(job_key(rprefix, jobid)) or raise_jobnotfound(jobid) return thejob |