diff options
Diffstat (limited to 'tests/unit/auth')
22 files changed, 0 insertions, 1629 deletions
diff --git a/tests/unit/auth/__init__.py b/tests/unit/auth/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/tests/unit/auth/__init__.py +++ /dev/null diff --git a/tests/unit/auth/conftest.py b/tests/unit/auth/conftest.py deleted file mode 100644 index a7c64a8..0000000 --- a/tests/unit/auth/conftest.py +++ /dev/null @@ -1,24 +0,0 @@ -"""Module for fixtures and test utilities""" -import uuid -import datetime -from contextlib import contextmanager - -from gn3.auth.authentication.oauth2.models.oauth2token import OAuth2Token - -from .fixtures import * # pylint: disable=[wildcard-import,unused-wildcard-import] - -def get_tokeniser(user): - """Get contextmanager for mocking token acquisition.""" - @contextmanager - def __token__(*args, **kwargs):# pylint: disable=[unused-argument] - yield { - usr.user_id: OAuth2Token( - token_id=uuid.UUID("d32611e3-07fc-4564-b56c-786c6db6de2b"), - client=None, token_type="Bearer", access_token="123456ABCDE", - refresh_token=None, revoked=False, expires_in=864000, - user=usr, issued_at=int(datetime.datetime.now().timestamp()), - scope="profile group role resource register-client") - for usr in TEST_USERS - }[user.user_id] - - return __token__ diff --git a/tests/unit/auth/fixtures/__init__.py b/tests/unit/auth/fixtures/__init__.py deleted file mode 100644 index a675fc7..0000000 --- a/tests/unit/auth/fixtures/__init__.py +++ /dev/null @@ -1,8 +0,0 @@ -"""pytest's conftest as a module.""" -from .role_fixtures import * -from .user_fixtures import * -from .group_fixtures import * -from .resource_fixtures import * -# from .privilege_fixtures import * -from .migration_fixtures import * -from .oauth2_client_fixtures import * diff --git a/tests/unit/auth/fixtures/group_fixtures.py b/tests/unit/auth/fixtures/group_fixtures.py deleted file mode 100644 index d7bbc56..0000000 --- a/tests/unit/auth/fixtures/group_fixtures.py +++ /dev/null @@ -1,147 +0,0 @@ -"""Fixtures and utilities for group-related tests""" -import uuid - -import pytest - -from gn3.auth import db -from gn3.auth.authorisation.groups import Group, GroupRole -from gn3.auth.authorisation.resources import Resource, ResourceCategory - -from .role_fixtures import RESOURCE_EDITOR_ROLE, RESOURCE_READER_ROLE - -TEST_GROUP_01 = Group(uuid.UUID("9988c21d-f02f-4d45-8966-22c968ac2fbf"), - "TheTestGroup", {}) -TEST_GROUP_02 = Group(uuid.UUID("e37d59d7-c05e-4d67-b479-81e627d8d634"), - "AnotherTestGroup", {}) -TEST_GROUPS = (TEST_GROUP_01, TEST_GROUP_02) - -TEST_RESOURCES_GROUP_01 = ( - Resource(TEST_GROUPS[0], uuid.UUID("26ad1668-29f5-439d-b905-84d551f85955"), - "ResourceG01R01", - ResourceCategory(uuid.UUID("48056f84-a2a6-41ac-8319-0e1e212cba2a"), - "genotype", "Genotype Dataset"), - True), - Resource(TEST_GROUPS[0], uuid.UUID("2130aec0-fefd-434d-92fd-9ca342348b2d"), - "ResourceG01R02", - ResourceCategory(uuid.UUID("548d684b-d4d1-46fb-a6d3-51a56b7da1b3"), - "phenotype", "Phenotype (Publish) Dataset"), - False), - Resource(TEST_GROUPS[0], uuid.UUID("e9a1184a-e8b4-49fb-b713-8d9cbeea5b83"), - "ResourceG01R03", - ResourceCategory(uuid.UUID("fad071a3-2fc8-40b8-992b-cdefe7dcac79"), - "mrna", "mRNA Dataset"), - False)) - -TEST_RESOURCES_GROUP_02 = ( - Resource(TEST_GROUPS[1], uuid.UUID("14496a1c-c234-49a2-978c-8859ea274054"), - "ResourceG02R01", - ResourceCategory(uuid.UUID("48056f84-a2a6-41ac-8319-0e1e212cba2a"), - "genotype", "Genotype Dataset"), - False), - Resource(TEST_GROUPS[1], uuid.UUID("04ad9e09-94ea-4390-8a02-11f92999806b"), - "ResourceG02R02", - ResourceCategory(uuid.UUID("fad071a3-2fc8-40b8-992b-cdefe7dcac79"), - "mrna", "mRNA Dataset"), - True)) - -TEST_RESOURCES = TEST_RESOURCES_GROUP_01 + TEST_RESOURCES_GROUP_02 -TEST_RESOURCES_PUBLIC = (TEST_RESOURCES_GROUP_01[0], TEST_RESOURCES_GROUP_02[1]) - -def __gtuple__(cursor): - return tuple(dict(row) for row in cursor.fetchall()) - -@pytest.fixture(scope="function") -def fxtr_group(conn_after_auth_migrations):# pylint: disable=[redefined-outer-name] - """Fixture: setup a test group.""" - query = "INSERT INTO groups(group_id, group_name) VALUES (?, ?)" - with db.cursor(conn_after_auth_migrations) as cursor: - cursor.executemany( - query, tuple( - (str(group.group_id), group.group_name) - for group in TEST_GROUPS)) - - yield (conn_after_auth_migrations, TEST_GROUPS[0]) - - with db.cursor(conn_after_auth_migrations) as cursor: - cursor.executemany( - "DELETE FROM groups WHERE group_id=?", - ((str(group.group_id),) for group in TEST_GROUPS)) - -@pytest.fixture(scope="function") -def fxtr_users_in_group(fxtr_group, fxtr_users):# pylint: disable=[redefined-outer-name, unused-argument] - """Link the users to the groups.""" - conn, all_users = fxtr_users - users = tuple( - user for user in all_users if user.email not in ("unaff@iliated.user",)) - query_params = tuple( - (str(TEST_GROUP_01.group_id), str(user.user_id)) for user in users) - with db.cursor(conn) as cursor: - cursor.executemany( - "INSERT INTO group_users(group_id, user_id) VALUES (?, ?)", - query_params) - - yield (conn, TEST_GROUP_01, users) - - with db.cursor(conn) as cursor: - cursor.executemany( - "DELETE FROM group_users WHERE group_id=? AND user_id=?", - query_params) - -@pytest.fixture(scope="function") -def fxtr_group_roles(fxtr_group, fxtr_roles):# pylint: disable=[redefined-outer-name,unused-argument] - """Link roles to group""" - group_roles = ( - GroupRole(uuid.UUID("9c25efb2-b477-4918-a95c-9914770cbf4d"), - TEST_GROUP_01, RESOURCE_EDITOR_ROLE), - GroupRole(uuid.UUID("82aed039-fe2f-408c-ab1e-81cd1ba96630"), - TEST_GROUP_02, RESOURCE_READER_ROLE)) - conn, groups = fxtr_group - with db.cursor(conn) as cursor: - cursor.executemany( - "INSERT INTO group_roles VALUES (?, ?, ?)", - ((str(role.group_role_id), str(role.group.group_id), - str(role.role.role_id)) - for role in group_roles)) - - yield conn, groups, group_roles - - with db.cursor(conn) as cursor: - cursor.execute("SELECT * FROM group_user_roles_on_resources") - cursor.executemany( - ("DELETE FROM group_roles " - "WHERE group_role_id=? AND group_id=? AND role_id=?"), - ((str(role.group_role_id), str(role.group.group_id), - str(role.role.role_id)) - for role in group_roles)) - -@pytest.fixture(scope="function") -def fxtr_group_user_roles(fxtr_resources, fxtr_group_roles, fxtr_users_in_group):#pylint: disable=[redefined-outer-name,unused-argument] - """Assign roles to users.""" - conn, _groups, group_roles = fxtr_group_roles - _conn, group_resources = fxtr_resources - _conn, _group, group_users = fxtr_users_in_group - users = tuple(user for user in group_users if user.email - not in ("unaff@iliated.user", "group@lead.er")) - users_roles_resources = ( - (user, RESOURCE_EDITOR_ROLE, TEST_RESOURCES_GROUP_01[1]) - for user in users if user.email == "group@mem.ber01") - with db.cursor(conn) as cursor: - params = tuple({ - "group_id": str(resource.group.group_id), - "user_id": str(user.user_id), - "role_id": str(role.role_id), - "resource_id": str(resource.resource_id) - } for user, role, resource in users_roles_resources) - cursor.executemany( - ("INSERT INTO group_user_roles_on_resources " - "VALUES (:group_id, :user_id, :role_id, :resource_id)"), - params) - - yield conn, group_users, group_roles, group_resources - - with db.cursor(conn) as cursor: - cursor.executemany( - ("DELETE FROM group_user_roles_on_resources WHERE " - "group_id=:group_id AND user_id=:user_id AND role_id=:role_id AND " - "resource_id=:resource_id"), - params) diff --git a/tests/unit/auth/fixtures/migration_fixtures.py b/tests/unit/auth/fixtures/migration_fixtures.py deleted file mode 100644 index eb42c2b..0000000 --- a/tests/unit/auth/fixtures/migration_fixtures.py +++ /dev/null @@ -1,51 +0,0 @@ -"""Fixtures and utilities for migration-related tests""" -import pytest -from yoyo.backends import DatabaseBackend -from yoyo import get_backend, read_migrations -from yoyo.migrations import Migration, MigrationList - -from gn3.auth import db -from gn3.migrations import apply_migrations, rollback_migrations - -@pytest.fixture(scope="session") -def auth_testdb_path(fxtr_app_config): # pylint: disable=redefined-outer-name - """Get the test application's auth database file""" - return fxtr_app_config["AUTH_DB"] - -@pytest.fixture(scope="session") -def auth_migrations_dir(fxtr_app_config): # pylint: disable=redefined-outer-name - """Get the test application's auth database file""" - return fxtr_app_config["AUTH_MIGRATIONS"] - -def apply_single_migration(backend: DatabaseBackend, migration: Migration):# pylint: disable=[redefined-outer-name] - """Utility to apply a single migration""" - apply_migrations(backend, MigrationList([migration])) - -def rollback_single_migration(backend: DatabaseBackend, migration: Migration):# pylint: disable=[redefined-outer-name] - """Utility to rollback a single migration""" - rollback_migrations(backend, MigrationList([migration])) - -@pytest.fixture(scope="session") -def backend(auth_testdb_path):# pylint: disable=redefined-outer-name - """Fixture: retrieve yoyo backend for auth database""" - return get_backend(f"sqlite:///{auth_testdb_path}") - -@pytest.fixture(scope="session") -def all_migrations(auth_migrations_dir): # pylint: disable=redefined-outer-name - """Retrieve all the migrations""" - return read_migrations(auth_migrations_dir) - -@pytest.fixture(scope="function") -def conn_after_auth_migrations(backend, auth_testdb_path, all_migrations): # pylint: disable=redefined-outer-name - """Run all migrations and return a connection to the database after""" - apply_migrations(backend, all_migrations) - with db.connection(auth_testdb_path) as conn: - yield conn - - rollback_migrations(backend, all_migrations) - -def migrations_up_to(migration, migrations_dir): - """Run all the migration before `migration`.""" - migrations = read_migrations(migrations_dir) - index = [mig.path for mig in migrations].index(migration) - return MigrationList(migrations[0:index]) diff --git a/tests/unit/auth/fixtures/oauth2_client_fixtures.py b/tests/unit/auth/fixtures/oauth2_client_fixtures.py deleted file mode 100644 index 654d048..0000000 --- a/tests/unit/auth/fixtures/oauth2_client_fixtures.py +++ /dev/null @@ -1,51 +0,0 @@ -"""Fixtures for OAuth2 clients""" -import uuid -import json -import datetime - -import pytest - -from gn3.auth import db -from gn3.auth.authentication.users import hash_password -from gn3.auth.authentication.oauth2.models.oauth2client import OAuth2Client - -@pytest.fixture(autouse=True) -def fxtr_patch_envvars(monkeypatch): - """Fixture: patch environment variable""" - monkeypatch.setenv("AUTHLIB_INSECURE_TRANSPORT", "true") - -@pytest.fixture -def fxtr_oauth2_clients(fxtr_users_with_passwords): - """Fixture: Create the OAuth2 clients for use with tests.""" - conn, users = fxtr_users_with_passwords - now = datetime.datetime.now() - - clients = tuple( - OAuth2Client(str(uuid.uuid4()), f"yabadabadoo_{idx:03}", now, - now + datetime.timedelta(hours = 2), - { - "client_name": f"test_client_{idx:03}", - "scope": ["profile", "group", "role", "resource", "register-client"], - "redirect_uri": "/test_oauth2", - "token_endpoint_auth_method": [ - "client_secret_post", "client_secret_basic"], - "grant_types": ["password", "authorisation_code", "refresh_token"], - "response_type": "token" - }, user) - for idx, user in enumerate(users, start=1)) - - with db.cursor(conn) as cursor: - cursor.executemany( - "INSERT INTO oauth2_clients VALUES (?, ?, ?, ?, ?, ?)", - ((str(client.client_id), hash_password(client.client_secret), - int(client.client_id_issued_at.timestamp()), - int(client.client_secret_expires_at.timestamp()), - json.dumps(client.client_metadata), str(client.user.user_id)) - for client in clients)) - - yield conn, clients - - with db.cursor(conn) as cursor: - cursor.executemany( - "DELETE FROM oauth2_clients WHERE client_id=?", - ((str(client.client_id),) for client in clients)) diff --git a/tests/unit/auth/fixtures/resource_fixtures.py b/tests/unit/auth/fixtures/resource_fixtures.py deleted file mode 100644 index 117b4f4..0000000 --- a/tests/unit/auth/fixtures/resource_fixtures.py +++ /dev/null @@ -1,25 +0,0 @@ -"""Fixtures and utilities for resource-related tests""" -import pytest - -from gn3.auth import db - -from .group_fixtures import TEST_RESOURCES - -@pytest.fixture(scope="function") -def fxtr_resources(fxtr_group):# pylint: disable=[redefined-outer-name] - """fixture: setup test resources in the database""" - conn, _group = fxtr_group - with db.cursor(conn) as cursor: - cursor.executemany( - "INSERT INTO resources VALUES (?,?,?,?,?)", - ((str(res.group.group_id), str(res.resource_id), res.resource_name, - str(res.resource_category.resource_category_id), - 1 if res.public else 0) for res in TEST_RESOURCES)) - - yield (conn, TEST_RESOURCES) - - with db.cursor(conn) as cursor: - cursor.executemany( - "DELETE FROM resources WHERE group_id=? AND resource_id=?", - ((str(res.group.group_id), str(res.resource_id),) - for res in TEST_RESOURCES)) diff --git a/tests/unit/auth/fixtures/role_fixtures.py b/tests/unit/auth/fixtures/role_fixtures.py deleted file mode 100644 index ee86aa2..0000000 --- a/tests/unit/auth/fixtures/role_fixtures.py +++ /dev/null @@ -1,45 +0,0 @@ -"""Fixtures and utilities for role-related tests""" -import uuid - -import pytest - -from gn3.auth import db -from gn3.auth.authorisation.roles import Role -from gn3.auth.authorisation.privileges import Privilege - -RESOURCE_READER_ROLE = Role( - uuid.UUID("c3ca2507-ee24-4835-9b31-8c21e1c072d3"), "resource_reader", True, - (Privilege("group:resource:view-resource", - "view a resource and use it in computations"),)) - -RESOURCE_EDITOR_ROLE = Role( - uuid.UUID("89819f84-6346-488b-8955-86062e9eedb7"), "resource_editor", True, - ( - Privilege("group:resource:view-resource", - "view a resource and use it in computations"), - Privilege("group:resource:edit-resource", "edit/update a resource"))) - -TEST_ROLES = (RESOURCE_READER_ROLE, RESOURCE_EDITOR_ROLE) - -@pytest.fixture(scope="function") -def fxtr_roles(conn_after_auth_migrations): - """Setup some example roles.""" - with db.cursor(conn_after_auth_migrations) as cursor: - cursor.executemany( - ("INSERT INTO roles VALUES (?, ?, ?)"), - ((str(role.role_id), role.role_name, 1) for role in TEST_ROLES)) - cursor.executemany( - ("INSERT INTO role_privileges VALUES (?, ?)"), - ((str(role.role_id), str(privilege.privilege_id)) - for role in TEST_ROLES for privilege in role.privileges)) - - yield conn_after_auth_migrations, TEST_ROLES - - with db.cursor(conn_after_auth_migrations) as cursor: - cursor.executemany( - ("DELETE FROM role_privileges WHERE role_id=? AND privilege_id=?"), - ((str(role.role_id), str(privilege.privilege_id)) - for role in TEST_ROLES for privilege in role.privileges)) - cursor.executemany( - ("DELETE FROM roles WHERE role_id=?"), - ((str(role.role_id),) for role in TEST_ROLES)) diff --git a/tests/unit/auth/fixtures/user_fixtures.py b/tests/unit/auth/fixtures/user_fixtures.py deleted file mode 100644 index d248f54..0000000 --- a/tests/unit/auth/fixtures/user_fixtures.py +++ /dev/null @@ -1,66 +0,0 @@ -"""Fixtures and utilities for user-related tests""" -import uuid - -import pytest - -from gn3.auth import db -from gn3.auth.authentication.users import User, hash_password - -TEST_USERS = ( - User(uuid.UUID("ecb52977-3004-469e-9428-2a1856725c7f"), "group@lead.er", - "Group Leader"), - User(uuid.UUID("21351b66-8aad-475b-84ac-53ce528451e3"), - "group@mem.ber01", "Group Member 01"), - User(uuid.UUID("ae9c6245-0966-41a5-9a5e-20885a96bea7"), - "group@mem.ber02", "Group Member 02"), - User(uuid.UUID("9a0c7ce5-2f40-4e78-979e-bf3527a59579"), - "unaff@iliated.user", "Unaffiliated User")) - -@pytest.fixture(scope="function") -def fxtr_users(conn_after_auth_migrations):# pylint: disable=[redefined-outer-name] - """Fixture: setup test users.""" - query = "INSERT INTO users(user_id, email, name) VALUES (?, ?, ?)" - query_user_roles = "INSERT INTO user_roles(user_id, role_id) VALUES (?, ?)" - test_user_roles = ( - ("ecb52977-3004-469e-9428-2a1856725c7f", - "a0e67630-d502-4b9f-b23f-6805d0f30e30"), - ("ecb52977-3004-469e-9428-2a1856725c7f", - "ade7e6b0-ba9c-4b51-87d0-2af7fe39a347")) - with db.cursor(conn_after_auth_migrations) as cursor: - cursor.executemany(query, ( - (str(user.user_id), user.email, user.name) for user in TEST_USERS)) - cursor.executemany(query_user_roles, test_user_roles) - - yield (conn_after_auth_migrations, TEST_USERS) - - with db.cursor(conn_after_auth_migrations) as cursor: - cursor.executemany( - "DELETE FROM user_roles WHERE user_id=?", - (("ecb52977-3004-469e-9428-2a1856725c7f",),)) - cursor.executemany( - "DELETE FROM users WHERE user_id=?", - (("ecb52977-3004-469e-9428-2a1856725c7f",), - ("21351b66-8aad-475b-84ac-53ce528451e3",), - ("ae9c6245-0966-41a5-9a5e-20885a96bea7",), - ("9a0c7ce5-2f40-4e78-979e-bf3527a59579",))) - -@pytest.fixture(scope="function") -def fxtr_users_with_passwords(fxtr_users): # pylint: disable=[redefined-outer-name] - """Fixture: add passwords to the users""" - conn, users = fxtr_users - user_passwords_params = tuple( - (str(user.user_id), hash_password( - f"password_for_user_{idx:03}".encode("utf8"))) - for idx, user in enumerate(users, start=1)) - - with db.cursor(conn) as cursor: - cursor.executemany( - "INSERT INTO user_credentials VALUES (?, ?)", - user_passwords_params) - - yield conn, users - - with db.cursor(conn) as cursor: - cursor.executemany( - "DELETE FROM user_credentials WHERE user_id=?", - ((item[0],) for item in user_passwords_params)) diff --git a/tests/unit/auth/test_credentials.py b/tests/unit/auth/test_credentials.py deleted file mode 100644 index f2a3d25..0000000 --- a/tests/unit/auth/test_credentials.py +++ /dev/null @@ -1,100 +0,0 @@ -"""Test the credentials checks""" -import pytest -from yoyo.migrations import MigrationList -from hypothesis import given, settings, strategies, HealthCheck - -from gn3.auth import db -from gn3.auth.authentication import credentials_in_database -from gn3.migrations import get_migration, apply_migrations, rollback_migrations - -from tests.unit.auth.conftest import migrations_up_to - -@pytest.fixture -def with_credentials_table(backend, auth_testdb_path): - """ - Fixture: Yield a connection object with the 'user_credentials' table - created. - """ - migrations_dir = "migrations/auth" - migration = f"{migrations_dir}/20221103_02_sGrIs-create-user-credentials-table.py" - migrations = (migrations_up_to(migration, migrations_dir) + - MigrationList([get_migration(migration)])) - apply_migrations(backend, migrations) - with db.connection(auth_testdb_path) as conn: - yield conn - - rollback_migrations(backend, migrations) - -@pytest.fixture -def with_credentials(with_credentials_table):# pylint: disable=redefined-outer-name - """ - Fixture: Initialise the database with some user credentials. - """ - with db.cursor(with_credentials_table) as cursor: - cursor.executemany( - "INSERT INTO users VALUES (:user_id, :email, :name)", - ({"user_id": "82552014-21ee-4321-b96a-b8788b97b862", - "email": "first@test.user", - "name": "First Test User" - }, - {"user_id": "bdd5cb7a-072d-4c2b-9872-d0cecb718523", - "email": "second@test.user", - "name": "Second Test User" - })) - cursor.executemany( - "INSERT INTO user_credentials VALUES (:user_id, :password)", - ({"user_id": "82552014-21ee-4321-b96a-b8788b97b862", - "password": b'$2b$12$LAh1PYtUgAFK7d5fA0EfL.4AdTZuYEAfzwO.p.jXVboxcP8bXNj7a' - }, - {"user_id": "bdd5cb7a-072d-4c2b-9872-d0cecb718523", - "password": b'$2b$12$zX77QCFSJuwIjAZGc0Jq5.rCWMHEMKD9Zf3Ay4C0AzwsiZ7SSPdKO' - })) - - yield with_credentials_table - - cursor.executemany("DELETE FROM user_credentials WHERE user_id=?", - (("82552014-21ee-4321-b96a-b8788b97b862",), - ("bdd5cb7a-072d-4c2b-9872-d0cecb718523",))) - cursor.executemany("DELETE FROM users WHERE user_id=?", - (("82552014-21ee-4321-b96a-b8788b97b862",), - ("bdd5cb7a-072d-4c2b-9872-d0cecb718523",))) - -@pytest.mark.unit_test -@given(strategies.emails(), strategies.text()) -@settings(suppress_health_check=[HealthCheck.function_scoped_fixture]) -def test_credentials_not_in_database(with_credentials, email, password):# pylint: disable=redefined-outer-name - """ - GIVEN: credentials that do not exist in the database - WHEN: the `credentials_in_database` function is run against the credentials - THEN: check that the function returns false in all cases. - """ - with db.cursor(with_credentials) as cursor: - assert credentials_in_database(cursor, email, password) is False - -@pytest.mark.unit_test -@pytest.mark.parametrize( - "email,password", - (("first@test.user", "wrongpassword"), - ("first@tes.user", "testuser01"))) -def test_partially_wrong_credentials(with_credentials, email, password):# pylint: disable=redefined-outer-name - """ - GIVEN: credentials that exist in the database - WHEN: the credentials are checked with partially wrong values - THEN: the check fails since the credentials are not correct - """ - with db.cursor(with_credentials) as cursor: - assert credentials_in_database(cursor, email, password) is False - -@pytest.mark.unit_test -@pytest.mark.parametrize( - "email,password", - (("first@test.user", "testuser01"), - ("second@test.user", "testuser02"))) -def test_partially_correct_credentials(with_credentials, email, password):# pylint: disable=redefined-outer-name - """ - GIVEN: credentials that exist in the database - WHEN: the credentials are checked with correct values - THEN: the check passes - """ - with db.cursor(with_credentials) as cursor: - assert credentials_in_database(cursor, email, password) is True diff --git a/tests/unit/auth/test_groups.py b/tests/unit/auth/test_groups.py deleted file mode 100644 index d3b8fd4..0000000 --- a/tests/unit/auth/test_groups.py +++ /dev/null @@ -1,171 +0,0 @@ -"""Test functions dealing with group management.""" -from uuid import UUID - -import pytest -from pymonad.maybe import Nothing - -from gn3.auth import db -from gn3.auth.authentication.users import User -from gn3.auth.authorisation.roles import Role -from gn3.auth.authorisation.privileges import Privilege -from gn3.auth.authorisation.errors import AuthorisationError -from gn3.auth.authorisation.groups.models import ( - Group, GroupRole, user_group, create_group, create_group_role) - -from tests.unit.auth import conftest - -create_group_failure = { - "status": "error", - "message": "Unauthorised: Failed to create group." -} - -def uuid_fn(): - """Mock function for uuid""" - return UUID("d32611e3-07fc-4564-b56c-786c6db6de2b") - - -GROUP = Group(UUID("9988c21d-f02f-4d45-8966-22c968ac2fbf"), "TheTestGroup", - {"group_description": "The test group"}) -PRIVILEGES = ( - Privilege( - "group:resource:view-resource", - "view a resource and use it in computations"), - Privilege("group:resource:edit-resource", "edit/update a resource")) - -@pytest.mark.unit_test -@pytest.mark.parametrize( - "user,expected", tuple(zip(conftest.TEST_USERS[0:1], ( - Group( - UUID("d32611e3-07fc-4564-b56c-786c6db6de2b"), "a_test_group", - {"group_description": "A test group"}), - create_group_failure, create_group_failure, create_group_failure, - create_group_failure)))) -def test_create_group(# pylint: disable=[too-many-arguments] - fxtr_app, auth_testdb_path, mocker, fxtr_users, user, expected):# pylint: disable=[unused-argument] - """ - GIVEN: an authenticated user - WHEN: the user attempts to create a group - THEN: verify they are only able to create the group if they have the - appropriate privileges - """ - mocker.patch("gn3.auth.authorisation.groups.models.uuid4", uuid_fn) - mocker.patch("gn3.auth.authorisation.checks.require_oauth.acquire", - conftest.get_tokeniser(user)) - with db.connection(auth_testdb_path) as conn: - assert create_group( - conn, "a_test_group", user, "A test group") == expected - -@pytest.mark.unit_test -@pytest.mark.parametrize("user", conftest.TEST_USERS[1:]) -def test_create_group_raises_exception_with_non_privileged_user(# pylint: disable=[too-many-arguments] - fxtr_app, auth_testdb_path, mocker, fxtr_users, user):# pylint: disable=[unused-argument] - """ - GIVEN: an authenticated user, without appropriate privileges - WHEN: the user attempts to create a group - THEN: verify the system raises an exception - """ - mocker.patch("gn3.auth.authorisation.groups.models.uuid4", uuid_fn) - mocker.patch("gn3.auth.authorisation.checks.require_oauth.acquire", - conftest.get_tokeniser(user)) - with db.connection(auth_testdb_path) as conn: - with pytest.raises(AuthorisationError): - assert create_group(conn, "a_test_group", user, "A test group") - -create_role_failure = { - "status": "error", - "message": "Unauthorised: Could not create the group role" -} - -@pytest.mark.unit_test -@pytest.mark.parametrize( - "user,expected", tuple(zip(conftest.TEST_USERS[0:1], ( - GroupRole( - UUID("d32611e3-07fc-4564-b56c-786c6db6de2b"), - GROUP, - Role(UUID("d32611e3-07fc-4564-b56c-786c6db6de2b"), - "ResourceEditor", True, PRIVILEGES)),)))) -def test_create_group_role(mocker, fxtr_users_in_group, user, expected): - """ - GIVEN: an authenticated user - WHEN: the user attempts to create a role, attached to a group - THEN: verify they are only able to create the role if they have the - appropriate privileges and that the role is attached to the given group - """ - mocker.patch("gn3.auth.authorisation.groups.models.uuid4", uuid_fn) - mocker.patch("gn3.auth.authorisation.roles.models.uuid4", uuid_fn) - mocker.patch("gn3.auth.authorisation.checks.require_oauth.acquire", - conftest.get_tokeniser(user)) - conn, _group, _users = fxtr_users_in_group - with db.cursor(conn) as cursor: - assert create_group_role( - conn, GROUP, "ResourceEditor", PRIVILEGES) == expected - # cleanup - cursor.execute( - ("DELETE FROM group_roles " - "WHERE group_role_id=? AND group_id=? AND role_id=?"), - (str(uuid_fn()), str(GROUP.group_id), str(uuid_fn()))) - -@pytest.mark.unit_test -@pytest.mark.parametrize( - "user,expected", tuple(zip(conftest.TEST_USERS[1:], ( - create_role_failure, create_role_failure, create_role_failure)))) -def test_create_group_role_raises_exception_with_unauthorised_users( - mocker, fxtr_users_in_group, user, expected): - """ - GIVEN: an authenticated user - WHEN: the user attempts to create a role, attached to a group - THEN: verify they are only able to create the role if they have the - appropriate privileges and that the role is attached to the given group - """ - mocker.patch("gn3.auth.authorisation.groups.models.uuid4", uuid_fn) - mocker.patch("gn3.auth.authorisation.roles.models.uuid4", uuid_fn) - mocker.patch("gn3.auth.authorisation.checks.require_oauth.acquire", - conftest.get_tokeniser(user)) - conn, _group, _users = fxtr_users_in_group - with pytest.raises(AuthorisationError): - assert create_group_role( - conn, GROUP, "ResourceEditor", PRIVILEGES) == expected - -@pytest.mark.unit_test -def test_create_multiple_groups(mocker, fxtr_users): - """ - GIVEN: An authenticated user with appropriate authorisation - WHEN: The user attempts to create a new group, while being a member of an - existing group - THEN: The system should prevent that, and respond with an appropriate error - message - """ - mocker.patch("gn3.auth.authorisation.groups.models.uuid4", uuid_fn) - user = User( - UUID("ecb52977-3004-469e-9428-2a1856725c7f"), "group@lead.er", - "Group Leader") - mocker.patch("gn3.auth.authorisation.checks.require_oauth.acquire", - conftest.get_tokeniser(user)) - conn, _test_users = fxtr_users - # First time, successfully creates the group - assert create_group(conn, "a_test_group", user) == Group( - UUID("d32611e3-07fc-4564-b56c-786c6db6de2b"), "a_test_group", - {}) - # subsequent attempts should fail - with pytest.raises(AuthorisationError): - create_group(conn, "another_test_group", user) - -@pytest.mark.unit_test -@pytest.mark.parametrize( - "user,expected", - tuple(zip( - conftest.TEST_USERS, - (([Group(UUID("9988c21d-f02f-4d45-8966-22c968ac2fbf"), "TheTestGroup", {})] * 3) - + [Nothing])))) -def test_user_group(fxtr_users_in_group, user, expected): - """ - GIVEN: A bunch of registered users, some of whom are members of a group, and - others are not - WHEN: a particular user's group is requested, - THEN: return a Maybe containing the group that the user belongs to, or - Nothing - """ - conn, _group, _users = fxtr_users_in_group - assert ( - user_group(conn, user).maybe(Nothing, lambda val: val) - == expected) diff --git a/tests/unit/auth/test_migrations_add_data_to_table.py b/tests/unit/auth/test_migrations_add_data_to_table.py deleted file mode 100644 index 9cb5d0c..0000000 --- a/tests/unit/auth/test_migrations_add_data_to_table.py +++ /dev/null @@ -1,79 +0,0 @@ -"""Test data insertion when migrations are run.""" -import pytest - -from gn3.auth import db -from gn3.migrations import get_migration, apply_migrations, rollback_migrations -from tests.unit.auth.conftest import ( - apply_single_migration, rollback_single_migration, migrations_up_to) - -test_params = ( - ("20221116_01_nKUmX-add-privileges-to-group-leader-role.py", - ("SELECT role_id, privilege_id FROM role_privileges " - "WHERE role_id=? AND privilege_id IN (?, ?, ?, ?)"), - ("a0e67630-d502-4b9f-b23f-6805d0f30e30", - "221660b1-df05-4be1-b639-f010269dbda9", - "7bcca363-cba9-4169-9e31-26bdc6179b28", - "5103cc68-96f8-4ebb-83a4-a31692402c9b", - "1c59eff5-9336-4ed2-a166-8f70d4cb012e"), - (("a0e67630-d502-4b9f-b23f-6805d0f30e30", - "221660b1-df05-4be1-b639-f010269dbda9"), - ("a0e67630-d502-4b9f-b23f-6805d0f30e30", - "7bcca363-cba9-4169-9e31-26bdc6179b28"), - ("a0e67630-d502-4b9f-b23f-6805d0f30e30", - "5103cc68-96f8-4ebb-83a4-a31692402c9b"), - ("a0e67630-d502-4b9f-b23f-6805d0f30e30", - "1c59eff5-9336-4ed2-a166-8f70d4cb012e"))),) - -@pytest.mark.unit_test -@pytest.mark.parametrize("migration_file,query,query_params,data", test_params) -def test_apply_insert(# pylint: disable=[too-many-arguments] - auth_migrations_dir, backend, auth_testdb_path, migration_file, query, - query_params, data): - """ - GIVEN: a database migration script - WHEN: the script is applied - THEN: ensure the given data exists in the table - """ - migration_path=f"{auth_migrations_dir}/{migration_file}" - older_migrations = migrations_up_to(migration_path, auth_migrations_dir) - the_migration = get_migration(migration_path) - apply_migrations(backend, older_migrations) - with db.connection(auth_testdb_path, None) as conn, db.cursor(conn) as cursor: - cursor.execute(query, query_params) - result_before_migration = cursor.fetchall() - apply_single_migration(backend, the_migration) - cursor.execute(query, query_params) - result_after_migration = cursor.fetchall() - - rollback_migrations(backend, older_migrations + [the_migration]) - assert len(result_before_migration) == 0, "Expected no results before migration" - assert sorted(result_after_migration) == sorted(data) - -@pytest.mark.unit_test -@pytest.mark.parametrize("migration_file,query,query_params,data", test_params) -def test_rollback_insert(# pylint: disable=[too-many-arguments] - auth_migrations_dir, backend, auth_testdb_path, migration_file, query, - query_params, data): - """ - GIVEN: a database migration script - WHEN: the script is rolled back - THEN: ensure the given data no longer exists in the database - """ - migration_path=f"{auth_migrations_dir}/{migration_file}" - older_migrations = migrations_up_to(migration_path, auth_migrations_dir) - the_migration = get_migration(migration_path) - apply_migrations(backend, older_migrations) - with db.connection(auth_testdb_path, None) as conn, db.cursor(conn) as cursor: - cursor.execute(query, query_params) - result_before_migration = cursor.fetchall() - apply_single_migration(backend, the_migration) - cursor.execute(query, query_params) - result_after_migration = cursor.fetchall() - rollback_single_migration(backend, the_migration) - cursor.execute(query, query_params) - result_after_rollback = cursor.fetchall() - - rollback_migrations(backend, older_migrations) - assert len(result_before_migration) == 0, "Expected no results before migration" - assert sorted(result_after_migration) == sorted(data) - assert len(result_after_rollback) == 0, "Expected no results after rollback" diff --git a/tests/unit/auth/test_migrations_add_remove_columns.py b/tests/unit/auth/test_migrations_add_remove_columns.py deleted file mode 100644 index ea9bf7b..0000000 --- a/tests/unit/auth/test_migrations_add_remove_columns.py +++ /dev/null @@ -1,116 +0,0 @@ -"""Test migrations that alter tables adding/removing columns.""" -import pytest - -from gn3.auth import db -from gn3.migrations import get_migration, apply_migrations, rollback_migrations -from tests.unit.auth.conftest import ( - apply_single_migration, rollback_single_migration, migrations_up_to) - -QUERY = "SELECT sql FROM sqlite_schema WHERE name=?" - -TEST_PARAMS = ( - ("20221109_01_HbD5F-add-resource-meta-field-to-resource-categories-field.py", - "resource_categories", "resource_meta TEXT", True), - (("20221110_08_23psB-add-privilege-category-and-privilege-description-" - "columns-to-privileges-table.py"), - "privileges", "privilege_category TEXT", True), - (("20221110_08_23psB-add-privilege-category-and-privilege-description-" - "columns-to-privileges-table.py"), - "privileges", "privilege_description TEXT", True), - ("20221117_01_RDlfx-modify-group-roles-add-group-role-id.py", "group_roles", - "group_role_id", True), - ("20221208_01_sSdHz-add-public-column-to-resources-table.py", "resources", - "public", True)) - -def found(haystack: str, needle: str) -> bool: - """Check whether `needle` is found in `haystack`""" - return any( - (line.strip().find(needle) >= 0) for line in haystack.split("\n")) - -def pristine_before_migration(adding: bool, result_str: str, column: str) -> bool: - """Check that database is pristine before running the migration""" - col_was_found = found(result_str, column) - if adding: - return not col_was_found - return col_was_found - -def applied_successfully(adding: bool, result_str: str, column: str) -> bool: - """Check that the migration ran successfully""" - col_was_found = found(result_str, column) - if adding: - return col_was_found - return not col_was_found - -def rolled_back_successfully(adding: bool, result_str: str, column: str) -> bool: - """Check that the migration ran successfully""" - col_was_found = found(result_str, column) - if adding: - return not col_was_found - return col_was_found - -@pytest.mark.unit_test -@pytest.mark.parametrize( - "migration_file,the_table,the_column,adding", TEST_PARAMS) -def test_apply_add_remove_column(# pylint: disable=[too-many-arguments] - auth_migrations_dir, auth_testdb_path, backend, migration_file, - the_table, the_column, adding): - """ - GIVEN: A migration that alters a table, adding or removing a column - WHEN: The migration is applied - THEN: Ensure the column exists if `adding` is True, otherwise, ensure the - column has been dropped - """ - migration_path = f"{auth_migrations_dir}/{migration_file}" - older_migrations = migrations_up_to(migration_path, auth_migrations_dir) - the_migration = get_migration(migration_path) - apply_migrations(backend, older_migrations) - with db.connection(auth_testdb_path) as conn, db.cursor(conn) as cursor: - cursor.execute(QUERY, (the_table,)) - results_before_migration = cursor.fetchone() - apply_single_migration(backend, the_migration) - cursor.execute(QUERY, (the_table,)) - results_after_migration = cursor.fetchone() - - rollback_migrations(backend, older_migrations + [the_migration]) - - assert pristine_before_migration( - adding, results_before_migration[0], the_column), ( - f"Column `{the_column}` exists before migration and should not" - if adding else - f"Column `{the_column}` doesn't exist before migration and it should") - assert applied_successfully( - adding, results_after_migration[0], the_column), "Migration failed" - -@pytest.mark.unit_test -@pytest.mark.parametrize( - "migration_file,the_table,the_column,adding", TEST_PARAMS) -def test_rollback_add_remove_column(# pylint: disable=[too-many-arguments] - auth_migrations_dir, auth_testdb_path, backend, migration_file, - the_table, the_column, adding): - """ - GIVEN: A migration that alters a table, adding or removing a column - WHEN: The migration is applied - THEN: Ensure the column is dropped if `adding` is True, otherwise, ensure - the column has been restored - """ - migration_path = f"{auth_migrations_dir}/{migration_file}" - older_migrations = migrations_up_to(migration_path, auth_migrations_dir) - the_migration = get_migration(migration_path) - apply_migrations(backend, older_migrations) - apply_single_migration(backend, the_migration) - with db.connection(auth_testdb_path) as conn, db.cursor(conn) as cursor: - cursor.execute(QUERY, (the_table,)) - results_before_rollback = cursor.fetchone() - rollback_single_migration(backend, the_migration) - cursor.execute(QUERY, (the_table,)) - results_after_rollback = cursor.fetchone() - - rollback_migrations(backend, older_migrations + [the_migration]) - - assert pristine_before_migration( - not adding, results_before_rollback[0], the_column), ( - f"Column `{the_column}` doesn't exist before rollback and it should" - if adding else - f"Column `{the_column}` exists before rollback and should not") - assert rolled_back_successfully( - adding, results_after_rollback[0], the_column), "Rollback failed" diff --git a/tests/unit/auth/test_migrations_create_tables.py b/tests/unit/auth/test_migrations_create_tables.py deleted file mode 100644 index 2b8140b..0000000 --- a/tests/unit/auth/test_migrations_create_tables.py +++ /dev/null @@ -1,91 +0,0 @@ -"""Test migrations that create tables""" -import pytest - -from gn3.auth import db -from gn3.migrations import get_migration, apply_migrations, rollback_migrations -from tests.unit.auth.conftest import ( - apply_single_migration, rollback_single_migration, migrations_up_to) - -migrations_and_tables = ( - ("20221103_01_js9ub-initialise-the-auth-entic-oris-ation-database.py", - "users"), - ("20221103_02_sGrIs-create-user-credentials-table.py", "user_credentials"), - ("20221108_01_CoxYh-create-the-groups-table.py", "groups"), - ("20221108_02_wxTr9-create-privileges-table.py", "privileges"), - ("20221108_03_Pbhb1-create-resource-categories-table.py", "resource_categories"), - ("20221110_01_WtZ1I-create-resources-table.py", "resources"), - ("20221110_05_BaNtL-create-roles-table.py", "roles"), - ("20221110_06_Pq2kT-create-generic-roles-table.py", "generic_roles"), - ("20221110_07_7WGa1-create-role-privileges-table.py", "role_privileges"), - ("20221114_01_n8gsF-create-generic-role-privileges-table.py", - "generic_role_privileges"), - ("20221114_03_PtWjc-create-group-roles-table.py", "group_roles"), - ("20221114_05_hQun6-create-user-roles-table.py", "user_roles"), - ("20221117_02_fmuZh-create-group-users-table.py", "group_users"), - ("20221206_01_BbeF9-create-group-user-roles-on-resources-table.py", - "group_user_roles_on_resources"), - ("20221219_01_CI3tN-create-oauth2-clients-table.py", "oauth2_clients"), - ("20221219_02_buSEU-create-oauth2-tokens-table.py", "oauth2_tokens"), - ("20221219_03_PcTrb-create-authorisation-code-table.py", - "authorisation_code"), - ("20230207_01_r0bkZ-create-group-join-requests-table.py", - "group_join_requests"), - ("20230322_01_0dDZR-create-linked-phenotype-data-table.py", - "linked_phenotype_data"), - ("20230322_02_Ll854-create-phenotype-resources-table.py", - "phenotype_resources"), - ("20230404_01_VKxXg-create-linked-genotype-data-table.py", - "linked_genotype_data"), - ("20230404_02_la33P-create-genotype-resources-table.py", - "genotype_resources"), - ("20230410_01_8mwaf-create-linked-mrna-data-table.py", "linked_mrna_data"), - ("20230410_02_WZqSf-create-mrna-resources-table.py", "mrna_resources")) - -@pytest.mark.unit_test -@pytest.mark.parametrize("migration_file,the_table", migrations_and_tables) -def test_create_table( - auth_testdb_path, auth_migrations_dir, backend, migration_file, - the_table): - """ - GIVEN: A database migration script to create table, `the_table` - WHEN: The migration is applied - THEN: Ensure that the table `the_table` is created - """ - migration_path=f"{auth_migrations_dir}/{migration_file}" - older_migrations = migrations_up_to(migration_path, auth_migrations_dir) - apply_migrations(backend, older_migrations) - with db.connection(auth_testdb_path) as conn, db.cursor(conn) as cursor: - cursor.execute("SELECT name FROM sqlite_schema WHERE type='table'") - result_before_migration = cursor.fetchall() - apply_single_migration(backend, get_migration(migration_path)) - cursor.execute("SELECT name FROM sqlite_schema WHERE type='table'") - result_after_migration = cursor.fetchall() - - rollback_migrations(backend, older_migrations) - assert the_table not in [row[0] for row in result_before_migration] - assert the_table in [row[0] for row in result_after_migration] - -@pytest.mark.unit_test -@pytest.mark.parametrize("migration_file,the_table", migrations_and_tables) -def test_rollback_create_table( - auth_testdb_path, auth_migrations_dir, backend, migration_file, - the_table): - """ - GIVEN: A database migration script to create the table `the_table` - WHEN: The migration is rolled back - THEN: Ensure that the table `the_table` no longer exists - """ - migration_path=f"{auth_migrations_dir}/{migration_file}" - older_migrations = migrations_up_to(migration_path, auth_migrations_dir) - apply_migrations(backend, older_migrations) - with db.connection(auth_testdb_path) as conn, db.cursor(conn) as cursor: - apply_single_migration(backend, get_migration(migration_path)) - cursor.execute("SELECT name FROM sqlite_schema WHERE type='table'") - result_after_migration = cursor.fetchall() - rollback_single_migration(backend, get_migration(migration_path)) - cursor.execute("SELECT name FROM sqlite_schema WHERE type='table'") - result_after_rollback = cursor.fetchall() - - rollback_migrations(backend, older_migrations) - assert the_table in [row[0] for row in result_after_migration] - assert the_table not in [row[0] for row in result_after_rollback] diff --git a/tests/unit/auth/test_migrations_drop_tables.py b/tests/unit/auth/test_migrations_drop_tables.py deleted file mode 100644 index 2362c77..0000000 --- a/tests/unit/auth/test_migrations_drop_tables.py +++ /dev/null @@ -1,63 +0,0 @@ -"""Test migrations that create tables""" - -import pytest - -from gn3.auth import db -from gn3.migrations import get_migration, apply_migrations, rollback_migrations -from tests.unit.auth.conftest import ( - apply_single_migration, rollback_single_migration, migrations_up_to) - -test_params = ( - ("20221114_02_DKKjn-drop-generic-role-tables.py", "generic_roles"), - ("20221114_02_DKKjn-drop-generic-role-tables.py", "generic_role_privileges")) - -@pytest.mark.unit_test -@pytest.mark.parametrize("migration_file,the_table", test_params) -def test_drop_table( - auth_testdb_path, auth_migrations_dir, backend, - migration_file, the_table): - """ - GIVEN: A database migration script to create table, `the_table` - WHEN: The migration is applied - THEN: Ensure that the table `the_table` is created - """ - migration_path=f"{auth_migrations_dir}/{migration_file}" - older_migrations = migrations_up_to(migration_path, auth_migrations_dir) - the_migration = get_migration(migration_path) - apply_migrations(backend, older_migrations) - with db.connection(auth_testdb_path) as conn, db.cursor(conn) as cursor: - cursor.execute("SELECT name FROM sqlite_schema WHERE type='table'") - result_before_migration = cursor.fetchall() - apply_single_migration(backend, the_migration) - cursor.execute("SELECT name FROM sqlite_schema WHERE type='table'") - result_after_migration = cursor.fetchall() - - rollback_migrations(backend, older_migrations + [the_migration]) - assert the_table in [row[0] for row in result_before_migration] - assert the_table not in [row[0] for row in result_after_migration] - -@pytest.mark.unit_test -@pytest.mark.parametrize("migration_file,the_table", test_params) -def test_rollback_drop_table( - auth_testdb_path, auth_migrations_dir, backend, migration_file, - the_table): - """ - GIVEN: A database migration script to create the table `the_table` - WHEN: The migration is rolled back - THEN: Ensure that the table `the_table` no longer exists - """ - migration_path=f"{auth_migrations_dir}/{migration_file}" - older_migrations = migrations_up_to(migration_path, auth_migrations_dir) - the_migration = get_migration(migration_path) - apply_migrations(backend, older_migrations) - with db.connection(auth_testdb_path) as conn, db.cursor(conn) as cursor: - apply_single_migration(backend, the_migration) - cursor.execute("SELECT name FROM sqlite_schema WHERE type='table'") - result_after_migration = cursor.fetchall() - rollback_single_migration(backend, the_migration) - cursor.execute("SELECT name FROM sqlite_schema WHERE type='table'") - result_after_rollback = cursor.fetchall() - - rollback_migrations(backend, older_migrations) - assert the_table not in [row[0] for row in result_after_migration] - assert the_table in [row[0] for row in result_after_rollback] diff --git a/tests/unit/auth/test_migrations_indexes.py b/tests/unit/auth/test_migrations_indexes.py deleted file mode 100644 index b1f06d9..0000000 --- a/tests/unit/auth/test_migrations_indexes.py +++ /dev/null @@ -1,97 +0,0 @@ -"""Test that indexes are created and removed.""" -import pytest - -from gn3.auth import db -from gn3.migrations import get_migration, apply_migrations, rollback_migrations -from tests.unit.auth.conftest import ( - apply_single_migration, rollback_single_migration, migrations_up_to) - -QUERY = """ -SELECT name FROM sqlite_master WHERE type='index' AND tbl_name = ? -AND name= ? -""" - -migrations_tables_and_indexes = ( - ("20221110_07_7WGa1-create-role-privileges-table.py", "role_privileges", - "idx_tbl_role_privileges_cols_role_id"), - ("20221114_01_n8gsF-create-generic-role-privileges-table.py", - "generic_role_privileges", - "idx_tbl_generic_role_privileges_cols_generic_role_id"), - ("20221114_03_PtWjc-create-group-roles-table.py", "group_roles", - "idx_tbl_group_roles_cols_group_id"), - ("20221114_05_hQun6-create-user-roles-table.py", "user_roles", - "idx_tbl_user_roles_cols_user_id"), - ("20221117_02_fmuZh-create-group-users-table.py", "group_users", - "tbl_group_users_cols_group_id"), - ("20221206_01_BbeF9-create-group-user-roles-on-resources-table.py", - "group_user_roles_on_resources", - "idx_tbl_group_user_roles_on_resources_group_user_resource")) - -@pytest.mark.unit_test -@pytest.mark.parametrize( - "migration_file,the_table,the_index", migrations_tables_and_indexes) -def test_index_created(# pylint: disable=[too-many-arguments] - auth_testdb_path, auth_migrations_dir, backend, migration_file, - the_table, the_index): - """ - GIVEN: A database migration - WHEN: The migration is applied - THEN: Ensure the given index is created for the provided table - """ - migration_path=f"{auth_migrations_dir}/{migration_file}" - older_migrations = migrations_up_to(migration_path, auth_migrations_dir) - the_migration = get_migration(migration_path) - query_params = (the_table, the_index) - apply_migrations(backend, older_migrations) - with db.connection(auth_testdb_path) as conn, db.cursor(conn) as cursor: - cursor.execute(QUERY, query_params) - result_before_migration = cursor.fetchall() - apply_single_migration(backend, the_migration) - cursor.execute(QUERY, query_params) - result_after_migration = cursor.fetchall() - - rollback_migrations(backend, older_migrations + [the_migration]) - assert the_index not in [row[0] for row in result_before_migration], ( - f"Index '{the_index}' was found for table '{the_table}' before migration.") - assert ( - len(result_after_migration) == 1 - and result_after_migration[0][0] == the_index), ( - f"Index '{the_index}' was not found for table '{the_table}' after migration.") - -@pytest.mark.unit_test -@pytest.mark.parametrize( - "migration_file,the_table,the_index", migrations_tables_and_indexes) -def test_index_dropped(# pylint: disable=[too-many-arguments] - auth_testdb_path, auth_migrations_dir, backend, migration_file, - the_table, the_index): - """ - GIVEN: A database migration - WHEN: The migration is rolled-back - THEN: Ensure the given index no longer exists for the given table - """ - migration_path=f"{auth_migrations_dir}/{migration_file}" - older_migrations = migrations_up_to(migration_path, auth_migrations_dir) - the_migration = get_migration(migration_path) - query_params = (the_table, the_index) - apply_migrations(backend, older_migrations) - with db.connection(auth_testdb_path) as conn, db.cursor(conn) as cursor: - cursor.execute(QUERY, query_params) - result_before_migration = cursor.fetchall() - apply_single_migration(backend, the_migration) - cursor.execute(QUERY, query_params) - result_after_migration = cursor.fetchall() - rollback_single_migration(backend, the_migration) - cursor.execute(QUERY, query_params) - result_after_rollback = cursor.fetchall() - - rollback_migrations(backend, older_migrations) - assert the_index not in [row[0] for row in result_before_migration], ( - f"Index '{the_index}' was found for table '{the_table}' before " - "migration") - assert ( - len(result_after_migration) == 1 - and result_after_migration[0][0] == the_index), ( - f"Index '{the_index}' was not found for table '{the_table}' after migration.") - assert the_index not in [row[0] for row in result_after_rollback], ( - f"Index '{the_index}' was found for table '{the_table}' after " - "rollback") diff --git a/tests/unit/auth/test_migrations_init_data_in_resource_categories_table.py b/tests/unit/auth/test_migrations_init_data_in_resource_categories_table.py deleted file mode 100644 index dd3d4c6..0000000 --- a/tests/unit/auth/test_migrations_init_data_in_resource_categories_table.py +++ /dev/null @@ -1,60 +0,0 @@ -""" -Test that the `resource_categories` table is initialised with the startup data. -""" -import pytest - -from gn3.auth import db -from gn3.migrations import get_migration, apply_migrations, rollback_migrations -from tests.unit.auth.conftest import ( - apply_single_migration, rollback_single_migration, migrations_up_to) - -MIGRATION_PATH = "migrations/auth/20221108_04_CKcSL-init-data-in-resource-categories-table.py" - -@pytest.mark.unit_test -def test_apply_init_data(auth_testdb_path, auth_migrations_dir, backend): - """ - GIVEN: A migration script - WHEN: The migration is applied - THEN: Verify that the expected data exists in the table - """ - older_migrations = migrations_up_to(MIGRATION_PATH, auth_migrations_dir) - the_migration = get_migration(MIGRATION_PATH) - apply_migrations(backend, older_migrations) - with db.connection(auth_testdb_path, None) as conn, db.cursor(conn) as cursor: - cursor.execute("SELECT * FROM resource_categories") - assert len(cursor.fetchall()) == 0, "Expected empty table." - apply_single_migration(backend, the_migration) - cursor.execute("SELECT * FROM resource_categories") - results = cursor.fetchall() - assert len(results) == 3, "Expected 3 rows of data." - assert sorted(results) == sorted(( - ('fad071a3-2fc8-40b8-992b-cdefe7dcac79', 'mrna', 'mRNA Dataset'), - ('548d684b-d4d1-46fb-a6d3-51a56b7da1b3', 'phenotype', - 'Phenotype (Publish) Dataset'), - ('48056f84-a2a6-41ac-8319-0e1e212cba2a', 'genotype', - 'Genotype Dataset'))) - - rollback_migrations(backend, older_migrations + [the_migration]) - -@pytest.mark.unit_test -def test_rollback_init_data(auth_testdb_path, auth_migrations_dir, backend): - """ - GIVEN: A migration script - WHEN: The migration is rolled back - THEN: Verify that the table is empty - """ - older_migrations = migrations_up_to(MIGRATION_PATH, auth_migrations_dir) - the_migration = get_migration(MIGRATION_PATH) - apply_migrations(backend, older_migrations) - with db.connection(auth_testdb_path, None) as conn, db.cursor(conn) as cursor: - cursor.execute("SELECT * FROM resource_categories") - assert len(cursor.fetchall()) == 0, "Expected empty table." - apply_single_migration(backend, the_migration) - cursor.execute("SELECT * FROM resource_categories") - results = cursor.fetchall() - assert len(results) == 3, "Expected 3 rows of data." - rollback_single_migration(backend, the_migration) - cursor.execute("SELECT * FROM resource_categories") - assert len(cursor.fetchall()) == 0, "Expected empty table." - - rollback_migrations(backend, older_migrations) diff --git a/tests/unit/auth/test_migrations_insert_data_into_empty_table.py b/tests/unit/auth/test_migrations_insert_data_into_empty_table.py deleted file mode 100644 index ebb7fa6..0000000 --- a/tests/unit/auth/test_migrations_insert_data_into_empty_table.py +++ /dev/null @@ -1,77 +0,0 @@ -"""Test data insertion when migrations are run.""" -import sqlite3 -from contextlib import closing - -import pytest - -from gn3.migrations import get_migration, apply_migrations, rollback_migrations -from tests.unit.auth.conftest import ( - apply_single_migration, rollback_single_migration, migrations_up_to) - -test_params = ( - ("20221113_01_7M0hv-enumerate-initial-privileges.py", "privileges", 19), - ("20221114_04_tLUzB-initialise-basic-roles.py", "roles", 2), - ("20221114_04_tLUzB-initialise-basic-roles.py", "role_privileges", 15)) - -@pytest.mark.unit_test -@pytest.mark.parametrize( - "migration_file,table,row_count", test_params) -def test_apply_insert(# pylint: disable=[too-many-arguments] - auth_testdb_path, auth_migrations_dir, backend, migration_file, - table, row_count): - """ - GIVEN: A database migration - WHEN: The migration is applied - THEN: Ensure the given number of rows are inserted into the table - """ - migration_path=f"{auth_migrations_dir}/{migration_file}" - older_migrations = migrations_up_to(migration_path, auth_migrations_dir) - the_migration = get_migration(migration_path) - apply_migrations(backend, older_migrations) - with closing(sqlite3.connect(auth_testdb_path)) as conn, closing(conn.cursor()) as cursor: - query = f"SELECT COUNT(*) FROM {table}" - cursor.execute(query) - result_before_migration = cursor.fetchall() - apply_single_migration(backend, the_migration) - cursor.execute(query) - result_after_migration = cursor.fetchall() - - rollback_migrations(backend, older_migrations+[the_migration]) - assert result_before_migration[0][0] == 0, ( - "Expected empty table before initialisation") - assert result_after_migration[0][0] == row_count, ( - f"Expected {row_count} rows") - -@pytest.mark.unit_test -@pytest.mark.parametrize( - "migration_file,table,row_count", test_params) -def test_rollback_insert(# pylint: disable=[too-many-arguments] - auth_testdb_path, auth_migrations_dir, backend, migration_file, - table, row_count): - """ - GIVEN: A database migration - WHEN: The migration is applied - THEN: Ensure the given number of rows are inserted into the table - """ - migration_path=f"{auth_migrations_dir}/{migration_file}" - older_migrations = migrations_up_to(migration_path, auth_migrations_dir) - the_migration = get_migration(migration_path) - apply_migrations(backend, older_migrations) - with closing(sqlite3.connect(auth_testdb_path)) as conn, closing(conn.cursor()) as cursor: - query = f"SELECT COUNT(*) FROM {table}" - cursor.execute(query) - result_before_migration = cursor.fetchall() - apply_single_migration(backend, the_migration) - cursor.execute(query) - result_after_migration = cursor.fetchall() - rollback_single_migration(backend, the_migration) - cursor.execute(query) - result_after_rollback = cursor.fetchall() - - rollback_migrations(backend, older_migrations) - assert result_before_migration[0][0] == 0, ( - "Expected empty table before initialisation") - assert result_after_migration[0][0] == row_count, ( - f"Expected {row_count} rows") - assert result_after_rollback[0][0] == 0, ( - "Expected empty table after rollback") diff --git a/tests/unit/auth/test_privileges.py b/tests/unit/auth/test_privileges.py deleted file mode 100644 index 3c645c7..0000000 --- a/tests/unit/auth/test_privileges.py +++ /dev/null @@ -1,45 +0,0 @@ -"""Test the privileges module""" -import pytest - -from gn3.auth import db -from gn3.auth.authorisation.privileges import Privilege, user_privileges - -from tests.unit.auth import conftest - - -PRIVILEGES = sorted( - (Privilege("system:group:create-group", "Create a group"), - Privilege("system:group:view-group", "View the details of a group"), - Privilege("system:group:edit-group", "Edit the details of a group"), - Privilege("system:user:list", "List users in the system"), - Privilege("system:group:delete-group", "Delete a group"), - Privilege("group:user:add-group-member", "Add a user to a group"), - Privilege("group:user:remove-group-member", "Remove a user from a group"), - Privilege("system:group:transfer-group-leader", - "Transfer leadership of the group to some other member"), - - Privilege("group:resource:create-resource", "Create a resource object"), - Privilege("group:resource:view-resource", - "view a resource and use it in computations"), - Privilege("group:resource:edit-resource", "edit/update a resource"), - Privilege("group:resource:delete-resource", "Delete a resource"), - - Privilege("group:role:create-role", "Create a new role"), - Privilege("group:role:edit-role", "edit/update an existing role"), - Privilege("group:user:assign-role", "Assign a role to an existing user"), - Privilege("group:role:delete-role", "Delete an existing role")), - key=lambda x: x.privilege_id) - -@pytest.mark.unit_test -@pytest.mark.parametrize( - "user,expected", tuple(zip( - conftest.TEST_USERS, (PRIVILEGES, [], [], [], [])))) -def test_user_privileges(auth_testdb_path, fxtr_users, user, expected):# pylint: disable=[unused-argument] - """ - GIVEN: A user - WHEN: An attempt is made to fetch the user's privileges - THEN: Ensure only - """ - with db.connection(auth_testdb_path) as conn: - assert sorted( - user_privileges(conn, user), key=lambda x: x.privilege_id) == expected diff --git a/tests/unit/auth/test_resources.py b/tests/unit/auth/test_resources.py deleted file mode 100644 index 7b9798a..0000000 --- a/tests/unit/auth/test_resources.py +++ /dev/null @@ -1,124 +0,0 @@ -"""Test resource-management functions""" -import uuid - -import pytest - -from gn3.auth import db - -from gn3.auth.authorisation.groups import Group -from gn3.auth.authorisation.errors import AuthorisationError -from gn3.auth.authorisation.resources.models import ( - Resource, user_resources, create_resource, ResourceCategory, - public_resources) - -from tests.unit.auth import conftest - -group = Group(uuid.UUID("9988c21d-f02f-4d45-8966-22c968ac2fbf"), "TheTestGroup", - {}) -resource_category = ResourceCategory( - uuid.UUID("fad071a3-2fc8-40b8-992b-cdefe7dcac79"), "mrna", "mRNA Dataset") -create_resource_failure = { - "status": "error", - "message": "Unauthorised: Could not create resource" -} - - -def uuid_fn(): - """Mock function for uuid""" - return uuid.UUID("d32611e3-07fc-4564-b56c-786c6db6de2b") - - -@pytest.mark.unit_test -@pytest.mark.parametrize( - "user,expected", - tuple(zip( - conftest.TEST_USERS[0:1], - (Resource( - group, uuid.UUID("d32611e3-07fc-4564-b56c-786c6db6de2b"), - "test_resource", resource_category, False),)))) -def test_create_resource(mocker, fxtr_users_in_group, user, expected): - """Test that resource creation works as expected.""" - mocker.patch("gn3.auth.authorisation.resources.models.uuid4", uuid_fn) - mocker.patch("gn3.auth.authorisation.checks.require_oauth.acquire", - conftest.get_tokeniser(user)) - conn, _group, _users = fxtr_users_in_group - resource = create_resource( - conn, "test_resource", resource_category, user, False) - assert resource == expected - - with db.cursor(conn) as cursor: - # Cleanup - cursor.execute( - "DELETE FROM group_user_roles_on_resources WHERE resource_id=?", - (str(resource.resource_id),)) - cursor.execute( - "DELETE FROM group_roles WHERE group_id=?", - (str(resource.group.group_id),)) - cursor.execute( - "DELETE FROM resources WHERE resource_id=?", - (str(resource.resource_id),)) - -@pytest.mark.unit_test -@pytest.mark.parametrize( - "user,expected", - tuple(zip( - conftest.TEST_USERS[1:], - (create_resource_failure, create_resource_failure, - create_resource_failure)))) -def test_create_resource_raises_for_unauthorised_users( - mocker, fxtr_users_in_group, user, expected): - """Test that resource creation works as expected.""" - mocker.patch("gn3.auth.authorisation.resources.models.uuid4", uuid_fn) - mocker.patch("gn3.auth.authorisation.checks.require_oauth.acquire", - conftest.get_tokeniser(user)) - conn, _group, _users = fxtr_users_in_group - with pytest.raises(AuthorisationError): - assert create_resource( - conn, "test_resource", resource_category, user, False) == expected - - -@pytest.mark.unit_test -def test_public_resources(fxtr_resources): - """ - GIVEN: some resources in the database - WHEN: public resources are requested - THEN: only list the resources that are public - """ - conn, _res = fxtr_resources - assert sorted( - public_resources(conn), - key=lambda resource: resource.resource_id) == sorted(tuple( - res for res in - conftest.TEST_RESOURCES - if res.public), key=lambda resource: resource.resource_id) - -PUBLIC_RESOURCES = sorted( - {res.resource_id: res for res in conftest.TEST_RESOURCES_PUBLIC}.values(), - key=lambda resource: resource.resource_id) - -@pytest.mark.unit_test -@pytest.mark.parametrize( - "user,expected", - tuple(zip( - conftest.TEST_USERS, - (sorted( - {res.resource_id: res for res in - (conftest.TEST_RESOURCES_GROUP_01 + - conftest.TEST_RESOURCES_PUBLIC)}.values(), - key=lambda resource: resource.resource_id), - sorted( - {res.resource_id: res for res in - ((conftest.TEST_RESOURCES_GROUP_01[1],) + - conftest.TEST_RESOURCES_PUBLIC)}.values() - , key=lambda resource: resource.resource_id), - PUBLIC_RESOURCES, PUBLIC_RESOURCES)))) -def test_user_resources(fxtr_group_user_roles, user, expected): - """ - GIVEN: some resources in the database - WHEN: a particular user's resources are requested - THEN: list only the resources for which the user can access - """ - conn, *_others = fxtr_group_user_roles - assert sorted( - {res.resource_id: res for res in user_resources(conn, user) - }.values(), key=lambda resource: resource.resource_id) == expected diff --git a/tests/unit/auth/test_roles.py b/tests/unit/auth/test_roles.py deleted file mode 100644 index 8e22bb5..0000000 --- a/tests/unit/auth/test_roles.py +++ /dev/null @@ -1,127 +0,0 @@ -"""Test functions dealing with group management.""" -import uuid - -import pytest - -from gn3.auth import db -from gn3.auth.authorisation.privileges import Privilege -from gn3.auth.authorisation.errors import AuthorisationError -from gn3.auth.authorisation.roles.models import Role, user_roles, create_role - -from tests.unit.auth import conftest -from tests.unit.auth.fixtures import TEST_USERS - -create_role_failure = { - "status": "error", - "message": "Unauthorised: Could not create role" -} - - -def uuid_fn(): - """Mock function for uuid""" - return uuid.UUID("d32611e3-07fc-4564-b56c-786c6db6de2b") - - -PRIVILEGES = ( - Privilege("group:resource:view-resource", - "view a resource and use it in computations"), - Privilege("group:resource:edit-resource", "edit/update a resource")) - -@pytest.mark.unit_test -@pytest.mark.parametrize( - "user,expected", tuple(zip(conftest.TEST_USERS[0:1], ( - Role(uuid.UUID("d32611e3-07fc-4564-b56c-786c6db6de2b"), "a_test_role", - True, PRIVILEGES),)))) -def test_create_role(# pylint: disable=[too-many-arguments] - fxtr_app, auth_testdb_path, mocker, fxtr_users, user, expected):# pylint: disable=[unused-argument] - """ - GIVEN: an authenticated user - WHEN: the user attempts to create a role - THEN: verify they are only able to create the role if they have the - appropriate privileges - """ - mocker.patch("gn3.auth.authorisation.roles.models.uuid4", uuid_fn) - mocker.patch("gn3.auth.authorisation.checks.require_oauth.acquire", - conftest.get_tokeniser(user)) - with db.connection(auth_testdb_path) as conn, db.cursor(conn) as cursor: - the_role = create_role(cursor, "a_test_role", PRIVILEGES) - assert the_role == expected - -@pytest.mark.unit_test -@pytest.mark.parametrize( - "user,expected", tuple(zip(conftest.TEST_USERS[1:], ( - create_role_failure, create_role_failure, create_role_failure)))) -def test_create_role_raises_exception_for_unauthorised_users(# pylint: disable=[too-many-arguments] - fxtr_app, auth_testdb_path, mocker, fxtr_users, user, expected):# pylint: disable=[unused-argument] - """ - GIVEN: an authenticated user - WHEN: the user attempts to create a role - THEN: verify they are only able to create the role if they have the - appropriate privileges - """ - mocker.patch("gn3.auth.authorisation.roles.models.uuid4", uuid_fn) - mocker.patch("gn3.auth.authorisation.checks.require_oauth.acquire", - conftest.get_tokeniser(user)) - with db.connection(auth_testdb_path) as conn, db.cursor(conn) as cursor: - with pytest.raises(AuthorisationError): - create_role(cursor, "a_test_role", PRIVILEGES) - -@pytest.mark.unit_test -@pytest.mark.parametrize( - "user,expected", - (zip(TEST_USERS, - ((Role( - role_id=uuid.UUID('a0e67630-d502-4b9f-b23f-6805d0f30e30'), - role_name='group-leader', user_editable=False, - privileges=( - Privilege(privilege_id='group:resource:create-resource', - privilege_description='Create a resource object'), - Privilege(privilege_id='group:resource:delete-resource', - privilege_description='Delete a resource'), - Privilege(privilege_id='group:resource:edit-resource', - privilege_description='edit/update a resource'), - Privilege( - privilege_id='group:resource:view-resource', - privilege_description=( - 'view a resource and use it in computations')), - Privilege(privilege_id='group:role:create-role', - privilege_description='Create a new role'), - Privilege(privilege_id='group:role:delete-role', - privilege_description='Delete an existing role'), - Privilege(privilege_id='group:role:edit-role', - privilege_description='edit/update an existing role'), - Privilege(privilege_id='group:user:add-group-member', - privilege_description='Add a user to a group'), - Privilege(privilege_id='group:user:assign-role', - privilege_description=( - 'Assign a role to an existing user')), - Privilege(privilege_id='group:user:remove-group-member', - privilege_description='Remove a user from a group'), - Privilege(privilege_id='system:group:delete-group', - privilege_description='Delete a group'), - Privilege(privilege_id='system:group:edit-group', - privilege_description='Edit the details of a group'), - Privilege( - privilege_id='system:group:transfer-group-leader', - privilege_description=( - 'Transfer leadership of the group to some other ' - 'member')), - Privilege(privilege_id='system:group:view-group', - privilege_description='View the details of a group'), - Privilege(privilege_id='system:user:list', - privilege_description='List users in the system'))), - Role( - role_id=uuid.UUID("ade7e6b0-ba9c-4b51-87d0-2af7fe39a347"), - role_name="group-creator", user_editable=False, - privileges=( - Privilege(privilege_id='system:group:create-group', - privilege_description = "Create a group"),))), - tuple(), tuple(), tuple())))) -def test_user_roles(fxtr_group_user_roles, user, expected): - """ - GIVEN: an authenticated user - WHEN: we request the user's privileges - THEN: return **ALL** the privileges attached to the user - """ - conn, *_others = fxtr_group_user_roles - assert user_roles(conn, user) == expected diff --git a/tests/unit/auth/test_token.py b/tests/unit/auth/test_token.py deleted file mode 100644 index 76316ea..0000000 --- a/tests/unit/auth/test_token.py +++ /dev/null @@ -1,62 +0,0 @@ -"""Test the OAuth2 authorisation""" - -import pytest - -from gn3.auth import db - -SUCCESS_RESULT = { - "status_code": 200, - "result": { - "access_token": "123456ABCDE", - "expires_in": 864000, - "scope": "profile", - "token_type": "Bearer"}} - -USERNAME_PASSWORD_FAIL_RESULT = { - "status_code": 400, - "result": { - 'error': 'invalid_request', - 'error_description': 'Invalid "username" or "password" in request.'}} - -def gen_token(client, grant_type, user, scope): # pylint: disable=[unused-argument] - """Generate tokens for tests""" - return "123456ABCDE" - -@pytest.mark.unit_test -@pytest.mark.parametrize( - "test_data,expected", - ((("group@lead.er", "password_for_user_001", 0), SUCCESS_RESULT), - (("group@mem.ber01", "password_for_user_002", 1), SUCCESS_RESULT), - (("group@mem.ber02", "password_for_user_003", 2), SUCCESS_RESULT), - (("unaff@iliated.user", "password_for_user_004", 3), SUCCESS_RESULT), - (("group@lead.er", "brrr", 0), USERNAME_PASSWORD_FAIL_RESULT), - (("group@mem.ber010", "password_for_user_002", 1), USERNAME_PASSWORD_FAIL_RESULT), - (("papa", "yada", 2), USERNAME_PASSWORD_FAIL_RESULT), - # (("unaff@iliated.user", "password_for_user_004", 1), USERNAME_PASSWORD_FAIL_RESULT) - )) -def test_token(fxtr_app, fxtr_oauth2_clients, test_data, expected): - """ - GIVEN: a registered oauth2 client, a user - WHEN: a token is requested via the 'password' grant - THEN: check that: - a) when email and password are valid, we get a token back - b) when either email or password or both are invalid, we get error message - back - c) TODO: when user tries to use wrong client, we get error message back - """ - conn, oa2clients = fxtr_oauth2_clients - email, password, client_idx = test_data - data = { - "grant_type": "password", "scope": "profile nonexistent-scope", - "client_id": oa2clients[client_idx].client_id, - "client_secret": oa2clients[client_idx].client_secret, - "username": email, "password": password} - - with fxtr_app.test_client() as client, db.cursor(conn) as cursor: - res = client.post("/api/oauth2/token", data=data) - # cleanup db - cursor.execute("DELETE FROM oauth2_tokens WHERE access_token=?", - (gen_token(None, None, None, None),)) - assert res.status_code == expected["status_code"] - for key in expected["result"]: - assert res.json[key] == expected["result"][key] |