# coding: utf-8 """ Hatchet API The Hatchet API The version of the OpenAPI document: 1.0.0 Generated by OpenAPI Generator (https://openapi-generator.tech) Do not edit the class manually. """ # noqa: E501 import warnings from typing import Any, Dict, List, Optional, Tuple, Union from pydantic import Field, StrictFloat, StrictInt, StrictStr, validate_call from typing_extensions import Annotated from hatchet_sdk.clients.rest.api_client import ApiClient, RequestSerialized from hatchet_sdk.clients.rest.api_response import ApiResponse from hatchet_sdk.clients.rest.models.bulk_create_event_request import ( BulkCreateEventRequest, ) from hatchet_sdk.clients.rest.models.cancel_event_request import CancelEventRequest from hatchet_sdk.clients.rest.models.create_event_request import CreateEventRequest from hatchet_sdk.clients.rest.models.event import Event from hatchet_sdk.clients.rest.models.event_data import EventData from hatchet_sdk.clients.rest.models.event_key_list import EventKeyList from hatchet_sdk.clients.rest.models.event_list import EventList from hatchet_sdk.clients.rest.models.event_order_by_direction import ( EventOrderByDirection, ) from hatchet_sdk.clients.rest.models.event_order_by_field import EventOrderByField from hatchet_sdk.clients.rest.models.event_update_cancel200_response import ( EventUpdateCancel200Response, ) from hatchet_sdk.clients.rest.models.events import Events from hatchet_sdk.clients.rest.models.replay_event_request import ReplayEventRequest from hatchet_sdk.clients.rest.models.workflow_run_status import WorkflowRunStatus from hatchet_sdk.clients.rest.rest import RESTResponseType class EventApi: """NOTE: This class is auto generated by OpenAPI Generator Ref: https://openapi-generator.tech Do not edit the class manually. """ def __init__(self, api_client=None) -> None: if api_client is None: api_client = ApiClient.get_default() self.api_client = api_client @validate_call async def event_create( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], create_event_request: Annotated[ CreateEventRequest, Field(description="The event to create") ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> Event: """Create event Creates a new event. :param tenant: The tenant id (required) :type tenant: str :param create_event_request: The event to create (required) :type create_event_request: CreateEventRequest :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._event_create_serialize( tenant=tenant, create_event_request=create_event_request, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "Event", "400": "APIErrors", "403": "APIErrors", "429": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ).data @validate_call async def event_create_with_http_info( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], create_event_request: Annotated[ CreateEventRequest, Field(description="The event to create") ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> ApiResponse[Event]: """Create event Creates a new event. :param tenant: The tenant id (required) :type tenant: str :param create_event_request: The event to create (required) :type create_event_request: CreateEventRequest :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._event_create_serialize( tenant=tenant, create_event_request=create_event_request, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "Event", "400": "APIErrors", "403": "APIErrors", "429": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ) @validate_call async def event_create_without_preload_content( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], create_event_request: Annotated[ CreateEventRequest, Field(description="The event to create") ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> RESTResponseType: """Create event Creates a new event. :param tenant: The tenant id (required) :type tenant: str :param create_event_request: The event to create (required) :type create_event_request: CreateEventRequest :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._event_create_serialize( tenant=tenant, create_event_request=create_event_request, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "Event", "400": "APIErrors", "403": "APIErrors", "429": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) return response_data.response def _event_create_serialize( self, tenant, create_event_request, _request_auth, _content_type, _headers, _host_index, ) -> RequestSerialized: _host = None _collection_formats: Dict[str, str] = {} _path_params: Dict[str, str] = {} _query_params: List[Tuple[str, str]] = [] _header_params: Dict[str, Optional[str]] = _headers or {} _form_params: List[Tuple[str, str]] = [] _files: Dict[ str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]] ] = {} _body_params: Optional[bytes] = None # process the path parameters if tenant is not None: _path_params["tenant"] = tenant # process the query parameters # process the header parameters # process the form parameters # process the body parameter if create_event_request is not None: _body_params = create_event_request # set the HTTP header `Accept` if "Accept" not in _header_params: _header_params["Accept"] = self.api_client.select_header_accept( ["application/json"] ) # set the HTTP header `Content-Type` if _content_type: _header_params["Content-Type"] = _content_type else: _default_content_type = self.api_client.select_header_content_type( ["application/json"] ) if _default_content_type is not None: _header_params["Content-Type"] = _default_content_type # authentication setting _auth_settings: List[str] = ["cookieAuth", "bearerAuth"] return self.api_client.param_serialize( method="POST", resource_path="/api/v1/tenants/{tenant}/events", path_params=_path_params, query_params=_query_params, header_params=_header_params, body=_body_params, post_params=_form_params, files=_files, auth_settings=_auth_settings, collection_formats=_collection_formats, _host=_host, _request_auth=_request_auth, ) @validate_call async def event_create_bulk( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], bulk_create_event_request: Annotated[ BulkCreateEventRequest, Field(description="The events to create") ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> Events: """Bulk Create events Bulk creates new events. :param tenant: The tenant id (required) :type tenant: str :param bulk_create_event_request: The events to create (required) :type bulk_create_event_request: BulkCreateEventRequest :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._event_create_bulk_serialize( tenant=tenant, bulk_create_event_request=bulk_create_event_request, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "Events", "400": "APIErrors", "403": "APIErrors", "429": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ).data @validate_call async def event_create_bulk_with_http_info( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], bulk_create_event_request: Annotated[ BulkCreateEventRequest, Field(description="The events to create") ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> ApiResponse[Events]: """Bulk Create events Bulk creates new events. :param tenant: The tenant id (required) :type tenant: str :param bulk_create_event_request: The events to create (required) :type bulk_create_event_request: BulkCreateEventRequest :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._event_create_bulk_serialize( tenant=tenant, bulk_create_event_request=bulk_create_event_request, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "Events", "400": "APIErrors", "403": "APIErrors", "429": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ) @validate_call async def event_create_bulk_without_preload_content( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], bulk_create_event_request: Annotated[ BulkCreateEventRequest, Field(description="The events to create") ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> RESTResponseType: """Bulk Create events Bulk creates new events. :param tenant: The tenant id (required) :type tenant: str :param bulk_create_event_request: The events to create (required) :type bulk_create_event_request: BulkCreateEventRequest :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._event_create_bulk_serialize( tenant=tenant, bulk_create_event_request=bulk_create_event_request, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "Events", "400": "APIErrors", "403": "APIErrors", "429": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) return response_data.response def _event_create_bulk_serialize( self, tenant, bulk_create_event_request, _request_auth, _content_type, _headers, _host_index, ) -> RequestSerialized: _host = None _collection_formats: Dict[str, str] = {} _path_params: Dict[str, str] = {} _query_params: List[Tuple[str, str]] = [] _header_params: Dict[str, Optional[str]] = _headers or {} _form_params: List[Tuple[str, str]] = [] _files: Dict[ str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]] ] = {} _body_params: Optional[bytes] = None # process the path parameters if tenant is not None: _path_params["tenant"] = tenant # process the query parameters # process the header parameters # process the form parameters # process the body parameter if bulk_create_event_request is not None: _body_params = bulk_create_event_request # set the HTTP header `Accept` if "Accept" not in _header_params: _header_params["Accept"] = self.api_client.select_header_accept( ["application/json"] ) # set the HTTP header `Content-Type` if _content_type: _header_params["Content-Type"] = _content_type else: _default_content_type = self.api_client.select_header_content_type( ["application/json"] ) if _default_content_type is not None: _header_params["Content-Type"] = _default_content_type # authentication setting _auth_settings: List[str] = ["cookieAuth", "bearerAuth"] return self.api_client.param_serialize( method="POST", resource_path="/api/v1/tenants/{tenant}/events/bulk", path_params=_path_params, query_params=_query_params, header_params=_header_params, body=_body_params, post_params=_form_params, files=_files, auth_settings=_auth_settings, collection_formats=_collection_formats, _host=_host, _request_auth=_request_auth, ) @validate_call async def event_data_get( self, event: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The event id" ), ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> EventData: """Get event data Get the data for an event. :param event: The event id (required) :type event: str :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._event_data_get_serialize( event=event, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "EventData", "400": "APIErrors", "403": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ).data @validate_call async def event_data_get_with_http_info( self, event: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The event id" ), ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> ApiResponse[EventData]: """Get event data Get the data for an event. :param event: The event id (required) :type event: str :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._event_data_get_serialize( event=event, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "EventData", "400": "APIErrors", "403": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ) @validate_call async def event_data_get_without_preload_content( self, event: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The event id" ), ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> RESTResponseType: """Get event data Get the data for an event. :param event: The event id (required) :type event: str :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._event_data_get_serialize( event=event, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "EventData", "400": "APIErrors", "403": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) return response_data.response def _event_data_get_serialize( self, event, _request_auth, _content_type, _headers, _host_index, ) -> RequestSerialized: _host = None _collection_formats: Dict[str, str] = {} _path_params: Dict[str, str] = {} _query_params: List[Tuple[str, str]] = [] _header_params: Dict[str, Optional[str]] = _headers or {} _form_params: List[Tuple[str, str]] = [] _files: Dict[ str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]] ] = {} _body_params: Optional[bytes] = None # process the path parameters if event is not None: _path_params["event"] = event # process the query parameters # process the header parameters # process the form parameters # process the body parameter # set the HTTP header `Accept` if "Accept" not in _header_params: _header_params["Accept"] = self.api_client.select_header_accept( ["application/json"] ) # authentication setting _auth_settings: List[str] = ["cookieAuth", "bearerAuth"] return self.api_client.param_serialize( method="GET", resource_path="/api/v1/events/{event}/data", path_params=_path_params, query_params=_query_params, header_params=_header_params, body=_body_params, post_params=_form_params, files=_files, auth_settings=_auth_settings, collection_formats=_collection_formats, _host=_host, _request_auth=_request_auth, ) @validate_call async def event_get( self, event: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The event id" ), ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> Event: """Get event data Get an event. :param event: The event id (required) :type event: str :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._event_get_serialize( event=event, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "Event", "400": "APIErrors", "403": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ).data @validate_call async def event_get_with_http_info( self, event: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The event id" ), ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> ApiResponse[Event]: """Get event data Get an event. :param event: The event id (required) :type event: str :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._event_get_serialize( event=event, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "Event", "400": "APIErrors", "403": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ) @validate_call async def event_get_without_preload_content( self, event: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The event id" ), ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> RESTResponseType: """Get event data Get an event. :param event: The event id (required) :type event: str :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._event_get_serialize( event=event, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "Event", "400": "APIErrors", "403": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) return response_data.response def _event_get_serialize( self, event, _request_auth, _content_type, _headers, _host_index, ) -> RequestSerialized: _host = None _collection_formats: Dict[str, str] = {} _path_params: Dict[str, str] = {} _query_params: List[Tuple[str, str]] = [] _header_params: Dict[str, Optional[str]] = _headers or {} _form_params: List[Tuple[str, str]] = [] _files: Dict[ str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]] ] = {} _body_params: Optional[bytes] = None # process the path parameters if event is not None: _path_params["event"] = event # process the query parameters # process the header parameters # process the form parameters # process the body parameter # set the HTTP header `Accept` if "Accept" not in _header_params: _header_params["Accept"] = self.api_client.select_header_accept( ["application/json"] ) # authentication setting _auth_settings: List[str] = ["cookieAuth", "bearerAuth"] return self.api_client.param_serialize( method="GET", resource_path="/api/v1/events/{event}", path_params=_path_params, query_params=_query_params, header_params=_header_params, body=_body_params, post_params=_form_params, files=_files, auth_settings=_auth_settings, collection_formats=_collection_formats, _host=_host, _request_auth=_request_auth, ) @validate_call async def event_key_list( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> EventKeyList: """List event keys Lists all event keys for a tenant. :param tenant: The tenant id (required) :type tenant: str :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._event_key_list_serialize( tenant=tenant, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "EventKeyList", "400": "APIErrors", "403": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ).data @validate_call async def event_key_list_with_http_info( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> ApiResponse[EventKeyList]: """List event keys Lists all event keys for a tenant. :param tenant: The tenant id (required) :type tenant: str :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._event_key_list_serialize( tenant=tenant, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "EventKeyList", "400": "APIErrors", "403": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ) @validate_call async def event_key_list_without_preload_content( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> RESTResponseType: """List event keys Lists all event keys for a tenant. :param tenant: The tenant id (required) :type tenant: str :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._event_key_list_serialize( tenant=tenant, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "EventKeyList", "400": "APIErrors", "403": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) return response_data.response def _event_key_list_serialize( self, tenant, _request_auth, _content_type, _headers, _host_index, ) -> RequestSerialized: _host = None _collection_formats: Dict[str, str] = {} _path_params: Dict[str, str] = {} _query_params: List[Tuple[str, str]] = [] _header_params: Dict[str, Optional[str]] = _headers or {} _form_params: List[Tuple[str, str]] = [] _files: Dict[ str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]] ] = {} _body_params: Optional[bytes] = None # process the path parameters if tenant is not None: _path_params["tenant"] = tenant # process the query parameters # process the header parameters # process the form parameters # process the body parameter # set the HTTP header `Accept` if "Accept" not in _header_params: _header_params["Accept"] = self.api_client.select_header_accept( ["application/json"] ) # authentication setting _auth_settings: List[str] = ["cookieAuth", "bearerAuth"] return self.api_client.param_serialize( method="GET", resource_path="/api/v1/tenants/{tenant}/events/keys", path_params=_path_params, query_params=_query_params, header_params=_header_params, body=_body_params, post_params=_form_params, files=_files, auth_settings=_auth_settings, collection_formats=_collection_formats, _host=_host, _request_auth=_request_auth, ) @validate_call async def event_list( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], offset: Annotated[ Optional[StrictInt], Field(description="The number to skip") ] = None, limit: Annotated[ Optional[StrictInt], Field(description="The number to limit by") ] = None, keys: Annotated[ Optional[List[StrictStr]], Field(description="A list of keys to filter by") ] = None, workflows: Annotated[ Optional[List[StrictStr]], Field(description="A list of workflow IDs to filter by"), ] = None, statuses: Annotated[ Optional[List[WorkflowRunStatus]], Field(description="A list of workflow run statuses to filter by"), ] = None, search: Annotated[ Optional[StrictStr], Field(description="The search query to filter for") ] = None, order_by_field: Annotated[ Optional[EventOrderByField], Field(description="What to order by") ] = None, order_by_direction: Annotated[ Optional[EventOrderByDirection], Field(description="The order direction") ] = None, additional_metadata: Annotated[ Optional[List[StrictStr]], Field(description="A list of metadata key value pairs to filter by"), ] = None, event_ids: Annotated[ Optional[ List[Annotated[str, Field(min_length=36, strict=True, max_length=36)]] ], Field(description="A list of event ids to filter by"), ] = None, _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> EventList: """List events Lists all events for a tenant. :param tenant: The tenant id (required) :type tenant: str :param offset: The number to skip :type offset: int :param limit: The number to limit by :type limit: int :param keys: A list of keys to filter by :type keys: List[str] :param workflows: A list of workflow IDs to filter by :type workflows: List[str] :param statuses: A list of workflow run statuses to filter by :type statuses: List[WorkflowRunStatus] :param search: The search query to filter for :type search: str :param order_by_field: What to order by :type order_by_field: EventOrderByField :param order_by_direction: The order direction :type order_by_direction: EventOrderByDirection :param additional_metadata: A list of metadata key value pairs to filter by :type additional_metadata: List[str] :param event_ids: A list of event ids to filter by :type event_ids: List[str] :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._event_list_serialize( tenant=tenant, offset=offset, limit=limit, keys=keys, workflows=workflows, statuses=statuses, search=search, order_by_field=order_by_field, order_by_direction=order_by_direction, additional_metadata=additional_metadata, event_ids=event_ids, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "EventList", "400": "APIErrors", "403": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ).data @validate_call async def event_list_with_http_info( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], offset: Annotated[ Optional[StrictInt], Field(description="The number to skip") ] = None, limit: Annotated[ Optional[StrictInt], Field(description="The number to limit by") ] = None, keys: Annotated[ Optional[List[StrictStr]], Field(description="A list of keys to filter by") ] = None, workflows: Annotated[ Optional[List[StrictStr]], Field(description="A list of workflow IDs to filter by"), ] = None, statuses: Annotated[ Optional[List[WorkflowRunStatus]], Field(description="A list of workflow run statuses to filter by"), ] = None, search: Annotated[ Optional[StrictStr], Field(description="The search query to filter for") ] = None, order_by_field: Annotated[ Optional[EventOrderByField], Field(description="What to order by") ] = None, order_by_direction: Annotated[ Optional[EventOrderByDirection], Field(description="The order direction") ] = None, additional_metadata: Annotated[ Optional[List[StrictStr]], Field(description="A list of metadata key value pairs to filter by"), ] = None, event_ids: Annotated[ Optional[ List[Annotated[str, Field(min_length=36, strict=True, max_length=36)]] ], Field(description="A list of event ids to filter by"), ] = None, _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> ApiResponse[EventList]: """List events Lists all events for a tenant. :param tenant: The tenant id (required) :type tenant: str :param offset: The number to skip :type offset: int :param limit: The number to limit by :type limit: int :param keys: A list of keys to filter by :type keys: List[str] :param workflows: A list of workflow IDs to filter by :type workflows: List[str] :param statuses: A list of workflow run statuses to filter by :type statuses: List[WorkflowRunStatus] :param search: The search query to filter for :type search: str :param order_by_field: What to order by :type order_by_field: EventOrderByField :param order_by_direction: The order direction :type order_by_direction: EventOrderByDirection :param additional_metadata: A list of metadata key value pairs to filter by :type additional_metadata: List[str] :param event_ids: A list of event ids to filter by :type event_ids: List[str] :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._event_list_serialize( tenant=tenant, offset=offset, limit=limit, keys=keys, workflows=workflows, statuses=statuses, search=search, order_by_field=order_by_field, order_by_direction=order_by_direction, additional_metadata=additional_metadata, event_ids=event_ids, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "EventList", "400": "APIErrors", "403": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ) @validate_call async def event_list_without_preload_content( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], offset: Annotated[ Optional[StrictInt], Field(description="The number to skip") ] = None, limit: Annotated[ Optional[StrictInt], Field(description="The number to limit by") ] = None, keys: Annotated[ Optional[List[StrictStr]], Field(description="A list of keys to filter by") ] = None, workflows: Annotated[ Optional[List[StrictStr]], Field(description="A list of workflow IDs to filter by"), ] = None, statuses: Annotated[ Optional[List[WorkflowRunStatus]], Field(description="A list of workflow run statuses to filter by"), ] = None, search: Annotated[ Optional[StrictStr], Field(description="The search query to filter for") ] = None, order_by_field: Annotated[ Optional[EventOrderByField], Field(description="What to order by") ] = None, order_by_direction: Annotated[ Optional[EventOrderByDirection], Field(description="The order direction") ] = None, additional_metadata: Annotated[ Optional[List[StrictStr]], Field(description="A list of metadata key value pairs to filter by"), ] = None, event_ids: Annotated[ Optional[ List[Annotated[str, Field(min_length=36, strict=True, max_length=36)]] ], Field(description="A list of event ids to filter by"), ] = None, _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> RESTResponseType: """List events Lists all events for a tenant. :param tenant: The tenant id (required) :type tenant: str :param offset: The number to skip :type offset: int :param limit: The number to limit by :type limit: int :param keys: A list of keys to filter by :type keys: List[str] :param workflows: A list of workflow IDs to filter by :type workflows: List[str] :param statuses: A list of workflow run statuses to filter by :type statuses: List[WorkflowRunStatus] :param search: The search query to filter for :type search: str :param order_by_field: What to order by :type order_by_field: EventOrderByField :param order_by_direction: The order direction :type order_by_direction: EventOrderByDirection :param additional_metadata: A list of metadata key value pairs to filter by :type additional_metadata: List[str] :param event_ids: A list of event ids to filter by :type event_ids: List[str] :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._event_list_serialize( tenant=tenant, offset=offset, limit=limit, keys=keys, workflows=workflows, statuses=statuses, search=search, order_by_field=order_by_field, order_by_direction=order_by_direction, additional_metadata=additional_metadata, event_ids=event_ids, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "EventList", "400": "APIErrors", "403": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) return response_data.response def _event_list_serialize( self, tenant, offset, limit, keys, workflows, statuses, search, order_by_field, order_by_direction, additional_metadata, event_ids, _request_auth, _content_type, _headers, _host_index, ) -> RequestSerialized: _host = None _collection_formats: Dict[str, str] = { "keys": "multi", "workflows": "multi", "statuses": "multi", "additionalMetadata": "multi", "eventIds": "multi", } _path_params: Dict[str, str] = {} _query_params: List[Tuple[str, str]] = [] _header_params: Dict[str, Optional[str]] = _headers or {} _form_params: List[Tuple[str, str]] = [] _files: Dict[ str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]] ] = {} _body_params: Optional[bytes] = None # process the path parameters if tenant is not None: _path_params["tenant"] = tenant # process the query parameters if offset is not None: _query_params.append(("offset", offset)) if limit is not None: _query_params.append(("limit", limit)) if keys is not None: _query_params.append(("keys", keys)) if workflows is not None: _query_params.append(("workflows", workflows)) if statuses is not None: _query_params.append(("statuses", statuses)) if search is not None: _query_params.append(("search", search)) if order_by_field is not None: _query_params.append(("orderByField", order_by_field.value)) if order_by_direction is not None: _query_params.append(("orderByDirection", order_by_direction.value)) if additional_metadata is not None: _query_params.append(("additionalMetadata", additional_metadata)) if event_ids is not None: _query_params.append(("eventIds", event_ids)) # process the header parameters # process the form parameters # process the body parameter # set the HTTP header `Accept` if "Accept" not in _header_params: _header_params["Accept"] = self.api_client.select_header_accept( ["application/json"] ) # authentication setting _auth_settings: List[str] = ["cookieAuth", "bearerAuth"] return self.api_client.param_serialize( method="GET", resource_path="/api/v1/tenants/{tenant}/events", path_params=_path_params, query_params=_query_params, header_params=_header_params, body=_body_params, post_params=_form_params, files=_files, auth_settings=_auth_settings, collection_formats=_collection_formats, _host=_host, _request_auth=_request_auth, ) @validate_call async def event_update_cancel( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], cancel_event_request: Annotated[ CancelEventRequest, Field(description="The event ids to replay") ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> EventUpdateCancel200Response: """Replay events Cancels all runs for a list of events. :param tenant: The tenant id (required) :type tenant: str :param cancel_event_request: The event ids to replay (required) :type cancel_event_request: CancelEventRequest :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._event_update_cancel_serialize( tenant=tenant, cancel_event_request=cancel_event_request, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "EventUpdateCancel200Response", "400": "APIErrors", "403": "APIErrors", "429": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ).data @validate_call async def event_update_cancel_with_http_info( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], cancel_event_request: Annotated[ CancelEventRequest, Field(description="The event ids to replay") ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> ApiResponse[EventUpdateCancel200Response]: """Replay events Cancels all runs for a list of events. :param tenant: The tenant id (required) :type tenant: str :param cancel_event_request: The event ids to replay (required) :type cancel_event_request: CancelEventRequest :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._event_update_cancel_serialize( tenant=tenant, cancel_event_request=cancel_event_request, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "EventUpdateCancel200Response", "400": "APIErrors", "403": "APIErrors", "429": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ) @validate_call async def event_update_cancel_without_preload_content( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], cancel_event_request: Annotated[ CancelEventRequest, Field(description="The event ids to replay") ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> RESTResponseType: """Replay events Cancels all runs for a list of events. :param tenant: The tenant id (required) :type tenant: str :param cancel_event_request: The event ids to replay (required) :type cancel_event_request: CancelEventRequest :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._event_update_cancel_serialize( tenant=tenant, cancel_event_request=cancel_event_request, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "EventUpdateCancel200Response", "400": "APIErrors", "403": "APIErrors", "429": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) return response_data.response def _event_update_cancel_serialize( self, tenant, cancel_event_request, _request_auth, _content_type, _headers, _host_index, ) -> RequestSerialized: _host = None _collection_formats: Dict[str, str] = {} _path_params: Dict[str, str] = {} _query_params: List[Tuple[str, str]] = [] _header_params: Dict[str, Optional[str]] = _headers or {} _form_params: List[Tuple[str, str]] = [] _files: Dict[ str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]] ] = {} _body_params: Optional[bytes] = None # process the path parameters if tenant is not None: _path_params["tenant"] = tenant # process the query parameters # process the header parameters # process the form parameters # process the body parameter if cancel_event_request is not None: _body_params = cancel_event_request # set the HTTP header `Accept` if "Accept" not in _header_params: _header_params["Accept"] = self.api_client.select_header_accept( ["application/json"] ) # set the HTTP header `Content-Type` if _content_type: _header_params["Content-Type"] = _content_type else: _default_content_type = self.api_client.select_header_content_type( ["application/json"] ) if _default_content_type is not None: _header_params["Content-Type"] = _default_content_type # authentication setting _auth_settings: List[str] = ["cookieAuth", "bearerAuth"] return self.api_client.param_serialize( method="POST", resource_path="/api/v1/tenants/{tenant}/events/cancel", path_params=_path_params, query_params=_query_params, header_params=_header_params, body=_body_params, post_params=_form_params, files=_files, auth_settings=_auth_settings, collection_formats=_collection_formats, _host=_host, _request_auth=_request_auth, ) @validate_call async def event_update_replay( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], replay_event_request: Annotated[ ReplayEventRequest, Field(description="The event ids to replay") ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> EventList: """Replay events Replays a list of events. :param tenant: The tenant id (required) :type tenant: str :param replay_event_request: The event ids to replay (required) :type replay_event_request: ReplayEventRequest :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._event_update_replay_serialize( tenant=tenant, replay_event_request=replay_event_request, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "EventList", "400": "APIErrors", "403": "APIErrors", "429": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ).data @validate_call async def event_update_replay_with_http_info( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], replay_event_request: Annotated[ ReplayEventRequest, Field(description="The event ids to replay") ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> ApiResponse[EventList]: """Replay events Replays a list of events. :param tenant: The tenant id (required) :type tenant: str :param replay_event_request: The event ids to replay (required) :type replay_event_request: ReplayEventRequest :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._event_update_replay_serialize( tenant=tenant, replay_event_request=replay_event_request, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "EventList", "400": "APIErrors", "403": "APIErrors", "429": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ) @validate_call async def event_update_replay_without_preload_content( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], replay_event_request: Annotated[ ReplayEventRequest, Field(description="The event ids to replay") ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> RESTResponseType: """Replay events Replays a list of events. :param tenant: The tenant id (required) :type tenant: str :param replay_event_request: The event ids to replay (required) :type replay_event_request: ReplayEventRequest :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._event_update_replay_serialize( tenant=tenant, replay_event_request=replay_event_request, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "EventList", "400": "APIErrors", "403": "APIErrors", "429": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) return response_data.response def _event_update_replay_serialize( self, tenant, replay_event_request, _request_auth, _content_type, _headers, _host_index, ) -> RequestSerialized: _host = None _collection_formats: Dict[str, str] = {} _path_params: Dict[str, str] = {} _query_params: List[Tuple[str, str]] = [] _header_params: Dict[str, Optional[str]] = _headers or {} _form_params: List[Tuple[str, str]] = [] _files: Dict[ str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]] ] = {} _body_params: Optional[bytes] = None # process the path parameters if tenant is not None: _path_params["tenant"] = tenant # process the query parameters # process the header parameters # process the form parameters # process the body parameter if replay_event_request is not None: _body_params = replay_event_request # set the HTTP header `Accept` if "Accept" not in _header_params: _header_params["Accept"] = self.api_client.select_header_accept( ["application/json"] ) # set the HTTP header `Content-Type` if _content_type: _header_params["Content-Type"] = _content_type else: _default_content_type = self.api_client.select_header_content_type( ["application/json"] ) if _default_content_type is not None: _header_params["Content-Type"] = _default_content_type # authentication setting _auth_settings: List[str] = ["cookieAuth", "bearerAuth"] return self.api_client.param_serialize( method="POST", resource_path="/api/v1/tenants/{tenant}/events/replay", path_params=_path_params, query_params=_query_params, header_params=_header_params, body=_body_params, post_params=_form_params, files=_files, auth_settings=_auth_settings, collection_formats=_collection_formats, _host=_host, _request_auth=_request_auth, )