aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/hatchet_sdk/features/scheduled.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/hatchet_sdk/features/scheduled.py')
-rw-r--r--.venv/lib/python3.12/site-packages/hatchet_sdk/features/scheduled.py248
1 files changed, 248 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/hatchet_sdk/features/scheduled.py b/.venv/lib/python3.12/site-packages/hatchet_sdk/features/scheduled.py
new file mode 100644
index 00000000..45af2609
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/hatchet_sdk/features/scheduled.py
@@ -0,0 +1,248 @@
+import datetime
+from typing import Any, Coroutine, Dict, List, Optional, Union
+
+from pydantic import BaseModel
+
+from hatchet_sdk.client import Client
+from hatchet_sdk.clients.rest.models.cron_workflows import CronWorkflows
+from hatchet_sdk.clients.rest.models.cron_workflows_order_by_field import (
+ CronWorkflowsOrderByField,
+)
+from hatchet_sdk.clients.rest.models.scheduled_workflows import ScheduledWorkflows
+from hatchet_sdk.clients.rest.models.scheduled_workflows_list import (
+ ScheduledWorkflowsList,
+)
+from hatchet_sdk.clients.rest.models.workflow_run_order_by_direction import (
+ WorkflowRunOrderByDirection,
+)
+
+
+class CreateScheduledTriggerInput(BaseModel):
+ """
+ Schema for creating a scheduled workflow run.
+
+ Attributes:
+ input (Dict[str, Any]): The input data for the scheduled workflow.
+ additional_metadata (Dict[str, str]): Additional metadata associated with the future run (e.g. ["key1:value1", "key2:value2"]).
+ trigger_at (Optional[datetime.datetime]): The datetime when the run should be triggered.
+ """
+
+ input: Dict[str, Any] = {}
+ additional_metadata: Dict[str, str] = {}
+ trigger_at: Optional[datetime.datetime] = None
+
+
+class ScheduledClient:
+ """
+ Client for managing scheduled workflows synchronously.
+
+ Attributes:
+ _client (Client): The underlying client used to interact with the REST API.
+ aio (ScheduledClientAsync): Asynchronous counterpart of ScheduledClient.
+ """
+
+ _client: Client
+
+ def __init__(self, _client: Client) -> None:
+ """
+ Initializes the ScheduledClient with a given Client instance.
+
+ Args:
+ _client (Client): The client instance to be used for REST interactions.
+ """
+ self._client = _client
+ self.aio: "ScheduledClientAsync" = ScheduledClientAsync(_client)
+
+ def create(
+ self,
+ workflow_name: str,
+ trigger_at: datetime.datetime,
+ input: Dict[str, Any],
+ additional_metadata: Dict[str, str],
+ ) -> ScheduledWorkflows:
+ """
+ Creates a new scheduled workflow run asynchronously.
+
+ Args:
+ workflow_name (str): The name of the scheduled workflow.
+ trigger_at (datetime.datetime): The datetime when the run should be triggered.
+ input (Dict[str, Any]): The input data for the scheduled workflow.
+ additional_metadata (Dict[str, str]): Additional metadata associated with the future run as a key-value pair (e.g. {"key1": "value1", "key2": "value2"}).
+
+ Returns:
+ ScheduledWorkflows: The created scheduled workflow instance.
+ """
+
+ validated_input = CreateScheduledTriggerInput(
+ trigger_at=trigger_at, input=input, additional_metadata=additional_metadata
+ )
+
+ return self._client.rest.schedule_create(
+ workflow_name,
+ validated_input.trigger_at,
+ validated_input.input,
+ validated_input.additional_metadata,
+ )
+
+ def delete(self, scheduled: Union[str, ScheduledWorkflows]) -> None:
+ """
+ Deletes a scheduled workflow run.
+
+ Args:
+ scheduled (Union[str, ScheduledWorkflows]): The scheduled workflow trigger ID or ScheduledWorkflows instance to delete.
+ """
+ id_ = scheduled
+ if isinstance(scheduled, ScheduledWorkflows):
+ id_ = scheduled.metadata.id
+ self._client.rest.schedule_delete(id_)
+
+ def list(
+ self,
+ offset: Optional[int] = None,
+ limit: Optional[int] = None,
+ workflow_id: Optional[str] = None,
+ additional_metadata: Optional[List[str]] = None,
+ order_by_field: Optional[CronWorkflowsOrderByField] = None,
+ order_by_direction: Optional[WorkflowRunOrderByDirection] = None,
+ ) -> ScheduledWorkflowsList:
+ """
+ Retrieves a list of scheduled workflows based on provided filters.
+
+ Args:
+ offset (Optional[int]): The starting point for the list.
+ limit (Optional[int]): The maximum number of items to return.
+ workflow_id (Optional[str]): Filter by specific workflow ID.
+ additional_metadata (Optional[List[str]]): Filter by additional metadata keys (e.g. ["key1:value1", "key2:value2"]).
+ order_by_field (Optional[CronWorkflowsOrderByField]): Field to order the results by.
+ order_by_direction (Optional[WorkflowRunOrderByDirection]): Direction to order the results.
+
+ Returns:
+ List[ScheduledWorkflows]: A list of scheduled workflows matching the criteria.
+ """
+ return self._client.rest.schedule_list(
+ offset=offset,
+ limit=limit,
+ workflow_id=workflow_id,
+ additional_metadata=additional_metadata,
+ order_by_field=order_by_field,
+ order_by_direction=order_by_direction,
+ )
+
+ def get(self, scheduled: Union[str, ScheduledWorkflows]) -> ScheduledWorkflows:
+ """
+ Retrieves a specific scheduled workflow by scheduled run trigger ID.
+
+ Args:
+ scheduled (Union[str, ScheduledWorkflows]): The scheduled workflow trigger ID or ScheduledWorkflows instance to retrieve.
+
+ Returns:
+ ScheduledWorkflows: The requested scheduled workflow instance.
+ """
+ id_ = scheduled
+ if isinstance(scheduled, ScheduledWorkflows):
+ id_ = scheduled.metadata.id
+ return self._client.rest.schedule_get(id_)
+
+
+class ScheduledClientAsync:
+ """
+ Asynchronous client for managing scheduled workflows.
+
+ Attributes:
+ _client (Client): The underlying client used to interact with the REST API asynchronously.
+ """
+
+ _client: Client
+
+ def __init__(self, _client: Client) -> None:
+ """
+ Initializes the ScheduledClientAsync with a given Client instance.
+
+ Args:
+ _client (Client): The client instance to be used for asynchronous REST interactions.
+ """
+ self._client = _client
+
+ async def create(
+ self,
+ workflow_name: str,
+ trigger_at: datetime.datetime,
+ input: Dict[str, Any],
+ additional_metadata: Dict[str, str],
+ ) -> ScheduledWorkflows:
+ """
+ Creates a new scheduled workflow run asynchronously.
+
+ Args:
+ workflow_name (str): The name of the scheduled workflow.
+ trigger_at (datetime.datetime): The datetime when the run should be triggered.
+ input (Dict[str, Any]): The input data for the scheduled workflow.
+ additional_metadata (Dict[str, str]): Additional metadata associated with the future run.
+
+ Returns:
+ ScheduledWorkflows: The created scheduled workflow instance.
+ """
+ return await self._client.rest.aio.schedule_create(
+ workflow_name, trigger_at, input, additional_metadata
+ )
+
+ async def delete(self, scheduled: Union[str, ScheduledWorkflows]) -> None:
+ """
+ Deletes a scheduled workflow asynchronously.
+
+ Args:
+ scheduled (Union[str, ScheduledWorkflows]): The scheduled workflow trigger ID or ScheduledWorkflows instance to delete.
+ """
+ id_ = scheduled
+ if isinstance(scheduled, ScheduledWorkflows):
+ id_ = scheduled.metadata.id
+ await self._client.rest.aio.schedule_delete(id_)
+
+ async def list(
+ self,
+ offset: Optional[int] = None,
+ limit: Optional[int] = None,
+ workflow_id: Optional[str] = None,
+ additional_metadata: Optional[List[str]] = None,
+ order_by_field: Optional[CronWorkflowsOrderByField] = None,
+ order_by_direction: Optional[WorkflowRunOrderByDirection] = None,
+ ) -> ScheduledWorkflowsList:
+ """
+ Retrieves a list of scheduled workflows based on provided filters asynchronously.
+
+ Args:
+ offset (Optional[int]): The starting point for the list.
+ limit (Optional[int]): The maximum number of items to return.
+ workflow_id (Optional[str]): Filter by specific workflow ID.
+ additional_metadata (Optional[List[str]]): Filter by additional metadata keys (e.g. ["key1:value1", "key2:value2"]).
+ order_by_field (Optional[CronWorkflowsOrderByField]): Field to order the results by.
+ order_by_direction (Optional[WorkflowRunOrderByDirection]): Direction to order the results.
+
+ Returns:
+ ScheduledWorkflowsList: A list of scheduled workflows matching the criteria.
+ """
+ return await self._client.rest.aio.schedule_list(
+ offset=offset,
+ limit=limit,
+ workflow_id=workflow_id,
+ additional_metadata=additional_metadata,
+ order_by_field=order_by_field,
+ order_by_direction=order_by_direction,
+ )
+
+ async def get(
+ self, scheduled: Union[str, ScheduledWorkflows]
+ ) -> ScheduledWorkflows:
+ """
+ Retrieves a specific scheduled workflow by scheduled run trigger ID asynchronously.
+
+ Args:
+ scheduled (Union[str, ScheduledWorkflows]): The scheduled workflow trigger ID or ScheduledWorkflows instance to retrieve.
+
+ Returns:
+ ScheduledWorkflows: The requested scheduled workflow instance.
+ """
+ id_ = scheduled
+ if isinstance(scheduled, ScheduledWorkflows):
+ id_ = scheduled.metadata.id
+ return await self._client.rest.aio.schedule_get(id_)