diff options
author | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
---|---|---|
committer | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
commit | 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch) | |
tree | ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/hatchet_sdk/clients/events.py | |
parent | cc961e04ba734dd72309fb548a2f97d67d578813 (diff) | |
download | gn-ai-master.tar.gz |
Diffstat (limited to '.venv/lib/python3.12/site-packages/hatchet_sdk/clients/events.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/hatchet_sdk/clients/events.py | 183 |
1 files changed, 183 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/hatchet_sdk/clients/events.py b/.venv/lib/python3.12/site-packages/hatchet_sdk/clients/events.py new file mode 100644 index 00000000..cf6a2721 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/hatchet_sdk/clients/events.py @@ -0,0 +1,183 @@ +import asyncio +import datetime +import json +from typing import Any, Dict, List, Optional, TypedDict +from uuid import uuid4 + +import grpc +from google.protobuf import timestamp_pb2 + +from hatchet_sdk.clients.rest.tenacity_utils import tenacity_retry +from hatchet_sdk.contracts.events_pb2 import ( + BulkPushEventRequest, + Event, + PushEventRequest, + PutLogRequest, + PutStreamEventRequest, +) +from hatchet_sdk.contracts.events_pb2_grpc import EventsServiceStub +from hatchet_sdk.utils.serialization import flatten + +from ..loader import ClientConfig +from ..metadata import get_metadata + + +def new_event(conn, config: ClientConfig): + return EventClient( + client=EventsServiceStub(conn), + config=config, + ) + + +def proto_timestamp_now(): + t = datetime.datetime.now().timestamp() + seconds = int(t) + nanos = int(t % 1 * 1e9) + + return timestamp_pb2.Timestamp(seconds=seconds, nanos=nanos) + + +class PushEventOptions(TypedDict, total=False): + additional_metadata: Dict[str, str] | None = None + namespace: str | None = None + + +class BulkPushEventOptions(TypedDict, total=False): + namespace: str | None = None + + +class BulkPushEventWithMetadata(TypedDict, total=False): + key: str + payload: Any + additional_metadata: Optional[Dict[str, Any]] # Optional metadata + + +class EventClient: + def __init__(self, client: EventsServiceStub, config: ClientConfig): + self.client = client + self.token = config.token + self.namespace = config.namespace + + async def async_push( + self, event_key, payload, options: Optional[PushEventOptions] = None + ) -> Event: + return await asyncio.to_thread( + self.push, event_key=event_key, payload=payload, options=options + ) + + async def async_bulk_push( + self, + events: List[BulkPushEventWithMetadata], + options: Optional[BulkPushEventOptions] = None, + ) -> List[Event]: + return await asyncio.to_thread(self.bulk_push, events=events, options=options) + + ## IMPORTANT: Keep this method's signature in sync with the wrapper in the OTel instrumentor + @tenacity_retry + def push(self, event_key, payload, options: PushEventOptions = None) -> Event: + namespace = self.namespace + + if ( + options is not None + and "namespace" in options + and options["namespace"] is not None + ): + namespace = options.pop("namespace") + + namespaced_event_key = namespace + event_key + + try: + meta = dict() if options is None else options["additional_metadata"] + meta_bytes = None if meta is None else json.dumps(meta).encode("utf-8") + except Exception as e: + raise ValueError(f"Error encoding meta: {e}") + + try: + payload_bytes = json.dumps(payload).encode("utf-8") + except json.UnicodeEncodeError as e: + raise ValueError(f"Error encoding payload: {e}") + + request = PushEventRequest( + key=namespaced_event_key, + payload=payload_bytes, + eventTimestamp=proto_timestamp_now(), + additionalMetadata=meta_bytes, + ) + + return self.client.Push(request, metadata=get_metadata(self.token)) + + ## IMPORTANT: Keep this method's signature in sync with the wrapper in the OTel instrumentor + @tenacity_retry + def bulk_push( + self, + events: List[BulkPushEventWithMetadata], + options: BulkPushEventOptions = None, + ) -> List[Event]: + namespace = self.namespace + + if ( + options is not None + and "namespace" in options + and options["namespace"] is not None + ): + namespace = options.pop("namespace") + + bulk_events = [] + for event in events: + event_key = namespace + event["key"] + payload = event["payload"] + + try: + meta = event.get("additional_metadata", {}) + meta_bytes = json.dumps(meta).encode("utf-8") if meta else None + except Exception as e: + raise ValueError(f"Error encoding meta: {e}") + + try: + payload_bytes = json.dumps(payload).encode("utf-8") + except json.UnicodeEncodeError as e: + raise ValueError(f"Error encoding payload: {e}") + + request = PushEventRequest( + key=event_key, + payload=payload_bytes, + eventTimestamp=proto_timestamp_now(), + additionalMetadata=meta_bytes, + ) + bulk_events.append(request) + + bulk_request = BulkPushEventRequest(events=bulk_events) + + response = self.client.BulkPush(bulk_request, metadata=get_metadata(self.token)) + + return response.events + + def log(self, message: str, step_run_id: str): + try: + request = PutLogRequest( + stepRunId=step_run_id, + createdAt=proto_timestamp_now(), + message=message, + ) + + self.client.PutLog(request, metadata=get_metadata(self.token)) + except Exception as e: + raise ValueError(f"Error logging: {e}") + + def stream(self, data: str | bytes, step_run_id: str): + try: + if isinstance(data, str): + data_bytes = data.encode("utf-8") + elif isinstance(data, bytes): + data_bytes = data + else: + raise ValueError("Invalid data type. Expected str, bytes, or file.") + + request = PutStreamEventRequest( + stepRunId=step_run_id, + createdAt=proto_timestamp_now(), + message=data_bytes, + ) + self.client.PutStreamEvent(request, metadata=get_metadata(self.token)) + except Exception as e: + raise ValueError(f"Error putting stream event: {e}") |