aboutsummaryrefslogtreecommitdiff
path: root/R2R/r2r/base/logging/run_manager.py
diff options
context:
space:
mode:
Diffstat (limited to 'R2R/r2r/base/logging/run_manager.py')
-rwxr-xr-xR2R/r2r/base/logging/run_manager.py56
1 files changed, 56 insertions, 0 deletions
diff --git a/R2R/r2r/base/logging/run_manager.py b/R2R/r2r/base/logging/run_manager.py
new file mode 100755
index 00000000..ac192bca
--- /dev/null
+++ b/R2R/r2r/base/logging/run_manager.py
@@ -0,0 +1,56 @@
+import contextvars
+import uuid
+from contextlib import asynccontextmanager
+from typing import Any
+
+from .kv_logger import KVLoggingSingleton
+
+run_id_var = contextvars.ContextVar("run_id", default=None)
+
+
+class RunManager:
+ def __init__(self, logger: KVLoggingSingleton):
+ self.logger = logger
+ self.run_info = {}
+
+ def generate_run_id(self) -> uuid.UUID:
+ return uuid.uuid4()
+
+ async def set_run_info(self, pipeline_type: str):
+ run_id = run_id_var.get()
+ if run_id is None:
+ run_id = self.generate_run_id()
+ token = run_id_var.set(run_id)
+ self.run_info[run_id] = {"pipeline_type": pipeline_type}
+ else:
+ token = run_id_var.set(run_id)
+ return run_id, token
+
+ async def get_run_info(self):
+ run_id = run_id_var.get()
+ return self.run_info.get(run_id, None)
+
+ async def log_run_info(
+ self, key: str, value: Any, is_info_log: bool = False
+ ):
+ run_id = run_id_var.get()
+ if run_id:
+ await self.logger.log(
+ log_id=run_id, key=key, value=value, is_info_log=is_info_log
+ )
+
+ async def clear_run_info(self, token: contextvars.Token):
+ run_id = run_id_var.get()
+ run_id_var.reset(token)
+ if run_id and run_id in self.run_info:
+ del self.run_info[run_id]
+
+
+@asynccontextmanager
+async def manage_run(run_manager: RunManager, pipeline_type: str):
+ run_id, token = await run_manager.set_run_info(pipeline_type)
+ try:
+ yield run_id
+ finally:
+ # Note: Do not clear the run info to ensure the run ID remains the same
+ run_id_var.reset(token)