aboutsummaryrefslogtreecommitdiff
path: root/scripts/worker.py
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/worker.py')
-rw-r--r--scripts/worker.py49
1 files changed, 23 insertions, 26 deletions
diff --git a/scripts/worker.py b/scripts/worker.py
index 90d83c4..17d6e06 100644
--- a/scripts/worker.py
+++ b/scripts/worker.py
@@ -20,6 +20,8 @@ def parse_args():
"Generic worker to launch and manage specific worker scripts"))
parser.add_argument(
"redisurl", default="redis:///", help="URL to the redis server")
+ parser.add_argument(
+ "redisprefix", type=str, help="The prefix before the job ID.")
parser.add_argument("job_id", help="The id of the job being processed")
args = parser.parse_args()
@@ -27,58 +29,53 @@ def parse_args():
return args
-def update_stdout_stderr(rconn, job_id, bytes_read, stream: str):
- "Update the stdout/stderr keys according to the value of `stream`."
- job = jobs.job(rconn, job_id)
- contents = job.get(stream, '')
- new_contents = contents + bytes_read.decode("utf-8")
- rconn.hset(name=job_id, key=stream, value=new_contents)
-
-def update_status(rconn, job_id, status):
- """Update status of job in redis."""
- rconn.hset(name=job_id, key="status", value=status)
-
-def run_job(job, rconn):
+def run_job(rconn: Redis, job: dict, redisprefix: str):
"Run the actual job."
- job_id = job["job_id"]
+ jobid = job["job_id"]
try:
with TemporaryDirectory() as tmpdir:
- stderrpath = f"{tmpdir}/{job_id}.stderr"
+ stderrpath = f"{tmpdir}/{jobid}.stderr"
with open(stderrpath, "w+b") as tmpfl:
with subprocess.Popen(
shlex.split(job["command"]), stdout=subprocess.PIPE,
stderr=tmpfl) as process:
while process.poll() is None:
- update_status(rconn, job_id, "running")
- update_stdout_stderr(
- rconn, job_id, process.stdout.read1(), "stdout")
+ jobs.update_status(rconn, redisprefix, jobid, "running")
+ jobs.update_stdout_stderr(
+ rconn, redisprefix, jobid, process.stdout.read1(), "stdout")
sleep(1)
- update_status(
+ jobs.update_status(
rconn,
- job_id,
+ redisprefix,
+ jobid,
("error" if process.returncode != 0 else "success"))
with open(stderrpath, "rb") as stderr:
stderr_content = stderr.read()
- update_stdout_stderr(rconn, job_id, stderr_content, "stderr")
+ jobs.update_stdout_stderr(rconn, redisprefix, jobid, stderr_content, "stderr")
os.remove(stderrpath)
return process.poll()
except Exception as exc:# pylint: disable=[broad-except,unused-variable]
- update_status(rconn, job_id, "error")
- update_stdout_stderr(
- rconn, job_id, traceback.format_exc().encode("utf-8"), "stderr")
+ jobs.update_status(rconn, redisprefix, jobid, "error")
+ jobs.update_stdout_stderr(
+ rconn, redisprefix, jobid, traceback.format_exc().encode("utf-8"), "stderr")
print(traceback.format_exc(), file=sys.stderr)
- sys.exit(4)
+ return 4
def main():
"Entry point function"
args = parse_args()
with Redis.from_url(args.redisurl, decode_responses=True) as rconn:
- job = jobs.job(rconn, args.job_id)
+ job = jobs.job(rconn, args.redisprefix, args.job_id)
if job:
- return run_job(job, rconn)
+ return run_job(rconn, job, args.redisprefix)
+ jobs.update_status(rconn, args.job_id, "error")
+ fqjobid = jobs.job_key(redisprefix, args.jobid)
+ rconn.hset(fqjobid, "stderr", f"No such job. '{args.job_id}'.")
+ redis_conn.expire(name=job_key(args.redisprefix, args.job_id),
+ time=timedelta(seconds=(2 * 60 * 60)))
print(f"No such job. '{args.job_id}'.", file=sys.stderr)
return 2
return 3