import sys import time import shlex import argparse import traceback import subprocess from uuid import UUID from pathlib import Path from gn_libs import jobs, sqlite3 def run_job(conn, job, outputs_directory: Path): """Run the job.""" job_id = job["job_id"] stdout_file = outputs_directory.joinpath(f"{job_id}.stdout") stderr_file = outputs_directory.joinpath(f"{job_id}.stderr") try: with (# TODO: Add the output streams' files to job metadata stdout_file.open(mode="w") as outfile, stderr_file.open(mode="w") as errfile, subprocess.Popen( shlex.split(job["command"]), encoding="utf-8", stdout=outfile, stderr=errfile) as process): with (stdout_file.open(mode="r") as stdout_output, stderr_file.open(mode="r") as stderr_output): while process.poll() is None: jobs.update_metadata(conn, job_id, "status", "running") jobs.push_to_stream( conn, job_id, "stdout", stdout_output.read()) jobs.push_to_stream( conn, job_id, "stderr", stderr_output.read()) time.sleep(1) except: jobs.update_metadata(conn, job_id, "status", "error") jobs.push_to_stream(conn, job_id, "stderr", traceback.format_exc()) return 4 def parse_args(): """Define and parse CLI args.""" parser = argparse.ArgumentParser( prog="GN Jobs Launcher", description = ( "Generic launcher and manager of jobs defined with gn-libs")) parser.add_argument( "jobs_db_uri", help="The URI to the SQLite3 database holding the jobs' details") parser.add_argument( "job_id", help="The id of the job being processed", type=UUID) parser.add_argument("outputs_directory", help="Directory where output files will be created", type=Path) return parser.parse_args() def main(): """Entry-point to this program.""" args = parse_args() with (sqlite3.connection(args.jobs_db_uri) as conn, sqlite3.cursor(conn) as cursor): job = jobs.job(conn, args.job_id) if job: return run_job(conn, job, args.outputs_directory) jobs.update_metadata(conn, args.job_id, "status", "error") jobs.push_to_stream(conn, args.job_id, "stderr", "Job not found!") return 2 return 3 if __name__ == "__main__": sys.exit(main())