about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/huggingface_hub/_commit_scheduler.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/huggingface_hub/_commit_scheduler.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-4a52a71956a8d46fcb7294ac71734504bb09bcc2.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/huggingface_hub/_commit_scheduler.py')
-rw-r--r--.venv/lib/python3.12/site-packages/huggingface_hub/_commit_scheduler.py353
1 files changed, 353 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/huggingface_hub/_commit_scheduler.py b/.venv/lib/python3.12/site-packages/huggingface_hub/_commit_scheduler.py
new file mode 100644
index 00000000..f1f20339
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/huggingface_hub/_commit_scheduler.py
@@ -0,0 +1,353 @@
+import atexit
+import logging
+import os
+import time
+from concurrent.futures import Future
+from dataclasses import dataclass
+from io import SEEK_END, SEEK_SET, BytesIO
+from pathlib import Path
+from threading import Lock, Thread
+from typing import Dict, List, Optional, Union
+
+from .hf_api import DEFAULT_IGNORE_PATTERNS, CommitInfo, CommitOperationAdd, HfApi
+from .utils import filter_repo_objects
+
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass(frozen=True)
+class _FileToUpload:
+    """Temporary dataclass to store info about files to upload. Not meant to be used directly."""
+
+    local_path: Path
+    path_in_repo: str
+    size_limit: int
+    last_modified: float
+
+
+class CommitScheduler:
+    """
+    Scheduler to upload a local folder to the Hub at regular intervals (e.g. push to hub every 5 minutes).
+
+    The recommended way to use the scheduler is to use it as a context manager. This ensures that the scheduler is
+    properly stopped and the last commit is triggered when the script ends. The scheduler can also be stopped manually
+    with the `stop` method. Checkout the [upload guide](https://huggingface.co/docs/huggingface_hub/guides/upload#scheduled-uploads)
+    to learn more about how to use it.
+
+    Args:
+        repo_id (`str`):
+            The id of the repo to commit to.
+        folder_path (`str` or `Path`):
+            Path to the local folder to upload regularly.
+        every (`int` or `float`, *optional*):
+            The number of minutes between each commit. Defaults to 5 minutes.
+        path_in_repo (`str`, *optional*):
+            Relative path of the directory in the repo, for example: `"checkpoints/"`. Defaults to the root folder
+            of the repository.
+        repo_type (`str`, *optional*):
+            The type of the repo to commit to. Defaults to `model`.
+        revision (`str`, *optional*):
+            The revision of the repo to commit to. Defaults to `main`.
+        private (`bool`, *optional*):
+            Whether to make the repo private. If `None` (default), the repo will be public unless the organization's default is private. This value is ignored if the repo already exists.
+        token (`str`, *optional*):
+            The token to use to commit to the repo. Defaults to the token saved on the machine.
+        allow_patterns (`List[str]` or `str`, *optional*):
+            If provided, only files matching at least one pattern are uploaded.
+        ignore_patterns (`List[str]` or `str`, *optional*):
+            If provided, files matching any of the patterns are not uploaded.
+        squash_history (`bool`, *optional*):
+            Whether to squash the history of the repo after each commit. Defaults to `False`. Squashing commits is
+            useful to avoid degraded performances on the repo when it grows too large.
+        hf_api (`HfApi`, *optional*):
+            The [`HfApi`] client to use to commit to the Hub. Can be set with custom settings (user agent, token,...).
+
+    Example:
+    ```py
+    >>> from pathlib import Path
+    >>> from huggingface_hub import CommitScheduler
+
+    # Scheduler uploads every 10 minutes
+    >>> csv_path = Path("watched_folder/data.csv")
+    >>> CommitScheduler(repo_id="test_scheduler", repo_type="dataset", folder_path=csv_path.parent, every=10)
+
+    >>> with csv_path.open("a") as f:
+    ...     f.write("first line")
+
+    # Some time later (...)
+    >>> with csv_path.open("a") as f:
+    ...     f.write("second line")
+    ```
+
+    Example using a context manager:
+    ```py
+    >>> from pathlib import Path
+    >>> from huggingface_hub import CommitScheduler
+
+    >>> with CommitScheduler(repo_id="test_scheduler", repo_type="dataset", folder_path="watched_folder", every=10) as scheduler:
+    ...     csv_path = Path("watched_folder/data.csv")
+    ...     with csv_path.open("a") as f:
+    ...         f.write("first line")
+    ...     (...)
+    ...     with csv_path.open("a") as f:
+    ...         f.write("second line")
+
+    # Scheduler is now stopped and last commit have been triggered
+    ```
+    """
+
+    def __init__(
+        self,
+        *,
+        repo_id: str,
+        folder_path: Union[str, Path],
+        every: Union[int, float] = 5,
+        path_in_repo: Optional[str] = None,
+        repo_type: Optional[str] = None,
+        revision: Optional[str] = None,
+        private: Optional[bool] = None,
+        token: Optional[str] = None,
+        allow_patterns: Optional[Union[List[str], str]] = None,
+        ignore_patterns: Optional[Union[List[str], str]] = None,
+        squash_history: bool = False,
+        hf_api: Optional["HfApi"] = None,
+    ) -> None:
+        self.api = hf_api or HfApi(token=token)
+
+        # Folder
+        self.folder_path = Path(folder_path).expanduser().resolve()
+        self.path_in_repo = path_in_repo or ""
+        self.allow_patterns = allow_patterns
+
+        if ignore_patterns is None:
+            ignore_patterns = []
+        elif isinstance(ignore_patterns, str):
+            ignore_patterns = [ignore_patterns]
+        self.ignore_patterns = ignore_patterns + DEFAULT_IGNORE_PATTERNS
+
+        if self.folder_path.is_file():
+            raise ValueError(f"'folder_path' must be a directory, not a file: '{self.folder_path}'.")
+        self.folder_path.mkdir(parents=True, exist_ok=True)
+
+        # Repository
+        repo_url = self.api.create_repo(repo_id=repo_id, private=private, repo_type=repo_type, exist_ok=True)
+        self.repo_id = repo_url.repo_id
+        self.repo_type = repo_type
+        self.revision = revision
+        self.token = token
+
+        # Keep track of already uploaded files
+        self.last_uploaded: Dict[Path, float] = {}  # key is local path, value is timestamp
+
+        # Scheduler
+        if not every > 0:
+            raise ValueError(f"'every' must be a positive integer, not '{every}'.")
+        self.lock = Lock()
+        self.every = every
+        self.squash_history = squash_history
+
+        logger.info(f"Scheduled job to push '{self.folder_path}' to '{self.repo_id}' every {self.every} minutes.")
+        self._scheduler_thread = Thread(target=self._run_scheduler, daemon=True)
+        self._scheduler_thread.start()
+        atexit.register(self._push_to_hub)
+
+        self.__stopped = False
+
+    def stop(self) -> None:
+        """Stop the scheduler.
+
+        A stopped scheduler cannot be restarted. Mostly for tests purposes.
+        """
+        self.__stopped = True
+
+    def __enter__(self) -> "CommitScheduler":
+        return self
+
+    def __exit__(self, exc_type, exc_value, traceback) -> None:
+        # Upload last changes before exiting
+        self.trigger().result()
+        self.stop()
+        return
+
+    def _run_scheduler(self) -> None:
+        """Dumb thread waiting between each scheduled push to Hub."""
+        while True:
+            self.last_future = self.trigger()
+            time.sleep(self.every * 60)
+            if self.__stopped:
+                break
+
+    def trigger(self) -> Future:
+        """Trigger a `push_to_hub` and return a future.
+
+        This method is automatically called every `every` minutes. You can also call it manually to trigger a commit
+        immediately, without waiting for the next scheduled commit.
+        """
+        return self.api.run_as_future(self._push_to_hub)
+
+    def _push_to_hub(self) -> Optional[CommitInfo]:
+        if self.__stopped:  # If stopped, already scheduled commits are ignored
+            return None
+
+        logger.info("(Background) scheduled commit triggered.")
+        try:
+            value = self.push_to_hub()
+            if self.squash_history:
+                logger.info("(Background) squashing repo history.")
+                self.api.super_squash_history(repo_id=self.repo_id, repo_type=self.repo_type, branch=self.revision)
+            return value
+        except Exception as e:
+            logger.error(f"Error while pushing to Hub: {e}")  # Depending on the setup, error might be silenced
+            raise
+
+    def push_to_hub(self) -> Optional[CommitInfo]:
+        """
+        Push folder to the Hub and return the commit info.
+
+        <Tip warning={true}>
+
+        This method is not meant to be called directly. It is run in the background by the scheduler, respecting a
+        queue mechanism to avoid concurrent commits. Making a direct call to the method might lead to concurrency
+        issues.
+
+        </Tip>
+
+        The default behavior of `push_to_hub` is to assume an append-only folder. It lists all files in the folder and
+        uploads only changed files. If no changes are found, the method returns without committing anything. If you want
+        to change this behavior, you can inherit from [`CommitScheduler`] and override this method. This can be useful
+        for example to compress data together in a single file before committing. For more details and examples, check
+        out our [integration guide](https://huggingface.co/docs/huggingface_hub/main/en/guides/upload#scheduled-uploads).
+        """
+        # Check files to upload (with lock)
+        with self.lock:
+            logger.debug("Listing files to upload for scheduled commit.")
+
+            # List files from folder (taken from `_prepare_upload_folder_additions`)
+            relpath_to_abspath = {
+                path.relative_to(self.folder_path).as_posix(): path
+                for path in sorted(self.folder_path.glob("**/*"))  # sorted to be deterministic
+                if path.is_file()
+            }
+            prefix = f"{self.path_in_repo.strip('/')}/" if self.path_in_repo else ""
+
+            # Filter with pattern + filter out unchanged files + retrieve current file size
+            files_to_upload: List[_FileToUpload] = []
+            for relpath in filter_repo_objects(
+                relpath_to_abspath.keys(), allow_patterns=self.allow_patterns, ignore_patterns=self.ignore_patterns
+            ):
+                local_path = relpath_to_abspath[relpath]
+                stat = local_path.stat()
+                if self.last_uploaded.get(local_path) is None or self.last_uploaded[local_path] != stat.st_mtime:
+                    files_to_upload.append(
+                        _FileToUpload(
+                            local_path=local_path,
+                            path_in_repo=prefix + relpath,
+                            size_limit=stat.st_size,
+                            last_modified=stat.st_mtime,
+                        )
+                    )
+
+        # Return if nothing to upload
+        if len(files_to_upload) == 0:
+            logger.debug("Dropping schedule commit: no changed file to upload.")
+            return None
+
+        # Convert `_FileToUpload` as `CommitOperationAdd` (=> compute file shas + limit to file size)
+        logger.debug("Removing unchanged files since previous scheduled commit.")
+        add_operations = [
+            CommitOperationAdd(
+                # Cap the file to its current size, even if the user append data to it while a scheduled commit is happening
+                path_or_fileobj=PartialFileIO(file_to_upload.local_path, size_limit=file_to_upload.size_limit),
+                path_in_repo=file_to_upload.path_in_repo,
+            )
+            for file_to_upload in files_to_upload
+        ]
+
+        # Upload files (append mode expected - no need for lock)
+        logger.debug("Uploading files for scheduled commit.")
+        commit_info = self.api.create_commit(
+            repo_id=self.repo_id,
+            repo_type=self.repo_type,
+            operations=add_operations,
+            commit_message="Scheduled Commit",
+            revision=self.revision,
+        )
+
+        # Successful commit: keep track of the latest "last_modified" for each file
+        for file in files_to_upload:
+            self.last_uploaded[file.local_path] = file.last_modified
+        return commit_info
+
+
+class PartialFileIO(BytesIO):
+    """A file-like object that reads only the first part of a file.
+
+    Useful to upload a file to the Hub when the user might still be appending data to it. Only the first part of the
+    file is uploaded (i.e. the part that was available when the filesystem was first scanned).
+
+    In practice, only used internally by the CommitScheduler to regularly push a folder to the Hub with minimal
+    disturbance for the user. The object is passed to `CommitOperationAdd`.
+
+    Only supports `read`, `tell` and `seek` methods.
+
+    Args:
+        file_path (`str` or `Path`):
+            Path to the file to read.
+        size_limit (`int`):
+            The maximum number of bytes to read from the file. If the file is larger than this, only the first part
+            will be read (and uploaded).
+    """
+
+    def __init__(self, file_path: Union[str, Path], size_limit: int) -> None:
+        self._file_path = Path(file_path)
+        self._file = self._file_path.open("rb")
+        self._size_limit = min(size_limit, os.fstat(self._file.fileno()).st_size)
+
+    def __del__(self) -> None:
+        self._file.close()
+        return super().__del__()
+
+    def __repr__(self) -> str:
+        return f"<PartialFileIO file_path={self._file_path} size_limit={self._size_limit}>"
+
+    def __len__(self) -> int:
+        return self._size_limit
+
+    def __getattribute__(self, name: str):
+        if name.startswith("_") or name in ("read", "tell", "seek"):  # only 3 public methods supported
+            return super().__getattribute__(name)
+        raise NotImplementedError(f"PartialFileIO does not support '{name}'.")
+
+    def tell(self) -> int:
+        """Return the current file position."""
+        return self._file.tell()
+
+    def seek(self, __offset: int, __whence: int = SEEK_SET) -> int:
+        """Change the stream position to the given offset.
+
+        Behavior is the same as a regular file, except that the position is capped to the size limit.
+        """
+        if __whence == SEEK_END:
+            # SEEK_END => set from the truncated end
+            __offset = len(self) + __offset
+            __whence = SEEK_SET
+
+        pos = self._file.seek(__offset, __whence)
+        if pos > self._size_limit:
+            return self._file.seek(self._size_limit)
+        return pos
+
+    def read(self, __size: Optional[int] = -1) -> bytes:
+        """Read at most `__size` bytes from the file.
+
+        Behavior is the same as a regular file, except that it is capped to the size limit.
+        """
+        current = self._file.tell()
+        if __size is None or __size < 0:
+            # Read until file limit
+            truncated_size = self._size_limit - current
+        else:
+            # Read until file limit or __size
+            truncated_size = min(__size, self._size_limit - current)
+        return self._file.read(truncated_size)