From 7976230ffcb1de4f744895ee252298dea9a15f4c Mon Sep 17 00:00:00 2001 From: Frederick Muriuki Muriithi Date: Tue, 23 Jan 2024 12:33:08 +0300 Subject: Update scripts to use redis prefix. --- qc_app/jobs.py | 19 ++++++++++++++++++- qc_app/upload/rqtl2.py | 37 ++++++++++++++++++++----------------- 2 files changed, 38 insertions(+), 18 deletions(-) (limited to 'qc_app') diff --git a/qc_app/jobs.py b/qc_app/jobs.py index dc1f967..cf4e4ef 100644 --- a/qc_app/jobs.py +++ b/qc_app/jobs.py @@ -97,7 +97,8 @@ def launch_job(the_job: dict, redisurl: str, error_dir): "w", encoding="utf-8") as errorfile: subprocess.Popen( # pylint: disable=[consider-using-with] - [sys.executable, "-m", "scripts.worker", redisurl, job_id], + [sys.executable, "-m", "scripts.worker", redisurl, jobsnamespace(), + job_id], stderr=errorfile, env={"PYTHONPATH": ":".join(sys.path)}) @@ -107,3 +108,19 @@ def job(rconn: Redis, rprefix: str, jobid: Union[str,UUID]): "Retrieve the job" thejob = rconn.hgetall(job_key(rprefix, jobid)) or raise_jobnotfound(jobid) return thejob + +def update_status( + rconn: Redis, rprefix: str, jobid: Union[str, UUID], status: str): + """Update status of job in redis.""" + rconn.hset(name=job_key(rprefix, jobid), key="status", value=status) + +def update_stdout_stderr(rconn: Redis, + rprefix: str, + jobid: Union[str, UUID], + bytes_read: bytes, + stream: str): + "Update the stdout/stderr keys according to the value of `stream`." + thejob = job(rconn, rprefix, jobid) + contents = thejob.get(stream, '') + new_contents = contents + bytes_read.decode("utf-8") + rconn.hset(name=job_key(rprefix, jobid), key=stream, value=new_contents) diff --git a/qc_app/upload/rqtl2.py b/qc_app/upload/rqtl2.py index 7805114..eea49e1 100644 --- a/qc_app/upload/rqtl2.py +++ b/qc_app/upload/rqtl2.py @@ -552,11 +552,13 @@ def confirm_bundle_details(species_id: int, population_id: int): _job = jobs.launch_job( jobs.initialise_job( rconn, + jobs.jobsnamespace(), jobid, [ sys.executable, "-m", "scripts.process_rqtl2_bundle", - app.config["SQL_URI"], app.config["REDIS_URL"], jobid, - "--redisexpiry", str(redis_ttl_seconds)], + app.config["SQL_URI"], app.config["REDIS_URL"], + jobs.jobsnamespace(), jobid, "--redisexpiry", + str(redis_ttl_seconds)], "R/qtl2 Bundle Upload", redis_ttl_seconds, { @@ -588,21 +590,22 @@ def confirm_bundle_details(species_id: int, population_id: int): def rqtl2_processing_status(jobid: UUID): """Retrieve the status of the job processing the uploaded R/qtl2 bundle.""" with Redis.from_url(app.config["REDIS_URL"], decode_responses=True) as rconn: - thejob = jobs.job(rconn, jobid) - if not bool(thejob): - return render_template("rqtl2/no-such-job.html", jobid=jobid) + try: + thejob = jobs.job(rconn, jobs.jobsnamespace(), jobid) - messagelistname = thejob.get("log-messagelist") - logmessages = (rconn.lrange(messagelistname, 0, -1) - if bool(messagelistname) else []) + messagelistname = thejob.get("log-messagelist") + logmessages = (rconn.lrange(messagelistname, 0, -1) + if bool(messagelistname) else []) - if thejob["status"] == "error": - return render_template( - "rqtl2/rqtl2-job-error.html", job=thejob, messages=logmessages) - if thejob["status"] == "success": - return render_template("rqtl2/rqtl2-job-results.html", - job=thejob, - messages=logmessages) + if thejob["status"] == "error": + return render_template( + "rqtl2/rqtl2-job-error.html", job=thejob, messages=logmessages) + if thejob["status"] == "success": + return render_template("rqtl2/rqtl2-job-results.html", + job=thejob, + messages=logmessages) - return render_template( - "rqtl2/rqtl2-job-status.html", job=thejob, messages=logmessages) + return render_template( + "rqtl2/rqtl2-job-status.html", job=thejob, messages=logmessages) + except jobs.JobNotFound as _exc: + return render_template("rqtl2/no-such-job.html", jobid=jobid) -- cgit v1.2.3