diff options
Diffstat (limited to 'gn_libs'):
 gn_libs/jobs/__init__.py   |   6 +
 gn_libs/jobs/jobs.py       | 136 ++++
 gn_libs/jobs/migrations.py |  68 ++
 3 files changed, 210 insertions(+), 0 deletions(-)
# diff --git a/gn_libs/jobs/__init__.py (new file, mode 100644, index a1a5dd6)
"""Expose the public API of the background-jobs package."""
from .migrations import run_migrations
from .jobs import job, launch_job, initialise_job


def init_app(flask_app):
    """Initialise the migrations for flask"""
    run_migrations(flask_app.config["ASYNCHRONOUS_JOBS_SQLITE_DB"])


# diff --git a/gn_libs/jobs/jobs.py (new file, mode 100644, index 1adbc33)
"""Handle asynchronous/background jobs. Job data is stored in SQLite database(s)."""
import os
import sys
import uuid
import shlex
import logging
import subprocess
from pathlib import Path
# NOTE(review): `reduce` appears unused in the visible code — confirm before
# removing the import.
from functools import reduce
from typing import Union, Optional
from datetime import datetime, timezone, timedelta

from gn_libs.sqlite3 import DbCursor, DbConnection, cursor as _cursor

_logger_ = logging.getLogger(__name__)
# Default lifetime of a job record: 2 days, in seconds.
_DEFAULT_EXPIRY_SECONDS_ = 2 * 24 * 60 * 60


class JobNotFound(Exception):
    """Raised if we try to retrieve a non-existent job."""


def __job_metadata__(cursor: DbCursor, job_id: Union[str, uuid.UUID]) -> dict:
    """Fetch extra job metadata.

    Returns the key-value pairs stored in `jobs_metadata` for `job_id`
    (an empty dict when the job has no metadata).
    """
    cursor.execute("SELECT * FROM jobs_metadata WHERE job_id=?", (str(job_id),))
    return {
        row["metadata_key"]: row["metadata_value"]
        for row in cursor.fetchall()
    }


def job(conn: DbConnection, job_id: Union[str, uuid.UUID], fulldetails: bool = False) -> dict:
    """Fetch the job details for a job with a particular ID.

    :param conn: Open connection to the jobs SQLite database.
    :param job_id: Identifier of the job to fetch.
    :param fulldetails: When True, also attach the job's stderr/stdout streams.
    :returns: A dict of the job row, with a "metadata" key holding extra details.
    :raises JobNotFound: If no job with `job_id` exists.
    """
    with _cursor(conn) as cursor:
        cursor.execute("SELECT * FROM jobs WHERE job_id=?", (str(job_id),))
        row = cursor.fetchone()
        # BUGFIX: the original did `dict(cursor.fetchone())` unconditionally;
        # `fetchone()` returns None for a missing job, so dict(None) raised a
        # TypeError and the JobNotFound branch below was never reached.
        if row is None:
            raise JobNotFound(f"Could not find job with ID {job_id}")
        _job = dict(row)

        _job["metadata"] = __job_metadata__(cursor, job_id)

        if fulldetails:
            # NOTE(review): `job_stderr`/`job_stdout` are not defined in the
            # visible portion of this file — confirm they exist elsewhere in
            # the module, otherwise this path raises NameError.
            _job["stderr"] = job_stderr(conn, job_id)
            _job["stdout"] = job_stdout(conn, job_id)

        return _job
def __save_job__(conn: DbConnection, the_job: dict, expiry_seconds: int) -> dict:
    """Save the job to database.

    :param conn: Open connection to the jobs SQLite database.
    :param the_job: Job details, as built by `initialise_job`.
    :param expiry_seconds: Lifetime of the record; a value <= 0 (or None)
        stores a NULL expiry, i.e. the job never expires.
    :returns: `the_job`, unchanged.
    """
    with _cursor(conn) as cursor:
        job_id = str(the_job["job_id"])
        expires = ((the_job["created"] + timedelta(seconds=expiry_seconds))
                   if expiry_seconds > 0 else None)
        cursor.execute("INSERT INTO jobs(job_id, created, expires, command) "
                       "VALUES(:job_id, :created, :expires, :command)",
                       {
                           "job_id": job_id,
                           "created": the_job["created"].isoformat(),
                           "expires": (expires and expires.isoformat()),
                           "command": the_job["command"]
                       })
        metadata = tuple({"job_id": job_id, "key": key, "value": value}
                         for key, value in the_job["metadata"].items())
        if len(metadata) > 0:
            cursor.executemany(
                "INSERT INTO jobs_metadata(job_id, metadata_key, metadata_value) "
                "VALUES (:job_id, :key, :value)",
                metadata)

    return the_job


def initialise_job(
        conn: DbConnection,
        job_id: uuid.UUID,
        command: list,
        job_type: str,
        extra_meta: Optional[dict] = None,
        expiry_seconds: Optional[int] = _DEFAULT_EXPIRY_SECONDS_
) -> dict:
    """Initialise the job and put the details in a SQLite3 database.

    :param conn: Open connection to the jobs SQLite database.
    :param job_id: Unique identifier for the new job.
    :param command: The command to run, as an argv-style list; stored
        shell-quoted via `shlex.join`.
    :param job_type: Free-form type tag, stored under the "job-type" metadata key.
    :param extra_meta: Extra metadata key-value pairs to store with the job.
    :param expiry_seconds: How long before the job record expires.
    :returns: The job details dict that was saved.
    """
    # BUGFIX: `extra_meta` previously defaulted to a mutable `{}`, which is
    # shared across calls; default to None and substitute per call instead.
    _job = {
        "job_id": job_id,
        "command": shlex.join(command),
        "created": datetime.now(timezone.utc),
        "metadata": {
            "status": "pending",
            "percent": 0,
            "job-type": job_type,
            **(extra_meta or {})
        }
    }
    return __save_job__(conn, _job, expiry_seconds)


def error_filename(jobid, error_dir):
    """Compute the path of the file where errors will be dumped."""
    return f"{error_dir}/job_{jobid}.error"


def launch_job(
        the_job: dict,
        sqlite3_url: str,
        error_dir: Path,
        worker_manager: str = "scripts.worker"
) -> dict:
    """Launch a job in the background.

    Spawns `python -m <worker_manager> <sqlite3_url> <job_id>` as a detached
    subprocess, with its stderr redirected to a file under `error_dir`.

    :param the_job: Job details, as returned by `initialise_job`.
    :param sqlite3_url: URL/path of the jobs SQLite database, passed to the worker.
    :param error_dir: Directory in which to create the per-job error file.
    :param worker_manager: Dotted module name of the worker entry point.
    :returns: `the_job`, unchanged.
    """
    # BUGFIX: os.mkdir fails when intermediate directories are missing;
    # makedirs with exist_ok also removes the existence-check race.
    os.makedirs(error_dir, exist_ok=True)

    job_id = str(the_job["job_id"])
    with open(error_filename(job_id, error_dir),
              "w",
              encoding="utf-8") as errorfile:
        # NOTE(review): `env` REPLACES the child's entire environment — the
        # worker gets only PYTHONPATH (no PATH, HOME, etc.). Confirm this
        # isolation is intentional; otherwise use {**os.environ, ...}.
        subprocess.Popen(  # pylint: disable=[consider-using-with]
            [sys.executable, "-m", worker_manager, sqlite3_url, job_id],
            stderr=errorfile,
            env={"PYTHONPATH": ":".join(sys.path)})

    return the_job


def update_metadata(conn: DbConnection, job_id: Union[str, uuid.UUID], key: str, value: str):
    """Update the value of a metadata item.

    Only updates an existing `jobs_metadata` row; a missing (job_id, key)
    pair is silently a no-op.
    """
    with _cursor(conn) as cursor:
        cursor.execute(
            "UPDATE jobs_metadata SET metadata_value=:value "
            "WHERE job_id=:job_id AND metadata_key=:key",
            {
                "job_id": str(job_id),
                "key": key,
                "value": value
            })


# diff --git a/gn_libs/jobs/migrations.py (new file, mode 100644, index cc463e3)
"""Database migrations for the jobs to ensure consistency downstream."""
from gn_libs.protocols import DbCursor
from gn_libs.sqlite3 import cursor, connection


def __create_table_jobs__(cursor: DbCursor):
    """Create the jobs table"""
    # NOTE(review): the parameter name shadows the module-level `cursor`
    # context manager imported above — harmless here, but worth renaming.
    cursor.execute(
        """
        CREATE TABLE IF NOT EXISTS jobs(
            job_id TEXT PRIMARY KEY NOT NULL,
            created TEXT NOT NULL,
            expires TEXT,
            command TEXT NOT NULL
        ) WITHOUT ROWID
        """)


def __create_table_jobs_metadata__(cursor: DbCursor):
    """Create the jobs_metadata table and its job_id index."""
    cursor.execute(
        """
        CREATE TABLE IF NOT EXISTS jobs_metadata(
            job_id TEXT,
            metadata_key TEXT NOT NULL,
            metadata_value TEXT NOT NULL,
            FOREIGN KEY(job_id) REFERENCES jobs(job_id)
              ON UPDATE CASCADE ON DELETE RESTRICT,
            PRIMARY KEY(job_id, metadata_key)
        ) WITHOUT ROWID
        """)
    cursor.execute(
        """
        CREATE INDEX IF NOT EXISTS idx_tbl_jobs_metadata_cols_job_id
        ON jobs_metadata(job_id)
        """)


def __create_table_jobs_output_streams__(cursor: DbCursor):
    """Create the jobs_standard_outputs table and its indexes."""
    cursor.execute(
        """
        CREATE TABLE IF NOT EXISTS jobs_standard_outputs(
            job_id TEXT PRIMARY KEY,
            output_stream TEXT,
            timestamp TEXT,
            value TEXT,
            FOREIGN KEY(job_id) REFERENCES jobs(job_id)
              ON UPDATE CASCADE ON DELETE RESTRICT,
            CHECK (output_stream IN ('stdout', 'stderr'))
        ) WITHOUT ROWID
        """)
    cursor.execute(
        """
        CREATE INDEX IF NOT EXISTS idx_tbl_jobs_standard_outputs_cols_job_id
        ON jobs_standard_outputs(job_id)
        """)
    cursor.execute(
        """
        CREATE INDEX IF NOT EXISTS
        idx_tbl_jobs_standard_outputs_cols_job_id_output_stream
        ON jobs_standard_outputs(job_id, output_stream)
        """)


def run_migrations(sqlite_url: str):
    """Create all jobs tables/indexes (idempotently) in the given database."""
    with (connection(sqlite_url) as conn,
          cursor(conn) as curr):
        __create_table_jobs__(curr)
        __create_table_jobs_metadata__(curr)
        __create_table_jobs_output_streams__(curr)