about summary refs log tree commit diff
diff options
context:
space:
mode:
authorFrederick Muriuki Muriithi2025-04-08 14:05:46 -0500
committerFrederick Muriuki Muriithi2025-04-08 14:19:55 -0500
commit1c944a7f890da1a40995eb37624e4b72676b6f76 (patch)
tree1b55d0d76bd38c0d91eda568367a4dbc04d66e91
parent76c3800320b722edcd24b73ed1a9b860f305ecb8 (diff)
downloadgenenetwork3-1c944a7f890da1a40995eb37624e4b72676b6f76.tar.gz
Add logging to worker.
-rw-r--r--gn3/commands.py12
-rw-r--r--sheepdog/worker.py18
2 files changed, 28 insertions, 2 deletions
diff --git a/gn3/commands.py b/gn3/commands.py
index 71a4d27..eba01b1 100644
--- a/gn3/commands.py
+++ b/gn3/commands.py
@@ -176,12 +176,20 @@ def run_cmd(cmd: str, success_codes: Tuple = (0,), env: Optional[str] = None) ->
 
 def run_async_cmd(
         conn: Redis, job_queue: str, cmd: Union[str, Sequence[str]],
-        email: Optional[str] = None, env: Optional[dict] = None) -> str:
+        email: Optional[str] = None, log_level: str = "info",
+        env: Optional[dict] = None) -> str:
     """A utility function to call `gn3.commands.queue_cmd` function and run the
     worker in the `one-shot` mode."""
     cmd_id = queue_cmd(conn, job_queue, cmd, email, env)
+    worker_command = [
+        sys.executable,
+        "-m", "sheepdog.worker",
+        "--queue-name", job_queue,
+        "--log-level", log_level
+    ]
+    logging.debug("Launching the worker: %s", worker_command)
     subprocess.Popen( # pylint: disable=[consider-using-with]
-        [sys.executable, "-m", "sheepdog.worker", "--queue-name", job_queue])
+        worker_command)
     return cmd_id
 
 
diff --git a/sheepdog/worker.py b/sheepdog/worker.py
index b5b0f16..67bcbeb 100644
--- a/sheepdog/worker.py
+++ b/sheepdog/worker.py
@@ -2,6 +2,7 @@
 import os
 import sys
 import time
+import logging
 import argparse
 
 import redis
@@ -10,6 +11,10 @@ import redis.connection
 # Enable importing from one dir up: put as first to override any other globally
 # accessible GN3
 sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
+logging.basicConfig(
+    format=("%(asctime)s — %(filename)s:%(lineno)s — %(levelname)s: "
+            "CommandWorker: %(message)s"))
+logger = logging.getLogger(__name__)
 
 def update_status(conn, cmd_id, status):
     """Helper to update command status"""
@@ -44,6 +49,7 @@ def run_jobs(conn, queue_name):
     if bool(cmd_id):
         cmd = conn.hget(name=cmd_id, key="cmd")
         if cmd and (conn.hget(cmd_id, "status") == b"queued"):
+            logger.debug(f"Updating status for job '{cmd_id}' to 'running'")
             update_status(conn, cmd_id, "running")
             result = run_cmd(
                 cmd.decode("utf-8"), env=conn.hget(name=cmd_id, key="env"))
@@ -68,17 +74,29 @@ def parse_cli_arguments():
     parser.add_argument(
         "--queue-name", default="GN3::job-queue", type=str,
         help="The redis list that holds the unique command ids")
+    parser.add_argument(
+        "--log-level", default="info", type=str,
+        choices=("debug", "info", "warning", "error", "critical"),
+        help="What level to output the logs at.")
     return parser.parse_args()
 
 if __name__ == "__main__":
     args = parse_cli_arguments()
+    logger.setLevel(args.log_level.upper())
+    logger.debug("Worker Script: Initialising worker")
     with redis.Redis() as redis_conn:
         if not args.daemon:
+            logger.info("Worker Script: Running worker in one-shot mode.")
             run_jobs(redis_conn, args.queue_name)
+            logger.debug("Job completed!")
         else:
+            logger.debug("Worker Script: Running worker in daemon-mode.")
             sleep_time = make_incremental_backoff()
             while True:  # Daemon that keeps running forever:
                 if run_jobs(redis_conn, args.queue_name):
+                    logger.debug("Ran a job. Pausing for a while...")
                     time.sleep(sleep_time("reset"))
                     continue
                 time.sleep(sleep_time("increment", sleep_time("return_current")))
+
+    logger.info("Worker exiting …")