diff options
author | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
---|---|---|
committer | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
commit | 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch) | |
tree | ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/azure/core/utils | |
parent | cc961e04ba734dd72309fb548a2f97d67d578813 (diff) | |
download | gn-ai-master.tar.gz |
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/core/utils')
6 files changed, 806 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/core/utils/__init__.py b/.venv/lib/python3.12/site-packages/azure/core/utils/__init__.py new file mode 100644 index 00000000..0e06c1a3 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/core/utils/__init__.py @@ -0,0 +1,35 @@ +# -------------------------------------------------------------------------- +# +# Copyright (c) Microsoft Corporation. All rights reserved. +# +# The MIT License (MIT) +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the ""Software""), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. +# +# -------------------------------------------------------------------------- +""" + +This `utils` module provides functionality that is intended to be used by developers +building on top of `azure-core`. + +""" +from ._connection_string_parser import parse_connection_string +from ._utils import case_insensitive_dict, CaseInsensitiveDict + +__all__ = ["parse_connection_string", "case_insensitive_dict", "CaseInsensitiveDict"] diff --git a/.venv/lib/python3.12/site-packages/azure/core/utils/_connection_string_parser.py b/.venv/lib/python3.12/site-packages/azure/core/utils/_connection_string_parser.py new file mode 100644 index 00000000..61494b48 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/core/utils/_connection_string_parser.py @@ -0,0 +1,44 @@ +# coding=utf-8 +# ------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +# -------------------------------------------------------------------------- +from typing import Mapping + + +def parse_connection_string(conn_str: str, case_sensitive_keys: bool = False) -> Mapping[str, str]: + """Parses the connection string into a dict of its component parts, with the option of preserving case + of keys, and validates that each key in the connection string has a provided value. If case of keys + is not preserved (ie. `case_sensitive_keys=False`), then a dict with LOWERCASE KEYS will be returned. + + :param str conn_str: String with connection details provided by Azure services. + :param bool case_sensitive_keys: Indicates whether the casing of the keys will be preserved. When `False`(the + default), all keys will be lower-cased. If set to `True`, the original casing of the keys will be preserved. + :rtype: Mapping + :returns: Dict of connection string key/value pairs. + :raises: + ValueError: if each key in conn_str does not have a corresponding value and + for other bad formatting of connection strings - including duplicate + args, bad syntax, etc. + """ + + cs_args = [s.split("=", 1) for s in conn_str.strip().rstrip(";").split(";")] + if any(len(tup) != 2 or not all(tup) for tup in cs_args): + raise ValueError("Connection string is either blank or malformed.") + args_dict = dict(cs_args) + + if len(cs_args) != len(args_dict): + raise ValueError("Connection string is either blank or malformed.") + + if not case_sensitive_keys: + # if duplicate case insensitive keys are passed in, raise error + new_args_dict = {} + for key in args_dict.keys(): # pylint: disable=consider-using-dict-items + new_key = key.lower() + if new_key in new_args_dict: + raise ValueError("Duplicate key in connection string: {}".format(new_key)) + new_args_dict[new_key] = args_dict[key] + return new_args_dict + + return args_dict diff --git a/.venv/lib/python3.12/site-packages/azure/core/utils/_messaging_shared.py b/.venv/lib/python3.12/site-packages/azure/core/utils/_messaging_shared.py new file mode 100644 index 00000000..e282db7e --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/core/utils/_messaging_shared.py @@ -0,0 +1,46 @@ +# coding=utf-8 +# -------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +# -------------------------------------------------------------------------- + +# ========================================================================== +# This file contains duplicate code that is shared with azure-eventgrid. +# Both the files should always be identical. +# ========================================================================== + + +import json + + +def _get_json_content(obj): + """Event mixin to have methods that are common to different Event types + like CloudEvent, EventGridEvent etc. + + :param obj: The object to get the JSON content from. + :type obj: any + :return: The JSON content of the object. + :rtype: dict + :raises ValueError if JSON content cannot be loaded from the object + """ + msg = "Failed to load JSON content from the object." + try: + # storage queue + return json.loads(obj.content) + except ValueError as err: + raise ValueError(msg) from err + except AttributeError: + # eventhubs + try: + return json.loads(next(obj.body))[0] + except KeyError: + # servicebus + return json.loads(next(obj.body)) + except ValueError as err: + raise ValueError(msg) from err + except: # pylint: disable=bare-except + try: + return json.loads(obj) + except ValueError as err: + raise ValueError(msg) from err diff --git a/.venv/lib/python3.12/site-packages/azure/core/utils/_pipeline_transport_rest_shared.py b/.venv/lib/python3.12/site-packages/azure/core/utils/_pipeline_transport_rest_shared.py new file mode 100644 index 00000000..4fbd064a --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/core/utils/_pipeline_transport_rest_shared.py @@ -0,0 +1,422 @@ +# coding=utf-8 +# -------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +# -------------------------------------------------------------------------- +from __future__ import absolute_import +from collections.abc import Mapping + +from io import BytesIO +from email.message import Message +from email.policy import HTTP +from email import message_from_bytes as message_parser +import os +from typing import ( + TYPE_CHECKING, + cast, + IO, + Union, + Tuple, + Optional, + Callable, + Type, + Iterator, + List, + Sequence, +) +from http.client import HTTPConnection +from urllib.parse import urlparse + +from ..pipeline import ( + PipelineRequest, + PipelineResponse, + PipelineContext, +) +from ..pipeline._tools import await_result as _await_result + +if TYPE_CHECKING: + # importing both the py3 RestHttpRequest and the fallback RestHttpRequest + from azure.core.rest._rest_py3 import HttpRequest as RestHttpRequestPy3 + from azure.core.pipeline.transport import ( + HttpRequest as PipelineTransportHttpRequest, + ) + + HTTPRequestType = Union[RestHttpRequestPy3, PipelineTransportHttpRequest] + from ..pipeline.policies import SansIOHTTPPolicy + from azure.core.pipeline.transport import ( # pylint: disable=non-abstract-transport-import + HttpResponse as PipelineTransportHttpResponse, + AioHttpTransportResponse as PipelineTransportAioHttpTransportResponse, + ) + from azure.core.pipeline.transport._base import ( + _HttpResponseBase as PipelineTransportHttpResponseBase, + ) + from azure.core.rest._helpers import FilesType, FileType, FileContent + +binary_type = str + + +class BytesIOSocket: + """Mocking the "makefile" of socket for HTTPResponse. + This can be used to create a http.client.HTTPResponse object + based on bytes and not a real socket. + + :param bytes bytes_data: The bytes to use to mock the socket. + """ + + def __init__(self, bytes_data): + self.bytes_data = bytes_data + + def makefile(self, *_): + return BytesIO(self.bytes_data) + + +def _format_parameters_helper(http_request, params): + """Helper for format_parameters. + + Format parameters into a valid query string. + It's assumed all parameters have already been quoted as + valid URL strings. + + :param http_request: The http request whose parameters + we are trying to format + :type http_request: any + :param dict params: A dictionary of parameters. + """ + query = urlparse(http_request.url).query + if query: + http_request.url = http_request.url.partition("?")[0] + existing_params = {p[0]: p[-1] for p in [p.partition("=") for p in query.split("&")]} + params.update(existing_params) + query_params = [] + for k, v in params.items(): + if isinstance(v, list): + for w in v: + if w is None: + raise ValueError("Query parameter {} cannot be None".format(k)) + query_params.append("{}={}".format(k, w)) + else: + if v is None: + raise ValueError("Query parameter {} cannot be None".format(k)) + query_params.append("{}={}".format(k, v)) + query = "?" + "&".join(query_params) + http_request.url = http_request.url + query + + +def _pad_attr_name(attr: str, backcompat_attrs: Sequence[str]) -> str: + """Pad hidden attributes so users can access them. + + Currently, for our backcompat attributes, we define them + as private, so they're hidden from intellisense and sphinx, + but still allow users to access them as public attributes + for backcompat purposes. This function is called so if + users access publicly call a private backcompat attribute, + we can return them the private variable in getattr + + :param str attr: The attribute name + :param list[str] backcompat_attrs: The list of backcompat attributes + :rtype: str + :return: The padded attribute name + """ + return "_{}".format(attr) if attr in backcompat_attrs else attr + + +def _prepare_multipart_body_helper(http_request: "HTTPRequestType", content_index: int = 0) -> int: + """Helper for prepare_multipart_body. + + Will prepare the body of this request according to the multipart information. + + This call assumes the on_request policies have been applied already in their + correct context (sync/async) + + Does nothing if "set_multipart_mixed" was never called. + :param http_request: The http request whose multipart body we are trying + to prepare + :type http_request: any + :param int content_index: The current index of parts within the batch message. + :returns: The updated index after all parts in this request have been added. + :rtype: int + """ + if not http_request.multipart_mixed_info: + return 0 + + requests: Sequence["HTTPRequestType"] = http_request.multipart_mixed_info[0] + boundary: Optional[str] = http_request.multipart_mixed_info[2] + + # Update the main request with the body + main_message = Message() + main_message.add_header("Content-Type", "multipart/mixed") + if boundary: + main_message.set_boundary(boundary) + + for req in requests: + part_message = Message() + if req.multipart_mixed_info: + content_index = req.prepare_multipart_body(content_index=content_index) + part_message.add_header("Content-Type", req.headers["Content-Type"]) + payload = req.serialize() + # We need to remove the ~HTTP/1.1 prefix along with the added content-length + payload = payload[payload.index(b"--") :] + else: + part_message.add_header("Content-Type", "application/http") + part_message.add_header("Content-Transfer-Encoding", "binary") + part_message.add_header("Content-ID", str(content_index)) + payload = req.serialize() + content_index += 1 + part_message.set_payload(payload) + main_message.attach(part_message) + + full_message = main_message.as_bytes(policy=HTTP) + # From "as_bytes" doc: + # Flattening the message may trigger changes to the EmailMessage if defaults need to be filled in to complete + # the transformation to a string (for example, MIME boundaries may be generated or modified). + # After this call, we know `get_boundary` will return a valid boundary and not None. Mypy doesn't know that. + final_boundary: str = cast(str, main_message.get_boundary()) + eol = b"\r\n" + _, _, body = full_message.split(eol, 2) + http_request.set_bytes_body(body) + http_request.headers["Content-Type"] = "multipart/mixed; boundary=" + final_boundary + return content_index + + +class _HTTPSerializer(HTTPConnection): + """Hacking the stdlib HTTPConnection to serialize HTTP request as strings.""" + + def __init__(self, *args, **kwargs): + self.buffer = b"" + kwargs.setdefault("host", "fakehost") + super(_HTTPSerializer, self).__init__(*args, **kwargs) + + def putheader(self, header, *values): + if header in ["Host", "Accept-Encoding"]: + return + super(_HTTPSerializer, self).putheader(header, *values) + + def send(self, data): + self.buffer += data + + +def _serialize_request(http_request: "HTTPRequestType") -> bytes: + """Helper for serialize. + + Serialize a request using the application/http spec/ + + :param http_request: The http request which we are trying + to serialize. + :type http_request: any + :rtype: bytes + :return: The serialized request + """ + if isinstance(http_request.body, dict): + raise TypeError("Cannot serialize an HTTPRequest with dict body.") + serializer = _HTTPSerializer() + serializer.request( + method=http_request.method, + url=http_request.url, + body=http_request.body, + headers=http_request.headers, + ) + return serializer.buffer + + +def _decode_parts_helper( + response: "PipelineTransportHttpResponseBase", + message: Message, + http_response_type: Type["PipelineTransportHttpResponseBase"], + requests: Sequence["PipelineTransportHttpRequest"], + deserialize_response: Callable, +) -> List["PipelineTransportHttpResponse"]: + """Helper for _decode_parts. + + Rebuild an HTTP response from pure string. + + :param response: The response to decode + :type response: ~azure.core.pipeline.transport.HttpResponse + :param message: The message to decode + :type message: ~email.message.Message + :param http_response_type: The type of response to return + :type http_response_type: ~azure.core.pipeline.transport.HttpResponse + :param requests: The requests that were batched together + :type requests: list[~azure.core.pipeline.transport.HttpRequest] + :param deserialize_response: The function to deserialize the response + :type deserialize_response: callable + :rtype: list[~azure.core.pipeline.transport.HttpResponse] + :return: The list of responses + """ + responses = [] + for index, raw_response in enumerate(message.get_payload()): + content_type = raw_response.get_content_type() + if content_type == "application/http": + try: + matching_request = requests[index] + except IndexError: + # If we have no matching request, this could mean that we had an empty batch. + # The request object is only needed to get the HTTP METHOD and to store in the response object, + # so let's just use the parent request so allow the rest of the deserialization to continue. + matching_request = response.request + responses.append( + deserialize_response( + raw_response.get_payload(decode=True), + matching_request, + http_response_type=http_response_type, + ) + ) + elif content_type == "multipart/mixed" and requests[index].multipart_mixed_info: + # The message batch contains one or more change sets + changeset_requests = requests[index].multipart_mixed_info[0] # type: ignore + changeset_responses = response._decode_parts( # pylint: disable=protected-access + raw_response, http_response_type, changeset_requests + ) + responses.extend(changeset_responses) + else: + raise ValueError("Multipart doesn't support part other than application/http for now") + return responses + + +def _get_raw_parts_helper(response, http_response_type: Type): + """Helper for _get_raw_parts + + Assuming this body is multipart, return the iterator or parts. + + If parts are application/http use http_response_type or HttpClientTransportResponse + as envelope. + + :param response: The response to decode + :type response: ~azure.core.pipeline.transport.HttpResponse + :param http_response_type: The type of response to return + :type http_response_type: any + :rtype: iterator[~azure.core.pipeline.transport.HttpResponse] + :return: The parts of the response + """ + body_as_bytes = response.body() + # In order to use email.message parser, I need full HTTP bytes. Faking something to make the parser happy + http_body = b"Content-Type: " + response.content_type.encode("ascii") + b"\r\n\r\n" + body_as_bytes + message: Message = message_parser(http_body) + requests = response.request.multipart_mixed_info[0] + return response._decode_parts(message, http_response_type, requests) # pylint: disable=protected-access + + +def _parts_helper( + response: "PipelineTransportHttpResponse", +) -> Iterator["PipelineTransportHttpResponse"]: + """Assuming the content-type is multipart/mixed, will return the parts as an iterator. + + :param response: The response to decode + :type response: ~azure.core.pipeline.transport.HttpResponse + :rtype: iterator[HttpResponse] + :return: The parts of the response + :raises ValueError: If the content is not multipart/mixed + """ + if not response.content_type or not response.content_type.startswith("multipart/mixed"): + raise ValueError("You can't get parts if the response is not multipart/mixed") + + responses = response._get_raw_parts() # pylint: disable=protected-access + if response.request.multipart_mixed_info: + policies: Sequence["SansIOHTTPPolicy"] = response.request.multipart_mixed_info[1] + + # Apply on_response concurrently to all requests + import concurrent.futures + + def parse_responses(response): + http_request = response.request + context = PipelineContext(None) + pipeline_request = PipelineRequest(http_request, context) + pipeline_response = PipelineResponse(http_request, response, context=context) + + for policy in policies: + _await_result(policy.on_response, pipeline_request, pipeline_response) + + with concurrent.futures.ThreadPoolExecutor() as executor: + # List comprehension to raise exceptions if happened + [ # pylint: disable=expression-not-assigned, unnecessary-comprehension + _ for _ in executor.map(parse_responses, responses) + ] + + return responses + + +def _format_data_helper( + data: "FileType", +) -> Union[Tuple[Optional[str], str], Tuple[Optional[str], "FileContent", str]]: + """Helper for _format_data. + + Format field data according to whether it is a stream or + a string for a form-data request. + + :param data: The request field data. + :type data: str or file-like object. + :rtype: tuple[str, IO, str] or tuple[None, str] + :return: A tuple of (data name, data IO, "application/octet-stream") or (None, data str) + """ + content_type: Optional[str] = None + filename: Optional[str] = None + if isinstance(data, tuple): + if len(data) == 2: + # Filename and file bytes are included + filename, file_bytes = cast(Tuple[Optional[str], "FileContent"], data) + elif len(data) == 3: + # Filename, file object, and content_type are included + filename, file_bytes, content_type = cast(Tuple[Optional[str], "FileContent", str], data) + else: + raise ValueError( + "Unexpected data format. Expected file, or tuple of (filename, file_bytes) or " + "(filename, file_bytes, content_type)." + ) + else: + # here we just get the file content + if hasattr(data, "read"): + data = cast(IO, data) + try: + if data.name[0] != "<" and data.name[-1] != ">": + filename = os.path.basename(data.name) + except (AttributeError, TypeError): + pass + content_type = "application/octet-stream" + file_bytes = data + if content_type: + return (filename, file_bytes, content_type) + return (filename, cast(str, file_bytes)) + + +def _aiohttp_body_helper( + response: "PipelineTransportAioHttpTransportResponse", +) -> bytes: + # pylint: disable=protected-access + """Helper for body method of Aiohttp responses. + + Since aiohttp body methods need decompression work synchronously, + need to share this code across old and new aiohttp transport responses + for backcompat. + + :param response: The response to decode + :type response: ~azure.core.pipeline.transport.AioHttpTransportResponse + :rtype: bytes + :return: The response's bytes + """ + if response._content is None: + raise ValueError("Body is not available. Call async method load_body, or do your call with stream=False.") + if not response._decompress: + return response._content + if response._decompressed_content: + return response._content + enc = response.headers.get("Content-Encoding") + if not enc: + return response._content + enc = enc.lower() + if enc in ("gzip", "deflate"): + import zlib + + zlib_mode = (16 + zlib.MAX_WBITS) if enc == "gzip" else -zlib.MAX_WBITS + decompressor = zlib.decompressobj(wbits=zlib_mode) + response._content = decompressor.decompress(response._content) + response._decompressed_content = True + return response._content + return response._content + + +def get_file_items(files: "FilesType") -> Sequence[Tuple[str, "FileType"]]: + if isinstance(files, Mapping): + # casting because ItemsView technically isn't a Sequence, even + # though realistically it is ordered python 3.7 and after + return cast(Sequence[Tuple[str, "FileType"]], files.items()) + return files diff --git a/.venv/lib/python3.12/site-packages/azure/core/utils/_pipeline_transport_rest_shared_async.py b/.venv/lib/python3.12/site-packages/azure/core/utils/_pipeline_transport_rest_shared_async.py new file mode 100644 index 00000000..997a435c --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/core/utils/_pipeline_transport_rest_shared_async.py @@ -0,0 +1,71 @@ +# coding=utf-8 +# -------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +# -------------------------------------------------------------------------- +import asyncio +from typing import ( + TYPE_CHECKING, + List, + Generic, + TypeVar, + Type, + Optional, + AsyncIterator, + Iterator, +) +from ..pipeline import PipelineContext, PipelineRequest, PipelineResponse +from ..pipeline._tools_async import await_result as _await_result + +if TYPE_CHECKING: + from ..pipeline.policies import SansIOHTTPPolicy + + +HttpResponseType = TypeVar("HttpResponseType") + + +class _PartGenerator(AsyncIterator[HttpResponseType], Generic[HttpResponseType]): + """Until parts is a real async iterator, wrap the sync call. + + :param response: The response to parse + :type response: ~azure.core.pipeline.transport.AsyncHttpResponse + :param default_http_response_type: The default HTTP response type to use + :type default_http_response_type: any + """ + + def __init__(self, response, default_http_response_type: Type[HttpResponseType]) -> None: + self._response = response + self._parts: Optional[Iterator[HttpResponseType]] = None + self._default_http_response_type = default_http_response_type + + async def _parse_response(self) -> Iterator[HttpResponseType]: + responses = self._response._get_raw_parts( # pylint: disable=protected-access + http_response_type=self._default_http_response_type + ) + if self._response.request.multipart_mixed_info: + policies: List["SansIOHTTPPolicy"] = self._response.request.multipart_mixed_info[1] + + async def parse_responses(response): + http_request = response.request + context = PipelineContext(None) + pipeline_request = PipelineRequest(http_request, context) + pipeline_response = PipelineResponse(http_request, response, context=context) + + for policy in policies: + await _await_result(policy.on_response, pipeline_request, pipeline_response) + + # Not happy to make this code asyncio specific, but that's multipart only for now + # If we need trio and multipart, let's reinvesitgate that later + await asyncio.gather(*[parse_responses(res) for res in responses]) + + return responses + + async def __anext__(self) -> HttpResponseType: + if not self._parts: + self._parts = iter(await self._parse_response()) + + try: + return next(self._parts) + except StopIteration: + raise StopAsyncIteration() # pylint: disable=raise-missing-from diff --git a/.venv/lib/python3.12/site-packages/azure/core/utils/_utils.py b/.venv/lib/python3.12/site-packages/azure/core/utils/_utils.py new file mode 100644 index 00000000..c9d09a38 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/core/utils/_utils.py @@ -0,0 +1,188 @@ +# coding=utf-8 +# -------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +# -------------------------------------------------------------------------- +import datetime +import sys +from typing import ( + Any, + AsyncContextManager, + Iterable, + Iterator, + Mapping, + MutableMapping, + Optional, + Tuple, + Union, + Dict, +) +from datetime import timezone + +TZ_UTC = timezone.utc + + +class _FixedOffset(datetime.tzinfo): + """Fixed offset in minutes east from UTC. + + Copy/pasted from Python doc + + :param int offset: offset in minutes + """ + + def __init__(self, offset): + self.__offset = datetime.timedelta(minutes=offset) + + def utcoffset(self, dt): + return self.__offset + + def tzname(self, dt): + return str(self.__offset.total_seconds() / 3600) + + def __repr__(self): + return "<FixedOffset {}>".format(self.tzname(None)) + + def dst(self, dt): + return datetime.timedelta(0) + + +def _convert_to_isoformat(date_time): + """Deserialize a date in RFC 3339 format to datetime object. + Check https://tools.ietf.org/html/rfc3339#section-5.8 for examples. + + :param str date_time: The date in RFC 3339 format. + """ + if not date_time: + return None + if date_time[-1] == "Z": + delta = 0 + timestamp = date_time[:-1] + else: + timestamp = date_time[:-6] + sign, offset = date_time[-6], date_time[-5:] + delta = int(sign + offset[:1]) * 60 + int(sign + offset[-2:]) + + check_decimal = timestamp.split(".") + if len(check_decimal) > 1: + decimal_str = "" + for digit in check_decimal[1]: + if digit.isdigit(): + decimal_str += digit + else: + break + if len(decimal_str) > 6: + timestamp = timestamp.replace(decimal_str, decimal_str[0:6]) + + if delta == 0: + tzinfo = TZ_UTC + else: + tzinfo = timezone(datetime.timedelta(minutes=delta)) + + try: + deserialized = datetime.datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%f") + except ValueError: + deserialized = datetime.datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S") + + deserialized = deserialized.replace(tzinfo=tzinfo) + return deserialized + + +def case_insensitive_dict( + *args: Optional[Union[Mapping[str, Any], Iterable[Tuple[str, Any]]]], **kwargs: Any +) -> MutableMapping[str, Any]: + """Return a case-insensitive mutable mapping from an inputted mapping structure. + + :param args: The positional arguments to pass to the dict. + :type args: Mapping[str, Any] or Iterable[Tuple[str, Any] + :return: A case-insensitive mutable mapping object. + :rtype: ~collections.abc.MutableMapping + """ + return CaseInsensitiveDict(*args, **kwargs) + + +class CaseInsensitiveDict(MutableMapping[str, Any]): + """ + NOTE: This implementation is heavily inspired from the case insensitive dictionary from the requests library. + Thank you !! + Case insensitive dictionary implementation. + The keys are expected to be strings and will be stored in lower case. + case_insensitive_dict = CaseInsensitiveDict() + case_insensitive_dict['Key'] = 'some_value' + case_insensitive_dict['key'] == 'some_value' #True + + :param data: Initial data to store in the dictionary. + :type data: Mapping[str, Any] or Iterable[Tuple[str, Any]] + """ + + def __init__( + self, data: Optional[Union[Mapping[str, Any], Iterable[Tuple[str, Any]]]] = None, **kwargs: Any + ) -> None: + self._store: Dict[str, Any] = {} + if data is None: + data = {} + + self.update(data, **kwargs) + + def copy(self) -> "CaseInsensitiveDict": + return CaseInsensitiveDict(self._store.values()) + + def __setitem__(self, key: str, value: Any) -> None: + """Set the `key` to `value`. + + The original key will be stored with the value + + :param str key: The key to set. + :param value: The value to set the key to. + :type value: any + """ + self._store[key.lower()] = (key, value) + + def __getitem__(self, key: str) -> Any: + return self._store[key.lower()][1] + + def __delitem__(self, key: str) -> None: + del self._store[key.lower()] + + def __iter__(self) -> Iterator[str]: + return (key for key, _ in self._store.values()) + + def __len__(self) -> int: + return len(self._store) + + def lowerkey_items(self) -> Iterator[Tuple[str, Any]]: + return ((lower_case_key, pair[1]) for lower_case_key, pair in self._store.items()) + + def __eq__(self, other: Any) -> bool: + if isinstance(other, Mapping): + other = CaseInsensitiveDict(other) + else: + return False + + return dict(self.lowerkey_items()) == dict(other.lowerkey_items()) + + def __repr__(self) -> str: + return str(dict(self.items())) + + +def get_running_async_lock() -> AsyncContextManager: + """Get a lock instance from the async library that the current context is running under. + + :return: An instance of the running async library's Lock class. + :rtype: AsyncContextManager + :raises: RuntimeError if the current context is not running under an async library. + """ + + try: + import asyncio + + # Check if we are running in an asyncio event loop. + asyncio.get_running_loop() + return asyncio.Lock() + except RuntimeError as err: + # Otherwise, assume we are running in a trio event loop if it has already been imported. + if "trio" in sys.modules: + import trio # pylint: disable=networking-import-outside-azure-core-transport + + return trio.Lock() + raise RuntimeError("An asyncio or trio event loop is required.") from err |