aboutsummaryrefslogtreecommitdiff
path: root/gn_libs/jobs/launcher.py
diff options
context:
space:
mode:
Diffstat (limited to 'gn_libs/jobs/launcher.py')
-rw-r--r--gn_libs/jobs/launcher.py108
1 files changed, 108 insertions, 0 deletions
diff --git a/gn_libs/jobs/launcher.py b/gn_libs/jobs/launcher.py
new file mode 100644
index 0000000..d565f9e
--- /dev/null
+++ b/gn_libs/jobs/launcher.py
@@ -0,0 +1,108 @@
+"""Default launcher/manager script for background jobs."""
+import os
+import sys
+import time
+import shlex
+import logging
+import argparse
+import traceback
+import subprocess
+from uuid import UUID
+from pathlib import Path
+
+from gn_libs import jobs, sqlite3
+
+logger = logging.getLogger(__name__)
+
+
+def run_job(conn, job, outputs_directory: Path):
+ """Run the job."""
+ logger.info("Setting up the job.")
+ job_id = job["job_id"]
+ stdout_file = outputs_directory.joinpath(f"{job_id}.stdout")
+ stderr_file = outputs_directory.joinpath(f"{job_id}.stderr")
+ jobs.update_metadata(conn, job_id, "stdout-file", str(stdout_file))
+ jobs.update_metadata(conn, job_id, "stderr-file", str(stderr_file))
+ try:
+ logger.info("Launching the job in a separate process.")
+ with (stdout_file.open(mode="w") as outfile,
+ stderr_file.open(mode="w") as errfile,
+ stdout_file.open(mode="r") as stdout_in,
+ stderr_file.open(mode="r") as stderr_in,
+ subprocess.Popen(
+ shlex.split(job["command"]),
+ encoding="utf-8",
+ stdout=outfile,
+ stderr=errfile) as process):
+ while process.poll() is None:
+ jobs.update_metadata(conn, job_id, "status", "running")
+ jobs.push_to_stream(conn, job_id, "stdout", stdout_in.read())
+ jobs.push_to_stream(conn, job_id, "stderr", stderr_in.read())
+ time.sleep(1)
+
+ # Fetch any remaining content.
+ jobs.push_to_stream(conn, job_id, "stdout", stdout_in.read())
+ jobs.push_to_stream(conn, job_id, "stderr", stderr_in.read())
+ logger.info("Job completed. Cleaning up.")
+
+ os.remove(stdout_file)
+ os.remove(stderr_file)
+ exit_status = process.poll()
+ if exit_status == 0:
+ jobs.update_metadata(conn, job_id, "status", "completed")
+ else:
+ jobs.update_metadata(conn, job_id, "status", "error")
+
+ logger.info("exiting job manager/launcher")
+ return exit_status
+ except Exception as _exc:# pylint: disable=[broad-exception-caught]
+ logger.error("An exception was raised when attempting to run the job",
+ exc_info=True)
+ jobs.update_metadata(conn, job_id, "status", "error")
+ jobs.push_to_stream(conn, job_id, "stderr", traceback.format_exc())
+ return 4
+
+
+def parse_args():
+ """Define and parse CLI args."""
+ parser = argparse.ArgumentParser(
+ prog="GN Jobs Launcher",
+ description = (
+ "Generic launcher and manager of jobs defined with gn-libs"))
+ parser.add_argument(
+ "jobs_db_uri",
+ help="The URI to the SQLite3 database holding the jobs' details")
+ parser.add_argument(
+ "job_id", help="The id of the job being processed", type=UUID)
+ parser.add_argument("outputs_directory",
+ help="Directory where output files will be created",
+ type=Path)
+ parser.add_argument(
+ "--log-level",
+ type=str,
+ help="Determines what is logged out.",
+ choices=("debug", "info", "warning", "error", "critical"),
+ default="info")
+ return parser.parse_args()
+
+
+def main():
+ """Entry-point to this program."""
+ args = parse_args()
+ logger.setLevel(args.log_level.upper())
+ args.outputs_directory.mkdir(parents=True, exist_ok=True)
+ with sqlite3.connection(args.jobs_db_uri) as conn:
+ job = jobs.job(conn, args.job_id)
+ if job:
+ return run_job(conn, job, args.outputs_directory)
+
+ jobs.update_metadata(conn, args.job_id, "status", "error")
+ jobs.push_to_stream(conn, args.job_id, "stderr", "Job not found!")
+ return 2
+
+ return 3
+
+
+
+if __name__ == "__main__":
+ sys.exit(main())