about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/azure/storage/blob/_serialize.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/azure/storage/blob/_serialize.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-4a52a71956a8d46fcb7294ac71734504bb09bcc2.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/storage/blob/_serialize.py')
-rw-r--r--.venv/lib/python3.12/site-packages/azure/storage/blob/_serialize.py214
1 files changed, 214 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/storage/blob/_serialize.py b/.venv/lib/python3.12/site-packages/azure/storage/blob/_serialize.py
new file mode 100644
index 00000000..316e321c
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/storage/blob/_serialize.py
@@ -0,0 +1,214 @@
+# -------------------------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# Licensed under the MIT License. See License.txt in the project root for
+# license information.
+# --------------------------------------------------------------------------
+from typing import Any, cast, Dict, Optional, Tuple, Union, TYPE_CHECKING
+
+try:
+    from urllib.parse import quote
+except ImportError:
+    from urllib2 import quote  # type: ignore
+
+from azure.core import MatchConditions
+
+from ._generated.models import (
+    ArrowConfiguration,
+    BlobTag,
+    BlobTags,
+    ContainerCpkScopeInfo,
+    CpkScopeInfo,
+    DelimitedTextConfiguration,
+    JsonTextConfiguration,
+    LeaseAccessConditions,
+    ModifiedAccessConditions,
+    QueryFormat,
+    QueryFormatType,
+    QuerySerialization,
+    SourceModifiedAccessConditions
+)
+from ._models import ContainerEncryptionScope, DelimitedJsonDialect
+
+if TYPE_CHECKING:
+    from ._lease import BlobLeaseClient
+
+
+_SUPPORTED_API_VERSIONS = [
+    '2019-02-02',
+    '2019-07-07',
+    '2019-10-10',
+    '2019-12-12',
+    '2020-02-10',
+    '2020-04-08',
+    '2020-06-12',
+    '2020-08-04',
+    '2020-10-02',
+    '2020-12-06',
+    '2021-02-12',
+    '2021-04-10',
+    '2021-06-08',
+    '2021-08-06',
+    '2021-12-02',
+    '2022-11-02',
+    '2023-01-03',
+    '2023-05-03',
+    '2023-08-03',
+    '2023-11-03',
+    '2024-05-04',
+    '2024-08-04',
+    '2024-11-04',
+    '2025-01-05',
+    '2025-05-05',
+]
+
+
+def _get_match_headers(
+    kwargs: Dict[str, Any],
+    match_param: str,
+    etag_param: str
+) -> Tuple[Optional[str], Optional[Any]]:
+    if_match = None
+    if_none_match = None
+    match_condition = kwargs.pop(match_param, None)
+    if match_condition == MatchConditions.IfNotModified:
+        if_match = kwargs.pop(etag_param, None)
+        if not if_match:
+            raise ValueError(f"'{match_param}' specified without '{etag_param}'.")
+    elif match_condition == MatchConditions.IfPresent:
+        if_match = '*'
+    elif match_condition == MatchConditions.IfModified:
+        if_none_match = kwargs.pop(etag_param, None)
+        if not if_none_match:
+            raise ValueError(f"'{match_param}' specified without '{etag_param}'.")
+    elif match_condition == MatchConditions.IfMissing:
+        if_none_match = '*'
+    elif match_condition is None:
+        if kwargs.get(etag_param):
+            raise ValueError(f"'{etag_param}' specified without '{match_param}'.")
+    else:
+        raise TypeError(f"Invalid match condition: {match_condition}")
+    return if_match, if_none_match
+
+
+def get_access_conditions(lease: Optional[Union["BlobLeaseClient", str]]) -> Optional[LeaseAccessConditions]:
+    try:
+        lease_id = lease.id # type: ignore
+    except AttributeError:
+        lease_id = lease # type: ignore
+    return LeaseAccessConditions(lease_id=lease_id) if lease_id else None
+
+
+def get_modify_conditions(kwargs: Dict[str, Any]) -> ModifiedAccessConditions:
+    if_match, if_none_match = _get_match_headers(kwargs, 'match_condition', 'etag')
+    return ModifiedAccessConditions(
+        if_modified_since=kwargs.pop('if_modified_since', None),
+        if_unmodified_since=kwargs.pop('if_unmodified_since', None),
+        if_match=if_match or kwargs.pop('if_match', None),
+        if_none_match=if_none_match or kwargs.pop('if_none_match', None),
+        if_tags=kwargs.pop('if_tags_match_condition', None)
+    )
+
+
+def get_source_conditions(kwargs: Dict[str, Any]) -> SourceModifiedAccessConditions:
+    if_match, if_none_match = _get_match_headers(kwargs, 'source_match_condition', 'source_etag')
+    return SourceModifiedAccessConditions(
+        source_if_modified_since=kwargs.pop('source_if_modified_since', None),
+        source_if_unmodified_since=kwargs.pop('source_if_unmodified_since', None),
+        source_if_match=if_match or kwargs.pop('source_if_match', None),
+        source_if_none_match=if_none_match or kwargs.pop('source_if_none_match', None),
+        source_if_tags=kwargs.pop('source_if_tags_match_condition', None)
+    )
+
+
+def get_cpk_scope_info(kwargs: Dict[str, Any]) -> Optional[CpkScopeInfo]:
+    if 'encryption_scope' in kwargs:
+        return CpkScopeInfo(encryption_scope=kwargs.pop('encryption_scope'))
+    return None
+
+
+def get_container_cpk_scope_info(kwargs: Dict[str, Any]) -> Optional[ContainerCpkScopeInfo]:
+    encryption_scope = kwargs.pop('container_encryption_scope', None)
+    if encryption_scope:
+        if isinstance(encryption_scope, ContainerEncryptionScope):
+            return ContainerCpkScopeInfo(
+                default_encryption_scope=encryption_scope.default_encryption_scope,
+                prevent_encryption_scope_override=encryption_scope.prevent_encryption_scope_override
+            )
+        if isinstance(encryption_scope, dict):
+            return ContainerCpkScopeInfo(
+                default_encryption_scope=encryption_scope['default_encryption_scope'],
+                prevent_encryption_scope_override=encryption_scope.get('prevent_encryption_scope_override')
+            )
+        raise TypeError("Container encryption scope must be dict or type ContainerEncryptionScope.")
+    return None
+
+
+def get_api_version(kwargs: Dict[str, Any]) -> str:
+    api_version = kwargs.get('api_version', None)
+    if api_version and api_version not in _SUPPORTED_API_VERSIONS:
+        versions = '\n'.join(_SUPPORTED_API_VERSIONS)
+        raise ValueError(f"Unsupported API version '{api_version}'. Please select from:\n{versions}")
+    return api_version or _SUPPORTED_API_VERSIONS[-1]
+
+def get_version_id(self_vid: Optional[str], kwargs: Dict[str, Any]) -> Optional[str]:
+    if 'version_id' in kwargs:
+        return cast(str, kwargs.pop('version_id'))
+    return self_vid
+
+def serialize_blob_tags_header(tags: Optional[Dict[str, str]] = None) -> Optional[str]:
+    if tags is None:
+        return None
+
+    components = []
+    if tags:
+        for key, value in tags.items():
+            components.append(quote(key, safe='.-'))
+            components.append('=')
+            components.append(quote(value, safe='.-'))
+            components.append('&')
+
+    if components:
+        del components[-1]
+
+    return ''.join(components)
+
+
+def serialize_blob_tags(tags: Optional[Dict[str, str]] = None) -> BlobTags:
+    tag_list = []
+    if tags:
+        tag_list = [BlobTag(key=k, value=v) for k, v in tags.items()]
+    return BlobTags(blob_tag_set=tag_list)
+
+
+def serialize_query_format(formater: Union[str, DelimitedJsonDialect]) -> Optional[QuerySerialization]:
+    if formater == "ParquetDialect":
+        qq_format = QueryFormat(type=QueryFormatType.PARQUET, parquet_text_configuration=' ')  #type: ignore [arg-type]
+    elif isinstance(formater, DelimitedJsonDialect):
+        json_serialization_settings = JsonTextConfiguration(record_separator=formater.delimiter)
+        qq_format = QueryFormat(type=QueryFormatType.JSON, json_text_configuration=json_serialization_settings)
+    elif hasattr(formater, 'quotechar'):  # This supports a csv.Dialect as well
+        try:
+            headers = formater.has_header  # type: ignore
+        except AttributeError:
+            headers = False
+        if isinstance(formater, str):
+            raise ValueError("Unknown string value provided. Accepted values: ParquetDialect")
+        csv_serialization_settings = DelimitedTextConfiguration(
+            column_separator=formater.delimiter,
+            field_quote=formater.quotechar,
+            record_separator=formater.lineterminator,
+            escape_char=formater.escapechar,
+            headers_present=headers
+        )
+        qq_format = QueryFormat(
+            type=QueryFormatType.DELIMITED,
+            delimited_text_configuration=csv_serialization_settings
+        )
+    elif isinstance(formater, list):
+        arrow_serialization_settings = ArrowConfiguration(schema=formater)
+        qq_format = QueryFormat(type=QueryFormatType.arrow, arrow_configuration=arrow_serialization_settings)
+    elif not formater:
+        return None
+    else:
+        raise TypeError("Format must be DelimitedTextDialect or DelimitedJsonDialect or ParquetDialect.")
+    return QuerySerialization(format=qq_format)