author:    S. Solomon Darnell  2025-03-28 21:52:21 -0500
committer: S. Solomon Darnell  2025-03-28 21:52:21 -0500
commit:    4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
tree:      ee3dc5af3b6313e921cd920906356f5d4febc4ed  /.venv/lib/python3.12/site-packages/litellm/cost_calculator.py
parent:    cc961e04ba734dd72309fb548a2f97d67d578813 (diff)
two versions of R2R are here (HEAD, master)
Diffstat (limited to '.venv/lib/python3.12/site-packages/litellm/cost_calculator.py')
 .venv/lib/python3.12/site-packages/litellm/cost_calculator.py | 1066 +++++++++++
 1 file changed, 1066 insertions(+), 0 deletions(-)
diff --git a/.venv/lib/python3.12/site-packages/litellm/cost_calculator.py b/.venv/lib/python3.12/site-packages/litellm/cost_calculator.py
new file mode 100644
index 00000000..e17a94c8
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/cost_calculator.py
@@ -0,0 +1,1066 @@
+# What is this?
+## File for 'response_cost' calculation in Logging
+import time
+from functools import lru_cache
+from typing import Any, List, Literal, Optional, Tuple, Union
+
+from pydantic import BaseModel
+
+import litellm
+import litellm._logging
+from litellm import verbose_logger
+from litellm.litellm_core_utils.llm_cost_calc.utils import _generic_cost_per_character
+from litellm.llms.anthropic.cost_calculation import (
+    cost_per_token as anthropic_cost_per_token,
+)
+from litellm.llms.azure.cost_calculation import (
+    cost_per_token as azure_openai_cost_per_token,
+)
+from litellm.llms.bedrock.image.cost_calculator import (
+    cost_calculator as bedrock_image_cost_calculator,
+)
+from litellm.llms.databricks.cost_calculator import (
+    cost_per_token as databricks_cost_per_token,
+)
+from litellm.llms.deepseek.cost_calculator import (
+    cost_per_token as deepseek_cost_per_token,
+)
+from litellm.llms.fireworks_ai.cost_calculator import (
+    cost_per_token as fireworks_ai_cost_per_token,
+)
+from litellm.llms.gemini.cost_calculator import cost_per_token as gemini_cost_per_token
+from litellm.llms.openai.cost_calculation import (
+    cost_per_second as openai_cost_per_second,
+)
+from litellm.llms.openai.cost_calculation import cost_per_token as openai_cost_per_token
+from litellm.llms.together_ai.cost_calculator import get_model_params_and_category
+from litellm.llms.vertex_ai.cost_calculator import (
+    cost_per_character as google_cost_per_character,
+)
+from litellm.llms.vertex_ai.cost_calculator import (
+    cost_per_token as google_cost_per_token,
+)
+from litellm.llms.vertex_ai.cost_calculator import cost_router as google_cost_router
+from litellm.llms.vertex_ai.image_generation.cost_calculator import (
+    cost_calculator as vertex_ai_image_cost_calculator,
+)
+from litellm.responses.utils import ResponseAPILoggingUtils
+from litellm.types.llms.openai import (
+    HttpxBinaryResponseContent,
+    ResponseAPIUsage,
+    ResponsesAPIResponse,
+)
+from litellm.types.rerank import RerankBilledUnits, RerankResponse
+from litellm.types.utils import (
+    CallTypesLiteral,
+    LlmProviders,
+    LlmProvidersSet,
+    ModelInfo,
+    PassthroughCallTypes,
+    Usage,
+)
+from litellm.utils import (
+    CallTypes,
+    CostPerToken,
+    EmbeddingResponse,
+    ImageResponse,
+    ModelResponse,
+    ProviderConfigManager,
+    TextCompletionResponse,
+    TranscriptionResponse,
+    _cached_get_model_info_helper,
+    token_counter,
+)
+
+
+def _cost_per_token_custom_pricing_helper(
+    prompt_tokens: float = 0,
+    completion_tokens: float = 0,
+    response_time_ms: Optional[float] = 0.0,
+    ### CUSTOM PRICING ###
+    custom_cost_per_token: Optional[CostPerToken] = None,
+    custom_cost_per_second: Optional[float] = None,
+) -> Optional[Tuple[float, float]]:
+    """Internal helper function for calculating cost, if custom pricing given"""
+    if custom_cost_per_token is None and custom_cost_per_second is None:
+        return None
+
+    if custom_cost_per_token is not None:
+        input_cost = custom_cost_per_token["input_cost_per_token"] * prompt_tokens
+        output_cost = custom_cost_per_token["output_cost_per_token"] * completion_tokens
+        return input_cost, output_cost
+    elif custom_cost_per_second is not None:
+        output_cost = custom_cost_per_second * response_time_ms / 1000  # type: ignore
+        return 0, output_cost
+
+    return None
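+
+# Example (editor's sketch; values are hypothetical): the helper multiplies the
+# custom per-token rates straight through, so with $1e-06 per input token and
+# $2e-06 per output token, 1000 prompt + 500 completion tokens come out as:
+#
+#   custom = {"input_cost_per_token": 1e-6, "output_cost_per_token": 2e-6}
+#   _cost_per_token_custom_pricing_helper(
+#       prompt_tokens=1000, completion_tokens=500, custom_cost_per_token=custom
+#   )
+#   # -> (0.001, 0.001)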
+
+
+def cost_per_token(  # noqa: PLR0915
+    model: str = "",
+    prompt_tokens: int = 0,
+    completion_tokens: int = 0,
+    response_time_ms: Optional[float] = 0.0,
+    custom_llm_provider: Optional[str] = None,
+    region_name=None,
+    ### CHARACTER PRICING ###
+    prompt_characters: Optional[int] = None,
+    completion_characters: Optional[int] = None,
+    ### PROMPT CACHING PRICING ### - used for anthropic
+    cache_creation_input_tokens: Optional[int] = 0,
+    cache_read_input_tokens: Optional[int] = 0,
+    ### CUSTOM PRICING ###
+    custom_cost_per_token: Optional[CostPerToken] = None,
+    custom_cost_per_second: Optional[float] = None,
+    ### NUMBER OF QUERIES ###
+    number_of_queries: Optional[int] = None,
+    ### USAGE OBJECT ###
+    usage_object: Optional[Usage] = None,  # just read the usage object if provided
+    ### BILLED UNITS ###
+    rerank_billed_units: Optional[RerankBilledUnits] = None,
+    ### CALL TYPE ###
+    call_type: CallTypesLiteral = "completion",
+    audio_transcription_file_duration: float = 0.0,  # for audio transcription calls - the file time in seconds
+) -> Tuple[float, float]:  # type: ignore
+    """
+    Calculates the cost per token for a given model, prompt tokens, and completion tokens.
+
+    Parameters:
+        model (str): The name of the model to use. Default is "".
+        prompt_tokens (int): The number of tokens in the prompt.
+        completion_tokens (int): The number of tokens in the completion.
+        response_time_ms (Optional[float]): The amount of time, in milliseconds, it took the call to complete.
+        prompt_characters (Optional[int]): The number of characters in the prompt. Used for Vertex AI cost calculation.
+        completion_characters (Optional[int]): The number of characters in the completion response. Used for Vertex AI cost calculation.
+        custom_llm_provider (Optional[str]): The llm provider the call was made to (see __init__.py for the full list).
+        custom_cost_per_token (Optional[CostPerToken]): The custom cost per input/output token for the llm api call.
+        custom_cost_per_second (Optional[float]): The custom cost per second for the llm api call.
+        call_type (CallTypesLiteral): The call type. Default is "completion".
+
+    Returns:
+        tuple: A tuple containing the cost in USD for prompt tokens and completion tokens, respectively.
+    """
+    if model is None:
+        raise Exception("Invalid arg. Model cannot be None.")
+
+    ## RECONSTRUCT USAGE BLOCK ##
+    if usage_object is not None:
+        usage_block = usage_object
+    else:
+        usage_block = Usage(
+            prompt_tokens=prompt_tokens,
+            completion_tokens=completion_tokens,
+            total_tokens=prompt_tokens + completion_tokens,
+            cache_creation_input_tokens=cache_creation_input_tokens,
+            cache_read_input_tokens=cache_read_input_tokens,
+        )
+
+    ## CUSTOM PRICING ##
+    response_cost = _cost_per_token_custom_pricing_helper(
+        prompt_tokens=prompt_tokens,
+        completion_tokens=completion_tokens,
+        response_time_ms=response_time_ms,
+        custom_cost_per_second=custom_cost_per_second,
+        custom_cost_per_token=custom_cost_per_token,
+    )
+
+    if response_cost is not None:
+        return response_cost[0], response_cost[1]
+
+    # defaults - overwritten below once pricing info is resolved
+    prompt_tokens_cost_usd_dollar: float = 0
+    completion_tokens_cost_usd_dollar: float = 0
+    model_cost_ref = litellm.model_cost
+    model_with_provider = model
+    if custom_llm_provider is not None:
+        model_with_provider = custom_llm_provider + "/" + model
+        if region_name is not None:
+            model_with_provider_and_region = (
+                f"{custom_llm_provider}/{region_name}/{model}"
+            )
+            if (
+                model_with_provider_and_region in model_cost_ref
+            ):  # use region based pricing, if it's available
+                model_with_provider = model_with_provider_and_region
+    else:
+        _, custom_llm_provider, _, _ = litellm.get_llm_provider(model=model)
+    model_parts = model.split("/", 1)
+    model_without_prefix = model_parts[1] if len(model_parts) > 1 else model
+    """
+    Code block that formats model to lookup in litellm.model_cost
+    Option1. model = "bedrock/ap-northeast-1/anthropic.claude-instant-v1". This is the most accurate since it is region based. Should always be option 1
+    Option2. model = "openai/gpt-4"       - model = provider/model
+    Option3. model = "anthropic.claude-3" - model = model
+    """
+    if (
+        model_with_provider in model_cost_ref
+    ):  # Option 2. use model with provider, model = "openai/gpt-4"
+        model = model_with_provider
+    elif model in model_cost_ref:  # Option 1. use model passed, model="gpt-4"
+        model = model
+    elif (
+        model_without_prefix in model_cost_ref
+    ):  # Option 3. if user passed model="bedrock/anthropic.claude-3", use model="anthropic.claude-3"
+        model = model_without_prefix
+
+    # see this https://learn.microsoft.com/en-us/azure/ai-services/openai/concepts/models
+    if call_type == "speech" or call_type == "aspeech":
+        if prompt_characters is None:
+            raise ValueError(
+                "prompt_characters must be provided for tts calls. prompt_characters={}, model={}, custom_llm_provider={}, call_type={}".format(
+                    prompt_characters,
+                    model,
+                    custom_llm_provider,
+                    call_type,
+                )
+            )
+        prompt_cost, completion_cost = _generic_cost_per_character(
+            model=model_without_prefix,
+            custom_llm_provider=custom_llm_provider,
+            prompt_characters=prompt_characters,
+            completion_characters=0,
+            custom_prompt_cost=None,
+            custom_completion_cost=0,
+        )
+        if prompt_cost is None or completion_cost is None:
+            raise ValueError(
+                "cost for tts call is None. prompt_cost={}, completion_cost={}, model={}, custom_llm_provider={}, prompt_characters={}, completion_characters={}".format(
+                    prompt_cost,
+                    completion_cost,
+                    model_without_prefix,
+                    custom_llm_provider,
+                    prompt_characters,
+                    completion_characters,
+                )
+            )
+        return prompt_cost, completion_cost
+    elif call_type == "arerank" or call_type == "rerank":
+        return rerank_cost(
+            model=model,
+            custom_llm_provider=custom_llm_provider,
+            billed_units=rerank_billed_units,
+        )
+    elif (
+        call_type == "aretrieve_batch"
+        or call_type == "retrieve_batch"
+        or call_type == CallTypes.aretrieve_batch
+        or call_type == CallTypes.retrieve_batch
+    ):
+        return batch_cost_calculator(
+            usage=usage_block, model=model, custom_llm_provider=custom_llm_provider
+        )
+    elif call_type == "atranscription" or call_type == "transcription":
+        return openai_cost_per_second(
+            model=model,
+            custom_llm_provider=custom_llm_provider,
+            duration=audio_transcription_file_duration,
+        )
+    elif custom_llm_provider == "vertex_ai":
+        cost_router = google_cost_router(
+            model=model_without_prefix,
+            custom_llm_provider=custom_llm_provider,
+            call_type=call_type,
+        )
+        if cost_router == "cost_per_character":
+            return google_cost_per_character(
+                model=model_without_prefix,
+                custom_llm_provider=custom_llm_provider,
+                prompt_characters=prompt_characters,
+                completion_characters=completion_characters,
+                prompt_tokens=prompt_tokens,
+                completion_tokens=completion_tokens,
+            )
+        elif cost_router == "cost_per_token":
+            return google_cost_per_token(
+                model=model_without_prefix,
+                custom_llm_provider=custom_llm_provider,
+                prompt_tokens=prompt_tokens,
+                completion_tokens=completion_tokens,
+            )
+    elif custom_llm_provider == "anthropic":
+        return anthropic_cost_per_token(model=model, usage=usage_block)
+    elif custom_llm_provider == "openai":
+        return openai_cost_per_token(model=model, usage=usage_block)
+    elif custom_llm_provider == "databricks":
+        return databricks_cost_per_token(model=model, usage=usage_block)
+    elif custom_llm_provider == "fireworks_ai":
+        return fireworks_ai_cost_per_token(model=model, usage=usage_block)
+    elif custom_llm_provider == "azure":
+        return azure_openai_cost_per_token(
+            model=model, usage=usage_block, response_time_ms=response_time_ms
+        )
+    elif custom_llm_provider == "gemini":
+        return gemini_cost_per_token(model=model, usage=usage_block)
+    elif custom_llm_provider == "deepseek":
+        return deepseek_cost_per_token(model=model, usage=usage_block)
+    else:
+        model_info = _cached_get_model_info_helper(
+            model=model, custom_llm_provider=custom_llm_provider
+        )
+
+        if model_info["input_cost_per_token"] > 0:
+            ## COST PER TOKEN ##
+            prompt_tokens_cost_usd_dollar = (
+                model_info["input_cost_per_token"] * prompt_tokens
+            )
+        elif (
+            model_info.get("input_cost_per_second", None) is not None
+            and response_time_ms is not None
+        ):
+            verbose_logger.debug(
+                "For model=%s - input_cost_per_second: %s; response time: %s",
+                model,
+                model_info.get("input_cost_per_second", None),
+                response_time_ms,
+            )
+            ## COST PER SECOND ##
+            prompt_tokens_cost_usd_dollar = (
+                model_info["input_cost_per_second"] * response_time_ms / 1000  # type: ignore
+            )
+
+        if model_info["output_cost_per_token"] > 0:
+            completion_tokens_cost_usd_dollar = (
+                model_info["output_cost_per_token"] * completion_tokens
+            )
+        elif (
+            model_info.get("output_cost_per_second", None) is not None
+            and response_time_ms is not None
+        ):
+            verbose_logger.debug(
+                "For model=%s - output_cost_per_second: %s; response time: %s",
+                model,
+                model_info.get("output_cost_per_second", None),
+                response_time_ms,
+            )
+            ## COST PER SECOND ##
+            completion_tokens_cost_usd_dollar = (
+                model_info["output_cost_per_second"] * response_time_ms / 1000  # type: ignore
+            )
+
+        verbose_logger.debug(
+            "Returned custom cost for model=%s - prompt_tokens_cost_usd_dollar: %s, completion_tokens_cost_usd_dollar: %s",
+            model,
+            prompt_tokens_cost_usd_dollar,
+            completion_tokens_cost_usd_dollar,
+        )
+        return prompt_tokens_cost_usd_dollar, completion_tokens_cost_usd_dollar
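+
+# Example (editor's sketch; model and token counts are illustrative): for a
+# model priced per token in litellm.model_cost, the two return values are the
+# prompt-side and completion-side USD costs.
+#
+#   prompt_cost, completion_cost = cost_per_token(
+#       model="gpt-4",
+#       prompt_tokens=100,
+#       completion_tokens=50,
+#       custom_llm_provider="openai",
+#   )
+#   total_usd = prompt_cost + completion_cost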
+
+
+def get_replicate_completion_pricing(completion_response: dict, total_time=0.0):
+    # see https://replicate.com/pricing
+    # for all litellm currently supported LLMs, almost all requests go to a100_80gb
+    a100_80gb_price_per_second_public = (
+        0.001400  # assume all calls sent to A100 80GB for now
+    )
+    if total_time == 0.0:  # total_time is expected in milliseconds
+        start_time = completion_response.get("created", time.time())
+        end_time = completion_response.get("ended", time.time())
+        total_time = (end_time - start_time) * 1000  # timestamps are in seconds; convert to ms
+
+    return a100_80gb_price_per_second_public * total_time / 1000
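+
+# Example (editor's arithmetic sketch): at the flat A100-80GB rate of
+# $0.0014/second, a request that ran for 2500 ms costs
+# 0.0014 * 2500 / 1000 = $0.0035.
+#
+#   get_replicate_completion_pricing({}, total_time=2500.0)  # -> 0.0035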
+
+
+def has_hidden_params(obj: Any) -> bool:
+    return hasattr(obj, "_hidden_params")
+
+
+def _get_provider_for_cost_calc(
+    model: Optional[str],
+    custom_llm_provider: Optional[str] = None,
+) -> Optional[str]:
+    if custom_llm_provider is not None:
+        return custom_llm_provider
+    if model is None:
+        return None
+    try:
+        _, custom_llm_provider, _, _ = litellm.get_llm_provider(model=model)
+    except Exception as e:
+        verbose_logger.debug(
+            f"litellm.cost_calculator.py::_get_provider_for_cost_calc() - Error inferring custom_llm_provider - {str(e)}"
+        )
+        return None
+
+    return custom_llm_provider
+
+
+def _select_model_name_for_cost_calc(
+    model: Optional[str],
+    completion_response: Optional[Any],
+    base_model: Optional[str] = None,
+    custom_pricing: Optional[bool] = None,
+    custom_llm_provider: Optional[str] = None,
+) -> Optional[str]:
+    """
+    1. If custom pricing is true, return the received model name
+    2. If base_model is set (e.g. for azure models), return that
+    3. If the completion response has a model set, return that
+    4. Otherwise, return the model passed in
+    """
+
+    return_model: Optional[str] = None
+    region_name: Optional[str] = None
+    custom_llm_provider = _get_provider_for_cost_calc(
+        model=model, custom_llm_provider=custom_llm_provider
+    )
+
+    if custom_pricing is True:
+        return_model = model
+
+    if base_model is not None:
+        return_model = base_model
+
+    completion_response_model: Optional[str] = None
+    if completion_response is not None:
+        if isinstance(completion_response, BaseModel):
+            completion_response_model = getattr(completion_response, "model", None)
+        elif isinstance(completion_response, dict):
+            completion_response_model = completion_response.get("model", None)
+    hidden_params: Optional[dict] = getattr(completion_response, "_hidden_params", None)
+    if completion_response_model is None and hidden_params is not None:
+        if (
+            hidden_params.get("model", None) is not None
+            and len(hidden_params["model"]) > 0
+        ):
+            return_model = hidden_params.get("model", model)
+    if hidden_params is not None and hidden_params.get("region_name", None) is not None:
+        region_name = hidden_params.get("region_name", None)
+
+    if return_model is None and completion_response_model is not None:
+        return_model = completion_response_model
+
+    if return_model is None and model is not None:
+        return_model = model
+
+    if (
+        return_model is not None
+        and custom_llm_provider is not None
+        and not _model_contains_known_llm_provider(return_model)
+    ):  # add provider prefix if not already present, to match model_cost
+        if region_name is not None:
+            return_model = f"{custom_llm_provider}/{region_name}/{return_model}"
+        else:
+            return_model = f"{custom_llm_provider}/{return_model}"
+
+    return return_model
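+
+# Example (editor's sketch; model names are illustrative): base_model takes
+# precedence over the raw model name, and a known provider prefix is kept so
+# the result matches litellm.model_cost keys.
+#
+#   _select_model_name_for_cost_calc(
+#       model="azure/my-deployment",
+#       completion_response=None,
+#       base_model="azure/gpt-4o",
+#   )
+#   # -> "azure/gpt-4o"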
+
+
+@lru_cache(maxsize=16)
+def _model_contains_known_llm_provider(model: str) -> bool:
+    """
+    Check if the model contains a known llm provider
+    """
+    _provider_prefix = model.split("/")[0]
+    return _provider_prefix in LlmProvidersSet
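+
+# Example (editor's sketch): only the first "/"-separated segment is checked
+# against LlmProvidersSet.
+#
+#   _model_contains_known_llm_provider("openai/gpt-4")        # -> True
+#   _model_contains_known_llm_provider("anthropic.claude-3")  # -> False (no provider prefix)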
+
+
+def _get_usage_object(
+    completion_response: Any,
+) -> Optional[Usage]:
+    usage_obj: Optional[Usage] = None
+    if completion_response is not None and isinstance(
+        completion_response, ModelResponse
+    ):
+        usage_obj = completion_response.get("usage")
+
+    return usage_obj
+
+
+def _is_known_usage_objects(usage_obj):
+    """Returns True if the usage obj is a known Usage type."""
+    return isinstance(usage_obj, (litellm.Usage, ResponseAPIUsage))
+
+
+def _infer_call_type(
+    call_type: Optional[CallTypesLiteral], completion_response: Any
+) -> Optional[CallTypesLiteral]:
+    if call_type is not None:
+        return call_type
+
+    if completion_response is None:
+        return None
+
+    if isinstance(completion_response, ModelResponse):
+        return "completion"
+    elif isinstance(completion_response, EmbeddingResponse):
+        return "embedding"
+    elif isinstance(completion_response, TranscriptionResponse):
+        return "transcription"
+    elif isinstance(completion_response, HttpxBinaryResponseContent):
+        return "speech"
+    elif isinstance(completion_response, RerankResponse):
+        return "rerank"
+    elif isinstance(completion_response, ImageResponse):
+        return "image_generation"
+    elif isinstance(completion_response, TextCompletionResponse):
+        return "text_completion"
+
+    return call_type
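+
+# Example (editor's sketch): when no call_type is passed, it is inferred from
+# the response class; an explicit call_type always wins.
+#
+#   _infer_call_type(None, transcription_response)  # -> "transcription"
+#   _infer_call_type("rerank", model_response)      # -> "rerank"
+#
+# (transcription_response / model_response stand for TranscriptionResponse /
+# ModelResponse instances.)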
+
+
+def completion_cost(  # noqa: PLR0915
+    completion_response=None,
+    model: Optional[str] = None,
+    prompt="",
+    messages: List = [],
+    completion="",
+    total_time: Optional[float] = 0.0,  # used for replicate, sagemaker
+    call_type: Optional[CallTypesLiteral] = None,
+    ### REGION ###
+    custom_llm_provider=None,
+    region_name=None,  # used for bedrock pricing
+    ### IMAGE GEN ###
+    size: Optional[str] = None,
+    quality: Optional[str] = None,
+    n: Optional[int] = None,  # number of images
+    ### CUSTOM PRICING ###
+    custom_cost_per_token: Optional[CostPerToken] = None,
+    custom_cost_per_second: Optional[float] = None,
+    optional_params: Optional[dict] = None,
+    custom_pricing: Optional[bool] = None,
+    base_model: Optional[str] = None,
+) -> float:
+    """
+    Calculate the cost of a given completion call for GPT-3.5-turbo, llama2, or any other litellm-supported LLM.
+
+    Parameters:
+        completion_response (litellm.ModelResponse): [Required] The response received from a LiteLLM completion request.
+
+        [OPTIONAL PARAMS]
+        model (str): Optional. The name of the language model used in the completion call
+        prompt (str): Optional. The input prompt passed to the llm
+        completion (str): Optional. The output completion text from the llm
+        total_time (float, int): Optional. (Only used for Replicate LLMs) The total time used for the request, in milliseconds
+        custom_cost_per_token (Optional[CostPerToken]): The custom cost per input/output token for the llm api call.
+        custom_cost_per_second (Optional[float]): The custom cost per second for the llm api call.
+
+    Returns:
+        float: The cost in USD for the completion based on the provided parameters.
+
+    Exceptions:
+        Raises an exception if the model is not in the litellm model cost map. Register the model via custom pricing or a PR - https://github.com/BerriAI/litellm/blob/main/model_prices_and_context_window.json
+
+
+    Note:
+        - If completion_response is provided, the function extracts token information and the model name from it.
+        - If completion_response is not provided, the function calculates token counts based on the model and input text.
+        - The cost is calculated based on the model, prompt tokens, and completion tokens.
+        - For certain models containing "togethercomputer" in the name, prices are based on the model size.
+        - For un-mapped Replicate models, the cost is calculated based on the total time used for the request.
+    """
+    try:
+        call_type = _infer_call_type(call_type, completion_response) or "completion"
+
+        if (
+            (call_type == "aimage_generation" or call_type == "image_generation")
+            and model is not None
+            and isinstance(model, str)
+            and len(model) == 0
+            and custom_llm_provider == "azure"
+        ):
+            model = "dall-e-2"  # for dall-e-2, azure expects an empty model name
+        # Handle Inputs to completion_cost
+        prompt_tokens = 0
+        prompt_characters: Optional[int] = None
+        completion_tokens = 0
+        completion_characters: Optional[int] = None
+        cache_creation_input_tokens: Optional[int] = None
+        cache_read_input_tokens: Optional[int] = None
+        audio_transcription_file_duration: float = 0.0
+        cost_per_token_usage_object: Optional[Usage] = _get_usage_object(
+            completion_response=completion_response
+        )
+        rerank_billed_units: Optional[RerankBilledUnits] = None
+        model = _select_model_name_for_cost_calc(
+            model=model,
+            completion_response=completion_response,
+            custom_llm_provider=custom_llm_provider,
+            custom_pricing=custom_pricing,
+            base_model=base_model,
+        )
+
+        verbose_logger.info(f"selected model name for cost calculation: {model}")
+
+        if completion_response is not None and (
+            isinstance(completion_response, BaseModel)
+            or isinstance(completion_response, dict)
+        ):  # tts returns a custom class
+            if isinstance(completion_response, dict):
+                usage_obj: Optional[Union[dict, Usage]] = completion_response.get(
+                    "usage", {}
+                )
+            else:
+                usage_obj = getattr(completion_response, "usage", {})
+            if isinstance(usage_obj, BaseModel) and not _is_known_usage_objects(
+                usage_obj=usage_obj
+            ):
+                setattr(
+                    completion_response,
+                    "usage",
+                    litellm.Usage(**usage_obj.model_dump()),
+                )
+            if usage_obj is None:
+                _usage = {}
+            elif isinstance(usage_obj, BaseModel):
+                _usage = usage_obj.model_dump()
+            else:
+                _usage = usage_obj
+
+            if ResponseAPILoggingUtils._is_response_api_usage(_usage):
+                _usage = (
+                    ResponseAPILoggingUtils._transform_response_api_usage_to_chat_usage(
+                        _usage
+                    ).model_dump()
+                )
+
+            # get input/output tokens from completion_response
+            prompt_tokens = _usage.get("prompt_tokens", 0)
+            completion_tokens = _usage.get("completion_tokens", 0)
+            cache_creation_input_tokens = _usage.get("cache_creation_input_tokens", 0)
+            cache_read_input_tokens = _usage.get("cache_read_input_tokens", 0)
+            if (
+                "prompt_tokens_details" in _usage
+                and _usage["prompt_tokens_details"] != {}
+                and _usage["prompt_tokens_details"]
+            ):
+                prompt_tokens_details = _usage.get("prompt_tokens_details", {})
+                cache_read_input_tokens = prompt_tokens_details.get("cached_tokens", 0)
+
+            total_time = getattr(completion_response, "_response_ms", 0)
+
+            hidden_params = getattr(completion_response, "_hidden_params", None)
+            if hidden_params is not None:
+                custom_llm_provider = hidden_params.get(
+                    "custom_llm_provider", custom_llm_provider or None
+                )
+                region_name = hidden_params.get("region_name", region_name)
+                size = hidden_params.get("optional_params", {}).get(
+                    "size", "1024-x-1024"
+                )  # openai default
+                quality = hidden_params.get("optional_params", {}).get(
+                    "quality", "standard"
+                )  # openai default
+                n = hidden_params.get("optional_params", {}).get(
+                    "n", 1
+                )  # openai default
+        else:
+            if model is None:
+                raise ValueError(
+                    f"Model is None and does not exist in passed completion_response. Passed completion_response={completion_response}, model={model}"
+                )
+            if len(messages) > 0:
+                prompt_tokens = token_counter(model=model, messages=messages)
+            elif len(prompt) > 0:
+                prompt_tokens = token_counter(model=model, text=prompt)
+            completion_tokens = token_counter(model=model, text=completion)
+        if model is None:
+            raise ValueError(
+                f"Model is None and does not exist in passed completion_response. Passed completion_response={completion_response}, model={model}"
+            )
+        if custom_llm_provider is None:
+            try:
+                model, custom_llm_provider, _, _ = litellm.get_llm_provider(
+                    model=model
+                )  # strip the llm provider from the model name -> for image gen cost calculation
+            except Exception as e:
+                verbose_logger.debug(
+                    "litellm.cost_calculator.py::completion_cost() - Error inferring custom_llm_provider - {}".format(
+                        str(e)
+                    )
+                )
+        if (
+            call_type == CallTypes.image_generation.value
+            or call_type == CallTypes.aimage_generation.value
+            or call_type == PassthroughCallTypes.passthrough_image_generation.value
+        ):
+            ### IMAGE GENERATION COST CALCULATION ###
+            if custom_llm_provider == "vertex_ai":
+                if isinstance(completion_response, ImageResponse):
+                    return vertex_ai_image_cost_calculator(
+                        model=model,
+                        image_response=completion_response,
+                    )
+            elif custom_llm_provider == "bedrock":
+                if isinstance(completion_response, ImageResponse):
+                    return bedrock_image_cost_calculator(
+                        model=model,
+                        size=size,
+                        image_response=completion_response,
+                        optional_params=optional_params,
+                    )
+                raise TypeError(
+                    "completion_response must be of type ImageResponse for bedrock image cost calculation"
+                )
+            else:
+                return default_image_cost_calculator(
+                    model=model,
+                    quality=quality,
+                    custom_llm_provider=custom_llm_provider,
+                    n=n,
+                    size=size,
+                    optional_params=optional_params,
+                )
+        elif (
+            call_type == CallTypes.speech.value or call_type == CallTypes.aspeech.value
+        ):
+            prompt_characters = litellm.utils._count_characters(text=prompt)
+        elif (
+            call_type == CallTypes.atranscription.value
+            or call_type == CallTypes.transcription.value
+        ):
+            audio_transcription_file_duration = getattr(
+                completion_response, "duration", 0.0
+            )
+        elif (
+            call_type == CallTypes.rerank.value or call_type == CallTypes.arerank.value
+        ):
+            if completion_response is not None and isinstance(
+                completion_response, RerankResponse
+            ):
+                meta_obj = completion_response.meta
+                if meta_obj is not None:
+                    billed_units = meta_obj.get("billed_units", {}) or {}
+                else:
+                    billed_units = {}
+
+                rerank_billed_units = RerankBilledUnits(
+                    search_units=billed_units.get("search_units"),
+                    total_tokens=billed_units.get("total_tokens"),
+                )
+
+                search_units = (
+                    billed_units.get("search_units") or 1
+                )  # cohere charges per request by default.
+                completion_tokens = search_units
+        # Calculate cost based on prompt_tokens, completion_tokens
+        if (
+            "togethercomputer" in model
+            or "together_ai" in model
+            or custom_llm_provider == "together_ai"
+        ):
+            # together ai prices based on the size of the llm
+            # get_model_params_and_category maps a model name to its LLM-size category in model_prices_and_context_window.json
+
+            model = get_model_params_and_category(model, call_type=CallTypes(call_type))
+
+        # replicate llms are billed based on the time the request spends running
+        # see https://replicate.com/pricing
+        elif (
+            model in litellm.replicate_models or "replicate" in model
+        ) and model not in litellm.model_cost:
+            # for unmapped replicate model, default to replicate's time tracking logic
+            return get_replicate_completion_pricing(completion_response, total_time)  # type: ignore
+
+        if model is None:
+            raise ValueError(
+                f"Model is None and does not exist in passed completion_response. Passed completion_response={completion_response}, model={model}"
+            )
+
+        if custom_llm_provider == "vertex_ai":
+            # Calculate the prompt characters + response characters
+            if len(messages) > 0:
+                prompt_string = litellm.utils.get_formatted_prompt(
+                    data={"messages": messages}, call_type="completion"
+                )
+
+                prompt_characters = litellm.utils._count_characters(text=prompt_string)
+            if completion_response is not None and isinstance(
+                completion_response, ModelResponse
+            ):
+                completion_string = litellm.utils.get_response_string(
+                    response_obj=completion_response
+                )
+                completion_characters = litellm.utils._count_characters(
+                    text=completion_string
+                )
+
+        (
+            prompt_tokens_cost_usd_dollar,
+            completion_tokens_cost_usd_dollar,
+        ) = cost_per_token(
+            model=model,
+            prompt_tokens=prompt_tokens,
+            completion_tokens=completion_tokens,
+            custom_llm_provider=custom_llm_provider,
+            response_time_ms=total_time,
+            region_name=region_name,
+            custom_cost_per_second=custom_cost_per_second,
+            custom_cost_per_token=custom_cost_per_token,
+            prompt_characters=prompt_characters,
+            completion_characters=completion_characters,
+            cache_creation_input_tokens=cache_creation_input_tokens,
+            cache_read_input_tokens=cache_read_input_tokens,
+            usage_object=cost_per_token_usage_object,
+            call_type=call_type,
+            audio_transcription_file_duration=audio_transcription_file_duration,
+            rerank_billed_units=rerank_billed_units,
+        )
+        _final_cost = prompt_tokens_cost_usd_dollar + completion_tokens_cost_usd_dollar
+
+        return _final_cost
+    except Exception as e:
+        raise e
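+
+# Example (editor's sketch): the usual pattern is to hand the response object
+# back in and let token counts and the model name be extracted from it.
+#
+#   response = litellm.completion(
+#       model="gpt-4", messages=[{"role": "user", "content": "hi"}]
+#   )
+#   cost_usd = completion_cost(completion_response=response)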
+
+
+def get_response_cost_from_hidden_params(
+    hidden_params: Union[dict, BaseModel]
+) -> Optional[float]:
+    if isinstance(hidden_params, BaseModel):
+        _hidden_params_dict = hidden_params.model_dump()
+    else:
+        _hidden_params_dict = hidden_params
+
+    additional_headers = _hidden_params_dict.get("additional_headers", {})
+    if additional_headers and "x-litellm-response-cost" in additional_headers:
+        response_cost = additional_headers["x-litellm-response-cost"]
+        if response_cost is None:
+            return None
+        return float(response_cost)
+    return None
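+
+# Example (editor's sketch): a provider-reported cost in the response headers
+# short-circuits local calculation.
+#
+#   get_response_cost_from_hidden_params(
+#       {"additional_headers": {"x-litellm-response-cost": "0.00042"}}
+#   )
+#   # -> 0.00042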
+
+
+def response_cost_calculator(
+    response_object: Union[
+        ModelResponse,
+        EmbeddingResponse,
+        ImageResponse,
+        TranscriptionResponse,
+        TextCompletionResponse,
+        HttpxBinaryResponseContent,
+        RerankResponse,
+        ResponsesAPIResponse,
+    ],
+    model: str,
+    custom_llm_provider: Optional[str],
+    call_type: Literal[
+        "embedding",
+        "aembedding",
+        "completion",
+        "acompletion",
+        "atext_completion",
+        "text_completion",
+        "image_generation",
+        "aimage_generation",
+        "moderation",
+        "amoderation",
+        "atranscription",
+        "transcription",
+        "aspeech",
+        "speech",
+        "rerank",
+        "arerank",
+    ],
+    optional_params: dict,
+    cache_hit: Optional[bool] = None,
+    base_model: Optional[str] = None,
+    custom_pricing: Optional[bool] = None,
+    prompt: str = "",
+) -> float:
+    """
+    Returns
+    - float: cost of the response (0.0 on cache hit); raises an exception on error.
+    """
+    try:
+        response_cost: float = 0.0
+        if cache_hit is True:
+            response_cost = 0.0
+        else:
+            if isinstance(response_object, BaseModel):
+                response_object._hidden_params["optional_params"] = optional_params
+
+                if hasattr(response_object, "_hidden_params"):
+                    provider_response_cost = get_response_cost_from_hidden_params(
+                        response_object._hidden_params
+                    )
+                    if provider_response_cost is not None:
+                        return provider_response_cost
+
+            response_cost = completion_cost(
+                completion_response=response_object,
+                model=model,
+                call_type=call_type,
+                custom_llm_provider=custom_llm_provider,
+                optional_params=optional_params,
+                custom_pricing=custom_pricing,
+                base_model=base_model,
+                prompt=prompt,
+            )
+        return response_cost
+    except Exception as e:
+        raise e
+
+
+def rerank_cost(
+    model: str,
+    custom_llm_provider: Optional[str],
+    billed_units: Optional[RerankBilledUnits] = None,
+) -> Tuple[float, float]:
+    """
+    Returns
+    - Tuple[float, float]: (prompt cost, completion cost) in USD; raises an exception on error.
+    """
+    _, custom_llm_provider, _, _ = litellm.get_llm_provider(
+        model=model, custom_llm_provider=custom_llm_provider
+    )
+
+    try:
+        config = ProviderConfigManager.get_provider_rerank_config(
+            model=model,
+            api_base=None,
+            present_version_params=[],
+            provider=LlmProviders(custom_llm_provider),
+        )
+
+        try:
+            model_info: Optional[ModelInfo] = litellm.get_model_info(
+                model=model, custom_llm_provider=custom_llm_provider
+            )
+        except Exception:
+            model_info = None
+
+        return config.calculate_rerank_cost(
+            model=model,
+            custom_llm_provider=custom_llm_provider,
+            billed_units=billed_units,
+            model_info=model_info,
+        )
+    except Exception as e:
+        raise e
+
+
+def transcription_cost(
+    model: str, custom_llm_provider: Optional[str], duration: float
+) -> Tuple[float, float]:
+    return openai_cost_per_second(
+        model=model, custom_llm_provider=custom_llm_provider, duration=duration
+    )
+
+
+def default_image_cost_calculator(
+    model: str,
+    custom_llm_provider: Optional[str] = None,
+    quality: Optional[str] = None,
+    n: Optional[int] = 1,  # Default to 1 image
+    size: Optional[str] = "1024-x-1024",  # OpenAI default
+    optional_params: Optional[dict] = None,
+) -> float:
+    """
+    Default image cost calculator for image generation
+
+    Args:
+        model (str): Model name
+        custom_llm_provider (Optional[str]): The llm provider the call was made to
+        quality (Optional[str]): Image quality setting
+        n (Optional[int]): Number of images generated
+        size (Optional[str]): Image size (e.g. "1024x1024" or "1024-x-1024")
+        optional_params (Optional[dict]): Additional request params
+
+    Returns:
+        float: Cost in USD for the image generation
+
+    Raises:
+        Exception: If model pricing not found in cost map
+    """
+    # Standardize size format to use "-x-"
+    size_str: str = size or "1024-x-1024"
+    size_str = (
+        size_str.replace("x", "-x-")
+        if "x" in size_str and "-x-" not in size_str
+        else size_str
+    )
+
+    # Parse dimensions (size strings are "WIDTH-x-HEIGHT")
+    width, height = map(int, size_str.split("-x-"))
+
+    # Build model names for cost lookup
+    base_model_name = f"{size_str}/{model}"
+    if custom_llm_provider and model.startswith(f"{custom_llm_provider}/"):
+        base_model_name = (
+            f"{custom_llm_provider}/{size_str}/"
+            f"{model.removeprefix(custom_llm_provider + '/')}"
+        )
+    model_name_with_quality = (
+        f"{quality}/{base_model_name}" if quality else base_model_name
+    )
+
+    verbose_logger.debug(
+        f"Looking up cost for models: {model_name_with_quality}, {base_model_name}"
+    )
+
+    # Try model with quality first, fall back to base model name
+    if model_name_with_quality in litellm.model_cost:
+        cost_info = litellm.model_cost[model_name_with_quality]
+    elif base_model_name in litellm.model_cost:
+        cost_info = litellm.model_cost[base_model_name]
+    else:
+        # Try without provider prefix
+        model_without_provider = f"{size_str}/{model.split('/')[-1]}"
+        model_with_quality_without_provider = (
+            f"{quality}/{model_without_provider}" if quality else model_without_provider
+        )
+
+        if model_with_quality_without_provider in litellm.model_cost:
+            cost_info = litellm.model_cost[model_with_quality_without_provider]
+        elif model_without_provider in litellm.model_cost:
+            cost_info = litellm.model_cost[model_without_provider]
+        else:
+            raise Exception(
+                f"Model not found in cost map. Tried {model_name_with_quality}, {base_model_name}, {model_with_quality_without_provider}, and {model_without_provider}"
+            )
+
+    return cost_info["input_cost_per_pixel"] * height * width * n
+
+
+def batch_cost_calculator(
+    usage: Usage,
+    model: str,
+    custom_llm_provider: Optional[str] = None,
+) -> Tuple[float, float]:
+    """
+    Calculate the cost of a batch job
+    """
+
+    _, custom_llm_provider, _, _ = litellm.get_llm_provider(
+        model=model, custom_llm_provider=custom_llm_provider
+    )
+
+    verbose_logger.info(
+        "Calculating batch cost per token. model=%s, custom_llm_provider=%s",
+        model,
+        custom_llm_provider,
+    )
+
+    try:
+        model_info: Optional[ModelInfo] = litellm.get_model_info(
+            model=model, custom_llm_provider=custom_llm_provider
+        )
+    except Exception:
+        model_info = None
+
+    if not model_info:
+        return 0.0, 0.0
+
+    input_cost_per_token_batches = model_info.get("input_cost_per_token_batches")
+    input_cost_per_token = model_info.get("input_cost_per_token")
+    output_cost_per_token_batches = model_info.get("output_cost_per_token_batches")
+    output_cost_per_token = model_info.get("output_cost_per_token")
+    total_prompt_cost = 0.0
+    total_completion_cost = 0.0
+    if input_cost_per_token_batches:
+        total_prompt_cost = usage.prompt_tokens * input_cost_per_token_batches
+    elif input_cost_per_token:
+        total_prompt_cost = (
+            usage.prompt_tokens * (input_cost_per_token) / 2
+        )  # batch cost is usually half of the regular token cost
+    if output_cost_per_token_batches:
+        total_completion_cost = usage.completion_tokens * output_cost_per_token_batches
+    elif output_cost_per_token:
+        total_completion_cost = (
+            usage.completion_tokens * (output_cost_per_token) / 2
+        )  # batch cost is usually half of the regular token cost
+
+    return total_prompt_cost, total_completion_cost
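+
+
+# Example (editor's arithmetic sketch; rates are hypothetical): a model priced
+# at $2e-06 per input token with no dedicated batch rate falls back to half
+# price, so 10,000 prompt tokens in a batch cost 10000 * 2e-06 / 2 = $0.01.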