1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
|
import asyncio
import datetime
import json
from typing import Any, Dict, List, Optional, TypedDict
from uuid import uuid4
import grpc
from google.protobuf import timestamp_pb2
from hatchet_sdk.clients.rest.tenacity_utils import tenacity_retry
from hatchet_sdk.contracts.events_pb2 import (
BulkPushEventRequest,
Event,
PushEventRequest,
PutLogRequest,
PutStreamEventRequest,
)
from hatchet_sdk.contracts.events_pb2_grpc import EventsServiceStub
from hatchet_sdk.utils.serialization import flatten
from ..loader import ClientConfig
from ..metadata import get_metadata
def new_event(conn, config: ClientConfig):
    """Build an ``EventClient`` that talks over an existing connection.

    Args:
        conn: Connection/channel object handed straight to ``EventsServiceStub``.
        config: Client configuration forwarded to the ``EventClient`` constructor.

    Returns:
        A new ``EventClient`` wrapping a freshly created events service stub.
    """
    stub = EventsServiceStub(conn)
    return EventClient(client=stub, config=config)
def proto_timestamp_now():
    """Return the current time as a protobuf ``Timestamp``.

    The current POSIX timestamp (a float) is split into whole seconds and the
    fractional remainder expressed in nanoseconds.
    """
    epoch = datetime.datetime.now().timestamp()
    return timestamp_pb2.Timestamp(
        seconds=int(epoch),
        # Fractional part of the second, scaled to nanoseconds.
        nanos=int(epoch % 1 * 1e9),
    )
class PushEventOptions(TypedDict, total=False):
additional_metadata: Dict[str, str] | None = None
namespace: str | None = None
class BulkPushEventOptions(TypedDict, total=False):
namespace: str | None = None
class BulkPushEventWithMetadata(TypedDict, total=False):
    """Shape of one entry in the list handed to ``EventClient.bulk_push``.

    Declared with ``total=False``, so the type itself marks no key as required.
    """

    # Event key; the namespace prefix is prepended when the event is pushed.
    key: str
    # Event body; serialized with ``json.dumps`` when the event is pushed.
    payload: Any
    # Extra metadata for this event; may be None or left out.
    additional_metadata: Optional[Dict[str, Any]]
class EventClient:
    """Client for pushing events, logs and stream data through the events gRPC stub.

    Every RPC is sent with the metadata produced by ``get_metadata`` for the
    configured token. Event keys are prefixed with a namespace: the one from the
    client config, unless the per-call options carry a non-None ``namespace``.
    """

    def __init__(self, client: EventsServiceStub, config: ClientConfig):
        """Keep the gRPC stub plus the token and default namespace from ``config``."""
        self.client = client
        self.token = config.token
        self.namespace = config.namespace

    def _resolve_namespace(self, options) -> str:
        """Return the namespace override from ``options`` if set, else the default.

        ``options`` is only read, never modified: ``push`` and ``bulk_push`` are
        wrapped by ``tenacity_retry`` and may therefore run more than once with
        the same ``options`` object, so every attempt must see the same override.
        """
        if options is not None:
            override = options.get("namespace")
            if override is not None:
                return override
        return self.namespace

    @staticmethod
    def _encode_json(value: Any, label: str) -> bytes:
        """Serialize ``value`` to UTF-8 encoded JSON.

        Raises:
            ValueError: If ``value`` cannot be serialized. ``label`` names the
                offending field in the message ("meta" or "payload").
        """
        try:
            return json.dumps(value).encode("utf-8")
        except (TypeError, ValueError, RecursionError) as e:
            # TypeError: unsupported type; ValueError: circular reference or
            # encoding failure; RecursionError: excessively deep nesting.
            raise ValueError(f"Error encoding {label}: {e}") from e

    async def async_push(
        self, event_key, payload, options: Optional[PushEventOptions] = None
    ) -> Event:
        """Awaitable variant of ``push``; runs the blocking call in a worker thread."""
        return await asyncio.to_thread(
            self.push, event_key=event_key, payload=payload, options=options
        )

    async def async_bulk_push(
        self,
        events: List[BulkPushEventWithMetadata],
        options: Optional[BulkPushEventOptions] = None,
    ) -> List[Event]:
        """Awaitable variant of ``bulk_push``; runs the blocking call in a worker thread."""
        return await asyncio.to_thread(self.bulk_push, events=events, options=options)

    ## IMPORTANT: Keep this method's signature in sync with the wrapper in the OTel instrumentor
    @tenacity_retry
    def push(self, event_key, payload, options: PushEventOptions = None) -> Event:
        """Push a single event.

        Args:
            event_key: Event key; the resolved namespace is prepended to it.
            payload: JSON-serializable event body.
            options: Optional ``additional_metadata`` and ``namespace`` override.
                The dict is not modified.

        Returns:
            The response of the stub's ``Push`` RPC.

        Raises:
            ValueError: If the metadata or the payload cannot be JSON-encoded.
        """
        namespaced_event_key = self._resolve_namespace(options) + event_key

        # No options, or options without the key, means "empty metadata" ({});
        # an explicit None means "send no metadata at all".
        if options is None:
            meta = {}
        else:
            meta = options.get("additional_metadata", {})
        meta_bytes = None if meta is None else self._encode_json(meta, "meta")

        payload_bytes = self._encode_json(payload, "payload")

        request = PushEventRequest(
            key=namespaced_event_key,
            payload=payload_bytes,
            eventTimestamp=proto_timestamp_now(),
            additionalMetadata=meta_bytes,
        )

        return self.client.Push(request, metadata=get_metadata(self.token))

    ## IMPORTANT: Keep this method's signature in sync with the wrapper in the OTel instrumentor
    @tenacity_retry
    def bulk_push(
        self,
        events: List[BulkPushEventWithMetadata],
        options: BulkPushEventOptions = None,
    ) -> List[Event]:
        """Push several events in one ``BulkPush`` RPC.

        Args:
            events: Entries providing ``key`` and ``payload`` and, optionally,
                ``additional_metadata``.
            options: Optional ``namespace`` override applied to every event.
                The dict is not modified.

        Returns:
            The ``events`` field of the ``BulkPush`` response.

        Raises:
            KeyError: If an entry lacks ``key`` or ``payload``.
            ValueError: If metadata or a payload cannot be JSON-encoded.
        """
        namespace = self._resolve_namespace(options)

        bulk_events = []
        for event in events:
            event_key = namespace + event["key"]
            payload = event["payload"]

            # Missing, None or empty metadata is sent as "no metadata".
            meta = event.get("additional_metadata", {})
            meta_bytes = self._encode_json(meta, "meta") if meta else None

            bulk_events.append(
                PushEventRequest(
                    key=event_key,
                    payload=self._encode_json(payload, "payload"),
                    eventTimestamp=proto_timestamp_now(),
                    additionalMetadata=meta_bytes,
                )
            )

        bulk_request = BulkPushEventRequest(events=bulk_events)
        response = self.client.BulkPush(bulk_request, metadata=get_metadata(self.token))

        return response.events

    def log(self, message: str, step_run_id: str):
        """Send a log line for a step run via the ``PutLog`` RPC.

        Raises:
            ValueError: If building or sending the request fails for any reason.
        """
        try:
            request = PutLogRequest(
                stepRunId=step_run_id,
                createdAt=proto_timestamp_now(),
                message=message,
            )

            self.client.PutLog(request, metadata=get_metadata(self.token))
        except Exception as e:
            raise ValueError(f"Error logging: {e}") from e

    def stream(self, data: str | bytes, step_run_id: str):
        """Send a stream event for a step run via the ``PutStreamEvent`` RPC.

        Args:
            data: ``str`` (encoded as UTF-8) or ``bytes`` (sent unchanged).
            step_run_id: Identifier of the step run the data belongs to.

        Raises:
            ValueError: If ``data`` is neither ``str`` nor ``bytes``, or if
                building or sending the request fails for any reason.
        """
        try:
            if isinstance(data, str):
                data_bytes = data.encode("utf-8")
            elif isinstance(data, bytes):
                data_bytes = data
            else:
                # NOTE: the message mentions "file", but only str and bytes are
                # accepted here. This error is re-wrapped by the handler below.
                raise ValueError("Invalid data type. Expected str, bytes, or file.")

            request = PutStreamEventRequest(
                stepRunId=step_run_id,
                createdAt=proto_timestamp_now(),
                message=data_bytes,
            )
            self.client.PutStreamEvent(request, metadata=get_metadata(self.token))
        except Exception as e:
            raise ValueError(f"Error putting stream event: {e}") from e
|