Diffstat (limited to 'gn_libs')
-rw-r--r--  gn_libs/debug.py               |  29
-rw-r--r--  gn_libs/jobs/__init__.py       |  11
-rw-r--r--  gn_libs/jobs/jobs.py           | 213
-rw-r--r--  gn_libs/jobs/launcher.py       | 108
-rw-r--r--  gn_libs/jobs/migrations.py     |  69
-rw-r--r--  gn_libs/monadic_requests.py    |  74
-rw-r--r--  gn_libs/mysqldb.py             |   5
-rw-r--r--  gn_libs/protocols/__init__.py  |   2
-rw-r--r--  gn_libs/protocols/db.py        |  35
-rw-r--r--  gn_libs/sqlite3.py             |  45
10 files changed, 586 insertions, 5 deletions
diff --git a/gn_libs/debug.py b/gn_libs/debug.py
index 6b7173b..7ad10e0 100644
--- a/gn_libs/debug.py
+++ b/gn_libs/debug.py
@@ -1,6 +1,7 @@
 """Debug utilities"""
 import logging
-from flask import current_app
+import importlib.util
+from typing import Callable
 
 
 __this_module_name__ = __name__
@@ -8,10 +9,16 @@ __this_module_name__ = __name__
 # pylint: disable=invalid-name
 def getLogger(name: str):
     """Return a logger"""
-    return (
-        logging.getLogger(name)
-        if not bool(current_app)
-        else current_app.logger)
+    flask_spec = importlib.util.find_spec("flask")
+    if bool(flask_spec):
+        current_app = importlib.import_module("flask").current_app
+        return (
+            logging.getLogger(name)
+            if not bool(current_app)
+            else current_app.logger)
+
+    return logging.getLogger(name)
+
 
 def __pk__(*args):
     """Format log entry"""
@@ -20,3 +27,15 @@ def __pk__(*args):
     logger = getLogger(__this_module_name__)
     logger.debug("%s: %s", title_vals, value)
     return value
+
+
+def make_peeker(logger: logging.Logger) -> Callable:
+    """Make a peeker function that's very much like __pk__ but that uses the
+    given logger."""
+    def peeker(*args):
+        value = args[-1]
+        title_vals = " => ".join(args[0:-1])
+        logger.debug("%s: %s", title_vals, value)
+        return value
+
+    return peeker
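
A minimal usage sketch for the helpers above (names and values are illustrative): getLogger falls back to the standard library when Flask is absent, and make_peeker builds a tap-style helper that logs its leading arguments as a title and returns its last argument unchanged.

    import logging
    from gn_libs.debug import getLogger, make_peeker

    logging.basicConfig(level=logging.DEBUG)
    logger = getLogger(__name__)  # uses current_app.logger only inside a Flask app context
    peek = make_peeker(logger)

    # peek can wrap any intermediate expression without changing behaviour:
    total = peek("summation", "intermediate total", sum(range(10)))
    # logs: "summation => intermediate total: 45"; total == 45
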
diff --git a/gn_libs/jobs/__init__.py b/gn_libs/jobs/__init__.py
new file mode 100644
index 0000000..d6e4ce3
--- /dev/null
+++ b/gn_libs/jobs/__init__.py
@@ -0,0 +1,11 @@
+"""This package deals with launching and managing background/async jobs."""
+from .migrations import run_migrations
+from .jobs import (job,
+                   launch_job,
+                   initialise_job,
+                   push_to_stream,
+                   update_metadata)
+
+def init_app(flask_app):
+    """Initialise the migrations for flask"""
+    run_migrations(flask_app.config["ASYNCHRONOUS_JOBS_SQLITE_DB"])
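
A sketch of wiring the package into a Flask application; the database path is illustrative:

    from flask import Flask
    from gn_libs import jobs

    app = Flask(__name__)
    app.config["ASYNCHRONOUS_JOBS_SQLITE_DB"] = "/var/lib/gn/jobs.sqlite3"  # illustrative path
    jobs.init_app(app)  # runs the table-creation migrations against that database
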
diff --git a/gn_libs/jobs/jobs.py b/gn_libs/jobs/jobs.py
new file mode 100644
index 0000000..ec1c3a8
--- /dev/null
+++ b/gn_libs/jobs/jobs.py
@@ -0,0 +1,213 @@
+"""Handle asynchronous/background jobs. Job data is stored in SQLite database(s)."""
+import os
+import sys
+import uuid
+import shlex
+import logging
+import subprocess
+from pathlib import Path
+from functools import partial
+from typing import Union, Optional
+from datetime import datetime, timezone, timedelta
+
+from gn_libs.sqlite3 import DbCursor, DbConnection, cursor as _cursor
+
+_logger_ = logging.getLogger(__name__)
+_DEFAULT_EXPIRY_SECONDS_ = 2 * 24 * 60 * 60  # 2 days, in seconds
+
+
+class JobNotFound(Exception):
+    """Raised if we try to retrieve a non-existent job."""
+
+
+def __job_metadata__(cursor: DbCursor, job_id: Union[str, uuid.UUID]) -> dict:
+    """Fetch extra job metadata."""
+    cursor.execute("SELECT * FROM jobs_metadata WHERE job_id=?", (str(job_id),))
+    return {
+        row["metadata_key"]: row["metadata_value"]
+        for row in cursor.fetchall()
+    }
+
+
+def job_stdstream_outputs(conn, job_id, streamname: str):
+    """Fetch the captured output of the given standard stream (stdout or
+    stderr) for the job."""
+    with _cursor(conn) as cursor:
+        cursor.execute(
+            "SELECT * FROM jobs_standard_outputs "
+            "WHERE job_id=? AND output_stream=?",
+            (str(job_id), streamname))
+        return dict(cursor.fetchone() or {}).get("value")
+
+
+job_stderr = partial(job_stdstream_outputs, streamname="stderr")
+job_stdout = partial(job_stdstream_outputs, streamname="stdout")
+
+
+def job(conn: DbConnection, job_id: Union[str, uuid.UUID], fulldetails: bool = False) -> dict:
+    """Fetch the job details for a job with a particular ID"""
+    with _cursor(conn) as cursor:
+        cursor.execute("SELECT * FROM jobs WHERE job_id=?", (str(job_id),))
+        _job = dict(cursor.fetchone() or {})
+        if not bool(_job):
+            raise JobNotFound(f"Could not find job with ID {job_id}")
+
+        _job["metadata"] = __job_metadata__(cursor, job_id)
+
+        if fulldetails:
+            _job["stderr"] = job_stderr(conn, job_id)
+            _job["stdout"] = job_stdout(conn, job_id)
+
+        return _job
+
+
+def __save_job__(conn: DbConnection, the_job: dict, expiry_seconds: int) -> dict:
+    """Save the job to database."""
+
+    with _cursor(conn) as cursor:
+        job_id = str(the_job["job_id"])
+        expires = ((the_job["created"] + timedelta(seconds=expiry_seconds))
+                   if expiry_seconds > 0 else None)
+        cursor.execute("INSERT INTO jobs(job_id, created, expires, command) "
+                       "VALUES(:job_id, :created, :expires, :command)",
+                       {
+                           "job_id": job_id,
+                           "created": the_job["created"].isoformat(),
+                           "expires": (expires and expires.isoformat()),
+                           "command": the_job["command"]
+                       })
+        metadata = tuple({"job_id": job_id, "key": key, "value": value}
+                         for key, value in the_job["metadata"].items())
+        if len(metadata) > 0:
+            cursor.executemany(
+                "INSERT INTO jobs_metadata(job_id, metadata_key, metadata_value) "
+                "VALUES (:job_id, :key, :value)",
+                metadata)
+
+    return the_job
+
+
+def initialise_job(# pylint: disable=[too-many-arguments, too-many-positional-arguments]
+        conn: DbConnection,
+        job_id: uuid.UUID,
+        command: list,
+        job_type: str,
+        extra_meta: Optional[dict] = None,
+        expiry_seconds: int = _DEFAULT_EXPIRY_SECONDS_
+) -> dict:
+    """Initialise the job and put the details in a SQLite3 database."""
+    if extra_meta is None:
+        extra_meta = {}
+
+    _job = {
+        "job_id": job_id,
+        "command": shlex.join(command),
+        "created": datetime.now(timezone.utc),
+        "metadata": {
+            "status": "pending",
+            "percent": 0,
+            "job-type": job_type,
+            **extra_meta
+        }
+    }
+    return __save_job__(conn, _job, expiry_seconds)
+
+
+def output_file(jobid: uuid.UUID, outdir: Path, stream: str) -> Path:
+    """Compute the path for the file where the launcher's `stream` output goes"""
+    assert stream in ("stdout", "stderr"), f"Invalid stream '{stream}'"
+    return outdir.joinpath(f"launcher_job_{jobid}.{stream}")
+
+
+stdout_filename = partial(output_file, stream="stdout")
+stderr_filename = partial(output_file, stream="stderr")
+
+
+def build_environment(extras: Optional[dict[str, str]] = None) -> dict[str, str]:
+    """Set up the runtime environment variables for the background script."""
+    if extras is None:
+        extras = {}
+
+    return {
+        **dict(os.environ),
+        "PYTHONPATH": ":".join(sys.path),
+        **extras
+    }
+
+
+def launch_job(
+        the_job: dict,
+        sqlite3_url: str,
+        error_dir: Path,
+        worker_manager: str = "gn_libs.jobs.launcher",
+        loglevel: str = "info"
+) -> dict:
+    """Launch a job in the background"""
+    if not os.path.exists(error_dir):
+        os.mkdir(error_dir)
+
+    job_id = str(the_job["job_id"])
+    with (open(stderr_filename(jobid=the_job["job_id"], outdir=error_dir),
+               "w",
+               encoding="utf-8") as stderrfile,
+          open(stdout_filename(jobid=the_job["job_id"], outdir=error_dir),
+               "w",
+               encoding="utf-8") as stdoutfile):
+        subprocess.Popen(  # pylint: disable=[consider-using-with]
+            [
+                sys.executable, "-u",
+                "-m", worker_manager,
+                sqlite3_url,
+                job_id,
+                str(error_dir),
+                "--log-level",
+                loglevel
+            ],
+            stdout=stdoutfile,
+            stderr=stderrfile,
+            env=build_environment())
+
+    return the_job
+
+
+def update_metadata(conn: DbConnection, job_id: Union[str, uuid.UUID], key: str, value: str):
+    """Update the value of a metadata item."""
+    with _cursor(conn) as cursor:
+        cursor.execute(
+            "INSERT INTO jobs_metadata(job_id, metadata_key, metadata_value) "
+            "VALUES (:job_id, :key, :value) "
+            "ON CONFLICT (job_id, metadata_key) DO UPDATE "
+            "SET metadata_value=:value "
+            "WHERE job_id=:job_id AND metadata_key=:key",
+            {
+                "job_id": str(job_id),
+                "key": key,
+                "value": value
+            })
+
+
+def push_to_stream(
+        conn: DbConnection,
+        job_id: Union[str, uuid.UUID],
+        stream_name: str, content: str
+):
+    """Initialise the stream if necessary, then append the provided content."""
+    with _cursor(conn) as cursor:
+        cursor.execute("SELECT * FROM jobs_standard_outputs "
+                       "WHERE job_id=:job_id AND output_stream=:stream",
+                       {
+                           "job_id": str(job_id),
+                           "stream": stream_name
+                       })
+        result = cursor.fetchone()
+        new_content = ((bool(result) and result["value"]) or "") + content
+        cursor.execute(
+            "INSERT INTO jobs_standard_outputs(job_id, output_stream, value) "
+            "VALUES(:job_id, :stream, :content) "
+            "ON CONFLICT (job_id, output_stream) DO UPDATE "
+            "SET value=:content "
+            "WHERE job_id=:job_id AND output_stream=:stream",
+            {
+                "job_id": str(job_id),
+                "stream": stream_name,
+                "content": new_content
+            })
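
A sketch of the full job lifecycle these functions expose: run the migrations, record a job, then hand it to the background launcher. The database path, command, and metadata are illustrative:

    import uuid
    from pathlib import Path
    from gn_libs import jobs, sqlite3

    JOBS_DB = "/var/lib/gn/jobs.sqlite3"  # illustrative path
    jobs.run_migrations(JOBS_DB)

    with sqlite3.connection(JOBS_DB) as conn:
        new_job = jobs.initialise_job(
            conn,
            uuid.uuid4(),
            ["python3", "-c", "print('hello, world')"],  # stored via shlex.join
            job_type="example",
            extra_meta={"requested-by": "demo-user"})

    # launch_job spawns `python -u -m gn_libs.jobs.launcher ...` in the background.
    jobs.launch_job(new_job, JOBS_DB, Path("/tmp/gn-job-errors"))
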
diff --git a/gn_libs/jobs/launcher.py b/gn_libs/jobs/launcher.py
new file mode 100644
index 0000000..d565f9e
--- /dev/null
+++ b/gn_libs/jobs/launcher.py
@@ -0,0 +1,108 @@
+"""Default launcher/manager script for background jobs."""
+import os
+import sys
+import time
+import shlex
+import logging
+import argparse
+import traceback
+import subprocess
+from uuid import UUID
+from pathlib import Path
+
+from gn_libs import jobs, sqlite3
+from gn_libs.jobs.jobs import JobNotFound
+
+logger = logging.getLogger(__name__)
+
+
+def run_job(conn, job, outputs_directory: Path):
+    """Run the job."""
+    logger.info("Setting up the job.")
+    job_id = job["job_id"]
+    stdout_file = outputs_directory.joinpath(f"{job_id}.stdout")
+    stderr_file = outputs_directory.joinpath(f"{job_id}.stderr")
+    jobs.update_metadata(conn, job_id, "stdout-file", str(stdout_file))
+    jobs.update_metadata(conn, job_id, "stderr-file", str(stderr_file))
+    try:
+        logger.info("Launching the job in a separate process.")
+        with (stdout_file.open(mode="w") as outfile,
+              stderr_file.open(mode="w") as errfile,
+              stdout_file.open(mode="r") as stdout_in,
+              stderr_file.open(mode="r") as stderr_in,
+              subprocess.Popen(
+                  shlex.split(job["command"]),
+                  encoding="utf-8",
+                  stdout=outfile,
+                  stderr=errfile) as process):
+            while process.poll() is None:
+                jobs.update_metadata(conn, job_id, "status", "running")
+                jobs.push_to_stream(conn, job_id, "stdout", stdout_in.read())
+                jobs.push_to_stream(conn, job_id, "stderr", stderr_in.read())
+                time.sleep(1)
+
+            # Fetch any remaining content.
+            jobs.push_to_stream(conn, job_id, "stdout", stdout_in.read())
+            jobs.push_to_stream(conn, job_id, "stderr", stderr_in.read())
+            logger.info("Job completed. Cleaning up.")
+
+            os.remove(stdout_file)
+            os.remove(stderr_file)
+            exit_status = process.poll()
+            if exit_status == 0:
+                jobs.update_metadata(conn, job_id, "status", "completed")
+            else:
+                jobs.update_metadata(conn, job_id, "status", "error")
+
+            logger.info("exiting job manager/launcher")
+            return exit_status
+    except Exception as _exc:  # pylint: disable=[broad-exception-caught]
+        logger.error("An exception was raised when attempting to run the job",
+                     exc_info=True)
+        jobs.update_metadata(conn, job_id, "status", "error")
+        jobs.push_to_stream(conn, job_id, "stderr", traceback.format_exc())
+        return 4
+
+
+def parse_args():
+    """Define and parse CLI args."""
+    parser = argparse.ArgumentParser(
+        prog="GN Jobs Launcher",
+        description=(
+            "Generic launcher and manager of jobs defined with gn-libs"))
+    parser.add_argument(
+        "jobs_db_uri",
+        help="The URI to the SQLite3 database holding the jobs' details")
+    parser.add_argument(
+        "job_id", help="The id of the job being processed", type=UUID)
+    parser.add_argument("outputs_directory",
+                        help="Directory where output files will be created",
+                        type=Path)
+    parser.add_argument(
+        "--log-level",
+        type=str,
+        help="Determines what is logged out.",
+        choices=("debug", "info", "warning", "error", "critical"),
+        default="info")
+    return parser.parse_args()
+
+
+def main():
+    """Entry-point to this program."""
+    args = parse_args()
+    logger.setLevel(args.log_level.upper())
+    args.outputs_directory.mkdir(parents=True, exist_ok=True)
+    with sqlite3.connection(args.jobs_db_uri) as conn:
+        try:
+            job = jobs.job(conn, args.job_id)
+        except JobNotFound:
+            jobs.update_metadata(conn, args.job_id, "status", "error")
+            jobs.push_to_stream(conn, args.job_id, "stderr", "Job not found!")
+            return 2
+
+        return run_job(conn, job, args.outputs_directory)
+
+
+if __name__ == "__main__":
+    sys.exit(main())
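
The launcher is normally spawned by launch_job, but the job's recorded state can be inspected from any other process through the same database. A sketch, with an illustrative path and a placeholder ID:

    # launch_job spawns, roughly:
    #   python -u -m gn_libs.jobs.launcher JOBS_DB_URI JOB_ID OUTPUTS_DIR --log-level info
    import uuid
    from gn_libs import jobs, sqlite3

    some_job_id = uuid.UUID(int=0)  # placeholder; use the real job's UUID
    with sqlite3.connection("/var/lib/gn/jobs.sqlite3") as conn:  # illustrative path
        details = jobs.job(conn, some_job_id, fulldetails=True)  # raises JobNotFound if absent
        print(details["metadata"].get("status"))  # pending/running/completed/error
        print(details.get("stdout"))              # captured standard output, if any
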
diff --git a/gn_libs/jobs/migrations.py b/gn_libs/jobs/migrations.py
new file mode 100644
index 0000000..0c9825b
--- /dev/null
+++ b/gn_libs/jobs/migrations.py
@@ -0,0 +1,69 @@
+"""Database migrations for the jobs to ensure consistency downstream."""
+from gn_libs.protocols import DbCursor
+from gn_libs.sqlite3 import connection, cursor as acquire_cursor
+
+def __create_table_jobs__(cursor: DbCursor):
+    """Create the jobs table"""
+    cursor.execute(
+        """
+        CREATE TABLE IF NOT EXISTS jobs(
+            job_id TEXT PRIMARY KEY NOT NULL,
+            created TEXT NOT NULL,
+            expires TEXT,
+            command TEXT NOT NULL
+        ) WITHOUT ROWID
+        """)
+
+
+def __create_table_jobs_metadata__(cursor: DbCursor):
+    """Create the table holding extra key-value metadata for each job."""
+    cursor.execute(
+        """
+        CREATE TABLE IF NOT EXISTS jobs_metadata(
+            job_id TEXT,
+            metadata_key TEXT NOT NULL,
+            metadata_value TEXT NOT NULL,
+            FOREIGN KEY(job_id) REFERENCES jobs(job_id)
+              ON UPDATE CASCADE ON DELETE RESTRICT,
+            PRIMARY KEY(job_id, metadata_key)
+        ) WITHOUT ROWID
+        """)
+    cursor.execute(
+        """
+        CREATE INDEX IF NOT EXISTS idx_tbl_jobs_metadata_cols_job_id
+        ON jobs_metadata(job_id)
+        """)
+
+
+def __create_table_jobs_output_streams__(cursor: DbCursor):
+    """Create the table storing the jobs' captured stdout/stderr streams."""
+    cursor.execute(
+        """
+        CREATE TABLE IF NOT EXISTS jobs_standard_outputs(
+            job_id TEXT NOT NULL,
+            output_stream TEXT,
+            value TEXT,
+            FOREIGN KEY(job_id) REFERENCES jobs(job_id)
+              ON UPDATE CASCADE ON DELETE RESTRICT,
+            CHECK (output_stream IN ('stdout', 'stderr')),
+            PRIMARY KEY(job_id, output_stream)
+        ) WITHOUT ROWID
+        """)
+    cursor.execute(
+        """
+        CREATE INDEX IF NOT EXISTS idx_tbl_jobs_standard_outputs_cols_job_id
+        ON jobs_standard_outputs(job_id)
+        """)
+    cursor.execute(
+        """
+        CREATE INDEX IF NOT EXISTS
+        idx_tbl_jobs_standard_outputs_cols_job_id_output_stream
+        ON jobs_standard_outputs(job_id, output_stream)
+        """)
+
+
+def run_migrations(sqlite_url: str):
+    """Run the migrations to set up the background jobs database."""
+    with (connection(sqlite_url) as conn,
+          acquire_cursor(conn) as curr):
+        __create_table_jobs__(curr)
+        __create_table_jobs_metadata__(curr)
+        __create_table_jobs_output_streams__(curr)
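
Every statement above uses CREATE ... IF NOT EXISTS, so calling this at application start-up is safe to repeat; a sketch with an illustrative path:

    from gn_libs.jobs.migrations import run_migrations

    run_migrations("/var/lib/gn/jobs.sqlite3")  # idempotent: re-running is a no-op
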
diff --git a/gn_libs/monadic_requests.py b/gn_libs/monadic_requests.py
new file mode 100644
index 0000000..a09acc5
--- /dev/null
+++ b/gn_libs/monadic_requests.py
@@ -0,0 +1,74 @@
+"""Wrap requests functions with monads."""
+import logging
+
+import requests
+from requests.models import Response
+from pymonad.either import Left, Right, Either
+
+logger = logging.getLogger(__name__)
+
+# HTTP status codes indicating a successful request.
+SUCCESS_CODES = (200, 201, 202, 203, 204, 205, 206, 207, 208, 226)
+
+
+def get(url, params=None, **kwargs) -> Either:
+    """
+    A wrapper around the `requests.get` function.
+
+    Takes the same arguments as `requests.get`.
+
+    :rtype: pymonad.either.Either
+    """
+    timeout = kwargs.get("timeout")
+    kwargs = {key: val for key, val in kwargs.items() if key != "timeout"}
+    if timeout is None:
+        timeout = (9.13, 20)
+
+    try:
+        resp = requests.get(url, params=params, timeout=timeout, **kwargs)
+        if resp.status_code in SUCCESS_CODES:
+            return Right(resp.json())
+        return Left(resp)
+    except requests.exceptions.RequestException as exc:
+        return Left(exc)
+
+
+def post(url, data=None, json=None, **kwargs) -> Either:
+    """
+    A wrapper around the `requests.post` function.
+
+    Takes the same arguments as `requests.post`.
+
+    :rtype: pymonad.either.Either
+    """
+    timeout = kwargs.get("timeout")
+    kwargs = {key: val for key, val in kwargs.items() if key != "timeout"}
+    if timeout is None:
+        timeout = (9.13, 20)
+
+    try:
+        resp = requests.post(url, data=data, json=json, timeout=timeout, **kwargs)
+        if resp.status_code in SUCCESS_CODES:
+            return Right(resp.json())
+        return Left(resp)
+    except requests.exceptions.RequestException as exc:
+        return Left(exc)
+
+
+def make_either_error_handler(msg):
+    """Make a generic error handler for pymonad `Either` objects."""
+    def __fail__(error):
+        if issubclass(type(error), Exception):
+            logger.debug("\n\n%s (Exception)\n\n", msg, exc_info=True)
+            raise error
+        if issubclass(type(error), Response):
+            try:
+                _data = error.json()
+            except Exception as _exc:
+                raise Exception(error.content) from _exc  # pylint: disable=[broad-exception-raised]
+            raise Exception(_data)  # pylint: disable=[broad-exception-raised]
+
+        logger.debug("\n\n%s\n\n", msg)
+        raise Exception(error)  # pylint: disable=[broad-exception-raised]
+
+    return __fail__
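
A sketch of consuming these wrappers, using pymonad's Either.either(on_left, on_right) to unpack the result; the URL is illustrative:

    from gn_libs import monadic_requests as mrequests

    result = mrequests.get("https://example.org/api/items")  # Right(json) or Left(response/exception)
    items = result.either(
        mrequests.make_either_error_handler("fetching items failed"),  # raises on Left
        lambda payload: payload)                                       # passes Right through
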
diff --git a/gn_libs/mysqldb.py b/gn_libs/mysqldb.py
index 64a649d..fec3b30 100644
--- a/gn_libs/mysqldb.py
+++ b/gn_libs/mysqldb.py
@@ -131,6 +131,11 @@ def database_connection(sql_uri: str, logger: logging.Logger = _logger) -> Itera
     except mdb.Error as _mbde:
         logger.error("DB error encountered", exc_info=True)
         connection.rollback()
+        raise _mbde from None
+    except Exception as _exc:
+        connection.rollback()
+        logger.error("General exception encountered", exc_info=True)
+        raise _exc from None
     finally:
         connection.close()
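
With this change, any exception (not only MySQLdb errors) now rolls the transaction back before propagating. A sketch of typical use; the URI format and credentials are illustrative assumptions, not shown in this diff:

    from gn_libs.mysqldb import database_connection

    with database_connection("mysql://user:password@localhost/genenetwork") as conn:
        cursor = conn.cursor()
        cursor.execute("SELECT 1")
        print(cursor.fetchone())
    # Any exception inside the block triggers connection.rollback() before re-raising.
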
diff --git a/gn_libs/protocols/__init__.py b/gn_libs/protocols/__init__.py
new file mode 100644
index 0000000..83a31a8
--- /dev/null
+++ b/gn_libs/protocols/__init__.py
@@ -0,0 +1,2 @@
+"""This package is a collection of major Protocols/Interfaces definitions."""
+from .db import DbCursor, DbConnection
diff --git a/gn_libs/protocols/db.py b/gn_libs/protocols/db.py
new file mode 100644
index 0000000..b365f8b
--- /dev/null
+++ b/gn_libs/protocols/db.py
@@ -0,0 +1,35 @@
+"""Generic database protocols."""
+from typing import Any, Protocol
+
+
+class DbCursor(Protocol):
+    """Type annotation for a generic database cursor object."""
+    def execute(self, *args, **kwargs) -> Any:
+        """Execute a single query"""
+
+    def executemany(self, *args, **kwargs) -> Any:
+        """
+        Execute the parameterised SQL statement against all parameter
+        sequences or mappings found in the given sequence.
+        """
+
+    def fetchone(self, *args, **kwargs):
+        """Fetch a single result if present, or `None`."""
+
+    def fetchmany(self, *args, **kwargs):
+        """Fetch many results if present, or `None`."""
+
+    def fetchall(self, *args, **kwargs):
+        """Fetch all results if present, or `None`."""
+
+
+class DbConnection(Protocol):
+    """Type annotation for a generic database connection object."""
+    def cursor(self) -> Any:
+        """Return a cursor object."""
+
+    def commit(self) -> Any:
+        """Commit the transaction."""
+
+    def rollback(self) -> Any:
+        """Rollback the transaction."""
diff --git a/gn_libs/sqlite3.py b/gn_libs/sqlite3.py
new file mode 100644
index 0000000..78e1c41
--- /dev/null
+++ b/gn_libs/sqlite3.py
@@ -0,0 +1,45 @@
+"""This module deals with connections to an SQLite3 database."""
+import logging
+import traceback
+import contextlib
+from typing import Callable, Iterator
+
+import sqlite3
+
+from .protocols import DbCursor, DbConnection
+
+_logger_ = logging.getLogger(__name__)
+
+
+@contextlib.contextmanager
+def connection(db_path: str, row_factory: Callable = sqlite3.Row) -> Iterator[DbConnection]:
+    """Create a connection to the SQLite3 database at `db_path`."""
+    _logger_.debug("SQLite3 DB Path: '%s'.", db_path)
+    conn = sqlite3.connect(db_path)
+    conn.row_factory = row_factory
+    conn.set_trace_callback(_logger_.debug)
+    conn.execute("PRAGMA foreign_keys = ON")
+    try:
+        yield conn
+    except sqlite3.Error as exc:
+        conn.rollback()
+        _logger_.debug(traceback.format_exc())
+        raise exc
+    finally:
+        conn.commit()
+        conn.close()
+
+
+@contextlib.contextmanager
+def cursor(conn: DbConnection) -> Iterator[DbCursor]:
+    """Get a cursor from the given database connection."""
+    cur = conn.cursor()
+    try:
+        yield cur
+        conn.commit()
+    except sqlite3.Error as exc:
+        conn.rollback()
+        _logger_.debug(traceback.format_exc())
+        raise exc
+    finally:
+        cur.close()
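
A sketch combining the two context managers: connection enables foreign keys and always commits and closes, while cursor commits on success and rolls back on sqlite3.Error. The path is illustrative:

    from gn_libs.sqlite3 import connection, cursor

    with connection("/var/lib/gn/jobs.sqlite3") as conn:
        with cursor(conn) as cur:
            cur.execute("SELECT job_id, command FROM jobs")
            rows = [dict(row) for row in cur.fetchall()]  # sqlite3.Row rows convert to dicts
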