about summary refs log tree commit diff
diff options
context:
space:
mode:
authorFrederick Muriuki Muriithi2022-11-14 18:04:08 +0300
committerFrederick Muriuki Muriithi2022-11-14 18:04:08 +0300
commit6b964b95a67bac69a1217b3f9c39c58a19881df4 (patch)
treef83ac08fa06c7535c1fcb3926876029e2ec17a52
parentbaf7980cd6080c8cb4e6c6b585948144273600aa (diff)
downloadgenenetwork3-6b964b95a67bac69a1217b3f9c39c58a19881df4.tar.gz
auth: Implement `create_group`
-rw-r--r--gn3/auth/authorisation/__init__.py39
-rw-r--r--gn3/auth/authorisation/groups.py15
-rw-r--r--gn3/auth/authorisation/privileges.py16
-rw-r--r--gn3/auth/db.py19
-rw-r--r--tests/unit/auth/conftest.py18
-rw-r--r--tests/unit/auth/test_groups.py33
6 files changed, 117 insertions, 23 deletions
diff --git a/gn3/auth/authorisation/__init__.py b/gn3/auth/authorisation/__init__.py
index a6991d2..048f67d 100644
--- a/gn3/auth/authorisation/__init__.py
+++ b/gn3/auth/authorisation/__init__.py
@@ -1,12 +1,35 @@
 """The authorisation module."""
-from typing import Union
+from functools import wraps
+from typing import Union, Callable
 
-def authorised_p(success_message: Union[str, bool] = False, error_message: Union[str, bool] = False):
+from flask import g, current_app as app
+
+from gn3.auth import db
+from . import privileges as auth_privs
+
+def authorised_p(
+        privileges: tuple[str] = tuple(),
+        success_message: Union[str, bool] = False,
+        error_message: Union[str, bool] = False):
     """Authorisation decorator."""
-    def __authoriser__(*args, **kwargs):
-        return {
-            "status": "error",
-            "message": error_message or "unauthorised"
-        }
+    assert len(privileges) > 0, "You must provide at least one privilege"
+    def __build_authoriser__(func: Callable):
+        @wraps(func)
+        def __authoriser__(*args, **kwargs):
+            if hasattr(g, "user_id") and g.user_id:
+                with db.connection(app.config["AUTH_DB"]) as conn:
+                    user_privileges = auth_privs.user_privileges(conn, g.user_id)
 
-    return __authoriser__
+                not_assigned = [
+                    priv for priv in privileges if priv not in user_privileges]
+                if len(not_assigned) == 0:
+                    return {
+                        "status": "success",
+                        "message": success_message or "successfully authorised",
+                        "results": func(*args, **kwargs)}
+            return {
+                "status": "error",
+                "message": f"Unauthorised: {error_message or ''}"
+            }
+        return __authoriser__
+    return __build_authoriser__
diff --git a/gn3/auth/authorisation/groups.py b/gn3/auth/authorisation/groups.py
index 8c8a87f..ad30763 100644
--- a/gn3/auth/authorisation/groups.py
+++ b/gn3/auth/authorisation/groups.py
@@ -1,7 +1,16 @@
 """Handle the management of resource/user groups."""
+import uuid
 
+from gn3.auth import db
 from . import authorised_p
 
-@authorised_p
-def create_group(group_name):
-    raise Exception("NOT IMPLEMENTED!")
+@authorised_p(
+    ("create-group",), success_message="Successfully created group.",
+    error_message="Failed to create group.")
+def create_group(conn, group_name):
+    with db.cursor(conn) as cursor:
+        group_id = uuid.uuid4()
+        cursor.execute(
+            "INSERT INTO groups(group_id, group_name) VALUES (?, ?)",
+            (str(group_id), group_name))
+        return group_id
diff --git a/gn3/auth/authorisation/privileges.py b/gn3/auth/authorisation/privileges.py
new file mode 100644
index 0000000..99b36ef
--- /dev/null
+++ b/gn3/auth/authorisation/privileges.py
@@ -0,0 +1,16 @@
+"""Handle privileges"""
+from uuid import UUID
+
+from gn3.auth import db
+
+def user_privileges(conn, user_id: UUID):
+    """Fetch the user's privileges from the database."""
+    with db.cursor(conn) as cursor:
+        cursor.execute(
+            ("SELECT p.privilege_name "
+             "FROM user_roles AS ur "
+             "INNER JOIN role_privileges AS rp ON ur.role_id=rp.role_id "
+             "INNER JOIN privileges AS p ON rp.privilege_id=p.privilege_id "
+             "WHERE ur.user_id=?"),
+            (str(user_id),))
+        return tuple(row[0] for row in cursor.fetchall())
diff --git a/gn3/auth/db.py b/gn3/auth/db.py
new file mode 100644
index 0000000..c0d0415
--- /dev/null
+++ b/gn3/auth/db.py
@@ -0,0 +1,19 @@
+"""Handle connection to auth database."""
+import sqlite3
+import contextlib
+
+@contextlib.contextmanager
+def connection(db_path: str):
+    connection = sqlite3.connect(db_path)
+    try:
+        yield connection
+    finally:
+        connection.close()
+
+@contextlib.contextmanager
+def cursor(connection):
+    cur = connection.cursor()
+    try:
+        yield cur
+    finally:
+        cur.close()
diff --git a/tests/unit/auth/conftest.py b/tests/unit/auth/conftest.py
index 3d887be..b35ae4a 100644
--- a/tests/unit/auth/conftest.py
+++ b/tests/unit/auth/conftest.py
@@ -54,6 +54,7 @@ def migrations_up_to(migration, migrations_dir):
 @pytest.fixture(scope="function")
 def test_users(conn_after_auth_migrations):
     query = "INSERT INTO users(user_id, email, name) VALUES (?, ?, ?)"
+    query_user_roles = "INSERT INTO user_roles(user_id, role_id) VALUES (?, ?)"
     test_users = (
         ("ecb52977-3004-469e-9428-2a1856725c7f", "group@lead.er",
          "Group Leader"),
@@ -63,7 +64,24 @@ def test_users(conn_after_auth_migrations):
          "Group Member 02"),
         ("9a0c7ce5-2f40-4e78-979e-bf3527a59579", "unaff@iliated.user",
          "Unaffiliated User"))
+    test_user_roles = (
+        ("ecb52977-3004-469e-9428-2a1856725c7f",
+         "a0e67630-d502-4b9f-b23f-6805d0f30e30"),)
     with closing(conn_after_auth_migrations.cursor()) as cursor:
         cursor.executemany(query, test_users)
+        cursor.executemany(query_user_roles, test_user_roles)
+        conn_after_auth_migrations.commit()
 
     yield conn_after_auth_migrations
+
+    with closing(conn_after_auth_migrations.cursor()) as cursor:
+        cursor.executemany(
+            "DELETE FROM user_roles WHERE user_id=?",
+            (("ecb52977-3004-469e-9428-2a1856725c7f",),))
+        cursor.executemany(
+            "DELETE FROM users WHERE user_id=?",
+            (("ecb52977-3004-469e-9428-2a1856725c7f",),
+             ("21351b66-8aad-475b-84ac-53ce528451e3",),
+             ("ae9c6245-0966-41a5-9a5e-20885a96bea7",),
+             ("9a0c7ce5-2f40-4e78-979e-bf3527a59579",)))
+        conn_after_auth_migrations.commit()
diff --git a/tests/unit/auth/test_groups.py b/tests/unit/auth/test_groups.py
index fa7f334..18abbc6 100644
--- a/tests/unit/auth/test_groups.py
+++ b/tests/unit/auth/test_groups.py
@@ -1,25 +1,34 @@
 """Test functions dealing with group management."""
 from uuid import UUID
 
+import flask
 import pytest
 
+from gn3.auth import db
 from gn3.auth.authorisation.groups import create_group
 
+create_group_failure = {
+    "status": "error",
+    "message": "Unauthorised: Failed to create group."
+}
+
+group_leader_id = lambda : UUID("d32611e3-07fc-4564-b56c-786c6db6de2b")
+
 @pytest.mark.unit_test
 @pytest.mark.parametrize(
     "user_id,expected", (
     ("ecb52977-3004-469e-9428-2a1856725c7f", {
         "status": "success",
-        "message": "Successfully created group!",
-        "group_id": UUID("d32611e3-07fc-4564-b56c-786c6db6de2b")
+        "message": "Successfully created group.",
+        "results": UUID("d32611e3-07fc-4564-b56c-786c6db6de2b")
     }),
-    ("21351b66-8aad-475b-84ac-53ce528451e3", {
-        "status": "error", "message": "unauthorised"}),
-    ("ae9c6245-0966-41a5-9a5e-20885a96bea7", {
-        "status": "error", "message": "unauthorised"}),
-    ("9a0c7ce5-2f40-4e78-979e-bf3527a59579", {
-        "status": "error", "message": "unauthorised"}),
-    ("e614247d-84d2-491d-a048-f80b578216cb", {
-        "status": "error", "message": "unauthorised"})))
-def test_create_group(test_users, user_id, expected):
-    assert create_group("a_test_group") == expected
+    ("21351b66-8aad-475b-84ac-53ce528451e3", create_group_failure),
+    ("ae9c6245-0966-41a5-9a5e-20885a96bea7", create_group_failure),
+    ("9a0c7ce5-2f40-4e78-979e-bf3527a59579", create_group_failure),
+    ("e614247d-84d2-491d-a048-f80b578216cb", create_group_failure)))
+def test_create_group(test_app, auth_testdb_path, mocker, test_users, user_id, expected):
+    mocker.patch("gn3.auth.authorisation.groups.uuid.uuid4", group_leader_id)
+    with test_app.test_request_context() as flask_context:
+        flask_context.g.user_id = UUID(user_id)
+        with db.connection(auth_testdb_path) as conn:
+            assert create_group(conn, "a_test_group") == expected