aboutsummaryrefslogtreecommitdiff
path: root/gn3/auth/authorisation/checks.py
blob: 0825c841cd6e41f23e32ccad890d589362781603 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
"""Functions to check for authorisation."""
from functools import wraps
from typing import Callable

from flask import current_app as app

from gn3.auth import db

from . import privileges as auth_privs
from .errors import AuthorisationError

from ..authentication.oauth2.resource_server import require_oauth

def __system_privileges_in_roles__(conn, user):
    """
    Fetch the IDs of the `system:*` privileges a user holds through roles.

    This is a workaround: at the time of writing, groups are not treated as
    resources, so the system-level privileges (e.g. `system:group:*`) that a
    user gains via the roles assigned to them in
    `group_user_roles_on_resources` have to be looked up separately.

    Parameters:
        conn: database connection handed to `db.cursor`.
        user: object exposing a `user_id` attribute; it is stringified before
            being bound to the query.

    Returns:
        A generator yielding the `privilege_id` of every matching row. The
        rows are fetched before the cursor is released, so the generator can
        be consumed safely afterwards.
    """
    sql = (
        "SELECT DISTINCT p.* FROM users AS u "
        "INNER JOIN group_user_roles_on_resources AS guror "
        "ON u.user_id=guror.user_id "
        "INNER JOIN roles AS r ON guror.role_id=r.role_id "
        "INNER JOIN role_privileges AS rp ON r.role_id=rp.role_id "
        "INNER JOIN privileges AS p ON rp.privilege_id=p.privilege_id "
        "WHERE u.user_id=? AND p.privilege_id LIKE 'system:%'")
    params = (str(user.user_id),)
    with db.cursor(conn) as cur:
        cur.execute(sql, params)
        # Materialise the result set while the cursor is still open.
        fetched = cur.fetchall()
    return (record["privilege_id"] for record in fetched)

def authorised_p(
        privileges: tuple[str, ...],
        error_description: str = (
            "You lack authorisation to perform requested action"),
        oauth2_scope = "profile"):
    """
    Build a decorator that runs the wrapped function only for authorised users.

    On every call of the decorated function, an OAuth2 token is acquired for
    `oauth2_scope` via `require_oauth.acquire`. The privileges of the token's
    user are then loaded from the database configured under
    `app.config["AUTH_DB"]`: those reported by `auth_privs.user_privileges`
    plus the `system:*` privileges held through roles. The wrapped function
    is called only if the user holds *every* privilege in `privileges`.

    Parameters:
        privileges: the privilege IDs the user must hold; must be non-empty.
        error_description: message carried by the `AuthorisationError` raised
            when the check fails.
        oauth2_scope: scope passed to `require_oauth.acquire`.

    Returns:
        A decorator. The decorated function returns whatever the wrapped
        function returns.

    Raises:
        AssertionError: immediately, if `privileges` is empty.
        AuthorisationError: at call time, if the token has no user or the
            user lacks any of the required privileges.
    """
    # Explicit check rather than an `assert` statement: `assert` is stripped
    # when Python runs with -O, and an empty requirement would then authorise
    # every authenticated user. The exception type and message are unchanged.
    if len(privileges) == 0:
        raise AssertionError("You must provide at least one privilege")

    def __build_authoriser__(func: Callable):
        @wraps(func)
        def __authoriser__(*args, **kwargs):
            with require_oauth.acquire(oauth2_scope) as the_token:
                the_user = the_token.user
                if not the_user:
                    raise AuthorisationError(error_description)

                with db.connection(app.config["AUTH_DB"]) as conn:
                    granted = {
                        priv.privilege_id for priv in
                        auth_privs.user_privileges(conn, the_user)}
                    granted.update(
                        __system_privileges_in_roles__(conn, the_user))

                # The wrapped function runs after the database connection is
                # released, but while the token context is still active.
                if all(priv in granted for priv in privileges):
                    return func(*args, **kwargs)

                raise AuthorisationError(error_description)
        return __authoriser__
    return __build_authoriser__