aboutsummaryrefslogtreecommitdiff
path: root/migrations/auth/20240606_01_xQDwL-move-role-manipulation-privileges-from-group-to-resources.py
blob: a45fd30e3535bfae96df1cdf9b0d4fe4ab815042 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
"""
Move role-manipulation privileges from group to resources
"""
import sqlite3
from yoyo import step

# yoyo dependency declaration: the migration named here is declared as a
# prerequisite of this one.
__depends__ = {'20240529_01_ALNWj-update-schema-for-user-verification'}

def role_by_name(cursor, role_name):
    """Fetch the row of the `roles` table whose `role_name` matches.

    Args:
        cursor: An open DB-API cursor on the auth database.
        role_name: The exact name of the role to look up.

    Returns:
        A dict mapping each column name of the `roles` table to the value
        stored for the matching role.

    Raises:
        LookupError: If no role with the given name exists.
    """
    cursor.execute("SELECT * FROM roles WHERE role_name=?",
                   (role_name,))
    row = cursor.fetchone()
    if row is None:
        # Fail with a message naming the missing role, rather than the opaque
        # TypeError that `dict(None)` would produce.
        raise LookupError(f"No role named '{role_name}' exists.")
    # Pair values with the column names reported by the cursor, so the result
    # is the same whether rows come back as plain tuples or as `sqlite3.Row`.
    column_names = (column[0] for column in cursor.description)
    return dict(zip(column_names, row))


def move_privileges_to_resources(conn):
    """Move role-manipulation privileges from group to resource.

    Deletes the four `group:*` role-manipulation privileges, together with
    every role's link to them, then creates the equivalent `resource:*`
    privileges and grants them to the `resource-owner` role.

    Args:
        conn: The sqlite3 connection the migration runs on. Its `row_factory`
            is set to `sqlite3.Row` as a side effect.
    """
    group_privilege_ids = (
        "group:role:create-role",
        "group:role:delete-role",
        "group:role:edit-role",
        "group:user:assign-role")
    resource_privileges = (
        ("resource:role:create-role",
         "Create a new role on a specific resource"),
        ("resource:role:delete-role",
         "Delete an existing role from a specific resource"),
        ("resource:role:edit-role",
         "Edit an existing role on a specific resource"),
        ("resource:user:assign-role",
         "Assign a user to a role on a specific resource"))
    # One `?` per id, so both DELETE statements bind the same single list.
    placeholders = ", ".join("?" for _ in group_privilege_ids)

    # Rows are requested as `sqlite3.Row` so they can be addressed by column
    # name.
    conn.row_factory = sqlite3.Row
    cursor = conn.cursor()
    try:
        # Remove the role links first, then the privileges they point to.
        cursor.execute(
            "DELETE FROM role_privileges "
            f"WHERE privilege_id IN ({placeholders})",
            group_privilege_ids)
        cursor.execute(
            f"DELETE FROM privileges WHERE privilege_id IN ({placeholders})",
            group_privilege_ids)

        resource_owner_role = role_by_name(cursor, "resource-owner")
        cursor.executemany(
            ("INSERT INTO privileges(privilege_id, privilege_description) "
             "VALUES (?, ?)"),
            resource_privileges)
        cursor.executemany(
            ("INSERT INTO role_privileges(role_id, privilege_id) "
             "VALUES(?, ?)"),
            [(resource_owner_role["role_id"], privilege_id)
             for privilege_id, _description in resource_privileges])
    finally:
        # Close the cursor even when one of the statements above raises.
        cursor.close()

def move_privileges_to_groups(conn):
    """Move role-manipulation privileges from resource to group.

    Deletes the four `resource:*` role-manipulation privileges, together with
    every role's link to them, then re-creates the `group:*` privileges and
    grants them to the `group-leader` role.

    NOTE: the group privileges are granted to `group-leader` only; links that
    any other role may have held to them are not re-created here.

    Args:
        conn: The sqlite3 connection the migration runs on. Its `row_factory`
            is set to `sqlite3.Row` as a side effect.
    """
    resource_privilege_ids = (
        "resource:role:create-role",
        "resource:role:delete-role",
        "resource:role:edit-role",
        "resource:user:assign-role")
    group_privileges = (
        ("group:role:create-role", "Create a new role"),
        ("group:role:delete-role", "Delete an existing role"),
        ("group:role:edit-role", "edit/update an existing role"),
        ("group:user:assign-role", "Assign a role to an existing user"))
    # One `?` per id, so both DELETE statements bind the same single list.
    placeholders = ", ".join("?" for _ in resource_privilege_ids)

    # Rows are requested as `sqlite3.Row` so they can be addressed by column
    # name.
    conn.row_factory = sqlite3.Row
    cursor = conn.cursor()
    try:
        # Remove the role links first, then the privileges they point to.
        cursor.execute(
            "DELETE FROM role_privileges "
            f"WHERE privilege_id IN ({placeholders})",
            resource_privilege_ids)
        cursor.execute(
            f"DELETE FROM privileges WHERE privilege_id IN ({placeholders})",
            resource_privilege_ids)

        group_leader_role = role_by_name(cursor, "group-leader")
        cursor.executemany(
            ("INSERT INTO privileges(privilege_id, privilege_description) "
             "VALUES (?, ?)"),
            group_privileges)
        cursor.executemany(
            ("INSERT INTO role_privileges(role_id, privilege_id) "
             "VALUES(?, ?)"),
            [(group_leader_role["role_id"], privilege_id)
             for privilege_id, _description in group_privileges])
    finally:
        # Close the cursor even when one of the statements above raises.
        cursor.close()

# A single yoyo step: the first callable is the apply (forward) action and the
# second one is its rollback.
steps = [
    step(move_privileges_to_resources, move_privileges_to_groups)
]