From 8b7c598407a5fea9a3d78473e72df87606998cd4 Mon Sep 17 00:00:00 2001 From: Frederick Muriuki Muriithi Date: Fri, 4 Aug 2023 10:10:28 +0300 Subject: Copy over files from GN3 repository. --- tests/__init__.py | 0 tests/unit/__init__.py | 0 tests/unit/auth/__init__.py | 0 tests/unit/auth/conftest.py | 24 +++ tests/unit/auth/fixtures/__init__.py | 8 + tests/unit/auth/fixtures/group_fixtures.py | 147 ++++++++++++++++++ tests/unit/auth/fixtures/migration_fixtures.py | 51 +++++++ tests/unit/auth/fixtures/oauth2_client_fixtures.py | 51 +++++++ tests/unit/auth/fixtures/resource_fixtures.py | 25 +++ tests/unit/auth/fixtures/role_fixtures.py | 45 ++++++ tests/unit/auth/fixtures/user_fixtures.py | 66 ++++++++ tests/unit/auth/test_credentials.py | 100 ++++++++++++ tests/unit/auth/test_groups.py | 168 +++++++++++++++++++++ .../unit/auth/test_migrations_add_data_to_table.py | 79 ++++++++++ .../auth/test_migrations_add_remove_columns.py | 116 ++++++++++++++ tests/unit/auth/test_migrations_create_tables.py | 91 +++++++++++ tests/unit/auth/test_migrations_drop_tables.py | 63 ++++++++ tests/unit/auth/test_migrations_indexes.py | 97 ++++++++++++ ...tions_init_data_in_resource_categories_table.py | 60 ++++++++ ...test_migrations_insert_data_into_empty_table.py | 77 ++++++++++ tests/unit/auth/test_privileges.py | 46 ++++++ tests/unit/auth/test_resources.py | 117 ++++++++++++++ tests/unit/auth/test_roles.py | 123 +++++++++++++++ tests/unit/auth/test_token.py | 62 ++++++++ tests/unit/conftest.py | 35 +++++ 25 files changed, 1651 insertions(+) create mode 100644 tests/__init__.py create mode 100644 tests/unit/__init__.py create mode 100644 tests/unit/auth/__init__.py create mode 100644 tests/unit/auth/conftest.py create mode 100644 tests/unit/auth/fixtures/__init__.py create mode 100644 tests/unit/auth/fixtures/group_fixtures.py create mode 100644 tests/unit/auth/fixtures/migration_fixtures.py create mode 100644 tests/unit/auth/fixtures/oauth2_client_fixtures.py create mode 100644 tests/unit/auth/fixtures/resource_fixtures.py create mode 100644 tests/unit/auth/fixtures/role_fixtures.py create mode 100644 tests/unit/auth/fixtures/user_fixtures.py create mode 100644 tests/unit/auth/test_credentials.py create mode 100644 tests/unit/auth/test_groups.py create mode 100644 tests/unit/auth/test_migrations_add_data_to_table.py create mode 100644 tests/unit/auth/test_migrations_add_remove_columns.py create mode 100644 tests/unit/auth/test_migrations_create_tables.py create mode 100644 tests/unit/auth/test_migrations_drop_tables.py create mode 100644 tests/unit/auth/test_migrations_indexes.py create mode 100644 tests/unit/auth/test_migrations_init_data_in_resource_categories_table.py create mode 100644 tests/unit/auth/test_migrations_insert_data_into_empty_table.py create mode 100644 tests/unit/auth/test_privileges.py create mode 100644 tests/unit/auth/test_resources.py create mode 100644 tests/unit/auth/test_roles.py create mode 100644 tests/unit/auth/test_token.py create mode 100644 tests/unit/conftest.py (limited to 'tests') diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/unit/__init__.py b/tests/unit/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/unit/auth/__init__.py b/tests/unit/auth/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/unit/auth/conftest.py b/tests/unit/auth/conftest.py new file mode 100644 index 0000000..a7c64a8 --- /dev/null +++ b/tests/unit/auth/conftest.py @@ -0,0 +1,24 @@ +"""Module for fixtures and test utilities""" +import uuid +import datetime +from contextlib import contextmanager + +from gn3.auth.authentication.oauth2.models.oauth2token import OAuth2Token + +from .fixtures import * # pylint: disable=[wildcard-import,unused-wildcard-import] + +def get_tokeniser(user): + """Get contextmanager for mocking token acquisition.""" + @contextmanager + def __token__(*args, **kwargs):# pylint: disable=[unused-argument] + yield { + usr.user_id: OAuth2Token( + token_id=uuid.UUID("d32611e3-07fc-4564-b56c-786c6db6de2b"), + client=None, token_type="Bearer", access_token="123456ABCDE", + refresh_token=None, revoked=False, expires_in=864000, + user=usr, issued_at=int(datetime.datetime.now().timestamp()), + scope="profile group role resource register-client") + for usr in TEST_USERS + }[user.user_id] + + return __token__ diff --git a/tests/unit/auth/fixtures/__init__.py b/tests/unit/auth/fixtures/__init__.py new file mode 100644 index 0000000..a675fc7 --- /dev/null +++ b/tests/unit/auth/fixtures/__init__.py @@ -0,0 +1,8 @@ +"""pytest's conftest as a module.""" +from .role_fixtures import * +from .user_fixtures import * +from .group_fixtures import * +from .resource_fixtures import * +# from .privilege_fixtures import * +from .migration_fixtures import * +from .oauth2_client_fixtures import * diff --git a/tests/unit/auth/fixtures/group_fixtures.py b/tests/unit/auth/fixtures/group_fixtures.py new file mode 100644 index 0000000..d7bbc56 --- /dev/null +++ b/tests/unit/auth/fixtures/group_fixtures.py @@ -0,0 +1,147 @@ +"""Fixtures and utilities for group-related tests""" +import uuid + +import pytest + +from gn3.auth import db +from gn3.auth.authorisation.groups import Group, GroupRole +from gn3.auth.authorisation.resources import Resource, ResourceCategory + +from .role_fixtures import RESOURCE_EDITOR_ROLE, RESOURCE_READER_ROLE + +TEST_GROUP_01 = Group(uuid.UUID("9988c21d-f02f-4d45-8966-22c968ac2fbf"), + "TheTestGroup", {}) +TEST_GROUP_02 = Group(uuid.UUID("e37d59d7-c05e-4d67-b479-81e627d8d634"), + "AnotherTestGroup", {}) +TEST_GROUPS = (TEST_GROUP_01, TEST_GROUP_02) + +TEST_RESOURCES_GROUP_01 = ( + Resource(TEST_GROUPS[0], uuid.UUID("26ad1668-29f5-439d-b905-84d551f85955"), + "ResourceG01R01", + ResourceCategory(uuid.UUID("48056f84-a2a6-41ac-8319-0e1e212cba2a"), + "genotype", "Genotype Dataset"), + True), + Resource(TEST_GROUPS[0], uuid.UUID("2130aec0-fefd-434d-92fd-9ca342348b2d"), + "ResourceG01R02", + ResourceCategory(uuid.UUID("548d684b-d4d1-46fb-a6d3-51a56b7da1b3"), + "phenotype", "Phenotype (Publish) Dataset"), + False), + Resource(TEST_GROUPS[0], uuid.UUID("e9a1184a-e8b4-49fb-b713-8d9cbeea5b83"), + "ResourceG01R03", + ResourceCategory(uuid.UUID("fad071a3-2fc8-40b8-992b-cdefe7dcac79"), + "mrna", "mRNA Dataset"), + False)) + +TEST_RESOURCES_GROUP_02 = ( + Resource(TEST_GROUPS[1], uuid.UUID("14496a1c-c234-49a2-978c-8859ea274054"), + "ResourceG02R01", + ResourceCategory(uuid.UUID("48056f84-a2a6-41ac-8319-0e1e212cba2a"), + "genotype", "Genotype Dataset"), + False), + Resource(TEST_GROUPS[1], uuid.UUID("04ad9e09-94ea-4390-8a02-11f92999806b"), + "ResourceG02R02", + ResourceCategory(uuid.UUID("fad071a3-2fc8-40b8-992b-cdefe7dcac79"), + "mrna", "mRNA Dataset"), + True)) + +TEST_RESOURCES = TEST_RESOURCES_GROUP_01 + TEST_RESOURCES_GROUP_02 +TEST_RESOURCES_PUBLIC = (TEST_RESOURCES_GROUP_01[0], TEST_RESOURCES_GROUP_02[1]) + +def __gtuple__(cursor): + return tuple(dict(row) for row in cursor.fetchall()) + +@pytest.fixture(scope="function") +def fxtr_group(conn_after_auth_migrations):# pylint: disable=[redefined-outer-name] + """Fixture: setup a test group.""" + query = "INSERT INTO groups(group_id, group_name) VALUES (?, ?)" + with db.cursor(conn_after_auth_migrations) as cursor: + cursor.executemany( + query, tuple( + (str(group.group_id), group.group_name) + for group in TEST_GROUPS)) + + yield (conn_after_auth_migrations, TEST_GROUPS[0]) + + with db.cursor(conn_after_auth_migrations) as cursor: + cursor.executemany( + "DELETE FROM groups WHERE group_id=?", + ((str(group.group_id),) for group in TEST_GROUPS)) + +@pytest.fixture(scope="function") +def fxtr_users_in_group(fxtr_group, fxtr_users):# pylint: disable=[redefined-outer-name, unused-argument] + """Link the users to the groups.""" + conn, all_users = fxtr_users + users = tuple( + user for user in all_users if user.email not in ("unaff@iliated.user",)) + query_params = tuple( + (str(TEST_GROUP_01.group_id), str(user.user_id)) for user in users) + with db.cursor(conn) as cursor: + cursor.executemany( + "INSERT INTO group_users(group_id, user_id) VALUES (?, ?)", + query_params) + + yield (conn, TEST_GROUP_01, users) + + with db.cursor(conn) as cursor: + cursor.executemany( + "DELETE FROM group_users WHERE group_id=? AND user_id=?", + query_params) + +@pytest.fixture(scope="function") +def fxtr_group_roles(fxtr_group, fxtr_roles):# pylint: disable=[redefined-outer-name,unused-argument] + """Link roles to group""" + group_roles = ( + GroupRole(uuid.UUID("9c25efb2-b477-4918-a95c-9914770cbf4d"), + TEST_GROUP_01, RESOURCE_EDITOR_ROLE), + GroupRole(uuid.UUID("82aed039-fe2f-408c-ab1e-81cd1ba96630"), + TEST_GROUP_02, RESOURCE_READER_ROLE)) + conn, groups = fxtr_group + with db.cursor(conn) as cursor: + cursor.executemany( + "INSERT INTO group_roles VALUES (?, ?, ?)", + ((str(role.group_role_id), str(role.group.group_id), + str(role.role.role_id)) + for role in group_roles)) + + yield conn, groups, group_roles + + with db.cursor(conn) as cursor: + cursor.execute("SELECT * FROM group_user_roles_on_resources") + cursor.executemany( + ("DELETE FROM group_roles " + "WHERE group_role_id=? AND group_id=? AND role_id=?"), + ((str(role.group_role_id), str(role.group.group_id), + str(role.role.role_id)) + for role in group_roles)) + +@pytest.fixture(scope="function") +def fxtr_group_user_roles(fxtr_resources, fxtr_group_roles, fxtr_users_in_group):#pylint: disable=[redefined-outer-name,unused-argument] + """Assign roles to users.""" + conn, _groups, group_roles = fxtr_group_roles + _conn, group_resources = fxtr_resources + _conn, _group, group_users = fxtr_users_in_group + users = tuple(user for user in group_users if user.email + not in ("unaff@iliated.user", "group@lead.er")) + users_roles_resources = ( + (user, RESOURCE_EDITOR_ROLE, TEST_RESOURCES_GROUP_01[1]) + for user in users if user.email == "group@mem.ber01") + with db.cursor(conn) as cursor: + params = tuple({ + "group_id": str(resource.group.group_id), + "user_id": str(user.user_id), + "role_id": str(role.role_id), + "resource_id": str(resource.resource_id) + } for user, role, resource in users_roles_resources) + cursor.executemany( + ("INSERT INTO group_user_roles_on_resources " + "VALUES (:group_id, :user_id, :role_id, :resource_id)"), + params) + + yield conn, group_users, group_roles, group_resources + + with db.cursor(conn) as cursor: + cursor.executemany( + ("DELETE FROM group_user_roles_on_resources WHERE " + "group_id=:group_id AND user_id=:user_id AND role_id=:role_id AND " + "resource_id=:resource_id"), + params) diff --git a/tests/unit/auth/fixtures/migration_fixtures.py b/tests/unit/auth/fixtures/migration_fixtures.py new file mode 100644 index 0000000..eb42c2b --- /dev/null +++ b/tests/unit/auth/fixtures/migration_fixtures.py @@ -0,0 +1,51 @@ +"""Fixtures and utilities for migration-related tests""" +import pytest +from yoyo.backends import DatabaseBackend +from yoyo import get_backend, read_migrations +from yoyo.migrations import Migration, MigrationList + +from gn3.auth import db +from gn3.migrations import apply_migrations, rollback_migrations + +@pytest.fixture(scope="session") +def auth_testdb_path(fxtr_app_config): # pylint: disable=redefined-outer-name + """Get the test application's auth database file""" + return fxtr_app_config["AUTH_DB"] + +@pytest.fixture(scope="session") +def auth_migrations_dir(fxtr_app_config): # pylint: disable=redefined-outer-name + """Get the test application's auth database file""" + return fxtr_app_config["AUTH_MIGRATIONS"] + +def apply_single_migration(backend: DatabaseBackend, migration: Migration):# pylint: disable=[redefined-outer-name] + """Utility to apply a single migration""" + apply_migrations(backend, MigrationList([migration])) + +def rollback_single_migration(backend: DatabaseBackend, migration: Migration):# pylint: disable=[redefined-outer-name] + """Utility to rollback a single migration""" + rollback_migrations(backend, MigrationList([migration])) + +@pytest.fixture(scope="session") +def backend(auth_testdb_path):# pylint: disable=redefined-outer-name + """Fixture: retrieve yoyo backend for auth database""" + return get_backend(f"sqlite:///{auth_testdb_path}") + +@pytest.fixture(scope="session") +def all_migrations(auth_migrations_dir): # pylint: disable=redefined-outer-name + """Retrieve all the migrations""" + return read_migrations(auth_migrations_dir) + +@pytest.fixture(scope="function") +def conn_after_auth_migrations(backend, auth_testdb_path, all_migrations): # pylint: disable=redefined-outer-name + """Run all migrations and return a connection to the database after""" + apply_migrations(backend, all_migrations) + with db.connection(auth_testdb_path) as conn: + yield conn + + rollback_migrations(backend, all_migrations) + +def migrations_up_to(migration, migrations_dir): + """Run all the migration before `migration`.""" + migrations = read_migrations(migrations_dir) + index = [mig.path for mig in migrations].index(migration) + return MigrationList(migrations[0:index]) diff --git a/tests/unit/auth/fixtures/oauth2_client_fixtures.py b/tests/unit/auth/fixtures/oauth2_client_fixtures.py new file mode 100644 index 0000000..654d048 --- /dev/null +++ b/tests/unit/auth/fixtures/oauth2_client_fixtures.py @@ -0,0 +1,51 @@ +"""Fixtures for OAuth2 clients""" +import uuid +import json +import datetime + +import pytest + +from gn3.auth import db +from gn3.auth.authentication.users import hash_password +from gn3.auth.authentication.oauth2.models.oauth2client import OAuth2Client + +@pytest.fixture(autouse=True) +def fxtr_patch_envvars(monkeypatch): + """Fixture: patch environment variable""" + monkeypatch.setenv("AUTHLIB_INSECURE_TRANSPORT", "true") + +@pytest.fixture +def fxtr_oauth2_clients(fxtr_users_with_passwords): + """Fixture: Create the OAuth2 clients for use with tests.""" + conn, users = fxtr_users_with_passwords + now = datetime.datetime.now() + + clients = tuple( + OAuth2Client(str(uuid.uuid4()), f"yabadabadoo_{idx:03}", now, + now + datetime.timedelta(hours = 2), + { + "client_name": f"test_client_{idx:03}", + "scope": ["profile", "group", "role", "resource", "register-client"], + "redirect_uri": "/test_oauth2", + "token_endpoint_auth_method": [ + "client_secret_post", "client_secret_basic"], + "grant_types": ["password", "authorisation_code", "refresh_token"], + "response_type": "token" + }, user) + for idx, user in enumerate(users, start=1)) + + with db.cursor(conn) as cursor: + cursor.executemany( + "INSERT INTO oauth2_clients VALUES (?, ?, ?, ?, ?, ?)", + ((str(client.client_id), hash_password(client.client_secret), + int(client.client_id_issued_at.timestamp()), + int(client.client_secret_expires_at.timestamp()), + json.dumps(client.client_metadata), str(client.user.user_id)) + for client in clients)) + + yield conn, clients + + with db.cursor(conn) as cursor: + cursor.executemany( + "DELETE FROM oauth2_clients WHERE client_id=?", + ((str(client.client_id),) for client in clients)) diff --git a/tests/unit/auth/fixtures/resource_fixtures.py b/tests/unit/auth/fixtures/resource_fixtures.py new file mode 100644 index 0000000..117b4f4 --- /dev/null +++ b/tests/unit/auth/fixtures/resource_fixtures.py @@ -0,0 +1,25 @@ +"""Fixtures and utilities for resource-related tests""" +import pytest + +from gn3.auth import db + +from .group_fixtures import TEST_RESOURCES + +@pytest.fixture(scope="function") +def fxtr_resources(fxtr_group):# pylint: disable=[redefined-outer-name] + """fixture: setup test resources in the database""" + conn, _group = fxtr_group + with db.cursor(conn) as cursor: + cursor.executemany( + "INSERT INTO resources VALUES (?,?,?,?,?)", + ((str(res.group.group_id), str(res.resource_id), res.resource_name, + str(res.resource_category.resource_category_id), + 1 if res.public else 0) for res in TEST_RESOURCES)) + + yield (conn, TEST_RESOURCES) + + with db.cursor(conn) as cursor: + cursor.executemany( + "DELETE FROM resources WHERE group_id=? AND resource_id=?", + ((str(res.group.group_id), str(res.resource_id),) + for res in TEST_RESOURCES)) diff --git a/tests/unit/auth/fixtures/role_fixtures.py b/tests/unit/auth/fixtures/role_fixtures.py new file mode 100644 index 0000000..ee86aa2 --- /dev/null +++ b/tests/unit/auth/fixtures/role_fixtures.py @@ -0,0 +1,45 @@ +"""Fixtures and utilities for role-related tests""" +import uuid + +import pytest + +from gn3.auth import db +from gn3.auth.authorisation.roles import Role +from gn3.auth.authorisation.privileges import Privilege + +RESOURCE_READER_ROLE = Role( + uuid.UUID("c3ca2507-ee24-4835-9b31-8c21e1c072d3"), "resource_reader", True, + (Privilege("group:resource:view-resource", + "view a resource and use it in computations"),)) + +RESOURCE_EDITOR_ROLE = Role( + uuid.UUID("89819f84-6346-488b-8955-86062e9eedb7"), "resource_editor", True, + ( + Privilege("group:resource:view-resource", + "view a resource and use it in computations"), + Privilege("group:resource:edit-resource", "edit/update a resource"))) + +TEST_ROLES = (RESOURCE_READER_ROLE, RESOURCE_EDITOR_ROLE) + +@pytest.fixture(scope="function") +def fxtr_roles(conn_after_auth_migrations): + """Setup some example roles.""" + with db.cursor(conn_after_auth_migrations) as cursor: + cursor.executemany( + ("INSERT INTO roles VALUES (?, ?, ?)"), + ((str(role.role_id), role.role_name, 1) for role in TEST_ROLES)) + cursor.executemany( + ("INSERT INTO role_privileges VALUES (?, ?)"), + ((str(role.role_id), str(privilege.privilege_id)) + for role in TEST_ROLES for privilege in role.privileges)) + + yield conn_after_auth_migrations, TEST_ROLES + + with db.cursor(conn_after_auth_migrations) as cursor: + cursor.executemany( + ("DELETE FROM role_privileges WHERE role_id=? AND privilege_id=?"), + ((str(role.role_id), str(privilege.privilege_id)) + for role in TEST_ROLES for privilege in role.privileges)) + cursor.executemany( + ("DELETE FROM roles WHERE role_id=?"), + ((str(role.role_id),) for role in TEST_ROLES)) diff --git a/tests/unit/auth/fixtures/user_fixtures.py b/tests/unit/auth/fixtures/user_fixtures.py new file mode 100644 index 0000000..d248f54 --- /dev/null +++ b/tests/unit/auth/fixtures/user_fixtures.py @@ -0,0 +1,66 @@ +"""Fixtures and utilities for user-related tests""" +import uuid + +import pytest + +from gn3.auth import db +from gn3.auth.authentication.users import User, hash_password + +TEST_USERS = ( + User(uuid.UUID("ecb52977-3004-469e-9428-2a1856725c7f"), "group@lead.er", + "Group Leader"), + User(uuid.UUID("21351b66-8aad-475b-84ac-53ce528451e3"), + "group@mem.ber01", "Group Member 01"), + User(uuid.UUID("ae9c6245-0966-41a5-9a5e-20885a96bea7"), + "group@mem.ber02", "Group Member 02"), + User(uuid.UUID("9a0c7ce5-2f40-4e78-979e-bf3527a59579"), + "unaff@iliated.user", "Unaffiliated User")) + +@pytest.fixture(scope="function") +def fxtr_users(conn_after_auth_migrations):# pylint: disable=[redefined-outer-name] + """Fixture: setup test users.""" + query = "INSERT INTO users(user_id, email, name) VALUES (?, ?, ?)" + query_user_roles = "INSERT INTO user_roles(user_id, role_id) VALUES (?, ?)" + test_user_roles = ( + ("ecb52977-3004-469e-9428-2a1856725c7f", + "a0e67630-d502-4b9f-b23f-6805d0f30e30"), + ("ecb52977-3004-469e-9428-2a1856725c7f", + "ade7e6b0-ba9c-4b51-87d0-2af7fe39a347")) + with db.cursor(conn_after_auth_migrations) as cursor: + cursor.executemany(query, ( + (str(user.user_id), user.email, user.name) for user in TEST_USERS)) + cursor.executemany(query_user_roles, test_user_roles) + + yield (conn_after_auth_migrations, TEST_USERS) + + with db.cursor(conn_after_auth_migrations) as cursor: + cursor.executemany( + "DELETE FROM user_roles WHERE user_id=?", + (("ecb52977-3004-469e-9428-2a1856725c7f",),)) + cursor.executemany( + "DELETE FROM users WHERE user_id=?", + (("ecb52977-3004-469e-9428-2a1856725c7f",), + ("21351b66-8aad-475b-84ac-53ce528451e3",), + ("ae9c6245-0966-41a5-9a5e-20885a96bea7",), + ("9a0c7ce5-2f40-4e78-979e-bf3527a59579",))) + +@pytest.fixture(scope="function") +def fxtr_users_with_passwords(fxtr_users): # pylint: disable=[redefined-outer-name] + """Fixture: add passwords to the users""" + conn, users = fxtr_users + user_passwords_params = tuple( + (str(user.user_id), hash_password( + f"password_for_user_{idx:03}".encode("utf8"))) + for idx, user in enumerate(users, start=1)) + + with db.cursor(conn) as cursor: + cursor.executemany( + "INSERT INTO user_credentials VALUES (?, ?)", + user_passwords_params) + + yield conn, users + + with db.cursor(conn) as cursor: + cursor.executemany( + "DELETE FROM user_credentials WHERE user_id=?", + ((item[0],) for item in user_passwords_params)) diff --git a/tests/unit/auth/test_credentials.py b/tests/unit/auth/test_credentials.py new file mode 100644 index 0000000..f2a3d25 --- /dev/null +++ b/tests/unit/auth/test_credentials.py @@ -0,0 +1,100 @@ +"""Test the credentials checks""" +import pytest +from yoyo.migrations import MigrationList +from hypothesis import given, settings, strategies, HealthCheck + +from gn3.auth import db +from gn3.auth.authentication import credentials_in_database +from gn3.migrations import get_migration, apply_migrations, rollback_migrations + +from tests.unit.auth.conftest import migrations_up_to + +@pytest.fixture +def with_credentials_table(backend, auth_testdb_path): + """ + Fixture: Yield a connection object with the 'user_credentials' table + created. + """ + migrations_dir = "migrations/auth" + migration = f"{migrations_dir}/20221103_02_sGrIs-create-user-credentials-table.py" + migrations = (migrations_up_to(migration, migrations_dir) + + MigrationList([get_migration(migration)])) + apply_migrations(backend, migrations) + with db.connection(auth_testdb_path) as conn: + yield conn + + rollback_migrations(backend, migrations) + +@pytest.fixture +def with_credentials(with_credentials_table):# pylint: disable=redefined-outer-name + """ + Fixture: Initialise the database with some user credentials. + """ + with db.cursor(with_credentials_table) as cursor: + cursor.executemany( + "INSERT INTO users VALUES (:user_id, :email, :name)", + ({"user_id": "82552014-21ee-4321-b96a-b8788b97b862", + "email": "first@test.user", + "name": "First Test User" + }, + {"user_id": "bdd5cb7a-072d-4c2b-9872-d0cecb718523", + "email": "second@test.user", + "name": "Second Test User" + })) + cursor.executemany( + "INSERT INTO user_credentials VALUES (:user_id, :password)", + ({"user_id": "82552014-21ee-4321-b96a-b8788b97b862", + "password": b'$2b$12$LAh1PYtUgAFK7d5fA0EfL.4AdTZuYEAfzwO.p.jXVboxcP8bXNj7a' + }, + {"user_id": "bdd5cb7a-072d-4c2b-9872-d0cecb718523", + "password": b'$2b$12$zX77QCFSJuwIjAZGc0Jq5.rCWMHEMKD9Zf3Ay4C0AzwsiZ7SSPdKO' + })) + + yield with_credentials_table + + cursor.executemany("DELETE FROM user_credentials WHERE user_id=?", + (("82552014-21ee-4321-b96a-b8788b97b862",), + ("bdd5cb7a-072d-4c2b-9872-d0cecb718523",))) + cursor.executemany("DELETE FROM users WHERE user_id=?", + (("82552014-21ee-4321-b96a-b8788b97b862",), + ("bdd5cb7a-072d-4c2b-9872-d0cecb718523",))) + +@pytest.mark.unit_test +@given(strategies.emails(), strategies.text()) +@settings(suppress_health_check=[HealthCheck.function_scoped_fixture]) +def test_credentials_not_in_database(with_credentials, email, password):# pylint: disable=redefined-outer-name + """ + GIVEN: credentials that do not exist in the database + WHEN: the `credentials_in_database` function is run against the credentials + THEN: check that the function returns false in all cases. + """ + with db.cursor(with_credentials) as cursor: + assert credentials_in_database(cursor, email, password) is False + +@pytest.mark.unit_test +@pytest.mark.parametrize( + "email,password", + (("first@test.user", "wrongpassword"), + ("first@tes.user", "testuser01"))) +def test_partially_wrong_credentials(with_credentials, email, password):# pylint: disable=redefined-outer-name + """ + GIVEN: credentials that exist in the database + WHEN: the credentials are checked with partially wrong values + THEN: the check fails since the credentials are not correct + """ + with db.cursor(with_credentials) as cursor: + assert credentials_in_database(cursor, email, password) is False + +@pytest.mark.unit_test +@pytest.mark.parametrize( + "email,password", + (("first@test.user", "testuser01"), + ("second@test.user", "testuser02"))) +def test_partially_correct_credentials(with_credentials, email, password):# pylint: disable=redefined-outer-name + """ + GIVEN: credentials that exist in the database + WHEN: the credentials are checked with correct values + THEN: the check passes + """ + with db.cursor(with_credentials) as cursor: + assert credentials_in_database(cursor, email, password) is True diff --git a/tests/unit/auth/test_groups.py b/tests/unit/auth/test_groups.py new file mode 100644 index 0000000..4824e14 --- /dev/null +++ b/tests/unit/auth/test_groups.py @@ -0,0 +1,168 @@ +"""Test functions dealing with group management.""" +from uuid import UUID + +import pytest +from pymonad.maybe import Nothing + +from gn3.auth import db +from gn3.auth.authentication.users import User +from gn3.auth.authorisation.roles import Role +from gn3.auth.authorisation.privileges import Privilege +from gn3.auth.authorisation.errors import AuthorisationError +from gn3.auth.authorisation.groups.models import ( + Group, GroupRole, user_group, create_group, create_group_role) + +from tests.unit.auth import conftest + +create_group_failure = { + "status": "error", + "message": "Unauthorised: Failed to create group." +} + +uuid_fn = lambda : UUID("d32611e3-07fc-4564-b56c-786c6db6de2b") + +GROUP = Group(UUID("9988c21d-f02f-4d45-8966-22c968ac2fbf"), "TheTestGroup", + {"group_description": "The test group"}) +PRIVILEGES = ( + Privilege( + "group:resource:view-resource", + "view a resource and use it in computations"), + Privilege("group:resource:edit-resource", "edit/update a resource")) + +@pytest.mark.unit_test +@pytest.mark.parametrize( + "user,expected", tuple(zip(conftest.TEST_USERS[0:1], ( + Group( + UUID("d32611e3-07fc-4564-b56c-786c6db6de2b"), "a_test_group", + {"group_description": "A test group"}), + create_group_failure, create_group_failure, create_group_failure, + create_group_failure)))) +def test_create_group(# pylint: disable=[too-many-arguments] + fxtr_app, auth_testdb_path, mocker, fxtr_users, user, expected):# pylint: disable=[unused-argument] + """ + GIVEN: an authenticated user + WHEN: the user attempts to create a group + THEN: verify they are only able to create the group if they have the + appropriate privileges + """ + mocker.patch("gn3.auth.authorisation.groups.models.uuid4", uuid_fn) + mocker.patch("gn3.auth.authorisation.checks.require_oauth.acquire", + conftest.get_tokeniser(user)) + with db.connection(auth_testdb_path) as conn: + assert create_group( + conn, "a_test_group", user, "A test group") == expected + +@pytest.mark.unit_test +@pytest.mark.parametrize("user", conftest.TEST_USERS[1:]) +def test_create_group_raises_exception_with_non_privileged_user(# pylint: disable=[too-many-arguments] + fxtr_app, auth_testdb_path, mocker, fxtr_users, user):# pylint: disable=[unused-argument] + """ + GIVEN: an authenticated user, without appropriate privileges + WHEN: the user attempts to create a group + THEN: verify the system raises an exception + """ + mocker.patch("gn3.auth.authorisation.groups.models.uuid4", uuid_fn) + mocker.patch("gn3.auth.authorisation.checks.require_oauth.acquire", + conftest.get_tokeniser(user)) + with db.connection(auth_testdb_path) as conn: + with pytest.raises(AuthorisationError): + assert create_group(conn, "a_test_group", user, "A test group") + +create_role_failure = { + "status": "error", + "message": "Unauthorised: Could not create the group role" +} + +@pytest.mark.unit_test +@pytest.mark.parametrize( + "user,expected", tuple(zip(conftest.TEST_USERS[0:1], ( + GroupRole( + UUID("d32611e3-07fc-4564-b56c-786c6db6de2b"), + GROUP, + Role(UUID("d32611e3-07fc-4564-b56c-786c6db6de2b"), + "ResourceEditor", True, PRIVILEGES)),)))) +def test_create_group_role(mocker, fxtr_users_in_group, user, expected): + """ + GIVEN: an authenticated user + WHEN: the user attempts to create a role, attached to a group + THEN: verify they are only able to create the role if they have the + appropriate privileges and that the role is attached to the given group + """ + mocker.patch("gn3.auth.authorisation.groups.models.uuid4", uuid_fn) + mocker.patch("gn3.auth.authorisation.roles.models.uuid4", uuid_fn) + mocker.patch("gn3.auth.authorisation.checks.require_oauth.acquire", + conftest.get_tokeniser(user)) + conn, _group, _users = fxtr_users_in_group + with db.cursor(conn) as cursor: + assert create_group_role( + conn, GROUP, "ResourceEditor", PRIVILEGES) == expected + # cleanup + cursor.execute( + ("DELETE FROM group_roles " + "WHERE group_role_id=? AND group_id=? AND role_id=?"), + (str(uuid_fn()), str(GROUP.group_id), str(uuid_fn()))) + +@pytest.mark.unit_test +@pytest.mark.parametrize( + "user,expected", tuple(zip(conftest.TEST_USERS[1:], ( + create_role_failure, create_role_failure, create_role_failure)))) +def test_create_group_role_raises_exception_with_unauthorised_users( + mocker, fxtr_users_in_group, user, expected): + """ + GIVEN: an authenticated user + WHEN: the user attempts to create a role, attached to a group + THEN: verify they are only able to create the role if they have the + appropriate privileges and that the role is attached to the given group + """ + mocker.patch("gn3.auth.authorisation.groups.models.uuid4", uuid_fn) + mocker.patch("gn3.auth.authorisation.roles.models.uuid4", uuid_fn) + mocker.patch("gn3.auth.authorisation.checks.require_oauth.acquire", + conftest.get_tokeniser(user)) + conn, _group, _users = fxtr_users_in_group + with pytest.raises(AuthorisationError): + assert create_group_role( + conn, GROUP, "ResourceEditor", PRIVILEGES) == expected + +@pytest.mark.unit_test +def test_create_multiple_groups(mocker, fxtr_users): + """ + GIVEN: An authenticated user with appropriate authorisation + WHEN: The user attempts to create a new group, while being a member of an + existing group + THEN: The system should prevent that, and respond with an appropriate error + message + """ + mocker.patch("gn3.auth.authorisation.groups.models.uuid4", uuid_fn) + user = User( + UUID("ecb52977-3004-469e-9428-2a1856725c7f"), "group@lead.er", + "Group Leader") + mocker.patch("gn3.auth.authorisation.checks.require_oauth.acquire", + conftest.get_tokeniser(user)) + conn, _test_users = fxtr_users + # First time, successfully creates the group + assert create_group(conn, "a_test_group", user) == Group( + UUID("d32611e3-07fc-4564-b56c-786c6db6de2b"), "a_test_group", + {}) + # subsequent attempts should fail + with pytest.raises(AuthorisationError): + create_group(conn, "another_test_group", user) + +@pytest.mark.unit_test +@pytest.mark.parametrize( + "user,expected", + tuple(zip( + conftest.TEST_USERS, + (([Group(UUID("9988c21d-f02f-4d45-8966-22c968ac2fbf"), "TheTestGroup", {})] * 3) + + [Nothing])))) +def test_user_group(fxtr_users_in_group, user, expected): + """ + GIVEN: A bunch of registered users, some of whom are members of a group, and + others are not + WHEN: a particular user's group is requested, + THEN: return a Maybe containing the group that the user belongs to, or + Nothing + """ + conn, _group, _users = fxtr_users_in_group + assert ( + user_group(conn, user).maybe(Nothing, lambda val: val) + == expected) diff --git a/tests/unit/auth/test_migrations_add_data_to_table.py b/tests/unit/auth/test_migrations_add_data_to_table.py new file mode 100644 index 0000000..9cb5d0c --- /dev/null +++ b/tests/unit/auth/test_migrations_add_data_to_table.py @@ -0,0 +1,79 @@ +"""Test data insertion when migrations are run.""" +import pytest + +from gn3.auth import db +from gn3.migrations import get_migration, apply_migrations, rollback_migrations +from tests.unit.auth.conftest import ( + apply_single_migration, rollback_single_migration, migrations_up_to) + +test_params = ( + ("20221116_01_nKUmX-add-privileges-to-group-leader-role.py", + ("SELECT role_id, privilege_id FROM role_privileges " + "WHERE role_id=? AND privilege_id IN (?, ?, ?, ?)"), + ("a0e67630-d502-4b9f-b23f-6805d0f30e30", + "221660b1-df05-4be1-b639-f010269dbda9", + "7bcca363-cba9-4169-9e31-26bdc6179b28", + "5103cc68-96f8-4ebb-83a4-a31692402c9b", + "1c59eff5-9336-4ed2-a166-8f70d4cb012e"), + (("a0e67630-d502-4b9f-b23f-6805d0f30e30", + "221660b1-df05-4be1-b639-f010269dbda9"), + ("a0e67630-d502-4b9f-b23f-6805d0f30e30", + "7bcca363-cba9-4169-9e31-26bdc6179b28"), + ("a0e67630-d502-4b9f-b23f-6805d0f30e30", + "5103cc68-96f8-4ebb-83a4-a31692402c9b"), + ("a0e67630-d502-4b9f-b23f-6805d0f30e30", + "1c59eff5-9336-4ed2-a166-8f70d4cb012e"))),) + +@pytest.mark.unit_test +@pytest.mark.parametrize("migration_file,query,query_params,data", test_params) +def test_apply_insert(# pylint: disable=[too-many-arguments] + auth_migrations_dir, backend, auth_testdb_path, migration_file, query, + query_params, data): + """ + GIVEN: a database migration script + WHEN: the script is applied + THEN: ensure the given data exists in the table + """ + migration_path=f"{auth_migrations_dir}/{migration_file}" + older_migrations = migrations_up_to(migration_path, auth_migrations_dir) + the_migration = get_migration(migration_path) + apply_migrations(backend, older_migrations) + with db.connection(auth_testdb_path, None) as conn, db.cursor(conn) as cursor: + cursor.execute(query, query_params) + result_before_migration = cursor.fetchall() + apply_single_migration(backend, the_migration) + cursor.execute(query, query_params) + result_after_migration = cursor.fetchall() + + rollback_migrations(backend, older_migrations + [the_migration]) + assert len(result_before_migration) == 0, "Expected no results before migration" + assert sorted(result_after_migration) == sorted(data) + +@pytest.mark.unit_test +@pytest.mark.parametrize("migration_file,query,query_params,data", test_params) +def test_rollback_insert(# pylint: disable=[too-many-arguments] + auth_migrations_dir, backend, auth_testdb_path, migration_file, query, + query_params, data): + """ + GIVEN: a database migration script + WHEN: the script is rolled back + THEN: ensure the given data no longer exists in the database + """ + migration_path=f"{auth_migrations_dir}/{migration_file}" + older_migrations = migrations_up_to(migration_path, auth_migrations_dir) + the_migration = get_migration(migration_path) + apply_migrations(backend, older_migrations) + with db.connection(auth_testdb_path, None) as conn, db.cursor(conn) as cursor: + cursor.execute(query, query_params) + result_before_migration = cursor.fetchall() + apply_single_migration(backend, the_migration) + cursor.execute(query, query_params) + result_after_migration = cursor.fetchall() + rollback_single_migration(backend, the_migration) + cursor.execute(query, query_params) + result_after_rollback = cursor.fetchall() + + rollback_migrations(backend, older_migrations) + assert len(result_before_migration) == 0, "Expected no results before migration" + assert sorted(result_after_migration) == sorted(data) + assert len(result_after_rollback) == 0, "Expected no results after rollback" diff --git a/tests/unit/auth/test_migrations_add_remove_columns.py b/tests/unit/auth/test_migrations_add_remove_columns.py new file mode 100644 index 0000000..ea9bf7b --- /dev/null +++ b/tests/unit/auth/test_migrations_add_remove_columns.py @@ -0,0 +1,116 @@ +"""Test migrations that alter tables adding/removing columns.""" +import pytest + +from gn3.auth import db +from gn3.migrations import get_migration, apply_migrations, rollback_migrations +from tests.unit.auth.conftest import ( + apply_single_migration, rollback_single_migration, migrations_up_to) + +QUERY = "SELECT sql FROM sqlite_schema WHERE name=?" + +TEST_PARAMS = ( + ("20221109_01_HbD5F-add-resource-meta-field-to-resource-categories-field.py", + "resource_categories", "resource_meta TEXT", True), + (("20221110_08_23psB-add-privilege-category-and-privilege-description-" + "columns-to-privileges-table.py"), + "privileges", "privilege_category TEXT", True), + (("20221110_08_23psB-add-privilege-category-and-privilege-description-" + "columns-to-privileges-table.py"), + "privileges", "privilege_description TEXT", True), + ("20221117_01_RDlfx-modify-group-roles-add-group-role-id.py", "group_roles", + "group_role_id", True), + ("20221208_01_sSdHz-add-public-column-to-resources-table.py", "resources", + "public", True)) + +def found(haystack: str, needle: str) -> bool: + """Check whether `needle` is found in `haystack`""" + return any( + (line.strip().find(needle) >= 0) for line in haystack.split("\n")) + +def pristine_before_migration(adding: bool, result_str: str, column: str) -> bool: + """Check that database is pristine before running the migration""" + col_was_found = found(result_str, column) + if adding: + return not col_was_found + return col_was_found + +def applied_successfully(adding: bool, result_str: str, column: str) -> bool: + """Check that the migration ran successfully""" + col_was_found = found(result_str, column) + if adding: + return col_was_found + return not col_was_found + +def rolled_back_successfully(adding: bool, result_str: str, column: str) -> bool: + """Check that the migration ran successfully""" + col_was_found = found(result_str, column) + if adding: + return not col_was_found + return col_was_found + +@pytest.mark.unit_test +@pytest.mark.parametrize( + "migration_file,the_table,the_column,adding", TEST_PARAMS) +def test_apply_add_remove_column(# pylint: disable=[too-many-arguments] + auth_migrations_dir, auth_testdb_path, backend, migration_file, + the_table, the_column, adding): + """ + GIVEN: A migration that alters a table, adding or removing a column + WHEN: The migration is applied + THEN: Ensure the column exists if `adding` is True, otherwise, ensure the + column has been dropped + """ + migration_path = f"{auth_migrations_dir}/{migration_file}" + older_migrations = migrations_up_to(migration_path, auth_migrations_dir) + the_migration = get_migration(migration_path) + apply_migrations(backend, older_migrations) + with db.connection(auth_testdb_path) as conn, db.cursor(conn) as cursor: + cursor.execute(QUERY, (the_table,)) + results_before_migration = cursor.fetchone() + apply_single_migration(backend, the_migration) + cursor.execute(QUERY, (the_table,)) + results_after_migration = cursor.fetchone() + + rollback_migrations(backend, older_migrations + [the_migration]) + + assert pristine_before_migration( + adding, results_before_migration[0], the_column), ( + f"Column `{the_column}` exists before migration and should not" + if adding else + f"Column `{the_column}` doesn't exist before migration and it should") + assert applied_successfully( + adding, results_after_migration[0], the_column), "Migration failed" + +@pytest.mark.unit_test +@pytest.mark.parametrize( + "migration_file,the_table,the_column,adding", TEST_PARAMS) +def test_rollback_add_remove_column(# pylint: disable=[too-many-arguments] + auth_migrations_dir, auth_testdb_path, backend, migration_file, + the_table, the_column, adding): + """ + GIVEN: A migration that alters a table, adding or removing a column + WHEN: The migration is applied + THEN: Ensure the column is dropped if `adding` is True, otherwise, ensure + the column has been restored + """ + migration_path = f"{auth_migrations_dir}/{migration_file}" + older_migrations = migrations_up_to(migration_path, auth_migrations_dir) + the_migration = get_migration(migration_path) + apply_migrations(backend, older_migrations) + apply_single_migration(backend, the_migration) + with db.connection(auth_testdb_path) as conn, db.cursor(conn) as cursor: + cursor.execute(QUERY, (the_table,)) + results_before_rollback = cursor.fetchone() + rollback_single_migration(backend, the_migration) + cursor.execute(QUERY, (the_table,)) + results_after_rollback = cursor.fetchone() + + rollback_migrations(backend, older_migrations + [the_migration]) + + assert pristine_before_migration( + not adding, results_before_rollback[0], the_column), ( + f"Column `{the_column}` doesn't exist before rollback and it should" + if adding else + f"Column `{the_column}` exists before rollback and should not") + assert rolled_back_successfully( + adding, results_after_rollback[0], the_column), "Rollback failed" diff --git a/tests/unit/auth/test_migrations_create_tables.py b/tests/unit/auth/test_migrations_create_tables.py new file mode 100644 index 0000000..2b8140b --- /dev/null +++ b/tests/unit/auth/test_migrations_create_tables.py @@ -0,0 +1,91 @@ +"""Test migrations that create tables""" +import pytest + +from gn3.auth import db +from gn3.migrations import get_migration, apply_migrations, rollback_migrations +from tests.unit.auth.conftest import ( + apply_single_migration, rollback_single_migration, migrations_up_to) + +migrations_and_tables = ( + ("20221103_01_js9ub-initialise-the-auth-entic-oris-ation-database.py", + "users"), + ("20221103_02_sGrIs-create-user-credentials-table.py", "user_credentials"), + ("20221108_01_CoxYh-create-the-groups-table.py", "groups"), + ("20221108_02_wxTr9-create-privileges-table.py", "privileges"), + ("20221108_03_Pbhb1-create-resource-categories-table.py", "resource_categories"), + ("20221110_01_WtZ1I-create-resources-table.py", "resources"), + ("20221110_05_BaNtL-create-roles-table.py", "roles"), + ("20221110_06_Pq2kT-create-generic-roles-table.py", "generic_roles"), + ("20221110_07_7WGa1-create-role-privileges-table.py", "role_privileges"), + ("20221114_01_n8gsF-create-generic-role-privileges-table.py", + "generic_role_privileges"), + ("20221114_03_PtWjc-create-group-roles-table.py", "group_roles"), + ("20221114_05_hQun6-create-user-roles-table.py", "user_roles"), + ("20221117_02_fmuZh-create-group-users-table.py", "group_users"), + ("20221206_01_BbeF9-create-group-user-roles-on-resources-table.py", + "group_user_roles_on_resources"), + ("20221219_01_CI3tN-create-oauth2-clients-table.py", "oauth2_clients"), + ("20221219_02_buSEU-create-oauth2-tokens-table.py", "oauth2_tokens"), + ("20221219_03_PcTrb-create-authorisation-code-table.py", + "authorisation_code"), + ("20230207_01_r0bkZ-create-group-join-requests-table.py", + "group_join_requests"), + ("20230322_01_0dDZR-create-linked-phenotype-data-table.py", + "linked_phenotype_data"), + ("20230322_02_Ll854-create-phenotype-resources-table.py", + "phenotype_resources"), + ("20230404_01_VKxXg-create-linked-genotype-data-table.py", + "linked_genotype_data"), + ("20230404_02_la33P-create-genotype-resources-table.py", + "genotype_resources"), + ("20230410_01_8mwaf-create-linked-mrna-data-table.py", "linked_mrna_data"), + ("20230410_02_WZqSf-create-mrna-resources-table.py", "mrna_resources")) + +@pytest.mark.unit_test +@pytest.mark.parametrize("migration_file,the_table", migrations_and_tables) +def test_create_table( + auth_testdb_path, auth_migrations_dir, backend, migration_file, + the_table): + """ + GIVEN: A database migration script to create table, `the_table` + WHEN: The migration is applied + THEN: Ensure that the table `the_table` is created + """ + migration_path=f"{auth_migrations_dir}/{migration_file}" + older_migrations = migrations_up_to(migration_path, auth_migrations_dir) + apply_migrations(backend, older_migrations) + with db.connection(auth_testdb_path) as conn, db.cursor(conn) as cursor: + cursor.execute("SELECT name FROM sqlite_schema WHERE type='table'") + result_before_migration = cursor.fetchall() + apply_single_migration(backend, get_migration(migration_path)) + cursor.execute("SELECT name FROM sqlite_schema WHERE type='table'") + result_after_migration = cursor.fetchall() + + rollback_migrations(backend, older_migrations) + assert the_table not in [row[0] for row in result_before_migration] + assert the_table in [row[0] for row in result_after_migration] + +@pytest.mark.unit_test +@pytest.mark.parametrize("migration_file,the_table", migrations_and_tables) +def test_rollback_create_table( + auth_testdb_path, auth_migrations_dir, backend, migration_file, + the_table): + """ + GIVEN: A database migration script to create the table `the_table` + WHEN: The migration is rolled back + THEN: Ensure that the table `the_table` no longer exists + """ + migration_path=f"{auth_migrations_dir}/{migration_file}" + older_migrations = migrations_up_to(migration_path, auth_migrations_dir) + apply_migrations(backend, older_migrations) + with db.connection(auth_testdb_path) as conn, db.cursor(conn) as cursor: + apply_single_migration(backend, get_migration(migration_path)) + cursor.execute("SELECT name FROM sqlite_schema WHERE type='table'") + result_after_migration = cursor.fetchall() + rollback_single_migration(backend, get_migration(migration_path)) + cursor.execute("SELECT name FROM sqlite_schema WHERE type='table'") + result_after_rollback = cursor.fetchall() + + rollback_migrations(backend, older_migrations) + assert the_table in [row[0] for row in result_after_migration] + assert the_table not in [row[0] for row in result_after_rollback] diff --git a/tests/unit/auth/test_migrations_drop_tables.py b/tests/unit/auth/test_migrations_drop_tables.py new file mode 100644 index 0000000..2362c77 --- /dev/null +++ b/tests/unit/auth/test_migrations_drop_tables.py @@ -0,0 +1,63 @@ +"""Test migrations that create tables""" + +import pytest + +from gn3.auth import db +from gn3.migrations import get_migration, apply_migrations, rollback_migrations +from tests.unit.auth.conftest import ( + apply_single_migration, rollback_single_migration, migrations_up_to) + +test_params = ( + ("20221114_02_DKKjn-drop-generic-role-tables.py", "generic_roles"), + ("20221114_02_DKKjn-drop-generic-role-tables.py", "generic_role_privileges")) + +@pytest.mark.unit_test +@pytest.mark.parametrize("migration_file,the_table", test_params) +def test_drop_table( + auth_testdb_path, auth_migrations_dir, backend, + migration_file, the_table): + """ + GIVEN: A database migration script to create table, `the_table` + WHEN: The migration is applied + THEN: Ensure that the table `the_table` is created + """ + migration_path=f"{auth_migrations_dir}/{migration_file}" + older_migrations = migrations_up_to(migration_path, auth_migrations_dir) + the_migration = get_migration(migration_path) + apply_migrations(backend, older_migrations) + with db.connection(auth_testdb_path) as conn, db.cursor(conn) as cursor: + cursor.execute("SELECT name FROM sqlite_schema WHERE type='table'") + result_before_migration = cursor.fetchall() + apply_single_migration(backend, the_migration) + cursor.execute("SELECT name FROM sqlite_schema WHERE type='table'") + result_after_migration = cursor.fetchall() + + rollback_migrations(backend, older_migrations + [the_migration]) + assert the_table in [row[0] for row in result_before_migration] + assert the_table not in [row[0] for row in result_after_migration] + +@pytest.mark.unit_test +@pytest.mark.parametrize("migration_file,the_table", test_params) +def test_rollback_drop_table( + auth_testdb_path, auth_migrations_dir, backend, migration_file, + the_table): + """ + GIVEN: A database migration script to create the table `the_table` + WHEN: The migration is rolled back + THEN: Ensure that the table `the_table` no longer exists + """ + migration_path=f"{auth_migrations_dir}/{migration_file}" + older_migrations = migrations_up_to(migration_path, auth_migrations_dir) + the_migration = get_migration(migration_path) + apply_migrations(backend, older_migrations) + with db.connection(auth_testdb_path) as conn, db.cursor(conn) as cursor: + apply_single_migration(backend, the_migration) + cursor.execute("SELECT name FROM sqlite_schema WHERE type='table'") + result_after_migration = cursor.fetchall() + rollback_single_migration(backend, the_migration) + cursor.execute("SELECT name FROM sqlite_schema WHERE type='table'") + result_after_rollback = cursor.fetchall() + + rollback_migrations(backend, older_migrations) + assert the_table not in [row[0] for row in result_after_migration] + assert the_table in [row[0] for row in result_after_rollback] diff --git a/tests/unit/auth/test_migrations_indexes.py b/tests/unit/auth/test_migrations_indexes.py new file mode 100644 index 0000000..b1f06d9 --- /dev/null +++ b/tests/unit/auth/test_migrations_indexes.py @@ -0,0 +1,97 @@ +"""Test that indexes are created and removed.""" +import pytest + +from gn3.auth import db +from gn3.migrations import get_migration, apply_migrations, rollback_migrations +from tests.unit.auth.conftest import ( + apply_single_migration, rollback_single_migration, migrations_up_to) + +QUERY = """ +SELECT name FROM sqlite_master WHERE type='index' AND tbl_name = ? +AND name= ? +""" + +migrations_tables_and_indexes = ( + ("20221110_07_7WGa1-create-role-privileges-table.py", "role_privileges", + "idx_tbl_role_privileges_cols_role_id"), + ("20221114_01_n8gsF-create-generic-role-privileges-table.py", + "generic_role_privileges", + "idx_tbl_generic_role_privileges_cols_generic_role_id"), + ("20221114_03_PtWjc-create-group-roles-table.py", "group_roles", + "idx_tbl_group_roles_cols_group_id"), + ("20221114_05_hQun6-create-user-roles-table.py", "user_roles", + "idx_tbl_user_roles_cols_user_id"), + ("20221117_02_fmuZh-create-group-users-table.py", "group_users", + "tbl_group_users_cols_group_id"), + ("20221206_01_BbeF9-create-group-user-roles-on-resources-table.py", + "group_user_roles_on_resources", + "idx_tbl_group_user_roles_on_resources_group_user_resource")) + +@pytest.mark.unit_test +@pytest.mark.parametrize( + "migration_file,the_table,the_index", migrations_tables_and_indexes) +def test_index_created(# pylint: disable=[too-many-arguments] + auth_testdb_path, auth_migrations_dir, backend, migration_file, + the_table, the_index): + """ + GIVEN: A database migration + WHEN: The migration is applied + THEN: Ensure the given index is created for the provided table + """ + migration_path=f"{auth_migrations_dir}/{migration_file}" + older_migrations = migrations_up_to(migration_path, auth_migrations_dir) + the_migration = get_migration(migration_path) + query_params = (the_table, the_index) + apply_migrations(backend, older_migrations) + with db.connection(auth_testdb_path) as conn, db.cursor(conn) as cursor: + cursor.execute(QUERY, query_params) + result_before_migration = cursor.fetchall() + apply_single_migration(backend, the_migration) + cursor.execute(QUERY, query_params) + result_after_migration = cursor.fetchall() + + rollback_migrations(backend, older_migrations + [the_migration]) + assert the_index not in [row[0] for row in result_before_migration], ( + f"Index '{the_index}' was found for table '{the_table}' before migration.") + assert ( + len(result_after_migration) == 1 + and result_after_migration[0][0] == the_index), ( + f"Index '{the_index}' was not found for table '{the_table}' after migration.") + +@pytest.mark.unit_test +@pytest.mark.parametrize( + "migration_file,the_table,the_index", migrations_tables_and_indexes) +def test_index_dropped(# pylint: disable=[too-many-arguments] + auth_testdb_path, auth_migrations_dir, backend, migration_file, + the_table, the_index): + """ + GIVEN: A database migration + WHEN: The migration is rolled-back + THEN: Ensure the given index no longer exists for the given table + """ + migration_path=f"{auth_migrations_dir}/{migration_file}" + older_migrations = migrations_up_to(migration_path, auth_migrations_dir) + the_migration = get_migration(migration_path) + query_params = (the_table, the_index) + apply_migrations(backend, older_migrations) + with db.connection(auth_testdb_path) as conn, db.cursor(conn) as cursor: + cursor.execute(QUERY, query_params) + result_before_migration = cursor.fetchall() + apply_single_migration(backend, the_migration) + cursor.execute(QUERY, query_params) + result_after_migration = cursor.fetchall() + rollback_single_migration(backend, the_migration) + cursor.execute(QUERY, query_params) + result_after_rollback = cursor.fetchall() + + rollback_migrations(backend, older_migrations) + assert the_index not in [row[0] for row in result_before_migration], ( + f"Index '{the_index}' was found for table '{the_table}' before " + "migration") + assert ( + len(result_after_migration) == 1 + and result_after_migration[0][0] == the_index), ( + f"Index '{the_index}' was not found for table '{the_table}' after migration.") + assert the_index not in [row[0] for row in result_after_rollback], ( + f"Index '{the_index}' was found for table '{the_table}' after " + "rollback") diff --git a/tests/unit/auth/test_migrations_init_data_in_resource_categories_table.py b/tests/unit/auth/test_migrations_init_data_in_resource_categories_table.py new file mode 100644 index 0000000..dd3d4c6 --- /dev/null +++ b/tests/unit/auth/test_migrations_init_data_in_resource_categories_table.py @@ -0,0 +1,60 @@ +""" +Test that the `resource_categories` table is initialised with the startup data. +""" +import pytest + +from gn3.auth import db +from gn3.migrations import get_migration, apply_migrations, rollback_migrations +from tests.unit.auth.conftest import ( + apply_single_migration, rollback_single_migration, migrations_up_to) + +MIGRATION_PATH = "migrations/auth/20221108_04_CKcSL-init-data-in-resource-categories-table.py" + +@pytest.mark.unit_test +def test_apply_init_data(auth_testdb_path, auth_migrations_dir, backend): + """ + GIVEN: A migration script + WHEN: The migration is applied + THEN: Verify that the expected data exists in the table + """ + older_migrations = migrations_up_to(MIGRATION_PATH, auth_migrations_dir) + the_migration = get_migration(MIGRATION_PATH) + apply_migrations(backend, older_migrations) + with db.connection(auth_testdb_path, None) as conn, db.cursor(conn) as cursor: + cursor.execute("SELECT * FROM resource_categories") + assert len(cursor.fetchall()) == 0, "Expected empty table." + apply_single_migration(backend, the_migration) + cursor.execute("SELECT * FROM resource_categories") + results = cursor.fetchall() + assert len(results) == 3, "Expected 3 rows of data." + assert sorted(results) == sorted(( + ('fad071a3-2fc8-40b8-992b-cdefe7dcac79', 'mrna', 'mRNA Dataset'), + ('548d684b-d4d1-46fb-a6d3-51a56b7da1b3', 'phenotype', + 'Phenotype (Publish) Dataset'), + ('48056f84-a2a6-41ac-8319-0e1e212cba2a', 'genotype', + 'Genotype Dataset'))) + + rollback_migrations(backend, older_migrations + [the_migration]) + +@pytest.mark.unit_test +def test_rollback_init_data(auth_testdb_path, auth_migrations_dir, backend): + """ + GIVEN: A migration script + WHEN: The migration is rolled back + THEN: Verify that the table is empty + """ + older_migrations = migrations_up_to(MIGRATION_PATH, auth_migrations_dir) + the_migration = get_migration(MIGRATION_PATH) + apply_migrations(backend, older_migrations) + with db.connection(auth_testdb_path, None) as conn, db.cursor(conn) as cursor: + cursor.execute("SELECT * FROM resource_categories") + assert len(cursor.fetchall()) == 0, "Expected empty table." + apply_single_migration(backend, the_migration) + cursor.execute("SELECT * FROM resource_categories") + results = cursor.fetchall() + assert len(results) == 3, "Expected 3 rows of data." + rollback_single_migration(backend, the_migration) + cursor.execute("SELECT * FROM resource_categories") + assert len(cursor.fetchall()) == 0, "Expected empty table." + + rollback_migrations(backend, older_migrations) diff --git a/tests/unit/auth/test_migrations_insert_data_into_empty_table.py b/tests/unit/auth/test_migrations_insert_data_into_empty_table.py new file mode 100644 index 0000000..ebb7fa6 --- /dev/null +++ b/tests/unit/auth/test_migrations_insert_data_into_empty_table.py @@ -0,0 +1,77 @@ +"""Test data insertion when migrations are run.""" +import sqlite3 +from contextlib import closing + +import pytest + +from gn3.migrations import get_migration, apply_migrations, rollback_migrations +from tests.unit.auth.conftest import ( + apply_single_migration, rollback_single_migration, migrations_up_to) + +test_params = ( + ("20221113_01_7M0hv-enumerate-initial-privileges.py", "privileges", 19), + ("20221114_04_tLUzB-initialise-basic-roles.py", "roles", 2), + ("20221114_04_tLUzB-initialise-basic-roles.py", "role_privileges", 15)) + +@pytest.mark.unit_test +@pytest.mark.parametrize( + "migration_file,table,row_count", test_params) +def test_apply_insert(# pylint: disable=[too-many-arguments] + auth_testdb_path, auth_migrations_dir, backend, migration_file, + table, row_count): + """ + GIVEN: A database migration + WHEN: The migration is applied + THEN: Ensure the given number of rows are inserted into the table + """ + migration_path=f"{auth_migrations_dir}/{migration_file}" + older_migrations = migrations_up_to(migration_path, auth_migrations_dir) + the_migration = get_migration(migration_path) + apply_migrations(backend, older_migrations) + with closing(sqlite3.connect(auth_testdb_path)) as conn, closing(conn.cursor()) as cursor: + query = f"SELECT COUNT(*) FROM {table}" + cursor.execute(query) + result_before_migration = cursor.fetchall() + apply_single_migration(backend, the_migration) + cursor.execute(query) + result_after_migration = cursor.fetchall() + + rollback_migrations(backend, older_migrations+[the_migration]) + assert result_before_migration[0][0] == 0, ( + "Expected empty table before initialisation") + assert result_after_migration[0][0] == row_count, ( + f"Expected {row_count} rows") + +@pytest.mark.unit_test +@pytest.mark.parametrize( + "migration_file,table,row_count", test_params) +def test_rollback_insert(# pylint: disable=[too-many-arguments] + auth_testdb_path, auth_migrations_dir, backend, migration_file, + table, row_count): + """ + GIVEN: A database migration + WHEN: The migration is applied + THEN: Ensure the given number of rows are inserted into the table + """ + migration_path=f"{auth_migrations_dir}/{migration_file}" + older_migrations = migrations_up_to(migration_path, auth_migrations_dir) + the_migration = get_migration(migration_path) + apply_migrations(backend, older_migrations) + with closing(sqlite3.connect(auth_testdb_path)) as conn, closing(conn.cursor()) as cursor: + query = f"SELECT COUNT(*) FROM {table}" + cursor.execute(query) + result_before_migration = cursor.fetchall() + apply_single_migration(backend, the_migration) + cursor.execute(query) + result_after_migration = cursor.fetchall() + rollback_single_migration(backend, the_migration) + cursor.execute(query) + result_after_rollback = cursor.fetchall() + + rollback_migrations(backend, older_migrations) + assert result_before_migration[0][0] == 0, ( + "Expected empty table before initialisation") + assert result_after_migration[0][0] == row_count, ( + f"Expected {row_count} rows") + assert result_after_rollback[0][0] == 0, ( + "Expected empty table after rollback") diff --git a/tests/unit/auth/test_privileges.py b/tests/unit/auth/test_privileges.py new file mode 100644 index 0000000..8395293 --- /dev/null +++ b/tests/unit/auth/test_privileges.py @@ -0,0 +1,46 @@ +"""Test the privileges module""" +import pytest + +from gn3.auth import db +from gn3.auth.authorisation.privileges import Privilege, user_privileges + +from tests.unit.auth import conftest + +SORT_KEY = lambda x: x.privilege_id + +PRIVILEGES = sorted( + (Privilege("system:group:create-group", "Create a group"), + Privilege("system:group:view-group", "View the details of a group"), + Privilege("system:group:edit-group", "Edit the details of a group"), + Privilege("system:user:list", "List users in the system"), + Privilege("system:group:delete-group", "Delete a group"), + Privilege("group:user:add-group-member", "Add a user to a group"), + Privilege("group:user:remove-group-member", "Remove a user from a group"), + Privilege("system:group:transfer-group-leader", + "Transfer leadership of the group to some other member"), + + Privilege("group:resource:create-resource", "Create a resource object"), + Privilege("group:resource:view-resource", + "view a resource and use it in computations"), + Privilege("group:resource:edit-resource", "edit/update a resource"), + Privilege("group:resource:delete-resource", "Delete a resource"), + + Privilege("group:role:create-role", "Create a new role"), + Privilege("group:role:edit-role", "edit/update an existing role"), + Privilege("group:user:assign-role", "Assign a role to an existing user"), + Privilege("group:role:delete-role", "Delete an existing role")), + key=SORT_KEY) + +@pytest.mark.unit_test +@pytest.mark.parametrize( + "user,expected", tuple(zip( + conftest.TEST_USERS, (PRIVILEGES, [], [], [], [])))) +def test_user_privileges(auth_testdb_path, fxtr_users, user, expected):# pylint: disable=[unused-argument] + """ + GIVEN: A user + WHEN: An attempt is made to fetch the user's privileges + THEN: Ensure only + """ + with db.connection(auth_testdb_path) as conn: + assert sorted( + user_privileges(conn, user), key=SORT_KEY) == expected diff --git a/tests/unit/auth/test_resources.py b/tests/unit/auth/test_resources.py new file mode 100644 index 0000000..2884add --- /dev/null +++ b/tests/unit/auth/test_resources.py @@ -0,0 +1,117 @@ +"""Test resource-management functions""" +import uuid + +import pytest + +from gn3.auth import db + +from gn3.auth.authorisation.groups import Group +from gn3.auth.authorisation.errors import AuthorisationError +from gn3.auth.authorisation.resources.models import ( + Resource, user_resources, create_resource, ResourceCategory, + public_resources) + +from tests.unit.auth import conftest + +group = Group(uuid.UUID("9988c21d-f02f-4d45-8966-22c968ac2fbf"), "TheTestGroup", + {}) +resource_category = ResourceCategory( + uuid.UUID("fad071a3-2fc8-40b8-992b-cdefe7dcac79"), "mrna", "mRNA Dataset") +create_resource_failure = { + "status": "error", + "message": "Unauthorised: Could not create resource" +} +uuid_fn = lambda : uuid.UUID("d32611e3-07fc-4564-b56c-786c6db6de2b") + +@pytest.mark.unit_test +@pytest.mark.parametrize( + "user,expected", + tuple(zip( + conftest.TEST_USERS[0:1], + (Resource( + group, uuid.UUID("d32611e3-07fc-4564-b56c-786c6db6de2b"), + "test_resource", resource_category, False),)))) +def test_create_resource(mocker, fxtr_users_in_group, user, expected): + """Test that resource creation works as expected.""" + mocker.patch("gn3.auth.authorisation.resources.models.uuid4", uuid_fn) + mocker.patch("gn3.auth.authorisation.checks.require_oauth.acquire", + conftest.get_tokeniser(user)) + conn, _group, _users = fxtr_users_in_group + resource = create_resource( + conn, "test_resource", resource_category, user, False) + assert resource == expected + + with db.cursor(conn) as cursor: + # Cleanup + cursor.execute( + "DELETE FROM group_user_roles_on_resources WHERE resource_id=?", + (str(resource.resource_id),)) + cursor.execute( + "DELETE FROM group_roles WHERE group_id=?", + (str(resource.group.group_id),)) + cursor.execute( + "DELETE FROM resources WHERE resource_id=?", + (str(resource.resource_id),)) + +@pytest.mark.unit_test +@pytest.mark.parametrize( + "user,expected", + tuple(zip( + conftest.TEST_USERS[1:], + (create_resource_failure, create_resource_failure, + create_resource_failure)))) +def test_create_resource_raises_for_unauthorised_users( + mocker, fxtr_users_in_group, user, expected): + """Test that resource creation works as expected.""" + mocker.patch("gn3.auth.authorisation.resources.models.uuid4", uuid_fn) + mocker.patch("gn3.auth.authorisation.checks.require_oauth.acquire", + conftest.get_tokeniser(user)) + conn, _group, _users = fxtr_users_in_group + with pytest.raises(AuthorisationError): + assert create_resource( + conn, "test_resource", resource_category, user, False) == expected + +SORTKEY = lambda resource: resource.resource_id + +@pytest.mark.unit_test +def test_public_resources(fxtr_resources): + """ + GIVEN: some resources in the database + WHEN: public resources are requested + THEN: only list the resources that are public + """ + conn, _res = fxtr_resources + assert sorted(public_resources(conn), key=SORTKEY) == sorted(tuple( + res for res in conftest.TEST_RESOURCES if res.public), key=SORTKEY) + +PUBLIC_RESOURCES = sorted( + {res.resource_id: res for res in conftest.TEST_RESOURCES_PUBLIC}.values(), + key=SORTKEY) + +@pytest.mark.unit_test +@pytest.mark.parametrize( + "user,expected", + tuple(zip( + conftest.TEST_USERS, + (sorted( + {res.resource_id: res for res in + (conftest.TEST_RESOURCES_GROUP_01 + + conftest.TEST_RESOURCES_PUBLIC)}.values(), + key=SORTKEY), + sorted( + {res.resource_id: res for res in + ((conftest.TEST_RESOURCES_GROUP_01[1],) + + conftest.TEST_RESOURCES_PUBLIC)}.values() + , + key=SORTKEY), + PUBLIC_RESOURCES, PUBLIC_RESOURCES)))) +def test_user_resources(fxtr_group_user_roles, user, expected): + """ + GIVEN: some resources in the database + WHEN: a particular user's resources are requested + THEN: list only the resources for which the user can access + """ + conn, *_others = fxtr_group_user_roles + assert sorted( + {res.resource_id: res for res in user_resources(conn, user) + }.values(), key=SORTKEY) == expected diff --git a/tests/unit/auth/test_roles.py b/tests/unit/auth/test_roles.py new file mode 100644 index 0000000..02fd9f7 --- /dev/null +++ b/tests/unit/auth/test_roles.py @@ -0,0 +1,123 @@ +"""Test functions dealing with group management.""" +import uuid + +import pytest + +from gn3.auth import db +from gn3.auth.authorisation.privileges import Privilege +from gn3.auth.authorisation.errors import AuthorisationError +from gn3.auth.authorisation.roles.models import Role, user_roles, create_role + +from tests.unit.auth import conftest +from tests.unit.auth.fixtures import TEST_USERS + +create_role_failure = { + "status": "error", + "message": "Unauthorised: Could not create role" +} + +uuid_fn = lambda : uuid.UUID("d32611e3-07fc-4564-b56c-786c6db6de2b") + +PRIVILEGES = ( + Privilege("group:resource:view-resource", + "view a resource and use it in computations"), + Privilege("group:resource:edit-resource", "edit/update a resource")) + +@pytest.mark.unit_test +@pytest.mark.parametrize( + "user,expected", tuple(zip(conftest.TEST_USERS[0:1], ( + Role(uuid.UUID("d32611e3-07fc-4564-b56c-786c6db6de2b"), "a_test_role", + True, PRIVILEGES),)))) +def test_create_role(# pylint: disable=[too-many-arguments] + fxtr_app, auth_testdb_path, mocker, fxtr_users, user, expected):# pylint: disable=[unused-argument] + """ + GIVEN: an authenticated user + WHEN: the user attempts to create a role + THEN: verify they are only able to create the role if they have the + appropriate privileges + """ + mocker.patch("gn3.auth.authorisation.roles.models.uuid4", uuid_fn) + mocker.patch("gn3.auth.authorisation.checks.require_oauth.acquire", + conftest.get_tokeniser(user)) + with db.connection(auth_testdb_path) as conn, db.cursor(conn) as cursor: + the_role = create_role(cursor, "a_test_role", PRIVILEGES) + assert the_role == expected + +@pytest.mark.unit_test +@pytest.mark.parametrize( + "user,expected", tuple(zip(conftest.TEST_USERS[1:], ( + create_role_failure, create_role_failure, create_role_failure)))) +def test_create_role_raises_exception_for_unauthorised_users(# pylint: disable=[too-many-arguments] + fxtr_app, auth_testdb_path, mocker, fxtr_users, user, expected):# pylint: disable=[unused-argument] + """ + GIVEN: an authenticated user + WHEN: the user attempts to create a role + THEN: verify they are only able to create the role if they have the + appropriate privileges + """ + mocker.patch("gn3.auth.authorisation.roles.models.uuid4", uuid_fn) + mocker.patch("gn3.auth.authorisation.checks.require_oauth.acquire", + conftest.get_tokeniser(user)) + with db.connection(auth_testdb_path) as conn, db.cursor(conn) as cursor: + with pytest.raises(AuthorisationError): + create_role(cursor, "a_test_role", PRIVILEGES) + +@pytest.mark.unit_test +@pytest.mark.parametrize( + "user,expected", + (zip(TEST_USERS, + ((Role( + role_id=uuid.UUID('a0e67630-d502-4b9f-b23f-6805d0f30e30'), + role_name='group-leader', user_editable=False, + privileges=( + Privilege(privilege_id='group:resource:create-resource', + privilege_description='Create a resource object'), + Privilege(privilege_id='group:resource:delete-resource', + privilege_description='Delete a resource'), + Privilege(privilege_id='group:resource:edit-resource', + privilege_description='edit/update a resource'), + Privilege( + privilege_id='group:resource:view-resource', + privilege_description=( + 'view a resource and use it in computations')), + Privilege(privilege_id='group:role:create-role', + privilege_description='Create a new role'), + Privilege(privilege_id='group:role:delete-role', + privilege_description='Delete an existing role'), + Privilege(privilege_id='group:role:edit-role', + privilege_description='edit/update an existing role'), + Privilege(privilege_id='group:user:add-group-member', + privilege_description='Add a user to a group'), + Privilege(privilege_id='group:user:assign-role', + privilege_description=( + 'Assign a role to an existing user')), + Privilege(privilege_id='group:user:remove-group-member', + privilege_description='Remove a user from a group'), + Privilege(privilege_id='system:group:delete-group', + privilege_description='Delete a group'), + Privilege(privilege_id='system:group:edit-group', + privilege_description='Edit the details of a group'), + Privilege( + privilege_id='system:group:transfer-group-leader', + privilege_description=( + 'Transfer leadership of the group to some other ' + 'member')), + Privilege(privilege_id='system:group:view-group', + privilege_description='View the details of a group'), + Privilege(privilege_id='system:user:list', + privilege_description='List users in the system'))), + Role( + role_id=uuid.UUID("ade7e6b0-ba9c-4b51-87d0-2af7fe39a347"), + role_name="group-creator", user_editable=False, + privileges=( + Privilege(privilege_id='system:group:create-group', + privilege_description = "Create a group"),))), + tuple(), tuple(), tuple())))) +def test_user_roles(fxtr_group_user_roles, user, expected): + """ + GIVEN: an authenticated user + WHEN: we request the user's privileges + THEN: return **ALL** the privileges attached to the user + """ + conn, *_others = fxtr_group_user_roles + assert user_roles(conn, user) == expected diff --git a/tests/unit/auth/test_token.py b/tests/unit/auth/test_token.py new file mode 100644 index 0000000..76316ea --- /dev/null +++ b/tests/unit/auth/test_token.py @@ -0,0 +1,62 @@ +"""Test the OAuth2 authorisation""" + +import pytest + +from gn3.auth import db + +SUCCESS_RESULT = { + "status_code": 200, + "result": { + "access_token": "123456ABCDE", + "expires_in": 864000, + "scope": "profile", + "token_type": "Bearer"}} + +USERNAME_PASSWORD_FAIL_RESULT = { + "status_code": 400, + "result": { + 'error': 'invalid_request', + 'error_description': 'Invalid "username" or "password" in request.'}} + +def gen_token(client, grant_type, user, scope): # pylint: disable=[unused-argument] + """Generate tokens for tests""" + return "123456ABCDE" + +@pytest.mark.unit_test +@pytest.mark.parametrize( + "test_data,expected", + ((("group@lead.er", "password_for_user_001", 0), SUCCESS_RESULT), + (("group@mem.ber01", "password_for_user_002", 1), SUCCESS_RESULT), + (("group@mem.ber02", "password_for_user_003", 2), SUCCESS_RESULT), + (("unaff@iliated.user", "password_for_user_004", 3), SUCCESS_RESULT), + (("group@lead.er", "brrr", 0), USERNAME_PASSWORD_FAIL_RESULT), + (("group@mem.ber010", "password_for_user_002", 1), USERNAME_PASSWORD_FAIL_RESULT), + (("papa", "yada", 2), USERNAME_PASSWORD_FAIL_RESULT), + # (("unaff@iliated.user", "password_for_user_004", 1), USERNAME_PASSWORD_FAIL_RESULT) + )) +def test_token(fxtr_app, fxtr_oauth2_clients, test_data, expected): + """ + GIVEN: a registered oauth2 client, a user + WHEN: a token is requested via the 'password' grant + THEN: check that: + a) when email and password are valid, we get a token back + b) when either email or password or both are invalid, we get error message + back + c) TODO: when user tries to use wrong client, we get error message back + """ + conn, oa2clients = fxtr_oauth2_clients + email, password, client_idx = test_data + data = { + "grant_type": "password", "scope": "profile nonexistent-scope", + "client_id": oa2clients[client_idx].client_id, + "client_secret": oa2clients[client_idx].client_secret, + "username": email, "password": password} + + with fxtr_app.test_client() as client, db.cursor(conn) as cursor: + res = client.post("/api/oauth2/token", data=data) + # cleanup db + cursor.execute("DELETE FROM oauth2_tokens WHERE access_token=?", + (gen_token(None, None, None, None),)) + assert res.status_code == expected["status_code"] + for key in expected["result"]: + assert res.json[key] == expected["result"][key] diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py new file mode 100644 index 0000000..8005c8e --- /dev/null +++ b/tests/unit/conftest.py @@ -0,0 +1,35 @@ +"""Fixtures for unit tests.""" +from pathlib import Path +from datetime import datetime +from tempfile import TemporaryDirectory + +import pytest + +from gn3.app import create_app + +@pytest.fixture(scope="session") +def fxtr_app(): + """Fixture: setup the test app""" + # Do some setup + with TemporaryDirectory() as testdir: + testdb = Path(testdir).joinpath( + f'testdb_{datetime.now().strftime("%Y%m%dT%H%M%S")}') + app = create_app({ + "TESTING": True, "AUTH_DB": testdb, + "OAUTH2_ACCESS_TOKEN_GENERATOR": "tests.unit.auth.test_token.gen_token" + }) + app.testing = True + yield app + # Clean up after ourselves + testdb.unlink(missing_ok=True) + +@pytest.fixture(scope="session") +def client(fxtr_app): # pylint: disable=redefined-outer-name + """Create a test client fixture for tests""" + with fxtr_app.app_context(): + yield fxtr_app.test_client() + +@pytest.fixture(scope="session") +def fxtr_app_config(client): # pylint: disable=redefined-outer-name + """Return the test application's configuration object""" + return client.application.config -- cgit v1.2.3