diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/inference/prompts/_tracer.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/azure/ai/inference/prompts/_tracer.py | 316 |
1 files changed, 316 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/inference/prompts/_tracer.py b/.venv/lib/python3.12/site-packages/azure/ai/inference/prompts/_tracer.py new file mode 100644 index 00000000..24f800b4 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/azure/ai/inference/prompts/_tracer.py @@ -0,0 +1,316 @@ +# ------------------------------------ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +# ------------------------------------ +# mypy: disable-error-code="union-attr,arg-type,misc,return-value,assignment,func-returns-value" +# pylint: disable=R,redefined-outer-name,bare-except,unspecified-encoding +import os +import json +import inspect +import traceback +import importlib +import contextlib +from pathlib import Path +from numbers import Number +from datetime import datetime +from functools import wraps, partial +from typing import Any, Callable, Dict, Iterator, List, Union + + +# clean up key value pairs for sensitive values +def sanitize(key: str, value: Any) -> Any: + if isinstance(value, str) and any([s in key.lower() for s in ["key", "token", "secret", "password", "credential"]]): + return len(str(value)) * "*" + + if isinstance(value, dict): + return {k: sanitize(k, v) for k, v in value.items()} + + return value + + +class Tracer: + _tracers: Dict[str, Callable[[str], Iterator[Callable[[str, Any], None]]]] = {} + + @classmethod + def add(cls, name: str, tracer: Callable[[str], Iterator[Callable[[str, Any], None]]]) -> None: + cls._tracers[name] = tracer + + @classmethod + def clear(cls) -> None: + cls._tracers = {} + + @classmethod + @contextlib.contextmanager + def start(cls, name: str) -> Iterator[Callable[[str, Any], None]]: + with contextlib.ExitStack() as stack: + traces: List[Any] = [stack.enter_context(tracer(name)) for tracer in cls._tracers.values()] # type: ignore + yield lambda key, value: [ # type: ignore + # normalize and sanitize any trace values + trace(key, sanitize(key, to_dict(value))) + for trace in traces + ] + + +def to_dict(obj: Any) -> Union[Dict[str, Any], List[Dict[str, Any]], str, Number, bool]: + # simple json types + if isinstance(obj, str) or isinstance(obj, Number) or isinstance(obj, bool): + return obj + + # datetime + if isinstance(obj, datetime): + return obj.isoformat() + + # safe Prompty obj serialization + if type(obj).__name__ == "Prompty": + return obj.to_safe_dict() + + # safe PromptyStream obj serialization + if type(obj).__name__ == "PromptyStream": + return "PromptyStream" + + if type(obj).__name__ == "AsyncPromptyStream": + return "AsyncPromptyStream" + + # recursive list and dict + if isinstance(obj, List): + return [to_dict(item) for item in obj] # type: ignore + + if isinstance(obj, Dict): + return {k: v if isinstance(v, str) else to_dict(v) for k, v in obj.items()} + + if isinstance(obj, Path): + return str(obj) + + # cast to string otherwise... + return str(obj) + + +def _name(func: Callable, args): + if hasattr(func, "__qualname__"): + signature = f"{func.__module__}.{func.__qualname__}" + else: + signature = f"{func.__module__}.{func.__name__}" + + # core invoker gets special treatment prompty.invoker.Invoker + core_invoker = signature.startswith("prompty.invoker.Invoker.run") + if core_invoker: + name = type(args[0]).__name__ + if signature.endswith("async"): + signature = f"{args[0].__module__}.{args[0].__class__.__name__}.invoke_async" + else: + signature = f"{args[0].__module__}.{args[0].__class__.__name__}.invoke" + else: + name = func.__name__ + + return name, signature + + +def _inputs(func: Callable, args, kwargs) -> dict: + ba = inspect.signature(func).bind(*args, **kwargs) + ba.apply_defaults() + + inputs = {k: to_dict(v) for k, v in ba.arguments.items() if k != "self"} + + return inputs + + +def _results(result: Any) -> Union[Dict, List[Dict], str, Number, bool]: + return to_dict(result) if result is not None else "None" + + +def _trace_sync(func: Union[Callable, None] = None, **okwargs: Any) -> Callable: + + @wraps(func) # type: ignore + def wrapper(*args, **kwargs): + name, signature = _name(func, args) # type: ignore + with Tracer.start(name) as trace: + trace("signature", signature) + + # support arbitrary keyword + # arguments for trace decorator + for k, v in okwargs.items(): + trace(k, to_dict(v)) + + inputs = _inputs(func, args, kwargs) # type: ignore + trace("inputs", inputs) + + try: + result = func(*args, **kwargs) # type: ignore + trace("result", _results(result)) + except Exception as e: + trace( + "result", + { + "exception": { + "type": type(e), + "traceback": (traceback.format_tb(tb=e.__traceback__) if e.__traceback__ else None), + "message": str(e), + "args": to_dict(e.args), + } + }, + ) + raise e + + return result + + return wrapper + + +def _trace_async(func: Union[Callable, None] = None, **okwargs: Any) -> Callable: + + @wraps(func) # type: ignore + async def wrapper(*args, **kwargs): + name, signature = _name(func, args) # type: ignore + with Tracer.start(name) as trace: + trace("signature", signature) + + # support arbitrary keyword + # arguments for trace decorator + for k, v in okwargs.items(): + trace(k, to_dict(v)) + + inputs = _inputs(func, args, kwargs) # type: ignore + trace("inputs", inputs) + try: + result = await func(*args, **kwargs) # type: ignore + trace("result", _results(result)) + except Exception as e: + trace( + "result", + { + "exception": { + "type": type(e), + "traceback": (traceback.format_tb(tb=e.__traceback__) if e.__traceback__ else None), + "message": str(e), + "args": to_dict(e.args), + } + }, + ) + raise e + + return result + + return wrapper + + +def trace(func: Union[Callable, None] = None, **kwargs: Any) -> Callable: + if func is None: + return partial(trace, **kwargs) + wrapped_method = _trace_async if inspect.iscoroutinefunction(func) else _trace_sync + return wrapped_method(func, **kwargs) + + +class PromptyTracer: + def __init__(self, output_dir: Union[str, None] = None) -> None: + if output_dir: + self.output = Path(output_dir).resolve().absolute() + else: + self.output = Path(Path(os.getcwd()) / ".runs").resolve().absolute() + + if not self.output.exists(): + self.output.mkdir(parents=True, exist_ok=True) + + self.stack: List[Dict[str, Any]] = [] + + @contextlib.contextmanager + def tracer(self, name: str) -> Iterator[Callable[[str, Any], None]]: + try: + self.stack.append({"name": name}) + frame = self.stack[-1] + frame["__time"] = { + "start": datetime.now(), + } + + def add(key: str, value: Any) -> None: + if key not in frame: + frame[key] = value + # multiple values creates list + else: + if isinstance(frame[key], list): + frame[key].append(value) + else: + frame[key] = [frame[key], value] + + yield add + finally: + frame = self.stack.pop() + start: datetime = frame["__time"]["start"] + end: datetime = datetime.now() + + # add duration to frame + frame["__time"] = { + "start": start.strftime("%Y-%m-%dT%H:%M:%S.%f"), + "end": end.strftime("%Y-%m-%dT%H:%M:%S.%f"), + "duration": int((end - start).total_seconds() * 1000), + } + + # hoist usage to parent frame + if "result" in frame and isinstance(frame["result"], dict): + if "usage" in frame["result"]: + frame["__usage"] = self.hoist_item( + frame["result"]["usage"], + frame["__usage"] if "__usage" in frame else {}, + ) + + # streamed results may have usage as well + if "result" in frame and isinstance(frame["result"], list): + for result in frame["result"]: + if isinstance(result, dict) and "usage" in result and isinstance(result["usage"], dict): + frame["__usage"] = self.hoist_item( + result["usage"], + frame["__usage"] if "__usage" in frame else {}, + ) + + # add any usage frames from below + if "__frames" in frame: + for child in frame["__frames"]: + if "__usage" in child: + frame["__usage"] = self.hoist_item( + child["__usage"], + frame["__usage"] if "__usage" in frame else {}, + ) + + # if stack is empty, dump the frame + if len(self.stack) == 0: + self.write_trace(frame) + # otherwise, append the frame to the parent + else: + if "__frames" not in self.stack[-1]: + self.stack[-1]["__frames"] = [] + self.stack[-1]["__frames"].append(frame) + + def hoist_item(self, src: Dict[str, Any], cur: Dict[str, Any]) -> Dict[str, Any]: + for key, value in src.items(): + if value is None or isinstance(value, list) or isinstance(value, dict): + continue + try: + if key not in cur: + cur[key] = value + else: + cur[key] += value + except: + continue + + return cur + + def write_trace(self, frame: Dict[str, Any]) -> None: + trace_file = self.output / f"{frame['name']}.{datetime.now().strftime('%Y%m%d.%H%M%S')}.tracy" + + v = importlib.metadata.version("prompty") # type: ignore + enriched_frame = { + "runtime": "python", + "version": v, + "trace": frame, + } + + with open(trace_file, "w") as f: + json.dump(enriched_frame, f, indent=4) + + +@contextlib.contextmanager +def console_tracer(name: str) -> Iterator[Callable[[str, Any], None]]: + try: + print(f"Starting {name}") + yield lambda key, value: print(f"{key}:\n{json.dumps(to_dict(value), indent=4)}") + finally: + print(f"Ending {name}") |