1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
|
import asyncio
from logging import Logger
from typing import Callable
import grpc
from hatchet_sdk.clients.run_event_listener import RunEventListenerClient
from hatchet_sdk.clients.workflow_listener import PooledWorkflowRunListener
from hatchet_sdk.connection import new_conn
from .clients.admin import AdminClient, new_admin
from .clients.dispatcher.dispatcher import DispatcherClient, new_dispatcher
from .clients.events import EventClient, new_event
from .clients.rest_client import RestApi
from .loader import ClientConfig, ConfigLoader
class Client:
admin: AdminClient
dispatcher: DispatcherClient
event: EventClient
rest: RestApi
workflow_listener: PooledWorkflowRunListener
logInterceptor: Logger
debug: bool = False
@classmethod
def from_environment(
cls,
defaults: ClientConfig = ClientConfig(),
debug: bool = False,
*opts_functions: Callable[[ClientConfig], None],
):
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
config: ClientConfig = ConfigLoader(".").load_client_config(defaults)
for opt_function in opts_functions:
opt_function(config)
return cls.from_config(config, debug)
@classmethod
def from_config(
cls,
config: ClientConfig = ClientConfig(),
debug: bool = False,
):
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
if config.tls_config is None:
raise ValueError("TLS config is required")
if config.host_port is None:
raise ValueError("Host and port are required")
conn: grpc.Channel = new_conn(config)
# Instantiate clients
event_client = new_event(conn, config)
admin_client = new_admin(config)
dispatcher_client = new_dispatcher(config)
rest_client = RestApi(config.server_url, config.token, config.tenant_id)
workflow_listener = None # Initialize this if needed
return cls(
event_client,
admin_client,
dispatcher_client,
workflow_listener,
rest_client,
config,
debug,
)
def __init__(
self,
event_client: EventClient,
admin_client: AdminClient,
dispatcher_client: DispatcherClient,
workflow_listener: PooledWorkflowRunListener,
rest_client: RestApi,
config: ClientConfig,
debug: bool = False,
):
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
self.admin = admin_client
self.dispatcher = dispatcher_client
self.event = event_client
self.rest = rest_client
self.config = config
self.listener = RunEventListenerClient(config)
self.workflow_listener = workflow_listener
self.logInterceptor = config.logInterceptor
self.debug = debug
def with_host_port(host: str, port: int):
def with_host_port_impl(config: ClientConfig):
config.host = host
config.port = port
return with_host_port_impl
new_client = Client.from_environment
new_client_raw = Client.from_config
|