aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/psycopg2/sql.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/psycopg2/sql.py')
-rw-r--r--.venv/lib/python3.12/site-packages/psycopg2/sql.py455
1 files changed, 455 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/psycopg2/sql.py b/.venv/lib/python3.12/site-packages/psycopg2/sql.py
new file mode 100644
index 00000000..69b352b7
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/psycopg2/sql.py
@@ -0,0 +1,455 @@
+"""SQL composition utility module
+"""
+
+# psycopg/sql.py - SQL composition utility module
+#
+# Copyright (C) 2016-2019 Daniele Varrazzo <daniele.varrazzo@gmail.com>
+# Copyright (C) 2020-2021 The Psycopg Team
+#
+# psycopg2 is free software: you can redistribute it and/or modify it
+# under the terms of the GNU Lesser General Public License as published
+# by the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# In addition, as a special exception, the copyright holders give
+# permission to link this program with the OpenSSL library (or with
+# modified versions of OpenSSL that use the same license as OpenSSL),
+# and distribute linked combinations including the two.
+#
+# You must obey the GNU Lesser General Public License in all respects for
+# all of the code used other than OpenSSL.
+#
+# psycopg2 is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
+# License for more details.
+
+import string
+
+from psycopg2 import extensions as ext
+
+
+_formatter = string.Formatter()
+
+
+class Composable:
+ """
+ Abstract base class for objects that can be used to compose an SQL string.
+
+ `!Composable` objects can be passed directly to `~cursor.execute()`,
+ `~cursor.executemany()`, `~cursor.copy_expert()` in place of the query
+ string.
+
+ `!Composable` objects can be joined using the ``+`` operator: the result
+ will be a `Composed` instance containing the objects joined. The operator
+ ``*`` is also supported with an integer argument: the result is a
+ `!Composed` instance containing the left argument repeated as many times as
+ requested.
+ """
+ def __init__(self, wrapped):
+ self._wrapped = wrapped
+
+ def __repr__(self):
+ return f"{self.__class__.__name__}({self._wrapped!r})"
+
+ def as_string(self, context):
+ """
+ Return the string value of the object.
+
+ :param context: the context to evaluate the string into.
+ :type context: `connection` or `cursor`
+
+ The method is automatically invoked by `~cursor.execute()`,
+ `~cursor.executemany()`, `~cursor.copy_expert()` if a `!Composable` is
+ passed instead of the query string.
+ """
+ raise NotImplementedError
+
+ def __add__(self, other):
+ if isinstance(other, Composed):
+ return Composed([self]) + other
+ if isinstance(other, Composable):
+ return Composed([self]) + Composed([other])
+ else:
+ return NotImplemented
+
+ def __mul__(self, n):
+ return Composed([self] * n)
+
+ def __eq__(self, other):
+ return type(self) is type(other) and self._wrapped == other._wrapped
+
+ def __ne__(self, other):
+ return not self.__eq__(other)
+
+
+class Composed(Composable):
+ """
+ A `Composable` object made of a sequence of `!Composable`.
+
+ The object is usually created using `!Composable` operators and methods.
+ However it is possible to create a `!Composed` directly specifying a
+ sequence of `!Composable` as arguments.
+
+ Example::
+
+ >>> comp = sql.Composed(
+ ... [sql.SQL("insert into "), sql.Identifier("table")])
+ >>> print(comp.as_string(conn))
+ insert into "table"
+
+ `!Composed` objects are iterable (so they can be used in `SQL.join` for
+ instance).
+ """
+ def __init__(self, seq):
+ wrapped = []
+ for i in seq:
+ if not isinstance(i, Composable):
+ raise TypeError(
+ f"Composed elements must be Composable, got {i!r} instead")
+ wrapped.append(i)
+
+ super().__init__(wrapped)
+
+ @property
+ def seq(self):
+ """The list of the content of the `!Composed`."""
+ return list(self._wrapped)
+
+ def as_string(self, context):
+ rv = []
+ for i in self._wrapped:
+ rv.append(i.as_string(context))
+ return ''.join(rv)
+
+ def __iter__(self):
+ return iter(self._wrapped)
+
+ def __add__(self, other):
+ if isinstance(other, Composed):
+ return Composed(self._wrapped + other._wrapped)
+ if isinstance(other, Composable):
+ return Composed(self._wrapped + [other])
+ else:
+ return NotImplemented
+
+ def join(self, joiner):
+ """
+ Return a new `!Composed` interposing the *joiner* with the `!Composed` items.
+
+ The *joiner* must be a `SQL` or a string which will be interpreted as
+ an `SQL`.
+
+ Example::
+
+ >>> fields = sql.Identifier('foo') + sql.Identifier('bar') # a Composed
+ >>> print(fields.join(', ').as_string(conn))
+ "foo", "bar"
+
+ """
+ if isinstance(joiner, str):
+ joiner = SQL(joiner)
+ elif not isinstance(joiner, SQL):
+ raise TypeError(
+ "Composed.join() argument must be a string or an SQL")
+
+ return joiner.join(self)
+
+
+class SQL(Composable):
+ """
+ A `Composable` representing a snippet of SQL statement.
+
+ `!SQL` exposes `join()` and `format()` methods useful to create a template
+ where to merge variable parts of a query (for instance field or table
+ names).
+
+ The *string* doesn't undergo any form of escaping, so it is not suitable to
+ represent variable identifiers or values: you should only use it to pass
+ constant strings representing templates or snippets of SQL statements; use
+ other objects such as `Identifier` or `Literal` to represent variable
+ parts.
+
+ Example::
+
+ >>> query = sql.SQL("select {0} from {1}").format(
+ ... sql.SQL(', ').join([sql.Identifier('foo'), sql.Identifier('bar')]),
+ ... sql.Identifier('table'))
+ >>> print(query.as_string(conn))
+ select "foo", "bar" from "table"
+ """
+ def __init__(self, string):
+ if not isinstance(string, str):
+ raise TypeError("SQL values must be strings")
+ super().__init__(string)
+
+ @property
+ def string(self):
+ """The string wrapped by the `!SQL` object."""
+ return self._wrapped
+
+ def as_string(self, context):
+ return self._wrapped
+
+ def format(self, *args, **kwargs):
+ """
+ Merge `Composable` objects into a template.
+
+ :param `Composable` args: parameters to replace to numbered
+ (``{0}``, ``{1}``) or auto-numbered (``{}``) placeholders
+ :param `Composable` kwargs: parameters to replace to named (``{name}``)
+ placeholders
+ :return: the union of the `!SQL` string with placeholders replaced
+ :rtype: `Composed`
+
+ The method is similar to the Python `str.format()` method: the string
+ template supports auto-numbered (``{}``), numbered (``{0}``,
+ ``{1}``...), and named placeholders (``{name}``), with positional
+ arguments replacing the numbered placeholders and keywords replacing
+ the named ones. However placeholder modifiers (``{0!r}``, ``{0:<10}``)
+ are not supported. Only `!Composable` objects can be passed to the
+ template.
+
+ Example::
+
+ >>> print(sql.SQL("select * from {} where {} = %s")
+ ... .format(sql.Identifier('people'), sql.Identifier('id'))
+ ... .as_string(conn))
+ select * from "people" where "id" = %s
+
+ >>> print(sql.SQL("select * from {tbl} where {pkey} = %s")
+ ... .format(tbl=sql.Identifier('people'), pkey=sql.Identifier('id'))
+ ... .as_string(conn))
+ select * from "people" where "id" = %s
+
+ """
+ rv = []
+ autonum = 0
+ for pre, name, spec, conv in _formatter.parse(self._wrapped):
+ if spec:
+ raise ValueError("no format specification supported by SQL")
+ if conv:
+ raise ValueError("no format conversion supported by SQL")
+ if pre:
+ rv.append(SQL(pre))
+
+ if name is None:
+ continue
+
+ if name.isdigit():
+ if autonum:
+ raise ValueError(
+ "cannot switch from automatic field numbering to manual")
+ rv.append(args[int(name)])
+ autonum = None
+
+ elif not name:
+ if autonum is None:
+ raise ValueError(
+ "cannot switch from manual field numbering to automatic")
+ rv.append(args[autonum])
+ autonum += 1
+
+ else:
+ rv.append(kwargs[name])
+
+ return Composed(rv)
+
+ def join(self, seq):
+ """
+ Join a sequence of `Composable`.
+
+ :param seq: the elements to join.
+ :type seq: iterable of `!Composable`
+
+ Use the `!SQL` object's *string* to separate the elements in *seq*.
+ Note that `Composed` objects are iterable too, so they can be used as
+ argument for this method.
+
+ Example::
+
+ >>> snip = sql.SQL(', ').join(
+ ... sql.Identifier(n) for n in ['foo', 'bar', 'baz'])
+ >>> print(snip.as_string(conn))
+ "foo", "bar", "baz"
+ """
+ rv = []
+ it = iter(seq)
+ try:
+ rv.append(next(it))
+ except StopIteration:
+ pass
+ else:
+ for i in it:
+ rv.append(self)
+ rv.append(i)
+
+ return Composed(rv)
+
+
+class Identifier(Composable):
+ """
+ A `Composable` representing an SQL identifier or a dot-separated sequence.
+
+ Identifiers usually represent names of database objects, such as tables or
+ fields. PostgreSQL identifiers follow `different rules`__ than SQL string
+ literals for escaping (e.g. they use double quotes instead of single).
+
+ .. __: https://www.postgresql.org/docs/current/static/sql-syntax-lexical.html# \
+ SQL-SYNTAX-IDENTIFIERS
+
+ Example::
+
+ >>> t1 = sql.Identifier("foo")
+ >>> t2 = sql.Identifier("ba'r")
+ >>> t3 = sql.Identifier('ba"z')
+ >>> print(sql.SQL(', ').join([t1, t2, t3]).as_string(conn))
+ "foo", "ba'r", "ba""z"
+
+ Multiple strings can be passed to the object to represent a qualified name,
+ i.e. a dot-separated sequence of identifiers.
+
+ Example::
+
+ >>> query = sql.SQL("select {} from {}").format(
+ ... sql.Identifier("table", "field"),
+ ... sql.Identifier("schema", "table"))
+ >>> print(query.as_string(conn))
+ select "table"."field" from "schema"."table"
+
+ """
+ def __init__(self, *strings):
+ if not strings:
+ raise TypeError("Identifier cannot be empty")
+
+ for s in strings:
+ if not isinstance(s, str):
+ raise TypeError("SQL identifier parts must be strings")
+
+ super().__init__(strings)
+
+ @property
+ def strings(self):
+ """A tuple with the strings wrapped by the `Identifier`."""
+ return self._wrapped
+
+ @property
+ def string(self):
+ """The string wrapped by the `Identifier`.
+ """
+ if len(self._wrapped) == 1:
+ return self._wrapped[0]
+ else:
+ raise AttributeError(
+ "the Identifier wraps more than one than one string")
+
+ def __repr__(self):
+ return f"{self.__class__.__name__}({', '.join(map(repr, self._wrapped))})"
+
+ def as_string(self, context):
+ return '.'.join(ext.quote_ident(s, context) for s in self._wrapped)
+
+
+class Literal(Composable):
+ """
+ A `Composable` representing an SQL value to include in a query.
+
+ Usually you will want to include placeholders in the query and pass values
+ as `~cursor.execute()` arguments. If however you really really need to
+ include a literal value in the query you can use this object.
+
+ The string returned by `!as_string()` follows the normal :ref:`adaptation
+ rules <python-types-adaptation>` for Python objects.
+
+ Example::
+
+ >>> s1 = sql.Literal("foo")
+ >>> s2 = sql.Literal("ba'r")
+ >>> s3 = sql.Literal(42)
+ >>> print(sql.SQL(', ').join([s1, s2, s3]).as_string(conn))
+ 'foo', 'ba''r', 42
+
+ """
+ @property
+ def wrapped(self):
+ """The object wrapped by the `!Literal`."""
+ return self._wrapped
+
+ def as_string(self, context):
+ # is it a connection or cursor?
+ if isinstance(context, ext.connection):
+ conn = context
+ elif isinstance(context, ext.cursor):
+ conn = context.connection
+ else:
+ raise TypeError("context must be a connection or a cursor")
+
+ a = ext.adapt(self._wrapped)
+ if hasattr(a, 'prepare'):
+ a.prepare(conn)
+
+ rv = a.getquoted()
+ if isinstance(rv, bytes):
+ rv = rv.decode(ext.encodings[conn.encoding])
+
+ return rv
+
+
+class Placeholder(Composable):
+ """A `Composable` representing a placeholder for query parameters.
+
+ If the name is specified, generate a named placeholder (e.g. ``%(name)s``),
+ otherwise generate a positional placeholder (e.g. ``%s``).
+
+ The object is useful to generate SQL queries with a variable number of
+ arguments.
+
+ Examples::
+
+ >>> names = ['foo', 'bar', 'baz']
+
+ >>> q1 = sql.SQL("insert into table ({}) values ({})").format(
+ ... sql.SQL(', ').join(map(sql.Identifier, names)),
+ ... sql.SQL(', ').join(sql.Placeholder() * len(names)))
+ >>> print(q1.as_string(conn))
+ insert into table ("foo", "bar", "baz") values (%s, %s, %s)
+
+ >>> q2 = sql.SQL("insert into table ({}) values ({})").format(
+ ... sql.SQL(', ').join(map(sql.Identifier, names)),
+ ... sql.SQL(', ').join(map(sql.Placeholder, names)))
+ >>> print(q2.as_string(conn))
+ insert into table ("foo", "bar", "baz") values (%(foo)s, %(bar)s, %(baz)s)
+
+ """
+
+ def __init__(self, name=None):
+ if isinstance(name, str):
+ if ')' in name:
+ raise ValueError(f"invalid name: {name!r}")
+
+ elif name is not None:
+ raise TypeError(f"expected string or None as name, got {name!r}")
+
+ super().__init__(name)
+
+ @property
+ def name(self):
+ """The name of the `!Placeholder`."""
+ return self._wrapped
+
+ def __repr__(self):
+ if self._wrapped is None:
+ return f"{self.__class__.__name__}()"
+ else:
+ return f"{self.__class__.__name__}({self._wrapped!r})"
+
+ def as_string(self, context):
+ if self._wrapped is not None:
+ return f"%({self._wrapped})s"
+ else:
+ return "%s"
+
+
+# Literals
+NULL = SQL("NULL")
+DEFAULT = SQL("DEFAULT")