aboutsummaryrefslogtreecommitdiff
path: root/qc_app/jobs.py
diff options
context:
space:
mode:
Diffstat (limited to 'qc_app/jobs.py')
-rw-r--r--qc_app/jobs.py19
1 files changed, 18 insertions, 1 deletions
diff --git a/qc_app/jobs.py b/qc_app/jobs.py
index dc1f967..cf4e4ef 100644
--- a/qc_app/jobs.py
+++ b/qc_app/jobs.py
@@ -97,7 +97,8 @@ def launch_job(the_job: dict, redisurl: str, error_dir):
"w",
encoding="utf-8") as errorfile:
subprocess.Popen( # pylint: disable=[consider-using-with]
- [sys.executable, "-m", "scripts.worker", redisurl, job_id],
+ [sys.executable, "-m", "scripts.worker", redisurl, jobsnamespace(),
+ job_id],
stderr=errorfile,
env={"PYTHONPATH": ":".join(sys.path)})
@@ -107,3 +108,19 @@ def job(rconn: Redis, rprefix: str, jobid: Union[str,UUID]):
"Retrieve the job"
thejob = rconn.hgetall(job_key(rprefix, jobid)) or raise_jobnotfound(jobid)
return thejob
+
+def update_status(
+ rconn: Redis, rprefix: str, jobid: Union[str, UUID], status: str):
+ """Update status of job in redis."""
+ rconn.hset(name=job_key(rprefix, jobid), key="status", value=status)
+
+def update_stdout_stderr(rconn: Redis,
+ rprefix: str,
+ jobid: Union[str, UUID],
+ bytes_read: bytes,
+ stream: str):
+ "Update the stdout/stderr keys according to the value of `stream`."
+ thejob = job(rconn, rprefix, jobid)
+ contents = thejob.get(stream, '')
+ new_contents = contents + bytes_read.decode("utf-8")
+ rconn.hset(name=job_key(rprefix, jobid), key=stream, value=new_contents)