author      Frederick Muriuki Muriithi    2024-01-23 12:33:08 +0300
committer   Frederick Muriuki Muriithi    2024-01-23 12:33:08 +0300
commit      7976230ffcb1de4f744895ee252298dea9a15f4c (patch)
tree        93767bf918847c7707cf369c6f3fbc1b2a562590 /scripts
parent      1822e893586e13334cc66d59efbe03c1ffb48b66 (diff)
download    gn-uploader-7976230ffcb1de4f744895ee252298dea9a15f4c.tar.gz
Update scripts to use redis prefix.
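The change relies on a jobs.job_key(prefix, jobid) helper in qc_app.jobs to build the fully-qualified Redis key for a job. That helper is not part of this diff; a minimal sketch of the presumed convention is shown below (the exact key format is an assumption):

    import uuid

    def job_key(namespaceprefix: str, jobid: uuid.UUID) -> str:
        """Build the fully-qualified Redis key for a job.
        Assumption: the key is simply '<prefix>:<job-id>'."""
        return f"{namespaceprefix}:{jobid}"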
Diffstat (limited to 'scripts')
-rw-r--r--  scripts/cli_parser.py            |  2
-rw-r--r--  scripts/process_rqtl2_bundle.py  | 26
-rw-r--r--  scripts/redis_logger.py          | 33
-rw-r--r--  scripts/rqtl2/entry.py           |  6
-rw-r--r--  scripts/worker.py                | 49
5 files changed, 58 insertions, 58 deletions
diff --git a/scripts/cli_parser.py b/scripts/cli_parser.py
index e8f030c..308ee4b 100644
--- a/scripts/cli_parser.py
+++ b/scripts/cli_parser.py
@@ -11,6 +11,8 @@ def init_cli_parser(program: str, description: Optional[str] = None) -> Argument
         "databaseuri", type=str, help="URI to connect to MariaDB")
     parser.add_argument(
         "redisuri", type=str, help="URI to connect to the redis server.")
+    parser.add_argument(
+        "redisprefix", type=str, help="A prefix to the job ID.")
     parser.add_argument("jobid", type=UUID, help="Job ID that this belongs to")
     parser.add_argument(
         "--redisexpiry",
diff --git a/scripts/process_rqtl2_bundle.py b/scripts/process_rqtl2_bundle.py
index 7b114cf..1b26264 100644
--- a/scripts/process_rqtl2_bundle.py
+++ b/scripts/process_rqtl2_bundle.py
@@ -27,12 +27,12 @@ def safe_json_decode(value: str) -> Any:
     except json.decoder.JSONDecodeError:
         return value
 
-def parse_job(rconn: Redis, jobid: uuid.UUID) -> dict:
+def parse_job(rconn: Redis, rprefix: str, jobid: uuid.UUID) -> dict:
     """Load and parse job from Redis"""
     return {
         key: safe_json_decode(value)
             for key, value
-        in jobs.job(rconn, jobid).items()
+        in jobs.job(rconn, rprefix, jobid).items()
     }
 
 def has_geno_file(job: dict) -> bool:
@@ -51,14 +51,15 @@ def percent_completion(geno: float, pheno: float) -> float:
 
 def process_bundle(dbconn: mdb.Connection,
                    rconn: Redis,
+                   rprefix: str,
                    jobid: uuid.UUID,
                    logger: Logger) -> int:
     """Process the R/qtl2 bundle."""
     try:
-        thejob = parse_job(rconn, jobid)
+        thejob = parse_job(rconn, rprefix, jobid)
         meta = thejob["bundle-metadata"]
-        rconn.hset(str(jobid), "geno-percent", "0")
-        rconn.hset(str(jobid), "pheno-percent", "0")
+        rconn.hset(jobs.job_key(rprefix, jobid), "geno-percent", "0")
+        rconn.hset(jobs.job_key(rprefix, jobid), "pheno-percent", "0")
 
         if has_geno_file(thejob):
             logger.info("Processing geno files.")
@@ -74,7 +75,7 @@ def process_bundle(dbconn: mdb.Connection,
             logger.debug(
                 "geno file processing completed successfully. (ExitCode: %s)",
                 genoexit)
-            rconn.hset(str(jobid), "geno-percent", "100")
+            rconn.hset(jobs.job_key(rprefix, jobid), "geno-percent", "100")
 
         if has_pheno_file(thejob):
             phenoexit = install_pheno_files(
@@ -89,7 +90,7 @@ def process_bundle(dbconn: mdb.Connection,
             logger.debug(
                 "pheno file processing completed successfully. (ExitCode: %s)",
                 phenoexit)
-            rconn.hset(str(jobid), "pheno-percent", "100")
+            rconn.hset(jobs.job_key(rprefix, jobid), "pheno-percent", "100")
 
         return 0
     except jobs.JobNotFound as exc:
@@ -113,14 +114,17 @@ if __name__ == "__main__":
         logger.addHandler(StreamHandler(stream=sys.stderr))
         logger.setLevel("DEBUG")
 
-        jobid = args.jobid
+        fqjobid = jobs.job_key(args.redisprefix, args.jobid)
         with (database_connection(args.databaseuri) as dbconn,
               Redis.from_url(args.redisuri, decode_responses=True) as rconn):
             logger.addHandler(setup_redis_logger(
-                rconn, jobid, f"{str(jobid)}:log-messages", args.redisexpiry))
+                rconn, fqjobid, f"{fqjobid}:log-messages",
+                args.redisexpiry))
 
-            exitcode = process_bundle(dbconn, rconn, args.jobid, logger)
-            rconn.hset(str(args.jobid), "percent", "100")
+            exitcode = process_bundle(
+                dbconn, rconn, args.redisprefix, args.jobid, logger)
+            rconn.hset(
+                jobs.job_key(args.redisprefix, args.jobid), "percent", "100")
             return exitcode
 
     sys.exit(main())
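The percent-complete fields now live under the fully-qualified job key as well, so anything monitoring a bundle's progress must read them from the prefixed hash. A small sketch of such a read, assuming the key convention sketched earlier (prefix and connection URL are hypothetical):

    from uuid import uuid4
    from redis import Redis

    fqjobid = f"gn-uploader:jobs:{uuid4()}"  # hypothetical fully-qualified job ID
    with Redis.from_url("redis://localhost:6379/0", decode_responses=True) as rconn:
        for field in ("geno-percent", "pheno-percent", "percent"):
            print(field, rconn.hget(fqjobid, field))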
diff --git a/scripts/redis_logger.py b/scripts/redis_logger.py
index 76b8ba6..2ae682b 100644
--- a/scripts/redis_logger.py
+++ b/scripts/redis_logger.py
@@ -1,46 +1,41 @@
 """Utilities to log to redis for our worker scripts."""
-import uuid
 import logging
 
 from redis import Redis
 
 class RedisLogger(logging.Handler):
     """Log out to redis for our worker scripts"""
-    def __init__(self, # pylint: disable=[too-many-arguments]
+    def __init__(self,  # pylint: disable=[too-many-arguments]
                  rconn: Redis,
-                 jobid: uuid.UUID,
+                 fullyqualifiedjobid: str,
+                 messageslistname: str,
                  level:int = logging.NOTSET,
-                 messagelistname: str = "log-messages",
                  expiry: int = 86400):
         """Initialise the handler."""
         super().__init__(level)
         self.redisconnection = rconn
-        self.jobid = jobid
-        self.messagelistname = messagelistname
+        self.jobid = fullyqualifiedjobid
+        self.messageslistname = messageslistname
         self.expiry = expiry
-        rconn.hset(name=str(jobid),
+        rconn.hset(name=fullyqualifiedjobid,
                    key="log-messagelist",
-                   value=self.messages_list_name())
-
-    def messages_list_name(self):
-        """Retrieve the fully qualified message-list name"""
-        return f"{str(self.jobid)}:{self.messagelistname}"
+                   value=messageslistname)
 
     def emit(self, record):
         """Log to the messages list for a given worker."""
-        self.redisconnection.rpush(
-            self.messages_list_name(), self.format(record))
-        self.redisconnection.expire(self.messages_list_name(), self.expiry)
+        self.redisconnection.rpush(self.messageslistname, self.format(record))
+        self.redisconnection.expire(self.messageslistname, self.expiry)
 
 def setup_redis_logger(rconn: Redis,
-                       jobid: uuid.UUID,
+                       fullyqualifiedjobid: str,
                        job_messagelist: str,
                        expiry: int = 86400) -> RedisLogger:
     """Setup a default RedisLogger logger."""
     formatter = logging.Formatter(
                 "%(asctime)s - %(name)s - %(levelname)s: %(message)s")
-    # job_messagelist = f"{str(args.jobid)}:log-messages"
-    rconn.hset(name=str(jobid), key="log-messagelist", value=job_messagelist)
-    redislogger = RedisLogger(rconn, jobid, expiry=expiry)
+    rconn.hset(
+        name=fullyqualifiedjobid, key="log-messagelist", value=job_messagelist)
+    redislogger = RedisLogger(
+        rconn, fullyqualifiedjobid, job_messagelist, expiry=expiry)
     redislogger.setFormatter(formatter)
     return redislogger
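The handler now takes the fully-qualified job ID and the messages-list name explicitly instead of deriving them from a bare UUID. A minimal usage sketch, assuming a local Redis instance and the prefixed key layout above (prefix, URL and logger name are hypothetical):

    import logging
    from uuid import uuid4
    from redis import Redis
    from scripts.redis_logger import setup_redis_logger  # path as in this diff

    fqjobid = f"gn-uploader:jobs:{uuid4()}"  # hypothetical fully-qualified job ID
    with Redis.from_url("redis://localhost:6379/0", decode_responses=True) as rconn:
        logger = logging.getLogger("bundle-processor")
        logger.addHandler(setup_redis_logger(
            rconn, fqjobid, f"{fqjobid}:log-messages", expiry=3600))
        logger.setLevel("DEBUG")
        logger.info("messages are pushed to the '%s:log-messages' list", fqjobid)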
diff --git a/scripts/rqtl2/entry.py b/scripts/rqtl2/entry.py
index eccc19d..93fc130 100644
--- a/scripts/rqtl2/entry.py
+++ b/scripts/rqtl2/entry.py
@@ -6,6 +6,7 @@ from argparse import Namespace
 from redis import Redis
 from MySQLdb import Connection
 
+from qc_app import jobs
 from qc_app.db_utils import database_connection
 from qc_app.check_connections import check_db, check_redis
 
@@ -25,10 +26,11 @@ def build_main(args: Namespace,
 
         with (Redis.from_url(args.redisuri, decode_responses=True) as rconn,
               database_connection(args.databaseuri) as dbconn):
+            fqjobid = jobs.job_key(jobs.jobsnamespace(), args.jobid)
             logger.addHandler(setup_redis_logger(
                 rconn,
-                args.jobid,
-                f"{str(args.jobid)}:log-messages",
+                fqjobid,
+                f"{fqjobid}:log-messages",
                 args.redisexpiry))
             logger.setLevel(loglevel)
             return run_fn(dbconn, args)
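Unlike the standalone scripts, entry.py derives the prefix from jobs.jobsnamespace() rather than from a command-line argument. That helper is not shown in this diff; presumably it returns the application-wide namespace string, along the lines of the following sketch (the return value is purely illustrative):

    def jobsnamespace() -> str:
        """Return the application-wide prefix for job keys.
        Assumption: a constant such as '<app-name>:jobs'; the real value
        lives in qc_app.jobs and is not part of this diff."""
        return "gn-uploader:jobs"  # hypothetical value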
diff --git a/scripts/worker.py b/scripts/worker.py
index 90d83c4..17d6e06 100644
--- a/scripts/worker.py
+++ b/scripts/worker.py
@@ -20,6 +20,8 @@ def parse_args():
             "Generic worker to launch and manage specific worker scripts"))
     parser.add_argument(
         "redisurl", default="redis:///", help="URL to the redis server")
+    parser.add_argument(
+        "redisprefix", type=str, help="The prefix before the job ID.")
     parser.add_argument("job_id", help="The id of the job being processed")
 
     args = parser.parse_args()
@@ -27,58 +29,53 @@ def parse_args():
 
     return args
 
-def update_stdout_stderr(rconn, job_id, bytes_read, stream: str):
-    "Update the stdout/stderr keys according to the value of `stream`."
-    job = jobs.job(rconn, job_id)
-    contents = job.get(stream, '')
-    new_contents = contents + bytes_read.decode("utf-8")
-    rconn.hset(name=job_id, key=stream, value=new_contents)
-
-def update_status(rconn, job_id, status):
-    """Update status of job in redis."""
-    rconn.hset(name=job_id, key="status", value=status)
-
-def run_job(job, rconn):
+def run_job(rconn: Redis, job: dict, redisprefix: str):
     "Run the actual job."
-    job_id = job["job_id"]
+    jobid = job["job_id"]
     try:
         with TemporaryDirectory() as tmpdir:
-            stderrpath = f"{tmpdir}/{job_id}.stderr"
+            stderrpath = f"{tmpdir}/{jobid}.stderr"
             with open(stderrpath, "w+b") as tmpfl:
                 with subprocess.Popen(
                         shlex.split(job["command"]), stdout=subprocess.PIPE,
                         stderr=tmpfl) as process:
                     while process.poll() is None:
-                        update_status(rconn, job_id, "running")
-                        update_stdout_stderr(
-                            rconn, job_id, process.stdout.read1(), "stdout")
+                        jobs.update_status(rconn, redisprefix, jobid, "running")
+                        jobs.update_stdout_stderr(
+                            rconn, redisprefix, jobid, process.stdout.read1(), "stdout")
                         sleep(1)
 
-                    update_status(
+                    jobs.update_status(
                         rconn,
-                        job_id,
+                        redisprefix,
+                        jobid,
                         ("error" if process.returncode != 0 else "success"))
 
             with open(stderrpath, "rb") as stderr:
                 stderr_content = stderr.read()
-                update_stdout_stderr(rconn, job_id, stderr_content, "stderr")
+                jobs.update_stdout_stderr(rconn, redisprefix, jobid, stderr_content, "stderr")
 
             os.remove(stderrpath)
         return process.poll()
     except Exception as exc:# pylint: disable=[broad-except,unused-variable]
-        update_status(rconn, job_id, "error")
-        update_stdout_stderr(
-            rconn, job_id, traceback.format_exc().encode("utf-8"), "stderr")
+        jobs.update_status(rconn, redisprefix, jobid, "error")
+        jobs.update_stdout_stderr(
+            rconn, redisprefix, jobid, traceback.format_exc().encode("utf-8"), "stderr")
         print(traceback.format_exc(), file=sys.stderr)
-        sys.exit(4)
+        return 4
 
 def main():
     "Entry point function"
     args = parse_args()
     with Redis.from_url(args.redisurl, decode_responses=True) as rconn:
-        job = jobs.job(rconn, args.job_id)
+        job = jobs.job(rconn, args.redisprefix, args.job_id)
         if job:
-            return run_job(job, rconn)
+            return run_job(rconn, job, args.redisprefix)
+        jobs.update_status(rconn, args.redisprefix, args.job_id, "error")
+        fqjobid = jobs.job_key(args.redisprefix, args.job_id)
+        rconn.hset(fqjobid, "stderr", f"No such job. '{args.job_id}'.")
+        rconn.expire(name=fqjobid,
+                     time=(2 * 60 * 60))
         print(f"No such job. '{args.job_id}'.", file=sys.stderr)
         return 2
     return 3
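After this change every helper in qc_app.jobs takes the Redis prefix explicitly, and the worker reads the job hash from under the prefixed key. A hedged end-to-end sketch of seeding a job and reading it back, using the signatures as they appear in this diff (prefix, command and connection URL are hypothetical):

    from uuid import uuid4
    from redis import Redis
    from qc_app import jobs  # import as used in scripts/rqtl2/entry.py above

    jobid = str(uuid4())
    prefix = "gn-uploader:jobs"  # hypothetical prefix value
    with Redis.from_url("redis://localhost:6379/0", decode_responses=True) as rconn:
        rconn.hset(jobs.job_key(prefix, jobid), mapping={
            "job_id": jobid,
            "command": "echo hello",
            "status": "queued",
        })
        print(jobs.job(rconn, prefix, jobid))  # fetched from '<prefix>:<jobid>'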