import asyncio
from logging import Logger
from typing import Callable
import grpc
from hatchet_sdk.clients.run_event_listener import RunEventListenerClient
from hatchet_sdk.clients.workflow_listener import PooledWorkflowRunListener
from hatchet_sdk.connection import new_conn
from .clients.admin import AdminClient, new_admin
from .clients.dispatcher.dispatcher import DispatcherClient, new_dispatcher
from .clients.events import EventClient, new_event
from .clients.rest_client import RestApi
from .loader import ClientConfig, ConfigLoader
class Client:
admin: AdminClient
dispatcher: DispatcherClient
event: EventClient
rest: RestApi
workflow_listener: PooledWorkflowRunListener
logInterceptor: Logger
debug: bool = False
@classmethod
def from_environment(
cls,
defaults: ClientConfig = ClientConfig(),
debug: bool = False,
*opts_functions: Callable[[ClientConfig], None],
):
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
config: ClientConfig = ConfigLoader(".").load_client_config(defaults)
for opt_function in opts_functions:
opt_function(config)
return cls.from_config(config, debug)
@classmethod
def from_config(
cls,
config: ClientConfig = ClientConfig(),
debug: bool = False,
):
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
if config.tls_config is None:
raise ValueError("TLS config is required")
if config.host_port is None:
raise ValueError("Host and port are required")
conn: grpc.Channel = new_conn(config)
# Instantiate clients
event_client = new_event(conn, config)
admin_client = new_admin(config)
dispatcher_client = new_dispatcher(config)
rest_client = RestApi(config.server_url, config.token, config.tenant_id)
workflow_listener = None # Initialize this if needed
return cls(
event_client,
admin_client,
dispatcher_client,
workflow_listener,
rest_client,
config,
debug,
)
def __init__(
self,
event_client: EventClient,
admin_client: AdminClient,
dispatcher_client: DispatcherClient,
workflow_listener: PooledWorkflowRunListener,
rest_client: RestApi,
config: ClientConfig,
debug: bool = False,
):
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
self.admin = admin_client
self.dispatcher = dispatcher_client
self.event = event_client
self.rest = rest_client
self.config = config
self.listener = RunEventListenerClient(config)
self.workflow_listener = workflow_listener
self.logInterceptor = config.logInterceptor
self.debug = debug
def with_host_port(host: str, port: int):
def with_host_port_impl(config: ClientConfig):
config.host = host
config.port = port
return with_host_port_impl
new_client = Client.from_environment
new_client_raw = Client.from_config