diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/hatchet_sdk/v2')
3 files changed, 473 insertions, 0 deletions
| diff --git a/.venv/lib/python3.12/site-packages/hatchet_sdk/v2/callable.py b/.venv/lib/python3.12/site-packages/hatchet_sdk/v2/callable.py new file mode 100644 index 00000000..097a7d87 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/hatchet_sdk/v2/callable.py @@ -0,0 +1,202 @@ +import asyncio +from typing import ( + Any, + Callable, + Dict, + Generic, + List, + Optional, + TypedDict, + TypeVar, + Union, +) + +from hatchet_sdk.clients.admin import ChildTriggerWorkflowOptions +from hatchet_sdk.context.context import Context +from hatchet_sdk.contracts.workflows_pb2 import ( # type: ignore[attr-defined] + CreateStepRateLimit, + CreateWorkflowJobOpts, + CreateWorkflowStepOpts, + CreateWorkflowVersionOpts, + DesiredWorkerLabels, + StickyStrategy, + WorkflowConcurrencyOpts, + WorkflowKind, +) +from hatchet_sdk.labels import DesiredWorkerLabel +from hatchet_sdk.logger import logger +from hatchet_sdk.rate_limit import RateLimit +from hatchet_sdk.v2.concurrency import ConcurrencyFunction +from hatchet_sdk.workflow_run import RunRef + +T = TypeVar("T") + + +class HatchetCallable(Generic[T]): + def __init__( + self, + func: Callable[[Context], T], + durable: bool = False, + name: str = "", + auto_register: bool = True, + on_events: list[str] | None = None, + on_crons: list[str] | None = None, + version: str = "", + timeout: str = "60m", + schedule_timeout: str = "5m", + sticky: StickyStrategy = None, + retries: int = 0, + rate_limits: List[RateLimit] | None = None, + concurrency: ConcurrencyFunction | None = None, + on_failure: Union["HatchetCallable[T]", None] = None, + desired_worker_labels: dict[str, DesiredWorkerLabel] = {}, + default_priority: int | None = None, + ): + self.func = func + + on_events = on_events or [] + on_crons = on_crons or [] + + limits = None + if rate_limits: + limits = [rate_limit._req for rate_limit in rate_limits or []] + + self.function_desired_worker_labels = {} + + for key, d in desired_worker_labels.items(): + value = d["value"] if "value" in d else None + self.function_desired_worker_labels[key] = DesiredWorkerLabels( + strValue=str(value) if not isinstance(value, int) else None, + intValue=value if isinstance(value, int) else None, + required=d["required"] if "required" in d else None, + weight=d["weight"] if "weight" in d else None, + comparator=d["comparator"] if "comparator" in d else None, + ) + self.sticky = sticky + self.default_priority = default_priority + self.durable = durable + self.function_name = name.lower() or str(func.__name__).lower() + self.function_version = version + self.function_on_events = on_events + self.function_on_crons = on_crons + self.function_timeout = timeout + self.function_schedule_timeout = schedule_timeout + self.function_retries = retries + self.function_rate_limits = limits + self.function_concurrency = concurrency + self.function_on_failure = on_failure + self.function_namespace = "default" + self.function_auto_register = auto_register + + self.is_coroutine = False + + if asyncio.iscoroutinefunction(func): + self.is_coroutine = True + + def __call__(self, context: Context) -> T: + return self.func(context) + + def with_namespace(self, namespace: str) -> None: + if namespace is not None and namespace != "": + self.function_namespace = namespace + self.function_name = namespace + self.function_name + + def to_workflow_opts(self) -> CreateWorkflowVersionOpts: + kind: WorkflowKind = WorkflowKind.FUNCTION + + if self.durable: + kind = WorkflowKind.DURABLE + + on_failure_job: CreateWorkflowJobOpts | None = None + + if self.function_on_failure is not None: + on_failure_job = CreateWorkflowJobOpts( + name=self.function_name + "-on-failure", + steps=[ + self.function_on_failure.to_step(), + ], + ) + + concurrency: WorkflowConcurrencyOpts | None = None + + if self.function_concurrency is not None: + self.function_concurrency.set_namespace(self.function_namespace) + concurrency = WorkflowConcurrencyOpts( + action=self.function_concurrency.get_action_name(), + max_runs=self.function_concurrency.max_runs, + limit_strategy=self.function_concurrency.limit_strategy, + ) + + validated_priority = ( + max(1, min(3, self.default_priority)) if self.default_priority else None + ) + if validated_priority != self.default_priority: + logger.warning( + "Warning: Default Priority Must be between 1 and 3 -- inclusively. Adjusted to be within the range." + ) + + return CreateWorkflowVersionOpts( + name=self.function_name, + kind=kind, + version=self.function_version, + event_triggers=self.function_on_events, + cron_triggers=self.function_on_crons, + schedule_timeout=self.function_schedule_timeout, + sticky=self.sticky, + on_failure_job=on_failure_job, + concurrency=concurrency, + jobs=[ + CreateWorkflowJobOpts( + name=self.function_name, + steps=[ + self.to_step(), + ], + ) + ], + default_priority=validated_priority, + ) + + def to_step(self) -> CreateWorkflowStepOpts: + return CreateWorkflowStepOpts( + readable_id=self.function_name, + action=self.get_action_name(), + timeout=self.function_timeout, + inputs="{}", + parents=[], + retries=self.function_retries, + rate_limits=self.function_rate_limits, + worker_labels=self.function_desired_worker_labels, + ) + + def get_action_name(self) -> str: + return self.function_namespace + ":" + self.function_name + + +class DurableContext(Context): + def run( + self, + function: str | Callable[[Context], Any], + input: dict[Any, Any] = {}, + key: str | None = None, + options: ChildTriggerWorkflowOptions | None = None, + ) -> "RunRef[T]": + worker_id = self.worker.id() + + workflow_name = function + + if not isinstance(function, str): + workflow_name = function.function_name + + # if ( + # options is not None + # and "sticky" in options + # and options["sticky"] == True + # and not self.worker.has_workflow(workflow_name) + # ): + # raise Exception( + # f"cannot run with sticky: workflow {workflow_name} is not registered on the worker" + # ) + + trigger_options = self._prepare_workflow_options(key, options, worker_id) + + return self.admin_client.run(function, input, trigger_options) diff --git a/.venv/lib/python3.12/site-packages/hatchet_sdk/v2/concurrency.py b/.venv/lib/python3.12/site-packages/hatchet_sdk/v2/concurrency.py new file mode 100644 index 00000000..73d9e3b4 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/hatchet_sdk/v2/concurrency.py @@ -0,0 +1,47 @@ +from typing import Any, Callable + +from hatchet_sdk.context.context import Context +from hatchet_sdk.contracts.workflows_pb2 import ( # type: ignore[attr-defined] + ConcurrencyLimitStrategy, +) + + +class ConcurrencyFunction: + def __init__( + self, + func: Callable[[Context], str], + name: str = "concurrency", + max_runs: int = 1, + limit_strategy: ConcurrencyLimitStrategy = ConcurrencyLimitStrategy.GROUP_ROUND_ROBIN, + ): + self.func = func + self.name = name + self.max_runs = max_runs + self.limit_strategy = limit_strategy + self.namespace = "default" + + def set_namespace(self, namespace: str) -> None: + self.namespace = namespace + + def get_action_name(self) -> str: + return self.namespace + ":" + self.name + + def __call__(self, *args: Any, **kwargs: Any) -> str: + return self.func(*args, **kwargs) + + def __str__(self) -> str: + return f"{self.name}({self.max_runs})" + + def __repr__(self) -> str: + return f"{self.name}({self.max_runs})" + + +def concurrency( + name: str = "", + max_runs: int = 1, + limit_strategy: ConcurrencyLimitStrategy = ConcurrencyLimitStrategy.GROUP_ROUND_ROBIN, +) -> Callable[[Callable[[Context], str]], ConcurrencyFunction]: + def inner(func: Callable[[Context], str]) -> ConcurrencyFunction: + return ConcurrencyFunction(func, name, max_runs, limit_strategy) + + return inner diff --git a/.venv/lib/python3.12/site-packages/hatchet_sdk/v2/hatchet.py b/.venv/lib/python3.12/site-packages/hatchet_sdk/v2/hatchet.py new file mode 100644 index 00000000..4dd3faf0 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/hatchet_sdk/v2/hatchet.py @@ -0,0 +1,224 @@ +from typing import Any, Callable, TypeVar, Union + +from hatchet_sdk import Worker +from hatchet_sdk.context.context import Context +from hatchet_sdk.contracts.workflows_pb2 import ( # type: ignore[attr-defined] + ConcurrencyLimitStrategy, + StickyStrategy, +) +from hatchet_sdk.hatchet import Hatchet as HatchetV1 +from hatchet_sdk.hatchet import workflow +from hatchet_sdk.labels import DesiredWorkerLabel +from hatchet_sdk.rate_limit import RateLimit +from hatchet_sdk.v2.callable import DurableContext, HatchetCallable +from hatchet_sdk.v2.concurrency import ConcurrencyFunction +from hatchet_sdk.worker.worker import register_on_worker + +T = TypeVar("T") + + +def function( + name: str = "", + auto_register: bool = True, + on_events: list[str] | None = None, + on_crons: list[str] | None = None, + version: str = "", + timeout: str = "60m", + schedule_timeout: str = "5m", + sticky: StickyStrategy = None, + retries: int = 0, + rate_limits: list[RateLimit] | None = None, + desired_worker_labels: dict[str, DesiredWorkerLabel] = {}, + concurrency: ConcurrencyFunction | None = None, + on_failure: Union["HatchetCallable[T]", None] = None, + default_priority: int | None = None, +) -> Callable[[Callable[[Context], str]], HatchetCallable[T]]: + def inner(func: Callable[[Context], T]) -> HatchetCallable[T]: + return HatchetCallable( + func=func, + name=name, + auto_register=auto_register, + on_events=on_events, + on_crons=on_crons, + version=version, + timeout=timeout, + schedule_timeout=schedule_timeout, + sticky=sticky, + retries=retries, + rate_limits=rate_limits, + desired_worker_labels=desired_worker_labels, + concurrency=concurrency, + on_failure=on_failure, + default_priority=default_priority, + ) + + return inner + + +def durable( + name: str = "", + auto_register: bool = True, + on_events: list[str] | None = None, + on_crons: list[str] | None = None, + version: str = "", + timeout: str = "60m", + schedule_timeout: str = "5m", + sticky: StickyStrategy = None, + retries: int = 0, + rate_limits: list[RateLimit] | None = None, + desired_worker_labels: dict[str, DesiredWorkerLabel] = {}, + concurrency: ConcurrencyFunction | None = None, + on_failure: HatchetCallable[T] | None = None, + default_priority: int | None = None, +) -> Callable[[HatchetCallable[T]], HatchetCallable[T]]: + def inner(func: HatchetCallable[T]) -> HatchetCallable[T]: + func.durable = True + + f = function( + name=name, + auto_register=auto_register, + on_events=on_events, + on_crons=on_crons, + version=version, + timeout=timeout, + schedule_timeout=schedule_timeout, + sticky=sticky, + retries=retries, + rate_limits=rate_limits, + desired_worker_labels=desired_worker_labels, + concurrency=concurrency, + on_failure=on_failure, + default_priority=default_priority, + ) + + resp = f(func) + + resp.durable = True + + return resp + + return inner + + +def concurrency( + name: str = "concurrency", + max_runs: int = 1, + limit_strategy: ConcurrencyLimitStrategy = ConcurrencyLimitStrategy.GROUP_ROUND_ROBIN, +) -> Callable[[Callable[[Context], str]], ConcurrencyFunction]: + def inner(func: Callable[[Context], str]) -> ConcurrencyFunction: + return ConcurrencyFunction(func, name, max_runs, limit_strategy) + + return inner + + +class Hatchet(HatchetV1): + dag = staticmethod(workflow) + concurrency = staticmethod(concurrency) + + functions: list[HatchetCallable[T]] = [] + + def function( + self, + name: str = "", + auto_register: bool = True, + on_events: list[str] | None = None, + on_crons: list[str] | None = None, + version: str = "", + timeout: str = "60m", + schedule_timeout: str = "5m", + retries: int = 0, + rate_limits: list[RateLimit] | None = None, + desired_worker_labels: dict[str, DesiredWorkerLabel] = {}, + concurrency: ConcurrencyFunction | None = None, + on_failure: Union["HatchetCallable[T]", None] = None, + default_priority: int | None = None, + ) -> Callable[[Callable[[Context], Any]], Callable[[Context], Any]]: + resp = function( + name=name, + auto_register=auto_register, + on_events=on_events, + on_crons=on_crons, + version=version, + timeout=timeout, + schedule_timeout=schedule_timeout, + retries=retries, + rate_limits=rate_limits, + desired_worker_labels=desired_worker_labels, + concurrency=concurrency, + on_failure=on_failure, + default_priority=default_priority, + ) + + def wrapper(func: Callable[[Context], str]) -> HatchetCallable[T]: + wrapped_resp = resp(func) + + if wrapped_resp.function_auto_register: + self.functions.append(wrapped_resp) + + wrapped_resp.with_namespace(self._client.config.namespace) + + return wrapped_resp + + return wrapper + + def durable( + self, + name: str = "", + auto_register: bool = True, + on_events: list[str] | None = None, + on_crons: list[str] | None = None, + version: str = "", + timeout: str = "60m", + schedule_timeout: str = "5m", + sticky: StickyStrategy = None, + retries: int = 0, + rate_limits: list[RateLimit] | None = None, + desired_worker_labels: dict[str, DesiredWorkerLabel] = {}, + concurrency: ConcurrencyFunction | None = None, + on_failure: Union["HatchetCallable[T]", None] = None, + default_priority: int | None = None, + ) -> Callable[[Callable[[DurableContext], Any]], Callable[[DurableContext], Any]]: + resp = durable( + name=name, + auto_register=auto_register, + on_events=on_events, + on_crons=on_crons, + version=version, + timeout=timeout, + schedule_timeout=schedule_timeout, + sticky=sticky, + retries=retries, + rate_limits=rate_limits, + desired_worker_labels=desired_worker_labels, + concurrency=concurrency, + on_failure=on_failure, + default_priority=default_priority, + ) + + def wrapper(func: HatchetCallable[T]) -> HatchetCallable[T]: + wrapped_resp = resp(func) + + if wrapped_resp.function_auto_register: + self.functions.append(wrapped_resp) + + wrapped_resp.with_namespace(self._client.config.namespace) + + return wrapped_resp + + return wrapper + + def worker( + self, name: str, max_runs: int | None = None, labels: dict[str, str | int] = {} + ): + worker = Worker( + name=name, + max_runs=max_runs, + labels=labels, + config=self._client.config, + debug=self._client.debug, + ) + + for func in self.functions: + register_on_worker(func, worker) + + return worker | 
