about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/hatchet_sdk/clients/events.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/hatchet_sdk/clients/events.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/hatchet_sdk/clients/events.py')
-rw-r--r--.venv/lib/python3.12/site-packages/hatchet_sdk/clients/events.py183
1 files changed, 183 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/hatchet_sdk/clients/events.py b/.venv/lib/python3.12/site-packages/hatchet_sdk/clients/events.py
new file mode 100644
index 00000000..cf6a2721
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/hatchet_sdk/clients/events.py
@@ -0,0 +1,183 @@
+import asyncio
+import datetime
+import json
+from typing import Any, Dict, List, Optional, TypedDict
+from uuid import uuid4
+
+import grpc
+from google.protobuf import timestamp_pb2
+
+from hatchet_sdk.clients.rest.tenacity_utils import tenacity_retry
+from hatchet_sdk.contracts.events_pb2 import (
+    BulkPushEventRequest,
+    Event,
+    PushEventRequest,
+    PutLogRequest,
+    PutStreamEventRequest,
+)
+from hatchet_sdk.contracts.events_pb2_grpc import EventsServiceStub
+from hatchet_sdk.utils.serialization import flatten
+
+from ..loader import ClientConfig
+from ..metadata import get_metadata
+
+
+def new_event(conn, config: ClientConfig):
+    return EventClient(
+        client=EventsServiceStub(conn),
+        config=config,
+    )
+
+
+def proto_timestamp_now():
+    t = datetime.datetime.now().timestamp()
+    seconds = int(t)
+    nanos = int(t % 1 * 1e9)
+
+    return timestamp_pb2.Timestamp(seconds=seconds, nanos=nanos)
+
+
+class PushEventOptions(TypedDict, total=False):
+    additional_metadata: Dict[str, str] | None = None
+    namespace: str | None = None
+
+
+class BulkPushEventOptions(TypedDict, total=False):
+    namespace: str | None = None
+
+
+class BulkPushEventWithMetadata(TypedDict, total=False):
+    key: str
+    payload: Any
+    additional_metadata: Optional[Dict[str, Any]]  # Optional metadata
+
+
+class EventClient:
+    def __init__(self, client: EventsServiceStub, config: ClientConfig):
+        self.client = client
+        self.token = config.token
+        self.namespace = config.namespace
+
+    async def async_push(
+        self, event_key, payload, options: Optional[PushEventOptions] = None
+    ) -> Event:
+        return await asyncio.to_thread(
+            self.push, event_key=event_key, payload=payload, options=options
+        )
+
+    async def async_bulk_push(
+        self,
+        events: List[BulkPushEventWithMetadata],
+        options: Optional[BulkPushEventOptions] = None,
+    ) -> List[Event]:
+        return await asyncio.to_thread(self.bulk_push, events=events, options=options)
+
+    ## IMPORTANT: Keep this method's signature in sync with the wrapper in the OTel instrumentor
+    @tenacity_retry
+    def push(self, event_key, payload, options: PushEventOptions = None) -> Event:
+        namespace = self.namespace
+
+        if (
+            options is not None
+            and "namespace" in options
+            and options["namespace"] is not None
+        ):
+            namespace = options.pop("namespace")
+
+        namespaced_event_key = namespace + event_key
+
+        try:
+            meta = dict() if options is None else options["additional_metadata"]
+            meta_bytes = None if meta is None else json.dumps(meta).encode("utf-8")
+        except Exception as e:
+            raise ValueError(f"Error encoding meta: {e}")
+
+        try:
+            payload_bytes = json.dumps(payload).encode("utf-8")
+        except json.UnicodeEncodeError as e:
+            raise ValueError(f"Error encoding payload: {e}")
+
+        request = PushEventRequest(
+            key=namespaced_event_key,
+            payload=payload_bytes,
+            eventTimestamp=proto_timestamp_now(),
+            additionalMetadata=meta_bytes,
+        )
+
+        return self.client.Push(request, metadata=get_metadata(self.token))
+
+    ## IMPORTANT: Keep this method's signature in sync with the wrapper in the OTel instrumentor
+    @tenacity_retry
+    def bulk_push(
+        self,
+        events: List[BulkPushEventWithMetadata],
+        options: BulkPushEventOptions = None,
+    ) -> List[Event]:
+        namespace = self.namespace
+
+        if (
+            options is not None
+            and "namespace" in options
+            and options["namespace"] is not None
+        ):
+            namespace = options.pop("namespace")
+
+        bulk_events = []
+        for event in events:
+            event_key = namespace + event["key"]
+            payload = event["payload"]
+
+            try:
+                meta = event.get("additional_metadata", {})
+                meta_bytes = json.dumps(meta).encode("utf-8") if meta else None
+            except Exception as e:
+                raise ValueError(f"Error encoding meta: {e}")
+
+            try:
+                payload_bytes = json.dumps(payload).encode("utf-8")
+            except json.UnicodeEncodeError as e:
+                raise ValueError(f"Error encoding payload: {e}")
+
+            request = PushEventRequest(
+                key=event_key,
+                payload=payload_bytes,
+                eventTimestamp=proto_timestamp_now(),
+                additionalMetadata=meta_bytes,
+            )
+            bulk_events.append(request)
+
+        bulk_request = BulkPushEventRequest(events=bulk_events)
+
+        response = self.client.BulkPush(bulk_request, metadata=get_metadata(self.token))
+
+        return response.events
+
+    def log(self, message: str, step_run_id: str):
+        try:
+            request = PutLogRequest(
+                stepRunId=step_run_id,
+                createdAt=proto_timestamp_now(),
+                message=message,
+            )
+
+            self.client.PutLog(request, metadata=get_metadata(self.token))
+        except Exception as e:
+            raise ValueError(f"Error logging: {e}")
+
+    def stream(self, data: str | bytes, step_run_id: str):
+        try:
+            if isinstance(data, str):
+                data_bytes = data.encode("utf-8")
+            elif isinstance(data, bytes):
+                data_bytes = data
+            else:
+                raise ValueError("Invalid data type. Expected str, bytes, or file.")
+
+            request = PutStreamEventRequest(
+                stepRunId=step_run_id,
+                createdAt=proto_timestamp_now(),
+                message=data_bytes,
+            )
+            self.client.PutStreamEvent(request, metadata=get_metadata(self.token))
+        except Exception as e:
+            raise ValueError(f"Error putting stream event: {e}")