aboutsummaryrefslogtreecommitdiff
path: root/sheepdog/worker.py
blob: 6557ab364ec8c298ca78b307e41f2d02360b15fe (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
"""Daemon that processes commands"""
import os
import sys
import time
import argparse

import redis
import redis.connection

# Enable importing from one dir up: put as first to override any other globally
# accessible GN3
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

def update_status(conn, cmd_id, status):
    """Helper to update command status"""
    conn.hset(name=f"{cmd_id}", key="status", value=f"{status}")

def make_incremental_backoff(init_val: float=0.1, maximum: int=420):
    """
    Returns a closure that can be used to increment the returned value up to
    `maximum` or reset it to `init_val`.
    """
    current = init_val

    def __increment_or_reset__(command: str, value: float=0.1):
        nonlocal current
        if command == "reset":
            current = init_val
            return current

        if command == "increment":
            current = min(current + abs(value), maximum)
            return current

        return current

    return __increment_or_reset__

def run_jobs(conn, queue_name: str = "GN3::job-queue"):
    """Process the redis using a redis connection, CONN"""
    # pylint: disable=E0401, C0415
    from gn3.commands import run_cmd
    cmd_id = (conn.lpop(queue_name) or b'').decode("utf-8")
    if bool(cmd_id):
        cmd = conn.hget(name=cmd_id, key="cmd")
        if cmd and (conn.hget(cmd_id, "status") == b"queued"):
            update_status(conn, cmd_id, "running")
            result = run_cmd(
                cmd.decode("utf-8"), env=conn.hget(name=cmd_id, key="env"))
            conn.hset(name=cmd_id, key="result", value=result.get("output"))
            if result.get("code") == 0:  # Success
                update_status(conn, cmd_id, "success")
            else:
                update_status(conn, cmd_id, "error")
        return cmd_id
    return None

def parse_cli_arguments():
    """Parse the command-line arguments."""
    parser = argparse.ArgumentParser(
        description="Run asynchronous (service) commands.")
    parser.add_argument(
        "--daemon", default=False, action="store_true",
        help=(
            "Run process as a daemon instead of the default 'one-shot' "
            "process"))
    return parser.parse_args()

if __name__ == "__main__":
    args = parse_cli_arguments()
    with redis.Redis() as redis_conn:
        if not args.daemon:
            run_jobs(redis_conn)
        else:
            sleep_time = make_incremental_backoff()
            while True:  # Daemon that keeps running forever:
                if run_jobs(redis_conn):
                    time.sleep(sleep_time("reset"))
                    continue
                time.sleep(sleep_time("increment", sleep_time("return_current")))