about summary refs log tree commit diff
path: root/scripts/worker.py
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/worker.py')
-rw-r--r--scripts/worker.py49
1 files changed, 23 insertions, 26 deletions
diff --git a/scripts/worker.py b/scripts/worker.py
index 90d83c4..17d6e06 100644
--- a/scripts/worker.py
+++ b/scripts/worker.py
@@ -20,6 +20,8 @@ def parse_args():
             "Generic worker to launch and manage specific worker scripts"))
     parser.add_argument(
         "redisurl", default="redis:///", help="URL to the redis server")
+    parser.add_argument(
+        "redisprefix", type=str, help="The prefix before the job ID.")
     parser.add_argument("job_id", help="The id of the job being processed")
 
     args = parser.parse_args()
@@ -27,58 +29,53 @@ def parse_args():
 
     return args
 
-def update_stdout_stderr(rconn, job_id, bytes_read, stream: str):
-    "Update the stdout/stderr keys according to the value of `stream`."
-    job = jobs.job(rconn, job_id)
-    contents = job.get(stream, '')
-    new_contents = contents + bytes_read.decode("utf-8")
-    rconn.hset(name=job_id, key=stream, value=new_contents)
-
-def update_status(rconn, job_id, status):
-    """Update status of job in redis."""
-    rconn.hset(name=job_id, key="status", value=status)
-
-def run_job(job, rconn):
+def run_job(rconn: Redis, job: dict, redisprefix: str):
     "Run the actual job."
-    job_id = job["job_id"]
+    jobid = job["job_id"]
     try:
         with TemporaryDirectory() as tmpdir:
-            stderrpath = f"{tmpdir}/{job_id}.stderr"
+            stderrpath = f"{tmpdir}/{jobid}.stderr"
             with open(stderrpath, "w+b") as tmpfl:
                 with subprocess.Popen(
                         shlex.split(job["command"]), stdout=subprocess.PIPE,
                         stderr=tmpfl) as process:
                     while process.poll() is None:
-                        update_status(rconn, job_id, "running")
-                        update_stdout_stderr(
-                            rconn, job_id, process.stdout.read1(), "stdout")
+                        jobs.update_status(rconn, redisprefix, jobid, "running")
+                        jobs.update_stdout_stderr(
+                            rconn, redisprefix, jobid, process.stdout.read1(), "stdout")
                         sleep(1)
 
-                    update_status(
+                    jobs.update_status(
                         rconn,
-                        job_id,
+                        redisprefix,
+                        jobid,
                         ("error" if process.returncode != 0 else "success"))
 
             with open(stderrpath, "rb") as stderr:
                 stderr_content = stderr.read()
-                update_stdout_stderr(rconn, job_id, stderr_content, "stderr")
+                jobs.update_stdout_stderr(rconn, redisprefix, jobid, stderr_content, "stderr")
 
             os.remove(stderrpath)
         return process.poll()
     except Exception as exc:# pylint: disable=[broad-except,unused-variable]
-        update_status(rconn, job_id, "error")
-        update_stdout_stderr(
-            rconn, job_id, traceback.format_exc().encode("utf-8"), "stderr")
+        jobs.update_status(rconn, redisprefix, jobid, "error")
+        jobs.update_stdout_stderr(
+            rconn, redisprefix, jobid, traceback.format_exc().encode("utf-8"), "stderr")
         print(traceback.format_exc(), file=sys.stderr)
-        sys.exit(4)
+        return 4
 
 def main():
     "Entry point function"
     args = parse_args()
     with Redis.from_url(args.redisurl, decode_responses=True) as rconn:
-        job = jobs.job(rconn, args.job_id)
+        job = jobs.job(rconn, args.redisprefix, args.job_id)
         if job:
-            return run_job(job, rconn)
+            return run_job(rconn, job, args.redisprefix)
+        jobs.update_status(rconn, args.job_id, "error")
+        fqjobid = jobs.job_key(redisprefix, args.jobid)
+        rconn.hset(fqjobid, "stderr", f"No such job. '{args.job_id}'.")
+        redis_conn.expire(name=job_key(args.redisprefix, args.job_id),
+                          time=timedelta(seconds=(2 * 60 * 60)))
         print(f"No such job. '{args.job_id}'.", file=sys.stderr)
         return 2
     return 3