diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/asyncpg/pgproto/codecs/pg_snapshot.pyx')
-rw-r--r-- | .venv/lib/python3.12/site-packages/asyncpg/pgproto/codecs/pg_snapshot.pyx | 63 |
1 files changed, 63 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/asyncpg/pgproto/codecs/pg_snapshot.pyx b/.venv/lib/python3.12/site-packages/asyncpg/pgproto/codecs/pg_snapshot.pyx new file mode 100644 index 00000000..d96107cc --- /dev/null +++ b/.venv/lib/python3.12/site-packages/asyncpg/pgproto/codecs/pg_snapshot.pyx @@ -0,0 +1,63 @@ +# Copyright (C) 2016-present the asyncpg authors and contributors +# <see AUTHORS file> +# +# This module is part of asyncpg and is released under +# the Apache 2.0 License: http://www.apache.org/licenses/LICENSE-2.0 + + +cdef pg_snapshot_encode(CodecContext settings, WriteBuffer buf, obj): + cdef: + ssize_t nxip + uint64_t xmin + uint64_t xmax + int i + WriteBuffer xip_buf = WriteBuffer.new() + + if not (cpython.PyTuple_Check(obj) or cpython.PyList_Check(obj)): + raise TypeError( + 'list or tuple expected (got type {})'.format(type(obj))) + + if len(obj) != 3: + raise ValueError( + 'invalid number of elements in txid_snapshot tuple, expecting 4') + + nxip = len(obj[2]) + if nxip > _MAXINT32: + raise ValueError('txid_snapshot value is too long') + + xmin = obj[0] + xmax = obj[1] + + for i in range(nxip): + xip_buf.write_int64( + <int64_t>cpython.PyLong_AsUnsignedLongLong(obj[2][i])) + + buf.write_int32(20 + xip_buf.len()) + + buf.write_int32(<int32_t>nxip) + buf.write_int64(<int64_t>xmin) + buf.write_int64(<int64_t>xmax) + buf.write_buffer(xip_buf) + + +cdef pg_snapshot_decode(CodecContext settings, FRBuffer *buf): + cdef: + int32_t nxip + uint64_t xmin + uint64_t xmax + tuple xip_tup + int32_t i + object xip + + nxip = hton.unpack_int32(frb_read(buf, 4)) + xmin = <uint64_t>hton.unpack_int64(frb_read(buf, 8)) + xmax = <uint64_t>hton.unpack_int64(frb_read(buf, 8)) + + xip_tup = cpython.PyTuple_New(nxip) + for i in range(nxip): + xip = cpython.PyLong_FromUnsignedLongLong( + <uint64_t>hton.unpack_int64(frb_read(buf, 8))) + cpython.Py_INCREF(xip) + cpython.PyTuple_SET_ITEM(xip_tup, i, xip) + + return (xmin, xmax, xip_tup) |