"""Utilities to log to redis for our worker scripts."""
import logging
from typing import Optional
from redis import Redis
class RedisLogger(logging.Handler):
"""Log out to redis for our worker scripts"""
def __init__(self,#pylint: disable=[too-many-arguments, too-many-positional-arguments]
rconn: Redis,
fullyqualifiedjobid: str,
messageslistname: str,
level:int = logging.NOTSET,
expiry: int = 86400):
"""Initialise the handler."""
super().__init__(level)
self.redisconnection = rconn
self.jobid = fullyqualifiedjobid
self.messageslistname = messageslistname
self.expiry = expiry
rconn.hset(name=fullyqualifiedjobid,
key="log-messagelist",
value=messageslistname)
def emit(self, record):
"""Log to the messages list for a given worker."""
self.redisconnection.rpush(self.messageslistname, self.format(record))
self.redisconnection.expire(self.messageslistname, self.expiry)
class RedisMessageListHandler(logging.Handler):
"""Send messages to specified redis list."""
def __init__(self,
rconn: Redis,
fullyqualifiedkey: str,
loglevel: int = logging.NOTSET,
expiry: Optional[int] = 86400):
super().__init__(loglevel)
self.redisconnection = rconn
self.fullyqualifiedkey = fullyqualifiedkey
self.expiry = expiry
def emit(self, record):
"""Log out to specified `fullyqualifiedkey`."""
self.redisconnection.rpush(self.fullyqualifiedkey, self.format(record))
if bool(self.expiry):
self.redisconnection.expire(self.fullyqualifiedkey, self.expiry)
else:
self.redisconnection.persist(self.fullyqualifiedkey)
def setup_redis_logger(rconn: Redis,
fullyqualifiedjobid: str,
job_messagelist: str,
expiry: int = 86400) -> RedisLogger:
"""Setup a default RedisLogger logger."""
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s: %(message)s")
rconn.hset(
name=fullyqualifiedjobid, key="log-messagelist", value=job_messagelist)
redislogger = RedisLogger(
rconn, fullyqualifiedjobid, job_messagelist, expiry=expiry)
redislogger.setFormatter(formatter)
return redislogger