aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/pip/_internal/network/download.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/pip/_internal/network/download.py')
-rw-r--r-- .venv/lib/python3.12/site-packages/pip/_internal/network/download.py 187
1 files changed, 187 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/pip/_internal/network/download.py b/.venv/lib/python3.12/site-packages/pip/_internal/network/download.py
new file mode 100644
index 00000000..5c3bce3d
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/pip/_internal/network/download.py
@@ -0,0 +1,187 @@
+"""Download files with progress indicators.
+"""
+
+import email.message
+import logging
+import mimetypes
+import os
+from typing import Iterable, Optional, Tuple
+
+from pip._vendor.requests.models import Response
+
+from pip._internal.cli.progress_bars import get_download_progress_renderer
+from pip._internal.exceptions import NetworkConnectionError
+from pip._internal.models.index import PyPI
+from pip._internal.models.link import Link
+from pip._internal.network.cache import is_from_cache
+from pip._internal.network.session import PipSession
+from pip._internal.network.utils import HEADERS, raise_for_status, response_chunks
+from pip._internal.utils.misc import format_size, redact_auth_from_url, splitext
+
+logger = logging.getLogger(__name__)
+
+
def _get_http_response_size(resp: Response) -> Optional[int]:
    """Return the body size declared by the response, in bytes.

    The value is read from the ``content-length`` header. ``None`` is
    returned when the header is missing or cannot be converted to an int.
    """
    try:
        declared_length = resp.headers["content-length"]
        size: Optional[int] = int(declared_length)
    except (KeyError, TypeError, ValueError):
        size = None
    return size
+
+
def _prepare_download(
    resp: Response,
    link: Link,
    progress_bar: str,
) -> Iterable[bytes]:
    """Log the start of a download and return the iterable of body chunks.

    The chunks come from ``response_chunks(resp)``. They are wrapped in a
    progress renderer of type ``progress_bar`` only when progress should be
    displayed; otherwise they are returned as-is.
    """
    total_length = _get_http_response_size(resp)

    # Choose which form of the link's URL to log, then strip credentials.
    on_file_storage = link.netloc == PyPI.file_storage_domain
    display_url = link.show_url if on_file_storage else link.url_without_fragment
    logged_url = redact_auth_from_url(display_url)
    if total_length:
        logged_url = f"{logged_url} ({format_size(total_length)})"

    message = "Using cached %s" if is_from_cache(resp) else "Downloading %s"
    logger.info(message, logged_url)

    # No progress display when INFO output is suppressed or the response is
    # reported as cached; otherwise show it when the size is unknown or
    # exceeds 512 KiB.
    info_suppressed = logger.getEffectiveLevel() > logging.INFO
    if info_suppressed or is_from_cache(resp):
        show_progress = False
    else:
        show_progress = not total_length or total_length > (512 * 1024)

    chunks = response_chunks(resp)
    if show_progress:
        render = get_download_progress_renderer(
            bar_type=progress_bar, size=total_length
        )
        return render(chunks)
    return chunks
+
+
def sanitize_content_filename(filename: str) -> str:
    """
    Sanitize the "filename" value from a Content-Disposition header.

    Only the final path component is kept, so any directory parts in the
    value are discarded.
    """
    return os.path.basename(filename)


def parse_content_disposition(content_disposition: str, default_filename: str) -> str:
    """
    Parse the "filename" value from a Content-Disposition header, and
    return the default filename if the result is empty.

    Both the plain ``filename="..."`` form and the RFC 2231 extended form
    (``filename*=charset'language'value``) are handled.
    """
    from email.utils import collapse_rfc2231_value

    m = email.message.Message()
    # get_param() reads the "content-type" header by default, so the
    # Content-Disposition value is stored under that name to get it parsed.
    m["content-type"] = content_disposition
    filename = m.get_param("filename")
    if isinstance(filename, tuple):
        # RFC 2231 encoded parameters come back as a
        # (charset, language, value) tuple; calling str() on it directly
        # would turn the tuple's repr into the filename.
        filename = collapse_rfc2231_value(filename)
    if filename:
        # We need to sanitize the filename to prevent directory traversal
        # in case the filename contains ".." path parts.
        filename = sanitize_content_filename(str(filename))
    return filename or default_filename
+
+
def _get_http_response_filename(resp: Response, link: Link) -> str:
    """Get an ideal filename from the given HTTP response, falling back to
    the link filename if not provided.

    A name from the Content-Disposition header is preferred when present.
    If the chosen name has no extension, one is guessed from the
    Content-Type header, and failing that taken from the path of the final
    response URL when it differs from the link URL.
    """
    from urllib.parse import urlsplit

    filename = link.filename  # fallback
    # Have a look at the Content-Disposition header for a better guess
    content_disposition = resp.headers.get("content-disposition")
    if content_disposition:
        filename = parse_content_disposition(content_disposition, filename)
    ext: Optional[str] = splitext(filename)[1]
    if not ext:
        ext = mimetypes.guess_extension(resp.headers.get("content-type", ""))
        if ext:
            filename += ext
    if not ext and link.url != resp.url:
        # Use only the URL's path: running splitext over the whole URL would
        # pull any query string or fragment (e.g. ".whl?token=x") into the
        # extension and therefore into the filename.
        ext = os.path.splitext(urlsplit(resp.url).path)[1]
        if ext:
            filename += ext
    return filename
+
+
def _http_get_download(session: PipSession, link: Link) -> Response:
    """Issue a streaming GET for ``link`` and return the checked response.

    Everything from the first ``#`` in the link's URL onward is dropped
    before the request is made, and ``raise_for_status`` is applied to the
    response before it is returned.
    """
    url_without_anchor, _, _ = link.url.partition("#")
    response = session.get(url_without_anchor, headers=HEADERS, stream=True)
    raise_for_status(response)
    return response
+
+
class Downloader:
    """Callable that downloads a single link into a directory."""

    def __init__(
        self,
        session: PipSession,
        progress_bar: str,
    ) -> None:
        self._progress_bar = progress_bar
        self._session = session

    def __call__(self, link: Link, location: str) -> Tuple[str, str]:
        """Download the file given by link into location.

        Returns the path of the written file together with the response's
        ``Content-Type`` header value ("" when the header is missing).
        A ``NetworkConnectionError`` from the request is logged and
        re-raised.
        """
        try:
            response = _http_get_download(self._session, link)
        except NetworkConnectionError as exc:
            assert exc.response is not None
            status = exc.response.status_code
            logger.critical("HTTP error %s while getting %s", status, link)
            raise

        target_path = os.path.join(
            location, _get_http_response_filename(response, link)
        )

        body = _prepare_download(response, link, self._progress_bar)
        with open(target_path, "wb") as out_file:
            for piece in body:
                out_file.write(piece)
        return target_path, response.headers.get("Content-Type", "")
+
+
class BatchDownloader:
    """Callable that lazily downloads several links into one directory."""

    def __init__(
        self,
        session: PipSession,
        progress_bar: str,
    ) -> None:
        self._progress_bar = progress_bar
        self._session = session

    def _download_one(self, link: Link, location: str) -> Tuple[str, str]:
        """Fetch one link into location; return (file path, content type)."""
        try:
            response = _http_get_download(self._session, link)
        except NetworkConnectionError as exc:
            assert exc.response is not None
            status = exc.response.status_code
            logger.critical("HTTP error %s while getting %s", status, link)
            raise

        target_path = os.path.join(
            location, _get_http_response_filename(response, link)
        )

        body = _prepare_download(response, link, self._progress_bar)
        with open(target_path, "wb") as out_file:
            for piece in body:
                out_file.write(piece)
        return target_path, response.headers.get("Content-Type", "")

    def __call__(
        self, links: Iterable[Link], location: str
    ) -> Iterable[Tuple[Link, Tuple[str, str]]]:
        """Download the files given by links into location.

        This is a generator: each link is fetched only when the consumer
        asks for the next item, and each item pairs the link with the
        (file path, content type) of its download.
        """
        for link in links:
            yield link, self._download_one(link, location)