aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/core/providers/auth/clerk.py
blob: 0db665e0e1d53eb05ea821507e2621323e5705a3 (about) (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import logging
import os
from datetime import datetime

from core.base import (
    AuthConfig,
    CryptoProvider,
    EmailProvider,
    R2RException,
    TokenData,
)

from ..database import PostgresDatabaseProvider
from .jwt import JwtAuthProvider

logger = logging.getLogger(__name__)


class ClerkAuthProvider(JwtAuthProvider):
    """
    ClerkAuthProvider extends JwtAuthProvider to support token verification with Clerk.
    It uses Clerk's SDK to verify the JWT token and extract user information.
    """

    def __init__(
        self,
        config: AuthConfig,
        crypto_provider: CryptoProvider,
        database_provider: PostgresDatabaseProvider,
        email_provider: EmailProvider,
    ):
        super().__init__(
            config=config,
            crypto_provider=crypto_provider,
            database_provider=database_provider,
            email_provider=email_provider,
        )
        try:
            from clerk_backend_api.jwks_helpers.verifytoken import (
                VerifyTokenOptions,
                verify_token,
            )

            self.verify_token = verify_token
            self.VerifyTokenOptions = VerifyTokenOptions
        except ImportError as e:
            raise R2RException(
                status_code=500,
                message="Clerk SDK is not installed. Run `pip install clerk-backend-api`",
            ) from e

    async def decode_token(self, token: str) -> TokenData:
        """
        Decode and verify the JWT token using Clerk's verify_token function.

        Args:
            token: The JWT token to decode

        Returns:
            TokenData: The decoded token data with user information

        Raises:
            R2RException: If the token is invalid or verification fails
        """
        clerk_secret_key = os.getenv("CLERK_SECRET_KEY")
        if not clerk_secret_key:
            raise R2RException(
                status_code=500,
                message="CLERK_SECRET_KEY environment variable is not set",
            )

        try:
            # Configure verification options
            options = self.VerifyTokenOptions(
                secret_key=clerk_secret_key,
                # Optional: specify audience if needed
                # audience="your-audience",
                # Optional: specify authorized parties if needed
                # authorized_parties=["https://your-domain.com"]
            )

            # Verify the token using Clerk's SDK
            payload = self.verify_token(token, options)

            # Check for the expected claims in the token payload
            if not payload.get("sub") or not payload.get("email"):
                raise R2RException(
                    status_code=401,
                    message="Invalid token: missing required claims",
                )

            # Create user in database if not exists
            try:
                await self.database_provider.users_handler.get_user_by_email(
                    payload.get("email")
                )
                # TODO do we want to update user info here based on what's in the token?
            except Exception:
                # user doesn't exist, create in db
                logger.debug(f"Creating new user: {payload.get('email')}")
                try:
                    # Construct name from first_name and last_name if available
                    first_name = payload.get("first_name", "")
                    last_name = payload.get("last_name", "")
                    name = payload.get("name")

                    # If name not directly provided, try to build it from first and last names
                    if not name and (first_name or last_name):
                        name = f"{first_name} {last_name}".strip()

                    await self.database_provider.users_handler.create_user(
                        email=payload.get("email"),
                        account_type="external",
                        name=name,
                    )
                except Exception as e:
                    logger.error(f"Error creating user: {e}")
                    raise R2RException(
                        status_code=500, message="Failed to create user"
                    ) from e

            # Return the token data
            return TokenData(
                email=payload.get("email"),
                token_type="bearer",
                exp=datetime.fromtimestamp(payload.get("exp")),
            )

        except Exception as e:
            logger.info(f"Clerk token verification failed: {e}")
            raise R2RException(
                status_code=401, message="Invalid token", detail=str(e)
            ) from e