1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
|
from strictyaml.ruamel.comments import CommentedSeq, CommentedMap
from strictyaml.exceptions import YAMLValidationError
from strictyaml.yamlpointer import YAMLPointer
from strictyaml import utils
from copy import deepcopy, copy
import sys
# Python 3 has no ``unicode`` builtin; alias it to ``str`` so the rest of this
# module can call ``unicode(...)`` regardless of the major version.
if sys.version_info[0] == 3:
    unicode = str
class YAMLChunk(object):
    """
    Represents a section of the document with references to the ruamel
    parsed document and the strictparsed document.

    Most operations done by validators on the document are done using this
    object.

    Before validation the strictparsed document will be identical to the
    ruamelparsed document. After it will contain CommentedMaps, CommentedSeqs
    and YAML objects.
    """

    def __init__(
        self,
        ruamelparsed,
        pointer=None,
        label=None,
        strictparsed=None,
        key_association=None,
    ):
        """
        Create a chunk over a parsed document.

        ruamelparsed -- the whole ruamel-parsed document.
        pointer -- location of this chunk within the document; a fresh
            YAMLPointer() is used when omitted.
        label -- optional label, stored as-is and exposed via ``label``.
        strictparsed -- the strict document; when omitted a deep copy of
            ``ruamelparsed`` is made so the two can diverge independently.
        key_association -- dict mapping strictparsed key names to
            ruamelparsed key names; a new empty dict is used when omitted.
        """
        self._ruamelparsed = ruamelparsed
        self._strictparsed = (
            deepcopy(ruamelparsed) if strictparsed is None else strictparsed
        )
        self._pointer = pointer if pointer is not None else YAMLPointer()
        self._label = label

        # Associates strictparsed key names with ruamelparsed key names
        # E.g. "my-key-name" -> "My Key name"
        self._key_association = {} if key_association is None else key_association

    def expecting_but_found(self, expecting, found=None):
        """
        Raise YAMLValidationError for this chunk.

        ``expecting`` is the context message. ``found`` is the problem
        message and defaults to "found <description>", where the description
        comes from ``self.found()``.
        """
        raise YAMLValidationError(
            expecting,
            found if found is not None else "found {0}".format(self.found()),
            self,
        )

    def while_parsing_found(self, what, found=None):
        """
        Raise YAMLValidationError with a "while parsing <what>" context.
        """
        self.expecting_but_found("while parsing {0}".format(what), found=found)

    def process(self, new_item):
        """
        Replace the item this chunk points at, in the strict document, with
        ``new_item``.

        The container is looked up through the pointer's parent. If the
        container object carries a ``_value`` attribute, that attribute is
        the structure that gets modified.
        """
        strictparsed = self.pointer.parent().get(self._strictparsed, strictdoc=True)
        current_parsed = (
            strictparsed._value if hasattr(strictparsed, "_value") else strictparsed
        )

        def actual_key_from_string_key(string_key):
            # A key stored as-is in the mapping matches directly.
            if string_key in current_parsed.keys():
                return string_key
            # Otherwise look for a wrapped key (an object carrying `_value`)
            # whose `.text` equals the requested string.
            for key in current_parsed.keys():
                if hasattr(key, "_value") and key.text == string_key:
                    return key
            # No match: callers receive None, as they always have.
            return None

        if self.pointer.is_index():
            current_parsed[self.pointer.last_index] = new_item
        elif self.pointer.is_val():
            current_parsed[
                actual_key_from_string_key(self.pointer.last_regularkey)
            ] = new_item
        elif self.pointer.is_key():
            # Swap the key itself: remove the old entry and re-insert its
            # value under the new key.
            key = actual_key_from_string_key(self.pointer.last_regularkey)
            existing_val = current_parsed[key]
            del current_parsed[key]
            current_parsed[new_item] = existing_val

    def is_sequence(self):
        """Return True if this chunk's contents are a CommentedSeq."""
        return isinstance(self.contents, CommentedSeq)

    def is_mapping(self):
        """Return True if this chunk's contents are a CommentedMap."""
        return isinstance(self.contents, CommentedMap)

    def is_scalar(self):
        """Return True if the contents are neither a mapping nor a sequence."""
        return not isinstance(self.contents, (CommentedMap, CommentedSeq))

    def found(self):
        """
        Return a short description of the contents, for use in error messages.
        """
        if self.is_sequence():
            return "a sequence"
        elif self.is_mapping():
            return "a mapping"
        elif self.contents == "":
            return "a blank string"
        elif utils.is_integer(self.contents):
            return "an arbitrary integer"
        elif utils.is_decimal(self.contents):
            return "an arbitrary number"
        else:
            return "arbitrary text"

    def expect_sequence(self, expecting="when expecting a sequence"):
        """
        Return one chunk per item of this sequence, in order.

        Raises YAMLValidationError (with context ``expecting``) if the
        contents are not a sequence.
        """
        if not self.is_sequence():
            # The default "found ..." message describes the actual contents.
            self.expecting_but_found(expecting)
        return [self.index(i) for i in range(len(self.contents))]

    def expect_mapping(self):
        """
        Return a list of (key chunk, value chunk) pairs for this mapping.

        Keys of the ruamel document are paired positionally with keys of the
        strict document. Raises YAMLValidationError if the contents are not
        a mapping.
        """
        if not self.is_mapping():
            self.expecting_but_found("when expecting a mapping")
        return [
            (
                self.key(regular_key, unicode(validated_key)),
                self.val(unicode(validated_key)),
            )
            for (regular_key, validated_key) in zip(
                self.contents.keys(), self.strictparsed().keys()
            )
        ]

    def expect_scalar(self, what):
        """
        Raise YAMLValidationError ("when expecting <what>") unless the
        contents are a scalar.
        """
        if not self.is_scalar():
            self.expecting_but_found("when expecting {0}".format(what))

    @property
    def label(self):
        """The label given at construction time."""
        return self._label

    @property
    def whole_document(self):
        """The whole ruamel-parsed document this chunk belongs to."""
        return self._ruamelparsed

    @property
    def pointer(self):
        """The pointer locating this chunk within the document."""
        return self._pointer

    def fork(self, strictindex, new_value):
        """
        Return a chunk referring to the same location in a duplicated document.

        Used when modifying a YAML chunk so that the modification can be validated
        before changing it.

        The ruamel document is deep-copied and the fork's strict document is
        rebuilt from that copy. ``new_value.as_marked_up()`` is then stored
        under ``strictindex`` in both documents of the fork.
        """
        forked_chunk = YAMLChunk(
            deepcopy(self._ruamelparsed),
            pointer=self.pointer,
            label=self.label,
            key_association=copy(self._key_association),
        )
        if self.is_scalar():
            # Necessary for e.g. EmptyDict, which reports as a scalar.
            forked_chunk.pointer.set(forked_chunk, "_ruamelparsed", CommentedMap())
            forked_chunk.pointer.set(
                forked_chunk, "_strictparsed", CommentedMap(), strictdoc=True
            )
        forked_chunk.contents[self.ruamelindex(strictindex)] = new_value.as_marked_up()
        forked_chunk.strictparsed()[strictindex] = deepcopy(new_value.as_marked_up())
        return forked_chunk

    def add_key_association(self, unprocessed_key, processed_key):
        """
        Record that ``processed_key`` (strict document) corresponds to
        ``unprocessed_key`` (ruamel document).
        """
        self._key_association[processed_key] = unprocessed_key

    @property
    def key_association(self):
        """Dict mapping strictparsed key names to ruamelparsed key names."""
        return self._key_association

    def make_child_of(self, chunk):
        """
        Link one YAML chunk to another.

        Used when inserting a chunk of YAML into another chunk.

        Mappings and sequences recurse into their items; a scalar re-parents
        its own pointer under ``chunk.pointer``.
        """
        if self.is_mapping():
            for key in self.contents.keys():
                self.key(key, key).pointer.make_child_of(chunk.pointer)
                self.val(key).make_child_of(chunk)
        elif self.is_sequence():
            for index in range(len(self.contents)):
                self.index(index).make_child_of(chunk)
        else:
            self.pointer.make_child_of(chunk.pointer)

    def _select(self, pointer):
        """
        Get a YAMLChunk referenced by a pointer.

        The new chunk shares both parsed documents and the label with this
        one, but receives a shallow copy of the key associations.
        """
        return YAMLChunk(
            self._ruamelparsed,
            pointer=pointer,
            label=self._label,
            strictparsed=self._strictparsed,
            key_association=copy(self._key_association),
        )

    def index(self, strictindex):
        """
        Return a chunk in a sequence referenced by index.
        """
        return self._select(self._pointer.index(self.ruamelindex(strictindex)))

    def ruamelindex(self, strictindex):
        """
        Get the ruamel equivalent of a strict parsed index.

        For a mapping the key association is consulted, falling back to the
        index itself, e.g. parsed-via-slugify -> Parsed via slugify.
        For anything else the index is returned unchanged, e.g. 0 -> 0.
        """
        return (
            self.key_association.get(strictindex, strictindex)
            if self.is_mapping()
            else strictindex
        )

    def val(self, strictkey):
        """
        Return a chunk referencing the value stored in a mapping under the
        strict key 'strictkey'.
        """
        ruamelkey = self.ruamelindex(strictkey)
        return self._select(self._pointer.val(ruamelkey, strictkey))

    def key(self, key, strictkey=None):
        """
        Return a chunk referencing a key in a mapping with the name 'key'.
        """
        return self._select(self._pointer.key(key, strictkey))

    def textslice(self, start, end):
        """
        Return a chunk referencing a slice of a scalar text value.
        """
        return self._select(self._pointer.textslice(start, end))

    def start_line(self):
        """Return the pointer's start line within the ruamel document."""
        return self._pointer.start_line(self._ruamelparsed)

    def end_line(self):
        """Return the pointer's end line within the ruamel document."""
        return self._pointer.end_line(self._ruamelparsed)

    def lines(self):
        """Return the pointer's lines within the ruamel document."""
        return self._pointer.lines(self._ruamelparsed)

    def lines_before(self, how_many):
        """Return ``how_many`` lines before this chunk, via the pointer."""
        return self._pointer.lines_before(self._ruamelparsed, how_many)

    def lines_after(self, how_many):
        """Return ``how_many`` lines after this chunk, via the pointer."""
        return self._pointer.lines_after(self._ruamelparsed, how_many)

    @property
    def contents(self):
        """The part of the ruamel document this chunk points at."""
        return self._pointer.get(self._ruamelparsed)

    def strictparsed(self):
        """Return the part of the strict document this chunk points at."""
        return self._pointer.get(self._strictparsed, strictdoc=True)
|