about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/strictyaml/compound.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/strictyaml/compound.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/strictyaml/compound.py')
-rw-r--r--.venv/lib/python3.12/site-packages/strictyaml/compound.py320
1 files changed, 320 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/strictyaml/compound.py b/.venv/lib/python3.12/site-packages/strictyaml/compound.py
new file mode 100644
index 00000000..10899bec
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/strictyaml/compound.py
@@ -0,0 +1,320 @@
+from strictyaml.exceptions import YAMLSerializationError, InvalidOptionalDefault
+from strictyaml.validators import Validator, MapValidator, SeqValidator
+from strictyaml.ruamel.comments import CommentedMap, CommentedSeq
+from strictyaml.representation import YAML
+from strictyaml.scalar import ScalarValidator, Str
+from strictyaml.yamllocation import YAMLChunk
+import sys
+
+
+if sys.version_info[0] == 3:
+    unicode = str
+
+
+class Optional(object):
+    def __init__(self, key, default=None, drop_if_none=True):
+        self.key = key
+        self.default = default
+        self.drop_if_none = drop_if_none
+
+    def __repr__(self):
+        # TODO: Add default
+        return 'Optional("{0}")'.format(self.key)
+
+
+class MapPattern(MapValidator):
+    def __init__(
+        self, key_validator, value_validator, minimum_keys=None, maximum_keys=None
+    ):
+        self._key_validator = key_validator
+        self._value_validator = value_validator
+        self._maximum_keys = maximum_keys
+        self._minimum_keys = minimum_keys
+        assert isinstance(
+            self._key_validator, ScalarValidator
+        ), "key_validator must be ScalarValidator"
+        assert isinstance(
+            self._value_validator, Validator
+        ), "value_validator must be Validator"
+        assert isinstance(
+            maximum_keys, (type(None), int)
+        ), "maximum_keys must be an integer"
+        assert isinstance(
+            minimum_keys, (type(None), int)
+        ), "maximum_keys must be an integer"
+
+    @property
+    def key_validator(self):
+        return self._key_validator
+
+    def validate(self, chunk):
+        items = chunk.expect_mapping()
+
+        if self._maximum_keys is not None and len(items) > self._maximum_keys:
+            chunk.expecting_but_found(
+                "while parsing a mapping",
+                "expected a maximum of {0} key{1}, found {2}.".format(
+                    self._maximum_keys,
+                    "s" if self._maximum_keys > 1 else "",
+                    len(items),
+                ),
+            )
+
+        if self._minimum_keys is not None and len(items) < self._minimum_keys:
+            chunk.expecting_but_found(
+                "while parsing a mapping",
+                "expected a minimum of {0} key{1}, found {2}.".format(
+                    self._minimum_keys,
+                    "s" if self._minimum_keys > 1 else "",
+                    len(items),
+                ),
+            )
+
+        for key, value in items:
+            yaml_key = self._key_validator(key)
+            key.process(yaml_key)
+            value.process(self._value_validator(value))
+            chunk.add_key_association(key.contents, yaml_key.data)
+
+    def to_yaml(self, data):
+        self._should_be_mapping(data)
+        # TODO : Maximum minimum keys
+        return CommentedMap(
+            [
+                (self._key_validator.to_yaml(key), self._value_validator.to_yaml(value))
+                for key, value in data.items()
+            ]
+        )
+
+    def __repr__(self):
+        return "MapPattern({0}, {1})".format(
+            repr(self._key_validator), repr(self._value_validator)
+        )
+
+
+class Map(MapValidator):
+    def __init__(self, validator, key_validator=None):
+        self._validator = validator
+        self._key_validator = Str() if key_validator is None else key_validator
+        assert isinstance(
+            self._key_validator, ScalarValidator
+        ), "key validator must be ScalarValidator"
+
+        self._validator_dict = {
+            key.key if isinstance(key, Optional) else key: value
+            for key, value in validator.items()
+        }
+
+        self._required_keys = [
+            key for key in validator.keys() if not isinstance(key, Optional)
+        ]
+
+        for key_val, value_val in validator.items():
+            if isinstance(key_val, Optional):
+                if key_val.default is not None and not key_val.drop_if_none:
+                    raise InvalidOptionalDefault(
+                        "If you have a default that isn't None, drop_if_none must be True."
+                    )
+                if key_val.default is not None and key_val.drop_if_none:
+                    try:
+                        value_val.to_yaml(key_val.default)
+                    except YAMLSerializationError as error:
+                        raise InvalidOptionalDefault(
+                            "Optional default for '{}' failed validation:\n  {}".format(
+                                key_val.key, error
+                            )
+                        )
+
+        self._defaults = {
+            key.key: key.default
+            for key in validator.keys()
+            if isinstance(key, Optional)
+            and (key.default is not None or not key.drop_if_none)
+        }
+
+    @property
+    def key_validator(self):
+        return self._key_validator
+
+    def __repr__(self):
+        # TODO : repr key_validator
+        return "Map({{{0}}})".format(
+            ", ".join(
+                [
+                    "{0}: {1}".format(repr(key), repr(value))
+                    for key, value in self._validator.items()
+                ]
+            )
+        )
+
+    def get_validator(self, key):
+        return self._validator_dict[key]
+
+    def unexpected_key(self, key, yaml_key, value, chunk):
+        key.expecting_but_found(
+            "while parsing a mapping",
+            "unexpected key not in schema '{0}'".format(unicode(yaml_key.scalar)),
+        )
+
+    def validate(self, chunk):
+        found_keys = set()
+        items = chunk.expect_mapping()
+
+        for key, value in items:
+            yaml_key = self._key_validator(key)
+
+            if yaml_key.scalar not in self._validator_dict.keys():
+                self.unexpected_key(key, yaml_key, value, chunk)
+
+            value.process(self.get_validator(yaml_key.scalar)(value))
+            key.process(yaml_key)
+            chunk.add_key_association(key.contents, yaml_key.data)
+            found_keys.add(yaml_key.scalar)
+
+        for default_key, default_data in self._defaults.items():
+            if default_key not in [key.contents for key, _ in items]:
+                key_chunk = YAMLChunk(default_key)
+                yaml_key = self._key_validator(key_chunk)
+                strictindex = yaml_key.data
+                value_validator = self.get_validator(default_key)
+                new_value = value_validator(
+                    YAMLChunk(value_validator.to_yaml(default_data))
+                )
+                forked_chunk = chunk.fork(strictindex, new_value)
+                forked_chunk.val(strictindex).process(new_value)
+                updated_value = value_validator(forked_chunk.val(strictindex))
+                updated_value._chunk.make_child_of(chunk.val(strictindex))
+                # marked_up = new_value.as_marked_up()
+                # chunk.contents[chunk.ruamelindex(strictindex)] = marked_up
+                chunk.add_key_association(default_key, strictindex)
+                sp = chunk.strictparsed()
+                if isinstance(sp, YAML):
+                    # Do not trigger __setitem__ validation at this point, as
+                    # we just ran the validator, and
+                    # representation.py:revalidate() doesn't overwrite the
+                    # _validator property until after all values are checked,
+                    # which leads to an exception being raised if it is
+                    # re-checked.
+                    sp._value[yaml_key] = updated_value
+                else:
+                    sp[yaml_key] = updated_value
+
+        if not set(self._required_keys).issubset(found_keys):
+            chunk.while_parsing_found(
+                "a mapping",
+                "required key(s) '{0}' not found".format(
+                    "', '".join(
+                        sorted(list(set(self._required_keys).difference(found_keys)))
+                    )
+                ),
+            )
+
+    def to_yaml(self, data):
+        self._should_be_mapping(data)
+        # TODO : if keys not in list or required keys missing, raise exception.
+        return CommentedMap(
+            [
+                (key, self.get_validator(key).to_yaml(value))
+                for key, value in data.items()
+                if key not in self._defaults.keys()
+                or key in self._defaults.keys()
+                and value != self._defaults[key]
+            ]
+        )
+
+
+class MapCombined(Map):
+    def __init__(self, map_validator, key_validator, value_validator):
+        super(MapCombined, self).__init__(map_validator, key_validator)
+        self._value_validator = value_validator
+
+    def get_validator(self, key):
+        return self._validator_dict.get(key, self._value_validator)
+
+    def unexpected_key(self, key, yaml_key, value, chunk):
+        pass
+
+
+class Seq(SeqValidator):
+    def __init__(self, validator):
+        self._validator = validator
+
+    def __repr__(self):
+        return "Seq({0})".format(repr(self._validator))
+
+    def validate(self, chunk):
+        for item in chunk.expect_sequence():
+            item.process(self._validator(item))
+
+    def to_yaml(self, data):
+        self._should_be_list(data)
+        return CommentedSeq([self._validator.to_yaml(item) for item in data])
+
+
+class FixedSeq(SeqValidator):
+    def __init__(self, validators):
+        self._validators = validators
+        for item in validators:
+            assert isinstance(
+                item, Validator
+            ), "all FixedSeq validators must be Validators"
+
+    def __repr__(self):
+        return "FixedSeq({0})".format(repr(self._validators))
+
+    def validate(self, chunk):
+        sequence = chunk.expect_sequence(
+            "when expecting a sequence of {0} elements".format(len(self._validators))
+        )
+
+        if len(self._validators) != len(sequence):
+            chunk.expecting_but_found(
+                "when expecting a sequence of {0} elements".format(
+                    len(self._validators)
+                ),
+                "found a sequence of {0} elements".format(len(chunk.contents)),
+            )
+
+        for item, validator in zip(sequence, self._validators):
+            item.process(validator(item))
+
+    def to_yaml(self, data):
+        self._should_be_list(data)
+        # TODO : Different length string
+        return CommentedSeq(
+            [validator.to_yaml(item) for item, validator in zip(data, self._validators)]
+        )
+
+
+class UniqueSeq(SeqValidator):
+    def __init__(self, validator):
+        self._validator = validator
+        assert isinstance(
+            self._validator, ScalarValidator
+        ), "UniqueSeq validator must be ScalarValidator"
+
+    def __repr__(self):
+        return "UniqueSeq({0})".format(repr(self._validator))
+
+    def validate(self, chunk):
+        existing_items = set()
+
+        for item in chunk.expect_sequence("when expecting a unique sequence"):
+            if item.contents in existing_items:
+                chunk.while_parsing_found("a sequence", "duplicate found")
+            else:
+                existing_items.add(item.contents)
+                item.process(self._validator(item))
+
+    def to_yaml(self, data):
+        self._should_be_list(data)
+
+        if len(set(data)) < len(data):
+            raise YAMLSerializationError(
+                (
+                    "Expecting all unique items, "
+                    "but duplicates were found in '{}'.".format(data)
+                )
+            )
+
+        return CommentedSeq([self._validator.to_yaml(item) for item in data])