aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/azure/storage/blob/_encryption.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/storage/blob/_encryption.py')
-rw-r--r--.venv/lib/python3.12/site-packages/azure/storage/blob/_encryption.py1127
1 files changed, 1127 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/storage/blob/_encryption.py b/.venv/lib/python3.12/site-packages/azure/storage/blob/_encryption.py
new file mode 100644
index 00000000..42f5c51d
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/storage/blob/_encryption.py
@@ -0,0 +1,1127 @@
+# pylint: disable=too-many-lines
+# -------------------------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# Licensed under the MIT License. See License.txt in the project root for
+# license information.
+# --------------------------------------------------------------------------
+
+import math
+import os
+import sys
+import warnings
+from collections import OrderedDict
+from io import BytesIO
+from json import (
+ dumps,
+ loads,
+)
+from typing import Any, Callable, Dict, IO, Optional, Tuple, TYPE_CHECKING
+from typing import OrderedDict as TypedOrderedDict
+from typing_extensions import Protocol
+
+from cryptography.hazmat.backends import default_backend
+from cryptography.hazmat.primitives.ciphers import Cipher
+from cryptography.hazmat.primitives.ciphers.aead import AESGCM
+from cryptography.hazmat.primitives.ciphers.algorithms import AES
+from cryptography.hazmat.primitives.ciphers.modes import CBC
+from cryptography.hazmat.primitives.padding import PKCS7
+
+from azure.core.exceptions import HttpResponseError
+from azure.core.utils import CaseInsensitiveDict
+
+from ._version import VERSION
+from ._shared import decode_base64_to_bytes, encode_base64
+
+if TYPE_CHECKING:
+ from azure.core.pipeline import PipelineResponse
+ from cryptography.hazmat.primitives.ciphers import AEADEncryptionContext
+ from cryptography.hazmat.primitives.padding import PaddingContext
+
+
+_ENCRYPTION_PROTOCOL_V1 = '1.0'
+_ENCRYPTION_PROTOCOL_V2 = '2.0'
+_ENCRYPTION_PROTOCOL_V2_1 = '2.1'
+_VALID_ENCRYPTION_PROTOCOLS = [_ENCRYPTION_PROTOCOL_V1, _ENCRYPTION_PROTOCOL_V2, _ENCRYPTION_PROTOCOL_V2_1]
+_ENCRYPTION_V2_PROTOCOLS = [_ENCRYPTION_PROTOCOL_V2, _ENCRYPTION_PROTOCOL_V2_1]
+_GCM_REGION_DATA_LENGTH = 4 * 1024 * 1024
+_GCM_NONCE_LENGTH = 12
+_GCM_TAG_LENGTH = 16
+
+_ERROR_OBJECT_INVALID = \
+ '{0} does not define a complete interface. Value of {1} is either missing or invalid.'
+
+_ERROR_UNSUPPORTED_METHOD_FOR_ENCRYPTION = (
+ 'The require_encryption flag is set, but encryption is not supported'
+ ' for this method.')
+
+
+class KeyEncryptionKey(Protocol):
+
+ def wrap_key(self, key: bytes) -> bytes:
+ ...
+
+ def unwrap_key(self, key: bytes, algorithm: str) -> bytes:
+ ...
+
+ def get_kid(self) -> str:
+ ...
+
+ def get_key_wrap_algorithm(self) -> str:
+ ...
+
+
+def _validate_not_none(param_name: str, param: Any):
+ if param is None:
+ raise ValueError(f'{param_name} should not be None.')
+
+
+def _validate_key_encryption_key_wrap(kek: KeyEncryptionKey):
+ # Note that None is not callable and so will fail the second clause of each check.
+ if not hasattr(kek, 'wrap_key') or not callable(kek.wrap_key):
+ raise AttributeError(_ERROR_OBJECT_INVALID.format('key encryption key', 'wrap_key'))
+ if not hasattr(kek, 'get_kid') or not callable(kek.get_kid):
+ raise AttributeError(_ERROR_OBJECT_INVALID.format('key encryption key', 'get_kid'))
+ if not hasattr(kek, 'get_key_wrap_algorithm') or not callable(kek.get_key_wrap_algorithm):
+ raise AttributeError(_ERROR_OBJECT_INVALID.format('key encryption key', 'get_key_wrap_algorithm'))
+
+
+class StorageEncryptionMixin(object):
+ def _configure_encryption(self, kwargs: Dict[str, Any]):
+ self.require_encryption = kwargs.get("require_encryption", False)
+ self.encryption_version = kwargs.get("encryption_version", "1.0")
+ self.key_encryption_key = kwargs.get("key_encryption_key")
+ self.key_resolver_function = kwargs.get("key_resolver_function")
+ if self.key_encryption_key and self.encryption_version == '1.0':
+ warnings.warn("This client has been configured to use encryption with version 1.0. " +
+ "Version 1.0 is deprecated and no longer considered secure. It is highly " +
+ "recommended that you switch to using version 2.0. The version can be " +
+ "specified using the 'encryption_version' keyword.")
+
+
+class _EncryptionAlgorithm(object):
+ """
+ Specifies which client encryption algorithm is used.
+ """
+ AES_CBC_256 = 'AES_CBC_256'
+ AES_GCM_256 = 'AES_GCM_256'
+
+
+class _WrappedContentKey:
+ """
+ Represents the envelope key details stored on the service.
+ """
+
+ def __init__(self, algorithm: str, encrypted_key: bytes, key_id: str) -> None:
+ """
+ :param str algorithm:
+ The algorithm used for wrapping.
+ :param bytes encrypted_key:
+ The encrypted content-encryption-key.
+ :param str key_id:
+ The key-encryption-key identifier string.
+ """
+ _validate_not_none('algorithm', algorithm)
+ _validate_not_none('encrypted_key', encrypted_key)
+ _validate_not_none('key_id', key_id)
+
+ self.algorithm = algorithm
+ self.encrypted_key = encrypted_key
+ self.key_id = key_id
+
+
+class _EncryptedRegionInfo:
+ """
+ Represents the length of encryption elements.
+ This is only used for Encryption V2.
+ """
+
+ def __init__(self, data_length: int, nonce_length: int, tag_length: int) -> None:
+ """
+ :param int data_length:
+ The length of the encryption region data (not including nonce + tag).
+ :param int nonce_length:
+ The length of nonce used when encrypting.
+ :param int tag_length:
+ The length of the encryption tag.
+ """
+ _validate_not_none('data_length', data_length)
+ _validate_not_none('nonce_length', nonce_length)
+ _validate_not_none('tag_length', tag_length)
+
+ self.data_length = data_length
+ self.nonce_length = nonce_length
+ self.tag_length = tag_length
+
+
+class _EncryptionAgent:
+ """
+ Represents the encryption agent stored on the service.
+ It consists of the encryption protocol version and encryption algorithm used.
+ """
+
+ def __init__(self, encryption_algorithm: _EncryptionAlgorithm, protocol: str) -> None:
+ """
+ :param _EncryptionAlgorithm encryption_algorithm:
+ The algorithm used for encrypting the message contents.
+ :param str protocol:
+ The protocol version used for encryption.
+ """
+ _validate_not_none('encryption_algorithm', encryption_algorithm)
+ _validate_not_none('protocol', protocol)
+
+ self.encryption_algorithm = str(encryption_algorithm)
+ self.protocol = protocol
+
+
+class _EncryptionData:
+ """
+ Represents the encryption data that is stored on the service.
+ """
+
+ def __init__(
+ self, content_encryption_IV: Optional[bytes],
+ encrypted_region_info: Optional[_EncryptedRegionInfo],
+ encryption_agent: _EncryptionAgent,
+ wrapped_content_key: _WrappedContentKey,
+ key_wrapping_metadata: Dict[str, Any]
+ ) -> None:
+ """
+ :param Optional[bytes] content_encryption_IV:
+ The content encryption initialization vector.
+ Required for AES-CBC (V1).
+ :param Optional[_EncryptedRegionInfo] encrypted_region_info:
+ The info about the autenticated block sizes.
+ Required for AES-GCM (V2).
+ :param _EncryptionAgent encryption_agent:
+ The encryption agent.
+ :param _WrappedContentKey wrapped_content_key:
+ An object that stores the wrapping algorithm, the key identifier,
+ and the encrypted key bytes.
+ :param Dict[str, Any] key_wrapping_metadata:
+ A dict containing metadata related to the key wrapping.
+ """
+ _validate_not_none('encryption_agent', encryption_agent)
+ _validate_not_none('wrapped_content_key', wrapped_content_key)
+
+ # Validate we have the right matching optional parameter for the specified algorithm
+ if encryption_agent.encryption_algorithm == _EncryptionAlgorithm.AES_CBC_256:
+ _validate_not_none('content_encryption_IV', content_encryption_IV)
+ elif encryption_agent.encryption_algorithm == _EncryptionAlgorithm.AES_GCM_256:
+ _validate_not_none('encrypted_region_info', encrypted_region_info)
+ else:
+ raise ValueError("Invalid encryption algorithm.")
+
+ self.content_encryption_IV = content_encryption_IV
+ self.encrypted_region_info = encrypted_region_info
+ self.encryption_agent = encryption_agent
+ self.wrapped_content_key = wrapped_content_key
+ self.key_wrapping_metadata = key_wrapping_metadata
+
+
+class GCMBlobEncryptionStream:
+ """
+ A stream that performs AES-GCM encryption on the given data as
+ it's streamed. Data is read and encrypted in regions. The stream
+ will use the same encryption key and will generate a guaranteed unique
+ nonce for each encryption region.
+ """
+ def __init__(
+ self, content_encryption_key: bytes,
+ data_stream: IO[bytes],
+ ) -> None:
+ """
+ :param bytes content_encryption_key: The encryption key to use.
+ :param IO[bytes] data_stream: The data stream to read data from.
+ """
+ self.content_encryption_key = content_encryption_key
+ self.data_stream = data_stream
+
+ self.offset = 0
+ self.current = b''
+ self.nonce_counter = 0
+
+ def read(self, size: int = -1) -> bytes:
+ """
+ Read data from the stream. Specify -1 to read all available data.
+
+ :param int size: The amount of data to read. Defaults to -1 for all data.
+ :return: The bytes read.
+ :rtype: bytes
+ """
+ result = BytesIO()
+ remaining = sys.maxsize if size == -1 else size
+
+ while remaining > 0:
+ # Start by reading from current
+ if len(self.current) > 0:
+ read = min(remaining, len(self.current))
+ result.write(self.current[:read])
+
+ self.current = self.current[read:]
+ self.offset += read
+ remaining -= read
+
+ if remaining > 0:
+ # Read one region of data and encrypt it
+ data = self.data_stream.read(_GCM_REGION_DATA_LENGTH)
+ if len(data) == 0:
+ # No more data to read
+ break
+
+ self.current = encrypt_data_v2(data, self.nonce_counter, self.content_encryption_key)
+ # IMPORTANT: Must increment the nonce each time.
+ self.nonce_counter += 1
+
+ return result.getvalue()
+
+
+def encrypt_data_v2(data: bytes, nonce: int, key: bytes) -> bytes:
+ """
+ Encrypts the given data using the given nonce and key using AES-GCM.
+ The result includes the data in the form: nonce + ciphertext + tag.
+
+ :param bytes data: The raw data to encrypt.
+ :param int nonce: The nonce to use for encryption.
+ :param bytes key: The encryption key to use for encryption.
+ :return: The encrypted bytes in the form: nonce + ciphertext + tag.
+ :rtype: bytes
+ """
+ nonce_bytes = nonce.to_bytes(_GCM_NONCE_LENGTH, 'big')
+ aesgcm = AESGCM(key)
+
+ # Returns ciphertext + tag
+ ciphertext_with_tag = aesgcm.encrypt(nonce_bytes, data, None)
+ return nonce_bytes + ciphertext_with_tag
+
+
+def is_encryption_v2(encryption_data: Optional[_EncryptionData]) -> bool:
+ """
+ Determine whether the given encryption data signifies version 2.0 or 2.1.
+
+ :param Optional[_EncryptionData] encryption_data: The encryption data. Will return False if this is None.
+ :return: True, if the encryption data indicates encryption V2, false otherwise.
+ :rtype: bool
+ """
+ # If encryption_data is None, assume no encryption
+ return bool(encryption_data and (encryption_data.encryption_agent.protocol in _ENCRYPTION_V2_PROTOCOLS))
+
+
+def modify_user_agent_for_encryption(
+ user_agent: str,
+ moniker: str,
+ encryption_version: str,
+ request_options: Dict[str, Any]
+ ) -> None:
+ """
+ Modifies the request options to contain a user agent string updated with encryption information.
+ Adds azstorage-clientsideencryption/<version> immediately proceeding the SDK descriptor.
+
+ :param str user_agent: The existing User Agent to modify.
+ :param str moniker: The specific SDK moniker. The modification will immediately proceed azsdk-python-{moniker}.
+ :param str encryption_version: The version of encryption being used.
+ :param Dict[str, Any] request_options: The reuqest options to add the user agent override to.
+ """
+ # If the user has specified user_agent_overwrite=True, don't make any modifications
+ if request_options.get('user_agent_overwrite'):
+ return
+
+ # If the feature flag is already present, don't add it again
+ feature_flag = f"azstorage-clientsideencryption/{encryption_version}"
+ if feature_flag in user_agent:
+ return
+
+ index = user_agent.find(f"azsdk-python-{moniker}")
+ user_agent = f"{user_agent[:index]}{feature_flag} {user_agent[index:]}"
+ # Since we are using user_agent_overwrite=True, we must prepend the user's user_agent if there is one
+ if request_options.get('user_agent'):
+ user_agent = f"{request_options.get('user_agent')} {user_agent}"
+
+ request_options['user_agent'] = user_agent
+ request_options['user_agent_overwrite'] = True
+
+
+def get_adjusted_upload_size(length: int, encryption_version: str) -> int:
+ """
+ Get the adjusted size of the blob upload which accounts for
+ extra encryption data (padding OR nonce + tag).
+
+ :param int length: The plaintext data length.
+ :param str encryption_version: The version of encryption being used.
+ :return: The new upload size to use.
+ :rtype: int
+ """
+ if encryption_version == _ENCRYPTION_PROTOCOL_V1:
+ return length + (16 - (length % 16))
+
+ if encryption_version == _ENCRYPTION_PROTOCOL_V2:
+ encryption_data_length = _GCM_NONCE_LENGTH + _GCM_TAG_LENGTH
+ regions = math.ceil(length / _GCM_REGION_DATA_LENGTH)
+ return length + (regions * encryption_data_length)
+
+ raise ValueError("Invalid encryption version specified.")
+
+
+def get_adjusted_download_range_and_offset(
+ start: int,
+ end: int,
+ length: Optional[int],
+ encryption_data: Optional[_EncryptionData]) -> Tuple[Tuple[int, int], Tuple[int, int]]:
+ """
+ Gets the new download range and offsets into the decrypted data for
+ the given user-specified range. The new download range will include all
+ the data needed to decrypt the user-provided range and will include only
+ full encryption regions.
+
+ The offsets returned will be the offsets needed to fetch the user-requested
+ data out of the full decrypted data. The end offset is different based on the
+ encryption version. For V1, the end offset is offset from the end whereas for
+ V2, the end offset is the ending index into the stream.
+ V1: decrypted_data[start_offset : len(decrypted_data) - end_offset]
+ V2: decrypted_data[start_offset : end_offset]
+
+ :param int start: The user-requested start index.
+ :param int end: The user-requested end index.
+ :param Optional[int] length: The user-requested length. Only used for V1.
+ :param Optional[_EncryptionData] encryption_data: The encryption data to determine version and sizes.
+ :return: (new start, new end), (start offset, end offset)
+ :rtype: Tuple[Tuple[int, int], Tuple[int, int]]
+ """
+ start_offset, end_offset = 0, 0
+ if encryption_data is None:
+ return (start, end), (start_offset, end_offset)
+
+ if encryption_data.encryption_agent.protocol == _ENCRYPTION_PROTOCOL_V1:
+ if start is not None:
+ # Align the start of the range along a 16 byte block
+ start_offset = start % 16
+ start -= start_offset
+
+ # Include an extra 16 bytes for the IV if necessary
+ # Because of the previous offsetting, start_range will always
+ # be a multiple of 16.
+ if start > 0:
+ start_offset += 16
+ start -= 16
+
+ if length is not None:
+ # Align the end of the range along a 16 byte block
+ end_offset = 15 - (end % 16)
+ end += end_offset
+
+ elif encryption_data.encryption_agent.protocol in _ENCRYPTION_V2_PROTOCOLS:
+ start_offset, end_offset = 0, end
+
+ if encryption_data.encrypted_region_info is None:
+ raise ValueError("Missing required metadata for Encryption V2")
+
+ nonce_length = encryption_data.encrypted_region_info.nonce_length
+ data_length = encryption_data.encrypted_region_info.data_length
+ tag_length = encryption_data.encrypted_region_info.tag_length
+ region_length = nonce_length + data_length + tag_length
+ requested_length = end - start
+
+ if start is not None:
+ # Find which data region the start is in
+ region_num = start // data_length
+ # The start of the data region is different from the start of the encryption region
+ data_start = region_num * data_length
+ region_start = region_num * region_length
+ # Offset is based on data region
+ start_offset = start - data_start
+ # New start is the start of the encryption region
+ start = region_start
+
+ if end is not None:
+ # Find which data region the end is in
+ region_num = end // data_length
+ end_offset = start_offset + requested_length + 1
+ # New end is the end of the encryption region
+ end = (region_num * region_length) + region_length - 1
+
+ return (start, end), (start_offset, end_offset)
+
+
+def parse_encryption_data(metadata: Dict[str, Any]) -> Optional[_EncryptionData]:
+ """
+ Parses the encryption data out of the given blob metadata. If metadata does
+ not exist or there are parsing errors, this function will just return None.
+
+ :param Dict[str, Any] metadata: The blob metadata parsed from the response.
+ :return: The encryption data or None
+ :rtype: Optional[_EncryptionData]
+ """
+ try:
+ # Use case insensitive dict as key needs to be case-insensitive
+ case_insensitive_metadata = CaseInsensitiveDict(metadata)
+ return _dict_to_encryption_data(loads(case_insensitive_metadata['encryptiondata']))
+ except: # pylint: disable=bare-except
+ return None
+
+
+def adjust_blob_size_for_encryption(size: int, encryption_data: Optional[_EncryptionData]) -> int:
+ """
+ Adjusts the given blob size for encryption by subtracting the size of
+ the encryption data (nonce + tag). This only has an affect for encryption V2.
+
+ :param int size: The original blob size.
+ :param Optional[_EncryptionData] encryption_data: The encryption data to determine version and sizes.
+ :return: The new blob size.
+ :rtype: int
+ """
+ if (encryption_data is not None and
+ encryption_data.encrypted_region_info is not None and
+ is_encryption_v2(encryption_data)):
+
+ nonce_length = encryption_data.encrypted_region_info.nonce_length
+ data_length = encryption_data.encrypted_region_info.data_length
+ tag_length = encryption_data.encrypted_region_info.tag_length
+ region_length = nonce_length + data_length + tag_length
+
+ num_regions = math.ceil(size / region_length)
+ metadata_size = num_regions * (nonce_length + tag_length)
+ return size - metadata_size
+
+ return size
+
+
+def _generate_encryption_data_dict(
+ kek: KeyEncryptionKey,
+ cek: bytes,
+ iv: Optional[bytes],
+ version: str
+ ) -> TypedOrderedDict[str, Any]:
+ """
+ Generates and returns the encryption metadata as a dict.
+
+ :param KeyEncryptionKey kek: The key encryption key. See calling functions for more information.
+ :param bytes cek: The content encryption key.
+ :param Optional[bytes] iv: The initialization vector. Only required for AES-CBC.
+ :param str version: The client encryption version used.
+ :return: A dict containing all the encryption metadata.
+ :rtype: Dict[str, Any]
+ """
+ # Encrypt the cek.
+ if version == _ENCRYPTION_PROTOCOL_V1:
+ wrapped_cek = kek.wrap_key(cek)
+ # For V2, we include the encryption version in the wrapped key.
+ elif version == _ENCRYPTION_PROTOCOL_V2:
+ # We must pad the version to 8 bytes for AES Keywrap algorithms
+ to_wrap = _ENCRYPTION_PROTOCOL_V2.encode().ljust(8, b'\0') + cek
+ wrapped_cek = kek.wrap_key(to_wrap)
+ else:
+ raise ValueError("Invalid encryption version specified.")
+
+ # Build the encryption_data dict.
+ # Use OrderedDict to comply with Java's ordering requirement.
+ wrapped_content_key = OrderedDict()
+ wrapped_content_key['KeyId'] = kek.get_kid()
+ wrapped_content_key['EncryptedKey'] = encode_base64(wrapped_cek)
+ wrapped_content_key['Algorithm'] = kek.get_key_wrap_algorithm()
+
+ encryption_agent = OrderedDict()
+ encryption_agent['Protocol'] = version
+
+ if version == _ENCRYPTION_PROTOCOL_V1:
+ encryption_agent['EncryptionAlgorithm'] = _EncryptionAlgorithm.AES_CBC_256
+
+ elif version == _ENCRYPTION_PROTOCOL_V2:
+ encryption_agent['EncryptionAlgorithm'] = _EncryptionAlgorithm.AES_GCM_256
+
+ encrypted_region_info = OrderedDict()
+ encrypted_region_info['DataLength'] = _GCM_REGION_DATA_LENGTH
+ encrypted_region_info['NonceLength'] = _GCM_NONCE_LENGTH
+
+ encryption_data_dict: TypedOrderedDict[str, Any] = OrderedDict()
+ encryption_data_dict['WrappedContentKey'] = wrapped_content_key
+ encryption_data_dict['EncryptionAgent'] = encryption_agent
+ if version == _ENCRYPTION_PROTOCOL_V1:
+ encryption_data_dict['ContentEncryptionIV'] = encode_base64(iv)
+ elif version == _ENCRYPTION_PROTOCOL_V2:
+ encryption_data_dict['EncryptedRegionInfo'] = encrypted_region_info
+ encryption_data_dict['KeyWrappingMetadata'] = OrderedDict({'EncryptionLibrary': 'Python ' + VERSION})
+
+ return encryption_data_dict
+
+
+def _dict_to_encryption_data(encryption_data_dict: Dict[str, Any]) -> _EncryptionData:
+ """
+ Converts the specified dictionary to an EncryptionData object for
+ eventual use in decryption.
+
+ :param dict encryption_data_dict:
+ The dictionary containing the encryption data.
+ :return: an _EncryptionData object built from the dictionary.
+ :rtype: _EncryptionData
+ """
+ try:
+ protocol = encryption_data_dict['EncryptionAgent']['Protocol']
+ if protocol not in _VALID_ENCRYPTION_PROTOCOLS:
+ raise ValueError("Unsupported encryption version.")
+ except KeyError as exc:
+ raise ValueError("Unsupported encryption version.") from exc
+ wrapped_content_key = encryption_data_dict['WrappedContentKey']
+ wrapped_content_key = _WrappedContentKey(wrapped_content_key['Algorithm'],
+ decode_base64_to_bytes(wrapped_content_key['EncryptedKey']),
+ wrapped_content_key['KeyId'])
+
+ encryption_agent = encryption_data_dict['EncryptionAgent']
+ encryption_agent = _EncryptionAgent(encryption_agent['EncryptionAlgorithm'],
+ encryption_agent['Protocol'])
+
+ if 'KeyWrappingMetadata' in encryption_data_dict:
+ key_wrapping_metadata = encryption_data_dict['KeyWrappingMetadata']
+ else:
+ key_wrapping_metadata = None
+
+ # AES-CBC only
+ encryption_iv = None
+ if 'ContentEncryptionIV' in encryption_data_dict:
+ encryption_iv = decode_base64_to_bytes(encryption_data_dict['ContentEncryptionIV'])
+
+ # AES-GCM only
+ region_info = None
+ if 'EncryptedRegionInfo' in encryption_data_dict:
+ encrypted_region_info = encryption_data_dict['EncryptedRegionInfo']
+ region_info = _EncryptedRegionInfo(encrypted_region_info['DataLength'],
+ encrypted_region_info['NonceLength'],
+ _GCM_TAG_LENGTH)
+
+ encryption_data = _EncryptionData(encryption_iv,
+ region_info,
+ encryption_agent,
+ wrapped_content_key,
+ key_wrapping_metadata)
+
+ return encryption_data
+
+
+def _generate_AES_CBC_cipher(cek: bytes, iv: bytes) -> Cipher:
+ """
+ Generates and returns an encryption cipher for AES CBC using the given cek and iv.
+
+ :param bytes[] cek: The content encryption key for the cipher.
+ :param bytes[] iv: The initialization vector for the cipher.
+ :return: A cipher for encrypting in AES256 CBC.
+ :rtype: ~cryptography.hazmat.primitives.ciphers.Cipher
+ """
+
+ backend = default_backend()
+ algorithm = AES(cek)
+ mode = CBC(iv)
+ return Cipher(algorithm, mode, backend)
+
+
+def _validate_and_unwrap_cek(
+ encryption_data: _EncryptionData,
+ key_encryption_key: Optional[KeyEncryptionKey] = None,
+ key_resolver: Optional[Callable[[str], KeyEncryptionKey]] = None
+) -> bytes:
+ """
+ Extracts and returns the content_encryption_key stored in the encryption_data object
+ and performs necessary validation on all parameters.
+ :param _EncryptionData encryption_data:
+ The encryption metadata of the retrieved value.
+ :param Optional[KeyEncryptionKey] key_encryption_key:
+ The user-provided key-encryption-key. Must implement the following methods:
+ wrap_key(key)
+ - Wraps the specified key using an algorithm of the user's choice.
+ get_key_wrap_algorithm()
+ - Returns the algorithm used to wrap the specified symmetric key.
+ get_kid()
+ - Returns a string key id for this key-encryption-key.
+ :param Optional[Callable[[str], KeyEncryptionKey]] key_resolver:
+ A function used that, given a key_id, will return a key_encryption_key. Please refer
+ to high-level service object instance variables for more details.
+ :return: The content_encryption_key stored in the encryption_data object.
+ :rtype: bytes
+ """
+
+ _validate_not_none('encrypted_key', encryption_data.wrapped_content_key.encrypted_key)
+
+ # Validate we have the right info for the specified version
+ if encryption_data.encryption_agent.protocol == _ENCRYPTION_PROTOCOL_V1:
+ _validate_not_none('content_encryption_IV', encryption_data.content_encryption_IV)
+ elif encryption_data.encryption_agent.protocol in _ENCRYPTION_V2_PROTOCOLS:
+ _validate_not_none('encrypted_region_info', encryption_data.encrypted_region_info)
+ else:
+ raise ValueError('Specified encryption version is not supported.')
+
+ content_encryption_key: Optional[bytes] = None
+
+ # If the resolver exists, give priority to the key it finds.
+ if key_resolver is not None:
+ key_encryption_key = key_resolver(encryption_data.wrapped_content_key.key_id)
+
+ if key_encryption_key is None:
+ raise ValueError("Unable to decrypt. key_resolver and key_encryption_key cannot both be None.")
+ if not hasattr(key_encryption_key, 'get_kid') or not callable(key_encryption_key.get_kid):
+ raise AttributeError(_ERROR_OBJECT_INVALID.format('key encryption key', 'get_kid'))
+ if not hasattr(key_encryption_key, 'unwrap_key') or not callable(key_encryption_key.unwrap_key):
+ raise AttributeError(_ERROR_OBJECT_INVALID.format('key encryption key', 'unwrap_key'))
+ if encryption_data.wrapped_content_key.key_id != key_encryption_key.get_kid():
+ raise ValueError('Provided or resolved key-encryption-key does not match the id of key used to encrypt.')
+ # Will throw an exception if the specified algorithm is not supported.
+ content_encryption_key = key_encryption_key.unwrap_key(
+ encryption_data.wrapped_content_key.encrypted_key,
+ encryption_data.wrapped_content_key.algorithm)
+
+ # For V2, the version is included with the cek. We need to validate it
+ # and remove it from the actual cek.
+ if encryption_data.encryption_agent.protocol in _ENCRYPTION_V2_PROTOCOLS:
+ version_2_bytes = encryption_data.encryption_agent.protocol.encode().ljust(8, b'\0')
+ cek_version_bytes = content_encryption_key[:len(version_2_bytes)]
+ if cek_version_bytes != version_2_bytes:
+ raise ValueError('The encryption metadata is not valid and may have been modified.')
+
+ # Remove version from the start of the cek.
+ content_encryption_key = content_encryption_key[len(version_2_bytes):]
+
+ _validate_not_none('content_encryption_key', content_encryption_key)
+
+ return content_encryption_key
+
+
+def _decrypt_message(
+ message: bytes,
+ encryption_data: _EncryptionData,
+ key_encryption_key: Optional[KeyEncryptionKey] = None,
+ resolver: Optional[Callable[[str], KeyEncryptionKey]] = None
+) -> bytes:
+ """
+ Decrypts the given ciphertext using AES256 in CBC mode with 128 bit padding.
+ Unwraps the content-encryption-key using the user-provided or resolved key-encryption-key (kek).
+ Returns the original plaintext.
+
+ :param bytes message:
+ The ciphertext to be decrypted.
+ :param _EncryptionData encryption_data:
+ The metadata associated with this ciphertext.
+ :param Optional[KeyEncryptionKey] key_encryption_key:
+ The user-provided key-encryption-key. Must implement the following methods:
+ wrap_key(key)
+ - Wraps the specified key using an algorithm of the user's choice.
+ get_key_wrap_algorithm()
+ - Returns the algorithm used to wrap the specified symmetric key.
+ get_kid()
+ - Returns a string key id for this key-encryption-key.
+ :param Optional[Callable[[str], KeyEncryptionKey]] resolver:
+ The user-provided key resolver. Uses the kid string to return a key-encryption-key
+ implementing the interface defined above.
+ :return: The decrypted plaintext.
+ :rtype: bytes
+ """
+ _validate_not_none('message', message)
+ content_encryption_key = _validate_and_unwrap_cek(encryption_data, key_encryption_key, resolver)
+
+ if encryption_data.encryption_agent.protocol == _ENCRYPTION_PROTOCOL_V1:
+ if not encryption_data.content_encryption_IV:
+ raise ValueError("Missing required metadata for decryption.")
+
+ cipher = _generate_AES_CBC_cipher(content_encryption_key, encryption_data.content_encryption_IV)
+
+ # decrypt data
+ decryptor = cipher.decryptor()
+ decrypted_data = (decryptor.update(message) + decryptor.finalize())
+
+ # unpad data
+ unpadder = PKCS7(128).unpadder()
+ decrypted_data = (unpadder.update(decrypted_data) + unpadder.finalize())
+
+ elif encryption_data.encryption_agent.protocol in _ENCRYPTION_V2_PROTOCOLS:
+ block_info = encryption_data.encrypted_region_info
+ if not block_info or not block_info.nonce_length:
+ raise ValueError("Missing required metadata for decryption.")
+
+ if encryption_data.encrypted_region_info is None:
+ raise ValueError("Missing required metadata for Encryption V2")
+
+ nonce_length = int(encryption_data.encrypted_region_info.nonce_length)
+
+ # First bytes are the nonce
+ nonce = message[:nonce_length]
+ ciphertext_with_tag = message[nonce_length:]
+
+ aesgcm = AESGCM(content_encryption_key)
+ decrypted_data = aesgcm.decrypt(nonce, ciphertext_with_tag, None)
+
+ else:
+ raise ValueError('Specified encryption version is not supported.')
+
+ return decrypted_data
+
+
+def encrypt_blob(blob: bytes, key_encryption_key: KeyEncryptionKey, version: str) -> Tuple[str, bytes]:
+ """
+ Encrypts the given blob using the given encryption protocol version.
+ Wraps the generated content-encryption-key using the user-provided key-encryption-key (kek).
+ Returns a json-formatted string containing the encryption metadata. This method should
+ only be used when a blob is small enough for single shot upload. Encrypting larger blobs
+ is done as a part of the upload_data_chunks method.
+
+ :param bytes blob:
+ The blob to be encrypted.
+ :param KeyEncryptionKey key_encryption_key:
+ The user-provided key-encryption-key. Must implement the following methods:
+ wrap_key(key)
+ - Wraps the specified key using an algorithm of the user's choice.
+ get_key_wrap_algorithm()
+ - Returns the algorithm used to wrap the specified symmetric key.
+ get_kid()
+ - Returns a string key id for this key-encryption-key.
+ :param str version: The client encryption version to use.
+ :return: A tuple of json-formatted string containing the encryption metadata and the encrypted blob data.
+ :rtype: (str, bytes)
+ """
+
+ _validate_not_none('blob', blob)
+ _validate_not_none('key_encryption_key', key_encryption_key)
+ _validate_key_encryption_key_wrap(key_encryption_key)
+
+ if version == _ENCRYPTION_PROTOCOL_V1:
+ # AES256 uses 256 bit (32 byte) keys and always with 16 byte blocks
+ content_encryption_key = os.urandom(32)
+ initialization_vector = os.urandom(16)
+
+ cipher = _generate_AES_CBC_cipher(content_encryption_key, initialization_vector)
+
+ # PKCS7 with 16 byte blocks ensures compatibility with AES.
+ padder = PKCS7(128).padder()
+ padded_data = padder.update(blob) + padder.finalize()
+
+ # Encrypt the data.
+ encryptor = cipher.encryptor()
+ encrypted_data = encryptor.update(padded_data) + encryptor.finalize()
+
+ elif version == _ENCRYPTION_PROTOCOL_V2:
+ # AES256 GCM uses 256 bit (32 byte) keys and a 12 byte nonce.
+ content_encryption_key = os.urandom(32)
+ initialization_vector = None
+
+ data = BytesIO(blob)
+ encryption_stream = GCMBlobEncryptionStream(content_encryption_key, data)
+
+ encrypted_data = encryption_stream.read()
+
+ else:
+ raise ValueError("Invalid encryption version specified.")
+
+ encryption_data = _generate_encryption_data_dict(key_encryption_key, content_encryption_key,
+ initialization_vector, version)
+ encryption_data['EncryptionMode'] = 'FullBlob'
+
+ return dumps(encryption_data), encrypted_data
+
+
+def generate_blob_encryption_data(
+ key_encryption_key: Optional[KeyEncryptionKey],
+ version: str
+) -> Tuple[Optional[bytes], Optional[bytes], Optional[str]]:
+ """
+ Generates the encryption_metadata for the blob.
+
+ :param Optional[KeyEncryptionKey] key_encryption_key:
+ The key-encryption-key used to wrap the cek associate with this blob.
+ :param str version: The client encryption version to use.
+ :return: A tuple containing the cek and iv for this blob as well as the
+ serialized encryption metadata for the blob.
+ :rtype: (Optional[bytes], Optional[bytes], Optional[str])
+ """
+
+ encryption_data = None
+ content_encryption_key = None
+ initialization_vector = None
+ if key_encryption_key:
+ _validate_key_encryption_key_wrap(key_encryption_key)
+ content_encryption_key = os.urandom(32)
+ # Initialization vector only needed for V1
+ if version == _ENCRYPTION_PROTOCOL_V1:
+ initialization_vector = os.urandom(16)
+ encryption_data_dict = _generate_encryption_data_dict(key_encryption_key,
+ content_encryption_key,
+ initialization_vector,
+ version)
+ encryption_data_dict['EncryptionMode'] = 'FullBlob'
+ encryption_data = dumps(encryption_data_dict)
+
+ return content_encryption_key, initialization_vector, encryption_data
+
+
+def decrypt_blob( # pylint: disable=too-many-locals,too-many-statements
+ require_encryption: bool,
+ key_encryption_key: Optional[KeyEncryptionKey],
+ key_resolver: Optional[Callable[[str], KeyEncryptionKey]],
+ content: bytes,
+ start_offset: int,
+ end_offset: int,
+ response_headers: Dict[str, Any]
+) -> bytes:
+ """
+ Decrypts the given blob contents and returns only the requested range.
+
+ :param bool require_encryption:
+ Whether the calling blob service requires objects to be decrypted.
+ :param Optional[KeyEncryptionKey] key_encryption_key:
+ The user-provided key-encryption-key. Must implement the following methods:
+ wrap_key(key)
+ - Wraps the specified key using an algorithm of the user's choice.
+ get_key_wrap_algorithm()
+ - Returns the algorithm used to wrap the specified symmetric key.
+ get_kid()
+ - Returns a string key id for this key-encryption-key.
+ :param key_resolver:
+ The user-provided key resolver. Uses the kid string to return a key-encryption-key
+ implementing the interface defined above.
+ :type key_resolver: Optional[Callable[[str], KeyEncryptionKey]]
+ :param bytes content:
+ The encrypted blob content.
+ :param int start_offset:
+ The adjusted offset from the beginning of the *decrypted* content for the caller's data.
+ :param int end_offset:
+ The adjusted offset from the end of the *decrypted* content for the caller's data.
+ :param Dict[str, Any] response_headers:
+ A dictionary of response headers from the download request. Expected to include the
+ 'x-ms-meta-encryptiondata' header if the blob was encrypted.
+ :return: The decrypted blob content.
+ :rtype: bytes
+ """
+ try:
+ encryption_data = _dict_to_encryption_data(loads(response_headers['x-ms-meta-encryptiondata']))
+ except Exception as exc: # pylint: disable=broad-except
+ if require_encryption:
+ raise ValueError(
+ 'Encryption required, but received data does not contain appropriate metadata.' + \
+ 'Data was either not encrypted or metadata has been lost.') from exc
+
+ return content
+
+ algorithm = encryption_data.encryption_agent.encryption_algorithm
+ if algorithm not in(_EncryptionAlgorithm.AES_CBC_256, _EncryptionAlgorithm.AES_GCM_256):
+ raise ValueError('Specified encryption algorithm is not supported.')
+
+ version = encryption_data.encryption_agent.protocol
+ if version not in _VALID_ENCRYPTION_PROTOCOLS:
+ raise ValueError('Specified encryption version is not supported.')
+
+ content_encryption_key = _validate_and_unwrap_cek(encryption_data, key_encryption_key, key_resolver)
+
+ if version == _ENCRYPTION_PROTOCOL_V1:
+ blob_type = response_headers['x-ms-blob-type']
+
+ iv: Optional[bytes] = None
+ unpad = False
+ if 'content-range' in response_headers:
+ content_range = response_headers['content-range']
+ # Format: 'bytes x-y/size'
+
+ # Ignore the word 'bytes'
+ content_range = content_range.split(' ')
+
+ content_range = content_range[1].split('-')
+ content_range = content_range[1].split('/')
+ end_range = int(content_range[0])
+ blob_size = int(content_range[1])
+
+ if start_offset >= 16:
+ iv = content[:16]
+ content = content[16:]
+ start_offset -= 16
+ else:
+ iv = encryption_data.content_encryption_IV
+
+ if end_range == blob_size - 1:
+ unpad = True
+ else:
+ unpad = True
+ iv = encryption_data.content_encryption_IV
+
+ if blob_type == 'PageBlob':
+ unpad = False
+
+ if iv is None:
+ raise ValueError("Missing required metadata for Encryption V1")
+
+ cipher = _generate_AES_CBC_cipher(content_encryption_key, iv)
+ decryptor = cipher.decryptor()
+
+ content = decryptor.update(content) + decryptor.finalize()
+ if unpad:
+ unpadder = PKCS7(128).unpadder()
+ content = unpadder.update(content) + unpadder.finalize()
+
+ return content[start_offset: len(content) - end_offset]
+
+ if version in _ENCRYPTION_V2_PROTOCOLS:
+ # We assume the content contains only full encryption regions
+ total_size = len(content)
+ offset = 0
+
+ if encryption_data.encrypted_region_info is None:
+ raise ValueError("Missing required metadata for Encryption V2")
+
+ nonce_length = encryption_data.encrypted_region_info.nonce_length
+ data_length = encryption_data.encrypted_region_info.data_length
+ tag_length = encryption_data.encrypted_region_info.tag_length
+ region_length = nonce_length + data_length + tag_length
+
+ decrypted_content = bytearray()
+ while offset < total_size:
+ # Process one encryption region at a time
+ process_size = min(region_length, total_size)
+ encrypted_region = content[offset:offset + process_size]
+
+ # First bytes are the nonce
+ nonce = encrypted_region[:nonce_length]
+ ciphertext_with_tag = encrypted_region[nonce_length:]
+
+ aesgcm = AESGCM(content_encryption_key)
+ decrypted_data = aesgcm.decrypt(nonce, ciphertext_with_tag, None)
+ decrypted_content.extend(decrypted_data)
+
+ offset += process_size
+
+ # Read the caller requested data from the decrypted content
+ return decrypted_content[start_offset:end_offset]
+
+ raise ValueError('Specified encryption version is not supported.')
+
+
+def get_blob_encryptor_and_padder(
+ cek: Optional[bytes],
+ iv: Optional[bytes],
+ should_pad: bool
+) -> Tuple[Optional["AEADEncryptionContext"], Optional["PaddingContext"]]:
+ encryptor = None
+ padder = None
+
+ if cek is not None and iv is not None:
+ cipher = _generate_AES_CBC_cipher(cek, iv)
+ encryptor = cipher.encryptor()
+ padder = PKCS7(128).padder() if should_pad else None
+
+ return encryptor, padder
+
+
+def encrypt_queue_message(message: str, key_encryption_key: KeyEncryptionKey, version: str) -> str:
+ """
+ Encrypts the given plain text message using the given protocol version.
+ Wraps the generated content-encryption-key using the user-provided key-encryption-key (kek).
+ Returns a json-formatted string containing the encrypted message and the encryption metadata.
+
+ :param str message:
+ The plain text message to be encrypted.
+ :param KeyEncryptionKey key_encryption_key:
+ The user-provided key-encryption-key. Must implement the following methods:
+ wrap_key(key)
+ - Wraps the specified key using an algorithm of the user's choice.
+ get_key_wrap_algorithm()
+ - Returns the algorithm used to wrap the specified symmetric key.
+ get_kid()
+ - Returns a string key id for this key-encryption-key.
+ :param str version: The client encryption version to use.
+ :return: A json-formatted string containing the encrypted message and the encryption metadata.
+ :rtype: str
+ """
+
+ _validate_not_none('message', message)
+ _validate_not_none('key_encryption_key', key_encryption_key)
+ _validate_key_encryption_key_wrap(key_encryption_key)
+
+ # Queue encoding functions all return unicode strings, and encryption should
+ # operate on binary strings.
+ message_as_bytes: bytes = message.encode('utf-8')
+
+ if version == _ENCRYPTION_PROTOCOL_V1:
+ # AES256 CBC uses 256 bit (32 byte) keys and always with 16 byte blocks
+ content_encryption_key = os.urandom(32)
+ initialization_vector = os.urandom(16)
+
+ cipher = _generate_AES_CBC_cipher(content_encryption_key, initialization_vector)
+
+ # PKCS7 with 16 byte blocks ensures compatibility with AES.
+ padder = PKCS7(128).padder()
+ padded_data = padder.update(message_as_bytes) + padder.finalize()
+
+ # Encrypt the data.
+ encryptor = cipher.encryptor()
+ encrypted_data = encryptor.update(padded_data) + encryptor.finalize()
+
+ elif version == _ENCRYPTION_PROTOCOL_V2:
+ # AES256 GCM uses 256 bit (32 byte) keys and a 12 byte nonce.
+ content_encryption_key = os.urandom(32)
+ initialization_vector = None
+
+ # The nonce MUST be different for each key
+ nonce = os.urandom(12)
+ aesgcm = AESGCM(content_encryption_key)
+
+ # Returns ciphertext + tag
+ cipertext_with_tag = aesgcm.encrypt(nonce, message_as_bytes, None)
+ encrypted_data = nonce + cipertext_with_tag
+
+ else:
+ raise ValueError("Invalid encryption version specified.")
+
+ # Build the dictionary structure.
+ queue_message = {'EncryptedMessageContents': encode_base64(encrypted_data),
+ 'EncryptionData': _generate_encryption_data_dict(key_encryption_key,
+ content_encryption_key,
+ initialization_vector,
+ version)}
+
+ return dumps(queue_message)
+
+
+def decrypt_queue_message(
+ message: str,
+ response: "PipelineResponse",
+ require_encryption: bool,
+ key_encryption_key: Optional[KeyEncryptionKey],
+ resolver: Optional[Callable[[str], KeyEncryptionKey]]
+) -> str:
+ """
+ Returns the decrypted message contents from an EncryptedQueueMessage.
+ If no encryption metadata is present, will return the unaltered message.
+ :param str message:
+ The JSON formatted QueueEncryptedMessage contents with all associated metadata.
+ :param Any response:
+ The pipeline response used to generate an error with.
+ :param bool require_encryption:
+ If set, will enforce that the retrieved messages are encrypted and decrypt them.
+ :param Optional[KeyEncryptionKey] key_encryption_key:
+ The user-provided key-encryption-key. Must implement the following methods:
+ wrap_key(key)
+ - Wraps the specified key using an algorithm of the user's choice.
+ get_key_wrap_algorithm()
+ - Returns the algorithm used to wrap the specified symmetric key.
+ get_kid()
+ - Returns a string key id for this key-encryption-key.
+ :param Optional[Callable[[str], KeyEncryptionKey]] resolver:
+ The user-provided key resolver. Uses the kid string to return a key-encryption-key
+ implementing the interface defined above.
+ :return: The plain text message from the queue message.
+ :rtype: str
+ """
+ response = response.http_response
+
+ try:
+ deserialized_message: Dict[str, Any] = loads(message)
+
+ encryption_data = _dict_to_encryption_data(deserialized_message['EncryptionData'])
+ decoded_data = decode_base64_to_bytes(deserialized_message['EncryptedMessageContents'])
+ except (KeyError, ValueError) as exc:
+ # Message was not json formatted and so was not encrypted
+ # or the user provided a json formatted message
+ # or the metadata was malformed.
+ if require_encryption:
+ raise ValueError(
+ 'Encryption required, but received message does not contain appropriate metatadata. ' + \
+ 'Message was either not encrypted or metadata was incorrect.') from exc
+
+ return message
+ try:
+ return _decrypt_message(decoded_data, encryption_data, key_encryption_key, resolver).decode('utf-8')
+ except Exception as error:
+ raise HttpResponseError(
+ message="Decryption failed.",
+ response=response, #type: ignore [arg-type]
+ error=error) from error