about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/litellm/proxy/management_endpoints/key_management_endpoints.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/litellm/proxy/management_endpoints/key_management_endpoints.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/litellm/proxy/management_endpoints/key_management_endpoints.py')
-rw-r--r--.venv/lib/python3.12/site-packages/litellm/proxy/management_endpoints/key_management_endpoints.py2621
1 files changed, 2621 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/litellm/proxy/management_endpoints/key_management_endpoints.py b/.venv/lib/python3.12/site-packages/litellm/proxy/management_endpoints/key_management_endpoints.py
new file mode 100644
index 00000000..9141d9d1
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/proxy/management_endpoints/key_management_endpoints.py
@@ -0,0 +1,2621 @@
+"""
+KEY MANAGEMENT
+
+All /key management endpoints 
+
+/key/generate
+/key/info
+/key/update
+/key/delete
+"""
+
+import asyncio
+import copy
+import json
+import secrets
+import traceback
+import uuid
+from datetime import datetime, timedelta, timezone
+from typing import List, Literal, Optional, Tuple, cast
+
+import fastapi
+from fastapi import APIRouter, Depends, Header, HTTPException, Query, Request, status
+
+import litellm
+from litellm._logging import verbose_proxy_logger
+from litellm.caching import DualCache
+from litellm.constants import UI_SESSION_TOKEN_TEAM_ID
+from litellm.litellm_core_utils.duration_parser import duration_in_seconds
+from litellm.proxy._types import *
+from litellm.proxy.auth.auth_checks import (
+    _cache_key_object,
+    _delete_cache_key_object,
+    get_key_object,
+    get_team_object,
+)
+from litellm.proxy.auth.user_api_key_auth import user_api_key_auth
+from litellm.proxy.hooks.key_management_event_hooks import KeyManagementEventHooks
+from litellm.proxy.management_endpoints.common_utils import (
+    _is_user_team_admin,
+    _set_object_metadata_field,
+)
+from litellm.proxy.management_endpoints.model_management_endpoints import (
+    _add_model_to_db,
+)
+from litellm.proxy.management_helpers.utils import management_endpoint_wrapper
+from litellm.proxy.spend_tracking.spend_tracking_utils import _is_master_key
+from litellm.proxy.utils import (
+    PrismaClient,
+    _hash_token_if_needed,
+    handle_exception_on_proxy,
+    jsonify_object,
+)
+from litellm.router import Router
+from litellm.secret_managers.main import get_secret
+from litellm.types.router import Deployment
+from litellm.types.utils import (
+    BudgetConfig,
+    PersonalUIKeyGenerationConfig,
+    TeamUIKeyGenerationConfig,
+)
+
+
+def _is_team_key(data: Union[GenerateKeyRequest, LiteLLM_VerificationToken]):
+    return data.team_id is not None
+
+
+def _get_user_in_team(
+    team_table: LiteLLM_TeamTableCachedObj, user_id: Optional[str]
+) -> Optional[Member]:
+    if user_id is None:
+        return None
+    for member in team_table.members_with_roles:
+        if member.user_id is not None and member.user_id == user_id:
+            return member
+
+    return None
+
+
+def _is_allowed_to_make_key_request(
+    user_api_key_dict: UserAPIKeyAuth, user_id: Optional[str], team_id: Optional[str]
+) -> bool:
+    """
+    Assert user only creates keys for themselves
+
+    Relevant issue: https://github.com/BerriAI/litellm/issues/7336
+    """
+    ## BASE CASE - PROXY ADMIN
+    if (
+        user_api_key_dict.user_role is not None
+        and user_api_key_dict.user_role == LitellmUserRoles.PROXY_ADMIN.value
+    ):
+        return True
+
+    if user_id is not None:
+        assert (
+            user_id == user_api_key_dict.user_id
+        ), "User can only create keys for themselves. Got user_id={}, Your ID={}".format(
+            user_id, user_api_key_dict.user_id
+        )
+
+    if team_id is not None:
+        if (
+            user_api_key_dict.team_id is not None
+            and user_api_key_dict.team_id == UI_TEAM_ID
+        ):
+            return True  # handle https://github.com/BerriAI/litellm/issues/7482
+        assert (
+            user_api_key_dict.team_id == team_id
+        ), "User can only create keys for their own team. Got={}, Your Team ID={}".format(
+            team_id, user_api_key_dict.team_id
+        )
+
+    return True
+
+
+def _team_key_generation_team_member_check(
+    assigned_user_id: Optional[str],
+    team_table: LiteLLM_TeamTableCachedObj,
+    user_api_key_dict: UserAPIKeyAuth,
+    team_key_generation: TeamUIKeyGenerationConfig,
+):
+    if assigned_user_id is not None:
+        key_assigned_user_in_team = _get_user_in_team(
+            team_table=team_table, user_id=assigned_user_id
+        )
+
+        if key_assigned_user_in_team is None:
+            raise HTTPException(
+                status_code=400,
+                detail=f"User={assigned_user_id} not assigned to team={team_table.team_id}",
+            )
+
+    key_creating_user_in_team = _get_user_in_team(
+        team_table=team_table, user_id=user_api_key_dict.user_id
+    )
+
+    is_admin = (
+        user_api_key_dict.user_role is not None
+        and user_api_key_dict.user_role == LitellmUserRoles.PROXY_ADMIN.value
+    )
+
+    if is_admin:
+        return True
+    elif key_creating_user_in_team is None:
+        raise HTTPException(
+            status_code=400,
+            detail=f"User={user_api_key_dict.user_id} not assigned to team={team_table.team_id}",
+        )
+    elif (
+        "allowed_team_member_roles" in team_key_generation
+        and key_creating_user_in_team.role
+        not in team_key_generation["allowed_team_member_roles"]
+    ):
+        raise HTTPException(
+            status_code=400,
+            detail=f"Team member role {key_creating_user_in_team.role} not in allowed_team_member_roles={team_key_generation['allowed_team_member_roles']}",
+        )
+    return True
+
+
+def _key_generation_required_param_check(
+    data: GenerateKeyRequest, required_params: Optional[List[str]]
+):
+    if required_params is None:
+        return True
+
+    data_dict = data.model_dump(exclude_unset=True)
+    for param in required_params:
+        if param not in data_dict:
+            raise HTTPException(
+                status_code=400,
+                detail=f"Required param {param} not in data",
+            )
+    return True
+
+
+def _team_key_generation_check(
+    team_table: LiteLLM_TeamTableCachedObj,
+    user_api_key_dict: UserAPIKeyAuth,
+    data: GenerateKeyRequest,
+):
+    if user_api_key_dict.user_role == LitellmUserRoles.PROXY_ADMIN.value:
+        return True
+    if (
+        litellm.key_generation_settings is not None
+        and "team_key_generation" in litellm.key_generation_settings
+    ):
+        _team_key_generation = litellm.key_generation_settings["team_key_generation"]
+    else:
+        _team_key_generation = TeamUIKeyGenerationConfig(
+            allowed_team_member_roles=["admin", "user"],
+        )
+
+    _team_key_generation_team_member_check(
+        assigned_user_id=data.user_id,
+        team_table=team_table,
+        user_api_key_dict=user_api_key_dict,
+        team_key_generation=_team_key_generation,
+    )
+    _key_generation_required_param_check(
+        data,
+        _team_key_generation.get("required_params"),
+    )
+
+    return True
+
+
+def _personal_key_membership_check(
+    user_api_key_dict: UserAPIKeyAuth,
+    personal_key_generation: Optional[PersonalUIKeyGenerationConfig],
+):
+    if (
+        personal_key_generation is None
+        or "allowed_user_roles" not in personal_key_generation
+    ):
+        return True
+
+    if user_api_key_dict.user_role not in personal_key_generation["allowed_user_roles"]:
+        raise HTTPException(
+            status_code=400,
+            detail=f"Personal key creation has been restricted by admin. Allowed roles={litellm.key_generation_settings['personal_key_generation']['allowed_user_roles']}. Your role={user_api_key_dict.user_role}",  # type: ignore
+        )
+
+    return True
+
+
+def _personal_key_generation_check(
+    user_api_key_dict: UserAPIKeyAuth, data: GenerateKeyRequest
+):
+
+    if (
+        litellm.key_generation_settings is None
+        or litellm.key_generation_settings.get("personal_key_generation") is None
+    ):
+        return True
+
+    _personal_key_generation = litellm.key_generation_settings["personal_key_generation"]  # type: ignore
+
+    _personal_key_membership_check(
+        user_api_key_dict,
+        personal_key_generation=_personal_key_generation,
+    )
+
+    _key_generation_required_param_check(
+        data,
+        _personal_key_generation.get("required_params"),
+    )
+
+    return True
+
+
+def key_generation_check(
+    team_table: Optional[LiteLLM_TeamTableCachedObj],
+    user_api_key_dict: UserAPIKeyAuth,
+    data: GenerateKeyRequest,
+) -> bool:
+    """
+    Check if admin has restricted key creation to certain roles for teams or individuals
+    """
+
+    ## check if key is for team or individual
+    is_team_key = _is_team_key(data=data)
+    if is_team_key:
+        if team_table is None and litellm.key_generation_settings is not None:
+            raise HTTPException(
+                status_code=400,
+                detail=f"Unable to find team object in database. Team ID: {data.team_id}",
+            )
+        elif team_table is None:
+            return True  # assume user is assigning team_id without using the team table
+        return _team_key_generation_check(
+            team_table=team_table,
+            user_api_key_dict=user_api_key_dict,
+            data=data,
+        )
+    else:
+        return _personal_key_generation_check(
+            user_api_key_dict=user_api_key_dict, data=data
+        )
+
+
+def common_key_access_checks(
+    user_api_key_dict: UserAPIKeyAuth,
+    data: Union[GenerateKeyRequest, UpdateKeyRequest],
+    llm_router: Optional[Router],
+    premium_user: bool,
+) -> Literal[True]:
+    """
+    Check if user is allowed to make a key request, for this key
+    """
+    try:
+        _is_allowed_to_make_key_request(
+            user_api_key_dict=user_api_key_dict,
+            user_id=data.user_id,
+            team_id=data.team_id,
+        )
+    except AssertionError as e:
+        raise HTTPException(
+            status_code=403,
+            detail=str(e),
+        )
+    except Exception as e:
+        raise HTTPException(
+            status_code=500,
+            detail=str(e),
+        )
+
+    _check_model_access_group(
+        models=data.models,
+        llm_router=llm_router,
+        premium_user=premium_user,
+    )
+    return True
+
+
+router = APIRouter()
+
+
+@router.post(
+    "/key/generate",
+    tags=["key management"],
+    dependencies=[Depends(user_api_key_auth)],
+    response_model=GenerateKeyResponse,
+)
+@management_endpoint_wrapper
+async def generate_key_fn(  # noqa: PLR0915
+    data: GenerateKeyRequest,
+    user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
+    litellm_changed_by: Optional[str] = Header(
+        None,
+        description="The litellm-changed-by header enables tracking of actions performed by authorized users on behalf of other users, providing an audit trail for accountability",
+    ),
+):
+    """
+    Generate an API key based on the provided data.
+
+    Docs: https://docs.litellm.ai/docs/proxy/virtual_keys
+
+    Parameters:
+    - duration: Optional[str] - Specify the length of time the token is valid for. You can set duration as seconds ("30s"), minutes ("30m"), hours ("30h"), days ("30d").
+    - key_alias: Optional[str] - User defined key alias
+    - key: Optional[str] - User defined key value. If not set, a 16-digit unique sk-key is created for you.
+    - team_id: Optional[str] - The team id of the key
+    - user_id: Optional[str] - The user id of the key
+    - budget_id: Optional[str] - The budget id associated with the key. Created by calling `/budget/new`.
+    - models: Optional[list] - Model_name's a user is allowed to call. (if empty, key is allowed to call all models)
+    - aliases: Optional[dict] - Any alias mappings, on top of anything in the config.yaml model list. - https://docs.litellm.ai/docs/proxy/virtual_keys#managing-auth---upgradedowngrade-models
+    - config: Optional[dict] - any key-specific configs, overrides config in config.yaml
+    - spend: Optional[int] - Amount spent by key. Default is 0. Will be updated by proxy whenever key is used. https://docs.litellm.ai/docs/proxy/virtual_keys#managing-auth---tracking-spend
+    - send_invite_email: Optional[bool] - Whether to send an invite email to the user_id, with the generate key
+    - max_budget: Optional[float] - Specify max budget for a given key.
+    - budget_duration: Optional[str] - Budget is reset at the end of specified duration. If not set, budget is never reset. You can set duration as seconds ("30s"), minutes ("30m"), hours ("30h"), days ("30d").
+    - max_parallel_requests: Optional[int] - Rate limit a user based on the number of parallel requests. Raises 429 error, if user's parallel requests > x.
+    - metadata: Optional[dict] - Metadata for key, store information for key. Example metadata = {"team": "core-infra", "app": "app2", "email": "ishaan@berri.ai" }
+    - guardrails: Optional[List[str]] - List of active guardrails for the key
+    - permissions: Optional[dict] - key-specific permissions. Currently just used for turning off pii masking (if connected). Example - {"pii": false}
+    - model_max_budget: Optional[Dict[str, BudgetConfig]] - Model-specific budgets {"gpt-4": {"budget_limit": 0.0005, "time_period": "30d"}}}. IF null or {} then no model specific budget.
+    - model_rpm_limit: Optional[dict] - key-specific model rpm limit. Example - {"text-davinci-002": 1000, "gpt-3.5-turbo": 1000}. IF null or {} then no model specific rpm limit.
+    - model_tpm_limit: Optional[dict] - key-specific model tpm limit. Example - {"text-davinci-002": 1000, "gpt-3.5-turbo": 1000}. IF null or {} then no model specific tpm limit.
+    - allowed_cache_controls: Optional[list] - List of allowed cache control values. Example - ["no-cache", "no-store"]. See all values - https://docs.litellm.ai/docs/proxy/caching#turn-on--off-caching-per-request
+    - blocked: Optional[bool] - Whether the key is blocked.
+    - rpm_limit: Optional[int] - Specify rpm limit for a given key (Requests per minute)
+    - tpm_limit: Optional[int] - Specify tpm limit for a given key (Tokens per minute)
+    - soft_budget: Optional[float] - Specify soft budget for a given key. Will trigger a slack alert when this soft budget is reached.
+    - tags: Optional[List[str]] - Tags for [tracking spend](https://litellm.vercel.app/docs/proxy/enterprise#tracking-spend-for-custom-tags) and/or doing [tag-based routing](https://litellm.vercel.app/docs/proxy/tag_routing).
+    - enforced_params: Optional[List[str]] - List of enforced params for the key (Enterprise only). [Docs](https://docs.litellm.ai/docs/proxy/enterprise#enforce-required-params-for-llm-requests)
+
+    Examples:
+
+    1. Allow users to turn on/off pii masking
+
+    ```bash
+    curl --location 'http://0.0.0.0:4000/key/generate' \
+        --header 'Authorization: Bearer sk-1234' \
+        --header 'Content-Type: application/json' \
+        --data '{
+            "permissions": {"allow_pii_controls": true}
+    }'
+    ```
+
+    Returns:
+    - key: (str) The generated api key
+    - expires: (datetime) Datetime object for when key expires.
+    - user_id: (str) Unique user id - used for tracking spend across multiple keys for same user id.
+    """
+    try:
+        from litellm.proxy.proxy_server import (
+            litellm_proxy_admin_name,
+            llm_router,
+            premium_user,
+            prisma_client,
+            user_api_key_cache,
+            user_custom_key_generate,
+        )
+
+        verbose_proxy_logger.debug("entered /key/generate")
+
+        if user_custom_key_generate is not None:
+            if asyncio.iscoroutinefunction(user_custom_key_generate):
+                result = await user_custom_key_generate(data)  # type: ignore
+            else:
+                raise ValueError("user_custom_key_generate must be a coroutine")
+            decision = result.get("decision", True)
+            message = result.get("message", "Authentication Failed - Custom Auth Rule")
+            if not decision:
+                raise HTTPException(
+                    status_code=status.HTTP_403_FORBIDDEN, detail=message
+                )
+        team_table: Optional[LiteLLM_TeamTableCachedObj] = None
+        if data.team_id is not None:
+            try:
+                team_table = await get_team_object(
+                    team_id=data.team_id,
+                    prisma_client=prisma_client,
+                    user_api_key_cache=user_api_key_cache,
+                    parent_otel_span=user_api_key_dict.parent_otel_span,
+                    check_db_only=True,
+                )
+            except Exception as e:
+                verbose_proxy_logger.debug(
+                    f"Error getting team object in `/key/generate`: {e}"
+                )
+                team_table = None
+
+        key_generation_check(
+            team_table=team_table,
+            user_api_key_dict=user_api_key_dict,
+            data=data,
+        )
+
+        common_key_access_checks(
+            user_api_key_dict=user_api_key_dict,
+            data=data,
+            llm_router=llm_router,
+            premium_user=premium_user,
+        )
+
+        # check if user set default key/generate params on config.yaml
+        if litellm.default_key_generate_params is not None:
+            for elem in data:
+                key, value = elem
+                if value is None and key in [
+                    "max_budget",
+                    "user_id",
+                    "team_id",
+                    "max_parallel_requests",
+                    "tpm_limit",
+                    "rpm_limit",
+                    "budget_duration",
+                ]:
+                    setattr(
+                        data, key, litellm.default_key_generate_params.get(key, None)
+                    )
+                elif key == "models" and value == []:
+                    setattr(data, key, litellm.default_key_generate_params.get(key, []))
+                elif key == "metadata" and value == {}:
+                    setattr(data, key, litellm.default_key_generate_params.get(key, {}))
+
+        # check if user set default key/generate params on config.yaml
+        if litellm.upperbound_key_generate_params is not None:
+            for elem in data:
+                key, value = elem
+                upperbound_value = getattr(
+                    litellm.upperbound_key_generate_params, key, None
+                )
+                if upperbound_value is not None:
+                    if value is None:
+                        # Use the upperbound value if user didn't provide a value
+                        setattr(data, key, upperbound_value)
+                    else:
+                        # Compare with upperbound for numeric fields
+                        if key in [
+                            "max_budget",
+                            "max_parallel_requests",
+                            "tpm_limit",
+                            "rpm_limit",
+                        ]:
+                            if value > upperbound_value:
+                                raise HTTPException(
+                                    status_code=400,
+                                    detail={
+                                        "error": f"{key} is over max limit set in config - user_value={value}; max_value={upperbound_value}"
+                                    },
+                                )
+                        # Compare durations
+                        elif key in ["budget_duration", "duration"]:
+                            upperbound_duration = duration_in_seconds(
+                                duration=upperbound_value
+                            )
+                            user_duration = duration_in_seconds(duration=value)
+                            if user_duration > upperbound_duration:
+                                raise HTTPException(
+                                    status_code=400,
+                                    detail={
+                                        "error": f"{key} is over max limit set in config - user_value={value}; max_value={upperbound_value}"
+                                    },
+                                )
+
+        # TODO: @ishaan-jaff: Migrate all budget tracking to use LiteLLM_BudgetTable
+        _budget_id = data.budget_id
+        if prisma_client is not None and data.soft_budget is not None:
+            # create the Budget Row for the LiteLLM Verification Token
+            budget_row = LiteLLM_BudgetTable(
+                soft_budget=data.soft_budget,
+                model_max_budget=data.model_max_budget or {},
+            )
+            new_budget = prisma_client.jsonify_object(
+                budget_row.json(exclude_none=True)
+            )
+
+            _budget = await prisma_client.db.litellm_budgettable.create(
+                data={
+                    **new_budget,  # type: ignore
+                    "created_by": user_api_key_dict.user_id or litellm_proxy_admin_name,
+                    "updated_by": user_api_key_dict.user_id or litellm_proxy_admin_name,
+                }
+            )
+            _budget_id = getattr(_budget, "budget_id", None)
+
+        # ADD METADATA FIELDS
+        # Set Management Endpoint Metadata Fields
+        for field in LiteLLM_ManagementEndpoint_MetadataFields_Premium:
+            if getattr(data, field) is not None:
+                _set_object_metadata_field(
+                    object_data=data,
+                    field_name=field,
+                    value=getattr(data, field),
+                )
+
+        data_json = data.model_dump(exclude_unset=True, exclude_none=True)  # type: ignore
+
+        # if we get max_budget passed to /key/generate, then use it as key_max_budget. Since generate_key_helper_fn is used to make new users
+        if "max_budget" in data_json:
+            data_json["key_max_budget"] = data_json.pop("max_budget", None)
+        if _budget_id is not None:
+            data_json["budget_id"] = _budget_id
+
+        if "budget_duration" in data_json:
+            data_json["key_budget_duration"] = data_json.pop("budget_duration", None)
+
+        if user_api_key_dict.user_id is not None:
+            data_json["created_by"] = user_api_key_dict.user_id
+            data_json["updated_by"] = user_api_key_dict.user_id
+
+        # Set tags on the new key
+        if "tags" in data_json:
+            from litellm.proxy.proxy_server import premium_user
+
+            if premium_user is not True and data_json["tags"] is not None:
+                raise ValueError(
+                    f"Only premium users can add tags to keys. {CommonProxyErrors.not_premium_user.value}"
+                )
+
+            _metadata = data_json.get("metadata")
+            if not _metadata:
+                data_json["metadata"] = {"tags": data_json["tags"]}
+            else:
+                data_json["metadata"]["tags"] = data_json["tags"]
+
+            data_json.pop("tags")
+
+        await _enforce_unique_key_alias(
+            key_alias=data_json.get("key_alias", None),
+            prisma_client=prisma_client,
+        )
+
+        response = await generate_key_helper_fn(
+            request_type="key", **data_json, table_name="key"
+        )
+
+        response["soft_budget"] = (
+            data.soft_budget
+        )  # include the user-input soft budget in the response
+
+        response = GenerateKeyResponse(**response)
+
+        asyncio.create_task(
+            KeyManagementEventHooks.async_key_generated_hook(
+                data=data,
+                response=response,
+                user_api_key_dict=user_api_key_dict,
+                litellm_changed_by=litellm_changed_by,
+            )
+        )
+
+        return response
+    except Exception as e:
+        verbose_proxy_logger.exception(
+            "litellm.proxy.proxy_server.generate_key_fn(): Exception occured - {}".format(
+                str(e)
+            )
+        )
+        raise handle_exception_on_proxy(e)
+
+
+def prepare_metadata_fields(
+    data: BaseModel, non_default_values: dict, existing_metadata: dict
+) -> dict:
+    """
+    Check LiteLLM_ManagementEndpoint_MetadataFields (proxy/_types.py) for fields that are allowed to be updated
+    """
+    if "metadata" not in non_default_values:  # allow user to set metadata to none
+        non_default_values["metadata"] = existing_metadata.copy()
+
+    casted_metadata = cast(dict, non_default_values["metadata"])
+
+    data_json = data.model_dump(exclude_unset=True, exclude_none=True)
+
+    try:
+        for k, v in data_json.items():
+            if k in LiteLLM_ManagementEndpoint_MetadataFields:
+                if isinstance(v, datetime):
+                    casted_metadata[k] = v.isoformat()
+                else:
+                    casted_metadata[k] = v
+
+    except Exception as e:
+        verbose_proxy_logger.exception(
+            "litellm.proxy.proxy_server.prepare_metadata_fields(): Exception occured - {}".format(
+                str(e)
+            )
+        )
+
+    non_default_values["metadata"] = casted_metadata
+    return non_default_values
+
+
+def prepare_key_update_data(
+    data: Union[UpdateKeyRequest, RegenerateKeyRequest], existing_key_row
+):
+    data_json: dict = data.model_dump(exclude_unset=True)
+    data_json.pop("key", None)
+    non_default_values = {}
+    for k, v in data_json.items():
+        if k in LiteLLM_ManagementEndpoint_MetadataFields:
+            continue
+        non_default_values[k] = v
+
+    if "duration" in non_default_values:
+        duration = non_default_values.pop("duration")
+        if duration and (isinstance(duration, str)) and len(duration) > 0:
+            duration_s = duration_in_seconds(duration=duration)
+            expires = datetime.now(timezone.utc) + timedelta(seconds=duration_s)
+            non_default_values["expires"] = expires
+
+    if "budget_duration" in non_default_values:
+        budget_duration = non_default_values.pop("budget_duration")
+        if (
+            budget_duration
+            and (isinstance(budget_duration, str))
+            and len(budget_duration) > 0
+        ):
+            duration_s = duration_in_seconds(duration=budget_duration)
+            key_reset_at = datetime.now(timezone.utc) + timedelta(seconds=duration_s)
+            non_default_values["budget_reset_at"] = key_reset_at
+            non_default_values["budget_duration"] = budget_duration
+
+    _metadata = existing_key_row.metadata or {}
+
+    # validate model_max_budget
+    if "model_max_budget" in non_default_values:
+        validate_model_max_budget(non_default_values["model_max_budget"])
+
+    non_default_values = prepare_metadata_fields(
+        data=data, non_default_values=non_default_values, existing_metadata=_metadata
+    )
+
+    return non_default_values
+
+
+@router.post(
+    "/key/update", tags=["key management"], dependencies=[Depends(user_api_key_auth)]
+)
+@management_endpoint_wrapper
+async def update_key_fn(
+    request: Request,
+    data: UpdateKeyRequest,
+    user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
+    litellm_changed_by: Optional[str] = Header(
+        None,
+        description="The litellm-changed-by header enables tracking of actions performed by authorized users on behalf of other users, providing an audit trail for accountability",
+    ),
+):
+    """
+    Update an existing API key's parameters.
+
+    Parameters:
+    - key: str - The key to update
+    - key_alias: Optional[str] - User-friendly key alias
+    - user_id: Optional[str] - User ID associated with key
+    - team_id: Optional[str] - Team ID associated with key
+    - budget_id: Optional[str] - The budget id associated with the key. Created by calling `/budget/new`.
+    - models: Optional[list] - Model_name's a user is allowed to call
+    - tags: Optional[List[str]] - Tags for organizing keys (Enterprise only)
+    - enforced_params: Optional[List[str]] - List of enforced params for the key (Enterprise only). [Docs](https://docs.litellm.ai/docs/proxy/enterprise#enforce-required-params-for-llm-requests)
+    - spend: Optional[float] - Amount spent by key
+    - max_budget: Optional[float] - Max budget for key
+    - model_max_budget: Optional[Dict[str, BudgetConfig]] - Model-specific budgets {"gpt-4": {"budget_limit": 0.0005, "time_period": "30d"}}
+    - budget_duration: Optional[str] - Budget reset period ("30d", "1h", etc.)
+    - soft_budget: Optional[float] - [TODO] Soft budget limit (warning vs. hard stop). Will trigger a slack alert when this soft budget is reached.
+    - max_parallel_requests: Optional[int] - Rate limit for parallel requests
+    - metadata: Optional[dict] - Metadata for key. Example {"team": "core-infra", "app": "app2"}
+    - tpm_limit: Optional[int] - Tokens per minute limit
+    - rpm_limit: Optional[int] - Requests per minute limit
+    - model_rpm_limit: Optional[dict] - Model-specific RPM limits {"gpt-4": 100, "claude-v1": 200}
+    - model_tpm_limit: Optional[dict] - Model-specific TPM limits {"gpt-4": 100000, "claude-v1": 200000}
+    - allowed_cache_controls: Optional[list] - List of allowed cache control values
+    - duration: Optional[str] - Key validity duration ("30d", "1h", etc.)
+    - permissions: Optional[dict] - Key-specific permissions
+    - send_invite_email: Optional[bool] - Send invite email to user_id
+    - guardrails: Optional[List[str]] - List of active guardrails for the key
+    - blocked: Optional[bool] - Whether the key is blocked
+    - aliases: Optional[dict] - Model aliases for the key - [Docs](https://litellm.vercel.app/docs/proxy/virtual_keys#model-aliases)
+    - config: Optional[dict] - [DEPRECATED PARAM] Key-specific config.
+    - temp_budget_increase: Optional[float] - Temporary budget increase for the key (Enterprise only).
+    - temp_budget_expiry: Optional[str] - Expiry time for the temporary budget increase (Enterprise only).
+
+    Example:
+    ```bash
+    curl --location 'http://0.0.0.0:4000/key/update' \
+    --header 'Authorization: Bearer sk-1234' \
+    --header 'Content-Type: application/json' \
+    --data '{
+        "key": "sk-1234",
+        "key_alias": "my-key",
+        "user_id": "user-1234",
+        "team_id": "team-1234",
+        "max_budget": 100,
+        "metadata": {"any_key": "any-val"},
+    }'
+    ```
+    """
+    from litellm.proxy.proxy_server import (
+        llm_router,
+        premium_user,
+        prisma_client,
+        proxy_logging_obj,
+        user_api_key_cache,
+    )
+
+    try:
+        data_json: dict = data.model_dump(exclude_unset=True, exclude_none=True)
+        key = data_json.pop("key")
+
+        # get the row from db
+        if prisma_client is None:
+            raise Exception("Not connected to DB!")
+
+        common_key_access_checks(
+            user_api_key_dict=user_api_key_dict,
+            data=data,
+            llm_router=llm_router,
+            premium_user=premium_user,
+        )
+
+        existing_key_row = await prisma_client.get_data(
+            token=data.key, table_name="key", query_type="find_unique"
+        )
+
+        if existing_key_row is None:
+            raise HTTPException(
+                status_code=404,
+                detail={"error": f"Team not found, passed team_id={data.team_id}"},
+            )
+
+        non_default_values = prepare_key_update_data(
+            data=data, existing_key_row=existing_key_row
+        )
+
+        await _enforce_unique_key_alias(
+            key_alias=non_default_values.get("key_alias", None),
+            prisma_client=prisma_client,
+            existing_key_token=existing_key_row.token,
+        )
+
+        _data = {**non_default_values, "token": key}
+        response = await prisma_client.update_data(token=key, data=_data)
+
+        # Delete - key from cache, since it's been updated!
+        # key updated - a new model could have been added to this key. it should not block requests after this is done
+        await _delete_cache_key_object(
+            hashed_token=hash_token(key),
+            user_api_key_cache=user_api_key_cache,
+            proxy_logging_obj=proxy_logging_obj,
+        )
+
+        asyncio.create_task(
+            KeyManagementEventHooks.async_key_updated_hook(
+                data=data,
+                existing_key_row=existing_key_row,
+                response=response,
+                user_api_key_dict=user_api_key_dict,
+                litellm_changed_by=litellm_changed_by,
+            )
+        )
+
+        if response is None:
+            raise ValueError("Failed to update key got response = None")
+
+        return {"key": key, **response["data"]}
+        # update based on remaining passed in values
+    except Exception as e:
+        verbose_proxy_logger.exception(
+            "litellm.proxy.proxy_server.update_key_fn(): Exception occured - {}".format(
+                str(e)
+            )
+        )
+        if isinstance(e, HTTPException):
+            raise ProxyException(
+                message=getattr(e, "detail", f"Authentication Error({str(e)})"),
+                type=ProxyErrorTypes.auth_error,
+                param=getattr(e, "param", "None"),
+                code=getattr(e, "status_code", status.HTTP_400_BAD_REQUEST),
+            )
+        elif isinstance(e, ProxyException):
+            raise e
+        raise ProxyException(
+            message="Authentication Error, " + str(e),
+            type=ProxyErrorTypes.auth_error,
+            param=getattr(e, "param", "None"),
+            code=status.HTTP_400_BAD_REQUEST,
+        )
+
+
+@router.post(
+    "/key/delete", tags=["key management"], dependencies=[Depends(user_api_key_auth)]
+)
+@management_endpoint_wrapper
+async def delete_key_fn(
+    data: KeyRequest,
+    user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
+    litellm_changed_by: Optional[str] = Header(
+        None,
+        description="The litellm-changed-by header enables tracking of actions performed by authorized users on behalf of other users, providing an audit trail for accountability",
+    ),
+):
+    """
+    Delete a key from the key management system.
+
+    Parameters::
+    - keys (List[str]): A list of keys or hashed keys to delete. Example {"keys": ["sk-QWrxEynunsNpV1zT48HIrw", "837e17519f44683334df5291321d97b8bf1098cd490e49e215f6fea935aa28be"]}
+    - key_aliases (List[str]): A list of key aliases to delete. Can be passed instead of `keys`.Example {"key_aliases": ["alias1", "alias2"]}
+
+    Returns:
+    - deleted_keys (List[str]): A list of deleted keys. Example {"deleted_keys": ["sk-QWrxEynunsNpV1zT48HIrw", "837e17519f44683334df5291321d97b8bf1098cd490e49e215f6fea935aa28be"]}
+
+    Example:
+    ```bash
+    curl --location 'http://0.0.0.0:4000/key/delete' \
+    --header 'Authorization: Bearer sk-1234' \
+    --header 'Content-Type: application/json' \
+    --data '{
+        "keys": ["sk-QWrxEynunsNpV1zT48HIrw"]
+    }'
+    ```
+
+    Raises:
+        HTTPException: If an error occurs during key deletion.
+    """
+    try:
+        from litellm.proxy.proxy_server import prisma_client, user_api_key_cache
+
+        if prisma_client is None:
+            raise Exception("Not connected to DB!")
+
+        ## only allow user to delete keys they own
+        verbose_proxy_logger.debug(
+            f"user_api_key_dict.user_role: {user_api_key_dict.user_role}"
+        )
+
+        num_keys_to_be_deleted = 0
+        deleted_keys = []
+        if data.keys:
+            number_deleted_keys, _keys_being_deleted = await delete_verification_tokens(
+                tokens=data.keys,
+                user_api_key_cache=user_api_key_cache,
+                user_api_key_dict=user_api_key_dict,
+            )
+            num_keys_to_be_deleted = len(data.keys)
+            deleted_keys = data.keys
+        elif data.key_aliases:
+            number_deleted_keys, _keys_being_deleted = await delete_key_aliases(
+                key_aliases=data.key_aliases,
+                prisma_client=prisma_client,
+                user_api_key_cache=user_api_key_cache,
+                user_api_key_dict=user_api_key_dict,
+            )
+            num_keys_to_be_deleted = len(data.key_aliases)
+            deleted_keys = data.key_aliases
+        else:
+            raise ValueError("Invalid request type")
+
+        if number_deleted_keys is None:
+            raise ProxyException(
+                message="Failed to delete keys got None response from delete_verification_token",
+                type=ProxyErrorTypes.internal_server_error,
+                param="keys",
+                code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+            )
+        verbose_proxy_logger.debug(f"/key/delete - deleted_keys={number_deleted_keys}")
+
+        try:
+            assert num_keys_to_be_deleted == len(deleted_keys)
+        except Exception:
+            raise HTTPException(
+                status_code=400,
+                detail={
+                    "error": f"Not all keys passed in were deleted. This probably means you don't have access to delete all the keys passed in. Keys passed in={num_keys_to_be_deleted}, Deleted keys ={number_deleted_keys}"
+                },
+            )
+
+        verbose_proxy_logger.debug(
+            f"/keys/delete - cache after delete: {user_api_key_cache.in_memory_cache.cache_dict}"
+        )
+
+        asyncio.create_task(
+            KeyManagementEventHooks.async_key_deleted_hook(
+                data=data,
+                keys_being_deleted=_keys_being_deleted,
+                user_api_key_dict=user_api_key_dict,
+                litellm_changed_by=litellm_changed_by,
+                response=number_deleted_keys,
+            )
+        )
+
+        return {"deleted_keys": deleted_keys}
+    except Exception as e:
+        verbose_proxy_logger.exception(
+            "litellm.proxy.proxy_server.delete_key_fn(): Exception occured - {}".format(
+                str(e)
+            )
+        )
+        raise handle_exception_on_proxy(e)
+
+
+@router.post(
+    "/v2/key/info",
+    tags=["key management"],
+    dependencies=[Depends(user_api_key_auth)],
+    include_in_schema=False,
+)
+async def info_key_fn_v2(
+    data: Optional[KeyRequest] = None,
+    user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
+):
+    """
+    Retrieve information about a list of keys.
+
+    **New endpoint**. Currently admin only.
+    Parameters:
+        keys: Optional[list] = body parameter representing the key(s) in the request
+        user_api_key_dict: UserAPIKeyAuth = Dependency representing the user's API key
+    Returns:
+        Dict containing the key and its associated information
+
+    Example Curl:
+    ```
+    curl -X GET "http://0.0.0.0:4000/key/info" \
+    -H "Authorization: Bearer sk-1234" \
+    -d {"keys": ["sk-1", "sk-2", "sk-3"]}
+    ```
+    """
+    from litellm.proxy.proxy_server import prisma_client
+
+    try:
+        if prisma_client is None:
+            raise Exception(
+                "Database not connected. Connect a database to your proxy - https://docs.litellm.ai/docs/simple_proxy#managing-auth---virtual-keys"
+            )
+        if data is None:
+            raise HTTPException(
+                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
+                detail={"message": "Malformed request. No keys passed in."},
+            )
+
+        key_info = await prisma_client.get_data(
+            token=data.keys, table_name="key", query_type="find_all"
+        )
+        if key_info is None:
+            raise HTTPException(
+                status_code=status.HTTP_404_NOT_FOUND,
+                detail={"message": "No keys found"},
+            )
+        filtered_key_info = []
+        for k in key_info:
+            try:
+                k = k.model_dump()  # noqa
+            except Exception:
+                # if using pydantic v1
+                k = k.dict()
+            filtered_key_info.append(k)
+        return {"key": data.keys, "info": filtered_key_info}
+
+    except Exception as e:
+        raise handle_exception_on_proxy(e)
+
+
+@router.get(
+    "/key/info", tags=["key management"], dependencies=[Depends(user_api_key_auth)]
+)
+async def info_key_fn(
+    key: Optional[str] = fastapi.Query(
+        default=None, description="Key in the request parameters"
+    ),
+    user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
+):
+    """
+    Retrieve information about a key.
+    Parameters:
+        key: Optional[str] = Query parameter representing the key in the request
+        user_api_key_dict: UserAPIKeyAuth = Dependency representing the user's API key
+    Returns:
+        Dict containing the key and its associated information
+
+    Example Curl:
+    ```
+    curl -X GET "http://0.0.0.0:4000/key/info?key=sk-02Wr4IAlN3NvPXvL5JVvDA" \
+-H "Authorization: Bearer sk-1234"
+    ```
+
+    Example Curl - if no key is passed, it will use the Key Passed in Authorization Header
+    ```
+    curl -X GET "http://0.0.0.0:4000/key/info" \
+-H "Authorization: Bearer sk-02Wr4IAlN3NvPXvL5JVvDA"
+    ```
+    """
+    from litellm.proxy.proxy_server import prisma_client
+
+    try:
+        if prisma_client is None:
+            raise Exception(
+                "Database not connected. Connect a database to your proxy - https://docs.litellm.ai/docs/simple_proxy#managing-auth---virtual-keys"
+            )
+
+        # default to using Auth token if no key is passed in
+        key = key or user_api_key_dict.api_key
+        hashed_key: Optional[str] = key
+        if key is not None:
+            hashed_key = _hash_token_if_needed(token=key)
+        key_info = await prisma_client.db.litellm_verificationtoken.find_unique(
+            where={"token": hashed_key},  # type: ignore
+            include={"litellm_budget_table": True},
+        )
+        if key_info is None:
+            raise ProxyException(
+                message="Key not found in database",
+                type=ProxyErrorTypes.not_found_error,
+                param="key",
+                code=status.HTTP_404_NOT_FOUND,
+            )
+
+        if (
+            _can_user_query_key_info(
+                user_api_key_dict=user_api_key_dict,
+                key=key,
+                key_info=key_info,
+            )
+            is not True
+        ):
+            raise HTTPException(
+                status_code=status.HTTP_403_FORBIDDEN,
+                detail="You are not allowed to access this key's info. Your role={}".format(
+                    user_api_key_dict.user_role
+                ),
+            )
+        ## REMOVE HASHED TOKEN INFO BEFORE RETURNING ##
+        try:
+            key_info = key_info.model_dump()  # noqa
+        except Exception:
+            # if using pydantic v1
+            key_info = key_info.dict()
+        key_info.pop("token")
+        return {"key": key, "info": key_info}
+    except Exception as e:
+        raise handle_exception_on_proxy(e)
+
+
+def _check_model_access_group(
+    models: Optional[List[str]], llm_router: Optional[Router], premium_user: bool
+) -> Literal[True]:
+    """
+    if is_model_access_group is True + is_wildcard_route is True, check if user is a premium user
+
+    Return True if user is a premium user, False otherwise
+    """
+    if models is None or llm_router is None:
+        return True
+
+    for model in models:
+        if llm_router._is_model_access_group_for_wildcard_route(
+            model_access_group=model
+        ):
+            if not premium_user:
+                raise HTTPException(
+                    status_code=status.HTTP_403_FORBIDDEN,
+                    detail={
+                        "error": "Setting a model access group on a wildcard model is only available for LiteLLM Enterprise users.{}".format(
+                            CommonProxyErrors.not_premium_user.value
+                        )
+                    },
+                )
+
+    return True
+
+
+async def generate_key_helper_fn(  # noqa: PLR0915
+    request_type: Literal[
+        "user", "key"
+    ],  # identifies if this request is from /user/new or /key/generate
+    duration: Optional[str] = None,
+    models: list = [],
+    aliases: dict = {},
+    config: dict = {},
+    spend: float = 0.0,
+    key_max_budget: Optional[float] = None,  # key_max_budget is used to Budget Per key
+    key_budget_duration: Optional[str] = None,
+    budget_id: Optional[float] = None,  # budget id <-> LiteLLM_BudgetTable
+    soft_budget: Optional[
+        float
+    ] = None,  # soft_budget is used to set soft Budgets Per user
+    max_budget: Optional[float] = None,  # max_budget is used to Budget Per user
+    blocked: Optional[bool] = None,
+    budget_duration: Optional[str] = None,  # max_budget is used to Budget Per user
+    token: Optional[str] = None,
+    key: Optional[
+        str
+    ] = None,  # dev-friendly alt param for 'token'. Exposed on `/key/generate` for setting key value yourself.
+    user_id: Optional[str] = None,
+    user_alias: Optional[str] = None,
+    team_id: Optional[str] = None,
+    user_email: Optional[str] = None,
+    user_role: Optional[str] = None,
+    max_parallel_requests: Optional[int] = None,
+    metadata: Optional[dict] = {},
+    tpm_limit: Optional[int] = None,
+    rpm_limit: Optional[int] = None,
+    query_type: Literal["insert_data", "update_data"] = "insert_data",
+    update_key_values: Optional[dict] = None,
+    key_alias: Optional[str] = None,
+    allowed_cache_controls: Optional[list] = [],
+    permissions: Optional[dict] = {},
+    model_max_budget: Optional[dict] = {},
+    model_rpm_limit: Optional[dict] = None,
+    model_tpm_limit: Optional[dict] = None,
+    guardrails: Optional[list] = None,
+    teams: Optional[list] = None,
+    organization_id: Optional[str] = None,
+    table_name: Optional[Literal["key", "user"]] = None,
+    send_invite_email: Optional[bool] = None,
+    created_by: Optional[str] = None,
+    updated_by: Optional[str] = None,
+):
+    from litellm.proxy.proxy_server import (
+        litellm_proxy_budget_name,
+        premium_user,
+        prisma_client,
+    )
+
+    if prisma_client is None:
+        raise Exception(
+            "Connect Proxy to database to generate keys - https://docs.litellm.ai/docs/proxy/virtual_keys "
+        )
+
+    if token is None:
+        if key is not None:
+            token = key
+        else:
+            token = f"sk-{secrets.token_urlsafe(16)}"
+
+    if duration is None:  # allow tokens that never expire
+        expires = None
+    else:
+        duration_s = duration_in_seconds(duration=duration)
+        expires = datetime.now(timezone.utc) + timedelta(seconds=duration_s)
+
+    if key_budget_duration is None:  # one-time budget
+        key_reset_at = None
+    else:
+        duration_s = duration_in_seconds(duration=key_budget_duration)
+        key_reset_at = datetime.now(timezone.utc) + timedelta(seconds=duration_s)
+
+    if budget_duration is None:  # one-time budget
+        reset_at = None
+    else:
+        duration_s = duration_in_seconds(duration=budget_duration)
+        reset_at = datetime.now(timezone.utc) + timedelta(seconds=duration_s)
+
+    aliases_json = json.dumps(aliases)
+    config_json = json.dumps(config)
+    permissions_json = json.dumps(permissions)
+
+    # Add model_rpm_limit and model_tpm_limit to metadata
+    if model_rpm_limit is not None:
+        metadata = metadata or {}
+        metadata["model_rpm_limit"] = model_rpm_limit
+    if model_tpm_limit is not None:
+        metadata = metadata or {}
+        metadata["model_tpm_limit"] = model_tpm_limit
+    if guardrails is not None:
+        metadata = metadata or {}
+        metadata["guardrails"] = guardrails
+
+    metadata_json = json.dumps(metadata)
+    validate_model_max_budget(model_max_budget)
+    model_max_budget_json = json.dumps(model_max_budget)
+    user_role = user_role
+    tpm_limit = tpm_limit
+    rpm_limit = rpm_limit
+    allowed_cache_controls = allowed_cache_controls
+
+    try:
+        # Create a new verification token (you may want to enhance this logic based on your needs)
+        user_data = {
+            "max_budget": max_budget,
+            "user_email": user_email,
+            "user_id": user_id,
+            "user_alias": user_alias,
+            "team_id": team_id,
+            "organization_id": organization_id,
+            "user_role": user_role,
+            "spend": spend,
+            "models": models,
+            "metadata": metadata_json,
+            "max_parallel_requests": max_parallel_requests,
+            "tpm_limit": tpm_limit,
+            "rpm_limit": rpm_limit,
+            "budget_duration": budget_duration,
+            "budget_reset_at": reset_at,
+            "allowed_cache_controls": allowed_cache_controls,
+        }
+        if teams is not None:
+            user_data["teams"] = teams
+        key_data = {
+            "token": token,
+            "key_alias": key_alias,
+            "expires": expires,
+            "models": models,
+            "aliases": aliases_json,
+            "config": config_json,
+            "spend": spend,
+            "max_budget": key_max_budget,
+            "user_id": user_id,
+            "team_id": team_id,
+            "max_parallel_requests": max_parallel_requests,
+            "metadata": metadata_json,
+            "tpm_limit": tpm_limit,
+            "rpm_limit": rpm_limit,
+            "budget_duration": key_budget_duration,
+            "budget_reset_at": key_reset_at,
+            "allowed_cache_controls": allowed_cache_controls,
+            "permissions": permissions_json,
+            "model_max_budget": model_max_budget_json,
+            "budget_id": budget_id,
+            "blocked": blocked,
+            "created_by": created_by,
+            "updated_by": updated_by,
+        }
+
+        if (
+            get_secret("DISABLE_KEY_NAME", False) is True
+        ):  # allow user to disable storing abbreviated key name (shown in UI, to help figure out which key spent how much)
+            pass
+        else:
+            key_data["key_name"] = f"sk-...{token[-4:]}"
+        saved_token = copy.deepcopy(key_data)
+        if isinstance(saved_token["aliases"], str):
+            saved_token["aliases"] = json.loads(saved_token["aliases"])
+        if isinstance(saved_token["config"], str):
+            saved_token["config"] = json.loads(saved_token["config"])
+        if isinstance(saved_token["metadata"], str):
+            saved_token["metadata"] = json.loads(saved_token["metadata"])
+        if isinstance(saved_token["permissions"], str):
+            if (
+                "get_spend_routes" in saved_token["permissions"]
+                and premium_user is not True
+            ):
+                raise ValueError(
+                    "get_spend_routes permission is only available for LiteLLM Enterprise users"
+                )
+
+            saved_token["permissions"] = json.loads(saved_token["permissions"])
+        if isinstance(saved_token["model_max_budget"], str):
+            saved_token["model_max_budget"] = json.loads(
+                saved_token["model_max_budget"]
+            )
+
+        if saved_token.get("expires", None) is not None and isinstance(
+            saved_token["expires"], datetime
+        ):
+            saved_token["expires"] = saved_token["expires"].isoformat()
+        if prisma_client is not None:
+            if (
+                table_name is None or table_name == "user"
+            ):  # do not auto-create users for `/key/generate`
+                ## CREATE USER (If necessary)
+                if query_type == "insert_data":
+                    user_row = await prisma_client.insert_data(
+                        data=user_data, table_name="user"
+                    )
+
+                    if user_row is None:
+                        raise Exception("Failed to create user")
+                    ## use default user model list if no key-specific model list provided
+                    if len(user_row.models) > 0 and len(key_data["models"]) == 0:  # type: ignore
+                        key_data["models"] = user_row.models  # type: ignore
+                elif query_type == "update_data":
+                    user_row = await prisma_client.update_data(
+                        data=user_data,
+                        table_name="user",
+                        update_key_values=update_key_values,
+                    )
+            if user_id == litellm_proxy_budget_name or (
+                table_name is not None and table_name == "user"
+            ):
+                # do not create a key for litellm_proxy_budget_name or if table name is set to just 'user'
+                # we only need to ensure this exists in the user table
+                # the LiteLLM_VerificationToken table will increase in size if we don't do this check
+                return user_data
+
+            ## CREATE KEY
+            verbose_proxy_logger.debug("prisma_client: Creating Key= %s", key_data)
+            create_key_response = await prisma_client.insert_data(
+                data=key_data, table_name="key"
+            )
+            key_data["token_id"] = getattr(create_key_response, "token", None)
+            key_data["litellm_budget_table"] = getattr(
+                create_key_response, "litellm_budget_table", None
+            )
+    except Exception as e:
+        verbose_proxy_logger.error(
+            "litellm.proxy.proxy_server.generate_key_helper_fn(): Exception occured - {}".format(
+                str(e)
+            )
+        )
+        verbose_proxy_logger.debug(traceback.format_exc())
+        if isinstance(e, HTTPException):
+            raise e
+        raise HTTPException(
+            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+            detail={"error": "Internal Server Error."},
+        )
+
+    # Add budget related info in key_data - this ensures it's returned
+    key_data["budget_id"] = budget_id
+
+    if request_type == "user":
+        # if this is a /user/new request update the key_date with user_data fields
+        key_data.update(user_data)
+    return key_data
+
+
+async def _team_key_deletion_check(
+    user_api_key_dict: UserAPIKeyAuth,
+    key_info: LiteLLM_VerificationToken,
+    prisma_client: PrismaClient,
+    user_api_key_cache: DualCache,
+):
+    is_team_key = _is_team_key(data=key_info)
+
+    if is_team_key and key_info.team_id is not None:
+        team_table = await get_team_object(
+            team_id=key_info.team_id,
+            prisma_client=prisma_client,
+            user_api_key_cache=user_api_key_cache,
+            check_db_only=True,
+        )
+        if (
+            litellm.key_generation_settings is not None
+            and "team_key_generation" in litellm.key_generation_settings
+        ):
+            _team_key_generation = litellm.key_generation_settings[
+                "team_key_generation"
+            ]
+        else:
+            _team_key_generation = TeamUIKeyGenerationConfig(
+                allowed_team_member_roles=["admin", "user"],
+            )
+        # check if user is team admin
+        if team_table is not None:
+            return _team_key_generation_team_member_check(
+                assigned_user_id=user_api_key_dict.user_id,
+                team_table=team_table,
+                user_api_key_dict=user_api_key_dict,
+                team_key_generation=_team_key_generation,
+            )
+        else:
+            raise HTTPException(
+                status_code=status.HTTP_404_NOT_FOUND,
+                detail={
+                    "error": f"Team not found in db, and user not proxy admin. Team id = {key_info.team_id}"
+                },
+            )
+    return False
+
+
+async def can_delete_verification_token(
+    key_info: LiteLLM_VerificationToken,
+    user_api_key_cache: DualCache,
+    user_api_key_dict: UserAPIKeyAuth,
+    prisma_client: PrismaClient,
+) -> bool:
+    """
+    - check if user is proxy admin
+    - check if user is team admin and key is a team key
+    - check if key is personal key
+    """
+    is_team_key = _is_team_key(data=key_info)
+    if user_api_key_dict.user_role == LitellmUserRoles.PROXY_ADMIN.value:
+        return True
+    elif is_team_key and key_info.team_id is not None:
+        return await _team_key_deletion_check(
+            user_api_key_dict=user_api_key_dict,
+            key_info=key_info,
+            prisma_client=prisma_client,
+            user_api_key_cache=user_api_key_cache,
+        )
+    elif key_info.user_id is not None and key_info.user_id == user_api_key_dict.user_id:
+        return True
+    else:
+        return False
+
+
+async def delete_verification_tokens(
+    tokens: List,
+    user_api_key_cache: DualCache,
+    user_api_key_dict: UserAPIKeyAuth,
+) -> Tuple[Optional[Dict], List[LiteLLM_VerificationToken]]:
+    """
+    Helper that deletes the list of tokens from the database
+
+    - check if user is proxy admin
+    - check if user is team admin and key is a team key
+
+    Args:
+        tokens: List of tokens to delete
+        user_id: Optional user_id to filter by
+
+    Returns:
+        Tuple[Optional[Dict], List[LiteLLM_VerificationToken]]:
+            Optional[Dict]:
+                - Number of deleted tokens
+            List[LiteLLM_VerificationToken]:
+                - List of keys being deleted, this contains information about the key_alias, token, and user_id being deleted,
+                this is passed down to the KeyManagementEventHooks to delete the keys from the secret manager and handle audit logs
+    """
+    from litellm.proxy.proxy_server import prisma_client
+
+    try:
+        if prisma_client:
+            tokens = [_hash_token_if_needed(token=key) for key in tokens]
+            _keys_being_deleted: List[LiteLLM_VerificationToken] = (
+                await prisma_client.db.litellm_verificationtoken.find_many(
+                    where={"token": {"in": tokens}}
+                )
+            )
+
+            # Assuming 'db' is your Prisma Client instance
+            # check if admin making request - don't filter by user-id
+            if user_api_key_dict.user_role == LitellmUserRoles.PROXY_ADMIN.value:
+                deleted_tokens = await prisma_client.delete_data(tokens=tokens)
+            # else
+            else:
+                tasks = []
+                deleted_tokens = []
+                for key in _keys_being_deleted:
+
+                    async def _delete_key(key: LiteLLM_VerificationToken):
+                        if await can_delete_verification_token(
+                            key_info=key,
+                            user_api_key_cache=user_api_key_cache,
+                            user_api_key_dict=user_api_key_dict,
+                            prisma_client=prisma_client,
+                        ):
+                            await prisma_client.delete_data(tokens=[key.token])
+                            deleted_tokens.append(key.token)
+                        else:
+                            raise HTTPException(
+                                status_code=status.HTTP_403_FORBIDDEN,
+                                detail={
+                                    "error": "You are not authorized to delete this key"
+                                },
+                            )
+
+                    tasks.append(_delete_key(key))
+                await asyncio.gather(*tasks)
+
+                _num_deleted_tokens = len(deleted_tokens)
+                if _num_deleted_tokens != len(tokens):
+                    failed_tokens = [
+                        token for token in tokens if token not in deleted_tokens
+                    ]
+                    raise Exception(
+                        "Failed to delete all tokens. Failed to delete tokens: "
+                        + str(failed_tokens)
+                    )
+        else:
+            raise Exception("DB not connected. prisma_client is None")
+    except Exception as e:
+        verbose_proxy_logger.exception(
+            "litellm.proxy.proxy_server.delete_verification_tokens(): Exception occured - {}".format(
+                str(e)
+            )
+        )
+        verbose_proxy_logger.debug(traceback.format_exc())
+        raise e
+
+    for key in tokens:
+        user_api_key_cache.delete_cache(key)
+        # remove hash token from cache
+        hashed_token = hash_token(cast(str, key))
+        user_api_key_cache.delete_cache(hashed_token)
+
+    return {"deleted_keys": deleted_tokens}, _keys_being_deleted
+
+
+async def delete_key_aliases(
+    key_aliases: List[str],
+    user_api_key_cache: DualCache,
+    prisma_client: PrismaClient,
+    user_api_key_dict: UserAPIKeyAuth,
+) -> Tuple[Optional[Dict], List[LiteLLM_VerificationToken]]:
+    _keys_being_deleted = await prisma_client.db.litellm_verificationtoken.find_many(
+        where={"key_alias": {"in": key_aliases}}
+    )
+
+    tokens = [key.token for key in _keys_being_deleted]
+    return await delete_verification_tokens(
+        tokens=tokens,
+        user_api_key_cache=user_api_key_cache,
+        user_api_key_dict=user_api_key_dict,
+    )
+
+
+async def _rotate_master_key(
+    prisma_client: PrismaClient,
+    user_api_key_dict: UserAPIKeyAuth,
+    current_master_key: str,
+    new_master_key: str,
+) -> None:
+    """
+    Rotate the master key
+
+    1. Get the values from the DB
+        - Get models from DB
+        - Get config from DB
+    2. Decrypt the values
+        - ModelTable
+            - [{"model_name": "str", "litellm_params": {}}]
+        - ConfigTable
+    3. Encrypt the values with the new master key
+    4. Update the values in the DB
+    """
+    from litellm.proxy.proxy_server import proxy_config
+
+    try:
+        models: Optional[List] = (
+            await prisma_client.db.litellm_proxymodeltable.find_many()
+        )
+    except Exception:
+        models = None
+    # 2. process model table
+    if models:
+        decrypted_models = proxy_config.decrypt_model_list_from_db(new_models=models)
+        verbose_proxy_logger.info(
+            "ABLE TO DECRYPT MODELS - len(decrypted_models): %s", len(decrypted_models)
+        )
+        new_models = []
+        for model in decrypted_models:
+            new_model = await _add_model_to_db(
+                model_params=Deployment(**model),
+                user_api_key_dict=user_api_key_dict,
+                prisma_client=prisma_client,
+                new_encryption_key=new_master_key,
+                should_create_model_in_db=False,
+            )
+            if new_model:
+                new_models.append(jsonify_object(new_model.model_dump()))
+        verbose_proxy_logger.info("Resetting proxy model table")
+        await prisma_client.db.litellm_proxymodeltable.delete_many()
+        verbose_proxy_logger.info("Creating %s models", len(new_models))
+        await prisma_client.db.litellm_proxymodeltable.create_many(
+            data=new_models,
+        )
+    # 3. process config table
+    try:
+        config = await prisma_client.db.litellm_config.find_many()
+    except Exception:
+        config = None
+
+    if config:
+        """If environment_variables is found, decrypt it and encrypt it with the new master key"""
+        environment_variables_dict = {}
+        for c in config:
+            if c.param_name == "environment_variables":
+                environment_variables_dict = c.param_value
+
+        if environment_variables_dict:
+            decrypted_env_vars = proxy_config._decrypt_and_set_db_env_variables(
+                environment_variables=environment_variables_dict
+            )
+            encrypted_env_vars = proxy_config._encrypt_env_variables(
+                environment_variables=decrypted_env_vars,
+                new_encryption_key=new_master_key,
+            )
+
+            if encrypted_env_vars:
+                await prisma_client.db.litellm_config.update(
+                    where={"param_name": "environment_variables"},
+                    data={"param_value": jsonify_object(encrypted_env_vars)},
+                )
+
+
+@router.post(
+    "/key/{key:path}/regenerate",
+    tags=["key management"],
+    dependencies=[Depends(user_api_key_auth)],
+)
+@router.post(
+    "/key/regenerate",
+    tags=["key management"],
+    dependencies=[Depends(user_api_key_auth)],
+)
+@management_endpoint_wrapper
+async def regenerate_key_fn(
+    key: Optional[str] = None,
+    data: Optional[RegenerateKeyRequest] = None,
+    user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
+    litellm_changed_by: Optional[str] = Header(
+        None,
+        description="The litellm-changed-by header enables tracking of actions performed by authorized users on behalf of other users, providing an audit trail for accountability",
+    ),
+) -> Optional[GenerateKeyResponse]:
+    """
+    Regenerate an existing API key while optionally updating its parameters.
+
+    Parameters:
+    - key: str (path parameter) - The key to regenerate
+    - data: Optional[RegenerateKeyRequest] - Request body containing optional parameters to update
+        - key_alias: Optional[str] - User-friendly key alias
+        - user_id: Optional[str] - User ID associated with key
+        - team_id: Optional[str] - Team ID associated with key
+        - models: Optional[list] - Model_name's a user is allowed to call
+        - tags: Optional[List[str]] - Tags for organizing keys (Enterprise only)
+        - spend: Optional[float] - Amount spent by key
+        - max_budget: Optional[float] - Max budget for key
+        - model_max_budget: Optional[Dict[str, BudgetConfig]] - Model-specific budgets {"gpt-4": {"budget_limit": 0.0005, "time_period": "30d"}}
+        - budget_duration: Optional[str] - Budget reset period ("30d", "1h", etc.)
+        - soft_budget: Optional[float] - Soft budget limit (warning vs. hard stop). Will trigger a slack alert when this soft budget is reached.
+        - max_parallel_requests: Optional[int] - Rate limit for parallel requests
+        - metadata: Optional[dict] - Metadata for key. Example {"team": "core-infra", "app": "app2"}
+        - tpm_limit: Optional[int] - Tokens per minute limit
+        - rpm_limit: Optional[int] - Requests per minute limit
+        - model_rpm_limit: Optional[dict] - Model-specific RPM limits {"gpt-4": 100, "claude-v1": 200}
+        - model_tpm_limit: Optional[dict] - Model-specific TPM limits {"gpt-4": 100000, "claude-v1": 200000}
+        - allowed_cache_controls: Optional[list] - List of allowed cache control values
+        - duration: Optional[str] - Key validity duration ("30d", "1h", etc.)
+        - permissions: Optional[dict] - Key-specific permissions
+        - guardrails: Optional[List[str]] - List of active guardrails for the key
+        - blocked: Optional[bool] - Whether the key is blocked
+
+
+    Returns:
+    - GenerateKeyResponse containing the new key and its updated parameters
+
+    Example:
+    ```bash
+    curl --location --request POST 'http://localhost:4000/key/sk-1234/regenerate' \
+    --header 'Authorization: Bearer sk-1234' \
+    --header 'Content-Type: application/json' \
+    --data-raw '{
+        "max_budget": 100,
+        "metadata": {"team": "core-infra"},
+        "models": ["gpt-4", "gpt-3.5-turbo"]
+    }'
+    ```
+
+    Note: This is an Enterprise feature. It requires a premium license to use.
+    """
+    try:
+
+        from litellm.proxy.proxy_server import (
+            hash_token,
+            master_key,
+            premium_user,
+            prisma_client,
+            proxy_logging_obj,
+            user_api_key_cache,
+        )
+
+        if premium_user is not True:
+            raise ValueError(
+                f"Regenerating Virtual Keys is an Enterprise feature, {CommonProxyErrors.not_premium_user.value}"
+            )
+
+        # Check if key exists, raise exception if key is not in the DB
+        key = data.key if data and data.key else key
+        if not key:
+            raise HTTPException(status_code=400, detail={"error": "No key passed in."})
+        ### 1. Create New copy that is duplicate of existing key
+        ######################################################################
+
+        # create duplicate of existing key
+        # set token = new token generated
+        # insert new token in DB
+
+        # create hash of token
+        if prisma_client is None:
+            raise HTTPException(
+                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+                detail={"error": "DB not connected. prisma_client is None"},
+            )
+
+        _is_master_key_valid = _is_master_key(api_key=key, _master_key=master_key)
+
+        if master_key is not None and data and _is_master_key_valid:
+            if data.new_master_key is None:
+                raise HTTPException(
+                    status_code=status.HTTP_400_BAD_REQUEST,
+                    detail={"error": "New master key is required."},
+                )
+            await _rotate_master_key(
+                prisma_client=prisma_client,
+                user_api_key_dict=user_api_key_dict,
+                current_master_key=master_key,
+                new_master_key=data.new_master_key,
+            )
+            return GenerateKeyResponse(
+                key=data.new_master_key,
+                token=data.new_master_key,
+                key_name=data.new_master_key,
+                expires=None,
+            )
+
+        if "sk" not in key:
+            hashed_api_key = key
+        else:
+            hashed_api_key = hash_token(key)
+
+        _key_in_db = await prisma_client.db.litellm_verificationtoken.find_unique(
+            where={"token": hashed_api_key},
+        )
+        if _key_in_db is None:
+            raise HTTPException(
+                status_code=status.HTTP_404_NOT_FOUND,
+                detail={"error": f"Key {key} not found."},
+            )
+
+        verbose_proxy_logger.debug("key_in_db: %s", _key_in_db)
+
+        new_token = f"sk-{secrets.token_urlsafe(16)}"
+        new_token_hash = hash_token(new_token)
+        new_token_key_name = f"sk-...{new_token[-4:]}"
+
+        # Prepare the update data
+        update_data = {
+            "token": new_token_hash,
+            "key_name": new_token_key_name,
+        }
+
+        non_default_values = {}
+        if data is not None:
+            # Update with any provided parameters from GenerateKeyRequest
+            non_default_values = prepare_key_update_data(
+                data=data, existing_key_row=_key_in_db
+            )
+            verbose_proxy_logger.debug("non_default_values: %s", non_default_values)
+
+        update_data.update(non_default_values)
+        update_data = prisma_client.jsonify_object(data=update_data)
+        # Update the token in the database
+        updated_token = await prisma_client.db.litellm_verificationtoken.update(
+            where={"token": hashed_api_key},
+            data=update_data,  # type: ignore
+        )
+
+        updated_token_dict = {}
+        if updated_token is not None:
+            updated_token_dict = dict(updated_token)
+
+        updated_token_dict["key"] = new_token
+        updated_token_dict["token_id"] = updated_token_dict.pop("token")
+
+        ### 3. remove existing key entry from cache
+        ######################################################################
+        if key:
+            await _delete_cache_key_object(
+                hashed_token=hash_token(key),
+                user_api_key_cache=user_api_key_cache,
+                proxy_logging_obj=proxy_logging_obj,
+            )
+
+        if hashed_api_key:
+            await _delete_cache_key_object(
+                hashed_token=hash_token(key),
+                user_api_key_cache=user_api_key_cache,
+                proxy_logging_obj=proxy_logging_obj,
+            )
+
+        response = GenerateKeyResponse(
+            **updated_token_dict,
+        )
+
+        asyncio.create_task(
+            KeyManagementEventHooks.async_key_rotated_hook(
+                data=data,
+                existing_key_row=_key_in_db,
+                response=response,
+                user_api_key_dict=user_api_key_dict,
+                litellm_changed_by=litellm_changed_by,
+            )
+        )
+
+        return response
+    except Exception as e:
+        verbose_proxy_logger.exception("Error regenerating key: %s", e)
+        raise handle_exception_on_proxy(e)
+
+
+async def validate_key_list_check(
+    user_api_key_dict: UserAPIKeyAuth,
+    user_id: Optional[str],
+    team_id: Optional[str],
+    organization_id: Optional[str],
+    key_alias: Optional[str],
+    prisma_client: PrismaClient,
+) -> Optional[LiteLLM_UserTable]:
+
+    if user_api_key_dict.user_role == LitellmUserRoles.PROXY_ADMIN.value:
+        return None
+
+    if user_api_key_dict.user_id is None:
+        raise ProxyException(
+            message="You are not authorized to access this endpoint. No 'user_id' is associated with your API key.",
+            type=ProxyErrorTypes.bad_request_error,
+            param="user_id",
+            code=status.HTTP_403_FORBIDDEN,
+        )
+    complete_user_info_db_obj: Optional[BaseModel] = (
+        await prisma_client.db.litellm_usertable.find_unique(
+            where={"user_id": user_api_key_dict.user_id},
+            include={"organization_memberships": True},
+        )
+    )
+
+    if complete_user_info_db_obj is None:
+        raise ProxyException(
+            message="You are not authorized to access this endpoint. No 'user_id' is associated with your API key.",
+            type=ProxyErrorTypes.bad_request_error,
+            param="user_id",
+            code=status.HTTP_403_FORBIDDEN,
+        )
+
+    complete_user_info = LiteLLM_UserTable(**complete_user_info_db_obj.model_dump())
+
+    # internal user can only see their own keys
+    if user_id:
+        if complete_user_info.user_id != user_id:
+            raise ProxyException(
+                message="You are not authorized to check another user's keys",
+                type=ProxyErrorTypes.bad_request_error,
+                param="user_id",
+                code=status.HTTP_403_FORBIDDEN,
+            )
+
+    if team_id:
+        if team_id not in complete_user_info.teams:
+            raise ProxyException(
+                message="You are not authorized to check this team's keys",
+                type=ProxyErrorTypes.bad_request_error,
+                param="team_id",
+                code=status.HTTP_403_FORBIDDEN,
+            )
+
+    if organization_id:
+        if (
+            complete_user_info.organization_memberships is None
+            or organization_id
+            not in [
+                membership.organization_id
+                for membership in complete_user_info.organization_memberships
+            ]
+        ):
+            raise ProxyException(
+                message="You are not authorized to check this organization's keys",
+                type=ProxyErrorTypes.bad_request_error,
+                param="organization_id",
+                code=status.HTTP_403_FORBIDDEN,
+            )
+    return complete_user_info
+
+
+async def get_admin_team_ids(
+    complete_user_info: Optional[LiteLLM_UserTable],
+    user_api_key_dict: UserAPIKeyAuth,
+    prisma_client: PrismaClient,
+) -> List[str]:
+    """
+    Get all team IDs where the user is an admin.
+    """
+    if complete_user_info is None:
+        return []
+    # Get all teams that user is an admin of
+    teams: Optional[List[BaseModel]] = (
+        await prisma_client.db.litellm_teamtable.find_many(
+            where={"team_id": {"in": complete_user_info.teams}}
+        )
+    )
+    if teams is None:
+        return []
+
+    teams_pydantic_obj = [LiteLLM_TeamTable(**team.model_dump()) for team in teams]
+
+    admin_team_ids = [
+        team.team_id
+        for team in teams_pydantic_obj
+        if _is_user_team_admin(user_api_key_dict=user_api_key_dict, team_obj=team)
+    ]
+    return admin_team_ids
+
+
+@router.get(
+    "/key/list",
+    tags=["key management"],
+    dependencies=[Depends(user_api_key_auth)],
+)
+@management_endpoint_wrapper
+async def list_keys(
+    request: Request,
+    user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
+    page: int = Query(1, description="Page number", ge=1),
+    size: int = Query(10, description="Page size", ge=1, le=100),
+    user_id: Optional[str] = Query(None, description="Filter keys by user ID"),
+    team_id: Optional[str] = Query(None, description="Filter keys by team ID"),
+    organization_id: Optional[str] = Query(
+        None, description="Filter keys by organization ID"
+    ),
+    key_alias: Optional[str] = Query(None, description="Filter keys by key alias"),
+    return_full_object: bool = Query(False, description="Return full key object"),
+    include_team_keys: bool = Query(
+        False, description="Include all keys for teams that user is an admin of."
+    ),
+) -> KeyListResponseObject:
+    """
+    List all keys for a given user / team / organization.
+
+    Returns:
+        {
+            "keys": List[str] or List[UserAPIKeyAuth],
+            "total_count": int,
+            "current_page": int,
+            "total_pages": int,
+        }
+    """
+    try:
+        from litellm.proxy.proxy_server import prisma_client
+
+        verbose_proxy_logger.debug("Entering list_keys function")
+
+        if prisma_client is None:
+            verbose_proxy_logger.error("Database not connected")
+            raise Exception("Database not connected")
+
+        complete_user_info = await validate_key_list_check(
+            user_api_key_dict=user_api_key_dict,
+            user_id=user_id,
+            team_id=team_id,
+            organization_id=organization_id,
+            key_alias=key_alias,
+            prisma_client=prisma_client,
+        )
+
+        if include_team_keys:
+            admin_team_ids = await get_admin_team_ids(
+                complete_user_info=complete_user_info,
+                user_api_key_dict=user_api_key_dict,
+                prisma_client=prisma_client,
+            )
+        else:
+            admin_team_ids = None
+
+        if user_id is None and user_api_key_dict.user_role not in [
+            LitellmUserRoles.PROXY_ADMIN.value,
+            LitellmUserRoles.PROXY_ADMIN_VIEW_ONLY.value,
+        ]:
+            user_id = user_api_key_dict.user_id
+
+        response = await _list_key_helper(
+            prisma_client=prisma_client,
+            page=page,
+            size=size,
+            user_id=user_id,
+            team_id=team_id,
+            key_alias=key_alias,
+            return_full_object=return_full_object,
+            organization_id=organization_id,
+            admin_team_ids=admin_team_ids,
+        )
+
+        verbose_proxy_logger.debug("Successfully prepared response")
+
+        return response
+
+    except Exception as e:
+        verbose_proxy_logger.exception(f"Error in list_keys: {e}")
+        if isinstance(e, HTTPException):
+            raise ProxyException(
+                message=getattr(e, "detail", f"error({str(e)})"),
+                type=ProxyErrorTypes.internal_server_error,
+                param=getattr(e, "param", "None"),
+                code=getattr(e, "status_code", status.HTTP_500_INTERNAL_SERVER_ERROR),
+            )
+        elif isinstance(e, ProxyException):
+            raise e
+        raise ProxyException(
+            message="Authentication Error, " + str(e),
+            type=ProxyErrorTypes.internal_server_error,
+            param=getattr(e, "param", "None"),
+            code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+        )
+
+
+async def _list_key_helper(
+    prisma_client: PrismaClient,
+    page: int,
+    size: int,
+    user_id: Optional[str],
+    team_id: Optional[str],
+    organization_id: Optional[str],
+    key_alias: Optional[str],
+    exclude_team_id: Optional[str] = None,
+    return_full_object: bool = False,
+    admin_team_ids: Optional[
+        List[str]
+    ] = None,  # New parameter for teams where user is admin
+) -> KeyListResponseObject:
+    """
+    Helper function to list keys
+    Args:
+        page: int
+        size: int
+        user_id: Optional[str]
+        team_id: Optional[str]
+        key_alias: Optional[str]
+        exclude_team_id: Optional[str] # exclude a specific team_id
+        return_full_object: bool # when true, will return UserAPIKeyAuth objects instead of just the token
+        admin_team_ids: Optional[List[str]] # list of team IDs where the user is an admin
+
+    Returns:
+        KeyListResponseObject
+        {
+            "keys": List[str] or List[UserAPIKeyAuth],  # Updated to reflect possible return types
+            "total_count": int,
+            "current_page": int,
+            "total_pages": int,
+        }
+    """
+
+    # Prepare filter conditions
+    where: Dict[str, Union[str, Dict[str, Any], List[Dict[str, Any]]]] = {}
+    where.update(_get_condition_to_filter_out_ui_session_tokens())
+
+    # Build the OR conditions for user's keys and admin team keys
+    or_conditions: List[Dict[str, Any]] = []
+
+    # Base conditions for user's own keys
+    user_condition: Dict[str, Any] = {}
+    if user_id and isinstance(user_id, str):
+        user_condition["user_id"] = user_id
+    if team_id and isinstance(team_id, str):
+        user_condition["team_id"] = team_id
+    if key_alias and isinstance(key_alias, str):
+        user_condition["key_alias"] = key_alias
+    if exclude_team_id and isinstance(exclude_team_id, str):
+        user_condition["team_id"] = {"not": exclude_team_id}
+    if organization_id and isinstance(organization_id, str):
+        user_condition["organization_id"] = organization_id
+
+    if user_condition:
+        or_conditions.append(user_condition)
+
+    # Add condition for admin team keys if provided
+    if admin_team_ids:
+        or_conditions.append({"team_id": {"in": admin_team_ids}})
+
+    # Combine conditions with OR if we have multiple conditions
+    if len(or_conditions) > 1:
+        where["OR"] = or_conditions
+    elif len(or_conditions) == 1:
+        where.update(or_conditions[0])
+
+    verbose_proxy_logger.debug(f"Filter conditions: {where}")
+
+    # Calculate skip for pagination
+    skip = (page - 1) * size
+
+    verbose_proxy_logger.debug(f"Pagination: skip={skip}, take={size}")
+
+    # Fetch keys with pagination
+    keys = await prisma_client.db.litellm_verificationtoken.find_many(
+        where=where,  # type: ignore
+        skip=skip,  # type: ignore
+        take=size,  # type: ignore
+        order=[
+            {"created_at": "desc"},
+            {"token": "desc"},  # fallback sort
+        ],
+    )
+
+    verbose_proxy_logger.debug(f"Fetched {len(keys)} keys")
+
+    # Get total count of keys
+    total_count = await prisma_client.db.litellm_verificationtoken.count(
+        where=where  # type: ignore
+    )
+
+    verbose_proxy_logger.debug(f"Total count of keys: {total_count}")
+
+    # Calculate total pages
+    total_pages = -(-total_count // size)  # Ceiling division
+
+    # Prepare response
+    key_list: List[Union[str, UserAPIKeyAuth]] = []
+    for key in keys:
+        if return_full_object is True:
+            key_list.append(UserAPIKeyAuth(**key.dict()))  # Return full key object
+        else:
+            _token = key.dict().get("token")
+            key_list.append(_token)  # Return only the token
+
+    return KeyListResponseObject(
+        keys=key_list,
+        total_count=total_count,
+        current_page=page,
+        total_pages=total_pages,
+    )
+
+
+def _get_condition_to_filter_out_ui_session_tokens() -> Dict[str, Any]:
+    """
+    Condition to filter out UI session tokens
+    """
+    return {
+        "OR": [
+            {"team_id": None},  # Include records where team_id is null
+            {
+                "team_id": {"not": UI_SESSION_TOKEN_TEAM_ID}
+            },  # Include records where team_id != UI_SESSION_TOKEN_TEAM_ID
+        ]
+    }
+
+
+@router.post(
+    "/key/block", tags=["key management"], dependencies=[Depends(user_api_key_auth)]
+)
+@management_endpoint_wrapper
+async def block_key(
+    data: BlockKeyRequest,
+    http_request: Request,
+    user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
+    litellm_changed_by: Optional[str] = Header(
+        None,
+        description="The litellm-changed-by header enables tracking of actions performed by authorized users on behalf of other users, providing an audit trail for accountability",
+    ),
+) -> Optional[LiteLLM_VerificationToken]:
+    """
+    Block an Virtual key from making any requests.
+
+    Parameters:
+    - key: str - The key to block. Can be either the unhashed key (sk-...) or the hashed key value
+
+     Example:
+    ```bash
+    curl --location 'http://0.0.0.0:4000/key/block' \
+    --header 'Authorization: Bearer sk-1234' \
+    --header 'Content-Type: application/json' \
+    --data '{
+        "key": "sk-Fn8Ej39NxjAXrvpUGKghGw"
+    }'
+    ```
+
+    Note: This is an admin-only endpoint. Only proxy admins can block keys.
+    """
+    from litellm.proxy.proxy_server import (
+        create_audit_log_for_update,
+        hash_token,
+        litellm_proxy_admin_name,
+        prisma_client,
+        proxy_logging_obj,
+        user_api_key_cache,
+    )
+
+    if prisma_client is None:
+        raise Exception("{}".format(CommonProxyErrors.db_not_connected_error.value))
+
+    if data.key.startswith("sk-"):
+        hashed_token = hash_token(token=data.key)
+    else:
+        hashed_token = data.key
+
+    if litellm.store_audit_logs is True:
+        # make an audit log for key update
+        record = await prisma_client.db.litellm_verificationtoken.find_unique(
+            where={"token": hashed_token}
+        )
+        if record is None:
+            raise ProxyException(
+                message=f"Key {data.key} not found",
+                type=ProxyErrorTypes.bad_request_error,
+                param="key",
+                code=status.HTTP_404_NOT_FOUND,
+            )
+        asyncio.create_task(
+            create_audit_log_for_update(
+                request_data=LiteLLM_AuditLogs(
+                    id=str(uuid.uuid4()),
+                    updated_at=datetime.now(timezone.utc),
+                    changed_by=litellm_changed_by
+                    or user_api_key_dict.user_id
+                    or litellm_proxy_admin_name,
+                    changed_by_api_key=user_api_key_dict.api_key,
+                    table_name=LitellmTableNames.KEY_TABLE_NAME,
+                    object_id=hashed_token,
+                    action="blocked",
+                    updated_values="{}",
+                    before_value=record.model_dump_json(),
+                )
+            )
+        )
+
+    record = await prisma_client.db.litellm_verificationtoken.update(
+        where={"token": hashed_token}, data={"blocked": True}  # type: ignore
+    )
+
+    ## UPDATE KEY CACHE
+
+    ### get cached object ###
+    key_object = await get_key_object(
+        hashed_token=hashed_token,
+        prisma_client=prisma_client,
+        user_api_key_cache=user_api_key_cache,
+        parent_otel_span=None,
+        proxy_logging_obj=proxy_logging_obj,
+    )
+
+    ### update cached object ###
+    key_object.blocked = True
+
+    ### store cached object ###
+    await _cache_key_object(
+        hashed_token=hashed_token,
+        user_api_key_obj=key_object,
+        user_api_key_cache=user_api_key_cache,
+        proxy_logging_obj=proxy_logging_obj,
+    )
+
+    return record
+
+
+@router.post(
+    "/key/unblock", tags=["key management"], dependencies=[Depends(user_api_key_auth)]
+)
+@management_endpoint_wrapper
+async def unblock_key(
+    data: BlockKeyRequest,
+    http_request: Request,
+    user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
+    litellm_changed_by: Optional[str] = Header(
+        None,
+        description="The litellm-changed-by header enables tracking of actions performed by authorized users on behalf of other users, providing an audit trail for accountability",
+    ),
+):
+    """
+    Unblock a Virtual key to allow it to make requests again.
+
+    Parameters:
+    - key: str - The key to unblock. Can be either the unhashed key (sk-...) or the hashed key value
+
+    Example:
+    ```bash
+    curl --location 'http://0.0.0.0:4000/key/unblock' \
+    --header 'Authorization: Bearer sk-1234' \
+    --header 'Content-Type: application/json' \
+    --data '{
+        "key": "sk-Fn8Ej39NxjAXrvpUGKghGw"
+    }'
+    ```
+
+    Note: This is an admin-only endpoint. Only proxy admins can unblock keys.
+    """
+    from litellm.proxy.proxy_server import (
+        create_audit_log_for_update,
+        hash_token,
+        litellm_proxy_admin_name,
+        prisma_client,
+        proxy_logging_obj,
+        user_api_key_cache,
+    )
+
+    if prisma_client is None:
+        raise Exception("{}".format(CommonProxyErrors.db_not_connected_error.value))
+
+    if data.key.startswith("sk-"):
+        hashed_token = hash_token(token=data.key)
+    else:
+        hashed_token = data.key
+
+    if litellm.store_audit_logs is True:
+        # make an audit log for key update
+        record = await prisma_client.db.litellm_verificationtoken.find_unique(
+            where={"token": hashed_token}
+        )
+        if record is None:
+            raise ProxyException(
+                message=f"Key {data.key} not found",
+                type=ProxyErrorTypes.bad_request_error,
+                param="key",
+                code=status.HTTP_404_NOT_FOUND,
+            )
+        asyncio.create_task(
+            create_audit_log_for_update(
+                request_data=LiteLLM_AuditLogs(
+                    id=str(uuid.uuid4()),
+                    updated_at=datetime.now(timezone.utc),
+                    changed_by=litellm_changed_by
+                    or user_api_key_dict.user_id
+                    or litellm_proxy_admin_name,
+                    changed_by_api_key=user_api_key_dict.api_key,
+                    table_name=LitellmTableNames.KEY_TABLE_NAME,
+                    object_id=hashed_token,
+                    action="blocked",
+                    updated_values="{}",
+                    before_value=record.model_dump_json(),
+                )
+            )
+        )
+
+    record = await prisma_client.db.litellm_verificationtoken.update(
+        where={"token": hashed_token}, data={"blocked": False}  # type: ignore
+    )
+
+    ## UPDATE KEY CACHE
+
+    ### get cached object ###
+    key_object = await get_key_object(
+        hashed_token=hashed_token,
+        prisma_client=prisma_client,
+        user_api_key_cache=user_api_key_cache,
+        parent_otel_span=None,
+        proxy_logging_obj=proxy_logging_obj,
+    )
+
+    ### update cached object ###
+    key_object.blocked = False
+
+    ### store cached object ###
+    await _cache_key_object(
+        hashed_token=hashed_token,
+        user_api_key_obj=key_object,
+        user_api_key_cache=user_api_key_cache,
+        proxy_logging_obj=proxy_logging_obj,
+    )
+
+    return record
+
+
+@router.post(
+    "/key/health",
+    tags=["key management"],
+    dependencies=[Depends(user_api_key_auth)],
+    response_model=KeyHealthResponse,
+)
+@management_endpoint_wrapper
+async def key_health(
+    request: Request,
+    user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
+):
+    """
+    Check the health of the key
+
+    Checks:
+    - If key based logging is configured correctly - sends a test log
+
+    Usage 
+
+    Pass the key in the request header
+
+    ```bash
+    curl -X POST "http://localhost:4000/key/health" \
+     -H "Authorization: Bearer sk-1234" \
+     -H "Content-Type: application/json"
+    ```
+
+    Response when logging callbacks are setup correctly:
+
+    ```json
+    {
+      "key": "healthy",
+      "logging_callbacks": {
+        "callbacks": [
+          "gcs_bucket"
+        ],
+        "status": "healthy",
+        "details": "No logger exceptions triggered, system is healthy. Manually check if logs were sent to ['gcs_bucket']"
+      }
+    }
+    ```
+
+
+    Response when logging callbacks are not setup correctly:
+    ```json
+    {
+      "key": "unhealthy",
+      "logging_callbacks": {
+        "callbacks": [
+          "gcs_bucket"
+        ],
+        "status": "unhealthy",
+        "details": "Logger exceptions triggered, system is unhealthy: Failed to load vertex credentials. Check to see if credentials containing partial/invalid information."
+      }
+    }
+    ```
+    """
+    try:
+        # Get the key's metadata
+        key_metadata = user_api_key_dict.metadata
+
+        health_status: KeyHealthResponse = KeyHealthResponse(
+            key="healthy",
+            logging_callbacks=None,
+        )
+
+        # Check if logging is configured in metadata
+        if key_metadata and "logging" in key_metadata:
+            logging_statuses = await test_key_logging(
+                user_api_key_dict=user_api_key_dict,
+                request=request,
+                key_logging=key_metadata["logging"],
+            )
+            health_status["logging_callbacks"] = logging_statuses
+
+            # Check if any logging callback is unhealthy
+            if logging_statuses.get("status") == "unhealthy":
+                health_status["key"] = "unhealthy"
+
+        return KeyHealthResponse(**health_status)
+
+    except Exception as e:
+        raise ProxyException(
+            message=f"Key health check failed: {str(e)}",
+            type=ProxyErrorTypes.internal_server_error,
+            param=getattr(e, "param", "None"),
+            code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+        )
+
+
+def _can_user_query_key_info(
+    user_api_key_dict: UserAPIKeyAuth,
+    key: Optional[str],
+    key_info: LiteLLM_VerificationToken,
+) -> bool:
+    """
+    Helper to check if the user has access to the key's info
+    """
+    if (
+        user_api_key_dict.user_role == LitellmUserRoles.PROXY_ADMIN.value
+        or user_api_key_dict.user_role == LitellmUserRoles.PROXY_ADMIN_VIEW_ONLY.value
+    ):
+        return True
+    elif user_api_key_dict.api_key == key:
+        return True
+    # user can query their own key info
+    elif key_info.user_id == user_api_key_dict.user_id:
+        return True
+    return False
+
+
+async def test_key_logging(
+    user_api_key_dict: UserAPIKeyAuth,
+    request: Request,
+    key_logging: List[Dict[str, Any]],
+) -> LoggingCallbackStatus:
+    """
+    Test the key-based logging
+
+    - Test that key logging is correctly formatted and all args are passed correctly
+    - Make a mock completion call -> user can check if it's correctly logged
+    - Check if any logger.exceptions were triggered -> if they were then returns it to the user client side
+    """
+    import logging
+    from io import StringIO
+
+    from litellm.proxy.litellm_pre_call_utils import add_litellm_data_to_request
+    from litellm.proxy.proxy_server import general_settings, proxy_config
+
+    logging_callbacks: List[str] = []
+    for callback in key_logging:
+        if callback.get("callback_name") is not None:
+            logging_callbacks.append(callback["callback_name"])
+        else:
+            raise ValueError("callback_name is required in key_logging")
+
+    log_capture_string = StringIO()
+    ch = logging.StreamHandler(log_capture_string)
+    ch.setLevel(logging.ERROR)
+    logger = logging.getLogger()
+    logger.addHandler(ch)
+
+    try:
+        data = {
+            "model": "openai/litellm-key-health-test",
+            "messages": [
+                {
+                    "role": "user",
+                    "content": "Hello, this is a test from litellm /key/health. No LLM API call was made for this",
+                }
+            ],
+            "mock_response": "test response",
+        }
+        data = await add_litellm_data_to_request(
+            data=data,
+            user_api_key_dict=user_api_key_dict,
+            proxy_config=proxy_config,
+            general_settings=general_settings,
+            request=request,
+        )
+        await litellm.acompletion(
+            **data
+        )  # make mock completion call to trigger key based callbacks
+    except Exception as e:
+        return LoggingCallbackStatus(
+            callbacks=logging_callbacks,
+            status="unhealthy",
+            details=f"Logging test failed: {str(e)}",
+        )
+
+    await asyncio.sleep(
+        2
+    )  # wait for callbacks to run, callbacks use batching so wait for the flush event
+
+    # Check if any logger exceptions were triggered
+    log_contents = log_capture_string.getvalue()
+    logger.removeHandler(ch)
+    if log_contents:
+        return LoggingCallbackStatus(
+            callbacks=logging_callbacks,
+            status="unhealthy",
+            details=f"Logger exceptions triggered, system is unhealthy: {log_contents}",
+        )
+    else:
+        return LoggingCallbackStatus(
+            callbacks=logging_callbacks,
+            status="healthy",
+            details=f"No logger exceptions triggered, system is healthy. Manually check if logs were sent to {logging_callbacks} ",
+        )
+
+
+async def _enforce_unique_key_alias(
+    key_alias: Optional[str],
+    prisma_client: Any,
+    existing_key_token: Optional[str] = None,
+) -> None:
+    """
+    Helper to enforce unique key aliases across all keys.
+
+    Args:
+        key_alias (Optional[str]): The key alias to check
+        prisma_client (Any): Prisma client instance
+        existing_key_token (Optional[str]): ID of existing key being updated, to exclude from uniqueness check
+            (The Admin UI passes key_alias, in all Edit key requests. So we need to be sure that if we find a key with the same alias, it's not the same key we're updating)
+
+    Raises:
+        ProxyException: If key alias already exists on a different key
+    """
+    if key_alias is not None and prisma_client is not None:
+        where_clause: dict[str, Any] = {"key_alias": key_alias}
+        if existing_key_token:
+            # Exclude the current key from the uniqueness check
+            where_clause["NOT"] = {"token": existing_key_token}
+
+        existing_key = await prisma_client.db.litellm_verificationtoken.find_first(
+            where=where_clause
+        )
+        if existing_key is not None:
+            raise ProxyException(
+                message=f"Key with alias '{key_alias}' already exists. Unique key aliases across all keys are required.",
+                type=ProxyErrorTypes.bad_request_error,
+                param="key_alias",
+                code=status.HTTP_400_BAD_REQUEST,
+            )
+
+
+def validate_model_max_budget(model_max_budget: Optional[Dict]) -> None:
+    """
+    Validate the model_max_budget is GenericBudgetConfigType + enforce user has an enterprise license
+
+    Raises:
+        Exception: If model_max_budget is not a valid GenericBudgetConfigType
+    """
+    try:
+        if model_max_budget is None:
+            return
+        if len(model_max_budget) == 0:
+            return
+        if model_max_budget is not None:
+            from litellm.proxy.proxy_server import CommonProxyErrors, premium_user
+
+            if premium_user is not True:
+                raise ValueError(
+                    f"You must have an enterprise license to set model_max_budget. {CommonProxyErrors.not_premium_user.value}"
+                )
+            for _model, _budget_info in model_max_budget.items():
+                assert isinstance(_model, str)
+
+                # /CRUD endpoints can pass budget_limit as a string, so we need to convert it to a float
+                if "budget_limit" in _budget_info:
+                    _budget_info["budget_limit"] = float(_budget_info["budget_limit"])
+                BudgetConfig(**_budget_info)
+    except Exception as e:
+        raise ValueError(
+            f"Invalid model_max_budget: {str(e)}. Example of valid model_max_budget: https://docs.litellm.ai/docs/proxy/users"
+        )