aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/azure/ai/inference/prompts/_parsers.py
blob: de3c570e5c897891d10f744d2dd35ff6ef63b68f (about) (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
# mypy: disable-error-code="union-attr,return-value"
# pylint: disable=line-too-long,R,consider-using-enumerate,docstring-missing-param,docstring-missing-return,docstring-missing-rtype
import re
import base64
from pathlib import Path
from typing import Any, Union
from ._core import Prompty
from ._invoker import Invoker, InvokerFactory


ROLES = ["assistant", "function", "system", "user"]


@InvokerFactory.register_parser("prompty.chat")
class PromptyChatParser(Invoker):
    """Prompty Chat Parser"""

    def __init__(self, prompty: Prompty) -> None:
        super().__init__(prompty)
        self.path = Path(self.prompty.file).parent

    def invoke(self, data: str) -> Any:
        return invoke_parser(self.path, data)

    async def invoke_async(self, data: str) -> Any:
        """Invoke the Prompty Chat Parser (Async)

        Parameters
        ----------
        data : str
            The data to parse

        Returns
        -------
        str
            The parsed data
        """
        return self.invoke(data)


def _inline_image(path: Union[Path, None], image_item: str) -> str:
    """Inline Image

    Parameters
    ----------
    image_item : str
        The image item to inline

    Returns
    -------
    str
        The inlined image
    """
    # pass through if it's a url or base64 encoded or the path is None
    if image_item.startswith("http") or image_item.startswith("data") or path is None:
        return image_item
    # otherwise, it's a local file - need to base64 encode it
    else:
        image_path = (path if path is not None else Path(".")) / image_item
        with open(image_path, "rb") as f:
            base64_image = base64.b64encode(f.read()).decode("utf-8")

        if image_path.suffix == ".png":
            return f"data:image/png;base64,{base64_image}"
        elif image_path.suffix == ".jpg":
            return f"data:image/jpeg;base64,{base64_image}"
        elif image_path.suffix == ".jpeg":
            return f"data:image/jpeg;base64,{base64_image}"
        else:
            raise ValueError(
                f"Invalid image format {image_path.suffix} - currently only .png and .jpg / .jpeg are supported."
            )


def _parse_content(path: Union[Path, None], content: str):
    """for parsing inline images

    Parameters
    ----------
    content : str
        The content to parse

    Returns
    -------
    any
        The parsed content
    """
    # regular expression to parse markdown images
    image = r"(?P<alt>!\[[^\]]*\])\((?P<filename>.*?)(?=\"|\))\)"
    matches = re.findall(image, content, flags=re.MULTILINE)
    if len(matches) > 0:
        content_items = []
        content_chunks = re.split(image, content, flags=re.MULTILINE)
        current_chunk = 0
        for i in range(len(content_chunks)):
            # image entry
            if current_chunk < len(matches) and content_chunks[i] == matches[current_chunk][0]:
                content_items.append(
                    {
                        "type": "image_url",
                        "image_url": {"url": _inline_image(path, matches[current_chunk][1].split(" ")[0].strip())},
                    }
                )
            # second part of image entry
            elif current_chunk < len(matches) and content_chunks[i] == matches[current_chunk][1]:
                current_chunk += 1
            # text entry
            else:
                if len(content_chunks[i].strip()) > 0:
                    content_items.append({"type": "text", "text": content_chunks[i].strip()})
        return content_items
    else:
        return content


def invoke_parser(path: Union[Path, None], data: str) -> Any:
    """Invoke the Prompty Chat Parser

    Parameters
    ----------
    data : str
        The data to parse

    Returns
    -------
    str
        The parsed data
    """
    messages = []
    separator = r"(?i)^\s*#?\s*(" + "|".join(ROLES) + r")\s*:\s*\n"

    # get valid chunks - remove empty items
    chunks = [item for item in re.split(separator, data, flags=re.MULTILINE) if len(item.strip()) > 0]

    # if no starter role, then inject system role
    if not chunks[0].strip().lower() in ROLES:
        chunks.insert(0, "system")

    # if last chunk is role entry, then remove (no content?)
    if chunks[-1].strip().lower() in ROLES:
        chunks.pop()

    if len(chunks) % 2 != 0:
        raise ValueError("Invalid prompt format")

    # create messages
    for i in range(0, len(chunks), 2):
        role = chunks[i].strip().lower()
        content = chunks[i + 1].strip()
        messages.append({"role": role, "content": _parse_content(path, content)})

    return messages