diff options
Diffstat (limited to 'wqflask/jobs')
-rw-r--r-- | wqflask/jobs/__init__.py | 0 | ||||
-rw-r--r-- | wqflask/jobs/jobs.py | 74 |
2 files changed, 0 insertions, 74 deletions
diff --git a/wqflask/jobs/__init__.py b/wqflask/jobs/__init__.py deleted file mode 100644 index e69de29b..00000000 --- a/wqflask/jobs/__init__.py +++ /dev/null diff --git a/wqflask/jobs/jobs.py b/wqflask/jobs/jobs.py deleted file mode 100644 index f796fa9a..00000000 --- a/wqflask/jobs/jobs.py +++ /dev/null @@ -1,74 +0,0 @@ -"""Job management functions""" - -import sys -import json -import shlex -import subprocess -from uuid import UUID, uuid4 - -from redis import Redis -from pymonad.maybe import Maybe - -JOBS_NAMESPACE="gn2:jobs" # The namespace where jobs are kept - -class NoSuchJob(Exception): - """Raised if a given job does not exist""" - - def __init__(self, job_id: UUID): - """Initialise the exception object.""" - super().__init__(f"Could not find a job with the id '{job_id}'.") - -class InvalidJobCommand(Exception): - """Raised if the job command is invalid.""" - - def __init__(self, job_command: list[str]): - """Initialise the exception object.""" - super().__init__(f"The job command '{job_command}' is invalid.") - -def job_namespace(job_id: UUID): - return f"{JOBS_NAMESPACE}:{job_id}" - -def job(redis_conn: Redis, job_id: UUID): - job = redis_conn.hgetall(job_namespace(job_id)) - return Maybe(job, bool(job)) - -def status(the_job: Maybe) -> str: - return job.maybe("NOT-FOUND", lambda val: val.get("status", "NOT-FOUND")) - -def command(job: Maybe) -> list[str]: - return job.maybe( - ["NOT-FOUND"], lambda val: shlex.split(val.get("command", "NOT-FOUND"))) - -def __validate_command__(job_command): - try: - assert isinstance(job_command, list), "Not a list" - assert all((isinstance(val, str) for val in job_command)) - assert all((len(val) > 1 for val in job_command)) - except AssertionError as assert_err: - raise InvalidJobCommand(job_command) - -def queue(redis_conn: Redis, job: dict) -> UUID: - command = job["command"] - __validate_command__(command) - job_id = uuid4() - redis_conn.hset( - name=job_namespace(job_id), - mapping={"job_id": str(job_id), **job, "command": shlex.join(command)}) - return job_id - -def run(job_id: UUID, redis_uri: str): - command = [ - sys.executable, "-m", "scripts.run_external", - f"--redis-uri={redis_uri}", "run-job", str(job_id)] - print(f"COMMAND: {shlex.join(command)}") - subprocess.Popen(command) - -def completed_successfully(job): - return ( - job.get("status") == "completed" and - job.get("completion-status") == "success") - -def completed_erroneously(job): - return ( - job.get("status") == "completed" and - job.get("completion-status") == "error") |