aboutsummaryrefslogtreecommitdiff
"""Handle jobs"""
import os
import sys
import uuid
import json
import shlex
import subprocess
from uuid import UUID, uuid4
from datetime import timedelta
from typing import Union, Optional

from redis import Redis
from flask import current_app as app

from functional_tools import take

JOBS_PREFIX = "jobs"

class JobNotFound(Exception):
    """Raised if we try to retrieve a non-existent job."""

def jobsnamespace():
    """
    Return the jobs namespace prefix. It depends on app configuration.

    Calling this function outside of an application context will cause an
    exception to be raised. It is mostly a convenience utility to use within the
    application.
    """
    return f"{app.config['GNQC_REDIS_PREFIX']}:{JOBS_PREFIX}"

def job_key(namespaceprefix: str, jobid: Union[str, UUID]) -> str:
    """Build the key by appending it to the namespace prefix."""
    return f"{namespaceprefix}:{jobid}"

def raise_jobnotfound(rprefix:str, jobid: Union[str,UUID]):
    """Utility to raise a `NoSuchJobError`"""
    raise JobNotFound(f"Could not retrieve job '{jobid}' from '{rprefix}.")

def error_filename(jobid, error_dir):
    "Compute the path of the file where errors will be dumped."
    return f"{error_dir}/job_{jobid}.error"

def initialise_job(# pylint: disable=[too-many-arguments]
        rconn: Redis, rprefix: str, jobid: str, command: list, job_type: str,
        ttl_seconds: int = 86400, extra_meta: Optional[dict] = None) -> dict:
    "Initialise a job 'object' and put in on redis"
    the_job = {
        "jobid": jobid, "command": shlex.join(command), "status": "pending",
        "percent": 0, "job-type": job_type, **(extra_meta or {})
    }
    rconn.hset(job_key(rprefix, jobid), mapping=the_job)
    rconn.expire(
        name=job_key(rprefix, jobid), time=timedelta(seconds=ttl_seconds))
    return the_job

def build_file_verification_job(#pylint: disable=[too-many-arguments]
        redis_conn: Redis,
        dburi: str,
        redisuri: str,
        speciesid: int,
        filepath: str,
        filetype: str,
        ttl_seconds: int):
    "Build a file verification job"
    jobid = str(uuid4())
    command = [
        sys.executable, "-m", "scripts.validate_file",
        dburi, redisuri, jobsnamespace(), jobid,
        "--redisexpiry", str(ttl_seconds),
        str(speciesid), filetype, filepath,
    ]
    return initialise_job(
        redis_conn, jobsnamespace(), jobid, command, "file-verification",
        ttl_seconds, {
            "filetype": filetype,
            "filename": os.path.basename(filepath), "percent": 0
        })

def data_insertion_job(# pylint: disable=[too-many-arguments]
        redis_conn: Redis, filepath: str, filetype: str, totallines: int,
        speciesid: int, platformid: int, datasetid: int, databaseuri: str,
        redisuri: str, ttl_seconds: int) -> dict:
    "Build a data insertion job"
    jobid = str(uuid4())
    command = [
        sys.executable, "-m", "scripts.insert_data", filetype, filepath,
        speciesid, platformid, datasetid, databaseuri, redisuri
    ]
    return initialise_job(
        redis_conn, jobsnamespace(), jobid, command, "data-insertion",
        ttl_seconds, {
            "filename": os.path.basename(filepath), "filetype": filetype,
            "totallines": totallines
        })

def launch_job(the_job: dict, redisurl: str, error_dir):
    """Launch a job in the background"""
    if not os.path.exists(error_dir):
        os.mkdir(error_dir)

    jobid = the_job["jobid"]
    with open(error_filename(jobid, error_dir),
              "w",
              encoding="utf-8") as errorfile:
        subprocess.Popen( # pylint: disable=[consider-using-with]
            [sys.executable, "-m", "scripts.worker", redisurl, jobsnamespace(),
             jobid],
            stderr=errorfile,
            env={"PYTHONPATH": ":".join(sys.path)})

    return the_job

def job(rconn: Redis, rprefix: str, jobid: Union[str,UUID]):
    "Retrieve the job"
    thejob = (rconn.hgetall(job_key(rprefix, jobid)) or
              raise_jobnotfound(rprefix, jobid))
    return thejob

def update_status(
        rconn: Redis, rprefix: str, jobid: Union[str, UUID], status: str):
    """Update status of job in redis."""
    rconn.hset(name=job_key(rprefix, jobid), key="status", value=status)

def update_stdout_stderr(rconn: Redis,
                         rprefix: str,
                         jobid: Union[str, UUID],
                         bytes_read: bytes,
                         stream: str):
    "Update the stdout/stderr keys according to the value of `stream`."
    thejob = job(rconn, rprefix, jobid)
    contents = thejob.get(stream, '')
    new_contents = contents + bytes_read.decode("utf-8")
    rconn.hset(name=job_key(rprefix, jobid), key=stream, value=new_contents)


def job_errors(
        rconn: Redis,
        prefix: str,
        job_id: Union[str, uuid.UUID],
        count: int = 100
) -> list:
    """Fetch job errors"""
    return take(
        (
            json.loads(error)
            for key in rconn.keys(f"{prefix}:{str(job_id)}:*:errors:*")
            for error in rconn.lrange(key, 0, -1)),
        count)


def job_files_metadata(
        rconn: Redis,
        prefix: str,
        job_id: Union[str, uuid.UUID]
) -> dict:
    """Get the metadata for specific job file."""
    return {
        key.split(":")[-1]: {
            **rconn.hgetall(key),
            "filetype": key.split(":")[-3]
        }
        for key in rconn.keys(f"{prefix}:{str(job_id)}:*:metadata*")
    }