""" Auth-flow integration tests for gn-auth. Require live credentials set up by the CI test-session lifecycle (create-test-users / create-test-oauth2-client). Run with: pytest -m "gn_auth and auth_flow" Environment variables: GN_TEST_EMAIL admin test-user email GN_TEST_PASSWORD admin test-user password GN_TEST_BASIC_EMAIL unprivileged test-user email GN_TEST_BASIC_PASSWORD unprivileged test-user password GN_OAUTH2_CLIENT_ID OAuth2 client UUID GN_OAUTH2_CLIENT_SECRET OAuth2 client secret """ import pytest pytestmark = [pytest.mark.gn_auth, pytest.mark.auth_flow] # --------------------------------------------------------------------------- # POST /auth/token — password grant with valid credentials # --------------------------------------------------------------------------- class TestTokenGrant: """Password grant with valid admin credentials issues a usable token.""" def _post_token(self, gn_auth_url, http, oauth2_credentials, scope="profile group resource"): email, password, client_id, client_secret = oauth2_credentials return http.post( f"{gn_auth_url}/auth/token", json={ "grant_type": "password", "username": email, "password": password, "scope": scope, "client_id": client_id, "client_secret": client_secret, }, timeout=30, ) def test_valid_credentials_return_200( self, gn_auth_url, http, oauth2_credentials): resp = self._post_token(gn_auth_url, http, oauth2_credentials) assert resp.status_code == 200, ( f"Expected 200 from token endpoint, got {resp.status_code}: {resp.text}" ) def test_response_contains_access_token( self, gn_auth_url, http, oauth2_credentials): resp = self._post_token(gn_auth_url, http, oauth2_credentials) data = resp.json() assert "access_token" in data, ( f"Token response missing 'access_token': {data}" ) def test_token_type_is_bearer( self, gn_auth_url, http, oauth2_credentials): resp = self._post_token(gn_auth_url, http, oauth2_credentials) token_type = resp.json().get("token_type", "") assert token_type.lower() == "bearer", ( f"Expected token_type 'bearer', got: {token_type!r}" ) def test_granted_scope_covers_requested( self, gn_auth_url, http, oauth2_credentials): requested = {"profile", "group", "resource"} resp = self._post_token(gn_auth_url, http, oauth2_credentials) data = resp.json() assert "scope" in data, f"Token response missing 'scope': {data}" granted = set(data["scope"].split()) assert requested <= granted, ( f"Requested scopes {requested} not all in granted scopes {granted}" ) # --------------------------------------------------------------------------- # POST /auth/token — rejected credentials # --------------------------------------------------------------------------- class TestTokenGrantRejection: """Password grant with bad credentials must return 401.""" def test_wrong_password_returns_400( self, gn_auth_url, http, oauth2_credentials): email, _password, client_id, client_secret = oauth2_credentials resp = http.post( f"{gn_auth_url}/auth/token", json={ "grant_type": "password", "username": email, "password": "definitely-not-the-right-password", "scope": "profile group resource", "client_id": client_id, "client_secret": client_secret, }, timeout=30, ) assert resp.status_code == 400, ( f"Expected 400 for wrong password, got {resp.status_code}: {resp.text}" ) def test_unknown_email_returns_400( self, gn_auth_url, http, oauth2_credentials): _email, password, client_id, client_secret = oauth2_credentials resp = http.post( f"{gn_auth_url}/auth/token", json={ "grant_type": "password", "username": "no-such-user@regression-tests.genenetwork.org", "password": password, "scope": "profile group resource", "client_id": client_id, "client_secret": client_secret, }, timeout=30, ) assert resp.status_code == 400, ( f"Expected 400 for unknown email, got {resp.status_code}: {resp.text}" ) # --------------------------------------------------------------------------- # GET /auth/user/ — protected endpoint # --------------------------------------------------------------------------- class TestUserProfileWithToken: """GET /auth/user/ with a valid Bearer token returns the user's profile.""" def test_returns_200_with_valid_token( self, gn_auth_url, http, access_token): resp = http.get( f"{gn_auth_url}/auth/user/", headers={"Authorization": f"Bearer {access_token}"}, timeout=30, ) assert resp.status_code == 200, ( f"Expected 200 from /auth/user/ with token, " f"got {resp.status_code}: {resp.text}" ) def test_response_contains_user_fields( self, gn_auth_url, http, access_token): resp = http.get( f"{gn_auth_url}/auth/user/", headers={"Authorization": f"Bearer {access_token}"}, timeout=30, ) data = resp.json() for field in ("user_id", "email", "name"): assert field in data, ( f"Missing field '{field}' in user profile response: {data}" ) def test_response_email_matches_credentials( self, gn_auth_url, http, access_token, oauth2_credentials): expected_email, *_ = oauth2_credentials resp = http.get( f"{gn_auth_url}/auth/user/", headers={"Authorization": f"Bearer {access_token}"}, timeout=30, ) assert resp.json().get("email") == expected_email, ( f"Profile email {resp.json().get('email')!r} does not match " f"credentials email {expected_email!r}" ) class TestUserProfileWithoutToken: """GET /auth/user/ without a token must be rejected.""" def test_returns_401_without_token(self, gn_auth_url, http): resp = http.get(f"{gn_auth_url}/auth/user/", timeout=30) assert resp.status_code == 401, ( f"Expected 401 from /auth/user/ without token, " f"got {resp.status_code}: {resp.text}" ) def test_returns_401_with_invalid_token(self, gn_auth_url, http): resp = http.get( f"{gn_auth_url}/auth/user/", headers={"Authorization": "Bearer this-is-not-a-valid-token"}, timeout=30, ) assert resp.status_code == 401, ( f"Expected 401 from /auth/user/ with garbage token, " f"got {resp.status_code}: {resp.text}" ) # --------------------------------------------------------------------------- # POST /auth/user/masquerade/ — role-based privilege check # # The masquerade endpoint requires the "system:user:masquerade" privilege # which only system-administrators hold. Both admin and basic users can # obtain a token with "masquerade" scope (the test client supports it), but # gn-auth's can_masquerade decorator checks the user's roles and raises # ForbiddenAccess (→ 403) for users without the privilege. # --------------------------------------------------------------------------- @pytest.fixture(scope="session") def admin_masquerade_token(gn_auth_url, http, oauth2_credentials): """Admin Bearer token with masquerade scope.""" email, password, client_id, client_secret = oauth2_credentials resp = http.post( f"{gn_auth_url}/auth/token", json={ "grant_type": "password", "username": email, "password": password, "scope": "profile group resource user masquerade", "client_id": client_id, "client_secret": client_secret, }, timeout=30, ) assert resp.status_code == 200, f"Admin masquerade token request failed: {resp.text}" return resp.json()["access_token"] @pytest.fixture(scope="session") def basic_masquerade_token(gn_auth_url, http, basic_oauth2_credentials): """Unprivileged user Bearer token with masquerade scope.""" email, password, client_id, client_secret = basic_oauth2_credentials resp = http.post( f"{gn_auth_url}/auth/token", json={ "grant_type": "password", "username": email, "password": password, "scope": "profile group resource user masquerade", "client_id": client_id, "client_secret": client_secret, }, timeout=30, ) assert resp.status_code == 200, f"Basic masquerade token request failed: {resp.text}" return resp.json()["access_token"] @pytest.fixture(scope="session") def basic_user_id(gn_auth_url, http, basic_access_token): """User ID of the unprivileged test user, fetched via GET /auth/user/.""" resp = http.get( f"{gn_auth_url}/auth/user/", headers={"Authorization": f"Bearer {basic_access_token}"}, timeout=30, ) assert resp.status_code == 200, f"Could not fetch basic user profile: {resp.text}" return resp.json()["user_id"] class TestMasqueradePrivilege: """POST /auth/user/masquerade/ enforces the system:user:masquerade privilege.""" def test_basic_user_cannot_masquerade_returns_403( self, gn_auth_url, http, basic_masquerade_token, basic_user_id, access_token): # Basic user tries to masquerade as the admin; the admin's user_id is # obtained via the profile endpoint. admin_profile = http.get( f"{gn_auth_url}/auth/user/", headers={"Authorization": f"Bearer {access_token}"}, timeout=30, ) assert admin_profile.status_code == 200 admin_id = admin_profile.json()["user_id"] resp = http.post( f"{gn_auth_url}/auth/user/masquerade/", json={"masquerade_as": admin_id}, headers={"Authorization": f"Bearer {basic_masquerade_token}"}, timeout=30, ) assert resp.status_code == 403, ( f"Expected 403 when unprivileged user attempts masquerade, " f"got {resp.status_code}: {resp.text}" ) def test_admin_can_masquerade_as_basic_user( self, gn_auth_url, http, admin_masquerade_token, basic_user_id): resp = http.post( f"{gn_auth_url}/auth/user/masquerade/", json={"masquerade_as": basic_user_id}, headers={"Authorization": f"Bearer {admin_masquerade_token}"}, timeout=30, ) assert resp.status_code == 200, ( f"Expected 200 when admin masquerades as basic user, " f"got {resp.status_code}: {resp.text}" ) data = resp.json() assert "masquerade_as" in data, f"Missing 'masquerade_as' in response: {data}" assert data["masquerade_as"]["user"]["user_id"] == basic_user_id, ( "Masquerade response user_id does not match target user" )