aboutsummaryrefslogtreecommitdiff
path: root/uploader/jobs.py
blob: e86ee056f5886cf9e6816a7887a20c5e51c03fc5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
"""Handle jobs"""
import os
import sys
import uuid
import json
import shlex
import subprocess
from uuid import UUID, uuid4
from datetime import timedelta
from typing import Union, Optional

from redis import Redis
from flask import current_app as app

from functional_tools import take

# Namespace component appended to the application's configured redis prefix
# (see `jobsnamespace`) to form the prefix under which job keys are stored.
JOBS_PREFIX = "jobs"

class JobNotFound(Exception):
    """Signal that a requested job could not be retrieved from redis."""

def jobsnamespace():
    """
    Compute the redis namespace prefix under which jobs are stored.

    The prefix is built from the `GNQC_REDIS_PREFIX` value in the current
    application's configuration, followed by `JOBS_PREFIX`.

    This is a convenience utility meant for use within the application: it
    reads the configuration off the current application, so calling it outside
    of an application context will cause an exception to be raised.
    """
    configured_prefix = app.config['GNQC_REDIS_PREFIX']
    return f"{configured_prefix}:{JOBS_PREFIX}"

def job_key(namespaceprefix: str, jobid: Union[str, UUID]) -> str:
    """
    Compute the redis key for a job.

    The key is the namespace prefix and the job's identifier, separated by a
    colon.
    """
    key = f"{namespaceprefix}:{jobid}"
    return key

def raise_jobnotfound(rprefix:str, jobid: Union[str,UUID]):
    """
    Utility to raise a `JobNotFound` exception.

    Having the `raise` wrapped in a function allows it to be used in
    expression position (e.g. `value or raise_jobnotfound(...)`).

    Parameters:
        rprefix: The redis namespace prefix the job was looked up under.
        jobid: The identifier of the job that could not be retrieved.

    Raises:
        JobNotFound: Always.
    """
    raise JobNotFound(f"Could not retrieve job '{jobid}' from '{rprefix}'.")

def error_filename(jobid, error_dir):
    """
    Compute the path of the file where a job's errors will be dumped.

    The file is named `job_<jobid>.error` and is placed under `error_dir`.
    """
    basename = f"job_{jobid}.error"
    return f"{error_dir}/{basename}"

def initialise_job(# pylint: disable=[too-many-arguments]
        rconn: Redis, rprefix: str, jobid: str, command: list, job_type: str,
        ttl_seconds: int = 86400, extra_meta: Optional[dict] = None) -> dict:
    """
    Initialise a job 'object' and put it on redis.

    The job is a flat mapping holding the job's identifier, the command to
    run (joined into a single shell-quoted string), a "pending" status, a
    percentage of zero and the job type. Any items in `extra_meta` are added
    on top of these and override them where the keys clash.

    The mapping is saved as a redis hash under the job's key, and the key is
    set to expire after `ttl_seconds` seconds.

    Returns the mapping that was saved.
    """
    job_fields = {
        "jobid": jobid,
        "command": shlex.join(command),
        "status": "pending",
        "percent": 0,
        "job-type": job_type,
    }
    if extra_meta:
        job_fields.update(extra_meta)

    redis_key = job_key(rprefix, jobid)
    rconn.hset(redis_key, mapping=job_fields)
    rconn.expire(name=redis_key, time=timedelta(seconds=ttl_seconds))
    return job_fields

def build_file_verification_job(#pylint: disable=[too-many-arguments]
        redis_conn: Redis,
        dburi: str,
        redisuri: str,
        speciesid: int,
        filepath: str,
        filetype: str,
        ttl_seconds: int):
    """
    Build a file verification job and register it on redis.

    A fresh job identifier is generated, and the job's command is set up to
    run the `scripts.validate_file` module with the current python
    interpreter. The file's type and base name are saved with the job as
    extra metadata.

    Returns the job as returned by `initialise_job`.
    """
    jobid = str(uuid4())
    namespace = jobsnamespace()
    script_arguments = [
        dburi, redisuri, namespace, jobid,
        "--redisexpiry", str(ttl_seconds),
        str(speciesid), filetype, filepath,
    ]
    command = [sys.executable, "-m", "scripts.validate_file", *script_arguments]
    file_metadata = {
        "filetype": filetype,
        "filename": os.path.basename(filepath),
        "percent": 0,
    }
    return initialise_job(
        redis_conn, namespace, jobid, command, "file-verification",
        ttl_seconds, file_metadata)

def data_insertion_job(# pylint: disable=[too-many-arguments]
        redis_conn: Redis, filepath: str, filetype: str, totallines: int,
        speciesid: int, platformid: int, datasetid: int, databaseuri: str,
        redisuri: str, ttl_seconds: int) -> dict:
    """
    Build a data insertion job and register it on redis.

    A fresh job identifier is generated, and the job's command is set up to
    run the `scripts.insert_data` module with the current python interpreter.
    The file's base name, its type and `totallines` are saved with the job as
    extra metadata.

    Parameters:
        redis_conn: Connection to the redis server the job is saved on.
        filepath: Path to the file holding the data to insert.
        filetype: The type of the file, passed on to the script as is.
        totallines: Line count saved in the job's metadata.
        speciesid, platformid, datasetid: Identifiers passed on to the script.
        databaseuri: URI passed on to the script for the database connection.
        redisuri: URI passed on to the script for the redis connection.
        ttl_seconds: Seconds after which the job's redis key expires.

    Returns:
        The job as returned by `initialise_job`.
    """
    jobid = str(uuid4())
    command = [
        sys.executable, "-m", "scripts.insert_data", filetype, filepath,
        # `initialise_job` passes the command through `shlex.join`, which
        # only accepts strings: convert the integer identifiers up-front.
        str(speciesid), str(platformid), str(datasetid), databaseuri, redisuri
    ]
    return initialise_job(
        redis_conn, jobsnamespace(), jobid, command, "data-insertion",
        ttl_seconds, {
            "filename": os.path.basename(filepath), "filetype": filetype,
            "totallines": totallines
        })

def launch_job(the_job: dict, redisurl: str, error_dir):
    """
    Launch a job in the background.

    The `scripts.worker` module is started as a separate process with the
    current python interpreter, and is given the redis URL, the jobs
    namespace and the job's identifier. This function does not wait for that
    process to finish.

    Parameters:
        the_job: The job to launch. Only its "jobid" item is read here.
        redisurl: URL passed on to the worker for the redis connection.
        error_dir: Directory where the worker's error file is written. It is
            created if it does not exist.

    Returns:
        `the_job`, unchanged.
    """
    # `exist_ok=True` avoids the check-then-create race when more than one
    # job is launched at the same time, and missing parents are created too.
    os.makedirs(error_dir, exist_ok=True)

    jobid = the_job["jobid"]
    with open(error_filename(jobid, error_dir),
              "w",
              encoding="utf-8") as errorfile:
        # The worker's stderr goes to the job's error file.
        # NOTE: `env` replaces the environment for the child process: the
        # worker only gets `PYTHONPATH`, built from this process's `sys.path`.
        subprocess.Popen( # pylint: disable=[consider-using-with]
            [sys.executable, "-m", "scripts.worker", redisurl, jobsnamespace(),
             jobid],
            stderr=errorfile,
            env={"PYTHONPATH": ":".join(sys.path)})

    return the_job

def job(rconn: Redis, rprefix: str, jobid: Union[str,UUID]):
    """
    Retrieve the job with the given identifier from redis.

    Raises `JobNotFound` (via `raise_jobnotfound`) if redis has nothing saved
    under the job's key.
    """
    job_fields = rconn.hgetall(job_key(rprefix, jobid))
    if not job_fields:
        raise_jobnotfound(rprefix, jobid)
    return job_fields

def update_status(
        rconn: Redis, rprefix: str, jobid: Union[str, UUID], status: str):
    """Set the "status" field of the job's redis hash to `status`."""
    redis_key = job_key(rprefix, jobid)
    rconn.hset(name=redis_key, key="status", value=status)

def update_stdout_stderr(rconn: Redis,
                         rprefix: str,
                         jobid: Union[str, UUID],
                         bytes_read: bytes,
                         stream: str):
    """
    Append newly read output to the job field named by `stream`.

    The job is fetched with `job` (so `JobNotFound` is raised for a missing
    job), `bytes_read` is decoded as UTF-8 and added to the end of whatever
    the field currently holds, and the result is written back.
    """
    # NOTE(review): the read and the write are separate redis calls, so
    # concurrent writers to the same field could overwrite each other.
    # NOTE(review): assumes the connection returns `str` keys/values
    # (i.e. decodes responses) -- TODO confirm.
    existing_output = job(rconn, rprefix, jobid).get(stream, '')
    combined_output = existing_output + bytes_read.decode("utf-8")
    rconn.hset(
        name=job_key(rprefix, jobid), key=stream, value=combined_output)


def job_errors(
        rconn: Redis,
        prefix: str,
        job_id: Union[str, uuid.UUID],
        count: int = 100
) -> list:
    """
    Fetch errors recorded for a job.

    Every redis key matching `<prefix>:<job_id>:*:errors:*` is read as a
    list, and each item in those lists is parsed as JSON. The parsed errors
    are handed to `take` together with `count`, and its result is returned.
    """
    # The matching keys are looked up right away; the lists themselves are
    # only read as the errors are consumed.
    error_keys = rconn.keys(f"{prefix}:{str(job_id)}:*:errors:*")

    def _parsed_errors():
        for error_key in error_keys:
            for raw_error in rconn.lrange(error_key, 0, -1):
                yield json.loads(raw_error)

    return take(_parsed_errors(), count)


def job_files_metadata(
        rconn: Redis,
        prefix: str,
        job_id: Union[str, uuid.UUID]
) -> dict:
    """
    Get the metadata for the files of a specific job.

    Every redis key matching `<prefix>:<job_id>:*:metadata*` is read as a
    hash. The result maps the last colon-separated component of each key to
    that hash's contents, with a "filetype" item set to the key's third from
    last component.
    """
    files_metadata = {}
    for key in rconn.keys(f"{prefix}:{str(job_id)}:*:metadata*"):
        key_parts = key.split(":")
        file_details = dict(rconn.hgetall(key))
        file_details["filetype"] = key_parts[-3]
        files_metadata[key_parts[-1]] = file_details
    return files_metadata