aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrederick Muriuki Muriithi2022-07-15 06:58:37 +0300
committerFrederick Muriuki Muriithi2022-07-19 05:08:17 +0300
commite59108d3cea2e61f1a23c22c20edf2d3974e7a10 (patch)
treecd8940531044a64958020ac2994f3cdb18e73d8c
parent772872452dc98b9919a861c017347e88a6536063 (diff)
downloadgn-uploader-e59108d3cea2e61f1a23c22c20edf2d3974e7a10.tar.gz
Add generic worker script
Add a generic worker script, whose purpose is: - to launch the specific worker script - to capture both stdin and stderr streams and put them on redis In this way, we can launch redis-aware and redis-unaware workers and capture their outputs or errors for later processing.
-rw-r--r--scripts/worker.py72
1 files changed, 72 insertions, 0 deletions
diff --git a/scripts/worker.py b/scripts/worker.py
new file mode 100644
index 0000000..4077ad1
--- /dev/null
+++ b/scripts/worker.py
@@ -0,0 +1,72 @@
+"Generic worker script that runs actual worker script"
+import sys
+import shlex
+import argparse
+import traceback
+import subprocess
+from time import sleep
+from tempfile import TemporaryFile
+
+from redis import Redis
+
+from qc_app import jobs
+
+def parse_args():
+ "Parse the command-line arguments"
+ parser = argparse.ArgumentParser(
+ prog="worker", description = (
+ "Generic worker to launch and manage specific worker scripts"))
+ parser.add_argument(
+ "redisurl", default="redis:///", help="URL to the redis server")
+ parser.add_argument("job_id", help="The id of the job being processed")
+
+ args = parser.parse_args()
+ try:
+ conn = Redis.from_url(args.redisurl) # pylint: disable=[unused-variable]
+ except ConnectionError as conn_err: # pylint: disable=[unused-variable]
+ print(traceback.format_exc(), file=sys.stderr)
+ sys.exit(1)
+
+ return args
+
+def update_stdout_stderr(bytes_read, stream: str, rconn, job_id):
+ "Update the stdout/stderr keys according to the value of `stream`."
+ job = jobs.job(rconn, job_id)
+ contents = job.get(stream, b'')
+ rconn.hset(
+ name=job_id,
+ key=stream,
+ value=contents + bytes_read)
+
+def run_job(job, rconn):
+ "Run the actual job."
+ job_id = job["job_id"]
+ try:
+ with TemporaryFile() as tmpfl:
+ with subprocess.Popen(
+ shlex.split(job["command"]), stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE) as process:
+ while process.poll() is None:
+ update_stdout_stderr(
+ process.stdout.read1(), "stdout", rconn, job_id)
+ sleep(1)
+
+ update_stdout_stderr(tmpfl.read(), "stderr", rconn, job_id)
+ return process.poll()
+ except subprocess.CalledProcessError as cpe: # pylint: disable=[unused-variable]
+ print(traceback.format_exc(), file=sys.stderr)
+ sys.exit(4)
+
+def main():
+ "Entry point function"
+ args = parse_args()
+ with Redis.from_url(args.redisurl, decode_responses=True) as rconn:
+ job = jobs.job(rconn, args.job_id)
+ if job:
+ return run_job(job, rconn)
+ print(f"No such job. '{args.job_id}'.", file=sys.stderr)
+ return 2
+ return 3
+
+if __name__ == "__main__":
+ sys.exit(main())