Diffstat (limited to '.venv/lib/python3.12/site-packages/sentry_sdk/metrics.py')
-rw-r--r--  .venv/lib/python3.12/site-packages/sentry_sdk/metrics.py  965
1 file changed, 965 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/sentry_sdk/metrics.py b/.venv/lib/python3.12/site-packages/sentry_sdk/metrics.py
new file mode 100644
index 00000000..4bdbc622
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/sentry_sdk/metrics.py
@@ -0,0 +1,965 @@
+import io
+import os
+import random
+import re
+import sys
+import threading
+import time
+import warnings
+import zlib
+from abc import ABC, abstractmethod
+from contextlib import contextmanager
+from datetime import datetime, timezone
+from functools import wraps, partial
+
+import sentry_sdk
+from sentry_sdk.utils import (
+    ContextVar,
+    now,
+    nanosecond_time,
+    to_timestamp,
+    serialize_frame,
+    json_dumps,
+)
+from sentry_sdk.envelope import Envelope, Item
+from sentry_sdk.tracing import TransactionSource
+
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from typing import Any
+    from typing import Callable
+    from typing import Dict
+    from typing import Generator
+    from typing import Iterable
+    from typing import List
+    from typing import Optional
+    from typing import Set
+    from typing import Tuple
+    from typing import Union
+
+    from sentry_sdk._types import BucketKey
+    from sentry_sdk._types import DurationUnit
+    from sentry_sdk._types import FlushedMetricValue
+    from sentry_sdk._types import MeasurementUnit
+    from sentry_sdk._types import MetricMetaKey
+    from sentry_sdk._types import MetricTagValue
+    from sentry_sdk._types import MetricTags
+    from sentry_sdk._types import MetricTagsInternal
+    from sentry_sdk._types import MetricType
+    from sentry_sdk._types import MetricValue
+
+
+warnings.warn(
+    "The sentry_sdk.metrics module is deprecated and will be removed in the next major release. "
+    "Sentry will reject all metrics sent after October 7, 2024. "
+    "Learn more: https://sentry.zendesk.com/hc/en-us/articles/26369339769883-Upcoming-API-Changes-to-Metrics",
+    DeprecationWarning,
+    stacklevel=2,
+)
+
+_in_metrics = ContextVar("in_metrics", default=False)
+_set = set  # set is shadowed below
+
+GOOD_TRANSACTION_SOURCES = frozenset(
+    [
+        TransactionSource.ROUTE,
+        TransactionSource.VIEW,
+        TransactionSource.COMPONENT,
+        TransactionSource.TASK,
+    ]
+)
+
+_sanitize_unit = partial(re.compile(r"[^a-zA-Z0-9_]+").sub, "")
+_sanitize_metric_key = partial(re.compile(r"[^a-zA-Z0-9_\-.]+").sub, "_")
+_sanitize_tag_key = partial(re.compile(r"[^a-zA-Z0-9_\-.\/]+").sub, "")
+
+
+def _sanitize_tag_value(value):
+    # type: (str) -> str
+    table = str.maketrans(
+        {
+            "\n": "\\n",
+            "\r": "\\r",
+            "\t": "\\t",
+            "\\": "\\\\",
+            "|": "\\u{7c}",
+            ",": "\\u{2c}",
+        }
+    )
+    return value.translate(table)
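+# For illustration: _sanitize_tag_value("chrome,beta|1") yields the literal
+# text "chrome\u{2c}beta\u{7c}1", keeping "," and "|" out of the statsd line
+# format, while a newline becomes the two-character sequence "\n".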
+
+
+def get_code_location(stacklevel):
+    # type: (int) -> Optional[Dict[str, Any]]
+    try:
+        frm = sys._getframe(stacklevel)
+    except Exception:
+        return None
+
+    return serialize_frame(
+        frm, include_local_variables=False, include_source_context=True
+    )
+
+
+@contextmanager
+def recursion_protection():
+    # type: () -> Generator[bool, None, None]
+    """Enters recursion protection and returns the old flag."""
+    old_in_metrics = _in_metrics.get()
+    _in_metrics.set(True)
+    try:
+        yield old_in_metrics
+    finally:
+        _in_metrics.set(old_in_metrics)
+
+
+def metrics_noop(func):
+    # type: (Any) -> Any
+    """Convenient decorator that uses `recursion_protection` to
+    make a function a noop.
+    """
+
+    @wraps(func)
+    def new_func(*args, **kwargs):
+        # type: (*Any, **Any) -> Any
+        with recursion_protection() as in_metrics:
+            if not in_metrics:
+                return func(*args, **kwargs)
+
+    return new_func
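+
+
+# For illustration: a `metrics_noop`-decorated entry point runs normally when
+# called from user code, but returns None without executing if it is
+# re-entered while the SDK is already inside metrics code (e.g. if capturing
+# an envelope would itself record a metric), cutting off infinite recursion.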
+
+
+class Metric(ABC):
+    __slots__ = ()
+
+    @abstractmethod
+    def __init__(self, first):
+        # type: (MetricValue) -> None
+        pass
+
+    @property
+    @abstractmethod
+    def weight(self):
+        # type: () -> int
+        pass
+
+    @abstractmethod
+    def add(self, value):
+        # type: (MetricValue) -> None
+        pass
+
+    @abstractmethod
+    def serialize_value(self):
+        # type: () -> Iterable[FlushedMetricValue]
+        pass
+
+
+class CounterMetric(Metric):
+    __slots__ = ("value",)
+
+    def __init__(
+        self, first  # type: MetricValue
+    ):
+        # type: (...) -> None
+        self.value = float(first)
+
+    @property
+    def weight(self):
+        # type: (...) -> int
+        return 1
+
+    def add(
+        self, value  # type: MetricValue
+    ):
+        # type: (...) -> None
+        self.value += float(value)
+
+    def serialize_value(self):
+        # type: (...) -> Iterable[FlushedMetricValue]
+        return (self.value,)
+
+
+class GaugeMetric(Metric):
+    __slots__ = (
+        "last",
+        "min",
+        "max",
+        "sum",
+        "count",
+    )
+
+    def __init__(
+        self, first  # type: MetricValue
+    ):
+        # type: (...) -> None
+        first = float(first)
+        self.last = first
+        self.min = first
+        self.max = first
+        self.sum = first
+        self.count = 1
+
+    @property
+    def weight(self):
+        # type: (...) -> int
+        # A gauge serializes to five elements (last, min, max, sum, count).
+        return 5
+
+    def add(
+        self, value  # type: MetricValue
+    ):
+        # type: (...) -> None
+        value = float(value)
+        self.last = value
+        self.min = min(self.min, value)
+        self.max = max(self.max, value)
+        self.sum += value
+        self.count += 1
+
+    def serialize_value(self):
+        # type: (...) -> Iterable[FlushedMetricValue]
+        return (
+            self.last,
+            self.min,
+            self.max,
+            self.sum,
+            self.count,
+        )
+
+
+class DistributionMetric(Metric):
+    __slots__ = ("value",)
+
+    def __init__(
+        self, first  # type: MetricValue
+    ):
+        # type: (...) -> None
+        self.value = [float(first)]
+
+    @property
+    def weight(self):
+        # type: (...) -> int
+        return len(self.value)
+
+    def add(
+        self, value  # type: MetricValue
+    ):
+        # type: (...) -> None
+        self.value.append(float(value))
+
+    def serialize_value(self):
+        # type: (...) -> Iterable[FlushedMetricValue]
+        return self.value
+
+
+class SetMetric(Metric):
+    __slots__ = ("value",)
+
+    def __init__(
+        self, first  # type: MetricValue
+    ):
+        # type: (...) -> None
+        self.value = {first}
+
+    @property
+    def weight(self):
+        # type: (...) -> int
+        return len(self.value)
+
+    def add(
+        self, value  # type: MetricValue
+    ):
+        # type: (...) -> None
+        self.value.add(value)
+
+    def serialize_value(self):
+        # type: (...) -> Iterable[FlushedMetricValue]
+        def _hash(x):
+            # type: (MetricValue) -> int
+            if isinstance(x, str):
+                return zlib.crc32(x.encode("utf-8")) & 0xFFFFFFFF
+            return int(x)
+
+        return (_hash(value) for value in self.value)
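+
+    # For illustration: _hash("user_1") is a stable CRC32 integer while
+    # _hash(42) == 42, so only integer hashes are emitted and the original
+    # set members never leave the process.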
+
+
+def _encode_metrics(flushable_buckets):
+    # type: (Iterable[Tuple[int, Dict[BucketKey, Metric]]]) -> bytes
+    out = io.BytesIO()
+    _write = out.write
+
+    # Note on sanitization: we intentionally sanitize at emission
+    # (serialization) time rather than during aggregation, for performance
+    # reasons.  This means the envelope can in fact contain duplicate buckets.
+    # Relay accepts this, and it should not happen commonly.
+
+    for timestamp, buckets in flushable_buckets:
+        for bucket_key, metric in buckets.items():
+            metric_type, metric_name, metric_unit, metric_tags = bucket_key
+            metric_name = _sanitize_metric_key(metric_name)
+            metric_unit = _sanitize_unit(metric_unit)
+            _write(metric_name.encode("utf-8"))
+            _write(b"@")
+            _write(metric_unit.encode("utf-8"))
+
+            for serialized_value in metric.serialize_value():
+                _write(b":")
+                _write(str(serialized_value).encode("utf-8"))
+
+            _write(b"|")
+            _write(metric_type.encode("ascii"))
+
+            if metric_tags:
+                _write(b"|#")
+                first = True
+                for tag_key, tag_value in metric_tags:
+                    tag_key = _sanitize_tag_key(tag_key)
+                    if not tag_key:
+                        continue
+                    if first:
+                        first = False
+                    else:
+                        _write(b",")
+                    _write(tag_key.encode("utf-8"))
+                    _write(b":")
+                    _write(_sanitize_tag_value(tag_value).encode("utf-8"))
+
+            _write(b"|T")
+            _write(str(timestamp).encode("ascii"))
+            _write(b"\n")
+
+    return out.getvalue()
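+
+
+# For illustration, a single counter bucket with one tag serializes roughly as
+#
+#   button_click@none:1.0|c|#browser:chrome|T1701958800
+#
+# i.e. name@unit, one ":"-prefixed value per serialized element, "|" plus the
+# metric type, an optional "|#"-prefixed tag section, and "|T" plus the bucket
+# timestamp.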
+
+
+def _encode_locations(timestamp, code_locations):
+    # type: (int, Iterable[Tuple[MetricMetaKey, Dict[str, Any]]]) -> bytes
+    mapping = {}  # type: Dict[str, List[Any]]
+
+    for key, loc in code_locations:
+        metric_type, name, unit = key
+        mri = "{}:{}@{}".format(
+            metric_type, _sanitize_metric_key(name), _sanitize_unit(unit)
+        )
+
+        loc["type"] = "location"
+        mapping.setdefault(mri, []).append(loc)
+
+    return json_dumps({"timestamp": timestamp, "mapping": mapping})
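+
+
+# For illustration, the encoded payload is JSON shaped like
+#
+#   {"timestamp": 1701907200,
+#    "mapping": {"c:button_click@none": [{"type": "location", ...}]}}
+#
+# with one list entry per recorded code location.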
+
+
+METRIC_TYPES = {
+    "c": CounterMetric,
+    "g": GaugeMetric,
+    "d": DistributionMetric,
+    "s": SetMetric,
+}  # type: dict[MetricType, type[Metric]]
+
+# Maps a duration unit to a callable returning the current time in that unit.
+TIMING_FUNCTIONS = {
+    "nanosecond": nanosecond_time,
+    "microsecond": lambda: nanosecond_time() / 1000.0,
+    "millisecond": lambda: nanosecond_time() / 1000000.0,
+    "second": now,
+    "minute": lambda: now() / 60.0,
+    "hour": lambda: now() / 3600.0,
+    "day": lambda: now() / 3600.0 / 24.0,
+    "week": lambda: now() / 3600.0 / 24.0 / 7.0,
+}
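+
+
+# Each entry returns "now" expressed in its unit, so elapsed time can be
+# computed as TIMING_FUNCTIONS[unit]() - entered with no further conversion;
+# e.g. TIMING_FUNCTIONS["millisecond"]() is nanosecond_time() / 1e6.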
+
+
+class LocalAggregator:
+    __slots__ = ("_measurements",)
+
+    def __init__(self):
+        # type: (...) -> None
+        self._measurements = (
+            {}
+        )  # type: Dict[Tuple[str, MetricTagsInternal], Tuple[float, float, int, float]]
+
+    def add(
+        self,
+        ty,  # type: MetricType
+        key,  # type: str
+        value,  # type: float
+        unit,  # type: MeasurementUnit
+        tags,  # type: MetricTagsInternal
+    ):
+        # type: (...) -> None
+        export_key = "%s:%s@%s" % (ty, key, unit)
+        bucket_key = (export_key, tags)
+
+        old = self._measurements.get(bucket_key)
+        if old is not None:
+            v_min, v_max, v_count, v_sum = old
+            v_min = min(v_min, value)
+            v_max = max(v_max, value)
+            v_count += 1
+            v_sum += value
+        else:
+            v_min = v_max = v_sum = value
+            v_count = 1
+        self._measurements[bucket_key] = (v_min, v_max, v_count, v_sum)
+
+    def to_json(self):
+        # type: (...) -> Dict[str, Any]
+        rv = {}  # type: Any
+        for (export_key, tags), (
+            v_min,
+            v_max,
+            v_count,
+            v_sum,
+        ) in self._measurements.items():
+            rv.setdefault(export_key, []).append(
+                {
+                    "tags": _tags_to_dict(tags),
+                    "min": v_min,
+                    "max": v_max,
+                    "count": v_count,
+                    "sum": v_sum,
+                }
+            )
+        return rv
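+
+    # For illustration, after two adds of 1.0 for the same counter and tags,
+    # to_json() produces a mapping shaped like
+    #
+    #   {"c:clicks@none": [{"tags": {"browser": "chrome"},
+    #                       "min": 1.0, "max": 1.0, "count": 2, "sum": 2.0}]}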
+
+
+class MetricsAggregator:
+    ROLLUP_IN_SECONDS = 10.0
+    MAX_WEIGHT = 100000
+    FLUSHER_SLEEP_TIME = 5.0
+
+    def __init__(
+        self,
+        capture_func,  # type: Callable[[Envelope], None]
+        enable_code_locations=False,  # type: bool
+    ):
+        # type: (...) -> None
+        self.buckets = {}  # type: Dict[int, Any]
+        self._enable_code_locations = enable_code_locations
+        self._seen_locations = _set()  # type: Set[Tuple[int, MetricMetaKey]]
+        self._pending_locations = {}  # type: Dict[int, List[Tuple[MetricMetaKey, Any]]]
+        self._buckets_total_weight = 0
+        self._capture_func = capture_func
+        self._running = True
+        self._lock = threading.Lock()
+
+        self._flush_event = threading.Event()  # type: threading.Event
+        self._force_flush = False
+
+        # The aggregator shifts its flushing by up to an entire rollup window
+        # to avoid multiple clients trampling on the end of the same 10 second
+        # window, since all buckets are anchored to multiples of
+        # ROLLUP_IN_SECONDS.  We randomize this shift once per aggregator boot
+        # to achieve some level of offsetting across a fleet of deployed SDKs.
+        # Relay itself will also apply independent jittering.
+        self._flush_shift = random.random() * self.ROLLUP_IN_SECONDS
+
+        self._flusher = None  # type: Optional[threading.Thread]
+        self._flusher_pid = None  # type: Optional[int]
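+
+        # Worked example: with ROLLUP_IN_SECONDS = 10 and a random
+        # _flush_shift of 3.7, a bucket anchored at t=120 becomes flushable
+        # only once time.time() passes 120 + 10 + 3.7 = 133.7
+        # (see _flushable_buckets below).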
+
+    def _ensure_thread(self):
+        # type: (...) -> bool
+        """For forking processes we might need to restart this thread.
+        This ensures that our process actually has that thread running.
+        """
+        if not self._running:
+            return False
+
+        pid = os.getpid()
+        if self._flusher_pid == pid:
+            return True
+
+        with self._lock:
+            # Recheck to make sure another thread didn't get here and start
+            # the flusher in the meantime.
+            if self._flusher_pid == pid:
+                return True
+
+            self._flusher_pid = pid
+
+            self._flusher = threading.Thread(target=self._flush_loop)
+            self._flusher.daemon = True
+
+            try:
+                self._flusher.start()
+            except RuntimeError:
+                # Unfortunately at this point the interpreter is in a state that no
+                # longer allows us to spawn a thread and we have to bail.
+                self._running = False
+                return False
+
+        return True
+
+    def _flush_loop(self):
+        # type: (...) -> None
+        _in_metrics.set(True)
+        while self._running or self._force_flush:
+            if self._running:
+                self._flush_event.wait(self.FLUSHER_SLEEP_TIME)
+            self._flush()
+
+    def _flush(self):
+        # type: (...) -> None
+        self._emit(self._flushable_buckets(), self._flushable_locations())
+
+    def _flushable_buckets(self):
+        # type: (...) -> Iterable[Tuple[int, Dict[BucketKey, Metric]]]
+        with self._lock:
+            force_flush = self._force_flush
+            cutoff = time.time() - self.ROLLUP_IN_SECONDS - self._flush_shift
+            flushable_buckets = ()  # type: Iterable[Tuple[int, Dict[BucketKey, Metric]]]
+            weight_to_remove = 0
+
+            if force_flush:
+                flushable_buckets = self.buckets.items()
+                self.buckets = {}
+                self._buckets_total_weight = 0
+                self._force_flush = False
+            else:
+                flushable_buckets = []
+                for buckets_timestamp, buckets in self.buckets.items():
+                    # Skip buckets that are newer than the cutoff; they are
+                    # not ready to be flushed yet.
+                    if buckets_timestamp <= cutoff:
+                        flushable_buckets.append((buckets_timestamp, buckets))
+
+                # Remove the flushed buckets while still holding the lock, to
+                # avoid having to re-acquire it downstream.
+                for buckets_timestamp, buckets in flushable_buckets:
+                    for metric in buckets.values():
+                        weight_to_remove += metric.weight
+                    del self.buckets[buckets_timestamp]
+
+                self._buckets_total_weight -= weight_to_remove
+
+        return flushable_buckets
+
+    def _flushable_locations(self):
+        # type: (...) -> Dict[int, List[Tuple[MetricMetaKey, Dict[str, Any]]]]
+        with self._lock:
+            locations = self._pending_locations
+            self._pending_locations = {}
+        return locations
+
+    @metrics_noop
+    def add(
+        self,
+        ty,  # type: MetricType
+        key,  # type: str
+        value,  # type: MetricValue
+        unit,  # type: MeasurementUnit
+        tags,  # type: Optional[MetricTags]
+        timestamp=None,  # type: Optional[Union[float, datetime]]
+        local_aggregator=None,  # type: Optional[LocalAggregator]
+        stacklevel=0,  # type: Optional[int]
+    ):
+        # type: (...) -> None
+        if not self._ensure_thread() or self._flusher is None:
+            return None
+
+        if timestamp is None:
+            timestamp = time.time()
+        elif isinstance(timestamp, datetime):
+            timestamp = to_timestamp(timestamp)
+
+        bucket_timestamp = int(
+            (timestamp // self.ROLLUP_IN_SECONDS) * self.ROLLUP_IN_SECONDS
+        )
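+        # e.g. a timestamp of 1701958807.4 with a 10 second rollup is anchored
+        # to the bucket at 1701958800.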
+        serialized_tags = _serialize_tags(tags)
+        bucket_key = (
+            ty,
+            key,
+            unit,
+            serialized_tags,
+        )
+
+        with self._lock:
+            local_buckets = self.buckets.setdefault(bucket_timestamp, {})
+            metric = local_buckets.get(bucket_key)
+            if metric is not None:
+                previous_weight = metric.weight
+                metric.add(value)
+            else:
+                metric = local_buckets[bucket_key] = METRIC_TYPES[ty](value)
+                previous_weight = 0
+
+            added = metric.weight - previous_weight
+
+            if stacklevel is not None:
+                self.record_code_location(ty, key, unit, stacklevel + 2, timestamp)
+
+        # Given the new weight we consider whether we want to force flush.
+        self._consider_force_flush()
+
+        # For sets, we only record that a value has been added to the set but not which one.
+        # See develop docs: https://develop.sentry.dev/sdk/metrics/#sets
+        if local_aggregator is not None:
+            local_value = float(added if ty == "s" else value)
+            local_aggregator.add(ty, key, local_value, unit, serialized_tags)
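+            # e.g. adding "chrome" twice to the same set bucket yields
+            # added == 1 and then added == 0, so the local aggregator records
+            # 1.0 and 0.0 rather than the member itself.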
+
+    def record_code_location(
+        self,
+        ty,  # type: MetricType
+        key,  # type: str
+        unit,  # type: MeasurementUnit
+        stacklevel,  # type: int
+        timestamp=None,  # type: Optional[float]
+    ):
+        # type: (...) -> None
+        if not self._enable_code_locations:
+            return
+        if timestamp is None:
+            timestamp = time.time()
+        meta_key = (ty, key, unit)
+        start_of_day = datetime.fromtimestamp(timestamp, timezone.utc).replace(
+            hour=0, minute=0, second=0, microsecond=0, tzinfo=None
+        )
+        start_of_day = int(to_timestamp(start_of_day))
+
+        if (start_of_day, meta_key) not in self._seen_locations:
+            self._seen_locations.add((start_of_day, meta_key))
+            loc = get_code_location(stacklevel + 3)
+            if loc is not None:
+                # Group metadata by day to make flushing more efficient.
+                # There needs to be one envelope item per timestamp.
+                self._pending_locations.setdefault(start_of_day, []).append(
+                    (meta_key, loc)
+                )
+
+    @metrics_noop
+    def need_code_location(
+        self,
+        ty,  # type: MetricType
+        key,  # type: str
+        unit,  # type: MeasurementUnit
+        timestamp,  # type: float
+    ):
+        # type: (...) -> bool
+        if self._enable_code_locations:
+            return False
+        meta_key = (ty, key, unit)
+        start_of_day = datetime.fromtimestamp(timestamp, timezone.utc).replace(
+            hour=0, minute=0, second=0, microsecond=0, tzinfo=None
+        )
+        start_of_day = int(to_timestamp(start_of_day))
+        return (start_of_day, meta_key) not in self._seen_locations
+
+    def kill(self):
+        # type: (...) -> None
+        if self._flusher is None:
+            return
+
+        self._running = False
+        self._flush_event.set()
+        self._flusher = None
+
+    @metrics_noop
+    def flush(self):
+        # type: (...) -> None
+        self._force_flush = True
+        self._flush()
+
+    def _consider_force_flush(self):
+        # type: (...) -> None
+        # It's important to acquire a lock around this method, since it will touch shared data structures.
+        total_weight = len(self.buckets) + self._buckets_total_weight
+        if total_weight >= self.MAX_WEIGHT:
+            self._force_flush = True
+            self._flush_event.set()
+
+    def _emit(
+        self,
+        flushable_buckets,  # type: Iterable[Tuple[int, Dict[BucketKey, Metric]]]
+        code_locations,  # type: Dict[int, List[Tuple[MetricMetaKey, Dict[str, Any]]]]
+    ):
+        # type: (...) -> Optional[Envelope]
+        envelope = Envelope()
+
+        if flushable_buckets:
+            encoded_metrics = _encode_metrics(flushable_buckets)
+            envelope.add_item(Item(payload=encoded_metrics, type="statsd"))
+
+        for timestamp, locations in code_locations.items():
+            encoded_locations = _encode_locations(timestamp, locations)
+            envelope.add_item(Item(payload=encoded_locations, type="metric_meta"))
+
+        if envelope.items:
+            self._capture_func(envelope)
+            return envelope
+        return None
+
+
+def _serialize_tags(
+    tags,  # type: Optional[MetricTags]
+):
+    # type: (...) -> MetricTagsInternal
+    if not tags:
+        return ()
+
+    rv = []
+    for key, value in tags.items():
+        # If the value is a collection, we want to flatten it.
+        if isinstance(value, (list, tuple)):
+            for inner_value in value:
+                if inner_value is not None:
+                    rv.append((key, str(inner_value)))
+        elif value is not None:
+            rv.append((key, str(value)))
+
+    # It's very important to sort the tags in order to obtain the
+    # same bucket key.
+    return tuple(sorted(rv))
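+
+
+# For illustration:
+#
+#   _serialize_tags({"browser": "chrome", "ab_test": ["a", "b"], "skip": None})
+#   == (("ab_test", "a"), ("ab_test", "b"), ("browser", "chrome"))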
+
+
+def _tags_to_dict(tags):
+    # type: (MetricTagsInternal) -> Dict[str, Any]
+    rv = {}  # type: Dict[str, Any]
+    for tag_name, tag_value in tags:
+        old_value = rv.get(tag_name)
+        if old_value is not None:
+            if isinstance(old_value, list):
+                old_value.append(tag_value)
+            else:
+                rv[tag_name] = [old_value, tag_value]
+        else:
+            rv[tag_name] = tag_value
+    return rv
+
+
+def _get_aggregator():
+    # type: () -> Optional[MetricsAggregator]
+    client = sentry_sdk.get_client()
+    return (
+        client.metrics_aggregator
+        if client.is_active() and client.metrics_aggregator is not None
+        else None
+    )
+
+
+def _get_aggregator_and_update_tags(key, value, unit, tags):
+    # type: (str, Optional[MetricValue], MeasurementUnit, Optional[MetricTags]) -> Tuple[Optional[MetricsAggregator], Optional[LocalAggregator], Optional[MetricTags]]
+    client = sentry_sdk.get_client()
+    if not client.is_active() or client.metrics_aggregator is None:
+        return None, None, tags
+
+    updated_tags = dict(tags or ())  # type: Dict[str, MetricTagValue]
+    updated_tags.setdefault("release", client.options["release"])
+    updated_tags.setdefault("environment", client.options["environment"])
+
+    scope = sentry_sdk.get_current_scope()
+    local_aggregator = None
+
+    # We use the low-level scope API here to access transaction information,
+    # since it is the same whether the SDK runs with just errors or with
+    # errors + performance.
+    transaction_source = scope._transaction_info.get("source")
+    if transaction_source in GOOD_TRANSACTION_SOURCES:
+        transaction_name = scope._transaction
+        if transaction_name:
+            updated_tags.setdefault("transaction", transaction_name)
+        if scope._span is not None:
+            local_aggregator = scope._span._get_local_aggregator()
+
+    experiments = client.options.get("_experiments", {})
+    before_emit_callback = experiments.get("before_emit_metric")
+    if before_emit_callback is not None:
+        with recursion_protection() as in_metrics:
+            if not in_metrics:
+                if not before_emit_callback(key, value, unit, updated_tags):
+                    return None, None, updated_tags
+
+    return client.metrics_aggregator, local_aggregator, updated_tags
+
+
+def increment(
+    key,  # type: str
+    value=1.0,  # type: float
+    unit="none",  # type: MeasurementUnit
+    tags=None,  # type: Optional[MetricTags]
+    timestamp=None,  # type: Optional[Union[float, datetime]]
+    stacklevel=0,  # type: int
+):
+    # type: (...) -> None
+    """Increments a counter."""
+    aggregator, local_aggregator, tags = _get_aggregator_and_update_tags(
+        key, value, unit, tags
+    )
+    if aggregator is not None:
+        aggregator.add(
+            "c", key, value, unit, tags, timestamp, local_aggregator, stacklevel
+        )
+
+
+# alias as incr is relatively common in python
+incr = increment
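+
+
+# For illustration, both spellings emit the same counter:
+#
+#   sentry_sdk.metrics.increment("button_click", 1, tags={"browser": "firefox"})
+#   sentry_sdk.metrics.incr("button_click", 1, tags={"browser": "firefox"})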
+
+
+class _Timing:
+    def __init__(
+        self,
+        key,  # type: str
+        tags,  # type: Optional[MetricTags]
+        timestamp,  # type: Optional[Union[float, datetime]]
+        value,  # type: Optional[float]
+        unit,  # type: DurationUnit
+        stacklevel,  # type: int
+    ):
+        # type: (...) -> None
+        self.key = key
+        self.tags = tags
+        self.timestamp = timestamp
+        self.value = value
+        self.unit = unit
+        self.entered = None  # type: Optional[float]
+        self._span = None  # type: Optional[sentry_sdk.tracing.Span]
+        self.stacklevel = stacklevel
+
+    def _validate_invocation(self, context):
+        # type: (str) -> None
+        if self.value is not None:
+            raise TypeError(
+                "cannot use timing as %s when a value is provided" % context
+            )
+
+    def __enter__(self):
+        # type: (...) -> _Timing
+        self.entered = TIMING_FUNCTIONS[self.unit]()
+        self._validate_invocation("context-manager")
+        self._span = sentry_sdk.start_span(op="metric.timing", name=self.key)
+        if self.tags:
+            for key, value in self.tags.items():
+                if isinstance(value, (tuple, list)):
+                    value = ",".join(sorted(map(str, value)))
+                self._span.set_tag(key, value)
+        self._span.__enter__()
+
+        # report code locations here for better accuracy
+        aggregator = _get_aggregator()
+        if aggregator is not None:
+            aggregator.record_code_location("d", self.key, self.unit, self.stacklevel)
+
+        return self
+
+    def __exit__(self, exc_type, exc_value, tb):
+        # type: (Any, Any, Any) -> None
+        assert self._span, "did not enter"
+        aggregator, local_aggregator, tags = _get_aggregator_and_update_tags(
+            self.key,
+            self.value,
+            self.unit,
+            self.tags,
+        )
+        if aggregator is not None:
+            elapsed = TIMING_FUNCTIONS[self.unit]() - self.entered  # type: ignore
+            aggregator.add(
+                "d",
+                self.key,
+                elapsed,
+                self.unit,
+                tags,
+                self.timestamp,
+                local_aggregator,
+                None,  # code locations are reported in __enter__
+            )
+
+        self._span.__exit__(exc_type, exc_value, tb)
+        self._span = None
+
+    def __call__(self, f):
+        # type: (Any) -> Any
+        self._validate_invocation("decorator")
+
+        @wraps(f)
+        def timed_func(*args, **kwargs):
+            # type: (*Any, **Any) -> Any
+            with timing(
+                key=self.key,
+                tags=self.tags,
+                timestamp=self.timestamp,
+                unit=self.unit,
+                stacklevel=self.stacklevel + 1,
+            ):
+                return f(*args, **kwargs)
+
+        return timed_func
+
+
+def timing(
+    key,  # type: str
+    value=None,  # type: Optional[float]
+    unit="second",  # type: DurationUnit
+    tags=None,  # type: Optional[MetricTags]
+    timestamp=None,  # type: Optional[Union[float, datetime]]
+    stacklevel=0,  # type: int
+):
+    # type: (...) -> _Timing
+    """Emits a distribution with the time it takes to run the given code block.
+
+    This method supports three forms of invocation:
+
+    - when a `value` is provided, it functions similarly to `distribution`,
+      emitting the value directly with the given duration `unit`
+    - it can be used as a context manager
+    - it can be used as a decorator
+    """
+    if value is not None:
+        aggregator, local_aggregator, tags = _get_aggregator_and_update_tags(
+            key, value, unit, tags
+        )
+        if aggregator is not None:
+            aggregator.add(
+                "d", key, value, unit, tags, timestamp, local_aggregator, stacklevel
+            )
+    return _Timing(key, tags, timestamp, value, unit, stacklevel)
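+
+
+# For illustration, the three forms of invocation (names are placeholders):
+#
+#   sentry_sdk.metrics.timing("task.duration", value=12.3)  # direct value
+#
+#   with sentry_sdk.metrics.timing("task.duration"):        # context manager
+#       do_work()
+#
+#   @sentry_sdk.metrics.timing("task.duration")             # decorator
+#   def do_work():
+#       ...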
+
+
+def distribution(
+    key,  # type: str
+    value,  # type: float
+    unit="none",  # type: MeasurementUnit
+    tags=None,  # type: Optional[MetricTags]
+    timestamp=None,  # type: Optional[Union[float, datetime]]
+    stacklevel=0,  # type: int
+):
+    # type: (...) -> None
+    """Emits a distribution."""
+    aggregator, local_aggregator, tags = _get_aggregator_and_update_tags(
+        key, value, unit, tags
+    )
+    if aggregator is not None:
+        aggregator.add(
+            "d", key, value, unit, tags, timestamp, local_aggregator, stacklevel
+        )
+
+
+def set(
+    key,  # type: str
+    value,  # type: Union[int, str]
+    unit="none",  # type: MeasurementUnit
+    tags=None,  # type: Optional[MetricTags]
+    timestamp=None,  # type: Optional[Union[float, datetime]]
+    stacklevel=0,  # type: int
+):
+    # type: (...) -> None
+    """Emits a set."""
+    aggregator, local_aggregator, tags = _get_aggregator_and_update_tags(
+        key, value, unit, tags
+    )
+    if aggregator is not None:
+        aggregator.add(
+            "s", key, value, unit, tags, timestamp, local_aggregator, stacklevel
+        )
+
+
+def gauge(
+    key,  # type: str
+    value,  # type: float
+    unit="none",  # type: MeasurementUnit
+    tags=None,  # type: Optional[MetricTags]
+    timestamp=None,  # type: Optional[Union[float, datetime]]
+    stacklevel=0,  # type: int
+):
+    # type: (...) -> None
+    """Emits a gauge."""
+    aggregator, local_aggregator, tags = _get_aggregator_and_update_tags(
+        key, value, unit, tags
+    )
+    if aggregator is not None:
+        aggregator.add(
+            "g", key, value, unit, tags, timestamp, local_aggregator, stacklevel
+        )
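+
+
+# For illustration, the remaining emitters follow the same shape
+# (names and values are placeholders):
+#
+#   sentry_sdk.metrics.distribution("response_size", 1024.0, unit="byte")
+#   sentry_sdk.metrics.set("user_viewed", "jane")  # counts distinct values
+#   sentry_sdk.metrics.gauge("queue_depth", 42.0)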