about summary refs log tree commit diff
path: root/gn_libs/jobs/launcher.py
diff options
context:
space:
mode:
Diffstat (limited to 'gn_libs/jobs/launcher.py')
-rw-r--r--gn_libs/jobs/launcher.py26
1 files changed, 24 insertions, 2 deletions
diff --git a/gn_libs/jobs/launcher.py b/gn_libs/jobs/launcher.py
index 0b14f98..89884b6 100644
--- a/gn_libs/jobs/launcher.py
+++ b/gn_libs/jobs/launcher.py
@@ -2,6 +2,7 @@ import os
 import sys
 import time
 import shlex
+import logging
 import argparse
 import traceback
 import subprocess
@@ -10,15 +11,19 @@ from pathlib import Path
 
 from gn_libs import jobs, sqlite3
 
+logger = logging.getLogger(__name__)
+
 
 def run_job(conn, job, outputs_directory: Path):
     """Run the job."""
+    logger.info("Setting up the job.")
     job_id = job["job_id"]
     stdout_file = outputs_directory.joinpath(f"{job_id}.stdout")
     stderr_file = outputs_directory.joinpath(f"{job_id}.stderr")
     jobs.update_metadata(conn, job_id, "stdout-file", str(stdout_file))
     jobs.update_metadata(conn, job_id, "stderr-file", str(stderr_file))
     try:
+        logger.info("Launching the job in a separate process.")
         with (stdout_file.open(mode="w") as outfile,
               stderr_file.open(mode="w") as errfile,
               stdout_file.open(mode="r") as stdout_in,
@@ -37,12 +42,21 @@ def run_job(conn, job, outputs_directory: Path):
             # Fetch any remaining content.
             jobs.push_to_stream(conn, job_id, "stdout", stdout_in.read())
             jobs.push_to_stream(conn, job_id, "stderr", stderr_in.read())
+            logger.info("Job completed. Cleaning up.")
 
         os.remove(stdout_file)
         os.remove(stderr_file)
-        jobs.update_metadata(conn, job_id, "status", "completed")
-        return process.poll()
+        exit_status = process.poll()
+        if exit_status == 0:
+            jobs.update_metadata(conn, job_id, "status", "completed")
+        else:
+            jobs.update_metadata(conn, job_id, "status", "error")
+
+        logger.info("exiting job manager/launcher")
+        return exit_status
     except:
+        logger.error("An exception was raised when attempting to run the job",
+                     exc_info=True)
         jobs.update_metadata(conn, job_id, "status", "error")
         jobs.push_to_stream(conn, job_id, "stderr", traceback.format_exc())
         return 4
@@ -62,11 +76,19 @@ def parse_args():
     parser.add_argument("outputs_directory",
                         help="Directory where output files will be created",
                         type=Path)
+    parser.add_argument(
+        "--log-level",
+        type=str,
+        help="Determines what is logged out.",
+        choices=("debug", "info", "warning", "error", "critical"),
+        default="info")
     return parser.parse_args()
 
+
 def main():
     """Entry-point to this program."""
     args = parse_args()
+    logger.setLevel(args.log_level.upper())
     args.outputs_directory.mkdir(parents=True, exist_ok=True)
     with (sqlite3.connection(args.jobs_db_uri) as conn,
           sqlite3.cursor(conn) as cursor):