about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--.guix-channel53
-rw-r--r--gn_libs/http_logging.py4
-rw-r--r--gn_libs/jobs/__init__.py7
-rw-r--r--gn_libs/jobs/jobs.py90
-rw-r--r--gn_libs/jobs/launcher.py12
-rw-r--r--gn_libs/jobs/migrations.py24
-rw-r--r--gn_libs/logging.py41
-rw-r--r--gn_libs/monadic_requests.py2
-rw-r--r--gn_libs/mysqldb.py4
-rw-r--r--gn_libs/privileges/__init__.py6
-rw-r--r--gn_libs/privileges/authspec.py (renamed from gn_libs/privileges.py)11
-rw-r--r--gn_libs/privileges/resources.py74
-rw-r--r--gn_libs/privileges/system.py18
-rw-r--r--gn_libs/sqlite3.py11
14 files changed, 344 insertions, 13 deletions
diff --git a/.guix-channel b/.guix-channel
index 90db71e..bfc31db 100644
--- a/.guix-channel
+++ b/.guix-channel
@@ -5,4 +5,55 @@
   (channel
    (name gn-machines)
    (url "https://git.genenetwork.org/gn-machines")
-   (branch "main"))))
+   (branch "main"))
+  ;; Until https://issues.guix.gnu.org/68797 is resolved, we need to
+  ;; explicitly list guix-bioinformatics, guix-forge, guix-past and
+  ;; guix-rust-past-crates—the dependencies of the gn-machines channel—here.
+  (channel
+   (name guix)
+   (url "https://codeberg.org/guix/guix")
+   (branch "master")
+   (commit "0a4740705090acc4c8a10d4f53afc58c9f62e980")
+   (introduction
+    (channel-introduction
+     (version 0)
+     (commit "9edb3f66fd807b096b48283debdcddccfea34bad")
+     (signer
+      "BBB0 2DDF 2CEA F6A8 0D1D  E643 A2A0 6DF2 A33A 54FA"))))
+  (channel
+   (name guix-forge)
+   (url "https://git.systemreboot.net/guix-forge/")
+   (branch "main")
+   (commit "e43fd9a4d73654d3876e2c698af7da89f3408f89")
+   (introduction
+    (channel-introduction
+     (version 0)
+     (commit "0432e37b20dd678a02efee21adf0b9525a670310")
+     (signer
+      "7F73 0343 F2F0 9F3C 77BF  79D3 2E25 EE8B 6180 2BB3"))))
+  (channel
+   (name guix-bioinformatics)
+   (url "https://git.genenetwork.org/guix-bioinformatics")
+   (commit "9b0955f14ec725990abb1f6af3b9f171e4943f77"))
+  (channel
+   (name guix-past)
+   (url "https://codeberg.org/guix-science/guix-past")
+   (branch "master")
+   (commit "473c942b509ab3ead35159d27dfbf2031a36cd4d")
+   (introduction
+    (channel-introduction
+     (version 0)
+     (commit "c3bc94ee752ec545e39c1b8a29f739405767b51c")
+     (signer
+      "3CE4 6455 8A84 FDC6 9DB4  0CFB 090B 1199 3D9A EBB5"))))
+  (channel
+   (name guix-rust-past-crates)
+   (url "https://codeberg.org/guix/guix-rust-past-crates.git")
+   (branch "trunk")
+   (commit "b8b7ffbd1cec9f56f93fae4da3a74163bbc9c570")
+   (introduction
+    (channel-introduction
+     (version 0)
+     (commit "1db24ca92c28255b28076792b93d533eabb3dc6a")
+     (signer
+      "F4C2 D1DF 3FDE EA63 D1D3 0776 ACC6 6D09 CA52 8292"))))))
diff --git a/gn_libs/http_logging.py b/gn_libs/http_logging.py
index 79660a8..c65e0a4 100644
--- a/gn_libs/http_logging.py
+++ b/gn_libs/http_logging.py
@@ -38,7 +38,7 @@ class SilentHTTPHandler(logging.Handler):
             # fire-and-forget
             self._send(payload)
 
-        except Exception:
+        except Exception:# pylint: disable=[broad-exception-caught]
             # absolute silence
             pass
 
@@ -52,5 +52,5 @@ class SilentHTTPHandler(logging.Handler):
             )
             with urllib.request.urlopen(req, timeout=5) as resp:
                 resp.read()  # ignore body
-        except Exception:
+        except Exception:# pylint: disable=[broad-exception-caught]
             pass
diff --git a/gn_libs/jobs/__init__.py b/gn_libs/jobs/__init__.py
index d6e4ce3..7927f8d 100644
--- a/gn_libs/jobs/__init__.py
+++ b/gn_libs/jobs/__init__.py
@@ -1,10 +1,15 @@
 """This package deals with launching and managing background/async jobs."""
 from .migrations import run_migrations
 from .jobs import (job,
+                   kill_job,
                    launch_job,
+                   delete_job,
+                   delete_jobs,
                    initialise_job,
                    push_to_stream,
-                   update_metadata)
+                   update_metadata,
+                   jobs_by_external_id,
+                   delete_expired_jobs)
 
 def init_app(flask_app):
     """Initialise the migrations for flask"""
diff --git a/gn_libs/jobs/jobs.py b/gn_libs/jobs/jobs.py
index ec1c3a8..bccddd5 100644
--- a/gn_libs/jobs/jobs.py
+++ b/gn_libs/jobs/jobs.py
@@ -60,7 +60,34 @@ def job(conn: DbConnection, job_id: Union[str, uuid.UUID], fulldetails: bool = F
     return _job
 
 
-def __save_job__(conn: DbConnection, the_job: dict, expiry_seconds: int) -> dict:
+def jobs_by_external_id(conn: DbConnection, external_id: Union[str, uuid.UUID]) -> tuple[dict, ...]:
+    """Fetch jobs by their external IDs."""
+    with _cursor(conn) as cursor:
+        cursor.execute(
+            "SELECT jeids.external_id, jobs.* FROM jobs_external_ids AS jeids "
+            "INNER JOIN jobs ON jeids.job_id=jobs.job_id "
+            "WHERE jeids.external_id=? "
+            "ORDER BY jobs.created DESC",
+            (str(external_id),))
+        _jobs = {row["job_id"]: {**dict(row), "metadata": {}} for row in cursor.fetchall()}
+        _jobs_ids = tuple(_job["job_id"] for _job in _jobs.values())
+
+        _paramstr = ", ".join(["?"] * len(_jobs_ids))
+        cursor.execute(
+            f"SELECT * FROM jobs_metadata WHERE job_id IN ({_paramstr})",
+            _jobs_ids)
+        for row in cursor.fetchall():
+            _jobs[row["job_id"]]["metadata"][row["metadata_key"]] = row["metadata_value"]
+
+        return tuple(_jobs.values())
+
+
+def __save_job__(
+        conn: DbConnection,
+        the_job: dict,
+        expiry_seconds: int,
+        external_id: str = ""
+) -> dict:
     """Save the job to database."""
 
     with _cursor(conn) as cursor:
@@ -75,6 +102,11 @@ def __save_job__(conn: DbConnection, the_job: dict, expiry_seconds: int) -> dict
                            "expires": (expires and expires.isoformat()),
                            "command": the_job["command"]
                        })
+        if bool(external_id.strip()):
+            cursor.execute(
+                "INSERT INTO jobs_external_ids(job_id, external_id) "
+                "VALUES(:job_id, :external_id)",
+                {"job_id": job_id, "external_id": external_id.strip()})
         metadata = tuple({"job_id": job_id, "key": key, "value": value}
                          for key,value in the_job["metadata"].items())
         if len(metadata) > 0:
@@ -92,12 +124,22 @@ def initialise_job(# pylint: disable=[too-many-arguments, too-many-positional-ar
         command: list,
         job_type: str,
         extra_meta: Optional[dict] = None,
-        expiry_seconds: int = _DEFAULT_EXPIRY_SECONDS_
+        expiry_seconds: int = _DEFAULT_EXPIRY_SECONDS_,
+        external_id: Optional[Union[str, uuid.UUID]] = None
 ) -> dict:
     """Initialise the job and put the details in a SQLite3 database."""
     if extra_meta is None:
         extra_meta = {}
 
+    def __process_external_id__(_id: Optional[Union[str, uuid.UUID]]) -> str:
+        if isinstance(_id, uuid.UUID):
+            return str(_id)
+
+        if _id is not None and bool(_id.strip()):
+            return str(_id.strip())
+        return ""
+
+    _ext_id = __process_external_id__(external_id)
     _job = {
         "job_id": job_id,
         "command": shlex.join(command),
@@ -106,10 +148,11 @@ def initialise_job(# pylint: disable=[too-many-arguments, too-many-positional-ar
             "status": "pending",
             "percent": 0,
             "job-type": job_type,
-            **extra_meta
+            **extra_meta,
+            **({"external_id": _ext_id} if bool(_ext_id) else {})
         }
     }
-    return __save_job__(conn, _job, expiry_seconds)
+    return __save_job__(conn, _job, expiry_seconds, _ext_id)
 
 
 def output_file(jobid: uuid.UUID, outdir: Path, stream: str) -> Path:
@@ -211,3 +254,42 @@ def push_to_stream(
                 "stream": stream_name,
                 "content": new_content
             })
+
+
+def delete_jobs(
+        conn: DbConnection, job_ids: tuple[Union[uuid.UUID, str], ...]) -> None:
+    """Delete the given jobs."""
+    with _cursor(conn) as cursor:
+        _paramstr = ", ".join(["?"] * len(job_ids))
+        _params = tuple(str(job_id) for job_id in job_ids)
+        cursor.execute(
+            f"DELETE FROM jobs_standard_outputs WHERE job_id IN ({_paramstr})",
+            _params)
+        cursor.execute(
+            f"DELETE FROM jobs_metadata WHERE job_id IN ({_paramstr})",
+            _params)
+        cursor.execute(
+            f"DELETE FROM jobs_external_ids WHERE job_id IN ({_paramstr})",
+            _params)
+        cursor.execute(f"DELETE FROM jobs WHERE job_id IN ({_paramstr})",
+                       _params)
+
+
+def delete_job(conn: DbConnection, job_id: Union[uuid.UUID, str]) -> None:
+    """Delete a specific job."""
+    return delete_jobs(conn, (job_id,))
+
+
+def delete_expired_jobs(conn: DbConnection) -> None:
+    """Delete all jobs that are expired."""
+    with _cursor(conn) as cursor:
+        cursor.execute(
+            "SELECT job_id FROM jobs WHERE datetime(expires) <= datetime()")
+        return delete_jobs(
+            conn, tuple(row["job_id"] for row in cursor.fetchall()))
+
+
+def kill_job(conn: DbConnection, job_id: Union[uuid.UUID, str]) -> None:
+    """Send a request to kill the job."""
+    return update_metadata(
+        conn, job_id, "hangup_request", datetime.now(timezone.utc).isoformat())
diff --git a/gn_libs/jobs/launcher.py b/gn_libs/jobs/launcher.py
index d565f9e..fd171b8 100644
--- a/gn_libs/jobs/launcher.py
+++ b/gn_libs/jobs/launcher.py
@@ -3,6 +3,7 @@ import os
 import sys
 import time
 import shlex
+import signal
 import logging
 import argparse
 import traceback
@@ -34,8 +35,13 @@ def run_job(conn, job, outputs_directory: Path):
                   encoding="utf-8",
                   stdout=outfile,
                   stderr=errfile) as process):
+            jobs.update_metadata(conn, job_id, "status", "running")
             while process.poll() is None:
-                jobs.update_metadata(conn, job_id, "status", "running")
+                _job = jobs.job(conn, job_id, True)
+                if bool(_job["metadata"].get("hangup_request")):
+                    process.send_signal(signal.SIGHUP)
+                    jobs.update_metadata(conn, job_id, "status", "stopped")
+                    break
                 jobs.push_to_stream(conn, job_id, "stdout", stdout_in.read())
                 jobs.push_to_stream(conn, job_id, "stderr", stderr_in.read())
                 time.sleep(1)
@@ -51,7 +57,9 @@ def run_job(conn, job, outputs_directory: Path):
         if exit_status == 0:
             jobs.update_metadata(conn, job_id, "status", "completed")
         else:
-            jobs.update_metadata(conn, job_id, "status", "error")
+            _job = jobs.job(conn, job_id, True)
+            if _job["metadata"]["status"] != "stopped":
+                jobs.update_metadata(conn, job_id, "status", "error")
 
         logger.info("exiting job manager/launcher")
         return exit_status
diff --git a/gn_libs/jobs/migrations.py b/gn_libs/jobs/migrations.py
index 0c9825b..2af16ae 100644
--- a/gn_libs/jobs/migrations.py
+++ b/gn_libs/jobs/migrations.py
@@ -60,6 +60,29 @@ def __create_table_jobs_output_streams__(cursor: DbCursor):
         """)
 
 
+def __create_table_jobs_external_ids__(cursor: DbCursor):
+    """Create the jobs_external_ids table.
+
+    The purpose of this table is to allow external systems to link background
+    jobs to specific users/events that triggered them. What the external IDs are
+    is irrelevant to the background jobs system here, and should not affect how
+    the system works."""
+    cursor.execute(
+        """
+        CREATE TABLE IF NOT EXISTS jobs_external_ids(
+          job_id TEXT PRIMARY KEY NOT NULL,
+          external_id TEXT NOT NULL,
+          FOREIGN KEY(job_id) REFERENCES jobs(job_id)
+            ON UPDATE CASCADE ON DELETE RESTRICT
+        ) WITHOUT ROWID
+        """)
+    cursor.execute(
+        """
+        CREATE INDEX IF NOT EXISTS idx_tbl_jobs_external_ids_cols_external_id
+        ON jobs_external_ids(external_id)
+        """)
+
+
 def run_migrations(sqlite_url: str):
     """Run the migrations to setup the background jobs database."""
     with (connection(sqlite_url) as conn,
@@ -67,3 +90,4 @@ def run_migrations(sqlite_url: str):
         __create_table_jobs__(curr)
         __create_table_jobs_metadata__(curr)
         __create_table_jobs_output_streams__(curr)
+        __create_table_jobs_external_ids__(curr)
diff --git a/gn_libs/logging.py b/gn_libs/logging.py
new file mode 100644
index 0000000..952d30f
--- /dev/null
+++ b/gn_libs/logging.py
@@ -0,0 +1,41 @@
+"""Generalised setup for logging for Genenetwork systems."""
+import os
+import logging
+
+from flask import Flask
+
+logging.basicConfig(
+    encoding="utf-8",
+    format="%(asctime)s — %(filename)s:%(lineno)s — %(levelname)s: %(message)s")
+
+
+def __log_gunicorn__(app: Flask) -> Flask:
+    """Set up logging for the WSGI environment with GUnicorn"""
+    logger = logging.getLogger("gunicorn.error")
+    app.logger.handlers = logger.handlers
+    app.logger.setLevel(logger.level)
+    return app
+
+
+def __log_dev__(app: Flask) -> Flask:
+    """Set up logging for the development environment."""
+    root_logger = logging.getLogger()
+    root_logger.setLevel(
+        app.config.get("LOG_LEVEL", app.config.get("LOGLEVEL", "WARNING")))
+
+    return app
+
+
+def setup_logging(app: Flask) -> Flask:
+    """Set up logging for the application."""
+    software, *_version_and_comments = os.environ.get(
+        "SERVER_SOFTWARE", "").split('/')
+    return __log_gunicorn__(app) if bool(software) else __log_dev__(app)
+
+
+def setup_modules_logging(app_logger: logging.Logger, modules: tuple[str, ...]):
+    """Setup module-level loggers to the same log-level as the application."""
+    loglevel = logging.getLevelName(app_logger.getEffectiveLevel())
+    for module in modules:
+        _logger = logging.getLogger(module)
+        _logger.setLevel(loglevel)
diff --git a/gn_libs/monadic_requests.py b/gn_libs/monadic_requests.py
index a09acc5..1db1aa0 100644
--- a/gn_libs/monadic_requests.py
+++ b/gn_libs/monadic_requests.py
@@ -26,6 +26,7 @@ def get(url, params=None, **kwargs) -> Either:
 
     try:
         resp = requests.get(url, params=params, timeout=timeout, **kwargs)
+        resp.raise_for_status()
         if resp.status_code in SUCCESS_CODES:
             return Right(resp.json())
         return Left(resp)
@@ -48,6 +49,7 @@ def post(url, data=None, json=None, **kwargs) -> Either:
 
     try:
         resp = requests.post(url, data=data, json=json, timeout=timeout, **kwargs)
+        resp.raise_for_status()
         if resp.status_code in SUCCESS_CODES:
             return Right(resp.json())
         return Left(resp)
diff --git a/gn_libs/mysqldb.py b/gn_libs/mysqldb.py
index 3f6390e..0239f7e 100644
--- a/gn_libs/mysqldb.py
+++ b/gn_libs/mysqldb.py
@@ -3,7 +3,7 @@ import logging
 import contextlib
 from logging import Logger
 from urllib.parse import urlparse
-from typing import Any, Iterator, Protocol, Callable
+from typing import Any, Union, Iterator, Protocol, Callable
 
 import MySQLdb as mdb
 from MySQLdb.cursors import Cursor
@@ -45,7 +45,7 @@ def __parse_ssl_mode_options__(val: str) -> str:
     return _val
 
 
-def __parse_ssl_options__(val: str) -> dict:
+def __parse_ssl_options__(val: str) -> Union[dict, bool]:
     if val.strip() == "" or val.strip().lower() == "false":
         return False
 
diff --git a/gn_libs/privileges/__init__.py b/gn_libs/privileges/__init__.py
new file mode 100644
index 0000000..0cab6b3
--- /dev/null
+++ b/gn_libs/privileges/__init__.py
@@ -0,0 +1,6 @@
+"""This package contains code useful for checking privileges."""
+from .authspec import (check,
+                       parse,
+                       SpecificationValueError,
+                       privileges_fulfill_specs)
+from . import resources
diff --git a/gn_libs/privileges.py b/gn_libs/privileges/authspec.py
index 32c943d..2819f9d 100644
--- a/gn_libs/privileges.py
+++ b/gn_libs/privileges/authspec.py
@@ -164,3 +164,14 @@ def check(spec: str, privileges: tuple[str, ...]) -> bool:
     """Check that the sequence of `privileges` satisfies `spec`."""
     _spec = parse(spec)
     return _OPERATOR_FUNCTION_[_spec[0]](privileges, *_spec[1:])
+
+
+def privileges_fulfill_specs(
+        resource_privileges: tuple[str, ...],
+        system_privileges: tuple[str, ...],
+        resource_spec: str,
+        system_spec: str
+) -> bool:
+    """Check whether a user's privileges fulfill the given specs."""
+    return (check(resource_spec, resource_privileges) or
+            check(system_spec, system_privileges))
diff --git a/gn_libs/privileges/resources.py b/gn_libs/privileges/resources.py
new file mode 100644
index 0000000..217a57d
--- /dev/null
+++ b/gn_libs/privileges/resources.py
@@ -0,0 +1,74 @@
+"""Privilege checks for resources"""
+import logging
+from functools import partial
+
+from .authspec import check, privileges_fulfill_specs
+
+
+logger = logging.getLogger(__name__)
+
+
+can_view = partial(
+    privileges_fulfill_specs,
+    resource_spec=(
+        "(OR group:resource:view-resource system:inbredset:view-case-attribute "
+        "    system:resource:public-read)"),
+    system_spec="(OR system:system-wide:data:view)")
+
+
+can_edit = partial(
+    privileges_fulfill_specs,
+    resource_spec=(
+        "(OR "
+        "  (AND group:resource:view-resource group:resource:edit-resource) "
+        "  (AND system:inbredset:view-case-attribute "
+        "       system:inbredset:edit-case-attribute))"),
+    system_spec=(
+        "(OR "
+        "  (AND system:system-wide:data:view system:system-wide:data:edit))"))
+
+
+def can_batch_edit(queried_privileges: tuple[str, ...]) -> bool:
+    """Check whether user has batch-editing privileges."""
+    return check(
+        ("(AND system:data:batch-edit system:system-wide:data:view"
+         "  system:system-wide:data:edit)"),
+        queried_privileges)
+
+
+can_create = partial(
+    privileges_fulfill_specs,
+    resource_spec=("(OR group:resource:create-resource "
+                   "    system:inbredset:create-case-attribute)"),
+    system_spec="(OR system:system-wide:data:create)")
+
+
+can_delete = partial(
+    privileges_fulfill_specs,
+    resource_spec=(
+        "(OR "
+        "  (AND group:resource:view-resource "
+        "       group:resource:edit-resource group:resource:delete-resource) "
+        "  (AND system:inbredset:view-case-attribute "
+        "       system:inbredset:edit-case-attribute "
+        "       system:inbredset:delete-case-attribute))"),
+    system_spec=(
+        "(OR "
+        "  (AND system:system-wide:data:view system:system-wide:data:edit "
+        "       system:system-wide:data:delete))"))
+
+
+can_apply_or_reject_edit = partial(
+    privileges_fulfill_specs,
+    resource_spec=(
+        "(AND system:inbredset:view-case-attribute "
+        "     system:inbredset:edit-case-attribute "
+        "     system:inbredset:delete-case-attribute "
+        "     system:inbredset:apply-case-attribute-edit "
+        "     system:inbredset:reject-case-attribute-edit)"),
+    system_spec=(
+        "(AND system:system-wide:inbredset:view-case-attribute "
+        "     system:system-wide:inbredset:edit-case-attribute "
+        "     system:system-wide:inbredset:delete-case-attribute "
+        "     system:system-wide:inbredset:apply-case-attribute-edit "
+        "     system:system-wide:inbredset:reject-case-attribute-edit)"))
diff --git a/gn_libs/privileges/system.py b/gn_libs/privileges/system.py
new file mode 100644
index 0000000..85e62f9
--- /dev/null
+++ b/gn_libs/privileges/system.py
@@ -0,0 +1,18 @@
+"""Checks for privileges for system-level actions."""
+import logging
+from functools import partial
+
+from .authspec import check
+
+
+logger = logging.getLogger(__name__)
+
+
+def can_link_data(system_privileges: tuple[str, ...]) -> bool:
+    """Check whether user is allowed to link data to user groups."""
+    return check("(AND system:data:link-to-group)", system_privileges)
+
+
+def can_masquerade(system_privileges: tuple[str, ...]) -> bool:
+    """Check whether the user is allowed to masquerade as a different user."""
+    return check("(AND system:user:masquerade)", system_privileges)
diff --git a/gn_libs/sqlite3.py b/gn_libs/sqlite3.py
index 78e1c41..c8bef0d 100644
--- a/gn_libs/sqlite3.py
+++ b/gn_libs/sqlite3.py
@@ -2,7 +2,7 @@
 import logging
 import traceback
 import contextlib
-from typing import Callable, Iterator
+from typing import Callable, Iterator, Any
 
 import sqlite3
 
@@ -43,3 +43,12 @@ def cursor(conn: DbConnection) -> Iterator[DbCursor]:
         raise exc
     finally:
         cur.close()
+
+
+def with_db_connection(db_uri: str, func: Callable[[DbConnection], Any]) -> Any:
+    """
+    Call `func`, a function of one argument with the SQLite3 connection created
+    from the connection string `db_uri`.
+    """
+    with connection(db_uri) as conn:
+        return func(conn)