diff options
-rw-r--r-- | gn3/auth/authorisation/groups.py | 17 | ||||
-rw-r--r-- | gn3/auth/authorisation/resources.py | 8 | ||||
-rw-r--r-- | gn3/auth/authorisation/roles.py | 2 | ||||
-rw-r--r-- | gn3/auth/authorisation/views.py | 37 |
4 files changed, 26 insertions, 38 deletions
diff --git a/gn3/auth/authorisation/groups.py b/gn3/auth/authorisation/groups.py index 201ed4d..c301ea4 100644 --- a/gn3/auth/authorisation/groups.py +++ b/gn3/auth/authorisation/groups.py @@ -4,7 +4,6 @@ from uuid import UUID, uuid4 from typing import Any, Sequence, Iterable, Optional, NamedTuple from flask import g -from pymonad.either import Left, Right, Either from pymonad.maybe import Just, Maybe, Nothing from gn3.auth import db @@ -14,7 +13,7 @@ from gn3.auth.authentication.checks import authenticated_p from .checks import authorised_p from .privileges import Privilege -from .errors import NotFoundError, AuthorisationError +from .errors import AuthorisationError from .roles import ( Role, create_role, revoke_user_role_by_name, assign_user_role_by_name) @@ -141,7 +140,7 @@ def authenticated_user_group(conn) -> Maybe: return Nothing -def user_group(cursor: db.DbCursor, user: User) -> Either: +def user_group(cursor: db.DbCursor, user: User) -> Maybe[Group]: """Returns the given user's group""" cursor.execute( ("SELECT groups.group_id, groups.group_name, groups.group_metadata " @@ -157,19 +156,15 @@ def user_group(cursor: db.DbCursor, user: User) -> Either: raise MembershipError(user, groups) if len(groups) == 1: - return Right(groups[0]) + return Just(groups[0]) - return Left(NotFoundError("User is not in any group.")) + return Nothing def is_group_leader(cursor: db.DbCursor, user: User, group: Group): """Check whether the given `user` is the leader of `group`.""" - def __raise__(exc): - if type(exc) == NotFoundError: - return False - raise exc - ugroup = user_group(cursor, user).either( - __raise__, lambda val: val) # type: ignore[arg-type, misc] + ugroup = user_group(cursor, user).maybe( + False, lambda val: val) # type: ignore[arg-type, misc] if not group: # User cannot be a group leader if not a member of ANY group return False diff --git a/gn3/auth/authorisation/resources.py b/gn3/auth/authorisation/resources.py index c9cd392..1e37d7a 100644 --- a/gn3/auth/authorisation/resources.py +++ b/gn3/auth/authorisation/resources.py @@ -136,10 +136,6 @@ def user_resources(conn: db.DbConnection, user: User) -> Sequence[Resource]: (private_res + gl_resources + public_resources(conn))# type: ignore[operator] }.values()) - def __handle_error__(exc): - if type(exc) == NotFoundError: - return public_resources(conn) - raise exc # Fix the typing here - return user_group(cursor, user).map(__all_resources__).either(# type: ignore[arg-type,misc] - __handle_error__, lambda res: res)# type: ignore[arg-type,return-value] + return user_group(cursor, user).map(__all_resources__).maybe(# type: ignore[arg-type,misc] + public_resources(conn), lambda res: res)# type: ignore[arg-type,return-value] diff --git a/gn3/auth/authorisation/roles.py b/gn3/auth/authorisation/roles.py index 23b74cc..f3b2f90 100644 --- a/gn3/auth/authorisation/roles.py +++ b/gn3/auth/authorisation/roles.py @@ -13,7 +13,7 @@ from gn3.auth.authentication.checks import authenticated_p from .checks import authorised_p from .privileges import Privilege -from .errors import NotFoundError, AuthorisationError +from .errors import NotFoundError class Role(NamedTuple): """Class representing a role: creates immutable objects.""" diff --git a/gn3/auth/authorisation/views.py b/gn3/auth/authorisation/views.py index 65b1dc4..6cab0df 100644 --- a/gn3/auth/authorisation/views.py +++ b/gn3/auth/authorisation/views.py @@ -10,7 +10,7 @@ from gn3.auth import db from gn3.auth.dictify import dictify from gn3.auth.blueprint import oauth2 -from .errors import UserRegistrationError +from .errors import NotFoundError, UserRegistrationError from .resources import user_resources as _user_resources from .roles import user_role, assign_default_roles, user_roles as _user_roles from .groups import ( @@ -21,29 +21,20 @@ from ..authentication.oauth2.resource_server import require_oauth from ..authentication.users import save_user, set_user_password from ..authentication.oauth2.models.oauth2token import token_by_access_token -def __raise_error__(exc): - current_app.logger.error(exc) - raise exc - @oauth2.route("/user", methods=["GET"]) @require_oauth("profile") def user_details(): """Return user's details.""" - def __raise__(exc): - if type(exc) == NotFoundError: - return False - raise exc - with require_oauth.acquire("profile") as the_token: user = the_token.user + user_dets = { + "user_id": user.user_id, "email": user.email, "name": user.name, + "group": False + } with db.connection(current_app.config["AUTH_DB"]) as conn, db.cursor(conn) as cursor: - return _user_group(cursor, user).either( - __raise__, lambda group: jsonify({ - "user_id": user.user_id, - "email": user.email, - "name": user.name, - "group": dictify(group) - })) + return jsonify(_user_group(cursor, user).maybe( + user_dets, + lambda group: {**user_dets, "group": dictify(group)})) @oauth2.route("/user-roles", methods=["GET"]) @require_oauth("role") @@ -173,18 +164,24 @@ def role(role_id: uuid.UUID) -> Response: @oauth2.route("/user-group", methods=["GET"]) @require_oauth("group") def user_group(): + """Retrieve the group in which the user is a member.""" with require_oauth.acquire("profile group") as the_token: db_uri = current_app.config["AUTH_DB"] with db.connection(db_uri) as conn, db.cursor(conn) as cursor: - return _user_group(cursor, the_token.user).either( - __raise_error__, lambda grp: jsonify(dictify(grp))) + group = _user_group(cursor, the_token.user).maybe( + False, lambda grp: grp) + + if group: + return jsonify(dictify(group)) + raise NotFoundError("User is not a member of any group.") @oauth2.route("/user-resources") @require_oauth("profile resource") def user_resources(): + """Retrieve the resources a user has access to.""" with require_oauth.acquire("profile resource") as the_token: db_uri = current_app.config["AUTH_DB"] - with db.connection(db_uri) as conn, db.cursor(conn) as cursor: + with db.connection(db_uri) as conn: return jsonify([ dictify(resource) for resource in _user_resources(conn, the_token.user)]) |