"""Handle jobs""" import os import sys import uuid import json import shlex import subprocess from uuid import UUID, uuid4 from datetime import timedelta from typing import Union, Optional from redis import Redis from flask import current_app as app from functional_tools import take JOBS_PREFIX = "jobs" class JobNotFound(Exception): """Raised if we try to retrieve a non-existent job.""" def jobsnamespace(): """ Return the jobs namespace prefix. It depends on app configuration. Calling this function outside of an application context will cause an exception to be raised. It is mostly a convenience utility to use within the application. """ return f"{app.config['GNQC_REDIS_PREFIX']}:{JOBS_PREFIX}" def job_key(namespaceprefix: str, jobid: Union[str, UUID]) -> str: """Build the key by appending it to the namespace prefix.""" return f"{namespaceprefix}:{jobid}" def raise_jobnotfound(rprefix:str, jobid: Union[str,UUID]): """Utility to raise a `NoSuchJobError`""" raise JobNotFound(f"Could not retrieve job '{jobid}' from '{rprefix}.") def error_filename(jobid, error_dir): "Compute the path of the file where errors will be dumped." return f"{error_dir}/job_{jobid}.error" def initialise_job(# pylint: disable=[too-many-arguments] rconn: Redis, rprefix: str, jobid: str, command: list, job_type: str, ttl_seconds: int = 86400, extra_meta: Optional[dict] = None) -> dict: "Initialise a job 'object' and put in on redis" the_job = { "jobid": jobid, "command": shlex.join(command), "status": "pending", "percent": 0, "job-type": job_type, **(extra_meta or {}) } rconn.hset(job_key(rprefix, jobid), mapping=the_job) rconn.expire( name=job_key(rprefix, jobid), time=timedelta(seconds=ttl_seconds)) return the_job def build_file_verification_job(#pylint: disable=[too-many-arguments] redis_conn: Redis, dburi: str, redisuri: str, speciesid: int, filepath: str, filetype: str, ttl_seconds: int): "Build a file verification job" jobid = str(uuid4()) command = [ sys.executable, "-m", "scripts.validate_file", dburi, redisuri, jobsnamespace(), jobid, "--redisexpiry", str(ttl_seconds), str(speciesid), filetype, filepath, ] return initialise_job( redis_conn, jobsnamespace(), jobid, command, "file-verification", ttl_seconds, { "filetype": filetype, "filename": os.path.basename(filepath), "percent": 0 }) def data_insertion_job(# pylint: disable=[too-many-arguments] redis_conn: Redis, filepath: str, filetype: str, totallines: int, speciesid: int, platformid: int, datasetid: int, databaseuri: str, redisuri: str, ttl_seconds: int) -> dict: "Build a data insertion job" jobid = str(uuid4()) command = [ sys.executable, "-m", "scripts.insert_data", filetype, filepath, speciesid, platformid, datasetid, databaseuri, redisuri ] return initialise_job( redis_conn, jobsnamespace(), jobid, command, "data-insertion", ttl_seconds, { "filename": os.path.basename(filepath), "filetype": filetype, "totallines": totallines }) def launch_job(the_job: dict, redisurl: str, error_dir): """Launch a job in the background""" if not os.path.exists(error_dir): os.mkdir(error_dir) jobid = the_job["jobid"] with open(error_filename(jobid, error_dir), "w", encoding="utf-8") as errorfile: subprocess.Popen( # pylint: disable=[consider-using-with] [sys.executable, "-m", "scripts.worker", redisurl, jobsnamespace(), jobid], stderr=errorfile, env={"PYTHONPATH": ":".join(sys.path)}) return the_job def job(rconn: Redis, rprefix: str, jobid: Union[str,UUID]): "Retrieve the job" thejob = (rconn.hgetall(job_key(rprefix, jobid)) or raise_jobnotfound(rprefix, jobid)) return 
def update_status(
        rconn: Redis, rprefix: str, jobid: Union[str, UUID], status: str):
    """Update the status of a job in redis."""
    rconn.hset(name=job_key(rprefix, jobid), key="status", value=status)

def update_stdout_stderr(rconn: Redis,
                         rprefix: str,
                         jobid: Union[str, UUID],
                         bytes_read: bytes,
                         stream: str):
    """Update the stdout/stderr keys according to the value of `stream`."""
    thejob = job(rconn, rprefix, jobid)
    contents = thejob.get(stream, '')
    new_contents = contents + bytes_read.decode("utf-8")
    rconn.hset(name=job_key(rprefix, jobid), key=stream, value=new_contents)

def job_errors(
        rconn: Redis, prefix: str, job_id: Union[str, uuid.UUID],
        count: int = 100) -> list:
    """Fetch job errors."""
    return take(
        (json.loads(error)
         for key in rconn.keys(f"{prefix}:{str(job_id)}:*:errors:*")
         for error in rconn.lrange(key, 0, -1)),
        count)

def job_files_metadata(
        rconn: Redis, prefix: str, job_id: Union[str, uuid.UUID]) -> dict:
    """Get the metadata for a specific job's files."""
    return {
        key.split(":")[-1]: {
            **rconn.hgetall(key),
            "filetype": key.split(":")[-3]
        }
        for key in rconn.keys(f"{prefix}:{str(job_id)}:*:metadata*")
    }
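# Usage sketch (illustrative only, not part of the module): the Redis URL,
# database URI, species id, file path, file type and error directory below are
# placeholder values, and `app` is assumed to be the Flask application
# configured with `GNQC_REDIS_PREFIX`. A connection created with
# `decode_responses=True` is assumed, since `job()` and
# `update_stdout_stderr()` treat the stored hash values as strings.
#
#     from redis import Redis
#
#     rconn = Redis.from_url("redis://localhost:6379/0", decode_responses=True)
#     with app.app_context():
#         the_job = build_file_verification_job(
#             rconn,
#             "mysql://user:pass@localhost/db",
#             "redis://localhost:6379/0",
#             speciesid=1,
#             filepath="/tmp/upload.tsv",
#             filetype="average",
#             ttl_seconds=86400)
#         launch_job(the_job, "redis://localhost:6379/0", "/tmp/job-errors")
#         print(job(rconn, jobsnamespace(), the_job["jobid"])["status"])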