From 754e8f214b940e05298cb360ed829f5c685d55a5 Mon Sep 17 00:00:00 2001 From: Frederick Muriuki Muriithi Date: Thu, 25 Jul 2024 11:07:33 -0500 Subject: Rename module: qc_app --> uploader --- qc_app/jobs.py | 130 --------------------------------------------------------- 1 file changed, 130 deletions(-) delete mode 100644 qc_app/jobs.py (limited to 'qc_app/jobs.py') diff --git a/qc_app/jobs.py b/qc_app/jobs.py deleted file mode 100644 index 21889da..0000000 --- a/qc_app/jobs.py +++ /dev/null @@ -1,130 +0,0 @@ -"""Handle jobs""" -import os -import sys -import shlex -import subprocess -from uuid import UUID, uuid4 -from datetime import timedelta -from typing import Union, Optional - -from redis import Redis -from flask import current_app as app - -JOBS_PREFIX = "JOBS" - -class JobNotFound(Exception): - """Raised if we try to retrieve a non-existent job.""" - -def jobsnamespace(): - """ - Return the jobs namespace prefix. It depends on app configuration. - - Calling this function outside of an application context will cause an - exception to be raised. It is mostly a convenience utility to use within the - application. - """ - return f"{app.config['GNQC_REDIS_PREFIX']}:{JOBS_PREFIX}" - -def job_key(namespaceprefix: str, jobid: Union[str, UUID]) -> str: - """Build the key by appending it to the namespace prefix.""" - return f"{namespaceprefix}:{jobid}" - -def raise_jobnotfound(rprefix:str, jobid: Union[str,UUID]): - """Utility to raise a `NoSuchJobError`""" - raise JobNotFound(f"Could not retrieve job '{jobid}' from '{rprefix}.") - -def error_filename(jobid, error_dir): - "Compute the path of the file where errors will be dumped." - return f"{error_dir}/job_{jobid}.error" - -def initialise_job(# pylint: disable=[too-many-arguments] - rconn: Redis, rprefix: str, jobid: str, command: list, job_type: str, - ttl_seconds: int = 86400, extra_meta: Optional[dict] = None) -> dict: - "Initialise a job 'object' and put in on redis" - the_job = { - "jobid": jobid, "command": shlex.join(command), "status": "pending", - "percent": 0, "job-type": job_type, **(extra_meta or {}) - } - rconn.hset(job_key(rprefix, jobid), mapping=the_job) - rconn.expire( - name=job_key(rprefix, jobid), time=timedelta(seconds=ttl_seconds)) - return the_job - -def build_file_verification_job(#pylint: disable=[too-many-arguments] - redis_conn: Redis, - dburi: str, - redisuri: str, - speciesid: int, - filepath: str, - filetype: str, - ttl_seconds: int): - "Build a file verification job" - jobid = str(uuid4()) - command = [ - sys.executable, "-m", "scripts.validate_file", - dburi, redisuri, jobsnamespace(), jobid, - "--redisexpiry", str(ttl_seconds), - str(speciesid), filetype, filepath, - ] - return initialise_job( - redis_conn, jobsnamespace(), jobid, command, "file-verification", - ttl_seconds, { - "filetype": filetype, - "filename": os.path.basename(filepath), "percent": 0 - }) - -def data_insertion_job(# pylint: disable=[too-many-arguments] - redis_conn: Redis, filepath: str, filetype: str, totallines: int, - speciesid: int, platformid: int, datasetid: int, databaseuri: str, - redisuri: str, ttl_seconds: int) -> dict: - "Build a data insertion job" - jobid = str(uuid4()) - command = [ - sys.executable, "-m", "scripts.insert_data", filetype, filepath, - speciesid, platformid, datasetid, databaseuri, redisuri - ] - return initialise_job( - redis_conn, jobsnamespace(), jobid, command, "data-insertion", - ttl_seconds, { - "filename": os.path.basename(filepath), "filetype": filetype, - "totallines": totallines - }) - -def launch_job(the_job: dict, redisurl: str, error_dir): - """Launch a job in the background""" - if not os.path.exists(error_dir): - os.mkdir(error_dir) - - jobid = the_job["jobid"] - with open(error_filename(jobid, error_dir), - "w", - encoding="utf-8") as errorfile: - subprocess.Popen( # pylint: disable=[consider-using-with] - [sys.executable, "-m", "scripts.worker", redisurl, jobsnamespace(), - jobid], - stderr=errorfile, - env={"PYTHONPATH": ":".join(sys.path)}) - - return the_job - -def job(rconn: Redis, rprefix: str, jobid: Union[str,UUID]): - "Retrieve the job" - thejob = (rconn.hgetall(job_key(rprefix, jobid)) or - raise_jobnotfound(rprefix, jobid)) - return thejob - -def update_status( - rconn: Redis, rprefix: str, jobid: Union[str, UUID], status: str): - """Update status of job in redis.""" - rconn.hset(name=job_key(rprefix, jobid), key="status", value=status) - -def update_stdout_stderr(rconn: Redis, - rprefix: str, - jobid: Union[str, UUID], - bytes_read: bytes, - stream: str): - "Update the stdout/stderr keys according to the value of `stream`." - thejob = job(rconn, rprefix, jobid) - contents = thejob.get(stream, '') - new_contents = contents + bytes_read.decode("utf-8") - rconn.hset(name=job_key(rprefix, jobid), key=stream, value=new_contents) -- cgit v1.2.3