aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--gn3/auth/authorisation/__init__.py39
-rw-r--r--gn3/auth/authorisation/groups.py15
-rw-r--r--gn3/auth/authorisation/privileges.py16
-rw-r--r--gn3/auth/db.py19
-rw-r--r--tests/unit/auth/conftest.py18
-rw-r--r--tests/unit/auth/test_groups.py33
6 files changed, 117 insertions, 23 deletions
diff --git a/gn3/auth/authorisation/__init__.py b/gn3/auth/authorisation/__init__.py
index a6991d2..048f67d 100644
--- a/gn3/auth/authorisation/__init__.py
+++ b/gn3/auth/authorisation/__init__.py
@@ -1,12 +1,35 @@
"""The authorisation module."""
-from typing import Union
+from functools import wraps
+from typing import Union, Callable
-def authorised_p(success_message: Union[str, bool] = False, error_message: Union[str, bool] = False):
+from flask import g, current_app as app
+
+from gn3.auth import db
+from . import privileges as auth_privs
+
+def authorised_p(
+ privileges: tuple[str] = tuple(),
+ success_message: Union[str, bool] = False,
+ error_message: Union[str, bool] = False):
"""Authorisation decorator."""
- def __authoriser__(*args, **kwargs):
- return {
- "status": "error",
- "message": error_message or "unauthorised"
- }
+ assert len(privileges) > 0, "You must provide at least one privilege"
+ def __build_authoriser__(func: Callable):
+ @wraps(func)
+ def __authoriser__(*args, **kwargs):
+ if hasattr(g, "user_id") and g.user_id:
+ with db.connection(app.config["AUTH_DB"]) as conn:
+ user_privileges = auth_privs.user_privileges(conn, g.user_id)
- return __authoriser__
+ not_assigned = [
+ priv for priv in privileges if priv not in user_privileges]
+ if len(not_assigned) == 0:
+ return {
+ "status": "success",
+ "message": success_message or "successfully authorised",
+ "results": func(*args, **kwargs)}
+ return {
+ "status": "error",
+ "message": f"Unauthorised: {error_message or ''}"
+ }
+ return __authoriser__
+ return __build_authoriser__
diff --git a/gn3/auth/authorisation/groups.py b/gn3/auth/authorisation/groups.py
index 8c8a87f..ad30763 100644
--- a/gn3/auth/authorisation/groups.py
+++ b/gn3/auth/authorisation/groups.py
@@ -1,7 +1,16 @@
"""Handle the management of resource/user groups."""
+import uuid
+from gn3.auth import db
from . import authorised_p
-@authorised_p
-def create_group(group_name):
- raise Exception("NOT IMPLEMENTED!")
+@authorised_p(
+ ("create-group",), success_message="Successfully created group.",
+ error_message="Failed to create group.")
+def create_group(conn, group_name):
+ with db.cursor(conn) as cursor:
+ group_id = uuid.uuid4()
+ cursor.execute(
+ "INSERT INTO groups(group_id, group_name) VALUES (?, ?)",
+ (str(group_id), group_name))
+ return group_id
diff --git a/gn3/auth/authorisation/privileges.py b/gn3/auth/authorisation/privileges.py
new file mode 100644
index 0000000..99b36ef
--- /dev/null
+++ b/gn3/auth/authorisation/privileges.py
@@ -0,0 +1,16 @@
+"""Handle privileges"""
+from uuid import UUID
+
+from gn3.auth import db
+
+def user_privileges(conn, user_id: UUID):
+ """Fetch the user's privileges from the database."""
+ with db.cursor(conn) as cursor:
+ cursor.execute(
+ ("SELECT p.privilege_name "
+ "FROM user_roles AS ur "
+ "INNER JOIN role_privileges AS rp ON ur.role_id=rp.role_id "
+ "INNER JOIN privileges AS p ON rp.privilege_id=p.privilege_id "
+ "WHERE ur.user_id=?"),
+ (str(user_id),))
+ return tuple(row[0] for row in cursor.fetchall())
diff --git a/gn3/auth/db.py b/gn3/auth/db.py
new file mode 100644
index 0000000..c0d0415
--- /dev/null
+++ b/gn3/auth/db.py
@@ -0,0 +1,19 @@
+"""Handle connection to auth database."""
+import sqlite3
+import contextlib
+
+@contextlib.contextmanager
+def connection(db_path: str):
+ connection = sqlite3.connect(db_path)
+ try:
+ yield connection
+ finally:
+ connection.close()
+
+@contextlib.contextmanager
+def cursor(connection):
+ cur = connection.cursor()
+ try:
+ yield cur
+ finally:
+ cur.close()
diff --git a/tests/unit/auth/conftest.py b/tests/unit/auth/conftest.py
index 3d887be..b35ae4a 100644
--- a/tests/unit/auth/conftest.py
+++ b/tests/unit/auth/conftest.py
@@ -54,6 +54,7 @@ def migrations_up_to(migration, migrations_dir):
@pytest.fixture(scope="function")
def test_users(conn_after_auth_migrations):
query = "INSERT INTO users(user_id, email, name) VALUES (?, ?, ?)"
+ query_user_roles = "INSERT INTO user_roles(user_id, role_id) VALUES (?, ?)"
test_users = (
("ecb52977-3004-469e-9428-2a1856725c7f", "group@lead.er",
"Group Leader"),
@@ -63,7 +64,24 @@ def test_users(conn_after_auth_migrations):
"Group Member 02"),
("9a0c7ce5-2f40-4e78-979e-bf3527a59579", "unaff@iliated.user",
"Unaffiliated User"))
+ test_user_roles = (
+ ("ecb52977-3004-469e-9428-2a1856725c7f",
+ "a0e67630-d502-4b9f-b23f-6805d0f30e30"),)
with closing(conn_after_auth_migrations.cursor()) as cursor:
cursor.executemany(query, test_users)
+ cursor.executemany(query_user_roles, test_user_roles)
+ conn_after_auth_migrations.commit()
yield conn_after_auth_migrations
+
+ with closing(conn_after_auth_migrations.cursor()) as cursor:
+ cursor.executemany(
+ "DELETE FROM user_roles WHERE user_id=?",
+ (("ecb52977-3004-469e-9428-2a1856725c7f",),))
+ cursor.executemany(
+ "DELETE FROM users WHERE user_id=?",
+ (("ecb52977-3004-469e-9428-2a1856725c7f",),
+ ("21351b66-8aad-475b-84ac-53ce528451e3",),
+ ("ae9c6245-0966-41a5-9a5e-20885a96bea7",),
+ ("9a0c7ce5-2f40-4e78-979e-bf3527a59579",)))
+ conn_after_auth_migrations.commit()
diff --git a/tests/unit/auth/test_groups.py b/tests/unit/auth/test_groups.py
index fa7f334..18abbc6 100644
--- a/tests/unit/auth/test_groups.py
+++ b/tests/unit/auth/test_groups.py
@@ -1,25 +1,34 @@
"""Test functions dealing with group management."""
from uuid import UUID
+import flask
import pytest
+from gn3.auth import db
from gn3.auth.authorisation.groups import create_group
+create_group_failure = {
+ "status": "error",
+ "message": "Unauthorised: Failed to create group."
+}
+
+group_leader_id = lambda : UUID("d32611e3-07fc-4564-b56c-786c6db6de2b")
+
@pytest.mark.unit_test
@pytest.mark.parametrize(
"user_id,expected", (
("ecb52977-3004-469e-9428-2a1856725c7f", {
"status": "success",
- "message": "Successfully created group!",
- "group_id": UUID("d32611e3-07fc-4564-b56c-786c6db6de2b")
+ "message": "Successfully created group.",
+ "results": UUID("d32611e3-07fc-4564-b56c-786c6db6de2b")
}),
- ("21351b66-8aad-475b-84ac-53ce528451e3", {
- "status": "error", "message": "unauthorised"}),
- ("ae9c6245-0966-41a5-9a5e-20885a96bea7", {
- "status": "error", "message": "unauthorised"}),
- ("9a0c7ce5-2f40-4e78-979e-bf3527a59579", {
- "status": "error", "message": "unauthorised"}),
- ("e614247d-84d2-491d-a048-f80b578216cb", {
- "status": "error", "message": "unauthorised"})))
-def test_create_group(test_users, user_id, expected):
- assert create_group("a_test_group") == expected
+ ("21351b66-8aad-475b-84ac-53ce528451e3", create_group_failure),
+ ("ae9c6245-0966-41a5-9a5e-20885a96bea7", create_group_failure),
+ ("9a0c7ce5-2f40-4e78-979e-bf3527a59579", create_group_failure),
+ ("e614247d-84d2-491d-a048-f80b578216cb", create_group_failure)))
+def test_create_group(test_app, auth_testdb_path, mocker, test_users, user_id, expected):
+ mocker.patch("gn3.auth.authorisation.groups.uuid.uuid4", group_leader_id)
+ with test_app.test_request_context() as flask_context:
+ flask_context.g.user_id = UUID(user_id)
+ with db.connection(auth_testdb_path) as conn:
+ assert create_group(conn, "a_test_group") == expected