"""Handle jobs""" import os import sys import shlex import subprocess from uuid import uuid4 from datetime import timedelta from redis import Redis def error_filename(job_id, error_dir): "Compute the path of the file where errors will be dumped." return f"{error_dir}/job_{job_id}.error" def __init_job__(# pylint: disable=[too-many-arguments] redis_conn: Redis, job_id: str, command: list, job_type: str, ttl_seconds: int, extra_meta: dict) -> dict: "Initialise a job 'object' and put in on redis" the_job = { "job_id": job_id, "command": shlex.join(command), "status": "pending", "percent": 0, "job-type": job_type, **extra_meta } redis_conn.hset(name=the_job["job_id"], mapping=the_job) redis_conn.expire(name=the_job["job_id"], time=timedelta(seconds=ttl_seconds)) return the_job def build_file_verification_job( redis_conn: Redis, filepath: str, filetype: str, redisurl: str, ttl_seconds: int): "Build a file verification job" job_id = str(uuid4()) command = [ sys.executable, "-m", "scripts.validate_file", filetype, filepath, redisurl, job_id ] return __init_job__( redis_conn, job_id, command, "file-verification", ttl_seconds, { "filetype": filetype, "filename": os.path.basename(filepath), "percent": 0 }) def data_insertion_job(# pylint: disable=[too-many-arguments] redis_conn: Redis, filepath: str, filetype: str, totallines: int, speciesid: int, platformid: int, datasetid: int, databaseuri: str, redisuri: str, ttl_seconds: int) -> dict: "Build a data insertion job" command = [ sys.executable, "-m", "scripts.insert_data", filetype, filepath, speciesid, platformid, datasetid, databaseuri, redisuri ] return __init_job__( redis_conn, str(uuid4()), command, "data-insertion", ttl_seconds, { "filename": os.path.basename(filepath), "filetype": filetype, "totallines": totallines }) def launch_job(the_job: dict, redisurl: str, error_dir): """Launch a job in the background""" if not os.path.exists(error_dir): os.mkdir(error_dir) job_id = the_job["job_id"] with open(error_filename(job_id, error_dir), "w", encoding="utf-8") as errorfile: subprocess.Popen( # pylint: disable=[consider-using-with] [sys.executable, "-m", "scripts.worker", redisurl, job_id], stderr=errorfile, env={"PYTHONPATH": ":".join(sys.path)}) return the_job def job(redis_conn, job_id: str): "Retrieve the job" return redis_conn.hgetall(job_id)