about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/botocore/monitoring.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/botocore/monitoring.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-4a52a71956a8d46fcb7294ac71734504bb09bcc2.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/botocore/monitoring.py')
-rw-r--r--.venv/lib/python3.12/site-packages/botocore/monitoring.py586
1 files changed, 586 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/botocore/monitoring.py b/.venv/lib/python3.12/site-packages/botocore/monitoring.py
new file mode 100644
index 00000000..71d72302
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/botocore/monitoring.py
@@ -0,0 +1,586 @@
+# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+#     http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+import json
+import logging
+import re
+import time
+
+from botocore.compat import ensure_bytes, ensure_unicode, urlparse
+from botocore.retryhandler import EXCEPTION_MAP as RETRYABLE_EXCEPTIONS
+
+logger = logging.getLogger(__name__)
+
+
+class Monitor:
+    _EVENTS_TO_REGISTER = [
+        'before-parameter-build',
+        'request-created',
+        'response-received',
+        'after-call',
+        'after-call-error',
+    ]
+
+    def __init__(self, adapter, publisher):
+        """Abstraction for monitoring clients API calls
+
+        :param adapter: An adapter that takes event emitter events
+            and produces monitor events
+
+        :param publisher: A publisher for generated monitor events
+        """
+        self._adapter = adapter
+        self._publisher = publisher
+
+    def register(self, event_emitter):
+        """Register an event emitter to the monitor"""
+        for event_to_register in self._EVENTS_TO_REGISTER:
+            event_emitter.register_last(event_to_register, self.capture)
+
+    def capture(self, event_name, **payload):
+        """Captures an incoming event from the event emitter
+
+        It will feed an event emitter event to the monitor's adaptor to create
+        a monitor event and then publish that event to the monitor's publisher.
+        """
+        try:
+            monitor_event = self._adapter.feed(event_name, payload)
+            if monitor_event:
+                self._publisher.publish(monitor_event)
+        except Exception as e:
+            logger.debug(
+                'Exception %s raised by client monitor in handling event %s',
+                e,
+                event_name,
+                exc_info=True,
+            )
+
+
+class MonitorEventAdapter:
+    def __init__(self, time=time.time):
+        """Adapts event emitter events to produce monitor events
+
+        :type time: callable
+        :param time: A callable that produces the current time
+        """
+        self._time = time
+
+    def feed(self, emitter_event_name, emitter_payload):
+        """Feed an event emitter event to generate a monitor event
+
+        :type emitter_event_name: str
+        :param emitter_event_name: The name of the event emitted
+
+        :type emitter_payload: dict
+        :param emitter_payload: The payload to associated to the event
+            emitted
+
+        :rtype: BaseMonitorEvent
+        :returns: A monitor event based on the event emitter events
+            fired
+        """
+        return self._get_handler(emitter_event_name)(**emitter_payload)
+
+    def _get_handler(self, event_name):
+        return getattr(
+            self, '_handle_' + event_name.split('.')[0].replace('-', '_')
+        )
+
+    def _handle_before_parameter_build(self, model, context, **kwargs):
+        context['current_api_call_event'] = APICallEvent(
+            service=model.service_model.service_id,
+            operation=model.wire_name,
+            timestamp=self._get_current_time(),
+        )
+
+    def _handle_request_created(self, request, **kwargs):
+        context = request.context
+        new_attempt_event = context[
+            'current_api_call_event'
+        ].new_api_call_attempt(timestamp=self._get_current_time())
+        new_attempt_event.request_headers = request.headers
+        new_attempt_event.url = request.url
+        context['current_api_call_attempt_event'] = new_attempt_event
+
+    def _handle_response_received(
+        self, parsed_response, context, exception, **kwargs
+    ):
+        attempt_event = context.pop('current_api_call_attempt_event')
+        attempt_event.latency = self._get_latency(attempt_event)
+        if parsed_response is not None:
+            attempt_event.http_status_code = parsed_response[
+                'ResponseMetadata'
+            ]['HTTPStatusCode']
+            attempt_event.response_headers = parsed_response[
+                'ResponseMetadata'
+            ]['HTTPHeaders']
+            attempt_event.parsed_error = parsed_response.get('Error')
+        else:
+            attempt_event.wire_exception = exception
+        return attempt_event
+
+    def _handle_after_call(self, context, parsed, **kwargs):
+        context['current_api_call_event'].retries_exceeded = parsed[
+            'ResponseMetadata'
+        ].get('MaxAttemptsReached', False)
+        return self._complete_api_call(context)
+
+    def _handle_after_call_error(self, context, exception, **kwargs):
+        # If the after-call-error was emitted and the error being raised
+        # was a retryable connection error, then the retries must have exceeded
+        # for that exception as this event gets emitted **after** retries
+        # happen.
+        context[
+            'current_api_call_event'
+        ].retries_exceeded = self._is_retryable_exception(exception)
+        return self._complete_api_call(context)
+
+    def _is_retryable_exception(self, exception):
+        return isinstance(
+            exception, tuple(RETRYABLE_EXCEPTIONS['GENERAL_CONNECTION_ERROR'])
+        )
+
+    def _complete_api_call(self, context):
+        call_event = context.pop('current_api_call_event')
+        call_event.latency = self._get_latency(call_event)
+        return call_event
+
+    def _get_latency(self, event):
+        return self._get_current_time() - event.timestamp
+
+    def _get_current_time(self):
+        return int(self._time() * 1000)
+
+
+class BaseMonitorEvent:
+    def __init__(self, service, operation, timestamp):
+        """Base monitor event
+
+        :type service: str
+        :param service: A string identifying the service associated to
+            the event
+
+        :type operation: str
+        :param operation: A string identifying the operation of service
+            associated to the event
+
+        :type timestamp: int
+        :param timestamp: Epoch time in milliseconds from when the event began
+        """
+        self.service = service
+        self.operation = operation
+        self.timestamp = timestamp
+
+    def __repr__(self):
+        return f'{self.__class__.__name__}({self.__dict__!r})'
+
+    def __eq__(self, other):
+        if isinstance(other, self.__class__):
+            return self.__dict__ == other.__dict__
+        return False
+
+
+class APICallEvent(BaseMonitorEvent):
+    def __init__(
+        self,
+        service,
+        operation,
+        timestamp,
+        latency=None,
+        attempts=None,
+        retries_exceeded=False,
+    ):
+        """Monitor event for a single API call
+
+        This event corresponds to a single client method call, which includes
+        every HTTP requests attempt made in order to complete the client call
+
+        :type service: str
+        :param service: A string identifying the service associated to
+            the event
+
+        :type operation: str
+        :param operation: A string identifying the operation of service
+            associated to the event
+
+        :type timestamp: int
+        :param timestamp: Epoch time in milliseconds from when the event began
+
+        :type latency: int
+        :param latency: The time in milliseconds to complete the client call
+
+        :type attempts: list
+        :param attempts: The list of APICallAttempts associated to the
+            APICall
+
+        :type retries_exceeded: bool
+        :param retries_exceeded: True if API call exceeded retries. False
+            otherwise
+        """
+        super().__init__(
+            service=service, operation=operation, timestamp=timestamp
+        )
+        self.latency = latency
+        self.attempts = attempts
+        if attempts is None:
+            self.attempts = []
+        self.retries_exceeded = retries_exceeded
+
+    def new_api_call_attempt(self, timestamp):
+        """Instantiates APICallAttemptEvent associated to the APICallEvent
+
+        :type timestamp: int
+        :param timestamp: Epoch time in milliseconds to associate to the
+            APICallAttemptEvent
+        """
+        attempt_event = APICallAttemptEvent(
+            service=self.service, operation=self.operation, timestamp=timestamp
+        )
+        self.attempts.append(attempt_event)
+        return attempt_event
+
+
+class APICallAttemptEvent(BaseMonitorEvent):
+    def __init__(
+        self,
+        service,
+        operation,
+        timestamp,
+        latency=None,
+        url=None,
+        http_status_code=None,
+        request_headers=None,
+        response_headers=None,
+        parsed_error=None,
+        wire_exception=None,
+    ):
+        """Monitor event for a single API call attempt
+
+        This event corresponds to a single HTTP request attempt in completing
+        the entire client method call.
+
+        :type service: str
+        :param service: A string identifying the service associated to
+            the event
+
+        :type operation: str
+        :param operation: A string identifying the operation of service
+            associated to the event
+
+        :type timestamp: int
+        :param timestamp: Epoch time in milliseconds from when the HTTP request
+            started
+
+        :type latency: int
+        :param latency: The time in milliseconds to complete the HTTP request
+            whether it succeeded or failed
+
+        :type url: str
+        :param url: The URL the attempt was sent to
+
+        :type http_status_code: int
+        :param http_status_code: The HTTP status code of the HTTP response
+            if there was a response
+
+        :type request_headers: dict
+        :param request_headers: The HTTP headers sent in making the HTTP
+            request
+
+        :type response_headers: dict
+        :param response_headers: The HTTP headers returned in the HTTP response
+            if there was a response
+
+        :type parsed_error: dict
+        :param parsed_error: The error parsed if the service returned an
+            error back
+
+        :type wire_exception: Exception
+        :param wire_exception: The exception raised in sending the HTTP
+            request (i.e. ConnectionError)
+        """
+        super().__init__(
+            service=service, operation=operation, timestamp=timestamp
+        )
+        self.latency = latency
+        self.url = url
+        self.http_status_code = http_status_code
+        self.request_headers = request_headers
+        self.response_headers = response_headers
+        self.parsed_error = parsed_error
+        self.wire_exception = wire_exception
+
+
+class CSMSerializer:
+    _MAX_CLIENT_ID_LENGTH = 255
+    _MAX_EXCEPTION_CLASS_LENGTH = 128
+    _MAX_ERROR_CODE_LENGTH = 128
+    _MAX_USER_AGENT_LENGTH = 256
+    _MAX_MESSAGE_LENGTH = 512
+    _RESPONSE_HEADERS_TO_EVENT_ENTRIES = {
+        'x-amzn-requestid': 'XAmznRequestId',
+        'x-amz-request-id': 'XAmzRequestId',
+        'x-amz-id-2': 'XAmzId2',
+    }
+    _AUTH_REGEXS = {
+        'v4': re.compile(
+            r'AWS4-HMAC-SHA256 '
+            r'Credential=(?P<access_key>\w+)/\d+/'
+            r'(?P<signing_region>[a-z0-9-]+)/'
+        ),
+        's3': re.compile(r'AWS (?P<access_key>\w+):'),
+    }
+    _SERIALIZEABLE_EVENT_PROPERTIES = [
+        'service',
+        'operation',
+        'timestamp',
+        'attempts',
+        'latency',
+        'retries_exceeded',
+        'url',
+        'request_headers',
+        'http_status_code',
+        'response_headers',
+        'parsed_error',
+        'wire_exception',
+    ]
+
+    def __init__(self, csm_client_id):
+        """Serializes monitor events to CSM (Client Side Monitoring) format
+
+        :type csm_client_id: str
+        :param csm_client_id: The application identifier to associate
+            to the serialized events
+        """
+        self._validate_client_id(csm_client_id)
+        self.csm_client_id = csm_client_id
+
+    def _validate_client_id(self, csm_client_id):
+        if len(csm_client_id) > self._MAX_CLIENT_ID_LENGTH:
+            raise ValueError(
+                f'The value provided for csm_client_id: {csm_client_id} exceeds '
+                f'the maximum length of {self._MAX_CLIENT_ID_LENGTH} characters'
+            )
+
+    def serialize(self, event):
+        """Serializes a monitor event to the CSM format
+
+        :type event: BaseMonitorEvent
+        :param event: The event to serialize to bytes
+
+        :rtype: bytes
+        :returns: The CSM serialized form of the event
+        """
+        event_dict = self._get_base_event_dict(event)
+        event_type = self._get_event_type(event)
+        event_dict['Type'] = event_type
+        for attr in self._SERIALIZEABLE_EVENT_PROPERTIES:
+            value = getattr(event, attr, None)
+            if value is not None:
+                getattr(self, '_serialize_' + attr)(
+                    value, event_dict, event_type=event_type
+                )
+        return ensure_bytes(json.dumps(event_dict, separators=(',', ':')))
+
+    def _get_base_event_dict(self, event):
+        return {
+            'Version': 1,
+            'ClientId': self.csm_client_id,
+        }
+
+    def _serialize_service(self, service, event_dict, **kwargs):
+        event_dict['Service'] = service
+
+    def _serialize_operation(self, operation, event_dict, **kwargs):
+        event_dict['Api'] = operation
+
+    def _serialize_timestamp(self, timestamp, event_dict, **kwargs):
+        event_dict['Timestamp'] = timestamp
+
+    def _serialize_attempts(self, attempts, event_dict, **kwargs):
+        event_dict['AttemptCount'] = len(attempts)
+        if attempts:
+            self._add_fields_from_last_attempt(event_dict, attempts[-1])
+
+    def _add_fields_from_last_attempt(self, event_dict, last_attempt):
+        if last_attempt.request_headers:
+            # It does not matter which attempt to use to grab the region
+            # for the ApiCall event, but SDKs typically do the last one.
+            region = self._get_region(last_attempt.request_headers)
+            if region is not None:
+                event_dict['Region'] = region
+            event_dict['UserAgent'] = self._get_user_agent(
+                last_attempt.request_headers
+            )
+        if last_attempt.http_status_code is not None:
+            event_dict['FinalHttpStatusCode'] = last_attempt.http_status_code
+        if last_attempt.parsed_error is not None:
+            self._serialize_parsed_error(
+                last_attempt.parsed_error, event_dict, 'ApiCall'
+            )
+        if last_attempt.wire_exception is not None:
+            self._serialize_wire_exception(
+                last_attempt.wire_exception, event_dict, 'ApiCall'
+            )
+
+    def _serialize_latency(self, latency, event_dict, event_type):
+        if event_type == 'ApiCall':
+            event_dict['Latency'] = latency
+        elif event_type == 'ApiCallAttempt':
+            event_dict['AttemptLatency'] = latency
+
+    def _serialize_retries_exceeded(
+        self, retries_exceeded, event_dict, **kwargs
+    ):
+        event_dict['MaxRetriesExceeded'] = 1 if retries_exceeded else 0
+
+    def _serialize_url(self, url, event_dict, **kwargs):
+        event_dict['Fqdn'] = urlparse(url).netloc
+
+    def _serialize_request_headers(
+        self, request_headers, event_dict, **kwargs
+    ):
+        event_dict['UserAgent'] = self._get_user_agent(request_headers)
+        if self._is_signed(request_headers):
+            event_dict['AccessKey'] = self._get_access_key(request_headers)
+        region = self._get_region(request_headers)
+        if region is not None:
+            event_dict['Region'] = region
+        if 'X-Amz-Security-Token' in request_headers:
+            event_dict['SessionToken'] = request_headers[
+                'X-Amz-Security-Token'
+            ]
+
+    def _serialize_http_status_code(
+        self, http_status_code, event_dict, **kwargs
+    ):
+        event_dict['HttpStatusCode'] = http_status_code
+
+    def _serialize_response_headers(
+        self, response_headers, event_dict, **kwargs
+    ):
+        for header, entry in self._RESPONSE_HEADERS_TO_EVENT_ENTRIES.items():
+            if header in response_headers:
+                event_dict[entry] = response_headers[header]
+
+    def _serialize_parsed_error(
+        self, parsed_error, event_dict, event_type, **kwargs
+    ):
+        field_prefix = 'Final' if event_type == 'ApiCall' else ''
+        event_dict[field_prefix + 'AwsException'] = self._truncate(
+            parsed_error['Code'], self._MAX_ERROR_CODE_LENGTH
+        )
+        event_dict[field_prefix + 'AwsExceptionMessage'] = self._truncate(
+            parsed_error['Message'], self._MAX_MESSAGE_LENGTH
+        )
+
+    def _serialize_wire_exception(
+        self, wire_exception, event_dict, event_type, **kwargs
+    ):
+        field_prefix = 'Final' if event_type == 'ApiCall' else ''
+        event_dict[field_prefix + 'SdkException'] = self._truncate(
+            wire_exception.__class__.__name__, self._MAX_EXCEPTION_CLASS_LENGTH
+        )
+        event_dict[field_prefix + 'SdkExceptionMessage'] = self._truncate(
+            str(wire_exception), self._MAX_MESSAGE_LENGTH
+        )
+
+    def _get_event_type(self, event):
+        if isinstance(event, APICallEvent):
+            return 'ApiCall'
+        elif isinstance(event, APICallAttemptEvent):
+            return 'ApiCallAttempt'
+
+    def _get_access_key(self, request_headers):
+        auth_val = self._get_auth_value(request_headers)
+        _, auth_match = self._get_auth_match(auth_val)
+        return auth_match.group('access_key')
+
+    def _get_region(self, request_headers):
+        if not self._is_signed(request_headers):
+            return None
+        auth_val = self._get_auth_value(request_headers)
+        signature_version, auth_match = self._get_auth_match(auth_val)
+        if signature_version != 'v4':
+            return None
+        return auth_match.group('signing_region')
+
+    def _get_user_agent(self, request_headers):
+        return self._truncate(
+            ensure_unicode(request_headers.get('User-Agent', '')),
+            self._MAX_USER_AGENT_LENGTH,
+        )
+
+    def _is_signed(self, request_headers):
+        return 'Authorization' in request_headers
+
+    def _get_auth_value(self, request_headers):
+        return ensure_unicode(request_headers['Authorization'])
+
+    def _get_auth_match(self, auth_val):
+        for signature_version, regex in self._AUTH_REGEXS.items():
+            match = regex.match(auth_val)
+            if match:
+                return signature_version, match
+        return None, None
+
+    def _truncate(self, text, max_length):
+        if len(text) > max_length:
+            logger.debug(
+                'Truncating following value to maximum length of ' '%s: %s',
+                text,
+                max_length,
+            )
+            return text[:max_length]
+        return text
+
+
+class SocketPublisher:
+    _MAX_MONITOR_EVENT_LENGTH = 8 * 1024
+
+    def __init__(self, socket, host, port, serializer):
+        """Publishes monitor events to a socket
+
+        :type socket: socket.socket
+        :param socket: The socket object to use to publish events
+
+        :type host: string
+        :param host: The host to send events to
+
+        :type port: integer
+        :param port: The port on the host to send events to
+
+        :param serializer: The serializer to use to serialize the event
+            to a form that can be published to the socket. This must
+            have a `serialize()` method that accepts a monitor event
+            and return bytes
+        """
+        self._socket = socket
+        self._address = (host, port)
+        self._serializer = serializer
+
+    def publish(self, event):
+        """Publishes a specified monitor event
+
+        :type event: BaseMonitorEvent
+        :param event: The monitor event to be sent
+            over the publisher's socket to the desired address.
+        """
+        serialized_event = self._serializer.serialize(event)
+        if len(serialized_event) > self._MAX_MONITOR_EVENT_LENGTH:
+            logger.debug(
+                'Serialized event of size %s exceeds the maximum length '
+                'allowed: %s. Not sending event to socket.',
+                len(serialized_event),
+                self._MAX_MONITOR_EVENT_LENGTH,
+            )
+            return
+        self._socket.sendto(serialized_event, self._address)