about summary refs log tree commit diff
path: root/sheepdog/worker.py
diff options
context:
space:
mode:
Diffstat (limited to 'sheepdog/worker.py')
-rw-r--r--sheepdog/worker.py47
1 files changed, 40 insertions, 7 deletions
diff --git a/sheepdog/worker.py b/sheepdog/worker.py
index c08edec..e8a7177 100644
--- a/sheepdog/worker.py
+++ b/sheepdog/worker.py
@@ -2,27 +2,37 @@
 import os
 import sys
 import time
+import logging
 import argparse
 
 import redis
 import redis.connection
 
+from gn3.loggers import setup_modules_logging
+
 # Enable importing from one dir up: put as first to override any other globally
 # accessible GN3
-sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
+sys.path.insert(0, os.path.abspath(
+    os.path.join(os.path.dirname(__file__), '..')))
+logging.basicConfig(
+    format=("%(asctime)s — %(filename)s:%(lineno)s — %(levelname)s: "
+            "CommandWorker: %(message)s"))
+logger = logging.getLogger(__name__)
+
 
 def update_status(conn, cmd_id, status):
     """Helper to update command status"""
     conn.hset(name=f"{cmd_id}", key="status", value=f"{status}")
 
-def make_incremental_backoff(init_val: float=0.1, maximum: int=420):
+
+def make_incremental_backoff(init_val: float = 0.1, maximum: int = 420):
     """
     Returns a closure that can be used to increment the returned value up to
     `maximum` or reset it to `init_val`.
     """
     current = init_val
 
-    def __increment_or_reset__(command: str, value: float=0.1):
+    def __increment_or_reset__(command: str, value: float = 0.1):
         nonlocal current
         if command == "reset":
             current = init_val
@@ -36,7 +46,8 @@ def make_incremental_backoff(init_val: float=0.1, maximum: int=420):
 
     return __increment_or_reset__
 
-def run_jobs(conn, queue_name: str = "GN3::job-queue"):
+
+def run_jobs(conn, queue_name):
     """Process the redis using a redis connection, CONN"""
     # pylint: disable=E0401, C0415
     from gn3.commands import run_cmd
@@ -44,6 +55,7 @@ def run_jobs(conn, queue_name: str = "GN3::job-queue"):
     if bool(cmd_id):
         cmd = conn.hget(name=cmd_id, key="cmd")
         if cmd and (conn.hget(cmd_id, "status") == b"queued"):
+            logger.debug("Updating status for job '%s' to 'running'", cmd_id)
             update_status(conn, cmd_id, "running")
             result = run_cmd(
                 cmd.decode("utf-8"), env=conn.hget(name=cmd_id, key="env"))
@@ -56,6 +68,7 @@ def run_jobs(conn, queue_name: str = "GN3::job-queue"):
         return cmd_id
     return None
 
+
 def parse_cli_arguments():
     """Parse the command-line arguments."""
     parser = argparse.ArgumentParser(
@@ -65,17 +78,37 @@ def parse_cli_arguments():
         help=(
             "Run process as a daemon instead of the default 'one-shot' "
             "process"))
+    parser.add_argument(
+        "--queue-name", default="GN3::job-queue", type=str,
+        help="The redis list that holds the unique command ids")
+    parser.add_argument(
+        "--log-level", default="info", type=str,
+        choices=("debug", "info", "warning", "error", "critical"),
+        help="What level to output the logs at.")
     return parser.parse_args()
 
+
 if __name__ == "__main__":
     args = parse_cli_arguments()
+    logger.setLevel(args.log_level.upper())
+    logger.debug("Worker Script: Initialising worker")
+    setup_modules_logging(
+        logging.getLevelName(logger.getEffectiveLevel()),
+        ("gn3.commands",))
     with redis.Redis() as redis_conn:
         if not args.daemon:
-            run_jobs(redis_conn)
+            logger.info("Worker Script: Running worker in one-shot mode.")
+            run_jobs(redis_conn, args.queue_name)
+            logger.debug("Job completed!")
         else:
+            logger.debug("Worker Script: Running worker in daemon-mode.")
             sleep_time = make_incremental_backoff()
             while True:  # Daemon that keeps running forever:
-                if run_jobs(redis_conn):
+                if run_jobs(redis_conn, args.queue_name):
+                    logger.debug("Ran a job. Pausing for a while...")
                     time.sleep(sleep_time("reset"))
                     continue
-                time.sleep(sleep_time("increment", sleep_time("return_current")))
+                time.sleep(sleep_time(
+                    "increment", sleep_time("return_current")))
+
+    logger.info("Worker exiting …")