diff options
author | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
---|---|---|
committer | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
commit | 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch) | |
tree | ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/core/base/providers/orchestration.py | |
parent | cc961e04ba734dd72309fb548a2f97d67d578813 (diff) | |
download | gn-ai-master.tar.gz |
Diffstat (limited to '.venv/lib/python3.12/site-packages/core/base/providers/orchestration.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/core/base/providers/orchestration.py | 70 |
1 files changed, 70 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/core/base/providers/orchestration.py b/.venv/lib/python3.12/site-packages/core/base/providers/orchestration.py new file mode 100644 index 00000000..c3105f30 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/core/base/providers/orchestration.py @@ -0,0 +1,70 @@ +from abc import abstractmethod +from enum import Enum +from typing import Any + +from .base import Provider, ProviderConfig + + +class Workflow(Enum): + INGESTION = "ingestion" + GRAPH = "graph" + + +class OrchestrationConfig(ProviderConfig): + provider: str + max_runs: int = 2_048 + graph_search_results_creation_concurrency_limit: int = 32 + ingestion_concurrency_limit: int = 16 + graph_search_results_concurrency_limit: int = 8 + + def validate_config(self) -> None: + if self.provider not in self.supported_providers: + raise ValueError(f"Provider {self.provider} is not supported.") + + @property + def supported_providers(self) -> list[str]: + return ["hatchet", "simple"] + + +class OrchestrationProvider(Provider): + def __init__(self, config: OrchestrationConfig): + super().__init__(config) + self.config = config + self.worker = None + + @abstractmethod + async def start_worker(self): + pass + + @abstractmethod + def get_worker(self, name: str, max_runs: int) -> Any: + pass + + @abstractmethod + def step(self, *args, **kwargs) -> Any: + pass + + @abstractmethod + def workflow(self, *args, **kwargs) -> Any: + pass + + @abstractmethod + def failure(self, *args, **kwargs) -> Any: + pass + + @abstractmethod + def register_workflows( + self, workflow: Workflow, service: Any, messages: dict + ) -> None: + pass + + @abstractmethod + async def run_workflow( + self, + workflow_name: str, + parameters: dict, + options: dict, + *args, + **kwargs, + ) -> dict[str, str]: + pass |