1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
|
# Copyright (C) 2016-present the asyncpg authors and contributors
# <see AUTHORS file>
#
# This module is part of asyncpg and is released under
# the Apache 2.0 License: http://www.apache.org/licenses/LICENSE-2.0
import json
from . import connresource
from . import cursor
from . import exceptions
class PreparedStatement(connresource.ConnectionResource):
"""A representation of a prepared statement."""
__slots__ = ('_state', '_query', '_last_status')
def __init__(self, connection, query, state):
super().__init__(connection)
self._state = state
self._query = query
state.attach()
self._last_status = None
@connresource.guarded
def get_name(self) -> str:
"""Return the name of this prepared statement.
.. versionadded:: 0.25.0
"""
return self._state.name
@connresource.guarded
def get_query(self) -> str:
"""Return the text of the query for this prepared statement.
Example::
stmt = await connection.prepare('SELECT $1::int')
assert stmt.get_query() == "SELECT $1::int"
"""
return self._query
@connresource.guarded
def get_statusmsg(self) -> str:
"""Return the status of the executed command.
Example::
stmt = await connection.prepare('CREATE TABLE mytab (a int)')
await stmt.fetch()
assert stmt.get_statusmsg() == "CREATE TABLE"
"""
if self._last_status is None:
return self._last_status
return self._last_status.decode()
@connresource.guarded
def get_parameters(self):
"""Return a description of statement parameters types.
:return: A tuple of :class:`asyncpg.types.Type`.
Example::
stmt = await connection.prepare('SELECT ($1::int, $2::text)')
print(stmt.get_parameters())
# Will print:
# (Type(oid=23, name='int4', kind='scalar', schema='pg_catalog'),
# Type(oid=25, name='text', kind='scalar', schema='pg_catalog'))
"""
return self._state._get_parameters()
@connresource.guarded
def get_attributes(self):
"""Return a description of relation attributes (columns).
:return: A tuple of :class:`asyncpg.types.Attribute`.
Example::
st = await self.con.prepare('''
SELECT typname, typnamespace FROM pg_type
''')
print(st.get_attributes())
# Will print:
# (Attribute(
# name='typname',
# type=Type(oid=19, name='name', kind='scalar',
# schema='pg_catalog')),
# Attribute(
# name='typnamespace',
# type=Type(oid=26, name='oid', kind='scalar',
# schema='pg_catalog')))
"""
return self._state._get_attributes()
@connresource.guarded
def cursor(self, *args, prefetch=None,
timeout=None) -> cursor.CursorFactory:
"""Return a *cursor factory* for the prepared statement.
:param args: Query arguments.
:param int prefetch: The number of rows the *cursor iterator*
will prefetch (defaults to ``50``.)
:param float timeout: Optional timeout in seconds.
:return: A :class:`~cursor.CursorFactory` object.
"""
return cursor.CursorFactory(
self._connection,
self._query,
self._state,
args,
prefetch,
timeout,
self._state.record_class,
)
@connresource.guarded
async def explain(self, *args, analyze=False):
"""Return the execution plan of the statement.
:param args: Query arguments.
:param analyze: If ``True``, the statement will be executed and
the run time statitics added to the return value.
:return: An object representing the execution plan. This value
is actually a deserialized JSON output of the SQL
``EXPLAIN`` command.
"""
query = 'EXPLAIN (FORMAT JSON, VERBOSE'
if analyze:
query += ', ANALYZE) '
else:
query += ') '
query += self._state.query
if analyze:
# From PostgreSQL docs:
# Important: Keep in mind that the statement is actually
# executed when the ANALYZE option is used. Although EXPLAIN
# will discard any output that a SELECT would return, other
# side effects of the statement will happen as usual. If you
# wish to use EXPLAIN ANALYZE on an INSERT, UPDATE, DELETE,
# CREATE TABLE AS, or EXECUTE statement without letting the
# command affect your data, use this approach:
# BEGIN;
# EXPLAIN ANALYZE ...;
# ROLLBACK;
tr = self._connection.transaction()
await tr.start()
try:
data = await self._connection.fetchval(query, *args)
finally:
await tr.rollback()
else:
data = await self._connection.fetchval(query, *args)
return json.loads(data)
@connresource.guarded
async def fetch(self, *args, timeout=None):
r"""Execute the statement and return a list of :class:`Record` objects.
:param str query: Query text
:param args: Query arguments
:param float timeout: Optional timeout value in seconds.
:return: A list of :class:`Record` instances.
"""
data = await self.__bind_execute(args, 0, timeout)
return data
@connresource.guarded
async def fetchval(self, *args, column=0, timeout=None):
"""Execute the statement and return a value in the first row.
:param args: Query arguments.
:param int column: Numeric index within the record of the value to
return (defaults to 0).
:param float timeout: Optional timeout value in seconds.
If not specified, defaults to the value of
``command_timeout`` argument to the ``Connection``
instance constructor.
:return: The value of the specified column of the first record.
"""
data = await self.__bind_execute(args, 1, timeout)
if not data:
return None
return data[0][column]
@connresource.guarded
async def fetchrow(self, *args, timeout=None):
"""Execute the statement and return the first row.
:param str query: Query text
:param args: Query arguments
:param float timeout: Optional timeout value in seconds.
:return: The first row as a :class:`Record` instance.
"""
data = await self.__bind_execute(args, 1, timeout)
if not data:
return None
return data[0]
@connresource.guarded
async def executemany(self, args, *, timeout: float=None):
"""Execute the statement for each sequence of arguments in *args*.
:param args: An iterable containing sequences of arguments.
:param float timeout: Optional timeout value in seconds.
:return None: This method discards the results of the operations.
.. versionadded:: 0.22.0
"""
return await self.__do_execute(
lambda protocol: protocol.bind_execute_many(
self._state, args, '', timeout))
async def __do_execute(self, executor):
protocol = self._connection._protocol
try:
return await executor(protocol)
except exceptions.OutdatedSchemaCacheError:
await self._connection.reload_schema_state()
# We can not find all manually created prepared statements, so just
# drop known cached ones in the `self._connection`.
# Other manually created prepared statements will fail and
# invalidate themselves (unfortunately, clearing caches again).
self._state.mark_closed()
raise
async def __bind_execute(self, args, limit, timeout):
data, status, _ = await self.__do_execute(
lambda protocol: protocol.bind_execute(
self._state, args, '', limit, True, timeout))
self._last_status = status
return data
def _check_open(self, meth_name):
if self._state.closed:
raise exceptions.InterfaceError(
'cannot call PreparedStmt.{}(): '
'the prepared statement is closed'.format(meth_name))
def _check_conn_validity(self, meth_name):
self._check_open(meth_name)
super()._check_conn_validity(meth_name)
def __del__(self):
self._state.detach()
self._connection._maybe_gc_stmt(self._state)
|