diff options
Diffstat (limited to 'scripts/worker.py')
-rw-r--r-- | scripts/worker.py | 49 |
1 files changed, 23 insertions, 26 deletions
diff --git a/scripts/worker.py b/scripts/worker.py index 90d83c4..17d6e06 100644 --- a/scripts/worker.py +++ b/scripts/worker.py @@ -20,6 +20,8 @@ def parse_args(): "Generic worker to launch and manage specific worker scripts")) parser.add_argument( "redisurl", default="redis:///", help="URL to the redis server") + parser.add_argument( + "redisprefix", type=str, help="The prefix before the job ID.") parser.add_argument("job_id", help="The id of the job being processed") args = parser.parse_args() @@ -27,58 +29,53 @@ def parse_args(): return args -def update_stdout_stderr(rconn, job_id, bytes_read, stream: str): - "Update the stdout/stderr keys according to the value of `stream`." - job = jobs.job(rconn, job_id) - contents = job.get(stream, '') - new_contents = contents + bytes_read.decode("utf-8") - rconn.hset(name=job_id, key=stream, value=new_contents) - -def update_status(rconn, job_id, status): - """Update status of job in redis.""" - rconn.hset(name=job_id, key="status", value=status) - -def run_job(job, rconn): +def run_job(rconn: Redis, job: dict, redisprefix: str): "Run the actual job." - job_id = job["job_id"] + jobid = job["job_id"] try: with TemporaryDirectory() as tmpdir: - stderrpath = f"{tmpdir}/{job_id}.stderr" + stderrpath = f"{tmpdir}/{jobid}.stderr" with open(stderrpath, "w+b") as tmpfl: with subprocess.Popen( shlex.split(job["command"]), stdout=subprocess.PIPE, stderr=tmpfl) as process: while process.poll() is None: - update_status(rconn, job_id, "running") - update_stdout_stderr( - rconn, job_id, process.stdout.read1(), "stdout") + jobs.update_status(rconn, redisprefix, jobid, "running") + jobs.update_stdout_stderr( + rconn, redisprefix, jobid, process.stdout.read1(), "stdout") sleep(1) - update_status( + jobs.update_status( rconn, - job_id, + redisprefix, + jobid, ("error" if process.returncode != 0 else "success")) with open(stderrpath, "rb") as stderr: stderr_content = stderr.read() - update_stdout_stderr(rconn, job_id, stderr_content, "stderr") + jobs.update_stdout_stderr(rconn, redisprefix, jobid, stderr_content, "stderr") os.remove(stderrpath) return process.poll() except Exception as exc:# pylint: disable=[broad-except,unused-variable] - update_status(rconn, job_id, "error") - update_stdout_stderr( - rconn, job_id, traceback.format_exc().encode("utf-8"), "stderr") + jobs.update_status(rconn, redisprefix, jobid, "error") + jobs.update_stdout_stderr( + rconn, redisprefix, jobid, traceback.format_exc().encode("utf-8"), "stderr") print(traceback.format_exc(), file=sys.stderr) - sys.exit(4) + return 4 def main(): "Entry point function" args = parse_args() with Redis.from_url(args.redisurl, decode_responses=True) as rconn: - job = jobs.job(rconn, args.job_id) + job = jobs.job(rconn, args.redisprefix, args.job_id) if job: - return run_job(job, rconn) + return run_job(rconn, job, args.redisprefix) + jobs.update_status(rconn, args.job_id, "error") + fqjobid = jobs.job_key(redisprefix, args.jobid) + rconn.hset(fqjobid, "stderr", f"No such job. '{args.job_id}'.") + redis_conn.expire(name=job_key(args.redisprefix, args.job_id), + time=timedelta(seconds=(2 * 60 * 60))) print(f"No such job. '{args.job_id}'.", file=sys.stderr) return 2 return 3 |