aboutsummaryrefslogtreecommitdiff
path: root/R2R/r2r/base/logging/log_processor.py
blob: e85d8de2e3d671393036cba84d7b39685d508296 (about) (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
import contextlib
import json
import logging
import statistics
from collections import defaultdict
from typing import Any, Callable, Dict, List, Optional, Sequence

from pydantic import BaseModel

logger = logging.getLogger(__name__)


class FilterCriteria(BaseModel):
    """Pydantic model carrying an optional mapping of string filter entries."""

    # Optional string-to-string mapping; None when no filters are supplied.
    # NOTE(review): presumably maps a filter name to the value it should
    # match -- confirm against the callers that build this model.
    filters: Optional[dict[str, str]] = None


class LogProcessor:
    """Sort log records into named populations using predicate filters.

    Each filter name owns one list in ``populations``. A record is appended
    to the list of every filter whose predicate returns a truthy value for
    it, so a single record may land in several populations.
    """

    timestamp_format = "%Y-%m-%d %H:%M:%S"

    def __init__(self, filters: Dict[str, Callable[[Dict[str, Any]], bool]]):
        """Store ``filters`` and create one empty population per filter name."""
        self.filters = filters
        # A fresh list per name: the populations must not share storage.
        self.populations = dict((label, []) for label in filters)

    def process_log(self, log: Dict[str, Any]):
        """Append ``log`` to the population of every filter that accepts it."""
        for label, predicate in self.filters.items():
            if not predicate(log):
                continue
            self.populations[label].append(log)


class StatisticsCalculator:
    """Apply a set of named statistic functions to one population of logs."""

    @staticmethod
    def calculate_statistics(
        population: List[Dict[str, Any]],
        stat_functions: Dict[str, Callable[[List[Dict[str, Any]]], Any]],
    ) -> Dict[str, Any]:
        """Return a dict mapping each name to ``function(population)``.

        Functions are called once each, in the iteration order of
        ``stat_functions``; the whole population is passed to every one.
        """
        results: Dict[str, Any] = {}
        for label, compute in stat_functions.items():
            results[label] = compute(population)
        return results


class DistributionGenerator:
    """Apply a set of named distribution functions to one population of logs."""

    @staticmethod
    def generate_distributions(
        population: List[Dict[str, Any]],
        dist_functions: Dict[str, Callable[[List[Dict[str, Any]]], Any]],
    ) -> Dict[str, Any]:
        """Return a dict mapping each name to ``function(population)``.

        Every function in ``dist_functions`` receives the complete
        population and its result is stored under the function's name.
        """
        named_results = (
            (label, build(population))
            for label, build in dist_functions.items()
        )
        return dict(named_results)


class VisualizationPreparer:
    """Apply a set of named visualization functions to analytics data."""

    @staticmethod
    def prepare_visualization_data(
        data: Dict[str, Any],
        vis_functions: Dict[str, Callable[[Dict[str, Any]], Any]],
    ) -> Dict[str, Any]:
        """Return a dict mapping each name to ``function(data)``.

        ``data`` is handed unchanged to every function in ``vis_functions``.
        """
        prepared: Dict[str, Any] = {}
        for label, render in vis_functions.items():
            prepared[label] = render(data)
        return prepared


class LogAnalyticsConfig:
    """Plain container for the four function mappings of an analytics run.

    The arguments are stored on the instance exactly as given, without
    copying or validation.
    """

    def __init__(self, filters, stat_functions, dist_functions, vis_functions):
        """Keep each mapping under an attribute named after its parameter."""
        # Post-processing stages.
        self.vis_functions = vis_functions
        self.dist_functions = dist_functions
        self.stat_functions = stat_functions
        # Predicates that decide which population a log belongs to.
        self.filters = filters


class AnalysisTypes(BaseModel):
    """Requested analyses plus static helpers that compute them from logs.

    The helpers work on log records given as dictionaries that carry
    ``"key"`` and ``"value"`` items. ``generate_bar_chart_data`` additionally
    understands records that hold an ``"entries"`` list of such dictionaries.
    """

    # Optional mapping from a name to a sequence of strings.
    # NOTE(review): presumably the sequence holds the analysis kind followed
    # by its arguments -- confirm against the code that builds this model.
    analysis_types: Optional[dict[str, Sequence[str]]] = None

    @staticmethod
    def generate_bar_chart_data(logs, key):
        """Count how often each value recorded under ``key`` occurs.

        Args:
            logs: Iterable of log dictionaries. A record with an
                ``"entries"`` list is searched entry by entry; any other
                record is matched on its own ``"key"``/``"value"`` items.
            key: The key whose values are counted.

        Returns:
            ``{"labels": [...], "datasets": [...]}`` where ``labels`` lists
            the distinct values in first-seen order and ``datasets`` holds,
            in the same order, one ``{"label": key, "data": [count]}`` item
            per distinct value.

        Raises:
            KeyError: If a matching record or entry has no ``"value"``.
        """
        value_counts = defaultdict(int)

        for log in logs:
            if "entries" in log:
                for entry in log["entries"]:
                    # .get(): entries without a key are skipped, mirroring
                    # how key-less top-level records are treated below.
                    if entry.get("key") == key:
                        value_counts[entry["value"]] += 1
            elif "key" in log and log["key"] == key:
                value_counts[log["value"]] += 1

        return {
            "labels": list(value_counts),
            "datasets": [
                {"label": key, "data": [count]}
                for count in value_counts.values()
            ],
        }

    @staticmethod
    def calculate_basic_statistics(logs, key):
        """Compute mean, median, mode, standard deviation and variance.

        Every record in ``logs`` contributes: records whose key is
        ``"search_results"`` contribute one number per contained result
        score, all other records contribute their ``"value"`` when it can be
        converted to ``float``. Values that cannot be converted are skipped.

        Args:
            logs: Iterable of log dictionaries.
            key: Currently unused; kept so the signature matches the other
                analysis helpers. Callers are expected to pass logs that are
                already restricted to the records of interest.

        Returns:
            A dict with the keys ``"Mean"``, ``"Median"``, ``"Mode"``,
            ``"Standard Deviation"`` and ``"Variance"``, each rounded to
            three decimals. All entries are ``None`` when no numeric value
            was found; ``"Mode"`` is ``None`` when no value repeats.

        Raises:
            Exception: Errors from decoding a malformed ``"search_results"``
                payload (invalid JSON, missing ``"score"``) are not handled
                here and propagate to the caller.
        """
        values = []
        for log in logs:
            # .get(): a record without a key is treated like any other
            # non-search record instead of raising KeyError.
            if log.get("key") == "search_results":
                # The value is a JSON list whose items are themselves
                # JSON-encoded objects carrying a "score" member.
                results = json.loads(log["value"])
                scores = [
                    float(json.loads(result)["score"]) for result in results
                ]
                values.extend(scores)
            else:
                value = log.get("value")
                if value is not None:
                    # float() raises ValueError for non-numeric strings and
                    # TypeError for unsupported types (list, dict, ...);
                    # both simply mean "not a number" here.
                    with contextlib.suppress(TypeError, ValueError):
                        values.append(float(value))

        if not values:
            return {
                "Mean": None,
                "Median": None,
                "Mode": None,
                "Standard Deviation": None,
                "Variance": None,
            }

        if len(values) == 1:
            # stdev/variance need at least two data points.
            single_value = round(values[0], 3)
            return {
                "Mean": single_value,
                "Median": single_value,
                "Mode": single_value,
                "Standard Deviation": 0,
                "Variance": 0,
            }

        # A mode is only reported when at least one value occurs twice.
        has_repeats = len(set(values)) != len(values)

        return {
            "Mean": round(sum(values) / len(values), 3),
            "Median": round(statistics.median(values), 3),
            "Mode": round(statistics.mode(values), 3) if has_repeats else None,
            "Standard Deviation": round(statistics.stdev(values), 3),
            "Variance": round(statistics.variance(values), 3),
        }

    @staticmethod
    def calculate_percentile(logs, key, percentile):
        """Return the requested percentile of the numeric values under ``key``.

        The sorted values are indexed at
        ``int(percentile / 100 * (len(values) - 1))``, i.e. the position is
        truncated towards the lower neighbour and no interpolation is done.

        Args:
            logs: Iterable of log dictionaries.
            key: Only records whose ``"key"`` equals this are considered.
            percentile: Number between 0 and 100 inclusive.

        Returns:
            ``{"percentile": percentile, "value": v}`` where ``v`` is rounded
            to three decimals, or ``None`` when no numeric value matched.

        Raises:
            ValueError: If ``percentile`` lies outside ``[0, 100]``.
        """
        # Reject out-of-range requests up front: a value above 100 would
        # index past the end of the list and a negative one would silently
        # wrap around through negative indexing.
        if not 0 <= percentile <= 100:
            raise ValueError(
                f"percentile must be between 0 and 100, got {percentile!r}"
            )

        values = []
        for log in logs:
            if log.get("key") != key:
                continue
            value = log.get("value")
            if value is not None:
                # Skip values that are not convertible to a number.
                with contextlib.suppress(TypeError, ValueError):
                    values.append(float(value))

        if not values:
            return {"percentile": percentile, "value": None}

        values.sort()
        index = int((percentile / 100) * (len(values) - 1))
        return {"percentile": percentile, "value": round(values[index], 3)}


class LogAnalytics:
    """Run the configured analytics pipeline over a list of log records.

    The pipeline filters the logs into named populations, computes the
    configured statistics and distributions for each population and passes
    the combined result to the configured visualization functions.
    """

    def __init__(self, logs: List[Dict[str, Any]], config: LogAnalyticsConfig):
        """Store the logs and build the helpers used by ``process_logs``.

        Args:
            logs: The log records to analyse.
            config: Supplies ``filters``, ``stat_functions``,
                ``dist_functions`` and ``vis_functions``.
        """
        self.logs = logs
        self.log_processor = LogProcessor(config.filters)
        self.statistics_calculator = StatisticsCalculator()
        self.distribution_generator = DistributionGenerator()
        self.visualization_preparer = VisualizationPreparer()
        self.config = config

    def count_logs(self) -> Dict[str, Any]:
        """Count the logs for each filter.

        The counts reflect the populations as filled by the most recent
        ``process_logs`` call.
        """
        return {
            name: len(population)
            for name, population in self.log_processor.populations.items()
        }

    def process_logs(self) -> Dict[str, Any]:
        """Filter all logs, analyse each population and prepare the output.

        Returns:
            A dict mapping each visualization function name to the result of
            calling it with ``{population_name: {"statistics": ...,
            "distributions": ...}}``.
        """
        # Start from empty populations: without this, a second call would
        # append every log again and double all counts and statistics.
        # Fresh lists are bound (rather than clearing in place) so lists
        # handed out by an earlier run are left untouched.
        self.log_processor.populations = {
            name: [] for name in self.log_processor.filters
        }

        for log in self.logs:
            self.log_processor.process_log(log)

        analytics = {}
        for name, population in self.log_processor.populations.items():
            stats = self.statistics_calculator.calculate_statistics(
                population, self.config.stat_functions
            )
            dists = self.distribution_generator.generate_distributions(
                population, self.config.dist_functions
            )
            analytics[name] = {"statistics": stats, "distributions": dists}

        return self.visualization_preparer.prepare_visualization_data(
            analytics, self.config.vis_functions
        )