From 7976230ffcb1de4f744895ee252298dea9a15f4c Mon Sep 17 00:00:00 2001
From: Frederick Muriuki Muriithi
Date: Tue, 23 Jan 2024 12:33:08 +0300
Subject: Update scripts to use redis prefix.

---
 scripts/cli_parser.py           |  2 ++
 scripts/process_rqtl2_bundle.py | 26 +++++++++++++---------
 scripts/redis_logger.py         | 33 ++++++++++++---------------
 scripts/rqtl2/entry.py          |  6 +++--
 scripts/worker.py               | 49 +++++++++++++++++++----------------
 5 files changed, 58 insertions(+), 58 deletions(-)

diff --git a/scripts/cli_parser.py b/scripts/cli_parser.py
index e8f030c..308ee4b 100644
--- a/scripts/cli_parser.py
+++ b/scripts/cli_parser.py
@@ -11,6 +11,8 @@ def init_cli_parser(program: str, description: Optional[str] = None) -> Argument
         "databaseuri", type=str, help="URI to connect to MariaDB")
     parser.add_argument(
         "redisuri", type=str, help="URI to connect to the redis server.")
+    parser.add_argument(
+        "redisprefix", type=str, help="A prefix to the job ID.")
     parser.add_argument("jobid", type=UUID, help="Job ID that this belongs to")
     parser.add_argument(
         "--redisexpiry",
diff --git a/scripts/process_rqtl2_bundle.py b/scripts/process_rqtl2_bundle.py
index 7b114cf..1b26264 100644
--- a/scripts/process_rqtl2_bundle.py
+++ b/scripts/process_rqtl2_bundle.py
@@ -27,12 +27,12 @@ def safe_json_decode(value: str) -> Any:
     except json.decoder.JSONDecodeError:
         return value
 
-def parse_job(rconn: Redis, jobid: uuid.UUID) -> dict:
+def parse_job(rconn: Redis, rprefix: str, jobid: uuid.UUID) -> dict:
     """Load and parse job from Redis"""
     return {
         key: safe_json_decode(value)
         for key, value
-        in jobs.job(rconn, jobid).items()
+        in jobs.job(rconn, rprefix, jobid).items()
     }
 
 def has_geno_file(job: dict) -> bool:
@@ -51,14 +51,15 @@ def percent_completion(geno: float, pheno: float) -> float:
 
 def process_bundle(dbconn: mdb.Connection,
                    rconn: Redis,
+                   rprefix: str,
                    jobid: uuid.UUID,
                    logger: Logger) -> int:
     """Process the R/qtl2 bundle."""
     try:
-        thejob = parse_job(rconn, jobid)
+        thejob = parse_job(rconn, rprefix, jobid)
         meta = thejob["bundle-metadata"]
-        rconn.hset(str(jobid), "geno-percent", "0")
-        rconn.hset(str(jobid), "pheno-percent", "0")
+        rconn.hset(jobs.job_key(rprefix, jobid), "geno-percent", "0")
+        rconn.hset(jobs.job_key(rprefix, jobid), "pheno-percent", "0")
 
         if has_geno_file(thejob):
             logger.info("Processing geno files.")
@@ -74,7 +75,7 @@ def process_bundle(dbconn: mdb.Connection,
             logger.debug(
                 "geno file processing completed successfully. (ExitCode: %s)",
                 genoexit)
-            rconn.hset(str(jobid), "geno-percent", "100")
+            rconn.hset(jobs.job_key(rprefix, jobid), "geno-percent", "100")
 
         if has_pheno_file(thejob):
             phenoexit = install_pheno_files(
@@ -89,7 +90,7 @@ def process_bundle(dbconn: mdb.Connection,
             logger.debug(
                 "pheno file processing completed successfully. (ExitCode: %s)",
                 phenoexit)
-            rconn.hset(str(jobid), "pheno-percent", "100")
+            rconn.hset(jobs.job_key(rprefix, jobid), "pheno-percent", "100")
 
         return 0
     except jobs.JobNotFound as exc:
@@ -113,14 +114,17 @@ if __name__ == "__main__":
         logger.addHandler(StreamHandler(stream=sys.stderr))
         logger.setLevel("DEBUG")
 
-        jobid = args.jobid
+        fqjobid = jobs.job_key(args.redisprefix, args.jobid)
         with (database_connection(args.databaseuri) as dbconn,
               Redis.from_url(args.redisuri, decode_responses=True) as rconn):
             logger.addHandler(setup_redis_logger(
-                rconn, jobid, f"{str(jobid)}:log-messages", args.redisexpiry))
+                rconn, fqjobid, f"{fqjobid}:log-messages",
+                args.redisexpiry))
 
-            exitcode = process_bundle(dbconn, rconn, args.jobid, logger)
-            rconn.hset(str(args.jobid), "percent", "100")
+            exitcode = process_bundle(
+                dbconn, rconn, args.redisprefix, args.jobid, logger)
+            rconn.hset(
+                jobs.job_key(args.redisprefix, args.jobid), "percent", "100")
             return exitcode
 
     sys.exit(main())
diff --git a/scripts/redis_logger.py b/scripts/redis_logger.py
index 76b8ba6..2ae682b 100644
--- a/scripts/redis_logger.py
+++ b/scripts/redis_logger.py
@@ -1,46 +1,41 @@
 """Utilities to log to redis for our worker scripts."""
-import uuid
 import logging
 
 from redis import Redis
 
 class RedisLogger(logging.Handler):
     """Log out to redis for our worker scripts"""
-    def __init__(self, # pylint: disable=[too-many-arguments]
+    def __init__(self,#pylint: disable=[too-many-arguments]
                  rconn: Redis,
-                 jobid: uuid.UUID,
+                 fullyqualifiedjobid: str,
+                 messageslistname: str,
                  level:int = logging.NOTSET,
-                 messagelistname: str = "log-messages",
                  expiry: int = 86400):
         """Initialise the handler."""
         super().__init__(level)
         self.redisconnection = rconn
-        self.jobid = jobid
-        self.messagelistname = messagelistname
+        self.jobid = fullyqualifiedjobid
+        self.messageslistname = messageslistname
         self.expiry = expiry
-        rconn.hset(name=str(jobid),
+        rconn.hset(name=fullyqualifiedjobid,
                    key="log-messagelist",
-                   value=self.messages_list_name())
-
-    def messages_list_name(self):
-        """Retrieve the fully qualified message-list name"""
-        return f"{str(self.jobid)}:{self.messagelistname}"
+                   value=messageslistname)
 
     def emit(self, record):
         """Log to the messages list for a given worker."""
-        self.redisconnection.rpush(
-            self.messages_list_name(), self.format(record))
-        self.redisconnection.expire(self.messages_list_name(), self.expiry)
+        self.redisconnection.rpush(self.messageslistname, self.format(record))
+        self.redisconnection.expire(self.messageslistname, self.expiry)
 
 def setup_redis_logger(rconn: Redis,
-                       jobid: uuid.UUID,
+                       fullyqualifiedjobid: str,
                        job_messagelist: str,
                        expiry: int = 86400) -> RedisLogger:
     """Setup a default RedisLogger logger."""
     formatter = logging.Formatter(
         "%(asctime)s - %(name)s - %(levelname)s: %(message)s")
-    # job_messagelist = f"{str(args.jobid)}:log-messages"
-    rconn.hset(name=str(jobid), key="log-messagelist", value=job_messagelist)
-    redislogger = RedisLogger(rconn, jobid, expiry=expiry)
+    rconn.hset(
+        name=fullyqualifiedjobid, key="log-messagelist", value=job_messagelist)
+    redislogger = RedisLogger(
+        rconn, fullyqualifiedjobid, job_messagelist, expiry=expiry)
     redislogger.setFormatter(formatter)
     return redislogger
diff --git a/scripts/rqtl2/entry.py b/scripts/rqtl2/entry.py
index eccc19d..93fc130 100644
--- a/scripts/rqtl2/entry.py
+++ b/scripts/rqtl2/entry.py
@@ -6,6 +6,7 @@ from argparse import Namespace
 from redis import Redis
 from MySQLdb import Connection
 
+from qc_app import jobs
 from qc_app.db_utils import database_connection
 from qc_app.check_connections import check_db, check_redis
 
@@ -25,10 +26,11 @@ def build_main(args: Namespace,
 
     with (Redis.from_url(args.redisuri, decode_responses=True) as rconn,
           database_connection(args.databaseuri) as dbconn):
+        fqjobid = jobs.job_key(jobs.jobsnamespace(), args.jobid)
         logger.addHandler(setup_redis_logger(
             rconn,
-            args.jobid,
-            f"{str(args.jobid)}:log-messages",
+            fqjobid,
+            f"{fqjobid}:log-messages",
             args.redisexpiry))
         logger.setLevel(loglevel)
         return run_fn(dbconn, args)
diff --git a/scripts/worker.py b/scripts/worker.py
index 90d83c4..17d6e06 100644
--- a/scripts/worker.py
+++ b/scripts/worker.py
@@ -20,6 +20,8 @@ def parse_args():
         "Generic worker to launch and manage specific worker scripts"))
     parser.add_argument(
         "redisurl", default="redis:///", help="URL to the redis server")
+    parser.add_argument(
+        "redisprefix", type=str, help="The prefix before the job ID.")
     parser.add_argument("job_id", help="The id of the job being processed")
 
     args = parser.parse_args()
@@ -27,58 +29,53 @@ def parse_args():
     return args
 
-def update_stdout_stderr(rconn, job_id, bytes_read, stream: str):
-    "Update the stdout/stderr keys according to the value of `stream`."
-    job = jobs.job(rconn, job_id)
-    contents = job.get(stream, '')
-    new_contents = contents + bytes_read.decode("utf-8")
-    rconn.hset(name=job_id, key=stream, value=new_contents)
-
-def update_status(rconn, job_id, status):
-    """Update status of job in redis."""
-    rconn.hset(name=job_id, key="status", value=status)
-
-def run_job(job, rconn):
+def run_job(rconn: Redis, job: dict, redisprefix: str):
     "Run the actual job."
-    job_id = job["job_id"]
+    jobid = job["job_id"]
     try:
         with TemporaryDirectory() as tmpdir:
-            stderrpath = f"{tmpdir}/{job_id}.stderr"
+            stderrpath = f"{tmpdir}/{jobid}.stderr"
             with open(stderrpath, "w+b") as tmpfl:
                 with subprocess.Popen(
                         shlex.split(job["command"]), stdout=subprocess.PIPE,
                         stderr=tmpfl) as process:
                     while process.poll() is None:
-                        update_status(rconn, job_id, "running")
-                        update_stdout_stderr(
-                            rconn, job_id, process.stdout.read1(), "stdout")
+                        jobs.update_status(rconn, redisprefix, jobid, "running")
+                        jobs.update_stdout_stderr(
+                            rconn, redisprefix, jobid, process.stdout.read1(), "stdout")
                         sleep(1)
 
-                    update_status(
+                    jobs.update_status(
                         rconn,
-                        job_id,
+                        redisprefix,
+                        jobid,
                         ("error" if process.returncode != 0 else "success"))
 
             with open(stderrpath, "rb") as stderr:
                 stderr_content = stderr.read()
-            update_stdout_stderr(rconn, job_id, stderr_content, "stderr")
+            jobs.update_stdout_stderr(rconn, redisprefix, jobid, stderr_content, "stderr")
 
             os.remove(stderrpath)
         return process.poll()
     except Exception as exc:# pylint: disable=[broad-except,unused-variable]
-        update_status(rconn, job_id, "error")
-        update_stdout_stderr(
-            rconn, job_id, traceback.format_exc().encode("utf-8"), "stderr")
+        jobs.update_status(rconn, redisprefix, jobid, "error")
+        jobs.update_stdout_stderr(
+            rconn, redisprefix, jobid, traceback.format_exc().encode("utf-8"), "stderr")
         print(traceback.format_exc(), file=sys.stderr)
-        sys.exit(4)
+        return 4
 
 def main():
     "Entry point function"
     args = parse_args()
     with Redis.from_url(args.redisurl, decode_responses=True) as rconn:
-        job = jobs.job(rconn, args.job_id)
+        job = jobs.job(rconn, args.redisprefix, args.job_id)
         if job:
-            return run_job(job, rconn)
+            return run_job(rconn, job, args.redisprefix)
+        jobs.update_status(rconn, args.redisprefix, args.job_id, "error")
+        fqjobid = jobs.job_key(args.redisprefix, args.job_id)
+        rconn.hset(fqjobid, "stderr", f"No such job. '{args.job_id}'.")
+        rconn.expire(name=fqjobid,
+                     time=timedelta(seconds=(2 * 60 * 60)))
         print(f"No such job. '{args.job_id}'.", file=sys.stderr)
         return 2
     return 3
--
cgit v1.2.3
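
Note on the helpers this patch leans on (not part of the patch itself): the new code calls jobs.job_key and jobs.jobsnamespace from qc_app.jobs to build the fully-qualified Redis key out of the prefix and the job ID. The sketch below is an assumption inferred from the call sites; the exact namespace value and signatures are not shown in this source.

# Hypothetical sketch of the qc_app.jobs helpers assumed above.
# Only the "prefix:jobid" key shape is implied by the call sites; the
# default namespace string here is an illustrative guess.
from typing import Union
from uuid import UUID

def jobsnamespace() -> str:
    """Return the default Redis namespace (prefix) for uploader jobs."""
    return "gnqc:jobs"  # assumed value, not taken from the repository

def job_key(namespaceprefix: str, jobid: Union[str, UUID]) -> str:
    """Build the fully-qualified Redis key for a job, e.g. 'prefix:<uuid>'."""
    return f"{namespaceprefix}:{jobid}"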