about summary refs log tree commit diff
path: root/gn_libs/jobs/migrations.py
diff options
context:
space:
mode:
authorFrederick Muriuki Muriithi2025-03-24 16:22:32 -0500
committerFrederick Muriuki Muriithi2025-03-24 16:22:32 -0500
commitae5f0242cd937137cc2c90b6ecd44f63e1dd8c4b (patch)
treef03377c7a1744050f4211846db80a7303ecacf51 /gn_libs/jobs/migrations.py
parentfc6b98cb0aaaf3a8f589c21cbd38685060b3818a (diff)
downloadgn-libs-ae5f0242cd937137cc2c90b6ecd44f63e1dd8c4b.tar.gz
Initialise package for handling background/asynchronous jobs.
Diffstat (limited to 'gn_libs/jobs/migrations.py')
-rw-r--r--gn_libs/jobs/migrations.py68
1 files changed, 68 insertions, 0 deletions
diff --git a/gn_libs/jobs/migrations.py b/gn_libs/jobs/migrations.py
new file mode 100644
index 0000000..cc463e3
--- /dev/null
+++ b/gn_libs/jobs/migrations.py
@@ -0,0 +1,68 @@
+"""Database migrations for the jobs to ensure consistency downstream."""
+from gn_libs.protocols import DbCursor
+from gn_libs.sqlite3 import cursor, connection
+
+def __create_table_jobs__(cursor: DbCursor):
+    """Create the jobs table"""
+    cursor.execute(
+        """
+        CREATE TABLE IF NOT EXISTS jobs(
+          job_id TEXT PRIMARY KEY NOT NULL,
+          created TEXT NOT NULL,
+          expires TEXT,
+          command TEXT NOT NULL
+        ) WITHOUT ROWID
+        """)
+
+
+def __create_table_jobs_metadata__(cursor: DbCursor):
+    cursor.execute(
+        """
+        CREATE TABLE IF NOT EXISTS jobs_metadata(
+          job_id TEXT,
+          metadata_key TEXT NOT NULL,
+          metadata_value TEXT NOT NULL,
+          FOREIGN KEY(job_id) REFERENCES jobs(job_id)
+            ON UPDATE CASCADE ON DELETE RESTRICT,
+          PRIMARY KEY(job_id, metadata_key)
+        ) WITHOUT ROWID
+        """)
+    cursor.execute(
+        """
+        CREATE INDEX IF NOT EXISTS idx_tbl_jobs_metadata_cols_job_id
+        ON jobs_metadata(job_id)
+        """)
+
+
+def __create_table_jobs_output_streams__(cursor: DbCursor):
+    cursor.execute(
+        """
+        CREATE TABLE IF NOT EXISTS jobs_standard_outputs(
+          job_id TEXT PRIMARY KEY,
+          output_stream TEXT,
+          timestamp TEXT,
+          value TEXT,
+          FOREIGN KEY(job_id) REFERENCES jobs(job_id)
+            ON UPDATE CASCADE ON DELETE RESTRICT,
+          CHECK (output_stream IN ('stdout', 'stderr'))
+        ) WITHOUT ROWID
+        """)
+    cursor.execute(
+        """
+        CREATE INDEX IF NOT EXISTS idx_tbl_jobs_standard_outputs_cols_job_id
+        ON jobs_standard_outputs(job_id)
+        """)
+    cursor.execute(
+        """
+        CREATE INDEX IF NOT EXISTS
+        idx_tbl_jobs_standard_outputs_cols_job_id_output_stream
+        ON jobs_standard_outputs(job_id, output_stream)
+        """)
+
+
+def run_migrations(sqlite_url: str):
+    with (connection(sqlite_url) as conn,
+          cursor(conn) as curr):
+        __create_table_jobs__(curr)
+        __create_table_jobs_metadata__(curr)
+        __create_table_jobs_output_streams__(curr)