about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/greenlet/tests/test_tracing.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/greenlet/tests/test_tracing.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/greenlet/tests/test_tracing.py')
-rw-r--r--.venv/lib/python3.12/site-packages/greenlet/tests/test_tracing.py291
1 files changed, 291 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/greenlet/tests/test_tracing.py b/.venv/lib/python3.12/site-packages/greenlet/tests/test_tracing.py
new file mode 100644
index 00000000..c044d4b6
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/greenlet/tests/test_tracing.py
@@ -0,0 +1,291 @@
+from __future__ import print_function
+import sys
+import greenlet
+import unittest
+
+from . import TestCase
+from . import PY312
+
+# https://discuss.python.org/t/cpython-3-12-greenlet-and-tracing-profiling-how-to-not-crash-and-get-correct-results/33144/2
+DEBUG_BUILD_PY312 = (
+    PY312 and hasattr(sys, 'gettotalrefcount'),
+    "Broken on debug builds of Python 3.12"
+)
+
+class SomeError(Exception):
+    pass
+
+class GreenletTracer(object):
+    oldtrace = None
+
+    def __init__(self, error_on_trace=False):
+        self.actions = []
+        self.error_on_trace = error_on_trace
+
+    def __call__(self, *args):
+        self.actions.append(args)
+        if self.error_on_trace:
+            raise SomeError
+
+    def __enter__(self):
+        self.oldtrace = greenlet.settrace(self)
+        return self.actions
+
+    def __exit__(self, *args):
+        greenlet.settrace(self.oldtrace)
+
+
+class TestGreenletTracing(TestCase):
+    """
+    Tests of ``greenlet.settrace()``
+    """
+
+    def test_a_greenlet_tracing(self):
+        main = greenlet.getcurrent()
+        def dummy():
+            pass
+        def dummyexc():
+            raise SomeError()
+
+        with GreenletTracer() as actions:
+            g1 = greenlet.greenlet(dummy)
+            g1.switch()
+            g2 = greenlet.greenlet(dummyexc)
+            self.assertRaises(SomeError, g2.switch)
+
+        self.assertEqual(actions, [
+            ('switch', (main, g1)),
+            ('switch', (g1, main)),
+            ('switch', (main, g2)),
+            ('throw', (g2, main)),
+        ])
+
+    def test_b_exception_disables_tracing(self):
+        main = greenlet.getcurrent()
+        def dummy():
+            main.switch()
+        g = greenlet.greenlet(dummy)
+        g.switch()
+        with GreenletTracer(error_on_trace=True) as actions:
+            self.assertRaises(SomeError, g.switch)
+            self.assertEqual(greenlet.gettrace(), None)
+
+        self.assertEqual(actions, [
+            ('switch', (main, g)),
+        ])
+
+    def test_set_same_tracer_twice(self):
+        # https://github.com/python-greenlet/greenlet/issues/332
+        # Our logic in asserting that the tracefunction should
+        # gain a reference was incorrect if the same tracefunction was set
+        # twice.
+        tracer = GreenletTracer()
+        with tracer:
+            greenlet.settrace(tracer)
+
+
+class PythonTracer(object):
+    oldtrace = None
+
+    def __init__(self):
+        self.actions = []
+
+    def __call__(self, frame, event, arg):
+        # Record the co_name so we have an idea what function we're in.
+        self.actions.append((event, frame.f_code.co_name))
+
+    def __enter__(self):
+        self.oldtrace = sys.setprofile(self)
+        return self.actions
+
+    def __exit__(self, *args):
+        sys.setprofile(self.oldtrace)
+
+def tpt_callback():
+    return 42
+
+class TestPythonTracing(TestCase):
+    """
+    Tests of the interaction of ``sys.settrace()``
+    with greenlet facilities.
+
+    NOTE: Most of this is probably CPython specific.
+    """
+
+    maxDiff = None
+
+    def test_trace_events_trivial(self):
+        with PythonTracer() as actions:
+            tpt_callback()
+        # If we use the sys.settrace instead of setprofile, we get
+        # this:
+
+        # self.assertEqual(actions, [
+        #     ('call', 'tpt_callback'),
+        #     ('call', '__exit__'),
+        # ])
+
+        self.assertEqual(actions, [
+            ('return', '__enter__'),
+            ('call', 'tpt_callback'),
+            ('return', 'tpt_callback'),
+            ('call', '__exit__'),
+            ('c_call', '__exit__'),
+        ])
+
+    def _trace_switch(self, glet):
+        with PythonTracer() as actions:
+            glet.switch()
+        return actions
+
+    def _check_trace_events_func_already_set(self, glet):
+        actions = self._trace_switch(glet)
+        self.assertEqual(actions, [
+            ('return', '__enter__'),
+            ('c_call', '_trace_switch'),
+            ('call', 'run'),
+            ('call', 'tpt_callback'),
+            ('return', 'tpt_callback'),
+            ('return', 'run'),
+            ('c_return', '_trace_switch'),
+            ('call', '__exit__'),
+            ('c_call', '__exit__'),
+        ])
+
+    def test_trace_events_into_greenlet_func_already_set(self):
+        def run():
+            return tpt_callback()
+
+        self._check_trace_events_func_already_set(greenlet.greenlet(run))
+
+    def test_trace_events_into_greenlet_subclass_already_set(self):
+        class X(greenlet.greenlet):
+            def run(self):
+                return tpt_callback()
+        self._check_trace_events_func_already_set(X())
+
+    def _check_trace_events_from_greenlet_sets_profiler(self, g, tracer):
+        g.switch()
+        tpt_callback()
+        tracer.__exit__()
+        self.assertEqual(tracer.actions, [
+            ('return', '__enter__'),
+            ('call', 'tpt_callback'),
+            ('return', 'tpt_callback'),
+            ('return', 'run'),
+            ('call', 'tpt_callback'),
+            ('return', 'tpt_callback'),
+            ('call', '__exit__'),
+            ('c_call', '__exit__'),
+        ])
+
+
+    def test_trace_events_from_greenlet_func_sets_profiler(self):
+        tracer = PythonTracer()
+        def run():
+            tracer.__enter__()
+            return tpt_callback()
+
+        self._check_trace_events_from_greenlet_sets_profiler(greenlet.greenlet(run),
+                                                             tracer)
+
+    def test_trace_events_from_greenlet_subclass_sets_profiler(self):
+        tracer = PythonTracer()
+        class X(greenlet.greenlet):
+            def run(self):
+                tracer.__enter__()
+                return tpt_callback()
+
+        self._check_trace_events_from_greenlet_sets_profiler(X(), tracer)
+
+    @unittest.skipIf(*DEBUG_BUILD_PY312)
+    def test_trace_events_multiple_greenlets_switching(self):
+        tracer = PythonTracer()
+
+        g1 = None
+        g2 = None
+
+        def g1_run():
+            tracer.__enter__()
+            tpt_callback()
+            g2.switch()
+            tpt_callback()
+            return 42
+
+        def g2_run():
+            tpt_callback()
+            tracer.__exit__()
+            tpt_callback()
+            g1.switch()
+
+        g1 = greenlet.greenlet(g1_run)
+        g2 = greenlet.greenlet(g2_run)
+
+        x = g1.switch()
+        self.assertEqual(x, 42)
+        tpt_callback() # ensure not in the trace
+        self.assertEqual(tracer.actions, [
+            ('return', '__enter__'),
+            ('call', 'tpt_callback'),
+            ('return', 'tpt_callback'),
+            ('c_call', 'g1_run'),
+            ('call', 'g2_run'),
+            ('call', 'tpt_callback'),
+            ('return', 'tpt_callback'),
+            ('call', '__exit__'),
+            ('c_call', '__exit__'),
+        ])
+
+    @unittest.skipIf(*DEBUG_BUILD_PY312)
+    def test_trace_events_multiple_greenlets_switching_siblings(self):
+        # Like the first version, but get both greenlets running first
+        # as "siblings" and then establish the tracing.
+        tracer = PythonTracer()
+
+        g1 = None
+        g2 = None
+
+        def g1_run():
+            greenlet.getcurrent().parent.switch()
+            tracer.__enter__()
+            tpt_callback()
+            g2.switch()
+            tpt_callback()
+            return 42
+
+        def g2_run():
+            greenlet.getcurrent().parent.switch()
+
+            tpt_callback()
+            tracer.__exit__()
+            tpt_callback()
+            g1.switch()
+
+        g1 = greenlet.greenlet(g1_run)
+        g2 = greenlet.greenlet(g2_run)
+
+        # Start g1
+        g1.switch()
+        # And it immediately returns control to us.
+        # Start g2
+        g2.switch()
+        # Which also returns. Now kick of the real part of the
+        # test.
+        x = g1.switch()
+        self.assertEqual(x, 42)
+
+        tpt_callback() # ensure not in the trace
+        self.assertEqual(tracer.actions, [
+            ('return', '__enter__'),
+            ('call', 'tpt_callback'),
+            ('return', 'tpt_callback'),
+            ('c_call', 'g1_run'),
+            ('call', 'tpt_callback'),
+            ('return', 'tpt_callback'),
+            ('call', '__exit__'),
+            ('c_call', '__exit__'),
+        ])
+
+
+if __name__ == '__main__':
+    unittest.main()