aboutsummaryrefslogtreecommitdiff
path: root/gn2/scripts/run_external.py
blob: 297d17a1378b0bd54ceca7f623398a00c3e4ffd8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
"""
Run jobs in external processes.
"""

import os
import sys
import shlex
import argparse
import traceback
import subprocess
from uuid import UUID
from time import sleep
from datetime import datetime
from urllib.parse import urlparse
from tempfile import TemporaryDirectory

# import psutil
from redis import Redis

import gn2.jobs.jobs as jobs

def print_help(args, parser):
    print(parser.format_help())

def UUID4(val):
    return UUID(val)

def redis_connection(parsed_url):
    return Redis.from_url(
        f"redis://{parsed_url.netloc}{parsed_url.path}", decode_responses=True)

def update_status(redis_conn: Redis, job_id: UUID, value: str):
    "Update the job's status."
    redis_conn.hset(jobs.job_namespace(job_id), key="status", value=value)

def __update_stdout_stderr__(
        redis_conn: Redis, job_id: UUID, bytes_read: bytes, stream: str):
    job = jobs.job(redis_conn, job_id)
    if job.is_nothing():
        raise jobs.NoSuchJob(job_id)

    job = job.maybe({}, lambda x: x)
    redis_conn.hset(
        jobs.job_namespace(job_id), key=stream,
        value=(job.get(stream, "") + bytes_read.decode("utf-8")))

def set_stdout(redis_conn: Redis, job_id:UUID, bytes_read: bytes):
    """Set the stdout value for the given job."""
    job = jobs.job(redis_conn, job_id)
    if job.is_nothing():
        raise jobs.NoSuchJob(job_id)

    job = job.maybe({}, lambda x: x)
    redis_conn.hset(
        jobs.job_namespace(job_id), key="stdout",
        value=bytes_read.decode("utf-8"))

def update_stdout(redis_conn: Redis, job_id:UUID, bytes_read: bytes):
    """Update the stdout value for the given job."""
    __update_stdout_stderr__(redis_conn, job_id, bytes_read, "stdout")

def update_stderr(redis_conn: Redis, job_id:UUID, bytes_read: bytes):
    """Update the stderr value for the given job."""
    __update_stdout_stderr__(redis_conn, job_id, bytes_read, "stderr")

def set_meta(redis_conn: Redis, job_id: UUID, meta_key: str, meta_val: str):
    job = jobs.job(redis_conn, job_id)
    if job.is_nothing():
        raise jobs.NoSuchJob(job_id)

    redis_conn.hset(jobs.job_namespace(job_id), key=meta_key, value=meta_val)

def run_job(redis_conn: Redis, job_id: UUID):
    """Run the job in an external process."""
    print(f"THE ARGUMENTS TO RUN_JOB:\n\tConnection: {redis_conn}\n\tJob ID: {job_id}\n")

    the_job = jobs.job(redis_conn, job_id)
    if the_job.is_nothing():
        raise jobs.NoSuchJob(job_id)

    with TemporaryDirectory() as tmpdir:
        stdout_file = f"{tmpdir}/{job_id}.stdout"
        stderr_file = f"{tmpdir}/{job_id}.stderr"
        with open(stdout_file, "w+b") as outfl, open(stderr_file, "w+b") as errfl:
            with subprocess.Popen(
                    jobs.command(the_job), stdout=outfl,
                    stderr=errfl) as process:
                while process.poll() is None:
                    update_status(redis_conn, job_id, "running")
                    update_stdout(redis_conn, job_id, outfl.read1())
                    sleep(1)

            update_status(redis_conn, job_id, "completed")
            with open(stdout_file, "rb") as outfl, open(stderr_file, "rb") as errfl:
                set_stdout(redis_conn, job_id, outfl.read())
                update_stderr(redis_conn, job_id, errfl.read())

            os.remove(stdout_file)
            os.remove(stderr_file)

    returncode = process.returncode
    set_meta(redis_conn, job_id, "completion-status",
             ("success" if returncode == 0 else "error"))
    set_meta(redis_conn, job_id, "return-code", returncode)
    return process.returncode

def run_job_parser(parent_parser):
    parser = parent_parser.add_parser(
        "run-job",
        help="run job with given id")
    parser.add_argument(
        "job_id", type=UUID4, help="A string representing a UUID4 value.")
    parser.set_defaults(
        run=lambda conn, args, parser: run_job(conn, args.job_id))

def add_subparsers(parent_parser, *subparser_fns):
    sub_parsers = parent_parser.add_subparsers(
        title="subcommands", description="valid subcommands", required=True)
    for parser_fn in subparser_fns:
        parser_fn(sub_parsers)
        pass

    return parent_parser

def parse_cli_args():
    parser = add_subparsers(argparse.ArgumentParser(
        description=sys.modules[__name__].__doc__.strip()), run_job_parser)
    parser.add_argument(
        "--redis-uri", required=True,
        help=(
            "URI to use to connect to job management db."
            "The URI should be of the form "
            "'<scheme>://<user>:<passwd>@<host>:<port>/<path>'"),
        type=urlparse)
    return parser, parser.parse_args()

def launch_manager():
    parser, args = parse_cli_args()
    with redis_connection(args.redis_uri) as conn:
        try:
            return args.run(conn, args, parser)
        except Exception as nsj:
            prev_msg = (
                conn.hget(f"{jobs.JOBS_NAMESPACE}:manager", key="stderr") or "")
            if bool(prev_msg):
                prev_msg = f"{prev_msg}\n"

            notfoundmsg = (
                f"{prev_msg}"
                f"{datetime.now().isoformat()}: {type(nsj).__name__}: {traceback.format_exc()}")
            conn.hset(
                f"{jobs.JOBS_NAMESPACE}:manager",
                key="stderr",
                value=notfoundmsg)

if __name__ == "__main__":
    def run():
        sys.exit(launch_manager())
    run()