diff options
Diffstat (limited to 'gn_libs/jobs')
-rw-r--r-- | gn_libs/jobs/jobs.py | 25 | ||||
-rw-r--r-- | gn_libs/jobs/launcher.py | 6 |
2 files changed, 30 insertions, 1 deletions
diff --git a/gn_libs/jobs/jobs.py b/gn_libs/jobs/jobs.py index 38cd9c0..1f66772 100644 --- a/gn_libs/jobs/jobs.py +++ b/gn_libs/jobs/jobs.py @@ -7,6 +7,7 @@ import logging import subprocess from pathlib import Path from functools import reduce +from functools import partial from typing import Union, Optional from datetime import datetime, timezone, timedelta @@ -29,6 +30,20 @@ def __job_metadata__(cursor: DbCursor, job_id: Union[str, uuid.UUID]) -> dict: } +def job_stdstream_outputs(conn, job_id, streamname: str): + """Fetch the standard-error output for the job.""" + with _cursor(conn) as cursor: + cursor.execute( + "SELECT * FROM jobs_standard_outputs " + "WHERE job_id=? AND output_stream=?", + (str(job_id), streamname)) + return dict(cursor.fetchone() or {}).get("value") + + +job_stderr = partial(job_stdstream_outputs, streamname="stderr") +job_stdout = partial(job_stdstream_outputs, streamname="stderr") + + def job(conn: DbConnection, job_id: Union[str, uuid.UUID], fulldetails: bool = False) -> dict: """Fetch the job details for a job with a particular ID""" with _cursor(conn) as cursor: @@ -101,6 +116,14 @@ def error_filename(jobid, error_dir): return f"{error_dir}/job_{jobid}.error" +def build_environment(extras: dict[str, str] = {}): + return { + **dict(os.environ), + "PYTHONPATH": ":".join(sys.path), + **extras + } + + def launch_job( the_job: dict, sqlite3_url: str, @@ -123,7 +146,7 @@ def launch_job( job_id, str(error_dir)], stderr=errorfile, - env={"PYTHONPATH": ":".join(sys.path)}) + env=build_environment()) return the_job diff --git a/gn_libs/jobs/launcher.py b/gn_libs/jobs/launcher.py index b7369a4..0b14f98 100644 --- a/gn_libs/jobs/launcher.py +++ b/gn_libs/jobs/launcher.py @@ -1,3 +1,4 @@ +import os import sys import time import shlex @@ -36,6 +37,11 @@ def run_job(conn, job, outputs_directory: Path): # Fetch any remaining content. jobs.push_to_stream(conn, job_id, "stdout", stdout_in.read()) jobs.push_to_stream(conn, job_id, "stderr", stderr_in.read()) + + os.remove(stdout_file) + os.remove(stderr_file) + jobs.update_metadata(conn, job_id, "status", "completed") + return process.poll() except: jobs.update_metadata(conn, job_id, "status", "error") jobs.push_to_stream(conn, job_id, "stderr", traceback.format_exc()) |