aboutsummaryrefslogtreecommitdiff
path: root/uploader/jobs.py
blob: 21889daa0615cabdb3f5300a6dfe8ee86f52de74 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
"""Handle jobs"""
import os
import sys
import shlex
import subprocess
from uuid import UUID, uuid4
from datetime import timedelta
from typing import Union, Optional

from redis import Redis
from flask import current_app as app

# Fixed key segment appended after the configured redis prefix (see
# `jobsnamespace`) to form the namespace under which job hashes are stored.
JOBS_PREFIX: str = "JOBS"

class JobNotFound(Exception):
    """Signal that a requested job has no data stored in redis.

    Raised by `raise_jobnotfound`, which `job` calls when the job's hash is
    missing or empty.
    """

def jobsnamespace():
    """
    Compute the redis key prefix under which jobs are stored.

    The prefix is the application's `GNQC_REDIS_PREFIX` setting followed by
    `JOBS_PREFIX`, joined with a colon. Because the setting is read from
    `flask.current_app`, this must only be called inside an application
    context; calling it elsewhere raises an exception. It is intended as a
    convenience for code running within the application.
    """
    configured_prefix = app.config['GNQC_REDIS_PREFIX']
    return f"{configured_prefix}:{JOBS_PREFIX}"

def job_key(namespaceprefix: str, jobid: Union[str, UUID]) -> str:
    """Return the redis key of `jobid`: the namespace prefix, a colon, the id."""
    return "{}:{}".format(namespaceprefix, jobid)

def raise_jobnotfound(rprefix:str, jobid: Union[str,UUID]):
    """Raise a `JobNotFound` error for `jobid` in the namespace `rprefix`.

    This function never returns normally. Because it is a plain call, it can be
    used in expression position (e.g. `value or raise_jobnotfound(p, j)`).

    Raises:
        JobNotFound: always.
    """
    raise JobNotFound(f"Could not retrieve job '{jobid}' from '{rprefix}'.")

def error_filename(jobid, error_dir):
    "Return the path, inside `error_dir`, of the file collecting `jobid`'s errors."
    return "{}/job_{}.error".format(error_dir, jobid)

def initialise_job(# pylint: disable=[too-many-arguments]
        rconn: Redis, rprefix: str, jobid: str, command: list, job_type: str,
        ttl_seconds: int = 86400, extra_meta: Optional[dict] = None) -> dict:
    """Initialise a job 'object', store it in redis and return it.

    The job is written as a redis hash under `job_key(rprefix, jobid)`, and that
    key is set to expire after `ttl_seconds`.

    Parameters:
        rconn: redis connection to write to.
        rprefix: jobs namespace prefix used to build the key.
        jobid: identifier of the job.
        command: the job's command as an argument list. Every element is
            converted to `str`, then the list is shell-quoted into one string
            (`shlex.join`).
        job_type: value stored in the "job-type" field.
        ttl_seconds: lifetime of the redis key in seconds (default 86400).
        extra_meta: additional fields to store. They are merged last, so they
            override the default fields on key collisions.

    Returns:
        The dict of fields that was written to redis.
    """
    the_job = {
        # shlex.join only accepts strings; callers may pass numeric ids.
        "jobid": jobid, "command": shlex.join(str(arg) for arg in command),
        "status": "pending", "percent": 0, "job-type": job_type,
        **(extra_meta or {})
    }
    key = job_key(rprefix, jobid)
    rconn.hset(key, mapping=the_job)
    rconn.expire(name=key, time=timedelta(seconds=ttl_seconds))
    return the_job

def build_file_verification_job(#pylint: disable=[too-many-arguments]
        redis_conn: Redis,
        dburi: str,
        redisuri: str,
        speciesid: int,
        filepath: str,
        filetype: str,
        ttl_seconds: int):
    """Build a file verification job, store it in redis and return it.

    The job's command runs `scripts.validate_file` with the current interpreter.
    It is given the database and redis URIs, the jobs namespace, a freshly
    generated job id, the redis expiry, the species id, the file type and the
    file path.

    Parameters:
        redis_conn: redis connection the job is stored through.
        dburi: database URI passed to the validation script.
        redisuri: redis URI passed to the validation script.
        speciesid: species id passed (as a string) to the validation script.
        filepath: path of the file to validate.
        filetype: type of the file, also stored as job metadata.
        ttl_seconds: expiry for the job key, also passed as `--redisexpiry`.

    Returns:
        The job dict as returned by `initialise_job`.
    """
    jobid = str(uuid4())
    # Resolve the namespace once, so the script and the stored key share it.
    namespace = jobsnamespace()
    command = [
        sys.executable, "-m", "scripts.validate_file",
        dburi, redisuri, namespace, jobid,
        "--redisexpiry", str(ttl_seconds),
        str(speciesid), filetype, filepath,
    ]
    # "percent" is initialised to 0 by `initialise_job` itself.
    return initialise_job(
        redis_conn, namespace, jobid, command, "file-verification",
        ttl_seconds, {
            "filetype": filetype,
            "filename": os.path.basename(filepath)
        })

def data_insertion_job(# pylint: disable=[too-many-arguments]
        redis_conn: Redis, filepath: str, filetype: str, totallines: int,
        speciesid: int, platformid: int, datasetid: int, databaseuri: str,
        redisuri: str, ttl_seconds: int) -> dict:
    """Build a data insertion job, store it in redis and return it.

    The job's command runs `scripts.insert_data` with the current interpreter.
    It is given the file type, the file path, the species, platform and dataset
    ids, and the database and redis URIs.

    Parameters:
        redis_conn: redis connection the job is stored through.
        filepath: path of the file whose data is inserted.
        filetype: type of the file, also stored as job metadata.
        totallines: number of lines in the file, stored as job metadata.
        speciesid, platformid, datasetid: ids passed (as strings) to the script.
        databaseuri: database URI passed to the script.
        redisuri: redis URI passed to the script.
        ttl_seconds: expiry of the job key in redis.

    Returns:
        The job dict as returned by `initialise_job`.
    """
    jobid = str(uuid4())
    command = [
        sys.executable, "-m", "scripts.insert_data", filetype, filepath,
        # The ids are ints, but the command is serialised with shlex.join,
        # which only accepts strings (it raises on non-zero ints and turns 0
        # into '').
        str(speciesid), str(platformid), str(datasetid), databaseuri, redisuri
    ]
    return initialise_job(
        redis_conn, jobsnamespace(), jobid, command, "data-insertion",
        ttl_seconds, {
            "filename": os.path.basename(filepath), "filetype": filetype,
            "totallines": totallines
        })

def launch_job(the_job: dict, redisurl: str, error_dir):
    """Launch a job in the background and return it unchanged.

    Starts `scripts.worker` with the current interpreter as a child process. The
    worker is given `redisurl`, the jobs namespace and the job's id. The child's
    stderr is written to `error_filename(jobid, error_dir)`; `error_dir`, and any
    missing parent directories, are created if needed. The child is not waited
    for, so this returns as soon as the process has been started.
    """
    # exist_ok avoids the race between checking for and creating the directory.
    os.makedirs(error_dir, exist_ok=True)

    jobid = the_job["jobid"]
    with open(error_filename(jobid, error_dir),
              "w",
              encoding="utf-8") as errorfile:
        # The child receives its own copy of the file descriptor, so closing
        # `errorfile` when the `with` block exits does not close its stderr.
        subprocess.Popen( # pylint: disable=[consider-using-with]
            [sys.executable, "-m", "scripts.worker", redisurl, jobsnamespace(),
             jobid],
            stderr=errorfile,
            # NOTE(review): `env` replaces the whole environment, so the child
            # sees only PYTHONPATH (no PATH, HOME, ...). Confirm this is intended.
            env={"PYTHONPATH": os.pathsep.join(sys.path)})

    return the_job

def job(rconn: Redis, rprefix: str, jobid: Union[str,UUID]):
    """Fetch every field of job `jobid` from its redis hash under `rprefix`.

    Raises:
        JobNotFound: if the hash is missing or empty (via `raise_jobnotfound`).
    """
    fields = rconn.hgetall(job_key(rprefix, jobid))
    if not fields:
        raise_jobnotfound(rprefix, jobid)
    return fields

def update_status(
        rconn: Redis, rprefix: str, jobid: Union[str, UUID], status: str):
    """Overwrite the "status" field of job `jobid` in redis with `status`."""
    key = job_key(rprefix, jobid)
    rconn.hset(name=key, key="status", value=status)

def update_stdout_stderr(rconn: Redis,
                         rprefix: str,
                         jobid: Union[str, UUID],
                         bytes_read: bytes,
                         stream: str):
    """Append `bytes_read` to the job field named by `stream` (stdout/stderr).

    The current value of the field (or '' if it is absent) is read from the
    job, the UTF-8 decoded bytes are appended, and the result is written back.

    Raises:
        JobNotFound: if the job does not exist (raised by `job`).
    """
    thejob = job(rconn, rprefix, jobid)
    contents = thejob.get(stream, '')
    # If `bytes_read` is a fixed-size chunk of the stream, it can end in the
    # middle of a multi-byte UTF-8 character. Replace undecodable bytes rather
    # than raising UnicodeDecodeError and losing the update.
    new_contents = contents + bytes_read.decode("utf-8", errors="replace")
    # NOTE(review): this read-modify-write is not atomic; concurrent writers to
    # the same stream of the same job could overwrite each other's output.
    rconn.hset(name=job_key(rprefix, jobid), key=stream, value=new_contents)