aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/litellm/llms/cohere/embed/handler.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/litellm/llms/cohere/embed/handler.py')
-rw-r--r--.venv/lib/python3.12/site-packages/litellm/llms/cohere/embed/handler.py178
1 files changed, 178 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/litellm/llms/cohere/embed/handler.py b/.venv/lib/python3.12/site-packages/litellm/llms/cohere/embed/handler.py
new file mode 100644
index 00000000..e7f22ea7
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/llms/cohere/embed/handler.py
@@ -0,0 +1,178 @@
+import json
+from typing import Any, Callable, Optional, Union
+
+import httpx
+
+import litellm
+from litellm.litellm_core_utils.litellm_logging import Logging as LiteLLMLoggingObj
+from litellm.llms.custom_httpx.http_handler import (
+ AsyncHTTPHandler,
+ HTTPHandler,
+ get_async_httpx_client,
+)
+from litellm.types.llms.bedrock import CohereEmbeddingRequest
+from litellm.types.utils import EmbeddingResponse
+
+from .transformation import CohereEmbeddingConfig
+
+
+def validate_environment(api_key, headers: dict):
+ headers.update(
+ {
+ "Request-Source": "unspecified:litellm",
+ "accept": "application/json",
+ "content-type": "application/json",
+ }
+ )
+ if api_key:
+ headers["Authorization"] = f"Bearer {api_key}"
+ return headers
+
+
+class CohereError(Exception):
+ def __init__(self, status_code, message):
+ self.status_code = status_code
+ self.message = message
+ self.request = httpx.Request(
+ method="POST", url="https://api.cohere.ai/v1/generate"
+ )
+ self.response = httpx.Response(status_code=status_code, request=self.request)
+ super().__init__(
+ self.message
+ ) # Call the base class constructor with the parameters it needs
+
+
+async def async_embedding(
+ model: str,
+ data: Union[dict, CohereEmbeddingRequest],
+ input: list,
+ model_response: litellm.utils.EmbeddingResponse,
+ timeout: Optional[Union[float, httpx.Timeout]],
+ logging_obj: LiteLLMLoggingObj,
+ optional_params: dict,
+ api_base: str,
+ api_key: Optional[str],
+ headers: dict,
+ encoding: Callable,
+ client: Optional[AsyncHTTPHandler] = None,
+):
+
+ ## LOGGING
+ logging_obj.pre_call(
+ input=input,
+ api_key=api_key,
+ additional_args={
+ "complete_input_dict": data,
+ "headers": headers,
+ "api_base": api_base,
+ },
+ )
+ ## COMPLETION CALL
+
+ if client is None:
+ client = get_async_httpx_client(
+ llm_provider=litellm.LlmProviders.COHERE,
+ params={"timeout": timeout},
+ )
+
+ try:
+ response = await client.post(api_base, headers=headers, data=json.dumps(data))
+ except httpx.HTTPStatusError as e:
+ ## LOGGING
+ logging_obj.post_call(
+ input=input,
+ api_key=api_key,
+ additional_args={"complete_input_dict": data},
+ original_response=e.response.text,
+ )
+ raise e
+ except Exception as e:
+ ## LOGGING
+ logging_obj.post_call(
+ input=input,
+ api_key=api_key,
+ additional_args={"complete_input_dict": data},
+ original_response=str(e),
+ )
+ raise e
+
+ ## PROCESS RESPONSE ##
+ return CohereEmbeddingConfig()._transform_response(
+ response=response,
+ api_key=api_key,
+ logging_obj=logging_obj,
+ data=data,
+ model_response=model_response,
+ model=model,
+ encoding=encoding,
+ input=input,
+ )
+
+
+def embedding(
+ model: str,
+ input: list,
+ model_response: EmbeddingResponse,
+ logging_obj: LiteLLMLoggingObj,
+ optional_params: dict,
+ headers: dict,
+ encoding: Any,
+ data: Optional[Union[dict, CohereEmbeddingRequest]] = None,
+ complete_api_base: Optional[str] = None,
+ api_key: Optional[str] = None,
+ aembedding: Optional[bool] = None,
+ timeout: Optional[Union[float, httpx.Timeout]] = httpx.Timeout(None),
+ client: Optional[Union[HTTPHandler, AsyncHTTPHandler]] = None,
+):
+ headers = validate_environment(api_key, headers=headers)
+ embed_url = complete_api_base or "https://api.cohere.ai/v1/embed"
+ model = model
+
+ data = data or CohereEmbeddingConfig()._transform_request(
+ model=model, input=input, inference_params=optional_params
+ )
+
+ ## ROUTING
+ if aembedding is True:
+ return async_embedding(
+ model=model,
+ data=data,
+ input=input,
+ model_response=model_response,
+ timeout=timeout,
+ logging_obj=logging_obj,
+ optional_params=optional_params,
+ api_base=embed_url,
+ api_key=api_key,
+ headers=headers,
+ encoding=encoding,
+ client=(
+ client
+ if client is not None and isinstance(client, AsyncHTTPHandler)
+ else None
+ ),
+ )
+
+ ## LOGGING
+ logging_obj.pre_call(
+ input=input,
+ api_key=api_key,
+ additional_args={"complete_input_dict": data},
+ )
+
+ ## COMPLETION CALL
+ if client is None or not isinstance(client, HTTPHandler):
+ client = HTTPHandler(concurrent_limit=1)
+
+ response = client.post(embed_url, headers=headers, data=json.dumps(data))
+
+ return CohereEmbeddingConfig()._transform_response(
+ response=response,
+ api_key=api_key,
+ logging_obj=logging_obj,
+ data=data,
+ model_response=model_response,
+ model=model,
+ encoding=encoding,
+ input=input,
+ )