diff options
-rw-r--r-- | gn3/auth/authorisation/__init__.py | 39 | ||||
-rw-r--r-- | gn3/auth/authorisation/groups.py | 15 | ||||
-rw-r--r-- | gn3/auth/authorisation/privileges.py | 16 | ||||
-rw-r--r-- | gn3/auth/db.py | 19 | ||||
-rw-r--r-- | tests/unit/auth/conftest.py | 18 | ||||
-rw-r--r-- | tests/unit/auth/test_groups.py | 33 |
6 files changed, 117 insertions, 23 deletions
diff --git a/gn3/auth/authorisation/__init__.py b/gn3/auth/authorisation/__init__.py index a6991d2..048f67d 100644 --- a/gn3/auth/authorisation/__init__.py +++ b/gn3/auth/authorisation/__init__.py @@ -1,12 +1,35 @@ """The authorisation module.""" -from typing import Union +from functools import wraps +from typing import Union, Callable -def authorised_p(success_message: Union[str, bool] = False, error_message: Union[str, bool] = False): +from flask import g, current_app as app + +from gn3.auth import db +from . import privileges as auth_privs + +def authorised_p( + privileges: tuple[str] = tuple(), + success_message: Union[str, bool] = False, + error_message: Union[str, bool] = False): """Authorisation decorator.""" - def __authoriser__(*args, **kwargs): - return { - "status": "error", - "message": error_message or "unauthorised" - } + assert len(privileges) > 0, "You must provide at least one privilege" + def __build_authoriser__(func: Callable): + @wraps(func) + def __authoriser__(*args, **kwargs): + if hasattr(g, "user_id") and g.user_id: + with db.connection(app.config["AUTH_DB"]) as conn: + user_privileges = auth_privs.user_privileges(conn, g.user_id) - return __authoriser__ + not_assigned = [ + priv for priv in privileges if priv not in user_privileges] + if len(not_assigned) == 0: + return { + "status": "success", + "message": success_message or "successfully authorised", + "results": func(*args, **kwargs)} + return { + "status": "error", + "message": f"Unauthorised: {error_message or ''}" + } + return __authoriser__ + return __build_authoriser__ diff --git a/gn3/auth/authorisation/groups.py b/gn3/auth/authorisation/groups.py index 8c8a87f..ad30763 100644 --- a/gn3/auth/authorisation/groups.py +++ b/gn3/auth/authorisation/groups.py @@ -1,7 +1,16 @@ """Handle the management of resource/user groups.""" +import uuid +from gn3.auth import db from . import authorised_p -@authorised_p -def create_group(group_name): - raise Exception("NOT IMPLEMENTED!") +@authorised_p( + ("create-group",), success_message="Successfully created group.", + error_message="Failed to create group.") +def create_group(conn, group_name): + with db.cursor(conn) as cursor: + group_id = uuid.uuid4() + cursor.execute( + "INSERT INTO groups(group_id, group_name) VALUES (?, ?)", + (str(group_id), group_name)) + return group_id diff --git a/gn3/auth/authorisation/privileges.py b/gn3/auth/authorisation/privileges.py new file mode 100644 index 0000000..99b36ef --- /dev/null +++ b/gn3/auth/authorisation/privileges.py @@ -0,0 +1,16 @@ +"""Handle privileges""" +from uuid import UUID + +from gn3.auth import db + +def user_privileges(conn, user_id: UUID): + """Fetch the user's privileges from the database.""" + with db.cursor(conn) as cursor: + cursor.execute( + ("SELECT p.privilege_name " + "FROM user_roles AS ur " + "INNER JOIN role_privileges AS rp ON ur.role_id=rp.role_id " + "INNER JOIN privileges AS p ON rp.privilege_id=p.privilege_id " + "WHERE ur.user_id=?"), + (str(user_id),)) + return tuple(row[0] for row in cursor.fetchall()) diff --git a/gn3/auth/db.py b/gn3/auth/db.py new file mode 100644 index 0000000..c0d0415 --- /dev/null +++ b/gn3/auth/db.py @@ -0,0 +1,19 @@ +"""Handle connection to auth database.""" +import sqlite3 +import contextlib + +@contextlib.contextmanager +def connection(db_path: str): + connection = sqlite3.connect(db_path) + try: + yield connection + finally: + connection.close() + +@contextlib.contextmanager +def cursor(connection): + cur = connection.cursor() + try: + yield cur + finally: + cur.close() diff --git a/tests/unit/auth/conftest.py b/tests/unit/auth/conftest.py index 3d887be..b35ae4a 100644 --- a/tests/unit/auth/conftest.py +++ b/tests/unit/auth/conftest.py @@ -54,6 +54,7 @@ def migrations_up_to(migration, migrations_dir): @pytest.fixture(scope="function") def test_users(conn_after_auth_migrations): query = "INSERT INTO users(user_id, email, name) VALUES (?, ?, ?)" + query_user_roles = "INSERT INTO user_roles(user_id, role_id) VALUES (?, ?)" test_users = ( ("ecb52977-3004-469e-9428-2a1856725c7f", "group@lead.er", "Group Leader"), @@ -63,7 +64,24 @@ def test_users(conn_after_auth_migrations): "Group Member 02"), ("9a0c7ce5-2f40-4e78-979e-bf3527a59579", "unaff@iliated.user", "Unaffiliated User")) + test_user_roles = ( + ("ecb52977-3004-469e-9428-2a1856725c7f", + "a0e67630-d502-4b9f-b23f-6805d0f30e30"),) with closing(conn_after_auth_migrations.cursor()) as cursor: cursor.executemany(query, test_users) + cursor.executemany(query_user_roles, test_user_roles) + conn_after_auth_migrations.commit() yield conn_after_auth_migrations + + with closing(conn_after_auth_migrations.cursor()) as cursor: + cursor.executemany( + "DELETE FROM user_roles WHERE user_id=?", + (("ecb52977-3004-469e-9428-2a1856725c7f",),)) + cursor.executemany( + "DELETE FROM users WHERE user_id=?", + (("ecb52977-3004-469e-9428-2a1856725c7f",), + ("21351b66-8aad-475b-84ac-53ce528451e3",), + ("ae9c6245-0966-41a5-9a5e-20885a96bea7",), + ("9a0c7ce5-2f40-4e78-979e-bf3527a59579",))) + conn_after_auth_migrations.commit() diff --git a/tests/unit/auth/test_groups.py b/tests/unit/auth/test_groups.py index fa7f334..18abbc6 100644 --- a/tests/unit/auth/test_groups.py +++ b/tests/unit/auth/test_groups.py @@ -1,25 +1,34 @@ """Test functions dealing with group management.""" from uuid import UUID +import flask import pytest +from gn3.auth import db from gn3.auth.authorisation.groups import create_group +create_group_failure = { + "status": "error", + "message": "Unauthorised: Failed to create group." +} + +group_leader_id = lambda : UUID("d32611e3-07fc-4564-b56c-786c6db6de2b") + @pytest.mark.unit_test @pytest.mark.parametrize( "user_id,expected", ( ("ecb52977-3004-469e-9428-2a1856725c7f", { "status": "success", - "message": "Successfully created group!", - "group_id": UUID("d32611e3-07fc-4564-b56c-786c6db6de2b") + "message": "Successfully created group.", + "results": UUID("d32611e3-07fc-4564-b56c-786c6db6de2b") }), - ("21351b66-8aad-475b-84ac-53ce528451e3", { - "status": "error", "message": "unauthorised"}), - ("ae9c6245-0966-41a5-9a5e-20885a96bea7", { - "status": "error", "message": "unauthorised"}), - ("9a0c7ce5-2f40-4e78-979e-bf3527a59579", { - "status": "error", "message": "unauthorised"}), - ("e614247d-84d2-491d-a048-f80b578216cb", { - "status": "error", "message": "unauthorised"}))) -def test_create_group(test_users, user_id, expected): - assert create_group("a_test_group") == expected + ("21351b66-8aad-475b-84ac-53ce528451e3", create_group_failure), + ("ae9c6245-0966-41a5-9a5e-20885a96bea7", create_group_failure), + ("9a0c7ce5-2f40-4e78-979e-bf3527a59579", create_group_failure), + ("e614247d-84d2-491d-a048-f80b578216cb", create_group_failure))) +def test_create_group(test_app, auth_testdb_path, mocker, test_users, user_id, expected): + mocker.patch("gn3.auth.authorisation.groups.uuid.uuid4", group_leader_id) + with test_app.test_request_context() as flask_context: + flask_context.g.user_id = UUID(user_id) + with db.connection(auth_testdb_path) as conn: + assert create_group(conn, "a_test_group") == expected |