aboutsummaryrefslogtreecommitdiff
import datetime
from typing import Any, Coroutine, Dict, List, Optional, Union

from pydantic import BaseModel

from hatchet_sdk.client import Client
from hatchet_sdk.clients.rest.models.cron_workflows import CronWorkflows
from hatchet_sdk.clients.rest.models.cron_workflows_order_by_field import (
    CronWorkflowsOrderByField,
)
from hatchet_sdk.clients.rest.models.scheduled_workflows import ScheduledWorkflows
from hatchet_sdk.clients.rest.models.scheduled_workflows_list import (
    ScheduledWorkflowsList,
)
from hatchet_sdk.clients.rest.models.workflow_run_order_by_direction import (
    WorkflowRunOrderByDirection,
)


class CreateScheduledTriggerInput(BaseModel):
    """
    Schema for creating a scheduled workflow run.

    Attributes:
        input (Dict[str, Any]): The input data for the scheduled workflow.
        additional_metadata (Dict[str, str]): Additional metadata associated with the future run (e.g. ["key1:value1", "key2:value2"]).
        trigger_at (Optional[datetime.datetime]): The datetime when the run should be triggered.
    """

    input: Dict[str, Any] = {}
    additional_metadata: Dict[str, str] = {}
    trigger_at: Optional[datetime.datetime] = None


class ScheduledClient:
    """
    Client for managing scheduled workflows synchronously.

    Attributes:
        _client (Client): The underlying client used to interact with the REST API.
        aio (ScheduledClientAsync): Asynchronous counterpart of ScheduledClient.
    """

    _client: Client

    def __init__(self, _client: Client) -> None:
        """
        Initializes the ScheduledClient with a given Client instance.

        Args:
            _client (Client): The client instance to be used for REST interactions.
        """
        self._client = _client
        self.aio: "ScheduledClientAsync" = ScheduledClientAsync(_client)

    def create(
        self,
        workflow_name: str,
        trigger_at: datetime.datetime,
        input: Dict[str, Any],
        additional_metadata: Dict[str, str],
    ) -> ScheduledWorkflows:
        """
        Creates a new scheduled workflow run asynchronously.

        Args:
            workflow_name (str): The name of the scheduled workflow.
            trigger_at (datetime.datetime): The datetime when the run should be triggered.
            input (Dict[str, Any]): The input data for the scheduled workflow.
            additional_metadata (Dict[str, str]): Additional metadata associated with the future run as a key-value pair (e.g. {"key1": "value1", "key2": "value2"}).

        Returns:
            ScheduledWorkflows: The created scheduled workflow instance.
        """

        validated_input = CreateScheduledTriggerInput(
            trigger_at=trigger_at, input=input, additional_metadata=additional_metadata
        )

        return self._client.rest.schedule_create(
            workflow_name,
            validated_input.trigger_at,
            validated_input.input,
            validated_input.additional_metadata,
        )

    def delete(self, scheduled: Union[str, ScheduledWorkflows]) -> None:
        """
        Deletes a scheduled workflow run.

        Args:
            scheduled (Union[str, ScheduledWorkflows]): The scheduled workflow trigger ID or ScheduledWorkflows instance to delete.
        """
        id_ = scheduled
        if isinstance(scheduled, ScheduledWorkflows):
            id_ = scheduled.metadata.id
        self._client.rest.schedule_delete(id_)

    def list(
        self,
        offset: Optional[int] = None,
        limit: Optional[int] = None,
        workflow_id: Optional[str] = None,
        additional_metadata: Optional[List[str]] = None,
        order_by_field: Optional[CronWorkflowsOrderByField] = None,
        order_by_direction: Optional[WorkflowRunOrderByDirection] = None,
    ) -> ScheduledWorkflowsList:
        """
        Retrieves a list of scheduled workflows based on provided filters.

        Args:
            offset (Optional[int]): The starting point for the list.
            limit (Optional[int]): The maximum number of items to return.
            workflow_id (Optional[str]): Filter by specific workflow ID.
            additional_metadata (Optional[List[str]]): Filter by additional metadata keys (e.g. ["key1:value1", "key2:value2"]).
            order_by_field (Optional[CronWorkflowsOrderByField]): Field to order the results by.
            order_by_direction (Optional[WorkflowRunOrderByDirection]): Direction to order the results.

        Returns:
            List[ScheduledWorkflows]: A list of scheduled workflows matching the criteria.
        """
        return self._client.rest.schedule_list(
            offset=offset,
            limit=limit,
            workflow_id=workflow_id,
            additional_metadata=additional_metadata,
            order_by_field=order_by_field,
            order_by_direction=order_by_direction,
        )

    def get(self, scheduled: Union[str, ScheduledWorkflows]) -> ScheduledWorkflows:
        """
        Retrieves a specific scheduled workflow by scheduled run trigger ID.

        Args:
            scheduled (Union[str, ScheduledWorkflows]): The scheduled workflow trigger ID or ScheduledWorkflows instance to retrieve.

        Returns:
            ScheduledWorkflows: The requested scheduled workflow instance.
        """
        id_ = scheduled
        if isinstance(scheduled, ScheduledWorkflows):
            id_ = scheduled.metadata.id
        return self._client.rest.schedule_get(id_)


class ScheduledClientAsync:
    """
    Asynchronous client for managing scheduled workflows.

    Attributes:
        _client (Client): The underlying client used to interact with the REST API asynchronously.
    """

    _client: Client

    def __init__(self, _client: Client) -> None:
        """
        Initializes the ScheduledClientAsync with a given Client instance.

        Args:
            _client (Client): The client instance to be used for asynchronous REST interactions.
        """
        self._client = _client

    async def create(
        self,
        workflow_name: str,
        trigger_at: datetime.datetime,
        input: Dict[str, Any],
        additional_metadata: Dict[str, str],
    ) -> ScheduledWorkflows:
        """
        Creates a new scheduled workflow run asynchronously.

        Args:
            workflow_name (str): The name of the scheduled workflow.
            trigger_at (datetime.datetime): The datetime when the run should be triggered.
            input (Dict[str, Any]): The input data for the scheduled workflow.
            additional_metadata (Dict[str, str]): Additional metadata associated with the future run.

        Returns:
            ScheduledWorkflows: The created scheduled workflow instance.
        """
        return await self._client.rest.aio.schedule_create(
            workflow_name, trigger_at, input, additional_metadata
        )

    async def delete(self, scheduled: Union[str, ScheduledWorkflows]) -> None:
        """
        Deletes a scheduled workflow asynchronously.

        Args:
            scheduled (Union[str, ScheduledWorkflows]): The scheduled workflow trigger ID or ScheduledWorkflows instance to delete.
        """
        id_ = scheduled
        if isinstance(scheduled, ScheduledWorkflows):
            id_ = scheduled.metadata.id
        await self._client.rest.aio.schedule_delete(id_)

    async def list(
        self,
        offset: Optional[int] = None,
        limit: Optional[int] = None,
        workflow_id: Optional[str] = None,
        additional_metadata: Optional[List[str]] = None,
        order_by_field: Optional[CronWorkflowsOrderByField] = None,
        order_by_direction: Optional[WorkflowRunOrderByDirection] = None,
    ) -> ScheduledWorkflowsList:
        """
        Retrieves a list of scheduled workflows based on provided filters asynchronously.

        Args:
            offset (Optional[int]): The starting point for the list.
            limit (Optional[int]): The maximum number of items to return.
            workflow_id (Optional[str]): Filter by specific workflow ID.
            additional_metadata (Optional[List[str]]): Filter by additional metadata keys (e.g. ["key1:value1", "key2:value2"]).
            order_by_field (Optional[CronWorkflowsOrderByField]): Field to order the results by.
            order_by_direction (Optional[WorkflowRunOrderByDirection]): Direction to order the results.

        Returns:
            ScheduledWorkflowsList: A list of scheduled workflows matching the criteria.
        """
        return await self._client.rest.aio.schedule_list(
            offset=offset,
            limit=limit,
            workflow_id=workflow_id,
            additional_metadata=additional_metadata,
            order_by_field=order_by_field,
            order_by_direction=order_by_direction,
        )

    async def get(
        self, scheduled: Union[str, ScheduledWorkflows]
    ) -> ScheduledWorkflows:
        """
        Retrieves a specific scheduled workflow by scheduled run trigger ID asynchronously.

        Args:
            scheduled (Union[str, ScheduledWorkflows]): The scheduled workflow trigger ID or ScheduledWorkflows instance to retrieve.

        Returns:
            ScheduledWorkflows: The requested scheduled workflow instance.
        """
        id_ = scheduled
        if isinstance(scheduled, ScheduledWorkflows):
            id_ = scheduled.metadata.id
        return await self._client.rest.aio.schedule_get(id_)