about summary refs log tree commit diff
diff options
context:
space:
mode:
authorFrederick Muriuki Muriithi2022-10-24 14:14:17 +0300
committerFrederick Muriuki Muriithi2022-10-28 15:55:30 +0300
commit560eb051e127fe4b8b93104200fe55512a72038f (patch)
tree5d652e2c25f04f54c97230f82b4b1918d4f15f43
parentfcf257bff816703433d10b942f959dbb78f6c5e3 (diff)
downloadgenenetwork2-560eb051e127fe4b8b93104200fe55512a72038f.tar.gz
Add external process manager
* wqflask/jobs/__init__.py: New jobs module
* wqflask/jobs/jobs.py: New jobs module
* wqflask/scripts/run_external.py: New external process manager.
-rw-r--r--wqflask/jobs/__init__.py0
-rw-r--r--wqflask/jobs/jobs.py74
-rw-r--r--wqflask/scripts/run_external.py159
3 files changed, 233 insertions, 0 deletions
diff --git a/wqflask/jobs/__init__.py b/wqflask/jobs/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/wqflask/jobs/__init__.py
diff --git a/wqflask/jobs/jobs.py b/wqflask/jobs/jobs.py
new file mode 100644
index 00000000..f796fa9a
--- /dev/null
+++ b/wqflask/jobs/jobs.py
@@ -0,0 +1,74 @@
+"""Job management functions"""
+
+import sys
+import json
+import shlex
+import subprocess
+from uuid import UUID, uuid4
+
+from redis import Redis
+from pymonad.maybe import Maybe
+
+JOBS_NAMESPACE="gn2:jobs" # The namespace where jobs are kept
+
class NoSuchJob(Exception):
    """Signal that no job exists in redis for a requested job id."""

    def __init__(self, job_id: UUID):
        """Build the exception with a message naming the missing job id."""
        message = f"Could not find a job with the id '{job_id}'."
        super().__init__(message)
+
class InvalidJobCommand(Exception):
    """Signal that a job command failed validation before queuing."""

    def __init__(self, job_command: list[str]):
        """Build the exception with a message quoting the bad command."""
        message = f"The job command '{job_command}' is invalid."
        super().__init__(message)
+
def job_namespace(job_id: UUID):
    """Return the redis key under which the given job's hash is stored."""
    return ":".join((JOBS_NAMESPACE, str(job_id)))
+
def job(redis_conn: Redis, job_id: UUID):
    """Fetch the job's hash from redis, wrapped in a Maybe.

    bool(empty-dict) is False, so a missing job yields a Nothing value
    while an existing job yields a Just wrapping its field mapping.
    """
    job_data = redis_conn.hgetall(job_namespace(job_id))
    return Maybe(job_data, bool(job_data))
+
def status(the_job: "Maybe") -> str:
    """Return the job's status string, or "NOT-FOUND" when the job is
    missing or carries no "status" field.

    BUG FIX: the original body read the module-level `job` function
    instead of the `the_job` parameter, so every call raised
    AttributeError ('function' object has no attribute 'maybe').
    """
    return the_job.maybe(
        "NOT-FOUND", lambda val: val.get("status", "NOT-FOUND"))
+
def command(job: Maybe) -> list[str]:
    """Return the job's command as an argv list.

    Yields ["NOT-FOUND"] when the job (or its "command" field) is absent;
    otherwise the stored command string is split shell-style.
    """
    def __split_command__(val):
        return shlex.split(val.get("command", "NOT-FOUND"))
    return job.maybe(["NOT-FOUND"], __split_command__)
+
def __validate_command__(job_command):
    """Raise InvalidJobCommand unless `job_command` is a list of strings,
    each longer than one character.

    BUG FIX: the original used `assert` statements for this validation;
    those are stripped when Python runs with `-O`, silently disabling the
    checks. Explicit conditionals keep the validation in all modes.
    """
    valid = (
        isinstance(job_command, list)
        and all(isinstance(val, str) for val in job_command)
        and all(len(val) > 1 for val in job_command))
    if not valid:
        raise InvalidJobCommand(job_command)
+
def queue(redis_conn: Redis, job: dict) -> UUID:
    """Validate and persist a new job hash in redis, returning its UUID.

    The command list is validated, joined back into a single shell-safe
    string for storage, and stored alongside the rest of the job fields.
    """
    job_command = job["command"]
    __validate_command__(job_command)
    new_id = uuid4()
    job_data = {
        "job_id": str(new_id),
        **job,
        "command": shlex.join(job_command)}
    redis_conn.hset(name=job_namespace(new_id), mapping=job_data)
    return new_id
+
def run(job_id: UUID, redis_uri: str):
    """Launch the external runner script for the given job.

    Fire-and-forget: the Popen handle is not retained; the external
    process reports its progress back through redis.
    """
    runner_cmd = [
        sys.executable, "-m", "scripts.run_external",
        f"--redis-uri={redis_uri}", "run-job", str(job_id)]
    print(f"COMMAND: {shlex.join(runner_cmd)}")
    subprocess.Popen(runner_cmd)
+
def completed_successfully(job):
    """Return True when the job hash records a successful completion."""
    finished = job.get("status") == "completed"
    succeeded = job.get("completion-status") == "success"
    return finished and succeeded
+
def completed_erroneously(job):
    """Return True when the job hash records a failed completion."""
    return all((
        job.get("status") == "completed",
        job.get("completion-status") == "error"))
diff --git a/wqflask/scripts/run_external.py b/wqflask/scripts/run_external.py
new file mode 100644
index 00000000..3cefa033
--- /dev/null
+++ b/wqflask/scripts/run_external.py
@@ -0,0 +1,159 @@
+"""
+Run jobs in external processes.
+"""
+
+import os
+import sys
+import shlex
+import argparse
+import traceback
+import subprocess
+from uuid import UUID
+from time import sleep
+from datetime import datetime
+from urllib.parse import urlparse
+from tempfile import TemporaryDirectory
+
+# import psutil
+from redis import Redis
+
+import jobs.jobs as jobs
+
def print_help(args, parser):
    """Default subcommand handler: print the parser's full help text.

    `args` is accepted (and ignored) so the signature matches the other
    argparse `run=` handlers.
    """
    print(parser.format_help())
+
def UUID4(val):
    """Argparse `type=` callable: parse `val` into a UUID.

    NOTE(review): despite the name, any valid UUID string is accepted —
    the version is not checked; confirm whether v4 enforcement is wanted.
    """
    return UUID(val)
+
def redis_connection(parsed_url):
    """Open a Redis connection from a urlparse()'d URI.

    Rebuilds the URI from netloc + path, so only credentials, host, port
    and database number are honoured; query parameters and scheme from
    the original URI are dropped. decode_responses=True makes all values
    come back as str rather than bytes.
    """
    return Redis.from_url(
        f"redis://{parsed_url.netloc}{parsed_url.path}", decode_responses=True)
+
def update_status(redis_conn: Redis, job_id: UUID, value: str):
    """Set the job's "status" field to `value`; other fields untouched."""
    redis_conn.hset(jobs.job_namespace(job_id), key="status", value=value)
+
def __update_stdout_stderr__(
        redis_conn: Redis, job_id: UUID, bytes_read: bytes, stream: str):
    """Append `bytes_read` (decoded as UTF-8) to the job's `stream` field.

    Raises jobs.NoSuchJob when no job hash exists for `job_id`.
    """
    the_job = jobs.job(redis_conn, job_id)
    if the_job.is_nothing():
        raise jobs.NoSuchJob(job_id)

    job_data = the_job.maybe({}, lambda x: x)
    new_value = job_data.get(stream, "") + bytes_read.decode("utf-8")
    redis_conn.hset(jobs.job_namespace(job_id), key=stream, value=new_value)
+
def set_stdout(redis_conn: Redis, job_id: UUID, bytes_read: bytes):
    """Overwrite (not append to) the stdout value for the given job.

    Raises jobs.NoSuchJob when no job hash exists for `job_id`.

    CLEANUP: the original also extracted the job hash into a local that
    was never used; only the existence check is needed here.
    """
    if jobs.job(redis_conn, job_id).is_nothing():
        raise jobs.NoSuchJob(job_id)

    redis_conn.hset(
        jobs.job_namespace(job_id), key="stdout",
        value=bytes_read.decode("utf-8"))
+
def update_stdout(redis_conn: Redis, job_id:UUID, bytes_read: bytes):
    """Append to the stdout value for the given job (see __update_stdout_stderr__)."""
    __update_stdout_stderr__(redis_conn, job_id, bytes_read, "stdout")
+
def update_stderr(redis_conn: Redis, job_id:UUID, bytes_read: bytes):
    """Append to the stderr value for the given job (see __update_stdout_stderr__)."""
    __update_stdout_stderr__(redis_conn, job_id, bytes_read, "stderr")
+
def set_meta(redis_conn: Redis, job_id: UUID, meta_key: str, meta_val: str):
    """Set a single metadata field on the job's redis hash.

    Raises jobs.NoSuchJob when no job hash exists for `job_id`.
    """
    job = jobs.job(redis_conn, job_id)
    if job.is_nothing():
        raise jobs.NoSuchJob(job_id)

    redis_conn.hset(jobs.job_namespace(job_id), key=meta_key, value=meta_val)
+
def run_job(redis_conn: Redis, job_id: UUID):
    """Run the job in an external process.

    Streams the child's stdout into redis once a second while it runs,
    then records the complete stdout/stderr captures and the completion
    metadata when it exits.

    Raises jobs.NoSuchJob when no job hash exists for `job_id`.
    Returns the child process's exit code.

    BUG FIX: the original stored the literal string "error" as the
    "return-code" metadata; it now stores the child's actual return code.
    """
    print(f"THE ARGUMENTS TO RUN_JOB:\n\tConnection: {redis_conn}\n\tJob ID: {job_id}\n")

    the_job = jobs.job(redis_conn, job_id)
    if the_job.is_nothing():
        raise jobs.NoSuchJob(job_id)

    with TemporaryDirectory() as tmpdir:
        stdout_file = f"{tmpdir}/{job_id}.stdout"
        stderr_file = f"{tmpdir}/{job_id}.stderr"
        with open(stdout_file, "w+b") as outfl, open(stderr_file, "w+b") as errfl:
            with subprocess.Popen(
                    jobs.command(the_job), stdout=outfl,
                    stderr=errfl) as process:
                while process.poll() is None:
                    update_status(redis_conn, job_id, "running")
                    # read1() picks up whatever the child wrote since the
                    # last poll, so callers can watch partial output.
                    update_stdout(redis_conn, job_id, outfl.read1())
                    sleep(1)

        update_status(redis_conn, job_id, "completed")
        # Re-open the (now closed) capture files and replace the streamed
        # stdout with the full capture; record the full stderr.
        with open(stdout_file, "rb") as outfl, open(stderr_file, "rb") as errfl:
            set_stdout(redis_conn, job_id, outfl.read())
            update_stderr(redis_conn, job_id, errfl.read())
        # No explicit os.remove needed: TemporaryDirectory deletes the
        # whole tree on exit.

    returncode = process.returncode
    set_meta(redis_conn, job_id, "completion-status",
             ("success" if returncode == 0 else "error"))
    set_meta(redis_conn, job_id, "return-code", str(returncode))
    return returncode
+
def run_job_parser(parent_parser):
    """Register the `run-job` subcommand on the given subparsers object."""
    run_job_cmd = parent_parser.add_parser(
        "run-job",
        help="run job with given id")
    run_job_cmd.add_argument(
        "job_id", type=UUID4, help="A string representing a UUID4 value.")
    # Dispatched by launch_manager via `args.run(conn, args, parser)`.
    run_job_cmd.set_defaults(
        run=lambda conn, args, parser: run_job(conn, args.job_id))
+
def add_subparsers(parent_parser, *subparser_fns):
    """Attach each subcommand-registration function to `parent_parser`.

    Each `subparser_fn` is called with the subparsers object so it can
    add its own subcommand. Returns `parent_parser` for chaining.

    CLEANUP: removed a stray dead `pass` from the loop body.
    """
    sub_parsers = parent_parser.add_subparsers(
        title="subcommands", description="valid subcommands", required=True)
    for parser_fn in subparser_fns:
        parser_fn(sub_parsers)

    return parent_parser
+
def parse_cli_args():
    """Build the CLI parser and parse sys.argv.

    Returns a (parser, namespace) tuple so callers can reuse the parser
    (e.g. for help output) alongside the parsed arguments.

    NOTE(review): the subcommands are attached before --redis-uri, and
    argparse requires main-parser options to precede the subcommand on
    the command line — this matches the invocation built in jobs.run;
    confirm if other callers exist.
    """
    parser = add_subparsers(argparse.ArgumentParser(
        description=sys.modules[__name__].__doc__.strip()), run_job_parser)
    parser.add_argument(
        "--redis-uri", required=True,
        help=(
            "URI to use to connect to job management db."
            "The URI should be of the form "
            "'<scheme>://<user>:<passwd>@<host>:<port>/<path>'"),
        type=urlparse)
    return parser, parser.parse_args()
+
def launch_manager():
    """Entry point: parse CLI args, connect to redis, run the subcommand.

    Any exception raised by the subcommand handler is logged — with a
    timestamp, exception type and full traceback — by appending it to the
    manager's own "stderr" field in redis, rather than crashing.

    NOTE(review): on failure this falls through and returns None, so the
    process still exits with status 0 — confirm that is intended.
    """
    parser, args = parse_cli_args()
    with redis_connection(args.redis_uri) as conn:
        try:
            return args.run(conn, args, parser)
        except Exception as nsj:
            # Preserve any earlier manager error log, separated by newline.
            prev_msg = (
                conn.hget(f"{jobs.JOBS_NAMESPACE}:manager", key="stderr") or "")
            if bool(prev_msg):
                prev_msg = f"{prev_msg}\n"

            notfoundmsg = (
                f"{prev_msg}"
                f"{datetime.now().isoformat()}: {type(nsj).__name__}: {traceback.format_exc()}")
            conn.hset(
                f"{jobs.JOBS_NAMESPACE}:manager",
                key="stderr",
                value=notfoundmsg)
+
if __name__ == "__main__":
    # Exit with whatever launch_manager returns (None maps to status 0).
    sys.exit(launch_manager())