aboutsummaryrefslogtreecommitdiff
path: root/sheepdog/worker.py
diff options
context:
space:
mode:
authorFrederick Muriuki Muriithi2022-02-25 12:22:19 +0300
committerFrederick Muriuki Muriithi2022-03-03 10:20:04 +0300
commit7de9fea87dc2c9bcb242fd7ffda11af63dbf4268 (patch)
treedd78cfbdfd30cc563c1b5065d4b1fb3931f4e000 /sheepdog/worker.py
parent6d39c92fbc9a7b82cd8eef60c62cd5d83acb49a1 (diff)
downloadgenenetwork3-7de9fea87dc2c9bcb242fd7ffda11af63dbf4268.tar.gz
Do incremental backoff if there are no jobs on the queue
Since the worker polls a queue for jobs, it can get into a busy poll. This was the reason that there was a delay of 0.1 seconds between each poll instance. This commit takes this a little further by doing an incremental backoff, where it waits longer and longer after each poll where it does not find a job available, up to an arbitrary maximum.
Diffstat (limited to 'sheepdog/worker.py')
-rw-r--r--sheepdog/worker.py32
1 files changed, 30 insertions, 2 deletions
diff --git a/sheepdog/worker.py b/sheepdog/worker.py
index 4e7f9e7..cf28b74 100644
--- a/sheepdog/worker.py
+++ b/sheepdog/worker.py
@@ -10,8 +10,32 @@ import redis.connection
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
def update_status(conn, cmd_id, status):
+ """Helper to update command status"""
conn.hset(name=f"{cmd_id}", key="status", value=f"{status}")
+def make_incremental_backoff(init_val: float=0.1, maximum: int=420):
+ """
+ Returns a closure that can be used to increment the returned value up to
+ `maximum` or reset it to `init_val`.
+ """
+ current = init_val
+
+ def __increment_or_reset__(command: str, value: float=0.1):
+ nonlocal current
+ if command == "reset":
+ current = init_val
+ return current
+
+ if command == "increment":
+ current = current + abs(value)
+ if current > maximum:
+ current = maximum
+ return current
+
+ return current
+
+ return __increment_or_reset__
+
def run_jobs(conn):
"""Process the redis using a redis connection, CONN"""
# pylint: disable=E0401, C0415
@@ -28,9 +52,13 @@ def run_jobs(conn):
update_status(conn, cmd_id, "success")
else:
update_status(conn, cmd_id, "error")
+ return cmd_id
if __name__ == "__main__":
redis_conn = redis.Redis()
+ sleep_time = make_incremental_backoff()
while True: # Daemon that keeps running forever:
- run_jobs(redis_conn)
- time.sleep(0.1)
+ if run_jobs(redis_conn):
+ time.sleep(sleep_time("reset"))
+ continue
+ time.sleep(sleep_time("increment", sleep_time("return_current")))