about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/hatchet_sdk/opentelemetry
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/hatchet_sdk/opentelemetry')
-rw-r--r--.venv/lib/python3.12/site-packages/hatchet_sdk/opentelemetry/instrumentor.py396
1 files changed, 396 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/hatchet_sdk/opentelemetry/instrumentor.py b/.venv/lib/python3.12/site-packages/hatchet_sdk/opentelemetry/instrumentor.py
new file mode 100644
index 00000000..91474c52
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/hatchet_sdk/opentelemetry/instrumentor.py
@@ -0,0 +1,396 @@
+from importlib.metadata import version
+from typing import Any, Callable, Collection, Coroutine
+
+try:
+    from opentelemetry.context import Context
+    from opentelemetry.instrumentation.instrumentor import (  # type: ignore[attr-defined]
+        BaseInstrumentor,
+    )
+    from opentelemetry.instrumentation.utils import unwrap
+    from opentelemetry.metrics import MeterProvider, NoOpMeterProvider, get_meter
+    from opentelemetry.trace import (
+        NoOpTracerProvider,
+        StatusCode,
+        TracerProvider,
+        get_tracer,
+        get_tracer_provider,
+    )
+    from opentelemetry.trace.propagation.tracecontext import (
+        TraceContextTextMapPropagator,
+    )
+    from wrapt import wrap_function_wrapper  # type: ignore[import-untyped]
+except (RuntimeError, ImportError, ModuleNotFoundError):
+    raise ModuleNotFoundError(
+        "To use the HatchetInstrumentor, you must install Hatchet's `otel` extra using (e.g.) `pip install hatchet-sdk[otel]`"
+    )
+
+import hatchet_sdk
+from hatchet_sdk.clients.admin import (
+    AdminClient,
+    TriggerWorkflowOptions,
+    WorkflowRunDict,
+)
+from hatchet_sdk.clients.dispatcher.action_listener import Action
+from hatchet_sdk.clients.events import (
+    BulkPushEventWithMetadata,
+    EventClient,
+    PushEventOptions,
+)
+from hatchet_sdk.contracts.events_pb2 import Event
+from hatchet_sdk.worker.runner.runner import Runner
+from hatchet_sdk.workflow_run import WorkflowRunRef
+
+hatchet_sdk_version = version("hatchet-sdk")
+
+InstrumentKwargs = TracerProvider | MeterProvider | None
+
+OTEL_TRACEPARENT_KEY = "traceparent"
+
+
+def create_traceparent() -> str | None:
+    """
+    Creates and returns a W3C traceparent header value using OpenTelemetry's context propagation.
+
+    The traceparent header is used to propagate context information across service boundaries
+    in distributed tracing systems. It follows the W3C Trace Context specification.
+
+    :returns: A W3C-formatted traceparent header value if successful, None if the context
+                    injection fails or no active span exists.\n
+                    Example: `00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01`
+    :rtype: str | None:
+    """
+
+    carrier: dict[str, str] = {}
+    TraceContextTextMapPropagator().inject(carrier)
+
+    return carrier.get("traceparent")
+
+
+def parse_carrier_from_metadata(metadata: dict[str, str] | None) -> Context | None:
+    """
+    Parses OpenTelemetry trace context from a metadata dictionary.
+
+    Extracts the trace context from metadata using the W3C Trace Context format,
+    specifically looking for the `traceparent` header.
+
+    :param metadata: A dictionary containing metadata key-value pairs,
+                     potentially including the `traceparent` header. Can be None.
+    :type metadata: dict[str, str] | None
+    :returns: The extracted OpenTelemetry Context object if a valid `traceparent`
+              is found in the metadata, otherwise None.
+    :rtype: Context | None
+
+    :Example:
+
+    >>> metadata = {"traceparent": "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01"}
+    >>> context = parse_carrier_from_metadata(metadata)
+    """
+
+    if not metadata:
+        return None
+
+    traceparent = metadata.get(OTEL_TRACEPARENT_KEY)
+
+    if not traceparent:
+        return None
+
+    return TraceContextTextMapPropagator().extract({OTEL_TRACEPARENT_KEY: traceparent})
+
+
+def inject_traceparent_into_metadata(
+    metadata: dict[str, str], traceparent: str | None = None
+) -> dict[str, str]:
+    """
+    Injects OpenTelemetry `traceparent` into a metadata dictionary.
+
+    Takes a metadata dictionary and an optional `traceparent` string,
+    returning a new metadata dictionary with the `traceparent` added under the
+    `OTEL_TRACEPARENT_KEY`. If no `traceparent` is provided, it attempts to create one.
+
+    :param metadata: The metadata dictionary to inject the `traceparent` into.
+    :type metadata: dict[str, str]
+    :param traceparent: The `traceparent` string to inject. If None, attempts to use
+                        the current span.
+    :type traceparent: str | None, optional
+    :returns: A new metadata dictionary containing the original metadata plus
+              the injected `traceparent`, if one was available or could be created.
+    :rtype: dict[str, str]
+
+    :Example:
+
+    >>> metadata = {"key": "value"}
+    >>> new_metadata = inject_traceparent(metadata, "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01")
+    >>> print(new_metadata)
+    {"key": "value", "traceparent": "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"}
+    """
+
+    if not traceparent:
+        traceparent = create_traceparent()
+
+    if not traceparent:
+        return metadata
+
+    return {
+        **metadata,
+        OTEL_TRACEPARENT_KEY: traceparent,
+    }
+
+
+class HatchetInstrumentor(BaseInstrumentor):  # type: ignore[misc]
+    def __init__(
+        self,
+        tracer_provider: TracerProvider | None = None,
+        meter_provider: MeterProvider | None = None,
+    ):
+        """
+        Hatchet OpenTelemetry instrumentor.
+
+        The instrumentor provides an OpenTelemetry integration for Hatchet by setting up
+        tracing and metrics collection.
+
+        :param tracer_provider: TracerProvider | None: The OpenTelemetry TracerProvider to use.
+                If not provided, the global tracer provider will be used.
+        :param meter_provider: MeterProvider | None: The OpenTelemetry MeterProvider to use.
+                If not provided, a no-op meter provider will be used.
+        """
+
+        self.tracer_provider = tracer_provider or get_tracer_provider()
+        self.meter_provider = meter_provider or NoOpMeterProvider()
+
+        super().__init__()
+
+    def instrumentation_dependencies(self) -> Collection[str]:
+        return tuple()
+
+    def _instrument(self, **kwargs: InstrumentKwargs) -> None:
+        self._tracer = get_tracer(__name__, hatchet_sdk_version, self.tracer_provider)
+        self._meter = get_meter(__name__, hatchet_sdk_version, self.meter_provider)
+
+        wrap_function_wrapper(
+            hatchet_sdk,
+            "worker.runner.runner.Runner.handle_start_step_run",
+            self._wrap_handle_start_step_run,
+        )
+        wrap_function_wrapper(
+            hatchet_sdk,
+            "worker.runner.runner.Runner.handle_start_group_key_run",
+            self._wrap_handle_get_group_key_run,
+        )
+        wrap_function_wrapper(
+            hatchet_sdk,
+            "worker.runner.runner.Runner.handle_cancel_action",
+            self._wrap_handle_cancel_action,
+        )
+
+        wrap_function_wrapper(
+            hatchet_sdk,
+            "clients.events.EventClient.push",
+            self._wrap_push_event,
+        )
+
+        wrap_function_wrapper(
+            hatchet_sdk,
+            "clients.events.EventClient.bulk_push",
+            self._wrap_bulk_push_event,
+        )
+
+        wrap_function_wrapper(
+            hatchet_sdk,
+            "clients.admin.AdminClient.run_workflow",
+            self._wrap_run_workflow,
+        )
+
+        wrap_function_wrapper(
+            hatchet_sdk,
+            "clients.admin.AdminClientAioImpl.run_workflow",
+            self._wrap_async_run_workflow,
+        )
+
+        wrap_function_wrapper(
+            hatchet_sdk,
+            "clients.admin.AdminClient.run_workflows",
+            self._wrap_run_workflows,
+        )
+
+        wrap_function_wrapper(
+            hatchet_sdk,
+            "clients.admin.AdminClientAioImpl.run_workflows",
+            self._wrap_async_run_workflows,
+        )
+
+    ## IMPORTANT: Keep these types in sync with the wrapped method's signature
+    async def _wrap_handle_start_step_run(
+        self,
+        wrapped: Callable[[Action], Coroutine[None, None, Exception | None]],
+        instance: Runner,
+        args: tuple[Action],
+        kwargs: Any,
+    ) -> Exception | None:
+        action = args[0]
+        traceparent = parse_carrier_from_metadata(action.additional_metadata)
+
+        with self._tracer.start_as_current_span(
+            "hatchet.start_step_run",
+            attributes=action.otel_attributes,
+            context=traceparent,
+        ) as span:
+            result = await wrapped(*args, **kwargs)
+
+            if isinstance(result, Exception):
+                span.set_status(StatusCode.ERROR, str(result))
+
+            return result
+
+    ## IMPORTANT: Keep these types in sync with the wrapped method's signature
+    async def _wrap_handle_get_group_key_run(
+        self,
+        wrapped: Callable[[Action], Coroutine[None, None, Exception | None]],
+        instance: Runner,
+        args: tuple[Action],
+        kwargs: Any,
+    ) -> Exception | None:
+        action = args[0]
+
+        with self._tracer.start_as_current_span(
+            "hatchet.get_group_key_run",
+            attributes=action.otel_attributes,
+        ) as span:
+            result = await wrapped(*args, **kwargs)
+
+            if isinstance(result, Exception):
+                span.set_status(StatusCode.ERROR, str(result))
+
+            return result
+
+    ## IMPORTANT: Keep these types in sync with the wrapped method's signature
+    async def _wrap_handle_cancel_action(
+        self,
+        wrapped: Callable[[str], Coroutine[None, None, Exception | None]],
+        instance: Runner,
+        args: tuple[str],
+        kwargs: Any,
+    ) -> Exception | None:
+        step_run_id = args[0]
+
+        with self._tracer.start_as_current_span(
+            "hatchet.cancel_step_run",
+            attributes={
+                "hatchet.step_run_id": step_run_id,
+            },
+        ):
+            return await wrapped(*args, **kwargs)
+
+    ## IMPORTANT: Keep these types in sync with the wrapped method's signature
+    def _wrap_push_event(
+        self,
+        wrapped: Callable[[str, dict[str, Any], PushEventOptions | None], Event],
+        instance: EventClient,
+        args: tuple[
+            str,
+            dict[str, Any],
+            PushEventOptions | None,
+        ],
+        kwargs: dict[str, str | dict[str, Any] | PushEventOptions | None],
+    ) -> Event:
+        with self._tracer.start_as_current_span(
+            "hatchet.push_event",
+        ):
+            return wrapped(*args, **kwargs)
+
+    ## IMPORTANT: Keep these types in sync with the wrapped method's signature
+    def _wrap_bulk_push_event(
+        self,
+        wrapped: Callable[
+            [list[BulkPushEventWithMetadata], PushEventOptions | None], list[Event]
+        ],
+        instance: EventClient,
+        args: tuple[
+            list[BulkPushEventWithMetadata],
+            PushEventOptions | None,
+        ],
+        kwargs: dict[str, list[BulkPushEventWithMetadata] | PushEventOptions | None],
+    ) -> list[Event]:
+        with self._tracer.start_as_current_span(
+            "hatchet.bulk_push_event",
+        ):
+            return wrapped(*args, **kwargs)
+
+    ## IMPORTANT: Keep these types in sync with the wrapped method's signature
+    def _wrap_run_workflow(
+        self,
+        wrapped: Callable[[str, Any, TriggerWorkflowOptions | None], WorkflowRunRef],
+        instance: AdminClient,
+        args: tuple[str, Any, TriggerWorkflowOptions | None],
+        kwargs: dict[str, str | Any | TriggerWorkflowOptions | None],
+    ) -> WorkflowRunRef:
+        with self._tracer.start_as_current_span(
+            "hatchet.run_workflow",
+        ):
+            return wrapped(*args, **kwargs)
+
+    ## IMPORTANT: Keep these types in sync with the wrapped method's signature
+    async def _wrap_async_run_workflow(
+        self,
+        wrapped: Callable[
+            [str, Any, TriggerWorkflowOptions | None],
+            Coroutine[None, None, WorkflowRunRef],
+        ],
+        instance: AdminClient,
+        args: tuple[str, Any, TriggerWorkflowOptions | None],
+        kwargs: dict[str, str | Any | TriggerWorkflowOptions | None],
+    ) -> WorkflowRunRef:
+        with self._tracer.start_as_current_span(
+            "hatchet.run_workflow",
+        ):
+            return await wrapped(*args, **kwargs)
+
+    ## IMPORTANT: Keep these types in sync with the wrapped method's signature
+    def _wrap_run_workflows(
+        self,
+        wrapped: Callable[
+            [list[WorkflowRunDict], TriggerWorkflowOptions | None], list[WorkflowRunRef]
+        ],
+        instance: AdminClient,
+        args: tuple[
+            list[WorkflowRunDict],
+            TriggerWorkflowOptions | None,
+        ],
+        kwargs: dict[str, list[WorkflowRunDict] | TriggerWorkflowOptions | None],
+    ) -> list[WorkflowRunRef]:
+        with self._tracer.start_as_current_span(
+            "hatchet.run_workflows",
+        ):
+            return wrapped(*args, **kwargs)
+
+    ## IMPORTANT: Keep these types in sync with the wrapped method's signature
+    async def _wrap_async_run_workflows(
+        self,
+        wrapped: Callable[
+            [list[WorkflowRunDict], TriggerWorkflowOptions | None],
+            Coroutine[None, None, list[WorkflowRunRef]],
+        ],
+        instance: AdminClient,
+        args: tuple[
+            list[WorkflowRunDict],
+            TriggerWorkflowOptions | None,
+        ],
+        kwargs: dict[str, list[WorkflowRunDict] | TriggerWorkflowOptions | None],
+    ) -> list[WorkflowRunRef]:
+        with self._tracer.start_as_current_span(
+            "hatchet.run_workflows",
+        ):
+            return await wrapped(*args, **kwargs)
+
+    def _uninstrument(self, **kwargs: InstrumentKwargs) -> None:
+        self.tracer_provider = NoOpTracerProvider()
+        self.meter_provider = NoOpMeterProvider()
+
+        unwrap(hatchet_sdk, "worker.runner.runner.Runner.handle_start_step_run")
+        unwrap(hatchet_sdk, "worker.runner.runner.Runner.handle_start_group_key_run")
+        unwrap(hatchet_sdk, "worker.runner.runner.Runner.handle_cancel_action")
+        unwrap(hatchet_sdk, "clients.events.EventClient.push")
+        unwrap(hatchet_sdk, "clients.events.EventClient.bulk_push")
+        unwrap(hatchet_sdk, "clients.admin.AdminClient.run_workflow")
+        unwrap(hatchet_sdk, "clients.admin.AdminClientAioImpl.run_workflow")
+        unwrap(hatchet_sdk, "clients.admin.AdminClient.run_workflows")
+        unwrap(hatchet_sdk, "clients.admin.AdminClientAioImpl.run_workflows")