about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/azure/ai/inference/prompts/_tracer.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/azure/ai/inference/prompts/_tracer.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/inference/prompts/_tracer.py')
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/inference/prompts/_tracer.py316
1 files changed, 316 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/inference/prompts/_tracer.py b/.venv/lib/python3.12/site-packages/azure/ai/inference/prompts/_tracer.py
new file mode 100644
index 00000000..24f800b4
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/inference/prompts/_tracer.py
@@ -0,0 +1,316 @@
+# ------------------------------------
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+# ------------------------------------
+# mypy: disable-error-code="union-attr,arg-type,misc,return-value,assignment,func-returns-value"
+# pylint: disable=R,redefined-outer-name,bare-except,unspecified-encoding
+import os
+import json
+import inspect
+import traceback
+import importlib
+import contextlib
+from pathlib import Path
+from numbers import Number
+from datetime import datetime
+from functools import wraps, partial
+from typing import Any, Callable, Dict, Iterator, List, Union
+
+
+# clean up key value pairs for sensitive values
+def sanitize(key: str, value: Any) -> Any:
+    if isinstance(value, str) and any([s in key.lower() for s in ["key", "token", "secret", "password", "credential"]]):
+        return len(str(value)) * "*"
+
+    if isinstance(value, dict):
+        return {k: sanitize(k, v) for k, v in value.items()}
+
+    return value
+
+
+class Tracer:
+    _tracers: Dict[str, Callable[[str], Iterator[Callable[[str, Any], None]]]] = {}
+
+    @classmethod
+    def add(cls, name: str, tracer: Callable[[str], Iterator[Callable[[str, Any], None]]]) -> None:
+        cls._tracers[name] = tracer
+
+    @classmethod
+    def clear(cls) -> None:
+        cls._tracers = {}
+
+    @classmethod
+    @contextlib.contextmanager
+    def start(cls, name: str) -> Iterator[Callable[[str, Any], None]]:
+        with contextlib.ExitStack() as stack:
+            traces: List[Any] = [stack.enter_context(tracer(name)) for tracer in cls._tracers.values()]  # type: ignore
+            yield lambda key, value: [  # type: ignore
+                # normalize and sanitize any trace values
+                trace(key, sanitize(key, to_dict(value)))
+                for trace in traces
+            ]
+
+
+def to_dict(obj: Any) -> Union[Dict[str, Any], List[Dict[str, Any]], str, Number, bool]:
+    # simple json types
+    if isinstance(obj, str) or isinstance(obj, Number) or isinstance(obj, bool):
+        return obj
+
+    # datetime
+    if isinstance(obj, datetime):
+        return obj.isoformat()
+
+    # safe Prompty obj serialization
+    if type(obj).__name__ == "Prompty":
+        return obj.to_safe_dict()
+
+    # safe PromptyStream obj serialization
+    if type(obj).__name__ == "PromptyStream":
+        return "PromptyStream"
+
+    if type(obj).__name__ == "AsyncPromptyStream":
+        return "AsyncPromptyStream"
+
+    # recursive list and dict
+    if isinstance(obj, List):
+        return [to_dict(item) for item in obj]  # type: ignore
+
+    if isinstance(obj, Dict):
+        return {k: v if isinstance(v, str) else to_dict(v) for k, v in obj.items()}
+
+    if isinstance(obj, Path):
+        return str(obj)
+
+    # cast to string otherwise...
+    return str(obj)
+
+
+def _name(func: Callable, args):
+    if hasattr(func, "__qualname__"):
+        signature = f"{func.__module__}.{func.__qualname__}"
+    else:
+        signature = f"{func.__module__}.{func.__name__}"
+
+    # core invoker gets special treatment prompty.invoker.Invoker
+    core_invoker = signature.startswith("prompty.invoker.Invoker.run")
+    if core_invoker:
+        name = type(args[0]).__name__
+        if signature.endswith("async"):
+            signature = f"{args[0].__module__}.{args[0].__class__.__name__}.invoke_async"
+        else:
+            signature = f"{args[0].__module__}.{args[0].__class__.__name__}.invoke"
+    else:
+        name = func.__name__
+
+    return name, signature
+
+
+def _inputs(func: Callable, args, kwargs) -> dict:
+    ba = inspect.signature(func).bind(*args, **kwargs)
+    ba.apply_defaults()
+
+    inputs = {k: to_dict(v) for k, v in ba.arguments.items() if k != "self"}
+
+    return inputs
+
+
+def _results(result: Any) -> Union[Dict, List[Dict], str, Number, bool]:
+    return to_dict(result) if result is not None else "None"
+
+
+def _trace_sync(func: Union[Callable, None] = None, **okwargs: Any) -> Callable:
+
+    @wraps(func)  # type: ignore
+    def wrapper(*args, **kwargs):
+        name, signature = _name(func, args)  # type: ignore
+        with Tracer.start(name) as trace:
+            trace("signature", signature)
+
+            # support arbitrary keyword
+            # arguments for trace decorator
+            for k, v in okwargs.items():
+                trace(k, to_dict(v))
+
+            inputs = _inputs(func, args, kwargs)  # type: ignore
+            trace("inputs", inputs)
+
+            try:
+                result = func(*args, **kwargs)  # type: ignore
+                trace("result", _results(result))
+            except Exception as e:
+                trace(
+                    "result",
+                    {
+                        "exception": {
+                            "type": type(e),
+                            "traceback": (traceback.format_tb(tb=e.__traceback__) if e.__traceback__ else None),
+                            "message": str(e),
+                            "args": to_dict(e.args),
+                        }
+                    },
+                )
+                raise e
+
+            return result
+
+    return wrapper
+
+
+def _trace_async(func: Union[Callable, None] = None, **okwargs: Any) -> Callable:
+
+    @wraps(func)  # type: ignore
+    async def wrapper(*args, **kwargs):
+        name, signature = _name(func, args)  # type: ignore
+        with Tracer.start(name) as trace:
+            trace("signature", signature)
+
+            # support arbitrary keyword
+            # arguments for trace decorator
+            for k, v in okwargs.items():
+                trace(k, to_dict(v))
+
+            inputs = _inputs(func, args, kwargs)  # type: ignore
+            trace("inputs", inputs)
+            try:
+                result = await func(*args, **kwargs)  # type: ignore
+                trace("result", _results(result))
+            except Exception as e:
+                trace(
+                    "result",
+                    {
+                        "exception": {
+                            "type": type(e),
+                            "traceback": (traceback.format_tb(tb=e.__traceback__) if e.__traceback__ else None),
+                            "message": str(e),
+                            "args": to_dict(e.args),
+                        }
+                    },
+                )
+                raise e
+
+            return result
+
+    return wrapper
+
+
+def trace(func: Union[Callable, None] = None, **kwargs: Any) -> Callable:
+    if func is None:
+        return partial(trace, **kwargs)
+    wrapped_method = _trace_async if inspect.iscoroutinefunction(func) else _trace_sync
+    return wrapped_method(func, **kwargs)
+
+
+class PromptyTracer:
+    def __init__(self, output_dir: Union[str, None] = None) -> None:
+        if output_dir:
+            self.output = Path(output_dir).resolve().absolute()
+        else:
+            self.output = Path(Path(os.getcwd()) / ".runs").resolve().absolute()
+
+        if not self.output.exists():
+            self.output.mkdir(parents=True, exist_ok=True)
+
+        self.stack: List[Dict[str, Any]] = []
+
+    @contextlib.contextmanager
+    def tracer(self, name: str) -> Iterator[Callable[[str, Any], None]]:
+        try:
+            self.stack.append({"name": name})
+            frame = self.stack[-1]
+            frame["__time"] = {
+                "start": datetime.now(),
+            }
+
+            def add(key: str, value: Any) -> None:
+                if key not in frame:
+                    frame[key] = value
+                # multiple values creates list
+                else:
+                    if isinstance(frame[key], list):
+                        frame[key].append(value)
+                    else:
+                        frame[key] = [frame[key], value]
+
+            yield add
+        finally:
+            frame = self.stack.pop()
+            start: datetime = frame["__time"]["start"]
+            end: datetime = datetime.now()
+
+            # add duration to frame
+            frame["__time"] = {
+                "start": start.strftime("%Y-%m-%dT%H:%M:%S.%f"),
+                "end": end.strftime("%Y-%m-%dT%H:%M:%S.%f"),
+                "duration": int((end - start).total_seconds() * 1000),
+            }
+
+            # hoist usage to parent frame
+            if "result" in frame and isinstance(frame["result"], dict):
+                if "usage" in frame["result"]:
+                    frame["__usage"] = self.hoist_item(
+                        frame["result"]["usage"],
+                        frame["__usage"] if "__usage" in frame else {},
+                    )
+
+            # streamed results may have usage as well
+            if "result" in frame and isinstance(frame["result"], list):
+                for result in frame["result"]:
+                    if isinstance(result, dict) and "usage" in result and isinstance(result["usage"], dict):
+                        frame["__usage"] = self.hoist_item(
+                            result["usage"],
+                            frame["__usage"] if "__usage" in frame else {},
+                        )
+
+            # add any usage frames from below
+            if "__frames" in frame:
+                for child in frame["__frames"]:
+                    if "__usage" in child:
+                        frame["__usage"] = self.hoist_item(
+                            child["__usage"],
+                            frame["__usage"] if "__usage" in frame else {},
+                        )
+
+            # if stack is empty, dump the frame
+            if len(self.stack) == 0:
+                self.write_trace(frame)
+            # otherwise, append the frame to the parent
+            else:
+                if "__frames" not in self.stack[-1]:
+                    self.stack[-1]["__frames"] = []
+                self.stack[-1]["__frames"].append(frame)
+
+    def hoist_item(self, src: Dict[str, Any], cur: Dict[str, Any]) -> Dict[str, Any]:
+        for key, value in src.items():
+            if value is None or isinstance(value, list) or isinstance(value, dict):
+                continue
+            try:
+                if key not in cur:
+                    cur[key] = value
+                else:
+                    cur[key] += value
+            except:
+                continue
+
+        return cur
+
+    def write_trace(self, frame: Dict[str, Any]) -> None:
+        trace_file = self.output / f"{frame['name']}.{datetime.now().strftime('%Y%m%d.%H%M%S')}.tracy"
+
+        v = importlib.metadata.version("prompty")  # type: ignore
+        enriched_frame = {
+            "runtime": "python",
+            "version": v,
+            "trace": frame,
+        }
+
+        with open(trace_file, "w") as f:
+            json.dump(enriched_frame, f, indent=4)
+
+
+@contextlib.contextmanager
+def console_tracer(name: str) -> Iterator[Callable[[str, Any], None]]:
+    try:
+        print(f"Starting {name}")
+        yield lambda key, value: print(f"{key}:\n{json.dumps(to_dict(value), indent=4)}")
+    finally:
+        print(f"Ending {name}")