diff options
Diffstat (limited to 'scripts/worker.py')
-rw-r--r-- | scripts/worker.py | 83 |
1 files changed, 83 insertions, 0 deletions
diff --git a/scripts/worker.py b/scripts/worker.py new file mode 100644 index 0000000..0a77d41 --- /dev/null +++ b/scripts/worker.py @@ -0,0 +1,83 @@ +"""Daemon that processes commands""" +import os +import sys +import time +import argparse + +import redis +import redis.connection + +from gn_auth.commands import run_cmd + +# Enable importing from one dir up: put as first to override any other globally +# accessible GN3 +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +def update_status(conn, cmd_id, status): + """Helper to update command status""" + conn.hset(name=f"{cmd_id}", key="status", value=f"{status}") + +def make_incremental_backoff(init_val: float=0.1, maximum: int=420): + """ + Returns a closure that can be used to increment the returned value up to + `maximum` or reset it to `init_val`. + """ + current = init_val + + def __increment_or_reset__(command: str, value: float=0.1): + nonlocal current + if command == "reset": + current = init_val + return current + + if command == "increment": + current = min(current + abs(value), maximum) + return current + + return current + + return __increment_or_reset__ + +def run_jobs(conn, queue_name: str): + """Process the redis using a redis connection, CONN""" + # pylint: disable=E0401, C0415 + cmd_id = (conn.lpop(queue_name) or b'').decode("utf-8") + if bool(cmd_id): + cmd = conn.hget(name=cmd_id, key="cmd") + if cmd and (conn.hget(cmd_id, "status") == b"queued"): + update_status(conn, cmd_id, "running") + result = run_cmd( + cmd.decode("utf-8"), env=conn.hget(name=cmd_id, key="env")) + conn.hset(name=cmd_id, key="result", value=result.get("output")) + if result.get("code") == 0: # Success + update_status(conn, cmd_id, "success") + else: + update_status(conn, cmd_id, "error") + conn.hset(cmd_id, "stderr", result.get("output")) + return cmd_id + return None + +def parse_cli_arguments(): + """Parse the command-line arguments.""" + parser = argparse.ArgumentParser( + description="Run asynchronous (service) commands.") + parser.add_argument("queue_name", help="Queue to check in redis") + parser.add_argument( + "--daemon", default=False, action="store_true", + help=( + "Run process as a daemon instead of the default 'one-shot' " + "process")) + return parser.parse_args() + +if __name__ == "__main__": + args = parse_cli_arguments() + with redis.Redis() as redis_conn: + if not args.daemon: + run_jobs(redis_conn, args.queue_name) + else: + sleep_time = make_incremental_backoff() + while True: # Daemon that keeps running forever: + if run_jobs(redis_conn, args.queue_name): + time.sleep(sleep_time("reset")) + continue + time.sleep(sleep_time("increment", sleep_time("return_current"))) |