1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
|
# SPDX-FileCopyrightText: 2015 Eric Larson
#
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations

import hashlib
import os
import tempfile
from pathlib import Path
from textwrap import dedent
from typing import IO, TYPE_CHECKING

from pip._vendor.cachecontrol.cache import BaseCache, SeparateBodyBaseCache
from pip._vendor.cachecontrol.controller import CacheController

if TYPE_CHECKING:
    from datetime import datetime

    from filelock import BaseFileLock
def _secure_open_write(filename: str, fmode: int) -> IO[bytes]:
# We only want to write to this file, so open it in write only mode
flags = os.O_WRONLY
# os.O_CREAT | os.O_EXCL will fail if the file already exists, so we only
# will open *new* files.
# We specify this because we want to ensure that the mode we pass is the
# mode of the file.
flags |= os.O_CREAT | os.O_EXCL
# Do not follow symlinks to prevent someone from making a symlink that
# we follow and insecurely open a cache file.
if hasattr(os, "O_NOFOLLOW"):
flags |= os.O_NOFOLLOW
# On Windows we'll mark this file as binary
if hasattr(os, "O_BINARY"):
flags |= os.O_BINARY
# Before we open our file, we want to delete any existing file that is
# there
try:
os.remove(filename)
except OSError:
# The file must not exist already, so we can just skip ahead to opening
pass
# Open our file, the use of os.O_CREAT | os.O_EXCL will ensure that if a
# race condition happens between the os.remove and this line, that an
# error will be raised. Because we utilize a lockfile this should only
# happen if someone is attempting to attack us.
fd = os.open(filename, flags, fmode)
try:
return os.fdopen(fd, "wb")
except:
# An error occurred wrapping our FD in a file object
os.close(fd)
raise
class _FileCacheMixin:
"""Shared implementation for both FileCache variants."""
def __init__(
self,
directory: str | Path,
forever: bool = False,
filemode: int = 0o0600,
dirmode: int = 0o0700,
lock_class: type[BaseFileLock] | None = None,
) -> None:
try:
if lock_class is None:
from filelock import FileLock
lock_class = FileLock
except ImportError:
notice = dedent(
"""
NOTE: In order to use the FileCache you must have
filelock installed. You can install it via pip:
pip install cachecontrol[filecache]
"""
)
raise ImportError(notice)
self.directory = directory
self.forever = forever
self.filemode = filemode
self.dirmode = dirmode
self.lock_class = lock_class
@staticmethod
def encode(x: str) -> str:
return hashlib.sha224(x.encode()).hexdigest()
def _fn(self, name: str) -> str:
# NOTE: This method should not change as some may depend on it.
# See: https://github.com/ionrock/cachecontrol/issues/63
hashed = self.encode(name)
parts = list(hashed[:5]) + [hashed]
return os.path.join(self.directory, *parts)
def get(self, key: str) -> bytes | None:
name = self._fn(key)
try:
with open(name, "rb") as fh:
return fh.read()
except FileNotFoundError:
return None
def set(
self, key: str, value: bytes, expires: int | datetime | None = None
) -> None:
name = self._fn(key)
self._write(name, value)
def _write(self, path: str, data: bytes) -> None:
"""
Safely write the data to the given path.
"""
# Make sure the directory exists
try:
os.makedirs(os.path.dirname(path), self.dirmode)
except OSError:
pass
with self.lock_class(path + ".lock"):
# Write our actual file
with _secure_open_write(path, self.filemode) as fh:
fh.write(data)
def _delete(self, key: str, suffix: str) -> None:
name = self._fn(key) + suffix
if not self.forever:
try:
os.remove(name)
except FileNotFoundError:
pass
class FileCache(_FileCacheMixin, BaseCache):
    """
    Classic file-backed cache. The body is kept in memory, which makes this
    variant a poor fit for large downloads.
    """

    def delete(self, key: str) -> None:
        """Remove the entry stored for ``key``."""
        self._delete(key, suffix="")
class SeparateBodyFileCache(_FileCacheMixin, SeparateBodyBaseCache):
    """
    File-backed cache that keeps the body in a file of its own (the entry's
    path with a ``.body`` suffix), which lowers peak memory usage.
    """

    def get_body(self, key: str) -> IO[bytes] | None:
        """Open the body file for ``key`` for binary reading.

        Returns ``None`` when no body file exists. The file object is handed
        back open; this method does not close it.
        """
        body_path = self._fn(key) + ".body"
        try:
            stream = open(body_path, "rb")
        except FileNotFoundError:
            return None
        return stream

    def set_body(self, key: str, body: bytes) -> None:
        """Write ``body`` to the body file belonging to ``key``."""
        self._write(self._fn(key) + ".body", body)

    def delete(self, key: str) -> None:
        """Remove the entry file for ``key`` and then its body file."""
        for suffix in ("", ".body"):
            self._delete(key, suffix)
def url_to_file_path(url: str, filecache: FileCache) -> str:
    """Map ``url`` to the path ``filecache`` uses for its cache entry.

    The URL is turned into a cache key with ``CacheController.cache_url``
    and that key is passed to the cache's ``_fn``.

    Only the path is computed; the file is not guaranteed to exist.
    """
    return filecache._fn(CacheController.cache_url(url))
|