# Provenance: uploader/jobs.py as added by commit
# 754e8f214b940e05298cb360ed829f5c685d55a5
# ("Rename module: qc_app --> uploader", Frederick Muriuki Muriithi,
# Thu, 25 Jul 2024 11:07:33 -0500).
"""Handle jobs"""
import os
import sys
import shlex
import subprocess
from uuid import UUID, uuid4
from datetime import timedelta
from typing import Union, Optional

from redis import Redis
from flask import current_app as app

JOBS_PREFIX = "JOBS"


class JobNotFound(Exception):
    """Raised if we try to retrieve a non-existent job."""


def jobsnamespace():
    """
    Return the jobs namespace prefix. It depends on app configuration.

    The prefix is built from the `GNQC_REDIS_PREFIX` configuration value and
    `JOBS_PREFIX`, joined with a colon.

    Calling this function outside of an application context will cause an
    exception to be raised. It is mostly a convenience utility to use within the
    application.
    """
    return f"{app.config['GNQC_REDIS_PREFIX']}:{JOBS_PREFIX}"


def job_key(namespaceprefix: str, jobid: Union[str, UUID]) -> str:
    """Build the redis key for a job: `<namespaceprefix>:<jobid>`."""
    return f"{namespaceprefix}:{jobid}"


def raise_jobnotfound(rprefix: str, jobid: Union[str, UUID]):
    """Utility to raise a `JobNotFound` error for `jobid` under `rprefix`."""
    raise JobNotFound(f"Could not retrieve job '{jobid}' from '{rprefix}'.")


def error_filename(jobid, error_dir):
    "Compute the path of the file where errors will be dumped."
    return f"{error_dir}/job_{jobid}.error"


def initialise_job(# pylint: disable=[too-many-arguments]
        rconn: Redis, rprefix: str, jobid: str, command: list, job_type: str,
        ttl_seconds: int = 86400, extra_meta: Optional[dict] = None) -> dict:
    """
    Initialise a job 'object' and put it on redis.

    The job is stored as a redis hash under `job_key(rprefix, jobid)` and is
    set to expire after `ttl_seconds` seconds.

    `command` must be a list of strings: it is serialised with `shlex.join`.
    Any keys in `extra_meta` are merged in last, so they override the default
    fields of the same name.

    Returns the dict that was written to redis.
    """
    the_job = {
        "jobid": jobid, "command": shlex.join(command), "status": "pending",
        "percent": 0, "job-type": job_type, **(extra_meta or {})
    }
    key = job_key(rprefix, jobid)
    rconn.hset(key, mapping=the_job)
    rconn.expire(name=key, time=timedelta(seconds=ttl_seconds))
    return the_job


def build_file_verification_job(#pylint: disable=[too-many-arguments]
        redis_conn: Redis,
        dburi: str,
        redisuri: str,
        speciesid: int,
        filepath: str,
        filetype: str,
        ttl_seconds: int):
    """
    Build a file verification job.

    Generates a fresh job id, builds the command line that runs the
    `scripts.validate_file` module with the current interpreter, and registers
    the job on redis via `initialise_job` with type "file-verification".

    Returns the job dict as stored on redis.
    """
    jobid = str(uuid4())
    command = [
        sys.executable, "-m", "scripts.validate_file",
        dburi, redisuri, jobsnamespace(), jobid,
        "--redisexpiry", str(ttl_seconds),
        str(speciesid), filetype, filepath,
    ]
    return initialise_job(
        redis_conn, jobsnamespace(), jobid, command, "file-verification",
        ttl_seconds, {
            "filetype": filetype,
            "filename": os.path.basename(filepath), "percent": 0
        })


def data_insertion_job(# pylint: disable=[too-many-arguments]
        redis_conn: Redis, filepath: str, filetype: str, totallines: int,
        speciesid: int, platformid: int, datasetid: int, databaseuri: str,
        redisuri: str, ttl_seconds: int) -> dict:
    """
    Build a data insertion job.

    Generates a fresh job id, builds the command line that runs the
    `scripts.insert_data` module with the current interpreter, and registers
    the job on redis via `initialise_job` with type "data-insertion".

    `totallines` is only recorded in the job's metadata; it is not passed on
    the command line.

    Returns the job dict as stored on redis.
    """
    jobid = str(uuid4())
    command = [
        sys.executable, "-m", "scripts.insert_data", filetype, filepath,
        # The ids are ints, but `shlex.join` (used by `initialise_job`) only
        # accepts strings, so convert them explicitly.
        str(speciesid), str(platformid), str(datasetid), databaseuri, redisuri
    ]
    return initialise_job(
        redis_conn, jobsnamespace(), jobid, command, "data-insertion",
        ttl_seconds, {
            "filename": os.path.basename(filepath), "filetype": filetype,
            "totallines": totallines
        })


def launch_job(the_job: dict, redisurl: str, error_dir):
    """
    Launch a job in the background.

    Starts the `scripts.worker` module in a child process, passing it the
    redis URL, the jobs namespace and the job's id. The child's stderr is
    redirected to the file given by `error_filename(jobid, error_dir)`;
    `error_dir` is created if it does not exist.

    The child process is not waited on. Returns `the_job` unchanged.
    """
    # `exist_ok=True` avoids a race between checking for and creating the
    # directory, and missing parent directories are created too.
    os.makedirs(error_dir, exist_ok=True)

    jobid = the_job["jobid"]
    with open(error_filename(jobid, error_dir),
              "w",
              encoding="utf-8") as errorfile:
        # NOTE(review): the child's environment contains only PYTHONPATH;
        # nothing else is inherited from this process. Presumably deliberate
        # -- confirm before changing.
        subprocess.Popen( # pylint: disable=[consider-using-with]
            [sys.executable, "-m", "scripts.worker", redisurl, jobsnamespace(),
             jobid],
            stderr=errorfile,
            env={"PYTHONPATH": ":".join(sys.path)})

    return the_job


def job(rconn: Redis, rprefix: str, jobid: Union[str, UUID]):
    """
    Retrieve the job.

    Returns the redis hash stored under `job_key(rprefix, jobid)`. Raises
    `JobNotFound` if the hash is empty or does not exist.
    """
    thejob = rconn.hgetall(job_key(rprefix, jobid))
    if not thejob:
        raise_jobnotfound(rprefix, jobid)
    return thejob


def update_status(
        rconn: Redis, rprefix: str, jobid: Union[str, UUID], status: str):
    """Update status of job in redis."""
    rconn.hset(name=job_key(rprefix, jobid), key="status", value=status)


def update_stdout_stderr(rconn: Redis,
                         rprefix: str,
                         jobid: Union[str, UUID],
                         bytes_read: bytes,
                         stream: str):
    """
    Update the stdout/stderr keys according to the value of `stream`.

    Decodes `bytes_read` as UTF-8 and appends it to whatever is currently
    stored in the job's `stream` field. Raises `JobNotFound` if the job does
    not exist.
    """
    thejob = job(rconn, rprefix, jobid)
    # NOTE(review): this assumes the redis connection returns `str` keys and
    # values (e.g. `decode_responses=True`) -- confirm against how `rconn` is
    # created.
    contents = thejob.get(stream, '')
    new_contents = contents + bytes_read.decode("utf-8")
    rconn.hset(name=job_key(rprefix, jobid), key=stream, value=new_contents)