Diffstat (limited to 'gn_libs/jobs/jobs.py')
 -rw-r--r--  gn_libs/jobs/jobs.py  146
 1 file changed, 125 insertions(+), 21 deletions(-)
diff --git a/gn_libs/jobs/jobs.py b/gn_libs/jobs/jobs.py
index 1f66772..bccddd5 100644
--- a/gn_libs/jobs/jobs.py
+++ b/gn_libs/jobs/jobs.py
@@ -6,7 +6,6 @@ import shlex
 import logging
 import subprocess
 from pathlib import Path
-from functools import reduce
 from functools import partial
 from typing import Union, Optional
 from datetime import datetime, timezone, timedelta
@@ -41,14 +40,14 @@ def job_stdstream_outputs(conn, job_id, streamname: str):
 
 
 job_stderr = partial(job_stdstream_outputs, streamname="stderr")
-job_stdout = partial(job_stdstream_outputs, streamname="stderr")
+job_stdout = partial(job_stdstream_outputs, streamname="stdout")
 
 
 def job(conn: DbConnection, job_id: Union[str, uuid.UUID], fulldetails: bool = False) -> dict:
     """Fetch the job details for a job with a particular ID"""
     with _cursor(conn) as cursor:
         cursor.execute("SELECT * FROM jobs WHERE job_id=?", (str(job_id),))
-        _job = dict(cursor.fetchone())
+        _job = dict(cursor.fetchone() or {})
         if not bool(_job):
             raise JobNotFound(f"Could not find job with ID {job_id}")
 
@@ -61,7 +60,34 @@ def job(conn: DbConnection, job_id: Union[str, uuid.UUID], fulldetails: bool = F
     return _job
 
 
-def __save_job__(conn: DbConnection, the_job: dict, expiry_seconds: int) -> dict:
+def jobs_by_external_id(conn: DbConnection, external_id: Union[str, uuid.UUID]) -> tuple[dict, ...]:
+    """Fetch jobs by their external IDs."""
+    with _cursor(conn) as cursor:
+        cursor.execute(
+            "SELECT jeids.external_id, jobs.* FROM jobs_external_ids AS jeids "
+            "INNER JOIN jobs ON jeids.job_id=jobs.job_id "
+            "WHERE jeids.external_id=? "
+            "ORDER BY jobs.created DESC",
+            (str(external_id),))
+        _jobs = {row["job_id"]: {**dict(row), "metadata": {}} for row in cursor.fetchall()}
+        _jobs_ids = tuple(_job["job_id"] for _job in _jobs.values())
+
+        _paramstr = ", ".join(["?"] * len(_jobs_ids))
+        cursor.execute(
+            f"SELECT * FROM jobs_metadata WHERE job_id IN ({_paramstr})",
+            _jobs_ids)
+        for row in cursor.fetchall():
+            _jobs[row["job_id"]]["metadata"][row["metadata_key"]] = row["metadata_value"]
+
+        return tuple(_jobs.values())
+
+
+def __save_job__(
+        conn: DbConnection,
+        the_job: dict,
+        expiry_seconds: int,
+        external_id: str = ""
+) -> dict:
     """Save the job to database."""
 
     with _cursor(conn) as cursor:
@@ -76,6 +102,11 @@ def __save_job__(conn: DbConnection, the_job: dict, expiry_seconds: int) -> dict
                            "expires": (expires and expires.isoformat()),
                            "command": the_job["command"]
                        })
+        if bool(external_id.strip()):
+            cursor.execute(
+                "INSERT INTO jobs_external_ids(job_id, external_id) "
+                "VALUES(:job_id, :external_id)",
+                {"job_id": job_id, "external_id": external_id.strip()})
         metadata = tuple({"job_id": job_id, "key": key, "value": value}
                          for key,value in the_job["metadata"].items())
         if len(metadata) > 0:
@@ -87,16 +118,28 @@ def __save_job__(conn: DbConnection, the_job: dict, expiry_seconds: int) -> dict
     return the_job
 
 
-def initialise_job(
+def initialise_job(# pylint: disable=[too-many-arguments, too-many-positional-arguments]
         conn: DbConnection,
         job_id: uuid.UUID,
         command: list,
         job_type: str,
-        extra_meta: dict = {},
-        expiry_seconds: Optional[int] = _DEFAULT_EXPIRY_SECONDS_
+        extra_meta: Optional[dict] = None,
+        expiry_seconds: int = _DEFAULT_EXPIRY_SECONDS_,
+        external_id: Optional[Union[str, uuid.UUID]] = None
 ) -> dict:
     """Initialise the job and put the details in a SQLite3 database."""
-    
+    if extra_meta is None:
+        extra_meta = {}
+
+    def __process_external_id__(_id: Optional[Union[str, uuid.UUID]]) -> str:
+        if isinstance(_id, uuid.UUID):
+            return str(_id)
+
+        if _id is not None and bool(_id.strip()):
+            return str(_id.strip())
+        return ""
+
+    _ext_id = __process_external_id__(external_id)
     _job = {
         "job_id": job_id,
         "command": shlex.join(command),
@@ -105,18 +148,28 @@ def initialise_job(
             "status": "pending",
             "percent": 0,
             "job-type": job_type,
-            **extra_meta
+            **extra_meta,
+            **({"external_id": _ext_id} if bool(_ext_id) else {})
         }
     }
-    return __save_job__(conn, _job, expiry_seconds)
+    return __save_job__(conn, _job, expiry_seconds, _ext_id)
+
+
+def output_file(jobid: uuid.UUID, outdir: Path, stream: str) -> Path:
+    """Compute the path for the file where the launcher's `stream` output goes"""
+    assert stream in ("stdout", "stderr"), f"Invalid stream '{stream}'"
+    return outdir.joinpath(f"launcher_job_{jobid}.{stream}")
+
 
+stdout_filename = partial(output_file, stream="stdout")
+stderr_filename = partial(output_file, stream="stderr")
 
-def error_filename(jobid, error_dir):
-    "Compute the path of the file where errors will be dumped."
-    return f"{error_dir}/job_{jobid}.error"
 
+def build_environment(extras: Optional[dict[str, str]] = None) -> dict[str, str]:
+    """Setup the runtime environment variables for the background script."""
+    if extras is None:
+        extras = {}
 
-def build_environment(extras: dict[str, str] = {}):
     return {
         **dict(os.environ),
         "PYTHONPATH": ":".join(sys.path),
@@ -128,24 +181,32 @@ def launch_job(
         the_job: dict,
         sqlite3_url: str,
         error_dir: Path,
-        worker_manager: str = "gn_libs.jobs.launcher"
+        worker_manager: str = "gn_libs.jobs.launcher",
+        loglevel: str = "info"
 ) -> dict:
     """Launch a job in the background"""
     if not os.path.exists(error_dir):
         os.mkdir(error_dir)
 
     job_id = str(the_job["job_id"])
-    with open(error_filename(job_id, error_dir),
-              "w",
-              encoding="utf-8") as errorfile:
+    with (open(stderr_filename(jobid=the_job["job_id"], outdir=error_dir),
+               "w",
+               encoding="utf-8") as stderrfile,
+          open(stdout_filename(jobid=the_job["job_id"], outdir=error_dir),
+               "w",
+               encoding="utf-8") as stdoutfile):
         subprocess.Popen( # pylint: disable=[consider-using-with]
             [
                 sys.executable, "-u",
                 "-m", worker_manager,
                 sqlite3_url,
                 job_id,
-                str(error_dir)],
-            stderr=errorfile,
+                str(error_dir),
+                "--log-level",
+                loglevel
+            ],
+            stdout=stdoutfile,
+            stderr=stderrfile,
             env=build_environment())
 
     return the_job
@@ -167,7 +228,11 @@ def update_metadata(conn: DbConnection, job_id: Union[str, uuid.UUID], key: str,
             })
 
 
-def push_to_stream(conn: DbConnection, job_id: Union[str, uuid.UUID], stream_name: str, content: str):
+def push_to_stream(
+        conn: DbConnection,
+        job_id: Union[str, uuid.UUID],
+        stream_name: str, content: str
+):
     """Initialise, and keep adding content to the stream from the provided content."""
     with _cursor(conn) as cursor:
         cursor.execute("SELECT * FROM jobs_standard_outputs "
@@ -189,3 +254,42 @@ def push_to_stream(conn: DbConnection, job_id: Union[str, uuid.UUID], stream_nam
                 "stream": stream_name,
                 "content": new_content
             })
+
+
+def delete_jobs(
+        conn: DbConnection, job_ids: tuple[Union[uuid.UUID, str], ...]) -> None:
+    """Delete the given jobs."""
+    with _cursor(conn) as cursor:
+        _paramstr = ", ".join(["?"] * len(job_ids))
+        _params = tuple(str(job_id) for job_id in job_ids)
+        cursor.execute(
+            f"DELETE FROM jobs_standard_outputs WHERE job_id IN ({_paramstr})",
+            _params)
+        cursor.execute(
+            f"DELETE FROM jobs_metadata WHERE job_id IN ({_paramstr})",
+            _params)
+        cursor.execute(
+            f"DELETE FROM jobs_external_ids WHERE job_id IN ({_paramstr})",
+            _params)
+        cursor.execute(f"DELETE FROM jobs WHERE job_id IN ({_paramstr})",
+                       _params)
+
+
+def delete_job(conn: DbConnection, job_id: Union[uuid.UUID, str]) -> None:
+    """Delete a specific job."""
+    return delete_jobs(conn, (job_id,))
+
+
+def delete_expired_jobs(conn: DbConnection) -> None:
+    """Delete all jobs that are expired."""
+    with _cursor(conn) as cursor:
+        cursor.execute(
+            "SELECT job_id FROM jobs WHERE datetime(expires) <= datetime()")
+        return delete_jobs(
+            conn, tuple(row["job_id"] for row in cursor.fetchall()))
+
+
+def kill_job(conn: DbConnection, job_id: Union[uuid.UUID, str]) -> None:
+    """Send a request to kill the job."""
+    return update_metadata(
+        conn, job_id, "hangup_request", datetime.now(timezone.utc).isoformat())
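
A minimal usage sketch of the API added above. It assumes the jobs tables (jobs, jobs_metadata, jobs_external_ids, jobs_standard_outputs) already exist and that a plain sqlite3 connection with sqlite3.Row as its row factory satisfies DbConnection; the database path, output directory, command, metadata keys and external ID are placeholders for illustration, not part of gn_libs.

# Hypothetical driver for the functions added in this diff; paths, IDs and
# metadata keys below are illustrative assumptions.
import uuid
import sqlite3
from pathlib import Path

from gn_libs.jobs import jobs

SQLITE_URL = "/tmp/gn-jobs.sqlite3"      # placeholder database path
OUTPUT_DIR = Path("/tmp/gn-jobs-output") # launcher stdout/stderr files land here

conn = sqlite3.connect(SQLITE_URL)
conn.row_factory = sqlite3.Row  # rows are read by column name, e.g. row["job_id"]

# Record the job and its metadata, optionally tagging it with an external ID.
_job = jobs.initialise_job(
    conn,
    uuid.uuid4(),
    ["echo", "hello world"],    # command list; shlex.join()-ed before saving
    "example-job",              # stored under the "job-type" metadata key
    extra_meta={"requested-by": "user@example.org"},
    external_id="external-system-ref-001")

# Launch the background worker; it writes launcher_job_<id>.stdout/.stderr
# into OUTPUT_DIR and receives the new --log-level argument.
jobs.launch_job(_job, SQLITE_URL, OUTPUT_DIR, loglevel="debug")

# Later: look jobs up again by external ID, request that they stop, and
# clear out anything past its expiry time.
for found in jobs.jobs_by_external_id(conn, "external-system-ref-001"):
    jobs.kill_job(conn, found["job_id"])
jobs.delete_expired_jobs(conn)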