1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
|
import json
import os
from typing import Any, Dict, List, Optional

from pydantic import BaseModel, Field

import litellm
from litellm._logging import verbose_logger
from litellm.integrations.custom_logger import CustomLogger
from litellm.llms.custom_httpx.http_handler import (
    get_async_httpx_client,
    httpxSpecialProvider,
)
# from here: https://docs.rungalileo.io/galileo/gen-ai-studio-products/galileo-observe/how-to/logging-data-via-restful-apis#structuring-your-records
class LLMResponse(BaseModel):
    """Schema of a single record sent to Galileo Observe.

    Every field without a default is required; the remaining ones are
    optional and default to ``None``.
    """

    # Required fields describing the call itself.
    latency_ms: int
    status_code: int
    input_text: str
    output_text: str
    node_type: str
    model: str
    num_input_tokens: int
    num_output_tokens: int

    # Optional log-probabilities of the generated output.
    output_logprobs: Optional[Dict[str, Any]] = Field(
        None,
        description="Optional. When available, logprobs are used to compute Uncertainty.",
    )

    # Required timestamp string; the expected layout is given in the description.
    created_at: str = Field(
        ...,
        description='timestamp constructed in "%Y-%m-%dT%H:%M:%S" format',
    )

    # Optional free-form annotations attached to the record.
    tags: Optional[List[str]] = Field(default=None)
    user_metadata: Optional[Dict[str, Any]] = Field(default=None)
class GalileoObserve(CustomLogger):
    """LiteLLM logging callback that forwards successful calls to Galileo Observe.

    Each successful call is converted into an ``LLMResponse`` record, buffered
    in ``in_memory_records`` and posted to
    ``{GALILEO_BASE_URL}/projects/{GALILEO_PROJECT_ID}/observe/ingest`` as soon
    as at least ``batch_size`` records are buffered.

    Environment variables read:
        GALILEO_BASE_URL, GALILEO_PROJECT_ID: target of the ingest request.
        GALILEO_USERNAME, GALILEO_PASSWORD: credentials used for the login
            request that yields the bearer token.
    """

    def __init__(self) -> None:
        super().__init__()
        self.in_memory_records: List[dict] = []
        self.batch_size = 1
        self.base_url = os.getenv("GALILEO_BASE_URL", None)
        self.project_id = os.getenv("GALILEO_PROJECT_ID", None)
        # Filled in by set_galileo_headers(); fetched lazily on the first
        # flush so that constructing the logger performs no network I/O.
        self.headers: Optional[Dict[str, str]] = None
        self.async_httpx_handler = get_async_httpx_client(
            llm_provider=httpxSpecialProvider.LoggingCallback
        )

    def set_galileo_headers(self):
        """Log in to Galileo and store the bearer-token headers on ``self.headers``.

        Performs a synchronous form-encoded POST to ``{base_url}/login`` with
        the GALILEO_USERNAME / GALILEO_PASSWORD environment variables.

        Raises:
            KeyError: if the login response JSON has no ``access_token`` key.
            Any exception raised by the underlying HTTP client is propagated.
        """
        # following https://docs.rungalileo.io/galileo/gen-ai-studio-products/galileo-observe/how-to/logging-data-via-restful-apis#logging-your-records
        login_headers = {
            "accept": "application/json",
            "Content-Type": "application/x-www-form-urlencoded",
        }
        galileo_login_response = litellm.module_level_client.post(
            url=f"{self.base_url}/login",
            headers=login_headers,
            data={
                "username": os.getenv("GALILEO_USERNAME"),
                "password": os.getenv("GALILEO_PASSWORD"),
            },
        )

        access_token = galileo_login_response.json()["access_token"]

        self.headers = {
            "accept": "application/json",
            "Content-Type": "application/json",
            "Authorization": f"Bearer {access_token}",
        }

    def get_output_str_from_response(self, response_obj, kwargs):
        """Extract the loggable output from a LiteLLM response object.

        Returns:
            ``None`` when there is no response, for embedding calls, and for
            unrecognised response types; otherwise the first choice's message
            (``.json()`` form) for chat responses, the first choice's text for
            text completions, or the ``data`` payload for image responses.
            The value is returned exactly as extracted and is not guaranteed
            to be a ``str``.
        """
        if response_obj is None:
            return None

        # Embeddings carry no textual output worth logging.
        if kwargs.get("call_type", None) == "embedding" or isinstance(
            response_obj, litellm.EmbeddingResponse
        ):
            return None

        if isinstance(response_obj, litellm.ModelResponse):
            return response_obj["choices"][0]["message"].json()
        if isinstance(response_obj, litellm.TextCompletionResponse):
            return response_obj.choices[0].text
        if isinstance(response_obj, litellm.ImageResponse):
            return response_obj["data"]
        return None

    @staticmethod
    def _coerce_output_text(output: Any) -> str:
        """Return ``output`` as a string suitable for ``LLMResponse.output_text``."""
        if isinstance(output, str):
            return output
        # Best-effort serialisation for logging: anything json cannot encode
        # natively is deliberately stringified instead of raising.
        return json.dumps(output, default=str)

    async def async_log_success_event(
        self, kwargs: Any, response_obj: Any, start_time: Any, end_time: Any
    ):
        """Buffer a record for a successful call and flush once the batch is full.

        Calls with no loggable output (see ``get_output_str_from_response``)
        are skipped entirely.
        """
        verbose_logger.debug("On Async Success")

        output = self.get_output_str_from_response(
            response_obj=response_obj, kwargs=kwargs
        )
        if output is None:
            return

        _latency_ms = int((end_time - start_time).total_seconds() * 1000)
        _call_type = kwargs.get("call_type", "litellm")
        input_text = litellm.utils.get_formatted_prompt(
            data=kwargs, call_type=_call_type
        )

        _usage = response_obj.get("usage", {}) or {}
        # `or 0` guards against a present-but-None token count, which would
        # otherwise fail the int validation of the record.
        num_input_tokens = _usage.get("prompt_tokens", 0) or 0
        num_output_tokens = _usage.get("completion_tokens", 0) or 0

        request_record = LLMResponse(
            latency_ms=_latency_ms,
            status_code=200,
            input_text=input_text,
            # The extracted output may be a dict/list; the record needs a str.
            output_text=self._coerce_output_text(output),
            node_type=_call_type,
            model=kwargs.get("model", "-"),
            num_input_tokens=num_input_tokens,
            num_output_tokens=num_output_tokens,
            created_at=start_time.strftime(
                "%Y-%m-%dT%H:%M:%S"
            ),  # timestamp str constructed in "%Y-%m-%dT%H:%M:%S" format
        )

        self.in_memory_records.append(request_record.model_dump())

        if len(self.in_memory_records) >= self.batch_size:
            await self.flush_in_memory_records()

    async def flush_in_memory_records(self):
        """POST the buffered records to the Galileo ingest endpoint.

        The batch is removed from the buffer only when the endpoint answers
        with status 200; on any other status, or if the request raises, the
        records are put back at the front of the buffer so a later flush
        retries them.
        """
        verbose_logger.debug("flushing in memory records")
        if not self.in_memory_records:
            return

        if self.headers is None:
            # NOTE: one-time synchronous login; without it the ingest request
            # would be sent without an Authorization header.
            try:
                self.set_galileo_headers()
            except Exception as e:
                # Logging is best-effort: keep the records and try again on
                # the next flush rather than failing the caller.
                verbose_logger.debug(
                    "Galileo Logger: failed to fetch auth headers, error=%s", str(e)
                )
                return

        # Detach the batch before awaiting so records appended while the
        # request is in flight are neither dropped by the reset below nor
        # sent twice by an overlapping flush.
        batch, self.in_memory_records = self.in_memory_records, []
        flushed = False
        response = None
        try:
            response = await self.async_httpx_handler.post(
                url=f"{self.base_url}/projects/{self.project_id}/observe/ingest",
                headers=self.headers,
                json={"records": batch},
            )
            flushed = response.status_code == 200
        finally:
            if not flushed:
                self.in_memory_records = batch + self.in_memory_records

        if flushed:
            verbose_logger.debug(
                "Galileo Logger:successfully flushed in memory records"
            )
        else:
            verbose_logger.debug("Galileo Logger: failed to flush in memory records")
            verbose_logger.debug(
                "Galileo Logger error=%s, status code=%s",
                response.text,
                response.status_code,
            )

    async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time):
        """Failure hook: only emits a debug log; failed calls are not recorded."""
        verbose_logger.debug("On Async Failure")
|