aboutsummaryrefslogtreecommitdiff
path: root/gn3/auth/authorisation/roles.py
blob: e71d427bdf2104d2232c305f5ba6c354afe9d7e9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
"""Handle management of roles"""
from uuid import UUID, uuid4
from functools import reduce
from typing import Iterable, NamedTuple

from gn3.auth import db
from gn3.auth.authentication.users import User
from gn3.auth.authentication.checks import authenticated_p

from .checks import authorised_p
from .privileges import Privilege

class Role(NamedTuple):
    """Class representing a role: creates immutable objects."""
    role_id: UUID
    role_name: str
    privileges: Iterable[Privilege]

@authenticated_p
@authorised_p(("create-role",), error_message="Could not create role")
def create_role(
        cursor: db.DbCursor, role_name: str,
        privileges: Iterable[Privilege]) -> Role:
    """
    Create a new generic role.

    PARAMS:
    * cursor: A database cursor object - This function could be used as part of
              a transaction, hence the use of a cursor rather than a connection
              object.
    * role_name: The name of the role
    * privileges: A 'list' of privileges to assign the new role

    RETURNS: An immutable `gn3.auth.authorisation.roles.Role` object
    """
    role = Role(uuid4(), role_name, privileges)

    cursor.execute(
        "INSERT INTO roles(role_id, role_name) VALUES (?, ?)",
        (str(role.role_id), role.role_name))
    cursor.executemany(
        "INSERT INTO role_privileges(role_id, privilege_id) VALUES (?, ?)",
        tuple((str(role.role_id), str(priv.privilege_id))
              for priv in privileges))

    return role

def __organise_privileges__(roles_dict, privilege_row):
    """Organise the privileges into their roles."""
    role_id_str = privilege_row["role_id"]
    if  role_id_str in roles_dict:
        return {
            **roles_dict,
            role_id_str: Role(
                UUID(role_id_str),
                privilege_row["role_name"],
                roles_dict[role_id_str].privileges + (
                    Privilege(UUID(privilege_row["privilege_id"]),
                              privilege_row["privilege_name"]),))
        }

    return {
        **roles_dict,
        role_id_str: Role(
            UUID(role_id_str),
            privilege_row["role_name"],
            (Privilege(UUID(privilege_row["privilege_id"]),
                       privilege_row["privilege_name"]),))
    }

def user_roles(conn: db.DbConnection, user: User):
    """Retrieve ALL roles assigned to the user."""
    with db.cursor(conn) as cursor:
        cursor.execute(
            "SELECT r.*, p.* FROM user_roles AS ur INNER JOIN roles AS r "
            "ON ur.role_id=r.role_id INNER JOIN role_privileges AS rp "
            "ON r.role_id=rp.role_id INNER JOIN privileges AS p "
            "ON rp.privilege_id=p.privilege_id WHERE ur.user_id=? "
            "UNION "
            "SELECT r.*, p.* FROM group_user_roles_on_resources AS guror "
            "INNER JOIN roles AS r ON guror.role_id=r.role_id "
            "INNER JOIN role_privileges AS rp ON r.role_id=rp.role_id "
            "INNER JOIN privileges AS p ON rp.privilege_id=p.privilege_id "
            "WHERE guror.user_id=?",
            ((str(user.user_id),)*2))

        results = cursor.fetchall()
        if results:
            return tuple(
                reduce(__organise_privileges__, results, {}).values())
        return tuple()