aboutsummaryrefslogtreecommitdiff
path: root/gn_auth/jobs.py
diff options
context:
space:
mode:
Diffstat (limited to 'gn_auth/jobs.py')
-rw-r--r--gn_auth/jobs.py61
1 files changed, 61 insertions, 0 deletions
diff --git a/gn_auth/jobs.py b/gn_auth/jobs.py
new file mode 100644
index 0000000..6fc9bba
--- /dev/null
+++ b/gn_auth/jobs.py
@@ -0,0 +1,61 @@
+"""Handle external processes in a consistent manner."""
+import json
+from typing import Any
+from uuid import UUID, uuid4
+from datetime import datetime
+
+from redis import Redis
+from pymonad.either import Left, Right, Either
+
+from gn_auth import json_encoders_decoders as jed
+
+JOBS_NAMESPACE = "GN3_AUTH::JOBS"
+
+class InvalidCommand(Exception):
+ """Raise if the command to run is invalid."""
+
+def job_key(job_id: UUID, namespace_prefix: str = JOBS_NAMESPACE):
+ """Build the namespace key for a specific job."""
+ return f"{namespace_prefix}::{job_id}"
+
+def job(redisconn: Redis, job_id: UUID) -> Either:
+ """Retrive the job details of a job identified by `job_id`."""
+ the_job = redisconn.hgetall(job_key(job_id))
+ if the_job:
+ return Right({
+ key: json.loads(value, object_hook=jed.custom_json_decoder)
+ for key, value in the_job.items()
+ })
+ return Left({
+ "error": "NotFound",
+ "error_description": f"Job '{job_id}' was not found."
+ })
+
+def __command_valid__(job_command: Any) -> Either:
+ if not isinstance(job_command, list):
+ return Left({
+ "error": "InvalidJobCommand",
+ "error_description": "The job command MUST be a list."
+ })
+ if not all((isinstance(val, str) for val in job_command)):
+ return Left({
+ "error": "InvalidJobCommand",
+ "error_description": "All parts of the command MUST be strings."
+ })
+ return Right(job_command)
+
+def create_job(redisconn: Redis, job_details: dict[str, Any]) -> UUID:
+ """Create a new job and put it on Redis."""
+ def __create__(_job_command):
+ job_id = job_details.get("job_id", uuid4())
+ redisconn.hset(job_key(job_id), mapping={
+ key: json.dumps(value, cls=jed.CustomJSONEncoder) for key, value in {
+ **job_details, "job_id": job_id, "created": datetime.now(),
+ "status": "queued"
+ }.items()
+ })
+ return job_id
+ def __raise__(err):
+ raise InvalidCommand(err["error_description"])
+ return __command_valid__(job_details.get("command")).either(
+ __raise__, __create__)