1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
|
import contextvars
import uuid
from contextlib import asynccontextmanager
from typing import Any
from .kv_logger import KVLoggingSingleton
run_id_var = contextvars.ContextVar("run_id", default=None)
class RunManager:
def __init__(self, logger: KVLoggingSingleton):
self.logger = logger
self.run_info = {}
def generate_run_id(self) -> uuid.UUID:
return uuid.uuid4()
async def set_run_info(self, pipeline_type: str):
run_id = run_id_var.get()
if run_id is None:
run_id = self.generate_run_id()
token = run_id_var.set(run_id)
self.run_info[run_id] = {"pipeline_type": pipeline_type}
else:
token = run_id_var.set(run_id)
return run_id, token
async def get_run_info(self):
run_id = run_id_var.get()
return self.run_info.get(run_id, None)
async def log_run_info(
self, key: str, value: Any, is_info_log: bool = False
):
run_id = run_id_var.get()
if run_id:
await self.logger.log(
log_id=run_id, key=key, value=value, is_info_log=is_info_log
)
async def clear_run_info(self, token: contextvars.Token):
run_id = run_id_var.get()
run_id_var.reset(token)
if run_id and run_id in self.run_info:
del self.run_info[run_id]
@asynccontextmanager
async def manage_run(run_manager: RunManager, pipeline_type: str):
run_id, token = await run_manager.set_run_info(pipeline_type)
try:
yield run_id
finally:
# Note: Do not clear the run info to ensure the run ID remains the same
run_id_var.reset(token)
|