diff options
| author | Frederick Muriuki Muriithi | 2026-01-08 11:42:52 -0600 |
|---|---|---|
| committer | Frederick Muriuki Muriithi | 2026-01-08 11:42:52 -0600 |
| commit | b1f8f05a451e161c769f33e973e629d6266cfc64 (patch) | |
| tree | bd612119cbe7d28dcb1c9aa62519f4a7ad69f102 | |
| parent | 7b12d6f0180fb8e807f2854689abf5fa682e5a1e (diff) | |
| download | gn-libs-b1f8f05a451e161c769f33e973e629d6266cfc64.tar.gz | |
Enable job deletion.
Provide functions to enable deletion of jobs from the database.
| -rw-r--r-- | gn_libs/jobs/__init__.py | 5 | ||||
| -rw-r--r-- | gn_libs/jobs/jobs.py | 33 |
2 files changed, 37 insertions, 1 deletions
diff --git a/gn_libs/jobs/__init__.py b/gn_libs/jobs/__init__.py index 4960349..2dcfaca 100644 --- a/gn_libs/jobs/__init__.py +++ b/gn_libs/jobs/__init__.py @@ -2,10 +2,13 @@ from .migrations import run_migrations from .jobs import (job, launch_job, + delete_job, + delete_jobs, initialise_job, push_to_stream, update_metadata, - jobs_by_external_id) + jobs_by_external_id, + delete_expired_jobs) def init_app(flask_app): """Initialise the migrations for flask""" diff --git a/gn_libs/jobs/jobs.py b/gn_libs/jobs/jobs.py index cee6e7e..6bf6b39 100644 --- a/gn_libs/jobs/jobs.py +++ b/gn_libs/jobs/jobs.py @@ -254,3 +254,36 @@ def push_to_stream( "stream": stream_name, "content": new_content }) + + +def delete_jobs( + conn: DbConnection, job_ids: tuple[Union[uuid.UUID, str], ...]) -> None: + """Delete the given jobs.""" + with _cursor(conn) as cursor: + _paramstr = ", ".join(["?"] * len(job_ids)) + _params = tuple(str(job_id) for job_id in job_ids) + cursor.execute( + f"DELETE FROM jobs_standard_outputs WHERE job_id IN ({_paramstr})", + _params) + cursor.execute( + f"DELETE FROM jobs_metadata WHERE job_id IN ({_paramstr})", + _params) + cursor.execute( + f"DELETE FROM jobs_external_ids WHERE job_id IN ({_paramstr})", + _params) + cursor.execute(f"DELETE FROM jobs WHERE job_id IN ({_paramstr})", + _params) + + +def delete_job(conn: DbConnection, job_id: Union[uuid.UUID, str]) -> None: + """Delete a specific job.""" + return delete_jobs(conn, (job_id,)) + + +def delete_expired_jobs(conn: DbConnection) -> None: + """Delete all jobs that are expired.""" + with _cursor(conn) as cursor: + cursor.execute( + "SELECT job_id FROM jobs WHERE datetime(expires) <= datetime()") + return delete_jobs( + conn, tuple(row["job_id"] for row in cursor.fetchall())) |
