aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/azure/storage/blob/_deserialize.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/storage/blob/_deserialize.py')
-rw-r--r--.venv/lib/python3.12/site-packages/azure/storage/blob/_deserialize.py234
1 files changed, 234 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/storage/blob/_deserialize.py b/.venv/lib/python3.12/site-packages/azure/storage/blob/_deserialize.py
new file mode 100644
index 00000000..b6ee9160
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/storage/blob/_deserialize.py
@@ -0,0 +1,234 @@
+# -------------------------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# Licensed under the MIT License. See License.txt in the project root for
+# license information.
+# --------------------------------------------------------------------------
+
+from typing import Any, Dict, List, Optional, Tuple, TYPE_CHECKING
+from urllib.parse import unquote
+from xml.etree.ElementTree import Element
+
+from ._models import (
+ BlobAnalyticsLogging,
+ BlobProperties,
+ BlobType,
+ ContainerProperties,
+ ContentSettings,
+ CopyProperties,
+ CorsRule,
+ ImmutabilityPolicy,
+ LeaseProperties,
+ Metrics,
+ ObjectReplicationPolicy,
+ ObjectReplicationRule,
+ RetentionPolicy,
+ StaticWebsite
+)
+from ._shared.models import get_enum_value
+from ._shared.response_handlers import deserialize_metadata
+
+if TYPE_CHECKING:
+ from azure.core.pipeline import PipelineResponse
+ from ._generated.models import (
+ BlobItemInternal,
+ BlobTags,
+ PageList,
+ StorageServiceProperties,
+ StorageServiceStats,
+ )
+ from ._shared.models import LocationMode
+
+def deserialize_pipeline_response_into_cls(cls_method, response: "PipelineResponse", obj: Any, headers: Dict[str, Any]):
+ try:
+ deserialized_response = response.http_response
+ except AttributeError:
+ deserialized_response = response
+ return cls_method(deserialized_response, obj, headers)
+
+
+def deserialize_blob_properties(response: "PipelineResponse", obj: Any, headers: Dict[str, Any]) -> BlobProperties:
+ blob_properties = BlobProperties(
+ metadata=deserialize_metadata(response, obj, headers),
+ object_replication_source_properties=deserialize_ors_policies(response.http_response.headers),
+ **headers
+ )
+ if 'Content-Range' in headers:
+ if 'x-ms-blob-content-md5' in headers:
+ blob_properties.content_settings.content_md5 = headers['x-ms-blob-content-md5']
+ else:
+ blob_properties.content_settings.content_md5 = None
+ return blob_properties
+
+
+def deserialize_ors_policies(policy_dictionary: Optional[Dict[str, str]]) -> Optional[List[ObjectReplicationPolicy]]:
+
+ if policy_dictionary is None:
+ return None
+ # For source blobs (blobs that have policy ids and rule ids applied to them),
+ # the header will be formatted as "x-ms-or-<policy_id>_<rule_id>: {Complete, Failed}".
+ # The value of this header is the status of the replication.
+ or_policy_status_headers = {key: val for key, val in policy_dictionary.items()
+ if 'or-' in key and key != 'x-ms-or-policy-id'}
+
+ parsed_result: Dict[str, List[ObjectReplicationRule]] = {}
+
+ for key, val in or_policy_status_headers.items():
+ # list blobs gives or-policy_rule and get blob properties gives x-ms-or-policy_rule
+ policy_and_rule_ids = key.split('or-')[1].split('_')
+ policy_id = policy_and_rule_ids[0]
+ rule_id = policy_and_rule_ids[1]
+
+ # If we are seeing this policy for the first time, create a new list to store rule_id -> result
+ parsed_result[policy_id] = parsed_result.get(policy_id) or []
+ parsed_result[policy_id].append(ObjectReplicationRule(rule_id=rule_id, status=val))
+
+ result_list = [ObjectReplicationPolicy(policy_id=k, rules=v) for k, v in parsed_result.items()]
+
+ return result_list
+
+
+def deserialize_blob_stream(
+ response: "PipelineResponse",
+ obj: Any,
+ headers: Dict[str, Any]
+) -> Tuple["LocationMode", Any]:
+ blob_properties = deserialize_blob_properties(response, obj, headers)
+ obj.properties = blob_properties
+ return response.http_response.location_mode, obj
+
+
+def deserialize_container_properties(
+ response: "PipelineResponse",
+ obj: Any,
+ headers: Dict[str, Any]
+) -> ContainerProperties:
+ metadata = deserialize_metadata(response, obj, headers)
+ container_properties = ContainerProperties(
+ metadata=metadata,
+ **headers
+ )
+ return container_properties
+
+
+def get_page_ranges_result(ranges: "PageList") -> Tuple[List[Dict[str, int]], List[Dict[str, int]]]:
+ page_range = []
+ clear_range = []
+ if ranges.page_range:
+ page_range = [{'start': b.start, 'end': b.end} for b in ranges.page_range]
+ if ranges.clear_range:
+ clear_range = [{'start': b.start, 'end': b.end} for b in ranges.clear_range]
+ return page_range, clear_range
+
+
+def service_stats_deserialize(generated: "StorageServiceStats") -> Dict[str, Any]:
+ status = None
+ last_sync_time = None
+ if generated.geo_replication is not None:
+ status = generated.geo_replication.status
+ last_sync_time = generated.geo_replication.last_sync_time
+ return {
+ 'geo_replication': {
+ 'status': status,
+ 'last_sync_time': last_sync_time
+ }
+ }
+
+def service_properties_deserialize(generated: "StorageServiceProperties") -> Dict[str, Any]:
+ cors_list = None
+ if generated.cors is not None:
+ cors_list = [CorsRule._from_generated(cors) for cors in generated.cors] # pylint: disable=protected-access
+ return {
+ 'analytics_logging': BlobAnalyticsLogging._from_generated(generated.logging), # pylint: disable=protected-access
+ 'hour_metrics': Metrics._from_generated(generated.hour_metrics), # pylint: disable=protected-access
+ 'minute_metrics': Metrics._from_generated(generated.minute_metrics), # pylint: disable=protected-access
+ 'cors': cors_list,
+ 'target_version': generated.default_service_version,
+ 'delete_retention_policy': RetentionPolicy._from_generated(generated.delete_retention_policy), # pylint: disable=protected-access
+ 'static_website': StaticWebsite._from_generated(generated.static_website), # pylint: disable=protected-access
+ }
+
+
+def get_blob_properties_from_generated_code(generated: "BlobItemInternal") -> BlobProperties:
+ blob = BlobProperties()
+ if generated.name.encoded and generated.name.content is not None:
+ blob.name = unquote(generated.name.content)
+ else:
+ blob.name = generated.name.content #type: ignore
+ blob_type = get_enum_value(generated.properties.blob_type)
+ blob.blob_type = BlobType(blob_type)
+ blob.etag = generated.properties.etag
+ blob.deleted = generated.deleted
+ blob.snapshot = generated.snapshot
+ blob.is_append_blob_sealed = generated.properties.is_sealed
+ blob.metadata = generated.metadata.additional_properties if generated.metadata else {} # type: ignore [assignment]
+ blob.encrypted_metadata = generated.metadata.encrypted if generated.metadata else None
+ blob.lease = LeaseProperties._from_generated(generated) # pylint: disable=protected-access
+ blob.copy = CopyProperties._from_generated(generated) # pylint: disable=protected-access
+ blob.last_modified = generated.properties.last_modified
+ blob.creation_time = generated.properties.creation_time # type: ignore [assignment]
+ blob.content_settings = ContentSettings._from_generated(generated) # pylint: disable=protected-access
+ blob.size = generated.properties.content_length # type: ignore [assignment]
+ blob.page_blob_sequence_number = generated.properties.blob_sequence_number
+ blob.server_encrypted = generated.properties.server_encrypted # type: ignore [assignment]
+ blob.encryption_scope = generated.properties.encryption_scope
+ blob.deleted_time = generated.properties.deleted_time
+ blob.remaining_retention_days = generated.properties.remaining_retention_days
+ blob.blob_tier = generated.properties.access_tier # type: ignore [assignment]
+ blob.rehydrate_priority = generated.properties.rehydrate_priority
+ blob.blob_tier_inferred = generated.properties.access_tier_inferred
+ blob.archive_status = generated.properties.archive_status
+ blob.blob_tier_change_time = generated.properties.access_tier_change_time
+ blob.version_id = generated.version_id
+ blob.is_current_version = generated.is_current_version
+ blob.tag_count = generated.properties.tag_count
+ blob.tags = parse_tags(generated.blob_tags)
+ blob.object_replication_source_properties = deserialize_ors_policies(generated.object_replication_metadata)
+ blob.last_accessed_on = generated.properties.last_accessed_on
+ blob.immutability_policy = ImmutabilityPolicy._from_generated(generated) # pylint: disable=protected-access
+ blob.has_legal_hold = generated.properties.legal_hold
+ blob.has_versions_only = generated.has_versions_only
+ return blob
+
+def parse_tags(generated_tags: Optional["BlobTags"]) -> Optional[Dict[str, str]]:
+ """Deserialize a list of BlobTag objects into a dict.
+
+ :param Optional[BlobTags] generated_tags:
+ A list containing the BlobTag objects from generated code.
+ :returns: A dictionary of the BlobTag objects.
+ :rtype: Optional[Dict[str, str]]
+ """
+ if generated_tags:
+ tag_dict = {t.key: t.value for t in generated_tags.blob_tag_set}
+ return tag_dict
+ return None
+
+
+def load_single_xml_node(element: Element, name: str) -> Optional[Element]:
+ return element.find(name)
+
+
+def load_many_xml_nodes(
+ element: Element,
+ name: str,
+ wrapper: Optional[str] = None
+) -> List[Optional[Element]]:
+ found_element: Optional[Element] = element
+ if wrapper:
+ found_element = load_single_xml_node(element, wrapper)
+ if found_element is None:
+ return []
+ return list(found_element.findall(name))
+
+
+def load_xml_string(element: Element, name: str) -> Optional[str]:
+ node = element.find(name)
+ if node is None or not node.text:
+ return None
+ return node.text
+
+
+def load_xml_int(element: Element, name: str) -> Optional[int]:
+ node = element.find(name)
+ if node is None or not node.text:
+ return None
+ return int(node.text)