From 63ad52c70bf32ca46ccd4ca6010e2aaa434c0344 Mon Sep 17 00:00:00 2001 From: Frederick Muriuki Muriithi Date: Wed, 7 Jan 2026 15:04:53 -0600 Subject: Link a jobs to external IDs. --- gn_libs/jobs/jobs.py | 22 ++++++++++++++++++---- gn_libs/jobs/migrations.py | 24 ++++++++++++++++++++++++ 2 files changed, 42 insertions(+), 4 deletions(-) diff --git a/gn_libs/jobs/jobs.py b/gn_libs/jobs/jobs.py index ec1c3a8..8453697 100644 --- a/gn_libs/jobs/jobs.py +++ b/gn_libs/jobs/jobs.py @@ -60,7 +60,12 @@ def job(conn: DbConnection, job_id: Union[str, uuid.UUID], fulldetails: bool = F return _job -def __save_job__(conn: DbConnection, the_job: dict, expiry_seconds: int) -> dict: +def __save_job__( + conn: DbConnection, + the_job: dict, + expiry_seconds: int, + external_id: str = "" +) -> dict: """Save the job to database.""" with _cursor(conn) as cursor: @@ -75,6 +80,11 @@ def __save_job__(conn: DbConnection, the_job: dict, expiry_seconds: int) -> dict "expires": (expires and expires.isoformat()), "command": the_job["command"] }) + if bool(external_id.strip()): + cursor.execute( + "INSERT INTO jobs_external_ids(job_id, external_id) " + "VALUES(:job_id, :external_id)", + {"job_id": job_id, "external_id": external_id.strip()}) metadata = tuple({"job_id": job_id, "key": key, "value": value} for key,value in the_job["metadata"].items()) if len(metadata) > 0: @@ -92,7 +102,8 @@ def initialise_job(# pylint: disable=[too-many-arguments, too-many-positional-ar command: list, job_type: str, extra_meta: Optional[dict] = None, - expiry_seconds: int = _DEFAULT_EXPIRY_SECONDS_ + expiry_seconds: int = _DEFAULT_EXPIRY_SECONDS_, + external_id: str = "" ) -> dict: """Initialise the job and put the details in a SQLite3 database.""" if extra_meta is None: @@ -106,10 +117,13 @@ def initialise_job(# pylint: disable=[too-many-arguments, too-many-positional-ar "status": "pending", "percent": 0, "job-type": job_type, - **extra_meta + **extra_meta, + **({"external_id": external_id.strip()} + if bool(external_id.strip()) + else {}) } } - return __save_job__(conn, _job, expiry_seconds) + return __save_job__(conn, _job, expiry_seconds, external_id) def output_file(jobid: uuid.UUID, outdir: Path, stream: str) -> Path: diff --git a/gn_libs/jobs/migrations.py b/gn_libs/jobs/migrations.py index 0c9825b..2af16ae 100644 --- a/gn_libs/jobs/migrations.py +++ b/gn_libs/jobs/migrations.py @@ -60,6 +60,29 @@ def __create_table_jobs_output_streams__(cursor: DbCursor): """) +def __create_table_jobs_external_ids__(cursor: DbCursor): + """Create the jobs_external_ids table. + + The purpose of this table is to allow external systems to link background + jobs to specific users/events that triggered them. What the external IDs are + is irrelevant to the background jobs system here, and should not affect how + the system works.""" + cursor.execute( + """ + CREATE TABLE IF NOT EXISTS jobs_external_ids( + job_id TEXT PRIMARY KEY NOT NULL, + external_id TEXT NOT NULL, + FOREIGN KEY(job_id) REFERENCES jobs(job_id) + ON UPDATE CASCADE ON DELETE RESTRICT + ) WITHOUT ROWID + """) + cursor.execute( + """ + CREATE INDEX IF NOT EXISTS idx_tbl_jobs_external_ids_cols_external_id + ON jobs_external_ids(external_id) + """) + + def run_migrations(sqlite_url: str): """Run the migrations to setup the background jobs database.""" with (connection(sqlite_url) as conn, @@ -67,3 +90,4 @@ def run_migrations(sqlite_url: str): __create_table_jobs__(curr) __create_table_jobs_metadata__(curr) __create_table_jobs_output_streams__(curr) + __create_table_jobs_external_ids__(curr) -- cgit 1.4.1