import contextlib
import json
import logging
import statistics
from collections import defaultdict
from typing import Any, Callable, Dict, List, Optional, Sequence
from pydantic import BaseModel
logger = logging.getLogger(__name__)
class FilterCriteria(BaseModel):
filters: Optional[dict[str, str]] = None
class LogProcessor:
timestamp_format = "%Y-%m-%d %H:%M:%S"
def __init__(self, filters: Dict[str, Callable[[Dict[str, Any]], bool]]):
self.filters = filters
self.populations = {name: [] for name in filters}
def process_log(self, log: Dict[str, Any]):
for name, filter_func in self.filters.items():
if filter_func(log):
self.populations[name].append(log)
class StatisticsCalculator:
@staticmethod
def calculate_statistics(
population: List[Dict[str, Any]],
stat_functions: Dict[str, Callable[[List[Dict[str, Any]]], Any]],
) -> Dict[str, Any]:
return {
name: func(population) for name, func in stat_functions.items()
}
class DistributionGenerator:
@staticmethod
def generate_distributions(
population: List[Dict[str, Any]],
dist_functions: Dict[str, Callable[[List[Dict[str, Any]]], Any]],
) -> Dict[str, Any]:
return {
name: func(population) for name, func in dist_functions.items()
}
class VisualizationPreparer:
@staticmethod
def prepare_visualization_data(
data: Dict[str, Any],
vis_functions: Dict[str, Callable[[Dict[str, Any]], Any]],
) -> Dict[str, Any]:
return {name: func(data) for name, func in vis_functions.items()}
class LogAnalyticsConfig:
def __init__(self, filters, stat_functions, dist_functions, vis_functions):
self.filters = filters
self.stat_functions = stat_functions
self.dist_functions = dist_functions
self.vis_functions = vis_functions
class AnalysisTypes(BaseModel):
analysis_types: Optional[dict[str, Sequence[str]]] = None
@staticmethod
def generate_bar_chart_data(logs, key):
chart_data = {"labels": [], "datasets": []}
value_counts = defaultdict(int)
for log in logs:
if "entries" in log:
for entry in log["entries"]:
if entry["key"] == key:
value_counts[entry["value"]] += 1
elif "key" in log and log["key"] == key:
value_counts[log["value"]] += 1
for value, count in value_counts.items():
chart_data["labels"].append(value)
chart_data["datasets"].append({"label": key, "data": [count]})
return chart_data
@staticmethod
def calculate_basic_statistics(logs, key):
values = []
for log in logs:
if log["key"] == "search_results":
results = json.loads(log["value"])
scores = [
float(json.loads(result)["score"]) for result in results
]
values.extend(scores)
else:
value = log.get("value")
if value is not None:
with contextlib.suppress(ValueError):
values.append(float(value))
if not values:
return {
"Mean": None,
"Median": None,
"Mode": None,
"Standard Deviation": None,
"Variance": None,
}
if len(values) == 1:
single_value = round(values[0], 3)
return {
"Mean": single_value,
"Median": single_value,
"Mode": single_value,
"Standard Deviation": 0,
"Variance": 0,
}
mean = round(sum(values) / len(values), 3)
median = round(statistics.median(values), 3)
mode = (
round(statistics.mode(values), 3)
if len(set(values)) != len(values)
else None
)
std_dev = round(statistics.stdev(values) if len(values) > 1 else 0, 3)
variance = round(
statistics.variance(values) if len(values) > 1 else 0, 3
)
return {
"Mean": mean,
"Median": median,
"Mode": mode,
"Standard Deviation": std_dev,
"Variance": variance,
}
@staticmethod
def calculate_percentile(logs, key, percentile):
values = []
for log in logs:
if log["key"] == key:
value = log.get("value")
if value is not None:
with contextlib.suppress(ValueError):
values.append(float(value))
if not values:
return {"percentile": percentile, "value": None}
values.sort()
index = int((percentile / 100) * (len(values) - 1))
return {"percentile": percentile, "value": round(values[index], 3)}
class LogAnalytics:
def __init__(self, logs: List[Dict[str, Any]], config: LogAnalyticsConfig):
self.logs = logs
self.log_processor = LogProcessor(config.filters)
self.statistics_calculator = StatisticsCalculator()
self.distribution_generator = DistributionGenerator()
self.visualization_preparer = VisualizationPreparer()
self.config = config
def count_logs(self) -> Dict[str, Any]:
"""Count the logs for each filter."""
return {
name: len(population)
for name, population in self.log_processor.populations.items()
}
def process_logs(self) -> Dict[str, Any]:
for log in self.logs:
self.log_processor.process_log(log)
analytics = {}
for name, population in self.log_processor.populations.items():
stats = self.statistics_calculator.calculate_statistics(
population, self.config.stat_functions
)
dists = self.distribution_generator.generate_distributions(
population, self.config.dist_functions
)
analytics[name] = {"statistics": stats, "distributions": dists}
return self.visualization_preparer.prepare_visualization_data(
analytics, self.config.vis_functions
)