author      Frederick Muriuki Muriithi  2024-01-23 12:33:08 +0300
committer   Frederick Muriuki Muriithi  2024-01-23 12:33:08 +0300
commit      7976230ffcb1de4f744895ee252298dea9a15f4c (patch)
tree        93767bf918847c7707cf369c6f3fbc1b2a562590
parent      1822e893586e13334cc66d59efbe03c1ffb48b66 (diff)
download    gn-uploader-7976230ffcb1de4f744895ee252298dea9a15f4c.tar.gz
Update scripts to use redis prefix.
-rw-r--r--   qc_app/jobs.py                   19
-rw-r--r--   qc_app/upload/rqtl2.py           37
-rw-r--r--   scripts/cli_parser.py             2
-rw-r--r--   scripts/process_rqtl2_bundle.py  26
-rw-r--r--   scripts/redis_logger.py          33
-rw-r--r--   scripts/rqtl2/entry.py            6
-rw-r--r--   scripts/worker.py                49
7 files changed, 96 insertions(+), 76 deletions(-)
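
For orientation: the core change in this commit is that a job's Redis hash is no longer keyed by the bare job ID but by a fully-qualified name built from a namespace prefix plus the ID, with the per-job log list hanging off that name. The sketch below shows how such a job would be read back; `job_key` here is an assumed re-implementation (the real helpers `job_key`/`jobsnamespace` live in qc_app/jobs.py and are only referenced, not shown, in this diff), and all values are placeholders.

# Sketch only: assumed key format, hypothetical prefix and job ID.
from redis import Redis

def job_key(rprefix: str, jobid) -> str:
    """Assumed format: the prefix and job ID joined by a colon."""
    return f"{rprefix}:{jobid}"

with Redis.from_url("redis:///", decode_responses=True) as rconn:
    fqjobid = job_key("gn-uploader:jobs", "example-job-id")  # hypothetical values
    thejob = rconn.hgetall(fqjobid)              # job metadata hash
    loglist = thejob.get("log-messagelist")      # e.g. "<fqjobid>:log-messages"
    messages = rconn.lrange(loglist, 0, -1) if loglist else []
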
diff --git a/qc_app/jobs.py b/qc_app/jobs.py
index dc1f967..cf4e4ef 100644
--- a/qc_app/jobs.py
+++ b/qc_app/jobs.py
@@ -97,7 +97,8 @@ def launch_job(the_job: dict, redisurl: str, error_dir):
"w",
encoding="utf-8") as errorfile:
subprocess.Popen( # pylint: disable=[consider-using-with]
- [sys.executable, "-m", "scripts.worker", redisurl, job_id],
+ [sys.executable, "-m", "scripts.worker", redisurl, jobsnamespace(),
+ job_id],
stderr=errorfile,
env={"PYTHONPATH": ":".join(sys.path)})
@@ -107,3 +108,19 @@ def job(rconn: Redis, rprefix: str, jobid: Union[str,UUID]):
"Retrieve the job"
thejob = rconn.hgetall(job_key(rprefix, jobid)) or raise_jobnotfound(jobid)
return thejob
+
+def update_status(
+ rconn: Redis, rprefix: str, jobid: Union[str, UUID], status: str):
+ """Update status of job in redis."""
+ rconn.hset(name=job_key(rprefix, jobid), key="status", value=status)
+
+def update_stdout_stderr(rconn: Redis,
+ rprefix: str,
+ jobid: Union[str, UUID],
+ bytes_read: bytes,
+ stream: str):
+ "Update the stdout/stderr keys according to the value of `stream`."
+ thejob = job(rconn, rprefix, jobid)
+ contents = thejob.get(stream, '')
+ new_contents = contents + bytes_read.decode("utf-8")
+ rconn.hset(name=job_key(rprefix, jobid), key=stream, value=new_contents)
diff --git a/qc_app/upload/rqtl2.py b/qc_app/upload/rqtl2.py
index 7805114..eea49e1 100644
--- a/qc_app/upload/rqtl2.py
+++ b/qc_app/upload/rqtl2.py
@@ -552,11 +552,13 @@ def confirm_bundle_details(species_id: int, population_id: int):
_job = jobs.launch_job(
jobs.initialise_job(
rconn,
+ jobs.jobsnamespace(),
jobid,
[
sys.executable, "-m", "scripts.process_rqtl2_bundle",
- app.config["SQL_URI"], app.config["REDIS_URL"], jobid,
- "--redisexpiry", str(redis_ttl_seconds)],
+ app.config["SQL_URI"], app.config["REDIS_URL"],
+ jobs.jobsnamespace(), jobid, "--redisexpiry",
+ str(redis_ttl_seconds)],
"R/qtl2 Bundle Upload",
redis_ttl_seconds,
{
@@ -588,21 +590,22 @@ def confirm_bundle_details(species_id: int, population_id: int):
def rqtl2_processing_status(jobid: UUID):
"""Retrieve the status of the job processing the uploaded R/qtl2 bundle."""
with Redis.from_url(app.config["REDIS_URL"], decode_responses=True) as rconn:
- thejob = jobs.job(rconn, jobid)
- if not bool(thejob):
- return render_template("rqtl2/no-such-job.html", jobid=jobid)
+ try:
+ thejob = jobs.job(rconn, jobs.jobsnamespace(), jobid)
- messagelistname = thejob.get("log-messagelist")
- logmessages = (rconn.lrange(messagelistname, 0, -1)
- if bool(messagelistname) else [])
+ messagelistname = thejob.get("log-messagelist")
+ logmessages = (rconn.lrange(messagelistname, 0, -1)
+ if bool(messagelistname) else [])
- if thejob["status"] == "error":
- return render_template(
- "rqtl2/rqtl2-job-error.html", job=thejob, messages=logmessages)
- if thejob["status"] == "success":
- return render_template("rqtl2/rqtl2-job-results.html",
- job=thejob,
- messages=logmessages)
+ if thejob["status"] == "error":
+ return render_template(
+ "rqtl2/rqtl2-job-error.html", job=thejob, messages=logmessages)
+ if thejob["status"] == "success":
+ return render_template("rqtl2/rqtl2-job-results.html",
+ job=thejob,
+ messages=logmessages)
- return render_template(
- "rqtl2/rqtl2-job-status.html", job=thejob, messages=logmessages)
+ return render_template(
+ "rqtl2/rqtl2-job-status.html", job=thejob, messages=logmessages)
+ except jobs.JobNotFound as _exc:
+ return render_template("rqtl2/no-such-job.html", jobid=jobid)
diff --git a/scripts/cli_parser.py b/scripts/cli_parser.py
index e8f030c..308ee4b 100644
--- a/scripts/cli_parser.py
+++ b/scripts/cli_parser.py
@@ -11,6 +11,8 @@ def init_cli_parser(program: str, description: Optional[str] = None) -> Argument
"databaseuri", type=str, help="URI to connect to MariaDB")
parser.add_argument(
"redisuri", type=str, help="URI to connect to the redis server.")
+ parser.add_argument(
+ "redisprefix", type=str, help="A prefix to the job ID.")
parser.add_argument("jobid", type=UUID, help="Job ID that this belongs to")
parser.add_argument(
"--redisexpiry",
diff --git a/scripts/process_rqtl2_bundle.py b/scripts/process_rqtl2_bundle.py
index 7b114cf..1b26264 100644
--- a/scripts/process_rqtl2_bundle.py
+++ b/scripts/process_rqtl2_bundle.py
@@ -27,12 +27,12 @@ def safe_json_decode(value: str) -> Any:
except json.decoder.JSONDecodeError:
return value
-def parse_job(rconn: Redis, jobid: uuid.UUID) -> dict:
+def parse_job(rconn: Redis, rprefix: str, jobid: uuid.UUID) -> dict:
"""Load and parse job from Redis"""
return {
key: safe_json_decode(value)
for key, value
- in jobs.job(rconn, jobid).items()
+ in jobs.job(rconn, rprefix, jobid).items()
}
def has_geno_file(job: dict) -> bool:
@@ -51,14 +51,15 @@ def percent_completion(geno: float, pheno: float) -> float:
def process_bundle(dbconn: mdb.Connection,
rconn: Redis,
+ rprefix: str,
jobid: uuid.UUID,
logger: Logger) -> int:
"""Process the R/qtl2 bundle."""
try:
- thejob = parse_job(rconn, jobid)
+ thejob = parse_job(rconn, rprefix, jobid)
meta = thejob["bundle-metadata"]
- rconn.hset(str(jobid), "geno-percent", "0")
- rconn.hset(str(jobid), "pheno-percent", "0")
+ rconn.hset(jobs.job_key(rprefix, jobid), "geno-percent", "0")
+ rconn.hset(jobs.job_key(rprefix, jobid), "pheno-percent", "0")
if has_geno_file(thejob):
logger.info("Processing geno files.")
@@ -74,7 +75,7 @@ def process_bundle(dbconn: mdb.Connection,
logger.debug(
"geno file processing completed successfully. (ExitCode: %s)",
genoexit)
- rconn.hset(str(jobid), "geno-percent", "100")
+ rconn.hset(jobs.job_key(rprefix, jobid), "geno-percent", "100")
if has_pheno_file(thejob):
phenoexit = install_pheno_files(
@@ -89,7 +90,7 @@ def process_bundle(dbconn: mdb.Connection,
logger.debug(
"pheno file processing completed successfully. (ExitCode: %s)",
phenoexit)
- rconn.hset(str(jobid), "pheno-percent", "100")
+ rconn.hset(jobs.job_key(rprefix, jobid), "pheno-percent", "100")
return 0
except jobs.JobNotFound as exc:
@@ -113,14 +114,17 @@ if __name__ == "__main__":
logger.addHandler(StreamHandler(stream=sys.stderr))
logger.setLevel("DEBUG")
- jobid = args.jobid
+ fqjobid = jobs.job_key(args.redisprefix, args.jobid)
with (database_connection(args.databaseuri) as dbconn,
Redis.from_url(args.redisuri, decode_responses=True) as rconn):
logger.addHandler(setup_redis_logger(
- rconn, jobid, f"{str(jobid)}:log-messages", args.redisexpiry))
+ rconn, fqjobid, f"{fqjobid}:log-messages",
+ args.redisexpiry))
- exitcode = process_bundle(dbconn, rconn, args.jobid, logger)
- rconn.hset(str(args.jobid), "percent", "100")
+ exitcode = process_bundle(
+ dbconn, rconn, args.redisprefix, args.jobid, logger)
+ rconn.hset(
+ jobs.job_key(args.redisprefix, args.jobid), "percent", "100")
return exitcode
sys.exit(main())
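
With the new positional `redisprefix` argument, the shared parser in scripts/cli_parser.py now expects the prefix between the Redis URI and the job ID. A rough usage sketch follows; all argument values are placeholders, not taken from the repository.

# Sketch: exercising the updated parser; values below are placeholders.
from scripts.cli_parser import init_cli_parser

parser = init_cli_parser("process_rqtl2_bundle")
args = parser.parse_args([
    "mysql://user:password@localhost/gn_uploads",  # databaseuri (placeholder)
    "redis://localhost:6379/0",                    # redisuri (placeholder)
    "gn-uploader:jobs",                            # redisprefix (hypothetical)
    "00000000-0000-0000-0000-000000000000",        # jobid (parsed as UUID)
    "--redisexpiry", "86400"])
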
diff --git a/scripts/redis_logger.py b/scripts/redis_logger.py
index 76b8ba6..2ae682b 100644
--- a/scripts/redis_logger.py
+++ b/scripts/redis_logger.py
@@ -1,46 +1,41 @@
"""Utilities to log to redis for our worker scripts."""
-import uuid
import logging
from redis import Redis
class RedisLogger(logging.Handler):
"""Log out to redis for our worker scripts"""
- def __init__(self, # pylint: disable=[too-many-arguments]
+ def __init__(self,#pylint: disable=[too-many-arguments]
rconn: Redis,
- jobid: uuid.UUID,
+ fullyqualifiedjobid: str,
+ messageslistname: str,
level:int = logging.NOTSET,
- messagelistname: str = "log-messages",
expiry: int = 86400):
"""Initialise the handler."""
super().__init__(level)
self.redisconnection = rconn
- self.jobid = jobid
- self.messagelistname = messagelistname
+ self.jobid = fullyqualifiedjobid
+ self.messageslistname = messageslistname
self.expiry = expiry
- rconn.hset(name=str(jobid),
+ rconn.hset(name=fullyqualifiedjobid,
key="log-messagelist",
- value=self.messages_list_name())
-
- def messages_list_name(self):
- """Retrieve the fully qualified message-list name"""
- return f"{str(self.jobid)}:{self.messagelistname}"
+ value=messageslistname)
def emit(self, record):
"""Log to the messages list for a given worker."""
- self.redisconnection.rpush(
- self.messages_list_name(), self.format(record))
- self.redisconnection.expire(self.messages_list_name(), self.expiry)
+ self.redisconnection.rpush(self.messageslistname, self.format(record))
+ self.redisconnection.expire(self.messageslistname, self.expiry)
def setup_redis_logger(rconn: Redis,
- jobid: uuid.UUID,
+ fullyqualifiedjobid: str,
job_messagelist: str,
expiry: int = 86400) -> RedisLogger:
"""Setup a default RedisLogger logger."""
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s: %(message)s")
- # job_messagelist = f"{str(args.jobid)}:log-messages"
- rconn.hset(name=str(jobid), key="log-messagelist", value=job_messagelist)
- redislogger = RedisLogger(rconn, jobid, expiry=expiry)
+ rconn.hset(
+ name=fullyqualifiedjobid, key="log-messagelist", value=job_messagelist)
+ redislogger = RedisLogger(
+ rconn, fullyqualifiedjobid, job_messagelist, expiry=expiry)
redislogger.setFormatter(formatter)
return redislogger
diff --git a/scripts/rqtl2/entry.py b/scripts/rqtl2/entry.py
index eccc19d..93fc130 100644
--- a/scripts/rqtl2/entry.py
+++ b/scripts/rqtl2/entry.py
@@ -6,6 +6,7 @@ from argparse import Namespace
from redis import Redis
from MySQLdb import Connection
+from qc_app import jobs
from qc_app.db_utils import database_connection
from qc_app.check_connections import check_db, check_redis
@@ -25,10 +26,11 @@ def build_main(args: Namespace,
with (Redis.from_url(args.redisuri, decode_responses=True) as rconn,
database_connection(args.databaseuri) as dbconn):
+ fqjobid = jobs.job_key(jobs.jobsnamespace(), args.jobid)
logger.addHandler(setup_redis_logger(
rconn,
- args.jobid,
- f"{str(args.jobid)}:log-messages",
+ fqjobid,
+ f"{fqjobid}:log-messages",
args.redisexpiry))
logger.setLevel(loglevel)
return run_fn(dbconn, args)
diff --git a/scripts/worker.py b/scripts/worker.py
index 90d83c4..17d6e06 100644
--- a/scripts/worker.py
+++ b/scripts/worker.py
@@ -20,6 +20,8 @@ def parse_args():
"Generic worker to launch and manage specific worker scripts"))
parser.add_argument(
"redisurl", default="redis:///", help="URL to the redis server")
+ parser.add_argument(
+ "redisprefix", type=str, help="The prefix before the job ID.")
parser.add_argument("job_id", help="The id of the job being processed")
args = parser.parse_args()
@@ -27,58 +29,53 @@ def parse_args():
return args
-def update_stdout_stderr(rconn, job_id, bytes_read, stream: str):
- "Update the stdout/stderr keys according to the value of `stream`."
- job = jobs.job(rconn, job_id)
- contents = job.get(stream, '')
- new_contents = contents + bytes_read.decode("utf-8")
- rconn.hset(name=job_id, key=stream, value=new_contents)
-
-def update_status(rconn, job_id, status):
- """Update status of job in redis."""
- rconn.hset(name=job_id, key="status", value=status)
-
-def run_job(job, rconn):
+def run_job(rconn: Redis, job: dict, redisprefix: str):
"Run the actual job."
- job_id = job["job_id"]
+ jobid = job["job_id"]
try:
with TemporaryDirectory() as tmpdir:
- stderrpath = f"{tmpdir}/{job_id}.stderr"
+ stderrpath = f"{tmpdir}/{jobid}.stderr"
with open(stderrpath, "w+b") as tmpfl:
with subprocess.Popen(
shlex.split(job["command"]), stdout=subprocess.PIPE,
stderr=tmpfl) as process:
while process.poll() is None:
- update_status(rconn, job_id, "running")
- update_stdout_stderr(
- rconn, job_id, process.stdout.read1(), "stdout")
+ jobs.update_status(rconn, redisprefix, jobid, "running")
+ jobs.update_stdout_stderr(
+ rconn, redisprefix, jobid, process.stdout.read1(), "stdout")
sleep(1)
- update_status(
+ jobs.update_status(
rconn,
- job_id,
+ redisprefix,
+ jobid,
("error" if process.returncode != 0 else "success"))
with open(stderrpath, "rb") as stderr:
stderr_content = stderr.read()
- update_stdout_stderr(rconn, job_id, stderr_content, "stderr")
+ jobs.update_stdout_stderr(rconn, redisprefix, jobid, stderr_content, "stderr")
os.remove(stderrpath)
return process.poll()
except Exception as exc:# pylint: disable=[broad-except,unused-variable]
- update_status(rconn, job_id, "error")
- update_stdout_stderr(
- rconn, job_id, traceback.format_exc().encode("utf-8"), "stderr")
+ jobs.update_status(rconn, redisprefix, jobid, "error")
+ jobs.update_stdout_stderr(
+ rconn, redisprefix, jobid, traceback.format_exc().encode("utf-8"), "stderr")
print(traceback.format_exc(), file=sys.stderr)
- sys.exit(4)
+ return 4
def main():
"Entry point function"
args = parse_args()
with Redis.from_url(args.redisurl, decode_responses=True) as rconn:
- job = jobs.job(rconn, args.job_id)
+ job = jobs.job(rconn, args.redisprefix, args.job_id)
if job:
- return run_job(job, rconn)
+ return run_job(rconn, job, args.redisprefix)
+ jobs.update_status(rconn, args.redisprefix, args.job_id, "error")
+ fqjobid = jobs.job_key(args.redisprefix, args.job_id)
+ rconn.hset(fqjobid, "stderr", f"No such job. '{args.job_id}'.")
+ rconn.expire(name=fqjobid, time=2 * 60 * 60)
print(f"No such job. '{args.job_id}'.", file=sys.stderr)
return 2
return 3
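
Downstream, the generic worker is now launched with the prefix as its second positional argument, mirroring the subprocess call in qc_app/jobs.py above. A sketch of an equivalent manual launch, with placeholder prefix and job ID values:

# Sketch: launching scripts.worker with the new argument order
# (redisurl, redisprefix, job_id); values are placeholders.
import sys
import subprocess

subprocess.Popen(
    [sys.executable, "-m", "scripts.worker",
     "redis:///",          # redisurl
     "gn-uploader:jobs",   # redisprefix (hypothetical namespace)
     "example-job-id"],    # job_id
    env={"PYTHONPATH": ":".join(sys.path)})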