diff options
Diffstat (limited to 'R2R/r2r/base/logging/run_manager.py')
-rwxr-xr-x | R2R/r2r/base/logging/run_manager.py | 56 |
1 files changed, 56 insertions, 0 deletions
diff --git a/R2R/r2r/base/logging/run_manager.py b/R2R/r2r/base/logging/run_manager.py new file mode 100755 index 00000000..ac192bca --- /dev/null +++ b/R2R/r2r/base/logging/run_manager.py @@ -0,0 +1,56 @@ +import contextvars +import uuid +from contextlib import asynccontextmanager +from typing import Any + +from .kv_logger import KVLoggingSingleton + +run_id_var = contextvars.ContextVar("run_id", default=None) + + +class RunManager: + def __init__(self, logger: KVLoggingSingleton): + self.logger = logger + self.run_info = {} + + def generate_run_id(self) -> uuid.UUID: + return uuid.uuid4() + + async def set_run_info(self, pipeline_type: str): + run_id = run_id_var.get() + if run_id is None: + run_id = self.generate_run_id() + token = run_id_var.set(run_id) + self.run_info[run_id] = {"pipeline_type": pipeline_type} + else: + token = run_id_var.set(run_id) + return run_id, token + + async def get_run_info(self): + run_id = run_id_var.get() + return self.run_info.get(run_id, None) + + async def log_run_info( + self, key: str, value: Any, is_info_log: bool = False + ): + run_id = run_id_var.get() + if run_id: + await self.logger.log( + log_id=run_id, key=key, value=value, is_info_log=is_info_log + ) + + async def clear_run_info(self, token: contextvars.Token): + run_id = run_id_var.get() + run_id_var.reset(token) + if run_id and run_id in self.run_info: + del self.run_info[run_id] + + +@asynccontextmanager +async def manage_run(run_manager: RunManager, pipeline_type: str): + run_id, token = await run_manager.set_run_info(pipeline_type) + try: + yield run_id + finally: + # Note: Do not clear the run info to ensure the run ID remains the same + run_id_var.reset(token) |