diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/lxml/serializer.pxi')
-rw-r--r-- | .venv/lib/python3.12/site-packages/lxml/serializer.pxi | 1781 |
1 files changed, 1781 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/lxml/serializer.pxi b/.venv/lib/python3.12/site-packages/lxml/serializer.pxi new file mode 100644 index 00000000..f0de0f9f --- /dev/null +++ b/.venv/lib/python3.12/site-packages/lxml/serializer.pxi @@ -0,0 +1,1781 @@ +# XML serialization and output functions + +cdef object GzipFile +from gzip import GzipFile + + +cdef class SerialisationError(LxmlError): + """A libxml2 error that occurred during serialisation. + """ + + +cdef enum _OutputMethods: + OUTPUT_METHOD_XML + OUTPUT_METHOD_HTML + OUTPUT_METHOD_TEXT + + +cdef int _findOutputMethod(method) except -1: + if method is None: + return OUTPUT_METHOD_XML + method = method.lower() + if method == "xml": + return OUTPUT_METHOD_XML + if method == "html": + return OUTPUT_METHOD_HTML + if method == "text": + return OUTPUT_METHOD_TEXT + raise ValueError(f"unknown output method {method!r}") + + +cdef _textToString(xmlNode* c_node, encoding, bint with_tail): + cdef bint needs_conversion + cdef const_xmlChar* c_text + cdef xmlNode* c_text_node + cdef tree.xmlBuffer* c_buffer + cdef int error_result + + c_buffer = tree.xmlBufferCreate() + if c_buffer is NULL: + raise MemoryError() + + with nogil: + error_result = tree.xmlNodeBufGetContent(c_buffer, c_node) + if with_tail: + c_text_node = _textNodeOrSkip(c_node.next) + while c_text_node is not NULL: + tree.xmlBufferWriteChar(c_buffer, <const_char*>c_text_node.content) + c_text_node = _textNodeOrSkip(c_text_node.next) + c_text = tree.xmlBufferContent(c_buffer) + + if error_result < 0 or c_text is NULL: + tree.xmlBufferFree(c_buffer) + raise SerialisationError, "Error during serialisation (out of memory?)" + + try: + needs_conversion = 0 + if encoding is unicode: + needs_conversion = 1 + elif encoding is not None: + # Python prefers lower case encoding names + encoding = encoding.lower() + if encoding not in ('utf8', 'utf-8'): + if encoding == 'ascii': + if isutf8l(c_text, tree.xmlBufferLength(c_buffer)): + # will raise a decode error below + needs_conversion = 1 + else: + needs_conversion = 1 + + if needs_conversion: + text = (<const_char*>c_text)[:tree.xmlBufferLength(c_buffer)].decode('utf8') + if encoding is not unicode: + encoding = _utf8(encoding) + text = python.PyUnicode_AsEncodedString( + text, encoding, 'strict') + else: + text = (<unsigned char*>c_text)[:tree.xmlBufferLength(c_buffer)] + finally: + tree.xmlBufferFree(c_buffer) + return text + + +cdef _tostring(_Element element, encoding, doctype, method, + bint write_xml_declaration, bint write_complete_document, + bint pretty_print, bint with_tail, int standalone): + """Serialize an element to an encoded string representation of its XML + tree. + """ + cdef tree.xmlOutputBuffer* c_buffer + cdef tree.xmlBuf* c_result_buffer + cdef tree.xmlCharEncodingHandler* enchandler + cdef const_char* c_enc + cdef const_xmlChar* c_version + cdef const_xmlChar* c_doctype + cdef int c_method + cdef int error_result + if element is None: + return None + _assertValidNode(element) + c_method = _findOutputMethod(method) + if c_method == OUTPUT_METHOD_TEXT: + return _textToString(element._c_node, encoding, with_tail) + if encoding is None or encoding is unicode: + c_enc = NULL + else: + encoding = _utf8(encoding) + c_enc = _cstr(encoding) + if doctype is None: + c_doctype = NULL + else: + doctype = _utf8(doctype) + c_doctype = _xcstr(doctype) + # it is necessary to *and* find the encoding handler *and* use + # encoding during output + enchandler = tree.xmlFindCharEncodingHandler(c_enc) + if enchandler is NULL and c_enc is not NULL: + if encoding is not None: + encoding = encoding.decode('UTF-8') + raise LookupError, f"unknown encoding: '{encoding}'" + c_buffer = tree.xmlAllocOutputBuffer(enchandler) + if c_buffer is NULL: + tree.xmlCharEncCloseFunc(enchandler) + raise MemoryError() + + with nogil: + _writeNodeToBuffer(c_buffer, element._c_node, c_enc, c_doctype, c_method, + write_xml_declaration, write_complete_document, + pretty_print, with_tail, standalone) + tree.xmlOutputBufferFlush(c_buffer) + if c_buffer.conv is not NULL: + c_result_buffer = c_buffer.conv + else: + c_result_buffer = c_buffer.buffer + + error_result = c_buffer.error + if error_result != xmlerror.XML_ERR_OK: + tree.xmlOutputBufferClose(c_buffer) + _raiseSerialisationError(error_result) + + try: + if encoding is unicode: + result = (<unsigned char*>tree.xmlBufContent( + c_result_buffer))[:tree.xmlBufUse(c_result_buffer)].decode('UTF-8') + else: + result = <bytes>(<unsigned char*>tree.xmlBufContent( + c_result_buffer))[:tree.xmlBufUse(c_result_buffer)] + finally: + error_result = tree.xmlOutputBufferClose(c_buffer) + if error_result == -1: + _raiseSerialisationError(error_result) + return result + +cdef bytes _tostringC14N(element_or_tree, bint exclusive, bint with_comments, inclusive_ns_prefixes): + cdef xmlDoc* c_doc + cdef xmlChar* c_buffer = NULL + cdef int byte_count = -1 + cdef bytes result + cdef _Document doc + cdef _Element element + cdef xmlChar **c_inclusive_ns_prefixes + + if isinstance(element_or_tree, _Element): + _assertValidNode(<_Element>element_or_tree) + doc = (<_Element>element_or_tree)._doc + c_doc = _plainFakeRootDoc(doc._c_doc, (<_Element>element_or_tree)._c_node, 0) + else: + doc = _documentOrRaise(element_or_tree) + _assertValidDoc(doc) + c_doc = doc._c_doc + + c_inclusive_ns_prefixes = _convert_ns_prefixes(c_doc.dict, inclusive_ns_prefixes) if inclusive_ns_prefixes else NULL + try: + with nogil: + byte_count = c14n.xmlC14NDocDumpMemory( + c_doc, NULL, exclusive, c_inclusive_ns_prefixes, with_comments, &c_buffer) + + finally: + _destroyFakeDoc(doc._c_doc, c_doc) + if c_inclusive_ns_prefixes is not NULL: + python.lxml_free(c_inclusive_ns_prefixes) + + if byte_count < 0 or c_buffer is NULL: + if c_buffer is not NULL: + tree.xmlFree(c_buffer) + raise C14NError, "C14N failed" + try: + result = c_buffer[:byte_count] + finally: + tree.xmlFree(c_buffer) + return result + +cdef _raiseSerialisationError(int error_result): + if error_result == xmlerror.XML_ERR_NO_MEMORY: + raise MemoryError() + message = ErrorTypes._getName(error_result) + if message is None: + message = f"unknown error {error_result}" + raise SerialisationError, message + +############################################################ +# low-level serialisation functions + +cdef void _writeDoctype(tree.xmlOutputBuffer* c_buffer, + const_xmlChar* c_doctype) noexcept nogil: + tree.xmlOutputBufferWrite(c_buffer, tree.xmlStrlen(c_doctype), + <const_char*>c_doctype) + tree.xmlOutputBufferWriteString(c_buffer, "\n") + +cdef void _writeNodeToBuffer(tree.xmlOutputBuffer* c_buffer, + xmlNode* c_node, const_char* encoding, const_xmlChar* c_doctype, + int c_method, bint write_xml_declaration, + bint write_complete_document, + bint pretty_print, bint with_tail, + int standalone) noexcept nogil: + cdef xmlNode* c_nsdecl_node + cdef xmlDoc* c_doc = c_node.doc + if write_xml_declaration and c_method == OUTPUT_METHOD_XML: + _writeDeclarationToBuffer(c_buffer, c_doc.version, encoding, standalone) + + # comments/processing instructions before doctype declaration + if write_complete_document and not c_buffer.error and c_doc.intSubset: + _writePrevSiblings(c_buffer, <xmlNode*>c_doc.intSubset, encoding, pretty_print) + + if c_doctype: + _writeDoctype(c_buffer, c_doctype) + # write internal DTD subset, preceding PIs/comments, etc. + if write_complete_document and not c_buffer.error: + if c_doctype is NULL: + _writeDtdToBuffer(c_buffer, c_doc, c_node.name, c_method, encoding) + _writePrevSiblings(c_buffer, c_node, encoding, pretty_print) + + c_nsdecl_node = c_node + if not c_node.parent or c_node.parent.type != tree.XML_DOCUMENT_NODE: + # copy the node and add namespaces from parents + # this is required to make libxml write them + c_nsdecl_node = tree.xmlCopyNode(c_node, 2) + if not c_nsdecl_node: + c_buffer.error = xmlerror.XML_ERR_NO_MEMORY + return + _copyParentNamespaces(c_node, c_nsdecl_node) + + c_nsdecl_node.parent = c_node.parent + c_nsdecl_node.children = c_node.children + c_nsdecl_node.last = c_node.last + + # write node + if c_method == OUTPUT_METHOD_HTML: + tree.htmlNodeDumpFormatOutput( + c_buffer, c_doc, c_nsdecl_node, encoding, pretty_print) + else: + tree.xmlNodeDumpOutput( + c_buffer, c_doc, c_nsdecl_node, 0, pretty_print, encoding) + + if c_nsdecl_node is not c_node: + # clean up + c_nsdecl_node.children = c_nsdecl_node.last = NULL + tree.xmlFreeNode(c_nsdecl_node) + + if c_buffer.error: + return + + # write tail, trailing comments, etc. + if with_tail: + _writeTail(c_buffer, c_node, encoding, c_method, pretty_print) + if write_complete_document: + _writeNextSiblings(c_buffer, c_node, encoding, pretty_print) + if pretty_print: + tree.xmlOutputBufferWrite(c_buffer, 1, "\n") + +cdef void _writeDeclarationToBuffer(tree.xmlOutputBuffer* c_buffer, + const_xmlChar* version, const_char* encoding, + int standalone) noexcept nogil: + if version is NULL: + version = <unsigned char*>"1.0" + tree.xmlOutputBufferWrite(c_buffer, 15, "<?xml version='") + tree.xmlOutputBufferWriteString(c_buffer, <const_char*>version) + tree.xmlOutputBufferWrite(c_buffer, 12, "' encoding='") + tree.xmlOutputBufferWriteString(c_buffer, encoding) + if standalone == 0: + tree.xmlOutputBufferWrite(c_buffer, 20, "' standalone='no'?>\n") + elif standalone == 1: + tree.xmlOutputBufferWrite(c_buffer, 21, "' standalone='yes'?>\n") + else: + tree.xmlOutputBufferWrite(c_buffer, 4, "'?>\n") + +cdef void _writeDtdToBuffer(tree.xmlOutputBuffer* c_buffer, + xmlDoc* c_doc, const_xmlChar* c_root_name, + int c_method, const_char* encoding) noexcept nogil: + cdef tree.xmlDtd* c_dtd + cdef xmlNode* c_node + cdef char* quotechar + c_dtd = c_doc.intSubset + if not c_dtd or not c_dtd.name: + return + + # Name in document type declaration must match the root element tag. + # For XML, case sensitive match, for HTML insensitive. + if c_method == OUTPUT_METHOD_HTML: + if tree.xmlStrcasecmp(c_root_name, c_dtd.name) != 0: + return + else: + if tree.xmlStrcmp(c_root_name, c_dtd.name) != 0: + return + + tree.xmlOutputBufferWrite(c_buffer, 10, "<!DOCTYPE ") + tree.xmlOutputBufferWriteString(c_buffer, <const_char*>c_dtd.name) + + cdef const_xmlChar* public_id = c_dtd.ExternalID + cdef const_xmlChar* sys_url = c_dtd.SystemID + if public_id and public_id[0] == b'\0': + public_id = NULL + if sys_url and sys_url[0] == b'\0': + sys_url = NULL + + if public_id: + tree.xmlOutputBufferWrite(c_buffer, 9, ' PUBLIC "') + tree.xmlOutputBufferWriteString(c_buffer, <const_char*>public_id) + if sys_url: + tree.xmlOutputBufferWrite(c_buffer, 2, '" ') + else: + tree.xmlOutputBufferWrite(c_buffer, 1, '"') + elif sys_url: + tree.xmlOutputBufferWrite(c_buffer, 8, ' SYSTEM ') + + if sys_url: + if tree.xmlStrchr(sys_url, b'"'): + quotechar = '\'' + else: + quotechar = '"' + tree.xmlOutputBufferWrite(c_buffer, 1, quotechar) + tree.xmlOutputBufferWriteString(c_buffer, <const_char*>sys_url) + tree.xmlOutputBufferWrite(c_buffer, 1, quotechar) + + if (not c_dtd.entities and not c_dtd.elements and + not c_dtd.attributes and not c_dtd.notations and + not c_dtd.pentities): + tree.xmlOutputBufferWrite(c_buffer, 2, '>\n') + return + + tree.xmlOutputBufferWrite(c_buffer, 3, ' [\n') + if c_dtd.notations and not c_buffer.error: + c_buf = tree.xmlBufferCreate() + if not c_buf: + c_buffer.error = xmlerror.XML_ERR_NO_MEMORY + return + tree.xmlDumpNotationTable(c_buf, <tree.xmlNotationTable*>c_dtd.notations) + tree.xmlOutputBufferWrite( + c_buffer, tree.xmlBufferLength(c_buf), + <const_char*>tree.xmlBufferContent(c_buf)) + tree.xmlBufferFree(c_buf) + c_node = c_dtd.children + while c_node and not c_buffer.error: + tree.xmlNodeDumpOutput(c_buffer, c_node.doc, c_node, 0, 0, encoding) + c_node = c_node.next + tree.xmlOutputBufferWrite(c_buffer, 3, "]>\n") + +cdef void _writeTail(tree.xmlOutputBuffer* c_buffer, xmlNode* c_node, + const_char* encoding, int c_method, bint pretty_print) noexcept nogil: + "Write the element tail." + c_node = c_node.next + while c_node and not c_buffer.error and c_node.type in ( + tree.XML_TEXT_NODE, tree.XML_CDATA_SECTION_NODE): + if c_method == OUTPUT_METHOD_HTML: + tree.htmlNodeDumpFormatOutput( + c_buffer, c_node.doc, c_node, encoding, pretty_print) + else: + tree.xmlNodeDumpOutput( + c_buffer, c_node.doc, c_node, 0, pretty_print, encoding) + c_node = c_node.next + +cdef void _writePrevSiblings(tree.xmlOutputBuffer* c_buffer, xmlNode* c_node, + const_char* encoding, bint pretty_print) noexcept nogil: + cdef xmlNode* c_sibling + if c_node.parent and _isElement(c_node.parent): + return + # we are at a root node, so add PI and comment siblings + c_sibling = c_node + while c_sibling.prev and \ + (c_sibling.prev.type == tree.XML_PI_NODE or + c_sibling.prev.type == tree.XML_COMMENT_NODE): + c_sibling = c_sibling.prev + while c_sibling is not c_node and not c_buffer.error: + tree.xmlNodeDumpOutput(c_buffer, c_node.doc, c_sibling, 0, + pretty_print, encoding) + if pretty_print: + tree.xmlOutputBufferWriteString(c_buffer, "\n") + c_sibling = c_sibling.next + +cdef void _writeNextSiblings(tree.xmlOutputBuffer* c_buffer, xmlNode* c_node, + const_char* encoding, bint pretty_print) noexcept nogil: + cdef xmlNode* c_sibling + if c_node.parent and _isElement(c_node.parent): + return + # we are at a root node, so add PI and comment siblings + c_sibling = c_node.next + while not c_buffer.error and c_sibling and \ + (c_sibling.type == tree.XML_PI_NODE or + c_sibling.type == tree.XML_COMMENT_NODE): + if pretty_print: + tree.xmlOutputBufferWriteString(c_buffer, "\n") + tree.xmlNodeDumpOutput(c_buffer, c_node.doc, c_sibling, 0, + pretty_print, encoding) + c_sibling = c_sibling.next + + +# copied and adapted from libxml2 (xmlBufAttrSerializeTxtContent()) +cdef _write_attr_string(tree.xmlOutputBuffer* buf, const char *string): + cdef const char *base + cdef const char *cur + + if string == NULL: + return + + base = cur = <const char*>string + while cur[0] != 0: + if cur[0] == b'\n': + if base != cur: + tree.xmlOutputBufferWrite(buf, cur - base, base) + + tree.xmlOutputBufferWrite(buf, 5, " ") + cur += 1 + base = cur + + elif cur[0] == b'\r': + if base != cur: + tree.xmlOutputBufferWrite(buf, cur - base, base) + + tree.xmlOutputBufferWrite(buf, 5, " ") + cur += 1 + base = cur + + elif cur[0] == b'\t': + if base != cur: + tree.xmlOutputBufferWrite(buf, cur - base, base) + + tree.xmlOutputBufferWrite(buf, 4, "	") + cur += 1 + base = cur + + elif cur[0] == b'"': + if base != cur: + tree.xmlOutputBufferWrite(buf, cur - base, base) + + tree.xmlOutputBufferWrite(buf, 6, """) + cur += 1 + base = cur + + elif cur[0] == b'<': + if base != cur: + tree.xmlOutputBufferWrite(buf, cur - base, base) + + tree.xmlOutputBufferWrite(buf, 4, "<") + cur += 1 + base = cur + + elif cur[0] == b'>': + if base != cur: + tree.xmlOutputBufferWrite(buf, cur - base, base) + + tree.xmlOutputBufferWrite(buf, 4, ">") + cur += 1 + base = cur + elif cur[0] == b'&': + if base != cur: + tree.xmlOutputBufferWrite(buf, cur - base, base) + + tree.xmlOutputBufferWrite(buf, 5, "&") + cur += 1 + base = cur + + else: + # Leave further encoding and escaping to the buffer encoder. + cur += 1 + + if base != cur: + tree.xmlOutputBufferWrite(buf, cur - base, base) + + +############################################################ +# output to file-like objects + +cdef object io_open +from io import open as io_open + +cdef object gzip +import gzip + +cdef object getwriter +from codecs import getwriter +cdef object utf8_writer = getwriter('utf8') + +cdef object contextmanager +from contextlib import contextmanager + +cdef object _open_utf8_file + +@contextmanager +def _open_utf8_file(file, compression=0): + file = _getFSPathOrObject(file) + if _isString(file): + if compression: + with gzip.GzipFile(file, mode='wb', compresslevel=compression) as zf: + yield utf8_writer(zf) + else: + with io_open(file, 'w', encoding='utf8') as f: + yield f + else: + if compression: + with gzip.GzipFile(fileobj=file, mode='wb', compresslevel=compression) as zf: + yield utf8_writer(zf) + else: + yield utf8_writer(file) + + +@cython.final +@cython.internal +cdef class _FilelikeWriter: + cdef object _filelike + cdef object _close_filelike + cdef _ExceptionContext _exc_context + cdef _ErrorLog error_log + def __cinit__(self, filelike, exc_context=None, compression=None, close=False): + if compression is not None and compression > 0: + filelike = GzipFile( + fileobj=filelike, mode='wb', compresslevel=compression) + self._close_filelike = filelike.close + elif close: + self._close_filelike = filelike.close + self._filelike = filelike + if exc_context is None: + self._exc_context = _ExceptionContext() + else: + self._exc_context = exc_context + self.error_log = _ErrorLog() + + cdef tree.xmlOutputBuffer* _createOutputBuffer( + self, tree.xmlCharEncodingHandler* enchandler) except NULL: + cdef tree.xmlOutputBuffer* c_buffer + c_buffer = tree.xmlOutputBufferCreateIO( + <tree.xmlOutputWriteCallback>_writeFilelikeWriter, _closeFilelikeWriter, + <python.PyObject*>self, enchandler) + if c_buffer is NULL: + raise IOError, "Could not create I/O writer context." + return c_buffer + + cdef int write(self, char* c_buffer, int size) noexcept: + try: + if self._filelike is None: + raise IOError, "File is already closed" + py_buffer = <bytes>c_buffer[:size] + self._filelike.write(py_buffer) + except: + size = -1 + self._exc_context._store_raised() + finally: + return size # and swallow any further exceptions + + cdef int close(self) noexcept: + retval = 0 + try: + if self._close_filelike is not None: + self._close_filelike() + # we should not close the file here as we didn't open it + self._filelike = None + except: + retval = -1 + self._exc_context._store_raised() + finally: + return retval # and swallow any further exceptions + +cdef int _writeFilelikeWriter(void* ctxt, char* c_buffer, int length) noexcept: + return (<_FilelikeWriter>ctxt).write(c_buffer, length) + +cdef int _closeFilelikeWriter(void* ctxt) noexcept: + return (<_FilelikeWriter>ctxt).close() + +cdef _tofilelike(f, _Element element, encoding, doctype, method, + bint write_xml_declaration, bint write_doctype, + bint pretty_print, bint with_tail, int standalone, + int compression): + cdef _FilelikeWriter writer = None + cdef tree.xmlOutputBuffer* c_buffer + cdef tree.xmlCharEncodingHandler* enchandler + cdef const_char* c_enc + cdef const_xmlChar* c_doctype + cdef int error_result + + c_method = _findOutputMethod(method) + if c_method == OUTPUT_METHOD_TEXT: + data = _textToString(element._c_node, encoding, with_tail) + if compression: + bytes_out = BytesIO() + with GzipFile(fileobj=bytes_out, mode='wb', compresslevel=compression) as gzip_file: + gzip_file.write(data) + data = bytes_out.getvalue() + f = _getFSPathOrObject(f) + if _isString(f): + filename8 = _encodeFilename(f) + with open(filename8, 'wb') as f: + f.write(data) + else: + f.write(data) + return + + if encoding is None: + c_enc = NULL + else: + encoding = _utf8(encoding) + c_enc = _cstr(encoding) + if doctype is None: + c_doctype = NULL + else: + doctype = _utf8(doctype) + c_doctype = _xcstr(doctype) + + writer = _create_output_buffer(f, c_enc, compression, &c_buffer, close=False) + if writer is None: + with nogil: + error_result = _serialise_node( + c_buffer, c_doctype, c_enc, element._c_node, c_method, + write_xml_declaration, write_doctype, pretty_print, with_tail, standalone) + else: + error_result = _serialise_node( + c_buffer, c_doctype, c_enc, element._c_node, c_method, + write_xml_declaration, write_doctype, pretty_print, with_tail, standalone) + + if writer is not None: + writer._exc_context._raise_if_stored() + if error_result != xmlerror.XML_ERR_OK: + _raiseSerialisationError(error_result) + + +cdef int _serialise_node(tree.xmlOutputBuffer* c_buffer, const_xmlChar* c_doctype, + const_char* c_enc, xmlNode* c_node, int c_method, + bint write_xml_declaration, bint write_doctype, bint pretty_print, + bint with_tail, int standalone) noexcept nogil: + _writeNodeToBuffer( + c_buffer, c_node, c_enc, c_doctype, c_method, + write_xml_declaration, write_doctype, pretty_print, with_tail, standalone) + error_result = c_buffer.error + if error_result == xmlerror.XML_ERR_OK: + error_result = tree.xmlOutputBufferClose(c_buffer) + if error_result != -1: + error_result = xmlerror.XML_ERR_OK + else: + tree.xmlOutputBufferClose(c_buffer) + return error_result + + +cdef _FilelikeWriter _create_output_buffer( + f, const_char* c_enc, int c_compression, + tree.xmlOutputBuffer** c_buffer_ret, bint close): + cdef tree.xmlOutputBuffer* c_buffer + cdef _FilelikeWriter writer + cdef bytes filename8 + enchandler = tree.xmlFindCharEncodingHandler(c_enc) + if enchandler is NULL: + raise LookupError( + f"unknown encoding: '{c_enc.decode('UTF-8') if c_enc is not NULL else u''}'") + try: + f = _getFSPathOrObject(f) + if _isString(f): + filename8 = _encodeFilename(f) + if b'%' in filename8 and ( + # Exclude absolute Windows paths and file:// URLs. + _isFilePath(<const xmlChar*>filename8) not in (NO_FILE_PATH, ABS_WIN_FILE_PATH) + or filename8[:7].lower() == b'file://'): + # A file path (not a URL) containing the '%' URL escape character. + # libxml2 uses URL-unescaping on these, so escape the path before passing it in. + filename8 = filename8.replace(b'%', b'%25') + c_buffer = tree.xmlOutputBufferCreateFilename( + _cstr(filename8), enchandler, c_compression) + if c_buffer is NULL: + python.PyErr_SetFromErrno(IOError) # raises IOError + writer = None + elif hasattr(f, 'write'): + writer = _FilelikeWriter(f, compression=c_compression, close=close) + c_buffer = writer._createOutputBuffer(enchandler) + else: + raise TypeError( + f"File or filename expected, got '{python._fqtypename(f).decode('UTF-8')}'") + except: + tree.xmlCharEncCloseFunc(enchandler) + raise + c_buffer_ret[0] = c_buffer + return writer + +cdef xmlChar **_convert_ns_prefixes(tree.xmlDict* c_dict, ns_prefixes) except NULL: + cdef size_t i, num_ns_prefixes = len(ns_prefixes) + # Need to allocate one extra memory block to handle last NULL entry + c_ns_prefixes = <xmlChar **>python.lxml_malloc(num_ns_prefixes + 1, sizeof(xmlChar*)) + if not c_ns_prefixes: + raise MemoryError() + i = 0 + try: + for prefix in ns_prefixes: + prefix_utf = _utf8(prefix) + c_prefix = tree.xmlDictExists(c_dict, _xcstr(prefix_utf), len(prefix_utf)) + if c_prefix: + # unknown prefixes do not need to get serialised + c_ns_prefixes[i] = <xmlChar*>c_prefix + i += 1 + except: + python.lxml_free(c_ns_prefixes) + raise + + c_ns_prefixes[i] = NULL # append end marker + return c_ns_prefixes + +cdef _tofilelikeC14N(f, _Element element, bint exclusive, bint with_comments, + int compression, inclusive_ns_prefixes): + cdef _FilelikeWriter writer = None + cdef tree.xmlOutputBuffer* c_buffer + cdef xmlChar **c_inclusive_ns_prefixes = NULL + cdef char* c_filename + cdef xmlDoc* c_base_doc + cdef xmlDoc* c_doc + cdef int bytes_count, error = 0 + + c_base_doc = element._c_node.doc + c_doc = _fakeRootDoc(c_base_doc, element._c_node) + try: + c_inclusive_ns_prefixes = ( + _convert_ns_prefixes(c_doc.dict, inclusive_ns_prefixes) + if inclusive_ns_prefixes else NULL) + + f = _getFSPathOrObject(f) + if _isString(f): + filename8 = _encodeFilename(f) + c_filename = _cstr(filename8) + with nogil: + error = c14n.xmlC14NDocSave( + c_doc, NULL, exclusive, c_inclusive_ns_prefixes, + with_comments, c_filename, compression) + elif hasattr(f, 'write'): + writer = _FilelikeWriter(f, compression=compression) + c_buffer = writer._createOutputBuffer(NULL) + try: + with writer.error_log: + bytes_count = c14n.xmlC14NDocSaveTo( + c_doc, NULL, exclusive, c_inclusive_ns_prefixes, + with_comments, c_buffer) + finally: + error = tree.xmlOutputBufferClose(c_buffer) + if bytes_count < 0: + error = bytes_count + elif error != -1: + error = xmlerror.XML_ERR_OK + else: + raise TypeError(f"File or filename expected, got '{python._fqtypename(f).decode('UTF-8')}'") + finally: + _destroyFakeDoc(c_base_doc, c_doc) + if c_inclusive_ns_prefixes is not NULL: + python.lxml_free(c_inclusive_ns_prefixes) + + if writer is not None: + writer._exc_context._raise_if_stored() + + if error < 0: + message = "C14N failed" + if writer is not None: + errors = writer.error_log + if len(errors): + message = errors[0].message + raise C14NError(message) + + +# C14N 2.0 + +def canonicalize(xml_data=None, *, out=None, from_file=None, **options): + """Convert XML to its C14N 2.0 serialised form. + + If *out* is provided, it must be a file or file-like object that receives + the serialised canonical XML output (text, not bytes) through its ``.write()`` + method. To write to a file, open it in text mode with encoding "utf-8". + If *out* is not provided, this function returns the output as text string. + + Either *xml_data* (an XML string, tree or Element) or *file* + (a file path or file-like object) must be provided as input. + + The configuration options are the same as for the ``C14NWriterTarget``. + """ + if xml_data is None and from_file is None: + raise ValueError("Either 'xml_data' or 'from_file' must be provided as input") + + sio = None + if out is None: + sio = out = StringIO() + + target = C14NWriterTarget(out.write, **options) + + if xml_data is not None and not isinstance(xml_data, basestring): + _tree_to_target(xml_data, target) + return sio.getvalue() if sio is not None else None + + cdef _FeedParser parser = XMLParser( + target=target, + attribute_defaults=True, + collect_ids=False, + ) + + if xml_data is not None: + parser.feed(xml_data) + parser.close() + elif from_file is not None: + try: + _parseDocument(from_file, parser, base_url=None) + except _TargetParserResult: + pass + + return sio.getvalue() if sio is not None else None + + +cdef _tree_to_target(element, target): + for event, elem in iterwalk(element, events=('start', 'end', 'start-ns', 'comment', 'pi')): + text = None + if event == 'start': + target.start(elem.tag, elem.attrib) + text = elem.text + elif event == 'end': + target.end(elem.tag) + text = elem.tail + elif event == 'start-ns': + target.start_ns(*elem) + continue + elif event == 'comment': + target.comment(elem.text) + text = elem.tail + elif event == 'pi': + target.pi(elem.target, elem.text) + text = elem.tail + if text: + target.data(text) + return target.close() + + +cdef object _looks_like_prefix_name = re.compile(r'^\w+:\w+$', re.UNICODE).match + + +cdef class C14NWriterTarget: + """ + Canonicalization writer target for the XMLParser. + + Serialises parse events to XML C14N 2.0. + + Configuration options: + + - *with_comments*: set to true to include comments + - *strip_text*: set to true to strip whitespace before and after text content + - *rewrite_prefixes*: set to true to replace namespace prefixes by "n{number}" + - *qname_aware_tags*: a set of qname aware tag names in which prefixes + should be replaced in text content + - *qname_aware_attrs*: a set of qname aware attribute names in which prefixes + should be replaced in text content + - *exclude_attrs*: a set of attribute names that should not be serialised + - *exclude_tags*: a set of tag names that should not be serialised + """ + cdef object _write + cdef list _data + cdef set _qname_aware_tags + cdef object _find_qname_aware_attrs + cdef list _declared_ns_stack + cdef list _ns_stack + cdef dict _prefix_map + cdef list _preserve_space + cdef tuple _pending_start + cdef set _exclude_tags + cdef set _exclude_attrs + cdef Py_ssize_t _ignored_depth + cdef bint _with_comments + cdef bint _strip_text + cdef bint _rewrite_prefixes + cdef bint _root_seen + cdef bint _root_done + + def __init__(self, write, *, + with_comments=False, strip_text=False, rewrite_prefixes=False, + qname_aware_tags=None, qname_aware_attrs=None, + exclude_attrs=None, exclude_tags=None): + self._write = write + self._data = [] + self._with_comments = with_comments + self._strip_text = strip_text + self._exclude_attrs = set(exclude_attrs) if exclude_attrs else None + self._exclude_tags = set(exclude_tags) if exclude_tags else None + + self._rewrite_prefixes = rewrite_prefixes + if qname_aware_tags: + self._qname_aware_tags = set(qname_aware_tags) + else: + self._qname_aware_tags = None + if qname_aware_attrs: + self._find_qname_aware_attrs = set(qname_aware_attrs).intersection + else: + self._find_qname_aware_attrs = None + + # Stack with globally and newly declared namespaces as (uri, prefix) pairs. + self._declared_ns_stack = [[ + ("http://www.w3.org/XML/1998/namespace", "xml"), + ]] + # Stack with user declared namespace prefixes as (uri, prefix) pairs. + self._ns_stack = [] + if not rewrite_prefixes: + self._ns_stack.append(_DEFAULT_NAMESPACE_PREFIXES_ITEMS) + self._ns_stack.append([]) + self._prefix_map = {} + self._preserve_space = [False] + self._pending_start = None + self._ignored_depth = 0 + self._root_seen = False + self._root_done = False + + def _iter_namespaces(self, ns_stack): + for namespaces in reversed(ns_stack): + if namespaces: # almost no element declares new namespaces + yield from namespaces + + cdef _resolve_prefix_name(self, prefixed_name): + prefix, name = prefixed_name.split(':', 1) + for uri, p in self._iter_namespaces(self._ns_stack): + if p == prefix: + return f'{{{uri}}}{name}' + raise ValueError(f'Prefix {prefix} of QName "{prefixed_name}" is not declared in scope') + + cdef _qname(self, qname, uri=None): + if uri is None: + uri, tag = qname[1:].rsplit('}', 1) if qname[:1] == '{' else ('', qname) + else: + tag = qname + + prefixes_seen = set() + for u, prefix in self._iter_namespaces(self._declared_ns_stack): + if u == uri and prefix not in prefixes_seen: + return f'{prefix}:{tag}' if prefix else tag, tag, uri + prefixes_seen.add(prefix) + + # Not declared yet => add new declaration. + if self._rewrite_prefixes: + if uri in self._prefix_map: + prefix = self._prefix_map[uri] + else: + prefix = self._prefix_map[uri] = f'n{len(self._prefix_map)}' + self._declared_ns_stack[-1].append((uri, prefix)) + return f'{prefix}:{tag}', tag, uri + + if not uri and '' not in prefixes_seen: + # No default namespace declared => no prefix needed. + return tag, tag, uri + + for u, prefix in self._iter_namespaces(self._ns_stack): + if u == uri: + self._declared_ns_stack[-1].append((uri, prefix)) + return f'{prefix}:{tag}' if prefix else tag, tag, uri + + if not uri: + # As soon as a default namespace is defined, + # anything that has no namespace (and thus, no prefix) goes there. + return tag, tag, uri + + raise ValueError(f'Namespace "{uri}" of name "{tag}" is not declared in scope') + + def data(self, data): + if not self._ignored_depth: + self._data.append(data) + + cdef _flush(self): + cdef unicode data = ''.join(self._data) + del self._data[:] + if self._strip_text and not self._preserve_space[-1]: + data = data.strip() + if self._pending_start is not None: + (tag, attrs, new_namespaces), self._pending_start = self._pending_start, None + qname_text = data if ':' in data and _looks_like_prefix_name(data) else None + self._start(tag, attrs, new_namespaces, qname_text) + if qname_text is not None: + return + if data and self._root_seen: + self._write(_escape_cdata_c14n(data)) + + def start_ns(self, prefix, uri): + if self._ignored_depth: + return + # we may have to resolve qnames in text content + if self._data: + self._flush() + self._ns_stack[-1].append((uri, prefix)) + + def start(self, tag, attrs): + if self._exclude_tags is not None and ( + self._ignored_depth or tag in self._exclude_tags): + self._ignored_depth += 1 + return + if self._data: + self._flush() + + new_namespaces = [] + self._declared_ns_stack.append(new_namespaces) + + if self._qname_aware_tags is not None and tag in self._qname_aware_tags: + # Need to parse text first to see if it requires a prefix declaration. + self._pending_start = (tag, attrs, new_namespaces) + return + self._start(tag, attrs, new_namespaces) + + cdef _start(self, tag, attrs, new_namespaces, qname_text=None): + if self._exclude_attrs is not None and attrs: + attrs = {k: v for k, v in attrs.items() if k not in self._exclude_attrs} + + qnames = {tag, *attrs} + resolved_names = {} + + # Resolve prefixes in attribute and tag text. + if qname_text is not None: + qname = resolved_names[qname_text] = self._resolve_prefix_name(qname_text) + qnames.add(qname) + if self._find_qname_aware_attrs is not None and attrs: + qattrs = self._find_qname_aware_attrs(attrs) + if qattrs: + for attr_name in qattrs: + value = attrs[attr_name] + if _looks_like_prefix_name(value): + qname = resolved_names[value] = self._resolve_prefix_name(value) + qnames.add(qname) + else: + qattrs = None + else: + qattrs = None + + # Assign prefixes in lexicographical order of used URIs. + parsed_qnames = {n: self._qname(n) for n in sorted( + qnames, key=lambda n: n.split('}', 1))} + + # Write namespace declarations in prefix order ... + if new_namespaces: + attr_list = [ + ('xmlns:' + prefix if prefix else 'xmlns', uri) + for uri, prefix in new_namespaces + ] + attr_list.sort() + else: + # almost always empty + attr_list = [] + + # ... followed by attributes in URI+name order + if attrs: + for k, v in sorted(attrs.items()): + if qattrs is not None and k in qattrs and v in resolved_names: + v = parsed_qnames[resolved_names[v]][0] + attr_qname, attr_name, uri = parsed_qnames[k] + # No prefix for attributes in default ('') namespace. + attr_list.append((attr_qname if uri else attr_name, v)) + + # Honour xml:space attributes. + space_behaviour = attrs.get('{http://www.w3.org/XML/1998/namespace}space') + self._preserve_space.append( + space_behaviour == 'preserve' if space_behaviour + else self._preserve_space[-1]) + + # Write the tag. + write = self._write + write('<' + parsed_qnames[tag][0]) + if attr_list: + write(''.join([f' {k}="{_escape_attrib_c14n(v)}"' for k, v in attr_list])) + write('>') + + # Write the resolved qname text content. + if qname_text is not None: + write(_escape_cdata_c14n(parsed_qnames[resolved_names[qname_text]][0])) + + self._root_seen = True + self._ns_stack.append([]) + + def end(self, tag): + if self._ignored_depth: + self._ignored_depth -= 1 + return + if self._data: + self._flush() + self._write(f'</{self._qname(tag)[0]}>') + self._preserve_space.pop() + self._root_done = len(self._preserve_space) == 1 + self._declared_ns_stack.pop() + self._ns_stack.pop() + + def comment(self, text): + if not self._with_comments: + return + if self._ignored_depth: + return + if self._root_done: + self._write('\n') + elif self._root_seen and self._data: + self._flush() + self._write(f'<!--{_escape_cdata_c14n(text)}-->') + if not self._root_seen: + self._write('\n') + + def pi(self, target, data): + if self._ignored_depth: + return + if self._root_done: + self._write('\n') + elif self._root_seen and self._data: + self._flush() + self._write( + f'<?{target} {_escape_cdata_c14n(data)}?>' if data else f'<?{target}?>') + if not self._root_seen: + self._write('\n') + + def close(self): + return None + + +cdef _raise_serialization_error(text): + raise TypeError("cannot serialize %r (type %s)" % (text, type(text).__name__)) + + +cdef unicode _escape_cdata_c14n(stext): + # escape character data + cdef unicode text + cdef Py_UCS4 ch + cdef Py_ssize_t start = 0, pos = 0 + cdef list substrings = None + try: + text = unicode(stext) + except (TypeError, AttributeError): + return _raise_serialization_error(stext) + + for pos, ch in enumerate(text): + if ch == '&': + escape = '&' + elif ch == '<': + escape = '<' + elif ch == '>': + escape = '>' + elif ch == '\r': + escape = '
' + else: + continue + + if substrings is None: + substrings = [] + if pos > start: + substrings.append(text[start:pos]) + substrings.append(escape) + start = pos + 1 + + if substrings is None: + return text + if pos >= start: + substrings.append(text[start:pos+1]) + return ''.join(substrings) + + +cdef unicode _escape_attrib_c14n(stext): + # escape attribute value + cdef unicode text + cdef Py_UCS4 ch + cdef Py_ssize_t start = 0, pos = 0 + cdef list substrings = None + try: + text = unicode(stext) + except (TypeError, AttributeError): + return _raise_serialization_error(stext) + + for pos, ch in enumerate(text): + if ch == '&': + escape = '&' + elif ch == '<': + escape = '<' + elif ch == '"': + escape = '"' + elif ch == '\t': + escape = '	' + elif ch == '\n': + escape = '
' + elif ch == '\r': + escape = '
' + else: + continue + + if substrings is None: + substrings = [] + if pos > start: + substrings.append(text[start:pos]) + substrings.append(escape) + start = pos + 1 + + if substrings is None: + return text + if pos >= start: + substrings.append(text[start:pos+1]) + return ''.join(substrings) + + +# incremental serialisation + +cdef class xmlfile: + """xmlfile(self, output_file, encoding=None, compression=None, close=False, buffered=True) + + A simple mechanism for incremental XML serialisation. + + Usage example:: + + with xmlfile("somefile.xml", encoding='utf-8') as xf: + xf.write_declaration(standalone=True) + xf.write_doctype('<!DOCTYPE root SYSTEM "some.dtd">') + + # generate an element (the root element) + with xf.element('root'): + # write a complete Element into the open root element + xf.write(etree.Element('test')) + + # generate and write more Elements, e.g. through iterparse + for element in generate_some_elements(): + # serialise generated elements into the XML file + xf.write(element) + + # or write multiple Elements or strings at once + xf.write(etree.Element('start'), "text", etree.Element('end')) + + If 'output_file' is a file(-like) object, passing ``close=True`` will + close it when exiting the context manager. By default, it is left + to the owner to do that. When a file path is used, lxml will take care + of opening and closing the file itself. Also, when a compression level + is set, lxml will deliberately close the file to make sure all data gets + compressed and written. + + Setting ``buffered=False`` will flush the output after each operation, + such as opening or closing an ``xf.element()`` block or calling + ``xf.write()``. Alternatively, calling ``xf.flush()`` can be used to + explicitly flush any pending output when buffering is enabled. + """ + cdef object output_file + cdef bytes encoding + cdef _IncrementalFileWriter writer + cdef _AsyncIncrementalFileWriter async_writer + cdef int compresslevel + cdef bint close + cdef bint buffered + cdef int method + + def __init__(self, output_file not None, encoding=None, compression=None, + close=False, buffered=True): + self.output_file = output_file + self.encoding = _utf8orNone(encoding) + self.compresslevel = compression or 0 + self.close = close + self.buffered = buffered + self.method = OUTPUT_METHOD_XML + + def __enter__(self): + assert self.output_file is not None + self.writer = _IncrementalFileWriter( + self.output_file, self.encoding, self.compresslevel, + self.close, self.buffered, self.method) + return self.writer + + def __exit__(self, exc_type, exc_val, exc_tb): + if self.writer is not None: + old_writer, self.writer = self.writer, None + raise_on_error = exc_type is None + old_writer._close(raise_on_error) + if self.close: + self.output_file = None + + async def __aenter__(self): + assert self.output_file is not None + if isinstance(self.output_file, basestring): + raise TypeError("Cannot asynchronously write to a plain file") + if not hasattr(self.output_file, 'write'): + raise TypeError("Output file needs an async .write() method") + self.async_writer = _AsyncIncrementalFileWriter( + self.output_file, self.encoding, self.compresslevel, + self.close, self.buffered, self.method) + return self.async_writer + + async def __aexit__(self, exc_type, exc_val, exc_tb): + if self.async_writer is not None: + old_writer, self.async_writer = self.async_writer, None + raise_on_error = exc_type is None + await old_writer._close(raise_on_error) + if self.close: + self.output_file = None + + +cdef class htmlfile(xmlfile): + """htmlfile(self, output_file, encoding=None, compression=None, close=False, buffered=True) + + A simple mechanism for incremental HTML serialisation. Works the same as + xmlfile. + """ + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.method = OUTPUT_METHOD_HTML + + +cdef enum _IncrementalFileWriterStatus: + WRITER_STARTING = 0 + WRITER_DECL_WRITTEN = 1 + WRITER_DTD_WRITTEN = 2 + WRITER_IN_ELEMENT = 3 + WRITER_FINISHED = 4 + + +@cython.final +@cython.internal +cdef class _IncrementalFileWriter: + cdef tree.xmlOutputBuffer* _c_out + cdef bytes _encoding + cdef const_char* _c_encoding + cdef _FilelikeWriter _target + cdef list _element_stack + cdef int _status + cdef int _method + cdef bint _buffered + + def __cinit__(self, outfile, bytes encoding, int compresslevel, bint close, + bint buffered, int method): + self._status = WRITER_STARTING + self._element_stack = [] + if encoding is None: + # We always need a document encoding to make the attribute serialisation + # of libxml2 identical to ours. + encoding = b'ASCII' + self._encoding = encoding + self._c_encoding = _cstr(encoding) + self._buffered = buffered + self._target = _create_output_buffer( + outfile, self._c_encoding, compresslevel, &self._c_out, close) + self._method = method + + def __dealloc__(self): + if self._c_out is not NULL: + tree.xmlOutputBufferClose(self._c_out) + + def write_declaration(self, version=None, standalone=None, doctype=None): + """write_declaration(self, version=None, standalone=None, doctype=None) + + Write an XML declaration and (optionally) a doctype into the file. + """ + assert self._c_out is not NULL + cdef const_xmlChar* c_version + cdef int c_standalone + if self._method != OUTPUT_METHOD_XML: + raise LxmlSyntaxError("only XML documents have declarations") + if self._status >= WRITER_DECL_WRITTEN: + raise LxmlSyntaxError("XML declaration already written") + version = _utf8orNone(version) + c_version = _xcstr(version) if version is not None else NULL + doctype = _utf8orNone(doctype) + if standalone is None: + c_standalone = -1 + else: + c_standalone = 1 if standalone else 0 + _writeDeclarationToBuffer(self._c_out, c_version, self._c_encoding, c_standalone) + if doctype is not None: + _writeDoctype(self._c_out, _xcstr(doctype)) + self._status = WRITER_DTD_WRITTEN + else: + self._status = WRITER_DECL_WRITTEN + if not self._buffered: + tree.xmlOutputBufferFlush(self._c_out) + self._handle_error(self._c_out.error) + + def write_doctype(self, doctype): + """write_doctype(self, doctype) + + Writes the given doctype declaration verbatimly into the file. + """ + assert self._c_out is not NULL + if doctype is None: + return + if self._status >= WRITER_DTD_WRITTEN: + raise LxmlSyntaxError("DOCTYPE already written or cannot write it here") + doctype = _utf8(doctype) + _writeDoctype(self._c_out, _xcstr(doctype)) + self._status = WRITER_DTD_WRITTEN + if not self._buffered: + tree.xmlOutputBufferFlush(self._c_out) + self._handle_error(self._c_out.error) + + def method(self, method): + """method(self, method) + + Returns a context manager that overrides and restores the output method. + method is one of (None, 'xml', 'html') where None means 'xml'. + """ + assert self._c_out is not NULL + c_method = self._method if method is None else _findOutputMethod(method) + return _MethodChanger(self, c_method) + + def element(self, tag, attrib=None, nsmap=None, method=None, **_extra): + """element(self, tag, attrib=None, nsmap=None, method, **_extra) + + Returns a context manager that writes an opening and closing tag. + method is one of (None, 'xml', 'html') where None means 'xml'. + """ + assert self._c_out is not NULL + attributes = [] + if attrib is not None: + for name, value in _iter_attrib(attrib): + if name not in _extra: + ns, name = _getNsTag(name) + attributes.append((ns, name, _utf8(value))) + if _extra: + for name, value in _extra.iteritems(): + ns, name = _getNsTag(name) + attributes.append((ns, name, _utf8(value))) + reversed_nsmap = {} + if nsmap: + for prefix, ns in nsmap.items(): + if prefix is not None: + prefix = _utf8(prefix) + _prefixValidOrRaise(prefix) + reversed_nsmap[_utf8(ns)] = prefix + ns, name = _getNsTag(tag) + + c_method = self._method if method is None else _findOutputMethod(method) + + return _FileWriterElement(self, (ns, name, attributes, reversed_nsmap), c_method) + + cdef _write_qname(self, bytes name, bytes prefix): + if prefix: # empty bytes for no prefix (not None to allow sorting) + tree.xmlOutputBufferWrite(self._c_out, len(prefix), _cstr(prefix)) + tree.xmlOutputBufferWrite(self._c_out, 1, ':') + tree.xmlOutputBufferWrite(self._c_out, len(name), _cstr(name)) + + cdef _write_start_element(self, element_config): + if self._status > WRITER_IN_ELEMENT: + raise LxmlSyntaxError("cannot append trailing element to complete XML document") + ns, name, attributes, nsmap = element_config + flat_namespace_map, new_namespaces = self._collect_namespaces(nsmap) + prefix = self._find_prefix(ns, flat_namespace_map, new_namespaces) + tree.xmlOutputBufferWrite(self._c_out, 1, '<') + self._write_qname(name, prefix) + + self._write_attributes_and_namespaces( + attributes, flat_namespace_map, new_namespaces) + + tree.xmlOutputBufferWrite(self._c_out, 1, '>') + if not self._buffered: + tree.xmlOutputBufferFlush(self._c_out) + self._handle_error(self._c_out.error) + + self._element_stack.append((ns, name, prefix, flat_namespace_map)) + self._status = WRITER_IN_ELEMENT + + cdef _write_attributes_and_namespaces(self, list attributes, + dict flat_namespace_map, + list new_namespaces): + if attributes: + # _find_prefix() may append to new_namespaces => build them first + attributes = [ + (self._find_prefix(ns, flat_namespace_map, new_namespaces), name, value) + for ns, name, value in attributes ] + if new_namespaces: + new_namespaces.sort() + self._write_attributes_list(new_namespaces) + if attributes: + self._write_attributes_list(attributes) + + cdef _write_attributes_list(self, list attributes): + for prefix, name, value in attributes: + tree.xmlOutputBufferWrite(self._c_out, 1, ' ') + self._write_qname(name, prefix) + tree.xmlOutputBufferWrite(self._c_out, 2, '="') + _write_attr_string(self._c_out, _cstr(value)) + + tree.xmlOutputBufferWrite(self._c_out, 1, '"') + + cdef _write_end_element(self, element_config): + if self._status != WRITER_IN_ELEMENT: + raise LxmlSyntaxError("not in an element") + if not self._element_stack or self._element_stack[-1][:2] != element_config[:2]: + raise LxmlSyntaxError("inconsistent exit action in context manager") + + # If previous write operations failed, the context manager exit might still call us. + # That is ok, but we stop writing closing tags and handling errors in that case. + # For all non-I/O errors, we continue writing closing tags if we can. + ok_to_write = self._c_out.error == xmlerror.XML_ERR_OK + + name, prefix = self._element_stack.pop()[1:3] + if ok_to_write: + tree.xmlOutputBufferWrite(self._c_out, 2, '</') + self._write_qname(name, prefix) + tree.xmlOutputBufferWrite(self._c_out, 1, '>') + + if not self._element_stack: + self._status = WRITER_FINISHED + if ok_to_write: + if not self._buffered: + tree.xmlOutputBufferFlush(self._c_out) + self._handle_error(self._c_out.error) + + cdef _find_prefix(self, bytes href, dict flat_namespaces_map, list new_namespaces): + if href is None: + return None + if href in flat_namespaces_map: + return flat_namespaces_map[href] + # need to create a new prefix + prefixes = flat_namespaces_map.values() + i = 0 + while True: + prefix = _utf8('ns%d' % i) + if prefix not in prefixes: + new_namespaces.append((b'xmlns', prefix, href)) + flat_namespaces_map[href] = prefix + return prefix + i += 1 + + cdef _collect_namespaces(self, dict nsmap): + new_namespaces = [] + flat_namespaces_map = {} + for ns, prefix in nsmap.iteritems(): + flat_namespaces_map[ns] = prefix + if prefix is None: + # use empty bytes rather than None to allow sorting + new_namespaces.append((b'', b'xmlns', ns)) + else: + new_namespaces.append((b'xmlns', prefix, ns)) + # merge in flat namespace map of parent + if self._element_stack: + for ns, prefix in (<dict>self._element_stack[-1][-1]).iteritems(): + if flat_namespaces_map.get(ns) is None: + # unknown or empty prefix => prefer a 'real' prefix + flat_namespaces_map[ns] = prefix + return flat_namespaces_map, new_namespaces + + def write(self, *args, bint with_tail=True, bint pretty_print=False, method=None): + """write(self, *args, with_tail=True, pretty_print=False, method=None) + + Write subtrees or strings into the file. + + If method is not None, it should be one of ('html', 'xml', 'text') + to temporarily override the output method. + """ + assert self._c_out is not NULL + c_method = self._method if method is None else _findOutputMethod(method) + + for content in args: + if _isString(content): + if self._status != WRITER_IN_ELEMENT: + if self._status > WRITER_IN_ELEMENT or content.strip(): + raise LxmlSyntaxError("not in an element") + bstring = _utf8(content) + if not bstring: + continue + + ns, name, _, _ = self._element_stack[-1] + if (c_method == OUTPUT_METHOD_HTML and + ns in (None, b'http://www.w3.org/1999/xhtml') and + name in (b'script', b'style')): + tree.xmlOutputBufferWrite(self._c_out, len(bstring), _cstr(bstring)) + + else: + tree.xmlOutputBufferWriteEscape(self._c_out, _xcstr(bstring), NULL) + + elif iselement(content): + if self._status > WRITER_IN_ELEMENT: + raise LxmlSyntaxError("cannot append trailing element to complete XML document") + _writeNodeToBuffer(self._c_out, (<_Element>content)._c_node, + self._c_encoding, NULL, c_method, + False, False, pretty_print, with_tail, False) + if (<_Element>content)._c_node.type == tree.XML_ELEMENT_NODE: + if not self._element_stack: + self._status = WRITER_FINISHED + + elif content is not None: + raise TypeError( + f"got invalid input value of type {type(content)}, expected string or Element") + self._handle_error(self._c_out.error) + if not self._buffered: + tree.xmlOutputBufferFlush(self._c_out) + self._handle_error(self._c_out.error) + + def flush(self): + """flush(self) + + Write any pending content of the current output buffer to the stream. + """ + assert self._c_out is not NULL + tree.xmlOutputBufferFlush(self._c_out) + self._handle_error(self._c_out.error) + + cdef _close(self, bint raise_on_error): + if raise_on_error: + if self._status < WRITER_IN_ELEMENT: + raise LxmlSyntaxError("no content written") + if self._element_stack: + raise LxmlSyntaxError("pending open tags on close") + error_result = self._c_out.error + if error_result == xmlerror.XML_ERR_OK: + error_result = tree.xmlOutputBufferClose(self._c_out) + if error_result != -1: + error_result = xmlerror.XML_ERR_OK + else: + tree.xmlOutputBufferClose(self._c_out) + self._status = WRITER_FINISHED + self._c_out = NULL + del self._element_stack[:] + if raise_on_error: + self._handle_error(error_result) + + cdef _handle_error(self, int error_result): + if error_result != xmlerror.XML_ERR_OK: + if self._target is not None: + self._target._exc_context._raise_if_stored() + _raiseSerialisationError(error_result) + + +@cython.final +@cython.internal +cdef class _AsyncDataWriter: + cdef list _data + def __cinit__(self): + self._data = [] + + cdef bytes collect(self): + data = b''.join(self._data) + del self._data[:] + return data + + def write(self, data): + self._data.append(data) + + def close(self): + pass + + +@cython.final +@cython.internal +cdef class _AsyncIncrementalFileWriter: + cdef _IncrementalFileWriter _writer + cdef _AsyncDataWriter _buffer + cdef object _async_outfile + cdef int _flush_after_writes + cdef bint _should_close + cdef bint _buffered + + def __cinit__(self, async_outfile, bytes encoding, int compresslevel, bint close, + bint buffered, int method): + self._flush_after_writes = 20 + self._async_outfile = async_outfile + self._should_close = close + self._buffered = buffered + self._buffer = _AsyncDataWriter() + self._writer = _IncrementalFileWriter( + self._buffer, encoding, compresslevel, close=True, buffered=False, method=method) + + cdef bytes _flush(self): + if not self._buffered or len(self._buffer._data) > self._flush_after_writes: + return self._buffer.collect() + return None + + async def flush(self): + self._writer.flush() + data = self._buffer.collect() + if data: + await self._async_outfile.write(data) + + async def write_declaration(self, version=None, standalone=None, doctype=None): + self._writer.write_declaration(version, standalone, doctype) + data = self._flush() + if data: + await self._async_outfile.write(data) + + async def write_doctype(self, doctype): + self._writer.write_doctype(doctype) + data = self._flush() + if data: + await self._async_outfile.write(data) + + async def write(self, *args, with_tail=True, pretty_print=False, method=None): + self._writer.write(*args, with_tail=with_tail, pretty_print=pretty_print, method=method) + data = self._flush() + if data: + await self._async_outfile.write(data) + + def method(self, method): + return self._writer.method(method) + + def element(self, tag, attrib=None, nsmap=None, method=None, **_extra): + element_writer = self._writer.element(tag, attrib, nsmap, method, **_extra) + return _AsyncFileWriterElement(element_writer, self) + + async def _close(self, bint raise_on_error): + self._writer._close(raise_on_error) + data = self._buffer.collect() + if data: + await self._async_outfile.write(data) + if self._should_close: + await self._async_outfile.close() + + +@cython.final +@cython.internal +cdef class _AsyncFileWriterElement: + cdef _FileWriterElement _element_writer + cdef _AsyncIncrementalFileWriter _writer + + def __cinit__(self, _FileWriterElement element_writer not None, + _AsyncIncrementalFileWriter writer not None): + self._element_writer = element_writer + self._writer = writer + + async def __aenter__(self): + self._element_writer.__enter__() + data = self._writer._flush() + if data: + await self._writer._async_outfile.write(data) + + async def __aexit__(self, *args): + self._element_writer.__exit__(*args) + data = self._writer._flush() + if data: + await self._writer._async_outfile.write(data) + + +@cython.final +@cython.internal +@cython.freelist(8) +cdef class _FileWriterElement: + cdef _IncrementalFileWriter _writer + cdef object _element + cdef int _new_method + cdef int _old_method + + def __cinit__(self, _IncrementalFileWriter writer not None, element_config, int method): + self._writer = writer + self._element = element_config + self._new_method = method + self._old_method = writer._method + + def __enter__(self): + self._writer._method = self._new_method + self._writer._write_start_element(self._element) + + def __exit__(self, exc_type, exc_val, exc_tb): + self._writer._write_end_element(self._element) + self._writer._method = self._old_method + + +@cython.final +@cython.internal +@cython.freelist(8) +cdef class _MethodChanger: + cdef _IncrementalFileWriter _writer + cdef int _new_method + cdef int _old_method + cdef bint _entered + cdef bint _exited + + def __cinit__(self, _IncrementalFileWriter writer not None, int method): + self._writer = writer + self._new_method = method + self._old_method = writer._method + self._entered = False + self._exited = False + + def __enter__(self): + if self._entered: + raise LxmlSyntaxError("Inconsistent enter action in context manager") + self._writer._method = self._new_method + self._entered = True + + def __exit__(self, exc_type, exc_val, exc_tb): + if self._exited: + raise LxmlSyntaxError("Inconsistent exit action in context manager") + if self._writer._method != self._new_method: + raise LxmlSyntaxError("Method changed outside of context manager") + self._writer._method = self._old_method + self._exited = True + + async def __aenter__(self): + # for your async convenience + return self.__enter__() + + async def __aexit__(self, *args): + # for your async convenience + return self.__exit__(*args) |