aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/asyncpg/pgproto/buffer.pyx
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/asyncpg/pgproto/buffer.pyx
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are hereHEADmaster
Diffstat (limited to '.venv/lib/python3.12/site-packages/asyncpg/pgproto/buffer.pyx')
-rw-r--r--.venv/lib/python3.12/site-packages/asyncpg/pgproto/buffer.pyx817
1 files changed, 817 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/asyncpg/pgproto/buffer.pyx b/.venv/lib/python3.12/site-packages/asyncpg/pgproto/buffer.pyx
new file mode 100644
index 00000000..e05d4c7d
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/asyncpg/pgproto/buffer.pyx
@@ -0,0 +1,817 @@
+# Copyright (C) 2016-present the asyncpg authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of asyncpg and is released under
+# the Apache 2.0 License: http://www.apache.org/licenses/LICENSE-2.0
+
+
+from libc.string cimport memcpy
+
+import collections
+
+class BufferError(Exception):
+ pass
+
+@cython.no_gc_clear
+@cython.final
+@cython.freelist(_BUFFER_FREELIST_SIZE)
+cdef class WriteBuffer:
+
+ def __cinit__(self):
+ self._smallbuf_inuse = True
+ self._buf = self._smallbuf
+ self._size = _BUFFER_INITIAL_SIZE
+ self._length = 0
+ self._message_mode = 0
+
+ def __dealloc__(self):
+ if self._buf is not NULL and not self._smallbuf_inuse:
+ cpython.PyMem_Free(self._buf)
+ self._buf = NULL
+ self._size = 0
+
+ if self._view_count:
+ raise BufferError(
+ 'Deallocating buffer with attached memoryviews')
+
+ def __getbuffer__(self, Py_buffer *buffer, int flags):
+ self._view_count += 1
+
+ cpython.PyBuffer_FillInfo(
+ buffer, self, self._buf, self._length,
+ 1, # read-only
+ flags)
+
+ def __releasebuffer__(self, Py_buffer *buffer):
+ self._view_count -= 1
+
+ cdef inline _check_readonly(self):
+ if self._view_count:
+ raise BufferError('the buffer is in read-only mode')
+
+ cdef inline _ensure_alloced(self, ssize_t extra_length):
+ cdef ssize_t new_size = extra_length + self._length
+
+ if new_size > self._size:
+ self._reallocate(new_size)
+
+ cdef _reallocate(self, ssize_t new_size):
+ cdef char *new_buf
+
+ if new_size < _BUFFER_MAX_GROW:
+ new_size = _BUFFER_MAX_GROW
+ else:
+ # Add a little extra
+ new_size += _BUFFER_INITIAL_SIZE
+
+ if self._smallbuf_inuse:
+ new_buf = <char*>cpython.PyMem_Malloc(
+ sizeof(char) * <size_t>new_size)
+ if new_buf is NULL:
+ self._buf = NULL
+ self._size = 0
+ self._length = 0
+ raise MemoryError
+ memcpy(new_buf, self._buf, <size_t>self._size)
+ self._size = new_size
+ self._buf = new_buf
+ self._smallbuf_inuse = False
+ else:
+ new_buf = <char*>cpython.PyMem_Realloc(
+ <void*>self._buf, <size_t>new_size)
+ if new_buf is NULL:
+ cpython.PyMem_Free(self._buf)
+ self._buf = NULL
+ self._size = 0
+ self._length = 0
+ raise MemoryError
+ self._buf = new_buf
+ self._size = new_size
+
+ cdef inline start_message(self, char type):
+ if self._length != 0:
+ raise BufferError(
+ 'cannot start_message for a non-empty buffer')
+ self._ensure_alloced(5)
+ self._message_mode = 1
+ self._buf[0] = type
+ self._length = 5
+
+ cdef inline end_message(self):
+ # "length-1" to exclude the message type byte
+ cdef ssize_t mlen = self._length - 1
+
+ self._check_readonly()
+ if not self._message_mode:
+ raise BufferError(
+ 'end_message can only be called with start_message')
+ if self._length < 5:
+ raise BufferError('end_message: buffer is too small')
+ if mlen > _MAXINT32:
+ raise BufferError('end_message: message is too large')
+
+ hton.pack_int32(&self._buf[1], <int32_t>mlen)
+ return self
+
+ cdef inline reset(self):
+ self._length = 0
+ self._message_mode = 0
+
+ cdef write_buffer(self, WriteBuffer buf):
+ self._check_readonly()
+
+ if not buf._length:
+ return
+
+ self._ensure_alloced(buf._length)
+ memcpy(self._buf + self._length,
+ <void*>buf._buf,
+ <size_t>buf._length)
+ self._length += buf._length
+
+ cdef write_byte(self, char b):
+ self._check_readonly()
+
+ self._ensure_alloced(1)
+ self._buf[self._length] = b
+ self._length += 1
+
+ cdef write_bytes(self, bytes data):
+ cdef char* buf
+ cdef ssize_t len
+
+ cpython.PyBytes_AsStringAndSize(data, &buf, &len)
+ self.write_cstr(buf, len)
+
+ cdef write_bytestring(self, bytes string):
+ cdef char* buf
+ cdef ssize_t len
+
+ cpython.PyBytes_AsStringAndSize(string, &buf, &len)
+ # PyBytes_AsStringAndSize returns a null-terminated buffer,
+ # but the null byte is not counted in len. hence the + 1
+ self.write_cstr(buf, len + 1)
+
+ cdef write_str(self, str string, str encoding):
+ self.write_bytestring(string.encode(encoding))
+
+ cdef write_len_prefixed_buffer(self, WriteBuffer buf):
+ # Write a length-prefixed (not NULL-terminated) bytes sequence.
+ self.write_int32(<int32_t>buf.len())
+ self.write_buffer(buf)
+
+ cdef write_len_prefixed_bytes(self, bytes data):
+ # Write a length-prefixed (not NULL-terminated) bytes sequence.
+ cdef:
+ char *buf
+ ssize_t size
+
+ cpython.PyBytes_AsStringAndSize(data, &buf, &size)
+ if size > _MAXINT32:
+ raise BufferError('string is too large')
+ # `size` does not account for the NULL at the end.
+ self.write_int32(<int32_t>size)
+ self.write_cstr(buf, size)
+
+ cdef write_frbuf(self, FRBuffer *buf):
+ cdef:
+ ssize_t buf_len = buf.len
+ if buf_len > 0:
+ self.write_cstr(frb_read_all(buf), buf_len)
+
+ cdef write_cstr(self, const char *data, ssize_t len):
+ self._check_readonly()
+ self._ensure_alloced(len)
+
+ memcpy(self._buf + self._length, <void*>data, <size_t>len)
+ self._length += len
+
+ cdef write_int16(self, int16_t i):
+ self._check_readonly()
+ self._ensure_alloced(2)
+
+ hton.pack_int16(&self._buf[self._length], i)
+ self._length += 2
+
+ cdef write_int32(self, int32_t i):
+ self._check_readonly()
+ self._ensure_alloced(4)
+
+ hton.pack_int32(&self._buf[self._length], i)
+ self._length += 4
+
+ cdef write_int64(self, int64_t i):
+ self._check_readonly()
+ self._ensure_alloced(8)
+
+ hton.pack_int64(&self._buf[self._length], i)
+ self._length += 8
+
+ cdef write_float(self, float f):
+ self._check_readonly()
+ self._ensure_alloced(4)
+
+ hton.pack_float(&self._buf[self._length], f)
+ self._length += 4
+
+ cdef write_double(self, double d):
+ self._check_readonly()
+ self._ensure_alloced(8)
+
+ hton.pack_double(&self._buf[self._length], d)
+ self._length += 8
+
+ @staticmethod
+ cdef WriteBuffer new_message(char type):
+ cdef WriteBuffer buf
+ buf = WriteBuffer.__new__(WriteBuffer)
+ buf.start_message(type)
+ return buf
+
+ @staticmethod
+ cdef WriteBuffer new():
+ cdef WriteBuffer buf
+ buf = WriteBuffer.__new__(WriteBuffer)
+ return buf
+
+
+@cython.no_gc_clear
+@cython.final
+@cython.freelist(_BUFFER_FREELIST_SIZE)
+cdef class ReadBuffer:
+
+ def __cinit__(self):
+ self._bufs = collections.deque()
+ self._bufs_append = self._bufs.append
+ self._bufs_popleft = self._bufs.popleft
+ self._bufs_len = 0
+ self._buf0 = None
+ self._buf0_prev = None
+ self._pos0 = 0
+ self._len0 = 0
+ self._length = 0
+
+ self._current_message_type = 0
+ self._current_message_len = 0
+ self._current_message_len_unread = 0
+ self._current_message_ready = 0
+
+ cdef feed_data(self, data):
+ cdef:
+ ssize_t dlen
+ bytes data_bytes
+
+ if not cpython.PyBytes_CheckExact(data):
+ if cpythonx.PyByteArray_CheckExact(data):
+ # ProactorEventLoop in Python 3.10+ seems to be sending
+ # bytearray objects instead of bytes. Handle this here
+ # to avoid duplicating this check in every data_received().
+ data = bytes(data)
+ else:
+ raise BufferError(
+ 'feed_data: a bytes or bytearray object expected')
+
+ # Uncomment the below code to test code paths that
+ # read single int/str/bytes sequences are split over
+ # multiple received buffers.
+ #
+ # ll = 107
+ # if len(data) > ll:
+ # self.feed_data(data[:ll])
+ # self.feed_data(data[ll:])
+ # return
+
+ data_bytes = <bytes>data
+
+ dlen = cpython.Py_SIZE(data_bytes)
+ if dlen == 0:
+ # EOF?
+ return
+
+ self._bufs_append(data_bytes)
+ self._length += dlen
+
+ if self._bufs_len == 0:
+ # First buffer
+ self._len0 = dlen
+ self._buf0 = data_bytes
+
+ self._bufs_len += 1
+
+ cdef inline _ensure_first_buf(self):
+ if PG_DEBUG:
+ if self._len0 == 0:
+ raise BufferError('empty first buffer')
+ if self._length == 0:
+ raise BufferError('empty buffer')
+
+ if self._pos0 == self._len0:
+ self._switch_to_next_buf()
+
+ cdef _switch_to_next_buf(self):
+ # The first buffer is fully read, discard it
+ self._bufs_popleft()
+ self._bufs_len -= 1
+
+ # Shouldn't fail, since we've checked that `_length >= 1`
+ # in _ensure_first_buf()
+ self._buf0_prev = self._buf0
+ self._buf0 = <bytes>self._bufs[0]
+
+ self._pos0 = 0
+ self._len0 = len(self._buf0)
+
+ if PG_DEBUG:
+ if self._len0 < 1:
+ raise BufferError(
+ 'debug: second buffer of ReadBuffer is empty')
+
+ cdef inline const char* _try_read_bytes(self, ssize_t nbytes):
+ # Try to read *nbytes* from the first buffer.
+ #
+ # Returns pointer to data if there is at least *nbytes*
+ # in the buffer, NULL otherwise.
+ #
+ # Important: caller must call _ensure_first_buf() prior
+ # to calling try_read_bytes, and must not overread
+
+ cdef:
+ const char *result
+
+ if PG_DEBUG:
+ if nbytes > self._length:
+ return NULL
+
+ if self._current_message_ready:
+ if self._current_message_len_unread < nbytes:
+ return NULL
+
+ if self._pos0 + nbytes <= self._len0:
+ result = cpython.PyBytes_AS_STRING(self._buf0)
+ result += self._pos0
+ self._pos0 += nbytes
+ self._length -= nbytes
+ if self._current_message_ready:
+ self._current_message_len_unread -= nbytes
+ return result
+ else:
+ return NULL
+
+ cdef inline _read_into(self, char *buf, ssize_t nbytes):
+ cdef:
+ ssize_t nread
+ char *buf0
+
+ while True:
+ buf0 = cpython.PyBytes_AS_STRING(self._buf0)
+
+ if self._pos0 + nbytes > self._len0:
+ nread = self._len0 - self._pos0
+ memcpy(buf, buf0 + self._pos0, <size_t>nread)
+ self._pos0 = self._len0
+ self._length -= nread
+ nbytes -= nread
+ buf += nread
+ self._ensure_first_buf()
+
+ else:
+ memcpy(buf, buf0 + self._pos0, <size_t>nbytes)
+ self._pos0 += nbytes
+ self._length -= nbytes
+ break
+
+ cdef inline _read_and_discard(self, ssize_t nbytes):
+ cdef:
+ ssize_t nread
+
+ self._ensure_first_buf()
+ while True:
+ if self._pos0 + nbytes > self._len0:
+ nread = self._len0 - self._pos0
+ self._pos0 = self._len0
+ self._length -= nread
+ nbytes -= nread
+ self._ensure_first_buf()
+
+ else:
+ self._pos0 += nbytes
+ self._length -= nbytes
+ break
+
+ cdef bytes read_bytes(self, ssize_t nbytes):
+ cdef:
+ bytes result
+ ssize_t nread
+ const char *cbuf
+ char *buf
+
+ self._ensure_first_buf()
+ cbuf = self._try_read_bytes(nbytes)
+ if cbuf != NULL:
+ return cpython.PyBytes_FromStringAndSize(cbuf, nbytes)
+
+ if nbytes > self._length:
+ raise BufferError(
+ 'not enough data to read {} bytes'.format(nbytes))
+
+ if self._current_message_ready:
+ self._current_message_len_unread -= nbytes
+ if self._current_message_len_unread < 0:
+ raise BufferError('buffer overread')
+
+ result = cpython.PyBytes_FromStringAndSize(NULL, nbytes)
+ buf = cpython.PyBytes_AS_STRING(result)
+ self._read_into(buf, nbytes)
+ return result
+
+ cdef bytes read_len_prefixed_bytes(self):
+ cdef int32_t size = self.read_int32()
+ if size < 0:
+ raise BufferError(
+ 'negative length for a len-prefixed bytes value')
+ if size == 0:
+ return b''
+ return self.read_bytes(size)
+
+ cdef str read_len_prefixed_utf8(self):
+ cdef:
+ int32_t size
+ const char *cbuf
+
+ size = self.read_int32()
+ if size < 0:
+ raise BufferError(
+ 'negative length for a len-prefixed bytes value')
+
+ if size == 0:
+ return ''
+
+ self._ensure_first_buf()
+ cbuf = self._try_read_bytes(size)
+ if cbuf != NULL:
+ return cpython.PyUnicode_DecodeUTF8(cbuf, size, NULL)
+ else:
+ return self.read_bytes(size).decode('utf-8')
+
+ cdef read_uuid(self):
+ cdef:
+ bytes mem
+ const char *cbuf
+
+ self._ensure_first_buf()
+ cbuf = self._try_read_bytes(16)
+ if cbuf != NULL:
+ return pg_uuid_from_buf(cbuf)
+ else:
+ return pg_UUID(self.read_bytes(16))
+
+ cdef inline char read_byte(self) except? -1:
+ cdef const char *first_byte
+
+ if PG_DEBUG:
+ if not self._buf0:
+ raise BufferError(
+ 'debug: first buffer of ReadBuffer is empty')
+
+ self._ensure_first_buf()
+ first_byte = self._try_read_bytes(1)
+ if first_byte is NULL:
+ raise BufferError('not enough data to read one byte')
+
+ return first_byte[0]
+
+ cdef inline int64_t read_int64(self) except? -1:
+ cdef:
+ bytes mem
+ const char *cbuf
+
+ self._ensure_first_buf()
+ cbuf = self._try_read_bytes(8)
+ if cbuf != NULL:
+ return hton.unpack_int64(cbuf)
+ else:
+ mem = self.read_bytes(8)
+ return hton.unpack_int64(cpython.PyBytes_AS_STRING(mem))
+
+ cdef inline int32_t read_int32(self) except? -1:
+ cdef:
+ bytes mem
+ const char *cbuf
+
+ self._ensure_first_buf()
+ cbuf = self._try_read_bytes(4)
+ if cbuf != NULL:
+ return hton.unpack_int32(cbuf)
+ else:
+ mem = self.read_bytes(4)
+ return hton.unpack_int32(cpython.PyBytes_AS_STRING(mem))
+
+ cdef inline int16_t read_int16(self) except? -1:
+ cdef:
+ bytes mem
+ const char *cbuf
+
+ self._ensure_first_buf()
+ cbuf = self._try_read_bytes(2)
+ if cbuf != NULL:
+ return hton.unpack_int16(cbuf)
+ else:
+ mem = self.read_bytes(2)
+ return hton.unpack_int16(cpython.PyBytes_AS_STRING(mem))
+
+ cdef inline read_null_str(self):
+ if not self._current_message_ready:
+ raise BufferError(
+ 'read_null_str only works when the message guaranteed '
+ 'to be in the buffer')
+
+ cdef:
+ ssize_t pos
+ ssize_t nread
+ bytes result
+ const char *buf
+ const char *buf_start
+
+ self._ensure_first_buf()
+
+ buf_start = cpython.PyBytes_AS_STRING(self._buf0)
+ buf = buf_start + self._pos0
+ while buf - buf_start < self._len0:
+ if buf[0] == 0:
+ pos = buf - buf_start
+ nread = pos - self._pos0
+ buf = self._try_read_bytes(nread + 1)
+ if buf != NULL:
+ return cpython.PyBytes_FromStringAndSize(buf, nread)
+ else:
+ break
+ else:
+ buf += 1
+
+ result = b''
+ while True:
+ pos = self._buf0.find(b'\x00', self._pos0)
+ if pos >= 0:
+ result += self._buf0[self._pos0 : pos]
+ nread = pos - self._pos0 + 1
+ self._pos0 = pos + 1
+ self._length -= nread
+
+ self._current_message_len_unread -= nread
+ if self._current_message_len_unread < 0:
+ raise BufferError(
+ 'read_null_str: buffer overread')
+
+ return result
+
+ else:
+ result += self._buf0[self._pos0:]
+ nread = self._len0 - self._pos0
+ self._pos0 = self._len0
+ self._length -= nread
+
+ self._current_message_len_unread -= nread
+ if self._current_message_len_unread < 0:
+ raise BufferError(
+ 'read_null_str: buffer overread')
+
+ self._ensure_first_buf()
+
+ cdef int32_t take_message(self) except -1:
+ cdef:
+ const char *cbuf
+
+ if self._current_message_ready:
+ return 1
+
+ if self._current_message_type == 0:
+ if self._length < 1:
+ return 0
+ self._ensure_first_buf()
+ cbuf = self._try_read_bytes(1)
+ if cbuf == NULL:
+ raise BufferError(
+ 'failed to read one byte on a non-empty buffer')
+ self._current_message_type = cbuf[0]
+
+ if self._current_message_len == 0:
+ if self._length < 4:
+ return 0
+
+ self._ensure_first_buf()
+ cbuf = self._try_read_bytes(4)
+ if cbuf != NULL:
+ self._current_message_len = hton.unpack_int32(cbuf)
+ else:
+ self._current_message_len = self.read_int32()
+
+ self._current_message_len_unread = self._current_message_len - 4
+
+ if self._length < self._current_message_len_unread:
+ return 0
+
+ self._current_message_ready = 1
+ return 1
+
+ cdef inline int32_t take_message_type(self, char mtype) except -1:
+ cdef const char *buf0
+
+ if self._current_message_ready:
+ return self._current_message_type == mtype
+ elif self._length >= 1:
+ self._ensure_first_buf()
+ buf0 = cpython.PyBytes_AS_STRING(self._buf0)
+
+ return buf0[self._pos0] == mtype and self.take_message()
+ else:
+ return 0
+
+ cdef int32_t put_message(self) except -1:
+ if not self._current_message_ready:
+ raise BufferError(
+ 'cannot put message: no message taken')
+ self._current_message_ready = False
+ return 0
+
+ cdef inline const char* try_consume_message(self, ssize_t* len):
+ cdef:
+ ssize_t buf_len
+ const char *buf
+
+ if not self._current_message_ready:
+ return NULL
+
+ self._ensure_first_buf()
+ buf_len = self._current_message_len_unread
+ buf = self._try_read_bytes(buf_len)
+ if buf != NULL:
+ len[0] = buf_len
+ self._finish_message()
+ return buf
+
+ cdef discard_message(self):
+ if not self._current_message_ready:
+ raise BufferError('no message to discard')
+ if self._current_message_len_unread > 0:
+ self._read_and_discard(self._current_message_len_unread)
+ self._current_message_len_unread = 0
+ self._finish_message()
+
+ cdef bytes consume_message(self):
+ if not self._current_message_ready:
+ raise BufferError('no message to consume')
+ if self._current_message_len_unread > 0:
+ mem = self.read_bytes(self._current_message_len_unread)
+ else:
+ mem = b''
+ self._finish_message()
+ return mem
+
+ cdef redirect_messages(self, WriteBuffer buf, char mtype,
+ int stop_at=0):
+ if not self._current_message_ready:
+ raise BufferError(
+ 'consume_full_messages called on a buffer without a '
+ 'complete first message')
+ if mtype != self._current_message_type:
+ raise BufferError(
+ 'consume_full_messages called with a wrong mtype')
+ if self._current_message_len_unread != self._current_message_len - 4:
+ raise BufferError(
+ 'consume_full_messages called on a partially read message')
+
+ cdef:
+ const char* cbuf
+ ssize_t cbuf_len
+ int32_t msg_len
+ ssize_t new_pos0
+ ssize_t pos_delta
+ int32_t done
+
+ while True:
+ buf.write_byte(mtype)
+ buf.write_int32(self._current_message_len)
+
+ cbuf = self.try_consume_message(&cbuf_len)
+ if cbuf != NULL:
+ buf.write_cstr(cbuf, cbuf_len)
+ else:
+ buf.write_bytes(self.consume_message())
+
+ if self._length > 0:
+ self._ensure_first_buf()
+ else:
+ return
+
+ if stop_at and buf._length >= stop_at:
+ return
+
+ # Fast path: exhaust buf0 as efficiently as possible.
+ if self._pos0 + 5 <= self._len0:
+ cbuf = cpython.PyBytes_AS_STRING(self._buf0)
+ new_pos0 = self._pos0
+ cbuf_len = self._len0
+
+ done = 0
+ # Scan the first buffer and find the position of the
+ # end of the last "mtype" message.
+ while new_pos0 + 5 <= cbuf_len:
+ if (cbuf + new_pos0)[0] != mtype:
+ done = 1
+ break
+ if (stop_at and
+ (buf._length + new_pos0 - self._pos0) > stop_at):
+ done = 1
+ break
+ msg_len = hton.unpack_int32(cbuf + new_pos0 + 1) + 1
+ if new_pos0 + msg_len > cbuf_len:
+ break
+ new_pos0 += msg_len
+
+ if new_pos0 != self._pos0:
+ assert self._pos0 < new_pos0 <= self._len0
+
+ pos_delta = new_pos0 - self._pos0
+ buf.write_cstr(
+ cbuf + self._pos0,
+ pos_delta)
+
+ self._pos0 = new_pos0
+ self._length -= pos_delta
+
+ assert self._length >= 0
+
+ if done:
+ # The next message is of a different type.
+ return
+
+ # Back to slow path.
+ if not self.take_message_type(mtype):
+ return
+
+ cdef bytearray consume_messages(self, char mtype):
+ """Consume consecutive messages of the same type."""
+ cdef:
+ char *buf
+ ssize_t nbytes
+ ssize_t total_bytes = 0
+ bytearray result
+
+ if not self.take_message_type(mtype):
+ return None
+
+ # consume_messages is a volume-oriented method, so
+ # we assume that the remainder of the buffer will contain
+ # messages of the requested type.
+ result = cpythonx.PyByteArray_FromStringAndSize(NULL, self._length)
+ buf = cpythonx.PyByteArray_AsString(result)
+
+ while self.take_message_type(mtype):
+ self._ensure_first_buf()
+ nbytes = self._current_message_len_unread
+ self._read_into(buf, nbytes)
+ buf += nbytes
+ total_bytes += nbytes
+ self._finish_message()
+
+ # Clamp the result to an actual size read.
+ cpythonx.PyByteArray_Resize(result, total_bytes)
+
+ return result
+
+ cdef finish_message(self):
+ if self._current_message_type == 0 or not self._current_message_ready:
+ # The message has already been finished (e.g by consume_message()),
+ # or has been put back by put_message().
+ return
+
+ if self._current_message_len_unread:
+ if PG_DEBUG:
+ mtype = chr(self._current_message_type)
+
+ discarded = self.consume_message()
+
+ if PG_DEBUG:
+ print('!!! discarding message {!r} unread data: {!r}'.format(
+ mtype,
+ discarded))
+
+ self._finish_message()
+
+ cdef inline _finish_message(self):
+ self._current_message_type = 0
+ self._current_message_len = 0
+ self._current_message_ready = 0
+ self._current_message_len_unread = 0
+
+ @staticmethod
+ cdef ReadBuffer new_message_parser(object data):
+ cdef ReadBuffer buf
+
+ buf = ReadBuffer.__new__(ReadBuffer)
+ buf.feed_data(data)
+
+ buf._current_message_ready = 1
+ buf._current_message_len_unread = buf._len0
+
+ return buf