about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/azure/core/messaging.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/azure/core/messaging.py')
-rw-r--r--.venv/lib/python3.12/site-packages/azure/core/messaging.py229
1 files changed, 229 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/azure/core/messaging.py b/.venv/lib/python3.12/site-packages/azure/core/messaging.py
new file mode 100644
index 00000000..a05739cd
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/azure/core/messaging.py
@@ -0,0 +1,229 @@
+# coding=utf-8
+# --------------------------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# Licensed under the MIT License. See License.txt in the project root for
+# license information.
+# --------------------------------------------------------------------------
+from __future__ import annotations
+import uuid
+from base64 import b64decode
+from datetime import datetime
+from typing import cast, Union, Any, Optional, Dict, TypeVar, Generic
+from .utils._utils import _convert_to_isoformat, TZ_UTC
+from .utils._messaging_shared import _get_json_content
+from .serialization import NULL
+
+
+__all__ = ["CloudEvent"]
+
+
+_Unset: Any = object()
+
+DataType = TypeVar("DataType")
+
+
+class CloudEvent(Generic[DataType]):
+    """Properties of the CloudEvent 1.0 Schema.
+    All required parameters must be populated in order to send to Azure.
+
+    :param source: Required. Identifies the context in which an event happened. The combination of id and source must
+     be unique for each distinct event. If publishing to a domain topic, source must be the domain topic name.
+    :type source: str
+    :param type: Required. Type of event related to the originating occurrence.
+    :type type: str
+    :keyword specversion: Optional. The version of the CloudEvent spec. Defaults to "1.0"
+    :paramtype specversion: str
+    :keyword data: Optional. Event data specific to the event type.
+    :paramtype data: object
+    :keyword time: Optional. The time (in UTC) the event was generated.
+    :paramtype time: ~datetime.datetime
+    :keyword dataschema: Optional. Identifies the schema that data adheres to.
+    :paramtype dataschema: str
+    :keyword datacontenttype: Optional. Content type of data value.
+    :paramtype datacontenttype: str
+    :keyword subject: Optional. This describes the subject of the event in the context of the event producer
+     (identified by source).
+    :paramtype subject: str
+    :keyword id: Optional. An identifier for the event. The combination of id and source must be
+     unique for each distinct event. If not provided, a random UUID will be generated and used.
+    :paramtype id: Optional[str]
+    :keyword extensions: Optional. A CloudEvent MAY include any number of additional context attributes
+     with distinct names represented as key - value pairs. Each extension must be alphanumeric, lower cased
+     and must not exceed the length of 20 characters.
+    :paramtype extensions: Optional[dict]
+    """
+
+    source: str
+    """Identifies the context in which an event happened. The combination of id and source must
+       be unique for each distinct event. If publishing to a domain topic, source must be the domain topic name."""
+
+    type: str
+    """Type of event related to the originating occurrence."""
+
+    specversion: str = "1.0"
+    """The version of the CloudEvent spec. Defaults to "1.0" """
+
+    id: str
+    """An identifier for the event. The combination of id and source must be
+       unique for each distinct event. If not provided, a random UUID will be generated and used."""
+
+    data: Optional[DataType]
+    """Event data specific to the event type."""
+
+    time: Optional[datetime]
+    """The time (in UTC) the event was generated."""
+
+    dataschema: Optional[str]
+    """Identifies the schema that data adheres to."""
+
+    datacontenttype: Optional[str]
+    """Content type of data value."""
+
+    subject: Optional[str]
+    """This describes the subject of the event in the context of the event producer
+       (identified by source)"""
+
+    extensions: Optional[Dict[str, Any]]
+    """A CloudEvent MAY include any number of additional context attributes
+       with distinct names represented as key - value pairs. Each extension must be alphanumeric, lower cased
+       and must not exceed the length of 20 characters."""
+
+    def __init__(
+        self,
+        source: str,
+        type: str,  # pylint: disable=redefined-builtin
+        *,
+        specversion: Optional[str] = None,
+        id: Optional[str] = None,  # pylint: disable=redefined-builtin
+        time: Optional[datetime] = _Unset,
+        datacontenttype: Optional[str] = None,
+        dataschema: Optional[str] = None,
+        subject: Optional[str] = None,
+        data: Optional[DataType] = None,
+        extensions: Optional[Dict[str, Any]] = None,
+        **kwargs: Any,
+    ) -> None:
+        self.source: str = source
+        self.type: str = type
+
+        if specversion:
+            self.specversion: str = specversion
+        self.id: str = id if id else str(uuid.uuid4())
+
+        self.time: Optional[datetime]
+        if time is _Unset:
+            self.time = datetime.now(TZ_UTC)
+        else:
+            self.time = time
+
+        self.datacontenttype: Optional[str] = datacontenttype
+        self.dataschema: Optional[str] = dataschema
+        self.subject: Optional[str] = subject
+        self.data: Optional[DataType] = data
+
+        self.extensions: Optional[Dict[str, Any]] = extensions
+        if self.extensions:
+            for key in self.extensions.keys():
+                if not key.islower() or not key.isalnum():
+                    raise ValueError("Extension attributes should be lower cased and alphanumeric.")
+
+        if kwargs:
+            remaining = ", ".join(kwargs.keys())
+            raise ValueError(
+                f"Unexpected keyword arguments {remaining}. "
+                + "Any extension attributes must be passed explicitly using extensions."
+            )
+
+    def __repr__(self) -> str:
+        return "CloudEvent(source={}, type={}, specversion={}, id={}, time={})".format(
+            self.source, self.type, self.specversion, self.id, self.time
+        )[:1024]
+
+    @classmethod
+    def from_dict(cls, event: Dict[str, Any]) -> CloudEvent[DataType]:
+        """Returns the deserialized CloudEvent object when a dict is provided.
+
+        :param event: The dict representation of the event which needs to be deserialized.
+        :type event: dict
+        :rtype: CloudEvent
+        :return: The deserialized CloudEvent object.
+        """
+        kwargs: Dict[str, Any] = {}
+        reserved_attr = [
+            "data",
+            "data_base64",
+            "id",
+            "source",
+            "type",
+            "specversion",
+            "time",
+            "dataschema",
+            "datacontenttype",
+            "subject",
+        ]
+
+        if "data" in event and "data_base64" in event:
+            raise ValueError("Invalid input. Only one of data and data_base64 must be present.")
+
+        if "data" in event:
+            data = event.get("data")
+            kwargs["data"] = data if data is not None else NULL
+        elif "data_base64" in event:
+            kwargs["data"] = b64decode(cast(Union[str, bytes], event.get("data_base64")))
+
+        for item in ["datacontenttype", "dataschema", "subject"]:
+            if item in event:
+                val = event.get(item)
+                kwargs[item] = val if val is not None else NULL
+
+        extensions = {k: v for k, v in event.items() if k not in reserved_attr}
+        if extensions:
+            kwargs["extensions"] = extensions
+
+        try:
+            event_obj = cls(
+                id=event.get("id"),
+                source=event["source"],
+                type=event["type"],
+                specversion=event.get("specversion"),
+                time=_convert_to_isoformat(event.get("time")),
+                **kwargs,
+            )
+        except KeyError as err:
+            # https://github.com/cloudevents/spec Cloud event spec requires source, type,
+            # specversion. We autopopulate everything other than source, type.
+            # So we will assume the KeyError is coming from source/type access.
+            if all(
+                key in event
+                for key in (
+                    "subject",
+                    "eventType",
+                    "data",
+                    "dataVersion",
+                    "id",
+                    "eventTime",
+                )
+            ):
+                raise ValueError(
+                    "The event you are trying to parse follows the Eventgrid Schema. You can parse"
+                    + " EventGrid events using EventGridEvent.from_dict method in the azure-eventgrid library."
+                ) from err
+            raise ValueError(
+                "The event does not conform to the cloud event spec https://github.com/cloudevents/spec."
+                + " The `source` and `type` params are required."
+            ) from err
+        return event_obj
+
+    @classmethod
+    def from_json(cls, event: Any) -> CloudEvent[DataType]:
+        """Returns the deserialized CloudEvent object when a json payload is provided.
+
+        :param event: The json string that should be converted into a CloudEvent. This can also be
+         a storage QueueMessage, eventhub's EventData or ServiceBusMessage
+        :type event: object
+        :rtype: CloudEvent
+        :return: The deserialized CloudEvent object.
+        :raises ValueError: If the provided JSON is invalid.
+        """
+        dict_event = _get_json_content(event)
+        return CloudEvent.from_dict(dict_event)