aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/botocore/validate.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/botocore/validate.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are hereHEADmaster
Diffstat (limited to '.venv/lib/python3.12/site-packages/botocore/validate.py')
-rw-r--r--.venv/lib/python3.12/site-packages/botocore/validate.py384
1 files changed, 384 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/botocore/validate.py b/.venv/lib/python3.12/site-packages/botocore/validate.py
new file mode 100644
index 00000000..82aabd66
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/botocore/validate.py
@@ -0,0 +1,384 @@
+"""User input parameter validation.
+
+This module handles user input parameter validation
+against a provided input model.
+
+Note that the objects in this module do *not* mutate any
+arguments. No type version happens here. It is up to another
+layer to properly convert arguments to any required types.
+
+Validation Errors
+-----------------
+
+
+"""
+
+import decimal
+import json
+from datetime import datetime
+
+from botocore.exceptions import ParamValidationError
+from botocore.utils import is_json_value_header, parse_to_aware_datetime
+
+
+def validate_parameters(params, shape):
+ """Validates input parameters against a schema.
+
+ This is a convenience function that validates parameters against a schema.
+ You can also instantiate and use the ParamValidator class directly if you
+ want more control.
+
+ If there are any validation errors then a ParamValidationError
+ will be raised. If there are no validation errors than no exception
+ is raised and a value of None is returned.
+
+ :param params: The user provided input parameters.
+
+ :type shape: botocore.model.Shape
+ :param shape: The schema which the input parameters should
+ adhere to.
+
+ :raise: ParamValidationError
+
+ """
+ validator = ParamValidator()
+ report = validator.validate(params, shape)
+ if report.has_errors():
+ raise ParamValidationError(report=report.generate_report())
+
+
+def type_check(valid_types):
+ def _create_type_check_guard(func):
+ def _on_passes_type_check(self, param, shape, errors, name):
+ if _type_check(param, errors, name):
+ return func(self, param, shape, errors, name)
+
+ def _type_check(param, errors, name):
+ if not isinstance(param, valid_types):
+ valid_type_names = [str(t) for t in valid_types]
+ errors.report(
+ name,
+ 'invalid type',
+ param=param,
+ valid_types=valid_type_names,
+ )
+ return False
+ return True
+
+ return _on_passes_type_check
+
+ return _create_type_check_guard
+
+
+def range_check(name, value, shape, error_type, errors):
+ failed = False
+ min_allowed = float('-inf')
+ if 'min' in shape.metadata:
+ min_allowed = shape.metadata['min']
+ if value < min_allowed:
+ failed = True
+ elif hasattr(shape, 'serialization'):
+ # Members that can be bound to the host have an implicit min of 1
+ if shape.serialization.get('hostLabel'):
+ min_allowed = 1
+ if value < min_allowed:
+ failed = True
+ if failed:
+ errors.report(name, error_type, param=value, min_allowed=min_allowed)
+
+
+class ValidationErrors:
+ def __init__(self):
+ self._errors = []
+
+ def has_errors(self):
+ if self._errors:
+ return True
+ return False
+
+ def generate_report(self):
+ error_messages = []
+ for error in self._errors:
+ error_messages.append(self._format_error(error))
+ return '\n'.join(error_messages)
+
+ def _format_error(self, error):
+ error_type, name, additional = error
+ name = self._get_name(name)
+ if error_type == 'missing required field':
+ return (
+ f"Missing required parameter in {name}: "
+ f"\"{additional['required_name']}\""
+ )
+ elif error_type == 'unknown field':
+ unknown_param = additional['unknown_param']
+ valid_names = ', '.join(additional['valid_names'])
+ return (
+ f'Unknown parameter in {name}: "{unknown_param}", '
+ f'must be one of: {valid_names}'
+ )
+ elif error_type == 'invalid type':
+ param = additional['param']
+ param_type = type(param)
+ valid_types = ', '.join(additional['valid_types'])
+ return (
+ f'Invalid type for parameter {name}, value: {param}, '
+ f'type: {param_type}, valid types: {valid_types}'
+ )
+ elif error_type == 'invalid range':
+ param = additional['param']
+ min_allowed = additional['min_allowed']
+ return (
+ f'Invalid value for parameter {name}, value: {param}, '
+ f'valid min value: {min_allowed}'
+ )
+ elif error_type == 'invalid length':
+ param = additional['param']
+ min_allowed = additional['min_allowed']
+ return (
+ f'Invalid length for parameter {name}, value: {param}, '
+ f'valid min length: {min_allowed}'
+ )
+ elif error_type == 'unable to encode to json':
+ return 'Invalid parameter {} must be json serializable: {}'.format(
+ name,
+ additional['type_error'],
+ )
+ elif error_type == 'invalid type for document':
+ param = additional['param']
+ param_type = type(param)
+ valid_types = ', '.join(additional['valid_types'])
+ return (
+ f'Invalid type for document parameter {name}, value: {param}, '
+ f'type: {param_type}, valid types: {valid_types}'
+ )
+ elif error_type == 'more than one input':
+ members = ', '.join(additional['members'])
+ return (
+ f'Invalid number of parameters set for tagged union structure '
+ f'{name}. Can only set one of the following keys: '
+ f'{members}.'
+ )
+ elif error_type == 'empty input':
+ members = ', '.join(additional['members'])
+ return (
+ f'Must set one of the following keys for tagged union'
+ f'structure {name}: {members}.'
+ )
+
+ def _get_name(self, name):
+ if not name:
+ return 'input'
+ elif name.startswith('.'):
+ return name[1:]
+ else:
+ return name
+
+ def report(self, name, reason, **kwargs):
+ self._errors.append((reason, name, kwargs))
+
+
+class ParamValidator:
+ """Validates parameters against a shape model."""
+
+ def validate(self, params, shape):
+ """Validate parameters against a shape model.
+
+ This method will validate the parameters against a provided shape model.
+ All errors will be collected before returning to the caller. This means
+ that this method will not stop at the first error, it will return all
+ possible errors.
+
+ :param params: User provided dict of parameters
+ :param shape: A shape model describing the expected input.
+
+ :return: A list of errors.
+
+ """
+ errors = ValidationErrors()
+ self._validate(params, shape, errors, name='')
+ return errors
+
+ def _check_special_validation_cases(self, shape):
+ if is_json_value_header(shape):
+ return self._validate_jsonvalue_string
+ if shape.type_name == 'structure' and shape.is_document_type:
+ return self._validate_document
+
+ def _validate(self, params, shape, errors, name):
+ special_validator = self._check_special_validation_cases(shape)
+ if special_validator:
+ special_validator(params, shape, errors, name)
+ else:
+ getattr(self, f'_validate_{shape.type_name}')(
+ params, shape, errors, name
+ )
+
+ def _validate_jsonvalue_string(self, params, shape, errors, name):
+ # Check to see if a value marked as a jsonvalue can be dumped to
+ # a json string.
+ try:
+ json.dumps(params)
+ except (ValueError, TypeError) as e:
+ errors.report(name, 'unable to encode to json', type_error=e)
+
+ def _validate_document(self, params, shape, errors, name):
+ if params is None:
+ return
+
+ if isinstance(params, dict):
+ for key in params:
+ self._validate_document(params[key], shape, errors, key)
+ elif isinstance(params, list):
+ for index, entity in enumerate(params):
+ self._validate_document(
+ entity, shape, errors, '%s[%d]' % (name, index)
+ )
+ elif not isinstance(params, ((str,), int, bool, float)):
+ valid_types = (str, int, bool, float, list, dict)
+ valid_type_names = [str(t) for t in valid_types]
+ errors.report(
+ name,
+ 'invalid type for document',
+ param=params,
+ param_type=type(params),
+ valid_types=valid_type_names,
+ )
+
+ @type_check(valid_types=(dict,))
+ def _validate_structure(self, params, shape, errors, name):
+ if shape.is_tagged_union:
+ if len(params) == 0:
+ errors.report(name, 'empty input', members=shape.members)
+ elif len(params) > 1:
+ errors.report(
+ name, 'more than one input', members=shape.members
+ )
+
+ # Validate required fields.
+ for required_member in shape.metadata.get('required', []):
+ if required_member not in params:
+ errors.report(
+ name,
+ 'missing required field',
+ required_name=required_member,
+ user_params=params,
+ )
+ members = shape.members
+ known_params = []
+ # Validate known params.
+ for param in params:
+ if param not in members:
+ errors.report(
+ name,
+ 'unknown field',
+ unknown_param=param,
+ valid_names=list(members),
+ )
+ else:
+ known_params.append(param)
+ # Validate structure members.
+ for param in known_params:
+ self._validate(
+ params[param],
+ shape.members[param],
+ errors,
+ f'{name}.{param}',
+ )
+
+ @type_check(valid_types=(str,))
+ def _validate_string(self, param, shape, errors, name):
+ # Validate range. For a string, the min/max constraints
+ # are of the string length.
+ # Looks like:
+ # "WorkflowId":{
+ # "type":"string",
+ # "min":1,
+ # "max":256
+ # }
+ range_check(name, len(param), shape, 'invalid length', errors)
+
+ @type_check(valid_types=(list, tuple))
+ def _validate_list(self, param, shape, errors, name):
+ member_shape = shape.member
+ range_check(name, len(param), shape, 'invalid length', errors)
+ for i, item in enumerate(param):
+ self._validate(item, member_shape, errors, f'{name}[{i}]')
+
+ @type_check(valid_types=(dict,))
+ def _validate_map(self, param, shape, errors, name):
+ key_shape = shape.key
+ value_shape = shape.value
+ for key, value in param.items():
+ self._validate(key, key_shape, errors, f"{name} (key: {key})")
+ self._validate(value, value_shape, errors, f'{name}.{key}')
+
+ @type_check(valid_types=(int,))
+ def _validate_integer(self, param, shape, errors, name):
+ range_check(name, param, shape, 'invalid range', errors)
+
+ def _validate_blob(self, param, shape, errors, name):
+ if isinstance(param, (bytes, bytearray, str)):
+ return
+ elif hasattr(param, 'read'):
+ # File like objects are also allowed for blob types.
+ return
+ else:
+ errors.report(
+ name,
+ 'invalid type',
+ param=param,
+ valid_types=[str(bytes), str(bytearray), 'file-like object'],
+ )
+
+ @type_check(valid_types=(bool,))
+ def _validate_boolean(self, param, shape, errors, name):
+ pass
+
+ @type_check(valid_types=(float, decimal.Decimal) + (int,))
+ def _validate_double(self, param, shape, errors, name):
+ range_check(name, param, shape, 'invalid range', errors)
+
+ _validate_float = _validate_double
+
+ @type_check(valid_types=(int,))
+ def _validate_long(self, param, shape, errors, name):
+ range_check(name, param, shape, 'invalid range', errors)
+
+ def _validate_timestamp(self, param, shape, errors, name):
+ # We don't use @type_check because datetimes are a bit
+ # more flexible. You can either provide a datetime
+ # object, or a string that parses to a datetime.
+ is_valid_type = self._type_check_datetime(param)
+ if not is_valid_type:
+ valid_type_names = [str(datetime), 'timestamp-string']
+ errors.report(
+ name, 'invalid type', param=param, valid_types=valid_type_names
+ )
+
+ def _type_check_datetime(self, value):
+ try:
+ parse_to_aware_datetime(value)
+ return True
+ except (TypeError, ValueError, AttributeError):
+ # Yes, dateutil can sometimes raise an AttributeError
+ # when parsing timestamps.
+ return False
+
+
+class ParamValidationDecorator:
+ def __init__(self, param_validator, serializer):
+ self._param_validator = param_validator
+ self._serializer = serializer
+
+ def serialize_to_request(self, parameters, operation_model):
+ input_shape = operation_model.input_shape
+ if input_shape is not None:
+ report = self._param_validator.validate(
+ parameters, operation_model.input_shape
+ )
+ if report.has_errors():
+ raise ParamValidationError(report=report.generate_report())
+ return self._serializer.serialize_to_request(
+ parameters, operation_model
+ )