From 4a52a71956a8d46fcb7294ac71734504bb09bcc2 Mon Sep 17 00:00:00 2001 From: S. Solomon Darnell Date: Fri, 28 Mar 2025 21:52:21 -0500 Subject: two version of R2R are here --- .../site-packages/prometheus_client/metrics.py | 776 +++++++++++++++++++++ 1 file changed, 776 insertions(+) create mode 100644 .venv/lib/python3.12/site-packages/prometheus_client/metrics.py (limited to '.venv/lib/python3.12/site-packages/prometheus_client/metrics.py') diff --git a/.venv/lib/python3.12/site-packages/prometheus_client/metrics.py b/.venv/lib/python3.12/site-packages/prometheus_client/metrics.py new file mode 100644 index 00000000..af512115 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/prometheus_client/metrics.py @@ -0,0 +1,776 @@ +import os +from threading import Lock +import time +import types +from typing import ( + Any, Callable, Dict, Iterable, List, Literal, Optional, Sequence, Tuple, + Type, TypeVar, Union, +) +import warnings + +from . import values # retain this import style for testability +from .context_managers import ExceptionCounter, InprogressTracker, Timer +from .metrics_core import ( + Metric, METRIC_LABEL_NAME_RE, METRIC_NAME_RE, + RESERVED_METRIC_LABEL_NAME_RE, +) +from .registry import Collector, CollectorRegistry, REGISTRY +from .samples import Exemplar, Sample +from .utils import floatToGoString, INF + +T = TypeVar('T', bound='MetricWrapperBase') +F = TypeVar("F", bound=Callable[..., Any]) + + +def _build_full_name(metric_type, name, namespace, subsystem, unit): + full_name = '' + if namespace: + full_name += namespace + '_' + if subsystem: + full_name += subsystem + '_' + full_name += name + if metric_type == 'counter' and full_name.endswith('_total'): + full_name = full_name[:-6] # Munge to OpenMetrics. + if unit and not full_name.endswith("_" + unit): + full_name += "_" + unit + if unit and metric_type in ('info', 'stateset'): + raise ValueError('Metric name is of a type that cannot have a unit: ' + full_name) + return full_name + + +def _validate_labelname(l): + if not METRIC_LABEL_NAME_RE.match(l): + raise ValueError('Invalid label metric name: ' + l) + if RESERVED_METRIC_LABEL_NAME_RE.match(l): + raise ValueError('Reserved label metric name: ' + l) + + +def _validate_labelnames(cls, labelnames): + labelnames = tuple(labelnames) + for l in labelnames: + _validate_labelname(l) + if l in cls._reserved_labelnames: + raise ValueError('Reserved label metric name: ' + l) + return labelnames + + +def _validate_exemplar(exemplar): + runes = 0 + for k, v in exemplar.items(): + _validate_labelname(k) + runes += len(k) + runes += len(v) + if runes > 128: + raise ValueError('Exemplar labels have %d UTF-8 characters, exceeding the limit of 128') + + +def _get_use_created() -> bool: + return os.environ.get("PROMETHEUS_DISABLE_CREATED_SERIES", 'False').lower() not in ('true', '1', 't') + + +_use_created = _get_use_created() + + +def disable_created_metrics(): + """Disable exporting _created metrics on counters, histograms, and summaries.""" + global _use_created + _use_created = False + + +def enable_created_metrics(): + """Enable exporting _created metrics on counters, histograms, and summaries.""" + global _use_created + _use_created = True + + +class MetricWrapperBase(Collector): + _type: Optional[str] = None + _reserved_labelnames: Sequence[str] = () + + def _is_observable(self): + # Whether this metric is observable, i.e. + # * a metric without label names and values, or + # * the child of a labelled metric. + return not self._labelnames or (self._labelnames and self._labelvalues) + + def _raise_if_not_observable(self): + # Functions that mutate the state of the metric, for example incrementing + # a counter, will fail if the metric is not observable, because only if a + # metric is observable will the value be initialized. + if not self._is_observable(): + raise ValueError('%s metric is missing label values' % str(self._type)) + + def _is_parent(self): + return self._labelnames and not self._labelvalues + + def _get_metric(self): + return Metric(self._name, self._documentation, self._type, self._unit) + + def describe(self) -> Iterable[Metric]: + return [self._get_metric()] + + def collect(self) -> Iterable[Metric]: + metric = self._get_metric() + for suffix, labels, value, timestamp, exemplar in self._samples(): + metric.add_sample(self._name + suffix, labels, value, timestamp, exemplar) + return [metric] + + def __str__(self) -> str: + return f"{self._type}:{self._name}" + + def __repr__(self) -> str: + metric_type = type(self) + return f"{metric_type.__module__}.{metric_type.__name__}({self._name})" + + def __init__(self: T, + name: str, + documentation: str, + labelnames: Iterable[str] = (), + namespace: str = '', + subsystem: str = '', + unit: str = '', + registry: Optional[CollectorRegistry] = REGISTRY, + _labelvalues: Optional[Sequence[str]] = None, + ) -> None: + self._name = _build_full_name(self._type, name, namespace, subsystem, unit) + self._labelnames = _validate_labelnames(self, labelnames) + self._labelvalues = tuple(_labelvalues or ()) + self._kwargs: Dict[str, Any] = {} + self._documentation = documentation + self._unit = unit + + if not METRIC_NAME_RE.match(self._name): + raise ValueError('Invalid metric name: ' + self._name) + + if self._is_parent(): + # Prepare the fields needed for child metrics. + self._lock = Lock() + self._metrics: Dict[Sequence[str], T] = {} + + if self._is_observable(): + self._metric_init() + + if not self._labelvalues: + # Register the multi-wrapper parent metric, or if a label-less metric, the whole shebang. + if registry: + registry.register(self) + + def labels(self: T, *labelvalues: Any, **labelkwargs: Any) -> T: + """Return the child for the given labelset. + + All metrics can have labels, allowing grouping of related time series. + Taking a counter as an example: + + from prometheus_client import Counter + + c = Counter('my_requests_total', 'HTTP Failures', ['method', 'endpoint']) + c.labels('get', '/').inc() + c.labels('post', '/submit').inc() + + Labels can also be provided as keyword arguments: + + from prometheus_client import Counter + + c = Counter('my_requests_total', 'HTTP Failures', ['method', 'endpoint']) + c.labels(method='get', endpoint='/').inc() + c.labels(method='post', endpoint='/submit').inc() + + See the best practices on [naming](http://prometheus.io/docs/practices/naming/) + and [labels](http://prometheus.io/docs/practices/instrumentation/#use-labels). + """ + if not self._labelnames: + raise ValueError('No label names were set when constructing %s' % self) + + if self._labelvalues: + raise ValueError('{} already has labels set ({}); can not chain calls to .labels()'.format( + self, + dict(zip(self._labelnames, self._labelvalues)) + )) + + if labelvalues and labelkwargs: + raise ValueError("Can't pass both *args and **kwargs") + + if labelkwargs: + if sorted(labelkwargs) != sorted(self._labelnames): + raise ValueError('Incorrect label names') + labelvalues = tuple(str(labelkwargs[l]) for l in self._labelnames) + else: + if len(labelvalues) != len(self._labelnames): + raise ValueError('Incorrect label count') + labelvalues = tuple(str(l) for l in labelvalues) + with self._lock: + if labelvalues not in self._metrics: + self._metrics[labelvalues] = self.__class__( + self._name, + documentation=self._documentation, + labelnames=self._labelnames, + unit=self._unit, + _labelvalues=labelvalues, + **self._kwargs + ) + return self._metrics[labelvalues] + + def remove(self, *labelvalues: Any) -> None: + if 'prometheus_multiproc_dir' in os.environ or 'PROMETHEUS_MULTIPROC_DIR' in os.environ: + warnings.warn( + "Removal of labels has not been implemented in multi-process mode yet.", + UserWarning) + + if not self._labelnames: + raise ValueError('No label names were set when constructing %s' % self) + + """Remove the given labelset from the metric.""" + if len(labelvalues) != len(self._labelnames): + raise ValueError('Incorrect label count (expected %d, got %s)' % (len(self._labelnames), labelvalues)) + labelvalues = tuple(str(l) for l in labelvalues) + with self._lock: + del self._metrics[labelvalues] + + def clear(self) -> None: + """Remove all labelsets from the metric""" + if 'prometheus_multiproc_dir' in os.environ or 'PROMETHEUS_MULTIPROC_DIR' in os.environ: + warnings.warn( + "Clearing labels has not been implemented in multi-process mode yet", + UserWarning) + with self._lock: + self._metrics = {} + + def _samples(self) -> Iterable[Sample]: + if self._is_parent(): + return self._multi_samples() + else: + return self._child_samples() + + def _multi_samples(self) -> Iterable[Sample]: + with self._lock: + metrics = self._metrics.copy() + for labels, metric in metrics.items(): + series_labels = list(zip(self._labelnames, labels)) + for suffix, sample_labels, value, timestamp, exemplar in metric._samples(): + yield Sample(suffix, dict(series_labels + list(sample_labels.items())), value, timestamp, exemplar) + + def _child_samples(self) -> Iterable[Sample]: # pragma: no cover + raise NotImplementedError('_child_samples() must be implemented by %r' % self) + + def _metric_init(self): # pragma: no cover + """ + Initialize the metric object as a child, i.e. when it has labels (if any) set. + + This is factored as a separate function to allow for deferred initialization. + """ + raise NotImplementedError('_metric_init() must be implemented by %r' % self) + + +class Counter(MetricWrapperBase): + """A Counter tracks counts of events or running totals. + + Example use cases for Counters: + - Number of requests processed + - Number of items that were inserted into a queue + - Total amount of data that a system has processed + + Counters can only go up (and be reset when the process restarts). If your use case can go down, + you should use a Gauge instead. + + An example for a Counter: + + from prometheus_client import Counter + + c = Counter('my_failures_total', 'Description of counter') + c.inc() # Increment by 1 + c.inc(1.6) # Increment by given value + + There are utilities to count exceptions raised: + + @c.count_exceptions() + def f(): + pass + + with c.count_exceptions(): + pass + + # Count only one type of exception + with c.count_exceptions(ValueError): + pass + + You can also reset the counter to zero in case your logical "process" restarts + without restarting the actual python process. + + c.reset() + + """ + _type = 'counter' + + def _metric_init(self) -> None: + self._value = values.ValueClass(self._type, self._name, self._name + '_total', self._labelnames, + self._labelvalues, self._documentation) + self._created = time.time() + + def inc(self, amount: float = 1, exemplar: Optional[Dict[str, str]] = None) -> None: + """Increment counter by the given amount.""" + self._raise_if_not_observable() + if amount < 0: + raise ValueError('Counters can only be incremented by non-negative amounts.') + self._value.inc(amount) + if exemplar: + _validate_exemplar(exemplar) + self._value.set_exemplar(Exemplar(exemplar, amount, time.time())) + + def reset(self) -> None: + """Reset the counter to zero. Use this when a logical process restarts without restarting the actual python process.""" + self._value.set(0) + self._created = time.time() + + def count_exceptions(self, exception: Union[Type[BaseException], Tuple[Type[BaseException], ...]] = Exception) -> ExceptionCounter: + """Count exceptions in a block of code or function. + + Can be used as a function decorator or context manager. + Increments the counter when an exception of the given + type is raised up out of the code. + """ + self._raise_if_not_observable() + return ExceptionCounter(self, exception) + + def _child_samples(self) -> Iterable[Sample]: + sample = Sample('_total', {}, self._value.get(), None, self._value.get_exemplar()) + if _use_created: + return ( + sample, + Sample('_created', {}, self._created, None, None) + ) + return (sample,) + + +class Gauge(MetricWrapperBase): + """Gauge metric, to report instantaneous values. + + Examples of Gauges include: + - Inprogress requests + - Number of items in a queue + - Free memory + - Total memory + - Temperature + + Gauges can go both up and down. + + from prometheus_client import Gauge + + g = Gauge('my_inprogress_requests', 'Description of gauge') + g.inc() # Increment by 1 + g.dec(10) # Decrement by given value + g.set(4.2) # Set to a given value + + There are utilities for common use cases: + + g.set_to_current_time() # Set to current unixtime + + # Increment when entered, decrement when exited. + @g.track_inprogress() + def f(): + pass + + with g.track_inprogress(): + pass + + A Gauge can also take its value from a callback: + + d = Gauge('data_objects', 'Number of objects') + my_dict = {} + d.set_function(lambda: len(my_dict)) + """ + _type = 'gauge' + _MULTIPROC_MODES = frozenset(('all', 'liveall', 'min', 'livemin', 'max', 'livemax', 'sum', 'livesum', 'mostrecent', 'livemostrecent')) + _MOST_RECENT_MODES = frozenset(('mostrecent', 'livemostrecent')) + + def __init__(self, + name: str, + documentation: str, + labelnames: Iterable[str] = (), + namespace: str = '', + subsystem: str = '', + unit: str = '', + registry: Optional[CollectorRegistry] = REGISTRY, + _labelvalues: Optional[Sequence[str]] = None, + multiprocess_mode: Literal['all', 'liveall', 'min', 'livemin', 'max', 'livemax', 'sum', 'livesum', 'mostrecent', 'livemostrecent'] = 'all', + ): + self._multiprocess_mode = multiprocess_mode + if multiprocess_mode not in self._MULTIPROC_MODES: + raise ValueError('Invalid multiprocess mode: ' + multiprocess_mode) + super().__init__( + name=name, + documentation=documentation, + labelnames=labelnames, + namespace=namespace, + subsystem=subsystem, + unit=unit, + registry=registry, + _labelvalues=_labelvalues, + ) + self._kwargs['multiprocess_mode'] = self._multiprocess_mode + self._is_most_recent = self._multiprocess_mode in self._MOST_RECENT_MODES + + def _metric_init(self) -> None: + self._value = values.ValueClass( + self._type, self._name, self._name, self._labelnames, self._labelvalues, + self._documentation, multiprocess_mode=self._multiprocess_mode + ) + + def inc(self, amount: float = 1) -> None: + """Increment gauge by the given amount.""" + if self._is_most_recent: + raise RuntimeError("inc must not be used with the mostrecent mode") + self._raise_if_not_observable() + self._value.inc(amount) + + def dec(self, amount: float = 1) -> None: + """Decrement gauge by the given amount.""" + if self._is_most_recent: + raise RuntimeError("dec must not be used with the mostrecent mode") + self._raise_if_not_observable() + self._value.inc(-amount) + + def set(self, value: float) -> None: + """Set gauge to the given value.""" + self._raise_if_not_observable() + if self._is_most_recent: + self._value.set(float(value), timestamp=time.time()) + else: + self._value.set(float(value)) + + def set_to_current_time(self) -> None: + """Set gauge to the current unixtime.""" + self.set(time.time()) + + def track_inprogress(self) -> InprogressTracker: + """Track inprogress blocks of code or functions. + + Can be used as a function decorator or context manager. + Increments the gauge when the code is entered, + and decrements when it is exited. + """ + self._raise_if_not_observable() + return InprogressTracker(self) + + def time(self) -> Timer: + """Time a block of code or function, and set the duration in seconds. + + Can be used as a function decorator or context manager. + """ + return Timer(self, 'set') + + def set_function(self, f: Callable[[], float]) -> None: + """Call the provided function to return the Gauge value. + + The function must return a float, and may be called from + multiple threads. All other methods of the Gauge become NOOPs. + """ + + self._raise_if_not_observable() + + def samples(_: Gauge) -> Iterable[Sample]: + return (Sample('', {}, float(f()), None, None),) + + self._child_samples = types.MethodType(samples, self) # type: ignore + + def _child_samples(self) -> Iterable[Sample]: + return (Sample('', {}, self._value.get(), None, None),) + + +class Summary(MetricWrapperBase): + """A Summary tracks the size and number of events. + + Example use cases for Summaries: + - Response latency + - Request size + + Example for a Summary: + + from prometheus_client import Summary + + s = Summary('request_size_bytes', 'Request size (bytes)') + s.observe(512) # Observe 512 (bytes) + + Example for a Summary using time: + + from prometheus_client import Summary + + REQUEST_TIME = Summary('response_latency_seconds', 'Response latency (seconds)') + + @REQUEST_TIME.time() + def create_response(request): + '''A dummy function''' + time.sleep(1) + + Example for using the same Summary object as a context manager: + + with REQUEST_TIME.time(): + pass # Logic to be timed + """ + _type = 'summary' + _reserved_labelnames = ['quantile'] + + def _metric_init(self) -> None: + self._count = values.ValueClass(self._type, self._name, self._name + '_count', self._labelnames, + self._labelvalues, self._documentation) + self._sum = values.ValueClass(self._type, self._name, self._name + '_sum', self._labelnames, self._labelvalues, self._documentation) + self._created = time.time() + + def observe(self, amount: float) -> None: + """Observe the given amount. + + The amount is usually positive or zero. Negative values are + accepted but prevent current versions of Prometheus from + properly detecting counter resets in the sum of + observations. See + https://prometheus.io/docs/practices/histograms/#count-and-sum-of-observations + for details. + """ + self._raise_if_not_observable() + self._count.inc(1) + self._sum.inc(amount) + + def time(self) -> Timer: + """Time a block of code or function, and observe the duration in seconds. + + Can be used as a function decorator or context manager. + """ + return Timer(self, 'observe') + + def _child_samples(self) -> Iterable[Sample]: + samples = [ + Sample('_count', {}, self._count.get(), None, None), + Sample('_sum', {}, self._sum.get(), None, None), + ] + if _use_created: + samples.append(Sample('_created', {}, self._created, None, None)) + return tuple(samples) + + +class Histogram(MetricWrapperBase): + """A Histogram tracks the size and number of events in buckets. + + You can use Histograms for aggregatable calculation of quantiles. + + Example use cases: + - Response latency + - Request size + + Example for a Histogram: + + from prometheus_client import Histogram + + h = Histogram('request_size_bytes', 'Request size (bytes)') + h.observe(512) # Observe 512 (bytes) + + Example for a Histogram using time: + + from prometheus_client import Histogram + + REQUEST_TIME = Histogram('response_latency_seconds', 'Response latency (seconds)') + + @REQUEST_TIME.time() + def create_response(request): + '''A dummy function''' + time.sleep(1) + + Example of using the same Histogram object as a context manager: + + with REQUEST_TIME.time(): + pass # Logic to be timed + + The default buckets are intended to cover a typical web/rpc request from milliseconds to seconds. + They can be overridden by passing `buckets` keyword argument to `Histogram`. + """ + _type = 'histogram' + _reserved_labelnames = ['le'] + DEFAULT_BUCKETS = (.005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, INF) + + def __init__(self, + name: str, + documentation: str, + labelnames: Iterable[str] = (), + namespace: str = '', + subsystem: str = '', + unit: str = '', + registry: Optional[CollectorRegistry] = REGISTRY, + _labelvalues: Optional[Sequence[str]] = None, + buckets: Sequence[Union[float, str]] = DEFAULT_BUCKETS, + ): + self._prepare_buckets(buckets) + super().__init__( + name=name, + documentation=documentation, + labelnames=labelnames, + namespace=namespace, + subsystem=subsystem, + unit=unit, + registry=registry, + _labelvalues=_labelvalues, + ) + self._kwargs['buckets'] = buckets + + def _prepare_buckets(self, source_buckets: Sequence[Union[float, str]]) -> None: + buckets = [float(b) for b in source_buckets] + if buckets != sorted(buckets): + # This is probably an error on the part of the user, + # so raise rather than sorting for them. + raise ValueError('Buckets not in sorted order') + if buckets and buckets[-1] != INF: + buckets.append(INF) + if len(buckets) < 2: + raise ValueError('Must have at least two buckets') + self._upper_bounds = buckets + + def _metric_init(self) -> None: + self._buckets: List[values.ValueClass] = [] + self._created = time.time() + bucket_labelnames = self._labelnames + ('le',) + self._sum = values.ValueClass(self._type, self._name, self._name + '_sum', self._labelnames, self._labelvalues, self._documentation) + for b in self._upper_bounds: + self._buckets.append(values.ValueClass( + self._type, + self._name, + self._name + '_bucket', + bucket_labelnames, + self._labelvalues + (floatToGoString(b),), + self._documentation) + ) + + def observe(self, amount: float, exemplar: Optional[Dict[str, str]] = None) -> None: + """Observe the given amount. + + The amount is usually positive or zero. Negative values are + accepted but prevent current versions of Prometheus from + properly detecting counter resets in the sum of + observations. See + https://prometheus.io/docs/practices/histograms/#count-and-sum-of-observations + for details. + """ + self._raise_if_not_observable() + self._sum.inc(amount) + for i, bound in enumerate(self._upper_bounds): + if amount <= bound: + self._buckets[i].inc(1) + if exemplar: + _validate_exemplar(exemplar) + self._buckets[i].set_exemplar(Exemplar(exemplar, amount, time.time())) + break + + def time(self) -> Timer: + """Time a block of code or function, and observe the duration in seconds. + + Can be used as a function decorator or context manager. + """ + return Timer(self, 'observe') + + def _child_samples(self) -> Iterable[Sample]: + samples = [] + acc = 0.0 + for i, bound in enumerate(self._upper_bounds): + acc += self._buckets[i].get() + samples.append(Sample('_bucket', {'le': floatToGoString(bound)}, acc, None, self._buckets[i].get_exemplar())) + samples.append(Sample('_count', {}, acc, None, None)) + if self._upper_bounds[0] >= 0: + samples.append(Sample('_sum', {}, self._sum.get(), None, None)) + if _use_created: + samples.append(Sample('_created', {}, self._created, None, None)) + return tuple(samples) + + +class Info(MetricWrapperBase): + """Info metric, key-value pairs. + + Examples of Info include: + - Build information + - Version information + - Potential target metadata + + Example usage: + from prometheus_client import Info + + i = Info('my_build', 'Description of info') + i.info({'version': '1.2.3', 'buildhost': 'foo@bar'}) + + Info metrics do not work in multiprocess mode. + """ + _type = 'info' + + def _metric_init(self): + self._labelname_set = set(self._labelnames) + self._lock = Lock() + self._value = {} + + def info(self, val: Dict[str, str]) -> None: + """Set info metric.""" + if self._labelname_set.intersection(val.keys()): + raise ValueError('Overlapping labels for Info metric, metric: {} child: {}'.format( + self._labelnames, val)) + if any(i is None for i in val.values()): + raise ValueError('Label value cannot be None') + with self._lock: + self._value = dict(val) + + def _child_samples(self) -> Iterable[Sample]: + with self._lock: + return (Sample('_info', self._value, 1.0, None, None),) + + +class Enum(MetricWrapperBase): + """Enum metric, which of a set of states is true. + + Example usage: + from prometheus_client import Enum + + e = Enum('task_state', 'Description of enum', + states=['starting', 'running', 'stopped']) + e.state('running') + + The first listed state will be the default. + Enum metrics do not work in multiprocess mode. + """ + _type = 'stateset' + + def __init__(self, + name: str, + documentation: str, + labelnames: Sequence[str] = (), + namespace: str = '', + subsystem: str = '', + unit: str = '', + registry: Optional[CollectorRegistry] = REGISTRY, + _labelvalues: Optional[Sequence[str]] = None, + states: Optional[Sequence[str]] = None, + ): + super().__init__( + name=name, + documentation=documentation, + labelnames=labelnames, + namespace=namespace, + subsystem=subsystem, + unit=unit, + registry=registry, + _labelvalues=_labelvalues, + ) + if name in labelnames: + raise ValueError(f'Overlapping labels for Enum metric: {name}') + if not states: + raise ValueError(f'No states provided for Enum metric: {name}') + self._kwargs['states'] = self._states = states + + def _metric_init(self) -> None: + self._value = 0 + self._lock = Lock() + + def state(self, state: str) -> None: + """Set enum metric state.""" + self._raise_if_not_observable() + with self._lock: + self._value = self._states.index(state) + + def _child_samples(self) -> Iterable[Sample]: + with self._lock: + return [ + Sample('', {self._name: s}, 1 if i == self._value else 0, None, None) + for i, s + in enumerate(self._states) + ] -- cgit v1.2.3