path: root/.venv/lib/python3.12/site-packages/hatchet_sdk/clients/events.py
Diffstat (limited to '.venv/lib/python3.12/site-packages/hatchet_sdk/clients/events.py')
-rw-r--r--  .venv/lib/python3.12/site-packages/hatchet_sdk/clients/events.py  183
1 file changed, 183 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/hatchet_sdk/clients/events.py b/.venv/lib/python3.12/site-packages/hatchet_sdk/clients/events.py
new file mode 100644
index 00000000..cf6a2721
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/hatchet_sdk/clients/events.py
@@ -0,0 +1,183 @@
+import asyncio
+import datetime
+import json
+from typing import Any, Dict, List, Optional, TypedDict
+from uuid import uuid4
+
+import grpc
+from google.protobuf import timestamp_pb2
+
+from hatchet_sdk.clients.rest.tenacity_utils import tenacity_retry
+from hatchet_sdk.contracts.events_pb2 import (
+    BulkPushEventRequest,
+    Event,
+    PushEventRequest,
+    PutLogRequest,
+    PutStreamEventRequest,
+)
+from hatchet_sdk.contracts.events_pb2_grpc import EventsServiceStub
+from hatchet_sdk.utils.serialization import flatten
+
+from ..loader import ClientConfig
+from ..metadata import get_metadata
+
+
+def new_event(conn: grpc.Channel, config: ClientConfig) -> "EventClient":
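+    """Create an EventClient backed by an EventsServiceStub on the given channel."""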
+    return EventClient(
+        client=EventsServiceStub(conn),
+        config=config,
+    )
+
+
+def proto_timestamp_now():
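+    """Return the current time as a protobuf Timestamp (whole seconds plus nanos)."""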
+    t = datetime.datetime.now().timestamp()
+    seconds = int(t)
+    nanos = int((t % 1) * 1e9)
+
+    return timestamp_pb2.Timestamp(seconds=seconds, nanos=nanos)
+
+
+class PushEventOptions(TypedDict, total=False):
+    additional_metadata: Dict[str, str] | None
+    namespace: str | None
+
+
+class BulkPushEventOptions(TypedDict, total=False):
+    namespace: str | None
+
+
+class BulkPushEventWithMetadata(TypedDict, total=False):
+    key: str
+    payload: Any
+    additional_metadata: Optional[Dict[str, Any]]  # Optional metadata
+
+
+class EventClient:
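+    """Client for pushing events, logs, and stream data to the events service."""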
+    def __init__(self, client: EventsServiceStub, config: ClientConfig):
+        self.client = client
+        self.token = config.token
+        self.namespace = config.namespace
+
+    async def async_push(
+        self, event_key: str, payload: Any, options: Optional[PushEventOptions] = None
+    ) -> Event:
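+        """Async wrapper around push(), run in a thread via asyncio.to_thread."""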
+        return await asyncio.to_thread(
+            self.push, event_key=event_key, payload=payload, options=options
+        )
+
+    async def async_bulk_push(
+        self,
+        events: List[BulkPushEventWithMetadata],
+        options: Optional[BulkPushEventOptions] = None,
+    ) -> List[Event]:
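+        """Async wrapper around bulk_push(), run in a thread via asyncio.to_thread."""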
+        return await asyncio.to_thread(self.bulk_push, events=events, options=options)
+
+    ## IMPORTANT: Keep this method's signature in sync with the wrapper in the OTel instrumentor
+    @tenacity_retry
+    def push(
+        self,
+        event_key: str,
+        payload: Any,
+        options: Optional[PushEventOptions] = None,
+    ) -> Event:
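+        """Push a single event; the key is namespaced and the payload JSON-encoded."""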
+        namespace = self.namespace
+
+        if (
+            options is not None
+            and "namespace" in options
+            and options["namespace"] is not None
+        ):
+            namespace = options.pop("namespace")
+
+        namespaced_event_key = namespace + event_key
+
+        try:
+            meta = dict() if options is None else options.get("additional_metadata")
+            meta_bytes = None if meta is None else json.dumps(meta).encode("utf-8")
+        except Exception as e:
+            raise ValueError(f"Error encoding meta: {e}")
+
+        try:
+            payload_bytes = json.dumps(payload).encode("utf-8")
+        except (TypeError, ValueError) as e:
+            raise ValueError(f"Error encoding payload: {e}")
+
+        request = PushEventRequest(
+            key=namespaced_event_key,
+            payload=payload_bytes,
+            eventTimestamp=proto_timestamp_now(),
+            additionalMetadata=meta_bytes,
+        )
+
+        return self.client.Push(request, metadata=get_metadata(self.token))
+
+    ## IMPORTANT: Keep this method's signature in sync with the wrapper in the OTel instrumentor
+    @tenacity_retry
+    def bulk_push(
+        self,
+        events: List[BulkPushEventWithMetadata],
+        options: Optional[BulkPushEventOptions] = None,
+    ) -> List[Event]:
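+        """Push a batch of events in one BulkPush call; returns the created events."""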
+        namespace = self.namespace
+
+        if (
+            options is not None
+            and "namespace" in options
+            and options["namespace"] is not None
+        ):
+            namespace = options.pop("namespace")
+
+        bulk_events = []
+        for event in events:
+            event_key = namespace + event["key"]
+            payload = event["payload"]
+
+            try:
+                meta = event.get("additional_metadata", {})
+                meta_bytes = json.dumps(meta).encode("utf-8") if meta else None
+            except Exception as e:
+                raise ValueError(f"Error encoding meta: {e}")
+
+            try:
+                payload_bytes = json.dumps(payload).encode("utf-8")
+            except (TypeError, ValueError) as e:
+                raise ValueError(f"Error encoding payload: {e}")
+
+            request = PushEventRequest(
+                key=event_key,
+                payload=payload_bytes,
+                eventTimestamp=proto_timestamp_now(),
+                additionalMetadata=meta_bytes,
+            )
+            bulk_events.append(request)
+
+        bulk_request = BulkPushEventRequest(events=bulk_events)
+
+        response = self.client.BulkPush(bulk_request, metadata=get_metadata(self.token))
+
+        return response.events
+
+    def log(self, message: str, step_run_id: str):
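+        """Write a log message for the given step run."""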
+        try:
+            request = PutLogRequest(
+                stepRunId=step_run_id,
+                createdAt=proto_timestamp_now(),
+                message=message,
+            )
+
+            self.client.PutLog(request, metadata=get_metadata(self.token))
+        except Exception as e:
+            raise ValueError(f"Error logging: {e}")
+
+    def stream(self, data: str | bytes, step_run_id: str):
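+        """Send a stream event for a step run; str data is UTF-8 encoded."""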
+        try:
+            if isinstance(data, str):
+                data_bytes = data.encode("utf-8")
+            elif isinstance(data, bytes):
+                data_bytes = data
+            else:
+                raise ValueError("Invalid data type. Expected str or bytes.")
+
+            request = PutStreamEventRequest(
+                stepRunId=step_run_id,
+                createdAt=proto_timestamp_now(),
+                message=data_bytes,
+            )
+            self.client.PutStreamEvent(request, metadata=get_metadata(self.token))
+        except Exception as e:
+            raise ValueError(f"Error putting stream event: {e}")
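A minimal usage sketch for the client above; the gRPC channel and ClientConfig wiring are assumed rather than shown in this file, and the event keys and metadata values below are placeholders:

    # `event_client` is an EventClient as returned by new_event(conn, config)
    event_client.push(
        "user:created",
        {"user_id": "1234"},
        options={"additional_metadata": {"source": "signup"}},
    )

    # bulk_push sends one BulkPushEventRequest and returns the created events
    created = event_client.bulk_push(
        [
            {"key": "user:created", "payload": {"user_id": "1"}},
            {"key": "user:created", "payload": {"user_id": "2"}},
        ]
    )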