"""Helpers for filtering log records into populations and analysing them.

The module provides a predicate-based ``LogProcessor``, thin dispatchers that
apply caller-supplied statistic/distribution/visualization functions, a set of
ready-made analyses on ``AnalysisTypes`` and the ``LogAnalytics`` orchestrator.
"""

import contextlib
import json
import logging
import statistics
from collections import defaultdict
from typing import Any, Callable, Dict, List, Optional, Sequence

from pydantic import BaseModel

logger = logging.getLogger(__name__)


def _coerce_float(value: Any) -> Optional[float]:
    """Return ``value`` as a float, or ``None`` when it cannot be converted.

    ``float()`` raises ``ValueError`` for unparsable strings, ``TypeError``
    for unsupported types (lists, dicts, ...) and ``OverflowError`` for
    integers too large for a float; all three mean "not a usable number".
    """
    if value is not None:
        with contextlib.suppress(TypeError, ValueError, OverflowError):
            return float(value)
    return None


class FilterCriteria(BaseModel):
    """Model holding an optional string-to-string mapping of filters."""

    filters: Optional[dict[str, str]] = None


class LogProcessor:
    """Sort log records into named populations using predicate filters."""

    # Not referenced inside this module; presumably consumed by callers that
    # parse log timestamps -- verify before removing.
    timestamp_format = "%Y-%m-%d %H:%M:%S"

    def __init__(self, filters: Dict[str, Callable[[Dict[str, Any]], bool]]):
        """Store ``filters`` and create one empty population per filter name.

        Args:
            filters: Mapping of population name to a predicate that receives
                a log record and returns whether it belongs to the population.
        """
        self.filters = filters
        self.populations: Dict[str, List[Dict[str, Any]]] = {
            name: [] for name in filters
        }

    def reset(self) -> None:
        """Replace every population with a fresh empty list.

        New list objects are created (rather than clearing in place) so that
        population lists handed out earlier are not mutated.
        """
        self.populations = {name: [] for name in self.filters}

    def process_log(self, log: Dict[str, Any]):
        """Append ``log`` to every population whose filter accepts it."""
        for name, filter_func in self.filters.items():
            if filter_func(log):
                self.populations[name].append(log)


class StatisticsCalculator:
    """Apply a set of named statistic functions to a population."""

    @staticmethod
    def calculate_statistics(
        population: List[Dict[str, Any]],
        stat_functions: Dict[str, Callable[[List[Dict[str, Any]]], Any]],
    ) -> Dict[str, Any]:
        """Return ``{name: func(population)}`` for every statistic function."""
        return {
            name: func(population) for name, func in stat_functions.items()
        }


class DistributionGenerator:
    """Apply a set of named distribution functions to a population."""

    @staticmethod
    def generate_distributions(
        population: List[Dict[str, Any]],
        dist_functions: Dict[str, Callable[[List[Dict[str, Any]]], Any]],
    ) -> Dict[str, Any]:
        """Return ``{name: func(population)}`` for every distribution function."""
        return {
            name: func(population) for name, func in dist_functions.items()
        }


class VisualizationPreparer:
    """Apply a set of named visualization functions to analytics data."""

    @staticmethod
    def prepare_visualization_data(
        data: Dict[str, Any],
        vis_functions: Dict[str, Callable[[Dict[str, Any]], Any]],
    ) -> Dict[str, Any]:
        """Return ``{name: func(data)}`` for every visualization function."""
        return {name: func(data) for name, func in vis_functions.items()}


class LogAnalyticsConfig:
    """Bundle of the function tables consumed by ``LogAnalytics``."""

    def __init__(self, filters, stat_functions, dist_functions, vis_functions):
        """Store the four function mappings unchanged.

        Args:
            filters: Population name -> predicate over a log record.
            stat_functions: Statistic name -> function over a population.
            dist_functions: Distribution name -> function over a population.
            vis_functions: Output name -> function over the analytics dict.
        """
        self.filters = filters
        self.stat_functions = stat_functions
        self.dist_functions = dist_functions
        self.vis_functions = vis_functions


class AnalysisTypes(BaseModel):
    """Model of requested analyses plus ready-made analysis helpers."""

    analysis_types: Optional[dict[str, Sequence[str]]] = None

    @staticmethod
    def generate_bar_chart_data(logs, key):
        """Count the values recorded under ``key`` and shape them for a chart.

        A log either carries an ``"entries"`` list of ``{"key", "value"}``
        records, or is itself such a record. Logs with ``"entries"`` are only
        inspected through their entries.

        Args:
            logs: Iterable of log dicts.
            key: Key whose values are counted; also used as dataset label.

        Returns:
            ``{"labels": [...], "datasets": [...]}`` with one label and one
            ``{"label": key, "data": [count]}`` dataset per distinct value,
            in first-seen order.

        Raises:
            KeyError: If an entry lacks ``"key"``, or a matching record
                lacks ``"value"``.
        """
        value_counts = defaultdict(int)
        for log in logs:
            if "entries" in log:
                for entry in log["entries"]:
                    if entry["key"] == key:
                        value_counts[entry["value"]] += 1
            elif "key" in log and log["key"] == key:
                value_counts[log["value"]] += 1

        chart_data = {"labels": [], "datasets": []}
        for value, count in value_counts.items():
            chart_data["labels"].append(value)
            chart_data["datasets"].append({"label": key, "data": [count]})
        return chart_data

    @staticmethod
    def calculate_basic_statistics(logs, key):
        """Compute mean, median, mode, standard deviation and variance.

        Logs whose ``"key"`` is ``"search_results"`` contribute every score of
        their value, which is decoded as a JSON array of JSON-encoded objects
        each holding a ``"score"``. Any other log contributes its ``"value"``
        when that converts to a float; unconvertible or missing values are
        skipped.

        Args:
            logs: Iterable of log dicts.
            key: Accepted for interface compatibility. NOTE(review): it is
                not used to filter ``logs`` -- every log passed in is
                considered; confirm whether callers pre-filter.

        Returns:
            Dict with keys ``"Mean"``, ``"Median"``, ``"Mode"``,
            ``"Standard Deviation"`` and ``"Variance"``, rounded to three
            decimals. All are ``None`` when no numeric value was found.
            ``"Mode"`` is ``None`` when every value is distinct.

        Raises:
            KeyError, ValueError, TypeError: If a ``"search_results"`` log is
                malformed (missing value/score or invalid JSON).
        """
        values = []
        for log in logs:
            # .get(): records without a "key" field are treated as plain
            # value records instead of aborting the whole computation.
            if log.get("key") == "search_results":
                results = json.loads(log["value"])
                scores = [
                    float(json.loads(result)["score"]) for result in results
                ]
                values.extend(scores)
            else:
                number = _coerce_float(log.get("value"))
                if number is not None:
                    values.append(number)

        if not values:
            return {
                "Mean": None,
                "Median": None,
                "Mode": None,
                "Standard Deviation": None,
                "Variance": None,
            }

        if len(values) == 1:
            # stdev/variance need at least two samples; report zero spread.
            single_value = round(values[0], 3)
            return {
                "Mean": single_value,
                "Median": single_value,
                "Mode": single_value,
                "Standard Deviation": 0,
                "Variance": 0,
            }

        # From here on there are at least two values.
        has_repeats = len(set(values)) != len(values)
        return {
            "Mean": round(sum(values) / len(values), 3),
            "Median": round(statistics.median(values), 3),
            # A mode is only meaningful when some value occurs more than once.
            "Mode": round(statistics.mode(values), 3) if has_repeats else None,
            "Standard Deviation": round(statistics.stdev(values), 3),
            "Variance": round(statistics.variance(values), 3),
        }

    @staticmethod
    def calculate_percentile(logs, key, percentile):
        """Return the value at ``percentile`` among numeric values of ``key``.

        The percentile is located by truncating
        ``percentile / 100 * (count - 1)`` to an index into the sorted values
        (no interpolation).

        Args:
            logs: Iterable of log dicts; records without a ``"key"`` field or
                with a non-numeric ``"value"`` are skipped.
            key: Only logs whose ``"key"`` equals this are considered.
            percentile: Requested percentile, nominally in ``[0, 100]``.
                Requests outside that range saturate at the smallest or
                largest value.

        Returns:
            ``{"percentile": percentile, "value": v}`` where ``v`` is rounded
            to three decimals, or ``None`` when no numeric value matched.
        """
        values = []
        for log in logs:
            if "key" in log and log["key"] == key:
                number = _coerce_float(log.get("value"))
                if number is not None:
                    values.append(number)

        if not values:
            return {"percentile": percentile, "value": None}

        values.sort()
        last = len(values) - 1
        index = int((percentile / 100) * last)
        # Clamp so out-of-range percentiles neither raise IndexError nor wrap
        # around through negative indexing.
        index = min(max(index, 0), last)
        return {"percentile": percentile, "value": round(values[index], 3)}


class LogAnalytics:
    """Run the configured filters and analyses over a list of logs."""

    def __init__(self, logs: List[Dict[str, Any]], config: LogAnalyticsConfig):
        """Keep ``logs`` and ``config`` and build the helper objects.

        Args:
            logs: Log records to analyse.
            config: Function tables for filtering and analysis.
        """
        self.logs = logs
        self.log_processor = LogProcessor(config.filters)
        self.statistics_calculator = StatisticsCalculator()
        self.distribution_generator = DistributionGenerator()
        self.visualization_preparer = VisualizationPreparer()
        self.config = config

    def count_logs(self) -> Dict[str, Any]:
        """Count the logs for each filter.

        Counts reflect the populations built by the most recent
        ``process_logs`` call; before that call they are all zero.
        """
        return {
            name: len(population)
            for name, population in self.log_processor.populations.items()
        }

    def process_logs(self) -> Dict[str, Any]:
        """Filter all logs, analyse each population and build the output.

        Populations are rebuilt from scratch on every call, so calling this
        method repeatedly does not count the same logs more than once.

        Returns:
            The result of applying every configured visualization function
            to ``{population: {"statistics": ..., "distributions": ...}}``.
        """
        self.log_processor.reset()
        for log in self.logs:
            self.log_processor.process_log(log)

        analytics = {}
        for name, population in self.log_processor.populations.items():
            stats = self.statistics_calculator.calculate_statistics(
                population, self.config.stat_functions
            )
            dists = self.distribution_generator.generate_distributions(
                population, self.config.dist_functions
            )
            analytics[name] = {"statistics": stats, "distributions": dists}

        return self.visualization_preparer.prepare_visualization_data(
            analytics, self.config.vis_functions
        )