diff options
author | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
---|---|---|
committer | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
commit | 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch) | |
tree | ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/requests_toolbelt/threaded | |
parent | cc961e04ba734dd72309fb548a2f97d67d578813 (diff) | |
download | gn-ai-master.tar.gz |
Diffstat (limited to '.venv/lib/python3.12/site-packages/requests_toolbelt/threaded')
3 files changed, 361 insertions, 0 deletions
# ---------------------------------------------------------------------------
# File: .venv/lib/python3.12/site-packages/requests_toolbelt/threaded/__init__.py
# ---------------------------------------------------------------------------
"""
This module provides the API for ``requests_toolbelt.threaded``.

The module provides a clean and simple API for making requests via a thread
pool. The thread pool will use sessions for increased performance.

A simple use-case is:

.. code-block:: python

    from requests_toolbelt import threaded

    urls_to_get = [{
        'url': 'https://api.github.com/users/sigmavirus24',
        'method': 'GET',
    }, {
        'url': 'https://api.github.com/repos/requests/toolbelt',
        'method': 'GET',
    }, {
        'url': 'https://google.com',
        'method': 'GET',
    }]
    responses, errors = threaded.map(urls_to_get)

By default, the threaded submodule will detect the number of CPUs your
computer has and use that if no other number of processes is selected. To
change this, always use the keyword argument ``num_processes``. Using the
above example, we would expand it like so:

.. code-block:: python

    responses, errors = threaded.map(urls_to_get, num_processes=10)

You can also customize how a :class:`requests.Session` is initialized by
creating a callback function. The pool uses the callback's return value as
the session, so the callback should return the session it configured:

.. code-block:: python

    from requests_toolbelt import user_agent

    def initialize_session(session):
        session.headers['User-Agent'] = user_agent('my-scraper', '0.1')
        session.headers['Accept'] = 'application/json'
        return session

    responses, errors = threaded.map(urls_to_get,
                                     initializer=initialize_session)

.. autofunction:: requests_toolbelt.threaded.map

Inspiration is blatantly drawn from the standard library's multiprocessing
library. See the following references:

- multiprocessing's `pool source`_

- map and map_async `inspiration`_

.. _pool source:
    https://hg.python.org/cpython/file/8ef4f75a8018/Lib/multiprocessing/pool.py
.. _inspiration:
    https://hg.python.org/cpython/file/8ef4f75a8018/Lib/multiprocessing/pool.py#l340
"""
from . import pool
from .._compat import queue


def map(requests, **kwargs):
    r"""Simple interface to the threaded Pool object.

    This function takes a list of dictionaries representing requests to make
    using Sessions in threads and returns a tuple where the first item is
    a generator of successful responses and the second is a generator of
    exceptions.

    The call blocks until every worker thread has drained the job queue.

    :param list requests:
        Collection of dictionaries representing requests to make with the Pool
        object. Any iterable of dictionaries is accepted; it is consumed
        exactly once.
    :param \*\*kwargs:
        Keyword arguments that are passed to the
        :class:`~requests_toolbelt.threaded.pool.Pool` object. A
        ``job_queue`` entry, if supplied, is overwritten.
    :returns: Tuple of responses and exceptions from the pool
    :rtype: (:class:`~requests_toolbelt.threaded.pool.ThreadResponse`,
        :class:`~requests_toolbelt.threaded.pool.ThreadException`)
    :raises ValueError:
        If ``requests`` is empty or contains anything other than
        dictionaries.
    """
    # Materialize once: validating with ``all(...)`` and then iterating again
    # would silently queue nothing when a one-shot iterator is passed in.
    jobs = list(requests or ())
    if not (jobs and all(isinstance(r, dict) for r in jobs)):
        raise ValueError('map expects a list of dictionaries.')

    # Build our queue of requests
    job_queue = queue.Queue()
    for request in jobs:
        job_queue.put(request)

    # Ensure the user doesn't try to pass their own job_queue
    kwargs['job_queue'] = job_queue

    threadpool = pool.Pool(**kwargs)
    threadpool.join_all()
    return threadpool.responses(), threadpool.exceptions()


# ---------------------------------------------------------------------------
# File: .venv/lib/python3.12/site-packages/requests_toolbelt/threaded/pool.py
# ---------------------------------------------------------------------------
"""Module implementing the Pool for :mod:``requests_toolbelt.threaded``."""
import multiprocessing
import requests

from . import thread
from .._compat import queue


class Pool(object):
    """Pool that manages the threads containing sessions.

    Constructing a pool immediately creates the worker threads, which start
    consuming ``job_queue`` right away.

    :param job_queue:
        The queue you're expected to use to which you should add items.
    :type job_queue: queue.Queue
    :param initializer:
        Function used to initialize an instance of ``session``. It receives
        the new session and should return the session to use.
    :type initializer: collections.abc.Callable
    :param auth_generator:
        Function used to generate new auth credentials for the session. It
        receives the initialized session and should return the session to use.
    :type auth_generator: collections.abc.Callable
    :param int num_processes:
        Number of threads to create. Defaults to the number of CPUs.
    :param session:
        Zero-argument callable that returns a new session for each thread.
    :type session: requests.Session
    :raises ValueError: If ``num_processes`` is less than 1.
    """

    def __init__(self, job_queue, initializer=None, auth_generator=None,
                 num_processes=None, session=requests.Session):
        if num_processes is None:
            num_processes = multiprocessing.cpu_count() or 1

        if num_processes < 1:
            raise ValueError("Number of processes should at least be 1.")

        self._job_queue = job_queue
        self._response_queue = queue.Queue()
        self._exc_queue = queue.Queue()
        self._processes = num_processes
        self._initializer = initializer or _identity
        self._auth = auth_generator or _identity
        self._session = session
        # Each SessionThread starts its worker as soon as it is constructed.
        self._pool = [
            thread.SessionThread(self._new_session(), self._job_queue,
                                 self._response_queue, self._exc_queue)
            for _ in range(self._processes)
        ]

    def _new_session(self):
        """Create one session and run it through both configuration hooks.

        A hook that configures the session in place and returns ``None`` is
        tolerated: the session it was given is kept instead of being replaced
        by ``None``.
        """
        session = self._session()
        for hook in (self._initializer, self._auth):
            configured = hook(session)
            if configured is not None:
                session = configured
        return session

    @classmethod
    def from_exceptions(cls, exceptions, **kwargs):
        r"""Create a :class:`~Pool` from an :class:`~ThreadException`\ s.

        Provided an iterable that provides :class:`~ThreadException` objects,
        this classmethod will generate a new pool to retry the requests that
        caused the exceptions.

        :param exceptions:
            Iterable that returns :class:`~ThreadException`
        :type exceptions: iterable
        :param kwargs:
            Keyword arguments passed to the :class:`~Pool` initializer.
        :returns: An initialized :class:`~Pool` object.
        :rtype: :class:`~Pool`
        """
        job_queue = queue.Queue()
        for exc in exceptions:
            job_queue.put(exc.request_kwargs)

        return cls(job_queue=job_queue, **kwargs)

    @classmethod
    def from_urls(cls, urls, request_kwargs=None, **kwargs):
        """Create a :class:`~Pool` from an iterable of URLs.

        :param urls:
            Iterable that returns URLs with which we create a pool.
        :type urls: iterable
        :param dict request_kwargs:
            Dictionary of other keyword arguments to provide to the request
            method.
        :param kwargs:
            Keyword arguments passed to the :class:`~Pool` initializer.
        :returns: An initialized :class:`~Pool` object.
        :rtype: :class:`~Pool`
        """
        # 'GET' is only a default; request_kwargs may override the method.
        request_dict = {'method': 'GET'}
        request_dict.update(request_kwargs or {})
        job_queue = queue.Queue()
        for url in urls:
            job = request_dict.copy()
            job.update({'url': url})
            job_queue.put(job)

        return cls(job_queue=job_queue, **kwargs)

    def exceptions(self):
        """Iterate over all the exceptions in the pool.

        Iteration stops at the first moment the exception queue is empty.

        :returns: Generator of :class:`~ThreadException`
        """
        while True:
            exc = self.get_exception()
            if exc is None:
                break
            yield exc

    def get_exception(self):
        """Get an exception from the pool.

        Does not block; returns ``None`` when no exception is queued.

        :rtype: :class:`~ThreadException`
        """
        try:
            (request, exc) = self._exc_queue.get_nowait()
        except queue.Empty:
            return None
        else:
            return ThreadException(request, exc)

    def get_response(self):
        """Get a response from the pool.

        Does not block; returns ``None`` when no response is queued.

        :rtype: :class:`~ThreadResponse`
        """
        try:
            (request, response) = self._response_queue.get_nowait()
        except queue.Empty:
            return None
        else:
            return ThreadResponse(request, response)

    def responses(self):
        """Iterate over all the responses in the pool.

        Iteration stops at the first moment the response queue is empty.

        :returns: Generator of :class:`~ThreadResponse`
        """
        while True:
            resp = self.get_response()
            if resp is None:
                break
            yield resp

    def join_all(self):
        """Join all the threads to the master thread."""
        for session_thread in self._pool:
            session_thread.join()


class ThreadProxy(object):
    """Base class that forwards unknown attribute lookups to a wrapped object.

    Subclasses set ``proxied_attr`` to the name of the instance attribute
    holding the wrapped object, and ``attrs`` to the names that belong to the
    wrapper itself and must never be forwarded.
    """

    proxied_attr = None
    # Defined on the base class so that ``self.attrs`` inside ``__getattr__``
    # always resolves through normal lookup instead of recursing.
    attrs = frozenset()

    def __getattr__(self, attr):
        """Proxy attribute accesses to the proxied object."""
        get = object.__getattribute__
        if attr in self.attrs:
            # Only reached when the wrapper's own attribute is not set yet;
            # let the regular lookup raise AttributeError.
            return get(self, attr)
        if self.proxied_attr is None:
            raise AttributeError(attr)
        return getattr(get(self, self.proxied_attr), attr)


class ThreadResponse(ThreadProxy):
    """A wrapper around a requests Response object.

    This will proxy most attribute access actions to the Response object. For
    example, if you wanted the parsed JSON from the response, you might do:

    .. code-block:: python

        thread_response = pool.get_response()
        json = thread_response.json()

    """
    proxied_attr = 'response'
    attrs = frozenset(['request_kwargs', 'response'])

    def __init__(self, request_kwargs, response):
        #: The original keyword arguments provided to the queue
        self.request_kwargs = request_kwargs
        #: The wrapped response
        self.response = response


class ThreadException(ThreadProxy):
    """A wrapper around an exception raised during a request.

    This will proxy most attribute access actions to the exception object. For
    example, if you wanted the message from the exception, you might do:

    .. code-block:: python

        thread_exc = pool.get_exception()
        msg = str(thread_exc.exception)

    """
    proxied_attr = 'exception'
    attrs = frozenset(['request_kwargs', 'exception'])

    def __init__(self, request_kwargs, exception):
        #: The original keyword arguments provided to the queue
        self.request_kwargs = request_kwargs
        #: The captured and wrapped exception
        self.exception = exception


def _identity(session_obj):
    """Return the session unchanged; default for the optional hooks."""
    return session_obj


__all__ = ['ThreadException', 'ThreadResponse', 'Pool']


# ---------------------------------------------------------------------------
# File: .venv/lib/python3.12/site-packages/requests_toolbelt/threaded/thread.py
# ---------------------------------------------------------------------------
"""Module containing the SessionThread class."""
import threading
import uuid

import requests.exceptions as exc

from .._compat import queue


class SessionThread(object):
    """Worker that drains a job queue using a single session.

    The worker thread is created and started by the constructor. It exits
    once the job queue is found empty.

    :param initialized_session: Session object used to issue every request.
    :param job_queue: Queue of keyword-argument dictionaries for ``request``.
    :param response_queue: Queue receiving ``(kwargs, response)`` tuples.
    :param exception_queue: Queue receiving ``(kwargs, exception)`` tuples.
    """

    def __init__(self, initialized_session, job_queue, response_queue,
                 exception_queue):
        self._session = initialized_session
        self._jobs = job_queue
        # The result queues must be in place before the worker starts: the
        # thread begins running immediately and writes to them.
        self._responses = response_queue
        self._exceptions = exception_queue
        self._create_worker()

    def _create_worker(self):
        """Create and start the daemon thread that processes jobs."""
        self._worker = threading.Thread(
            target=self._make_request,
            name=str(uuid.uuid4()),
        )
        self._worker.daemon = True
        self._worker.start()

    def _handle_request(self, kwargs):
        """Issue one request and file its outcome in the matching queue."""
        try:
            response = self._session.request(**kwargs)
        except exc.RequestException as e:
            self._exceptions.put((kwargs, e))
        else:
            self._responses.put((kwargs, response))
        finally:
            # Always mark the job done, whatever the outcome.
            self._jobs.task_done()

    def _make_request(self):
        """Thread target: process jobs until the queue is empty."""
        while True:
            try:
                kwargs = self._jobs.get_nowait()
            except queue.Empty:
                break

            self._handle_request(kwargs)

    def is_alive(self):
        """Proxy to the thread's ``is_alive`` method."""
        return self._worker.is_alive()

    def join(self):
        """Join this thread to the master thread."""
        self._worker.join()