From dfe5eb18e3ec8dc570d118bfe95c5d4dcb2c7575 Mon Sep 17 00:00:00 2001 From: Frederick Muriuki Muriithi Date: Thu, 2 Feb 2023 11:35:51 +0300 Subject: auth: Reorganise modules/packages for easier dev and maintenance Split the views/routes into separate modules each dealing with a narrower scope of the application to aid in maintenance, and help with making the development easier. --- gn3/auth/authorisation/groups.py | 223 --------------------------- gn3/auth/authorisation/groups/__init__.py | 3 + gn3/auth/authorisation/groups/models.py | 223 +++++++++++++++++++++++++++ gn3/auth/authorisation/groups/views.py | 54 +++++++ gn3/auth/authorisation/resources.py | 141 ----------------- gn3/auth/authorisation/resources/__init__.py | 2 + gn3/auth/authorisation/resources/models.py | 142 +++++++++++++++++ gn3/auth/authorisation/resources/views.py | 4 + gn3/auth/authorisation/roles.py | 151 ------------------ gn3/auth/authorisation/roles/__init__.py | 3 + gn3/auth/authorisation/roles/models.py | 151 ++++++++++++++++++ gn3/auth/authorisation/roles/views.py | 26 ++++ gn3/auth/authorisation/users/__init__.py | 0 gn3/auth/authorisation/users/views.py | 149 ++++++++++++++++++ gn3/auth/authorisation/views.py | 197 ----------------------- 15 files changed, 757 insertions(+), 712 deletions(-) delete mode 100644 gn3/auth/authorisation/groups.py create mode 100644 gn3/auth/authorisation/groups/__init__.py create mode 100644 gn3/auth/authorisation/groups/models.py create mode 100644 gn3/auth/authorisation/groups/views.py delete mode 100644 gn3/auth/authorisation/resources.py create mode 100644 gn3/auth/authorisation/resources/__init__.py create mode 100644 gn3/auth/authorisation/resources/models.py create mode 100644 gn3/auth/authorisation/resources/views.py delete mode 100644 gn3/auth/authorisation/roles.py create mode 100644 gn3/auth/authorisation/roles/__init__.py create mode 100644 gn3/auth/authorisation/roles/models.py create mode 100644 gn3/auth/authorisation/roles/views.py create mode 100644 gn3/auth/authorisation/users/__init__.py create mode 100644 gn3/auth/authorisation/users/views.py delete mode 100644 gn3/auth/authorisation/views.py (limited to 'gn3/auth/authorisation') diff --git a/gn3/auth/authorisation/groups.py b/gn3/auth/authorisation/groups.py deleted file mode 100644 index c691457..0000000 --- a/gn3/auth/authorisation/groups.py +++ /dev/null @@ -1,223 +0,0 @@ -"""Handle the management of resource/user groups.""" -import json -from uuid import UUID, uuid4 -from typing import Any, Sequence, Iterable, Optional, NamedTuple - -from flask import g -from pymonad.maybe import Just, Maybe, Nothing - -from gn3.auth import db -from gn3.auth.dictify import dictify -from gn3.auth.authentication.users import User - -from .checks import authorised_p -from .privileges import Privilege -from .errors import AuthorisationError -from .roles import ( - Role, create_role, revoke_user_role_by_name, assign_user_role_by_name) - -class Group(NamedTuple): - """Class representing a group.""" - group_id: UUID - group_name: str - group_metadata: dict[str, Any] - - def dictify(self): - """Return a dict representation of `Group` objects.""" - return { - "group_id": self.group_id, "group_name": self.group_name, - "group_metadata": self.group_metadata - } - -class GroupRole(NamedTuple): - """Class representing a role tied/belonging to a group.""" - group_role_id: UUID - group: Group - role: Role - - def dictify(self) -> dict[str, Any]: - """Return a dict representation of `GroupRole` objects.""" - return { - "group_role_id": self.group_role_id, "group": dictify(self.group), - "role": dictify(self.role) - } - -class GroupCreationError(AuthorisationError): - """Raised whenever a group creation fails""" - -class MembershipError(AuthorisationError): - """Raised when there is an error with a user's membership to a group.""" - - def __init__(self, user: User, groups: Sequence[Group]): - """Initialise the `MembershipError` exception object.""" - groups_str = ", ".join(group.group_name for group in groups) - error_message = ( - f"User '{user.name} ({user.email})' is a member of {len(groups)} " - f"groups ({groups_str})") - super().__init__(f"{type(self).__name__}: {error_message}.") - -def user_membership(conn: db.DbConnection, user: User) -> Sequence[Group]: - """Returns all the groups that a member belongs to""" - query = ( - "SELECT groups.group_id, group_name, groups.group_metadata " - "FROM group_users INNER JOIN groups " - "ON group_users.group_id=groups.group_id " - "WHERE group_users.user_id=?") - with db.cursor(conn) as cursor: - cursor.execute(query, (str(user.user_id),)) - groups = tuple(Group(row[0], row[1], json.loads(row[2])) - for row in cursor.fetchall()) - - return groups - -def create_group( - conn: db.DbConnection, group_name: str, group_leader: User, - group_description: Optional[str] = None) -> Group: - """Create a new group.""" - user_groups = user_membership(conn, group_leader) - if len(user_groups) > 0: - raise MembershipError(group_leader, user_groups) - - @authorised_p( - ("system:group:create-group",), ( - "You do not have the appropriate privileges to enable you to " - "create a new group."), - group_leader) - def __create_group__(): - with db.cursor(conn) as cursor: - new_group = __save_group__( - cursor, group_name,( - {"group_description": group_description} - if group_description else {})) - add_user_to_group(cursor, new_group, group_leader) - revoke_user_role_by_name(cursor, group_leader, "group-creator") - assign_user_role_by_name(cursor, group_leader, "group-leader") - return new_group - - return __create_group__() - -@authorised_p(("group:role:create-role",), - error_message="Could not create the group role") -def create_group_role( - conn: db.DbConnection, group: Group, role_name: str, - privileges: Iterable[Privilege]) -> GroupRole: - """Create a role attached to a group.""" - with db.cursor(conn) as cursor: - group_role_id = uuid4() - role = create_role(cursor, role_name, privileges) - cursor.execute( - ("INSERT INTO group_roles(group_role_id, group_id, role_id) " - "VALUES(?, ?, ?)"), - (str(group_role_id), str(group.group_id), str(role.role_id))) - - return GroupRole(group_role_id, group, role) - -def authenticated_user_group(conn) -> Maybe: - """ - Returns the currently authenticated user's group. - - Look into returning a Maybe object. - """ - user = g.user - with db.cursor(conn) as cursor: - cursor.execute( - ("SELECT groups.* FROM group_users " - "INNER JOIN groups ON group_users.group_id=groups.group_id " - "WHERE group_users.user_id = ?"), - (str(user.user_id),)) - groups = tuple(Group(UUID(row[0]), row[1], json.loads(row[2] or "{}")) - for row in cursor.fetchall()) - - if len(groups) > 1: - raise MembershipError(user, groups) - - if len(groups) == 1: - return Just(groups[0]) - - return Nothing - -def user_group(cursor: db.DbCursor, user: User) -> Maybe[Group]: - """Returns the given user's group""" - cursor.execute( - ("SELECT groups.group_id, groups.group_name, groups.group_metadata " - "FROM group_users " - "INNER JOIN groups ON group_users.group_id=groups.group_id " - "WHERE group_users.user_id = ?"), - (str(user.user_id),)) - groups = tuple( - Group(UUID(row[0]), row[1], json.loads(row[2] or "{}")) - for row in cursor.fetchall()) - - if len(groups) > 1: - raise MembershipError(user, groups) - - if len(groups) == 1: - return Just(groups[0]) - - return Nothing - -def is_group_leader(cursor: db.DbCursor, user: User, group: Group): - """Check whether the given `user` is the leader of `group`.""" - - ugroup = user_group(cursor, user).maybe( - False, lambda val: val) # type: ignore[arg-type, misc] - if not group: - # User cannot be a group leader if not a member of ANY group - return False - - if not ugroup == group: - # User cannot be a group leader if not a member of THIS group - return False - - cursor.execute( - ("SELECT roles.role_name FROM user_roles LEFT JOIN roles " - "ON user_roles.role_id = roles.role_id WHERE user_id = ?"), - (str(user.user_id),)) - role_names = tuple(row[0] for row in cursor.fetchall()) - - return "group-leader" in role_names - -def all_groups(conn: db.DbConnection) -> Maybe[Sequence[Group]]: - """Retrieve all existing groups""" - with db.cursor(conn) as cursor: - cursor.execute("SELECT * FROM groups") - res = cursor.fetchall() - if res: - return Just(tuple( - Group(row["group_id"], row["group_name"], - json.loads(row["group_metadata"])) for row in res)) - - return Nothing - -def __save_group__( - cursor: db.DbCursor, group_name: str, - group_metadata: dict[str, Any]) -> Group: - """Save a group to db""" - the_group = Group(uuid4(), group_name, group_metadata) - cursor.execute( - ("INSERT INTO groups " - "VALUES(:group_id, :group_name, :group_metadata) " - "ON CONFLICT (group_id) DO UPDATE SET " - "group_name=:group_name, group_metadata=:group_metadata"), - {"group_id": str(the_group.group_id), "group_name": the_group.group_name, - "group_metadata": json.dumps(the_group.group_metadata)}) - return the_group - -def add_user_to_group(cursor: db.DbCursor, the_group: Group, user: User): - """Add `user` to `the_group` as a member.""" - cursor.execute( - ("INSERT INTO group_users VALUES (:group_id, :user_id) " - "ON CONFLICT (group_id, user_id) DO NOTHING"), - {"group_id": str(the_group.group_id), "user_id": str(user.user_id)}) - -def group_users(conn: db.DbConnection, group_id: UUID) -> Iterable[User]: - """Retrieve all users that are members of group with id `group_id`.""" - with db.cursor(conn) as cursor: - cursor.execute( - "SELECT u.* FROM group_users AS gu INNER JOIN users AS u " - "ON gu.user_id = u.user_id WHERE gu.group_id=:group_id", - {"group_id": str(group_id)}) - results = cursor.fetchall() - - return (User(UUID(row["user_id"]), row["email"], row["name"]) - for row in results) diff --git a/gn3/auth/authorisation/groups/__init__.py b/gn3/auth/authorisation/groups/__init__.py new file mode 100644 index 0000000..1cb0bba --- /dev/null +++ b/gn3/auth/authorisation/groups/__init__.py @@ -0,0 +1,3 @@ +"""Initialise the `gn3.auth.authorisation.groups` package""" + +from .models import Group, GroupRole diff --git a/gn3/auth/authorisation/groups/models.py b/gn3/auth/authorisation/groups/models.py new file mode 100644 index 0000000..0750419 --- /dev/null +++ b/gn3/auth/authorisation/groups/models.py @@ -0,0 +1,223 @@ +"""Handle the management of resource/user groups.""" +import json +from uuid import UUID, uuid4 +from typing import Any, Sequence, Iterable, Optional, NamedTuple + +from flask import g +from pymonad.maybe import Just, Maybe, Nothing + +from gn3.auth import db +from gn3.auth.dictify import dictify +from gn3.auth.authentication.users import User + +from ..checks import authorised_p +from ..privileges import Privilege +from ..errors import AuthorisationError +from ..roles.models import ( + Role, create_role, revoke_user_role_by_name, assign_user_role_by_name) + +class Group(NamedTuple): + """Class representing a group.""" + group_id: UUID + group_name: str + group_metadata: dict[str, Any] + + def dictify(self): + """Return a dict representation of `Group` objects.""" + return { + "group_id": self.group_id, "group_name": self.group_name, + "group_metadata": self.group_metadata + } + +class GroupRole(NamedTuple): + """Class representing a role tied/belonging to a group.""" + group_role_id: UUID + group: Group + role: Role + + def dictify(self) -> dict[str, Any]: + """Return a dict representation of `GroupRole` objects.""" + return { + "group_role_id": self.group_role_id, "group": dictify(self.group), + "role": dictify(self.role) + } + +class GroupCreationError(AuthorisationError): + """Raised whenever a group creation fails""" + +class MembershipError(AuthorisationError): + """Raised when there is an error with a user's membership to a group.""" + + def __init__(self, user: User, groups: Sequence[Group]): + """Initialise the `MembershipError` exception object.""" + groups_str = ", ".join(group.group_name for group in groups) + error_message = ( + f"User '{user.name} ({user.email})' is a member of {len(groups)} " + f"groups ({groups_str})") + super().__init__(f"{type(self).__name__}: {error_message}.") + +def user_membership(conn: db.DbConnection, user: User) -> Sequence[Group]: + """Returns all the groups that a member belongs to""" + query = ( + "SELECT groups.group_id, group_name, groups.group_metadata " + "FROM group_users INNER JOIN groups " + "ON group_users.group_id=groups.group_id " + "WHERE group_users.user_id=?") + with db.cursor(conn) as cursor: + cursor.execute(query, (str(user.user_id),)) + groups = tuple(Group(row[0], row[1], json.loads(row[2])) + for row in cursor.fetchall()) + + return groups + +def create_group( + conn: db.DbConnection, group_name: str, group_leader: User, + group_description: Optional[str] = None) -> Group: + """Create a new group.""" + user_groups = user_membership(conn, group_leader) + if len(user_groups) > 0: + raise MembershipError(group_leader, user_groups) + + @authorised_p( + ("system:group:create-group",), ( + "You do not have the appropriate privileges to enable you to " + "create a new group."), + group_leader) + def __create_group__(): + with db.cursor(conn) as cursor: + new_group = __save_group__( + cursor, group_name,( + {"group_description": group_description} + if group_description else {})) + add_user_to_group(cursor, new_group, group_leader) + revoke_user_role_by_name(cursor, group_leader, "group-creator") + assign_user_role_by_name(cursor, group_leader, "group-leader") + return new_group + + return __create_group__() + +@authorised_p(("group:role:create-role",), + error_message="Could not create the group role") +def create_group_role( + conn: db.DbConnection, group: Group, role_name: str, + privileges: Iterable[Privilege]) -> GroupRole: + """Create a role attached to a group.""" + with db.cursor(conn) as cursor: + group_role_id = uuid4() + role = create_role(cursor, role_name, privileges) + cursor.execute( + ("INSERT INTO group_roles(group_role_id, group_id, role_id) " + "VALUES(?, ?, ?)"), + (str(group_role_id), str(group.group_id), str(role.role_id))) + + return GroupRole(group_role_id, group, role) + +def authenticated_user_group(conn) -> Maybe: + """ + Returns the currently authenticated user's group. + + Look into returning a Maybe object. + """ + user = g.user + with db.cursor(conn) as cursor: + cursor.execute( + ("SELECT groups.* FROM group_users " + "INNER JOIN groups ON group_users.group_id=groups.group_id " + "WHERE group_users.user_id = ?"), + (str(user.user_id),)) + groups = tuple(Group(UUID(row[0]), row[1], json.loads(row[2] or "{}")) + for row in cursor.fetchall()) + + if len(groups) > 1: + raise MembershipError(user, groups) + + if len(groups) == 1: + return Just(groups[0]) + + return Nothing + +def user_group(cursor: db.DbCursor, user: User) -> Maybe[Group]: + """Returns the given user's group""" + cursor.execute( + ("SELECT groups.group_id, groups.group_name, groups.group_metadata " + "FROM group_users " + "INNER JOIN groups ON group_users.group_id=groups.group_id " + "WHERE group_users.user_id = ?"), + (str(user.user_id),)) + groups = tuple( + Group(UUID(row[0]), row[1], json.loads(row[2] or "{}")) + for row in cursor.fetchall()) + + if len(groups) > 1: + raise MembershipError(user, groups) + + if len(groups) == 1: + return Just(groups[0]) + + return Nothing + +def is_group_leader(cursor: db.DbCursor, user: User, group: Group): + """Check whether the given `user` is the leader of `group`.""" + + ugroup = user_group(cursor, user).maybe( + False, lambda val: val) # type: ignore[arg-type, misc] + if not group: + # User cannot be a group leader if not a member of ANY group + return False + + if not ugroup == group: + # User cannot be a group leader if not a member of THIS group + return False + + cursor.execute( + ("SELECT roles.role_name FROM user_roles LEFT JOIN roles " + "ON user_roles.role_id = roles.role_id WHERE user_id = ?"), + (str(user.user_id),)) + role_names = tuple(row[0] for row in cursor.fetchall()) + + return "group-leader" in role_names + +def all_groups(conn: db.DbConnection) -> Maybe[Sequence[Group]]: + """Retrieve all existing groups""" + with db.cursor(conn) as cursor: + cursor.execute("SELECT * FROM groups") + res = cursor.fetchall() + if res: + return Just(tuple( + Group(row["group_id"], row["group_name"], + json.loads(row["group_metadata"])) for row in res)) + + return Nothing + +def __save_group__( + cursor: db.DbCursor, group_name: str, + group_metadata: dict[str, Any]) -> Group: + """Save a group to db""" + the_group = Group(uuid4(), group_name, group_metadata) + cursor.execute( + ("INSERT INTO groups " + "VALUES(:group_id, :group_name, :group_metadata) " + "ON CONFLICT (group_id) DO UPDATE SET " + "group_name=:group_name, group_metadata=:group_metadata"), + {"group_id": str(the_group.group_id), "group_name": the_group.group_name, + "group_metadata": json.dumps(the_group.group_metadata)}) + return the_group + +def add_user_to_group(cursor: db.DbCursor, the_group: Group, user: User): + """Add `user` to `the_group` as a member.""" + cursor.execute( + ("INSERT INTO group_users VALUES (:group_id, :user_id) " + "ON CONFLICT (group_id, user_id) DO NOTHING"), + {"group_id": str(the_group.group_id), "user_id": str(user.user_id)}) + +def group_users(conn: db.DbConnection, group_id: UUID) -> Iterable[User]: + """Retrieve all users that are members of group with id `group_id`.""" + with db.cursor(conn) as cursor: + cursor.execute( + "SELECT u.* FROM group_users AS gu INNER JOIN users AS u " + "ON gu.user_id = u.user_id WHERE gu.group_id=:group_id", + {"group_id": str(group_id)}) + results = cursor.fetchall() + + return (User(UUID(row["user_id"]), row["email"], row["name"]) + for row in results) diff --git a/gn3/auth/authorisation/groups/views.py b/gn3/auth/authorisation/groups/views.py new file mode 100644 index 0000000..02b3162 --- /dev/null +++ b/gn3/auth/authorisation/groups/views.py @@ -0,0 +1,54 @@ +"""The views/routes for the `gn3.auth.authorisation.groups` package.""" +import uuid + +from flask import request, jsonify, Response, Blueprint, current_app + +from gn3.auth import db +from gn3.auth.dictify import dictify + +from .models import ( + all_groups, GroupCreationError, group_users as _group_users, + create_group as _create_group) + +from ...authentication.oauth2.resource_server import require_oauth + +groups = Blueprint("groups", __name__) + +@groups.route("/list", methods=["GET"]) +@require_oauth("profile group") +def list_groups(): + """Return the list of groups that exist.""" + with db.connection(current_app.config["AUTH_DB"]) as conn: + the_groups = all_groups(conn) + + return jsonify(the_groups.maybe( + [], lambda grps: [dictify(grp) for grp in grps])) + +@groups.route("/create", methods=["POST"]) +@require_oauth("profile group") +def create_group(): + """Create a new group.""" + with require_oauth.acquire("profile group") as the_token: + group_name=request.form.get("group_name", "").strip() + if not bool(group_name): + raise GroupCreationError("Could not create the group.") + + db_uri = current_app.config["AUTH_DB"] + with db.connection(db_uri) as conn: + user = the_token.user + new_group = _create_group( + conn, group_name, user, request.form.get("group_description")) + return jsonify({ + **dictify(new_group), "group_leader": dictify(user) + }) + +@groups.route("/members/", methods=["GET"]) +@require_oauth("profile group") +def group_members(group_id: uuid.UUID) -> Response: + """Retrieve all the members of a group.""" + with require_oauth.acquire("profile group") as the_token:# pylint: disable=[unused-variable] + db_uri = current_app.config["AUTH_DB"] + ## Check that user has appropriate privileges and remove the pylint disable above + with db.connection(db_uri) as conn: + return jsonify(tuple( + dictify(user) for user in _group_users(conn, group_id))) diff --git a/gn3/auth/authorisation/resources.py b/gn3/auth/authorisation/resources.py deleted file mode 100644 index fe096e8..0000000 --- a/gn3/auth/authorisation/resources.py +++ /dev/null @@ -1,141 +0,0 @@ -"""Handle the management of resources.""" -import json -from uuid import UUID, uuid4 -from typing import Any, Dict, Sequence, NamedTuple - -from gn3.auth import db -from gn3.auth.dictify import dictify -from gn3.auth.authentication.users import User - -from .checks import authorised_p -from .errors import AuthorisationError -from .groups import Group, user_group, is_group_leader, authenticated_user_group - -class MissingGroupError(AuthorisationError): - """Raised for any resource operation without a group.""" - -class ResourceCategory(NamedTuple): - """Class representing a resource category.""" - resource_category_id: UUID - resource_category_key: str - resource_category_description: str - - def dictify(self) -> dict[str, Any]: - """Return a dict representation of `ResourceCategory` objects.""" - return { - "resource_category_id": self.resource_category_id, - "resource_category_key": self.resource_category_key, - "resource_category_description": self.resource_category_description - } - -class Resource(NamedTuple): - """Class representing a resource.""" - group: Group - resource_id: UUID - resource_name: str - resource_category: ResourceCategory - public: bool - - def dictify(self) -> dict[str, Any]: - """Return a dict representation of `Resource` objects.""" - return { - "group": dictify(self.group), "resource_id": self.resource_id, - "resource_name": self.resource_name, - "resource_category": dictify(self.resource_category), - "public": self.public - } - -@authorised_p(("group:resource:create-resource",), - error_message="Could not create resource") -def create_resource( - conn: db.DbConnection, resource_name: str, - resource_category: ResourceCategory) -> Resource: - """Create a resource item.""" - with db.cursor(conn) as cursor: - group = authenticated_user_group(conn).maybe(False, lambda val: val)# type: ignore[misc] - if not group: - raise MissingGroupError( - "User with no group cannot create a resource.") - resource = Resource(group, uuid4(), resource_name, resource_category, False) - cursor.execute( - "INSERT INTO resources VALUES (?, ?, ?, ?, ?)", - (str(resource.group.group_id), str(resource.resource_id), - resource_name, - str(resource.resource_category.resource_category_id), - 1 if resource.public else 0)) - - return resource - -def resource_categories(conn: db.DbConnection) -> Sequence[ResourceCategory]: - """Retrieve all available resource categories""" - with db.cursor(conn) as cursor: - cursor.execute("SELECT * FROM resource_categories") - return tuple( - ResourceCategory(UUID(row[0]), row[1], row[2]) - for row in cursor.fetchall()) - return tuple() - -def public_resources(conn: db.DbConnection) -> Sequence[Resource]: - """List all resources marked as public""" - categories = { - str(cat.resource_category_id): cat for cat in resource_categories(conn) - } - with db.cursor(conn) as cursor: - cursor.execute("SELECT * FROM resources WHERE public=1") - results = cursor.fetchall() - group_uuids = tuple(row[0] for row in results) - query = ("SELECT * FROM groups WHERE group_id IN " - f"({', '.join(['?'] * len(group_uuids))})") - cursor.execute(query, group_uuids) - groups = { - row[0]: Group( - UUID(row[0]), row[1], json.loads(row[2] or "{}")) - for row in cursor.fetchall() - } - return tuple( - Resource(groups[row[0]], UUID(row[1]), row[2], categories[row[3]], - bool(row[4])) - for row in results) - -def group_leader_resources( - cursor: db.DbCursor, user: User, group: Group, - res_categories: Dict[UUID, ResourceCategory]) -> Sequence[Resource]: - """Return all the resources available to the group leader""" - if is_group_leader(cursor, user, group): - cursor.execute("SELECT * FROM resources WHERE group_id=?", - (str(group.group_id),)) - return tuple( - Resource(group, UUID(row[1]), row[2], res_categories[UUID(row[3])], - bool(row[4])) - for row in cursor.fetchall()) - return tuple() - -def user_resources(conn: db.DbConnection, user: User) -> Sequence[Resource]: - """List the resources available to the user""" - categories = { # Repeated in `public_resources` function - cat.resource_category_id: cat for cat in resource_categories(conn) - } - with db.cursor(conn) as cursor: - def __all_resources__(group) -> Sequence[Resource]: - gl_resources = group_leader_resources(cursor, user, group, categories) - - cursor.execute( - ("SELECT resources.* FROM group_user_roles_on_resources " - "LEFT JOIN resources " - "ON group_user_roles_on_resources.resource_id=resources.resource_id " - "WHERE group_user_roles_on_resources.group_id = ? " - "AND group_user_roles_on_resources.user_id = ?"), - (str(group.group_id), str(user.user_id))) - private_res = tuple( - Resource(group, UUID(row[1]), row[2], categories[UUID(row[3])], - bool(row[4])) - for row in cursor.fetchall()) - return tuple({ - res.resource_id: res - for res in - (private_res + gl_resources + public_resources(conn))# type: ignore[operator] - }.values()) - - # Fix the typing here - return user_group(cursor, user).map(__all_resources__).maybe(# type: ignore[arg-type,misc] - public_resources(conn), lambda res: res)# type: ignore[arg-type,return-value] diff --git a/gn3/auth/authorisation/resources/__init__.py b/gn3/auth/authorisation/resources/__init__.py new file mode 100644 index 0000000..869ab60 --- /dev/null +++ b/gn3/auth/authorisation/resources/__init__.py @@ -0,0 +1,2 @@ +"""Initialise the `gn3.auth.authorisation.resources` package.""" +from .models import Resource, ResourceCategory diff --git a/gn3/auth/authorisation/resources/models.py b/gn3/auth/authorisation/resources/models.py new file mode 100644 index 0000000..1959362 --- /dev/null +++ b/gn3/auth/authorisation/resources/models.py @@ -0,0 +1,142 @@ +"""Handle the management of resources.""" +import json +from uuid import UUID, uuid4 +from typing import Any, Dict, Sequence, NamedTuple + +from gn3.auth import db +from gn3.auth.dictify import dictify +from gn3.auth.authentication.users import User + +from ..checks import authorised_p +from ..errors import AuthorisationError +from ..groups.models import ( + Group, user_group, is_group_leader, authenticated_user_group) + +class MissingGroupError(AuthorisationError): + """Raised for any resource operation without a group.""" + +class ResourceCategory(NamedTuple): + """Class representing a resource category.""" + resource_category_id: UUID + resource_category_key: str + resource_category_description: str + + def dictify(self) -> dict[str, Any]: + """Return a dict representation of `ResourceCategory` objects.""" + return { + "resource_category_id": self.resource_category_id, + "resource_category_key": self.resource_category_key, + "resource_category_description": self.resource_category_description + } + +class Resource(NamedTuple): + """Class representing a resource.""" + group: Group + resource_id: UUID + resource_name: str + resource_category: ResourceCategory + public: bool + + def dictify(self) -> dict[str, Any]: + """Return a dict representation of `Resource` objects.""" + return { + "group": dictify(self.group), "resource_id": self.resource_id, + "resource_name": self.resource_name, + "resource_category": dictify(self.resource_category), + "public": self.public + } + +@authorised_p(("group:resource:create-resource",), + error_message="Could not create resource") +def create_resource( + conn: db.DbConnection, resource_name: str, + resource_category: ResourceCategory) -> Resource: + """Create a resource item.""" + with db.cursor(conn) as cursor: + group = authenticated_user_group(conn).maybe(False, lambda val: val)# type: ignore[misc] + if not group: + raise MissingGroupError( + "User with no group cannot create a resource.") + resource = Resource(group, uuid4(), resource_name, resource_category, False) + cursor.execute( + "INSERT INTO resources VALUES (?, ?, ?, ?, ?)", + (str(resource.group.group_id), str(resource.resource_id), + resource_name, + str(resource.resource_category.resource_category_id), + 1 if resource.public else 0)) + + return resource + +def resource_categories(conn: db.DbConnection) -> Sequence[ResourceCategory]: + """Retrieve all available resource categories""" + with db.cursor(conn) as cursor: + cursor.execute("SELECT * FROM resource_categories") + return tuple( + ResourceCategory(UUID(row[0]), row[1], row[2]) + for row in cursor.fetchall()) + return tuple() + +def public_resources(conn: db.DbConnection) -> Sequence[Resource]: + """List all resources marked as public""" + categories = { + str(cat.resource_category_id): cat for cat in resource_categories(conn) + } + with db.cursor(conn) as cursor: + cursor.execute("SELECT * FROM resources WHERE public=1") + results = cursor.fetchall() + group_uuids = tuple(row[0] for row in results) + query = ("SELECT * FROM groups WHERE group_id IN " + f"({', '.join(['?'] * len(group_uuids))})") + cursor.execute(query, group_uuids) + groups = { + row[0]: Group( + UUID(row[0]), row[1], json.loads(row[2] or "{}")) + for row in cursor.fetchall() + } + return tuple( + Resource(groups[row[0]], UUID(row[1]), row[2], categories[row[3]], + bool(row[4])) + for row in results) + +def group_leader_resources( + cursor: db.DbCursor, user: User, group: Group, + res_categories: Dict[UUID, ResourceCategory]) -> Sequence[Resource]: + """Return all the resources available to the group leader""" + if is_group_leader(cursor, user, group): + cursor.execute("SELECT * FROM resources WHERE group_id=?", + (str(group.group_id),)) + return tuple( + Resource(group, UUID(row[1]), row[2], res_categories[UUID(row[3])], + bool(row[4])) + for row in cursor.fetchall()) + return tuple() + +def user_resources(conn: db.DbConnection, user: User) -> Sequence[Resource]: + """List the resources available to the user""" + categories = { # Repeated in `public_resources` function + cat.resource_category_id: cat for cat in resource_categories(conn) + } + with db.cursor(conn) as cursor: + def __all_resources__(group) -> Sequence[Resource]: + gl_resources = group_leader_resources(cursor, user, group, categories) + + cursor.execute( + ("SELECT resources.* FROM group_user_roles_on_resources " + "LEFT JOIN resources " + "ON group_user_roles_on_resources.resource_id=resources.resource_id " + "WHERE group_user_roles_on_resources.group_id = ? " + "AND group_user_roles_on_resources.user_id = ?"), + (str(group.group_id), str(user.user_id))) + private_res = tuple( + Resource(group, UUID(row[1]), row[2], categories[UUID(row[3])], + bool(row[4])) + for row in cursor.fetchall()) + return tuple({ + res.resource_id: res + for res in + (private_res + gl_resources + public_resources(conn))# type: ignore[operator] + }.values()) + + # Fix the typing here + return user_group(cursor, user).map(__all_resources__).maybe(# type: ignore[arg-type,misc] + public_resources(conn), lambda res: res)# type: ignore[arg-type,return-value] diff --git a/gn3/auth/authorisation/resources/views.py b/gn3/auth/authorisation/resources/views.py new file mode 100644 index 0000000..009cae6 --- /dev/null +++ b/gn3/auth/authorisation/resources/views.py @@ -0,0 +1,4 @@ +"""The views/routes for the resources package""" +from flask import Blueprint + +resources = Blueprint("resources", __name__) diff --git a/gn3/auth/authorisation/roles.py b/gn3/auth/authorisation/roles.py deleted file mode 100644 index e9f3fb0..0000000 --- a/gn3/auth/authorisation/roles.py +++ /dev/null @@ -1,151 +0,0 @@ -"""Handle management of roles""" -from uuid import UUID, uuid4 -from functools import reduce -from typing import Any, Sequence, Iterable, NamedTuple - -from pymonad.maybe import Just, Maybe, Nothing -from pymonad.either import Left, Right, Either - -from gn3.auth import db -from gn3.auth.dictify import dictify -from gn3.auth.authentication.users import User - -from .checks import authorised_p -from .privileges import Privilege -from .errors import NotFoundError - -class Role(NamedTuple): - """Class representing a role: creates immutable objects.""" - role_id: UUID - role_name: str - privileges: Iterable[Privilege] - - def dictify(self) -> dict[str, Any]: - """Return a dict representation of `Role` objects.""" - return { - "role_id": self.role_id, "role_name": self.role_name, - "privileges": tuple(dictify(priv) for priv in self.privileges) - } - -@authorised_p(("group:role:create-role",), error_message="Could not create role") -def create_role( - cursor: db.DbCursor, role_name: str, - privileges: Iterable[Privilege]) -> Role: - """ - Create a new generic role. - - PARAMS: - * cursor: A database cursor object - This function could be used as part of - a transaction, hence the use of a cursor rather than a connection - object. - * role_name: The name of the role - * privileges: A 'list' of privileges to assign the new role - - RETURNS: An immutable `gn3.auth.authorisation.roles.Role` object - """ - role = Role(uuid4(), role_name, privileges) - - cursor.execute( - "INSERT INTO roles(role_id, role_name) VALUES (?, ?)", - (str(role.role_id), role.role_name)) - cursor.executemany( - "INSERT INTO role_privileges(role_id, privilege_id) VALUES (?, ?)", - tuple((str(role.role_id), str(priv.privilege_id)) - for priv in privileges)) - - return role - -def __organise_privileges__(roles_dict, privilege_row): - """Organise the privileges into their roles.""" - role_id_str = privilege_row["role_id"] - if role_id_str in roles_dict: - return { - **roles_dict, - role_id_str: Role( - UUID(role_id_str), - privilege_row["role_name"], - roles_dict[role_id_str].privileges + ( - Privilege(privilege_row["privilege_id"], - privilege_row["privilege_description"]),)) - } - - return { - **roles_dict, - role_id_str: Role( - UUID(role_id_str), - privilege_row["role_name"], - (Privilege(privilege_row["privilege_id"], - privilege_row["privilege_description"]),)) - } - -def user_roles(conn: db.DbConnection, user: User) -> Maybe[Sequence[Role]]: - """Retrieve non-resource roles assigned to the user.""" - with db.cursor(conn) as cursor: - cursor.execute( - "SELECT r.*, p.* FROM user_roles AS ur INNER JOIN roles AS r " - "ON ur.role_id=r.role_id INNER JOIN role_privileges AS rp " - "ON r.role_id=rp.role_id INNER JOIN privileges AS p " - "ON rp.privilege_id=p.privilege_id WHERE ur.user_id=?", - (str(user.user_id),)) - - results = cursor.fetchall() - if results: - return Just(tuple( - reduce(__organise_privileges__, results, {}).values())) - return Nothing - -def user_role(conn: db.DbConnection, user: User, role_id: UUID) -> Either: - """Retrieve a specific non-resource role assigned to the user.""" - with db.cursor(conn) as cursor: - cursor.execute( - "SELECT r.*, p.* FROM user_roles AS ur INNER JOIN roles AS r " - "ON ur.role_id=r.role_id INNER JOIN role_privileges AS rp " - "ON r.role_id=rp.role_id INNER JOIN privileges AS p " - "ON rp.privilege_id=p.privilege_id " - "WHERE ur.user_id=? AND ur.role_id=?", - (str(user.user_id), str(role_id))) - - results = cursor.fetchall() - if results: - return Right(tuple( - reduce(__organise_privileges__, results, {}).values())[0]) - return Left(NotFoundError( - f"Could not find role with id '{role_id}'",)) - -def assign_default_roles(cursor: db.DbCursor, user: User): - """Assign `user` some default roles.""" - cursor.execute( - 'SELECT role_id FROM roles WHERE role_name IN ' - '("group-creator")') - role_ids = cursor.fetchall() - str_user_id = str(user.user_id) - params = tuple( - {"user_id": str_user_id, "role_id": row["role_id"]} for row in role_ids) - cursor.executemany( - ("INSERT INTO user_roles VALUES (:user_id, :role_id)"), - params) - -def revoke_user_role_by_name(cursor: db.DbCursor, user: User, role_name: str): - """Revoke a role from `user` by the role's name""" - cursor.execute( - "SELECT role_id FROM roles WHERE role_name=:role_name", - {"role_name": role_name}) - role = cursor.fetchone() - if role: - cursor.execute( - ("DELETE FROM user_roles " - "WHERE user_id=:user_id AND role_id=:role_id"), - {"user_id": str(user.user_id), "role_id": role["role_id"]}) - -def assign_user_role_by_name(cursor: db.DbCursor, user: User, role_name: str): - """Revoke a role from `user` by the role's name""" - cursor.execute( - "SELECT role_id FROM roles WHERE role_name=:role_name", - {"role_name": role_name}) - role = cursor.fetchone() - - if role: - cursor.execute( - ("INSERT INTO user_roles VALUES(:user_id, :role_id) " - "ON CONFLICT DO NOTHING"), - {"user_id": str(user.user_id), "role_id": role["role_id"]}) diff --git a/gn3/auth/authorisation/roles/__init__.py b/gn3/auth/authorisation/roles/__init__.py new file mode 100644 index 0000000..293a12f --- /dev/null +++ b/gn3/auth/authorisation/roles/__init__.py @@ -0,0 +1,3 @@ +"""Initialise the `gn3.auth.authorisation.roles` package""" + +from .models import Role diff --git a/gn3/auth/authorisation/roles/models.py b/gn3/auth/authorisation/roles/models.py new file mode 100644 index 0000000..b1aac75 --- /dev/null +++ b/gn3/auth/authorisation/roles/models.py @@ -0,0 +1,151 @@ +"""Handle management of roles""" +from uuid import UUID, uuid4 +from functools import reduce +from typing import Any, Sequence, Iterable, NamedTuple + +from pymonad.maybe import Just, Maybe, Nothing +from pymonad.either import Left, Right, Either + +from gn3.auth import db +from gn3.auth.dictify import dictify +from gn3.auth.authentication.users import User + +from ..checks import authorised_p +from ..privileges import Privilege +from ..errors import NotFoundError + +class Role(NamedTuple): + """Class representing a role: creates immutable objects.""" + role_id: UUID + role_name: str + privileges: Iterable[Privilege] + + def dictify(self) -> dict[str, Any]: + """Return a dict representation of `Role` objects.""" + return { + "role_id": self.role_id, "role_name": self.role_name, + "privileges": tuple(dictify(priv) for priv in self.privileges) + } + +@authorised_p(("group:role:create-role",), error_message="Could not create role") +def create_role( + cursor: db.DbCursor, role_name: str, + privileges: Iterable[Privilege]) -> Role: + """ + Create a new generic role. + + PARAMS: + * cursor: A database cursor object - This function could be used as part of + a transaction, hence the use of a cursor rather than a connection + object. + * role_name: The name of the role + * privileges: A 'list' of privileges to assign the new role + + RETURNS: An immutable `gn3.auth.authorisation.roles.Role` object + """ + role = Role(uuid4(), role_name, privileges) + + cursor.execute( + "INSERT INTO roles(role_id, role_name) VALUES (?, ?)", + (str(role.role_id), role.role_name)) + cursor.executemany( + "INSERT INTO role_privileges(role_id, privilege_id) VALUES (?, ?)", + tuple((str(role.role_id), str(priv.privilege_id)) + for priv in privileges)) + + return role + +def __organise_privileges__(roles_dict, privilege_row): + """Organise the privileges into their roles.""" + role_id_str = privilege_row["role_id"] + if role_id_str in roles_dict: + return { + **roles_dict, + role_id_str: Role( + UUID(role_id_str), + privilege_row["role_name"], + roles_dict[role_id_str].privileges + ( + Privilege(privilege_row["privilege_id"], + privilege_row["privilege_description"]),)) + } + + return { + **roles_dict, + role_id_str: Role( + UUID(role_id_str), + privilege_row["role_name"], + (Privilege(privilege_row["privilege_id"], + privilege_row["privilege_description"]),)) + } + +def user_roles(conn: db.DbConnection, user: User) -> Maybe[Sequence[Role]]: + """Retrieve non-resource roles assigned to the user.""" + with db.cursor(conn) as cursor: + cursor.execute( + "SELECT r.*, p.* FROM user_roles AS ur INNER JOIN roles AS r " + "ON ur.role_id=r.role_id INNER JOIN role_privileges AS rp " + "ON r.role_id=rp.role_id INNER JOIN privileges AS p " + "ON rp.privilege_id=p.privilege_id WHERE ur.user_id=?", + (str(user.user_id),)) + + results = cursor.fetchall() + if results: + return Just(tuple( + reduce(__organise_privileges__, results, {}).values())) + return Nothing + +def user_role(conn: db.DbConnection, user: User, role_id: UUID) -> Either: + """Retrieve a specific non-resource role assigned to the user.""" + with db.cursor(conn) as cursor: + cursor.execute( + "SELECT r.*, p.* FROM user_roles AS ur INNER JOIN roles AS r " + "ON ur.role_id=r.role_id INNER JOIN role_privileges AS rp " + "ON r.role_id=rp.role_id INNER JOIN privileges AS p " + "ON rp.privilege_id=p.privilege_id " + "WHERE ur.user_id=? AND ur.role_id=?", + (str(user.user_id), str(role_id))) + + results = cursor.fetchall() + if results: + return Right(tuple( + reduce(__organise_privileges__, results, {}).values())[0]) + return Left(NotFoundError( + f"Could not find role with id '{role_id}'",)) + +def assign_default_roles(cursor: db.DbCursor, user: User): + """Assign `user` some default roles.""" + cursor.execute( + 'SELECT role_id FROM roles WHERE role_name IN ' + '("group-creator")') + role_ids = cursor.fetchall() + str_user_id = str(user.user_id) + params = tuple( + {"user_id": str_user_id, "role_id": row["role_id"]} for row in role_ids) + cursor.executemany( + ("INSERT INTO user_roles VALUES (:user_id, :role_id)"), + params) + +def revoke_user_role_by_name(cursor: db.DbCursor, user: User, role_name: str): + """Revoke a role from `user` by the role's name""" + cursor.execute( + "SELECT role_id FROM roles WHERE role_name=:role_name", + {"role_name": role_name}) + role = cursor.fetchone() + if role: + cursor.execute( + ("DELETE FROM user_roles " + "WHERE user_id=:user_id AND role_id=:role_id"), + {"user_id": str(user.user_id), "role_id": role["role_id"]}) + +def assign_user_role_by_name(cursor: db.DbCursor, user: User, role_name: str): + """Revoke a role from `user` by the role's name""" + cursor.execute( + "SELECT role_id FROM roles WHERE role_name=:role_name", + {"role_name": role_name}) + role = cursor.fetchone() + + if role: + cursor.execute( + ("INSERT INTO user_roles VALUES(:user_id, :role_id) " + "ON CONFLICT DO NOTHING"), + {"user_id": str(user.user_id), "role_id": role["role_id"]}) diff --git a/gn3/auth/authorisation/roles/views.py b/gn3/auth/authorisation/roles/views.py new file mode 100644 index 0000000..975fb19 --- /dev/null +++ b/gn3/auth/authorisation/roles/views.py @@ -0,0 +1,26 @@ +"""The views/routes for the `gn3.auth.authorisation.roles` package.""" +import uuid + +from flask import jsonify, Response, Blueprint, current_app + +from gn3.auth import db +from gn3.auth.dictify import dictify + +from .models import user_role + +from ...authentication.oauth2.resource_server import require_oauth + +roles = Blueprint("roles", __name__) + +@roles.route("/view/", methods=["GET"]) +@require_oauth("role") +def view_role(role_id: uuid.UUID) -> Response: + """Retrieve a user role with id `role_id`""" + def __error__(exc: Exception): + raise exc + with require_oauth.acquire("profile role") as the_token: + db_uri = current_app.config["AUTH_DB"] + with db.connection(db_uri) as conn: + the_role = user_role(conn, the_token.user, role_id) + return the_role.either( + __error__, lambda a_role: jsonify(dictify(a_role))) diff --git a/gn3/auth/authorisation/users/__init__.py b/gn3/auth/authorisation/users/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/gn3/auth/authorisation/users/views.py b/gn3/auth/authorisation/users/views.py new file mode 100644 index 0000000..460f81c --- /dev/null +++ b/gn3/auth/authorisation/users/views.py @@ -0,0 +1,149 @@ +"""User authorisation endpoints.""" +import traceback +from typing import Tuple, Optional + +import sqlite3 +from flask import request, jsonify, Response, Blueprint, current_app + +from gn3.auth import db +from gn3.auth.dictify import dictify + +from ..groups.models import user_group as _user_group +from ..errors import NotFoundError, UserRegistrationError +from ..resources.models import user_resources as _user_resources +from ..roles.models import assign_default_roles, user_roles as _user_roles + +from ...authentication.oauth2.resource_server import require_oauth +from ...authentication.users import save_user, set_user_password +from ...authentication.oauth2.models.oauth2token import token_by_access_token + +users = Blueprint("users", __name__) + +@users.route("/", methods=["GET"]) +@require_oauth("profile") +def user_details() -> Response: + """Return user's details.""" + with require_oauth.acquire("profile") as the_token: + user = the_token.user + user_dets = { + "user_id": user.user_id, "email": user.email, "name": user.name, + "group": False + } + with db.connection(current_app.config["AUTH_DB"]) as conn, db.cursor(conn) as cursor: + the_group = _user_group(cursor, user).maybe(# type: ignore[misc] + False, lambda grp: grp)# type: ignore[arg-type] + return jsonify({ + **user_dets, + "group": dictify(the_group) if the_group else False + }) + +@users.route("/roles", methods=["GET"]) +@require_oauth("role") +def user_roles() -> Response: + """Return the non-resource roles assigned to the user.""" + with require_oauth.acquire("role") as token: + with db.connection(current_app.config["AUTH_DB"]) as conn: + return jsonify(tuple( + dictify(role) for role in + _user_roles(conn, token.user).maybe(# type: ignore[misc] + tuple(), lambda roles: roles))) + +def __email_valid__(email: str) -> Tuple[bool, Optional[str]]: + """Validate the email address.""" + if email == "": + return False, "Empty email address" + + ## Check that the address is a valid email address + ## Review use of `email-validator` or `pyIsEmail` python packages for + ## validating the emails, if it turns out this is important. + + ## Success + return True, None + +def __password_valid__(password, confirm_password) -> Tuple[bool, Optional[str]]: + if password == "" or confirm_password == "": + return False, "Empty password value" + + if password != confirm_password: + return False, "Mismatched password values" + + return True, None + +def __user_name_valid__(name: str) -> Tuple[bool, Optional[str]]: + if name == "": + return False, "User's name not provided." + + return True, None + +def __assert_not_logged_in__(conn: db.DbConnection): + bearer = request.headers.get('Authorization') + if bearer: + token = token_by_access_token(conn, bearer.split(None)[1]).maybe(# type: ignore[misc] + False, lambda tok: tok) + if token: + raise UserRegistrationError( + "Cannot register user while authenticated") + +@users.route("/register", methods=["POST"]) +def register_user() -> Response: + """Register a user.""" + with db.connection(current_app.config["AUTH_DB"]) as conn: + __assert_not_logged_in__(conn) + + form = request.form + email = form.get("email", "").strip() + password = form.get("password", "").strip() + user_name = form.get("user_name", "").strip() + errors = tuple( + error for valid,error in + [__email_valid__(email), + __password_valid__( + password, form.get("confirm_password", "").strip()), + __user_name_valid__(user_name)] + if not valid) + if len(errors) > 0: + raise UserRegistrationError(*errors) + + try: + with db.cursor(conn) as cursor: + user, _hashed_password = set_user_password( + cursor, save_user(cursor, email, user_name), password) + assign_default_roles(cursor, user) + return jsonify( + { + "user_id": user.user_id, + "email": user.email, + "name": user.name + }) + except sqlite3.IntegrityError as sq3ie: + current_app.logger.debug(traceback.format_exc()) + raise UserRegistrationError( + "A user with that email already exists") from sq3ie + + raise Exception( + "unknown_error", "The system experienced an unexpected error.") + +@users.route("/group", methods=["GET"]) +@require_oauth("profile group") +def user_group() -> Response: + """Retrieve the group in which the user is a member.""" + with require_oauth.acquire("profile group") as the_token: + db_uri = current_app.config["AUTH_DB"] + with db.connection(db_uri) as conn, db.cursor(conn) as cursor: + group = _user_group(cursor, the_token.user).maybe(# type: ignore[misc] + False, lambda grp: grp)# type: ignore[arg-type] + + if group: + return jsonify(dictify(group)) + raise NotFoundError("User is not a member of any group.") + +@users.route("/resources") +@require_oauth("profile resource") +def user_resources() -> Response: + """Retrieve the resources a user has access to.""" + with require_oauth.acquire("profile resource") as the_token: + db_uri = current_app.config["AUTH_DB"] + with db.connection(db_uri) as conn: + return jsonify([ + dictify(resource) for resource in + _user_resources(conn, the_token.user)]) diff --git a/gn3/auth/authorisation/views.py b/gn3/auth/authorisation/views.py deleted file mode 100644 index 03c4b03..0000000 --- a/gn3/auth/authorisation/views.py +++ /dev/null @@ -1,197 +0,0 @@ -"""Endpoints for the authorisation stuff.""" -import uuid -import traceback -from typing import Tuple, Optional - -import sqlite3 -from flask import request, jsonify, Response, current_app - -from gn3.auth import db -from gn3.auth.dictify import dictify -from gn3.auth.blueprint import oauth2 - -from .errors import NotFoundError, UserRegistrationError -from .resources import user_resources as _user_resources -from .roles import user_role, assign_default_roles, user_roles as _user_roles -from .groups import ( - all_groups, GroupCreationError, user_group as _user_group, - group_users as _group_users, create_group as _create_group) - -from ..authentication.oauth2.resource_server import require_oauth -from ..authentication.users import save_user, set_user_password -from ..authentication.oauth2.models.oauth2token import token_by_access_token - -@oauth2.route("/user", methods=["GET"]) -@require_oauth("profile") -def user_details(): - """Return user's details.""" - with require_oauth.acquire("profile") as the_token: - user = the_token.user - user_dets = { - "user_id": user.user_id, "email": user.email, "name": user.name, - "group": False - } - with db.connection(current_app.config["AUTH_DB"]) as conn, db.cursor(conn) as cursor: - return jsonify(_user_group(cursor, user).maybe( - user_dets, - lambda group: {**user_dets, "group": dictify(group)})) - -@oauth2.route("/user-roles", methods=["GET"]) -@require_oauth("role") -def user_roles(): - """Return the non-resource roles assigned to the user.""" - with require_oauth.acquire("role") as token: - with db.connection(current_app.config["AUTH_DB"]) as conn: - return jsonify(_user_roles(conn, token.user).maybe( - tuple(), lambda roles: tuple(dictify(role) for role in roles))) - -def __email_valid__(email: str) -> Tuple[bool, Optional[str]]: - """Validate the email address.""" - if email == "": - return False, "Empty email address" - - ## Check that the address is a valid email address - ## Review use of `email-validator` or `pyIsEmail` python packages for - ## validating the emails, if it turns out this is important. - - ## Success - return True, None - -def __password_valid__(password, confirm_password) -> Tuple[bool, Optional[str]]: - if password == "" or confirm_password == "": - return False, "Empty password value" - - if password != confirm_password: - return False, "Mismatched password values" - - return True, None - -def __user_name_valid__(name: str) -> Tuple[bool, Optional[str]]: - if name == "": - return False, "User's name not provided." - - return True, None - -def __assert_not_logged_in__(conn: db.DbConnection): - bearer = request.headers.get('Authorization') - if bearer: - token = token_by_access_token(conn, bearer.split(None)[1]).maybe(# type: ignore[misc] - False, lambda tok: tok) - if token: - raise UserRegistrationError( - "Cannot register user while authenticated") - -@oauth2.route("/register-user", methods=["POST"]) -def register_user(): - """Register a user.""" - with db.connection(current_app.config["AUTH_DB"]) as conn: - __assert_not_logged_in__(conn) - - form = request.form - email = form.get("email", "").strip() - password = form.get("password", "").strip() - user_name = form.get("user_name", "").strip() - errors = tuple( - error for valid,error in - [__email_valid__(email), - __password_valid__( - password, form.get("confirm_password", "").strip()), - __user_name_valid__(user_name)] - if not valid) - if len(errors) > 0: - raise UserRegistrationError(*errors) - - try: - with db.cursor(conn) as cursor: - user, _hashed_password = set_user_password( - cursor, save_user(cursor, email, user_name), password) - assign_default_roles(cursor, user) - return jsonify( - { - "user_id": user.user_id, - "email": user.email, - "name": user.name - }), 200 - except sqlite3.IntegrityError as sq3ie: - current_app.logger.debug(traceback.format_exc()) - raise UserRegistrationError( - "A user with that email already exists") from sq3ie - - raise Exception( - "unknown_error", "The system experienced an unexpected error.") - -@oauth2.route("/groups", methods=["GET"]) -@require_oauth("profile group") -def groups(): - """Return the list of groups that exist.""" - with db.connection(current_app.config["AUTH_DB"]) as conn: - the_groups = all_groups(conn) - - return jsonify(the_groups.maybe( - [], lambda grps: [dictify(grp) for grp in grps])) - -@oauth2.route("/create-group", methods=["POST"]) -@require_oauth("profile group") -def create_group(): - """Create a new group.""" - with require_oauth.acquire("profile group") as the_token: - group_name=request.form.get("group_name", "").strip() - if not bool(group_name): - raise GroupCreationError("Could not create the group.") - - db_uri = current_app.config["AUTH_DB"] - with db.connection(db_uri) as conn: - user = the_token.user - new_group = _create_group( - conn, group_name, user, request.form.get("group_description")) - return jsonify({ - **dictify(new_group), "group_leader": dictify(user) - }) - -@oauth2.route("/role/", methods=["GET"]) -@require_oauth("role") -def role(role_id: uuid.UUID) -> Response: - """Retrieve a user role with id `role_id`""" - def __error__(exc: Exception): - raise exc - with require_oauth.acquire("profile role") as the_token: - db_uri = current_app.config["AUTH_DB"] - with db.connection(db_uri) as conn: - the_role = user_role(conn, the_token.user, role_id) - return the_role.either( - __error__, lambda a_role: jsonify(dictify(a_role))) - -@oauth2.route("/user-group", methods=["GET"]) -@require_oauth("profile group") -def user_group(): - """Retrieve the group in which the user is a member.""" - with require_oauth.acquire("profile group") as the_token: - db_uri = current_app.config["AUTH_DB"] - with db.connection(db_uri) as conn, db.cursor(conn) as cursor: - group = _user_group(cursor, the_token.user).maybe( - False, lambda grp: grp) - - if group: - return jsonify(dictify(group)) - raise NotFoundError("User is not a member of any group.") - -@oauth2.route("/user-resources") -@require_oauth("profile resource") -def user_resources(): - """Retrieve the resources a user has access to.""" - with require_oauth.acquire("profile resource") as the_token: - db_uri = current_app.config["AUTH_DB"] - with db.connection(db_uri) as conn: - return jsonify([ - dictify(resource) for resource in - _user_resources(conn, the_token.user)]) - -@oauth2.route("/group-users/", methods=["GET"]) -@require_oauth("profile group") -def group_users(group_id: uuid.UUID) -> Response: - """Retrieve all the members of a group.""" - with require_oauth.acquire("profile group") as the_token: - db_uri = current_app.config["AUTH_DB"] - with db.connection(db_uri) as conn: - return jsonify(tuple( - dictify(user) for user in _group_users(conn, group_id))) -- cgit v1.2.3