diff options
author | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
---|---|---|
committer | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
commit | 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch) | |
tree | ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/huggingface_hub/lfs.py | |
parent | cc961e04ba734dd72309fb548a2f97d67d578813 (diff) | |
download | gn-ai-master.tar.gz |
Diffstat (limited to '.venv/lib/python3.12/site-packages/huggingface_hub/lfs.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/huggingface_hub/lfs.py | 460 |
1 files changed, 460 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/huggingface_hub/lfs.py b/.venv/lib/python3.12/site-packages/huggingface_hub/lfs.py new file mode 100644 index 00000000..c2d4f368 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/huggingface_hub/lfs.py @@ -0,0 +1,460 @@ +# coding=utf-8 +# Copyright 2019-present, the HuggingFace Inc. team. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Git LFS related type definitions and utilities""" + +import inspect +import io +import re +import warnings +from dataclasses import dataclass +from math import ceil +from os.path import getsize +from pathlib import Path +from typing import TYPE_CHECKING, BinaryIO, Dict, Iterable, List, Optional, Tuple, TypedDict +from urllib.parse import unquote + +from huggingface_hub import constants + +from .utils import ( + build_hf_headers, + fix_hf_endpoint_in_url, + get_session, + hf_raise_for_status, + http_backoff, + logging, + tqdm, + validate_hf_hub_args, +) +from .utils._lfs import SliceFileObj +from .utils.sha import sha256, sha_fileobj +from .utils.tqdm import is_tqdm_disabled + + +if TYPE_CHECKING: + from ._commit_api import CommitOperationAdd + +logger = logging.get_logger(__name__) + +OID_REGEX = re.compile(r"^[0-9a-f]{40}$") + +LFS_MULTIPART_UPLOAD_COMMAND = "lfs-multipart-upload" + +LFS_HEADERS = { + "Accept": "application/vnd.git-lfs+json", + "Content-Type": "application/vnd.git-lfs+json", +} + + +@dataclass +class UploadInfo: + """ + Dataclass holding required information to determine whether a blob + should be uploaded to the hub using the LFS protocol or the regular protocol + + Args: + sha256 (`bytes`): + SHA256 hash of the blob + size (`int`): + Size in bytes of the blob + sample (`bytes`): + First 512 bytes of the blob + """ + + sha256: bytes + size: int + sample: bytes + + @classmethod + def from_path(cls, path: str): + size = getsize(path) + with io.open(path, "rb") as file: + sample = file.peek(512)[:512] + sha = sha_fileobj(file) + return cls(size=size, sha256=sha, sample=sample) + + @classmethod + def from_bytes(cls, data: bytes): + sha = sha256(data).digest() + return cls(size=len(data), sample=data[:512], sha256=sha) + + @classmethod + def from_fileobj(cls, fileobj: BinaryIO): + sample = fileobj.read(512) + fileobj.seek(0, io.SEEK_SET) + sha = sha_fileobj(fileobj) + size = fileobj.tell() + fileobj.seek(0, io.SEEK_SET) + return cls(size=size, sha256=sha, sample=sample) + + +@validate_hf_hub_args +def post_lfs_batch_info( + upload_infos: Iterable[UploadInfo], + token: Optional[str], + repo_type: str, + repo_id: str, + revision: Optional[str] = None, + endpoint: Optional[str] = None, + headers: Optional[Dict[str, str]] = None, +) -> Tuple[List[dict], List[dict]]: + """ + Requests the LFS batch endpoint to retrieve upload instructions + + Learn more: https://github.com/git-lfs/git-lfs/blob/main/docs/api/batch.md + + Args: + upload_infos (`Iterable` of `UploadInfo`): + `UploadInfo` for the files that are being uploaded, typically obtained + from `CommitOperationAdd.upload_info` + repo_type (`str`): + Type of the repo to upload to: `"model"`, `"dataset"` or `"space"`. + repo_id (`str`): + A namespace (user or an organization) and a repo name separated + by a `/`. + revision (`str`, *optional*): + The git revision to upload to. + headers (`dict`, *optional*): + Additional headers to include in the request + + Returns: + `LfsBatchInfo`: 2-tuple: + - First element is the list of upload instructions from the server + - Second element is an list of errors, if any + + Raises: + [`ValueError`](https://docs.python.org/3/library/exceptions.html#ValueError) + If an argument is invalid or the server response is malformed. + [`HTTPError`](https://requests.readthedocs.io/en/latest/api/#requests.HTTPError) + If the server returned an error. + """ + endpoint = endpoint if endpoint is not None else constants.ENDPOINT + url_prefix = "" + if repo_type in constants.REPO_TYPES_URL_PREFIXES: + url_prefix = constants.REPO_TYPES_URL_PREFIXES[repo_type] + batch_url = f"{endpoint}/{url_prefix}{repo_id}.git/info/lfs/objects/batch" + payload: Dict = { + "operation": "upload", + "transfers": ["basic", "multipart"], + "objects": [ + { + "oid": upload.sha256.hex(), + "size": upload.size, + } + for upload in upload_infos + ], + "hash_algo": "sha256", + } + if revision is not None: + payload["ref"] = {"name": unquote(revision)} # revision has been previously 'quoted' + + headers = { + **LFS_HEADERS, + **build_hf_headers(token=token), + **(headers or {}), + } + resp = get_session().post(batch_url, headers=headers, json=payload) + hf_raise_for_status(resp) + batch_info = resp.json() + + objects = batch_info.get("objects", None) + if not isinstance(objects, list): + raise ValueError("Malformed response from server") + + return ( + [_validate_batch_actions(obj) for obj in objects if "error" not in obj], + [_validate_batch_error(obj) for obj in objects if "error" in obj], + ) + + +class PayloadPartT(TypedDict): + partNumber: int + etag: str + + +class CompletionPayloadT(TypedDict): + """Payload that will be sent to the Hub when uploading multi-part.""" + + oid: str + parts: List[PayloadPartT] + + +def lfs_upload( + operation: "CommitOperationAdd", + lfs_batch_action: Dict, + token: Optional[str] = None, + headers: Optional[Dict[str, str]] = None, + endpoint: Optional[str] = None, +) -> None: + """ + Handles uploading a given object to the Hub with the LFS protocol. + + Can be a No-op if the content of the file is already present on the hub large file storage. + + Args: + operation (`CommitOperationAdd`): + The add operation triggering this upload. + lfs_batch_action (`dict`): + Upload instructions from the LFS batch endpoint for this object. See [`~utils.lfs.post_lfs_batch_info`] for + more details. + headers (`dict`, *optional*): + Headers to include in the request, including authentication and user agent headers. + + Raises: + [`ValueError`](https://docs.python.org/3/library/exceptions.html#ValueError) + If `lfs_batch_action` is improperly formatted + [`HTTPError`](https://requests.readthedocs.io/en/latest/api/#requests.HTTPError) + If the upload resulted in an error + """ + # 0. If LFS file is already present, skip upload + _validate_batch_actions(lfs_batch_action) + actions = lfs_batch_action.get("actions") + if actions is None: + # The file was already uploaded + logger.debug(f"Content of file {operation.path_in_repo} is already present upstream - skipping upload") + return + + # 1. Validate server response (check required keys in dict) + upload_action = lfs_batch_action["actions"]["upload"] + _validate_lfs_action(upload_action) + verify_action = lfs_batch_action["actions"].get("verify") + if verify_action is not None: + _validate_lfs_action(verify_action) + + # 2. Upload file (either single part or multi-part) + header = upload_action.get("header", {}) + chunk_size = header.get("chunk_size") + upload_url = fix_hf_endpoint_in_url(upload_action["href"], endpoint=endpoint) + if chunk_size is not None: + try: + chunk_size = int(chunk_size) + except (ValueError, TypeError): + raise ValueError( + f"Malformed response from LFS batch endpoint: `chunk_size` should be an integer. Got '{chunk_size}'." + ) + _upload_multi_part(operation=operation, header=header, chunk_size=chunk_size, upload_url=upload_url) + else: + _upload_single_part(operation=operation, upload_url=upload_url) + + # 3. Verify upload went well + if verify_action is not None: + _validate_lfs_action(verify_action) + verify_url = fix_hf_endpoint_in_url(verify_action["href"], endpoint) + verify_resp = get_session().post( + verify_url, + headers=build_hf_headers(token=token, headers=headers), + json={"oid": operation.upload_info.sha256.hex(), "size": operation.upload_info.size}, + ) + hf_raise_for_status(verify_resp) + logger.debug(f"{operation.path_in_repo}: Upload successful") + + +def _validate_lfs_action(lfs_action: dict): + """validates response from the LFS batch endpoint""" + if not ( + isinstance(lfs_action.get("href"), str) + and (lfs_action.get("header") is None or isinstance(lfs_action.get("header"), dict)) + ): + raise ValueError("lfs_action is improperly formatted") + return lfs_action + + +def _validate_batch_actions(lfs_batch_actions: dict): + """validates response from the LFS batch endpoint""" + if not (isinstance(lfs_batch_actions.get("oid"), str) and isinstance(lfs_batch_actions.get("size"), int)): + raise ValueError("lfs_batch_actions is improperly formatted") + + upload_action = lfs_batch_actions.get("actions", {}).get("upload") + verify_action = lfs_batch_actions.get("actions", {}).get("verify") + if upload_action is not None: + _validate_lfs_action(upload_action) + if verify_action is not None: + _validate_lfs_action(verify_action) + return lfs_batch_actions + + +def _validate_batch_error(lfs_batch_error: dict): + """validates response from the LFS batch endpoint""" + if not (isinstance(lfs_batch_error.get("oid"), str) and isinstance(lfs_batch_error.get("size"), int)): + raise ValueError("lfs_batch_error is improperly formatted") + error_info = lfs_batch_error.get("error") + if not ( + isinstance(error_info, dict) + and isinstance(error_info.get("message"), str) + and isinstance(error_info.get("code"), int) + ): + raise ValueError("lfs_batch_error is improperly formatted") + return lfs_batch_error + + +def _upload_single_part(operation: "CommitOperationAdd", upload_url: str) -> None: + """ + Uploads `fileobj` as a single PUT HTTP request (basic LFS transfer protocol) + + Args: + upload_url (`str`): + The URL to PUT the file to. + fileobj: + The file-like object holding the data to upload. + + Returns: `requests.Response` + + Raises: + [`HTTPError`](https://requests.readthedocs.io/en/latest/api/#requests.HTTPError) + If the upload resulted in an error. + """ + with operation.as_file(with_tqdm=True) as fileobj: + # S3 might raise a transient 500 error -> let's retry if that happens + response = http_backoff("PUT", upload_url, data=fileobj, retry_on_status_codes=(500, 502, 503, 504)) + hf_raise_for_status(response) + + +def _upload_multi_part(operation: "CommitOperationAdd", header: Dict, chunk_size: int, upload_url: str) -> None: + """ + Uploads file using HF multipart LFS transfer protocol. + """ + # 1. Get upload URLs for each part + sorted_parts_urls = _get_sorted_parts_urls(header=header, upload_info=operation.upload_info, chunk_size=chunk_size) + + # 2. Upload parts (either with hf_transfer or in pure Python) + use_hf_transfer = constants.HF_HUB_ENABLE_HF_TRANSFER + if ( + constants.HF_HUB_ENABLE_HF_TRANSFER + and not isinstance(operation.path_or_fileobj, str) + and not isinstance(operation.path_or_fileobj, Path) + ): + warnings.warn( + "hf_transfer is enabled but does not support uploading from bytes or BinaryIO, falling back to regular" + " upload" + ) + use_hf_transfer = False + + response_headers = ( + _upload_parts_hf_transfer(operation=operation, sorted_parts_urls=sorted_parts_urls, chunk_size=chunk_size) + if use_hf_transfer + else _upload_parts_iteratively(operation=operation, sorted_parts_urls=sorted_parts_urls, chunk_size=chunk_size) + ) + + # 3. Send completion request + completion_res = get_session().post( + upload_url, + json=_get_completion_payload(response_headers, operation.upload_info.sha256.hex()), + headers=LFS_HEADERS, + ) + hf_raise_for_status(completion_res) + + +def _get_sorted_parts_urls(header: Dict, upload_info: UploadInfo, chunk_size: int) -> List[str]: + sorted_part_upload_urls = [ + upload_url + for _, upload_url in sorted( + [ + (int(part_num, 10), upload_url) + for part_num, upload_url in header.items() + if part_num.isdigit() and len(part_num) > 0 + ], + key=lambda t: t[0], + ) + ] + num_parts = len(sorted_part_upload_urls) + if num_parts != ceil(upload_info.size / chunk_size): + raise ValueError("Invalid server response to upload large LFS file") + return sorted_part_upload_urls + + +def _get_completion_payload(response_headers: List[Dict], oid: str) -> CompletionPayloadT: + parts: List[PayloadPartT] = [] + for part_number, header in enumerate(response_headers): + etag = header.get("etag") + if etag is None or etag == "": + raise ValueError(f"Invalid etag (`{etag}`) returned for part {part_number + 1}") + parts.append( + { + "partNumber": part_number + 1, + "etag": etag, + } + ) + return {"oid": oid, "parts": parts} + + +def _upload_parts_iteratively( + operation: "CommitOperationAdd", sorted_parts_urls: List[str], chunk_size: int +) -> List[Dict]: + headers = [] + with operation.as_file(with_tqdm=True) as fileobj: + for part_idx, part_upload_url in enumerate(sorted_parts_urls): + with SliceFileObj( + fileobj, + seek_from=chunk_size * part_idx, + read_limit=chunk_size, + ) as fileobj_slice: + # S3 might raise a transient 500 error -> let's retry if that happens + part_upload_res = http_backoff( + "PUT", part_upload_url, data=fileobj_slice, retry_on_status_codes=(500, 502, 503, 504) + ) + hf_raise_for_status(part_upload_res) + headers.append(part_upload_res.headers) + return headers # type: ignore + + +def _upload_parts_hf_transfer( + operation: "CommitOperationAdd", sorted_parts_urls: List[str], chunk_size: int +) -> List[Dict]: + # Upload file using an external Rust-based package. Upload is faster but support less features (no progress bars). + try: + from hf_transfer import multipart_upload + except ImportError: + raise ValueError( + "Fast uploading using 'hf_transfer' is enabled (HF_HUB_ENABLE_HF_TRANSFER=1) but 'hf_transfer' package is" + " not available in your environment. Try `pip install hf_transfer`." + ) + + supports_callback = "callback" in inspect.signature(multipart_upload).parameters + if not supports_callback: + warnings.warn( + "You are using an outdated version of `hf_transfer`. Consider upgrading to latest version to enable progress bars using `pip install -U hf_transfer`." + ) + + total = operation.upload_info.size + desc = operation.path_in_repo + if len(desc) > 40: + desc = f"(…){desc[-40:]}" + + with tqdm( + unit="B", + unit_scale=True, + total=total, + initial=0, + desc=desc, + disable=is_tqdm_disabled(logger.getEffectiveLevel()), + name="huggingface_hub.lfs_upload", + ) as progress: + try: + output = multipart_upload( + file_path=operation.path_or_fileobj, + parts_urls=sorted_parts_urls, + chunk_size=chunk_size, + max_files=128, + parallel_failures=127, # could be removed + max_retries=5, + **({"callback": progress.update} if supports_callback else {}), + ) + except Exception as e: + raise RuntimeError( + "An error occurred while uploading using `hf_transfer`. Consider disabling HF_HUB_ENABLE_HF_TRANSFER for" + " better error handling." + ) from e + if not supports_callback: + progress.update(total) + return output |