aboutsummaryrefslogtreecommitdiff
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
import logging
from typing import NoReturn, TYPE_CHECKING
from xml.etree.ElementTree import Element

from azure.core.pipeline.policies import ContentDecodePolicy
from azure.core.exceptions import (
    HttpResponseError,
    DecodeError,
    ResourceModifiedError,
    ClientAuthenticationError,
    ResourceNotFoundError,
    ResourceExistsError
)
from ._models import (
    FileProperties,
    DirectoryProperties,
    LeaseProperties,
    DeletedPathProperties,
    StaticWebsite,
    RetentionPolicy,
    Metrics,
    AnalyticsLogging,
    PathProperties
)
from ._shared.models import StorageErrorCode
from ._shared.response_handlers import deserialize_metadata

if TYPE_CHECKING:
    # No type-checking-only imports are currently needed; the guard is a no-op.
    pass

# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)


def deserialize_dir_properties(response, obj, headers):
    """Build a ``DirectoryProperties`` model from a service response.

    :param response: The response object; its ``headers`` mapping is read for the
        ``x-ms-owner``, ``x-ms-group``, ``x-ms-permissions`` and ``x-ms-acl`` values.
    :param obj: The deserialized object, forwarded to ``deserialize_metadata``.
    :param headers: Header values, forwarded to ``deserialize_metadata`` and expanded
        as keyword arguments into ``DirectoryProperties``.
    :returns: The constructed ``DirectoryProperties`` instance.
    """
    metadata = deserialize_metadata(response, obj, headers)
    raw_headers = response.headers
    # Access-control values are taken straight from the raw response headers.
    access_control = {
        keyword: raw_headers.get(header_name)
        for keyword, header_name in (
            ('owner', 'x-ms-owner'),
            ('group', 'x-ms-group'),
            ('permissions', 'x-ms-permissions'),
            ('acl', 'x-ms-acl'),
        )
    }
    return DirectoryProperties(metadata=metadata, **access_control, **headers)


def deserialize_file_properties(response, obj, headers):
    """Build a ``FileProperties`` model from a service response.

    :param response: The response object; its ``headers`` mapping is read for the
        DataLake-specific values (encryption context, owner, group, permissions, ACL).
    :param obj: The deserialized object, forwarded to ``deserialize_metadata``.
    :param headers: Header values, forwarded to ``deserialize_metadata`` and expanded
        as keyword arguments into ``FileProperties``.
    :returns: The constructed ``FileProperties`` instance.
    """
    metadata = deserialize_metadata(response, obj, headers)
    raw_headers = response.headers
    # DataLake specific headers that are not deserialized in blob are pulled
    # directly from the raw response headers.
    datalake_only = {
        keyword: raw_headers.get(header_name)
        for keyword, header_name in (
            ('encryption_context', 'x-ms-encryption-context'),
            ('owner', 'x-ms-owner'),
            ('group', 'x-ms-group'),
            ('permissions', 'x-ms-permissions'),
            ('acl', 'x-ms-acl'),
        )
    }
    file_properties = FileProperties(metadata=metadata, **datalake_only, **headers)

    # When a 'Content-Range' entry is present, the content MD5 is overridden with the
    # 'x-ms-blob-content-md5' entry, or cleared when that entry is absent.
    if 'Content-Range' in headers:
        file_properties.content_settings.content_md5 = headers.get('x-ms-blob-content-md5')
    return file_properties


def deserialize_path_properties(path_list):
    """Convert each generated path entry into a ``PathProperties`` model.

    :param path_list: An iterable of generated path objects.
    :returns: A list with one ``PathProperties`` per input entry, in the same order.
    """
    convert = PathProperties._from_generated  # pylint: disable=protected-access
    return list(map(convert, path_list))


def return_headers_and_deserialized_path_list(response, deserialized, response_headers):  # pylint: disable=name-too-long, unused-argument
    """Return the deserialized paths together with the normalized response headers.

    :param response: Unused.
    :param deserialized: The deserialized result; its ``paths`` attribute is returned.
    :param response_headers: The header mapping passed through ``normalize_headers``.
    :returns: A ``(paths, normalized_headers)`` tuple; ``paths`` is an empty dict when
        ``deserialized.paths`` is falsy.
    """
    paths = deserialized.paths or {}
    normalized_headers = normalize_headers(response_headers)
    return paths, normalized_headers


def get_deleted_path_properties_from_generated_code(generated):  # pylint: disable=name-too-long
    """Copy a generated deleted-path object into a ``DeletedPathProperties`` model.

    :param generated: The generated object; ``name``, ``deletion_id`` and the
        ``deleted_time`` / ``remaining_retention_days`` of its ``properties`` are read.
    :returns: The populated ``DeletedPathProperties`` instance.
    """
    result = DeletedPathProperties()
    result.name = generated.name

    generated_props = generated.properties
    result.deleted_time = generated_props.deleted_time
    result.remaining_retention_days = generated_props.remaining_retention_days

    result.deletion_id = generated.deletion_id
    return result


def is_file_path(_, __, headers):
    """Tell whether the ``x-ms-resource-type`` header marks the resource as a file.

    The first two positional arguments are ignored.

    :param headers: Mapping that must contain the ``x-ms-resource-type`` key.
    :returns: ``True`` when the value equals ``"file"``, otherwise ``False``.
    :raises KeyError: If ``x-ms-resource-type`` is missing from ``headers``.
    """
    resource_type = headers['x-ms-resource-type']
    return resource_type == "file"


def get_datalake_service_properties(datalake_properties):
    """Replace the generated service-property entries with their public model types.

    The mapping is updated in place and also returned.

    :param datalake_properties: Mapping holding the ``analytics_logging``,
        ``hour_metrics``, ``minute_metrics``, ``delete_retention_policy`` and
        ``static_website`` entries to convert.
    :returns: The same mapping, with each of those entries converted.
    """
    # (key, model) pairs, converted in this order.
    conversions = (
        ("analytics_logging", AnalyticsLogging),
        ("hour_metrics", Metrics),
        ("minute_metrics", Metrics),
        ("delete_retention_policy", RetentionPolicy),
        ("static_website", StaticWebsite),
    )
    for key, model in conversions:
        generated_value = datalake_properties[key]
        datalake_properties[key] = model._from_generated(generated_value)  # pylint: disable=protected-access
    return datalake_properties


def from_blob_properties(blob_properties, **additional_args):
    """Create a ``FileProperties`` from a blob properties object.

    :param blob_properties: Source object whose attributes are copied across.
    :keyword encryption_context: Optional DataLake-only value; defaults to ``None``.
    :keyword owner: Optional DataLake-only value; defaults to ``None``.
    :keyword group: Optional DataLake-only value; defaults to ``None``.
    :keyword permissions: Optional DataLake-only value; defaults to ``None``.
    :keyword acl: Optional DataLake-only value; defaults to ``None``.
    :returns: The populated ``FileProperties`` instance.
    """
    converted = FileProperties()

    for attribute in ('name', 'etag', 'deleted', 'metadata', 'lease'):
        setattr(converted, attribute, getattr(blob_properties, attribute))

    # The lease object is shared with ``blob_properties``; its class is switched to
    # ``LeaseProperties`` in place.
    converted.lease.__class__ = LeaseProperties

    for attribute in (
        'last_modified',
        'creation_time',
        'size',
        'deleted_time',
        'remaining_retention_days',
        'content_settings',
    ):
        setattr(converted, attribute, getattr(blob_properties, attribute))

    # Parse additional Datalake-only properties
    for option in ('encryption_context', 'owner', 'group', 'permissions', 'acl'):
        setattr(converted, option, additional_args.pop(option, None))

    return converted


def normalize_headers(headers):
    """Return a copy of ``headers`` with keys turned into snake_case identifiers.

    A leading ``x-ms-`` prefix (matched case-sensitively) is removed, the key is
    lower-cased and every ``-`` becomes ``_``. Values are left untouched; when two
    keys normalize to the same name, the later one wins.

    :param headers: Mapping of header names to values.
    :returns: A new dict keyed by the normalized names.
    """
    return {
        (name[5:] if name.startswith('x-ms-') else name).lower().replace('-', '_'): value
        for name, value in headers.items()
    }


def process_storage_error(storage_error) -> NoReturn:  # pylint:disable=too-many-statements
    """Enrich a storage error with service details and raise the most specific exception.

    The error code is read from the ``x-ms-error-code`` response header and, when the
    response body can be decoded as XML or JSON, from the body as well. The code is then
    mapped to ``ResourceModifiedError``, ``ClientAuthenticationError``,
    ``ResourceNotFoundError`` or ``ResourceExistsError``; any other code results in a
    plain ``HttpResponseError``. The raised error carries ``error_code`` and
    ``additional_info`` attributes, and its message lists the code and every extra
    field found in the body.

    :param storage_error: The error raised by the generated layer. Its ``response``
        and ``message`` attributes are read.
    :raises: ``storage_error`` itself, unchanged, when it has no (truthy) response.
        Otherwise the same instance with an extended message if it is already a
        ``ResourceNotFoundError``, ``ClientAuthenticationError`` or
        ``ResourceExistsError``; in all other cases a new error of the mapped type.
    """
    raise_error = HttpResponseError
    serialized = False
    if not storage_error.response:
        raise storage_error
    # If it is one of those three then it has been serialized prior by the generated layer.
    if isinstance(storage_error, (ResourceNotFoundError, ClientAuthenticationError, ResourceExistsError)):
        serialized = True
    error_code = storage_error.response.headers.get('x-ms-error-code')
    error_message = storage_error.message
    additional_data = {}
    error_dict = {}
    try:
        error_body = ContentDecodePolicy.deserialize_from_http_generics(storage_error.response)
        # If it is an XML response
        if isinstance(error_body, Element):
            error_dict = {
                child.tag.lower(): child.text
                for child in error_body
            }
        # If it is a JSON response
        elif isinstance(error_body, dict):
            error_dict = error_body.get('error', {})
        elif not error_code:
            _LOGGER.warning(
                'Unexpected return type %s from ContentDecodePolicy.deserialize_from_http_generics.', type(error_body))
            error_dict = {'message': str(error_body)}

        # If we extracted from a Json or XML response. The JSON 'error' entry is not
        # guaranteed to be a mapping (it may be a bare string), so check before using it.
        if error_dict and isinstance(error_dict, dict):
            # Keep the header code / original message when the body omits a field or
            # leaves it empty; overwriting them with None would break the message
            # concatenation below and hide the real storage error.
            error_code = error_dict.get('code') or error_code
            error_message = error_dict.get('message') or error_message
            additional_data = {k: v for k, v in error_dict.items() if k not in {'code', 'message'}}

    except DecodeError:
        # Body could not be decoded; fall back to the header code and original message.
        pass

    try:
        # This check would be unnecessary if we have already serialized the error.
        if error_code and not serialized:
            error_code = StorageErrorCode(error_code)
            if error_code in [StorageErrorCode.condition_not_met]:
                raise_error = ResourceModifiedError
            if error_code in [StorageErrorCode.invalid_authentication_info,
                              StorageErrorCode.authentication_failed]:
                raise_error = ClientAuthenticationError
            if error_code in [StorageErrorCode.resource_not_found,
                              StorageErrorCode.invalid_property_name,
                              StorageErrorCode.invalid_source_uri,
                              StorageErrorCode.source_path_not_found,
                              StorageErrorCode.lease_name_mismatch,
                              StorageErrorCode.file_system_not_found,
                              StorageErrorCode.path_not_found,
                              StorageErrorCode.parent_not_found,
                              StorageErrorCode.invalid_destination_path,
                              StorageErrorCode.invalid_rename_source_path,
                              StorageErrorCode.lease_is_already_broken,
                              StorageErrorCode.invalid_source_or_destination_resource_type,
                              StorageErrorCode.rename_destination_parent_path_not_found]:
                raise_error = ResourceNotFoundError
            if error_code in [StorageErrorCode.account_already_exists,
                              StorageErrorCode.account_being_created,
                              StorageErrorCode.resource_already_exists,
                              StorageErrorCode.resource_type_mismatch,
                              StorageErrorCode.source_path_is_being_deleted,
                              StorageErrorCode.path_already_exists,
                              StorageErrorCode.destination_path_is_being_deleted,
                              StorageErrorCode.file_system_already_exists,
                              StorageErrorCode.file_system_being_deleted,
                              StorageErrorCode.path_conflict]:
                raise_error = ResourceExistsError
    except ValueError:
        # Got an unknown error code; keep the raw value and the generic error type.
        pass

    # Error message should include all the error properties.
    # A converted code exposes ``.value``; a raw (unconverted) code is printed as-is.
    error_message += f"\nErrorCode:{getattr(error_code, 'value', error_code)}"
    for name, info in additional_data.items():
        error_message += f"\n{name}:{info}"

    # No need to create an instance if it has already been serialized by the generated layer
    if serialized:
        storage_error.message = error_message
        error = storage_error
    else:
        error = raise_error(message=error_message, response=storage_error.response)
    # Ensure these properties are stored in the error instance as well (not just the error message)
    error.error_code = error_code
    error.additional_info = additional_data
    # error.args is what's surfaced on the traceback - show error message in all cases
    error.args = (error.message,)

    # `from None` prevents us from double printing the exception (suppresses generated layer error context)
    raise error from None