path: root/.venv/lib/python3.12/site-packages/litellm/integrations/azure_storage/azure_storage.py
author    S. Solomon Darnell 2025-03-28 21:52:21 -0500
committer S. Solomon Darnell 2025-03-28 21:52:21 -0500
commit    4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
tree      ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/litellm/integrations/azure_storage/azure_storage.py
parent    cc961e04ba734dd72309fb548a2f97d67d578813 (diff)
download  gn-ai-master.tar.gz
two versions of R2R are here (HEAD, master)
Diffstat (limited to '.venv/lib/python3.12/site-packages/litellm/integrations/azure_storage/azure_storage.py')
-rw-r--r--  .venv/lib/python3.12/site-packages/litellm/integrations/azure_storage/azure_storage.py  381
1 file changed, 381 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/litellm/integrations/azure_storage/azure_storage.py b/.venv/lib/python3.12/site-packages/litellm/integrations/azure_storage/azure_storage.py
new file mode 100644
index 00000000..ddc46b11
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/integrations/azure_storage/azure_storage.py
@@ -0,0 +1,381 @@
+import asyncio
+import json
+import os
+import uuid
+from datetime import datetime, timedelta
+from typing import List, Optional
+
+from litellm._logging import verbose_logger
+from litellm.constants import AZURE_STORAGE_MSFT_VERSION
+from litellm.integrations.custom_batch_logger import CustomBatchLogger
+from litellm.llms.azure.common_utils import get_azure_ad_token_from_entrata_id
+from litellm.llms.custom_httpx.http_handler import (
+ AsyncHTTPHandler,
+ get_async_httpx_client,
+ httpxSpecialProvider,
+)
+from litellm.types.utils import StandardLoggingPayload
+
+
+class AzureBlobStorageLogger(CustomBatchLogger):
+ def __init__(
+ self,
+ **kwargs,
+ ):
+ try:
+ verbose_logger.debug(
+ "AzureBlobStorageLogger: in init azure blob storage logger"
+ )
+
+ # Env Variables used for Azure Storage Authentication
+ self.tenant_id = os.getenv("AZURE_STORAGE_TENANT_ID")
+ self.client_id = os.getenv("AZURE_STORAGE_CLIENT_ID")
+ self.client_secret = os.getenv("AZURE_STORAGE_CLIENT_SECRET")
+ self.azure_storage_account_key: Optional[str] = os.getenv(
+ "AZURE_STORAGE_ACCOUNT_KEY"
+ )
+
+ # Required Env Variables for Azure Storage
+ _azure_storage_account_name = os.getenv("AZURE_STORAGE_ACCOUNT_NAME")
+ if not _azure_storage_account_name:
+ raise ValueError(
+ "Missing required environment variable: AZURE_STORAGE_ACCOUNT_NAME"
+ )
+ self.azure_storage_account_name: str = _azure_storage_account_name
+ _azure_storage_file_system = os.getenv("AZURE_STORAGE_FILE_SYSTEM")
+ if not _azure_storage_file_system:
+ raise ValueError(
+ "Missing required environment variable: AZURE_STORAGE_FILE_SYSTEM"
+ )
+ self.azure_storage_file_system: str = _azure_storage_file_system
+
+ # Internal variables used for Token based authentication
+ self.azure_auth_token: Optional[str] = (
+ None # the Azure AD token to use for Azure Storage API requests
+ )
+ self.token_expiry: Optional[datetime] = (
+                None  # the expiry time of the current Azure AD token
+ )
+
+ asyncio.create_task(self.periodic_flush())
+ self.flush_lock = asyncio.Lock()
+ self.log_queue: List[StandardLoggingPayload] = []
+ super().__init__(**kwargs, flush_lock=self.flush_lock)
+ except Exception as e:
+ verbose_logger.exception(
+ f"AzureBlobStorageLogger: Got exception on init AzureBlobStorageLogger client {str(e)}"
+ )
+ raise e
+
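+    # Example environment setup read by __init__ above (placeholder values,
+    # shown for reference only -- not real credentials):
+    #
+    #   AZURE_STORAGE_ACCOUNT_NAME=mystorageacct       # required
+    #   AZURE_STORAGE_FILE_SYSTEM=litellm-logs         # required
+    #   AZURE_STORAGE_ACCOUNT_KEY=<account-key>        # key-based auth, or:
+    #   AZURE_STORAGE_TENANT_ID=<tenant-id>            # \
+    #   AZURE_STORAGE_CLIENT_ID=<client-id>            #  > Entra ID auth
+    #   AZURE_STORAGE_CLIENT_SECRET=<client-secret>    # /
+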
+ async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
+ """
+ Async Log success events to Azure Blob Storage
+
+ Raises:
+ Raises a NON Blocking verbose_logger.exception if an error occurs
+ """
+ try:
+ self._premium_user_check()
+ verbose_logger.debug(
+ "AzureBlobStorageLogger: Logging - Enters logging function for model %s",
+                kwargs.get("model"),
+ )
+ standard_logging_payload: Optional[StandardLoggingPayload] = kwargs.get(
+ "standard_logging_object"
+ )
+
+ if standard_logging_payload is None:
+ raise ValueError("standard_logging_payload is not set")
+
+ self.log_queue.append(standard_logging_payload)
+
+ except Exception as e:
+ verbose_logger.exception(f"AzureBlobStorageLogger Layer Error - {str(e)}")
+ pass
+
+ async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time):
+ """
+ Async Log failure events to Azure Blob Storage
+
+ Raises:
+ Raises a NON Blocking verbose_logger.exception if an error occurs
+ """
+ try:
+ self._premium_user_check()
+ verbose_logger.debug(
+ "AzureBlobStorageLogger: Logging - Enters logging function for model %s",
+                kwargs.get("model"),
+ )
+ standard_logging_payload: Optional[StandardLoggingPayload] = kwargs.get(
+ "standard_logging_object"
+ )
+
+ if standard_logging_payload is None:
+ raise ValueError("standard_logging_payload is not set")
+
+ self.log_queue.append(standard_logging_payload)
+ except Exception as e:
+ verbose_logger.exception(f"AzureBlobStorageLogger Layer Error - {str(e)}")
+ pass
+
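+    # Note: both handlers above only enqueue payloads; the periodic_flush()
+    # task created in __init__ drains log_queue through async_send_batch() below.
+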
+ async def async_send_batch(self):
+ """
+ Sends the in memory logs queue to Azure Blob Storage
+
+ Raises:
+ Raises a NON Blocking verbose_logger.exception if an error occurs
+ """
+ try:
+ if not self.log_queue:
+                verbose_logger.exception("AzureBlobStorageLogger: log_queue is empty")
+ return
+
+ verbose_logger.debug(
+ "AzureBlobStorageLogger - about to flush %s events",
+ len(self.log_queue),
+ )
+
+ for payload in self.log_queue:
+ await self.async_upload_payload_to_azure_blob_storage(payload=payload)
+
+ except Exception as e:
+ verbose_logger.exception(
+                f"AzureBlobStorageLogger: Error sending batch - {str(e)}"
+ )
+
+ async def async_upload_payload_to_azure_blob_storage(
+ self, payload: StandardLoggingPayload
+ ):
+ """
+ Uploads the payload to Azure Blob Storage using a 3-step process:
+ 1. Create file resource
+ 2. Append data
+ 3. Flush the data
+ """
+        try:
+            if self.azure_storage_account_key:
+ await self.upload_to_azure_data_lake_with_azure_account_key(
+ payload=payload
+ )
+ else:
+ # Get a valid token instead of always requesting a new one
+ await self.set_valid_azure_ad_token()
+ async_client = get_async_httpx_client(
+ llm_provider=httpxSpecialProvider.LoggingCallback
+ )
+ json_payload = (
+ json.dumps(payload) + "\n"
+ ) # Add newline for each log entry
+ payload_bytes = json_payload.encode("utf-8")
+ filename = f"{payload.get('id') or str(uuid.uuid4())}.json"
+ base_url = f"https://{self.azure_storage_account_name}.dfs.core.windows.net/{self.azure_storage_file_system}/{filename}"
+
+ # Execute the 3-step upload process
+ await self._create_file(async_client, base_url)
+ await self._append_data(async_client, base_url, json_payload)
+ await self._flush_data(async_client, base_url, len(payload_bytes))
+
+ verbose_logger.debug(
+ f"Successfully uploaded log to Azure Blob Storage: {filename}"
+ )
+
+ except Exception as e:
+ verbose_logger.exception(f"Error uploading to Azure Blob Storage: {str(e)}")
+ raise e
+
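+    # Illustrative example: for a payload with id "abc123" (hypothetical), the
+    # method above writes the log to:
+    #   https://<account-name>.dfs.core.windows.net/<file-system>/abc123.json
+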
+ async def _create_file(self, client: AsyncHTTPHandler, base_url: str):
+ """Helper method to create the file resource"""
+ try:
+ verbose_logger.debug(f"Creating file resource at: {base_url}")
+ headers = {
+ "x-ms-version": AZURE_STORAGE_MSFT_VERSION,
+ "Content-Length": "0",
+ "Authorization": f"Bearer {self.azure_auth_token}",
+ }
+ response = await client.put(f"{base_url}?resource=file", headers=headers)
+ response.raise_for_status()
+ verbose_logger.debug("Successfully created file resource")
+ except Exception as e:
+ verbose_logger.exception(f"Error creating file resource: {str(e)}")
+ raise
+
+ async def _append_data(
+ self, client: AsyncHTTPHandler, base_url: str, json_payload: str
+ ):
+ """Helper method to append data to the file"""
+ try:
+ verbose_logger.debug(f"Appending data to file: {base_url}")
+ headers = {
+ "x-ms-version": AZURE_STORAGE_MSFT_VERSION,
+ "Content-Type": "application/json",
+ "Authorization": f"Bearer {self.azure_auth_token}",
+ }
+ response = await client.patch(
+ f"{base_url}?action=append&position=0",
+ headers=headers,
+ data=json_payload,
+ )
+ response.raise_for_status()
+ verbose_logger.debug("Successfully appended data")
+ except Exception as e:
+ verbose_logger.exception(f"Error appending data: {str(e)}")
+ raise
+
+ async def _flush_data(self, client: AsyncHTTPHandler, base_url: str, position: int):
+ """Helper method to flush the data"""
+ try:
+ verbose_logger.debug(f"Flushing data at position {position}")
+ headers = {
+ "x-ms-version": AZURE_STORAGE_MSFT_VERSION,
+ "Content-Length": "0",
+ "Authorization": f"Bearer {self.azure_auth_token}",
+ }
+ response = await client.patch(
+ f"{base_url}?action=flush&position={position}", headers=headers
+ )
+ response.raise_for_status()
+ verbose_logger.debug("Successfully flushed data")
+ except Exception as e:
+ verbose_logger.exception(f"Error flushing data: {str(e)}")
+ raise
+
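+    # For reference, the three helpers above map onto the ADLS Gen2 Path REST
+    # operations they issue (a sketch of the raw requests, for orientation):
+    #
+    #   PUT   {base_url}?resource=file                    -> create empty file
+    #   PATCH {base_url}?action=append&position=0         -> append JSON bytes
+    #   PATCH {base_url}?action=flush&position={length}   -> commit the file
+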
+    ####### Helper methods for managing authentication to Azure Storage ######
+ ##########################################################################
+
+ async def set_valid_azure_ad_token(self):
+ """
+ Wrapper to set self.azure_auth_token to a valid Azure AD token, refreshing if necessary
+
+ Refreshes the token when:
+ - Token is expired
+ - Token is not set
+ """
+ # Check if token needs refresh
+ if self._azure_ad_token_is_expired() or self.azure_auth_token is None:
+ verbose_logger.debug("Azure AD token needs refresh")
+ self.azure_auth_token = self.get_azure_ad_token_from_azure_storage(
+ tenant_id=self.tenant_id,
+ client_id=self.client_id,
+ client_secret=self.client_secret,
+ )
+ # Token typically expires in 1 hour
+ self.token_expiry = datetime.now() + timedelta(hours=1)
+ verbose_logger.debug(f"New token will expire at {self.token_expiry}")
+
+ def get_azure_ad_token_from_azure_storage(
+ self,
+ tenant_id: Optional[str],
+ client_id: Optional[str],
+ client_secret: Optional[str],
+ ) -> str:
+ """
+ Gets Azure AD token to use for Azure Storage API requests
+ """
+ verbose_logger.debug("Getting Azure AD Token from Azure Storage")
+ verbose_logger.debug(
+ "tenant_id %s, client_id %s, client_secret %s",
+ tenant_id,
+ client_id,
+ client_secret,
+ )
+ if tenant_id is None:
+ raise ValueError(
+ "Missing required environment variable: AZURE_STORAGE_TENANT_ID"
+ )
+ if client_id is None:
+ raise ValueError(
+ "Missing required environment variable: AZURE_STORAGE_CLIENT_ID"
+ )
+ if client_secret is None:
+ raise ValueError(
+ "Missing required environment variable: AZURE_STORAGE_CLIENT_SECRET"
+ )
+
+ token_provider = get_azure_ad_token_from_entrata_id(
+ tenant_id=tenant_id,
+ client_id=client_id,
+ client_secret=client_secret,
+ scope="https://storage.azure.com/.default",
+ )
+ token = token_provider()
+
+ verbose_logger.debug("azure auth token %s", token)
+
+ return token
+
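+    # Equivalent standalone sketch using the azure-identity package directly
+    # (shown for illustration; this module actually goes through litellm's
+    # get_azure_ad_token_from_entrata_id helper above):
+    #
+    #   from azure.identity import ClientSecretCredential
+    #   credential = ClientSecretCredential(tenant_id, client_id, client_secret)
+    #   token = credential.get_token("https://storage.azure.com/.default").token
+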
+ def _azure_ad_token_is_expired(self):
+ """
+ Returns True if Azure AD token is expired, False otherwise
+ """
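+        # e.g. with token_expiry at 12:00, the token counts as expired from
+        # 11:55 onward (5-minute refresh margin before the actual expiry)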
+ if self.azure_auth_token and self.token_expiry:
+ if datetime.now() + timedelta(minutes=5) >= self.token_expiry:
+ verbose_logger.debug("Azure AD token is expired. Requesting new token")
+ return True
+ return False
+
+ def _premium_user_check(self):
+ """
+ Checks if the user is a premium user, raises an error if not
+ """
+ from litellm.proxy.proxy_server import CommonProxyErrors, premium_user
+
+ if premium_user is not True:
+ raise ValueError(
+ f"AzureBlobStorageLogger is only available for premium users. {CommonProxyErrors.not_premium_user}"
+ )
+
+ async def upload_to_azure_data_lake_with_azure_account_key(
+ self, payload: StandardLoggingPayload
+ ):
+ """
+ Uploads the payload to Azure Data Lake using the Azure SDK
+
+        This is used when an Azure Storage account key is set - account keys cannot be used directly with the Azure REST API, so the upload goes through the Azure SDK instead
+ """
+ from azure.storage.filedatalake.aio import DataLakeServiceClient
+
+ # Create an async service client
+ service_client = DataLakeServiceClient(
+ account_url=f"https://{self.azure_storage_account_name}.dfs.core.windows.net",
+ credential=self.azure_storage_account_key,
+ )
+ # Get file system client
+ file_system_client = service_client.get_file_system_client(
+ file_system=self.azure_storage_file_system
+ )
+
+        try:
+            # Create directory named after today's date (datetime is already
+            # imported at module level)
+            today = datetime.now().strftime("%Y-%m-%d")
+ directory_client = file_system_client.get_directory_client(today)
+
+ # check if the directory exists
+ if not await directory_client.exists():
+ await directory_client.create_directory()
+ verbose_logger.debug(f"Created directory: {today}")
+
+ # Create a file client
+ file_name = f"{payload.get('id') or str(uuid.uuid4())}.json"
+ file_client = directory_client.get_file_client(file_name)
+
+ # Create the file
+ await file_client.create_file()
+
+ # Content to append
+ content = json.dumps(payload).encode("utf-8")
+
+ # Append content to the file
+ await file_client.append_data(data=content, offset=0, length=len(content))
+
+ # Flush the content to finalize the file
+ await file_client.flush_data(position=len(content), offset=0)
+
+ verbose_logger.debug(
+ f"Successfully uploaded and wrote to {today}/{file_name}"
+ )
+
+ except Exception as e:
+            verbose_logger.exception(
+                f"AzureBlobStorageLogger: Error uploading to Azure Data Lake: {str(e)}"
+            )
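+
+# --- Usage sketch (hypothetical; not part of this module) --------------------
+# Wiring the logger into litellm's callback list, assuming the environment
+# variables shown above __init__ are set:
+#
+#   import litellm
+#   from litellm.integrations.azure_storage.azure_storage import (
+#       AzureBlobStorageLogger,
+#   )
+#
+#   litellm.callbacks = [AzureBlobStorageLogger()]
+#   # Each success/failure event is queued and periodically flushed to the
+#   # configured file system as one JSON file per StandardLoggingPayload.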