diff options
author | Frederick Muriuki Muriithi | 2022-10-24 14:14:17 +0300 |
---|---|---|
committer | Frederick Muriuki Muriithi | 2022-10-28 15:55:30 +0300 |
commit | 560eb051e127fe4b8b93104200fe55512a72038f (patch) | |
tree | 5d652e2c25f04f54c97230f82b4b1918d4f15f43 /wqflask | |
parent | fcf257bff816703433d10b942f959dbb78f6c5e3 (diff) | |
download | genenetwork2-560eb051e127fe4b8b93104200fe55512a72038f.tar.gz |
Add external process manager
* wqflask/jobs/__init__.py: New jobs module
* wqflask/jobs/jobs.py: New jobs module
* wqflask/scripts/run_external.py: new external process manager.
Diffstat (limited to 'wqflask')
-rw-r--r-- | wqflask/jobs/__init__.py | 0 | ||||
-rw-r--r-- | wqflask/jobs/jobs.py | 74 | ||||
-rw-r--r-- | wqflask/scripts/run_external.py | 159 |
3 files changed, 233 insertions, 0 deletions
diff --git a/wqflask/jobs/__init__.py b/wqflask/jobs/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/wqflask/jobs/__init__.py diff --git a/wqflask/jobs/jobs.py b/wqflask/jobs/jobs.py new file mode 100644 index 00000000..f796fa9a --- /dev/null +++ b/wqflask/jobs/jobs.py @@ -0,0 +1,74 @@ +"""Job management functions""" + +import sys +import json +import shlex +import subprocess +from uuid import UUID, uuid4 + +from redis import Redis +from pymonad.maybe import Maybe + +JOBS_NAMESPACE="gn2:jobs" # The namespace where jobs are kept + +class NoSuchJob(Exception): + """Raised if a given job does not exist""" + + def __init__(self, job_id: UUID): + """Initialise the exception object.""" + super().__init__(f"Could not find a job with the id '{job_id}'.") + +class InvalidJobCommand(Exception): + """Raised if the job command is invalid.""" + + def __init__(self, job_command: list[str]): + """Initialise the exception object.""" + super().__init__(f"The job command '{job_command}' is invalid.") + +def job_namespace(job_id: UUID): + return f"{JOBS_NAMESPACE}:{job_id}" + +def job(redis_conn: Redis, job_id: UUID): + job = redis_conn.hgetall(job_namespace(job_id)) + return Maybe(job, bool(job)) + +def status(the_job: Maybe) -> str: + return job.maybe("NOT-FOUND", lambda val: val.get("status", "NOT-FOUND")) + +def command(job: Maybe) -> list[str]: + return job.maybe( + ["NOT-FOUND"], lambda val: shlex.split(val.get("command", "NOT-FOUND"))) + +def __validate_command__(job_command): + try: + assert isinstance(job_command, list), "Not a list" + assert all((isinstance(val, str) for val in job_command)) + assert all((len(val) > 1 for val in job_command)) + except AssertionError as assert_err: + raise InvalidJobCommand(job_command) + +def queue(redis_conn: Redis, job: dict) -> UUID: + command = job["command"] + __validate_command__(command) + job_id = uuid4() + redis_conn.hset( + name=job_namespace(job_id), + mapping={"job_id": str(job_id), **job, "command": shlex.join(command)}) + return job_id + +def run(job_id: UUID, redis_uri: str): + command = [ + sys.executable, "-m", "scripts.run_external", + f"--redis-uri={redis_uri}", "run-job", str(job_id)] + print(f"COMMAND: {shlex.join(command)}") + subprocess.Popen(command) + +def completed_successfully(job): + return ( + job.get("status") == "completed" and + job.get("completion-status") == "success") + +def completed_erroneously(job): + return ( + job.get("status") == "completed" and + job.get("completion-status") == "error") diff --git a/wqflask/scripts/run_external.py b/wqflask/scripts/run_external.py new file mode 100644 index 00000000..3cefa033 --- /dev/null +++ b/wqflask/scripts/run_external.py @@ -0,0 +1,159 @@ +""" +Run jobs in external processes. +""" + +import os +import sys +import shlex +import argparse +import traceback +import subprocess +from uuid import UUID +from time import sleep +from datetime import datetime +from urllib.parse import urlparse +from tempfile import TemporaryDirectory + +# import psutil +from redis import Redis + +import jobs.jobs as jobs + +def print_help(args, parser): + print(parser.format_help()) + +def UUID4(val): + return UUID(val) + +def redis_connection(parsed_url): + return Redis.from_url( + f"redis://{parsed_url.netloc}{parsed_url.path}", decode_responses=True) + +def update_status(redis_conn: Redis, job_id: UUID, value: str): + "Update the job's status." + redis_conn.hset(jobs.job_namespace(job_id), key="status", value=value) + +def __update_stdout_stderr__( + redis_conn: Redis, job_id: UUID, bytes_read: bytes, stream: str): + job = jobs.job(redis_conn, job_id) + if job.is_nothing(): + raise jobs.NoSuchJob(job_id) + + job = job.maybe({}, lambda x: x) + redis_conn.hset( + jobs.job_namespace(job_id), key=stream, + value=(job.get(stream, "") + bytes_read.decode("utf-8"))) + +def set_stdout(redis_conn: Redis, job_id:UUID, bytes_read: bytes): + """Set the stdout value for the given job.""" + job = jobs.job(redis_conn, job_id) + if job.is_nothing(): + raise jobs.NoSuchJob(job_id) + + job = job.maybe({}, lambda x: x) + redis_conn.hset( + jobs.job_namespace(job_id), key="stdout", + value=bytes_read.decode("utf-8")) + +def update_stdout(redis_conn: Redis, job_id:UUID, bytes_read: bytes): + """Update the stdout value for the given job.""" + __update_stdout_stderr__(redis_conn, job_id, bytes_read, "stdout") + +def update_stderr(redis_conn: Redis, job_id:UUID, bytes_read: bytes): + """Update the stderr value for the given job.""" + __update_stdout_stderr__(redis_conn, job_id, bytes_read, "stderr") + +def set_meta(redis_conn: Redis, job_id: UUID, meta_key: str, meta_val: str): + job = jobs.job(redis_conn, job_id) + if job.is_nothing(): + raise jobs.NoSuchJob(job_id) + + redis_conn.hset(jobs.job_namespace(job_id), key=meta_key, value=meta_val) + +def run_job(redis_conn: Redis, job_id: UUID): + """Run the job in an external process.""" + print(f"THE ARGUMENTS TO RUN_JOB:\n\tConnection: {redis_conn}\n\tJob ID: {job_id}\n") + + the_job = jobs.job(redis_conn, job_id) + if the_job.is_nothing(): + raise jobs.NoSuchJob(job_id) + + with TemporaryDirectory() as tmpdir: + stdout_file = f"{tmpdir}/{job_id}.stdout" + stderr_file = f"{tmpdir}/{job_id}.stderr" + with open(stdout_file, "w+b") as outfl, open(stderr_file, "w+b") as errfl: + with subprocess.Popen( + jobs.command(the_job), stdout=outfl, + stderr=errfl) as process: + while process.poll() is None: + update_status(redis_conn, job_id, "running") + update_stdout(redis_conn, job_id, outfl.read1()) + sleep(1) + + update_status(redis_conn, job_id, "completed") + with open(stdout_file, "rb") as outfl, open(stderr_file, "rb") as errfl: + set_stdout(redis_conn, job_id, outfl.read()) + update_stderr(redis_conn, job_id, errfl.read()) + + os.remove(stdout_file) + os.remove(stderr_file) + + returncode = process.returncode + set_meta(redis_conn, job_id, "completion-status", + ("success" if returncode == 0 else "error")) + set_meta(redis_conn, job_id, "return-code", "error") + return process.returncode + +def run_job_parser(parent_parser): + parser = parent_parser.add_parser( + "run-job", + help="run job with given id") + parser.add_argument( + "job_id", type=UUID4, help="A string representing a UUID4 value.") + parser.set_defaults( + run=lambda conn, args, parser: run_job(conn, args.job_id)) + +def add_subparsers(parent_parser, *subparser_fns): + sub_parsers = parent_parser.add_subparsers( + title="subcommands", description="valid subcommands", required=True) + for parser_fn in subparser_fns: + parser_fn(sub_parsers) + pass + + return parent_parser + +def parse_cli_args(): + parser = add_subparsers(argparse.ArgumentParser( + description=sys.modules[__name__].__doc__.strip()), run_job_parser) + parser.add_argument( + "--redis-uri", required=True, + help=( + "URI to use to connect to job management db." + "The URI should be of the form " + "'<scheme>://<user>:<passwd>@<host>:<port>/<path>'"), + type=urlparse) + return parser, parser.parse_args() + +def launch_manager(): + parser, args = parse_cli_args() + with redis_connection(args.redis_uri) as conn: + try: + return args.run(conn, args, parser) + except Exception as nsj: + prev_msg = ( + conn.hget(f"{jobs.JOBS_NAMESPACE}:manager", key="stderr") or "") + if bool(prev_msg): + prev_msg = f"{prev_msg}\n" + + notfoundmsg = ( + f"{prev_msg}" + f"{datetime.now().isoformat()}: {type(nsj).__name__}: {traceback.format_exc()}") + conn.hset( + f"{jobs.JOBS_NAMESPACE}:manager", + key="stderr", + value=notfoundmsg) + +if __name__ == "__main__": + def run(): + sys.exit(launch_manager()) + run() |