about summary refs log tree commit diff
path: root/R2R/r2r/base/logging/log_processor.py
diff options
context:
space:
mode:
Diffstat (limited to 'R2R/r2r/base/logging/log_processor.py')
-rwxr-xr-xR2R/r2r/base/logging/log_processor.py196
1 files changed, 196 insertions, 0 deletions
diff --git a/R2R/r2r/base/logging/log_processor.py b/R2R/r2r/base/logging/log_processor.py
new file mode 100755
index 00000000..e85d8de2
--- /dev/null
+++ b/R2R/r2r/base/logging/log_processor.py
@@ -0,0 +1,196 @@
+import contextlib
+import json
+import logging
+import statistics
+from collections import defaultdict
+from typing import Any, Callable, Dict, List, Optional, Sequence
+
+from pydantic import BaseModel
+
+logger = logging.getLogger(__name__)
+
+
+class FilterCriteria(BaseModel):
+    filters: Optional[dict[str, str]] = None
+
+
+class LogProcessor:
+    timestamp_format = "%Y-%m-%d %H:%M:%S"
+
+    def __init__(self, filters: Dict[str, Callable[[Dict[str, Any]], bool]]):
+        self.filters = filters
+        self.populations = {name: [] for name in filters}
+
+    def process_log(self, log: Dict[str, Any]):
+        for name, filter_func in self.filters.items():
+            if filter_func(log):
+                self.populations[name].append(log)
+
+
+class StatisticsCalculator:
+    @staticmethod
+    def calculate_statistics(
+        population: List[Dict[str, Any]],
+        stat_functions: Dict[str, Callable[[List[Dict[str, Any]]], Any]],
+    ) -> Dict[str, Any]:
+        return {
+            name: func(population) for name, func in stat_functions.items()
+        }
+
+
+class DistributionGenerator:
+    @staticmethod
+    def generate_distributions(
+        population: List[Dict[str, Any]],
+        dist_functions: Dict[str, Callable[[List[Dict[str, Any]]], Any]],
+    ) -> Dict[str, Any]:
+        return {
+            name: func(population) for name, func in dist_functions.items()
+        }
+
+
+class VisualizationPreparer:
+    @staticmethod
+    def prepare_visualization_data(
+        data: Dict[str, Any],
+        vis_functions: Dict[str, Callable[[Dict[str, Any]], Any]],
+    ) -> Dict[str, Any]:
+        return {name: func(data) for name, func in vis_functions.items()}
+
+
+class LogAnalyticsConfig:
+    def __init__(self, filters, stat_functions, dist_functions, vis_functions):
+        self.filters = filters
+        self.stat_functions = stat_functions
+        self.dist_functions = dist_functions
+        self.vis_functions = vis_functions
+
+
+class AnalysisTypes(BaseModel):
+    analysis_types: Optional[dict[str, Sequence[str]]] = None
+
+    @staticmethod
+    def generate_bar_chart_data(logs, key):
+        chart_data = {"labels": [], "datasets": []}
+        value_counts = defaultdict(int)
+
+        for log in logs:
+            if "entries" in log:
+                for entry in log["entries"]:
+                    if entry["key"] == key:
+                        value_counts[entry["value"]] += 1
+            elif "key" in log and log["key"] == key:
+                value_counts[log["value"]] += 1
+
+        for value, count in value_counts.items():
+            chart_data["labels"].append(value)
+            chart_data["datasets"].append({"label": key, "data": [count]})
+
+        return chart_data
+
+    @staticmethod
+    def calculate_basic_statistics(logs, key):
+        values = []
+        for log in logs:
+            if log["key"] == "search_results":
+                results = json.loads(log["value"])
+                scores = [
+                    float(json.loads(result)["score"]) for result in results
+                ]
+                values.extend(scores)
+            else:
+                value = log.get("value")
+                if value is not None:
+                    with contextlib.suppress(ValueError):
+                        values.append(float(value))
+
+        if not values:
+            return {
+                "Mean": None,
+                "Median": None,
+                "Mode": None,
+                "Standard Deviation": None,
+                "Variance": None,
+            }
+
+        if len(values) == 1:
+            single_value = round(values[0], 3)
+            return {
+                "Mean": single_value,
+                "Median": single_value,
+                "Mode": single_value,
+                "Standard Deviation": 0,
+                "Variance": 0,
+            }
+
+        mean = round(sum(values) / len(values), 3)
+        median = round(statistics.median(values), 3)
+        mode = (
+            round(statistics.mode(values), 3)
+            if len(set(values)) != len(values)
+            else None
+        )
+        std_dev = round(statistics.stdev(values) if len(values) > 1 else 0, 3)
+        variance = round(
+            statistics.variance(values) if len(values) > 1 else 0, 3
+        )
+
+        return {
+            "Mean": mean,
+            "Median": median,
+            "Mode": mode,
+            "Standard Deviation": std_dev,
+            "Variance": variance,
+        }
+
+    @staticmethod
+    def calculate_percentile(logs, key, percentile):
+        values = []
+        for log in logs:
+            if log["key"] == key:
+                value = log.get("value")
+                if value is not None:
+                    with contextlib.suppress(ValueError):
+                        values.append(float(value))
+
+        if not values:
+            return {"percentile": percentile, "value": None}
+
+        values.sort()
+        index = int((percentile / 100) * (len(values) - 1))
+        return {"percentile": percentile, "value": round(values[index], 3)}
+
+
+class LogAnalytics:
+    def __init__(self, logs: List[Dict[str, Any]], config: LogAnalyticsConfig):
+        self.logs = logs
+        self.log_processor = LogProcessor(config.filters)
+        self.statistics_calculator = StatisticsCalculator()
+        self.distribution_generator = DistributionGenerator()
+        self.visualization_preparer = VisualizationPreparer()
+        self.config = config
+
+    def count_logs(self) -> Dict[str, Any]:
+        """Count the logs for each filter."""
+        return {
+            name: len(population)
+            for name, population in self.log_processor.populations.items()
+        }
+
+    def process_logs(self) -> Dict[str, Any]:
+        for log in self.logs:
+            self.log_processor.process_log(log)
+
+        analytics = {}
+        for name, population in self.log_processor.populations.items():
+            stats = self.statistics_calculator.calculate_statistics(
+                population, self.config.stat_functions
+            )
+            dists = self.distribution_generator.generate_distributions(
+                population, self.config.dist_functions
+            )
+            analytics[name] = {"statistics": stats, "distributions": dists}
+
+        return self.visualization_preparer.prepare_visualization_data(
+            analytics, self.config.vis_functions
+        )