aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/azure/ai/inference/prompts
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/ai/inference/prompts')
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/inference/prompts/__init__.py8
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/inference/prompts/_core.py312
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/inference/prompts/_invoker.py295
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/inference/prompts/_mustache.py671
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/inference/prompts/_parsers.py156
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/inference/prompts/_patch.py124
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/inference/prompts/_prompty_utils.py415
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/inference/prompts/_renderers.py30
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/inference/prompts/_tracer.py316
-rw-r--r--.venv/lib/python3.12/site-packages/azure/ai/inference/prompts/_utils.py100
10 files changed, 2427 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/inference/prompts/__init__.py b/.venv/lib/python3.12/site-packages/azure/ai/inference/prompts/__init__.py
new file mode 100644
index 00000000..2e11b31c
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/inference/prompts/__init__.py
@@ -0,0 +1,8 @@
+# ------------------------------------
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+# ------------------------------------
+# pylint: disable=unused-import
+from ._patch import patch_sdk as _patch_sdk, PromptTemplate
+
+_patch_sdk()
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/inference/prompts/_core.py b/.venv/lib/python3.12/site-packages/azure/ai/inference/prompts/_core.py
new file mode 100644
index 00000000..ec670299
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/inference/prompts/_core.py
@@ -0,0 +1,312 @@
+# ------------------------------------
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+# ------------------------------------
+# mypy: disable-error-code="assignment,attr-defined,index,arg-type"
+# pylint: disable=line-too-long,R,consider-iterating-dictionary,raise-missing-from,dangerous-default-value
+from __future__ import annotations
+import os
+from dataclasses import dataclass, field, asdict
+from pathlib import Path
+from typing import Any, AsyncIterator, Dict, Iterator, List, Literal, Union
+from ._tracer import Tracer, to_dict
+from ._utils import load_json
+
+
+@dataclass
+class ToolCall:
+ id: str
+ name: str
+ arguments: str
+
+
+@dataclass
+class PropertySettings:
+ """PropertySettings class to define the properties of the model
+
+ Attributes
+ ----------
+ type : str
+ The type of the property
+ default : Any
+ The default value of the property
+ description : str
+ The description of the property
+ """
+
+ type: Literal["string", "number", "array", "object", "boolean"]
+ default: Union[str, int, float, List, Dict, bool, None] = field(default=None)
+ description: str = field(default="")
+
+
+@dataclass
+class ModelSettings:
+ """ModelSettings class to define the model of the prompty
+
+ Attributes
+ ----------
+ api : str
+ The api of the model
+ configuration : Dict
+ The configuration of the model
+ parameters : Dict
+ The parameters of the model
+ response : Dict
+ The response of the model
+ """
+
+ api: str = field(default="")
+ configuration: Dict = field(default_factory=dict)
+ parameters: Dict = field(default_factory=dict)
+ response: Dict = field(default_factory=dict)
+
+
+@dataclass
+class TemplateSettings:
+ """TemplateSettings class to define the template of the prompty
+
+ Attributes
+ ----------
+ type : str
+ The type of the template
+ parser : str
+ The parser of the template
+ """
+
+ type: str = field(default="mustache")
+ parser: str = field(default="")
+
+
+@dataclass
+class Prompty:
+ """Prompty class to define the prompty
+
+ Attributes
+ ----------
+ name : str
+ The name of the prompty
+ description : str
+ The description of the prompty
+ authors : List[str]
+ The authors of the prompty
+ tags : List[str]
+ The tags of the prompty
+ version : str
+ The version of the prompty
+ base : str
+ The base of the prompty
+ basePrompty : Prompty
+ The base prompty
+ model : ModelSettings
+ The model of the prompty
+ sample : Dict
+ The sample of the prompty
+ inputs : Dict[str, PropertySettings]
+ The inputs of the prompty
+ outputs : Dict[str, PropertySettings]
+ The outputs of the prompty
+ template : TemplateSettings
+ The template of the prompty
+ file : FilePath
+ The file of the prompty
+ content : Union[str, List[str], Dict]
+ The content of the prompty
+ """
+
+ # metadata
+ name: str = field(default="")
+ description: str = field(default="")
+ authors: List[str] = field(default_factory=list)
+ tags: List[str] = field(default_factory=list)
+ version: str = field(default="")
+ base: str = field(default="")
+ basePrompty: Union[Prompty, None] = field(default=None)
+ # model
+ model: ModelSettings = field(default_factory=ModelSettings)
+
+ # sample
+ sample: Dict = field(default_factory=dict)
+
+ # input / output
+ inputs: Dict[str, PropertySettings] = field(default_factory=dict)
+ outputs: Dict[str, PropertySettings] = field(default_factory=dict)
+
+ # template
+ template: TemplateSettings = field(default_factory=TemplateSettings)
+
+ file: Union[Path, str] = field(default="")
+ content: Union[str, List[str], Dict] = field(default="")
+
+ def to_safe_dict(self) -> Dict[str, Any]:
+ d = {}
+ if self.model:
+ d["model"] = asdict(self.model)
+ _mask_secrets(d, ["model", "configuration"])
+ if self.template:
+ d["template"] = asdict(self.template)
+ if self.inputs:
+ d["inputs"] = {k: asdict(v) for k, v in self.inputs.items()}
+ if self.outputs:
+ d["outputs"] = {k: asdict(v) for k, v in self.outputs.items()}
+ if self.file:
+ d["file"] = str(self.file.as_posix()) if isinstance(self.file, Path) else self.file
+ return d
+
+ @staticmethod
+ def hoist_base_prompty(top: Prompty, base: Prompty) -> Prompty:
+ top.name = base.name if top.name == "" else top.name
+ top.description = base.description if top.description == "" else top.description
+ top.authors = list(set(base.authors + top.authors))
+ top.tags = list(set(base.tags + top.tags))
+ top.version = base.version if top.version == "" else top.version
+
+ top.model.api = base.model.api if top.model.api == "" else top.model.api
+ top.model.configuration = param_hoisting(top.model.configuration, base.model.configuration)
+ top.model.parameters = param_hoisting(top.model.parameters, base.model.parameters)
+ top.model.response = param_hoisting(top.model.response, base.model.response)
+
+ top.sample = param_hoisting(top.sample, base.sample)
+
+ top.basePrompty = base
+
+ return top
+
+ @staticmethod
+ def _process_file(file: str, parent: Path) -> Any:
+ file_path = Path(parent / Path(file)).resolve().absolute()
+ if file_path.exists():
+ items = load_json(file_path)
+ if isinstance(items, list):
+ return [Prompty.normalize(value, parent) for value in items]
+ elif isinstance(items, Dict):
+ return {key: Prompty.normalize(value, parent) for key, value in items.items()}
+ else:
+ return items
+ else:
+ raise FileNotFoundError(f"File {file} not found")
+
+ @staticmethod
+ def _process_env(variable: str, env_error=True, default: Union[str, None] = None) -> Any:
+ if variable in os.environ.keys():
+ return os.environ[variable]
+ else:
+ if default:
+ return default
+ if env_error:
+ raise ValueError(f"Variable {variable} not found in environment")
+
+ return ""
+
+ @staticmethod
+ def normalize(attribute: Any, parent: Path, env_error=True) -> Any:
+ if isinstance(attribute, str):
+ attribute = attribute.strip()
+ if attribute.startswith("${") and attribute.endswith("}"):
+ # check if env or file
+ variable = attribute[2:-1].split(":")
+ if variable[0] == "env" and len(variable) > 1:
+ return Prompty._process_env(
+ variable[1],
+ env_error,
+ variable[2] if len(variable) > 2 else None,
+ )
+ elif variable[0] == "file" and len(variable) > 1:
+ return Prompty._process_file(variable[1], parent)
+ else:
+ raise ValueError(f"Invalid attribute format ({attribute})")
+ else:
+ return attribute
+ elif isinstance(attribute, list):
+ return [Prompty.normalize(value, parent) for value in attribute]
+ elif isinstance(attribute, Dict):
+ return {key: Prompty.normalize(value, parent) for key, value in attribute.items()}
+ else:
+ return attribute
+
+
+def param_hoisting(top: Dict[str, Any], bottom: Dict[str, Any], top_key: Union[str, None] = None) -> Dict[str, Any]:
+ if top_key:
+ new_dict = {**top[top_key]} if top_key in top else {}
+ else:
+ new_dict = {**top}
+ for key, value in bottom.items():
+ if not key in new_dict:
+ new_dict[key] = value
+ return new_dict
+
+
+class PromptyStream(Iterator):
+ """PromptyStream class to iterate over LLM stream.
+ Necessary for Prompty to handle streaming data when tracing."""
+
+ def __init__(self, name: str, iterator: Iterator):
+ self.name = name
+ self.iterator = iterator
+ self.items: List[Any] = []
+ self.__name__ = "PromptyStream"
+
+ def __iter__(self):
+ return self
+
+ def __next__(self):
+ try:
+ # enumerate but add to list
+ o = self.iterator.__next__()
+ self.items.append(o)
+ return o
+
+ except StopIteration:
+ # StopIteration is raised
+ # contents are exhausted
+ if len(self.items) > 0:
+ with Tracer.start("PromptyStream") as trace:
+ trace("signature", f"{self.name}.PromptyStream")
+ trace("inputs", "None")
+ trace("result", [to_dict(s) for s in self.items])
+
+ raise StopIteration
+
+
+class AsyncPromptyStream(AsyncIterator):
+ """AsyncPromptyStream class to iterate over LLM stream.
+ Necessary for Prompty to handle streaming data when tracing."""
+
+ def __init__(self, name: str, iterator: AsyncIterator):
+ self.name = name
+ self.iterator = iterator
+ self.items: List[Any] = []
+ self.__name__ = "AsyncPromptyStream"
+
+ def __aiter__(self):
+ return self
+
+ async def __anext__(self):
+ try:
+ # enumerate but add to list
+ o = await self.iterator.__anext__()
+ self.items.append(o)
+ return o
+
+ except StopAsyncIteration:
+ # StopIteration is raised
+ # contents are exhausted
+ if len(self.items) > 0:
+ with Tracer.start("AsyncPromptyStream") as trace:
+ trace("signature", f"{self.name}.AsyncPromptyStream")
+ trace("inputs", "None")
+ trace("result", [to_dict(s) for s in self.items])
+
+ raise StopAsyncIteration
+
+
+def _mask_secrets(d: Dict[str, Any], path: list[str], patterns: list[str] = ["key", "secret"]) -> bool:
+ sub_d = d
+ for key in path:
+ if key not in sub_d:
+ return False
+ sub_d = sub_d[key]
+
+ for k, v in sub_d.items():
+ if any([pattern in k.lower() for pattern in patterns]):
+ sub_d[k] = "*" * len(v)
+ return True
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/inference/prompts/_invoker.py b/.venv/lib/python3.12/site-packages/azure/ai/inference/prompts/_invoker.py
new file mode 100644
index 00000000..d682662e
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/inference/prompts/_invoker.py
@@ -0,0 +1,295 @@
+# ------------------------------------
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+# ------------------------------------
+# mypy: disable-error-code="return-value,operator"
+# pylint: disable=line-too-long,R,docstring-missing-param,docstring-missing-return,docstring-missing-rtype,unnecessary-pass
+import abc
+from typing import Any, Callable, Dict, Literal
+from ._tracer import trace
+from ._core import Prompty
+
+
+class Invoker(abc.ABC):
+ """Abstract class for Invoker
+
+ Attributes
+ ----------
+ prompty : Prompty
+ The prompty object
+ name : str
+ The name of the invoker
+
+ """
+
+ def __init__(self, prompty: Prompty) -> None:
+ self.prompty = prompty
+ self.name = self.__class__.__name__
+
+ @abc.abstractmethod
+ def invoke(self, data: Any) -> Any:
+ """Abstract method to invoke the invoker
+
+ Parameters
+ ----------
+ data : Any
+ The data to be invoked
+
+ Returns
+ -------
+ Any
+ The invoked
+ """
+ pass
+
+ @abc.abstractmethod
+ async def invoke_async(self, data: Any) -> Any:
+ """Abstract method to invoke the invoker asynchronously
+
+ Parameters
+ ----------
+ data : Any
+ The data to be invoked
+
+ Returns
+ -------
+ Any
+ The invoked
+ """
+ pass
+
+ @trace
+ def run(self, data: Any) -> Any:
+ """Method to run the invoker
+
+ Parameters
+ ----------
+ data : Any
+ The data to be invoked
+
+ Returns
+ -------
+ Any
+ The invoked
+ """
+ return self.invoke(data)
+
+ @trace
+ async def run_async(self, data: Any) -> Any:
+ """Method to run the invoker asynchronously
+
+ Parameters
+ ----------
+ data : Any
+ The data to be invoked
+
+ Returns
+ -------
+ Any
+ The invoked
+ """
+ return await self.invoke_async(data)
+
+
+class InvokerFactory:
+ """Factory class for Invoker"""
+
+ _renderers: Dict[str, Invoker] = {}
+ _parsers: Dict[str, Invoker] = {}
+ _executors: Dict[str, Invoker] = {}
+ _processors: Dict[str, Invoker] = {}
+
+ @classmethod
+ def add_renderer(cls, name: str, invoker: Invoker) -> None:
+ cls._renderers[name] = invoker
+
+ @classmethod
+ def add_parser(cls, name: str, invoker: Invoker) -> None:
+ cls._parsers[name] = invoker
+
+ @classmethod
+ def add_executor(cls, name: str, invoker: Invoker) -> None:
+ cls._executors[name] = invoker
+
+ @classmethod
+ def add_processor(cls, name: str, invoker: Invoker) -> None:
+ cls._processors[name] = invoker
+
+ @classmethod
+ def register_renderer(cls, name: str) -> Callable:
+ def inner_wrapper(wrapped_class: Invoker) -> Callable:
+ cls._renderers[name] = wrapped_class
+ return wrapped_class # type: ignore
+
+ return inner_wrapper
+
+ @classmethod
+ def register_parser(cls, name: str) -> Callable:
+ def inner_wrapper(wrapped_class: Invoker) -> Callable:
+ cls._parsers[name] = wrapped_class
+ return wrapped_class # type: ignore
+
+ return inner_wrapper
+
+ @classmethod
+ def register_executor(cls, name: str) -> Callable:
+ def inner_wrapper(wrapped_class: Invoker) -> Callable:
+ cls._executors[name] = wrapped_class
+ return wrapped_class # type: ignore
+
+ return inner_wrapper
+
+ @classmethod
+ def register_processor(cls, name: str) -> Callable:
+ def inner_wrapper(wrapped_class: Invoker) -> Callable:
+ cls._processors[name] = wrapped_class
+ return wrapped_class # type: ignore
+
+ return inner_wrapper
+
+ @classmethod
+ def _get_name(
+ cls,
+ type: Literal["renderer", "parser", "executor", "processor"],
+ prompty: Prompty,
+ ) -> str:
+ if type == "renderer":
+ return prompty.template.type
+ elif type == "parser":
+ return f"{prompty.template.parser}.{prompty.model.api}"
+ elif type == "executor":
+ return prompty.model.configuration["type"]
+ elif type == "processor":
+ return prompty.model.configuration["type"]
+ else:
+ raise ValueError(f"Type {type} not found")
+
+ @classmethod
+ def _get_invoker(
+ cls,
+ type: Literal["renderer", "parser", "executor", "processor"],
+ prompty: Prompty,
+ ) -> Invoker:
+ if type == "renderer":
+ name = prompty.template.type
+ if name not in cls._renderers:
+ raise ValueError(f"Renderer {name} not found")
+
+ return cls._renderers[name](prompty) # type: ignore
+
+ elif type == "parser":
+ name = f"{prompty.template.parser}.{prompty.model.api}"
+ if name not in cls._parsers:
+ raise ValueError(f"Parser {name} not found")
+
+ return cls._parsers[name](prompty) # type: ignore
+
+ elif type == "executor":
+ name = prompty.model.configuration["type"]
+ if name not in cls._executors:
+ raise ValueError(f"Executor {name} not found")
+
+ return cls._executors[name](prompty) # type: ignore
+
+ elif type == "processor":
+ name = prompty.model.configuration["type"]
+ if name not in cls._processors:
+ raise ValueError(f"Processor {name} not found")
+
+ return cls._processors[name](prompty) # type: ignore
+
+ else:
+ raise ValueError(f"Type {type} not found")
+
+ @classmethod
+ def run(
+ cls,
+ type: Literal["renderer", "parser", "executor", "processor"],
+ prompty: Prompty,
+ data: Any,
+ default: Any = None,
+ ):
+ name = cls._get_name(type, prompty)
+ if name.startswith("NOOP") and default is not None:
+ return default
+ elif name.startswith("NOOP"):
+ return data
+
+ invoker = cls._get_invoker(type, prompty)
+ value = invoker.run(data)
+ return value
+
+ @classmethod
+ async def run_async(
+ cls,
+ type: Literal["renderer", "parser", "executor", "processor"],
+ prompty: Prompty,
+ data: Any,
+ default: Any = None,
+ ):
+ name = cls._get_name(type, prompty)
+ if name.startswith("NOOP") and default is not None:
+ return default
+ elif name.startswith("NOOP"):
+ return data
+ invoker = cls._get_invoker(type, prompty)
+ value = await invoker.run_async(data)
+ return value
+
+ @classmethod
+ def run_renderer(cls, prompty: Prompty, data: Any, default: Any = None) -> Any:
+ return cls.run("renderer", prompty, data, default)
+
+ @classmethod
+ async def run_renderer_async(cls, prompty: Prompty, data: Any, default: Any = None) -> Any:
+ return await cls.run_async("renderer", prompty, data, default)
+
+ @classmethod
+ def run_parser(cls, prompty: Prompty, data: Any, default: Any = None) -> Any:
+ return cls.run("parser", prompty, data, default)
+
+ @classmethod
+ async def run_parser_async(cls, prompty: Prompty, data: Any, default: Any = None) -> Any:
+ return await cls.run_async("parser", prompty, data, default)
+
+ @classmethod
+ def run_executor(cls, prompty: Prompty, data: Any, default: Any = None) -> Any:
+ return cls.run("executor", prompty, data, default)
+
+ @classmethod
+ async def run_executor_async(cls, prompty: Prompty, data: Any, default: Any = None) -> Any:
+ return await cls.run_async("executor", prompty, data, default)
+
+ @classmethod
+ def run_processor(cls, prompty: Prompty, data: Any, default: Any = None) -> Any:
+ return cls.run("processor", prompty, data, default)
+
+ @classmethod
+ async def run_processor_async(cls, prompty: Prompty, data: Any, default: Any = None) -> Any:
+ return await cls.run_async("processor", prompty, data, default)
+
+
+class InvokerException(Exception):
+ """Exception class for Invoker"""
+
+ def __init__(self, message: str, type: str) -> None:
+ super().__init__(message)
+ self.type = type
+
+ def __str__(self) -> str:
+ return f"{super().__str__()}. Make sure to pip install any necessary package extras (i.e. could be something like `pip install prompty[{self.type}]`) for {self.type} as well as import the appropriate invokers (i.e. could be something like `import prompty.{self.type}`)."
+
+
+@InvokerFactory.register_renderer("NOOP")
+@InvokerFactory.register_parser("NOOP")
+@InvokerFactory.register_executor("NOOP")
+@InvokerFactory.register_processor("NOOP")
+@InvokerFactory.register_parser("prompty.embedding")
+@InvokerFactory.register_parser("prompty.image")
+@InvokerFactory.register_parser("prompty.completion")
+class NoOp(Invoker):
+ def invoke(self, data: Any) -> Any:
+ return data
+
+ async def invoke_async(self, data: str) -> Any:
+ return self.invoke(data)
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/inference/prompts/_mustache.py b/.venv/lib/python3.12/site-packages/azure/ai/inference/prompts/_mustache.py
new file mode 100644
index 00000000..f7a0c21d
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/inference/prompts/_mustache.py
@@ -0,0 +1,671 @@
+# ------------------------------------
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+# ------------------------------------
+# pylint: disable=line-too-long,R,consider-using-dict-items,docstring-missing-return,docstring-missing-rtype,docstring-missing-param,global-statement,unused-argument,global-variable-not-assigned,protected-access,logging-fstring-interpolation,deprecated-method
+from __future__ import annotations
+import logging
+from collections.abc import Iterator, Sequence
+from types import MappingProxyType
+from typing import (
+ Any,
+ Dict,
+ List,
+ Literal,
+ Mapping,
+ Optional,
+ Union,
+ cast,
+)
+from typing_extensions import TypeAlias
+
+logger = logging.getLogger(__name__)
+
+
+Scopes: TypeAlias = List[Union[Literal[False, 0], Mapping[str, Any]]]
+
+
+# Globals
+_CURRENT_LINE = 1
+_LAST_TAG_LINE = None
+
+
+class ChevronError(SyntaxError):
+ """Custom exception for Chevron errors."""
+
+
+#
+# Helper functions
+#
+
+
+def grab_literal(template: str, l_del: str) -> tuple[str, str]:
+ """Parse a literal from the template.
+
+ Args:
+ template: The template to parse.
+ l_del: The left delimiter.
+
+ Returns:
+ Tuple[str, str]: The literal and the template.
+ """
+
+ global _CURRENT_LINE
+
+ try:
+ # Look for the next tag and move the template to it
+ literal, template = template.split(l_del, 1)
+ _CURRENT_LINE += literal.count("\n")
+ return (literal, template)
+
+ # There are no more tags in the template?
+ except ValueError:
+ # Then the rest of the template is a literal
+ return (template, "")
+
+
+def l_sa_check(template: str, literal: str, is_standalone: bool) -> bool:
+ """Do a preliminary check to see if a tag could be a standalone.
+
+ Args:
+ template: The template. (Not used.)
+ literal: The literal.
+ is_standalone: Whether the tag is standalone.
+
+ Returns:
+ bool: Whether the tag could be a standalone.
+ """
+
+ # If there is a newline, or the previous tag was a standalone
+ if literal.find("\n") != -1 or is_standalone:
+ padding = literal.split("\n")[-1]
+
+ # If all the characters since the last newline are spaces
+ # Then the next tag could be a standalone
+ # Otherwise it can't be
+ return padding.isspace() or padding == ""
+ else:
+ return False
+
+
+def r_sa_check(template: str, tag_type: str, is_standalone: bool) -> bool:
+ """Do a final check to see if a tag could be a standalone.
+
+ Args:
+ template: The template.
+ tag_type: The type of the tag.
+ is_standalone: Whether the tag is standalone.
+
+ Returns:
+ bool: Whether the tag could be a standalone.
+ """
+
+ # Check right side if we might be a standalone
+ if is_standalone and tag_type not in ["variable", "no escape"]:
+ on_newline = template.split("\n", 1)
+
+ # If the stuff to the right of us are spaces we're a standalone
+ return on_newline[0].isspace() or not on_newline[0]
+
+ # If we're a tag can't be a standalone
+ else:
+ return False
+
+
+def parse_tag(template: str, l_del: str, r_del: str) -> tuple[tuple[str, str], str]:
+ """Parse a tag from a template.
+
+ Args:
+ template: The template.
+ l_del: The left delimiter.
+ r_del: The right delimiter.
+
+ Returns:
+ Tuple[Tuple[str, str], str]: The tag and the template.
+
+ Raises:
+ ChevronError: If the tag is unclosed.
+ ChevronError: If the set delimiter tag is unclosed.
+ """
+ global _CURRENT_LINE
+ global _LAST_TAG_LINE
+
+ tag_types = {
+ "!": "comment",
+ "#": "section",
+ "^": "inverted section",
+ "/": "end",
+ ">": "partial",
+ "=": "set delimiter?",
+ "{": "no escape?",
+ "&": "no escape",
+ }
+
+ # Get the tag
+ try:
+ tag, template = template.split(r_del, 1)
+ except ValueError as e:
+ msg = "unclosed tag " f"at line {_CURRENT_LINE}"
+ raise ChevronError(msg) from e
+
+ # Find the type meaning of the first character
+ tag_type = tag_types.get(tag[0], "variable")
+
+ # If the type is not a variable
+ if tag_type != "variable":
+ # Then that first character is not needed
+ tag = tag[1:]
+
+ # If we might be a set delimiter tag
+ if tag_type == "set delimiter?":
+ # Double check to make sure we are
+ if tag.endswith("="):
+ tag_type = "set delimiter"
+ # Remove the equal sign
+ tag = tag[:-1]
+
+ # Otherwise we should complain
+ else:
+ msg = "unclosed set delimiter tag\n" f"at line {_CURRENT_LINE}"
+ raise ChevronError(msg)
+
+ elif (
+ # If we might be a no html escape tag
+ tag_type == "no escape?"
+ # And we have a third curly brace
+ # (And are using curly braces as delimiters)
+ and l_del == "{{"
+ and r_del == "}}"
+ and template.startswith("}")
+ ):
+ # Then we are a no html escape tag
+ template = template[1:]
+ tag_type = "no escape"
+
+ # Strip the whitespace off the key and return
+ return ((tag_type, tag.strip()), template)
+
+
+#
+# The main tokenizing function
+#
+
+
+def tokenize(template: str, def_ldel: str = "{{", def_rdel: str = "}}") -> Iterator[tuple[str, str]]:
+ """Tokenize a mustache template.
+
+ Tokenizes a mustache template in a generator fashion,
+ using file-like objects. It also accepts a string containing
+ the template.
+
+
+ Arguments:
+
+ template -- a file-like object, or a string of a mustache template
+
+ def_ldel -- The default left delimiter
+ ("{{" by default, as in spec compliant mustache)
+
+ def_rdel -- The default right delimiter
+ ("}}" by default, as in spec compliant mustache)
+
+
+ Returns:
+
+ A generator of mustache tags in the form of a tuple
+
+ -- (tag_type, tag_key)
+
+ Where tag_type is one of:
+ * literal
+ * section
+ * inverted section
+ * end
+ * partial
+ * no escape
+
+ And tag_key is either the key or in the case of a literal tag,
+ the literal itself.
+ """
+
+ global _CURRENT_LINE, _LAST_TAG_LINE
+ _CURRENT_LINE = 1
+ _LAST_TAG_LINE = None
+
+ is_standalone = True
+ open_sections = []
+ l_del = def_ldel
+ r_del = def_rdel
+
+ while template:
+ literal, template = grab_literal(template, l_del)
+
+ # If the template is completed
+ if not template:
+ # Then yield the literal and leave
+ yield ("literal", literal)
+ break
+
+ # Do the first check to see if we could be a standalone
+ is_standalone = l_sa_check(template, literal, is_standalone)
+
+ # Parse the tag
+ tag, template = parse_tag(template, l_del, r_del)
+ tag_type, tag_key = tag
+
+ # Special tag logic
+
+ # If we are a set delimiter tag
+ if tag_type == "set delimiter":
+ # Then get and set the delimiters
+ dels = tag_key.strip().split(" ")
+ l_del, r_del = dels[0], dels[-1]
+
+ # If we are a section tag
+ elif tag_type in ["section", "inverted section"]:
+ # Then open a new section
+ open_sections.append(tag_key)
+ _LAST_TAG_LINE = _CURRENT_LINE
+
+ # If we are an end tag
+ elif tag_type == "end":
+ # Then check to see if the last opened section
+ # is the same as us
+ try:
+ last_section = open_sections.pop()
+ except IndexError as e:
+ msg = f'Trying to close tag "{tag_key}"\n' "Looks like it was not opened.\n" f"line {_CURRENT_LINE + 1}"
+ raise ChevronError(msg) from e
+ if tag_key != last_section:
+ # Otherwise we need to complain
+ msg = (
+ f'Trying to close tag "{tag_key}"\n'
+ f'last open tag is "{last_section}"\n'
+ f"line {_CURRENT_LINE + 1}"
+ )
+ raise ChevronError(msg)
+
+ # Do the second check to see if we're a standalone
+ is_standalone = r_sa_check(template, tag_type, is_standalone)
+
+ # Which if we are
+ if is_standalone:
+ # Remove the stuff before the newline
+ template = template.split("\n", 1)[-1]
+
+ # Partials need to keep the spaces on their left
+ if tag_type != "partial":
+ # But other tags don't
+ literal = literal.rstrip(" ")
+
+ # Start yielding
+ # Ignore literals that are empty
+ if literal != "":
+ yield ("literal", literal)
+
+ # Ignore comments and set delimiters
+ if tag_type not in ["comment", "set delimiter?"]:
+ yield (tag_type, tag_key)
+
+ # If there are any open sections when we're done
+ if open_sections:
+ # Then we need to complain
+ msg = (
+ "Unexpected EOF\n"
+ f'the tag "{open_sections[-1]}" was never closed\n'
+ f"was opened at line {_LAST_TAG_LINE}"
+ )
+ raise ChevronError(msg)
+
+
+#
+# Helper functions
+#
+
+
+def _html_escape(string: str) -> str:
+ """HTML escape all of these " & < >"""
+
+ html_codes = {
+ '"': "&quot;",
+ "<": "&lt;",
+ ">": "&gt;",
+ }
+
+ # & must be handled first
+ string = string.replace("&", "&amp;")
+ for char in html_codes:
+ string = string.replace(char, html_codes[char])
+ return string
+
+
+def _get_key(
+ key: str,
+ scopes: Scopes,
+ warn: bool,
+ keep: bool,
+ def_ldel: str,
+ def_rdel: str,
+) -> Any:
+ """Get a key from the current scope"""
+
+ # If the key is a dot
+ if key == ".":
+ # Then just return the current scope
+ return scopes[0]
+
+ # Loop through the scopes
+ for scope in scopes:
+ try:
+ # Return an empty string if falsy, with two exceptions
+ # 0 should return 0, and False should return False
+ if scope in (0, False):
+ return scope
+
+ # For every dot separated key
+ for child in key.split("."):
+ # Return an empty string if falsy, with two exceptions
+ # 0 should return 0, and False should return False
+ if scope in (0, False):
+ return scope
+ # Move into the scope
+ try:
+ # Try subscripting (Normal dictionaries)
+ scope = cast(Dict[str, Any], scope)[child]
+ except (TypeError, AttributeError):
+ try:
+ scope = getattr(scope, child)
+ except (TypeError, AttributeError):
+ # Try as a list
+ scope = scope[int(child)] # type: ignore
+
+ try:
+ # This allows for custom falsy data types
+ # https://github.com/noahmorrison/chevron/issues/35
+ if scope._CHEVRON_return_scope_when_falsy: # type: ignore
+ return scope
+ except AttributeError:
+ if scope in (0, False):
+ return scope
+ return scope or ""
+ except (AttributeError, KeyError, IndexError, ValueError):
+ # We couldn't find the key in the current scope
+ # We'll try again on the next pass
+ pass
+
+ # We couldn't find the key in any of the scopes
+
+ if warn:
+ logger.warn(f"Could not find key '{key}'")
+
+ if keep:
+ return f"{def_ldel} {key} {def_rdel}"
+
+ return ""
+
+
+def _get_partial(name: str, partials_dict: Mapping[str, str]) -> str:
+ """Load a partial"""
+ try:
+ # Maybe the partial is in the dictionary
+ return partials_dict[name]
+ except KeyError:
+ return ""
+
+
+#
+# The main rendering function
+#
+g_token_cache: Dict[str, List[tuple[str, str]]] = {}
+
+EMPTY_DICT: MappingProxyType[str, str] = MappingProxyType({})
+
+
+def render(
+ template: Union[str, List[tuple[str, str]]] = "",
+ data: Mapping[str, Any] = EMPTY_DICT,
+ partials_dict: Mapping[str, str] = EMPTY_DICT,
+ padding: str = "",
+ def_ldel: str = "{{",
+ def_rdel: str = "}}",
+ scopes: Optional[Scopes] = None,
+ warn: bool = False,
+ keep: bool = False,
+) -> str:
+ """Render a mustache template.
+
+ Renders a mustache template with a data scope and inline partial capability.
+
+ Arguments:
+
+ template -- A file-like object or a string containing the template.
+
+ data -- A python dictionary with your data scope.
+
+ partials_path -- The path to where your partials are stored.
+ If set to None, then partials won't be loaded from the file system
+ (defaults to '.').
+
+ partials_ext -- The extension that you want the parser to look for
+ (defaults to 'mustache').
+
+ partials_dict -- A python dictionary which will be search for partials
+ before the filesystem is. {'include': 'foo'} is the same
+ as a file called include.mustache
+ (defaults to {}).
+
+ padding -- This is for padding partials, and shouldn't be used
+ (but can be if you really want to).
+
+ def_ldel -- The default left delimiter
+ ("{{" by default, as in spec compliant mustache).
+
+ def_rdel -- The default right delimiter
+ ("}}" by default, as in spec compliant mustache).
+
+ scopes -- The list of scopes that get_key will look through.
+
+ warn -- Log a warning when a template substitution isn't found in the data
+
+ keep -- Keep unreplaced tags when a substitution isn't found in the data.
+
+
+ Returns:
+
+ A string containing the rendered template.
+ """
+
+ # If the template is a sequence but not derived from a string
+ if isinstance(template, Sequence) and not isinstance(template, str):
+ # Then we don't need to tokenize it
+ # But it does need to be a generator
+ tokens: Iterator[tuple[str, str]] = (token for token in template)
+ else:
+ if template in g_token_cache:
+ tokens = (token for token in g_token_cache[template])
+ else:
+ # Otherwise make a generator
+ tokens = tokenize(template, def_ldel, def_rdel)
+
+ output = ""
+
+ if scopes is None:
+ scopes = [data]
+
+ # Run through the tokens
+ for tag, key in tokens:
+ # Set the current scope
+ current_scope = scopes[0]
+
+ # If we're an end tag
+ if tag == "end":
+ # Pop out of the latest scope
+ del scopes[0]
+
+ # If the current scope is falsy and not the only scope
+ elif not current_scope and len(scopes) != 1:
+ if tag in ["section", "inverted section"]:
+ # Set the most recent scope to a falsy value
+ scopes.insert(0, False)
+
+ # If we're a literal tag
+ elif tag == "literal":
+ # Add padding to the key and add it to the output
+ output += key.replace("\n", "\n" + padding)
+
+ # If we're a variable tag
+ elif tag == "variable":
+ # Add the html escaped key to the output
+ thing = _get_key(key, scopes, warn=warn, keep=keep, def_ldel=def_ldel, def_rdel=def_rdel)
+ if thing is True and key == ".":
+ # if we've coerced into a boolean by accident
+ # (inverted tags do this)
+ # then get the un-coerced object (next in the stack)
+ thing = scopes[1]
+ if not isinstance(thing, str):
+ thing = str(thing)
+ output += _html_escape(thing)
+
+ # If we're a no html escape tag
+ elif tag == "no escape":
+ # Just lookup the key and add it
+ thing = _get_key(key, scopes, warn=warn, keep=keep, def_ldel=def_ldel, def_rdel=def_rdel)
+ if not isinstance(thing, str):
+ thing = str(thing)
+ output += thing
+
+ # If we're a section tag
+ elif tag == "section":
+ # Get the sections scope
+ scope = _get_key(key, scopes, warn=warn, keep=keep, def_ldel=def_ldel, def_rdel=def_rdel)
+
+ # If the scope is a callable (as described in
+ # https://mustache.github.io/mustache.5.html)
+ if callable(scope):
+ # Generate template text from tags
+ text = ""
+ tags: List[tuple[str, str]] = []
+ for token in tokens:
+ if token == ("end", key):
+ break
+
+ tags.append(token)
+ tag_type, tag_key = token
+ if tag_type == "literal":
+ text += tag_key
+ elif tag_type == "no escape":
+ text += f"{def_ldel}& {tag_key} {def_rdel}"
+ else:
+ text += "{}{} {}{}".format(
+ def_ldel,
+ {
+ "comment": "!",
+ "section": "#",
+ "inverted section": "^",
+ "end": "/",
+ "partial": ">",
+ "set delimiter": "=",
+ "no escape": "&",
+ "variable": "",
+ }[tag_type],
+ tag_key,
+ def_rdel,
+ )
+
+ g_token_cache[text] = tags
+
+ rend = scope(
+ text,
+ lambda template, data=None: render(
+ template,
+ data={},
+ partials_dict=partials_dict,
+ padding=padding,
+ def_ldel=def_ldel,
+ def_rdel=def_rdel,
+ scopes=data and [data] + scopes or scopes,
+ warn=warn,
+ keep=keep,
+ ),
+ )
+
+ output += rend # type: ignore[reportOperatorIssue]
+
+ # If the scope is a sequence, an iterator or generator but not
+ # derived from a string
+ elif isinstance(scope, (Sequence, Iterator)) and not isinstance(scope, str):
+ # Then we need to do some looping
+
+ # Gather up all the tags inside the section
+ # (And don't be tricked by nested end tags with the same key)
+ # TODO: This feels like it still has edge cases, no?
+ tags = []
+ tags_with_same_key = 0
+ for token in tokens:
+ if token == ("section", key):
+ tags_with_same_key += 1
+ if token == ("end", key):
+ tags_with_same_key -= 1
+ if tags_with_same_key < 0:
+ break
+ tags.append(token)
+
+ # For every item in the scope
+ for thing in scope:
+ # Append it as the most recent scope and render
+ new_scope = [thing] + scopes
+ rend = render(
+ template=tags,
+ scopes=new_scope,
+ padding=padding,
+ partials_dict=partials_dict,
+ def_ldel=def_ldel,
+ def_rdel=def_rdel,
+ warn=warn,
+ keep=keep,
+ )
+
+ output += rend
+
+ else:
+ # Otherwise we're just a scope section
+ scopes.insert(0, scope) # type: ignore[reportArgumentType]
+
+ # If we're an inverted section
+ elif tag == "inverted section":
+ # Add the flipped scope to the scopes
+ scope = _get_key(key, scopes, warn=warn, keep=keep, def_ldel=def_ldel, def_rdel=def_rdel)
+ scopes.insert(0, cast(Literal[False], not scope))
+
+ # If we're a partial
+ elif tag == "partial":
+ # Load the partial
+ partial = _get_partial(key, partials_dict)
+
+ # Find what to pad the partial with
+ left = output.rpartition("\n")[2]
+ part_padding = padding
+ if left.isspace():
+ part_padding += left
+
+ # Render the partial
+ part_out = render(
+ template=partial,
+ partials_dict=partials_dict,
+ def_ldel=def_ldel,
+ def_rdel=def_rdel,
+ padding=part_padding,
+ scopes=scopes,
+ warn=warn,
+ keep=keep,
+ )
+
+ # If the partial was indented
+ if left.isspace():
+ # then remove the spaces from the end
+ part_out = part_out.rstrip(" \t")
+
+ # Add the partials output to the output
+ output += part_out
+
+ return output
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/inference/prompts/_parsers.py b/.venv/lib/python3.12/site-packages/azure/ai/inference/prompts/_parsers.py
new file mode 100644
index 00000000..de3c570e
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/inference/prompts/_parsers.py
@@ -0,0 +1,156 @@
+# ------------------------------------
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+# ------------------------------------
+# mypy: disable-error-code="union-attr,return-value"
+# pylint: disable=line-too-long,R,consider-using-enumerate,docstring-missing-param,docstring-missing-return,docstring-missing-rtype
+import re
+import base64
+from pathlib import Path
+from typing import Any, Union
+from ._core import Prompty
+from ._invoker import Invoker, InvokerFactory
+
+
+ROLES = ["assistant", "function", "system", "user"]
+
+
+@InvokerFactory.register_parser("prompty.chat")
+class PromptyChatParser(Invoker):
+ """Prompty Chat Parser"""
+
+ def __init__(self, prompty: Prompty) -> None:
+ super().__init__(prompty)
+ self.path = Path(self.prompty.file).parent
+
+ def invoke(self, data: str) -> Any:
+ return invoke_parser(self.path, data)
+
+ async def invoke_async(self, data: str) -> Any:
+ """Invoke the Prompty Chat Parser (Async)
+
+ Parameters
+ ----------
+ data : str
+ The data to parse
+
+ Returns
+ -------
+ str
+ The parsed data
+ """
+ return self.invoke(data)
+
+
+def _inline_image(path: Union[Path, None], image_item: str) -> str:
+ """Inline Image
+
+ Parameters
+ ----------
+ image_item : str
+ The image item to inline
+
+ Returns
+ -------
+ str
+ The inlined image
+ """
+ # pass through if it's a url or base64 encoded or the path is None
+ if image_item.startswith("http") or image_item.startswith("data") or path is None:
+ return image_item
+ # otherwise, it's a local file - need to base64 encode it
+ else:
+ image_path = (path if path is not None else Path(".")) / image_item
+ with open(image_path, "rb") as f:
+ base64_image = base64.b64encode(f.read()).decode("utf-8")
+
+ if image_path.suffix == ".png":
+ return f"data:image/png;base64,{base64_image}"
+ elif image_path.suffix == ".jpg":
+ return f"data:image/jpeg;base64,{base64_image}"
+ elif image_path.suffix == ".jpeg":
+ return f"data:image/jpeg;base64,{base64_image}"
+ else:
+ raise ValueError(
+ f"Invalid image format {image_path.suffix} - currently only .png and .jpg / .jpeg are supported."
+ )
+
+
+def _parse_content(path: Union[Path, None], content: str):
+ """for parsing inline images
+
+ Parameters
+ ----------
+ content : str
+ The content to parse
+
+ Returns
+ -------
+ any
+ The parsed content
+ """
+ # regular expression to parse markdown images
+ image = r"(?P<alt>!\[[^\]]*\])\((?P<filename>.*?)(?=\"|\))\)"
+ matches = re.findall(image, content, flags=re.MULTILINE)
+ if len(matches) > 0:
+ content_items = []
+ content_chunks = re.split(image, content, flags=re.MULTILINE)
+ current_chunk = 0
+ for i in range(len(content_chunks)):
+ # image entry
+ if current_chunk < len(matches) and content_chunks[i] == matches[current_chunk][0]:
+ content_items.append(
+ {
+ "type": "image_url",
+ "image_url": {"url": _inline_image(path, matches[current_chunk][1].split(" ")[0].strip())},
+ }
+ )
+ # second part of image entry
+ elif current_chunk < len(matches) and content_chunks[i] == matches[current_chunk][1]:
+ current_chunk += 1
+ # text entry
+ else:
+ if len(content_chunks[i].strip()) > 0:
+ content_items.append({"type": "text", "text": content_chunks[i].strip()})
+ return content_items
+ else:
+ return content
+
+
+def invoke_parser(path: Union[Path, None], data: str) -> Any:
+ """Invoke the Prompty Chat Parser
+
+ Parameters
+ ----------
+ data : str
+ The data to parse
+
+ Returns
+ -------
+ str
+ The parsed data
+ """
+ messages = []
+ separator = r"(?i)^\s*#?\s*(" + "|".join(ROLES) + r")\s*:\s*\n"
+
+ # get valid chunks - remove empty items
+ chunks = [item for item in re.split(separator, data, flags=re.MULTILINE) if len(item.strip()) > 0]
+
+ # if no starter role, then inject system role
+ if not chunks[0].strip().lower() in ROLES:
+ chunks.insert(0, "system")
+
+ # if last chunk is role entry, then remove (no content?)
+ if chunks[-1].strip().lower() in ROLES:
+ chunks.pop()
+
+ if len(chunks) % 2 != 0:
+ raise ValueError("Invalid prompt format")
+
+ # create messages
+ for i in range(0, len(chunks), 2):
+ role = chunks[i].strip().lower()
+ content = chunks[i + 1].strip()
+ messages.append({"role": role, "content": _parse_content(path, content)})
+
+ return messages
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/inference/prompts/_patch.py b/.venv/lib/python3.12/site-packages/azure/ai/inference/prompts/_patch.py
new file mode 100644
index 00000000..14ad4f62
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/inference/prompts/_patch.py
@@ -0,0 +1,124 @@
+# ------------------------------------
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+# ------------------------------------
+# pylint: disable=line-too-long,R
+"""Customize generated code here.
+
+Follow our quickstart for examples: https://aka.ms/azsdk/python/dpcodegen/python/customize
+"""
+
+import traceback
+from pathlib import Path
+from typing import Any, Dict, List, Optional
+from typing_extensions import Self
+from ._core import Prompty
+from ._mustache import render
+from ._parsers import invoke_parser
+from ._prompty_utils import load, prepare
+from ._utils import remove_leading_empty_space
+
+
+class PromptTemplate:
+ """The helper class which takes variant of inputs, e.g. Prompty format or string, and returns the parsed prompt in an array."""
+
+ @classmethod
+ def from_prompty(cls, file_path: str) -> Self:
+ """Initialize a PromptTemplate object from a prompty file.
+
+ :param file_path: The path to the prompty file.
+ :type file_path: str
+ :return: The PromptTemplate object.
+ :rtype: PromptTemplate
+ """
+ if not file_path:
+ raise ValueError("Please provide file_path")
+
+ # Get the absolute path of the file by `traceback.extract_stack()`, it's "-2" because:
+ # In the stack, the last function is the current function.
+ # The second last function is the caller function, which is the root of the file_path.
+ stack = traceback.extract_stack()
+ caller = Path(stack[-2].filename)
+ abs_file_path = Path(caller.parent / Path(file_path)).resolve().absolute()
+
+ prompty = load(str(abs_file_path))
+ return cls(prompty=prompty)
+
+ @classmethod
+ def from_string(cls, prompt_template: str, api: str = "chat", model_name: Optional[str] = None) -> Self:
+ """Initialize a PromptTemplate object from a message template.
+
+ :param prompt_template: The prompt template string.
+ :type prompt_template: str
+ :param api: The API type, e.g. "chat" or "completion".
+ :type api: str
+ :param model_name: The model name, e.g. "gpt-4o-mini".
+ :type model_name: str
+ :return: The PromptTemplate object.
+ :rtype: PromptTemplate
+ """
+ return cls(
+ api=api,
+ prompt_template=prompt_template,
+ model_name=model_name,
+ prompty=None,
+ )
+
+ def __init__(
+ self,
+ *,
+ api: str = "chat",
+ prompty: Optional[Prompty] = None,
+ prompt_template: Optional[str] = None,
+ model_name: Optional[str] = None,
+ ) -> None:
+ self.prompty = prompty
+ if self.prompty is not None:
+ self.model_name = (
+ self.prompty.model.configuration["azure_deployment"]
+ if "azure_deployment" in self.prompty.model.configuration
+ else None
+ )
+ self.parameters = self.prompty.model.parameters
+ self._config = {}
+ elif prompt_template is not None:
+ self.model_name = model_name
+ self.parameters = {}
+ # _config is a dict to hold the internal configuration
+ self._config = {
+ "api": api if api is not None else "chat",
+ "prompt_template": prompt_template,
+ }
+ else:
+ raise ValueError("Please pass valid arguments for PromptTemplate")
+
+ def create_messages(self, data: Optional[Dict[str, Any]] = None, **kwargs) -> List[Dict[str, Any]]:
+ """Render the prompt template with the given data.
+
+ :param data: The data to render the prompt template with.
+ :type data: Optional[Dict[str, Any]]
+ :return: The rendered prompt template.
+ :rtype: List[Dict[str, Any]]
+ """
+ if data is None:
+ data = kwargs
+
+ if self.prompty is not None:
+ parsed = prepare(self.prompty, data)
+ return parsed
+ elif "prompt_template" in self._config:
+ prompt_template = remove_leading_empty_space(self._config["prompt_template"])
+ system_prompt_str = render(prompt_template, data)
+ parsed = invoke_parser(None, system_prompt_str)
+ return parsed
+ else:
+ raise ValueError("Please provide valid prompt template")
+
+
+def patch_sdk():
+ """Do not remove from this file.
+
+ `patch_sdk` is a last resort escape hatch that allows you to do customizations
+ you can't accomplish using the techniques described in
+ https://aka.ms/azsdk/python/dpcodegen/python/customize
+ """
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/inference/prompts/_prompty_utils.py b/.venv/lib/python3.12/site-packages/azure/ai/inference/prompts/_prompty_utils.py
new file mode 100644
index 00000000..5ea38bda
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/inference/prompts/_prompty_utils.py
@@ -0,0 +1,415 @@
+# ------------------------------------
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+# ------------------------------------
+# mypy: disable-error-code="assignment"
+# pylint: disable=R,docstring-missing-param,docstring-missing-return,docstring-missing-rtype,dangerous-default-value,redefined-outer-name,unused-wildcard-import,wildcard-import,raise-missing-from
+import traceback
+from pathlib import Path
+from typing import Any, Dict, List, Union
+from ._tracer import trace
+from ._invoker import InvokerFactory
+from ._core import (
+ ModelSettings,
+ Prompty,
+ PropertySettings,
+ TemplateSettings,
+ param_hoisting,
+)
+from ._utils import (
+ load_global_config,
+ load_prompty,
+)
+
+from ._renderers import *
+from ._parsers import *
+
+
+@trace(description="Create a headless prompty object for programmatic use.")
+def headless(
+ api: str,
+ content: Union[str, List[str], dict],
+ configuration: Dict[str, Any] = {},
+ parameters: Dict[str, Any] = {},
+ connection: str = "default",
+) -> Prompty:
+ """Create a headless prompty object for programmatic use.
+
+ Parameters
+ ----------
+ api : str
+ The API to use for the model
+ content : Union[str, List[str], dict]
+ The content to process
+ configuration : Dict[str, Any], optional
+ The configuration to use, by default {}
+ parameters : Dict[str, Any], optional
+ The parameters to use, by default {}
+ connection : str, optional
+ The connection to use, by default "default"
+
+ Returns
+ -------
+ Prompty
+ The headless prompty object
+
+ Example
+ -------
+ >>> import prompty
+ >>> p = prompty.headless(
+ api="embedding",
+ configuration={"type": "azure", "azure_deployment": "text-embedding-ada-002"},
+ content="hello world",
+ )
+ >>> emb = prompty.execute(p)
+
+ """
+
+ # get caller's path (to get relative path for prompty.json)
+ caller = Path(traceback.extract_stack()[-2].filename)
+ templateSettings = TemplateSettings(type="NOOP", parser="NOOP")
+ modelSettings = ModelSettings(
+ api=api,
+ configuration=Prompty.normalize(
+ param_hoisting(configuration, load_global_config(caller.parent, connection)),
+ caller.parent,
+ ),
+ parameters=parameters,
+ )
+
+ return Prompty(model=modelSettings, template=templateSettings, content=content)
+
+
+def _load_raw_prompty(attributes: dict, content: str, p: Path, global_config: dict):
+ if "model" not in attributes:
+ attributes["model"] = {}
+
+ if "configuration" not in attributes["model"]:
+ attributes["model"]["configuration"] = global_config
+ else:
+ attributes["model"]["configuration"] = param_hoisting(
+ attributes["model"]["configuration"],
+ global_config,
+ )
+
+ # pull model settings out of attributes
+ try:
+ model = ModelSettings(**attributes.pop("model"))
+ except Exception as e:
+ raise ValueError(f"Error in model settings: {e}")
+
+ # pull template settings
+ try:
+ if "template" in attributes:
+ t = attributes.pop("template")
+ if isinstance(t, dict):
+ template = TemplateSettings(**t)
+ # has to be a string denoting the type
+ else:
+ template = TemplateSettings(type=t, parser="prompty")
+ else:
+ template = TemplateSettings(type="mustache", parser="prompty")
+ except Exception as e:
+ raise ValueError(f"Error in template loader: {e}")
+
+ # formalize inputs and outputs
+ if "inputs" in attributes:
+ try:
+ inputs = {k: PropertySettings(**v) for (k, v) in attributes.pop("inputs").items()}
+ except Exception as e:
+ raise ValueError(f"Error in inputs: {e}")
+ else:
+ inputs = {}
+ if "outputs" in attributes:
+ try:
+ outputs = {k: PropertySettings(**v) for (k, v) in attributes.pop("outputs").items()}
+ except Exception as e:
+ raise ValueError(f"Error in outputs: {e}")
+ else:
+ outputs = {}
+
+ prompty = Prompty(
+ **attributes,
+ model=model,
+ inputs=inputs,
+ outputs=outputs,
+ template=template,
+ content=content,
+ file=p,
+ )
+
+ return prompty
+
+
+@trace(description="Load a prompty file.")
+def load(prompty_file: Union[str, Path], configuration: str = "default") -> Prompty:
+ """Load a prompty file.
+
+ Parameters
+ ----------
+ prompty_file : Union[str, Path]
+ The path to the prompty file
+ configuration : str, optional
+ The configuration to use, by default "default"
+
+ Returns
+ -------
+ Prompty
+ The loaded prompty object
+
+ Example
+ -------
+ >>> import prompty
+ >>> p = prompty.load("prompts/basic.prompty")
+ >>> print(p)
+ """
+
+ p = Path(prompty_file)
+ if not p.is_absolute():
+ # get caller's path (take into account trace frame)
+ caller = Path(traceback.extract_stack()[-3].filename)
+ p = Path(caller.parent / p).resolve().absolute()
+
+ # load dictionary from prompty file
+ matter = load_prompty(p)
+
+ attributes = matter["attributes"]
+ content = matter["body"]
+
+ # normalize attribute dictionary resolve keys and files
+ attributes = Prompty.normalize(attributes, p.parent)
+
+ # load global configuration
+ global_config = Prompty.normalize(load_global_config(p.parent, configuration), p.parent)
+
+ prompty = _load_raw_prompty(attributes, content, p, global_config)
+
+ # recursive loading of base prompty
+ if "base" in attributes:
+ # load the base prompty from the same directory as the current prompty
+ base = load(p.parent / attributes["base"])
+ prompty = Prompty.hoist_base_prompty(prompty, base)
+
+ return prompty
+
+
+@trace(description="Prepare the inputs for the prompt.")
+def prepare(
+ prompt: Prompty,
+ inputs: Dict[str, Any] = {},
+):
+ """Prepare the inputs for the prompt.
+
+ Parameters
+ ----------
+ prompt : Prompty
+ The prompty object
+ inputs : Dict[str, Any], optional
+ The inputs to the prompt, by default {}
+
+ Returns
+ -------
+ dict
+ The prepared and hidrated template shaped to the LLM model
+
+ Example
+ -------
+ >>> import prompty
+ >>> p = prompty.load("prompts/basic.prompty")
+ >>> inputs = {"name": "John Doe"}
+ >>> content = prompty.prepare(p, inputs)
+ """
+ inputs = param_hoisting(inputs, prompt.sample)
+
+ render = InvokerFactory.run_renderer(prompt, inputs, prompt.content)
+ result = InvokerFactory.run_parser(prompt, render)
+
+ return result
+
+
+@trace(description="Prepare the inputs for the prompt.")
+async def prepare_async(
+ prompt: Prompty,
+ inputs: Dict[str, Any] = {},
+):
+ """Prepare the inputs for the prompt.
+
+ Parameters
+ ----------
+ prompt : Prompty
+ The prompty object
+ inputs : Dict[str, Any], optional
+ The inputs to the prompt, by default {}
+
+ Returns
+ -------
+ dict
+ The prepared and hidrated template shaped to the LLM model
+
+ Example
+ -------
+ >>> import prompty
+ >>> p = prompty.load("prompts/basic.prompty")
+ >>> inputs = {"name": "John Doe"}
+ >>> content = await prompty.prepare_async(p, inputs)
+ """
+ inputs = param_hoisting(inputs, prompt.sample)
+
+ render = await InvokerFactory.run_renderer_async(prompt, inputs, prompt.content)
+ result = await InvokerFactory.run_parser_async(prompt, render)
+
+ return result
+
+
+@trace(description="Run the prepared Prompty content against the model.")
+def run(
+ prompt: Prompty,
+ content: Union[dict, list, str],
+ configuration: Dict[str, Any] = {},
+ parameters: Dict[str, Any] = {},
+ raw: bool = False,
+):
+ """Run the prepared Prompty content.
+
+ Parameters
+ ----------
+ prompt : Prompty
+ The prompty object
+ content : Union[dict, list, str]
+ The content to process
+ configuration : Dict[str, Any], optional
+ The configuration to use, by default {}
+ parameters : Dict[str, Any], optional
+ The parameters to use, by default {}
+ raw : bool, optional
+ Whether to skip processing, by default False
+
+ Returns
+ -------
+ Any
+ The result of the prompt
+
+ Example
+ -------
+ >>> import prompty
+ >>> p = prompty.load("prompts/basic.prompty")
+ >>> inputs = {"name": "John Doe"}
+ >>> content = prompty.prepare(p, inputs)
+ >>> result = prompty.run(p, content)
+ """
+
+ if configuration != {}:
+ prompt.model.configuration = param_hoisting(configuration, prompt.model.configuration)
+
+ if parameters != {}:
+ prompt.model.parameters = param_hoisting(parameters, prompt.model.parameters)
+
+ result = InvokerFactory.run_executor(prompt, content)
+ if not raw:
+ result = InvokerFactory.run_processor(prompt, result)
+
+ return result
+
+
+@trace(description="Run the prepared Prompty content against the model.")
+async def run_async(
+ prompt: Prompty,
+ content: Union[dict, list, str],
+ configuration: Dict[str, Any] = {},
+ parameters: Dict[str, Any] = {},
+ raw: bool = False,
+):
+ """Run the prepared Prompty content.
+
+ Parameters
+ ----------
+ prompt : Prompty
+ The prompty object
+ content : Union[dict, list, str]
+ The content to process
+ configuration : Dict[str, Any], optional
+ The configuration to use, by default {}
+ parameters : Dict[str, Any], optional
+ The parameters to use, by default {}
+ raw : bool, optional
+ Whether to skip processing, by default False
+
+ Returns
+ -------
+ Any
+ The result of the prompt
+
+ Example
+ -------
+ >>> import prompty
+ >>> p = prompty.load("prompts/basic.prompty")
+ >>> inputs = {"name": "John Doe"}
+ >>> content = await prompty.prepare_async(p, inputs)
+ >>> result = await prompty.run_async(p, content)
+ """
+
+ if configuration != {}:
+ prompt.model.configuration = param_hoisting(configuration, prompt.model.configuration)
+
+ if parameters != {}:
+ prompt.model.parameters = param_hoisting(parameters, prompt.model.parameters)
+
+ result = await InvokerFactory.run_executor_async(prompt, content)
+ if not raw:
+ result = await InvokerFactory.run_processor_async(prompt, result)
+
+ return result
+
+
+@trace(description="Execute a prompty")
+def execute(
+ prompt: Union[str, Prompty],
+ configuration: Dict[str, Any] = {},
+ parameters: Dict[str, Any] = {},
+ inputs: Dict[str, Any] = {},
+ raw: bool = False,
+ config_name: str = "default",
+):
+ """Execute a prompty.
+
+ Parameters
+ ----------
+ prompt : Union[str, Prompty]
+ The prompty object or path to the prompty file
+ configuration : Dict[str, Any], optional
+ The configuration to use, by default {}
+ parameters : Dict[str, Any], optional
+ The parameters to use, by default {}
+ inputs : Dict[str, Any], optional
+ The inputs to the prompt, by default {}
+ raw : bool, optional
+ Whether to skip processing, by default False
+ connection : str, optional
+ The connection to use, by default "default"
+
+ Returns
+ -------
+ Any
+ The result of the prompt
+
+ Example
+ -------
+ >>> import prompty
+ >>> inputs = {"name": "John Doe"}
+ >>> result = prompty.execute("prompts/basic.prompty", inputs=inputs)
+ """
+ if isinstance(prompt, str):
+ path = Path(prompt)
+ if not path.is_absolute():
+ # get caller's path (take into account trace frame)
+ caller = Path(traceback.extract_stack()[-3].filename)
+ path = Path(caller.parent / path).resolve().absolute()
+ prompt = load(path, config_name)
+
+ # prepare content
+ content = prepare(prompt, inputs)
+
+ # run LLM model
+ result = run(prompt, content, configuration, parameters, raw)
+
+ return result
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/inference/prompts/_renderers.py b/.venv/lib/python3.12/site-packages/azure/ai/inference/prompts/_renderers.py
new file mode 100644
index 00000000..0d682a7f
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/inference/prompts/_renderers.py
@@ -0,0 +1,30 @@
+# ------------------------------------
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+# ------------------------------------
+# mypy: disable-error-code="union-attr,assignment,arg-type"
+from pathlib import Path
+from ._core import Prompty
+from ._invoker import Invoker, InvokerFactory
+from ._mustache import render
+
+
+@InvokerFactory.register_renderer("mustache")
+class MustacheRenderer(Invoker):
+ """Render a mustache template."""
+
+ def __init__(self, prompty: Prompty) -> None:
+ super().__init__(prompty)
+ self.templates = {}
+ cur_prompt = self.prompty
+ while cur_prompt:
+ self.templates[Path(cur_prompt.file).name] = cur_prompt.content
+ cur_prompt = cur_prompt.basePrompty
+ self.name = Path(self.prompty.file).name
+
+ def invoke(self, data: str) -> str:
+ generated = render(self.prompty.content, data) # type: ignore
+ return generated
+
+ async def invoke_async(self, data: str) -> str:
+ return self.invoke(data)
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/inference/prompts/_tracer.py b/.venv/lib/python3.12/site-packages/azure/ai/inference/prompts/_tracer.py
new file mode 100644
index 00000000..24f800b4
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/inference/prompts/_tracer.py
@@ -0,0 +1,316 @@
+# ------------------------------------
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+# ------------------------------------
+# mypy: disable-error-code="union-attr,arg-type,misc,return-value,assignment,func-returns-value"
+# pylint: disable=R,redefined-outer-name,bare-except,unspecified-encoding
+import os
+import json
+import inspect
+import traceback
+import importlib
+import contextlib
+from pathlib import Path
+from numbers import Number
+from datetime import datetime
+from functools import wraps, partial
+from typing import Any, Callable, Dict, Iterator, List, Union
+
+
+# clean up key value pairs for sensitive values
+def sanitize(key: str, value: Any) -> Any:
+ if isinstance(value, str) and any([s in key.lower() for s in ["key", "token", "secret", "password", "credential"]]):
+ return len(str(value)) * "*"
+
+ if isinstance(value, dict):
+ return {k: sanitize(k, v) for k, v in value.items()}
+
+ return value
+
+
+class Tracer:
+ _tracers: Dict[str, Callable[[str], Iterator[Callable[[str, Any], None]]]] = {}
+
+ @classmethod
+ def add(cls, name: str, tracer: Callable[[str], Iterator[Callable[[str, Any], None]]]) -> None:
+ cls._tracers[name] = tracer
+
+ @classmethod
+ def clear(cls) -> None:
+ cls._tracers = {}
+
+ @classmethod
+ @contextlib.contextmanager
+ def start(cls, name: str) -> Iterator[Callable[[str, Any], None]]:
+ with contextlib.ExitStack() as stack:
+ traces: List[Any] = [stack.enter_context(tracer(name)) for tracer in cls._tracers.values()] # type: ignore
+ yield lambda key, value: [ # type: ignore
+ # normalize and sanitize any trace values
+ trace(key, sanitize(key, to_dict(value)))
+ for trace in traces
+ ]
+
+
+def to_dict(obj: Any) -> Union[Dict[str, Any], List[Dict[str, Any]], str, Number, bool]:
+ # simple json types
+ if isinstance(obj, str) or isinstance(obj, Number) or isinstance(obj, bool):
+ return obj
+
+ # datetime
+ if isinstance(obj, datetime):
+ return obj.isoformat()
+
+ # safe Prompty obj serialization
+ if type(obj).__name__ == "Prompty":
+ return obj.to_safe_dict()
+
+ # safe PromptyStream obj serialization
+ if type(obj).__name__ == "PromptyStream":
+ return "PromptyStream"
+
+ if type(obj).__name__ == "AsyncPromptyStream":
+ return "AsyncPromptyStream"
+
+ # recursive list and dict
+ if isinstance(obj, List):
+ return [to_dict(item) for item in obj] # type: ignore
+
+ if isinstance(obj, Dict):
+ return {k: v if isinstance(v, str) else to_dict(v) for k, v in obj.items()}
+
+ if isinstance(obj, Path):
+ return str(obj)
+
+ # cast to string otherwise...
+ return str(obj)
+
+
+def _name(func: Callable, args):
+ if hasattr(func, "__qualname__"):
+ signature = f"{func.__module__}.{func.__qualname__}"
+ else:
+ signature = f"{func.__module__}.{func.__name__}"
+
+ # core invoker gets special treatment prompty.invoker.Invoker
+ core_invoker = signature.startswith("prompty.invoker.Invoker.run")
+ if core_invoker:
+ name = type(args[0]).__name__
+ if signature.endswith("async"):
+ signature = f"{args[0].__module__}.{args[0].__class__.__name__}.invoke_async"
+ else:
+ signature = f"{args[0].__module__}.{args[0].__class__.__name__}.invoke"
+ else:
+ name = func.__name__
+
+ return name, signature
+
+
+def _inputs(func: Callable, args, kwargs) -> dict:
+ ba = inspect.signature(func).bind(*args, **kwargs)
+ ba.apply_defaults()
+
+ inputs = {k: to_dict(v) for k, v in ba.arguments.items() if k != "self"}
+
+ return inputs
+
+
+def _results(result: Any) -> Union[Dict, List[Dict], str, Number, bool]:
+ return to_dict(result) if result is not None else "None"
+
+
+def _trace_sync(func: Union[Callable, None] = None, **okwargs: Any) -> Callable:
+
+ @wraps(func) # type: ignore
+ def wrapper(*args, **kwargs):
+ name, signature = _name(func, args) # type: ignore
+ with Tracer.start(name) as trace:
+ trace("signature", signature)
+
+ # support arbitrary keyword
+ # arguments for trace decorator
+ for k, v in okwargs.items():
+ trace(k, to_dict(v))
+
+ inputs = _inputs(func, args, kwargs) # type: ignore
+ trace("inputs", inputs)
+
+ try:
+ result = func(*args, **kwargs) # type: ignore
+ trace("result", _results(result))
+ except Exception as e:
+ trace(
+ "result",
+ {
+ "exception": {
+ "type": type(e),
+ "traceback": (traceback.format_tb(tb=e.__traceback__) if e.__traceback__ else None),
+ "message": str(e),
+ "args": to_dict(e.args),
+ }
+ },
+ )
+ raise e
+
+ return result
+
+ return wrapper
+
+
+def _trace_async(func: Union[Callable, None] = None, **okwargs: Any) -> Callable:
+
+ @wraps(func) # type: ignore
+ async def wrapper(*args, **kwargs):
+ name, signature = _name(func, args) # type: ignore
+ with Tracer.start(name) as trace:
+ trace("signature", signature)
+
+ # support arbitrary keyword
+ # arguments for trace decorator
+ for k, v in okwargs.items():
+ trace(k, to_dict(v))
+
+ inputs = _inputs(func, args, kwargs) # type: ignore
+ trace("inputs", inputs)
+ try:
+ result = await func(*args, **kwargs) # type: ignore
+ trace("result", _results(result))
+ except Exception as e:
+ trace(
+ "result",
+ {
+ "exception": {
+ "type": type(e),
+ "traceback": (traceback.format_tb(tb=e.__traceback__) if e.__traceback__ else None),
+ "message": str(e),
+ "args": to_dict(e.args),
+ }
+ },
+ )
+ raise e
+
+ return result
+
+ return wrapper
+
+
+def trace(func: Union[Callable, None] = None, **kwargs: Any) -> Callable:
+ if func is None:
+ return partial(trace, **kwargs)
+ wrapped_method = _trace_async if inspect.iscoroutinefunction(func) else _trace_sync
+ return wrapped_method(func, **kwargs)
+
+
+class PromptyTracer:
+ def __init__(self, output_dir: Union[str, None] = None) -> None:
+ if output_dir:
+ self.output = Path(output_dir).resolve().absolute()
+ else:
+ self.output = Path(Path(os.getcwd()) / ".runs").resolve().absolute()
+
+ if not self.output.exists():
+ self.output.mkdir(parents=True, exist_ok=True)
+
+ self.stack: List[Dict[str, Any]] = []
+
+ @contextlib.contextmanager
+ def tracer(self, name: str) -> Iterator[Callable[[str, Any], None]]:
+ try:
+ self.stack.append({"name": name})
+ frame = self.stack[-1]
+ frame["__time"] = {
+ "start": datetime.now(),
+ }
+
+ def add(key: str, value: Any) -> None:
+ if key not in frame:
+ frame[key] = value
+ # multiple values creates list
+ else:
+ if isinstance(frame[key], list):
+ frame[key].append(value)
+ else:
+ frame[key] = [frame[key], value]
+
+ yield add
+ finally:
+ frame = self.stack.pop()
+ start: datetime = frame["__time"]["start"]
+ end: datetime = datetime.now()
+
+ # add duration to frame
+ frame["__time"] = {
+ "start": start.strftime("%Y-%m-%dT%H:%M:%S.%f"),
+ "end": end.strftime("%Y-%m-%dT%H:%M:%S.%f"),
+ "duration": int((end - start).total_seconds() * 1000),
+ }
+
+ # hoist usage to parent frame
+ if "result" in frame and isinstance(frame["result"], dict):
+ if "usage" in frame["result"]:
+ frame["__usage"] = self.hoist_item(
+ frame["result"]["usage"],
+ frame["__usage"] if "__usage" in frame else {},
+ )
+
+ # streamed results may have usage as well
+ if "result" in frame and isinstance(frame["result"], list):
+ for result in frame["result"]:
+ if isinstance(result, dict) and "usage" in result and isinstance(result["usage"], dict):
+ frame["__usage"] = self.hoist_item(
+ result["usage"],
+ frame["__usage"] if "__usage" in frame else {},
+ )
+
+ # add any usage frames from below
+ if "__frames" in frame:
+ for child in frame["__frames"]:
+ if "__usage" in child:
+ frame["__usage"] = self.hoist_item(
+ child["__usage"],
+ frame["__usage"] if "__usage" in frame else {},
+ )
+
+ # if stack is empty, dump the frame
+ if len(self.stack) == 0:
+ self.write_trace(frame)
+ # otherwise, append the frame to the parent
+ else:
+ if "__frames" not in self.stack[-1]:
+ self.stack[-1]["__frames"] = []
+ self.stack[-1]["__frames"].append(frame)
+
+ def hoist_item(self, src: Dict[str, Any], cur: Dict[str, Any]) -> Dict[str, Any]:
+ for key, value in src.items():
+ if value is None or isinstance(value, list) or isinstance(value, dict):
+ continue
+ try:
+ if key not in cur:
+ cur[key] = value
+ else:
+ cur[key] += value
+ except:
+ continue
+
+ return cur
+
+ def write_trace(self, frame: Dict[str, Any]) -> None:
+ trace_file = self.output / f"{frame['name']}.{datetime.now().strftime('%Y%m%d.%H%M%S')}.tracy"
+
+ v = importlib.metadata.version("prompty") # type: ignore
+ enriched_frame = {
+ "runtime": "python",
+ "version": v,
+ "trace": frame,
+ }
+
+ with open(trace_file, "w") as f:
+ json.dump(enriched_frame, f, indent=4)
+
+
+@contextlib.contextmanager
+def console_tracer(name: str) -> Iterator[Callable[[str, Any], None]]:
+ try:
+ print(f"Starting {name}")
+ yield lambda key, value: print(f"{key}:\n{json.dumps(to_dict(value), indent=4)}")
+ finally:
+ print(f"Ending {name}")
diff --git a/.venv/lib/python3.12/site-packages/azure/ai/inference/prompts/_utils.py b/.venv/lib/python3.12/site-packages/azure/ai/inference/prompts/_utils.py
new file mode 100644
index 00000000..22f28418
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/ai/inference/prompts/_utils.py
@@ -0,0 +1,100 @@
+# ------------------------------------
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+# ------------------------------------
+# mypy: disable-error-code="import-untyped,return-value"
+# pylint: disable=line-too-long,R,wrong-import-order,global-variable-not-assigned)
+import json
+import os
+import re
+import sys
+from typing import Any, Dict
+from pathlib import Path
+
+
+_yaml_regex = re.compile(
+ r"^\s*" + r"(?:---|\+\+\+)" + r"(.*?)" + r"(?:---|\+\+\+)" + r"\s*(.+)$",
+ re.S | re.M,
+)
+
+
+def load_text(file_path, encoding="utf-8"):
+ with open(file_path, "r", encoding=encoding) as file:
+ return file.read()
+
+
+def load_json(file_path, encoding="utf-8"):
+ return json.loads(load_text(file_path, encoding=encoding))
+
+
+def load_global_config(prompty_path: Path = Path.cwd(), configuration: str = "default") -> Dict[str, Any]:
+ prompty_config_path = prompty_path.joinpath("prompty.json")
+ if os.path.exists(prompty_config_path):
+ c = load_json(prompty_config_path)
+ if configuration in c:
+ return c[configuration]
+ else:
+ raise ValueError(f'Item "{configuration}" not found in "{prompty_config_path}"')
+ else:
+ return {}
+
+
+def load_prompty(file_path, encoding="utf-8") -> Dict[str, Any]:
+ contents = load_text(file_path, encoding=encoding)
+ return parse(contents)
+
+
+def parse(contents):
+ try:
+ import yaml # type: ignore
+ except ImportError as exc:
+ raise ImportError("Please install pyyaml to use this function. Run `pip install pyyaml`.") from exc
+
+ global _yaml_regex
+
+ fmatter = ""
+ body = ""
+ result = _yaml_regex.search(contents)
+
+ if result:
+ fmatter = result.group(1)
+ body = result.group(2)
+ return {
+ "attributes": yaml.load(fmatter, Loader=yaml.SafeLoader),
+ "body": body,
+ "frontmatter": fmatter,
+ }
+
+
+def remove_leading_empty_space(multiline_str: str) -> str:
+ """
+ Processes a multiline string by:
+ 1. Removing empty lines
+ 2. Finding the minimum leading spaces
+ 3. Indenting all lines to the minimum level
+
+ :param multiline_str: The input multiline string.
+ :type multiline_str: str
+ :return: The processed multiline string.
+ :rtype: str
+ """
+ lines = multiline_str.splitlines()
+ start_index = 0
+ while start_index < len(lines) and lines[start_index].strip() == "":
+ start_index += 1
+
+ # Find the minimum number of leading spaces
+ min_spaces = sys.maxsize
+ for line in lines[start_index:]:
+ if len(line.strip()) == 0:
+ continue
+ spaces = len(line) - len(line.lstrip())
+ spaces += line.lstrip().count("\t") * 2 # Count tabs as 2 spaces
+ min_spaces = min(min_spaces, spaces)
+
+ # Remove leading spaces and indent to the minimum level
+ processed_lines = []
+ for line in lines[start_index:]:
+ processed_lines.append(line[min_spaces:])
+
+ return "\n".join(processed_lines)