# coding: utf-8
"""
Hatchet API
The Hatchet API
The version of the OpenAPI document: 1.0.0
Generated by OpenAPI Generator (https://openapi-generator.tech)
Do not edit the class manually.
""" # noqa: E501
import warnings
from typing import Any, Dict, List, Optional, Tuple, Union
from pydantic import Field, StrictFloat, StrictInt, StrictStr, validate_call
from typing_extensions import Annotated
from hatchet_sdk.clients.rest.api_client import ApiClient, RequestSerialized
from hatchet_sdk.clients.rest.api_response import ApiResponse
from hatchet_sdk.clients.rest.models.rerun_step_run_request import RerunStepRunRequest
from hatchet_sdk.clients.rest.models.step_run import StepRun
from hatchet_sdk.clients.rest.models.step_run_archive_list import StepRunArchiveList
from hatchet_sdk.clients.rest.models.step_run_event_list import StepRunEventList
from hatchet_sdk.clients.rest.rest import RESTResponseType
class StepRunApi:
"""NOTE: This class is auto generated by OpenAPI Generator
Ref: https://openapi-generator.tech
Do not edit the class manually.
"""
def __init__(self, api_client=None) -> None:
    """Bind this API facade to an ``ApiClient``.

    :param api_client: client used for every request made through this
                       object; when ``None``, the result of
                       ``ApiClient.get_default()`` is used instead.
    """
    self.api_client = (
        ApiClient.get_default() if api_client is None else api_client
    )
@validate_call
async def step_run_get(
    self,
    tenant: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The tenant id"
        ),
    ],
    step_run: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The step run id"
        ),
    ],
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> StepRun:
    """Get step run

    Get a step run by id and return only the deserialized payload.

    :param tenant: The tenant id, exactly 36 characters (required)
    :type tenant: str
    :param step_run: The step run id, exactly 36 characters (required)
    :type step_run: str
    :param _request_timeout: timeout setting for this request. If one
                             number is provided, it is the total request
                             timeout. It can also be a pair (tuple) of
                             (connection, read) timeouts. Values must be > 0.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: overrides the auth settings for this single
                          request, ignoring the authentication in the spec.
    :type _request_auth: dict, optional
    :param _content_type: forwarded to the request serializer, which does
                          not read it for this GET endpoint.
    :type _content_type: str, optional
    :param _headers: headers to use for this single request; an ``Accept``
                     header is added by the serializer when absent.
    :type _headers: dict, optional
    :param _host_index: host index override; validation only admits 0.
    :type _host_index: int, optional
    :return: the ``data`` attribute of the deserialized response.
    """  # noqa: E501
    # Status code -> model name used by the client when deserializing.
    status_to_model: Dict[str, Optional[str]] = {
        "200": "StepRun",
        "400": "APIErrors",
        "403": "APIErrors",
        "404": "APIErrors",
    }
    request_args = self._step_run_get_serialize(
        tenant=tenant,
        step_run=step_run,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )
    raw_response = await self.api_client.call_api(
        *request_args, _request_timeout=_request_timeout
    )
    # The body must be read before it can be deserialized.
    await raw_response.read()
    deserialized = self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=status_to_model,
    )
    return deserialized.data
@validate_call
async def step_run_get_with_http_info(
    self,
    tenant: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The tenant id"
        ),
    ],
    step_run: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The step run id"
        ),
    ],
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> ApiResponse[StepRun]:
    """Get step run

    Get a step run by id and return the full deserialized response object
    rather than just its payload.

    :param tenant: The tenant id, exactly 36 characters (required)
    :type tenant: str
    :param step_run: The step run id, exactly 36 characters (required)
    :type step_run: str
    :param _request_timeout: timeout setting for this request. If one
                             number is provided, it is the total request
                             timeout. It can also be a pair (tuple) of
                             (connection, read) timeouts. Values must be > 0.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: overrides the auth settings for this single
                          request, ignoring the authentication in the spec.
    :type _request_auth: dict, optional
    :param _content_type: forwarded to the request serializer, which does
                          not read it for this GET endpoint.
    :type _content_type: str, optional
    :param _headers: headers to use for this single request; an ``Accept``
                     header is added by the serializer when absent.
    :type _headers: dict, optional
    :param _host_index: host index override; validation only admits 0.
    :type _host_index: int, optional
    :return: the object produced by ``api_client.response_deserialize``.
    """  # noqa: E501
    # Status code -> model name used by the client when deserializing.
    status_to_model: Dict[str, Optional[str]] = {
        "200": "StepRun",
        "400": "APIErrors",
        "403": "APIErrors",
        "404": "APIErrors",
    }
    request_args = self._step_run_get_serialize(
        tenant=tenant,
        step_run=step_run,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )
    raw_response = await self.api_client.call_api(
        *request_args, _request_timeout=_request_timeout
    )
    # The body must be read before it can be deserialized.
    await raw_response.read()
    api_response = self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=status_to_model,
    )
    return api_response
@validate_call
async def step_run_get_without_preload_content(
    self,
    tenant: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The tenant id"
        ),
    ],
    step_run: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The step run id"
        ),
    ],
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> RESTResponseType:
    """Get step run

    Get a step run by id and hand back the underlying response without
    reading or deserializing its body.

    :param tenant: The tenant id, exactly 36 characters (required)
    :type tenant: str
    :param step_run: The step run id, exactly 36 characters (required)
    :type step_run: str
    :param _request_timeout: timeout setting for this request. If one
                             number is provided, it is the total request
                             timeout. It can also be a pair (tuple) of
                             (connection, read) timeouts. Values must be > 0.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: overrides the auth settings for this single
                          request, ignoring the authentication in the spec.
    :type _request_auth: dict, optional
    :param _content_type: forwarded to the request serializer, which does
                          not read it for this GET endpoint.
    :type _content_type: str, optional
    :param _headers: headers to use for this single request; an ``Accept``
                     header is added by the serializer when absent.
    :type _headers: dict, optional
    :param _host_index: host index override; validation only admits 0.
    :type _host_index: int, optional
    :return: the ``response`` attribute of the object returned by
             ``api_client.call_api``.
    """  # noqa: E501
    request_args = self._step_run_get_serialize(
        tenant=tenant,
        step_run=step_run,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )
    # No status-to-model table is built here: this variant never
    # deserializes, so the mapping the original constructed was dead code.
    raw_response = await self.api_client.call_api(
        *request_args, _request_timeout=_request_timeout
    )
    return raw_response.response
def _step_run_get_serialize(
    self,
    tenant,
    step_run,
    _request_auth,
    _content_type,
    _headers,
    _host_index,
) -> RequestSerialized:
    """Build the serialized GET request for fetching a single step run.

    Fills the path placeholders of
    ``/api/v1/tenants/{tenant}/step-runs/{step-run}`` and delegates to
    ``self.api_client.param_serialize``. The request carries no query
    parameters, form fields, files or body.

    :param tenant: value for the ``{tenant}`` placeholder; skipped if None
    :param step_run: value for the ``{step-run}`` placeholder; skipped if None
    :param _request_auth: forwarded unchanged to ``param_serialize``
    :param _content_type: accepted for signature parity; not read here
    :param _headers: optional per-request headers; copied, never mutated
    :param _host_index: accepted for signature parity; not read here
    :return: whatever ``self.api_client.param_serialize`` returns
    """
    # Work on a copy: the former `_headers or {}` aliased the caller's dict,
    # so the "Accept" default below was written into the caller's argument.
    _header_params: Dict[str, Optional[str]] = dict(_headers) if _headers else {}

    # process the path parameters
    _path_params: Dict[str, str] = {}
    if tenant is not None:
        _path_params["tenant"] = tenant
    if step_run is not None:
        _path_params["step-run"] = step_run

    # set the HTTP header `Accept` unless the caller already chose one
    if "Accept" not in _header_params:
        _header_params["Accept"] = self.api_client.select_header_accept(
            ["application/json"]
        )

    return self.api_client.param_serialize(
        method="GET",
        resource_path="/api/v1/tenants/{tenant}/step-runs/{step-run}",
        path_params=_path_params,
        query_params=[],
        header_params=_header_params,
        body=None,
        post_params=[],
        files={},
        auth_settings=["cookieAuth", "bearerAuth"],
        collection_formats={},
        _host=None,
        _request_auth=_request_auth,
    )
@validate_call
async def step_run_get_schema(
    self,
    tenant: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The tenant id"
        ),
    ],
    step_run: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The step run id"
        ),
    ],
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> object:
    """Get step run schema

    Get the schema for a step run and return only the deserialized payload.

    :param tenant: The tenant id, exactly 36 characters (required)
    :type tenant: str
    :param step_run: The step run id, exactly 36 characters (required)
    :type step_run: str
    :param _request_timeout: timeout setting for this request. If one
                             number is provided, it is the total request
                             timeout. It can also be a pair (tuple) of
                             (connection, read) timeouts. Values must be > 0.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: overrides the auth settings for this single
                          request, ignoring the authentication in the spec.
    :type _request_auth: dict, optional
    :param _content_type: forwarded to the request serializer, which does
                          not read it for this GET endpoint.
    :type _content_type: str, optional
    :param _headers: headers to use for this single request; an ``Accept``
                     header is added by the serializer when absent.
    :type _headers: dict, optional
    :param _host_index: host index override; validation only admits 0.
    :type _host_index: int, optional
    :return: the ``data`` attribute of the deserialized response.
    """  # noqa: E501
    # Status code -> model name used by the client when deserializing.
    status_to_model: Dict[str, Optional[str]] = {
        "200": "object",
        "400": "APIErrors",
        "403": "APIErrors",
        "404": "APIErrors",
    }
    request_args = self._step_run_get_schema_serialize(
        tenant=tenant,
        step_run=step_run,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )
    raw_response = await self.api_client.call_api(
        *request_args, _request_timeout=_request_timeout
    )
    # The body must be read before it can be deserialized.
    await raw_response.read()
    deserialized = self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=status_to_model,
    )
    return deserialized.data
@validate_call
async def step_run_get_schema_with_http_info(
    self,
    tenant: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The tenant id"
        ),
    ],
    step_run: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The step run id"
        ),
    ],
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> ApiResponse[object]:
    """Get step run schema

    Get the schema for a step run and return the full deserialized response
    object rather than just its payload.

    :param tenant: The tenant id, exactly 36 characters (required)
    :type tenant: str
    :param step_run: The step run id, exactly 36 characters (required)
    :type step_run: str
    :param _request_timeout: timeout setting for this request. If one
                             number is provided, it is the total request
                             timeout. It can also be a pair (tuple) of
                             (connection, read) timeouts. Values must be > 0.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: overrides the auth settings for this single
                          request, ignoring the authentication in the spec.
    :type _request_auth: dict, optional
    :param _content_type: forwarded to the request serializer, which does
                          not read it for this GET endpoint.
    :type _content_type: str, optional
    :param _headers: headers to use for this single request; an ``Accept``
                     header is added by the serializer when absent.
    :type _headers: dict, optional
    :param _host_index: host index override; validation only admits 0.
    :type _host_index: int, optional
    :return: the object produced by ``api_client.response_deserialize``.
    """  # noqa: E501
    # Status code -> model name used by the client when deserializing.
    status_to_model: Dict[str, Optional[str]] = {
        "200": "object",
        "400": "APIErrors",
        "403": "APIErrors",
        "404": "APIErrors",
    }
    request_args = self._step_run_get_schema_serialize(
        tenant=tenant,
        step_run=step_run,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )
    raw_response = await self.api_client.call_api(
        *request_args, _request_timeout=_request_timeout
    )
    # The body must be read before it can be deserialized.
    await raw_response.read()
    api_response = self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=status_to_model,
    )
    return api_response
@validate_call
async def step_run_get_schema_without_preload_content(
    self,
    tenant: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The tenant id"
        ),
    ],
    step_run: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The step run id"
        ),
    ],
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> RESTResponseType:
    """Get step run schema

    Get the schema for a step run and hand back the underlying response
    without reading or deserializing its body.

    :param tenant: The tenant id, exactly 36 characters (required)
    :type tenant: str
    :param step_run: The step run id, exactly 36 characters (required)
    :type step_run: str
    :param _request_timeout: timeout setting for this request. If one
                             number is provided, it is the total request
                             timeout. It can also be a pair (tuple) of
                             (connection, read) timeouts. Values must be > 0.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: overrides the auth settings for this single
                          request, ignoring the authentication in the spec.
    :type _request_auth: dict, optional
    :param _content_type: forwarded to the request serializer, which does
                          not read it for this GET endpoint.
    :type _content_type: str, optional
    :param _headers: headers to use for this single request; an ``Accept``
                     header is added by the serializer when absent.
    :type _headers: dict, optional
    :param _host_index: host index override; validation only admits 0.
    :type _host_index: int, optional
    :return: the ``response`` attribute of the object returned by
             ``api_client.call_api``.
    """  # noqa: E501
    request_args = self._step_run_get_schema_serialize(
        tenant=tenant,
        step_run=step_run,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )
    # No status-to-model table is built here: this variant never
    # deserializes, so the mapping the original constructed was dead code.
    raw_response = await self.api_client.call_api(
        *request_args, _request_timeout=_request_timeout
    )
    return raw_response.response
def _step_run_get_schema_serialize(
    self,
    tenant,
    step_run,
    _request_auth,
    _content_type,
    _headers,
    _host_index,
) -> RequestSerialized:
    """Build the serialized GET request for fetching a step run's schema.

    Fills the path placeholders of
    ``/api/v1/tenants/{tenant}/step-runs/{step-run}/schema`` and delegates
    to ``self.api_client.param_serialize``. The request carries no query
    parameters, form fields, files or body.

    :param tenant: value for the ``{tenant}`` placeholder; skipped if None
    :param step_run: value for the ``{step-run}`` placeholder; skipped if None
    :param _request_auth: forwarded unchanged to ``param_serialize``
    :param _content_type: accepted for signature parity; not read here
    :param _headers: optional per-request headers; copied, never mutated
    :param _host_index: accepted for signature parity; not read here
    :return: whatever ``self.api_client.param_serialize`` returns
    """
    # Work on a copy: the former `_headers or {}` aliased the caller's dict,
    # so the "Accept" default below was written into the caller's argument.
    _header_params: Dict[str, Optional[str]] = dict(_headers) if _headers else {}

    # process the path parameters
    _path_params: Dict[str, str] = {}
    if tenant is not None:
        _path_params["tenant"] = tenant
    if step_run is not None:
        _path_params["step-run"] = step_run

    # set the HTTP header `Accept` unless the caller already chose one
    if "Accept" not in _header_params:
        _header_params["Accept"] = self.api_client.select_header_accept(
            ["application/json"]
        )

    return self.api_client.param_serialize(
        method="GET",
        resource_path="/api/v1/tenants/{tenant}/step-runs/{step-run}/schema",
        path_params=_path_params,
        query_params=[],
        header_params=_header_params,
        body=None,
        post_params=[],
        files={},
        auth_settings=["cookieAuth", "bearerAuth"],
        collection_formats={},
        _host=None,
        _request_auth=_request_auth,
    )
@validate_call
async def step_run_list_archives(
    self,
    step_run: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The step run id"
        ),
    ],
    offset: Annotated[
        Optional[StrictInt], Field(description="The number to skip")
    ] = None,
    limit: Annotated[
        Optional[StrictInt], Field(description="The number to limit by")
    ] = None,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> StepRunArchiveList:
    """List archives for step run

    List archives for a step run and return only the deserialized payload.

    :param step_run: The step run id, exactly 36 characters (required)
    :type step_run: str
    :param offset: The number to skip; omitted from the query when None
    :type offset: int
    :param limit: The number to limit by; omitted from the query when None
    :type limit: int
    :param _request_timeout: timeout setting for this request. If one
                             number is provided, it is the total request
                             timeout. It can also be a pair (tuple) of
                             (connection, read) timeouts. Values must be > 0.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: overrides the auth settings for this single
                          request, ignoring the authentication in the spec.
    :type _request_auth: dict, optional
    :param _content_type: forwarded to the request serializer, which does
                          not read it for this GET endpoint.
    :type _content_type: str, optional
    :param _headers: headers to use for this single request; an ``Accept``
                     header is added by the serializer when absent.
    :type _headers: dict, optional
    :param _host_index: host index override; validation only admits 0.
    :type _host_index: int, optional
    :return: the ``data`` attribute of the deserialized response.
    """  # noqa: E501
    # Status code -> model name used by the client when deserializing.
    status_to_model: Dict[str, Optional[str]] = {
        "200": "StepRunArchiveList",
        "400": "APIErrors",
        "403": "APIErrors",
        "404": "APIErrors",
    }
    request_args = self._step_run_list_archives_serialize(
        step_run=step_run,
        offset=offset,
        limit=limit,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )
    raw_response = await self.api_client.call_api(
        *request_args, _request_timeout=_request_timeout
    )
    # The body must be read before it can be deserialized.
    await raw_response.read()
    deserialized = self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=status_to_model,
    )
    return deserialized.data
@validate_call
async def step_run_list_archives_with_http_info(
    self,
    step_run: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The step run id"
        ),
    ],
    offset: Annotated[
        Optional[StrictInt], Field(description="The number to skip")
    ] = None,
    limit: Annotated[
        Optional[StrictInt], Field(description="The number to limit by")
    ] = None,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> ApiResponse[StepRunArchiveList]:
    """List archives for step run

    List archives for a step run and return the full deserialized response
    object rather than just its payload.

    :param step_run: The step run id, exactly 36 characters (required)
    :type step_run: str
    :param offset: The number to skip; omitted from the query when None
    :type offset: int
    :param limit: The number to limit by; omitted from the query when None
    :type limit: int
    :param _request_timeout: timeout setting for this request. If one
                             number is provided, it is the total request
                             timeout. It can also be a pair (tuple) of
                             (connection, read) timeouts. Values must be > 0.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: overrides the auth settings for this single
                          request, ignoring the authentication in the spec.
    :type _request_auth: dict, optional
    :param _content_type: forwarded to the request serializer, which does
                          not read it for this GET endpoint.
    :type _content_type: str, optional
    :param _headers: headers to use for this single request; an ``Accept``
                     header is added by the serializer when absent.
    :type _headers: dict, optional
    :param _host_index: host index override; validation only admits 0.
    :type _host_index: int, optional
    :return: the object produced by ``api_client.response_deserialize``.
    """  # noqa: E501
    # Status code -> model name used by the client when deserializing.
    status_to_model: Dict[str, Optional[str]] = {
        "200": "StepRunArchiveList",
        "400": "APIErrors",
        "403": "APIErrors",
        "404": "APIErrors",
    }
    request_args = self._step_run_list_archives_serialize(
        step_run=step_run,
        offset=offset,
        limit=limit,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )
    raw_response = await self.api_client.call_api(
        *request_args, _request_timeout=_request_timeout
    )
    # The body must be read before it can be deserialized.
    await raw_response.read()
    api_response = self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=status_to_model,
    )
    return api_response
@validate_call
async def step_run_list_archives_without_preload_content(
    self,
    step_run: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The step run id"
        ),
    ],
    offset: Annotated[
        Optional[StrictInt], Field(description="The number to skip")
    ] = None,
    limit: Annotated[
        Optional[StrictInt], Field(description="The number to limit by")
    ] = None,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> RESTResponseType:
    """List archives for step run

    List archives for a step run and hand back the underlying response
    without reading or deserializing its body.

    :param step_run: The step run id, exactly 36 characters (required)
    :type step_run: str
    :param offset: The number to skip; omitted from the query when None
    :type offset: int
    :param limit: The number to limit by; omitted from the query when None
    :type limit: int
    :param _request_timeout: timeout setting for this request. If one
                             number is provided, it is the total request
                             timeout. It can also be a pair (tuple) of
                             (connection, read) timeouts. Values must be > 0.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: overrides the auth settings for this single
                          request, ignoring the authentication in the spec.
    :type _request_auth: dict, optional
    :param _content_type: forwarded to the request serializer, which does
                          not read it for this GET endpoint.
    :type _content_type: str, optional
    :param _headers: headers to use for this single request; an ``Accept``
                     header is added by the serializer when absent.
    :type _headers: dict, optional
    :param _host_index: host index override; validation only admits 0.
    :type _host_index: int, optional
    :return: the ``response`` attribute of the object returned by
             ``api_client.call_api``.
    """  # noqa: E501
    request_args = self._step_run_list_archives_serialize(
        step_run=step_run,
        offset=offset,
        limit=limit,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )
    # No status-to-model table is built here: this variant never
    # deserializes, so the mapping the original constructed was dead code.
    raw_response = await self.api_client.call_api(
        *request_args, _request_timeout=_request_timeout
    )
    return raw_response.response
def _step_run_list_archives_serialize(
    self,
    step_run,
    offset,
    limit,
    _request_auth,
    _content_type,
    _headers,
    _host_index,
) -> RequestSerialized:
    """Build the serialized GET request for listing a step run's archives.

    Fills the path placeholder of ``/api/v1/step-runs/{step-run}/archives``,
    adds the optional pagination query parameters and delegates to
    ``self.api_client.param_serialize``. The request carries no form
    fields, files or body.

    :param step_run: value for the ``{step-run}`` placeholder; skipped if None
    :param offset: sent as the ``offset`` query parameter unless None
    :param limit: sent as the ``limit`` query parameter unless None
    :param _request_auth: forwarded unchanged to ``param_serialize``
    :param _content_type: accepted for signature parity; not read here
    :param _headers: optional per-request headers; copied, never mutated
    :param _host_index: accepted for signature parity; not read here
    :return: whatever ``self.api_client.param_serialize`` returns
    """
    # Work on a copy: the former `_headers or {}` aliased the caller's dict,
    # so the "Accept" default below was written into the caller's argument.
    _header_params: Dict[str, Optional[str]] = dict(_headers) if _headers else {}

    # process the path parameters
    _path_params: Dict[str, str] = {}
    if step_run is not None:
        _path_params["step-run"] = step_run

    # process the query parameters; values are passed through as given
    # (the public methods validate them as ints), hence `Any` rather than str
    _query_params: List[Tuple[str, Any]] = [
        (key, value)
        for key, value in (("offset", offset), ("limit", limit))
        if value is not None
    ]

    # set the HTTP header `Accept` unless the caller already chose one
    if "Accept" not in _header_params:
        _header_params["Accept"] = self.api_client.select_header_accept(
            ["application/json"]
        )

    return self.api_client.param_serialize(
        method="GET",
        resource_path="/api/v1/step-runs/{step-run}/archives",
        path_params=_path_params,
        query_params=_query_params,
        header_params=_header_params,
        body=None,
        post_params=[],
        files={},
        auth_settings=["cookieAuth", "bearerAuth"],
        collection_formats={},
        _host=None,
        _request_auth=_request_auth,
    )
@validate_call
async def step_run_list_events(
    self,
    step_run: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The step run id"
        ),
    ],
    offset: Annotated[
        Optional[StrictInt], Field(description="The number to skip")
    ] = None,
    limit: Annotated[
        Optional[StrictInt], Field(description="The number to limit by")
    ] = None,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> StepRunEventList:
    """List events for step run

    List events for a step run and return only the deserialized payload.

    :param step_run: The step run id, exactly 36 characters (required)
    :type step_run: str
    :param offset: The number to skip
    :type offset: int
    :param limit: The number to limit by
    :type limit: int
    :param _request_timeout: timeout setting for this request. If one
                             number is provided, it is the total request
                             timeout. It can also be a pair (tuple) of
                             (connection, read) timeouts. Values must be > 0.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: overrides the auth settings for this single
                          request, ignoring the authentication in the spec.
    :type _request_auth: dict, optional
    :param _content_type: forwarded to the request serializer.
    :type _content_type: str, optional
    :param _headers: headers for this single request, forwarded to the
                     request serializer.
    :type _headers: dict, optional
    :param _host_index: host index override; validation only admits 0.
    :type _host_index: int, optional
    :return: the ``data`` attribute of the deserialized response.
    """  # noqa: E501
    # Status code -> model name used by the client when deserializing.
    status_to_model: Dict[str, Optional[str]] = {
        "200": "StepRunEventList",
        "400": "APIErrors",
        "403": "APIErrors",
        "404": "APIErrors",
    }
    request_args = self._step_run_list_events_serialize(
        step_run=step_run,
        offset=offset,
        limit=limit,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )
    raw_response = await self.api_client.call_api(
        *request_args, _request_timeout=_request_timeout
    )
    # The body must be read before it can be deserialized.
    await raw_response.read()
    deserialized = self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=status_to_model,
    )
    return deserialized.data
@validate_call
async def step_run_list_events_with_http_info(
    self,
    step_run: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The step run id"
        ),
    ],
    offset: Annotated[
        Optional[StrictInt], Field(description="The number to skip")
    ] = None,
    limit: Annotated[
        Optional[StrictInt], Field(description="The number to limit by")
    ] = None,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> ApiResponse[StepRunEventList]:
    """List events for step run

    List events for a step run and return the full deserialized response
    object rather than just its payload.

    :param step_run: The step run id, exactly 36 characters (required)
    :type step_run: str
    :param offset: The number to skip
    :type offset: int
    :param limit: The number to limit by
    :type limit: int
    :param _request_timeout: timeout setting for this request. If one
                             number is provided, it is the total request
                             timeout. It can also be a pair (tuple) of
                             (connection, read) timeouts. Values must be > 0.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: overrides the auth settings for this single
                          request, ignoring the authentication in the spec.
    :type _request_auth: dict, optional
    :param _content_type: forwarded to the request serializer.
    :type _content_type: str, optional
    :param _headers: headers for this single request, forwarded to the
                     request serializer.
    :type _headers: dict, optional
    :param _host_index: host index override; validation only admits 0.
    :type _host_index: int, optional
    :return: the object produced by ``api_client.response_deserialize``.
    """  # noqa: E501
    # Status code -> model name used by the client when deserializing.
    status_to_model: Dict[str, Optional[str]] = {
        "200": "StepRunEventList",
        "400": "APIErrors",
        "403": "APIErrors",
        "404": "APIErrors",
    }
    request_args = self._step_run_list_events_serialize(
        step_run=step_run,
        offset=offset,
        limit=limit,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )
    raw_response = await self.api_client.call_api(
        *request_args, _request_timeout=_request_timeout
    )
    # The body must be read before it can be deserialized.
    await raw_response.read()
    api_response = self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=status_to_model,
    )
    return api_response
@validate_call
async def step_run_list_events_without_preload_content(
    self,
    step_run: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The step run id"
        ),
    ],
    offset: Annotated[
        Optional[StrictInt], Field(description="The number to skip")
    ] = None,
    limit: Annotated[
        Optional[StrictInt], Field(description="The number to limit by")
    ] = None,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> RESTResponseType:
    """List events for step run

    List events for a step run. This variant returns the ``response``
    attribute of the object produced by ``api_client.call_api`` as-is: the
    body is neither read nor deserialized here.

    :param step_run: The step run id (required); exactly 36 characters.
    :type step_run: str
    :param offset: The number to skip
    :type offset: int
    :param limit: The number to limit by
    :type limit: int
    :param _request_timeout: timeout setting for this request. If one
                             number provided, it will be total request
                             timeout. It can also be a pair (tuple) of
                             (connection, read) timeouts. Values must be
                             greater than 0.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: set to override the auth_settings for a single
                          request; forwarded to the serializer.
    :type _request_auth: dict, optional
    :param _content_type: content-type override; forwarded to the serializer.
    :type _content_type: str, optional
    :param _headers: headers for this single request; forwarded to the
                     serializer.
    :type _headers: dict, optional
    :param _host_index: host index for this single request; validation only
                        admits 0.
    :type _host_index: int, optional
    :return: The ``response`` attribute of the object returned by
             ``api_client.call_api``.
    """  # noqa: E501
    _param = self._step_run_list_events_serialize(
        step_run=step_run,
        offset=offset,
        limit=limit,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )
    # No status-code -> model map is built here: nothing is deserialized in
    # this variant, so such a map would never be read.
    response_data = await self.api_client.call_api(
        *_param, _request_timeout=_request_timeout
    )
    return response_data.response
def _step_run_list_events_serialize(
    self,
    step_run,
    offset,
    limit,
    _request_auth,
    _content_type,
    _headers,
    _host_index,
) -> RequestSerialized:
    """Build the serialized GET request for listing a step run's events.

    :param step_run: value for the ``step-run`` path parameter; skipped when None.
    :param offset: value for the ``offset`` query parameter; skipped when None.
    :param limit: value for the ``limit`` query parameter; skipped when None.
    :param _request_auth: passed through to ``api_client.param_serialize``.
    :param _content_type: accepted for signature parity; not used by this
                          serializer.
    :param _headers: optional initial headers. The mapping is copied, so the
                     caller's object is not modified by this method.
    :param _host_index: accepted for signature parity; not used by this
                        serializer.
    :return: whatever ``api_client.param_serialize`` returns for the request.
    """
    _host = None
    _collection_formats: Dict[str, str] = {}
    _path_params: Dict[str, str] = {}
    # Values are appended as received (offset/limit are ints for validated callers).
    _query_params: List[Tuple[str, Any]] = []
    # Copy instead of aliasing: the Accept header is written into this dict
    # below, and that write must not leak into the caller's `_headers`.
    _header_params: Dict[str, Optional[str]] = dict(_headers) if _headers else {}
    _form_params: List[Tuple[str, str]] = []
    _files: Dict[
        str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]]
    ] = {}
    _body_params: Optional[bytes] = None

    # process the path parameters
    if step_run is not None:
        _path_params["step-run"] = step_run

    # process the query parameters
    if offset is not None:
        _query_params.append(("offset", offset))
    if limit is not None:
        _query_params.append(("limit", limit))

    # set the HTTP header `Accept` unless the caller already supplied one
    if "Accept" not in _header_params:
        _header_params["Accept"] = self.api_client.select_header_accept(
            ["application/json"]
        )

    # authentication setting
    _auth_settings: List[str] = ["cookieAuth", "bearerAuth"]

    return self.api_client.param_serialize(
        method="GET",
        resource_path="/api/v1/step-runs/{step-run}/events",
        path_params=_path_params,
        query_params=_query_params,
        header_params=_header_params,
        body=_body_params,
        post_params=_form_params,
        files=_files,
        auth_settings=_auth_settings,
        collection_formats=_collection_formats,
        _host=_host,
        _request_auth=_request_auth,
    )
@validate_call
async def step_run_update_cancel(
    self,
    tenant: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The tenant id"
        ),
    ],
    step_run: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The step run id"
        ),
    ],
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> StepRun:
    """Attempts to cancel a step run

    Serializes the call with ``_step_run_update_cancel_serialize``, awaits
    the request, reads the body and returns only the ``data`` attribute of
    the deserialized response.

    :param tenant: The tenant id (required); exactly 36 characters.
    :type tenant: str
    :param step_run: The step run id (required); exactly 36 characters.
    :type step_run: str
    :param _request_timeout: either one number greater than 0 (total request
                             timeout) or a (connection, read) pair of such
                             numbers.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: auth settings override for this single request;
                          forwarded to the serializer.
    :type _request_auth: dict, optional
    :param _content_type: content-type override; forwarded to the serializer.
    :type _content_type: str, optional
    :param _headers: headers for this single request; forwarded to the
                     serializer.
    :type _headers: dict, optional
    :param _host_index: host index for this single request; validation only
                        admits 0.
    :type _host_index: int, optional
    :return: The ``data`` attribute of the deserialized response.
    """  # noqa: E501
    request_args = self._step_run_update_cancel_serialize(
        tenant=tenant,
        step_run=step_run,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    raw_response = await self.api_client.call_api(
        *request_args, _request_timeout=_request_timeout
    )
    await raw_response.read()

    # Model name to deserialize into, keyed by HTTP status code.
    status_to_model: Dict[str, Optional[str]] = {
        "200": "StepRun",
        "400": "APIErrors",
        "403": "APIErrors",
    }
    api_response = self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=status_to_model,
    )
    return api_response.data
@validate_call
async def step_run_update_cancel_with_http_info(
    self,
    tenant: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The tenant id"
        ),
    ],
    step_run: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The step run id"
        ),
    ],
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> ApiResponse[StepRun]:
    """Attempts to cancel a step run

    Same request as ``step_run_update_cancel`` builds via
    ``_step_run_update_cancel_serialize``, but the complete object returned
    by ``api_client.response_deserialize`` is handed back instead of only
    its ``data`` attribute.

    :param tenant: The tenant id (required); exactly 36 characters.
    :type tenant: str
    :param step_run: The step run id (required); exactly 36 characters.
    :type step_run: str
    :param _request_timeout: either one number greater than 0 (total request
                             timeout) or a (connection, read) pair of such
                             numbers.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: auth settings override for this single request;
                          forwarded to the serializer.
    :type _request_auth: dict, optional
    :param _content_type: content-type override; forwarded to the serializer.
    :type _content_type: str, optional
    :param _headers: headers for this single request; forwarded to the
                     serializer.
    :type _headers: dict, optional
    :param _host_index: host index for this single request; validation only
                        admits 0.
    :type _host_index: int, optional
    :return: The object produced by ``api_client.response_deserialize``.
    """  # noqa: E501
    request_args = self._step_run_update_cancel_serialize(
        tenant=tenant,
        step_run=step_run,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    raw_response = await self.api_client.call_api(
        *request_args, _request_timeout=_request_timeout
    )
    await raw_response.read()

    # Model name to deserialize into, keyed by HTTP status code.
    status_to_model: Dict[str, Optional[str]] = {
        "200": "StepRun",
        "400": "APIErrors",
        "403": "APIErrors",
    }
    api_response = self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=status_to_model,
    )
    return api_response
@validate_call
async def step_run_update_cancel_without_preload_content(
    self,
    tenant: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The tenant id"
        ),
    ],
    step_run: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The step run id"
        ),
    ],
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> RESTResponseType:
    """Attempts to cancel a step run

    Attempts to cancel a step run. This variant returns the ``response``
    attribute of the object produced by ``api_client.call_api`` as-is: the
    body is neither read nor deserialized here.

    :param tenant: The tenant id (required); exactly 36 characters.
    :type tenant: str
    :param step_run: The step run id (required); exactly 36 characters.
    :type step_run: str
    :param _request_timeout: timeout setting for this request. If one
                             number provided, it will be total request
                             timeout. It can also be a pair (tuple) of
                             (connection, read) timeouts. Values must be
                             greater than 0.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: set to override the auth_settings for a single
                          request; forwarded to the serializer.
    :type _request_auth: dict, optional
    :param _content_type: content-type override; forwarded to the serializer.
    :type _content_type: str, optional
    :param _headers: headers for this single request; forwarded to the
                     serializer.
    :type _headers: dict, optional
    :param _host_index: host index for this single request; validation only
                        admits 0.
    :type _host_index: int, optional
    :return: The ``response`` attribute of the object returned by
             ``api_client.call_api``.
    """  # noqa: E501
    _param = self._step_run_update_cancel_serialize(
        tenant=tenant,
        step_run=step_run,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )
    # No status-code -> model map is built here: nothing is deserialized in
    # this variant, so such a map would never be read.
    response_data = await self.api_client.call_api(
        *_param, _request_timeout=_request_timeout
    )
    return response_data.response
def _step_run_update_cancel_serialize(
    self,
    tenant,
    step_run,
    _request_auth,
    _content_type,
    _headers,
    _host_index,
) -> RequestSerialized:
    """Build the serialized POST request for cancelling a step run.

    :param tenant: value for the ``tenant`` path parameter; skipped when None.
    :param step_run: value for the ``step-run`` path parameter; skipped when None.
    :param _request_auth: passed through to ``api_client.param_serialize``.
    :param _content_type: accepted for signature parity; not used by this
                          serializer.
    :param _headers: optional initial headers. The mapping is copied, so the
                     caller's object is not modified by this method.
    :param _host_index: accepted for signature parity; not used by this
                        serializer.
    :return: whatever ``api_client.param_serialize`` returns for the request.
    """
    _host = None
    _collection_formats: Dict[str, str] = {}
    _path_params: Dict[str, str] = {}
    _query_params: List[Tuple[str, str]] = []
    # Copy instead of aliasing: the Accept header is written into this dict
    # below, and that write must not leak into the caller's `_headers`.
    _header_params: Dict[str, Optional[str]] = dict(_headers) if _headers else {}
    _form_params: List[Tuple[str, str]] = []
    _files: Dict[
        str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]]
    ] = {}
    _body_params: Optional[bytes] = None

    # process the path parameters
    if tenant is not None:
        _path_params["tenant"] = tenant
    if step_run is not None:
        _path_params["step-run"] = step_run

    # set the HTTP header `Accept` unless the caller already supplied one
    if "Accept" not in _header_params:
        _header_params["Accept"] = self.api_client.select_header_accept(
            ["application/json"]
        )

    # authentication setting
    _auth_settings: List[str] = ["cookieAuth", "bearerAuth"]

    return self.api_client.param_serialize(
        method="POST",
        resource_path="/api/v1/tenants/{tenant}/step-runs/{step-run}/cancel",
        path_params=_path_params,
        query_params=_query_params,
        header_params=_header_params,
        body=_body_params,
        post_params=_form_params,
        files=_files,
        auth_settings=_auth_settings,
        collection_formats=_collection_formats,
        _host=_host,
        _request_auth=_request_auth,
    )
@validate_call
async def step_run_update_rerun(
    self,
    tenant: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The tenant id"
        ),
    ],
    step_run: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The step run id"
        ),
    ],
    rerun_step_run_request: Annotated[
        RerunStepRunRequest, Field(description="The input to the rerun")
    ],
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> StepRun:
    """Rerun step run

    Serializes the call with ``_step_run_update_rerun_serialize``, awaits
    the request, reads the body and returns only the ``data`` attribute of
    the deserialized response.

    :param tenant: The tenant id (required); exactly 36 characters.
    :type tenant: str
    :param step_run: The step run id (required); exactly 36 characters.
    :type step_run: str
    :param rerun_step_run_request: The input to the rerun (required)
    :type rerun_step_run_request: RerunStepRunRequest
    :param _request_timeout: either one number greater than 0 (total request
                             timeout) or a (connection, read) pair of such
                             numbers.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: auth settings override for this single request;
                          forwarded to the serializer.
    :type _request_auth: dict, optional
    :param _content_type: content-type override; forwarded to the serializer.
    :type _content_type: str, optional
    :param _headers: headers for this single request; forwarded to the
                     serializer.
    :type _headers: dict, optional
    :param _host_index: host index for this single request; validation only
                        admits 0.
    :type _host_index: int, optional
    :return: The ``data`` attribute of the deserialized response.
    """  # noqa: E501
    request_args = self._step_run_update_rerun_serialize(
        tenant=tenant,
        step_run=step_run,
        rerun_step_run_request=rerun_step_run_request,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    raw_response = await self.api_client.call_api(
        *request_args, _request_timeout=_request_timeout
    )
    await raw_response.read()

    # Model name to deserialize into, keyed by HTTP status code.
    status_to_model: Dict[str, Optional[str]] = {
        "200": "StepRun",
        "400": "APIErrors",
        "403": "APIErrors",
    }
    api_response = self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=status_to_model,
    )
    return api_response.data
@validate_call
async def step_run_update_rerun_with_http_info(
    self,
    tenant: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The tenant id"
        ),
    ],
    step_run: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The step run id"
        ),
    ],
    rerun_step_run_request: Annotated[
        RerunStepRunRequest, Field(description="The input to the rerun")
    ],
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> ApiResponse[StepRun]:
    """Rerun step run

    Same request as ``step_run_update_rerun`` builds via
    ``_step_run_update_rerun_serialize``, but the complete object returned
    by ``api_client.response_deserialize`` is handed back instead of only
    its ``data`` attribute.

    :param tenant: The tenant id (required); exactly 36 characters.
    :type tenant: str
    :param step_run: The step run id (required); exactly 36 characters.
    :type step_run: str
    :param rerun_step_run_request: The input to the rerun (required)
    :type rerun_step_run_request: RerunStepRunRequest
    :param _request_timeout: either one number greater than 0 (total request
                             timeout) or a (connection, read) pair of such
                             numbers.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: auth settings override for this single request;
                          forwarded to the serializer.
    :type _request_auth: dict, optional
    :param _content_type: content-type override; forwarded to the serializer.
    :type _content_type: str, optional
    :param _headers: headers for this single request; forwarded to the
                     serializer.
    :type _headers: dict, optional
    :param _host_index: host index for this single request; validation only
                        admits 0.
    :type _host_index: int, optional
    :return: The object produced by ``api_client.response_deserialize``.
    """  # noqa: E501
    request_args = self._step_run_update_rerun_serialize(
        tenant=tenant,
        step_run=step_run,
        rerun_step_run_request=rerun_step_run_request,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    raw_response = await self.api_client.call_api(
        *request_args, _request_timeout=_request_timeout
    )
    await raw_response.read()

    # Model name to deserialize into, keyed by HTTP status code.
    status_to_model: Dict[str, Optional[str]] = {
        "200": "StepRun",
        "400": "APIErrors",
        "403": "APIErrors",
    }
    api_response = self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=status_to_model,
    )
    return api_response
@validate_call
async def step_run_update_rerun_without_preload_content(
    self,
    tenant: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The tenant id"
        ),
    ],
    step_run: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The step run id"
        ),
    ],
    rerun_step_run_request: Annotated[
        RerunStepRunRequest, Field(description="The input to the rerun")
    ],
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> RESTResponseType:
    """Rerun step run

    Reruns a step run. This variant returns the ``response`` attribute of
    the object produced by ``api_client.call_api`` as-is: the body is
    neither read nor deserialized here.

    :param tenant: The tenant id (required); exactly 36 characters.
    :type tenant: str
    :param step_run: The step run id (required); exactly 36 characters.
    :type step_run: str
    :param rerun_step_run_request: The input to the rerun (required)
    :type rerun_step_run_request: RerunStepRunRequest
    :param _request_timeout: timeout setting for this request. If one
                             number provided, it will be total request
                             timeout. It can also be a pair (tuple) of
                             (connection, read) timeouts. Values must be
                             greater than 0.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: set to override the auth_settings for a single
                          request; forwarded to the serializer.
    :type _request_auth: dict, optional
    :param _content_type: content-type override; forwarded to the serializer.
    :type _content_type: str, optional
    :param _headers: headers for this single request; forwarded to the
                     serializer.
    :type _headers: dict, optional
    :param _host_index: host index for this single request; validation only
                        admits 0.
    :type _host_index: int, optional
    :return: The ``response`` attribute of the object returned by
             ``api_client.call_api``.
    """  # noqa: E501
    _param = self._step_run_update_rerun_serialize(
        tenant=tenant,
        step_run=step_run,
        rerun_step_run_request=rerun_step_run_request,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )
    # No status-code -> model map is built here: nothing is deserialized in
    # this variant, so such a map would never be read.
    response_data = await self.api_client.call_api(
        *_param, _request_timeout=_request_timeout
    )
    return response_data.response
def _step_run_update_rerun_serialize(
    self,
    tenant,
    step_run,
    rerun_step_run_request,
    _request_auth,
    _content_type,
    _headers,
    _host_index,
) -> RequestSerialized:
    """Build the serialized POST request for rerunning a step run.

    :param tenant: value for the ``tenant`` path parameter; skipped when None.
    :param step_run: value for the ``step-run`` path parameter; skipped when None.
    :param rerun_step_run_request: request body, handed to
                                   ``api_client.param_serialize`` unchanged.
    :param _request_auth: passed through to ``api_client.param_serialize``.
    :param _content_type: when truthy, used as the ``Content-Type`` header;
                          otherwise the header comes from
                          ``api_client.select_header_content_type`` (and is
                          left unset if that returns None).
    :param _headers: optional initial headers. The mapping is copied, so the
                     caller's object is not modified by this method.
    :param _host_index: accepted for signature parity; not used by this
                        serializer.
    :return: whatever ``api_client.param_serialize`` returns for the request.
    """
    _host = None
    _collection_formats: Dict[str, str] = {}
    _path_params: Dict[str, str] = {}
    _query_params: List[Tuple[str, str]] = []
    # Copy instead of aliasing: Accept / Content-Type are written into this
    # dict below, and those writes must not leak into the caller's `_headers`.
    _header_params: Dict[str, Optional[str]] = dict(_headers) if _headers else {}
    _form_params: List[Tuple[str, str]] = []
    _files: Dict[
        str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]]
    ] = {}
    # Holds the request model itself (not bytes) once assigned below.
    _body_params: Any = None

    # process the path parameters
    if tenant is not None:
        _path_params["tenant"] = tenant
    if step_run is not None:
        _path_params["step-run"] = step_run

    # process the body parameter
    if rerun_step_run_request is not None:
        _body_params = rerun_step_run_request

    # set the HTTP header `Accept` unless the caller already supplied one
    if "Accept" not in _header_params:
        _header_params["Accept"] = self.api_client.select_header_accept(
            ["application/json"]
        )

    # set the HTTP header `Content-Type`; an explicit `_content_type` wins
    # over any Content-Type already present in the supplied headers
    if _content_type:
        _header_params["Content-Type"] = _content_type
    else:
        _default_content_type = self.api_client.select_header_content_type(
            ["application/json"]
        )
        if _default_content_type is not None:
            _header_params["Content-Type"] = _default_content_type

    # authentication setting
    _auth_settings: List[str] = ["cookieAuth", "bearerAuth"]

    return self.api_client.param_serialize(
        method="POST",
        resource_path="/api/v1/tenants/{tenant}/step-runs/{step-run}/rerun",
        path_params=_path_params,
        query_params=_query_params,
        header_params=_header_params,
        body=_body_params,
        post_params=_form_params,
        files=_files,
        auth_settings=_auth_settings,
        collection_formats=_collection_formats,
        _host=_host,
        _request_auth=_request_auth,
    )
@validate_call
async def workflow_run_list_step_run_events(
    self,
    tenant: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The tenant id"
        ),
    ],
    workflow_run: Annotated[
        str,
        Field(
            min_length=36,
            strict=True,
            max_length=36,
            description="The workflow run id",
        ),
    ],
    last_id: Annotated[
        Optional[StrictInt], Field(description="Last ID of the last event")
    ] = None,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> StepRunEventList:
    """List events for all step runs for a workflow run

    Serializes the call with
    ``_workflow_run_list_step_run_events_serialize``, awaits the request,
    reads the body and returns only the ``data`` attribute of the
    deserialized response.

    :param tenant: The tenant id (required); exactly 36 characters.
    :type tenant: str
    :param workflow_run: The workflow run id (required); exactly 36 characters.
    :type workflow_run: str
    :param last_id: Last ID of the last event
    :type last_id: int
    :param _request_timeout: either one number greater than 0 (total request
                             timeout) or a (connection, read) pair of such
                             numbers.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: auth settings override for this single request;
                          forwarded to the serializer.
    :type _request_auth: dict, optional
    :param _content_type: content-type override; forwarded to the serializer.
    :type _content_type: str, optional
    :param _headers: headers for this single request; forwarded to the
                     serializer.
    :type _headers: dict, optional
    :param _host_index: host index for this single request; validation only
                        admits 0.
    :type _host_index: int, optional
    :return: The ``data`` attribute of the deserialized response.
    """  # noqa: E501
    request_args = self._workflow_run_list_step_run_events_serialize(
        tenant=tenant,
        workflow_run=workflow_run,
        last_id=last_id,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    raw_response = await self.api_client.call_api(
        *request_args, _request_timeout=_request_timeout
    )
    await raw_response.read()

    # Model name to deserialize into, keyed by HTTP status code.
    status_to_model: Dict[str, Optional[str]] = {
        "200": "StepRunEventList",
        "400": "APIErrors",
        "403": "APIErrors",
        "404": "APIErrors",
    }
    api_response = self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=status_to_model,
    )
    return api_response.data
@validate_call
async def workflow_run_list_step_run_events_with_http_info(
    self,
    tenant: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The tenant id"
        ),
    ],
    workflow_run: Annotated[
        str,
        Field(
            min_length=36,
            strict=True,
            max_length=36,
            description="The workflow run id",
        ),
    ],
    last_id: Annotated[
        Optional[StrictInt], Field(description="Last ID of the last event")
    ] = None,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> ApiResponse[StepRunEventList]:
    """List events for all step runs for a workflow run

    Same request as ``workflow_run_list_step_run_events`` builds via
    ``_workflow_run_list_step_run_events_serialize``, but the complete
    object returned by ``api_client.response_deserialize`` is handed back
    instead of only its ``data`` attribute.

    :param tenant: The tenant id (required); exactly 36 characters.
    :type tenant: str
    :param workflow_run: The workflow run id (required); exactly 36 characters.
    :type workflow_run: str
    :param last_id: Last ID of the last event
    :type last_id: int
    :param _request_timeout: either one number greater than 0 (total request
                             timeout) or a (connection, read) pair of such
                             numbers.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: auth settings override for this single request;
                          forwarded to the serializer.
    :type _request_auth: dict, optional
    :param _content_type: content-type override; forwarded to the serializer.
    :type _content_type: str, optional
    :param _headers: headers for this single request; forwarded to the
                     serializer.
    :type _headers: dict, optional
    :param _host_index: host index for this single request; validation only
                        admits 0.
    :type _host_index: int, optional
    :return: The object produced by ``api_client.response_deserialize``.
    """  # noqa: E501
    request_args = self._workflow_run_list_step_run_events_serialize(
        tenant=tenant,
        workflow_run=workflow_run,
        last_id=last_id,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    raw_response = await self.api_client.call_api(
        *request_args, _request_timeout=_request_timeout
    )
    await raw_response.read()

    # Model name to deserialize into, keyed by HTTP status code.
    status_to_model: Dict[str, Optional[str]] = {
        "200": "StepRunEventList",
        "400": "APIErrors",
        "403": "APIErrors",
        "404": "APIErrors",
    }
    api_response = self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=status_to_model,
    )
    return api_response
@validate_call
async def workflow_run_list_step_run_events_without_preload_content(
    self,
    tenant: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The tenant id"
        ),
    ],
    workflow_run: Annotated[
        str,
        Field(
            min_length=36,
            strict=True,
            max_length=36,
            description="The workflow run id",
        ),
    ],
    last_id: Annotated[
        Optional[StrictInt], Field(description="Last ID of the last event")
    ] = None,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> RESTResponseType:
    """List events for all step runs for a workflow run

    List events for all step runs for a workflow run. This variant returns
    the ``response`` attribute of the object produced by
    ``api_client.call_api`` as-is: the body is neither read nor
    deserialized here.

    :param tenant: The tenant id (required); exactly 36 characters.
    :type tenant: str
    :param workflow_run: The workflow run id (required); exactly 36 characters.
    :type workflow_run: str
    :param last_id: Last ID of the last event
    :type last_id: int
    :param _request_timeout: timeout setting for this request. If one
                             number provided, it will be total request
                             timeout. It can also be a pair (tuple) of
                             (connection, read) timeouts. Values must be
                             greater than 0.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: set to override the auth_settings for a single
                          request; forwarded to the serializer.
    :type _request_auth: dict, optional
    :param _content_type: content-type override; forwarded to the serializer.
    :type _content_type: str, optional
    :param _headers: headers for this single request; forwarded to the
                     serializer.
    :type _headers: dict, optional
    :param _host_index: host index for this single request; validation only
                        admits 0.
    :type _host_index: int, optional
    :return: The ``response`` attribute of the object returned by
             ``api_client.call_api``.
    """  # noqa: E501
    _param = self._workflow_run_list_step_run_events_serialize(
        tenant=tenant,
        workflow_run=workflow_run,
        last_id=last_id,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )
    # No status-code -> model map is built here: nothing is deserialized in
    # this variant, so such a map would never be read.
    response_data = await self.api_client.call_api(
        *_param, _request_timeout=_request_timeout
    )
    return response_data.response
def _workflow_run_list_step_run_events_serialize(
    self,
    tenant,
    workflow_run,
    last_id,
    _request_auth,
    _content_type,
    _headers,
    _host_index,
) -> RequestSerialized:
    """Build the serialized GET request for a workflow run's step-run events.

    :param tenant: value for the ``tenant`` path parameter; skipped when None.
    :param workflow_run: value for the ``workflow-run`` path parameter;
                         skipped when None.
    :param last_id: value for the ``lastId`` query parameter; skipped when None.
    :param _request_auth: passed through to ``api_client.param_serialize``.
    :param _content_type: accepted for signature parity; not used by this
                          serializer.
    :param _headers: optional initial headers. The mapping is copied, so the
                     caller's object is not modified by this method.
    :param _host_index: accepted for signature parity; not used by this
                        serializer.
    :return: whatever ``api_client.param_serialize`` returns for the request.
    """
    _host = None
    _collection_formats: Dict[str, str] = {}
    _path_params: Dict[str, str] = {}
    # Values are appended as received (last_id is an int for validated callers).
    _query_params: List[Tuple[str, Any]] = []
    # Copy instead of aliasing: the Accept header is written into this dict
    # below, and that write must not leak into the caller's `_headers`.
    _header_params: Dict[str, Optional[str]] = dict(_headers) if _headers else {}
    _form_params: List[Tuple[str, str]] = []
    _files: Dict[
        str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]]
    ] = {}
    _body_params: Optional[bytes] = None

    # process the path parameters
    if tenant is not None:
        _path_params["tenant"] = tenant
    if workflow_run is not None:
        _path_params["workflow-run"] = workflow_run

    # process the query parameters
    if last_id is not None:
        _query_params.append(("lastId", last_id))

    # set the HTTP header `Accept` unless the caller already supplied one
    if "Accept" not in _header_params:
        _header_params["Accept"] = self.api_client.select_header_accept(
            ["application/json"]
        )

    # authentication setting
    _auth_settings: List[str] = ["cookieAuth", "bearerAuth"]

    return self.api_client.param_serialize(
        method="GET",
        resource_path="/api/v1/tenants/{tenant}/workflow-runs/{workflow-run}/step-run-events",
        path_params=_path_params,
        query_params=_query_params,
        header_params=_header_params,
        body=_body_params,
        post_params=_form_params,
        files=_files,
        auth_settings=_auth_settings,
        collection_formats=_collection_formats,
        _host=_host,
        _request_auth=_request_auth,
    )