aboutsummaryrefslogtreecommitdiff
path: root/gn_libs/jobs/jobs.py
diff options
context:
space:
mode:
Diffstat (limited to 'gn_libs/jobs/jobs.py')
-rw-r--r--gn_libs/jobs/jobs.py79
1 files changed, 62 insertions, 17 deletions
diff --git a/gn_libs/jobs/jobs.py b/gn_libs/jobs/jobs.py
index 38cd9c0..ec1c3a8 100644
--- a/gn_libs/jobs/jobs.py
+++ b/gn_libs/jobs/jobs.py
@@ -6,7 +6,7 @@ import shlex
import logging
import subprocess
from pathlib import Path
-from functools import reduce
+from functools import partial
from typing import Union, Optional
from datetime import datetime, timezone, timedelta
@@ -29,11 +29,25 @@ def __job_metadata__(cursor: DbCursor, job_id: Union[str, uuid.UUID]) -> dict:
}
+def job_stdstream_outputs(conn, job_id, streamname: str):
+ """Fetch the output of the named standard stream (stdout/stderr) for the job."""
+ with _cursor(conn) as cursor:
+ cursor.execute(
+ "SELECT * FROM jobs_standard_outputs "
+ "WHERE job_id=? AND output_stream=?",
+ (str(job_id), streamname))
+ return dict(cursor.fetchone() or {}).get("value")
+
+
+job_stderr = partial(job_stdstream_outputs, streamname="stderr")
+job_stdout = partial(job_stdstream_outputs, streamname="stdout")
+
+
def job(conn: DbConnection, job_id: Union[str, uuid.UUID], fulldetails: bool = False) -> dict:
"""Fetch the job details for a job with a particular ID"""
with _cursor(conn) as cursor:
cursor.execute("SELECT * FROM jobs WHERE job_id=?", (str(job_id),))
- _job = dict(cursor.fetchone())
+ _job = dict(cursor.fetchone() or {})
if not bool(_job):
raise JobNotFound(f"Could not find job with ID {job_id}")
@@ -72,16 +86,18 @@ def __save_job__(conn: DbConnection, the_job: dict, expiry_seconds: int) -> dict
return the_job
-def initialise_job(
+def initialise_job(# pylint: disable=[too-many-arguments, too-many-positional-arguments]
conn: DbConnection,
job_id: uuid.UUID,
command: list,
job_type: str,
- extra_meta: dict = {},
- expiry_seconds: Optional[int] = _DEFAULT_EXPIRY_SECONDS_
+ extra_meta: Optional[dict] = None,
+ expiry_seconds: int = _DEFAULT_EXPIRY_SECONDS_
) -> dict:
"""Initialise the job and put the details in a SQLite3 database."""
-
+ if extra_meta is None:
+ extra_meta = {}
+
_job = {
"job_id": job_id,
"command": shlex.join(command),
@@ -96,34 +112,59 @@ def initialise_job(
return __save_job__(conn, _job, expiry_seconds)
-def error_filename(jobid, error_dir):
- "Compute the path of the file where errors will be dumped."
- return f"{error_dir}/job_{jobid}.error"
+def output_file(jobid: uuid.UUID, outdir: Path, stream: str) -> Path:
+ """Compute the path for the file where the launcher's `stream` output goes"""
+ assert stream in ("stdout", "stderr"), f"Invalid stream '{stream}'"
+ return outdir.joinpath(f"launcher_job_{jobid}.{stream}")
+
+
+stdout_filename = partial(output_file, stream="stdout")
+stderr_filename = partial(output_file, stream="stderr")
+
+
+def build_environment(extras: Optional[dict[str, str]] = None) -> dict[str, str]:
+ """Set up the runtime environment variables for the background script."""
+ if extras is None:
+ extras = {}
+
+ return {
+ **dict(os.environ),
+ "PYTHONPATH": ":".join(sys.path),
+ **extras
+ }
def launch_job(
the_job: dict,
sqlite3_url: str,
error_dir: Path,
- worker_manager: str = "gn_libs.jobs.launcher"
+ worker_manager: str = "gn_libs.jobs.launcher",
+ loglevel: str = "info"
) -> dict:
"""Launch a job in the background"""
if not os.path.exists(error_dir):
os.mkdir(error_dir)
job_id = str(the_job["job_id"])
- with open(error_filename(job_id, error_dir),
- "w",
- encoding="utf-8") as errorfile:
+ with (open(stderr_filename(jobid=the_job["job_id"], outdir=error_dir),
+ "w",
+ encoding="utf-8") as stderrfile,
+ open(stdout_filename(jobid=the_job["job_id"], outdir=error_dir),
+ "w",
+ encoding="utf-8") as stdoutfile):
subprocess.Popen( # pylint: disable=[consider-using-with]
[
sys.executable, "-u",
"-m", worker_manager,
sqlite3_url,
job_id,
- str(error_dir)],
- stderr=errorfile,
- env={"PYTHONPATH": ":".join(sys.path)})
+ str(error_dir),
+ "--log-level",
+ loglevel
+ ],
+ stdout=stdoutfile,
+ stderr=stderrfile,
+ env=build_environment())
return the_job
@@ -144,7 +185,11 @@ def update_metadata(conn: DbConnection, job_id: Union[str, uuid.UUID], key: str,
})
-def push_to_stream(conn: DbConnection, job_id: Union[str, uuid.UUID], stream_name: str, content: str):
+def push_to_stream(
+ conn: DbConnection,
+ job_id: Union[str, uuid.UUID],
+ stream_name: str, content: str
+):
"""Initialise, and keep adding content to the stream from the provided content."""
with _cursor(conn) as cursor:
cursor.execute("SELECT * FROM jobs_standard_outputs "