diff options
author | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
---|---|---|
committer | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
commit | 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch) | |
tree | ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/litellm/batches | |
parent | cc961e04ba734dd72309fb548a2f97d67d578813 (diff) | |
download | gn-ai-master.tar.gz |
Diffstat (limited to '.venv/lib/python3.12/site-packages/litellm/batches')
-rw-r--r-- | .venv/lib/python3.12/site-packages/litellm/batches/batch_utils.py | 182 | ||||
-rw-r--r-- | .venv/lib/python3.12/site-packages/litellm/batches/main.py | 794 |
2 files changed, 976 insertions, 0 deletions
# =============================================================================
# File: .venv/lib/python3.12/site-packages/litellm/batches/batch_utils.py
# (new file in this commit)
# =============================================================================
import json
import logging
from typing import Any, List, Literal, Tuple

import litellm
from litellm._logging import verbose_logger
from litellm.types.llms.openai import Batch
from litellm.types.utils import CallTypes, Usage


async def _handle_completed_batch(
    batch: Batch,
    custom_llm_provider: Literal["openai", "azure", "vertex_ai"],
) -> Tuple[float, Usage, List[str]]:
    """
    Process a completed batch and compute its cost, usage and models.

    Args:
        batch: The completed batch; its ``output_file_id`` is used to fetch
            the output file.
        custom_llm_provider: Provider the batch was run against.

    Returns:
        A ``(batch_cost, batch_usage, batch_models)`` tuple.

    Raises:
        ValueError: If the provider is ``vertex_ai`` or the batch has no
            output file id (raised by the helpers called here).
    """
    # Get batch results
    file_content_dictionary = await _get_batch_output_file_content_as_dictionary(
        batch, custom_llm_provider
    )

    # Calculate costs and usage
    batch_cost = await _batch_cost_calculator(
        custom_llm_provider=custom_llm_provider,
        file_content_dictionary=file_content_dictionary,
    )
    batch_usage = _get_batch_job_total_usage_from_file_content(
        file_content_dictionary=file_content_dictionary,
        custom_llm_provider=custom_llm_provider,
    )

    batch_models = _get_batch_models_from_file_content(file_content_dictionary)

    return batch_cost, batch_usage, batch_models


def _get_batch_models_from_file_content(
    file_content_dictionary: List[dict],
) -> List[str]:
    """
    Collect the ``model`` field of every successful response in the file content.

    Entries whose response is not a 200, or whose body has no truthy
    ``model``, are skipped. Duplicates are preserved.
    """
    batch_models = []
    for _item in file_content_dictionary:
        if _batch_response_was_successful(_item):
            _response_body = _get_response_from_batch_job_output_file(_item)
            _model = _response_body.get("model")
            if _model:
                batch_models.append(_model)
    return batch_models


async def _batch_cost_calculator(
    file_content_dictionary: List[dict],
    custom_llm_provider: Literal["openai", "azure", "vertex_ai"] = "openai",
) -> float:
    """
    Calculate the cost of a batch from its parsed output file content.

    Raises:
        ValueError: If ``custom_llm_provider`` is ``vertex_ai``.
    """
    if custom_llm_provider == "vertex_ai":
        raise ValueError("Vertex AI does not support file content retrieval")
    total_cost = _get_batch_job_cost_from_file_content(
        file_content_dictionary=file_content_dictionary,
        custom_llm_provider=custom_llm_provider,
    )
    verbose_logger.debug("total_cost=%s", total_cost)
    return total_cost


async def _get_batch_output_file_content_as_dictionary(
    batch: Batch,
    custom_llm_provider: Literal["openai", "azure", "vertex_ai"] = "openai",
) -> List[dict]:
    """
    Fetch the batch output file and parse it into a list of dictionaries.

    Raises:
        ValueError: If the provider is ``vertex_ai`` or
            ``batch.output_file_id`` is ``None``.
    """
    # Imported lazily, as in the original (presumably to avoid an import
    # cycle with litellm.files - TODO confirm).
    from litellm.files.main import afile_content

    if custom_llm_provider == "vertex_ai":
        raise ValueError("Vertex AI does not support file content retrieval")

    if batch.output_file_id is None:
        raise ValueError("Output file id is None cannot retrieve file content")

    _file_content = await afile_content(
        file_id=batch.output_file_id,
        custom_llm_provider=custom_llm_provider,
    )
    return _get_file_content_as_dictionary(_file_content.content)


def _get_file_content_as_dictionary(file_content: bytes) -> List[dict]:
    """
    Parse UTF-8 encoded JSON Lines content into a list of dictionaries.

    Blank and whitespace-only lines are skipped. Decoding and JSON errors
    propagate to the caller unchanged.
    """
    json_objects = []
    # Split on "\n" only: str.splitlines() would also break on characters
    # such as U+2028 that may legally appear unescaped inside a JSON string.
    for line in file_content.decode("utf-8").split("\n"):
        line = line.strip()
        if line:  # Skip empty / whitespace-only lines (e.g. a stray "\r")
            json_objects.append(json.loads(line))
    # Serialising the whole file is expensive; only do it when it is logged.
    if verbose_logger.isEnabledFor(logging.DEBUG):
        verbose_logger.debug("json_objects=%s", json.dumps(json_objects, indent=4))
    return json_objects


def _get_batch_job_cost_from_file_content(
    file_content_dictionary: List[dict],
    custom_llm_provider: Literal["openai", "azure", "vertex_ai"] = "openai",
) -> float:
    """
    Sum ``litellm.completion_cost`` over every successful response in the file content.

    Any exception is logged and re-raised.
    """
    try:
        total_cost: float = 0.0
        if verbose_logger.isEnabledFor(logging.DEBUG):
            verbose_logger.debug(
                "file_content_dictionary=%s",
                json.dumps(file_content_dictionary, indent=4),
            )
        for _item in file_content_dictionary:
            if _batch_response_was_successful(_item):
                _response_body = _get_response_from_batch_job_output_file(_item)
                total_cost += litellm.completion_cost(
                    completion_response=_response_body,
                    custom_llm_provider=custom_llm_provider,
                    call_type=CallTypes.aretrieve_batch.value,
                )
                verbose_logger.debug("total_cost=%s", total_cost)
        return total_cost
    except Exception as e:
        # The original passed `e` without a placeholder, which makes the
        # logging module report a formatting error instead of the message.
        verbose_logger.error(
            "error in _get_batch_job_cost_from_file_content: %s", e
        )
        raise


def _get_batch_job_total_usage_from_file_content(
    file_content_dictionary: List[dict],
    custom_llm_provider: Literal["openai", "azure", "vertex_ai"] = "openai",
) -> Usage:
    """
    Sum the token usage of every successful response in the file content.

    ``custom_llm_provider`` is accepted for signature parity with the cost
    helper but is not used here.
    """
    total_tokens: int = 0
    prompt_tokens: int = 0
    completion_tokens: int = 0
    for _item in file_content_dictionary:
        if _batch_response_was_successful(_item):
            _response_body = _get_response_from_batch_job_output_file(_item)
            usage: Usage = _get_batch_job_usage_from_response_body(_response_body)
            total_tokens += usage.total_tokens
            prompt_tokens += usage.prompt_tokens
            completion_tokens += usage.completion_tokens
    return Usage(
        total_tokens=total_tokens,
        prompt_tokens=prompt_tokens,
        completion_tokens=completion_tokens,
    )


def _get_batch_job_usage_from_response_body(response_body: dict) -> Usage:
    """
    Build a ``Usage`` object from the ``usage`` entry of a response body.

    A missing or falsy ``usage`` entry yields ``Usage()`` built with no arguments.
    """
    _usage_dict = response_body.get("usage", None) or {}
    usage: Usage = Usage(**_usage_dict)
    return usage


def _get_response_from_batch_job_output_file(batch_job_output_file: dict) -> Any:
    """
    Return ``item["response"]["body"]`` from one output-file entry, or ``{}`` if absent.
    """
    _response: dict = batch_job_output_file.get("response", None) or {}
    _response_body = _response.get("body", None) or {}
    return _response_body


def _batch_response_was_successful(batch_job_output_file: dict) -> bool:
    """
    Check if the batch job response status == 200
    """
    _response: dict = batch_job_output_file.get("response", None) or {}
    return _response.get("status_code", None) == 200


# =============================================================================
# File: .venv/lib/python3.12/site-packages/litellm/batches/main.py
# (new file in this commit)
#
# Main File for Batches API implementation
#
# https://platform.openai.com/docs/api-reference/batch
#
# - create_batch()
# - retrieve_batch()
# - cancel_batch()
# - list_batch()
# =============================================================================
import asyncio
import contextvars
import os
from functools import partial
from typing import Any, Callable, Coroutine, Dict, Literal, Optional, Tuple, Union

import httpx

import litellm
from litellm.litellm_core_utils.litellm_logging import Logging as LiteLLMLoggingObj
from litellm.llms.azure.batches.handler import AzureBatchesAPI
from litellm.llms.openai.openai import OpenAIBatchesAPI
from litellm.llms.vertex_ai.batches.handler import VertexAIBatchPrediction
from litellm.secret_managers.main import get_secret_str
from litellm.types.llms.openai import (
    Batch,
    CancelBatchRequest,
    CreateBatchRequest,
    RetrieveBatchRequest,
)
from litellm.types.router import GenericLiteLLMParams
from litellm.types.utils import LiteLLMBatch
from litellm.utils import client, get_litellm_params, supports_httpx_timeout

####### ENVIRONMENT VARIABLES ###################
openai_batches_instance = OpenAIBatchesAPI()
azure_batches_instance = AzureBatchesAPI()
vertex_ai_batches_instance = VertexAIBatchPrediction(gcs_bucket_name="")
#################################################


async def _run_sync_in_executor(func: Callable[[], Any]) -> Any:
    """Run ``func`` in the default executor under a copy of the current context.

    If ``func`` returns a coroutine (the sync entry points do so when their
    async flag is set), it is awaited and its result returned.
    """
    loop = asyncio.get_running_loop()
    # Copy the context so context variables are visible in the worker thread.
    ctx = contextvars.copy_context()
    init_response = await loop.run_in_executor(None, partial(ctx.run, func))
    if asyncio.iscoroutine(init_response):
        return await init_response
    return init_response


def _resolve_timeout(
    optional_params: GenericLiteLLMParams,
    kwargs: Dict[str, Any],
    custom_llm_provider: str,
) -> Union[float, httpx.Timeout]:
    """Pick the request timeout, defaulting to 600 seconds (10 minutes).

    An ``httpx.Timeout`` is passed through when the provider supports it;
    otherwise its read timeout (or 600) is used. Any other value is
    converted with ``float``.
    """
    timeout = optional_params.timeout or kwargs.get("request_timeout", 600) or 600
    if isinstance(timeout, httpx.Timeout):
        if supports_httpx_timeout(custom_llm_provider) is False:
            return timeout.read or 600
        return timeout
    return float(timeout)  # type: ignore


def _get_openai_credentials(
    optional_params: GenericLiteLLMParams,
) -> Tuple[str, Optional[str], Optional[str]]:
    """Resolve ``(api_base, organization, api_key)`` for OpenAI.

    Each value is taken from the call params, then the ``litellm`` module
    globals, then the environment.
    """
    # for deepinfra/perplexity/anyscale/groq we check in get_llm_provider and pass in the api base from there
    api_base = (
        optional_params.api_base
        or litellm.api_base
        or os.getenv("OPENAI_API_BASE")
        or "https://api.openai.com/v1"
    )
    organization = (
        optional_params.organization
        or litellm.organization
        or os.getenv("OPENAI_ORGANIZATION", None)
        or None  # default - https://github.com/openai/openai-python/blob/284c1799070c723c6a553337134148a7ab088dd8/openai/util.py#L105
    )
    api_key = (
        optional_params.api_key
        or litellm.api_key  # for deepinfra/perplexity/anyscale we check in get_llm_provider and pass in the api key from there
        or litellm.openai_key
        or os.getenv("OPENAI_API_KEY")
    )
    return api_base, organization, api_key


def _get_azure_credentials(
    optional_params: GenericLiteLLMParams,
) -> Tuple[Optional[str], Optional[str], Optional[str]]:
    """Resolve ``(api_base, api_version, api_key)`` for Azure.

    Also removes ``azure_ad_token`` from the ``extra_body`` held by
    ``optional_params``, as the original inline code did.
    """
    api_base = (
        optional_params.api_base
        or litellm.api_base
        or get_secret_str("AZURE_API_BASE")
    )
    api_version = (
        optional_params.api_version
        or litellm.api_version
        or get_secret_str("AZURE_API_VERSION")
    )
    api_key = (
        optional_params.api_key
        or litellm.api_key
        or litellm.azure_key
        or get_secret_str("AZURE_OPENAI_API_KEY")
        or get_secret_str("AZURE_API_KEY")
    )

    extra_body = optional_params.get("extra_body", {})
    if extra_body is not None:
        extra_body.pop("azure_ad_token", None)
    else:
        # NOTE(review): the looked-up token is discarded, exactly as in the
        # original; kept so behaviour is unchanged - confirm whether it
        # should be forwarded to the Azure handler.
        get_secret_str("AZURE_AD_TOKEN")  # type: ignore
    return api_base, api_version, api_key


def _get_vertex_credentials(
    optional_params: GenericLiteLLMParams,
) -> Tuple[str, Optional[str], Optional[str], Optional[str]]:
    """Resolve ``(api_base, project, location, credentials)`` for Vertex AI."""
    api_base = optional_params.api_base or ""
    vertex_ai_project = (
        optional_params.vertex_project
        or litellm.vertex_project
        or get_secret_str("VERTEXAI_PROJECT")
    )
    vertex_ai_location = (
        optional_params.vertex_location
        or litellm.vertex_location
        or get_secret_str("VERTEXAI_LOCATION")
    )
    vertex_credentials = optional_params.vertex_credentials or get_secret_str(
        "VERTEXAI_CREDENTIALS"
    )
    return api_base, vertex_ai_project, vertex_ai_location, vertex_credentials


def _unsupported_provider_error(
    custom_llm_provider: str, operation: str, message: str
) -> Exception:
    """Build the 400 ``BadRequestError`` raised for an unsupported provider."""
    return litellm.exceptions.BadRequestError(
        message=message,
        model="n/a",
        llm_provider=custom_llm_provider,
        response=httpx.Response(
            status_code=400,
            content="Unsupported provider",
            request=httpx.Request(method=operation, url="https://github.com/BerriAI/litellm"),  # type: ignore
        ),
    )


@client
async def acreate_batch(
    completion_window: Literal["24h"],
    endpoint: Literal["/v1/chat/completions", "/v1/embeddings", "/v1/completions"],
    input_file_id: str,
    custom_llm_provider: Literal["openai", "azure", "vertex_ai"] = "openai",
    metadata: Optional[Dict[str, str]] = None,
    extra_headers: Optional[Dict[str, str]] = None,
    extra_body: Optional[Dict[str, str]] = None,
    **kwargs,
) -> Batch:
    """
    Async: Creates and executes a batch from an uploaded file of request

    LiteLLM Equivalent of POST: https://api.openai.com/v1/batches
    """
    # Tells create_batch to return the provider coroutine instead of blocking.
    kwargs["acreate_batch"] = True
    func = partial(
        create_batch,
        completion_window,
        endpoint,
        input_file_id,
        custom_llm_provider,
        metadata,
        extra_headers,
        extra_body,
        **kwargs,
    )
    return await _run_sync_in_executor(func)


@client
def create_batch(
    completion_window: Literal["24h"],
    endpoint: Literal["/v1/chat/completions", "/v1/embeddings", "/v1/completions"],
    input_file_id: str,
    custom_llm_provider: Literal["openai", "azure", "vertex_ai"] = "openai",
    metadata: Optional[Dict[str, str]] = None,
    extra_headers: Optional[Dict[str, str]] = None,
    extra_body: Optional[Dict[str, str]] = None,
    **kwargs,
) -> Union[LiteLLMBatch, Coroutine[Any, Any, LiteLLMBatch]]:
    """
    Creates and executes a batch from an uploaded file of request

    LiteLLM Equivalent of POST: https://api.openai.com/v1/batches

    Returns the provider response, or a coroutine producing it when called
    through ``acreate_batch``.

    Raises:
        litellm.exceptions.BadRequestError: If ``custom_llm_provider`` is
            not ``openai``, ``azure`` or ``vertex_ai``.
    """
    optional_params = GenericLiteLLMParams(**kwargs)
    litellm_call_id = kwargs.get("litellm_call_id", None)
    proxy_server_request = kwargs.get("proxy_server_request", None)
    model_info = kwargs.get("model_info", None)
    _is_async = kwargs.pop("acreate_batch", False) is True
    litellm_params = get_litellm_params(**kwargs)
    litellm_logging_obj: LiteLLMLoggingObj = kwargs.get("litellm_logging_obj", None)
    litellm_logging_obj.update_environment_variables(
        model=None,
        user=None,
        optional_params=optional_params.model_dump(),
        litellm_params={
            "litellm_call_id": litellm_call_id,
            "proxy_server_request": proxy_server_request,
            "model_info": model_info,
            "metadata": metadata,
            "preset_cache_key": None,
            "stream_response": {},
            **optional_params.model_dump(exclude_unset=True),
        },
        custom_llm_provider=custom_llm_provider,
    )

    ### TIMEOUT LOGIC ###
    timeout = _resolve_timeout(optional_params, kwargs, custom_llm_provider)

    _create_batch_request = CreateBatchRequest(
        completion_window=completion_window,
        endpoint=endpoint,
        input_file_id=input_file_id,
        metadata=metadata,
        extra_headers=extra_headers,
        extra_body=extra_body,
    )
    api_base: Optional[str] = None
    if custom_llm_provider == "openai":
        api_base, organization, api_key = _get_openai_credentials(optional_params)
        response = openai_batches_instance.create_batch(
            api_base=api_base,
            api_key=api_key,
            organization=organization,
            create_batch_data=_create_batch_request,
            timeout=timeout,
            max_retries=optional_params.max_retries,
            _is_async=_is_async,
        )
    elif custom_llm_provider == "azure":
        api_base, api_version, api_key = _get_azure_credentials(optional_params)
        response = azure_batches_instance.create_batch(
            _is_async=_is_async,
            api_base=api_base,
            api_key=api_key,
            api_version=api_version,
            timeout=timeout,
            max_retries=optional_params.max_retries,
            create_batch_data=_create_batch_request,
            litellm_params=litellm_params,
        )
    elif custom_llm_provider == "vertex_ai":
        (
            api_base,
            vertex_ai_project,
            vertex_ai_location,
            vertex_credentials,
        ) = _get_vertex_credentials(optional_params)
        response = vertex_ai_batches_instance.create_batch(
            _is_async=_is_async,
            api_base=api_base,
            vertex_project=vertex_ai_project,
            vertex_location=vertex_ai_location,
            vertex_credentials=vertex_credentials,
            timeout=timeout,
            max_retries=optional_params.max_retries,
            create_batch_data=_create_batch_request,
        )
    else:
        raise _unsupported_provider_error(
            custom_llm_provider,
            "create_batch",
            "LiteLLM doesn't support custom_llm_provider={} for 'create_batch'".format(
                custom_llm_provider
            ),
        )
    return response


@client
async def aretrieve_batch(
    batch_id: str,
    custom_llm_provider: Literal["openai", "azure", "vertex_ai"] = "openai",
    metadata: Optional[Dict[str, str]] = None,
    extra_headers: Optional[Dict[str, str]] = None,
    extra_body: Optional[Dict[str, str]] = None,
    **kwargs,
) -> LiteLLMBatch:
    """
    Async: Retrieves a batch.

    LiteLLM Equivalent of GET https://api.openai.com/v1/batches/{batch_id}
    """
    # Tells retrieve_batch to return the provider coroutine instead of blocking.
    kwargs["aretrieve_batch"] = True
    func = partial(
        retrieve_batch,
        batch_id,
        custom_llm_provider,
        metadata,
        extra_headers,
        extra_body,
        **kwargs,
    )
    return await _run_sync_in_executor(func)


@client
def retrieve_batch(
    batch_id: str,
    custom_llm_provider: Literal["openai", "azure", "vertex_ai"] = "openai",
    metadata: Optional[Dict[str, str]] = None,
    extra_headers: Optional[Dict[str, str]] = None,
    extra_body: Optional[Dict[str, str]] = None,
    **kwargs,
) -> Union[LiteLLMBatch, Coroutine[Any, Any, LiteLLMBatch]]:
    """
    Retrieves a batch.

    LiteLLM Equivalent of GET https://api.openai.com/v1/batches/{batch_id}

    Returns the provider response, or a coroutine producing it when called
    through ``aretrieve_batch``.

    Raises:
        litellm.exceptions.BadRequestError: If ``custom_llm_provider`` is
            not ``openai``, ``azure`` or ``vertex_ai``.
    """
    optional_params = GenericLiteLLMParams(**kwargs)
    litellm_logging_obj: LiteLLMLoggingObj = kwargs.get("litellm_logging_obj", None)
    # NOTE: kwargs still contains the "aretrieve_batch" flag here, as in the
    # original; it is only popped further down.
    litellm_params = get_litellm_params(
        custom_llm_provider=custom_llm_provider,
        **kwargs,
    )
    litellm_logging_obj.update_environment_variables(
        model=None,
        user=None,
        optional_params=optional_params.model_dump(),
        litellm_params=litellm_params,
        custom_llm_provider=custom_llm_provider,
    )

    ### TIMEOUT LOGIC ###
    timeout = _resolve_timeout(optional_params, kwargs, custom_llm_provider)

    _retrieve_batch_request = RetrieveBatchRequest(
        batch_id=batch_id,
        extra_headers=extra_headers,
        extra_body=extra_body,
    )

    _is_async = kwargs.pop("aretrieve_batch", False) is True
    api_base: Optional[str] = None
    if custom_llm_provider == "openai":
        api_base, organization, api_key = _get_openai_credentials(optional_params)
        response = openai_batches_instance.retrieve_batch(
            _is_async=_is_async,
            retrieve_batch_data=_retrieve_batch_request,
            api_base=api_base,
            api_key=api_key,
            organization=organization,
            timeout=timeout,
            max_retries=optional_params.max_retries,
        )
    elif custom_llm_provider == "azure":
        api_base, api_version, api_key = _get_azure_credentials(optional_params)
        response = azure_batches_instance.retrieve_batch(
            _is_async=_is_async,
            api_base=api_base,
            api_key=api_key,
            api_version=api_version,
            timeout=timeout,
            max_retries=optional_params.max_retries,
            retrieve_batch_data=_retrieve_batch_request,
            litellm_params=litellm_params,
        )
    elif custom_llm_provider == "vertex_ai":
        (
            api_base,
            vertex_ai_project,
            vertex_ai_location,
            vertex_credentials,
        ) = _get_vertex_credentials(optional_params)
        response = vertex_ai_batches_instance.retrieve_batch(
            _is_async=_is_async,
            batch_id=batch_id,
            api_base=api_base,
            vertex_project=vertex_ai_project,
            vertex_location=vertex_ai_location,
            vertex_credentials=vertex_credentials,
            timeout=timeout,
            max_retries=optional_params.max_retries,
        )
    else:
        # The original message named 'create_batch' and claimed only
        # 'openai' was supported; both were wrong for this function.
        raise _unsupported_provider_error(
            custom_llm_provider,
            "retrieve_batch",
            "LiteLLM doesn't support {} for 'retrieve_batch'. Only 'openai', 'azure' and 'vertex_ai' are supported.".format(
                custom_llm_provider
            ),
        )
    return response


async def alist_batches(
    after: Optional[str] = None,
    limit: Optional[int] = None,
    custom_llm_provider: Literal["openai", "azure"] = "openai",
    metadata: Optional[Dict[str, str]] = None,
    extra_headers: Optional[Dict[str, str]] = None,
    extra_body: Optional[Dict[str, str]] = None,
    **kwargs,
):
    """
    Async: List your organization's batches.

    ``metadata`` is accepted but not forwarded; ``list_batches`` has no such
    parameter.
    """
    # Tells list_batches to return the provider coroutine instead of blocking.
    kwargs["alist_batches"] = True
    func = partial(
        list_batches,
        after,
        limit,
        custom_llm_provider,
        extra_headers,
        extra_body,
        **kwargs,
    )
    return await _run_sync_in_executor(func)


def list_batches(
    after: Optional[str] = None,
    limit: Optional[int] = None,
    custom_llm_provider: Literal["openai", "azure"] = "openai",
    extra_headers: Optional[Dict[str, str]] = None,
    extra_body: Optional[Dict[str, str]] = None,
    **kwargs,
):
    """
    Lists batches

    List your organization's batches.

    Returns the provider response, or a coroutine producing it when called
    through ``alist_batches``.

    Raises:
        litellm.exceptions.BadRequestError: If ``custom_llm_provider`` is
            neither ``openai`` nor ``azure``.
    """
    optional_params = GenericLiteLLMParams(**kwargs)
    litellm_params = get_litellm_params(
        custom_llm_provider=custom_llm_provider,
        **kwargs,
    )

    ### TIMEOUT LOGIC ###
    # set timeout for 10 minutes by default
    timeout = _resolve_timeout(optional_params, kwargs, custom_llm_provider)

    _is_async = kwargs.pop("alist_batches", False) is True
    if custom_llm_provider == "openai":
        api_base, organization, api_key = _get_openai_credentials(optional_params)
        response = openai_batches_instance.list_batches(
            _is_async=_is_async,
            after=after,
            limit=limit,
            api_base=api_base,
            api_key=api_key,
            organization=organization,
            timeout=timeout,
            max_retries=optional_params.max_retries,
        )
    elif custom_llm_provider == "azure":
        api_base, api_version, api_key = _get_azure_credentials(optional_params)
        # NOTE(review): `after` / `limit` are not forwarded to the Azure
        # handler, matching the original - confirm whether that is intended.
        response = azure_batches_instance.list_batches(
            _is_async=_is_async,
            api_base=api_base,
            api_key=api_key,
            api_version=api_version,
            timeout=timeout,
            max_retries=optional_params.max_retries,
            litellm_params=litellm_params,
        )
    else:
        # The original message claimed only 'openai' was supported although
        # 'azure' is handled above.
        raise _unsupported_provider_error(
            custom_llm_provider,
            "list_batches",
            "LiteLLM doesn't support {} for 'list_batch'. Only 'openai' and 'azure' are supported.".format(
                custom_llm_provider
            ),
        )
    return response


async def acancel_batch(
    batch_id: str,
    custom_llm_provider: Literal["openai", "azure"] = "openai",
    metadata: Optional[Dict[str, str]] = None,
    extra_headers: Optional[Dict[str, str]] = None,
    extra_body: Optional[Dict[str, str]] = None,
    **kwargs,
) -> Batch:
    """
    Async: Cancels a batch.

    LiteLLM Equivalent of POST https://api.openai.com/v1/batches/{batch_id}/cancel
    """
    # Tells cancel_batch to return the provider coroutine instead of blocking.
    kwargs["acancel_batch"] = True
    func = partial(
        cancel_batch,
        batch_id,
        custom_llm_provider,
        metadata,
        extra_headers,
        extra_body,
        **kwargs,
    )
    return await _run_sync_in_executor(func)


def cancel_batch(
    batch_id: str,
    custom_llm_provider: Literal["openai", "azure"] = "openai",
    metadata: Optional[Dict[str, str]] = None,
    extra_headers: Optional[Dict[str, str]] = None,
    extra_body: Optional[Dict[str, str]] = None,
    **kwargs,
) -> Union[Batch, Coroutine[Any, Any, Batch]]:
    """
    Cancels a batch.

    LiteLLM Equivalent of POST https://api.openai.com/v1/batches/{batch_id}/cancel

    Returns the provider response, or a coroutine producing it when called
    through ``acancel_batch``.

    Raises:
        litellm.exceptions.BadRequestError: If ``custom_llm_provider`` is
            neither ``openai`` nor ``azure``.
    """
    optional_params = GenericLiteLLMParams(**kwargs)
    litellm_params = get_litellm_params(
        custom_llm_provider=custom_llm_provider,
        **kwargs,
    )

    ### TIMEOUT LOGIC ###
    # set timeout for 10 minutes by default
    timeout = _resolve_timeout(optional_params, kwargs, custom_llm_provider)

    _cancel_batch_request = CancelBatchRequest(
        batch_id=batch_id,
        extra_headers=extra_headers,
        extra_body=extra_body,
    )

    _is_async = kwargs.pop("acancel_batch", False) is True
    api_base: Optional[str] = None
    if custom_llm_provider == "openai":
        api_base, organization, api_key = _get_openai_credentials(optional_params)
        response = openai_batches_instance.cancel_batch(
            _is_async=_is_async,
            cancel_batch_data=_cancel_batch_request,
            api_base=api_base,
            api_key=api_key,
            organization=organization,
            timeout=timeout,
            max_retries=optional_params.max_retries,
        )
    elif custom_llm_provider == "azure":
        api_base, api_version, api_key = _get_azure_credentials(optional_params)
        response = azure_batches_instance.cancel_batch(
            _is_async=_is_async,
            api_base=api_base,
            api_key=api_key,
            api_version=api_version,
            timeout=timeout,
            max_retries=optional_params.max_retries,
            cancel_batch_data=_cancel_batch_request,
            litellm_params=litellm_params,
        )
    else:
        raise _unsupported_provider_error(
            custom_llm_provider,
            "cancel_batch",
            "LiteLLM doesn't support {} for 'cancel_batch'. Only 'openai' and 'azure' are supported.".format(
                custom_llm_provider
            ),
        )
    return response