diff options
author | Frederick Muriuki Muriithi | 2022-02-25 12:22:19 +0300 |
---|---|---|
committer | Frederick Muriuki Muriithi | 2022-03-03 10:20:04 +0300 |
commit | 7de9fea87dc2c9bcb242fd7ffda11af63dbf4268 (patch) | |
tree | dd78cfbdfd30cc563c1b5065d4b1fb3931f4e000 /sheepdog | |
parent | 6d39c92fbc9a7b82cd8eef60c62cd5d83acb49a1 (diff) | |
download | genenetwork3-7de9fea87dc2c9bcb242fd7ffda11af63dbf4268.tar.gz |
Do incremental backoff if there are no jobs on the queue
Since the worker polls a queue for jobs, it can get into a busy poll. This was
the reason that there was a delay of 0.1 seconds between each poll instance.
This commit takes this a little further by doing an incremental backoff, where
it waits longer and longer after each poll where it does not find a job
available, up to an arbitrary maximum.
Diffstat (limited to 'sheepdog')
-rw-r--r-- | sheepdog/worker.py | 32 |
1 files changed, 30 insertions, 2 deletions
diff --git a/sheepdog/worker.py b/sheepdog/worker.py index 4e7f9e7..cf28b74 100644 --- a/sheepdog/worker.py +++ b/sheepdog/worker.py @@ -10,8 +10,32 @@ import redis.connection sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) def update_status(conn, cmd_id, status): + """Helper to update command status""" conn.hset(name=f"{cmd_id}", key="status", value=f"{status}") +def make_incremental_backoff(init_val: float=0.1, maximum: int=420): + """ + Returns a closure that can be used to increment the returned value up to + `maximum` or reset it to `init_val`. + """ + current = init_val + + def __increment_or_reset__(command: str, value: float=0.1): + nonlocal current + if command == "reset": + current = init_val + return current + + if command == "increment": + current = current + abs(value) + if current > maximum: + current = maximum + return current + + return current + + return __increment_or_reset__ + def run_jobs(conn): """Process the redis using a redis connection, CONN""" # pylint: disable=E0401, C0415 @@ -28,9 +52,13 @@ def run_jobs(conn): update_status(conn, cmd_id, "success") else: update_status(conn, cmd_id, "error") + return cmd_id if __name__ == "__main__": redis_conn = redis.Redis() + sleep_time = make_incremental_backoff() while True: # Daemon that keeps running forever: - run_jobs(redis_conn) - time.sleep(0.1) + if run_jobs(redis_conn): + time.sleep(sleep_time("reset")) + continue + time.sleep(sleep_time("increment", sleep_time("return_current"))) |