aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/pyasn1/codec/ber
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/pyasn1/codec/ber')
-rw-r--r--.venv/lib/python3.12/site-packages/pyasn1/codec/ber/__init__.py1
-rw-r--r--.venv/lib/python3.12/site-packages/pyasn1/codec/ber/decoder.py2189
-rw-r--r--.venv/lib/python3.12/site-packages/pyasn1/codec/ber/encoder.py954
-rw-r--r--.venv/lib/python3.12/site-packages/pyasn1/codec/ber/eoo.py28
4 files changed, 3172 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/pyasn1/codec/ber/__init__.py b/.venv/lib/python3.12/site-packages/pyasn1/codec/ber/__init__.py
new file mode 100644
index 00000000..8c3066b2
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/pyasn1/codec/ber/__init__.py
@@ -0,0 +1 @@
+# This file is necessary to make this directory a package.
diff --git a/.venv/lib/python3.12/site-packages/pyasn1/codec/ber/decoder.py b/.venv/lib/python3.12/site-packages/pyasn1/codec/ber/decoder.py
new file mode 100644
index 00000000..7e69ca15
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/pyasn1/codec/ber/decoder.py
@@ -0,0 +1,2189 @@
+#
+# This file is part of pyasn1 software.
+#
+# Copyright (c) 2005-2020, Ilya Etingof <etingof@gmail.com>
+# License: https://pyasn1.readthedocs.io/en/latest/license.html
+#
+import io
+import os
+import sys
+import warnings
+
+from pyasn1 import debug
+from pyasn1 import error
+from pyasn1.codec.ber import eoo
+from pyasn1.codec.streaming import asSeekableStream
+from pyasn1.codec.streaming import isEndOfStream
+from pyasn1.codec.streaming import peekIntoStream
+from pyasn1.codec.streaming import readFromStream
+from pyasn1.compat import _MISSING
+from pyasn1.error import PyAsn1Error
+from pyasn1.type import base
+from pyasn1.type import char
+from pyasn1.type import tag
+from pyasn1.type import tagmap
+from pyasn1.type import univ
+from pyasn1.type import useful
+
+__all__ = ['StreamingDecoder', 'Decoder', 'decode']
+
+LOG = debug.registerLoggee(__name__, flags=debug.DEBUG_DECODER)
+
+noValue = base.noValue
+
+SubstrateUnderrunError = error.SubstrateUnderrunError
+
+
+class AbstractPayloadDecoder(object):
+ protoComponent = None
+
+ def valueDecoder(self, substrate, asn1Spec,
+ tagSet=None, length=None, state=None,
+ decodeFun=None, substrateFun=None,
+ **options):
+ """Decode value with fixed byte length.
+
+ The decoder is allowed to consume as many bytes as necessary.
+ """
+ raise error.PyAsn1Error('SingleItemDecoder not implemented for %s' % (tagSet,)) # TODO: Seems more like an NotImplementedError?
+
+ def indefLenValueDecoder(self, substrate, asn1Spec,
+ tagSet=None, length=None, state=None,
+ decodeFun=None, substrateFun=None,
+ **options):
+ """Decode value with undefined length.
+
+ The decoder is allowed to consume as many bytes as necessary.
+ """
+ raise error.PyAsn1Error('Indefinite length mode decoder not implemented for %s' % (tagSet,)) # TODO: Seems more like an NotImplementedError?
+
+ @staticmethod
+ def _passAsn1Object(asn1Object, options):
+ if 'asn1Object' not in options:
+ options['asn1Object'] = asn1Object
+
+ return options
+
+
+class AbstractSimplePayloadDecoder(AbstractPayloadDecoder):
+ @staticmethod
+ def substrateCollector(asn1Object, substrate, length, options):
+ for chunk in readFromStream(substrate, length, options):
+ yield chunk
+
+ def _createComponent(self, asn1Spec, tagSet, value, **options):
+ if options.get('native'):
+ return value
+ elif asn1Spec is None:
+ return self.protoComponent.clone(value, tagSet=tagSet)
+ elif value is noValue:
+ return asn1Spec
+ else:
+ return asn1Spec.clone(value)
+
+
+class RawPayloadDecoder(AbstractSimplePayloadDecoder):
+ protoComponent = univ.Any('')
+
+ def valueDecoder(self, substrate, asn1Spec,
+ tagSet=None, length=None, state=None,
+ decodeFun=None, substrateFun=None,
+ **options):
+ if substrateFun:
+ asn1Object = self._createComponent(asn1Spec, tagSet, '', **options)
+
+ for chunk in substrateFun(asn1Object, substrate, length, options):
+ yield chunk
+
+ return
+
+ for value in decodeFun(substrate, asn1Spec, tagSet, length, **options):
+ yield value
+
+ def indefLenValueDecoder(self, substrate, asn1Spec,
+ tagSet=None, length=None, state=None,
+ decodeFun=None, substrateFun=None,
+ **options):
+ if substrateFun:
+ asn1Object = self._createComponent(asn1Spec, tagSet, '', **options)
+
+ for chunk in substrateFun(asn1Object, substrate, length, options):
+ yield chunk
+
+ return
+
+ while True:
+ for value in decodeFun(
+ substrate, asn1Spec, tagSet, length,
+ allowEoo=True, **options):
+
+ if value is eoo.endOfOctets:
+ return
+
+ yield value
+
+
+rawPayloadDecoder = RawPayloadDecoder()
+
+
+class IntegerPayloadDecoder(AbstractSimplePayloadDecoder):
+ protoComponent = univ.Integer(0)
+
+ def valueDecoder(self, substrate, asn1Spec,
+ tagSet=None, length=None, state=None,
+ decodeFun=None, substrateFun=None,
+ **options):
+
+ if tagSet[0].tagFormat != tag.tagFormatSimple:
+ raise error.PyAsn1Error('Simple tag format expected')
+
+ for chunk in readFromStream(substrate, length, options):
+ if isinstance(chunk, SubstrateUnderrunError):
+ yield chunk
+
+ if chunk:
+ value = int.from_bytes(bytes(chunk), 'big', signed=True)
+
+ else:
+ value = 0
+
+ yield self._createComponent(asn1Spec, tagSet, value, **options)
+
+
+class BooleanPayloadDecoder(IntegerPayloadDecoder):
+ protoComponent = univ.Boolean(0)
+
+ def _createComponent(self, asn1Spec, tagSet, value, **options):
+ return IntegerPayloadDecoder._createComponent(
+ self, asn1Spec, tagSet, value and 1 or 0, **options)
+
+
+class BitStringPayloadDecoder(AbstractSimplePayloadDecoder):
+ protoComponent = univ.BitString(())
+ supportConstructedForm = True
+
+ def valueDecoder(self, substrate, asn1Spec,
+ tagSet=None, length=None, state=None,
+ decodeFun=None, substrateFun=None,
+ **options):
+
+ if substrateFun:
+ asn1Object = self._createComponent(asn1Spec, tagSet, noValue, **options)
+
+ for chunk in substrateFun(asn1Object, substrate, length, options):
+ yield chunk
+
+ return
+
+ if not length:
+ raise error.PyAsn1Error('Empty BIT STRING substrate')
+
+ for chunk in isEndOfStream(substrate):
+ if isinstance(chunk, SubstrateUnderrunError):
+ yield chunk
+
+ if chunk:
+ raise error.PyAsn1Error('Empty BIT STRING substrate')
+
+ if tagSet[0].tagFormat == tag.tagFormatSimple: # XXX what tag to check?
+
+ for trailingBits in readFromStream(substrate, 1, options):
+ if isinstance(trailingBits, SubstrateUnderrunError):
+ yield trailingBits
+
+ trailingBits = ord(trailingBits)
+ if trailingBits > 7:
+ raise error.PyAsn1Error(
+ 'Trailing bits overflow %s' % trailingBits
+ )
+
+ for chunk in readFromStream(substrate, length - 1, options):
+ if isinstance(chunk, SubstrateUnderrunError):
+ yield chunk
+
+ value = self.protoComponent.fromOctetString(
+ chunk, internalFormat=True, padding=trailingBits)
+
+ yield self._createComponent(asn1Spec, tagSet, value, **options)
+
+ return
+
+ if not self.supportConstructedForm:
+ raise error.PyAsn1Error('Constructed encoding form prohibited '
+ 'at %s' % self.__class__.__name__)
+
+ if LOG:
+ LOG('assembling constructed serialization')
+
+ # All inner fragments are of the same type, treat them as octet string
+ substrateFun = self.substrateCollector
+
+ bitString = self.protoComponent.fromOctetString(b'', internalFormat=True)
+
+ current_position = substrate.tell()
+
+ while substrate.tell() - current_position < length:
+ for component in decodeFun(
+ substrate, self.protoComponent, substrateFun=substrateFun,
+ **options):
+ if isinstance(component, SubstrateUnderrunError):
+ yield component
+
+ trailingBits = component[0]
+ if trailingBits > 7:
+ raise error.PyAsn1Error(
+ 'Trailing bits overflow %s' % trailingBits
+ )
+
+ bitString = self.protoComponent.fromOctetString(
+ component[1:], internalFormat=True,
+ prepend=bitString, padding=trailingBits
+ )
+
+ yield self._createComponent(asn1Spec, tagSet, bitString, **options)
+
+ def indefLenValueDecoder(self, substrate, asn1Spec,
+ tagSet=None, length=None, state=None,
+ decodeFun=None, substrateFun=None,
+ **options):
+
+ if substrateFun:
+ asn1Object = self._createComponent(asn1Spec, tagSet, noValue, **options)
+
+ for chunk in substrateFun(asn1Object, substrate, length, options):
+ yield chunk
+
+ return
+
+ # All inner fragments are of the same type, treat them as octet string
+ substrateFun = self.substrateCollector
+
+ bitString = self.protoComponent.fromOctetString(b'', internalFormat=True)
+
+ while True: # loop over fragments
+
+ for component in decodeFun(
+ substrate, self.protoComponent, substrateFun=substrateFun,
+ allowEoo=True, **options):
+
+ if component is eoo.endOfOctets:
+ break
+
+ if isinstance(component, SubstrateUnderrunError):
+ yield component
+
+ if component is eoo.endOfOctets:
+ break
+
+ trailingBits = component[0]
+ if trailingBits > 7:
+ raise error.PyAsn1Error(
+ 'Trailing bits overflow %s' % trailingBits
+ )
+
+ bitString = self.protoComponent.fromOctetString(
+ component[1:], internalFormat=True,
+ prepend=bitString, padding=trailingBits
+ )
+
+ yield self._createComponent(asn1Spec, tagSet, bitString, **options)
+
+
+class OctetStringPayloadDecoder(AbstractSimplePayloadDecoder):
+ protoComponent = univ.OctetString('')
+ supportConstructedForm = True
+
+ def valueDecoder(self, substrate, asn1Spec,
+ tagSet=None, length=None, state=None,
+ decodeFun=None, substrateFun=None,
+ **options):
+ if substrateFun:
+ asn1Object = self._createComponent(asn1Spec, tagSet, noValue, **options)
+
+ for chunk in substrateFun(asn1Object, substrate, length, options):
+ yield chunk
+
+ return
+
+ if tagSet[0].tagFormat == tag.tagFormatSimple: # XXX what tag to check?
+ for chunk in readFromStream(substrate, length, options):
+ if isinstance(chunk, SubstrateUnderrunError):
+ yield chunk
+
+ yield self._createComponent(asn1Spec, tagSet, chunk, **options)
+
+ return
+
+ if not self.supportConstructedForm:
+ raise error.PyAsn1Error('Constructed encoding form prohibited at %s' % self.__class__.__name__)
+
+ if LOG:
+ LOG('assembling constructed serialization')
+
+ # All inner fragments are of the same type, treat them as octet string
+ substrateFun = self.substrateCollector
+
+ header = b''
+
+ original_position = substrate.tell()
+ # head = popSubstream(substrate, length)
+ while substrate.tell() - original_position < length:
+ for component in decodeFun(
+ substrate, self.protoComponent, substrateFun=substrateFun,
+ **options):
+ if isinstance(component, SubstrateUnderrunError):
+ yield component
+
+ header += component
+
+ yield self._createComponent(asn1Spec, tagSet, header, **options)
+
+ def indefLenValueDecoder(self, substrate, asn1Spec,
+ tagSet=None, length=None, state=None,
+ decodeFun=None, substrateFun=None,
+ **options):
+ if substrateFun and substrateFun is not self.substrateCollector:
+ asn1Object = self._createComponent(asn1Spec, tagSet, noValue, **options)
+
+ for chunk in substrateFun(asn1Object, substrate, length, options):
+ yield chunk
+
+ return
+
+ # All inner fragments are of the same type, treat them as octet string
+ substrateFun = self.substrateCollector
+
+ header = b''
+
+ while True: # loop over fragments
+
+ for component in decodeFun(
+ substrate, self.protoComponent, substrateFun=substrateFun,
+ allowEoo=True, **options):
+
+ if isinstance(component, SubstrateUnderrunError):
+ yield component
+
+ if component is eoo.endOfOctets:
+ break
+
+ if component is eoo.endOfOctets:
+ break
+
+ header += component
+
+ yield self._createComponent(asn1Spec, tagSet, header, **options)
+
+
+class NullPayloadDecoder(AbstractSimplePayloadDecoder):
+ protoComponent = univ.Null('')
+
+ def valueDecoder(self, substrate, asn1Spec,
+ tagSet=None, length=None, state=None,
+ decodeFun=None, substrateFun=None,
+ **options):
+
+ if tagSet[0].tagFormat != tag.tagFormatSimple:
+ raise error.PyAsn1Error('Simple tag format expected')
+
+ for chunk in readFromStream(substrate, length, options):
+ if isinstance(chunk, SubstrateUnderrunError):
+ yield chunk
+
+ component = self._createComponent(asn1Spec, tagSet, '', **options)
+
+ if chunk:
+ raise error.PyAsn1Error('Unexpected %d-octet substrate for Null' % length)
+
+ yield component
+
+
+class ObjectIdentifierPayloadDecoder(AbstractSimplePayloadDecoder):
+ protoComponent = univ.ObjectIdentifier(())
+
+ def valueDecoder(self, substrate, asn1Spec,
+ tagSet=None, length=None, state=None,
+ decodeFun=None, substrateFun=None,
+ **options):
+ if tagSet[0].tagFormat != tag.tagFormatSimple:
+ raise error.PyAsn1Error('Simple tag format expected')
+
+ for chunk in readFromStream(substrate, length, options):
+ if isinstance(chunk, SubstrateUnderrunError):
+ yield chunk
+
+ if not chunk:
+ raise error.PyAsn1Error('Empty substrate')
+
+ oid = ()
+ index = 0
+ substrateLen = len(chunk)
+ while index < substrateLen:
+ subId = chunk[index]
+ index += 1
+ if subId < 128:
+ oid += (subId,)
+ elif subId > 128:
+ # Construct subid from a number of octets
+ nextSubId = subId
+ subId = 0
+ while nextSubId >= 128:
+ subId = (subId << 7) + (nextSubId & 0x7F)
+ if index >= substrateLen:
+ raise error.SubstrateUnderrunError(
+ 'Short substrate for sub-OID past %s' % (oid,)
+ )
+ nextSubId = chunk[index]
+ index += 1
+ oid += ((subId << 7) + nextSubId,)
+ elif subId == 128:
+ # ASN.1 spec forbids leading zeros (0x80) in OID
+ # encoding, tolerating it opens a vulnerability. See
+ # https://www.esat.kuleuven.be/cosic/publications/article-1432.pdf
+ # page 7
+ raise error.PyAsn1Error('Invalid octet 0x80 in OID encoding')
+
+ # Decode two leading arcs
+ if 0 <= oid[0] <= 39:
+ oid = (0,) + oid
+ elif 40 <= oid[0] <= 79:
+ oid = (1, oid[0] - 40) + oid[1:]
+ elif oid[0] >= 80:
+ oid = (2, oid[0] - 80) + oid[1:]
+ else:
+ raise error.PyAsn1Error('Malformed first OID octet: %s' % chunk[0])
+
+ yield self._createComponent(asn1Spec, tagSet, oid, **options)
+
+
+class RelativeOIDPayloadDecoder(AbstractSimplePayloadDecoder):
+ protoComponent = univ.RelativeOID(())
+
+ def valueDecoder(self, substrate, asn1Spec,
+ tagSet=None, length=None, state=None,
+ decodeFun=None, substrateFun=None,
+ **options):
+ if tagSet[0].tagFormat != tag.tagFormatSimple:
+ raise error.PyAsn1Error('Simple tag format expected')
+
+ for chunk in readFromStream(substrate, length, options):
+ if isinstance(chunk, SubstrateUnderrunError):
+ yield chunk
+
+ if not chunk:
+ raise error.PyAsn1Error('Empty substrate')
+
+ reloid = ()
+ index = 0
+ substrateLen = len(chunk)
+ while index < substrateLen:
+ subId = chunk[index]
+ index += 1
+ if subId < 128:
+ reloid += (subId,)
+ elif subId > 128:
+ # Construct subid from a number of octets
+ nextSubId = subId
+ subId = 0
+ while nextSubId >= 128:
+ subId = (subId << 7) + (nextSubId & 0x7F)
+ if index >= substrateLen:
+ raise error.SubstrateUnderrunError(
+ 'Short substrate for sub-OID past %s' % (reloid,)
+ )
+ nextSubId = chunk[index]
+ index += 1
+ reloid += ((subId << 7) + nextSubId,)
+ elif subId == 128:
+ # ASN.1 spec forbids leading zeros (0x80) in OID
+ # encoding, tolerating it opens a vulnerability. See
+ # https://www.esat.kuleuven.be/cosic/publications/article-1432.pdf
+ # page 7
+ raise error.PyAsn1Error('Invalid octet 0x80 in RELATIVE-OID encoding')
+
+ yield self._createComponent(asn1Spec, tagSet, reloid, **options)
+
+
+class RealPayloadDecoder(AbstractSimplePayloadDecoder):
+ protoComponent = univ.Real()
+
+ def valueDecoder(self, substrate, asn1Spec,
+ tagSet=None, length=None, state=None,
+ decodeFun=None, substrateFun=None,
+ **options):
+ if tagSet[0].tagFormat != tag.tagFormatSimple:
+ raise error.PyAsn1Error('Simple tag format expected')
+
+ for chunk in readFromStream(substrate, length, options):
+ if isinstance(chunk, SubstrateUnderrunError):
+ yield chunk
+
+ if not chunk:
+ yield self._createComponent(asn1Spec, tagSet, 0.0, **options)
+ return
+
+ fo = chunk[0]
+ chunk = chunk[1:]
+ if fo & 0x80: # binary encoding
+ if not chunk:
+ raise error.PyAsn1Error("Incomplete floating-point value")
+
+ if LOG:
+ LOG('decoding binary encoded REAL')
+
+ n = (fo & 0x03) + 1
+
+ if n == 4:
+ n = chunk[0]
+ chunk = chunk[1:]
+
+ eo, chunk = chunk[:n], chunk[n:]
+
+ if not eo or not chunk:
+ raise error.PyAsn1Error('Real exponent screwed')
+
+ e = eo[0] & 0x80 and -1 or 0
+
+ while eo: # exponent
+ e <<= 8
+ e |= eo[0]
+ eo = eo[1:]
+
+ b = fo >> 4 & 0x03 # base bits
+
+ if b > 2:
+ raise error.PyAsn1Error('Illegal Real base')
+
+ if b == 1: # encbase = 8
+ e *= 3
+
+ elif b == 2: # encbase = 16
+ e *= 4
+ p = 0
+
+ while chunk: # value
+ p <<= 8
+ p |= chunk[0]
+ chunk = chunk[1:]
+
+ if fo & 0x40: # sign bit
+ p = -p
+
+ sf = fo >> 2 & 0x03 # scale bits
+ p *= 2 ** sf
+ value = (p, 2, e)
+
+ elif fo & 0x40: # infinite value
+ if LOG:
+ LOG('decoding infinite REAL')
+
+ value = fo & 0x01 and '-inf' or 'inf'
+
+ elif fo & 0xc0 == 0: # character encoding
+ if not chunk:
+ raise error.PyAsn1Error("Incomplete floating-point value")
+
+ if LOG:
+ LOG('decoding character encoded REAL')
+
+ try:
+ if fo & 0x3 == 0x1: # NR1
+ value = (int(chunk), 10, 0)
+
+ elif fo & 0x3 == 0x2: # NR2
+ value = float(chunk)
+
+ elif fo & 0x3 == 0x3: # NR3
+ value = float(chunk)
+
+ else:
+ raise error.SubstrateUnderrunError(
+ 'Unknown NR (tag %s)' % fo
+ )
+
+ except ValueError:
+ raise error.SubstrateUnderrunError(
+ 'Bad character Real syntax'
+ )
+
+ else:
+ raise error.SubstrateUnderrunError(
+ 'Unknown encoding (tag %s)' % fo
+ )
+
+ yield self._createComponent(asn1Spec, tagSet, value, **options)
+
+
+class AbstractConstructedPayloadDecoder(AbstractPayloadDecoder):
+ protoComponent = None
+
+
+class ConstructedPayloadDecoderBase(AbstractConstructedPayloadDecoder):
+ protoRecordComponent = None
+ protoSequenceComponent = None
+
+ def _getComponentTagMap(self, asn1Object, idx):
+ raise NotImplementedError
+
+ def _getComponentPositionByType(self, asn1Object, tagSet, idx):
+ raise NotImplementedError
+
+ def _decodeComponentsSchemaless(
+ self, substrate, tagSet=None, decodeFun=None,
+ length=None, **options):
+
+ asn1Object = None
+
+ components = []
+ componentTypes = set()
+
+ original_position = substrate.tell()
+
+ while length == -1 or substrate.tell() < original_position + length:
+ for component in decodeFun(substrate, **options):
+ if isinstance(component, SubstrateUnderrunError):
+ yield component
+
+ if length == -1 and component is eoo.endOfOctets:
+ break
+
+ components.append(component)
+ componentTypes.add(component.tagSet)
+
+ # Now we have to guess is it SEQUENCE/SET or SEQUENCE OF/SET OF
+ # The heuristics is:
+ # * 1+ components of different types -> likely SEQUENCE/SET
+ # * otherwise -> likely SEQUENCE OF/SET OF
+ if len(componentTypes) > 1:
+ protoComponent = self.protoRecordComponent
+
+ else:
+ protoComponent = self.protoSequenceComponent
+
+ asn1Object = protoComponent.clone(
+ # construct tagSet from base tag from prototype ASN.1 object
+ # and additional tags recovered from the substrate
+ tagSet=tag.TagSet(protoComponent.tagSet.baseTag, *tagSet.superTags)
+ )
+
+ if LOG:
+ LOG('guessed %r container type (pass `asn1Spec` to guide the '
+ 'decoder)' % asn1Object)
+
+ for idx, component in enumerate(components):
+ asn1Object.setComponentByPosition(
+ idx, component,
+ verifyConstraints=False,
+ matchTags=False, matchConstraints=False
+ )
+
+ yield asn1Object
+
+ def valueDecoder(self, substrate, asn1Spec,
+ tagSet=None, length=None, state=None,
+ decodeFun=None, substrateFun=None,
+ **options):
+ if tagSet[0].tagFormat != tag.tagFormatConstructed:
+ raise error.PyAsn1Error('Constructed tag format expected')
+
+ original_position = substrate.tell()
+
+ if substrateFun:
+ if asn1Spec is not None:
+ asn1Object = asn1Spec.clone()
+
+ elif self.protoComponent is not None:
+ asn1Object = self.protoComponent.clone(tagSet=tagSet)
+
+ else:
+ asn1Object = self.protoRecordComponent, self.protoSequenceComponent
+
+ for chunk in substrateFun(asn1Object, substrate, length, options):
+ yield chunk
+
+ return
+
+ if asn1Spec is None:
+ for asn1Object in self._decodeComponentsSchemaless(
+ substrate, tagSet=tagSet, decodeFun=decodeFun,
+ length=length, **options):
+ if isinstance(asn1Object, SubstrateUnderrunError):
+ yield asn1Object
+
+ if substrate.tell() < original_position + length:
+ if LOG:
+ for trailing in readFromStream(substrate, context=options):
+ if isinstance(trailing, SubstrateUnderrunError):
+ yield trailing
+
+ LOG('Unused trailing %d octets encountered: %s' % (
+ len(trailing), debug.hexdump(trailing)))
+
+ yield asn1Object
+
+ return
+
+ asn1Object = asn1Spec.clone()
+ asn1Object.clear()
+
+ options = self._passAsn1Object(asn1Object, options)
+
+ if asn1Spec.typeId in (univ.Sequence.typeId, univ.Set.typeId):
+
+ namedTypes = asn1Spec.componentType
+
+ isSetType = asn1Spec.typeId == univ.Set.typeId
+ isDeterministic = not isSetType and not namedTypes.hasOptionalOrDefault
+
+ if LOG:
+ LOG('decoding %sdeterministic %s type %r chosen by type ID' % (
+ not isDeterministic and 'non-' or '', isSetType and 'SET' or '',
+ asn1Spec))
+
+ seenIndices = set()
+ idx = 0
+ while substrate.tell() - original_position < length:
+ if not namedTypes:
+ componentType = None
+
+ elif isSetType:
+ componentType = namedTypes.tagMapUnique
+
+ else:
+ try:
+ if isDeterministic:
+ componentType = namedTypes[idx].asn1Object
+
+ elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted:
+ componentType = namedTypes.getTagMapNearPosition(idx)
+
+ else:
+ componentType = namedTypes[idx].asn1Object
+
+ except IndexError:
+ raise error.PyAsn1Error(
+ 'Excessive components decoded at %r' % (asn1Spec,)
+ )
+
+ for component in decodeFun(substrate, componentType, **options):
+ if isinstance(component, SubstrateUnderrunError):
+ yield component
+
+ if not isDeterministic and namedTypes:
+ if isSetType:
+ idx = namedTypes.getPositionByType(component.effectiveTagSet)
+
+ elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted:
+ idx = namedTypes.getPositionNearType(component.effectiveTagSet, idx)
+
+ asn1Object.setComponentByPosition(
+ idx, component,
+ verifyConstraints=False,
+ matchTags=False, matchConstraints=False
+ )
+
+ seenIndices.add(idx)
+ idx += 1
+
+ if LOG:
+ LOG('seen component indices %s' % seenIndices)
+
+ if namedTypes:
+ if not namedTypes.requiredComponents.issubset(seenIndices):
+ raise error.PyAsn1Error(
+ 'ASN.1 object %s has uninitialized '
+ 'components' % asn1Object.__class__.__name__)
+
+ if namedTypes.hasOpenTypes:
+
+ openTypes = options.get('openTypes', {})
+
+ if LOG:
+ LOG('user-specified open types map:')
+
+ for k, v in openTypes.items():
+ LOG('%s -> %r' % (k, v))
+
+ if openTypes or options.get('decodeOpenTypes', False):
+
+ for idx, namedType in enumerate(namedTypes.namedTypes):
+ if not namedType.openType:
+ continue
+
+ if namedType.isOptional and not asn1Object.getComponentByPosition(idx).isValue:
+ continue
+
+ governingValue = asn1Object.getComponentByName(
+ namedType.openType.name
+ )
+
+ try:
+ openType = openTypes[governingValue]
+
+ except KeyError:
+
+ if LOG:
+ LOG('default open types map of component '
+ '"%s.%s" governed by component "%s.%s"'
+ ':' % (asn1Object.__class__.__name__,
+ namedType.name,
+ asn1Object.__class__.__name__,
+ namedType.openType.name))
+
+ for k, v in namedType.openType.items():
+ LOG('%s -> %r' % (k, v))
+
+ try:
+ openType = namedType.openType[governingValue]
+
+ except KeyError:
+ if LOG:
+ LOG('failed to resolve open type by governing '
+ 'value %r' % (governingValue,))
+ continue
+
+ if LOG:
+ LOG('resolved open type %r by governing '
+ 'value %r' % (openType, governingValue))
+
+ containerValue = asn1Object.getComponentByPosition(idx)
+
+ if containerValue.typeId in (
+ univ.SetOf.typeId, univ.SequenceOf.typeId):
+
+ for pos, containerElement in enumerate(
+ containerValue):
+
+ stream = asSeekableStream(containerValue[pos].asOctets())
+
+ for component in decodeFun(stream, asn1Spec=openType, **options):
+ if isinstance(component, SubstrateUnderrunError):
+ yield component
+
+ containerValue[pos] = component
+
+ else:
+ stream = asSeekableStream(asn1Object.getComponentByPosition(idx).asOctets())
+
+ for component in decodeFun(stream, asn1Spec=openType, **options):
+ if isinstance(component, SubstrateUnderrunError):
+ yield component
+
+ asn1Object.setComponentByPosition(idx, component)
+
+ else:
+ inconsistency = asn1Object.isInconsistent
+ if inconsistency:
+ raise error.PyAsn1Error(
+ f"ASN.1 object {asn1Object.__class__.__name__} is inconsistent")
+
+ else:
+ componentType = asn1Spec.componentType
+
+ if LOG:
+ LOG('decoding type %r chosen by given `asn1Spec`' % componentType)
+
+ idx = 0
+
+ while substrate.tell() - original_position < length:
+ for component in decodeFun(substrate, componentType, **options):
+ if isinstance(component, SubstrateUnderrunError):
+ yield component
+
+ asn1Object.setComponentByPosition(
+ idx, component,
+ verifyConstraints=False,
+ matchTags=False, matchConstraints=False
+ )
+
+ idx += 1
+
+ yield asn1Object
+
+ def indefLenValueDecoder(self, substrate, asn1Spec,
+ tagSet=None, length=None, state=None,
+ decodeFun=None, substrateFun=None,
+ **options):
+ if tagSet[0].tagFormat != tag.tagFormatConstructed:
+ raise error.PyAsn1Error('Constructed tag format expected')
+
+ if substrateFun is not None:
+ if asn1Spec is not None:
+ asn1Object = asn1Spec.clone()
+
+ elif self.protoComponent is not None:
+ asn1Object = self.protoComponent.clone(tagSet=tagSet)
+
+ else:
+ asn1Object = self.protoRecordComponent, self.protoSequenceComponent
+
+ for chunk in substrateFun(asn1Object, substrate, length, options):
+ yield chunk
+
+ return
+
+ if asn1Spec is None:
+ for asn1Object in self._decodeComponentsSchemaless(
+ substrate, tagSet=tagSet, decodeFun=decodeFun,
+ length=length, **dict(options, allowEoo=True)):
+ if isinstance(asn1Object, SubstrateUnderrunError):
+ yield asn1Object
+
+ yield asn1Object
+
+ return
+
+ asn1Object = asn1Spec.clone()
+ asn1Object.clear()
+
+ options = self._passAsn1Object(asn1Object, options)
+
+ if asn1Spec.typeId in (univ.Sequence.typeId, univ.Set.typeId):
+
+ namedTypes = asn1Object.componentType
+
+ isSetType = asn1Object.typeId == univ.Set.typeId
+ isDeterministic = not isSetType and not namedTypes.hasOptionalOrDefault
+
+ if LOG:
+ LOG('decoding %sdeterministic %s type %r chosen by type ID' % (
+ not isDeterministic and 'non-' or '', isSetType and 'SET' or '',
+ asn1Spec))
+
+ seenIndices = set()
+
+ idx = 0
+
+ while True: # loop over components
+ if len(namedTypes) <= idx:
+ asn1Spec = None
+
+ elif isSetType:
+ asn1Spec = namedTypes.tagMapUnique
+
+ else:
+ try:
+ if isDeterministic:
+ asn1Spec = namedTypes[idx].asn1Object
+
+ elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted:
+ asn1Spec = namedTypes.getTagMapNearPosition(idx)
+
+ else:
+ asn1Spec = namedTypes[idx].asn1Object
+
+ except IndexError:
+ raise error.PyAsn1Error(
+ 'Excessive components decoded at %r' % (asn1Object,)
+ )
+
+ for component in decodeFun(substrate, asn1Spec, allowEoo=True, **options):
+
+ if isinstance(component, SubstrateUnderrunError):
+ yield component
+
+ if component is eoo.endOfOctets:
+ break
+
+ if component is eoo.endOfOctets:
+ break
+
+ if not isDeterministic and namedTypes:
+ if isSetType:
+ idx = namedTypes.getPositionByType(component.effectiveTagSet)
+
+ elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted:
+ idx = namedTypes.getPositionNearType(component.effectiveTagSet, idx)
+
+ asn1Object.setComponentByPosition(
+ idx, component,
+ verifyConstraints=False,
+ matchTags=False, matchConstraints=False
+ )
+
+ seenIndices.add(idx)
+ idx += 1
+
+ if LOG:
+ LOG('seen component indices %s' % seenIndices)
+
+ if namedTypes:
+ if not namedTypes.requiredComponents.issubset(seenIndices):
+ raise error.PyAsn1Error(
+ 'ASN.1 object %s has uninitialized '
+ 'components' % asn1Object.__class__.__name__)
+
+ if namedTypes.hasOpenTypes:
+
+ openTypes = options.get('openTypes', {})
+
+ if LOG:
+ LOG('user-specified open types map:')
+
+ for k, v in openTypes.items():
+ LOG('%s -> %r' % (k, v))
+
+ if openTypes or options.get('decodeOpenTypes', False):
+
+ for idx, namedType in enumerate(namedTypes.namedTypes):
+ if not namedType.openType:
+ continue
+
+ if namedType.isOptional and not asn1Object.getComponentByPosition(idx).isValue:
+ continue
+
+ governingValue = asn1Object.getComponentByName(
+ namedType.openType.name
+ )
+
+ try:
+ openType = openTypes[governingValue]
+
+ except KeyError:
+
+ if LOG:
+ LOG('default open types map of component '
+ '"%s.%s" governed by component "%s.%s"'
+ ':' % (asn1Object.__class__.__name__,
+ namedType.name,
+ asn1Object.__class__.__name__,
+ namedType.openType.name))
+
+ for k, v in namedType.openType.items():
+ LOG('%s -> %r' % (k, v))
+
+ try:
+ openType = namedType.openType[governingValue]
+
+ except KeyError:
+ if LOG:
+ LOG('failed to resolve open type by governing '
+ 'value %r' % (governingValue,))
+ continue
+
+ if LOG:
+ LOG('resolved open type %r by governing '
+ 'value %r' % (openType, governingValue))
+
+ containerValue = asn1Object.getComponentByPosition(idx)
+
+ if containerValue.typeId in (
+ univ.SetOf.typeId, univ.SequenceOf.typeId):
+
+ for pos, containerElement in enumerate(
+ containerValue):
+
+ stream = asSeekableStream(containerValue[pos].asOctets())
+
+ for component in decodeFun(stream, asn1Spec=openType,
+ **dict(options, allowEoo=True)):
+ if isinstance(component, SubstrateUnderrunError):
+ yield component
+
+ if component is eoo.endOfOctets:
+ break
+
+ containerValue[pos] = component
+
+ else:
+ stream = asSeekableStream(asn1Object.getComponentByPosition(idx).asOctets())
+ for component in decodeFun(stream, asn1Spec=openType,
+ **dict(options, allowEoo=True)):
+ if isinstance(component, SubstrateUnderrunError):
+ yield component
+
+ if component is eoo.endOfOctets:
+ break
+
+ asn1Object.setComponentByPosition(idx, component)
+
+ else:
+ inconsistency = asn1Object.isInconsistent
+ if inconsistency:
+ raise error.PyAsn1Error(
+ f"ASN.1 object {asn1Object.__class__.__name__} is inconsistent")
+
+ else:
+ componentType = asn1Spec.componentType
+
+ if LOG:
+ LOG('decoding type %r chosen by given `asn1Spec`' % componentType)
+
+ idx = 0
+
+ while True:
+
+ for component in decodeFun(
+ substrate, componentType, allowEoo=True, **options):
+
+ if isinstance(component, SubstrateUnderrunError):
+ yield component
+
+ if component is eoo.endOfOctets:
+ break
+
+ if component is eoo.endOfOctets:
+ break
+
+ asn1Object.setComponentByPosition(
+ idx, component,
+ verifyConstraints=False,
+ matchTags=False, matchConstraints=False
+ )
+
+ idx += 1
+
+ yield asn1Object
+
+
+class SequenceOrSequenceOfPayloadDecoder(ConstructedPayloadDecoderBase):
+ protoRecordComponent = univ.Sequence()
+ protoSequenceComponent = univ.SequenceOf()
+
+
+class SequencePayloadDecoder(SequenceOrSequenceOfPayloadDecoder):
+ protoComponent = univ.Sequence()
+
+
+class SequenceOfPayloadDecoder(SequenceOrSequenceOfPayloadDecoder):
+ protoComponent = univ.SequenceOf()
+
+
+class SetOrSetOfPayloadDecoder(ConstructedPayloadDecoderBase):
+ protoRecordComponent = univ.Set()
+ protoSequenceComponent = univ.SetOf()
+
+
+class SetPayloadDecoder(SetOrSetOfPayloadDecoder):
+ protoComponent = univ.Set()
+
+
+class SetOfPayloadDecoder(SetOrSetOfPayloadDecoder):
+ protoComponent = univ.SetOf()
+
+
+class ChoicePayloadDecoder(ConstructedPayloadDecoderBase):
+ protoComponent = univ.Choice()
+
+ def valueDecoder(self, substrate, asn1Spec,
+ tagSet=None, length=None, state=None,
+ decodeFun=None, substrateFun=None,
+ **options):
+ if asn1Spec is None:
+ asn1Object = self.protoComponent.clone(tagSet=tagSet)
+
+ else:
+ asn1Object = asn1Spec.clone()
+
+ if substrateFun:
+ for chunk in substrateFun(asn1Object, substrate, length, options):
+ yield chunk
+
+ return
+
+ options = self._passAsn1Object(asn1Object, options)
+
+ if asn1Object.tagSet == tagSet:
+ if LOG:
+ LOG('decoding %s as explicitly tagged CHOICE' % (tagSet,))
+
+ for component in decodeFun(
+ substrate, asn1Object.componentTagMap, **options):
+ if isinstance(component, SubstrateUnderrunError):
+ yield component
+
+ else:
+ if LOG:
+ LOG('decoding %s as untagged CHOICE' % (tagSet,))
+
+ for component in decodeFun(
+ substrate, asn1Object.componentTagMap, tagSet, length,
+ state, **options):
+ if isinstance(component, SubstrateUnderrunError):
+ yield component
+
+ effectiveTagSet = component.effectiveTagSet
+
+ if LOG:
+ LOG('decoded component %s, effective tag set %s' % (component, effectiveTagSet))
+
+ asn1Object.setComponentByType(
+ effectiveTagSet, component,
+ verifyConstraints=False,
+ matchTags=False, matchConstraints=False,
+ innerFlag=False
+ )
+
+ yield asn1Object
+
+ def indefLenValueDecoder(self, substrate, asn1Spec,
+ tagSet=None, length=None, state=None,
+ decodeFun=None, substrateFun=None,
+ **options):
+ if asn1Spec is None:
+ asn1Object = self.protoComponent.clone(tagSet=tagSet)
+
+ else:
+ asn1Object = asn1Spec.clone()
+
+ if substrateFun:
+ for chunk in substrateFun(asn1Object, substrate, length, options):
+ yield chunk
+
+ return
+
+ options = self._passAsn1Object(asn1Object, options)
+
+ isTagged = asn1Object.tagSet == tagSet
+
+ if LOG:
+ LOG('decoding %s as %stagged CHOICE' % (
+ tagSet, isTagged and 'explicitly ' or 'un'))
+
+ while True:
+
+ if isTagged:
+ iterator = decodeFun(
+ substrate, asn1Object.componentType.tagMapUnique,
+ **dict(options, allowEoo=True))
+
+ else:
+ iterator = decodeFun(
+ substrate, asn1Object.componentType.tagMapUnique,
+ tagSet, length, state, **dict(options, allowEoo=True))
+
+ for component in iterator:
+
+ if isinstance(component, SubstrateUnderrunError):
+ yield component
+
+ if component is eoo.endOfOctets:
+ break
+
+ effectiveTagSet = component.effectiveTagSet
+
+ if LOG:
+ LOG('decoded component %s, effective tag set '
+ '%s' % (component, effectiveTagSet))
+
+ asn1Object.setComponentByType(
+ effectiveTagSet, component,
+ verifyConstraints=False,
+ matchTags=False, matchConstraints=False,
+ innerFlag=False
+ )
+
+ if not isTagged:
+ break
+
+ if not isTagged or component is eoo.endOfOctets:
+ break
+
+ yield asn1Object
+
+
+class AnyPayloadDecoder(AbstractSimplePayloadDecoder):
+ protoComponent = univ.Any()
+
+ def valueDecoder(self, substrate, asn1Spec,
+ tagSet=None, length=None, state=None,
+ decodeFun=None, substrateFun=None,
+ **options):
+ if asn1Spec is None:
+ isUntagged = True
+
+ elif asn1Spec.__class__ is tagmap.TagMap:
+ isUntagged = tagSet not in asn1Spec.tagMap
+
+ else:
+ isUntagged = tagSet != asn1Spec.tagSet
+
+ if isUntagged:
+ fullPosition = substrate.markedPosition
+ currentPosition = substrate.tell()
+
+ substrate.seek(fullPosition, os.SEEK_SET)
+ length += currentPosition - fullPosition
+
+ if LOG:
+ for chunk in peekIntoStream(substrate, length):
+ if isinstance(chunk, SubstrateUnderrunError):
+ yield chunk
+ LOG('decoding as untagged ANY, substrate '
+ '%s' % debug.hexdump(chunk))
+
+ if substrateFun:
+ for chunk in substrateFun(
+ self._createComponent(asn1Spec, tagSet, noValue, **options),
+ substrate, length, options):
+ yield chunk
+
+ return
+
+ for chunk in readFromStream(substrate, length, options):
+ if isinstance(chunk, SubstrateUnderrunError):
+ yield chunk
+
+ yield self._createComponent(asn1Spec, tagSet, chunk, **options)
+
+ def indefLenValueDecoder(self, substrate, asn1Spec,
+ tagSet=None, length=None, state=None,
+ decodeFun=None, substrateFun=None,
+ **options):
+ if asn1Spec is None:
+ isTagged = False
+
+ elif asn1Spec.__class__ is tagmap.TagMap:
+ isTagged = tagSet in asn1Spec.tagMap
+
+ else:
+ isTagged = tagSet == asn1Spec.tagSet
+
+ if isTagged:
+ # tagged Any type -- consume header substrate
+ chunk = b''
+
+ if LOG:
+ LOG('decoding as tagged ANY')
+
+ else:
+ # TODO: Seems not to be tested
+ fullPosition = substrate.markedPosition
+ currentPosition = substrate.tell()
+
+ substrate.seek(fullPosition, os.SEEK_SET)
+ for chunk in readFromStream(substrate, currentPosition - fullPosition, options):
+ if isinstance(chunk, SubstrateUnderrunError):
+ yield chunk
+
+ if LOG:
+ LOG('decoding as untagged ANY, header substrate %s' % debug.hexdump(chunk))
+
+ # Any components do not inherit initial tag
+ asn1Spec = self.protoComponent
+
+ if substrateFun and substrateFun is not self.substrateCollector:
+ asn1Object = self._createComponent(
+ asn1Spec, tagSet, noValue, **options)
+
+ for chunk in substrateFun(
+ asn1Object, chunk + substrate, length + len(chunk), options):
+ yield chunk
+
+ return
+
+ if LOG:
+ LOG('assembling constructed serialization')
+
+ # All inner fragments are of the same type, treat them as octet string
+ substrateFun = self.substrateCollector
+
+ while True: # loop over fragments
+
+ for component in decodeFun(
+ substrate, asn1Spec, substrateFun=substrateFun,
+ allowEoo=True, **options):
+
+ if isinstance(component, SubstrateUnderrunError):
+ yield component
+
+ if component is eoo.endOfOctets:
+ break
+
+ if component is eoo.endOfOctets:
+ break
+
+ chunk += component
+
+ if substrateFun:
+ yield chunk # TODO: Weird
+
+ else:
+ yield self._createComponent(asn1Spec, tagSet, chunk, **options)
+
+
+# character string types
+class UTF8StringPayloadDecoder(OctetStringPayloadDecoder):
+ protoComponent = char.UTF8String()
+
+
+class NumericStringPayloadDecoder(OctetStringPayloadDecoder):
+ protoComponent = char.NumericString()
+
+
+class PrintableStringPayloadDecoder(OctetStringPayloadDecoder):
+ protoComponent = char.PrintableString()
+
+
+class TeletexStringPayloadDecoder(OctetStringPayloadDecoder):
+ protoComponent = char.TeletexString()
+
+
+class VideotexStringPayloadDecoder(OctetStringPayloadDecoder):
+ protoComponent = char.VideotexString()
+
+
+class IA5StringPayloadDecoder(OctetStringPayloadDecoder):
+ protoComponent = char.IA5String()
+
+
+class GraphicStringPayloadDecoder(OctetStringPayloadDecoder):
+ protoComponent = char.GraphicString()
+
+
+class VisibleStringPayloadDecoder(OctetStringPayloadDecoder):
+ protoComponent = char.VisibleString()
+
+
+class GeneralStringPayloadDecoder(OctetStringPayloadDecoder):
+ protoComponent = char.GeneralString()
+
+
+class UniversalStringPayloadDecoder(OctetStringPayloadDecoder):
+ protoComponent = char.UniversalString()
+
+
+class BMPStringPayloadDecoder(OctetStringPayloadDecoder):
+ protoComponent = char.BMPString()
+
+
+# "useful" types
+class ObjectDescriptorPayloadDecoder(OctetStringPayloadDecoder):
+ protoComponent = useful.ObjectDescriptor()
+
+
+class GeneralizedTimePayloadDecoder(OctetStringPayloadDecoder):
+ protoComponent = useful.GeneralizedTime()
+
+
+class UTCTimePayloadDecoder(OctetStringPayloadDecoder):
+ protoComponent = useful.UTCTime()
+
+
+TAG_MAP = {
+ univ.Integer.tagSet: IntegerPayloadDecoder(),
+ univ.Boolean.tagSet: BooleanPayloadDecoder(),
+ univ.BitString.tagSet: BitStringPayloadDecoder(),
+ univ.OctetString.tagSet: OctetStringPayloadDecoder(),
+ univ.Null.tagSet: NullPayloadDecoder(),
+ univ.ObjectIdentifier.tagSet: ObjectIdentifierPayloadDecoder(),
+ univ.RelativeOID.tagSet: RelativeOIDPayloadDecoder(),
+ univ.Enumerated.tagSet: IntegerPayloadDecoder(),
+ univ.Real.tagSet: RealPayloadDecoder(),
+ univ.Sequence.tagSet: SequenceOrSequenceOfPayloadDecoder(), # conflicts with SequenceOf
+ univ.Set.tagSet: SetOrSetOfPayloadDecoder(), # conflicts with SetOf
+ univ.Choice.tagSet: ChoicePayloadDecoder(), # conflicts with Any
+ # character string types
+ char.UTF8String.tagSet: UTF8StringPayloadDecoder(),
+ char.NumericString.tagSet: NumericStringPayloadDecoder(),
+ char.PrintableString.tagSet: PrintableStringPayloadDecoder(),
+ char.TeletexString.tagSet: TeletexStringPayloadDecoder(),
+ char.VideotexString.tagSet: VideotexStringPayloadDecoder(),
+ char.IA5String.tagSet: IA5StringPayloadDecoder(),
+ char.GraphicString.tagSet: GraphicStringPayloadDecoder(),
+ char.VisibleString.tagSet: VisibleStringPayloadDecoder(),
+ char.GeneralString.tagSet: GeneralStringPayloadDecoder(),
+ char.UniversalString.tagSet: UniversalStringPayloadDecoder(),
+ char.BMPString.tagSet: BMPStringPayloadDecoder(),
+ # useful types
+ useful.ObjectDescriptor.tagSet: ObjectDescriptorPayloadDecoder(),
+ useful.GeneralizedTime.tagSet: GeneralizedTimePayloadDecoder(),
+ useful.UTCTime.tagSet: UTCTimePayloadDecoder()
+}
+
+# Type-to-codec map for ambiguous ASN.1 types
+TYPE_MAP = {
+ univ.Set.typeId: SetPayloadDecoder(),
+ univ.SetOf.typeId: SetOfPayloadDecoder(),
+ univ.Sequence.typeId: SequencePayloadDecoder(),
+ univ.SequenceOf.typeId: SequenceOfPayloadDecoder(),
+ univ.Choice.typeId: ChoicePayloadDecoder(),
+ univ.Any.typeId: AnyPayloadDecoder()
+}
+
+# Put in non-ambiguous types for faster codec lookup
+for typeDecoder in TAG_MAP.values():
+ if typeDecoder.protoComponent is not None:
+ typeId = typeDecoder.protoComponent.__class__.typeId
+ if typeId is not None and typeId not in TYPE_MAP:
+ TYPE_MAP[typeId] = typeDecoder
+
+
+(stDecodeTag,
+ stDecodeLength,
+ stGetValueDecoder,
+ stGetValueDecoderByAsn1Spec,
+ stGetValueDecoderByTag,
+ stTryAsExplicitTag,
+ stDecodeValue,
+ stDumpRawValue,
+ stErrorCondition,
+ stStop) = [x for x in range(10)]
+
+
+EOO_SENTINEL = bytes((0, 0))
+
+
+class SingleItemDecoder(object):
+ defaultErrorState = stErrorCondition
+ #defaultErrorState = stDumpRawValue
+ defaultRawDecoder = AnyPayloadDecoder()
+
+ supportIndefLength = True
+
+ TAG_MAP = TAG_MAP
+ TYPE_MAP = TYPE_MAP
+
+ def __init__(self, tagMap=_MISSING, typeMap=_MISSING, **ignored):
+ self._tagMap = tagMap if tagMap is not _MISSING else self.TAG_MAP
+ self._typeMap = typeMap if typeMap is not _MISSING else self.TYPE_MAP
+
+ # Tag & TagSet objects caches
+ self._tagCache = {}
+ self._tagSetCache = {}
+
+ def __call__(self, substrate, asn1Spec=None,
+ tagSet=None, length=None, state=stDecodeTag,
+ decodeFun=None, substrateFun=None,
+ **options):
+
+ allowEoo = options.pop('allowEoo', False)
+
+ if LOG:
+ LOG('decoder called at scope %s with state %d, working with up '
+ 'to %s octets of substrate: '
+ '%s' % (debug.scope, state, length, substrate))
+
+ # Look for end-of-octets sentinel
+ if allowEoo and self.supportIndefLength:
+
+ for eoo_candidate in readFromStream(substrate, 2, options):
+ if isinstance(eoo_candidate, SubstrateUnderrunError):
+ yield eoo_candidate
+
+ if eoo_candidate == EOO_SENTINEL:
+ if LOG:
+ LOG('end-of-octets sentinel found')
+ yield eoo.endOfOctets
+ return
+
+ else:
+ substrate.seek(-2, os.SEEK_CUR)
+
+ tagMap = self._tagMap
+ typeMap = self._typeMap
+ tagCache = self._tagCache
+ tagSetCache = self._tagSetCache
+
+ value = noValue
+
+ substrate.markedPosition = substrate.tell()
+
+ while state is not stStop:
+
+ if state is stDecodeTag:
+ # Decode tag
+ isShortTag = True
+
+ for firstByte in readFromStream(substrate, 1, options):
+ if isinstance(firstByte, SubstrateUnderrunError):
+ yield firstByte
+
+ firstOctet = ord(firstByte)
+
+ try:
+ lastTag = tagCache[firstOctet]
+
+ except KeyError:
+ integerTag = firstOctet
+ tagClass = integerTag & 0xC0
+ tagFormat = integerTag & 0x20
+ tagId = integerTag & 0x1F
+
+ if tagId == 0x1F:
+ isShortTag = False
+ lengthOctetIdx = 0
+ tagId = 0
+
+ while True:
+ for integerByte in readFromStream(substrate, 1, options):
+ if isinstance(integerByte, SubstrateUnderrunError):
+ yield integerByte
+
+ if not integerByte:
+ raise error.SubstrateUnderrunError(
+ 'Short octet stream on long tag decoding'
+ )
+
+ integerTag = ord(integerByte)
+ lengthOctetIdx += 1
+ tagId <<= 7
+ tagId |= (integerTag & 0x7F)
+
+ if not integerTag & 0x80:
+ break
+
+ lastTag = tag.Tag(
+ tagClass=tagClass, tagFormat=tagFormat, tagId=tagId
+ )
+
+ if isShortTag:
+ # cache short tags
+ tagCache[firstOctet] = lastTag
+
+ if tagSet is None:
+ if isShortTag:
+ try:
+ tagSet = tagSetCache[firstOctet]
+
+ except KeyError:
+ # base tag not recovered
+ tagSet = tag.TagSet((), lastTag)
+ tagSetCache[firstOctet] = tagSet
+ else:
+ tagSet = tag.TagSet((), lastTag)
+
+ else:
+ tagSet = lastTag + tagSet
+
+ state = stDecodeLength
+
+ if LOG:
+ LOG('tag decoded into %s, decoding length' % tagSet)
+
+ if state is stDecodeLength:
+ # Decode length
+ for firstOctet in readFromStream(substrate, 1, options):
+ if isinstance(firstOctet, SubstrateUnderrunError):
+ yield firstOctet
+
+ firstOctet = ord(firstOctet)
+
+ if firstOctet < 128:
+ length = firstOctet
+
+ elif firstOctet > 128:
+ size = firstOctet & 0x7F
+ # encoded in size bytes
+ for encodedLength in readFromStream(substrate, size, options):
+ if isinstance(encodedLength, SubstrateUnderrunError):
+ yield encodedLength
+ encodedLength = list(encodedLength)
+ # missing check on maximum size, which shouldn't be a
+ # problem, we can handle more than is possible
+ if len(encodedLength) != size:
+ raise error.SubstrateUnderrunError(
+ '%s<%s at %s' % (size, len(encodedLength), tagSet)
+ )
+
+ length = 0
+ for lengthOctet in encodedLength:
+ length <<= 8
+ length |= lengthOctet
+ size += 1
+
+ else: # 128 means indefinite
+ length = -1
+
+ if length == -1 and not self.supportIndefLength:
+ raise error.PyAsn1Error('Indefinite length encoding not supported by this codec')
+
+ state = stGetValueDecoder
+
+ if LOG:
+ LOG('value length decoded into %d' % length)
+
+ if state is stGetValueDecoder:
+ if asn1Spec is None:
+ state = stGetValueDecoderByTag
+
+ else:
+ state = stGetValueDecoderByAsn1Spec
+ #
+ # There're two ways of creating subtypes in ASN.1 what influences
+ # decoder operation. These methods are:
+ # 1) Either base types used in or no IMPLICIT tagging has been
+ # applied on subtyping.
+ # 2) Subtype syntax drops base type information (by means of
+ # IMPLICIT tagging.
+ # The first case allows for complete tag recovery from substrate
+ # while the second one requires original ASN.1 type spec for
+ # decoding.
+ #
+ # In either case a set of tags (tagSet) is coming from substrate
+ # in an incremental, tag-by-tag fashion (this is the case of
+ # EXPLICIT tag which is most basic). Outermost tag comes first
+ # from the wire.
+ #
+ if state is stGetValueDecoderByTag:
+ try:
+ concreteDecoder = tagMap[tagSet]
+
+ except KeyError:
+ concreteDecoder = None
+
+ if concreteDecoder:
+ state = stDecodeValue
+
+ else:
+ try:
+ concreteDecoder = tagMap[tagSet[:1]]
+
+ except KeyError:
+ concreteDecoder = None
+
+ if concreteDecoder:
+ state = stDecodeValue
+ else:
+ state = stTryAsExplicitTag
+
+ if LOG:
+ LOG('codec %s chosen by a built-in type, decoding %s' % (concreteDecoder and concreteDecoder.__class__.__name__ or "<none>", state is stDecodeValue and 'value' or 'as explicit tag'))
+ debug.scope.push(concreteDecoder is None and '?' or concreteDecoder.protoComponent.__class__.__name__)
+
+ if state is stGetValueDecoderByAsn1Spec:
+
+ if asn1Spec.__class__ is tagmap.TagMap:
+ try:
+ chosenSpec = asn1Spec[tagSet]
+
+ except KeyError:
+ chosenSpec = None
+
+ if LOG:
+ LOG('candidate ASN.1 spec is a map of:')
+
+ for firstOctet, v in asn1Spec.presentTypes.items():
+ LOG(' %s -> %s' % (firstOctet, v.__class__.__name__))
+
+ if asn1Spec.skipTypes:
+ LOG('but neither of: ')
+ for firstOctet, v in asn1Spec.skipTypes.items():
+ LOG(' %s -> %s' % (firstOctet, v.__class__.__name__))
+ LOG('new candidate ASN.1 spec is %s, chosen by %s' % (chosenSpec is None and '<none>' or chosenSpec.prettyPrintType(), tagSet))
+
+ elif tagSet == asn1Spec.tagSet or tagSet in asn1Spec.tagMap:
+ chosenSpec = asn1Spec
+ if LOG:
+ LOG('candidate ASN.1 spec is %s' % asn1Spec.__class__.__name__)
+
+ else:
+ chosenSpec = None
+
+ if chosenSpec is not None:
+ try:
+ # ambiguous type or just faster codec lookup
+ concreteDecoder = typeMap[chosenSpec.typeId]
+
+ if LOG:
+ LOG('value decoder chosen for an ambiguous type by type ID %s' % (chosenSpec.typeId,))
+
+ except KeyError:
+ # use base type for codec lookup to recover untagged types
+ baseTagSet = tag.TagSet(chosenSpec.tagSet.baseTag, chosenSpec.tagSet.baseTag)
+ try:
+ # base type or tagged subtype
+ concreteDecoder = tagMap[baseTagSet]
+
+ if LOG:
+ LOG('value decoder chosen by base %s' % (baseTagSet,))
+
+ except KeyError:
+ concreteDecoder = None
+
+ if concreteDecoder:
+ asn1Spec = chosenSpec
+ state = stDecodeValue
+
+ else:
+ state = stTryAsExplicitTag
+
+ else:
+ concreteDecoder = None
+ state = stTryAsExplicitTag
+
+ if LOG:
+ LOG('codec %s chosen by ASN.1 spec, decoding %s' % (state is stDecodeValue and concreteDecoder.__class__.__name__ or "<none>", state is stDecodeValue and 'value' or 'as explicit tag'))
+ debug.scope.push(chosenSpec is None and '?' or chosenSpec.__class__.__name__)
+
+ if state is stDecodeValue:
+ if not options.get('recursiveFlag', True) and not substrateFun: # deprecate this
+ def substrateFun(asn1Object, _substrate, _length, _options):
+ """Legacy hack to keep the recursiveFlag=False option supported.
+
+ The decode(..., substrateFun=userCallback) option was introduced in 0.1.4 as a generalization
+ of the old recursiveFlag=False option. Users should pass their callback instead of using
+ recursiveFlag.
+ """
+ yield asn1Object
+
+ original_position = substrate.tell()
+
+ if length == -1: # indef length
+ for value in concreteDecoder.indefLenValueDecoder(
+ substrate, asn1Spec,
+ tagSet, length, stGetValueDecoder,
+ self, substrateFun, **options):
+ if isinstance(value, SubstrateUnderrunError):
+ yield value
+
+ else:
+ for value in concreteDecoder.valueDecoder(
+ substrate, asn1Spec,
+ tagSet, length, stGetValueDecoder,
+ self, substrateFun, **options):
+ if isinstance(value, SubstrateUnderrunError):
+ yield value
+
+ bytesRead = substrate.tell() - original_position
+ if not substrateFun and bytesRead != length:
+ raise PyAsn1Error(
+ "Read %s bytes instead of expected %s." % (bytesRead, length))
+ elif substrateFun and bytesRead > length:
+ # custom substrateFun may be used for partial decoding, reading less is expected there
+ raise PyAsn1Error(
+ "Read %s bytes are more than expected %s." % (bytesRead, length))
+
+ if LOG:
+ LOG('codec %s yields type %s, value:\n%s\n...' % (
+ concreteDecoder.__class__.__name__, value.__class__.__name__,
+ isinstance(value, base.Asn1Item) and value.prettyPrint() or value))
+
+ state = stStop
+ break
+
+ if state is stTryAsExplicitTag:
+ if (tagSet and
+ tagSet[0].tagFormat == tag.tagFormatConstructed and
+ tagSet[0].tagClass != tag.tagClassUniversal):
+ # Assume explicit tagging
+ concreteDecoder = rawPayloadDecoder
+ state = stDecodeValue
+
+ else:
+ concreteDecoder = None
+ state = self.defaultErrorState
+
+ if LOG:
+ LOG('codec %s chosen, decoding %s' % (concreteDecoder and concreteDecoder.__class__.__name__ or "<none>", state is stDecodeValue and 'value' or 'as failure'))
+
+ if state is stDumpRawValue:
+ concreteDecoder = self.defaultRawDecoder
+
+ if LOG:
+ LOG('codec %s chosen, decoding value' % concreteDecoder.__class__.__name__)
+
+ state = stDecodeValue
+
+ if state is stErrorCondition:
+ raise error.PyAsn1Error(
+ '%s not in asn1Spec: %r' % (tagSet, asn1Spec)
+ )
+
+ if LOG:
+ debug.scope.pop()
+ LOG('decoder left scope %s, call completed' % debug.scope)
+
+ yield value
+
+
+class StreamingDecoder(object):
+ """Create an iterator that turns BER/CER/DER byte stream into ASN.1 objects.
+
+ On each iteration, consume whatever BER/CER/DER serialization is
+ available in the `substrate` stream-like object and turns it into
+ one or more, possibly nested, ASN.1 objects.
+
+ Parameters
+ ----------
+ substrate: :py:class:`file`, :py:class:`io.BytesIO`
+ BER/CER/DER serialization in form of a byte stream
+
+ Keyword Args
+ ------------
+ asn1Spec: :py:class:`~pyasn1.type.base.PyAsn1Item`
+ A pyasn1 type object to act as a template guiding the decoder.
+ Depending on the ASN.1 structure being decoded, `asn1Spec` may
+ or may not be required. One of the reasons why `asn1Spec` may
+ me required is that ASN.1 structure is encoded in the *IMPLICIT*
+ tagging mode.
+
+ Yields
+ ------
+ : :py:class:`~pyasn1.type.base.PyAsn1Item`, :py:class:`~pyasn1.error.SubstrateUnderrunError`
+ Decoded ASN.1 object (possibly, nested) or
+ :py:class:`~pyasn1.error.SubstrateUnderrunError` object indicating
+ insufficient BER/CER/DER serialization on input to fully recover ASN.1
+ objects from it.
+
+ In the latter case the caller is advised to ensure some more data in
+ the input stream, then call the iterator again. The decoder will resume
+ the decoding process using the newly arrived data.
+
+ The `context` property of :py:class:`~pyasn1.error.SubstrateUnderrunError`
+ object might hold a reference to the partially populated ASN.1 object
+ being reconstructed.
+
+ Raises
+ ------
+ ~pyasn1.error.PyAsn1Error, ~pyasn1.error.EndOfStreamError
+ `PyAsn1Error` on deserialization error, `EndOfStreamError` on
+ premature stream closure.
+
+ Examples
+ --------
+ Decode BER serialisation without ASN.1 schema
+
+ .. code-block:: pycon
+
+ >>> stream = io.BytesIO(
+ ... b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03')
+ >>>
+ >>> for asn1Object in StreamingDecoder(stream):
+ ... print(asn1Object)
+ >>>
+ SequenceOf:
+ 1 2 3
+
+ Decode BER serialisation with ASN.1 schema
+
+ .. code-block:: pycon
+
+ >>> stream = io.BytesIO(
+ ... b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03')
+ >>>
+ >>> schema = SequenceOf(componentType=Integer())
+ >>>
+ >>> decoder = StreamingDecoder(stream, asn1Spec=schema)
+ >>> for asn1Object in decoder:
+ ... print(asn1Object)
+ >>>
+ SequenceOf:
+ 1 2 3
+ """
+
+ SINGLE_ITEM_DECODER = SingleItemDecoder
+
+ def __init__(self, substrate, asn1Spec=None, **options):
+ self._singleItemDecoder = self.SINGLE_ITEM_DECODER(**options)
+ self._substrate = asSeekableStream(substrate)
+ self._asn1Spec = asn1Spec
+ self._options = options
+
+ def __iter__(self):
+ while True:
+ for asn1Object in self._singleItemDecoder(
+ self._substrate, self._asn1Spec, **self._options):
+ yield asn1Object
+
+ for chunk in isEndOfStream(self._substrate):
+ if isinstance(chunk, SubstrateUnderrunError):
+ yield
+
+ break
+
+ if chunk:
+ break
+
+
+class Decoder(object):
+ """Create a BER decoder object.
+
+ Parse BER/CER/DER octet-stream into one, possibly nested, ASN.1 object.
+ """
+ STREAMING_DECODER = StreamingDecoder
+
+ @classmethod
+ def __call__(cls, substrate, asn1Spec=None, **options):
+ """Turns BER/CER/DER octet stream into an ASN.1 object.
+
+ Takes BER/CER/DER octet-stream in form of :py:class:`bytes`
+ and decode it into an ASN.1 object
+ (e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative) which
+ may be a scalar or an arbitrary nested structure.
+
+ Parameters
+ ----------
+ substrate: :py:class:`bytes`
+ BER/CER/DER octet-stream to parse
+
+ Keyword Args
+ ------------
+ asn1Spec: :py:class:`~pyasn1.type.base.PyAsn1Item`
+ A pyasn1 type object (:py:class:`~pyasn1.type.base.PyAsn1Item`
+ derivative) to act as a template guiding the decoder.
+ Depending on the ASN.1 structure being decoded, `asn1Spec` may or
+ may not be required. Most common reason for it to require is that
+ ASN.1 structure is encoded in *IMPLICIT* tagging mode.
+
+ substrateFun: :py:class:`Union[
+ Callable[[pyasn1.type.base.PyAsn1Item, bytes, int],
+ Tuple[pyasn1.type.base.PyAsn1Item, bytes]],
+ Callable[[pyasn1.type.base.PyAsn1Item, io.BytesIO, int, dict],
+ Generator[Union[pyasn1.type.base.PyAsn1Item,
+ pyasn1.error.SubstrateUnderrunError],
+ None, None]]
+ ]`
+ User callback meant to generalize special use cases like non-recursive or
+ partial decoding. A 3-arg non-streaming variant is supported for backwards
+ compatiblilty in addition to the newer 4-arg streaming variant.
+ The callback will receive the uninitialized object recovered from substrate
+ as 1st argument, the uninterpreted payload as 2nd argument, and the length
+ of the uninterpreted payload as 3rd argument. The streaming variant will
+ additionally receive the decode(..., **options) kwargs as 4th argument.
+ The non-streaming variant shall return an object that will be propagated
+ as decode() return value as 1st item, and the remainig payload for further
+ decode passes as 2nd item.
+ The streaming variant shall yield an object that will be propagated as
+ decode() return value, and leave the remaining payload in the stream.
+
+ Returns
+ -------
+ : :py:class:`tuple`
+ A tuple of :py:class:`~pyasn1.type.base.PyAsn1Item` object
+ recovered from BER/CER/DER substrate and the unprocessed trailing
+ portion of the `substrate` (may be empty)
+
+ Raises
+ ------
+ : :py:class:`~pyasn1.error.PyAsn1Error`
+ :py:class:`~pyasn1.error.SubstrateUnderrunError` on insufficient
+ input or :py:class:`~pyasn1.error.PyAsn1Error` on decoding error.
+
+ Examples
+ --------
+ Decode BER/CER/DER serialisation without ASN.1 schema
+
+ .. code-block:: pycon
+
+ >>> s, unprocessed = decode(b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03')
+ >>> str(s)
+ SequenceOf:
+ 1 2 3
+
+ Decode BER/CER/DER serialisation with ASN.1 schema
+
+ .. code-block:: pycon
+
+ >>> seq = SequenceOf(componentType=Integer())
+ >>> s, unprocessed = decode(
+ b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03', asn1Spec=seq)
+ >>> str(s)
+ SequenceOf:
+ 1 2 3
+
+ """
+ substrate = asSeekableStream(substrate)
+
+ if "substrateFun" in options:
+ origSubstrateFun = options["substrateFun"]
+
+ def substrateFunWrapper(asn1Object, substrate, length, options=None):
+ """Support both 0.4 and 0.5 style APIs.
+
+ substrateFun API has changed in 0.5 for use with streaming decoders. To stay backwards compatible,
+ we first try if we received a streaming user callback. If that fails,we assume we've received a
+ non-streaming v0.4 user callback and convert it for streaming on the fly
+ """
+ try:
+ substrate_gen = origSubstrateFun(asn1Object, substrate, length, options)
+ except TypeError as _value:
+ if _value.__traceback__.tb_next:
+ # Traceback depth > 1 means TypeError from inside user provided function
+ raise
+ # invariant maintained at Decoder.__call__ entry
+ assert isinstance(substrate, io.BytesIO) # nosec assert_used
+ substrate_gen = Decoder._callSubstrateFunV4asV5(origSubstrateFun, asn1Object, substrate, length)
+ for value in substrate_gen:
+ yield value
+
+ options["substrateFun"] = substrateFunWrapper
+
+ streamingDecoder = cls.STREAMING_DECODER(
+ substrate, asn1Spec, **options)
+
+ for asn1Object in streamingDecoder:
+ if isinstance(asn1Object, SubstrateUnderrunError):
+ raise error.SubstrateUnderrunError('Short substrate on input')
+
+ try:
+ tail = next(readFromStream(substrate))
+
+ except error.EndOfStreamError:
+ tail = b''
+
+ return asn1Object, tail
+
+ @staticmethod
+ def _callSubstrateFunV4asV5(substrateFunV4, asn1Object, substrate, length):
+ substrate_bytes = substrate.read()
+ if length == -1:
+ length = len(substrate_bytes)
+ value, nextSubstrate = substrateFunV4(asn1Object, substrate_bytes, length)
+ nbytes = substrate.write(nextSubstrate)
+ substrate.truncate()
+ substrate.seek(-nbytes, os.SEEK_CUR)
+ yield value
+
+#: Turns BER octet stream into an ASN.1 object.
+#:
+#: Takes BER octet-stream and decode it into an ASN.1 object
+#: (e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative) which
+#: may be a scalar or an arbitrary nested structure.
+#:
+#: Parameters
+#: ----------
+#: substrate: :py:class:`bytes`
+#: BER octet-stream
+#:
+#: Keyword Args
+#: ------------
+#: asn1Spec: any pyasn1 type object e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative
+#: A pyasn1 type object to act as a template guiding the decoder. Depending on the ASN.1 structure
+#: being decoded, *asn1Spec* may or may not be required. Most common reason for
+#: it to require is that ASN.1 structure is encoded in *IMPLICIT* tagging mode.
+#:
+#: Returns
+#: -------
+#: : :py:class:`tuple`
+#: A tuple of pyasn1 object recovered from BER substrate (:py:class:`~pyasn1.type.base.PyAsn1Item` derivative)
+#: and the unprocessed trailing portion of the *substrate* (may be empty)
+#:
+#: Raises
+#: ------
+#: ~pyasn1.error.PyAsn1Error, ~pyasn1.error.SubstrateUnderrunError
+#: On decoding errors
+#:
+#: Notes
+#: -----
+#: This function is deprecated. Please use :py:class:`Decoder` or
+#: :py:class:`StreamingDecoder` class instance.
+#:
+#: Examples
+#: --------
+#: Decode BER serialisation without ASN.1 schema
+#:
+#: .. code-block:: pycon
+#:
+#: >>> s, _ = decode(b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03')
+#: >>> str(s)
+#: SequenceOf:
+#: 1 2 3
+#:
+#: Decode BER serialisation with ASN.1 schema
+#:
+#: .. code-block:: pycon
+#:
+#: >>> seq = SequenceOf(componentType=Integer())
+#: >>> s, _ = decode(b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03', asn1Spec=seq)
+#: >>> str(s)
+#: SequenceOf:
+#: 1 2 3
+#:
+decode = Decoder()
+
+def __getattr__(attr: str):
+ if newAttr := {"tagMap": "TAG_MAP", "typeMap": "TYPE_MAP"}.get(attr):
+ warnings.warn(f"{attr} is deprecated. Please use {newAttr} instead.", DeprecationWarning)
+ return globals()[newAttr]
+ raise AttributeError(attr)
diff --git a/.venv/lib/python3.12/site-packages/pyasn1/codec/ber/encoder.py b/.venv/lib/python3.12/site-packages/pyasn1/codec/ber/encoder.py
new file mode 100644
index 00000000..d16fb1f8
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/pyasn1/codec/ber/encoder.py
@@ -0,0 +1,954 @@
+#
+# This file is part of pyasn1 software.
+#
+# Copyright (c) 2005-2020, Ilya Etingof <etingof@gmail.com>
+# License: https://pyasn1.readthedocs.io/en/latest/license.html
+#
+import sys
+import warnings
+
+from pyasn1 import debug
+from pyasn1 import error
+from pyasn1.codec.ber import eoo
+from pyasn1.compat import _MISSING
+from pyasn1.compat.integer import to_bytes
+from pyasn1.type import char
+from pyasn1.type import tag
+from pyasn1.type import univ
+from pyasn1.type import useful
+
+__all__ = ['Encoder', 'encode']
+
+LOG = debug.registerLoggee(__name__, flags=debug.DEBUG_ENCODER)
+
+
+class AbstractItemEncoder(object):
+ supportIndefLenMode = True
+
+ # An outcome of otherwise legit call `encodeFun(eoo.endOfOctets)`
+ eooIntegerSubstrate = (0, 0)
+ eooOctetsSubstrate = bytes(eooIntegerSubstrate)
+
+ # noinspection PyMethodMayBeStatic
+ def encodeTag(self, singleTag, isConstructed):
+ tagClass, tagFormat, tagId = singleTag
+ encodedTag = tagClass | tagFormat
+ if isConstructed:
+ encodedTag |= tag.tagFormatConstructed
+
+ if tagId < 31:
+ return encodedTag | tagId,
+
+ else:
+ substrate = tagId & 0x7f,
+
+ tagId >>= 7
+
+ while tagId:
+ substrate = (0x80 | (tagId & 0x7f),) + substrate
+ tagId >>= 7
+
+ return (encodedTag | 0x1F,) + substrate
+
+ def encodeLength(self, length, defMode):
+ if not defMode and self.supportIndefLenMode:
+ return (0x80,)
+
+ if length < 0x80:
+ return length,
+
+ else:
+ substrate = ()
+ while length:
+ substrate = (length & 0xff,) + substrate
+ length >>= 8
+
+ substrateLen = len(substrate)
+
+ if substrateLen > 126:
+ raise error.PyAsn1Error('Length octets overflow (%d)' % substrateLen)
+
+ return (0x80 | substrateLen,) + substrate
+
+ def encodeValue(self, value, asn1Spec, encodeFun, **options):
+ raise error.PyAsn1Error('Not implemented')
+
+ def encode(self, value, asn1Spec=None, encodeFun=None, **options):
+
+ if asn1Spec is None:
+ tagSet = value.tagSet
+ else:
+ tagSet = asn1Spec.tagSet
+
+ # untagged item?
+ if not tagSet:
+ substrate, isConstructed, isOctets = self.encodeValue(
+ value, asn1Spec, encodeFun, **options
+ )
+ return substrate
+
+ defMode = options.get('defMode', True)
+
+ substrate = b''
+
+ for idx, singleTag in enumerate(tagSet.superTags):
+
+ defModeOverride = defMode
+
+ # base tag?
+ if not idx:
+ try:
+ substrate, isConstructed, isOctets = self.encodeValue(
+ value, asn1Spec, encodeFun, **options
+ )
+
+ except error.PyAsn1Error as exc:
+ raise error.PyAsn1Error(
+ 'Error encoding %r: %s' % (value, exc))
+
+ if LOG:
+ LOG('encoded %svalue %s into %s' % (
+ isConstructed and 'constructed ' or '', value, substrate
+ ))
+
+ if not substrate and isConstructed and options.get('ifNotEmpty', False):
+ return substrate
+
+ if not isConstructed:
+ defModeOverride = True
+
+ if LOG:
+ LOG('overridden encoding mode into definitive for primitive type')
+
+ header = self.encodeTag(singleTag, isConstructed)
+
+ if LOG:
+ LOG('encoded %stag %s into %s' % (
+ isConstructed and 'constructed ' or '',
+ singleTag, debug.hexdump(bytes(header))))
+
+ header += self.encodeLength(len(substrate), defModeOverride)
+
+ if LOG:
+ LOG('encoded %s octets (tag + payload) into %s' % (
+ len(substrate), debug.hexdump(bytes(header))))
+
+ if isOctets:
+ substrate = bytes(header) + substrate
+
+ if not defModeOverride:
+ substrate += self.eooOctetsSubstrate
+
+ else:
+ substrate = header + substrate
+
+ if not defModeOverride:
+ substrate += self.eooIntegerSubstrate
+
+ if not isOctets:
+ substrate = bytes(substrate)
+
+ return substrate
+
+
+class EndOfOctetsEncoder(AbstractItemEncoder):
+ def encodeValue(self, value, asn1Spec, encodeFun, **options):
+ return b'', False, True
+
+
+class BooleanEncoder(AbstractItemEncoder):
+ supportIndefLenMode = False
+
+ def encodeValue(self, value, asn1Spec, encodeFun, **options):
+ return value and (1,) or (0,), False, False
+
+
+class IntegerEncoder(AbstractItemEncoder):
+ supportIndefLenMode = False
+ supportCompactZero = False
+
+ def encodeValue(self, value, asn1Spec, encodeFun, **options):
+ if value == 0:
+ if LOG:
+ LOG('encoding %spayload for zero INTEGER' % (
+ self.supportCompactZero and 'no ' or ''
+ ))
+
+ # de-facto way to encode zero
+ if self.supportCompactZero:
+ return (), False, False
+ else:
+ return (0,), False, False
+
+ return to_bytes(int(value), signed=True), False, True
+
+
+class BitStringEncoder(AbstractItemEncoder):
+ def encodeValue(self, value, asn1Spec, encodeFun, **options):
+ if asn1Spec is not None:
+ # TODO: try to avoid ASN.1 schema instantiation
+ value = asn1Spec.clone(value)
+
+ valueLength = len(value)
+ if valueLength % 8:
+ alignedValue = value << (8 - valueLength % 8)
+ else:
+ alignedValue = value
+
+ maxChunkSize = options.get('maxChunkSize', 0)
+ if not maxChunkSize or len(alignedValue) <= maxChunkSize * 8:
+ substrate = alignedValue.asOctets()
+ return bytes((len(substrate) * 8 - valueLength,)) + substrate, False, True
+
+ if LOG:
+ LOG('encoding into up to %s-octet chunks' % maxChunkSize)
+
+ baseTag = value.tagSet.baseTag
+
+ # strip off explicit tags
+ if baseTag:
+ tagSet = tag.TagSet(baseTag, baseTag)
+
+ else:
+ tagSet = tag.TagSet()
+
+ alignedValue = alignedValue.clone(tagSet=tagSet)
+
+ stop = 0
+ substrate = b''
+ while stop < valueLength:
+ start = stop
+ stop = min(start + maxChunkSize * 8, valueLength)
+ substrate += encodeFun(alignedValue[start:stop], asn1Spec, **options)
+
+ return substrate, True, True
+
+
+class OctetStringEncoder(AbstractItemEncoder):
+
+ def encodeValue(self, value, asn1Spec, encodeFun, **options):
+
+ if asn1Spec is None:
+ substrate = value.asOctets()
+
+ elif not isinstance(value, bytes):
+ substrate = asn1Spec.clone(value).asOctets()
+
+ else:
+ substrate = value
+
+ maxChunkSize = options.get('maxChunkSize', 0)
+
+ if not maxChunkSize or len(substrate) <= maxChunkSize:
+ return substrate, False, True
+
+ if LOG:
+ LOG('encoding into up to %s-octet chunks' % maxChunkSize)
+
+ # strip off explicit tags for inner chunks
+
+ if asn1Spec is None:
+ baseTag = value.tagSet.baseTag
+
+ # strip off explicit tags
+ if baseTag:
+ tagSet = tag.TagSet(baseTag, baseTag)
+
+ else:
+ tagSet = tag.TagSet()
+
+ asn1Spec = value.clone(tagSet=tagSet)
+
+ elif not isinstance(value, bytes):
+ baseTag = asn1Spec.tagSet.baseTag
+
+ # strip off explicit tags
+ if baseTag:
+ tagSet = tag.TagSet(baseTag, baseTag)
+
+ else:
+ tagSet = tag.TagSet()
+
+ asn1Spec = asn1Spec.clone(tagSet=tagSet)
+
+ pos = 0
+ substrate = b''
+
+ while True:
+ chunk = value[pos:pos + maxChunkSize]
+ if not chunk:
+ break
+
+ substrate += encodeFun(chunk, asn1Spec, **options)
+ pos += maxChunkSize
+
+ return substrate, True, True
+
+
+class NullEncoder(AbstractItemEncoder):
+ supportIndefLenMode = False
+
+ def encodeValue(self, value, asn1Spec, encodeFun, **options):
+ return b'', False, True
+
+
+class ObjectIdentifierEncoder(AbstractItemEncoder):
+ supportIndefLenMode = False
+
+ def encodeValue(self, value, asn1Spec, encodeFun, **options):
+ if asn1Spec is not None:
+ value = asn1Spec.clone(value)
+
+ oid = value.asTuple()
+
+ # Build the first pair
+ try:
+ first = oid[0]
+ second = oid[1]
+
+ except IndexError:
+ raise error.PyAsn1Error('Short OID %s' % (value,))
+
+ if 0 <= second <= 39:
+ if first == 1:
+ oid = (second + 40,) + oid[2:]
+ elif first == 0:
+ oid = (second,) + oid[2:]
+ elif first == 2:
+ oid = (second + 80,) + oid[2:]
+ else:
+ raise error.PyAsn1Error('Impossible first/second arcs at %s' % (value,))
+
+ elif first == 2:
+ oid = (second + 80,) + oid[2:]
+
+ else:
+ raise error.PyAsn1Error('Impossible first/second arcs at %s' % (value,))
+
+ octets = ()
+
+ # Cycle through subIds
+ for subOid in oid:
+ if 0 <= subOid <= 127:
+ # Optimize for the common case
+ octets += (subOid,)
+
+ elif subOid > 127:
+ # Pack large Sub-Object IDs
+ res = (subOid & 0x7f,)
+ subOid >>= 7
+
+ while subOid:
+ res = (0x80 | (subOid & 0x7f),) + res
+ subOid >>= 7
+
+ # Add packed Sub-Object ID to resulted Object ID
+ octets += res
+
+ else:
+ raise error.PyAsn1Error('Negative OID arc %s at %s' % (subOid, value))
+
+ return octets, False, False
+
+
+class RelativeOIDEncoder(AbstractItemEncoder):
+ supportIndefLenMode = False
+
+ def encodeValue(self, value, asn1Spec, encodeFun, **options):
+ if asn1Spec is not None:
+ value = asn1Spec.clone(value)
+
+ octets = ()
+
+ # Cycle through subIds
+ for subOid in value.asTuple():
+ if 0 <= subOid <= 127:
+ # Optimize for the common case
+ octets += (subOid,)
+
+ elif subOid > 127:
+ # Pack large Sub-Object IDs
+ res = (subOid & 0x7f,)
+ subOid >>= 7
+
+ while subOid:
+ res = (0x80 | (subOid & 0x7f),) + res
+ subOid >>= 7
+
+ # Add packed Sub-Object ID to resulted RELATIVE-OID
+ octets += res
+
+ else:
+ raise error.PyAsn1Error('Negative RELATIVE-OID arc %s at %s' % (subOid, value))
+
+ return octets, False, False
+
+
+class RealEncoder(AbstractItemEncoder):
+ supportIndefLenMode = False
+ binEncBase = 2 # set to None to choose encoding base automatically
+
+ @staticmethod
+ def _dropFloatingPoint(m, encbase, e):
+ ms, es = 1, 1
+ if m < 0:
+ ms = -1 # mantissa sign
+
+ if e < 0:
+ es = -1 # exponent sign
+
+ m *= ms
+
+ if encbase == 8:
+ m *= 2 ** (abs(e) % 3 * es)
+ e = abs(e) // 3 * es
+
+ elif encbase == 16:
+ m *= 2 ** (abs(e) % 4 * es)
+ e = abs(e) // 4 * es
+
+ while True:
+ if int(m) != m:
+ m *= encbase
+ e -= 1
+ continue
+ break
+
+ return ms, int(m), encbase, e
+
+ def _chooseEncBase(self, value):
+ m, b, e = value
+ encBase = [2, 8, 16]
+ if value.binEncBase in encBase:
+ return self._dropFloatingPoint(m, value.binEncBase, e)
+
+ elif self.binEncBase in encBase:
+ return self._dropFloatingPoint(m, self.binEncBase, e)
+
+ # auto choosing base 2/8/16
+ mantissa = [m, m, m]
+ exponent = [e, e, e]
+ sign = 1
+ encbase = 2
+ e = float('inf')
+
+ for i in range(3):
+ (sign,
+ mantissa[i],
+ encBase[i],
+ exponent[i]) = self._dropFloatingPoint(mantissa[i], encBase[i], exponent[i])
+
+ if abs(exponent[i]) < abs(e) or (abs(exponent[i]) == abs(e) and mantissa[i] < m):
+ e = exponent[i]
+ m = int(mantissa[i])
+ encbase = encBase[i]
+
+ if LOG:
+ LOG('automatically chosen REAL encoding base %s, sign %s, mantissa %s, '
+ 'exponent %s' % (encbase, sign, m, e))
+
+ return sign, m, encbase, e
+
+ def encodeValue(self, value, asn1Spec, encodeFun, **options):
+ if asn1Spec is not None:
+ value = asn1Spec.clone(value)
+
+ if value.isPlusInf:
+ return (0x40,), False, False
+
+ if value.isMinusInf:
+ return (0x41,), False, False
+
+ m, b, e = value
+
+ if not m:
+ return b'', False, True
+
+ if b == 10:
+ if LOG:
+ LOG('encoding REAL into character form')
+
+ return b'\x03%dE%s%d' % (m, e == 0 and b'+' or b'', e), False, True
+
+ elif b == 2:
+ fo = 0x80 # binary encoding
+ ms, m, encbase, e = self._chooseEncBase(value)
+
+ if ms < 0: # mantissa sign
+ fo |= 0x40 # sign bit
+
+ # exponent & mantissa normalization
+ if encbase == 2:
+ while m & 0x1 == 0:
+ m >>= 1
+ e += 1
+
+ elif encbase == 8:
+ while m & 0x7 == 0:
+ m >>= 3
+ e += 1
+ fo |= 0x10
+
+ else: # encbase = 16
+ while m & 0xf == 0:
+ m >>= 4
+ e += 1
+ fo |= 0x20
+
+ sf = 0 # scale factor
+
+ while m & 0x1 == 0:
+ m >>= 1
+ sf += 1
+
+ if sf > 3:
+ raise error.PyAsn1Error('Scale factor overflow') # bug if raised
+
+ fo |= sf << 2
+ eo = b''
+ if e == 0 or e == -1:
+ eo = bytes((e & 0xff,))
+
+ else:
+ while e not in (0, -1):
+ eo = bytes((e & 0xff,)) + eo
+ e >>= 8
+
+ if e == 0 and eo and eo[0] & 0x80:
+ eo = bytes((0,)) + eo
+
+ if e == -1 and eo and not (eo[0] & 0x80):
+ eo = bytes((0xff,)) + eo
+
+ n = len(eo)
+ if n > 0xff:
+ raise error.PyAsn1Error('Real exponent overflow')
+
+ if n == 1:
+ pass
+
+ elif n == 2:
+ fo |= 1
+
+ elif n == 3:
+ fo |= 2
+
+ else:
+ fo |= 3
+ eo = bytes((n & 0xff,)) + eo
+
+ po = b''
+
+ while m:
+ po = bytes((m & 0xff,)) + po
+ m >>= 8
+
+ substrate = bytes((fo,)) + eo + po
+
+ return substrate, False, True
+
+ else:
+ raise error.PyAsn1Error('Prohibited Real base %s' % b)
+
+
+class SequenceEncoder(AbstractItemEncoder):
+ omitEmptyOptionals = False
+
+ # TODO: handling three flavors of input is too much -- split over codecs
+
+ def encodeValue(self, value, asn1Spec, encodeFun, **options):
+
+ substrate = b''
+
+ omitEmptyOptionals = options.get(
+ 'omitEmptyOptionals', self.omitEmptyOptionals)
+
+ if LOG:
+ LOG('%sencoding empty OPTIONAL components' % (
+ omitEmptyOptionals and 'not ' or ''))
+
+ if asn1Spec is None:
+ # instance of ASN.1 schema
+ inconsistency = value.isInconsistent
+ if inconsistency:
+ raise error.PyAsn1Error(
+ f"ASN.1 object {value.__class__.__name__} is inconsistent")
+
+ namedTypes = value.componentType
+
+ for idx, component in enumerate(value.values()):
+ if namedTypes:
+ namedType = namedTypes[idx]
+
+ if namedType.isOptional and not component.isValue:
+ if LOG:
+ LOG('not encoding OPTIONAL component %r' % (namedType,))
+ continue
+
+ if namedType.isDefaulted and component == namedType.asn1Object:
+ if LOG:
+ LOG('not encoding DEFAULT component %r' % (namedType,))
+ continue
+
+ if omitEmptyOptionals:
+ options.update(ifNotEmpty=namedType.isOptional)
+
+ # wrap open type blob if needed
+ if namedTypes and namedType.openType:
+
+ wrapType = namedType.asn1Object
+
+ if wrapType.typeId in (
+ univ.SetOf.typeId, univ.SequenceOf.typeId):
+
+ substrate += encodeFun(
+ component, asn1Spec,
+ **dict(options, wrapType=wrapType.componentType))
+
+ else:
+ chunk = encodeFun(component, asn1Spec, **options)
+
+ if wrapType.isSameTypeWith(component):
+ substrate += chunk
+
+ else:
+ substrate += encodeFun(chunk, wrapType, **options)
+
+ if LOG:
+ LOG('wrapped with wrap type %r' % (wrapType,))
+
+ else:
+ substrate += encodeFun(component, asn1Spec, **options)
+
+ else:
+ # bare Python value + ASN.1 schema
+ for idx, namedType in enumerate(asn1Spec.componentType.namedTypes):
+
+ try:
+ component = value[namedType.name]
+
+ except KeyError:
+ raise error.PyAsn1Error('Component name "%s" not found in %r' % (
+ namedType.name, value))
+
+ if namedType.isOptional and namedType.name not in value:
+ if LOG:
+ LOG('not encoding OPTIONAL component %r' % (namedType,))
+ continue
+
+ if namedType.isDefaulted and component == namedType.asn1Object:
+ if LOG:
+ LOG('not encoding DEFAULT component %r' % (namedType,))
+ continue
+
+ if omitEmptyOptionals:
+ options.update(ifNotEmpty=namedType.isOptional)
+
+ componentSpec = namedType.asn1Object
+
+ # wrap open type blob if needed
+ if namedType.openType:
+
+ if componentSpec.typeId in (
+ univ.SetOf.typeId, univ.SequenceOf.typeId):
+
+ substrate += encodeFun(
+ component, componentSpec,
+ **dict(options, wrapType=componentSpec.componentType))
+
+ else:
+ chunk = encodeFun(component, componentSpec, **options)
+
+ if componentSpec.isSameTypeWith(component):
+ substrate += chunk
+
+ else:
+ substrate += encodeFun(chunk, componentSpec, **options)
+
+ if LOG:
+ LOG('wrapped with wrap type %r' % (componentSpec,))
+
+ else:
+ substrate += encodeFun(component, componentSpec, **options)
+
+ return substrate, True, True
+
+
+class SequenceOfEncoder(AbstractItemEncoder):
+ def _encodeComponents(self, value, asn1Spec, encodeFun, **options):
+
+ if asn1Spec is None:
+ inconsistency = value.isInconsistent
+ if inconsistency:
+ raise error.PyAsn1Error(
+ f"ASN.1 object {value.__class__.__name__} is inconsistent")
+
+ else:
+ asn1Spec = asn1Spec.componentType
+
+ chunks = []
+
+ wrapType = options.pop('wrapType', None)
+
+ for idx, component in enumerate(value):
+ chunk = encodeFun(component, asn1Spec, **options)
+
+ if (wrapType is not None and
+ not wrapType.isSameTypeWith(component)):
+ # wrap encoded value with wrapper container (e.g. ANY)
+ chunk = encodeFun(chunk, wrapType, **options)
+
+ if LOG:
+ LOG('wrapped with wrap type %r' % (wrapType,))
+
+ chunks.append(chunk)
+
+ return chunks
+
+ def encodeValue(self, value, asn1Spec, encodeFun, **options):
+ chunks = self._encodeComponents(
+ value, asn1Spec, encodeFun, **options)
+
+ return b''.join(chunks), True, True
+
+
+class ChoiceEncoder(AbstractItemEncoder):
+ def encodeValue(self, value, asn1Spec, encodeFun, **options):
+ if asn1Spec is None:
+ component = value.getComponent()
+ else:
+ names = [namedType.name for namedType in asn1Spec.componentType.namedTypes
+ if namedType.name in value]
+ if len(names) != 1:
+ raise error.PyAsn1Error('%s components for Choice at %r' % (len(names) and 'Multiple ' or 'None ', value))
+
+ name = names[0]
+
+ component = value[name]
+ asn1Spec = asn1Spec[name]
+
+ return encodeFun(component, asn1Spec, **options), True, True
+
+
+class AnyEncoder(OctetStringEncoder):
+ def encodeValue(self, value, asn1Spec, encodeFun, **options):
+ if asn1Spec is None:
+ value = value.asOctets()
+ elif not isinstance(value, bytes):
+ value = asn1Spec.clone(value).asOctets()
+
+ return value, not options.get('defMode', True), True
+
+
+TAG_MAP = {
+ eoo.endOfOctets.tagSet: EndOfOctetsEncoder(),
+ univ.Boolean.tagSet: BooleanEncoder(),
+ univ.Integer.tagSet: IntegerEncoder(),
+ univ.BitString.tagSet: BitStringEncoder(),
+ univ.OctetString.tagSet: OctetStringEncoder(),
+ univ.Null.tagSet: NullEncoder(),
+ univ.ObjectIdentifier.tagSet: ObjectIdentifierEncoder(),
+ univ.RelativeOID.tagSet: RelativeOIDEncoder(),
+ univ.Enumerated.tagSet: IntegerEncoder(),
+ univ.Real.tagSet: RealEncoder(),
+ # Sequence & Set have same tags as SequenceOf & SetOf
+ univ.SequenceOf.tagSet: SequenceOfEncoder(),
+ univ.SetOf.tagSet: SequenceOfEncoder(),
+ univ.Choice.tagSet: ChoiceEncoder(),
+ # character string types
+ char.UTF8String.tagSet: OctetStringEncoder(),
+ char.NumericString.tagSet: OctetStringEncoder(),
+ char.PrintableString.tagSet: OctetStringEncoder(),
+ char.TeletexString.tagSet: OctetStringEncoder(),
+ char.VideotexString.tagSet: OctetStringEncoder(),
+ char.IA5String.tagSet: OctetStringEncoder(),
+ char.GraphicString.tagSet: OctetStringEncoder(),
+ char.VisibleString.tagSet: OctetStringEncoder(),
+ char.GeneralString.tagSet: OctetStringEncoder(),
+ char.UniversalString.tagSet: OctetStringEncoder(),
+ char.BMPString.tagSet: OctetStringEncoder(),
+ # useful types
+ useful.ObjectDescriptor.tagSet: OctetStringEncoder(),
+ useful.GeneralizedTime.tagSet: OctetStringEncoder(),
+ useful.UTCTime.tagSet: OctetStringEncoder()
+}
+
+# Put in ambiguous & non-ambiguous types for faster codec lookup
+TYPE_MAP = {
+ univ.Boolean.typeId: BooleanEncoder(),
+ univ.Integer.typeId: IntegerEncoder(),
+ univ.BitString.typeId: BitStringEncoder(),
+ univ.OctetString.typeId: OctetStringEncoder(),
+ univ.Null.typeId: NullEncoder(),
+ univ.ObjectIdentifier.typeId: ObjectIdentifierEncoder(),
+ univ.RelativeOID.typeId: RelativeOIDEncoder(),
+ univ.Enumerated.typeId: IntegerEncoder(),
+ univ.Real.typeId: RealEncoder(),
+ # Sequence & Set have same tags as SequenceOf & SetOf
+ univ.Set.typeId: SequenceEncoder(),
+ univ.SetOf.typeId: SequenceOfEncoder(),
+ univ.Sequence.typeId: SequenceEncoder(),
+ univ.SequenceOf.typeId: SequenceOfEncoder(),
+ univ.Choice.typeId: ChoiceEncoder(),
+ univ.Any.typeId: AnyEncoder(),
+ # character string types
+ char.UTF8String.typeId: OctetStringEncoder(),
+ char.NumericString.typeId: OctetStringEncoder(),
+ char.PrintableString.typeId: OctetStringEncoder(),
+ char.TeletexString.typeId: OctetStringEncoder(),
+ char.VideotexString.typeId: OctetStringEncoder(),
+ char.IA5String.typeId: OctetStringEncoder(),
+ char.GraphicString.typeId: OctetStringEncoder(),
+ char.VisibleString.typeId: OctetStringEncoder(),
+ char.GeneralString.typeId: OctetStringEncoder(),
+ char.UniversalString.typeId: OctetStringEncoder(),
+ char.BMPString.typeId: OctetStringEncoder(),
+ # useful types
+ useful.ObjectDescriptor.typeId: OctetStringEncoder(),
+ useful.GeneralizedTime.typeId: OctetStringEncoder(),
+ useful.UTCTime.typeId: OctetStringEncoder()
+}
+
+
+class SingleItemEncoder(object):
+ fixedDefLengthMode = None
+ fixedChunkSize = None
+
+ TAG_MAP = TAG_MAP
+ TYPE_MAP = TYPE_MAP
+
+ def __init__(self, tagMap=_MISSING, typeMap=_MISSING, **ignored):
+ self._tagMap = tagMap if tagMap is not _MISSING else self.TAG_MAP
+ self._typeMap = typeMap if typeMap is not _MISSING else self.TYPE_MAP
+
+ def __call__(self, value, asn1Spec=None, **options):
+ try:
+ if asn1Spec is None:
+ typeId = value.typeId
+ else:
+ typeId = asn1Spec.typeId
+
+ except AttributeError:
+ raise error.PyAsn1Error('Value %r is not ASN.1 type instance '
+ 'and "asn1Spec" not given' % (value,))
+
+ if LOG:
+ LOG('encoder called in %sdef mode, chunk size %s for type %s, '
+ 'value:\n%s' % (not options.get('defMode', True) and 'in' or '',
+ options.get('maxChunkSize', 0),
+ asn1Spec is None and value.prettyPrintType() or
+ asn1Spec.prettyPrintType(), value))
+
+ if self.fixedDefLengthMode is not None:
+ options.update(defMode=self.fixedDefLengthMode)
+
+ if self.fixedChunkSize is not None:
+ options.update(maxChunkSize=self.fixedChunkSize)
+
+ try:
+ concreteEncoder = self._typeMap[typeId]
+
+ if LOG:
+ LOG('using value codec %s chosen by type ID '
+ '%s' % (concreteEncoder.__class__.__name__, typeId))
+
+ except KeyError:
+ if asn1Spec is None:
+ tagSet = value.tagSet
+ else:
+ tagSet = asn1Spec.tagSet
+
+ # use base type for codec lookup to recover untagged types
+ baseTagSet = tag.TagSet(tagSet.baseTag, tagSet.baseTag)
+
+ try:
+ concreteEncoder = self._tagMap[baseTagSet]
+
+ except KeyError:
+ raise error.PyAsn1Error('No encoder for %r (%s)' % (value, tagSet))
+
+ if LOG:
+ LOG('using value codec %s chosen by tagSet '
+ '%s' % (concreteEncoder.__class__.__name__, tagSet))
+
+ substrate = concreteEncoder.encode(value, asn1Spec, self, **options)
+
+ if LOG:
+ LOG('codec %s built %s octets of substrate: %s\nencoder '
+ 'completed' % (concreteEncoder, len(substrate),
+ debug.hexdump(substrate)))
+
+ return substrate
+
+
+class Encoder(object):
+ SINGLE_ITEM_ENCODER = SingleItemEncoder
+
+ def __init__(self, tagMap=_MISSING, typeMap=_MISSING, **options):
+ self._singleItemEncoder = self.SINGLE_ITEM_ENCODER(
+ tagMap=tagMap, typeMap=typeMap, **options
+ )
+
+ def __call__(self, pyObject, asn1Spec=None, **options):
+ return self._singleItemEncoder(
+ pyObject, asn1Spec=asn1Spec, **options)
+
+
+#: Turns ASN.1 object into BER octet stream.
+#:
+#: Takes any ASN.1 object (e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative)
+#: walks all its components recursively and produces a BER octet stream.
+#:
+#: Parameters
+#: ----------
+#: value: either a Python or pyasn1 object (e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative)
+#: A Python or pyasn1 object to encode. If Python object is given, `asnSpec`
+#: parameter is required to guide the encoding process.
+#:
+#: Keyword Args
+#: ------------
+#: asn1Spec:
+#: Optional ASN.1 schema or value object e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative
+#:
+#: defMode: :py:class:`bool`
+#: If :obj:`False`, produces indefinite length encoding
+#:
+#: maxChunkSize: :py:class:`int`
+#: Maximum chunk size in chunked encoding mode (0 denotes unlimited chunk size)
+#:
+#: Returns
+#: -------
+#: : :py:class:`bytes`
+#: Given ASN.1 object encoded into BER octetstream
+#:
+#: Raises
+#: ------
+#: ~pyasn1.error.PyAsn1Error
+#: On encoding errors
+#:
+#: Examples
+#: --------
+#: Encode Python value into BER with ASN.1 schema
+#:
+#: .. code-block:: pycon
+#:
+#: >>> seq = SequenceOf(componentType=Integer())
+#: >>> encode([1, 2, 3], asn1Spec=seq)
+#: b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03'
+#:
+#: Encode ASN.1 value object into BER
+#:
+#: .. code-block:: pycon
+#:
+#: >>> seq = SequenceOf(componentType=Integer())
+#: >>> seq.extend([1, 2, 3])
+#: >>> encode(seq)
+#: b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03'
+#:
+encode = Encoder()
+
+def __getattr__(attr: str):
+ if newAttr := {"tagMap": "TAG_MAP", "typeMap": "TYPE_MAP"}.get(attr):
+ warnings.warn(f"{attr} is deprecated. Please use {newAttr} instead.", DeprecationWarning)
+ return globals()[newAttr]
+ raise AttributeError(attr)
diff --git a/.venv/lib/python3.12/site-packages/pyasn1/codec/ber/eoo.py b/.venv/lib/python3.12/site-packages/pyasn1/codec/ber/eoo.py
new file mode 100644
index 00000000..8c91a3d2
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/pyasn1/codec/ber/eoo.py
@@ -0,0 +1,28 @@
+#
+# This file is part of pyasn1 software.
+#
+# Copyright (c) 2005-2020, Ilya Etingof <etingof@gmail.com>
+# License: https://pyasn1.readthedocs.io/en/latest/license.html
+#
+from pyasn1.type import base
+from pyasn1.type import tag
+
+__all__ = ['endOfOctets']
+
+
+class EndOfOctets(base.SimpleAsn1Type):
+ defaultValue = 0
+ tagSet = tag.initTagSet(
+ tag.Tag(tag.tagClassUniversal, tag.tagFormatSimple, 0x00)
+ )
+
+ _instance = None
+
+ def __new__(cls, *args, **kwargs):
+ if cls._instance is None:
+ cls._instance = object.__new__(cls, *args, **kwargs)
+
+ return cls._instance
+
+
+endOfOctets = EndOfOctets()