diff --git a/gn_libs/jobs/__init__.py b/gn_libs/jobs/__init__.py
index 2dcfaca..7927f8d 100644
--- a/gn_libs/jobs/__init__.py
+++ b/gn_libs/jobs/__init__.py
@@ -1,6 +1,7 @@
"""This package deals with launching and managing background/async jobs."""
from .migrations import run_migrations
from .jobs import (job,
+ kill_job,
launch_job,
delete_job,
delete_jobs,
diff --git a/gn_libs/jobs/jobs.py b/gn_libs/jobs/jobs.py
index 6bf6b39..bccddd5 100644
--- a/gn_libs/jobs/jobs.py
+++ b/gn_libs/jobs/jobs.py
@@ -287,3 +287,9 @@ def delete_expired_jobs(conn: DbConnection) -> None:
"SELECT job_id FROM jobs WHERE datetime(expires) <= datetime()")
return delete_jobs(
conn, tuple(row["job_id"] for row in cursor.fetchall()))
+
+
+def kill_job(conn: DbConnection, job_id: Union[uuid.UUID, str]) -> None:
+ """Send a request to kill the job."""
+ return update_metadata(
+ conn, job_id, "hangup_request", datetime.now(timezone.utc).isoformat())
diff --git a/gn_libs/jobs/launcher.py b/gn_libs/jobs/launcher.py
index d565f9e..f915b81 100644
--- a/gn_libs/jobs/launcher.py
+++ b/gn_libs/jobs/launcher.py
@@ -3,6 +3,7 @@ import os
import sys
import time
import shlex
+import signal
import logging
import argparse
import traceback
@@ -34,8 +35,13 @@ def run_job(conn, job, outputs_directory: Path):
encoding="utf-8",
stdout=outfile,
stderr=errfile) as process):
+ jobs.update_metadata(conn, job_id, "status", "running")
while process.poll() is None:
- jobs.update_metadata(conn, job_id, "status", "running")
+ _job = jobs.job(conn, job_id, True)
+ if bool(_job["metadata"].get("hangup_request")):
+ process.send_signal(signal.SIGHUP)
+ jobs.update_metadata(conn, job_id, "status", "stopped")
+ break;
jobs.push_to_stream(conn, job_id, "stdout", stdout_in.read())
jobs.push_to_stream(conn, job_id, "stderr", stderr_in.read())
time.sleep(1)
@@ -51,7 +57,9 @@ def run_job(conn, job, outputs_directory: Path):
if exit_status == 0:
jobs.update_metadata(conn, job_id, "status", "completed")
else:
- jobs.update_metadata(conn, job_id, "status", "error")
+ _job = jobs.job(conn, job_id, True)
+ if _job["metadata"]["status"] != "stopped":
+ jobs.update_metadata(conn, job_id, "status", "error")
logger.info("exiting job manager/launcher")
return exit_status
|