aboutsummaryrefslogtreecommitdiff
path: root/gn_libs/jobs/launcher.py
blob: d565f9e76f3700749522b191e22a1127ce5b23a4 (about) (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
"""Default launcher/manager script for background jobs."""
import os
import sys
import time
import shlex
import codecs
import logging
import argparse
import traceback
import subprocess
from uuid import UUID
from pathlib import Path

from gn_libs import jobs, sqlite3

# Logger for this module; main() sets its level from the --log-level argument.
logger = logging.getLogger(__name__)


def run_job(conn, job, outputs_directory: Path):
    """Run the job's command in a child process, mirroring its output.

    The child's stdout/stderr are redirected to ``<job_id>.stdout`` and
    ``<job_id>.stderr`` inside `outputs_directory`, and those paths are
    recorded in the job's metadata. While the child runs, the files are polled
    roughly once a second. The job's status is set to "running", and newly
    written text is pushed to the job's "stdout"/"stderr" streams. After the
    child exits, any remaining text is pushed and both files are deleted. The
    status is then set to "completed" (exit status 0) or "error" (anything
    else).

    Args:
        conn: Open connection to the jobs database.
        job: Job record; its "job_id" and "command" keys are read.
        outputs_directory: Directory in which the output files are created.

    Returns:
        The child's exit status. Returns 4 if an exception is raised after the
        output-file paths have been recorded. In that case the status is set
        to "error" and the traceback is pushed to the "stderr" stream.
    """
    logger.info("Setting up the job.")
    job_id = job["job_id"]
    stdout_file = outputs_directory.joinpath(f"{job_id}.stdout")
    stderr_file = outputs_directory.joinpath(f"{job_id}.stderr")
    jobs.update_metadata(conn, job_id, "stdout-file", str(stdout_file))
    jobs.update_metadata(conn, job_id, "stderr-file", str(stderr_file))
    try:
        logger.info("Launching the job in a separate process.")
        with (stdout_file.open(mode="w") as outfile,
              stderr_file.open(mode="w") as errfile,
              # Read back in binary mode and decode incrementally (see below).
              stdout_file.open(mode="rb") as stdout_in,
              stderr_file.open(mode="rb") as stderr_in,
              subprocess.Popen(
                  shlex.split(job["command"]),
                  encoding="utf-8",
                  stdout=outfile,
                  stderr=errfile) as process):
            # A text-mode ``read()`` with no size decodes with final=True. A
            # multi-byte UTF-8 character caught half-written by the child
            # would therefore raise UnicodeDecodeError and fail a healthy job.
            # Incremental decoders hold such partial sequences back until the
            # rest arrives. Undecodable bytes become U+FFFD instead of raising.
            make_decoder = codecs.getincrementaldecoder("utf-8")
            readers = (("stdout", stdout_in, make_decoder(errors="replace")),
                       ("stderr", stderr_in, make_decoder(errors="replace")))

            def push_new_output(final=False):
                # Push the text each file has gained since the previous call.
                for stream, handle, decoder in readers:
                    jobs.push_to_stream(
                        conn, job_id, stream,
                        decoder.decode(handle.read(), final=final))

            while process.poll() is None:
                jobs.update_metadata(conn, job_id, "status", "running")
                push_new_output()
                time.sleep(1)

            # Fetch any remaining content. final=True flushes an incomplete
            # trailing byte sequence (as U+FFFD) instead of holding it back.
            push_new_output(final=True)
            logger.info("Job completed. Cleaning up.")

        os.remove(stdout_file)
        os.remove(stderr_file)
        # Popen's context exit has waited for the child, so this is its
        # final return code.
        exit_status = process.poll()
        if exit_status == 0:
            jobs.update_metadata(conn, job_id, "status", "completed")
        else:
            jobs.update_metadata(conn, job_id, "status", "error")

        logger.info("exiting job manager/launcher")
        return exit_status
    except Exception as _exc:# pylint: disable=[broad-exception-caught]
        logger.error("An exception was raised when attempting to run the job",
                     exc_info=True)
        jobs.update_metadata(conn, job_id, "status", "error")
        jobs.push_to_stream(conn, job_id, "stderr", traceback.format_exc())
        return 4


def parse_args():
    """Build the command-line interface and return the parsed arguments.

    Three positional arguments are expected, in order:
    - the jobs database URI;
    - the job's id, parsed as a UUID;
    - the outputs directory, parsed as a Path.
    ``--log-level`` picks one of debug/info/warning/error/critical and
    defaults to "info".
    """
    cli = argparse.ArgumentParser(
        prog="GN Jobs Launcher",
        description="Generic launcher and manager of jobs defined with gn-libs")

    positional_specs = (
        ("jobs_db_uri",
         {"help": "The URI to the SQLite3 database holding the jobs' details"}),
        ("job_id",
         {"help": "The id of the job being processed", "type": UUID}),
        ("outputs_directory",
         {"help": "Directory where output files will be created",
          "type": Path}),
    )
    for dest, options in positional_specs:
        cli.add_argument(dest, **options)

    cli.add_argument("--log-level",
                     type=str,
                     help="Determines what is logged out.",
                     choices=("debug", "info", "warning", "error", "critical"),
                     default="info")
    return cli.parse_args()


def main():
    """Entry-point to this program.

    Parses the CLI arguments, configures logging, ensures the outputs
    directory exists, then looks up and runs the requested job.

    Returns:
        The value returned by `run_job` when the job exists. Returns 2 when no
        job with the given id is found; in that case its status is set to
        "error" and "Job not found!" is pushed to its "stderr" stream. Returns
        3 if execution leaves the database context without either of those
        returns. NOTE(review): that presumably only happens if
        `sqlite3.connection` suppresses an exception — confirm.
    """
    args = parse_args()
    # With no handler configured, records fall through to logging's
    # "last resort" handler, which only emits WARNING and above. That made
    # --log-level info/debug show nothing. basicConfig() attaches a stderr
    # handler to the root logger, and is a no-op if one is already configured.
    logging.basicConfig()
    logger.setLevel(args.log_level.upper())
    args.outputs_directory.mkdir(parents=True, exist_ok=True)
    with sqlite3.connection(args.jobs_db_uri) as conn:
        job = jobs.job(conn, args.job_id)
        if job:
            return run_job(conn, job, args.outputs_directory)

        jobs.update_metadata(conn, args.job_id, "status", "error")
        jobs.push_to_stream(conn, args.job_id, "stderr", "Job not found!")
        return 2

    return 3



if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    raise SystemExit(main())