Diffstat (limited to '.venv/lib/python3.12/site-packages/fsspec/implementations/dbfs.py')
-rw-r--r--  .venv/lib/python3.12/site-packages/fsspec/implementations/dbfs.py  467
1 file changed, 467 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/fsspec/implementations/dbfs.py b/.venv/lib/python3.12/site-packages/fsspec/implementations/dbfs.py
new file mode 100644
index 00000000..30c2947b
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/fsspec/implementations/dbfs.py
@@ -0,0 +1,467 @@
+import base64
+import urllib
+
+import requests
+import requests.exceptions
+from requests.adapters import HTTPAdapter, Retry
+
+from fsspec import AbstractFileSystem
+from fsspec.spec import AbstractBufferedFile
+
+
+class DatabricksException(Exception):
+ """
+ Helper class for exceptions raised in this module.
+ """
+
+ def __init__(self, error_code, message):
+ """Create a new DatabricksException"""
+ super().__init__(message)
+
+ self.error_code = error_code
+ self.message = message
+
+
+class DatabricksFileSystem(AbstractFileSystem):
+ """
+ Get access to the Databricks filesystem implementation over HTTP.
+    Can be used inside and outside of a Databricks cluster.
+ """
+
+ def __init__(self, instance, token, **kwargs):
+ """
+ Create a new DatabricksFileSystem.
+
+ Parameters
+ ----------
+        instance: str
+            The instance URL of the Databricks cluster.
+            For example, for an Azure Databricks cluster this
+            has the form adb-<some-number>.<two digits>.azuredatabricks.net.
+ token: str
+ Your personal token. Find out more
+ here: https://docs.databricks.com/dev-tools/api/latest/authentication.html
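+
+        Examples
+        --------
+        A minimal, illustrative sketch; the workspace URL and token below
+        are placeholders, not working credentials.
+
+        >>> fs = DatabricksFileSystem(  # doctest: +SKIP
+        ...     instance="adb-1234567890123456.7.azuredatabricks.net",
+        ...     token="<personal-access-token>",
+        ... )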
+ """
+ self.instance = instance
+ self.token = token
+ self.session = requests.Session()
+ self.retries = Retry(
+ total=10,
+ backoff_factor=0.05,
+ status_forcelist=[408, 429, 500, 502, 503, 504],
+ )
+
+ self.session.mount("https://", HTTPAdapter(max_retries=self.retries))
+ self.session.headers.update({"Authorization": f"Bearer {self.token}"})
+
+ super().__init__(**kwargs)
+
+ def ls(self, path, detail=True, **kwargs):
+ """
+ List the contents of the given path.
+
+ Parameters
+ ----------
+ path: str
+ Absolute path
+ detail: bool
+ Return not only the list of filenames,
+ but also additional information on file sizes
+ and types.
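+
+        Examples
+        --------
+        Illustrative only; the paths and sizes below are made up.
+
+        >>> fs.ls("/mnt/data", detail=False)  # doctest: +SKIP
+        ['/mnt/data/part-0.parquet', '/mnt/data/part-1.parquet']
+        >>> fs.ls("/mnt/data")[0]  # doctest: +SKIP
+        {'name': '/mnt/data/part-0.parquet', 'type': 'file', 'size': 1048576}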
+ """
+ out = self._ls_from_cache(path)
+ if not out:
+ try:
+ r = self._send_to_api(
+ method="get", endpoint="list", json={"path": path}
+ )
+ except DatabricksException as e:
+ if e.error_code == "RESOURCE_DOES_NOT_EXIST":
+ raise FileNotFoundError(e.message) from e
+
+ raise
+ files = r["files"]
+ out = [
+ {
+ "name": o["path"],
+ "type": "directory" if o["is_dir"] else "file",
+ "size": o["file_size"],
+ }
+ for o in files
+ ]
+ self.dircache[path] = out
+
+ if detail:
+ return out
+ return [o["name"] for o in out]
+
+ def makedirs(self, path, exist_ok=True):
+ """
+ Create a given absolute path and all of its parents.
+
+ Parameters
+ ----------
+ path: str
+ Absolute path to create
+        exist_ok: bool
+            If False, check whether the folder exists before
+            creating it and raise a FileExistsError if it
+            already does.
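+
+        Examples
+        --------
+        Illustrative; the path is a placeholder.
+
+        >>> fs.makedirs("/mnt/data/raw/2024", exist_ok=True)  # doctest: +SKIP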
+ """
+ if not exist_ok:
+ try:
+ # If the following succeeds, the path is already present
+ self._send_to_api(
+ method="get", endpoint="get-status", json={"path": path}
+ )
+ raise FileExistsError(f"Path {path} already exists")
+ except DatabricksException as e:
+ if e.error_code == "RESOURCE_DOES_NOT_EXIST":
+ pass
+
+ try:
+ self._send_to_api(method="post", endpoint="mkdirs", json={"path": path})
+ except DatabricksException as e:
+ if e.error_code == "RESOURCE_ALREADY_EXISTS":
+ raise FileExistsError(e.message) from e
+
+ raise
+ self.invalidate_cache(self._parent(path))
+
+ def mkdir(self, path, create_parents=True, **kwargs):
+ """
+ Create a given absolute path and all of its parents.
+
+ Parameters
+ ----------
+ path: str
+ Absolute path to create
+        create_parents: bool
+            Whether to create all parents or not.
+            ``create_parents=False`` is not implemented so far.
+ """
+ if not create_parents:
+ raise NotImplementedError
+
+ self.mkdirs(path, **kwargs)
+
+ def rm(self, path, recursive=False, **kwargs):
+ """
+ Remove the file or folder at the given absolute path.
+
+ Parameters
+ ----------
+ path: str
+            Absolute path to remove
+ recursive: bool
+ Recursively delete all files in a folder.
+ """
+ try:
+ self._send_to_api(
+ method="post",
+ endpoint="delete",
+ json={"path": path, "recursive": recursive},
+ )
+ except DatabricksException as e:
+ # This is not really an exception, it just means
+ # not everything was deleted so far
+ if e.error_code == "PARTIAL_DELETE":
+ self.rm(path=path, recursive=recursive)
+ elif e.error_code == "IO_ERROR":
+ # Using the same exception as the os module would use here
+ raise OSError(e.message) from e
+
+ raise
+ self.invalidate_cache(self._parent(path))
+
+ def mv(
+ self, source_path, destination_path, recursive=False, maxdepth=None, **kwargs
+ ):
+ """
+ Move a source to a destination path.
+
+ A note from the original [databricks API manual]
+ (https://docs.databricks.com/dev-tools/api/latest/dbfs.html#move).
+
+ When moving a large number of files the API call will time out after
+ approximately 60s, potentially resulting in partially moved data.
+ Therefore, for operations that move more than 10k files, we strongly
+ discourage using the DBFS REST API.
+
+ Parameters
+ ----------
+ source_path: str
+ From where to move (absolute path)
+ destination_path: str
+ To where to move (absolute path)
+        recursive: bool
+            Not implemented so far.
+        maxdepth:
+            Not implemented so far.
+ """
+ if recursive:
+ raise NotImplementedError
+ if maxdepth:
+ raise NotImplementedError
+
+ try:
+ self._send_to_api(
+ method="post",
+ endpoint="move",
+ json={"source_path": source_path, "destination_path": destination_path},
+ )
+ except DatabricksException as e:
+ if e.error_code == "RESOURCE_DOES_NOT_EXIST":
+ raise FileNotFoundError(e.message) from e
+ elif e.error_code == "RESOURCE_ALREADY_EXISTS":
+ raise FileExistsError(e.message) from e
+
+ raise
+ self.invalidate_cache(self._parent(source_path))
+ self.invalidate_cache(self._parent(destination_path))
+
+ def _open(self, path, mode="rb", block_size="default", **kwargs):
+ """
+        Override the base class method to make sure a DatabricksFile is created.
+ All arguments are copied from the base method.
+
+ Only the default blocksize is allowed.
+ """
+ return DatabricksFile(self, path, mode=mode, block_size=block_size, **kwargs)
+
+ def _send_to_api(self, method, endpoint, json):
+ """
+ Send the given json to the DBFS API
+ using a get or post request (specified by the argument `method`).
+
+ Parameters
+ ----------
+ method: str
+            Which HTTP method to use for communication; "get" or "post".
+ endpoint: str
+ Where to send the request to (last part of the API URL)
+ json: dict
+ Dictionary of information to send
+ """
+ if method == "post":
+ session_call = self.session.post
+ elif method == "get":
+ session_call = self.session.get
+ else:
+ raise ValueError(f"Do not understand method {method}")
+
+ url = urllib.parse.urljoin(f"https://{self.instance}/api/2.0/dbfs/", endpoint)
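+        # For example, endpoint="list" resolves to
+        # "https://<instance>/api/2.0/dbfs/list", with <instance> as passed
+        # to __init__.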
+
+ r = session_call(url, json=json)
+
+        # The DBFS API returns JSON, also in the case of an exception.
+        # We want to preserve this information as well as possible.
+ try:
+ r.raise_for_status()
+ except requests.HTTPError as e:
+ # try to extract json error message
+ # if that fails, fall back to the original exception
+ try:
+ exception_json = e.response.json()
+ except Exception:
+ raise e from None
+
+ raise DatabricksException(**exception_json) from e
+
+ return r.json()
+
+ def _create_handle(self, path, overwrite=True):
+ """
+ Internal function to create a handle, which can be used to
+ write blocks of a file to DBFS.
+        A handle has a unique identifier which needs to be passed
+        with every write during this transaction.
+        The handle is active for 10 minutes; after that a new
+        write transaction needs to be created.
+ Make sure to close the handle after you are finished.
+
+ Parameters
+ ----------
+ path: str
+ Absolute path for this file.
+ overwrite: bool
+            If a file already exists at this location, either overwrite
+ it or raise an exception.
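+
+        Examples
+        --------
+        Rough sketch of the write-handle protocol used internally by
+        :class:`DatabricksFile`; the path is a placeholder.
+
+        >>> handle = fs._create_handle("/tmp/example.bin")  # doctest: +SKIP
+        >>> fs._add_data(handle, b"some bytes")  # doctest: +SKIP
+        >>> fs._close_handle(handle)  # doctest: +SKIP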
+ """
+ try:
+ r = self._send_to_api(
+ method="post",
+ endpoint="create",
+ json={"path": path, "overwrite": overwrite},
+ )
+ return r["handle"]
+ except DatabricksException as e:
+ if e.error_code == "RESOURCE_ALREADY_EXISTS":
+ raise FileExistsError(e.message) from e
+
+ raise
+
+ def _close_handle(self, handle):
+ """
+ Close a handle, which was opened by :func:`_create_handle`.
+
+ Parameters
+ ----------
+ handle: str
+ Which handle to close.
+ """
+ try:
+ self._send_to_api(method="post", endpoint="close", json={"handle": handle})
+ except DatabricksException as e:
+ if e.error_code == "RESOURCE_DOES_NOT_EXIST":
+ raise FileNotFoundError(e.message) from e
+
+ raise
+
+ def _add_data(self, handle, data):
+ """
+ Upload data to an already opened file handle
+ (opened by :func:`_create_handle`).
+        The maximum allowed data size is 1 MB after
+        conversion to base64.
+ Remember to close the handle when you are finished.
+
+ Parameters
+ ----------
+ handle: str
+ Which handle to upload data to.
+ data: bytes
+ Block of data to add to the handle.
+ """
+ data = base64.b64encode(data).decode()
+ try:
+ self._send_to_api(
+ method="post",
+ endpoint="add-block",
+ json={"handle": handle, "data": data},
+ )
+ except DatabricksException as e:
+ if e.error_code == "RESOURCE_DOES_NOT_EXIST":
+ raise FileNotFoundError(e.message) from e
+ elif e.error_code == "MAX_BLOCK_SIZE_EXCEEDED":
+ raise ValueError(e.message) from e
+
+ raise
+
+ def _get_data(self, path, start, end):
+ """
+        Download data in bytes from a given absolute path in a block
+        from [start, end).
+        The maximum number of bytes allowed to be read is 1 MB.
+
+ Parameters
+ ----------
+ path: str
+ Absolute path to download data from
+ start: int
+ Start position of the block
+ end: int
+ End position of the block
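+
+        Examples
+        --------
+        Illustrative; reads the first kilobyte of a made-up file.
+
+        >>> fs._get_data("/tmp/example.bin", start=0, end=1024)  # doctest: +SKIP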
+ """
+ try:
+ r = self._send_to_api(
+ method="get",
+ endpoint="read",
+ json={"path": path, "offset": start, "length": end - start},
+ )
+ return base64.b64decode(r["data"])
+ except DatabricksException as e:
+ if e.error_code == "RESOURCE_DOES_NOT_EXIST":
+ raise FileNotFoundError(e.message) from e
+ elif e.error_code in ["INVALID_PARAMETER_VALUE", "MAX_READ_SIZE_EXCEEDED"]:
+ raise ValueError(e.message) from e
+
+ raise
+
+ def invalidate_cache(self, path=None):
+ if path is None:
+ self.dircache.clear()
+ else:
+ self.dircache.pop(path, None)
+ super().invalidate_cache(path)
+
+
+class DatabricksFile(AbstractBufferedFile):
+ """
+ Helper class for files referenced in the DatabricksFileSystem.
+ """
+
+ DEFAULT_BLOCK_SIZE = 1 * 2**20 # only allowed block size
+
+ def __init__(
+ self,
+ fs,
+ path,
+ mode="rb",
+ block_size="default",
+ autocommit=True,
+ cache_type="readahead",
+ cache_options=None,
+ **kwargs,
+ ):
+ """
+ Create a new instance of the DatabricksFile.
+
+ The blocksize needs to be the default one.
+ """
+ if block_size is None or block_size == "default":
+ block_size = self.DEFAULT_BLOCK_SIZE
+
+ assert (
+ block_size == self.DEFAULT_BLOCK_SIZE
+ ), f"Only the default block size is allowed, not {block_size}"
+
+ super().__init__(
+ fs,
+ path,
+ mode=mode,
+ block_size=block_size,
+ autocommit=autocommit,
+ cache_type=cache_type,
+ cache_options=cache_options or {},
+ **kwargs,
+ )
+
+ def _initiate_upload(self):
+ """Internal function to start a file upload"""
+ self.handle = self.fs._create_handle(self.path)
+
+ def _upload_chunk(self, final=False):
+ """Internal function to add a chunk of data to a started upload"""
+ self.buffer.seek(0)
+ data = self.buffer.getvalue()
+
+ data_chunks = [
+ data[start:end] for start, end in self._to_sized_blocks(len(data))
+ ]
+
+ for data_chunk in data_chunks:
+ self.fs._add_data(handle=self.handle, data=data_chunk)
+
+ if final:
+ self.fs._close_handle(handle=self.handle)
+ return True
+
+ def _fetch_range(self, start, end):
+ """Internal function to download a block of data"""
+ return_buffer = b""
+ length = end - start
+ for chunk_start, chunk_end in self._to_sized_blocks(length, start):
+ return_buffer += self.fs._get_data(
+ path=self.path, start=chunk_start, end=chunk_end
+ )
+
+ return return_buffer
+
+ def _to_sized_blocks(self, length, start=0):
+ """Helper function to split a range from 0 to total_length into bloksizes"""
+ end = start + length
+ for data_chunk in range(start, end, self.blocksize):
+ data_start = data_chunk
+ data_end = min(end, data_chunk + self.blocksize)
+ yield data_start, data_end
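+
+
+# Example usage (illustrative sketch; the instance, token, and path below are
+# placeholders, not real values):
+#
+#   fs = DatabricksFileSystem(
+#       instance="adb-1234567890123456.7.azuredatabricks.net",
+#       token="<personal-access-token>",
+#   )
+#   with fs.open("/mnt/data/example.bin", "wb") as f:
+#       f.write(b"hello dbfs")
+#   with fs.open("/mnt/data/example.bin", "rb") as f:
+#       data = f.read()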