"""Handle asynchronous/background jobs. Job data is stored in SQLite database(s)."""
import os
import sys
import uuid
import shlex
import logging
import subprocess
from pathlib import Path
from functools import reduce
from functools import partial
from typing import Union, Optional
from datetime import datetime, timezone, timedelta

from gn_libs.sqlite3 import DbCursor, DbConnection, cursor as _cursor

_logger_ = logging.getLogger(__name__)
_DEFAULT_EXPIRY_SECONDS_ = 2 * 24 * 60 * 60  # 2 days, in seconds


class JobNotFound(Exception):
    """Raised if we try to retrieve a non-existent job."""


def __job_metadata__(cursor: DbCursor, job_id: Union[str, uuid.UUID]) -> dict:
    """Fetch extra job metadata for the job with `job_id`.

    Returns a (possibly empty) mapping of metadata_key -> metadata_value.
    """
    cursor.execute("SELECT * FROM jobs_metadata WHERE job_id=?", (str(job_id),))
    return {
        row["metadata_key"]: row["metadata_value"]
        for row in cursor.fetchall()
    }


def job_stdstream_outputs(conn, job_id, streamname: str):
    """Fetch the contents of standard stream `streamname` for the job.

    Returns the stored stream content, or None if no row exists for this
    (job_id, streamname) pair.
    """
    with _cursor(conn) as cursor:
        cursor.execute(
            "SELECT * FROM jobs_standard_outputs "
            "WHERE job_id=? AND output_stream=?",
            (str(job_id), streamname))
        return dict(cursor.fetchone() or {}).get("value")


job_stderr = partial(job_stdstream_outputs, streamname="stderr")
# BUG FIX: this partial was previously bound to "stderr", making
# `job_stdout` return the standard-error output instead of standard output.
job_stdout = partial(job_stdstream_outputs, streamname="stdout")


def job(conn: DbConnection, job_id: Union[str, uuid.UUID], fulldetails: bool = False) -> dict:
    """Fetch the job details for a job with a particular ID.

    :param conn: Open database connection.
    :param job_id: Identifier for the job.
    :param fulldetails: If True, also include the job's stderr/stdout content.
    :raises JobNotFound: If no job with `job_id` exists.
    """
    with _cursor(conn) as cursor:
        cursor.execute("SELECT * FROM jobs WHERE job_id=?", (str(job_id),))
        row = cursor.fetchone()
        # BUG FIX: the previous code did `dict(cursor.fetchone())` before
        # checking for a missing row, so a non-existent job raised TypeError
        # and JobNotFound was unreachable.
        if row is None:
            raise JobNotFound(f"Could not find job with ID {job_id}")
        _job = dict(row)
        _job["metadata"] = __job_metadata__(cursor, job_id)

    if fulldetails:
        _job["stderr"] = job_stderr(conn, job_id)
        _job["stdout"] = job_stdout(conn, job_id)

    return _job


def __save_job__(conn: DbConnection, the_job: dict, expiry_seconds: int) -> dict:
    """Save the job to the database.

    Inserts a row into `jobs` plus one `jobs_metadata` row per metadata item.
    An `expiry_seconds` of 0 or less means the job never expires (NULL expiry).
    Returns `the_job` unchanged.
    """
    with _cursor(conn) as cursor:
        job_id = str(the_job["job_id"])
        expires = ((the_job["created"] + timedelta(seconds=expiry_seconds))
                   if expiry_seconds > 0 else None)
        cursor.execute("INSERT INTO jobs(job_id, created, expires, command) "
                       "VALUES(:job_id, :created, :expires, :command)",
                       {
                           "job_id": job_id,
                           "created": the_job["created"].isoformat(),
                           "expires": (expires and expires.isoformat()),
                           "command": the_job["command"]
                       })
        metadata = tuple({"job_id": job_id, "key": key, "value": value}
                         for key, value in the_job["metadata"].items())
        if len(metadata) > 0:
            cursor.executemany(
                "INSERT INTO jobs_metadata(job_id, metadata_key, metadata_value) "
                "VALUES (:job_id, :key, :value)",
                metadata)

    return the_job


def initialise_job(
        conn: DbConnection,
        job_id: uuid.UUID,
        command: list,
        job_type: str,
        extra_meta: Optional[dict] = None,
        expiry_seconds: Optional[int] = _DEFAULT_EXPIRY_SECONDS_
) -> dict:
    """Initialise the job and put the details in a SQLite3 database.

    :param command: The command to run, as an argument list; it is stored
        shell-quoted via `shlex.join`.
    :param extra_meta: Optional extra metadata merged over the defaults.
    :param expiry_seconds: Lifetime of the job record; <= 0 means no expiry.
    """
    # BUG FIX: `extra_meta` previously defaulted to a mutable `{}`, which is
    # shared across calls; use the None sentinel instead.
    _job = {
        "job_id": job_id,
        "command": shlex.join(command),
        "created": datetime.now(timezone.utc),
        "metadata": {
            "status": "pending",
            "percent": 0,
            "job-type": job_type,
            **(extra_meta or {})
        }
    }
    return __save_job__(conn, _job, expiry_seconds)


def error_filename(jobid, error_dir):
    "Compute the path of the file where errors will be dumped."
    return f"{error_dir}/job_{jobid}.error"


def build_environment(extras: Optional[dict[str, str]] = None):
    """Build a child-process environment: current env, the interpreter's
    `sys.path` as PYTHONPATH, then any caller-supplied overrides."""
    # BUG FIX: `extras` previously defaulted to a mutable `{}` shared across
    # calls; use the None sentinel instead.
    return {
        **dict(os.environ),
        "PYTHONPATH": ":".join(sys.path),
        **(extras or {})
    }


def launch_job(
        the_job: dict,
        sqlite3_url: str,
        error_dir: Path,
        worker_manager: str = "gn_libs.jobs.launcher"
) -> dict:
    """Launch a job in the background.

    Spawns `python -m <worker_manager> <sqlite3_url> <job_id> <error_dir>`
    with its stderr redirected to a per-job error file. Returns `the_job`.
    """
    # makedirs(..., exist_ok=True) avoids the check-then-create race the
    # previous `os.path.exists()` + `os.mkdir()` pair had.
    os.makedirs(error_dir, exist_ok=True)
    job_id = str(the_job["job_id"])
    with open(error_filename(job_id, error_dir),
              "w",
              encoding="utf-8") as errorfile:
        subprocess.Popen(  # pylint: disable=[consider-using-with]
            [
                sys.executable, "-u",
                "-m", worker_manager,
                sqlite3_url,
                job_id,
                str(error_dir)],
            stderr=errorfile,
            env=build_environment())

    return the_job


def update_metadata(conn: DbConnection, job_id: Union[str, uuid.UUID], key: str, value: str):
    """Update the value of a metadata item.

    Uses a SQLite UPSERT so the (job_id, key) row is inserted if absent,
    otherwise its value is overwritten.
    """
    with _cursor(conn) as cursor:
        cursor.execute(
            "INSERT INTO jobs_metadata(job_id, metadata_key, metadata_value) "
            "VALUES (:job_id, :key, :value) "
            "ON CONFLICT (job_id, metadata_key) DO UPDATE "
            "SET metadata_value=:value "
            "WHERE job_id=:job_id AND metadata_key=:key",
            {
                "job_id": str(job_id),
                "key": key,
                "value": value
            })


def push_to_stream(conn: DbConnection, job_id: Union[str, uuid.UUID], stream_name: str, content: str):
    """Initialise, and keep adding content to the stream from the provided content.

    Appends `content` to whatever is already stored for the job's stream,
    creating the row on first write (UPSERT).
    """
    with _cursor(conn) as cursor:
        cursor.execute("SELECT * FROM jobs_standard_outputs "
                       "WHERE job_id=:job_id AND output_stream=:stream",
                       {
                           "job_id": str(job_id),
                           "stream": stream_name
                       })
        result = cursor.fetchone()
        # Treat a missing row, or a stored NULL/empty value, as "".
        new_content = ((bool(result) and result["value"]) or "") + content
        cursor.execute(
            "INSERT INTO jobs_standard_outputs(job_id, output_stream, value) "
            "VALUES(:job_id, :stream, :content) "
            "ON CONFLICT (job_id, output_stream) DO UPDATE "
            "SET value=:content "
            "WHERE job_id=:job_id AND output_stream=:stream",
            {
                "job_id": str(job_id),
                "stream": stream_name,
                "content": new_content
            })