diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/litellm/llms/sagemaker/completion/handler.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/litellm/llms/sagemaker/completion/handler.py | 701 |
1 files changed, 701 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/litellm/llms/sagemaker/completion/handler.py b/.venv/lib/python3.12/site-packages/litellm/llms/sagemaker/completion/handler.py new file mode 100644 index 00000000..909caf73 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/litellm/llms/sagemaker/completion/handler.py @@ -0,0 +1,701 @@ +import json +from copy import deepcopy +from typing import Any, Callable, List, Optional, Union + +import httpx + +import litellm +from litellm._logging import verbose_logger +from litellm.litellm_core_utils.asyncify import asyncify +from litellm.llms.bedrock.base_aws_llm import BaseAWSLLM +from litellm.llms.custom_httpx.http_handler import ( + _get_httpx_client, + get_async_httpx_client, +) +from litellm.types.llms.openai import AllMessageValues +from litellm.utils import ( + CustomStreamWrapper, + EmbeddingResponse, + ModelResponse, + Usage, + get_secret, +) + +from ..common_utils import AWSEventStreamDecoder, SagemakerError +from .transformation import SagemakerConfig + +sagemaker_config = SagemakerConfig() + +""" +SAGEMAKER AUTH Keys/Vars +os.environ['AWS_ACCESS_KEY_ID'] = "" +os.environ['AWS_SECRET_ACCESS_KEY'] = "" +""" + + +# set os.environ['AWS_REGION_NAME'] = <your-region_name> +class SagemakerLLM(BaseAWSLLM): + + def _load_credentials( + self, + optional_params: dict, + ): + try: + from botocore.credentials import Credentials + except ImportError: + raise ImportError("Missing boto3 to call bedrock. Run 'pip install boto3'.") + ## CREDENTIALS ## + # pop aws_secret_access_key, aws_access_key_id, aws_session_token, aws_region_name from kwargs, since completion calls fail with them + aws_secret_access_key = optional_params.pop("aws_secret_access_key", None) + aws_access_key_id = optional_params.pop("aws_access_key_id", None) + aws_session_token = optional_params.pop("aws_session_token", None) + aws_region_name = optional_params.pop("aws_region_name", None) + aws_role_name = optional_params.pop("aws_role_name", None) + aws_session_name = optional_params.pop("aws_session_name", None) + aws_profile_name = optional_params.pop("aws_profile_name", None) + optional_params.pop( + "aws_bedrock_runtime_endpoint", None + ) # https://bedrock-runtime.{region_name}.amazonaws.com + aws_web_identity_token = optional_params.pop("aws_web_identity_token", None) + aws_sts_endpoint = optional_params.pop("aws_sts_endpoint", None) + + ### SET REGION NAME ### + if aws_region_name is None: + # check env # + litellm_aws_region_name = get_secret("AWS_REGION_NAME", None) + + if litellm_aws_region_name is not None and isinstance( + litellm_aws_region_name, str + ): + aws_region_name = litellm_aws_region_name + + standard_aws_region_name = get_secret("AWS_REGION", None) + if standard_aws_region_name is not None and isinstance( + standard_aws_region_name, str + ): + aws_region_name = standard_aws_region_name + + if aws_region_name is None: + aws_region_name = "us-west-2" + + credentials: Credentials = self.get_credentials( + aws_access_key_id=aws_access_key_id, + aws_secret_access_key=aws_secret_access_key, + aws_session_token=aws_session_token, + aws_region_name=aws_region_name, + aws_session_name=aws_session_name, + aws_profile_name=aws_profile_name, + aws_role_name=aws_role_name, + aws_web_identity_token=aws_web_identity_token, + aws_sts_endpoint=aws_sts_endpoint, + ) + return credentials, aws_region_name + + def _prepare_request( + self, + credentials, + model: str, + data: dict, + messages: List[AllMessageValues], + optional_params: dict, + aws_region_name: str, + extra_headers: Optional[dict] = None, + ): + try: + from botocore.auth import SigV4Auth + from botocore.awsrequest import AWSRequest + except ImportError: + raise ImportError("Missing boto3 to call bedrock. Run 'pip install boto3'.") + + sigv4 = SigV4Auth(credentials, "sagemaker", aws_region_name) + if optional_params.get("stream") is True: + api_base = f"https://runtime.sagemaker.{aws_region_name}.amazonaws.com/endpoints/{model}/invocations-response-stream" + else: + api_base = f"https://runtime.sagemaker.{aws_region_name}.amazonaws.com/endpoints/{model}/invocations" + + sagemaker_base_url = optional_params.get("sagemaker_base_url", None) + if sagemaker_base_url is not None: + api_base = sagemaker_base_url + + encoded_data = json.dumps(data).encode("utf-8") + headers = sagemaker_config.validate_environment( + headers=extra_headers, + model=model, + messages=messages, + optional_params=optional_params, + ) + request = AWSRequest( + method="POST", url=api_base, data=encoded_data, headers=headers + ) + sigv4.add_auth(request) + if ( + extra_headers is not None and "Authorization" in extra_headers + ): # prevent sigv4 from overwriting the auth header + request.headers["Authorization"] = extra_headers["Authorization"] + + prepped_request = request.prepare() + + return prepped_request + + def completion( # noqa: PLR0915 + self, + model: str, + messages: list, + model_response: ModelResponse, + print_verbose: Callable, + encoding, + logging_obj, + optional_params: dict, + litellm_params: dict, + timeout: Optional[Union[float, httpx.Timeout]] = None, + custom_prompt_dict={}, + hf_model_name=None, + logger_fn=None, + acompletion: bool = False, + headers: dict = {}, + ): + + # pop streaming if it's in the optional params as 'stream' raises an error with sagemaker + credentials, aws_region_name = self._load_credentials(optional_params) + inference_params = deepcopy(optional_params) + stream = inference_params.pop("stream", None) + model_id = optional_params.get("model_id", None) + + ## Load Config + config = litellm.SagemakerConfig.get_config() + for k, v in config.items(): + if ( + k not in inference_params + ): # completion(top_k=3) > sagemaker_config(top_k=3) <- allows for dynamic variables to be passed in + inference_params[k] = v + + if stream is True: + if acompletion is True: + response = self.async_streaming( + messages=messages, + model=model, + custom_prompt_dict=custom_prompt_dict, + hf_model_name=hf_model_name, + optional_params=optional_params, + encoding=encoding, + model_response=model_response, + logging_obj=logging_obj, + model_id=model_id, + aws_region_name=aws_region_name, + credentials=credentials, + headers=headers, + litellm_params=litellm_params, + ) + return response + else: + data = sagemaker_config.transform_request( + model=model, + messages=messages, + optional_params=optional_params, + litellm_params=litellm_params, + headers=headers, + ) + prepared_request = self._prepare_request( + model=model, + data=data, + messages=messages, + optional_params=optional_params, + credentials=credentials, + aws_region_name=aws_region_name, + ) + if model_id is not None: + # Add model_id as InferenceComponentName header + # boto3 doc: https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_runtime_InvokeEndpoint.html + prepared_request.headers.update( + {"X-Amzn-SageMaker-Inference-Component": model_id} + ) + sync_handler = _get_httpx_client() + sync_response = sync_handler.post( + url=prepared_request.url, + headers=prepared_request.headers, # type: ignore + data=prepared_request.body, + stream=stream, + ) + + if sync_response.status_code != 200: + raise SagemakerError( + status_code=sync_response.status_code, + message=str(sync_response.read()), + ) + + decoder = AWSEventStreamDecoder(model="") + + completion_stream = decoder.iter_bytes( + sync_response.iter_bytes(chunk_size=1024) + ) + streaming_response = CustomStreamWrapper( + completion_stream=completion_stream, + model=model, + custom_llm_provider="sagemaker", + logging_obj=logging_obj, + ) + + ## LOGGING + logging_obj.post_call( + input=messages, + api_key="", + original_response=streaming_response, + additional_args={"complete_input_dict": data}, + ) + return streaming_response + + # Non-Streaming Requests + + # Async completion + if acompletion is True: + return self.async_completion( + messages=messages, + model=model, + custom_prompt_dict=custom_prompt_dict, + hf_model_name=hf_model_name, + model_response=model_response, + encoding=encoding, + logging_obj=logging_obj, + model_id=model_id, + optional_params=optional_params, + credentials=credentials, + aws_region_name=aws_region_name, + headers=headers, + litellm_params=litellm_params, + ) + + ## Non-Streaming completion CALL + _data = sagemaker_config.transform_request( + model=model, + messages=messages, + optional_params=optional_params, + litellm_params=litellm_params, + headers=headers, + ) + prepared_request_args = { + "model": model, + "data": _data, + "optional_params": optional_params, + "credentials": credentials, + "aws_region_name": aws_region_name, + "messages": messages, + } + prepared_request = self._prepare_request(**prepared_request_args) + try: + if model_id is not None: + # Add model_id as InferenceComponentName header + # boto3 doc: https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_runtime_InvokeEndpoint.html + prepared_request.headers.update( + {"X-Amzn-SageMaker-Inference-Component": model_id} + ) + + ## LOGGING + timeout = 300.0 + sync_handler = _get_httpx_client() + ## LOGGING + logging_obj.pre_call( + input=[], + api_key="", + additional_args={ + "complete_input_dict": _data, + "api_base": prepared_request.url, + "headers": prepared_request.headers, + }, + ) + + # make sync httpx post request here + try: + sync_response = sync_handler.post( + url=prepared_request.url, + headers=prepared_request.headers, # type: ignore + data=prepared_request.body, + timeout=timeout, + ) + + if sync_response.status_code != 200: + raise SagemakerError( + status_code=sync_response.status_code, + message=sync_response.text, + ) + except Exception as e: + ## LOGGING + logging_obj.post_call( + input=[], + api_key="", + original_response=str(e), + additional_args={"complete_input_dict": _data}, + ) + raise e + except Exception as e: + verbose_logger.error("Sagemaker error %s", str(e)) + status_code = ( + getattr(e, "response", {}) + .get("ResponseMetadata", {}) + .get("HTTPStatusCode", 500) + ) + error_message = ( + getattr(e, "response", {}).get("Error", {}).get("Message", str(e)) + ) + if "Inference Component Name header is required" in error_message: + error_message += "\n pass in via `litellm.completion(..., model_id={InferenceComponentName})`" + raise SagemakerError(status_code=status_code, message=error_message) + + return sagemaker_config.transform_response( + model=model, + raw_response=sync_response, + model_response=model_response, + logging_obj=logging_obj, + request_data=_data, + messages=messages, + optional_params=optional_params, + encoding=encoding, + litellm_params=litellm_params, + ) + + async def make_async_call( + self, + api_base: str, + headers: dict, + data: str, + logging_obj, + client=None, + ): + try: + if client is None: + client = get_async_httpx_client( + llm_provider=litellm.LlmProviders.SAGEMAKER + ) # Create a new client if none provided + response = await client.post( + api_base, + headers=headers, + data=data, + stream=True, + ) + + if response.status_code != 200: + raise SagemakerError( + status_code=response.status_code, message=response.text + ) + + decoder = AWSEventStreamDecoder(model="") + completion_stream = decoder.aiter_bytes( + response.aiter_bytes(chunk_size=1024) + ) + + return completion_stream + + # LOGGING + logging_obj.post_call( + input=[], + api_key="", + original_response="first stream response received", + additional_args={"complete_input_dict": data}, + ) + + except httpx.HTTPStatusError as err: + error_code = err.response.status_code + raise SagemakerError(status_code=error_code, message=err.response.text) + except httpx.TimeoutException: + raise SagemakerError(status_code=408, message="Timeout error occurred.") + except Exception as e: + raise SagemakerError(status_code=500, message=str(e)) + + async def async_streaming( + self, + messages: List[AllMessageValues], + model: str, + custom_prompt_dict: dict, + hf_model_name: Optional[str], + credentials, + aws_region_name: str, + optional_params, + encoding, + model_response: ModelResponse, + model_id: Optional[str], + logging_obj: Any, + litellm_params: dict, + headers: dict, + ): + data = await sagemaker_config.async_transform_request( + model=model, + messages=messages, + optional_params={**optional_params, "stream": True}, + litellm_params=litellm_params, + headers=headers, + ) + asyncified_prepare_request = asyncify(self._prepare_request) + prepared_request_args = { + "model": model, + "data": data, + "optional_params": optional_params, + "credentials": credentials, + "aws_region_name": aws_region_name, + "messages": messages, + } + prepared_request = await asyncified_prepare_request(**prepared_request_args) + if model_id is not None: # Fixes https://github.com/BerriAI/litellm/issues/8889 + prepared_request.headers.update( + {"X-Amzn-SageMaker-Inference-Component": model_id} + ) + completion_stream = await self.make_async_call( + api_base=prepared_request.url, + headers=prepared_request.headers, # type: ignore + data=prepared_request.body, + logging_obj=logging_obj, + ) + streaming_response = CustomStreamWrapper( + completion_stream=completion_stream, + model=model, + custom_llm_provider="sagemaker", + logging_obj=logging_obj, + ) + + # LOGGING + logging_obj.post_call( + input=[], + api_key="", + original_response="first stream response received", + additional_args={"complete_input_dict": data}, + ) + + return streaming_response + + async def async_completion( + self, + messages: List[AllMessageValues], + model: str, + custom_prompt_dict: dict, + hf_model_name: Optional[str], + credentials, + aws_region_name: str, + encoding, + model_response: ModelResponse, + optional_params: dict, + logging_obj: Any, + model_id: Optional[str], + headers: dict, + litellm_params: dict, + ): + timeout = 300.0 + async_handler = get_async_httpx_client( + llm_provider=litellm.LlmProviders.SAGEMAKER + ) + + data = await sagemaker_config.async_transform_request( + model=model, + messages=messages, + optional_params=optional_params, + litellm_params=litellm_params, + headers=headers, + ) + + asyncified_prepare_request = asyncify(self._prepare_request) + prepared_request_args = { + "model": model, + "data": data, + "optional_params": optional_params, + "credentials": credentials, + "aws_region_name": aws_region_name, + "messages": messages, + } + + prepared_request = await asyncified_prepare_request(**prepared_request_args) + ## LOGGING + logging_obj.pre_call( + input=[], + api_key="", + additional_args={ + "complete_input_dict": data, + "api_base": prepared_request.url, + "headers": prepared_request.headers, + }, + ) + try: + if model_id is not None: + # Add model_id as InferenceComponentName header + # boto3 doc: https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_runtime_InvokeEndpoint.html + prepared_request.headers.update( + {"X-Amzn-SageMaker-Inference-Component": model_id} + ) + # make async httpx post request here + try: + response = await async_handler.post( + url=prepared_request.url, + headers=prepared_request.headers, # type: ignore + data=prepared_request.body, + timeout=timeout, + ) + + if response.status_code != 200: + raise SagemakerError( + status_code=response.status_code, message=response.text + ) + except Exception as e: + ## LOGGING + logging_obj.post_call( + input=data["inputs"], + api_key="", + original_response=str(e), + additional_args={"complete_input_dict": data}, + ) + raise e + except Exception as e: + error_message = f"{str(e)}" + if "Inference Component Name header is required" in error_message: + error_message += "\n pass in via `litellm.completion(..., model_id={InferenceComponentName})`" + raise SagemakerError(status_code=500, message=error_message) + return sagemaker_config.transform_response( + model=model, + raw_response=response, + model_response=model_response, + logging_obj=logging_obj, + request_data=data, + messages=messages, + optional_params=optional_params, + encoding=encoding, + litellm_params=litellm_params, + ) + + def embedding( + self, + model: str, + input: list, + model_response: EmbeddingResponse, + print_verbose: Callable, + encoding, + logging_obj, + optional_params: dict, + custom_prompt_dict={}, + litellm_params=None, + logger_fn=None, + ): + """ + Supports Huggingface Jumpstart embeddings like GPT-6B + """ + ### BOTO3 INIT + import boto3 + + # pop aws_secret_access_key, aws_access_key_id, aws_region_name from kwargs, since completion calls fail with them + aws_secret_access_key = optional_params.pop("aws_secret_access_key", None) + aws_access_key_id = optional_params.pop("aws_access_key_id", None) + aws_region_name = optional_params.pop("aws_region_name", None) + + if aws_access_key_id is not None: + # uses auth params passed to completion + # aws_access_key_id is not None, assume user is trying to auth using litellm.completion + client = boto3.client( + service_name="sagemaker-runtime", + aws_access_key_id=aws_access_key_id, + aws_secret_access_key=aws_secret_access_key, + region_name=aws_region_name, + ) + else: + # aws_access_key_id is None, assume user is trying to auth using env variables + # boto3 automaticaly reads env variables + + # we need to read region name from env + # I assume majority of users use .env for auth + region_name = ( + get_secret("AWS_REGION_NAME") + or aws_region_name # get region from config file if specified + or "us-west-2" # default to us-west-2 if region not specified + ) + client = boto3.client( + service_name="sagemaker-runtime", + region_name=region_name, + ) + + # pop streaming if it's in the optional params as 'stream' raises an error with sagemaker + inference_params = deepcopy(optional_params) + inference_params.pop("stream", None) + + ## Load Config + config = litellm.SagemakerConfig.get_config() + for k, v in config.items(): + if ( + k not in inference_params + ): # completion(top_k=3) > sagemaker_config(top_k=3) <- allows for dynamic variables to be passed in + inference_params[k] = v + + #### HF EMBEDDING LOGIC + data = json.dumps({"text_inputs": input}).encode("utf-8") + + ## LOGGING + request_str = f""" + response = client.invoke_endpoint( + EndpointName={model}, + ContentType="application/json", + Body={data}, # type: ignore + CustomAttributes="accept_eula=true", + )""" # type: ignore + logging_obj.pre_call( + input=input, + api_key="", + additional_args={"complete_input_dict": data, "request_str": request_str}, + ) + ## EMBEDDING CALL + try: + response = client.invoke_endpoint( + EndpointName=model, + ContentType="application/json", + Body=data, + CustomAttributes="accept_eula=true", + ) + except Exception as e: + status_code = ( + getattr(e, "response", {}) + .get("ResponseMetadata", {}) + .get("HTTPStatusCode", 500) + ) + error_message = ( + getattr(e, "response", {}).get("Error", {}).get("Message", str(e)) + ) + raise SagemakerError(status_code=status_code, message=error_message) + + response = json.loads(response["Body"].read().decode("utf8")) + ## LOGGING + logging_obj.post_call( + input=input, + api_key="", + original_response=response, + additional_args={"complete_input_dict": data}, + ) + + print_verbose(f"raw model_response: {response}") + if "embedding" not in response: + raise SagemakerError( + status_code=500, message="embedding not found in response" + ) + embeddings = response["embedding"] + + if not isinstance(embeddings, list): + raise SagemakerError( + status_code=422, + message=f"Response not in expected format - {embeddings}", + ) + + output_data = [] + for idx, embedding in enumerate(embeddings): + output_data.append( + {"object": "embedding", "index": idx, "embedding": embedding} + ) + + model_response.object = "list" + model_response.data = output_data + model_response.model = model + + input_tokens = 0 + for text in input: + input_tokens += len(encoding.encode(text)) + + setattr( + model_response, + "usage", + Usage( + prompt_tokens=input_tokens, + completion_tokens=0, + total_tokens=input_tokens, + ), + ) + + return model_response |