about summary refs log tree commit diff
path: root/scripts
diff options
context:
space:
mode:
authorFrederick Muriuki Muriithi2022-08-04 08:37:22 +0300
committerFrederick Muriuki Muriithi2022-08-04 08:37:22 +0300
commit7bd9116019afe4aed369c5dfe69496abc6867381 (patch)
treecc557a01a291522e493a289620574ee78bf9ea48 /scripts
parent0ee146650e841069045ec6262b8361d3c73fa093 (diff)
downloadgn-uploader-7bd9116019afe4aed369c5dfe69496abc6867381.tar.gz
Handle stderr output
* Fix bug where stderr was not being updated on redis appropriately
Diffstat (limited to 'scripts')
-rw-r--r--scripts/worker.py38
1 files changed, 22 insertions, 16 deletions
diff --git a/scripts/worker.py b/scripts/worker.py
index fee4ec8..879442a 100644
--- a/scripts/worker.py
+++ b/scripts/worker.py
@@ -1,11 +1,12 @@
 "Generic worker script that runs actual worker script"
+import os
 import sys
 import shlex
 import argparse
 import traceback
 import subprocess
 from time import sleep
-from tempfile import TemporaryFile
+from tempfile import TemporaryDirectory
 
 from redis import Redis
 
@@ -29,28 +30,33 @@ def parse_args():
 def update_stdout_stderr(bytes_read, stream: str, rconn, job_id):
     "Update the stdout/stderr keys according to the value of `stream`."
     job = jobs.job(rconn, job_id)
-    contents = job.get(stream, b'')
-    rconn.hset(
-        name=job_id,
-        key=stream,
-        value=contents + bytes_read)
+    contents = job.get(stream, '')
+    new_contents = contents + bytes_read.decode("utf-8")
+    rconn.hset(name=job_id, key=stream, value=new_contents)
 
 def run_job(job, rconn):
     "Run the actual job."
     job_id = job["job_id"]
     try:
-        with TemporaryFile() as tmpfl:
-            with subprocess.Popen(
-                    shlex.split(job["command"]), stdout=subprocess.PIPE,
-                    stderr=subprocess.PIPE) as process:
-                while process.poll() is None:
-                    update_stdout_stderr(
-                        process.stdout.read1(), "stdout", rconn, job_id)
-                    sleep(1)
+        with TemporaryDirectory() as tmpdir:
+            stderrpath = f"{tmpdir}/{job_id}.stderr"
+            with open(stderrpath, "w+b") as tmpfl:
+                with subprocess.Popen(
+                        shlex.split(job["command"]), stdout=subprocess.PIPE,
+                        stderr=tmpfl) as process:
+                    while process.poll() is None:
+                        update_stdout_stderr(
+                            process.stdout.read1(), "stdout", rconn, job_id)
+                        sleep(1)
 
-            update_stdout_stderr(tmpfl.read(), "stderr", rconn, job_id)
+            with open(stderrpath, "rb") as stderr:
+                update_stdout_stderr(stderr.read(), "stderr", rconn, job_id)
+
+            os.remove(stderrpath)
         return process.poll()
-    except subprocess.CalledProcessError as cpe: # pylint: disable=[unused-variable]
+    except Exception as exc:
+        update_stdout_stderr(
+            traceback.format_exc().encode("utf-8"), "stderr", rconn, job_id)
         print(traceback.format_exc(), file=sys.stderr)
         sys.exit(4)