diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/strictyaml/compound.py')
| -rw-r--r-- | .venv/lib/python3.12/site-packages/strictyaml/compound.py | 320 |
1 files changed, 320 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/strictyaml/compound.py b/.venv/lib/python3.12/site-packages/strictyaml/compound.py new file mode 100644 index 00000000..10899bec --- /dev/null +++ b/.venv/lib/python3.12/site-packages/strictyaml/compound.py @@ -0,0 +1,320 @@ +from strictyaml.exceptions import YAMLSerializationError, InvalidOptionalDefault +from strictyaml.validators import Validator, MapValidator, SeqValidator +from strictyaml.ruamel.comments import CommentedMap, CommentedSeq +from strictyaml.representation import YAML +from strictyaml.scalar import ScalarValidator, Str +from strictyaml.yamllocation import YAMLChunk +import sys + + +if sys.version_info[0] == 3: + unicode = str + + +class Optional(object): + def __init__(self, key, default=None, drop_if_none=True): + self.key = key + self.default = default + self.drop_if_none = drop_if_none + + def __repr__(self): + # TODO: Add default + return 'Optional("{0}")'.format(self.key) + + +class MapPattern(MapValidator): + def __init__( + self, key_validator, value_validator, minimum_keys=None, maximum_keys=None + ): + self._key_validator = key_validator + self._value_validator = value_validator + self._maximum_keys = maximum_keys + self._minimum_keys = minimum_keys + assert isinstance( + self._key_validator, ScalarValidator + ), "key_validator must be ScalarValidator" + assert isinstance( + self._value_validator, Validator + ), "value_validator must be Validator" + assert isinstance( + maximum_keys, (type(None), int) + ), "maximum_keys must be an integer" + assert isinstance( + minimum_keys, (type(None), int) + ), "maximum_keys must be an integer" + + @property + def key_validator(self): + return self._key_validator + + def validate(self, chunk): + items = chunk.expect_mapping() + + if self._maximum_keys is not None and len(items) > self._maximum_keys: + chunk.expecting_but_found( + "while parsing a mapping", + "expected a maximum of {0} key{1}, found {2}.".format( + self._maximum_keys, + "s" if self._maximum_keys > 1 else "", + len(items), + ), + ) + + if self._minimum_keys is not None and len(items) < self._minimum_keys: + chunk.expecting_but_found( + "while parsing a mapping", + "expected a minimum of {0} key{1}, found {2}.".format( + self._minimum_keys, + "s" if self._minimum_keys > 1 else "", + len(items), + ), + ) + + for key, value in items: + yaml_key = self._key_validator(key) + key.process(yaml_key) + value.process(self._value_validator(value)) + chunk.add_key_association(key.contents, yaml_key.data) + + def to_yaml(self, data): + self._should_be_mapping(data) + # TODO : Maximum minimum keys + return CommentedMap( + [ + (self._key_validator.to_yaml(key), self._value_validator.to_yaml(value)) + for key, value in data.items() + ] + ) + + def __repr__(self): + return "MapPattern({0}, {1})".format( + repr(self._key_validator), repr(self._value_validator) + ) + + +class Map(MapValidator): + def __init__(self, validator, key_validator=None): + self._validator = validator + self._key_validator = Str() if key_validator is None else key_validator + assert isinstance( + self._key_validator, ScalarValidator + ), "key validator must be ScalarValidator" + + self._validator_dict = { + key.key if isinstance(key, Optional) else key: value + for key, value in validator.items() + } + + self._required_keys = [ + key for key in validator.keys() if not isinstance(key, Optional) + ] + + for key_val, value_val in validator.items(): + if isinstance(key_val, Optional): + if key_val.default is not None and not key_val.drop_if_none: + raise InvalidOptionalDefault( + "If you have a default that isn't None, drop_if_none must be True." + ) + if key_val.default is not None and key_val.drop_if_none: + try: + value_val.to_yaml(key_val.default) + except YAMLSerializationError as error: + raise InvalidOptionalDefault( + "Optional default for '{}' failed validation:\n {}".format( + key_val.key, error + ) + ) + + self._defaults = { + key.key: key.default + for key in validator.keys() + if isinstance(key, Optional) + and (key.default is not None or not key.drop_if_none) + } + + @property + def key_validator(self): + return self._key_validator + + def __repr__(self): + # TODO : repr key_validator + return "Map({{{0}}})".format( + ", ".join( + [ + "{0}: {1}".format(repr(key), repr(value)) + for key, value in self._validator.items() + ] + ) + ) + + def get_validator(self, key): + return self._validator_dict[key] + + def unexpected_key(self, key, yaml_key, value, chunk): + key.expecting_but_found( + "while parsing a mapping", + "unexpected key not in schema '{0}'".format(unicode(yaml_key.scalar)), + ) + + def validate(self, chunk): + found_keys = set() + items = chunk.expect_mapping() + + for key, value in items: + yaml_key = self._key_validator(key) + + if yaml_key.scalar not in self._validator_dict.keys(): + self.unexpected_key(key, yaml_key, value, chunk) + + value.process(self.get_validator(yaml_key.scalar)(value)) + key.process(yaml_key) + chunk.add_key_association(key.contents, yaml_key.data) + found_keys.add(yaml_key.scalar) + + for default_key, default_data in self._defaults.items(): + if default_key not in [key.contents for key, _ in items]: + key_chunk = YAMLChunk(default_key) + yaml_key = self._key_validator(key_chunk) + strictindex = yaml_key.data + value_validator = self.get_validator(default_key) + new_value = value_validator( + YAMLChunk(value_validator.to_yaml(default_data)) + ) + forked_chunk = chunk.fork(strictindex, new_value) + forked_chunk.val(strictindex).process(new_value) + updated_value = value_validator(forked_chunk.val(strictindex)) + updated_value._chunk.make_child_of(chunk.val(strictindex)) + # marked_up = new_value.as_marked_up() + # chunk.contents[chunk.ruamelindex(strictindex)] = marked_up + chunk.add_key_association(default_key, strictindex) + sp = chunk.strictparsed() + if isinstance(sp, YAML): + # Do not trigger __setitem__ validation at this point, as + # we just ran the validator, and + # representation.py:revalidate() doesn't overwrite the + # _validator property until after all values are checked, + # which leads to an exception being raised if it is + # re-checked. + sp._value[yaml_key] = updated_value + else: + sp[yaml_key] = updated_value + + if not set(self._required_keys).issubset(found_keys): + chunk.while_parsing_found( + "a mapping", + "required key(s) '{0}' not found".format( + "', '".join( + sorted(list(set(self._required_keys).difference(found_keys))) + ) + ), + ) + + def to_yaml(self, data): + self._should_be_mapping(data) + # TODO : if keys not in list or required keys missing, raise exception. + return CommentedMap( + [ + (key, self.get_validator(key).to_yaml(value)) + for key, value in data.items() + if key not in self._defaults.keys() + or key in self._defaults.keys() + and value != self._defaults[key] + ] + ) + + +class MapCombined(Map): + def __init__(self, map_validator, key_validator, value_validator): + super(MapCombined, self).__init__(map_validator, key_validator) + self._value_validator = value_validator + + def get_validator(self, key): + return self._validator_dict.get(key, self._value_validator) + + def unexpected_key(self, key, yaml_key, value, chunk): + pass + + +class Seq(SeqValidator): + def __init__(self, validator): + self._validator = validator + + def __repr__(self): + return "Seq({0})".format(repr(self._validator)) + + def validate(self, chunk): + for item in chunk.expect_sequence(): + item.process(self._validator(item)) + + def to_yaml(self, data): + self._should_be_list(data) + return CommentedSeq([self._validator.to_yaml(item) for item in data]) + + +class FixedSeq(SeqValidator): + def __init__(self, validators): + self._validators = validators + for item in validators: + assert isinstance( + item, Validator + ), "all FixedSeq validators must be Validators" + + def __repr__(self): + return "FixedSeq({0})".format(repr(self._validators)) + + def validate(self, chunk): + sequence = chunk.expect_sequence( + "when expecting a sequence of {0} elements".format(len(self._validators)) + ) + + if len(self._validators) != len(sequence): + chunk.expecting_but_found( + "when expecting a sequence of {0} elements".format( + len(self._validators) + ), + "found a sequence of {0} elements".format(len(chunk.contents)), + ) + + for item, validator in zip(sequence, self._validators): + item.process(validator(item)) + + def to_yaml(self, data): + self._should_be_list(data) + # TODO : Different length string + return CommentedSeq( + [validator.to_yaml(item) for item, validator in zip(data, self._validators)] + ) + + +class UniqueSeq(SeqValidator): + def __init__(self, validator): + self._validator = validator + assert isinstance( + self._validator, ScalarValidator + ), "UniqueSeq validator must be ScalarValidator" + + def __repr__(self): + return "UniqueSeq({0})".format(repr(self._validator)) + + def validate(self, chunk): + existing_items = set() + + for item in chunk.expect_sequence("when expecting a unique sequence"): + if item.contents in existing_items: + chunk.while_parsing_found("a sequence", "duplicate found") + else: + existing_items.add(item.contents) + item.process(self._validator(item)) + + def to_yaml(self, data): + self._should_be_list(data) + + if len(set(data)) < len(data): + raise YAMLSerializationError( + ( + "Expecting all unique items, " + "but duplicates were found in '{}'.".format(data) + ) + ) + + return CommentedSeq([self._validator.to_yaml(item) for item in data]) |
