about summary refs log tree commit diff
path: root/gn2/jobs
diff options
context:
space:
mode:
authorAlexander_Kabui2024-01-02 13:21:07 +0300
committerAlexander_Kabui2024-01-02 13:21:07 +0300
commit70c4201b332e0e2c0d958428086512f291469b87 (patch)
treeaea4fac8782c110fc233c589c3f0f7bd34bada6c /gn2/jobs
parent5092eb42f062b1695c4e39619f0bd74a876cfac2 (diff)
parent965ce5114d585624d5edb082c710b83d83a3be40 (diff)
downloadgenenetwork2-70c4201b332e0e2c0d958428086512f291469b87.tar.gz
merge changes
Diffstat (limited to 'gn2/jobs')
-rw-r--r--gn2/jobs/__init__.py0
-rw-r--r--gn2/jobs/jobs.py74
2 files changed, 74 insertions, 0 deletions
diff --git a/gn2/jobs/__init__.py b/gn2/jobs/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/gn2/jobs/__init__.py
diff --git a/gn2/jobs/jobs.py b/gn2/jobs/jobs.py
new file mode 100644
index 00000000..f796fa9a
--- /dev/null
+++ b/gn2/jobs/jobs.py
@@ -0,0 +1,74 @@
+"""Job management functions"""
+
+import sys
+import json
+import shlex
+import subprocess
+from uuid import UUID, uuid4
+
+from redis import Redis
+from pymonad.maybe import Maybe
+
+JOBS_NAMESPACE="gn2:jobs" # The namespace where jobs are kept
+
+class NoSuchJob(Exception):
+    """Raised if a given job does not exist"""
+
+    def __init__(self, job_id: UUID):
+        """Initialise the exception object."""
+        super().__init__(f"Could not find a job with the id '{job_id}'.")
+
+class InvalidJobCommand(Exception):
+    """Raised if the job command is invalid."""
+
+    def __init__(self, job_command: list[str]):
+        """Initialise the exception object."""
+        super().__init__(f"The job command '{job_command}' is invalid.")
+
+def job_namespace(job_id: UUID):
+    return f"{JOBS_NAMESPACE}:{job_id}"
+
+def job(redis_conn: Redis, job_id: UUID):
+    job = redis_conn.hgetall(job_namespace(job_id))
+    return Maybe(job, bool(job))
+
+def status(the_job: Maybe) -> str:
+    return job.maybe("NOT-FOUND", lambda val: val.get("status", "NOT-FOUND"))
+
+def command(job: Maybe) -> list[str]:
+    return job.maybe(
+        ["NOT-FOUND"], lambda val: shlex.split(val.get("command", "NOT-FOUND")))
+
+def __validate_command__(job_command):
+    try:
+        assert isinstance(job_command, list), "Not a list"
+        assert all((isinstance(val, str) for val in job_command))
+        assert all((len(val) > 1 for val in job_command))
+    except AssertionError as assert_err:
+        raise InvalidJobCommand(job_command)
+
+def queue(redis_conn: Redis, job: dict) -> UUID:
+    command = job["command"]
+    __validate_command__(command)
+    job_id = uuid4()
+    redis_conn.hset(
+        name=job_namespace(job_id),
+        mapping={"job_id": str(job_id), **job, "command": shlex.join(command)})
+    return job_id
+
+def run(job_id: UUID, redis_uri: str):
+    command = [
+        sys.executable, "-m", "scripts.run_external",
+        f"--redis-uri={redis_uri}", "run-job", str(job_id)]
+    print(f"COMMAND: {shlex.join(command)}")
+    subprocess.Popen(command)
+
+def completed_successfully(job):
+    return (
+        job.get("status") == "completed" and
+        job.get("completion-status") == "success")
+
+def completed_erroneously(job):
+    return (
+        job.get("status") == "completed" and
+        job.get("completion-status") == "error")