from typing import Union
from pydantic import BaseModel, field_validator
from hatchet_sdk.client import Client
from hatchet_sdk.clients.rest.models.cron_workflows import CronWorkflows
from hatchet_sdk.clients.rest.models.cron_workflows_list import CronWorkflowsList
from hatchet_sdk.clients.rest.models.cron_workflows_order_by_field import (
CronWorkflowsOrderByField,
)
from hatchet_sdk.clients.rest.models.workflow_run_order_by_direction import (
WorkflowRunOrderByDirection,
)
class CreateCronTriggerInput(BaseModel):
"""
Schema for creating a workflow run triggered by a cron.
Attributes:
expression (str): The cron expression defining the schedule.
input (dict): The input data for the cron workflow.
additional_metadata (dict[str, str]): Additional metadata associated with the cron trigger (e.g. {"key1": "value1", "key2": "value2"}).
"""
expression: str = None
input: dict = {}
additional_metadata: dict[str, str] = {}
@field_validator("expression")
def validate_cron_expression(cls, v):
"""
Validates the cron expression to ensure it adheres to the expected format.
Args:
v (str): The cron expression to validate.
Raises:
ValueError: If the expression is invalid.
Returns:
str: The validated cron expression.
"""
if not v:
raise ValueError("Cron expression is required")
parts = v.split()
if len(parts) != 5:
raise ValueError(
"Cron expression must have 5 parts: minute hour day month weekday"
)
for part in parts:
if not (
part == "*"
or part.replace("*/", "").replace("-", "").replace(",", "").isdigit()
):
raise ValueError(f"Invalid cron expression part: {part}")
return v
class CronClient:
"""
Client for managing workflow cron triggers synchronously.
Attributes:
_client (Client): The underlying client used to interact with the REST API.
aio (CronClientAsync): Asynchronous counterpart of CronClient.
"""
_client: Client
def __init__(self, _client: Client):
"""
Initializes the CronClient with a given Client instance.
Args:
_client (Client): The client instance to be used for REST interactions.
"""
self._client = _client
self.aio = CronClientAsync(_client)
def create(
self,
workflow_name: str,
cron_name: str,
expression: str,
input: dict,
additional_metadata: dict[str, str],
) -> CronWorkflows:
"""
Creates a new workflow cron trigger.
Args:
workflow_name (str): The name of the workflow to trigger.
cron_name (str): The name of the cron trigger.
expression (str): The cron expression defining the schedule.
input (dict): The input data for the cron workflow.
additional_metadata (dict[str, str]): Additional metadata associated with the cron trigger (e.g. {"key1": "value1", "key2": "value2"}).
Returns:
CronWorkflows: The created cron workflow instance.
"""
validated_input = CreateCronTriggerInput(
expression=expression, input=input, additional_metadata=additional_metadata
)
return self._client.rest.cron_create(
workflow_name,
cron_name,
validated_input.expression,
validated_input.input,
validated_input.additional_metadata,
)
def delete(self, cron_trigger: Union[str, CronWorkflows]) -> None:
"""
Deletes a workflow cron trigger.
Args:
cron_trigger (Union[str, CronWorkflows]): The cron trigger ID or CronWorkflows instance to delete.
"""
id_ = cron_trigger
if isinstance(cron_trigger, CronWorkflows):
id_ = cron_trigger.metadata.id
self._client.rest.cron_delete(id_)
def list(
self,
offset: int | None = None,
limit: int | None = None,
workflow_id: str | None = None,
additional_metadata: list[str] | None = None,
order_by_field: CronWorkflowsOrderByField | None = None,
order_by_direction: WorkflowRunOrderByDirection | None = None,
) -> CronWorkflowsList:
"""
Retrieves a list of all workflow cron triggers matching the criteria.
Args:
offset (int | None): The offset to start the list from.
limit (int | None): The maximum number of items to return.
workflow_id (str | None): The ID of the workflow to filter by.
additional_metadata (list[str] | None): Filter by additional metadata keys (e.g. ["key1:value1", "key2:value2"]).
order_by_field (CronWorkflowsOrderByField | None): The field to order the list by.
order_by_direction (WorkflowRunOrderByDirection | None): The direction to order the list by.
Returns:
CronWorkflowsList: A list of cron workflows.
"""
return self._client.rest.cron_list(
offset=offset,
limit=limit,
workflow_id=workflow_id,
additional_metadata=additional_metadata,
order_by_field=order_by_field,
order_by_direction=order_by_direction,
)
def get(self, cron_trigger: Union[str, CronWorkflows]) -> CronWorkflows:
"""
Retrieves a specific workflow cron trigger by ID.
Args:
cron_trigger (Union[str, CronWorkflows]): The cron trigger ID or CronWorkflows instance to retrieve.
Returns:
CronWorkflows: The requested cron workflow instance.
"""
id_ = cron_trigger
if isinstance(cron_trigger, CronWorkflows):
id_ = cron_trigger.metadata.id
return self._client.rest.cron_get(id_)
class CronClientAsync:
"""
Asynchronous client for managing workflow cron triggers.
Attributes:
_client (Client): The underlying client used to interact with the REST API asynchronously.
"""
_client: Client
def __init__(self, _client: Client):
"""
Initializes the CronClientAsync with a given Client instance.
Args:
_client (Client): The client instance to be used for asynchronous REST interactions.
"""
self._client = _client
async def create(
self,
workflow_name: str,
cron_name: str,
expression: str,
input: dict,
additional_metadata: dict[str, str],
) -> CronWorkflows:
"""
Asynchronously creates a new workflow cron trigger.
Args:
workflow_name (str): The name of the workflow to trigger.
cron_name (str): The name of the cron trigger.
expression (str): The cron expression defining the schedule.
input (dict): The input data for the cron workflow.
additional_metadata (dict[str, str]): Additional metadata associated with the cron trigger (e.g. {"key1": "value1", "key2": "value2"}).
Returns:
CronWorkflows: The created cron workflow instance.
"""
validated_input = CreateCronTriggerInput(
expression=expression, input=input, additional_metadata=additional_metadata
)
return await self._client.rest.aio.cron_create(
workflow_name=workflow_name,
cron_name=cron_name,
expression=validated_input.expression,
input=validated_input.input,
additional_metadata=validated_input.additional_metadata,
)
async def delete(self, cron_trigger: Union[str, CronWorkflows]) -> None:
"""
Asynchronously deletes a workflow cron trigger.
Args:
cron_trigger (Union[str, CronWorkflows]): The cron trigger ID or CronWorkflows instance to delete.
"""
id_ = cron_trigger
if isinstance(cron_trigger, CronWorkflows):
id_ = cron_trigger.metadata.id
await self._client.rest.aio.cron_delete(id_)
async def list(
self,
offset: int | None = None,
limit: int | None = None,
workflow_id: str | None = None,
additional_metadata: list[str] | None = None,
order_by_field: CronWorkflowsOrderByField | None = None,
order_by_direction: WorkflowRunOrderByDirection | None = None,
) -> CronWorkflowsList:
"""
Asynchronously retrieves a list of all workflow cron triggers matching the criteria.
Args:
offset (int | None): The offset to start the list from.
limit (int | None): The maximum number of items to return.
workflow_id (str | None): The ID of the workflow to filter by.
additional_metadata (list[str] | None): Filter by additional metadata keys (e.g. ["key1:value1", "key2:value2"]).
order_by_field (CronWorkflowsOrderByField | None): The field to order the list by.
order_by_direction (WorkflowRunOrderByDirection | None): The direction to order the list by.
Returns:
CronWorkflowsList: A list of cron workflows.
"""
return await self._client.rest.aio.cron_list(
offset=offset,
limit=limit,
workflow_id=workflow_id,
additional_metadata=additional_metadata,
order_by_field=order_by_field,
order_by_direction=order_by_direction,
)
async def get(self, cron_trigger: Union[str, CronWorkflows]) -> CronWorkflows:
"""
Asynchronously retrieves a specific workflow cron trigger by ID.
Args:
cron_trigger (Union[str, CronWorkflows]): The cron trigger ID or CronWorkflows instance to retrieve.
Returns:
CronWorkflows: The requested cron workflow instance.
"""
id_ = cron_trigger
if isinstance(cron_trigger, CronWorkflows):
id_ = cron_trigger.metadata.id
return await self._client.rest.aio.cron_get(id_)