about summary refs log tree commit diff
diff options
context:
space:
mode:
authorFrederick Muriuki Muriithi2026-01-07 15:04:53 -0600
committerFrederick Muriuki Muriithi2026-01-07 15:05:10 -0600
commit63ad52c70bf32ca46ccd4ca6010e2aaa434c0344 (patch)
treeb60cba4feb1dfd589ad4cdd8e0ef1934fbcb318d
parent4818780622db79725ade4cad930487d75654ae51 (diff)
downloadgn-libs-63ad52c70bf32ca46ccd4ca6010e2aaa434c0344.tar.gz
Link a jobs to external IDs.
-rw-r--r--gn_libs/jobs/jobs.py22
-rw-r--r--gn_libs/jobs/migrations.py24
2 files changed, 42 insertions, 4 deletions
diff --git a/gn_libs/jobs/jobs.py b/gn_libs/jobs/jobs.py
index ec1c3a8..8453697 100644
--- a/gn_libs/jobs/jobs.py
+++ b/gn_libs/jobs/jobs.py
@@ -60,7 +60,12 @@ def job(conn: DbConnection, job_id: Union[str, uuid.UUID], fulldetails: bool = F
     return _job
 
 
-def __save_job__(conn: DbConnection, the_job: dict, expiry_seconds: int) -> dict:
+def __save_job__(
+        conn: DbConnection,
+        the_job: dict,
+        expiry_seconds: int,
+        external_id: str = ""
+) -> dict:
     """Save the job to database."""
 
     with _cursor(conn) as cursor:
@@ -75,6 +80,11 @@ def __save_job__(conn: DbConnection, the_job: dict, expiry_seconds: int) -> dict
                            "expires": (expires and expires.isoformat()),
                            "command": the_job["command"]
                        })
+        if bool(external_id.strip()):
+            cursor.execute(
+                "INSERT INTO jobs_external_ids(job_id, external_id) "
+                "VALUES(:job_id, :external_id)",
+                {"job_id": job_id, "external_id": external_id.strip()})
         metadata = tuple({"job_id": job_id, "key": key, "value": value}
                          for key,value in the_job["metadata"].items())
         if len(metadata) > 0:
@@ -92,7 +102,8 @@ def initialise_job(# pylint: disable=[too-many-arguments, too-many-positional-ar
         command: list,
         job_type: str,
         extra_meta: Optional[dict] = None,
-        expiry_seconds: int = _DEFAULT_EXPIRY_SECONDS_
+        expiry_seconds: int = _DEFAULT_EXPIRY_SECONDS_,
+        external_id: str = ""
 ) -> dict:
     """Initialise the job and put the details in a SQLite3 database."""
     if extra_meta is None:
@@ -106,10 +117,13 @@ def initialise_job(# pylint: disable=[too-many-arguments, too-many-positional-ar
             "status": "pending",
             "percent": 0,
             "job-type": job_type,
-            **extra_meta
+            **extra_meta,
+            **({"external_id": external_id.strip()}
+               if bool(external_id.strip())
+               else {})
         }
     }
-    return __save_job__(conn, _job, expiry_seconds)
+    return __save_job__(conn, _job, expiry_seconds, external_id)
 
 
 def output_file(jobid: uuid.UUID, outdir: Path, stream: str) -> Path:
diff --git a/gn_libs/jobs/migrations.py b/gn_libs/jobs/migrations.py
index 0c9825b..2af16ae 100644
--- a/gn_libs/jobs/migrations.py
+++ b/gn_libs/jobs/migrations.py
@@ -60,6 +60,29 @@ def __create_table_jobs_output_streams__(cursor: DbCursor):
         """)
 
 
+def __create_table_jobs_external_ids__(cursor: DbCursor):
+    """Create the jobs_external_ids table.
+
+    The purpose of this table is to allow external systems to link background
+    jobs to specific users/events that triggered them. What the external IDs are
+    is irrelevant to the background jobs system here, and should not affect how
+    the system works."""
+    cursor.execute(
+        """
+        CREATE TABLE IF NOT EXISTS jobs_external_ids(
+          job_id TEXT PRIMARY KEY NOT NULL,
+          external_id TEXT NOT NULL,
+          FOREIGN KEY(job_id) REFERENCES jobs(job_id)
+            ON UPDATE CASCADE ON DELETE RESTRICT
+        ) WITHOUT ROWID
+        """)
+    cursor.execute(
+        """
+        CREATE INDEX IF NOT EXISTS idx_tbl_jobs_external_ids_cols_external_id
+        ON jobs_external_ids(external_id)
+        """)
+
+
 def run_migrations(sqlite_url: str):
     """Run the migrations to setup the background jobs database."""
     with (connection(sqlite_url) as conn,
@@ -67,3 +90,4 @@ def run_migrations(sqlite_url: str):
         __create_table_jobs__(curr)
         __create_table_jobs_metadata__(curr)
         __create_table_jobs_output_streams__(curr)
+        __create_table_jobs_external_ids__(curr)