aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/asyncpg/pool.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/asyncpg/pool.py')
-rw-r--r--.venv/lib/python3.12/site-packages/asyncpg/pool.py1130
1 files changed, 1130 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/asyncpg/pool.py b/.venv/lib/python3.12/site-packages/asyncpg/pool.py
new file mode 100644
index 00000000..06e698df
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/asyncpg/pool.py
@@ -0,0 +1,1130 @@
+# Copyright (C) 2016-present the asyncpg authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of asyncpg and is released under
+# the Apache 2.0 License: http://www.apache.org/licenses/LICENSE-2.0
+
+
+import asyncio
+import functools
+import inspect
+import logging
+import time
+import warnings
+
+from . import compat
+from . import connection
+from . import exceptions
+from . import protocol
+
+
+logger = logging.getLogger(__name__)
+
+
+class PoolConnectionProxyMeta(type):
+
+ def __new__(mcls, name, bases, dct, *, wrap=False):
+ if wrap:
+ for attrname in dir(connection.Connection):
+ if attrname.startswith('_') or attrname in dct:
+ continue
+
+ meth = getattr(connection.Connection, attrname)
+ if not inspect.isfunction(meth):
+ continue
+
+ wrapper = mcls._wrap_connection_method(attrname)
+ wrapper = functools.update_wrapper(wrapper, meth)
+ dct[attrname] = wrapper
+
+ if '__doc__' not in dct:
+ dct['__doc__'] = connection.Connection.__doc__
+
+ return super().__new__(mcls, name, bases, dct)
+
+ @staticmethod
+ def _wrap_connection_method(meth_name):
+ def call_con_method(self, *args, **kwargs):
+ # This method will be owned by PoolConnectionProxy class.
+ if self._con is None:
+ raise exceptions.InterfaceError(
+ 'cannot call Connection.{}(): '
+ 'connection has been released back to the pool'.format(
+ meth_name))
+
+ meth = getattr(self._con.__class__, meth_name)
+ return meth(self._con, *args, **kwargs)
+
+ return call_con_method
+
+
+class PoolConnectionProxy(connection._ConnectionProxy,
+ metaclass=PoolConnectionProxyMeta,
+ wrap=True):
+
+ __slots__ = ('_con', '_holder')
+
+ def __init__(self, holder: 'PoolConnectionHolder',
+ con: connection.Connection):
+ self._con = con
+ self._holder = holder
+ con._set_proxy(self)
+
+ def __getattr__(self, attr):
+ # Proxy all unresolved attributes to the wrapped Connection object.
+ return getattr(self._con, attr)
+
+ def _detach(self) -> connection.Connection:
+ if self._con is None:
+ return
+
+ con, self._con = self._con, None
+ con._set_proxy(None)
+ return con
+
+ def __repr__(self):
+ if self._con is None:
+ return '<{classname} [released] {id:#x}>'.format(
+ classname=self.__class__.__name__, id=id(self))
+ else:
+ return '<{classname} {con!r} {id:#x}>'.format(
+ classname=self.__class__.__name__, con=self._con, id=id(self))
+
+
+class PoolConnectionHolder:
+
+ __slots__ = ('_con', '_pool', '_loop', '_proxy',
+ '_max_queries', '_setup',
+ '_max_inactive_time', '_in_use',
+ '_inactive_callback', '_timeout',
+ '_generation')
+
+ def __init__(self, pool, *, max_queries, setup, max_inactive_time):
+
+ self._pool = pool
+ self._con = None
+ self._proxy = None
+
+ self._max_queries = max_queries
+ self._max_inactive_time = max_inactive_time
+ self._setup = setup
+ self._inactive_callback = None
+ self._in_use = None # type: asyncio.Future
+ self._timeout = None
+ self._generation = None
+
+ def is_connected(self):
+ return self._con is not None and not self._con.is_closed()
+
+ def is_idle(self):
+ return not self._in_use
+
+ async def connect(self):
+ if self._con is not None:
+ raise exceptions.InternalClientError(
+ 'PoolConnectionHolder.connect() called while another '
+ 'connection already exists')
+
+ self._con = await self._pool._get_new_connection()
+ self._generation = self._pool._generation
+ self._maybe_cancel_inactive_callback()
+ self._setup_inactive_callback()
+
+ async def acquire(self) -> PoolConnectionProxy:
+ if self._con is None or self._con.is_closed():
+ self._con = None
+ await self.connect()
+
+ elif self._generation != self._pool._generation:
+ # Connections have been expired, re-connect the holder.
+ self._pool._loop.create_task(
+ self._con.close(timeout=self._timeout))
+ self._con = None
+ await self.connect()
+
+ self._maybe_cancel_inactive_callback()
+
+ self._proxy = proxy = PoolConnectionProxy(self, self._con)
+
+ if self._setup is not None:
+ try:
+ await self._setup(proxy)
+ except (Exception, asyncio.CancelledError) as ex:
+ # If a user-defined `setup` function fails, we don't
+ # know if the connection is safe for re-use, hence
+ # we close it. A new connection will be created
+ # when `acquire` is called again.
+ try:
+ # Use `close()` to close the connection gracefully.
+ # An exception in `setup` isn't necessarily caused
+ # by an IO or a protocol error. close() will
+ # do the necessary cleanup via _release_on_close().
+ await self._con.close()
+ finally:
+ raise ex
+
+ self._in_use = self._pool._loop.create_future()
+
+ return proxy
+
+ async def release(self, timeout):
+ if self._in_use is None:
+ raise exceptions.InternalClientError(
+ 'PoolConnectionHolder.release() called on '
+ 'a free connection holder')
+
+ if self._con.is_closed():
+ # When closing, pool connections perform the necessary
+ # cleanup, so we don't have to do anything else here.
+ return
+
+ self._timeout = None
+
+ if self._con._protocol.queries_count >= self._max_queries:
+ # The connection has reached its maximum utilization limit,
+ # so close it. Connection.close() will call _release().
+ await self._con.close(timeout=timeout)
+ return
+
+ if self._generation != self._pool._generation:
+ # The connection has expired because it belongs to
+ # an older generation (Pool.expire_connections() has
+ # been called.)
+ await self._con.close(timeout=timeout)
+ return
+
+ try:
+ budget = timeout
+
+ if self._con._protocol._is_cancelling():
+ # If the connection is in cancellation state,
+ # wait for the cancellation
+ started = time.monotonic()
+ await compat.wait_for(
+ self._con._protocol._wait_for_cancellation(),
+ budget)
+ if budget is not None:
+ budget -= time.monotonic() - started
+
+ await self._con.reset(timeout=budget)
+ except (Exception, asyncio.CancelledError) as ex:
+ # If the `reset` call failed, terminate the connection.
+ # A new one will be created when `acquire` is called
+ # again.
+ try:
+ # An exception in `reset` is most likely caused by
+ # an IO error, so terminate the connection.
+ self._con.terminate()
+ finally:
+ raise ex
+
+ # Free this connection holder and invalidate the
+ # connection proxy.
+ self._release()
+
+ # Rearm the connection inactivity timer.
+ self._setup_inactive_callback()
+
+ async def wait_until_released(self):
+ if self._in_use is None:
+ return
+ else:
+ await self._in_use
+
+ async def close(self):
+ if self._con is not None:
+ # Connection.close() will call _release_on_close() to
+ # finish holder cleanup.
+ await self._con.close()
+
+ def terminate(self):
+ if self._con is not None:
+ # Connection.terminate() will call _release_on_close() to
+ # finish holder cleanup.
+ self._con.terminate()
+
+ def _setup_inactive_callback(self):
+ if self._inactive_callback is not None:
+ raise exceptions.InternalClientError(
+ 'pool connection inactivity timer already exists')
+
+ if self._max_inactive_time:
+ self._inactive_callback = self._pool._loop.call_later(
+ self._max_inactive_time, self._deactivate_inactive_connection)
+
+ def _maybe_cancel_inactive_callback(self):
+ if self._inactive_callback is not None:
+ self._inactive_callback.cancel()
+ self._inactive_callback = None
+
+ def _deactivate_inactive_connection(self):
+ if self._in_use is not None:
+ raise exceptions.InternalClientError(
+ 'attempting to deactivate an acquired connection')
+
+ if self._con is not None:
+ # The connection is idle and not in use, so it's fine to
+ # use terminate() instead of close().
+ self._con.terminate()
+ # Must call clear_connection, because _deactivate_connection
+ # is called when the connection is *not* checked out, and
+ # so terminate() above will not call the below.
+ self._release_on_close()
+
+ def _release_on_close(self):
+ self._maybe_cancel_inactive_callback()
+ self._release()
+ self._con = None
+
+ def _release(self):
+ """Release this connection holder."""
+ if self._in_use is None:
+ # The holder is not checked out.
+ return
+
+ if not self._in_use.done():
+ self._in_use.set_result(None)
+ self._in_use = None
+
+ # Deinitialize the connection proxy. All subsequent
+ # operations on it will fail.
+ if self._proxy is not None:
+ self._proxy._detach()
+ self._proxy = None
+
+ # Put ourselves back to the pool queue.
+ self._pool._queue.put_nowait(self)
+
+
+class Pool:
+ """A connection pool.
+
+ Connection pool can be used to manage a set of connections to the database.
+ Connections are first acquired from the pool, then used, and then released
+ back to the pool. Once a connection is released, it's reset to close all
+ open cursors and other resources *except* prepared statements.
+
+ Pools are created by calling :func:`~asyncpg.pool.create_pool`.
+ """
+
+ __slots__ = (
+ '_queue', '_loop', '_minsize', '_maxsize',
+ '_init', '_connect_args', '_connect_kwargs',
+ '_holders', '_initialized', '_initializing', '_closing',
+ '_closed', '_connection_class', '_record_class', '_generation',
+ '_setup', '_max_queries', '_max_inactive_connection_lifetime'
+ )
+
+ def __init__(self, *connect_args,
+ min_size,
+ max_size,
+ max_queries,
+ max_inactive_connection_lifetime,
+ setup,
+ init,
+ loop,
+ connection_class,
+ record_class,
+ **connect_kwargs):
+
+ if len(connect_args) > 1:
+ warnings.warn(
+ "Passing multiple positional arguments to asyncpg.Pool "
+ "constructor is deprecated and will be removed in "
+ "asyncpg 0.17.0. The non-deprecated form is "
+ "asyncpg.Pool(<dsn>, **kwargs)",
+ DeprecationWarning, stacklevel=2)
+
+ if loop is None:
+ loop = asyncio.get_event_loop()
+ self._loop = loop
+
+ if max_size <= 0:
+ raise ValueError('max_size is expected to be greater than zero')
+
+ if min_size < 0:
+ raise ValueError(
+ 'min_size is expected to be greater or equal to zero')
+
+ if min_size > max_size:
+ raise ValueError('min_size is greater than max_size')
+
+ if max_queries <= 0:
+ raise ValueError('max_queries is expected to be greater than zero')
+
+ if max_inactive_connection_lifetime < 0:
+ raise ValueError(
+ 'max_inactive_connection_lifetime is expected to be greater '
+ 'or equal to zero')
+
+ if not issubclass(connection_class, connection.Connection):
+ raise TypeError(
+ 'connection_class is expected to be a subclass of '
+ 'asyncpg.Connection, got {!r}'.format(connection_class))
+
+ if not issubclass(record_class, protocol.Record):
+ raise TypeError(
+ 'record_class is expected to be a subclass of '
+ 'asyncpg.Record, got {!r}'.format(record_class))
+
+ self._minsize = min_size
+ self._maxsize = max_size
+
+ self._holders = []
+ self._initialized = False
+ self._initializing = False
+ self._queue = None
+
+ self._connection_class = connection_class
+ self._record_class = record_class
+
+ self._closing = False
+ self._closed = False
+ self._generation = 0
+ self._init = init
+ self._connect_args = connect_args
+ self._connect_kwargs = connect_kwargs
+
+ self._setup = setup
+ self._max_queries = max_queries
+ self._max_inactive_connection_lifetime = \
+ max_inactive_connection_lifetime
+
+ async def _async__init__(self):
+ if self._initialized:
+ return
+ if self._initializing:
+ raise exceptions.InterfaceError(
+ 'pool is being initialized in another task')
+ if self._closed:
+ raise exceptions.InterfaceError('pool is closed')
+ self._initializing = True
+ try:
+ await self._initialize()
+ return self
+ finally:
+ self._initializing = False
+ self._initialized = True
+
+ async def _initialize(self):
+ self._queue = asyncio.LifoQueue(maxsize=self._maxsize)
+ for _ in range(self._maxsize):
+ ch = PoolConnectionHolder(
+ self,
+ max_queries=self._max_queries,
+ max_inactive_time=self._max_inactive_connection_lifetime,
+ setup=self._setup)
+
+ self._holders.append(ch)
+ self._queue.put_nowait(ch)
+
+ if self._minsize:
+ # Since we use a LIFO queue, the first items in the queue will be
+ # the last ones in `self._holders`. We want to pre-connect the
+ # first few connections in the queue, therefore we want to walk
+ # `self._holders` in reverse.
+
+ # Connect the first connection holder in the queue so that
+ # any connection issues are visible early.
+ first_ch = self._holders[-1] # type: PoolConnectionHolder
+ await first_ch.connect()
+
+ if self._minsize > 1:
+ connect_tasks = []
+ for i, ch in enumerate(reversed(self._holders[:-1])):
+ # `minsize - 1` because we already have first_ch
+ if i >= self._minsize - 1:
+ break
+ connect_tasks.append(ch.connect())
+
+ await asyncio.gather(*connect_tasks)
+
+ def is_closing(self):
+ """Return ``True`` if the pool is closing or is closed.
+
+ .. versionadded:: 0.28.0
+ """
+ return self._closed or self._closing
+
+ def get_size(self):
+ """Return the current number of connections in this pool.
+
+ .. versionadded:: 0.25.0
+ """
+ return sum(h.is_connected() for h in self._holders)
+
+ def get_min_size(self):
+ """Return the minimum number of connections in this pool.
+
+ .. versionadded:: 0.25.0
+ """
+ return self._minsize
+
+ def get_max_size(self):
+ """Return the maximum allowed number of connections in this pool.
+
+ .. versionadded:: 0.25.0
+ """
+ return self._maxsize
+
+ def get_idle_size(self):
+ """Return the current number of idle connections in this pool.
+
+ .. versionadded:: 0.25.0
+ """
+ return sum(h.is_connected() and h.is_idle() for h in self._holders)
+
+ def set_connect_args(self, dsn=None, **connect_kwargs):
+ r"""Set the new connection arguments for this pool.
+
+ The new connection arguments will be used for all subsequent
+ new connection attempts. Existing connections will remain until
+ they expire. Use :meth:`Pool.expire_connections()
+ <asyncpg.pool.Pool.expire_connections>` to expedite the connection
+ expiry.
+
+ :param str dsn:
+ Connection arguments specified using as a single string in
+ the following format:
+ ``postgres://user:pass@host:port/database?option=value``.
+
+ :param \*\*connect_kwargs:
+ Keyword arguments for the :func:`~asyncpg.connection.connect`
+ function.
+
+ .. versionadded:: 0.16.0
+ """
+
+ self._connect_args = [dsn]
+ self._connect_kwargs = connect_kwargs
+
+ async def _get_new_connection(self):
+ con = await connection.connect(
+ *self._connect_args,
+ loop=self._loop,
+ connection_class=self._connection_class,
+ record_class=self._record_class,
+ **self._connect_kwargs,
+ )
+
+ if self._init is not None:
+ try:
+ await self._init(con)
+ except (Exception, asyncio.CancelledError) as ex:
+ # If a user-defined `init` function fails, we don't
+ # know if the connection is safe for re-use, hence
+ # we close it. A new connection will be created
+ # when `acquire` is called again.
+ try:
+ # Use `close()` to close the connection gracefully.
+ # An exception in `init` isn't necessarily caused
+ # by an IO or a protocol error. close() will
+ # do the necessary cleanup via _release_on_close().
+ await con.close()
+ finally:
+ raise ex
+
+ return con
+
+ async def execute(self, query: str, *args, timeout: float=None) -> str:
+ """Execute an SQL command (or commands).
+
+ Pool performs this operation using one of its connections. Other than
+ that, it behaves identically to
+ :meth:`Connection.execute() <asyncpg.connection.Connection.execute>`.
+
+ .. versionadded:: 0.10.0
+ """
+ async with self.acquire() as con:
+ return await con.execute(query, *args, timeout=timeout)
+
+ async def executemany(self, command: str, args, *, timeout: float=None):
+ """Execute an SQL *command* for each sequence of arguments in *args*.
+
+ Pool performs this operation using one of its connections. Other than
+ that, it behaves identically to
+ :meth:`Connection.executemany()
+ <asyncpg.connection.Connection.executemany>`.
+
+ .. versionadded:: 0.10.0
+ """
+ async with self.acquire() as con:
+ return await con.executemany(command, args, timeout=timeout)
+
+ async def fetch(
+ self,
+ query,
+ *args,
+ timeout=None,
+ record_class=None
+ ) -> list:
+ """Run a query and return the results as a list of :class:`Record`.
+
+ Pool performs this operation using one of its connections. Other than
+ that, it behaves identically to
+ :meth:`Connection.fetch() <asyncpg.connection.Connection.fetch>`.
+
+ .. versionadded:: 0.10.0
+ """
+ async with self.acquire() as con:
+ return await con.fetch(
+ query,
+ *args,
+ timeout=timeout,
+ record_class=record_class
+ )
+
+ async def fetchval(self, query, *args, column=0, timeout=None):
+ """Run a query and return a value in the first row.
+
+ Pool performs this operation using one of its connections. Other than
+ that, it behaves identically to
+ :meth:`Connection.fetchval()
+ <asyncpg.connection.Connection.fetchval>`.
+
+ .. versionadded:: 0.10.0
+ """
+ async with self.acquire() as con:
+ return await con.fetchval(
+ query, *args, column=column, timeout=timeout)
+
+ async def fetchrow(self, query, *args, timeout=None, record_class=None):
+ """Run a query and return the first row.
+
+ Pool performs this operation using one of its connections. Other than
+ that, it behaves identically to
+ :meth:`Connection.fetchrow() <asyncpg.connection.Connection.fetchrow>`.
+
+ .. versionadded:: 0.10.0
+ """
+ async with self.acquire() as con:
+ return await con.fetchrow(
+ query,
+ *args,
+ timeout=timeout,
+ record_class=record_class
+ )
+
+ async def copy_from_table(
+ self,
+ table_name,
+ *,
+ output,
+ columns=None,
+ schema_name=None,
+ timeout=None,
+ format=None,
+ oids=None,
+ delimiter=None,
+ null=None,
+ header=None,
+ quote=None,
+ escape=None,
+ force_quote=None,
+ encoding=None
+ ):
+ """Copy table contents to a file or file-like object.
+
+ Pool performs this operation using one of its connections. Other than
+ that, it behaves identically to
+ :meth:`Connection.copy_from_table()
+ <asyncpg.connection.Connection.copy_from_table>`.
+
+ .. versionadded:: 0.24.0
+ """
+ async with self.acquire() as con:
+ return await con.copy_from_table(
+ table_name,
+ output=output,
+ columns=columns,
+ schema_name=schema_name,
+ timeout=timeout,
+ format=format,
+ oids=oids,
+ delimiter=delimiter,
+ null=null,
+ header=header,
+ quote=quote,
+ escape=escape,
+ force_quote=force_quote,
+ encoding=encoding
+ )
+
+ async def copy_from_query(
+ self,
+ query,
+ *args,
+ output,
+ timeout=None,
+ format=None,
+ oids=None,
+ delimiter=None,
+ null=None,
+ header=None,
+ quote=None,
+ escape=None,
+ force_quote=None,
+ encoding=None
+ ):
+ """Copy the results of a query to a file or file-like object.
+
+ Pool performs this operation using one of its connections. Other than
+ that, it behaves identically to
+ :meth:`Connection.copy_from_query()
+ <asyncpg.connection.Connection.copy_from_query>`.
+
+ .. versionadded:: 0.24.0
+ """
+ async with self.acquire() as con:
+ return await con.copy_from_query(
+ query,
+ *args,
+ output=output,
+ timeout=timeout,
+ format=format,
+ oids=oids,
+ delimiter=delimiter,
+ null=null,
+ header=header,
+ quote=quote,
+ escape=escape,
+ force_quote=force_quote,
+ encoding=encoding
+ )
+
+ async def copy_to_table(
+ self,
+ table_name,
+ *,
+ source,
+ columns=None,
+ schema_name=None,
+ timeout=None,
+ format=None,
+ oids=None,
+ freeze=None,
+ delimiter=None,
+ null=None,
+ header=None,
+ quote=None,
+ escape=None,
+ force_quote=None,
+ force_not_null=None,
+ force_null=None,
+ encoding=None,
+ where=None
+ ):
+ """Copy data to the specified table.
+
+ Pool performs this operation using one of its connections. Other than
+ that, it behaves identically to
+ :meth:`Connection.copy_to_table()
+ <asyncpg.connection.Connection.copy_to_table>`.
+
+ .. versionadded:: 0.24.0
+ """
+ async with self.acquire() as con:
+ return await con.copy_to_table(
+ table_name,
+ source=source,
+ columns=columns,
+ schema_name=schema_name,
+ timeout=timeout,
+ format=format,
+ oids=oids,
+ freeze=freeze,
+ delimiter=delimiter,
+ null=null,
+ header=header,
+ quote=quote,
+ escape=escape,
+ force_quote=force_quote,
+ force_not_null=force_not_null,
+ force_null=force_null,
+ encoding=encoding,
+ where=where
+ )
+
+ async def copy_records_to_table(
+ self,
+ table_name,
+ *,
+ records,
+ columns=None,
+ schema_name=None,
+ timeout=None,
+ where=None
+ ):
+ """Copy a list of records to the specified table using binary COPY.
+
+ Pool performs this operation using one of its connections. Other than
+ that, it behaves identically to
+ :meth:`Connection.copy_records_to_table()
+ <asyncpg.connection.Connection.copy_records_to_table>`.
+
+ .. versionadded:: 0.24.0
+ """
+ async with self.acquire() as con:
+ return await con.copy_records_to_table(
+ table_name,
+ records=records,
+ columns=columns,
+ schema_name=schema_name,
+ timeout=timeout,
+ where=where
+ )
+
+ def acquire(self, *, timeout=None):
+ """Acquire a database connection from the pool.
+
+ :param float timeout: A timeout for acquiring a Connection.
+ :return: An instance of :class:`~asyncpg.connection.Connection`.
+
+ Can be used in an ``await`` expression or with an ``async with`` block.
+
+ .. code-block:: python
+
+ async with pool.acquire() as con:
+ await con.execute(...)
+
+ Or:
+
+ .. code-block:: python
+
+ con = await pool.acquire()
+ try:
+ await con.execute(...)
+ finally:
+ await pool.release(con)
+ """
+ return PoolAcquireContext(self, timeout)
+
+ async def _acquire(self, timeout):
+ async def _acquire_impl():
+ ch = await self._queue.get() # type: PoolConnectionHolder
+ try:
+ proxy = await ch.acquire() # type: PoolConnectionProxy
+ except (Exception, asyncio.CancelledError):
+ self._queue.put_nowait(ch)
+ raise
+ else:
+ # Record the timeout, as we will apply it by default
+ # in release().
+ ch._timeout = timeout
+ return proxy
+
+ if self._closing:
+ raise exceptions.InterfaceError('pool is closing')
+ self._check_init()
+
+ if timeout is None:
+ return await _acquire_impl()
+ else:
+ return await compat.wait_for(
+ _acquire_impl(), timeout=timeout)
+
+ async def release(self, connection, *, timeout=None):
+ """Release a database connection back to the pool.
+
+ :param Connection connection:
+ A :class:`~asyncpg.connection.Connection` object to release.
+ :param float timeout:
+ A timeout for releasing the connection. If not specified, defaults
+ to the timeout provided in the corresponding call to the
+ :meth:`Pool.acquire() <asyncpg.pool.Pool.acquire>` method.
+
+ .. versionchanged:: 0.14.0
+ Added the *timeout* parameter.
+ """
+ if (type(connection) is not PoolConnectionProxy or
+ connection._holder._pool is not self):
+ raise exceptions.InterfaceError(
+ 'Pool.release() received invalid connection: '
+ '{connection!r} is not a member of this pool'.format(
+ connection=connection))
+
+ if connection._con is None:
+ # Already released, do nothing.
+ return
+
+ self._check_init()
+
+ # Let the connection do its internal housekeeping when its released.
+ connection._con._on_release()
+
+ ch = connection._holder
+ if timeout is None:
+ timeout = ch._timeout
+
+ # Use asyncio.shield() to guarantee that task cancellation
+ # does not prevent the connection from being returned to the
+ # pool properly.
+ return await asyncio.shield(ch.release(timeout))
+
+ async def close(self):
+ """Attempt to gracefully close all connections in the pool.
+
+ Wait until all pool connections are released, close them and
+ shut down the pool. If any error (including cancellation) occurs
+ in ``close()`` the pool will terminate by calling
+ :meth:`Pool.terminate() <pool.Pool.terminate>`.
+
+ It is advisable to use :func:`python:asyncio.wait_for` to set
+ a timeout.
+
+ .. versionchanged:: 0.16.0
+ ``close()`` now waits until all pool connections are released
+ before closing them and the pool. Errors raised in ``close()``
+ will cause immediate pool termination.
+ """
+ if self._closed:
+ return
+ self._check_init()
+
+ self._closing = True
+
+ warning_callback = None
+ try:
+ warning_callback = self._loop.call_later(
+ 60, self._warn_on_long_close)
+
+ release_coros = [
+ ch.wait_until_released() for ch in self._holders]
+ await asyncio.gather(*release_coros)
+
+ close_coros = [
+ ch.close() for ch in self._holders]
+ await asyncio.gather(*close_coros)
+
+ except (Exception, asyncio.CancelledError):
+ self.terminate()
+ raise
+
+ finally:
+ if warning_callback is not None:
+ warning_callback.cancel()
+ self._closed = True
+ self._closing = False
+
+ def _warn_on_long_close(self):
+ logger.warning('Pool.close() is taking over 60 seconds to complete. '
+ 'Check if you have any unreleased connections left. '
+ 'Use asyncio.wait_for() to set a timeout for '
+ 'Pool.close().')
+
+ def terminate(self):
+ """Terminate all connections in the pool."""
+ if self._closed:
+ return
+ self._check_init()
+ for ch in self._holders:
+ ch.terminate()
+ self._closed = True
+
+ async def expire_connections(self):
+ """Expire all currently open connections.
+
+ Cause all currently open connections to get replaced on the
+ next :meth:`~asyncpg.pool.Pool.acquire()` call.
+
+ .. versionadded:: 0.16.0
+ """
+ self._generation += 1
+
+ def _check_init(self):
+ if not self._initialized:
+ if self._initializing:
+ raise exceptions.InterfaceError(
+ 'pool is being initialized, but not yet ready: '
+ 'likely there is a race between creating a pool and '
+ 'using it')
+ raise exceptions.InterfaceError('pool is not initialized')
+ if self._closed:
+ raise exceptions.InterfaceError('pool is closed')
+
+ def _drop_statement_cache(self):
+ # Drop statement cache for all connections in the pool.
+ for ch in self._holders:
+ if ch._con is not None:
+ ch._con._drop_local_statement_cache()
+
+ def _drop_type_cache(self):
+ # Drop type codec cache for all connections in the pool.
+ for ch in self._holders:
+ if ch._con is not None:
+ ch._con._drop_local_type_cache()
+
+ def __await__(self):
+ return self._async__init__().__await__()
+
+ async def __aenter__(self):
+ await self._async__init__()
+ return self
+
+ async def __aexit__(self, *exc):
+ await self.close()
+
+
+class PoolAcquireContext:
+
+ __slots__ = ('timeout', 'connection', 'done', 'pool')
+
+ def __init__(self, pool, timeout):
+ self.pool = pool
+ self.timeout = timeout
+ self.connection = None
+ self.done = False
+
+ async def __aenter__(self):
+ if self.connection is not None or self.done:
+ raise exceptions.InterfaceError('a connection is already acquired')
+ self.connection = await self.pool._acquire(self.timeout)
+ return self.connection
+
+ async def __aexit__(self, *exc):
+ self.done = True
+ con = self.connection
+ self.connection = None
+ await self.pool.release(con)
+
+ def __await__(self):
+ self.done = True
+ return self.pool._acquire(self.timeout).__await__()
+
+
+def create_pool(dsn=None, *,
+ min_size=10,
+ max_size=10,
+ max_queries=50000,
+ max_inactive_connection_lifetime=300.0,
+ setup=None,
+ init=None,
+ loop=None,
+ connection_class=connection.Connection,
+ record_class=protocol.Record,
+ **connect_kwargs):
+ r"""Create a connection pool.
+
+ Can be used either with an ``async with`` block:
+
+ .. code-block:: python
+
+ async with asyncpg.create_pool(user='postgres',
+ command_timeout=60) as pool:
+ await pool.fetch('SELECT 1')
+
+ Or to perform multiple operations on a single connection:
+
+ .. code-block:: python
+
+ async with asyncpg.create_pool(user='postgres',
+ command_timeout=60) as pool:
+ async with pool.acquire() as con:
+ await con.execute('''
+ CREATE TABLE names (
+ id serial PRIMARY KEY,
+ name VARCHAR (255) NOT NULL)
+ ''')
+ await con.fetch('SELECT 1')
+
+ Or directly with ``await`` (not recommended):
+
+ .. code-block:: python
+
+ pool = await asyncpg.create_pool(user='postgres', command_timeout=60)
+ con = await pool.acquire()
+ try:
+ await con.fetch('SELECT 1')
+ finally:
+ await pool.release(con)
+
+ .. warning::
+ Prepared statements and cursors returned by
+ :meth:`Connection.prepare() <asyncpg.connection.Connection.prepare>`
+ and :meth:`Connection.cursor() <asyncpg.connection.Connection.cursor>`
+ become invalid once the connection is released. Likewise, all
+ notification and log listeners are removed, and ``asyncpg`` will
+ issue a warning if there are any listener callbacks registered on a
+ connection that is being released to the pool.
+
+ :param str dsn:
+ Connection arguments specified using as a single string in
+ the following format:
+ ``postgres://user:pass@host:port/database?option=value``.
+
+ :param \*\*connect_kwargs:
+ Keyword arguments for the :func:`~asyncpg.connection.connect`
+ function.
+
+ :param Connection connection_class:
+ The class to use for connections. Must be a subclass of
+ :class:`~asyncpg.connection.Connection`.
+
+ :param type record_class:
+ If specified, the class to use for records returned by queries on
+ the connections in this pool. Must be a subclass of
+ :class:`~asyncpg.Record`.
+
+ :param int min_size:
+ Number of connection the pool will be initialized with.
+
+ :param int max_size:
+ Max number of connections in the pool.
+
+ :param int max_queries:
+ Number of queries after a connection is closed and replaced
+ with a new connection.
+
+ :param float max_inactive_connection_lifetime:
+ Number of seconds after which inactive connections in the
+ pool will be closed. Pass ``0`` to disable this mechanism.
+
+ :param coroutine setup:
+ A coroutine to prepare a connection right before it is returned
+ from :meth:`Pool.acquire() <pool.Pool.acquire>`. An example use
+ case would be to automatically set up notifications listeners for
+ all connections of a pool.
+
+ :param coroutine init:
+ A coroutine to initialize a connection when it is created.
+ An example use case would be to setup type codecs with
+ :meth:`Connection.set_builtin_type_codec() <\
+ asyncpg.connection.Connection.set_builtin_type_codec>`
+ or :meth:`Connection.set_type_codec() <\
+ asyncpg.connection.Connection.set_type_codec>`.
+
+ :param loop:
+ An asyncio event loop instance. If ``None``, the default
+ event loop will be used.
+
+ :return: An instance of :class:`~asyncpg.pool.Pool`.
+
+ .. versionchanged:: 0.10.0
+ An :exc:`~asyncpg.exceptions.InterfaceError` will be raised on any
+ attempted operation on a released connection.
+
+ .. versionchanged:: 0.13.0
+ An :exc:`~asyncpg.exceptions.InterfaceError` will be raised on any
+ attempted operation on a prepared statement or a cursor created
+ on a connection that has been released to the pool.
+
+ .. versionchanged:: 0.13.0
+ An :exc:`~asyncpg.exceptions.InterfaceWarning` will be produced
+ if there are any active listeners (added via
+ :meth:`Connection.add_listener()
+ <asyncpg.connection.Connection.add_listener>`
+ or :meth:`Connection.add_log_listener()
+ <asyncpg.connection.Connection.add_log_listener>`) present on the
+ connection at the moment of its release to the pool.
+
+ .. versionchanged:: 0.22.0
+ Added the *record_class* parameter.
+ """
+ return Pool(
+ dsn,
+ connection_class=connection_class,
+ record_class=record_class,
+ min_size=min_size, max_size=max_size,
+ max_queries=max_queries, loop=loop, setup=setup, init=init,
+ max_inactive_connection_lifetime=max_inactive_connection_lifetime,
+ **connect_kwargs)