about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/asyncpg/prepared_stmt.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/asyncpg/prepared_stmt.py')
-rw-r--r--.venv/lib/python3.12/site-packages/asyncpg/prepared_stmt.py259
1 files changed, 259 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/asyncpg/prepared_stmt.py b/.venv/lib/python3.12/site-packages/asyncpg/prepared_stmt.py
new file mode 100644
index 00000000..8e241d67
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/asyncpg/prepared_stmt.py
@@ -0,0 +1,259 @@
+# Copyright (C) 2016-present the asyncpg authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of asyncpg and is released under
+# the Apache 2.0 License: http://www.apache.org/licenses/LICENSE-2.0
+
+
+import json
+
+from . import connresource
+from . import cursor
+from . import exceptions
+
+
+class PreparedStatement(connresource.ConnectionResource):
+    """A representation of a prepared statement."""
+
+    __slots__ = ('_state', '_query', '_last_status')
+
+    def __init__(self, connection, query, state):
+        super().__init__(connection)
+        self._state = state
+        self._query = query
+        state.attach()
+        self._last_status = None
+
+    @connresource.guarded
+    def get_name(self) -> str:
+        """Return the name of this prepared statement.
+
+        .. versionadded:: 0.25.0
+        """
+        return self._state.name
+
+    @connresource.guarded
+    def get_query(self) -> str:
+        """Return the text of the query for this prepared statement.
+
+        Example::
+
+            stmt = await connection.prepare('SELECT $1::int')
+            assert stmt.get_query() == "SELECT $1::int"
+        """
+        return self._query
+
+    @connresource.guarded
+    def get_statusmsg(self) -> str:
+        """Return the status of the executed command.
+
+        Example::
+
+            stmt = await connection.prepare('CREATE TABLE mytab (a int)')
+            await stmt.fetch()
+            assert stmt.get_statusmsg() == "CREATE TABLE"
+        """
+        if self._last_status is None:
+            return self._last_status
+        return self._last_status.decode()
+
+    @connresource.guarded
+    def get_parameters(self):
+        """Return a description of statement parameters types.
+
+        :return: A tuple of :class:`asyncpg.types.Type`.
+
+        Example::
+
+            stmt = await connection.prepare('SELECT ($1::int, $2::text)')
+            print(stmt.get_parameters())
+
+            # Will print:
+            #   (Type(oid=23, name='int4', kind='scalar', schema='pg_catalog'),
+            #    Type(oid=25, name='text', kind='scalar', schema='pg_catalog'))
+        """
+        return self._state._get_parameters()
+
+    @connresource.guarded
+    def get_attributes(self):
+        """Return a description of relation attributes (columns).
+
+        :return: A tuple of :class:`asyncpg.types.Attribute`.
+
+        Example::
+
+            st = await self.con.prepare('''
+                SELECT typname, typnamespace FROM pg_type
+            ''')
+            print(st.get_attributes())
+
+            # Will print:
+            #   (Attribute(
+            #       name='typname',
+            #       type=Type(oid=19, name='name', kind='scalar',
+            #                 schema='pg_catalog')),
+            #    Attribute(
+            #       name='typnamespace',
+            #       type=Type(oid=26, name='oid', kind='scalar',
+            #                 schema='pg_catalog')))
+        """
+        return self._state._get_attributes()
+
+    @connresource.guarded
+    def cursor(self, *args, prefetch=None,
+               timeout=None) -> cursor.CursorFactory:
+        """Return a *cursor factory* for the prepared statement.
+
+        :param args: Query arguments.
+        :param int prefetch: The number of rows the *cursor iterator*
+                             will prefetch (defaults to ``50``.)
+        :param float timeout: Optional timeout in seconds.
+
+        :return: A :class:`~cursor.CursorFactory` object.
+        """
+        return cursor.CursorFactory(
+            self._connection,
+            self._query,
+            self._state,
+            args,
+            prefetch,
+            timeout,
+            self._state.record_class,
+        )
+
+    @connresource.guarded
+    async def explain(self, *args, analyze=False):
+        """Return the execution plan of the statement.
+
+        :param args: Query arguments.
+        :param analyze: If ``True``, the statement will be executed and
+                        the run time statitics added to the return value.
+
+        :return: An object representing the execution plan.  This value
+                 is actually a deserialized JSON output of the SQL
+                 ``EXPLAIN`` command.
+        """
+        query = 'EXPLAIN (FORMAT JSON, VERBOSE'
+        if analyze:
+            query += ', ANALYZE) '
+        else:
+            query += ') '
+        query += self._state.query
+
+        if analyze:
+            # From PostgreSQL docs:
+            # Important: Keep in mind that the statement is actually
+            # executed when the ANALYZE option is used. Although EXPLAIN
+            # will discard any output that a SELECT would return, other
+            # side effects of the statement will happen as usual. If you
+            # wish to use EXPLAIN ANALYZE on an INSERT, UPDATE, DELETE,
+            # CREATE TABLE AS, or EXECUTE statement without letting the
+            # command affect your data, use this approach:
+            #     BEGIN;
+            #     EXPLAIN ANALYZE ...;
+            #     ROLLBACK;
+            tr = self._connection.transaction()
+            await tr.start()
+            try:
+                data = await self._connection.fetchval(query, *args)
+            finally:
+                await tr.rollback()
+        else:
+            data = await self._connection.fetchval(query, *args)
+
+        return json.loads(data)
+
+    @connresource.guarded
+    async def fetch(self, *args, timeout=None):
+        r"""Execute the statement and return a list of :class:`Record` objects.
+
+        :param str query: Query text
+        :param args: Query arguments
+        :param float timeout: Optional timeout value in seconds.
+
+        :return: A list of :class:`Record` instances.
+        """
+        data = await self.__bind_execute(args, 0, timeout)
+        return data
+
+    @connresource.guarded
+    async def fetchval(self, *args, column=0, timeout=None):
+        """Execute the statement and return a value in the first row.
+
+        :param args: Query arguments.
+        :param int column: Numeric index within the record of the value to
+                           return (defaults to 0).
+        :param float timeout: Optional timeout value in seconds.
+                            If not specified, defaults to the value of
+                            ``command_timeout`` argument to the ``Connection``
+                            instance constructor.
+
+        :return: The value of the specified column of the first record.
+        """
+        data = await self.__bind_execute(args, 1, timeout)
+        if not data:
+            return None
+        return data[0][column]
+
+    @connresource.guarded
+    async def fetchrow(self, *args, timeout=None):
+        """Execute the statement and return the first row.
+
+        :param str query: Query text
+        :param args: Query arguments
+        :param float timeout: Optional timeout value in seconds.
+
+        :return: The first row as a :class:`Record` instance.
+        """
+        data = await self.__bind_execute(args, 1, timeout)
+        if not data:
+            return None
+        return data[0]
+
+    @connresource.guarded
+    async def executemany(self, args, *, timeout: float=None):
+        """Execute the statement for each sequence of arguments in *args*.
+
+        :param args: An iterable containing sequences of arguments.
+        :param float timeout: Optional timeout value in seconds.
+        :return None: This method discards the results of the operations.
+
+        .. versionadded:: 0.22.0
+        """
+        return await self.__do_execute(
+            lambda protocol: protocol.bind_execute_many(
+                self._state, args, '', timeout))
+
+    async def __do_execute(self, executor):
+        protocol = self._connection._protocol
+        try:
+            return await executor(protocol)
+        except exceptions.OutdatedSchemaCacheError:
+            await self._connection.reload_schema_state()
+            # We can not find all manually created prepared statements, so just
+            # drop known cached ones in the `self._connection`.
+            # Other manually created prepared statements will fail and
+            # invalidate themselves (unfortunately, clearing caches again).
+            self._state.mark_closed()
+            raise
+
+    async def __bind_execute(self, args, limit, timeout):
+        data, status, _ = await self.__do_execute(
+            lambda protocol: protocol.bind_execute(
+                self._state, args, '', limit, True, timeout))
+        self._last_status = status
+        return data
+
+    def _check_open(self, meth_name):
+        if self._state.closed:
+            raise exceptions.InterfaceError(
+                'cannot call PreparedStmt.{}(): '
+                'the prepared statement is closed'.format(meth_name))
+
+    def _check_conn_validity(self, meth_name):
+        self._check_open(meth_name)
+        super()._check_conn_validity(meth_name)
+
+    def __del__(self):
+        self._state.detach()
+        self._connection._maybe_gc_stmt(self._state)