aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/asyncpg.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/sentry_sdk/integrations/asyncpg.py')
-rw-r--r--.venv/lib/python3.12/site-packages/sentry_sdk/integrations/asyncpg.py208
1 files changed, 208 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/asyncpg.py b/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/asyncpg.py
new file mode 100644
index 00000000..b6b53f46
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/sentry_sdk/integrations/asyncpg.py
@@ -0,0 +1,208 @@
+from __future__ import annotations
+import contextlib
+from typing import Any, TypeVar, Callable, Awaitable, Iterator
+
+import sentry_sdk
+from sentry_sdk.consts import OP, SPANDATA
+from sentry_sdk.integrations import _check_minimum_version, Integration, DidNotEnable
+from sentry_sdk.tracing import Span
+from sentry_sdk.tracing_utils import add_query_source, record_sql_queries
+from sentry_sdk.utils import (
+ ensure_integration_enabled,
+ parse_version,
+ capture_internal_exceptions,
+)
+
+try:
+ import asyncpg # type: ignore[import-not-found]
+ from asyncpg.cursor import BaseCursor # type: ignore
+
+except ImportError:
+ raise DidNotEnable("asyncpg not installed.")
+
+
+class AsyncPGIntegration(Integration):
+ identifier = "asyncpg"
+ origin = f"auto.db.{identifier}"
+ _record_params = False
+
+ def __init__(self, *, record_params: bool = False):
+ AsyncPGIntegration._record_params = record_params
+
+ @staticmethod
+ def setup_once() -> None:
+ # asyncpg.__version__ is a string containing the semantic version in the form of "<major>.<minor>.<patch>"
+ asyncpg_version = parse_version(asyncpg.__version__)
+ _check_minimum_version(AsyncPGIntegration, asyncpg_version)
+
+ asyncpg.Connection.execute = _wrap_execute(
+ asyncpg.Connection.execute,
+ )
+
+ asyncpg.Connection._execute = _wrap_connection_method(
+ asyncpg.Connection._execute
+ )
+ asyncpg.Connection._executemany = _wrap_connection_method(
+ asyncpg.Connection._executemany, executemany=True
+ )
+ asyncpg.Connection.cursor = _wrap_cursor_creation(asyncpg.Connection.cursor)
+ asyncpg.Connection.prepare = _wrap_connection_method(asyncpg.Connection.prepare)
+ asyncpg.connect_utils._connect_addr = _wrap_connect_addr(
+ asyncpg.connect_utils._connect_addr
+ )
+
+
+T = TypeVar("T")
+
+
+def _wrap_execute(f: Callable[..., Awaitable[T]]) -> Callable[..., Awaitable[T]]:
+ async def _inner(*args: Any, **kwargs: Any) -> T:
+ if sentry_sdk.get_client().get_integration(AsyncPGIntegration) is None:
+ return await f(*args, **kwargs)
+
+ # Avoid recording calls to _execute twice.
+ # Calls to Connection.execute with args also call
+ # Connection._execute, which is recorded separately
+ # args[0] = the connection object, args[1] is the query
+ if len(args) > 2:
+ return await f(*args, **kwargs)
+
+ query = args[1]
+ with record_sql_queries(
+ cursor=None,
+ query=query,
+ params_list=None,
+ paramstyle=None,
+ executemany=False,
+ span_origin=AsyncPGIntegration.origin,
+ ) as span:
+ res = await f(*args, **kwargs)
+
+ with capture_internal_exceptions():
+ add_query_source(span)
+
+ return res
+
+ return _inner
+
+
+SubCursor = TypeVar("SubCursor", bound=BaseCursor)
+
+
+@contextlib.contextmanager
+def _record(
+ cursor: SubCursor | None,
+ query: str,
+ params_list: tuple[Any, ...] | None,
+ *,
+ executemany: bool = False,
+) -> Iterator[Span]:
+ integration = sentry_sdk.get_client().get_integration(AsyncPGIntegration)
+ if integration is not None and not integration._record_params:
+ params_list = None
+
+ param_style = "pyformat" if params_list else None
+
+ with record_sql_queries(
+ cursor=cursor,
+ query=query,
+ params_list=params_list,
+ paramstyle=param_style,
+ executemany=executemany,
+ record_cursor_repr=cursor is not None,
+ span_origin=AsyncPGIntegration.origin,
+ ) as span:
+ yield span
+
+
+def _wrap_connection_method(
+ f: Callable[..., Awaitable[T]], *, executemany: bool = False
+) -> Callable[..., Awaitable[T]]:
+ async def _inner(*args: Any, **kwargs: Any) -> T:
+ if sentry_sdk.get_client().get_integration(AsyncPGIntegration) is None:
+ return await f(*args, **kwargs)
+ query = args[1]
+ params_list = args[2] if len(args) > 2 else None
+ with _record(None, query, params_list, executemany=executemany) as span:
+ _set_db_data(span, args[0])
+ res = await f(*args, **kwargs)
+
+ return res
+
+ return _inner
+
+
+def _wrap_cursor_creation(f: Callable[..., T]) -> Callable[..., T]:
+ @ensure_integration_enabled(AsyncPGIntegration, f)
+ def _inner(*args: Any, **kwargs: Any) -> T: # noqa: N807
+ query = args[1]
+ params_list = args[2] if len(args) > 2 else None
+
+ with _record(
+ None,
+ query,
+ params_list,
+ executemany=False,
+ ) as span:
+ _set_db_data(span, args[0])
+ res = f(*args, **kwargs)
+ span.set_data("db.cursor", res)
+
+ return res
+
+ return _inner
+
+
+def _wrap_connect_addr(f: Callable[..., Awaitable[T]]) -> Callable[..., Awaitable[T]]:
+ async def _inner(*args: Any, **kwargs: Any) -> T:
+ if sentry_sdk.get_client().get_integration(AsyncPGIntegration) is None:
+ return await f(*args, **kwargs)
+
+ user = kwargs["params"].user
+ database = kwargs["params"].database
+
+ with sentry_sdk.start_span(
+ op=OP.DB,
+ name="connect",
+ origin=AsyncPGIntegration.origin,
+ ) as span:
+ span.set_data(SPANDATA.DB_SYSTEM, "postgresql")
+ addr = kwargs.get("addr")
+ if addr:
+ try:
+ span.set_data(SPANDATA.SERVER_ADDRESS, addr[0])
+ span.set_data(SPANDATA.SERVER_PORT, addr[1])
+ except IndexError:
+ pass
+ span.set_data(SPANDATA.DB_NAME, database)
+ span.set_data(SPANDATA.DB_USER, user)
+
+ with capture_internal_exceptions():
+ sentry_sdk.add_breadcrumb(
+ message="connect", category="query", data=span._data
+ )
+ res = await f(*args, **kwargs)
+
+ return res
+
+ return _inner
+
+
+def _set_db_data(span: Span, conn: Any) -> None:
+ span.set_data(SPANDATA.DB_SYSTEM, "postgresql")
+
+ addr = conn._addr
+ if addr:
+ try:
+ span.set_data(SPANDATA.SERVER_ADDRESS, addr[0])
+ span.set_data(SPANDATA.SERVER_PORT, addr[1])
+ except IndexError:
+ pass
+
+ database = conn._params.database
+ if database:
+ span.set_data(SPANDATA.DB_NAME, database)
+
+ user = conn._params.user
+ if user:
+ span.set_data(SPANDATA.DB_USER, user)