about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--gn_libs/jobs/__init__.py6
-rw-r--r--gn_libs/jobs/jobs.py136
-rw-r--r--gn_libs/jobs/migrations.py68
3 files changed, 210 insertions, 0 deletions
diff --git a/gn_libs/jobs/__init__.py b/gn_libs/jobs/__init__.py
new file mode 100644
index 0000000..a1a5dd6
--- /dev/null
+++ b/gn_libs/jobs/__init__.py
@@ -0,0 +1,6 @@
+from .migrations import run_migrations
+from .jobs import job, launch_job, initialise_job
+
+def init_app(flask_app):
+    """Initialise the migrations for flask"""
+    run_migrations(flask_app.config["ASYNCHRONOUS_JOBS_SQLITE_DB"])
diff --git a/gn_libs/jobs/jobs.py b/gn_libs/jobs/jobs.py
new file mode 100644
index 0000000..1adbc33
--- /dev/null
+++ b/gn_libs/jobs/jobs.py
@@ -0,0 +1,136 @@
+"""Handle asynchronous/background jobs. Job data is stored in SQLite database(s)."""
+import os
+import sys
+import uuid
+import shlex
+import logging
+import subprocess
+from pathlib import Path
+from functools import reduce
+from typing import Union, Optional
+from datetime import datetime, timezone, timedelta
+
+from gn_libs.sqlite3 import DbCursor, DbConnection, cursor as _cursor
+
+_logger_ = logging.getLogger(__name__)
+_DEFAULT_EXPIRY_SECONDS_ = 2 * 24 * 60 * 60 # 2 days, in seconds
+
+
+class JobNotFound(Exception):
+    """Raised if we try to retrieve a non-existent job."""
+
+
+def __job_metadata__(cursor: DbCursor, job_id: Union[str, uuid.UUID]) -> dict:
+    """Fetch extra job metadata."""
+    cursor.execute("SELECT * FROM jobs_metadata WHERE job_id=?", (str(job_id),))
+    return {
+        row["metadata_key"]: row["metadata_value"]
+        for row in cursor.fetchall()
+    }
+
+
+def job(conn: DbConnection, job_id: Union[str, uuid.UUID], fulldetails: bool = False) -> dict:
+    """Fetch the job details for a job with a particular ID"""
+    with _cursor(conn) as cursor:
+        cursor.execute("SELECT * FROM jobs WHERE job_id=?", (str(job_id),))
+        _job = dict(cursor.fetchone())
+        if not bool(_job):
+            raise JobNotFound(f"Could not find job with ID {job_id}")
+
+        _job["metadata"] = __job_metadata__(cursor, job_id)
+
+    if fulldetails:
+        _job["stderr"] = job_stderr(conn, job_id)
+        _job["stdout"] = job_stdout(conn, job_id)
+
+    return _job
+
+
+def __save_job__(conn: DbConnection, the_job: dict, expiry_seconds: int) -> dict:
+    """Save the job to database."""
+
+    with _cursor(conn) as cursor:
+        job_id = str(the_job["job_id"])
+        expires = ((the_job["created"] + timedelta(seconds=expiry_seconds))
+                   if expiry_seconds > 0 else None)
+        cursor.execute("INSERT INTO jobs(job_id, created, expires, command) "
+                       "VALUES(:job_id, :created, :expires, :command)",
+                       {
+                           "job_id": job_id,
+                           "created": the_job["created"].isoformat(),
+                           "expires": (expires and expires.isoformat()),
+                           "command": the_job["command"]
+                       })
+        metadata = tuple({"job_id": job_id, "key": key, "value": value}
+                         for key,value in the_job["metadata"].items())
+        if len(metadata) > 0:
+            cursor.executemany(
+                "INSERT INTO jobs_metadata(job_id, metadata_key, metadata_value) "
+                "VALUES (:job_id, :key, :value)",
+                metadata)
+
+    return the_job
+
+
+def initialise_job(
+        conn: DbConnection,
+        job_id: uuid.UUID,
+        command: list,
+        job_type: str,
+        extra_meta: dict = {},
+        expiry_seconds: Optional[int] = _DEFAULT_EXPIRY_SECONDS_
+) -> dict:
+    """Initialise the job and put the details in a SQLite3 database."""
+    
+    _job = {
+        "job_id": job_id,
+        "command": shlex.join(command),
+        "created": datetime.now(timezone.utc),
+        "metadata": {
+            "status": "pending",
+            "percent": 0,
+            "job-type": job_type,
+            **extra_meta
+        }
+    }
+    return __save_job__(conn, _job, expiry_seconds)
+
+
+def error_filename(jobid, error_dir):
+    "Compute the path of the file where errors will be dumped."
+    return f"{error_dir}/job_{jobid}.error"
+
+
+def launch_job(
+        the_job: dict,
+        sqlite3_url: str,
+        error_dir: Path,
+        worker_manager: str = "scripts.worker"
+) -> dict:
+    """Launch a job in the background"""
+    if not os.path.exists(error_dir):
+        os.mkdir(error_dir)
+
+    job_id = str(the_job["job_id"])
+    with open(error_filename(job_id, error_dir),
+              "w",
+              encoding="utf-8") as errorfile:
+        subprocess.Popen( # pylint: disable=[consider-using-with]
+            [sys.executable, "-m", worker_manager, sqlite3_url, job_id],
+            stderr=errorfile,
+            env={"PYTHONPATH": ":".join(sys.path)})
+
+    return the_job
+
+
+def update_metadata(conn: DbConnection, job_id: Union[str, uuid.UUID], key: str, value: str):
+    """Update the value of a metadata item."""
+    with _cursor(conn) as cursor:
+        cursor.execute(
+            "UPDATE jobs_metadata SET metadata_value=:value "
+            "WHERE job_id=:job_id AND metadata_key=:key",
+            {
+                "job_id": str(job_id),
+                "key": key,
+                "value": value
+            })
diff --git a/gn_libs/jobs/migrations.py b/gn_libs/jobs/migrations.py
new file mode 100644
index 0000000..cc463e3
--- /dev/null
+++ b/gn_libs/jobs/migrations.py
@@ -0,0 +1,68 @@
+"""Database migrations for the jobs to ensure consistency downstream."""
+from gn_libs.protocols import DbCursor
+from gn_libs.sqlite3 import cursor, connection
+
+def __create_table_jobs__(cursor: DbCursor):
+    """Create the jobs table"""
+    cursor.execute(
+        """
+        CREATE TABLE IF NOT EXISTS jobs(
+          job_id TEXT PRIMARY KEY NOT NULL,
+          created TEXT NOT NULL,
+          expires TEXT,
+          command TEXT NOT NULL
+        ) WITHOUT ROWID
+        """)
+
+
+def __create_table_jobs_metadata__(cursor: DbCursor):
+    cursor.execute(
+        """
+        CREATE TABLE IF NOT EXISTS jobs_metadata(
+          job_id TEXT,
+          metadata_key TEXT NOT NULL,
+          metadata_value TEXT NOT NULL,
+          FOREIGN KEY(job_id) REFERENCES jobs(job_id)
+            ON UPDATE CASCADE ON DELETE RESTRICT,
+          PRIMARY KEY(job_id, metadata_key)
+        ) WITHOUT ROWID
+        """)
+    cursor.execute(
+        """
+        CREATE INDEX IF NOT EXISTS idx_tbl_jobs_metadata_cols_job_id
+        ON jobs_metadata(job_id)
+        """)
+
+
+def __create_table_jobs_output_streams__(cursor: DbCursor):
+    cursor.execute(
+        """
+        CREATE TABLE IF NOT EXISTS jobs_standard_outputs(
+          job_id TEXT PRIMARY KEY,
+          output_stream TEXT,
+          timestamp TEXT,
+          value TEXT,
+          FOREIGN KEY(job_id) REFERENCES jobs(job_id)
+            ON UPDATE CASCADE ON DELETE RESTRICT,
+          CHECK (output_stream IN ('stdout', 'stderr'))
+        ) WITHOUT ROWID
+        """)
+    cursor.execute(
+        """
+        CREATE INDEX IF NOT EXISTS idx_tbl_jobs_standard_outputs_cols_job_id
+        ON jobs_standard_outputs(job_id)
+        """)
+    cursor.execute(
+        """
+        CREATE INDEX IF NOT EXISTS
+        idx_tbl_jobs_standard_outputs_cols_job_id_output_stream
+        ON jobs_standard_outputs(job_id, output_stream)
+        """)
+
+
+def run_migrations(sqlite_url: str):
+    with (connection(sqlite_url) as conn,
+          cursor(conn) as curr):
+        __create_table_jobs__(curr)
+        __create_table_jobs_metadata__(curr)
+        __create_table_jobs_output_streams__(curr)