aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/litellm/proxy/management_endpoints/key_management_endpoints.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/litellm/proxy/management_endpoints/key_management_endpoints.py')
-rw-r--r--.venv/lib/python3.12/site-packages/litellm/proxy/management_endpoints/key_management_endpoints.py2621
1 files changed, 2621 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/litellm/proxy/management_endpoints/key_management_endpoints.py b/.venv/lib/python3.12/site-packages/litellm/proxy/management_endpoints/key_management_endpoints.py
new file mode 100644
index 00000000..9141d9d1
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/proxy/management_endpoints/key_management_endpoints.py
@@ -0,0 +1,2621 @@
+"""
+KEY MANAGEMENT
+
+All /key management endpoints
+
+/key/generate
+/key/info
+/key/update
+/key/delete
+"""
+
+import asyncio
+import copy
+import json
+import secrets
+import traceback
+import uuid
+from datetime import datetime, timedelta, timezone
+from typing import List, Literal, Optional, Tuple, cast
+
+import fastapi
+from fastapi import APIRouter, Depends, Header, HTTPException, Query, Request, status
+
+import litellm
+from litellm._logging import verbose_proxy_logger
+from litellm.caching import DualCache
+from litellm.constants import UI_SESSION_TOKEN_TEAM_ID
+from litellm.litellm_core_utils.duration_parser import duration_in_seconds
+from litellm.proxy._types import *
+from litellm.proxy.auth.auth_checks import (
+ _cache_key_object,
+ _delete_cache_key_object,
+ get_key_object,
+ get_team_object,
+)
+from litellm.proxy.auth.user_api_key_auth import user_api_key_auth
+from litellm.proxy.hooks.key_management_event_hooks import KeyManagementEventHooks
+from litellm.proxy.management_endpoints.common_utils import (
+ _is_user_team_admin,
+ _set_object_metadata_field,
+)
+from litellm.proxy.management_endpoints.model_management_endpoints import (
+ _add_model_to_db,
+)
+from litellm.proxy.management_helpers.utils import management_endpoint_wrapper
+from litellm.proxy.spend_tracking.spend_tracking_utils import _is_master_key
+from litellm.proxy.utils import (
+ PrismaClient,
+ _hash_token_if_needed,
+ handle_exception_on_proxy,
+ jsonify_object,
+)
+from litellm.router import Router
+from litellm.secret_managers.main import get_secret
+from litellm.types.router import Deployment
+from litellm.types.utils import (
+ BudgetConfig,
+ PersonalUIKeyGenerationConfig,
+ TeamUIKeyGenerationConfig,
+)
+
+
def _is_team_key(data: Union[GenerateKeyRequest, LiteLLM_VerificationToken]):
    """A key request/token is a 'team key' iff it has a team_id attached."""
    has_team = data.team_id is not None
    return has_team
+
+
def _get_user_in_team(
    team_table: LiteLLM_TeamTableCachedObj, user_id: Optional[str]
) -> Optional[Member]:
    """Look up *user_id* in the team's membership list.

    Returns the matching Member, or None when user_id is None or the user is
    not a member of the team.
    """
    if user_id is None:
        return None
    # user_id is non-None here, so `m.user_id == user_id` is False for members
    # whose user_id is None — no separate None check needed.
    return next(
        (m for m in team_table.members_with_roles if m.user_id == user_id),
        None,
    )
+
+
def _is_allowed_to_make_key_request(
    user_api_key_dict: UserAPIKeyAuth, user_id: Optional[str], team_id: Optional[str]
) -> bool:
    """
    Assert user only creates keys for themselves

    Relevant issue: https://github.com/BerriAI/litellm/issues/7336

    Raises AssertionError (caught and turned into a 403 by the caller) when the
    requested user_id/team_id does not match the requester's own identity.
    """
    # Proxy admins may create keys for anyone.
    is_proxy_admin = (
        user_api_key_dict.user_role is not None
        and user_api_key_dict.user_role == LitellmUserRoles.PROXY_ADMIN.value
    )
    if is_proxy_admin:
        return True

    if user_id is not None:
        assert (
            user_id == user_api_key_dict.user_id
        ), "User can only create keys for themselves. Got user_id={}, Your ID={}".format(
            user_id, user_api_key_dict.user_id
        )

    if team_id is not None:
        # UI session tokens may create keys for any team —
        # handle https://github.com/BerriAI/litellm/issues/7482
        if (
            user_api_key_dict.team_id is not None
            and user_api_key_dict.team_id == UI_TEAM_ID
        ):
            return True
        assert (
            user_api_key_dict.team_id == team_id
        ), "User can only create keys for their own team. Got={}, Your Team ID={}".format(
            team_id, user_api_key_dict.team_id
        )

    return True
+
+
def _team_key_generation_team_member_check(
    assigned_user_id: Optional[str],
    team_table: LiteLLM_TeamTableCachedObj,
    user_api_key_dict: UserAPIKeyAuth,
    team_key_generation: TeamUIKeyGenerationConfig,
):
    """Validate membership and role requirements for a team-scoped key.

    Checks that (a) the user the key is assigned to belongs to the team, and
    (b) the requesting user is a proxy admin, or is a team member whose role is
    permitted by ``allowed_team_member_roles`` (when configured).

    Raises HTTPException(400) on any violation; returns True otherwise.
    """
    # (a) the assignee, when given, must be a member of the team
    if assigned_user_id is not None:
        assignee = _get_user_in_team(team_table=team_table, user_id=assigned_user_id)
        if assignee is None:
            raise HTTPException(
                status_code=400,
                detail=f"User={assigned_user_id} not assigned to team={team_table.team_id}",
            )

    # (b) proxy admins bypass the team-member role restrictions entirely
    if (
        user_api_key_dict.user_role is not None
        and user_api_key_dict.user_role == LitellmUserRoles.PROXY_ADMIN.value
    ):
        return True

    key_creating_user_in_team = _get_user_in_team(
        team_table=team_table, user_id=user_api_key_dict.user_id
    )
    if key_creating_user_in_team is None:
        raise HTTPException(
            status_code=400,
            detail=f"User={user_api_key_dict.user_id} not assigned to team={team_table.team_id}",
        )

    if (
        "allowed_team_member_roles" in team_key_generation
        and key_creating_user_in_team.role
        not in team_key_generation["allowed_team_member_roles"]
    ):
        raise HTTPException(
            status_code=400,
            detail=f"Team member role {key_creating_user_in_team.role} not in allowed_team_member_roles={team_key_generation['allowed_team_member_roles']}",
        )
    return True
+
+
def _key_generation_required_param_check(
    data: GenerateKeyRequest, required_params: Optional[List[str]]
):
    """Raise 400 unless every admin-required param was explicitly set on *data*.

    Only fields the caller actually set are considered (``exclude_unset``), so
    model defaults do not satisfy a requirement.
    """
    if required_params is None:
        return True

    provided = data.model_dump(exclude_unset=True)
    missing = [param for param in required_params if param not in provided]
    if missing:
        # report the first missing param, in required_params order
        raise HTTPException(
            status_code=400,
            detail=f"Required param {missing[0]} not in data",
        )
    return True
+
+
def _team_key_generation_check(
    team_table: LiteLLM_TeamTableCachedObj,
    user_api_key_dict: UserAPIKeyAuth,
    data: GenerateKeyRequest,
):
    """Run the admin-configured restrictions for a team-scoped key request.

    Proxy admins are never restricted. Otherwise the team-member/role check and
    the required-params check are applied, using the configured
    ``team_key_generation`` settings or a permissive default.
    """
    if user_api_key_dict.user_role == LitellmUserRoles.PROXY_ADMIN.value:
        return True

    settings = litellm.key_generation_settings
    if settings is not None and "team_key_generation" in settings:
        team_cfg = settings["team_key_generation"]
    else:
        # default: any team member (admin or user) may create team keys
        team_cfg = TeamUIKeyGenerationConfig(
            allowed_team_member_roles=["admin", "user"],
        )

    _team_key_generation_team_member_check(
        assigned_user_id=data.user_id,
        team_table=team_table,
        user_api_key_dict=user_api_key_dict,
        team_key_generation=team_cfg,
    )
    _key_generation_required_param_check(
        data,
        team_cfg.get("required_params"),
    )

    return True
+
+
def _personal_key_membership_check(
    user_api_key_dict: UserAPIKeyAuth,
    personal_key_generation: Optional[PersonalUIKeyGenerationConfig],
):
    """Enforce the admin-configured role restriction on personal key creation.

    Parameters:
    - user_api_key_dict: auth info of the user making the request
    - personal_key_generation: the ``personal_key_generation`` config block, or
      None when no restriction is configured

    Returns True when allowed; raises HTTPException(400) otherwise.
    """
    if (
        personal_key_generation is None
        or "allowed_user_roles" not in personal_key_generation
    ):
        return True  # no role restriction configured

    allowed_user_roles = personal_key_generation["allowed_user_roles"]
    if user_api_key_dict.user_role not in allowed_user_roles:
        # BUG FIX: previously this message re-read the roles from the global
        # litellm.key_generation_settings['personal_key_generation'], which
        # raises TypeError/KeyError when this function is called with a config
        # that did not come from the global settings. Use the parameter instead.
        raise HTTPException(
            status_code=400,
            detail=f"Personal key creation has been restricted by admin. Allowed roles={allowed_user_roles}. Your role={user_api_key_dict.user_role}",
        )

    return True
+
+
def _personal_key_generation_check(
    user_api_key_dict: UserAPIKeyAuth, data: GenerateKeyRequest
):
    """Run the admin-configured checks for a personal (non-team) key request."""
    settings = litellm.key_generation_settings
    if settings is None or settings.get("personal_key_generation") is None:
        return True  # no personal-key restrictions configured

    personal_cfg = settings["personal_key_generation"]  # type: ignore

    # role restriction, then required-params restriction
    _personal_key_membership_check(
        user_api_key_dict,
        personal_key_generation=personal_cfg,
    )
    _key_generation_required_param_check(
        data,
        personal_cfg.get("required_params"),
    )

    return True
+
+
def key_generation_check(
    team_table: Optional[LiteLLM_TeamTableCachedObj],
    user_api_key_dict: UserAPIKeyAuth,
    data: GenerateKeyRequest,
) -> bool:
    """
    Check if admin has restricted key creation to certain roles for teams or individuals
    """
    # personal (non-team) keys take the personal-key path
    if not _is_team_key(data=data):
        return _personal_key_generation_check(
            user_api_key_dict=user_api_key_dict, data=data
        )

    if team_table is None:
        if litellm.key_generation_settings is not None:
            # restrictions are configured, but we couldn't load the team
            raise HTTPException(
                status_code=400,
                detail=f"Unable to find team object in database. Team ID: {data.team_id}",
            )
        return True  # assume user is assigning team_id without using the team table

    return _team_key_generation_check(
        team_table=team_table,
        user_api_key_dict=user_api_key_dict,
        data=data,
    )
+
+
def common_key_access_checks(
    user_api_key_dict: UserAPIKeyAuth,
    data: Union[GenerateKeyRequest, UpdateKeyRequest],
    llm_router: Optional[Router],
    premium_user: bool,
) -> Literal[True]:
    """
    Check if user is allowed to make a key request, for this key
    """
    try:
        _is_allowed_to_make_key_request(
            user_api_key_dict=user_api_key_dict,
            user_id=data.user_id,
            team_id=data.team_id,
        )
    except AssertionError as e:
        # permission failures surface as 403s
        raise HTTPException(
            status_code=403,
            detail=str(e),
        )
    except Exception as e:
        # anything else is an internal error
        raise HTTPException(
            status_code=500,
            detail=str(e),
        )

    _check_model_access_group(
        models=data.models,
        llm_router=llm_router,
        premium_user=premium_user,
    )
    return True
+
+
+router = APIRouter()
+
+
@router.post(
    "/key/generate",
    tags=["key management"],
    dependencies=[Depends(user_api_key_auth)],
    response_model=GenerateKeyResponse,
)
@management_endpoint_wrapper
async def generate_key_fn(  # noqa: PLR0915
    data: GenerateKeyRequest,
    user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
    litellm_changed_by: Optional[str] = Header(
        None,
        description="The litellm-changed-by header enables tracking of actions performed by authorized users on behalf of other users, providing an audit trail for accountability",
    ),
):
    """
    Generate an API key based on the provided data.

    Docs: https://docs.litellm.ai/docs/proxy/virtual_keys

    Parameters:
    - duration: Optional[str] - Specify the length of time the token is valid for. You can set duration as seconds ("30s"), minutes ("30m"), hours ("30h"), days ("30d").
    - key_alias: Optional[str] - User defined key alias
    - key: Optional[str] - User defined key value. If not set, a 16-digit unique sk-key is created for you.
    - team_id: Optional[str] - The team id of the key
    - user_id: Optional[str] - The user id of the key
    - budget_id: Optional[str] - The budget id associated with the key. Created by calling `/budget/new`.
    - models: Optional[list] - Model_name's a user is allowed to call. (if empty, key is allowed to call all models)
    - aliases: Optional[dict] - Any alias mappings, on top of anything in the config.yaml model list. - https://docs.litellm.ai/docs/proxy/virtual_keys#managing-auth---upgradedowngrade-models
    - config: Optional[dict] - any key-specific configs, overrides config in config.yaml
    - spend: Optional[int] - Amount spent by key. Default is 0. Will be updated by proxy whenever key is used. https://docs.litellm.ai/docs/proxy/virtual_keys#managing-auth---tracking-spend
    - send_invite_email: Optional[bool] - Whether to send an invite email to the user_id, with the generate key
    - max_budget: Optional[float] - Specify max budget for a given key.
    - budget_duration: Optional[str] - Budget is reset at the end of specified duration. If not set, budget is never reset. You can set duration as seconds ("30s"), minutes ("30m"), hours ("30h"), days ("30d").
    - max_parallel_requests: Optional[int] - Rate limit a user based on the number of parallel requests. Raises 429 error, if user's parallel requests > x.
    - metadata: Optional[dict] - Metadata for key, store information for key. Example metadata = {"team": "core-infra", "app": "app2", "email": "ishaan@berri.ai" }
    - guardrails: Optional[List[str]] - List of active guardrails for the key
    - permissions: Optional[dict] - key-specific permissions. Currently just used for turning off pii masking (if connected). Example - {"pii": false}
    - model_max_budget: Optional[Dict[str, BudgetConfig]] - Model-specific budgets {"gpt-4": {"budget_limit": 0.0005, "time_period": "30d"}}}. IF null or {} then no model specific budget.
    - model_rpm_limit: Optional[dict] - key-specific model rpm limit. Example - {"text-davinci-002": 1000, "gpt-3.5-turbo": 1000}. IF null or {} then no model specific rpm limit.
    - model_tpm_limit: Optional[dict] - key-specific model tpm limit. Example - {"text-davinci-002": 1000, "gpt-3.5-turbo": 1000}. IF null or {} then no model specific tpm limit.
    - allowed_cache_controls: Optional[list] - List of allowed cache control values. Example - ["no-cache", "no-store"]. See all values - https://docs.litellm.ai/docs/proxy/caching#turn-on--off-caching-per-request
    - blocked: Optional[bool] - Whether the key is blocked.
    - rpm_limit: Optional[int] - Specify rpm limit for a given key (Requests per minute)
    - tpm_limit: Optional[int] - Specify tpm limit for a given key (Tokens per minute)
    - soft_budget: Optional[float] - Specify soft budget for a given key. Will trigger a slack alert when this soft budget is reached.
    - tags: Optional[List[str]] - Tags for [tracking spend](https://litellm.vercel.app/docs/proxy/enterprise#tracking-spend-for-custom-tags) and/or doing [tag-based routing](https://litellm.vercel.app/docs/proxy/tag_routing).
    - enforced_params: Optional[List[str]] - List of enforced params for the key (Enterprise only). [Docs](https://docs.litellm.ai/docs/proxy/enterprise#enforce-required-params-for-llm-requests)

    Examples:

    1. Allow users to turn on/off pii masking

    ```bash
    curl --location 'http://0.0.0.0:4000/key/generate' \
    --header 'Authorization: Bearer sk-1234' \
    --header 'Content-Type: application/json' \
    --data '{
        "permissions": {"allow_pii_controls": true}
    }'
    ```

    Returns:
    - key: (str) The generated api key
    - expires: (datetime) Datetime object for when key expires.
    - user_id: (str) Unique user id - used for tracking spend across multiple keys for same user id.
    """
    try:
        # module-level proxy state is imported lazily to avoid circular imports
        from litellm.proxy.proxy_server import (
            litellm_proxy_admin_name,
            llm_router,
            premium_user,
            prisma_client,
            user_api_key_cache,
            user_custom_key_generate,
        )

        verbose_proxy_logger.debug("entered /key/generate")

        # 1. optional user-supplied hook can veto key generation
        if user_custom_key_generate is not None:
            if asyncio.iscoroutinefunction(user_custom_key_generate):
                result = await user_custom_key_generate(data)  # type: ignore
            else:
                raise ValueError("user_custom_key_generate must be a coroutine")
            decision = result.get("decision", True)
            message = result.get("message", "Authentication Failed - Custom Auth Rule")
            if not decision:
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN, detail=message
                )
        # 2. resolve the team object (DB-only lookup); failure is tolerated and
        #    handled downstream by key_generation_check
        team_table: Optional[LiteLLM_TeamTableCachedObj] = None
        if data.team_id is not None:
            try:
                team_table = await get_team_object(
                    team_id=data.team_id,
                    prisma_client=prisma_client,
                    user_api_key_cache=user_api_key_cache,
                    parent_otel_span=user_api_key_dict.parent_otel_span,
                    check_db_only=True,
                )
            except Exception as e:
                verbose_proxy_logger.debug(
                    f"Error getting team object in `/key/generate`: {e}"
                )
                team_table = None

        # 3. admin-configured restrictions (team/personal key generation rules)
        key_generation_check(
            team_table=team_table,
            user_api_key_dict=user_api_key_dict,
            data=data,
        )

        # 4. requester may only create keys for themselves / their own team
        common_key_access_checks(
            user_api_key_dict=user_api_key_dict,
            data=data,
            llm_router=llm_router,
            premium_user=premium_user,
        )

        # check if user set default key/generate params on config.yaml
        # (fill in unset fields from litellm.default_key_generate_params)
        if litellm.default_key_generate_params is not None:
            for elem in data:
                key, value = elem
                if value is None and key in [
                    "max_budget",
                    "user_id",
                    "team_id",
                    "max_parallel_requests",
                    "tpm_limit",
                    "rpm_limit",
                    "budget_duration",
                ]:
                    setattr(
                        data, key, litellm.default_key_generate_params.get(key, None)
                    )
                elif key == "models" and value == []:
                    setattr(data, key, litellm.default_key_generate_params.get(key, []))
                elif key == "metadata" and value == {}:
                    setattr(data, key, litellm.default_key_generate_params.get(key, {}))

        # check if user set default key/generate params on config.yaml
        # (reject or clamp values above litellm.upperbound_key_generate_params)
        if litellm.upperbound_key_generate_params is not None:
            for elem in data:
                key, value = elem
                upperbound_value = getattr(
                    litellm.upperbound_key_generate_params, key, None
                )
                if upperbound_value is not None:
                    if value is None:
                        # Use the upperbound value if user didn't provide a value
                        setattr(data, key, upperbound_value)
                    else:
                        # Compare with upperbound for numeric fields
                        if key in [
                            "max_budget",
                            "max_parallel_requests",
                            "tpm_limit",
                            "rpm_limit",
                        ]:
                            if value > upperbound_value:
                                raise HTTPException(
                                    status_code=400,
                                    detail={
                                        "error": f"{key} is over max limit set in config - user_value={value}; max_value={upperbound_value}"
                                    },
                                )
                        # Compare durations
                        elif key in ["budget_duration", "duration"]:
                            upperbound_duration = duration_in_seconds(
                                duration=upperbound_value
                            )
                            user_duration = duration_in_seconds(duration=value)
                            if user_duration > upperbound_duration:
                                raise HTTPException(
                                    status_code=400,
                                    detail={
                                        "error": f"{key} is over max limit set in config - user_value={value}; max_value={upperbound_value}"
                                    },
                                )

        # TODO: @ishaan-jaff: Migrate all budget tracking to use LiteLLM_BudgetTable
        # 5. soft budgets get their own LiteLLM_BudgetTable row
        _budget_id = data.budget_id
        if prisma_client is not None and data.soft_budget is not None:
            # create the Budget Row for the LiteLLM Verification Token
            budget_row = LiteLLM_BudgetTable(
                soft_budget=data.soft_budget,
                model_max_budget=data.model_max_budget or {},
            )
            new_budget = prisma_client.jsonify_object(
                budget_row.json(exclude_none=True)
            )

            _budget = await prisma_client.db.litellm_budgettable.create(
                data={
                    **new_budget,  # type: ignore
                    "created_by": user_api_key_dict.user_id or litellm_proxy_admin_name,
                    "updated_by": user_api_key_dict.user_id or litellm_proxy_admin_name,
                }
            )
            _budget_id = getattr(_budget, "budget_id", None)

        # ADD METADATA FIELDS
        # Set Management Endpoint Metadata Fields
        for field in LiteLLM_ManagementEndpoint_MetadataFields_Premium:
            if getattr(data, field) is not None:
                _set_object_metadata_field(
                    object_data=data,
                    field_name=field,
                    value=getattr(data, field),
                )

        data_json = data.model_dump(exclude_unset=True, exclude_none=True)  # type: ignore

        # if we get max_budget passed to /key/generate, then use it as key_max_budget. Since generate_key_helper_fn is used to make new users
        if "max_budget" in data_json:
            data_json["key_max_budget"] = data_json.pop("max_budget", None)
        if _budget_id is not None:
            data_json["budget_id"] = _budget_id

        if "budget_duration" in data_json:
            data_json["key_budget_duration"] = data_json.pop("budget_duration", None)

        # record who created/updated the key for audit purposes
        if user_api_key_dict.user_id is not None:
            data_json["created_by"] = user_api_key_dict.user_id
            data_json["updated_by"] = user_api_key_dict.user_id

        # Set tags on the new key
        # (tags are a premium feature and are stored inside metadata)
        if "tags" in data_json:
            from litellm.proxy.proxy_server import premium_user

            if premium_user is not True and data_json["tags"] is not None:
                raise ValueError(
                    f"Only premium users can add tags to keys. {CommonProxyErrors.not_premium_user.value}"
                )

            _metadata = data_json.get("metadata")
            if not _metadata:
                data_json["metadata"] = {"tags": data_json["tags"]}
            else:
                data_json["metadata"]["tags"] = data_json["tags"]

            data_json.pop("tags")

        # aliases must be unique across keys
        await _enforce_unique_key_alias(
            key_alias=data_json.get("key_alias", None),
            prisma_client=prisma_client,
        )

        # 6. create the key row (generate_key_helper_fn is shared with user creation)
        response = await generate_key_helper_fn(
            request_type="key", **data_json, table_name="key"
        )

        response["soft_budget"] = (
            data.soft_budget
        )  # include the user-input soft budget in the response

        response = GenerateKeyResponse(**response)

        # 7. fire-and-forget post-generation hooks (audit log, email, etc.)
        asyncio.create_task(
            KeyManagementEventHooks.async_key_generated_hook(
                data=data,
                response=response,
                user_api_key_dict=user_api_key_dict,
                litellm_changed_by=litellm_changed_by,
            )
        )

        return response
    except Exception as e:
        verbose_proxy_logger.exception(
            "litellm.proxy.proxy_server.generate_key_fn(): Exception occured - {}".format(
                str(e)
            )
        )
        raise handle_exception_on_proxy(e)
+
+
def prepare_metadata_fields(
    data: BaseModel, non_default_values: dict, existing_metadata: dict
) -> dict:
    """
    Check LiteLLM_ManagementEndpoint_MetadataFields (proxy/_types.py) for fields that are allowed to be updated
    """
    # Start from the existing metadata unless the caller explicitly provided a
    # metadata value (this also allows the user to set metadata to None).
    if "metadata" not in non_default_values:
        non_default_values["metadata"] = existing_metadata.copy()

    metadata = cast(dict, non_default_values["metadata"])
    incoming = data.model_dump(exclude_unset=True, exclude_none=True)

    try:
        for field_name, field_value in incoming.items():
            if field_name not in LiteLLM_ManagementEndpoint_MetadataFields:
                continue
            # datetimes are not JSON-serializable; store them as ISO strings
            if isinstance(field_value, datetime):
                metadata[field_name] = field_value.isoformat()
            else:
                metadata[field_name] = field_value

    except Exception as e:
        verbose_proxy_logger.exception(
            "litellm.proxy.proxy_server.prepare_metadata_fields(): Exception occured - {}".format(
                str(e)
            )
        )

    non_default_values["metadata"] = metadata
    return non_default_values
+
+
def prepare_key_update_data(
    data: Union[UpdateKeyRequest, RegenerateKeyRequest], existing_key_row
):
    """Translate an update/regenerate request into the dict of DB column updates.

    Drops the key itself, converts duration strings into concrete timestamps,
    validates model budgets, and merges metadata-only fields via
    prepare_metadata_fields.
    """
    data_json: dict = data.model_dump(exclude_unset=True)
    data_json.pop("key", None)

    # metadata-only fields are merged separately by prepare_metadata_fields
    non_default_values = {
        k: v
        for k, v in data_json.items()
        if k not in LiteLLM_ManagementEndpoint_MetadataFields
    }

    # key expiry: "30d" etc. -> absolute UTC timestamp
    if "duration" in non_default_values:
        duration = non_default_values.pop("duration")
        if duration and isinstance(duration, str) and len(duration) > 0:
            non_default_values["expires"] = datetime.now(timezone.utc) + timedelta(
                seconds=duration_in_seconds(duration=duration)
            )

    # budget reset period: keep the duration string and compute next reset time
    if "budget_duration" in non_default_values:
        budget_duration = non_default_values.pop("budget_duration")
        if (
            budget_duration
            and isinstance(budget_duration, str)
            and len(budget_duration) > 0
        ):
            non_default_values["budget_reset_at"] = datetime.now(
                timezone.utc
            ) + timedelta(seconds=duration_in_seconds(duration=budget_duration))
            non_default_values["budget_duration"] = budget_duration

    existing_metadata = existing_key_row.metadata or {}

    # validate model_max_budget
    if "model_max_budget" in non_default_values:
        validate_model_max_budget(non_default_values["model_max_budget"])

    return prepare_metadata_fields(
        data=data,
        non_default_values=non_default_values,
        existing_metadata=existing_metadata,
    )
+
+
@router.post(
    "/key/update", tags=["key management"], dependencies=[Depends(user_api_key_auth)]
)
@management_endpoint_wrapper
async def update_key_fn(
    request: Request,
    data: UpdateKeyRequest,
    user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
    litellm_changed_by: Optional[str] = Header(
        None,
        description="The litellm-changed-by header enables tracking of actions performed by authorized users on behalf of other users, providing an audit trail for accountability",
    ),
):
    """
    Update an existing API key's parameters.

    Parameters:
    - key: str - The key to update
    - key_alias: Optional[str] - User-friendly key alias
    - user_id: Optional[str] - User ID associated with key
    - team_id: Optional[str] - Team ID associated with key
    - budget_id: Optional[str] - The budget id associated with the key. Created by calling `/budget/new`.
    - models: Optional[list] - Model_name's a user is allowed to call
    - tags: Optional[List[str]] - Tags for organizing keys (Enterprise only)
    - enforced_params: Optional[List[str]] - List of enforced params for the key (Enterprise only). [Docs](https://docs.litellm.ai/docs/proxy/enterprise#enforce-required-params-for-llm-requests)
    - spend: Optional[float] - Amount spent by key
    - max_budget: Optional[float] - Max budget for key
    - model_max_budget: Optional[Dict[str, BudgetConfig]] - Model-specific budgets {"gpt-4": {"budget_limit": 0.0005, "time_period": "30d"}}
    - budget_duration: Optional[str] - Budget reset period ("30d", "1h", etc.)
    - soft_budget: Optional[float] - [TODO] Soft budget limit (warning vs. hard stop). Will trigger a slack alert when this soft budget is reached.
    - max_parallel_requests: Optional[int] - Rate limit for parallel requests
    - metadata: Optional[dict] - Metadata for key. Example {"team": "core-infra", "app": "app2"}
    - tpm_limit: Optional[int] - Tokens per minute limit
    - rpm_limit: Optional[int] - Requests per minute limit
    - model_rpm_limit: Optional[dict] - Model-specific RPM limits {"gpt-4": 100, "claude-v1": 200}
    - model_tpm_limit: Optional[dict] - Model-specific TPM limits {"gpt-4": 100000, "claude-v1": 200000}
    - allowed_cache_controls: Optional[list] - List of allowed cache control values
    - duration: Optional[str] - Key validity duration ("30d", "1h", etc.)
    - permissions: Optional[dict] - Key-specific permissions
    - send_invite_email: Optional[bool] - Send invite email to user_id
    - guardrails: Optional[List[str]] - List of active guardrails for the key
    - blocked: Optional[bool] - Whether the key is blocked
    - aliases: Optional[dict] - Model aliases for the key - [Docs](https://litellm.vercel.app/docs/proxy/virtual_keys#model-aliases)
    - config: Optional[dict] - [DEPRECATED PARAM] Key-specific config.
    - temp_budget_increase: Optional[float] - Temporary budget increase for the key (Enterprise only).
    - temp_budget_expiry: Optional[str] - Expiry time for the temporary budget increase (Enterprise only).

    Example:
    ```bash
    curl --location 'http://0.0.0.0:4000/key/update' \
    --header 'Authorization: Bearer sk-1234' \
    --header 'Content-Type: application/json' \
    --data '{
        "key": "sk-1234",
        "key_alias": "my-key",
        "user_id": "user-1234",
        "team_id": "team-1234",
        "max_budget": 100,
        "metadata": {"any_key": "any-val"},
    }'
    ```
    """
    from litellm.proxy.proxy_server import (
        llm_router,
        premium_user,
        prisma_client,
        proxy_logging_obj,
        user_api_key_cache,
    )

    try:
        data_json: dict = data.model_dump(exclude_unset=True, exclude_none=True)
        key = data_json.pop("key")

        # get the row from db
        if prisma_client is None:
            raise Exception("Not connected to DB!")

        # requester may only update keys for themselves / their own team
        common_key_access_checks(
            user_api_key_dict=user_api_key_dict,
            data=data,
            llm_router=llm_router,
            premium_user=premium_user,
        )

        existing_key_row = await prisma_client.get_data(
            token=data.key, table_name="key", query_type="find_unique"
        )

        if existing_key_row is None:
            # BUG FIX: this 404 previously said "Team not found, passed
            # team_id=..." (copy/paste from the team endpoints) even though the
            # failed lookup was for the key itself.
            raise HTTPException(
                status_code=404,
                detail={"error": f"Key not found, passed key={data.key}"},
            )

        # build the dict of DB column updates (durations -> timestamps, etc.)
        non_default_values = prepare_key_update_data(
            data=data, existing_key_row=existing_key_row
        )

        # aliases must stay unique across keys (excluding this key's own row)
        await _enforce_unique_key_alias(
            key_alias=non_default_values.get("key_alias", None),
            prisma_client=prisma_client,
            existing_key_token=existing_key_row.token,
        )

        _data = {**non_default_values, "token": key}
        response = await prisma_client.update_data(token=key, data=_data)

        # Delete - key from cache, since it's been updated!
        # key updated - a new model could have been added to this key. it should not block requests after this is done
        await _delete_cache_key_object(
            hashed_token=hash_token(key),
            user_api_key_cache=user_api_key_cache,
            proxy_logging_obj=proxy_logging_obj,
        )

        # fire-and-forget post-update hooks (audit log, etc.)
        asyncio.create_task(
            KeyManagementEventHooks.async_key_updated_hook(
                data=data,
                existing_key_row=existing_key_row,
                response=response,
                user_api_key_dict=user_api_key_dict,
                litellm_changed_by=litellm_changed_by,
            )
        )

        if response is None:
            raise ValueError("Failed to update key got response = None")

        return {"key": key, **response["data"]}
    except Exception as e:
        verbose_proxy_logger.exception(
            "litellm.proxy.proxy_server.update_key_fn(): Exception occured - {}".format(
                str(e)
            )
        )
        # normalize every failure into a ProxyException for the client
        if isinstance(e, HTTPException):
            raise ProxyException(
                message=getattr(e, "detail", f"Authentication Error({str(e)})"),
                type=ProxyErrorTypes.auth_error,
                param=getattr(e, "param", "None"),
                code=getattr(e, "status_code", status.HTTP_400_BAD_REQUEST),
            )
        elif isinstance(e, ProxyException):
            raise e
        raise ProxyException(
            message="Authentication Error, " + str(e),
            type=ProxyErrorTypes.auth_error,
            param=getattr(e, "param", "None"),
            code=status.HTTP_400_BAD_REQUEST,
        )
+
+
@router.post(
    "/key/delete", tags=["key management"], dependencies=[Depends(user_api_key_auth)]
)
@management_endpoint_wrapper
async def delete_key_fn(
    data: KeyRequest,
    user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
    litellm_changed_by: Optional[str] = Header(
        None,
        description="The litellm-changed-by header enables tracking of actions performed by authorized users on behalf of other users, providing an audit trail for accountability",
    ),
):
    """
    Delete a key from the key management system.

    Parameters::
    - keys (List[str]): A list of keys or hashed keys to delete. Example {"keys": ["sk-QWrxEynunsNpV1zT48HIrw", "837e17519f44683334df5291321d97b8bf1098cd490e49e215f6fea935aa28be"]}
    - key_aliases (List[str]): A list of key aliases to delete. Can be passed instead of `keys`.Example {"key_aliases": ["alias1", "alias2"]}

    Returns:
    - deleted_keys (List[str]): A list of deleted keys. Example {"deleted_keys": ["sk-QWrxEynunsNpV1zT48HIrw", "837e17519f44683334df5291321d97b8bf1098cd490e49e215f6fea935aa28be"]}

    Example:
    ```bash
    curl --location 'http://0.0.0.0:4000/key/delete' \
    --header 'Authorization: Bearer sk-1234' \
    --header 'Content-Type: application/json' \
    --data '{
        "keys": ["sk-QWrxEynunsNpV1zT48HIrw"]
    }'
    ```

    Raises:
        HTTPException: If an error occurs during key deletion.
    """
    try:
        from litellm.proxy.proxy_server import prisma_client, user_api_key_cache

        if prisma_client is None:
            raise Exception("Not connected to DB!")

        ## only allow user to delete keys they own
        verbose_proxy_logger.debug(
            f"user_api_key_dict.user_role: {user_api_key_dict.user_role}"
        )

        # delete either by token (data.keys) or by alias (data.key_aliases);
        # exactly one of the two must be provided
        num_keys_to_be_deleted = 0
        deleted_keys = []
        if data.keys:
            number_deleted_keys, _keys_being_deleted = await delete_verification_tokens(
                tokens=data.keys,
                user_api_key_cache=user_api_key_cache,
                user_api_key_dict=user_api_key_dict,
            )
            num_keys_to_be_deleted = len(data.keys)
            deleted_keys = data.keys
        elif data.key_aliases:
            number_deleted_keys, _keys_being_deleted = await delete_key_aliases(
                key_aliases=data.key_aliases,
                prisma_client=prisma_client,
                user_api_key_cache=user_api_key_cache,
                user_api_key_dict=user_api_key_dict,
            )
            num_keys_to_be_deleted = len(data.key_aliases)
            deleted_keys = data.key_aliases
        else:
            raise ValueError("Invalid request type")

        if number_deleted_keys is None:
            raise ProxyException(
                message="Failed to delete keys got None response from delete_verification_token",
                type=ProxyErrorTypes.internal_server_error,
                param="keys",
                code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            )
        verbose_proxy_logger.debug(f"/key/delete - deleted_keys={number_deleted_keys}")

        # NOTE(review): num_keys_to_be_deleted and deleted_keys are both
        # derived from the same request list, so this assert is always true.
        # Per the error message, the intent was presumably to compare against
        # number_deleted_keys (the count actually deleted) — confirm the type
        # of number_deleted_keys and fix upstream.
        try:
            assert num_keys_to_be_deleted == len(deleted_keys)
        except Exception:
            raise HTTPException(
                status_code=400,
                detail={
                    "error": f"Not all keys passed in were deleted. This probably means you don't have access to delete all the keys passed in. Keys passed in={num_keys_to_be_deleted}, Deleted keys ={number_deleted_keys}"
                },
            )

        verbose_proxy_logger.debug(
            f"/keys/delete - cache after delete: {user_api_key_cache.in_memory_cache.cache_dict}"
        )

        # fire-and-forget post-delete hooks (audit log, etc.)
        asyncio.create_task(
            KeyManagementEventHooks.async_key_deleted_hook(
                data=data,
                keys_being_deleted=_keys_being_deleted,
                user_api_key_dict=user_api_key_dict,
                litellm_changed_by=litellm_changed_by,
                response=number_deleted_keys,
            )
        )

        return {"deleted_keys": deleted_keys}
    except Exception as e:
        verbose_proxy_logger.exception(
            "litellm.proxy.proxy_server.delete_key_fn(): Exception occured - {}".format(
                str(e)
            )
        )
        raise handle_exception_on_proxy(e)
+
+
@router.post(
    "/v2/key/info",
    tags=["key management"],
    dependencies=[Depends(user_api_key_auth)],
    include_in_schema=False,
)
async def info_key_fn_v2(
    data: Optional[KeyRequest] = None,
    user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
):
    """
    Retrieve information about a list of keys.

    **New endpoint**. Currently admin only.

    Parameters:
        data: Optional[KeyRequest] - body containing `keys`, the list of keys to look up
        user_api_key_dict: UserAPIKeyAuth - dependency representing the caller's API key

    Returns:
        Dict with the requested keys and a list of their DB records.

    Example Curl:
    ```
    curl -X GET "http://0.0.0.0:4000/key/info" \
    -H "Authorization: Bearer sk-1234" \
    -d {"keys": ["sk-1", "sk-2", "sk-3"]}
    ```
    """
    from litellm.proxy.proxy_server import prisma_client

    try:
        if prisma_client is None:
            raise Exception(
                "Database not connected. Connect a database to your proxy - https://docs.litellm.ai/docs/simple_proxy#managing-auth---virtual-keys"
            )
        if data is None:
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail={"message": "Malformed request. No keys passed in."},
            )

        key_info = await prisma_client.get_data(
            token=data.keys, table_name="key", query_type="find_all"
        )
        if key_info is None:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail={"message": "No keys found"},
            )

        def _serialize(record):
            # pydantic v2 first, fall back to pydantic v1
            try:
                return record.model_dump()  # noqa
            except Exception:
                return record.dict()

        serialized_keys = [_serialize(record) for record in key_info]
        return {"key": data.keys, "info": serialized_keys}

    except Exception as e:
        raise handle_exception_on_proxy(e)
+
+
@router.get(
    "/key/info", tags=["key management"], dependencies=[Depends(user_api_key_auth)]
)
async def info_key_fn(
    key: Optional[str] = fastapi.Query(
        default=None, description="Key in the request parameters"
    ),
    user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
):
    """
    Retrieve information about a key.

    Parameters:
        key: Optional[str] - query parameter; if omitted, the key from the
            Authorization header is used instead.
        user_api_key_dict: UserAPIKeyAuth - dependency representing the caller's API key

    Returns:
        Dict with the key and its DB record (hashed token removed).

    Example Curl:
    ```
    curl -X GET "http://0.0.0.0:4000/key/info?key=sk-02Wr4IAlN3NvPXvL5JVvDA" \
    -H "Authorization: Bearer sk-1234"
    ```

    Example Curl - if no key is passed, it will use the Key Passed in Authorization Header
    ```
    curl -X GET "http://0.0.0.0:4000/key/info" \
    -H "Authorization: Bearer sk-02Wr4IAlN3NvPXvL5JVvDA"
    ```
    """
    from litellm.proxy.proxy_server import prisma_client

    try:
        if prisma_client is None:
            raise Exception(
                "Database not connected. Connect a database to your proxy - https://docs.litellm.ai/docs/simple_proxy#managing-auth---virtual-keys"
            )

        # Fall back to the key used for authentication when none was passed.
        key = key or user_api_key_dict.api_key
        hashed_key: Optional[str] = key
        if key is not None:
            hashed_key = _hash_token_if_needed(token=key)

        key_info = await prisma_client.db.litellm_verificationtoken.find_unique(
            where={"token": hashed_key},  # type: ignore
            include={"litellm_budget_table": True},
        )
        if key_info is None:
            raise ProxyException(
                message="Key not found in database",
                type=ProxyErrorTypes.not_found_error,
                param="key",
                code=status.HTTP_404_NOT_FOUND,
            )

        caller_may_view = _can_user_query_key_info(
            user_api_key_dict=user_api_key_dict,
            key=key,
            key_info=key_info,
        )
        if caller_may_view is not True:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="You are not allowed to access this key's info. Your role={}".format(
                    user_api_key_dict.user_role
                ),
            )

        # Serialize and strip the hashed token before returning.
        try:
            info_dict = key_info.model_dump()  # noqa
        except Exception:
            # pydantic v1 fallback
            info_dict = key_info.dict()
        info_dict.pop("token")
        return {"key": key, "info": info_dict}
    except Exception as e:
        raise handle_exception_on_proxy(e)
+
+
def _check_model_access_group(
    models: Optional[List[str]], llm_router: Optional[Router], premium_user: bool
) -> Literal[True]:
    """
    Enforce that wildcard-route model access groups are only used by premium users.

    Returns True when the check passes; raises HTTPException(403) when a
    non-premium user sets a model access group on a wildcard model.
    """
    # Nothing to validate without a model list or a router to consult.
    if models is None or llm_router is None:
        return True

    for model_access_group in models:
        is_wildcard_group = llm_router._is_model_access_group_for_wildcard_route(
            model_access_group=model_access_group
        )
        if is_wildcard_group and not premium_user:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail={
                    "error": "Setting a model access group on a wildcard model is only available for LiteLLM Enterprise users.{}".format(
                        CommonProxyErrors.not_premium_user.value
                    )
                },
            )

    return True
+
+
async def generate_key_helper_fn(  # noqa: PLR0915
    request_type: Literal[
        "user", "key"
    ],  # identifies if this request is from /user/new or /key/generate
    duration: Optional[str] = None,
    models: list = [],
    aliases: dict = {},
    config: dict = {},
    spend: float = 0.0,
    key_max_budget: Optional[float] = None,  # key_max_budget is used to Budget Per key
    key_budget_duration: Optional[str] = None,
    budget_id: Optional[float] = None,  # budget id <-> LiteLLM_BudgetTable
    soft_budget: Optional[
        float
    ] = None,  # soft_budget is used to set soft Budgets Per user
    max_budget: Optional[float] = None,  # max_budget is used to Budget Per user
    blocked: Optional[bool] = None,
    budget_duration: Optional[str] = None,  # max_budget is used to Budget Per user
    token: Optional[str] = None,
    key: Optional[
        str
    ] = None,  # dev-friendly alt param for 'token'. Exposed on `/key/generate` for setting key value yourself.
    user_id: Optional[str] = None,
    user_alias: Optional[str] = None,
    team_id: Optional[str] = None,
    user_email: Optional[str] = None,
    user_role: Optional[str] = None,
    max_parallel_requests: Optional[int] = None,
    metadata: Optional[dict] = {},
    tpm_limit: Optional[int] = None,
    rpm_limit: Optional[int] = None,
    query_type: Literal["insert_data", "update_data"] = "insert_data",
    update_key_values: Optional[dict] = None,
    key_alias: Optional[str] = None,
    allowed_cache_controls: Optional[list] = [],
    permissions: Optional[dict] = {},
    model_max_budget: Optional[dict] = {},
    model_rpm_limit: Optional[dict] = None,
    model_tpm_limit: Optional[dict] = None,
    guardrails: Optional[list] = None,
    teams: Optional[list] = None,
    organization_id: Optional[str] = None,
    table_name: Optional[Literal["key", "user"]] = None,
    send_invite_email: Optional[bool] = None,
    created_by: Optional[str] = None,
    updated_by: Optional[str] = None,
):
    """
    Shared helper that creates/updates a user row and (usually) a virtual-key row.

    Used by both `/user/new` (request_type="user") and `/key/generate`
    (request_type="key"). The flow is:
      1. Resolve the token value (explicit `token`/`key`, else a fresh `sk-...`).
      2. Compute expiry / budget-reset timestamps from duration strings.
      3. Fold model_rpm_limit / model_tpm_limit / guardrails into metadata.
      4. Insert-or-update the user row (skipped when table_name == "key").
      5. Insert the key row, unless only a user row is needed.

    Returns:
        dict: the key row data (augmented with user fields for request_type="user"),
        or just the user data when no key row is created.

    Raises:
        HTTPException: 500 on any DB failure (original exception is re-raised
        unchanged if it was already an HTTPException).

    NOTE(review): `soft_budget` and `send_invite_email` are accepted but never
    referenced in this body — presumably consumed by callers; confirm.
    NOTE(review): several defaults are mutable ([]/{}), a known Python pitfall;
    the body never mutates them in place (`metadata or {}` allocates a new dict),
    but confirm before relying on that.
    """
    from litellm.proxy.proxy_server import (
        litellm_proxy_budget_name,
        premium_user,
        prisma_client,
    )

    if prisma_client is None:
        raise Exception(
            "Connect Proxy to database to generate keys - https://docs.litellm.ai/docs/proxy/virtual_keys "
        )

    # `key` is a dev-friendly alias for `token`; generate one if neither given.
    if token is None:
        if key is not None:
            token = key
        else:
            token = f"sk-{secrets.token_urlsafe(16)}"

    if duration is None:  # allow tokens that never expire
        expires = None
    else:
        duration_s = duration_in_seconds(duration=duration)
        expires = datetime.now(timezone.utc) + timedelta(seconds=duration_s)

    if key_budget_duration is None:  # one-time budget
        key_reset_at = None
    else:
        duration_s = duration_in_seconds(duration=key_budget_duration)
        key_reset_at = datetime.now(timezone.utc) + timedelta(seconds=duration_s)

    if budget_duration is None:  # one-time budget
        reset_at = None
    else:
        duration_s = duration_in_seconds(duration=budget_duration)
        reset_at = datetime.now(timezone.utc) + timedelta(seconds=duration_s)

    # JSON-encode dict params for storage in the DB row.
    aliases_json = json.dumps(aliases)
    config_json = json.dumps(config)
    permissions_json = json.dumps(permissions)

    # Add model_rpm_limit and model_tpm_limit to metadata
    if model_rpm_limit is not None:
        metadata = metadata or {}
        metadata["model_rpm_limit"] = model_rpm_limit
    if model_tpm_limit is not None:
        metadata = metadata or {}
        metadata["model_tpm_limit"] = model_tpm_limit
    if guardrails is not None:
        metadata = metadata or {}
        metadata["guardrails"] = guardrails

    metadata_json = json.dumps(metadata)
    validate_model_max_budget(model_max_budget)
    model_max_budget_json = json.dumps(model_max_budget)
    # NOTE(review): the next four lines are no-op self-assignments; left as-is.
    user_role = user_role
    tpm_limit = tpm_limit
    rpm_limit = rpm_limit
    allowed_cache_controls = allowed_cache_controls

    try:
        # Create a new verification token (you may want to enhance this logic based on your needs)
        user_data = {
            "max_budget": max_budget,
            "user_email": user_email,
            "user_id": user_id,
            "user_alias": user_alias,
            "team_id": team_id,
            "organization_id": organization_id,
            "user_role": user_role,
            "spend": spend,
            "models": models,
            "metadata": metadata_json,
            "max_parallel_requests": max_parallel_requests,
            "tpm_limit": tpm_limit,
            "rpm_limit": rpm_limit,
            "budget_duration": budget_duration,
            "budget_reset_at": reset_at,
            "allowed_cache_controls": allowed_cache_controls,
        }
        if teams is not None:
            user_data["teams"] = teams
        key_data = {
            "token": token,
            "key_alias": key_alias,
            "expires": expires,
            "models": models,
            "aliases": aliases_json,
            "config": config_json,
            "spend": spend,
            "max_budget": key_max_budget,
            "user_id": user_id,
            "team_id": team_id,
            "max_parallel_requests": max_parallel_requests,
            "metadata": metadata_json,
            "tpm_limit": tpm_limit,
            "rpm_limit": rpm_limit,
            "budget_duration": key_budget_duration,
            "budget_reset_at": key_reset_at,
            "allowed_cache_controls": allowed_cache_controls,
            "permissions": permissions_json,
            "model_max_budget": model_max_budget_json,
            "budget_id": budget_id,
            "blocked": blocked,
            "created_by": created_by,
            "updated_by": updated_by,
        }

        if (
            get_secret("DISABLE_KEY_NAME", False) is True
        ):  # allow user to disable storing abbreviated key name (shown in UI, to help figure out which key spent how much)
            pass
        else:
            key_data["key_name"] = f"sk-...{token[-4:]}"
        # NOTE(review): `saved_token` is normalized below but never returned or
        # stored — its only observable effect here is the premium-gating check
        # on the "get_spend_routes" permission; confirm intent.
        saved_token = copy.deepcopy(key_data)
        if isinstance(saved_token["aliases"], str):
            saved_token["aliases"] = json.loads(saved_token["aliases"])
        if isinstance(saved_token["config"], str):
            saved_token["config"] = json.loads(saved_token["config"])
        if isinstance(saved_token["metadata"], str):
            saved_token["metadata"] = json.loads(saved_token["metadata"])
        if isinstance(saved_token["permissions"], str):
            if (
                "get_spend_routes" in saved_token["permissions"]
                and premium_user is not True
            ):
                raise ValueError(
                    "get_spend_routes permission is only available for LiteLLM Enterprise users"
                )

            saved_token["permissions"] = json.loads(saved_token["permissions"])
        if isinstance(saved_token["model_max_budget"], str):
            saved_token["model_max_budget"] = json.loads(
                saved_token["model_max_budget"]
            )

        if saved_token.get("expires", None) is not None and isinstance(
            saved_token["expires"], datetime
        ):
            saved_token["expires"] = saved_token["expires"].isoformat()
        if prisma_client is not None:
            if (
                table_name is None or table_name == "user"
            ):  # do not auto-create users for `/key/generate`
                ## CREATE USER (If necessary)
                if query_type == "insert_data":
                    user_row = await prisma_client.insert_data(
                        data=user_data, table_name="user"
                    )

                    if user_row is None:
                        raise Exception("Failed to create user")
                    ## use default user model list if no key-specific model list provided
                    if len(user_row.models) > 0 and len(key_data["models"]) == 0:  # type: ignore
                        key_data["models"] = user_row.models  # type: ignore
                elif query_type == "update_data":
                    user_row = await prisma_client.update_data(
                        data=user_data,
                        table_name="user",
                        update_key_values=update_key_values,
                    )
            if user_id == litellm_proxy_budget_name or (
                table_name is not None and table_name == "user"
            ):
                # do not create a key for litellm_proxy_budget_name or if table name is set to just 'user'
                # we only need to ensure this exists in the user table
                # the LiteLLM_VerificationToken table will increase in size if we don't do this check
                return user_data

            ## CREATE KEY
            verbose_proxy_logger.debug("prisma_client: Creating Key= %s", key_data)
            create_key_response = await prisma_client.insert_data(
                data=key_data, table_name="key"
            )
            key_data["token_id"] = getattr(create_key_response, "token", None)
            key_data["litellm_budget_table"] = getattr(
                create_key_response, "litellm_budget_table", None
            )
    except Exception as e:
        verbose_proxy_logger.error(
            "litellm.proxy.proxy_server.generate_key_helper_fn(): Exception occured - {}".format(
                str(e)
            )
        )
        verbose_proxy_logger.debug(traceback.format_exc())
        if isinstance(e, HTTPException):
            raise e
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail={"error": "Internal Server Error."},
        )

    # Add budget related info in key_data - this ensures it's returned
    key_data["budget_id"] = budget_id

    if request_type == "user":
        # if this is a /user/new request update the key_date with user_data fields
        key_data.update(user_data)
    return key_data
+
+
async def _team_key_deletion_check(
    user_api_key_dict: UserAPIKeyAuth,
    key_info: LiteLLM_VerificationToken,
    prisma_client: PrismaClient,
    user_api_key_cache: DualCache,
):
    """
    Decide whether the requester may delete a team-scoped key.

    Returns the result of the team-member permission check for team keys,
    False for non-team keys, and raises HTTPException(404) when the key's
    team cannot be found in the DB.
    """
    is_team_key = _is_team_key(data=key_info)

    # Non-team keys (or team keys without a team_id) are not deletable here.
    if not (is_team_key and key_info.team_id is not None):
        return False

    team_table = await get_team_object(
        team_id=key_info.team_id,
        prisma_client=prisma_client,
        user_api_key_cache=user_api_key_cache,
        check_db_only=True,
    )

    # Prefer the operator-configured team key generation settings; otherwise
    # fall back to the default allowed roles.
    key_gen_settings = litellm.key_generation_settings
    if key_gen_settings is not None and "team_key_generation" in key_gen_settings:
        team_key_generation = key_gen_settings["team_key_generation"]
    else:
        team_key_generation = TeamUIKeyGenerationConfig(
            allowed_team_member_roles=["admin", "user"],
        )

    if team_table is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail={
                "error": f"Team not found in db, and user not proxy admin. Team id = {key_info.team_id}"
            },
        )

    # check if user is team admin
    return _team_key_generation_team_member_check(
        assigned_user_id=user_api_key_dict.user_id,
        team_table=team_table,
        user_api_key_dict=user_api_key_dict,
        team_key_generation=team_key_generation,
    )
+
+
async def can_delete_verification_token(
    key_info: LiteLLM_VerificationToken,
    user_api_key_cache: DualCache,
    user_api_key_dict: UserAPIKeyAuth,
    prisma_client: PrismaClient,
) -> bool:
    """
    Return True if the requester may delete this key.

    Allowed when any of the following holds:
    - requester is a proxy admin
    - key is a team key and the requester passes the team deletion check
    - key is the requester's own personal key
    """
    is_team_key = _is_team_key(data=key_info)

    # Proxy admins can delete any key.
    if user_api_key_dict.user_role == LitellmUserRoles.PROXY_ADMIN.value:
        return True

    # Team keys defer to the team-level check.
    if is_team_key and key_info.team_id is not None:
        return await _team_key_deletion_check(
            user_api_key_dict=user_api_key_dict,
            key_info=key_info,
            prisma_client=prisma_client,
            user_api_key_cache=user_api_key_cache,
        )

    # Personal keys: only the owner may delete.
    return key_info.user_id is not None and key_info.user_id == user_api_key_dict.user_id
+
+
async def delete_verification_tokens(
    tokens: List,
    user_api_key_cache: DualCache,
    user_api_key_dict: UserAPIKeyAuth,
) -> Tuple[Optional[Dict], List[LiteLLM_VerificationToken]]:
    """
    Helper that deletes the list of tokens from the database

    - check if user is proxy admin
    - check if user is team admin and key is a team key

    Args:
        tokens: List of tokens to delete
        user_id: Optional user_id to filter by

    Returns:
        Tuple[Optional[Dict], List[LiteLLM_VerificationToken]]:
            Optional[Dict]:
                - Number of deleted tokens
            List[LiteLLM_VerificationToken]:
                - List of keys being deleted, this contains information about the key_alias, token, and user_id being deleted,
                this is passed down to the KeyManagementEventHooks to delete the keys from the secret manager and handle audit logs
    """
    from litellm.proxy.proxy_server import prisma_client

    try:
        if prisma_client:
            # Normalize raw `sk-...` keys to their hashed form before querying.
            tokens = [_hash_token_if_needed(token=key) for key in tokens]
            _keys_being_deleted: List[LiteLLM_VerificationToken] = (
                await prisma_client.db.litellm_verificationtoken.find_many(
                    where={"token": {"in": tokens}}
                )
            )

            # Assuming 'db' is your Prisma Client instance
            # check if admin making request - don't filter by user-id
            if user_api_key_dict.user_role == LitellmUserRoles.PROXY_ADMIN.value:
                # NOTE(review): here `deleted_tokens` is whatever
                # prisma_client.delete_data returns (presumably a count/dict),
                # while in the branch below it is a list of token strings —
                # callers len() both shapes; confirm both are intended.
                deleted_tokens = await prisma_client.delete_data(tokens=tokens)
            # else
            else:
                # Non-admins: authorize and delete each key individually,
                # fanned out concurrently via asyncio.gather.
                tasks = []
                deleted_tokens = []
                for key in _keys_being_deleted:

                    async def _delete_key(key: LiteLLM_VerificationToken):
                        # Appends to the shared `deleted_tokens` list on success;
                        # any unauthorized key aborts the whole gather with a 403.
                        if await can_delete_verification_token(
                            key_info=key,
                            user_api_key_cache=user_api_key_cache,
                            user_api_key_dict=user_api_key_dict,
                            prisma_client=prisma_client,
                        ):
                            await prisma_client.delete_data(tokens=[key.token])
                            deleted_tokens.append(key.token)
                        else:
                            raise HTTPException(
                                status_code=status.HTTP_403_FORBIDDEN,
                                detail={
                                    "error": "You are not authorized to delete this key"
                                },
                            )

                    tasks.append(_delete_key(key))
                await asyncio.gather(*tasks)

                # Sanity check: every requested token must have been deleted
                # (tokens absent from the DB also land here as "failed").
                _num_deleted_tokens = len(deleted_tokens)
                if _num_deleted_tokens != len(tokens):
                    failed_tokens = [
                        token for token in tokens if token not in deleted_tokens
                    ]
                    raise Exception(
                        "Failed to delete all tokens. Failed to delete tokens: "
                        + str(failed_tokens)
                    )
        else:
            raise Exception("DB not connected. prisma_client is None")
    except Exception as e:
        verbose_proxy_logger.exception(
            "litellm.proxy.proxy_server.delete_verification_tokens(): Exception occured - {}".format(
                str(e)
            )
        )
        verbose_proxy_logger.debug(traceback.format_exc())
        raise e

    # Evict both the as-passed token and its hash from the in-memory cache.
    for key in tokens:
        user_api_key_cache.delete_cache(key)
        # remove hash token from cache
        hashed_token = hash_token(cast(str, key))
        user_api_key_cache.delete_cache(hashed_token)

    return {"deleted_keys": deleted_tokens}, _keys_being_deleted
+
+
async def delete_key_aliases(
    key_aliases: List[str],
    user_api_key_cache: DualCache,
    prisma_client: PrismaClient,
    user_api_key_dict: UserAPIKeyAuth,
) -> Tuple[Optional[Dict], List[LiteLLM_VerificationToken]]:
    """
    Resolve key aliases to their tokens, then delete them via
    delete_verification_tokens (which enforces the caller's permissions).
    """
    matching_rows = await prisma_client.db.litellm_verificationtoken.find_many(
        where={"key_alias": {"in": key_aliases}}
    )
    resolved_tokens = [row.token for row in matching_rows]
    return await delete_verification_tokens(
        tokens=resolved_tokens,
        user_api_key_cache=user_api_key_cache,
        user_api_key_dict=user_api_key_dict,
    )
+
+
async def _rotate_master_key(
    prisma_client: PrismaClient,
    user_api_key_dict: UserAPIKeyAuth,
    current_master_key: str,
    new_master_key: str,
) -> None:
    """
    Rotate the master key

    1. Get the values from the DB
        - Get models from DB
        - Get config from DB
    2. Decrypt the values
        - ModelTable
        - [{"model_name": "str", "litellm_params": {}}]
        - ConfigTable
    3. Encrypt the values with the new master key
    4. Update the values in the DB

    NOTE(review): `current_master_key` is not referenced in this body —
    decryption presumably uses the globally-configured key; confirm.
    """
    from litellm.proxy.proxy_server import proxy_config

    # 1. fetch the encrypted model table (treat any DB error as "no models")
    try:
        models: Optional[List] = (
            await prisma_client.db.litellm_proxymodeltable.find_many()
        )
    except Exception:
        models = None
    # 2. process model table
    if models:
        # Decrypt with the current key, then re-encrypt each deployment with
        # the new master key (should_create_model_in_db=False: rows are
        # recreated in bulk below instead).
        decrypted_models = proxy_config.decrypt_model_list_from_db(new_models=models)
        verbose_proxy_logger.info(
            "ABLE TO DECRYPT MODELS - len(decrypted_models): %s", len(decrypted_models)
        )
        new_models = []
        for model in decrypted_models:
            new_model = await _add_model_to_db(
                model_params=Deployment(**model),
                user_api_key_dict=user_api_key_dict,
                prisma_client=prisma_client,
                new_encryption_key=new_master_key,
                should_create_model_in_db=False,
            )
            if new_model:
                new_models.append(jsonify_object(new_model.model_dump()))
        # NOTE(review): delete_many + create_many is not atomic — a failure
        # between the two calls would lose the model table; confirm acceptable.
        verbose_proxy_logger.info("Resetting proxy model table")
        await prisma_client.db.litellm_proxymodeltable.delete_many()
        verbose_proxy_logger.info("Creating %s models", len(new_models))
        await prisma_client.db.litellm_proxymodeltable.create_many(
            data=new_models,
        )
    # 3. process config table
    try:
        config = await prisma_client.db.litellm_config.find_many()
    except Exception:
        config = None

    if config:
        """If environment_variables is found, decrypt it and encrypt it with the new master key"""
        environment_variables_dict = {}
        for c in config:
            if c.param_name == "environment_variables":
                environment_variables_dict = c.param_value

        if environment_variables_dict:
            # Decrypt stored env vars with the current key, re-encrypt with the
            # new one, and write them back in place.
            decrypted_env_vars = proxy_config._decrypt_and_set_db_env_variables(
                environment_variables=environment_variables_dict
            )
            encrypted_env_vars = proxy_config._encrypt_env_variables(
                environment_variables=decrypted_env_vars,
                new_encryption_key=new_master_key,
            )

            if encrypted_env_vars:
                await prisma_client.db.litellm_config.update(
                    where={"param_name": "environment_variables"},
                    data={"param_value": jsonify_object(encrypted_env_vars)},
                )
+
+
@router.post(
    "/key/{key:path}/regenerate",
    tags=["key management"],
    dependencies=[Depends(user_api_key_auth)],
)
@router.post(
    "/key/regenerate",
    tags=["key management"],
    dependencies=[Depends(user_api_key_auth)],
)
@management_endpoint_wrapper
async def regenerate_key_fn(
    key: Optional[str] = None,
    data: Optional[RegenerateKeyRequest] = None,
    user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
    litellm_changed_by: Optional[str] = Header(
        None,
        description="The litellm-changed-by header enables tracking of actions performed by authorized users on behalf of other users, providing an audit trail for accountability",
    ),
) -> Optional[GenerateKeyResponse]:
    """
    Regenerate an existing API key while optionally updating its parameters.

    Parameters:
    - key: str (path parameter) - The key to regenerate
    - data: Optional[RegenerateKeyRequest] - Request body containing optional parameters to update
        - key_alias: Optional[str] - User-friendly key alias
        - user_id: Optional[str] - User ID associated with key
        - team_id: Optional[str] - Team ID associated with key
        - models: Optional[list] - Model_name's a user is allowed to call
        - tags: Optional[List[str]] - Tags for organizing keys (Enterprise only)
        - spend: Optional[float] - Amount spent by key
        - max_budget: Optional[float] - Max budget for key
        - model_max_budget: Optional[Dict[str, BudgetConfig]] - Model-specific budgets {"gpt-4": {"budget_limit": 0.0005, "time_period": "30d"}}
        - budget_duration: Optional[str] - Budget reset period ("30d", "1h", etc.)
        - soft_budget: Optional[float] - Soft budget limit (warning vs. hard stop). Will trigger a slack alert when this soft budget is reached.
        - max_parallel_requests: Optional[int] - Rate limit for parallel requests
        - metadata: Optional[dict] - Metadata for key. Example {"team": "core-infra", "app": "app2"}
        - tpm_limit: Optional[int] - Tokens per minute limit
        - rpm_limit: Optional[int] - Requests per minute limit
        - model_rpm_limit: Optional[dict] - Model-specific RPM limits {"gpt-4": 100, "claude-v1": 200}
        - model_tpm_limit: Optional[dict] - Model-specific TPM limits {"gpt-4": 100000, "claude-v1": 200000}
        - allowed_cache_controls: Optional[list] - List of allowed cache control values
        - duration: Optional[str] - Key validity duration ("30d", "1h", etc.)
        - permissions: Optional[dict] - Key-specific permissions
        - guardrails: Optional[List[str]] - List of active guardrails for the key
        - blocked: Optional[bool] - Whether the key is blocked


    Returns:
    - GenerateKeyResponse containing the new key and its updated parameters

    Example:
    ```bash
    curl --location --request POST 'http://localhost:4000/key/sk-1234/regenerate' \
    --header 'Authorization: Bearer sk-1234' \
    --header 'Content-Type: application/json' \
    --data-raw '{
        "max_budget": 100,
        "metadata": {"team": "core-infra"},
        "models": ["gpt-4", "gpt-3.5-turbo"]
    }'
    ```

    Note: This is an Enterprise feature. It requires a premium license to use.
    """
    try:
        from litellm.proxy.proxy_server import (
            hash_token,
            master_key,
            premium_user,
            prisma_client,
            proxy_logging_obj,
            user_api_key_cache,
        )

        if premium_user is not True:
            raise ValueError(
                f"Regenerating Virtual Keys is an Enterprise feature, {CommonProxyErrors.not_premium_user.value}"
            )

        # Check if key exists, raise exception if key is not in the DB
        # (body `data.key` takes precedence over the path parameter)
        key = data.key if data and data.key else key
        if not key:
            raise HTTPException(status_code=400, detail={"error": "No key passed in."})
        ### 1. Create New copy that is duplicate of existing key
        ######################################################################

        # create duplicate of existing key
        # set token = new token generated
        # insert new token in DB

        # create hash of token
        if prisma_client is None:
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail={"error": "DB not connected. prisma_client is None"},
            )

        # Master-key rotation is a special path: re-encrypt DB secrets and
        # return the new master key without touching the verification tokens.
        _is_master_key_valid = _is_master_key(api_key=key, _master_key=master_key)

        if master_key is not None and data and _is_master_key_valid:
            if data.new_master_key is None:
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail={"error": "New master key is required."},
                )
            await _rotate_master_key(
                prisma_client=prisma_client,
                user_api_key_dict=user_api_key_dict,
                current_master_key=master_key,
                new_master_key=data.new_master_key,
            )
            return GenerateKeyResponse(
                key=data.new_master_key,
                token=data.new_master_key,
                key_name=data.new_master_key,
                expires=None,
            )

        # `key` may be the raw `sk-...` value or an already-hashed token.
        if "sk" not in key:
            hashed_api_key = key
        else:
            hashed_api_key = hash_token(key)

        _key_in_db = await prisma_client.db.litellm_verificationtoken.find_unique(
            where={"token": hashed_api_key},
        )
        if _key_in_db is None:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail={"error": f"Key {key} not found."},
            )

        verbose_proxy_logger.debug("key_in_db: %s", _key_in_db)

        ### 2. Generate the replacement token and update the DB row in place
        ######################################################################
        new_token = f"sk-{secrets.token_urlsafe(16)}"
        new_token_hash = hash_token(new_token)
        new_token_key_name = f"sk-...{new_token[-4:]}"

        # Prepare the update data
        update_data = {
            "token": new_token_hash,
            "key_name": new_token_key_name,
        }

        non_default_values = {}
        if data is not None:
            # Update with any provided parameters from GenerateKeyRequest
            non_default_values = prepare_key_update_data(
                data=data, existing_key_row=_key_in_db
            )
            verbose_proxy_logger.debug("non_default_values: %s", non_default_values)

        update_data.update(non_default_values)
        update_data = prisma_client.jsonify_object(data=update_data)
        # Update the token in the database
        updated_token = await prisma_client.db.litellm_verificationtoken.update(
            where={"token": hashed_api_key},
            data=update_data,  # type: ignore
        )

        updated_token_dict = {}
        if updated_token is not None:
            updated_token_dict = dict(updated_token)

        # Expose the raw new key to the caller; the DB only stores its hash.
        updated_token_dict["key"] = new_token
        updated_token_dict["token_id"] = updated_token_dict.pop("token")

        ### 3. remove existing key entry from cache
        ######################################################################
        if key:
            # Evict the entry keyed by the hash of the value the caller passed
            # (covers the raw `sk-...` case).
            await _delete_cache_key_object(
                hashed_token=hash_token(key),
                user_api_key_cache=user_api_key_cache,
                proxy_logging_obj=proxy_logging_obj,
            )

        if hashed_api_key:
            # BUG FIX: this previously called hash_token(key) again, producing a
            # hash-of-a-hash when the caller passed an already-hashed token, so
            # the live cache entry was never evicted. Use the resolved hashed
            # token directly.
            await _delete_cache_key_object(
                hashed_token=hashed_api_key,
                user_api_key_cache=user_api_key_cache,
                proxy_logging_obj=proxy_logging_obj,
            )

        response = GenerateKeyResponse(
            **updated_token_dict,
        )

        # Fire-and-forget: audit log / secret-manager sync for the rotation.
        asyncio.create_task(
            KeyManagementEventHooks.async_key_rotated_hook(
                data=data,
                existing_key_row=_key_in_db,
                response=response,
                user_api_key_dict=user_api_key_dict,
                litellm_changed_by=litellm_changed_by,
            )
        )

        return response
    except Exception as e:
        verbose_proxy_logger.exception("Error regenerating key: %s", e)
        raise handle_exception_on_proxy(e)
+
+
async def validate_key_list_check(
    user_api_key_dict: UserAPIKeyAuth,
    user_id: Optional[str],
    team_id: Optional[str],
    organization_id: Optional[str],
    key_alias: Optional[str],
    prisma_client: PrismaClient,
) -> Optional[LiteLLM_UserTable]:
    """
    Validate that the caller may list keys with the given filters.

    Proxy admins are unrestricted (returns None). Other users must have a
    user_id, must exist in the DB, and may only filter by their own user_id,
    their own teams, and organizations they are members of.

    Returns the caller's LiteLLM_UserTable row, or None for proxy admins.
    Raises ProxyException(403) on any authorization failure.
    """
    # Proxy admins may list anything; no per-user scoping needed.
    if user_api_key_dict.user_role == LitellmUserRoles.PROXY_ADMIN.value:
        return None

    def _forbidden(message: str, param: str) -> ProxyException:
        # All authorization failures in this check share the same shape.
        return ProxyException(
            message=message,
            type=ProxyErrorTypes.bad_request_error,
            param=param,
            code=status.HTTP_403_FORBIDDEN,
        )

    if user_api_key_dict.user_id is None:
        raise _forbidden(
            "You are not authorized to access this endpoint. No 'user_id' is associated with your API key.",
            "user_id",
        )

    caller_db_row: Optional[BaseModel] = (
        await prisma_client.db.litellm_usertable.find_unique(
            where={"user_id": user_api_key_dict.user_id},
            include={"organization_memberships": True},
        )
    )
    if caller_db_row is None:
        raise _forbidden(
            "You are not authorized to access this endpoint. No 'user_id' is associated with your API key.",
            "user_id",
        )

    complete_user_info = LiteLLM_UserTable(**caller_db_row.model_dump())

    # internal user can only see their own keys
    if user_id and complete_user_info.user_id != user_id:
        raise _forbidden(
            "You are not authorized to check another user's keys", "user_id"
        )

    if team_id and team_id not in complete_user_info.teams:
        raise _forbidden("You are not authorized to check this team's keys", "team_id")

    if organization_id:
        member_org_ids = [
            membership.organization_id
            for membership in (complete_user_info.organization_memberships or [])
        ]
        if organization_id not in member_org_ids:
            raise _forbidden(
                "You are not authorized to check this organization's keys",
                "organization_id",
            )

    return complete_user_info
+
+
async def get_admin_team_ids(
    complete_user_info: Optional[LiteLLM_UserTable],
    user_api_key_dict: UserAPIKeyAuth,
    prisma_client: PrismaClient,
) -> List[str]:
    """
    Get all team IDs where the user is an admin.
    """
    if complete_user_info is None:
        return []

    # Fetch the team rows for every team the user belongs to.
    team_rows: Optional[List[BaseModel]] = (
        await prisma_client.db.litellm_teamtable.find_many(
            where={"team_id": {"in": complete_user_info.teams}}
        )
    )
    if team_rows is None:
        return []

    # Keep only the teams where the user holds the admin role.
    admin_team_ids: List[str] = []
    for row in team_rows:
        team_obj = LiteLLM_TeamTable(**row.model_dump())
        if _is_user_team_admin(user_api_key_dict=user_api_key_dict, team_obj=team_obj):
            admin_team_ids.append(team_obj.team_id)
    return admin_team_ids
+
+
@router.get(
    "/key/list",
    tags=["key management"],
    dependencies=[Depends(user_api_key_auth)],
)
@management_endpoint_wrapper
async def list_keys(
    request: Request,
    user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
    page: int = Query(1, description="Page number", ge=1),
    size: int = Query(10, description="Page size", ge=1, le=100),
    user_id: Optional[str] = Query(None, description="Filter keys by user ID"),
    team_id: Optional[str] = Query(None, description="Filter keys by team ID"),
    organization_id: Optional[str] = Query(
        None, description="Filter keys by organization ID"
    ),
    key_alias: Optional[str] = Query(None, description="Filter keys by key alias"),
    return_full_object: bool = Query(False, description="Return full key object"),
    include_team_keys: bool = Query(
        False, description="Include all keys for teams that user is an admin of."
    ),
) -> KeyListResponseObject:
    """
    List all keys for a given user / team / organization.

    Authorization is delegated to `validate_key_list_check`, which raises a
    403 ProxyException when the caller may not view the requested scope.

    Returns:
        {
            "keys": List[str] or List[UserAPIKeyAuth],
            "total_count": int,
            "current_page": int,
            "total_pages": int,
        }
    """
    try:
        from litellm.proxy.proxy_server import prisma_client

        verbose_proxy_logger.debug("Entering list_keys function")

        if prisma_client is None:
            verbose_proxy_logger.error("Database not connected")
            raise Exception("Database not connected")

        # Raises on unauthorized access; returns the caller's full user record
        # (or None) for use in the team-admin lookup below.
        complete_user_info = await validate_key_list_check(
            user_api_key_dict=user_api_key_dict,
            user_id=user_id,
            team_id=team_id,
            organization_id=organization_id,
            key_alias=key_alias,
            prisma_client=prisma_client,
        )

        if include_team_keys:
            admin_team_ids = await get_admin_team_ids(
                complete_user_info=complete_user_info,
                user_api_key_dict=user_api_key_dict,
                prisma_client=prisma_client,
            )
        else:
            admin_team_ids = None

        # Non-admin callers with no explicit user filter only see their own keys.
        if user_id is None and user_api_key_dict.user_role not in [
            LitellmUserRoles.PROXY_ADMIN.value,
            LitellmUserRoles.PROXY_ADMIN_VIEW_ONLY.value,
        ]:
            user_id = user_api_key_dict.user_id

        response = await _list_key_helper(
            prisma_client=prisma_client,
            page=page,
            size=size,
            user_id=user_id,
            team_id=team_id,
            key_alias=key_alias,
            return_full_object=return_full_object,
            organization_id=organization_id,
            admin_team_ids=admin_team_ids,
        )

        verbose_proxy_logger.debug("Successfully prepared response")

        return response

    except Exception as e:
        verbose_proxy_logger.exception(f"Error in list_keys: {e}")
        # Map HTTPException -> ProxyException (preserving its status code),
        # re-raise ProxyException unchanged, and wrap anything else as a 500.
        if isinstance(e, HTTPException):
            raise ProxyException(
                message=getattr(e, "detail", f"error({str(e)})"),
                type=ProxyErrorTypes.internal_server_error,
                param=getattr(e, "param", "None"),
                code=getattr(e, "status_code", status.HTTP_500_INTERNAL_SERVER_ERROR),
            )
        elif isinstance(e, ProxyException):
            raise e
        raise ProxyException(
            message="Authentication Error, " + str(e),
            type=ProxyErrorTypes.internal_server_error,
            param=getattr(e, "param", "None"),
            code=status.HTTP_500_INTERNAL_SERVER_ERROR,
        )
+
+
async def _list_key_helper(
    prisma_client: PrismaClient,
    page: int,
    size: int,
    user_id: Optional[str],
    team_id: Optional[str],
    organization_id: Optional[str],
    key_alias: Optional[str],
    exclude_team_id: Optional[str] = None,
    return_full_object: bool = False,
    admin_team_ids: Optional[
        List[str]
    ] = None,  # teams where the calling user is an admin
) -> KeyListResponseObject:
    """
    Helper function to list keys
    Args:
        page: int
        size: int
        user_id: Optional[str]
        team_id: Optional[str]
        key_alias: Optional[str]
        exclude_team_id: Optional[str] # exclude a specific team_id
        return_full_object: bool # when true, will return UserAPIKeyAuth objects instead of just the token
        admin_team_ids: Optional[List[str]] # list of team IDs where the user is an admin

    Returns:
        KeyListResponseObject
        {
            "keys": List[str] or List[UserAPIKeyAuth], # Updated to reflect possible return types
            "total_count": int,
            "current_page": int,
            "total_pages": int,
        }
    """

    # Condition that hides UI session tokens from the listing.
    ui_session_filter = _get_condition_to_filter_out_ui_session_tokens()

    # Build the OR conditions for user's keys and admin team keys
    or_conditions: List[Dict[str, Any]] = []

    # Base conditions for user's own keys
    user_condition: Dict[str, Any] = {}
    if user_id and isinstance(user_id, str):
        user_condition["user_id"] = user_id
    if team_id and isinstance(team_id, str):
        user_condition["team_id"] = team_id
    if key_alias and isinstance(key_alias, str):
        user_condition["key_alias"] = key_alias
    if exclude_team_id and isinstance(exclude_team_id, str):
        # NOTE(review): this intentionally overrides any explicit `team_id`
        # filter set above — callers are expected to pass either team_id or
        # exclude_team_id, not both.
        user_condition["team_id"] = {"not": exclude_team_id}
    if organization_id and isinstance(organization_id, str):
        user_condition["organization_id"] = organization_id

    if user_condition:
        or_conditions.append(user_condition)

    # Add condition for admin team keys if provided
    if admin_team_ids:
        or_conditions.append({"team_id": {"in": admin_team_ids}})

    # Combine filters. BUG FIX: the previous code did
    # `where["OR"] = or_conditions`, which clobbered the UI-session-token
    # exclusion (itself stored under an "OR" key) whenever more than one OR
    # condition existed. Combine the two clauses under an explicit AND instead.
    where: Dict[str, Union[str, Dict[str, Any], List[Dict[str, Any]]]] = {}
    if len(or_conditions) > 1:
        where["AND"] = [ui_session_filter, {"OR": or_conditions}]
    elif len(or_conditions) == 1:
        where.update(ui_session_filter)
        where.update(or_conditions[0])
    else:
        where.update(ui_session_filter)

    verbose_proxy_logger.debug(f"Filter conditions: {where}")

    # Calculate skip for pagination
    skip = (page - 1) * size

    verbose_proxy_logger.debug(f"Pagination: skip={skip}, take={size}")

    # Fetch keys with pagination
    keys = await prisma_client.db.litellm_verificationtoken.find_many(
        where=where,  # type: ignore
        skip=skip,  # type: ignore
        take=size,  # type: ignore
        order=[
            {"created_at": "desc"},
            {"token": "desc"},  # fallback sort so pagination is deterministic
        ],
    )

    verbose_proxy_logger.debug(f"Fetched {len(keys)} keys")

    # Get total count of keys matching the same filter
    total_count = await prisma_client.db.litellm_verificationtoken.count(
        where=where  # type: ignore
    )

    verbose_proxy_logger.debug(f"Total count of keys: {total_count}")

    # Calculate total pages
    total_pages = -(-total_count // size)  # Ceiling division

    # Prepare response
    key_list: List[Union[str, UserAPIKeyAuth]] = []
    for key in keys:
        if return_full_object is True:
            key_list.append(UserAPIKeyAuth(**key.dict()))  # Return full key object
        else:
            _token = key.dict().get("token")
            key_list.append(_token)  # Return only the token
    return KeyListResponseObject(
        keys=key_list,
        total_count=total_count,
        current_page=page,
        total_pages=total_pages,
    )
+
+
def _get_condition_to_filter_out_ui_session_tokens() -> Dict[str, Any]:
    """
    Build the Prisma `where` fragment that excludes UI session tokens
    (keys belonging to the internal UI session team).
    """
    # Match keys with no team at all, OR keys on any team other than the
    # UI session team. The explicit null branch is presumably needed because
    # a "not" comparison does not match NULL rows — TODO confirm against the
    # Prisma client's null semantics.
    keys_without_team = {"team_id": None}
    keys_on_other_teams = {"team_id": {"not": UI_SESSION_TOKEN_TEAM_ID}}
    return {"OR": [keys_without_team, keys_on_other_teams]}
+
+
@router.post(
    "/key/block", tags=["key management"], dependencies=[Depends(user_api_key_auth)]
)
@management_endpoint_wrapper
async def block_key(
    data: BlockKeyRequest,
    http_request: Request,
    user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
    litellm_changed_by: Optional[str] = Header(
        None,
        description="The litellm-changed-by header enables tracking of actions performed by authorized users on behalf of other users, providing an audit trail for accountability",
    ),
) -> Optional[LiteLLM_VerificationToken]:
    """
    Block an Virtual key from making any requests.

    Parameters:
    - key: str - The key to block. Can be either the unhashed key (sk-...) or the hashed key value

    Example:
    ```bash
    curl --location 'http://0.0.0.0:4000/key/block' \
    --header 'Authorization: Bearer sk-1234' \
    --header 'Content-Type: application/json' \
    --data '{
        "key": "sk-Fn8Ej39NxjAXrvpUGKghGw"
    }'
    ```

    Note: This is an admin-only endpoint. Only proxy admins can block keys.
    """
    from litellm.proxy.proxy_server import (
        create_audit_log_for_update,
        hash_token,
        litellm_proxy_admin_name,
        prisma_client,
        proxy_logging_obj,
        user_api_key_cache,
    )

    if prisma_client is None:
        raise Exception("{}".format(CommonProxyErrors.db_not_connected_error.value))

    # The DB stores hashed tokens; hash raw "sk-" keys before lookup.
    if data.key.startswith("sk-"):
        hashed_token = hash_token(token=data.key)
    else:
        hashed_token = data.key

    if litellm.store_audit_logs is True:
        # make an audit log for key update
        # NOTE(review): the 404 pre-check only runs when audit logging is
        # enabled; with it disabled, a missing key surfaces as a DB error
        # from the update call below instead.
        record = await prisma_client.db.litellm_verificationtoken.find_unique(
            where={"token": hashed_token}
        )
        if record is None:
            raise ProxyException(
                message=f"Key {data.key} not found",
                type=ProxyErrorTypes.bad_request_error,
                param="key",
                code=status.HTTP_404_NOT_FOUND,
            )
        # Fire-and-forget: audit-log writing must not block the response.
        asyncio.create_task(
            create_audit_log_for_update(
                request_data=LiteLLM_AuditLogs(
                    id=str(uuid.uuid4()),
                    updated_at=datetime.now(timezone.utc),
                    changed_by=litellm_changed_by
                    or user_api_key_dict.user_id
                    or litellm_proxy_admin_name,
                    changed_by_api_key=user_api_key_dict.api_key,
                    table_name=LitellmTableNames.KEY_TABLE_NAME,
                    object_id=hashed_token,
                    action="blocked",
                    updated_values="{}",
                    before_value=record.model_dump_json(),
                )
            )
        )

    record = await prisma_client.db.litellm_verificationtoken.update(
        where={"token": hashed_token}, data={"blocked": True}  # type: ignore
    )

    ## UPDATE KEY CACHE

    ### get cached object ###
    key_object = await get_key_object(
        hashed_token=hashed_token,
        prisma_client=prisma_client,
        user_api_key_cache=user_api_key_cache,
        parent_otel_span=None,
        proxy_logging_obj=proxy_logging_obj,
    )

    ### update cached object ###
    key_object.blocked = True

    ### store cached object ###
    await _cache_key_object(
        hashed_token=hashed_token,
        user_api_key_obj=key_object,
        user_api_key_cache=user_api_key_cache,
        proxy_logging_obj=proxy_logging_obj,
    )

    return record
+
+
@router.post(
    "/key/unblock", tags=["key management"], dependencies=[Depends(user_api_key_auth)]
)
@management_endpoint_wrapper
async def unblock_key(
    data: BlockKeyRequest,
    http_request: Request,
    user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
    litellm_changed_by: Optional[str] = Header(
        None,
        description="The litellm-changed-by header enables tracking of actions performed by authorized users on behalf of other users, providing an audit trail for accountability",
    ),
):
    """
    Unblock a Virtual key to allow it to make requests again.

    Parameters:
    - key: str - The key to unblock. Can be either the unhashed key (sk-...) or the hashed key value

    Example:
    ```bash
    curl --location 'http://0.0.0.0:4000/key/unblock' \
    --header 'Authorization: Bearer sk-1234' \
    --header 'Content-Type: application/json' \
    --data '{
        "key": "sk-Fn8Ej39NxjAXrvpUGKghGw"
    }'
    ```

    Note: This is an admin-only endpoint. Only proxy admins can unblock keys.
    """
    from litellm.proxy.proxy_server import (
        create_audit_log_for_update,
        hash_token,
        litellm_proxy_admin_name,
        prisma_client,
        proxy_logging_obj,
        user_api_key_cache,
    )

    if prisma_client is None:
        raise Exception("{}".format(CommonProxyErrors.db_not_connected_error.value))

    # The DB stores hashed tokens; hash raw "sk-" keys before lookup.
    if data.key.startswith("sk-"):
        hashed_token = hash_token(token=data.key)
    else:
        hashed_token = data.key

    if litellm.store_audit_logs is True:
        # make an audit log for key update
        record = await prisma_client.db.litellm_verificationtoken.find_unique(
            where={"token": hashed_token}
        )
        if record is None:
            raise ProxyException(
                message=f"Key {data.key} not found",
                type=ProxyErrorTypes.bad_request_error,
                param="key",
                code=status.HTTP_404_NOT_FOUND,
            )
        # Fire-and-forget: audit-log writing must not block the response.
        asyncio.create_task(
            create_audit_log_for_update(
                request_data=LiteLLM_AuditLogs(
                    id=str(uuid.uuid4()),
                    updated_at=datetime.now(timezone.utc),
                    changed_by=litellm_changed_by
                    or user_api_key_dict.user_id
                    or litellm_proxy_admin_name,
                    changed_by_api_key=user_api_key_dict.api_key,
                    table_name=LitellmTableNames.KEY_TABLE_NAME,
                    object_id=hashed_token,
                    # NOTE(review): action="blocked" looks wrong for an
                    # unblock operation — verify whether the audit-action
                    # Literal permits an "unblocked" value before changing.
                    action="blocked",
                    updated_values="{}",
                    before_value=record.model_dump_json(),
                )
            )
        )

    record = await prisma_client.db.litellm_verificationtoken.update(
        where={"token": hashed_token}, data={"blocked": False}  # type: ignore
    )

    ## UPDATE KEY CACHE

    ### get cached object ###
    key_object = await get_key_object(
        hashed_token=hashed_token,
        prisma_client=prisma_client,
        user_api_key_cache=user_api_key_cache,
        parent_otel_span=None,
        proxy_logging_obj=proxy_logging_obj,
    )

    ### update cached object ###
    key_object.blocked = False

    ### store cached object ###
    await _cache_key_object(
        hashed_token=hashed_token,
        user_api_key_obj=key_object,
        user_api_key_cache=user_api_key_cache,
        proxy_logging_obj=proxy_logging_obj,
    )

    return record
+
+
@router.post(
    "/key/health",
    tags=["key management"],
    dependencies=[Depends(user_api_key_auth)],
    response_model=KeyHealthResponse,
)
@management_endpoint_wrapper
async def key_health(
    request: Request,
    user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
):
    """
    Check the health of the key.

    Checks:
    - If key based logging is configured correctly - sends a test log

    Usage: pass the key in the request header

    ```bash
    curl -X POST "http://localhost:4000/key/health" \
     -H "Authorization: Bearer sk-1234" \
     -H "Content-Type: application/json"
    ```

    Response when logging callbacks are setup correctly:

    ```json
    {
      "key": "healthy",
      "logging_callbacks": {
        "callbacks": [
          "gcs_bucket"
        ],
        "status": "healthy",
        "details": "No logger exceptions triggered, system is healthy. Manually check if logs were sent to ['gcs_bucket']"
      }
    }
    ```

    Response when logging callbacks are not setup correctly:
    ```json
    {
      "key": "unhealthy",
      "logging_callbacks": {
        "callbacks": [
          "gcs_bucket"
        ],
        "status": "unhealthy",
        "details": "Logger exceptions triggered, system is unhealthy: Failed to load vertex credentials. Check to see if credentials containing partial/invalid information."
      }
    }
    ```
    """
    try:
        metadata = user_api_key_dict.metadata

        # Assume healthy until a logging check says otherwise.
        result: KeyHealthResponse = KeyHealthResponse(
            key="healthy",
            logging_callbacks=None,
        )

        # Only keys with key-based logging configured get the callback test.
        if metadata and "logging" in metadata:
            callback_status = await test_key_logging(
                user_api_key_dict=user_api_key_dict,
                request=request,
                key_logging=metadata["logging"],
            )
            result["logging_callbacks"] = callback_status

            if callback_status.get("status") == "unhealthy":
                result["key"] = "unhealthy"

        return KeyHealthResponse(**result)

    except Exception as e:
        raise ProxyException(
            message=f"Key health check failed: {str(e)}",
            type=ProxyErrorTypes.internal_server_error,
            param=getattr(e, "param", "None"),
            code=status.HTTP_500_INTERNAL_SERVER_ERROR,
        )
+
+
def _can_user_query_key_info(
    user_api_key_dict: UserAPIKeyAuth,
    key: Optional[str],
    key_info: LiteLLM_VerificationToken,
) -> bool:
    """
    Return True when the caller is allowed to view this key's info:
    proxy admins (full or view-only), the key itself, or the key's owner.
    """
    admin_roles = (
        LitellmUserRoles.PROXY_ADMIN.value,
        LitellmUserRoles.PROXY_ADMIN_VIEW_ONLY.value,
    )
    if user_api_key_dict.user_role in admin_roles:
        return True
    # A key may always query its own info.
    if user_api_key_dict.api_key == key:
        return True
    # A user may query info for keys they own.
    return key_info.user_id == user_api_key_dict.user_id
+
+
async def test_key_logging(
    user_api_key_dict: UserAPIKeyAuth,
    request: Request,
    key_logging: List[Dict[str, Any]],
) -> LoggingCallbackStatus:
    """
    Test the key-based logging

    - Test that key logging is correctly formatted and all args are passed correctly
    - Make a mock completion call -> user can check if it's correctly logged
    - Check if any logger.exceptions were triggered -> if they were then returns it to the user client side

    Args:
        user_api_key_dict: the authenticated key whose logging config is tested
        request: incoming request (used to build the mock completion payload)
        key_logging: the key's `metadata["logging"]` entries; each must have a
            "callback_name"

    Raises:
        ValueError: if any entry in key_logging lacks a "callback_name"
    """
    import logging
    from io import StringIO

    from litellm.proxy.litellm_pre_call_utils import add_litellm_data_to_request
    from litellm.proxy.proxy_server import general_settings, proxy_config

    logging_callbacks: List[str] = []
    for callback in key_logging:
        if callback.get("callback_name") is not None:
            logging_callbacks.append(callback["callback_name"])
        else:
            raise ValueError("callback_name is required in key_logging")

    # Capture ERROR-level records emitted on the root logger while the mock
    # completion runs; any captured output means a callback failed.
    log_capture_string = StringIO()
    ch = logging.StreamHandler(log_capture_string)
    ch.setLevel(logging.ERROR)
    logger = logging.getLogger()
    logger.addHandler(ch)

    # BUG FIX: the handler was previously leaked on the root logger when the
    # mock completion failed (early return skipped removeHandler). Ensure it
    # is always detached.
    try:
        try:
            data = {
                "model": "openai/litellm-key-health-test",
                "messages": [
                    {
                        "role": "user",
                        "content": "Hello, this is a test from litellm /key/health. No LLM API call was made for this",
                    }
                ],
                "mock_response": "test response",
            }
            data = await add_litellm_data_to_request(
                data=data,
                user_api_key_dict=user_api_key_dict,
                proxy_config=proxy_config,
                general_settings=general_settings,
                request=request,
            )
            await litellm.acompletion(
                **data
            )  # make mock completion call to trigger key based callbacks
        except Exception as e:
            return LoggingCallbackStatus(
                callbacks=logging_callbacks,
                status="unhealthy",
                details=f"Logging test failed: {str(e)}",
            )

        await asyncio.sleep(
            2
        )  # wait for callbacks to run, callbacks use batching so wait for the flush event

        # Check if any logger exceptions were triggered
        log_contents = log_capture_string.getvalue()
    finally:
        logger.removeHandler(ch)

    if log_contents:
        return LoggingCallbackStatus(
            callbacks=logging_callbacks,
            status="unhealthy",
            details=f"Logger exceptions triggered, system is unhealthy: {log_contents}",
        )
    else:
        return LoggingCallbackStatus(
            callbacks=logging_callbacks,
            status="healthy",
            details=f"No logger exceptions triggered, system is healthy. Manually check if logs were sent to {logging_callbacks} ",
        )
+
+
+async def _enforce_unique_key_alias(
+ key_alias: Optional[str],
+ prisma_client: Any,
+ existing_key_token: Optional[str] = None,
+) -> None:
+ """
+ Helper to enforce unique key aliases across all keys.
+
+ Args:
+ key_alias (Optional[str]): The key alias to check
+ prisma_client (Any): Prisma client instance
+ existing_key_token (Optional[str]): ID of existing key being updated, to exclude from uniqueness check
+ (The Admin UI passes key_alias, in all Edit key requests. So we need to be sure that if we find a key with the same alias, it's not the same key we're updating)
+
+ Raises:
+ ProxyException: If key alias already exists on a different key
+ """
+ if key_alias is not None and prisma_client is not None:
+ where_clause: dict[str, Any] = {"key_alias": key_alias}
+ if existing_key_token:
+ # Exclude the current key from the uniqueness check
+ where_clause["NOT"] = {"token": existing_key_token}
+
+ existing_key = await prisma_client.db.litellm_verificationtoken.find_first(
+ where=where_clause
+ )
+ if existing_key is not None:
+ raise ProxyException(
+ message=f"Key with alias '{key_alias}' already exists. Unique key aliases across all keys are required.",
+ type=ProxyErrorTypes.bad_request_error,
+ param="key_alias",
+ code=status.HTTP_400_BAD_REQUEST,
+ )
+
+
def validate_model_max_budget(model_max_budget: Optional[Dict]) -> None:
    """
    Validate the model_max_budget is GenericBudgetConfigType + enforce user has an enterprise license

    Args:
        model_max_budget: mapping of model name -> budget config dict;
            None or empty is a no-op

    Raises:
        ValueError: If model_max_budget is not a valid GenericBudgetConfigType,
            or the user lacks an enterprise license
    """
    # None / empty dict means no per-model budgets to validate.
    if not model_max_budget:
        return
    try:
        from litellm.proxy.proxy_server import CommonProxyErrors, premium_user

        if premium_user is not True:
            raise ValueError(
                f"You must have an enterprise license to set model_max_budget. {CommonProxyErrors.not_premium_user.value}"
            )
        for _model, _budget_info in model_max_budget.items():
            # Explicit raise instead of `assert`: asserts are stripped under
            # `python -O` and produce an empty error message when they fire.
            if not isinstance(_model, str):
                raise ValueError(
                    f"model name must be a string, got: {type(_model).__name__}"
                )

            # /CRUD endpoints can pass budget_limit as a string, so we need to convert it to a float
            if "budget_limit" in _budget_info:
                _budget_info["budget_limit"] = float(_budget_info["budget_limit"])
            BudgetConfig(**_budget_info)
    except Exception as e:
        raise ValueError(
            f"Invalid model_max_budget: {str(e)}. Example of valid model_max_budget: https://docs.litellm.ai/docs/proxy/users"
        ) from e