aboutsummaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/__init__.py0
-rw-r--r--tests/unit/__init__.py0
-rw-r--r--tests/unit/auth/__init__.py0
-rw-r--r--tests/unit/auth/conftest.py24
-rw-r--r--tests/unit/auth/fixtures/__init__.py8
-rw-r--r--tests/unit/auth/fixtures/group_fixtures.py147
-rw-r--r--tests/unit/auth/fixtures/migration_fixtures.py51
-rw-r--r--tests/unit/auth/fixtures/oauth2_client_fixtures.py51
-rw-r--r--tests/unit/auth/fixtures/resource_fixtures.py25
-rw-r--r--tests/unit/auth/fixtures/role_fixtures.py45
-rw-r--r--tests/unit/auth/fixtures/user_fixtures.py66
-rw-r--r--tests/unit/auth/test_credentials.py100
-rw-r--r--tests/unit/auth/test_groups.py168
-rw-r--r--tests/unit/auth/test_migrations_add_data_to_table.py79
-rw-r--r--tests/unit/auth/test_migrations_add_remove_columns.py116
-rw-r--r--tests/unit/auth/test_migrations_create_tables.py91
-rw-r--r--tests/unit/auth/test_migrations_drop_tables.py63
-rw-r--r--tests/unit/auth/test_migrations_indexes.py97
-rw-r--r--tests/unit/auth/test_migrations_init_data_in_resource_categories_table.py60
-rw-r--r--tests/unit/auth/test_migrations_insert_data_into_empty_table.py77
-rw-r--r--tests/unit/auth/test_privileges.py46
-rw-r--r--tests/unit/auth/test_resources.py117
-rw-r--r--tests/unit/auth/test_roles.py123
-rw-r--r--tests/unit/auth/test_token.py62
-rw-r--r--tests/unit/conftest.py35
25 files changed, 1651 insertions, 0 deletions
diff --git a/tests/__init__.py b/tests/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/__init__.py
diff --git a/tests/unit/__init__.py b/tests/unit/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/unit/__init__.py
diff --git a/tests/unit/auth/__init__.py b/tests/unit/auth/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/unit/auth/__init__.py
diff --git a/tests/unit/auth/conftest.py b/tests/unit/auth/conftest.py
new file mode 100644
index 0000000..a7c64a8
--- /dev/null
+++ b/tests/unit/auth/conftest.py
@@ -0,0 +1,24 @@
+"""Module for fixtures and test utilities"""
+import uuid
+import datetime
+from contextlib import contextmanager
+
+from gn3.auth.authentication.oauth2.models.oauth2token import OAuth2Token
+
+from .fixtures import * # pylint: disable=[wildcard-import,unused-wildcard-import]
+
+def get_tokeniser(user):
+ """Get contextmanager for mocking token acquisition."""
+ @contextmanager
+ def __token__(*args, **kwargs):# pylint: disable=[unused-argument]
+ yield {
+ usr.user_id: OAuth2Token(
+ token_id=uuid.UUID("d32611e3-07fc-4564-b56c-786c6db6de2b"),
+ client=None, token_type="Bearer", access_token="123456ABCDE",
+ refresh_token=None, revoked=False, expires_in=864000,
+ user=usr, issued_at=int(datetime.datetime.now().timestamp()),
+ scope="profile group role resource register-client")
+ for usr in TEST_USERS
+ }[user.user_id]
+
+ return __token__
diff --git a/tests/unit/auth/fixtures/__init__.py b/tests/unit/auth/fixtures/__init__.py
new file mode 100644
index 0000000..a675fc7
--- /dev/null
+++ b/tests/unit/auth/fixtures/__init__.py
@@ -0,0 +1,8 @@
+"""pytest's conftest as a module."""
+from .role_fixtures import *
+from .user_fixtures import *
+from .group_fixtures import *
+from .resource_fixtures import *
+# from .privilege_fixtures import *
+from .migration_fixtures import *
+from .oauth2_client_fixtures import *
diff --git a/tests/unit/auth/fixtures/group_fixtures.py b/tests/unit/auth/fixtures/group_fixtures.py
new file mode 100644
index 0000000..d7bbc56
--- /dev/null
+++ b/tests/unit/auth/fixtures/group_fixtures.py
@@ -0,0 +1,147 @@
+"""Fixtures and utilities for group-related tests"""
+import uuid
+
+import pytest
+
+from gn3.auth import db
+from gn3.auth.authorisation.groups import Group, GroupRole
+from gn3.auth.authorisation.resources import Resource, ResourceCategory
+
+from .role_fixtures import RESOURCE_EDITOR_ROLE, RESOURCE_READER_ROLE
+
+TEST_GROUP_01 = Group(uuid.UUID("9988c21d-f02f-4d45-8966-22c968ac2fbf"),
+ "TheTestGroup", {})
+TEST_GROUP_02 = Group(uuid.UUID("e37d59d7-c05e-4d67-b479-81e627d8d634"),
+ "AnotherTestGroup", {})
+TEST_GROUPS = (TEST_GROUP_01, TEST_GROUP_02)
+
+TEST_RESOURCES_GROUP_01 = (
+ Resource(TEST_GROUPS[0], uuid.UUID("26ad1668-29f5-439d-b905-84d551f85955"),
+ "ResourceG01R01",
+ ResourceCategory(uuid.UUID("48056f84-a2a6-41ac-8319-0e1e212cba2a"),
+ "genotype", "Genotype Dataset"),
+ True),
+ Resource(TEST_GROUPS[0], uuid.UUID("2130aec0-fefd-434d-92fd-9ca342348b2d"),
+ "ResourceG01R02",
+ ResourceCategory(uuid.UUID("548d684b-d4d1-46fb-a6d3-51a56b7da1b3"),
+ "phenotype", "Phenotype (Publish) Dataset"),
+ False),
+ Resource(TEST_GROUPS[0], uuid.UUID("e9a1184a-e8b4-49fb-b713-8d9cbeea5b83"),
+ "ResourceG01R03",
+ ResourceCategory(uuid.UUID("fad071a3-2fc8-40b8-992b-cdefe7dcac79"),
+ "mrna", "mRNA Dataset"),
+ False))
+
+TEST_RESOURCES_GROUP_02 = (
+ Resource(TEST_GROUPS[1], uuid.UUID("14496a1c-c234-49a2-978c-8859ea274054"),
+ "ResourceG02R01",
+ ResourceCategory(uuid.UUID("48056f84-a2a6-41ac-8319-0e1e212cba2a"),
+ "genotype", "Genotype Dataset"),
+ False),
+ Resource(TEST_GROUPS[1], uuid.UUID("04ad9e09-94ea-4390-8a02-11f92999806b"),
+ "ResourceG02R02",
+ ResourceCategory(uuid.UUID("fad071a3-2fc8-40b8-992b-cdefe7dcac79"),
+ "mrna", "mRNA Dataset"),
+ True))
+
+TEST_RESOURCES = TEST_RESOURCES_GROUP_01 + TEST_RESOURCES_GROUP_02
+TEST_RESOURCES_PUBLIC = (TEST_RESOURCES_GROUP_01[0], TEST_RESOURCES_GROUP_02[1])
+
+def __gtuple__(cursor):
+ return tuple(dict(row) for row in cursor.fetchall())
+
+@pytest.fixture(scope="function")
+def fxtr_group(conn_after_auth_migrations):# pylint: disable=[redefined-outer-name]
+ """Fixture: setup a test group."""
+ query = "INSERT INTO groups(group_id, group_name) VALUES (?, ?)"
+ with db.cursor(conn_after_auth_migrations) as cursor:
+ cursor.executemany(
+ query, tuple(
+ (str(group.group_id), group.group_name)
+ for group in TEST_GROUPS))
+
+ yield (conn_after_auth_migrations, TEST_GROUPS[0])
+
+ with db.cursor(conn_after_auth_migrations) as cursor:
+ cursor.executemany(
+ "DELETE FROM groups WHERE group_id=?",
+ ((str(group.group_id),) for group in TEST_GROUPS))
+
+@pytest.fixture(scope="function")
+def fxtr_users_in_group(fxtr_group, fxtr_users):# pylint: disable=[redefined-outer-name, unused-argument]
+ """Link the users to the groups."""
+ conn, all_users = fxtr_users
+ users = tuple(
+ user for user in all_users if user.email not in ("unaff@iliated.user",))
+ query_params = tuple(
+ (str(TEST_GROUP_01.group_id), str(user.user_id)) for user in users)
+ with db.cursor(conn) as cursor:
+ cursor.executemany(
+ "INSERT INTO group_users(group_id, user_id) VALUES (?, ?)",
+ query_params)
+
+ yield (conn, TEST_GROUP_01, users)
+
+ with db.cursor(conn) as cursor:
+ cursor.executemany(
+ "DELETE FROM group_users WHERE group_id=? AND user_id=?",
+ query_params)
+
+@pytest.fixture(scope="function")
+def fxtr_group_roles(fxtr_group, fxtr_roles):# pylint: disable=[redefined-outer-name,unused-argument]
+ """Link roles to group"""
+ group_roles = (
+ GroupRole(uuid.UUID("9c25efb2-b477-4918-a95c-9914770cbf4d"),
+ TEST_GROUP_01, RESOURCE_EDITOR_ROLE),
+ GroupRole(uuid.UUID("82aed039-fe2f-408c-ab1e-81cd1ba96630"),
+ TEST_GROUP_02, RESOURCE_READER_ROLE))
+ conn, groups = fxtr_group
+ with db.cursor(conn) as cursor:
+ cursor.executemany(
+ "INSERT INTO group_roles VALUES (?, ?, ?)",
+ ((str(role.group_role_id), str(role.group.group_id),
+ str(role.role.role_id))
+ for role in group_roles))
+
+ yield conn, groups, group_roles
+
+ with db.cursor(conn) as cursor:
+ cursor.execute("SELECT * FROM group_user_roles_on_resources")
+ cursor.executemany(
+ ("DELETE FROM group_roles "
+ "WHERE group_role_id=? AND group_id=? AND role_id=?"),
+ ((str(role.group_role_id), str(role.group.group_id),
+ str(role.role.role_id))
+ for role in group_roles))
+
+@pytest.fixture(scope="function")
+def fxtr_group_user_roles(fxtr_resources, fxtr_group_roles, fxtr_users_in_group):#pylint: disable=[redefined-outer-name,unused-argument]
+ """Assign roles to users."""
+ conn, _groups, group_roles = fxtr_group_roles
+ _conn, group_resources = fxtr_resources
+ _conn, _group, group_users = fxtr_users_in_group
+ users = tuple(user for user in group_users if user.email
+ not in ("unaff@iliated.user", "group@lead.er"))
+ users_roles_resources = (
+ (user, RESOURCE_EDITOR_ROLE, TEST_RESOURCES_GROUP_01[1])
+ for user in users if user.email == "group@mem.ber01")
+ with db.cursor(conn) as cursor:
+ params = tuple({
+ "group_id": str(resource.group.group_id),
+ "user_id": str(user.user_id),
+ "role_id": str(role.role_id),
+ "resource_id": str(resource.resource_id)
+ } for user, role, resource in users_roles_resources)
+ cursor.executemany(
+ ("INSERT INTO group_user_roles_on_resources "
+ "VALUES (:group_id, :user_id, :role_id, :resource_id)"),
+ params)
+
+ yield conn, group_users, group_roles, group_resources
+
+ with db.cursor(conn) as cursor:
+ cursor.executemany(
+ ("DELETE FROM group_user_roles_on_resources WHERE "
+ "group_id=:group_id AND user_id=:user_id AND role_id=:role_id AND "
+ "resource_id=:resource_id"),
+ params)
diff --git a/tests/unit/auth/fixtures/migration_fixtures.py b/tests/unit/auth/fixtures/migration_fixtures.py
new file mode 100644
index 0000000..eb42c2b
--- /dev/null
+++ b/tests/unit/auth/fixtures/migration_fixtures.py
@@ -0,0 +1,51 @@
+"""Fixtures and utilities for migration-related tests"""
+import pytest
+from yoyo.backends import DatabaseBackend
+from yoyo import get_backend, read_migrations
+from yoyo.migrations import Migration, MigrationList
+
+from gn3.auth import db
+from gn3.migrations import apply_migrations, rollback_migrations
+
+@pytest.fixture(scope="session")
+def auth_testdb_path(fxtr_app_config): # pylint: disable=redefined-outer-name
+ """Get the test application's auth database file"""
+ return fxtr_app_config["AUTH_DB"]
+
+@pytest.fixture(scope="session")
+def auth_migrations_dir(fxtr_app_config): # pylint: disable=redefined-outer-name
+ """Get the test application's auth database file"""
+ return fxtr_app_config["AUTH_MIGRATIONS"]
+
+def apply_single_migration(backend: DatabaseBackend, migration: Migration):# pylint: disable=[redefined-outer-name]
+ """Utility to apply a single migration"""
+ apply_migrations(backend, MigrationList([migration]))
+
+def rollback_single_migration(backend: DatabaseBackend, migration: Migration):# pylint: disable=[redefined-outer-name]
+ """Utility to rollback a single migration"""
+ rollback_migrations(backend, MigrationList([migration]))
+
+@pytest.fixture(scope="session")
+def backend(auth_testdb_path):# pylint: disable=redefined-outer-name
+ """Fixture: retrieve yoyo backend for auth database"""
+ return get_backend(f"sqlite:///{auth_testdb_path}")
+
+@pytest.fixture(scope="session")
+def all_migrations(auth_migrations_dir): # pylint: disable=redefined-outer-name
+ """Retrieve all the migrations"""
+ return read_migrations(auth_migrations_dir)
+
+@pytest.fixture(scope="function")
+def conn_after_auth_migrations(backend, auth_testdb_path, all_migrations): # pylint: disable=redefined-outer-name
+ """Run all migrations and return a connection to the database after"""
+ apply_migrations(backend, all_migrations)
+ with db.connection(auth_testdb_path) as conn:
+ yield conn
+
+ rollback_migrations(backend, all_migrations)
+
+def migrations_up_to(migration, migrations_dir):
+ """Run all the migration before `migration`."""
+ migrations = read_migrations(migrations_dir)
+ index = [mig.path for mig in migrations].index(migration)
+ return MigrationList(migrations[0:index])
diff --git a/tests/unit/auth/fixtures/oauth2_client_fixtures.py b/tests/unit/auth/fixtures/oauth2_client_fixtures.py
new file mode 100644
index 0000000..654d048
--- /dev/null
+++ b/tests/unit/auth/fixtures/oauth2_client_fixtures.py
@@ -0,0 +1,51 @@
+"""Fixtures for OAuth2 clients"""
+import uuid
+import json
+import datetime
+
+import pytest
+
+from gn3.auth import db
+from gn3.auth.authentication.users import hash_password
+from gn3.auth.authentication.oauth2.models.oauth2client import OAuth2Client
+
+@pytest.fixture(autouse=True)
+def fxtr_patch_envvars(monkeypatch):
+ """Fixture: patch environment variable"""
+ monkeypatch.setenv("AUTHLIB_INSECURE_TRANSPORT", "true")
+
+@pytest.fixture
+def fxtr_oauth2_clients(fxtr_users_with_passwords):
+ """Fixture: Create the OAuth2 clients for use with tests."""
+ conn, users = fxtr_users_with_passwords
+ now = datetime.datetime.now()
+
+ clients = tuple(
+ OAuth2Client(str(uuid.uuid4()), f"yabadabadoo_{idx:03}", now,
+ now + datetime.timedelta(hours = 2),
+ {
+ "client_name": f"test_client_{idx:03}",
+ "scope": ["profile", "group", "role", "resource", "register-client"],
+ "redirect_uri": "/test_oauth2",
+ "token_endpoint_auth_method": [
+ "client_secret_post", "client_secret_basic"],
+ "grant_types": ["password", "authorisation_code", "refresh_token"],
+ "response_type": "token"
+ }, user)
+ for idx, user in enumerate(users, start=1))
+
+ with db.cursor(conn) as cursor:
+ cursor.executemany(
+ "INSERT INTO oauth2_clients VALUES (?, ?, ?, ?, ?, ?)",
+ ((str(client.client_id), hash_password(client.client_secret),
+ int(client.client_id_issued_at.timestamp()),
+ int(client.client_secret_expires_at.timestamp()),
+ json.dumps(client.client_metadata), str(client.user.user_id))
+ for client in clients))
+
+ yield conn, clients
+
+ with db.cursor(conn) as cursor:
+ cursor.executemany(
+ "DELETE FROM oauth2_clients WHERE client_id=?",
+ ((str(client.client_id),) for client in clients))
diff --git a/tests/unit/auth/fixtures/resource_fixtures.py b/tests/unit/auth/fixtures/resource_fixtures.py
new file mode 100644
index 0000000..117b4f4
--- /dev/null
+++ b/tests/unit/auth/fixtures/resource_fixtures.py
@@ -0,0 +1,25 @@
+"""Fixtures and utilities for resource-related tests"""
+import pytest
+
+from gn3.auth import db
+
+from .group_fixtures import TEST_RESOURCES
+
+@pytest.fixture(scope="function")
+def fxtr_resources(fxtr_group):# pylint: disable=[redefined-outer-name]
+ """fixture: setup test resources in the database"""
+ conn, _group = fxtr_group
+ with db.cursor(conn) as cursor:
+ cursor.executemany(
+ "INSERT INTO resources VALUES (?,?,?,?,?)",
+ ((str(res.group.group_id), str(res.resource_id), res.resource_name,
+ str(res.resource_category.resource_category_id),
+ 1 if res.public else 0) for res in TEST_RESOURCES))
+
+ yield (conn, TEST_RESOURCES)
+
+ with db.cursor(conn) as cursor:
+ cursor.executemany(
+ "DELETE FROM resources WHERE group_id=? AND resource_id=?",
+ ((str(res.group.group_id), str(res.resource_id),)
+ for res in TEST_RESOURCES))
diff --git a/tests/unit/auth/fixtures/role_fixtures.py b/tests/unit/auth/fixtures/role_fixtures.py
new file mode 100644
index 0000000..ee86aa2
--- /dev/null
+++ b/tests/unit/auth/fixtures/role_fixtures.py
@@ -0,0 +1,45 @@
+"""Fixtures and utilities for role-related tests"""
+import uuid
+
+import pytest
+
+from gn3.auth import db
+from gn3.auth.authorisation.roles import Role
+from gn3.auth.authorisation.privileges import Privilege
+
+RESOURCE_READER_ROLE = Role(
+ uuid.UUID("c3ca2507-ee24-4835-9b31-8c21e1c072d3"), "resource_reader", True,
+ (Privilege("group:resource:view-resource",
+ "view a resource and use it in computations"),))
+
+RESOURCE_EDITOR_ROLE = Role(
+ uuid.UUID("89819f84-6346-488b-8955-86062e9eedb7"), "resource_editor", True,
+ (
+ Privilege("group:resource:view-resource",
+ "view a resource and use it in computations"),
+ Privilege("group:resource:edit-resource", "edit/update a resource")))
+
+TEST_ROLES = (RESOURCE_READER_ROLE, RESOURCE_EDITOR_ROLE)
+
+@pytest.fixture(scope="function")
+def fxtr_roles(conn_after_auth_migrations):
+ """Setup some example roles."""
+ with db.cursor(conn_after_auth_migrations) as cursor:
+ cursor.executemany(
+ ("INSERT INTO roles VALUES (?, ?, ?)"),
+ ((str(role.role_id), role.role_name, 1) for role in TEST_ROLES))
+ cursor.executemany(
+ ("INSERT INTO role_privileges VALUES (?, ?)"),
+ ((str(role.role_id), str(privilege.privilege_id))
+ for role in TEST_ROLES for privilege in role.privileges))
+
+ yield conn_after_auth_migrations, TEST_ROLES
+
+ with db.cursor(conn_after_auth_migrations) as cursor:
+ cursor.executemany(
+ ("DELETE FROM role_privileges WHERE role_id=? AND privilege_id=?"),
+ ((str(role.role_id), str(privilege.privilege_id))
+ for role in TEST_ROLES for privilege in role.privileges))
+ cursor.executemany(
+ ("DELETE FROM roles WHERE role_id=?"),
+ ((str(role.role_id),) for role in TEST_ROLES))
diff --git a/tests/unit/auth/fixtures/user_fixtures.py b/tests/unit/auth/fixtures/user_fixtures.py
new file mode 100644
index 0000000..d248f54
--- /dev/null
+++ b/tests/unit/auth/fixtures/user_fixtures.py
@@ -0,0 +1,66 @@
+"""Fixtures and utilities for user-related tests"""
+import uuid
+
+import pytest
+
+from gn3.auth import db
+from gn3.auth.authentication.users import User, hash_password
+
+TEST_USERS = (
+ User(uuid.UUID("ecb52977-3004-469e-9428-2a1856725c7f"), "group@lead.er",
+ "Group Leader"),
+ User(uuid.UUID("21351b66-8aad-475b-84ac-53ce528451e3"),
+ "group@mem.ber01", "Group Member 01"),
+ User(uuid.UUID("ae9c6245-0966-41a5-9a5e-20885a96bea7"),
+ "group@mem.ber02", "Group Member 02"),
+ User(uuid.UUID("9a0c7ce5-2f40-4e78-979e-bf3527a59579"),
+ "unaff@iliated.user", "Unaffiliated User"))
+
+@pytest.fixture(scope="function")
+def fxtr_users(conn_after_auth_migrations):# pylint: disable=[redefined-outer-name]
+ """Fixture: setup test users."""
+ query = "INSERT INTO users(user_id, email, name) VALUES (?, ?, ?)"
+ query_user_roles = "INSERT INTO user_roles(user_id, role_id) VALUES (?, ?)"
+ test_user_roles = (
+ ("ecb52977-3004-469e-9428-2a1856725c7f",
+ "a0e67630-d502-4b9f-b23f-6805d0f30e30"),
+ ("ecb52977-3004-469e-9428-2a1856725c7f",
+ "ade7e6b0-ba9c-4b51-87d0-2af7fe39a347"))
+ with db.cursor(conn_after_auth_migrations) as cursor:
+ cursor.executemany(query, (
+ (str(user.user_id), user.email, user.name) for user in TEST_USERS))
+ cursor.executemany(query_user_roles, test_user_roles)
+
+ yield (conn_after_auth_migrations, TEST_USERS)
+
+ with db.cursor(conn_after_auth_migrations) as cursor:
+ cursor.executemany(
+ "DELETE FROM user_roles WHERE user_id=?",
+ (("ecb52977-3004-469e-9428-2a1856725c7f",),))
+ cursor.executemany(
+ "DELETE FROM users WHERE user_id=?",
+ (("ecb52977-3004-469e-9428-2a1856725c7f",),
+ ("21351b66-8aad-475b-84ac-53ce528451e3",),
+ ("ae9c6245-0966-41a5-9a5e-20885a96bea7",),
+ ("9a0c7ce5-2f40-4e78-979e-bf3527a59579",)))
+
+@pytest.fixture(scope="function")
+def fxtr_users_with_passwords(fxtr_users): # pylint: disable=[redefined-outer-name]
+ """Fixture: add passwords to the users"""
+ conn, users = fxtr_users
+ user_passwords_params = tuple(
+ (str(user.user_id), hash_password(
+ f"password_for_user_{idx:03}".encode("utf8")))
+ for idx, user in enumerate(users, start=1))
+
+ with db.cursor(conn) as cursor:
+ cursor.executemany(
+ "INSERT INTO user_credentials VALUES (?, ?)",
+ user_passwords_params)
+
+ yield conn, users
+
+ with db.cursor(conn) as cursor:
+ cursor.executemany(
+ "DELETE FROM user_credentials WHERE user_id=?",
+ ((item[0],) for item in user_passwords_params))
diff --git a/tests/unit/auth/test_credentials.py b/tests/unit/auth/test_credentials.py
new file mode 100644
index 0000000..f2a3d25
--- /dev/null
+++ b/tests/unit/auth/test_credentials.py
@@ -0,0 +1,100 @@
+"""Test the credentials checks"""
+import pytest
+from yoyo.migrations import MigrationList
+from hypothesis import given, settings, strategies, HealthCheck
+
+from gn3.auth import db
+from gn3.auth.authentication import credentials_in_database
+from gn3.migrations import get_migration, apply_migrations, rollback_migrations
+
+from tests.unit.auth.conftest import migrations_up_to
+
+@pytest.fixture
+def with_credentials_table(backend, auth_testdb_path):
+ """
+ Fixture: Yield a connection object with the 'user_credentials' table
+ created.
+ """
+ migrations_dir = "migrations/auth"
+ migration = f"{migrations_dir}/20221103_02_sGrIs-create-user-credentials-table.py"
+ migrations = (migrations_up_to(migration, migrations_dir) +
+ MigrationList([get_migration(migration)]))
+ apply_migrations(backend, migrations)
+ with db.connection(auth_testdb_path) as conn:
+ yield conn
+
+ rollback_migrations(backend, migrations)
+
+@pytest.fixture
+def with_credentials(with_credentials_table):# pylint: disable=redefined-outer-name
+ """
+ Fixture: Initialise the database with some user credentials.
+ """
+ with db.cursor(with_credentials_table) as cursor:
+ cursor.executemany(
+ "INSERT INTO users VALUES (:user_id, :email, :name)",
+ ({"user_id": "82552014-21ee-4321-b96a-b8788b97b862",
+ "email": "first@test.user",
+ "name": "First Test User"
+ },
+ {"user_id": "bdd5cb7a-072d-4c2b-9872-d0cecb718523",
+ "email": "second@test.user",
+ "name": "Second Test User"
+ }))
+ cursor.executemany(
+ "INSERT INTO user_credentials VALUES (:user_id, :password)",
+ ({"user_id": "82552014-21ee-4321-b96a-b8788b97b862",
+ "password": b'$2b$12$LAh1PYtUgAFK7d5fA0EfL.4AdTZuYEAfzwO.p.jXVboxcP8bXNj7a'
+ },
+ {"user_id": "bdd5cb7a-072d-4c2b-9872-d0cecb718523",
+ "password": b'$2b$12$zX77QCFSJuwIjAZGc0Jq5.rCWMHEMKD9Zf3Ay4C0AzwsiZ7SSPdKO'
+ }))
+
+ yield with_credentials_table
+
+ cursor.executemany("DELETE FROM user_credentials WHERE user_id=?",
+ (("82552014-21ee-4321-b96a-b8788b97b862",),
+ ("bdd5cb7a-072d-4c2b-9872-d0cecb718523",)))
+ cursor.executemany("DELETE FROM users WHERE user_id=?",
+ (("82552014-21ee-4321-b96a-b8788b97b862",),
+ ("bdd5cb7a-072d-4c2b-9872-d0cecb718523",)))
+
+@pytest.mark.unit_test
+@given(strategies.emails(), strategies.text())
+@settings(suppress_health_check=[HealthCheck.function_scoped_fixture])
+def test_credentials_not_in_database(with_credentials, email, password):# pylint: disable=redefined-outer-name
+ """
+ GIVEN: credentials that do not exist in the database
+ WHEN: the `credentials_in_database` function is run against the credentials
+ THEN: check that the function returns false in all cases.
+ """
+ with db.cursor(with_credentials) as cursor:
+ assert credentials_in_database(cursor, email, password) is False
+
+@pytest.mark.unit_test
+@pytest.mark.parametrize(
+ "email,password",
+ (("first@test.user", "wrongpassword"),
+ ("first@tes.user", "testuser01")))
+def test_partially_wrong_credentials(with_credentials, email, password):# pylint: disable=redefined-outer-name
+ """
+ GIVEN: credentials that exist in the database
+ WHEN: the credentials are checked with partially wrong values
+ THEN: the check fails since the credentials are not correct
+ """
+ with db.cursor(with_credentials) as cursor:
+ assert credentials_in_database(cursor, email, password) is False
+
+@pytest.mark.unit_test
+@pytest.mark.parametrize(
+ "email,password",
+ (("first@test.user", "testuser01"),
+ ("second@test.user", "testuser02")))
+def test_partially_correct_credentials(with_credentials, email, password):# pylint: disable=redefined-outer-name
+ """
+ GIVEN: credentials that exist in the database
+ WHEN: the credentials are checked with correct values
+ THEN: the check passes
+ """
+ with db.cursor(with_credentials) as cursor:
+ assert credentials_in_database(cursor, email, password) is True
diff --git a/tests/unit/auth/test_groups.py b/tests/unit/auth/test_groups.py
new file mode 100644
index 0000000..4824e14
--- /dev/null
+++ b/tests/unit/auth/test_groups.py
@@ -0,0 +1,168 @@
+"""Test functions dealing with group management."""
+from uuid import UUID
+
+import pytest
+from pymonad.maybe import Nothing
+
+from gn3.auth import db
+from gn3.auth.authentication.users import User
+from gn3.auth.authorisation.roles import Role
+from gn3.auth.authorisation.privileges import Privilege
+from gn3.auth.authorisation.errors import AuthorisationError
+from gn3.auth.authorisation.groups.models import (
+ Group, GroupRole, user_group, create_group, create_group_role)
+
+from tests.unit.auth import conftest
+
+create_group_failure = {
+ "status": "error",
+ "message": "Unauthorised: Failed to create group."
+}
+
+uuid_fn = lambda : UUID("d32611e3-07fc-4564-b56c-786c6db6de2b")
+
+GROUP = Group(UUID("9988c21d-f02f-4d45-8966-22c968ac2fbf"), "TheTestGroup",
+ {"group_description": "The test group"})
+PRIVILEGES = (
+ Privilege(
+ "group:resource:view-resource",
+ "view a resource and use it in computations"),
+ Privilege("group:resource:edit-resource", "edit/update a resource"))
+
+@pytest.mark.unit_test
+@pytest.mark.parametrize(
+ "user,expected", tuple(zip(conftest.TEST_USERS[0:1], (
+ Group(
+ UUID("d32611e3-07fc-4564-b56c-786c6db6de2b"), "a_test_group",
+ {"group_description": "A test group"}),
+ create_group_failure, create_group_failure, create_group_failure,
+ create_group_failure))))
+def test_create_group(# pylint: disable=[too-many-arguments]
+ fxtr_app, auth_testdb_path, mocker, fxtr_users, user, expected):# pylint: disable=[unused-argument]
+ """
+ GIVEN: an authenticated user
+ WHEN: the user attempts to create a group
+ THEN: verify they are only able to create the group if they have the
+ appropriate privileges
+ """
+ mocker.patch("gn3.auth.authorisation.groups.models.uuid4", uuid_fn)
+ mocker.patch("gn3.auth.authorisation.checks.require_oauth.acquire",
+ conftest.get_tokeniser(user))
+ with db.connection(auth_testdb_path) as conn:
+ assert create_group(
+ conn, "a_test_group", user, "A test group") == expected
+
+@pytest.mark.unit_test
+@pytest.mark.parametrize("user", conftest.TEST_USERS[1:])
+def test_create_group_raises_exception_with_non_privileged_user(# pylint: disable=[too-many-arguments]
+ fxtr_app, auth_testdb_path, mocker, fxtr_users, user):# pylint: disable=[unused-argument]
+ """
+ GIVEN: an authenticated user, without appropriate privileges
+ WHEN: the user attempts to create a group
+ THEN: verify the system raises an exception
+ """
+ mocker.patch("gn3.auth.authorisation.groups.models.uuid4", uuid_fn)
+ mocker.patch("gn3.auth.authorisation.checks.require_oauth.acquire",
+ conftest.get_tokeniser(user))
+ with db.connection(auth_testdb_path) as conn:
+ with pytest.raises(AuthorisationError):
+ assert create_group(conn, "a_test_group", user, "A test group")
+
+create_role_failure = {
+ "status": "error",
+ "message": "Unauthorised: Could not create the group role"
+}
+
+@pytest.mark.unit_test
+@pytest.mark.parametrize(
+ "user,expected", tuple(zip(conftest.TEST_USERS[0:1], (
+ GroupRole(
+ UUID("d32611e3-07fc-4564-b56c-786c6db6de2b"),
+ GROUP,
+ Role(UUID("d32611e3-07fc-4564-b56c-786c6db6de2b"),
+ "ResourceEditor", True, PRIVILEGES)),))))
+def test_create_group_role(mocker, fxtr_users_in_group, user, expected):
+ """
+ GIVEN: an authenticated user
+ WHEN: the user attempts to create a role, attached to a group
+ THEN: verify they are only able to create the role if they have the
+ appropriate privileges and that the role is attached to the given group
+ """
+ mocker.patch("gn3.auth.authorisation.groups.models.uuid4", uuid_fn)
+ mocker.patch("gn3.auth.authorisation.roles.models.uuid4", uuid_fn)
+ mocker.patch("gn3.auth.authorisation.checks.require_oauth.acquire",
+ conftest.get_tokeniser(user))
+ conn, _group, _users = fxtr_users_in_group
+ with db.cursor(conn) as cursor:
+ assert create_group_role(
+ conn, GROUP, "ResourceEditor", PRIVILEGES) == expected
+ # cleanup
+ cursor.execute(
+ ("DELETE FROM group_roles "
+ "WHERE group_role_id=? AND group_id=? AND role_id=?"),
+ (str(uuid_fn()), str(GROUP.group_id), str(uuid_fn())))
+
+@pytest.mark.unit_test
+@pytest.mark.parametrize(
+ "user,expected", tuple(zip(conftest.TEST_USERS[1:], (
+ create_role_failure, create_role_failure, create_role_failure))))
+def test_create_group_role_raises_exception_with_unauthorised_users(
+ mocker, fxtr_users_in_group, user, expected):
+ """
+ GIVEN: an authenticated user
+ WHEN: the user attempts to create a role, attached to a group
+ THEN: verify they are only able to create the role if they have the
+ appropriate privileges and that the role is attached to the given group
+ """
+ mocker.patch("gn3.auth.authorisation.groups.models.uuid4", uuid_fn)
+ mocker.patch("gn3.auth.authorisation.roles.models.uuid4", uuid_fn)
+ mocker.patch("gn3.auth.authorisation.checks.require_oauth.acquire",
+ conftest.get_tokeniser(user))
+ conn, _group, _users = fxtr_users_in_group
+ with pytest.raises(AuthorisationError):
+ assert create_group_role(
+ conn, GROUP, "ResourceEditor", PRIVILEGES) == expected
+
+@pytest.mark.unit_test
+def test_create_multiple_groups(mocker, fxtr_users):
+ """
+ GIVEN: An authenticated user with appropriate authorisation
+ WHEN: The user attempts to create a new group, while being a member of an
+ existing group
+ THEN: The system should prevent that, and respond with an appropriate error
+ message
+ """
+ mocker.patch("gn3.auth.authorisation.groups.models.uuid4", uuid_fn)
+ user = User(
+ UUID("ecb52977-3004-469e-9428-2a1856725c7f"), "group@lead.er",
+ "Group Leader")
+ mocker.patch("gn3.auth.authorisation.checks.require_oauth.acquire",
+ conftest.get_tokeniser(user))
+ conn, _test_users = fxtr_users
+ # First time, successfully creates the group
+ assert create_group(conn, "a_test_group", user) == Group(
+ UUID("d32611e3-07fc-4564-b56c-786c6db6de2b"), "a_test_group",
+ {})
+ # subsequent attempts should fail
+ with pytest.raises(AuthorisationError):
+ create_group(conn, "another_test_group", user)
+
+@pytest.mark.unit_test
+@pytest.mark.parametrize(
+ "user,expected",
+ tuple(zip(
+ conftest.TEST_USERS,
+ (([Group(UUID("9988c21d-f02f-4d45-8966-22c968ac2fbf"), "TheTestGroup", {})] * 3)
+ + [Nothing]))))
+def test_user_group(fxtr_users_in_group, user, expected):
+ """
+ GIVEN: A bunch of registered users, some of whom are members of a group, and
+ others are not
+ WHEN: a particular user's group is requested,
+ THEN: return a Maybe containing the group that the user belongs to, or
+ Nothing
+ """
+ conn, _group, _users = fxtr_users_in_group
+ assert (
+ user_group(conn, user).maybe(Nothing, lambda val: val)
+ == expected)
diff --git a/tests/unit/auth/test_migrations_add_data_to_table.py b/tests/unit/auth/test_migrations_add_data_to_table.py
new file mode 100644
index 0000000..9cb5d0c
--- /dev/null
+++ b/tests/unit/auth/test_migrations_add_data_to_table.py
@@ -0,0 +1,79 @@
+"""Test data insertion when migrations are run."""
+import pytest
+
+from gn3.auth import db
+from gn3.migrations import get_migration, apply_migrations, rollback_migrations
+from tests.unit.auth.conftest import (
+ apply_single_migration, rollback_single_migration, migrations_up_to)
+
+test_params = (
+ ("20221116_01_nKUmX-add-privileges-to-group-leader-role.py",
+ ("SELECT role_id, privilege_id FROM role_privileges "
+ "WHERE role_id=? AND privilege_id IN (?, ?, ?, ?)"),
+ ("a0e67630-d502-4b9f-b23f-6805d0f30e30",
+ "221660b1-df05-4be1-b639-f010269dbda9",
+ "7bcca363-cba9-4169-9e31-26bdc6179b28",
+ "5103cc68-96f8-4ebb-83a4-a31692402c9b",
+ "1c59eff5-9336-4ed2-a166-8f70d4cb012e"),
+ (("a0e67630-d502-4b9f-b23f-6805d0f30e30",
+ "221660b1-df05-4be1-b639-f010269dbda9"),
+ ("a0e67630-d502-4b9f-b23f-6805d0f30e30",
+ "7bcca363-cba9-4169-9e31-26bdc6179b28"),
+ ("a0e67630-d502-4b9f-b23f-6805d0f30e30",
+ "5103cc68-96f8-4ebb-83a4-a31692402c9b"),
+ ("a0e67630-d502-4b9f-b23f-6805d0f30e30",
+ "1c59eff5-9336-4ed2-a166-8f70d4cb012e"))),)
+
+@pytest.mark.unit_test
+@pytest.mark.parametrize("migration_file,query,query_params,data", test_params)
+def test_apply_insert(# pylint: disable=[too-many-arguments]
+ auth_migrations_dir, backend, auth_testdb_path, migration_file, query,
+ query_params, data):
+ """
+ GIVEN: a database migration script
+ WHEN: the script is applied
+ THEN: ensure the given data exists in the table
+ """
+ migration_path=f"{auth_migrations_dir}/{migration_file}"
+ older_migrations = migrations_up_to(migration_path, auth_migrations_dir)
+ the_migration = get_migration(migration_path)
+ apply_migrations(backend, older_migrations)
+ with db.connection(auth_testdb_path, None) as conn, db.cursor(conn) as cursor:
+ cursor.execute(query, query_params)
+ result_before_migration = cursor.fetchall()
+ apply_single_migration(backend, the_migration)
+ cursor.execute(query, query_params)
+ result_after_migration = cursor.fetchall()
+
+ rollback_migrations(backend, older_migrations + [the_migration])
+ assert len(result_before_migration) == 0, "Expected no results before migration"
+ assert sorted(result_after_migration) == sorted(data)
+
+@pytest.mark.unit_test
+@pytest.mark.parametrize("migration_file,query,query_params,data", test_params)
+def test_rollback_insert(# pylint: disable=[too-many-arguments]
+ auth_migrations_dir, backend, auth_testdb_path, migration_file, query,
+ query_params, data):
+ """
+ GIVEN: a database migration script
+ WHEN: the script is rolled back
+ THEN: ensure the given data no longer exists in the database
+ """
+ migration_path=f"{auth_migrations_dir}/{migration_file}"
+ older_migrations = migrations_up_to(migration_path, auth_migrations_dir)
+ the_migration = get_migration(migration_path)
+ apply_migrations(backend, older_migrations)
+ with db.connection(auth_testdb_path, None) as conn, db.cursor(conn) as cursor:
+ cursor.execute(query, query_params)
+ result_before_migration = cursor.fetchall()
+ apply_single_migration(backend, the_migration)
+ cursor.execute(query, query_params)
+ result_after_migration = cursor.fetchall()
+ rollback_single_migration(backend, the_migration)
+ cursor.execute(query, query_params)
+ result_after_rollback = cursor.fetchall()
+
+ rollback_migrations(backend, older_migrations)
+ assert len(result_before_migration) == 0, "Expected no results before migration"
+ assert sorted(result_after_migration) == sorted(data)
+ assert len(result_after_rollback) == 0, "Expected no results after rollback"
diff --git a/tests/unit/auth/test_migrations_add_remove_columns.py b/tests/unit/auth/test_migrations_add_remove_columns.py
new file mode 100644
index 0000000..ea9bf7b
--- /dev/null
+++ b/tests/unit/auth/test_migrations_add_remove_columns.py
@@ -0,0 +1,116 @@
+"""Test migrations that alter tables adding/removing columns."""
+import pytest
+
+from gn3.auth import db
+from gn3.migrations import get_migration, apply_migrations, rollback_migrations
+from tests.unit.auth.conftest import (
+ apply_single_migration, rollback_single_migration, migrations_up_to)
+
+QUERY = "SELECT sql FROM sqlite_schema WHERE name=?"
+
+TEST_PARAMS = (
+ ("20221109_01_HbD5F-add-resource-meta-field-to-resource-categories-field.py",
+ "resource_categories", "resource_meta TEXT", True),
+ (("20221110_08_23psB-add-privilege-category-and-privilege-description-"
+ "columns-to-privileges-table.py"),
+ "privileges", "privilege_category TEXT", True),
+ (("20221110_08_23psB-add-privilege-category-and-privilege-description-"
+ "columns-to-privileges-table.py"),
+ "privileges", "privilege_description TEXT", True),
+ ("20221117_01_RDlfx-modify-group-roles-add-group-role-id.py", "group_roles",
+ "group_role_id", True),
+ ("20221208_01_sSdHz-add-public-column-to-resources-table.py", "resources",
+ "public", True))
+
+def found(haystack: str, needle: str) -> bool:
+ """Check whether `needle` is found in `haystack`"""
+ return any(
+ (line.strip().find(needle) >= 0) for line in haystack.split("\n"))
+
+def pristine_before_migration(adding: bool, result_str: str, column: str) -> bool:
+ """Check that database is pristine before running the migration"""
+ col_was_found = found(result_str, column)
+ if adding:
+ return not col_was_found
+ return col_was_found
+
+def applied_successfully(adding: bool, result_str: str, column: str) -> bool:
+ """Check that the migration ran successfully"""
+ col_was_found = found(result_str, column)
+ if adding:
+ return col_was_found
+ return not col_was_found
+
+def rolled_back_successfully(adding: bool, result_str: str, column: str) -> bool:
+ """Check that the migration ran successfully"""
+ col_was_found = found(result_str, column)
+ if adding:
+ return not col_was_found
+ return col_was_found
+
+@pytest.mark.unit_test
+@pytest.mark.parametrize(
+ "migration_file,the_table,the_column,adding", TEST_PARAMS)
+def test_apply_add_remove_column(# pylint: disable=[too-many-arguments]
+ auth_migrations_dir, auth_testdb_path, backend, migration_file,
+ the_table, the_column, adding):
+ """
+ GIVEN: A migration that alters a table, adding or removing a column
+ WHEN: The migration is applied
+ THEN: Ensure the column exists if `adding` is True, otherwise, ensure the
+ column has been dropped
+ """
+ migration_path = f"{auth_migrations_dir}/{migration_file}"
+ older_migrations = migrations_up_to(migration_path, auth_migrations_dir)
+ the_migration = get_migration(migration_path)
+ apply_migrations(backend, older_migrations)
+ with db.connection(auth_testdb_path) as conn, db.cursor(conn) as cursor:
+ cursor.execute(QUERY, (the_table,))
+ results_before_migration = cursor.fetchone()
+ apply_single_migration(backend, the_migration)
+ cursor.execute(QUERY, (the_table,))
+ results_after_migration = cursor.fetchone()
+
+ rollback_migrations(backend, older_migrations + [the_migration])
+
+ assert pristine_before_migration(
+ adding, results_before_migration[0], the_column), (
+ f"Column `{the_column}` exists before migration and should not"
+ if adding else
+ f"Column `{the_column}` doesn't exist before migration and it should")
+ assert applied_successfully(
+ adding, results_after_migration[0], the_column), "Migration failed"
+
+@pytest.mark.unit_test
+@pytest.mark.parametrize(
+ "migration_file,the_table,the_column,adding", TEST_PARAMS)
+def test_rollback_add_remove_column(# pylint: disable=[too-many-arguments]
+ auth_migrations_dir, auth_testdb_path, backend, migration_file,
+ the_table, the_column, adding):
+ """
+ GIVEN: A migration that alters a table, adding or removing a column
+ WHEN: The migration is applied
+ THEN: Ensure the column is dropped if `adding` is True, otherwise, ensure
+ the column has been restored
+ """
+ migration_path = f"{auth_migrations_dir}/{migration_file}"
+ older_migrations = migrations_up_to(migration_path, auth_migrations_dir)
+ the_migration = get_migration(migration_path)
+ apply_migrations(backend, older_migrations)
+ apply_single_migration(backend, the_migration)
+ with db.connection(auth_testdb_path) as conn, db.cursor(conn) as cursor:
+ cursor.execute(QUERY, (the_table,))
+ results_before_rollback = cursor.fetchone()
+ rollback_single_migration(backend, the_migration)
+ cursor.execute(QUERY, (the_table,))
+ results_after_rollback = cursor.fetchone()
+
+ rollback_migrations(backend, older_migrations + [the_migration])
+
+ assert pristine_before_migration(
+ not adding, results_before_rollback[0], the_column), (
+ f"Column `{the_column}` doesn't exist before rollback and it should"
+ if adding else
+ f"Column `{the_column}` exists before rollback and should not")
+ assert rolled_back_successfully(
+ adding, results_after_rollback[0], the_column), "Rollback failed"
diff --git a/tests/unit/auth/test_migrations_create_tables.py b/tests/unit/auth/test_migrations_create_tables.py
new file mode 100644
index 0000000..2b8140b
--- /dev/null
+++ b/tests/unit/auth/test_migrations_create_tables.py
@@ -0,0 +1,91 @@
+"""Test migrations that create tables"""
+import pytest
+
+from gn3.auth import db
+from gn3.migrations import get_migration, apply_migrations, rollback_migrations
+from tests.unit.auth.conftest import (
+ apply_single_migration, rollback_single_migration, migrations_up_to)
+
+migrations_and_tables = (
+ ("20221103_01_js9ub-initialise-the-auth-entic-oris-ation-database.py",
+ "users"),
+ ("20221103_02_sGrIs-create-user-credentials-table.py", "user_credentials"),
+ ("20221108_01_CoxYh-create-the-groups-table.py", "groups"),
+ ("20221108_02_wxTr9-create-privileges-table.py", "privileges"),
+ ("20221108_03_Pbhb1-create-resource-categories-table.py", "resource_categories"),
+ ("20221110_01_WtZ1I-create-resources-table.py", "resources"),
+ ("20221110_05_BaNtL-create-roles-table.py", "roles"),
+ ("20221110_06_Pq2kT-create-generic-roles-table.py", "generic_roles"),
+ ("20221110_07_7WGa1-create-role-privileges-table.py", "role_privileges"),
+ ("20221114_01_n8gsF-create-generic-role-privileges-table.py",
+ "generic_role_privileges"),
+ ("20221114_03_PtWjc-create-group-roles-table.py", "group_roles"),
+ ("20221114_05_hQun6-create-user-roles-table.py", "user_roles"),
+ ("20221117_02_fmuZh-create-group-users-table.py", "group_users"),
+ ("20221206_01_BbeF9-create-group-user-roles-on-resources-table.py",
+ "group_user_roles_on_resources"),
+ ("20221219_01_CI3tN-create-oauth2-clients-table.py", "oauth2_clients"),
+ ("20221219_02_buSEU-create-oauth2-tokens-table.py", "oauth2_tokens"),
+ ("20221219_03_PcTrb-create-authorisation-code-table.py",
+ "authorisation_code"),
+ ("20230207_01_r0bkZ-create-group-join-requests-table.py",
+ "group_join_requests"),
+ ("20230322_01_0dDZR-create-linked-phenotype-data-table.py",
+ "linked_phenotype_data"),
+ ("20230322_02_Ll854-create-phenotype-resources-table.py",
+ "phenotype_resources"),
+ ("20230404_01_VKxXg-create-linked-genotype-data-table.py",
+ "linked_genotype_data"),
+ ("20230404_02_la33P-create-genotype-resources-table.py",
+ "genotype_resources"),
+ ("20230410_01_8mwaf-create-linked-mrna-data-table.py", "linked_mrna_data"),
+ ("20230410_02_WZqSf-create-mrna-resources-table.py", "mrna_resources"))
+
+@pytest.mark.unit_test
+@pytest.mark.parametrize("migration_file,the_table", migrations_and_tables)
+def test_create_table(
+ auth_testdb_path, auth_migrations_dir, backend, migration_file,
+ the_table):
+ """
+ GIVEN: A database migration script to create table, `the_table`
+ WHEN: The migration is applied
+ THEN: Ensure that the table `the_table` is created
+ """
+ migration_path=f"{auth_migrations_dir}/{migration_file}"
+ older_migrations = migrations_up_to(migration_path, auth_migrations_dir)
+ apply_migrations(backend, older_migrations)
+ with db.connection(auth_testdb_path) as conn, db.cursor(conn) as cursor:
+ cursor.execute("SELECT name FROM sqlite_schema WHERE type='table'")
+ result_before_migration = cursor.fetchall()
+ apply_single_migration(backend, get_migration(migration_path))
+ cursor.execute("SELECT name FROM sqlite_schema WHERE type='table'")
+ result_after_migration = cursor.fetchall()
+
+ rollback_migrations(backend, older_migrations)
+ assert the_table not in [row[0] for row in result_before_migration]
+ assert the_table in [row[0] for row in result_after_migration]
+
+@pytest.mark.unit_test
+@pytest.mark.parametrize("migration_file,the_table", migrations_and_tables)
+def test_rollback_create_table(
+ auth_testdb_path, auth_migrations_dir, backend, migration_file,
+ the_table):
+ """
+ GIVEN: A database migration script to create the table `the_table`
+ WHEN: The migration is rolled back
+ THEN: Ensure that the table `the_table` no longer exists
+ """
+ migration_path=f"{auth_migrations_dir}/{migration_file}"
+ older_migrations = migrations_up_to(migration_path, auth_migrations_dir)
+ apply_migrations(backend, older_migrations)
+ with db.connection(auth_testdb_path) as conn, db.cursor(conn) as cursor:
+ apply_single_migration(backend, get_migration(migration_path))
+ cursor.execute("SELECT name FROM sqlite_schema WHERE type='table'")
+ result_after_migration = cursor.fetchall()
+ rollback_single_migration(backend, get_migration(migration_path))
+ cursor.execute("SELECT name FROM sqlite_schema WHERE type='table'")
+ result_after_rollback = cursor.fetchall()
+
+ rollback_migrations(backend, older_migrations)
+ assert the_table in [row[0] for row in result_after_migration]
+ assert the_table not in [row[0] for row in result_after_rollback]
diff --git a/tests/unit/auth/test_migrations_drop_tables.py b/tests/unit/auth/test_migrations_drop_tables.py
new file mode 100644
index 0000000..2362c77
--- /dev/null
+++ b/tests/unit/auth/test_migrations_drop_tables.py
@@ -0,0 +1,63 @@
+"""Test migrations that create tables"""
+
+import pytest
+
+from gn3.auth import db
+from gn3.migrations import get_migration, apply_migrations, rollback_migrations
+from tests.unit.auth.conftest import (
+ apply_single_migration, rollback_single_migration, migrations_up_to)
+
+test_params = (
+ ("20221114_02_DKKjn-drop-generic-role-tables.py", "generic_roles"),
+ ("20221114_02_DKKjn-drop-generic-role-tables.py", "generic_role_privileges"))
+
+@pytest.mark.unit_test
+@pytest.mark.parametrize("migration_file,the_table", test_params)
+def test_drop_table(
+ auth_testdb_path, auth_migrations_dir, backend,
+ migration_file, the_table):
+ """
+ GIVEN: A database migration script to create table, `the_table`
+ WHEN: The migration is applied
+ THEN: Ensure that the table `the_table` is created
+ """
+ migration_path=f"{auth_migrations_dir}/{migration_file}"
+ older_migrations = migrations_up_to(migration_path, auth_migrations_dir)
+ the_migration = get_migration(migration_path)
+ apply_migrations(backend, older_migrations)
+ with db.connection(auth_testdb_path) as conn, db.cursor(conn) as cursor:
+ cursor.execute("SELECT name FROM sqlite_schema WHERE type='table'")
+ result_before_migration = cursor.fetchall()
+ apply_single_migration(backend, the_migration)
+ cursor.execute("SELECT name FROM sqlite_schema WHERE type='table'")
+ result_after_migration = cursor.fetchall()
+
+ rollback_migrations(backend, older_migrations + [the_migration])
+ assert the_table in [row[0] for row in result_before_migration]
+ assert the_table not in [row[0] for row in result_after_migration]
+
+@pytest.mark.unit_test
+@pytest.mark.parametrize("migration_file,the_table", test_params)
+def test_rollback_drop_table(
+ auth_testdb_path, auth_migrations_dir, backend, migration_file,
+ the_table):
+ """
+ GIVEN: A database migration script to create the table `the_table`
+ WHEN: The migration is rolled back
+ THEN: Ensure that the table `the_table` no longer exists
+ """
+ migration_path=f"{auth_migrations_dir}/{migration_file}"
+ older_migrations = migrations_up_to(migration_path, auth_migrations_dir)
+ the_migration = get_migration(migration_path)
+ apply_migrations(backend, older_migrations)
+ with db.connection(auth_testdb_path) as conn, db.cursor(conn) as cursor:
+ apply_single_migration(backend, the_migration)
+ cursor.execute("SELECT name FROM sqlite_schema WHERE type='table'")
+ result_after_migration = cursor.fetchall()
+ rollback_single_migration(backend, the_migration)
+ cursor.execute("SELECT name FROM sqlite_schema WHERE type='table'")
+ result_after_rollback = cursor.fetchall()
+
+ rollback_migrations(backend, older_migrations)
+ assert the_table not in [row[0] for row in result_after_migration]
+ assert the_table in [row[0] for row in result_after_rollback]
diff --git a/tests/unit/auth/test_migrations_indexes.py b/tests/unit/auth/test_migrations_indexes.py
new file mode 100644
index 0000000..b1f06d9
--- /dev/null
+++ b/tests/unit/auth/test_migrations_indexes.py
@@ -0,0 +1,97 @@
+"""Test that indexes are created and removed."""
+import pytest
+
+from gn3.auth import db
+from gn3.migrations import get_migration, apply_migrations, rollback_migrations
+from tests.unit.auth.conftest import (
+ apply_single_migration, rollback_single_migration, migrations_up_to)
+
+QUERY = """
+SELECT name FROM sqlite_master WHERE type='index' AND tbl_name = ?
+AND name= ?
+"""
+
+migrations_tables_and_indexes = (
+ ("20221110_07_7WGa1-create-role-privileges-table.py", "role_privileges",
+ "idx_tbl_role_privileges_cols_role_id"),
+ ("20221114_01_n8gsF-create-generic-role-privileges-table.py",
+ "generic_role_privileges",
+ "idx_tbl_generic_role_privileges_cols_generic_role_id"),
+ ("20221114_03_PtWjc-create-group-roles-table.py", "group_roles",
+ "idx_tbl_group_roles_cols_group_id"),
+ ("20221114_05_hQun6-create-user-roles-table.py", "user_roles",
+ "idx_tbl_user_roles_cols_user_id"),
+ ("20221117_02_fmuZh-create-group-users-table.py", "group_users",
+ "tbl_group_users_cols_group_id"),
+ ("20221206_01_BbeF9-create-group-user-roles-on-resources-table.py",
+ "group_user_roles_on_resources",
+ "idx_tbl_group_user_roles_on_resources_group_user_resource"))
+
+@pytest.mark.unit_test
+@pytest.mark.parametrize(
+ "migration_file,the_table,the_index", migrations_tables_and_indexes)
+def test_index_created(# pylint: disable=[too-many-arguments]
+ auth_testdb_path, auth_migrations_dir, backend, migration_file,
+ the_table, the_index):
+ """
+ GIVEN: A database migration
+ WHEN: The migration is applied
+ THEN: Ensure the given index is created for the provided table
+ """
+ migration_path=f"{auth_migrations_dir}/{migration_file}"
+ older_migrations = migrations_up_to(migration_path, auth_migrations_dir)
+ the_migration = get_migration(migration_path)
+ query_params = (the_table, the_index)
+ apply_migrations(backend, older_migrations)
+ with db.connection(auth_testdb_path) as conn, db.cursor(conn) as cursor:
+ cursor.execute(QUERY, query_params)
+ result_before_migration = cursor.fetchall()
+ apply_single_migration(backend, the_migration)
+ cursor.execute(QUERY, query_params)
+ result_after_migration = cursor.fetchall()
+
+ rollback_migrations(backend, older_migrations + [the_migration])
+ assert the_index not in [row[0] for row in result_before_migration], (
+ f"Index '{the_index}' was found for table '{the_table}' before migration.")
+ assert (
+ len(result_after_migration) == 1
+ and result_after_migration[0][0] == the_index), (
+ f"Index '{the_index}' was not found for table '{the_table}' after migration.")
+
+@pytest.mark.unit_test
+@pytest.mark.parametrize(
+ "migration_file,the_table,the_index", migrations_tables_and_indexes)
+def test_index_dropped(# pylint: disable=[too-many-arguments]
+ auth_testdb_path, auth_migrations_dir, backend, migration_file,
+ the_table, the_index):
+ """
+ GIVEN: A database migration
+ WHEN: The migration is rolled-back
+ THEN: Ensure the given index no longer exists for the given table
+ """
+ migration_path=f"{auth_migrations_dir}/{migration_file}"
+ older_migrations = migrations_up_to(migration_path, auth_migrations_dir)
+ the_migration = get_migration(migration_path)
+ query_params = (the_table, the_index)
+ apply_migrations(backend, older_migrations)
+ with db.connection(auth_testdb_path) as conn, db.cursor(conn) as cursor:
+ cursor.execute(QUERY, query_params)
+ result_before_migration = cursor.fetchall()
+ apply_single_migration(backend, the_migration)
+ cursor.execute(QUERY, query_params)
+ result_after_migration = cursor.fetchall()
+ rollback_single_migration(backend, the_migration)
+ cursor.execute(QUERY, query_params)
+ result_after_rollback = cursor.fetchall()
+
+ rollback_migrations(backend, older_migrations)
+ assert the_index not in [row[0] for row in result_before_migration], (
+ f"Index '{the_index}' was found for table '{the_table}' before "
+ "migration")
+ assert (
+ len(result_after_migration) == 1
+ and result_after_migration[0][0] == the_index), (
+ f"Index '{the_index}' was not found for table '{the_table}' after migration.")
+ assert the_index not in [row[0] for row in result_after_rollback], (
+ f"Index '{the_index}' was found for table '{the_table}' after "
+ "rollback")
diff --git a/tests/unit/auth/test_migrations_init_data_in_resource_categories_table.py b/tests/unit/auth/test_migrations_init_data_in_resource_categories_table.py
new file mode 100644
index 0000000..dd3d4c6
--- /dev/null
+++ b/tests/unit/auth/test_migrations_init_data_in_resource_categories_table.py
@@ -0,0 +1,60 @@
+"""
+Test that the `resource_categories` table is initialised with the startup data.
+"""
+import pytest
+
+from gn3.auth import db
+from gn3.migrations import get_migration, apply_migrations, rollback_migrations
+from tests.unit.auth.conftest import (
+ apply_single_migration, rollback_single_migration, migrations_up_to)
+
+MIGRATION_PATH = "migrations/auth/20221108_04_CKcSL-init-data-in-resource-categories-table.py"
+
+@pytest.mark.unit_test
+def test_apply_init_data(auth_testdb_path, auth_migrations_dir, backend):
+ """
+ GIVEN: A migration script
+ WHEN: The migration is applied
+ THEN: Verify that the expected data exists in the table
+ """
+ older_migrations = migrations_up_to(MIGRATION_PATH, auth_migrations_dir)
+ the_migration = get_migration(MIGRATION_PATH)
+ apply_migrations(backend, older_migrations)
+ with db.connection(auth_testdb_path, None) as conn, db.cursor(conn) as cursor:
+ cursor.execute("SELECT * FROM resource_categories")
+ assert len(cursor.fetchall()) == 0, "Expected empty table."
+ apply_single_migration(backend, the_migration)
+ cursor.execute("SELECT * FROM resource_categories")
+ results = cursor.fetchall()
+ assert len(results) == 3, "Expected 3 rows of data."
+ assert sorted(results) == sorted((
+ ('fad071a3-2fc8-40b8-992b-cdefe7dcac79', 'mrna', 'mRNA Dataset'),
+ ('548d684b-d4d1-46fb-a6d3-51a56b7da1b3', 'phenotype',
+ 'Phenotype (Publish) Dataset'),
+ ('48056f84-a2a6-41ac-8319-0e1e212cba2a', 'genotype',
+ 'Genotype Dataset')))
+
+ rollback_migrations(backend, older_migrations + [the_migration])
+
+@pytest.mark.unit_test
+def test_rollback_init_data(auth_testdb_path, auth_migrations_dir, backend):
+ """
+ GIVEN: A migration script
+ WHEN: The migration is rolled back
+ THEN: Verify that the table is empty
+ """
+ older_migrations = migrations_up_to(MIGRATION_PATH, auth_migrations_dir)
+ the_migration = get_migration(MIGRATION_PATH)
+ apply_migrations(backend, older_migrations)
+ with db.connection(auth_testdb_path, None) as conn, db.cursor(conn) as cursor:
+ cursor.execute("SELECT * FROM resource_categories")
+ assert len(cursor.fetchall()) == 0, "Expected empty table."
+ apply_single_migration(backend, the_migration)
+ cursor.execute("SELECT * FROM resource_categories")
+ results = cursor.fetchall()
+ assert len(results) == 3, "Expected 3 rows of data."
+ rollback_single_migration(backend, the_migration)
+ cursor.execute("SELECT * FROM resource_categories")
+ assert len(cursor.fetchall()) == 0, "Expected empty table."
+
+ rollback_migrations(backend, older_migrations)
diff --git a/tests/unit/auth/test_migrations_insert_data_into_empty_table.py b/tests/unit/auth/test_migrations_insert_data_into_empty_table.py
new file mode 100644
index 0000000..ebb7fa6
--- /dev/null
+++ b/tests/unit/auth/test_migrations_insert_data_into_empty_table.py
@@ -0,0 +1,77 @@
+"""Test data insertion when migrations are run."""
+import sqlite3
+from contextlib import closing
+
+import pytest
+
+from gn3.migrations import get_migration, apply_migrations, rollback_migrations
+from tests.unit.auth.conftest import (
+ apply_single_migration, rollback_single_migration, migrations_up_to)
+
+test_params = (
+ ("20221113_01_7M0hv-enumerate-initial-privileges.py", "privileges", 19),
+ ("20221114_04_tLUzB-initialise-basic-roles.py", "roles", 2),
+ ("20221114_04_tLUzB-initialise-basic-roles.py", "role_privileges", 15))
+
+@pytest.mark.unit_test
+@pytest.mark.parametrize(
+ "migration_file,table,row_count", test_params)
+def test_apply_insert(# pylint: disable=[too-many-arguments]
+ auth_testdb_path, auth_migrations_dir, backend, migration_file,
+ table, row_count):
+ """
+ GIVEN: A database migration
+ WHEN: The migration is applied
+ THEN: Ensure the given number of rows are inserted into the table
+ """
+ migration_path=f"{auth_migrations_dir}/{migration_file}"
+ older_migrations = migrations_up_to(migration_path, auth_migrations_dir)
+ the_migration = get_migration(migration_path)
+ apply_migrations(backend, older_migrations)
+ with closing(sqlite3.connect(auth_testdb_path)) as conn, closing(conn.cursor()) as cursor:
+ query = f"SELECT COUNT(*) FROM {table}"
+ cursor.execute(query)
+ result_before_migration = cursor.fetchall()
+ apply_single_migration(backend, the_migration)
+ cursor.execute(query)
+ result_after_migration = cursor.fetchall()
+
+ rollback_migrations(backend, older_migrations+[the_migration])
+ assert result_before_migration[0][0] == 0, (
+ "Expected empty table before initialisation")
+ assert result_after_migration[0][0] == row_count, (
+ f"Expected {row_count} rows")
+
+@pytest.mark.unit_test
+@pytest.mark.parametrize(
+ "migration_file,table,row_count", test_params)
+def test_rollback_insert(# pylint: disable=[too-many-arguments]
+ auth_testdb_path, auth_migrations_dir, backend, migration_file,
+ table, row_count):
+ """
+ GIVEN: A database migration
+ WHEN: The migration is applied
+ THEN: Ensure the given number of rows are inserted into the table
+ """
+ migration_path=f"{auth_migrations_dir}/{migration_file}"
+ older_migrations = migrations_up_to(migration_path, auth_migrations_dir)
+ the_migration = get_migration(migration_path)
+ apply_migrations(backend, older_migrations)
+ with closing(sqlite3.connect(auth_testdb_path)) as conn, closing(conn.cursor()) as cursor:
+ query = f"SELECT COUNT(*) FROM {table}"
+ cursor.execute(query)
+ result_before_migration = cursor.fetchall()
+ apply_single_migration(backend, the_migration)
+ cursor.execute(query)
+ result_after_migration = cursor.fetchall()
+ rollback_single_migration(backend, the_migration)
+ cursor.execute(query)
+ result_after_rollback = cursor.fetchall()
+
+ rollback_migrations(backend, older_migrations)
+ assert result_before_migration[0][0] == 0, (
+ "Expected empty table before initialisation")
+ assert result_after_migration[0][0] == row_count, (
+ f"Expected {row_count} rows")
+ assert result_after_rollback[0][0] == 0, (
+ "Expected empty table after rollback")
diff --git a/tests/unit/auth/test_privileges.py b/tests/unit/auth/test_privileges.py
new file mode 100644
index 0000000..8395293
--- /dev/null
+++ b/tests/unit/auth/test_privileges.py
@@ -0,0 +1,46 @@
+"""Test the privileges module"""
+import pytest
+
+from gn3.auth import db
+from gn3.auth.authorisation.privileges import Privilege, user_privileges
+
+from tests.unit.auth import conftest
+
+SORT_KEY = lambda x: x.privilege_id
+
+PRIVILEGES = sorted(
+ (Privilege("system:group:create-group", "Create a group"),
+ Privilege("system:group:view-group", "View the details of a group"),
+ Privilege("system:group:edit-group", "Edit the details of a group"),
+ Privilege("system:user:list", "List users in the system"),
+ Privilege("system:group:delete-group", "Delete a group"),
+ Privilege("group:user:add-group-member", "Add a user to a group"),
+ Privilege("group:user:remove-group-member", "Remove a user from a group"),
+ Privilege("system:group:transfer-group-leader",
+ "Transfer leadership of the group to some other member"),
+
+ Privilege("group:resource:create-resource", "Create a resource object"),
+ Privilege("group:resource:view-resource",
+ "view a resource and use it in computations"),
+ Privilege("group:resource:edit-resource", "edit/update a resource"),
+ Privilege("group:resource:delete-resource", "Delete a resource"),
+
+ Privilege("group:role:create-role", "Create a new role"),
+ Privilege("group:role:edit-role", "edit/update an existing role"),
+ Privilege("group:user:assign-role", "Assign a role to an existing user"),
+ Privilege("group:role:delete-role", "Delete an existing role")),
+ key=SORT_KEY)
+
+@pytest.mark.unit_test
+@pytest.mark.parametrize(
+ "user,expected", tuple(zip(
+ conftest.TEST_USERS, (PRIVILEGES, [], [], [], []))))
+def test_user_privileges(auth_testdb_path, fxtr_users, user, expected):# pylint: disable=[unused-argument]
+ """
+ GIVEN: A user
+ WHEN: An attempt is made to fetch the user's privileges
+ THEN: Ensure only
+ """
+ with db.connection(auth_testdb_path) as conn:
+ assert sorted(
+ user_privileges(conn, user), key=SORT_KEY) == expected
diff --git a/tests/unit/auth/test_resources.py b/tests/unit/auth/test_resources.py
new file mode 100644
index 0000000..2884add
--- /dev/null
+++ b/tests/unit/auth/test_resources.py
@@ -0,0 +1,117 @@
+"""Test resource-management functions"""
+import uuid
+
+import pytest
+
+from gn3.auth import db
+
+from gn3.auth.authorisation.groups import Group
+from gn3.auth.authorisation.errors import AuthorisationError
+from gn3.auth.authorisation.resources.models import (
+ Resource, user_resources, create_resource, ResourceCategory,
+ public_resources)
+
+from tests.unit.auth import conftest
+
+group = Group(uuid.UUID("9988c21d-f02f-4d45-8966-22c968ac2fbf"), "TheTestGroup",
+ {})
+resource_category = ResourceCategory(
+ uuid.UUID("fad071a3-2fc8-40b8-992b-cdefe7dcac79"), "mrna", "mRNA Dataset")
+create_resource_failure = {
+ "status": "error",
+ "message": "Unauthorised: Could not create resource"
+}
+uuid_fn = lambda : uuid.UUID("d32611e3-07fc-4564-b56c-786c6db6de2b")
+
+@pytest.mark.unit_test
+@pytest.mark.parametrize(
+ "user,expected",
+ tuple(zip(
+ conftest.TEST_USERS[0:1],
+ (Resource(
+ group, uuid.UUID("d32611e3-07fc-4564-b56c-786c6db6de2b"),
+ "test_resource", resource_category, False),))))
+def test_create_resource(mocker, fxtr_users_in_group, user, expected):
+ """Test that resource creation works as expected."""
+ mocker.patch("gn3.auth.authorisation.resources.models.uuid4", uuid_fn)
+ mocker.patch("gn3.auth.authorisation.checks.require_oauth.acquire",
+ conftest.get_tokeniser(user))
+ conn, _group, _users = fxtr_users_in_group
+ resource = create_resource(
+ conn, "test_resource", resource_category, user, False)
+ assert resource == expected
+
+ with db.cursor(conn) as cursor:
+ # Cleanup
+ cursor.execute(
+ "DELETE FROM group_user_roles_on_resources WHERE resource_id=?",
+ (str(resource.resource_id),))
+ cursor.execute(
+ "DELETE FROM group_roles WHERE group_id=?",
+ (str(resource.group.group_id),))
+ cursor.execute(
+ "DELETE FROM resources WHERE resource_id=?",
+ (str(resource.resource_id),))
+
+@pytest.mark.unit_test
+@pytest.mark.parametrize(
+ "user,expected",
+ tuple(zip(
+ conftest.TEST_USERS[1:],
+ (create_resource_failure, create_resource_failure,
+ create_resource_failure))))
+def test_create_resource_raises_for_unauthorised_users(
+ mocker, fxtr_users_in_group, user, expected):
+ """Test that resource creation works as expected."""
+ mocker.patch("gn3.auth.authorisation.resources.models.uuid4", uuid_fn)
+ mocker.patch("gn3.auth.authorisation.checks.require_oauth.acquire",
+ conftest.get_tokeniser(user))
+ conn, _group, _users = fxtr_users_in_group
+ with pytest.raises(AuthorisationError):
+ assert create_resource(
+ conn, "test_resource", resource_category, user, False) == expected
+
+SORTKEY = lambda resource: resource.resource_id
+
+@pytest.mark.unit_test
+def test_public_resources(fxtr_resources):
+ """
+ GIVEN: some resources in the database
+ WHEN: public resources are requested
+ THEN: only list the resources that are public
+ """
+ conn, _res = fxtr_resources
+ assert sorted(public_resources(conn), key=SORTKEY) == sorted(tuple(
+ res for res in conftest.TEST_RESOURCES if res.public), key=SORTKEY)
+
+PUBLIC_RESOURCES = sorted(
+ {res.resource_id: res for res in conftest.TEST_RESOURCES_PUBLIC}.values(),
+ key=SORTKEY)
+
+@pytest.mark.unit_test
+@pytest.mark.parametrize(
+ "user,expected",
+ tuple(zip(
+ conftest.TEST_USERS,
+ (sorted(
+ {res.resource_id: res for res in
+ (conftest.TEST_RESOURCES_GROUP_01 +
+ conftest.TEST_RESOURCES_PUBLIC)}.values(),
+ key=SORTKEY),
+ sorted(
+ {res.resource_id: res for res in
+ ((conftest.TEST_RESOURCES_GROUP_01[1],) +
+ conftest.TEST_RESOURCES_PUBLIC)}.values()
+ ,
+ key=SORTKEY),
+ PUBLIC_RESOURCES, PUBLIC_RESOURCES))))
+def test_user_resources(fxtr_group_user_roles, user, expected):
+ """
+ GIVEN: some resources in the database
+ WHEN: a particular user's resources are requested
+ THEN: list only the resources for which the user can access
+ """
+ conn, *_others = fxtr_group_user_roles
+ assert sorted(
+ {res.resource_id: res for res in user_resources(conn, user)
+ }.values(), key=SORTKEY) == expected
diff --git a/tests/unit/auth/test_roles.py b/tests/unit/auth/test_roles.py
new file mode 100644
index 0000000..02fd9f7
--- /dev/null
+++ b/tests/unit/auth/test_roles.py
@@ -0,0 +1,123 @@
+"""Test functions dealing with group management."""
+import uuid
+
+import pytest
+
+from gn3.auth import db
+from gn3.auth.authorisation.privileges import Privilege
+from gn3.auth.authorisation.errors import AuthorisationError
+from gn3.auth.authorisation.roles.models import Role, user_roles, create_role
+
+from tests.unit.auth import conftest
+from tests.unit.auth.fixtures import TEST_USERS
+
+create_role_failure = {
+ "status": "error",
+ "message": "Unauthorised: Could not create role"
+}
+
+uuid_fn = lambda : uuid.UUID("d32611e3-07fc-4564-b56c-786c6db6de2b")
+
+PRIVILEGES = (
+ Privilege("group:resource:view-resource",
+ "view a resource and use it in computations"),
+ Privilege("group:resource:edit-resource", "edit/update a resource"))
+
+@pytest.mark.unit_test
+@pytest.mark.parametrize(
+ "user,expected", tuple(zip(conftest.TEST_USERS[0:1], (
+ Role(uuid.UUID("d32611e3-07fc-4564-b56c-786c6db6de2b"), "a_test_role",
+ True, PRIVILEGES),))))
+def test_create_role(# pylint: disable=[too-many-arguments]
+ fxtr_app, auth_testdb_path, mocker, fxtr_users, user, expected):# pylint: disable=[unused-argument]
+ """
+ GIVEN: an authenticated user
+ WHEN: the user attempts to create a role
+ THEN: verify they are only able to create the role if they have the
+ appropriate privileges
+ """
+ mocker.patch("gn3.auth.authorisation.roles.models.uuid4", uuid_fn)
+ mocker.patch("gn3.auth.authorisation.checks.require_oauth.acquire",
+ conftest.get_tokeniser(user))
+ with db.connection(auth_testdb_path) as conn, db.cursor(conn) as cursor:
+ the_role = create_role(cursor, "a_test_role", PRIVILEGES)
+ assert the_role == expected
+
+@pytest.mark.unit_test
+@pytest.mark.parametrize(
+ "user,expected", tuple(zip(conftest.TEST_USERS[1:], (
+ create_role_failure, create_role_failure, create_role_failure))))
+def test_create_role_raises_exception_for_unauthorised_users(# pylint: disable=[too-many-arguments]
+ fxtr_app, auth_testdb_path, mocker, fxtr_users, user, expected):# pylint: disable=[unused-argument]
+ """
+ GIVEN: an authenticated user
+ WHEN: the user attempts to create a role
+ THEN: verify they are only able to create the role if they have the
+ appropriate privileges
+ """
+ mocker.patch("gn3.auth.authorisation.roles.models.uuid4", uuid_fn)
+ mocker.patch("gn3.auth.authorisation.checks.require_oauth.acquire",
+ conftest.get_tokeniser(user))
+ with db.connection(auth_testdb_path) as conn, db.cursor(conn) as cursor:
+ with pytest.raises(AuthorisationError):
+ create_role(cursor, "a_test_role", PRIVILEGES)
+
+@pytest.mark.unit_test
+@pytest.mark.parametrize(
+ "user,expected",
+ (zip(TEST_USERS,
+ ((Role(
+ role_id=uuid.UUID('a0e67630-d502-4b9f-b23f-6805d0f30e30'),
+ role_name='group-leader', user_editable=False,
+ privileges=(
+ Privilege(privilege_id='group:resource:create-resource',
+ privilege_description='Create a resource object'),
+ Privilege(privilege_id='group:resource:delete-resource',
+ privilege_description='Delete a resource'),
+ Privilege(privilege_id='group:resource:edit-resource',
+ privilege_description='edit/update a resource'),
+ Privilege(
+ privilege_id='group:resource:view-resource',
+ privilege_description=(
+ 'view a resource and use it in computations')),
+ Privilege(privilege_id='group:role:create-role',
+ privilege_description='Create a new role'),
+ Privilege(privilege_id='group:role:delete-role',
+ privilege_description='Delete an existing role'),
+ Privilege(privilege_id='group:role:edit-role',
+ privilege_description='edit/update an existing role'),
+ Privilege(privilege_id='group:user:add-group-member',
+ privilege_description='Add a user to a group'),
+ Privilege(privilege_id='group:user:assign-role',
+ privilege_description=(
+ 'Assign a role to an existing user')),
+ Privilege(privilege_id='group:user:remove-group-member',
+ privilege_description='Remove a user from a group'),
+ Privilege(privilege_id='system:group:delete-group',
+ privilege_description='Delete a group'),
+ Privilege(privilege_id='system:group:edit-group',
+ privilege_description='Edit the details of a group'),
+ Privilege(
+ privilege_id='system:group:transfer-group-leader',
+ privilege_description=(
+ 'Transfer leadership of the group to some other '
+ 'member')),
+ Privilege(privilege_id='system:group:view-group',
+ privilege_description='View the details of a group'),
+ Privilege(privilege_id='system:user:list',
+ privilege_description='List users in the system'))),
+ Role(
+ role_id=uuid.UUID("ade7e6b0-ba9c-4b51-87d0-2af7fe39a347"),
+ role_name="group-creator", user_editable=False,
+ privileges=(
+ Privilege(privilege_id='system:group:create-group',
+ privilege_description = "Create a group"),))),
+ tuple(), tuple(), tuple()))))
+def test_user_roles(fxtr_group_user_roles, user, expected):
+ """
+ GIVEN: an authenticated user
+ WHEN: we request the user's privileges
+ THEN: return **ALL** the privileges attached to the user
+ """
+ conn, *_others = fxtr_group_user_roles
+ assert user_roles(conn, user) == expected
diff --git a/tests/unit/auth/test_token.py b/tests/unit/auth/test_token.py
new file mode 100644
index 0000000..76316ea
--- /dev/null
+++ b/tests/unit/auth/test_token.py
@@ -0,0 +1,62 @@
+"""Test the OAuth2 authorisation"""
+
+import pytest
+
+from gn3.auth import db
+
+SUCCESS_RESULT = {
+ "status_code": 200,
+ "result": {
+ "access_token": "123456ABCDE",
+ "expires_in": 864000,
+ "scope": "profile",
+ "token_type": "Bearer"}}
+
+USERNAME_PASSWORD_FAIL_RESULT = {
+ "status_code": 400,
+ "result": {
+ 'error': 'invalid_request',
+ 'error_description': 'Invalid "username" or "password" in request.'}}
+
+def gen_token(client, grant_type, user, scope): # pylint: disable=[unused-argument]
+ """Generate tokens for tests"""
+ return "123456ABCDE"
+
+@pytest.mark.unit_test
+@pytest.mark.parametrize(
+ "test_data,expected",
+ ((("group@lead.er", "password_for_user_001", 0), SUCCESS_RESULT),
+ (("group@mem.ber01", "password_for_user_002", 1), SUCCESS_RESULT),
+ (("group@mem.ber02", "password_for_user_003", 2), SUCCESS_RESULT),
+ (("unaff@iliated.user", "password_for_user_004", 3), SUCCESS_RESULT),
+ (("group@lead.er", "brrr", 0), USERNAME_PASSWORD_FAIL_RESULT),
+ (("group@mem.ber010", "password_for_user_002", 1), USERNAME_PASSWORD_FAIL_RESULT),
+ (("papa", "yada", 2), USERNAME_PASSWORD_FAIL_RESULT),
+ # (("unaff@iliated.user", "password_for_user_004", 1), USERNAME_PASSWORD_FAIL_RESULT)
+ ))
+def test_token(fxtr_app, fxtr_oauth2_clients, test_data, expected):
+ """
+ GIVEN: a registered oauth2 client, a user
+ WHEN: a token is requested via the 'password' grant
+ THEN: check that:
+ a) when email and password are valid, we get a token back
+ b) when either email or password or both are invalid, we get error message
+ back
+ c) TODO: when user tries to use wrong client, we get error message back
+ """
+ conn, oa2clients = fxtr_oauth2_clients
+ email, password, client_idx = test_data
+ data = {
+ "grant_type": "password", "scope": "profile nonexistent-scope",
+ "client_id": oa2clients[client_idx].client_id,
+ "client_secret": oa2clients[client_idx].client_secret,
+ "username": email, "password": password}
+
+ with fxtr_app.test_client() as client, db.cursor(conn) as cursor:
+ res = client.post("/api/oauth2/token", data=data)
+ # cleanup db
+ cursor.execute("DELETE FROM oauth2_tokens WHERE access_token=?",
+ (gen_token(None, None, None, None),))
+ assert res.status_code == expected["status_code"]
+ for key in expected["result"]:
+ assert res.json[key] == expected["result"][key]
diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py
new file mode 100644
index 0000000..8005c8e
--- /dev/null
+++ b/tests/unit/conftest.py
@@ -0,0 +1,35 @@
+"""Fixtures for unit tests."""
+from pathlib import Path
+from datetime import datetime
+from tempfile import TemporaryDirectory
+
+import pytest
+
+from gn3.app import create_app
+
+@pytest.fixture(scope="session")
+def fxtr_app():
+ """Fixture: setup the test app"""
+ # Do some setup
+ with TemporaryDirectory() as testdir:
+ testdb = Path(testdir).joinpath(
+ f'testdb_{datetime.now().strftime("%Y%m%dT%H%M%S")}')
+ app = create_app({
+ "TESTING": True, "AUTH_DB": testdb,
+ "OAUTH2_ACCESS_TOKEN_GENERATOR": "tests.unit.auth.test_token.gen_token"
+ })
+ app.testing = True
+ yield app
+ # Clean up after ourselves
+ testdb.unlink(missing_ok=True)
+
+@pytest.fixture(scope="session")
+def client(fxtr_app): # pylint: disable=redefined-outer-name
+ """Create a test client fixture for tests"""
+ with fxtr_app.app_context():
+ yield fxtr_app.test_client()
+
+@pytest.fixture(scope="session")
+def fxtr_app_config(client): # pylint: disable=redefined-outer-name
+ """Return the test application's configuration object"""
+ return client.application.config