aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/uvicorn/supervisors/watchgodreload.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/uvicorn/supervisors/watchgodreload.py')
-rw-r--r--.venv/lib/python3.12/site-packages/uvicorn/supervisors/watchgodreload.py163
1 files changed, 163 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/uvicorn/supervisors/watchgodreload.py b/.venv/lib/python3.12/site-packages/uvicorn/supervisors/watchgodreload.py
new file mode 100644
index 00000000..987909fd
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/uvicorn/supervisors/watchgodreload.py
@@ -0,0 +1,163 @@
+from __future__ import annotations
+
+import logging
+import warnings
+from pathlib import Path
+from socket import socket
+from typing import TYPE_CHECKING, Callable
+
+from watchgod import DefaultWatcher
+
+from uvicorn.config import Config
+from uvicorn.supervisors.basereload import BaseReload
+
+if TYPE_CHECKING:
+ import os
+
+ DirEntry = os.DirEntry[str]
+
+logger = logging.getLogger("uvicorn.error")
+
+
+class CustomWatcher(DefaultWatcher):
+ def __init__(self, root_path: Path, config: Config):
+ default_includes = ["*.py"]
+ self.includes = [
+ default
+ for default in default_includes
+ if default not in config.reload_excludes
+ ]
+ self.includes.extend(config.reload_includes)
+ self.includes = list(set(self.includes))
+
+ default_excludes = [".*", ".py[cod]", ".sw.*", "~*"]
+ self.excludes = [
+ default
+ for default in default_excludes
+ if default not in config.reload_includes
+ ]
+ self.excludes.extend(config.reload_excludes)
+ self.excludes = list(set(self.excludes))
+
+ self.watched_dirs: dict[str, bool] = {}
+ self.watched_files: dict[str, bool] = {}
+ self.dirs_includes = set(config.reload_dirs)
+ self.dirs_excludes = set(config.reload_dirs_excludes)
+ self.resolved_root = root_path
+ super().__init__(str(root_path))
+
+ def should_watch_file(self, entry: "DirEntry") -> bool:
+ cached_result = self.watched_files.get(entry.path)
+ if cached_result is not None:
+ return cached_result
+
+ entry_path = Path(entry)
+
+ # cwd is not verified through should_watch_dir, so we need to verify here
+ if entry_path.parent == Path.cwd() and Path.cwd() not in self.dirs_includes:
+ self.watched_files[entry.path] = False
+ return False
+ for include_pattern in self.includes:
+ if str(entry_path).endswith(include_pattern):
+ self.watched_files[entry.path] = True
+ return True
+ if entry_path.match(include_pattern):
+ for exclude_pattern in self.excludes:
+ if entry_path.match(exclude_pattern):
+ self.watched_files[entry.path] = False
+ return False
+ self.watched_files[entry.path] = True
+ return True
+ self.watched_files[entry.path] = False
+ return False
+
+ def should_watch_dir(self, entry: "DirEntry") -> bool:
+ cached_result = self.watched_dirs.get(entry.path)
+ if cached_result is not None:
+ return cached_result
+
+ entry_path = Path(entry)
+
+ if entry_path in self.dirs_excludes:
+ self.watched_dirs[entry.path] = False
+ return False
+
+ for exclude_pattern in self.excludes:
+ if entry_path.match(exclude_pattern):
+ is_watched = False
+ if entry_path in self.dirs_includes:
+ is_watched = True
+
+ for directory in self.dirs_includes:
+ if directory in entry_path.parents:
+ is_watched = True
+
+ if is_watched:
+ logger.debug(
+ "WatchGodReload detected a new excluded dir '%s' in '%s'; "
+ "Adding to exclude list.",
+ entry_path.relative_to(self.resolved_root),
+ str(self.resolved_root),
+ )
+ self.watched_dirs[entry.path] = False
+ self.dirs_excludes.add(entry_path)
+ return False
+
+ if entry_path in self.dirs_includes:
+ self.watched_dirs[entry.path] = True
+ return True
+
+ for directory in self.dirs_includes:
+ if directory in entry_path.parents:
+ self.watched_dirs[entry.path] = True
+ return True
+
+ for include_pattern in self.includes:
+ if entry_path.match(include_pattern):
+ logger.info(
+ "WatchGodReload detected a new reload dir '%s' in '%s'; "
+ "Adding to watch list.",
+ str(entry_path.relative_to(self.resolved_root)),
+ str(self.resolved_root),
+ )
+ self.dirs_includes.add(entry_path)
+ self.watched_dirs[entry.path] = True
+ return True
+
+ self.watched_dirs[entry.path] = False
+ return False
+
+
+class WatchGodReload(BaseReload):
+ def __init__(
+ self,
+ config: Config,
+ target: Callable[[list[socket] | None], None],
+ sockets: list[socket],
+ ) -> None:
+ warnings.warn(
+ '"watchgod" is deprecated, you should switch '
+ "to watchfiles (`pip install watchfiles`).",
+ DeprecationWarning,
+ )
+ super().__init__(config, target, sockets)
+ self.reloader_name = "WatchGod"
+ self.watchers = []
+ reload_dirs = []
+ for directory in config.reload_dirs:
+ if Path.cwd() not in directory.parents:
+ reload_dirs.append(directory)
+ if Path.cwd() not in reload_dirs:
+ reload_dirs.append(Path.cwd())
+ for w in reload_dirs:
+ self.watchers.append(CustomWatcher(w.resolve(), self.config))
+
+ def should_restart(self) -> list[Path] | None:
+ self.pause()
+
+ for watcher in self.watchers:
+ change = watcher.check()
+ if change != set():
+ return list({Path(c[1]) for c in change})
+
+ return None