diff options
Diffstat (limited to 'gn_libs')
| -rw-r--r-- | gn_libs/jobs/__init__.py | 1 | ||||
| -rw-r--r-- | gn_libs/jobs/jobs.py | 6 | ||||
| -rw-r--r-- | gn_libs/jobs/launcher.py | 12 |
3 files changed, 17 insertions, 2 deletions
diff --git a/gn_libs/jobs/__init__.py b/gn_libs/jobs/__init__.py index 2dcfaca..7927f8d 100644 --- a/gn_libs/jobs/__init__.py +++ b/gn_libs/jobs/__init__.py @@ -1,6 +1,7 @@ """This package deals with launching and managing background/async jobs.""" from .migrations import run_migrations from .jobs import (job, + kill_job, launch_job, delete_job, delete_jobs, diff --git a/gn_libs/jobs/jobs.py b/gn_libs/jobs/jobs.py index 6bf6b39..bccddd5 100644 --- a/gn_libs/jobs/jobs.py +++ b/gn_libs/jobs/jobs.py @@ -287,3 +287,9 @@ def delete_expired_jobs(conn: DbConnection) -> None: "SELECT job_id FROM jobs WHERE datetime(expires) <= datetime()") return delete_jobs( conn, tuple(row["job_id"] for row in cursor.fetchall())) + + +def kill_job(conn: DbConnection, job_id: Union[uuid.UUID, str]) -> None: + """Send a request to kill the job.""" + return update_metadata( + conn, job_id, "hangup_request", datetime.now(timezone.utc).isoformat()) diff --git a/gn_libs/jobs/launcher.py b/gn_libs/jobs/launcher.py index d565f9e..f915b81 100644 --- a/gn_libs/jobs/launcher.py +++ b/gn_libs/jobs/launcher.py @@ -3,6 +3,7 @@ import os import sys import time import shlex +import signal import logging import argparse import traceback @@ -34,8 +35,13 @@ def run_job(conn, job, outputs_directory: Path): encoding="utf-8", stdout=outfile, stderr=errfile) as process): + jobs.update_metadata(conn, job_id, "status", "running") while process.poll() is None: - jobs.update_metadata(conn, job_id, "status", "running") + _job = jobs.job(conn, job_id, True) + if bool(_job["metadata"].get("hangup_request")): + process.send_signal(signal.SIGHUP) + jobs.update_metadata(conn, job_id, "status", "stopped") + break; jobs.push_to_stream(conn, job_id, "stdout", stdout_in.read()) jobs.push_to_stream(conn, job_id, "stderr", stderr_in.read()) time.sleep(1) @@ -51,7 +57,9 @@ def run_job(conn, job, outputs_directory: Path): if exit_status == 0: jobs.update_metadata(conn, job_id, "status", "completed") else: - jobs.update_metadata(conn, job_id, "status", "error") + _job = jobs.job(conn, job_id, True) + if _job["metadata"]["status"] != "stopped": + jobs.update_metadata(conn, job_id, "status", "error") logger.info("exiting job manager/launcher") return exit_status |
