diff options
Diffstat (limited to 'gn_libs/jobs')
-rw-r--r-- | gn_libs/jobs/__init__.py | 1 | ||||
-rw-r--r-- | gn_libs/jobs/jobs.py | 79 | ||||
-rw-r--r-- | gn_libs/jobs/launcher.py | 34 | ||||
-rw-r--r-- | gn_libs/jobs/migrations.py | 5 |
4 files changed, 97 insertions, 22 deletions
diff --git a/gn_libs/jobs/__init__.py b/gn_libs/jobs/__init__.py index 6f400ef..d6e4ce3 100644 --- a/gn_libs/jobs/__init__.py +++ b/gn_libs/jobs/__init__.py @@ -1,3 +1,4 @@ +"""This package deals with launching and managing background/async jobs.""" from .migrations import run_migrations from .jobs import (job, launch_job, diff --git a/gn_libs/jobs/jobs.py b/gn_libs/jobs/jobs.py index 38cd9c0..ec1c3a8 100644 --- a/gn_libs/jobs/jobs.py +++ b/gn_libs/jobs/jobs.py @@ -6,7 +6,7 @@ import shlex import logging import subprocess from pathlib import Path -from functools import reduce +from functools import partial from typing import Union, Optional from datetime import datetime, timezone, timedelta @@ -29,11 +29,25 @@ def __job_metadata__(cursor: DbCursor, job_id: Union[str, uuid.UUID]) -> dict: } +def job_stdstream_outputs(conn, job_id, streamname: str): + """Fetch the standard-error output for the job.""" + with _cursor(conn) as cursor: + cursor.execute( + "SELECT * FROM jobs_standard_outputs " + "WHERE job_id=? AND output_stream=?", + (str(job_id), streamname)) + return dict(cursor.fetchone() or {}).get("value") + + +job_stderr = partial(job_stdstream_outputs, streamname="stderr") +job_stdout = partial(job_stdstream_outputs, streamname="stdout") + + def job(conn: DbConnection, job_id: Union[str, uuid.UUID], fulldetails: bool = False) -> dict: """Fetch the job details for a job with a particular ID""" with _cursor(conn) as cursor: cursor.execute("SELECT * FROM jobs WHERE job_id=?", (str(job_id),)) - _job = dict(cursor.fetchone()) + _job = dict(cursor.fetchone() or {}) if not bool(_job): raise JobNotFound(f"Could not find job with ID {job_id}") @@ -72,16 +86,18 @@ def __save_job__(conn: DbConnection, the_job: dict, expiry_seconds: int) -> dict return the_job -def initialise_job( +def initialise_job(# pylint: disable=[too-many-arguments, too-many-positional-arguments] conn: DbConnection, job_id: uuid.UUID, command: list, job_type: str, - extra_meta: dict = {}, - expiry_seconds: Optional[int] = _DEFAULT_EXPIRY_SECONDS_ + extra_meta: Optional[dict] = None, + expiry_seconds: int = _DEFAULT_EXPIRY_SECONDS_ ) -> dict: """Initialise the job and put the details in a SQLite3 database.""" - + if extra_meta is None: + extra_meta = {} + _job = { "job_id": job_id, "command": shlex.join(command), @@ -96,34 +112,59 @@ def initialise_job( return __save_job__(conn, _job, expiry_seconds) -def error_filename(jobid, error_dir): - "Compute the path of the file where errors will be dumped." - return f"{error_dir}/job_{jobid}.error" +def output_file(jobid: uuid.UUID, outdir: Path, stream: str) -> Path: + """Compute the path for the file where the launcher's `stream` output goes""" + assert stream in ("stdout", "stderr"), f"Invalid stream '{stream}'" + return outdir.joinpath(f"launcher_job_{jobid}.{stream}") + + +stdout_filename = partial(output_file, stream="stdout") +stderr_filename = partial(output_file, stream="stderr") + + +def build_environment(extras: Optional[dict[str, str]] = None) -> dict[str, str]: + """Setup the runtime environment variables for the background script.""" + if extras is None: + extras = {} + + return { + **dict(os.environ), + "PYTHONPATH": ":".join(sys.path), + **extras + } def launch_job( the_job: dict, sqlite3_url: str, error_dir: Path, - worker_manager: str = "gn_libs.jobs.launcher" + worker_manager: str = "gn_libs.jobs.launcher", + loglevel: str = "info" ) -> dict: """Launch a job in the background""" if not os.path.exists(error_dir): os.mkdir(error_dir) job_id = str(the_job["job_id"]) - with open(error_filename(job_id, error_dir), - "w", - encoding="utf-8") as errorfile: + with (open(stderr_filename(jobid=the_job["job_id"], outdir=error_dir), + "w", + encoding="utf-8") as stderrfile, + open(stdout_filename(jobid=the_job["job_id"], outdir=error_dir), + "w", + encoding="utf-8") as stdoutfile): subprocess.Popen( # pylint: disable=[consider-using-with] [ sys.executable, "-u", "-m", worker_manager, sqlite3_url, job_id, - str(error_dir)], - stderr=errorfile, - env={"PYTHONPATH": ":".join(sys.path)}) + str(error_dir), + "--log-level", + loglevel + ], + stdout=stdoutfile, + stderr=stderrfile, + env=build_environment()) return the_job @@ -144,7 +185,11 @@ def update_metadata(conn: DbConnection, job_id: Union[str, uuid.UUID], key: str, }) -def push_to_stream(conn: DbConnection, job_id: Union[str, uuid.UUID], stream_name: str, content: str): +def push_to_stream( + conn: DbConnection, + job_id: Union[str, uuid.UUID], + stream_name: str, content: str +): """Initialise, and keep adding content to the stream from the provided content.""" with _cursor(conn) as cursor: cursor.execute("SELECT * FROM jobs_standard_outputs " diff --git a/gn_libs/jobs/launcher.py b/gn_libs/jobs/launcher.py index b7369a4..d565f9e 100644 --- a/gn_libs/jobs/launcher.py +++ b/gn_libs/jobs/launcher.py @@ -1,6 +1,9 @@ +"""Default launcher/manager script for background jobs.""" +import os import sys import time import shlex +import logging import argparse import traceback import subprocess @@ -9,15 +12,19 @@ from pathlib import Path from gn_libs import jobs, sqlite3 +logger = logging.getLogger(__name__) + def run_job(conn, job, outputs_directory: Path): """Run the job.""" + logger.info("Setting up the job.") job_id = job["job_id"] stdout_file = outputs_directory.joinpath(f"{job_id}.stdout") stderr_file = outputs_directory.joinpath(f"{job_id}.stderr") jobs.update_metadata(conn, job_id, "stdout-file", str(stdout_file)) jobs.update_metadata(conn, job_id, "stderr-file", str(stderr_file)) try: + logger.info("Launching the job in a separate process.") with (stdout_file.open(mode="w") as outfile, stderr_file.open(mode="w") as errfile, stdout_file.open(mode="r") as stdout_in, @@ -36,7 +43,21 @@ def run_job(conn, job, outputs_directory: Path): # Fetch any remaining content. jobs.push_to_stream(conn, job_id, "stdout", stdout_in.read()) jobs.push_to_stream(conn, job_id, "stderr", stderr_in.read()) - except: + logger.info("Job completed. Cleaning up.") + + os.remove(stdout_file) + os.remove(stderr_file) + exit_status = process.poll() + if exit_status == 0: + jobs.update_metadata(conn, job_id, "status", "completed") + else: + jobs.update_metadata(conn, job_id, "status", "error") + + logger.info("exiting job manager/launcher") + return exit_status + except Exception as _exc:# pylint: disable=[broad-exception-caught] + logger.error("An exception was raised when attempting to run the job", + exc_info=True) jobs.update_metadata(conn, job_id, "status", "error") jobs.push_to_stream(conn, job_id, "stderr", traceback.format_exc()) return 4 @@ -56,14 +77,21 @@ def parse_args(): parser.add_argument("outputs_directory", help="Directory where output files will be created", type=Path) + parser.add_argument( + "--log-level", + type=str, + help="Determines what is logged out.", + choices=("debug", "info", "warning", "error", "critical"), + default="info") return parser.parse_args() + def main(): """Entry-point to this program.""" args = parse_args() + logger.setLevel(args.log_level.upper()) args.outputs_directory.mkdir(parents=True, exist_ok=True) - with (sqlite3.connection(args.jobs_db_uri) as conn, - sqlite3.cursor(conn) as cursor): + with sqlite3.connection(args.jobs_db_uri) as conn: job = jobs.job(conn, args.job_id) if job: return run_job(conn, job, args.outputs_directory) diff --git a/gn_libs/jobs/migrations.py b/gn_libs/jobs/migrations.py index 86fb958..0c9825b 100644 --- a/gn_libs/jobs/migrations.py +++ b/gn_libs/jobs/migrations.py @@ -1,6 +1,6 @@ """Database migrations for the jobs to ensure consistency downstream.""" from gn_libs.protocols import DbCursor -from gn_libs.sqlite3 import cursor, connection +from gn_libs.sqlite3 import connection, cursor as acquire_cursor def __create_table_jobs__(cursor: DbCursor): """Create the jobs table""" @@ -61,8 +61,9 @@ def __create_table_jobs_output_streams__(cursor: DbCursor): def run_migrations(sqlite_url: str): + """Run the migrations to setup the background jobs database.""" with (connection(sqlite_url) as conn, - cursor(conn) as curr): + acquire_cursor(conn) as curr): __create_table_jobs__(curr) __create_table_jobs_metadata__(curr) __create_table_jobs_output_streams__(curr) |