aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrederick Muriuki Muriithi2025-06-10 11:12:39 -0500
committerFrederick Muriuki Muriithi2025-06-10 11:12:39 -0500
commitea61d349cc6368c13c22e8b23189ae53180b124a (patch)
treeb05901a7bafcd389ea57b7516f04393e9890add6
parent44a251732b980b699e2ea00fb9405db39bb6e08f (diff)
downloadgn-libs-ea61d349cc6368c13c22e8b23189ae53180b124a.tar.gz
Setup launcher logging.
Enable logging out details of the launcher's process to help with debugging issues.
-rw-r--r--gn_libs/jobs/jobs.py31
-rw-r--r--gn_libs/jobs/launcher.py26
2 files changed, 46 insertions, 11 deletions
diff --git a/gn_libs/jobs/jobs.py b/gn_libs/jobs/jobs.py
index b705676..17c1ac6 100644
--- a/gn_libs/jobs/jobs.py
+++ b/gn_libs/jobs/jobs.py
@@ -111,9 +111,14 @@ def initialise_job(
return __save_job__(conn, _job, expiry_seconds)
-def error_filename(jobid, error_dir):
- "Compute the path of the file where errors will be dumped."
- return f"{error_dir}/job_{jobid}.error"
+def output_file(jobid: uuid.UUID, outdir: Path, stream: str) -> Path:
+ """Compute the path for the file where the launcher's `stream` output goes"""
+ assert stream in ("stdout", "stderr"), f"Invalid stream '{stream}'"
+ return f"{outdir}/launcher_job_{jobid}.{stream}"
+
+
+stdout_filename = partial(output_file, stream="stdout")
+stderr_filename = partial(output_file, stream="stderr")
def build_environment(extras: dict[str, str] = {}):
@@ -128,24 +133,32 @@ def launch_job(
the_job: dict,
sqlite3_url: str,
error_dir: Path,
- worker_manager: str = "gn_libs.jobs.launcher"
+ worker_manager: str = "gn_libs.jobs.launcher",
+ loglevel: str = "info"
) -> dict:
"""Launch a job in the background"""
if not os.path.exists(error_dir):
os.mkdir(error_dir)
job_id = str(the_job["job_id"])
- with open(error_filename(job_id, error_dir),
- "w",
- encoding="utf-8") as errorfile:
+ with (open(stderr_filename(jobid=job_id, outdir=error_dir),
+ "w",
+ encoding="utf-8") as stderrfile,
+ open(stdout_filename(jobid=job_id, outdir=error_dir),
+ "w",
+ encoding="utf-8") as stdoutfile):
subprocess.Popen( # pylint: disable=[consider-using-with]
[
sys.executable, "-u",
"-m", worker_manager,
sqlite3_url,
job_id,
- str(error_dir)],
- stderr=errorfile,
+ str(error_dir),
+ "--log-level",
+ loglevel
+ ],
+ stdout=stdoutfile,
+ stderr=stderrfile,
env=build_environment())
return the_job
diff --git a/gn_libs/jobs/launcher.py b/gn_libs/jobs/launcher.py
index 0b14f98..89884b6 100644
--- a/gn_libs/jobs/launcher.py
+++ b/gn_libs/jobs/launcher.py
@@ -2,6 +2,7 @@ import os
import sys
import time
import shlex
+import logging
import argparse
import traceback
import subprocess
@@ -10,15 +11,19 @@ from pathlib import Path
from gn_libs import jobs, sqlite3
+logger = logging.getLogger(__name__)
+
def run_job(conn, job, outputs_directory: Path):
"""Run the job."""
+ logger.info("Setting up the job.")
job_id = job["job_id"]
stdout_file = outputs_directory.joinpath(f"{job_id}.stdout")
stderr_file = outputs_directory.joinpath(f"{job_id}.stderr")
jobs.update_metadata(conn, job_id, "stdout-file", str(stdout_file))
jobs.update_metadata(conn, job_id, "stderr-file", str(stderr_file))
try:
+ logger.info("Launching the job in a separate process.")
with (stdout_file.open(mode="w") as outfile,
stderr_file.open(mode="w") as errfile,
stdout_file.open(mode="r") as stdout_in,
@@ -37,12 +42,21 @@ def run_job(conn, job, outputs_directory: Path):
# Fetch any remaining content.
jobs.push_to_stream(conn, job_id, "stdout", stdout_in.read())
jobs.push_to_stream(conn, job_id, "stderr", stderr_in.read())
+ logger.info("Job completed. Cleaning up.")
os.remove(stdout_file)
os.remove(stderr_file)
- jobs.update_metadata(conn, job_id, "status", "completed")
- return process.poll()
+ exit_status = process.poll()
+ if exit_status == 0:
+ jobs.update_metadata(conn, job_id, "status", "completed")
+ else:
+ jobs.update_metadata(conn, job_id, "status", "error")
+
+ logger.info("exiting job manager/launcher")
+ return exit_status
except:
+ logger.error("An exception was raised when attempting to run the job",
+ exc_info=True)
jobs.update_metadata(conn, job_id, "status", "error")
jobs.push_to_stream(conn, job_id, "stderr", traceback.format_exc())
return 4
@@ -62,11 +76,19 @@ def parse_args():
parser.add_argument("outputs_directory",
help="Directory where output files will be created",
type=Path)
+ parser.add_argument(
+ "--log-level",
+ type=str,
+ help="Determines what is logged out.",
+ choices=("debug", "info", "warning", "error", "critical"),
+ default="info")
return parser.parse_args()
+
def main():
"""Entry-point to this program."""
args = parse_args()
+ logger.setLevel(args.log_level.upper())
args.outputs_directory.mkdir(parents=True, exist_ok=True)
with (sqlite3.connection(args.jobs_db_uri) as conn,
sqlite3.cursor(conn) as cursor):