diff options
Diffstat (limited to 'gn_libs/jobs/jobs.py')
| -rw-r--r-- | gn_libs/jobs/jobs.py | 146 |
1 files changed, 125 insertions, 21 deletions
diff --git a/gn_libs/jobs/jobs.py b/gn_libs/jobs/jobs.py index 1f66772..bccddd5 100644 --- a/gn_libs/jobs/jobs.py +++ b/gn_libs/jobs/jobs.py @@ -6,7 +6,6 @@ import shlex import logging import subprocess from pathlib import Path -from functools import reduce from functools import partial from typing import Union, Optional from datetime import datetime, timezone, timedelta @@ -41,14 +40,14 @@ def job_stdstream_outputs(conn, job_id, streamname: str): job_stderr = partial(job_stdstream_outputs, streamname="stderr") -job_stdout = partial(job_stdstream_outputs, streamname="stderr") +job_stdout = partial(job_stdstream_outputs, streamname="stdout") def job(conn: DbConnection, job_id: Union[str, uuid.UUID], fulldetails: bool = False) -> dict: """Fetch the job details for a job with a particular ID""" with _cursor(conn) as cursor: cursor.execute("SELECT * FROM jobs WHERE job_id=?", (str(job_id),)) - _job = dict(cursor.fetchone()) + _job = dict(cursor.fetchone() or {}) if not bool(_job): raise JobNotFound(f"Could not find job with ID {job_id}") @@ -61,7 +60,34 @@ def job(conn: DbConnection, job_id: Union[str, uuid.UUID], fulldetails: bool = F return _job -def __save_job__(conn: DbConnection, the_job: dict, expiry_seconds: int) -> dict: +def jobs_by_external_id(conn: DbConnection, external_id: Union[str, uuid.UUID]) -> tuple[dict, ...]: + """Fetch jobs by their external IDs.""" + with _cursor(conn) as cursor: + cursor.execute( + "SELECT jeids.external_id, jobs.* FROM jobs_external_ids AS jeids " + "INNER JOIN jobs ON jeids.job_id=jobs.job_id " + "WHERE jeids.external_id=? " + "ORDER BY jobs.created DESC", + (str(external_id),)) + _jobs = {row["job_id"]: {**dict(row), "metadata": {}} for row in cursor.fetchall()} + _jobs_ids = tuple(_job["job_id"] for _job in _jobs.values()) + + _paramstr = ", ".join(["?"] * len(_jobs_ids)) + cursor.execute( + f"SELECT * FROM jobs_metadata WHERE job_id IN ({_paramstr})", + _jobs_ids) + for row in cursor.fetchall(): + _jobs[row["job_id"]]["metadata"][row["metadata_key"]] = row["metadata_value"] + + return tuple(_jobs.values()) + + +def __save_job__( + conn: DbConnection, + the_job: dict, + expiry_seconds: int, + external_id: str = "" +) -> dict: """Save the job to database.""" with _cursor(conn) as cursor: @@ -76,6 +102,11 @@ def __save_job__(conn: DbConnection, the_job: dict, expiry_seconds: int) -> dict "expires": (expires and expires.isoformat()), "command": the_job["command"] }) + if bool(external_id.strip()): + cursor.execute( + "INSERT INTO jobs_external_ids(job_id, external_id) " + "VALUES(:job_id, :external_id)", + {"job_id": job_id, "external_id": external_id.strip()}) metadata = tuple({"job_id": job_id, "key": key, "value": value} for key,value in the_job["metadata"].items()) if len(metadata) > 0: @@ -87,16 +118,28 @@ def __save_job__(conn: DbConnection, the_job: dict, expiry_seconds: int) -> dict return the_job -def initialise_job( +def initialise_job(# pylint: disable=[too-many-arguments, too-many-positional-arguments] conn: DbConnection, job_id: uuid.UUID, command: list, job_type: str, - extra_meta: dict = {}, - expiry_seconds: Optional[int] = _DEFAULT_EXPIRY_SECONDS_ + extra_meta: Optional[dict] = None, + expiry_seconds: int = _DEFAULT_EXPIRY_SECONDS_, + external_id: Optional[Union[str, uuid.UUID]] = None ) -> dict: """Initialise the job and put the details in a SQLite3 database.""" - + if extra_meta is None: + extra_meta = {} + + def __process_external_id__(_id: Optional[Union[str, uuid.UUID]]) -> str: + if isinstance(_id, uuid.UUID): + return str(_id) + + if _id is not None and bool(_id.strip()): + return str(_id.strip()) + return "" + + _ext_id = __process_external_id__(external_id) _job = { "job_id": job_id, "command": shlex.join(command), @@ -105,18 +148,28 @@ def initialise_job( "status": "pending", "percent": 0, "job-type": job_type, - **extra_meta + **extra_meta, + **({"external_id": _ext_id} if bool(_ext_id) else {}) } } - return __save_job__(conn, _job, expiry_seconds) + return __save_job__(conn, _job, expiry_seconds, _ext_id) + + +def output_file(jobid: uuid.UUID, outdir: Path, stream: str) -> Path: + """Compute the path for the file where the launcher's `stream` output goes""" + assert stream in ("stdout", "stderr"), f"Invalid stream '{stream}'" + return outdir.joinpath(f"launcher_job_{jobid}.{stream}") + +stdout_filename = partial(output_file, stream="stdout") +stderr_filename = partial(output_file, stream="stderr") -def error_filename(jobid, error_dir): - "Compute the path of the file where errors will be dumped." - return f"{error_dir}/job_{jobid}.error" +def build_environment(extras: Optional[dict[str, str]] = None) -> dict[str, str]: + """Setup the runtime environment variables for the background script.""" + if extras is None: + extras = {} -def build_environment(extras: dict[str, str] = {}): return { **dict(os.environ), "PYTHONPATH": ":".join(sys.path), @@ -128,24 +181,32 @@ def launch_job( the_job: dict, sqlite3_url: str, error_dir: Path, - worker_manager: str = "gn_libs.jobs.launcher" + worker_manager: str = "gn_libs.jobs.launcher", + loglevel: str = "info" ) -> dict: """Launch a job in the background""" if not os.path.exists(error_dir): os.mkdir(error_dir) job_id = str(the_job["job_id"]) - with open(error_filename(job_id, error_dir), - "w", - encoding="utf-8") as errorfile: + with (open(stderr_filename(jobid=the_job["job_id"], outdir=error_dir), + "w", + encoding="utf-8") as stderrfile, + open(stdout_filename(jobid=the_job["job_id"], outdir=error_dir), + "w", + encoding="utf-8") as stdoutfile): subprocess.Popen( # pylint: disable=[consider-using-with] [ sys.executable, "-u", "-m", worker_manager, sqlite3_url, job_id, - str(error_dir)], - stderr=errorfile, + str(error_dir), + "--log-level", + loglevel + ], + stdout=stdoutfile, + stderr=stderrfile, env=build_environment()) return the_job @@ -167,7 +228,11 @@ def update_metadata(conn: DbConnection, job_id: Union[str, uuid.UUID], key: str, }) -def push_to_stream(conn: DbConnection, job_id: Union[str, uuid.UUID], stream_name: str, content: str): +def push_to_stream( + conn: DbConnection, + job_id: Union[str, uuid.UUID], + stream_name: str, content: str +): """Initialise, and keep adding content to the stream from the provided content.""" with _cursor(conn) as cursor: cursor.execute("SELECT * FROM jobs_standard_outputs " @@ -189,3 +254,42 @@ def push_to_stream(conn: DbConnection, job_id: Union[str, uuid.UUID], stream_nam "stream": stream_name, "content": new_content }) + + +def delete_jobs( + conn: DbConnection, job_ids: tuple[Union[uuid.UUID, str], ...]) -> None: + """Delete the given jobs.""" + with _cursor(conn) as cursor: + _paramstr = ", ".join(["?"] * len(job_ids)) + _params = tuple(str(job_id) for job_id in job_ids) + cursor.execute( + f"DELETE FROM jobs_standard_outputs WHERE job_id IN ({_paramstr})", + _params) + cursor.execute( + f"DELETE FROM jobs_metadata WHERE job_id IN ({_paramstr})", + _params) + cursor.execute( + f"DELETE FROM jobs_external_ids WHERE job_id IN ({_paramstr})", + _params) + cursor.execute(f"DELETE FROM jobs WHERE job_id IN ({_paramstr})", + _params) + + +def delete_job(conn: DbConnection, job_id: Union[uuid.UUID, str]) -> None: + """Delete a specific job.""" + return delete_jobs(conn, (job_id,)) + + +def delete_expired_jobs(conn: DbConnection) -> None: + """Delete all jobs that are expired.""" + with _cursor(conn) as cursor: + cursor.execute( + "SELECT job_id FROM jobs WHERE datetime(expires) <= datetime()") + return delete_jobs( + conn, tuple(row["job_id"] for row in cursor.fetchall())) + + +def kill_job(conn: DbConnection, job_id: Union[uuid.UUID, str]) -> None: + """Send a request to kill the job.""" + return update_metadata( + conn, job_id, "hangup_request", datetime.now(timezone.utc).isoformat()) |
