# coding: utf-8
"""
Hatchet API
The Hatchet API
The version of the OpenAPI document: 1.0.0
Generated by OpenAPI Generator (https://openapi-generator.tech)
Do not edit the class manually.
""" # noqa: E501
import warnings
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple, Union
from pydantic import Field, StrictFloat, StrictInt, StrictStr, validate_call
from typing_extensions import Annotated
from hatchet_sdk.clients.rest.api_client import ApiClient, RequestSerialized
from hatchet_sdk.clients.rest.api_response import ApiResponse
from hatchet_sdk.clients.rest.models.cron_workflows import CronWorkflows
from hatchet_sdk.clients.rest.models.cron_workflows_list import CronWorkflowsList
from hatchet_sdk.clients.rest.models.cron_workflows_order_by_field import (
CronWorkflowsOrderByField,
)
from hatchet_sdk.clients.rest.models.scheduled_run_status import ScheduledRunStatus
from hatchet_sdk.clients.rest.models.scheduled_workflows import ScheduledWorkflows
from hatchet_sdk.clients.rest.models.scheduled_workflows_list import (
ScheduledWorkflowsList,
)
from hatchet_sdk.clients.rest.models.scheduled_workflows_order_by_field import (
ScheduledWorkflowsOrderByField,
)
from hatchet_sdk.clients.rest.models.tenant_queue_metrics import TenantQueueMetrics
from hatchet_sdk.clients.rest.models.workflow import Workflow
from hatchet_sdk.clients.rest.models.workflow_kind import WorkflowKind
from hatchet_sdk.clients.rest.models.workflow_list import WorkflowList
from hatchet_sdk.clients.rest.models.workflow_metrics import WorkflowMetrics
from hatchet_sdk.clients.rest.models.workflow_run import WorkflowRun
from hatchet_sdk.clients.rest.models.workflow_run_list import WorkflowRunList
from hatchet_sdk.clients.rest.models.workflow_run_order_by_direction import (
WorkflowRunOrderByDirection,
)
from hatchet_sdk.clients.rest.models.workflow_run_order_by_field import (
WorkflowRunOrderByField,
)
from hatchet_sdk.clients.rest.models.workflow_run_shape import WorkflowRunShape
from hatchet_sdk.clients.rest.models.workflow_run_status import WorkflowRunStatus
from hatchet_sdk.clients.rest.models.workflow_runs_metrics import WorkflowRunsMetrics
from hatchet_sdk.clients.rest.models.workflow_update_request import (
WorkflowUpdateRequest,
)
from hatchet_sdk.clients.rest.models.workflow_version import WorkflowVersion
from hatchet_sdk.clients.rest.models.workflow_workers_count import WorkflowWorkersCount
from hatchet_sdk.clients.rest.rest import RESTResponseType
class WorkflowApi:
"""NOTE: This class is auto generated by OpenAPI Generator
Ref: https://openapi-generator.tech
Do not edit the class manually.
"""
def __init__(self, api_client=None) -> None:
if api_client is None:
api_client = ApiClient.get_default()
self.api_client = api_client
@validate_call
async def cron_workflow_list(
    self,
    tenant: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The tenant id"
        ),
    ],
    offset: Annotated[
        Optional[StrictInt], Field(description="The number to skip")
    ] = None,
    limit: Annotated[
        Optional[StrictInt], Field(description="The number to limit by")
    ] = None,
    workflow_id: Annotated[
        Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]],
        Field(description="The workflow id to get runs for."),
    ] = None,
    additional_metadata: Annotated[
        Optional[List[StrictStr]],
        Field(description="A list of metadata key value pairs to filter by"),
    ] = None,
    order_by_field: Annotated[
        Optional[CronWorkflowsOrderByField], Field(description="The order by field")
    ] = None,
    order_by_direction: Annotated[
        Optional[WorkflowRunOrderByDirection],
        Field(description="The order by direction"),
    ] = None,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> CronWorkflowsList:
    """Get cron job workflows

    Get all cron job workflow triggers for a tenant. The request is built by
    ``_cron_workflow_list_serialize``, sent with ``api_client.call_api``,
    read fully, deserialized, and only the ``data`` payload is returned.

    :param tenant: The tenant id (required, exactly 36 characters)
    :type tenant: str
    :param offset: The number to skip
    :type offset: int
    :param limit: The number to limit by
    :type limit: int
    :param workflow_id: The workflow id to get runs for (36 characters).
    :type workflow_id: str
    :param additional_metadata: A list of metadata key value pairs to filter by
    :type additional_metadata: List[str]
    :param order_by_field: The order by field
    :type order_by_field: CronWorkflowsOrderByField
    :param order_by_direction: The order by direction
    :type order_by_direction: WorkflowRunOrderByDirection
    :param _request_timeout: timeout for this request; a single number is
                             the total request timeout, a pair is
                             (connection, read) timeouts. Values must be > 0.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: overrides the auth settings for this single
                          request.
    :type _request_auth: dict, optional
    :param _content_type: content type override, forwarded to the request
                          serializer.
    :type _content_type: str, optional
    :param _headers: overrides the headers for this single request.
    :type _headers: dict, optional
    :param _host_index: host index override, forwarded to the request
                        serializer; validation only admits 0.
    :type _host_index: int, optional
    :return: the ``data`` attribute of the deserialized response.
    """  # noqa: E501
    # Status code -> model name used by the deserializer.
    status_to_model: Dict[str, Optional[str]] = {
        "200": "CronWorkflowsList",
        "400": "APIErrors",
        "403": "APIErrors",
    }

    serialized_request = self._cron_workflow_list_serialize(
        tenant=tenant,
        offset=offset,
        limit=limit,
        workflow_id=workflow_id,
        additional_metadata=additional_metadata,
        order_by_field=order_by_field,
        order_by_direction=order_by_direction,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    raw_response = await self.api_client.call_api(
        *serialized_request, _request_timeout=_request_timeout
    )
    # The body must be read before it can be deserialized.
    await raw_response.read()

    deserialized = self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=status_to_model,
    )
    return deserialized.data
@validate_call
async def cron_workflow_list_with_http_info(
    self,
    tenant: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The tenant id"
        ),
    ],
    offset: Annotated[
        Optional[StrictInt], Field(description="The number to skip")
    ] = None,
    limit: Annotated[
        Optional[StrictInt], Field(description="The number to limit by")
    ] = None,
    workflow_id: Annotated[
        Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]],
        Field(description="The workflow id to get runs for."),
    ] = None,
    additional_metadata: Annotated[
        Optional[List[StrictStr]],
        Field(description="A list of metadata key value pairs to filter by"),
    ] = None,
    order_by_field: Annotated[
        Optional[CronWorkflowsOrderByField], Field(description="The order by field")
    ] = None,
    order_by_direction: Annotated[
        Optional[WorkflowRunOrderByDirection],
        Field(description="The order by direction"),
    ] = None,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> ApiResponse[CronWorkflowsList]:
    """Get cron job workflows

    Get all cron job workflow triggers for a tenant. Same request as
    ``cron_workflow_list``, but the whole object produced by
    ``api_client.response_deserialize`` is returned instead of only its
    ``data`` attribute.

    :param tenant: The tenant id (required, exactly 36 characters)
    :type tenant: str
    :param offset: The number to skip
    :type offset: int
    :param limit: The number to limit by
    :type limit: int
    :param workflow_id: The workflow id to get runs for (36 characters).
    :type workflow_id: str
    :param additional_metadata: A list of metadata key value pairs to filter by
    :type additional_metadata: List[str]
    :param order_by_field: The order by field
    :type order_by_field: CronWorkflowsOrderByField
    :param order_by_direction: The order by direction
    :type order_by_direction: WorkflowRunOrderByDirection
    :param _request_timeout: timeout for this request; a single number is
                             the total request timeout, a pair is
                             (connection, read) timeouts. Values must be > 0.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: overrides the auth settings for this single
                          request.
    :type _request_auth: dict, optional
    :param _content_type: content type override, forwarded to the request
                          serializer.
    :type _content_type: str, optional
    :param _headers: overrides the headers for this single request.
    :type _headers: dict, optional
    :param _host_index: host index override, forwarded to the request
                        serializer; validation only admits 0.
    :type _host_index: int, optional
    :return: the deserialized response object.
    """  # noqa: E501
    # Status code -> model name used by the deserializer.
    status_to_model: Dict[str, Optional[str]] = {
        "200": "CronWorkflowsList",
        "400": "APIErrors",
        "403": "APIErrors",
    }

    serialized_request = self._cron_workflow_list_serialize(
        tenant=tenant,
        offset=offset,
        limit=limit,
        workflow_id=workflow_id,
        additional_metadata=additional_metadata,
        order_by_field=order_by_field,
        order_by_direction=order_by_direction,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    raw_response = await self.api_client.call_api(
        *serialized_request, _request_timeout=_request_timeout
    )
    # The body must be read before it can be deserialized.
    await raw_response.read()

    http_info = self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=status_to_model,
    )
    return http_info
@validate_call
async def cron_workflow_list_without_preload_content(
    self,
    tenant: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The tenant id"
        ),
    ],
    offset: Annotated[
        Optional[StrictInt], Field(description="The number to skip")
    ] = None,
    limit: Annotated[
        Optional[StrictInt], Field(description="The number to limit by")
    ] = None,
    workflow_id: Annotated[
        Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]],
        Field(description="The workflow id to get runs for."),
    ] = None,
    additional_metadata: Annotated[
        Optional[List[StrictStr]],
        Field(description="A list of metadata key value pairs to filter by"),
    ] = None,
    order_by_field: Annotated[
        Optional[CronWorkflowsOrderByField], Field(description="The order by field")
    ] = None,
    order_by_direction: Annotated[
        Optional[WorkflowRunOrderByDirection],
        Field(description="The order by direction"),
    ] = None,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> RESTResponseType:
    """Get cron job workflows

    Get all cron job workflow triggers for a tenant. The response body is
    neither read nor deserialized here: the ``response`` attribute of what
    ``api_client.call_api`` returns is handed back as-is. The sibling
    methods map status 200 to ``CronWorkflowsList`` and 400/403 to
    ``APIErrors``; no such mapping is applied by this method.

    :param tenant: The tenant id (required, exactly 36 characters)
    :type tenant: str
    :param offset: The number to skip
    :type offset: int
    :param limit: The number to limit by
    :type limit: int
    :param workflow_id: The workflow id to get runs for (36 characters).
    :type workflow_id: str
    :param additional_metadata: A list of metadata key value pairs to filter by
    :type additional_metadata: List[str]
    :param order_by_field: The order by field
    :type order_by_field: CronWorkflowsOrderByField
    :param order_by_direction: The order by direction
    :type order_by_direction: WorkflowRunOrderByDirection
    :param _request_timeout: timeout for this request; a single number is
                             the total request timeout, a pair is
                             (connection, read) timeouts. Values must be > 0.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: overrides the auth settings for this single
                          request.
    :type _request_auth: dict, optional
    :param _content_type: content type override, forwarded to the request
                          serializer.
    :type _content_type: str, optional
    :param _headers: overrides the headers for this single request.
    :type _headers: dict, optional
    :param _host_index: host index override, forwarded to the request
                        serializer; validation only admits 0.
    :type _host_index: int, optional
    :return: the raw, not-yet-read response.
    """  # noqa: E501
    _param = self._cron_workflow_list_serialize(
        tenant=tenant,
        offset=offset,
        limit=limit,
        workflow_id=workflow_id,
        additional_metadata=additional_metadata,
        order_by_field=order_by_field,
        order_by_direction=order_by_direction,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    # No response-type map is built: nothing is deserialized on this path.
    response_data = await self.api_client.call_api(
        *_param, _request_timeout=_request_timeout
    )
    return response_data.response
def _cron_workflow_list_serialize(
self,
tenant,
offset,
limit,
workflow_id,
additional_metadata,
order_by_field,
order_by_direction,
_request_auth,
_content_type,
_headers,
_host_index,
) -> RequestSerialized:
_host = None
_collection_formats: Dict[str, str] = {
"additionalMetadata": "multi",
}
_path_params: Dict[str, str] = {}
_query_params: List[Tuple[str, str]] = []
_header_params: Dict[str, Optional[str]] = _headers or {}
_form_params: List[Tuple[str, str]] = []
_files: Dict[
str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]]
] = {}
_body_params: Optional[bytes] = None
# process the path parameters
if tenant is not None:
_path_params["tenant"] = tenant
# process the query parameters
if offset is not None:
_query_params.append(("offset", offset))
if limit is not None:
_query_params.append(("limit", limit))
if workflow_id is not None:
_query_params.append(("workflowId", workflow_id))
if additional_metadata is not None:
_query_params.append(("additionalMetadata", additional_metadata))
if order_by_field is not None:
_query_params.append(("orderByField", order_by_field.value))
if order_by_direction is not None:
_query_params.append(("orderByDirection", order_by_direction.value))
# process the header parameters
# process the form parameters
# process the body parameter
# set the HTTP header `Accept`
if "Accept" not in _header_params:
_header_params["Accept"] = self.api_client.select_header_accept(
["application/json"]
)
# authentication setting
_auth_settings: List[str] = ["cookieAuth", "bearerAuth"]
return self.api_client.param_serialize(
method="GET",
resource_path="/api/v1/tenants/{tenant}/workflows/crons",
path_params=_path_params,
query_params=_query_params,
header_params=_header_params,
body=_body_params,
post_params=_form_params,
files=_files,
auth_settings=_auth_settings,
collection_formats=_collection_formats,
_host=_host,
_request_auth=_request_auth,
)
@validate_call
async def tenant_get_queue_metrics(
    self,
    tenant: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The tenant id"
        ),
    ],
    workflows: Annotated[
        Optional[List[StrictStr]],
        Field(description="A list of workflow IDs to filter by"),
    ] = None,
    additional_metadata: Annotated[
        Optional[List[StrictStr]],
        Field(description="A list of metadata key value pairs to filter by"),
    ] = None,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> TenantQueueMetrics:
    """Get workflow metrics

    Get the queue metrics for the tenant. The request is built by
    ``_tenant_get_queue_metrics_serialize``, sent with
    ``api_client.call_api``, read fully, deserialized, and only the ``data``
    payload is returned.

    :param tenant: The tenant id (required, exactly 36 characters)
    :type tenant: str
    :param workflows: A list of workflow IDs to filter by
    :type workflows: List[str]
    :param additional_metadata: A list of metadata key value pairs to filter by
    :type additional_metadata: List[str]
    :param _request_timeout: timeout for this request; a single number is
                             the total request timeout, a pair is
                             (connection, read) timeouts. Values must be > 0.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: overrides the auth settings for this single
                          request.
    :type _request_auth: dict, optional
    :param _content_type: content type override, forwarded to the request
                          serializer.
    :type _content_type: str, optional
    :param _headers: overrides the headers for this single request.
    :type _headers: dict, optional
    :param _host_index: host index override, forwarded to the request
                        serializer; validation only admits 0.
    :type _host_index: int, optional
    :return: the ``data`` attribute of the deserialized response.
    """  # noqa: E501
    # Status code -> model name used by the deserializer.
    status_to_model: Dict[str, Optional[str]] = {
        "200": "TenantQueueMetrics",
        "400": "APIErrors",
        "403": "APIErrors",
        "404": "APIErrors",
    }

    serialized_request = self._tenant_get_queue_metrics_serialize(
        tenant=tenant,
        workflows=workflows,
        additional_metadata=additional_metadata,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    raw_response = await self.api_client.call_api(
        *serialized_request, _request_timeout=_request_timeout
    )
    # The body must be read before it can be deserialized.
    await raw_response.read()

    deserialized = self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=status_to_model,
    )
    return deserialized.data
@validate_call
async def tenant_get_queue_metrics_with_http_info(
    self,
    tenant: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The tenant id"
        ),
    ],
    workflows: Annotated[
        Optional[List[StrictStr]],
        Field(description="A list of workflow IDs to filter by"),
    ] = None,
    additional_metadata: Annotated[
        Optional[List[StrictStr]],
        Field(description="A list of metadata key value pairs to filter by"),
    ] = None,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> ApiResponse[TenantQueueMetrics]:
    """Get workflow metrics

    Get the queue metrics for the tenant. Same request as
    ``tenant_get_queue_metrics``, but the whole object produced by
    ``api_client.response_deserialize`` is returned instead of only its
    ``data`` attribute.

    :param tenant: The tenant id (required, exactly 36 characters)
    :type tenant: str
    :param workflows: A list of workflow IDs to filter by
    :type workflows: List[str]
    :param additional_metadata: A list of metadata key value pairs to filter by
    :type additional_metadata: List[str]
    :param _request_timeout: timeout for this request; a single number is
                             the total request timeout, a pair is
                             (connection, read) timeouts. Values must be > 0.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: overrides the auth settings for this single
                          request.
    :type _request_auth: dict, optional
    :param _content_type: content type override, forwarded to the request
                          serializer.
    :type _content_type: str, optional
    :param _headers: overrides the headers for this single request.
    :type _headers: dict, optional
    :param _host_index: host index override, forwarded to the request
                        serializer; validation only admits 0.
    :type _host_index: int, optional
    :return: the deserialized response object.
    """  # noqa: E501
    # Status code -> model name used by the deserializer.
    status_to_model: Dict[str, Optional[str]] = {
        "200": "TenantQueueMetrics",
        "400": "APIErrors",
        "403": "APIErrors",
        "404": "APIErrors",
    }

    serialized_request = self._tenant_get_queue_metrics_serialize(
        tenant=tenant,
        workflows=workflows,
        additional_metadata=additional_metadata,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    raw_response = await self.api_client.call_api(
        *serialized_request, _request_timeout=_request_timeout
    )
    # The body must be read before it can be deserialized.
    await raw_response.read()

    http_info = self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=status_to_model,
    )
    return http_info
@validate_call
async def tenant_get_queue_metrics_without_preload_content(
    self,
    tenant: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The tenant id"
        ),
    ],
    workflows: Annotated[
        Optional[List[StrictStr]],
        Field(description="A list of workflow IDs to filter by"),
    ] = None,
    additional_metadata: Annotated[
        Optional[List[StrictStr]],
        Field(description="A list of metadata key value pairs to filter by"),
    ] = None,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> RESTResponseType:
    """Get workflow metrics

    Get the queue metrics for the tenant. The response body is neither read
    nor deserialized here: the ``response`` attribute of what
    ``api_client.call_api`` returns is handed back as-is. The sibling
    methods map status 200 to ``TenantQueueMetrics`` and 400/403/404 to
    ``APIErrors``; no such mapping is applied by this method.

    :param tenant: The tenant id (required, exactly 36 characters)
    :type tenant: str
    :param workflows: A list of workflow IDs to filter by
    :type workflows: List[str]
    :param additional_metadata: A list of metadata key value pairs to filter by
    :type additional_metadata: List[str]
    :param _request_timeout: timeout for this request; a single number is
                             the total request timeout, a pair is
                             (connection, read) timeouts. Values must be > 0.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: overrides the auth settings for this single
                          request.
    :type _request_auth: dict, optional
    :param _content_type: content type override, forwarded to the request
                          serializer.
    :type _content_type: str, optional
    :param _headers: overrides the headers for this single request.
    :type _headers: dict, optional
    :param _host_index: host index override, forwarded to the request
                        serializer; validation only admits 0.
    :type _host_index: int, optional
    :return: the raw, not-yet-read response.
    """  # noqa: E501
    _param = self._tenant_get_queue_metrics_serialize(
        tenant=tenant,
        workflows=workflows,
        additional_metadata=additional_metadata,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    # No response-type map is built: nothing is deserialized on this path.
    response_data = await self.api_client.call_api(
        *_param, _request_timeout=_request_timeout
    )
    return response_data.response
def _tenant_get_queue_metrics_serialize(
self,
tenant,
workflows,
additional_metadata,
_request_auth,
_content_type,
_headers,
_host_index,
) -> RequestSerialized:
_host = None
_collection_formats: Dict[str, str] = {
"workflows": "multi",
"additionalMetadata": "multi",
}
_path_params: Dict[str, str] = {}
_query_params: List[Tuple[str, str]] = []
_header_params: Dict[str, Optional[str]] = _headers or {}
_form_params: List[Tuple[str, str]] = []
_files: Dict[
str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]]
] = {}
_body_params: Optional[bytes] = None
# process the path parameters
if tenant is not None:
_path_params["tenant"] = tenant
# process the query parameters
if workflows is not None:
_query_params.append(("workflows", workflows))
if additional_metadata is not None:
_query_params.append(("additionalMetadata", additional_metadata))
# process the header parameters
# process the form parameters
# process the body parameter
# set the HTTP header `Accept`
if "Accept" not in _header_params:
_header_params["Accept"] = self.api_client.select_header_accept(
["application/json"]
)
# authentication setting
_auth_settings: List[str] = ["cookieAuth", "bearerAuth"]
return self.api_client.param_serialize(
method="GET",
resource_path="/api/v1/tenants/{tenant}/queue-metrics",
path_params=_path_params,
query_params=_query_params,
header_params=_header_params,
body=_body_params,
post_params=_form_params,
files=_files,
auth_settings=_auth_settings,
collection_formats=_collection_formats,
_host=_host,
_request_auth=_request_auth,
)
@validate_call
async def workflow_cron_delete(
    self,
    tenant: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The tenant id"
        ),
    ],
    cron_workflow: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The cron job id"
        ),
    ],
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> None:
    """Delete cron job workflow run

    Delete a cron job workflow run for a tenant. The request is built by
    ``_workflow_cron_delete_serialize``, sent with ``api_client.call_api``,
    read fully and deserialized; the ``data`` attribute of the result is
    returned (status 204 maps to no model).

    :param tenant: The tenant id (required, exactly 36 characters)
    :type tenant: str
    :param cron_workflow: The cron job id (required, exactly 36 characters)
    :type cron_workflow: str
    :param _request_timeout: timeout for this request; a single number is
                             the total request timeout, a pair is
                             (connection, read) timeouts. Values must be > 0.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: overrides the auth settings for this single
                          request.
    :type _request_auth: dict, optional
    :param _content_type: content type override, forwarded to the request
                          serializer.
    :type _content_type: str, optional
    :param _headers: overrides the headers for this single request.
    :type _headers: dict, optional
    :param _host_index: host index override, forwarded to the request
                        serializer; validation only admits 0.
    :type _host_index: int, optional
    :return: the ``data`` attribute of the deserialized response.
    """  # noqa: E501
    # Status code -> model name used by the deserializer.
    status_to_model: Dict[str, Optional[str]] = {
        "204": None,
        "400": "APIErrors",
        "403": "APIError",
    }

    serialized_request = self._workflow_cron_delete_serialize(
        tenant=tenant,
        cron_workflow=cron_workflow,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    raw_response = await self.api_client.call_api(
        *serialized_request, _request_timeout=_request_timeout
    )
    # The body must be read before it can be deserialized.
    await raw_response.read()

    deserialized = self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=status_to_model,
    )
    return deserialized.data
@validate_call
async def workflow_cron_delete_with_http_info(
    self,
    tenant: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The tenant id"
        ),
    ],
    cron_workflow: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The cron job id"
        ),
    ],
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> ApiResponse[None]:
    """Delete cron job workflow run

    Delete a cron job workflow run for a tenant. Same request as
    ``workflow_cron_delete``, but the whole object produced by
    ``api_client.response_deserialize`` is returned instead of only its
    ``data`` attribute.

    :param tenant: The tenant id (required, exactly 36 characters)
    :type tenant: str
    :param cron_workflow: The cron job id (required, exactly 36 characters)
    :type cron_workflow: str
    :param _request_timeout: timeout for this request; a single number is
                             the total request timeout, a pair is
                             (connection, read) timeouts. Values must be > 0.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: overrides the auth settings for this single
                          request.
    :type _request_auth: dict, optional
    :param _content_type: content type override, forwarded to the request
                          serializer.
    :type _content_type: str, optional
    :param _headers: overrides the headers for this single request.
    :type _headers: dict, optional
    :param _host_index: host index override, forwarded to the request
                        serializer; validation only admits 0.
    :type _host_index: int, optional
    :return: the deserialized response object.
    """  # noqa: E501
    # Status code -> model name used by the deserializer.
    status_to_model: Dict[str, Optional[str]] = {
        "204": None,
        "400": "APIErrors",
        "403": "APIError",
    }

    serialized_request = self._workflow_cron_delete_serialize(
        tenant=tenant,
        cron_workflow=cron_workflow,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    raw_response = await self.api_client.call_api(
        *serialized_request, _request_timeout=_request_timeout
    )
    # The body must be read before it can be deserialized.
    await raw_response.read()

    http_info = self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=status_to_model,
    )
    return http_info
@validate_call
async def workflow_cron_delete_without_preload_content(
    self,
    tenant: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The tenant id"
        ),
    ],
    cron_workflow: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The cron job id"
        ),
    ],
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> RESTResponseType:
    """Delete cron job workflow run

    Delete a cron job workflow run for a tenant. The response body is
    neither read nor deserialized here: the ``response`` attribute of what
    ``api_client.call_api`` returns is handed back as-is. The sibling
    methods map status 204 to no model, 400 to ``APIErrors`` and 403 to
    ``APIError``; no such mapping is applied by this method.

    :param tenant: The tenant id (required, exactly 36 characters)
    :type tenant: str
    :param cron_workflow: The cron job id (required, exactly 36 characters)
    :type cron_workflow: str
    :param _request_timeout: timeout for this request; a single number is
                             the total request timeout, a pair is
                             (connection, read) timeouts. Values must be > 0.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: overrides the auth settings for this single
                          request.
    :type _request_auth: dict, optional
    :param _content_type: content type override, forwarded to the request
                          serializer.
    :type _content_type: str, optional
    :param _headers: overrides the headers for this single request.
    :type _headers: dict, optional
    :param _host_index: host index override, forwarded to the request
                        serializer; validation only admits 0.
    :type _host_index: int, optional
    :return: the raw, not-yet-read response.
    """  # noqa: E501
    _param = self._workflow_cron_delete_serialize(
        tenant=tenant,
        cron_workflow=cron_workflow,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    # No response-type map is built: nothing is deserialized on this path.
    response_data = await self.api_client.call_api(
        *_param, _request_timeout=_request_timeout
    )
    return response_data.response
def _workflow_cron_delete_serialize(
    self,
    tenant,
    cron_workflow,
    _request_auth,
    _content_type,
    _headers,
    _host_index,
) -> RequestSerialized:
    """Build the serialized DELETE request for a single cron workflow.

    The request targets
    ``/api/v1/tenants/{tenant}/workflows/crons/{cron-workflow}``.

    :param tenant: value for the ``{tenant}`` path segment; omitted from
        the path parameters when ``None``.
    :param cron_workflow: value for the ``{cron-workflow}`` path segment;
        omitted from the path parameters when ``None``.
    :param _request_auth: forwarded unchanged to
        ``api_client.param_serialize``.
    :param _content_type: accepted but not read by this serializer.
    :param _headers: optional extra request headers. The mapping is copied,
        so the caller's object is not modified when ``Accept`` is added.
    :param _host_index: accepted but not read by this serializer.
    :return: the value returned by ``self.api_client.param_serialize``.
    """
    _host = None

    _collection_formats: Dict[str, str] = {}

    _path_params: Dict[str, str] = {}
    _query_params: List[Tuple[str, str]] = []
    # Copy the caller's headers: `_headers or {}` would alias their dict and
    # the `Accept` default set below would leak back into it.
    _header_params: Dict[str, Optional[str]] = dict(_headers) if _headers else {}
    _form_params: List[Tuple[str, str]] = []
    _files: Dict[
        str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]]
    ] = {}
    _body_params: Optional[bytes] = None

    # process the path parameters
    if tenant is not None:
        _path_params["tenant"] = tenant
    if cron_workflow is not None:
        _path_params["cron-workflow"] = cron_workflow
    # this operation has no query, header, form, or body parameters

    # set the HTTP header `Accept` unless the caller already provided one
    if "Accept" not in _header_params:
        _header_params["Accept"] = self.api_client.select_header_accept(
            ["application/json"]
        )

    # authentication setting
    _auth_settings: List[str] = ["cookieAuth", "bearerAuth"]

    return self.api_client.param_serialize(
        method="DELETE",
        resource_path="/api/v1/tenants/{tenant}/workflows/crons/{cron-workflow}",
        path_params=_path_params,
        query_params=_query_params,
        header_params=_header_params,
        body=_body_params,
        post_params=_form_params,
        files=_files,
        auth_settings=_auth_settings,
        collection_formats=_collection_formats,
        _host=_host,
        _request_auth=_request_auth,
    )
@validate_call
async def workflow_cron_get(
    self,
    tenant: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The tenant id"
        ),
    ],
    cron_workflow: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The cron job id"
        ),
    ],
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> CronWorkflows:
    """Get cron job workflow run

    Get a cron job workflow run for a tenant and return only the
    deserialized payload.

    :param tenant: The tenant id (required)
    :type tenant: str
    :param cron_workflow: The cron job id (required)
    :type cron_workflow: str
    :param _request_timeout: timeout for this request. A single number is
        the total request timeout; a pair (tuple) is read as
        (connection, read) timeouts.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: overrides the auth settings from the spec for
        this single request.
    :type _request_auth: dict, optional
    :param _content_type: content type to force for the request; forwarded
        to the request serializer.
    :type _content_type: str, optional
    :param _headers: overrides the headers from the spec for this single
        request.
    :type _headers: dict, optional
    :param _host_index: overrides the host index from the spec for this
        single request; the annotation only admits ``0``.
    :type _host_index: int, optional
    :return: the ``data`` attribute of the deserialized response.
    """  # noqa: E501
    # HTTP status code -> model name handed to the deserializer.
    status_to_model: Dict[str, Optional[str]] = {
        "200": "CronWorkflows",
        "400": "APIErrors",
        "403": "APIErrors",
        "404": "APIErrors",
    }

    serialized_request = self._workflow_cron_get_serialize(
        tenant=tenant,
        cron_workflow=cron_workflow,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    raw_response = await self.api_client.call_api(
        *serialized_request, _request_timeout=_request_timeout
    )
    # Read the response before handing it to the deserializer.
    await raw_response.read()
    deserialized = self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=status_to_model,
    )
    return deserialized.data
@validate_call
async def workflow_cron_get_with_http_info(
    self,
    tenant: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The tenant id"
        ),
    ],
    cron_workflow: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The cron job id"
        ),
    ],
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> ApiResponse[CronWorkflows]:
    """Get cron job workflow run

    Get a cron job workflow run for a tenant and return the full
    deserialized response object rather than just its ``data``.

    :param tenant: The tenant id (required)
    :type tenant: str
    :param cron_workflow: The cron job id (required)
    :type cron_workflow: str
    :param _request_timeout: timeout for this request. A single number is
        the total request timeout; a pair (tuple) is read as
        (connection, read) timeouts.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: overrides the auth settings from the spec for
        this single request.
    :type _request_auth: dict, optional
    :param _content_type: content type to force for the request; forwarded
        to the request serializer.
    :type _content_type: str, optional
    :param _headers: overrides the headers from the spec for this single
        request.
    :type _headers: dict, optional
    :param _host_index: overrides the host index from the spec for this
        single request; the annotation only admits ``0``.
    :type _host_index: int, optional
    :return: the object produced by ``api_client.response_deserialize``.
    """  # noqa: E501
    # HTTP status code -> model name handed to the deserializer.
    status_to_model: Dict[str, Optional[str]] = {
        "200": "CronWorkflows",
        "400": "APIErrors",
        "403": "APIErrors",
        "404": "APIErrors",
    }

    serialized_request = self._workflow_cron_get_serialize(
        tenant=tenant,
        cron_workflow=cron_workflow,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    raw_response = await self.api_client.call_api(
        *serialized_request, _request_timeout=_request_timeout
    )
    # Read the response before handing it to the deserializer.
    await raw_response.read()
    api_response = self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=status_to_model,
    )
    return api_response
@validate_call
async def workflow_cron_get_without_preload_content(
    self,
    tenant: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The tenant id"
        ),
    ],
    cron_workflow: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The cron job id"
        ),
    ],
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> RESTResponseType:
    """Get cron job workflow run

    Get a cron job workflow run for a tenant and return the raw response
    without reading or deserializing it.

    Responses declared for this operation: ``200`` -> ``CronWorkflows``;
    ``400``, ``403`` and ``404`` -> ``APIErrors``. They are listed for
    reference only, since this variant performs no deserialization.

    :param tenant: The tenant id (required)
    :type tenant: str
    :param cron_workflow: The cron job id (required)
    :type cron_workflow: str
    :param _request_timeout: timeout for this request. A single number is
        the total request timeout; a pair (tuple) is read as
        (connection, read) timeouts.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: overrides the auth settings from the spec for
        this single request.
    :type _request_auth: dict, optional
    :param _content_type: content type to force for the request; forwarded
        to the request serializer.
    :type _content_type: str, optional
    :param _headers: overrides the headers from the spec for this single
        request.
    :type _headers: dict, optional
    :param _host_index: overrides the host index from the spec for this
        single request; the annotation only admits ``0``.
    :type _host_index: int, optional
    :return: the ``response`` attribute of the object returned by
        ``api_client.call_api``.
    """  # noqa: E501
    _param = self._workflow_cron_get_serialize(
        tenant=tenant,
        cron_workflow=cron_workflow,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    # No response-type map is built: nothing is deserialized on this path.
    response_data = await self.api_client.call_api(
        *_param, _request_timeout=_request_timeout
    )
    return response_data.response
def _workflow_cron_get_serialize(
    self,
    tenant,
    cron_workflow,
    _request_auth,
    _content_type,
    _headers,
    _host_index,
) -> RequestSerialized:
    """Build the serialized GET request for a single cron workflow.

    The request targets
    ``/api/v1/tenants/{tenant}/workflows/crons/{cron-workflow}``.

    :param tenant: value for the ``{tenant}`` path segment; omitted from
        the path parameters when ``None``.
    :param cron_workflow: value for the ``{cron-workflow}`` path segment;
        omitted from the path parameters when ``None``.
    :param _request_auth: forwarded unchanged to
        ``api_client.param_serialize``.
    :param _content_type: accepted but not read by this serializer.
    :param _headers: optional extra request headers. The mapping is copied,
        so the caller's object is not modified when ``Accept`` is added.
    :param _host_index: accepted but not read by this serializer.
    :return: the value returned by ``self.api_client.param_serialize``.
    """
    _host = None

    _collection_formats: Dict[str, str] = {}

    _path_params: Dict[str, str] = {}
    _query_params: List[Tuple[str, str]] = []
    # Copy the caller's headers: `_headers or {}` would alias their dict and
    # the `Accept` default set below would leak back into it.
    _header_params: Dict[str, Optional[str]] = dict(_headers) if _headers else {}
    _form_params: List[Tuple[str, str]] = []
    _files: Dict[
        str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]]
    ] = {}
    _body_params: Optional[bytes] = None

    # process the path parameters
    if tenant is not None:
        _path_params["tenant"] = tenant
    if cron_workflow is not None:
        _path_params["cron-workflow"] = cron_workflow
    # this operation has no query, header, form, or body parameters

    # set the HTTP header `Accept` unless the caller already provided one
    if "Accept" not in _header_params:
        _header_params["Accept"] = self.api_client.select_header_accept(
            ["application/json"]
        )

    # authentication setting
    _auth_settings: List[str] = ["cookieAuth", "bearerAuth"]

    return self.api_client.param_serialize(
        method="GET",
        resource_path="/api/v1/tenants/{tenant}/workflows/crons/{cron-workflow}",
        path_params=_path_params,
        query_params=_query_params,
        header_params=_header_params,
        body=_body_params,
        post_params=_form_params,
        files=_files,
        auth_settings=_auth_settings,
        collection_formats=_collection_formats,
        _host=_host,
        _request_auth=_request_auth,
    )
@validate_call
async def workflow_delete(
    self,
    workflow: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The workflow id"
        ),
    ],
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> None:
    """Delete workflow

    Delete a workflow for a tenant and return only the deserialized
    payload (the ``204`` response maps to no model).

    :param workflow: The workflow id (required)
    :type workflow: str
    :param _request_timeout: timeout for this request. A single number is
        the total request timeout; a pair (tuple) is read as
        (connection, read) timeouts.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: overrides the auth settings from the spec for
        this single request.
    :type _request_auth: dict, optional
    :param _content_type: content type to force for the request; forwarded
        to the request serializer.
    :type _content_type: str, optional
    :param _headers: overrides the headers from the spec for this single
        request.
    :type _headers: dict, optional
    :param _host_index: overrides the host index from the spec for this
        single request; the annotation only admits ``0``.
    :type _host_index: int, optional
    :return: the ``data`` attribute of the deserialized response.
    """  # noqa: E501
    # HTTP status code -> model name handed to the deserializer.
    status_to_model: Dict[str, Optional[str]] = {
        "204": None,
        "400": "APIErrors",
        "403": "APIErrors",
        "404": "APIErrors",
    }

    serialized_request = self._workflow_delete_serialize(
        workflow=workflow,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    raw_response = await self.api_client.call_api(
        *serialized_request, _request_timeout=_request_timeout
    )
    # Read the response before handing it to the deserializer.
    await raw_response.read()
    deserialized = self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=status_to_model,
    )
    return deserialized.data
@validate_call
async def workflow_delete_with_http_info(
    self,
    workflow: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The workflow id"
        ),
    ],
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> ApiResponse[None]:
    """Delete workflow

    Delete a workflow for a tenant and return the full deserialized
    response object rather than just its ``data``.

    :param workflow: The workflow id (required)
    :type workflow: str
    :param _request_timeout: timeout for this request. A single number is
        the total request timeout; a pair (tuple) is read as
        (connection, read) timeouts.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: overrides the auth settings from the spec for
        this single request.
    :type _request_auth: dict, optional
    :param _content_type: content type to force for the request; forwarded
        to the request serializer.
    :type _content_type: str, optional
    :param _headers: overrides the headers from the spec for this single
        request.
    :type _headers: dict, optional
    :param _host_index: overrides the host index from the spec for this
        single request; the annotation only admits ``0``.
    :type _host_index: int, optional
    :return: the object produced by ``api_client.response_deserialize``.
    """  # noqa: E501
    # HTTP status code -> model name handed to the deserializer.
    status_to_model: Dict[str, Optional[str]] = {
        "204": None,
        "400": "APIErrors",
        "403": "APIErrors",
        "404": "APIErrors",
    }

    serialized_request = self._workflow_delete_serialize(
        workflow=workflow,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    raw_response = await self.api_client.call_api(
        *serialized_request, _request_timeout=_request_timeout
    )
    # Read the response before handing it to the deserializer.
    await raw_response.read()
    api_response = self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=status_to_model,
    )
    return api_response
@validate_call
async def workflow_delete_without_preload_content(
    self,
    workflow: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The workflow id"
        ),
    ],
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> RESTResponseType:
    """Delete workflow

    Delete a workflow for a tenant and return the raw response without
    reading or deserializing it.

    Responses declared for this operation: ``204`` -> no model; ``400``,
    ``403`` and ``404`` -> ``APIErrors``. They are listed for reference
    only, since this variant performs no deserialization.

    :param workflow: The workflow id (required)
    :type workflow: str
    :param _request_timeout: timeout for this request. A single number is
        the total request timeout; a pair (tuple) is read as
        (connection, read) timeouts.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: overrides the auth settings from the spec for
        this single request.
    :type _request_auth: dict, optional
    :param _content_type: content type to force for the request; forwarded
        to the request serializer.
    :type _content_type: str, optional
    :param _headers: overrides the headers from the spec for this single
        request.
    :type _headers: dict, optional
    :param _host_index: overrides the host index from the spec for this
        single request; the annotation only admits ``0``.
    :type _host_index: int, optional
    :return: the ``response`` attribute of the object returned by
        ``api_client.call_api``.
    """  # noqa: E501
    _param = self._workflow_delete_serialize(
        workflow=workflow,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    # No response-type map is built: nothing is deserialized on this path.
    response_data = await self.api_client.call_api(
        *_param, _request_timeout=_request_timeout
    )
    return response_data.response
def _workflow_delete_serialize(
    self,
    workflow,
    _request_auth,
    _content_type,
    _headers,
    _host_index,
) -> RequestSerialized:
    """Build the serialized DELETE request for a single workflow.

    The request targets ``/api/v1/workflows/{workflow}``.

    :param workflow: value for the ``{workflow}`` path segment; omitted
        from the path parameters when ``None``.
    :param _request_auth: forwarded unchanged to
        ``api_client.param_serialize``.
    :param _content_type: accepted but not read by this serializer.
    :param _headers: optional extra request headers. The mapping is copied,
        so the caller's object is not modified when ``Accept`` is added.
    :param _host_index: accepted but not read by this serializer.
    :return: the value returned by ``self.api_client.param_serialize``.
    """
    _host = None

    _collection_formats: Dict[str, str] = {}

    _path_params: Dict[str, str] = {}
    _query_params: List[Tuple[str, str]] = []
    # Copy the caller's headers: `_headers or {}` would alias their dict and
    # the `Accept` default set below would leak back into it.
    _header_params: Dict[str, Optional[str]] = dict(_headers) if _headers else {}
    _form_params: List[Tuple[str, str]] = []
    _files: Dict[
        str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]]
    ] = {}
    _body_params: Optional[bytes] = None

    # process the path parameters
    if workflow is not None:
        _path_params["workflow"] = workflow
    # this operation has no query, header, form, or body parameters

    # set the HTTP header `Accept` unless the caller already provided one
    if "Accept" not in _header_params:
        _header_params["Accept"] = self.api_client.select_header_accept(
            ["application/json"]
        )

    # authentication setting
    _auth_settings: List[str] = ["cookieAuth", "bearerAuth"]

    return self.api_client.param_serialize(
        method="DELETE",
        resource_path="/api/v1/workflows/{workflow}",
        path_params=_path_params,
        query_params=_query_params,
        header_params=_header_params,
        body=_body_params,
        post_params=_form_params,
        files=_files,
        auth_settings=_auth_settings,
        collection_formats=_collection_formats,
        _host=_host,
        _request_auth=_request_auth,
    )
@validate_call
async def workflow_get(
    self,
    workflow: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The workflow id"
        ),
    ],
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> Workflow:
    """Get workflow

    Get a workflow for a tenant and return only the deserialized payload.

    :param workflow: The workflow id (required)
    :type workflow: str
    :param _request_timeout: timeout for this request. A single number is
        the total request timeout; a pair (tuple) is read as
        (connection, read) timeouts.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: overrides the auth settings from the spec for
        this single request.
    :type _request_auth: dict, optional
    :param _content_type: content type to force for the request; forwarded
        to the request serializer.
    :type _content_type: str, optional
    :param _headers: overrides the headers from the spec for this single
        request.
    :type _headers: dict, optional
    :param _host_index: overrides the host index from the spec for this
        single request; the annotation only admits ``0``.
    :type _host_index: int, optional
    :return: the ``data`` attribute of the deserialized response.
    """  # noqa: E501
    # HTTP status code -> model name handed to the deserializer.
    status_to_model: Dict[str, Optional[str]] = {
        "200": "Workflow",
        "400": "APIErrors",
        "403": "APIErrors",
        "404": "APIErrors",
    }

    serialized_request = self._workflow_get_serialize(
        workflow=workflow,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    raw_response = await self.api_client.call_api(
        *serialized_request, _request_timeout=_request_timeout
    )
    # Read the response before handing it to the deserializer.
    await raw_response.read()
    deserialized = self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=status_to_model,
    )
    return deserialized.data
@validate_call
async def workflow_get_with_http_info(
    self,
    workflow: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The workflow id"
        ),
    ],
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> ApiResponse[Workflow]:
    """Get workflow

    Get a workflow for a tenant and return the full deserialized response
    object rather than just its ``data``.

    :param workflow: The workflow id (required)
    :type workflow: str
    :param _request_timeout: timeout for this request. A single number is
        the total request timeout; a pair (tuple) is read as
        (connection, read) timeouts.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: overrides the auth settings from the spec for
        this single request.
    :type _request_auth: dict, optional
    :param _content_type: content type to force for the request; forwarded
        to the request serializer.
    :type _content_type: str, optional
    :param _headers: overrides the headers from the spec for this single
        request.
    :type _headers: dict, optional
    :param _host_index: overrides the host index from the spec for this
        single request; the annotation only admits ``0``.
    :type _host_index: int, optional
    :return: the object produced by ``api_client.response_deserialize``.
    """  # noqa: E501
    # HTTP status code -> model name handed to the deserializer.
    status_to_model: Dict[str, Optional[str]] = {
        "200": "Workflow",
        "400": "APIErrors",
        "403": "APIErrors",
        "404": "APIErrors",
    }

    serialized_request = self._workflow_get_serialize(
        workflow=workflow,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    raw_response = await self.api_client.call_api(
        *serialized_request, _request_timeout=_request_timeout
    )
    # Read the response before handing it to the deserializer.
    await raw_response.read()
    api_response = self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=status_to_model,
    )
    return api_response
@validate_call
async def workflow_get_without_preload_content(
    self,
    workflow: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The workflow id"
        ),
    ],
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> RESTResponseType:
    """Get workflow

    Get a workflow for a tenant and return the raw response without
    reading or deserializing it.

    Responses declared for this operation: ``200`` -> ``Workflow``;
    ``400``, ``403`` and ``404`` -> ``APIErrors``. They are listed for
    reference only, since this variant performs no deserialization.

    :param workflow: The workflow id (required)
    :type workflow: str
    :param _request_timeout: timeout for this request. A single number is
        the total request timeout; a pair (tuple) is read as
        (connection, read) timeouts.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: overrides the auth settings from the spec for
        this single request.
    :type _request_auth: dict, optional
    :param _content_type: content type to force for the request; forwarded
        to the request serializer.
    :type _content_type: str, optional
    :param _headers: overrides the headers from the spec for this single
        request.
    :type _headers: dict, optional
    :param _host_index: overrides the host index from the spec for this
        single request; the annotation only admits ``0``.
    :type _host_index: int, optional
    :return: the ``response`` attribute of the object returned by
        ``api_client.call_api``.
    """  # noqa: E501
    _param = self._workflow_get_serialize(
        workflow=workflow,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    # No response-type map is built: nothing is deserialized on this path.
    response_data = await self.api_client.call_api(
        *_param, _request_timeout=_request_timeout
    )
    return response_data.response
def _workflow_get_serialize(
    self,
    workflow,
    _request_auth,
    _content_type,
    _headers,
    _host_index,
) -> RequestSerialized:
    """Build the serialized GET request for a single workflow.

    The request targets ``/api/v1/workflows/{workflow}``.

    :param workflow: value for the ``{workflow}`` path segment; omitted
        from the path parameters when ``None``.
    :param _request_auth: forwarded unchanged to
        ``api_client.param_serialize``.
    :param _content_type: accepted but not read by this serializer.
    :param _headers: optional extra request headers. The mapping is copied,
        so the caller's object is not modified when ``Accept`` is added.
    :param _host_index: accepted but not read by this serializer.
    :return: the value returned by ``self.api_client.param_serialize``.
    """
    _host = None

    _collection_formats: Dict[str, str] = {}

    _path_params: Dict[str, str] = {}
    _query_params: List[Tuple[str, str]] = []
    # Copy the caller's headers: `_headers or {}` would alias their dict and
    # the `Accept` default set below would leak back into it.
    _header_params: Dict[str, Optional[str]] = dict(_headers) if _headers else {}
    _form_params: List[Tuple[str, str]] = []
    _files: Dict[
        str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]]
    ] = {}
    _body_params: Optional[bytes] = None

    # process the path parameters
    if workflow is not None:
        _path_params["workflow"] = workflow
    # this operation has no query, header, form, or body parameters

    # set the HTTP header `Accept` unless the caller already provided one
    if "Accept" not in _header_params:
        _header_params["Accept"] = self.api_client.select_header_accept(
            ["application/json"]
        )

    # authentication setting
    _auth_settings: List[str] = ["cookieAuth", "bearerAuth"]

    return self.api_client.param_serialize(
        method="GET",
        resource_path="/api/v1/workflows/{workflow}",
        path_params=_path_params,
        query_params=_query_params,
        header_params=_header_params,
        body=_body_params,
        post_params=_form_params,
        files=_files,
        auth_settings=_auth_settings,
        collection_formats=_collection_formats,
        _host=_host,
        _request_auth=_request_auth,
    )
@validate_call
async def workflow_get_metrics(
    self,
    workflow: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The workflow id"
        ),
    ],
    status: Annotated[
        Optional[WorkflowRunStatus],
        Field(description="A status of workflow run statuses to filter by"),
    ] = None,
    group_key: Annotated[
        Optional[StrictStr], Field(description="A group key to filter metrics by")
    ] = None,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> WorkflowMetrics:
    """Get workflow metrics

    Get the metrics for a workflow version and return only the
    deserialized payload.

    :param workflow: The workflow id (required)
    :type workflow: str
    :param status: A workflow run status to filter by
    :type status: WorkflowRunStatus
    :param group_key: A group key to filter metrics by
    :type group_key: str
    :param _request_timeout: timeout for this request. A single number is
        the total request timeout; a pair (tuple) is read as
        (connection, read) timeouts.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: overrides the auth settings from the spec for
        this single request.
    :type _request_auth: dict, optional
    :param _content_type: content type to force for the request; forwarded
        to the request serializer.
    :type _content_type: str, optional
    :param _headers: overrides the headers from the spec for this single
        request.
    :type _headers: dict, optional
    :param _host_index: overrides the host index from the spec for this
        single request; the annotation only admits ``0``.
    :type _host_index: int, optional
    :return: the ``data`` attribute of the deserialized response.
    """  # noqa: E501
    # HTTP status code -> model name handed to the deserializer.
    status_to_model: Dict[str, Optional[str]] = {
        "200": "WorkflowMetrics",
        "400": "APIErrors",
        "403": "APIErrors",
        "404": "APIErrors",
    }

    serialized_request = self._workflow_get_metrics_serialize(
        workflow=workflow,
        status=status,
        group_key=group_key,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    raw_response = await self.api_client.call_api(
        *serialized_request, _request_timeout=_request_timeout
    )
    # Read the response before handing it to the deserializer.
    await raw_response.read()
    deserialized = self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=status_to_model,
    )
    return deserialized.data
@validate_call
async def workflow_get_metrics_with_http_info(
    self,
    workflow: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The workflow id"
        ),
    ],
    status: Annotated[
        Optional[WorkflowRunStatus],
        Field(description="A status of workflow run statuses to filter by"),
    ] = None,
    group_key: Annotated[
        Optional[StrictStr], Field(description="A group key to filter metrics by")
    ] = None,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> ApiResponse[WorkflowMetrics]:
    """Get workflow metrics

    Get the metrics for a workflow version and return the full
    deserialized response object rather than just its ``data``.

    :param workflow: The workflow id (required)
    :type workflow: str
    :param status: A workflow run status to filter by
    :type status: WorkflowRunStatus
    :param group_key: A group key to filter metrics by
    :type group_key: str
    :param _request_timeout: timeout for this request. A single number is
        the total request timeout; a pair (tuple) is read as
        (connection, read) timeouts.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: overrides the auth settings from the spec for
        this single request.
    :type _request_auth: dict, optional
    :param _content_type: content type to force for the request; forwarded
        to the request serializer.
    :type _content_type: str, optional
    :param _headers: overrides the headers from the spec for this single
        request.
    :type _headers: dict, optional
    :param _host_index: overrides the host index from the spec for this
        single request; the annotation only admits ``0``.
    :type _host_index: int, optional
    :return: the object produced by ``api_client.response_deserialize``.
    """  # noqa: E501
    # HTTP status code -> model name handed to the deserializer.
    status_to_model: Dict[str, Optional[str]] = {
        "200": "WorkflowMetrics",
        "400": "APIErrors",
        "403": "APIErrors",
        "404": "APIErrors",
    }

    serialized_request = self._workflow_get_metrics_serialize(
        workflow=workflow,
        status=status,
        group_key=group_key,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    raw_response = await self.api_client.call_api(
        *serialized_request, _request_timeout=_request_timeout
    )
    # Read the response before handing it to the deserializer.
    await raw_response.read()
    api_response = self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=status_to_model,
    )
    return api_response
@validate_call
async def workflow_get_metrics_without_preload_content(
    self,
    workflow: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The workflow id"
        ),
    ],
    status: Annotated[
        Optional[WorkflowRunStatus],
        Field(description="A status of workflow run statuses to filter by"),
    ] = None,
    group_key: Annotated[
        Optional[StrictStr], Field(description="A group key to filter metrics by")
    ] = None,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> RESTResponseType:
    """Get workflow metrics

    Get the metrics for a workflow version and return the raw response
    without reading or deserializing it.

    Responses declared for this operation: ``200`` -> ``WorkflowMetrics``;
    ``400``, ``403`` and ``404`` -> ``APIErrors``. They are listed for
    reference only, since this variant performs no deserialization.

    :param workflow: The workflow id (required)
    :type workflow: str
    :param status: A workflow run status to filter by
    :type status: WorkflowRunStatus
    :param group_key: A group key to filter metrics by
    :type group_key: str
    :param _request_timeout: timeout for this request. A single number is
        the total request timeout; a pair (tuple) is read as
        (connection, read) timeouts.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: overrides the auth settings from the spec for
        this single request.
    :type _request_auth: dict, optional
    :param _content_type: content type to force for the request; forwarded
        to the request serializer.
    :type _content_type: str, optional
    :param _headers: overrides the headers from the spec for this single
        request.
    :type _headers: dict, optional
    :param _host_index: overrides the host index from the spec for this
        single request; the annotation only admits ``0``.
    :type _host_index: int, optional
    :return: the ``response`` attribute of the object returned by
        ``api_client.call_api``.
    """  # noqa: E501
    _param = self._workflow_get_metrics_serialize(
        workflow=workflow,
        status=status,
        group_key=group_key,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    # No response-type map is built: nothing is deserialized on this path.
    response_data = await self.api_client.call_api(
        *_param, _request_timeout=_request_timeout
    )
    return response_data.response
def _workflow_get_metrics_serialize(
    self,
    workflow,
    status,
    group_key,
    _request_auth,
    _content_type,
    _headers,
    _host_index,
) -> RequestSerialized:
    """Build the serialized GET request for a workflow's metrics.

    Targets ``/api/v1/workflows/{workflow}/metrics``. ``_content_type`` and
    ``_host_index`` are accepted but are not read by this method.

    :param workflow: value substituted for ``{workflow}``; omitted when None
    :param status: optional filter; its ``.value`` is sent as ``status``
    :param group_key: optional filter sent as ``groupKey``
    :param _request_auth: passed through unchanged to ``param_serialize``
    :param _headers: caller-supplied headers; when truthy the same mapping is
        used (not copied) and receives an ``Accept`` entry if it has none
    :return: the value returned by ``self.api_client.param_serialize``
    """
    # Reuse the caller's mapping when it is truthy, otherwise start empty.
    headers: Dict[str, Optional[str]] = _headers or {}

    path_values: Dict[str, str] = {}
    if workflow is not None:
        path_values["workflow"] = workflow

    # ``status`` is expected to expose ``.value``; only touched when given.
    query_pairs: List[Tuple[str, str]] = (
        [] if status is None else [("status", status.value)]
    )
    if group_key is not None:
        query_pairs.append(("groupKey", group_key))

    # Only negotiate an Accept header when the caller did not provide one.
    if "Accept" not in headers:
        headers["Accept"] = self.api_client.select_header_accept(
            ["application/json"]
        )

    # This endpoint has no body, form fields or file uploads.
    return self.api_client.param_serialize(
        method="GET",
        resource_path="/api/v1/workflows/{workflow}/metrics",
        path_params=path_values,
        query_params=query_pairs,
        header_params=headers,
        body=None,
        post_params=[],
        files={},
        auth_settings=["cookieAuth", "bearerAuth"],
        collection_formats={},
        _host=None,
        _request_auth=_request_auth,
    )
@validate_call
async def workflow_get_workers_count(
    self,
    tenant: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The tenant id"
        ),
    ],
    workflow: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The workflow id"
        ),
    ],
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> WorkflowWorkersCount:
    """Get workflow worker count

    Get a count of the workers available for a workflow and return only the
    deserialized payload.

    The request is built by ``_workflow_get_workers_count_serialize``, sent
    via ``self.api_client.call_api``, read in full, and deserialized with a
    status-code-to-model map (200 -> ``WorkflowWorkersCount``, 400/403 ->
    ``APIErrors``).

    :param tenant: The tenant id (required); exactly 36 characters
    :type tenant: str
    :param workflow: The workflow id (required); exactly 36 characters
    :type workflow: str
    :param _request_timeout: timeout for this request: a single positive
                             number (total request timeout) or a
                             (connection, read) pair.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: per-request override of the auth settings.
    :type _request_auth: dict, optional
    :param _content_type: forced content type for the request.
    :type _content_type: str, optional
    :param _headers: per-request override of the headers.
    :type _headers: dict, optional
    :param _host_index: per-request host index; the annotation admits only 0.
    :type _host_index: int, optional
    :return: the ``data`` attribute of the deserialized response.
    """  # noqa: E501
    request_parts = self._workflow_get_workers_count_serialize(
        tenant=tenant,
        workflow=workflow,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    raw_response = await self.api_client.call_api(
        *request_parts, _request_timeout=_request_timeout
    )
    # The body must be fully read before it can be deserialized.
    await raw_response.read()

    models_by_status: Dict[str, Optional[str]] = {
        "200": "WorkflowWorkersCount",
        "400": "APIErrors",
        "403": "APIErrors",
    }
    deserialized = self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=models_by_status,
    )
    return deserialized.data
@validate_call
async def workflow_get_workers_count_with_http_info(
    self,
    tenant: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The tenant id"
        ),
    ],
    workflow: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The workflow id"
        ),
    ],
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> ApiResponse[WorkflowWorkersCount]:
    """Get workflow worker count

    Get a count of the workers available for a workflow and return the whole
    deserialized response wrapper rather than just its ``data``.

    The request is built by ``_workflow_get_workers_count_serialize``, sent
    via ``self.api_client.call_api``, read in full, and deserialized with a
    status-code-to-model map (200 -> ``WorkflowWorkersCount``, 400/403 ->
    ``APIErrors``).

    :param tenant: The tenant id (required); exactly 36 characters
    :type tenant: str
    :param workflow: The workflow id (required); exactly 36 characters
    :type workflow: str
    :param _request_timeout: timeout for this request: a single positive
                             number (total request timeout) or a
                             (connection, read) pair.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: per-request override of the auth settings.
    :type _request_auth: dict, optional
    :param _content_type: forced content type for the request.
    :type _content_type: str, optional
    :param _headers: per-request override of the headers.
    :type _headers: dict, optional
    :param _host_index: per-request host index; the annotation admits only 0.
    :type _host_index: int, optional
    :return: the object produced by ``api_client.response_deserialize``.
    """  # noqa: E501
    request_parts = self._workflow_get_workers_count_serialize(
        tenant=tenant,
        workflow=workflow,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    raw_response = await self.api_client.call_api(
        *request_parts, _request_timeout=_request_timeout
    )
    # The body must be fully read before it can be deserialized.
    await raw_response.read()

    models_by_status: Dict[str, Optional[str]] = {
        "200": "WorkflowWorkersCount",
        "400": "APIErrors",
        "403": "APIErrors",
    }
    wrapped = self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=models_by_status,
    )
    return wrapped
@validate_call
async def workflow_get_workers_count_without_preload_content(
    self,
    tenant: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The tenant id"
        ),
    ],
    workflow: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The workflow id"
        ),
    ],
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> RESTResponseType:
    """Get workflow worker count

    Get a count of the workers available for a workflow, returning the raw
    response without reading or deserializing its body.

    Unlike the sibling variants, this method does not call ``read()`` on the
    response and does not map status codes to models; interpreting the
    response is left to the caller.

    :param tenant: The tenant id (required); exactly 36 characters
    :type tenant: str
    :param workflow: The workflow id (required); exactly 36 characters
    :type workflow: str
    :param _request_timeout: timeout for this request: a single positive
                             number (total request timeout) or a
                             (connection, read) pair.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: per-request override of the auth settings.
    :type _request_auth: dict, optional
    :param _content_type: forced content type for the request.
    :type _content_type: str, optional
    :param _headers: per-request override of the headers.
    :type _headers: dict, optional
    :param _host_index: per-request host index; the annotation admits only 0.
    :type _host_index: int, optional
    :return: the ``response`` attribute of the object returned by
             ``api_client.call_api``.
    """  # noqa: E501
    request_parts = self._workflow_get_workers_count_serialize(
        tenant=tenant,
        workflow=workflow,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    # No status-to-model map is needed: nothing is deserialized here.
    raw_response = await self.api_client.call_api(
        *request_parts, _request_timeout=_request_timeout
    )
    return raw_response.response
def _workflow_get_workers_count_serialize(
    self,
    tenant,
    workflow,
    _request_auth,
    _content_type,
    _headers,
    _host_index,
) -> RequestSerialized:
    """Build the serialized GET request for a workflow's worker count.

    Targets ``/api/v1/tenants/{tenant}/workflows/{workflow}/worker-count``.
    ``_content_type`` and ``_host_index`` are accepted but are not read by
    this method. The endpoint takes no query parameters.

    :param tenant: value substituted for ``{tenant}``; omitted when None
    :param workflow: value substituted for ``{workflow}``; omitted when None
    :param _request_auth: passed through unchanged to ``param_serialize``
    :param _headers: caller-supplied headers; when truthy the same mapping is
        used (not copied) and receives an ``Accept`` entry if it has none
    :return: the value returned by ``self.api_client.param_serialize``
    """
    # Reuse the caller's mapping when it is truthy, otherwise start empty.
    headers: Dict[str, Optional[str]] = _headers or {}

    # Keep only the path placeholders that were actually supplied.
    path_values: Dict[str, str] = {
        placeholder: value
        for placeholder, value in (("tenant", tenant), ("workflow", workflow))
        if value is not None
    }

    # Only negotiate an Accept header when the caller did not provide one.
    if "Accept" not in headers:
        headers["Accept"] = self.api_client.select_header_accept(
            ["application/json"]
        )

    # This endpoint has no query string, body, form fields or file uploads.
    return self.api_client.param_serialize(
        method="GET",
        resource_path="/api/v1/tenants/{tenant}/workflows/{workflow}/worker-count",
        path_params=path_values,
        query_params=[],
        header_params=headers,
        body=None,
        post_params=[],
        files={},
        auth_settings=["cookieAuth", "bearerAuth"],
        collection_formats={},
        _host=None,
        _request_auth=_request_auth,
    )
@validate_call
async def workflow_list(
    self,
    tenant: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The tenant id"
        ),
    ],
    offset: Annotated[
        Optional[StrictInt], Field(description="The number to skip")
    ] = None,
    limit: Annotated[
        Optional[StrictInt], Field(description="The number to limit by")
    ] = None,
    name: Annotated[
        Optional[StrictStr], Field(description="Search by name")
    ] = None,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> WorkflowList:
    """Get workflows

    Get all workflows for a tenant and return only the deserialized payload.

    The request is built by ``_workflow_list_serialize``, sent via
    ``self.api_client.call_api``, read in full, and deserialized with a
    status-code-to-model map (200 -> ``WorkflowList``, 400/403 ->
    ``APIErrors``).

    :param tenant: The tenant id (required); exactly 36 characters
    :type tenant: str
    :param offset: The number to skip
    :type offset: int
    :param limit: The number to limit by
    :type limit: int
    :param name: Search by name
    :type name: str
    :param _request_timeout: timeout for this request: a single positive
                             number (total request timeout) or a
                             (connection, read) pair.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: per-request override of the auth settings.
    :type _request_auth: dict, optional
    :param _content_type: forced content type for the request.
    :type _content_type: str, optional
    :param _headers: per-request override of the headers.
    :type _headers: dict, optional
    :param _host_index: per-request host index; the annotation admits only 0.
    :type _host_index: int, optional
    :return: the ``data`` attribute of the deserialized response.
    """  # noqa: E501
    request_parts = self._workflow_list_serialize(
        tenant=tenant,
        offset=offset,
        limit=limit,
        name=name,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    raw_response = await self.api_client.call_api(
        *request_parts, _request_timeout=_request_timeout
    )
    # The body must be fully read before it can be deserialized.
    await raw_response.read()

    models_by_status: Dict[str, Optional[str]] = {
        "200": "WorkflowList",
        "400": "APIErrors",
        "403": "APIErrors",
    }
    deserialized = self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=models_by_status,
    )
    return deserialized.data
@validate_call
async def workflow_list_with_http_info(
    self,
    tenant: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The tenant id"
        ),
    ],
    offset: Annotated[
        Optional[StrictInt], Field(description="The number to skip")
    ] = None,
    limit: Annotated[
        Optional[StrictInt], Field(description="The number to limit by")
    ] = None,
    name: Annotated[
        Optional[StrictStr], Field(description="Search by name")
    ] = None,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> ApiResponse[WorkflowList]:
    """Get workflows

    Get all workflows for a tenant and return the whole deserialized response
    wrapper rather than just its ``data``.

    The request is built by ``_workflow_list_serialize``, sent via
    ``self.api_client.call_api``, read in full, and deserialized with a
    status-code-to-model map (200 -> ``WorkflowList``, 400/403 ->
    ``APIErrors``).

    :param tenant: The tenant id (required); exactly 36 characters
    :type tenant: str
    :param offset: The number to skip
    :type offset: int
    :param limit: The number to limit by
    :type limit: int
    :param name: Search by name
    :type name: str
    :param _request_timeout: timeout for this request: a single positive
                             number (total request timeout) or a
                             (connection, read) pair.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: per-request override of the auth settings.
    :type _request_auth: dict, optional
    :param _content_type: forced content type for the request.
    :type _content_type: str, optional
    :param _headers: per-request override of the headers.
    :type _headers: dict, optional
    :param _host_index: per-request host index; the annotation admits only 0.
    :type _host_index: int, optional
    :return: the object produced by ``api_client.response_deserialize``.
    """  # noqa: E501
    request_parts = self._workflow_list_serialize(
        tenant=tenant,
        offset=offset,
        limit=limit,
        name=name,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    raw_response = await self.api_client.call_api(
        *request_parts, _request_timeout=_request_timeout
    )
    # The body must be fully read before it can be deserialized.
    await raw_response.read()

    models_by_status: Dict[str, Optional[str]] = {
        "200": "WorkflowList",
        "400": "APIErrors",
        "403": "APIErrors",
    }
    wrapped = self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=models_by_status,
    )
    return wrapped
@validate_call
async def workflow_list_without_preload_content(
    self,
    tenant: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The tenant id"
        ),
    ],
    offset: Annotated[
        Optional[StrictInt], Field(description="The number to skip")
    ] = None,
    limit: Annotated[
        Optional[StrictInt], Field(description="The number to limit by")
    ] = None,
    name: Annotated[
        Optional[StrictStr], Field(description="Search by name")
    ] = None,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> RESTResponseType:
    """Get workflows

    Get all workflows for a tenant, returning the raw response without
    reading or deserializing its body.

    Unlike the sibling variants, this method does not call ``read()`` on the
    response and does not map status codes to models; interpreting the
    response is left to the caller.

    :param tenant: The tenant id (required); exactly 36 characters
    :type tenant: str
    :param offset: The number to skip
    :type offset: int
    :param limit: The number to limit by
    :type limit: int
    :param name: Search by name
    :type name: str
    :param _request_timeout: timeout for this request: a single positive
                             number (total request timeout) or a
                             (connection, read) pair.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: per-request override of the auth settings.
    :type _request_auth: dict, optional
    :param _content_type: forced content type for the request.
    :type _content_type: str, optional
    :param _headers: per-request override of the headers.
    :type _headers: dict, optional
    :param _host_index: per-request host index; the annotation admits only 0.
    :type _host_index: int, optional
    :return: the ``response`` attribute of the object returned by
             ``api_client.call_api``.
    """  # noqa: E501
    request_parts = self._workflow_list_serialize(
        tenant=tenant,
        offset=offset,
        limit=limit,
        name=name,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    # No status-to-model map is needed: nothing is deserialized here.
    raw_response = await self.api_client.call_api(
        *request_parts, _request_timeout=_request_timeout
    )
    return raw_response.response
def _workflow_list_serialize(
    self,
    tenant,
    offset,
    limit,
    name,
    _request_auth,
    _content_type,
    _headers,
    _host_index,
) -> RequestSerialized:
    """Build the serialized GET request that lists a tenant's workflows.

    Targets ``/api/v1/tenants/{tenant}/workflows``. ``_content_type`` and
    ``_host_index`` are accepted but are not read by this method.

    :param tenant: value substituted for ``{tenant}``; omitted when None
    :param offset: optional value sent as the ``offset`` query parameter
    :param limit: optional value sent as the ``limit`` query parameter
    :param name: optional value sent as the ``name`` query parameter
    :param _request_auth: passed through unchanged to ``param_serialize``
    :param _headers: caller-supplied headers; when truthy the same mapping is
        used (not copied) and receives an ``Accept`` entry if it has none
    :return: the value returned by ``self.api_client.param_serialize``
    """
    # Reuse the caller's mapping when it is truthy, otherwise start empty.
    headers: Dict[str, Optional[str]] = _headers or {}

    path_values: Dict[str, str] = {}
    if tenant is not None:
        path_values["tenant"] = tenant

    # Values are appended as given (no string conversion happens here),
    # in the fixed order offset, limit, name.
    query_pairs: List[Tuple[str, str]] = [
        (key, value)
        for key, value in (("offset", offset), ("limit", limit), ("name", name))
        if value is not None
    ]

    # Only negotiate an Accept header when the caller did not provide one.
    if "Accept" not in headers:
        headers["Accept"] = self.api_client.select_header_accept(
            ["application/json"]
        )

    # This endpoint has no body, form fields or file uploads.
    return self.api_client.param_serialize(
        method="GET",
        resource_path="/api/v1/tenants/{tenant}/workflows",
        path_params=path_values,
        query_params=query_pairs,
        header_params=headers,
        body=None,
        post_params=[],
        files={},
        auth_settings=["cookieAuth", "bearerAuth"],
        collection_formats={},
        _host=None,
        _request_auth=_request_auth,
    )
@validate_call
async def workflow_run_get(
    self,
    tenant: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The tenant id"
        ),
    ],
    workflow_run: Annotated[
        str,
        Field(
            min_length=36,
            strict=True,
            max_length=36,
            description="The workflow run id",
        ),
    ],
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> WorkflowRun:
    """Get workflow run

    Get a workflow run for a tenant and return only the deserialized payload.

    The request is built by ``_workflow_run_get_serialize``, sent via
    ``self.api_client.call_api``, read in full, and deserialized with a
    status-code-to-model map (200 -> ``WorkflowRun``, 400/403 ->
    ``APIErrors``).

    :param tenant: The tenant id (required); exactly 36 characters
    :type tenant: str
    :param workflow_run: The workflow run id (required); exactly 36 characters
    :type workflow_run: str
    :param _request_timeout: timeout for this request: a single positive
                             number (total request timeout) or a
                             (connection, read) pair.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: per-request override of the auth settings.
    :type _request_auth: dict, optional
    :param _content_type: forced content type for the request.
    :type _content_type: str, optional
    :param _headers: per-request override of the headers.
    :type _headers: dict, optional
    :param _host_index: per-request host index; the annotation admits only 0.
    :type _host_index: int, optional
    :return: the ``data`` attribute of the deserialized response.
    """  # noqa: E501
    request_parts = self._workflow_run_get_serialize(
        tenant=tenant,
        workflow_run=workflow_run,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    raw_response = await self.api_client.call_api(
        *request_parts, _request_timeout=_request_timeout
    )
    # The body must be fully read before it can be deserialized.
    await raw_response.read()

    models_by_status: Dict[str, Optional[str]] = {
        "200": "WorkflowRun",
        "400": "APIErrors",
        "403": "APIErrors",
    }
    deserialized = self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=models_by_status,
    )
    return deserialized.data
@validate_call
async def workflow_run_get_with_http_info(
    self,
    tenant: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The tenant id"
        ),
    ],
    workflow_run: Annotated[
        str,
        Field(
            min_length=36,
            strict=True,
            max_length=36,
            description="The workflow run id",
        ),
    ],
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> ApiResponse[WorkflowRun]:
    """Get workflow run

    Get a workflow run for a tenant and return the whole deserialized
    response wrapper rather than just its ``data``.

    The request is built by ``_workflow_run_get_serialize``, sent via
    ``self.api_client.call_api``, read in full, and deserialized with a
    status-code-to-model map (200 -> ``WorkflowRun``, 400/403 ->
    ``APIErrors``).

    :param tenant: The tenant id (required); exactly 36 characters
    :type tenant: str
    :param workflow_run: The workflow run id (required); exactly 36 characters
    :type workflow_run: str
    :param _request_timeout: timeout for this request: a single positive
                             number (total request timeout) or a
                             (connection, read) pair.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: per-request override of the auth settings.
    :type _request_auth: dict, optional
    :param _content_type: forced content type for the request.
    :type _content_type: str, optional
    :param _headers: per-request override of the headers.
    :type _headers: dict, optional
    :param _host_index: per-request host index; the annotation admits only 0.
    :type _host_index: int, optional
    :return: the object produced by ``api_client.response_deserialize``.
    """  # noqa: E501
    request_parts = self._workflow_run_get_serialize(
        tenant=tenant,
        workflow_run=workflow_run,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    raw_response = await self.api_client.call_api(
        *request_parts, _request_timeout=_request_timeout
    )
    # The body must be fully read before it can be deserialized.
    await raw_response.read()

    models_by_status: Dict[str, Optional[str]] = {
        "200": "WorkflowRun",
        "400": "APIErrors",
        "403": "APIErrors",
    }
    wrapped = self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=models_by_status,
    )
    return wrapped
@validate_call
async def workflow_run_get_without_preload_content(
    self,
    tenant: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The tenant id"
        ),
    ],
    workflow_run: Annotated[
        str,
        Field(
            min_length=36,
            strict=True,
            max_length=36,
            description="The workflow run id",
        ),
    ],
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> RESTResponseType:
    """Get workflow run

    Get a workflow run for a tenant, returning the raw response without
    reading or deserializing its body.

    Unlike the sibling variants, this method does not call ``read()`` on the
    response and does not map status codes to models; interpreting the
    response is left to the caller.

    :param tenant: The tenant id (required); exactly 36 characters
    :type tenant: str
    :param workflow_run: The workflow run id (required); exactly 36 characters
    :type workflow_run: str
    :param _request_timeout: timeout for this request: a single positive
                             number (total request timeout) or a
                             (connection, read) pair.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: per-request override of the auth settings.
    :type _request_auth: dict, optional
    :param _content_type: forced content type for the request.
    :type _content_type: str, optional
    :param _headers: per-request override of the headers.
    :type _headers: dict, optional
    :param _host_index: per-request host index; the annotation admits only 0.
    :type _host_index: int, optional
    :return: the ``response`` attribute of the object returned by
             ``api_client.call_api``.
    """  # noqa: E501
    request_parts = self._workflow_run_get_serialize(
        tenant=tenant,
        workflow_run=workflow_run,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    # No status-to-model map is needed: nothing is deserialized here.
    raw_response = await self.api_client.call_api(
        *request_parts, _request_timeout=_request_timeout
    )
    return raw_response.response
def _workflow_run_get_serialize(
    self,
    tenant,
    workflow_run,
    _request_auth,
    _content_type,
    _headers,
    _host_index,
) -> RequestSerialized:
    """Build the serialized GET request for a single workflow run.

    Targets ``/api/v1/tenants/{tenant}/workflow-runs/{workflow-run}``.
    ``_content_type`` and ``_host_index`` are accepted but are not read by
    this method. The endpoint takes no query parameters.

    :param tenant: value substituted for ``{tenant}``; omitted when None
    :param workflow_run: value substituted for ``{workflow-run}`` (note the
        hyphenated placeholder name); omitted when None
    :param _request_auth: passed through unchanged to ``param_serialize``
    :param _headers: caller-supplied headers; when truthy the same mapping is
        used (not copied) and receives an ``Accept`` entry if it has none
    :return: the value returned by ``self.api_client.param_serialize``
    """
    # Reuse the caller's mapping when it is truthy, otherwise start empty.
    headers: Dict[str, Optional[str]] = _headers or {}

    # Keep only the path placeholders that were actually supplied.
    path_values: Dict[str, str] = {
        placeholder: value
        for placeholder, value in (
            ("tenant", tenant),
            ("workflow-run", workflow_run),
        )
        if value is not None
    }

    # Only negotiate an Accept header when the caller did not provide one.
    if "Accept" not in headers:
        headers["Accept"] = self.api_client.select_header_accept(
            ["application/json"]
        )

    # This endpoint has no query string, body, form fields or file uploads.
    return self.api_client.param_serialize(
        method="GET",
        resource_path="/api/v1/tenants/{tenant}/workflow-runs/{workflow-run}",
        path_params=path_values,
        query_params=[],
        header_params=headers,
        body=None,
        post_params=[],
        files={},
        auth_settings=["cookieAuth", "bearerAuth"],
        collection_formats={},
        _host=None,
        _request_auth=_request_auth,
    )
@validate_call
async def workflow_run_get_metrics(
    self,
    tenant: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The tenant id"
        ),
    ],
    event_id: Annotated[
        Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]],
        Field(description="The event id to get runs for."),
    ] = None,
    workflow_id: Annotated[
        Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]],
        Field(description="The workflow id to get runs for."),
    ] = None,
    parent_workflow_run_id: Annotated[
        Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]],
        Field(description="The parent workflow run id"),
    ] = None,
    parent_step_run_id: Annotated[
        Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]],
        Field(description="The parent step run id"),
    ] = None,
    additional_metadata: Annotated[
        Optional[List[StrictStr]],
        Field(description="A list of metadata key value pairs to filter by"),
    ] = None,
    created_after: Annotated[
        Optional[datetime],
        Field(description="The time after the workflow run was created"),
    ] = None,
    created_before: Annotated[
        Optional[datetime],
        Field(description="The time before the workflow run was created"),
    ] = None,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> WorkflowRunsMetrics:
    """Get workflow runs metrics

    Get a summary of workflow run metrics for a tenant and return only the
    deserialized payload.

    The request is built by ``_workflow_run_get_metrics_serialize``, sent via
    ``self.api_client.call_api``, read in full, and deserialized with a
    status-code-to-model map (200 -> ``WorkflowRunsMetrics``, 400/403 ->
    ``APIErrors``).

    :param tenant: The tenant id (required); exactly 36 characters
    :type tenant: str
    :param event_id: The event id to get runs for; 36 characters when given
    :type event_id: str
    :param workflow_id: The workflow id to get runs for; 36 characters when
                        given
    :type workflow_id: str
    :param parent_workflow_run_id: The parent workflow run id; 36 characters
                                   when given
    :type parent_workflow_run_id: str
    :param parent_step_run_id: The parent step run id; 36 characters when
                               given
    :type parent_step_run_id: str
    :param additional_metadata: A list of metadata key value pairs to filter by
    :type additional_metadata: List[str]
    :param created_after: The time after the workflow run was created
    :type created_after: datetime
    :param created_before: The time before the workflow run was created
    :type created_before: datetime
    :param _request_timeout: timeout for this request: a single positive
                             number (total request timeout) or a
                             (connection, read) pair.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: per-request override of the auth settings.
    :type _request_auth: dict, optional
    :param _content_type: forced content type for the request.
    :type _content_type: str, optional
    :param _headers: per-request override of the headers.
    :type _headers: dict, optional
    :param _host_index: per-request host index; the annotation admits only 0.
    :type _host_index: int, optional
    :return: the ``data`` attribute of the deserialized response.
    """  # noqa: E501
    request_parts = self._workflow_run_get_metrics_serialize(
        tenant=tenant,
        event_id=event_id,
        workflow_id=workflow_id,
        parent_workflow_run_id=parent_workflow_run_id,
        parent_step_run_id=parent_step_run_id,
        additional_metadata=additional_metadata,
        created_after=created_after,
        created_before=created_before,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    raw_response = await self.api_client.call_api(
        *request_parts, _request_timeout=_request_timeout
    )
    # The body must be fully read before it can be deserialized.
    await raw_response.read()

    models_by_status: Dict[str, Optional[str]] = {
        "200": "WorkflowRunsMetrics",
        "400": "APIErrors",
        "403": "APIErrors",
    }
    deserialized = self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=models_by_status,
    )
    return deserialized.data
@validate_call
async def workflow_run_get_metrics_with_http_info(
    self,
    tenant: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The tenant id"
        ),
    ],
    event_id: Annotated[
        Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]],
        Field(description="The event id to get runs for."),
    ] = None,
    workflow_id: Annotated[
        Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]],
        Field(description="The workflow id to get runs for."),
    ] = None,
    parent_workflow_run_id: Annotated[
        Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]],
        Field(description="The parent workflow run id"),
    ] = None,
    parent_step_run_id: Annotated[
        Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]],
        Field(description="The parent step run id"),
    ] = None,
    additional_metadata: Annotated[
        Optional[List[StrictStr]],
        Field(description="A list of metadata key value pairs to filter by"),
    ] = None,
    created_after: Annotated[
        Optional[datetime],
        Field(description="The time after the workflow run was created"),
    ] = None,
    created_before: Annotated[
        Optional[datetime],
        Field(description="The time before the workflow run was created"),
    ] = None,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> ApiResponse[WorkflowRunsMetrics]:
    """Get workflow runs metrics, returning the full API response object

    Get a summary of workflow run metrics for a tenant. Unlike the plain
    variant, the value produced by ``response_deserialize`` is returned
    as-is instead of only its ``data`` attribute.

    :param tenant: The tenant id, exactly 36 characters (required)
    :type tenant: str
    :param event_id: The event id to get runs for.
    :type event_id: str
    :param workflow_id: The workflow id to get runs for.
    :type workflow_id: str
    :param parent_workflow_run_id: The parent workflow run id
    :type parent_workflow_run_id: str
    :param parent_step_run_id: The parent step run id
    :type parent_step_run_id: str
    :param additional_metadata: A list of metadata key value pairs to filter by
    :type additional_metadata: List[str]
    :param created_after: The time after the workflow run was created
    :type created_after: datetime
    :param created_before: The time before the workflow run was created
    :type created_before: datetime
    :param _request_timeout: timeout for this request: either a single
                             positive number (total timeout) or a
                             (connection, read) pair of positive numbers.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: per-request override of the auth settings;
                          forwarded to the request serializer.
    :type _request_auth: dict, optional
    :param _content_type: content-type override; forwarded to the request
                          serializer.
    :type _content_type: str, Optional
    :param _headers: per-request headers; forwarded to the request
                     serializer.
    :type _headers: dict, optional
    :param _host_index: host index override; validation only admits 0.
    :type _host_index: int, optional
    :return: The deserialized response wrapper.
    """  # noqa: E501

    # HTTP status code -> model name handed to the deserializer.
    status_to_model: Dict[str, Optional[str]] = {
        "200": "WorkflowRunsMetrics",
        "400": "APIErrors",
        "403": "APIErrors",
    }

    serialized_request = self._workflow_run_get_metrics_serialize(
        tenant=tenant,
        event_id=event_id,
        workflow_id=workflow_id,
        parent_workflow_run_id=parent_workflow_run_id,
        parent_step_run_id=parent_step_run_id,
        additional_metadata=additional_metadata,
        created_after=created_after,
        created_before=created_before,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    raw_response = await self.api_client.call_api(
        *serialized_request, _request_timeout=_request_timeout
    )
    # The body has to be read before it can be deserialized.
    await raw_response.read()

    api_response = self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=status_to_model,
    )
    return api_response
@validate_call
async def workflow_run_get_metrics_without_preload_content(
    self,
    tenant: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The tenant id"
        ),
    ],
    event_id: Annotated[
        Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]],
        Field(description="The event id to get runs for."),
    ] = None,
    workflow_id: Annotated[
        Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]],
        Field(description="The workflow id to get runs for."),
    ] = None,
    parent_workflow_run_id: Annotated[
        Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]],
        Field(description="The parent workflow run id"),
    ] = None,
    parent_step_run_id: Annotated[
        Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]],
        Field(description="The parent step run id"),
    ] = None,
    additional_metadata: Annotated[
        Optional[List[StrictStr]],
        Field(description="A list of metadata key value pairs to filter by"),
    ] = None,
    created_after: Annotated[
        Optional[datetime],
        Field(description="The time after the workflow run was created"),
    ] = None,
    created_before: Annotated[
        Optional[datetime],
        Field(description="The time before the workflow run was created"),
    ] = None,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> RESTResponseType:
    """Get workflow runs metrics, returning the raw response

    Get a summary of workflow run metrics for a tenant. This variant
    neither reads nor deserializes the response body; it returns the
    ``response`` attribute of whatever ``call_api`` produced.

    :param tenant: The tenant id, exactly 36 characters (required)
    :type tenant: str
    :param event_id: The event id to get runs for.
    :type event_id: str
    :param workflow_id: The workflow id to get runs for.
    :type workflow_id: str
    :param parent_workflow_run_id: The parent workflow run id
    :type parent_workflow_run_id: str
    :param parent_step_run_id: The parent step run id
    :type parent_step_run_id: str
    :param additional_metadata: A list of metadata key value pairs to filter by
    :type additional_metadata: List[str]
    :param created_after: The time after the workflow run was created
    :type created_after: datetime
    :param created_before: The time before the workflow run was created
    :type created_before: datetime
    :param _request_timeout: timeout for this request: either a single
                             positive number (total timeout) or a
                             (connection, read) pair of positive numbers.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: per-request override of the auth settings;
                          forwarded to the request serializer.
    :type _request_auth: dict, optional
    :param _content_type: content-type override; forwarded to the request
                          serializer.
    :type _content_type: str, Optional
    :param _headers: per-request headers; forwarded to the request
                     serializer.
    :type _headers: dict, optional
    :param _host_index: host index override; validation only admits 0.
    :type _host_index: int, optional
    :return: The underlying, un-deserialized response object.
    """  # noqa: E501

    _param = self._workflow_run_get_metrics_serialize(
        tenant=tenant,
        event_id=event_id,
        workflow_id=workflow_id,
        parent_workflow_run_id=parent_workflow_run_id,
        parent_step_run_id=parent_step_run_id,
        additional_metadata=additional_metadata,
        created_after=created_after,
        created_before=created_before,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    # No response-type map is built here: nothing is deserialized in this
    # variant, so the map would be a dead local.
    response_data = await self.api_client.call_api(
        *_param, _request_timeout=_request_timeout
    )
    return response_data.response
def _workflow_run_get_metrics_serialize(
    self,
    tenant,
    event_id,
    workflow_id,
    parent_workflow_run_id,
    parent_step_run_id,
    additional_metadata,
    created_after,
    created_before,
    _request_auth,
    _content_type,
    _headers,
    _host_index,
) -> RequestSerialized:
    """Build the serialized GET request for the workflow-run metrics endpoint.

    Collects path, query and header parameters and passes them to
    ``self.api_client.param_serialize``.

    :param tenant: value for the ``{tenant}`` path segment (skipped if ``None``)
    :param event_id: sent as ``eventId`` when not ``None``
    :param workflow_id: sent as ``workflowId`` when not ``None``
    :param parent_workflow_run_id: sent as ``parentWorkflowRunId`` when not ``None``
    :param parent_step_run_id: sent as ``parentStepRunId`` when not ``None``
    :param additional_metadata: sent as ``additionalMetadata`` (collection
        format ``multi``) when not ``None``
    :param created_after: sent as ``createdAfter``; ``datetime`` values are
        formatted with ``self.api_client.configuration.datetime_format``,
        any other non-``None`` value is passed through unchanged
    :param created_before: sent as ``createdBefore``; same handling as
        ``created_after``
    :param _request_auth: forwarded to ``param_serialize``
    :param _content_type: accepted for signature parity; not used here
    :param _headers: extra request headers; copied, never mutated
    :param _host_index: accepted for signature parity; not used here
    :return: the result of ``self.api_client.param_serialize``
    """
    _host = None

    _collection_formats: Dict[str, str] = {
        "additionalMetadata": "multi",
    }

    _path_params: Dict[str, str] = {}
    _query_params: List[Tuple[str, str]] = []
    # Work on a copy: the ``Accept`` default below (and anything added
    # downstream) must not leak into the dict the caller passed in.
    _header_params: Dict[str, Optional[str]] = dict(_headers) if _headers else {}
    _form_params: List[Tuple[str, str]] = []
    _files: Dict[
        str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]]
    ] = {}
    _body_params: Optional[bytes] = None

    # process the path parameters
    if tenant is not None:
        _path_params["tenant"] = tenant

    # process the query parameters (order is preserved)
    for _name, _value in (
        ("eventId", event_id),
        ("workflowId", workflow_id),
        ("parentWorkflowRunId", parent_workflow_run_id),
        ("parentStepRunId", parent_step_run_id),
        ("additionalMetadata", additional_metadata),
    ):
        if _value is not None:
            _query_params.append((_name, _value))

    # Time filters: datetimes are rendered with the configured format,
    # anything else is sent exactly as given.
    for _name, _value in (
        ("createdAfter", created_after),
        ("createdBefore", created_before),
    ):
        if _value is None:
            continue
        if isinstance(_value, datetime):
            _value = _value.strftime(self.api_client.configuration.datetime_format)
        _query_params.append((_name, _value))

    # set the HTTP header `Accept` unless the caller already chose one
    if "Accept" not in _header_params:
        _header_params["Accept"] = self.api_client.select_header_accept(
            ["application/json"]
        )

    # authentication setting
    _auth_settings: List[str] = ["cookieAuth", "bearerAuth"]

    return self.api_client.param_serialize(
        method="GET",
        resource_path="/api/v1/tenants/{tenant}/workflows/runs/metrics",
        path_params=_path_params,
        query_params=_query_params,
        header_params=_header_params,
        body=_body_params,
        post_params=_form_params,
        files=_files,
        auth_settings=_auth_settings,
        collection_formats=_collection_formats,
        _host=_host,
        _request_auth=_request_auth,
    )
@validate_call
async def workflow_run_get_shape(
    self,
    tenant: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The tenant id"
        ),
    ],
    workflow_run: Annotated[
        str,
        Field(
            min_length=36,
            strict=True,
            max_length=36,
            description="The workflow run id",
        ),
    ],
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> WorkflowRunShape:
    """Get workflow run shape

    Get the shape of a workflow run for a tenant. Only the ``data``
    attribute of the deserialized response is returned.

    :param tenant: The tenant id, exactly 36 characters (required)
    :type tenant: str
    :param workflow_run: The workflow run id, exactly 36 characters (required)
    :type workflow_run: str
    :param _request_timeout: timeout for this request: either a single
                             positive number (total timeout) or a
                             (connection, read) pair of positive numbers.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: per-request override of the auth settings;
                          forwarded to the request serializer.
    :type _request_auth: dict, optional
    :param _content_type: content-type override; forwarded to the request
                          serializer.
    :type _content_type: str, Optional
    :param _headers: per-request headers; forwarded to the request
                     serializer.
    :type _headers: dict, optional
    :param _host_index: host index override; validation only admits 0.
    :type _host_index: int, optional
    :return: The ``data`` payload of the deserialized response.
    """  # noqa: E501

    # HTTP status code -> model name handed to the deserializer.
    status_to_model: Dict[str, Optional[str]] = {
        "200": "WorkflowRunShape",
        "400": "APIErrors",
        "403": "APIErrors",
    }

    request_parts = self._workflow_run_get_shape_serialize(
        tenant=tenant,
        workflow_run=workflow_run,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    http_response = await self.api_client.call_api(
        *request_parts, _request_timeout=_request_timeout
    )
    # The body has to be read before it can be deserialized.
    await http_response.read()

    deserialized = self.api_client.response_deserialize(
        response_data=http_response,
        response_types_map=status_to_model,
    )
    return deserialized.data
@validate_call
async def workflow_run_get_shape_with_http_info(
    self,
    tenant: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The tenant id"
        ),
    ],
    workflow_run: Annotated[
        str,
        Field(
            min_length=36,
            strict=True,
            max_length=36,
            description="The workflow run id",
        ),
    ],
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> ApiResponse[WorkflowRunShape]:
    """Get workflow run shape, returning the full API response object

    Get the shape of a workflow run for a tenant. The value produced by
    ``response_deserialize`` is returned as-is rather than only its
    ``data`` attribute.

    :param tenant: The tenant id, exactly 36 characters (required)
    :type tenant: str
    :param workflow_run: The workflow run id, exactly 36 characters (required)
    :type workflow_run: str
    :param _request_timeout: timeout for this request: either a single
                             positive number (total timeout) or a
                             (connection, read) pair of positive numbers.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: per-request override of the auth settings;
                          forwarded to the request serializer.
    :type _request_auth: dict, optional
    :param _content_type: content-type override; forwarded to the request
                          serializer.
    :type _content_type: str, Optional
    :param _headers: per-request headers; forwarded to the request
                     serializer.
    :type _headers: dict, optional
    :param _host_index: host index override; validation only admits 0.
    :type _host_index: int, optional
    :return: The deserialized response wrapper.
    """  # noqa: E501

    # HTTP status code -> model name handed to the deserializer.
    models_by_status: Dict[str, Optional[str]] = {
        "200": "WorkflowRunShape",
        "400": "APIErrors",
        "403": "APIErrors",
    }

    call_args = self._workflow_run_get_shape_serialize(
        tenant=tenant,
        workflow_run=workflow_run,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    raw = await self.api_client.call_api(*call_args, _request_timeout=_request_timeout)
    # The body has to be read before it can be deserialized.
    await raw.read()

    wrapped = self.api_client.response_deserialize(
        response_data=raw,
        response_types_map=models_by_status,
    )
    return wrapped
@validate_call
async def workflow_run_get_shape_without_preload_content(
    self,
    tenant: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The tenant id"
        ),
    ],
    workflow_run: Annotated[
        str,
        Field(
            min_length=36,
            strict=True,
            max_length=36,
            description="The workflow run id",
        ),
    ],
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> RESTResponseType:
    """Get workflow run shape, returning the raw response

    Get the shape of a workflow run for a tenant. This variant neither
    reads nor deserializes the response body; it returns the ``response``
    attribute of whatever ``call_api`` produced.

    :param tenant: The tenant id, exactly 36 characters (required)
    :type tenant: str
    :param workflow_run: The workflow run id, exactly 36 characters (required)
    :type workflow_run: str
    :param _request_timeout: timeout for this request: either a single
                             positive number (total timeout) or a
                             (connection, read) pair of positive numbers.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: per-request override of the auth settings;
                          forwarded to the request serializer.
    :type _request_auth: dict, optional
    :param _content_type: content-type override; forwarded to the request
                          serializer.
    :type _content_type: str, Optional
    :param _headers: per-request headers; forwarded to the request
                     serializer.
    :type _headers: dict, optional
    :param _host_index: host index override; validation only admits 0.
    :type _host_index: int, optional
    :return: The underlying, un-deserialized response object.
    """  # noqa: E501

    _param = self._workflow_run_get_shape_serialize(
        tenant=tenant,
        workflow_run=workflow_run,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    # No response-type map is built here: nothing is deserialized in this
    # variant, so the map would be a dead local.
    response_data = await self.api_client.call_api(
        *_param, _request_timeout=_request_timeout
    )
    return response_data.response
def _workflow_run_get_shape_serialize(
    self,
    tenant,
    workflow_run,
    _request_auth,
    _content_type,
    _headers,
    _host_index,
) -> RequestSerialized:
    """Build the serialized GET request for the workflow-run shape endpoint.

    Collects the path and header parameters and passes them to
    ``self.api_client.param_serialize``; this endpoint takes no query,
    form or body parameters.

    :param tenant: value for the ``{tenant}`` path segment (skipped if ``None``)
    :param workflow_run: value for the ``{workflow-run}`` path segment
        (skipped if ``None``)
    :param _request_auth: forwarded to ``param_serialize``
    :param _content_type: accepted for signature parity; not used here
    :param _headers: extra request headers; copied, never mutated
    :param _host_index: accepted for signature parity; not used here
    :return: the result of ``self.api_client.param_serialize``
    """
    _host = None

    _collection_formats: Dict[str, str] = {}

    _path_params: Dict[str, str] = {}
    _query_params: List[Tuple[str, str]] = []
    # Work on a copy: the ``Accept`` default below (and anything added
    # downstream) must not leak into the dict the caller passed in.
    _header_params: Dict[str, Optional[str]] = dict(_headers) if _headers else {}
    _form_params: List[Tuple[str, str]] = []
    _files: Dict[
        str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]]
    ] = {}
    _body_params: Optional[bytes] = None

    # process the path parameters
    if tenant is not None:
        _path_params["tenant"] = tenant
    if workflow_run is not None:
        _path_params["workflow-run"] = workflow_run

    # set the HTTP header `Accept` unless the caller already chose one
    if "Accept" not in _header_params:
        _header_params["Accept"] = self.api_client.select_header_accept(
            ["application/json"]
        )

    # authentication setting
    _auth_settings: List[str] = ["cookieAuth", "bearerAuth"]

    return self.api_client.param_serialize(
        method="GET",
        resource_path="/api/v1/tenants/{tenant}/workflow-runs/{workflow-run}/shape",
        path_params=_path_params,
        query_params=_query_params,
        header_params=_header_params,
        body=_body_params,
        post_params=_form_params,
        files=_files,
        auth_settings=_auth_settings,
        collection_formats=_collection_formats,
        _host=_host,
        _request_auth=_request_auth,
    )
@validate_call
async def workflow_run_list(
    self,
    tenant: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The tenant id"
        ),
    ],
    offset: Annotated[
        Optional[StrictInt], Field(description="The number to skip")
    ] = None,
    limit: Annotated[
        Optional[StrictInt], Field(description="The number to limit by")
    ] = None,
    event_id: Annotated[
        Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]],
        Field(description="The event id to get runs for."),
    ] = None,
    workflow_id: Annotated[
        Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]],
        Field(description="The workflow id to get runs for."),
    ] = None,
    parent_workflow_run_id: Annotated[
        Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]],
        Field(description="The parent workflow run id"),
    ] = None,
    parent_step_run_id: Annotated[
        Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]],
        Field(description="The parent step run id"),
    ] = None,
    statuses: Annotated[
        Optional[List[WorkflowRunStatus]],
        Field(description="A list of workflow run statuses to filter by"),
    ] = None,
    kinds: Annotated[
        Optional[List[WorkflowKind]],
        Field(description="A list of workflow kinds to filter by"),
    ] = None,
    additional_metadata: Annotated[
        Optional[List[StrictStr]],
        Field(description="A list of metadata key value pairs to filter by"),
    ] = None,
    created_after: Annotated[
        Optional[datetime],
        Field(description="The time after the workflow run was created"),
    ] = None,
    created_before: Annotated[
        Optional[datetime],
        Field(description="The time before the workflow run was created"),
    ] = None,
    finished_after: Annotated[
        Optional[datetime],
        Field(description="The time after the workflow run was finished"),
    ] = None,
    finished_before: Annotated[
        Optional[datetime],
        Field(description="The time before the workflow run was finished"),
    ] = None,
    order_by_field: Annotated[
        Optional[WorkflowRunOrderByField], Field(description="The order by field")
    ] = None,
    order_by_direction: Annotated[
        Optional[WorkflowRunOrderByDirection],
        Field(description="The order by direction"),
    ] = None,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> WorkflowRunList:
    """Get workflow runs

    Get all workflow runs for a tenant. Only the ``data`` attribute of the
    deserialized response is returned.

    :param tenant: The tenant id, exactly 36 characters (required)
    :type tenant: str
    :param offset: The number to skip
    :type offset: int
    :param limit: The number to limit by
    :type limit: int
    :param event_id: The event id to get runs for.
    :type event_id: str
    :param workflow_id: The workflow id to get runs for.
    :type workflow_id: str
    :param parent_workflow_run_id: The parent workflow run id
    :type parent_workflow_run_id: str
    :param parent_step_run_id: The parent step run id
    :type parent_step_run_id: str
    :param statuses: A list of workflow run statuses to filter by
    :type statuses: List[WorkflowRunStatus]
    :param kinds: A list of workflow kinds to filter by
    :type kinds: List[WorkflowKind]
    :param additional_metadata: A list of metadata key value pairs to filter by
    :type additional_metadata: List[str]
    :param created_after: The time after the workflow run was created
    :type created_after: datetime
    :param created_before: The time before the workflow run was created
    :type created_before: datetime
    :param finished_after: The time after the workflow run was finished
    :type finished_after: datetime
    :param finished_before: The time before the workflow run was finished
    :type finished_before: datetime
    :param order_by_field: The order by field
    :type order_by_field: WorkflowRunOrderByField
    :param order_by_direction: The order by direction
    :type order_by_direction: WorkflowRunOrderByDirection
    :param _request_timeout: timeout for this request: either a single
                             positive number (total timeout) or a
                             (connection, read) pair of positive numbers.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: per-request override of the auth settings;
                          forwarded to the request serializer.
    :type _request_auth: dict, optional
    :param _content_type: content-type override; forwarded to the request
                          serializer.
    :type _content_type: str, Optional
    :param _headers: per-request headers; forwarded to the request
                     serializer.
    :type _headers: dict, optional
    :param _host_index: host index override; validation only admits 0.
    :type _host_index: int, optional
    :return: The ``data`` payload of the deserialized response.
    """  # noqa: E501

    # HTTP status code -> model name handed to the deserializer.
    status_to_model: Dict[str, Optional[str]] = {
        "200": "WorkflowRunList",
        "400": "APIErrors",
        "403": "APIErrors",
    }

    # Everything except the timeout goes to the serializer by keyword.
    serializer_kwargs = dict(
        tenant=tenant,
        offset=offset,
        limit=limit,
        event_id=event_id,
        workflow_id=workflow_id,
        parent_workflow_run_id=parent_workflow_run_id,
        parent_step_run_id=parent_step_run_id,
        statuses=statuses,
        kinds=kinds,
        additional_metadata=additional_metadata,
        created_after=created_after,
        created_before=created_before,
        finished_after=finished_after,
        finished_before=finished_before,
        order_by_field=order_by_field,
        order_by_direction=order_by_direction,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )
    request_parts = self._workflow_run_list_serialize(**serializer_kwargs)

    http_response = await self.api_client.call_api(
        *request_parts, _request_timeout=_request_timeout
    )
    # The body has to be read before it can be deserialized.
    await http_response.read()

    deserialized = self.api_client.response_deserialize(
        response_data=http_response,
        response_types_map=status_to_model,
    )
    return deserialized.data
@validate_call
async def workflow_run_list_with_http_info(
    self,
    tenant: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The tenant id"
        ),
    ],
    offset: Annotated[
        Optional[StrictInt], Field(description="The number to skip")
    ] = None,
    limit: Annotated[
        Optional[StrictInt], Field(description="The number to limit by")
    ] = None,
    event_id: Annotated[
        Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]],
        Field(description="The event id to get runs for."),
    ] = None,
    workflow_id: Annotated[
        Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]],
        Field(description="The workflow id to get runs for."),
    ] = None,
    parent_workflow_run_id: Annotated[
        Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]],
        Field(description="The parent workflow run id"),
    ] = None,
    parent_step_run_id: Annotated[
        Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]],
        Field(description="The parent step run id"),
    ] = None,
    statuses: Annotated[
        Optional[List[WorkflowRunStatus]],
        Field(description="A list of workflow run statuses to filter by"),
    ] = None,
    kinds: Annotated[
        Optional[List[WorkflowKind]],
        Field(description="A list of workflow kinds to filter by"),
    ] = None,
    additional_metadata: Annotated[
        Optional[List[StrictStr]],
        Field(description="A list of metadata key value pairs to filter by"),
    ] = None,
    created_after: Annotated[
        Optional[datetime],
        Field(description="The time after the workflow run was created"),
    ] = None,
    created_before: Annotated[
        Optional[datetime],
        Field(description="The time before the workflow run was created"),
    ] = None,
    finished_after: Annotated[
        Optional[datetime],
        Field(description="The time after the workflow run was finished"),
    ] = None,
    finished_before: Annotated[
        Optional[datetime],
        Field(description="The time before the workflow run was finished"),
    ] = None,
    order_by_field: Annotated[
        Optional[WorkflowRunOrderByField], Field(description="The order by field")
    ] = None,
    order_by_direction: Annotated[
        Optional[WorkflowRunOrderByDirection],
        Field(description="The order by direction"),
    ] = None,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> ApiResponse[WorkflowRunList]:
    """Get workflow runs, returning the full API response object

    Get all workflow runs for a tenant. The value produced by
    ``response_deserialize`` is returned as-is rather than only its
    ``data`` attribute.

    :param tenant: The tenant id, exactly 36 characters (required)
    :type tenant: str
    :param offset: The number to skip
    :type offset: int
    :param limit: The number to limit by
    :type limit: int
    :param event_id: The event id to get runs for.
    :type event_id: str
    :param workflow_id: The workflow id to get runs for.
    :type workflow_id: str
    :param parent_workflow_run_id: The parent workflow run id
    :type parent_workflow_run_id: str
    :param parent_step_run_id: The parent step run id
    :type parent_step_run_id: str
    :param statuses: A list of workflow run statuses to filter by
    :type statuses: List[WorkflowRunStatus]
    :param kinds: A list of workflow kinds to filter by
    :type kinds: List[WorkflowKind]
    :param additional_metadata: A list of metadata key value pairs to filter by
    :type additional_metadata: List[str]
    :param created_after: The time after the workflow run was created
    :type created_after: datetime
    :param created_before: The time before the workflow run was created
    :type created_before: datetime
    :param finished_after: The time after the workflow run was finished
    :type finished_after: datetime
    :param finished_before: The time before the workflow run was finished
    :type finished_before: datetime
    :param order_by_field: The order by field
    :type order_by_field: WorkflowRunOrderByField
    :param order_by_direction: The order by direction
    :type order_by_direction: WorkflowRunOrderByDirection
    :param _request_timeout: timeout for this request: either a single
                             positive number (total timeout) or a
                             (connection, read) pair of positive numbers.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: per-request override of the auth settings;
                          forwarded to the request serializer.
    :type _request_auth: dict, optional
    :param _content_type: content-type override; forwarded to the request
                          serializer.
    :type _content_type: str, Optional
    :param _headers: per-request headers; forwarded to the request
                     serializer.
    :type _headers: dict, optional
    :param _host_index: host index override; validation only admits 0.
    :type _host_index: int, optional
    :return: The deserialized response wrapper.
    """  # noqa: E501

    # HTTP status code -> model name handed to the deserializer.
    models_by_status: Dict[str, Optional[str]] = {
        "200": "WorkflowRunList",
        "400": "APIErrors",
        "403": "APIErrors",
    }

    # Everything except the timeout goes to the serializer by keyword.
    serializer_kwargs = dict(
        tenant=tenant,
        offset=offset,
        limit=limit,
        event_id=event_id,
        workflow_id=workflow_id,
        parent_workflow_run_id=parent_workflow_run_id,
        parent_step_run_id=parent_step_run_id,
        statuses=statuses,
        kinds=kinds,
        additional_metadata=additional_metadata,
        created_after=created_after,
        created_before=created_before,
        finished_after=finished_after,
        finished_before=finished_before,
        order_by_field=order_by_field,
        order_by_direction=order_by_direction,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )
    call_args = self._workflow_run_list_serialize(**serializer_kwargs)

    raw = await self.api_client.call_api(*call_args, _request_timeout=_request_timeout)
    # The body has to be read before it can be deserialized.
    await raw.read()

    wrapped = self.api_client.response_deserialize(
        response_data=raw,
        response_types_map=models_by_status,
    )
    return wrapped
@validate_call
async def workflow_run_list_without_preload_content(
    self,
    tenant: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The tenant id"
        ),
    ],
    offset: Annotated[
        Optional[StrictInt], Field(description="The number to skip")
    ] = None,
    limit: Annotated[
        Optional[StrictInt], Field(description="The number to limit by")
    ] = None,
    event_id: Annotated[
        Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]],
        Field(description="The event id to get runs for."),
    ] = None,
    workflow_id: Annotated[
        Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]],
        Field(description="The workflow id to get runs for."),
    ] = None,
    parent_workflow_run_id: Annotated[
        Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]],
        Field(description="The parent workflow run id"),
    ] = None,
    parent_step_run_id: Annotated[
        Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]],
        Field(description="The parent step run id"),
    ] = None,
    statuses: Annotated[
        Optional[List[WorkflowRunStatus]],
        Field(description="A list of workflow run statuses to filter by"),
    ] = None,
    kinds: Annotated[
        Optional[List[WorkflowKind]],
        Field(description="A list of workflow kinds to filter by"),
    ] = None,
    additional_metadata: Annotated[
        Optional[List[StrictStr]],
        Field(description="A list of metadata key value pairs to filter by"),
    ] = None,
    created_after: Annotated[
        Optional[datetime],
        Field(description="The time after the workflow run was created"),
    ] = None,
    created_before: Annotated[
        Optional[datetime],
        Field(description="The time before the workflow run was created"),
    ] = None,
    finished_after: Annotated[
        Optional[datetime],
        Field(description="The time after the workflow run was finished"),
    ] = None,
    finished_before: Annotated[
        Optional[datetime],
        Field(description="The time before the workflow run was finished"),
    ] = None,
    order_by_field: Annotated[
        Optional[WorkflowRunOrderByField], Field(description="The order by field")
    ] = None,
    order_by_direction: Annotated[
        Optional[WorkflowRunOrderByDirection],
        Field(description="The order by direction"),
    ] = None,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> RESTResponseType:
    """Get workflow runs (raw response, body not preloaded)

    Get all workflow runs for a tenant. Unlike the sibling variants, this
    method neither awaits ``read()`` on the response nor deserializes it;
    the caller receives the raw response and consumes the body itself.

    :param tenant: The tenant id (required)
    :type tenant: str
    :param offset: The number to skip
    :type offset: int
    :param limit: The number to limit by
    :type limit: int
    :param event_id: The event id to get runs for.
    :type event_id: str
    :param workflow_id: The workflow id to get runs for.
    :type workflow_id: str
    :param parent_workflow_run_id: The parent workflow run id
    :type parent_workflow_run_id: str
    :param parent_step_run_id: The parent step run id
    :type parent_step_run_id: str
    :param statuses: A list of workflow run statuses to filter by
    :type statuses: List[WorkflowRunStatus]
    :param kinds: A list of workflow kinds to filter by
    :type kinds: List[WorkflowKind]
    :param additional_metadata: A list of metadata key value pairs to filter by
    :type additional_metadata: List[str]
    :param created_after: The time after the workflow run was created
    :type created_after: datetime
    :param created_before: The time before the workflow run was created
    :type created_before: datetime
    :param finished_after: The time after the workflow run was finished
    :type finished_after: datetime
    :param finished_before: The time before the workflow run was finished
    :type finished_before: datetime
    :param order_by_field: The order by field
    :type order_by_field: WorkflowRunOrderByField
    :param order_by_direction: The order by direction
    :type order_by_direction: WorkflowRunOrderByDirection
    :param _request_timeout: Timeout for this request, forwarded to
                             ``api_client.call_api``. A single number is
                             the total request timeout; a pair (tuple) is
                             (connection, read) timeouts.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: Overrides the auth settings for this single
                          request; passed to the request serializer.
    :type _request_auth: dict, optional
    :param _content_type: Forces the content type for the request; passed
                          to the request serializer.
    :type _content_type: str, optional
    :param _headers: Overrides the headers for this single request; passed
                     to the request serializer.
    :type _headers: dict, optional
    :param _host_index: Overrides the host index for this single request;
                        the annotation only admits 0.
    :type _host_index: int, optional
    :return: The ``response`` attribute of the object returned by
             ``api_client.call_api``, unread and not deserialized.
    """  # noqa: E501

    _param = self._workflow_run_list_serialize(
        tenant=tenant,
        offset=offset,
        limit=limit,
        event_id=event_id,
        workflow_id=workflow_id,
        parent_workflow_run_id=parent_workflow_run_id,
        parent_step_run_id=parent_step_run_id,
        statuses=statuses,
        kinds=kinds,
        additional_metadata=additional_metadata,
        created_after=created_after,
        created_before=created_before,
        finished_after=finished_after,
        finished_before=finished_before,
        order_by_field=order_by_field,
        order_by_direction=order_by_direction,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    # No status-code -> model map is built here: nothing is deserialized in
    # this variant, so such a map would be a dead local.
    response_data = await self.api_client.call_api(
        *_param, _request_timeout=_request_timeout
    )
    return response_data.response
def _workflow_run_list_serialize(
    self,
    tenant,
    offset,
    limit,
    event_id,
    workflow_id,
    parent_workflow_run_id,
    parent_step_run_id,
    statuses,
    kinds,
    additional_metadata,
    created_after,
    created_before,
    finished_after,
    finished_before,
    order_by_field,
    order_by_direction,
    _request_auth,
    _content_type,
    _headers,
    _host_index,
) -> RequestSerialized:
    """Build the serialized GET request for listing a tenant's workflow runs.

    Every argument that is not ``None`` is placed into the request:
    ``tenant`` as a path parameter and the filters as query parameters, in
    the fixed order offset, limit, eventId, workflowId,
    parentWorkflowRunId, parentStepRunId, statuses, kinds,
    additionalMetadata, createdAfter, createdBefore, finishedAfter,
    finishedBefore, orderByField, orderByDirection.

    ``datetime`` values are rendered with
    ``self.api_client.configuration.datetime_format``; any other non-None
    value given for a time filter is passed through unchanged. The
    ``.value`` attribute of ``order_by_field`` / ``order_by_direction`` is
    what gets sent.

    ``_headers`` is copied, so adding the default ``Accept`` header never
    modifies the caller's dict. ``_content_type`` and ``_host_index`` are
    accepted but not read by this method.

    :return: Whatever ``self.api_client.param_serialize`` returns.
    """
    _host = None

    _collection_formats: Dict[str, str] = {
        "statuses": "multi",
        "kinds": "multi",
        "additionalMetadata": "multi",
    }

    _path_params: Dict[str, str] = {}
    _query_params: List[Tuple[str, Any]] = []
    # Copy instead of aliasing: the Accept default below must not leak into
    # the dict the caller handed in.
    _header_params: Dict[str, Optional[str]] = dict(_headers) if _headers else {}
    _form_params: List[Tuple[str, str]] = []
    _files: Dict[
        str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]]
    ] = {}
    _body_params: Optional[bytes] = None

    # process the path parameters
    if tenant is not None:
        _path_params["tenant"] = tenant

    # process the query parameters that are forwarded as-is
    for key, value in (
        ("offset", offset),
        ("limit", limit),
        ("eventId", event_id),
        ("workflowId", workflow_id),
        ("parentWorkflowRunId", parent_workflow_run_id),
        ("parentStepRunId", parent_step_run_id),
        ("statuses", statuses),
        ("kinds", kinds),
        ("additionalMetadata", additional_metadata),
    ):
        if value is not None:
            _query_params.append((key, value))

    # process the time filters; the configured format is looked up only
    # when a datetime actually has to be rendered
    for key, moment in (
        ("createdAfter", created_after),
        ("createdBefore", created_before),
        ("finishedAfter", finished_after),
        ("finishedBefore", finished_before),
    ):
        if moment is None:
            continue
        if isinstance(moment, datetime):
            moment = moment.strftime(self.api_client.configuration.datetime_format)
        _query_params.append((key, moment))

    if order_by_field is not None:
        _query_params.append(("orderByField", order_by_field.value))

    if order_by_direction is not None:
        _query_params.append(("orderByDirection", order_by_direction.value))

    # set the HTTP header `Accept` unless the caller already chose one
    if "Accept" not in _header_params:
        _header_params["Accept"] = self.api_client.select_header_accept(
            ["application/json"]
        )

    # authentication setting
    _auth_settings: List[str] = ["cookieAuth", "bearerAuth"]

    return self.api_client.param_serialize(
        method="GET",
        resource_path="/api/v1/tenants/{tenant}/workflows/runs",
        path_params=_path_params,
        query_params=_query_params,
        header_params=_header_params,
        body=_body_params,
        post_params=_form_params,
        files=_files,
        auth_settings=_auth_settings,
        collection_formats=_collection_formats,
        _host=_host,
        _request_auth=_request_auth,
    )
@validate_call
async def workflow_scheduled_delete(
    self,
    tenant: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The tenant id"
        ),
    ],
    scheduled_workflow_run: Annotated[
        str,
        Field(
            min_length=36,
            strict=True,
            max_length=36,
            description="The scheduled workflow id",
        ),
    ],
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> None:
    """Delete scheduled workflow run

    Delete a scheduled workflow run for a tenant. The request is
    serialized, sent through ``api_client.call_api``, awaited with
    ``read()``, deserialized, and the ``data`` attribute of the result is
    returned.

    :param tenant: The tenant id (required)
    :type tenant: str
    :param scheduled_workflow_run: The scheduled workflow id (required)
    :type scheduled_workflow_run: str
    :param _request_timeout: Timeout for this request. A single number is
                             the total request timeout; a pair (tuple) is
                             (connection, read) timeouts.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: Overrides the auth settings for this single
                          request.
    :type _request_auth: dict, optional
    :param _content_type: Forces the content type for the request.
    :type _content_type: str, optional
    :param _headers: Overrides the headers for this single request.
    :type _headers: dict, optional
    :param _host_index: Overrides the host index for this single request.
    :type _host_index: int, optional
    :return: The ``data`` attribute of the deserialized response.
    """  # noqa: E501

    request_args = self._workflow_scheduled_delete_serialize(
        tenant=tenant,
        scheduled_workflow_run=scheduled_workflow_run,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    raw_response = await self.api_client.call_api(
        *request_args, _request_timeout=_request_timeout
    )
    await raw_response.read()

    # HTTP status code -> model name used when decoding the body.
    # NOTE(review): 403 maps to "APIError" (singular) here; presumably this
    # mirrors the API spec -- confirm before changing.
    models_by_status: Dict[str, Optional[str]] = {
        "204": None,
        "400": "APIErrors",
        "403": "APIError",
    }
    decoded = self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=models_by_status,
    )
    return decoded.data
@validate_call
async def workflow_scheduled_delete_with_http_info(
    self,
    tenant: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The tenant id"
        ),
    ],
    scheduled_workflow_run: Annotated[
        str,
        Field(
            min_length=36,
            strict=True,
            max_length=36,
            description="The scheduled workflow id",
        ),
    ],
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> ApiResponse[None]:
    """Delete scheduled workflow run

    Delete a scheduled workflow run for a tenant and hand back the whole
    deserialized response object rather than only its ``data`` attribute.

    :param tenant: The tenant id (required)
    :type tenant: str
    :param scheduled_workflow_run: The scheduled workflow id (required)
    :type scheduled_workflow_run: str
    :param _request_timeout: Timeout for this request. A single number is
                             the total request timeout; a pair (tuple) is
                             (connection, read) timeouts.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: Overrides the auth settings for this single
                          request.
    :type _request_auth: dict, optional
    :param _content_type: Forces the content type for the request.
    :type _content_type: str, optional
    :param _headers: Overrides the headers for this single request.
    :type _headers: dict, optional
    :param _host_index: Overrides the host index for this single request.
    :type _host_index: int, optional
    :return: The object produced by ``api_client.response_deserialize``.
    """  # noqa: E501

    request_args = self._workflow_scheduled_delete_serialize(
        tenant=tenant,
        scheduled_workflow_run=scheduled_workflow_run,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    raw_response = await self.api_client.call_api(
        *request_args, _request_timeout=_request_timeout
    )
    await raw_response.read()

    # HTTP status code -> model name used when decoding the body.
    # NOTE(review): 403 maps to "APIError" (singular) here; presumably this
    # mirrors the API spec -- confirm before changing.
    models_by_status: Dict[str, Optional[str]] = {
        "204": None,
        "400": "APIErrors",
        "403": "APIError",
    }
    decoded = self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=models_by_status,
    )
    return decoded
@validate_call
async def workflow_scheduled_delete_without_preload_content(
    self,
    tenant: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The tenant id"
        ),
    ],
    scheduled_workflow_run: Annotated[
        str,
        Field(
            min_length=36,
            strict=True,
            max_length=36,
            description="The scheduled workflow id",
        ),
    ],
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> RESTResponseType:
    """Delete scheduled workflow run (raw response, body not preloaded)

    Delete a scheduled workflow run for a tenant. This method neither
    awaits ``read()`` on the response nor deserializes it; the caller
    receives the raw response and consumes the body itself.

    :param tenant: The tenant id (required)
    :type tenant: str
    :param scheduled_workflow_run: The scheduled workflow id (required)
    :type scheduled_workflow_run: str
    :param _request_timeout: Timeout for this request, forwarded to
                             ``api_client.call_api``. A single number is
                             the total request timeout; a pair (tuple) is
                             (connection, read) timeouts.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: Overrides the auth settings for this single
                          request; passed to the request serializer.
    :type _request_auth: dict, optional
    :param _content_type: Forces the content type for the request; passed
                          to the request serializer.
    :type _content_type: str, optional
    :param _headers: Overrides the headers for this single request; passed
                     to the request serializer.
    :type _headers: dict, optional
    :param _host_index: Overrides the host index for this single request;
                        the annotation only admits 0.
    :type _host_index: int, optional
    :return: The ``response`` attribute of the object returned by
             ``api_client.call_api``, unread and not deserialized.
    """  # noqa: E501

    _param = self._workflow_scheduled_delete_serialize(
        tenant=tenant,
        scheduled_workflow_run=scheduled_workflow_run,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    # No status-code -> model map is built here: nothing is deserialized in
    # this variant, so such a map would be a dead local.
    response_data = await self.api_client.call_api(
        *_param, _request_timeout=_request_timeout
    )
    return response_data.response
def _workflow_scheduled_delete_serialize(
    self,
    tenant,
    scheduled_workflow_run,
    _request_auth,
    _content_type,
    _headers,
    _host_index,
) -> RequestSerialized:
    """Build the serialized DELETE request for one scheduled workflow run.

    ``tenant`` and ``scheduled_workflow_run`` become the ``tenant`` and
    ``scheduled-workflow-run`` path parameters when they are not ``None``.
    No query, form, file or body parameters are sent.

    ``_headers`` is copied, so adding the default ``Accept`` header never
    modifies the caller's dict. ``_content_type`` and ``_host_index`` are
    accepted but not read by this method.

    :return: Whatever ``self.api_client.param_serialize`` returns.
    """
    _host = None

    _collection_formats: Dict[str, str] = {}

    _path_params: Dict[str, str] = {}
    _query_params: List[Tuple[str, str]] = []
    # Copy instead of aliasing: the Accept default below must not leak into
    # the dict the caller handed in.
    _header_params: Dict[str, Optional[str]] = dict(_headers) if _headers else {}
    _form_params: List[Tuple[str, str]] = []
    _files: Dict[
        str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]]
    ] = {}
    _body_params: Optional[bytes] = None

    # process the path parameters
    if tenant is not None:
        _path_params["tenant"] = tenant
    if scheduled_workflow_run is not None:
        _path_params["scheduled-workflow-run"] = scheduled_workflow_run

    # set the HTTP header `Accept` unless the caller already chose one
    if "Accept" not in _header_params:
        _header_params["Accept"] = self.api_client.select_header_accept(
            ["application/json"]
        )

    # authentication setting
    _auth_settings: List[str] = ["cookieAuth", "bearerAuth"]

    return self.api_client.param_serialize(
        method="DELETE",
        resource_path="/api/v1/tenants/{tenant}/workflows/scheduled/{scheduled-workflow-run}",
        path_params=_path_params,
        query_params=_query_params,
        header_params=_header_params,
        body=_body_params,
        post_params=_form_params,
        files=_files,
        auth_settings=_auth_settings,
        collection_formats=_collection_formats,
        _host=_host,
        _request_auth=_request_auth,
    )
@validate_call
async def workflow_scheduled_get(
    self,
    tenant: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The tenant id"
        ),
    ],
    scheduled_workflow_run: Annotated[
        str,
        Field(
            min_length=36,
            strict=True,
            max_length=36,
            description="The scheduled workflow id",
        ),
    ],
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> ScheduledWorkflows:
    """Get scheduled workflow run

    Get a scheduled workflow run for a tenant. The request is serialized,
    sent through ``api_client.call_api``, awaited with ``read()``,
    deserialized, and the ``data`` attribute of the result is returned.

    :param tenant: The tenant id (required)
    :type tenant: str
    :param scheduled_workflow_run: The scheduled workflow id (required)
    :type scheduled_workflow_run: str
    :param _request_timeout: Timeout for this request. A single number is
                             the total request timeout; a pair (tuple) is
                             (connection, read) timeouts.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: Overrides the auth settings for this single
                          request.
    :type _request_auth: dict, optional
    :param _content_type: Forces the content type for the request.
    :type _content_type: str, optional
    :param _headers: Overrides the headers for this single request.
    :type _headers: dict, optional
    :param _host_index: Overrides the host index for this single request.
    :type _host_index: int, optional
    :return: The ``data`` attribute of the deserialized response.
    """  # noqa: E501

    request_args = self._workflow_scheduled_get_serialize(
        tenant=tenant,
        scheduled_workflow_run=scheduled_workflow_run,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    raw_response = await self.api_client.call_api(
        *request_args, _request_timeout=_request_timeout
    )
    await raw_response.read()

    # HTTP status code -> model name used when decoding the body.
    models_by_status: Dict[str, Optional[str]] = {
        "200": "ScheduledWorkflows",
        "400": "APIErrors",
        "403": "APIErrors",
        "404": "APIErrors",
    }
    decoded = self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=models_by_status,
    )
    return decoded.data
@validate_call
async def workflow_scheduled_get_with_http_info(
    self,
    tenant: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The tenant id"
        ),
    ],
    scheduled_workflow_run: Annotated[
        str,
        Field(
            min_length=36,
            strict=True,
            max_length=36,
            description="The scheduled workflow id",
        ),
    ],
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> ApiResponse[ScheduledWorkflows]:
    """Get scheduled workflow run

    Get a scheduled workflow run for a tenant and hand back the whole
    deserialized response object rather than only its ``data`` attribute.

    :param tenant: The tenant id (required)
    :type tenant: str
    :param scheduled_workflow_run: The scheduled workflow id (required)
    :type scheduled_workflow_run: str
    :param _request_timeout: Timeout for this request. A single number is
                             the total request timeout; a pair (tuple) is
                             (connection, read) timeouts.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: Overrides the auth settings for this single
                          request.
    :type _request_auth: dict, optional
    :param _content_type: Forces the content type for the request.
    :type _content_type: str, optional
    :param _headers: Overrides the headers for this single request.
    :type _headers: dict, optional
    :param _host_index: Overrides the host index for this single request.
    :type _host_index: int, optional
    :return: The object produced by ``api_client.response_deserialize``.
    """  # noqa: E501

    request_args = self._workflow_scheduled_get_serialize(
        tenant=tenant,
        scheduled_workflow_run=scheduled_workflow_run,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    raw_response = await self.api_client.call_api(
        *request_args, _request_timeout=_request_timeout
    )
    await raw_response.read()

    # HTTP status code -> model name used when decoding the body.
    models_by_status: Dict[str, Optional[str]] = {
        "200": "ScheduledWorkflows",
        "400": "APIErrors",
        "403": "APIErrors",
        "404": "APIErrors",
    }
    decoded = self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=models_by_status,
    )
    return decoded
@validate_call
async def workflow_scheduled_get_without_preload_content(
    self,
    tenant: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The tenant id"
        ),
    ],
    scheduled_workflow_run: Annotated[
        str,
        Field(
            min_length=36,
            strict=True,
            max_length=36,
            description="The scheduled workflow id",
        ),
    ],
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> RESTResponseType:
    """Get scheduled workflow run (raw response, body not preloaded)

    Get a scheduled workflow run for a tenant. This method neither awaits
    ``read()`` on the response nor deserializes it; the caller receives
    the raw response and consumes the body itself.

    :param tenant: The tenant id (required)
    :type tenant: str
    :param scheduled_workflow_run: The scheduled workflow id (required)
    :type scheduled_workflow_run: str
    :param _request_timeout: Timeout for this request, forwarded to
                             ``api_client.call_api``. A single number is
                             the total request timeout; a pair (tuple) is
                             (connection, read) timeouts.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: Overrides the auth settings for this single
                          request; passed to the request serializer.
    :type _request_auth: dict, optional
    :param _content_type: Forces the content type for the request; passed
                          to the request serializer.
    :type _content_type: str, optional
    :param _headers: Overrides the headers for this single request; passed
                     to the request serializer.
    :type _headers: dict, optional
    :param _host_index: Overrides the host index for this single request;
                        the annotation only admits 0.
    :type _host_index: int, optional
    :return: The ``response`` attribute of the object returned by
             ``api_client.call_api``, unread and not deserialized.
    """  # noqa: E501

    _param = self._workflow_scheduled_get_serialize(
        tenant=tenant,
        scheduled_workflow_run=scheduled_workflow_run,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    # No status-code -> model map is built here: nothing is deserialized in
    # this variant, so such a map would be a dead local.
    response_data = await self.api_client.call_api(
        *_param, _request_timeout=_request_timeout
    )
    return response_data.response
def _workflow_scheduled_get_serialize(
    self,
    tenant,
    scheduled_workflow_run,
    _request_auth,
    _content_type,
    _headers,
    _host_index,
) -> RequestSerialized:
    """Build the serialized GET request for one scheduled workflow run.

    ``tenant`` and ``scheduled_workflow_run`` become the ``tenant`` and
    ``scheduled-workflow-run`` path parameters when they are not ``None``.
    No query, form, file or body parameters are sent.

    ``_headers`` is copied, so adding the default ``Accept`` header never
    modifies the caller's dict. ``_content_type`` and ``_host_index`` are
    accepted but not read by this method.

    :return: Whatever ``self.api_client.param_serialize`` returns.
    """
    _host = None

    _collection_formats: Dict[str, str] = {}

    _path_params: Dict[str, str] = {}
    _query_params: List[Tuple[str, str]] = []
    # Copy instead of aliasing: the Accept default below must not leak into
    # the dict the caller handed in.
    _header_params: Dict[str, Optional[str]] = dict(_headers) if _headers else {}
    _form_params: List[Tuple[str, str]] = []
    _files: Dict[
        str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]]
    ] = {}
    _body_params: Optional[bytes] = None

    # process the path parameters
    if tenant is not None:
        _path_params["tenant"] = tenant
    if scheduled_workflow_run is not None:
        _path_params["scheduled-workflow-run"] = scheduled_workflow_run

    # set the HTTP header `Accept` unless the caller already chose one
    if "Accept" not in _header_params:
        _header_params["Accept"] = self.api_client.select_header_accept(
            ["application/json"]
        )

    # authentication setting
    _auth_settings: List[str] = ["cookieAuth", "bearerAuth"]

    return self.api_client.param_serialize(
        method="GET",
        resource_path="/api/v1/tenants/{tenant}/workflows/scheduled/{scheduled-workflow-run}",
        path_params=_path_params,
        query_params=_query_params,
        header_params=_header_params,
        body=_body_params,
        post_params=_form_params,
        files=_files,
        auth_settings=_auth_settings,
        collection_formats=_collection_formats,
        _host=_host,
        _request_auth=_request_auth,
    )
@validate_call
async def workflow_scheduled_list(
    self,
    tenant: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The tenant id"
        ),
    ],
    offset: Annotated[
        Optional[StrictInt], Field(description="The number to skip")
    ] = None,
    limit: Annotated[
        Optional[StrictInt], Field(description="The number to limit by")
    ] = None,
    order_by_field: Annotated[
        Optional[ScheduledWorkflowsOrderByField],
        Field(description="The order by field"),
    ] = None,
    order_by_direction: Annotated[
        Optional[WorkflowRunOrderByDirection],
        Field(description="The order by direction"),
    ] = None,
    workflow_id: Annotated[
        Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]],
        Field(description="The workflow id to get runs for."),
    ] = None,
    parent_workflow_run_id: Annotated[
        Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]],
        Field(description="The parent workflow run id"),
    ] = None,
    parent_step_run_id: Annotated[
        Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]],
        Field(description="The parent step run id"),
    ] = None,
    additional_metadata: Annotated[
        Optional[List[StrictStr]],
        Field(description="A list of metadata key value pairs to filter by"),
    ] = None,
    statuses: Annotated[
        Optional[List[ScheduledRunStatus]],
        Field(description="A list of scheduled run statuses to filter by"),
    ] = None,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> ScheduledWorkflowsList:
    """Get scheduled workflow runs

    Get all scheduled workflow runs for a tenant. The request is
    serialized, sent through ``api_client.call_api``, awaited with
    ``read()``, deserialized, and the ``data`` attribute of the result is
    returned.

    :param tenant: The tenant id (required)
    :type tenant: str
    :param offset: The number to skip
    :type offset: int
    :param limit: The number to limit by
    :type limit: int
    :param order_by_field: The order by field
    :type order_by_field: ScheduledWorkflowsOrderByField
    :param order_by_direction: The order by direction
    :type order_by_direction: WorkflowRunOrderByDirection
    :param workflow_id: The workflow id to get runs for.
    :type workflow_id: str
    :param parent_workflow_run_id: The parent workflow run id
    :type parent_workflow_run_id: str
    :param parent_step_run_id: The parent step run id
    :type parent_step_run_id: str
    :param additional_metadata: A list of metadata key value pairs to filter by
    :type additional_metadata: List[str]
    :param statuses: A list of scheduled run statuses to filter by
    :type statuses: List[ScheduledRunStatus]
    :param _request_timeout: Timeout for this request. A single number is
                             the total request timeout; a pair (tuple) is
                             (connection, read) timeouts.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: Overrides the auth settings for this single
                          request.
    :type _request_auth: dict, optional
    :param _content_type: Forces the content type for the request.
    :type _content_type: str, optional
    :param _headers: Overrides the headers for this single request.
    :type _headers: dict, optional
    :param _host_index: Overrides the host index for this single request.
    :type _host_index: int, optional
    :return: The ``data`` attribute of the deserialized response.
    """  # noqa: E501

    request_args = self._workflow_scheduled_list_serialize(
        tenant=tenant,
        offset=offset,
        limit=limit,
        order_by_field=order_by_field,
        order_by_direction=order_by_direction,
        workflow_id=workflow_id,
        parent_workflow_run_id=parent_workflow_run_id,
        parent_step_run_id=parent_step_run_id,
        additional_metadata=additional_metadata,
        statuses=statuses,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    raw_response = await self.api_client.call_api(
        *request_args, _request_timeout=_request_timeout
    )
    await raw_response.read()

    # HTTP status code -> model name used when decoding the body.
    models_by_status: Dict[str, Optional[str]] = {
        "200": "ScheduledWorkflowsList",
        "400": "APIErrors",
        "403": "APIErrors",
    }
    decoded = self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=models_by_status,
    )
    return decoded.data
@validate_call
async def workflow_scheduled_list_with_http_info(
    self,
    tenant: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The tenant id"
        ),
    ],
    offset: Annotated[
        Optional[StrictInt], Field(description="The number to skip")
    ] = None,
    limit: Annotated[
        Optional[StrictInt], Field(description="The number to limit by")
    ] = None,
    order_by_field: Annotated[
        Optional[ScheduledWorkflowsOrderByField],
        Field(description="The order by field"),
    ] = None,
    order_by_direction: Annotated[
        Optional[WorkflowRunOrderByDirection],
        Field(description="The order by direction"),
    ] = None,
    workflow_id: Annotated[
        Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]],
        Field(description="The workflow id to get runs for."),
    ] = None,
    parent_workflow_run_id: Annotated[
        Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]],
        Field(description="The parent workflow run id"),
    ] = None,
    parent_step_run_id: Annotated[
        Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]],
        Field(description="The parent step run id"),
    ] = None,
    additional_metadata: Annotated[
        Optional[List[StrictStr]],
        Field(description="A list of metadata key value pairs to filter by"),
    ] = None,
    statuses: Annotated[
        Optional[List[ScheduledRunStatus]],
        Field(description="A list of scheduled run statuses to filter by"),
    ] = None,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> ApiResponse[ScheduledWorkflowsList]:
    """Get scheduled workflow runs (full response wrapper).

    Get all scheduled workflow runs for a tenant. The response body is read
    and deserialized, and the whole deserialized response object is
    returned rather than only its ``data`` attribute.

    :param tenant: The tenant id (required); exactly 36 characters.
    :type tenant: str
    :param offset: The number to skip.
    :type offset: int
    :param limit: The number to limit by.
    :type limit: int
    :param order_by_field: The order by field.
    :type order_by_field: ScheduledWorkflowsOrderByField
    :param order_by_direction: The order by direction.
    :type order_by_direction: WorkflowRunOrderByDirection
    :param workflow_id: The workflow id to get runs for; 36 characters.
    :type workflow_id: str
    :param parent_workflow_run_id: The parent workflow run id; 36 characters.
    :type parent_workflow_run_id: str
    :param parent_step_run_id: The parent step run id; 36 characters.
    :type parent_step_run_id: str
    :param additional_metadata: A list of metadata key value pairs to filter by.
    :type additional_metadata: List[str]
    :param statuses: A list of scheduled run statuses to filter by.
    :type statuses: List[ScheduledRunStatus]
    :param _request_timeout: Timeout setting for this request. A single
                             positive number is the total request timeout;
                             a pair (tuple) is (connection, read) timeouts.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: Overrides the auth settings for this single
                          request, ignoring the authentication in the spec.
    :type _request_auth: dict, optional
    :param _content_type: Force content-type for the request.
    :type _content_type: str, optional
    :param _headers: Overrides the headers for this single request.
    :type _headers: dict, optional
    :param _host_index: Overrides the host index for this single request;
                        the signature only admits ``0``.
    :type _host_index: int, optional
    :return: The deserialized response object.
    """  # noqa: E501
    # Type names keyed by HTTP status code; handed to the deserializer below.
    status_to_type: Dict[str, Optional[str]] = {
        "200": "ScheduledWorkflowsList",
        "400": "APIErrors",
        "403": "APIErrors",
    }

    serialized_request = self._workflow_scheduled_list_serialize(
        tenant=tenant,
        offset=offset,
        limit=limit,
        order_by_field=order_by_field,
        order_by_direction=order_by_direction,
        workflow_id=workflow_id,
        parent_workflow_run_id=parent_workflow_run_id,
        parent_step_run_id=parent_step_run_id,
        additional_metadata=additional_metadata,
        statuses=statuses,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    raw_response = await self.api_client.call_api(
        *serialized_request, _request_timeout=_request_timeout
    )
    # The body is read before deserialization.
    await raw_response.read()

    return self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=status_to_type,
    )
@validate_call
async def workflow_scheduled_list_without_preload_content(
    self,
    tenant: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The tenant id"
        ),
    ],
    offset: Annotated[
        Optional[StrictInt], Field(description="The number to skip")
    ] = None,
    limit: Annotated[
        Optional[StrictInt], Field(description="The number to limit by")
    ] = None,
    order_by_field: Annotated[
        Optional[ScheduledWorkflowsOrderByField],
        Field(description="The order by field"),
    ] = None,
    order_by_direction: Annotated[
        Optional[WorkflowRunOrderByDirection],
        Field(description="The order by direction"),
    ] = None,
    workflow_id: Annotated[
        Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]],
        Field(description="The workflow id to get runs for."),
    ] = None,
    parent_workflow_run_id: Annotated[
        Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]],
        Field(description="The parent workflow run id"),
    ] = None,
    parent_step_run_id: Annotated[
        Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]],
        Field(description="The parent step run id"),
    ] = None,
    additional_metadata: Annotated[
        Optional[List[StrictStr]],
        Field(description="A list of metadata key value pairs to filter by"),
    ] = None,
    statuses: Annotated[
        Optional[List[ScheduledRunStatus]],
        Field(description="A list of scheduled run statuses to filter by"),
    ] = None,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> RESTResponseType:
    """Get scheduled workflow runs (raw response, body not preloaded).

    Get all scheduled workflow runs for a tenant. Unlike the sibling
    variants, this method neither reads nor deserializes the body: it
    returns the ``response`` attribute of the object produced by
    ``api_client.call_api``.

    :param tenant: The tenant id (required); exactly 36 characters.
    :type tenant: str
    :param offset: The number to skip.
    :type offset: int
    :param limit: The number to limit by.
    :type limit: int
    :param order_by_field: The order by field.
    :type order_by_field: ScheduledWorkflowsOrderByField
    :param order_by_direction: The order by direction.
    :type order_by_direction: WorkflowRunOrderByDirection
    :param workflow_id: The workflow id to get runs for; 36 characters.
    :type workflow_id: str
    :param parent_workflow_run_id: The parent workflow run id; 36 characters.
    :type parent_workflow_run_id: str
    :param parent_step_run_id: The parent step run id; 36 characters.
    :type parent_step_run_id: str
    :param additional_metadata: A list of metadata key value pairs to filter by.
    :type additional_metadata: List[str]
    :param statuses: A list of scheduled run statuses to filter by.
    :type statuses: List[ScheduledRunStatus]
    :param _request_timeout: Timeout setting for this request. A single
                             positive number is the total request timeout;
                             a pair (tuple) is (connection, read) timeouts.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: Overrides the auth settings for this single
                          request, ignoring the authentication in the spec.
    :type _request_auth: dict, optional
    :param _content_type: Force content-type for the request.
    :type _content_type: str, optional
    :param _headers: Overrides the headers for this single request.
    :type _headers: dict, optional
    :param _host_index: Overrides the host index for this single request;
                        the signature only admits ``0``.
    :type _host_index: int, optional
    :return: The underlying response object, not deserialized.
    """  # noqa: E501
    _param = self._workflow_scheduled_list_serialize(
        tenant=tenant,
        offset=offset,
        limit=limit,
        order_by_field=order_by_field,
        order_by_direction=order_by_direction,
        workflow_id=workflow_id,
        parent_workflow_run_id=parent_workflow_run_id,
        parent_step_run_id=parent_step_run_id,
        additional_metadata=additional_metadata,
        statuses=statuses,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )
    # No response-type map is built here: nothing is deserialized in this
    # variant, so the mapping would never be read.
    response_data = await self.api_client.call_api(
        *_param, _request_timeout=_request_timeout
    )
    return response_data.response
def _workflow_scheduled_list_serialize(
    self,
    tenant,
    offset,
    limit,
    order_by_field,
    order_by_direction,
    workflow_id,
    parent_workflow_run_id,
    parent_step_run_id,
    additional_metadata,
    statuses,
    _request_auth,
    _content_type,
    _headers,
    _host_index,
) -> RequestSerialized:
    """Build the serialized GET request for the scheduled-workflows listing.

    Collects path, query and header parameters and delegates to
    ``api_client.param_serialize`` for
    ``/api/v1/tenants/{tenant}/workflows/scheduled``.

    :param tenant: Stored as the ``tenant`` path parameter when not None.
    :param offset: Sent as query parameter ``offset`` when not None.
    :param limit: Sent as query parameter ``limit`` when not None.
    :param order_by_field: Its ``.value`` is sent as ``orderByField``.
    :param order_by_direction: Its ``.value`` is sent as ``orderByDirection``.
    :param workflow_id: Sent as ``workflowId`` when not None.
    :param parent_workflow_run_id: Sent as ``parentWorkflowRunId`` when not None.
    :param parent_step_run_id: Sent as ``parentStepRunId`` when not None.
    :param additional_metadata: Sent as ``additionalMetadata`` (collection
        format ``multi``) when not None.
    :param statuses: Sent as ``statuses`` (collection format ``multi``)
        when not None.
    :param _request_auth: Forwarded unchanged to ``param_serialize``.
    :param _content_type: Accepted for signature parity; not read here.
    :param _headers: Optional caller-supplied headers. The mapping is
        copied, so it is never modified by this method.
    :param _host_index: Accepted for signature parity; not read here.
    :return: Whatever ``api_client.param_serialize`` returns.
    """
    _host = None
    _collection_formats: Dict[str, str] = {
        "additionalMetadata": "multi",
        "statuses": "multi",
    }
    _path_params: Dict[str, str] = {}
    _query_params: List[Tuple[str, str]] = []
    # Copy instead of aliasing: the ``Accept`` default below must not be
    # written into the dict object the caller passed in.
    _header_params: Dict[str, Optional[str]] = dict(_headers) if _headers else {}
    _form_params: List[Tuple[str, str]] = []
    _files: Dict[
        str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]]
    ] = {}
    _body_params: Optional[bytes] = None

    # process the path parameters
    if tenant is not None:
        _path_params["tenant"] = tenant

    # process the query parameters (order is preserved as declared)
    if offset is not None:
        _query_params.append(("offset", offset))
    if limit is not None:
        _query_params.append(("limit", limit))
    if order_by_field is not None:
        _query_params.append(("orderByField", order_by_field.value))
    if order_by_direction is not None:
        _query_params.append(("orderByDirection", order_by_direction.value))
    if workflow_id is not None:
        _query_params.append(("workflowId", workflow_id))
    if parent_workflow_run_id is not None:
        _query_params.append(("parentWorkflowRunId", parent_workflow_run_id))
    if parent_step_run_id is not None:
        _query_params.append(("parentStepRunId", parent_step_run_id))
    if additional_metadata is not None:
        _query_params.append(("additionalMetadata", additional_metadata))
    if statuses is not None:
        _query_params.append(("statuses", statuses))

    # No header, form or body parameters are defined for this operation.

    # set the HTTP header `Accept` unless the caller already provided one
    if "Accept" not in _header_params:
        _header_params["Accept"] = self.api_client.select_header_accept(
            ["application/json"]
        )

    # authentication setting
    _auth_settings: List[str] = ["cookieAuth", "bearerAuth"]

    return self.api_client.param_serialize(
        method="GET",
        resource_path="/api/v1/tenants/{tenant}/workflows/scheduled",
        path_params=_path_params,
        query_params=_query_params,
        header_params=_header_params,
        body=_body_params,
        post_params=_form_params,
        files=_files,
        auth_settings=_auth_settings,
        collection_formats=_collection_formats,
        _host=_host,
        _request_auth=_request_auth,
    )
@validate_call
async def workflow_update(
    self,
    workflow: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The workflow id"
        ),
    ],
    workflow_update_request: Annotated[
        WorkflowUpdateRequest, Field(description="The input to update the workflow")
    ],
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> Workflow:
    """Update workflow.

    Update a workflow for a tenant. The response body is read and
    deserialized, and only the ``data`` attribute of the deserialized
    response is returned.

    :param workflow: The workflow id (required); exactly 36 characters.
    :type workflow: str
    :param workflow_update_request: The input to update the workflow (required).
    :type workflow_update_request: WorkflowUpdateRequest
    :param _request_timeout: Timeout setting for this request. A single
                             positive number is the total request timeout;
                             a pair (tuple) is (connection, read) timeouts.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: Overrides the auth settings for this single
                          request, ignoring the authentication in the spec.
    :type _request_auth: dict, optional
    :param _content_type: Force content-type for the request.
    :type _content_type: str, optional
    :param _headers: Overrides the headers for this single request.
    :type _headers: dict, optional
    :param _host_index: Overrides the host index for this single request;
                        the signature only admits ``0``.
    :type _host_index: int, optional
    :return: The ``data`` attribute of the deserialized response.
    """  # noqa: E501
    # Type names keyed by HTTP status code; handed to the deserializer below.
    status_to_type: Dict[str, Optional[str]] = {
        "200": "Workflow",
        "400": "APIErrors",
        "403": "APIErrors",
    }

    serialized_request = self._workflow_update_serialize(
        workflow=workflow,
        workflow_update_request=workflow_update_request,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    raw_response = await self.api_client.call_api(
        *serialized_request, _request_timeout=_request_timeout
    )
    # The body is read before deserialization.
    await raw_response.read()

    deserialized = self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=status_to_type,
    )
    return deserialized.data
@validate_call
async def workflow_update_with_http_info(
    self,
    workflow: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The workflow id"
        ),
    ],
    workflow_update_request: Annotated[
        WorkflowUpdateRequest, Field(description="The input to update the workflow")
    ],
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> ApiResponse[Workflow]:
    """Update workflow (full response wrapper).

    Update a workflow for a tenant. The response body is read and
    deserialized, and the whole deserialized response object is returned
    rather than only its ``data`` attribute.

    :param workflow: The workflow id (required); exactly 36 characters.
    :type workflow: str
    :param workflow_update_request: The input to update the workflow (required).
    :type workflow_update_request: WorkflowUpdateRequest
    :param _request_timeout: Timeout setting for this request. A single
                             positive number is the total request timeout;
                             a pair (tuple) is (connection, read) timeouts.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: Overrides the auth settings for this single
                          request, ignoring the authentication in the spec.
    :type _request_auth: dict, optional
    :param _content_type: Force content-type for the request.
    :type _content_type: str, optional
    :param _headers: Overrides the headers for this single request.
    :type _headers: dict, optional
    :param _host_index: Overrides the host index for this single request;
                        the signature only admits ``0``.
    :type _host_index: int, optional
    :return: The deserialized response object.
    """  # noqa: E501
    # Type names keyed by HTTP status code; handed to the deserializer below.
    status_to_type: Dict[str, Optional[str]] = {
        "200": "Workflow",
        "400": "APIErrors",
        "403": "APIErrors",
    }

    serialized_request = self._workflow_update_serialize(
        workflow=workflow,
        workflow_update_request=workflow_update_request,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    raw_response = await self.api_client.call_api(
        *serialized_request, _request_timeout=_request_timeout
    )
    # The body is read before deserialization.
    await raw_response.read()

    return self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=status_to_type,
    )
@validate_call
async def workflow_update_without_preload_content(
    self,
    workflow: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The workflow id"
        ),
    ],
    workflow_update_request: Annotated[
        WorkflowUpdateRequest, Field(description="The input to update the workflow")
    ],
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> RESTResponseType:
    """Update workflow (raw response, body not preloaded).

    Update a workflow for a tenant. Unlike the sibling variants, this
    method neither reads nor deserializes the body: it returns the
    ``response`` attribute of the object produced by
    ``api_client.call_api``.

    :param workflow: The workflow id (required); exactly 36 characters.
    :type workflow: str
    :param workflow_update_request: The input to update the workflow (required).
    :type workflow_update_request: WorkflowUpdateRequest
    :param _request_timeout: Timeout setting for this request. A single
                             positive number is the total request timeout;
                             a pair (tuple) is (connection, read) timeouts.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: Overrides the auth settings for this single
                          request, ignoring the authentication in the spec.
    :type _request_auth: dict, optional
    :param _content_type: Force content-type for the request.
    :type _content_type: str, optional
    :param _headers: Overrides the headers for this single request.
    :type _headers: dict, optional
    :param _host_index: Overrides the host index for this single request;
                        the signature only admits ``0``.
    :type _host_index: int, optional
    :return: The underlying response object, not deserialized.
    """  # noqa: E501
    _param = self._workflow_update_serialize(
        workflow=workflow,
        workflow_update_request=workflow_update_request,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )
    # No response-type map is built here: nothing is deserialized in this
    # variant, so the mapping would never be read.
    response_data = await self.api_client.call_api(
        *_param, _request_timeout=_request_timeout
    )
    return response_data.response
def _workflow_update_serialize(
    self,
    workflow,
    workflow_update_request,
    _request_auth,
    _content_type,
    _headers,
    _host_index,
) -> RequestSerialized:
    """Build the serialized PATCH request for updating a workflow.

    Collects the path parameter, body and headers and delegates to
    ``api_client.param_serialize`` for ``/api/v1/workflows/{workflow}``.

    :param workflow: Stored as the ``workflow`` path parameter when not None.
    :param workflow_update_request: Used as the request body when not None.
    :param _request_auth: Forwarded unchanged to ``param_serialize``.
    :param _content_type: When truthy, used verbatim as ``Content-Type``;
        otherwise the value chosen by
        ``api_client.select_header_content_type`` is used if it is not None.
    :param _headers: Optional caller-supplied headers. The mapping is
        copied, so it is never modified by this method.
    :param _host_index: Accepted for signature parity; not read here.
    :return: Whatever ``api_client.param_serialize`` returns.
    """
    _host = None
    _collection_formats: Dict[str, str] = {}
    _path_params: Dict[str, str] = {}
    _query_params: List[Tuple[str, str]] = []
    # Copy instead of aliasing: the ``Accept`` / ``Content-Type`` entries
    # set below must not be written into the dict the caller passed in.
    _header_params: Dict[str, Optional[str]] = dict(_headers) if _headers else {}
    _form_params: List[Tuple[str, str]] = []
    _files: Dict[
        str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]]
    ] = {}
    _body_params: Optional[bytes] = None

    # process the path parameters
    if workflow is not None:
        _path_params["workflow"] = workflow

    # No query, header or form parameters are defined for this operation.

    # process the body parameter
    if workflow_update_request is not None:
        _body_params = workflow_update_request

    # set the HTTP header `Accept` unless the caller already provided one
    if "Accept" not in _header_params:
        _header_params["Accept"] = self.api_client.select_header_accept(
            ["application/json"]
        )

    # set the HTTP header `Content-Type`; an explicit override wins over
    # (and replaces) any value present in the supplied headers
    if _content_type:
        _header_params["Content-Type"] = _content_type
    else:
        _default_content_type = self.api_client.select_header_content_type(
            ["application/json"]
        )
        if _default_content_type is not None:
            _header_params["Content-Type"] = _default_content_type

    # authentication setting
    _auth_settings: List[str] = ["cookieAuth", "bearerAuth"]

    return self.api_client.param_serialize(
        method="PATCH",
        resource_path="/api/v1/workflows/{workflow}",
        path_params=_path_params,
        query_params=_query_params,
        header_params=_header_params,
        body=_body_params,
        post_params=_form_params,
        files=_files,
        auth_settings=_auth_settings,
        collection_formats=_collection_formats,
        _host=_host,
        _request_auth=_request_auth,
    )
@validate_call
async def workflow_version_get(
    self,
    workflow: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The workflow id"
        ),
    ],
    version: Annotated[
        Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]],
        Field(
            description="The workflow version. If not supplied, the latest version is fetched."
        ),
    ] = None,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> WorkflowVersion:
    """Get workflow version.

    Get a workflow version for a tenant. The response body is read and
    deserialized, and only the ``data`` attribute of the deserialized
    response is returned.

    :param workflow: The workflow id (required); exactly 36 characters.
    :type workflow: str
    :param version: The workflow version. If not supplied, the latest version is fetched.
    :type version: str
    :param _request_timeout: Timeout setting for this request. A single
                             positive number is the total request timeout;
                             a pair (tuple) is (connection, read) timeouts.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: Overrides the auth settings for this single
                          request, ignoring the authentication in the spec.
    :type _request_auth: dict, optional
    :param _content_type: Force content-type for the request.
    :type _content_type: str, optional
    :param _headers: Overrides the headers for this single request.
    :type _headers: dict, optional
    :param _host_index: Overrides the host index for this single request;
                        the signature only admits ``0``.
    :type _host_index: int, optional
    :return: The ``data`` attribute of the deserialized response.
    """  # noqa: E501
    # Type names keyed by HTTP status code; handed to the deserializer below.
    status_to_type: Dict[str, Optional[str]] = {
        "200": "WorkflowVersion",
        "400": "APIErrors",
        "403": "APIErrors",
        "404": "APIErrors",
    }

    serialized_request = self._workflow_version_get_serialize(
        workflow=workflow,
        version=version,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    raw_response = await self.api_client.call_api(
        *serialized_request, _request_timeout=_request_timeout
    )
    # The body is read before deserialization.
    await raw_response.read()

    deserialized = self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=status_to_type,
    )
    return deserialized.data
@validate_call
async def workflow_version_get_with_http_info(
    self,
    workflow: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The workflow id"
        ),
    ],
    version: Annotated[
        Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]],
        Field(
            description="The workflow version. If not supplied, the latest version is fetched."
        ),
    ] = None,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> ApiResponse[WorkflowVersion]:
    """Get workflow version (full response wrapper).

    Get a workflow version for a tenant. The response body is read and
    deserialized, and the whole deserialized response object is returned
    rather than only its ``data`` attribute.

    :param workflow: The workflow id (required); exactly 36 characters.
    :type workflow: str
    :param version: The workflow version. If not supplied, the latest version is fetched.
    :type version: str
    :param _request_timeout: Timeout setting for this request. A single
                             positive number is the total request timeout;
                             a pair (tuple) is (connection, read) timeouts.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: Overrides the auth settings for this single
                          request, ignoring the authentication in the spec.
    :type _request_auth: dict, optional
    :param _content_type: Force content-type for the request.
    :type _content_type: str, optional
    :param _headers: Overrides the headers for this single request.
    :type _headers: dict, optional
    :param _host_index: Overrides the host index for this single request;
                        the signature only admits ``0``.
    :type _host_index: int, optional
    :return: The deserialized response object.
    """  # noqa: E501
    # Type names keyed by HTTP status code; handed to the deserializer below.
    status_to_type: Dict[str, Optional[str]] = {
        "200": "WorkflowVersion",
        "400": "APIErrors",
        "403": "APIErrors",
        "404": "APIErrors",
    }

    serialized_request = self._workflow_version_get_serialize(
        workflow=workflow,
        version=version,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )

    raw_response = await self.api_client.call_api(
        *serialized_request, _request_timeout=_request_timeout
    )
    # The body is read before deserialization.
    await raw_response.read()

    return self.api_client.response_deserialize(
        response_data=raw_response,
        response_types_map=status_to_type,
    )
@validate_call
async def workflow_version_get_without_preload_content(
    self,
    workflow: Annotated[
        str,
        Field(
            min_length=36, strict=True, max_length=36, description="The workflow id"
        ),
    ],
    version: Annotated[
        Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]],
        Field(
            description="The workflow version. If not supplied, the latest version is fetched."
        ),
    ] = None,
    _request_timeout: Union[
        None,
        Annotated[StrictFloat, Field(gt=0)],
        Tuple[
            Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
        ],
    ] = None,
    _request_auth: Optional[Dict[StrictStr, Any]] = None,
    _content_type: Optional[StrictStr] = None,
    _headers: Optional[Dict[StrictStr, Any]] = None,
    _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> RESTResponseType:
    """Get workflow version (raw response, body not preloaded).

    Get a workflow version for a tenant. Unlike the sibling variants, this
    method neither reads nor deserializes the body: it returns the
    ``response`` attribute of the object produced by
    ``api_client.call_api``.

    :param workflow: The workflow id (required); exactly 36 characters.
    :type workflow: str
    :param version: The workflow version. If not supplied, the latest version is fetched.
    :type version: str
    :param _request_timeout: Timeout setting for this request. A single
                             positive number is the total request timeout;
                             a pair (tuple) is (connection, read) timeouts.
    :type _request_timeout: float, tuple(float, float), optional
    :param _request_auth: Overrides the auth settings for this single
                          request, ignoring the authentication in the spec.
    :type _request_auth: dict, optional
    :param _content_type: Force content-type for the request.
    :type _content_type: str, optional
    :param _headers: Overrides the headers for this single request.
    :type _headers: dict, optional
    :param _host_index: Overrides the host index for this single request;
                        the signature only admits ``0``.
    :type _host_index: int, optional
    :return: The underlying response object, not deserialized.
    """  # noqa: E501
    _param = self._workflow_version_get_serialize(
        workflow=workflow,
        version=version,
        _request_auth=_request_auth,
        _content_type=_content_type,
        _headers=_headers,
        _host_index=_host_index,
    )
    # No response-type map is built here: nothing is deserialized in this
    # variant, so the mapping would never be read.
    response_data = await self.api_client.call_api(
        *_param, _request_timeout=_request_timeout
    )
    return response_data.response
def _workflow_version_get_serialize(
    self,
    workflow,
    version,
    _request_auth,
    _content_type,
    _headers,
    _host_index,
) -> RequestSerialized:
    """Build the serialized GET request for fetching a workflow version.

    Collects the path parameter, optional query parameter and headers and
    delegates to ``api_client.param_serialize`` for
    ``/api/v1/workflows/{workflow}/versions``.

    :param workflow: Stored as the ``workflow`` path parameter when not None.
    :param version: Sent as query parameter ``version`` when not None.
    :param _request_auth: Forwarded unchanged to ``param_serialize``.
    :param _content_type: Accepted for signature parity; not read here.
    :param _headers: Optional caller-supplied headers. The mapping is
        copied, so it is never modified by this method.
    :param _host_index: Accepted for signature parity; not read here.
    :return: Whatever ``api_client.param_serialize`` returns.
    """
    _host = None
    _collection_formats: Dict[str, str] = {}
    _path_params: Dict[str, str] = {}
    _query_params: List[Tuple[str, str]] = []
    # Copy instead of aliasing: the ``Accept`` default below must not be
    # written into the dict object the caller passed in.
    _header_params: Dict[str, Optional[str]] = dict(_headers) if _headers else {}
    _form_params: List[Tuple[str, str]] = []
    _files: Dict[
        str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]]
    ] = {}
    _body_params: Optional[bytes] = None

    # process the path parameters
    if workflow is not None:
        _path_params["workflow"] = workflow

    # process the query parameters
    if version is not None:
        _query_params.append(("version", version))

    # No header, form or body parameters are defined for this operation.

    # set the HTTP header `Accept` unless the caller already provided one
    if "Accept" not in _header_params:
        _header_params["Accept"] = self.api_client.select_header_accept(
            ["application/json"]
        )

    # authentication setting
    _auth_settings: List[str] = ["cookieAuth", "bearerAuth"]

    return self.api_client.param_serialize(
        method="GET",
        resource_path="/api/v1/workflows/{workflow}/versions",
        path_params=_path_params,
        query_params=_query_params,
        header_params=_header_params,
        body=_body_params,
        post_params=_form_params,
        files=_files,
        auth_settings=_auth_settings,
        collection_formats=_collection_formats,
        _host=_host,
        _request_auth=_request_auth,
    )