diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/prometheus_client/registry.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/prometheus_client/registry.py | 168 |
1 files changed, 168 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/prometheus_client/registry.py b/.venv/lib/python3.12/site-packages/prometheus_client/registry.py new file mode 100644 index 00000000..694e4bd8 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/prometheus_client/registry.py @@ -0,0 +1,168 @@ +from abc import ABC, abstractmethod +import copy +from threading import Lock +from typing import Dict, Iterable, List, Optional + +from .metrics_core import Metric + + +# Ideally this would be a Protocol, but Protocols are only available in Python >= 3.8. +class Collector(ABC): + @abstractmethod + def collect(self) -> Iterable[Metric]: + pass + + +class _EmptyCollector(Collector): + def collect(self) -> Iterable[Metric]: + return [] + + +class CollectorRegistry(Collector): + """Metric collector registry. + + Collectors must have a no-argument method 'collect' that returns a list of + Metric objects. The returned metrics should be consistent with the Prometheus + exposition formats. + """ + + def __init__(self, auto_describe: bool = False, target_info: Optional[Dict[str, str]] = None): + self._collector_to_names: Dict[Collector, List[str]] = {} + self._names_to_collectors: Dict[str, Collector] = {} + self._auto_describe = auto_describe + self._lock = Lock() + self._target_info: Optional[Dict[str, str]] = {} + self.set_target_info(target_info) + + def register(self, collector: Collector) -> None: + """Add a collector to the registry.""" + with self._lock: + names = self._get_names(collector) + duplicates = set(self._names_to_collectors).intersection(names) + if duplicates: + raise ValueError( + 'Duplicated timeseries in CollectorRegistry: {}'.format( + duplicates)) + for name in names: + self._names_to_collectors[name] = collector + self._collector_to_names[collector] = names + + def unregister(self, collector: Collector) -> None: + """Remove a collector from the registry.""" + with self._lock: + for name in self._collector_to_names[collector]: + del self._names_to_collectors[name] + del self._collector_to_names[collector] + + def _get_names(self, collector): + """Get names of timeseries the collector produces and clashes with.""" + desc_func = None + # If there's a describe function, use it. + try: + desc_func = collector.describe + except AttributeError: + pass + # Otherwise, if auto describe is enabled use the collect function. + if not desc_func and self._auto_describe: + desc_func = collector.collect + + if not desc_func: + return [] + + result = [] + type_suffixes = { + 'counter': ['_total', '_created'], + 'summary': ['_sum', '_count', '_created'], + 'histogram': ['_bucket', '_sum', '_count', '_created'], + 'gaugehistogram': ['_bucket', '_gsum', '_gcount'], + 'info': ['_info'], + } + for metric in desc_func(): + result.append(metric.name) + for suffix in type_suffixes.get(metric.type, []): + result.append(metric.name + suffix) + return result + + def collect(self) -> Iterable[Metric]: + """Yields metrics from the collectors in the registry.""" + collectors = None + ti = None + with self._lock: + collectors = copy.copy(self._collector_to_names) + if self._target_info: + ti = self._target_info_metric() + if ti: + yield ti + for collector in collectors: + yield from collector.collect() + + def restricted_registry(self, names: Iterable[str]) -> "RestrictedRegistry": + """Returns object that only collects some metrics. + + Returns an object which upon collect() will return + only samples with the given names. + + Intended usage is: + generate_latest(REGISTRY.restricted_registry(['a_timeseries'])) + + Experimental.""" + names = set(names) + return RestrictedRegistry(names, self) + + def set_target_info(self, labels: Optional[Dict[str, str]]) -> None: + with self._lock: + if labels: + if not self._target_info and 'target_info' in self._names_to_collectors: + raise ValueError('CollectorRegistry already contains a target_info metric') + self._names_to_collectors['target_info'] = _EmptyCollector() + elif self._target_info: + self._names_to_collectors.pop('target_info', None) + self._target_info = labels + + def get_target_info(self) -> Optional[Dict[str, str]]: + with self._lock: + return self._target_info + + def _target_info_metric(self): + m = Metric('target', 'Target metadata', 'info') + m.add_sample('target_info', self._target_info, 1) + return m + + def get_sample_value(self, name: str, labels: Optional[Dict[str, str]] = None) -> Optional[float]: + """Returns the sample value, or None if not found. + + This is inefficient, and intended only for use in unittests. + """ + if labels is None: + labels = {} + for metric in self.collect(): + for s in metric.samples: + if s.name == name and s.labels == labels: + return s.value + return None + + +class RestrictedRegistry: + def __init__(self, names: Iterable[str], registry: CollectorRegistry): + self._name_set = set(names) + self._registry = registry + + def collect(self) -> Iterable[Metric]: + collectors = set() + target_info_metric = None + with self._registry._lock: + if 'target_info' in self._name_set and self._registry._target_info: + target_info_metric = self._registry._target_info_metric() + for name in self._name_set: + if name != 'target_info' and name in self._registry._names_to_collectors: + collectors.add(self._registry._names_to_collectors[name]) + if target_info_metric: + yield target_info_metric + for collector in collectors: + for metric in collector.collect(): + m = metric._restricted_metric(self._name_set) + if m: + yield m + + +REGISTRY = CollectorRegistry(auto_describe=True) |