# coding: utf-8 """ Hatchet API The Hatchet API The version of the OpenAPI document: 1.0.0 Generated by OpenAPI Generator (https://openapi-generator.tech) Do not edit the class manually. """ # noqa: E501 import warnings from typing import Any, Dict, List, Optional, Tuple, Union from pydantic import Field, StrictFloat, StrictInt, StrictStr, validate_call from typing_extensions import Annotated from hatchet_sdk.clients.rest.api_client import ApiClient, RequestSerialized from hatchet_sdk.clients.rest.api_response import ApiResponse from hatchet_sdk.clients.rest.models.rerun_step_run_request import RerunStepRunRequest from hatchet_sdk.clients.rest.models.step_run import StepRun from hatchet_sdk.clients.rest.models.step_run_archive_list import StepRunArchiveList from hatchet_sdk.clients.rest.models.step_run_event_list import StepRunEventList from hatchet_sdk.clients.rest.rest import RESTResponseType class StepRunApi: """NOTE: This class is auto generated by OpenAPI Generator Ref: https://openapi-generator.tech Do not edit the class manually. """ def __init__(self, api_client=None) -> None: if api_client is None: api_client = ApiClient.get_default() self.api_client = api_client @validate_call async def step_run_get( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], step_run: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The step run id" ), ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> StepRun: """Get step run Get a step run by id :param tenant: The tenant id (required) :type tenant: str :param step_run: The step run id (required) :type step_run: str :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._step_run_get_serialize( tenant=tenant, step_run=step_run, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "StepRun", "400": "APIErrors", "403": "APIErrors", "404": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ).data @validate_call async def step_run_get_with_http_info( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], step_run: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The step run id" ), ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> ApiResponse[StepRun]: """Get step run Get a step run by id :param tenant: The tenant id (required) :type tenant: str :param step_run: The step run id (required) :type step_run: str :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._step_run_get_serialize( tenant=tenant, step_run=step_run, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "StepRun", "400": "APIErrors", "403": "APIErrors", "404": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ) @validate_call async def step_run_get_without_preload_content( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], step_run: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The step run id" ), ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> RESTResponseType: """Get step run Get a step run by id :param tenant: The tenant id (required) :type tenant: str :param step_run: The step run id (required) :type step_run: str :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._step_run_get_serialize( tenant=tenant, step_run=step_run, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "StepRun", "400": "APIErrors", "403": "APIErrors", "404": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) return response_data.response def _step_run_get_serialize( self, tenant, step_run, _request_auth, _content_type, _headers, _host_index, ) -> RequestSerialized: _host = None _collection_formats: Dict[str, str] = {} _path_params: Dict[str, str] = {} _query_params: List[Tuple[str, str]] = [] _header_params: Dict[str, Optional[str]] = _headers or {} _form_params: List[Tuple[str, str]] = [] _files: Dict[ str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]] ] = {} _body_params: Optional[bytes] = None # process the path parameters if tenant is not None: _path_params["tenant"] = tenant if step_run is not None: _path_params["step-run"] = step_run # process the query parameters # process the header parameters # process the form parameters # process the body parameter # set the HTTP header `Accept` if "Accept" not in _header_params: _header_params["Accept"] = self.api_client.select_header_accept( ["application/json"] ) # authentication setting _auth_settings: List[str] = ["cookieAuth", "bearerAuth"] return self.api_client.param_serialize( method="GET", resource_path="/api/v1/tenants/{tenant}/step-runs/{step-run}", path_params=_path_params, query_params=_query_params, header_params=_header_params, body=_body_params, post_params=_form_params, files=_files, auth_settings=_auth_settings, collection_formats=_collection_formats, _host=_host, _request_auth=_request_auth, ) @validate_call async def step_run_get_schema( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], step_run: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The step run id" ), ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> object: """Get step run schema Get the schema for a step run :param tenant: The tenant id (required) :type tenant: str :param step_run: The step run id (required) :type step_run: str :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._step_run_get_schema_serialize( tenant=tenant, step_run=step_run, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "object", "400": "APIErrors", "403": "APIErrors", "404": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ).data @validate_call async def step_run_get_schema_with_http_info( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], step_run: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The step run id" ), ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> ApiResponse[object]: """Get step run schema Get the schema for a step run :param tenant: The tenant id (required) :type tenant: str :param step_run: The step run id (required) :type step_run: str :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._step_run_get_schema_serialize( tenant=tenant, step_run=step_run, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "object", "400": "APIErrors", "403": "APIErrors", "404": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ) @validate_call async def step_run_get_schema_without_preload_content( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], step_run: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The step run id" ), ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> RESTResponseType: """Get step run schema Get the schema for a step run :param tenant: The tenant id (required) :type tenant: str :param step_run: The step run id (required) :type step_run: str :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._step_run_get_schema_serialize( tenant=tenant, step_run=step_run, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "object", "400": "APIErrors", "403": "APIErrors", "404": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) return response_data.response def _step_run_get_schema_serialize( self, tenant, step_run, _request_auth, _content_type, _headers, _host_index, ) -> RequestSerialized: _host = None _collection_formats: Dict[str, str] = {} _path_params: Dict[str, str] = {} _query_params: List[Tuple[str, str]] = [] _header_params: Dict[str, Optional[str]] = _headers or {} _form_params: List[Tuple[str, str]] = [] _files: Dict[ str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]] ] = {} _body_params: Optional[bytes] = None # process the path parameters if tenant is not None: _path_params["tenant"] = tenant if step_run is not None: _path_params["step-run"] = step_run # process the query parameters # process the header parameters # process the form parameters # process the body parameter # set the HTTP header `Accept` if "Accept" not in _header_params: _header_params["Accept"] = self.api_client.select_header_accept( ["application/json"] ) # authentication setting _auth_settings: List[str] = ["cookieAuth", "bearerAuth"] return self.api_client.param_serialize( method="GET", resource_path="/api/v1/tenants/{tenant}/step-runs/{step-run}/schema", path_params=_path_params, query_params=_query_params, header_params=_header_params, body=_body_params, post_params=_form_params, files=_files, auth_settings=_auth_settings, collection_formats=_collection_formats, _host=_host, _request_auth=_request_auth, ) @validate_call async def step_run_list_archives( self, step_run: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The step run id" ), ], offset: Annotated[ Optional[StrictInt], Field(description="The number to skip") ] = None, limit: Annotated[ Optional[StrictInt], Field(description="The number to limit by") ] = None, _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> StepRunArchiveList: """List archives for step run List archives for a step run :param step_run: The step run id (required) :type step_run: str :param offset: The number to skip :type offset: int :param limit: The number to limit by :type limit: int :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._step_run_list_archives_serialize( step_run=step_run, offset=offset, limit=limit, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "StepRunArchiveList", "400": "APIErrors", "403": "APIErrors", "404": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ).data @validate_call async def step_run_list_archives_with_http_info( self, step_run: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The step run id" ), ], offset: Annotated[ Optional[StrictInt], Field(description="The number to skip") ] = None, limit: Annotated[ Optional[StrictInt], Field(description="The number to limit by") ] = None, _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> ApiResponse[StepRunArchiveList]: """List archives for step run List archives for a step run :param step_run: The step run id (required) :type step_run: str :param offset: The number to skip :type offset: int :param limit: The number to limit by :type limit: int :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._step_run_list_archives_serialize( step_run=step_run, offset=offset, limit=limit, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "StepRunArchiveList", "400": "APIErrors", "403": "APIErrors", "404": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ) @validate_call async def step_run_list_archives_without_preload_content( self, step_run: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The step run id" ), ], offset: Annotated[ Optional[StrictInt], Field(description="The number to skip") ] = None, limit: Annotated[ Optional[StrictInt], Field(description="The number to limit by") ] = None, _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> RESTResponseType: """List archives for step run List archives for a step run :param step_run: The step run id (required) :type step_run: str :param offset: The number to skip :type offset: int :param limit: The number to limit by :type limit: int :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._step_run_list_archives_serialize( step_run=step_run, offset=offset, limit=limit, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "StepRunArchiveList", "400": "APIErrors", "403": "APIErrors", "404": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) return response_data.response def _step_run_list_archives_serialize( self, step_run, offset, limit, _request_auth, _content_type, _headers, _host_index, ) -> RequestSerialized: _host = None _collection_formats: Dict[str, str] = {} _path_params: Dict[str, str] = {} _query_params: List[Tuple[str, str]] = [] _header_params: Dict[str, Optional[str]] = _headers or {} _form_params: List[Tuple[str, str]] = [] _files: Dict[ str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]] ] = {} _body_params: Optional[bytes] = None # process the path parameters if step_run is not None: _path_params["step-run"] = step_run # process the query parameters if offset is not None: _query_params.append(("offset", offset)) if limit is not None: _query_params.append(("limit", limit)) # process the header parameters # process the form parameters # process the body parameter # set the HTTP header `Accept` if "Accept" not in _header_params: _header_params["Accept"] = self.api_client.select_header_accept( ["application/json"] ) # authentication setting _auth_settings: List[str] = ["cookieAuth", "bearerAuth"] return self.api_client.param_serialize( method="GET", resource_path="/api/v1/step-runs/{step-run}/archives", path_params=_path_params, query_params=_query_params, header_params=_header_params, body=_body_params, post_params=_form_params, files=_files, auth_settings=_auth_settings, collection_formats=_collection_formats, _host=_host, _request_auth=_request_auth, ) @validate_call async def step_run_list_events( self, step_run: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The step run id" ), ], offset: Annotated[ Optional[StrictInt], Field(description="The number to skip") ] = None, limit: Annotated[ Optional[StrictInt], Field(description="The number to limit by") ] = None, _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> StepRunEventList: """List events for step run List events for a step run :param step_run: The step run id (required) :type step_run: str :param offset: The number to skip :type offset: int :param limit: The number to limit by :type limit: int :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._step_run_list_events_serialize( step_run=step_run, offset=offset, limit=limit, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "StepRunEventList", "400": "APIErrors", "403": "APIErrors", "404": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ).data @validate_call async def step_run_list_events_with_http_info( self, step_run: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The step run id" ), ], offset: Annotated[ Optional[StrictInt], Field(description="The number to skip") ] = None, limit: Annotated[ Optional[StrictInt], Field(description="The number to limit by") ] = None, _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> ApiResponse[StepRunEventList]: """List events for step run List events for a step run :param step_run: The step run id (required) :type step_run: str :param offset: The number to skip :type offset: int :param limit: The number to limit by :type limit: int :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._step_run_list_events_serialize( step_run=step_run, offset=offset, limit=limit, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "StepRunEventList", "400": "APIErrors", "403": "APIErrors", "404": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ) @validate_call async def step_run_list_events_without_preload_content( self, step_run: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The step run id" ), ], offset: Annotated[ Optional[StrictInt], Field(description="The number to skip") ] = None, limit: Annotated[ Optional[StrictInt], Field(description="The number to limit by") ] = None, _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> RESTResponseType: """List events for step run List events for a step run :param step_run: The step run id (required) :type step_run: str :param offset: The number to skip :type offset: int :param limit: The number to limit by :type limit: int :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._step_run_list_events_serialize( step_run=step_run, offset=offset, limit=limit, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "StepRunEventList", "400": "APIErrors", "403": "APIErrors", "404": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) return response_data.response def _step_run_list_events_serialize( self, step_run, offset, limit, _request_auth, _content_type, _headers, _host_index, ) -> RequestSerialized: _host = None _collection_formats: Dict[str, str] = {} _path_params: Dict[str, str] = {} _query_params: List[Tuple[str, str]] = [] _header_params: Dict[str, Optional[str]] = _headers or {} _form_params: List[Tuple[str, str]] = [] _files: Dict[ str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]] ] = {} _body_params: Optional[bytes] = None # process the path parameters if step_run is not None: _path_params["step-run"] = step_run # process the query parameters if offset is not None: _query_params.append(("offset", offset)) if limit is not None: _query_params.append(("limit", limit)) # process the header parameters # process the form parameters # process the body parameter # set the HTTP header `Accept` if "Accept" not in _header_params: _header_params["Accept"] = self.api_client.select_header_accept( ["application/json"] ) # authentication setting _auth_settings: List[str] = ["cookieAuth", "bearerAuth"] return self.api_client.param_serialize( method="GET", resource_path="/api/v1/step-runs/{step-run}/events", path_params=_path_params, query_params=_query_params, header_params=_header_params, body=_body_params, post_params=_form_params, files=_files, auth_settings=_auth_settings, collection_formats=_collection_formats, _host=_host, _request_auth=_request_auth, ) @validate_call async def step_run_update_cancel( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], step_run: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The step run id" ), ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> StepRun: """Attempts to cancel a step run Attempts to cancel a step run :param tenant: The tenant id (required) :type tenant: str :param step_run: The step run id (required) :type step_run: str :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._step_run_update_cancel_serialize( tenant=tenant, step_run=step_run, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "StepRun", "400": "APIErrors", "403": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ).data @validate_call async def step_run_update_cancel_with_http_info( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], step_run: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The step run id" ), ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> ApiResponse[StepRun]: """Attempts to cancel a step run Attempts to cancel a step run :param tenant: The tenant id (required) :type tenant: str :param step_run: The step run id (required) :type step_run: str :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._step_run_update_cancel_serialize( tenant=tenant, step_run=step_run, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "StepRun", "400": "APIErrors", "403": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ) @validate_call async def step_run_update_cancel_without_preload_content( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], step_run: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The step run id" ), ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> RESTResponseType: """Attempts to cancel a step run Attempts to cancel a step run :param tenant: The tenant id (required) :type tenant: str :param step_run: The step run id (required) :type step_run: str :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._step_run_update_cancel_serialize( tenant=tenant, step_run=step_run, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "StepRun", "400": "APIErrors", "403": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) return response_data.response def _step_run_update_cancel_serialize( self, tenant, step_run, _request_auth, _content_type, _headers, _host_index, ) -> RequestSerialized: _host = None _collection_formats: Dict[str, str] = {} _path_params: Dict[str, str] = {} _query_params: List[Tuple[str, str]] = [] _header_params: Dict[str, Optional[str]] = _headers or {} _form_params: List[Tuple[str, str]] = [] _files: Dict[ str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]] ] = {} _body_params: Optional[bytes] = None # process the path parameters if tenant is not None: _path_params["tenant"] = tenant if step_run is not None: _path_params["step-run"] = step_run # process the query parameters # process the header parameters # process the form parameters # process the body parameter # set the HTTP header `Accept` if "Accept" not in _header_params: _header_params["Accept"] = self.api_client.select_header_accept( ["application/json"] ) # authentication setting _auth_settings: List[str] = ["cookieAuth", "bearerAuth"] return self.api_client.param_serialize( method="POST", resource_path="/api/v1/tenants/{tenant}/step-runs/{step-run}/cancel", path_params=_path_params, query_params=_query_params, header_params=_header_params, body=_body_params, post_params=_form_params, files=_files, auth_settings=_auth_settings, collection_formats=_collection_formats, _host=_host, _request_auth=_request_auth, ) @validate_call async def step_run_update_rerun( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], step_run: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The step run id" ), ], rerun_step_run_request: Annotated[ RerunStepRunRequest, Field(description="The input to the rerun") ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> StepRun: """Rerun step run Reruns a step run :param tenant: The tenant id (required) :type tenant: str :param step_run: The step run id (required) :type step_run: str :param rerun_step_run_request: The input to the rerun (required) :type rerun_step_run_request: RerunStepRunRequest :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._step_run_update_rerun_serialize( tenant=tenant, step_run=step_run, rerun_step_run_request=rerun_step_run_request, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "StepRun", "400": "APIErrors", "403": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ).data @validate_call async def step_run_update_rerun_with_http_info( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], step_run: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The step run id" ), ], rerun_step_run_request: Annotated[ RerunStepRunRequest, Field(description="The input to the rerun") ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> ApiResponse[StepRun]: """Rerun step run Reruns a step run :param tenant: The tenant id (required) :type tenant: str :param step_run: The step run id (required) :type step_run: str :param rerun_step_run_request: The input to the rerun (required) :type rerun_step_run_request: RerunStepRunRequest :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._step_run_update_rerun_serialize( tenant=tenant, step_run=step_run, rerun_step_run_request=rerun_step_run_request, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "StepRun", "400": "APIErrors", "403": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ) @validate_call async def step_run_update_rerun_without_preload_content( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], step_run: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The step run id" ), ], rerun_step_run_request: Annotated[ RerunStepRunRequest, Field(description="The input to the rerun") ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> RESTResponseType: """Rerun step run Reruns a step run :param tenant: The tenant id (required) :type tenant: str :param step_run: The step run id (required) :type step_run: str :param rerun_step_run_request: The input to the rerun (required) :type rerun_step_run_request: RerunStepRunRequest :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._step_run_update_rerun_serialize( tenant=tenant, step_run=step_run, rerun_step_run_request=rerun_step_run_request, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "StepRun", "400": "APIErrors", "403": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) return response_data.response def _step_run_update_rerun_serialize( self, tenant, step_run, rerun_step_run_request, _request_auth, _content_type, _headers, _host_index, ) -> RequestSerialized: _host = None _collection_formats: Dict[str, str] = {} _path_params: Dict[str, str] = {} _query_params: List[Tuple[str, str]] = [] _header_params: Dict[str, Optional[str]] = _headers or {} _form_params: List[Tuple[str, str]] = [] _files: Dict[ str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]] ] = {} _body_params: Optional[bytes] = None # process the path parameters if tenant is not None: _path_params["tenant"] = tenant if step_run is not None: _path_params["step-run"] = step_run # process the query parameters # process the header parameters # process the form parameters # process the body parameter if rerun_step_run_request is not None: _body_params = rerun_step_run_request # set the HTTP header `Accept` if "Accept" not in _header_params: _header_params["Accept"] = self.api_client.select_header_accept( ["application/json"] ) # set the HTTP header `Content-Type` if _content_type: _header_params["Content-Type"] = _content_type else: _default_content_type = self.api_client.select_header_content_type( ["application/json"] ) if _default_content_type is not None: _header_params["Content-Type"] = _default_content_type # authentication setting _auth_settings: List[str] = ["cookieAuth", "bearerAuth"] return self.api_client.param_serialize( method="POST", resource_path="/api/v1/tenants/{tenant}/step-runs/{step-run}/rerun", path_params=_path_params, query_params=_query_params, header_params=_header_params, body=_body_params, post_params=_form_params, files=_files, auth_settings=_auth_settings, collection_formats=_collection_formats, _host=_host, _request_auth=_request_auth, ) @validate_call async def workflow_run_list_step_run_events( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], workflow_run: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The workflow run id", ), ], last_id: Annotated[ Optional[StrictInt], Field(description="Last ID of the last event") ] = None, _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> StepRunEventList: """List events for all step runs for a workflow run List events for all step runs for a workflow run :param tenant: The tenant id (required) :type tenant: str :param workflow_run: The workflow run id (required) :type workflow_run: str :param last_id: Last ID of the last event :type last_id: int :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._workflow_run_list_step_run_events_serialize( tenant=tenant, workflow_run=workflow_run, last_id=last_id, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "StepRunEventList", "400": "APIErrors", "403": "APIErrors", "404": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ).data @validate_call async def workflow_run_list_step_run_events_with_http_info( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], workflow_run: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The workflow run id", ), ], last_id: Annotated[ Optional[StrictInt], Field(description="Last ID of the last event") ] = None, _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> ApiResponse[StepRunEventList]: """List events for all step runs for a workflow run List events for all step runs for a workflow run :param tenant: The tenant id (required) :type tenant: str :param workflow_run: The workflow run id (required) :type workflow_run: str :param last_id: Last ID of the last event :type last_id: int :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._workflow_run_list_step_run_events_serialize( tenant=tenant, workflow_run=workflow_run, last_id=last_id, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "StepRunEventList", "400": "APIErrors", "403": "APIErrors", "404": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ) @validate_call async def workflow_run_list_step_run_events_without_preload_content( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], workflow_run: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The workflow run id", ), ], last_id: Annotated[ Optional[StrictInt], Field(description="Last ID of the last event") ] = None, _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> RESTResponseType: """List events for all step runs for a workflow run List events for all step runs for a workflow run :param tenant: The tenant id (required) :type tenant: str :param workflow_run: The workflow run id (required) :type workflow_run: str :param last_id: Last ID of the last event :type last_id: int :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._workflow_run_list_step_run_events_serialize( tenant=tenant, workflow_run=workflow_run, last_id=last_id, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "StepRunEventList", "400": "APIErrors", "403": "APIErrors", "404": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) return response_data.response def _workflow_run_list_step_run_events_serialize( self, tenant, workflow_run, last_id, _request_auth, _content_type, _headers, _host_index, ) -> RequestSerialized: _host = None _collection_formats: Dict[str, str] = {} _path_params: Dict[str, str] = {} _query_params: List[Tuple[str, str]] = [] _header_params: Dict[str, Optional[str]] = _headers or {} _form_params: List[Tuple[str, str]] = [] _files: Dict[ str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]] ] = {} _body_params: Optional[bytes] = None # process the path parameters if tenant is not None: _path_params["tenant"] = tenant if workflow_run is not None: _path_params["workflow-run"] = workflow_run # process the query parameters if last_id is not None: _query_params.append(("lastId", last_id)) # process the header parameters # process the form parameters # process the body parameter # set the HTTP header `Accept` if "Accept" not in _header_params: _header_params["Accept"] = self.api_client.select_header_accept( ["application/json"] ) # authentication setting _auth_settings: List[str] = ["cookieAuth", "bearerAuth"] return self.api_client.param_serialize( method="GET", resource_path="/api/v1/tenants/{tenant}/workflow-runs/{workflow-run}/step-run-events", path_params=_path_params, query_params=_query_params, header_params=_header_params, body=_body_params, post_params=_form_params, files=_files, auth_settings=_auth_settings, collection_formats=_collection_formats, _host=_host, _request_auth=_request_auth, )