aboutsummaryrefslogtreecommitdiff
path: root/gn_auth/jobs.py
blob: 8f9f4f0aa1ef02076994ed4cff2d075bab91c8d9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
"""Handle external processes in a consistent manner."""
import json
from typing import Any
from uuid import UUID, uuid4
from datetime import datetime

from redis import Redis
from pymonad.either import Left, Right, Either

from gn_auth import json_encoders_decoders as jed

# Default prefix for the Redis keys under which jobs are stored; `job_key`
# uses it as the default value of its `namespace_prefix` parameter.
JOBS_NAMESPACE = "GN_AUTH::JOBS"

class InvalidCommand(Exception):
    """Signal that the command supplied for a job is not valid."""

def job_key(job_id: UUID, namespace_prefix: str = JOBS_NAMESPACE):
    """Compose the namespaced key for the job identified by `job_id`.

    The key is `namespace_prefix` and `job_id` joined by a double colon
    (`::`). `namespace_prefix` defaults to `JOBS_NAMESPACE`.
    """
    return "{}::{}".format(namespace_prefix, job_id)

def job(redisconn: Redis, job_id: UUID) -> Either:
    """Fetch the stored details of the job identified by `job_id`.

    The job is read as a hash from `redisconn` under the key built by
    `job_key(job_id)`.

    Returns:
        `Right` wrapping a dict of the job's fields, each value decoded from
        JSON with `jed.custom_json_decoder` as the object hook, when the hash
        has any content; otherwise `Left` wrapping a dict with the keys
        "error" and "error_description".
    """
    stored_fields = redisconn.hgetall(job_key(job_id))
    if not stored_fields:
        # An empty result from `hgetall` is treated as "no such job".
        return Left({
            "error": "NotFound",
            "error_description": f"Job '{job_id}' was not found."
        })

    decoded = {}
    for field, raw_value in stored_fields.items():
        decoded[field] = json.loads(
            raw_value, object_hook=jed.custom_json_decoder)
    return Right(decoded)

def __command_valid__(job_command: Any) -> Either:
    """Check that `job_command` has the shape required of a job command.

    A command is accepted only when it is a non-empty list whose elements are
    all strings.

    Returns:
        `Right(job_command)` when the command is acceptable; otherwise `Left`
        wrapping a dict with the keys "error" (always "InvalidJobCommand")
        and "error_description" (the reason the command was rejected).
    """
    if not isinstance(job_command, list):
        return Left({
            "error": "InvalidJobCommand",
            "error_description": "The job command MUST be a list."
        })
    if not job_command:
        # `all(...)` below is vacuously true for an empty list, so without
        # this guard a command with no program to run would be accepted.
        return Left({
            "error": "InvalidJobCommand",
            "error_description": "The job command MUST NOT be empty."
        })
    if not all(isinstance(val, str) for val in job_command):
        return Left({
            "error": "InvalidJobCommand",
            "error_description": "All parts of the command MUST be strings."
        })
    return Right(job_command)

def create_job(redisconn: Redis, job_details: dict[str, Any]) -> UUID:
    """Validate a job's command, then store the job on Redis.

    The value of `job_details["command"]` is checked with
    `__command_valid__`. When it passes, every field of `job_details` is
    JSON-encoded with `jed.CustomJSONEncoder` and written as a hash under
    `job_key(job_id)`, together with these fields, which replace any fields of
    the same name in `job_details`:

    - "job_id": the caller-supplied `job_details["job_id"]` if that key is
      present, otherwise a freshly generated `uuid4()`;
    - "created": `datetime.now()` (a naive, local-time timestamp);
    - "status": "queued".

    Returns:
        The identifier of the stored job.

    Raises:
        InvalidCommand: if the command fails validation; the message is the
            validator's "error_description".
    """
    def __fail__(error):
        raise InvalidCommand(error["error_description"])

    def __store__(_validated_command):
        # The validated command is not used directly: it is already part of
        # `job_details` and is stored with the other fields.
        new_id = job_details.get("job_id", uuid4())
        record = {
            **job_details,
            "job_id": new_id,
            "created": datetime.now(),
            "status": "queued"
        }
        encoded = {
            field: json.dumps(item, cls=jed.CustomJSONEncoder)
            for field, item in record.items()
        }
        redisconn.hset(job_key(new_id), mapping=encoded)
        return new_id

    validation = __command_valid__(job_details.get("command"))
    return validation.either(__fail__, __store__)