diff options
author | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
---|---|---|
committer | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
commit | 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch) | |
tree | ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/hatchet_sdk/v2/hatchet.py | |
parent | cc961e04ba734dd72309fb548a2f97d67d578813 (diff) | |
download | gn-ai-master.tar.gz |
Diffstat (limited to '.venv/lib/python3.12/site-packages/hatchet_sdk/v2/hatchet.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/hatchet_sdk/v2/hatchet.py | 224 |
1 files changed, 224 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/hatchet_sdk/v2/hatchet.py b/.venv/lib/python3.12/site-packages/hatchet_sdk/v2/hatchet.py new file mode 100644 index 00000000..4dd3faf0 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/hatchet_sdk/v2/hatchet.py @@ -0,0 +1,224 @@ +from typing import Any, Callable, TypeVar, Union + +from hatchet_sdk import Worker +from hatchet_sdk.context.context import Context +from hatchet_sdk.contracts.workflows_pb2 import ( # type: ignore[attr-defined] + ConcurrencyLimitStrategy, + StickyStrategy, +) +from hatchet_sdk.hatchet import Hatchet as HatchetV1 +from hatchet_sdk.hatchet import workflow +from hatchet_sdk.labels import DesiredWorkerLabel +from hatchet_sdk.rate_limit import RateLimit +from hatchet_sdk.v2.callable import DurableContext, HatchetCallable +from hatchet_sdk.v2.concurrency import ConcurrencyFunction +from hatchet_sdk.worker.worker import register_on_worker + +T = TypeVar("T") + + +def function( + name: str = "", + auto_register: bool = True, + on_events: list[str] | None = None, + on_crons: list[str] | None = None, + version: str = "", + timeout: str = "60m", + schedule_timeout: str = "5m", + sticky: StickyStrategy = None, + retries: int = 0, + rate_limits: list[RateLimit] | None = None, + desired_worker_labels: dict[str, DesiredWorkerLabel] = {}, + concurrency: ConcurrencyFunction | None = None, + on_failure: Union["HatchetCallable[T]", None] = None, + default_priority: int | None = None, +) -> Callable[[Callable[[Context], str]], HatchetCallable[T]]: + def inner(func: Callable[[Context], T]) -> HatchetCallable[T]: + return HatchetCallable( + func=func, + name=name, + auto_register=auto_register, + on_events=on_events, + on_crons=on_crons, + version=version, + timeout=timeout, + schedule_timeout=schedule_timeout, + sticky=sticky, + retries=retries, + rate_limits=rate_limits, + desired_worker_labels=desired_worker_labels, + concurrency=concurrency, + on_failure=on_failure, + default_priority=default_priority, + ) + + return inner + + +def durable( + name: str = "", + auto_register: bool = True, + on_events: list[str] | None = None, + on_crons: list[str] | None = None, + version: str = "", + timeout: str = "60m", + schedule_timeout: str = "5m", + sticky: StickyStrategy = None, + retries: int = 0, + rate_limits: list[RateLimit] | None = None, + desired_worker_labels: dict[str, DesiredWorkerLabel] = {}, + concurrency: ConcurrencyFunction | None = None, + on_failure: HatchetCallable[T] | None = None, + default_priority: int | None = None, +) -> Callable[[HatchetCallable[T]], HatchetCallable[T]]: + def inner(func: HatchetCallable[T]) -> HatchetCallable[T]: + func.durable = True + + f = function( + name=name, + auto_register=auto_register, + on_events=on_events, + on_crons=on_crons, + version=version, + timeout=timeout, + schedule_timeout=schedule_timeout, + sticky=sticky, + retries=retries, + rate_limits=rate_limits, + desired_worker_labels=desired_worker_labels, + concurrency=concurrency, + on_failure=on_failure, + default_priority=default_priority, + ) + + resp = f(func) + + resp.durable = True + + return resp + + return inner + + +def concurrency( + name: str = "concurrency", + max_runs: int = 1, + limit_strategy: ConcurrencyLimitStrategy = ConcurrencyLimitStrategy.GROUP_ROUND_ROBIN, +) -> Callable[[Callable[[Context], str]], ConcurrencyFunction]: + def inner(func: Callable[[Context], str]) -> ConcurrencyFunction: + return ConcurrencyFunction(func, name, max_runs, limit_strategy) + + return inner + + +class Hatchet(HatchetV1): + dag = staticmethod(workflow) + concurrency = staticmethod(concurrency) + + functions: list[HatchetCallable[T]] = [] + + def function( + self, + name: str = "", + auto_register: bool = True, + on_events: list[str] | None = None, + on_crons: list[str] | None = None, + version: str = "", + timeout: str = "60m", + schedule_timeout: str = "5m", + retries: int = 0, + rate_limits: list[RateLimit] | None = None, + desired_worker_labels: dict[str, DesiredWorkerLabel] = {}, + concurrency: ConcurrencyFunction | None = None, + on_failure: Union["HatchetCallable[T]", None] = None, + default_priority: int | None = None, + ) -> Callable[[Callable[[Context], Any]], Callable[[Context], Any]]: + resp = function( + name=name, + auto_register=auto_register, + on_events=on_events, + on_crons=on_crons, + version=version, + timeout=timeout, + schedule_timeout=schedule_timeout, + retries=retries, + rate_limits=rate_limits, + desired_worker_labels=desired_worker_labels, + concurrency=concurrency, + on_failure=on_failure, + default_priority=default_priority, + ) + + def wrapper(func: Callable[[Context], str]) -> HatchetCallable[T]: + wrapped_resp = resp(func) + + if wrapped_resp.function_auto_register: + self.functions.append(wrapped_resp) + + wrapped_resp.with_namespace(self._client.config.namespace) + + return wrapped_resp + + return wrapper + + def durable( + self, + name: str = "", + auto_register: bool = True, + on_events: list[str] | None = None, + on_crons: list[str] | None = None, + version: str = "", + timeout: str = "60m", + schedule_timeout: str = "5m", + sticky: StickyStrategy = None, + retries: int = 0, + rate_limits: list[RateLimit] | None = None, + desired_worker_labels: dict[str, DesiredWorkerLabel] = {}, + concurrency: ConcurrencyFunction | None = None, + on_failure: Union["HatchetCallable[T]", None] = None, + default_priority: int | None = None, + ) -> Callable[[Callable[[DurableContext], Any]], Callable[[DurableContext], Any]]: + resp = durable( + name=name, + auto_register=auto_register, + on_events=on_events, + on_crons=on_crons, + version=version, + timeout=timeout, + schedule_timeout=schedule_timeout, + sticky=sticky, + retries=retries, + rate_limits=rate_limits, + desired_worker_labels=desired_worker_labels, + concurrency=concurrency, + on_failure=on_failure, + default_priority=default_priority, + ) + + def wrapper(func: HatchetCallable[T]) -> HatchetCallable[T]: + wrapped_resp = resp(func) + + if wrapped_resp.function_auto_register: + self.functions.append(wrapped_resp) + + wrapped_resp.with_namespace(self._client.config.namespace) + + return wrapped_resp + + return wrapper + + def worker( + self, name: str, max_runs: int | None = None, labels: dict[str, str | int] = {} + ): + worker = Worker( + name=name, + max_runs=max_runs, + labels=labels, + config=self._client.config, + debug=self._client.debug, + ) + + for func in self.functions: + register_on_worker(func, worker) + + return worker |