1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
|
"""Handle asynchronous/background jobs. Job data is stored in SQLite database(s)."""
import os
import sys
import uuid
import shlex
import logging
import subprocess
from pathlib import Path
from functools import reduce
from functools import partial
from typing import Union, Optional
from datetime import datetime, timezone, timedelta
from gn_libs.sqlite3 import DbCursor, DbConnection, cursor as _cursor
# Logger for this module, named after the module itself.
_logger_ = logging.getLogger(__name__)
# Default job lifetime in seconds: 60 s * 60 min * 24 h * 2 days.
_DEFAULT_EXPIRY_SECONDS_ = 60 * 60 * 24 * 2
class JobNotFound(Exception):
    """Signal that no job exists for the requested job ID."""
def __job_metadata__(cursor: DbCursor, job_id: Union[str, uuid.UUID]) -> dict:
    """Collect every metadata key/value pair recorded for `job_id`.

    Runs the query on the caller's cursor and returns a mapping of
    `metadata_key` -> `metadata_value`, which is empty when the job has no
    metadata rows.
    """
    cursor.execute("SELECT * FROM jobs_metadata WHERE job_id=?", (str(job_id),))
    metadata = {}
    for row in cursor.fetchall():
        metadata[row["metadata_key"]] = row["metadata_value"]
    return metadata
def job_stdstream_outputs(conn, job_id, streamname: str):
    """Fetch the captured output of one standard stream for a job.

    Args:
        conn: an open database connection.
        job_id: the job's ID (str or UUID; converted with `str()` for the query).
        streamname: which stream to read, e.g. "stdout" or "stderr".

    Returns:
        The stored `value` for that job and stream, or None when nothing has
        been stored for it.
    """
    with _cursor(conn) as cursor:
        cursor.execute(
            "SELECT * FROM jobs_standard_outputs "
            "WHERE job_id=? AND output_stream=?",
            (str(job_id), streamname))
        return dict(cursor.fetchone() or {}).get("value")


# Convenience readers, each bound to its own stream name.
job_stderr = partial(job_stdstream_outputs, streamname="stderr")
job_stdout = partial(job_stdstream_outputs, streamname="stdout")
def job(conn: DbConnection, job_id: Union[str, uuid.UUID], fulldetails: bool = False) -> dict:
    """Fetch the details of the job with ID `job_id`.

    Args:
        conn: an open database connection.
        job_id: the job's ID (str or UUID).
        fulldetails: when True, also attach the job's captured "stderr" and
            "stdout" streams.

    Returns:
        The job's row from `jobs` as a dict, with its metadata under
        "metadata" (plus "stderr"/"stdout" when `fulldetails` is set).

    Raises:
        JobNotFound: if no job with that ID exists.
    """
    with _cursor(conn) as cursor:
        cursor.execute("SELECT * FROM jobs WHERE job_id=?", (str(job_id),))
        row = cursor.fetchone()
        # fetchone() returns None for an unknown ID. Check it before calling
        # dict(), which would otherwise raise TypeError instead of JobNotFound.
        if row is None:
            raise JobNotFound(f"Could not find job with ID {job_id}")
        _job = dict(row)
        _job["metadata"] = __job_metadata__(cursor, job_id)
        if fulldetails:
            _job["stderr"] = job_stderr(conn, job_id)
            _job["stdout"] = job_stdout(conn, job_id)
        return _job
def __save_job__(conn: DbConnection, the_job: dict, expiry_seconds: Optional[int]) -> dict:
    """Persist a new job and its metadata to the database.

    Args:
        conn: an open database connection.
        the_job: job dict with "job_id", "created" (a datetime), "command" and
            "metadata" (a dict) entries.
        expiry_seconds: the job's lifetime, counted from "created". None or a
            non-positive value stores no expiry (NULL).

    Returns:
        `the_job`, unchanged.
    """
    job_id = str(the_job["job_id"])
    # The caller types this as Optional[int], and `None > 0` raises
    # TypeError, so treat None the same as "never expires".
    expires = (the_job["created"] + timedelta(seconds=expiry_seconds)
               if expiry_seconds is not None and expiry_seconds > 0
               else None)
    metadata = tuple(
        {"job_id": job_id, "key": key, "value": value}
        for key, value in the_job["metadata"].items())
    with _cursor(conn) as cursor:
        cursor.execute("INSERT INTO jobs(job_id, created, expires, command) "
                       "VALUES(:job_id, :created, :expires, :command)",
                       {
                           "job_id": job_id,
                           "created": the_job["created"].isoformat(),
                           "expires": (expires.isoformat()
                                       if expires is not None else None),
                           "command": the_job["command"]
                       })
        # executemany is skipped when there is no metadata to insert.
        if metadata:
            cursor.executemany(
                "INSERT INTO jobs_metadata(job_id, metadata_key, metadata_value) "
                "VALUES (:job_id, :key, :value)",
                metadata)
    return the_job
def initialise_job(
        conn: DbConnection,
        job_id: uuid.UUID,
        command: list,
        job_type: str,
        extra_meta: Optional[dict] = None,
        expiry_seconds: Optional[int] = _DEFAULT_EXPIRY_SECONDS_
) -> dict:
    """Initialise a pending job and save its details to the SQLite3 database.

    Args:
        conn: an open database connection.
        job_id: the new job's ID.
        command: argument list; stored as one shell-quoted string
            (`shlex.join`).
        job_type: stored as the "job-type" metadata entry.
        extra_meta: additional metadata. Keys given here override the
            defaults ("status", "percent", "job-type"). Defaults to none.
        expiry_seconds: forwarded to `__save_job__` to compute the job's
            expiry time.

    Returns:
        The job dict as saved, with "created" set to the current UTC time.
    """
    _job = {
        "job_id": job_id,
        "command": shlex.join(command),
        "created": datetime.now(timezone.utc),
        "metadata": {
            "status": "pending",
            "percent": 0,
            "job-type": job_type,
            # None avoids a shared mutable default argument.
            **(extra_meta or {})
        }
    }
    return __save_job__(conn, _job, expiry_seconds)
def error_filename(jobid, error_dir):
    """Return the path of the file where a job's errors are written.

    The result is a plain string of the form "<error_dir>/job_<jobid>.error".
    """
    basename = f"job_{jobid}.error"
    return f"{error_dir}/{basename}"
def build_environment(extras: Optional[dict[str, str]] = None) -> dict[str, str]:
    """Build the environment for a job's worker subprocess.

    Starts from a copy of the current process environment and sets
    PYTHONPATH to this interpreter's `sys.path`. It then applies `extras`,
    which may override either of those.

    Args:
        extras: additional or overriding environment variables (optional).

    Returns:
        A new dict; `os.environ` itself is not modified.
    """
    return {
        **os.environ,
        # PYTHONPATH entries are separated by os.pathsep (":" on POSIX).
        "PYTHONPATH": os.pathsep.join(sys.path),
        # None avoids a shared mutable default argument.
        **(extras or {}),
    }
def launch_job(
        the_job: dict,
        sqlite3_url: str,
        error_dir: Path,
        worker_manager: str = "gn_libs.jobs.launcher"
) -> dict:
    """Launch a job's worker module in a background subprocess.

    Runs `<python> -u -m <worker_manager> <sqlite3_url> <job_id> <error_dir>`
    with the environment from `build_environment()`. The worker's stderr is
    redirected to the job's error file under `error_dir`. The subprocess is
    not waited on.

    Args:
        the_job: job dict; only its "job_id" entry is used here.
        sqlite3_url: database location, passed to the worker as an argument.
        error_dir: directory for error files; created (with any missing
            parents) if it does not exist.
        worker_manager: module run with `python -m`.

    Returns:
        `the_job`, unchanged.
    """
    # exist_ok avoids the exists()/mkdir() check-then-create race, and
    # makedirs also creates any missing intermediate directories.
    os.makedirs(error_dir, exist_ok=True)
    job_id = str(the_job["job_id"])
    with open(error_filename(job_id, error_dir),
              "w",
              encoding="utf-8") as errorfile:
        subprocess.Popen(  # pylint: disable=[consider-using-with]
            [
                sys.executable, "-u",
                "-m", worker_manager,
                sqlite3_url,
                job_id,
                str(error_dir)],
            stderr=errorfile,
            env=build_environment())
    return the_job
def update_metadata(conn: DbConnection, job_id: Union[str, uuid.UUID], key: str, value: str):
    """Set metadata item `key` of job `job_id` to `value`.

    Performs an SQLite upsert: inserts a new row, or updates the existing
    row when the (job_id, metadata_key) pair already exists.
    """
    params = {"job_id": str(job_id), "key": key, "value": value}
    upsert_sql = (
        "INSERT INTO jobs_metadata(job_id, metadata_key, metadata_value) "
        "VALUES (:job_id, :key, :value) "
        "ON CONFLICT (job_id, metadata_key) DO UPDATE "
        "SET metadata_value=:value "
        "WHERE job_id=:job_id AND metadata_key=:key")
    with _cursor(conn) as cursor:
        cursor.execute(upsert_sql, params)
def push_to_stream(conn: DbConnection, job_id: Union[str, uuid.UUID], stream_name: str, content: str):
    """Append `content` to a job's stored output stream, creating it if needed.

    Args:
        conn: an open database connection.
        job_id: the job's ID (str or UUID).
        stream_name: name of the stream to append to, e.g. "stdout"/"stderr".
        content: text to append to whatever has been stored so far.
    """
    with _cursor(conn) as cursor:
        # Append inside SQLite with one upsert instead of SELECT + rewrite.
        # The accumulated output no longer round-trips through Python on every
        # push, and there is no gap between the read and the write. COALESCE
        # treats a NULL stored value as "", as the previous code did.
        cursor.execute(
            "INSERT INTO jobs_standard_outputs(job_id, output_stream, value) "
            "VALUES(:job_id, :stream, :content) "
            "ON CONFLICT (job_id, output_stream) DO UPDATE "
            "SET value=COALESCE(value, '') || :content",
            {
                "job_id": str(job_id),
                "stream": stream_name,
                "content": content
            })
|