about summary refs log tree commit diff
path: root/gn3/auth/authorisation/roles
diff options
context:
space:
mode:
authorFrederick Muriuki Muriithi2023-02-02 11:35:51 +0300
committerFrederick Muriuki Muriithi2023-02-02 12:03:51 +0300
commitdfe5eb18e3ec8dc570d118bfe95c5d4dcb2c7575 (patch)
treeb45da1e9eba405042ef47174215b827739f5a393 /gn3/auth/authorisation/roles
parent6fc120aca6062f96725adaece85a7b76000affda (diff)
downloadgenenetwork3-dfe5eb18e3ec8dc570d118bfe95c5d4dcb2c7575.tar.gz
auth: Reorganise modules/packages for easier dev and maintenance
Split the views/routes into separate modules each dealing with a narrower
scope of the application to aid in maintenance, and help with making the
development easier.
Diffstat (limited to 'gn3/auth/authorisation/roles')
-rw-r--r--gn3/auth/authorisation/roles/__init__.py3
-rw-r--r--gn3/auth/authorisation/roles/models.py151
-rw-r--r--gn3/auth/authorisation/roles/views.py26
3 files changed, 180 insertions, 0 deletions
diff --git a/gn3/auth/authorisation/roles/__init__.py b/gn3/auth/authorisation/roles/__init__.py
new file mode 100644
index 0000000..293a12f
--- /dev/null
+++ b/gn3/auth/authorisation/roles/__init__.py
@@ -0,0 +1,3 @@
+"""Initialise the `gn3.auth.authorisation.roles` package"""
+
+from .models import Role
diff --git a/gn3/auth/authorisation/roles/models.py b/gn3/auth/authorisation/roles/models.py
new file mode 100644
index 0000000..b1aac75
--- /dev/null
+++ b/gn3/auth/authorisation/roles/models.py
@@ -0,0 +1,151 @@
+"""Handle management of roles"""
+from uuid import UUID, uuid4
+from functools import reduce
+from typing import Any, Sequence, Iterable, NamedTuple
+
+from pymonad.maybe import Just, Maybe, Nothing
+from pymonad.either import Left, Right, Either
+
+from gn3.auth import db
+from gn3.auth.dictify import dictify
+from gn3.auth.authentication.users import User
+
+from ..checks import authorised_p
+from ..privileges import Privilege
+from ..errors import NotFoundError
+
+class Role(NamedTuple):
+    """Class representing a role: creates immutable objects."""
+    role_id: UUID
+    role_name: str
+    privileges: Iterable[Privilege]
+
+    def dictify(self) -> dict[str, Any]:
+        """Return a dict representation of `Role` objects."""
+        return {
+            "role_id": self.role_id, "role_name": self.role_name,
+            "privileges": tuple(dictify(priv) for priv in self.privileges)
+        }
+
+@authorised_p(("group:role:create-role",), error_message="Could not create role")
+def create_role(
+        cursor: db.DbCursor, role_name: str,
+        privileges: Iterable[Privilege]) -> Role:
+    """
+    Create a new generic role.
+
+    PARAMS:
+    * cursor: A database cursor object - This function could be used as part of
+              a transaction, hence the use of a cursor rather than a connection
+              object.
+    * role_name: The name of the role
+    * privileges: A 'list' of privileges to assign the new role
+
+    RETURNS: An immutable `gn3.auth.authorisation.roles.Role` object
+    """
+    role = Role(uuid4(), role_name, privileges)
+
+    cursor.execute(
+        "INSERT INTO roles(role_id, role_name) VALUES (?, ?)",
+        (str(role.role_id), role.role_name))
+    cursor.executemany(
+        "INSERT INTO role_privileges(role_id, privilege_id) VALUES (?, ?)",
+        tuple((str(role.role_id), str(priv.privilege_id))
+              for priv in privileges))
+
+    return role
+
+def __organise_privileges__(roles_dict, privilege_row):
+    """Organise the privileges into their roles."""
+    role_id_str = privilege_row["role_id"]
+    if  role_id_str in roles_dict:
+        return {
+            **roles_dict,
+            role_id_str: Role(
+                UUID(role_id_str),
+                privilege_row["role_name"],
+                roles_dict[role_id_str].privileges + (
+                    Privilege(privilege_row["privilege_id"],
+                              privilege_row["privilege_description"]),))
+        }
+
+    return {
+        **roles_dict,
+        role_id_str: Role(
+            UUID(role_id_str),
+            privilege_row["role_name"],
+            (Privilege(privilege_row["privilege_id"],
+                       privilege_row["privilege_description"]),))
+    }
+
+def user_roles(conn: db.DbConnection, user: User) -> Maybe[Sequence[Role]]:
+    """Retrieve non-resource roles assigned to the user."""
+    with db.cursor(conn) as cursor:
+        cursor.execute(
+            "SELECT r.*, p.* FROM user_roles AS ur INNER JOIN roles AS r "
+            "ON ur.role_id=r.role_id INNER JOIN role_privileges AS rp "
+            "ON r.role_id=rp.role_id INNER JOIN privileges AS p "
+            "ON rp.privilege_id=p.privilege_id WHERE ur.user_id=?",
+            (str(user.user_id),))
+
+        results = cursor.fetchall()
+        if results:
+            return Just(tuple(
+                reduce(__organise_privileges__, results, {}).values()))
+        return Nothing
+
+def user_role(conn: db.DbConnection, user: User, role_id: UUID) -> Either:
+    """Retrieve a specific non-resource role assigned to the user."""
+    with db.cursor(conn) as cursor:
+        cursor.execute(
+            "SELECT r.*, p.* FROM user_roles AS ur INNER JOIN roles AS r "
+            "ON ur.role_id=r.role_id INNER JOIN role_privileges AS rp "
+            "ON r.role_id=rp.role_id INNER JOIN privileges AS p "
+            "ON rp.privilege_id=p.privilege_id "
+            "WHERE ur.user_id=? AND ur.role_id=?",
+            (str(user.user_id), str(role_id)))
+
+        results = cursor.fetchall()
+        if results:
+            return Right(tuple(
+                reduce(__organise_privileges__, results, {}).values())[0])
+        return Left(NotFoundError(
+            f"Could not find role with id '{role_id}'",))
+
+def assign_default_roles(cursor: db.DbCursor, user: User):
+    """Assign `user` some default roles."""
+    cursor.execute(
+        'SELECT role_id FROM roles WHERE role_name IN '
+        '("group-creator")')
+    role_ids = cursor.fetchall()
+    str_user_id = str(user.user_id)
+    params = tuple(
+        {"user_id": str_user_id, "role_id": row["role_id"]} for row in role_ids)
+    cursor.executemany(
+        ("INSERT INTO user_roles VALUES (:user_id, :role_id)"),
+        params)
+
+def revoke_user_role_by_name(cursor: db.DbCursor, user: User, role_name: str):
+    """Revoke a role from `user` by the role's name"""
+    cursor.execute(
+        "SELECT role_id FROM roles WHERE role_name=:role_name",
+        {"role_name": role_name})
+    role = cursor.fetchone()
+    if role:
+        cursor.execute(
+            ("DELETE FROM user_roles "
+             "WHERE user_id=:user_id AND role_id=:role_id"),
+            {"user_id": str(user.user_id), "role_id": role["role_id"]})
+
+def assign_user_role_by_name(cursor: db.DbCursor, user: User, role_name: str):
+    """Revoke a role from `user` by the role's name"""
+    cursor.execute(
+        "SELECT role_id FROM roles WHERE role_name=:role_name",
+        {"role_name": role_name})
+    role = cursor.fetchone()
+
+    if role:
+        cursor.execute(
+            ("INSERT INTO user_roles VALUES(:user_id, :role_id) "
+             "ON CONFLICT DO NOTHING"),
+            {"user_id": str(user.user_id), "role_id": role["role_id"]})
diff --git a/gn3/auth/authorisation/roles/views.py b/gn3/auth/authorisation/roles/views.py
new file mode 100644
index 0000000..975fb19
--- /dev/null
+++ b/gn3/auth/authorisation/roles/views.py
@@ -0,0 +1,26 @@
+"""The views/routes for the `gn3.auth.authorisation.roles` package."""
+import uuid
+
+from flask import jsonify, Response, Blueprint, current_app
+
+from gn3.auth import db
+from gn3.auth.dictify import dictify
+
+from .models import user_role
+
+from ...authentication.oauth2.resource_server import require_oauth
+
+roles = Blueprint("roles", __name__)
+
+@roles.route("/view/<uuid:role_id>", methods=["GET"])
+@require_oauth("role")
+def view_role(role_id: uuid.UUID) -> Response:
+    """Retrieve a user role with id `role_id`"""
+    def __error__(exc: Exception):
+        raise exc
+    with require_oauth.acquire("profile role") as the_token:
+        db_uri = current_app.config["AUTH_DB"]
+        with db.connection(db_uri) as conn:
+            the_role = user_role(conn, the_token.user, role_id)
+            return the_role.either(
+                __error__, lambda a_role: jsonify(dictify(a_role)))