about summary refs log tree commit diff
diff options
context:
space:
mode:
authorFrederick Muriuki Muriithi2022-05-16 10:37:23 +0300
committerFrederick Muriuki Muriithi2022-05-16 10:37:23 +0300
commita657b502e6ed46ea0887b5febb89a7408f163820 (patch)
tree42a6ad04456e39b37ed581d8042bd39f811e39c7
parent4a9fd93d01b6d4bd9d9880dbf2274b3c7f2de37b (diff)
downloadgenenetwork3-a657b502e6ed46ea0887b5febb89a7408f163820.tar.gz
Enable running the worker in "one-shot" mode
Enable the running of the worker script in one-shot mode.
-rw-r--r--sheepdog/worker.py33
1 files changed, 24 insertions, 9 deletions
diff --git a/sheepdog/worker.py b/sheepdog/worker.py
index cf28b74..0209699 100644
--- a/sheepdog/worker.py
+++ b/sheepdog/worker.py
@@ -3,6 +3,7 @@ import os
 import sys
 import time
 import redis
+import argparse
 import redis.connection
 
 # Enable importing from one dir up: put as first to override any other globally
@@ -36,11 +37,11 @@ def make_incremental_backoff(init_val: float=0.1, maximum: int=420):
 
     return __increment_or_reset__
 
-def run_jobs(conn):
+def run_jobs(conn, queue_name: str = "GN3::job-queue"):
     """Process the redis using a redis connection, CONN"""
     # pylint: disable=E0401, C0415
     from gn3.commands import run_cmd
-    cmd_id = (conn.lpop("GN3::job-queue") or b'').decode("utf-8")
+    cmd_id = (conn.lpop(queue_name) or b'').decode("utf-8")
     if bool(cmd_id):
         cmd = conn.hget(name=cmd_id, key="cmd")
         if cmd and (conn.hget(cmd_id, "status") == b"queued"):
@@ -54,11 +55,25 @@ def run_jobs(conn):
                 update_status(conn, cmd_id, "error")
         return cmd_id
 
+def parse_cli_arguments():
+    parser = argparse.ArgumentParser(
+        description="Run asynchronous (service) commands.")
+    parser.add_argument(
+        "--daemon", default=False, action="store_true",
+        help=(
+            "Run process as a daemon instead of the default 'one-shot' "
+            "process"))
+    return parser.parse_args()
+
 if __name__ == "__main__":
-    redis_conn = redis.Redis()
-    sleep_time = make_incremental_backoff()
-    while True:  # Daemon that keeps running forever:
-        if run_jobs(redis_conn):
-            time.sleep(sleep_time("reset"))
-            continue
-        time.sleep(sleep_time("increment", sleep_time("return_current")))
+    args = parse_cli_arguments()
+    with redis.Redis() as redis_conn:
+        if not args.daemon:
+            run_jobs(redis_conn)
+        else:
+            sleep_time = make_incremental_backoff()
+            while True:  # Daemon that keeps running forever:
+                if run_jobs(redis_conn):
+                    time.sleep(sleep_time("reset"))
+                    continue
+                time.sleep(sleep_time("increment", sleep_time("return_current")))