diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/psycopg2/pool.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/psycopg2/pool.py | 187 |
1 files changed, 187 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/psycopg2/pool.py b/.venv/lib/python3.12/site-packages/psycopg2/pool.py new file mode 100644 index 00000000..9d67d68e --- /dev/null +++ b/.venv/lib/python3.12/site-packages/psycopg2/pool.py @@ -0,0 +1,187 @@ +"""Connection pooling for psycopg2 + +This module implements thread-safe (and not) connection pools. +""" +# psycopg/pool.py - pooling code for psycopg +# +# Copyright (C) 2003-2019 Federico Di Gregorio <fog@debian.org> +# Copyright (C) 2020-2021 The Psycopg Team +# +# psycopg2 is free software: you can redistribute it and/or modify it +# under the terms of the GNU Lesser General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# In addition, as a special exception, the copyright holders give +# permission to link this program with the OpenSSL library (or with +# modified versions of OpenSSL that use the same license as OpenSSL), +# and distribute linked combinations including the two. +# +# You must obey the GNU Lesser General Public License in all respects for +# all of the code used other than OpenSSL. +# +# psycopg2 is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public +# License for more details. + +import psycopg2 +from psycopg2 import extensions as _ext + + +class PoolError(psycopg2.Error): + pass + + +class AbstractConnectionPool: + """Generic key-based pooling code.""" + + def __init__(self, minconn, maxconn, *args, **kwargs): + """Initialize the connection pool. + + New 'minconn' connections are created immediately calling 'connfunc' + with given parameters. The connection pool will support a maximum of + about 'maxconn' connections. + """ + self.minconn = int(minconn) + self.maxconn = int(maxconn) + self.closed = False + + self._args = args + self._kwargs = kwargs + + self._pool = [] + self._used = {} + self._rused = {} # id(conn) -> key map + self._keys = 0 + + for i in range(self.minconn): + self._connect() + + def _connect(self, key=None): + """Create a new connection and assign it to 'key' if not None.""" + conn = psycopg2.connect(*self._args, **self._kwargs) + if key is not None: + self._used[key] = conn + self._rused[id(conn)] = key + else: + self._pool.append(conn) + return conn + + def _getkey(self): + """Return a new unique key.""" + self._keys += 1 + return self._keys + + def _getconn(self, key=None): + """Get a free connection and assign it to 'key' if not None.""" + if self.closed: + raise PoolError("connection pool is closed") + if key is None: + key = self._getkey() + + if key in self._used: + return self._used[key] + + if self._pool: + self._used[key] = conn = self._pool.pop() + self._rused[id(conn)] = key + return conn + else: + if len(self._used) == self.maxconn: + raise PoolError("connection pool exhausted") + return self._connect(key) + + def _putconn(self, conn, key=None, close=False): + """Put away a connection.""" + if self.closed: + raise PoolError("connection pool is closed") + + if key is None: + key = self._rused.get(id(conn)) + if key is None: + raise PoolError("trying to put unkeyed connection") + + if len(self._pool) < self.minconn and not close: + # Return the connection into a consistent state before putting + # it back into the pool + if not conn.closed: + status = conn.info.transaction_status + if status == _ext.TRANSACTION_STATUS_UNKNOWN: + # server connection lost + conn.close() + elif status != _ext.TRANSACTION_STATUS_IDLE: + # connection in error or in transaction + conn.rollback() + self._pool.append(conn) + else: + # regular idle connection + self._pool.append(conn) + # If the connection is closed, we just discard it. + else: + conn.close() + + # here we check for the presence of key because it can happen that a + # thread tries to put back a connection after a call to close + if not self.closed or key in self._used: + del self._used[key] + del self._rused[id(conn)] + + def _closeall(self): + """Close all connections. + + Note that this can lead to some code fail badly when trying to use + an already closed connection. If you call .closeall() make sure + your code can deal with it. + """ + if self.closed: + raise PoolError("connection pool is closed") + for conn in self._pool + list(self._used.values()): + try: + conn.close() + except Exception: + pass + self.closed = True + + +class SimpleConnectionPool(AbstractConnectionPool): + """A connection pool that can't be shared across different threads.""" + + getconn = AbstractConnectionPool._getconn + putconn = AbstractConnectionPool._putconn + closeall = AbstractConnectionPool._closeall + + +class ThreadedConnectionPool(AbstractConnectionPool): + """A connection pool that works with the threading module.""" + + def __init__(self, minconn, maxconn, *args, **kwargs): + """Initialize the threading lock.""" + import threading + AbstractConnectionPool.__init__( + self, minconn, maxconn, *args, **kwargs) + self._lock = threading.Lock() + + def getconn(self, key=None): + """Get a free connection and assign it to 'key' if not None.""" + self._lock.acquire() + try: + return self._getconn(key) + finally: + self._lock.release() + + def putconn(self, conn=None, key=None, close=False): + """Put away an unused connection.""" + self._lock.acquire() + try: + self._putconn(conn, key, close) + finally: + self._lock.release() + + def closeall(self): + """Close all connections (even the one currently in use.)""" + self._lock.acquire() + try: + self._closeall() + finally: + self._lock.release() |