about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/litellm/batches/batch_utils.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/litellm/batches/batch_utils.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/litellm/batches/batch_utils.py')
-rw-r--r--.venv/lib/python3.12/site-packages/litellm/batches/batch_utils.py182
1 files changed, 182 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/litellm/batches/batch_utils.py b/.venv/lib/python3.12/site-packages/litellm/batches/batch_utils.py
new file mode 100644
index 00000000..af53304e
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/batches/batch_utils.py
@@ -0,0 +1,182 @@
+import json
+from typing import Any, List, Literal, Tuple
+
+import litellm
+from litellm._logging import verbose_logger
+from litellm.types.llms.openai import Batch
+from litellm.types.utils import CallTypes, Usage
+
+
+async def _handle_completed_batch(
+    batch: Batch,
+    custom_llm_provider: Literal["openai", "azure", "vertex_ai"],
+) -> Tuple[float, Usage, List[str]]:
+    """Helper function to process a completed batch and handle logging"""
+    # Get batch results
+    file_content_dictionary = await _get_batch_output_file_content_as_dictionary(
+        batch, custom_llm_provider
+    )
+
+    # Calculate costs and usage
+    batch_cost = await _batch_cost_calculator(
+        custom_llm_provider=custom_llm_provider,
+        file_content_dictionary=file_content_dictionary,
+    )
+    batch_usage = _get_batch_job_total_usage_from_file_content(
+        file_content_dictionary=file_content_dictionary,
+        custom_llm_provider=custom_llm_provider,
+    )
+
+    batch_models = _get_batch_models_from_file_content(file_content_dictionary)
+
+    return batch_cost, batch_usage, batch_models
+
+
+def _get_batch_models_from_file_content(
+    file_content_dictionary: List[dict],
+) -> List[str]:
+    """
+    Get the models from the file content
+    """
+    batch_models = []
+    for _item in file_content_dictionary:
+        if _batch_response_was_successful(_item):
+            _response_body = _get_response_from_batch_job_output_file(_item)
+            _model = _response_body.get("model")
+            if _model:
+                batch_models.append(_model)
+    return batch_models
+
+
+async def _batch_cost_calculator(
+    file_content_dictionary: List[dict],
+    custom_llm_provider: Literal["openai", "azure", "vertex_ai"] = "openai",
+) -> float:
+    """
+    Calculate the cost of a batch based on the output file id
+    """
+    if custom_llm_provider == "vertex_ai":
+        raise ValueError("Vertex AI does not support file content retrieval")
+    total_cost = _get_batch_job_cost_from_file_content(
+        file_content_dictionary=file_content_dictionary,
+        custom_llm_provider=custom_llm_provider,
+    )
+    verbose_logger.debug("total_cost=%s", total_cost)
+    return total_cost
+
+
+async def _get_batch_output_file_content_as_dictionary(
+    batch: Batch,
+    custom_llm_provider: Literal["openai", "azure", "vertex_ai"] = "openai",
+) -> List[dict]:
+    """
+    Get the batch output file content as a list of dictionaries
+    """
+    from litellm.files.main import afile_content
+
+    if custom_llm_provider == "vertex_ai":
+        raise ValueError("Vertex AI does not support file content retrieval")
+
+    if batch.output_file_id is None:
+        raise ValueError("Output file id is None cannot retrieve file content")
+
+    _file_content = await afile_content(
+        file_id=batch.output_file_id,
+        custom_llm_provider=custom_llm_provider,
+    )
+    return _get_file_content_as_dictionary(_file_content.content)
+
+
+def _get_file_content_as_dictionary(file_content: bytes) -> List[dict]:
+    """
+    Get the file content as a list of dictionaries from JSON Lines format
+    """
+    try:
+        _file_content_str = file_content.decode("utf-8")
+        # Split by newlines and parse each line as a separate JSON object
+        json_objects = []
+        for line in _file_content_str.strip().split("\n"):
+            if line:  # Skip empty lines
+                json_objects.append(json.loads(line))
+        verbose_logger.debug("json_objects=%s", json.dumps(json_objects, indent=4))
+        return json_objects
+    except Exception as e:
+        raise e
+
+
+def _get_batch_job_cost_from_file_content(
+    file_content_dictionary: List[dict],
+    custom_llm_provider: Literal["openai", "azure", "vertex_ai"] = "openai",
+) -> float:
+    """
+    Get the cost of a batch job from the file content
+    """
+    try:
+        total_cost: float = 0.0
+        # parse the file content as json
+        verbose_logger.debug(
+            "file_content_dictionary=%s", json.dumps(file_content_dictionary, indent=4)
+        )
+        for _item in file_content_dictionary:
+            if _batch_response_was_successful(_item):
+                _response_body = _get_response_from_batch_job_output_file(_item)
+                total_cost += litellm.completion_cost(
+                    completion_response=_response_body,
+                    custom_llm_provider=custom_llm_provider,
+                    call_type=CallTypes.aretrieve_batch.value,
+                )
+                verbose_logger.debug("total_cost=%s", total_cost)
+        return total_cost
+    except Exception as e:
+        verbose_logger.error("error in _get_batch_job_cost_from_file_content", e)
+        raise e
+
+
+def _get_batch_job_total_usage_from_file_content(
+    file_content_dictionary: List[dict],
+    custom_llm_provider: Literal["openai", "azure", "vertex_ai"] = "openai",
+) -> Usage:
+    """
+    Get the tokens of a batch job from the file content
+    """
+    total_tokens: int = 0
+    prompt_tokens: int = 0
+    completion_tokens: int = 0
+    for _item in file_content_dictionary:
+        if _batch_response_was_successful(_item):
+            _response_body = _get_response_from_batch_job_output_file(_item)
+            usage: Usage = _get_batch_job_usage_from_response_body(_response_body)
+            total_tokens += usage.total_tokens
+            prompt_tokens += usage.prompt_tokens
+            completion_tokens += usage.completion_tokens
+    return Usage(
+        total_tokens=total_tokens,
+        prompt_tokens=prompt_tokens,
+        completion_tokens=completion_tokens,
+    )
+
+
+def _get_batch_job_usage_from_response_body(response_body: dict) -> Usage:
+    """
+    Get the tokens of a batch job from the response body
+    """
+    _usage_dict = response_body.get("usage", None) or {}
+    usage: Usage = Usage(**_usage_dict)
+    return usage
+
+
+def _get_response_from_batch_job_output_file(batch_job_output_file: dict) -> Any:
+    """
+    Get the response from the batch job output file
+    """
+    _response: dict = batch_job_output_file.get("response", None) or {}
+    _response_body = _response.get("body", None) or {}
+    return _response_body
+
+
+def _batch_response_was_successful(batch_job_output_file: dict) -> bool:
+    """
+    Check if the batch job response status == 200
+    """
+    _response: dict = batch_job_output_file.get("response", None) or {}
+    return _response.get("status_code", None) == 200