aboutsummaryrefslogtreecommitdiff
path: root/gn2/jobs/jobs.py
blob: dfcdf27497c449e9485313022b119d13d93e51a1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
"""Job management functions"""

import sys
import json
import shlex
import subprocess
from uuid import UUID, uuid4

from redis import Redis
from pymonad.maybe import Maybe

JOBS_NAMESPACE="gn2:jobs" # The namespace where jobs are kept

class NoSuchJob(Exception):
    """Raised if a given job does not exist"""

    def __init__(self, job_id: UUID):
        """Initialise the exception object."""
        super().__init__(f"Could not find a job with the id '{job_id}'.")

class InvalidJobCommand(Exception):
    """Raised if the job command is invalid."""

    def __init__(self, job_command: list[str]):
        """Initialise the exception object."""
        super().__init__(f"The job command '{job_command}' is invalid.")

def job_namespace(job_id: UUID):
    return f"{JOBS_NAMESPACE}:{job_id}"

def job(redis_conn: Redis, job_id: UUID):
    job = redis_conn.hgetall(job_namespace(job_id))
    return Maybe(job, bool(job))

def status(the_job: Maybe) -> str:
    return job.maybe("NOT-FOUND", lambda val: val.get("status", "NOT-FOUND"))

def command(job: Maybe) -> list[str]:
    return job.maybe(
        ["NOT-FOUND"], lambda val: shlex.split(val.get("command", "NOT-FOUND")))

def __validate_command__(job_command):
    try:
        assert isinstance(job_command, list), "Not a list"
        assert all((isinstance(val, str) for val in job_command))
        assert all((len(val) > 1 for val in job_command))
    except AssertionError as assert_err:
        raise InvalidJobCommand(job_command)

def queue(redis_conn: Redis, job: dict) -> UUID:
    command = job["command"]
    __validate_command__(command)
    job_id = uuid4()
    redis_conn.hset(
        name=job_namespace(job_id),
        mapping={"job_id": str(job_id), **job, "command": shlex.join(command)})
    return job_id

def run(job_id: UUID, redis_uri: str):
    command = [
        sys.executable, "-m", "gn2.scripts.run_external",
        f"--redis-uri={redis_uri}", "run-job", str(job_id)]
    print(f"COMMAND: {shlex.join(command)}")
    subprocess.Popen(command)

def completed_successfully(job):
    return (
        job.get("status") == "completed" and
        job.get("completion-status") == "success")

def completed_erroneously(job):
    return (
        job.get("status") == "completed" and
        job.get("completion-status") == "error")