about summary refs log tree commit diff
path: root/gn_libs
diff options
context:
space:
mode:
Diffstat (limited to 'gn_libs')
-rw-r--r--gn_libs/jobs/__init__.py1
-rw-r--r--gn_libs/jobs/jobs.py6
-rw-r--r--gn_libs/jobs/launcher.py12
3 files changed, 17 insertions, 2 deletions
diff --git a/gn_libs/jobs/__init__.py b/gn_libs/jobs/__init__.py
index 2dcfaca..7927f8d 100644
--- a/gn_libs/jobs/__init__.py
+++ b/gn_libs/jobs/__init__.py
@@ -1,6 +1,7 @@
 """This package deals with launching and managing background/async jobs."""
 from .migrations import run_migrations
 from .jobs import (job,
+                   kill_job,
                    launch_job,
                    delete_job,
                    delete_jobs,
diff --git a/gn_libs/jobs/jobs.py b/gn_libs/jobs/jobs.py
index 6bf6b39..bccddd5 100644
--- a/gn_libs/jobs/jobs.py
+++ b/gn_libs/jobs/jobs.py
@@ -287,3 +287,9 @@ def delete_expired_jobs(conn: DbConnection) -> None:
             "SELECT job_id FROM jobs WHERE datetime(expires) <= datetime()")
         return delete_jobs(
             conn, tuple(row["job_id"] for row in cursor.fetchall()))
+
+
+def kill_job(conn: DbConnection, job_id: Union[uuid.UUID, str]) -> None:
+    """Send a request to kill the job."""
+    return update_metadata(
+        conn, job_id, "hangup_request", datetime.now(timezone.utc).isoformat())
diff --git a/gn_libs/jobs/launcher.py b/gn_libs/jobs/launcher.py
index d565f9e..f915b81 100644
--- a/gn_libs/jobs/launcher.py
+++ b/gn_libs/jobs/launcher.py
@@ -3,6 +3,7 @@ import os
 import sys
 import time
 import shlex
+import signal
 import logging
 import argparse
 import traceback
@@ -34,8 +35,13 @@ def run_job(conn, job, outputs_directory: Path):
                   encoding="utf-8",
                   stdout=outfile,
                   stderr=errfile) as process):
+            jobs.update_metadata(conn, job_id, "status", "running")
             while process.poll() is None:
-                jobs.update_metadata(conn, job_id, "status", "running")
+                _job = jobs.job(conn, job_id, True)
+                if bool(_job["metadata"].get("hangup_request")):
+                    process.send_signal(signal.SIGHUP)
+                    jobs.update_metadata(conn, job_id, "status", "stopped")
+                    break;
                 jobs.push_to_stream(conn, job_id, "stdout", stdout_in.read())
                 jobs.push_to_stream(conn, job_id, "stderr", stderr_in.read())
                 time.sleep(1)
@@ -51,7 +57,9 @@ def run_job(conn, job, outputs_directory: Path):
         if exit_status == 0:
             jobs.update_metadata(conn, job_id, "status", "completed")
         else:
-            jobs.update_metadata(conn, job_id, "status", "error")
+            _job = jobs.job(conn, job_id, True)
+            if _job["metadata"]["status"] != "stopped":
+                jobs.update_metadata(conn, job_id, "status", "error")
 
         logger.info("exiting job manager/launcher")
         return exit_status