diff options
author | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
---|---|---|
committer | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
commit | 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch) | |
tree | ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/litellm/integrations/datadog | |
parent | cc961e04ba734dd72309fb548a2f97d67d578813 (diff) | |
download | gn-ai-master.tar.gz |
Diffstat (limited to '.venv/lib/python3.12/site-packages/litellm/integrations/datadog')
-rw-r--r-- | .venv/lib/python3.12/site-packages/litellm/integrations/datadog/datadog.py | 580 | ||||
-rw-r--r-- | .venv/lib/python3.12/site-packages/litellm/integrations/datadog/datadog_llm_obs.py | 203 |
2 files changed, 783 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/litellm/integrations/datadog/datadog.py b/.venv/lib/python3.12/site-packages/litellm/integrations/datadog/datadog.py new file mode 100644 index 00000000..4f4b05c8 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/litellm/integrations/datadog/datadog.py @@ -0,0 +1,580 @@ +""" +DataDog Integration - sends logs to /api/v2/log + +DD Reference API: https://docs.datadoghq.com/api/latest/logs + +`async_log_success_event` - used by litellm proxy to send logs to datadog +`log_success_event` - sync version of logging to DataDog, only used on litellm Python SDK, if user opts in to using sync functions + +async_log_success_event: will store batch of DD_MAX_BATCH_SIZE in memory and flush to Datadog once it reaches DD_MAX_BATCH_SIZE or every 5 seconds + +async_service_failure_hook: Logs failures from Redis, Postgres (Adjacent systems), as 'WARNING' on DataDog + +For batching specific details see CustomBatchLogger class +""" + +import asyncio +import datetime +import json +import os +import traceback +import uuid +from datetime import datetime as datetimeObj +from typing import Any, List, Optional, Union + +import httpx +from httpx import Response + +import litellm +from litellm._logging import verbose_logger +from litellm.integrations.custom_batch_logger import CustomBatchLogger +from litellm.llms.custom_httpx.http_handler import ( + _get_httpx_client, + get_async_httpx_client, + httpxSpecialProvider, +) +from litellm.types.integrations.base_health_check import IntegrationHealthCheckStatus +from litellm.types.integrations.datadog import * +from litellm.types.services import ServiceLoggerPayload, ServiceTypes +from litellm.types.utils import StandardLoggingPayload + +from ..additional_logging_utils import AdditionalLoggingUtils + +# max number of logs DD API can accept +DD_MAX_BATCH_SIZE = 1000 + +# specify what ServiceTypes are logged as success events to DD. (We don't want to spam DD traces with large number of service types) +DD_LOGGED_SUCCESS_SERVICE_TYPES = [ + ServiceTypes.RESET_BUDGET_JOB, +] + + +class DataDogLogger( + CustomBatchLogger, + AdditionalLoggingUtils, +): + # Class variables or attributes + def __init__( + self, + **kwargs, + ): + """ + Initializes the datadog logger, checks if the correct env variables are set + + Required environment variables: + `DD_API_KEY` - your datadog api key + `DD_SITE` - your datadog site, example = `"us5.datadoghq.com"` + """ + try: + verbose_logger.debug("Datadog: in init datadog logger") + # check if the correct env variables are set + if os.getenv("DD_API_KEY", None) is None: + raise Exception("DD_API_KEY is not set, set 'DD_API_KEY=<>") + if os.getenv("DD_SITE", None) is None: + raise Exception("DD_SITE is not set in .env, set 'DD_SITE=<>") + self.async_client = get_async_httpx_client( + llm_provider=httpxSpecialProvider.LoggingCallback + ) + self.DD_API_KEY = os.getenv("DD_API_KEY") + self.intake_url = ( + f"https://http-intake.logs.{os.getenv('DD_SITE')}/api/v2/logs" + ) + + ################################### + # OPTIONAL -only used for testing + dd_base_url: Optional[str] = ( + os.getenv("_DATADOG_BASE_URL") + or os.getenv("DATADOG_BASE_URL") + or os.getenv("DD_BASE_URL") + ) + if dd_base_url is not None: + self.intake_url = f"{dd_base_url}/api/v2/logs" + ################################### + self.sync_client = _get_httpx_client() + asyncio.create_task(self.periodic_flush()) + self.flush_lock = asyncio.Lock() + super().__init__( + **kwargs, flush_lock=self.flush_lock, batch_size=DD_MAX_BATCH_SIZE + ) + except Exception as e: + verbose_logger.exception( + f"Datadog: Got exception on init Datadog client {str(e)}" + ) + raise e + + async def async_log_success_event(self, kwargs, response_obj, start_time, end_time): + """ + Async Log success events to Datadog + + - Creates a Datadog payload + - Adds the Payload to the in memory logs queue + - Payload is flushed every 10 seconds or when batch size is greater than 100 + + + Raises: + Raises a NON Blocking verbose_logger.exception if an error occurs + """ + try: + verbose_logger.debug( + "Datadog: Logging - Enters logging function for model %s", kwargs + ) + await self._log_async_event(kwargs, response_obj, start_time, end_time) + + except Exception as e: + verbose_logger.exception( + f"Datadog Layer Error - {str(e)}\n{traceback.format_exc()}" + ) + pass + + async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time): + try: + verbose_logger.debug( + "Datadog: Logging - Enters logging function for model %s", kwargs + ) + await self._log_async_event(kwargs, response_obj, start_time, end_time) + + except Exception as e: + verbose_logger.exception( + f"Datadog Layer Error - {str(e)}\n{traceback.format_exc()}" + ) + pass + + async def async_send_batch(self): + """ + Sends the in memory logs queue to datadog api + + Logs sent to /api/v2/logs + + DD Ref: https://docs.datadoghq.com/api/latest/logs/ + + Raises: + Raises a NON Blocking verbose_logger.exception if an error occurs + """ + try: + if not self.log_queue: + verbose_logger.exception("Datadog: log_queue does not exist") + return + + verbose_logger.debug( + "Datadog - about to flush %s events on %s", + len(self.log_queue), + self.intake_url, + ) + + response = await self.async_send_compressed_data(self.log_queue) + if response.status_code == 413: + verbose_logger.exception(DD_ERRORS.DATADOG_413_ERROR.value) + return + + response.raise_for_status() + if response.status_code != 202: + raise Exception( + f"Response from datadog API status_code: {response.status_code}, text: {response.text}" + ) + + verbose_logger.debug( + "Datadog: Response from datadog API status_code: %s, text: %s", + response.status_code, + response.text, + ) + except Exception as e: + verbose_logger.exception( + f"Datadog Error sending batch API - {str(e)}\n{traceback.format_exc()}" + ) + + def log_success_event(self, kwargs, response_obj, start_time, end_time): + """ + Sync Log success events to Datadog + + - Creates a Datadog payload + - instantly logs it on DD API + """ + try: + if litellm.datadog_use_v1 is True: + dd_payload = self._create_v0_logging_payload( + kwargs=kwargs, + response_obj=response_obj, + start_time=start_time, + end_time=end_time, + ) + else: + dd_payload = self.create_datadog_logging_payload( + kwargs=kwargs, + response_obj=response_obj, + start_time=start_time, + end_time=end_time, + ) + + response = self.sync_client.post( + url=self.intake_url, + json=dd_payload, # type: ignore + headers={ + "DD-API-KEY": self.DD_API_KEY, + }, + ) + + response.raise_for_status() + if response.status_code != 202: + raise Exception( + f"Response from datadog API status_code: {response.status_code}, text: {response.text}" + ) + + verbose_logger.debug( + "Datadog: Response from datadog API status_code: %s, text: %s", + response.status_code, + response.text, + ) + + except Exception as e: + verbose_logger.exception( + f"Datadog Layer Error - {str(e)}\n{traceback.format_exc()}" + ) + pass + pass + + async def _log_async_event(self, kwargs, response_obj, start_time, end_time): + + dd_payload = self.create_datadog_logging_payload( + kwargs=kwargs, + response_obj=response_obj, + start_time=start_time, + end_time=end_time, + ) + + self.log_queue.append(dd_payload) + verbose_logger.debug( + f"Datadog, event added to queue. Will flush in {self.flush_interval} seconds..." + ) + + if len(self.log_queue) >= self.batch_size: + await self.async_send_batch() + + def _create_datadog_logging_payload_helper( + self, + standard_logging_object: StandardLoggingPayload, + status: DataDogStatus, + ) -> DatadogPayload: + json_payload = json.dumps(standard_logging_object, default=str) + verbose_logger.debug("Datadog: Logger - Logging payload = %s", json_payload) + dd_payload = DatadogPayload( + ddsource=self._get_datadog_source(), + ddtags=self._get_datadog_tags( + standard_logging_object=standard_logging_object + ), + hostname=self._get_datadog_hostname(), + message=json_payload, + service=self._get_datadog_service(), + status=status, + ) + return dd_payload + + def create_datadog_logging_payload( + self, + kwargs: Union[dict, Any], + response_obj: Any, + start_time: datetime.datetime, + end_time: datetime.datetime, + ) -> DatadogPayload: + """ + Helper function to create a datadog payload for logging + + Args: + kwargs (Union[dict, Any]): request kwargs + response_obj (Any): llm api response + start_time (datetime.datetime): start time of request + end_time (datetime.datetime): end time of request + + Returns: + DatadogPayload: defined in types.py + """ + + standard_logging_object: Optional[StandardLoggingPayload] = kwargs.get( + "standard_logging_object", None + ) + if standard_logging_object is None: + raise ValueError("standard_logging_object not found in kwargs") + + status = DataDogStatus.INFO + if standard_logging_object.get("status") == "failure": + status = DataDogStatus.ERROR + + # Build the initial payload + self.truncate_standard_logging_payload_content(standard_logging_object) + + dd_payload = self._create_datadog_logging_payload_helper( + standard_logging_object=standard_logging_object, + status=status, + ) + return dd_payload + + async def async_send_compressed_data(self, data: List) -> Response: + """ + Async helper to send compressed data to datadog self.intake_url + + Datadog recommends using gzip to compress data + https://docs.datadoghq.com/api/latest/logs/ + + "Datadog recommends sending your logs compressed. Add the Content-Encoding: gzip header to the request when sending" + """ + + import gzip + import json + + compressed_data = gzip.compress(json.dumps(data, default=str).encode("utf-8")) + response = await self.async_client.post( + url=self.intake_url, + data=compressed_data, # type: ignore + headers={ + "DD-API-KEY": self.DD_API_KEY, + "Content-Encoding": "gzip", + "Content-Type": "application/json", + }, + ) + return response + + async def async_service_failure_hook( + self, + payload: ServiceLoggerPayload, + error: Optional[str] = "", + parent_otel_span: Optional[Any] = None, + start_time: Optional[Union[datetimeObj, float]] = None, + end_time: Optional[Union[float, datetimeObj]] = None, + event_metadata: Optional[dict] = None, + ): + """ + Logs failures from Redis, Postgres (Adjacent systems), as 'WARNING' on DataDog + + - example - Redis is failing / erroring, will be logged on DataDog + """ + try: + _payload_dict = payload.model_dump() + _payload_dict.update(event_metadata or {}) + _dd_message_str = json.dumps(_payload_dict, default=str) + _dd_payload = DatadogPayload( + ddsource=self._get_datadog_source(), + ddtags=self._get_datadog_tags(), + hostname=self._get_datadog_hostname(), + message=_dd_message_str, + service=self._get_datadog_service(), + status=DataDogStatus.WARN, + ) + + self.log_queue.append(_dd_payload) + + except Exception as e: + verbose_logger.exception( + f"Datadog: Logger - Exception in async_service_failure_hook: {e}" + ) + pass + + async def async_service_success_hook( + self, + payload: ServiceLoggerPayload, + error: Optional[str] = "", + parent_otel_span: Optional[Any] = None, + start_time: Optional[Union[datetimeObj, float]] = None, + end_time: Optional[Union[float, datetimeObj]] = None, + event_metadata: Optional[dict] = None, + ): + """ + Logs success from Redis, Postgres (Adjacent systems), as 'INFO' on DataDog + + No user has asked for this so far, this might be spammy on datatdog. If need arises we can implement this + """ + try: + # intentionally done. Don't want to log all service types to DD + if payload.service not in DD_LOGGED_SUCCESS_SERVICE_TYPES: + return + + _payload_dict = payload.model_dump() + _payload_dict.update(event_metadata or {}) + + _dd_message_str = json.dumps(_payload_dict, default=str) + _dd_payload = DatadogPayload( + ddsource=self._get_datadog_source(), + ddtags=self._get_datadog_tags(), + hostname=self._get_datadog_hostname(), + message=_dd_message_str, + service=self._get_datadog_service(), + status=DataDogStatus.INFO, + ) + + self.log_queue.append(_dd_payload) + + except Exception as e: + verbose_logger.exception( + f"Datadog: Logger - Exception in async_service_failure_hook: {e}" + ) + + def _create_v0_logging_payload( + self, + kwargs: Union[dict, Any], + response_obj: Any, + start_time: datetime.datetime, + end_time: datetime.datetime, + ) -> DatadogPayload: + """ + Note: This is our V1 Version of DataDog Logging Payload + + + (Not Recommended) If you want this to get logged set `litellm.datadog_use_v1 = True` + """ + import json + + litellm_params = kwargs.get("litellm_params", {}) + metadata = ( + litellm_params.get("metadata", {}) or {} + ) # if litellm_params['metadata'] == None + messages = kwargs.get("messages") + optional_params = kwargs.get("optional_params", {}) + call_type = kwargs.get("call_type", "litellm.completion") + cache_hit = kwargs.get("cache_hit", False) + usage = response_obj["usage"] + id = response_obj.get("id", str(uuid.uuid4())) + usage = dict(usage) + try: + response_time = (end_time - start_time).total_seconds() * 1000 + except Exception: + response_time = None + + try: + response_obj = dict(response_obj) + except Exception: + response_obj = response_obj + + # Clean Metadata before logging - never log raw metadata + # the raw metadata can contain circular references which leads to infinite recursion + # we clean out all extra litellm metadata params before logging + clean_metadata = {} + if isinstance(metadata, dict): + for key, value in metadata.items(): + # clean litellm metadata before logging + if key in [ + "endpoint", + "caching_groups", + "previous_models", + ]: + continue + else: + clean_metadata[key] = value + + # Build the initial payload + payload = { + "id": id, + "call_type": call_type, + "cache_hit": cache_hit, + "start_time": start_time, + "end_time": end_time, + "response_time": response_time, + "model": kwargs.get("model", ""), + "user": kwargs.get("user", ""), + "model_parameters": optional_params, + "spend": kwargs.get("response_cost", 0), + "messages": messages, + "response": response_obj, + "usage": usage, + "metadata": clean_metadata, + } + + json_payload = json.dumps(payload, default=str) + + verbose_logger.debug("Datadog: Logger - Logging payload = %s", json_payload) + + dd_payload = DatadogPayload( + ddsource=self._get_datadog_source(), + ddtags=self._get_datadog_tags(), + hostname=self._get_datadog_hostname(), + message=json_payload, + service=self._get_datadog_service(), + status=DataDogStatus.INFO, + ) + return dd_payload + + @staticmethod + def _get_datadog_tags( + standard_logging_object: Optional[StandardLoggingPayload] = None, + ) -> str: + """ + Get the datadog tags for the request + + DD tags need to be as follows: + - tags: ["user_handle:dog@gmail.com", "app_version:1.0.0"] + """ + base_tags = { + "env": os.getenv("DD_ENV", "unknown"), + "service": os.getenv("DD_SERVICE", "litellm"), + "version": os.getenv("DD_VERSION", "unknown"), + "HOSTNAME": DataDogLogger._get_datadog_hostname(), + "POD_NAME": os.getenv("POD_NAME", "unknown"), + } + + tags = [f"{k}:{v}" for k, v in base_tags.items()] + + if standard_logging_object: + _request_tags: List[str] = ( + standard_logging_object.get("request_tags", []) or [] + ) + request_tags = [f"request_tag:{tag}" for tag in _request_tags] + tags.extend(request_tags) + + return ",".join(tags) + + @staticmethod + def _get_datadog_source(): + return os.getenv("DD_SOURCE", "litellm") + + @staticmethod + def _get_datadog_service(): + return os.getenv("DD_SERVICE", "litellm-server") + + @staticmethod + def _get_datadog_hostname(): + return os.getenv("HOSTNAME", "") + + @staticmethod + def _get_datadog_env(): + return os.getenv("DD_ENV", "unknown") + + @staticmethod + def _get_datadog_pod_name(): + return os.getenv("POD_NAME", "unknown") + + async def async_health_check(self) -> IntegrationHealthCheckStatus: + """ + Check if the service is healthy + """ + from litellm.litellm_core_utils.litellm_logging import ( + create_dummy_standard_logging_payload, + ) + + standard_logging_object = create_dummy_standard_logging_payload() + dd_payload = self._create_datadog_logging_payload_helper( + standard_logging_object=standard_logging_object, + status=DataDogStatus.INFO, + ) + log_queue = [dd_payload] + response = await self.async_send_compressed_data(log_queue) + try: + response.raise_for_status() + return IntegrationHealthCheckStatus( + status="healthy", + error_message=None, + ) + except httpx.HTTPStatusError as e: + return IntegrationHealthCheckStatus( + status="unhealthy", + error_message=e.response.text, + ) + except Exception as e: + return IntegrationHealthCheckStatus( + status="unhealthy", + error_message=str(e), + ) + + async def get_request_response_payload( + self, + request_id: str, + start_time_utc: Optional[datetimeObj], + end_time_utc: Optional[datetimeObj], + ) -> Optional[dict]: + pass diff --git a/.venv/lib/python3.12/site-packages/litellm/integrations/datadog/datadog_llm_obs.py b/.venv/lib/python3.12/site-packages/litellm/integrations/datadog/datadog_llm_obs.py new file mode 100644 index 00000000..e4e074ba --- /dev/null +++ b/.venv/lib/python3.12/site-packages/litellm/integrations/datadog/datadog_llm_obs.py @@ -0,0 +1,203 @@ +""" +Implements logging integration with Datadog's LLM Observability Service + + +API Reference: https://docs.datadoghq.com/llm_observability/setup/api/?tab=example#api-standards + +""" + +import asyncio +import json +import os +import uuid +from datetime import datetime +from typing import Any, Dict, List, Optional, Union + +import litellm +from litellm._logging import verbose_logger +from litellm.integrations.custom_batch_logger import CustomBatchLogger +from litellm.integrations.datadog.datadog import DataDogLogger +from litellm.llms.custom_httpx.http_handler import ( + get_async_httpx_client, + httpxSpecialProvider, +) +from litellm.types.integrations.datadog_llm_obs import * +from litellm.types.utils import StandardLoggingPayload + + +class DataDogLLMObsLogger(DataDogLogger, CustomBatchLogger): + def __init__(self, **kwargs): + try: + verbose_logger.debug("DataDogLLMObs: Initializing logger") + if os.getenv("DD_API_KEY", None) is None: + raise Exception("DD_API_KEY is not set, set 'DD_API_KEY=<>'") + if os.getenv("DD_SITE", None) is None: + raise Exception( + "DD_SITE is not set, set 'DD_SITE=<>', example sit = `us5.datadoghq.com`" + ) + + self.async_client = get_async_httpx_client( + llm_provider=httpxSpecialProvider.LoggingCallback + ) + self.DD_API_KEY = os.getenv("DD_API_KEY") + self.DD_SITE = os.getenv("DD_SITE") + self.intake_url = ( + f"https://api.{self.DD_SITE}/api/intake/llm-obs/v1/trace/spans" + ) + + # testing base url + dd_base_url = os.getenv("DD_BASE_URL") + if dd_base_url: + self.intake_url = f"{dd_base_url}/api/intake/llm-obs/v1/trace/spans" + + asyncio.create_task(self.periodic_flush()) + self.flush_lock = asyncio.Lock() + self.log_queue: List[LLMObsPayload] = [] + CustomBatchLogger.__init__(self, **kwargs, flush_lock=self.flush_lock) + except Exception as e: + verbose_logger.exception(f"DataDogLLMObs: Error initializing - {str(e)}") + raise e + + async def async_log_success_event(self, kwargs, response_obj, start_time, end_time): + try: + verbose_logger.debug( + f"DataDogLLMObs: Logging success event for model {kwargs.get('model', 'unknown')}" + ) + payload = self.create_llm_obs_payload( + kwargs, response_obj, start_time, end_time + ) + verbose_logger.debug(f"DataDogLLMObs: Payload: {payload}") + self.log_queue.append(payload) + + if len(self.log_queue) >= self.batch_size: + await self.async_send_batch() + except Exception as e: + verbose_logger.exception( + f"DataDogLLMObs: Error logging success event - {str(e)}" + ) + + async def async_send_batch(self): + try: + if not self.log_queue: + return + + verbose_logger.debug( + f"DataDogLLMObs: Flushing {len(self.log_queue)} events" + ) + + # Prepare the payload + payload = { + "data": DDIntakePayload( + type="span", + attributes=DDSpanAttributes( + ml_app=self._get_datadog_service(), + tags=[self._get_datadog_tags()], + spans=self.log_queue, + ), + ), + } + verbose_logger.debug("payload %s", json.dumps(payload, indent=4)) + response = await self.async_client.post( + url=self.intake_url, + json=payload, + headers={ + "DD-API-KEY": self.DD_API_KEY, + "Content-Type": "application/json", + }, + ) + + response.raise_for_status() + if response.status_code != 202: + raise Exception( + f"DataDogLLMObs: Unexpected response - status_code: {response.status_code}, text: {response.text}" + ) + + verbose_logger.debug( + f"DataDogLLMObs: Successfully sent batch - status_code: {response.status_code}" + ) + self.log_queue.clear() + except Exception as e: + verbose_logger.exception(f"DataDogLLMObs: Error sending batch - {str(e)}") + + def create_llm_obs_payload( + self, kwargs: Dict, response_obj: Any, start_time: datetime, end_time: datetime + ) -> LLMObsPayload: + standard_logging_payload: Optional[StandardLoggingPayload] = kwargs.get( + "standard_logging_object" + ) + if standard_logging_payload is None: + raise Exception("DataDogLLMObs: standard_logging_object is not set") + + messages = standard_logging_payload["messages"] + messages = self._ensure_string_content(messages=messages) + + metadata = kwargs.get("litellm_params", {}).get("metadata", {}) + + input_meta = InputMeta(messages=messages) # type: ignore + output_meta = OutputMeta(messages=self._get_response_messages(response_obj)) + + meta = Meta( + kind="llm", + input=input_meta, + output=output_meta, + metadata=self._get_dd_llm_obs_payload_metadata(standard_logging_payload), + ) + + # Calculate metrics (you may need to adjust these based on available data) + metrics = LLMMetrics( + input_tokens=float(standard_logging_payload.get("prompt_tokens", 0)), + output_tokens=float(standard_logging_payload.get("completion_tokens", 0)), + total_tokens=float(standard_logging_payload.get("total_tokens", 0)), + ) + + return LLMObsPayload( + parent_id=metadata.get("parent_id", "undefined"), + trace_id=metadata.get("trace_id", str(uuid.uuid4())), + span_id=metadata.get("span_id", str(uuid.uuid4())), + name=metadata.get("name", "litellm_llm_call"), + meta=meta, + start_ns=int(start_time.timestamp() * 1e9), + duration=int((end_time - start_time).total_seconds() * 1e9), + metrics=metrics, + tags=[ + self._get_datadog_tags(standard_logging_object=standard_logging_payload) + ], + ) + + def _get_response_messages(self, response_obj: Any) -> List[Any]: + """ + Get the messages from the response object + + for now this handles logging /chat/completions responses + """ + if isinstance(response_obj, litellm.ModelResponse): + return [response_obj["choices"][0]["message"].json()] + return [] + + def _ensure_string_content( + self, messages: Optional[Union[str, List[Any], Dict[Any, Any]]] + ) -> List[Any]: + if messages is None: + return [] + if isinstance(messages, str): + return [messages] + elif isinstance(messages, list): + return [message for message in messages] + elif isinstance(messages, dict): + return [str(messages.get("content", ""))] + return [] + + def _get_dd_llm_obs_payload_metadata( + self, standard_logging_payload: StandardLoggingPayload + ) -> Dict: + _metadata = { + "model_name": standard_logging_payload.get("model", "unknown"), + "model_provider": standard_logging_payload.get( + "custom_llm_provider", "unknown" + ), + } + _standard_logging_metadata: dict = ( + dict(standard_logging_payload.get("metadata", {})) or {} + ) + _metadata.update(_standard_logging_metadata) + return _metadata |