# Source path: root/scripts/worker.py
# Blob: 4077ad1d4e5e96b59c54d9f56af20789d6fb54fb
"Generic worker script that runs actual worker script"
import sys
import shlex
import argparse
import traceback
import subprocess
from time import sleep
from tempfile import TemporaryFile

from redis import Redis

from qc_app import jobs

def parse_args():
    """Parse the command-line arguments and check the redis server is reachable.

    Returns:
        The `argparse.Namespace` holding the `redisurl` and `job_id` values.

    Exits the process with status 1, after printing the traceback to stderr,
    if the redis URL is rejected or the server does not answer a ping.
    """
    # Imported here under an alias so the builtin `ConnectionError` stays
    # usable; redis-py's exception of the same name does not derive from it.
    from redis.exceptions import ConnectionError as RedisConnectionError # pylint: disable=[import-outside-toplevel]

    parser = argparse.ArgumentParser(
        prog="worker", description = (
            "Generic worker to launch and manage specific worker scripts"))
    # NOTE: `redisurl` is a required positional (no `nargs`), so argparse
    # never falls back to the `default` given below.
    parser.add_argument(
        "redisurl", default="redis:///", help="URL to the redis server")
    parser.add_argument("job_id", help="The id of the job being processed")

    args = parser.parse_args()
    try:
        # `Redis.from_url` only builds a lazy client; the ping forces an
        # actual round trip, and the `with` releases the probe connection.
        with Redis.from_url(args.redisurl) as conn:
            conn.ping()
    except (ConnectionError, RedisConnectionError, ValueError):
        print(traceback.format_exc(), file=sys.stderr)
        sys.exit(1)

    return args

def update_stdout_stderr(bytes_read, stream: str, rconn, job_id):
    """Append `bytes_read` to the job's field named by `stream`.

    Args:
        bytes_read: The newly captured output, as bytes (text is accepted too).
        stream: Name of the hash field to extend; callers in this file pass
            "stdout" or "stderr".
        rconn: The redis connection used to read and write the job.
        job_id: The redis key under which the job hash is stored.

    The stored value and the new chunk are both normalised to text before
    being joined: a connection opened with `decode_responses=True` hands back
    `str`, and adding raw bytes to that raises `TypeError`.
    """
    def _as_text(value):
        # Undecodable bytes are replaced rather than aborting the update.
        if isinstance(value, (bytes, bytearray)):
            return bytes(value).decode("utf-8", errors="replace")
        return value

    job = jobs.job(rconn, job_id)
    contents = _as_text(job.get(stream, ""))
    rconn.hset(
        name=job_id,
        key=stream,
        value=contents + _as_text(bytes_read))

def run_job(job, rconn):
    """Run the job's command and mirror its output into redis.

    Args:
        job: Mapping with at least the "job_id" and "command" keys; the
            command string is split with `shlex.split` before being launched.
        rconn: The redis connection passed on to `update_stdout_stderr`.

    Returns:
        The exit status of the launched process.

    Exits the process with status 4, after printing the traceback to stderr,
    if the command cannot be launched.
    """
    job_id = job["job_id"]
    try:
        with TemporaryFile() as tmpfl:
            # stderr is spooled to the temporary file: an unread pipe would
            # stall the child once the pipe buffer fills up.
            with subprocess.Popen(
                    shlex.split(job["command"]), stdout=subprocess.PIPE,
                    stderr=tmpfl) as process:
                while process.poll() is None:
                    update_stdout_stderr(
                        process.stdout.read1(), "stdout", rconn, job_id)
                    sleep(1)

                # Pick up anything written between the last read and exit.
                update_stdout_stderr(
                    process.stdout.read(), "stdout", rconn, job_id)

            # The child wrote through its own descriptor; rewind before reading.
            tmpfl.seek(0)
            update_stdout_stderr(tmpfl.read(), "stderr", rconn, job_id)
        return process.returncode
    except (subprocess.CalledProcessError, OSError):
        # `Popen` reports launch failures (e.g. missing executable) as OSError.
        print(traceback.format_exc(), file=sys.stderr)
        sys.exit(4)

def main():
    """Look up the job named on the command line and run it.

    Returns the value of `run_job` when the job exists, or 2 (after reporting
    on stderr) when the lookup yields nothing.
    """
    cli = parse_args()
    with Redis.from_url(cli.redisurl, decode_responses=True) as rconn:
        the_job = jobs.job(rconn, cli.job_id)
        if not the_job:
            print(f"No such job. '{cli.job_id}'.", file=sys.stderr)
            return 2
        return run_job(the_job, rconn)
    return 3

if __name__ == "__main__":
    # Hand main()'s return value to the interpreter as the exit status.
    raise SystemExit(main())