1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
|
# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
from __future__ import annotations
import uuid
from base64 import b64decode
from datetime import datetime
from typing import cast, Union, Any, Optional, Dict, TypeVar, Generic
from .utils._utils import _convert_to_isoformat, TZ_UTC
from .utils._messaging_shared import _get_json_content
from .serialization import NULL
__all__ = ["CloudEvent"]
_Unset: Any = object()
DataType = TypeVar("DataType")
class CloudEvent(Generic[DataType]):
"""Properties of the CloudEvent 1.0 Schema.
All required parameters must be populated in order to send to Azure.
:param source: Required. Identifies the context in which an event happened. The combination of id and source must
be unique for each distinct event. If publishing to a domain topic, source must be the domain topic name.
:type source: str
:param type: Required. Type of event related to the originating occurrence.
:type type: str
:keyword specversion: Optional. The version of the CloudEvent spec. Defaults to "1.0"
:paramtype specversion: str
:keyword data: Optional. Event data specific to the event type.
:paramtype data: object
:keyword time: Optional. The time (in UTC) the event was generated.
:paramtype time: ~datetime.datetime
:keyword dataschema: Optional. Identifies the schema that data adheres to.
:paramtype dataschema: str
:keyword datacontenttype: Optional. Content type of data value.
:paramtype datacontenttype: str
:keyword subject: Optional. This describes the subject of the event in the context of the event producer
(identified by source).
:paramtype subject: str
:keyword id: Optional. An identifier for the event. The combination of id and source must be
unique for each distinct event. If not provided, a random UUID will be generated and used.
:paramtype id: Optional[str]
:keyword extensions: Optional. A CloudEvent MAY include any number of additional context attributes
with distinct names represented as key - value pairs. Each extension must be alphanumeric, lower cased
and must not exceed the length of 20 characters.
:paramtype extensions: Optional[dict]
"""
source: str
"""Identifies the context in which an event happened. The combination of id and source must
be unique for each distinct event. If publishing to a domain topic, source must be the domain topic name."""
type: str
"""Type of event related to the originating occurrence."""
specversion: str = "1.0"
"""The version of the CloudEvent spec. Defaults to "1.0" """
id: str
"""An identifier for the event. The combination of id and source must be
unique for each distinct event. If not provided, a random UUID will be generated and used."""
data: Optional[DataType]
"""Event data specific to the event type."""
time: Optional[datetime]
"""The time (in UTC) the event was generated."""
dataschema: Optional[str]
"""Identifies the schema that data adheres to."""
datacontenttype: Optional[str]
"""Content type of data value."""
subject: Optional[str]
"""This describes the subject of the event in the context of the event producer
(identified by source)"""
extensions: Optional[Dict[str, Any]]
"""A CloudEvent MAY include any number of additional context attributes
with distinct names represented as key - value pairs. Each extension must be alphanumeric, lower cased
and must not exceed the length of 20 characters."""
def __init__(
self,
source: str,
type: str, # pylint: disable=redefined-builtin
*,
specversion: Optional[str] = None,
id: Optional[str] = None, # pylint: disable=redefined-builtin
time: Optional[datetime] = _Unset,
datacontenttype: Optional[str] = None,
dataschema: Optional[str] = None,
subject: Optional[str] = None,
data: Optional[DataType] = None,
extensions: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> None:
self.source: str = source
self.type: str = type
if specversion:
self.specversion: str = specversion
self.id: str = id if id else str(uuid.uuid4())
self.time: Optional[datetime]
if time is _Unset:
self.time = datetime.now(TZ_UTC)
else:
self.time = time
self.datacontenttype: Optional[str] = datacontenttype
self.dataschema: Optional[str] = dataschema
self.subject: Optional[str] = subject
self.data: Optional[DataType] = data
self.extensions: Optional[Dict[str, Any]] = extensions
if self.extensions:
for key in self.extensions.keys():
if not key.islower() or not key.isalnum():
raise ValueError("Extension attributes should be lower cased and alphanumeric.")
if kwargs:
remaining = ", ".join(kwargs.keys())
raise ValueError(
f"Unexpected keyword arguments {remaining}. "
+ "Any extension attributes must be passed explicitly using extensions."
)
def __repr__(self) -> str:
return "CloudEvent(source={}, type={}, specversion={}, id={}, time={})".format(
self.source, self.type, self.specversion, self.id, self.time
)[:1024]
@classmethod
def from_dict(cls, event: Dict[str, Any]) -> CloudEvent[DataType]:
"""Returns the deserialized CloudEvent object when a dict is provided.
:param event: The dict representation of the event which needs to be deserialized.
:type event: dict
:rtype: CloudEvent
:return: The deserialized CloudEvent object.
"""
kwargs: Dict[str, Any] = {}
reserved_attr = [
"data",
"data_base64",
"id",
"source",
"type",
"specversion",
"time",
"dataschema",
"datacontenttype",
"subject",
]
if "data" in event and "data_base64" in event:
raise ValueError("Invalid input. Only one of data and data_base64 must be present.")
if "data" in event:
data = event.get("data")
kwargs["data"] = data if data is not None else NULL
elif "data_base64" in event:
kwargs["data"] = b64decode(cast(Union[str, bytes], event.get("data_base64")))
for item in ["datacontenttype", "dataschema", "subject"]:
if item in event:
val = event.get(item)
kwargs[item] = val if val is not None else NULL
extensions = {k: v for k, v in event.items() if k not in reserved_attr}
if extensions:
kwargs["extensions"] = extensions
try:
event_obj = cls(
id=event.get("id"),
source=event["source"],
type=event["type"],
specversion=event.get("specversion"),
time=_convert_to_isoformat(event.get("time")),
**kwargs,
)
except KeyError as err:
# https://github.com/cloudevents/spec Cloud event spec requires source, type,
# specversion. We autopopulate everything other than source, type.
# So we will assume the KeyError is coming from source/type access.
if all(
key in event
for key in (
"subject",
"eventType",
"data",
"dataVersion",
"id",
"eventTime",
)
):
raise ValueError(
"The event you are trying to parse follows the Eventgrid Schema. You can parse"
+ " EventGrid events using EventGridEvent.from_dict method in the azure-eventgrid library."
) from err
raise ValueError(
"The event does not conform to the cloud event spec https://github.com/cloudevents/spec."
+ " The `source` and `type` params are required."
) from err
return event_obj
@classmethod
def from_json(cls, event: Any) -> CloudEvent[DataType]:
"""Returns the deserialized CloudEvent object when a json payload is provided.
:param event: The json string that should be converted into a CloudEvent. This can also be
a storage QueueMessage, eventhub's EventData or ServiceBusMessage
:type event: object
:rtype: CloudEvent
:return: The deserialized CloudEvent object.
:raises ValueError: If the provided JSON is invalid.
"""
dict_event = _get_json_content(event)
return CloudEvent.from_dict(dict_event)
|