diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/hatchet_sdk/features/scheduled.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/hatchet_sdk/features/scheduled.py | 248 |
1 files changed, 248 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/hatchet_sdk/features/scheduled.py b/.venv/lib/python3.12/site-packages/hatchet_sdk/features/scheduled.py new file mode 100644 index 00000000..45af2609 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/hatchet_sdk/features/scheduled.py @@ -0,0 +1,248 @@ +import datetime +from typing import Any, Coroutine, Dict, List, Optional, Union + +from pydantic import BaseModel + +from hatchet_sdk.client import Client +from hatchet_sdk.clients.rest.models.cron_workflows import CronWorkflows +from hatchet_sdk.clients.rest.models.cron_workflows_order_by_field import ( + CronWorkflowsOrderByField, +) +from hatchet_sdk.clients.rest.models.scheduled_workflows import ScheduledWorkflows +from hatchet_sdk.clients.rest.models.scheduled_workflows_list import ( + ScheduledWorkflowsList, +) +from hatchet_sdk.clients.rest.models.workflow_run_order_by_direction import ( + WorkflowRunOrderByDirection, +) + + +class CreateScheduledTriggerInput(BaseModel): + """ + Schema for creating a scheduled workflow run. + + Attributes: + input (Dict[str, Any]): The input data for the scheduled workflow. + additional_metadata (Dict[str, str]): Additional metadata associated with the future run (e.g. ["key1:value1", "key2:value2"]). + trigger_at (Optional[datetime.datetime]): The datetime when the run should be triggered. + """ + + input: Dict[str, Any] = {} + additional_metadata: Dict[str, str] = {} + trigger_at: Optional[datetime.datetime] = None + + +class ScheduledClient: + """ + Client for managing scheduled workflows synchronously. + + Attributes: + _client (Client): The underlying client used to interact with the REST API. + aio (ScheduledClientAsync): Asynchronous counterpart of ScheduledClient. + """ + + _client: Client + + def __init__(self, _client: Client) -> None: + """ + Initializes the ScheduledClient with a given Client instance. + + Args: + _client (Client): The client instance to be used for REST interactions. + """ + self._client = _client + self.aio: "ScheduledClientAsync" = ScheduledClientAsync(_client) + + def create( + self, + workflow_name: str, + trigger_at: datetime.datetime, + input: Dict[str, Any], + additional_metadata: Dict[str, str], + ) -> ScheduledWorkflows: + """ + Creates a new scheduled workflow run asynchronously. + + Args: + workflow_name (str): The name of the scheduled workflow. + trigger_at (datetime.datetime): The datetime when the run should be triggered. + input (Dict[str, Any]): The input data for the scheduled workflow. + additional_metadata (Dict[str, str]): Additional metadata associated with the future run as a key-value pair (e.g. {"key1": "value1", "key2": "value2"}). + + Returns: + ScheduledWorkflows: The created scheduled workflow instance. + """ + + validated_input = CreateScheduledTriggerInput( + trigger_at=trigger_at, input=input, additional_metadata=additional_metadata + ) + + return self._client.rest.schedule_create( + workflow_name, + validated_input.trigger_at, + validated_input.input, + validated_input.additional_metadata, + ) + + def delete(self, scheduled: Union[str, ScheduledWorkflows]) -> None: + """ + Deletes a scheduled workflow run. + + Args: + scheduled (Union[str, ScheduledWorkflows]): The scheduled workflow trigger ID or ScheduledWorkflows instance to delete. + """ + id_ = scheduled + if isinstance(scheduled, ScheduledWorkflows): + id_ = scheduled.metadata.id + self._client.rest.schedule_delete(id_) + + def list( + self, + offset: Optional[int] = None, + limit: Optional[int] = None, + workflow_id: Optional[str] = None, + additional_metadata: Optional[List[str]] = None, + order_by_field: Optional[CronWorkflowsOrderByField] = None, + order_by_direction: Optional[WorkflowRunOrderByDirection] = None, + ) -> ScheduledWorkflowsList: + """ + Retrieves a list of scheduled workflows based on provided filters. + + Args: + offset (Optional[int]): The starting point for the list. + limit (Optional[int]): The maximum number of items to return. + workflow_id (Optional[str]): Filter by specific workflow ID. + additional_metadata (Optional[List[str]]): Filter by additional metadata keys (e.g. ["key1:value1", "key2:value2"]). + order_by_field (Optional[CronWorkflowsOrderByField]): Field to order the results by. + order_by_direction (Optional[WorkflowRunOrderByDirection]): Direction to order the results. + + Returns: + List[ScheduledWorkflows]: A list of scheduled workflows matching the criteria. + """ + return self._client.rest.schedule_list( + offset=offset, + limit=limit, + workflow_id=workflow_id, + additional_metadata=additional_metadata, + order_by_field=order_by_field, + order_by_direction=order_by_direction, + ) + + def get(self, scheduled: Union[str, ScheduledWorkflows]) -> ScheduledWorkflows: + """ + Retrieves a specific scheduled workflow by scheduled run trigger ID. + + Args: + scheduled (Union[str, ScheduledWorkflows]): The scheduled workflow trigger ID or ScheduledWorkflows instance to retrieve. + + Returns: + ScheduledWorkflows: The requested scheduled workflow instance. + """ + id_ = scheduled + if isinstance(scheduled, ScheduledWorkflows): + id_ = scheduled.metadata.id + return self._client.rest.schedule_get(id_) + + +class ScheduledClientAsync: + """ + Asynchronous client for managing scheduled workflows. + + Attributes: + _client (Client): The underlying client used to interact with the REST API asynchronously. + """ + + _client: Client + + def __init__(self, _client: Client) -> None: + """ + Initializes the ScheduledClientAsync with a given Client instance. + + Args: + _client (Client): The client instance to be used for asynchronous REST interactions. + """ + self._client = _client + + async def create( + self, + workflow_name: str, + trigger_at: datetime.datetime, + input: Dict[str, Any], + additional_metadata: Dict[str, str], + ) -> ScheduledWorkflows: + """ + Creates a new scheduled workflow run asynchronously. + + Args: + workflow_name (str): The name of the scheduled workflow. + trigger_at (datetime.datetime): The datetime when the run should be triggered. + input (Dict[str, Any]): The input data for the scheduled workflow. + additional_metadata (Dict[str, str]): Additional metadata associated with the future run. + + Returns: + ScheduledWorkflows: The created scheduled workflow instance. + """ + return await self._client.rest.aio.schedule_create( + workflow_name, trigger_at, input, additional_metadata + ) + + async def delete(self, scheduled: Union[str, ScheduledWorkflows]) -> None: + """ + Deletes a scheduled workflow asynchronously. + + Args: + scheduled (Union[str, ScheduledWorkflows]): The scheduled workflow trigger ID or ScheduledWorkflows instance to delete. + """ + id_ = scheduled + if isinstance(scheduled, ScheduledWorkflows): + id_ = scheduled.metadata.id + await self._client.rest.aio.schedule_delete(id_) + + async def list( + self, + offset: Optional[int] = None, + limit: Optional[int] = None, + workflow_id: Optional[str] = None, + additional_metadata: Optional[List[str]] = None, + order_by_field: Optional[CronWorkflowsOrderByField] = None, + order_by_direction: Optional[WorkflowRunOrderByDirection] = None, + ) -> ScheduledWorkflowsList: + """ + Retrieves a list of scheduled workflows based on provided filters asynchronously. + + Args: + offset (Optional[int]): The starting point for the list. + limit (Optional[int]): The maximum number of items to return. + workflow_id (Optional[str]): Filter by specific workflow ID. + additional_metadata (Optional[List[str]]): Filter by additional metadata keys (e.g. ["key1:value1", "key2:value2"]). + order_by_field (Optional[CronWorkflowsOrderByField]): Field to order the results by. + order_by_direction (Optional[WorkflowRunOrderByDirection]): Direction to order the results. + + Returns: + ScheduledWorkflowsList: A list of scheduled workflows matching the criteria. + """ + return await self._client.rest.aio.schedule_list( + offset=offset, + limit=limit, + workflow_id=workflow_id, + additional_metadata=additional_metadata, + order_by_field=order_by_field, + order_by_direction=order_by_direction, + ) + + async def get( + self, scheduled: Union[str, ScheduledWorkflows] + ) -> ScheduledWorkflows: + """ + Retrieves a specific scheduled workflow by scheduled run trigger ID asynchronously. + + Args: + scheduled (Union[str, ScheduledWorkflows]): The scheduled workflow trigger ID or ScheduledWorkflows instance to retrieve. + + Returns: + ScheduledWorkflows: The requested scheduled workflow instance. + """ + id_ = scheduled + if isinstance(scheduled, ScheduledWorkflows): + id_ = scheduled.metadata.id + return await self._client.rest.aio.schedule_get(id_) |