aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/asyncpg/pgproto/codecs/pg_snapshot.pyx
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/asyncpg/pgproto/codecs/pg_snapshot.pyx')
-rw-r--r--.venv/lib/python3.12/site-packages/asyncpg/pgproto/codecs/pg_snapshot.pyx63
1 files changed, 63 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/asyncpg/pgproto/codecs/pg_snapshot.pyx b/.venv/lib/python3.12/site-packages/asyncpg/pgproto/codecs/pg_snapshot.pyx
new file mode 100644
index 00000000..d96107cc
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/asyncpg/pgproto/codecs/pg_snapshot.pyx
@@ -0,0 +1,63 @@
+# Copyright (C) 2016-present the asyncpg authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of asyncpg and is released under
+# the Apache 2.0 License: http://www.apache.org/licenses/LICENSE-2.0
+
+
+cdef pg_snapshot_encode(CodecContext settings, WriteBuffer buf, obj):
+ cdef:
+ ssize_t nxip
+ uint64_t xmin
+ uint64_t xmax
+ int i
+ WriteBuffer xip_buf = WriteBuffer.new()
+
+ if not (cpython.PyTuple_Check(obj) or cpython.PyList_Check(obj)):
+ raise TypeError(
+ 'list or tuple expected (got type {})'.format(type(obj)))
+
+ if len(obj) != 3:
+ raise ValueError(
+ 'invalid number of elements in txid_snapshot tuple, expecting 4')
+
+ nxip = len(obj[2])
+ if nxip > _MAXINT32:
+ raise ValueError('txid_snapshot value is too long')
+
+ xmin = obj[0]
+ xmax = obj[1]
+
+ for i in range(nxip):
+ xip_buf.write_int64(
+ <int64_t>cpython.PyLong_AsUnsignedLongLong(obj[2][i]))
+
+ buf.write_int32(20 + xip_buf.len())
+
+ buf.write_int32(<int32_t>nxip)
+ buf.write_int64(<int64_t>xmin)
+ buf.write_int64(<int64_t>xmax)
+ buf.write_buffer(xip_buf)
+
+
+cdef pg_snapshot_decode(CodecContext settings, FRBuffer *buf):
+ cdef:
+ int32_t nxip
+ uint64_t xmin
+ uint64_t xmax
+ tuple xip_tup
+ int32_t i
+ object xip
+
+ nxip = hton.unpack_int32(frb_read(buf, 4))
+ xmin = <uint64_t>hton.unpack_int64(frb_read(buf, 8))
+ xmax = <uint64_t>hton.unpack_int64(frb_read(buf, 8))
+
+ xip_tup = cpython.PyTuple_New(nxip)
+ for i in range(nxip):
+ xip = cpython.PyLong_FromUnsignedLongLong(
+ <uint64_t>hton.unpack_int64(frb_read(buf, 8)))
+ cpython.Py_INCREF(xip)
+ cpython.PyTuple_SET_ITEM(xip_tup, i, xip)
+
+ return (xmin, xmax, xip_tup)