diff options
author | Frederick Muriuki Muriithi | 2025-06-10 11:12:39 -0500 |
---|---|---|
committer | Frederick Muriuki Muriithi | 2025-06-10 11:12:39 -0500 |
commit | ea61d349cc6368c13c22e8b23189ae53180b124a (patch) | |
tree | b05901a7bafcd389ea57b7516f04393e9890add6 | |
parent | 44a251732b980b699e2ea00fb9405db39bb6e08f (diff) | |
download | gn-libs-ea61d349cc6368c13c22e8b23189ae53180b124a.tar.gz |
Set up launcher logging.
Enable logging of details about the launcher's process to help with
debugging issues.
-rw-r--r-- | gn_libs/jobs/jobs.py | 31 | ||||
-rw-r--r-- | gn_libs/jobs/launcher.py | 26 |
2 files changed, 46 insertions, 11 deletions
diff --git a/gn_libs/jobs/jobs.py b/gn_libs/jobs/jobs.py index b705676..17c1ac6 100644 --- a/gn_libs/jobs/jobs.py +++ b/gn_libs/jobs/jobs.py @@ -111,9 +111,14 @@ def initialise_job( return __save_job__(conn, _job, expiry_seconds) -def error_filename(jobid, error_dir): - "Compute the path of the file where errors will be dumped." - return f"{error_dir}/job_{jobid}.error" +def output_file(jobid: uuid.UUID, outdir: Path, stream: str) -> Path: + """Compute the path for the file where the launcher's `stream` output goes""" + assert stream in ("stdout", "stderr"), f"Invalid stream '{stream}'" + return f"{outdir}/launcher_job_{jobid}.{stream}" + + +stdout_filename = partial(output_file, stream="stdout") +stderr_filename = partial(output_file, stream="stderr") def build_environment(extras: dict[str, str] = {}): @@ -128,24 +133,32 @@ def launch_job( the_job: dict, sqlite3_url: str, error_dir: Path, - worker_manager: str = "gn_libs.jobs.launcher" + worker_manager: str = "gn_libs.jobs.launcher", + loglevel: str = "info" ) -> dict: """Launch a job in the background""" if not os.path.exists(error_dir): os.mkdir(error_dir) job_id = str(the_job["job_id"]) - with open(error_filename(job_id, error_dir), - "w", - encoding="utf-8") as errorfile: + with (open(stderr_filename(jobid=job_id, outdir=error_dir), + "w", + encoding="utf-8") as stderrfile, + open(stdout_filename(jobid=job_id, outdir=error_dir), + "w", + encoding="utf-8") as stdoutfile): subprocess.Popen( # pylint: disable=[consider-using-with] [ sys.executable, "-u", "-m", worker_manager, sqlite3_url, job_id, - str(error_dir)], - stderr=errorfile, + str(error_dir), + "--log-level", + loglevel + ], + stdout=stdoutfile, + stderr=stderrfile, env=build_environment()) return the_job diff --git a/gn_libs/jobs/launcher.py b/gn_libs/jobs/launcher.py index 0b14f98..89884b6 100644 --- a/gn_libs/jobs/launcher.py +++ b/gn_libs/jobs/launcher.py @@ -2,6 +2,7 @@ import os import sys import time import shlex +import logging import argparse 
import traceback import subprocess @@ -10,15 +11,19 @@ from pathlib import Path from gn_libs import jobs, sqlite3 +logger = logging.getLogger(__name__) + def run_job(conn, job, outputs_directory: Path): """Run the job.""" + logger.info("Setting up the job.") job_id = job["job_id"] stdout_file = outputs_directory.joinpath(f"{job_id}.stdout") stderr_file = outputs_directory.joinpath(f"{job_id}.stderr") jobs.update_metadata(conn, job_id, "stdout-file", str(stdout_file)) jobs.update_metadata(conn, job_id, "stderr-file", str(stderr_file)) try: + logger.info("Launching the job in a separate process.") with (stdout_file.open(mode="w") as outfile, stderr_file.open(mode="w") as errfile, stdout_file.open(mode="r") as stdout_in, @@ -37,12 +42,21 @@ def run_job(conn, job, outputs_directory: Path): # Fetch any remaining content. jobs.push_to_stream(conn, job_id, "stdout", stdout_in.read()) jobs.push_to_stream(conn, job_id, "stderr", stderr_in.read()) + logger.info("Job completed. Cleaning up.") os.remove(stdout_file) os.remove(stderr_file) - jobs.update_metadata(conn, job_id, "status", "completed") - return process.poll() + exit_status = process.poll() + if exit_status == 0: + jobs.update_metadata(conn, job_id, "status", "completed") + else: + jobs.update_metadata(conn, job_id, "status", "error") + + logger.info("exiting job manager/launcher") + return exit_status except: + logger.error("An exception was raised when attempting to run the job", + exc_info=True) jobs.update_metadata(conn, job_id, "status", "error") jobs.push_to_stream(conn, job_id, "stderr", traceback.format_exc()) return 4 @@ -62,11 +76,19 @@ def parse_args(): parser.add_argument("outputs_directory", help="Directory where output files will be created", type=Path) + parser.add_argument( + "--log-level", + type=str, + help="Determines what is logged out.", + choices=("debug", "info", "warning", "error", "critical"), + default="info") return parser.parse_args() + def main(): """Entry-point to this program.""" args 
= parse_args() + logger.setLevel(args.log_level.upper()) args.outputs_directory.mkdir(parents=True, exist_ok=True) with (sqlite3.connection(args.jobs_db_uri) as conn, sqlite3.cursor(conn) as cursor): |