1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
|
"""Job management functions"""
import sys
import json
import shlex
import subprocess
from uuid import UUID, uuid4
from redis import Redis
from pymonad.maybe import Maybe
JOBS_NAMESPACE="gn2:jobs" # The namespace where jobs are kept
class NoSuchJob(Exception):
"""Raised if a given job does not exist"""
def __init__(self, job_id: UUID):
"""Initialise the exception object."""
super().__init__(f"Could not find a job with the id '{job_id}'.")
class InvalidJobCommand(Exception):
"""Raised if the job command is invalid."""
def __init__(self, job_command: list[str]):
"""Initialise the exception object."""
super().__init__(f"The job command '{job_command}' is invalid.")
def job_namespace(job_id: UUID):
return f"{JOBS_NAMESPACE}:{job_id}"
def job(redis_conn: Redis, job_id: UUID):
job = redis_conn.hgetall(job_namespace(job_id))
return Maybe(job, bool(job))
def status(the_job: Maybe) -> str:
return job.maybe("NOT-FOUND", lambda val: val.get("status", "NOT-FOUND"))
def command(job: Maybe) -> list[str]:
return job.maybe(
["NOT-FOUND"], lambda val: shlex.split(val.get("command", "NOT-FOUND")))
def __validate_command__(job_command):
try:
assert isinstance(job_command, list), "Not a list"
assert all((isinstance(val, str) for val in job_command))
assert all((len(val) > 1 for val in job_command))
except AssertionError as assert_err:
raise InvalidJobCommand(job_command)
def queue(redis_conn: Redis, job: dict) -> UUID:
command = job["command"]
__validate_command__(command)
job_id = uuid4()
redis_conn.hset(
name=job_namespace(job_id),
mapping={"job_id": str(job_id), **job, "command": shlex.join(command)})
return job_id
def run(job_id: UUID, redis_uri: str):
command = [
sys.executable, "-m", "scripts.run_external",
f"--redis-uri={redis_uri}", "run-job", str(job_id)]
print(f"COMMAND: {shlex.join(command)}")
subprocess.Popen(command)
def completed_successfully(job):
return (
job.get("status") == "completed" and
job.get("completion-status") == "success")
def completed_erroneously(job):
return (
job.get("status") == "completed" and
job.get("completion-status") == "error")
|