diff options
| -rw-r--r-- | gn_libs/jobs/__init__.py | 5 | ||||
| -rw-r--r-- | gn_libs/jobs/jobs.py | 33 |
2 files changed, 37 insertions, 1 deletions
diff --git a/gn_libs/jobs/__init__.py b/gn_libs/jobs/__init__.py index 4960349..2dcfaca 100644 --- a/gn_libs/jobs/__init__.py +++ b/gn_libs/jobs/__init__.py @@ -2,10 +2,13 @@ from .migrations import run_migrations from .jobs import (job, launch_job, + delete_job, + delete_jobs, initialise_job, push_to_stream, update_metadata, - jobs_by_external_id) + jobs_by_external_id, + delete_expired_jobs) def init_app(flask_app): """Initialise the migrations for flask""" diff --git a/gn_libs/jobs/jobs.py b/gn_libs/jobs/jobs.py index cee6e7e..6bf6b39 100644 --- a/gn_libs/jobs/jobs.py +++ b/gn_libs/jobs/jobs.py @@ -254,3 +254,36 @@ def push_to_stream( "stream": stream_name, "content": new_content }) + + +def delete_jobs( + conn: DbConnection, job_ids: tuple[Union[uuid.UUID, str], ...]) -> None: + """Delete the given jobs.""" + with _cursor(conn) as cursor: + _paramstr = ", ".join(["?"] * len(job_ids)) + _params = tuple(str(job_id) for job_id in job_ids) + cursor.execute( + f"DELETE FROM jobs_standard_outputs WHERE job_id IN ({_paramstr})", + _params) + cursor.execute( + f"DELETE FROM jobs_metadata WHERE job_id IN ({_paramstr})", + _params) + cursor.execute( + f"DELETE FROM jobs_external_ids WHERE job_id IN ({_paramstr})", + _params) + cursor.execute(f"DELETE FROM jobs WHERE job_id IN ({_paramstr})", + _params) + + +def delete_job(conn: DbConnection, job_id: Union[uuid.UUID, str]) -> None: + """Delete a specific job.""" + return delete_jobs(conn, (job_id,)) + + +def delete_expired_jobs(conn: DbConnection) -> None: + """Delete all jobs that are expired.""" + with _cursor(conn) as cursor: + cursor.execute( + "SELECT job_id FROM jobs WHERE datetime(expires) <= datetime()") + return delete_jobs( + conn, tuple(row["job_id"] for row in cursor.fetchall())) |
