1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
|
import uuid
import logging
import importlib
from typing import Callable
from functools import partial
from flask import (
request,
url_for,
redirect,
Response,
Blueprint,
render_template,
current_app as app)
from gn_libs import jobs
from gn_libs import sqlite3
from gn_libs.jobs.jobs import JobNotFound
from uploader.authorisation import require_login
background_jobs_bp = Blueprint("background-jobs", __name__)
HandlerType = Callable[[dict], Response]
def __default_error_handler__(job: dict) -> Response:
return redirect(url_for("background-jobs.job_error", job_id=job["job_id"]))
def register_handlers(
job_type: str,
success_handler: HandlerType,
error_handler: HandlerType = __default_error_handler__
) -> str:
if not bool(app.config.get("background-jobs")):
app.config["background-jobs"] = {}
if not bool(app.config["background-jobs"].get(job_type)):
app.config["background-jobs"][job_type] = {
"success": success_handler,
"error": error_handler
}
return job_type
def register_job_handlers(job: str):
"""Related to register handlers above."""
def __load_handler__(absolute_function_path):
_parts = absolute_function_path.split(".")
app.logger.debug("THE PARTS ARE: %s", _parts)
assert len(_parts) > 1, f"Invalid path: {absolute_function_path}"
function = _parts[-1]
module = importlib.import_module(f".{_parts[-2]}",
package=".".join(_parts[0:-2]))
return getattr(module, _parts[-1])
metadata = job["metadata"]
if metadata["success_handler"]:
success_handler = __load_handler__(metadata["success_handler"])
try:
error_handler = __load_handler__(metadata["error_handler"])
except Exception as _exc:
error_handler = __default_error_handler__
register_handlers(metadata["job-type"], success_handler, error_handler)
def handler(job: dict, handler_type: str) -> HandlerType:
"""Fetch a handler for the job."""
_job_type = job["metadata"]["job-type"]
_handler = app.config.get(
"background-jobs", {}
).get(
_job_type, {}
).get(handler_type)
if bool(_handler):
return _handler(job)
raise Exception(
f"No '{handler_type}' handler registered for job type: {_job_type}")
error_handler = partial(handler, handler_type="error")
success_handler = partial(handler, handler_type="success")
@background_jobs_bp.route("/status/<uuid:job_id>")
@require_login
def job_status(job_id: uuid.UUID):
"""View the job status."""
with sqlite3.connection(app.config["ASYNCHRONOUS_JOBS_SQLITE_DB"]) as conn:
try:
job = jobs.job(conn, job_id, fulldetails=True)
status = job["metadata"]["status"]
register_job_handlers(job)
if status == "error":
return error_handler(job)
if status == "completed":
return success_handler(job)
return render_template(f"jobs/job-status.html", job=job)
except JobNotFound as jnf:
return render_template(
"jobs/job-not-found.html",
job_id=job_id)
@background_jobs_bp.route("/error/<uuid:job_id>")
@require_login
def job_error(job_id: uuid.UUID):
with sqlite3.connection(app.config["ASYNCHRONOUS_JOBS_SQLITE_DB"]) as conn:
try:
job = jobs.job(conn, job_id, fulldetails=True)
return render_template("jobs/job-error.html", job=job)
except JobNotFound as jnf:
return render_template("jobs/job-not-found.html", job_id=job_id)
|