| author | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
| --- | --- | --- |
| committer | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
| commit | 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch) | |
| tree | ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/prometheus_client/parser.py | |
| parent | cc961e04ba734dd72309fb548a2f97d67d578813 (diff) | |
| download | gn-ai-4a52a71956a8d46fcb7294ac71734504bb09bcc2.tar.gz | |
Diffstat (limited to '.venv/lib/python3.12/site-packages/prometheus_client/parser.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/prometheus_client/parser.py | 225 |
1 file changed, 225 insertions, 0 deletions
```diff
diff --git a/.venv/lib/python3.12/site-packages/prometheus_client/parser.py b/.venv/lib/python3.12/site-packages/prometheus_client/parser.py
new file mode 100644
index 00000000..dc8e30df
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/prometheus_client/parser.py
@@ -0,0 +1,225 @@
+import io as StringIO
+import re
+from typing import Dict, Iterable, List, Match, Optional, TextIO, Tuple
+
+from .metrics_core import Metric
+from .samples import Sample
+
+
+def text_string_to_metric_families(text: str) -> Iterable[Metric]:
+    """Parse Prometheus text format from a unicode string.
+
+    See text_fd_to_metric_families.
+    """
+    yield from text_fd_to_metric_families(StringIO.StringIO(text))
+
+
+ESCAPE_SEQUENCES = {
+    '\\\\': '\\',
+    '\\n': '\n',
+    '\\"': '"',
+}
+
+
+def replace_escape_sequence(match: Match[str]) -> str:
+    return ESCAPE_SEQUENCES[match.group(0)]
+
+
+HELP_ESCAPING_RE = re.compile(r'\\[\\n]')
+ESCAPING_RE = re.compile(r'\\[\\n"]')
+
+
+def _replace_help_escaping(s: str) -> str:
+    return HELP_ESCAPING_RE.sub(replace_escape_sequence, s)
+
+
+def _replace_escaping(s: str) -> str:
+    return ESCAPING_RE.sub(replace_escape_sequence, s)
+
+
+def _is_character_escaped(s: str, charpos: int) -> bool:
+    num_bslashes = 0
+    while (charpos > num_bslashes
+           and s[charpos - 1 - num_bslashes] == '\\'):
+        num_bslashes += 1
+    return num_bslashes % 2 == 1
+
+
+def _parse_labels(labels_string: str) -> Dict[str, str]:
+    labels: Dict[str, str] = {}
+    # Return if we don't have valid labels
+    if "=" not in labels_string:
+        return labels
+
+    escaping = False
+    if "\\" in labels_string:
+        escaping = True
+
+    # Copy original labels
+    sub_labels = labels_string
+    try:
+        # Process one label at a time
+        while sub_labels:
+            # The label name is before the equal
+            value_start = sub_labels.index("=")
+            label_name = sub_labels[:value_start]
+            sub_labels = sub_labels[value_start + 1:].lstrip()
+            # Find the first quote after the equal
+            quote_start = sub_labels.index('"') + 1
+            value_substr = sub_labels[quote_start:]
+
+            # Find the last unescaped quote
+            i = 0
+            while i < len(value_substr):
+                i = value_substr.index('"', i)
+                if not _is_character_escaped(value_substr, i):
+                    break
+                i += 1
+
+            # The label value is between the first and last quote
+            quote_end = i + 1
+            label_value = sub_labels[quote_start:quote_end]
+            # Replace escaping if needed
+            if escaping:
+                label_value = _replace_escaping(label_value)
+            labels[label_name.strip()] = label_value
+
+            # Remove the processed label from the sub-slice for next iteration
+            sub_labels = sub_labels[quote_end + 1:]
+            next_comma = sub_labels.find(",") + 1
+            sub_labels = sub_labels[next_comma:].lstrip()
+
+        return labels
+
+    except ValueError:
+        raise ValueError("Invalid labels: %s" % labels_string)
+
+
+# If we have multiple values only consider the first
+def _parse_value_and_timestamp(s: str) -> Tuple[float, Optional[float]]:
+    s = s.lstrip()
+    separator = " "
+    if separator not in s:
+        separator = "\t"
+    values = [value.strip() for value in s.split(separator) if value.strip()]
+    if not values:
+        return float(s), None
+    value = float(values[0])
+    timestamp = (float(values[-1]) / 1000) if len(values) > 1 else None
+    return value, timestamp
+
+
+def _parse_sample(text: str) -> Sample:
+    # Detect the labels in the text
+    try:
+        label_start, label_end = text.index("{"), text.rindex("}")
+        # The name is before the labels
+        name = text[:label_start].strip()
+        # We ignore the starting curly brace
+        label = text[label_start + 1:label_end]
+        # The value is after the label end (ignoring curly brace)
+        value, timestamp = _parse_value_and_timestamp(text[label_end + 1:])
+        return Sample(name, _parse_labels(label), value, timestamp)
+
+    # We don't have labels
+    except ValueError:
+        # Detect what separator is used
+        separator = " "
+        if separator not in text:
+            separator = "\t"
+        name_end = text.index(separator)
+        name = text[:name_end]
+        # The value is after the name
+        value, timestamp = _parse_value_and_timestamp(text[name_end:])
+        return Sample(name, {}, value, timestamp)
+
+
+def text_fd_to_metric_families(fd: TextIO) -> Iterable[Metric]:
+    """Parse Prometheus text format from a file descriptor.
+
+    This is a laxer parser than the main Go parser,
+    so successful parsing does not imply that the parsed
+    text meets the specification.
+
+    Yields Metric's.
+    """
+    name = ''
+    documentation = ''
+    typ = 'untyped'
+    samples: List[Sample] = []
+    allowed_names = []
+
+    def build_metric(name: str, documentation: str, typ: str, samples: List[Sample]) -> Metric:
+        # Munge counters into OpenMetrics representation
+        # used internally.
+        if typ == 'counter':
+            if name.endswith('_total'):
+                name = name[:-6]
+            else:
+                new_samples = []
+                for s in samples:
+                    new_samples.append(Sample(s[0] + '_total', *s[1:]))
+                samples = new_samples
+        metric = Metric(name, documentation, typ)
+        metric.samples = samples
+        return metric
+
+    for line in fd:
+        line = line.strip()
+
+        if line.startswith('#'):
+            parts = line.split(None, 3)
+            if len(parts) < 2:
+                continue
+            if parts[1] == 'HELP':
+                if parts[2] != name:
+                    if name != '':
+                        yield build_metric(name, documentation, typ, samples)
+                    # New metric
+                    name = parts[2]
+                    typ = 'untyped'
+                    samples = []
+                    allowed_names = [parts[2]]
+                if len(parts) == 4:
+                    documentation = _replace_help_escaping(parts[3])
+                else:
+                    documentation = ''
+            elif parts[1] == 'TYPE':
+                if parts[2] != name:
+                    if name != '':
+                        yield build_metric(name, documentation, typ, samples)
+                    # New metric
+                    name = parts[2]
+                    documentation = ''
+                    samples = []
+                typ = parts[3]
+                allowed_names = {
+                    'counter': [''],
+                    'gauge': [''],
+                    'summary': ['_count', '_sum', ''],
+                    'histogram': ['_count', '_sum', '_bucket'],
+                }.get(typ, [''])
+                allowed_names = [name + n for n in allowed_names]
+            else:
+                # Ignore other comment tokens
+                pass
+        elif line == '':
+            # Ignore blank lines
+            pass
+        else:
+            sample = _parse_sample(line)
+            if sample.name not in allowed_names:
+                if name != '':
+                    yield build_metric(name, documentation, typ, samples)
+                # New metric, yield immediately as untyped singleton
+                name = ''
+                documentation = ''
+                typ = 'untyped'
+                samples = []
+                allowed_names = []
+                yield build_metric(sample[0], documentation, typ, [sample])
+            else:
+                samples.append(sample)
+
+    if name != '':
+        yield build_metric(name, documentation, typ, samples)
```
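
For orientation, here is a minimal usage sketch of the public entry point this file adds, `text_string_to_metric_families`. The exposition text, metric names, and label values below are invented for illustration; it assumes `prometheus_client` is importable from this virtualenv. Note how `build_metric` munges counters into the OpenMetrics-style family name: the family is reported without the `_total` suffix while each sample keeps it.

```python
from prometheus_client.parser import text_string_to_metric_families

# Hypothetical exposition text; names and values are illustrative only.
exposition = """\
# HELP http_requests_total Total HTTP requests.
# TYPE http_requests_total counter
http_requests_total{method="get",code="200"} 1027 1395066363000
http_requests_total{method="post",code="200"} 3
# TYPE temperature_celsius gauge
temperature_celsius 36.6
"""

for family in text_string_to_metric_families(exposition):
    # The counter family is reported as 'http_requests', while its
    # samples keep the 'http_requests_total' name.
    print(family.name, family.type, repr(family.documentation))
    for sample in family.samples:
        # Sample carries name, labels, value, and an optional timestamp.
        print("  ", sample.name, sample.labels, sample.value, sample.timestamp)
```

The millisecond timestamp on the first sample comes back as `1395066363.0` seconds, because `_parse_value_and_timestamp` divides the trailing field by 1000; samples without a timestamp report `None`.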
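
Label values may contain escaped quotes, backslashes, and newlines; `_parse_labels` finds the last unescaped quote with `_is_character_escaped` and then unescapes the value via `ESCAPING_RE`. A small sketch of that behaviour through the public API, with an invented metric line:

```python
from prometheus_client.parser import text_string_to_metric_families

# Invented sample line: the label value contains an escaped quote and an
# escaped newline, and the trailing field is a millisecond timestamp.
line = 'msg_total{text="say \\"hi\\"\\nthen stop"} 4 1700000000000\n'

family = next(text_string_to_metric_families(line))
sample = family.samples[0]
print(repr(sample.labels["text"]))  # 'say "hi"\nthen stop' (a real newline)
print(sample.timestamp)             # 1700000000.0
```

Because the line has no preceding `# HELP` or `# TYPE` comment, the parser yields it immediately as a single-sample untyped family, as in the final `else` branch of `text_fd_to_metric_families`.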