about summary refs log tree commit diff
path: root/wqflask/jobs
diff options
context:
space:
mode:
authorFrederick Muriuki Muriithi2022-10-24 14:14:17 +0300
committerFrederick Muriuki Muriithi2022-10-28 15:55:30 +0300
commit560eb051e127fe4b8b93104200fe55512a72038f (patch)
tree5d652e2c25f04f54c97230f82b4b1918d4f15f43 /wqflask/jobs
parentfcf257bff816703433d10b942f959dbb78f6c5e3 (diff)
downloadgenenetwork2-560eb051e127fe4b8b93104200fe55512a72038f.tar.gz
Add external process manager
* wqflask/jobs/__init__.py: New jobs module
* wqflask/jobs/jobs.py: New jobs module
* wqflask/scripts/run_external.py: new external process manager.
Diffstat (limited to 'wqflask/jobs')
-rw-r--r--wqflask/jobs/__init__.py0
-rw-r--r--wqflask/jobs/jobs.py74
2 files changed, 74 insertions, 0 deletions
diff --git a/wqflask/jobs/__init__.py b/wqflask/jobs/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/wqflask/jobs/__init__.py
diff --git a/wqflask/jobs/jobs.py b/wqflask/jobs/jobs.py
new file mode 100644
index 00000000..f796fa9a
--- /dev/null
+++ b/wqflask/jobs/jobs.py
@@ -0,0 +1,74 @@
+"""Job management functions"""
+
+import sys
+import json
+import shlex
+import subprocess
+from uuid import UUID, uuid4
+
+from redis import Redis
+from pymonad.maybe import Maybe
+
+JOBS_NAMESPACE="gn2:jobs" # The namespace where jobs are kept
+
+class NoSuchJob(Exception):
+    """Raised if a given job does not exist"""
+
+    def __init__(self, job_id: UUID):
+        """Initialise the exception object."""
+        super().__init__(f"Could not find a job with the id '{job_id}'.")
+
+class InvalidJobCommand(Exception):
+    """Raised if the job command is invalid."""
+
+    def __init__(self, job_command: list[str]):
+        """Initialise the exception object."""
+        super().__init__(f"The job command '{job_command}' is invalid.")
+
+def job_namespace(job_id: UUID):
+    return f"{JOBS_NAMESPACE}:{job_id}"
+
+def job(redis_conn: Redis, job_id: UUID):
+    job = redis_conn.hgetall(job_namespace(job_id))
+    return Maybe(job, bool(job))
+
+def status(the_job: Maybe) -> str:
+    return job.maybe("NOT-FOUND", lambda val: val.get("status", "NOT-FOUND"))
+
+def command(job: Maybe) -> list[str]:
+    return job.maybe(
+        ["NOT-FOUND"], lambda val: shlex.split(val.get("command", "NOT-FOUND")))
+
+def __validate_command__(job_command):
+    try:
+        assert isinstance(job_command, list), "Not a list"
+        assert all((isinstance(val, str) for val in job_command))
+        assert all((len(val) > 1 for val in job_command))
+    except AssertionError as assert_err:
+        raise InvalidJobCommand(job_command)
+
+def queue(redis_conn: Redis, job: dict) -> UUID:
+    command = job["command"]
+    __validate_command__(command)
+    job_id = uuid4()
+    redis_conn.hset(
+        name=job_namespace(job_id),
+        mapping={"job_id": str(job_id), **job, "command": shlex.join(command)})
+    return job_id
+
+def run(job_id: UUID, redis_uri: str):
+    command = [
+        sys.executable, "-m", "scripts.run_external",
+        f"--redis-uri={redis_uri}", "run-job", str(job_id)]
+    print(f"COMMAND: {shlex.join(command)}")
+    subprocess.Popen(command)
+
+def completed_successfully(job):
+    return (
+        job.get("status") == "completed" and
+        job.get("completion-status") == "success")
+
+def completed_erroneously(job):
+    return (
+        job.get("status") == "completed" and
+        job.get("completion-status") == "error")