diff options
author | Frederick Muriuki Muriithi | 2023-01-23 14:30:20 +0300 |
---|---|---|
committer | Frederick Muriuki Muriithi | 2023-01-23 14:30:20 +0300 |
commit | b9139c2356f75103bc5fd17f074f4ee0e74b64aa (patch) | |
tree | 06803f97ccea91ce5137d42f42e1abe33c38365c | |
parent | e92ceacccb4c8d32f28ed7d2530ddc6912a730d4 (diff) | |
download | genenetwork3-b9139c2356f75103bc5fd17f074f4ee0e74b64aa.tar.gz |
auth: create group: Fix group creation.
* gn3/auth/authorisation/checks.py: Enable passing user to authorisation
checking function. Raise error on authorisation failure for consistent error
handling.
* gn3/auth/authorisation/groups.py: Add user to group, updating the privileges
as appropriate.
* gn3/auth/authorisation/resources.py: Fix resources querying
* gn3/auth/authorisation/roles.py: Assign/revoke roles by name
* gn3/auth/authorisation/views.py: Create group
* migrations/auth/20221108_01_CoxYh-create-the-groups-table.py: Add
group_metadata field
* tests/unit/auth/fixtures/group_fixtures.py: fix tests
* tests/unit/auth/test_groups.py: fix tests
* tests/unit/auth/test_resources.py: fix tests
* tests/unit/auth/test_roles.py: fix tests
-rw-r--r-- | gn3/auth/authorisation/checks.py | 19 | ||||
-rw-r--r-- | gn3/auth/authorisation/groups.py | 97 | ||||
-rw-r--r-- | gn3/auth/authorisation/resources.py | 39 | ||||
-rw-r--r-- | gn3/auth/authorisation/roles.py | 25 | ||||
-rw-r--r-- | gn3/auth/authorisation/views.py | 30 | ||||
-rw-r--r-- | migrations/auth/20221108_01_CoxYh-create-the-groups-table.py | 3 | ||||
-rw-r--r-- | tests/unit/auth/fixtures/group_fixtures.py | 4 | ||||
-rw-r--r-- | tests/unit/auth/test_groups.py | 63 | ||||
-rw-r--r-- | tests/unit/auth/test_resources.py | 49 | ||||
-rw-r--r-- | tests/unit/auth/test_roles.py | 28 |
10 files changed, 271 insertions, 86 deletions
diff --git a/gn3/auth/authorisation/checks.py b/gn3/auth/authorisation/checks.py index d847c1e..8fef209 100644 --- a/gn3/auth/authorisation/checks.py +++ b/gn3/auth/authorisation/checks.py @@ -1,35 +1,38 @@ """Functions to check for authorisation.""" from functools import wraps -from typing import Callable +from typing import Callable, Optional from flask import g, current_app as app from gn3.auth import db + from . import privileges as auth_privs +from .errors import AuthorisationError + +from ..authentication.users import User def authorised_p( privileges: tuple[str], error_message: str = ( - "You lack authorisation to perform requested action")): + "You lack authorisation to perform requested action"), + user: Optional[User] = None): """Authorisation decorator.""" assert len(privileges) > 0, "You must provide at least one privilege" def __build_authoriser__(func: Callable): @wraps(func) def __authoriser__(*args, **kwargs): - if hasattr(g, "user") and g.user: + the_user = user or (hasattr(g, "user") and g.user) + if the_user: with db.connection(app.config["AUTH_DB"]) as conn: user_privileges = tuple( priv.privilege_id for priv in - auth_privs.user_privileges(conn, g.user)) + auth_privs.user_privileges(conn, the_user)) not_assigned = [ priv for priv in privileges if priv not in user_privileges] if len(not_assigned) == 0: return func(*args, **kwargs) - return { - "status": "error", - "message": f"Unauthorised: {error_message}" - } + raise AuthorisationError(error_message) return __authoriser__ return __build_authoriser__ diff --git a/gn3/auth/authorisation/groups.py b/gn3/auth/authorisation/groups.py index 3367b57..6d1b1a3 100644 --- a/gn3/auth/authorisation/groups.py +++ b/gn3/auth/authorisation/groups.py @@ -1,23 +1,31 @@ """Handle the management of resource/user groups.""" +import json from uuid import UUID, uuid4 -from typing import Sequence, Iterable, NamedTuple +from typing import Any, Sequence, Iterable, Optional, NamedTuple from flask import g from pymonad.maybe import Just, Maybe, Nothing from gn3.auth import db from gn3.auth.authentication.users import User +from gn3.auth.dictify import register_dictifier from gn3.auth.authentication.checks import authenticated_p from .checks import authorised_p from .privileges import Privilege -from .roles import Role, create_role from .errors import AuthorisationError +from .roles import ( + Role, create_role, revoke_user_role_by_name, assign_user_role_by_name) class Group(NamedTuple): """Class representing a group.""" group_id: UUID group_name: str + group_metadata: dict[str, Any] + +register_dictifier(Group, lambda grp: { + "group_id": grp.group_id, "group_name": grp.group_name, + "group_metadata": grp.group_metadata}) class GroupRole(NamedTuple): """Class representing a role tied/belonging to a group.""" @@ -25,6 +33,9 @@ class GroupRole(NamedTuple): group: Group role: Role +class GroupCreationError(AuthorisationError): + """Raised whenever a group creation fails""" + class MembershipError(AuthorisationError): """Raised when there is an error with a user's membership to a group.""" @@ -39,35 +50,43 @@ class MembershipError(AuthorisationError): def user_membership(conn: db.DbConnection, user: User) -> Sequence[Group]: """Returns all the groups that a member belongs to""" query = ( - "SELECT groups.group_id, group_name FROM group_users INNER JOIN groups " + "SELECT groups.group_id, group_name, groups.group_metadata " + "FROM group_users INNER JOIN groups " "ON group_users.group_id=groups.group_id " "WHERE group_users.user_id=?") with db.cursor(conn) as cursor: cursor.execute(query, (str(user.user_id),)) - groups = tuple(Group(row[0], row[1]) for row in cursor.fetchall()) + groups = tuple(Group(row[0], row[1], json.loads(row[2])) + for row in cursor.fetchall()) return groups @authenticated_p -@authorised_p(("system:group:create-group",), - error_message="Failed to create group.") -def create_group(conn: db.DbConnection, group_name: str, - group_leader: User) -> Group: - """Create a group""" - group = Group(uuid4(), group_name) +def create_group( + conn: db.DbConnection, group_name: str, group_leader: User, + group_description: Optional[str] = None) -> Group: + """Create a new group.""" user_groups = user_membership(conn, group_leader) if len(user_groups) > 0: raise MembershipError(group_leader, user_groups) - with db.cursor(conn) as cursor: - cursor.execute( - "INSERT INTO groups(group_id, group_name) VALUES (?, ?)", - (str(group.group_id), group_name)) - cursor.execute( - "INSERT INTO group_users VALUES (?, ?)", - (str(group.group_id), str(group_leader.user_id))) - - return group + @authorised_p( + ("system:group:create-group",), ( + "You do not have the appropriate privileges to enable you to " + "create a new group."), + group_leader) + def __create_group__(): + with db.cursor(conn) as cursor: + new_group = __save_group__( + cursor, group_name,( + {"group_description": group_description} + if group_description else {})) + add_user_to_group(cursor, new_group, group_leader) + revoke_user_role_by_name(cursor, group_leader, "group-creator") + assign_user_role_by_name(cursor, group_leader, "group-leader") + return new_group + + return __create_group__() @authenticated_p @authorised_p(("group:role:create-role",), @@ -96,11 +115,12 @@ def authenticated_user_group(conn) -> Maybe: user = g.user with db.cursor(conn) as cursor: cursor.execute( - ("SELECT groups.group_id, groups.group_name FROM group_users " + ("SELECT groups.* FROM group_users " "INNER JOIN groups ON group_users.group_id=groups.group_id " "WHERE group_users.user_id = ?"), (str(user.user_id),)) - groups = tuple(Group(UUID(row[0]), row[1]) for row in cursor.fetchall()) + groups = tuple(Group(UUID(row[0]), row[1], json.loads(row[2] or "{}")) + for row in cursor.fetchall()) if len(groups) > 1: raise MembershipError(user, groups) @@ -110,14 +130,17 @@ def authenticated_user_group(conn) -> Maybe: return Nothing -def user_group(cursor: db.DbCursor, user: User) -> Maybe: +def user_group(cursor: db.DbCursor, user: User) -> Maybe[Group]: """Returns the given user's group""" cursor.execute( - ("SELECT groups.group_id, groups.group_name FROM group_users " + ("SELECT groups.group_id, groups.group_name, groups.group_metadata " + "FROM group_users " "INNER JOIN groups ON group_users.group_id=groups.group_id " "WHERE group_users.user_id = ?"), (str(user.user_id),)) - groups = tuple(Group(UUID(row[0]), row[1]) for row in cursor.fetchall()) + groups = tuple( + Group(UUID(row[0]), row[1], json.loads(row[2] or "{}")) + for row in cursor.fetchall()) if len(groups) > 1: raise MembershipError(user, groups) @@ -129,7 +152,7 @@ def user_group(cursor: db.DbCursor, user: User) -> Maybe: def is_group_leader(cursor: db.DbCursor, user: User, group: Group): """Check whether the given `user` is the leader of `group`.""" - ugroup = user_group(cursor, user).maybe(False, lambda val: val) # type: ignore[misc] + ugroup = user_group(cursor, user).maybe(False, lambda val: val) # type: ignore[arg-type, misc] if not group: # User cannot be a group leader if not a member of ANY group return False @@ -153,6 +176,28 @@ def all_groups(conn: db.DbConnection) -> Maybe[Sequence[Group]]: res = cursor.fetchall() if res: return Just(tuple( - Group(row["group_id"], row["group_name"]) for row in res)) + Group(row["group_id"], row["group_name"], + json.loads(row["group_metadata"])) for row in res)) return Nothing + +def __save_group__( + cursor: db.DbCursor, group_name: str, + group_metadata: dict[str, Any]) -> Group: + """Save a group to db""" + the_group = Group(uuid4(), group_name, group_metadata) + cursor.execute( + ("INSERT INTO groups " + "VALUES(:group_id, :group_name, :group_metadata) " + "ON CONFLICT (group_id) DO UPDATE SET " + "group_name=:group_name, group_metadata=:group_metadata"), + {"group_id": str(the_group.group_id), "group_name": the_group.group_name, + "group_metadata": json.dumps(the_group.group_metadata)}) + return the_group + +def add_user_to_group(cursor: db.DbCursor, the_group: Group, user: User): + """Add `user` to `the_group` as a member.""" + cursor.execute( + ("INSERT INTO group_users VALUES (:group_id, :user_id) " + "ON CONFLICT (group_id, user_id) DO NOTHING"), + {"group_id": str(the_group.group_id), "user_id": str(user.user_id)}) diff --git a/gn3/auth/authorisation/resources.py b/gn3/auth/authorisation/resources.py index f27d61a..29e50bf 100644 --- a/gn3/auth/authorisation/resources.py +++ b/gn3/auth/authorisation/resources.py @@ -1,4 +1,5 @@ """Handle the management of resources.""" +import json from uuid import UUID, uuid4 from typing import Dict, Sequence, NamedTuple @@ -68,7 +69,11 @@ def public_resources(conn: db.DbConnection) -> Sequence[Resource]: query = ("SELECT * FROM groups WHERE group_id IN " f"({', '.join(['?'] * len(group_uuids))})") cursor.execute(query, group_uuids) - groups = {row[0]: Group(UUID(row[0]), row[1]) for row in cursor.fetchall()} + groups = { + row[0]: Group( + UUID(row[0]), row[1], json.loads(row[2] or "{}")) + for row in cursor.fetchall() + } return tuple( Resource(groups[row[0]], UUID(row[1]), row[2], categories[row[3]], bool(row[4])) @@ -93,22 +98,26 @@ def user_resources(conn: db.DbConnection, user: User) -> Sequence[Resource]: cat.resource_category_id: cat for cat in resource_categories(conn) } with db.cursor(conn) as cursor: - group = user_group(cursor, user).maybe(False, lambda val: val) # type: ignore[misc] - if not group: - return public_resources(conn) - - gl_resources = group_leader_resources(cursor, user, group, categories) + def __all_resources__(group) -> Sequence[Resource]: + gl_resources = group_leader_resources(cursor, user, group, categories) - cursor.execute( - ("SELECT resources.* FROM group_user_roles_on_resources " - "LEFT JOIN resources " - "ON group_user_roles_on_resources.resource_id=resources.resource_id " - "WHERE group_user_roles_on_resources.group_id = ? " - "AND group_user_roles_on_resources.user_id = ?"), - (str(group.group_id), str(user.user_id))) - private_res = tuple( + cursor.execute( + ("SELECT resources.* FROM group_user_roles_on_resources " + "LEFT JOIN resources " + "ON group_user_roles_on_resources.resource_id=resources.resource_id " + "WHERE group_user_roles_on_resources.group_id = ? " + "AND group_user_roles_on_resources.user_id = ?"), + (str(group.group_id), str(user.user_id))) + private_res = tuple( Resource(group, UUID(row[1]), row[2], categories[UUID(row[3])], bool(row[4])) for row in cursor.fetchall()) + return tuple({ + res.resource_id: res + for res in + (private_res + gl_resources + public_resources(conn))# type: ignore[operator] + }.values()) - return tuple(set(private_res).union(gl_resources).union(public_resources(conn))) + # Fix the typing here + return user_group(cursor, user).map(__all_resources__).maybe(# type: ignore[arg-type,misc] + public_resources(conn), lambda res: res)# type: ignore[arg-type,return-value] diff --git a/gn3/auth/authorisation/roles.py b/gn3/auth/authorisation/roles.py index e84eb71..cd59a36 100644 --- a/gn3/auth/authorisation/roles.py +++ b/gn3/auth/authorisation/roles.py @@ -98,3 +98,28 @@ def assign_default_roles(cursor: db.DbCursor, user: User): cursor.executemany( ("INSERT INTO user_roles VALUES (:user_id, :role_id)"), params) + +def revoke_user_role_by_name(cursor: db.DbCursor, user: User, role_name: str): + """Revoke a role from `user` by the role's name""" + cursor.execute( + "SELECT role_id FROM roles WHERE role_name=:role_name", + {"role_name": role_name}) + role = cursor.fetchone() + if role: + cursor.execute( + ("DELETE FROM user_roles " + "WHERE user_id=:user_id AND role_id=:role_id"), + {"user_id": str(user.user_id), "role_id": role["role_id"]}) + +def assign_user_role_by_name(cursor: db.DbCursor, user: User, role_name: str): + """Revoke a role from `user` by the role's name""" + cursor.execute( + "SELECT role_id FROM roles WHERE role_name=:role_name", + {"role_name": role_name}) + role = cursor.fetchone() + + if role: + cursor.execute( + ("INSERT INTO user_roles VALUES(:user_id, :role_id) " + "ON CONFLICT DO NOTHING"), + {"user_id": str(user.user_id), "role_id": role["role_id"]}) diff --git a/gn3/auth/authorisation/views.py b/gn3/auth/authorisation/views.py index d2f7d47..11e43eb 100644 --- a/gn3/auth/authorisation/views.py +++ b/gn3/auth/authorisation/views.py @@ -6,11 +6,13 @@ import sqlite3 from flask import request, jsonify, current_app from gn3.auth import db +from gn3.auth.dictify import dictify from gn3.auth.blueprint import oauth2 from .errors import UserRegistrationError -from .groups import user_group, all_groups from .roles import assign_default_roles, user_roles as _user_roles +from .groups import ( + user_group, all_groups, GroupCreationError, create_group as _create_group) from ..authentication.oauth2.resource_server import require_oauth from ..authentication.users import save_user, set_user_password @@ -29,7 +31,7 @@ def user_details(): "user_id": user.user_id, "email": user.email, "name": user.name, - "group": group.maybe(False, lambda grp: grp) + "group": group.maybe(False, dictify) }) @oauth2.route("/user-roles", methods=["GET"]) @@ -117,11 +119,29 @@ def register_user(): "unknown_error", "The system experienced an unexpected error.") @oauth2.route("/groups", methods=["GET"]) -@require_oauth("profile") +@require_oauth("profile group") def groups(): """Return the list of groups that exist.""" with db.connection(current_app.config["AUTH_DB"]) as conn: the_groups = all_groups(conn) - print(f"The groups: {the_groups}") - return jsonify([]) + return jsonify(the_groups.maybe( + [], lambda grps: [dictify(grp) for grp in grps])) + +@oauth2.route("/create-group", methods=["POST"]) +@require_oauth("profile group") +def create_group(): + """Create a new group.""" + with require_oauth.acquire("profile group") as the_token: + group_name=request.form.get("group_name", "").strip() + if not bool(group_name): + raise GroupCreationError("Could not create the group.") + + db_uri = current_app.config["AUTH_DB"] + with db.connection(db_uri) as conn: + user = the_token.user + new_group = _create_group( + conn, group_name, user, request.form.get("group_description")) + return jsonify({ + **dictify(new_group), "group_leader": dictify(user) + }) diff --git a/migrations/auth/20221108_01_CoxYh-create-the-groups-table.py b/migrations/auth/20221108_01_CoxYh-create-the-groups-table.py index 06491dd..29f92d4 100644 --- a/migrations/auth/20221108_01_CoxYh-create-the-groups-table.py +++ b/migrations/auth/20221108_01_CoxYh-create-the-groups-table.py @@ -11,7 +11,8 @@ steps = [ """ CREATE TABLE IF NOT EXISTS groups( group_id TEXT PRIMARY KEY NOT NULL, - group_name TEXT NOT NULL + group_name TEXT NOT NULL, + group_metadata TEXT ) WITHOUT ROWID """, "DROP TABLE IF EXISTS groups") diff --git a/tests/unit/auth/fixtures/group_fixtures.py b/tests/unit/auth/fixtures/group_fixtures.py index 1830374..d7bbc56 100644 --- a/tests/unit/auth/fixtures/group_fixtures.py +++ b/tests/unit/auth/fixtures/group_fixtures.py @@ -10,9 +10,9 @@ from gn3.auth.authorisation.resources import Resource, ResourceCategory from .role_fixtures import RESOURCE_EDITOR_ROLE, RESOURCE_READER_ROLE TEST_GROUP_01 = Group(uuid.UUID("9988c21d-f02f-4d45-8966-22c968ac2fbf"), - "TheTestGroup") + "TheTestGroup", {}) TEST_GROUP_02 = Group(uuid.UUID("e37d59d7-c05e-4d67-b479-81e627d8d634"), - "AnotherTestGroup") + "AnotherTestGroup", {}) TEST_GROUPS = (TEST_GROUP_01, TEST_GROUP_02) TEST_RESOURCES_GROUP_01 = ( diff --git a/tests/unit/auth/test_groups.py b/tests/unit/auth/test_groups.py index 158360e..219b82a 100644 --- a/tests/unit/auth/test_groups.py +++ b/tests/unit/auth/test_groups.py @@ -8,8 +8,10 @@ from gn3.auth import db from gn3.auth.authentication.users import User from gn3.auth.authorisation.roles import Role from gn3.auth.authorisation.privileges import Privilege +from gn3.auth.authorisation.errors import AuthorisationError from gn3.auth.authorisation.groups import ( - Group, GroupRole, user_group, create_group, MembershipError, create_group_role) + Group, GroupRole, user_group, create_group, MembershipError, + create_group_role) from tests.unit.auth import conftest @@ -20,7 +22,8 @@ create_group_failure = { uuid_fn = lambda : UUID("d32611e3-07fc-4564-b56c-786c6db6de2b") -GROUP = Group(UUID("9988c21d-f02f-4d45-8966-22c968ac2fbf"), "TheTestGroup") +GROUP = Group(UUID("9988c21d-f02f-4d45-8966-22c968ac2fbf"), "TheTestGroup", + {"group_description": "The test group"}) PRIVILEGES = ( Privilege( "group:resource:view-resource", @@ -29,9 +32,10 @@ PRIVILEGES = ( @pytest.mark.unit_test @pytest.mark.parametrize( - "user,expected", tuple(zip(conftest.TEST_USERS, ( + "user,expected", tuple(zip(conftest.TEST_USERS[0:1], ( Group( - UUID("d32611e3-07fc-4564-b56c-786c6db6de2b"), "a_test_group"), + UUID("d32611e3-07fc-4564-b56c-786c6db6de2b"), "a_test_group", + {"group_description": "A test group"}), create_group_failure, create_group_failure, create_group_failure, create_group_failure)))) def test_create_group(# pylint: disable=[too-many-arguments] @@ -46,7 +50,24 @@ def test_create_group(# pylint: disable=[too-many-arguments] with fxtr_app.app_context() as flask_context: flask_context.g.user = user with db.connection(auth_testdb_path) as conn: - assert create_group(conn, "a_test_group", user) == expected + assert create_group( + conn, "a_test_group", user, "A test group") == expected + +@pytest.mark.unit_test +@pytest.mark.parametrize("user", conftest.TEST_USERS[1:]) +def test_create_group_raises_exception_with_non_privileged_user(# pylint: disable=[too-many-arguments] + fxtr_app, auth_testdb_path, mocker, fxtr_users, user):# pylint: disable=[unused-argument] + """ + GIVEN: an authenticated user, without appropriate privileges + WHEN: the user attempts to create a group + THEN: verify the system raises an exception + """ + mocker.patch("gn3.auth.authorisation.groups.uuid4", uuid_fn) + with fxtr_app.app_context() as flask_context: + flask_context.g.user = user + with db.connection(auth_testdb_path) as conn: + with pytest.raises(AuthorisationError): + assert create_group(conn, "a_test_group", user, "A test group") create_role_failure = { "status": "error", @@ -55,14 +76,12 @@ create_role_failure = { @pytest.mark.unit_test @pytest.mark.parametrize( - "user,expected", tuple(zip(conftest.TEST_USERS, ( + "user,expected", tuple(zip(conftest.TEST_USERS[0:1], ( GroupRole( UUID("d32611e3-07fc-4564-b56c-786c6db6de2b"), GROUP, Role(UUID("d32611e3-07fc-4564-b56c-786c6db6de2b"), - "ResourceEditor", PRIVILEGES)), - create_role_failure, create_role_failure, create_role_failure, - create_role_failure)))) + "ResourceEditor", PRIVILEGES)),)))) def test_create_group_role(mocker, fxtr_users_in_group, fxtr_app, user, expected): """ GIVEN: an authenticated user @@ -84,6 +103,27 @@ def test_create_group_role(mocker, fxtr_users_in_group, fxtr_app, user, expected (str(uuid_fn()), str(GROUP.group_id), str(uuid_fn()))) @pytest.mark.unit_test +@pytest.mark.parametrize( + "user,expected", tuple(zip(conftest.TEST_USERS[1:], ( + create_role_failure, create_role_failure, create_role_failure)))) +def test_create_group_role_raises_exception_with_unauthorised_users( + mocker, fxtr_users_in_group, fxtr_app, user, expected): + """ + GIVEN: an authenticated user + WHEN: the user attempts to create a role, attached to a group + THEN: verify they are only able to create the role if they have the + appropriate privileges and that the role is attached to the given group + """ + mocker.patch("gn3.auth.authorisation.groups.uuid4", uuid_fn) + mocker.patch("gn3.auth.authorisation.roles.uuid4", uuid_fn) + conn, _group, _users = fxtr_users_in_group + with fxtr_app.app_context() as flask_context: + flask_context.g.user = user + with pytest.raises(AuthorisationError): + assert create_group_role( + conn, GROUP, "ResourceEditor", PRIVILEGES) == expected + +@pytest.mark.unit_test def test_create_multiple_groups(mocker, fxtr_app, fxtr_users): """ GIVEN: An authenticated user with appropriate authorisation @@ -101,7 +141,8 @@ def test_create_multiple_groups(mocker, fxtr_app, fxtr_users): flask_context.g.user = user # First time, successfully creates the group assert create_group(conn, "a_test_group", user) == Group( - UUID("d32611e3-07fc-4564-b56c-786c6db6de2b"), "a_test_group") + UUID("d32611e3-07fc-4564-b56c-786c6db6de2b"), "a_test_group", + {}) # subsequent attempts should fail with pytest.raises(MembershipError): create_group(conn, "another_test_group", user) @@ -111,7 +152,7 @@ def test_create_multiple_groups(mocker, fxtr_app, fxtr_users): "user,expected", tuple(zip( conftest.TEST_USERS, - (([Group(UUID("9988c21d-f02f-4d45-8966-22c968ac2fbf"), "TheTestGroup")] * 3) + (([Group(UUID("9988c21d-f02f-4d45-8966-22c968ac2fbf"), "TheTestGroup", {})] * 3) + [Nothing])))) def test_user_group(fxtr_users_in_group, user, expected): """ diff --git a/tests/unit/auth/test_resources.py b/tests/unit/auth/test_resources.py index e6ebeb9..a0236c4 100644 --- a/tests/unit/auth/test_resources.py +++ b/tests/unit/auth/test_resources.py @@ -5,13 +5,15 @@ import pytest from gn3.auth import db from gn3.auth.authorisation.groups import Group +from gn3.auth.authorisation.errors import AuthorisationError from gn3.auth.authorisation.resources import ( Resource, user_resources, create_resource, ResourceCategory, public_resources) from tests.unit.auth import conftest -group = Group(uuid.UUID("9988c21d-f02f-4d45-8966-22c968ac2fbf"), "TheTestGroup") +group = Group(uuid.UUID("9988c21d-f02f-4d45-8966-22c968ac2fbf"), "TheTestGroup", + {}) resource_category = ResourceCategory( uuid.UUID("fad071a3-2fc8-40b8-992b-cdefe7dcac79"), "mrna", "mRNA Dataset") create_resource_failure = { @@ -24,14 +26,10 @@ uuid_fn = lambda : uuid.UUID("d32611e3-07fc-4564-b56c-786c6db6de2b") @pytest.mark.parametrize( "user,expected", tuple(zip( - conftest.TEST_USERS, + conftest.TEST_USERS[0:1], (Resource( group, uuid.UUID("d32611e3-07fc-4564-b56c-786c6db6de2b"), - "test_resource", resource_category, False), - create_resource_failure, - create_resource_failure, - create_resource_failure, - create_resource_failure)))) + "test_resource", resource_category, False),)))) def test_create_resource(mocker, fxtr_app, fxtr_users_in_group, user, expected): """Test that resource creation works as expected.""" mocker.patch("gn3.auth.authorisation.resources.uuid4", uuid_fn) @@ -44,6 +42,24 @@ def test_create_resource(mocker, fxtr_app, fxtr_users_in_group, user, expected): cursor.execute( "DELETE FROM resources WHERE resource_id=?", (str(uuid_fn()),)) +@pytest.mark.unit_test +@pytest.mark.parametrize( + "user,expected", + tuple(zip( + conftest.TEST_USERS[1:], + (create_resource_failure, create_resource_failure, + create_resource_failure)))) +def test_create_resource_raises_for_unauthorised_users( + mocker, fxtr_app, fxtr_users_in_group, user, expected): + """Test that resource creation works as expected.""" + mocker.patch("gn3.auth.authorisation.resources.uuid4", uuid_fn) + conn, _group, _users = fxtr_users_in_group + with fxtr_app.app_context() as flask_context: + flask_context.g.user = user + with pytest.raises(AuthorisationError): + assert create_resource( + conn, "test_resource", resource_category) == expected + SORTKEY = lambda resource: resource.resource_id @pytest.mark.unit_test @@ -57,7 +73,9 @@ def test_public_resources(fxtr_resources): assert sorted(public_resources(conn), key=SORTKEY) == sorted(tuple( res for res in conftest.TEST_RESOURCES if res.public), key=SORTKEY) -PUBLIC_RESOURCES = sorted(conftest.TEST_RESOURCES_PUBLIC, key=SORTKEY) +PUBLIC_RESOURCES = sorted( + {res.resource_id: res for res in conftest.TEST_RESOURCES_PUBLIC}.values(), + key=SORTKEY) @pytest.mark.unit_test @pytest.mark.parametrize( @@ -65,12 +83,15 @@ PUBLIC_RESOURCES = sorted(conftest.TEST_RESOURCES_PUBLIC, key=SORTKEY) tuple(zip( conftest.TEST_USERS, (sorted( - set(conftest.TEST_RESOURCES_GROUP_01).union( - conftest.TEST_RESOURCES_PUBLIC), + {res.resource_id: res for res in + (conftest.TEST_RESOURCES_GROUP_01 + + conftest.TEST_RESOURCES_PUBLIC)}.values(), key=SORTKEY), sorted( - set([conftest.TEST_RESOURCES_GROUP_01[1]]).union( - conftest.TEST_RESOURCES_PUBLIC), + {res.resource_id: res for res in + ((conftest.TEST_RESOURCES_GROUP_01[1],) + + conftest.TEST_RESOURCES_PUBLIC)}.values() + , key=SORTKEY), PUBLIC_RESOURCES, PUBLIC_RESOURCES)))) def test_user_resources(fxtr_group_user_roles, user, expected): @@ -80,4 +101,6 @@ def test_user_resources(fxtr_group_user_roles, user, expected): THEN: list only the resources for which the user can access """ conn, *_others = fxtr_group_user_roles - assert sorted(user_resources(conn, user), key=SORTKEY) == expected + assert sorted( + {res.resource_id: res for res in user_resources(conn, user) + }.values(), key=SORTKEY) == expected diff --git a/tests/unit/auth/test_roles.py b/tests/unit/auth/test_roles.py index 7252bfe..78ff8a6 100644 --- a/tests/unit/auth/test_roles.py +++ b/tests/unit/auth/test_roles.py @@ -5,6 +5,7 @@ import pytest from gn3.auth import db from gn3.auth.authorisation.privileges import Privilege +from gn3.auth.authorisation.errors import AuthorisationError from gn3.auth.authorisation.roles import Role, user_roles, create_role from tests.unit.auth import conftest @@ -24,11 +25,9 @@ PRIVILEGES = ( @pytest.mark.unit_test @pytest.mark.parametrize( - "user,expected", tuple(zip(conftest.TEST_USERS, ( - Role( - uuid.UUID("d32611e3-07fc-4564-b56c-786c6db6de2b"), "a_test_role", - PRIVILEGES), create_role_failure, create_role_failure, - create_role_failure, create_role_failure)))) + "user,expected", tuple(zip(conftest.TEST_USERS[0:1], ( + Role(uuid.UUID("d32611e3-07fc-4564-b56c-786c6db6de2b"), "a_test_role", + PRIVILEGES),)))) def test_create_role(# pylint: disable=[too-many-arguments] fxtr_app, auth_testdb_path, mocker, fxtr_users, user, expected):# pylint: disable=[unused-argument] """ @@ -46,6 +45,25 @@ def test_create_role(# pylint: disable=[too-many-arguments] @pytest.mark.unit_test @pytest.mark.parametrize( + "user,expected", tuple(zip(conftest.TEST_USERS[1:], ( + create_role_failure, create_role_failure, create_role_failure)))) +def test_create_role_raises_exception_for_unauthorised_users(# pylint: disable=[too-many-arguments] + fxtr_app, auth_testdb_path, mocker, fxtr_users, user, expected):# pylint: disable=[unused-argument] + """ + GIVEN: an authenticated user + WHEN: the user attempts to create a role + THEN: verify they are only able to create the role if they have the + appropriate privileges + """ + mocker.patch("gn3.auth.authorisation.roles.uuid4", uuid_fn) + with fxtr_app.app_context() as flask_context: + flask_context.g.user = user + with db.connection(auth_testdb_path) as conn, db.cursor(conn) as cursor: + with pytest.raises(AuthorisationError): + create_role(cursor, "a_test_role", PRIVILEGES) + +@pytest.mark.unit_test +@pytest.mark.parametrize( "user,expected", (zip(TEST_USERS, ((Role( |