"""Generic views and utilities to handle background jobs.""" import uuid import datetime import importlib from typing import Callable from functools import partial from werkzeug.wrappers.response import Response from flask import ( flash, request, redirect, Blueprint, current_app as app) from gn_libs import jobs from gn_libs import sqlite3 from gn_libs.jobs.jobs import JobNotFound from uploader import session from uploader.authorisation import require_login from uploader.flask_extensions import url_for, render_template background_jobs_bp = Blueprint("background-jobs", __name__) HandlerType = Callable[[dict], Response] def make_datetime_formatter(dtformat: str = "%A, %d %B %Y at %H:%M %Z") -> Callable[[str], str]: """Make a datetime formatter with the provided `dtformat`""" def __formatter__(val: str) -> str: dt = datetime.datetime.fromisoformat(val) return dt.strftime(dtformat.strip()) return __formatter__ __default_datetime_formatter__ = make_datetime_formatter() def __default_handler__(_job): return render_template("background-jobs/job-summary.html", job=_job, display_datetime=__default_datetime_formatter__) def register_handlers( job_type: str, success_handler: HandlerType, # pylint: disable=[redefined-outer-name] error_handler: HandlerType = __default_handler__ # pylint: disable=[redefined-outer-name] ) -> str: """Register success and error handlers for each job type.""" if not bool(app.config.get("background-jobs")): app.config["background-jobs"] = {} if not bool(app.config["background-jobs"].get(job_type)): app.config["background-jobs"][job_type] = { "success": success_handler, "error": error_handler } return job_type def register_job_handlers(job: dict): """Related to register handlers above.""" def __load_handler__(absolute_function_path): _parts = absolute_function_path.split(".") app.logger.debug("THE PARTS ARE: %s", _parts) assert len(_parts) > 1, f"Invalid path: {absolute_function_path}" module = importlib.import_module(f".{_parts[-2]}", package=".".join(_parts[0:-2])) return getattr(module, _parts[-1]) metadata = job["metadata"] if metadata.get("success_handler"): _success_handler = __load_handler__(metadata["success_handler"]) try: _error_handler = __load_handler__(metadata["error_handler"]) except Exception as _exc:# pylint: disable=[broad-exception-caught] _error_handler = __default_handler__ register_handlers( metadata["job-type"], _success_handler, _error_handler) def handler(job: dict, handler_type: str) -> HandlerType: """Fetch a handler for the job.""" _job_type = job["metadata"]["job-type"] _handler = app.config.get( "background-jobs", {} ).get( _job_type, {} ).get(handler_type) if bool(_handler): return _handler(job) return __default_handler__(job) error_handler = partial(handler, handler_type="error") success_handler = partial(handler, handler_type="success") @background_jobs_bp.route("/status/") @require_login def job_status(job_id: uuid.UUID): """View the job status.""" with sqlite3.connection(app.config["ASYNCHRONOUS_JOBS_SQLITE_DB"]) as conn: try: job = jobs.job(conn, job_id, fulldetails=True) status = job["metadata"]["status"] register_job_handlers(job) if status in ("error", "stopped"): return error_handler(job) if status == "completed": return success_handler(job) return render_template("background-jobs/job-status.html", job=job, display_datetime=__default_datetime_formatter__) except JobNotFound as _jnf: return render_template("jobs/job-not-found.html", job_id=job_id) @background_jobs_bp.route("/error/") @require_login def job_error(job_id: uuid.UUID): """Handle job errors in a generic manner.""" with sqlite3.connection(app.config["ASYNCHRONOUS_JOBS_SQLITE_DB"]) as conn: try: job = jobs.job(conn, job_id, fulldetails=True) return render_template("jobs/job-error.html", job=job) except JobNotFound as _jnf: return render_template("jobs/job-not-found.html", job_id=job_id) @background_jobs_bp.route("/list") @require_login def list_jobs(): """List background jobs.""" with sqlite3.connection(app.config["ASYNCHRONOUS_JOBS_SQLITE_DB"]) as conn: return render_template( "background-jobs/list-jobs.html", jobs=jobs.jobs_by_external_id( conn, session.user_details()["user_id"]), display_datetime=__default_datetime_formatter__) @background_jobs_bp.route("/summary/") @require_login def job_summary(job_id: uuid.UUID): """Provide a summary for completed jobs.""" with sqlite3.connection(app.config["ASYNCHRONOUS_JOBS_SQLITE_DB"]) as conn: try: job = jobs.job(conn, job_id, fulldetails=True) status = job["metadata"]["status"] if status in ("completed", "error", "stopped"): return render_template("background-jobs/job-summary.html", job=job, display_datetime=__default_datetime_formatter__) return redirect(url_for( "background-jobs.job_status", job_id=job["job_id"])) except JobNotFound as _jnf: return render_template("jobs/job-not-found.html", job_id=job_id) @background_jobs_bp.route("/delete/", methods=["GET", "POST"]) @require_login def delete_single(job_id: uuid.UUID): """Delete a single job.""" with sqlite3.connection(app.config["ASYNCHRONOUS_JOBS_SQLITE_DB"]) as conn: try: job = jobs.job(conn, job_id, fulldetails=True) status = job["metadata"]["status"] if status not in ("completed", "error", "stopped"): flash("We cannot delete a running job.", "alert alert-danger") return redirect(url_for( "background-jobs.job_summary", job_id=job_id)) if request.method == "GET": return render_template("background-jobs/delete-job.html", job=job, display_datetime=__default_datetime_formatter__) if request.form["btn-confirm-delete"] == "delete": jobs.delete_job(conn, job_id) flash("Job was deleted successfully.", "alert alert-success") return redirect(url_for("background-jobs.list_jobs")) flash("Delete cancelled.", "alert alert-info") return redirect(url_for( "background-jobs.job_summary", job_id=job_id)) except JobNotFound as _jnf: return render_template("jobs/job-not-found.html", job_id=job_id) @background_jobs_bp.route("/stop/", methods=["GET", "POST"]) @require_login def stop_job(job_id: uuid.UUID): """Stop a running job.""" with sqlite3.connection(app.config["ASYNCHRONOUS_JOBS_SQLITE_DB"]) as conn: try: job = jobs.job(conn, job_id, fulldetails=True) status = job["metadata"]["status"] if status != "running": flash("Cannot stop a job that is not running.", "alert alert-danger") return redirect(url_for( "background-jobs.job_summary", job_id=job_id)) if request.method == "GET": return render_template("background-jobs/stop-job.html", job=job, display_datetime=__default_datetime_formatter__) if request.form["btn-confirm-stop"] == "stop": jobs.kill_job(conn, job_id) flash("Job was stopped successfully.", "alert alert-success") return redirect(url_for( "background-jobs.job_summary", job_id=job_id)) flash("Stop cancelled.", "alert alert-info") return redirect(url_for( "background-jobs.job_summary", job_id=job_id)) except JobNotFound as _jnf: return render_template("jobs/job-not-found.html", job_id=job_id)