aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/botocore/endpoint_provider.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/botocore/endpoint_provider.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are hereHEADmaster
Diffstat (limited to '.venv/lib/python3.12/site-packages/botocore/endpoint_provider.py')
-rw-r--r--.venv/lib/python3.12/site-packages/botocore/endpoint_provider.py723
1 files changed, 723 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/botocore/endpoint_provider.py b/.venv/lib/python3.12/site-packages/botocore/endpoint_provider.py
new file mode 100644
index 00000000..d76f9ac2
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/botocore/endpoint_provider.py
@@ -0,0 +1,723 @@
+# Copyright 2022 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+# http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+
+"""
+NOTE: All classes and functions in this module are considered private and are
+subject to abrupt breaking changes. Please do not use them directly.
+
+To view the raw JSON that the objects in this module represent, please
+go to any `endpoint-rule-set.json` file in /botocore/data/<service>/<api version>/
+or you can look at the test files in /tests/unit/data/endpoints/valid-rules/
+"""
+
+import logging
+import re
+from enum import Enum
+from string import Formatter
+from typing import NamedTuple
+
+from botocore import xform_name
+from botocore.compat import IPV4_RE, quote, urlparse
+from botocore.exceptions import EndpointResolutionError
+from botocore.utils import (
+ ArnParser,
+ InvalidArnException,
+ is_valid_ipv4_endpoint_url,
+ is_valid_ipv6_endpoint_url,
+ lru_cache_weakref,
+ normalize_url_path,
+ percent_encode,
+)
+
+logger = logging.getLogger(__name__)
+
+TEMPLATE_STRING_RE = re.compile(r"\{[a-zA-Z#]+\}")
+GET_ATTR_RE = re.compile(r"(\w*)\[(\d+)\]")
+VALID_HOST_LABEL_RE = re.compile(
+ r"^(?!-)[a-zA-Z\d-]{1,63}(?<!-)$",
+)
+CACHE_SIZE = 100
+ARN_PARSER = ArnParser()
+STRING_FORMATTER = Formatter()
+
+
+class RuleSetStandardLibrary:
+ """Rule actions to be performed by the EndpointProvider."""
+
+ def __init__(self, partitions_data):
+ self.partitions_data = partitions_data
+
+ def is_func(self, argument):
+ """Determine if an object is a function object.
+
+ :type argument: Any
+ :rtype: bool
+ """
+ return isinstance(argument, dict) and "fn" in argument
+
+ def is_ref(self, argument):
+ """Determine if an object is a reference object.
+
+ :type argument: Any
+ :rtype: bool
+ """
+ return isinstance(argument, dict) and "ref" in argument
+
+ def is_template(self, argument):
+ """Determine if an object contains a template string.
+
+ :type argument: Any
+ :rtpe: bool
+ """
+ return (
+ isinstance(argument, str)
+ and TEMPLATE_STRING_RE.search(argument) is not None
+ )
+
+ def resolve_template_string(self, value, scope_vars):
+ """Resolve and inject values into a template string.
+
+ :type value: str
+ :type scope_vars: dict
+ :rtype: str
+ """
+ result = ""
+ for literal, reference, _, _ in STRING_FORMATTER.parse(value):
+ if reference is not None:
+ template_value = scope_vars
+ template_params = reference.split("#")
+ for param in template_params:
+ template_value = template_value[param]
+ result += f"{literal}{template_value}"
+ else:
+ result += literal
+ return result
+
+ def resolve_value(self, value, scope_vars):
+ """Return evaluated value based on type.
+
+ :type value: Any
+ :type scope_vars: dict
+ :rtype: Any
+ """
+ if self.is_func(value):
+ return self.call_function(value, scope_vars)
+ elif self.is_ref(value):
+ return scope_vars.get(value["ref"])
+ elif self.is_template(value):
+ return self.resolve_template_string(value, scope_vars)
+
+ return value
+
+ def convert_func_name(self, value):
+ """Normalize function names.
+
+ :type value: str
+ :rtype: str
+ """
+ normalized_name = f"{xform_name(value)}"
+ if normalized_name == "not":
+ normalized_name = f"_{normalized_name}"
+ return normalized_name.replace(".", "_")
+
+ def call_function(self, func_signature, scope_vars):
+ """Call the function with the resolved arguments and assign to `scope_vars`
+ when applicable.
+
+ :type func_signature: dict
+ :type scope_vars: dict
+ :rtype: Any
+ """
+ func_args = [
+ self.resolve_value(arg, scope_vars)
+ for arg in func_signature["argv"]
+ ]
+ func_name = self.convert_func_name(func_signature["fn"])
+ func = getattr(self, func_name)
+ result = func(*func_args)
+ if "assign" in func_signature:
+ assign = func_signature["assign"]
+ if assign in scope_vars:
+ raise EndpointResolutionError(
+ msg=f"Assignment {assign} already exists in "
+ "scoped variables and cannot be overwritten"
+ )
+ scope_vars[assign] = result
+ return result
+
+ def is_set(self, value):
+ """Evaluates whether a value is set.
+
+ :type value: Any
+ :rytpe: bool
+ """
+ return value is not None
+
+ def get_attr(self, value, path):
+ """Find an attribute within a value given a path string. The path can contain
+ the name of the attribute and an index in brackets. A period separating attribute
+ names indicates the one to the right is nested. The index will always occur at
+ the end of the path.
+
+ :type value: dict or tuple
+ :type path: str
+ :rtype: Any
+ """
+ for part in path.split("."):
+ match = GET_ATTR_RE.search(part)
+ if match is not None:
+ name, index = match.groups()
+ index = int(index)
+ if name:
+ value = value.get(name)
+ if value is None or index >= len(value):
+ return None
+ return value[index]
+ else:
+ value = value[part]
+ return value
+
+ def format_partition_output(self, partition):
+ output = partition["outputs"]
+ output["name"] = partition["id"]
+ return output
+
+ def is_partition_match(self, region, partition):
+ matches_regex = re.match(partition["regionRegex"], region) is not None
+ return region in partition["regions"] or matches_regex
+
+ def aws_partition(self, value):
+ """Match a region string to an AWS partition.
+
+ :type value: str
+ :rtype: dict
+ """
+ partitions = self.partitions_data['partitions']
+
+ if value is not None:
+ for partition in partitions:
+ if self.is_partition_match(value, partition):
+ return self.format_partition_output(partition)
+
+ # return the default partition if no matches were found
+ aws_partition = partitions[0]
+ return self.format_partition_output(aws_partition)
+
+ def aws_parse_arn(self, value):
+ """Parse and validate string for ARN components.
+
+ :type value: str
+ :rtype: dict
+ """
+ if value is None or not value.startswith("arn:"):
+ return None
+
+ try:
+ arn_dict = ARN_PARSER.parse_arn(value)
+ except InvalidArnException:
+ return None
+
+ # partition, resource, and service are required
+ if not all(
+ (arn_dict["partition"], arn_dict["service"], arn_dict["resource"])
+ ):
+ return None
+
+ arn_dict["accountId"] = arn_dict.pop("account")
+
+ resource = arn_dict.pop("resource")
+ arn_dict["resourceId"] = resource.replace(":", "/").split("/")
+
+ return arn_dict
+
+ def is_valid_host_label(self, value, allow_subdomains):
+ """Evaluates whether a value is a valid host label per
+ RFC 1123. If allow_subdomains is True, split on `.` and validate
+ each component separately.
+
+ :type value: str
+ :type allow_subdomains: bool
+ :rtype: bool
+ """
+ if value is None or allow_subdomains is False and value.count(".") > 0:
+ return False
+
+ if allow_subdomains is True:
+ return all(
+ self.is_valid_host_label(label, False)
+ for label in value.split(".")
+ )
+
+ return VALID_HOST_LABEL_RE.match(value) is not None
+
+ def string_equals(self, value1, value2):
+ """Evaluates two string values for equality.
+
+ :type value1: str
+ :type value2: str
+ :rtype: bool
+ """
+ if not all(isinstance(val, str) for val in (value1, value2)):
+ msg = f"Both values must be strings, not {type(value1)} and {type(value2)}."
+ raise EndpointResolutionError(msg=msg)
+ return value1 == value2
+
+ def uri_encode(self, value):
+ """Perform percent-encoding on an input string.
+
+ :type value: str
+ :rytpe: str
+ """
+ if value is None:
+ return None
+
+ return percent_encode(value)
+
+ def parse_url(self, value):
+ """Parse a URL string into components.
+
+ :type value: str
+ :rtype: dict
+ """
+ if value is None:
+ return None
+
+ url_components = urlparse(value)
+ try:
+ # url_parse may assign non-integer values to
+ # `port` and will fail when accessed.
+ url_components.port
+ except ValueError:
+ return None
+
+ scheme = url_components.scheme
+ query = url_components.query
+ # URLs with queries are not supported
+ if scheme not in ("https", "http") or len(query) > 0:
+ return None
+
+ path = url_components.path
+ normalized_path = quote(normalize_url_path(path))
+ if not normalized_path.endswith("/"):
+ normalized_path = f"{normalized_path}/"
+
+ return {
+ "scheme": scheme,
+ "authority": url_components.netloc,
+ "path": path,
+ "normalizedPath": normalized_path,
+ "isIp": is_valid_ipv4_endpoint_url(value)
+ or is_valid_ipv6_endpoint_url(value),
+ }
+
+ def boolean_equals(self, value1, value2):
+ """Evaluates two boolean values for equality.
+
+ :type value1: bool
+ :type value2: bool
+ :rtype: bool
+ """
+ if not all(isinstance(val, bool) for val in (value1, value2)):
+ msg = f"Both arguments must be bools, not {type(value1)} and {type(value2)}."
+ raise EndpointResolutionError(msg=msg)
+ return value1 is value2
+
+ def is_ascii(self, value):
+ """Evaluates if a string only contains ASCII characters.
+
+ :type value: str
+ :rtype: bool
+ """
+ try:
+ value.encode("ascii")
+ return True
+ except UnicodeEncodeError:
+ return False
+
+ def substring(self, value, start, stop, reverse):
+ """Computes a substring given the start index and end index. If `reverse` is
+ True, slice the string from the end instead.
+
+ :type value: str
+ :type start: int
+ :type end: int
+ :type reverse: bool
+ :rtype: str
+ """
+ if not isinstance(value, str):
+ msg = f"Input must be a string, not {type(value)}."
+ raise EndpointResolutionError(msg=msg)
+ if start >= stop or len(value) < stop or not self.is_ascii(value):
+ return None
+
+ if reverse is True:
+ r_start = len(value) - stop
+ r_stop = len(value) - start
+ return value[r_start:r_stop]
+
+ return value[start:stop]
+
+ def _not(self, value):
+ """A function implementation of the logical operator `not`.
+
+ :type value: Any
+ :rtype: bool
+ """
+ return not value
+
+ def aws_is_virtual_hostable_s3_bucket(self, value, allow_subdomains):
+ """Evaluates whether a value is a valid bucket name for virtual host
+ style bucket URLs. To pass, the value must meet the following criteria:
+ 1. is_valid_host_label(value) is True
+ 2. length between 3 and 63 characters (inclusive)
+ 3. does not contain uppercase characters
+ 4. is not formatted as an IP address
+
+ If allow_subdomains is True, split on `.` and validate
+ each component separately.
+
+ :type value: str
+ :type allow_subdomains: bool
+ :rtype: bool
+ """
+ if (
+ value is None
+ or len(value) < 3
+ or value.lower() != value
+ or IPV4_RE.match(value) is not None
+ ):
+ return False
+
+ return self.is_valid_host_label(
+ value, allow_subdomains=allow_subdomains
+ )
+
+
+# maintains backwards compatibility as `Library` was misspelled
+# in earlier versions
+RuleSetStandardLibary = RuleSetStandardLibrary
+
+
+class BaseRule:
+ """Base interface for individual endpoint rules."""
+
+ def __init__(self, conditions, documentation=None):
+ self.conditions = conditions
+ self.documentation = documentation
+
+ def evaluate(self, scope_vars, rule_lib):
+ raise NotImplementedError()
+
+ def evaluate_conditions(self, scope_vars, rule_lib):
+ """Determine if all conditions in a rule are met.
+
+ :type scope_vars: dict
+ :type rule_lib: RuleSetStandardLibrary
+ :rtype: bool
+ """
+ for func_signature in self.conditions:
+ result = rule_lib.call_function(func_signature, scope_vars)
+ if result is False or result is None:
+ return False
+ return True
+
+
+class RuleSetEndpoint(NamedTuple):
+ """A resolved endpoint object returned by a rule."""
+
+ url: str
+ properties: dict
+ headers: dict
+
+
+class EndpointRule(BaseRule):
+ def __init__(self, endpoint, **kwargs):
+ super().__init__(**kwargs)
+ self.endpoint = endpoint
+
+ def evaluate(self, scope_vars, rule_lib):
+ """Determine if conditions are met to provide a valid endpoint.
+
+ :type scope_vars: dict
+ :rtype: RuleSetEndpoint
+ """
+ if self.evaluate_conditions(scope_vars, rule_lib):
+ url = rule_lib.resolve_value(self.endpoint["url"], scope_vars)
+ properties = self.resolve_properties(
+ self.endpoint.get("properties", {}),
+ scope_vars,
+ rule_lib,
+ )
+ headers = self.resolve_headers(scope_vars, rule_lib)
+ return RuleSetEndpoint(
+ url=url, properties=properties, headers=headers
+ )
+
+ return None
+
+ def resolve_properties(self, properties, scope_vars, rule_lib):
+ """Traverse `properties` attribute, resolving any template strings.
+
+ :type properties: dict/list/str
+ :type scope_vars: dict
+ :type rule_lib: RuleSetStandardLibrary
+ :rtype: dict
+ """
+ if isinstance(properties, list):
+ return [
+ self.resolve_properties(prop, scope_vars, rule_lib)
+ for prop in properties
+ ]
+ elif isinstance(properties, dict):
+ return {
+ key: self.resolve_properties(value, scope_vars, rule_lib)
+ for key, value in properties.items()
+ }
+ elif rule_lib.is_template(properties):
+ return rule_lib.resolve_template_string(properties, scope_vars)
+
+ return properties
+
+ def resolve_headers(self, scope_vars, rule_lib):
+ """Iterate through headers attribute resolving all values.
+
+ :type scope_vars: dict
+ :type rule_lib: RuleSetStandardLibrary
+ :rtype: dict
+ """
+ resolved_headers = {}
+ headers = self.endpoint.get("headers", {})
+
+ for header, values in headers.items():
+ resolved_headers[header] = [
+ rule_lib.resolve_value(item, scope_vars) for item in values
+ ]
+ return resolved_headers
+
+
+class ErrorRule(BaseRule):
+ def __init__(self, error, **kwargs):
+ super().__init__(**kwargs)
+ self.error = error
+
+ def evaluate(self, scope_vars, rule_lib):
+ """If an error rule's conditions are met, raise an error rule.
+
+ :type scope_vars: dict
+ :type rule_lib: RuleSetStandardLibrary
+ :rtype: EndpointResolutionError
+ """
+ if self.evaluate_conditions(scope_vars, rule_lib):
+ error = rule_lib.resolve_value(self.error, scope_vars)
+ raise EndpointResolutionError(msg=error)
+ return None
+
+
+class TreeRule(BaseRule):
+ """A tree rule is non-terminal meaning it will never be returned to a provider.
+ Additionally this means it has no attributes that need to be resolved.
+ """
+
+ def __init__(self, rules, **kwargs):
+ super().__init__(**kwargs)
+ self.rules = [RuleCreator.create(**rule) for rule in rules]
+
+ def evaluate(self, scope_vars, rule_lib):
+ """If a tree rule's conditions are met, iterate its sub-rules
+ and return first result found.
+
+ :type scope_vars: dict
+ :type rule_lib: RuleSetStandardLibrary
+ :rtype: RuleSetEndpoint/EndpointResolutionError
+ """
+ if self.evaluate_conditions(scope_vars, rule_lib):
+ for rule in self.rules:
+ # don't share scope_vars between rules
+ rule_result = rule.evaluate(scope_vars.copy(), rule_lib)
+ if rule_result:
+ return rule_result
+ return None
+
+
+class RuleCreator:
+ endpoint = EndpointRule
+ error = ErrorRule
+ tree = TreeRule
+
+ @classmethod
+ def create(cls, **kwargs):
+ """Create a rule instance from metadata.
+
+ :rtype: TreeRule/EndpointRule/ErrorRule
+ """
+ rule_type = kwargs.pop("type")
+ try:
+ rule_class = getattr(cls, rule_type)
+ except AttributeError:
+ raise EndpointResolutionError(
+ msg=f"Unknown rule type: {rule_type}. A rule must "
+ "be of type tree, endpoint or error."
+ )
+ else:
+ return rule_class(**kwargs)
+
+
+class ParameterType(Enum):
+ """Translation from `type` attribute to native Python type."""
+
+ string = str
+ boolean = bool
+ stringarray = tuple
+
+
+class ParameterDefinition:
+ """The spec of an individual parameter defined in a RuleSet."""
+
+ def __init__(
+ self,
+ name,
+ parameter_type,
+ documentation=None,
+ builtIn=None,
+ default=None,
+ required=None,
+ deprecated=None,
+ ):
+ self.name = name
+ try:
+ self.parameter_type = getattr(
+ ParameterType, parameter_type.lower()
+ ).value
+ except AttributeError:
+ raise EndpointResolutionError(
+ msg=f"Unknown parameter type: {parameter_type}. "
+ "A parameter must be of type string, boolean, or stringarray."
+ )
+ self.documentation = documentation
+ self.builtin = builtIn
+ self.default = default
+ self.required = required
+ self.deprecated = deprecated
+
+ def validate_input(self, value):
+ """Perform base validation on parameter input.
+
+ :type value: Any
+ :raises: EndpointParametersError
+ """
+
+ if not isinstance(value, self.parameter_type):
+ raise EndpointResolutionError(
+ msg=f"Value ({self.name}) is the wrong "
+ f"type. Must be {self.parameter_type}."
+ )
+ if self.deprecated is not None:
+ depr_str = f"{self.name} has been deprecated."
+ msg = self.deprecated.get("message")
+ since = self.deprecated.get("since")
+ if msg:
+ depr_str += f"\n{msg}"
+ if since:
+ depr_str += f"\nDeprecated since {since}."
+ logger.info(depr_str)
+
+ return None
+
+ def process_input(self, value):
+ """Process input against spec, applying default if value is None."""
+ if value is None:
+ if self.default is not None:
+ return self.default
+ if self.required:
+ raise EndpointResolutionError(
+ msg=f"Cannot find value for required parameter {self.name}"
+ )
+ # in all other cases, the parameter will keep the value None
+ else:
+ self.validate_input(value)
+ return value
+
+
+class RuleSet:
+ """Collection of rules to derive a routable service endpoint."""
+
+ def __init__(
+ self, version, parameters, rules, partitions, documentation=None
+ ):
+ self.version = version
+ self.parameters = self._ingest_parameter_spec(parameters)
+ self.rules = [RuleCreator.create(**rule) for rule in rules]
+ self.rule_lib = RuleSetStandardLibrary(partitions)
+ self.documentation = documentation
+
+ def _ingest_parameter_spec(self, parameters):
+ return {
+ name: ParameterDefinition(
+ name,
+ spec["type"],
+ spec.get("documentation"),
+ spec.get("builtIn"),
+ spec.get("default"),
+ spec.get("required"),
+ spec.get("deprecated"),
+ )
+ for name, spec in parameters.items()
+ }
+
+ def process_input_parameters(self, input_params):
+ """Process each input parameter against its spec.
+
+ :type input_params: dict
+ """
+ for name, spec in self.parameters.items():
+ value = spec.process_input(input_params.get(name))
+ if value is not None:
+ input_params[name] = value
+ return None
+
+ def evaluate(self, input_parameters):
+ """Evaluate input parameters against rules returning first match.
+
+ :type input_parameters: dict
+ """
+ self.process_input_parameters(input_parameters)
+ for rule in self.rules:
+ evaluation = rule.evaluate(input_parameters.copy(), self.rule_lib)
+ if evaluation is not None:
+ return evaluation
+ return None
+
+
+class EndpointProvider:
+ """Derives endpoints from a RuleSet for given input parameters."""
+
+ def __init__(self, ruleset_data, partition_data):
+ self.ruleset = RuleSet(**ruleset_data, partitions=partition_data)
+
+ @lru_cache_weakref(maxsize=CACHE_SIZE)
+ def resolve_endpoint(self, **input_parameters):
+ """Match input parameters to a rule.
+
+ :type input_parameters: dict
+ :rtype: RuleSetEndpoint
+ """
+ params_for_error = input_parameters.copy()
+ endpoint = self.ruleset.evaluate(input_parameters)
+ if endpoint is None:
+ param_string = "\n".join(
+ [f"{key}: {value}" for key, value in params_for_error.items()]
+ )
+ raise EndpointResolutionError(
+ msg=f"No endpoint found for parameters:\n{param_string}"
+ )
+ return endpoint