aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/hatchet_sdk/v2/hatchet.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/hatchet_sdk/v2/hatchet.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are hereHEADmaster
Diffstat (limited to '.venv/lib/python3.12/site-packages/hatchet_sdk/v2/hatchet.py')
-rw-r--r--.venv/lib/python3.12/site-packages/hatchet_sdk/v2/hatchet.py224
1 files changed, 224 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/hatchet_sdk/v2/hatchet.py b/.venv/lib/python3.12/site-packages/hatchet_sdk/v2/hatchet.py
new file mode 100644
index 00000000..4dd3faf0
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/hatchet_sdk/v2/hatchet.py
@@ -0,0 +1,224 @@
+from typing import Any, Callable, TypeVar, Union
+
+from hatchet_sdk import Worker
+from hatchet_sdk.context.context import Context
+from hatchet_sdk.contracts.workflows_pb2 import ( # type: ignore[attr-defined]
+ ConcurrencyLimitStrategy,
+ StickyStrategy,
+)
+from hatchet_sdk.hatchet import Hatchet as HatchetV1
+from hatchet_sdk.hatchet import workflow
+from hatchet_sdk.labels import DesiredWorkerLabel
+from hatchet_sdk.rate_limit import RateLimit
+from hatchet_sdk.v2.callable import DurableContext, HatchetCallable
+from hatchet_sdk.v2.concurrency import ConcurrencyFunction
+from hatchet_sdk.worker.worker import register_on_worker
+
+T = TypeVar("T")
+
+
+def function(
+ name: str = "",
+ auto_register: bool = True,
+ on_events: list[str] | None = None,
+ on_crons: list[str] | None = None,
+ version: str = "",
+ timeout: str = "60m",
+ schedule_timeout: str = "5m",
+ sticky: StickyStrategy = None,
+ retries: int = 0,
+ rate_limits: list[RateLimit] | None = None,
+ desired_worker_labels: dict[str, DesiredWorkerLabel] = {},
+ concurrency: ConcurrencyFunction | None = None,
+ on_failure: Union["HatchetCallable[T]", None] = None,
+ default_priority: int | None = None,
+) -> Callable[[Callable[[Context], str]], HatchetCallable[T]]:
+ def inner(func: Callable[[Context], T]) -> HatchetCallable[T]:
+ return HatchetCallable(
+ func=func,
+ name=name,
+ auto_register=auto_register,
+ on_events=on_events,
+ on_crons=on_crons,
+ version=version,
+ timeout=timeout,
+ schedule_timeout=schedule_timeout,
+ sticky=sticky,
+ retries=retries,
+ rate_limits=rate_limits,
+ desired_worker_labels=desired_worker_labels,
+ concurrency=concurrency,
+ on_failure=on_failure,
+ default_priority=default_priority,
+ )
+
+ return inner
+
+
+def durable(
+ name: str = "",
+ auto_register: bool = True,
+ on_events: list[str] | None = None,
+ on_crons: list[str] | None = None,
+ version: str = "",
+ timeout: str = "60m",
+ schedule_timeout: str = "5m",
+ sticky: StickyStrategy = None,
+ retries: int = 0,
+ rate_limits: list[RateLimit] | None = None,
+ desired_worker_labels: dict[str, DesiredWorkerLabel] = {},
+ concurrency: ConcurrencyFunction | None = None,
+ on_failure: HatchetCallable[T] | None = None,
+ default_priority: int | None = None,
+) -> Callable[[HatchetCallable[T]], HatchetCallable[T]]:
+ def inner(func: HatchetCallable[T]) -> HatchetCallable[T]:
+ func.durable = True
+
+ f = function(
+ name=name,
+ auto_register=auto_register,
+ on_events=on_events,
+ on_crons=on_crons,
+ version=version,
+ timeout=timeout,
+ schedule_timeout=schedule_timeout,
+ sticky=sticky,
+ retries=retries,
+ rate_limits=rate_limits,
+ desired_worker_labels=desired_worker_labels,
+ concurrency=concurrency,
+ on_failure=on_failure,
+ default_priority=default_priority,
+ )
+
+ resp = f(func)
+
+ resp.durable = True
+
+ return resp
+
+ return inner
+
+
+def concurrency(
+ name: str = "concurrency",
+ max_runs: int = 1,
+ limit_strategy: ConcurrencyLimitStrategy = ConcurrencyLimitStrategy.GROUP_ROUND_ROBIN,
+) -> Callable[[Callable[[Context], str]], ConcurrencyFunction]:
+ def inner(func: Callable[[Context], str]) -> ConcurrencyFunction:
+ return ConcurrencyFunction(func, name, max_runs, limit_strategy)
+
+ return inner
+
+
+class Hatchet(HatchetV1):
+ dag = staticmethod(workflow)
+ concurrency = staticmethod(concurrency)
+
+ functions: list[HatchetCallable[T]] = []
+
+ def function(
+ self,
+ name: str = "",
+ auto_register: bool = True,
+ on_events: list[str] | None = None,
+ on_crons: list[str] | None = None,
+ version: str = "",
+ timeout: str = "60m",
+ schedule_timeout: str = "5m",
+ retries: int = 0,
+ rate_limits: list[RateLimit] | None = None,
+ desired_worker_labels: dict[str, DesiredWorkerLabel] = {},
+ concurrency: ConcurrencyFunction | None = None,
+ on_failure: Union["HatchetCallable[T]", None] = None,
+ default_priority: int | None = None,
+ ) -> Callable[[Callable[[Context], Any]], Callable[[Context], Any]]:
+ resp = function(
+ name=name,
+ auto_register=auto_register,
+ on_events=on_events,
+ on_crons=on_crons,
+ version=version,
+ timeout=timeout,
+ schedule_timeout=schedule_timeout,
+ retries=retries,
+ rate_limits=rate_limits,
+ desired_worker_labels=desired_worker_labels,
+ concurrency=concurrency,
+ on_failure=on_failure,
+ default_priority=default_priority,
+ )
+
+ def wrapper(func: Callable[[Context], str]) -> HatchetCallable[T]:
+ wrapped_resp = resp(func)
+
+ if wrapped_resp.function_auto_register:
+ self.functions.append(wrapped_resp)
+
+ wrapped_resp.with_namespace(self._client.config.namespace)
+
+ return wrapped_resp
+
+ return wrapper
+
+ def durable(
+ self,
+ name: str = "",
+ auto_register: bool = True,
+ on_events: list[str] | None = None,
+ on_crons: list[str] | None = None,
+ version: str = "",
+ timeout: str = "60m",
+ schedule_timeout: str = "5m",
+ sticky: StickyStrategy = None,
+ retries: int = 0,
+ rate_limits: list[RateLimit] | None = None,
+ desired_worker_labels: dict[str, DesiredWorkerLabel] = {},
+ concurrency: ConcurrencyFunction | None = None,
+ on_failure: Union["HatchetCallable[T]", None] = None,
+ default_priority: int | None = None,
+ ) -> Callable[[Callable[[DurableContext], Any]], Callable[[DurableContext], Any]]:
+ resp = durable(
+ name=name,
+ auto_register=auto_register,
+ on_events=on_events,
+ on_crons=on_crons,
+ version=version,
+ timeout=timeout,
+ schedule_timeout=schedule_timeout,
+ sticky=sticky,
+ retries=retries,
+ rate_limits=rate_limits,
+ desired_worker_labels=desired_worker_labels,
+ concurrency=concurrency,
+ on_failure=on_failure,
+ default_priority=default_priority,
+ )
+
+ def wrapper(func: HatchetCallable[T]) -> HatchetCallable[T]:
+ wrapped_resp = resp(func)
+
+ if wrapped_resp.function_auto_register:
+ self.functions.append(wrapped_resp)
+
+ wrapped_resp.with_namespace(self._client.config.namespace)
+
+ return wrapped_resp
+
+ return wrapper
+
+ def worker(
+ self, name: str, max_runs: int | None = None, labels: dict[str, str | int] = {}
+ ):
+ worker = Worker(
+ name=name,
+ max_runs=max_runs,
+ labels=labels,
+ config=self._client.config,
+ debug=self._client.debug,
+ )
+
+ for func in self.functions:
+ register_on_worker(func, worker)
+
+ return worker