# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
# pylint: disable=too-many-lines, docstring-keyword-should-match-keyword-only

import re
from datetime import datetime
from typing import (
    Any, Dict, Optional, Tuple, Union,
    TYPE_CHECKING
)
from urllib.parse import urlparse, quote

from azure.core.exceptions import AzureError, HttpResponseError
from azure.core.tracing.decorator import distributed_trace
from azure.storage.blob import BlobClient
from ._data_lake_lease import DataLakeLeaseClient
from ._deserialize import process_storage_error
from ._generated import AzureDataLakeStorageRESTAPI
from ._models import LocationMode, DirectoryProperties, AccessControlChangeResult, AccessControlChanges, \
    AccessControlChangeCounters, AccessControlChangeFailure
from ._serialize import (
    add_metadata_headers,
    compare_api_versions,
    convert_datetime_to_rfc1123,
    convert_dfs_url_to_blob_url,
    get_access_conditions,
    get_api_version,
    get_cpk_info,
    get_lease_id,
    get_mod_conditions,
    get_path_http_headers,
    get_source_mod_conditions,
)
from ._shared.base_client import StorageAccountHostsMixin, parse_query
from ._shared.response_handlers import return_response_headers, return_headers_and_deserialized

if TYPE_CHECKING:
    from azure.core.credentials import AzureNamedKeyCredential, AzureSasCredential, TokenCredential
    from ._models import ContentSettings, FileProperties


class PathClient(StorageAccountHostsMixin):
    """A base client for interacting with a DataLake file/directory, even if the file/directory may not
    yet exist.

    :param str account_url:
        The URI to the storage account.
    :param str file_system_name:
        The file system for the directory or files.
    :param str path_name:
        The whole file path, used to interact with a specific file,
        e.g. "{directory}/{subdirectory}/{file}".
    :param credential:
        The credentials with which to authenticate. This is optional if the
        account URL already has a SAS token. The value can be a SAS token string,
        an instance of an AzureSasCredential or AzureNamedKeyCredential from azure.core.credentials,
        an account shared access key, or an instance of a TokenCredential class from azure.identity.
        If the resource URI already contains a SAS token, this will be ignored in favor of an explicit credential
        - except in the case of AzureSasCredential, where the conflicting SAS tokens will raise a ValueError.
        If using an instance of AzureNamedKeyCredential, "name" should be the storage account name, and "key"
        should be the storage account key.
    :type credential:
        ~azure.core.credentials.AzureNamedKeyCredential or
        ~azure.core.credentials.AzureSasCredential or
        ~azure.core.credentials.TokenCredential or
        str or dict[str, str] or None
    :keyword str api_version:
        The Storage API version to use for requests. Default value is the most recent service version that is
        compatible with the current SDK. Setting to an older version may result in reduced feature compatibility.
    :keyword str audience: The audience to use when requesting tokens for Azure Active Directory
        authentication. Only has an effect when credential is of type TokenCredential. The value could be
        https://storage.azure.com/ (default) or https://<account>.blob.core.windows.net.
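
    .. admonition:: Example:

        A minimal construction sketch; the account URL, file system name, path,
        and SAS token below are placeholder values, not a working configuration.

        .. code-block:: python

            path_client = PathClient(
                "https://myaccount.dfs.core.windows.net",
                file_system_name="myfilesystem",
                path_name="mydir/myfile.txt",
                credential="<sas_token>"
            )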
    """
    def __init__(
            self, account_url: str,
            file_system_name: str,
            path_name: str,
            credential: Optional[Union[str, Dict[str, str], "AzureNamedKeyCredential", "AzureSasCredential", "TokenCredential"]] = None,  # pylint: disable=line-too-long
            **kwargs: Any
        ) -> None:
        try:
            if not account_url.lower().startswith('http'):
                account_url = "https://" + account_url
        except AttributeError as exc:
            raise ValueError("Account URL must be a string.") from exc
        parsed_url = urlparse(account_url.rstrip('/'))

        # remove the preceding/trailing delimiter from the path components
        file_system_name = file_system_name.strip('/')

        # the name of root directory is /
        if path_name != '/':
            path_name = path_name.strip('/')

        if not (file_system_name and path_name):
            raise ValueError("Please specify a file system name and file path.")
        if not parsed_url.netloc:
            raise ValueError(f"Invalid URL: {account_url}")

        blob_account_url = convert_dfs_url_to_blob_url(account_url)
        self._blob_account_url = blob_account_url

        datalake_hosts = kwargs.pop('_hosts', None)
        blob_hosts = None
        if datalake_hosts:
            blob_primary_account_url = convert_dfs_url_to_blob_url(datalake_hosts[LocationMode.PRIMARY])
            blob_hosts = {LocationMode.PRIMARY: blob_primary_account_url, LocationMode.SECONDARY: ""}
        self._blob_client = BlobClient(blob_account_url, file_system_name, path_name,
                                       credential=credential, _hosts=blob_hosts, **kwargs)

        _, sas_token = parse_query(parsed_url.query)
        self.file_system_name = file_system_name
        self.path_name = path_name

        self._query_str, self._raw_credential = self._format_query_string(sas_token, credential)

        super(PathClient, self).__init__(parsed_url, service='dfs', credential=self._raw_credential,
                                         _hosts=datalake_hosts, **kwargs)
        # ADLS doesn't support a secondary endpoint, so make sure it's empty
        self._hosts[LocationMode.SECONDARY] = ""
        self._api_version = get_api_version(kwargs)
        self._client = self._build_generated_client(self.url)
        self._datalake_client_for_blob_operation = self._build_generated_client(self._blob_client.url)

    def _build_generated_client(self, url: str) -> AzureDataLakeStorageRESTAPI:
        client = AzureDataLakeStorageRESTAPI(
            url,
            base_url=url,
            file_system=self.file_system_name,
            path=self.path_name,
            pipeline=self._pipeline
        )
        client._config.version = self._api_version  # pylint: disable=protected-access
        return client

    def __exit__(self, *args):
        self._blob_client.close()
        self._datalake_client_for_blob_operation.close()
        super(PathClient, self).__exit__(*args)

    def close(self):
        # type: () -> None
        """ This method is to close the sockets opened by the client.
        It need not be used when using with a context manager.
        """
        self.__exit__()

    def _format_url(self, hostname):
        file_system_name = self.file_system_name
        if isinstance(file_system_name, str):
            file_system_name = file_system_name.encode('UTF-8')
        return (f"{self.scheme}://{hostname}/{quote(file_system_name)}/"
                f"{quote(self.path_name, safe='~')}{self._query_str}")

    def _create_path_options(self, resource_type,
                             content_settings=None,  # type: Optional[ContentSettings]
                             metadata=None,  # type: Optional[Dict[str, str]]
                             **kwargs):
        # type: (...) -> Dict[str, Any]
        access_conditions = get_access_conditions(kwargs.pop('lease', None))
        mod_conditions = get_mod_conditions(kwargs)

        path_http_headers = None
        if content_settings:
            path_http_headers = get_path_http_headers(content_settings)

        cpk_info = get_cpk_info(self.scheme, kwargs)

        expires_on = kwargs.pop('expires_on', None)
        if expires_on:
            try:
                expires_on = convert_datetime_to_rfc1123(expires_on)
                kwargs['expiry_options'] = 'Absolute'
            except AttributeError:
                expires_on = str(expires_on)
                kwargs['expiry_options'] = 'RelativeToNow'

        options = {
            'resource': resource_type,
            'properties': add_metadata_headers(metadata),
            'permissions': kwargs.pop('permissions', None),
            'umask': kwargs.pop('umask', None),
            'owner': kwargs.pop('owner', None),
            'group': kwargs.pop('group', None),
            'acl': kwargs.pop('acl', None),
            'proposed_lease_id': kwargs.pop('lease_id', None),
            'lease_duration': kwargs.pop('lease_duration', None),
            'expiry_options': kwargs.pop('expiry_options', None),
            'expires_on': expires_on,
            'path_http_headers': path_http_headers,
            'lease_access_conditions': access_conditions,
            'modified_access_conditions': mod_conditions,
            'cpk_info': cpk_info,
            'timeout': kwargs.pop('timeout', None),
            'encryption_context': kwargs.pop('encryption_context', None),
            'cls': return_response_headers}
        options.update(kwargs)
        return options

    def _create(self, resource_type, content_settings=None, metadata=None, **kwargs):
        # type: (...) -> Dict[str, Union[str, datetime]]
        """
        Create directory or file

        :param resource_type:
            Required for Create File and Create Directory.
            The value must be "file" or "directory".
        :type resource_type: str
        :param ~azure.storage.filedatalake.ContentSettings content_settings:
            ContentSettings object used to set path properties.
        :param metadata:
            Name-value pairs associated with the file/directory as metadata.
        :type metadata: dict[str, str]
        :keyword lease:
            Required if the file/directory has an active lease. Value can be a LeaseClient object
            or the lease ID as a string.
        :paramtype lease: ~azure.storage.filedatalake.DataLakeLeaseClient or str
        :keyword str umask:
            Optional and only valid if Hierarchical Namespace is enabled for the account.
            When creating a file or directory and the parent folder does not have a default ACL,
            the umask restricts the permissions of the file or directory to be created.
            The resulting permission is given by p & ^u, where p is the permission and u is the umask.
            For example, if p is 0777 and u is 0057, then the resulting permission is 0720.
            The default permission is 0777 for a directory and 0666 for a file. The default umask is 0027.
            The umask must be specified in 4-digit octal notation (e.g. 0766).
        :keyword str owner:
            The owner of the file or directory.
        :keyword str group:
            The owning group of the file or directory.
        :keyword str acl:
            Sets POSIX access control rights on files and directories. The value is a
            comma-separated list of access control entries. Each access control entry (ACE) consists of a
            scope, a type, a user or group identifier, and permissions in the format
            "[scope:][type]:[id]:[permissions]".
        :keyword str lease_id:
            Proposed lease ID, in a GUID string format. The DataLake service returns
            400 (Invalid request) if the proposed lease ID is not in the correct format.
        :keyword int lease_duration:
            Specifies the duration of the lease, in seconds, or negative one
            (-1) for a lease that never expires. A non-infinite lease can be
            between 15 and 60 seconds. A lease duration cannot be changed
            using renew or change.
        :keyword expires_on:
            The time to set the file to expire.
            If the type of expires_on is an int, expiration time will be set
            as the number of milliseconds elapsed from creation time.
            If the type of expires_on is datetime, expiration time will be set
            absolute to the time provided. If no time zone info is provided, this
            will be interpreted as UTC.
        :paramtype expires_on: datetime or int
        :keyword permissions:
            Optional and only valid if Hierarchical Namespace
            is enabled for the account. Sets POSIX access permissions for the file
            owner, the file owning group, and others. Each class may be granted
            read, write, or execute permission.  The sticky bit is also supported.
            Both symbolic (rwxrw-rw-) and 4-digit octal notation (e.g. 0766) are
            supported.
        :type permissions: str
        :keyword ~datetime.datetime if_modified_since:
            A DateTime value. Azure expects the date value passed in to be UTC.
            If timezone is included, any non-UTC datetimes will be converted to UTC.
            If a date is passed in without timezone info, it is assumed to be UTC.
            Specify this header to perform the operation only
            if the resource has been modified since the specified time.
        :keyword ~datetime.datetime if_unmodified_since:
            A DateTime value. Azure expects the date value passed in to be UTC.
            If timezone is included, any non-UTC datetimes will be converted to UTC.
            If a date is passed in without timezone info, it is assumed to be UTC.
            Specify this header to perform the operation only if
            the resource has not been modified since the specified date/time.
        :keyword str etag:
            An ETag value, or the wildcard character (*). Used to check if the resource has changed,
            and act according to the condition specified by the `match_condition` parameter.
        :keyword ~azure.core.MatchConditions match_condition:
            The match condition to use upon the etag.
        :keyword ~azure.storage.filedatalake.CustomerProvidedEncryptionKey cpk:
            Encrypts the data on the service-side with the given key.
            Use of customer-provided keys must be done over HTTPS.
        :keyword int timeout:
            Sets the server-side timeout for the operation in seconds. For more details see
            https://learn.microsoft.com/rest/api/storageservices/setting-timeouts-for-blob-service-operations.
            This value is not tracked or validated on the client. To configure client-side network timeouts
            see `here <https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/storage/azure-storage-file-datalake
            #other-client--per-operation-configuration>`_.
        :keyword str encryption_context:
            Specifies the encryption context to set on the file.
        :return: A dictionary of response headers.
        :rtype: dict[str, str] or dict[str, ~datetime.datetime]
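
        .. admonition:: Example:

            A hedged sketch of the ``expires_on`` behavior described above;
            ``client`` is assumed to be a PathClient subclass instance for a file.

            .. code-block:: python

                from datetime import datetime, timedelta, timezone

                # datetime value -> absolute expiry ('Absolute' expiry option)
                client._create(
                    'file',
                    expires_on=datetime.now(timezone.utc) + timedelta(days=1))

                # int value -> milliseconds from creation time ('RelativeToNow')
                client._create('file', expires_on=30000)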
        """
        lease_id = kwargs.get('lease_id', None)
        lease_duration = kwargs.get('lease_duration', None)
        if lease_id and not lease_duration:
            raise ValueError("A lease_duration must be specified when providing a lease_id.")
        if lease_duration and not lease_id:
            raise ValueError("A lease_id must be specified when providing a lease_duration.")
        options = self._create_path_options(
            resource_type,
            content_settings=content_settings,
            metadata=metadata,
            **kwargs)
        try:
            return self._client.path.create(**options)
        except HttpResponseError as error:
            process_storage_error(error)

    @staticmethod
    def _delete_path_options(paginated: Optional[bool], **kwargs) -> Dict[str, Any]:
        access_conditions = get_access_conditions(kwargs.pop('lease', None))
        mod_conditions = get_mod_conditions(kwargs)

        options = {
            'paginated': paginated,
            'lease_access_conditions': access_conditions,
            'modified_access_conditions': mod_conditions,
            'cls': return_response_headers,
            'timeout': kwargs.pop('timeout', None)}
        options.update(kwargs)
        return options

    def _delete(self, **kwargs):
        # type: (**Any) -> Dict[str, Union[datetime, str]]
        """
        Marks the specified path for deletion.

        :keyword lease:
            Required if the file/directory has an active lease. Value can be a LeaseClient object
            or the lease ID as a string.
        :paramtype lease: ~azure.storage.filedatalake.DataLakeLeaseClient or str
        :keyword ~datetime.datetime if_modified_since:
            A DateTime value. Azure expects the date value passed in to be UTC.
            If timezone is included, any non-UTC datetimes will be converted to UTC.
            If a date is passed in without timezone info, it is assumed to be UTC.
            Specify this header to perform the operation only
            if the resource has been modified since the specified time.
        :keyword ~datetime.datetime if_unmodified_since:
            A DateTime value. Azure expects the date value passed in to be UTC.
            If timezone is included, any non-UTC datetimes will be converted to UTC.
            If a date is passed in without timezone info, it is assumed to be UTC.
            Specify this header to perform the operation only if
            the resource has not been modified since the specified date/time.
        :keyword str etag:
            An ETag value, or the wildcard character (*). Used to check if the resource has changed,
            and act according to the condition specified by the `match_condition` parameter.
        :keyword ~azure.core.MatchConditions match_condition:
            The match condition to use upon the etag.
        :keyword int timeout:
            Sets the server-side timeout for the operation in seconds. For more details see
            https://learn.microsoft.com/rest/api/storageservices/setting-timeouts-for-blob-service-operations.
            This value is not tracked or validated on the client. To configure client-side network timeouts
            see `here <https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/storage/azure-storage-file-datalake
            #other-client--per-operation-configuration>`_.
        :returns: A dictionary containing information about the deleted path.
        :rtype: dict[str, Any]
        """
        # Perform a paginated delete only if using OAuth, deleting a directory, and the API version
        # is 2023-08-03 or later. The pagination is only for ACL checks; the final request remains
        # the atomic delete operation.
        paginated = None
        if (compare_api_versions(self.api_version, '2023-08-03') >= 0 and
            hasattr(self.credential, 'get_token') and
            kwargs.get('recursive')):  # Directory delete will always specify recursive
            paginated = True

        options = self._delete_path_options(paginated, **kwargs)
        try:
            response_headers = self._client.path.delete(**options)
            # Loop until continuation token is None for paginated delete
            while response_headers['continuation']:
                response_headers = self._client.path.delete(
                    continuation=response_headers['continuation'],
                    **options)

            return response_headers
        except HttpResponseError as error:
            process_storage_error(error)

    @staticmethod
    def _set_access_control_options(owner=None, group=None, permissions=None, acl=None, **kwargs):
        # type: (...) -> Dict[str, Any]

        access_conditions = get_access_conditions(kwargs.pop('lease', None))
        mod_conditions = get_mod_conditions(kwargs)

        options = {
            'owner': owner,
            'group': group,
            'permissions': permissions,
            'acl': acl,
            'lease_access_conditions': access_conditions,
            'modified_access_conditions': mod_conditions,
            'timeout': kwargs.pop('timeout', None),
            'cls': return_response_headers}
        options.update(kwargs)
        return options

    @distributed_trace
    def set_access_control(self, owner=None,  # type: Optional[str]
                           group=None,  # type: Optional[str]
                           permissions=None,  # type: Optional[str]
                           acl=None,  # type: Optional[str]
                           **kwargs):
        # type: (...) -> Dict[str, Union[str, datetime]]
        """
        Set the owner, group, permissions, or access control list for a path.

        :param owner:
            Optional. The owner of the file or directory.
        :type owner: str
        :param group:
            Optional. The owning group of the file or directory.
        :type group: str
        :param permissions:
            Optional and only valid if Hierarchical Namespace
            is enabled for the account. Sets POSIX access permissions for the file
            owner, the file owning group, and others. Each class may be granted
            read, write, or execute permission.  The sticky bit is also supported.
            Both symbolic (rwxrw-rw-) and 4-digit octal notation (e.g. 0766) are
            supported.
            permissions and acl are mutually exclusive.
        :type permissions: str
        :param acl:
            Sets POSIX access control rights on files and directories.
            The value is a comma-separated list of access control entries. Each
            access control entry (ACE) consists of a scope, a type, a user or
            group identifier, and permissions in the format
            "[scope:][type]:[id]:[permissions]".
            permissions and acl are mutually exclusive.
        :type acl: str
        :keyword lease:
            Required if the file/directory has an active lease. Value can be a LeaseClient object
            or the lease ID as a string.
        :paramtype lease: ~azure.storage.filedatalake.DataLakeLeaseClient or str
        :keyword ~datetime.datetime if_modified_since:
            A DateTime value. Azure expects the date value passed in to be UTC.
            If timezone is included, any non-UTC datetimes will be converted to UTC.
            If a date is passed in without timezone info, it is assumed to be UTC.
            Specify this header to perform the operation only
            if the resource has been modified since the specified time.
        :keyword ~datetime.datetime if_unmodified_since:
            A DateTime value. Azure expects the date value passed in to be UTC.
            If timezone is included, any non-UTC datetimes will be converted to UTC.
            If a date is passed in without timezone info, it is assumed to be UTC.
            Specify this header to perform the operation only if
            the resource has not been modified since the specified date/time.
        :keyword str etag:
            An ETag value, or the wildcard character (*). Used to check if the resource has changed,
            and act according to the condition specified by the `match_condition` parameter.
        :keyword ~azure.core.MatchConditions match_condition:
            The match condition to use upon the etag.
        :keyword int timeout:
            Sets the server-side timeout for the operation in seconds. For more details see
            https://learn.microsoft.com/rest/api/storageservices/setting-timeouts-for-blob-service-operations.
            This value is not tracked or validated on the client. To configure client-side network timeouts
            see `here <https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/storage/azure-storage-file-datalake
            #other-client--per-operation-configuration>`_.
        :returns: response dict containing access control options (Etag and last modified).
        :rtype: dict[str, str] or dict[str, ~datetime.datetime]
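
        .. admonition:: Example:

            A hedged usage sketch; the permission and ACL strings are placeholder
            values in the formats described above (remember the two are mutually
            exclusive).

            .. code-block:: python

                # Set POSIX permissions symbolically
                path_client.set_access_control(permissions='rwxr-x---')

                # Or set a full ACL instead
                path_client.set_access_control(
                    acl='user::rwx,group::r-x,other::---')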
        """
        if not any([owner, group, permissions, acl]):
            raise ValueError("At least one parameter should be set for set_access_control API")
        options = self._set_access_control_options(owner=owner, group=group, permissions=permissions, acl=acl, **kwargs)
        try:
            return self._client.path.set_access_control(**options)
        except HttpResponseError as error:
            process_storage_error(error)

    @staticmethod
    def _get_access_control_options(upn=None,  # type: Optional[bool]
                                    **kwargs):
        # type: (...) -> Dict[str, Any]

        access_conditions = get_access_conditions(kwargs.pop('lease', None))
        mod_conditions = get_mod_conditions(kwargs)

        options = {
            'action': 'getAccessControl',
            'upn': upn if upn else False,
            'lease_access_conditions': access_conditions,
            'modified_access_conditions': mod_conditions,
            'timeout': kwargs.pop('timeout', None),
            'cls': return_response_headers}
        options.update(kwargs)
        return options

    @distributed_trace
    def get_access_control(self, upn=None,  # type: Optional[bool]
                           **kwargs):
        # type: (...) -> Dict[str, Any]
        """
        :param upn: Optional.
            Valid only when Hierarchical Namespace is
            enabled for the account. If "true", the user identity values returned
            in the x-ms-owner, x-ms-group, and x-ms-acl response headers will be
            transformed from Azure Active Directory Object IDs to User Principal
            Names.  If "false", the values will be returned as Azure Active
            Directory Object IDs. The default value is false. Note that group and
            application Object IDs are not translated because they do not have
            unique friendly names.
        :type upn: bool
        :keyword lease:
            Required if the file/directory has an active lease. Value can be a LeaseClient object
            or the lease ID as a string.
        :paramtype lease: ~azure.storage.filedatalake.DataLakeLeaseClient or str
        :keyword ~datetime.datetime if_modified_since:
            A DateTime value. Azure expects the date value passed in to be UTC.
            If timezone is included, any non-UTC datetimes will be converted to UTC.
            If a date is passed in without timezone info, it is assumed to be UTC.
            Specify this header to perform the operation only
            if the resource has been modified since the specified time.
        :keyword ~datetime.datetime if_unmodified_since:
            A DateTime value. Azure expects the date value passed in to be UTC.
            If timezone is included, any non-UTC datetimes will be converted to UTC.
            If a date is passed in without timezone info, it is assumed to be UTC.
            Specify this header to perform the operation only if
            the resource has not been modified since the specified date/time.
        :keyword str etag:
            An ETag value, or the wildcard character (*). Used to check if the resource has changed,
            and act according to the condition specified by the `match_condition` parameter.
        :keyword ~azure.core.MatchConditions match_condition:
            The match condition to use upon the etag.
        :keyword int timeout:
            Sets the server-side timeout for the operation in seconds. For more details see
            https://learn.microsoft.com/rest/api/storageservices/setting-timeouts-for-blob-service-operations.
            This value is not tracked or validated on the client. To configure client-side network timeouts
            see `here <https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/storage/azure-storage-file-datalake
            #other-client--per-operation-configuration>`_.
        :returns: response dict containing the path's access control (owner, group, permissions, and acl).
        :rtype: dict[str, Any]
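
        .. admonition:: Example:

            A minimal sketch reading the access control back; the key names are
            assumptions based on the x-ms-owner/x-ms-group/x-ms-acl response headers.

            .. code-block:: python

                access_control = path_client.get_access_control(upn=True)
                print(access_control['owner'], access_control['acl'])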
        """
        options = self._get_access_control_options(upn=upn, **kwargs)
        try:
            return self._client.path.get_properties(**options)
        except HttpResponseError as error:
            process_storage_error(error)

    @staticmethod
    def _set_access_control_recursive_options(mode, acl, **kwargs):
        # type: (str, str, **Any) -> Dict[str, Any]

        options = {
            'mode': mode,
            'force_flag': kwargs.pop('continue_on_failure', None),
            'timeout': kwargs.pop('timeout', None),
            'continuation': kwargs.pop('continuation_token', None),
            'max_records': kwargs.pop('batch_size', None),
            'acl': acl,
            'cls': return_headers_and_deserialized}
        options.update(kwargs)
        return options

    @distributed_trace
    def set_access_control_recursive(self, acl, **kwargs):
        # type: (str, **Any) -> AccessControlChangeResult
        """
        Sets the Access Control on a path and sub-paths.

        :param acl:
            Sets POSIX access control rights on files and directories.
            The value is a comma-separated list of access control entries. Each
            access control entry (ACE) consists of a scope, a type, a user or
            group identifier, and permissions in the format
            "[scope:][type]:[id]:[permissions]".
        :type acl: str
        :keyword func(~azure.storage.filedatalake.AccessControlChanges) progress_hook:
            Callback where the caller can track progress of the operation
            as well as collect paths that failed to change Access Control.
        :keyword str continuation_token:
            Optional continuation token that can be used to resume previously stopped operation.
        :keyword int batch_size:
            Optional. If data set size exceeds batch size then operation will be split into multiple
            requests so that progress can be tracked. Batch size should be between 1 and 2000.
            The default when unspecified is 2000.
        :keyword int max_batches:
            Optional. Defines the maximum number of batches that a single change Access Control
            operation can execute. If the maximum is reached before all sub-paths are processed,
            the continuation token can be used to resume the operation.
            An empty value indicates the number of batches is unbounded and the operation runs to completion.
        :keyword bool continue_on_failure:
            If set to False, the operation will terminate quickly on encountering user errors (4XX).
            If True, the operation will ignore user errors and proceed with the operation on other
            sub-entities of the directory.
            In case of user errors, a continuation token will only be returned when
            continue_on_failure is True. If not set, the default value is False.
        :keyword int timeout:
            Sets the server-side timeout for the operation in seconds. For more details see
            https://learn.microsoft.com/rest/api/storageservices/setting-timeouts-for-blob-service-operations.
            This value is not tracked or validated on the client. To configure client-side network timeouts
            see `here <https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/storage/azure-storage-file-datalake
            #other-client--per-operation-configuration>`_.
        :return: A summary of the recursive operations, including the count of successes and failures,
            as well as a continuation token in case the operation was terminated prematurely.
        :rtype: ~azure.storage.filedatalake.AccessControlChangeResult
        :raises ~azure.core.exceptions.AzureError:
            The user can restart the operation using the continuation_token field of the AzureError,
            if the token is available.
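
        .. admonition:: Example:

            A hedged sketch tracking progress via the ``progress_hook`` keyword;
            the ACL string is a placeholder value.

            .. code-block:: python

                def progress_callback(acl_changes):
                    # acl_changes is an AccessControlChanges instance
                    print(acl_changes.aggregate_counters.files_successful)

                result = path_client.set_access_control_recursive(
                    acl='user::rwx,group::r-x,other::---',
                    progress_hook=progress_callback)
                print(result.counters.failure_count)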
        """
        if not acl:
            raise ValueError("The Access Control List must be set for this operation")

        progress_hook = kwargs.pop('progress_hook', None)
        max_batches = kwargs.pop('max_batches', None)
        options = self._set_access_control_recursive_options(mode='set', acl=acl, **kwargs)
        return self._set_access_control_internal(options=options, progress_hook=progress_hook,
                                                 max_batches=max_batches)

    @distributed_trace
    def update_access_control_recursive(self, acl, **kwargs):
        # type: (str, **Any) -> AccessControlChangeResult
        """
        Modifies the Access Control on a path and sub-paths.

        :param acl:
            Modifies POSIX access control rights on files and directories.
            The value is a comma-separated list of access control entries. Each
            access control entry (ACE) consists of a scope, a type, a user or
            group identifier, and permissions in the format
            "[scope:][type]:[id]:[permissions]".
        :type acl: str
        :keyword func(~azure.storage.filedatalake.AccessControlChanges) progress_hook:
            Callback where the caller can track progress of the operation
            as well as collect paths that failed to change Access Control.
        :keyword str continuation_token:
            Optional continuation token that can be used to resume previously stopped operation.
        :keyword int batch_size:
            Optional. If data set size exceeds batch size then operation will be split into multiple
            requests so that progress can be tracked. Batch size should be between 1 and 2000.
            The default when unspecified is 2000.
        :keyword int max_batches:
            Optional. Defines the maximum number of batches that a single change Access Control
            operation can execute. If the maximum is reached before all sub-paths are processed,
            the continuation token can be used to resume the operation.
            An empty value indicates the number of batches is unbounded and the operation runs to completion.
        :keyword bool continue_on_failure:
            If set to False, the operation will terminate quickly on encountering user errors (4XX).
            If True, the operation will ignore user errors and proceed with the operation on other
            sub-entities of the directory.
            In case of user errors, a continuation token will only be returned when
            continue_on_failure is True. If not set, the default value is False.
        :keyword int timeout:
            Sets the server-side timeout for the operation in seconds. For more details see
            https://learn.microsoft.com/rest/api/storageservices/setting-timeouts-for-blob-service-operations.
            This value is not tracked or validated on the client. To configure client-side network timeouts
            see `here <https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/storage/azure-storage-file-datalake
            #other-client--per-operation-configuration>`_.
        :return: A summary of the recursive operations, including the count of successes and failures,
            as well as a continuation token in case the operation was terminated prematurely.
        :rtype: ~azure.storage.filedatalake.AccessControlChangeResult
        :raises ~azure.core.exceptions.AzureError:
            The user can restart the operation using the continuation_token field of the AzureError,
            if the token is available.
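
        .. admonition:: Example:

            A hedged sketch resuming an interrupted operation with the
            continuation_token carried on the raised AzureError, as described above.

            .. code-block:: python

                from azure.core.exceptions import AzureError

                acl = 'user::rwx,group::r-x,other::---'
                try:
                    result = path_client.update_access_control_recursive(acl=acl)
                except AzureError as error:
                    token = getattr(error, 'continuation_token', None)
                    if token:
                        result = path_client.update_access_control_recursive(
                            acl=acl, continuation_token=token)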
        """
        if not acl:
            raise ValueError("The Access Control List must be set for this operation")

        progress_hook = kwargs.pop('progress_hook', None)
        max_batches = kwargs.pop('max_batches', None)
        options = self._set_access_control_recursive_options(mode='modify', acl=acl, **kwargs)
        return self._set_access_control_internal(options=options, progress_hook=progress_hook,
                                                 max_batches=max_batches)

    @distributed_trace
    def remove_access_control_recursive(self, acl, **kwargs):
        # type: (str, **Any) -> AccessControlChangeResult
        """
        Removes the Access Control on a path and sub-paths.

        :param acl:
            Removes POSIX access control rights on files and directories.
            The value is a comma-separated list of access control entries. Each
            access control entry (ACE) consists of a scope, a type, and a user or
            group identifier in the format "[scope:][type]:[id]".
        :type acl: str
        :keyword func(~azure.storage.filedatalake.AccessControlChanges) progress_hook:
            Callback where the caller can track progress of the operation
            as well as collect paths that failed to change Access Control.
        :keyword str continuation_token:
            Optional continuation token that can be used to resume previously stopped operation.
        :keyword int batch_size:
            Optional. If data set size exceeds batch size then operation will be split into multiple
            requests so that progress can be tracked. Batch size should be between 1 and 2000.
            The default when unspecified is 2000.
        :keyword int max_batches:
            Optional. Defines the maximum number of batches that a single change Access Control
            operation can execute. If the maximum is reached before all sub-paths are processed,
            the continuation token can be used to resume the operation.
            An empty value indicates the number of batches is unbounded and the operation runs to completion.
        :keyword bool continue_on_failure:
            If set to False, the operation will terminate quickly on encountering user errors (4XX).
            If True, the operation will ignore user errors and proceed with the operation on other
            sub-entities of the directory.
            In case of user errors, a continuation token will only be returned when
            continue_on_failure is True. If not set, the default value is False.
        :keyword int timeout:
            Sets the server-side timeout for the operation in seconds. For more details see
            https://learn.microsoft.com/rest/api/storageservices/setting-timeouts-for-blob-service-operations.
            This value is not tracked or validated on the client. To configure client-side network timeouts
            see `here <https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/storage/azure-storage-file-datalake
            #other-client--per-operation-configuration>`_.
        :return: A summary of the recursive operations, including the count of successes and failures,
            as well as a continuation token in case the operation was terminated prematurely.
        :rtype: ~azure.storage.filedatalake.AccessControlChangeResult
        :raises ~azure.core.exceptions.AzureError:
            The user can restart the operation using the continuation_token field of the AzureError,
            if the token is available.
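
        .. admonition:: Example:

            A minimal sketch; note the removal ACL uses the "[scope:][type]:[id]"
            format with no permissions field. The user Object ID is a placeholder.

            .. code-block:: python

                path_client.remove_access_control_recursive(
                    acl='default:user:00000000-0000-0000-0000-000000000000')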
        """
        if not acl:
            raise ValueError("The Access Control List must be set for this operation")

        progress_hook = kwargs.pop('progress_hook', None)
        max_batches = kwargs.pop('max_batches', None)
        options = self._set_access_control_recursive_options(mode='remove', acl=acl, **kwargs)
        return self._set_access_control_internal(options=options, progress_hook=progress_hook,
                                                 max_batches=max_batches)

    def _set_access_control_internal(self, options, progress_hook, max_batches=None):
        try:
            continue_on_failure = options.get('force_flag')
            total_directories_successful = 0
            total_files_success = 0
            total_failure_count = 0
            batch_count = 0
            last_continuation_token = None
            current_continuation_token = None
            continue_operation = True
            while continue_operation:
                headers, resp = self._client.path.set_access_control_recursive(**options)

                # make a running tally so that we can report the final results
                total_directories_successful += resp.directories_successful
                total_files_success += resp.files_successful
                total_failure_count += resp.failure_count
                batch_count += 1
                current_continuation_token = headers['continuation']

                if current_continuation_token is not None:
                    last_continuation_token = current_continuation_token

                if progress_hook is not None:
                    progress_hook(AccessControlChanges(
                        batch_counters=AccessControlChangeCounters(
                            directories_successful=resp.directories_successful,
                            files_successful=resp.files_successful,
                            failure_count=resp.failure_count,
                        ),
                        aggregate_counters=AccessControlChangeCounters(
                            directories_successful=total_directories_successful,
                            files_successful=total_files_success,
                            failure_count=total_failure_count,
                        ),
                        batch_failures=[AccessControlChangeFailure(
                            name=failure.name,
                            is_directory=failure.type == 'DIRECTORY',
                            error_message=failure.error_message) for failure in resp.failed_entries],
                        continuation=last_continuation_token))

                # update the continuation token, if there are more operations that cannot be completed in a single call
                max_batches_satisfied = (max_batches is not None and batch_count == max_batches)
                continue_operation = bool(current_continuation_token) and not max_batches_satisfied
                options['continuation'] = current_continuation_token

            # currently the service stops on any failure, so we should send back the last continuation token
            # for the user to retry the failed updates
            # otherwise we should just return what the service gave us
            return AccessControlChangeResult(counters=AccessControlChangeCounters(
                directories_successful=total_directories_successful,
                files_successful=total_files_success,
                failure_count=total_failure_count),
                continuation=last_continuation_token
                if total_failure_count > 0 and not continue_on_failure else current_continuation_token)
        except HttpResponseError as error:
            error.continuation_token = last_continuation_token
            process_storage_error(error)
        except AzureError as error:
            error.continuation_token = last_continuation_token
            raise error

    def _parse_rename_path(self, new_name: str) -> Tuple[str, str, Optional[str]]:
        new_name = new_name.strip('/')
        new_file_system = new_name.split('/')[0]
        new_path = new_name[len(new_file_system):].strip('/')

        new_sas = None
        sas_split = new_path.split('?')
        # If there is a '?', the last element could be a SAS token
        if len(sas_split) > 1:
            # Check last element for SAS by looking for sv= and sig=
            potential_sas = sas_split[-1]
            if re.search(r'sv=\d{4}-\d{2}-\d{2}', potential_sas) and 'sig=' in potential_sas:
                new_sas = potential_sas
                # Remove SAS from new path
                new_path = new_path[:-(len(new_sas) + 1)]

        if not new_sas:
            if not self._raw_credential and new_file_system != self.file_system_name:
                raise ValueError("please provide the sas token for the new file")
            if not self._raw_credential and new_file_system == self.file_system_name:
                new_sas = self._query_str.strip('?')

        return new_file_system, new_path, new_sas

    def _rename_path_options(self,
                             rename_source,  # type: str
                             content_settings=None,  # type: Optional[ContentSettings]
                             metadata=None,  # type: Optional[Dict[str, str]]
                             **kwargs):
        # type: (...) -> Dict[str, Any]
        if metadata or kwargs.pop('permissions', None) or kwargs.pop('umask', None):
            raise ValueError("metadata, permissions, and umask are not supported for this operation")

        access_conditions = get_access_conditions(kwargs.pop('lease', None))
        source_lease_id = get_lease_id(kwargs.pop('source_lease', None))
        mod_conditions = get_mod_conditions(kwargs)
        source_mod_conditions = get_source_mod_conditions(kwargs)

        path_http_headers = None
        if content_settings:
            path_http_headers = get_path_http_headers(content_settings)

        options = {
            'rename_source': rename_source,
            'path_http_headers': path_http_headers,
            'lease_access_conditions': access_conditions,
            'source_lease_id': source_lease_id,
            'modified_access_conditions': mod_conditions,
            'source_modified_access_conditions': source_mod_conditions,
            'timeout': kwargs.pop('timeout', None),
            'mode': 'legacy',
            'cls': return_response_headers}
        options.update(kwargs)
        return options

    def _rename_path(self, rename_source, **kwargs):
        # type: (str, **Any) -> Dict[str, Any]
        """
        Rename directory or file

        :param rename_source:
            The value must have the following format: "/{filesystem}/{path}".
        :type rename_source: str
        :keyword ~azure.storage.filedatalake.ContentSettings content_settings:
            ContentSettings object used to set path properties.
        :keyword source_lease:
            A lease ID for the source path. If specified,
            the source path must have an active lease and the lease ID must
            match.
        :paramtype source_lease: ~azure.storage.filedatalake.DataLakeLeaseClient or str
        :keyword lease:
            Required if the file/directory has an active lease. Value can be a LeaseClient object
            or the lease ID as a string.
        :paramtype lease: ~azure.storage.filedatalake.DataLakeLeaseClient or str
        :keyword ~datetime.datetime if_modified_since:
            A DateTime value. Azure expects the date value passed in to be UTC.
            If timezone is included, any non-UTC datetimes will be converted to UTC.
            If a date is passed in without timezone info, it is assumed to be UTC.
            Specify this header to perform the operation only
            if the resource has been modified since the specified time.
        :keyword ~datetime.datetime if_unmodified_since:
            A DateTime value. Azure expects the date value passed in to be UTC.
            If timezone is included, any non-UTC datetimes will be converted to UTC.
            If a date is passed in without timezone info, it is assumed to be UTC.
            Specify this header to perform the operation only if
            the resource has not been modified since the specified date/time.
        :keyword str etag:
            An ETag value, or the wildcard character (*). Used to check if the resource has changed,
            and act according to the condition specified by the `match_condition` parameter.
        :keyword ~azure.core.MatchConditions match_condition:
            The match condition to use upon the etag.
        :keyword ~datetime.datetime source_if_modified_since:
            A DateTime value. Azure expects the date value passed in to be UTC.
            If timezone is included, any non-UTC datetimes will be converted to UTC.
            If a date is passed in without timezone info, it is assumed to be UTC.
            Specify this header to perform the operation only
            if the resource has been modified since the specified time.
        :keyword ~datetime.datetime source_if_unmodified_since:
            A DateTime value. Azure expects the date value passed in to be UTC.
            If timezone is included, any non-UTC datetimes will be converted to UTC.
            If a date is passed in without timezone info, it is assumed to be UTC.
            Specify this header to perform the operation only if
            the resource has not been modified since the specified date/time.
        :keyword str source_etag:
            The source ETag value, or the wildcard character (*). Used to check if the resource has changed,
            and act according to the condition specified by the `match_condition` parameter.
        :keyword ~azure.core.MatchConditions source_match_condition:
            The source match condition to use upon the etag.
        :keyword int timeout:
            Sets the server-side timeout for the operation in seconds. For more details see
            https://learn.microsoft.com/rest/api/storageservices/setting-timeouts-for-blob-service-operations.
            This value is not tracked or validated on the client. To configure client-side network timeouts
            see `here <https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/storage/azure-storage-file-datalake
            #other-client--per-operation-configuration>`_.
        :returns: response dict containing information about the renamed path.
        :rtype: dict[str, Any]
        """
        options = self._rename_path_options(
            rename_source,
            **kwargs)
        try:
            return self._client.path.create(**options)
        except HttpResponseError as error:
            process_storage_error(error)

    def _get_path_properties(self, **kwargs):
        # type: (**Any) -> Union[FileProperties, DirectoryProperties]
        """Returns all user-defined metadata, standard HTTP properties, and
        system properties for the file or directory. It does not return the content of the directory or file.

        :keyword lease:
            Required if the directory or file has an active lease. Value can be a DataLakeLeaseClient object
            or the lease ID as a string.
        :paramtype lease: ~azure.storage.filedatalake.DataLakeLeaseClient or str
        :keyword ~datetime.datetime if_modified_since:
            A DateTime value. Azure expects the date value passed in to be UTC.
            If timezone is included, any non-UTC datetimes will be converted to UTC.
            If a date is passed in without timezone info, it is assumed to be UTC.
            Specify this header to perform the operation only
            if the resource has been modified since the specified time.
        :keyword ~datetime.datetime if_unmodified_since:
            A DateTime value. Azure expects the date value passed in to be UTC.
            If timezone is included, any non-UTC datetimes will be converted to UTC.
            If a date is passed in without timezone info, it is assumed to be UTC.
            Specify this header to perform the operation only if
            the resource has not been modified since the specified date/time.
        :keyword str etag:
            An ETag value, or the wildcard character (*). Used to check if the resource has changed,
            and act according to the condition specified by the `match_condition` parameter.
        :keyword ~azure.core.MatchConditions match_condition:
            The match condition to use upon the etag.
        :keyword ~azure.storage.filedatalake.CustomerProvidedEncryptionKey cpk:
            Decrypts the data on the service-side with the given key.
            Use of customer-provided keys must be done over HTTPS.
            Required if the file/directory was created with a customer-provided key.
        :keyword bool upn:
            If True, the user identity values returned in the x-ms-owner, x-ms-group,
            and x-ms-acl response headers will be transformed from Azure Active Directory Object IDs
            to User Principal Names in the owner, group, and acl fields of the respective property
            object returned. If False, the values will be returned as Azure Active Directory Object IDs.
            The default value is False. Note that group and application Object IDs are not translated
            because they do not have unique friendly names.
        :keyword int timeout:
            Sets the server-side timeout for the operation in seconds. For more details see
            https://learn.microsoft.com/rest/api/storageservices/setting-timeouts-for-blob-service-operations.
            This value is not tracked or validated on the client. To configure client-side network timeouts
            see `here <https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/storage/azure-storage-file-datalake
            #other-client--per-operation-configuration>`_.
        :returns:
            Information including user-defined metadata, standard HTTP properties,
            and system properties for the file or directory.
        :rtype: DirectoryProperties or FileProperties

        .. admonition:: Example:

            .. literalinclude:: ../tests/test_blob_samples_common.py
                :start-after: [START get_blob_properties]
                :end-before: [END get_blob_properties]
                :language: python
                :dedent: 8
                :caption: Getting the properties for a file/directory.
        """
        upn = kwargs.pop('upn', None)
        if upn:
            headers = kwargs.pop('headers', {})
            headers['x-ms-upn'] = str(upn)
            kwargs['headers'] = headers
        path_properties = self._blob_client.get_blob_properties(**kwargs)
        return path_properties

    def _exists(self, **kwargs):
        # type: (**Any) -> bool
        """
        Returns True if a path exists and returns False otherwise.

        :keyword int timeout:
            Sets the server-side timeout for the operation in seconds. For more details see
            https://learn.microsoft.com/rest/api/storageservices/setting-timeouts-for-blob-service-operations.
            This value is not tracked or validated on the client. To configure client-side network timeouts
            see `here <https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/storage/azure-storage-file-datalake
            #other-client--per-operation-configuration>`_.
        :returns: True if a path exists, False otherwise.
        :rtype: bool
        """
        return self._blob_client.exists(**kwargs)

    @distributed_trace
    def set_metadata(self, metadata,  # type: Dict[str, str]
                     **kwargs):
        # type: (...) -> Dict[str, Union[str, datetime]]
        """Sets one or more user-defined name-value pairs for the specified
        file system. Each call to this operation replaces all existing metadata
        attached to the file system. To remove all metadata from the file system,
        call this operation with no metadata dict.

        :param metadata:
            A dict containing name-value pairs to associate with the file or directory as
            metadata. Example: {'category':'test'}
        :type metadata: dict[str, str]
        :keyword lease:
            Required if the file/directory has an active lease. Value can be a DataLakeLeaseClient object
            or the lease ID as a string.
        :paramtype lease: ~azure.storage.filedatalake.DataLakeLeaseClient or str
        :keyword ~datetime.datetime if_modified_since:
            A DateTime value. Azure expects the date value passed in to be UTC.
            If timezone is included, any non-UTC datetimes will be converted to UTC.
            If a date is passed in without timezone info, it is assumed to be UTC.
            Specify this header to perform the operation only
            if the resource has been modified since the specified time.
        :keyword ~datetime.datetime if_unmodified_since:
            A DateTime value. Azure expects the date value passed in to be UTC.
            If timezone is included, any non-UTC datetimes will be converted to UTC.
            If a date is passed in without timezone info, it is assumed to be UTC.
            Specify this header to perform the operation only if
            the resource has not been modified since the specified date/time.
        :keyword str etag:
            An ETag value, or the wildcard character (*). Used to check if the resource has changed,
            and act according to the condition specified by the `match_condition` parameter.
        :keyword ~azure.core.MatchConditions match_condition:
            The match condition to use upon the etag.
        :keyword ~azure.storage.filedatalake.CustomerProvidedEncryptionKey cpk:
            Encrypts the data on the service-side with the given key.
            Use of customer-provided keys must be done over HTTPS.
        :keyword int timeout:
            Sets the server-side timeout for the operation in seconds. For more details see
            https://learn.microsoft.com/rest/api/storageservices/setting-timeouts-for-blob-service-operations.
            This value is not tracked or validated on the client. To configure client-side network timeouts
            see `here <https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/storage/azure-storage-file-datalake
            #other-client--per-operation-configuration>`_.
        :returns: file/directory-updated property dict (Etag and last modified).
        :rtype: dict[str, str] or dict[str, ~datetime.datetime]
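
        .. admonition:: Example:

            A minimal usage sketch; the metadata values are placeholders.

            .. code-block:: python

                path_client.set_metadata(metadata={'category': 'test'})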
        """
        return self._blob_client.set_blob_metadata(metadata=metadata, **kwargs)

    @distributed_trace
    def set_http_headers(self, content_settings: Optional["ContentSettings"] = None, **kwargs):
        # type: (...) -> Dict[str, Any]
        """Sets system properties on the file or directory.

        If one property is set in content_settings, all properties will be overridden.

        :param ~azure.storage.filedatalake.ContentSettings content_settings:
            ContentSettings object used to set file/directory properties.
        :keyword lease:
            Required if the file/directory has an active lease. Value can be a DataLakeLeaseClient object
            or the lease ID as a string.
        :paramtype lease: ~azure.storage.filedatalake.DataLakeLeaseClient or str
        :keyword ~datetime.datetime if_modified_since:
            A DateTime value. Azure expects the date value passed in to be UTC.
            If timezone is included, any non-UTC datetimes will be converted to UTC.
            If a date is passed in without timezone info, it is assumed to be UTC.
            Specify this header to perform the operation only
            if the resource has been modified since the specified time.
        :keyword ~datetime.datetime if_unmodified_since:
            A DateTime value. Azure expects the date value passed in to be UTC.
            If timezone is included, any non-UTC datetimes will be converted to UTC.
            If a date is passed in without timezone info, it is assumed to be UTC.
            Specify this header to perform the operation only if
            the resource has not been modified since the specified date/time.
        :keyword str etag:
            An ETag value, or the wildcard character (*). Used to check if the resource has changed,
            and act according to the condition specified by the `match_condition` parameter.
        :keyword ~azure.core.MatchConditions match_condition:
            The match condition to use upon the etag.
        :keyword int timeout:
            Sets the server-side timeout for the operation in seconds. For more details see
            https://learn.microsoft.com/rest/api/storageservices/setting-timeouts-for-blob-service-operations.
            This value is not tracked or validated on the client. To configure client-side network timeouts
            see `here <https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/storage/azure-storage-file-datalake
            #other-client--per-operation-configuration>`_.
        :returns: file/directory-updated property dict (Etag and last modified)
        :rtype: dict[str, Any]
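
        .. admonition:: Example:

            A hedged sketch; since setting one property overrides them all, carry
            over any values that should be preserved from the current properties.

            .. code-block:: python

                from azure.storage.filedatalake import ContentSettings

                path_client.set_http_headers(content_settings=ContentSettings(
                    content_type='text/plain',
                    content_language='en-US'))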
        """
        return self._blob_client.set_http_headers(content_settings=content_settings, **kwargs)

    @distributed_trace
    def acquire_lease(self, lease_duration=-1,  # type: Optional[int]
                      lease_id=None,  # type: Optional[str]
                      **kwargs):
        # type: (...) -> DataLakeLeaseClient
        """
        Requests a new lease. If the file or directory does not have an active lease,
        the DataLake service creates a lease on the file/directory and returns a new
        lease ID.

        :param int lease_duration:
            Specifies the duration of the lease, in seconds, or negative one
            (-1) for a lease that never expires. A non-infinite lease can be
            between 15 and 60 seconds. A lease duration cannot be changed
            using renew or change. Default is -1 (infinite lease).
        :param str lease_id:
            Proposed lease ID, in a GUID string format. The DataLake service returns
            400 (Invalid request) if the proposed lease ID is not in the correct format.
        :keyword ~datetime.datetime if_modified_since:
            A DateTime value. Azure expects the date value passed in to be UTC.
            If timezone is included, any non-UTC datetimes will be converted to UTC.
            If a date is passed in without timezone info, it is assumed to be UTC.
            Specify this header to perform the operation only
            if the resource has been modified since the specified time.
        :keyword ~datetime.datetime if_unmodified_since:
            A DateTime value. Azure expects the date value passed in to be UTC.
            If timezone is included, any non-UTC datetimes will be converted to UTC.
            If a date is passed in without timezone info, it is assumed to be UTC.
            Specify this header to perform the operation only if
            the resource has not been modified since the specified date/time.
        :keyword str etag:
            An ETag value, or the wildcard character (*). Used to check if the resource has changed,
            and act according to the condition specified by the `match_condition` parameter.
        :keyword ~azure.core.MatchConditions match_condition:
            The match condition to use upon the etag.
        :keyword int timeout:
            Sets the server-side timeout for the operation in seconds. For more details see
            https://learn.microsoft.com/rest/api/storageservices/setting-timeouts-for-blob-service-operations.
            This value is not tracked or validated on the client. To configure client-side network timeouts
            see `here <https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/storage/azure-storage-file-datalake
            #other-client--per-operation-configuration>`_.
        :returns: A DataLakeLeaseClient object that can be used in a context manager.
        :rtype: ~azure.storage.filedatalake.DataLakeLeaseClient
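
        .. admonition:: Example:

            A minimal sketch acquiring a finite lease and releasing it through the
            context manager, then passing it to a lease-guarded operation.

            .. code-block:: python

                with path_client.acquire_lease(lease_duration=15) as lease:
                    path_client.set_metadata({'category': 'test'}, lease=lease)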
        """
        lease = DataLakeLeaseClient(self, lease_id=lease_id)  # type: ignore
        lease.acquire(lease_duration=lease_duration, **kwargs)
        return lease