From 560eb051e127fe4b8b93104200fe55512a72038f Mon Sep 17 00:00:00 2001 From: Frederick Muriuki Muriithi Date: Mon, 24 Oct 2022 14:14:17 +0300 Subject: Add external process manager * wqflask/jobs/__init__.py: New jobs module * wqflask/jobs/jobs.py: New jobs module * wqflask/scripts/run_external.py: new external process manager. --- wqflask/jobs/jobs.py | 74 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 74 insertions(+) create mode 100644 wqflask/jobs/jobs.py (limited to 'wqflask/jobs/jobs.py') diff --git a/wqflask/jobs/jobs.py b/wqflask/jobs/jobs.py new file mode 100644 index 00000000..f796fa9a --- /dev/null +++ b/wqflask/jobs/jobs.py @@ -0,0 +1,74 @@ +"""Job management functions""" + +import sys +import json +import shlex +import subprocess +from uuid import UUID, uuid4 + +from redis import Redis +from pymonad.maybe import Maybe + +JOBS_NAMESPACE="gn2:jobs" # The namespace where jobs are kept + +class NoSuchJob(Exception): + """Raised if a given job does not exist""" + + def __init__(self, job_id: UUID): + """Initialise the exception object.""" + super().__init__(f"Could not find a job with the id '{job_id}'.") + +class InvalidJobCommand(Exception): + """Raised if the job command is invalid.""" + + def __init__(self, job_command: list[str]): + """Initialise the exception object.""" + super().__init__(f"The job command '{job_command}' is invalid.") + +def job_namespace(job_id: UUID): + return f"{JOBS_NAMESPACE}:{job_id}" + +def job(redis_conn: Redis, job_id: UUID): + job = redis_conn.hgetall(job_namespace(job_id)) + return Maybe(job, bool(job)) + +def status(the_job: Maybe) -> str: + return job.maybe("NOT-FOUND", lambda val: val.get("status", "NOT-FOUND")) + +def command(job: Maybe) -> list[str]: + return job.maybe( + ["NOT-FOUND"], lambda val: shlex.split(val.get("command", "NOT-FOUND"))) + +def __validate_command__(job_command): + try: + assert isinstance(job_command, list), "Not a list" + assert all((isinstance(val, str) for val in job_command)) + assert all((len(val) > 1 for val in job_command)) + except AssertionError as assert_err: + raise InvalidJobCommand(job_command) + +def queue(redis_conn: Redis, job: dict) -> UUID: + command = job["command"] + __validate_command__(command) + job_id = uuid4() + redis_conn.hset( + name=job_namespace(job_id), + mapping={"job_id": str(job_id), **job, "command": shlex.join(command)}) + return job_id + +def run(job_id: UUID, redis_uri: str): + command = [ + sys.executable, "-m", "scripts.run_external", + f"--redis-uri={redis_uri}", "run-job", str(job_id)] + print(f"COMMAND: {shlex.join(command)}") + subprocess.Popen(command) + +def completed_successfully(job): + return ( + job.get("status") == "completed" and + job.get("completion-status") == "success") + +def completed_erroneously(job): + return ( + job.get("status") == "completed" and + job.get("completion-status") == "error") -- cgit v1.2.3