aboutsummaryrefslogtreecommitdiff
path: root/qc_app/jobs.py
diff options
context:
space:
mode:
authorFrederick Muriuki Muriithi2024-01-24 12:29:10 +0300
committerFrederick Muriuki Muriithi2024-01-24 12:29:10 +0300
commit07deef46a3f3ba53cc632a9381fb25c55e1017b1 (patch)
treea2f4fd1c9f9f69ed774ceff7d0cc5549f630bf9e /qc_app/jobs.py
parent96c600723726c3391532d86d17183bea960ece57 (diff)
downloadgn-uploader-07deef46a3f3ba53cc632a9381fb25c55e1017b1.tar.gz
Checks: Update code and tests to ensure all checks pass.
Diffstat (limited to 'qc_app/jobs.py')
-rw-r--r--qc_app/jobs.py19
1 files changed, 10 insertions, 9 deletions
diff --git a/qc_app/jobs.py b/qc_app/jobs.py
index 1491015..21889da 100644
--- a/qc_app/jobs.py
+++ b/qc_app/jobs.py
@@ -29,20 +29,20 @@ def job_key(namespaceprefix: str, jobid: Union[str, UUID]) -> str:
"""Build the key by appending it to the namespace prefix."""
return f"{namespaceprefix}:{jobid}"
-def raise_jobnotfound(jobid: Union[str,UUID]):
+def raise_jobnotfound(rprefix:str, jobid: Union[str,UUID]):
"""Utility to raise a `NoSuchJobError`"""
- raise JobNotFound(f"Could not retrieve job '{jobid}'.")
+ raise JobNotFound(f"Could not retrieve job '{jobid}' from '{rprefix}.")
-def error_filename(job_id, error_dir):
+def error_filename(jobid, error_dir):
"Compute the path of the file where errors will be dumped."
- return f"{error_dir}/job_{job_id}.error"
+ return f"{error_dir}/job_{jobid}.error"
def initialise_job(# pylint: disable=[too-many-arguments]
rconn: Redis, rprefix: str, jobid: str, command: list, job_type: str,
ttl_seconds: int = 86400, extra_meta: Optional[dict] = None) -> dict:
"Initialise a job 'object' and put in on redis"
the_job = {
- "job_id": jobid, "command": shlex.join(command), "status": "pending",
+ "jobid": jobid, "command": shlex.join(command), "status": "pending",
"percent": 0, "job-type": job_type, **(extra_meta or {})
}
rconn.hset(job_key(rprefix, jobid), mapping=the_job)
@@ -95,13 +95,13 @@ def launch_job(the_job: dict, redisurl: str, error_dir):
if not os.path.exists(error_dir):
os.mkdir(error_dir)
- job_id = the_job["job_id"]
- with open(error_filename(job_id, error_dir),
+ jobid = the_job["jobid"]
+ with open(error_filename(jobid, error_dir),
"w",
encoding="utf-8") as errorfile:
subprocess.Popen( # pylint: disable=[consider-using-with]
[sys.executable, "-m", "scripts.worker", redisurl, jobsnamespace(),
- job_id],
+ jobid],
stderr=errorfile,
env={"PYTHONPATH": ":".join(sys.path)})
@@ -109,7 +109,8 @@ def launch_job(the_job: dict, redisurl: str, error_dir):
def job(rconn: Redis, rprefix: str, jobid: Union[str,UUID]):
"Retrieve the job"
- thejob = rconn.hgetall(job_key(rprefix, jobid)) or raise_jobnotfound(jobid)
+ thejob = (rconn.hgetall(job_key(rprefix, jobid)) or
+ raise_jobnotfound(rprefix, jobid))
return thejob
def update_status(