diff options
Diffstat (limited to 'R2R/r2r/telemetry')
-rwxr-xr-x | R2R/r2r/telemetry/__init__.py | 0 | ||||
-rwxr-xr-x | R2R/r2r/telemetry/events.py | 59 | ||||
-rwxr-xr-x | R2R/r2r/telemetry/posthog.py | 58 | ||||
-rwxr-xr-x | R2R/r2r/telemetry/telemetry_decorator.py | 56 |
4 files changed, 173 insertions, 0 deletions
diff --git a/R2R/r2r/telemetry/__init__.py b/R2R/r2r/telemetry/__init__.py new file mode 100755 index 00000000..e69de29b --- /dev/null +++ b/R2R/r2r/telemetry/__init__.py diff --git a/R2R/r2r/telemetry/events.py b/R2R/r2r/telemetry/events.py new file mode 100755 index 00000000..5bd7528b --- /dev/null +++ b/R2R/r2r/telemetry/events.py @@ -0,0 +1,59 @@ +import uuid +from typing import Any, Dict + + +class BaseTelemetryEvent: + def __init__(self, event_type: str, properties: Dict[str, Any]): + self.event_type = event_type + self.properties = properties + self.event_id = str(uuid.uuid4()) + + +class DailyActiveUserEvent(BaseTelemetryEvent): + def __init__(self, user_id: str): + super().__init__("DailyActiveUser", {"user_id": user_id}) + + +class FeatureUsageEvent(BaseTelemetryEvent): + def __init__(self, user_id: str, feature: str): + super().__init__( + "FeatureUsage", {"user_id": user_id, "feature": feature} + ) + + +class ErrorEvent(BaseTelemetryEvent): + def __init__(self, user_id: str, endpoint: str, error_message: str): + super().__init__( + "Error", + { + "user_id": user_id, + "endpoint": endpoint, + "error_message": error_message, + }, + ) + + +class RequestLatencyEvent(BaseTelemetryEvent): + def __init__(self, endpoint: str, latency: float): + super().__init__( + "RequestLatency", {"endpoint": endpoint, "latency": latency} + ) + + +class GeographicDistributionEvent(BaseTelemetryEvent): + def __init__(self, user_id: str, country: str): + super().__init__( + "GeographicDistribution", {"user_id": user_id, "country": country} + ) + + +class SessionDurationEvent(BaseTelemetryEvent): + def __init__(self, user_id: str, duration: float): + super().__init__( + "SessionDuration", {"user_id": user_id, "duration": duration} + ) + + +class UserPathEvent(BaseTelemetryEvent): + def __init__(self, user_id: str, path: str): + super().__init__("UserPath", {"user_id": user_id, "path": path}) diff --git a/R2R/r2r/telemetry/posthog.py b/R2R/r2r/telemetry/posthog.py new file mode 100755 index 00000000..64e63895 --- /dev/null +++ b/R2R/r2r/telemetry/posthog.py @@ -0,0 +1,58 @@ +import logging +import os + +import posthog + +from r2r.telemetry.events import BaseTelemetryEvent + +logger = logging.getLogger(__name__) + + +class PosthogClient: + """ + This is a write-only project API key, so it can only create new events. It can't + read events or any of your other data stored with PostHog, so it's safe to use in public apps. + """ + + def __init__( + self, api_key: str, enabled: bool = True, debug: bool = False + ): + self.enabled = enabled + self.debug = debug + + if self.enabled: + logger.info( + "Initializing anonymized telemetry. To disable, set TELEMETRY_ENABLED=false in your environment." + ) + posthog.project_api_key = api_key + posthog.disable_geoip = False + else: + posthog.disabled = True + + if self.debug: + posthog.debug = True + + logger.info( + f"Posthog telemetry {'enabled' if self.enabled else 'disabled'}, debug mode {'on' if self.debug else 'off'}" + ) + + def capture(self, event: BaseTelemetryEvent): + if self.enabled: + posthog.capture(event.event_id, event.event_type, event.properties) + + +# Initialize the telemetry client with a flag to enable or disable telemetry +telemetry_enabled = os.getenv("TELEMETRY_ENABLED", "true").lower() in ( + "true", + "1", + "t", +) +debug_mode = os.getenv("DEBUG_MODE", "false").lower() in ( + "true", + "1", + "t", +) +telemetry_client = PosthogClient( + api_key="phc_OPBbibOIErCGc4NDLQsOrMuYFTKDmRwXX6qxnTr6zpU", + enabled=telemetry_enabled, +) diff --git a/R2R/r2r/telemetry/telemetry_decorator.py b/R2R/r2r/telemetry/telemetry_decorator.py new file mode 100755 index 00000000..2938a83e --- /dev/null +++ b/R2R/r2r/telemetry/telemetry_decorator.py @@ -0,0 +1,56 @@ +import asyncio +import logging +from functools import wraps + +from r2r.telemetry.events import ErrorEvent, FeatureUsageEvent +from r2r.telemetry.posthog import telemetry_client + +logger = logging.getLogger(__name__) + + +def telemetry_event(event_name): + def decorator(func): + @wraps(func) + async def async_wrapper(*args, **kwargs): + user_id = kwargs.get("user_id", "unknown_user") + try: + result = await func(*args, **kwargs) + try: + telemetry_client.capture( + FeatureUsageEvent(user_id=user_id, feature=event_name) + ) + except Exception as e: + logger.error(f"Error in telemetry event logging: {str(e)}") + return result + except Exception as e: + try: + telemetry_client.capture( + ErrorEvent( + user_id=user_id, + endpoint=event_name, + error_message=str(e), + ) + ) + except Exception as e: + logger.error(f"Error in telemetry event logging: {str(e)}") + + raise + + @wraps(func) + def sync_wrapper(*args, **kwargs): + loop = asyncio.get_event_loop() + if loop.is_running(): + future = asyncio.run_coroutine_threadsafe( + async_wrapper(*args, **kwargs), loop + ) + return future.result() + else: + return loop.run_until_complete(async_wrapper(*args, **kwargs)) + + return ( + async_wrapper + if asyncio.iscoroutinefunction(func) + else sync_wrapper + ) + + return decorator |