diff options
Diffstat (limited to 'gn_libs/jobs/launcher.py')
-rw-r--r-- | gn_libs/jobs/launcher.py | 76 |
1 files changed, 76 insertions, 0 deletions
diff --git a/gn_libs/jobs/launcher.py b/gn_libs/jobs/launcher.py new file mode 100644 index 0000000..8eab55b --- /dev/null +++ b/gn_libs/jobs/launcher.py @@ -0,0 +1,76 @@ +import sys +import time +import shlex +import argparse +import traceback +import subprocess +from uuid import UUID +from pathlib import Path + +from gn_libs import jobs, sqlite3 + + +def run_job(conn, job, outputs_directory: Path): + """Run the job.""" + job_id = job["job_id"] + stdout_file = outputs_directory.joinpath(f"{job_id}.stdout") + stderr_file = outputs_directory.joinpath(f"{job_id}.stderr") + try: + with (# TODO: Add the output streams' files to job metadata + stdout_file.open(mode="w") as outfile, + stderr_file.open(mode="w") as errfile, + subprocess.Popen( + shlex.split(job["command"]), + encoding="utf-8", + stdout=outfile, + stderr=errfile) as process): + with (stdout_file.open(mode="r") as stdout_output, + stderr_file.open(mode="r") as stderr_output): + while process.poll() is None: + jobs.update_metadata(conn, job_id, "status", "running") + jobs.push_to_stream( + conn, job_id, "stdout", stdout_output.read()) + jobs.push_to_stream( + conn, job_id, "stderr", stderr_output.read()) + time.sleep(1) + except: + jobs.update_metadata(conn, job_id, "status", "error") + jobs.push_to_stream(conn, job_id, "stderr", traceback.format_exc()) + return 4 + + +def parse_args(): + """Define and parse CLI args.""" + parser = argparse.ArgumentParser( + prog="GN Jobs Launcher", + description = ( + "Generic launcher and manager of jobs defined with gn-libs")) + parser.add_argument( + "jobs_db_uri", + help="The URI to the SQLite3 database holding the jobs' details") + parser.add_argument( + "job_id", help="The id of the job being processed", type=UUID) + parser.add_argument("outputs_directory", + help="Directory where output files will be created", + type=Path) + return parser.parse_args() + +def main(): + """Entry-point to this program.""" + args = parse_args() + with (sqlite3.connection(args.jobs_db_uri) as conn, + sqlite3.cursor(conn) as cursor): + job = jobs.job(conn, args.job_id) + if job: + return run_job(conn, job, args.outputs_directory) + + jobs.update_metadata(conn, args.job_id, "status", "error") + jobs.push_to_stream(conn, args.job_id, "stderr", "Job not found!") + return 2 + + return 3 + + + +if __name__ == "__main__": + sys.exit(main()) |