aboutsummaryrefslogtreecommitdiff
from typing import List, Optional, Union

from openai import OpenAI

import litellm
from litellm.llms.custom_httpx.http_handler import (
    AsyncHTTPHandler,
    HTTPHandler,
    get_async_httpx_client,
)
from litellm.llms.openai.openai import OpenAIChatCompletion
from litellm.types.llms.azure_ai import ImageEmbeddingRequest
from litellm.types.utils import EmbeddingResponse
from litellm.utils import convert_to_model_response_object

from .cohere_transformation import AzureAICohereConfig


class AzureAIEmbedding(OpenAIChatCompletion):

    def _process_response(
        self,
        image_embedding_responses: Optional[List],
        text_embedding_responses: Optional[List],
        image_embeddings_idx: List[int],
        model_response: EmbeddingResponse,
        input: List,
    ):
        combined_responses = []
        if (
            image_embedding_responses is not None
            and text_embedding_responses is not None
        ):
            # Combine and order the results
            text_idx = 0
            image_idx = 0

            for idx in range(len(input)):
                if idx in image_embeddings_idx:
                    combined_responses.append(image_embedding_responses[image_idx])
                    image_idx += 1
                else:
                    combined_responses.append(text_embedding_responses[text_idx])
                    text_idx += 1

            model_response.data = combined_responses
        elif image_embedding_responses is not None:
            model_response.data = image_embedding_responses
        elif text_embedding_responses is not None:
            model_response.data = text_embedding_responses

        response = AzureAICohereConfig()._transform_response(response=model_response)  # type: ignore

        return response

    async def async_image_embedding(
        self,
        model: str,
        data: ImageEmbeddingRequest,
        timeout: float,
        logging_obj,
        model_response: litellm.EmbeddingResponse,
        optional_params: dict,
        api_key: Optional[str],
        api_base: Optional[str],
        client: Optional[Union[HTTPHandler, AsyncHTTPHandler]] = None,
    ) -> EmbeddingResponse:
        if client is None or not isinstance(client, AsyncHTTPHandler):
            client = get_async_httpx_client(
                llm_provider=litellm.LlmProviders.AZURE_AI,
                params={"timeout": timeout},
            )

        url = "{}/images/embeddings".format(api_base)

        response = await client.post(
            url=url,
            json=data,  # type: ignore
            headers={"Authorization": "Bearer {}".format(api_key)},
        )

        embedding_response = response.json()
        embedding_headers = dict(response.headers)
        returned_response: EmbeddingResponse = convert_to_model_response_object(  # type: ignore
            response_object=embedding_response,
            model_response_object=model_response,
            response_type="embedding",
            stream=False,
            _response_headers=embedding_headers,
        )
        return returned_response

    def image_embedding(
        self,
        model: str,
        data: ImageEmbeddingRequest,
        timeout: float,
        logging_obj,
        model_response: EmbeddingResponse,
        optional_params: dict,
        api_key: Optional[str],
        api_base: Optional[str],
        client: Optional[Union[HTTPHandler, AsyncHTTPHandler]] = None,
    ):
        if api_base is None:
            raise ValueError(
                "api_base is None. Please set AZURE_AI_API_BASE or dynamically via `api_base` param, to make the request."
            )
        if api_key is None:
            raise ValueError(
                "api_key is None. Please set AZURE_AI_API_KEY or dynamically via `api_key` param, to make the request."
            )

        if client is None or not isinstance(client, HTTPHandler):
            client = HTTPHandler(timeout=timeout, concurrent_limit=1)

        url = "{}/images/embeddings".format(api_base)

        response = client.post(
            url=url,
            json=data,  # type: ignore
            headers={"Authorization": "Bearer {}".format(api_key)},
        )

        embedding_response = response.json()
        embedding_headers = dict(response.headers)
        returned_response: EmbeddingResponse = convert_to_model_response_object(  # type: ignore
            response_object=embedding_response,
            model_response_object=model_response,
            response_type="embedding",
            stream=False,
            _response_headers=embedding_headers,
        )
        return returned_response

    async def async_embedding(
        self,
        model: str,
        input: List,
        timeout: float,
        logging_obj,
        model_response: litellm.EmbeddingResponse,
        optional_params: dict,
        api_key: Optional[str] = None,
        api_base: Optional[str] = None,
        client=None,
    ) -> EmbeddingResponse:

        (
            image_embeddings_request,
            v1_embeddings_request,
            image_embeddings_idx,
        ) = AzureAICohereConfig()._transform_request(
            input=input, optional_params=optional_params, model=model
        )

        image_embedding_responses: Optional[List] = None
        text_embedding_responses: Optional[List] = None

        if image_embeddings_request["input"]:
            image_response = await self.async_image_embedding(
                model=model,
                data=image_embeddings_request,
                timeout=timeout,
                logging_obj=logging_obj,
                model_response=model_response,
                optional_params=optional_params,
                api_key=api_key,
                api_base=api_base,
                client=client,
            )

            image_embedding_responses = image_response.data
            if image_embedding_responses is None:
                raise Exception("/image/embeddings route returned None Embeddings.")

        if v1_embeddings_request["input"]:
            response: EmbeddingResponse = await super().embedding(  # type: ignore
                model=model,
                input=input,
                timeout=timeout,
                logging_obj=logging_obj,
                model_response=model_response,
                optional_params=optional_params,
                api_key=api_key,
                api_base=api_base,
                client=client,
                aembedding=True,
            )
            text_embedding_responses = response.data
            if text_embedding_responses is None:
                raise Exception("/v1/embeddings route returned None Embeddings.")

        return self._process_response(
            image_embedding_responses=image_embedding_responses,
            text_embedding_responses=text_embedding_responses,
            image_embeddings_idx=image_embeddings_idx,
            model_response=model_response,
            input=input,
        )

    def embedding(
        self,
        model: str,
        input: List,
        timeout: float,
        logging_obj,
        model_response: EmbeddingResponse,
        optional_params: dict,
        api_key: Optional[str] = None,
        api_base: Optional[str] = None,
        client=None,
        aembedding=None,
        max_retries: Optional[int] = None,
    ) -> EmbeddingResponse:
        """
        - Separate image url from text
        -> route image url call to `/image/embeddings`
        -> route text call to `/v1/embeddings` (OpenAI route)

        assemble result in-order, and return
        """
        if aembedding is True:
            return self.async_embedding(  # type: ignore
                model,
                input,
                timeout,
                logging_obj,
                model_response,
                optional_params,
                api_key,
                api_base,
                client,
            )

        (
            image_embeddings_request,
            v1_embeddings_request,
            image_embeddings_idx,
        ) = AzureAICohereConfig()._transform_request(
            input=input, optional_params=optional_params, model=model
        )

        image_embedding_responses: Optional[List] = None
        text_embedding_responses: Optional[List] = None

        if image_embeddings_request["input"]:
            image_response = self.image_embedding(
                model=model,
                data=image_embeddings_request,
                timeout=timeout,
                logging_obj=logging_obj,
                model_response=model_response,
                optional_params=optional_params,
                api_key=api_key,
                api_base=api_base,
                client=client,
            )

            image_embedding_responses = image_response.data
            if image_embedding_responses is None:
                raise Exception("/image/embeddings route returned None Embeddings.")

        if v1_embeddings_request["input"]:
            response: EmbeddingResponse = super().embedding(  # type: ignore
                model,
                input,
                timeout,
                logging_obj,
                model_response,
                optional_params,
                api_key,
                api_base,
                client=(
                    client
                    if client is not None and isinstance(client, OpenAI)
                    else None
                ),
                aembedding=aembedding,
            )

            text_embedding_responses = response.data
            if text_embedding_responses is None:
                raise Exception("/v1/embeddings route returned None Embeddings.")

        return self._process_response(
            image_embedding_responses=image_embedding_responses,
            text_embedding_responses=text_embedding_responses,
            image_embeddings_idx=image_embeddings_idx,
            model_response=model_response,
            input=input,
        )