"Generic worker script that runs actual worker script" import os import sys import shlex import argparse import traceback import subprocess from time import sleep from tempfile import TemporaryDirectory from redis import Redis from qc_app import jobs from qc_app.check_connections import check_redis def parse_args(): "Parse the command-line arguments" parser = argparse.ArgumentParser( prog="worker", description = ( "Generic worker to launch and manage specific worker scripts")) parser.add_argument( "redisurl", default="redis:///", help="URL to the redis server") parser.add_argument("job_id", help="The id of the job being processed") args = parser.parse_args() check_redis(args.redisurl) return args def update_stdout_stderr(bytes_read, stream: str, rconn, job_id): "Update the stdout/stderr keys according to the value of `stream`." job = jobs.job(rconn, job_id) contents = job.get(stream, '') new_contents = contents + bytes_read.decode("utf-8") rconn.hset(name=job_id, key=stream, value=new_contents) def run_job(job, rconn): "Run the actual job." job_id = job["job_id"] try: with TemporaryDirectory() as tmpdir: stderrpath = f"{tmpdir}/{job_id}.stderr" with open(stderrpath, "w+b") as tmpfl: with subprocess.Popen( shlex.split(job["command"]), stdout=subprocess.PIPE, stderr=tmpfl) as process: while process.poll() is None: update_stdout_stderr( process.stdout.read1(), "stdout", rconn, job_id) sleep(1) with open(stderrpath, "rb") as stderr: update_stdout_stderr(stderr.read(), "stderr", rconn, job_id) os.remove(stderrpath) return process.poll() except Exception as exc:# pylint: disable=[broad-except,unused-variable] update_stdout_stderr( traceback.format_exc().encode("utf-8"), "stderr", rconn, job_id) print(traceback.format_exc(), file=sys.stderr) sys.exit(4) def main(): "Entry point function" args = parse_args() with Redis.from_url(args.redisurl, decode_responses=True) as rconn: job = jobs.job(rconn, args.job_id) if job: return run_job(job, rconn) print(f"No such job. '{args.job_id}'.", file=sys.stderr) return 2 return 3 if __name__ == "__main__": sys.exit(main())