aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/prometheus_client/registry.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/prometheus_client/registry.py')
-rw-r--r--.venv/lib/python3.12/site-packages/prometheus_client/registry.py168
1 files changed, 168 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/prometheus_client/registry.py b/.venv/lib/python3.12/site-packages/prometheus_client/registry.py
new file mode 100644
index 00000000..694e4bd8
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/prometheus_client/registry.py
@@ -0,0 +1,168 @@
+from abc import ABC, abstractmethod
+import copy
+from threading import Lock
+from typing import Dict, Iterable, List, Optional
+
+from .metrics_core import Metric
+
+
+# Ideally this would be a Protocol, but Protocols are only available in Python >= 3.8.
+class Collector(ABC):
+ @abstractmethod
+ def collect(self) -> Iterable[Metric]:
+ pass
+
+
+class _EmptyCollector(Collector):
+ def collect(self) -> Iterable[Metric]:
+ return []
+
+
+class CollectorRegistry(Collector):
+ """Metric collector registry.
+
+ Collectors must have a no-argument method 'collect' that returns a list of
+ Metric objects. The returned metrics should be consistent with the Prometheus
+ exposition formats.
+ """
+
+ def __init__(self, auto_describe: bool = False, target_info: Optional[Dict[str, str]] = None):
+ self._collector_to_names: Dict[Collector, List[str]] = {}
+ self._names_to_collectors: Dict[str, Collector] = {}
+ self._auto_describe = auto_describe
+ self._lock = Lock()
+ self._target_info: Optional[Dict[str, str]] = {}
+ self.set_target_info(target_info)
+
+ def register(self, collector: Collector) -> None:
+ """Add a collector to the registry."""
+ with self._lock:
+ names = self._get_names(collector)
+ duplicates = set(self._names_to_collectors).intersection(names)
+ if duplicates:
+ raise ValueError(
+ 'Duplicated timeseries in CollectorRegistry: {}'.format(
+ duplicates))
+ for name in names:
+ self._names_to_collectors[name] = collector
+ self._collector_to_names[collector] = names
+
+ def unregister(self, collector: Collector) -> None:
+ """Remove a collector from the registry."""
+ with self._lock:
+ for name in self._collector_to_names[collector]:
+ del self._names_to_collectors[name]
+ del self._collector_to_names[collector]
+
+ def _get_names(self, collector):
+ """Get names of timeseries the collector produces and clashes with."""
+ desc_func = None
+ # If there's a describe function, use it.
+ try:
+ desc_func = collector.describe
+ except AttributeError:
+ pass
+ # Otherwise, if auto describe is enabled use the collect function.
+ if not desc_func and self._auto_describe:
+ desc_func = collector.collect
+
+ if not desc_func:
+ return []
+
+ result = []
+ type_suffixes = {
+ 'counter': ['_total', '_created'],
+ 'summary': ['_sum', '_count', '_created'],
+ 'histogram': ['_bucket', '_sum', '_count', '_created'],
+ 'gaugehistogram': ['_bucket', '_gsum', '_gcount'],
+ 'info': ['_info'],
+ }
+ for metric in desc_func():
+ result.append(metric.name)
+ for suffix in type_suffixes.get(metric.type, []):
+ result.append(metric.name + suffix)
+ return result
+
+ def collect(self) -> Iterable[Metric]:
+ """Yields metrics from the collectors in the registry."""
+ collectors = None
+ ti = None
+ with self._lock:
+ collectors = copy.copy(self._collector_to_names)
+ if self._target_info:
+ ti = self._target_info_metric()
+ if ti:
+ yield ti
+ for collector in collectors:
+ yield from collector.collect()
+
+ def restricted_registry(self, names: Iterable[str]) -> "RestrictedRegistry":
+ """Returns object that only collects some metrics.
+
+ Returns an object which upon collect() will return
+ only samples with the given names.
+
+ Intended usage is:
+ generate_latest(REGISTRY.restricted_registry(['a_timeseries']))
+
+ Experimental."""
+ names = set(names)
+ return RestrictedRegistry(names, self)
+
+ def set_target_info(self, labels: Optional[Dict[str, str]]) -> None:
+ with self._lock:
+ if labels:
+ if not self._target_info and 'target_info' in self._names_to_collectors:
+ raise ValueError('CollectorRegistry already contains a target_info metric')
+ self._names_to_collectors['target_info'] = _EmptyCollector()
+ elif self._target_info:
+ self._names_to_collectors.pop('target_info', None)
+ self._target_info = labels
+
+ def get_target_info(self) -> Optional[Dict[str, str]]:
+ with self._lock:
+ return self._target_info
+
+ def _target_info_metric(self):
+ m = Metric('target', 'Target metadata', 'info')
+ m.add_sample('target_info', self._target_info, 1)
+ return m
+
+ def get_sample_value(self, name: str, labels: Optional[Dict[str, str]] = None) -> Optional[float]:
+ """Returns the sample value, or None if not found.
+
+ This is inefficient, and intended only for use in unittests.
+ """
+ if labels is None:
+ labels = {}
+ for metric in self.collect():
+ for s in metric.samples:
+ if s.name == name and s.labels == labels:
+ return s.value
+ return None
+
+
+class RestrictedRegistry:
+ def __init__(self, names: Iterable[str], registry: CollectorRegistry):
+ self._name_set = set(names)
+ self._registry = registry
+
+ def collect(self) -> Iterable[Metric]:
+ collectors = set()
+ target_info_metric = None
+ with self._registry._lock:
+ if 'target_info' in self._name_set and self._registry._target_info:
+ target_info_metric = self._registry._target_info_metric()
+ for name in self._name_set:
+ if name != 'target_info' and name in self._registry._names_to_collectors:
+ collectors.add(self._registry._names_to_collectors[name])
+ if target_info_metric:
+ yield target_info_metric
+ for collector in collectors:
+ for metric in collector.collect():
+ m = metric._restricted_metric(self._name_set)
+ if m:
+ yield m
+
+
+REGISTRY = CollectorRegistry(auto_describe=True)