| author | Frederick Muriuki Muriithi | 2025-04-08 14:05:46 -0500 |
| committer | Frederick Muriuki Muriithi | 2025-04-08 14:19:55 -0500 |
| commit | 1c944a7f890da1a40995eb37624e4b72676b6f76 (patch) |
| tree | 1b55d0d76bd38c0d91eda568367a4dbc04d66e91 |
| parent | 76c3800320b722edcd24b73ed1a9b860f305ecb8 (diff) |
| download | genenetwork3-1c944a7f890da1a40995eb37624e4b72676b6f76.tar.gz |
Add logging to worker.
| -rw-r--r-- | gn3/commands.py | 12 |
| -rw-r--r-- | sheepdog/worker.py | 18 |
2 files changed, 28 insertions, 2 deletions
diff --git a/gn3/commands.py b/gn3/commands.py
index 71a4d27..eba01b1 100644
--- a/gn3/commands.py
+++ b/gn3/commands.py
@@ -176,12 +176,20 @@ def run_cmd(cmd: str, success_codes: Tuple = (0,), env: Optional[str] = None) ->
 def run_async_cmd(
         conn: Redis, job_queue: str, cmd: Union[str, Sequence[str]],
-        email: Optional[str] = None, env: Optional[dict] = None) -> str:
+        email: Optional[str] = None, log_level: str = "info",
+        env: Optional[dict] = None) -> str:
     """A utility function to call `gn3.commands.queue_cmd` function and run
     the worker in the `one-shot` mode."""
     cmd_id = queue_cmd(conn, job_queue, cmd, email, env)
+    worker_command = [
+        sys.executable,
+        "-m", "sheepdog.worker",
+        "--queue-name", job_queue,
+        "--log-level", log_level
+    ]
+    logging.debug("Launching the worker: %s", worker_command)
     subprocess.Popen(  # pylint: disable=[consider-using-with]
-        [sys.executable, "-m", "sheepdog.worker", "--queue-name", job_queue])
+        worker_command)
     return cmd_id
diff --git a/sheepdog/worker.py b/sheepdog/worker.py
index b5b0f16..67bcbeb 100644
--- a/sheepdog/worker.py
+++ b/sheepdog/worker.py
@@ -2,6 +2,7 @@
 import os
 import sys
 import time
+import logging
 import argparse
 
 import redis
@@ -10,6 +11,10 @@ import redis.connection
 # Enable importing from one dir up: put as first to override any other globally
 # accessible GN3
 sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
+logging.basicConfig(
+    format=("%(asctime)s — %(filename)s:%(lineno)s — %(levelname)s: "
+            "CommandWorker: %(message)s"))
+logger = logging.getLogger(__name__)
 
 def update_status(conn, cmd_id, status):
     """Helper to update command status"""
@@ -44,6 +49,7 @@ def run_jobs(conn, queue_name):
     if bool(cmd_id):
         cmd = conn.hget(name=cmd_id, key="cmd")
         if cmd and (conn.hget(cmd_id, "status") == b"queued"):
+            logger.debug(f"Updating status for job '{cmd_id}' to 'running'")
             update_status(conn, cmd_id, "running")
             result = run_cmd(
                 cmd.decode("utf-8"), env=conn.hget(name=cmd_id, key="env"))
@@ -68,17 +74,29 @@ def parse_cli_arguments():
     parser.add_argument(
         "--queue-name", default="GN3::job-queue", type=str,
         help="The redis list that holds the unique command ids")
+    parser.add_argument(
+        "--log-level", default="info", type=str,
+        choices=("debug", "info", "warning", "error", "critical"),
+        help="What level to output the logs at.")
     return parser.parse_args()
 
 if __name__ == "__main__":
     args = parse_cli_arguments()
+    logger.setLevel(args.log_level.upper())
+    logger.debug("Worker Script: Initialising worker")
     with redis.Redis() as redis_conn:
         if not args.daemon:
+            logger.info("Worker Script: Running worker in one-shot mode.")
             run_jobs(redis_conn, args.queue_name)
+            logger.debug("Job completed!")
         else:
+            logger.debug("Worker Script: Running worker in daemon-mode.")
             sleep_time = make_incremental_backoff()
             while True:  # Daemon that keeps running forever:
                 if run_jobs(redis_conn, args.queue_name):
+                    logger.debug("Ran a job. Pausing for a while...")
                     time.sleep(sleep_time("reset"))
                     continue
                 time.sleep(sleep_time("increment", sleep_time("return_current")))
+
+    logger.info("Worker exiting …")
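
For reference, the logging pattern this commit adds to `sheepdog/worker.py` can be reduced to the standalone sketch below (not part of the commit): an argparse `--log-level` choice whose value is upper-cased and handed to `logger.setLevel()`. The option definition and format string mirror the diff; the demo messages at the end are illustrative only.

```python
# Standalone sketch of the worker's new log-level handling; not from the commit.
import argparse
import logging

# Same handler format as the diff; no level is set here, only the format.
logging.basicConfig(
    format=("%(asctime)s — %(filename)s:%(lineno)s — %(levelname)s: "
            "CommandWorker: %(message)s"))
logger = logging.getLogger(__name__)

parser = argparse.ArgumentParser()
parser.add_argument(
    "--log-level", default="info", type=str,
    choices=("debug", "info", "warning", "error", "critical"),
    help="What level to output the logs at.")
args = parser.parse_args()

# logging accepts level names as strings, so "debug" -> "DEBUG" works directly.
logger.setLevel(args.log_level.upper())
logger.debug("only emitted when --log-level debug is passed")
logger.info("emitted at the default 'info' level")
```

Running this with `--log-level debug` prints both messages; with the default `info` level the debug line is suppressed, which is the behaviour the worker now gets from its own `--log-level` flag.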

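On the caller side, `run_async_cmd` now builds the worker command with the new `--log-level` flag and forwards whatever `log_level` it receives. A minimal, hypothetical usage sketch, assuming a local Redis server and a made-up shell command:

```python
# Hypothetical caller sketch; only the run_async_cmd signature comes from the diff.
from redis import Redis

from gn3.commands import run_async_cmd

with Redis() as conn:  # assumes a Redis server on localhost:6379
    cmd_id = run_async_cmd(
        conn,
        job_queue="GN3::job-queue",          # the worker's default queue name
        cmd="echo 'hello from the worker'",  # made-up command for illustration
        log_level="debug",                   # forwarded to the worker as --log-level debug
    )
    print(f"Queued job: {cmd_id}")
```

The returned `cmd_id` is the Redis hash key the worker updates as it moves the job from `queued` to `running`, which is where the new `logger.debug` status message appears.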