about summary refs log tree commit diff
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/unit/auth/fixtures/__init__.py1
-rw-r--r--tests/unit/auth/fixtures/oauth2_client_fixtures.py44
-rw-r--r--tests/unit/auth/fixtures/user_fixtures.py23
-rw-r--r--tests/unit/auth/test_migrations_add_data_to_table.py4
-rw-r--r--tests/unit/auth/test_migrations_init_data_in_resource_categories_table.py4
-rw-r--r--tests/unit/auth/test_token.py57
6 files changed, 129 insertions, 4 deletions
diff --git a/tests/unit/auth/fixtures/__init__.py b/tests/unit/auth/fixtures/__init__.py
index 7adae3f..a675fc7 100644
--- a/tests/unit/auth/fixtures/__init__.py
+++ b/tests/unit/auth/fixtures/__init__.py
@@ -5,3 +5,4 @@ from .group_fixtures import *
 from .resource_fixtures import *
 # from .privilege_fixtures import *
 from .migration_fixtures import *
+from .oauth2_client_fixtures import *
diff --git a/tests/unit/auth/fixtures/oauth2_client_fixtures.py b/tests/unit/auth/fixtures/oauth2_client_fixtures.py
new file mode 100644
index 0000000..751eadd
--- /dev/null
+++ b/tests/unit/auth/fixtures/oauth2_client_fixtures.py
@@ -0,0 +1,44 @@
+"""Fixtures for OAuth2 clients"""
+import uuid
+import json
+import datetime
+
+import pytest
+
+from gn3.auth import db
+from gn3.auth.authentication.oauth2.models.oauth2client import OAuth2Client
+
+@pytest.fixture
+def fixture_oauth2_clients(fixture_users_with_passwords):
+    """Fixture: Create the OAuth2 clients for use with tests."""
+    conn, users = fixture_users_with_passwords
+    now = datetime.datetime.now()
+
+    clients = tuple(
+        OAuth2Client(str(uuid.uuid4()), f"yabadabadoo_{idx:03}", now,
+         now + datetime.timedelta(hours = 2),
+         {
+             "client_name": f"test_client_{idx:03}",
+             "scope": ["user", "profile"],
+             "redirect_uri": "/test_oauth2",
+             "token_endpoint_auth_method": [
+                 "client_secret_post", "client_secret_basic"],
+             "grant_types": ["password"]
+         }, user)
+        for idx, user  in enumerate(users, start=1))
+
+    with db.cursor(conn) as cursor:
+        cursor.executemany(
+            "INSERT INTO oauth2_clients VALUES (?, ?, ?, ?, ?, ?)",
+            ((str(client.client_id), client.client_secret,
+              int(client.client_id_issued_at.timestamp()),
+              int(client.client_secret_expires_at.timestamp()),
+              json.dumps(client.client_metadata), str(client.user.user_id))
+            for client in clients))
+
+    yield conn, clients
+
+    with db.cursor(conn) as cursor:
+        cursor.executemany(
+            "DELETE FROM oauth2_clients WHERE client_id=?",
+            ((str(client.client_id),) for client in clients))
diff --git a/tests/unit/auth/fixtures/user_fixtures.py b/tests/unit/auth/fixtures/user_fixtures.py
index cc43a74..843d575 100644
--- a/tests/unit/auth/fixtures/user_fixtures.py
+++ b/tests/unit/auth/fixtures/user_fixtures.py
@@ -2,6 +2,7 @@
 import uuid
 
 import pytest
+import bcrypt
 
 from gn3.auth import db
 from gn3.auth.authentication.users import User
@@ -41,3 +42,25 @@ def test_users(conn_after_auth_migrations):# pylint: disable=[redefined-outer-na
              ("21351b66-8aad-475b-84ac-53ce528451e3",),
              ("ae9c6245-0966-41a5-9a5e-20885a96bea7",),
              ("9a0c7ce5-2f40-4e78-979e-bf3527a59579",)))
+
+@pytest.fixture(scope="function")
+def fixture_users_with_passwords(test_users): # pylint: disable=[redefined-outer-name]
+    """Fixture: add passwords to the users"""
+    conn, users = test_users
+    user_passwords_params = tuple(
+        (str(user.user_id), bcrypt.hashpw(
+            f"password_for_user_{idx:03}".encode("utf8"),
+            bcrypt.gensalt()))
+        for idx, user in enumerate(users, start=1))
+
+    with db.cursor(conn) as cursor:
+        cursor.executemany(
+            "INSERT INTO user_credentials VALUES (?, ?)",
+            user_passwords_params)
+
+    yield conn, users
+
+    with db.cursor(conn) as cursor:
+        cursor.executemany(
+            "DELETE FROM user_credentials WHERE user_id=?",
+            ((item[0],) for item in user_passwords_params))
diff --git a/tests/unit/auth/test_migrations_add_data_to_table.py b/tests/unit/auth/test_migrations_add_data_to_table.py
index acd1f6f..9cb5d0c 100644
--- a/tests/unit/auth/test_migrations_add_data_to_table.py
+++ b/tests/unit/auth/test_migrations_add_data_to_table.py
@@ -38,7 +38,7 @@ def test_apply_insert(# pylint: disable=[too-many-arguments]
     older_migrations = migrations_up_to(migration_path, auth_migrations_dir)
     the_migration = get_migration(migration_path)
     apply_migrations(backend, older_migrations)
-    with db.connection(auth_testdb_path) as conn, db.cursor(conn) as cursor:
+    with db.connection(auth_testdb_path, None) as conn, db.cursor(conn) as cursor:
         cursor.execute(query, query_params)
         result_before_migration = cursor.fetchall()
         apply_single_migration(backend, the_migration)
@@ -63,7 +63,7 @@ def test_rollback_insert(# pylint: disable=[too-many-arguments]
     older_migrations = migrations_up_to(migration_path, auth_migrations_dir)
     the_migration = get_migration(migration_path)
     apply_migrations(backend, older_migrations)
-    with db.connection(auth_testdb_path) as conn, db.cursor(conn) as cursor:
+    with db.connection(auth_testdb_path, None) as conn, db.cursor(conn) as cursor:
         cursor.execute(query, query_params)
         result_before_migration = cursor.fetchall()
         apply_single_migration(backend, the_migration)
diff --git a/tests/unit/auth/test_migrations_init_data_in_resource_categories_table.py b/tests/unit/auth/test_migrations_init_data_in_resource_categories_table.py
index 0e78823..dd3d4c6 100644
--- a/tests/unit/auth/test_migrations_init_data_in_resource_categories_table.py
+++ b/tests/unit/auth/test_migrations_init_data_in_resource_categories_table.py
@@ -20,7 +20,7 @@ def test_apply_init_data(auth_testdb_path, auth_migrations_dir, backend):
     older_migrations = migrations_up_to(MIGRATION_PATH, auth_migrations_dir)
     the_migration = get_migration(MIGRATION_PATH)
     apply_migrations(backend, older_migrations)
-    with db.connection(auth_testdb_path) as conn, db.cursor(conn) as cursor:
+    with db.connection(auth_testdb_path, None) as conn, db.cursor(conn) as cursor:
         cursor.execute("SELECT * FROM resource_categories")
         assert len(cursor.fetchall()) == 0, "Expected empty table."
         apply_single_migration(backend, the_migration)
@@ -46,7 +46,7 @@ def test_rollback_init_data(auth_testdb_path, auth_migrations_dir, backend):
     older_migrations = migrations_up_to(MIGRATION_PATH, auth_migrations_dir)
     the_migration = get_migration(MIGRATION_PATH)
     apply_migrations(backend, older_migrations)
-    with db.connection(auth_testdb_path) as conn, db.cursor(conn) as cursor:
+    with db.connection(auth_testdb_path, None) as conn, db.cursor(conn) as cursor:
         cursor.execute("SELECT * FROM resource_categories")
         assert len(cursor.fetchall()) == 0, "Expected empty table."
         apply_single_migration(backend, the_migration)
diff --git a/tests/unit/auth/test_token.py b/tests/unit/auth/test_token.py
new file mode 100644
index 0000000..edf4b19
--- /dev/null
+++ b/tests/unit/auth/test_token.py
@@ -0,0 +1,57 @@
+"""Test the OAuth2 authorisation"""
+
+import pytest
+
+SUCCESS_RESULT = {
+    "status_code": 200,
+    "result": {
+        "access_token": "123456ABCDE",
+        "expires_in": 864000,
+        "scope": "profile",
+        "token_type": "Bearer"}}
+
+USERNAME_PASSWORD_FAIL_RESULT = {
+    "status_code": 400,
+    "result": {
+        'error': 'invalid_request',
+        'error_description': 'Invalid "username" or "password" in request.'}}
+
+def gen_token(client, grant_type, user, scope): # pylint: disable=[unused-argument]
+    """Generate tokens for tests"""
+    return "123456ABCDE"
+
+@pytest.mark.unit_test
+@pytest.mark.parametrize(
+    "test_data,expected",
+    ((("group@lead.er", "password_for_user_001", 0), SUCCESS_RESULT),
+     (("group@mem.ber01", "password_for_user_002", 1), SUCCESS_RESULT),
+     (("group@mem.ber02", "password_for_user_003", 2), SUCCESS_RESULT),
+     (("unaff@iliated.user", "password_for_user_004", 3), SUCCESS_RESULT),
+     (("group@lead.er", "brrr", 0), USERNAME_PASSWORD_FAIL_RESULT),
+     (("group@mem.ber010", "password_for_user_002", 1), USERNAME_PASSWORD_FAIL_RESULT),
+     (("papa", "yada", 2), USERNAME_PASSWORD_FAIL_RESULT),
+     # (("unaff@iliated.user", "password_for_user_004", 1), USERNAME_PASSWORD_FAIL_RESULT)
+     ))
+def test_token(test_app, fixture_oauth2_clients, test_data, expected):
+    """
+    GIVEN: a registered oauth2 client, a user
+    WHEN: a token is requested via the 'password' grant
+    THEN: check that:
+      a) when email and password are valid, we get a token back
+      b) when either email or password or both are invalid, we get error message
+         back
+      c) TODO: when user tries to use wrong client, we get error message back
+    """
+    _conn, oa2clients = fixture_oauth2_clients
+    email, password, client_idx = test_data
+    data = {
+        "grant_type": "password", "scope": "profile nonexistent-scope",
+        "client_id": oa2clients[client_idx].client_id,
+        "client_secret": oa2clients[client_idx].client_secret,
+        "username": email, "password": password}
+
+    with test_app.test_client() as client:
+        res = client.post("/api/oauth2/token", data=data)
+    assert res.status_code == expected["status_code"]
+    for key in expected["result"]:
+        assert res.json[key] == expected["result"][key]