aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrederick Muriuki Muriithi2022-08-04 08:37:22 +0300
committerFrederick Muriuki Muriithi2022-08-04 08:37:22 +0300
commit7bd9116019afe4aed369c5dfe69496abc6867381 (patch)
treecc557a01a291522e493a289620574ee78bf9ea48
parent0ee146650e841069045ec6262b8361d3c73fa093 (diff)
downloadgn-uploader-7bd9116019afe4aed369c5dfe69496abc6867381.tar.gz
Handle stderr output
* Fix bug where stderr was not being updated on redis appropriately
-rw-r--r--scripts/worker.py38
1 files changed, 22 insertions, 16 deletions
diff --git a/scripts/worker.py b/scripts/worker.py
index fee4ec8..879442a 100644
--- a/scripts/worker.py
+++ b/scripts/worker.py
@@ -1,11 +1,12 @@
"Generic worker script that runs actual worker script"
+import os
import sys
import shlex
import argparse
import traceback
import subprocess
from time import sleep
-from tempfile import TemporaryFile
+from tempfile import TemporaryDirectory
from redis import Redis
@@ -29,28 +30,33 @@ def parse_args():
def update_stdout_stderr(bytes_read, stream: str, rconn, job_id):
"Update the stdout/stderr keys according to the value of `stream`."
job = jobs.job(rconn, job_id)
- contents = job.get(stream, b'')
- rconn.hset(
- name=job_id,
- key=stream,
- value=contents + bytes_read)
+ contents = job.get(stream, '')
+ new_contents = contents + bytes_read.decode("utf-8")
+ rconn.hset(name=job_id, key=stream, value=new_contents)
def run_job(job, rconn):
"Run the actual job."
job_id = job["job_id"]
try:
- with TemporaryFile() as tmpfl:
- with subprocess.Popen(
- shlex.split(job["command"]), stdout=subprocess.PIPE,
- stderr=subprocess.PIPE) as process:
- while process.poll() is None:
- update_stdout_stderr(
- process.stdout.read1(), "stdout", rconn, job_id)
- sleep(1)
+ with TemporaryDirectory() as tmpdir:
+ stderrpath = f"{tmpdir}/{job_id}.stderr"
+ with open(stderrpath, "w+b") as tmpfl:
+ with subprocess.Popen(
+ shlex.split(job["command"]), stdout=subprocess.PIPE,
+ stderr=tmpfl) as process:
+ while process.poll() is None:
+ update_stdout_stderr(
+ process.stdout.read1(), "stdout", rconn, job_id)
+ sleep(1)
- update_stdout_stderr(tmpfl.read(), "stderr", rconn, job_id)
+ with open(stderrpath, "rb") as stderr:
+ update_stdout_stderr(stderr.read(), "stderr", rconn, job_id)
+
+ os.remove(stderrpath)
return process.poll()
- except subprocess.CalledProcessError as cpe: # pylint: disable=[unused-variable]
+ except Exception as exc:
+ update_stdout_stderr(
+ traceback.format_exc().encode("utf-8"), "stderr", rconn, job_id)
print(traceback.format_exc(), file=sys.stderr)
sys.exit(4)