diff options
-rw-r--r-- | sheepdog/worker.py | 33 |
1 files changed, 24 insertions, 9 deletions
diff --git a/sheepdog/worker.py b/sheepdog/worker.py index cf28b74..0209699 100644 --- a/sheepdog/worker.py +++ b/sheepdog/worker.py @@ -3,6 +3,7 @@ import os import sys import time import redis +import argparse import redis.connection # Enable importing from one dir up: put as first to override any other globally @@ -36,11 +37,11 @@ def make_incremental_backoff(init_val: float=0.1, maximum: int=420): return __increment_or_reset__ -def run_jobs(conn): +def run_jobs(conn, queue_name: str = "GN3::job-queue"): """Process the redis using a redis connection, CONN""" # pylint: disable=E0401, C0415 from gn3.commands import run_cmd - cmd_id = (conn.lpop("GN3::job-queue") or b'').decode("utf-8") + cmd_id = (conn.lpop(queue_name) or b'').decode("utf-8") if bool(cmd_id): cmd = conn.hget(name=cmd_id, key="cmd") if cmd and (conn.hget(cmd_id, "status") == b"queued"): @@ -54,11 +55,25 @@ def run_jobs(conn): update_status(conn, cmd_id, "error") return cmd_id +def parse_cli_arguments(): + parser = argparse.ArgumentParser( + description="Run asynchronous (service) commands.") + parser.add_argument( + "--daemon", default=False, action="store_true", + help=( + "Run process as a daemon instead of the default 'one-shot' " + "process")) + return parser.parse_args() + if __name__ == "__main__": - redis_conn = redis.Redis() - sleep_time = make_incremental_backoff() - while True: # Daemon that keeps running forever: - if run_jobs(redis_conn): - time.sleep(sleep_time("reset")) - continue - time.sleep(sleep_time("increment", sleep_time("return_current"))) + args = parse_cli_arguments() + with redis.Redis() as redis_conn: + if not args.daemon: + run_jobs(redis_conn) + else: + sleep_time = make_incremental_backoff() + while True: # Daemon that keeps running forever: + if run_jobs(redis_conn): + time.sleep(sleep_time("reset")) + continue + time.sleep(sleep_time("increment", sleep_time("return_current"))) |