about summary refs log tree commit diff
diff options
context:
space:
mode:
authorFrederick Muriuki Muriithi2025-03-25 12:52:48 -0500
committerFrederick Muriuki Muriithi2025-03-25 12:56:05 -0500
commit7a7e6a6a52517b035152093429c9c551844e563d (patch)
treec1c5a9f1050030e72e8ed5b7656e18afa2014b11
parentd4b7312423d5aef91b264bb21429e79ec65cec7c (diff)
downloadgn-libs-7a7e6a6a52517b035152093429c9c551844e563d.tar.gz
Rename manager/launcher script and implement the management logic.
Give the launcher a better name, and implement the logic to actually
launch the job and do basic management of the process(es).
-rw-r--r--gn_libs/jobs/jobs.py2
-rw-r--r--gn_libs/jobs/launcher.py76
-rw-r--r--gn_libs/scripts/__init__.py1
-rw-r--r--gn_libs/scripts/worker.py49
4 files changed, 77 insertions, 51 deletions
diff --git a/gn_libs/jobs/jobs.py b/gn_libs/jobs/jobs.py
index 1adbc33..e2a6b00 100644
--- a/gn_libs/jobs/jobs.py
+++ b/gn_libs/jobs/jobs.py
@@ -105,7 +105,7 @@ def launch_job(
         the_job: dict,
         sqlite3_url: str,
         error_dir: Path,
-        worker_manager: str = "scripts.worker"
+        worker_manager: str = "gn_libs.jobs.launcher"
 ) -> dict:
     """Launch a job in the background"""
     if not os.path.exists(error_dir):
diff --git a/gn_libs/jobs/launcher.py b/gn_libs/jobs/launcher.py
new file mode 100644
index 0000000..8eab55b
--- /dev/null
+++ b/gn_libs/jobs/launcher.py
@@ -0,0 +1,76 @@
+import sys
+import time
+import shlex
+import argparse
+import traceback
+import subprocess
+from uuid import UUID
+from pathlib import Path
+
+from gn_libs import jobs, sqlite3
+
+
+def run_job(conn, job, outputs_directory: Path):
+    """Run the job."""
+    job_id = job["job_id"]
+    stdout_file = outputs_directory.joinpath(f"{job_id}.stdout")
+    stderr_file = outputs_directory.joinpath(f"{job_id}.stderr")
+    try:
+        with (# TODO: Add the output streams' files to job metadata
+                stdout_file.open(mode="w") as outfile,
+                stderr_file.open(mode="w") as errfile,
+                subprocess.Popen(
+                    shlex.split(job["command"]),
+                    encoding="utf-8",
+                    stdout=outfile,
+                    stderr=errfile) as process):
+            with (stdout_file.open(mode="r") as stdout_output,
+                  stderr_file.open(mode="r") as stderr_output):
+                while process.poll() is None:
+                    jobs.update_metadata(conn, job_id, "status", "running")
+                    jobs.push_to_stream(
+                        conn, job_id, "stdout", stdout_output.read())
+                    jobs.push_to_stream(
+                        conn, job_id, "stderr", stderr_output.read())
+                    time.sleep(1)
+    except:
+        jobs.update_metadata(conn, job_id, "status", "error")
+        jobs.push_to_stream(conn, job_id, "stderr", traceback.format_exc())
+        return 4
+
+
+def parse_args():
+    """Define and parse CLI args."""
+    parser = argparse.ArgumentParser(
+        prog="GN Jobs Launcher",
+        description = (
+            "Generic launcher and manager of jobs defined with gn-libs"))
+    parser.add_argument(
+        "jobs_db_uri",
+        help="The URI to the SQLite3 database holding the jobs' details")
+    parser.add_argument(
+        "job_id", help="The id of the job being processed", type=UUID)
+    parser.add_argument("outputs_directory",
+                        help="Directory where output files will be created",
+                        type=Path)
+    return parser.parse_args()
+
+def main():
+    """Entry-point to this program."""
+    args = parse_args()
+    with (sqlite3.connection(args.jobs_db_uri) as conn,
+          sqlite3.cursor(conn) as cursor):
+        job = jobs.job(conn, args.job_id)
+        if job:
+            return run_job(conn, job, args.outputs_directory)
+
+        jobs.update_metadata(conn, args.job_id, "status", "error")
+        jobs.push_to_stream(conn, args.job_id, "stderr", "Job not found!")
+        return 2
+
+    return 3
+
+
+
+if __name__ == "__main__":
+    sys.exit(main())
diff --git a/gn_libs/scripts/__init__.py b/gn_libs/scripts/__init__.py
deleted file mode 100644
index 678c5e0..0000000
--- a/gn_libs/scripts/__init__.py
+++ /dev/null
@@ -1 +0,0 @@
-"""Initialise scripts that will be accessible globally."""
diff --git a/gn_libs/scripts/worker.py b/gn_libs/scripts/worker.py
deleted file mode 100644
index 2479aef..0000000
--- a/gn_libs/scripts/worker.py
+++ /dev/null
@@ -1,49 +0,0 @@
-import sys
-import shlex
-import argparse
-import traceback
-
-from gn_libs import jobs, sqlite3
-
-
-def run_job(conn, job_id):
-    """Run the job."""
-    try:
-        pass
-    except:
-        jobs.update_metadata(conn, args.job_id, "status", "error")
-        jobs.push_to_stream(conn, args.job_id, "stderr", traceback.format_exc())
-        return 4
-
-
-def parse_args():
-    """Define and parse CLI args."""
-    parser = argparse.ArgumentParser(
-        prog="GN-Libs Worker",
-        description = (
-            "Generic worker to launch and manage jobs defined with gn-libs"))
-    parser.add_argument(
-        "jobs_db_uri",
-        help="The URI to the SQLite3 database holding the jobs' details")
-    parser.add_argument("job_id", help="The id of the job being processed")
-    return parser.parse_args()
-
-def main():
-    """Entry-point to this program."""
-    args = parse_args()
-    with (sqlite3.connection(args.jobs_db_uri) as conn,
-          sqlite3.cursor(conn) as cursor):
-        job = jobs.job(conn, args.job_id)
-        if job:
-            return run_job(conn, args.job_id)
-
-        jobs.update_metadata(conn, args.job_id, "status", "error")
-        jobs.push_to_stream(conn, args.job_id, "stderr", "Job not found!")
-        return 2
-
-    return 3
-
-
-
-if __name__ == "__main__":
-    sys.exit(main())