aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/botocore/monitoring.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/botocore/monitoring.py')
-rw-r--r--.venv/lib/python3.12/site-packages/botocore/monitoring.py586
1 files changed, 586 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/botocore/monitoring.py b/.venv/lib/python3.12/site-packages/botocore/monitoring.py
new file mode 100644
index 00000000..71d72302
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/botocore/monitoring.py
@@ -0,0 +1,586 @@
+# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+# http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+import json
+import logging
+import re
+import time
+
+from botocore.compat import ensure_bytes, ensure_unicode, urlparse
+from botocore.retryhandler import EXCEPTION_MAP as RETRYABLE_EXCEPTIONS
+
+logger = logging.getLogger(__name__)
+
+
+class Monitor:
+ _EVENTS_TO_REGISTER = [
+ 'before-parameter-build',
+ 'request-created',
+ 'response-received',
+ 'after-call',
+ 'after-call-error',
+ ]
+
+ def __init__(self, adapter, publisher):
+ """Abstraction for monitoring clients API calls
+
+ :param adapter: An adapter that takes event emitter events
+ and produces monitor events
+
+ :param publisher: A publisher for generated monitor events
+ """
+ self._adapter = adapter
+ self._publisher = publisher
+
+ def register(self, event_emitter):
+ """Register an event emitter to the monitor"""
+ for event_to_register in self._EVENTS_TO_REGISTER:
+ event_emitter.register_last(event_to_register, self.capture)
+
+ def capture(self, event_name, **payload):
+ """Captures an incoming event from the event emitter
+
+ It will feed an event emitter event to the monitor's adaptor to create
+ a monitor event and then publish that event to the monitor's publisher.
+ """
+ try:
+ monitor_event = self._adapter.feed(event_name, payload)
+ if monitor_event:
+ self._publisher.publish(monitor_event)
+ except Exception as e:
+ logger.debug(
+ 'Exception %s raised by client monitor in handling event %s',
+ e,
+ event_name,
+ exc_info=True,
+ )
+
+
+class MonitorEventAdapter:
+ def __init__(self, time=time.time):
+ """Adapts event emitter events to produce monitor events
+
+ :type time: callable
+ :param time: A callable that produces the current time
+ """
+ self._time = time
+
+ def feed(self, emitter_event_name, emitter_payload):
+ """Feed an event emitter event to generate a monitor event
+
+ :type emitter_event_name: str
+ :param emitter_event_name: The name of the event emitted
+
+ :type emitter_payload: dict
+ :param emitter_payload: The payload to associated to the event
+ emitted
+
+ :rtype: BaseMonitorEvent
+ :returns: A monitor event based on the event emitter events
+ fired
+ """
+ return self._get_handler(emitter_event_name)(**emitter_payload)
+
+ def _get_handler(self, event_name):
+ return getattr(
+ self, '_handle_' + event_name.split('.')[0].replace('-', '_')
+ )
+
+ def _handle_before_parameter_build(self, model, context, **kwargs):
+ context['current_api_call_event'] = APICallEvent(
+ service=model.service_model.service_id,
+ operation=model.wire_name,
+ timestamp=self._get_current_time(),
+ )
+
+ def _handle_request_created(self, request, **kwargs):
+ context = request.context
+ new_attempt_event = context[
+ 'current_api_call_event'
+ ].new_api_call_attempt(timestamp=self._get_current_time())
+ new_attempt_event.request_headers = request.headers
+ new_attempt_event.url = request.url
+ context['current_api_call_attempt_event'] = new_attempt_event
+
+ def _handle_response_received(
+ self, parsed_response, context, exception, **kwargs
+ ):
+ attempt_event = context.pop('current_api_call_attempt_event')
+ attempt_event.latency = self._get_latency(attempt_event)
+ if parsed_response is not None:
+ attempt_event.http_status_code = parsed_response[
+ 'ResponseMetadata'
+ ]['HTTPStatusCode']
+ attempt_event.response_headers = parsed_response[
+ 'ResponseMetadata'
+ ]['HTTPHeaders']
+ attempt_event.parsed_error = parsed_response.get('Error')
+ else:
+ attempt_event.wire_exception = exception
+ return attempt_event
+
+ def _handle_after_call(self, context, parsed, **kwargs):
+ context['current_api_call_event'].retries_exceeded = parsed[
+ 'ResponseMetadata'
+ ].get('MaxAttemptsReached', False)
+ return self._complete_api_call(context)
+
+ def _handle_after_call_error(self, context, exception, **kwargs):
+ # If the after-call-error was emitted and the error being raised
+ # was a retryable connection error, then the retries must have exceeded
+ # for that exception as this event gets emitted **after** retries
+ # happen.
+ context[
+ 'current_api_call_event'
+ ].retries_exceeded = self._is_retryable_exception(exception)
+ return self._complete_api_call(context)
+
+ def _is_retryable_exception(self, exception):
+ return isinstance(
+ exception, tuple(RETRYABLE_EXCEPTIONS['GENERAL_CONNECTION_ERROR'])
+ )
+
+ def _complete_api_call(self, context):
+ call_event = context.pop('current_api_call_event')
+ call_event.latency = self._get_latency(call_event)
+ return call_event
+
+ def _get_latency(self, event):
+ return self._get_current_time() - event.timestamp
+
+ def _get_current_time(self):
+ return int(self._time() * 1000)
+
+
+class BaseMonitorEvent:
+ def __init__(self, service, operation, timestamp):
+ """Base monitor event
+
+ :type service: str
+ :param service: A string identifying the service associated to
+ the event
+
+ :type operation: str
+ :param operation: A string identifying the operation of service
+ associated to the event
+
+ :type timestamp: int
+ :param timestamp: Epoch time in milliseconds from when the event began
+ """
+ self.service = service
+ self.operation = operation
+ self.timestamp = timestamp
+
+ def __repr__(self):
+ return f'{self.__class__.__name__}({self.__dict__!r})'
+
+ def __eq__(self, other):
+ if isinstance(other, self.__class__):
+ return self.__dict__ == other.__dict__
+ return False
+
+
+class APICallEvent(BaseMonitorEvent):
+ def __init__(
+ self,
+ service,
+ operation,
+ timestamp,
+ latency=None,
+ attempts=None,
+ retries_exceeded=False,
+ ):
+ """Monitor event for a single API call
+
+ This event corresponds to a single client method call, which includes
+ every HTTP requests attempt made in order to complete the client call
+
+ :type service: str
+ :param service: A string identifying the service associated to
+ the event
+
+ :type operation: str
+ :param operation: A string identifying the operation of service
+ associated to the event
+
+ :type timestamp: int
+ :param timestamp: Epoch time in milliseconds from when the event began
+
+ :type latency: int
+ :param latency: The time in milliseconds to complete the client call
+
+ :type attempts: list
+ :param attempts: The list of APICallAttempts associated to the
+ APICall
+
+ :type retries_exceeded: bool
+ :param retries_exceeded: True if API call exceeded retries. False
+ otherwise
+ """
+ super().__init__(
+ service=service, operation=operation, timestamp=timestamp
+ )
+ self.latency = latency
+ self.attempts = attempts
+ if attempts is None:
+ self.attempts = []
+ self.retries_exceeded = retries_exceeded
+
+ def new_api_call_attempt(self, timestamp):
+ """Instantiates APICallAttemptEvent associated to the APICallEvent
+
+ :type timestamp: int
+ :param timestamp: Epoch time in milliseconds to associate to the
+ APICallAttemptEvent
+ """
+ attempt_event = APICallAttemptEvent(
+ service=self.service, operation=self.operation, timestamp=timestamp
+ )
+ self.attempts.append(attempt_event)
+ return attempt_event
+
+
+class APICallAttemptEvent(BaseMonitorEvent):
+ def __init__(
+ self,
+ service,
+ operation,
+ timestamp,
+ latency=None,
+ url=None,
+ http_status_code=None,
+ request_headers=None,
+ response_headers=None,
+ parsed_error=None,
+ wire_exception=None,
+ ):
+ """Monitor event for a single API call attempt
+
+ This event corresponds to a single HTTP request attempt in completing
+ the entire client method call.
+
+ :type service: str
+ :param service: A string identifying the service associated to
+ the event
+
+ :type operation: str
+ :param operation: A string identifying the operation of service
+ associated to the event
+
+ :type timestamp: int
+ :param timestamp: Epoch time in milliseconds from when the HTTP request
+ started
+
+ :type latency: int
+ :param latency: The time in milliseconds to complete the HTTP request
+ whether it succeeded or failed
+
+ :type url: str
+ :param url: The URL the attempt was sent to
+
+ :type http_status_code: int
+ :param http_status_code: The HTTP status code of the HTTP response
+ if there was a response
+
+ :type request_headers: dict
+ :param request_headers: The HTTP headers sent in making the HTTP
+ request
+
+ :type response_headers: dict
+ :param response_headers: The HTTP headers returned in the HTTP response
+ if there was a response
+
+ :type parsed_error: dict
+ :param parsed_error: The error parsed if the service returned an
+ error back
+
+ :type wire_exception: Exception
+ :param wire_exception: The exception raised in sending the HTTP
+ request (i.e. ConnectionError)
+ """
+ super().__init__(
+ service=service, operation=operation, timestamp=timestamp
+ )
+ self.latency = latency
+ self.url = url
+ self.http_status_code = http_status_code
+ self.request_headers = request_headers
+ self.response_headers = response_headers
+ self.parsed_error = parsed_error
+ self.wire_exception = wire_exception
+
+
+class CSMSerializer:
+ _MAX_CLIENT_ID_LENGTH = 255
+ _MAX_EXCEPTION_CLASS_LENGTH = 128
+ _MAX_ERROR_CODE_LENGTH = 128
+ _MAX_USER_AGENT_LENGTH = 256
+ _MAX_MESSAGE_LENGTH = 512
+ _RESPONSE_HEADERS_TO_EVENT_ENTRIES = {
+ 'x-amzn-requestid': 'XAmznRequestId',
+ 'x-amz-request-id': 'XAmzRequestId',
+ 'x-amz-id-2': 'XAmzId2',
+ }
+ _AUTH_REGEXS = {
+ 'v4': re.compile(
+ r'AWS4-HMAC-SHA256 '
+ r'Credential=(?P<access_key>\w+)/\d+/'
+ r'(?P<signing_region>[a-z0-9-]+)/'
+ ),
+ 's3': re.compile(r'AWS (?P<access_key>\w+):'),
+ }
+ _SERIALIZEABLE_EVENT_PROPERTIES = [
+ 'service',
+ 'operation',
+ 'timestamp',
+ 'attempts',
+ 'latency',
+ 'retries_exceeded',
+ 'url',
+ 'request_headers',
+ 'http_status_code',
+ 'response_headers',
+ 'parsed_error',
+ 'wire_exception',
+ ]
+
+ def __init__(self, csm_client_id):
+ """Serializes monitor events to CSM (Client Side Monitoring) format
+
+ :type csm_client_id: str
+ :param csm_client_id: The application identifier to associate
+ to the serialized events
+ """
+ self._validate_client_id(csm_client_id)
+ self.csm_client_id = csm_client_id
+
+ def _validate_client_id(self, csm_client_id):
+ if len(csm_client_id) > self._MAX_CLIENT_ID_LENGTH:
+ raise ValueError(
+ f'The value provided for csm_client_id: {csm_client_id} exceeds '
+ f'the maximum length of {self._MAX_CLIENT_ID_LENGTH} characters'
+ )
+
+ def serialize(self, event):
+ """Serializes a monitor event to the CSM format
+
+ :type event: BaseMonitorEvent
+ :param event: The event to serialize to bytes
+
+ :rtype: bytes
+ :returns: The CSM serialized form of the event
+ """
+ event_dict = self._get_base_event_dict(event)
+ event_type = self._get_event_type(event)
+ event_dict['Type'] = event_type
+ for attr in self._SERIALIZEABLE_EVENT_PROPERTIES:
+ value = getattr(event, attr, None)
+ if value is not None:
+ getattr(self, '_serialize_' + attr)(
+ value, event_dict, event_type=event_type
+ )
+ return ensure_bytes(json.dumps(event_dict, separators=(',', ':')))
+
+ def _get_base_event_dict(self, event):
+ return {
+ 'Version': 1,
+ 'ClientId': self.csm_client_id,
+ }
+
+ def _serialize_service(self, service, event_dict, **kwargs):
+ event_dict['Service'] = service
+
+ def _serialize_operation(self, operation, event_dict, **kwargs):
+ event_dict['Api'] = operation
+
+ def _serialize_timestamp(self, timestamp, event_dict, **kwargs):
+ event_dict['Timestamp'] = timestamp
+
+ def _serialize_attempts(self, attempts, event_dict, **kwargs):
+ event_dict['AttemptCount'] = len(attempts)
+ if attempts:
+ self._add_fields_from_last_attempt(event_dict, attempts[-1])
+
+ def _add_fields_from_last_attempt(self, event_dict, last_attempt):
+ if last_attempt.request_headers:
+ # It does not matter which attempt to use to grab the region
+ # for the ApiCall event, but SDKs typically do the last one.
+ region = self._get_region(last_attempt.request_headers)
+ if region is not None:
+ event_dict['Region'] = region
+ event_dict['UserAgent'] = self._get_user_agent(
+ last_attempt.request_headers
+ )
+ if last_attempt.http_status_code is not None:
+ event_dict['FinalHttpStatusCode'] = last_attempt.http_status_code
+ if last_attempt.parsed_error is not None:
+ self._serialize_parsed_error(
+ last_attempt.parsed_error, event_dict, 'ApiCall'
+ )
+ if last_attempt.wire_exception is not None:
+ self._serialize_wire_exception(
+ last_attempt.wire_exception, event_dict, 'ApiCall'
+ )
+
+ def _serialize_latency(self, latency, event_dict, event_type):
+ if event_type == 'ApiCall':
+ event_dict['Latency'] = latency
+ elif event_type == 'ApiCallAttempt':
+ event_dict['AttemptLatency'] = latency
+
+ def _serialize_retries_exceeded(
+ self, retries_exceeded, event_dict, **kwargs
+ ):
+ event_dict['MaxRetriesExceeded'] = 1 if retries_exceeded else 0
+
+ def _serialize_url(self, url, event_dict, **kwargs):
+ event_dict['Fqdn'] = urlparse(url).netloc
+
+ def _serialize_request_headers(
+ self, request_headers, event_dict, **kwargs
+ ):
+ event_dict['UserAgent'] = self._get_user_agent(request_headers)
+ if self._is_signed(request_headers):
+ event_dict['AccessKey'] = self._get_access_key(request_headers)
+ region = self._get_region(request_headers)
+ if region is not None:
+ event_dict['Region'] = region
+ if 'X-Amz-Security-Token' in request_headers:
+ event_dict['SessionToken'] = request_headers[
+ 'X-Amz-Security-Token'
+ ]
+
+ def _serialize_http_status_code(
+ self, http_status_code, event_dict, **kwargs
+ ):
+ event_dict['HttpStatusCode'] = http_status_code
+
+ def _serialize_response_headers(
+ self, response_headers, event_dict, **kwargs
+ ):
+ for header, entry in self._RESPONSE_HEADERS_TO_EVENT_ENTRIES.items():
+ if header in response_headers:
+ event_dict[entry] = response_headers[header]
+
+ def _serialize_parsed_error(
+ self, parsed_error, event_dict, event_type, **kwargs
+ ):
+ field_prefix = 'Final' if event_type == 'ApiCall' else ''
+ event_dict[field_prefix + 'AwsException'] = self._truncate(
+ parsed_error['Code'], self._MAX_ERROR_CODE_LENGTH
+ )
+ event_dict[field_prefix + 'AwsExceptionMessage'] = self._truncate(
+ parsed_error['Message'], self._MAX_MESSAGE_LENGTH
+ )
+
+ def _serialize_wire_exception(
+ self, wire_exception, event_dict, event_type, **kwargs
+ ):
+ field_prefix = 'Final' if event_type == 'ApiCall' else ''
+ event_dict[field_prefix + 'SdkException'] = self._truncate(
+ wire_exception.__class__.__name__, self._MAX_EXCEPTION_CLASS_LENGTH
+ )
+ event_dict[field_prefix + 'SdkExceptionMessage'] = self._truncate(
+ str(wire_exception), self._MAX_MESSAGE_LENGTH
+ )
+
+ def _get_event_type(self, event):
+ if isinstance(event, APICallEvent):
+ return 'ApiCall'
+ elif isinstance(event, APICallAttemptEvent):
+ return 'ApiCallAttempt'
+
+ def _get_access_key(self, request_headers):
+ auth_val = self._get_auth_value(request_headers)
+ _, auth_match = self._get_auth_match(auth_val)
+ return auth_match.group('access_key')
+
+ def _get_region(self, request_headers):
+ if not self._is_signed(request_headers):
+ return None
+ auth_val = self._get_auth_value(request_headers)
+ signature_version, auth_match = self._get_auth_match(auth_val)
+ if signature_version != 'v4':
+ return None
+ return auth_match.group('signing_region')
+
+ def _get_user_agent(self, request_headers):
+ return self._truncate(
+ ensure_unicode(request_headers.get('User-Agent', '')),
+ self._MAX_USER_AGENT_LENGTH,
+ )
+
+ def _is_signed(self, request_headers):
+ return 'Authorization' in request_headers
+
+ def _get_auth_value(self, request_headers):
+ return ensure_unicode(request_headers['Authorization'])
+
+ def _get_auth_match(self, auth_val):
+ for signature_version, regex in self._AUTH_REGEXS.items():
+ match = regex.match(auth_val)
+ if match:
+ return signature_version, match
+ return None, None
+
+ def _truncate(self, text, max_length):
+ if len(text) > max_length:
+ logger.debug(
+ 'Truncating following value to maximum length of ' '%s: %s',
+ text,
+ max_length,
+ )
+ return text[:max_length]
+ return text
+
+
+class SocketPublisher:
+ _MAX_MONITOR_EVENT_LENGTH = 8 * 1024
+
+ def __init__(self, socket, host, port, serializer):
+ """Publishes monitor events to a socket
+
+ :type socket: socket.socket
+ :param socket: The socket object to use to publish events
+
+ :type host: string
+ :param host: The host to send events to
+
+ :type port: integer
+ :param port: The port on the host to send events to
+
+ :param serializer: The serializer to use to serialize the event
+ to a form that can be published to the socket. This must
+ have a `serialize()` method that accepts a monitor event
+ and return bytes
+ """
+ self._socket = socket
+ self._address = (host, port)
+ self._serializer = serializer
+
+ def publish(self, event):
+ """Publishes a specified monitor event
+
+ :type event: BaseMonitorEvent
+ :param event: The monitor event to be sent
+ over the publisher's socket to the desired address.
+ """
+ serialized_event = self._serializer.serialize(event)
+ if len(serialized_event) > self._MAX_MONITOR_EVENT_LENGTH:
+ logger.debug(
+ 'Serialized event of size %s exceeds the maximum length '
+ 'allowed: %s. Not sending event to socket.',
+ len(serialized_event),
+ self._MAX_MONITOR_EVENT_LENGTH,
+ )
+ return
+ self._socket.sendto(serialized_event, self._address)