aboutsummaryrefslogtreecommitdiff
path: root/gn_libs/jobs/jobs.py
diff options
context:
space:
mode:
Diffstat (limited to 'gn_libs/jobs/jobs.py')
-rw-r--r--gn_libs/jobs/jobs.py213
1 files changed, 213 insertions, 0 deletions
diff --git a/gn_libs/jobs/jobs.py b/gn_libs/jobs/jobs.py
new file mode 100644
index 0000000..ec1c3a8
--- /dev/null
+++ b/gn_libs/jobs/jobs.py
@@ -0,0 +1,213 @@
+"""Handle asynchronous/background jobs. Job data is stored in SQLite database(s)."""
+import os
+import sys
+import uuid
+import shlex
+import logging
+import subprocess
+from pathlib import Path
+from functools import partial
+from typing import Union, Optional
+from datetime import datetime, timezone, timedelta
+
+from gn_libs.sqlite3 import DbCursor, DbConnection, cursor as _cursor
+
+_logger_ = logging.getLogger(__name__)
+_DEFAULT_EXPIRY_SECONDS_ = 2 * 24 * 60 * 60 # 2 days, in seconds
+
+
class JobNotFound(Exception):
    """Signal that a job with the requested ID does not exist in the database."""
+
+
def __job_metadata__(cursor: DbCursor, job_id: Union[str, uuid.UUID]) -> dict:
    """Retrieve all key/value metadata rows attached to the job `job_id`."""
    cursor.execute("SELECT * FROM jobs_metadata WHERE job_id=?", (str(job_id),))
    metadata = {}
    for row in cursor.fetchall():
        metadata[row["metadata_key"]] = row["metadata_value"]
    return metadata
+
+
def job_stdstream_outputs(conn, job_id, streamname: str):
    """Fetch the stored standard-stream output for the job.

    The original docstring claimed this fetched only standard error, but the
    function is generic over the stream name — `job_stderr`/`job_stdout`
    specialise it via `functools.partial`.

    :param conn: Open database connection.
    :param job_id: Identifier of the job whose output to fetch.
    :param streamname: Name of the stream, e.g. "stdout" or "stderr".
    :returns: The stored stream content, or None if no row exists.
    """
    with _cursor(conn) as cursor:
        cursor.execute(
            "SELECT * FROM jobs_standard_outputs "
            "WHERE job_id=? AND output_stream=?",
            (str(job_id), streamname))
        return dict(cursor.fetchone() or {}).get("value")
+
+
# Convenience accessors with the stream name pre-bound; both take (conn, job_id).
job_stderr = partial(job_stdstream_outputs, streamname="stderr")
job_stdout = partial(job_stdstream_outputs, streamname="stdout")
+
+
def job(conn: DbConnection, job_id: Union[str, uuid.UUID], fulldetails: bool = False) -> dict:
    """Fetch the job details for a job with a particular ID.

    Raises JobNotFound if no row exists for `job_id`. When `fulldetails` is
    True, the saved stdout/stderr streams are included in the result.
    """
    with _cursor(conn) as cursor:
        cursor.execute("SELECT * FROM jobs WHERE job_id=?", (str(job_id),))
        record = dict(cursor.fetchone() or {})
        if not record:
            raise JobNotFound(f"Could not find job with ID {job_id}")
        record["metadata"] = __job_metadata__(cursor, job_id)

    if fulldetails:
        record["stderr"] = job_stderr(conn, job_id)
        record["stdout"] = job_stdout(conn, job_id)

    return record
+
+
def __save_job__(conn: DbConnection, the_job: dict, expiry_seconds: int) -> dict:
    """Persist the job row and its metadata rows; return `the_job` unchanged."""
    with _cursor(conn) as cursor:
        job_id = str(the_job["job_id"])
        created = the_job["created"]
        # A non-positive expiry_seconds means the job never expires.
        expires = None
        if expiry_seconds > 0:
            expires = created + timedelta(seconds=expiry_seconds)
        cursor.execute("INSERT INTO jobs(job_id, created, expires, command) "
                       "VALUES(:job_id, :created, :expires, :command)",
                       {
                           "job_id": job_id,
                           "created": created.isoformat(),
                           "expires": expires.isoformat() if expires is not None else None,
                           "command": the_job["command"]
                       })
        metadata_rows = [{"job_id": job_id, "key": key, "value": value}
                         for key, value in the_job["metadata"].items()]
        if metadata_rows:
            cursor.executemany(
                "INSERT INTO jobs_metadata(job_id, metadata_key, metadata_value) "
                "VALUES (:job_id, :key, :value)",
                metadata_rows)

    return the_job
+
+
def initialise_job(# pylint: disable=[too-many-arguments, too-many-positional-arguments]
        conn: DbConnection,
        job_id: uuid.UUID,
        command: list,
        job_type: str,
        extra_meta: Optional[dict] = None,
        expiry_seconds: int = _DEFAULT_EXPIRY_SECONDS_
) -> dict:
    """Initialise the job and put the details in a SQLite3 database.

    The job starts with "pending" status and 0 percent progress; any keys in
    `extra_meta` are merged on top and may override these defaults.
    """
    metadata = {
        "status": "pending",
        "percent": 0,
        "job-type": job_type
    }
    metadata.update(extra_meta or {})

    return __save_job__(
        conn,
        {
            "job_id": job_id,
            "command": shlex.join(command),
            "created": datetime.now(timezone.utc),
            "metadata": metadata
        },
        expiry_seconds)
+
+
def output_file(jobid: uuid.UUID, outdir: Path, stream: str) -> Path:
    """Compute the path for the file where the launcher's `stream` output goes.

    :param jobid: Unique identifier of the job.
    :param outdir: Directory that holds the launcher's output files.
    :param stream: One of "stdout" or "stderr".
    :returns: Full path of the output file for the given stream.
    :raises ValueError: If `stream` is not a recognised standard stream.
    """
    # `assert` is stripped when Python runs with -O, silently disabling the
    # check — validate explicitly instead.
    if stream not in ("stdout", "stderr"):
        raise ValueError(f"Invalid stream '{stream}'")
    return outdir.joinpath(f"launcher_job_{jobid}.{stream}")
+
+
# Stream-specific path helpers; both take (jobid, outdir) keyword arguments.
stdout_filename = partial(output_file, stream="stdout")
stderr_filename = partial(output_file, stream="stderr")
+
+
def build_environment(extras: Optional[dict[str, str]] = None) -> dict[str, str]:
    """Setup the runtime environment variables for the background script.

    Starts from the current process environment, forces PYTHONPATH to mirror
    this process's `sys.path`, then overlays any `extras` (which win on
    conflicting keys).
    """
    environment = dict(os.environ)
    environment["PYTHONPATH"] = ":".join(sys.path)
    environment.update(extras or {})
    return environment
+
+
def launch_job(
        the_job: dict,
        sqlite3_url: str,
        error_dir: Path,
        worker_manager: str = "gn_libs.jobs.launcher",
        loglevel: str = "info"
) -> dict:
    """Launch a job in the background.

    Spawns `python -m <worker_manager>` detached from this process, with its
    stdout/stderr redirected to per-job files under `error_dir`.

    :param the_job: Job dict as produced by `initialise_job`.
    :param sqlite3_url: URL/path of the SQLite database holding job state.
    :param error_dir: Directory for the launcher's output files.
    :param worker_manager: Dotted module path of the launcher to run.
    :param loglevel: Log level passed to the launcher.
    :returns: `the_job`, unchanged.
    """
    # The original exists()/mkdir() pair was racy (TOCTOU) and failed when
    # parent directories were missing; mkdir with parents/exist_ok fixes both.
    Path(error_dir).mkdir(parents=True, exist_ok=True)

    job_id = str(the_job["job_id"])
    with (open(stderr_filename(jobid=the_job["job_id"], outdir=error_dir),
               "w",
               encoding="utf-8") as stderrfile,
          open(stdout_filename(jobid=the_job["job_id"], outdir=error_dir),
               "w",
               encoding="utf-8") as stdoutfile):
        # The child process must outlive this call, hence Popen without wait.
        subprocess.Popen(  # pylint: disable=[consider-using-with]
            [
                sys.executable, "-u",
                "-m", worker_manager,
                sqlite3_url,
                job_id,
                str(error_dir),
                "--log-level",
                loglevel
            ],
            stdout=stdoutfile,
            stderr=stderrfile,
            env=build_environment())

    return the_job
+
+
def update_metadata(conn: DbConnection, job_id: Union[str, uuid.UUID], key: str, value: str):
    """Insert, or overwrite on conflict, one metadata entry for the given job."""
    parameters = {
        "job_id": str(job_id),
        "key": key,
        "value": value
    }
    with _cursor(conn) as cursor:
        cursor.execute(
            "INSERT INTO jobs_metadata(job_id, metadata_key, metadata_value) "
            "VALUES (:job_id, :key, :value) "
            "ON CONFLICT (job_id, metadata_key) DO UPDATE "
            "SET metadata_value=:value "
            "WHERE job_id=:job_id AND metadata_key=:key",
            parameters)
+
+
def push_to_stream(
        conn: DbConnection,
        job_id: Union[str, uuid.UUID],
        stream_name: str, content: str
):
    """Append `content` to the stored output of the job's named stream.

    Reads any previously stored value for (job_id, stream_name), concatenates
    `content` onto it, and upserts the combined text back into the table.
    """
    with _cursor(conn) as cursor:
        cursor.execute("SELECT * FROM jobs_standard_outputs "
                       "WHERE job_id=:job_id AND output_stream=:stream",
                       {
                           "job_id": str(job_id),
                           "stream": stream_name
                       })
        existing = cursor.fetchone()
        # Treat a missing row, or a row with a falsy value, as empty output.
        previous = existing["value"] if existing and existing["value"] else ""
        cursor.execute(
            "INSERT INTO jobs_standard_outputs(job_id, output_stream, value) "
            "VALUES(:job_id, :stream, :content) "
            "ON CONFLICT (job_id, output_stream) DO UPDATE "
            "SET value=:content "
            "WHERE job_id=:job_id AND output_stream=:stream",
            {
                "job_id": str(job_id),
                "stream": stream_name,
                "content": previous + content
            })