about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/asyncpg/pgproto/codecs/tid.pyx
blob: b39bddc432ad3728bdb933b466c18f15d70e7338 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# Copyright (C) 2016-present the asyncpg authors and contributors
# <see AUTHORS file>
#
# This module is part of asyncpg and is released under
# the Apache 2.0 License: http://www.apache.org/licenses/LICENSE-2.0


cdef tid_encode(CodecContext settings, WriteBuffer buf, obj):
    # Serialize a two-element list/tuple ``(block, offset)`` into ``buf``.
    #
    # Raises:
    #   TypeError     -- ``obj`` is neither a tuple nor a list.
    #   ValueError    -- ``obj`` does not contain exactly two elements.
    #   OverflowError -- the block does not fit in uint32, or the offset
    #                    does not fit in uint16.
    cdef:
        bint out_of_range = False
        unsigned long block_no
        unsigned long item_no

    if not cpython.PyTuple_Check(obj) and not cpython.PyList_Check(obj):
        raise TypeError(
            'list or tuple expected (got type {})'.format(type(obj)))

    if len(obj) != 2:
        raise ValueError(
            'invalid number of elements in tid tuple, expecting 2')

    try:
        block_no = cpython.PyLong_AsUnsignedLong(obj[0])
    except OverflowError:
        out_of_range = True
    else:
        # Where "unsigned long" is wider than 32 bits (e.g. x86_64, where
        # "long" and "long long" share a size) the conversion above accepts
        # values beyond uint32, so the upper bound is checked by hand.
        out_of_range = sizeof(block_no) > 4 and block_no > UINT32_MAX

    if out_of_range:
        raise OverflowError('tuple id block value out of uint32 range')

    try:
        item_no = cpython.PyLong_AsUnsignedLong(obj[1])
    except OverflowError:
        out_of_range = True
    else:
        out_of_range = item_no > 65535

    if out_of_range:
        raise OverflowError('tuple id offset value out of uint16 range')

    # 6 == 4 bytes of block + 2 bytes of offset written right after it
    # (presumably the payload length prefix).
    buf.write_int32(6)
    buf.write_int32(<int32_t>block_no)
    buf.write_int16(<int16_t>item_no)


cdef tid_decode(CodecContext settings, FRBuffer *buf):
    # Read a tid from ``buf`` and return it as a ``(block, offset)`` tuple.
    #
    # The reads happen in this order: 4 bytes reinterpreted as an unsigned
    # 32-bit block, then 2 bytes reinterpreted as an unsigned 16-bit offset.
    cdef uint32_t block_no = <uint32_t>hton.unpack_int32(frb_read(buf, 4))
    cdef uint16_t item_no = <uint16_t>hton.unpack_int16(frb_read(buf, 2))

    return (block_no, item_no)