aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/prometheus_client/parser.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/prometheus_client/parser.py')
-rw-r--r--.venv/lib/python3.12/site-packages/prometheus_client/parser.py225
1 files changed, 225 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/prometheus_client/parser.py b/.venv/lib/python3.12/site-packages/prometheus_client/parser.py
new file mode 100644
index 00000000..dc8e30df
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/prometheus_client/parser.py
@@ -0,0 +1,225 @@
+import io as StringIO
+import re
+from typing import Dict, Iterable, List, Match, Optional, TextIO, Tuple
+
+from .metrics_core import Metric
+from .samples import Sample
+
+
+def text_string_to_metric_families(text: str) -> Iterable[Metric]:
+ """Parse Prometheus text format from a unicode string.
+
+ See text_fd_to_metric_families.
+ """
+ yield from text_fd_to_metric_families(StringIO.StringIO(text))
+
+
+ESCAPE_SEQUENCES = {
+ '\\\\': '\\',
+ '\\n': '\n',
+ '\\"': '"',
+}
+
+
+def replace_escape_sequence(match: Match[str]) -> str:
+ return ESCAPE_SEQUENCES[match.group(0)]
+
+
+HELP_ESCAPING_RE = re.compile(r'\\[\\n]')
+ESCAPING_RE = re.compile(r'\\[\\n"]')
+
+
+def _replace_help_escaping(s: str) -> str:
+ return HELP_ESCAPING_RE.sub(replace_escape_sequence, s)
+
+
+def _replace_escaping(s: str) -> str:
+ return ESCAPING_RE.sub(replace_escape_sequence, s)
+
+
+def _is_character_escaped(s: str, charpos: int) -> bool:
+ num_bslashes = 0
+ while (charpos > num_bslashes
+ and s[charpos - 1 - num_bslashes] == '\\'):
+ num_bslashes += 1
+ return num_bslashes % 2 == 1
+
+
+def _parse_labels(labels_string: str) -> Dict[str, str]:
+ labels: Dict[str, str] = {}
+ # Return if we don't have valid labels
+ if "=" not in labels_string:
+ return labels
+
+ escaping = False
+ if "\\" in labels_string:
+ escaping = True
+
+ # Copy original labels
+ sub_labels = labels_string
+ try:
+ # Process one label at a time
+ while sub_labels:
+ # The label name is before the equal
+ value_start = sub_labels.index("=")
+ label_name = sub_labels[:value_start]
+ sub_labels = sub_labels[value_start + 1:].lstrip()
+ # Find the first quote after the equal
+ quote_start = sub_labels.index('"') + 1
+ value_substr = sub_labels[quote_start:]
+
+ # Find the last unescaped quote
+ i = 0
+ while i < len(value_substr):
+ i = value_substr.index('"', i)
+ if not _is_character_escaped(value_substr, i):
+ break
+ i += 1
+
+ # The label value is between the first and last quote
+ quote_end = i + 1
+ label_value = sub_labels[quote_start:quote_end]
+ # Replace escaping if needed
+ if escaping:
+ label_value = _replace_escaping(label_value)
+ labels[label_name.strip()] = label_value
+
+ # Remove the processed label from the sub-slice for next iteration
+ sub_labels = sub_labels[quote_end + 1:]
+ next_comma = sub_labels.find(",") + 1
+ sub_labels = sub_labels[next_comma:].lstrip()
+
+ return labels
+
+ except ValueError:
+ raise ValueError("Invalid labels: %s" % labels_string)
+
+
+# If we have multiple values only consider the first
+def _parse_value_and_timestamp(s: str) -> Tuple[float, Optional[float]]:
+ s = s.lstrip()
+ separator = " "
+ if separator not in s:
+ separator = "\t"
+ values = [value.strip() for value in s.split(separator) if value.strip()]
+ if not values:
+ return float(s), None
+ value = float(values[0])
+ timestamp = (float(values[-1]) / 1000) if len(values) > 1 else None
+ return value, timestamp
+
+
+def _parse_sample(text: str) -> Sample:
+ # Detect the labels in the text
+ try:
+ label_start, label_end = text.index("{"), text.rindex("}")
+ # The name is before the labels
+ name = text[:label_start].strip()
+ # We ignore the starting curly brace
+ label = text[label_start + 1:label_end]
+ # The value is after the label end (ignoring curly brace)
+ value, timestamp = _parse_value_and_timestamp(text[label_end + 1:])
+ return Sample(name, _parse_labels(label), value, timestamp)
+
+ # We don't have labels
+ except ValueError:
+ # Detect what separator is used
+ separator = " "
+ if separator not in text:
+ separator = "\t"
+ name_end = text.index(separator)
+ name = text[:name_end]
+ # The value is after the name
+ value, timestamp = _parse_value_and_timestamp(text[name_end:])
+ return Sample(name, {}, value, timestamp)
+
+
+def text_fd_to_metric_families(fd: TextIO) -> Iterable[Metric]:
+ """Parse Prometheus text format from a file descriptor.
+
+ This is a laxer parser than the main Go parser,
+ so successful parsing does not imply that the parsed
+ text meets the specification.
+
+ Yields Metric's.
+ """
+ name = ''
+ documentation = ''
+ typ = 'untyped'
+ samples: List[Sample] = []
+ allowed_names = []
+
+ def build_metric(name: str, documentation: str, typ: str, samples: List[Sample]) -> Metric:
+ # Munge counters into OpenMetrics representation
+ # used internally.
+ if typ == 'counter':
+ if name.endswith('_total'):
+ name = name[:-6]
+ else:
+ new_samples = []
+ for s in samples:
+ new_samples.append(Sample(s[0] + '_total', *s[1:]))
+ samples = new_samples
+ metric = Metric(name, documentation, typ)
+ metric.samples = samples
+ return metric
+
+ for line in fd:
+ line = line.strip()
+
+ if line.startswith('#'):
+ parts = line.split(None, 3)
+ if len(parts) < 2:
+ continue
+ if parts[1] == 'HELP':
+ if parts[2] != name:
+ if name != '':
+ yield build_metric(name, documentation, typ, samples)
+ # New metric
+ name = parts[2]
+ typ = 'untyped'
+ samples = []
+ allowed_names = [parts[2]]
+ if len(parts) == 4:
+ documentation = _replace_help_escaping(parts[3])
+ else:
+ documentation = ''
+ elif parts[1] == 'TYPE':
+ if parts[2] != name:
+ if name != '':
+ yield build_metric(name, documentation, typ, samples)
+ # New metric
+ name = parts[2]
+ documentation = ''
+ samples = []
+ typ = parts[3]
+ allowed_names = {
+ 'counter': [''],
+ 'gauge': [''],
+ 'summary': ['_count', '_sum', ''],
+ 'histogram': ['_count', '_sum', '_bucket'],
+ }.get(typ, [''])
+ allowed_names = [name + n for n in allowed_names]
+ else:
+ # Ignore other comment tokens
+ pass
+ elif line == '':
+ # Ignore blank lines
+ pass
+ else:
+ sample = _parse_sample(line)
+ if sample.name not in allowed_names:
+ if name != '':
+ yield build_metric(name, documentation, typ, samples)
+ # New metric, yield immediately as untyped singleton
+ name = ''
+ documentation = ''
+ typ = 'untyped'
+ samples = []
+ allowed_names = []
+ yield build_metric(sample[0], documentation, typ, [sample])
+ else:
+ samples.append(sample)
+
+ if name != '':
+ yield build_metric(name, documentation, typ, samples)