about summary refs log tree commit diff
path: root/scripts
diff options
context:
space:
mode:
authorFrederick Muriuki Muriithi2022-07-15 06:58:37 +0300
committerFrederick Muriuki Muriithi2022-07-19 05:08:17 +0300
commite59108d3cea2e61f1a23c22c20edf2d3974e7a10 (patch)
treecd8940531044a64958020ac2994f3cdb18e73d8c /scripts
parent772872452dc98b9919a861c017347e88a6536063 (diff)
downloadgn-uploader-e59108d3cea2e61f1a23c22c20edf2d3974e7a10.tar.gz
Add generic worker script
Add a generic worker script, whose purpose is:
- to launch the specific worker script
- to capture both stdin and stderr streams and put them on redis

In this way, we can launch redis-aware and redis-unaware workers and
capture their outputs or errors for later processing.
Diffstat (limited to 'scripts')
-rw-r--r--scripts/worker.py72
1 files changed, 72 insertions, 0 deletions
diff --git a/scripts/worker.py b/scripts/worker.py
new file mode 100644
index 0000000..4077ad1
--- /dev/null
+++ b/scripts/worker.py
@@ -0,0 +1,72 @@
+"Generic worker script that runs actual worker script"
+import sys
+import shlex
+import argparse
+import traceback
+import subprocess
+from time import sleep
+from tempfile import TemporaryFile
+
+from redis import Redis
+
+from qc_app import jobs
+
+def parse_args():
+    "Parse the command-line arguments"
+    parser = argparse.ArgumentParser(
+        prog="worker", description = (
+            "Generic worker to launch and manage specific worker scripts"))
+    parser.add_argument(
+        "redisurl", default="redis:///", help="URL to the redis server")
+    parser.add_argument("job_id", help="The id of the job being processed")
+
+    args = parser.parse_args()
+    try:
+        conn = Redis.from_url(args.redisurl) # pylint: disable=[unused-variable]
+    except ConnectionError as conn_err: # pylint: disable=[unused-variable]
+        print(traceback.format_exc(), file=sys.stderr)
+        sys.exit(1)
+
+    return args
+
+def update_stdout_stderr(bytes_read, stream: str, rconn, job_id):
+    "Update the stdout/stderr keys according to the value of `stream`."
+    job = jobs.job(rconn, job_id)
+    contents = job.get(stream, b'')
+    rconn.hset(
+        name=job_id,
+        key=stream,
+        value=contents + bytes_read)
+
+def run_job(job, rconn):
+    "Run the actual job."
+    job_id = job["job_id"]
+    try:
+        with TemporaryFile() as tmpfl:
+            with subprocess.Popen(
+                    shlex.split(job["command"]), stdout=subprocess.PIPE,
+                    stderr=subprocess.PIPE) as process:
+                while process.poll() is None:
+                    update_stdout_stderr(
+                        process.stdout.read1(), "stdout", rconn, job_id)
+                    sleep(1)
+
+            update_stdout_stderr(tmpfl.read(), "stderr", rconn, job_id)
+        return process.poll()
+    except subprocess.CalledProcessError as cpe: # pylint: disable=[unused-variable]
+        print(traceback.format_exc(), file=sys.stderr)
+        sys.exit(4)
+
+def main():
+    "Entry point function"
+    args = parse_args()
+    with Redis.from_url(args.redisurl, decode_responses=True) as rconn:
+        job = jobs.job(rconn, args.job_id)
+        if job:
+            return run_job(job, rconn)
+        print(f"No such job. '{args.job_id}'.", file=sys.stderr)
+        return 2
+    return 3
+
+if __name__ == "__main__":
+    sys.exit(main())