about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/pyasn1/codec/cer
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/pyasn1/codec/cer
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/pyasn1/codec/cer')
-rw-r--r--.venv/lib/python3.12/site-packages/pyasn1/codec/cer/__init__.py1
-rw-r--r--.venv/lib/python3.12/site-packages/pyasn1/codec/cer/decoder.py149
-rw-r--r--.venv/lib/python3.12/site-packages/pyasn1/codec/cer/encoder.py331
3 files changed, 481 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/pyasn1/codec/cer/__init__.py b/.venv/lib/python3.12/site-packages/pyasn1/codec/cer/__init__.py
new file mode 100644
index 00000000..8c3066b2
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/pyasn1/codec/cer/__init__.py
@@ -0,0 +1 @@
+# This file is necessary to make this directory a package.
diff --git a/.venv/lib/python3.12/site-packages/pyasn1/codec/cer/decoder.py b/.venv/lib/python3.12/site-packages/pyasn1/codec/cer/decoder.py
new file mode 100644
index 00000000..d6890f01
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/pyasn1/codec/cer/decoder.py
@@ -0,0 +1,149 @@
+#
+# This file is part of pyasn1 software.
+#
+# Copyright (c) 2005-2020, Ilya Etingof <etingof@gmail.com>
+# License: https://pyasn1.readthedocs.io/en/latest/license.html
+#
+import warnings
+
+from pyasn1 import error
+from pyasn1.codec.streaming import readFromStream
+from pyasn1.codec.ber import decoder
+from pyasn1.type import univ
+
+__all__ = ['decode', 'StreamingDecoder']
+
+SubstrateUnderrunError = error.SubstrateUnderrunError
+
+
+class BooleanPayloadDecoder(decoder.AbstractSimplePayloadDecoder):
+    protoComponent = univ.Boolean(0)
+
+    def valueDecoder(self, substrate, asn1Spec,
+                     tagSet=None, length=None, state=None,
+                     decodeFun=None, substrateFun=None,
+                     **options):
+
+        if length != 1:
+            raise error.PyAsn1Error('Not single-octet Boolean payload')
+
+        for chunk in readFromStream(substrate, length, options):
+            if isinstance(chunk, SubstrateUnderrunError):
+                yield chunk
+
+        byte = chunk[0]
+
+        # CER/DER specifies encoding of TRUE as 0xFF and FALSE as 0x0, while
+        # BER allows any non-zero value as TRUE; cf. sections 8.2.2. and 11.1 
+        # in https://www.itu.int/ITU-T/studygroups/com17/languages/X.690-0207.pdf
+        if byte == 0xff:
+            value = 1
+
+        elif byte == 0x00:
+            value = 0
+
+        else:
+            raise error.PyAsn1Error('Unexpected Boolean payload: %s' % byte)
+
+        yield self._createComponent(asn1Spec, tagSet, value, **options)
+
+
+# TODO: prohibit non-canonical encoding
+BitStringPayloadDecoder = decoder.BitStringPayloadDecoder
+OctetStringPayloadDecoder = decoder.OctetStringPayloadDecoder
+RealPayloadDecoder = decoder.RealPayloadDecoder
+
+TAG_MAP = decoder.TAG_MAP.copy()
+TAG_MAP.update(
+    {univ.Boolean.tagSet: BooleanPayloadDecoder(),
+     univ.BitString.tagSet: BitStringPayloadDecoder(),
+     univ.OctetString.tagSet: OctetStringPayloadDecoder(),
+     univ.Real.tagSet: RealPayloadDecoder()}
+)
+
+TYPE_MAP = decoder.TYPE_MAP.copy()
+
+# Put in non-ambiguous types for faster codec lookup
+for typeDecoder in TAG_MAP.values():
+    if typeDecoder.protoComponent is not None:
+        typeId = typeDecoder.protoComponent.__class__.typeId
+        if typeId is not None and typeId not in TYPE_MAP:
+            TYPE_MAP[typeId] = typeDecoder
+
+
+class SingleItemDecoder(decoder.SingleItemDecoder):
+    __doc__ = decoder.SingleItemDecoder.__doc__
+
+    TAG_MAP = TAG_MAP
+    TYPE_MAP = TYPE_MAP
+
+
+class StreamingDecoder(decoder.StreamingDecoder):
+    __doc__ = decoder.StreamingDecoder.__doc__
+
+    SINGLE_ITEM_DECODER = SingleItemDecoder
+
+
+class Decoder(decoder.Decoder):
+    __doc__ = decoder.Decoder.__doc__
+
+    STREAMING_DECODER = StreamingDecoder
+
+
+#: Turns CER octet stream into an ASN.1 object.
+#:
+#: Takes CER octet-stream and decode it into an ASN.1 object
+#: (e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative) which
+#: may be a scalar or an arbitrary nested structure.
+#:
+#: Parameters
+#: ----------
+#: substrate: :py:class:`bytes`
+#:     CER octet-stream
+#:
+#: Keyword Args
+#: ------------
+#: asn1Spec: any pyasn1 type object e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative
+#:     A pyasn1 type object to act as a template guiding the decoder. Depending on the ASN.1 structure
+#:     being decoded, *asn1Spec* may or may not be required. Most common reason for
+#:     it to require is that ASN.1 structure is encoded in *IMPLICIT* tagging mode.
+#:
+#: Returns
+#: -------
+#: : :py:class:`tuple`
+#:     A tuple of pyasn1 object recovered from CER substrate (:py:class:`~pyasn1.type.base.PyAsn1Item` derivative)
+#:     and the unprocessed trailing portion of the *substrate* (may be empty)
+#:
+#: Raises
+#: ------
+#: ~pyasn1.error.PyAsn1Error, ~pyasn1.error.SubstrateUnderrunError
+#:     On decoding errors
+#:
+#: Examples
+#: --------
+#: Decode CER serialisation without ASN.1 schema
+#:
+#: .. code-block:: pycon
+#:
+#:    >>> s, _ = decode(b'0\x80\x02\x01\x01\x02\x01\x02\x02\x01\x03\x00\x00')
+#:    >>> str(s)
+#:    SequenceOf:
+#:     1 2 3
+#:
+#: Decode CER serialisation with ASN.1 schema
+#:
+#: .. code-block:: pycon
+#:
+#:    >>> seq = SequenceOf(componentType=Integer())
+#:    >>> s, _ = decode(b'0\x80\x02\x01\x01\x02\x01\x02\x02\x01\x03\x00\x00', asn1Spec=seq)
+#:    >>> str(s)
+#:    SequenceOf:
+#:     1 2 3
+#:
+decode = Decoder()
+
+def __getattr__(attr: str):
+    if newAttr := {"tagMap": "TAG_MAP", "typeMap": "TYPE_MAP"}.get(attr):
+        warnings.warn(f"{attr} is deprecated. Please use {newAttr} instead.", DeprecationWarning)
+        return globals()[newAttr]
+    raise AttributeError(attr)
diff --git a/.venv/lib/python3.12/site-packages/pyasn1/codec/cer/encoder.py b/.venv/lib/python3.12/site-packages/pyasn1/codec/cer/encoder.py
new file mode 100644
index 00000000..e16e9ece
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/pyasn1/codec/cer/encoder.py
@@ -0,0 +1,331 @@
+#
+# This file is part of pyasn1 software.
+#
+# Copyright (c) 2005-2020, Ilya Etingof <etingof@gmail.com>
+# License: https://pyasn1.readthedocs.io/en/latest/license.html
+#
+import warnings
+
+from pyasn1 import error
+from pyasn1.codec.ber import encoder
+from pyasn1.type import univ
+from pyasn1.type import useful
+
+__all__ = ['Encoder', 'encode']
+
+
+class BooleanEncoder(encoder.IntegerEncoder):
+    def encodeValue(self, value, asn1Spec, encodeFun, **options):
+        if value == 0:
+            substrate = (0,)
+        else:
+            substrate = (255,)
+        return substrate, False, False
+
+
+class RealEncoder(encoder.RealEncoder):
+    def _chooseEncBase(self, value):
+        m, b, e = value
+        return self._dropFloatingPoint(m, b, e)
+
+
+# specialized GeneralStringEncoder here
+
+class TimeEncoderMixIn(object):
+    Z_CHAR = ord('Z')
+    PLUS_CHAR = ord('+')
+    MINUS_CHAR = ord('-')
+    COMMA_CHAR = ord(',')
+    DOT_CHAR = ord('.')
+    ZERO_CHAR = ord('0')
+
+    MIN_LENGTH = 12
+    MAX_LENGTH = 19
+
+    def encodeValue(self, value, asn1Spec, encodeFun, **options):
+        # CER encoding constraints:
+        # - minutes are mandatory, seconds are optional
+        # - sub-seconds must NOT be zero / no meaningless zeros
+        # - no hanging fraction dot
+        # - time in UTC (Z)
+        # - only dot is allowed for fractions
+
+        if asn1Spec is not None:
+            value = asn1Spec.clone(value)
+
+        numbers = value.asNumbers()
+
+        if self.PLUS_CHAR in numbers or self.MINUS_CHAR in numbers:
+            raise error.PyAsn1Error('Must be UTC time: %r' % value)
+
+        if numbers[-1] != self.Z_CHAR:
+            raise error.PyAsn1Error('Missing "Z" time zone specifier: %r' % value)
+
+        if self.COMMA_CHAR in numbers:
+            raise error.PyAsn1Error('Comma in fractions disallowed: %r' % value)
+
+        if self.DOT_CHAR in numbers:
+
+            isModified = False
+
+            numbers = list(numbers)
+
+            searchIndex = min(numbers.index(self.DOT_CHAR) + 4, len(numbers) - 1)
+
+            while numbers[searchIndex] != self.DOT_CHAR:
+                if numbers[searchIndex] == self.ZERO_CHAR:
+                    del numbers[searchIndex]
+                    isModified = True
+
+                searchIndex -= 1
+
+            searchIndex += 1
+
+            if searchIndex < len(numbers):
+                if numbers[searchIndex] == self.Z_CHAR:
+                    # drop hanging comma
+                    del numbers[searchIndex - 1]
+                    isModified = True
+
+            if isModified:
+                value = value.clone(numbers)
+
+        if not self.MIN_LENGTH < len(numbers) < self.MAX_LENGTH:
+            raise error.PyAsn1Error('Length constraint violated: %r' % value)
+
+        options.update(maxChunkSize=1000)
+
+        return encoder.OctetStringEncoder.encodeValue(
+            self, value, asn1Spec, encodeFun, **options
+        )
+
+
+class GeneralizedTimeEncoder(TimeEncoderMixIn, encoder.OctetStringEncoder):
+    MIN_LENGTH = 12
+    MAX_LENGTH = 20
+
+
+class UTCTimeEncoder(TimeEncoderMixIn, encoder.OctetStringEncoder):
+    MIN_LENGTH = 10
+    MAX_LENGTH = 14
+
+
+class SetOfEncoder(encoder.SequenceOfEncoder):
+    def encodeValue(self, value, asn1Spec, encodeFun, **options):
+        chunks = self._encodeComponents(
+            value, asn1Spec, encodeFun, **options)
+
+        # sort by serialised and padded components
+        if len(chunks) > 1:
+            zero = b'\x00'
+            maxLen = max(map(len, chunks))
+            paddedChunks = [
+                (x.ljust(maxLen, zero), x) for x in chunks
+            ]
+            paddedChunks.sort(key=lambda x: x[0])
+
+            chunks = [x[1] for x in paddedChunks]
+
+        return b''.join(chunks), True, True
+
+
+class SequenceOfEncoder(encoder.SequenceOfEncoder):
+    def encodeValue(self, value, asn1Spec, encodeFun, **options):
+
+        if options.get('ifNotEmpty', False) and not len(value):
+            return b'', True, True
+
+        chunks = self._encodeComponents(
+            value, asn1Spec, encodeFun, **options)
+
+        return b''.join(chunks), True, True
+
+
+class SetEncoder(encoder.SequenceEncoder):
+    @staticmethod
+    def _componentSortKey(componentAndType):
+        """Sort SET components by tag
+
+        Sort regardless of the Choice value (static sort)
+        """
+        component, asn1Spec = componentAndType
+
+        if asn1Spec is None:
+            asn1Spec = component
+
+        if asn1Spec.typeId == univ.Choice.typeId and not asn1Spec.tagSet:
+            if asn1Spec.tagSet:
+                return asn1Spec.tagSet
+            else:
+                return asn1Spec.componentType.minTagSet
+        else:
+            return asn1Spec.tagSet
+
+    def encodeValue(self, value, asn1Spec, encodeFun, **options):
+
+        substrate = b''
+
+        comps = []
+        compsMap = {}
+
+        if asn1Spec is None:
+            # instance of ASN.1 schema
+            inconsistency = value.isInconsistent
+            if inconsistency:
+                raise error.PyAsn1Error(
+                    f"ASN.1 object {value.__class__.__name__} is inconsistent")
+
+            namedTypes = value.componentType
+
+            for idx, component in enumerate(value.values()):
+                if namedTypes:
+                    namedType = namedTypes[idx]
+
+                    if namedType.isOptional and not component.isValue:
+                            continue
+
+                    if namedType.isDefaulted and component == namedType.asn1Object:
+                            continue
+
+                    compsMap[id(component)] = namedType
+
+                else:
+                    compsMap[id(component)] = None
+
+                comps.append((component, asn1Spec))
+
+        else:
+            # bare Python value + ASN.1 schema
+            for idx, namedType in enumerate(asn1Spec.componentType.namedTypes):
+
+                try:
+                    component = value[namedType.name]
+
+                except KeyError:
+                    raise error.PyAsn1Error('Component name "%s" not found in %r' % (namedType.name, value))
+
+                if namedType.isOptional and namedType.name not in value:
+                    continue
+
+                if namedType.isDefaulted and component == namedType.asn1Object:
+                    continue
+
+                compsMap[id(component)] = namedType
+                comps.append((component, asn1Spec[idx]))
+
+        for comp, compType in sorted(comps, key=self._componentSortKey):
+            namedType = compsMap[id(comp)]
+
+            if namedType:
+                options.update(ifNotEmpty=namedType.isOptional)
+
+            chunk = encodeFun(comp, compType, **options)
+
+            # wrap open type blob if needed
+            if namedType and namedType.openType:
+                wrapType = namedType.asn1Object
+                if wrapType.tagSet and not wrapType.isSameTypeWith(comp):
+                    chunk = encodeFun(chunk, wrapType, **options)
+
+            substrate += chunk
+
+        return substrate, True, True
+
+
+class SequenceEncoder(encoder.SequenceEncoder):
+    omitEmptyOptionals = True
+
+
+TAG_MAP = encoder.TAG_MAP.copy()
+
+TAG_MAP.update({
+    univ.Boolean.tagSet: BooleanEncoder(),
+    univ.Real.tagSet: RealEncoder(),
+    useful.GeneralizedTime.tagSet: GeneralizedTimeEncoder(),
+    useful.UTCTime.tagSet: UTCTimeEncoder(),
+    # Sequence & Set have same tags as SequenceOf & SetOf
+    univ.SetOf.tagSet: SetOfEncoder(),
+    univ.Sequence.typeId: SequenceEncoder()
+})
+
+TYPE_MAP = encoder.TYPE_MAP.copy()
+
+TYPE_MAP.update({
+    univ.Boolean.typeId: BooleanEncoder(),
+    univ.Real.typeId: RealEncoder(),
+    useful.GeneralizedTime.typeId: GeneralizedTimeEncoder(),
+    useful.UTCTime.typeId: UTCTimeEncoder(),
+    # Sequence & Set have same tags as SequenceOf & SetOf
+    univ.Set.typeId: SetEncoder(),
+    univ.SetOf.typeId: SetOfEncoder(),
+    univ.Sequence.typeId: SequenceEncoder(),
+    univ.SequenceOf.typeId: SequenceOfEncoder()
+})
+
+
+class SingleItemEncoder(encoder.SingleItemEncoder):
+    fixedDefLengthMode = False
+    fixedChunkSize = 1000
+
+    TAG_MAP = TAG_MAP
+    TYPE_MAP = TYPE_MAP
+
+
+class Encoder(encoder.Encoder):
+    SINGLE_ITEM_ENCODER = SingleItemEncoder
+
+
+#: Turns ASN.1 object into CER octet stream.
+#:
+#: Takes any ASN.1 object (e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative)
+#: walks all its components recursively and produces a CER octet stream.
+#:
+#: Parameters
+#: ----------
+#: value: either a Python or pyasn1 object (e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative)
+#:     A Python or pyasn1 object to encode. If Python object is given, `asnSpec`
+#:     parameter is required to guide the encoding process.
+#:
+#: Keyword Args
+#: ------------
+#: asn1Spec:
+#:     Optional ASN.1 schema or value object e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative
+#:
+#: Returns
+#: -------
+#: : :py:class:`bytes`
+#:     Given ASN.1 object encoded into BER octet-stream
+#:
+#: Raises
+#: ------
+#: ~pyasn1.error.PyAsn1Error
+#:     On encoding errors
+#:
+#: Examples
+#: --------
+#: Encode Python value into CER with ASN.1 schema
+#:
+#: .. code-block:: pycon
+#:
+#:    >>> seq = SequenceOf(componentType=Integer())
+#:    >>> encode([1, 2, 3], asn1Spec=seq)
+#:    b'0\x80\x02\x01\x01\x02\x01\x02\x02\x01\x03\x00\x00'
+#:
+#: Encode ASN.1 value object into CER
+#:
+#: .. code-block:: pycon
+#:
+#:    >>> seq = SequenceOf(componentType=Integer())
+#:    >>> seq.extend([1, 2, 3])
+#:    >>> encode(seq)
+#:    b'0\x80\x02\x01\x01\x02\x01\x02\x02\x01\x03\x00\x00'
+#:
+encode = Encoder()
+
+# EncoderFactory queries class instance and builds a map of tags -> encoders
+
+def __getattr__(attr: str):
+    if newAttr := {"tagMap": "TAG_MAP", "typeMap": "TYPE_MAP"}.get(attr):
+        warnings.warn(f"{attr} is deprecated. Please use {newAttr} instead.", DeprecationWarning)
+        return globals()[newAttr]
+    raise AttributeError(attr)