import asyncio from logging import Logger from typing import Callable import grpc from hatchet_sdk.clients.run_event_listener import RunEventListenerClient from hatchet_sdk.clients.workflow_listener import PooledWorkflowRunListener from hatchet_sdk.connection import new_conn from .clients.admin import AdminClient, new_admin from .clients.dispatcher.dispatcher import DispatcherClient, new_dispatcher from .clients.events import EventClient, new_event from .clients.rest_client import RestApi from .loader import ClientConfig, ConfigLoader class Client: admin: AdminClient dispatcher: DispatcherClient event: EventClient rest: RestApi workflow_listener: PooledWorkflowRunListener logInterceptor: Logger debug: bool = False @classmethod def from_environment( cls, defaults: ClientConfig = ClientConfig(), debug: bool = False, *opts_functions: Callable[[ClientConfig], None], ): try: loop = asyncio.get_running_loop() except RuntimeError: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) config: ClientConfig = ConfigLoader(".").load_client_config(defaults) for opt_function in opts_functions: opt_function(config) return cls.from_config(config, debug) @classmethod def from_config( cls, config: ClientConfig = ClientConfig(), debug: bool = False, ): try: loop = asyncio.get_running_loop() except RuntimeError: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) if config.tls_config is None: raise ValueError("TLS config is required") if config.host_port is None: raise ValueError("Host and port are required") conn: grpc.Channel = new_conn(config) # Instantiate clients event_client = new_event(conn, config) admin_client = new_admin(config) dispatcher_client = new_dispatcher(config) rest_client = RestApi(config.server_url, config.token, config.tenant_id) workflow_listener = None # Initialize this if needed return cls( event_client, admin_client, dispatcher_client, workflow_listener, rest_client, config, debug, ) def __init__( self, event_client: EventClient, admin_client: AdminClient, dispatcher_client: DispatcherClient, workflow_listener: PooledWorkflowRunListener, rest_client: RestApi, config: ClientConfig, debug: bool = False, ): try: loop = asyncio.get_running_loop() except RuntimeError: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) self.admin = admin_client self.dispatcher = dispatcher_client self.event = event_client self.rest = rest_client self.config = config self.listener = RunEventListenerClient(config) self.workflow_listener = workflow_listener self.logInterceptor = config.logInterceptor self.debug = debug def with_host_port(host: str, port: int): def with_host_port_impl(config: ClientConfig): config.host = host config.port = port return with_host_port_impl new_client = Client.from_environment new_client_raw = Client.from_config