1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
|
from strictyaml.exceptions import YAMLSerializationError, InvalidOptionalDefault
from strictyaml.validators import Validator, MapValidator, SeqValidator
from strictyaml.ruamel.comments import CommentedMap, CommentedSeq
from strictyaml.representation import YAML
from strictyaml.scalar import ScalarValidator, Str
from strictyaml.yamllocation import YAMLChunk
import sys
# Python 3 has no built-in ``unicode`` name; alias it to ``str`` so code in
# this module that calls ``unicode(...)`` runs on both major versions.
if sys.version_info.major == 3:
    unicode = str
class Optional(object):
    """Wrapper marking a mapping schema key as optional.

    Attributes:
        key: the wrapped schema key.
        default: value associated with the key when it is absent
            (``None`` when no default was given).
        drop_if_none: flag stored alongside the default; ``True`` unless
            overridden.
    """

    def __init__(self, key, default=None, drop_if_none=True):
        """Record the wrapped key together with its default settings."""
        self.key, self.default, self.drop_if_none = key, default, drop_if_none

    def __repr__(self):
        """Return ``Optional("<key>")``; the default is not shown."""
        # TODO: Add default
        template = 'Optional("{0}")'
        return template.format(self.key)
class MapPattern(MapValidator):
    """Validate a mapping in which all keys and all values follow one pattern.

    Every key is run through ``key_validator`` and every value through
    ``value_validator``. ``minimum_keys`` and ``maximum_keys`` optionally
    bound the number of entries the mapping may contain.
    """

    def __init__(
        self, key_validator, value_validator, minimum_keys=None, maximum_keys=None
    ):
        """Store the validators and the optional key-count bounds.

        Args:
            key_validator: a ``ScalarValidator`` applied to each key.
            value_validator: a ``Validator`` applied to each value.
            minimum_keys: ``None`` (no lower bound) or an int.
            maximum_keys: ``None`` (no upper bound) or an int.

        Raises:
            AssertionError: if any argument has the wrong type.
        """
        self._key_validator = key_validator
        self._value_validator = value_validator
        self._maximum_keys = maximum_keys
        self._minimum_keys = minimum_keys

        assert isinstance(
            self._key_validator, ScalarValidator
        ), "key_validator must be ScalarValidator"
        assert isinstance(
            self._value_validator, Validator
        ), "value_validator must be Validator"
        assert isinstance(
            maximum_keys, (type(None), int)
        ), "maximum_keys must be an integer"
        # The message used to name maximum_keys here, which pointed users at
        # the wrong argument.
        assert isinstance(
            minimum_keys, (type(None), int)
        ), "minimum_keys must be an integer"

    @property
    def key_validator(self):
        """The validator applied to every key."""
        return self._key_validator

    @staticmethod
    def _key_count_message(bound_name, limit, found):
        """Build the 'expected a <bound> of N key(s), found M.' message."""
        # Only a limit of exactly one is singular ("1 key"); zero is plural.
        plural = "" if limit == 1 else "s"
        return "expected a {0} of {1} key{2}, found {3}.".format(
            bound_name, limit, plural, found
        )

    def validate(self, chunk):
        """Check the key-count bounds, then validate every key and value.

        Bound violations are reported through ``chunk.expecting_but_found``.
        Each validated key and value is written back with ``process`` and the
        raw key is associated with its validated data on ``chunk``.
        """
        items = chunk.expect_mapping()
        found = len(items)

        if self._maximum_keys is not None and found > self._maximum_keys:
            chunk.expecting_but_found(
                "while parsing a mapping",
                self._key_count_message("maximum", self._maximum_keys, found),
            )

        if self._minimum_keys is not None and found < self._minimum_keys:
            chunk.expecting_but_found(
                "while parsing a mapping",
                self._key_count_message("minimum", self._minimum_keys, found),
            )

        for key, value in items:
            yaml_key = self._key_validator(key)
            key.process(yaml_key)
            value.process(self._value_validator(value))
            chunk.add_key_association(key.contents, yaml_key.data)

    def to_yaml(self, data):
        """Serialize a mapping by converting each key and value in turn.

        Returns a ``CommentedMap`` built from the converted pairs. The
        key-count bounds are not enforced here.
        """
        self._should_be_mapping(data)
        # TODO : Maximum minimum keys
        return CommentedMap(
            [
                (self._key_validator.to_yaml(key), self._value_validator.to_yaml(value))
                for key, value in data.items()
            ]
        )

    def __repr__(self):
        """Return ``MapPattern(<key validator>, <value validator>)``."""
        return "MapPattern({0}, {1})".format(
            repr(self._key_validator), repr(self._value_validator)
        )
class Map(MapValidator):
    """Validate a mapping against a fixed schema of named keys.

    The schema is a dict whose keys are either plain keys (required) or
    ``Optional`` wrappers, and whose values are the validators for the
    corresponding mapping values.
    """

    def __init__(self, validator, key_validator=None):
        """Index the schema and check every ``Optional`` default.

        Args:
            validator: dict mapping schema keys (plain or ``Optional``) to
                value validators.
            key_validator: ``ScalarValidator`` applied to each key; defaults
                to ``Str()``.

        Raises:
            AssertionError: if the key validator is not a ScalarValidator.
            InvalidOptionalDefault: if an ``Optional`` has a non-None default
                together with a falsy ``drop_if_none``, or if its default
                cannot be serialized by the key's value validator.
        """
        self._validator = validator
        self._key_validator = Str() if key_validator is None else key_validator

        assert isinstance(
            self._key_validator, ScalarValidator
        ), "key validator must be ScalarValidator"

        # Schema keyed by the bare key, with Optional wrappers unwrapped.
        self._validator_dict = {
            key.key if isinstance(key, Optional) else key: value
            for key, value in validator.items()
        }

        self._required_keys = [
            key for key in validator.keys() if not isinstance(key, Optional)
        ]

        for schema_key, value_validator in validator.items():
            if not isinstance(schema_key, Optional) or schema_key.default is None:
                continue
            if not schema_key.drop_if_none:
                raise InvalidOptionalDefault(
                    "If you have a default that isn't None, drop_if_none must be True."
                )
            try:
                # Serializing the default is how it is checked against the
                # value validator; the result itself is discarded.
                value_validator.to_yaml(schema_key.default)
            except YAMLSerializationError as error:
                raise InvalidOptionalDefault(
                    "Optional default for '{}' failed validation:\n {}".format(
                        schema_key.key, error
                    )
                )

        # Defaults to inject for absent keys: every non-None default, plus a
        # None default when drop_if_none was switched off.
        self._defaults = {
            key.key: key.default
            for key in validator.keys()
            if isinstance(key, Optional)
            and (key.default is not None or not key.drop_if_none)
        }

    @property
    def key_validator(self):
        """The validator applied to every key."""
        return self._key_validator

    def __repr__(self):
        """Return ``Map({<key>: <validator>, ...})`` for the original schema."""
        # TODO : repr key_validator
        return "Map({{{0}}})".format(
            ", ".join(
                [
                    "{0}: {1}".format(repr(key), repr(value))
                    for key, value in self._validator.items()
                ]
            )
        )

    def get_validator(self, key):
        """Return the value validator for ``key``; KeyError if not in schema."""
        return self._validator_dict[key]

    def unexpected_key(self, key, yaml_key, value, chunk):
        """Report a key that is not part of the schema.

        Called by ``validate``; subclasses may override it to accept
        additional keys.
        """
        key.expecting_but_found(
            "while parsing a mapping",
            "unexpected key not in schema '{0}'".format(unicode(yaml_key.scalar)),
        )

    def validate(self, chunk):
        """Validate every entry, inject defaults, then check required keys.

        Keys outside the schema are handed to ``unexpected_key``. Defaults
        for absent optional keys are validated and stored in the strict
        parsed result. Missing required keys are reported through
        ``chunk.while_parsing_found``.
        """
        found_keys = set()
        items = chunk.expect_mapping()

        for key, value in items:
            yaml_key = self._key_validator(key)

            if yaml_key.scalar not in self._validator_dict:
                self.unexpected_key(key, yaml_key, value, chunk)

            value.process(self.get_validator(yaml_key.scalar)(value))
            key.process(yaml_key)
            chunk.add_key_association(key.contents, yaml_key.data)
            found_keys.add(yaml_key.scalar)

        if self._defaults:
            # Raw keys present in the document, collected once instead of
            # once per default.
            # NOTE(review): assumes injecting a default below does not change
            # the raw contents of the key chunks in ``items`` - confirm
            # against YAMLChunk.
            present_keys = [key.contents for key, _ in items]

            for default_key, default_data in self._defaults.items():
                if default_key in present_keys:
                    continue

                key_chunk = YAMLChunk(default_key)
                yaml_key = self._key_validator(key_chunk)
                strictindex = yaml_key.data
                value_validator = self.get_validator(default_key)
                new_value = value_validator(
                    YAMLChunk(value_validator.to_yaml(default_data))
                )
                forked_chunk = chunk.fork(strictindex, new_value)
                forked_chunk.val(strictindex).process(new_value)
                updated_value = value_validator(forked_chunk.val(strictindex))
                updated_value._chunk.make_child_of(chunk.val(strictindex))
                # marked_up = new_value.as_marked_up()
                # chunk.contents[chunk.ruamelindex(strictindex)] = marked_up
                chunk.add_key_association(default_key, strictindex)
                sp = chunk.strictparsed()
                if isinstance(sp, YAML):
                    # Do not trigger __setitem__ validation at this point, as
                    # we just ran the validator, and
                    # representation.py:revalidate() doesn't overwrite the
                    # _validator property until after all values are checked,
                    # which leads to an exception being raised if it is
                    # re-checked.
                    sp._value[yaml_key] = updated_value
                else:
                    sp[yaml_key] = updated_value

        missing_keys = set(self._required_keys).difference(found_keys)
        if missing_keys:
            chunk.while_parsing_found(
                "a mapping",
                "required key(s) '{0}' not found".format(
                    # Format each key so that non-string keys do not make
                    # str.join raise TypeError; strings pass through as-is.
                    "', '".join("{0}".format(key) for key in sorted(missing_keys))
                ),
            )

    def to_yaml(self, data):
        """Serialize a mapping, omitting entries equal to their default.

        Each value is converted by the validator registered for its key and
        the pairs are returned as a ``CommentedMap``.
        """
        self._should_be_mapping(data)
        # TODO : if keys not in list or required keys missing, raise exception.
        return CommentedMap(
            [
                (key, self.get_validator(key).to_yaml(value))
                for key, value in data.items()
                if key not in self._defaults or value != self._defaults[key]
            ]
        )
class MapCombined(Map):
    """A ``Map`` that also accepts keys outside its schema.

    Keys named in the schema use their own validator; every other key falls
    back to a shared ``value_validator``.
    """

    def __init__(self, map_validator, key_validator, value_validator):
        """Build the underlying ``Map`` and remember the fallback validator."""
        super(MapCombined, self).__init__(map_validator, key_validator)
        self._value_validator = value_validator

    def get_validator(self, key):
        """Return the schema validator for ``key``, else the fallback one."""
        if key in self._validator_dict:
            return self._validator_dict[key]
        return self._value_validator

    def unexpected_key(self, key, yaml_key, value, chunk):
        """Accept keys outside the schema: deliberately do nothing."""
        return None
class Seq(SeqValidator):
    """Validate a sequence whose items all share one validator."""

    def __init__(self, validator):
        """Remember the validator applied to every item."""
        self._validator = validator

    def __repr__(self):
        """Return ``Seq(<item validator>)``."""
        inner = repr(self._validator)
        return "Seq({0})".format(inner)

    def validate(self, chunk):
        """Validate each item of the sequence and write the result back."""
        item_validator = self._validator
        for element in chunk.expect_sequence():
            validated = item_validator(element)
            element.process(validated)

    def to_yaml(self, data):
        """Serialize a list by converting each item; returns a CommentedSeq."""
        self._should_be_list(data)
        converted = []
        for entry in data:
            converted.append(self._validator.to_yaml(entry))
        return CommentedSeq(converted)
class FixedSeq(SeqValidator):
    """Validate a fixed-length sequence with one validator per position."""

    def __init__(self, validators):
        """Store the per-position validators.

        Raises:
            AssertionError: if any entry is not a ``Validator``.
        """
        self._validators = validators
        for item in validators:
            assert isinstance(
                item, Validator
            ), "all FixedSeq validators must be Validators"

    def __repr__(self):
        """Return ``FixedSeq(<validators>)``."""
        return "FixedSeq({0})".format(repr(self._validators))

    def validate(self, chunk):
        """Check the sequence length, then validate each item by position.

        A length mismatch is reported through ``chunk.expecting_but_found``.
        """
        expected = len(self._validators)
        sequence = chunk.expect_sequence(
            "when expecting a sequence of {0} elements".format(expected)
        )

        if expected != len(sequence):
            chunk.expecting_but_found(
                "when expecting a sequence of {0} elements".format(expected),
                "found a sequence of {0} elements".format(len(chunk.contents)),
            )

        for item, validator in zip(sequence, self._validators):
            item.process(validator(item))

    def to_yaml(self, data):
        """Serialize a list by converting each item with its own validator.

        Raises:
            YAMLSerializationError: if ``data`` does not have exactly one
                item per validator.
        """
        self._should_be_list(data)

        # zip() stops at the shorter input, so without this check surplus
        # items would be dropped silently from the serialized output.
        if len(data) != len(self._validators):
            raise YAMLSerializationError(
                "Expecting a sequence of {0} elements, "
                "but found {1} elements in '{2}'.".format(
                    len(self._validators), len(data), data
                )
            )

        return CommentedSeq(
            [validator.to_yaml(item) for item, validator in zip(data, self._validators)]
        )
class UniqueSeq(SeqValidator):
    """Validate a sequence of scalars in which no item may repeat."""

    def __init__(self, validator):
        """Remember the item validator, which must be a ``ScalarValidator``."""
        self._validator = validator
        assert isinstance(
            self._validator, ScalarValidator
        ), "UniqueSeq validator must be ScalarValidator"

    def __repr__(self):
        """Return ``UniqueSeq(<item validator>)``."""
        inner = repr(self._validator)
        return "UniqueSeq({0})".format(inner)

    def validate(self, chunk):
        """Validate each item, reporting any raw value seen more than once.

        Duplicates are reported through ``chunk.while_parsing_found``; items
        seen for the first time are validated and written back.
        """
        seen = set()
        for element in chunk.expect_sequence("when expecting a unique sequence"):
            raw = element.contents
            if raw not in seen:
                seen.add(raw)
                element.process(self._validator(element))
            else:
                chunk.while_parsing_found("a sequence", "duplicate found")

    def to_yaml(self, data):
        """Serialize a list of unique items; returns a CommentedSeq.

        Raises:
            YAMLSerializationError: if ``data`` contains duplicates.
        """
        self._should_be_list(data)

        distinct_count = len(set(data))
        if distinct_count < len(data):
            message = (
                "Expecting all unique items, "
                "but duplicates were found in '{}'.".format(data)
            )
            raise YAMLSerializationError(message)

        converted = []
        for entry in data:
            converted.append(self._validator.to_yaml(entry))
        return CommentedSeq(converted)
|