diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/storage/blob/_deserialize.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/azure/storage/blob/_deserialize.py | 234 |
1 files changed, 234 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/storage/blob/_deserialize.py b/.venv/lib/python3.12/site-packages/azure/storage/blob/_deserialize.py new file mode 100644 index 00000000..b6ee9160 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/storage/blob/_deserialize.py @@ -0,0 +1,234 @@ +# ------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +# -------------------------------------------------------------------------- + +from typing import Any, Dict, List, Optional, Tuple, TYPE_CHECKING +from urllib.parse import unquote +from xml.etree.ElementTree import Element + +from ._models import ( + BlobAnalyticsLogging, + BlobProperties, + BlobType, + ContainerProperties, + ContentSettings, + CopyProperties, + CorsRule, + ImmutabilityPolicy, + LeaseProperties, + Metrics, + ObjectReplicationPolicy, + ObjectReplicationRule, + RetentionPolicy, + StaticWebsite +) +from ._shared.models import get_enum_value +from ._shared.response_handlers import deserialize_metadata + +if TYPE_CHECKING: + from azure.core.pipeline import PipelineResponse + from ._generated.models import ( + BlobItemInternal, + BlobTags, + PageList, + StorageServiceProperties, + StorageServiceStats, + ) + from ._shared.models import LocationMode + +def deserialize_pipeline_response_into_cls(cls_method, response: "PipelineResponse", obj: Any, headers: Dict[str, Any]): + try: + deserialized_response = response.http_response + except AttributeError: + deserialized_response = response + return cls_method(deserialized_response, obj, headers) + + +def deserialize_blob_properties(response: "PipelineResponse", obj: Any, headers: Dict[str, Any]) -> BlobProperties: + blob_properties = BlobProperties( + metadata=deserialize_metadata(response, obj, headers), + object_replication_source_properties=deserialize_ors_policies(response.http_response.headers), + **headers + ) + if 'Content-Range' in headers: + if 'x-ms-blob-content-md5' in headers: + blob_properties.content_settings.content_md5 = headers['x-ms-blob-content-md5'] + else: + blob_properties.content_settings.content_md5 = None + return blob_properties + + +def deserialize_ors_policies(policy_dictionary: Optional[Dict[str, str]]) -> Optional[List[ObjectReplicationPolicy]]: + + if policy_dictionary is None: + return None + # For source blobs (blobs that have policy ids and rule ids applied to them), + # the header will be formatted as "x-ms-or-<policy_id>_<rule_id>: {Complete, Failed}". + # The value of this header is the status of the replication. + or_policy_status_headers = {key: val for key, val in policy_dictionary.items() + if 'or-' in key and key != 'x-ms-or-policy-id'} + + parsed_result: Dict[str, List[ObjectReplicationRule]] = {} + + for key, val in or_policy_status_headers.items(): + # list blobs gives or-policy_rule and get blob properties gives x-ms-or-policy_rule + policy_and_rule_ids = key.split('or-')[1].split('_') + policy_id = policy_and_rule_ids[0] + rule_id = policy_and_rule_ids[1] + + # If we are seeing this policy for the first time, create a new list to store rule_id -> result + parsed_result[policy_id] = parsed_result.get(policy_id) or [] + parsed_result[policy_id].append(ObjectReplicationRule(rule_id=rule_id, status=val)) + + result_list = [ObjectReplicationPolicy(policy_id=k, rules=v) for k, v in parsed_result.items()] + + return result_list + + +def deserialize_blob_stream( + response: "PipelineResponse", + obj: Any, + headers: Dict[str, Any] +) -> Tuple["LocationMode", Any]: + blob_properties = deserialize_blob_properties(response, obj, headers) + obj.properties = blob_properties + return response.http_response.location_mode, obj + + +def deserialize_container_properties( + response: "PipelineResponse", + obj: Any, + headers: Dict[str, Any] +) -> ContainerProperties: + metadata = deserialize_metadata(response, obj, headers) + container_properties = ContainerProperties( + metadata=metadata, + **headers + ) + return container_properties + + +def get_page_ranges_result(ranges: "PageList") -> Tuple[List[Dict[str, int]], List[Dict[str, int]]]: + page_range = [] + clear_range = [] + if ranges.page_range: + page_range = [{'start': b.start, 'end': b.end} for b in ranges.page_range] + if ranges.clear_range: + clear_range = [{'start': b.start, 'end': b.end} for b in ranges.clear_range] + return page_range, clear_range + + +def service_stats_deserialize(generated: "StorageServiceStats") -> Dict[str, Any]: + status = None + last_sync_time = None + if generated.geo_replication is not None: + status = generated.geo_replication.status + last_sync_time = generated.geo_replication.last_sync_time + return { + 'geo_replication': { + 'status': status, + 'last_sync_time': last_sync_time + } + } + +def service_properties_deserialize(generated: "StorageServiceProperties") -> Dict[str, Any]: + cors_list = None + if generated.cors is not None: + cors_list = [CorsRule._from_generated(cors) for cors in generated.cors] # pylint: disable=protected-access + return { + 'analytics_logging': BlobAnalyticsLogging._from_generated(generated.logging), # pylint: disable=protected-access + 'hour_metrics': Metrics._from_generated(generated.hour_metrics), # pylint: disable=protected-access + 'minute_metrics': Metrics._from_generated(generated.minute_metrics), # pylint: disable=protected-access + 'cors': cors_list, + 'target_version': generated.default_service_version, + 'delete_retention_policy': RetentionPolicy._from_generated(generated.delete_retention_policy), # pylint: disable=protected-access + 'static_website': StaticWebsite._from_generated(generated.static_website), # pylint: disable=protected-access + } + + +def get_blob_properties_from_generated_code(generated: "BlobItemInternal") -> BlobProperties: + blob = BlobProperties() + if generated.name.encoded and generated.name.content is not None: + blob.name = unquote(generated.name.content) + else: + blob.name = generated.name.content #type: ignore + blob_type = get_enum_value(generated.properties.blob_type) + blob.blob_type = BlobType(blob_type) + blob.etag = generated.properties.etag + blob.deleted = generated.deleted + blob.snapshot = generated.snapshot + blob.is_append_blob_sealed = generated.properties.is_sealed + blob.metadata = generated.metadata.additional_properties if generated.metadata else {} # type: ignore [assignment] + blob.encrypted_metadata = generated.metadata.encrypted if generated.metadata else None + blob.lease = LeaseProperties._from_generated(generated) # pylint: disable=protected-access + blob.copy = CopyProperties._from_generated(generated) # pylint: disable=protected-access + blob.last_modified = generated.properties.last_modified + blob.creation_time = generated.properties.creation_time # type: ignore [assignment] + blob.content_settings = ContentSettings._from_generated(generated) # pylint: disable=protected-access + blob.size = generated.properties.content_length # type: ignore [assignment] + blob.page_blob_sequence_number = generated.properties.blob_sequence_number + blob.server_encrypted = generated.properties.server_encrypted # type: ignore [assignment] + blob.encryption_scope = generated.properties.encryption_scope + blob.deleted_time = generated.properties.deleted_time + blob.remaining_retention_days = generated.properties.remaining_retention_days + blob.blob_tier = generated.properties.access_tier # type: ignore [assignment] + blob.rehydrate_priority = generated.properties.rehydrate_priority + blob.blob_tier_inferred = generated.properties.access_tier_inferred + blob.archive_status = generated.properties.archive_status + blob.blob_tier_change_time = generated.properties.access_tier_change_time + blob.version_id = generated.version_id + blob.is_current_version = generated.is_current_version + blob.tag_count = generated.properties.tag_count + blob.tags = parse_tags(generated.blob_tags) + blob.object_replication_source_properties = deserialize_ors_policies(generated.object_replication_metadata) + blob.last_accessed_on = generated.properties.last_accessed_on + blob.immutability_policy = ImmutabilityPolicy._from_generated(generated) # pylint: disable=protected-access + blob.has_legal_hold = generated.properties.legal_hold + blob.has_versions_only = generated.has_versions_only + return blob + +def parse_tags(generated_tags: Optional["BlobTags"]) -> Optional[Dict[str, str]]: + """Deserialize a list of BlobTag objects into a dict. + + :param Optional[BlobTags] generated_tags: + A list containing the BlobTag objects from generated code. + :returns: A dictionary of the BlobTag objects. + :rtype: Optional[Dict[str, str]] + """ + if generated_tags: + tag_dict = {t.key: t.value for t in generated_tags.blob_tag_set} + return tag_dict + return None + + +def load_single_xml_node(element: Element, name: str) -> Optional[Element]: + return element.find(name) + + +def load_many_xml_nodes( + element: Element, + name: str, + wrapper: Optional[str] = None +) -> List[Optional[Element]]: + found_element: Optional[Element] = element + if wrapper: + found_element = load_single_xml_node(element, wrapper) + if found_element is None: + return [] + return list(found_element.findall(name)) + + +def load_xml_string(element: Element, name: str) -> Optional[str]: + node = element.find(name) + if node is None or not node.text: + return None + return node.text + + +def load_xml_int(element: Element, name: str) -> Optional[int]: + node = element.find(name) + if node is None or not node.text: + return None + return int(node.text) |