"""Generic views and utilities to handle background jobs."""
import uuid
import datetime
import importlib
from typing import Callable
from functools import partial
from werkzeug.wrappers.response import Response
from flask import (
flash,
request,
redirect,
Blueprint,
current_app as app)
from gn_libs import jobs
from gn_libs import sqlite3
from gn_libs.jobs.jobs import JobNotFound
from uploader import session
from uploader.authorisation import require_login
from uploader.flask_extensions import url_for, render_template
# Blueprint on which the generic background-job views in this module are
# registered.
background_jobs_bp = Blueprint("background-jobs", __name__)
# Signature expected of job handlers: called with the job (a dict) and
# returning the response to send back.
HandlerType = Callable[[dict], Response]
def make_datetime_formatter(dtformat: str = "%A, %d %B %Y at %H:%M %Z") -> Callable[[str], str]:
    """Build a function that renders ISO-format datetime strings with `dtformat`.

    The returned callable parses its argument with
    `datetime.datetime.fromisoformat` and formats the result with `strftime`,
    using `dtformat` with its surrounding whitespace removed.
    """
    def __formatter__(val: str) -> str:
        # Parse first, then format: a malformed `val` raises before the
        # format string is touched.
        return datetime.datetime.fromisoformat(val).strftime(dtformat.strip())

    return __formatter__
# Formatter built with `make_datetime_formatter`'s default format; handed to
# the templates in this module as `display_datetime`.
__default_datetime_formatter__ = make_datetime_formatter()
def __default_handler__(_job):
    """Render the generic job-summary page for `_job`."""
    template_context = {
        "job": _job,
        "display_datetime": __default_datetime_formatter__,
    }
    return render_template(
        "background-jobs/job-summary.html", **template_context)
def register_handlers(
        job_type: str,
        success_handler: HandlerType,
        # pylint: disable=[redefined-outer-name]
        error_handler: HandlerType = __default_handler__
        # pylint: disable=[redefined-outer-name]
) -> str:
    """Record the success and error handlers to use for jobs of `job_type`.

    Handlers are stored in the application configuration under the
    "background-jobs" key, as `{job_type: {"success": ..., "error": ...}}`.
    If a (truthy) entry already exists for `job_type` it is left untouched.

    Returns `job_type`.
    """
    registry = app.config.get("background-jobs")
    if not registry:
        # Missing (or empty/falsy) registry: start a fresh one.
        registry = {}
        app.config["background-jobs"] = registry

    if not registry.get(job_type):
        registry[job_type] = {
            "success": success_handler,
            "error": error_handler,
        }

    return job_type
def register_job_handlers(job: dict):
    """Register the handlers named in the job's metadata.

    Reads `job["metadata"]`. When it holds a truthy "success_handler" value,
    that value is treated as a dotted path to a function (for example
    "package.module.function"); the function is imported and registered, via
    `register_handlers`, under the metadata's "job-type". The "error_handler"
    value is loaded the same way; if it is missing or cannot be loaded for any
    reason, `__default_handler__` is registered as the error handler instead.

    Nothing is registered when the metadata has no "success_handler".

    Raises:
        ValueError: if the success-handler path has no module component.
        ImportError/AttributeError: if the success handler cannot be imported.
    """
    # NOTE(review): the dotted paths are read from the job's stored metadata
    # and the named objects are imported and later called; confirm that only
    # trusted code can write these metadata values.
    def __load_handler__(absolute_function_path):
        """Import and return the object at the dotted `absolute_function_path`."""
        _parts = absolute_function_path.split(".")
        app.logger.debug("THE PARTS ARE: %s", _parts)
        # Explicit check rather than `assert`, which is stripped under `-O`.
        if len(_parts) < 2:
            raise ValueError(f"Invalid path: {absolute_function_path}")
        # Import the module by its absolute name. A relative import with the
        # leading components as `package` fails for a two-part path
        # ("module.function") because the package name would be empty.
        module = importlib.import_module(".".join(_parts[:-1]))
        return getattr(module, _parts[-1])

    metadata = job["metadata"]
    if metadata.get("success_handler"):
        _success_handler = __load_handler__(metadata["success_handler"])
        try:
            _error_handler = __load_handler__(metadata["error_handler"])
        except Exception:  # pylint: disable=[broad-exception-caught]
            # Best effort: any failure to load the error handler falls back to
            # the default one, but leave a trace of why.
            app.logger.debug(
                "Could not load the error handler; using the default handler.",
                exc_info=True)
            _error_handler = __default_handler__
        register_handlers(
            metadata["job-type"], _success_handler, _error_handler)
def handler(job: dict, handler_type: str) -> Response:
    """Run the `handler_type` handler registered for the job's type.

    Looks up `app.config["background-jobs"][<job-type>][handler_type]`, where
    the job type is read from `job["metadata"]["job-type"]`, calls what it
    finds with `job`, and returns the result. When nothing is registered at
    any level of that lookup, `__default_handler__` is called instead.

    Note that this *invokes* the handler; it does not return the handler
    function itself.
    """
    _job_type = job["metadata"]["job-type"]
    _registry = app.config.get("background-jobs", {})
    _handler = _registry.get(_job_type, {}).get(handler_type)
    if bool(_handler):
        return _handler(job)
    return __default_handler__(job)
# `handler` with the handler type pre-filled: each of these is called with
# just the job and returns whatever `handler` returns for that type.
error_handler = partial(handler, handler_type="error")
success_handler = partial(handler, handler_type="success")
@background_jobs_bp.route("/status/<uuid:job_id>")
@require_login
def job_status(job_id: uuid.UUID):
    """Show the status of a job, delegating to its handlers once it is done.

    The job's handlers are registered from its metadata, then:
    a "completed" job is passed to the success handler, an "error" or
    "stopped" job to the error handler, and any other status renders the
    generic status page. An unknown job renders the "not found" page.
    """
    with sqlite3.connection(app.config["ASYNCHRONOUS_JOBS_SQLITE_DB"]) as conn:
        try:
            the_job = jobs.job(conn, job_id, fulldetails=True)
            current_status = the_job["metadata"]["status"]
            register_job_handlers(the_job)

            if current_status == "completed":
                return success_handler(the_job)

            if current_status in ("error", "stopped"):
                return error_handler(the_job)

            return render_template(
                "background-jobs/job-status.html",
                job=the_job,
                display_datetime=__default_datetime_formatter__)
        except JobNotFound:
            return render_template("jobs/job-not-found.html", job_id=job_id)
@background_jobs_bp.route("/error/<uuid:job_id>")
@require_login
def job_error(job_id: uuid.UUID):
    """Render the generic error page for a job.

    An unknown job renders the "not found" page instead.
    """
    db_path = app.config["ASYNCHRONOUS_JOBS_SQLITE_DB"]
    with sqlite3.connection(db_path) as conn:
        try:
            failed_job = jobs.job(conn, job_id, fulldetails=True)
            return render_template("jobs/job-error.html", job=failed_job)
        except JobNotFound:
            return render_template("jobs/job-not-found.html", job_id=job_id)
@background_jobs_bp.route("/list")
@require_login
def list_jobs():
    """Render the list of background jobs for the current session's user.

    Jobs are looked up with `jobs.jobs_by_external_id`, using the "user_id"
    from the session's user details as the external ID.
    """
    with sqlite3.connection(app.config["ASYNCHRONOUS_JOBS_SQLITE_DB"]) as conn:
        current_user_id = session.user_details()["user_id"]
        user_jobs = jobs.jobs_by_external_id(conn, current_user_id)
        return render_template(
            "background-jobs/list-jobs.html",
            jobs=user_jobs,
            display_datetime=__default_datetime_formatter__)
@background_jobs_bp.route("/summary/<uuid:job_id>")
@require_login
def job_summary(job_id: uuid.UUID):
    """Render the summary page for a job that has finished.

    A job whose status is not one of "completed", "error" or "stopped" is
    redirected to its status page. An unknown job renders the "not found"
    page.
    """
    with sqlite3.connection(app.config["ASYNCHRONOUS_JOBS_SQLITE_DB"]) as conn:
        try:
            the_job = jobs.job(conn, job_id, fulldetails=True)
            current_status = the_job["metadata"]["status"]

            if current_status not in ("completed", "error", "stopped"):
                # Not finished yet: send the user to the status page.
                status_url = url_for(
                    "background-jobs.job_status", job_id=the_job["job_id"])
                return redirect(status_url)

            return render_template(
                "background-jobs/job-summary.html",
                job=the_job,
                display_datetime=__default_datetime_formatter__)
        except JobNotFound:
            return render_template("jobs/job-not-found.html", job_id=job_id)
@background_jobs_bp.route("/delete/<uuid:job_id>", methods=["GET", "POST"])
@require_login
def delete_single(job_id: uuid.UUID):
    """Delete a single finished job, after confirmation.

    Only jobs whose status is "completed", "error" or "stopped" can be
    deleted; for any other status the user is redirected to the summary page
    with an error message. A GET request renders the confirmation page. A POST
    whose "btn-confirm-delete" field is "delete" deletes the job and redirects
    to the job list; any other value cancels and redirects to the summary
    page. An unknown job renders the "not found" page.
    """
    with sqlite3.connection(app.config["ASYNCHRONOUS_JOBS_SQLITE_DB"]) as conn:
        try:
            the_job = jobs.job(conn, job_id, fulldetails=True)
            is_finished = the_job["metadata"]["status"] in (
                "completed", "error", "stopped")

            if not is_finished:
                flash("We cannot delete a running job.", "alert alert-danger")
                return redirect(url_for(
                    "background-jobs.job_summary", job_id=job_id))

            if request.method == "GET":
                # Ask for confirmation before deleting anything.
                return render_template(
                    "background-jobs/delete-job.html",
                    job=the_job,
                    display_datetime=__default_datetime_formatter__)

            if request.form["btn-confirm-delete"] != "delete":
                flash("Delete cancelled.", "alert alert-info")
                return redirect(url_for(
                    "background-jobs.job_summary", job_id=job_id))

            jobs.delete_job(conn, job_id)
            flash("Job was deleted successfully.", "alert alert-success")
            return redirect(url_for("background-jobs.list_jobs"))
        except JobNotFound:
            return render_template("jobs/job-not-found.html", job_id=job_id)
@background_jobs_bp.route("/stop/<uuid:job_id>", methods=["GET", "POST"])
@require_login
def stop_job(job_id: uuid.UUID):
    """Stop a running job, after confirmation.

    A job whose status is not "running" cannot be stopped; the user is
    redirected to the summary page with an error message. A GET request
    renders the confirmation page. A POST whose "btn-confirm-stop" field is
    "stop" kills the job; any other value cancels. Both POST outcomes redirect
    to the summary page. An unknown job renders the "not found" page.
    """
    with sqlite3.connection(app.config["ASYNCHRONOUS_JOBS_SQLITE_DB"]) as conn:
        try:
            the_job = jobs.job(conn, job_id, fulldetails=True)
            is_running = the_job["metadata"]["status"] == "running"

            if not is_running:
                flash("Cannot stop a job that is not running.", "alert alert-danger")
                return redirect(url_for(
                    "background-jobs.job_summary", job_id=job_id))

            if request.method == "GET":
                # Ask for confirmation before stopping anything.
                return render_template(
                    "background-jobs/stop-job.html",
                    job=the_job,
                    display_datetime=__default_datetime_formatter__)

            if request.form["btn-confirm-stop"] == "stop":
                jobs.kill_job(conn, job_id)
                flash("Job was stopped successfully.", "alert alert-success")
            else:
                flash("Stop cancelled.", "alert alert-info")

            # Either way, the user ends up on the job's summary page.
            return redirect(url_for(
                "background-jobs.job_summary", job_id=job_id))
        except JobNotFound:
            return render_template("jobs/job-not-found.html", job_id=job_id)