aboutsummaryrefslogtreecommitdiff
path: root/wqflask/jobs
diff options
context:
space:
mode:
Diffstat (limited to 'wqflask/jobs')
-rw-r--r--wqflask/jobs/__init__.py0
-rw-r--r--wqflask/jobs/jobs.py74
2 files changed, 74 insertions, 0 deletions
diff --git a/wqflask/jobs/__init__.py b/wqflask/jobs/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/wqflask/jobs/__init__.py
diff --git a/wqflask/jobs/jobs.py b/wqflask/jobs/jobs.py
new file mode 100644
index 00000000..f796fa9a
--- /dev/null
+++ b/wqflask/jobs/jobs.py
@@ -0,0 +1,74 @@
+"""Job management functions"""
+
+import sys
+import json
+import shlex
+import subprocess
+from uuid import UUID, uuid4
+
+from redis import Redis
+from pymonad.maybe import Maybe
+
+JOBS_NAMESPACE="gn2:jobs" # The namespace where jobs are kept
+
+class NoSuchJob(Exception):
+ """Raised if a given job does not exist"""
+
+ def __init__(self, job_id: UUID):
+ """Initialise the exception object."""
+ super().__init__(f"Could not find a job with the id '{job_id}'.")
+
+class InvalidJobCommand(Exception):
+ """Raised if the job command is invalid."""
+
+ def __init__(self, job_command: list[str]):
+ """Initialise the exception object."""
+ super().__init__(f"The job command '{job_command}' is invalid.")
+
+def job_namespace(job_id: UUID):
+ return f"{JOBS_NAMESPACE}:{job_id}"
+
+def job(redis_conn: Redis, job_id: UUID):
+ job = redis_conn.hgetall(job_namespace(job_id))
+ return Maybe(job, bool(job))
+
+def status(the_job: Maybe) -> str:
+ return job.maybe("NOT-FOUND", lambda val: val.get("status", "NOT-FOUND"))
+
+def command(job: Maybe) -> list[str]:
+ return job.maybe(
+ ["NOT-FOUND"], lambda val: shlex.split(val.get("command", "NOT-FOUND")))
+
+def __validate_command__(job_command):
+ try:
+ assert isinstance(job_command, list), "Not a list"
+ assert all((isinstance(val, str) for val in job_command))
+ assert all((len(val) > 1 for val in job_command))
+ except AssertionError as assert_err:
+ raise InvalidJobCommand(job_command)
+
+def queue(redis_conn: Redis, job: dict) -> UUID:
+ command = job["command"]
+ __validate_command__(command)
+ job_id = uuid4()
+ redis_conn.hset(
+ name=job_namespace(job_id),
+ mapping={"job_id": str(job_id), **job, "command": shlex.join(command)})
+ return job_id
+
+def run(job_id: UUID, redis_uri: str):
+ command = [
+ sys.executable, "-m", "scripts.run_external",
+ f"--redis-uri={redis_uri}", "run-job", str(job_id)]
+ print(f"COMMAND: {shlex.join(command)}")
+ subprocess.Popen(command)
+
+def completed_successfully(job):
+ return (
+ job.get("status") == "completed" and
+ job.get("completion-status") == "success")
+
+def completed_erroneously(job):
+ return (
+ job.get("status") == "completed" and
+ job.get("completion-status") == "error")