Diffstat (limited to 'gn_libs/jobs')
-rw-r--r--   gn_libs/jobs/__init__.py     11
-rw-r--r--   gn_libs/jobs/jobs.py        213
-rw-r--r--   gn_libs/jobs/launcher.py    108
-rw-r--r--   gn_libs/jobs/migrations.py   69
4 files changed, 401 insertions, 0 deletions
diff --git a/gn_libs/jobs/__init__.py b/gn_libs/jobs/__init__.py
new file mode 100644
index 0000000..d6e4ce3
--- /dev/null
+++ b/gn_libs/jobs/__init__.py
@@ -0,0 +1,11 @@
+"""This package deals with launching and managing background/async jobs."""
+from .migrations import run_migrations
+from .jobs import (job,
+                   launch_job,
+                   initialise_job,
+                   push_to_stream,
+                   update_metadata)
+
+def init_app(flask_app):
+    """Initialise the migrations for flask"""
+    run_migrations(flask_app.config["ASYNCHRONOUS_JOBS_SQLITE_DB"])
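
The package initialisation above only wires the migrations into Flask. A minimal sketch of how an application might use it, with a placeholder database path; the only requirement is that ASYNCHRONOUS_JOBS_SQLITE_DB is set in the config before init_app() is called:

from flask import Flask

from gn_libs import jobs

app = Flask(__name__)
# ASYNCHRONOUS_JOBS_SQLITE_DB is the config key init_app() reads; the path is a placeholder.
app.config["ASYNCHRONOUS_JOBS_SQLITE_DB"] = "/tmp/gn_jobs.db"
jobs.init_app(app)  # runs the SQLite migrations that create the jobs tables
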
diff --git a/gn_libs/jobs/jobs.py b/gn_libs/jobs/jobs.py
new file mode 100644
index 0000000..ec1c3a8
--- /dev/null
+++ b/gn_libs/jobs/jobs.py
@@ -0,0 +1,213 @@
+"""Handle asynchronous/background jobs. Job data is stored in SQLite database(s)."""
+import os
+import sys
+import uuid
+import shlex
+import logging
+import subprocess
+from pathlib import Path
+from functools import partial
+from typing import Union, Optional
+from datetime import datetime, timezone, timedelta
+
+from gn_libs.sqlite3 import DbCursor, DbConnection, cursor as _cursor
+
+_logger_ = logging.getLogger(__name__)
+_DEFAULT_EXPIRY_SECONDS_ = 2 * 24 * 60 * 60  # 2 days, in seconds
+
+
+class JobNotFound(Exception):
+    """Raised if we try to retrieve a non-existent job."""
+
+
+def __job_metadata__(cursor: DbCursor, job_id: Union[str, uuid.UUID]) -> dict:
+    """Fetch extra job metadata."""
+    cursor.execute("SELECT * FROM jobs_metadata WHERE job_id=?", (str(job_id),))
+    return {
+        row["metadata_key"]: row["metadata_value"]
+        for row in cursor.fetchall()
+    }
+
+
+def job_stdstream_outputs(conn, job_id, streamname: str):
+    """Fetch the standard-error output for the job."""
+    with _cursor(conn) as cursor:
+        cursor.execute(
+            "SELECT * FROM jobs_standard_outputs "
+            "WHERE job_id=? AND output_stream=?",
+            (str(job_id), streamname))
+        return dict(cursor.fetchone() or {}).get("value")
+
+
+job_stderr = partial(job_stdstream_outputs, streamname="stderr")
+job_stdout = partial(job_stdstream_outputs, streamname="stdout")
+
+
+def job(conn: DbConnection, job_id: Union[str, uuid.UUID], fulldetails: bool = False) -> dict:
+    """Fetch the job details for a job with a particular ID"""
+    with _cursor(conn) as cursor:
+        cursor.execute("SELECT * FROM jobs WHERE job_id=?", (str(job_id),))
+        _job = dict(cursor.fetchone() or {})
+        if not bool(_job):
+            raise JobNotFound(f"Could not find job with ID {job_id}")
+
+        _job["metadata"] = __job_metadata__(cursor, job_id)
+
+        if fulldetails:
+            _job["stderr"] = job_stderr(conn, job_id)
+            _job["stdout"] = job_stdout(conn, job_id)
+
+        return _job
+
+
+def __save_job__(conn: DbConnection, the_job: dict, expiry_seconds: int) -> dict:
+    """Save the job to database."""
+
+    with _cursor(conn) as cursor:
+        job_id = str(the_job["job_id"])
+        expires = ((the_job["created"] + timedelta(seconds=expiry_seconds))
+                   if expiry_seconds > 0 else None)
+        cursor.execute("INSERT INTO jobs(job_id, created, expires, command) "
+                       "VALUES(:job_id, :created, :expires, :command)",
+                       {
+                           "job_id": job_id,
+                           "created": the_job["created"].isoformat(),
+                           "expires": (expires and expires.isoformat()),
+                           "command": the_job["command"]
+                       })
+        metadata = tuple({"job_id": job_id, "key": key, "value": value}
+                         for key,value in the_job["metadata"].items())
+        if len(metadata) > 0:
+            cursor.executemany(
+                "INSERT INTO jobs_metadata(job_id, metadata_key, metadata_value) "
+                "VALUES (:job_id, :key, :value)",
+                metadata)
+
+    return the_job
+
+
+def initialise_job(# pylint: disable=[too-many-arguments, too-many-positional-arguments]
+        conn: DbConnection,
+        job_id: uuid.UUID,
+        command: list,
+        job_type: str,
+        extra_meta: Optional[dict] = None,
+        expiry_seconds: int = _DEFAULT_EXPIRY_SECONDS_
+) -> dict:
+    """Initialise the job and put the details in a SQLite3 database."""
+    if extra_meta is None:
+        extra_meta = {}
+
+    _job = {
+        "job_id": job_id,
+        "command": shlex.join(command),
+        "created": datetime.now(timezone.utc),
+        "metadata": {
+            "status": "pending",
+            "percent": 0,
+            "job-type": job_type,
+            **extra_meta
+        }
+    }
+    return __save_job__(conn, _job, expiry_seconds)
+
+
+def output_file(jobid: uuid.UUID, outdir: Path, stream: str) -> Path:
+    """Compute the path for the file where the launcher's `stream` output goes"""
+    assert stream in ("stdout", "stderr"), f"Invalid stream '{stream}'"
+    return outdir.joinpath(f"launcher_job_{jobid}.{stream}")
+
+
+stdout_filename = partial(output_file, stream="stdout")
+stderr_filename = partial(output_file, stream="stderr")
+
+
+def build_environment(extras: Optional[dict[str, str]] = None) -> dict[str, str]:
+    """Setup the runtime environment variables for the background script."""
+    if extras is None:
+        extras = {}
+
+    return {
+        **dict(os.environ),
+        "PYTHONPATH": ":".join(sys.path),
+        **extras
+    }
+
+
+def launch_job(
+        the_job: dict,
+        sqlite3_url: str,
+        error_dir: Path,
+        worker_manager: str = "gn_libs.jobs.launcher",
+        loglevel: str = "info"
+) -> dict:
+    """Launch a job in the background"""
+    if not os.path.exists(error_dir):
+        os.mkdir(error_dir)
+
+    job_id = str(the_job["job_id"])
+    with (open(stderr_filename(jobid=the_job["job_id"], outdir=error_dir),
+               "w",
+               encoding="utf-8") as stderrfile,
+          open(stdout_filename(jobid=the_job["job_id"], outdir=error_dir),
+               "w",
+               encoding="utf-8") as stdoutfile):
+        subprocess.Popen( # pylint: disable=[consider-using-with]
+            [
+                sys.executable, "-u",
+                "-m", worker_manager,
+                sqlite3_url,
+                job_id,
+                str(error_dir),
+                "--log-level",
+                loglevel
+            ],
+            stdout=stdoutfile,
+            stderr=stderrfile,
+            env=build_environment())
+
+    return the_job
+
+
+def update_metadata(conn: DbConnection, job_id: Union[str, uuid.UUID], key: str, value: str):
+    """Update the value of a metadata item."""
+    with _cursor(conn) as cursor:
+        cursor.execute(
+            "INSERT INTO jobs_metadata(job_id, metadata_key, metadata_value) "
+            "VALUES (:job_id, :key, :value) "
+            "ON CONFLICT (job_id, metadata_key) DO UPDATE "
+            "SET metadata_value=:value "
+            "WHERE job_id=:job_id AND metadata_key=:key",
+            {
+                "job_id": str(job_id),
+                "key": key,
+                "value": value
+            })
+
+
+def push_to_stream(
+        conn: DbConnection,
+        job_id: Union[str, uuid.UUID],
+        stream_name: str, content: str
+):
+    """Initialise, and keep adding content to the stream from the provided content."""
+    with _cursor(conn) as cursor:
+        cursor.execute("SELECT * FROM jobs_standard_outputs "
+                       "WHERE job_id=:job_id AND output_stream=:stream",
+                       {
+                           "job_id": str(job_id),
+                           "stream": stream_name
+                       })
+        result = cursor.fetchone()
+        new_content = ((bool(result) and result["value"]) or "") + content
+        cursor.execute(
+            "INSERT INTO jobs_standard_outputs(job_id, output_stream, value) "
+            "VALUES(:job_id, :stream, :content) "
+            "ON CONFLICT (job_id, output_stream) DO UPDATE "
+            "SET value=:content "
+            "WHERE job_id=:job_id AND output_stream=:stream",
+            {
+                "job_id": str(job_id),
+                "stream": stream_name,
+                "content": new_content
+            })
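
A rough usage sketch for the module above: initialise_job() records the job and launch_job() spawns the launcher module in the background. The database URI, error directory and command are placeholders, and the database is assumed to have been migrated already (see migrations.py further down):

import uuid
from pathlib import Path

from gn_libs import jobs, sqlite3

DB_URI = "/tmp/gn_jobs.db"        # placeholder; must point at a migrated jobs database
ERROR_DIR = Path("/tmp/gn_jobs")  # placeholder; the launcher's own stdout/stderr files land here

with sqlite3.connection(DB_URI) as conn:
    the_job = jobs.initialise_job(
        conn,
        uuid.uuid4(),
        ["echo", "hello world"],  # stored shlex-joined in the jobs.command column
        job_type="example",
        extra_meta={"requested-by": "demo-user"})
    jobs.launch_job(the_job, DB_URI, ERROR_DIR)

Under the hood launch_job() runs `python -u -m gn_libs.jobs.launcher <db-uri> <job-id> <error-dir> --log-level info`, propagating PYTHONPATH so the subprocess can import gn_libs.
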
diff --git a/gn_libs/jobs/launcher.py b/gn_libs/jobs/launcher.py
new file mode 100644
index 0000000..d565f9e
--- /dev/null
+++ b/gn_libs/jobs/launcher.py
@@ -0,0 +1,108 @@
+"""Default launcher/manager script for background jobs."""
+import os
+import sys
+import time
+import shlex
+import logging
+import argparse
+import traceback
+import subprocess
+from uuid import UUID
+from pathlib import Path
+
+from gn_libs import jobs, sqlite3
+
+logger = logging.getLogger(__name__)
+
+
+def run_job(conn, job, outputs_directory: Path):
+    """Run the job."""
+    logger.info("Setting up the job.")
+    job_id = job["job_id"]
+    stdout_file = outputs_directory.joinpath(f"{job_id}.stdout")
+    stderr_file = outputs_directory.joinpath(f"{job_id}.stderr")
+    jobs.update_metadata(conn, job_id, "stdout-file", str(stdout_file))
+    jobs.update_metadata(conn, job_id, "stderr-file", str(stderr_file))
+    try:
+        logger.info("Launching the job in a separate process.")
+        with (stdout_file.open(mode="w") as outfile,
+              stderr_file.open(mode="w") as errfile,
+              stdout_file.open(mode="r") as stdout_in,
+              stderr_file.open(mode="r") as stderr_in,
+              subprocess.Popen(
+                  shlex.split(job["command"]),
+                  encoding="utf-8",
+                  stdout=outfile,
+                  stderr=errfile) as process):
+            while process.poll() is None:
+                jobs.update_metadata(conn, job_id, "status", "running")
+                jobs.push_to_stream(conn, job_id, "stdout", stdout_in.read())
+                jobs.push_to_stream(conn, job_id, "stderr", stderr_in.read())
+                time.sleep(1)
+
+            # Fetch any remaining content.
+            jobs.push_to_stream(conn, job_id, "stdout", stdout_in.read())
+            jobs.push_to_stream(conn, job_id, "stderr", stderr_in.read())
+        logger.info("Job completed. Cleaning up.")
+
+        os.remove(stdout_file)
+        os.remove(stderr_file)
+        exit_status = process.poll()
+        if exit_status == 0:
+            jobs.update_metadata(conn, job_id, "status", "completed")
+        else:
+            jobs.update_metadata(conn, job_id, "status", "error")
+
+        logger.info("exiting job manager/launcher")
+        return exit_status
+    except Exception as _exc:# pylint: disable=[broad-exception-caught]
+        logger.error("An exception was raised when attempting to run the job",
+                     exc_info=True)
+        jobs.update_metadata(conn, job_id, "status", "error")
+        jobs.push_to_stream(conn, job_id, "stderr", traceback.format_exc())
+        return 4
+
+
+def parse_args():
+    """Define and parse CLI args."""
+    parser = argparse.ArgumentParser(
+        prog="GN Jobs Launcher",
+        description = (
+            "Generic launcher and manager of jobs defined with gn-libs"))
+    parser.add_argument(
+        "jobs_db_uri",
+        help="The URI to the SQLite3 database holding the jobs' details")
+    parser.add_argument(
+        "job_id", help="The id of the job being processed", type=UUID)
+    parser.add_argument("outputs_directory",
+                        help="Directory where output files will be created",
+                        type=Path)
+    parser.add_argument(
+        "--log-level",
+        type=str,
+        help="Determines what is logged out.",
+        choices=("debug", "info", "warning", "error", "critical"),
+        default="info")
+    return parser.parse_args()
+
+
+def main():
+    """Entry-point to this program."""
+    args = parse_args()
+    logger.setLevel(args.log_level.upper())
+    args.outputs_directory.mkdir(parents=True, exist_ok=True)
+    with sqlite3.connection(args.jobs_db_uri) as conn:
+        job = jobs.job(conn, args.job_id)
+        if job:
+            return run_job(conn, job, args.outputs_directory)
+
+        jobs.update_metadata(conn, args.job_id, "status", "error")
+        jobs.push_to_stream(conn, args.job_id, "stderr", "Job not found!")
+        return 2
+
+    return 3
+
+
+
+if __name__ == "__main__":
+    sys.exit(main())
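
Because the launcher above writes status and stream content back into the same database, callers can poll a job with jobs.job(). A small sketch with placeholder path and job id; note that jobs.job() raises JobNotFound for an unknown id rather than returning an empty dict:

import uuid

from gn_libs import jobs, sqlite3

JOB_ID = uuid.UUID("00000000-0000-0000-0000-000000000000")  # placeholder: id of a previously launched job

with sqlite3.connection("/tmp/gn_jobs.db") as conn:  # placeholder path
    details = jobs.job(conn, JOB_ID, fulldetails=True)
    print(details["metadata"]["status"])  # "pending", "running", "completed" or "error"
    print(details["stdout"])              # accumulated standard output, if any
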
diff --git a/gn_libs/jobs/migrations.py b/gn_libs/jobs/migrations.py
new file mode 100644
index 0000000..0c9825b
--- /dev/null
+++ b/gn_libs/jobs/migrations.py
@@ -0,0 +1,69 @@
+"""Database migrations for the jobs to ensure consistency downstream."""
+from gn_libs.protocols import DbCursor
+from gn_libs.sqlite3 import connection, cursor as acquire_cursor
+
+def __create_table_jobs__(cursor: DbCursor):
+    """Create the jobs table"""
+    cursor.execute(
+        """
+        CREATE TABLE IF NOT EXISTS jobs(
+            job_id TEXT PRIMARY KEY NOT NULL,
+            created TEXT NOT NULL,
+            expires TEXT,
+            command TEXT NOT NULL
+        ) WITHOUT ROWID
+        """)
+
+
+def __create_table_jobs_metadata__(cursor: DbCursor):
+    cursor.execute(
+        """
+        CREATE TABLE IF NOT EXISTS jobs_metadata(
+            job_id TEXT,
+            metadata_key TEXT NOT NULL,
+            metadata_value TEXT NOT NULL,
+            FOREIGN KEY(job_id) REFERENCES jobs(job_id)
+              ON UPDATE CASCADE ON DELETE RESTRICT,
+            PRIMARY KEY(job_id, metadata_key)
+        ) WITHOUT ROWID
+        """)
+    cursor.execute(
+        """
+        CREATE INDEX IF NOT EXISTS idx_tbl_jobs_metadata_cols_job_id
+        ON jobs_metadata(job_id)
+        """)
+
+
+def __create_table_jobs_output_streams__(cursor: DbCursor):
+    cursor.execute(
+        """
+        CREATE TABLE IF NOT EXISTS jobs_standard_outputs(
+            job_id TEXT NOT NULL,
+            output_stream TEXT,
+            value TEXT,
+            FOREIGN KEY(job_id) REFERENCES jobs(job_id)
+              ON UPDATE CASCADE ON DELETE RESTRICT,
+            CHECK (output_stream IN ('stdout', 'stderr')),
+            PRIMARY KEY(job_id, output_stream)
+        ) WITHOUT ROWID
+        """)
+    cursor.execute(
+        """
+        CREATE INDEX IF NOT EXISTS idx_tbl_jobs_standard_outputs_cols_job_id
+        ON jobs_standard_outputs(job_id)
+        """)
+    cursor.execute(
+        """
+        CREATE INDEX IF NOT EXISTS
+          idx_tbl_jobs_standard_outputs_cols_job_id_output_stream
+        ON jobs_standard_outputs(job_id, output_stream)
+        """)
+
+
+def run_migrations(sqlite_url: str):
+    """Run the migrations to setup the background jobs database."""
+    with (connection(sqlite_url) as conn,
+          acquire_cursor(conn) as curr):
+        __create_table_jobs__(curr)
+        __create_table_jobs_metadata__(curr)
+        __create_table_jobs_output_streams__(curr)
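
Every statement in migrations.py is CREATE ... IF NOT EXISTS, so run_migrations() can be called repeatedly without harm. A quick sanity-check sketch (placeholder path; the name-based row access assumes the gn_libs.sqlite3 wrapper configures a dict-like row factory, as jobs.py relies on):

from gn_libs import sqlite3
from gn_libs.jobs.migrations import run_migrations

DB_URI = "/tmp/gn_jobs.db"  # placeholder path
run_migrations(DB_URI)
run_migrations(DB_URI)      # idempotent: the second call is a no-op

with sqlite3.connection(DB_URI) as conn, sqlite3.cursor(conn) as cur:
    cur.execute("SELECT name FROM sqlite_master "
                "WHERE type='table' AND name LIKE 'jobs%' ORDER BY name")
    print([row["name"] for row in cur.fetchall()])
    # expected: ['jobs', 'jobs_metadata', 'jobs_standard_outputs']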