"""Default launcher/manager script for background jobs.""" import os import sys import time import shlex import logging import argparse import traceback import subprocess from uuid import UUID from pathlib import Path from gn_libs import jobs, sqlite3 logger = logging.getLogger(__name__) def run_job(conn, job, outputs_directory: Path): """Run the job.""" logger.info("Setting up the job.") job_id = job["job_id"] stdout_file = outputs_directory.joinpath(f"{job_id}.stdout") stderr_file = outputs_directory.joinpath(f"{job_id}.stderr") jobs.update_metadata(conn, job_id, "stdout-file", str(stdout_file)) jobs.update_metadata(conn, job_id, "stderr-file", str(stderr_file)) try: logger.info("Launching the job in a separate process.") with (stdout_file.open(mode="w") as outfile, stderr_file.open(mode="w") as errfile, stdout_file.open(mode="r") as stdout_in, stderr_file.open(mode="r") as stderr_in, subprocess.Popen( shlex.split(job["command"]), encoding="utf-8", stdout=outfile, stderr=errfile) as process): while process.poll() is None: jobs.update_metadata(conn, job_id, "status", "running") jobs.push_to_stream(conn, job_id, "stdout", stdout_in.read()) jobs.push_to_stream(conn, job_id, "stderr", stderr_in.read()) time.sleep(1) # Fetch any remaining content. jobs.push_to_stream(conn, job_id, "stdout", stdout_in.read()) jobs.push_to_stream(conn, job_id, "stderr", stderr_in.read()) logger.info("Job completed. Cleaning up.") os.remove(stdout_file) os.remove(stderr_file) exit_status = process.poll() if exit_status == 0: jobs.update_metadata(conn, job_id, "status", "completed") else: jobs.update_metadata(conn, job_id, "status", "error") logger.info("exiting job manager/launcher") return exit_status except Exception as _exc:# pylint: disable=[broad-exception-caught] logger.error("An exception was raised when attempting to run the job", exc_info=True) jobs.update_metadata(conn, job_id, "status", "error") jobs.push_to_stream(conn, job_id, "stderr", traceback.format_exc()) return 4 def parse_args(): """Define and parse CLI args.""" parser = argparse.ArgumentParser( prog="GN Jobs Launcher", description = ( "Generic launcher and manager of jobs defined with gn-libs")) parser.add_argument( "jobs_db_uri", help="The URI to the SQLite3 database holding the jobs' details") parser.add_argument( "job_id", help="The id of the job being processed", type=UUID) parser.add_argument("outputs_directory", help="Directory where output files will be created", type=Path) parser.add_argument( "--log-level", type=str, help="Determines what is logged out.", choices=("debug", "info", "warning", "error", "critical"), default="info") return parser.parse_args() def main(): """Entry-point to this program.""" args = parse_args() logger.setLevel(args.log_level.upper()) args.outputs_directory.mkdir(parents=True, exist_ok=True) with sqlite3.connection(args.jobs_db_uri) as conn: job = jobs.job(conn, args.job_id) if job: return run_job(conn, job, args.outputs_directory) jobs.update_metadata(conn, args.job_id, "status", "error") jobs.push_to_stream(conn, args.job_id, "stderr", "Job not found!") return 2 return 3 if __name__ == "__main__": sys.exit(main())