author     S. Solomon Darnell   2025-03-28 21:52:21 -0500
committer  S. Solomon Darnell   2025-03-28 21:52:21 -0500
commit     4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
tree       ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/litellm/batch_completion
parent     cc961e04ba734dd72309fb548a2f97d67d578813 (diff)
two versions of R2R are here (HEAD -> master)
Diffstat (limited to '.venv/lib/python3.12/site-packages/litellm/batch_completion')
-rw-r--r--  .venv/lib/python3.12/site-packages/litellm/batch_completion/Readme.md   11
-rw-r--r--  .venv/lib/python3.12/site-packages/litellm/batch_completion/main.py    253
2 files changed, 264 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/litellm/batch_completion/Readme.md b/.venv/lib/python3.12/site-packages/litellm/batch_completion/Readme.md
new file mode 100644
index 00000000..23cc8712
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/batch_completion/Readme.md
@@ -0,0 +1,11 @@
+# Implementation of `litellm.batch_completion`, `litellm.batch_completion_models`, `litellm.batch_completion_models_all_responses`
+
+Doc: https://docs.litellm.ai/docs/completion/batching
+
+
+The LiteLLM Python SDK provides three batch-completion helpers:
+1. `litellm.batch_completion`: batch calls to `litellm.completion` for a single model.
+2. `litellm.batch_completion_models`: send one request to multiple language models concurrently and return the response
+    as soon as one of the models responds.
+3. `litellm.batch_completion_models_all_responses`: send one request to multiple language models concurrently and return a list
+    of responses from all models that respond.
\ No newline at end of file
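
A minimal usage sketch of `litellm.batch_completion` as described above; the model name is only an example and a matching provider API key is assumed to be configured in the environment:

    import litellm

    responses = litellm.batch_completion(
        model="gpt-3.5-turbo",  # example model; any configured provider model works
        messages=[
            [{"role": "user", "content": "good morning"}],
            [{"role": "user", "content": "what is the capital of France?"}],
        ],
        max_tokens=20,
    )
    # one result (or the raised Exception) per message list, in input order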
diff --git a/.venv/lib/python3.12/site-packages/litellm/batch_completion/main.py b/.venv/lib/python3.12/site-packages/litellm/batch_completion/main.py
new file mode 100644
index 00000000..7100fb00
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/batch_completion/main.py
@@ -0,0 +1,253 @@
+from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
+from typing import List, Optional
+
+import litellm
+from litellm._logging import print_verbose
+from litellm.utils import get_optional_params
+
+from ..llms.vllm.completion import handler as vllm_handler
+
+
+def batch_completion(
+    model: str,
+    # Optional OpenAI params: see https://platform.openai.com/docs/api-reference/chat/create
+    messages: List = [],
+    functions: Optional[List] = None,
+    function_call: Optional[str] = None,
+    temperature: Optional[float] = None,
+    top_p: Optional[float] = None,
+    n: Optional[int] = None,
+    stream: Optional[bool] = None,
+    stop=None,
+    max_tokens: Optional[int] = None,
+    presence_penalty: Optional[float] = None,
+    frequency_penalty: Optional[float] = None,
+    logit_bias: Optional[dict] = None,
+    user: Optional[str] = None,
+    deployment_id=None,
+    request_timeout: Optional[int] = None,
+    timeout: Optional[int] = 600,
+    max_workers: Optional[int] = 100,
+    # Optional liteLLM function params
+    **kwargs,
+):
+    """
+    Batch litellm.completion function for a given model.
+
+    Args:
+        model (str): The model to use for generating completions.
+        messages (List, optional): A list of message lists, one per completion request. Defaults to [].
+        functions (List, optional): List of functions to use as input for generating completions. Defaults to None.
+        function_call (str, optional): The function call to use as input for generating completions. Defaults to None.
+        temperature (float, optional): The temperature parameter for generating completions. Defaults to None.
+        top_p (float, optional): The top-p parameter for generating completions. Defaults to None.
+        n (int, optional): The number of completions to generate. Defaults to None.
+        stream (bool, optional): Whether to stream completions or not. Defaults to None.
+        stop (optional): The stop parameter for generating completions. Defaults to None.
+        max_tokens (int, optional): The maximum number of tokens to generate. Defaults to None.
+        presence_penalty (float, optional): The presence penalty for generating completions. Defaults to None.
+        frequency_penalty (float, optional): The frequency penalty for generating completions. Defaults to None.
+        logit_bias (dict, optional): The logit bias for generating completions. Defaults to None.
+        user (str, optional): The user string for generating completions. Defaults to None.
+        deployment_id (optional): The deployment ID for generating completions. Defaults to None.
+        request_timeout (int, optional): The request timeout for generating completions. Defaults to None.
+        timeout (int, optional): The timeout (in seconds) applied to each completion call. Defaults to 600.
+        max_workers (int, optional): The maximum number of threads to use for parallel processing. Defaults to 100.
+
+    Returns:
+        list: A list of completion results.
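+
+    Example (illustrative sketch; the model name and API key are assumptions, not supplied by this function):
+        responses = batch_completion(
+            model="gpt-3.5-turbo",
+            messages=[
+                [{"role": "user", "content": "good morning"}],
+                [{"role": "user", "content": "what time is it?"}],
+            ],
+        )
+        # results come back in input order; a failed request is returned as the
+        # raised Exception instead of a response object
+        ok = [r for r in responses if not isinstance(r, Exception)]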
+    """
+    args = locals()  # snapshot of the call arguments, reused for each message list below
+
+    batch_messages = messages
+    completions = []
+    custom_llm_provider = None
+    # split off an explicit provider prefix such as "vllm/<model>"
+    if model.split("/", 1)[0] in litellm.provider_list:
+        custom_llm_provider = model.split("/", 1)[0]
+        model = model.split("/", 1)[1]
+    if custom_llm_provider == "vllm":
+        optional_params = get_optional_params(
+            functions=functions,
+            function_call=function_call,
+            temperature=temperature,
+            top_p=top_p,
+            n=n,
+            stream=stream or False,
+            stop=stop,
+            max_tokens=max_tokens,
+            presence_penalty=presence_penalty,
+            frequency_penalty=frequency_penalty,
+            logit_bias=logit_bias,
+            user=user,
+            # params to identify the model
+            model=model,
+            custom_llm_provider=custom_llm_provider,
+        )
+        results = vllm_handler.batch_completions(
+            model=model,
+            messages=batch_messages,
+            custom_prompt_dict=litellm.custom_prompt_dict,
+            optional_params=optional_params,
+        )
+    # all non-vLLM models: fan the requests out over a thread pool
+    else:
+
+        def chunks(lst, n):
+            """Yield successive n-sized chunks from lst."""
+            for i in range(0, len(lst), n):
+                yield lst[i : i + n]
+
+        with ThreadPoolExecutor(max_workers=max_workers) as executor:
+            for sub_batch in chunks(batch_messages, 100):
+                for message_list in sub_batch:
+                    kwargs_modified = args.copy()
+                    kwargs_modified.pop("max_workers")
+                    kwargs_modified["messages"] = message_list
+                    original_kwargs = {}
+                    if "kwargs" in kwargs_modified:
+                        original_kwargs = kwargs_modified.pop("kwargs")
+                    future = executor.submit(
+                        litellm.completion, **kwargs_modified, **original_kwargs
+                    )
+                    completions.append(future)
+
+        # Retrieve the results in submission order; exceptions are returned
+        # in-line rather than raised, so one failed request does not lose the batch
+        results = []
+        for future in completions:
+            try:
+                results.append(future.result())
+            except Exception as exc:
+                results.append(exc)
+
+    return results
+
+
+# send one request to multiple models
+# return as soon as one of the llms responds
+def batch_completion_models(*args, **kwargs):
+    """
+    Send a request to multiple language models concurrently and return the response
+    as soon as one of the models responds.
+
+    Args:
+        *args: Variable-length positional arguments passed to the completion function.
+        **kwargs: Additional keyword arguments:
+            - models (str or list of str): The language models to send requests to.
+            - Other keyword arguments to be passed to the completion function.
+
+    Returns:
+        The response from one of the language models, or None if no response is received.
+
+    Note:
+        This function utilizes a ThreadPoolExecutor to parallelize requests to multiple models.
+        It sends requests concurrently and returns the response from the first model that responds.
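+
+    Example (illustrative sketch; model names and API keys are assumptions):
+        response = batch_completion_models(
+            models=["gpt-3.5-turbo", "claude-3-haiku-20240307"],
+            messages=[{"role": "user", "content": "Hey, how's it going?"}],
+        )
+        # `response` is the single completion returned for the listed models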
+    """
+
+    if "model" in kwargs:
+        kwargs.pop("model")
+    if "models" in kwargs:
+        models = kwargs["models"]
+        kwargs.pop("models")
+        futures = {}
+        with ThreadPoolExecutor(max_workers=len(models)) as executor:
+            for model in models:
+                futures[model] = executor.submit(
+                    litellm.completion, *args, model=model, **kwargs
+                )
+
+            for model, future in sorted(
+                futures.items(), key=lambda x: models.index(x[0])
+            ):
+                # .result() blocks, so responses are checked in the order the
+                # models were listed rather than strictly first-to-finish
+                result = future.result()
+                if result is not None:
+                    return result
+    elif "deployments" in kwargs:
+        deployments = kwargs["deployments"]
+        kwargs.pop("deployments")
+        kwargs.pop("model_list")
+        nested_kwargs = kwargs.pop("kwargs", {})
+        futures = {}
+        with ThreadPoolExecutor(max_workers=len(deployments)) as executor:
+            for deployment in deployments:
+                for key in kwargs.keys():
+                    if (
+                        key not in deployment
+                    ):  # don't override deployment values e.g. model name, api base, etc.
+                        deployment[key] = kwargs[key]
+                # build the per-deployment call kwargs locally instead of rebinding
+                # `kwargs`, so one deployment's values never leak into the next
+                completion_kwargs = {**deployment, **nested_kwargs}
+                futures[deployment["model"]] = executor.submit(
+                    litellm.completion, **completion_kwargs
+                )
+
+            while futures:
+                # wait for the first returned future
+                print_verbose("\n\n waiting for next result\n\n")
+                done, _ = wait(futures.values(), return_when=FIRST_COMPLETED)
+                print_verbose(f"done list\n{done}")
+                for future in done:
+                    try:
+                        result = future.result()
+                        return result
+                    except Exception:
+                        # if model 1 fails, continue with response from model 2, model3
+                        print_verbose(
+                            "\n\ngot an exception, ignoring, removing from futures"
+                        )
+                        print_verbose(futures)
+                        new_futures = {}
+                        for key, value in futures.items():
+                            if future == value:
+                                print_verbose(f"removing key {key}")
+                                continue
+                            else:
+                                new_futures[key] = value
+                        futures = new_futures
+                        print_verbose(f"new futures: {futures}")
+                        continue
+
+                print_verbose("\n\ndone looping through futures\n\n")
+                print_verbose(futures)
+
+    return None  # If no response is received from any model
+
+
+def batch_completion_models_all_responses(*args, **kwargs):
+    """
+    Send a request to multiple language models concurrently and return a list of responses
+    from all models that respond.
+
+    Args:
+        *args: Variable-length positional arguments passed to the completion function.
+        **kwargs: Additional keyword arguments:
+            - models (str or list of str): The language models to send requests to.
+            - Other keyword arguments to be passed to the completion function.
+
+    Returns:
+        list: A list of responses from the language models that responded.
+
+    Note:
+        This function utilizes a ThreadPoolExecutor to parallelize requests to multiple models.
+        It sends requests concurrently and collects responses from all models that respond.
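+
+    Example (illustrative sketch; model names and API keys are assumptions):
+        responses = batch_completion_models_all_responses(
+            models=["gpt-3.5-turbo", "claude-3-haiku-20240307"],
+            messages=[{"role": "user", "content": "Write a one-line haiku"}],
+        )
+        for r in responses:
+            print(r)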
+    """
+    import concurrent.futures
+
+    if "model" in kwargs:
+        kwargs.pop("model")
+    if "models" in kwargs:
+        models = kwargs["models"]
+        kwargs.pop("models")
+    else:
+        raise Exception("'models' param not in kwargs")
+
+    responses = []
+
+    with concurrent.futures.ThreadPoolExecutor(max_workers=len(models)) as executor:
+        # submit every request up front so the calls actually run concurrently
+        futures = [
+            executor.submit(litellm.completion, *args, model=model, **kwargs)
+            for model in models
+        ]
+        for future in futures:
+            result = future.result()
+            if result is not None:
+                responses.append(result)
+
+    return responses