aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/lxml/serializer.pxi
diff options
context:
space:
mode:
author: S. Solomon Darnell, 2025-03-28 21:52:21 -0500
committer: S. Solomon Darnell, 2025-03-28 21:52:21 -0500
commit: 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
tree: ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/lxml/serializer.pxi
parent: cc961e04ba734dd72309fb548a2f97d67d578813 (diff)
download: gn-ai-master.tar.gz
two version of R2R are here (HEAD, master)
Diffstat (limited to '.venv/lib/python3.12/site-packages/lxml/serializer.pxi')
-rw-r--r--.venv/lib/python3.12/site-packages/lxml/serializer.pxi1781
1 files changed, 1781 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/lxml/serializer.pxi b/.venv/lib/python3.12/site-packages/lxml/serializer.pxi
new file mode 100644
index 00000000..f0de0f9f
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/lxml/serializer.pxi
@@ -0,0 +1,1781 @@
+# XML serialization and output functions
+
+cdef object GzipFile
+from gzip import GzipFile
+
+
+cdef class SerialisationError(LxmlError):
+    """A libxml2 error that occurred during serialisation.
+
+    Raised by the serialisation helpers in this file when writing a tree
+    to a buffer or file fails for a reason other than running out of memory.
+    """
+
+
+# Serialisation methods supported by the writer functions in this file.
+# _findOutputMethod() maps a user supplied method name onto these values.
+cdef enum _OutputMethods:
+    OUTPUT_METHOD_XML
+    OUTPUT_METHOD_HTML
+    OUTPUT_METHOD_TEXT
+
+
+cdef int _findOutputMethod(method) except -1:
+    """Map an output method name onto its ``OUTPUT_METHOD_*`` constant.
+
+    ``None`` selects XML.  Any other value is lower-cased before it is
+    compared, which makes the lookup case-insensitive.
+
+    Raises ``ValueError`` for names other than "xml", "html" and "text".
+    """
+    if method is None:
+        # XML is the default serialisation method
+        return OUTPUT_METHOD_XML
+
+    method = method.lower()
+    if method == "text":
+        return OUTPUT_METHOD_TEXT
+    if method == "html":
+        return OUTPUT_METHOD_HTML
+    if method == "xml":
+        return OUTPUT_METHOD_XML
+    raise ValueError(f"unknown output method {method!r}")
+
+
+cdef _textToString(xmlNode* c_node, encoding, bint with_tail):
+    """Return the text content of ``c_node`` as a byte string or text string.
+
+    The content is collected into a temporary libxml2 buffer with
+    ``xmlNodeBufGetContent()``.  If *with_tail* is set, the content of the
+    following nodes selected by ``_textNodeOrSkip()`` is appended.
+
+    *encoding* selects the result type:
+
+    - ``unicode``: the buffer is decoded from UTF-8 into a text string
+    - ``None``, "utf8" or "utf-8" (any case): the raw buffer bytes
+    - anything else: the buffer is decoded from UTF-8 and strictly
+      re-encoded to that encoding
+
+    Raises ``MemoryError`` if the buffer cannot be created and
+    ``SerialisationError`` if collecting the content fails.
+    """
+    cdef bint needs_conversion
+    cdef const_xmlChar* c_text
+    cdef xmlNode* c_text_node
+    cdef tree.xmlBuffer* c_buffer
+    cdef int error_result
+
+    c_buffer = tree.xmlBufferCreate()
+    if c_buffer is NULL:
+        raise MemoryError()
+
+    with nogil:
+        error_result = tree.xmlNodeBufGetContent(c_buffer, c_node)
+        if with_tail:
+            c_text_node = _textNodeOrSkip(c_node.next)
+            while c_text_node is not NULL:
+                tree.xmlBufferWriteChar(c_buffer, <const_char*>c_text_node.content)
+                c_text_node = _textNodeOrSkip(c_text_node.next)
+        c_text = tree.xmlBufferContent(c_buffer)
+
+    if error_result < 0 or c_text is NULL:
+        # nothing below runs, so the buffer must be released here
+        tree.xmlBufferFree(c_buffer)
+        raise SerialisationError, "Error during serialisation (out of memory?)"
+
+    try:
+        needs_conversion = 0
+        if encoding is unicode:
+            needs_conversion = 1
+        elif encoding is not None:
+            # Python prefers lower case encoding names
+            encoding = encoding.lower()
+            if encoding not in ('utf8', 'utf-8'):
+                if encoding == 'ascii':
+                    # NOTE(review): isutf8l() is defined elsewhere; presumably it
+                    # reports whether the data contains non-ASCII bytes - confirm.
+                    if isutf8l(c_text, tree.xmlBufferLength(c_buffer)):
+                        # will raise a decode error below
+                        needs_conversion = 1
+                else:
+                    needs_conversion = 1
+
+        if needs_conversion:
+            text = (<const_char*>c_text)[:tree.xmlBufferLength(c_buffer)].decode('utf8')
+            if encoding is not unicode:
+                encoding = _utf8(encoding)
+                text = python.PyUnicode_AsEncodedString(
+                    text, encoding, 'strict')
+        else:
+            # no conversion needed: return the buffer content as bytes
+            text = (<unsigned char*>c_text)[:tree.xmlBufferLength(c_buffer)]
+    finally:
+        # c_text points into the buffer, so free it only after copying
+        tree.xmlBufferFree(c_buffer)
+    return text
+
+
+cdef _tostring(_Element element, encoding, doctype, method,
+               bint write_xml_declaration, bint write_complete_document,
+               bint pretty_print, bint with_tail, int standalone):
+    """Serialize an element to an encoded string representation of its XML
+    tree.
+
+    Returns ``None`` if *element* is ``None``.  For the "text" method, the
+    work is delegated to ``_textToString()``.  Otherwise, the tree is
+    written into an in-memory libxml2 output buffer and the buffer content
+    is returned as bytes, or as a text string decoded from UTF-8 if
+    *encoding* is ``unicode``.
+
+    Raises ``LookupError`` for an encoding that libxml2 does not know,
+    ``MemoryError`` if the output buffer cannot be allocated, and whatever
+    ``_raiseSerialisationError()`` raises for a failed write or close.
+    """
+    cdef tree.xmlOutputBuffer* c_buffer
+    cdef tree.xmlBuf* c_result_buffer
+    cdef tree.xmlCharEncodingHandler* enchandler
+    cdef const_char* c_enc
+    cdef const_xmlChar* c_version
+    cdef const_xmlChar* c_doctype
+    cdef int c_method
+    cdef int error_result
+    if element is None:
+        return None
+    _assertValidNode(element)
+    c_method = _findOutputMethod(method)
+    if c_method == OUTPUT_METHOD_TEXT:
+        return _textToString(element._c_node, encoding, with_tail)
+    if encoding is None or encoding is unicode:
+        c_enc = NULL
+    else:
+        encoding = _utf8(encoding)
+        c_enc = _cstr(encoding)
+    if doctype is None:
+        c_doctype = NULL
+    else:
+        doctype = _utf8(doctype)
+        c_doctype = _xcstr(doctype)
+    # it is necessary to both find the encoding handler *and* use the
+    # encoding during output
+    enchandler = tree.xmlFindCharEncodingHandler(c_enc)
+    if enchandler is NULL and c_enc is not NULL:
+        if encoding is not None:
+            # 'encoding' was converted to bytes above, decode it for the message
+            encoding = encoding.decode('UTF-8')
+        raise LookupError, f"unknown encoding: '{encoding}'"
+    c_buffer = tree.xmlAllocOutputBuffer(enchandler)
+    if c_buffer is NULL:
+        tree.xmlCharEncCloseFunc(enchandler)
+        raise MemoryError()
+
+    with nogil:
+        _writeNodeToBuffer(c_buffer, element._c_node, c_enc, c_doctype, c_method,
+                           write_xml_declaration, write_complete_document,
+                           pretty_print, with_tail, standalone)
+        tree.xmlOutputBufferFlush(c_buffer)
+        # read the result from the conversion buffer if there is one,
+        # otherwise from the plain buffer
+        if c_buffer.conv is not NULL:
+            c_result_buffer = c_buffer.conv
+        else:
+            c_result_buffer = c_buffer.buffer
+
+    error_result = c_buffer.error
+    if error_result != xmlerror.XML_ERR_OK:
+        tree.xmlOutputBufferClose(c_buffer)
+        _raiseSerialisationError(error_result)
+
+    try:
+        if encoding is unicode:
+            result = (<unsigned char*>tree.xmlBufContent(
+                c_result_buffer))[:tree.xmlBufUse(c_result_buffer)].decode('UTF-8')
+        else:
+            result = <bytes>(<unsigned char*>tree.xmlBufContent(
+                c_result_buffer))[:tree.xmlBufUse(c_result_buffer)]
+    finally:
+        # close the buffer only after its content was copied into 'result'
+        error_result = tree.xmlOutputBufferClose(c_buffer)
+    if error_result == -1:
+        _raiseSerialisationError(error_result)
+    return result
+
+cdef bytes _tostringC14N(element_or_tree, bint exclusive, bint with_comments, inclusive_ns_prefixes):
+    """Serialise an element or document to canonical XML (C14N) bytes.
+
+    For an ``_Element``, serialisation runs on the document returned by
+    ``_plainFakeRootDoc()``, which is always handed back to
+    ``_destroyFakeDoc()`` afterwards.  Anything else is resolved through
+    ``_documentOrRaise()`` and serialised as a whole document.
+
+    *inclusive_ns_prefixes*, if non-empty, is converted to a C array with
+    ``_convert_ns_prefixes()`` and passed on to libxml2.
+
+    Raises ``C14NError`` if libxml2 reports a failure or returns no data.
+    """
+    cdef xmlDoc* c_doc
+    cdef xmlChar* c_buffer = NULL
+    cdef int byte_count = -1
+    cdef bytes result
+    cdef _Document doc
+    cdef xmlChar **c_inclusive_ns_prefixes = NULL
+
+    if isinstance(element_or_tree, _Element):
+        _assertValidNode(<_Element>element_or_tree)
+        doc = (<_Element>element_or_tree)._doc
+        c_doc = _plainFakeRootDoc(doc._c_doc, (<_Element>element_or_tree)._c_node, 0)
+    else:
+        doc = _documentOrRaise(element_or_tree)
+        _assertValidDoc(doc)
+        c_doc = doc._c_doc
+
+    try:
+        # Convert the prefixes inside the try block (as _tofilelikeC14N() does)
+        # so that a failing conversion, e.g. for a non-string prefix, still
+        # runs the cleanup of the fake document below.
+        if inclusive_ns_prefixes:
+            c_inclusive_ns_prefixes = _convert_ns_prefixes(
+                c_doc.dict, inclusive_ns_prefixes)
+        with nogil:
+            byte_count = c14n.xmlC14NDocDumpMemory(
+                c_doc, NULL, exclusive, c_inclusive_ns_prefixes, with_comments, &c_buffer)
+
+    finally:
+        _destroyFakeDoc(doc._c_doc, c_doc)
+        if c_inclusive_ns_prefixes is not NULL:
+            python.lxml_free(c_inclusive_ns_prefixes)
+
+    if byte_count < 0 or c_buffer is NULL:
+        if c_buffer is not NULL:
+            tree.xmlFree(c_buffer)
+        raise C14NError, "C14N failed"
+    try:
+        # copy the C buffer into a Python bytes object before freeing it
+        result = c_buffer[:byte_count]
+    finally:
+        tree.xmlFree(c_buffer)
+    return result
+
+cdef _raiseSerialisationError(int error_result):
+    """Raise the Python exception that corresponds to a libxml2 error code.
+
+    ``XML_ERR_NO_MEMORY`` becomes a ``MemoryError``.  Every other code
+    becomes a ``SerialisationError`` that carries the name looked up through
+    ``ErrorTypes._getName()``, or a generic message if no name is found.
+    """
+    if error_result != xmlerror.XML_ERR_NO_MEMORY:
+        error_name = ErrorTypes._getName(error_result)
+        if error_name is not None:
+            raise SerialisationError, error_name
+        raise SerialisationError, f"unknown error {error_result}"
+    raise MemoryError()
+
+############################################################
+# low-level serialisation functions
+
+cdef void _writeDoctype(tree.xmlOutputBuffer* c_buffer,
+                        const_xmlChar* c_doctype) noexcept nogil:
+    # Write the given doctype string to the buffer, followed by a newline.
+    cdef int c_doctype_len = tree.xmlStrlen(c_doctype)
+    tree.xmlOutputBufferWrite(c_buffer, c_doctype_len, <const_char*>c_doctype)
+    tree.xmlOutputBufferWrite(c_buffer, 1, "\n")
+
+cdef void _writeNodeToBuffer(tree.xmlOutputBuffer* c_buffer,
+                             xmlNode* c_node, const_char* encoding, const_xmlChar* c_doctype,
+                             int c_method, bint write_xml_declaration,
+                             bint write_complete_document,
+                             bint pretty_print, bint with_tail,
+                             int standalone) noexcept nogil:
+    # Write 'c_node' and, depending on the flags, the surrounding document
+    # parts (XML declaration, doctype/DTD, sibling PIs and comments, tail)
+    # to 'c_buffer'.  Failures are reported through 'c_buffer.error' only,
+    # the function itself returns nothing.
+    cdef xmlNode* c_nsdecl_node
+    cdef xmlDoc* c_doc = c_node.doc
+    # the XML declaration is only written for the XML output method
+    if write_xml_declaration and c_method == OUTPUT_METHOD_XML:
+        _writeDeclarationToBuffer(c_buffer, c_doc.version, encoding, standalone)
+
+    # comments/processing instructions before doctype declaration
+    if write_complete_document and not c_buffer.error and c_doc.intSubset:
+        _writePrevSiblings(c_buffer, <xmlNode*>c_doc.intSubset, encoding, pretty_print)
+
+    # a user provided doctype string replaces the DTD of the document
+    if c_doctype:
+        _writeDoctype(c_buffer, c_doctype)
+    # write internal DTD subset, preceding PIs/comments, etc.
+    if write_complete_document and not c_buffer.error:
+        if c_doctype is NULL:
+            _writeDtdToBuffer(c_buffer, c_doc, c_node.name, c_method, encoding)
+        _writePrevSiblings(c_buffer, c_node, encoding, pretty_print)
+
+    c_nsdecl_node = c_node
+    if not c_node.parent or c_node.parent.type != tree.XML_DOCUMENT_NODE:
+        # copy the node and add namespaces from parents
+        # this is required to make libxml write them
+        c_nsdecl_node = tree.xmlCopyNode(c_node, 2)
+        if not c_nsdecl_node:
+            c_buffer.error = xmlerror.XML_ERR_NO_MEMORY
+            return
+        _copyParentNamespaces(c_node, c_nsdecl_node)
+
+        # let the copy borrow the tree links of the original node
+        c_nsdecl_node.parent = c_node.parent
+        c_nsdecl_node.children = c_node.children
+        c_nsdecl_node.last = c_node.last
+
+    # write node
+    if c_method == OUTPUT_METHOD_HTML:
+        tree.htmlNodeDumpFormatOutput(
+            c_buffer, c_doc, c_nsdecl_node, encoding, pretty_print)
+    else:
+        tree.xmlNodeDumpOutput(
+            c_buffer, c_doc, c_nsdecl_node, 0, pretty_print, encoding)
+
+    if c_nsdecl_node is not c_node:
+        # clean up
+        # the children still belong to the original node, so detach them
+        # from the copy before it is freed
+        c_nsdecl_node.children = c_nsdecl_node.last = NULL
+        tree.xmlFreeNode(c_nsdecl_node)
+
+    if c_buffer.error:
+        return
+
+    # write tail, trailing comments, etc.
+    if with_tail:
+        _writeTail(c_buffer, c_node, encoding, c_method, pretty_print)
+    if write_complete_document:
+        _writeNextSiblings(c_buffer, c_node, encoding, pretty_print)
+    if pretty_print:
+        tree.xmlOutputBufferWrite(c_buffer, 1, "\n")
+
+cdef void _writeDeclarationToBuffer(tree.xmlOutputBuffer* c_buffer,
+                                    const_xmlChar* version, const_char* encoding,
+                                    int standalone) noexcept nogil:
+    # Write the XML declaration "<?xml version='...' encoding='...' ...?>"
+    # followed by a newline.
+    #
+    # - version: written as-is, "1.0" is used if it is NULL
+    # - encoding: written as-is; if it is NULL, the encoding pseudo-attribute
+    #   is left out, because an empty "encoding=''" is not a valid XML
+    #   declaration
+    # - standalone: 0 writes standalone='no', 1 writes standalone='yes',
+    #   any other value leaves the pseudo-attribute out
+    if version is NULL:
+        version = <unsigned char*>"1.0"
+    tree.xmlOutputBufferWrite(c_buffer, 15, "<?xml version='")
+    tree.xmlOutputBufferWriteString(c_buffer, <const_char*>version)
+    if encoding is not NULL:
+        tree.xmlOutputBufferWrite(c_buffer, 12, "' encoding='")
+        tree.xmlOutputBufferWriteString(c_buffer, encoding)
+    # Each ending starts with the quote that closes the last value written
+    # above, which is either the version or the encoding.
+    if standalone == 0:
+        tree.xmlOutputBufferWrite(c_buffer, 20, "' standalone='no'?>\n")
+    elif standalone == 1:
+        tree.xmlOutputBufferWrite(c_buffer, 21, "' standalone='yes'?>\n")
+    else:
+        tree.xmlOutputBufferWrite(c_buffer, 4, "'?>\n")
+
+cdef void _writeDtdToBuffer(tree.xmlOutputBuffer* c_buffer,
+                            xmlDoc* c_doc, const_xmlChar* c_root_name,
+                            int c_method, const_char* encoding) noexcept nogil:
+    # Write the <!DOCTYPE ...> declaration for the internal DTD subset of
+    # 'c_doc', including its content if it has any.  Nothing is written if
+    # the document has no named internal subset or if the DTD name does not
+    # match 'c_root_name'.  Failures are reported through 'c_buffer.error'.
+    cdef tree.xmlDtd* c_dtd
+    cdef xmlNode* c_node
+    cdef char* quotechar
+    c_dtd = c_doc.intSubset
+    if not c_dtd or not c_dtd.name:
+        return
+
+    # Name in document type declaration must match the root element tag.
+    # For XML, case sensitive match, for HTML insensitive.
+    if c_method == OUTPUT_METHOD_HTML:
+        if tree.xmlStrcasecmp(c_root_name, c_dtd.name) != 0:
+            return
+    else:
+        if tree.xmlStrcmp(c_root_name, c_dtd.name) != 0:
+            return
+
+    tree.xmlOutputBufferWrite(c_buffer, 10, "<!DOCTYPE ")
+    tree.xmlOutputBufferWriteString(c_buffer, <const_char*>c_dtd.name)
+
+    # treat empty identifiers like missing ones
+    cdef const_xmlChar* public_id = c_dtd.ExternalID
+    cdef const_xmlChar* sys_url = c_dtd.SystemID
+    if public_id and public_id[0] == b'\0':
+        public_id = NULL
+    if sys_url and sys_url[0] == b'\0':
+        sys_url = NULL
+
+    if public_id:
+        tree.xmlOutputBufferWrite(c_buffer, 9, ' PUBLIC "')
+        tree.xmlOutputBufferWriteString(c_buffer, <const_char*>public_id)
+        if sys_url:
+            tree.xmlOutputBufferWrite(c_buffer, 2, '" ')
+        else:
+            tree.xmlOutputBufferWrite(c_buffer, 1, '"')
+    elif sys_url:
+        tree.xmlOutputBufferWrite(c_buffer, 8, ' SYSTEM ')
+
+    if sys_url:
+        # use single quotes if the system URL itself contains a double quote
+        if tree.xmlStrchr(sys_url, b'"'):
+            quotechar = '\''
+        else:
+            quotechar = '"'
+        tree.xmlOutputBufferWrite(c_buffer, 1, quotechar)
+        tree.xmlOutputBufferWriteString(c_buffer, <const_char*>sys_url)
+        tree.xmlOutputBufferWrite(c_buffer, 1, quotechar)
+
+    # a DTD without any declarations has no "[...]" section
+    if (not c_dtd.entities and not c_dtd.elements and
+            not c_dtd.attributes and not c_dtd.notations and
+            not c_dtd.pentities):
+        tree.xmlOutputBufferWrite(c_buffer, 2, '>\n')
+        return
+
+    tree.xmlOutputBufferWrite(c_buffer, 3, ' [\n')
+    if c_dtd.notations and not c_buffer.error:
+        # the notation table is dumped into a temporary buffer first and
+        # then copied to the output buffer
+        c_buf = tree.xmlBufferCreate()
+        if not c_buf:
+            c_buffer.error = xmlerror.XML_ERR_NO_MEMORY
+            return
+        tree.xmlDumpNotationTable(c_buf, <tree.xmlNotationTable*>c_dtd.notations)
+        tree.xmlOutputBufferWrite(
+            c_buffer, tree.xmlBufferLength(c_buf),
+            <const_char*>tree.xmlBufferContent(c_buf))
+        tree.xmlBufferFree(c_buf)
+    # write the child nodes of the DTD one by one, stop on the first error
+    c_node = c_dtd.children
+    while c_node and not c_buffer.error:
+        tree.xmlNodeDumpOutput(c_buffer, c_node.doc, c_node, 0, 0, encoding)
+        c_node = c_node.next
+    tree.xmlOutputBufferWrite(c_buffer, 3, "]>\n")
+
+cdef void _writeTail(tree.xmlOutputBuffer* c_buffer, xmlNode* c_node,
+                     const_char* encoding, int c_method, bint pretty_print) noexcept nogil:
+    """Write the text and CDATA nodes that directly follow the element.
+
+    Stops at the first following node of another type or on a buffer error.
+    """
+    cdef xmlNode* c_tail = c_node.next
+    while c_tail is not NULL and not c_buffer.error:
+        if c_tail.type != tree.XML_TEXT_NODE and \
+                c_tail.type != tree.XML_CDATA_SECTION_NODE:
+            break
+        if c_method != OUTPUT_METHOD_HTML:
+            tree.xmlNodeDumpOutput(
+                c_buffer, c_tail.doc, c_tail, 0, pretty_print, encoding)
+        else:
+            tree.htmlNodeDumpFormatOutput(
+                c_buffer, c_tail.doc, c_tail, encoding, pretty_print)
+        c_tail = c_tail.next
+
+cdef void _writePrevSiblings(tree.xmlOutputBuffer* c_buffer, xmlNode* c_node,
+                             const_char* encoding, bint pretty_print) noexcept nogil:
+    # Write the processing instructions and comments that directly precede
+    # 'c_node', in document order.  Does nothing if the parent of the node
+    # is an element.
+    cdef xmlNode* c_current
+    if c_node.parent and _isElement(c_node.parent):
+        return
+    # we are at a root node: walk back to the first PI/comment in the run
+    # of siblings right before the node ...
+    c_current = c_node
+    while c_current.prev and c_current.prev.type in (
+            tree.XML_PI_NODE, tree.XML_COMMENT_NODE):
+        c_current = c_current.prev
+    # ... and write forward from there until the node itself is reached
+    while c_current is not c_node and not c_buffer.error:
+        tree.xmlNodeDumpOutput(
+            c_buffer, c_node.doc, c_current, 0, pretty_print, encoding)
+        if pretty_print:
+            tree.xmlOutputBufferWriteString(c_buffer, "\n")
+        c_current = c_current.next
+
+cdef void _writeNextSiblings(tree.xmlOutputBuffer* c_buffer, xmlNode* c_node,
+                             const_char* encoding, bint pretty_print) noexcept nogil:
+    # Write the processing instructions and comments that directly follow
+    # 'c_node'.  Does nothing if the parent of the node is an element.
+    cdef xmlNode* c_following
+    if c_node.parent and _isElement(c_node.parent):
+        return
+    # we are at a root node, so add PI and comment siblings
+    c_following = c_node.next
+    while c_following and not c_buffer.error and c_following.type in (
+            tree.XML_PI_NODE, tree.XML_COMMENT_NODE):
+        if pretty_print:
+            # the newline goes in front of each trailing sibling
+            tree.xmlOutputBufferWriteString(c_buffer, "\n")
+        tree.xmlNodeDumpOutput(
+            c_buffer, c_node.doc, c_following, 0, pretty_print, encoding)
+        c_following = c_following.next
+
+
+# copied and adapted from libxml2 (xmlBufAttrSerializeTxtContent())
+cdef _write_attr_string(tree.xmlOutputBuffer* buf, const char *string):
+    """Write an attribute value to the buffer with XML escaping applied.
+
+    Newline, carriage return, tab, double quote, '<', '>' and '&' are
+    replaced by character/entity references.  All other bytes are copied
+    through unchanged in the longest possible runs.  A NULL string writes
+    nothing.
+    """
+    cdef const char *run_start
+    cdef const char *pos
+    cdef const char *escaped
+    cdef int escaped_len
+
+    if string == NULL:
+        return
+
+    run_start = pos = <const char*>string
+    while pos[0] != 0:
+        if pos[0] == b'\n':
+            escaped = "&#10;"
+            escaped_len = 5
+        elif pos[0] == b'\r':
+            escaped = "&#13;"
+            escaped_len = 5
+        elif pos[0] == b'\t':
+            escaped = "&#9;"
+            escaped_len = 4
+        elif pos[0] == b'"':
+            escaped = "&quot;"
+            escaped_len = 6
+        elif pos[0] == b'<':
+            escaped = "&lt;"
+            escaped_len = 4
+        elif pos[0] == b'>':
+            escaped = "&gt;"
+            escaped_len = 4
+        elif pos[0] == b'&':
+            escaped = "&amp;"
+            escaped_len = 5
+        else:
+            # Leave further encoding and escaping to the buffer encoder.
+            pos += 1
+            continue
+
+        # flush the unescaped run in front of the special character,
+        # then write its replacement and start a new run behind it
+        if run_start != pos:
+            tree.xmlOutputBufferWrite(buf, pos - run_start, run_start)
+        tree.xmlOutputBufferWrite(buf, escaped_len, escaped)
+        pos += 1
+        run_start = pos
+
+    # flush whatever is left after the last special character
+    if run_start != pos:
+        tree.xmlOutputBufferWrite(buf, pos - run_start, run_start)
+
+
+############################################################
+# output to file-like objects
+
+cdef object io_open
+from io import open as io_open
+
+cdef object gzip
+import gzip
+
+cdef object getwriter
+from codecs import getwriter
+cdef object utf8_writer = getwriter('utf8')
+
+cdef object contextmanager
+from contextlib import contextmanager
+
+cdef object _open_utf8_file
+
+@contextmanager
+def _open_utf8_file(file, compression=0):
+    """Context manager that provides a writer for UTF-8 encoded text output.
+
+    *file* is passed through ``_getFSPathOrObject()`` and may then be a
+    file path (as judged by ``_isString()``) or a file-like object.
+    With a true *compression* level, the output is gzip-compressed.
+
+    Files and gzip streams that are opened here are closed on exit.
+    A file-like object that was passed in is wrapped but not closed
+    by this function itself.
+    """
+    file = _getFSPathOrObject(file)
+    is_path = _isString(file)
+    if compression:
+        if is_path:
+            gzip_stream = gzip.GzipFile(
+                file, mode='wb', compresslevel=compression)
+        else:
+            gzip_stream = gzip.GzipFile(
+                fileobj=file, mode='wb', compresslevel=compression)
+        with gzip_stream as zf:
+            yield utf8_writer(zf)
+    elif is_path:
+        with io_open(file, 'w', encoding='utf8') as f:
+            yield f
+    else:
+        yield utf8_writer(file)
+
+
+@cython.final
+@cython.internal
+cdef class _FilelikeWriter:
+    """Adapter that lets libxml2 write its output to a Python file-like object.
+
+    Exceptions raised by the file-like object are not propagated through
+    the C callbacks.  They are stored in the exception context and the
+    callback returns -1 instead.
+    """
+    # object that receives the data through its .write() method
+    cdef object _filelike
+    # bound .close() method to call on close(), or None to leave it open
+    cdef object _close_filelike
+    # stores exceptions that were raised inside write() and close()
+    cdef _ExceptionContext _exc_context
+    cdef _ErrorLog error_log
+    def __cinit__(self, filelike, exc_context=None, compression=None, close=False):
+        if compression is not None and compression > 0:
+            # wrap the target in a gzip stream that we own and must close
+            filelike = GzipFile(
+                fileobj=filelike, mode='wb', compresslevel=compression)
+            self._close_filelike = filelike.close
+        elif close:
+            self._close_filelike = filelike.close
+        self._filelike = filelike
+        if exc_context is None:
+            self._exc_context = _ExceptionContext()
+        else:
+            self._exc_context = exc_context
+        self.error_log = _ErrorLog()
+
+    cdef tree.xmlOutputBuffer* _createOutputBuffer(
+            self, tree.xmlCharEncodingHandler* enchandler) except NULL:
+        # Create a libxml2 output buffer that passes its data to this
+        # writer through the module level write/close callbacks.
+        # Raises IOError if libxml2 cannot create the buffer.
+        cdef tree.xmlOutputBuffer* c_buffer
+        c_buffer = tree.xmlOutputBufferCreateIO(
+            <tree.xmlOutputWriteCallback>_writeFilelikeWriter, _closeFilelikeWriter,
+            <python.PyObject*>self, enchandler)
+        if c_buffer is NULL:
+            raise IOError, "Could not create I/O writer context."
+        return c_buffer
+
+    cdef int write(self, char* c_buffer, int size) noexcept:
+        # Pass 'size' bytes on to the file-like object.
+        # Returns 'size' on success, or -1 after storing the exception.
+        try:
+            if self._filelike is None:
+                raise IOError, "File is already closed"
+            py_buffer = <bytes>c_buffer[:size]
+            self._filelike.write(py_buffer)
+        except:
+            size = -1
+            self._exc_context._store_raised()
+        finally:
+            return size  # and swallow any further exceptions
+
+    cdef int close(self) noexcept:
+        # Call the stored close method, if any, and drop the reference to
+        # the file-like object.
+        # Returns 0 on success, or -1 after storing the exception.
+        retval = 0
+        try:
+            if self._close_filelike is not None:
+                self._close_filelike()
+            # we should not close the file here as we didn't open it
+            self._filelike = None
+        except:
+            retval = -1
+            self._exc_context._store_raised()
+        finally:
+            return retval  # and swallow any further exceptions
+
+# Write callback for libxml2 output buffers: 'ctxt' is the _FilelikeWriter
+# that was registered with the buffer, the call is forwarded to its write().
+cdef int _writeFilelikeWriter(void* ctxt, char* c_buffer, int length) noexcept:
+    return (<_FilelikeWriter>ctxt).write(c_buffer, length)
+
+# Close callback for libxml2 output buffers: 'ctxt' is the _FilelikeWriter
+# that was registered with the buffer, the call is forwarded to its close().
+cdef int _closeFilelikeWriter(void* ctxt) noexcept:
+    return (<_FilelikeWriter>ctxt).close()
+
+cdef _tofilelike(f, _Element element, encoding, doctype, method,
+                 bint write_xml_declaration, bint write_doctype,
+                 bint pretty_print, bint with_tail, int standalone,
+                 int compression):
+    """Serialise an element to a file path or file-like object.
+
+    For the "text" method, the text is built in memory with
+    ``_textToString()``, optionally gzip-compressed, and then written to
+    *f* in one go.  For all other methods, the tree is written through a
+    libxml2 output buffer created by ``_create_output_buffer()``.
+
+    Exceptions that the file-like writer stored during output are re-raised
+    first.  Any other serialisation failure is raised through
+    ``_raiseSerialisationError()``.
+    """
+    cdef _FilelikeWriter writer = None
+    cdef tree.xmlOutputBuffer* c_buffer
+    # NOTE(review): 'enchandler' is declared but not used in this function.
+    cdef tree.xmlCharEncodingHandler* enchandler
+    cdef const_char* c_enc
+    cdef const_xmlChar* c_doctype
+    cdef int error_result
+
+    c_method = _findOutputMethod(method)
+    if c_method == OUTPUT_METHOD_TEXT:
+        data = _textToString(element._c_node, encoding, with_tail)
+        if compression:
+            # compress in memory, then write the compressed bytes below
+            bytes_out = BytesIO()
+            with GzipFile(fileobj=bytes_out, mode='wb', compresslevel=compression) as gzip_file:
+                gzip_file.write(data)
+            data = bytes_out.getvalue()
+        f = _getFSPathOrObject(f)
+        if _isString(f):
+            filename8 = _encodeFilename(f)
+            with open(filename8, 'wb') as f:
+                f.write(data)
+        else:
+            f.write(data)
+        return
+
+    if encoding is None:
+        c_enc = NULL
+    else:
+        encoding = _utf8(encoding)
+        c_enc = _cstr(encoding)
+    if doctype is None:
+        c_doctype = NULL
+    else:
+        doctype = _utf8(doctype)
+        c_doctype = _xcstr(doctype)
+
+    writer = _create_output_buffer(f, c_enc, compression, &c_buffer, close=False)
+    if writer is None:
+        # no Python writer object is involved, so the GIL can be released
+        with nogil:
+            error_result = _serialise_node(
+                c_buffer, c_doctype, c_enc, element._c_node, c_method,
+                write_xml_declaration, write_doctype, pretty_print, with_tail, standalone)
+    else:
+        # output goes through the Python level writer, keep the GIL
+        error_result = _serialise_node(
+            c_buffer, c_doctype, c_enc, element._c_node, c_method,
+            write_xml_declaration, write_doctype, pretty_print, with_tail, standalone)
+
+    if writer is not None:
+        writer._exc_context._raise_if_stored()
+    if error_result != xmlerror.XML_ERR_OK:
+        _raiseSerialisationError(error_result)
+
+
+cdef int _serialise_node(tree.xmlOutputBuffer* c_buffer, const_xmlChar* c_doctype,
+                         const_char* c_enc, xmlNode* c_node, int c_method,
+                         bint write_xml_declaration, bint write_doctype, bint pretty_print,
+                         bint with_tail, int standalone) noexcept nogil:
+    # Write the node to the output buffer and close the buffer in all cases.
+    # Returns the error code of the buffer if writing failed, -1 if only
+    # closing the buffer failed, and XML_ERR_OK otherwise.
+    cdef int write_error
+    cdef int close_result
+    _writeNodeToBuffer(
+        c_buffer, c_node, c_enc, c_doctype, c_method,
+        write_xml_declaration, write_doctype, pretty_print, with_tail, standalone)
+    # read the error state before the buffer is closed
+    write_error = c_buffer.error
+    close_result = tree.xmlOutputBufferClose(c_buffer)
+    if write_error != xmlerror.XML_ERR_OK:
+        return write_error
+    if close_result == -1:
+        return close_result
+    return xmlerror.XML_ERR_OK
+
+
+cdef _FilelikeWriter _create_output_buffer(
+        f, const_char* c_enc, int c_compression,
+        tree.xmlOutputBuffer** c_buffer_ret, bint close):
+    """Create a libxml2 output buffer for a file path or file-like object.
+
+    The new buffer is stored in ``c_buffer_ret[0]``.  For a file path, the
+    buffer is created by libxml2 and ``None`` is returned.  For an object
+    with a ``write`` attribute, a ``_FilelikeWriter`` is created, attached
+    to the buffer and returned.
+
+    Raises ``LookupError`` if no encoding handler is found for *c_enc*,
+    ``IOError`` if libxml2 cannot open the file, and ``TypeError`` if *f*
+    is neither a path nor a writable object.
+    """
+    cdef tree.xmlOutputBuffer* c_buffer
+    cdef _FilelikeWriter writer
+    cdef bytes filename8
+    enchandler = tree.xmlFindCharEncodingHandler(c_enc)
+    if enchandler is NULL:
+        raise LookupError(
+            f"unknown encoding: '{c_enc.decode('UTF-8') if c_enc is not NULL else u''}'")
+    try:
+        f = _getFSPathOrObject(f)
+        if _isString(f):
+            filename8 = _encodeFilename(f)
+            if b'%' in filename8 and (
+                    # Exclude absolute Windows paths and file:// URLs.
+                    _isFilePath(<const xmlChar*>filename8) not in (NO_FILE_PATH, ABS_WIN_FILE_PATH)
+                    or filename8[:7].lower() == b'file://'):
+                # A file path (not a URL) containing the '%' URL escape character.
+                # libxml2 uses URL-unescaping on these, so escape the path before passing it in.
+                filename8 = filename8.replace(b'%', b'%25')
+            c_buffer = tree.xmlOutputBufferCreateFilename(
+                _cstr(filename8), enchandler, c_compression)
+            if c_buffer is NULL:
+                python.PyErr_SetFromErrno(IOError)  # raises IOError
+            writer = None
+        elif hasattr(f, 'write'):
+            writer = _FilelikeWriter(f, compression=c_compression, close=close)
+            c_buffer = writer._createOutputBuffer(enchandler)
+        else:
+            raise TypeError(
+                f"File or filename expected, got '{python._fqtypename(f).decode('UTF-8')}'")
+    except:
+        # no buffer took over the encoding handler, so release it here
+        tree.xmlCharEncCloseFunc(enchandler)
+        raise
+    c_buffer_ret[0] = c_buffer
+    return writer
+
+cdef xmlChar **_convert_ns_prefixes(tree.xmlDict* c_dict, ns_prefixes) except NULL:
+    """Convert a sequence of namespace prefixes into a NULL terminated C array.
+
+    Only prefixes that are found in *c_dict* are included.  The array
+    entries point at the strings stored in the dict.  The array itself is
+    allocated with ``lxml_malloc()`` and must be released by the caller
+    with ``lxml_free()``.
+
+    Raises ``MemoryError`` if the array cannot be allocated.  The array is
+    freed again if converting any of the prefixes fails.
+    """
+    cdef size_t used = 0
+    cdef size_t capacity = len(ns_prefixes)
+    # one additional slot is needed for the terminating NULL entry
+    c_prefix_array = <xmlChar **>python.lxml_malloc(capacity + 1, sizeof(xmlChar*))
+    if c_prefix_array is NULL:
+        raise MemoryError()
+    try:
+        for ns_prefix in ns_prefixes:
+            utf8_prefix = _utf8(ns_prefix)
+            c_known_prefix = tree.xmlDictExists(
+                c_dict, _xcstr(utf8_prefix), len(utf8_prefix))
+            if c_known_prefix is not NULL:
+                # unknown prefixes do not need to get serialised
+                c_prefix_array[used] = <xmlChar*>c_known_prefix
+                used += 1
+    except:
+        python.lxml_free(c_prefix_array)
+        raise
+
+    c_prefix_array[used] = NULL  # append end marker
+    return c_prefix_array
+
+cdef _tofilelikeC14N(f, _Element element, bint exclusive, bint with_comments,
+                     int compression, inclusive_ns_prefixes):
+    """Write the canonical XML (C14N) form of an element to a file path or
+    file-like object.
+
+    Serialisation runs on the document returned by ``_fakeRootDoc()``,
+    which is always handed back to ``_destroyFakeDoc()`` afterwards.
+
+    Raises ``TypeError`` if *f* is neither a path nor an object with a
+    ``write`` attribute, re-raises exceptions that the file-like writer
+    stored during output, and raises ``C14NError`` if libxml2 reports a
+    failure.
+    """
+    cdef _FilelikeWriter writer = None
+    cdef tree.xmlOutputBuffer* c_buffer
+    cdef xmlChar **c_inclusive_ns_prefixes = NULL
+    cdef char* c_filename
+    cdef xmlDoc* c_base_doc
+    cdef xmlDoc* c_doc
+    cdef int bytes_count, error = 0
+
+    c_base_doc = element._c_node.doc
+    c_doc = _fakeRootDoc(c_base_doc, element._c_node)
+    try:
+        c_inclusive_ns_prefixes = (
+            _convert_ns_prefixes(c_doc.dict, inclusive_ns_prefixes)
+            if inclusive_ns_prefixes else NULL)
+
+        f = _getFSPathOrObject(f)
+        if _isString(f):
+            # let libxml2 write the file directly
+            filename8 = _encodeFilename(f)
+            c_filename = _cstr(filename8)
+            with nogil:
+                error = c14n.xmlC14NDocSave(
+                    c_doc, NULL, exclusive, c_inclusive_ns_prefixes,
+                    with_comments, c_filename, compression)
+        elif hasattr(f, 'write'):
+            writer = _FilelikeWriter(f, compression=compression)
+            c_buffer = writer._createOutputBuffer(NULL)
+            try:
+                with writer.error_log:
+                    bytes_count = c14n.xmlC14NDocSaveTo(
+                        c_doc, NULL, exclusive, c_inclusive_ns_prefixes,
+                        with_comments, c_buffer)
+            finally:
+                error = tree.xmlOutputBufferClose(c_buffer)
+            # a failed serialisation takes precedence over the close result,
+            # and any close result other than -1 counts as success
+            if bytes_count < 0:
+                error = bytes_count
+            elif error != -1:
+                error = xmlerror.XML_ERR_OK
+        else:
+            raise TypeError(f"File or filename expected, got '{python._fqtypename(f).decode('UTF-8')}'")
+    finally:
+        _destroyFakeDoc(c_base_doc, c_doc)
+        if c_inclusive_ns_prefixes is not NULL:
+            python.lxml_free(c_inclusive_ns_prefixes)
+
+    if writer is not None:
+        writer._exc_context._raise_if_stored()
+
+    if error < 0:
+        message = "C14N failed"
+        if writer is not None:
+            # prefer the first message that was logged while writing
+            errors = writer.error_log
+            if len(errors):
+                message = errors[0].message
+        raise C14NError(message)
+
+
+# C14N 2.0
+
+def canonicalize(xml_data=None, *, out=None, from_file=None, **options):
+    """Convert XML to its C14N 2.0 serialised form.
+
+    If *out* is provided, it must be a file or file-like object that receives
+    the serialised canonical XML output (text, not bytes) through its ``.write()``
+    method.  To write to a file, open it in text mode with encoding "utf-8".
+    If *out* is not provided, this function returns the output as text string.
+    Otherwise, it returns ``None``.
+
+    Either *xml_data* (an XML string, tree or Element) or *from_file*
+    (a file path or file-like object) must be provided as input.
+    If both are given, *xml_data* is used.
+
+    The configuration options are the same as for the ``C14NWriterTarget``.
+
+    Raises ``ValueError`` if neither *xml_data* nor *from_file* is given.
+    """
+    if xml_data is None and from_file is None:
+        raise ValueError("Either 'xml_data' or 'from_file' must be provided as input")
+
+    # collect the output in memory if the caller did not provide a target
+    sio = None
+    if out is None:
+        sio = out = StringIO()
+
+    target = C14NWriterTarget(out.write, **options)
+
+    if xml_data is not None and not isinstance(xml_data, basestring):
+        # already parsed input: replay the tree into the writer target
+        _tree_to_target(xml_data, target)
+        return sio.getvalue() if sio is not None else None
+
+    cdef _FeedParser parser = XMLParser(
+        target=target,
+        attribute_defaults=True,
+        collect_ids=False,
+    )
+
+    if xml_data is not None:
+        parser.feed(xml_data)
+        parser.close()
+    elif from_file is not None:
+        try:
+            _parseDocument(from_file, parser, base_url=None)
+        except _TargetParserResult:
+            # the output was already written through the target
+            pass
+
+    return sio.getvalue() if sio is not None else None
+
+
+cdef _tree_to_target(element, target):
+    """Replay an in-memory tree as parser events on a parser target.
+
+    Walks *element* with ``iterwalk()`` and calls the matching method of
+    *target* for each event.  Non-empty text that follows an event (the
+    ``.text`` after a start event, the ``.tail`` otherwise) is passed to
+    ``target.data()``.  Returns the result of ``target.close()``.
+    """
+    for event, node in iterwalk(element, events=('start', 'end', 'start-ns', 'comment', 'pi')):
+        if event == 'start-ns':
+            # namespace events carry a (prefix, uri) pair and no text
+            target.start_ns(*node)
+            continue
+
+        if event == 'start':
+            target.start(node.tag, node.attrib)
+            following_text = node.text
+        elif event == 'end':
+            target.end(node.tag)
+            following_text = node.tail
+        elif event == 'comment':
+            target.comment(node.text)
+            following_text = node.tail
+        elif event == 'pi':
+            target.pi(node.target, node.text)
+            following_text = node.tail
+        else:
+            following_text = None
+
+        if following_text:
+            target.data(following_text)
+    return target.close()
+
+
+cdef object _looks_like_prefix_name = re.compile(r'^\w+:\w+$', re.UNICODE).match
+
+
+cdef class C14NWriterTarget:
+ """
+ Canonicalization writer target for the XMLParser.
+
+ Serialises parse events to XML C14N 2.0.
+
+ Configuration options:
+
+ - *with_comments*: set to true to include comments
+ - *strip_text*: set to true to strip whitespace before and after text content
+ - *rewrite_prefixes*: set to true to replace namespace prefixes by "n{number}"
+ - *qname_aware_tags*: a set of qname aware tag names in which prefixes
+ should be replaced in text content
+ - *qname_aware_attrs*: a set of qname aware attribute names in which prefixes
+ should be replaced in text content
+ - *exclude_attrs*: a set of attribute names that should not be serialised
+ - *exclude_tags*: a set of tag names that should not be serialised
+ """
+ cdef object _write
+ cdef list _data
+ cdef set _qname_aware_tags
+ cdef object _find_qname_aware_attrs
+ cdef list _declared_ns_stack
+ cdef list _ns_stack
+ cdef dict _prefix_map
+ cdef list _preserve_space
+ cdef tuple _pending_start
+ cdef set _exclude_tags
+ cdef set _exclude_attrs
+ cdef Py_ssize_t _ignored_depth
+ cdef bint _with_comments
+ cdef bint _strip_text
+ cdef bint _rewrite_prefixes
+ cdef bint _root_seen
+ cdef bint _root_done
+
+    def __init__(self, write, *,
+                 with_comments=False, strip_text=False, rewrite_prefixes=False,
+                 qname_aware_tags=None, qname_aware_attrs=None,
+                 exclude_attrs=None, exclude_tags=None):
+        """Set up the writer state.
+
+        *write* is the callable that receives the serialised text chunks.
+        See the class docstring for the keyword options.
+        """
+        self._write = write
+        self._data = []
+
+        # plain flags
+        self._with_comments = with_comments
+        self._strip_text = strip_text
+        self._rewrite_prefixes = rewrite_prefixes
+
+        # name filters, None means "not configured"
+        self._exclude_tags = set(exclude_tags) if exclude_tags else None
+        self._exclude_attrs = set(exclude_attrs) if exclude_attrs else None
+        self._qname_aware_tags = set(qname_aware_tags) if qname_aware_tags else None
+        self._find_qname_aware_attrs = (
+            set(qname_aware_attrs).intersection if qname_aware_attrs else None)
+
+        # Stack with globally and newly declared namespaces as (uri, prefix) pairs.
+        self._declared_ns_stack = [[
+            ("http://www.w3.org/XML/1998/namespace", "xml"),
+        ]]
+        # Stack with user declared namespace prefixes as (uri, prefix) pairs.
+        # The default prefixes are only available if prefixes are not rewritten.
+        if rewrite_prefixes:
+            self._ns_stack = [[]]
+        else:
+            self._ns_stack = [_DEFAULT_NAMESPACE_PREFIXES_ITEMS, []]
+        self._prefix_map = {}
+
+        # parser state
+        self._preserve_space = [False]
+        self._pending_start = None
+        self._ignored_depth = 0
+        self._root_seen = False
+        self._root_done = False
+
+    def _iter_namespaces(self, ns_stack):
+        """Yield the entries of a namespace stack, innermost level first.
+
+        Within each level, entries are yielded in their stored order.
+        """
+        for level in reversed(ns_stack):
+            if not level:
+                # almost no element declares new namespaces
+                continue
+            for ns_entry in level:
+                yield ns_entry
+
+    cdef _resolve_prefix_name(self, prefixed_name):
+        """Resolve a "prefix:name" string to "{uri}name" notation.
+
+        The prefix is looked up in the user declared namespaces, innermost
+        declaration first.  Raises ``ValueError`` if it is not declared.
+        """
+        prefix, local_name = prefixed_name.split(':', 1)
+        for uri, declared_prefix in self._iter_namespaces(self._ns_stack):
+            if declared_prefix != prefix:
+                continue
+            return f'{{{uri}}}{local_name}'
+        raise ValueError(f'Prefix {prefix} of QName "{prefixed_name}" is not declared in scope')
+
+    cdef _qname(self, qname, uri=None):
+        """Find or assign the prefix for a name.
+
+        *qname* is a name in "{uri}tag" notation, or a plain tag name if
+        *uri* is passed separately.
+
+        Returns a tuple ``(prefixed name, tag, uri)``.  If the namespace was
+        not declared in the output so far, a declaration is added to the
+        innermost level of the declared namespaces stack.
+
+        Raises ``ValueError`` if no prefix is known for a non-empty URI.
+        """
+        if uri is None:
+            uri, tag = qname[1:].rsplit('}', 1) if qname[:1] == '{' else ('', qname)
+        else:
+            tag = qname
+
+        # Look for an existing declaration first.  A prefix that was seen
+        # earlier in the iteration with a different URI is skipped.
+        prefixes_seen = set()
+        for u, prefix in self._iter_namespaces(self._declared_ns_stack):
+            if u == uri and prefix not in prefixes_seen:
+                return f'{prefix}:{tag}' if prefix else tag, tag, uri
+            prefixes_seen.add(prefix)
+
+        # Not declared yet => add new declaration.
+        if self._rewrite_prefixes:
+            # generated prefixes are numbered in the order of their first use
+            if uri in self._prefix_map:
+                prefix = self._prefix_map[uri]
+            else:
+                prefix = self._prefix_map[uri] = f'n{len(self._prefix_map)}'
+            self._declared_ns_stack[-1].append((uri, prefix))
+            return f'{prefix}:{tag}', tag, uri
+
+        if not uri and '' not in prefixes_seen:
+            # No default namespace declared => no prefix needed.
+            return tag, tag, uri
+
+        # fall back to the prefixes that the user declared for this URI
+        for u, prefix in self._iter_namespaces(self._ns_stack):
+            if u == uri:
+                self._declared_ns_stack[-1].append((uri, prefix))
+                return f'{prefix}:{tag}' if prefix else tag, tag, uri
+
+        if not uri:
+            # As soon as a default namespace is defined,
+            # anything that has no namespace (and thus, no prefix) goes there.
+            return tag, tag, uri
+
+        raise ValueError(f'Namespace "{uri}" of name "{tag}" is not declared in scope')
+
+    def data(self, data):
+        """Collect a chunk of text content, unless inside an excluded tag."""
+        if self._ignored_depth:
+            return
+        self._data.append(data)
+
+    cdef _flush(self):
+        """Write out the collected text and any pending start tag.
+
+        The collected text chunks are joined and cleared.  If a start tag
+        is pending, it is written first.  Text that looks like a prefixed
+        name is then handed to ``_start()`` instead of being written here.
+        """
+        cdef unicode data = ''.join(self._data)
+        del self._data[:]
+        if self._strip_text and not self._preserve_space[-1]:
+            data = data.strip()
+        if self._pending_start is not None:
+            # take the pending start tag and reset the attribute in one step
+            (tag, attrs, new_namespaces), self._pending_start = self._pending_start, None
+            qname_text = data if ':' in data and _looks_like_prefix_name(data) else None
+            self._start(tag, attrs, new_namespaces, qname_text)
+            if qname_text is not None:
+                return
+        # text is only written once the root element was seen
+        if data and self._root_seen:
+            self._write(_escape_cdata_c14n(data))
+
+    def start_ns(self, prefix, uri):
+        """Record a namespace declaration made by the user.
+
+        Ignored inside an excluded tag.
+        """
+        if not self._ignored_depth:
+            # we may have to resolve qnames in text content
+            if self._data:
+                self._flush()
+            self._ns_stack[-1].append((uri, prefix))
+
+ def start(self, tag, attrs):
+     """Handle an element start event.
+
+     Elements inside an excluded subtree only increase the ignore depth.
+     For qname-aware tags, writing the start tag is deferred until the
+     text content has been seen.
+     """
+     excluded_tags = self._exclude_tags
+     if excluded_tags is not None:
+         if self._ignored_depth or tag in excluded_tags:
+             self._ignored_depth += 1
+             return
+
+     if self._data:
+         self._flush()
+
+     declared_here = []
+     self._declared_ns_stack.append(declared_here)
+
+     qname_aware_tags = self._qname_aware_tags
+     if qname_aware_tags is None or tag not in qname_aware_tags:
+         self._start(tag, attrs, declared_here)
+     else:
+         # Need to parse text first to see if it requires a prefix declaration.
+         self._pending_start = (tag, attrs, declared_here)
+
+ cdef _start(self, tag, attrs, new_namespaces, qname_text=None):
+ """Write a start tag with its namespace declarations and attributes.
+
+ ``new_namespaces`` is the list object that start() pushed onto the declared
+ namespace stack for this element; the output is sent through ``self._write``.
+ ``qname_text`` is element text that looks like a prefixed name (or None);
+ it is resolved and written right after the start tag.
+ """
+ if self._exclude_attrs is not None and attrs:
+ attrs = {k: v for k, v in attrs.items() if k not in self._exclude_attrs}
+
+ # All names that need a prefix decision: the tag and every attribute name.
+ qnames = {tag, *attrs}
+ # Maps the original 'prefix:name' text to its resolved name.
+ resolved_names = {}
+
+ # Resolve prefixes in attribute and tag text.
+ if qname_text is not None:
+ qname = resolved_names[qname_text] = self._resolve_prefix_name(qname_text)
+ qnames.add(qname)
+ if self._find_qname_aware_attrs is not None and attrs:
+ qattrs = self._find_qname_aware_attrs(attrs)
+ if qattrs:
+ for attr_name in qattrs:
+ value = attrs[attr_name]
+ if _looks_like_prefix_name(value):
+ qname = resolved_names[value] = self._resolve_prefix_name(value)
+ qnames.add(qname)
+ else:
+ qattrs = None
+ else:
+ qattrs = None
+
+ # Assign prefixes in lexicographical order of used URIs.
+ parsed_qnames = {n: self._qname(n) for n in sorted(
+ qnames, key=lambda n: n.split('}', 1))}
+
+ # Write namespace declarations in prefix order ...
+ if new_namespaces:
+ attr_list = [
+ ('xmlns:' + prefix if prefix else 'xmlns', uri)
+ for uri, prefix in new_namespaces
+ ]
+ attr_list.sort()
+ else:
+ # almost always empty
+ attr_list = []
+
+ # ... followed by attributes in URI+name order
+ if attrs:
+ for k, v in sorted(attrs.items()):
+ # Replace qname-valued attribute text by its re-prefixed form.
+ if qattrs is not None and k in qattrs and v in resolved_names:
+ v = parsed_qnames[resolved_names[v]][0]
+ attr_qname, attr_name, uri = parsed_qnames[k]
+ # No prefix for attributes in default ('') namespace.
+ attr_list.append((attr_qname if uri else attr_name, v))
+
+ # Honour xml:space attributes.
+ # Without an xml:space attribute, the setting of the parent is inherited.
+ space_behaviour = attrs.get('{http://www.w3.org/XML/1998/namespace}space')
+ self._preserve_space.append(
+ space_behaviour == 'preserve' if space_behaviour
+ else self._preserve_space[-1])
+
+ # Write the tag.
+ write = self._write
+ write('<' + parsed_qnames[tag][0])
+ if attr_list:
+ write(''.join([f' {k}="{_escape_attrib_c14n(v)}"' for k, v in attr_list]))
+ write('>')
+
+ # Write the resolved qname text content.
+ if qname_text is not None:
+ write(_escape_cdata_c14n(parsed_qnames[resolved_names[qname_text]][0]))
+
+ self._root_seen = True
+ # Open a fresh scope for namespaces announced by start_ns() for child elements.
+ self._ns_stack.append([])
+
+ def end(self, tag):
+     """Handle an element end event: write the end tag and leave its scopes."""
+     if self._ignored_depth:
+         # Still inside an excluded subtree: only track the nesting depth.
+         self._ignored_depth -= 1
+         return
+     if self._data:
+         self._flush()
+
+     prefixed_name = self._qname(tag)[0]
+     self._write(f'</{prefixed_name}>')
+
+     space_stack = self._preserve_space
+     space_stack.pop()
+     # Only the initial entry is left once the root element has been closed.
+     self._root_done = len(space_stack) == 1
+     self._declared_ns_stack.pop()
+     self._ns_stack.pop()
+
+ def comment(self, text):
+     """Write a comment, if comments are enabled and we are not in an excluded subtree.
+
+     Comments before or after the root element are separated from it by a newline.
+     """
+     if self._ignored_depth or not self._with_comments:
+         return
+
+     if self._root_done:
+         self._write('\n')
+     elif self._root_seen and self._data:
+         self._flush()
+
+     escaped = _escape_cdata_c14n(text)
+     self._write(f'<!--{escaped}-->')
+
+     if not self._root_seen:
+         self._write('\n')
+
+ def pi(self, target, data):
+     """Write a processing instruction, unless inside an excluded subtree.
+
+     PIs before or after the root element are separated from it by a newline.
+     """
+     if self._ignored_depth:
+         return
+
+     if self._root_done:
+         self._write('\n')
+     elif self._root_seen and self._data:
+         self._flush()
+
+     if data:
+         serialised = f'<?{target} {_escape_cdata_c14n(data)}?>'
+     else:
+         serialised = f'<?{target}?>'
+     self._write(serialised)
+
+     if not self._root_seen:
+         self._write('\n')
+
+ def close(self):
+     """Finish the target; all output went through the write function, so there is no result."""
+     return None
+
+
+cdef _raise_serialization_error(text):
+    # Always raises: report a value that cannot be converted to text.
+    type_name = type(text).__name__
+    raise TypeError("cannot serialize %r (type %s)" % (text, type_name))
+
+
+cdef unicode _escape_cdata_c14n(stext):
+    # Escape character data: replaces '&', '<', '>' and carriage return.
+    # The input string is returned as is when nothing needs escaping.
+    cdef unicode text
+    cdef Py_UCS4 ch
+    cdef Py_ssize_t chunk_start = 0, index = 0
+    cdef list parts = None
+    try:
+        text = unicode(stext)
+    except (TypeError, AttributeError):
+        return _raise_serialization_error(stext)
+
+    for index, ch in enumerate(text):
+        if ch == '<':
+            replacement = '&lt;'
+        elif ch == '>':
+            replacement = '&gt;'
+        elif ch == '&':
+            replacement = '&amp;'
+        elif ch == '\r':
+            replacement = '&#xD;'
+        else:
+            continue
+
+        # Copy the unescaped run before this character, then its replacement.
+        if parts is None:
+            parts = []
+        if chunk_start < index:
+            parts.append(text[chunk_start:index])
+        parts.append(replacement)
+        chunk_start = index + 1
+
+    if parts is None:
+        return text
+    # Copy whatever follows the last escaped character.
+    if chunk_start < len(text):
+        parts.append(text[chunk_start:])
+    return ''.join(parts)
+
+
+cdef unicode _escape_attrib_c14n(stext):
+    # Escape an attribute value: replaces '&', '<', '"', tab, newline and
+    # carriage return.  The input string is returned as is when nothing
+    # needs escaping.
+    cdef unicode text
+    cdef Py_UCS4 ch
+    cdef Py_ssize_t chunk_start = 0, index = 0
+    cdef list parts = None
+    try:
+        text = unicode(stext)
+    except (TypeError, AttributeError):
+        return _raise_serialization_error(stext)
+
+    for index, ch in enumerate(text):
+        if ch == '"':
+            replacement = '&quot;'
+        elif ch == '<':
+            replacement = '&lt;'
+        elif ch == '&':
+            replacement = '&amp;'
+        elif ch == '\n':
+            replacement = '&#xA;'
+        elif ch == '\r':
+            replacement = '&#xD;'
+        elif ch == '\t':
+            replacement = '&#x9;'
+        else:
+            continue
+
+        # Copy the unescaped run before this character, then its replacement.
+        if parts is None:
+            parts = []
+        if chunk_start < index:
+            parts.append(text[chunk_start:index])
+        parts.append(replacement)
+        chunk_start = index + 1
+
+    if parts is None:
+        return text
+    # Copy whatever follows the last escaped character.
+    if chunk_start < len(text):
+        parts.append(text[chunk_start:])
+    return ''.join(parts)
+
+
+# incremental serialisation
+
+cdef class xmlfile:
+    """xmlfile(self, output_file, encoding=None, compression=None, close=False, buffered=True)
+
+    Context manager for writing an XML document incrementally.
+
+    Usage example::
+
+        with xmlfile("somefile.xml", encoding='utf-8') as xf:
+            xf.write_declaration(standalone=True)
+            xf.write_doctype('<!DOCTYPE root SYSTEM "some.dtd">')
+
+            # generate an element (the root element)
+            with xf.element('root'):
+                # write a complete Element into the open root element
+                xf.write(etree.Element('test'))
+
+                # generate and write more Elements, e.g. through iterparse
+                for element in generate_some_elements():
+                    # serialise generated elements into the XML file
+                    xf.write(element)
+
+                # or write multiple Elements or strings at once
+                xf.write(etree.Element('start'), "text", etree.Element('end'))
+
+    When 'output_file' is a file(-like) object, it is only closed on exit
+    of the context manager if ``close=True`` was passed.  Otherwise, closing
+    it is left to its owner.  A file path is opened and closed by lxml itself.
+    If a compression level is set, lxml also closes the file deliberately,
+    to make sure that all data gets compressed and written.
+
+    With ``buffered=False``, the output is flushed after each operation,
+    i.e. when entering or leaving an ``xf.element()`` block and on each
+    ``xf.write()`` call.  With buffering enabled, ``xf.flush()`` can be
+    called to explicitly write out any pending output.
+    """
+    cdef object output_file
+    cdef bytes encoding
+    cdef _IncrementalFileWriter writer
+    cdef _AsyncIncrementalFileWriter async_writer
+    cdef int compresslevel
+    cdef bint close
+    cdef bint buffered
+    cdef int method
+
+    def __init__(self, output_file not None, encoding=None, compression=None,
+                 close=False, buffered=True):
+        self.method = OUTPUT_METHOD_XML
+        self.output_file = output_file
+        self.encoding = _utf8orNone(encoding)
+        # No (or zero) compression level => write uncompressed.
+        self.compresslevel = compression or 0
+        self.close = close
+        self.buffered = buffered
+
+    def __enter__(self):
+        assert self.output_file is not None
+        writer = _IncrementalFileWriter(
+            self.output_file, self.encoding, self.compresslevel,
+            self.close, self.buffered, self.method)
+        self.writer = writer
+        return writer
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        cdef _IncrementalFileWriter active_writer = self.writer
+        if active_writer is None:
+            return
+        self.writer = None
+        # Only report serialisation problems if the block did not fail already.
+        active_writer._close(exc_type is None)
+        if self.close:
+            self.output_file = None
+
+    async def __aenter__(self):
+        assert self.output_file is not None
+        target = self.output_file
+        if isinstance(target, basestring):
+            raise TypeError("Cannot asynchronously write to a plain file")
+        if not hasattr(target, 'write'):
+            raise TypeError("Output file needs an async .write() method")
+        self.async_writer = _AsyncIncrementalFileWriter(
+            target, self.encoding, self.compresslevel,
+            self.close, self.buffered, self.method)
+        return self.async_writer
+
+    async def __aexit__(self, exc_type, exc_val, exc_tb):
+        if self.async_writer is None:
+            return
+        active_writer, self.async_writer = self.async_writer, None
+        # Only report serialisation problems if the block did not fail already.
+        await active_writer._close(exc_type is None)
+        if self.close:
+            self.output_file = None
+
+
+cdef class htmlfile(xmlfile):
+ """htmlfile(self, output_file, encoding=None, compression=None, close=False, buffered=True)
+
+ A simple mechanism for incremental HTML serialisation. Works the same as
+ xmlfile, except that the writer's default output method is HTML.
+ """
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ # xmlfile.__init__() selects XML output => override it afterwards.
+ self.method = OUTPUT_METHOD_HTML
+
+
+# Progress states of an incremental file writer.  The numeric order matters
+# because the writer compares states with '<', '>' and '>='.
+cdef enum _IncrementalFileWriterStatus:
+ # nothing written yet
+ WRITER_STARTING = 0
+ # XML declaration written, no doctype
+ WRITER_DECL_WRITTEN = 1
+ # doctype written
+ WRITER_DTD_WRITTEN = 2
+ # at least one start tag was written
+ WRITER_IN_ELEMENT = 3
+ # root element completed or writer closed
+ WRITER_FINISHED = 4
+
+
+@cython.final
+@cython.internal
+cdef class _IncrementalFileWriter:
+    """Incremental serialiser that writes into a libxml2 output buffer.
+
+    Instances are created by ``xmlfile.__enter__()``.  The writer keeps a
+    stack of the currently open elements, each with its flat namespace
+    mapping, and tracks its progress in ``_status`` (``WRITER_*`` values).
+    """
+    cdef tree.xmlOutputBuffer* _c_out
+    cdef bytes _encoding
+    cdef const_char* _c_encoding
+    cdef _FilelikeWriter _target
+    # stack of (ns, name, prefix, flat_namespace_map) tuples of the open elements
+    cdef list _element_stack
+    cdef int _status
+    cdef int _method
+    cdef bint _buffered
+
+    def __cinit__(self, outfile, bytes encoding, int compresslevel, bint close,
+                  bint buffered, int method):
+        self._status = WRITER_STARTING
+        self._element_stack = []
+        if encoding is None:
+            # We always need a document encoding to make the attribute serialisation
+            # of libxml2 identical to ours.
+            encoding = b'ASCII'
+        self._encoding = encoding
+        self._c_encoding = _cstr(encoding)
+        self._buffered = buffered
+        self._target = _create_output_buffer(
+            outfile, self._c_encoding, compresslevel, &self._c_out, close)
+        self._method = method
+
+    def __dealloc__(self):
+        if self._c_out is not NULL:
+            tree.xmlOutputBufferClose(self._c_out)
+
+    def write_declaration(self, version=None, standalone=None, doctype=None):
+        """write_declaration(self, version=None, standalone=None, doctype=None)
+
+        Write an XML declaration and (optionally) a doctype into the file.
+
+        Raises LxmlSyntaxError if the output method is not XML or if a
+        declaration (or anything after it) was already written.
+        """
+        assert self._c_out is not NULL
+        cdef const_xmlChar* c_version
+        cdef int c_standalone
+        if self._method != OUTPUT_METHOD_XML:
+            raise LxmlSyntaxError("only XML documents have declarations")
+        if self._status >= WRITER_DECL_WRITTEN:
+            raise LxmlSyntaxError("XML declaration already written")
+        version = _utf8orNone(version)
+        c_version = _xcstr(version) if version is not None else NULL
+        doctype = _utf8orNone(doctype)
+        if standalone is None:
+            # -1 => no standalone value given
+            c_standalone = -1
+        else:
+            c_standalone = 1 if standalone else 0
+        _writeDeclarationToBuffer(self._c_out, c_version, self._c_encoding, c_standalone)
+        if doctype is not None:
+            _writeDoctype(self._c_out, _xcstr(doctype))
+            self._status = WRITER_DTD_WRITTEN
+        else:
+            self._status = WRITER_DECL_WRITTEN
+        if not self._buffered:
+            tree.xmlOutputBufferFlush(self._c_out)
+        self._handle_error(self._c_out.error)
+
+    def write_doctype(self, doctype):
+        """write_doctype(self, doctype)
+
+        Writes the given doctype declaration verbatim into the file.
+        Passing None does nothing.
+
+        Raises LxmlSyntaxError if a doctype or any element was already written.
+        """
+        assert self._c_out is not NULL
+        if doctype is None:
+            return
+        if self._status >= WRITER_DTD_WRITTEN:
+            raise LxmlSyntaxError("DOCTYPE already written or cannot write it here")
+        doctype = _utf8(doctype)
+        _writeDoctype(self._c_out, _xcstr(doctype))
+        self._status = WRITER_DTD_WRITTEN
+        if not self._buffered:
+            tree.xmlOutputBufferFlush(self._c_out)
+        self._handle_error(self._c_out.error)
+
+    def method(self, method):
+        """method(self, method)
+
+        Returns a context manager that overrides and restores the output method.
+        method is one of (None, 'xml', 'html', 'text') where None keeps the
+        current output method of the writer.
+        """
+        assert self._c_out is not NULL
+        c_method = self._method if method is None else _findOutputMethod(method)
+        return _MethodChanger(self, c_method)
+
+    def element(self, tag, attrib=None, nsmap=None, method=None, **_extra):
+        """element(self, tag, attrib=None, nsmap=None, method=None, **_extra)
+
+        Returns a context manager that writes an opening and closing tag.
+        method is one of (None, 'xml', 'html', 'text') where None keeps the
+        current output method of the writer.
+
+        Attributes passed as keyword arguments take precedence over
+        attributes of the same name in ``attrib``.
+        """
+        assert self._c_out is not NULL
+        attributes = []
+        if attrib is not None:
+            for name, value in _iter_attrib(attrib):
+                if name not in _extra:
+                    ns, name = _getNsTag(name)
+                    attributes.append((ns, name, _utf8(value)))
+        if _extra:
+            for name, value in _extra.iteritems():
+                ns, name = _getNsTag(name)
+                attributes.append((ns, name, _utf8(value)))
+        # The writer looks up prefixes by namespace URI => reverse the mapping.
+        reversed_nsmap = {}
+        if nsmap:
+            for prefix, ns in nsmap.items():
+                if prefix is not None:
+                    prefix = _utf8(prefix)
+                    _prefixValidOrRaise(prefix)
+                reversed_nsmap[_utf8(ns)] = prefix
+        ns, name = _getNsTag(tag)
+
+        c_method = self._method if method is None else _findOutputMethod(method)
+
+        return _FileWriterElement(self, (ns, name, attributes, reversed_nsmap), c_method)
+
+    cdef _write_qname(self, bytes name, bytes prefix):
+        # Write 'prefix:name', or just 'name' if there is no prefix.
+        if prefix:  # empty bytes for no prefix (not None to allow sorting)
+            tree.xmlOutputBufferWrite(self._c_out, len(prefix), _cstr(prefix))
+            tree.xmlOutputBufferWrite(self._c_out, 1, ':')
+        tree.xmlOutputBufferWrite(self._c_out, len(name), _cstr(name))
+
+    cdef _write_start_element(self, element_config):
+        # Write the start tag for an (ns, name, attributes, nsmap) configuration
+        # and push the element onto the stack of open elements.
+        if self._status > WRITER_IN_ELEMENT:
+            raise LxmlSyntaxError("cannot append trailing element to complete XML document")
+        ns, name, attributes, nsmap = element_config
+        flat_namespace_map, new_namespaces = self._collect_namespaces(nsmap)
+        prefix = self._find_prefix(ns, flat_namespace_map, new_namespaces)
+        tree.xmlOutputBufferWrite(self._c_out, 1, '<')
+        self._write_qname(name, prefix)
+
+        self._write_attributes_and_namespaces(
+            attributes, flat_namespace_map, new_namespaces)
+
+        tree.xmlOutputBufferWrite(self._c_out, 1, '>')
+        if not self._buffered:
+            tree.xmlOutputBufferFlush(self._c_out)
+        self._handle_error(self._c_out.error)
+
+        self._element_stack.append((ns, name, prefix, flat_namespace_map))
+        self._status = WRITER_IN_ELEMENT
+
+    cdef _write_attributes_and_namespaces(self, list attributes,
+                                          dict flat_namespace_map,
+                                          list new_namespaces):
+        # Write the (sorted) namespace declarations first, then the attributes.
+        if attributes:
+            # _find_prefix() may append to new_namespaces => build them first
+            attributes = [
+                (self._find_prefix(ns, flat_namespace_map, new_namespaces), name, value)
+                for ns, name, value in attributes ]
+        if new_namespaces:
+            new_namespaces.sort()
+            self._write_attributes_list(new_namespaces)
+        if attributes:
+            self._write_attributes_list(attributes)
+
+    cdef _write_attributes_list(self, list attributes):
+        # Write (prefix, name, value) triples as ' prefix:name="value"'.
+        for prefix, name, value in attributes:
+            tree.xmlOutputBufferWrite(self._c_out, 1, ' ')
+            self._write_qname(name, prefix)
+            tree.xmlOutputBufferWrite(self._c_out, 2, '="')
+            _write_attr_string(self._c_out, _cstr(value))
+
+            tree.xmlOutputBufferWrite(self._c_out, 1, '"')
+
+    cdef _write_end_element(self, element_config):
+        # Write the end tag of the innermost open element, which must match
+        # the (ns, name) of the given element configuration.
+        if self._status != WRITER_IN_ELEMENT:
+            raise LxmlSyntaxError("not in an element")
+        if not self._element_stack or self._element_stack[-1][:2] != element_config[:2]:
+            raise LxmlSyntaxError("inconsistent exit action in context manager")
+
+        # If previous write operations failed, the context manager exit might still call us.
+        # That is ok, but we stop writing closing tags and handling errors in that case.
+        # For all non-I/O errors, we continue writing closing tags if we can.
+        ok_to_write = self._c_out.error == xmlerror.XML_ERR_OK
+
+        name, prefix = self._element_stack.pop()[1:3]
+        if ok_to_write:
+            tree.xmlOutputBufferWrite(self._c_out, 2, '</')
+            self._write_qname(name, prefix)
+            tree.xmlOutputBufferWrite(self._c_out, 1, '>')
+
+        if not self._element_stack:
+            # The root element was closed => the document is complete.
+            self._status = WRITER_FINISHED
+        if ok_to_write:
+            if not self._buffered:
+                tree.xmlOutputBufferFlush(self._c_out)
+            self._handle_error(self._c_out.error)
+
+    cdef _find_prefix(self, bytes href, dict flat_namespaces_map, list new_namespaces):
+        # Return the prefix to use for the namespace URI 'href'.  For an unknown
+        # namespace, a new 'ns<N>' prefix is generated, registered in the map
+        # and appended to the list of declarations to write.
+        if href is None:
+            return None
+        if href in flat_namespaces_map:
+            return flat_namespaces_map[href]
+        # need to create a new prefix
+        prefixes = flat_namespaces_map.values()
+        i = 0
+        while True:
+            prefix = _utf8('ns%d' % i)
+            if prefix not in prefixes:
+                new_namespaces.append((b'xmlns', prefix, href))
+                flat_namespaces_map[href] = prefix
+                return prefix
+            i += 1
+
+    cdef _collect_namespaces(self, dict nsmap):
+        # Build the flat {namespace: prefix} mapping that is in effect for a new
+        # element, together with the list of declarations that it adds itself.
+        new_namespaces = []
+        flat_namespaces_map = {}
+        for ns, prefix in nsmap.iteritems():
+            flat_namespaces_map[ns] = prefix
+            if prefix is None:
+                # use empty bytes rather than None to allow sorting
+                new_namespaces.append((b'', b'xmlns', ns))
+            else:
+                new_namespaces.append((b'xmlns', prefix, ns))
+        # merge in flat namespace map of parent
+        if self._element_stack:
+            for ns, prefix in (<dict>self._element_stack[-1][-1]).iteritems():
+                if flat_namespaces_map.get(ns) is None:
+                    # unknown or empty prefix => prefer a 'real' prefix
+                    flat_namespaces_map[ns] = prefix
+        return flat_namespaces_map, new_namespaces
+
+    def write(self, *args, bint with_tail=True, bint pretty_print=False, method=None):
+        """write(self, *args, with_tail=True, pretty_print=False, method=None)
+
+        Write subtrees or strings into the file.
+
+        If method is not None, it should be one of ('html', 'xml', 'text')
+        to temporarily override the output method.
+
+        Outside of the root element, only whitespace strings may be written,
+        and only before the root element.  None arguments are ignored.
+        """
+        assert self._c_out is not NULL
+        c_method = self._method if method is None else _findOutputMethod(method)
+
+        for content in args:
+            if _isString(content):
+                if self._status != WRITER_IN_ELEMENT:
+                    if self._status > WRITER_IN_ELEMENT or content.strip():
+                        raise LxmlSyntaxError("not in an element")
+                bstring = _utf8(content)
+                if not bstring:
+                    continue
+
+                if self._element_stack:
+                    ns, name, _, _ = self._element_stack[-1]
+                else:
+                    # Whitespace before the root element: there is no open
+                    # element to look at, so avoid indexing the empty stack.
+                    ns = name = None
+                if (c_method == OUTPUT_METHOD_HTML and
+                        ns in (None, b'http://www.w3.org/1999/xhtml') and
+                        name in (b'script', b'style')):
+                    # Content of HTML <script> and <style> is written unescaped.
+                    tree.xmlOutputBufferWrite(self._c_out, len(bstring), _cstr(bstring))
+
+                else:
+                    tree.xmlOutputBufferWriteEscape(self._c_out, _xcstr(bstring), NULL)
+
+            elif iselement(content):
+                if self._status > WRITER_IN_ELEMENT:
+                    raise LxmlSyntaxError("cannot append trailing element to complete XML document")
+                _writeNodeToBuffer(self._c_out, (<_Element>content)._c_node,
+                                   self._c_encoding, NULL, c_method,
+                                   False, False, pretty_print, with_tail, False)
+                if (<_Element>content)._c_node.type == tree.XML_ELEMENT_NODE:
+                    if not self._element_stack:
+                        # A complete element at the top level is the root element.
+                        self._status = WRITER_FINISHED
+
+            elif content is not None:
+                raise TypeError(
+                    f"got invalid input value of type {type(content)}, expected string or Element")
+            self._handle_error(self._c_out.error)
+        if not self._buffered:
+            tree.xmlOutputBufferFlush(self._c_out)
+            self._handle_error(self._c_out.error)
+
+    def flush(self):
+        """flush(self)
+
+        Write any pending content of the current output buffer to the stream.
+        """
+        assert self._c_out is not NULL
+        tree.xmlOutputBufferFlush(self._c_out)
+        self._handle_error(self._c_out.error)
+
+    cdef _close(self, bint raise_on_error):
+        # Close the output buffer.  With 'raise_on_error', an incomplete document
+        # and errors of the output buffer are reported as exceptions.
+        if raise_on_error:
+            if self._status < WRITER_IN_ELEMENT:
+                raise LxmlSyntaxError("no content written")
+            if self._element_stack:
+                raise LxmlSyntaxError("pending open tags on close")
+        error_result = self._c_out.error
+        if error_result == xmlerror.XML_ERR_OK:
+            error_result = tree.xmlOutputBufferClose(self._c_out)
+            if error_result != -1:
+                # any result other than -1 is treated as success
+                error_result = xmlerror.XML_ERR_OK
+        else:
+            tree.xmlOutputBufferClose(self._c_out)
+        self._status = WRITER_FINISHED
+        self._c_out = NULL
+        del self._element_stack[:]
+        if raise_on_error:
+            self._handle_error(error_result)
+
+    cdef _handle_error(self, int error_result):
+        # Raise an exception for a failed output operation, preferring an
+        # exception that was stored by the file-like target.
+        if error_result != xmlerror.XML_ERR_OK:
+            if self._target is not None:
+                self._target._exc_context._raise_if_stored()
+            _raiseSerialisationError(error_result)
+
+
+@cython.final
+@cython.internal
+cdef class _AsyncDataWriter:
+    """In-memory file-like target that collects the written chunks."""
+    cdef list _data
+
+    def __cinit__(self):
+        self._data = []
+
+    cdef bytes collect(self):
+        # Return everything written so far as one bytes object and reset.
+        chunks = self._data
+        joined = b''.join(chunks)
+        del chunks[:]
+        return joined
+
+    def write(self, data):
+        self._data.append(data)
+
+    def close(self):
+        # Nothing to release, the data stays available for collect().
+        return None
+
+
+@cython.final
+@cython.internal
+cdef class _AsyncIncrementalFileWriter:
+    """Async front-end for _IncrementalFileWriter.
+
+    The wrapped writer serialises into an in-memory buffer.  The collected
+    data is then passed on to the ``write()`` coroutine of the output file.
+    """
+    cdef _IncrementalFileWriter _writer
+    cdef _AsyncDataWriter _buffer
+    cdef object _async_outfile
+    cdef int _flush_after_writes
+    cdef bint _should_close
+    cdef bint _buffered
+
+    def __cinit__(self, async_outfile, bytes encoding, int compresslevel, bint close,
+                  bint buffered, int method):
+        self._async_outfile = async_outfile
+        self._should_close = close
+        self._buffered = buffered
+        self._flush_after_writes = 20
+        self._buffer = _AsyncDataWriter()
+        # The inner writer is unbuffered, so that each operation ends up
+        # in the in-memory buffer right away.
+        self._writer = _IncrementalFileWriter(
+            self._buffer, encoding, compresslevel, close=True, buffered=False, method=method)
+
+    cdef bytes _flush(self):
+        # Return the collected data if it is due for writing, None otherwise.
+        if self._buffered and len(self._buffer._data) <= self._flush_after_writes:
+            return None
+        return self._buffer.collect()
+
+    async def _send(self, data):
+        # Pass non-empty data on to the async output file.
+        if data:
+            await self._async_outfile.write(data)
+
+    async def flush(self):
+        self._writer.flush()
+        await self._send(self._buffer.collect())
+
+    async def write_declaration(self, version=None, standalone=None, doctype=None):
+        self._writer.write_declaration(version, standalone, doctype)
+        await self._send(self._flush())
+
+    async def write_doctype(self, doctype):
+        self._writer.write_doctype(doctype)
+        await self._send(self._flush())
+
+    async def write(self, *args, with_tail=True, pretty_print=False, method=None):
+        self._writer.write(*args, with_tail=with_tail, pretty_print=pretty_print, method=method)
+        await self._send(self._flush())
+
+    def method(self, method):
+        return self._writer.method(method)
+
+    def element(self, tag, attrib=None, nsmap=None, method=None, **_extra):
+        sync_element = self._writer.element(tag, attrib, nsmap, method, **_extra)
+        return _AsyncFileWriterElement(sync_element, self)
+
+    async def _close(self, bint raise_on_error):
+        self._writer._close(raise_on_error)
+        # Write out whatever is left, regardless of the buffering mode.
+        await self._send(self._buffer.collect())
+        if self._should_close:
+            await self._async_outfile.close()
+
+
+@cython.final
+@cython.internal
+cdef class _AsyncFileWriterElement:
+    """Async context manager around the element context manager of the sync writer."""
+    cdef _FileWriterElement _element_writer
+    cdef _AsyncIncrementalFileWriter _writer
+
+    def __cinit__(self, _FileWriterElement element_writer not None,
+                  _AsyncIncrementalFileWriter writer not None):
+        self._writer = writer
+        self._element_writer = element_writer
+
+    async def _write_pending(self):
+        # Pass on whatever the async writer considers due for writing.
+        pending = self._writer._flush()
+        if pending:
+            await self._writer._async_outfile.write(pending)
+
+    async def __aenter__(self):
+        self._element_writer.__enter__()
+        await self._write_pending()
+
+    async def __aexit__(self, *args):
+        self._element_writer.__exit__(*args)
+        await self._write_pending()
+
+
+@cython.final
+@cython.internal
+@cython.freelist(8)
+cdef class _FileWriterElement:
+    """Context manager that writes the start tag on enter and the end tag on exit.
+
+    While the block is active, the output method of the writer is set to the
+    method that was requested for the element.  The previous method is
+    restored on exit, and also when writing one of the tags fails.
+    """
+    cdef _IncrementalFileWriter _writer
+    cdef object _element
+    cdef int _new_method
+    cdef int _old_method
+
+    def __cinit__(self, _IncrementalFileWriter writer not None, element_config, int method):
+        self._writer = writer
+        self._element = element_config
+        self._new_method = method
+        self._old_method = writer._method
+
+    def __enter__(self):
+        self._writer._method = self._new_method
+        try:
+            self._writer._write_start_element(self._element)
+        except BaseException:
+            # __exit__() is not called when entering fails, so undo the
+            # method override here to not leak it into the writer.
+            self._writer._method = self._old_method
+            raise
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        try:
+            self._writer._write_end_element(self._element)
+        finally:
+            # Restore the method even if writing the end tag raised.
+            self._writer._method = self._old_method
+
+
+@cython.final
+@cython.internal
+@cython.freelist(8)
+cdef class _MethodChanger:
+    """Single-use context manager that temporarily changes the output method of a writer."""
+    cdef _IncrementalFileWriter _writer
+    cdef int _new_method
+    cdef int _old_method
+    cdef bint _entered
+    cdef bint _exited
+
+    def __cinit__(self, _IncrementalFileWriter writer not None, int method):
+        self._entered = False
+        self._exited = False
+        self._writer = writer
+        self._old_method = writer._method
+        self._new_method = method
+
+    def __enter__(self):
+        cdef _IncrementalFileWriter writer = self._writer
+        if self._entered:
+            raise LxmlSyntaxError("Inconsistent enter action in context manager")
+        writer._method = self._new_method
+        self._entered = True
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        cdef _IncrementalFileWriter writer = self._writer
+        if self._exited:
+            raise LxmlSyntaxError("Inconsistent exit action in context manager")
+        # Somebody else must have changed the method if it is not ours anymore.
+        if writer._method != self._new_method:
+            raise LxmlSyntaxError("Method changed outside of context manager")
+        writer._method = self._old_method
+        self._exited = True
+
+    async def __aenter__(self):
+        # async variant, delegates to the synchronous implementation
+        return self.__enter__()
+
+    async def __aexit__(self, *args):
+        # async variant, delegates to the synchronous implementation
+        return self.__exit__(*args)