about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/psutil/tests/test_misc.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/psutil/tests/test_misc.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-4a52a71956a8d46fcb7294ac71734504bb09bcc2.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/psutil/tests/test_misc.py')
-rw-r--r--.venv/lib/python3.12/site-packages/psutil/tests/test_misc.py1058
1 files changed, 1058 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/psutil/tests/test_misc.py b/.venv/lib/python3.12/site-packages/psutil/tests/test_misc.py
new file mode 100644
index 00000000..cf98f8b4
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/psutil/tests/test_misc.py
@@ -0,0 +1,1058 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Miscellaneous tests."""
+
+import ast
+import collections
+import errno
+import json
+import os
+import pickle
+import socket
+import stat
+import sys
+
+import psutil
+import psutil.tests
+from psutil import POSIX
+from psutil import WINDOWS
+from psutil._common import bcat
+from psutil._common import cat
+from psutil._common import debug
+from psutil._common import isfile_strict
+from psutil._common import memoize
+from psutil._common import memoize_when_activated
+from psutil._common import parse_environ_block
+from psutil._common import supports_ipv6
+from psutil._common import wrap_numbers
+from psutil._compat import PY3
+from psutil._compat import FileNotFoundError
+from psutil._compat import redirect_stderr
+from psutil.tests import CI_TESTING
+from psutil.tests import HAS_BATTERY
+from psutil.tests import HAS_MEMORY_MAPS
+from psutil.tests import HAS_NET_IO_COUNTERS
+from psutil.tests import HAS_SENSORS_BATTERY
+from psutil.tests import HAS_SENSORS_FANS
+from psutil.tests import HAS_SENSORS_TEMPERATURES
+from psutil.tests import PYTHON_EXE
+from psutil.tests import PYTHON_EXE_ENV
+from psutil.tests import QEMU_USER
+from psutil.tests import SCRIPTS_DIR
+from psutil.tests import PsutilTestCase
+from psutil.tests import mock
+from psutil.tests import process_namespace
+from psutil.tests import pytest
+from psutil.tests import reload_module
+from psutil.tests import sh
+from psutil.tests import system_namespace
+
+
+# ===================================================================
+# --- Test classes' repr(), str(), ...
+# ===================================================================
+
+
+class TestSpecialMethods(PsutilTestCase):
+    def test_check_pid_range(self):
+        with pytest.raises(OverflowError):
+            psutil._psplatform.cext.check_pid_range(2**128)
+        with pytest.raises(psutil.NoSuchProcess):
+            psutil.Process(2**128)
+
+    def test_process__repr__(self, func=repr):
+        p = psutil.Process(self.spawn_testproc().pid)
+        r = func(p)
+        assert "psutil.Process" in r
+        assert "pid=%s" % p.pid in r
+        assert "name='%s'" % str(p.name()) in r.replace("name=u'", "name='")
+        assert "status=" in r
+        assert "exitcode=" not in r
+        p.terminate()
+        p.wait()
+        r = func(p)
+        assert "status='terminated'" in r
+        assert "exitcode=" in r
+
+        with mock.patch.object(
+            psutil.Process,
+            "name",
+            side_effect=psutil.ZombieProcess(os.getpid()),
+        ):
+            p = psutil.Process()
+            r = func(p)
+            assert "pid=%s" % p.pid in r
+            assert "status='zombie'" in r
+            assert "name=" not in r
+        with mock.patch.object(
+            psutil.Process,
+            "name",
+            side_effect=psutil.NoSuchProcess(os.getpid()),
+        ):
+            p = psutil.Process()
+            r = func(p)
+            assert "pid=%s" % p.pid in r
+            assert "terminated" in r
+            assert "name=" not in r
+        with mock.patch.object(
+            psutil.Process,
+            "name",
+            side_effect=psutil.AccessDenied(os.getpid()),
+        ):
+            p = psutil.Process()
+            r = func(p)
+            assert "pid=%s" % p.pid in r
+            assert "name=" not in r
+
+    def test_process__str__(self):
+        self.test_process__repr__(func=str)
+
+    def test_error__repr__(self):
+        assert repr(psutil.Error()) == "psutil.Error()"
+
+    def test_error__str__(self):
+        assert str(psutil.Error()) == ""  # noqa
+
+    def test_no_such_process__repr__(self):
+        assert (
+            repr(psutil.NoSuchProcess(321))
+            == "psutil.NoSuchProcess(pid=321, msg='process no longer exists')"
+        )
+        assert (
+            repr(psutil.NoSuchProcess(321, name="name", msg="msg"))
+            == "psutil.NoSuchProcess(pid=321, name='name', msg='msg')"
+        )
+
+    def test_no_such_process__str__(self):
+        assert (
+            str(psutil.NoSuchProcess(321))
+            == "process no longer exists (pid=321)"
+        )
+        assert (
+            str(psutil.NoSuchProcess(321, name="name", msg="msg"))
+            == "msg (pid=321, name='name')"
+        )
+
+    def test_zombie_process__repr__(self):
+        assert (
+            repr(psutil.ZombieProcess(321))
+            == 'psutil.ZombieProcess(pid=321, msg="PID still '
+            'exists but it\'s a zombie")'
+        )
+        assert (
+            repr(psutil.ZombieProcess(321, name="name", ppid=320, msg="foo"))
+            == "psutil.ZombieProcess(pid=321, ppid=320, name='name',"
+            " msg='foo')"
+        )
+
+    def test_zombie_process__str__(self):
+        assert (
+            str(psutil.ZombieProcess(321))
+            == "PID still exists but it's a zombie (pid=321)"
+        )
+        assert (
+            str(psutil.ZombieProcess(321, name="name", ppid=320, msg="foo"))
+            == "foo (pid=321, ppid=320, name='name')"
+        )
+
+    def test_access_denied__repr__(self):
+        assert repr(psutil.AccessDenied(321)) == "psutil.AccessDenied(pid=321)"
+        assert (
+            repr(psutil.AccessDenied(321, name="name", msg="msg"))
+            == "psutil.AccessDenied(pid=321, name='name', msg='msg')"
+        )
+
+    def test_access_denied__str__(self):
+        assert str(psutil.AccessDenied(321)) == "(pid=321)"
+        assert (
+            str(psutil.AccessDenied(321, name="name", msg="msg"))
+            == "msg (pid=321, name='name')"
+        )
+
+    def test_timeout_expired__repr__(self):
+        assert (
+            repr(psutil.TimeoutExpired(5))
+            == "psutil.TimeoutExpired(seconds=5, msg='timeout after 5"
+            " seconds')"
+        )
+        assert (
+            repr(psutil.TimeoutExpired(5, pid=321, name="name"))
+            == "psutil.TimeoutExpired(pid=321, name='name', seconds=5, "
+            "msg='timeout after 5 seconds')"
+        )
+
+    def test_timeout_expired__str__(self):
+        assert str(psutil.TimeoutExpired(5)) == "timeout after 5 seconds"
+        assert (
+            str(psutil.TimeoutExpired(5, pid=321, name="name"))
+            == "timeout after 5 seconds (pid=321, name='name')"
+        )
+
+    def test_process__eq__(self):
+        p1 = psutil.Process()
+        p2 = psutil.Process()
+        assert p1 == p2
+        p2._ident = (0, 0)
+        assert p1 != p2
+        assert p1 != 'foo'
+
+    def test_process__hash__(self):
+        s = set([psutil.Process(), psutil.Process()])
+        assert len(s) == 1
+
+
+# ===================================================================
+# --- Misc, generic, corner cases
+# ===================================================================
+
+
+class TestMisc(PsutilTestCase):
+    def test__all__(self):
+        dir_psutil = dir(psutil)
+        for name in dir_psutil:
+            if name in {
+                'debug',
+                'long',
+                'tests',
+                'test',
+                'PermissionError',
+                'ProcessLookupError',
+            }:
+                continue
+            if not name.startswith('_'):
+                try:
+                    __import__(name)
+                except ImportError:
+                    if name not in psutil.__all__:
+                        fun = getattr(psutil, name)
+                        if fun is None:
+                            continue
+                        if (
+                            fun.__doc__ is not None
+                            and 'deprecated' not in fun.__doc__.lower()
+                        ):
+                            raise self.fail('%r not in psutil.__all__' % name)
+
+        # Import 'star' will break if __all__ is inconsistent, see:
+        # https://github.com/giampaolo/psutil/issues/656
+        # Can't do `from psutil import *` as it won't work on python 3
+        # so we simply iterate over __all__.
+        for name in psutil.__all__:
+            assert name in dir_psutil
+
+    def test_version(self):
+        assert (
+            '.'.join([str(x) for x in psutil.version_info])
+            == psutil.__version__
+        )
+
+    def test_process_as_dict_no_new_names(self):
+        # See https://github.com/giampaolo/psutil/issues/813
+        p = psutil.Process()
+        p.foo = '1'
+        assert 'foo' not in p.as_dict()
+
+    def test_serialization(self):
+        def check(ret):
+            json.loads(json.dumps(ret))
+
+            a = pickle.dumps(ret)
+            b = pickle.loads(a)
+            assert ret == b
+
+        # --- process APIs
+
+        proc = psutil.Process()
+        check(psutil.Process().as_dict())
+
+        ns = process_namespace(proc)
+        for fun, name in ns.iter(ns.getters, clear_cache=True):
+            with self.subTest(proc=proc, name=name):
+                try:
+                    ret = fun()
+                except psutil.Error:
+                    pass
+                else:
+                    check(ret)
+
+        # --- system APIs
+
+        ns = system_namespace()
+        for fun, name in ns.iter(ns.getters):
+            if name in {"win_service_iter", "win_service_get"}:
+                continue
+            if QEMU_USER and name == "net_if_stats":
+                # OSError: [Errno 38] ioctl(SIOCETHTOOL) not implemented
+                continue
+            with self.subTest(name=name):
+                try:
+                    ret = fun()
+                except psutil.AccessDenied:
+                    pass
+                else:
+                    check(ret)
+
+        # --- exception classes
+
+        b = pickle.loads(
+            pickle.dumps(
+                psutil.NoSuchProcess(pid=4567, name='name', msg='msg')
+            )
+        )
+        assert isinstance(b, psutil.NoSuchProcess)
+        assert b.pid == 4567
+        assert b.name == 'name'
+        assert b.msg == 'msg'
+
+        b = pickle.loads(
+            pickle.dumps(
+                psutil.ZombieProcess(pid=4567, name='name', ppid=42, msg='msg')
+            )
+        )
+        assert isinstance(b, psutil.ZombieProcess)
+        assert b.pid == 4567
+        assert b.ppid == 42
+        assert b.name == 'name'
+        assert b.msg == 'msg'
+
+        b = pickle.loads(
+            pickle.dumps(psutil.AccessDenied(pid=123, name='name', msg='msg'))
+        )
+        assert isinstance(b, psutil.AccessDenied)
+        assert b.pid == 123
+        assert b.name == 'name'
+        assert b.msg == 'msg'
+
+        b = pickle.loads(
+            pickle.dumps(
+                psutil.TimeoutExpired(seconds=33, pid=4567, name='name')
+            )
+        )
+        assert isinstance(b, psutil.TimeoutExpired)
+        assert b.seconds == 33
+        assert b.pid == 4567
+        assert b.name == 'name'
+
+    # # XXX: https://github.com/pypa/setuptools/pull/2896
+    # @pytest.mark.skipif(APPVEYOR,
+    #     reason="temporarily disabled due to setuptools bug"
+    # )
+    # def test_setup_script(self):
+    #     setup_py = os.path.join(ROOT_DIR, 'setup.py')
+    #     if CI_TESTING and not os.path.exists(setup_py):
+    #         raise pytest.skip("can't find setup.py")
+    #     module = import_module_by_path(setup_py)
+    #     self.assertRaises(SystemExit, module.setup)
+    #     self.assertEqual(module.get_version(), psutil.__version__)
+
+    def test_ad_on_process_creation(self):
+        # We are supposed to be able to instantiate Process also in case
+        # of zombie processes or access denied.
+        with mock.patch.object(
+            psutil.Process, '_get_ident', side_effect=psutil.AccessDenied
+        ) as meth:
+            psutil.Process()
+            assert meth.called
+
+        with mock.patch.object(
+            psutil.Process, '_get_ident', side_effect=psutil.ZombieProcess(1)
+        ) as meth:
+            psutil.Process()
+            assert meth.called
+
+        with mock.patch.object(
+            psutil.Process, '_get_ident', side_effect=ValueError
+        ) as meth:
+            with pytest.raises(ValueError):
+                psutil.Process()
+            assert meth.called
+
+        with mock.patch.object(
+            psutil.Process, '_get_ident', side_effect=psutil.NoSuchProcess(1)
+        ) as meth:
+            with self.assertRaises(psutil.NoSuchProcess):
+                psutil.Process()
+            assert meth.called
+
+    def test_sanity_version_check(self):
+        # see: https://github.com/giampaolo/psutil/issues/564
+        with mock.patch(
+            "psutil._psplatform.cext.version", return_value="0.0.0"
+        ):
+            with pytest.raises(ImportError) as cm:
+                reload_module(psutil)
+            assert "version conflict" in str(cm.value).lower()
+
+
+# ===================================================================
+# --- psutil/_common.py utils
+# ===================================================================
+
+
+class TestMemoizeDecorator(PsutilTestCase):
+    def setUp(self):
+        self.calls = []
+
+    tearDown = setUp
+
+    def run_against(self, obj, expected_retval=None):
+        # no args
+        for _ in range(2):
+            ret = obj()
+            assert self.calls == [((), {})]
+            if expected_retval is not None:
+                assert ret == expected_retval
+        # with args
+        for _ in range(2):
+            ret = obj(1)
+            assert self.calls == [((), {}), ((1,), {})]
+            if expected_retval is not None:
+                assert ret == expected_retval
+        # with args + kwargs
+        for _ in range(2):
+            ret = obj(1, bar=2)
+            assert self.calls == [((), {}), ((1,), {}), ((1,), {'bar': 2})]
+            if expected_retval is not None:
+                assert ret == expected_retval
+        # clear cache
+        assert len(self.calls) == 3
+        obj.cache_clear()
+        ret = obj()
+        if expected_retval is not None:
+            assert ret == expected_retval
+        assert len(self.calls) == 4
+        # docstring
+        assert obj.__doc__ == "My docstring."
+
+    def test_function(self):
+        @memoize
+        def foo(*args, **kwargs):
+            """My docstring."""
+            baseclass.calls.append((args, kwargs))
+            return 22
+
+        baseclass = self
+        self.run_against(foo, expected_retval=22)
+
+    def test_class(self):
+        @memoize
+        class Foo:
+            """My docstring."""
+
+            def __init__(self, *args, **kwargs):
+                baseclass.calls.append((args, kwargs))
+
+            def bar(self):
+                return 22
+
+        baseclass = self
+        self.run_against(Foo, expected_retval=None)
+        assert Foo().bar() == 22
+
+    def test_class_singleton(self):
+        # @memoize can be used against classes to create singletons
+        @memoize
+        class Bar:
+            def __init__(self, *args, **kwargs):
+                pass
+
+        assert Bar() is Bar()
+        assert id(Bar()) == id(Bar())
+        assert id(Bar(1)) == id(Bar(1))
+        assert id(Bar(1, foo=3)) == id(Bar(1, foo=3))
+        assert id(Bar(1)) != id(Bar(2))
+
+    def test_staticmethod(self):
+        class Foo:
+            @staticmethod
+            @memoize
+            def bar(*args, **kwargs):
+                """My docstring."""
+                baseclass.calls.append((args, kwargs))
+                return 22
+
+        baseclass = self
+        self.run_against(Foo().bar, expected_retval=22)
+
+    def test_classmethod(self):
+        class Foo:
+            @classmethod
+            @memoize
+            def bar(cls, *args, **kwargs):
+                """My docstring."""
+                baseclass.calls.append((args, kwargs))
+                return 22
+
+        baseclass = self
+        self.run_against(Foo().bar, expected_retval=22)
+
+    def test_original(self):
+        # This was the original test before I made it dynamic to test it
+        # against different types. Keeping it anyway.
+        @memoize
+        def foo(*args, **kwargs):
+            """Foo docstring."""
+            calls.append(None)
+            return (args, kwargs)
+
+        calls = []
+        # no args
+        for _ in range(2):
+            ret = foo()
+            expected = ((), {})
+            assert ret == expected
+            assert len(calls) == 1
+        # with args
+        for _ in range(2):
+            ret = foo(1)
+            expected = ((1,), {})
+            assert ret == expected
+            assert len(calls) == 2
+        # with args + kwargs
+        for _ in range(2):
+            ret = foo(1, bar=2)
+            expected = ((1,), {'bar': 2})
+            assert ret == expected
+            assert len(calls) == 3
+        # clear cache
+        foo.cache_clear()
+        ret = foo()
+        expected = ((), {})
+        assert ret == expected
+        assert len(calls) == 4
+        # docstring
+        assert foo.__doc__ == "Foo docstring."
+
+
+class TestCommonModule(PsutilTestCase):
+    def test_memoize_when_activated(self):
+        class Foo:
+            @memoize_when_activated
+            def foo(self):
+                calls.append(None)
+
+        f = Foo()
+        calls = []
+        f.foo()
+        f.foo()
+        assert len(calls) == 2
+
+        # activate
+        calls = []
+        f.foo.cache_activate(f)
+        f.foo()
+        f.foo()
+        assert len(calls) == 1
+
+        # deactivate
+        calls = []
+        f.foo.cache_deactivate(f)
+        f.foo()
+        f.foo()
+        assert len(calls) == 2
+
+    def test_parse_environ_block(self):
+        def k(s):
+            return s.upper() if WINDOWS else s
+
+        assert parse_environ_block("a=1\0") == {k("a"): "1"}
+        assert parse_environ_block("a=1\0b=2\0\0") == {
+            k("a"): "1",
+            k("b"): "2",
+        }
+        assert parse_environ_block("a=1\0b=\0\0") == {k("a"): "1", k("b"): ""}
+        # ignore everything after \0\0
+        assert parse_environ_block("a=1\0b=2\0\0c=3\0") == {
+            k("a"): "1",
+            k("b"): "2",
+        }
+        # ignore everything that is not an assignment
+        assert parse_environ_block("xxx\0a=1\0") == {k("a"): "1"}
+        assert parse_environ_block("a=1\0=b=2\0") == {k("a"): "1"}
+        # do not fail if the block is incomplete
+        assert parse_environ_block("a=1\0b=2") == {k("a"): "1"}
+
+    def test_supports_ipv6(self):
+        self.addCleanup(supports_ipv6.cache_clear)
+        if supports_ipv6():
+            with mock.patch('psutil._common.socket') as s:
+                s.has_ipv6 = False
+                supports_ipv6.cache_clear()
+                assert not supports_ipv6()
+
+            supports_ipv6.cache_clear()
+            with mock.patch(
+                'psutil._common.socket.socket', side_effect=socket.error
+            ) as s:
+                assert not supports_ipv6()
+                assert s.called
+
+            supports_ipv6.cache_clear()
+            with mock.patch(
+                'psutil._common.socket.socket', side_effect=socket.gaierror
+            ) as s:
+                assert not supports_ipv6()
+                supports_ipv6.cache_clear()
+                assert s.called
+
+            supports_ipv6.cache_clear()
+            with mock.patch(
+                'psutil._common.socket.socket.bind',
+                side_effect=socket.gaierror,
+            ) as s:
+                assert not supports_ipv6()
+                supports_ipv6.cache_clear()
+                assert s.called
+        else:
+            with pytest.raises(socket.error):
+                sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
+                try:
+                    sock.bind(("::1", 0))
+                finally:
+                    sock.close()
+
+    def test_isfile_strict(self):
+        this_file = os.path.abspath(__file__)
+        assert isfile_strict(this_file)
+        assert not isfile_strict(os.path.dirname(this_file))
+        with mock.patch(
+            'psutil._common.os.stat', side_effect=OSError(errno.EPERM, "foo")
+        ):
+            with pytest.raises(OSError):
+                isfile_strict(this_file)
+        with mock.patch(
+            'psutil._common.os.stat', side_effect=OSError(errno.EACCES, "foo")
+        ):
+            with pytest.raises(OSError):
+                isfile_strict(this_file)
+        with mock.patch(
+            'psutil._common.os.stat', side_effect=OSError(errno.ENOENT, "foo")
+        ):
+            assert not isfile_strict(this_file)
+        with mock.patch('psutil._common.stat.S_ISREG', return_value=False):
+            assert not isfile_strict(this_file)
+
+    def test_debug(self):
+        if PY3:
+            from io import StringIO
+        else:
+            from StringIO import StringIO
+
+        with mock.patch.object(psutil._common, "PSUTIL_DEBUG", True):
+            with redirect_stderr(StringIO()) as f:
+                debug("hello")
+                sys.stderr.flush()
+        msg = f.getvalue()
+        assert msg.startswith("psutil-debug"), msg
+        assert "hello" in msg
+        assert __file__.replace('.pyc', '.py') in msg
+
+        # supposed to use repr(exc)
+        with mock.patch.object(psutil._common, "PSUTIL_DEBUG", True):
+            with redirect_stderr(StringIO()) as f:
+                debug(ValueError("this is an error"))
+        msg = f.getvalue()
+        assert "ignoring ValueError" in msg
+        assert "'this is an error'" in msg
+
+        # supposed to use str(exc), because of extra info about file name
+        with mock.patch.object(psutil._common, "PSUTIL_DEBUG", True):
+            with redirect_stderr(StringIO()) as f:
+                exc = OSError(2, "no such file")
+                exc.filename = "/foo"
+                debug(exc)
+        msg = f.getvalue()
+        assert "no such file" in msg
+        assert "/foo" in msg
+
+    def test_cat_bcat(self):
+        testfn = self.get_testfn()
+        with open(testfn, "w") as f:
+            f.write("foo")
+        assert cat(testfn) == "foo"
+        assert bcat(testfn) == b"foo"
+        with pytest.raises(FileNotFoundError):
+            cat(testfn + '-invalid')
+        with pytest.raises(FileNotFoundError):
+            bcat(testfn + '-invalid')
+        assert cat(testfn + '-invalid', fallback="bar") == "bar"
+        assert bcat(testfn + '-invalid', fallback="bar") == "bar"
+
+
+# ===================================================================
+# --- Tests for wrap_numbers() function.
+# ===================================================================
+
+
+nt = collections.namedtuple('foo', 'a b c')
+
+
+class TestWrapNumbers(PsutilTestCase):
+    def setUp(self):
+        wrap_numbers.cache_clear()
+
+    tearDown = setUp
+
+    def test_first_call(self):
+        input = {'disk1': nt(5, 5, 5)}
+        assert wrap_numbers(input, 'disk_io') == input
+
+    def test_input_hasnt_changed(self):
+        input = {'disk1': nt(5, 5, 5)}
+        assert wrap_numbers(input, 'disk_io') == input
+        assert wrap_numbers(input, 'disk_io') == input
+
+    def test_increase_but_no_wrap(self):
+        input = {'disk1': nt(5, 5, 5)}
+        assert wrap_numbers(input, 'disk_io') == input
+        input = {'disk1': nt(10, 15, 20)}
+        assert wrap_numbers(input, 'disk_io') == input
+        input = {'disk1': nt(20, 25, 30)}
+        assert wrap_numbers(input, 'disk_io') == input
+        input = {'disk1': nt(20, 25, 30)}
+        assert wrap_numbers(input, 'disk_io') == input
+
+    def test_wrap(self):
+        # let's say 100 is the threshold
+        input = {'disk1': nt(100, 100, 100)}
+        assert wrap_numbers(input, 'disk_io') == input
+        # first wrap restarts from 10
+        input = {'disk1': nt(100, 100, 10)}
+        assert wrap_numbers(input, 'disk_io') == {'disk1': nt(100, 100, 110)}
+        # then it remains the same
+        input = {'disk1': nt(100, 100, 10)}
+        assert wrap_numbers(input, 'disk_io') == {'disk1': nt(100, 100, 110)}
+        # then it goes up
+        input = {'disk1': nt(100, 100, 90)}
+        assert wrap_numbers(input, 'disk_io') == {'disk1': nt(100, 100, 190)}
+        # then it wraps again
+        input = {'disk1': nt(100, 100, 20)}
+        assert wrap_numbers(input, 'disk_io') == {'disk1': nt(100, 100, 210)}
+        # and remains the same
+        input = {'disk1': nt(100, 100, 20)}
+        assert wrap_numbers(input, 'disk_io') == {'disk1': nt(100, 100, 210)}
+        # now wrap another num
+        input = {'disk1': nt(50, 100, 20)}
+        assert wrap_numbers(input, 'disk_io') == {'disk1': nt(150, 100, 210)}
+        # and again
+        input = {'disk1': nt(40, 100, 20)}
+        assert wrap_numbers(input, 'disk_io') == {'disk1': nt(190, 100, 210)}
+        # keep it the same
+        input = {'disk1': nt(40, 100, 20)}
+        assert wrap_numbers(input, 'disk_io') == {'disk1': nt(190, 100, 210)}
+
+    def test_changing_keys(self):
+        # Emulate a case where the second call to disk_io()
+        # (or whatever) provides a new disk, then the new disk
+        # disappears on the third call.
+        input = {'disk1': nt(5, 5, 5)}
+        assert wrap_numbers(input, 'disk_io') == input
+        input = {'disk1': nt(5, 5, 5), 'disk2': nt(7, 7, 7)}
+        assert wrap_numbers(input, 'disk_io') == input
+        input = {'disk1': nt(8, 8, 8)}
+        assert wrap_numbers(input, 'disk_io') == input
+
+    def test_changing_keys_w_wrap(self):
+        input = {'disk1': nt(50, 50, 50), 'disk2': nt(100, 100, 100)}
+        assert wrap_numbers(input, 'disk_io') == input
+        # disk 2 wraps
+        input = {'disk1': nt(50, 50, 50), 'disk2': nt(100, 100, 10)}
+        assert wrap_numbers(input, 'disk_io') == {
+            'disk1': nt(50, 50, 50),
+            'disk2': nt(100, 100, 110),
+        }
+        # disk 2 disappears
+        input = {'disk1': nt(50, 50, 50)}
+        assert wrap_numbers(input, 'disk_io') == input
+
+        # then it appears again; the old wrap is supposed to be
+        # gone.
+        input = {'disk1': nt(50, 50, 50), 'disk2': nt(100, 100, 100)}
+        assert wrap_numbers(input, 'disk_io') == input
+        # remains the same
+        input = {'disk1': nt(50, 50, 50), 'disk2': nt(100, 100, 100)}
+        assert wrap_numbers(input, 'disk_io') == input
+        # and then wraps again
+        input = {'disk1': nt(50, 50, 50), 'disk2': nt(100, 100, 10)}
+        assert wrap_numbers(input, 'disk_io') == {
+            'disk1': nt(50, 50, 50),
+            'disk2': nt(100, 100, 110),
+        }
+
+    def test_real_data(self):
+        d = {
+            'nvme0n1': (300, 508, 640, 1571, 5970, 1987, 2049, 451751, 47048),
+            'nvme0n1p1': (1171, 2, 5600256, 1024, 516, 0, 0, 0, 8),
+            'nvme0n1p2': (54, 54, 2396160, 5165056, 4, 24, 30, 1207, 28),
+            'nvme0n1p3': (2389, 4539, 5154, 150, 4828, 1844, 2019, 398, 348),
+        }
+        assert wrap_numbers(d, 'disk_io') == d
+        assert wrap_numbers(d, 'disk_io') == d
+        # decrease this   ↓
+        d = {
+            'nvme0n1': (100, 508, 640, 1571, 5970, 1987, 2049, 451751, 47048),
+            'nvme0n1p1': (1171, 2, 5600256, 1024, 516, 0, 0, 0, 8),
+            'nvme0n1p2': (54, 54, 2396160, 5165056, 4, 24, 30, 1207, 28),
+            'nvme0n1p3': (2389, 4539, 5154, 150, 4828, 1844, 2019, 398, 348),
+        }
+        out = wrap_numbers(d, 'disk_io')
+        assert out['nvme0n1'][0] == 400
+
+    # --- cache tests
+
+    def test_cache_first_call(self):
+        input = {'disk1': nt(5, 5, 5)}
+        wrap_numbers(input, 'disk_io')
+        cache = wrap_numbers.cache_info()
+        assert cache[0] == {'disk_io': input}
+        assert cache[1] == {'disk_io': {}}
+        assert cache[2] == {'disk_io': {}}
+
+    def test_cache_call_twice(self):
+        input = {'disk1': nt(5, 5, 5)}
+        wrap_numbers(input, 'disk_io')
+        input = {'disk1': nt(10, 10, 10)}
+        wrap_numbers(input, 'disk_io')
+        cache = wrap_numbers.cache_info()
+        assert cache[0] == {'disk_io': input}
+        assert cache[1] == {
+            'disk_io': {('disk1', 0): 0, ('disk1', 1): 0, ('disk1', 2): 0}
+        }
+        assert cache[2] == {'disk_io': {}}
+
+    def test_cache_wrap(self):
+        # let's say 100 is the threshold
+        input = {'disk1': nt(100, 100, 100)}
+        wrap_numbers(input, 'disk_io')
+
+        # first wrap restarts from 10
+        input = {'disk1': nt(100, 100, 10)}
+        wrap_numbers(input, 'disk_io')
+        cache = wrap_numbers.cache_info()
+        assert cache[0] == {'disk_io': input}
+        assert cache[1] == {
+            'disk_io': {('disk1', 0): 0, ('disk1', 1): 0, ('disk1', 2): 100}
+        }
+        assert cache[2] == {'disk_io': {'disk1': set([('disk1', 2)])}}
+
+        def check_cache_info():
+            cache = wrap_numbers.cache_info()
+            assert cache[1] == {
+                'disk_io': {
+                    ('disk1', 0): 0,
+                    ('disk1', 1): 0,
+                    ('disk1', 2): 100,
+                }
+            }
+            assert cache[2] == {'disk_io': {'disk1': set([('disk1', 2)])}}
+
+        # then it remains the same
+        input = {'disk1': nt(100, 100, 10)}
+        wrap_numbers(input, 'disk_io')
+        cache = wrap_numbers.cache_info()
+        assert cache[0] == {'disk_io': input}
+        check_cache_info()
+
+        # then it goes up
+        input = {'disk1': nt(100, 100, 90)}
+        wrap_numbers(input, 'disk_io')
+        cache = wrap_numbers.cache_info()
+        assert cache[0] == {'disk_io': input}
+        check_cache_info()
+
+        # then it wraps again
+        input = {'disk1': nt(100, 100, 20)}
+        wrap_numbers(input, 'disk_io')
+        cache = wrap_numbers.cache_info()
+        assert cache[0] == {'disk_io': input}
+        assert cache[1] == {
+            'disk_io': {('disk1', 0): 0, ('disk1', 1): 0, ('disk1', 2): 190}
+        }
+        assert cache[2] == {'disk_io': {'disk1': set([('disk1', 2)])}}
+
+    def test_cache_changing_keys(self):
+        input = {'disk1': nt(5, 5, 5)}
+        wrap_numbers(input, 'disk_io')
+        input = {'disk1': nt(5, 5, 5), 'disk2': nt(7, 7, 7)}
+        wrap_numbers(input, 'disk_io')
+        cache = wrap_numbers.cache_info()
+        assert cache[0] == {'disk_io': input}
+        assert cache[1] == {
+            'disk_io': {('disk1', 0): 0, ('disk1', 1): 0, ('disk1', 2): 0}
+        }
+        assert cache[2] == {'disk_io': {}}
+
+    def test_cache_clear(self):
+        input = {'disk1': nt(5, 5, 5)}
+        wrap_numbers(input, 'disk_io')
+        wrap_numbers(input, 'disk_io')
+        wrap_numbers.cache_clear('disk_io')
+        assert wrap_numbers.cache_info() == ({}, {}, {})
+        wrap_numbers.cache_clear('disk_io')
+        wrap_numbers.cache_clear('?!?')
+
+    @pytest.mark.skipif(not HAS_NET_IO_COUNTERS, reason="not supported")
+    def test_cache_clear_public_apis(self):
+        if not psutil.disk_io_counters() or not psutil.net_io_counters():
+            raise pytest.skip("no disks or NICs available")
+        psutil.disk_io_counters()
+        psutil.net_io_counters()
+        caches = wrap_numbers.cache_info()
+        for cache in caches:
+            assert 'psutil.disk_io_counters' in cache
+            assert 'psutil.net_io_counters' in cache
+
+        psutil.disk_io_counters.cache_clear()
+        caches = wrap_numbers.cache_info()
+        for cache in caches:
+            assert 'psutil.net_io_counters' in cache
+            assert 'psutil.disk_io_counters' not in cache
+
+        psutil.net_io_counters.cache_clear()
+        caches = wrap_numbers.cache_info()
+        assert caches == ({}, {}, {})
+
+
+# ===================================================================
+# --- Example script tests
+# ===================================================================
+
+
+@pytest.mark.skipif(
+    not os.path.exists(SCRIPTS_DIR), reason="can't locate scripts directory"
+)
+class TestScripts(PsutilTestCase):
+    """Tests for scripts in the "scripts" directory."""
+
+    @staticmethod
+    def assert_stdout(exe, *args, **kwargs):
+        kwargs.setdefault("env", PYTHON_EXE_ENV)
+        exe = '%s' % os.path.join(SCRIPTS_DIR, exe)
+        cmd = [PYTHON_EXE, exe]
+        for arg in args:
+            cmd.append(arg)
+        try:
+            out = sh(cmd, **kwargs).strip()
+        except RuntimeError as err:
+            if 'AccessDenied' in str(err):
+                return str(err)
+            else:
+                raise
+        assert out, out
+        return out
+
+    @staticmethod
+    def assert_syntax(exe):
+        exe = os.path.join(SCRIPTS_DIR, exe)
+        with open(exe, encoding="utf8") if PY3 else open(exe) as f:
+            src = f.read()
+        ast.parse(src)
+
+    def test_coverage(self):
+        # make sure all example scripts have a test method defined
+        meths = dir(self)
+        for name in os.listdir(SCRIPTS_DIR):
+            if name.endswith('.py'):
+                if 'test_' + os.path.splitext(name)[0] not in meths:
+                    # self.assert_stdout(name)
+                    raise self.fail(
+                        'no test defined for %r script'
+                        % os.path.join(SCRIPTS_DIR, name)
+                    )
+
+    @pytest.mark.skipif(not POSIX, reason="POSIX only")
+    def test_executable(self):
+        for root, dirs, files in os.walk(SCRIPTS_DIR):
+            for file in files:
+                if file.endswith('.py'):
+                    path = os.path.join(root, file)
+                    if not stat.S_IXUSR & os.stat(path)[stat.ST_MODE]:
+                        raise self.fail('%r is not executable' % path)
+
+    def test_disk_usage(self):
+        self.assert_stdout('disk_usage.py')
+
+    def test_free(self):
+        self.assert_stdout('free.py')
+
+    def test_meminfo(self):
+        self.assert_stdout('meminfo.py')
+
+    def test_procinfo(self):
+        self.assert_stdout('procinfo.py', str(os.getpid()))
+
+    @pytest.mark.skipif(CI_TESTING and not psutil.users(), reason="no users")
+    def test_who(self):
+        self.assert_stdout('who.py')
+
+    def test_ps(self):
+        self.assert_stdout('ps.py')
+
+    def test_pstree(self):
+        self.assert_stdout('pstree.py')
+
+    def test_netstat(self):
+        self.assert_stdout('netstat.py')
+
+    @pytest.mark.skipif(QEMU_USER, reason="QEMU user not supported")
+    def test_ifconfig(self):
+        self.assert_stdout('ifconfig.py')
+
+    @pytest.mark.skipif(not HAS_MEMORY_MAPS, reason="not supported")
+    def test_pmap(self):
+        self.assert_stdout('pmap.py', str(os.getpid()))
+
+    def test_procsmem(self):
+        if 'uss' not in psutil.Process().memory_full_info()._fields:
+            raise pytest.skip("not supported")
+        self.assert_stdout('procsmem.py')
+
+    def test_killall(self):
+        self.assert_syntax('killall.py')
+
+    def test_nettop(self):
+        self.assert_syntax('nettop.py')
+
+    def test_top(self):
+        self.assert_syntax('top.py')
+
+    def test_iotop(self):
+        self.assert_syntax('iotop.py')
+
+    def test_pidof(self):
+        output = self.assert_stdout('pidof.py', psutil.Process().name())
+        assert str(os.getpid()) in output
+
+    @pytest.mark.skipif(not WINDOWS, reason="WINDOWS only")
+    def test_winservices(self):
+        self.assert_stdout('winservices.py')
+
+    def test_cpu_distribution(self):
+        self.assert_syntax('cpu_distribution.py')
+
+    @pytest.mark.skipif(not HAS_SENSORS_TEMPERATURES, reason="not supported")
+    def test_temperatures(self):
+        if not psutil.sensors_temperatures():
+            raise pytest.skip("no temperatures")
+        self.assert_stdout('temperatures.py')
+
+    @pytest.mark.skipif(not HAS_SENSORS_FANS, reason="not supported")
+    def test_fans(self):
+        if not psutil.sensors_fans():
+            raise pytest.skip("no fans")
+        self.assert_stdout('fans.py')
+
+    @pytest.mark.skipif(not HAS_SENSORS_BATTERY, reason="not supported")
+    @pytest.mark.skipif(not HAS_BATTERY, reason="no battery")
+    def test_battery(self):
+        self.assert_stdout('battery.py')
+
+    @pytest.mark.skipif(not HAS_SENSORS_BATTERY, reason="not supported")
+    @pytest.mark.skipif(not HAS_BATTERY, reason="no battery")
+    def test_sensors(self):
+        self.assert_stdout('sensors.py')