aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/hatchet_sdk/client.py
blob: 45dfd3946a58f5c2e40d88653d71c1a620d196ad (about) (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import asyncio
from logging import Logger
from typing import Callable, Optional

import grpc

from hatchet_sdk.clients.run_event_listener import RunEventListenerClient
from hatchet_sdk.clients.workflow_listener import PooledWorkflowRunListener
from hatchet_sdk.connection import new_conn

from .clients.admin import AdminClient, new_admin
from .clients.dispatcher.dispatcher import DispatcherClient, new_dispatcher
from .clients.events import EventClient, new_event
from .clients.rest_client import RestApi
from .loader import ClientConfig, ConfigLoader


def _ensure_event_loop() -> None:
    """Install a current asyncio event loop when none is running.

    ``asyncio.get_running_loop()`` raises ``RuntimeError`` when it is called
    outside of a running loop; in that case a new loop is created and
    registered with ``asyncio.set_event_loop``. A running loop is left alone.
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        asyncio.set_event_loop(asyncio.new_event_loop())


class Client:
    """Aggregate of the individual Hatchet API clients built from one config.

    Instances are normally obtained through :meth:`from_environment` or
    :meth:`from_config` rather than by calling the constructor directly.
    """

    admin: AdminClient
    dispatcher: DispatcherClient
    event: EventClient
    rest: RestApi
    config: ClientConfig
    listener: RunEventListenerClient
    # ``from_config`` passes ``None`` here, so the listener may be absent.
    workflow_listener: Optional[PooledWorkflowRunListener]
    logInterceptor: Logger
    debug: bool = False

    @classmethod
    def from_environment(
        cls,
        defaults: Optional[ClientConfig] = None,
        debug: bool = False,
        *opts_functions: Callable[[ClientConfig], None],
    ) -> "Client":
        """Build a client from configuration loaded via ``ConfigLoader``.

        Args:
            defaults: Base configuration handed to
                ``ConfigLoader(".").load_client_config``. A fresh
                ``ClientConfig()`` is created per call when omitted.
            debug: Stored on the resulting client as ``debug``.
            *opts_functions: Callables applied, in order, to the loaded
                configuration before the client is built.

        Returns:
            The client produced by :meth:`from_config`.

        Raises:
            ValueError: Propagated from :meth:`from_config` when the loaded
                configuration has no TLS config or no host/port.
        """
        _ensure_event_loop()

        if defaults is None:
            # Built per call instead of as a default argument value, so no
            # ClientConfig instance is shared between unrelated calls.
            defaults = ClientConfig()

        config: ClientConfig = ConfigLoader(".").load_client_config(defaults)
        for opt_function in opts_functions:
            opt_function(config)

        return cls.from_config(config, debug)

    @classmethod
    def from_config(
        cls,
        config: Optional[ClientConfig] = None,
        debug: bool = False,
    ) -> "Client":
        """Build a client and its sub-clients from an explicit configuration.

        Args:
            config: Configuration to use. A fresh ``ClientConfig()`` is
                created per call when omitted.
            debug: Stored on the resulting client as ``debug``.

        Returns:
            A new instance of ``cls`` whose ``workflow_listener`` is ``None``.

        Raises:
            ValueError: If ``config.tls_config`` or ``config.host_port`` is
                ``None``.
        """
        _ensure_event_loop()

        if config is None:
            # Built per call so clients never share one default config object.
            config = ClientConfig()

        if config.tls_config is None:
            raise ValueError("TLS config is required")

        if config.host_port is None:
            raise ValueError("Host and port are required")

        conn: grpc.Channel = new_conn(config)

        # Instantiate clients
        event_client = new_event(conn, config)
        admin_client = new_admin(config)
        dispatcher_client = new_dispatcher(config)
        rest_client = RestApi(config.server_url, config.token, config.tenant_id)
        workflow_listener = None  # Initialize this if needed

        return cls(
            event_client,
            admin_client,
            dispatcher_client,
            workflow_listener,
            rest_client,
            config,
            debug,
        )

    def __init__(
        self,
        event_client: EventClient,
        admin_client: AdminClient,
        dispatcher_client: DispatcherClient,
        workflow_listener: Optional[PooledWorkflowRunListener],
        rest_client: RestApi,
        config: ClientConfig,
        debug: bool = False,
    ):
        """Store the given sub-clients and create the run event listener.

        Args:
            event_client: Exposed as ``self.event``.
            admin_client: Exposed as ``self.admin``.
            dispatcher_client: Exposed as ``self.dispatcher``.
            workflow_listener: Exposed as ``self.workflow_listener``; may be
                ``None``.
            rest_client: Exposed as ``self.rest``.
            config: Exposed as ``self.config``; also used to construct
                ``self.listener`` and to read ``logInterceptor``.
            debug: Exposed as ``self.debug``.
        """
        _ensure_event_loop()

        self.admin = admin_client
        self.dispatcher = dispatcher_client
        self.event = event_client
        self.rest = rest_client
        self.config = config
        self.listener = RunEventListenerClient(config)
        self.workflow_listener = workflow_listener
        self.logInterceptor = config.logInterceptor
        self.debug = debug


def with_host_port(host: str, port: int) -> Callable[["ClientConfig"], None]:
    """Return a config option that points the client at ``host:port``.

    The returned callable is meant to be passed as one of the
    ``opts_functions`` of ``Client.from_environment``.

    Args:
        host: Host name or address of the server.
        port: Port of the server.

    Returns:
        A function that mutates the given config in place and returns
        ``None``.
    """

    def with_host_port_impl(config: "ClientConfig") -> None:
        config.host = host
        config.port = port
        # ``Client.from_config`` validates ``config.host_port``, so the
        # combined address has to be updated for the override to be seen.
        config.host_port = f"{host}:{port}"

    return with_host_port_impl


# Module-level factory aliases: ``new_client`` builds a client from
# configuration loaded by ``Client.from_environment``; ``new_client_raw``
# builds one from an explicit ``ClientConfig`` via ``Client.from_config``.
new_client = Client.from_environment
new_client_raw = Client.from_config