Diffstat (limited to '.venv/lib/python3.12/site-packages/numpy/core/tests/test_simd.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/numpy/core/tests/test_simd.py | 1333 |
1 files changed, 1333 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/numpy/core/tests/test_simd.py b/.venv/lib/python3.12/site-packages/numpy/core/tests/test_simd.py
new file mode 100644
index 00000000..92b56744
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/numpy/core/tests/test_simd.py
@@ -0,0 +1,1333 @@
+# NOTE: Please avoid the use of numpy.testing since NPYV intrinsics
+# may be involved in their functionality.
+import pytest, math, re
+import itertools
+import operator
+from numpy.core._simd import targets, clear_floatstatus, get_floatstatus
+from numpy.core._multiarray_umath import __cpu_baseline__
+
+def check_floatstatus(divbyzero=False, overflow=False,
+                      underflow=False, invalid=False,
+                      all=False):
+    #define NPY_FPE_DIVIDEBYZERO 1
+    #define NPY_FPE_OVERFLOW     2
+    #define NPY_FPE_UNDERFLOW    4
+    #define NPY_FPE_INVALID      8
+    err = get_floatstatus()
+    ret = (all or divbyzero) and (err & 1) != 0
+    ret |= (all or overflow) and (err & 2) != 0
+    ret |= (all or underflow) and (err & 4) != 0
+    ret |= (all or invalid) and (err & 8) != 0
+    return ret
+
+class _Test_Utility:
+    # submodule of the desired SIMD extension, e.g. targets["AVX512F"]
+    npyv = None
+    # the current data type suffix e.g. 's8'
+    sfx = None
+    # target name can be 'baseline' or one or more of CPU features
+    target_name = None
+
+    def __getattr__(self, attr):
+        """
+        To call NPV intrinsics without the attribute 'npyv' and
+        auto suffixing intrinsics according to class attribute 'sfx'
+        """
+        return getattr(self.npyv, attr + "_" + self.sfx)
+
+    def _x2(self, intrin_name):
+        return getattr(self.npyv, f"{intrin_name}_{self.sfx}x2")
+
+    def _data(self, start=None, count=None, reverse=False):
+        """
+        Create list of consecutive numbers according to number of vector's lanes.
+        """
+        if start is None:
+            start = 1
+        if count is None:
+            count = self.nlanes
+        rng = range(start, start + count)
+        if reverse:
+            rng = reversed(rng)
+        if self._is_fp():
+            return [x / 1.0 for x in rng]
+        return list(rng)
+
+    def _is_unsigned(self):
+        return self.sfx[0] == 'u'
+
+    def _is_signed(self):
+        return self.sfx[0] == 's'
+
+    def _is_fp(self):
+        return self.sfx[0] == 'f'
+
+    def _scalar_size(self):
+        return int(self.sfx[1:])
+
+    def _int_clip(self, seq):
+        if self._is_fp():
+            return seq
+        max_int = self._int_max()
+        min_int = self._int_min()
+        return [min(max(v, min_int), max_int) for v in seq]
+
+    def _int_max(self):
+        if self._is_fp():
+            return None
+        max_u = self._to_unsigned(self.setall(-1))[0]
+        if self._is_signed():
+            return max_u // 2
+        return max_u
+
+    def _int_min(self):
+        if self._is_fp():
+            return None
+        if self._is_unsigned():
+            return 0
+        return -(self._int_max() + 1)
+
+    def _true_mask(self):
+        max_unsig = getattr(self.npyv, "setall_u" + self.sfx[1:])(-1)
+        return max_unsig[0]
+
+    def _to_unsigned(self, vector):
+        if isinstance(vector, (list, tuple)):
+            return getattr(self.npyv, "load_u" + self.sfx[1:])(vector)
+        else:
+            sfx = vector.__name__.replace("npyv_", "")
+            if sfx[0] == "b":
+                cvt_intrin = "cvt_u{0}_b{0}"
+            else:
+                cvt_intrin = "reinterpret_u{0}_{1}"
+            return getattr(self.npyv, cvt_intrin.format(sfx[1:], sfx))(vector)
+
+    def _pinfinity(self):
+        return float("inf")
+
+    def _ninfinity(self):
+        return -float("inf")
+
+    def _nan(self):
+        return float("nan")
+
+    def _cpu_features(self):
+        target = self.target_name
+        if target == "baseline":
+            target = __cpu_baseline__
+        else:
+            target = target.split('__') # multi-target separator
+        return ' '.join(target)
+
+class _SIMD_BOOL(_Test_Utility):
+    """
+    To test all boolean vector types at once
+    """
+    def _nlanes(self):
+        return getattr(self.npyv, "nlanes_u" + self.sfx[1:])
+
+    def _data(self, start=None, count=None, reverse=False):
+        true_mask = self._true_mask()
+        rng = range(self._nlanes())
+        if reverse:
+            rng = reversed(rng)
+        return [true_mask if x % 2 else 0 for x in rng]
+
+    def _load_b(self, data):
+        len_str = self.sfx[1:]
+        load = getattr(self.npyv, "load_u" + len_str)
+        cvt = getattr(self.npyv, f"cvt_b{len_str}_u{len_str}")
+        return cvt(load(data))
+
+    def test_operators_logical(self):
+        """
+        Logical operations for boolean types.
+        Test intrinsics:
+            npyv_xor_##SFX, npyv_and_##SFX, npyv_or_##SFX, npyv_not_##SFX,
+            npyv_andc_b8, npvy_orc_b8, nvpy_xnor_b8
+        """
+        data_a = self._data()
+        data_b = self._data(reverse=True)
+        vdata_a = self._load_b(data_a)
+        vdata_b = self._load_b(data_b)
+
+        data_and = [a & b for a, b in zip(data_a, data_b)]
+        vand = getattr(self, "and")(vdata_a, vdata_b)
+        assert vand == data_and
+
+        data_or = [a | b for a, b in zip(data_a, data_b)]
+        vor = getattr(self, "or")(vdata_a, vdata_b)
+        assert vor == data_or
+
+        data_xor = [a ^ b for a, b in zip(data_a, data_b)]
+        vxor = getattr(self, "xor")(vdata_a, vdata_b)
+        assert vxor == data_xor
+
+        vnot = getattr(self, "not")(vdata_a)
+        assert vnot == data_b
+
+        # among the boolean types, andc, orc and xnor only support b8
+        if self.sfx not in ("b8"):
+            return
+
+        data_andc = [(a & ~b) & 0xFF for a, b in zip(data_a, data_b)]
+        vandc = getattr(self, "andc")(vdata_a, vdata_b)
+        assert data_andc == vandc
+
+        data_orc = [(a | ~b) & 0xFF for a, b in zip(data_a, data_b)]
+        vorc = getattr(self, "orc")(vdata_a, vdata_b)
+        assert data_orc == vorc
+
+        data_xnor = [~(a ^ b) & 0xFF for a, b in zip(data_a, data_b)]
+        vxnor = getattr(self, "xnor")(vdata_a, vdata_b)
+        assert data_xnor == vxnor
+
+    def test_tobits(self):
+        data2bits = lambda data: sum([int(x != 0) << i for i, x in enumerate(data, 0)])
+        for data in (self._data(), self._data(reverse=True)):
+            vdata = self._load_b(data)
+            data_bits = data2bits(data)
+            tobits = self.tobits(vdata)
+            bin_tobits = bin(tobits)
+            assert bin_tobits == bin(data_bits)
+
+    def test_pack(self):
+        """
+        Pack multiple vectors into one
+        Test intrinsics:
+            npyv_pack_b8_b16
+            npyv_pack_b8_b32
+            npyv_pack_b8_b64
+        """
+        if self.sfx not in ("b16", "b32", "b64"):
+            return
+        # create the vectors
+        data = self._data()
+        rdata = self._data(reverse=True)
+        vdata = self._load_b(data)
+        vrdata = self._load_b(rdata)
+        pack_simd = getattr(self.npyv, f"pack_b8_{self.sfx}")
+        # for scalar execution, concatenate the elements of the multiple lists
+        # into a single list (spack) and then iterate over the elements of
+        # the created list applying a mask to capture the first byte of them.
+        if self.sfx == "b16":
+            spack = [(i & 0xFF) for i in (list(rdata) + list(data))]
+            vpack = pack_simd(vrdata, vdata)
+        elif self.sfx == "b32":
+            spack = [(i & 0xFF) for i in (2*list(rdata) + 2*list(data))]
+            vpack = pack_simd(vrdata, vrdata, vdata, vdata)
+        elif self.sfx == "b64":
+            spack = [(i & 0xFF) for i in (4*list(rdata) + 4*list(data))]
+            vpack = pack_simd(vrdata, vrdata, vrdata, vrdata,
+                              vdata, vdata, vdata, vdata)
+        assert vpack == spack
+
+    @pytest.mark.parametrize("intrin", ["any", "all"])
+    @pytest.mark.parametrize("data", (
+        [-1, 0],
+        [0, -1],
+        [-1],
+        [0]
+    ))
+    def test_operators_crosstest(self, intrin, data):
+        """
+        Test intrinsics:
+            npyv_any_##SFX
+            npyv_all_##SFX
+        """
+        data_a = self._load_b(data * self._nlanes())
+        func = eval(intrin)
+        intrin = getattr(self, intrin)
+        desired = func(data_a)
+        simd = intrin(data_a)
+        assert not not simd == desired
+
+class _SIMD_INT(_Test_Utility):
+    """
+    To test all integer vector types at once
+    """
+    def test_operators_shift(self):
+        if self.sfx in ("u8", "s8"):
+            return
+
+        data_a = self._data(self._int_max() - self.nlanes)
+        data_b = self._data(self._int_min(), reverse=True)
+        vdata_a, vdata_b = self.load(data_a), self.load(data_b)
+
+        for count in range(self._scalar_size()):
+            # load to cast
+            data_shl_a = self.load([a << count for a in data_a])
+            # left shift
+            shl = self.shl(vdata_a, count)
+            assert shl == data_shl_a
+            # load to cast
+            data_shr_a = self.load([a >> count for a in data_a])
+            # right shift
+            shr = self.shr(vdata_a, count)
+            assert shr == data_shr_a
+
+        # shift by zero or max or out-range immediate constant is not applicable and illogical
+        for count in range(1, self._scalar_size()):
+            # load to cast
+            data_shl_a = self.load([a << count for a in data_a])
+            # left shift by an immediate constant
+            shli = self.shli(vdata_a, count)
+            assert shli == data_shl_a
+            # load to cast
+            data_shr_a = self.load([a >> count for a in data_a])
+            # right shift by an immediate constant
+            shri = self.shri(vdata_a, count)
+            assert shri == data_shr_a
+
+    def test_arithmetic_subadd_saturated(self):
+        if self.sfx in ("u32", "s32", "u64", "s64"):
+            return
+
+        data_a = self._data(self._int_max() - self.nlanes)
+        data_b = self._data(self._int_min(), reverse=True)
+        vdata_a, vdata_b = self.load(data_a), self.load(data_b)
+
+        data_adds = self._int_clip([a + b for a, b in zip(data_a, data_b)])
+        adds = self.adds(vdata_a, vdata_b)
+        assert adds == data_adds
+
+        data_subs = self._int_clip([a - b for a, b in zip(data_a, data_b)])
+        subs = self.subs(vdata_a, vdata_b)
+        assert subs == data_subs
+
+    def test_math_max_min(self):
+        data_a = self._data()
+        data_b = self._data(self.nlanes)
+        vdata_a, vdata_b = self.load(data_a), self.load(data_b)
+
+        data_max = [max(a, b) for a, b in zip(data_a, data_b)]
+        simd_max = self.max(vdata_a, vdata_b)
+        assert simd_max == data_max
+
+        data_min = [min(a, b) for a, b in zip(data_a, data_b)]
+        simd_min = self.min(vdata_a, vdata_b)
+        assert simd_min == data_min
+
+    @pytest.mark.parametrize("start", [-100, -10000, 0, 100, 10000])
+    def test_reduce_max_min(self, start):
+        """
+        Test intrinsics:
+            npyv_reduce_max_##sfx
+            npyv_reduce_min_##sfx
+        """
+        vdata_a = self.load(self._data(start))
+        assert self.reduce_max(vdata_a) == max(vdata_a)
+        assert self.reduce_min(vdata_a) == min(vdata_a)
+
+
+class _SIMD_FP32(_Test_Utility):
+    """
+    To only test single precision
+    """
+    def test_conversions(self):
+        """
+        Round to nearest even integer, assume CPU control register is set to rounding.
+        Test intrinsics:
+            npyv_round_s32_##SFX
+        """
+        features = self._cpu_features()
+        if not self.npyv.simd_f64 and re.match(r".*(NEON|ASIMD)", features):
+            # very costly to emulate nearest even on Armv7
+            # instead we round halves to up. e.g. 0.5 -> 1, -0.5 -> -1
+            _round = lambda v: int(v + (0.5 if v >= 0 else -0.5))
+        else:
+            _round = round
+        vdata_a = self.load(self._data())
+        vdata_a = self.sub(vdata_a, self.setall(0.5))
+        data_round = [_round(x) for x in vdata_a]
+        vround = self.round_s32(vdata_a)
+        assert vround == data_round
+
+class _SIMD_FP64(_Test_Utility):
+    """
+    To only test double precision
+    """
+    def test_conversions(self):
+        """
+        Round to nearest even integer, assume CPU control register is set to rounding.
+        Test intrinsics:
+            npyv_round_s32_##SFX
+        """
+        vdata_a = self.load(self._data())
+        vdata_a = self.sub(vdata_a, self.setall(0.5))
+        vdata_b = self.mul(vdata_a, self.setall(-1.5))
+        data_round = [round(x) for x in list(vdata_a) + list(vdata_b)]
+        vround = self.round_s32(vdata_a, vdata_b)
+        assert vround == data_round
+
+class _SIMD_FP(_Test_Utility):
+    """
+    To test all float vector types at once
+    """
+    def test_arithmetic_fused(self):
+        vdata_a, vdata_b, vdata_c = [self.load(self._data())]*3
+        vdata_cx2 = self.add(vdata_c, vdata_c)
+        # multiply and add, a*b + c
+        data_fma = self.load([a * b + c for a, b, c in zip(vdata_a, vdata_b, vdata_c)])
+        fma = self.muladd(vdata_a, vdata_b, vdata_c)
+        assert fma == data_fma
+        # multiply and subtract, a*b - c
+        fms = self.mulsub(vdata_a, vdata_b, vdata_c)
+        data_fms = self.sub(data_fma, vdata_cx2)
+        assert fms == data_fms
+        # negate multiply and add, -(a*b) + c
+        nfma = self.nmuladd(vdata_a, vdata_b, vdata_c)
+        data_nfma = self.sub(vdata_cx2, data_fma)
+        assert nfma == data_nfma
+        # negate multiply and subtract, -(a*b) - c
+        nfms = self.nmulsub(vdata_a, vdata_b, vdata_c)
+        data_nfms = self.mul(data_fma, self.setall(-1))
+        assert nfms == data_nfms
+        # multiply, add for odd elements and subtract even elements.
+        # (a * b) -+ c
+        fmas = list(self.muladdsub(vdata_a, vdata_b, vdata_c))
+        assert fmas[0::2] == list(data_fms)[0::2]
+        assert fmas[1::2] == list(data_fma)[1::2]
+
+    def test_abs(self):
+        pinf, ninf, nan = self._pinfinity(), self._ninfinity(), self._nan()
+        data = self._data()
+        vdata = self.load(self._data())
+
+        abs_cases = ((-0, 0), (ninf, pinf), (pinf, pinf), (nan, nan))
+        for case, desired in abs_cases:
+            data_abs = [desired]*self.nlanes
+            vabs = self.abs(self.setall(case))
+            assert vabs == pytest.approx(data_abs, nan_ok=True)
+
+        vabs = self.abs(self.mul(vdata, self.setall(-1)))
+        assert vabs == data
+
+    def test_sqrt(self):
+        pinf, ninf, nan = self._pinfinity(), self._ninfinity(), self._nan()
+        data = self._data()
+        vdata = self.load(self._data())
+
+        sqrt_cases = ((-0.0, -0.0), (0.0, 0.0), (-1.0, nan), (ninf, nan), (pinf, pinf))
+        for case, desired in sqrt_cases:
+            data_sqrt = [desired]*self.nlanes
+            sqrt = self.sqrt(self.setall(case))
+            assert sqrt == pytest.approx(data_sqrt, nan_ok=True)
+
+        data_sqrt = self.load([math.sqrt(x) for x in data]) # load to truncate precision
+        sqrt = self.sqrt(vdata)
+        assert sqrt == data_sqrt
+
+    def test_square(self):
+        pinf, ninf, nan = self._pinfinity(), self._ninfinity(), self._nan()
+        data = self._data()
+        vdata = self.load(self._data())
+        # square
+        square_cases = ((nan, nan), (pinf, pinf), (ninf, pinf))
+        for case, desired in square_cases:
+            data_square = [desired]*self.nlanes
+            square = self.square(self.setall(case))
+            assert square == pytest.approx(data_square, nan_ok=True)
+
+        data_square = [x*x for x in data]
+        square = self.square(vdata)
+        assert square == data_square
+
+    @pytest.mark.parametrize("intrin, func", [("ceil", math.ceil),
+        ("trunc", math.trunc), ("floor", math.floor), ("rint", round)])
+    def test_rounding(self, intrin, func):
+        """
+        Test intrinsics:
+            npyv_rint_##SFX
+            npyv_ceil_##SFX
+            npyv_trunc_##SFX
+            npyv_floor##SFX
+        """
+        intrin_name = intrin
+        intrin = getattr(self, intrin)
+        pinf, ninf, nan = self._pinfinity(), self._ninfinity(), self._nan()
+        # special cases
+        round_cases = ((nan, nan), (pinf, pinf), (ninf, ninf))
+        for case, desired in round_cases:
+            data_round = [desired]*self.nlanes
+            _round = intrin(self.setall(case))
+            assert _round == pytest.approx(data_round, nan_ok=True)
+
+        for x in range(0, 2**20, 256**2):
+            for w in (-1.05, -1.10, -1.15, 1.05, 1.10, 1.15):
+                data = self.load([(x+a)*w for a in range(self.nlanes)])
+                data_round = [func(x) for x in data]
+                _round = intrin(data)
+                assert _round == data_round
+
+        # test large numbers
+        for i in (
+            1.1529215045988576e+18, 4.6116860183954304e+18,
+            5.902958103546122e+20, 2.3611832414184488e+21
+        ):
+            x = self.setall(i)
+            y = intrin(x)
+            data_round = [func(n) for n in x]
+            assert y == data_round
+
+        # signed zero
+        if intrin_name == "floor":
+            data_szero = (-0.0,)
+        else:
+            data_szero = (-0.0, -0.25, -0.30, -0.45, -0.5)
+
+        for w in data_szero:
+            _round = self._to_unsigned(intrin(self.setall(w)))
+            data_round = self._to_unsigned(self.setall(-0.0))
+            assert _round == data_round
+
+    @pytest.mark.parametrize("intrin", [
+        "max", "maxp", "maxn", "min", "minp", "minn"
+    ])
+    def test_max_min(self, intrin):
+        """
+        Test intrinsics:
+            npyv_max_##sfx
+            npyv_maxp_##sfx
+            npyv_maxn_##sfx
+            npyv_min_##sfx
+            npyv_minp_##sfx
+            npyv_minn_##sfx
+            npyv_reduce_max_##sfx
+            npyv_reduce_maxp_##sfx
+            npyv_reduce_maxn_##sfx
+            npyv_reduce_min_##sfx
+            npyv_reduce_minp_##sfx
+            npyv_reduce_minn_##sfx
+        """
+        pinf, ninf, nan = self._pinfinity(), self._ninfinity(), self._nan()
+        chk_nan = {"xp": 1, "np": 1, "nn": 2, "xn": 2}.get(intrin[-2:], 0)
+        func = eval(intrin[:3])
+        reduce_intrin = getattr(self, "reduce_" + intrin)
+        intrin = getattr(self, intrin)
+        hf_nlanes = self.nlanes//2
+
+        cases = (
+            ([0.0, -0.0], [-0.0, 0.0]),
+            ([10, -10], [10, -10]),
+            ([pinf, 10], [10, ninf]),
+            ([10, pinf], [ninf, 10]),
+            ([10, -10], [10, -10]),
+            ([-10, 10], [-10, 10])
+        )
+        for op1, op2 in cases:
+            vdata_a = self.load(op1*hf_nlanes)
+            vdata_b = self.load(op2*hf_nlanes)
+            data = func(vdata_a, vdata_b)
+            simd = intrin(vdata_a, vdata_b)
+            assert simd == data
+            data = func(vdata_a)
+            simd = reduce_intrin(vdata_a)
+            assert simd == data
+
+        if not chk_nan:
+            return
+        if chk_nan == 1:
+            test_nan = lambda a, b: (
+                b if math.isnan(a) else a if math.isnan(b) else b
+            )
+        else:
+            test_nan = lambda a, b: (
+                nan if math.isnan(a) or math.isnan(b) else b
+            )
+        cases = (
+            (nan, 10),
+            (10, nan),
+            (nan, pinf),
+            (pinf, nan),
+            (nan, nan)
+        )
+        for op1, op2 in cases:
+            vdata_ab = self.load([op1, op2]*hf_nlanes)
+            data = test_nan(op1, op2)
+            simd = reduce_intrin(vdata_ab)
+            assert simd == pytest.approx(data, nan_ok=True)
+            vdata_a = self.setall(op1)
+            vdata_b = self.setall(op2)
+            data = [data] * self.nlanes
+            simd = intrin(vdata_a, vdata_b)
+            assert simd == pytest.approx(data, nan_ok=True)
+
+    def test_reciprocal(self):
+        pinf, ninf, nan = self._pinfinity(), self._ninfinity(), self._nan()
+        data = self._data()
+        vdata = self.load(self._data())
+
+        recip_cases = ((nan, nan), (pinf, 0.0), (ninf, -0.0), (0.0, pinf), (-0.0, ninf))
+        for case, desired in recip_cases:
+            data_recip = [desired]*self.nlanes
+            recip = self.recip(self.setall(case))
+            assert recip == pytest.approx(data_recip, nan_ok=True)
+
+        data_recip = self.load([1/x for x in data]) # load to truncate precision
+        recip = self.recip(vdata)
+        assert recip == data_recip
+
+    def test_special_cases(self):
+        """
+        Compare Not NaN. Test intrinsics:
+            npyv_notnan_##SFX
+        """
+        nnan = self.notnan(self.setall(self._nan()))
+        assert nnan == [0]*self.nlanes
+
+    @pytest.mark.parametrize("intrin_name", [
+        "rint", "trunc", "ceil", "floor"
+    ])
+    def test_unary_invalid_fpexception(self, intrin_name):
+        intrin = getattr(self, intrin_name)
+        for d in [float("nan"), float("inf"), -float("inf")]:
+            v = self.setall(d)
+            clear_floatstatus()
+            intrin(v)
+            assert check_floatstatus(invalid=True) == False
+
+    @pytest.mark.parametrize('py_comp,np_comp', [
+        (operator.lt, "cmplt"),
+        (operator.le, "cmple"),
+        (operator.gt, "cmpgt"),
+        (operator.ge, "cmpge"),
+        (operator.eq, "cmpeq"),
+        (operator.ne, "cmpneq")
+    ])
+    def test_comparison_with_nan(self, py_comp, np_comp):
+        pinf, ninf, nan = self._pinfinity(), self._ninfinity(), self._nan()
+        mask_true = self._true_mask()
+
+        def to_bool(vector):
+            return [lane == mask_true for lane in vector]
+
+        intrin = getattr(self, np_comp)
+        cmp_cases = ((0, nan), (nan, 0), (nan, nan), (pinf, nan),
+                     (ninf, nan), (-0.0, +0.0))
+        for case_operand1, case_operand2 in cmp_cases:
+            data_a = [case_operand1]*self.nlanes
+            data_b = [case_operand2]*self.nlanes
+            vdata_a = self.setall(case_operand1)
+            vdata_b = self.setall(case_operand2)
+            vcmp = to_bool(intrin(vdata_a, vdata_b))
+            data_cmp = [py_comp(a, b) for a, b in zip(data_a, data_b)]
+            assert vcmp == data_cmp
+
+    @pytest.mark.parametrize("intrin", ["any", "all"])
+    @pytest.mark.parametrize("data", (
+        [float("nan"), 0],
+        [0, float("nan")],
+        [float("nan"), 1],
+        [1, float("nan")],
+        [float("nan"), float("nan")],
+        [0.0, -0.0],
+        [-0.0, 0.0],
+        [1.0, -0.0]
+    ))
+    def test_operators_crosstest(self, intrin, data):
+        """
+        Test intrinsics:
+            npyv_any_##SFX
+            npyv_all_##SFX
+        """
+        data_a = self.load(data * self.nlanes)
+        func = eval(intrin)
+        intrin = getattr(self, intrin)
+        desired = func(data_a)
+        simd = intrin(data_a)
+        assert not not simd == desired
+
+class _SIMD_ALL(_Test_Utility):
+    """
+    To test all vector types at once
+    """
+    def test_memory_load(self):
+        data = self._data()
+        # unaligned load
+        load_data = self.load(data)
+        assert load_data == data
+        # aligned load
+        loada_data = self.loada(data)
+        assert loada_data == data
+        # stream load
+        loads_data = self.loads(data)
+        assert loads_data == data
+        # load lower part
+        loadl = self.loadl(data)
+        loadl_half = list(loadl)[:self.nlanes//2]
+        data_half = data[:self.nlanes//2]
+        assert loadl_half == data_half
+        assert loadl != data # detect overflow
+
+    def test_memory_store(self):
+        data = self._data()
+        vdata = self.load(data)
+        # unaligned store
+        store = [0] * self.nlanes
+        self.store(store, vdata)
+        assert store == data
+        # aligned store
+        store_a = [0] * self.nlanes
+        self.storea(store_a, vdata)
+        assert store_a == data
+        # stream store
+        store_s = [0] * self.nlanes
+        self.stores(store_s, vdata)
+        assert store_s == data
+        # store lower part
+        store_l = [0] * self.nlanes
+        self.storel(store_l, vdata)
+        assert store_l[:self.nlanes//2] == data[:self.nlanes//2]
+        assert store_l != vdata # detect overflow
+        # store higher part
+        store_h = [0] * self.nlanes
+        self.storeh(store_h, vdata)
+        assert store_h[:self.nlanes//2] == data[self.nlanes//2:]
+        assert store_h != vdata # detect overflow
+
+    @pytest.mark.parametrize("intrin, elsizes, scale, fill", [
+        ("self.load_tillz, self.load_till", (32, 64), 1, [0xffff]),
+        ("self.load2_tillz, self.load2_till", (32, 64), 2, [0xffff, 0x7fff]),
+    ])
+    def test_memory_partial_load(self, intrin, elsizes, scale, fill):
+        if self._scalar_size() not in elsizes:
+            return
+        npyv_load_tillz, npyv_load_till = eval(intrin)
+        data = self._data()
+        lanes = list(range(1, self.nlanes + 1))
+        lanes += [self.nlanes**2, self.nlanes**4] # test out of range
+        for n in lanes:
+            load_till = npyv_load_till(data, n, *fill)
+            load_tillz = npyv_load_tillz(data, n)
+            n *= scale
+            data_till = data[:n] + fill * ((self.nlanes-n) // scale)
+            assert load_till == data_till
+            data_tillz = data[:n] + [0] * (self.nlanes-n)
+            assert load_tillz == data_tillz
+
+    @pytest.mark.parametrize("intrin, elsizes, scale", [
+        ("self.store_till", (32, 64), 1),
+        ("self.store2_till", (32, 64), 2),
+    ])
+    def test_memory_partial_store(self, intrin, elsizes, scale):
+        if self._scalar_size() not in elsizes:
+            return
+        npyv_store_till = eval(intrin)
+        data = self._data()
+        data_rev = self._data(reverse=True)
+        vdata = self.load(data)
+        lanes = list(range(1, self.nlanes + 1))
+        lanes += [self.nlanes**2, self.nlanes**4]
+        for n in lanes:
+            data_till = data_rev.copy()
+            data_till[:n*scale] = data[:n*scale]
+            store_till = self._data(reverse=True)
+            npyv_store_till(store_till, n, vdata)
+            assert store_till == data_till
+
+    @pytest.mark.parametrize("intrin, elsizes, scale", [
+        ("self.loadn", (32, 64), 1),
+        ("self.loadn2", (32, 64), 2),
+    ])
+    def test_memory_noncont_load(self, intrin, elsizes, scale):
+        if self._scalar_size() not in elsizes:
+            return
+        npyv_loadn = eval(intrin)
+        for stride in range(-64, 64):
+            if stride < 0:
+                data = self._data(stride, -stride*self.nlanes)
+                data_stride = list(itertools.chain(
+                    *zip(*[data[-i::stride] for i in range(scale, 0, -1)])
+                ))
+            elif stride == 0:
+                data = self._data()
+                data_stride = data[0:scale] * (self.nlanes//scale)
+            else:
+                data = self._data(count=stride*self.nlanes)
+                data_stride = list(itertools.chain(
+                    *zip(*[data[i::stride] for i in range(scale)]))
+                )
+            data_stride = self.load(data_stride) # cast unsigned
+            loadn = npyv_loadn(data, stride)
+            assert loadn == data_stride
+
+    @pytest.mark.parametrize("intrin, elsizes, scale, fill", [
+        ("self.loadn_tillz, self.loadn_till", (32, 64), 1, [0xffff]),
+        ("self.loadn2_tillz, self.loadn2_till", (32, 64), 2, [0xffff, 0x7fff]),
+    ])
+    def test_memory_noncont_partial_load(self, intrin, elsizes, scale, fill):
+        if self._scalar_size() not in elsizes:
+            return
+        npyv_loadn_tillz, npyv_loadn_till = eval(intrin)
+        lanes = list(range(1, self.nlanes + 1))
+        lanes += [self.nlanes**2, self.nlanes**4]
+        for stride in range(-64, 64):
+            if stride < 0:
+                data = self._data(stride, -stride*self.nlanes)
+                data_stride = list(itertools.chain(
+                    *zip(*[data[-i::stride] for i in range(scale, 0, -1)])
+                ))
+            elif stride == 0:
+                data = self._data()
+                data_stride = data[0:scale] * (self.nlanes//scale)
+            else:
+                data = self._data(count=stride*self.nlanes)
+                data_stride = list(itertools.chain(
+                    *zip(*[data[i::stride] for i in range(scale)])
+                ))
+            data_stride = list(self.load(data_stride)) # cast unsigned
+            for n in lanes:
+                nscale = n * scale
+                llanes = self.nlanes - nscale
+                data_stride_till = (
+                    data_stride[:nscale] + fill * (llanes//scale)
+                )
+                loadn_till = npyv_loadn_till(data, stride, n, *fill)
+                assert loadn_till == data_stride_till
+                data_stride_tillz = data_stride[:nscale] + [0] * llanes
+                loadn_tillz = npyv_loadn_tillz(data, stride, n)
+                assert loadn_tillz == data_stride_tillz
+
+    @pytest.mark.parametrize("intrin, elsizes, scale", [
+        ("self.storen", (32, 64), 1),
+        ("self.storen2", (32, 64), 2),
+    ])
+    def test_memory_noncont_store(self, intrin, elsizes, scale):
+        if self._scalar_size() not in elsizes:
+            return
+        npyv_storen = eval(intrin)
+        data = self._data()
+        vdata = self.load(data)
+        hlanes = self.nlanes // scale
+        for stride in range(1, 64):
+            data_storen = [0xff] * stride * self.nlanes
+            for s in range(0, hlanes*stride, stride):
+                i = (s//stride)*scale
+                data_storen[s:s+scale] = data[i:i+scale]
+            storen = [0xff] * stride * self.nlanes
+            storen += [0x7f]*64
+            npyv_storen(storen, stride, vdata)
+            assert storen[:-64] == data_storen
+            assert storen[-64:] == [0x7f]*64 # detect overflow
+
+        for stride in range(-64, 0):
+            data_storen = [0xff] * -stride * self.nlanes
+            for s in range(0, hlanes*stride, stride):
+                i = (s//stride)*scale
+                data_storen[s-scale:s or None] = data[i:i+scale]
+            storen = [0x7f]*64
+            storen += [0xff] * -stride * self.nlanes
+            npyv_storen(storen, stride, vdata)
+            assert storen[64:] == data_storen
+            assert storen[:64] == [0x7f]*64 # detect overflow
+        # stride 0
+        data_storen = [0x7f] * self.nlanes
+        storen = data_storen.copy()
+        data_storen[0:scale] = data[-scale:]
+        npyv_storen(storen, 0, vdata)
+        assert storen == data_storen
+
+    @pytest.mark.parametrize("intrin, elsizes, scale", [
+        ("self.storen_till", (32, 64), 1),
+        ("self.storen2_till", (32, 64), 2),
+    ])
+    def test_memory_noncont_partial_store(self, intrin, elsizes, scale):
+        if self._scalar_size() not in elsizes:
+            return
+        npyv_storen_till = eval(intrin)
+        data = self._data()
+        vdata = self.load(data)
+        lanes = list(range(1, self.nlanes + 1))
+        lanes += [self.nlanes**2, self.nlanes**4]
+        hlanes = self.nlanes // scale
+        for stride in range(1, 64):
+            for n in lanes:
+                data_till = [0xff] * stride * self.nlanes
+                tdata = data[:n*scale] + [0xff] * (self.nlanes-n*scale)
+                for s in range(0, hlanes*stride, stride)[:n]:
+                    i = (s//stride)*scale
+                    data_till[s:s+scale] = tdata[i:i+scale]
+                storen_till = [0xff] * stride * self.nlanes
+                storen_till += [0x7f]*64
+                npyv_storen_till(storen_till, stride, n, vdata)
+                assert storen_till[:-64] == data_till
+                assert storen_till[-64:] == [0x7f]*64 # detect overflow
+
+        for stride in range(-64, 0):
+            for n in lanes:
+                data_till = [0xff] * -stride * self.nlanes
+                tdata = data[:n*scale] + [0xff] * (self.nlanes-n*scale)
+                for s in range(0, hlanes*stride, stride)[:n]:
+                    i = (s//stride)*scale
+                    data_till[s-scale:s or None] = tdata[i:i+scale]
+                storen_till = [0x7f]*64
+                storen_till += [0xff] * -stride * self.nlanes
+                npyv_storen_till(storen_till, stride, n, vdata)
+                assert storen_till[64:] == data_till
+                assert storen_till[:64] == [0x7f]*64 # detect overflow
+
+        # stride 0
+        for n in lanes:
+            data_till = [0x7f] * self.nlanes
+            storen_till = data_till.copy()
+            data_till[0:scale] = data[:n*scale][-scale:]
+            npyv_storen_till(storen_till, 0, n, vdata)
+            assert storen_till == data_till
+
+    @pytest.mark.parametrize("intrin, table_size, elsize", [
+        ("self.lut32", 32, 32),
+        ("self.lut16", 16, 64)
+    ])
+    def test_lut(self, intrin, table_size, elsize):
+        """
+        Test lookup table intrinsics:
+            npyv_lut32_##sfx
+            npyv_lut16_##sfx
+        """
+        if elsize != self._scalar_size():
+            return
+        intrin = eval(intrin)
+        idx_itrin = getattr(self.npyv, f"setall_u{elsize}")
+        table = range(0, table_size)
+        for i in table:
+            broadi = self.setall(i)
+            idx = idx_itrin(i)
+            lut = intrin(table, idx)
+            assert lut == broadi
+
+    def test_misc(self):
+        broadcast_zero = self.zero()
+        assert broadcast_zero == [0] * self.nlanes
+        for i in range(1, 10):
+            broadcasti = self.setall(i)
+            assert broadcasti == [i] * self.nlanes
+
+        data_a, data_b = self._data(), self._data(reverse=True)
+        vdata_a, vdata_b = self.load(data_a), self.load(data_b)
+
+        # py level of npyv_set_* don't support ignoring the extra specified lanes or
+        # fill non-specified lanes with zero.
+        vset = self.set(*data_a)
+        assert vset == data_a
+        # py level of npyv_setf_* don't support ignoring the extra specified lanes or
+        # fill non-specified lanes with the specified scalar.
+        vsetf = self.setf(10, *data_a)
+        assert vsetf == data_a
+
+        # We're testing the sanity of _simd's type-vector,
+        # reinterpret* intrinsics itself are tested via compiler
+        # during the build of _simd module
+        sfxes = ["u8", "s8", "u16", "s16", "u32", "s32", "u64", "s64"]
+        if self.npyv.simd_f64:
+            sfxes.append("f64")
+        if self.npyv.simd_f32:
+            sfxes.append("f32")
+        for sfx in sfxes:
+            vec_name = getattr(self, "reinterpret_" + sfx)(vdata_a).__name__
+            assert vec_name == "npyv_" + sfx
+
+        # select & mask operations
+        select_a = self.select(self.cmpeq(self.zero(), self.zero()), vdata_a, vdata_b)
+        assert select_a == data_a
+        select_b = self.select(self.cmpneq(self.zero(), self.zero()), vdata_a, vdata_b)
+        assert select_b == data_b
+
+        # test extract elements
+        assert self.extract0(vdata_b) == vdata_b[0]
+
+        # cleanup intrinsic is only used with AVX for
+        # zeroing registers to avoid the AVX-SSE transition penalty,
+        # so nothing to test here
+        self.npyv.cleanup()
+
+    def test_reorder(self):
+        data_a, data_b = self._data(), self._data(reverse=True)
+        vdata_a, vdata_b = self.load(data_a), self.load(data_b)
+        # lower half part
+        data_a_lo = data_a[:self.nlanes//2]
+        data_b_lo = data_b[:self.nlanes//2]
+        # higher half part
+        data_a_hi = data_a[self.nlanes//2:]
+        data_b_hi = data_b[self.nlanes//2:]
+        # combine two lower parts
+        combinel = self.combinel(vdata_a, vdata_b)
+        assert combinel == data_a_lo + data_b_lo
+        # combine two higher parts
+        combineh = self.combineh(vdata_a, vdata_b)
+        assert combineh == data_a_hi + data_b_hi
+        # combine x2
+        combine = self.combine(vdata_a, vdata_b)
+        assert combine == (data_a_lo + data_b_lo, data_a_hi + data_b_hi)
+
+        # zip(interleave)
+        data_zipl = self.load([
+            v for p in zip(data_a_lo, data_b_lo) for v in p
+        ])
+        data_ziph = self.load([
+            v for p in zip(data_a_hi, data_b_hi) for v in p
+        ])
+        vzip = self.zip(vdata_a, vdata_b)
+        assert vzip == (data_zipl, data_ziph)
+        vzip = [0]*self.nlanes*2
+        self._x2("store")(vzip, (vdata_a, vdata_b))
+        assert vzip == list(data_zipl) + list(data_ziph)
+
+        # unzip(deinterleave)
+        unzip = self.unzip(data_zipl, data_ziph)
+        assert unzip == (data_a, data_b)
+        unzip = self._x2("load")(list(data_zipl) + list(data_ziph))
+        assert unzip == (data_a, data_b)
+
+    def test_reorder_rev64(self):
+        # Reverse elements of each 64-bit lane
+        ssize = self._scalar_size()
+        if ssize == 64:
+            return
+        data_rev64 = [
+            y for x in range(0, self.nlanes, 64//ssize)
+              for y in reversed(range(x, x + 64//ssize))
+        ]
+        rev64 = self.rev64(self.load(range(self.nlanes)))
+        assert rev64 == data_rev64
+
+    def test_reorder_permi128(self):
+        """
+        Test permuting elements for each 128-bit lane.
+        npyv_permi128_##sfx
+        """
+        ssize = self._scalar_size()
+        if ssize < 32:
+            return
+        data = self.load(self._data())
+        permn = 128//ssize
+        permd = permn-1
+        nlane128 = self.nlanes//permn
+        shfl = [0, 1] if ssize == 64 else [0, 2, 4, 6]
+        for i in range(permn):
+            indices = [(i >> shf) & permd for shf in shfl]
+            vperm = self.permi128(data, *indices)
+            data_vperm = [
+                data[j + (e & -permn)]
+                for e, j in enumerate(indices*nlane128)
+            ]
+            assert vperm == data_vperm
+
+    @pytest.mark.parametrize('func, intrin', [
+        (operator.lt, "cmplt"),
+        (operator.le, "cmple"),
+        (operator.gt, "cmpgt"),
+        (operator.ge, "cmpge"),
+        (operator.eq, "cmpeq")
+    ])
+    def test_operators_comparison(self, func, intrin):
+        if self._is_fp():
+            data_a = self._data()
+        else:
+            data_a = self._data(self._int_max() - self.nlanes)
+        data_b = self._data(self._int_min(), reverse=True)
+        vdata_a, vdata_b = self.load(data_a), self.load(data_b)
+        intrin = getattr(self, intrin)
+
+        mask_true = self._true_mask()
+        def to_bool(vector):
+            return [lane == mask_true for lane in vector]
+
+        data_cmp = [func(a, b) for a, b in zip(data_a, data_b)]
+        cmp = to_bool(intrin(vdata_a, vdata_b))
+        assert cmp == data_cmp
+
+    def test_operators_logical(self):
+        if self._is_fp():
+            data_a = self._data()
+        else:
+            data_a = self._data(self._int_max() - self.nlanes)
+        data_b = self._data(self._int_min(), reverse=True)
+        vdata_a, vdata_b = self.load(data_a), self.load(data_b)
+
+        if self._is_fp():
+            data_cast_a = self._to_unsigned(vdata_a)
+            data_cast_b = self._to_unsigned(vdata_b)
+            cast, cast_data = self._to_unsigned, self._to_unsigned
+        else:
+            data_cast_a, data_cast_b = data_a, data_b
+            cast, cast_data = lambda a: a, self.load
+
+        data_xor = cast_data([a ^ b for a, b in zip(data_cast_a, data_cast_b)])
+        vxor = cast(self.xor(vdata_a, vdata_b))
+        assert vxor == data_xor
+
+        data_or = cast_data([a | b for a, b in zip(data_cast_a, data_cast_b)])
+        vor = cast(getattr(self, "or")(vdata_a, vdata_b))
+        assert vor == data_or
+
+        data_and = cast_data([a & b for a, b in zip(data_cast_a, data_cast_b)])
+        vand = cast(getattr(self, "and")(vdata_a, vdata_b))
+        assert vand == data_and
+
+        data_not = cast_data([~a for a in data_cast_a])
+        vnot = cast(getattr(self, "not")(vdata_a))
+        assert vnot == data_not
+
+        if self.sfx not in ("u8"):
+            return
+        data_andc = [a & ~b for a, b in zip(data_cast_a, data_cast_b)]
+        vandc = cast(getattr(self, "andc")(vdata_a, vdata_b))
+        assert vandc == data_andc
+
+    @pytest.mark.parametrize("intrin", ["any", "all"])
+    @pytest.mark.parametrize("data", (
+        [1, 2, 3, 4],
+        [-1, -2, -3, -4],
+        [0, 1, 2, 3, 4],
+        [0x7f, 0x7fff, 0x7fffffff, 0x7fffffffffffffff],
+        [0, -1, -2, -3, 4],
+        [0],
+        [1],
+        [-1]
+    ))
+    def test_operators_crosstest(self, intrin, data):
+        """
+        Test intrinsics:
+            npyv_any_##SFX
+            npyv_all_##SFX
+        """
+        data_a = self.load(data * self.nlanes)
+        func = eval(intrin)
+        intrin = getattr(self, intrin)
+        desired = func(data_a)
+        simd = intrin(data_a)
+        assert not not simd == desired
+
+    def test_conversion_boolean(self):
+        bsfx = "b" + self.sfx[1:]
+        to_boolean = getattr(self.npyv, "cvt_%s_%s" % (bsfx, self.sfx))
+        from_boolean = getattr(self.npyv, "cvt_%s_%s" % (self.sfx, bsfx))
+
+        false_vb = to_boolean(self.setall(0))
+        true_vb = self.cmpeq(self.setall(0), self.setall(0))
+        assert false_vb != true_vb
+
+        false_vsfx = from_boolean(false_vb)
+        true_vsfx = from_boolean(true_vb)
+        assert false_vsfx != true_vsfx
+
+    def test_conversion_expand(self):
+        """
+        Test expand intrinsics:
+            npyv_expand_u16_u8
+            npyv_expand_u32_u16
+        """
+        if self.sfx not in ("u8", "u16"):
+            return
+        totype = self.sfx[0]+str(int(self.sfx[1:])*2)
+        expand = getattr(self.npyv, f"expand_{totype}_{self.sfx}")
+        # close enough from the edge to detect any deviation
+        data = self._data(self._int_max() - self.nlanes)
+        vdata = self.load(data)
+        edata = expand(vdata)
+        # lower half part
+        data_lo = data[:self.nlanes//2]
+        # higher half part
+        data_hi = data[self.nlanes//2:]
+        assert edata == (data_lo, data_hi)
+
+    def test_arithmetic_subadd(self):
+        if self._is_fp():
+            data_a = self._data()
+        else:
+            data_a = self._data(self._int_max() - self.nlanes)
+        data_b = self._data(self._int_min(), reverse=True)
+        vdata_a, vdata_b = self.load(data_a), self.load(data_b)
+
+        # non-saturated
+        data_add = self.load([a + b for a, b in zip(data_a, data_b)]) # load to cast
+        add = self.add(vdata_a, vdata_b)
+        assert add == data_add
+        data_sub = self.load([a - b for a, b in zip(data_a, data_b)])
+        sub = self.sub(vdata_a, vdata_b)
+        assert sub == data_sub
+
+    def test_arithmetic_mul(self):
+        if self.sfx in ("u64", "s64"):
+            return
+
+        if self._is_fp():
+            data_a = self._data()
+        else:
+            data_a = self._data(self._int_max() - self.nlanes)
+        data_b = self._data(self._int_min(), reverse=True)
+        vdata_a, vdata_b = self.load(data_a), self.load(data_b)
+
+        data_mul = self.load([a * b for a, b in zip(data_a, data_b)])
+        mul = self.mul(vdata_a, vdata_b)
+        assert mul == data_mul
+
+    def test_arithmetic_div(self):
+        if not self._is_fp():
+            return
+
+        data_a, data_b = self._data(), self._data(reverse=True)
+        vdata_a, vdata_b = self.load(data_a), self.load(data_b)
+
+        # load to truncate f64 to precision of f32
+        data_div = self.load([a / b for a, b in zip(data_a, data_b)])
+        div = self.div(vdata_a, vdata_b)
+        assert div == data_div
+
+    def test_arithmetic_intdiv(self):
+        """
+        Test integer division intrinsics:
+            npyv_divisor_##sfx
+            npyv_divc_##sfx
+        """
+        if self._is_fp():
+            return
+
+        int_min = self._int_min()
+        def trunc_div(a, d):
+            """
+            Divide towards zero works with large integers > 2^53,
+            and wrap around overflow similar to what C does.
+            """
+            if d == -1 and a == int_min:
+                return a
+            sign_a, sign_d = a < 0, d < 0
+            if a == 0 or sign_a == sign_d:
+                return a // d
+            return (a + sign_d - sign_a) // d + 1
+
+        data = [1, -int_min] # to test overflow
+        data += range(0, 2**8, 2**5)
+        data += range(0, 2**8, 2**5-1)
+        bsize = self._scalar_size()
+        if bsize > 8:
+            data += range(2**8, 2**16, 2**13)
+            data += range(2**8, 2**16, 2**13-1)
+        if bsize > 16:
+            data += range(2**16, 2**32, 2**29)
+            data += range(2**16, 2**32, 2**29-1)
+        if bsize > 32:
+            data += range(2**32, 2**64, 2**61)
+            data += range(2**32, 2**64, 2**61-1)
+        # negate
+        data += [-x for x in data]
+        for dividend, divisor in itertools.product(data, data):
+            divisor = self.setall(divisor)[0] # cast
+            if divisor == 0:
+                continue
+            dividend = self.load(self._data(dividend))
+            data_divc = [trunc_div(a, divisor) for a in dividend]
+            divisor_parms = self.divisor(divisor)
+            divc = self.divc(dividend, divisor_parms)
+            assert divc == data_divc
+
+    def test_arithmetic_reduce_sum(self):
+        """
+        Test reduce sum intrinsics:
+            npyv_sum_##sfx
+        """
+        if self.sfx not in ("u32", "u64", "f32", "f64"):
+            return
+        # reduce sum
+        data = self._data()
+        vdata = self.load(data)
+
+        data_sum = sum(data)
+        vsum = self.sum(vdata)
+        assert vsum == data_sum
+
+    def test_arithmetic_reduce_sumup(self):
+        """
+        Test extend reduce sum intrinsics:
+            npyv_sumup_##sfx
+        """
+        if self.sfx not in ("u8", "u16"):
+            return
+        rdata = (0, self.nlanes, self._int_min(), self._int_max()-self.nlanes)
+        for r in rdata:
+            data = self._data(r)
+            vdata = self.load(data)
+            data_sum = sum(data)
+            vsum = self.sumup(vdata)
+            assert vsum == data_sum
+
+    def test_mask_conditional(self):
+        """
+        Conditional addition and subtraction for all supported data types.
+        Test intrinsics:
+            npyv_ifadd_##SFX, npyv_ifsub_##SFX
+        """
+        vdata_a = self.load(self._data())
+        vdata_b = self.load(self._data(reverse=True))
+        true_mask = self.cmpeq(self.zero(), self.zero())
+        false_mask = self.cmpneq(self.zero(), self.zero())
+
+        data_sub = self.sub(vdata_b, vdata_a)
+        ifsub = self.ifsub(true_mask, vdata_b, vdata_a, vdata_b)
+        assert ifsub == data_sub
+        ifsub = self.ifsub(false_mask, vdata_a, vdata_b, vdata_b)
+        assert ifsub == vdata_b
+
+        data_add = self.add(vdata_b, vdata_a)
+        ifadd = self.ifadd(true_mask, vdata_b, vdata_a, vdata_b)
+        assert ifadd == data_add
+        ifadd = self.ifadd(false_mask, vdata_a, vdata_b, vdata_b)
+        assert ifadd == vdata_b
+
+        if not self._is_fp():
+            return
+        data_div = self.div(vdata_b, vdata_a)
+        ifdiv = self.ifdiv(true_mask, vdata_b, vdata_a, vdata_b)
+        assert ifdiv == data_div
+        ifdivz = self.ifdivz(true_mask, vdata_b, vdata_a)
+        assert ifdivz == data_div
+        ifdiv = self.ifdiv(false_mask, vdata_a, vdata_b, vdata_b)
+        assert ifdiv == vdata_b
+        ifdivz = self.ifdivz(false_mask, vdata_a, vdata_b)
+        assert ifdivz == self.zero()
+
+bool_sfx = ("b8", "b16", "b32", "b64")
+int_sfx = ("u8", "s8", "u16", "s16", "u32", "s32", "u64", "s64")
+fp_sfx = ("f32", "f64")
+all_sfx = int_sfx + fp_sfx
+tests_registry = {
+    bool_sfx: _SIMD_BOOL,
+    int_sfx : _SIMD_INT,
+    fp_sfx  : _SIMD_FP,
+    ("f32",): _SIMD_FP32,
+    ("f64",): _SIMD_FP64,
+    all_sfx : _SIMD_ALL
+}
+for target_name, npyv in targets.items():
+    simd_width = npyv.simd if npyv else ''
+    pretty_name = target_name.split('__') # multi-target separator
+    if len(pretty_name) > 1:
+        # multi-target
+        pretty_name = f"({' '.join(pretty_name)})"
+    else:
+        pretty_name = pretty_name[0]
+
+    skip = ""
+    skip_sfx = dict()
+    if not npyv:
+        skip = f"target '{pretty_name}' isn't supported by current machine"
+    elif not npyv.simd:
+        skip = f"target '{pretty_name}' isn't supported by NPYV"
+    else:
+        if not npyv.simd_f32:
+            skip_sfx["f32"] = f"target '{pretty_name}' "\
+                              "doesn't support single-precision"
+        if not npyv.simd_f64:
+            skip_sfx["f64"] = f"target '{pretty_name}' doesn't"\
+                              "support double-precision"
+
+    for sfxes, cls in tests_registry.items():
+        for sfx in sfxes:
+            skip_m = skip_sfx.get(sfx, skip)
+            inhr = (cls,)
+            attr = dict(npyv=targets[target_name], sfx=sfx, target_name=target_name)
+            tcls = type(f"Test{cls.__name__}_{simd_width}_{target_name}_{sfx}", inhr, attr)
+            if skip_m:
+                pytest.mark.skip(reason=skip_m)(tcls)
+            globals()[tcls.__name__] = tcls