path: root/wqflask/scripts/run_external.py
Diffstat (limited to 'wqflask/scripts/run_external.py')
-rw-r--r--  wqflask/scripts/run_external.py  159
1 file changed, 0 insertions, 159 deletions
diff --git a/wqflask/scripts/run_external.py b/wqflask/scripts/run_external.py
deleted file mode 100644
index b44a6db7..00000000
--- a/wqflask/scripts/run_external.py
+++ /dev/null
@@ -1,159 +0,0 @@
-"""
-Run jobs in external processes.
-"""
-
-import os
-import sys
-import shlex
-import argparse
-import traceback
-import subprocess
-from uuid import UUID
-from time import sleep
-from datetime import datetime
-from urllib.parse import urlparse
-from tempfile import TemporaryDirectory
-
-# import psutil
-from redis import Redis
-
-import jobs.jobs as jobs
-
-def print_help(args, parser):
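-    """Print the help text for the given parser."""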
-    print(parser.format_help())
-
-def UUID4(val):
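-    """Parse `val` into a UUID (any version); used as the argparse type for job ids."""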
-    return UUID(val)
-
-def redis_connection(parsed_url):
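-    """Build a Redis client from a parsed URL, decoding responses to strings."""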
-    return Redis.from_url(
-        f"redis://{parsed_url.netloc}{parsed_url.path}", decode_responses=True)
-
-def update_status(redis_conn: Redis, job_id: UUID, value: str):
-    "Update the job's status."
-    redis_conn.hset(jobs.job_namespace(job_id), key="status", value=value)
-
-def __update_stdout_stderr__(
-        redis_conn: Redis, job_id: UUID, bytes_read: bytes, stream: str):
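-    """Append `bytes_read` to the job's stored stdout or stderr field."""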
-    job = jobs.job(redis_conn, job_id)
-    if job.is_nothing():
-        raise jobs.NoSuchJob(job_id)
-
-    job = job.maybe({}, lambda x: x)
-    redis_conn.hset(
-        jobs.job_namespace(job_id), key=stream,
-        value=(job.get(stream, "") + bytes_read.decode("utf-8")))
-
-def set_stdout(redis_conn: Redis, job_id: UUID, bytes_read: bytes):
-    """Set the stdout value for the given job."""
-    job = jobs.job(redis_conn, job_id)
-    if job.is_nothing():
-        raise jobs.NoSuchJob(job_id)
-
-    job = job.maybe({}, lambda x: x)
-    redis_conn.hset(
-        jobs.job_namespace(job_id), key="stdout",
-        value=bytes_read.decode("utf-8"))
-
-def update_stdout(redis_conn: Redis, job_id: UUID, bytes_read: bytes):
-    """Update the stdout value for the given job."""
-    __update_stdout_stderr__(redis_conn, job_id, bytes_read, "stdout")
-
-def update_stderr(redis_conn: Redis, job_id: UUID, bytes_read: bytes):
-    """Update the stderr value for the given job."""
-    __update_stdout_stderr__(redis_conn, job_id, bytes_read, "stderr")
-
-def set_meta(redis_conn: Redis, job_id: UUID, meta_key: str, meta_val: str):
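-    """Set an arbitrary metadata field on the given job's Redis hash."""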
-    job = jobs.job(redis_conn, job_id)
-    if job.is_nothing():
-        raise jobs.NoSuchJob(job_id)
-
-    redis_conn.hset(jobs.job_namespace(job_id), key=meta_key, value=meta_val)
-
-def run_job(redis_conn: Redis, job_id: UUID):
-    """Run the job in an external process."""
-    print(f"THE ARGUMENTS TO RUN_JOB:\n\tConnection: {redis_conn}\n\tJob ID: {job_id}\n")
-
-    the_job = jobs.job(redis_conn, job_id)
-    if the_job.is_nothing():
-        raise jobs.NoSuchJob(job_id)
-
-    with TemporaryDirectory() as tmpdir:
-        stdout_file = f"{tmpdir}/{job_id}.stdout"
-        stderr_file = f"{tmpdir}/{job_id}.stderr"
-        with open(stdout_file, "w+b") as outfl, open(stderr_file, "w+b") as errfl:
-            with subprocess.Popen(
-                    jobs.command(the_job), stdout=outfl,
-                    stderr=errfl) as process:
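-                # Poll once a second, mirroring any stdout produced so far
-                # into Redis until the external process exits.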
-                with open(stdout_file, "rb") as live_out:
-                    # A separate read handle avoids disturbing the file offset
-                    # shared with the child process.
-                    while process.poll() is None:
-                        update_status(redis_conn, job_id, "running")
-                        update_stdout(redis_conn, job_id, live_out.read1())
-                        sleep(1)
-
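-            # The child has exited: record completion and store the full
-            # stdout/stderr captured in the temporary files.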
-            update_status(redis_conn, job_id, "completed")
-            with open(stdout_file, "rb") as outfl, open(stderr_file, "rb") as errfl:
-                set_stdout(redis_conn, job_id, outfl.read())
-                update_stderr(redis_conn, job_id, errfl.read())
-
-            os.remove(stdout_file)
-            os.remove(stderr_file)
-
-    returncode = process.returncode
-    set_meta(redis_conn, job_id, "completion-status",
-             ("success" if returncode == 0 else "error"))
-    set_meta(redis_conn, job_id, "return-code", returncode)
-    return process.returncode
-
-def run_job_parser(parent_parser):
-    parser = parent_parser.add_parser(
-        "run-job",
-        help="run job with given id")
-    parser.add_argument(
-        "job_id", type=UUID4, help="A string representing a UUID4 value.")
-    parser.set_defaults(
-        run=lambda conn, args, parser: run_job(conn, args.job_id))
-
-def add_subparsers(parent_parser, *subparser_fns):
-    sub_parsers = parent_parser.add_subparsers(
-        title="subcommands", description="valid subcommands", required=True)
-    for parser_fn in subparser_fns:
-        parser_fn(sub_parsers)
-
-    return parent_parser
-
-def parse_cli_args():
-    parser = add_subparsers(argparse.ArgumentParser(
-        description=sys.modules[__name__].__doc__.strip()), run_job_parser)
-    parser.add_argument(
-        "--redis-uri", required=True,
-        help=(
-            "URI to use to connect to job management db."
-            "The URI should be of the form "
-            "'<scheme>://<user>:<passwd>@<host>:<port>/<path>'"),
-        type=urlparse)
-    return parser, parser.parse_args()
-
-def launch_manager():
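-    """Parse the CLI arguments and run the selected subcommand."""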
-    parser, args = parse_cli_args()
-    with redis_connection(args.redis_uri) as conn:
-        try:
-            return args.run(conn, args, parser)
-        except Exception as exc:
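-            # Append the failure (with traceback) to the manager's "stderr"
-            # field in Redis so it can be inspected later.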
-            prev_msg = (
-                conn.hget(f"{jobs.JOBS_NAMESPACE}:manager", key="stderr") or "")
-            if prev_msg:
-                prev_msg = f"{prev_msg}\n"
-
-            errormsg = (
-                f"{prev_msg}"
-                f"{datetime.now().isoformat()}: {type(exc).__name__}: {traceback.format_exc()}")
-            conn.hset(
-                f"{jobs.JOBS_NAMESPACE}:manager",
-                key="stderr",
-                value=errormsg)
-
-if __name__ == "__main__":
-    sys.exit(launch_manager())