1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
|
"""This module contains gn2 decorators"""
import json
import logging
from functools import wraps
from typing import Dict, Callable
from urllib.parse import urljoin

import redis
import requests
from flask import g, flash, request, url_for, redirect, current_app
from gn3.authentication import AdminRole
from gn3.authentication import DataRole

from gn2.wqflask.oauth2 import client
from gn2.wqflask.oauth2.session import session_info
from gn2.wqflask.oauth2.checks import user_logged_in
from gn2.wqflask.oauth2.request_utils import process_error
def login_required(pagename: str = ""):
    """Build a decorator that only lets logged-in users reach an endpoint.

    When `user_logged_in()` is false, the visitor gets an "alert-warning"
    flash message and is redirected to "/" instead of running the endpoint.
    The message names the page (title-cased) when `pagename` is non-empty.
    """
    def __build_wrap__(func):
        @wraps(func)
        def guarded(*args, **kwargs):
            # Happy path first: a logged-in user goes straight through.
            if user_logged_in():
                return func(*args, **kwargs)

            if pagename:
                message = ("You need to be logged in to access the "
                           f"'{pagename.title()}' page.")
            else:
                message = "You need to be logged in to access that page."
            flash(message, "alert-warning")
            return redirect("/")
        return guarded
    return __build_wrap__
def edit_access_required(f):
    """Use this for endpoints where people with admin or edit privileges
    are required.

    The resource is identified by the ``resource-id`` query argument or,
    failing that, by the ``resource_id`` keyword argument of the endpoint.
    The current user's roles on that resource are requested from the service
    configured under ``GN2_PROXY``. Unless the highest role listed under
    ``"data"`` is at least ``DataRole.EDIT``, the request is redirected to
    the ``no_access_page`` endpoint. Any failure to obtain the roles is
    treated as "no access".
    """
    @wraps(f)
    def wrap(*args, **kwargs):
        resource_id: str = (request.args.get("resource-id")
                            or kwargs.get("resource_id")
                            or "")
        response: Dict = {}
        try:
            # The session record may hold the ID under a bytes or a str key.
            user_id = ((g.user_session.record.get(b"user_id") or
                        b"").decode("utf-8")
                       or g.user_session.record.get("user_id") or "")
            response = json.loads(
                requests.get(
                    urljoin(current_app.config.get("GN2_PROXY"), "available"),
                    # Let `requests` encode the values: `resource_id` can
                    # come straight from the query string, and pasting it
                    # into the URL would let a caller smuggle in extra
                    # parameters (e.g. a second `user=`).
                    params={"resource": resource_id, "user": user_id},
                    # Never hang the request forever on a stuck proxy.
                    timeout=30).content)
        except Exception:  # pylint: disable=broad-except
            # Fail closed: whatever went wrong, no roles means no access.
            logging.getLogger(__name__).warning(
                "Could not fetch access roles for resource %r; "
                "denying access.", resource_id, exc_info=True)
            response = {}
        if not isinstance(response, dict):
            # Valid JSON that is not an object carries no usable roles.
            response = {}
        # A missing, null or empty role list means "no-access"; this also
        # keeps `max` from being called on an empty sequence.
        roles = response.get("data") or ["no-access"]
        if max(DataRole(role) for role in roles) < DataRole.EDIT:
            return redirect(url_for("no_access_page"))
        return f(*args, **kwargs)
    return wrap
def edit_admins_access_required(f):
    """Use this for endpoints where ownership of a resource is required.

    The resource is identified by the ``resource_id`` keyword argument of
    the endpoint. The current user's roles on that resource are requested
    from the service configured under ``GN2_PROXY``. Unless the highest role
    listed under ``"admin"`` is at least ``AdminRole.EDIT_ADMINS``, the
    request is redirected to the ``no_access_page`` endpoint. Any failure to
    obtain the roles is treated as "not an admin".
    """
    @wraps(f)
    def wrap(*args, **kwargs):
        resource_id: str = kwargs.get("resource_id", "")
        response: Dict = {}
        try:
            # The session record may hold the ID under a bytes or a str key.
            user_id = ((g.user_session.record.get(b"user_id") or
                        b"").decode("utf-8")
                       or g.user_session.record.get("user_id") or "")
            response = json.loads(
                requests.get(
                    urljoin(current_app.config.get("GN2_PROXY"), "available"),
                    # Let `requests` encode the values so that neither ID
                    # can inject additional query parameters into the URL.
                    params={"resource": resource_id, "user": user_id},
                    # Never hang the request forever on a stuck proxy.
                    timeout=30).content)
        except Exception:  # pylint: disable=broad-except
            # Fail closed: whatever went wrong, no roles means no access.
            logging.getLogger(__name__).warning(
                "Could not fetch admin roles for resource %r; "
                "denying access.", resource_id, exc_info=True)
            response = {}
        if not isinstance(response, dict):
            # Valid JSON that is not an object carries no usable roles.
            response = {}
        # A missing, null or empty role list means "not-admin"; this also
        # keeps `max` from being called on an empty sequence.
        roles = response.get("admin") or ["not-admin"]
        if max(AdminRole(role) for role in roles) < AdminRole.EDIT_ADMINS:
            return redirect(url_for("no_access_page"))
        return f(*args, **kwargs)
    return wrap
class AuthorisationError(Exception):
    """Raise when there is an authorisation issue.

    Both constructor arguments are kept as attributes:
    `description` is the explanation of what went wrong, and `user` is the
    user value supplied by the code raising the error.
    """
    def __init__(self, description, user):
        self.description = description
        self.user = user
        # `super().__init__` is already bound to this instance. Passing
        # `self` again would put the exception inside its own `args`, which
        # garbles `str(exc)` and breaks copying/pickling (the class would be
        # re-created with three arguments instead of two).
        super().__init__(description, user)
def required_access(access_levels: tuple[str, ...],
                    dataset_key: str = "dataset_name",
                    trait_key: str = "name") -> Callable:
    """Build a decorator that checks the user's privileges on a trait.

    The dataset and trait names are looked up under `dataset_key` and
    `trait_key` in the endpoint's keyword arguments, falling back to the
    query arguments and then to the form data. They are posted to
    "auth/data/authorisation" as "<dataset>::<trait>". The decorated function
    is called only when every privilege in `access_levels` appears in the
    privileges of the first entry of the reply.

    Raises `AuthorisationError` when either name is missing, when the
    authorisation request reports an error, or when privileges are missing.
    """
    def __build_access_checker__(func: Callable):
        @wraps(func)
        def __checker__(*args, **kwargs):
            def __lookup__(key):
                # Precedence: endpoint kwargs, then query args, then form.
                return kwargs.get(
                    key, request.args.get(key, request.form.get(key, "")))

            dataset_name = __lookup__(dataset_key)
            if not dataset_name:
                raise AuthorisationError(
                    "DeveloperError: Dataset name not provided. It is needed "
                    "for the authorisation checks.",
                    session_info()["user"])

            trait_name = __lookup__(trait_key)
            if not trait_name:
                raise AuthorisationError(
                    "DeveloperError: Trait name not provided. It is needed for "
                    "the authorisation checks.",
                    session_info()["user"])

            def __on_error__(err):
                details = process_error(err)
                raise AuthorisationError(
                    f"{details['error']}: {details['error_description']}",
                    session_info()["user"])

            def __on_success__(priv_info):
                # Quoted names of every required privilege the reply lacks.
                missing = tuple(
                    f"'{priv}'" for priv in access_levels
                    if priv not in priv_info[0]["privileges"])
                if missing:
                    raise AuthorisationError(
                        f"Missing privileges: {', '.join(missing)}",
                        session_info()["user"])
                return func(*args, **kwargs)

            reply = client.post(
                "auth/data/authorisation",
                json={"traits": [f"{dataset_name}::{trait_name}"]})
            return reply.either(__on_error__, __on_success__)
        return __checker__
    return __build_access_checker__
|