diff options
Diffstat (limited to 'gn_libs/jobs/launcher.py')
-rw-r--r-- | gn_libs/jobs/launcher.py | 108 |
1 files changed, 108 insertions, 0 deletions
diff --git a/gn_libs/jobs/launcher.py b/gn_libs/jobs/launcher.py new file mode 100644 index 0000000..d565f9e --- /dev/null +++ b/gn_libs/jobs/launcher.py @@ -0,0 +1,108 @@ +"""Default launcher/manager script for background jobs.""" +import os +import sys +import time +import shlex +import logging +import argparse +import traceback +import subprocess +from uuid import UUID +from pathlib import Path + +from gn_libs import jobs, sqlite3 + +logger = logging.getLogger(__name__) + + +def run_job(conn, job, outputs_directory: Path): + """Run the job.""" + logger.info("Setting up the job.") + job_id = job["job_id"] + stdout_file = outputs_directory.joinpath(f"{job_id}.stdout") + stderr_file = outputs_directory.joinpath(f"{job_id}.stderr") + jobs.update_metadata(conn, job_id, "stdout-file", str(stdout_file)) + jobs.update_metadata(conn, job_id, "stderr-file", str(stderr_file)) + try: + logger.info("Launching the job in a separate process.") + with (stdout_file.open(mode="w") as outfile, + stderr_file.open(mode="w") as errfile, + stdout_file.open(mode="r") as stdout_in, + stderr_file.open(mode="r") as stderr_in, + subprocess.Popen( + shlex.split(job["command"]), + encoding="utf-8", + stdout=outfile, + stderr=errfile) as process): + while process.poll() is None: + jobs.update_metadata(conn, job_id, "status", "running") + jobs.push_to_stream(conn, job_id, "stdout", stdout_in.read()) + jobs.push_to_stream(conn, job_id, "stderr", stderr_in.read()) + time.sleep(1) + + # Fetch any remaining content. + jobs.push_to_stream(conn, job_id, "stdout", stdout_in.read()) + jobs.push_to_stream(conn, job_id, "stderr", stderr_in.read()) + logger.info("Job completed. Cleaning up.") + + os.remove(stdout_file) + os.remove(stderr_file) + exit_status = process.poll() + if exit_status == 0: + jobs.update_metadata(conn, job_id, "status", "completed") + else: + jobs.update_metadata(conn, job_id, "status", "error") + + logger.info("exiting job manager/launcher") + return exit_status + except Exception as _exc:# pylint: disable=[broad-exception-caught] + logger.error("An exception was raised when attempting to run the job", + exc_info=True) + jobs.update_metadata(conn, job_id, "status", "error") + jobs.push_to_stream(conn, job_id, "stderr", traceback.format_exc()) + return 4 + + +def parse_args(): + """Define and parse CLI args.""" + parser = argparse.ArgumentParser( + prog="GN Jobs Launcher", + description = ( + "Generic launcher and manager of jobs defined with gn-libs")) + parser.add_argument( + "jobs_db_uri", + help="The URI to the SQLite3 database holding the jobs' details") + parser.add_argument( + "job_id", help="The id of the job being processed", type=UUID) + parser.add_argument("outputs_directory", + help="Directory where output files will be created", + type=Path) + parser.add_argument( + "--log-level", + type=str, + help="Determines what is logged out.", + choices=("debug", "info", "warning", "error", "critical"), + default="info") + return parser.parse_args() + + +def main(): + """Entry-point to this program.""" + args = parse_args() + logger.setLevel(args.log_level.upper()) + args.outputs_directory.mkdir(parents=True, exist_ok=True) + with sqlite3.connection(args.jobs_db_uri) as conn: + job = jobs.job(conn, args.job_id) + if job: + return run_job(conn, job, args.outputs_directory) + + jobs.update_metadata(conn, args.job_id, "status", "error") + jobs.push_to_stream(conn, args.job_id, "stderr", "Job not found!") + return 2 + + return 3 + + + +if __name__ == "__main__": + sys.exit(main()) |