1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
|
"""Handle external processes in a consistent manner."""
import json
from typing import Any
from uuid import UUID, uuid4
from datetime import datetime
from redis import Redis
from pymonad.either import Left, Right, Either
JOBS_NAMESPACE = "GN3::JOBS"
class InvalidCommand(Exception):
"""Raise if the command to run is invalid."""
def job_key(job_id: UUID, namespace_prefix: str = JOBS_NAMESPACE):
"""Build the namespace key for a specific job."""
return f"{namespace_prefix}::{job_id}"
def job(redisconn: Redis, job_id: UUID) -> Either:
"""Retrive the job details of a job identified by `job_id`."""
the_job = redisconn.hgetall(job_key(job_id))
if the_job:
return Right({
**the_job,
"search_results": json.loads(the_job["search_results"])
})
return Left({
"error": "NotFound",
"error_description": f"Job '{job_id}' was not found."
})
def __command_valid__(job_command: Any) -> Either:
if not isinstance(job_command, list):
return Left({
"error": "InvalidJobCommand",
"error_description": "The job command MUST be a list."
})
if not all((isinstance(val, str) for val in job_command)):
return Left({
"error": "InvalidJobCommand",
"error_description": "All parts of the command MUST be strings."
})
return Right(job_command)
def create_job(redisconn: Redis, job_details: dict[str, Any]) -> UUID:
"""Create a new job and put it on Redis."""
def __create__(_job_command):
job_id = job_details.get("job_id", uuid4())
redisconn.hset(job_key(job_id), mapping={
**job_details, "job_id": job_id, "created": datetime.now(), "stdout": "",
"stderr": "", "status": "queued"
})
return job_id
def __raise__(err):
raise InvalidCommand(err["error_description"])
return __command_valid__(job_details.get("command")).either(
__raise__, __create__)
|