1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
|
"""This module contains gn2 decorators"""
import json
import requests
from functools import wraps
from urllib.parse import urljoin
from typing import Dict, Callable
from flask import g, flash, request, url_for, redirect, current_app
from gn3.authentication import AdminRole
from gn2.wqflask.oauth2 import client
from gn2.wqflask.oauth2.session import session_info
from gn2.wqflask.oauth2.checks import user_logged_in
from gn2.wqflask.oauth2.request_utils import process_error
def login_required(pagename: str = ""):
    """Build a decorator that restricts an endpoint to logged-in users.

    Anonymous visitors get an "alert-warning" flash message and are
    redirected to "/". When `pagename` is non-empty, its title-cased form is
    named in that message; otherwise a generic message is used.
    """
    def __build_wrap__(func):
        @wraps(func)
        def guarded(*args, **kwargs):
            # Happy path first: authenticated users go straight through.
            if user_logged_in():
                return func(*args, **kwargs)

            if pagename:
                warning = ("You need to be logged in to access the "
                           f"'{pagename.title()}' page.")
            else:
                warning = "You need to be logged in to access that page."
            flash(warning, "alert-warning")
            return redirect("/")
        return guarded
    return __build_wrap__
def edit_admins_access_required(f):
    """Use this for endpoints where ownership of a resource is required.

    The wrapped endpoint reads `resource_id` from its keyword arguments and
    the user id from `g.user_session.record`, then asks the service configured
    under "GN2_PROXY" for the roles the user holds on that resource. Unless
    the highest role listed under "admin" is at least
    `AdminRole.EDIT_ADMINS`, the request is redirected to the
    "no_access_page" endpoint. Any failure while fetching or decoding the
    roles is treated as the user having no admin role.
    """
    @wraps(f)
    def wrap(*args, **kwargs):
        resource_id: str = kwargs.get("resource_id", "")
        response: Dict = {}
        try:
            # Try the bytes key first, then fall back to the str key.
            user_id = ((g.user_session.record.get(b"user_id") or
                        b"").decode("utf-8")
                       or g.user_session.record.get("user_id") or "")
            # NOTE(review): no timeout is set on this request -- confirm
            # whether one should be configured.
            response = json.loads(
                requests.get(
                    urljoin(current_app.config.get("GN2_PROXY"), "available"),
                    # Let requests build the query string so both values are
                    # URL-encoded instead of being spliced in verbatim.
                    params={"resource": resource_id, "user": user_id},
                ).content)
        except Exception:  # pylint: disable=broad-except
            # Fail closed on any lookup/decoding error, but do not swallow
            # SystemExit/KeyboardInterrupt the way a bare `except:` would.
            response = {}
        admin_roles = (response.get("admin")
                       if isinstance(response, dict) else None)
        if not admin_roles:
            # A missing or empty list means no admin role; `max` would raise
            # ValueError on an empty sequence.
            admin_roles = ["not-admin"]
        if max(AdminRole(role) for role in admin_roles) < AdminRole.EDIT_ADMINS:
            return redirect(url_for("no_access_page"))
        return f(*args, **kwargs)
    return wrap
class AuthorisationError(Exception):
    """Raise when there is an authorisation issue.

    Attributes:
        description: Explanation of the authorisation failure.
        user: The user value supplied by the code raising the error.
    """

    def __init__(self, description, user):
        self.description = description
        self.user = user
        # `super().__init__` is already bound to `self`; passing it again
        # would store the exception inside its own `args`, which garbles
        # `str()` output and breaks copying (the args would no longer match
        # this two-parameter constructor).
        super().__init__(description, user)
def required_trait_access(access_levels: tuple[str, ...],
                          dataset_key: str = "dataset_name",
                          trait_key: str = "name") -> Callable:
    """Build a decorator that checks the user's privileges on a trait.

    The dataset and trait names are looked up under `dataset_key` and
    `trait_key`: first in the endpoint's keyword arguments, then in the query
    string, then in the submitted form. They are posted as
    "<dataset>::<trait>" to "auth/data/authorisation", and the decorated
    function only runs when every privilege in `access_levels` appears in the
    "privileges" of the first entry of the reply.

    Raises (when the decorated function is called):
        AuthorisationError: if the dataset or trait name is missing, if the
            authorisation reply is routed to the error handler, or if any
            required privilege is absent.
    """
    # NOTE: the builder is deliberately not wrapped with
    # `wraps(required_trait_access)`; it takes the endpoint, not the factory's
    # arguments, so it must not inherit the factory's metadata.
    def __build_access_checker__(func: Callable):
        @wraps(func)
        def __checker__(*args, **kwargs):
            def __error__(err):
                error = process_error(err)
                raise AuthorisationError(
                    f"{error['error']}: {error['error_description']}",
                    session_info()["user"])

            def __success__(priv_info):
                # An empty reply grants nothing; report it as missing
                # privileges rather than failing with an IndexError.
                granted = priv_info[0]["privileges"] if priv_info else ()
                missing = tuple(f"'{priv}'" for priv in access_levels
                                if priv not in granted)
                if not missing:
                    return func(*args, **kwargs)
                raise AuthorisationError(
                    f"Missing privileges: {', '.join(missing)}",
                    session_info()["user"])

            dataset_name = kwargs.get(
                dataset_key,
                request.args.get(dataset_key, request.form.get(dataset_key, "")))
            if not dataset_name:
                raise AuthorisationError(
                    "DeveloperError: Dataset name not provided. It is needed "
                    "for the authorisation checks.",
                    session_info()["user"])

            trait_name = kwargs.get(
                trait_key,
                request.args.get(trait_key, request.form.get(trait_key, "")))
            if not trait_name:
                raise AuthorisationError(
                    "DeveloperError: Trait name not provided. It is needed for "
                    "the authorisation checks.",
                    session_info()["user"])

            # `either` hands the reply to `__error__` or `__success__`.
            return client.post(
                "auth/data/authorisation",
                json={"traits": [f"{dataset_name}::{trait_name}"]}).either(
                    __error__, __success__)
        return __checker__
    return __build_access_checker__
|