"""Handle asynchronous/background jobs. Job data is stored in SQLite database(s)."""
import os
import sys
import uuid
import shlex
import logging
import subprocess
from pathlib import Path
from functools import partial
from typing import Union, Optional
from datetime import datetime, timezone, timedelta

from gn_libs.sqlite3 import DbCursor, DbConnection, cursor as _cursor

_logger_ = logging.getLogger(__name__)
_DEFAULT_EXPIRY_SECONDS_ = 2 * 24 * 60 * 60  # 2 days, in seconds


class JobNotFound(Exception):
    """Raised if we try to retrieve a non-existent job."""


def __job_metadata__(cursor: DbCursor, job_id: Union[str, uuid.UUID]) -> dict:
    """Fetch extra job metadata as a `{metadata_key: metadata_value}` dict."""
    cursor.execute("SELECT * FROM jobs_metadata WHERE job_id=?", (str(job_id),))
    return {
        row["metadata_key"]: row["metadata_value"]
        for row in cursor.fetchall()
    }


def job_stdstream_outputs(
        conn: DbConnection,
        job_id: Union[str, uuid.UUID],
        streamname: str
) -> Optional[str]:
    """Fetch the output accumulated for one of the job's standard streams.

    `streamname` is matched against the `output_stream` column (this module
    writes "stdout" and "stderr" there). Returns `None` when nothing has been
    recorded for that stream.
    """
    with _cursor(conn) as cursor:
        cursor.execute(
            "SELECT * FROM jobs_standard_outputs "
            "WHERE job_id=? AND output_stream=?",
            (str(job_id), streamname))
        return dict(cursor.fetchone() or {}).get("value")


job_stderr = partial(job_stdstream_outputs, streamname="stderr")
job_stdout = partial(job_stdstream_outputs, streamname="stdout")


def job(conn: DbConnection,
        job_id: Union[str, uuid.UUID],
        fulldetails: bool = False) -> dict:
    """Fetch the job details for a job with a particular ID.

    The returned dict holds the `jobs` row plus a "metadata" dict. When
    `fulldetails` is true, the captured "stderr" and "stdout" contents are
    included as well.

    Raises `JobNotFound` if no job has the given ID.
    """
    with _cursor(conn) as cursor:
        cursor.execute("SELECT * FROM jobs WHERE job_id=?", (str(job_id),))
        _job = dict(cursor.fetchone() or {})
        if not bool(_job):
            raise JobNotFound(f"Could not find job with ID {job_id}")

        _job["metadata"] = __job_metadata__(cursor, job_id)

    if fulldetails:
        _job["stderr"] = job_stderr(conn, job_id)
        _job["stdout"] = job_stdout(conn, job_id)

    return _job


def jobs_by_external_id(
        conn: DbConnection,
        external_id: Union[str, uuid.UUID]
) -> tuple[dict, ...]:
    """Fetch jobs by their external IDs, newest first.

    Each job dict holds the `jobs` row, the matching `external_id` and a
    "metadata" dict. Returns an empty tuple if no job has that external ID.
    """
    with _cursor(conn) as cursor:
        cursor.execute(
            "SELECT jeids.external_id, jobs.* FROM jobs_external_ids AS jeids "
            "INNER JOIN jobs ON jeids.job_id=jobs.job_id "
            "WHERE jeids.external_id=? "
            "ORDER BY jobs.created DESC",
            (str(external_id),))
        # dicts keep insertion order, so the DESC ordering survives.
        _jobs = {row["job_id"]: {**dict(row), "metadata": {}}
                 for row in cursor.fetchall()}
        if not _jobs:
            # Nothing matched: skip the metadata query with an empty IN-list.
            return tuple()

        _jobs_ids = tuple(_jobs.keys())
        _paramstr = ", ".join(["?"] * len(_jobs_ids))
        cursor.execute(
            f"SELECT * FROM jobs_metadata WHERE job_id IN ({_paramstr})",
            _jobs_ids)
        for row in cursor.fetchall():
            _jobs[row["job_id"]]["metadata"][row["metadata_key"]] = row["metadata_value"]

        return tuple(_jobs.values())


def __save_job__(
        conn: DbConnection,
        the_job: dict,
        expiry_seconds: int,
        external_id: str = ""
) -> dict:
    """Save the job to database.

    Inserts the `jobs` row, an optional `jobs_external_ids` row (only when
    `external_id` is non-blank) and one `jobs_metadata` row per metadata item.
    A non-positive `expiry_seconds` stores a NULL expiry (the job never expires).
    Returns `the_job` unchanged.
    """
    with _cursor(conn) as cursor:
        job_id = str(the_job["job_id"])
        expires = ((the_job["created"] + timedelta(seconds=expiry_seconds))
                   if expiry_seconds > 0 else None)
        cursor.execute("INSERT INTO jobs(job_id, created, expires, command) "
                       "VALUES(:job_id, :created, :expires, :command)",
                       {
                           "job_id": job_id,
                           "created": the_job["created"].isoformat(),
                           "expires": (expires and expires.isoformat()),
                           "command": the_job["command"]
                       })
        if bool(external_id.strip()):
            cursor.execute(
                "INSERT INTO jobs_external_ids(job_id, external_id) "
                "VALUES(:job_id, :external_id)",
                {"job_id": job_id, "external_id": external_id.strip()})
        metadata = tuple({"job_id": job_id, "key": key, "value": value}
                         for key, value in the_job["metadata"].items())
        if len(metadata) > 0:
            cursor.executemany(
                "INSERT INTO jobs_metadata(job_id, metadata_key, metadata_value) "
                "VALUES (:job_id, :key, :value)",
                metadata)

    return the_job


def initialise_job(# pylint: disable=[too-many-arguments, too-many-positional-arguments]
        conn: DbConnection,
        job_id: uuid.UUID,
        command: list,
        job_type: str,
        extra_meta: Optional[dict] = None,
        expiry_seconds: int = _DEFAULT_EXPIRY_SECONDS_,
        external_id: Optional[Union[str, uuid.UUID]] = None
) -> dict:
    """Initialise the job and put the details in a SQLite3 database.

    The job starts with metadata status="pending", percent=0 and
    job-type=`job_type`. Keys in `extra_meta` override those defaults. A
    non-blank `external_id` is stripped, recorded in the metadata and linked
    in `jobs_external_ids`. The command list is stored shell-quoted via
    `shlex.join`.
    """
    if extra_meta is None:
        extra_meta = {}

    def __process_external_id__(_id: Optional[Union[str, uuid.UUID]]) -> str:
        if isinstance(_id, uuid.UUID):
            return str(_id)

        if _id is not None and bool(_id.strip()):
            return str(_id.strip())
        return ""

    _ext_id = __process_external_id__(external_id)
    _job = {
        "job_id": job_id,
        "command": shlex.join(command),
        "created": datetime.now(timezone.utc),
        "metadata": {
            "status": "pending",
            "percent": 0,
            "job-type": job_type,
            **extra_meta,
            **({"external_id": _ext_id} if bool(_ext_id) else {})
        }
    }
    return __save_job__(conn, _job, expiry_seconds, _ext_id)


def output_file(jobid: uuid.UUID, outdir: Path, stream: str) -> Path:
    """Compute the path for the file where the launcher's `stream` output goes"""
    assert stream in ("stdout", "stderr"), f"Invalid stream '{stream}'"
    return outdir.joinpath(f"launcher_job_{jobid}.{stream}")


stdout_filename = partial(output_file, stream="stdout")
stderr_filename = partial(output_file, stream="stderr")


def build_environment(extras: Optional[dict[str, str]] = None) -> dict[str, str]:
    """Setup the runtime environment variables for the background script.

    Starts from the current process environment, sets PYTHONPATH to this
    interpreter's `sys.path`, then applies `extras` (which win on conflicts).
    """
    if extras is None:
        extras = {}

    return {
        **dict(os.environ),
        "PYTHONPATH": ":".join(sys.path),
        **extras
    }


def launch_job(
        the_job: dict,
        sqlite3_url: str,
        error_dir: Path,
        worker_manager: str = "gn_libs.jobs.launcher",
        loglevel: str = "info"
) -> dict:
    """Launch a job in the background.

    Spawns `python -u -m <worker_manager> <sqlite3_url> <job_id> <error_dir>
    --log-level <loglevel>` without waiting for it. The launcher's own stdout
    and stderr are redirected to per-job files inside `error_dir`. Returns
    `the_job` unchanged.
    """
    # makedirs(exist_ok=True) avoids the exists()/mkdir() race and also
    # creates any missing parent directories.
    os.makedirs(error_dir, exist_ok=True)

    job_id = str(the_job["job_id"])
    with (open(stderr_filename(jobid=the_job["job_id"], outdir=error_dir),
               "w",
               encoding="utf-8") as stderrfile,
          open(stdout_filename(jobid=the_job["job_id"], outdir=error_dir),
               "w",
               encoding="utf-8") as stdoutfile):
        # The child inherits its own copies of the file descriptors, so closing
        # ours when the `with` exits does not cut off the child's output.
        subprocess.Popen(  # pylint: disable=[consider-using-with]
            [
                sys.executable, "-u",
                "-m", worker_manager,
                sqlite3_url,
                job_id,
                str(error_dir),
                "--log-level",
                loglevel
            ],
            stdout=stdoutfile,
            stderr=stderrfile,
            env=build_environment())

    return the_job


def update_metadata(conn: DbConnection, job_id: Union[str, uuid.UUID], key: str, value: str):
    """Insert or update the value of a metadata item (upsert)."""
    with _cursor(conn) as cursor:
        cursor.execute(
            "INSERT INTO jobs_metadata(job_id, metadata_key, metadata_value) "
            "VALUES (:job_id, :key, :value) "
            "ON CONFLICT (job_id, metadata_key) DO UPDATE "
            "SET metadata_value=:value "
            "WHERE job_id=:job_id AND metadata_key=:key",
            {
                "job_id": str(job_id),
                "key": key,
                "value": value
            })


def push_to_stream(
        conn: DbConnection,
        job_id: Union[str, uuid.UUID],
        stream_name: str,
        content: str
):
    """Initialise, and keep adding content to the stream from the provided content.

    The first push creates the row. Later pushes append `content` to the
    existing value. The append happens inside a single upsert statement, so
    there is no read-modify-write window in which a concurrent push could be
    lost, and the accumulated output is never read back into Python.
    """
    with _cursor(conn) as cursor:
        cursor.execute(
            "INSERT INTO jobs_standard_outputs(job_id, output_stream, value) "
            "VALUES(:job_id, :stream, :content) "
            "ON CONFLICT (job_id, output_stream) DO UPDATE "
            # Unqualified `value` is the existing row; `excluded.value` is the
            # new content. COALESCE treats a NULL existing value as empty.
            "SET value=COALESCE(value, '') || excluded.value",
            {
                "job_id": str(job_id),
                "stream": stream_name,
                "content": content
            })


def delete_jobs(
        conn: DbConnection, job_ids: tuple[Union[uuid.UUID, str], ...]) -> None:
    """Delete the given jobs, along with their outputs, metadata and external IDs.

    Does nothing when `job_ids` is empty.
    """
    if len(job_ids) == 0:
        return None

    with _cursor(conn) as cursor:
        _paramstr = ", ".join(["?"] * len(job_ids))
        _params = tuple(str(job_id) for job_id in job_ids)
        # Dependent tables first, then the `jobs` rows themselves.
        cursor.execute(
            f"DELETE FROM jobs_standard_outputs WHERE job_id IN ({_paramstr})",
            _params)
        cursor.execute(
            f"DELETE FROM jobs_metadata WHERE job_id IN ({_paramstr})",
            _params)
        cursor.execute(
            f"DELETE FROM jobs_external_ids WHERE job_id IN ({_paramstr})",
            _params)
        cursor.execute(f"DELETE FROM jobs WHERE job_id IN ({_paramstr})",
                       _params)

    return None


def delete_job(conn: DbConnection, job_id: Union[uuid.UUID, str]) -> None:
    """Delete a specific job."""
    return delete_jobs(conn, (job_id,))


def delete_expired_jobs(conn: DbConnection) -> None:
    """Delete all jobs whose `expires` time is at or before now.

    Jobs with a NULL `expires` are never selected, because the comparison
    evaluates to NULL.
    """
    with _cursor(conn) as cursor:
        cursor.execute(
            "SELECT job_id FROM jobs WHERE datetime(expires) <= datetime()")
        expired = tuple(row["job_id"] for row in cursor.fetchall())

    return delete_jobs(conn, expired)


def kill_job(conn: DbConnection, job_id: Union[uuid.UUID, str]) -> None:
    """Send a request to kill the job.

    Records a "hangup_request" metadata item holding the current UTC time.
    Acting on it is left to whatever runs the job.
    """
    return update_metadata(
        conn, job_id, "hangup_request", datetime.now(timezone.utc).isoformat())