about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--gn_libs/jobs/__init__.py5
-rw-r--r--gn_libs/jobs/jobs.py33
2 files changed, 37 insertions, 1 deletions
diff --git a/gn_libs/jobs/__init__.py b/gn_libs/jobs/__init__.py
index 4960349..2dcfaca 100644
--- a/gn_libs/jobs/__init__.py
+++ b/gn_libs/jobs/__init__.py
@@ -2,10 +2,13 @@
 from .migrations import run_migrations
 from .jobs import (job,
                    launch_job,
+                   delete_job,
+                   delete_jobs,
                    initialise_job,
                    push_to_stream,
                    update_metadata,
-                   jobs_by_external_id)
+                   jobs_by_external_id,
+                   delete_expired_jobs)
 
 def init_app(flask_app):
     """Initialise the migrations for flask"""
diff --git a/gn_libs/jobs/jobs.py b/gn_libs/jobs/jobs.py
index cee6e7e..6bf6b39 100644
--- a/gn_libs/jobs/jobs.py
+++ b/gn_libs/jobs/jobs.py
@@ -254,3 +254,36 @@ def push_to_stream(
                 "stream": stream_name,
                 "content": new_content
             })
+
+
+def delete_jobs(
+        conn: DbConnection, job_ids: tuple[Union[uuid.UUID, str], ...]) -> None:
+    """Delete the given jobs."""
+    with _cursor(conn) as cursor:
+        _paramstr = ", ".join(["?"] * len(job_ids))
+        _params = tuple(str(job_id) for job_id in job_ids)
+        cursor.execute(
+            f"DELETE FROM jobs_standard_outputs WHERE job_id IN ({_paramstr})",
+            _params)
+        cursor.execute(
+            f"DELETE FROM jobs_metadata WHERE job_id IN ({_paramstr})",
+            _params)
+        cursor.execute(
+            f"DELETE FROM jobs_external_ids WHERE job_id IN ({_paramstr})",
+            _params)
+        cursor.execute(f"DELETE FROM jobs WHERE job_id IN ({_paramstr})",
+                       _params)
+
+
+def delete_job(conn: DbConnection, job_id: Union[uuid.UUID, str]) -> None:
+    """Delete a specific job."""
+    return delete_jobs(conn, (job_id,))
+
+
+def delete_expired_jobs(conn: DbConnection) -> None:
+    """Delete all jobs that are expired."""
+    with _cursor(conn) as cursor:
+        cursor.execute(
+            "SELECT job_id FROM jobs WHERE datetime(expires) <= datetime()")
+        return delete_jobs(
+            conn, tuple(row["job_id"] for row in cursor.fetchall()))