# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------

from typing import Any, Dict, List, Optional, Tuple, TYPE_CHECKING, Union
from urllib.parse import quote, urlparse

from azure.core import MatchConditions
from azure.core.pipeline.transport import HttpRequest

from ._blob_client_helpers import _generic_delete_blob_options
from ._generated import AzureBlobStorage
from ._models import BlobProperties
from ._shared.base_client import parse_query

if TYPE_CHECKING:
    from azure.storage.blob import RehydratePriority
    from urllib.parse import ParseResult
    from ._generated.models import LeaseAccessConditions, ModifiedAccessConditions
    from ._models import PremiumPageBlobTier, StandardBlobTier


def _parse_url(account_url: str, container_name: str) -> Tuple["ParseResult", Any]:
    """Parse an account URL and pull the SAS token out of its query string.

    A URL that does not start with ``http`` (case-insensitively) is prefixed
    with ``https://``; trailing slashes are stripped before parsing.

    :param str account_url: The account URL, with or without a scheme.
    :param str container_name: The container name; only checked for being non-empty.
    :returns: The parsed URL and the SAS token as returned by ``parse_query``.
    :rtype: Tuple[ParseResult, Any]
    :raises ValueError: If ``account_url`` is not a string, ``container_name`` is
        empty, or the parsed URL has no network location.
    """
    try:
        has_scheme = account_url.lower().startswith('http')
    except AttributeError as exc:
        raise ValueError("Container URL must be a string.") from exc
    if not has_scheme:
        account_url = "https://" + account_url

    parsed_url = urlparse(account_url.rstrip('/'))

    # The container-name check deliberately precedes the netloc check so the
    # caller sees the same error precedence as before.
    if not container_name:
        raise ValueError("Please specify a container name.")
    if not parsed_url.netloc:
        raise ValueError(f"Invalid URL: {account_url}")

    _unused, sas_token = parse_query(parsed_url.query)
    return parsed_url, sas_token


def _format_url(container_name: Union[bytes, str], hostname: str, scheme: str, query_str: str) -> str:
    """Assemble the container URL from its components.

    :param container_name: The container name; a ``str`` is UTF-8 encoded before quoting.
    :type container_name: Union[bytes, str]
    :param str hostname: The host portion of the URL.
    :param str scheme: The URL scheme, placed before ``://``.
    :param str query_str: Text appended verbatim after the quoted container name.
    :returns: ``{scheme}://{hostname}/{quoted container name}{query_str}``.
    :rtype: str
    """
    encoded_name = container_name.encode('UTF-8') if isinstance(container_name, str) else container_name
    return f"{scheme}://{hostname}/{quote(encoded_name)}{query_str}"


# This code is a copy from _generated.
# Once Autorest is able to provide request preparation this code should be removed.
def _generate_delete_blobs_subrequest_options(
    client: AzureBlobStorage,
    snapshot: Optional[str] = None,
    version_id: Optional[str] = None,
    delete_snapshots: Optional[str] = None,
    lease_access_conditions: Optional["LeaseAccessConditions"] = None,
    modified_access_conditions: Optional["ModifiedAccessConditions"] = None,
    **kwargs
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Build the query and header parameters for one delete-blob sub-request.

    Only values that are not ``None`` are serialized (via ``client._serialize``)
    and included in the result.

    :param client: The generated client whose serializer is used.
    :type client: AzureBlobStorage
    :param Optional[str] snapshot: Value for the ``snapshot`` query parameter.
    :param Optional[str] version_id: Value for the ``versionid`` query parameter.
    :param Optional[str] delete_snapshots: Value for the ``x-ms-delete-snapshots`` header.
    :param lease_access_conditions: Source of the ``x-ms-lease-id`` header.
    :type lease_access_conditions: Optional[LeaseAccessConditions]
    :param modified_access_conditions: Source of the conditional request headers.
    :type modified_access_conditions: Optional[ModifiedAccessConditions]
    :keyword timeout: Value for the ``timeout`` query parameter (popped from kwargs).
    :returns: A ``(query_parameters, header_parameters)`` pair.
    :rtype: Tuple[Dict[str, Any], Dict[str, Any]]
    """
    lease_id = lease_access_conditions.lease_id if lease_access_conditions is not None else None

    if modified_access_conditions is None:
        if_modified_since = if_unmodified_since = if_match = if_none_match = if_tags = None
    else:
        if_modified_since = modified_access_conditions.if_modified_since
        if_unmodified_since = modified_access_conditions.if_unmodified_since
        if_match = modified_access_conditions.if_match
        if_none_match = modified_access_conditions.if_none_match
        if_tags = modified_access_conditions.if_tags

    timeout = kwargs.pop('timeout', None)

    # Construct parameters: (wire name, serializer name, value, data type, constraints)
    query_fields = (
        ('snapshot', "snapshot", snapshot, 'str', {}),
        ('versionid', "version_id", version_id, 'str', {}),
        ('timeout', "timeout", timeout, 'int', {'minimum': 0}),
    )
    query_parameters = {
        wire_name: client._serialize.query(name, value, data_type, **constraints)  # pylint: disable=protected-access
        for wire_name, name, value, data_type, constraints in query_fields
        if value is not None
    }

    # Construct headers: (wire name, serializer name, value, data type)
    header_fields = (
        ('x-ms-delete-snapshots', "delete_snapshots", delete_snapshots, 'DeleteSnapshotsOptionType'),
        ('x-ms-lease-id', "lease_id", lease_id, 'str'),
        ('If-Modified-Since', "if_modified_since", if_modified_since, 'rfc-1123'),
        ('If-Unmodified-Since', "if_unmodified_since", if_unmodified_since, 'rfc-1123'),
        ('If-Match', "if_match", if_match, 'str'),
        ('If-None-Match', "if_none_match", if_none_match, 'str'),
        ('x-ms-if-tags', "if_tags", if_tags, 'str'),
    )
    header_parameters = {
        wire_name: client._serialize.header(name, value, data_type)  # pylint: disable=protected-access
        for wire_name, name, value, data_type in header_fields
        if value is not None
    }

    return query_parameters, header_parameters


def _generate_delete_blobs_options(
    query_str: str,
    container_name: str,
    client: AzureBlobStorage,
    *blobs: Union[str, Dict[str, Any], BlobProperties],
    **kwargs: Any
) -> Tuple[List[HttpRequest], Dict[str, Any]]:
    """Build one ``DELETE`` sub-request per blob plus the options for the batch call.

    :param str query_str: Query string appended to every sub-request URL; also stored
        under ``'sas'`` in the returned options with ``?`` replaced by ``&``.
    :param str container_name: The container holding the blobs.
    :param client: The generated client whose serializer is used.
    :type client: AzureBlobStorage
    :param blobs: Blob names, or mapping-like objects read with ``.get`` for
        per-blob settings.
    :type blobs: Union[str, Dict[str, Any], BlobProperties]
    :keyword timeout: Stored as ``'&timeout=<value>'`` in the returned options when truthy.
    :keyword bool raise_on_any_failure: Passed through in the returned options (default ``True``).
    :keyword delete_snapshots: Batch-wide value; takes precedence over a per-blob value.
    :keyword if_modified_since: Batch-wide value; takes precedence over a per-blob value.
    :keyword if_unmodified_since: Batch-wide value; takes precedence over a per-blob value.
    :keyword if_tags_match_condition: Batch-wide value; takes precedence over a per-blob value.
    :keyword url_prepend: Optional path segment quoted and placed before the container name.
    :returns: The list of sub-requests and the updated keyword options.
    :rtype: Tuple[List[HttpRequest], Dict[str, Any]]
    """
    timeout = kwargs.pop('timeout', None)
    raise_on_any_failure = kwargs.pop('raise_on_any_failure', True)
    delete_snapshots = kwargs.pop('delete_snapshots', None)
    if_modified_since = kwargs.pop('if_modified_since', None)
    if_unmodified_since = kwargs.pop('if_unmodified_since', None)
    if_tags_match_condition = kwargs.pop('if_tags_match_condition', None)
    url_prepend = kwargs.pop('url_prepend', None)
    kwargs.update(
        raise_on_any_failure=raise_on_any_failure,
        sas=query_str.replace('?', '&'),
        timeout='&timeout=' + str(timeout) if timeout else "",
        path=container_name,
        restype='restype=container&',
    )

    requests = []
    for entry in blobs:
        if isinstance(entry, str):
            blob_name = entry
            options = _generic_delete_blob_options(
                delete_snapshots=delete_snapshots,
                if_modified_since=if_modified_since,
                if_unmodified_since=if_unmodified_since,
                if_tags_match_condition=if_tags_match_condition
            )
        else:
            blob_name = entry.get('name')
            per_blob_settings = {
                'snapshot': entry.get('snapshot'),
                'version_id': entry.get('version_id'),
                'delete_snapshots': delete_snapshots or entry.get('delete_snapshots'),
                'lease': entry.get('lease_id'),
                'if_modified_since': if_modified_since or entry.get('if_modified_since'),
                'if_unmodified_since': if_unmodified_since or entry.get('if_unmodified_since'),
                'etag': entry.get('etag'),
                'if_tags_match_condition': if_tags_match_condition or entry.get('if_tags_match_condition'),
                # A match condition only applies when an etag is present; it then
                # defaults to IfNotModified unless the entry names one.
                'match_condition': (
                    (entry.get('match_condition') or MatchConditions.IfNotModified)
                    if entry.get('etag') else None
                ),
                'timeout': entry.get('timeout'),
            }
            options = _generic_delete_blob_options(**per_blob_settings)

        query_parameters, header_parameters = _generate_delete_blobs_subrequest_options(client, **options)

        prefix = '/' + quote(url_prepend) if url_prepend else ''
        url = f"{prefix}/{quote(container_name)}/{quote(str(blob_name), safe='/~')}{query_str}"
        request = HttpRequest("DELETE", url, headers=header_parameters)
        request.format_parameters(query_parameters)
        requests.append(request)

    return requests, kwargs


# This code is a copy from _generated.
# Once Autorest is able to provide request preparation this code should be removed.
def _resolve_lease_id(lease: Any) -> Optional[str]:
    """Return the lease ID carried by ``lease``, whatever form it takes.

    :param lease: ``None``, an object exposing ``lease_id`` (its value is returned
        unchanged), an object exposing ``id``, or the raw lease ID itself.
    :returns: The lease ID, or ``None`` when there is none.
    :rtype: Optional[str]
    """
    if lease is None:
        return None
    if hasattr(lease, 'lease_id'):
        # Keep the pre-existing behavior for access-condition objects exactly.
        return lease.lease_id
    # Lease-client style objects expose ``id``; otherwise treat the value as the ID.
    return getattr(lease, 'id', lease) or None


def _generate_set_tiers_subrequest_options(
    client: AzureBlobStorage,
    tier: Optional[Union["PremiumPageBlobTier", "StandardBlobTier", str]],
    snapshot: Optional[str] = None,
    version_id: Optional[str] = None,
    rehydrate_priority: Optional["RehydratePriority"] = None,
    lease_access_conditions: Optional["LeaseAccessConditions"] = None,
    **kwargs: Any
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Build the query and header parameters for one set-tier sub-request.

    :param client: The generated client whose serializer is used.
    :type client: AzureBlobStorage
    :param tier: The tier sent in the ``x-ms-access-tier`` header. Required.
    :type tier: Optional[Union[PremiumPageBlobTier, StandardBlobTier, str]]
    :param Optional[str] snapshot: Value for the ``snapshot`` query parameter.
    :param Optional[str] version_id: Value for the ``versionid`` query parameter.
    :param rehydrate_priority: Value for the ``x-ms-rehydrate-priority`` header.
    :type rehydrate_priority: Optional[RehydratePriority]
    :param lease_access_conditions: Source of the ``x-ms-lease-id`` header. Besides an
        object with a ``lease_id`` attribute, an object with an ``id`` attribute or a
        plain lease ID string is accepted, because the batch builder forwards the
        per-blob ``lease_id`` value here unchanged.
    :type lease_access_conditions: Optional[LeaseAccessConditions]
    :keyword if_tags: Value for the ``x-ms-if-tags`` header (popped from kwargs).
    :keyword timeout: Value for the ``timeout`` query parameter (popped from kwargs).
    :returns: A ``(query_parameters, header_parameters)`` pair.
    :rtype: Tuple[Dict[str, Any], Dict[str, Any]]
    :raises ValueError: If ``tier`` is falsy, or both ``snapshot`` and ``version_id`` are set.
    """
    if not tier:
        raise ValueError("A blob tier must be specified")
    if snapshot and version_id:
        raise ValueError("Snapshot and version_id cannot be set at the same time")
    if_tags = kwargs.pop('if_tags', None)

    # Previously ``.lease_id`` was read unconditionally, which raised AttributeError
    # for a plain string lease ID or a lease client.
    lease_id = _resolve_lease_id(lease_access_conditions)

    comp = "tier"
    timeout = kwargs.pop('timeout', None)

    # Construct parameters
    query_parameters = {}
    if snapshot is not None:
        query_parameters['snapshot'] = client._serialize.query(  # pylint: disable=protected-access
            "snapshot", snapshot, 'str')
    if version_id is not None:
        query_parameters['versionid'] = client._serialize.query(  # pylint: disable=protected-access
            "version_id", version_id, 'str')
    if timeout is not None:
        query_parameters['timeout'] = client._serialize.query(  # pylint: disable=protected-access
            "timeout", timeout, 'int', minimum=0)
    query_parameters['comp'] = client._serialize.query("comp", comp, 'str')  # pylint: disable=protected-access

    # Construct headers
    header_parameters = {}
    header_parameters['x-ms-access-tier'] = client._serialize.header(  # pylint: disable=protected-access
        "tier", tier, 'str')
    if rehydrate_priority is not None:
        header_parameters['x-ms-rehydrate-priority'] = client._serialize.header(  # pylint: disable=protected-access
            "rehydrate_priority", rehydrate_priority, 'str')
    if lease_id is not None:
        header_parameters['x-ms-lease-id'] = client._serialize.header(  # pylint: disable=protected-access
            "lease_id", lease_id, 'str')
    if if_tags is not None:
        header_parameters['x-ms-if-tags'] = client._serialize.header(  # pylint: disable=protected-access
            "if_tags", if_tags, 'str')

    return query_parameters, header_parameters


def _generate_set_tiers_options(
    query_str: str,
    container_name: str,
    blob_tier: Optional[Union["PremiumPageBlobTier", "StandardBlobTier", str]],
    client: AzureBlobStorage,
    *blobs: Union[str, Dict[str, Any], BlobProperties],
    **kwargs: Any
) -> Tuple[List[HttpRequest], Dict[str, Any]]:
    """Build one ``PUT`` set-tier sub-request per blob plus the options for the batch call.

    :param str query_str: Query string appended to every sub-request URL; also stored
        under ``'sas'`` in the returned options with ``?`` replaced by ``&``.
    :param str container_name: The container holding the blobs.
    :param blob_tier: Batch-wide tier; takes precedence over a per-blob ``blob_tier``.
    :type blob_tier: Optional[Union[PremiumPageBlobTier, StandardBlobTier, str]]
    :param client: The generated client whose serializer is used.
    :type client: AzureBlobStorage
    :param blobs: Blob names, or mapping-like objects read with ``.get`` for
        per-blob settings.
    :type blobs: Union[str, Dict[str, Any], BlobProperties]
    :keyword timeout: Stored as ``'&timeout=<value>'`` in the returned options when
        truthy; also takes precedence over a per-blob ``timeout``.
    :keyword bool raise_on_any_failure: Passed through in the returned options (default ``True``).
    :keyword rehydrate_priority: Batch-wide value; takes precedence over a per-blob value.
    :keyword if_tags_match_condition: Batch-wide value; takes precedence over a per-blob value.
    :keyword url_prepend: Optional path segment quoted and placed before the container name.
    :returns: The list of sub-requests and the updated keyword options.
    :rtype: Tuple[List[HttpRequest], Dict[str, Any]]
    :raises ValueError: If no tier is available for a blob, or a blob entry sets both
        ``snapshot`` and ``version_id``.
    """
    timeout = kwargs.pop('timeout', None)
    raise_on_any_failure = kwargs.pop('raise_on_any_failure', True)
    rehydrate_priority = kwargs.pop('rehydrate_priority', None)
    if_tags = kwargs.pop('if_tags_match_condition', None)
    url_prepend = kwargs.pop('url_prepend', None)
    kwargs.update({
        'raise_on_any_failure': raise_on_any_failure,
        'sas': query_str.replace('?', '&'),
        'timeout': '&timeout=' + str(timeout) if timeout else "",
        'path': container_name,
        'restype': 'restype=container&'
    })

    reqs = []
    for blob in blobs:
        if not isinstance(blob, str):
            blob_name = blob.get('name')
            tier = blob_tier or blob.get('blob_tier')
            query_parameters, header_parameters = _generate_set_tiers_subrequest_options(
                client=client,
                tier=tier,
                snapshot=blob.get('snapshot'),
                version_id=blob.get('version_id'),
                rehydrate_priority=rehydrate_priority or blob.get('rehydrate_priority'),
                # Forwarded as-is; the sub-request builder resolves the lease ID
                # from a string, a lease client, or an access-conditions object.
                lease_access_conditions=blob.get('lease_id'),
                if_tags=if_tags or blob.get('if_tags_match_condition'),
                timeout=timeout or blob.get('timeout')
            )
        else:
            blob_name = blob
            query_parameters, header_parameters = _generate_set_tiers_subrequest_options(
                client, blob_tier, rehydrate_priority=rehydrate_priority, if_tags=if_tags)

        req = HttpRequest(
            "PUT",
            (f"{'/' + quote(url_prepend) if url_prepend else ''}/"
             f"{quote(container_name)}/{quote(str(blob_name), safe='/~')}{query_str}"),
            headers=header_parameters
        )
        req.format_parameters(query_parameters)
        reqs.append(req)

    return reqs, kwargs