1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
|
"""Handle jobs"""
import os
import sys
import uuid
import json
import shlex
import subprocess
from uuid import UUID, uuid4
from datetime import timedelta
from typing import Union, Optional
from redis import Redis
from functional_tools import take
from flask import current_app as app
JOBS_PREFIX = "jobs"
class JobNotFound(Exception):
"""Raised if we try to retrieve a non-existent job."""
def jobsnamespace():
"""
Return the jobs namespace prefix. It depends on app configuration.
Calling this function outside of an application context will cause an
exception to be raised. It is mostly a convenience utility to use within the
application.
"""
return f"{app.config['GNQC_REDIS_PREFIX']}:{JOBS_PREFIX}"
def job_key(namespaceprefix: str, jobid: Union[str, UUID]) -> str:
"""Build the key by appending it to the namespace prefix."""
return f"{namespaceprefix}:{jobid}"
def raise_jobnotfound(rprefix:str, jobid: Union[str,UUID]):
"""Utility to raise a `NoSuchJobError`"""
raise JobNotFound(f"Could not retrieve job '{jobid}' from '{rprefix}.")
def error_filename(jobid, error_dir):
"Compute the path of the file where errors will be dumped."
return f"{error_dir}/job_{jobid}.error"
def initialise_job(# pylint: disable=[too-many-arguments]
rconn: Redis, rprefix: str, jobid: str, command: list, job_type: str,
ttl_seconds: int = 86400, extra_meta: Optional[dict] = None) -> dict:
"Initialise a job 'object' and put in on redis"
the_job = {
"jobid": jobid, "command": shlex.join(command), "status": "pending",
"percent": 0, "job-type": job_type, **(extra_meta or {})
}
rconn.hset(job_key(rprefix, jobid), mapping=the_job)
rconn.expire(
name=job_key(rprefix, jobid), time=timedelta(seconds=ttl_seconds))
return the_job
def build_file_verification_job(#pylint: disable=[too-many-arguments]
redis_conn: Redis,
dburi: str,
redisuri: str,
speciesid: int,
filepath: str,
filetype: str,
ttl_seconds: int):
"Build a file verification job"
jobid = str(uuid4())
command = [
sys.executable, "-m", "scripts.validate_file",
dburi, redisuri, jobsnamespace(), jobid,
"--redisexpiry", str(ttl_seconds),
str(speciesid), filetype, filepath,
]
return initialise_job(
redis_conn, jobsnamespace(), jobid, command, "file-verification",
ttl_seconds, {
"filetype": filetype,
"filename": os.path.basename(filepath), "percent": 0
})
def data_insertion_job(# pylint: disable=[too-many-arguments]
redis_conn: Redis, filepath: str, filetype: str, totallines: int,
speciesid: int, platformid: int, datasetid: int, databaseuri: str,
redisuri: str, ttl_seconds: int) -> dict:
"Build a data insertion job"
jobid = str(uuid4())
command = [
sys.executable, "-m", "scripts.insert_data", filetype, filepath,
speciesid, platformid, datasetid, databaseuri, redisuri
]
return initialise_job(
redis_conn, jobsnamespace(), jobid, command, "data-insertion",
ttl_seconds, {
"filename": os.path.basename(filepath), "filetype": filetype,
"totallines": totallines
})
def launch_job(the_job: dict, redisurl: str, error_dir):
"""Launch a job in the background"""
if not os.path.exists(error_dir):
os.mkdir(error_dir)
jobid = the_job["jobid"]
with open(error_filename(jobid, error_dir),
"w",
encoding="utf-8") as errorfile:
subprocess.Popen( # pylint: disable=[consider-using-with]
[sys.executable, "-m", "scripts.worker", redisurl, jobsnamespace(),
jobid],
stderr=errorfile,
env={"PYTHONPATH": ":".join(sys.path)})
return the_job
def job(rconn: Redis, rprefix: str, jobid: Union[str,UUID]):
"Retrieve the job"
thejob = (rconn.hgetall(job_key(rprefix, jobid)) or
raise_jobnotfound(rprefix, jobid))
return thejob
def update_status(
rconn: Redis, rprefix: str, jobid: Union[str, UUID], status: str):
"""Update status of job in redis."""
rconn.hset(name=job_key(rprefix, jobid), key="status", value=status)
def update_stdout_stderr(rconn: Redis,
rprefix: str,
jobid: Union[str, UUID],
bytes_read: bytes,
stream: str):
"Update the stdout/stderr keys according to the value of `stream`."
thejob = job(rconn, rprefix, jobid)
contents = thejob.get(stream, '')
new_contents = contents + bytes_read.decode("utf-8")
rconn.hset(name=job_key(rprefix, jobid), key=stream, value=new_contents)
def job_errors(
rconn: Redis,
prefix: str,
job_id: Union[str, uuid.UUID],
count: int = 100
) -> tuple[dict, ...]:
"""Fetch job errors"""
return take(
(
json.loads(error)
for key in rconn.keys(f"{prefix}:{str(job_id)}:*:errors:*")
for error in rconn.lrange(key, 0, -1)),
count)
|