1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
|
"""Generic views and utilities to handle background jobs."""
import uuid
import importlib
from typing import Callable
from functools import partial
from flask import (
url_for,
redirect,
Response,
Blueprint,
render_template,
current_app as app)
from gn_libs import jobs
from gn_libs import sqlite3
from gn_libs.jobs.jobs import JobNotFound
from uploader.authorisation import require_login
# Blueprint on which the job status and job error views in this module are
# routed; its name ("background-jobs") is the prefix used with `url_for`.
background_jobs_bp = Blueprint("background-jobs", __name__)
# A handler is called with the job (a dict) and returns a Flask `Response`.
HandlerType = Callable[[dict], Response]
def __default_error_handler__(job: dict) -> Response:
    """Redirect to the generic job-error page for the given job."""
    error_page = url_for("background-jobs.job_error", job_id=job["job_id"])
    return redirect(error_page)
def register_handlers(
        job_type: str,
        success_handler: HandlerType,
        # pylint: disable=[redefined-outer-name]
        error_handler: HandlerType = __default_error_handler__
        # pylint: disable=[redefined-outer-name]
) -> str:
    """Record the success/error handlers to use for jobs of `job_type`.

    The handlers are stored in the application configuration under the
    "background-jobs" key. A job type that already has handlers registered
    keeps them: the first registration wins.

    Returns the `job_type` that was passed in.
    """
    registry = app.config.get("background-jobs")
    if not registry:
        # Nothing usable stored yet: start from a fresh registry.
        registry = {}
        app.config["background-jobs"] = registry

    if not registry.get(job_type):
        registry[job_type] = {
            "success": success_handler,
            "error": error_handler
        }

    return job_type
def register_job_handlers(job: dict):
    """Load and register the handlers named in the job's metadata.

    `job["metadata"]` is read for the keys "success_handler",
    "error_handler" and "job-type". The handler values are dotted paths
    of the form `package.module.function`. Nothing is registered when the
    "success_handler" value is empty. If the error handler cannot be loaded
    for any reason, the default error handler is registered instead.

    Raises `ValueError` if the success handler path has no module part, and
    whatever the import machinery raises if the success handler cannot be
    imported.
    """
    def __load_handler__(absolute_function_path):
        """Import and return the object that the dotted path points to."""
        _parts = absolute_function_path.split(".")
        app.logger.debug("THE PARTS ARE: %s", _parts)
        # Explicit check rather than `assert`: assertions vanish under `-O`.
        if len(_parts) < 2:
            raise ValueError(f"Invalid path: {absolute_function_path}")
        # Import by absolute name. A relative import against the parent
        # package fails for two-part paths ("module.function"), because the
        # parent package name is then the empty string.
        module = importlib.import_module(".".join(_parts[:-1]))
        return getattr(module, _parts[-1])

    metadata = job["metadata"]
    if not metadata["success_handler"]:
        return

    _success_handler = __load_handler__(metadata["success_handler"])
    try:
        _error_handler = __load_handler__(metadata["error_handler"])
    except Exception:  # pylint: disable=[broad-exception-caught]
        # Best effort: a missing or unloadable error handler is not fatal.
        app.logger.debug(
            "Could not load the error handler; using the default one.",
            exc_info=True)
        _error_handler = __default_error_handler__

    register_handlers(
        metadata["job-type"], _success_handler, _error_handler)
def handler(job: dict, handler_type: str) -> Response:
    """Run the registered `handler_type` handler for the job.

    The handler is looked up in the application configuration, under the
    "background-jobs" key, by the job's "job-type" metadata value and then
    by `handler_type`. The handler found is called with the job and its
    result is returned.

    Raises `Exception` when no such handler is registered.
    """
    _job_type = job["metadata"]["job-type"]
    _registry = app.config.get("background-jobs", {})
    _handler = _registry.get(_job_type, {}).get(handler_type)
    if _handler:
        return _handler(job)
    raise Exception(# pylint: disable=[broad-exception-raised]
        f"No '{handler_type}' handler registered for job type: {_job_type}")
# `handler` with the handler type pre-bound: `error_handler(job)` and
# `success_handler(job)` run the matching registered handler for the job.
error_handler = partial(handler, handler_type="error")
success_handler = partial(handler, handler_type="success")
@background_jobs_bp.route("/status/<uuid:job_id>")
@require_login
def job_status(job_id: uuid.UUID):
    """Show the status of a job, or hand over to its handlers once done.

    Jobs with status "error" are passed to the error handler, jobs with
    status "completed" to the success handler; any other status renders the
    status page. An unknown job renders the "job not found" page.
    """
    db_path = app.config["ASYNCHRONOUS_JOBS_SQLITE_DB"]
    with sqlite3.connection(db_path) as conn:
        try:
            the_job = jobs.job(conn, job_id, fulldetails=True)
            state = the_job["metadata"]["status"]

            # Handlers must be registered before either can be invoked.
            register_job_handlers(the_job)
            if state == "error":
                response = error_handler(the_job)
            elif state == "completed":
                response = success_handler(the_job)
            else:
                response = render_template(
                    "jobs/job-status.html", job=the_job)
        except JobNotFound:
            response = render_template(
                "jobs/job-not-found.html", job_id=job_id)
        return response
@background_jobs_bp.route("/error/<uuid:job_id>")
@require_login
def job_error(job_id: uuid.UUID):
    """Render the generic error page for a job.

    An unknown job renders the "job not found" page instead.
    """
    db_path = app.config["ASYNCHRONOUS_JOBS_SQLITE_DB"]
    with sqlite3.connection(db_path) as conn:
        try:
            the_job = jobs.job(conn, job_id, fulldetails=True)
            page = render_template("jobs/job-error.html", job=the_job)
        except JobNotFound:
            page = render_template("jobs/job-not-found.html", job_id=job_id)
        return page
|