aboutsummaryrefslogtreecommitdiff
path: root/gn3/jobs.py
blob: 8854a61469e4c4b7f8320588a8476b0593a1dc20 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
"""Handle external processes in a consistent manner."""
import json
from typing import Any
from uuid import UUID, uuid4
from datetime import datetime

from redis import Redis

from pymonad.either import Left, Right, Either

# Default prefix for the Redis keys built by `job_key`; the job's ID is
# appended to it after a "::" separator.
JOBS_NAMESPACE = "GN3::JOBS"

class InvalidCommand(Exception):
    """Signal that a job's command failed validation and cannot be queued."""

def job_key(job_id: UUID, namespace_prefix: str = JOBS_NAMESPACE):
    """Return the Redis key for the job `job_id`.

    The key is `namespace_prefix` followed by "::" and then the job's ID.
    """
    key_template = "{}::{}"
    return key_template.format(namespace_prefix, job_id)

def job(redisconn: Redis, job_id: UUID) -> Either:
    """Retrieve the details of the job identified by `job_id`.

    Returns `Right(details)` when a hash is stored under the job's key. The
    details are the stored fields, with "search_results" JSON-decoded if that
    field is present. Returns `Left(error)` when nothing is stored under the
    key, where `error` holds the "error" and "error_description" fields.
    """
    the_job = redisconn.hgetall(job_key(job_id))
    if not the_job:
        return Left({
            "error": "NotFound",
            "error_description": f"Job '{job_id}' was not found."
        })
    # Jobs are not required to carry search results, so only decode the field
    # when it exists instead of failing with a KeyError on every other job.
    if "search_results" in the_job:
        return Right({
            **the_job,
            "search_results": json.loads(the_job["search_results"])
        })
    return Right({**the_job})

def __command_valid__(job_command: Any) -> Either:
    """Validate `job_command`: it must be a list whose items are all strings.

    Returns `Right(job_command)` when valid, otherwise a `Left` holding an
    "InvalidJobCommand" error with a description of what is wrong.
    """
    def __invalid__(description):
        return Left({
            "error": "InvalidJobCommand",
            "error_description": description
        })

    if not isinstance(job_command, list):
        return __invalid__("The job command MUST be a list.")
    if any(not isinstance(part, str) for part in job_command):
        return __invalid__("All parts of the command MUST be strings.")
    return Right(job_command)

def _redis_safe(value: Any) -> Any:
    """Convert `value` into something redis-py accepts as a hash value."""
    # `bool` is checked first: it is an `int` subclass, yet redis-py rejects it.
    if isinstance(value, bool):
        return json.dumps(value)
    if isinstance(value, (str, bytes, int, float)):
        return value
    if isinstance(value, UUID):
        return str(value)
    if isinstance(value, datetime):
        return value.isoformat()
    # Lists (e.g. the command), dicts, None, etc. are stored as JSON text.
    return json.dumps(value)

def create_job(redisconn: Redis, job_details: dict[str, Any]) -> UUID:
    """Create a new job and put it on Redis.

    `job_details` must contain a "command" that is a list of strings; if it
    does not, `InvalidCommand` is raised and nothing is written. The job is
    stored as a hash under `job_key(job_id)`, holding `job_details` plus the
    fields "job_id", "created", "stdout", "stderr" and "status" ("queued").
    The job's ID is taken from `job_details["job_id"]` when given, otherwise a
    new UUID is generated.

    redis-py only stores bytes, strings, ints and floats, so the other values
    are converted before writing: UUIDs to their string form, datetimes to
    ISO-8601 text, and everything else (including the command list) to JSON.
    NOTE(review): confirm that whatever consumes these hashes expects the
    command as a JSON-encoded list.

    Returns the job's ID.
    """
    def __create__(_job_command):
        job_id = job_details.get("job_id", uuid4())
        fields = {
            **job_details, "job_id": job_id, "created": datetime.now(),
            "stdout": "", "stderr": "", "status": "queued"
        }
        redisconn.hset(job_key(job_id), mapping={
            name: _redis_safe(value) for name, value in fields.items()
        })
        return job_id
    def __raise__(err):
        raise InvalidCommand(err["error_description"])
    return __command_valid__(job_details.get("command")).either(
        __raise__, __create__)