diff options
-rw-r--r-- | gn_libs/jobs/jobs.py | 2 | ||||
-rw-r--r-- | gn_libs/jobs/launcher.py | 76 | ||||
-rw-r--r-- | gn_libs/scripts/__init__.py | 1 | ||||
-rw-r--r-- | gn_libs/scripts/worker.py | 49 |
4 files changed, 77 insertions, 51 deletions
diff --git a/gn_libs/jobs/jobs.py b/gn_libs/jobs/jobs.py index 1adbc33..e2a6b00 100644 --- a/gn_libs/jobs/jobs.py +++ b/gn_libs/jobs/jobs.py @@ -105,7 +105,7 @@ def launch_job( the_job: dict, sqlite3_url: str, error_dir: Path, - worker_manager: str = "scripts.worker" + worker_manager: str = "gn_libs.jobs.launcher" ) -> dict: """Launch a job in the background""" if not os.path.exists(error_dir): diff --git a/gn_libs/jobs/launcher.py b/gn_libs/jobs/launcher.py new file mode 100644 index 0000000..8eab55b --- /dev/null +++ b/gn_libs/jobs/launcher.py @@ -0,0 +1,76 @@ +import sys +import time +import shlex +import argparse +import traceback +import subprocess +from uuid import UUID +from pathlib import Path + +from gn_libs import jobs, sqlite3 + + +def run_job(conn, job, outputs_directory: Path): + """Run the job.""" + job_id = job["job_id"] + stdout_file = outputs_directory.joinpath(f"{job_id}.stdout") + stderr_file = outputs_directory.joinpath(f"{job_id}.stderr") + try: + with (# TODO: Add the output streams' files to job metadata + stdout_file.open(mode="w") as outfile, + stderr_file.open(mode="w") as errfile, + subprocess.Popen( + shlex.split(job["command"]), + encoding="utf-8", + stdout=outfile, + stderr=errfile) as process): + with (stdout_file.open(mode="r") as stdout_output, + stderr_file.open(mode="r") as stderr_output): + while process.poll() is None: + jobs.update_metadata(conn, job_id, "status", "running") + jobs.push_to_stream( + conn, job_id, "stdout", stdout_output.read()) + jobs.push_to_stream( + conn, job_id, "stderr", stderr_output.read()) + time.sleep(1) + except: + jobs.update_metadata(conn, job_id, "status", "error") + jobs.push_to_stream(conn, job_id, "stderr", traceback.format_exc()) + return 4 + + +def parse_args(): + """Define and parse CLI args.""" + parser = argparse.ArgumentParser( + prog="GN Jobs Launcher", + description = ( + "Generic launcher and manager of jobs defined with gn-libs")) + parser.add_argument( + "jobs_db_uri", + help="The URI to the SQLite3 database holding the jobs' details") + parser.add_argument( + "job_id", help="The id of the job being processed", type=UUID) + parser.add_argument("outputs_directory", + help="Directory where output files will be created", + type=Path) + return parser.parse_args() + +def main(): + """Entry-point to this program.""" + args = parse_args() + with (sqlite3.connection(args.jobs_db_uri) as conn, + sqlite3.cursor(conn) as cursor): + job = jobs.job(conn, args.job_id) + if job: + return run_job(conn, job, args.outputs_directory) + + jobs.update_metadata(conn, args.job_id, "status", "error") + jobs.push_to_stream(conn, args.job_id, "stderr", "Job not found!") + return 2 + + return 3 + + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/gn_libs/scripts/__init__.py b/gn_libs/scripts/__init__.py deleted file mode 100644 index 678c5e0..0000000 --- a/gn_libs/scripts/__init__.py +++ /dev/null @@ -1 +0,0 @@ -"""Initialise scripts that will be accessible globally.""" diff --git a/gn_libs/scripts/worker.py b/gn_libs/scripts/worker.py deleted file mode 100644 index 2479aef..0000000 --- a/gn_libs/scripts/worker.py +++ /dev/null @@ -1,49 +0,0 @@ -import sys -import shlex -import argparse -import traceback - -from gn_libs import jobs, sqlite3 - - -def run_job(conn, job_id): - """Run the job.""" - try: - pass - except: - jobs.update_metadata(conn, args.job_id, "status", "error") - jobs.push_to_stream(conn, args.job_id, "stderr", traceback.format_exc()) - return 4 - - -def parse_args(): - """Define and parse CLI args.""" - parser = argparse.ArgumentParser( - prog="GN-Libs Worker", - description = ( - "Generic worker to launch and manage jobs defined with gn-libs")) - parser.add_argument( - "jobs_db_uri", - help="The URI to the SQLite3 database holding the jobs' details") - parser.add_argument("job_id", help="The id of the job being processed") - return parser.parse_args() - -def main(): - """Entry-point to this program.""" - args = parse_args() - with (sqlite3.connection(args.jobs_db_uri) as conn, - sqlite3.cursor(conn) as cursor): - job = jobs.job(conn, args.job_id) - if job: - return run_job(conn, args.job_id) - - jobs.update_metadata(conn, args.job_id, "status", "error") - jobs.push_to_stream(conn, args.job_id, "stderr", "Job not found!") - return 2 - - return 3 - - - -if __name__ == "__main__": - sys.exit(main()) |