aboutsummaryrefslogtreecommitdiff
path: root/sheepdog/worker.py
diff options
context:
space:
mode:
Diffstat (limited to 'sheepdog/worker.py')
-rw-r--r--sheepdog/worker.py33
1 files changed, 24 insertions, 9 deletions
diff --git a/sheepdog/worker.py b/sheepdog/worker.py
index cf28b74..0209699 100644
--- a/sheepdog/worker.py
+++ b/sheepdog/worker.py
@@ -3,6 +3,7 @@ import os
import sys
import time
import redis
+import argparse
import redis.connection
# Enable importing from one dir up: put as first to override any other globally
@@ -36,11 +37,11 @@ def make_incremental_backoff(init_val: float=0.1, maximum: int=420):
return __increment_or_reset__
-def run_jobs(conn):
+def run_jobs(conn, queue_name: str = "GN3::job-queue"):
"""Process the redis using a redis connection, CONN"""
# pylint: disable=E0401, C0415
from gn3.commands import run_cmd
- cmd_id = (conn.lpop("GN3::job-queue") or b'').decode("utf-8")
+ cmd_id = (conn.lpop(queue_name) or b'').decode("utf-8")
if bool(cmd_id):
cmd = conn.hget(name=cmd_id, key="cmd")
if cmd and (conn.hget(cmd_id, "status") == b"queued"):
@@ -54,11 +55,25 @@ def run_jobs(conn):
update_status(conn, cmd_id, "error")
return cmd_id
+def parse_cli_arguments():
+ parser = argparse.ArgumentParser(
+ description="Run asynchronous (service) commands.")
+ parser.add_argument(
+ "--daemon", default=False, action="store_true",
+ help=(
+ "Run process as a daemon instead of the default 'one-shot' "
+ "process"))
+ return parser.parse_args()
+
if __name__ == "__main__":
- redis_conn = redis.Redis()
- sleep_time = make_incremental_backoff()
- while True: # Daemon that keeps running forever:
- if run_jobs(redis_conn):
- time.sleep(sleep_time("reset"))
- continue
- time.sleep(sleep_time("increment", sleep_time("return_current")))
+ args = parse_cli_arguments()
+ with redis.Redis() as redis_conn:
+ if not args.daemon:
+ run_jobs(redis_conn)
+ else:
+ sleep_time = make_incremental_backoff()
+ while True: # Daemon that keeps running forever:
+ if run_jobs(redis_conn):
+ time.sleep(sleep_time("reset"))
+ continue
+ time.sleep(sleep_time("increment", sleep_time("return_current")))