1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
|
"""Database migrations for the jobs to ensure consistency downstream."""
from gn_libs.protocols import DbCursor
from gn_libs.sqlite3 import connection, cursor as acquire_cursor
def __create_table_jobs__(cursor: DbCursor):
    """Create the `jobs` table if it does not already exist.

    Each row is keyed by `job_id` and stores the `created`, `expires`
    (nullable) and `command` values as TEXT.
    """
    jobs_ddl = """
        CREATE TABLE IF NOT EXISTS jobs(
          job_id TEXT PRIMARY KEY NOT NULL,
          created TEXT NOT NULL,
          expires TEXT,
          command TEXT NOT NULL
        ) WITHOUT ROWID
        """
    cursor.execute(jobs_ddl)
def __create_table_jobs_metadata__(cursor: DbCursor):
    """Create the `jobs_metadata` table and its index on `job_id`.

    The table stores key/value pairs per job: rows are keyed by
    (`job_id`, `metadata_key`) and `job_id` references `jobs(job_id)`.
    """
    statements = (
        """
        CREATE TABLE IF NOT EXISTS jobs_metadata(
          job_id TEXT,
          metadata_key TEXT NOT NULL,
          metadata_value TEXT NOT NULL,
          FOREIGN KEY(job_id) REFERENCES jobs(job_id)
            ON UPDATE CASCADE ON DELETE RESTRICT,
          PRIMARY KEY(job_id, metadata_key)
        ) WITHOUT ROWID
        """,
        # NOTE(review): job_id is the leading column of the primary key, so
        # this index is probably redundant -- confirm before dropping it.
        """
        CREATE INDEX IF NOT EXISTS idx_tbl_jobs_metadata_cols_job_id
        ON jobs_metadata(job_id)
        """,
    )
    # Order matters: the table must exist before the index is created on it.
    for statement in statements:
        cursor.execute(statement)
def __create_table_jobs_output_streams__(cursor: DbCursor):
    """Create the `jobs_standard_outputs` table and its indexes.

    Rows are keyed by (`job_id`, `output_stream`); `output_stream` is
    constrained to 'stdout' or 'stderr', and `job_id` references
    `jobs(job_id)`.
    """
    statements = (
        """
        CREATE TABLE IF NOT EXISTS jobs_standard_outputs(
          job_id TEXT NOT NULL,
          output_stream TEXT,
          value TEXT,
          FOREIGN KEY(job_id) REFERENCES jobs(job_id)
            ON UPDATE CASCADE ON DELETE RESTRICT,
          CHECK (output_stream IN ('stdout', 'stderr')),
          PRIMARY KEY(job_id, output_stream)
        ) WITHOUT ROWID
        """,
        # NOTE(review): job_id is the leading primary-key column, so this
        # index looks redundant -- confirm before dropping it.
        """
        CREATE INDEX IF NOT EXISTS idx_tbl_jobs_standard_outputs_cols_job_id
        ON jobs_standard_outputs(job_id)
        """,
        # NOTE(review): this index covers the same columns as the primary
        # key -- confirm whether it is still needed.
        """
        CREATE INDEX IF NOT EXISTS
        idx_tbl_jobs_standard_outputs_cols_job_id_output_stream
        ON jobs_standard_outputs(job_id, output_stream)
        """,
    )
    # Order matters: the table must exist before indexes are created on it.
    for statement in statements:
        cursor.execute(statement)
def __create_table_jobs_external_ids__(cursor: DbCursor):
    """Create the `jobs_external_ids` table and its `external_id` index.

    This table lets external systems associate a background job with the
    user/event that triggered it. The background jobs system itself does not
    care what the external IDs are, and they should not influence how it
    works.
    """
    statements = (
        """
        CREATE TABLE IF NOT EXISTS jobs_external_ids(
          job_id TEXT PRIMARY KEY NOT NULL,
          external_id TEXT NOT NULL,
          FOREIGN KEY(job_id) REFERENCES jobs(job_id)
            ON UPDATE CASCADE ON DELETE RESTRICT
        ) WITHOUT ROWID
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_tbl_jobs_external_ids_cols_external_id
        ON jobs_external_ids(external_id)
        """,
    )
    # Order matters: the table must exist before the index is created on it.
    for statement in statements:
        cursor.execute(statement)
def run_migrations(sqlite_url: str):
    """Set up the background jobs database by running every migration.

    Opens a connection to `sqlite_url`, acquires a cursor on it, and runs
    each table-creation step in order with that cursor.
    """
    with connection(sqlite_url) as conn:
        with acquire_cursor(conn) as curr:
            # `jobs` goes first: the other tables reference it.
            migrations = (
                __create_table_jobs__,
                __create_table_jobs_metadata__,
                __create_table_jobs_output_streams__,
                __create_table_jobs_external_ids__,
            )
            for migrate in migrations:
                migrate(curr)
|