"""Database migrations for the jobs to ensure consistency downstream.""" from gn_libs.protocols import DbCursor from gn_libs.sqlite3 import cursor, connection def __create_table_jobs__(cursor: DbCursor): """Create the jobs table""" cursor.execute( """ CREATE TABLE IF NOT EXISTS jobs( job_id TEXT PRIMARY KEY NOT NULL, created TEXT NOT NULL, expires TEXT, command TEXT NOT NULL ) WITHOUT ROWID """) def __create_table_jobs_metadata__(cursor: DbCursor): cursor.execute( """ CREATE TABLE IF NOT EXISTS jobs_metadata( job_id TEXT, metadata_key TEXT NOT NULL, metadata_value TEXT NOT NULL, FOREIGN KEY(job_id) REFERENCES jobs(job_id) ON UPDATE CASCADE ON DELETE RESTRICT, PRIMARY KEY(job_id, metadata_key) ) WITHOUT ROWID """) cursor.execute( """ CREATE INDEX IF NOT EXISTS idx_tbl_jobs_metadata_cols_job_id ON jobs_metadata(job_id) """) def __create_table_jobs_output_streams__(cursor: DbCursor): cursor.execute( """ CREATE TABLE IF NOT EXISTS jobs_standard_outputs( job_id TEXT NOT NULL, output_stream TEXT, value TEXT, FOREIGN KEY(job_id) REFERENCES jobs(job_id) ON UPDATE CASCADE ON DELETE RESTRICT, CHECK (output_stream IN ('stdout', 'stderr')), PRIMARY KEY(job_id, output_stream) ) WITHOUT ROWID """) cursor.execute( """ CREATE INDEX IF NOT EXISTS idx_tbl_jobs_standard_outputs_cols_job_id ON jobs_standard_outputs(job_id) """) cursor.execute( """ CREATE INDEX IF NOT EXISTS idx_tbl_jobs_standard_outputs_cols_job_id_output_stream ON jobs_standard_outputs(job_id, output_stream) """) def run_migrations(sqlite_url: str): with (connection(sqlite_url) as conn, cursor(conn) as curr): __create_table_jobs__(curr) __create_table_jobs_metadata__(curr) __create_table_jobs_output_streams__(curr)