diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/pptx/opc/serialized.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/pptx/opc/serialized.py | 296 |
1 files changed, 296 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/pptx/opc/serialized.py b/.venv/lib/python3.12/site-packages/pptx/opc/serialized.py new file mode 100644 index 00000000..92366708 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/pptx/opc/serialized.py @@ -0,0 +1,296 @@ +"""API for reading/writing serialized Open Packaging Convention (OPC) package.""" + +from __future__ import annotations + +import os +import posixpath +import zipfile +from typing import IO, TYPE_CHECKING, Any, Container, Sequence + +from pptx.exc import PackageNotFoundError +from pptx.opc.constants import CONTENT_TYPE as CT +from pptx.opc.oxml import CT_Types, serialize_part_xml +from pptx.opc.packuri import CONTENT_TYPES_URI, PACKAGE_URI, PackURI +from pptx.opc.shared import CaseInsensitiveDict +from pptx.opc.spec import default_content_types +from pptx.util import lazyproperty + +if TYPE_CHECKING: + from pptx.opc.package import Part, _Relationships # pyright: ignore[reportPrivateUsage] + + +class PackageReader(Container[bytes]): + """Provides access to package-parts of an OPC package with dict semantics. + + The package may be in zip-format (a .pptx file) or expanded into a directory structure, + perhaps by unzipping a .pptx file. + """ + + def __init__(self, pkg_file: str | IO[bytes]): + self._pkg_file = pkg_file + + def __contains__(self, pack_uri: object) -> bool: + """Return True when part identified by `pack_uri` is present in package.""" + return pack_uri in self._blob_reader + + def __getitem__(self, pack_uri: PackURI) -> bytes: + """Return bytes for part corresponding to `pack_uri`.""" + return self._blob_reader[pack_uri] + + def rels_xml_for(self, partname: PackURI) -> bytes | None: + """Return optional rels item XML for `partname`. + + Returns `None` if no rels item is present for `partname`. `partname` is a |PackURI| + instance. + """ + blob_reader, uri = self._blob_reader, partname.rels_uri + return blob_reader[uri] if uri in blob_reader else None + + @lazyproperty + def _blob_reader(self) -> _PhysPkgReader: + """|_PhysPkgReader| subtype providing read access to the package file.""" + return _PhysPkgReader.factory(self._pkg_file) + + +class PackageWriter: + """Writes a zip-format OPC package to `pkg_file`. + + `pkg_file` can be either a path to a zip file (a string) or a file-like object. `pkg_rels` is + the |_Relationships| object containing relationships for the package. `parts` is a sequence of + |Part| subtype instance to be written to the package. + + Its single API classmethod is :meth:`write`. This class is not intended to be instantiated. + """ + + def __init__(self, pkg_file: str | IO[bytes], pkg_rels: _Relationships, parts: Sequence[Part]): + self._pkg_file = pkg_file + self._pkg_rels = pkg_rels + self._parts = parts + + @classmethod + def write( + cls, pkg_file: str | IO[bytes], pkg_rels: _Relationships, parts: Sequence[Part] + ) -> None: + """Write a physical package (.pptx file) to `pkg_file`. + + The serialized package contains `pkg_rels` and `parts`, a content-types stream based on + the content type of each part, and a .rels file for each part that has relationships. + """ + cls(pkg_file, pkg_rels, parts)._write() + + def _write(self) -> None: + """Write physical package (.pptx file).""" + with _PhysPkgWriter.factory(self._pkg_file) as phys_writer: + self._write_content_types_stream(phys_writer) + self._write_pkg_rels(phys_writer) + self._write_parts(phys_writer) + + def _write_content_types_stream(self, phys_writer: _PhysPkgWriter) -> None: + """Write `[Content_Types].xml` part to the physical package. + + This part must contain an appropriate content type lookup target for each part in the + package. + """ + phys_writer.write( + CONTENT_TYPES_URI, + serialize_part_xml(_ContentTypesItem.xml_for(self._parts)), + ) + + def _write_parts(self, phys_writer: _PhysPkgWriter) -> None: + """Write blob of each part in `parts` to the package. + + A rels item for each part is also written when the part has relationships. + """ + for part in self._parts: + phys_writer.write(part.partname, part.blob) + if part._rels: # pyright: ignore[reportPrivateUsage] + phys_writer.write(part.partname.rels_uri, part.rels.xml) + + def _write_pkg_rels(self, phys_writer: _PhysPkgWriter) -> None: + """Write the XML rels item for `pkg_rels` ('/_rels/.rels') to the package.""" + phys_writer.write(PACKAGE_URI.rels_uri, self._pkg_rels.xml) + + +class _PhysPkgReader(Container[PackURI]): + """Base class for physical package reader objects.""" + + def __contains__(self, item: object) -> bool: + """Must be implemented by each subclass.""" + raise NotImplementedError( # pragma: no cover + "`%s` must implement `.__contains__()`" % type(self).__name__ + ) + + def __getitem__(self, pack_uri: PackURI) -> bytes: + """Blob for part corresponding to `pack_uri`.""" + raise NotImplementedError( # pragma: no cover + f"`{type(self).__name__}` must implement `.__contains__()`" + ) + + @classmethod + def factory(cls, pkg_file: str | IO[bytes]) -> _PhysPkgReader: + """Return |_PhysPkgReader| subtype instance appropriage for `pkg_file`.""" + # --- for pkg_file other than str, assume it's a stream and pass it to Zip + # --- reader to sort out + if not isinstance(pkg_file, str): + return _ZipPkgReader(pkg_file) + + # --- otherwise we treat `pkg_file` as a path --- + if os.path.isdir(pkg_file): + return _DirPkgReader(pkg_file) + + if zipfile.is_zipfile(pkg_file): + return _ZipPkgReader(pkg_file) + + raise PackageNotFoundError("Package not found at '%s'" % pkg_file) + + +class _DirPkgReader(_PhysPkgReader): + """Implements |PhysPkgReader| interface for OPC package extracted into directory. + + `path` is the path to a directory containing an expanded package. + """ + + def __init__(self, path: str): + self._path = os.path.abspath(path) + + def __contains__(self, pack_uri: object) -> bool: + """Return True when part identified by `pack_uri` is present in zip archive.""" + if not isinstance(pack_uri, PackURI): + return False + return os.path.exists(posixpath.join(self._path, pack_uri.membername)) + + def __getitem__(self, pack_uri: PackURI) -> bytes: + """Return bytes of file corresponding to `pack_uri` in package directory.""" + path = os.path.join(self._path, pack_uri.membername) + try: + with open(path, "rb") as f: + return f.read() + except IOError: + raise KeyError("no member '%s' in package" % pack_uri) + + +class _ZipPkgReader(_PhysPkgReader): + """Implements |PhysPkgReader| interface for a zip-file OPC package.""" + + def __init__(self, pkg_file: str | IO[bytes]): + self._pkg_file = pkg_file + + def __contains__(self, pack_uri: object) -> bool: + """Return True when part identified by `pack_uri` is present in zip archive.""" + return pack_uri in self._blobs + + def __getitem__(self, pack_uri: PackURI) -> bytes: + """Return bytes for part corresponding to `pack_uri`. + + Raises |KeyError| if no matching member is present in zip archive. + """ + if pack_uri not in self._blobs: + raise KeyError("no member '%s' in package" % pack_uri) + return self._blobs[pack_uri] + + @lazyproperty + def _blobs(self) -> dict[PackURI, bytes]: + """dict mapping partname to package part binaries.""" + with zipfile.ZipFile(self._pkg_file, "r") as z: + return {PackURI("/%s" % name): z.read(name) for name in z.namelist()} + + +class _PhysPkgWriter: + """Base class for physical package writer objects.""" + + @classmethod + def factory(cls, pkg_file: str | IO[bytes]) -> _ZipPkgWriter: + """Return |_PhysPkgWriter| subtype instance appropriage for `pkg_file`. + + Currently the only subtype is `_ZipPkgWriter`, but a `_DirPkgWriter` could be implemented + or even a `_StreamPkgWriter`. + """ + return _ZipPkgWriter(pkg_file) + + def write(self, pack_uri: PackURI, blob: bytes) -> None: + """Write `blob` to package with membername corresponding to `pack_uri`.""" + raise NotImplementedError( # pragma: no cover + f"`{type(self).__name__}` must implement `.write()`" + ) + + +class _ZipPkgWriter(_PhysPkgWriter): + """Implements |PhysPkgWriter| interface for a zip-file (.pptx file) OPC package.""" + + def __init__(self, pkg_file: str | IO[bytes]): + self._pkg_file = pkg_file + + def __enter__(self) -> _ZipPkgWriter: + """Enable use as a context-manager. Opening zip for writing happens here.""" + return self + + def __exit__(self, *exc: list[Any]) -> None: + """Close the zip archive on exit from context. + + Closing flushes any pending physical writes and releasing any resources it's using. + """ + self._zipf.close() + + def write(self, pack_uri: PackURI, blob: bytes) -> None: + """Write `blob` to zip package with membername corresponding to `pack_uri`.""" + self._zipf.writestr(pack_uri.membername, blob) + + @lazyproperty + def _zipf(self) -> zipfile.ZipFile: + """`ZipFile` instance open for writing.""" + return zipfile.ZipFile( + self._pkg_file, "w", compression=zipfile.ZIP_DEFLATED, strict_timestamps=False + ) + + +class _ContentTypesItem: + """Composes content-types "part" ([Content_Types].xml) for a collection of parts.""" + + def __init__(self, parts: Sequence[Part]): + self._parts = parts + + @classmethod + def xml_for(cls, parts: Sequence[Part]) -> CT_Types: + """Return content-types XML mapping each part in `parts` to a content-type. + + The resulting XML is suitable for storage as `[Content_Types].xml` in an OPC package. + """ + return cls(parts)._xml + + @lazyproperty + def _xml(self) -> CT_Types: + """lxml.etree._Element containing the content-types item. + + This XML object is suitable for serialization to the `[Content_Types].xml` item for an OPC + package. Although the sequence of elements is not strictly significant, as an aid to + testing and readability Default elements are sorted by extension and Override elements are + sorted by partname. + """ + defaults, overrides = self._defaults_and_overrides + _types_elm = CT_Types.new() + + for ext, content_type in sorted(defaults.items()): + _types_elm.add_default(ext, content_type) + for partname, content_type in sorted(overrides.items()): + _types_elm.add_override(partname, content_type) + + return _types_elm + + @lazyproperty + def _defaults_and_overrides(self) -> tuple[dict[str, str], dict[PackURI, str]]: + """pair of dict (defaults, overrides) accounting for all parts. + + `defaults` is {ext: content_type} and overrides is {partname: content_type}. + """ + defaults = CaseInsensitiveDict(rels=CT.OPC_RELATIONSHIPS, xml=CT.XML) + overrides: dict[PackURI, str] = {} + + for part in self._parts: + partname, content_type = part.partname, part.content_type + ext = partname.ext + if (ext.lower(), content_type) in default_content_types: + defaults[ext] = content_type + else: + overrides[partname] = content_type + + return defaults, overrides |