author    S. Solomon Darnell  2025-03-28 21:52:21 -0500
committer S. Solomon Darnell  2025-03-28 21:52:21 -0500
commit    4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
tree      ee3dc5af3b6313e921cd920906356f5d4febc4ed  /.venv/lib/python3.12/site-packages/sqlalchemy/testing/profiling.py
parent    cc961e04ba734dd72309fb548a2f97d67d578813 (diff)
two versions of R2R are here (HEAD, master)
Diffstat (limited to '.venv/lib/python3.12/site-packages/sqlalchemy/testing/profiling.py')
 -rw-r--r--  .venv/lib/python3.12/site-packages/sqlalchemy/testing/profiling.py | 324
 1 file changed, 324 insertions(+), 0 deletions(-)
diff --git a/.venv/lib/python3.12/site-packages/sqlalchemy/testing/profiling.py b/.venv/lib/python3.12/site-packages/sqlalchemy/testing/profiling.py
new file mode 100644
index 00000000..0d90947e
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/sqlalchemy/testing/profiling.py
@@ -0,0 +1,324 @@
+# testing/profiling.py
+# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+# mypy: ignore-errors
+
+
+"""Profiling support for unit and performance tests.
+
+These are special-purpose profiling methods that operate
+in a more fine-grained way than nose's profiling plugin.
+
+"""
+
+from __future__ import annotations
+
+import collections
+import contextlib
+import os
+import platform
+import pstats
+import re
+import sys
+
+from . import config
+from .util import gc_collect
+from ..util import has_compiled_ext
+
+
+try:
+    import cProfile
+except ImportError:
+    cProfile = None
+
+_profile_stats = None
+"""global ProfileStatsFileInstance.
+
+plugin_base assigns this at the start of all tests.
+
+"""
+
+
+_current_test = None
+"""String id of current test.
+
+plugin_base assigns this at the start of each test using
+_start_current_test.
+
+"""
+
+
+def _start_current_test(id_):
+    global _current_test
+    _current_test = id_
+
+    if _profile_stats.force_write:
+        _profile_stats.reset_count()
+
+
+class ProfileStatsFile:
+    """Store per-platform/fn profiling results in a file.
+
+    There was no json module available when this was written; in any case,
+    the file format, which is deterministically line-oriented, remains
+    handy for diffs and merges.
+
+    """
+
+    def __init__(self, filename, sort="cumulative", dump=None):
+        self.force_write = (
+            config.options is not None and config.options.force_write_profiles
+        )
+        self.write = self.force_write or (
+            config.options is not None and config.options.write_profiles
+        )
+        self.fname = os.path.abspath(filename)
+        self.short_fname = os.path.split(self.fname)[-1]
+        self.data = collections.defaultdict(
+            lambda: collections.defaultdict(dict)
+        )
+        self.dump = dump
+        self.sort = sort
+        self._read()
+        if self.write:
+            # rewrite for the case where features changed,
+            # etc.
+            self._write()
+
+    @property
+    def platform_key(self):
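+        # assembles a token string such as (a hypothetical example):
+        #   "x86_64_linux_cpython_3.12_sqlite_pysqlite_dbapiunicode_cextensions"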
+        dbapi_key = config.db.name + "_" + config.db.driver
+
+        if config.db.name == "sqlite" and config.db.dialect._is_url_file_db(
+            config.db.url
+        ):
+            dbapi_key += "_file"
+
+        # keep it at 2.7, 3.1, 3.2, etc. for now.
+        py_version = ".".join([str(v) for v in sys.version_info[0:2]])
+
+        platform_tokens = [
+            platform.machine(),
+            platform.system().lower(),
+            platform.python_implementation().lower(),
+            py_version,
+            dbapi_key,
+        ]
+
+        platform_tokens.append("dbapiunicode")
+        _has_cext = has_compiled_ext()
+        platform_tokens.append(
+            "cextensions" if _has_cext else "nocextensions"
+        )
+        return "_".join(platform_tokens)
+
+    def has_stats(self):
+        test_key = _current_test
+        return (
+            test_key in self.data and self.platform_key in self.data[test_key]
+        )
+
+    def result(self, callcount):
+        test_key = _current_test
+        per_fn = self.data[test_key]
+        per_platform = per_fn[self.platform_key]
+
+        if "counts" not in per_platform:
+            per_platform["counts"] = counts = []
+        else:
+            counts = per_platform["counts"]
+
+        if "current_count" not in per_platform:
+            per_platform["current_count"] = current_count = 0
+        else:
+            current_count = per_platform["current_count"]
+
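+        # a baseline callcount exists for this invocation only if the
+        # stored counts list already covers the current invocation index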
+        has_count = len(counts) > current_count
+
+        if not has_count:
+            counts.append(callcount)
+            if self.write:
+                self._write()
+            result = None
+        else:
+            result = per_platform["lineno"], counts[current_count]
+        per_platform["current_count"] += 1
+        return result
+
+    def reset_count(self):
+        test_key = _current_test
+        # since self.data is a defaultdict, don't access a key
+        # if we don't know it's there first.
+        if test_key not in self.data:
+            return
+        per_fn = self.data[test_key]
+        if self.platform_key not in per_fn:
+            return
+        per_platform = per_fn[self.platform_key]
+        if "counts" in per_platform:
+            per_platform["counts"][:] = []
+
+    def replace(self, callcount):
+        test_key = _current_test
+        per_fn = self.data[test_key]
+        per_platform = per_fn[self.platform_key]
+        counts = per_platform["counts"]
+        current_count = per_platform["current_count"]
+        if current_count < len(counts):
+            counts[current_count - 1] = callcount
+        else:
+            counts[-1] = callcount
+        if self.write:
+            self._write()
+
+    def _header(self):
+        return (
+            "# %s\n"
+            "# This file is written out on a per-environment basis.\n"
+            "# For each test in aaa_profiling, the corresponding "
+            "function and \n"
+            "# environment is located within this file.  "
+            "If it doesn't exist,\n"
+            "# the test is skipped.\n"
+            "# If a callcount does exist, it is compared "
+            "to what we received. \n"
+            "# assertions are raised if the counts do not match.\n"
+            "# \n"
+            "# To add a new callcount test, apply the function_call_count \n"
+            "# decorator and re-run the tests using the --write-profiles \n"
+            "# option - this file will be rewritten including the new count.\n"
+            "# \n"
+        ) % (self.fname)
+
+    def _read(self):
+        try:
+            profile_f = open(self.fname)
+        except OSError:
+            return
+        with profile_f:
+            for lineno, line in enumerate(profile_f):
+                line = line.strip()
+                if not line or line.startswith("#"):
+                    continue
+
+                test_key, platform_key, counts = line.split()
+                per_fn = self.data[test_key]
+                per_platform = per_fn[platform_key]
+                c = [int(count) for count in counts.split(",")]
+                per_platform["counts"] = c
+                per_platform["lineno"] = lineno + 1
+                per_platform["current_count"] = 0
+
+    def _write(self):
+        print("Writing profile file %s" % self.fname)
+        with open(self.fname, "w") as profile_f:
+            profile_f.write(self._header())
+            for test_key in sorted(self.data):
+                per_fn = self.data[test_key]
+                profile_f.write("\n# TEST: %s\n\n" % test_key)
+                for platform_key in sorted(per_fn):
+                    per_platform = per_fn[platform_key]
+                    c = ",".join(
+                        str(count) for count in per_platform["counts"]
+                    )
+                    profile_f.write(
+                        "%s %s %s\n" % (test_key, platform_key, c)
+                    )
+
+
+def function_call_count(variance=0.05, times=1, warmup=0):
+    """Assert a target for a test case's function call count.
+
+    The main purpose of this assertion is to detect changes in
+    callcounts for various functions - the actual number is not as important.
+    Callcounts are stored in a file keyed to Python version and OS platform
+    information.  This file is generated automatically for new tests,
+    and versioned so that unexpected changes in callcounts will be detected.
+
+    """
+
+    # use a signature-rewriting decorator function so that pytest fixtures
+    # still work on py27.  In Py3, update_wrapper() alone is sufficient,
+    # likely due to the introduction of __signature__.
+
+    from sqlalchemy.util import decorator
+
+    @decorator
+    def wrap(fn, *args, **kw):
+        for _ in range(warmup):
+            fn(*args, **kw)
+
+        with count_functions(variance=variance):
+            for _ in range(times):
+                rv = fn(*args, **kw)
+            return rv
+
+    return wrap
+
+
+@contextlib.contextmanager
+def count_functions(variance=0.05):
+    if cProfile is None:
+        raise config._skip_test_exception("cProfile is not installed")
+
+    if not _profile_stats.has_stats() and not _profile_stats.write:
+        config.skip_test(
+            "No profiling stats available on this "
+            "platform for this function.  Run tests with "
+            "--write-profiles to add statistics to %s for "
+            "this platform." % _profile_stats.short_fname
+        )
+
+    gc_collect()
+
+    pr = cProfile.Profile()
+    pr.enable()
+    # began = time.time()
+    yield
+    # ended = time.time()
+    pr.disable()
+
+    # s = StringIO()
+    stats = pstats.Stats(pr, stream=sys.stdout)
+
+    # timespent = ended - began
+    callcount = stats.total_calls
+
+    expected = _profile_stats.result(callcount)
+
+    if expected is None:
+        expected_count = None
+    else:
+        line_no, expected_count = expected
+
+    print("Pstats calls: %d Expected %s" % (callcount, expected_count))
+    stats.sort_stats(*re.split(r"[, ]", _profile_stats.sort))
+    stats.print_stats()
+    if _profile_stats.dump:
+        base, ext = os.path.splitext(_profile_stats.dump)
+        test_name = _current_test.split(".")[-1]
+        dumpfile = "%s_%s%s" % (base, test_name, ext or ".profile")
+        stats.dump_stats(dumpfile)
+        print("Dumped stats to file %s" % dumpfile)
+    # stats.print_callers()
+    if _profile_stats.force_write:
+        _profile_stats.replace(callcount)
+    elif expected_count:
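+        # the allowed deviation is a fixed fraction of the observed
+        # callcount; e.g. variance=0.05 tolerates a +/- 5% drift
+        # before the assertion fails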
+        deviance = int(callcount * variance)
+        failed = abs(callcount - expected_count) > deviance
+
+        if failed:
+            if _profile_stats.write:
+                _profile_stats.replace(callcount)
+            else:
+                raise AssertionError(
+                    "Adjusted function call count %s not within %s%% "
+                    "of expected %s, platform %s. Rerun with "
+                    "--write-profiles to "
+                    "regenerate this callcount."
+                    % (
+                        callcount,
+                        (variance * 100),
+                        expected_count,
+                        _profile_stats.platform_key,
+                    )
+                )
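+
+# count_functions() may also be used directly as a context manager; a
+# sketch, again assuming the test harness has assigned _profile_stats
+# and _current_test via plugin_base:
+#
+#   with count_functions(variance=0.10):
+#       something_to_measure()  # hypothetical function under test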