aboutsummaryrefslogtreecommitdiff
path: root/gn_libs/jobs
diff options
context:
space:
mode:
Diffstat (limited to 'gn_libs/jobs')
-rw-r--r--gn_libs/jobs/jobs.py25
-rw-r--r--gn_libs/jobs/launcher.py6
2 files changed, 30 insertions, 1 deletions
diff --git a/gn_libs/jobs/jobs.py b/gn_libs/jobs/jobs.py
index 38cd9c0..1f66772 100644
--- a/gn_libs/jobs/jobs.py
+++ b/gn_libs/jobs/jobs.py
@@ -7,6 +7,7 @@ import logging
import subprocess
from pathlib import Path
from functools import reduce
+from functools import partial
from typing import Union, Optional
from datetime import datetime, timezone, timedelta
@@ -29,6 +30,20 @@ def __job_metadata__(cursor: DbCursor, job_id: Union[str, uuid.UUID]) -> dict:
}
+def job_stdstream_outputs(conn, job_id, streamname: str):
+ """Fetch the standard-error output for the job."""
+ with _cursor(conn) as cursor:
+ cursor.execute(
+ "SELECT * FROM jobs_standard_outputs "
+ "WHERE job_id=? AND output_stream=?",
+ (str(job_id), streamname))
+ return dict(cursor.fetchone() or {}).get("value")
+
+
+job_stderr = partial(job_stdstream_outputs, streamname="stderr")
+job_stdout = partial(job_stdstream_outputs, streamname="stderr")
+
+
def job(conn: DbConnection, job_id: Union[str, uuid.UUID], fulldetails: bool = False) -> dict:
"""Fetch the job details for a job with a particular ID"""
with _cursor(conn) as cursor:
@@ -101,6 +116,14 @@ def error_filename(jobid, error_dir):
return f"{error_dir}/job_{jobid}.error"
+def build_environment(extras: dict[str, str] = {}):
+ return {
+ **dict(os.environ),
+ "PYTHONPATH": ":".join(sys.path),
+ **extras
+ }
+
+
def launch_job(
the_job: dict,
sqlite3_url: str,
@@ -123,7 +146,7 @@ def launch_job(
job_id,
str(error_dir)],
stderr=errorfile,
- env={"PYTHONPATH": ":".join(sys.path)})
+ env=build_environment())
return the_job
diff --git a/gn_libs/jobs/launcher.py b/gn_libs/jobs/launcher.py
index b7369a4..0b14f98 100644
--- a/gn_libs/jobs/launcher.py
+++ b/gn_libs/jobs/launcher.py
@@ -1,3 +1,4 @@
+import os
import sys
import time
import shlex
@@ -36,6 +37,11 @@ def run_job(conn, job, outputs_directory: Path):
# Fetch any remaining content.
jobs.push_to_stream(conn, job_id, "stdout", stdout_in.read())
jobs.push_to_stream(conn, job_id, "stderr", stderr_in.read())
+
+ os.remove(stdout_file)
+ os.remove(stderr_file)
+ jobs.update_metadata(conn, job_id, "status", "completed")
+ return process.poll()
except:
jobs.update_metadata(conn, job_id, "status", "error")
jobs.push_to_stream(conn, job_id, "stderr", traceback.format_exc())