aboutsummaryrefslogtreecommitdiff
path: root/gn3/auth/authorisation/roles.py
blob: 23b74cc1023c0d0f58038c74e2e39158c4fd2de1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
"""Handle management of roles"""
from uuid import UUID, uuid4
from functools import reduce
from typing import Any, Sequence, Iterable, NamedTuple

from pymonad.maybe import Just, Maybe, Nothing
from pymonad.either import Left, Right, Either

from gn3.auth import db
from gn3.auth.dictify import dictify
from gn3.auth.authentication.users import User
from gn3.auth.authentication.checks import authenticated_p

from .checks import authorised_p
from .privileges import Privilege
from .errors import NotFoundError, AuthorisationError

class Role(NamedTuple):
    """Class representing a role: creates immutable objects."""
    role_id: UUID
    role_name: str
    privileges: Iterable[Privilege]

    def dictify(self) -> dict[str, Any]:
        """Return a dict representation of `Role` objects."""
        return {
            "role_id": self.role_id, "role_name": self.role_name,
            "privileges": tuple(dictify(priv) for priv in self.privileges)
        }

@authenticated_p
@authorised_p(("group:role:create-role",), error_message="Could not create role")
def create_role(
        cursor: db.DbCursor, role_name: str,
        privileges: Iterable[Privilege]) -> Role:
    """
    Create a new generic role.

    PARAMS:
    * cursor: A database cursor object - This function could be used as part of
              a transaction, hence the use of a cursor rather than a connection
              object.
    * role_name: The name of the role
    * privileges: A 'list' of privileges to assign the new role

    RETURNS: An immutable `gn3.auth.authorisation.roles.Role` object
    """
    role = Role(uuid4(), role_name, privileges)

    cursor.execute(
        "INSERT INTO roles(role_id, role_name) VALUES (?, ?)",
        (str(role.role_id), role.role_name))
    cursor.executemany(
        "INSERT INTO role_privileges(role_id, privilege_id) VALUES (?, ?)",
        tuple((str(role.role_id), str(priv.privilege_id))
              for priv in privileges))

    return role

def __organise_privileges__(roles_dict, privilege_row):
    """Organise the privileges into their roles."""
    role_id_str = privilege_row["role_id"]
    if  role_id_str in roles_dict:
        return {
            **roles_dict,
            role_id_str: Role(
                UUID(role_id_str),
                privilege_row["role_name"],
                roles_dict[role_id_str].privileges + (
                    Privilege(privilege_row["privilege_id"],
                              privilege_row["privilege_description"]),))
        }

    return {
        **roles_dict,
        role_id_str: Role(
            UUID(role_id_str),
            privilege_row["role_name"],
            (Privilege(privilege_row["privilege_id"],
                       privilege_row["privilege_description"]),))
    }

def user_roles(conn: db.DbConnection, user: User) -> Maybe[Sequence[Role]]:
    """Retrieve non-resource roles assigned to the user."""
    with db.cursor(conn) as cursor:
        cursor.execute(
            "SELECT r.*, p.* FROM user_roles AS ur INNER JOIN roles AS r "
            "ON ur.role_id=r.role_id INNER JOIN role_privileges AS rp "
            "ON r.role_id=rp.role_id INNER JOIN privileges AS p "
            "ON rp.privilege_id=p.privilege_id WHERE ur.user_id=?",
            (str(user.user_id),))

        results = cursor.fetchall()
        if results:
            return Just(tuple(
                reduce(__organise_privileges__, results, {}).values()))
        return Nothing

def user_role(conn: db.DbConnection, user: User, role_id: UUID) -> Either:
    """Retrieve a specific non-resource role assigned to the user."""
    with db.cursor(conn) as cursor:
        cursor.execute(
            "SELECT r.*, p.* FROM user_roles AS ur INNER JOIN roles AS r "
            "ON ur.role_id=r.role_id INNER JOIN role_privileges AS rp "
            "ON r.role_id=rp.role_id INNER JOIN privileges AS p "
            "ON rp.privilege_id=p.privilege_id "
            "WHERE ur.user_id=? AND ur.role_id=?",
            (str(user.user_id), str(role_id)))

        results = cursor.fetchall()
        if results:
            return Right(tuple(
                reduce(__organise_privileges__, results, {}).values())[0])
        return Left(NotFoundError(
            f"Could not find role with id '{role_id}'",))

def assign_default_roles(cursor: db.DbCursor, user: User):
    """Assign `user` some default roles."""
    cursor.execute(
        'SELECT role_id FROM roles WHERE role_name IN '
        '("group-creator")')
    role_ids = cursor.fetchall()
    str_user_id = str(user.user_id)
    params = tuple(
        {"user_id": str_user_id, "role_id": row["role_id"]} for row in role_ids)
    cursor.executemany(
        ("INSERT INTO user_roles VALUES (:user_id, :role_id)"),
        params)

def revoke_user_role_by_name(cursor: db.DbCursor, user: User, role_name: str):
    """Revoke a role from `user` by the role's name"""
    cursor.execute(
        "SELECT role_id FROM roles WHERE role_name=:role_name",
        {"role_name": role_name})
    role = cursor.fetchone()
    if role:
        cursor.execute(
            ("DELETE FROM user_roles "
             "WHERE user_id=:user_id AND role_id=:role_id"),
            {"user_id": str(user.user_id), "role_id": role["role_id"]})

def assign_user_role_by_name(cursor: db.DbCursor, user: User, role_name: str):
    """Revoke a role from `user` by the role's name"""
    cursor.execute(
        "SELECT role_id FROM roles WHERE role_name=:role_name",
        {"role_name": role_name})
    role = cursor.fetchone()

    if role:
        cursor.execute(
            ("INSERT INTO user_roles VALUES(:user_id, :role_id) "
             "ON CONFLICT DO NOTHING"),
            {"user_id": str(user.user_id), "role_id": role["role_id"]})