about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/azure/storage/blob/_encryption.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/storage/blob/_encryption.py')
-rw-r--r--.venv/lib/python3.12/site-packages/azure/storage/blob/_encryption.py1127
1 files changed, 1127 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/storage/blob/_encryption.py b/.venv/lib/python3.12/site-packages/azure/storage/blob/_encryption.py
new file mode 100644
index 00000000..42f5c51d
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/storage/blob/_encryption.py
@@ -0,0 +1,1127 @@
+# pylint: disable=too-many-lines
+# -------------------------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# Licensed under the MIT License. See License.txt in the project root for
+# license information.
+# --------------------------------------------------------------------------
+
+import math
+import os
+import sys
+import warnings
+from collections import OrderedDict
+from io import BytesIO
+from json import (
+    dumps,
+    loads,
+)
+from typing import Any, Callable, Dict, IO, Optional, Tuple, TYPE_CHECKING
+from typing import OrderedDict as TypedOrderedDict
+from typing_extensions import Protocol
+
+from cryptography.hazmat.backends import default_backend
+from cryptography.hazmat.primitives.ciphers import Cipher
+from cryptography.hazmat.primitives.ciphers.aead import AESGCM
+from cryptography.hazmat.primitives.ciphers.algorithms import AES
+from cryptography.hazmat.primitives.ciphers.modes import CBC
+from cryptography.hazmat.primitives.padding import PKCS7
+
+from azure.core.exceptions import HttpResponseError
+from azure.core.utils import CaseInsensitiveDict
+
+from ._version import VERSION
+from ._shared import decode_base64_to_bytes, encode_base64
+
+if TYPE_CHECKING:
+    from azure.core.pipeline import PipelineResponse
+    from cryptography.hazmat.primitives.ciphers import AEADEncryptionContext
+    from cryptography.hazmat.primitives.padding import PaddingContext
+
+
+_ENCRYPTION_PROTOCOL_V1 = '1.0'
+_ENCRYPTION_PROTOCOL_V2 = '2.0'
+_ENCRYPTION_PROTOCOL_V2_1 = '2.1'
+_VALID_ENCRYPTION_PROTOCOLS = [_ENCRYPTION_PROTOCOL_V1, _ENCRYPTION_PROTOCOL_V2, _ENCRYPTION_PROTOCOL_V2_1]
+_ENCRYPTION_V2_PROTOCOLS = [_ENCRYPTION_PROTOCOL_V2, _ENCRYPTION_PROTOCOL_V2_1]
+_GCM_REGION_DATA_LENGTH = 4 * 1024 * 1024
+_GCM_NONCE_LENGTH = 12
+_GCM_TAG_LENGTH = 16
+
+_ERROR_OBJECT_INVALID = \
+    '{0} does not define a complete interface. Value of {1} is either missing or invalid.'
+
+_ERROR_UNSUPPORTED_METHOD_FOR_ENCRYPTION = (
+    'The require_encryption flag is set, but encryption is not supported'
+    ' for this method.')
+
+
+class KeyEncryptionKey(Protocol):
+
+    def wrap_key(self, key: bytes) -> bytes:
+        ...
+
+    def unwrap_key(self, key: bytes, algorithm: str) -> bytes:
+        ...
+
+    def get_kid(self) -> str:
+        ...
+
+    def get_key_wrap_algorithm(self) -> str:
+        ...
+
+
+def _validate_not_none(param_name: str, param: Any):
+    if param is None:
+        raise ValueError(f'{param_name} should not be None.')
+
+
+def _validate_key_encryption_key_wrap(kek: KeyEncryptionKey):
+    # Note that None is not callable and so will fail the second clause of each check.
+    if not hasattr(kek, 'wrap_key') or not callable(kek.wrap_key):
+        raise AttributeError(_ERROR_OBJECT_INVALID.format('key encryption key', 'wrap_key'))
+    if not hasattr(kek, 'get_kid') or not callable(kek.get_kid):
+        raise AttributeError(_ERROR_OBJECT_INVALID.format('key encryption key', 'get_kid'))
+    if not hasattr(kek, 'get_key_wrap_algorithm') or not callable(kek.get_key_wrap_algorithm):
+        raise AttributeError(_ERROR_OBJECT_INVALID.format('key encryption key', 'get_key_wrap_algorithm'))
+
+
+class StorageEncryptionMixin(object):
+    def _configure_encryption(self, kwargs: Dict[str, Any]):
+        self.require_encryption = kwargs.get("require_encryption", False)
+        self.encryption_version = kwargs.get("encryption_version", "1.0")
+        self.key_encryption_key = kwargs.get("key_encryption_key")
+        self.key_resolver_function = kwargs.get("key_resolver_function")
+        if self.key_encryption_key and self.encryption_version == '1.0':
+            warnings.warn("This client has been configured to use encryption with version 1.0. " +
+                          "Version 1.0 is deprecated and no longer considered secure. It is highly " +
+                          "recommended that you switch to using version 2.0. The version can be " +
+                          "specified using the 'encryption_version' keyword.")
+
+
+class _EncryptionAlgorithm(object):
+    """
+    Specifies which client encryption algorithm is used.
+    """
+    AES_CBC_256 = 'AES_CBC_256'
+    AES_GCM_256 = 'AES_GCM_256'
+
+
+class _WrappedContentKey:
+    """
+    Represents the envelope key details stored on the service.
+    """
+
+    def __init__(self, algorithm: str, encrypted_key: bytes, key_id: str) -> None:
+        """
+        :param str algorithm:
+            The algorithm used for wrapping.
+        :param bytes encrypted_key:
+            The encrypted content-encryption-key.
+        :param str key_id:
+            The key-encryption-key identifier string.
+        """
+        _validate_not_none('algorithm', algorithm)
+        _validate_not_none('encrypted_key', encrypted_key)
+        _validate_not_none('key_id', key_id)
+
+        self.algorithm = algorithm
+        self.encrypted_key = encrypted_key
+        self.key_id = key_id
+
+
+class _EncryptedRegionInfo:
+    """
+    Represents the length of encryption elements.
+    This is only used for Encryption V2.
+    """
+
+    def __init__(self, data_length: int, nonce_length: int, tag_length: int) -> None:
+        """
+        :param int data_length:
+            The length of the encryption region data (not including nonce + tag).
+        :param int nonce_length:
+            The length of nonce used when encrypting.
+        :param int tag_length:
+            The length of the encryption tag.
+        """
+        _validate_not_none('data_length', data_length)
+        _validate_not_none('nonce_length', nonce_length)
+        _validate_not_none('tag_length', tag_length)
+
+        self.data_length = data_length
+        self.nonce_length = nonce_length
+        self.tag_length = tag_length
+
+
+class _EncryptionAgent:
+    """
+    Represents the encryption agent stored on the service.
+    It consists of the encryption protocol version and encryption algorithm used.
+    """
+
+    def __init__(self, encryption_algorithm: _EncryptionAlgorithm, protocol: str) -> None:
+        """
+        :param _EncryptionAlgorithm encryption_algorithm:
+            The algorithm used for encrypting the message contents.
+        :param str protocol:
+            The protocol version used for encryption.
+        """
+        _validate_not_none('encryption_algorithm', encryption_algorithm)
+        _validate_not_none('protocol', protocol)
+
+        self.encryption_algorithm = str(encryption_algorithm)
+        self.protocol = protocol
+
+
+class _EncryptionData:
+    """
+    Represents the encryption data that is stored on the service.
+    """
+
+    def __init__(
+        self, content_encryption_IV: Optional[bytes],
+        encrypted_region_info: Optional[_EncryptedRegionInfo],
+        encryption_agent: _EncryptionAgent,
+        wrapped_content_key: _WrappedContentKey,
+        key_wrapping_metadata: Dict[str, Any]
+    ) -> None:
+        """
+        :param Optional[bytes] content_encryption_IV:
+            The content encryption initialization vector.
+            Required for AES-CBC (V1).
+        :param Optional[_EncryptedRegionInfo] encrypted_region_info:
+            The info about the autenticated block sizes.
+            Required for AES-GCM (V2).
+        :param _EncryptionAgent encryption_agent:
+            The encryption agent.
+        :param _WrappedContentKey wrapped_content_key:
+            An object that stores the wrapping algorithm, the key identifier,
+            and the encrypted key bytes.
+        :param Dict[str, Any] key_wrapping_metadata:
+            A dict containing metadata related to the key wrapping.
+        """
+        _validate_not_none('encryption_agent', encryption_agent)
+        _validate_not_none('wrapped_content_key', wrapped_content_key)
+
+        # Validate we have the right matching optional parameter for the specified algorithm
+        if encryption_agent.encryption_algorithm == _EncryptionAlgorithm.AES_CBC_256:
+            _validate_not_none('content_encryption_IV', content_encryption_IV)
+        elif encryption_agent.encryption_algorithm == _EncryptionAlgorithm.AES_GCM_256:
+            _validate_not_none('encrypted_region_info', encrypted_region_info)
+        else:
+            raise ValueError("Invalid encryption algorithm.")
+
+        self.content_encryption_IV = content_encryption_IV
+        self.encrypted_region_info = encrypted_region_info
+        self.encryption_agent = encryption_agent
+        self.wrapped_content_key = wrapped_content_key
+        self.key_wrapping_metadata = key_wrapping_metadata
+
+
+class GCMBlobEncryptionStream:
+    """
+    A stream that performs AES-GCM encryption on the given data as
+    it's streamed. Data is read and encrypted in regions. The stream
+    will use the same encryption key and will generate a guaranteed unique
+    nonce for each encryption region.
+    """
+    def __init__(
+        self, content_encryption_key: bytes,
+        data_stream: IO[bytes],
+    ) -> None:
+        """
+        :param bytes content_encryption_key: The encryption key to use.
+        :param IO[bytes] data_stream: The data stream to read data from.
+        """
+        self.content_encryption_key = content_encryption_key
+        self.data_stream = data_stream
+
+        self.offset = 0
+        self.current = b''
+        self.nonce_counter = 0
+
+    def read(self, size: int = -1) -> bytes:
+        """
+        Read data from the stream. Specify -1 to read all available data.
+
+        :param int size: The amount of data to read. Defaults to -1 for all data.
+        :return: The bytes read.
+        :rtype: bytes
+        """
+        result = BytesIO()
+        remaining = sys.maxsize if size == -1 else size
+
+        while remaining > 0:
+            # Start by reading from current
+            if len(self.current) > 0:
+                read = min(remaining, len(self.current))
+                result.write(self.current[:read])
+
+                self.current = self.current[read:]
+                self.offset += read
+                remaining -= read
+
+            if remaining > 0:
+                # Read one region of data and encrypt it
+                data = self.data_stream.read(_GCM_REGION_DATA_LENGTH)
+                if len(data) == 0:
+                    # No more data to read
+                    break
+
+                self.current = encrypt_data_v2(data, self.nonce_counter, self.content_encryption_key)
+                # IMPORTANT: Must increment the nonce each time.
+                self.nonce_counter += 1
+
+        return result.getvalue()
+
+
+def encrypt_data_v2(data: bytes, nonce: int, key: bytes) -> bytes:
+    """
+    Encrypts the given data using the given nonce and key using AES-GCM.
+    The result includes the data in the form: nonce + ciphertext + tag.
+
+    :param bytes data: The raw data to encrypt.
+    :param int nonce: The nonce to use for encryption.
+    :param bytes key: The encryption key to use for encryption.
+    :return: The encrypted bytes in the form: nonce + ciphertext + tag.
+    :rtype: bytes
+    """
+    nonce_bytes = nonce.to_bytes(_GCM_NONCE_LENGTH, 'big')
+    aesgcm = AESGCM(key)
+
+    # Returns ciphertext + tag
+    ciphertext_with_tag = aesgcm.encrypt(nonce_bytes, data, None)
+    return nonce_bytes + ciphertext_with_tag
+
+
+def is_encryption_v2(encryption_data: Optional[_EncryptionData]) -> bool:
+    """
+    Determine whether the given encryption data signifies version 2.0 or 2.1.
+
+    :param Optional[_EncryptionData] encryption_data: The encryption data. Will return False if this is None.
+    :return: True, if the encryption data indicates encryption V2, false otherwise.
+    :rtype: bool
+    """
+    # If encryption_data is None, assume no encryption
+    return bool(encryption_data and (encryption_data.encryption_agent.protocol in _ENCRYPTION_V2_PROTOCOLS))
+
+
+def modify_user_agent_for_encryption(
+        user_agent: str,
+        moniker: str,
+        encryption_version: str,
+        request_options: Dict[str, Any]
+    ) -> None:
+    """
+    Modifies the request options to contain a user agent string updated with encryption information.
+    Adds azstorage-clientsideencryption/<version> immediately proceeding the SDK descriptor.
+
+    :param str user_agent: The existing User Agent to modify.
+    :param str moniker: The specific SDK moniker. The modification will immediately proceed azsdk-python-{moniker}.
+    :param str encryption_version: The version of encryption being used.
+    :param Dict[str, Any] request_options: The reuqest options to add the user agent override to.
+    """
+    # If the user has specified user_agent_overwrite=True, don't make any modifications
+    if request_options.get('user_agent_overwrite'):
+        return
+
+    # If the feature flag is already present, don't add it again
+    feature_flag = f"azstorage-clientsideencryption/{encryption_version}"
+    if feature_flag in user_agent:
+        return
+
+    index = user_agent.find(f"azsdk-python-{moniker}")
+    user_agent = f"{user_agent[:index]}{feature_flag} {user_agent[index:]}"
+    # Since we are using user_agent_overwrite=True, we must prepend the user's user_agent if there is one
+    if request_options.get('user_agent'):
+        user_agent = f"{request_options.get('user_agent')} {user_agent}"
+
+    request_options['user_agent'] = user_agent
+    request_options['user_agent_overwrite'] = True
+
+
+def get_adjusted_upload_size(length: int, encryption_version: str) -> int:
+    """
+    Get the adjusted size of the blob upload which accounts for
+    extra encryption data (padding OR nonce + tag).
+
+    :param int length: The plaintext data length.
+    :param str encryption_version: The version of encryption being used.
+    :return: The new upload size to use.
+    :rtype: int
+    """
+    if encryption_version == _ENCRYPTION_PROTOCOL_V1:
+        return length + (16 - (length % 16))
+
+    if encryption_version == _ENCRYPTION_PROTOCOL_V2:
+        encryption_data_length = _GCM_NONCE_LENGTH + _GCM_TAG_LENGTH
+        regions = math.ceil(length / _GCM_REGION_DATA_LENGTH)
+        return length + (regions * encryption_data_length)
+
+    raise ValueError("Invalid encryption version specified.")
+
+
+def get_adjusted_download_range_and_offset(
+        start: int,
+        end: int,
+        length: Optional[int],
+        encryption_data: Optional[_EncryptionData]) -> Tuple[Tuple[int, int], Tuple[int, int]]:
+    """
+    Gets the new download range and offsets into the decrypted data for
+    the given user-specified range. The new download range will include all
+    the data needed to decrypt the user-provided range and will include only
+    full encryption regions.
+
+    The offsets returned will be the offsets needed to fetch the user-requested
+    data out of the full decrypted data. The end offset is different based on the
+    encryption version. For V1, the end offset is offset from the end whereas for
+    V2, the end offset is the ending index into the stream.
+    V1: decrypted_data[start_offset : len(decrypted_data) - end_offset]
+    V2: decrypted_data[start_offset : end_offset]
+
+    :param int start: The user-requested start index.
+    :param int end: The user-requested end index.
+    :param Optional[int] length: The user-requested length. Only used for V1.
+    :param Optional[_EncryptionData] encryption_data: The encryption data to determine version and sizes.
+    :return: (new start, new end), (start offset, end offset)
+    :rtype: Tuple[Tuple[int, int], Tuple[int, int]]
+    """
+    start_offset, end_offset = 0, 0
+    if encryption_data is None:
+        return (start, end), (start_offset, end_offset)
+
+    if encryption_data.encryption_agent.protocol == _ENCRYPTION_PROTOCOL_V1:
+        if start is not None:
+            # Align the start of the range along a 16 byte block
+            start_offset = start % 16
+            start -= start_offset
+
+            # Include an extra 16 bytes for the IV if necessary
+            # Because of the previous offsetting, start_range will always
+            # be a multiple of 16.
+            if start > 0:
+                start_offset += 16
+                start -= 16
+
+        if length is not None:
+            # Align the end of the range along a 16 byte block
+            end_offset = 15 - (end % 16)
+            end += end_offset
+
+    elif encryption_data.encryption_agent.protocol in _ENCRYPTION_V2_PROTOCOLS:
+        start_offset, end_offset = 0, end
+
+        if encryption_data.encrypted_region_info is None:
+            raise ValueError("Missing required metadata for Encryption V2")
+
+        nonce_length = encryption_data.encrypted_region_info.nonce_length
+        data_length = encryption_data.encrypted_region_info.data_length
+        tag_length = encryption_data.encrypted_region_info.tag_length
+        region_length = nonce_length + data_length + tag_length
+        requested_length = end - start
+
+        if start is not None:
+            # Find which data region the start is in
+            region_num = start // data_length
+            # The start of the data region is different from the start of the encryption region
+            data_start = region_num * data_length
+            region_start = region_num * region_length
+            # Offset is based on data region
+            start_offset = start - data_start
+            # New start is the start of the encryption region
+            start = region_start
+
+        if end is not None:
+            # Find which data region the end is in
+            region_num = end // data_length
+            end_offset = start_offset + requested_length + 1
+            # New end is the end of the encryption region
+            end = (region_num * region_length) + region_length - 1
+
+    return (start, end), (start_offset, end_offset)
+
+
+def parse_encryption_data(metadata: Dict[str, Any]) -> Optional[_EncryptionData]:
+    """
+    Parses the encryption data out of the given blob metadata. If metadata does
+    not exist or there are parsing errors, this function will just return None.
+
+    :param Dict[str, Any] metadata: The blob metadata parsed from the response.
+    :return: The encryption data or None
+    :rtype: Optional[_EncryptionData]
+    """
+    try:
+        # Use case insensitive dict as key needs to be case-insensitive
+        case_insensitive_metadata = CaseInsensitiveDict(metadata)
+        return _dict_to_encryption_data(loads(case_insensitive_metadata['encryptiondata']))
+    except:  # pylint: disable=bare-except
+        return None
+
+
+def adjust_blob_size_for_encryption(size: int, encryption_data: Optional[_EncryptionData]) -> int:
+    """
+    Adjusts the given blob size for encryption by subtracting the size of
+    the encryption data (nonce + tag). This only has an affect for encryption V2.
+
+    :param int size: The original blob size.
+    :param Optional[_EncryptionData] encryption_data: The encryption data to determine version and sizes.
+    :return: The new blob size.
+    :rtype: int
+    """
+    if (encryption_data is not None and
+        encryption_data.encrypted_region_info is not None and
+        is_encryption_v2(encryption_data)):
+
+        nonce_length = encryption_data.encrypted_region_info.nonce_length
+        data_length = encryption_data.encrypted_region_info.data_length
+        tag_length = encryption_data.encrypted_region_info.tag_length
+        region_length = nonce_length + data_length + tag_length
+
+        num_regions = math.ceil(size / region_length)
+        metadata_size = num_regions * (nonce_length + tag_length)
+        return size - metadata_size
+
+    return size
+
+
+def _generate_encryption_data_dict(
+        kek: KeyEncryptionKey,
+        cek: bytes,
+        iv: Optional[bytes],
+        version: str
+    ) -> TypedOrderedDict[str, Any]:
+    """
+    Generates and returns the encryption metadata as a dict.
+
+    :param KeyEncryptionKey kek: The key encryption key. See calling functions for more information.
+    :param bytes cek: The content encryption key.
+    :param Optional[bytes] iv: The initialization vector. Only required for AES-CBC.
+    :param str version: The client encryption version used.
+    :return: A dict containing all the encryption metadata.
+    :rtype: Dict[str, Any]
+    """
+    # Encrypt the cek.
+    if version == _ENCRYPTION_PROTOCOL_V1:
+        wrapped_cek = kek.wrap_key(cek)
+    # For V2, we include the encryption version in the wrapped key.
+    elif version == _ENCRYPTION_PROTOCOL_V2:
+        # We must pad the version to 8 bytes for AES Keywrap algorithms
+        to_wrap = _ENCRYPTION_PROTOCOL_V2.encode().ljust(8, b'\0') + cek
+        wrapped_cek = kek.wrap_key(to_wrap)
+    else:
+        raise ValueError("Invalid encryption version specified.")
+
+    # Build the encryption_data dict.
+    # Use OrderedDict to comply with Java's ordering requirement.
+    wrapped_content_key = OrderedDict()
+    wrapped_content_key['KeyId'] = kek.get_kid()
+    wrapped_content_key['EncryptedKey'] = encode_base64(wrapped_cek)
+    wrapped_content_key['Algorithm'] = kek.get_key_wrap_algorithm()
+
+    encryption_agent = OrderedDict()
+    encryption_agent['Protocol'] = version
+
+    if version == _ENCRYPTION_PROTOCOL_V1:
+        encryption_agent['EncryptionAlgorithm'] = _EncryptionAlgorithm.AES_CBC_256
+
+    elif version == _ENCRYPTION_PROTOCOL_V2:
+        encryption_agent['EncryptionAlgorithm'] = _EncryptionAlgorithm.AES_GCM_256
+
+        encrypted_region_info = OrderedDict()
+        encrypted_region_info['DataLength'] = _GCM_REGION_DATA_LENGTH
+        encrypted_region_info['NonceLength'] = _GCM_NONCE_LENGTH
+
+    encryption_data_dict: TypedOrderedDict[str, Any] = OrderedDict()
+    encryption_data_dict['WrappedContentKey'] = wrapped_content_key
+    encryption_data_dict['EncryptionAgent'] = encryption_agent
+    if version == _ENCRYPTION_PROTOCOL_V1:
+        encryption_data_dict['ContentEncryptionIV'] = encode_base64(iv)
+    elif version == _ENCRYPTION_PROTOCOL_V2:
+        encryption_data_dict['EncryptedRegionInfo'] = encrypted_region_info
+    encryption_data_dict['KeyWrappingMetadata'] = OrderedDict({'EncryptionLibrary': 'Python ' + VERSION})
+
+    return encryption_data_dict
+
+
+def _dict_to_encryption_data(encryption_data_dict: Dict[str, Any]) -> _EncryptionData:
+    """
+    Converts the specified dictionary to an EncryptionData object for
+    eventual use in decryption.
+
+    :param dict encryption_data_dict:
+        The dictionary containing the encryption data.
+    :return: an _EncryptionData object built from the dictionary.
+    :rtype: _EncryptionData
+    """
+    try:
+        protocol = encryption_data_dict['EncryptionAgent']['Protocol']
+        if protocol not in _VALID_ENCRYPTION_PROTOCOLS:
+            raise ValueError("Unsupported encryption version.")
+    except KeyError as exc:
+        raise ValueError("Unsupported encryption version.") from exc
+    wrapped_content_key = encryption_data_dict['WrappedContentKey']
+    wrapped_content_key = _WrappedContentKey(wrapped_content_key['Algorithm'],
+                                             decode_base64_to_bytes(wrapped_content_key['EncryptedKey']),
+                                             wrapped_content_key['KeyId'])
+
+    encryption_agent = encryption_data_dict['EncryptionAgent']
+    encryption_agent = _EncryptionAgent(encryption_agent['EncryptionAlgorithm'],
+                                        encryption_agent['Protocol'])
+
+    if 'KeyWrappingMetadata' in encryption_data_dict:
+        key_wrapping_metadata = encryption_data_dict['KeyWrappingMetadata']
+    else:
+        key_wrapping_metadata = None
+
+    # AES-CBC only
+    encryption_iv = None
+    if 'ContentEncryptionIV' in encryption_data_dict:
+        encryption_iv = decode_base64_to_bytes(encryption_data_dict['ContentEncryptionIV'])
+
+    # AES-GCM only
+    region_info = None
+    if 'EncryptedRegionInfo' in encryption_data_dict:
+        encrypted_region_info = encryption_data_dict['EncryptedRegionInfo']
+        region_info = _EncryptedRegionInfo(encrypted_region_info['DataLength'],
+                                           encrypted_region_info['NonceLength'],
+                                           _GCM_TAG_LENGTH)
+
+    encryption_data = _EncryptionData(encryption_iv,
+                                      region_info,
+                                      encryption_agent,
+                                      wrapped_content_key,
+                                      key_wrapping_metadata)
+
+    return encryption_data
+
+
+def _generate_AES_CBC_cipher(cek: bytes, iv: bytes) -> Cipher:
+    """
+    Generates and returns an encryption cipher for AES CBC using the given cek and iv.
+
+    :param bytes[] cek: The content encryption key for the cipher.
+    :param bytes[] iv: The initialization vector for the cipher.
+    :return: A cipher for encrypting in AES256 CBC.
+    :rtype: ~cryptography.hazmat.primitives.ciphers.Cipher
+    """
+
+    backend = default_backend()
+    algorithm = AES(cek)
+    mode = CBC(iv)
+    return Cipher(algorithm, mode, backend)
+
+
+def _validate_and_unwrap_cek(
+    encryption_data: _EncryptionData,
+    key_encryption_key: Optional[KeyEncryptionKey] = None,
+    key_resolver: Optional[Callable[[str], KeyEncryptionKey]] = None
+) -> bytes:
+    """
+    Extracts and returns the content_encryption_key stored in the encryption_data object
+    and performs necessary validation on all parameters.
+    :param _EncryptionData encryption_data:
+        The encryption metadata of the retrieved value.
+    :param Optional[KeyEncryptionKey] key_encryption_key:
+        The user-provided key-encryption-key. Must implement the following methods:
+        wrap_key(key)
+            - Wraps the specified key using an algorithm of the user's choice.
+        get_key_wrap_algorithm()
+            - Returns the algorithm used to wrap the specified symmetric key.
+        get_kid()
+            - Returns a string key id for this key-encryption-key.
+    :param Optional[Callable[[str], KeyEncryptionKey]] key_resolver:
+        A function used that, given a key_id, will return a key_encryption_key. Please refer
+        to high-level service object instance variables for more details.
+    :return: The content_encryption_key stored in the encryption_data object.
+    :rtype: bytes
+    """
+
+    _validate_not_none('encrypted_key', encryption_data.wrapped_content_key.encrypted_key)
+
+    # Validate we have the right info for the specified version
+    if encryption_data.encryption_agent.protocol == _ENCRYPTION_PROTOCOL_V1:
+        _validate_not_none('content_encryption_IV', encryption_data.content_encryption_IV)
+    elif encryption_data.encryption_agent.protocol in _ENCRYPTION_V2_PROTOCOLS:
+        _validate_not_none('encrypted_region_info', encryption_data.encrypted_region_info)
+    else:
+        raise ValueError('Specified encryption version is not supported.')
+
+    content_encryption_key: Optional[bytes] = None
+
+    # If the resolver exists, give priority to the key it finds.
+    if key_resolver is not None:
+        key_encryption_key = key_resolver(encryption_data.wrapped_content_key.key_id)
+
+    if key_encryption_key is None:
+        raise ValueError("Unable to decrypt. key_resolver and key_encryption_key cannot both be None.")
+    if not hasattr(key_encryption_key, 'get_kid') or not callable(key_encryption_key.get_kid):
+        raise AttributeError(_ERROR_OBJECT_INVALID.format('key encryption key', 'get_kid'))
+    if not hasattr(key_encryption_key, 'unwrap_key') or not callable(key_encryption_key.unwrap_key):
+        raise AttributeError(_ERROR_OBJECT_INVALID.format('key encryption key', 'unwrap_key'))
+    if encryption_data.wrapped_content_key.key_id != key_encryption_key.get_kid():
+        raise ValueError('Provided or resolved key-encryption-key does not match the id of key used to encrypt.')
+    # Will throw an exception if the specified algorithm is not supported.
+    content_encryption_key = key_encryption_key.unwrap_key(
+        encryption_data.wrapped_content_key.encrypted_key,
+        encryption_data.wrapped_content_key.algorithm)
+
+    # For V2, the version is included with the cek. We need to validate it
+    # and remove it from the actual cek.
+    if encryption_data.encryption_agent.protocol in _ENCRYPTION_V2_PROTOCOLS:
+        version_2_bytes = encryption_data.encryption_agent.protocol.encode().ljust(8, b'\0')
+        cek_version_bytes = content_encryption_key[:len(version_2_bytes)]
+        if cek_version_bytes != version_2_bytes:
+            raise ValueError('The encryption metadata is not valid and may have been modified.')
+
+        # Remove version from the start of the cek.
+        content_encryption_key = content_encryption_key[len(version_2_bytes):]
+
+    _validate_not_none('content_encryption_key', content_encryption_key)
+
+    return content_encryption_key
+
+
+def _decrypt_message(
+    message: bytes,
+    encryption_data: _EncryptionData,
+    key_encryption_key: Optional[KeyEncryptionKey] = None,
+    resolver: Optional[Callable[[str], KeyEncryptionKey]] = None
+) -> bytes:
+    """
+    Decrypts the given ciphertext using AES256 in CBC mode with 128 bit padding.
+    Unwraps the content-encryption-key using the user-provided or resolved key-encryption-key (kek).
+    Returns the original plaintext.
+
+    :param bytes message:
+        The ciphertext to be decrypted.
+    :param _EncryptionData encryption_data:
+        The metadata associated with this ciphertext.
+    :param Optional[KeyEncryptionKey] key_encryption_key:
+        The user-provided key-encryption-key. Must implement the following methods:
+        wrap_key(key)
+            - Wraps the specified key using an algorithm of the user's choice.
+        get_key_wrap_algorithm()
+            - Returns the algorithm used to wrap the specified symmetric key.
+        get_kid()
+            - Returns a string key id for this key-encryption-key.
+    :param Optional[Callable[[str], KeyEncryptionKey]] resolver:
+        The user-provided key resolver. Uses the kid string to return a key-encryption-key
+        implementing the interface defined above.
+    :return: The decrypted plaintext.
+    :rtype: bytes
+    """
+    _validate_not_none('message', message)
+    content_encryption_key = _validate_and_unwrap_cek(encryption_data, key_encryption_key, resolver)
+
+    if encryption_data.encryption_agent.protocol == _ENCRYPTION_PROTOCOL_V1:
+        if not encryption_data.content_encryption_IV:
+            raise ValueError("Missing required metadata for decryption.")
+
+        cipher = _generate_AES_CBC_cipher(content_encryption_key, encryption_data.content_encryption_IV)
+
+        # decrypt data
+        decryptor = cipher.decryptor()
+        decrypted_data = (decryptor.update(message) + decryptor.finalize())
+
+        # unpad data
+        unpadder = PKCS7(128).unpadder()
+        decrypted_data = (unpadder.update(decrypted_data) + unpadder.finalize())
+
+    elif encryption_data.encryption_agent.protocol in _ENCRYPTION_V2_PROTOCOLS:
+        block_info = encryption_data.encrypted_region_info
+        if not block_info or not block_info.nonce_length:
+            raise ValueError("Missing required metadata for decryption.")
+
+        if encryption_data.encrypted_region_info is None:
+            raise ValueError("Missing required metadata for Encryption V2")
+
+        nonce_length = int(encryption_data.encrypted_region_info.nonce_length)
+
+        # First bytes are the nonce
+        nonce = message[:nonce_length]
+        ciphertext_with_tag = message[nonce_length:]
+
+        aesgcm = AESGCM(content_encryption_key)
+        decrypted_data = aesgcm.decrypt(nonce, ciphertext_with_tag, None)
+
+    else:
+        raise ValueError('Specified encryption version is not supported.')
+
+    return decrypted_data
+
+
+def encrypt_blob(blob: bytes, key_encryption_key: KeyEncryptionKey, version: str) -> Tuple[str, bytes]:
+    """
+    Encrypts the given blob using the given encryption protocol version.
+    Wraps the generated content-encryption-key using the user-provided key-encryption-key (kek).
+    Returns a json-formatted string containing the encryption metadata. This method should
+    only be used when a blob is small enough for single shot upload. Encrypting larger blobs
+    is done as a part of the upload_data_chunks method.
+
+    :param bytes blob:
+        The blob to be encrypted.
+    :param KeyEncryptionKey key_encryption_key:
+        The user-provided key-encryption-key. Must implement the following methods:
+        wrap_key(key)
+            - Wraps the specified key using an algorithm of the user's choice.
+        get_key_wrap_algorithm()
+            - Returns the algorithm used to wrap the specified symmetric key.
+        get_kid()
+            - Returns a string key id for this key-encryption-key.
+    :param str version: The client encryption version to use.
+    :return: A tuple of json-formatted string containing the encryption metadata and the encrypted blob data.
+    :rtype: (str, bytes)
+    """
+
+    _validate_not_none('blob', blob)
+    _validate_not_none('key_encryption_key', key_encryption_key)
+    _validate_key_encryption_key_wrap(key_encryption_key)
+
+    if version == _ENCRYPTION_PROTOCOL_V1:
+        # AES256 uses 256 bit (32 byte) keys and always with 16 byte blocks
+        content_encryption_key = os.urandom(32)
+        initialization_vector = os.urandom(16)
+
+        cipher = _generate_AES_CBC_cipher(content_encryption_key, initialization_vector)
+
+        # PKCS7 with 16 byte blocks ensures compatibility with AES.
+        padder = PKCS7(128).padder()
+        padded_data = padder.update(blob) + padder.finalize()
+
+        # Encrypt the data.
+        encryptor = cipher.encryptor()
+        encrypted_data = encryptor.update(padded_data) + encryptor.finalize()
+
+    elif version == _ENCRYPTION_PROTOCOL_V2:
+        # AES256 GCM uses 256 bit (32 byte) keys and a 12 byte nonce.
+        content_encryption_key = os.urandom(32)
+        initialization_vector = None
+
+        data = BytesIO(blob)
+        encryption_stream = GCMBlobEncryptionStream(content_encryption_key, data)
+
+        encrypted_data = encryption_stream.read()
+
+    else:
+        raise ValueError("Invalid encryption version specified.")
+
+    encryption_data = _generate_encryption_data_dict(key_encryption_key, content_encryption_key,
+                                                     initialization_vector, version)
+    encryption_data['EncryptionMode'] = 'FullBlob'
+
+    return dumps(encryption_data), encrypted_data
+
+
+def generate_blob_encryption_data(
+    key_encryption_key: Optional[KeyEncryptionKey],
+    version: str
+) -> Tuple[Optional[bytes], Optional[bytes], Optional[str]]:
+    """
+    Generates the encryption_metadata for the blob.
+
+    :param Optional[KeyEncryptionKey] key_encryption_key:
+        The key-encryption-key used to wrap the cek associate with this blob.
+    :param str version: The client encryption version to use.
+    :return: A tuple containing the cek and iv for this blob as well as the
+        serialized encryption metadata for the blob.
+    :rtype: (Optional[bytes], Optional[bytes], Optional[str])
+    """
+
+    encryption_data = None
+    content_encryption_key = None
+    initialization_vector = None
+    if key_encryption_key:
+        _validate_key_encryption_key_wrap(key_encryption_key)
+        content_encryption_key = os.urandom(32)
+        # Initialization vector only needed for V1
+        if version == _ENCRYPTION_PROTOCOL_V1:
+            initialization_vector = os.urandom(16)
+        encryption_data_dict = _generate_encryption_data_dict(key_encryption_key,
+                                                         content_encryption_key,
+                                                         initialization_vector,
+                                                         version)
+        encryption_data_dict['EncryptionMode'] = 'FullBlob'
+        encryption_data = dumps(encryption_data_dict)
+
+    return content_encryption_key, initialization_vector, encryption_data
+
+
+def decrypt_blob(  # pylint: disable=too-many-locals,too-many-statements
+        require_encryption: bool,
+        key_encryption_key: Optional[KeyEncryptionKey],
+        key_resolver: Optional[Callable[[str], KeyEncryptionKey]],
+        content: bytes,
+        start_offset: int,
+        end_offset: int,
+        response_headers: Dict[str, Any]
+) -> bytes:
+    """
+    Decrypts the given blob contents and returns only the requested range.
+
+    :param bool require_encryption:
+        Whether the calling blob service requires objects to be decrypted.
+    :param Optional[KeyEncryptionKey] key_encryption_key:
+        The user-provided key-encryption-key. Must implement the following methods:
+        wrap_key(key)
+            - Wraps the specified key using an algorithm of the user's choice.
+        get_key_wrap_algorithm()
+            - Returns the algorithm used to wrap the specified symmetric key.
+        get_kid()
+            - Returns a string key id for this key-encryption-key.
+    :param key_resolver:
+        The user-provided key resolver. Uses the kid string to return a key-encryption-key
+        implementing the interface defined above.
+    :type key_resolver: Optional[Callable[[str], KeyEncryptionKey]]
+    :param bytes content:
+        The encrypted blob content.
+    :param int start_offset:
+        The adjusted offset from the beginning of the *decrypted* content for the caller's data.
+    :param int end_offset:
+        The adjusted offset from the end of the *decrypted* content for the caller's data.
+    :param Dict[str, Any] response_headers:
+        A dictionary of response headers from the download request. Expected to include the
+        'x-ms-meta-encryptiondata' header if the blob was encrypted.
+    :return: The decrypted blob content.
+    :rtype: bytes
+    """
+    try:
+        encryption_data = _dict_to_encryption_data(loads(response_headers['x-ms-meta-encryptiondata']))
+    except Exception as exc:  # pylint: disable=broad-except
+        if require_encryption:
+            raise ValueError(
+                'Encryption required, but received data does not contain appropriate metadata.' + \
+                'Data was either not encrypted or metadata has been lost.') from exc
+
+        return content
+
+    algorithm = encryption_data.encryption_agent.encryption_algorithm
+    if algorithm not in(_EncryptionAlgorithm.AES_CBC_256, _EncryptionAlgorithm.AES_GCM_256):
+        raise ValueError('Specified encryption algorithm is not supported.')
+
+    version = encryption_data.encryption_agent.protocol
+    if version not in _VALID_ENCRYPTION_PROTOCOLS:
+        raise ValueError('Specified encryption version is not supported.')
+
+    content_encryption_key = _validate_and_unwrap_cek(encryption_data, key_encryption_key, key_resolver)
+
+    if version == _ENCRYPTION_PROTOCOL_V1:
+        blob_type = response_headers['x-ms-blob-type']
+
+        iv: Optional[bytes] = None
+        unpad = False
+        if 'content-range' in response_headers:
+            content_range = response_headers['content-range']
+            # Format: 'bytes x-y/size'
+
+            # Ignore the word 'bytes'
+            content_range = content_range.split(' ')
+
+            content_range = content_range[1].split('-')
+            content_range = content_range[1].split('/')
+            end_range = int(content_range[0])
+            blob_size = int(content_range[1])
+
+            if start_offset >= 16:
+                iv = content[:16]
+                content = content[16:]
+                start_offset -= 16
+            else:
+                iv = encryption_data.content_encryption_IV
+
+            if end_range == blob_size - 1:
+                unpad = True
+        else:
+            unpad = True
+            iv = encryption_data.content_encryption_IV
+
+        if blob_type == 'PageBlob':
+            unpad = False
+
+        if iv is None:
+            raise ValueError("Missing required metadata for Encryption V1")
+
+        cipher = _generate_AES_CBC_cipher(content_encryption_key, iv)
+        decryptor = cipher.decryptor()
+
+        content = decryptor.update(content) + decryptor.finalize()
+        if unpad:
+            unpadder = PKCS7(128).unpadder()
+            content = unpadder.update(content) + unpadder.finalize()
+
+        return content[start_offset: len(content) - end_offset]
+
+    if version in _ENCRYPTION_V2_PROTOCOLS:
+        # We assume the content contains only full encryption regions
+        total_size = len(content)
+        offset = 0
+
+        if encryption_data.encrypted_region_info is None:
+            raise ValueError("Missing required metadata for Encryption V2")
+
+        nonce_length = encryption_data.encrypted_region_info.nonce_length
+        data_length = encryption_data.encrypted_region_info.data_length
+        tag_length = encryption_data.encrypted_region_info.tag_length
+        region_length = nonce_length + data_length + tag_length
+
+        decrypted_content = bytearray()
+        while offset < total_size:
+            # Process one encryption region at a time
+            process_size = min(region_length, total_size)
+            encrypted_region = content[offset:offset + process_size]
+
+            # First bytes are the nonce
+            nonce = encrypted_region[:nonce_length]
+            ciphertext_with_tag = encrypted_region[nonce_length:]
+
+            aesgcm = AESGCM(content_encryption_key)
+            decrypted_data = aesgcm.decrypt(nonce, ciphertext_with_tag, None)
+            decrypted_content.extend(decrypted_data)
+
+            offset += process_size
+
+        # Read the caller requested data from the decrypted content
+        return decrypted_content[start_offset:end_offset]
+
+    raise ValueError('Specified encryption version is not supported.')
+
+
+def get_blob_encryptor_and_padder(
+    cek: Optional[bytes],
+    iv: Optional[bytes],
+    should_pad: bool
+) -> Tuple[Optional["AEADEncryptionContext"], Optional["PaddingContext"]]:
+    encryptor = None
+    padder = None
+
+    if cek is not None and iv is not None:
+        cipher = _generate_AES_CBC_cipher(cek, iv)
+        encryptor = cipher.encryptor()
+        padder = PKCS7(128).padder() if should_pad else None
+
+    return encryptor, padder
+
+
+def encrypt_queue_message(message: str, key_encryption_key: KeyEncryptionKey, version: str) -> str:
+    """
+    Encrypts the given plain text message using the given protocol version.
+    Wraps the generated content-encryption-key using the user-provided key-encryption-key (kek).
+    Returns a json-formatted string containing the encrypted message and the encryption metadata.
+
+    :param str message:
+        The plain text message to be encrypted.
+    :param KeyEncryptionKey key_encryption_key:
+        The user-provided key-encryption-key. Must implement the following methods:
+        wrap_key(key)
+            - Wraps the specified key using an algorithm of the user's choice.
+        get_key_wrap_algorithm()
+            - Returns the algorithm used to wrap the specified symmetric key.
+        get_kid()
+            - Returns a string key id for this key-encryption-key.
+    :param str version: The client encryption version to use.
+    :return: A json-formatted string containing the encrypted message and the encryption metadata.
+    :rtype: str
+    """
+
+    _validate_not_none('message', message)
+    _validate_not_none('key_encryption_key', key_encryption_key)
+    _validate_key_encryption_key_wrap(key_encryption_key)
+
+    # Queue encoding functions all return unicode strings, and encryption should
+    # operate on binary strings.
+    message_as_bytes: bytes = message.encode('utf-8')
+
+    if version == _ENCRYPTION_PROTOCOL_V1:
+        # AES256 CBC uses 256 bit (32 byte) keys and always with 16 byte blocks
+        content_encryption_key = os.urandom(32)
+        initialization_vector = os.urandom(16)
+
+        cipher = _generate_AES_CBC_cipher(content_encryption_key, initialization_vector)
+
+        # PKCS7 with 16 byte blocks ensures compatibility with AES.
+        padder = PKCS7(128).padder()
+        padded_data = padder.update(message_as_bytes) + padder.finalize()
+
+        # Encrypt the data.
+        encryptor = cipher.encryptor()
+        encrypted_data = encryptor.update(padded_data) + encryptor.finalize()
+
+    elif version == _ENCRYPTION_PROTOCOL_V2:
+        # AES256 GCM uses 256 bit (32 byte) keys and a 12 byte nonce.
+        content_encryption_key = os.urandom(32)
+        initialization_vector = None
+
+        # The nonce MUST be different for each key
+        nonce = os.urandom(12)
+        aesgcm = AESGCM(content_encryption_key)
+
+        # Returns ciphertext + tag
+        cipertext_with_tag = aesgcm.encrypt(nonce, message_as_bytes, None)
+        encrypted_data = nonce + cipertext_with_tag
+
+    else:
+        raise ValueError("Invalid encryption version specified.")
+
+    # Build the dictionary structure.
+    queue_message = {'EncryptedMessageContents': encode_base64(encrypted_data),
+                     'EncryptionData': _generate_encryption_data_dict(key_encryption_key,
+                                                                      content_encryption_key,
+                                                                      initialization_vector,
+                                                                      version)}
+
+    return dumps(queue_message)
+
+
+def decrypt_queue_message(
+    message: str,
+    response: "PipelineResponse",
+    require_encryption: bool,
+    key_encryption_key: Optional[KeyEncryptionKey],
+    resolver: Optional[Callable[[str], KeyEncryptionKey]]
+) -> str:
+    """
+    Returns the decrypted message contents from an EncryptedQueueMessage.
+    If no encryption metadata is present, will return the unaltered message.
+    :param str message:
+        The JSON formatted QueueEncryptedMessage contents with all associated metadata.
+    :param Any response:
+        The pipeline response used to generate an error with.
+    :param bool require_encryption:
+        If set, will enforce that the retrieved messages are encrypted and decrypt them.
+    :param Optional[KeyEncryptionKey] key_encryption_key:
+        The user-provided key-encryption-key. Must implement the following methods:
+        wrap_key(key)
+            - Wraps the specified key using an algorithm of the user's choice.
+        get_key_wrap_algorithm()
+            - Returns the algorithm used to wrap the specified symmetric key.
+        get_kid()
+            - Returns a string key id for this key-encryption-key.
+    :param Optional[Callable[[str], KeyEncryptionKey]] resolver:
+        The user-provided key resolver. Uses the kid string to return a key-encryption-key
+        implementing the interface defined above.
+    :return: The plain text message from the queue message.
+    :rtype: str
+    """
+    response = response.http_response
+
+    try:
+        deserialized_message: Dict[str, Any] = loads(message)
+
+        encryption_data = _dict_to_encryption_data(deserialized_message['EncryptionData'])
+        decoded_data = decode_base64_to_bytes(deserialized_message['EncryptedMessageContents'])
+    except (KeyError, ValueError) as exc:
+        # Message was not json formatted and so was not encrypted
+        # or the user provided a json formatted message
+        # or the metadata was malformed.
+        if require_encryption:
+            raise ValueError(
+                'Encryption required, but received message does not contain appropriate metatadata. ' + \
+                'Message was either not encrypted or metadata was incorrect.') from exc
+
+        return message
+    try:
+        return _decrypt_message(decoded_data, encryption_data, key_encryption_key, resolver).decode('utf-8')
+    except Exception as error:
+        raise HttpResponseError(
+            message="Decryption failed.",
+            response=response, #type: ignore [arg-type]
+            error=error) from error