diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/sentry_sdk/_queue.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/sentry_sdk/_queue.py | 289 |
1 files changed, 289 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/sentry_sdk/_queue.py b/.venv/lib/python3.12/site-packages/sentry_sdk/_queue.py new file mode 100644 index 00000000..a21c86ec --- /dev/null +++ b/.venv/lib/python3.12/site-packages/sentry_sdk/_queue.py @@ -0,0 +1,289 @@ +""" +A fork of Python 3.6's stdlib queue (found in Pythons 'cpython/Lib/queue.py') +with Lock swapped out for RLock to avoid a deadlock while garbage collecting. + +https://github.com/python/cpython/blob/v3.6.12/Lib/queue.py + + +See also +https://codewithoutrules.com/2017/08/16/concurrency-python/ +https://bugs.python.org/issue14976 +https://github.com/sqlalchemy/sqlalchemy/blob/4eb747b61f0c1b1c25bdee3856d7195d10a0c227/lib/sqlalchemy/queue.py#L1 + +We also vendor the code to evade eventlet's broken monkeypatching, see +https://github.com/getsentry/sentry-python/pull/484 + + +Copyright (c) 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010, +2011, 2012, 2013, 2014, 2015, 2016, 2017, 2018, 2019, 2020 Python Software Foundation; + +All Rights Reserved + + +PYTHON SOFTWARE FOUNDATION LICENSE VERSION 2 +-------------------------------------------- + +1. This LICENSE AGREEMENT is between the Python Software Foundation +("PSF"), and the Individual or Organization ("Licensee") accessing and +otherwise using this software ("Python") in source or binary form and +its associated documentation. + +2. Subject to the terms and conditions of this License Agreement, PSF hereby +grants Licensee a nonexclusive, royalty-free, world-wide license to reproduce, +analyze, test, perform and/or display publicly, prepare derivative works, +distribute, and otherwise use Python alone or in any derivative version, +provided, however, that PSF's License Agreement and PSF's notice of copyright, +i.e., "Copyright (c) 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010, +2011, 2012, 2013, 2014, 2015, 2016, 2017, 2018, 2019, 2020 Python Software Foundation; +All Rights Reserved" are retained in Python alone or in any derivative version +prepared by Licensee. + +3. In the event Licensee prepares a derivative work that is based on +or incorporates Python or any part thereof, and wants to make +the derivative work available to others as provided herein, then +Licensee hereby agrees to include in any such work a brief summary of +the changes made to Python. + +4. PSF is making Python available to Licensee on an "AS IS" +basis. PSF MAKES NO REPRESENTATIONS OR WARRANTIES, EXPRESS OR +IMPLIED. BY WAY OF EXAMPLE, BUT NOT LIMITATION, PSF MAKES NO AND +DISCLAIMS ANY REPRESENTATION OR WARRANTY OF MERCHANTABILITY OR FITNESS +FOR ANY PARTICULAR PURPOSE OR THAT THE USE OF PYTHON WILL NOT +INFRINGE ANY THIRD PARTY RIGHTS. + +5. PSF SHALL NOT BE LIABLE TO LICENSEE OR ANY OTHER USERS OF PYTHON +FOR ANY INCIDENTAL, SPECIAL, OR CONSEQUENTIAL DAMAGES OR LOSS AS +A RESULT OF MODIFYING, DISTRIBUTING, OR OTHERWISE USING PYTHON, +OR ANY DERIVATIVE THEREOF, EVEN IF ADVISED OF THE POSSIBILITY THEREOF. + +6. This License Agreement will automatically terminate upon a material +breach of its terms and conditions. + +7. Nothing in this License Agreement shall be deemed to create any +relationship of agency, partnership, or joint venture between PSF and +Licensee. This License Agreement does not grant permission to use PSF +trademarks or trade name in a trademark sense to endorse or promote +products or services of Licensee, or any third party. + +8. By copying, installing or otherwise using Python, Licensee +agrees to be bound by the terms and conditions of this License +Agreement. + +""" + +import threading + +from collections import deque +from time import time + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from typing import Any + +__all__ = ["EmptyError", "FullError", "Queue"] + + +class EmptyError(Exception): + "Exception raised by Queue.get(block=0)/get_nowait()." + + pass + + +class FullError(Exception): + "Exception raised by Queue.put(block=0)/put_nowait()." + + pass + + +class Queue: + """Create a queue object with a given maximum size. + + If maxsize is <= 0, the queue size is infinite. + """ + + def __init__(self, maxsize=0): + self.maxsize = maxsize + self._init(maxsize) + + # mutex must be held whenever the queue is mutating. All methods + # that acquire mutex must release it before returning. mutex + # is shared between the three conditions, so acquiring and + # releasing the conditions also acquires and releases mutex. + self.mutex = threading.RLock() + + # Notify not_empty whenever an item is added to the queue; a + # thread waiting to get is notified then. + self.not_empty = threading.Condition(self.mutex) + + # Notify not_full whenever an item is removed from the queue; + # a thread waiting to put is notified then. + self.not_full = threading.Condition(self.mutex) + + # Notify all_tasks_done whenever the number of unfinished tasks + # drops to zero; thread waiting to join() is notified to resume + self.all_tasks_done = threading.Condition(self.mutex) + self.unfinished_tasks = 0 + + def task_done(self): + """Indicate that a formerly enqueued task is complete. + + Used by Queue consumer threads. For each get() used to fetch a task, + a subsequent call to task_done() tells the queue that the processing + on the task is complete. + + If a join() is currently blocking, it will resume when all items + have been processed (meaning that a task_done() call was received + for every item that had been put() into the queue). + + Raises a ValueError if called more times than there were items + placed in the queue. + """ + with self.all_tasks_done: + unfinished = self.unfinished_tasks - 1 + if unfinished <= 0: + if unfinished < 0: + raise ValueError("task_done() called too many times") + self.all_tasks_done.notify_all() + self.unfinished_tasks = unfinished + + def join(self): + """Blocks until all items in the Queue have been gotten and processed. + + The count of unfinished tasks goes up whenever an item is added to the + queue. The count goes down whenever a consumer thread calls task_done() + to indicate the item was retrieved and all work on it is complete. + + When the count of unfinished tasks drops to zero, join() unblocks. + """ + with self.all_tasks_done: + while self.unfinished_tasks: + self.all_tasks_done.wait() + + def qsize(self): + """Return the approximate size of the queue (not reliable!).""" + with self.mutex: + return self._qsize() + + def empty(self): + """Return True if the queue is empty, False otherwise (not reliable!). + + This method is likely to be removed at some point. Use qsize() == 0 + as a direct substitute, but be aware that either approach risks a race + condition where a queue can grow before the result of empty() or + qsize() can be used. + + To create code that needs to wait for all queued tasks to be + completed, the preferred technique is to use the join() method. + """ + with self.mutex: + return not self._qsize() + + def full(self): + """Return True if the queue is full, False otherwise (not reliable!). + + This method is likely to be removed at some point. Use qsize() >= n + as a direct substitute, but be aware that either approach risks a race + condition where a queue can shrink before the result of full() or + qsize() can be used. + """ + with self.mutex: + return 0 < self.maxsize <= self._qsize() + + def put(self, item, block=True, timeout=None): + """Put an item into the queue. + + If optional args 'block' is true and 'timeout' is None (the default), + block if necessary until a free slot is available. If 'timeout' is + a non-negative number, it blocks at most 'timeout' seconds and raises + the FullError exception if no free slot was available within that time. + Otherwise ('block' is false), put an item on the queue if a free slot + is immediately available, else raise the FullError exception ('timeout' + is ignored in that case). + """ + with self.not_full: + if self.maxsize > 0: + if not block: + if self._qsize() >= self.maxsize: + raise FullError() + elif timeout is None: + while self._qsize() >= self.maxsize: + self.not_full.wait() + elif timeout < 0: + raise ValueError("'timeout' must be a non-negative number") + else: + endtime = time() + timeout + while self._qsize() >= self.maxsize: + remaining = endtime - time() + if remaining <= 0.0: + raise FullError() + self.not_full.wait(remaining) + self._put(item) + self.unfinished_tasks += 1 + self.not_empty.notify() + + def get(self, block=True, timeout=None): + """Remove and return an item from the queue. + + If optional args 'block' is true and 'timeout' is None (the default), + block if necessary until an item is available. If 'timeout' is + a non-negative number, it blocks at most 'timeout' seconds and raises + the EmptyError exception if no item was available within that time. + Otherwise ('block' is false), return an item if one is immediately + available, else raise the EmptyError exception ('timeout' is ignored + in that case). + """ + with self.not_empty: + if not block: + if not self._qsize(): + raise EmptyError() + elif timeout is None: + while not self._qsize(): + self.not_empty.wait() + elif timeout < 0: + raise ValueError("'timeout' must be a non-negative number") + else: + endtime = time() + timeout + while not self._qsize(): + remaining = endtime - time() + if remaining <= 0.0: + raise EmptyError() + self.not_empty.wait(remaining) + item = self._get() + self.not_full.notify() + return item + + def put_nowait(self, item): + """Put an item into the queue without blocking. + + Only enqueue the item if a free slot is immediately available. + Otherwise raise the FullError exception. + """ + return self.put(item, block=False) + + def get_nowait(self): + """Remove and return an item from the queue without blocking. + + Only get an item if one is immediately available. Otherwise + raise the EmptyError exception. + """ + return self.get(block=False) + + # Override these methods to implement other queue organizations + # (e.g. stack or priority queue). + # These will only be called with appropriate locks held + + # Initialize the queue representation + def _init(self, maxsize): + self.queue = deque() # type: Any + + def _qsize(self): + return len(self.queue) + + # Put a new item in the queue + def _put(self, item): + self.queue.append(item) + + # Get an item from the queue + def _get(self): + return self.queue.popleft() |