# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
import logging
from typing import NoReturn, TYPE_CHECKING
from xml.etree.ElementTree import Element

from azure.core.pipeline.policies import ContentDecodePolicy
from azure.core.exceptions import (
    HttpResponseError,
    DecodeError,
    ResourceModifiedError,
    ClientAuthenticationError,
    ResourceNotFoundError,
    ResourceExistsError
)
from ._models import (
    FileProperties,
    DirectoryProperties,
    LeaseProperties,
    DeletedPathProperties,
    StaticWebsite,
    RetentionPolicy,
    Metrics,
    AnalyticsLogging,
    PathProperties
)
from ._shared.models import StorageErrorCode
from ._shared.response_handlers import deserialize_metadata

if TYPE_CHECKING:
    pass

_LOGGER = logging.getLogger(__name__)


def deserialize_dir_properties(response, obj, headers):
    """Build a ``DirectoryProperties`` from a service response.

    :param response: Response object; its raw ``headers`` supply the
        ``x-ms-owner``, ``x-ms-group``, ``x-ms-permissions`` and ``x-ms-acl`` values.
    :param obj: Passed through unchanged to ``deserialize_metadata``.
    :param headers: Mapping of already-deserialized headers; forwarded both to
        ``deserialize_metadata`` and, as keyword arguments, to ``DirectoryProperties``.
    :returns: The populated ``DirectoryProperties`` instance.
    """
    metadata = deserialize_metadata(response, obj, headers)
    raw_headers = response.headers
    return DirectoryProperties(
        metadata=metadata,
        owner=raw_headers.get('x-ms-owner'),
        group=raw_headers.get('x-ms-group'),
        permissions=raw_headers.get('x-ms-permissions'),
        acl=raw_headers.get('x-ms-acl'),
        **headers
    )


def deserialize_file_properties(response, obj, headers):
    """Build a ``FileProperties`` from a service response.

    :param response: Response object; its raw ``headers`` supply the DataLake-specific values.
    :param obj: Passed through unchanged to ``deserialize_metadata``.
    :param headers: Mapping of already-deserialized headers; forwarded both to
        ``deserialize_metadata`` and, as keyword arguments, to ``FileProperties``.
    :returns: The populated ``FileProperties`` instance.
    """
    metadata = deserialize_metadata(response, obj, headers)
    raw_headers = response.headers
    # DataLake specific headers that are not deserialized in blob are pulled directly from the raw response header
    file_properties = FileProperties(
        metadata=metadata,
        encryption_context=raw_headers.get('x-ms-encryption-context'),
        owner=raw_headers.get('x-ms-owner'),
        group=raw_headers.get('x-ms-group'),
        permissions=raw_headers.get('x-ms-permissions'),
        acl=raw_headers.get('x-ms-acl'),
        **headers
    )
    if 'Content-Range' in headers:
        # When a Content-Range is present the MD5 is taken from 'x-ms-blob-content-md5'
        # (None when that header is absent).
        # NOTE(review): presumably because the regular MD5 then covers only the returned range - confirm.
        file_properties.content_settings.content_md5 = headers.get('x-ms-blob-content-md5')
    return file_properties


def deserialize_path_properties(path_list):
    """Convert each generated path in ``path_list`` to ``PathProperties``.

    :param path_list: Iterable of generated path objects.
    :returns: A list with one ``PathProperties`` per input item, in the same order.
    """
    return [PathProperties._from_generated(path) for path in path_list]  # pylint: disable=protected-access


def return_headers_and_deserialized_path_list(response, deserialized, response_headers):  # pylint: disable=name-too-long, unused-argument
    """Return the deserialized paths together with the normalized response headers.

    :param response: Unused.
    :param deserialized: Object exposing a ``paths`` attribute.
    :param response_headers: Mapping of response headers, normalized via ``normalize_headers``.
    :returns: Tuple ``(paths, normalized_headers)``; ``paths`` is ``{}`` when
        ``deserialized.paths`` is falsy.
    """
    return deserialized.paths or {}, normalize_headers(response_headers)


def get_deleted_path_properties_from_generated_code(generated):  # pylint: disable=name-too-long
    """Copy the fields of a generated deleted-path object onto ``DeletedPathProperties``.

    :param generated: Object exposing ``name``, ``deletion_id`` and a ``properties``
        attribute with ``deleted_time`` and ``remaining_retention_days``.
    :returns: The populated ``DeletedPathProperties`` instance.
    """
    deleted_path = DeletedPathProperties()
    deleted_path.name = generated.name
    deleted_path.deleted_time = generated.properties.deleted_time
    deleted_path.remaining_retention_days = generated.properties.remaining_retention_days
    deleted_path.deletion_id = generated.deletion_id
    return deleted_path


def is_file_path(_, __, headers):
    """Tell whether the ``x-ms-resource-type`` header equals ``"file"``.

    The first two positional arguments are ignored.

    :param headers: Mapping that must contain ``x-ms-resource-type``.
    :returns: ``True`` when the header value is ``"file"``, otherwise ``False``.
    :raises KeyError: If ``x-ms-resource-type`` is missing from ``headers``.
    """
    return headers['x-ms-resource-type'] == "file"


def get_datalake_service_properties(datalake_properties):
    """Replace the generated service-property values with their public model types.

    The mapping is updated in place and also returned.

    :param datalake_properties: Mapping containing the keys ``analytics_logging``,
        ``hour_metrics``, ``minute_metrics``, ``delete_retention_policy`` and ``static_website``.
    :returns: The same mapping, with those five values converted.
    """
    converters = (
        ("analytics_logging", AnalyticsLogging),
        ("hour_metrics", Metrics),
        ("minute_metrics", Metrics),
        ("delete_retention_policy", RetentionPolicy),
        ("static_website", StaticWebsite),
    )
    for key, model in converters:
        datalake_properties[key] = model._from_generated(  # pylint: disable=protected-access
            datalake_properties[key])
    return datalake_properties


def from_blob_properties(blob_properties, **additional_args):
    """Create a ``FileProperties`` from a blob properties object.

    :param blob_properties: Object whose attributes are copied onto the result.
    :keyword encryption_context: Optional DataLake-only value; defaults to ``None``.
    :keyword owner: Optional DataLake-only value; defaults to ``None``.
    :keyword group: Optional DataLake-only value; defaults to ``None``.
    :keyword permissions: Optional DataLake-only value; defaults to ``None``.
    :keyword acl: Optional DataLake-only value; defaults to ``None``.
    :returns: The populated ``FileProperties`` instance.
    """
    file_props = FileProperties()
    file_props.name = blob_properties.name
    file_props.etag = blob_properties.etag
    file_props.deleted = blob_properties.deleted
    file_props.metadata = blob_properties.metadata
    file_props.lease = blob_properties.lease
    # The lease object is shared with ``blob_properties`` and re-typed in place,
    # so the caller's lease object becomes a ``LeaseProperties`` as well.
    file_props.lease.__class__ = LeaseProperties
    file_props.last_modified = blob_properties.last_modified
    file_props.creation_time = blob_properties.creation_time
    file_props.size = blob_properties.size
    file_props.deleted_time = blob_properties.deleted_time
    file_props.remaining_retention_days = blob_properties.remaining_retention_days
    file_props.content_settings = blob_properties.content_settings

    # Parse additional Datalake-only properties
    file_props.encryption_context = additional_args.pop('encryption_context', None)
    file_props.owner = additional_args.pop('owner', None)
    file_props.group = additional_args.pop('group', None)
    file_props.permissions = additional_args.pop('permissions', None)
    file_props.acl = additional_args.pop('acl', None)

    return file_props


def normalize_headers(headers):
    """Turn header names into snake_case keys.

    A leading ``x-ms-`` prefix is stripped, the name is lower-cased and dashes
    become underscores (for example ``x-ms-request-id`` -> ``request_id``).

    :param headers: Mapping of header name to value.
    :returns: A new dict with the normalized names; values are unchanged.
    """
    normalized = {}
    for key, value in headers.items():
        if key.startswith('x-ms-'):
            key = key[5:]
        normalized[key.lower().replace('-', '_')] = value
    return normalized


def process_storage_error(storage_error) -> NoReturn:  # pylint:disable=too-many-statements
    """Translate a storage error into the most specific azure-core exception and raise it.

    The error code is read from the ``x-ms-error-code`` response header and, when the
    response body can be decoded (XML or JSON), from the body's ``code``/``message``
    entries. The final message has ``ErrorCode:<code>`` and every other body entry
    appended. The raised exception carries ``error_code`` and ``additional_info``.

    :param storage_error: The exception raised by the lower layer. It must expose
        ``response`` and ``message``.
    :raises: ``storage_error`` itself when it has no response or was already one of
        ``ResourceNotFoundError``/``ClientAuthenticationError``/``ResourceExistsError``;
        otherwise ``ResourceModifiedError``, ``ClientAuthenticationError``,
        ``ResourceNotFoundError``, ``ResourceExistsError`` or ``HttpResponseError``
        depending on the error code.
    """
    raise_error = HttpResponseError
    serialized = False
    if not storage_error.response:
        raise storage_error
    # If it is one of those three then it has been serialized prior by the generated layer.
    if isinstance(storage_error, (ResourceNotFoundError, ClientAuthenticationError, ResourceExistsError)):
        serialized = True
    error_code = storage_error.response.headers.get('x-ms-error-code')
    error_message = storage_error.message
    additional_data = {}
    error_dict = {}
    try:
        error_body = ContentDecodePolicy.deserialize_from_http_generics(storage_error.response)
        # If it is an XML response
        if isinstance(error_body, Element):
            error_dict = {child.tag.lower(): child.text for child in error_body}
        # If it is a JSON response
        elif isinstance(error_body, dict):
            error_dict = error_body.get('error', {})
        elif not error_code:
            _LOGGER.warning(
                'Unexpected return type %s from ContentDecodePolicy.deserialize_from_http_generics.', type(error_body))
            error_dict = {'message': str(error_body)}

        # If we extracted from a Json or XML response
        if error_dict:
            # Only override the header code / original message when the body actually
            # provides a value; otherwise a missing entry would wipe the header code
            # and leave the message as None, breaking the concatenation below.
            body_code = error_dict.get('code')
            if body_code is not None:
                error_code = body_code
            body_message = error_dict.get('message')
            if body_message is not None:
                error_message = body_message
            additional_data = {k: v for k, v in error_dict.items() if k not in {'code', 'message'}}
    except DecodeError:
        # Best effort: an undecodable body just means we rely on the header and original message.
        pass

    try:
        # This check would be unnecessary if we have already serialized the error.
        if error_code and not serialized:
            error_code = StorageErrorCode(error_code)
            if error_code in (StorageErrorCode.condition_not_met,):
                raise_error = ResourceModifiedError
            if error_code in (StorageErrorCode.invalid_authentication_info,
                              StorageErrorCode.authentication_failed):
                raise_error = ClientAuthenticationError
            if error_code in (StorageErrorCode.resource_not_found,
                              StorageErrorCode.invalid_property_name,
                              StorageErrorCode.invalid_source_uri,
                              StorageErrorCode.source_path_not_found,
                              StorageErrorCode.lease_name_mismatch,
                              StorageErrorCode.file_system_not_found,
                              StorageErrorCode.path_not_found,
                              StorageErrorCode.parent_not_found,
                              StorageErrorCode.invalid_destination_path,
                              StorageErrorCode.invalid_rename_source_path,
                              StorageErrorCode.lease_is_already_broken,
                              StorageErrorCode.invalid_source_or_destination_resource_type,
                              StorageErrorCode.rename_destination_parent_path_not_found):
                raise_error = ResourceNotFoundError
            if error_code in (StorageErrorCode.account_already_exists,
                              StorageErrorCode.account_being_created,
                              StorageErrorCode.resource_already_exists,
                              StorageErrorCode.resource_type_mismatch,
                              StorageErrorCode.source_path_is_being_deleted,
                              StorageErrorCode.path_already_exists,
                              StorageErrorCode.destination_path_is_being_deleted,
                              StorageErrorCode.file_system_already_exists,
                              StorageErrorCode.file_system_being_deleted,
                              StorageErrorCode.path_conflict):
                raise_error = ResourceExistsError
    except ValueError:
        # Got an unknown error code
        pass

    # Guard against an original error that carried no message at all.
    if error_message is None:
        error_message = ''

    # Error message should include all the error properties
    try:
        error_message += f"\nErrorCode:{error_code.value}"
    except AttributeError:
        # error_code is a plain string (unknown or already-serialized code) or None.
        error_message += f"\nErrorCode:{error_code}"
    for name, info in additional_data.items():
        error_message += f"\n{name}:{info}"

    # No need to create an instance if it has already been serialized by the generated layer
    if serialized:
        storage_error.message = error_message
        error = storage_error
    else:
        error = raise_error(message=error_message, response=storage_error.response)
    # Ensure these properties are stored in the error instance as well (not just the error message)
    error.error_code = error_code
    error.additional_info = additional_data
    # error.args is what's surfaced on the traceback - show error message in all cases
    error.args = (error.message,)

    # `from None` prevents us from double printing the exception (suppresses generated layer error context)
    raise error from None