From 7bd9116019afe4aed369c5dfe69496abc6867381 Mon Sep 17 00:00:00 2001 From: Frederick Muriuki Muriithi Date: Thu, 4 Aug 2022 08:37:22 +0300 Subject: Handle stderr output * Fix bug where stderr was not being updated on redis appropriately --- scripts/worker.py | 38 ++++++++++++++++++++++---------------- 1 file changed, 22 insertions(+), 16 deletions(-) (limited to 'scripts/worker.py') diff --git a/scripts/worker.py b/scripts/worker.py index fee4ec8..879442a 100644 --- a/scripts/worker.py +++ b/scripts/worker.py @@ -1,11 +1,12 @@ "Generic worker script that runs actual worker script" +import os import sys import shlex import argparse import traceback import subprocess from time import sleep -from tempfile import TemporaryFile +from tempfile import TemporaryDirectory from redis import Redis @@ -29,28 +30,33 @@ def parse_args(): def update_stdout_stderr(bytes_read, stream: str, rconn, job_id): "Update the stdout/stderr keys according to the value of `stream`." job = jobs.job(rconn, job_id) - contents = job.get(stream, b'') - rconn.hset( - name=job_id, - key=stream, - value=contents + bytes_read) + contents = job.get(stream, '') + new_contents = contents + bytes_read.decode("utf-8") + rconn.hset(name=job_id, key=stream, value=new_contents) def run_job(job, rconn): "Run the actual job." job_id = job["job_id"] try: - with TemporaryFile() as tmpfl: - with subprocess.Popen( - shlex.split(job["command"]), stdout=subprocess.PIPE, - stderr=subprocess.PIPE) as process: - while process.poll() is None: - update_stdout_stderr( - process.stdout.read1(), "stdout", rconn, job_id) - sleep(1) + with TemporaryDirectory() as tmpdir: + stderrpath = f"{tmpdir}/{job_id}.stderr" + with open(stderrpath, "w+b") as tmpfl: + with subprocess.Popen( + shlex.split(job["command"]), stdout=subprocess.PIPE, + stderr=tmpfl) as process: + while process.poll() is None: + update_stdout_stderr( + process.stdout.read1(), "stdout", rconn, job_id) + sleep(1) - update_stdout_stderr(tmpfl.read(), "stderr", rconn, job_id) + with open(stderrpath, "rb") as stderr: + update_stdout_stderr(stderr.read(), "stderr", rconn, job_id) + + os.remove(stderrpath) return process.poll() - except subprocess.CalledProcessError as cpe: # pylint: disable=[unused-variable] + except Exception as exc: + update_stdout_stderr( + traceback.format_exc().encode("utf-8"), "stderr", rconn, job_id) print(traceback.format_exc(), file=sys.stderr) sys.exit(4) -- cgit v1.2.3