diff options
| -rw-r--r-- | .guix-channel | 47 | ||||
| -rw-r--r-- | .guix/modules/gn-libs.scm | 2 | ||||
| -rw-r--r-- | gn_libs/http_logging.py | 56 | ||||
| -rw-r--r-- | gn_libs/jobs/__init__.py | 7 | ||||
| -rw-r--r-- | gn_libs/jobs/jobs.py | 90 | ||||
| -rw-r--r-- | gn_libs/jobs/launcher.py | 12 | ||||
| -rw-r--r-- | gn_libs/jobs/migrations.py | 24 | ||||
| -rw-r--r-- | gn_libs/logging.py | 41 | ||||
| -rw-r--r-- | gn_libs/mysqldb.py | 4 |
9 files changed, 268 insertions, 15 deletions
diff --git a/.guix-channel b/.guix-channel index 2f6b8a2..2c37401 100644 --- a/.guix-channel +++ b/.guix-channel @@ -3,18 +3,55 @@ (directory ".guix/modules") (dependencies (channel + (name gn-machines) + (url "https://git.genenetwork.org/gn-machines") + (branch "main")) + ;; Until https://issues.guix.gnu.org/68797 is resolved, we need to + ;; explicitly list guix-bioinformatics, guix-forge, guix-past and + ;; guix-rust-past-crates—the dependencies of the gn-machines channel—here. + (channel + (name guix) + (url "https://codeberg.org/guix/guix") + (branch "master") + (commit "0a4740705090acc4c8a10d4f53afc58c9f62e980") + (introduction + (channel-introduction + (version 0) + (commit "9edb3f66fd807b096b48283debdcddccfea34bad") + (signer + "BBB0 2DDF 2CEA F6A8 0D1D E643 A2A0 6DF2 A33A 54FA")))) + (channel + (name guix-forge) + (url "https://git.systemreboot.net/guix-forge/") + (branch "main") + (commit "e43fd9a4d73654d3876e2c698af7da89f3408f89") + (introduction + (channel-introduction + (version 0) + (commit "0432e37b20dd678a02efee21adf0b9525a670310") + (signer + "7F73 0343 F2F0 9F3C 77BF 79D3 2E25 EE8B 6180 2BB3")))) + (channel (name guix-bioinformatics) (url "https://git.genenetwork.org/guix-bioinformatics") - (branch "master")) - ;; FIXME: guix-bioinformatics depends on guix-past. So, there - ;; should be no reason to explicitly depend on guix-past. But, the - ;; channel does not build otherwise. This is probably a guix bug. + (commit "903465c85c9b2ae28480b236c3364da873ca8f51")) (channel (name guix-past) (url "https://codeberg.org/guix-science/guix-past") + (branch "master") (introduction (channel-introduction (version 0) (commit "c3bc94ee752ec545e39c1b8a29f739405767b51c") (signer - "3CE4 6455 8A84 FDC6 9DB4 0CFB 090B 1199 3D9A EBB5")))))) + "3CE4 6455 8A84 FDC6 9DB4 0CFB 090B 1199 3D9A EBB5")))) + (channel + (name guix-rust-past-crates) + (url "https://codeberg.org/guix/guix-rust-past-crates.git") + (branch "trunk") + (introduction + (channel-introduction + (version 0) + (commit "1db24ca92c28255b28076792b93d533eabb3dc6a") + (signer + "F4C2 D1DF 3FDE EA63 D1D3 0776 ACC6 6D09 CA52 8292")))))) diff --git a/.guix/modules/gn-libs.scm b/.guix/modules/gn-libs.scm index 78b362b..8fa03db 100644 --- a/.guix/modules/gn-libs.scm +++ b/.guix/modules/gn-libs.scm @@ -1,5 +1,5 @@ (define-module (gn-libs) - #:use-module ((gn packages genenetwork) #:select (gn-libs) #:prefix gn:) + #:use-module ((gn-machines genenetwork) #:select (gn-libs) #:prefix gn:) #:use-module ((gnu packages check) #:select (python-pylint)) #:use-module ((gnu packages python-check) #:select (python-mypy)) #:use-module (guix gexp) diff --git a/gn_libs/http_logging.py b/gn_libs/http_logging.py new file mode 100644 index 0000000..c65e0a4 --- /dev/null +++ b/gn_libs/http_logging.py @@ -0,0 +1,56 @@ +"""Provide a way to emit logs to an HTTP endpoint""" +import logging +import json +import traceback +import urllib.request +from datetime import datetime + + +class SilentHTTPHandler(logging.Handler): + """A logging handler that emits logs to an HTTP endpoint silently. + + This handler converts log records to JSON and sends them via POST + to a specified HTTP endpoint. Failures are suppressed to avoid + interfering with the main application. + """ + def __init__(self, endpoint, timeout=0.1): + super().__init__() + self.endpoint = endpoint + self.timeout = timeout + + def emit(self, record): + try: + payload = { + "timestamp": datetime.utcfromtimestamp(record.created).isoformat(), + "level": record.levelname.lower(), + "logger": record.name, + "message": record.getMessage(), + } + for attr in ("remote_addr", "user_agent", "extra"): + if hasattr(record, attr): + payload.update({attr: getattr(record, attr)}) + + if record.exc_info: + payload["exception"] = "".join( + traceback.format_exception(*record.exc_info) + ) + + # fire-and-forget + self._send(payload) + + except Exception:# pylint: disable=[broad-exception-caught] + # absolute silence + pass + + def _send(self, payload): + try: + req = urllib.request.Request( + url=self.endpoint, + data=json.dumps(payload).encode("utf-8"), + headers={"Content-Type": "application/json"}, + method="POST", + ) + with urllib.request.urlopen(req, timeout=5) as resp: + resp.read() # ignore body + except Exception:# pylint: disable=[broad-exception-caught] + pass diff --git a/gn_libs/jobs/__init__.py b/gn_libs/jobs/__init__.py index d6e4ce3..7927f8d 100644 --- a/gn_libs/jobs/__init__.py +++ b/gn_libs/jobs/__init__.py @@ -1,10 +1,15 @@ """This package deals with launching and managing background/async jobs.""" from .migrations import run_migrations from .jobs import (job, + kill_job, launch_job, + delete_job, + delete_jobs, initialise_job, push_to_stream, - update_metadata) + update_metadata, + jobs_by_external_id, + delete_expired_jobs) def init_app(flask_app): """Initialise the migrations for flask""" diff --git a/gn_libs/jobs/jobs.py b/gn_libs/jobs/jobs.py index ec1c3a8..bccddd5 100644 --- a/gn_libs/jobs/jobs.py +++ b/gn_libs/jobs/jobs.py @@ -60,7 +60,34 @@ def job(conn: DbConnection, job_id: Union[str, uuid.UUID], fulldetails: bool = F return _job -def __save_job__(conn: DbConnection, the_job: dict, expiry_seconds: int) -> dict: +def jobs_by_external_id(conn: DbConnection, external_id: Union[str, uuid.UUID]) -> tuple[dict, ...]: + """Fetch jobs by their external IDs.""" + with _cursor(conn) as cursor: + cursor.execute( + "SELECT jeids.external_id, jobs.* FROM jobs_external_ids AS jeids " + "INNER JOIN jobs ON jeids.job_id=jobs.job_id " + "WHERE jeids.external_id=? " + "ORDER BY jobs.created DESC", + (str(external_id),)) + _jobs = {row["job_id"]: {**dict(row), "metadata": {}} for row in cursor.fetchall()} + _jobs_ids = tuple(_job["job_id"] for _job in _jobs.values()) + + _paramstr = ", ".join(["?"] * len(_jobs_ids)) + cursor.execute( + f"SELECT * FROM jobs_metadata WHERE job_id IN ({_paramstr})", + _jobs_ids) + for row in cursor.fetchall(): + _jobs[row["job_id"]]["metadata"][row["metadata_key"]] = row["metadata_value"] + + return tuple(_jobs.values()) + + +def __save_job__( + conn: DbConnection, + the_job: dict, + expiry_seconds: int, + external_id: str = "" +) -> dict: """Save the job to database.""" with _cursor(conn) as cursor: @@ -75,6 +102,11 @@ def __save_job__(conn: DbConnection, the_job: dict, expiry_seconds: int) -> dict "expires": (expires and expires.isoformat()), "command": the_job["command"] }) + if bool(external_id.strip()): + cursor.execute( + "INSERT INTO jobs_external_ids(job_id, external_id) " + "VALUES(:job_id, :external_id)", + {"job_id": job_id, "external_id": external_id.strip()}) metadata = tuple({"job_id": job_id, "key": key, "value": value} for key,value in the_job["metadata"].items()) if len(metadata) > 0: @@ -92,12 +124,22 @@ def initialise_job(# pylint: disable=[too-many-arguments, too-many-positional-ar command: list, job_type: str, extra_meta: Optional[dict] = None, - expiry_seconds: int = _DEFAULT_EXPIRY_SECONDS_ + expiry_seconds: int = _DEFAULT_EXPIRY_SECONDS_, + external_id: Optional[Union[str, uuid.UUID]] = None ) -> dict: """Initialise the job and put the details in a SQLite3 database.""" if extra_meta is None: extra_meta = {} + def __process_external_id__(_id: Optional[Union[str, uuid.UUID]]) -> str: + if isinstance(_id, uuid.UUID): + return str(_id) + + if _id is not None and bool(_id.strip()): + return str(_id.strip()) + return "" + + _ext_id = __process_external_id__(external_id) _job = { "job_id": job_id, "command": shlex.join(command), @@ -106,10 +148,11 @@ def initialise_job(# pylint: disable=[too-many-arguments, too-many-positional-ar "status": "pending", "percent": 0, "job-type": job_type, - **extra_meta + **extra_meta, + **({"external_id": _ext_id} if bool(_ext_id) else {}) } } - return __save_job__(conn, _job, expiry_seconds) + return __save_job__(conn, _job, expiry_seconds, _ext_id) def output_file(jobid: uuid.UUID, outdir: Path, stream: str) -> Path: @@ -211,3 +254,42 @@ def push_to_stream( "stream": stream_name, "content": new_content }) + + +def delete_jobs( + conn: DbConnection, job_ids: tuple[Union[uuid.UUID, str], ...]) -> None: + """Delete the given jobs.""" + with _cursor(conn) as cursor: + _paramstr = ", ".join(["?"] * len(job_ids)) + _params = tuple(str(job_id) for job_id in job_ids) + cursor.execute( + f"DELETE FROM jobs_standard_outputs WHERE job_id IN ({_paramstr})", + _params) + cursor.execute( + f"DELETE FROM jobs_metadata WHERE job_id IN ({_paramstr})", + _params) + cursor.execute( + f"DELETE FROM jobs_external_ids WHERE job_id IN ({_paramstr})", + _params) + cursor.execute(f"DELETE FROM jobs WHERE job_id IN ({_paramstr})", + _params) + + +def delete_job(conn: DbConnection, job_id: Union[uuid.UUID, str]) -> None: + """Delete a specific job.""" + return delete_jobs(conn, (job_id,)) + + +def delete_expired_jobs(conn: DbConnection) -> None: + """Delete all jobs that are expired.""" + with _cursor(conn) as cursor: + cursor.execute( + "SELECT job_id FROM jobs WHERE datetime(expires) <= datetime()") + return delete_jobs( + conn, tuple(row["job_id"] for row in cursor.fetchall())) + + +def kill_job(conn: DbConnection, job_id: Union[uuid.UUID, str]) -> None: + """Send a request to kill the job.""" + return update_metadata( + conn, job_id, "hangup_request", datetime.now(timezone.utc).isoformat()) diff --git a/gn_libs/jobs/launcher.py b/gn_libs/jobs/launcher.py index d565f9e..f915b81 100644 --- a/gn_libs/jobs/launcher.py +++ b/gn_libs/jobs/launcher.py @@ -3,6 +3,7 @@ import os import sys import time import shlex +import signal import logging import argparse import traceback @@ -34,8 +35,13 @@ def run_job(conn, job, outputs_directory: Path): encoding="utf-8", stdout=outfile, stderr=errfile) as process): + jobs.update_metadata(conn, job_id, "status", "running") while process.poll() is None: - jobs.update_metadata(conn, job_id, "status", "running") + _job = jobs.job(conn, job_id, True) + if bool(_job["metadata"].get("hangup_request")): + process.send_signal(signal.SIGHUP) + jobs.update_metadata(conn, job_id, "status", "stopped") + break; jobs.push_to_stream(conn, job_id, "stdout", stdout_in.read()) jobs.push_to_stream(conn, job_id, "stderr", stderr_in.read()) time.sleep(1) @@ -51,7 +57,9 @@ def run_job(conn, job, outputs_directory: Path): if exit_status == 0: jobs.update_metadata(conn, job_id, "status", "completed") else: - jobs.update_metadata(conn, job_id, "status", "error") + _job = jobs.job(conn, job_id, True) + if _job["metadata"]["status"] != "stopped": + jobs.update_metadata(conn, job_id, "status", "error") logger.info("exiting job manager/launcher") return exit_status diff --git a/gn_libs/jobs/migrations.py b/gn_libs/jobs/migrations.py index 0c9825b..2af16ae 100644 --- a/gn_libs/jobs/migrations.py +++ b/gn_libs/jobs/migrations.py @@ -60,6 +60,29 @@ def __create_table_jobs_output_streams__(cursor: DbCursor): """) +def __create_table_jobs_external_ids__(cursor: DbCursor): + """Create the jobs_external_ids table. + + The purpose of this table is to allow external systems to link background + jobs to specific users/events that triggered them. What the external IDs are + is irrelevant to the background jobs system here, and should not affect how + the system works.""" + cursor.execute( + """ + CREATE TABLE IF NOT EXISTS jobs_external_ids( + job_id TEXT PRIMARY KEY NOT NULL, + external_id TEXT NOT NULL, + FOREIGN KEY(job_id) REFERENCES jobs(job_id) + ON UPDATE CASCADE ON DELETE RESTRICT + ) WITHOUT ROWID + """) + cursor.execute( + """ + CREATE INDEX IF NOT EXISTS idx_tbl_jobs_external_ids_cols_external_id + ON jobs_external_ids(external_id) + """) + + def run_migrations(sqlite_url: str): """Run the migrations to setup the background jobs database.""" with (connection(sqlite_url) as conn, @@ -67,3 +90,4 @@ def run_migrations(sqlite_url: str): __create_table_jobs__(curr) __create_table_jobs_metadata__(curr) __create_table_jobs_output_streams__(curr) + __create_table_jobs_external_ids__(curr) diff --git a/gn_libs/logging.py b/gn_libs/logging.py new file mode 100644 index 0000000..952d30f --- /dev/null +++ b/gn_libs/logging.py @@ -0,0 +1,41 @@ +"""Generalised setup for logging for Genenetwork systems.""" +import os +import logging + +from flask import Flask + +logging.basicConfig( + encoding="utf-8", + format="%(asctime)s — %(filename)s:%(lineno)s — %(levelname)s: %(message)s") + + +def __log_gunicorn__(app: Flask) -> Flask: + """Set up logging for the WSGI environment with GUnicorn""" + logger = logging.getLogger("gunicorn.error") + app.logger.handlers = logger.handlers + app.logger.setLevel(logger.level) + return app + + +def __log_dev__(app: Flask) -> Flask: + """Set up logging for the development environment.""" + root_logger = logging.getLogger() + root_logger.setLevel( + app.config.get("LOG_LEVEL", app.config.get("LOGLEVEL", "WARNING"))) + + return app + + +def setup_logging(app: Flask) -> Flask: + """Set up logging for the application.""" + software, *_version_and_comments = os.environ.get( + "SERVER_SOFTWARE", "").split('/') + return __log_gunicorn__(app) if bool(software) else __log_dev__(app) + + +def setup_modules_logging(app_logger: logging.Logger, modules: tuple[str, ...]): + """Setup module-level loggers to the same log-level as the application.""" + loglevel = logging.getLevelName(app_logger.getEffectiveLevel()) + for module in modules: + _logger = logging.getLogger(module) + _logger.setLevel(loglevel) diff --git a/gn_libs/mysqldb.py b/gn_libs/mysqldb.py index 3f6390e..0239f7e 100644 --- a/gn_libs/mysqldb.py +++ b/gn_libs/mysqldb.py @@ -3,7 +3,7 @@ import logging import contextlib from logging import Logger from urllib.parse import urlparse -from typing import Any, Iterator, Protocol, Callable +from typing import Any, Union, Iterator, Protocol, Callable import MySQLdb as mdb from MySQLdb.cursors import Cursor @@ -45,7 +45,7 @@ def __parse_ssl_mode_options__(val: str) -> str: return _val -def __parse_ssl_options__(val: str) -> dict: +def __parse_ssl_options__(val: str) -> Union[dict, bool]: if val.strip() == "" or val.strip().lower() == "false": return False |
