aboutsummaryrefslogtreecommitdiff
path: root/gn2/wqflask/decorators.py
blob: 4fe865c9c398022d39c40ae4a48ff31f65e8d36b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
"""This module contains gn2 decorators"""
import json
import requests
from functools import wraps
from urllib.parse import urljoin
from typing import Dict, Callable

import redis
from flask import g, flash, request, url_for, redirect, current_app

from gn3.authentication import AdminRole
from gn3.authentication import DataRole

from gn2.wqflask.oauth2 import client
from gn2.wqflask.oauth2.session import session_info
from gn2.wqflask.oauth2.checks import user_logged_in
from gn2.wqflask.oauth2.request_utils import process_error


def login_required(pagename: str = ""):
    """Build a decorator that only lets logged-in users reach an endpoint.

    When `user_logged_in()` is falsy, a warning is flashed and the user is
    redirected to "/" instead of the endpoint being called.  If `pagename`
    is given, its title-cased form is named in the flashed message.
    """
    def __decorate__(func):
        @wraps(func)
        def __guarded__(*args, **kwargs):
            # Happy path first: authenticated users go straight through.
            if user_logged_in():
                return func(*args, **kwargs)
            if pagename:
                msg = ("You need to be logged in to access the "
                       f"'{pagename.title()}' page.")
            else:
                msg = "You need to be logged in to access that page."
            flash(msg, "alert-warning")
            return redirect("/")
        return __guarded__
    return __decorate__


def edit_access_required(f):
    """Use this for endpoints where people with admin or edit privileges
    are required.

    The resource is identified by the ``resource-id`` query argument or,
    failing that, by the view's ``resource_id`` keyword argument.  The
    user's roles on that resource are requested from the service configured
    as ``GN2_PROXY``.  The wrapped view is only called when the highest
    role listed under ``data`` is at least ``DataRole.EDIT``; otherwise, or
    when the roles cannot be determined at all, the request is redirected
    to ``no_access_page``.
    """
    @wraps(f)
    def wrap(*args, **kwargs):
        resource_id: str = (request.args.get("resource-id")
                            or kwargs.get("resource_id")
                            or "")
        roles = []
        try:
            # The session record may be keyed by bytes or by str.
            user_id = ((g.user_session.record.get(b"user_id") or
                        b"").decode("utf-8")
                       or g.user_session.record.get("user_id") or "")
            # Let `requests` build the query string so that request-supplied
            # values are URL-encoded and cannot inject extra parameters
            # (e.g. a second `user=`).
            response = json.loads(
                requests.get(
                    urljoin(current_app.config.get("GN2_PROXY"),
                            "available"),
                    params={"resource": resource_id,
                            "user": user_id}).content)
            roles = [DataRole(role)
                     for role in response.get("data", ["no-access"])]
        except Exception:  # pylint: disable=broad-except
            # Fail closed: any problem reading the session, reaching the
            # proxy or parsing its reply means access is denied.
            # KeyboardInterrupt/SystemExit are deliberately not caught.
            current_app.logger.debug(
                "Could not determine data roles for resource %r",
                resource_id, exc_info=True)
            roles = []
        # An empty role list is treated as "no access" rather than letting
        # `max` raise on it.
        if not roles or max(roles) < DataRole.EDIT:
            return redirect(url_for("no_access_page"))
        return f(*args, **kwargs)
    return wrap


def edit_admins_access_required(f):
    """Use this for endpoints where ownership of a resource is required.

    The resource is identified by the view's ``resource_id`` keyword
    argument.  The user's roles on that resource are requested from the
    service configured as ``GN2_PROXY``.  The wrapped view is only called
    when the highest role listed under ``admin`` is at least
    ``AdminRole.EDIT_ADMINS``; otherwise, or when the roles cannot be
    determined at all, the request is redirected to ``no_access_page``.
    """
    @wraps(f)
    def wrap(*args, **kwargs):
        resource_id: str = kwargs.get("resource_id", "")
        roles = []
        try:
            # The session record may be keyed by bytes or by str.
            user_id = ((g.user_session.record.get(b"user_id") or
                        b"").decode("utf-8")
                       or g.user_session.record.get("user_id") or "")
            # Let `requests` build the query string so that request-supplied
            # values are URL-encoded and cannot inject extra parameters
            # (e.g. a second `user=`).
            response = json.loads(
                requests.get(
                    urljoin(current_app.config.get("GN2_PROXY"),
                            "available"),
                    params={"resource": resource_id,
                            "user": user_id}).content)
            roles = [AdminRole(role)
                     for role in response.get("admin", ["not-admin"])]
        except Exception:  # pylint: disable=broad-except
            # Fail closed: any problem reading the session, reaching the
            # proxy or parsing its reply means access is denied.
            # KeyboardInterrupt/SystemExit are deliberately not caught.
            current_app.logger.debug(
                "Could not determine admin roles for resource %r",
                resource_id, exc_info=True)
            roles = []
        # An empty role list is treated as "not admin" rather than letting
        # `max` raise on it.
        if not roles or max(roles) < AdminRole.EDIT_ADMINS:
            return redirect(url_for("no_access_page"))
        return f(*args, **kwargs)
    return wrap

class AuthorisationError(Exception):
    """Raise when there is an authorisation issue.

    Attributes:
        description: Human-readable explanation of what went wrong.
        user: The user the failed authorisation check was performed for.
    """
    def __init__(self, description, user):
        self.description = description
        self.user = user
        # `super().__init__` is already bound: passing `self` again would
        # store the exception inside its own `args`, giving a
        # self-referential repr and breaking copy/pickle reconstruction.
        super().__init__(description, user)

def required_access(access_levels: tuple[str, ...],
                    dataset_key: str = "dataset_name",
                    trait_key: str = "name") -> Callable:
    """Build a decorator that checks the user's privileges on a trait.

    The dataset and trait names are looked up under `dataset_key` and
    `trait_key` in the view's keyword arguments, falling back to the query
    string and then the form data.  They are posted to the
    "auth/data/authorisation" endpoint as "<dataset>::<trait>", and the
    wrapped function is only called if every entry of `access_levels` is
    among the privileges of the first item of the reply.

    Raises:
        AuthorisationError: if either name is missing, if the
            authorisation request reports an error, or if any of the
            required privileges is missing.
    """
    def __build_access_checker__(func: Callable):
        @wraps(func)
        def __checker__(*args, **kwargs):
            def __deny__(description):
                # Every refusal is reported against the current session user.
                raise AuthorisationError(description, session_info()["user"])

            def __lookup__(key):
                # Precedence: view kwargs, then query string, then form data.
                return kwargs.get(
                    key, request.args.get(key, request.form.get(key, "")))

            def __on_error__(err):
                details = process_error(err)
                __deny__(f"{details['error']}: {details['error_description']}")

            def __on_success__(priv_info):
                lacking = tuple(
                    f"'{priv}'" for priv in access_levels
                    if priv not in priv_info[0]["privileges"])
                if lacking:
                    __deny__(f"Missing privileges: {', '.join(lacking)}")
                return func(*args, **kwargs)

            dataset_name = __lookup__(dataset_key)
            if not dataset_name:
                __deny__(
                    "DeveloperError: Dataset name not provided. It is needed "
                    "for the authorisation checks.")

            trait_name = __lookup__(trait_key)
            if not trait_name:
                __deny__(
                    "DeveloperError: Trait name not provided. It is needed for "
                    "the authorisation checks.")

            payload = {"traits": [f"{dataset_name}::{trait_name}"]}
            return client.post(
                "auth/data/authorisation", json=payload).either(
                    __on_error__, __on_success__)
        return __checker__
    return __build_access_checker__