author     S. Solomon Darnell    2025-03-28 21:52:21 -0500
committer  S. Solomon Darnell    2025-03-28 21:52:21 -0500
commit     4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
tree       ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/prometheus_client/parser.py
parent     cc961e04ba734dd72309fb548a2f97d67d578813 (diff)
download   gn-ai-master.tar.gz
two versions of R2R are here (HEAD, master)
Diffstat (limited to '.venv/lib/python3.12/site-packages/prometheus_client/parser.py')
-rw-r--r--  .venv/lib/python3.12/site-packages/prometheus_client/parser.py  225
1 file changed, 225 insertions(+), 0 deletions(-)
diff --git a/.venv/lib/python3.12/site-packages/prometheus_client/parser.py b/.venv/lib/python3.12/site-packages/prometheus_client/parser.py
new file mode 100644
index 00000000..dc8e30df
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/prometheus_client/parser.py
@@ -0,0 +1,225 @@
+import io as StringIO
+import re
+from typing import Dict, Iterable, List, Match, Optional, TextIO, Tuple
+
+from .metrics_core import Metric
+from .samples import Sample
+
+
+def text_string_to_metric_families(text: str) -> Iterable[Metric]:
+    """Parse Prometheus text format from a unicode string.
+
+    See text_fd_to_metric_families.
+    """
+    yield from text_fd_to_metric_families(StringIO.StringIO(text))
+
+
+ESCAPE_SEQUENCES = {
+    '\\\\': '\\',
+    '\\n': '\n',
+    '\\"': '"',
+}
+
+
+def replace_escape_sequence(match: Match[str]) -> str:
+    return ESCAPE_SEQUENCES[match.group(0)]
+
+
+HELP_ESCAPING_RE = re.compile(r'\\[\\n]')
+ESCAPING_RE = re.compile(r'\\[\\n"]')
+
+
+def _replace_help_escaping(s: str) -> str:
+    return HELP_ESCAPING_RE.sub(replace_escape_sequence, s)
+
+
+def _replace_escaping(s: str) -> str:
+    return ESCAPING_RE.sub(replace_escape_sequence, s)
+
+
+def _is_character_escaped(s: str, charpos: int) -> bool:
+    num_bslashes = 0
+    while (charpos > num_bslashes
+           and s[charpos - 1 - num_bslashes] == '\\'):
+        num_bslashes += 1
+    return num_bslashes % 2 == 1
+
+
+def _parse_labels(labels_string: str) -> Dict[str, str]:
+    labels: Dict[str, str] = {}
+    # Return if we don't have valid labels
+    if "=" not in labels_string:
+        return labels
+
+    escaping = False
+    if "\\" in labels_string:
+        escaping = True
+
+    # Copy original labels
+    sub_labels = labels_string
+    try:
+        # Process one label at a time
+        while sub_labels:
+            # The label name is before the equal
+            value_start = sub_labels.index("=")
+            label_name = sub_labels[:value_start]
+            sub_labels = sub_labels[value_start + 1:].lstrip()
+            # Find the first quote after the equal
+            quote_start = sub_labels.index('"') + 1
+            value_substr = sub_labels[quote_start:]
+
+            # Find the last unescaped quote
+            i = 0
+            while i < len(value_substr):
+                i = value_substr.index('"', i)
+                if not _is_character_escaped(value_substr, i):
+                    break
+                i += 1
+
+            # The label value is between the first and last quote
+            quote_end = i + 1
+            label_value = sub_labels[quote_start:quote_end]
+            # Replace escaping if needed
+            if escaping:
+                label_value = _replace_escaping(label_value)
+            labels[label_name.strip()] = label_value
+
+            # Remove the processed label from the sub-slice for next iteration
+            sub_labels = sub_labels[quote_end + 1:]
+            next_comma = sub_labels.find(",") + 1
+            sub_labels = sub_labels[next_comma:].lstrip()
+
+        return labels
+
+    except ValueError:
+        raise ValueError("Invalid labels: %s" % labels_string)
+
+
+# If there are multiple whitespace-separated fields, the first is the value
+# and the last is the (millisecond) timestamp.
+def _parse_value_and_timestamp(s: str) -> Tuple[float, Optional[float]]:
+    s = s.lstrip()
+    separator = " "
+    if separator not in s:
+        separator = "\t"
+    values = [value.strip() for value in s.split(separator) if value.strip()]
+    if not values:
+        return float(s), None
+    value = float(values[0])
+    timestamp = (float(values[-1]) / 1000) if len(values) > 1 else None
+    return value, timestamp
+
+
+def _parse_sample(text: str) -> Sample:
+    # Detect the labels in the text
+    try:
+        label_start, label_end = text.index("{"), text.rindex("}")
+        # The name is before the labels
+        name = text[:label_start].strip()
+        # We ignore the starting curly brace
+        label = text[label_start + 1:label_end]
+        # The value is after the label end (ignoring curly brace)
+        value, timestamp = _parse_value_and_timestamp(text[label_end + 1:])
+        return Sample(name, _parse_labels(label), value, timestamp)
+
+    # We don't have labels
+    except ValueError:
+        # Detect what separator is used
+        separator = " "
+        if separator not in text:
+            separator = "\t"
+        name_end = text.index(separator)
+        name = text[:name_end]
+        # The value is after the name
+        value, timestamp = _parse_value_and_timestamp(text[name_end:])
+        return Sample(name, {}, value, timestamp)
+
+
+def text_fd_to_metric_families(fd: TextIO) -> Iterable[Metric]:
+    """Parse Prometheus text format from a file descriptor.
+
+    This is a laxer parser than the main Go parser,
+    so successful parsing does not imply that the parsed
+    text meets the specification.
+
+    Yields Metric objects.
+    """
+    name = ''
+    documentation = ''
+    typ = 'untyped'
+    samples: List[Sample] = []
+    allowed_names = []
+
+    def build_metric(name: str, documentation: str, typ: str, samples: List[Sample]) -> Metric:
+        # Munge counters into OpenMetrics representation
+        # used internally.
+        if typ == 'counter':
+            if name.endswith('_total'):
+                name = name[:-6]
+            else:
+                new_samples = []
+                for s in samples:
+                    new_samples.append(Sample(s[0] + '_total', *s[1:]))
+                    samples = new_samples
+        metric = Metric(name, documentation, typ)
+        metric.samples = samples
+        return metric
+
+    for line in fd:
+        line = line.strip()
+
+        if line.startswith('#'):
+            parts = line.split(None, 3)
+            if len(parts) < 2:
+                continue
+            if parts[1] == 'HELP':
+                if parts[2] != name:
+                    if name != '':
+                        yield build_metric(name, documentation, typ, samples)
+                    # New metric
+                    name = parts[2]
+                    typ = 'untyped'
+                    samples = []
+                    allowed_names = [parts[2]]
+                if len(parts) == 4:
+                    documentation = _replace_help_escaping(parts[3])
+                else:
+                    documentation = ''
+            elif parts[1] == 'TYPE':
+                if parts[2] != name:
+                    if name != '':
+                        yield build_metric(name, documentation, typ, samples)
+                    # New metric
+                    name = parts[2]
+                    documentation = ''
+                    samples = []
+                typ = parts[3]
+                allowed_names = {
+                    'counter': [''],
+                    'gauge': [''],
+                    'summary': ['_count', '_sum', ''],
+                    'histogram': ['_count', '_sum', '_bucket'],
+                }.get(typ, [''])
+                allowed_names = [name + n for n in allowed_names]
+            else:
+                # Ignore other comment tokens
+                pass
+        elif line == '':
+            # Ignore blank lines
+            pass
+        else:
+            sample = _parse_sample(line)
+            if sample.name not in allowed_names:
+                if name != '':
+                    yield build_metric(name, documentation, typ, samples)
+                # New metric, yield immediately as untyped singleton
+                name = ''
+                documentation = ''
+                typ = 'untyped'
+                samples = []
+                allowed_names = []
+                yield build_metric(sample[0], documentation, typ, [sample])
+            else:
+                samples.append(sample)
+
+    if name != '':
+        yield build_metric(name, documentation, typ, samples)
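
A minimal usage sketch of the parser added above, assuming prometheus_client is importable from this environment. The exposition text, metric names, and label values are made-up illustrations; the functions and attributes used (text_string_to_metric_families, Metric.name/.type/.documentation/.samples, and the Sample fields) are the ones defined or imported in this file.

from prometheus_client.parser import text_string_to_metric_families

# Hypothetical scrape payload in the Prometheus text exposition format.
exposition = (
    '# HELP http_requests_total Total HTTP requests.\n'
    '# TYPE http_requests_total counter\n'
    'http_requests_total{method="get",code="200"} 1027 1395066363000\n'
    'http_requests_total{method="post",code="200"} 3\n'
)

for family in text_string_to_metric_families(exposition):
    # Counters are munged into the OpenMetrics representation used internally,
    # so the family is named "http_requests" while each sample keeps "_total".
    print(family.name, family.type, family.documentation)
    for sample in family.samples:
        # Millisecond timestamps from the text format are converted to seconds.
        print('  ', sample.name, sample.labels, sample.value, sample.timestamp)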