aboutsummaryrefslogtreecommitdiff
path: root/gn3/jobs.py
blob: 1af63f7aaed3c57a6fea537c0c7e26c383b83b44 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
"""Handle external processes in a consistent manner."""
import json
from typing import Any
from uuid import UUID, uuid4
from datetime import datetime

from redis import Redis
from pymonad.either import Left, Right, Either

from gn3 import json_encoders_decoders as jed

JOBS_NAMESPACE = "GN3::JOBS"

class InvalidCommand(Exception):
    """Signal that a job's command failed validation and cannot be queued."""

def job_key(job_id: UUID, namespace_prefix: str = JOBS_NAMESPACE):
    """Return the Redis key for a job: `<namespace_prefix>::<job_id>`.

    `namespace_prefix` defaults to the module-wide `JOBS_NAMESPACE`.
    """
    separator = "::"
    return f"{namespace_prefix}{separator}{job_id}"

def job(redisconn: Redis, job_id: UUID) -> Either:
    """Retrieve the details of the job identified by `job_id`.

    The job is read with `hgetall` from the key built by `job_key(job_id)`.
    Every field value is decoded from JSON, with `jed.custom_json_decoder`
    as the object hook.

    Returns `Right(<dict of decoded fields>)` when `hgetall` gives back a
    non-empty result, otherwise `Left(<error dict>)` whose "error" is
    "NotFound".
    """
    raw_fields = redisconn.hgetall(job_key(job_id))
    if not raw_fields:
        return Left({
            "error": "NotFound",
            "error_description": f"Job '{job_id}' was not found."
        })

    def __decode__(raw_value):
        return json.loads(raw_value, object_hook=jed.custom_json_decoder)

    return Right({
        field: __decode__(raw_value)
        for field, raw_value in raw_fields.items()
    })

def __command_valid__(job_command: Any) -> Either:
    """Check that `job_command` is a list whose items are all strings.

    Returns `Right(job_command)` when both checks pass, otherwise
    `Left(<error dict>)` whose "error" is "InvalidJobCommand" and whose
    "error_description" says which check failed.

    NOTE: an empty list passes both checks.
    """
    def __invalid__(description):
        return Left({
            "error": "InvalidJobCommand",
            "error_description": description
        })

    if not isinstance(job_command, list):
        return __invalid__("The job command MUST be a list.")
    for part in job_command:
        if not isinstance(part, str):
            return __invalid__("All parts of the command MUST be strings.")
    return Right(job_command)

def create_job(redisconn: Redis, job_details: dict[str, Any]) -> UUID:
    """Validate the job's command, then store the job on Redis.

    The value under "command" in `job_details` is checked with
    `__command_valid__`. On success, the job is written with `hset` to the
    key built by `job_key`, each value being JSON-encoded with
    `jed.CustomJSONEncoder`.

    The stored record is `job_details` plus "job_id" (the one provided, or a
    fresh `uuid4()` if absent), "created" (`datetime.now()`) and "status"
    ("queued"); "created" and "status" replace any values given for those keys.

    Returns the job's identifier.
    Raises `InvalidCommand` if the command fails validation.
    """
    def __reject__(error):
        raise InvalidCommand(error["error_description"])

    def __save__(_valid_command):
        # The validated command is not used directly: it is stored as part
        # of `job_details`.
        the_id = job_details.get("job_id", uuid4())
        record = {
            **job_details,
            "job_id": the_id,
            "created": datetime.now(),
            "status": "queued"
        }
        serialised = {
            field: json.dumps(value, cls=jed.CustomJSONEncoder)
            for field, value in record.items()
        }
        redisconn.hset(job_key(the_id), mapping=serialised)
        return the_id

    validation = __command_valid__(job_details.get("command"))
    return validation.either(__reject__, __save__)