diff options
author | Frederick Muriuki Muriithi | 2022-08-04 08:37:22 +0300 |
---|---|---|
committer | Frederick Muriuki Muriithi | 2022-08-04 08:37:22 +0300 |
commit | 7bd9116019afe4aed369c5dfe69496abc6867381 (patch) | |
tree | cc557a01a291522e493a289620574ee78bf9ea48 | |
parent | 0ee146650e841069045ec6262b8361d3c73fa093 (diff) | |
download | gn-uploader-7bd9116019afe4aed369c5dfe69496abc6867381.tar.gz |
Handle stderr output
* Fix bug where stderr was not being updated on redis appropriately
-rw-r--r-- | scripts/worker.py | 38 |
1 files changed, 22 insertions, 16 deletions
diff --git a/scripts/worker.py b/scripts/worker.py index fee4ec8..879442a 100644 --- a/scripts/worker.py +++ b/scripts/worker.py @@ -1,11 +1,12 @@ "Generic worker script that runs actual worker script" +import os import sys import shlex import argparse import traceback import subprocess from time import sleep -from tempfile import TemporaryFile +from tempfile import TemporaryDirectory from redis import Redis @@ -29,28 +30,33 @@ def parse_args(): def update_stdout_stderr(bytes_read, stream: str, rconn, job_id): "Update the stdout/stderr keys according to the value of `stream`." job = jobs.job(rconn, job_id) - contents = job.get(stream, b'') - rconn.hset( - name=job_id, - key=stream, - value=contents + bytes_read) + contents = job.get(stream, '') + new_contents = contents + bytes_read.decode("utf-8") + rconn.hset(name=job_id, key=stream, value=new_contents) def run_job(job, rconn): "Run the actual job." job_id = job["job_id"] try: - with TemporaryFile() as tmpfl: - with subprocess.Popen( - shlex.split(job["command"]), stdout=subprocess.PIPE, - stderr=subprocess.PIPE) as process: - while process.poll() is None: - update_stdout_stderr( - process.stdout.read1(), "stdout", rconn, job_id) - sleep(1) + with TemporaryDirectory() as tmpdir: + stderrpath = f"{tmpdir}/{job_id}.stderr" + with open(stderrpath, "w+b") as tmpfl: + with subprocess.Popen( + shlex.split(job["command"]), stdout=subprocess.PIPE, + stderr=tmpfl) as process: + while process.poll() is None: + update_stdout_stderr( + process.stdout.read1(), "stdout", rconn, job_id) + sleep(1) - update_stdout_stderr(tmpfl.read(), "stderr", rconn, job_id) + with open(stderrpath, "rb") as stderr: + update_stdout_stderr(stderr.read(), "stderr", rconn, job_id) + + os.remove(stderrpath) return process.poll() - except subprocess.CalledProcessError as cpe: # pylint: disable=[unused-variable] + except Exception as exc: + update_stdout_stderr( + traceback.format_exc().encode("utf-8"), "stderr", rconn, job_id) print(traceback.format_exc(), file=sys.stderr) sys.exit(4) |