aboutsummaryrefslogtreecommitdiff
path: root/scripts/worker.py
blob: 0eb9ea5ddb1d519e42ddfc5bc0884748ff08d704 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
"Generic worker script that runs actual worker script"
import os
import sys
import shlex
import argparse
import traceback
import subprocess
from time import sleep
from datetime import timedelta
from tempfile import TemporaryDirectory

from redis import Redis

from qc_app import jobs
from qc_app.check_connections import check_redis

def parse_args():
    """Build the CLI parser, parse sys.argv, verify the Redis URL is reachable,
    and return the parsed namespace (redisurl, redisprefix, job_id)."""
    cli_parser = argparse.ArgumentParser(
        prog="worker", description = (
            "Generic worker to launch and manage specific worker scripts"))
    for name, options in (
            ("redisurl", {"default": "redis:///",
                          "help": "URL to the redis server"}),
            ("redisprefix", {"type": str,
                             "help": "The prefix before the job ID."}),
            ("job_id", {"help": "The id of the job being processed"})):
        cli_parser.add_argument(name, **options)

    arguments = cli_parser.parse_args()
    # Fail fast if the redis server is unreachable.
    check_redis(arguments.redisurl)
    return arguments

def run_job(rconn: Redis, job: dict, redisprefix: str):
    """Run the job's command in a subprocess, streaming its output to Redis.

    While the child process runs, its status is kept at "running" and stdout
    is forwarded to Redis roughly once per second. stderr is buffered to a
    temporary file and pushed to Redis once the process finishes.

    :param rconn: open Redis connection used to record status and output.
    :param job: job description; must contain "jobid" and "command" keys.
    :param redisprefix: prefix used to build the job's Redis key.
    :return: the subprocess's exit code, or 4 on an unexpected error.
    """
    # Extract the job ID before the try block so the except handler below can
    # always reference it (the original risked a NameError if this lookup
    # itself failed).
    jobid = job["jobid"]
    try:
        with TemporaryDirectory() as tmpdir:
            stderrpath = f"{tmpdir}/{jobid}.stderr"
            with open(stderrpath, "w+b") as tmpfl:
                with subprocess.Popen(
                        shlex.split(job["command"]), stdout=subprocess.PIPE,
                        stderr=tmpfl) as process:
                    while process.poll() is None:
                        jobs.update_status(rconn, redisprefix, jobid, "running")
                        jobs.update_stdout_stderr(
                            rconn,
                            redisprefix,
                            jobid,
                            process.stdout.read1(),#type: ignore[union-attr]
                            "stdout")
                        sleep(1)

                    # Drain whatever stdout arrived between the last read1()
                    # and process exit; without this, trailing output is
                    # silently lost.
                    remaining = process.stdout.read()#type: ignore[union-attr]
                    if remaining:
                        jobs.update_stdout_stderr(
                            rconn, redisprefix, jobid, remaining, "stdout")

                    jobs.update_status(
                        rconn,
                        redisprefix,
                        jobid,
                        ("error" if process.returncode != 0 else "success"))

            # stderr was redirected to the temp file; forward it in one shot.
            with open(stderrpath, "rb") as stderr:
                jobs.update_stdout_stderr(
                    rconn, redisprefix, jobid, stderr.read(), "stderr")
            # No explicit os.remove needed: TemporaryDirectory cleans up.
        return process.returncode
    except Exception:# pylint: disable=[broad-except]
        # Top-level boundary: record the failure both in Redis and on our own
        # stderr, and signal it with a distinct exit code.
        jobs.update_status(rconn, redisprefix, jobid, "error")
        jobs.update_stdout_stderr(
            rconn, redisprefix, jobid, traceback.format_exc().encode("utf-8"), "stderr")
        print(traceback.format_exc(), file=sys.stderr)
        return 4

def main():
    """Entry point: look up the requested job in Redis and run it.

    :return: the job's exit status; 2 when the job ID does not exist.
    """
    args = parse_args()
    with Redis.from_url(args.redisurl, decode_responses=True) as rconn:
        job = jobs.job(rconn, args.redisprefix, args.job_id)
        if job:
            return run_job(rconn, job, args.redisprefix)

        # Unknown job: record an error entry and let it expire in 2 hours.
        jobs.update_status(rconn, args.redisprefix, args.job_id, "error")
        # BUGFIX: was `args.jobid` (argparse dest is `job_id`), which raised
        # AttributeError whenever the job was missing.
        fqjobid = jobs.job_key(args.redisprefix, args.job_id)
        rconn.hset(fqjobid, "stderr", f"No such job. '{args.job_id}'.")
        rconn.expire(name=fqjobid,
                     time=timedelta(seconds=(2 * 60 * 60)))
        print(f"No such job. '{args.job_id}'.", file=sys.stderr)
        return 2

if __name__ == "__main__":
    # Propagate main()'s return value as the process exit code so callers
    # (e.g. the job scheduler) can detect success/failure.
    sys.exit(main())