From 7976230ffcb1de4f744895ee252298dea9a15f4c Mon Sep 17 00:00:00 2001 From: Frederick Muriuki Muriithi Date: Tue, 23 Jan 2024 12:33:08 +0300 Subject: Update scripts to use redis prefix. --- qc_app/jobs.py | 19 +++++++++++++++- qc_app/upload/rqtl2.py | 37 +++++++++++++++++-------------- scripts/cli_parser.py | 2 ++ scripts/process_rqtl2_bundle.py | 26 +++++++++++++--------- scripts/redis_logger.py | 33 ++++++++++++--------------- scripts/rqtl2/entry.py | 6 +++-- scripts/worker.py | 49 +++++++++++++++++++---------------------- 7 files changed, 96 insertions(+), 76 deletions(-) diff --git a/qc_app/jobs.py b/qc_app/jobs.py index dc1f967..cf4e4ef 100644 --- a/qc_app/jobs.py +++ b/qc_app/jobs.py @@ -97,7 +97,8 @@ def launch_job(the_job: dict, redisurl: str, error_dir): "w", encoding="utf-8") as errorfile: subprocess.Popen( # pylint: disable=[consider-using-with] - [sys.executable, "-m", "scripts.worker", redisurl, job_id], + [sys.executable, "-m", "scripts.worker", redisurl, jobsnamespace(), + job_id], stderr=errorfile, env={"PYTHONPATH": ":".join(sys.path)}) @@ -107,3 +108,19 @@ def job(rconn: Redis, rprefix: str, jobid: Union[str,UUID]): "Retrieve the job" thejob = rconn.hgetall(job_key(rprefix, jobid)) or raise_jobnotfound(jobid) return thejob + +def update_status( + rconn: Redis, rprefix: str, jobid: Union[str, UUID], status: str): + """Update status of job in redis.""" + rconn.hset(name=job_key(rprefix, jobid), key="status", value=status) + +def update_stdout_stderr(rconn: Redis, + rprefix: str, + jobid: Union[str, UUID], + bytes_read: bytes, + stream: str): + "Update the stdout/stderr keys according to the value of `stream`." + thejob = job(rconn, rprefix, jobid) + contents = thejob.get(stream, '') + new_contents = contents + bytes_read.decode("utf-8") + rconn.hset(name=job_key(rprefix, jobid), key=stream, value=new_contents) diff --git a/qc_app/upload/rqtl2.py b/qc_app/upload/rqtl2.py index 7805114..eea49e1 100644 --- a/qc_app/upload/rqtl2.py +++ b/qc_app/upload/rqtl2.py @@ -552,11 +552,13 @@ def confirm_bundle_details(species_id: int, population_id: int): _job = jobs.launch_job( jobs.initialise_job( rconn, + jobs.jobsnamespace(), jobid, [ sys.executable, "-m", "scripts.process_rqtl2_bundle", - app.config["SQL_URI"], app.config["REDIS_URL"], jobid, - "--redisexpiry", str(redis_ttl_seconds)], + app.config["SQL_URI"], app.config["REDIS_URL"], + jobs.jobsnamespace(), jobid, "--redisexpiry", + str(redis_ttl_seconds)], "R/qtl2 Bundle Upload", redis_ttl_seconds, { @@ -588,21 +590,22 @@ def confirm_bundle_details(species_id: int, population_id: int): def rqtl2_processing_status(jobid: UUID): """Retrieve the status of the job processing the uploaded R/qtl2 bundle.""" with Redis.from_url(app.config["REDIS_URL"], decode_responses=True) as rconn: - thejob = jobs.job(rconn, jobid) - if not bool(thejob): - return render_template("rqtl2/no-such-job.html", jobid=jobid) + try: + thejob = jobs.job(rconn, jobs.jobsnamespace(), jobid) - messagelistname = thejob.get("log-messagelist") - logmessages = (rconn.lrange(messagelistname, 0, -1) - if bool(messagelistname) else []) + messagelistname = thejob.get("log-messagelist") + logmessages = (rconn.lrange(messagelistname, 0, -1) + if bool(messagelistname) else []) - if thejob["status"] == "error": - return render_template( - "rqtl2/rqtl2-job-error.html", job=thejob, messages=logmessages) - if thejob["status"] == "success": - return render_template("rqtl2/rqtl2-job-results.html", - job=thejob, - messages=logmessages) + if thejob["status"] == "error": + return render_template( + "rqtl2/rqtl2-job-error.html", job=thejob, messages=logmessages) + if thejob["status"] == "success": + return render_template("rqtl2/rqtl2-job-results.html", + job=thejob, + messages=logmessages) - return render_template( - "rqtl2/rqtl2-job-status.html", job=thejob, messages=logmessages) + return render_template( + "rqtl2/rqtl2-job-status.html", job=thejob, messages=logmessages) + except jobs.JobNotFound as _exc: + return render_template("rqtl2/no-such-job.html", jobid=jobid) diff --git a/scripts/cli_parser.py b/scripts/cli_parser.py index e8f030c..308ee4b 100644 --- a/scripts/cli_parser.py +++ b/scripts/cli_parser.py @@ -11,6 +11,8 @@ def init_cli_parser(program: str, description: Optional[str] = None) -> Argument "databaseuri", type=str, help="URI to connect to MariaDB") parser.add_argument( "redisuri", type=str, help="URI to connect to the redis server.") + parser.add_argument( + "redisprefix", type=str, help="A prefix to the job ID.") parser.add_argument("jobid", type=UUID, help="Job ID that this belongs to") parser.add_argument( "--redisexpiry", diff --git a/scripts/process_rqtl2_bundle.py b/scripts/process_rqtl2_bundle.py index 7b114cf..1b26264 100644 --- a/scripts/process_rqtl2_bundle.py +++ b/scripts/process_rqtl2_bundle.py @@ -27,12 +27,12 @@ def safe_json_decode(value: str) -> Any: except json.decoder.JSONDecodeError: return value -def parse_job(rconn: Redis, jobid: uuid.UUID) -> dict: +def parse_job(rconn: Redis, rprefix: str, jobid: uuid.UUID) -> dict: """Load and parse job from Redis""" return { key: safe_json_decode(value) for key, value - in jobs.job(rconn, jobid).items() + in jobs.job(rconn, rprefix, jobid).items() } def has_geno_file(job: dict) -> bool: @@ -51,14 +51,15 @@ def percent_completion(geno: float, pheno: float) -> float: def process_bundle(dbconn: mdb.Connection, rconn: Redis, + rprefix: str, jobid: uuid.UUID, logger: Logger) -> int: """Process the R/qtl2 bundle.""" try: - thejob = parse_job(rconn, jobid) + thejob = parse_job(rconn, rprefix, jobid) meta = thejob["bundle-metadata"] - rconn.hset(str(jobid), "geno-percent", "0") - rconn.hset(str(jobid), "pheno-percent", "0") + rconn.hset(jobs.job_key(rprefix, jobid), "geno-percent", "0") + rconn.hset(jobs.job_key(rprefix, jobid), "pheno-percent", "0") if has_geno_file(thejob): logger.info("Processing geno files.") @@ -74,7 +75,7 @@ def process_bundle(dbconn: mdb.Connection, logger.debug( "geno file processing completed successfully. (ExitCode: %s)", genoexit) - rconn.hset(str(jobid), "geno-percent", "100") + rconn.hset(jobs.job_key(rprefix, jobid), "geno-percent", "100") if has_pheno_file(thejob): phenoexit = install_pheno_files( @@ -89,7 +90,7 @@ def process_bundle(dbconn: mdb.Connection, logger.debug( "pheno file processing completed successfully. (ExitCode: %s)", phenoexit) - rconn.hset(str(jobid), "pheno-percent", "100") + rconn.hset(jobs.job_key(rprefix, jobid), "pheno-percent", "100") return 0 except jobs.JobNotFound as exc: @@ -113,14 +114,17 @@ if __name__ == "__main__": logger.addHandler(StreamHandler(stream=sys.stderr)) logger.setLevel("DEBUG") - jobid = args.jobid + fqjobid = jobs.job_key(args.redisprefix, args.jobid) with (database_connection(args.databaseuri) as dbconn, Redis.from_url(args.redisuri, decode_responses=True) as rconn): logger.addHandler(setup_redis_logger( - rconn, jobid, f"{str(jobid)}:log-messages", args.redisexpiry)) + rconn, fqjobid, f"{fqjobid}:log-messages", + args.redisexpiry)) - exitcode = process_bundle(dbconn, rconn, args.jobid, logger) - rconn.hset(str(args.jobid), "percent", "100") + exitcode = process_bundle( + dbconn, rconn, args.redisprefix, args.jobid, logger) + rconn.hset( + jobs.job_key(args.redisprefix, args.jobid), "percent", "100") return exitcode sys.exit(main()) diff --git a/scripts/redis_logger.py b/scripts/redis_logger.py index 76b8ba6..2ae682b 100644 --- a/scripts/redis_logger.py +++ b/scripts/redis_logger.py @@ -1,46 +1,41 @@ """Utilities to log to redis for our worker scripts.""" -import uuid import logging from redis import Redis class RedisLogger(logging.Handler): """Log out to redis for our worker scripts""" - def __init__(self, # pylint: disable=[too-many-arguments] + def __init__(self,#pylint: disable=[too-many-arguments] rconn: Redis, - jobid: uuid.UUID, + fullyqualifiedjobid: str, + messageslistname: str, level:int = logging.NOTSET, - messagelistname: str = "log-messages", expiry: int = 86400): """Initialise the handler.""" super().__init__(level) self.redisconnection = rconn - self.jobid = jobid - self.messagelistname = messagelistname + self.jobid = fullyqualifiedjobid + self.messageslistname = messageslistname self.expiry = expiry - rconn.hset(name=str(jobid), + rconn.hset(name=fullyqualifiedjobid, key="log-messagelist", - value=self.messages_list_name()) - - def messages_list_name(self): - """Retrieve the fully qualified message-list name""" - return f"{str(self.jobid)}:{self.messagelistname}" + value=messageslistname) def emit(self, record): """Log to the messages list for a given worker.""" - self.redisconnection.rpush( - self.messages_list_name(), self.format(record)) - self.redisconnection.expire(self.messages_list_name(), self.expiry) + self.redisconnection.rpush(self.messageslistname, self.format(record)) + self.redisconnection.expire(self.messageslistname, self.expiry) def setup_redis_logger(rconn: Redis, - jobid: uuid.UUID, + fullyqualifiedjobid: str, job_messagelist: str, expiry: int = 86400) -> RedisLogger: """Setup a default RedisLogger logger.""" formatter = logging.Formatter( "%(asctime)s - %(name)s - %(levelname)s: %(message)s") - # job_messagelist = f"{str(args.jobid)}:log-messages" - rconn.hset(name=str(jobid), key="log-messagelist", value=job_messagelist) - redislogger = RedisLogger(rconn, jobid, expiry=expiry) + rconn.hset( + name=fullyqualifiedjobid, key="log-messagelist", value=job_messagelist) + redislogger = RedisLogger( + rconn, fullyqualifiedjobid, job_messagelist, expiry=expiry) redislogger.setFormatter(formatter) return redislogger diff --git a/scripts/rqtl2/entry.py b/scripts/rqtl2/entry.py index eccc19d..93fc130 100644 --- a/scripts/rqtl2/entry.py +++ b/scripts/rqtl2/entry.py @@ -6,6 +6,7 @@ from argparse import Namespace from redis import Redis from MySQLdb import Connection +from qc_app import jobs from qc_app.db_utils import database_connection from qc_app.check_connections import check_db, check_redis @@ -25,10 +26,11 @@ def build_main(args: Namespace, with (Redis.from_url(args.redisuri, decode_responses=True) as rconn, database_connection(args.databaseuri) as dbconn): + fqjobid = jobs.job_key(jobs.jobsnamespace(), args.jobid) logger.addHandler(setup_redis_logger( rconn, - args.jobid, - f"{str(args.jobid)}:log-messages", + fqjobid, + f"{fqjobid}:log-messages", args.redisexpiry)) logger.setLevel(loglevel) return run_fn(dbconn, args) diff --git a/scripts/worker.py b/scripts/worker.py index 90d83c4..17d6e06 100644 --- a/scripts/worker.py +++ b/scripts/worker.py @@ -20,6 +20,8 @@ def parse_args(): "Generic worker to launch and manage specific worker scripts")) parser.add_argument( "redisurl", default="redis:///", help="URL to the redis server") + parser.add_argument( + "redisprefix", type=str, help="The prefix before the job ID.") parser.add_argument("job_id", help="The id of the job being processed") args = parser.parse_args() @@ -27,58 +29,53 @@ def parse_args(): return args -def update_stdout_stderr(rconn, job_id, bytes_read, stream: str): - "Update the stdout/stderr keys according to the value of `stream`." - job = jobs.job(rconn, job_id) - contents = job.get(stream, '') - new_contents = contents + bytes_read.decode("utf-8") - rconn.hset(name=job_id, key=stream, value=new_contents) - -def update_status(rconn, job_id, status): - """Update status of job in redis.""" - rconn.hset(name=job_id, key="status", value=status) - -def run_job(job, rconn): +def run_job(rconn: Redis, job: dict, redisprefix: str): "Run the actual job." - job_id = job["job_id"] + jobid = job["job_id"] try: with TemporaryDirectory() as tmpdir: - stderrpath = f"{tmpdir}/{job_id}.stderr" + stderrpath = f"{tmpdir}/{jobid}.stderr" with open(stderrpath, "w+b") as tmpfl: with subprocess.Popen( shlex.split(job["command"]), stdout=subprocess.PIPE, stderr=tmpfl) as process: while process.poll() is None: - update_status(rconn, job_id, "running") - update_stdout_stderr( - rconn, job_id, process.stdout.read1(), "stdout") + jobs.update_status(rconn, redisprefix, jobid, "running") + jobs.update_stdout_stderr( + rconn, redisprefix, jobid, process.stdout.read1(), "stdout") sleep(1) - update_status( + jobs.update_status( rconn, - job_id, + redisprefix, + jobid, ("error" if process.returncode != 0 else "success")) with open(stderrpath, "rb") as stderr: stderr_content = stderr.read() - update_stdout_stderr(rconn, job_id, stderr_content, "stderr") + jobs.update_stdout_stderr(rconn, redisprefix, jobid, stderr_content, "stderr") os.remove(stderrpath) return process.poll() except Exception as exc:# pylint: disable=[broad-except,unused-variable] - update_status(rconn, job_id, "error") - update_stdout_stderr( - rconn, job_id, traceback.format_exc().encode("utf-8"), "stderr") + jobs.update_status(rconn, redisprefix, jobid, "error") + jobs.update_stdout_stderr( + rconn, redisprefix, jobid, traceback.format_exc().encode("utf-8"), "stderr") print(traceback.format_exc(), file=sys.stderr) - sys.exit(4) + return 4 def main(): "Entry point function" args = parse_args() with Redis.from_url(args.redisurl, decode_responses=True) as rconn: - job = jobs.job(rconn, args.job_id) + job = jobs.job(rconn, args.redisprefix, args.job_id) if job: - return run_job(job, rconn) + return run_job(rconn, job, args.redisprefix) + jobs.update_status(rconn, args.job_id, "error") + fqjobid = jobs.job_key(redisprefix, args.jobid) + rconn.hset(fqjobid, "stderr", f"No such job. '{args.job_id}'.") + redis_conn.expire(name=job_key(args.redisprefix, args.job_id), + time=timedelta(seconds=(2 * 60 * 60))) print(f"No such job. '{args.job_id}'.", file=sys.stderr) return 2 return 3 -- cgit v1.2.3