about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/litellm/batches
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/litellm/batches
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/litellm/batches')
-rw-r--r--.venv/lib/python3.12/site-packages/litellm/batches/batch_utils.py182
-rw-r--r--.venv/lib/python3.12/site-packages/litellm/batches/main.py794
2 files changed, 976 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/litellm/batches/batch_utils.py b/.venv/lib/python3.12/site-packages/litellm/batches/batch_utils.py
new file mode 100644
index 00000000..af53304e
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/batches/batch_utils.py
@@ -0,0 +1,182 @@
+import json
+from typing import Any, List, Literal, Tuple
+
+import litellm
+from litellm._logging import verbose_logger
+from litellm.types.llms.openai import Batch
+from litellm.types.utils import CallTypes, Usage
+
+
+async def _handle_completed_batch(
+    batch: Batch,
+    custom_llm_provider: Literal["openai", "azure", "vertex_ai"],
+) -> Tuple[float, Usage, List[str]]:
+    """Helper function to process a completed batch and handle logging"""
+    # Get batch results
+    file_content_dictionary = await _get_batch_output_file_content_as_dictionary(
+        batch, custom_llm_provider
+    )
+
+    # Calculate costs and usage
+    batch_cost = await _batch_cost_calculator(
+        custom_llm_provider=custom_llm_provider,
+        file_content_dictionary=file_content_dictionary,
+    )
+    batch_usage = _get_batch_job_total_usage_from_file_content(
+        file_content_dictionary=file_content_dictionary,
+        custom_llm_provider=custom_llm_provider,
+    )
+
+    batch_models = _get_batch_models_from_file_content(file_content_dictionary)
+
+    return batch_cost, batch_usage, batch_models
+
+
+def _get_batch_models_from_file_content(
+    file_content_dictionary: List[dict],
+) -> List[str]:
+    """
+    Get the models from the file content
+    """
+    batch_models = []
+    for _item in file_content_dictionary:
+        if _batch_response_was_successful(_item):
+            _response_body = _get_response_from_batch_job_output_file(_item)
+            _model = _response_body.get("model")
+            if _model:
+                batch_models.append(_model)
+    return batch_models
+
+
+async def _batch_cost_calculator(
+    file_content_dictionary: List[dict],
+    custom_llm_provider: Literal["openai", "azure", "vertex_ai"] = "openai",
+) -> float:
+    """
+    Calculate the cost of a batch based on the output file id
+    """
+    if custom_llm_provider == "vertex_ai":
+        raise ValueError("Vertex AI does not support file content retrieval")
+    total_cost = _get_batch_job_cost_from_file_content(
+        file_content_dictionary=file_content_dictionary,
+        custom_llm_provider=custom_llm_provider,
+    )
+    verbose_logger.debug("total_cost=%s", total_cost)
+    return total_cost
+
+
+async def _get_batch_output_file_content_as_dictionary(
+    batch: Batch,
+    custom_llm_provider: Literal["openai", "azure", "vertex_ai"] = "openai",
+) -> List[dict]:
+    """
+    Get the batch output file content as a list of dictionaries
+    """
+    from litellm.files.main import afile_content
+
+    if custom_llm_provider == "vertex_ai":
+        raise ValueError("Vertex AI does not support file content retrieval")
+
+    if batch.output_file_id is None:
+        raise ValueError("Output file id is None cannot retrieve file content")
+
+    _file_content = await afile_content(
+        file_id=batch.output_file_id,
+        custom_llm_provider=custom_llm_provider,
+    )
+    return _get_file_content_as_dictionary(_file_content.content)
+
+
+def _get_file_content_as_dictionary(file_content: bytes) -> List[dict]:
+    """
+    Get the file content as a list of dictionaries from JSON Lines format
+    """
+    try:
+        _file_content_str = file_content.decode("utf-8")
+        # Split by newlines and parse each line as a separate JSON object
+        json_objects = []
+        for line in _file_content_str.strip().split("\n"):
+            if line:  # Skip empty lines
+                json_objects.append(json.loads(line))
+        verbose_logger.debug("json_objects=%s", json.dumps(json_objects, indent=4))
+        return json_objects
+    except Exception as e:
+        raise e
+
+
+def _get_batch_job_cost_from_file_content(
+    file_content_dictionary: List[dict],
+    custom_llm_provider: Literal["openai", "azure", "vertex_ai"] = "openai",
+) -> float:
+    """
+    Get the cost of a batch job from the file content
+    """
+    try:
+        total_cost: float = 0.0
+        # parse the file content as json
+        verbose_logger.debug(
+            "file_content_dictionary=%s", json.dumps(file_content_dictionary, indent=4)
+        )
+        for _item in file_content_dictionary:
+            if _batch_response_was_successful(_item):
+                _response_body = _get_response_from_batch_job_output_file(_item)
+                total_cost += litellm.completion_cost(
+                    completion_response=_response_body,
+                    custom_llm_provider=custom_llm_provider,
+                    call_type=CallTypes.aretrieve_batch.value,
+                )
+                verbose_logger.debug("total_cost=%s", total_cost)
+        return total_cost
+    except Exception as e:
+        verbose_logger.error("error in _get_batch_job_cost_from_file_content", e)
+        raise e
+
+
+def _get_batch_job_total_usage_from_file_content(
+    file_content_dictionary: List[dict],
+    custom_llm_provider: Literal["openai", "azure", "vertex_ai"] = "openai",
+) -> Usage:
+    """
+    Get the tokens of a batch job from the file content
+    """
+    total_tokens: int = 0
+    prompt_tokens: int = 0
+    completion_tokens: int = 0
+    for _item in file_content_dictionary:
+        if _batch_response_was_successful(_item):
+            _response_body = _get_response_from_batch_job_output_file(_item)
+            usage: Usage = _get_batch_job_usage_from_response_body(_response_body)
+            total_tokens += usage.total_tokens
+            prompt_tokens += usage.prompt_tokens
+            completion_tokens += usage.completion_tokens
+    return Usage(
+        total_tokens=total_tokens,
+        prompt_tokens=prompt_tokens,
+        completion_tokens=completion_tokens,
+    )
+
+
+def _get_batch_job_usage_from_response_body(response_body: dict) -> Usage:
+    """
+    Get the tokens of a batch job from the response body
+    """
+    _usage_dict = response_body.get("usage", None) or {}
+    usage: Usage = Usage(**_usage_dict)
+    return usage
+
+
+def _get_response_from_batch_job_output_file(batch_job_output_file: dict) -> Any:
+    """
+    Get the response from the batch job output file
+    """
+    _response: dict = batch_job_output_file.get("response", None) or {}
+    _response_body = _response.get("body", None) or {}
+    return _response_body
+
+
+def _batch_response_was_successful(batch_job_output_file: dict) -> bool:
+    """
+    Check if the batch job response status == 200
+    """
+    _response: dict = batch_job_output_file.get("response", None) or {}
+    return _response.get("status_code", None) == 200
diff --git a/.venv/lib/python3.12/site-packages/litellm/batches/main.py b/.venv/lib/python3.12/site-packages/litellm/batches/main.py
new file mode 100644
index 00000000..1ddcafce
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/batches/main.py
@@ -0,0 +1,794 @@
+"""
+Main File for Batches API implementation
+
+https://platform.openai.com/docs/api-reference/batch
+
+- create_batch()
+- retrieve_batch()
+- cancel_batch()
+- list_batch()
+
+"""
+
+import asyncio
+import contextvars
+import os
+from functools import partial
+from typing import Any, Coroutine, Dict, Literal, Optional, Union
+
+import httpx
+
+import litellm
+from litellm.litellm_core_utils.litellm_logging import Logging as LiteLLMLoggingObj
+from litellm.llms.azure.batches.handler import AzureBatchesAPI
+from litellm.llms.openai.openai import OpenAIBatchesAPI
+from litellm.llms.vertex_ai.batches.handler import VertexAIBatchPrediction
+from litellm.secret_managers.main import get_secret_str
+from litellm.types.llms.openai import (
+    Batch,
+    CancelBatchRequest,
+    CreateBatchRequest,
+    RetrieveBatchRequest,
+)
+from litellm.types.router import GenericLiteLLMParams
+from litellm.types.utils import LiteLLMBatch
+from litellm.utils import client, get_litellm_params, supports_httpx_timeout
+
+####### ENVIRONMENT VARIABLES ###################
+openai_batches_instance = OpenAIBatchesAPI()
+azure_batches_instance = AzureBatchesAPI()
+vertex_ai_batches_instance = VertexAIBatchPrediction(gcs_bucket_name="")
+#################################################
+
+
+@client
+async def acreate_batch(
+    completion_window: Literal["24h"],
+    endpoint: Literal["/v1/chat/completions", "/v1/embeddings", "/v1/completions"],
+    input_file_id: str,
+    custom_llm_provider: Literal["openai", "azure", "vertex_ai"] = "openai",
+    metadata: Optional[Dict[str, str]] = None,
+    extra_headers: Optional[Dict[str, str]] = None,
+    extra_body: Optional[Dict[str, str]] = None,
+    **kwargs,
+) -> Batch:
+    """
+    Async: Creates and executes a batch from an uploaded file of request
+
+    LiteLLM Equivalent of POST: https://api.openai.com/v1/batches
+    """
+    try:
+        loop = asyncio.get_event_loop()
+        kwargs["acreate_batch"] = True
+
+        # Use a partial function to pass your keyword arguments
+        func = partial(
+            create_batch,
+            completion_window,
+            endpoint,
+            input_file_id,
+            custom_llm_provider,
+            metadata,
+            extra_headers,
+            extra_body,
+            **kwargs,
+        )
+
+        # Add the context to the function
+        ctx = contextvars.copy_context()
+        func_with_context = partial(ctx.run, func)
+        init_response = await loop.run_in_executor(None, func_with_context)
+
+        if asyncio.iscoroutine(init_response):
+            response = await init_response
+        else:
+            response = init_response
+
+        return response
+    except Exception as e:
+        raise e
+
+
+@client
+def create_batch(
+    completion_window: Literal["24h"],
+    endpoint: Literal["/v1/chat/completions", "/v1/embeddings", "/v1/completions"],
+    input_file_id: str,
+    custom_llm_provider: Literal["openai", "azure", "vertex_ai"] = "openai",
+    metadata: Optional[Dict[str, str]] = None,
+    extra_headers: Optional[Dict[str, str]] = None,
+    extra_body: Optional[Dict[str, str]] = None,
+    **kwargs,
+) -> Union[LiteLLMBatch, Coroutine[Any, Any, LiteLLMBatch]]:
+    """
+    Creates and executes a batch from an uploaded file of request
+
+    LiteLLM Equivalent of POST: https://api.openai.com/v1/batches
+    """
+    try:
+        optional_params = GenericLiteLLMParams(**kwargs)
+        litellm_call_id = kwargs.get("litellm_call_id", None)
+        proxy_server_request = kwargs.get("proxy_server_request", None)
+        model_info = kwargs.get("model_info", None)
+        _is_async = kwargs.pop("acreate_batch", False) is True
+        litellm_params = get_litellm_params(**kwargs)
+        litellm_logging_obj: LiteLLMLoggingObj = kwargs.get("litellm_logging_obj", None)
+        ### TIMEOUT LOGIC ###
+        timeout = optional_params.timeout or kwargs.get("request_timeout", 600) or 600
+        litellm_logging_obj.update_environment_variables(
+            model=None,
+            user=None,
+            optional_params=optional_params.model_dump(),
+            litellm_params={
+                "litellm_call_id": litellm_call_id,
+                "proxy_server_request": proxy_server_request,
+                "model_info": model_info,
+                "metadata": metadata,
+                "preset_cache_key": None,
+                "stream_response": {},
+                **optional_params.model_dump(exclude_unset=True),
+            },
+            custom_llm_provider=custom_llm_provider,
+        )
+
+        if (
+            timeout is not None
+            and isinstance(timeout, httpx.Timeout)
+            and supports_httpx_timeout(custom_llm_provider) is False
+        ):
+            read_timeout = timeout.read or 600
+            timeout = read_timeout  # default 10 min timeout
+        elif timeout is not None and not isinstance(timeout, httpx.Timeout):
+            timeout = float(timeout)  # type: ignore
+        elif timeout is None:
+            timeout = 600.0
+
+        _create_batch_request = CreateBatchRequest(
+            completion_window=completion_window,
+            endpoint=endpoint,
+            input_file_id=input_file_id,
+            metadata=metadata,
+            extra_headers=extra_headers,
+            extra_body=extra_body,
+        )
+        api_base: Optional[str] = None
+        if custom_llm_provider == "openai":
+
+            # for deepinfra/perplexity/anyscale/groq we check in get_llm_provider and pass in the api base from there
+            api_base = (
+                optional_params.api_base
+                or litellm.api_base
+                or os.getenv("OPENAI_API_BASE")
+                or "https://api.openai.com/v1"
+            )
+            organization = (
+                optional_params.organization
+                or litellm.organization
+                or os.getenv("OPENAI_ORGANIZATION", None)
+                or None  # default - https://github.com/openai/openai-python/blob/284c1799070c723c6a553337134148a7ab088dd8/openai/util.py#L105
+            )
+            # set API KEY
+            api_key = (
+                optional_params.api_key
+                or litellm.api_key  # for deepinfra/perplexity/anyscale we check in get_llm_provider and pass in the api key from there
+                or litellm.openai_key
+                or os.getenv("OPENAI_API_KEY")
+            )
+
+            response = openai_batches_instance.create_batch(
+                api_base=api_base,
+                api_key=api_key,
+                organization=organization,
+                create_batch_data=_create_batch_request,
+                timeout=timeout,
+                max_retries=optional_params.max_retries,
+                _is_async=_is_async,
+            )
+        elif custom_llm_provider == "azure":
+            api_base = (
+                optional_params.api_base
+                or litellm.api_base
+                or get_secret_str("AZURE_API_BASE")
+            )
+            api_version = (
+                optional_params.api_version
+                or litellm.api_version
+                or get_secret_str("AZURE_API_VERSION")
+            )
+
+            api_key = (
+                optional_params.api_key
+                or litellm.api_key
+                or litellm.azure_key
+                or get_secret_str("AZURE_OPENAI_API_KEY")
+                or get_secret_str("AZURE_API_KEY")
+            )
+
+            extra_body = optional_params.get("extra_body", {})
+            if extra_body is not None:
+                extra_body.pop("azure_ad_token", None)
+            else:
+                get_secret_str("AZURE_AD_TOKEN")  # type: ignore
+
+            response = azure_batches_instance.create_batch(
+                _is_async=_is_async,
+                api_base=api_base,
+                api_key=api_key,
+                api_version=api_version,
+                timeout=timeout,
+                max_retries=optional_params.max_retries,
+                create_batch_data=_create_batch_request,
+                litellm_params=litellm_params,
+            )
+        elif custom_llm_provider == "vertex_ai":
+            api_base = optional_params.api_base or ""
+            vertex_ai_project = (
+                optional_params.vertex_project
+                or litellm.vertex_project
+                or get_secret_str("VERTEXAI_PROJECT")
+            )
+            vertex_ai_location = (
+                optional_params.vertex_location
+                or litellm.vertex_location
+                or get_secret_str("VERTEXAI_LOCATION")
+            )
+            vertex_credentials = optional_params.vertex_credentials or get_secret_str(
+                "VERTEXAI_CREDENTIALS"
+            )
+
+            response = vertex_ai_batches_instance.create_batch(
+                _is_async=_is_async,
+                api_base=api_base,
+                vertex_project=vertex_ai_project,
+                vertex_location=vertex_ai_location,
+                vertex_credentials=vertex_credentials,
+                timeout=timeout,
+                max_retries=optional_params.max_retries,
+                create_batch_data=_create_batch_request,
+            )
+        else:
+            raise litellm.exceptions.BadRequestError(
+                message="LiteLLM doesn't support custom_llm_provider={} for 'create_batch'".format(
+                    custom_llm_provider
+                ),
+                model="n/a",
+                llm_provider=custom_llm_provider,
+                response=httpx.Response(
+                    status_code=400,
+                    content="Unsupported provider",
+                    request=httpx.Request(method="create_batch", url="https://github.com/BerriAI/litellm"),  # type: ignore
+                ),
+            )
+        return response
+    except Exception as e:
+        raise e
+
+
+@client
+async def aretrieve_batch(
+    batch_id: str,
+    custom_llm_provider: Literal["openai", "azure", "vertex_ai"] = "openai",
+    metadata: Optional[Dict[str, str]] = None,
+    extra_headers: Optional[Dict[str, str]] = None,
+    extra_body: Optional[Dict[str, str]] = None,
+    **kwargs,
+) -> LiteLLMBatch:
+    """
+    Async: Retrieves a batch.
+
+    LiteLLM Equivalent of GET https://api.openai.com/v1/batches/{batch_id}
+    """
+    try:
+        loop = asyncio.get_event_loop()
+        kwargs["aretrieve_batch"] = True
+
+        # Use a partial function to pass your keyword arguments
+        func = partial(
+            retrieve_batch,
+            batch_id,
+            custom_llm_provider,
+            metadata,
+            extra_headers,
+            extra_body,
+            **kwargs,
+        )
+        # Add the context to the function
+        ctx = contextvars.copy_context()
+        func_with_context = partial(ctx.run, func)
+        init_response = await loop.run_in_executor(None, func_with_context)
+        if asyncio.iscoroutine(init_response):
+            response = await init_response
+        else:
+            response = init_response  # type: ignore
+
+        return response
+    except Exception as e:
+        raise e
+
+
+@client
+def retrieve_batch(
+    batch_id: str,
+    custom_llm_provider: Literal["openai", "azure", "vertex_ai"] = "openai",
+    metadata: Optional[Dict[str, str]] = None,
+    extra_headers: Optional[Dict[str, str]] = None,
+    extra_body: Optional[Dict[str, str]] = None,
+    **kwargs,
+) -> Union[LiteLLMBatch, Coroutine[Any, Any, LiteLLMBatch]]:
+    """
+    Retrieves a batch.
+
+    LiteLLM Equivalent of GET https://api.openai.com/v1/batches/{batch_id}
+    """
+    try:
+        optional_params = GenericLiteLLMParams(**kwargs)
+        litellm_logging_obj: LiteLLMLoggingObj = kwargs.get("litellm_logging_obj", None)
+        ### TIMEOUT LOGIC ###
+        timeout = optional_params.timeout or kwargs.get("request_timeout", 600) or 600
+        litellm_params = get_litellm_params(
+            custom_llm_provider=custom_llm_provider,
+            **kwargs,
+        )
+        litellm_logging_obj.update_environment_variables(
+            model=None,
+            user=None,
+            optional_params=optional_params.model_dump(),
+            litellm_params=litellm_params,
+            custom_llm_provider=custom_llm_provider,
+        )
+
+        if (
+            timeout is not None
+            and isinstance(timeout, httpx.Timeout)
+            and supports_httpx_timeout(custom_llm_provider) is False
+        ):
+            read_timeout = timeout.read or 600
+            timeout = read_timeout  # default 10 min timeout
+        elif timeout is not None and not isinstance(timeout, httpx.Timeout):
+            timeout = float(timeout)  # type: ignore
+        elif timeout is None:
+            timeout = 600.0
+
+        _retrieve_batch_request = RetrieveBatchRequest(
+            batch_id=batch_id,
+            extra_headers=extra_headers,
+            extra_body=extra_body,
+        )
+
+        _is_async = kwargs.pop("aretrieve_batch", False) is True
+        api_base: Optional[str] = None
+        if custom_llm_provider == "openai":
+
+            # for deepinfra/perplexity/anyscale/groq we check in get_llm_provider and pass in the api base from there
+            api_base = (
+                optional_params.api_base
+                or litellm.api_base
+                or os.getenv("OPENAI_API_BASE")
+                or "https://api.openai.com/v1"
+            )
+            organization = (
+                optional_params.organization
+                or litellm.organization
+                or os.getenv("OPENAI_ORGANIZATION", None)
+                or None  # default - https://github.com/openai/openai-python/blob/284c1799070c723c6a553337134148a7ab088dd8/openai/util.py#L105
+            )
+            # set API KEY
+            api_key = (
+                optional_params.api_key
+                or litellm.api_key  # for deepinfra/perplexity/anyscale we check in get_llm_provider and pass in the api key from there
+                or litellm.openai_key
+                or os.getenv("OPENAI_API_KEY")
+            )
+
+            response = openai_batches_instance.retrieve_batch(
+                _is_async=_is_async,
+                retrieve_batch_data=_retrieve_batch_request,
+                api_base=api_base,
+                api_key=api_key,
+                organization=organization,
+                timeout=timeout,
+                max_retries=optional_params.max_retries,
+            )
+        elif custom_llm_provider == "azure":
+            api_base = (
+                optional_params.api_base
+                or litellm.api_base
+                or get_secret_str("AZURE_API_BASE")
+            )
+            api_version = (
+                optional_params.api_version
+                or litellm.api_version
+                or get_secret_str("AZURE_API_VERSION")
+            )
+
+            api_key = (
+                optional_params.api_key
+                or litellm.api_key
+                or litellm.azure_key
+                or get_secret_str("AZURE_OPENAI_API_KEY")
+                or get_secret_str("AZURE_API_KEY")
+            )
+
+            extra_body = optional_params.get("extra_body", {})
+            if extra_body is not None:
+                extra_body.pop("azure_ad_token", None)
+            else:
+                get_secret_str("AZURE_AD_TOKEN")  # type: ignore
+
+            response = azure_batches_instance.retrieve_batch(
+                _is_async=_is_async,
+                api_base=api_base,
+                api_key=api_key,
+                api_version=api_version,
+                timeout=timeout,
+                max_retries=optional_params.max_retries,
+                retrieve_batch_data=_retrieve_batch_request,
+                litellm_params=litellm_params,
+            )
+        elif custom_llm_provider == "vertex_ai":
+            api_base = optional_params.api_base or ""
+            vertex_ai_project = (
+                optional_params.vertex_project
+                or litellm.vertex_project
+                or get_secret_str("VERTEXAI_PROJECT")
+            )
+            vertex_ai_location = (
+                optional_params.vertex_location
+                or litellm.vertex_location
+                or get_secret_str("VERTEXAI_LOCATION")
+            )
+            vertex_credentials = optional_params.vertex_credentials or get_secret_str(
+                "VERTEXAI_CREDENTIALS"
+            )
+
+            response = vertex_ai_batches_instance.retrieve_batch(
+                _is_async=_is_async,
+                batch_id=batch_id,
+                api_base=api_base,
+                vertex_project=vertex_ai_project,
+                vertex_location=vertex_ai_location,
+                vertex_credentials=vertex_credentials,
+                timeout=timeout,
+                max_retries=optional_params.max_retries,
+            )
+        else:
+            raise litellm.exceptions.BadRequestError(
+                message="LiteLLM doesn't support {} for 'create_batch'. Only 'openai' is supported.".format(
+                    custom_llm_provider
+                ),
+                model="n/a",
+                llm_provider=custom_llm_provider,
+                response=httpx.Response(
+                    status_code=400,
+                    content="Unsupported provider",
+                    request=httpx.Request(method="create_thread", url="https://github.com/BerriAI/litellm"),  # type: ignore
+                ),
+            )
+        return response
+    except Exception as e:
+        raise e
+
+
+async def alist_batches(
+    after: Optional[str] = None,
+    limit: Optional[int] = None,
+    custom_llm_provider: Literal["openai", "azure"] = "openai",
+    metadata: Optional[Dict[str, str]] = None,
+    extra_headers: Optional[Dict[str, str]] = None,
+    extra_body: Optional[Dict[str, str]] = None,
+    **kwargs,
+):
+    """
+    Async: List your organization's batches.
+    """
+    try:
+        loop = asyncio.get_event_loop()
+        kwargs["alist_batches"] = True
+
+        # Use a partial function to pass your keyword arguments
+        func = partial(
+            list_batches,
+            after,
+            limit,
+            custom_llm_provider,
+            extra_headers,
+            extra_body,
+            **kwargs,
+        )
+
+        # Add the context to the function
+        ctx = contextvars.copy_context()
+        func_with_context = partial(ctx.run, func)
+        init_response = await loop.run_in_executor(None, func_with_context)
+        if asyncio.iscoroutine(init_response):
+            response = await init_response
+        else:
+            response = init_response  # type: ignore
+
+        return response
+    except Exception as e:
+        raise e
+
+
+def list_batches(
+    after: Optional[str] = None,
+    limit: Optional[int] = None,
+    custom_llm_provider: Literal["openai", "azure"] = "openai",
+    extra_headers: Optional[Dict[str, str]] = None,
+    extra_body: Optional[Dict[str, str]] = None,
+    **kwargs,
+):
+    """
+    Lists batches
+
+    List your organization's batches.
+    """
+    try:
+        # set API KEY
+        optional_params = GenericLiteLLMParams(**kwargs)
+        litellm_params = get_litellm_params(
+            custom_llm_provider=custom_llm_provider,
+            **kwargs,
+        )
+        api_key = (
+            optional_params.api_key
+            or litellm.api_key  # for deepinfra/perplexity/anyscale we check in get_llm_provider and pass in the api key from there
+            or litellm.openai_key
+            or os.getenv("OPENAI_API_KEY")
+        )
+        ### TIMEOUT LOGIC ###
+        timeout = optional_params.timeout or kwargs.get("request_timeout", 600) or 600
+        # set timeout for 10 minutes by default
+
+        if (
+            timeout is not None
+            and isinstance(timeout, httpx.Timeout)
+            and supports_httpx_timeout(custom_llm_provider) is False
+        ):
+            read_timeout = timeout.read or 600
+            timeout = read_timeout  # default 10 min timeout
+        elif timeout is not None and not isinstance(timeout, httpx.Timeout):
+            timeout = float(timeout)  # type: ignore
+        elif timeout is None:
+            timeout = 600.0
+
+        _is_async = kwargs.pop("alist_batches", False) is True
+        if custom_llm_provider == "openai":
+            # for deepinfra/perplexity/anyscale/groq we check in get_llm_provider and pass in the api base from there
+            api_base = (
+                optional_params.api_base
+                or litellm.api_base
+                or os.getenv("OPENAI_API_BASE")
+                or "https://api.openai.com/v1"
+            )
+            organization = (
+                optional_params.organization
+                or litellm.organization
+                or os.getenv("OPENAI_ORGANIZATION", None)
+                or None  # default - https://github.com/openai/openai-python/blob/284c1799070c723c6a553337134148a7ab088dd8/openai/util.py#L105
+            )
+
+            response = openai_batches_instance.list_batches(
+                _is_async=_is_async,
+                after=after,
+                limit=limit,
+                api_base=api_base,
+                api_key=api_key,
+                organization=organization,
+                timeout=timeout,
+                max_retries=optional_params.max_retries,
+            )
+        elif custom_llm_provider == "azure":
+            api_base = optional_params.api_base or litellm.api_base or get_secret_str("AZURE_API_BASE")  # type: ignore
+            api_version = (
+                optional_params.api_version
+                or litellm.api_version
+                or get_secret_str("AZURE_API_VERSION")
+            )
+
+            api_key = (
+                optional_params.api_key
+                or litellm.api_key
+                or litellm.azure_key
+                or get_secret_str("AZURE_OPENAI_API_KEY")
+                or get_secret_str("AZURE_API_KEY")
+            )
+
+            extra_body = optional_params.get("extra_body", {})
+            if extra_body is not None:
+                extra_body.pop("azure_ad_token", None)
+            else:
+                get_secret_str("AZURE_AD_TOKEN")  # type: ignore
+
+            response = azure_batches_instance.list_batches(
+                _is_async=_is_async,
+                api_base=api_base,
+                api_key=api_key,
+                api_version=api_version,
+                timeout=timeout,
+                max_retries=optional_params.max_retries,
+                litellm_params=litellm_params,
+            )
+        else:
+            raise litellm.exceptions.BadRequestError(
+                message="LiteLLM doesn't support {} for 'list_batch'. Only 'openai' is supported.".format(
+                    custom_llm_provider
+                ),
+                model="n/a",
+                llm_provider=custom_llm_provider,
+                response=httpx.Response(
+                    status_code=400,
+                    content="Unsupported provider",
+                    request=httpx.Request(method="create_thread", url="https://github.com/BerriAI/litellm"),  # type: ignore
+                ),
+            )
+        return response
+    except Exception as e:
+        raise e
+
+
+async def acancel_batch(
+    batch_id: str,
+    custom_llm_provider: Literal["openai", "azure"] = "openai",
+    metadata: Optional[Dict[str, str]] = None,
+    extra_headers: Optional[Dict[str, str]] = None,
+    extra_body: Optional[Dict[str, str]] = None,
+    **kwargs,
+) -> Batch:
+    """
+    Async: Cancels a batch.
+
+    LiteLLM Equivalent of POST https://api.openai.com/v1/batches/{batch_id}/cancel
+    """
+    try:
+        loop = asyncio.get_event_loop()
+        kwargs["acancel_batch"] = True
+
+        # Use a partial function to pass your keyword arguments
+        func = partial(
+            cancel_batch,
+            batch_id,
+            custom_llm_provider,
+            metadata,
+            extra_headers,
+            extra_body,
+            **kwargs,
+        )
+        # Add the context to the function
+        ctx = contextvars.copy_context()
+        func_with_context = partial(ctx.run, func)
+        init_response = await loop.run_in_executor(None, func_with_context)
+        if asyncio.iscoroutine(init_response):
+            response = await init_response
+        else:
+            response = init_response
+
+        return response
+    except Exception as e:
+        raise e
+
+
+def cancel_batch(
+    batch_id: str,
+    custom_llm_provider: Literal["openai", "azure"] = "openai",
+    metadata: Optional[Dict[str, str]] = None,
+    extra_headers: Optional[Dict[str, str]] = None,
+    extra_body: Optional[Dict[str, str]] = None,
+    **kwargs,
+) -> Union[Batch, Coroutine[Any, Any, Batch]]:
+    """
+    Cancels a batch.
+
+    LiteLLM Equivalent of POST https://api.openai.com/v1/batches/{batch_id}/cancel
+    """
+    try:
+        optional_params = GenericLiteLLMParams(**kwargs)
+        litellm_params = get_litellm_params(
+            custom_llm_provider=custom_llm_provider,
+            **kwargs,
+        )
+        ### TIMEOUT LOGIC ###
+        timeout = optional_params.timeout or kwargs.get("request_timeout", 600) or 600
+        # set timeout for 10 minutes by default
+
+        if (
+            timeout is not None
+            and isinstance(timeout, httpx.Timeout)
+            and supports_httpx_timeout(custom_llm_provider) is False
+        ):
+            read_timeout = timeout.read or 600
+            timeout = read_timeout  # default 10 min timeout
+        elif timeout is not None and not isinstance(timeout, httpx.Timeout):
+            timeout = float(timeout)  # type: ignore
+        elif timeout is None:
+            timeout = 600.0
+
+        _cancel_batch_request = CancelBatchRequest(
+            batch_id=batch_id,
+            extra_headers=extra_headers,
+            extra_body=extra_body,
+        )
+
+        _is_async = kwargs.pop("acancel_batch", False) is True
+        api_base: Optional[str] = None
+        if custom_llm_provider == "openai":
+            api_base = (
+                optional_params.api_base
+                or litellm.api_base
+                or os.getenv("OPENAI_API_BASE")
+                or "https://api.openai.com/v1"
+            )
+            organization = (
+                optional_params.organization
+                or litellm.organization
+                or os.getenv("OPENAI_ORGANIZATION", None)
+                or None
+            )
+            api_key = (
+                optional_params.api_key
+                or litellm.api_key
+                or litellm.openai_key
+                or os.getenv("OPENAI_API_KEY")
+            )
+
+            response = openai_batches_instance.cancel_batch(
+                _is_async=_is_async,
+                cancel_batch_data=_cancel_batch_request,
+                api_base=api_base,
+                api_key=api_key,
+                organization=organization,
+                timeout=timeout,
+                max_retries=optional_params.max_retries,
+            )
+        elif custom_llm_provider == "azure":
+            api_base = (
+                optional_params.api_base
+                or litellm.api_base
+                or get_secret_str("AZURE_API_BASE")
+            )
+            api_version = (
+                optional_params.api_version
+                or litellm.api_version
+                or get_secret_str("AZURE_API_VERSION")
+            )
+
+            api_key = (
+                optional_params.api_key
+                or litellm.api_key
+                or litellm.azure_key
+                or get_secret_str("AZURE_OPENAI_API_KEY")
+                or get_secret_str("AZURE_API_KEY")
+            )
+
+            extra_body = optional_params.get("extra_body", {})
+            if extra_body is not None:
+                extra_body.pop("azure_ad_token", None)
+            else:
+                get_secret_str("AZURE_AD_TOKEN")  # type: ignore
+
+            response = azure_batches_instance.cancel_batch(
+                _is_async=_is_async,
+                api_base=api_base,
+                api_key=api_key,
+                api_version=api_version,
+                timeout=timeout,
+                max_retries=optional_params.max_retries,
+                cancel_batch_data=_cancel_batch_request,
+                litellm_params=litellm_params,
+            )
+        else:
+            raise litellm.exceptions.BadRequestError(
+                message="LiteLLM doesn't support {} for 'cancel_batch'. Only 'openai' and 'azure' are supported.".format(
+                    custom_llm_provider
+                ),
+                model="n/a",
+                llm_provider=custom_llm_provider,
+                response=httpx.Response(
+                    status_code=400,
+                    content="Unsupported provider",
+                    request=httpx.Request(method="cancel_batch", url="https://github.com/BerriAI/litellm"),  # type: ignore
+                ),
+            )
+        return response
+    except Exception as e:
+        raise e