about summary refs log tree commit diff
import contextlib
import json
import logging
import statistics
from collections import defaultdict
from typing import Any, Callable, Dict, List, Optional, Sequence

from pydantic import BaseModel

logger = logging.getLogger(__name__)


class FilterCriteria(BaseModel):
    filters: Optional[dict[str, str]] = None


class LogProcessor:
    timestamp_format = "%Y-%m-%d %H:%M:%S"

    def __init__(self, filters: Dict[str, Callable[[Dict[str, Any]], bool]]):
        self.filters = filters
        self.populations = {name: [] for name in filters}

    def process_log(self, log: Dict[str, Any]):
        for name, filter_func in self.filters.items():
            if filter_func(log):
                self.populations[name].append(log)


class StatisticsCalculator:
    @staticmethod
    def calculate_statistics(
        population: List[Dict[str, Any]],
        stat_functions: Dict[str, Callable[[List[Dict[str, Any]]], Any]],
    ) -> Dict[str, Any]:
        return {
            name: func(population) for name, func in stat_functions.items()
        }


class DistributionGenerator:
    @staticmethod
    def generate_distributions(
        population: List[Dict[str, Any]],
        dist_functions: Dict[str, Callable[[List[Dict[str, Any]]], Any]],
    ) -> Dict[str, Any]:
        return {
            name: func(population) for name, func in dist_functions.items()
        }


class VisualizationPreparer:
    @staticmethod
    def prepare_visualization_data(
        data: Dict[str, Any],
        vis_functions: Dict[str, Callable[[Dict[str, Any]], Any]],
    ) -> Dict[str, Any]:
        return {name: func(data) for name, func in vis_functions.items()}


class LogAnalyticsConfig:
    def __init__(self, filters, stat_functions, dist_functions, vis_functions):
        self.filters = filters
        self.stat_functions = stat_functions
        self.dist_functions = dist_functions
        self.vis_functions = vis_functions


class AnalysisTypes(BaseModel):
    analysis_types: Optional[dict[str, Sequence[str]]] = None

    @staticmethod
    def generate_bar_chart_data(logs, key):
        chart_data = {"labels": [], "datasets": []}
        value_counts = defaultdict(int)

        for log in logs:
            if "entries" in log:
                for entry in log["entries"]:
                    if entry["key"] == key:
                        value_counts[entry["value"]] += 1
            elif "key" in log and log["key"] == key:
                value_counts[log["value"]] += 1

        for value, count in value_counts.items():
            chart_data["labels"].append(value)
            chart_data["datasets"].append({"label": key, "data": [count]})

        return chart_data

    @staticmethod
    def calculate_basic_statistics(logs, key):
        values = []
        for log in logs:
            if log["key"] == "search_results":
                results = json.loads(log["value"])
                scores = [
                    float(json.loads(result)["score"]) for result in results
                ]
                values.extend(scores)
            else:
                value = log.get("value")
                if value is not None:
                    with contextlib.suppress(ValueError):
                        values.append(float(value))

        if not values:
            return {
                "Mean": None,
                "Median": None,
                "Mode": None,
                "Standard Deviation": None,
                "Variance": None,
            }

        if len(values) == 1:
            single_value = round(values[0], 3)
            return {
                "Mean": single_value,
                "Median": single_value,
                "Mode": single_value,
                "Standard Deviation": 0,
                "Variance": 0,
            }

        mean = round(sum(values) / len(values), 3)
        median = round(statistics.median(values), 3)
        mode = (
            round(statistics.mode(values), 3)
            if len(set(values)) != len(values)
            else None
        )
        std_dev = round(statistics.stdev(values) if len(values) > 1 else 0, 3)
        variance = round(
            statistics.variance(values) if len(values) > 1 else 0, 3
        )

        return {
            "Mean": mean,
            "Median": median,
            "Mode": mode,
            "Standard Deviation": std_dev,
            "Variance": variance,
        }

    @staticmethod
    def calculate_percentile(logs, key, percentile):
        values = []
        for log in logs:
            if log["key"] == key:
                value = log.get("value")
                if value is not None:
                    with contextlib.suppress(ValueError):
                        values.append(float(value))

        if not values:
            return {"percentile": percentile, "value": None}

        values.sort()
        index = int((percentile / 100) * (len(values) - 1))
        return {"percentile": percentile, "value": round(values[index], 3)}


class LogAnalytics:
    def __init__(self, logs: List[Dict[str, Any]], config: LogAnalyticsConfig):
        self.logs = logs
        self.log_processor = LogProcessor(config.filters)
        self.statistics_calculator = StatisticsCalculator()
        self.distribution_generator = DistributionGenerator()
        self.visualization_preparer = VisualizationPreparer()
        self.config = config

    def count_logs(self) -> Dict[str, Any]:
        """Count the logs for each filter."""
        return {
            name: len(population)
            for name, population in self.log_processor.populations.items()
        }

    def process_logs(self) -> Dict[str, Any]:
        for log in self.logs:
            self.log_processor.process_log(log)

        analytics = {}
        for name, population in self.log_processor.populations.items():
            stats = self.statistics_calculator.calculate_statistics(
                population, self.config.stat_functions
            )
            dists = self.distribution_generator.generate_distributions(
                population, self.config.dist_functions
            )
            analytics[name] = {"statistics": stats, "distributions": dists}

        return self.visualization_preparer.prepare_visualization_data(
            analytics, self.config.vis_functions
        )