# coding: utf-8 """ Hatchet API The Hatchet API The version of the OpenAPI document: 1.0.0 Generated by OpenAPI Generator (https://openapi-generator.tech) Do not edit the class manually. """ # noqa: E501 import warnings from datetime import datetime from typing import Any, Dict, List, Optional, Tuple, Union from pydantic import Field, StrictFloat, StrictInt, StrictStr, validate_call from typing_extensions import Annotated from hatchet_sdk.clients.rest.api_client import ApiClient, RequestSerialized from hatchet_sdk.clients.rest.api_response import ApiResponse from hatchet_sdk.clients.rest.models.cron_workflows import CronWorkflows from hatchet_sdk.clients.rest.models.cron_workflows_list import CronWorkflowsList from hatchet_sdk.clients.rest.models.cron_workflows_order_by_field import ( CronWorkflowsOrderByField, ) from hatchet_sdk.clients.rest.models.scheduled_run_status import ScheduledRunStatus from hatchet_sdk.clients.rest.models.scheduled_workflows import ScheduledWorkflows from hatchet_sdk.clients.rest.models.scheduled_workflows_list import ( ScheduledWorkflowsList, ) from hatchet_sdk.clients.rest.models.scheduled_workflows_order_by_field import ( ScheduledWorkflowsOrderByField, ) from hatchet_sdk.clients.rest.models.tenant_queue_metrics import TenantQueueMetrics from hatchet_sdk.clients.rest.models.workflow import Workflow from hatchet_sdk.clients.rest.models.workflow_kind import WorkflowKind from hatchet_sdk.clients.rest.models.workflow_list import WorkflowList from hatchet_sdk.clients.rest.models.workflow_metrics import WorkflowMetrics from hatchet_sdk.clients.rest.models.workflow_run import WorkflowRun from hatchet_sdk.clients.rest.models.workflow_run_list import WorkflowRunList from hatchet_sdk.clients.rest.models.workflow_run_order_by_direction import ( WorkflowRunOrderByDirection, ) from hatchet_sdk.clients.rest.models.workflow_run_order_by_field import ( WorkflowRunOrderByField, ) from hatchet_sdk.clients.rest.models.workflow_run_shape import WorkflowRunShape from hatchet_sdk.clients.rest.models.workflow_run_status import WorkflowRunStatus from hatchet_sdk.clients.rest.models.workflow_runs_metrics import WorkflowRunsMetrics from hatchet_sdk.clients.rest.models.workflow_update_request import ( WorkflowUpdateRequest, ) from hatchet_sdk.clients.rest.models.workflow_version import WorkflowVersion from hatchet_sdk.clients.rest.models.workflow_workers_count import WorkflowWorkersCount from hatchet_sdk.clients.rest.rest import RESTResponseType class WorkflowApi: """NOTE: This class is auto generated by OpenAPI Generator Ref: https://openapi-generator.tech Do not edit the class manually. """ def __init__(self, api_client=None) -> None: if api_client is None: api_client = ApiClient.get_default() self.api_client = api_client @validate_call async def cron_workflow_list( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], offset: Annotated[ Optional[StrictInt], Field(description="The number to skip") ] = None, limit: Annotated[ Optional[StrictInt], Field(description="The number to limit by") ] = None, workflow_id: Annotated[ Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]], Field(description="The workflow id to get runs for."), ] = None, additional_metadata: Annotated[ Optional[List[StrictStr]], Field(description="A list of metadata key value pairs to filter by"), ] = None, order_by_field: Annotated[ Optional[CronWorkflowsOrderByField], Field(description="The order by field") ] = None, order_by_direction: Annotated[ Optional[WorkflowRunOrderByDirection], Field(description="The order by direction"), ] = None, _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> CronWorkflowsList: """Get cron job workflows Get all cron job workflow triggers for a tenant :param tenant: The tenant id (required) :type tenant: str :param offset: The number to skip :type offset: int :param limit: The number to limit by :type limit: int :param workflow_id: The workflow id to get runs for. :type workflow_id: str :param additional_metadata: A list of metadata key value pairs to filter by :type additional_metadata: List[str] :param order_by_field: The order by field :type order_by_field: CronWorkflowsOrderByField :param order_by_direction: The order by direction :type order_by_direction: WorkflowRunOrderByDirection :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._cron_workflow_list_serialize( tenant=tenant, offset=offset, limit=limit, workflow_id=workflow_id, additional_metadata=additional_metadata, order_by_field=order_by_field, order_by_direction=order_by_direction, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "CronWorkflowsList", "400": "APIErrors", "403": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ).data @validate_call async def cron_workflow_list_with_http_info( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], offset: Annotated[ Optional[StrictInt], Field(description="The number to skip") ] = None, limit: Annotated[ Optional[StrictInt], Field(description="The number to limit by") ] = None, workflow_id: Annotated[ Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]], Field(description="The workflow id to get runs for."), ] = None, additional_metadata: Annotated[ Optional[List[StrictStr]], Field(description="A list of metadata key value pairs to filter by"), ] = None, order_by_field: Annotated[ Optional[CronWorkflowsOrderByField], Field(description="The order by field") ] = None, order_by_direction: Annotated[ Optional[WorkflowRunOrderByDirection], Field(description="The order by direction"), ] = None, _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> ApiResponse[CronWorkflowsList]: """Get cron job workflows Get all cron job workflow triggers for a tenant :param tenant: The tenant id (required) :type tenant: str :param offset: The number to skip :type offset: int :param limit: The number to limit by :type limit: int :param workflow_id: The workflow id to get runs for. :type workflow_id: str :param additional_metadata: A list of metadata key value pairs to filter by :type additional_metadata: List[str] :param order_by_field: The order by field :type order_by_field: CronWorkflowsOrderByField :param order_by_direction: The order by direction :type order_by_direction: WorkflowRunOrderByDirection :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._cron_workflow_list_serialize( tenant=tenant, offset=offset, limit=limit, workflow_id=workflow_id, additional_metadata=additional_metadata, order_by_field=order_by_field, order_by_direction=order_by_direction, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "CronWorkflowsList", "400": "APIErrors", "403": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ) @validate_call async def cron_workflow_list_without_preload_content( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], offset: Annotated[ Optional[StrictInt], Field(description="The number to skip") ] = None, limit: Annotated[ Optional[StrictInt], Field(description="The number to limit by") ] = None, workflow_id: Annotated[ Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]], Field(description="The workflow id to get runs for."), ] = None, additional_metadata: Annotated[ Optional[List[StrictStr]], Field(description="A list of metadata key value pairs to filter by"), ] = None, order_by_field: Annotated[ Optional[CronWorkflowsOrderByField], Field(description="The order by field") ] = None, order_by_direction: Annotated[ Optional[WorkflowRunOrderByDirection], Field(description="The order by direction"), ] = None, _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> RESTResponseType: """Get cron job workflows Get all cron job workflow triggers for a tenant :param tenant: The tenant id (required) :type tenant: str :param offset: The number to skip :type offset: int :param limit: The number to limit by :type limit: int :param workflow_id: The workflow id to get runs for. :type workflow_id: str :param additional_metadata: A list of metadata key value pairs to filter by :type additional_metadata: List[str] :param order_by_field: The order by field :type order_by_field: CronWorkflowsOrderByField :param order_by_direction: The order by direction :type order_by_direction: WorkflowRunOrderByDirection :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._cron_workflow_list_serialize( tenant=tenant, offset=offset, limit=limit, workflow_id=workflow_id, additional_metadata=additional_metadata, order_by_field=order_by_field, order_by_direction=order_by_direction, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "CronWorkflowsList", "400": "APIErrors", "403": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) return response_data.response def _cron_workflow_list_serialize( self, tenant, offset, limit, workflow_id, additional_metadata, order_by_field, order_by_direction, _request_auth, _content_type, _headers, _host_index, ) -> RequestSerialized: _host = None _collection_formats: Dict[str, str] = { "additionalMetadata": "multi", } _path_params: Dict[str, str] = {} _query_params: List[Tuple[str, str]] = [] _header_params: Dict[str, Optional[str]] = _headers or {} _form_params: List[Tuple[str, str]] = [] _files: Dict[ str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]] ] = {} _body_params: Optional[bytes] = None # process the path parameters if tenant is not None: _path_params["tenant"] = tenant # process the query parameters if offset is not None: _query_params.append(("offset", offset)) if limit is not None: _query_params.append(("limit", limit)) if workflow_id is not None: _query_params.append(("workflowId", workflow_id)) if additional_metadata is not None: _query_params.append(("additionalMetadata", additional_metadata)) if order_by_field is not None: _query_params.append(("orderByField", order_by_field.value)) if order_by_direction is not None: _query_params.append(("orderByDirection", order_by_direction.value)) # process the header parameters # process the form parameters # process the body parameter # set the HTTP header `Accept` if "Accept" not in _header_params: _header_params["Accept"] = self.api_client.select_header_accept( ["application/json"] ) # authentication setting _auth_settings: List[str] = ["cookieAuth", "bearerAuth"] return self.api_client.param_serialize( method="GET", resource_path="/api/v1/tenants/{tenant}/workflows/crons", path_params=_path_params, query_params=_query_params, header_params=_header_params, body=_body_params, post_params=_form_params, files=_files, auth_settings=_auth_settings, collection_formats=_collection_formats, _host=_host, _request_auth=_request_auth, ) @validate_call async def tenant_get_queue_metrics( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], workflows: Annotated[ Optional[List[StrictStr]], Field(description="A list of workflow IDs to filter by"), ] = None, additional_metadata: Annotated[ Optional[List[StrictStr]], Field(description="A list of metadata key value pairs to filter by"), ] = None, _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> TenantQueueMetrics: """Get workflow metrics Get the queue metrics for the tenant :param tenant: The tenant id (required) :type tenant: str :param workflows: A list of workflow IDs to filter by :type workflows: List[str] :param additional_metadata: A list of metadata key value pairs to filter by :type additional_metadata: List[str] :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._tenant_get_queue_metrics_serialize( tenant=tenant, workflows=workflows, additional_metadata=additional_metadata, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "TenantQueueMetrics", "400": "APIErrors", "403": "APIErrors", "404": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ).data @validate_call async def tenant_get_queue_metrics_with_http_info( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], workflows: Annotated[ Optional[List[StrictStr]], Field(description="A list of workflow IDs to filter by"), ] = None, additional_metadata: Annotated[ Optional[List[StrictStr]], Field(description="A list of metadata key value pairs to filter by"), ] = None, _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> ApiResponse[TenantQueueMetrics]: """Get workflow metrics Get the queue metrics for the tenant :param tenant: The tenant id (required) :type tenant: str :param workflows: A list of workflow IDs to filter by :type workflows: List[str] :param additional_metadata: A list of metadata key value pairs to filter by :type additional_metadata: List[str] :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._tenant_get_queue_metrics_serialize( tenant=tenant, workflows=workflows, additional_metadata=additional_metadata, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "TenantQueueMetrics", "400": "APIErrors", "403": "APIErrors", "404": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ) @validate_call async def tenant_get_queue_metrics_without_preload_content( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], workflows: Annotated[ Optional[List[StrictStr]], Field(description="A list of workflow IDs to filter by"), ] = None, additional_metadata: Annotated[ Optional[List[StrictStr]], Field(description="A list of metadata key value pairs to filter by"), ] = None, _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> RESTResponseType: """Get workflow metrics Get the queue metrics for the tenant :param tenant: The tenant id (required) :type tenant: str :param workflows: A list of workflow IDs to filter by :type workflows: List[str] :param additional_metadata: A list of metadata key value pairs to filter by :type additional_metadata: List[str] :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._tenant_get_queue_metrics_serialize( tenant=tenant, workflows=workflows, additional_metadata=additional_metadata, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "TenantQueueMetrics", "400": "APIErrors", "403": "APIErrors", "404": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) return response_data.response def _tenant_get_queue_metrics_serialize( self, tenant, workflows, additional_metadata, _request_auth, _content_type, _headers, _host_index, ) -> RequestSerialized: _host = None _collection_formats: Dict[str, str] = { "workflows": "multi", "additionalMetadata": "multi", } _path_params: Dict[str, str] = {} _query_params: List[Tuple[str, str]] = [] _header_params: Dict[str, Optional[str]] = _headers or {} _form_params: List[Tuple[str, str]] = [] _files: Dict[ str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]] ] = {} _body_params: Optional[bytes] = None # process the path parameters if tenant is not None: _path_params["tenant"] = tenant # process the query parameters if workflows is not None: _query_params.append(("workflows", workflows)) if additional_metadata is not None: _query_params.append(("additionalMetadata", additional_metadata)) # process the header parameters # process the form parameters # process the body parameter # set the HTTP header `Accept` if "Accept" not in _header_params: _header_params["Accept"] = self.api_client.select_header_accept( ["application/json"] ) # authentication setting _auth_settings: List[str] = ["cookieAuth", "bearerAuth"] return self.api_client.param_serialize( method="GET", resource_path="/api/v1/tenants/{tenant}/queue-metrics", path_params=_path_params, query_params=_query_params, header_params=_header_params, body=_body_params, post_params=_form_params, files=_files, auth_settings=_auth_settings, collection_formats=_collection_formats, _host=_host, _request_auth=_request_auth, ) @validate_call async def workflow_cron_delete( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], cron_workflow: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The cron job id" ), ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> None: """Delete cron job workflow run Delete a cron job workflow run for a tenant :param tenant: The tenant id (required) :type tenant: str :param cron_workflow: The cron job id (required) :type cron_workflow: str :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._workflow_cron_delete_serialize( tenant=tenant, cron_workflow=cron_workflow, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "204": None, "400": "APIErrors", "403": "APIError", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ).data @validate_call async def workflow_cron_delete_with_http_info( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], cron_workflow: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The cron job id" ), ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> ApiResponse[None]: """Delete cron job workflow run Delete a cron job workflow run for a tenant :param tenant: The tenant id (required) :type tenant: str :param cron_workflow: The cron job id (required) :type cron_workflow: str :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._workflow_cron_delete_serialize( tenant=tenant, cron_workflow=cron_workflow, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "204": None, "400": "APIErrors", "403": "APIError", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ) @validate_call async def workflow_cron_delete_without_preload_content( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], cron_workflow: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The cron job id" ), ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> RESTResponseType: """Delete cron job workflow run Delete a cron job workflow run for a tenant :param tenant: The tenant id (required) :type tenant: str :param cron_workflow: The cron job id (required) :type cron_workflow: str :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._workflow_cron_delete_serialize( tenant=tenant, cron_workflow=cron_workflow, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "204": None, "400": "APIErrors", "403": "APIError", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) return response_data.response def _workflow_cron_delete_serialize( self, tenant, cron_workflow, _request_auth, _content_type, _headers, _host_index, ) -> RequestSerialized: _host = None _collection_formats: Dict[str, str] = {} _path_params: Dict[str, str] = {} _query_params: List[Tuple[str, str]] = [] _header_params: Dict[str, Optional[str]] = _headers or {} _form_params: List[Tuple[str, str]] = [] _files: Dict[ str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]] ] = {} _body_params: Optional[bytes] = None # process the path parameters if tenant is not None: _path_params["tenant"] = tenant if cron_workflow is not None: _path_params["cron-workflow"] = cron_workflow # process the query parameters # process the header parameters # process the form parameters # process the body parameter # set the HTTP header `Accept` if "Accept" not in _header_params: _header_params["Accept"] = self.api_client.select_header_accept( ["application/json"] ) # authentication setting _auth_settings: List[str] = ["cookieAuth", "bearerAuth"] return self.api_client.param_serialize( method="DELETE", resource_path="/api/v1/tenants/{tenant}/workflows/crons/{cron-workflow}", path_params=_path_params, query_params=_query_params, header_params=_header_params, body=_body_params, post_params=_form_params, files=_files, auth_settings=_auth_settings, collection_formats=_collection_formats, _host=_host, _request_auth=_request_auth, ) @validate_call async def workflow_cron_get( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], cron_workflow: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The cron job id" ), ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> CronWorkflows: """Get cron job workflow run Get a cron job workflow run for a tenant :param tenant: The tenant id (required) :type tenant: str :param cron_workflow: The cron job id (required) :type cron_workflow: str :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._workflow_cron_get_serialize( tenant=tenant, cron_workflow=cron_workflow, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "CronWorkflows", "400": "APIErrors", "403": "APIErrors", "404": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ).data @validate_call async def workflow_cron_get_with_http_info( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], cron_workflow: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The cron job id" ), ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> ApiResponse[CronWorkflows]: """Get cron job workflow run Get a cron job workflow run for a tenant :param tenant: The tenant id (required) :type tenant: str :param cron_workflow: The cron job id (required) :type cron_workflow: str :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._workflow_cron_get_serialize( tenant=tenant, cron_workflow=cron_workflow, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "CronWorkflows", "400": "APIErrors", "403": "APIErrors", "404": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ) @validate_call async def workflow_cron_get_without_preload_content( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], cron_workflow: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The cron job id" ), ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> RESTResponseType: """Get cron job workflow run Get a cron job workflow run for a tenant :param tenant: The tenant id (required) :type tenant: str :param cron_workflow: The cron job id (required) :type cron_workflow: str :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._workflow_cron_get_serialize( tenant=tenant, cron_workflow=cron_workflow, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "CronWorkflows", "400": "APIErrors", "403": "APIErrors", "404": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) return response_data.response def _workflow_cron_get_serialize( self, tenant, cron_workflow, _request_auth, _content_type, _headers, _host_index, ) -> RequestSerialized: _host = None _collection_formats: Dict[str, str] = {} _path_params: Dict[str, str] = {} _query_params: List[Tuple[str, str]] = [] _header_params: Dict[str, Optional[str]] = _headers or {} _form_params: List[Tuple[str, str]] = [] _files: Dict[ str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]] ] = {} _body_params: Optional[bytes] = None # process the path parameters if tenant is not None: _path_params["tenant"] = tenant if cron_workflow is not None: _path_params["cron-workflow"] = cron_workflow # process the query parameters # process the header parameters # process the form parameters # process the body parameter # set the HTTP header `Accept` if "Accept" not in _header_params: _header_params["Accept"] = self.api_client.select_header_accept( ["application/json"] ) # authentication setting _auth_settings: List[str] = ["cookieAuth", "bearerAuth"] return self.api_client.param_serialize( method="GET", resource_path="/api/v1/tenants/{tenant}/workflows/crons/{cron-workflow}", path_params=_path_params, query_params=_query_params, header_params=_header_params, body=_body_params, post_params=_form_params, files=_files, auth_settings=_auth_settings, collection_formats=_collection_formats, _host=_host, _request_auth=_request_auth, ) @validate_call async def workflow_delete( self, workflow: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The workflow id" ), ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> None: """Delete workflow Delete a workflow for a tenant :param workflow: The workflow id (required) :type workflow: str :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._workflow_delete_serialize( workflow=workflow, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "204": None, "400": "APIErrors", "403": "APIErrors", "404": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ).data @validate_call async def workflow_delete_with_http_info( self, workflow: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The workflow id" ), ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> ApiResponse[None]: """Delete workflow Delete a workflow for a tenant :param workflow: The workflow id (required) :type workflow: str :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._workflow_delete_serialize( workflow=workflow, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "204": None, "400": "APIErrors", "403": "APIErrors", "404": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ) @validate_call async def workflow_delete_without_preload_content( self, workflow: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The workflow id" ), ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> RESTResponseType: """Delete workflow Delete a workflow for a tenant :param workflow: The workflow id (required) :type workflow: str :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._workflow_delete_serialize( workflow=workflow, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "204": None, "400": "APIErrors", "403": "APIErrors", "404": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) return response_data.response def _workflow_delete_serialize( self, workflow, _request_auth, _content_type, _headers, _host_index, ) -> RequestSerialized: _host = None _collection_formats: Dict[str, str] = {} _path_params: Dict[str, str] = {} _query_params: List[Tuple[str, str]] = [] _header_params: Dict[str, Optional[str]] = _headers or {} _form_params: List[Tuple[str, str]] = [] _files: Dict[ str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]] ] = {} _body_params: Optional[bytes] = None # process the path parameters if workflow is not None: _path_params["workflow"] = workflow # process the query parameters # process the header parameters # process the form parameters # process the body parameter # set the HTTP header `Accept` if "Accept" not in _header_params: _header_params["Accept"] = self.api_client.select_header_accept( ["application/json"] ) # authentication setting _auth_settings: List[str] = ["cookieAuth", "bearerAuth"] return self.api_client.param_serialize( method="DELETE", resource_path="/api/v1/workflows/{workflow}", path_params=_path_params, query_params=_query_params, header_params=_header_params, body=_body_params, post_params=_form_params, files=_files, auth_settings=_auth_settings, collection_formats=_collection_formats, _host=_host, _request_auth=_request_auth, ) @validate_call async def workflow_get( self, workflow: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The workflow id" ), ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> Workflow: """Get workflow Get a workflow for a tenant :param workflow: The workflow id (required) :type workflow: str :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._workflow_get_serialize( workflow=workflow, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "Workflow", "400": "APIErrors", "403": "APIErrors", "404": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ).data @validate_call async def workflow_get_with_http_info( self, workflow: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The workflow id" ), ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> ApiResponse[Workflow]: """Get workflow Get a workflow for a tenant :param workflow: The workflow id (required) :type workflow: str :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._workflow_get_serialize( workflow=workflow, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "Workflow", "400": "APIErrors", "403": "APIErrors", "404": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ) @validate_call async def workflow_get_without_preload_content( self, workflow: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The workflow id" ), ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> RESTResponseType: """Get workflow Get a workflow for a tenant :param workflow: The workflow id (required) :type workflow: str :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._workflow_get_serialize( workflow=workflow, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "Workflow", "400": "APIErrors", "403": "APIErrors", "404": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) return response_data.response def _workflow_get_serialize( self, workflow, _request_auth, _content_type, _headers, _host_index, ) -> RequestSerialized: _host = None _collection_formats: Dict[str, str] = {} _path_params: Dict[str, str] = {} _query_params: List[Tuple[str, str]] = [] _header_params: Dict[str, Optional[str]] = _headers or {} _form_params: List[Tuple[str, str]] = [] _files: Dict[ str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]] ] = {} _body_params: Optional[bytes] = None # process the path parameters if workflow is not None: _path_params["workflow"] = workflow # process the query parameters # process the header parameters # process the form parameters # process the body parameter # set the HTTP header `Accept` if "Accept" not in _header_params: _header_params["Accept"] = self.api_client.select_header_accept( ["application/json"] ) # authentication setting _auth_settings: List[str] = ["cookieAuth", "bearerAuth"] return self.api_client.param_serialize( method="GET", resource_path="/api/v1/workflows/{workflow}", path_params=_path_params, query_params=_query_params, header_params=_header_params, body=_body_params, post_params=_form_params, files=_files, auth_settings=_auth_settings, collection_formats=_collection_formats, _host=_host, _request_auth=_request_auth, ) @validate_call async def workflow_get_metrics( self, workflow: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The workflow id" ), ], status: Annotated[ Optional[WorkflowRunStatus], Field(description="A status of workflow run statuses to filter by"), ] = None, group_key: Annotated[ Optional[StrictStr], Field(description="A group key to filter metrics by") ] = None, _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> WorkflowMetrics: """Get workflow metrics Get the metrics for a workflow version :param workflow: The workflow id (required) :type workflow: str :param status: A status of workflow run statuses to filter by :type status: WorkflowRunStatus :param group_key: A group key to filter metrics by :type group_key: str :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._workflow_get_metrics_serialize( workflow=workflow, status=status, group_key=group_key, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "WorkflowMetrics", "400": "APIErrors", "403": "APIErrors", "404": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ).data @validate_call async def workflow_get_metrics_with_http_info( self, workflow: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The workflow id" ), ], status: Annotated[ Optional[WorkflowRunStatus], Field(description="A status of workflow run statuses to filter by"), ] = None, group_key: Annotated[ Optional[StrictStr], Field(description="A group key to filter metrics by") ] = None, _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> ApiResponse[WorkflowMetrics]: """Get workflow metrics Get the metrics for a workflow version :param workflow: The workflow id (required) :type workflow: str :param status: A status of workflow run statuses to filter by :type status: WorkflowRunStatus :param group_key: A group key to filter metrics by :type group_key: str :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._workflow_get_metrics_serialize( workflow=workflow, status=status, group_key=group_key, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "WorkflowMetrics", "400": "APIErrors", "403": "APIErrors", "404": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ) @validate_call async def workflow_get_metrics_without_preload_content( self, workflow: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The workflow id" ), ], status: Annotated[ Optional[WorkflowRunStatus], Field(description="A status of workflow run statuses to filter by"), ] = None, group_key: Annotated[ Optional[StrictStr], Field(description="A group key to filter metrics by") ] = None, _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> RESTResponseType: """Get workflow metrics Get the metrics for a workflow version :param workflow: The workflow id (required) :type workflow: str :param status: A status of workflow run statuses to filter by :type status: WorkflowRunStatus :param group_key: A group key to filter metrics by :type group_key: str :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._workflow_get_metrics_serialize( workflow=workflow, status=status, group_key=group_key, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "WorkflowMetrics", "400": "APIErrors", "403": "APIErrors", "404": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) return response_data.response def _workflow_get_metrics_serialize( self, workflow, status, group_key, _request_auth, _content_type, _headers, _host_index, ) -> RequestSerialized: _host = None _collection_formats: Dict[str, str] = {} _path_params: Dict[str, str] = {} _query_params: List[Tuple[str, str]] = [] _header_params: Dict[str, Optional[str]] = _headers or {} _form_params: List[Tuple[str, str]] = [] _files: Dict[ str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]] ] = {} _body_params: Optional[bytes] = None # process the path parameters if workflow is not None: _path_params["workflow"] = workflow # process the query parameters if status is not None: _query_params.append(("status", status.value)) if group_key is not None: _query_params.append(("groupKey", group_key)) # process the header parameters # process the form parameters # process the body parameter # set the HTTP header `Accept` if "Accept" not in _header_params: _header_params["Accept"] = self.api_client.select_header_accept( ["application/json"] ) # authentication setting _auth_settings: List[str] = ["cookieAuth", "bearerAuth"] return self.api_client.param_serialize( method="GET", resource_path="/api/v1/workflows/{workflow}/metrics", path_params=_path_params, query_params=_query_params, header_params=_header_params, body=_body_params, post_params=_form_params, files=_files, auth_settings=_auth_settings, collection_formats=_collection_formats, _host=_host, _request_auth=_request_auth, ) @validate_call async def workflow_get_workers_count( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], workflow: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The workflow id" ), ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> WorkflowWorkersCount: """Get workflow worker count Get a count of the workers available for workflow :param tenant: The tenant id (required) :type tenant: str :param workflow: The workflow id (required) :type workflow: str :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._workflow_get_workers_count_serialize( tenant=tenant, workflow=workflow, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "WorkflowWorkersCount", "400": "APIErrors", "403": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ).data @validate_call async def workflow_get_workers_count_with_http_info( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], workflow: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The workflow id" ), ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> ApiResponse[WorkflowWorkersCount]: """Get workflow worker count Get a count of the workers available for workflow :param tenant: The tenant id (required) :type tenant: str :param workflow: The workflow id (required) :type workflow: str :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._workflow_get_workers_count_serialize( tenant=tenant, workflow=workflow, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "WorkflowWorkersCount", "400": "APIErrors", "403": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ) @validate_call async def workflow_get_workers_count_without_preload_content( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], workflow: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The workflow id" ), ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> RESTResponseType: """Get workflow worker count Get a count of the workers available for workflow :param tenant: The tenant id (required) :type tenant: str :param workflow: The workflow id (required) :type workflow: str :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._workflow_get_workers_count_serialize( tenant=tenant, workflow=workflow, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "WorkflowWorkersCount", "400": "APIErrors", "403": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) return response_data.response def _workflow_get_workers_count_serialize( self, tenant, workflow, _request_auth, _content_type, _headers, _host_index, ) -> RequestSerialized: _host = None _collection_formats: Dict[str, str] = {} _path_params: Dict[str, str] = {} _query_params: List[Tuple[str, str]] = [] _header_params: Dict[str, Optional[str]] = _headers or {} _form_params: List[Tuple[str, str]] = [] _files: Dict[ str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]] ] = {} _body_params: Optional[bytes] = None # process the path parameters if tenant is not None: _path_params["tenant"] = tenant if workflow is not None: _path_params["workflow"] = workflow # process the query parameters # process the header parameters # process the form parameters # process the body parameter # set the HTTP header `Accept` if "Accept" not in _header_params: _header_params["Accept"] = self.api_client.select_header_accept( ["application/json"] ) # authentication setting _auth_settings: List[str] = ["cookieAuth", "bearerAuth"] return self.api_client.param_serialize( method="GET", resource_path="/api/v1/tenants/{tenant}/workflows/{workflow}/worker-count", path_params=_path_params, query_params=_query_params, header_params=_header_params, body=_body_params, post_params=_form_params, files=_files, auth_settings=_auth_settings, collection_formats=_collection_formats, _host=_host, _request_auth=_request_auth, ) @validate_call async def workflow_list( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], offset: Annotated[ Optional[StrictInt], Field(description="The number to skip") ] = None, limit: Annotated[ Optional[StrictInt], Field(description="The number to limit by") ] = None, name: Annotated[ Optional[StrictStr], Field(description="Search by name") ] = None, _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> WorkflowList: """Get workflows Get all workflows for a tenant :param tenant: The tenant id (required) :type tenant: str :param offset: The number to skip :type offset: int :param limit: The number to limit by :type limit: int :param name: Search by name :type name: str :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._workflow_list_serialize( tenant=tenant, offset=offset, limit=limit, name=name, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "WorkflowList", "400": "APIErrors", "403": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ).data @validate_call async def workflow_list_with_http_info( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], offset: Annotated[ Optional[StrictInt], Field(description="The number to skip") ] = None, limit: Annotated[ Optional[StrictInt], Field(description="The number to limit by") ] = None, name: Annotated[ Optional[StrictStr], Field(description="Search by name") ] = None, _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> ApiResponse[WorkflowList]: """Get workflows Get all workflows for a tenant :param tenant: The tenant id (required) :type tenant: str :param offset: The number to skip :type offset: int :param limit: The number to limit by :type limit: int :param name: Search by name :type name: str :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._workflow_list_serialize( tenant=tenant, offset=offset, limit=limit, name=name, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "WorkflowList", "400": "APIErrors", "403": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ) @validate_call async def workflow_list_without_preload_content( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], offset: Annotated[ Optional[StrictInt], Field(description="The number to skip") ] = None, limit: Annotated[ Optional[StrictInt], Field(description="The number to limit by") ] = None, name: Annotated[ Optional[StrictStr], Field(description="Search by name") ] = None, _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> RESTResponseType: """Get workflows Get all workflows for a tenant :param tenant: The tenant id (required) :type tenant: str :param offset: The number to skip :type offset: int :param limit: The number to limit by :type limit: int :param name: Search by name :type name: str :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._workflow_list_serialize( tenant=tenant, offset=offset, limit=limit, name=name, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "WorkflowList", "400": "APIErrors", "403": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) return response_data.response def _workflow_list_serialize( self, tenant, offset, limit, name, _request_auth, _content_type, _headers, _host_index, ) -> RequestSerialized: _host = None _collection_formats: Dict[str, str] = {} _path_params: Dict[str, str] = {} _query_params: List[Tuple[str, str]] = [] _header_params: Dict[str, Optional[str]] = _headers or {} _form_params: List[Tuple[str, str]] = [] _files: Dict[ str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]] ] = {} _body_params: Optional[bytes] = None # process the path parameters if tenant is not None: _path_params["tenant"] = tenant # process the query parameters if offset is not None: _query_params.append(("offset", offset)) if limit is not None: _query_params.append(("limit", limit)) if name is not None: _query_params.append(("name", name)) # process the header parameters # process the form parameters # process the body parameter # set the HTTP header `Accept` if "Accept" not in _header_params: _header_params["Accept"] = self.api_client.select_header_accept( ["application/json"] ) # authentication setting _auth_settings: List[str] = ["cookieAuth", "bearerAuth"] return self.api_client.param_serialize( method="GET", resource_path="/api/v1/tenants/{tenant}/workflows", path_params=_path_params, query_params=_query_params, header_params=_header_params, body=_body_params, post_params=_form_params, files=_files, auth_settings=_auth_settings, collection_formats=_collection_formats, _host=_host, _request_auth=_request_auth, ) @validate_call async def workflow_run_get( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], workflow_run: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The workflow run id", ), ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> WorkflowRun: """Get workflow run Get a workflow run for a tenant :param tenant: The tenant id (required) :type tenant: str :param workflow_run: The workflow run id (required) :type workflow_run: str :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._workflow_run_get_serialize( tenant=tenant, workflow_run=workflow_run, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "WorkflowRun", "400": "APIErrors", "403": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ).data @validate_call async def workflow_run_get_with_http_info( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], workflow_run: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The workflow run id", ), ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> ApiResponse[WorkflowRun]: """Get workflow run Get a workflow run for a tenant :param tenant: The tenant id (required) :type tenant: str :param workflow_run: The workflow run id (required) :type workflow_run: str :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._workflow_run_get_serialize( tenant=tenant, workflow_run=workflow_run, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "WorkflowRun", "400": "APIErrors", "403": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ) @validate_call async def workflow_run_get_without_preload_content( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], workflow_run: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The workflow run id", ), ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> RESTResponseType: """Get workflow run Get a workflow run for a tenant :param tenant: The tenant id (required) :type tenant: str :param workflow_run: The workflow run id (required) :type workflow_run: str :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._workflow_run_get_serialize( tenant=tenant, workflow_run=workflow_run, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "WorkflowRun", "400": "APIErrors", "403": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) return response_data.response def _workflow_run_get_serialize( self, tenant, workflow_run, _request_auth, _content_type, _headers, _host_index, ) -> RequestSerialized: _host = None _collection_formats: Dict[str, str] = {} _path_params: Dict[str, str] = {} _query_params: List[Tuple[str, str]] = [] _header_params: Dict[str, Optional[str]] = _headers or {} _form_params: List[Tuple[str, str]] = [] _files: Dict[ str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]] ] = {} _body_params: Optional[bytes] = None # process the path parameters if tenant is not None: _path_params["tenant"] = tenant if workflow_run is not None: _path_params["workflow-run"] = workflow_run # process the query parameters # process the header parameters # process the form parameters # process the body parameter # set the HTTP header `Accept` if "Accept" not in _header_params: _header_params["Accept"] = self.api_client.select_header_accept( ["application/json"] ) # authentication setting _auth_settings: List[str] = ["cookieAuth", "bearerAuth"] return self.api_client.param_serialize( method="GET", resource_path="/api/v1/tenants/{tenant}/workflow-runs/{workflow-run}", path_params=_path_params, query_params=_query_params, header_params=_header_params, body=_body_params, post_params=_form_params, files=_files, auth_settings=_auth_settings, collection_formats=_collection_formats, _host=_host, _request_auth=_request_auth, ) @validate_call async def workflow_run_get_metrics( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], event_id: Annotated[ Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]], Field(description="The event id to get runs for."), ] = None, workflow_id: Annotated[ Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]], Field(description="The workflow id to get runs for."), ] = None, parent_workflow_run_id: Annotated[ Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]], Field(description="The parent workflow run id"), ] = None, parent_step_run_id: Annotated[ Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]], Field(description="The parent step run id"), ] = None, additional_metadata: Annotated[ Optional[List[StrictStr]], Field(description="A list of metadata key value pairs to filter by"), ] = None, created_after: Annotated[ Optional[datetime], Field(description="The time after the workflow run was created"), ] = None, created_before: Annotated[ Optional[datetime], Field(description="The time before the workflow run was created"), ] = None, _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> WorkflowRunsMetrics: """Get workflow runs metrics Get a summary of workflow run metrics for a tenant :param tenant: The tenant id (required) :type tenant: str :param event_id: The event id to get runs for. :type event_id: str :param workflow_id: The workflow id to get runs for. :type workflow_id: str :param parent_workflow_run_id: The parent workflow run id :type parent_workflow_run_id: str :param parent_step_run_id: The parent step run id :type parent_step_run_id: str :param additional_metadata: A list of metadata key value pairs to filter by :type additional_metadata: List[str] :param created_after: The time after the workflow run was created :type created_after: datetime :param created_before: The time before the workflow run was created :type created_before: datetime :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._workflow_run_get_metrics_serialize( tenant=tenant, event_id=event_id, workflow_id=workflow_id, parent_workflow_run_id=parent_workflow_run_id, parent_step_run_id=parent_step_run_id, additional_metadata=additional_metadata, created_after=created_after, created_before=created_before, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "WorkflowRunsMetrics", "400": "APIErrors", "403": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ).data @validate_call async def workflow_run_get_metrics_with_http_info( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], event_id: Annotated[ Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]], Field(description="The event id to get runs for."), ] = None, workflow_id: Annotated[ Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]], Field(description="The workflow id to get runs for."), ] = None, parent_workflow_run_id: Annotated[ Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]], Field(description="The parent workflow run id"), ] = None, parent_step_run_id: Annotated[ Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]], Field(description="The parent step run id"), ] = None, additional_metadata: Annotated[ Optional[List[StrictStr]], Field(description="A list of metadata key value pairs to filter by"), ] = None, created_after: Annotated[ Optional[datetime], Field(description="The time after the workflow run was created"), ] = None, created_before: Annotated[ Optional[datetime], Field(description="The time before the workflow run was created"), ] = None, _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> ApiResponse[WorkflowRunsMetrics]: """Get workflow runs metrics Get a summary of workflow run metrics for a tenant :param tenant: The tenant id (required) :type tenant: str :param event_id: The event id to get runs for. :type event_id: str :param workflow_id: The workflow id to get runs for. :type workflow_id: str :param parent_workflow_run_id: The parent workflow run id :type parent_workflow_run_id: str :param parent_step_run_id: The parent step run id :type parent_step_run_id: str :param additional_metadata: A list of metadata key value pairs to filter by :type additional_metadata: List[str] :param created_after: The time after the workflow run was created :type created_after: datetime :param created_before: The time before the workflow run was created :type created_before: datetime :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._workflow_run_get_metrics_serialize( tenant=tenant, event_id=event_id, workflow_id=workflow_id, parent_workflow_run_id=parent_workflow_run_id, parent_step_run_id=parent_step_run_id, additional_metadata=additional_metadata, created_after=created_after, created_before=created_before, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "WorkflowRunsMetrics", "400": "APIErrors", "403": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ) @validate_call async def workflow_run_get_metrics_without_preload_content( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], event_id: Annotated[ Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]], Field(description="The event id to get runs for."), ] = None, workflow_id: Annotated[ Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]], Field(description="The workflow id to get runs for."), ] = None, parent_workflow_run_id: Annotated[ Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]], Field(description="The parent workflow run id"), ] = None, parent_step_run_id: Annotated[ Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]], Field(description="The parent step run id"), ] = None, additional_metadata: Annotated[ Optional[List[StrictStr]], Field(description="A list of metadata key value pairs to filter by"), ] = None, created_after: Annotated[ Optional[datetime], Field(description="The time after the workflow run was created"), ] = None, created_before: Annotated[ Optional[datetime], Field(description="The time before the workflow run was created"), ] = None, _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> RESTResponseType: """Get workflow runs metrics Get a summary of workflow run metrics for a tenant :param tenant: The tenant id (required) :type tenant: str :param event_id: The event id to get runs for. :type event_id: str :param workflow_id: The workflow id to get runs for. :type workflow_id: str :param parent_workflow_run_id: The parent workflow run id :type parent_workflow_run_id: str :param parent_step_run_id: The parent step run id :type parent_step_run_id: str :param additional_metadata: A list of metadata key value pairs to filter by :type additional_metadata: List[str] :param created_after: The time after the workflow run was created :type created_after: datetime :param created_before: The time before the workflow run was created :type created_before: datetime :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._workflow_run_get_metrics_serialize( tenant=tenant, event_id=event_id, workflow_id=workflow_id, parent_workflow_run_id=parent_workflow_run_id, parent_step_run_id=parent_step_run_id, additional_metadata=additional_metadata, created_after=created_after, created_before=created_before, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "WorkflowRunsMetrics", "400": "APIErrors", "403": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) return response_data.response def _workflow_run_get_metrics_serialize( self, tenant, event_id, workflow_id, parent_workflow_run_id, parent_step_run_id, additional_metadata, created_after, created_before, _request_auth, _content_type, _headers, _host_index, ) -> RequestSerialized: _host = None _collection_formats: Dict[str, str] = { "additionalMetadata": "multi", } _path_params: Dict[str, str] = {} _query_params: List[Tuple[str, str]] = [] _header_params: Dict[str, Optional[str]] = _headers or {} _form_params: List[Tuple[str, str]] = [] _files: Dict[ str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]] ] = {} _body_params: Optional[bytes] = None # process the path parameters if tenant is not None: _path_params["tenant"] = tenant # process the query parameters if event_id is not None: _query_params.append(("eventId", event_id)) if workflow_id is not None: _query_params.append(("workflowId", workflow_id)) if parent_workflow_run_id is not None: _query_params.append(("parentWorkflowRunId", parent_workflow_run_id)) if parent_step_run_id is not None: _query_params.append(("parentStepRunId", parent_step_run_id)) if additional_metadata is not None: _query_params.append(("additionalMetadata", additional_metadata)) if created_after is not None: if isinstance(created_after, datetime): _query_params.append( ( "createdAfter", created_after.strftime( self.api_client.configuration.datetime_format ), ) ) else: _query_params.append(("createdAfter", created_after)) if created_before is not None: if isinstance(created_before, datetime): _query_params.append( ( "createdBefore", created_before.strftime( self.api_client.configuration.datetime_format ), ) ) else: _query_params.append(("createdBefore", created_before)) # process the header parameters # process the form parameters # process the body parameter # set the HTTP header `Accept` if "Accept" not in _header_params: _header_params["Accept"] = self.api_client.select_header_accept( ["application/json"] ) # authentication setting _auth_settings: List[str] = ["cookieAuth", "bearerAuth"] return self.api_client.param_serialize( method="GET", resource_path="/api/v1/tenants/{tenant}/workflows/runs/metrics", path_params=_path_params, query_params=_query_params, header_params=_header_params, body=_body_params, post_params=_form_params, files=_files, auth_settings=_auth_settings, collection_formats=_collection_formats, _host=_host, _request_auth=_request_auth, ) @validate_call async def workflow_run_get_shape( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], workflow_run: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The workflow run id", ), ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> WorkflowRunShape: """Get workflow run Get a workflow run for a tenant :param tenant: The tenant id (required) :type tenant: str :param workflow_run: The workflow run id (required) :type workflow_run: str :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._workflow_run_get_shape_serialize( tenant=tenant, workflow_run=workflow_run, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "WorkflowRunShape", "400": "APIErrors", "403": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ).data @validate_call async def workflow_run_get_shape_with_http_info( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], workflow_run: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The workflow run id", ), ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> ApiResponse[WorkflowRunShape]: """Get workflow run Get a workflow run for a tenant :param tenant: The tenant id (required) :type tenant: str :param workflow_run: The workflow run id (required) :type workflow_run: str :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._workflow_run_get_shape_serialize( tenant=tenant, workflow_run=workflow_run, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "WorkflowRunShape", "400": "APIErrors", "403": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ) @validate_call async def workflow_run_get_shape_without_preload_content( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], workflow_run: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The workflow run id", ), ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> RESTResponseType: """Get workflow run Get a workflow run for a tenant :param tenant: The tenant id (required) :type tenant: str :param workflow_run: The workflow run id (required) :type workflow_run: str :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._workflow_run_get_shape_serialize( tenant=tenant, workflow_run=workflow_run, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "WorkflowRunShape", "400": "APIErrors", "403": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) return response_data.response def _workflow_run_get_shape_serialize( self, tenant, workflow_run, _request_auth, _content_type, _headers, _host_index, ) -> RequestSerialized: _host = None _collection_formats: Dict[str, str] = {} _path_params: Dict[str, str] = {} _query_params: List[Tuple[str, str]] = [] _header_params: Dict[str, Optional[str]] = _headers or {} _form_params: List[Tuple[str, str]] = [] _files: Dict[ str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]] ] = {} _body_params: Optional[bytes] = None # process the path parameters if tenant is not None: _path_params["tenant"] = tenant if workflow_run is not None: _path_params["workflow-run"] = workflow_run # process the query parameters # process the header parameters # process the form parameters # process the body parameter # set the HTTP header `Accept` if "Accept" not in _header_params: _header_params["Accept"] = self.api_client.select_header_accept( ["application/json"] ) # authentication setting _auth_settings: List[str] = ["cookieAuth", "bearerAuth"] return self.api_client.param_serialize( method="GET", resource_path="/api/v1/tenants/{tenant}/workflow-runs/{workflow-run}/shape", path_params=_path_params, query_params=_query_params, header_params=_header_params, body=_body_params, post_params=_form_params, files=_files, auth_settings=_auth_settings, collection_formats=_collection_formats, _host=_host, _request_auth=_request_auth, ) @validate_call async def workflow_run_list( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], offset: Annotated[ Optional[StrictInt], Field(description="The number to skip") ] = None, limit: Annotated[ Optional[StrictInt], Field(description="The number to limit by") ] = None, event_id: Annotated[ Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]], Field(description="The event id to get runs for."), ] = None, workflow_id: Annotated[ Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]], Field(description="The workflow id to get runs for."), ] = None, parent_workflow_run_id: Annotated[ Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]], Field(description="The parent workflow run id"), ] = None, parent_step_run_id: Annotated[ Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]], Field(description="The parent step run id"), ] = None, statuses: Annotated[ Optional[List[WorkflowRunStatus]], Field(description="A list of workflow run statuses to filter by"), ] = None, kinds: Annotated[ Optional[List[WorkflowKind]], Field(description="A list of workflow kinds to filter by"), ] = None, additional_metadata: Annotated[ Optional[List[StrictStr]], Field(description="A list of metadata key value pairs to filter by"), ] = None, created_after: Annotated[ Optional[datetime], Field(description="The time after the workflow run was created"), ] = None, created_before: Annotated[ Optional[datetime], Field(description="The time before the workflow run was created"), ] = None, finished_after: Annotated[ Optional[datetime], Field(description="The time after the workflow run was finished"), ] = None, finished_before: Annotated[ Optional[datetime], Field(description="The time before the workflow run was finished"), ] = None, order_by_field: Annotated[ Optional[WorkflowRunOrderByField], Field(description="The order by field") ] = None, order_by_direction: Annotated[ Optional[WorkflowRunOrderByDirection], Field(description="The order by direction"), ] = None, _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> WorkflowRunList: """Get workflow runs Get all workflow runs for a tenant :param tenant: The tenant id (required) :type tenant: str :param offset: The number to skip :type offset: int :param limit: The number to limit by :type limit: int :param event_id: The event id to get runs for. :type event_id: str :param workflow_id: The workflow id to get runs for. :type workflow_id: str :param parent_workflow_run_id: The parent workflow run id :type parent_workflow_run_id: str :param parent_step_run_id: The parent step run id :type parent_step_run_id: str :param statuses: A list of workflow run statuses to filter by :type statuses: List[WorkflowRunStatus] :param kinds: A list of workflow kinds to filter by :type kinds: List[WorkflowKind] :param additional_metadata: A list of metadata key value pairs to filter by :type additional_metadata: List[str] :param created_after: The time after the workflow run was created :type created_after: datetime :param created_before: The time before the workflow run was created :type created_before: datetime :param finished_after: The time after the workflow run was finished :type finished_after: datetime :param finished_before: The time before the workflow run was finished :type finished_before: datetime :param order_by_field: The order by field :type order_by_field: WorkflowRunOrderByField :param order_by_direction: The order by direction :type order_by_direction: WorkflowRunOrderByDirection :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._workflow_run_list_serialize( tenant=tenant, offset=offset, limit=limit, event_id=event_id, workflow_id=workflow_id, parent_workflow_run_id=parent_workflow_run_id, parent_step_run_id=parent_step_run_id, statuses=statuses, kinds=kinds, additional_metadata=additional_metadata, created_after=created_after, created_before=created_before, finished_after=finished_after, finished_before=finished_before, order_by_field=order_by_field, order_by_direction=order_by_direction, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "WorkflowRunList", "400": "APIErrors", "403": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ).data @validate_call async def workflow_run_list_with_http_info( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], offset: Annotated[ Optional[StrictInt], Field(description="The number to skip") ] = None, limit: Annotated[ Optional[StrictInt], Field(description="The number to limit by") ] = None, event_id: Annotated[ Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]], Field(description="The event id to get runs for."), ] = None, workflow_id: Annotated[ Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]], Field(description="The workflow id to get runs for."), ] = None, parent_workflow_run_id: Annotated[ Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]], Field(description="The parent workflow run id"), ] = None, parent_step_run_id: Annotated[ Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]], Field(description="The parent step run id"), ] = None, statuses: Annotated[ Optional[List[WorkflowRunStatus]], Field(description="A list of workflow run statuses to filter by"), ] = None, kinds: Annotated[ Optional[List[WorkflowKind]], Field(description="A list of workflow kinds to filter by"), ] = None, additional_metadata: Annotated[ Optional[List[StrictStr]], Field(description="A list of metadata key value pairs to filter by"), ] = None, created_after: Annotated[ Optional[datetime], Field(description="The time after the workflow run was created"), ] = None, created_before: Annotated[ Optional[datetime], Field(description="The time before the workflow run was created"), ] = None, finished_after: Annotated[ Optional[datetime], Field(description="The time after the workflow run was finished"), ] = None, finished_before: Annotated[ Optional[datetime], Field(description="The time before the workflow run was finished"), ] = None, order_by_field: Annotated[ Optional[WorkflowRunOrderByField], Field(description="The order by field") ] = None, order_by_direction: Annotated[ Optional[WorkflowRunOrderByDirection], Field(description="The order by direction"), ] = None, _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> ApiResponse[WorkflowRunList]: """Get workflow runs Get all workflow runs for a tenant :param tenant: The tenant id (required) :type tenant: str :param offset: The number to skip :type offset: int :param limit: The number to limit by :type limit: int :param event_id: The event id to get runs for. :type event_id: str :param workflow_id: The workflow id to get runs for. :type workflow_id: str :param parent_workflow_run_id: The parent workflow run id :type parent_workflow_run_id: str :param parent_step_run_id: The parent step run id :type parent_step_run_id: str :param statuses: A list of workflow run statuses to filter by :type statuses: List[WorkflowRunStatus] :param kinds: A list of workflow kinds to filter by :type kinds: List[WorkflowKind] :param additional_metadata: A list of metadata key value pairs to filter by :type additional_metadata: List[str] :param created_after: The time after the workflow run was created :type created_after: datetime :param created_before: The time before the workflow run was created :type created_before: datetime :param finished_after: The time after the workflow run was finished :type finished_after: datetime :param finished_before: The time before the workflow run was finished :type finished_before: datetime :param order_by_field: The order by field :type order_by_field: WorkflowRunOrderByField :param order_by_direction: The order by direction :type order_by_direction: WorkflowRunOrderByDirection :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._workflow_run_list_serialize( tenant=tenant, offset=offset, limit=limit, event_id=event_id, workflow_id=workflow_id, parent_workflow_run_id=parent_workflow_run_id, parent_step_run_id=parent_step_run_id, statuses=statuses, kinds=kinds, additional_metadata=additional_metadata, created_after=created_after, created_before=created_before, finished_after=finished_after, finished_before=finished_before, order_by_field=order_by_field, order_by_direction=order_by_direction, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "WorkflowRunList", "400": "APIErrors", "403": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ) @validate_call async def workflow_run_list_without_preload_content( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], offset: Annotated[ Optional[StrictInt], Field(description="The number to skip") ] = None, limit: Annotated[ Optional[StrictInt], Field(description="The number to limit by") ] = None, event_id: Annotated[ Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]], Field(description="The event id to get runs for."), ] = None, workflow_id: Annotated[ Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]], Field(description="The workflow id to get runs for."), ] = None, parent_workflow_run_id: Annotated[ Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]], Field(description="The parent workflow run id"), ] = None, parent_step_run_id: Annotated[ Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]], Field(description="The parent step run id"), ] = None, statuses: Annotated[ Optional[List[WorkflowRunStatus]], Field(description="A list of workflow run statuses to filter by"), ] = None, kinds: Annotated[ Optional[List[WorkflowKind]], Field(description="A list of workflow kinds to filter by"), ] = None, additional_metadata: Annotated[ Optional[List[StrictStr]], Field(description="A list of metadata key value pairs to filter by"), ] = None, created_after: Annotated[ Optional[datetime], Field(description="The time after the workflow run was created"), ] = None, created_before: Annotated[ Optional[datetime], Field(description="The time before the workflow run was created"), ] = None, finished_after: Annotated[ Optional[datetime], Field(description="The time after the workflow run was finished"), ] = None, finished_before: Annotated[ Optional[datetime], Field(description="The time before the workflow run was finished"), ] = None, order_by_field: Annotated[ Optional[WorkflowRunOrderByField], Field(description="The order by field") ] = None, order_by_direction: Annotated[ Optional[WorkflowRunOrderByDirection], Field(description="The order by direction"), ] = None, _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> RESTResponseType: """Get workflow runs Get all workflow runs for a tenant :param tenant: The tenant id (required) :type tenant: str :param offset: The number to skip :type offset: int :param limit: The number to limit by :type limit: int :param event_id: The event id to get runs for. :type event_id: str :param workflow_id: The workflow id to get runs for. :type workflow_id: str :param parent_workflow_run_id: The parent workflow run id :type parent_workflow_run_id: str :param parent_step_run_id: The parent step run id :type parent_step_run_id: str :param statuses: A list of workflow run statuses to filter by :type statuses: List[WorkflowRunStatus] :param kinds: A list of workflow kinds to filter by :type kinds: List[WorkflowKind] :param additional_metadata: A list of metadata key value pairs to filter by :type additional_metadata: List[str] :param created_after: The time after the workflow run was created :type created_after: datetime :param created_before: The time before the workflow run was created :type created_before: datetime :param finished_after: The time after the workflow run was finished :type finished_after: datetime :param finished_before: The time before the workflow run was finished :type finished_before: datetime :param order_by_field: The order by field :type order_by_field: WorkflowRunOrderByField :param order_by_direction: The order by direction :type order_by_direction: WorkflowRunOrderByDirection :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._workflow_run_list_serialize( tenant=tenant, offset=offset, limit=limit, event_id=event_id, workflow_id=workflow_id, parent_workflow_run_id=parent_workflow_run_id, parent_step_run_id=parent_step_run_id, statuses=statuses, kinds=kinds, additional_metadata=additional_metadata, created_after=created_after, created_before=created_before, finished_after=finished_after, finished_before=finished_before, order_by_field=order_by_field, order_by_direction=order_by_direction, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "WorkflowRunList", "400": "APIErrors", "403": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) return response_data.response def _workflow_run_list_serialize( self, tenant, offset, limit, event_id, workflow_id, parent_workflow_run_id, parent_step_run_id, statuses, kinds, additional_metadata, created_after, created_before, finished_after, finished_before, order_by_field, order_by_direction, _request_auth, _content_type, _headers, _host_index, ) -> RequestSerialized: _host = None _collection_formats: Dict[str, str] = { "statuses": "multi", "kinds": "multi", "additionalMetadata": "multi", } _path_params: Dict[str, str] = {} _query_params: List[Tuple[str, str]] = [] _header_params: Dict[str, Optional[str]] = _headers or {} _form_params: List[Tuple[str, str]] = [] _files: Dict[ str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]] ] = {} _body_params: Optional[bytes] = None # process the path parameters if tenant is not None: _path_params["tenant"] = tenant # process the query parameters if offset is not None: _query_params.append(("offset", offset)) if limit is not None: _query_params.append(("limit", limit)) if event_id is not None: _query_params.append(("eventId", event_id)) if workflow_id is not None: _query_params.append(("workflowId", workflow_id)) if parent_workflow_run_id is not None: _query_params.append(("parentWorkflowRunId", parent_workflow_run_id)) if parent_step_run_id is not None: _query_params.append(("parentStepRunId", parent_step_run_id)) if statuses is not None: _query_params.append(("statuses", statuses)) if kinds is not None: _query_params.append(("kinds", kinds)) if additional_metadata is not None: _query_params.append(("additionalMetadata", additional_metadata)) if created_after is not None: if isinstance(created_after, datetime): _query_params.append( ( "createdAfter", created_after.strftime( self.api_client.configuration.datetime_format ), ) ) else: _query_params.append(("createdAfter", created_after)) if created_before is not None: if isinstance(created_before, datetime): _query_params.append( ( "createdBefore", created_before.strftime( self.api_client.configuration.datetime_format ), ) ) else: _query_params.append(("createdBefore", created_before)) if finished_after is not None: if isinstance(finished_after, datetime): _query_params.append( ( "finishedAfter", finished_after.strftime( self.api_client.configuration.datetime_format ), ) ) else: _query_params.append(("finishedAfter", finished_after)) if finished_before is not None: if isinstance(finished_before, datetime): _query_params.append( ( "finishedBefore", finished_before.strftime( self.api_client.configuration.datetime_format ), ) ) else: _query_params.append(("finishedBefore", finished_before)) if order_by_field is not None: _query_params.append(("orderByField", order_by_field.value)) if order_by_direction is not None: _query_params.append(("orderByDirection", order_by_direction.value)) # process the header parameters # process the form parameters # process the body parameter # set the HTTP header `Accept` if "Accept" not in _header_params: _header_params["Accept"] = self.api_client.select_header_accept( ["application/json"] ) # authentication setting _auth_settings: List[str] = ["cookieAuth", "bearerAuth"] return self.api_client.param_serialize( method="GET", resource_path="/api/v1/tenants/{tenant}/workflows/runs", path_params=_path_params, query_params=_query_params, header_params=_header_params, body=_body_params, post_params=_form_params, files=_files, auth_settings=_auth_settings, collection_formats=_collection_formats, _host=_host, _request_auth=_request_auth, ) @validate_call async def workflow_scheduled_delete( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], scheduled_workflow_run: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The scheduled workflow id", ), ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> None: """Delete scheduled workflow run Delete a scheduled workflow run for a tenant :param tenant: The tenant id (required) :type tenant: str :param scheduled_workflow_run: The scheduled workflow id (required) :type scheduled_workflow_run: str :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._workflow_scheduled_delete_serialize( tenant=tenant, scheduled_workflow_run=scheduled_workflow_run, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "204": None, "400": "APIErrors", "403": "APIError", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ).data @validate_call async def workflow_scheduled_delete_with_http_info( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], scheduled_workflow_run: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The scheduled workflow id", ), ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> ApiResponse[None]: """Delete scheduled workflow run Delete a scheduled workflow run for a tenant :param tenant: The tenant id (required) :type tenant: str :param scheduled_workflow_run: The scheduled workflow id (required) :type scheduled_workflow_run: str :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._workflow_scheduled_delete_serialize( tenant=tenant, scheduled_workflow_run=scheduled_workflow_run, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "204": None, "400": "APIErrors", "403": "APIError", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ) @validate_call async def workflow_scheduled_delete_without_preload_content( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], scheduled_workflow_run: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The scheduled workflow id", ), ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> RESTResponseType: """Delete scheduled workflow run Delete a scheduled workflow run for a tenant :param tenant: The tenant id (required) :type tenant: str :param scheduled_workflow_run: The scheduled workflow id (required) :type scheduled_workflow_run: str :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._workflow_scheduled_delete_serialize( tenant=tenant, scheduled_workflow_run=scheduled_workflow_run, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "204": None, "400": "APIErrors", "403": "APIError", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) return response_data.response def _workflow_scheduled_delete_serialize( self, tenant, scheduled_workflow_run, _request_auth, _content_type, _headers, _host_index, ) -> RequestSerialized: _host = None _collection_formats: Dict[str, str] = {} _path_params: Dict[str, str] = {} _query_params: List[Tuple[str, str]] = [] _header_params: Dict[str, Optional[str]] = _headers or {} _form_params: List[Tuple[str, str]] = [] _files: Dict[ str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]] ] = {} _body_params: Optional[bytes] = None # process the path parameters if tenant is not None: _path_params["tenant"] = tenant if scheduled_workflow_run is not None: _path_params["scheduled-workflow-run"] = scheduled_workflow_run # process the query parameters # process the header parameters # process the form parameters # process the body parameter # set the HTTP header `Accept` if "Accept" not in _header_params: _header_params["Accept"] = self.api_client.select_header_accept( ["application/json"] ) # authentication setting _auth_settings: List[str] = ["cookieAuth", "bearerAuth"] return self.api_client.param_serialize( method="DELETE", resource_path="/api/v1/tenants/{tenant}/workflows/scheduled/{scheduled-workflow-run}", path_params=_path_params, query_params=_query_params, header_params=_header_params, body=_body_params, post_params=_form_params, files=_files, auth_settings=_auth_settings, collection_formats=_collection_formats, _host=_host, _request_auth=_request_auth, ) @validate_call async def workflow_scheduled_get( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], scheduled_workflow_run: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The scheduled workflow id", ), ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> ScheduledWorkflows: """Get scheduled workflow run Get a scheduled workflow run for a tenant :param tenant: The tenant id (required) :type tenant: str :param scheduled_workflow_run: The scheduled workflow id (required) :type scheduled_workflow_run: str :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._workflow_scheduled_get_serialize( tenant=tenant, scheduled_workflow_run=scheduled_workflow_run, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "ScheduledWorkflows", "400": "APIErrors", "403": "APIErrors", "404": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ).data @validate_call async def workflow_scheduled_get_with_http_info( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], scheduled_workflow_run: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The scheduled workflow id", ), ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> ApiResponse[ScheduledWorkflows]: """Get scheduled workflow run Get a scheduled workflow run for a tenant :param tenant: The tenant id (required) :type tenant: str :param scheduled_workflow_run: The scheduled workflow id (required) :type scheduled_workflow_run: str :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._workflow_scheduled_get_serialize( tenant=tenant, scheduled_workflow_run=scheduled_workflow_run, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "ScheduledWorkflows", "400": "APIErrors", "403": "APIErrors", "404": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ) @validate_call async def workflow_scheduled_get_without_preload_content( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], scheduled_workflow_run: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The scheduled workflow id", ), ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> RESTResponseType: """Get scheduled workflow run Get a scheduled workflow run for a tenant :param tenant: The tenant id (required) :type tenant: str :param scheduled_workflow_run: The scheduled workflow id (required) :type scheduled_workflow_run: str :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._workflow_scheduled_get_serialize( tenant=tenant, scheduled_workflow_run=scheduled_workflow_run, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "ScheduledWorkflows", "400": "APIErrors", "403": "APIErrors", "404": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) return response_data.response def _workflow_scheduled_get_serialize( self, tenant, scheduled_workflow_run, _request_auth, _content_type, _headers, _host_index, ) -> RequestSerialized: _host = None _collection_formats: Dict[str, str] = {} _path_params: Dict[str, str] = {} _query_params: List[Tuple[str, str]] = [] _header_params: Dict[str, Optional[str]] = _headers or {} _form_params: List[Tuple[str, str]] = [] _files: Dict[ str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]] ] = {} _body_params: Optional[bytes] = None # process the path parameters if tenant is not None: _path_params["tenant"] = tenant if scheduled_workflow_run is not None: _path_params["scheduled-workflow-run"] = scheduled_workflow_run # process the query parameters # process the header parameters # process the form parameters # process the body parameter # set the HTTP header `Accept` if "Accept" not in _header_params: _header_params["Accept"] = self.api_client.select_header_accept( ["application/json"] ) # authentication setting _auth_settings: List[str] = ["cookieAuth", "bearerAuth"] return self.api_client.param_serialize( method="GET", resource_path="/api/v1/tenants/{tenant}/workflows/scheduled/{scheduled-workflow-run}", path_params=_path_params, query_params=_query_params, header_params=_header_params, body=_body_params, post_params=_form_params, files=_files, auth_settings=_auth_settings, collection_formats=_collection_formats, _host=_host, _request_auth=_request_auth, ) @validate_call async def workflow_scheduled_list( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], offset: Annotated[ Optional[StrictInt], Field(description="The number to skip") ] = None, limit: Annotated[ Optional[StrictInt], Field(description="The number to limit by") ] = None, order_by_field: Annotated[ Optional[ScheduledWorkflowsOrderByField], Field(description="The order by field"), ] = None, order_by_direction: Annotated[ Optional[WorkflowRunOrderByDirection], Field(description="The order by direction"), ] = None, workflow_id: Annotated[ Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]], Field(description="The workflow id to get runs for."), ] = None, parent_workflow_run_id: Annotated[ Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]], Field(description="The parent workflow run id"), ] = None, parent_step_run_id: Annotated[ Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]], Field(description="The parent step run id"), ] = None, additional_metadata: Annotated[ Optional[List[StrictStr]], Field(description="A list of metadata key value pairs to filter by"), ] = None, statuses: Annotated[ Optional[List[ScheduledRunStatus]], Field(description="A list of scheduled run statuses to filter by"), ] = None, _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> ScheduledWorkflowsList: """Get scheduled workflow runs Get all scheduled workflow runs for a tenant :param tenant: The tenant id (required) :type tenant: str :param offset: The number to skip :type offset: int :param limit: The number to limit by :type limit: int :param order_by_field: The order by field :type order_by_field: ScheduledWorkflowsOrderByField :param order_by_direction: The order by direction :type order_by_direction: WorkflowRunOrderByDirection :param workflow_id: The workflow id to get runs for. :type workflow_id: str :param parent_workflow_run_id: The parent workflow run id :type parent_workflow_run_id: str :param parent_step_run_id: The parent step run id :type parent_step_run_id: str :param additional_metadata: A list of metadata key value pairs to filter by :type additional_metadata: List[str] :param statuses: A list of scheduled run statuses to filter by :type statuses: List[ScheduledRunStatus] :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._workflow_scheduled_list_serialize( tenant=tenant, offset=offset, limit=limit, order_by_field=order_by_field, order_by_direction=order_by_direction, workflow_id=workflow_id, parent_workflow_run_id=parent_workflow_run_id, parent_step_run_id=parent_step_run_id, additional_metadata=additional_metadata, statuses=statuses, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "ScheduledWorkflowsList", "400": "APIErrors", "403": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ).data @validate_call async def workflow_scheduled_list_with_http_info( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], offset: Annotated[ Optional[StrictInt], Field(description="The number to skip") ] = None, limit: Annotated[ Optional[StrictInt], Field(description="The number to limit by") ] = None, order_by_field: Annotated[ Optional[ScheduledWorkflowsOrderByField], Field(description="The order by field"), ] = None, order_by_direction: Annotated[ Optional[WorkflowRunOrderByDirection], Field(description="The order by direction"), ] = None, workflow_id: Annotated[ Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]], Field(description="The workflow id to get runs for."), ] = None, parent_workflow_run_id: Annotated[ Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]], Field(description="The parent workflow run id"), ] = None, parent_step_run_id: Annotated[ Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]], Field(description="The parent step run id"), ] = None, additional_metadata: Annotated[ Optional[List[StrictStr]], Field(description="A list of metadata key value pairs to filter by"), ] = None, statuses: Annotated[ Optional[List[ScheduledRunStatus]], Field(description="A list of scheduled run statuses to filter by"), ] = None, _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> ApiResponse[ScheduledWorkflowsList]: """Get scheduled workflow runs Get all scheduled workflow runs for a tenant :param tenant: The tenant id (required) :type tenant: str :param offset: The number to skip :type offset: int :param limit: The number to limit by :type limit: int :param order_by_field: The order by field :type order_by_field: ScheduledWorkflowsOrderByField :param order_by_direction: The order by direction :type order_by_direction: WorkflowRunOrderByDirection :param workflow_id: The workflow id to get runs for. :type workflow_id: str :param parent_workflow_run_id: The parent workflow run id :type parent_workflow_run_id: str :param parent_step_run_id: The parent step run id :type parent_step_run_id: str :param additional_metadata: A list of metadata key value pairs to filter by :type additional_metadata: List[str] :param statuses: A list of scheduled run statuses to filter by :type statuses: List[ScheduledRunStatus] :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._workflow_scheduled_list_serialize( tenant=tenant, offset=offset, limit=limit, order_by_field=order_by_field, order_by_direction=order_by_direction, workflow_id=workflow_id, parent_workflow_run_id=parent_workflow_run_id, parent_step_run_id=parent_step_run_id, additional_metadata=additional_metadata, statuses=statuses, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "ScheduledWorkflowsList", "400": "APIErrors", "403": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ) @validate_call async def workflow_scheduled_list_without_preload_content( self, tenant: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The tenant id" ), ], offset: Annotated[ Optional[StrictInt], Field(description="The number to skip") ] = None, limit: Annotated[ Optional[StrictInt], Field(description="The number to limit by") ] = None, order_by_field: Annotated[ Optional[ScheduledWorkflowsOrderByField], Field(description="The order by field"), ] = None, order_by_direction: Annotated[ Optional[WorkflowRunOrderByDirection], Field(description="The order by direction"), ] = None, workflow_id: Annotated[ Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]], Field(description="The workflow id to get runs for."), ] = None, parent_workflow_run_id: Annotated[ Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]], Field(description="The parent workflow run id"), ] = None, parent_step_run_id: Annotated[ Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]], Field(description="The parent step run id"), ] = None, additional_metadata: Annotated[ Optional[List[StrictStr]], Field(description="A list of metadata key value pairs to filter by"), ] = None, statuses: Annotated[ Optional[List[ScheduledRunStatus]], Field(description="A list of scheduled run statuses to filter by"), ] = None, _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> RESTResponseType: """Get scheduled workflow runs Get all scheduled workflow runs for a tenant :param tenant: The tenant id (required) :type tenant: str :param offset: The number to skip :type offset: int :param limit: The number to limit by :type limit: int :param order_by_field: The order by field :type order_by_field: ScheduledWorkflowsOrderByField :param order_by_direction: The order by direction :type order_by_direction: WorkflowRunOrderByDirection :param workflow_id: The workflow id to get runs for. :type workflow_id: str :param parent_workflow_run_id: The parent workflow run id :type parent_workflow_run_id: str :param parent_step_run_id: The parent step run id :type parent_step_run_id: str :param additional_metadata: A list of metadata key value pairs to filter by :type additional_metadata: List[str] :param statuses: A list of scheduled run statuses to filter by :type statuses: List[ScheduledRunStatus] :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._workflow_scheduled_list_serialize( tenant=tenant, offset=offset, limit=limit, order_by_field=order_by_field, order_by_direction=order_by_direction, workflow_id=workflow_id, parent_workflow_run_id=parent_workflow_run_id, parent_step_run_id=parent_step_run_id, additional_metadata=additional_metadata, statuses=statuses, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "ScheduledWorkflowsList", "400": "APIErrors", "403": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) return response_data.response def _workflow_scheduled_list_serialize( self, tenant, offset, limit, order_by_field, order_by_direction, workflow_id, parent_workflow_run_id, parent_step_run_id, additional_metadata, statuses, _request_auth, _content_type, _headers, _host_index, ) -> RequestSerialized: _host = None _collection_formats: Dict[str, str] = { "additionalMetadata": "multi", "statuses": "multi", } _path_params: Dict[str, str] = {} _query_params: List[Tuple[str, str]] = [] _header_params: Dict[str, Optional[str]] = _headers or {} _form_params: List[Tuple[str, str]] = [] _files: Dict[ str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]] ] = {} _body_params: Optional[bytes] = None # process the path parameters if tenant is not None: _path_params["tenant"] = tenant # process the query parameters if offset is not None: _query_params.append(("offset", offset)) if limit is not None: _query_params.append(("limit", limit)) if order_by_field is not None: _query_params.append(("orderByField", order_by_field.value)) if order_by_direction is not None: _query_params.append(("orderByDirection", order_by_direction.value)) if workflow_id is not None: _query_params.append(("workflowId", workflow_id)) if parent_workflow_run_id is not None: _query_params.append(("parentWorkflowRunId", parent_workflow_run_id)) if parent_step_run_id is not None: _query_params.append(("parentStepRunId", parent_step_run_id)) if additional_metadata is not None: _query_params.append(("additionalMetadata", additional_metadata)) if statuses is not None: _query_params.append(("statuses", statuses)) # process the header parameters # process the form parameters # process the body parameter # set the HTTP header `Accept` if "Accept" not in _header_params: _header_params["Accept"] = self.api_client.select_header_accept( ["application/json"] ) # authentication setting _auth_settings: List[str] = ["cookieAuth", "bearerAuth"] return self.api_client.param_serialize( method="GET", resource_path="/api/v1/tenants/{tenant}/workflows/scheduled", path_params=_path_params, query_params=_query_params, header_params=_header_params, body=_body_params, post_params=_form_params, files=_files, auth_settings=_auth_settings, collection_formats=_collection_formats, _host=_host, _request_auth=_request_auth, ) @validate_call async def workflow_update( self, workflow: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The workflow id" ), ], workflow_update_request: Annotated[ WorkflowUpdateRequest, Field(description="The input to update the workflow") ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> Workflow: """Update workflow Update a workflow for a tenant :param workflow: The workflow id (required) :type workflow: str :param workflow_update_request: The input to update the workflow (required) :type workflow_update_request: WorkflowUpdateRequest :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._workflow_update_serialize( workflow=workflow, workflow_update_request=workflow_update_request, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "Workflow", "400": "APIErrors", "403": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ).data @validate_call async def workflow_update_with_http_info( self, workflow: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The workflow id" ), ], workflow_update_request: Annotated[ WorkflowUpdateRequest, Field(description="The input to update the workflow") ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> ApiResponse[Workflow]: """Update workflow Update a workflow for a tenant :param workflow: The workflow id (required) :type workflow: str :param workflow_update_request: The input to update the workflow (required) :type workflow_update_request: WorkflowUpdateRequest :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._workflow_update_serialize( workflow=workflow, workflow_update_request=workflow_update_request, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "Workflow", "400": "APIErrors", "403": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ) @validate_call async def workflow_update_without_preload_content( self, workflow: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The workflow id" ), ], workflow_update_request: Annotated[ WorkflowUpdateRequest, Field(description="The input to update the workflow") ], _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> RESTResponseType: """Update workflow Update a workflow for a tenant :param workflow: The workflow id (required) :type workflow: str :param workflow_update_request: The input to update the workflow (required) :type workflow_update_request: WorkflowUpdateRequest :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._workflow_update_serialize( workflow=workflow, workflow_update_request=workflow_update_request, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "Workflow", "400": "APIErrors", "403": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) return response_data.response def _workflow_update_serialize( self, workflow, workflow_update_request, _request_auth, _content_type, _headers, _host_index, ) -> RequestSerialized: _host = None _collection_formats: Dict[str, str] = {} _path_params: Dict[str, str] = {} _query_params: List[Tuple[str, str]] = [] _header_params: Dict[str, Optional[str]] = _headers or {} _form_params: List[Tuple[str, str]] = [] _files: Dict[ str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]] ] = {} _body_params: Optional[bytes] = None # process the path parameters if workflow is not None: _path_params["workflow"] = workflow # process the query parameters # process the header parameters # process the form parameters # process the body parameter if workflow_update_request is not None: _body_params = workflow_update_request # set the HTTP header `Accept` if "Accept" not in _header_params: _header_params["Accept"] = self.api_client.select_header_accept( ["application/json"] ) # set the HTTP header `Content-Type` if _content_type: _header_params["Content-Type"] = _content_type else: _default_content_type = self.api_client.select_header_content_type( ["application/json"] ) if _default_content_type is not None: _header_params["Content-Type"] = _default_content_type # authentication setting _auth_settings: List[str] = ["cookieAuth", "bearerAuth"] return self.api_client.param_serialize( method="PATCH", resource_path="/api/v1/workflows/{workflow}", path_params=_path_params, query_params=_query_params, header_params=_header_params, body=_body_params, post_params=_form_params, files=_files, auth_settings=_auth_settings, collection_formats=_collection_formats, _host=_host, _request_auth=_request_auth, ) @validate_call async def workflow_version_get( self, workflow: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The workflow id" ), ], version: Annotated[ Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]], Field( description="The workflow version. If not supplied, the latest version is fetched." ), ] = None, _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> WorkflowVersion: """Get workflow version Get a workflow version for a tenant :param workflow: The workflow id (required) :type workflow: str :param version: The workflow version. If not supplied, the latest version is fetched. :type version: str :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._workflow_version_get_serialize( workflow=workflow, version=version, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "WorkflowVersion", "400": "APIErrors", "403": "APIErrors", "404": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ).data @validate_call async def workflow_version_get_with_http_info( self, workflow: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The workflow id" ), ], version: Annotated[ Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]], Field( description="The workflow version. If not supplied, the latest version is fetched." ), ] = None, _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> ApiResponse[WorkflowVersion]: """Get workflow version Get a workflow version for a tenant :param workflow: The workflow id (required) :type workflow: str :param version: The workflow version. If not supplied, the latest version is fetched. :type version: str :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._workflow_version_get_serialize( workflow=workflow, version=version, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "WorkflowVersion", "400": "APIErrors", "403": "APIErrors", "404": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) await response_data.read() return self.api_client.response_deserialize( response_data=response_data, response_types_map=_response_types_map, ) @validate_call async def workflow_version_get_without_preload_content( self, workflow: Annotated[ str, Field( min_length=36, strict=True, max_length=36, description="The workflow id" ), ], version: Annotated[ Optional[Annotated[str, Field(min_length=36, strict=True, max_length=36)]], Field( description="The workflow version. If not supplied, the latest version is fetched." ), ] = None, _request_timeout: Union[ None, Annotated[StrictFloat, Field(gt=0)], Tuple[ Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] ], ] = None, _request_auth: Optional[Dict[StrictStr, Any]] = None, _content_type: Optional[StrictStr] = None, _headers: Optional[Dict[StrictStr, Any]] = None, _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, ) -> RESTResponseType: """Get workflow version Get a workflow version for a tenant :param workflow: The workflow id (required) :type workflow: str :param version: The workflow version. If not supplied, the latest version is fetched. :type version: str :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :type _request_timeout: int, tuple(int, int), optional :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :param _content_type: force content-type for the request. :type _content_type: str, Optional :param _headers: set to override the headers for a single request; this effectively ignores the headers in the spec for a single request. :type _headers: dict, optional :param _host_index: set to override the host_index for a single request; this effectively ignores the host_index in the spec for a single request. :type _host_index: int, optional :return: Returns the result object. """ # noqa: E501 _param = self._workflow_version_get_serialize( workflow=workflow, version=version, _request_auth=_request_auth, _content_type=_content_type, _headers=_headers, _host_index=_host_index, ) _response_types_map: Dict[str, Optional[str]] = { "200": "WorkflowVersion", "400": "APIErrors", "403": "APIErrors", "404": "APIErrors", } response_data = await self.api_client.call_api( *_param, _request_timeout=_request_timeout ) return response_data.response def _workflow_version_get_serialize( self, workflow, version, _request_auth, _content_type, _headers, _host_index, ) -> RequestSerialized: _host = None _collection_formats: Dict[str, str] = {} _path_params: Dict[str, str] = {} _query_params: List[Tuple[str, str]] = [] _header_params: Dict[str, Optional[str]] = _headers or {} _form_params: List[Tuple[str, str]] = [] _files: Dict[ str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]] ] = {} _body_params: Optional[bytes] = None # process the path parameters if workflow is not None: _path_params["workflow"] = workflow # process the query parameters if version is not None: _query_params.append(("version", version)) # process the header parameters # process the form parameters # process the body parameter # set the HTTP header `Accept` if "Accept" not in _header_params: _header_params["Accept"] = self.api_client.select_header_accept( ["application/json"] ) # authentication setting _auth_settings: List[str] = ["cookieAuth", "bearerAuth"] return self.api_client.param_serialize( method="GET", resource_path="/api/v1/workflows/{workflow}/versions", path_params=_path_params, query_params=_query_params, header_params=_header_params, body=_body_params, post_params=_form_params, files=_files, auth_settings=_auth_settings, collection_formats=_collection_formats, _host=_host, _request_auth=_request_auth, )