diff options
Diffstat (limited to 'gn_libs/jobs/jobs.py')
-rw-r--r-- | gn_libs/jobs/jobs.py | 213 |
1 file changed, 213 insertions, 0 deletions
"""Handle asynchronous/background jobs. Job data is stored in SQLite database(s)."""
import os
import sys
import uuid
import shlex
import logging
import subprocess
from pathlib import Path
from functools import partial
from typing import Union, Optional
from datetime import datetime, timezone, timedelta

from gn_libs.sqlite3 import DbCursor, DbConnection, cursor as _cursor

_logger_ = logging.getLogger(__name__)
_DEFAULT_EXPIRY_SECONDS_ = 2 * 24 * 60 * 60  # 2 days, in seconds


class JobNotFound(Exception):
    """Raised if we try to retrieve a non-existent job."""


def __job_metadata__(cursor: DbCursor, job_id: Union[str, uuid.UUID]) -> dict:
    """Fetch the extra metadata key/value pairs stored for job `job_id`."""
    cursor.execute("SELECT * FROM jobs_metadata WHERE job_id=?", (str(job_id),))
    return {
        row["metadata_key"]: row["metadata_value"]
        for row in cursor.fetchall()
    }


def job_stdstream_outputs(conn, job_id, streamname: str):
    """Fetch the saved standard-stream output (`streamname` is "stdout" or
    "stderr") for the job.

    Returns the stored text, or None if nothing has been saved for that
    stream yet.
    """
    with _cursor(conn) as cursor:
        cursor.execute(
            "SELECT * FROM jobs_standard_outputs "
            "WHERE job_id=? AND output_stream=?",
            (str(job_id), streamname))
        return dict(cursor.fetchone() or {}).get("value")


# Convenience accessors for the two specific streams.
job_stderr = partial(job_stdstream_outputs, streamname="stderr")
job_stdout = partial(job_stdstream_outputs, streamname="stdout")


def job(conn: DbConnection, job_id: Union[str, uuid.UUID], fulldetails: bool = False) -> dict:
    """Fetch the job details for a job with a particular ID.

    The returned dict always carries a "metadata" sub-dict; when
    `fulldetails` is True it also carries the captured "stderr" and
    "stdout" text (or None).

    Raises `JobNotFound` if no job with `job_id` exists.
    """
    with _cursor(conn) as cursor:
        cursor.execute("SELECT * FROM jobs WHERE job_id=?", (str(job_id),))
        _job = dict(cursor.fetchone() or {})
        if not _job:
            raise JobNotFound(f"Could not find job with ID {job_id}")

        _job["metadata"] = __job_metadata__(cursor, job_id)

        if fulldetails:
            _job["stderr"] = job_stderr(conn, job_id)
            _job["stdout"] = job_stdout(conn, job_id)

        return _job


def __save_job__(conn: DbConnection, the_job: dict, expiry_seconds: int) -> dict:
    """Save the job to the database.

    A non-positive `expiry_seconds` stores a NULL expiry, i.e. the job
    never expires. Returns `the_job` unchanged for chaining.
    """
    with _cursor(conn) as cursor:
        job_id = str(the_job["job_id"])
        expires = ((the_job["created"] + timedelta(seconds=expiry_seconds))
                   if expiry_seconds > 0 else None)
        cursor.execute("INSERT INTO jobs(job_id, created, expires, command) "
                       "VALUES(:job_id, :created, :expires, :command)",
                       {
                           "job_id": job_id,
                           "created": the_job["created"].isoformat(),
                           # `expires and ...` keeps a None expiry as NULL.
                           "expires": (expires and expires.isoformat()),
                           "command": the_job["command"]
                       })
        metadata = tuple({"job_id": job_id, "key": key, "value": value}
                         for key, value in the_job["metadata"].items())
        if len(metadata) > 0:
            cursor.executemany(
                "INSERT INTO jobs_metadata(job_id, metadata_key, metadata_value) "
                "VALUES (:job_id, :key, :value)",
                metadata)

    return the_job


def initialise_job(# pylint: disable=[too-many-arguments, too-many-positional-arguments]
        conn: DbConnection,
        job_id: uuid.UUID,
        command: list,
        job_type: str,
        extra_meta: Optional[dict] = None,
        expiry_seconds: int = _DEFAULT_EXPIRY_SECONDS_
) -> dict:
    """Initialise the job and put the details in a SQLite3 database.

    The job starts out with "status" = "pending" and "percent" = 0 in its
    metadata; `extra_meta` entries are merged in and may override those
    defaults. Returns the saved job dict.
    """
    if extra_meta is None:
        extra_meta = {}

    _job = {
        "job_id": job_id,
        # Store the command as a single shell-quoted string.
        "command": shlex.join(command),
        "created": datetime.now(timezone.utc),
        "metadata": {
            "status": "pending",
            "percent": 0,
            "job-type": job_type,
            **extra_meta
        }
    }
    return __save_job__(conn, _job, expiry_seconds)


def output_file(jobid: uuid.UUID, outdir: Path, stream: str) -> Path:
    """Compute the path for the file where the launcher's `stream` output goes."""
    assert stream in ("stdout", "stderr"), f"Invalid stream '{stream}'"
    return outdir.joinpath(f"launcher_job_{jobid}.{stream}")


stdout_filename = partial(output_file, stream="stdout")
stderr_filename = partial(output_file, stream="stderr")


def build_environment(extras: Optional[dict[str, str]] = None) -> dict[str, str]:
    """Setup the runtime environment variables for the background script.

    The current process environment is copied, PYTHONPATH is forced to the
    current `sys.path`, and `extras` (if any) override both.
    """
    if extras is None:
        extras = {}

    return {
        **dict(os.environ),
        "PYTHONPATH": ":".join(sys.path),
        **extras
    }


def launch_job(
        the_job: dict,
        sqlite3_url: str,
        error_dir: Path,
        worker_manager: str = "gn_libs.jobs.launcher",
        loglevel: str = "info"
) -> dict:
    """Launch a job in the background.

    Spawns `worker_manager` as a detached python process, redirecting its
    stdout/stderr into per-job files under `error_dir`. Returns `the_job`
    unchanged; the spawned process is NOT waited on.
    """
    # makedirs(..., exist_ok=True) avoids the exists()/mkdir() race and
    # also creates any missing parent directories.
    os.makedirs(error_dir, exist_ok=True)

    job_id = str(the_job["job_id"])
    with (open(stderr_filename(jobid=the_job["job_id"], outdir=error_dir),
               "w",
               encoding="utf-8") as stderrfile,
          open(stdout_filename(jobid=the_job["job_id"], outdir=error_dir),
               "w",
               encoding="utf-8") as stdoutfile):
        subprocess.Popen(  # pylint: disable=[consider-using-with]
            [
                sys.executable, "-u",
                "-m", worker_manager,
                sqlite3_url,
                job_id,
                str(error_dir),
                "--log-level",
                loglevel
            ],
            stdout=stdoutfile,
            stderr=stderrfile,
            env=build_environment())

    return the_job


def update_metadata(conn: DbConnection, job_id: Union[str, uuid.UUID], key: str, value: str):
    """Update the value of a metadata item, inserting it if absent (upsert)."""
    with _cursor(conn) as cursor:
        cursor.execute(
            "INSERT INTO jobs_metadata(job_id, metadata_key, metadata_value) "
            "VALUES (:job_id, :key, :value) "
            "ON CONFLICT (job_id, metadata_key) DO UPDATE "
            "SET metadata_value=:value "
            "WHERE job_id=:job_id AND metadata_key=:key",
            {
                "job_id": str(job_id),
                "key": key,
                "value": value
            })


def push_to_stream(
        conn: DbConnection,
        job_id: Union[str, uuid.UUID],
        stream_name: str, content: str
):
    """Initialise, and keep adding content to the stream from the provided content.

    Appends `content` to any previously stored output for
    (`job_id`, `stream_name`).

    NOTE(review): this is a read-modify-write in two statements — presumably
    only one writer per job/stream exists; confirm before relying on
    concurrent appends.
    """
    with _cursor(conn) as cursor:
        cursor.execute("SELECT * FROM jobs_standard_outputs "
                       "WHERE job_id=:job_id AND output_stream=:stream",
                       {
                           "job_id": str(job_id),
                           "stream": stream_name
                       })
        result = cursor.fetchone()
        new_content = ((bool(result) and result["value"]) or "") + content
        cursor.execute(
            "INSERT INTO jobs_standard_outputs(job_id, output_stream, value) "
            "VALUES(:job_id, :stream, :content) "
            "ON CONFLICT (job_id, output_stream) DO UPDATE "
            "SET value=:content "
            "WHERE job_id=:job_id AND output_stream=:stream",
            {
                "job_id": str(job_id),
                "stream": stream_name,
                "content": new_content
            })