"""Utilities to log to redis for our worker scripts.""" import logging from typing import Optional from redis import Redis class RedisLogger(logging.Handler): """Log out to redis for our worker scripts""" def __init__(self,#pylint: disable=[too-many-arguments] rconn: Redis, fullyqualifiedjobid: str, messageslistname: str, level:int = logging.NOTSET, expiry: int = 86400): """Initialise the handler.""" super().__init__(level) self.redisconnection = rconn self.jobid = fullyqualifiedjobid self.messageslistname = messageslistname self.expiry = expiry rconn.hset(name=fullyqualifiedjobid, key="log-messagelist", value=messageslistname) def emit(self, record): """Log to the messages list for a given worker.""" self.redisconnection.rpush(self.messageslistname, self.format(record)) self.redisconnection.expire(self.messageslistname, self.expiry) class RedisMessageListHandler(logging.Handler): """Send messages to specified redis list.""" def __init__(self, rconn: Redis, fullyqualifiedkey: str, loglevel: int = logging.NOTSET, expiry: Optional[int] = 86400): super().__init__(loglevel) self.redisconnection = rconn self.fullyqualifiedkey = fullyqualifiedkey self.expiry = expiry def emit(self, record): """Log out to specified `fullyqualifiedkey`.""" self.redisconnection.rpush(self.fullyqualifiedkey, self.format(record)) if bool(self.expiry): self.redisconnection.expire(self.fullyqualifiedkey, self.expiry) else: self.redisconnection.persist(self.fullyqualifiedkey) def setup_redis_logger(rconn: Redis, fullyqualifiedjobid: str, job_messagelist: str, expiry: int = 86400) -> RedisLogger: """Setup a default RedisLogger logger.""" formatter = logging.Formatter( "%(asctime)s - %(name)s - %(levelname)s: %(message)s") rconn.hset( name=fullyqualifiedjobid, key="log-messagelist", value=job_messagelist) redislogger = RedisLogger( rconn, fullyqualifiedjobid, job_messagelist, expiry=expiry) redislogger.setFormatter(formatter) return redislogger