diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/uvicorn/supervisors/watchgodreload.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/uvicorn/supervisors/watchgodreload.py | 163 |
1 files changed, 163 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/uvicorn/supervisors/watchgodreload.py b/.venv/lib/python3.12/site-packages/uvicorn/supervisors/watchgodreload.py new file mode 100644 index 00000000..987909fd --- /dev/null +++ b/.venv/lib/python3.12/site-packages/uvicorn/supervisors/watchgodreload.py @@ -0,0 +1,163 @@ +from __future__ import annotations + +import logging +import warnings +from pathlib import Path +from socket import socket +from typing import TYPE_CHECKING, Callable + +from watchgod import DefaultWatcher + +from uvicorn.config import Config +from uvicorn.supervisors.basereload import BaseReload + +if TYPE_CHECKING: + import os + + DirEntry = os.DirEntry[str] + +logger = logging.getLogger("uvicorn.error") + + +class CustomWatcher(DefaultWatcher): + def __init__(self, root_path: Path, config: Config): + default_includes = ["*.py"] + self.includes = [ + default + for default in default_includes + if default not in config.reload_excludes + ] + self.includes.extend(config.reload_includes) + self.includes = list(set(self.includes)) + + default_excludes = [".*", ".py[cod]", ".sw.*", "~*"] + self.excludes = [ + default + for default in default_excludes + if default not in config.reload_includes + ] + self.excludes.extend(config.reload_excludes) + self.excludes = list(set(self.excludes)) + + self.watched_dirs: dict[str, bool] = {} + self.watched_files: dict[str, bool] = {} + self.dirs_includes = set(config.reload_dirs) + self.dirs_excludes = set(config.reload_dirs_excludes) + self.resolved_root = root_path + super().__init__(str(root_path)) + + def should_watch_file(self, entry: "DirEntry") -> bool: + cached_result = self.watched_files.get(entry.path) + if cached_result is not None: + return cached_result + + entry_path = Path(entry) + + # cwd is not verified through should_watch_dir, so we need to verify here + if entry_path.parent == Path.cwd() and Path.cwd() not in self.dirs_includes: + self.watched_files[entry.path] = False + return False + for include_pattern in self.includes: + if str(entry_path).endswith(include_pattern): + self.watched_files[entry.path] = True + return True + if entry_path.match(include_pattern): + for exclude_pattern in self.excludes: + if entry_path.match(exclude_pattern): + self.watched_files[entry.path] = False + return False + self.watched_files[entry.path] = True + return True + self.watched_files[entry.path] = False + return False + + def should_watch_dir(self, entry: "DirEntry") -> bool: + cached_result = self.watched_dirs.get(entry.path) + if cached_result is not None: + return cached_result + + entry_path = Path(entry) + + if entry_path in self.dirs_excludes: + self.watched_dirs[entry.path] = False + return False + + for exclude_pattern in self.excludes: + if entry_path.match(exclude_pattern): + is_watched = False + if entry_path in self.dirs_includes: + is_watched = True + + for directory in self.dirs_includes: + if directory in entry_path.parents: + is_watched = True + + if is_watched: + logger.debug( + "WatchGodReload detected a new excluded dir '%s' in '%s'; " + "Adding to exclude list.", + entry_path.relative_to(self.resolved_root), + str(self.resolved_root), + ) + self.watched_dirs[entry.path] = False + self.dirs_excludes.add(entry_path) + return False + + if entry_path in self.dirs_includes: + self.watched_dirs[entry.path] = True + return True + + for directory in self.dirs_includes: + if directory in entry_path.parents: + self.watched_dirs[entry.path] = True + return True + + for include_pattern in self.includes: + if entry_path.match(include_pattern): + logger.info( + "WatchGodReload detected a new reload dir '%s' in '%s'; " + "Adding to watch list.", + str(entry_path.relative_to(self.resolved_root)), + str(self.resolved_root), + ) + self.dirs_includes.add(entry_path) + self.watched_dirs[entry.path] = True + return True + + self.watched_dirs[entry.path] = False + return False + + +class WatchGodReload(BaseReload): + def __init__( + self, + config: Config, + target: Callable[[list[socket] | None], None], + sockets: list[socket], + ) -> None: + warnings.warn( + '"watchgod" is deprecated, you should switch ' + "to watchfiles (`pip install watchfiles`).", + DeprecationWarning, + ) + super().__init__(config, target, sockets) + self.reloader_name = "WatchGod" + self.watchers = [] + reload_dirs = [] + for directory in config.reload_dirs: + if Path.cwd() not in directory.parents: + reload_dirs.append(directory) + if Path.cwd() not in reload_dirs: + reload_dirs.append(Path.cwd()) + for w in reload_dirs: + self.watchers.append(CustomWatcher(w.resolve(), self.config)) + + def should_restart(self) -> list[Path] | None: + self.pause() + + for watcher in self.watchers: + change = watcher.check() + if change != set(): + return list({Path(c[1]) for c in change}) + + return None |