diff options
Diffstat (limited to 'R2R/r2r/base/logging/log_processor.py')
-rwxr-xr-x | R2R/r2r/base/logging/log_processor.py | 196 |
1 files changed, 196 insertions, 0 deletions
diff --git a/R2R/r2r/base/logging/log_processor.py b/R2R/r2r/base/logging/log_processor.py new file mode 100755 index 00000000..e85d8de2 --- /dev/null +++ b/R2R/r2r/base/logging/log_processor.py @@ -0,0 +1,196 @@ +import contextlib +import json +import logging +import statistics +from collections import defaultdict +from typing import Any, Callable, Dict, List, Optional, Sequence + +from pydantic import BaseModel + +logger = logging.getLogger(__name__) + + +class FilterCriteria(BaseModel): + filters: Optional[dict[str, str]] = None + + +class LogProcessor: + timestamp_format = "%Y-%m-%d %H:%M:%S" + + def __init__(self, filters: Dict[str, Callable[[Dict[str, Any]], bool]]): + self.filters = filters + self.populations = {name: [] for name in filters} + + def process_log(self, log: Dict[str, Any]): + for name, filter_func in self.filters.items(): + if filter_func(log): + self.populations[name].append(log) + + +class StatisticsCalculator: + @staticmethod + def calculate_statistics( + population: List[Dict[str, Any]], + stat_functions: Dict[str, Callable[[List[Dict[str, Any]]], Any]], + ) -> Dict[str, Any]: + return { + name: func(population) for name, func in stat_functions.items() + } + + +class DistributionGenerator: + @staticmethod + def generate_distributions( + population: List[Dict[str, Any]], + dist_functions: Dict[str, Callable[[List[Dict[str, Any]]], Any]], + ) -> Dict[str, Any]: + return { + name: func(population) for name, func in dist_functions.items() + } + + +class VisualizationPreparer: + @staticmethod + def prepare_visualization_data( + data: Dict[str, Any], + vis_functions: Dict[str, Callable[[Dict[str, Any]], Any]], + ) -> Dict[str, Any]: + return {name: func(data) for name, func in vis_functions.items()} + + +class LogAnalyticsConfig: + def __init__(self, filters, stat_functions, dist_functions, vis_functions): + self.filters = filters + self.stat_functions = stat_functions + self.dist_functions = dist_functions + self.vis_functions = vis_functions + + +class AnalysisTypes(BaseModel): + analysis_types: Optional[dict[str, Sequence[str]]] = None + + @staticmethod + def generate_bar_chart_data(logs, key): + chart_data = {"labels": [], "datasets": []} + value_counts = defaultdict(int) + + for log in logs: + if "entries" in log: + for entry in log["entries"]: + if entry["key"] == key: + value_counts[entry["value"]] += 1 + elif "key" in log and log["key"] == key: + value_counts[log["value"]] += 1 + + for value, count in value_counts.items(): + chart_data["labels"].append(value) + chart_data["datasets"].append({"label": key, "data": [count]}) + + return chart_data + + @staticmethod + def calculate_basic_statistics(logs, key): + values = [] + for log in logs: + if log["key"] == "search_results": + results = json.loads(log["value"]) + scores = [ + float(json.loads(result)["score"]) for result in results + ] + values.extend(scores) + else: + value = log.get("value") + if value is not None: + with contextlib.suppress(ValueError): + values.append(float(value)) + + if not values: + return { + "Mean": None, + "Median": None, + "Mode": None, + "Standard Deviation": None, + "Variance": None, + } + + if len(values) == 1: + single_value = round(values[0], 3) + return { + "Mean": single_value, + "Median": single_value, + "Mode": single_value, + "Standard Deviation": 0, + "Variance": 0, + } + + mean = round(sum(values) / len(values), 3) + median = round(statistics.median(values), 3) + mode = ( + round(statistics.mode(values), 3) + if len(set(values)) != len(values) + else None + ) + std_dev = round(statistics.stdev(values) if len(values) > 1 else 0, 3) + variance = round( + statistics.variance(values) if len(values) > 1 else 0, 3 + ) + + return { + "Mean": mean, + "Median": median, + "Mode": mode, + "Standard Deviation": std_dev, + "Variance": variance, + } + + @staticmethod + def calculate_percentile(logs, key, percentile): + values = [] + for log in logs: + if log["key"] == key: + value = log.get("value") + if value is not None: + with contextlib.suppress(ValueError): + values.append(float(value)) + + if not values: + return {"percentile": percentile, "value": None} + + values.sort() + index = int((percentile / 100) * (len(values) - 1)) + return {"percentile": percentile, "value": round(values[index], 3)} + + +class LogAnalytics: + def __init__(self, logs: List[Dict[str, Any]], config: LogAnalyticsConfig): + self.logs = logs + self.log_processor = LogProcessor(config.filters) + self.statistics_calculator = StatisticsCalculator() + self.distribution_generator = DistributionGenerator() + self.visualization_preparer = VisualizationPreparer() + self.config = config + + def count_logs(self) -> Dict[str, Any]: + """Count the logs for each filter.""" + return { + name: len(population) + for name, population in self.log_processor.populations.items() + } + + def process_logs(self) -> Dict[str, Any]: + for log in self.logs: + self.log_processor.process_log(log) + + analytics = {} + for name, population in self.log_processor.populations.items(): + stats = self.statistics_calculator.calculate_statistics( + population, self.config.stat_functions + ) + dists = self.distribution_generator.generate_distributions( + population, self.config.dist_functions + ) + analytics[name] = {"statistics": stats, "distributions": dists} + + return self.visualization_preparer.prepare_visualization_data( + analytics, self.config.vis_functions + ) |