diff options
author | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
---|---|---|
committer | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
commit | 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch) | |
tree | ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/psutil/tests/test_misc.py | |
parent | cc961e04ba734dd72309fb548a2f97d67d578813 (diff) | |
download | gn-ai-master.tar.gz |
Diffstat (limited to '.venv/lib/python3.12/site-packages/psutil/tests/test_misc.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/psutil/tests/test_misc.py | 1058 |
1 files changed, 1058 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/psutil/tests/test_misc.py b/.venv/lib/python3.12/site-packages/psutil/tests/test_misc.py new file mode 100644 index 00000000..cf98f8b4 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/psutil/tests/test_misc.py @@ -0,0 +1,1058 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. + +"""Miscellaneous tests.""" + +import ast +import collections +import errno +import json +import os +import pickle +import socket +import stat +import sys + +import psutil +import psutil.tests +from psutil import POSIX +from psutil import WINDOWS +from psutil._common import bcat +from psutil._common import cat +from psutil._common import debug +from psutil._common import isfile_strict +from psutil._common import memoize +from psutil._common import memoize_when_activated +from psutil._common import parse_environ_block +from psutil._common import supports_ipv6 +from psutil._common import wrap_numbers +from psutil._compat import PY3 +from psutil._compat import FileNotFoundError +from psutil._compat import redirect_stderr +from psutil.tests import CI_TESTING +from psutil.tests import HAS_BATTERY +from psutil.tests import HAS_MEMORY_MAPS +from psutil.tests import HAS_NET_IO_COUNTERS +from psutil.tests import HAS_SENSORS_BATTERY +from psutil.tests import HAS_SENSORS_FANS +from psutil.tests import HAS_SENSORS_TEMPERATURES +from psutil.tests import PYTHON_EXE +from psutil.tests import PYTHON_EXE_ENV +from psutil.tests import QEMU_USER +from psutil.tests import SCRIPTS_DIR +from psutil.tests import PsutilTestCase +from psutil.tests import mock +from psutil.tests import process_namespace +from psutil.tests import pytest +from psutil.tests import reload_module +from psutil.tests import sh +from psutil.tests import system_namespace + + +# =================================================================== +# --- Test classes' repr(), str(), ... +# =================================================================== + + +class TestSpecialMethods(PsutilTestCase): + def test_check_pid_range(self): + with pytest.raises(OverflowError): + psutil._psplatform.cext.check_pid_range(2**128) + with pytest.raises(psutil.NoSuchProcess): + psutil.Process(2**128) + + def test_process__repr__(self, func=repr): + p = psutil.Process(self.spawn_testproc().pid) + r = func(p) + assert "psutil.Process" in r + assert "pid=%s" % p.pid in r + assert "name='%s'" % str(p.name()) in r.replace("name=u'", "name='") + assert "status=" in r + assert "exitcode=" not in r + p.terminate() + p.wait() + r = func(p) + assert "status='terminated'" in r + assert "exitcode=" in r + + with mock.patch.object( + psutil.Process, + "name", + side_effect=psutil.ZombieProcess(os.getpid()), + ): + p = psutil.Process() + r = func(p) + assert "pid=%s" % p.pid in r + assert "status='zombie'" in r + assert "name=" not in r + with mock.patch.object( + psutil.Process, + "name", + side_effect=psutil.NoSuchProcess(os.getpid()), + ): + p = psutil.Process() + r = func(p) + assert "pid=%s" % p.pid in r + assert "terminated" in r + assert "name=" not in r + with mock.patch.object( + psutil.Process, + "name", + side_effect=psutil.AccessDenied(os.getpid()), + ): + p = psutil.Process() + r = func(p) + assert "pid=%s" % p.pid in r + assert "name=" not in r + + def test_process__str__(self): + self.test_process__repr__(func=str) + + def test_error__repr__(self): + assert repr(psutil.Error()) == "psutil.Error()" + + def test_error__str__(self): + assert str(psutil.Error()) == "" # noqa + + def test_no_such_process__repr__(self): + assert ( + repr(psutil.NoSuchProcess(321)) + == "psutil.NoSuchProcess(pid=321, msg='process no longer exists')" + ) + assert ( + repr(psutil.NoSuchProcess(321, name="name", msg="msg")) + == "psutil.NoSuchProcess(pid=321, name='name', msg='msg')" + ) + + def test_no_such_process__str__(self): + assert ( + str(psutil.NoSuchProcess(321)) + == "process no longer exists (pid=321)" + ) + assert ( + str(psutil.NoSuchProcess(321, name="name", msg="msg")) + == "msg (pid=321, name='name')" + ) + + def test_zombie_process__repr__(self): + assert ( + repr(psutil.ZombieProcess(321)) + == 'psutil.ZombieProcess(pid=321, msg="PID still ' + 'exists but it\'s a zombie")' + ) + assert ( + repr(psutil.ZombieProcess(321, name="name", ppid=320, msg="foo")) + == "psutil.ZombieProcess(pid=321, ppid=320, name='name'," + " msg='foo')" + ) + + def test_zombie_process__str__(self): + assert ( + str(psutil.ZombieProcess(321)) + == "PID still exists but it's a zombie (pid=321)" + ) + assert ( + str(psutil.ZombieProcess(321, name="name", ppid=320, msg="foo")) + == "foo (pid=321, ppid=320, name='name')" + ) + + def test_access_denied__repr__(self): + assert repr(psutil.AccessDenied(321)) == "psutil.AccessDenied(pid=321)" + assert ( + repr(psutil.AccessDenied(321, name="name", msg="msg")) + == "psutil.AccessDenied(pid=321, name='name', msg='msg')" + ) + + def test_access_denied__str__(self): + assert str(psutil.AccessDenied(321)) == "(pid=321)" + assert ( + str(psutil.AccessDenied(321, name="name", msg="msg")) + == "msg (pid=321, name='name')" + ) + + def test_timeout_expired__repr__(self): + assert ( + repr(psutil.TimeoutExpired(5)) + == "psutil.TimeoutExpired(seconds=5, msg='timeout after 5" + " seconds')" + ) + assert ( + repr(psutil.TimeoutExpired(5, pid=321, name="name")) + == "psutil.TimeoutExpired(pid=321, name='name', seconds=5, " + "msg='timeout after 5 seconds')" + ) + + def test_timeout_expired__str__(self): + assert str(psutil.TimeoutExpired(5)) == "timeout after 5 seconds" + assert ( + str(psutil.TimeoutExpired(5, pid=321, name="name")) + == "timeout after 5 seconds (pid=321, name='name')" + ) + + def test_process__eq__(self): + p1 = psutil.Process() + p2 = psutil.Process() + assert p1 == p2 + p2._ident = (0, 0) + assert p1 != p2 + assert p1 != 'foo' + + def test_process__hash__(self): + s = set([psutil.Process(), psutil.Process()]) + assert len(s) == 1 + + +# =================================================================== +# --- Misc, generic, corner cases +# =================================================================== + + +class TestMisc(PsutilTestCase): + def test__all__(self): + dir_psutil = dir(psutil) + for name in dir_psutil: + if name in { + 'debug', + 'long', + 'tests', + 'test', + 'PermissionError', + 'ProcessLookupError', + }: + continue + if not name.startswith('_'): + try: + __import__(name) + except ImportError: + if name not in psutil.__all__: + fun = getattr(psutil, name) + if fun is None: + continue + if ( + fun.__doc__ is not None + and 'deprecated' not in fun.__doc__.lower() + ): + raise self.fail('%r not in psutil.__all__' % name) + + # Import 'star' will break if __all__ is inconsistent, see: + # https://github.com/giampaolo/psutil/issues/656 + # Can't do `from psutil import *` as it won't work on python 3 + # so we simply iterate over __all__. + for name in psutil.__all__: + assert name in dir_psutil + + def test_version(self): + assert ( + '.'.join([str(x) for x in psutil.version_info]) + == psutil.__version__ + ) + + def test_process_as_dict_no_new_names(self): + # See https://github.com/giampaolo/psutil/issues/813 + p = psutil.Process() + p.foo = '1' + assert 'foo' not in p.as_dict() + + def test_serialization(self): + def check(ret): + json.loads(json.dumps(ret)) + + a = pickle.dumps(ret) + b = pickle.loads(a) + assert ret == b + + # --- process APIs + + proc = psutil.Process() + check(psutil.Process().as_dict()) + + ns = process_namespace(proc) + for fun, name in ns.iter(ns.getters, clear_cache=True): + with self.subTest(proc=proc, name=name): + try: + ret = fun() + except psutil.Error: + pass + else: + check(ret) + + # --- system APIs + + ns = system_namespace() + for fun, name in ns.iter(ns.getters): + if name in {"win_service_iter", "win_service_get"}: + continue + if QEMU_USER and name == "net_if_stats": + # OSError: [Errno 38] ioctl(SIOCETHTOOL) not implemented + continue + with self.subTest(name=name): + try: + ret = fun() + except psutil.AccessDenied: + pass + else: + check(ret) + + # --- exception classes + + b = pickle.loads( + pickle.dumps( + psutil.NoSuchProcess(pid=4567, name='name', msg='msg') + ) + ) + assert isinstance(b, psutil.NoSuchProcess) + assert b.pid == 4567 + assert b.name == 'name' + assert b.msg == 'msg' + + b = pickle.loads( + pickle.dumps( + psutil.ZombieProcess(pid=4567, name='name', ppid=42, msg='msg') + ) + ) + assert isinstance(b, psutil.ZombieProcess) + assert b.pid == 4567 + assert b.ppid == 42 + assert b.name == 'name' + assert b.msg == 'msg' + + b = pickle.loads( + pickle.dumps(psutil.AccessDenied(pid=123, name='name', msg='msg')) + ) + assert isinstance(b, psutil.AccessDenied) + assert b.pid == 123 + assert b.name == 'name' + assert b.msg == 'msg' + + b = pickle.loads( + pickle.dumps( + psutil.TimeoutExpired(seconds=33, pid=4567, name='name') + ) + ) + assert isinstance(b, psutil.TimeoutExpired) + assert b.seconds == 33 + assert b.pid == 4567 + assert b.name == 'name' + + # # XXX: https://github.com/pypa/setuptools/pull/2896 + # @pytest.mark.skipif(APPVEYOR, + # reason="temporarily disabled due to setuptools bug" + # ) + # def test_setup_script(self): + # setup_py = os.path.join(ROOT_DIR, 'setup.py') + # if CI_TESTING and not os.path.exists(setup_py): + # raise pytest.skip("can't find setup.py") + # module = import_module_by_path(setup_py) + # self.assertRaises(SystemExit, module.setup) + # self.assertEqual(module.get_version(), psutil.__version__) + + def test_ad_on_process_creation(self): + # We are supposed to be able to instantiate Process also in case + # of zombie processes or access denied. + with mock.patch.object( + psutil.Process, '_get_ident', side_effect=psutil.AccessDenied + ) as meth: + psutil.Process() + assert meth.called + + with mock.patch.object( + psutil.Process, '_get_ident', side_effect=psutil.ZombieProcess(1) + ) as meth: + psutil.Process() + assert meth.called + + with mock.patch.object( + psutil.Process, '_get_ident', side_effect=ValueError + ) as meth: + with pytest.raises(ValueError): + psutil.Process() + assert meth.called + + with mock.patch.object( + psutil.Process, '_get_ident', side_effect=psutil.NoSuchProcess(1) + ) as meth: + with self.assertRaises(psutil.NoSuchProcess): + psutil.Process() + assert meth.called + + def test_sanity_version_check(self): + # see: https://github.com/giampaolo/psutil/issues/564 + with mock.patch( + "psutil._psplatform.cext.version", return_value="0.0.0" + ): + with pytest.raises(ImportError) as cm: + reload_module(psutil) + assert "version conflict" in str(cm.value).lower() + + +# =================================================================== +# --- psutil/_common.py utils +# =================================================================== + + +class TestMemoizeDecorator(PsutilTestCase): + def setUp(self): + self.calls = [] + + tearDown = setUp + + def run_against(self, obj, expected_retval=None): + # no args + for _ in range(2): + ret = obj() + assert self.calls == [((), {})] + if expected_retval is not None: + assert ret == expected_retval + # with args + for _ in range(2): + ret = obj(1) + assert self.calls == [((), {}), ((1,), {})] + if expected_retval is not None: + assert ret == expected_retval + # with args + kwargs + for _ in range(2): + ret = obj(1, bar=2) + assert self.calls == [((), {}), ((1,), {}), ((1,), {'bar': 2})] + if expected_retval is not None: + assert ret == expected_retval + # clear cache + assert len(self.calls) == 3 + obj.cache_clear() + ret = obj() + if expected_retval is not None: + assert ret == expected_retval + assert len(self.calls) == 4 + # docstring + assert obj.__doc__ == "My docstring." + + def test_function(self): + @memoize + def foo(*args, **kwargs): + """My docstring.""" + baseclass.calls.append((args, kwargs)) + return 22 + + baseclass = self + self.run_against(foo, expected_retval=22) + + def test_class(self): + @memoize + class Foo: + """My docstring.""" + + def __init__(self, *args, **kwargs): + baseclass.calls.append((args, kwargs)) + + def bar(self): + return 22 + + baseclass = self + self.run_against(Foo, expected_retval=None) + assert Foo().bar() == 22 + + def test_class_singleton(self): + # @memoize can be used against classes to create singletons + @memoize + class Bar: + def __init__(self, *args, **kwargs): + pass + + assert Bar() is Bar() + assert id(Bar()) == id(Bar()) + assert id(Bar(1)) == id(Bar(1)) + assert id(Bar(1, foo=3)) == id(Bar(1, foo=3)) + assert id(Bar(1)) != id(Bar(2)) + + def test_staticmethod(self): + class Foo: + @staticmethod + @memoize + def bar(*args, **kwargs): + """My docstring.""" + baseclass.calls.append((args, kwargs)) + return 22 + + baseclass = self + self.run_against(Foo().bar, expected_retval=22) + + def test_classmethod(self): + class Foo: + @classmethod + @memoize + def bar(cls, *args, **kwargs): + """My docstring.""" + baseclass.calls.append((args, kwargs)) + return 22 + + baseclass = self + self.run_against(Foo().bar, expected_retval=22) + + def test_original(self): + # This was the original test before I made it dynamic to test it + # against different types. Keeping it anyway. + @memoize + def foo(*args, **kwargs): + """Foo docstring.""" + calls.append(None) + return (args, kwargs) + + calls = [] + # no args + for _ in range(2): + ret = foo() + expected = ((), {}) + assert ret == expected + assert len(calls) == 1 + # with args + for _ in range(2): + ret = foo(1) + expected = ((1,), {}) + assert ret == expected + assert len(calls) == 2 + # with args + kwargs + for _ in range(2): + ret = foo(1, bar=2) + expected = ((1,), {'bar': 2}) + assert ret == expected + assert len(calls) == 3 + # clear cache + foo.cache_clear() + ret = foo() + expected = ((), {}) + assert ret == expected + assert len(calls) == 4 + # docstring + assert foo.__doc__ == "Foo docstring." + + +class TestCommonModule(PsutilTestCase): + def test_memoize_when_activated(self): + class Foo: + @memoize_when_activated + def foo(self): + calls.append(None) + + f = Foo() + calls = [] + f.foo() + f.foo() + assert len(calls) == 2 + + # activate + calls = [] + f.foo.cache_activate(f) + f.foo() + f.foo() + assert len(calls) == 1 + + # deactivate + calls = [] + f.foo.cache_deactivate(f) + f.foo() + f.foo() + assert len(calls) == 2 + + def test_parse_environ_block(self): + def k(s): + return s.upper() if WINDOWS else s + + assert parse_environ_block("a=1\0") == {k("a"): "1"} + assert parse_environ_block("a=1\0b=2\0\0") == { + k("a"): "1", + k("b"): "2", + } + assert parse_environ_block("a=1\0b=\0\0") == {k("a"): "1", k("b"): ""} + # ignore everything after \0\0 + assert parse_environ_block("a=1\0b=2\0\0c=3\0") == { + k("a"): "1", + k("b"): "2", + } + # ignore everything that is not an assignment + assert parse_environ_block("xxx\0a=1\0") == {k("a"): "1"} + assert parse_environ_block("a=1\0=b=2\0") == {k("a"): "1"} + # do not fail if the block is incomplete + assert parse_environ_block("a=1\0b=2") == {k("a"): "1"} + + def test_supports_ipv6(self): + self.addCleanup(supports_ipv6.cache_clear) + if supports_ipv6(): + with mock.patch('psutil._common.socket') as s: + s.has_ipv6 = False + supports_ipv6.cache_clear() + assert not supports_ipv6() + + supports_ipv6.cache_clear() + with mock.patch( + 'psutil._common.socket.socket', side_effect=socket.error + ) as s: + assert not supports_ipv6() + assert s.called + + supports_ipv6.cache_clear() + with mock.patch( + 'psutil._common.socket.socket', side_effect=socket.gaierror + ) as s: + assert not supports_ipv6() + supports_ipv6.cache_clear() + assert s.called + + supports_ipv6.cache_clear() + with mock.patch( + 'psutil._common.socket.socket.bind', + side_effect=socket.gaierror, + ) as s: + assert not supports_ipv6() + supports_ipv6.cache_clear() + assert s.called + else: + with pytest.raises(socket.error): + sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) + try: + sock.bind(("::1", 0)) + finally: + sock.close() + + def test_isfile_strict(self): + this_file = os.path.abspath(__file__) + assert isfile_strict(this_file) + assert not isfile_strict(os.path.dirname(this_file)) + with mock.patch( + 'psutil._common.os.stat', side_effect=OSError(errno.EPERM, "foo") + ): + with pytest.raises(OSError): + isfile_strict(this_file) + with mock.patch( + 'psutil._common.os.stat', side_effect=OSError(errno.EACCES, "foo") + ): + with pytest.raises(OSError): + isfile_strict(this_file) + with mock.patch( + 'psutil._common.os.stat', side_effect=OSError(errno.ENOENT, "foo") + ): + assert not isfile_strict(this_file) + with mock.patch('psutil._common.stat.S_ISREG', return_value=False): + assert not isfile_strict(this_file) + + def test_debug(self): + if PY3: + from io import StringIO + else: + from StringIO import StringIO + + with mock.patch.object(psutil._common, "PSUTIL_DEBUG", True): + with redirect_stderr(StringIO()) as f: + debug("hello") + sys.stderr.flush() + msg = f.getvalue() + assert msg.startswith("psutil-debug"), msg + assert "hello" in msg + assert __file__.replace('.pyc', '.py') in msg + + # supposed to use repr(exc) + with mock.patch.object(psutil._common, "PSUTIL_DEBUG", True): + with redirect_stderr(StringIO()) as f: + debug(ValueError("this is an error")) + msg = f.getvalue() + assert "ignoring ValueError" in msg + assert "'this is an error'" in msg + + # supposed to use str(exc), because of extra info about file name + with mock.patch.object(psutil._common, "PSUTIL_DEBUG", True): + with redirect_stderr(StringIO()) as f: + exc = OSError(2, "no such file") + exc.filename = "/foo" + debug(exc) + msg = f.getvalue() + assert "no such file" in msg + assert "/foo" in msg + + def test_cat_bcat(self): + testfn = self.get_testfn() + with open(testfn, "w") as f: + f.write("foo") + assert cat(testfn) == "foo" + assert bcat(testfn) == b"foo" + with pytest.raises(FileNotFoundError): + cat(testfn + '-invalid') + with pytest.raises(FileNotFoundError): + bcat(testfn + '-invalid') + assert cat(testfn + '-invalid', fallback="bar") == "bar" + assert bcat(testfn + '-invalid', fallback="bar") == "bar" + + +# =================================================================== +# --- Tests for wrap_numbers() function. +# =================================================================== + + +nt = collections.namedtuple('foo', 'a b c') + + +class TestWrapNumbers(PsutilTestCase): + def setUp(self): + wrap_numbers.cache_clear() + + tearDown = setUp + + def test_first_call(self): + input = {'disk1': nt(5, 5, 5)} + assert wrap_numbers(input, 'disk_io') == input + + def test_input_hasnt_changed(self): + input = {'disk1': nt(5, 5, 5)} + assert wrap_numbers(input, 'disk_io') == input + assert wrap_numbers(input, 'disk_io') == input + + def test_increase_but_no_wrap(self): + input = {'disk1': nt(5, 5, 5)} + assert wrap_numbers(input, 'disk_io') == input + input = {'disk1': nt(10, 15, 20)} + assert wrap_numbers(input, 'disk_io') == input + input = {'disk1': nt(20, 25, 30)} + assert wrap_numbers(input, 'disk_io') == input + input = {'disk1': nt(20, 25, 30)} + assert wrap_numbers(input, 'disk_io') == input + + def test_wrap(self): + # let's say 100 is the threshold + input = {'disk1': nt(100, 100, 100)} + assert wrap_numbers(input, 'disk_io') == input + # first wrap restarts from 10 + input = {'disk1': nt(100, 100, 10)} + assert wrap_numbers(input, 'disk_io') == {'disk1': nt(100, 100, 110)} + # then it remains the same + input = {'disk1': nt(100, 100, 10)} + assert wrap_numbers(input, 'disk_io') == {'disk1': nt(100, 100, 110)} + # then it goes up + input = {'disk1': nt(100, 100, 90)} + assert wrap_numbers(input, 'disk_io') == {'disk1': nt(100, 100, 190)} + # then it wraps again + input = {'disk1': nt(100, 100, 20)} + assert wrap_numbers(input, 'disk_io') == {'disk1': nt(100, 100, 210)} + # and remains the same + input = {'disk1': nt(100, 100, 20)} + assert wrap_numbers(input, 'disk_io') == {'disk1': nt(100, 100, 210)} + # now wrap another num + input = {'disk1': nt(50, 100, 20)} + assert wrap_numbers(input, 'disk_io') == {'disk1': nt(150, 100, 210)} + # and again + input = {'disk1': nt(40, 100, 20)} + assert wrap_numbers(input, 'disk_io') == {'disk1': nt(190, 100, 210)} + # keep it the same + input = {'disk1': nt(40, 100, 20)} + assert wrap_numbers(input, 'disk_io') == {'disk1': nt(190, 100, 210)} + + def test_changing_keys(self): + # Emulate a case where the second call to disk_io() + # (or whatever) provides a new disk, then the new disk + # disappears on the third call. + input = {'disk1': nt(5, 5, 5)} + assert wrap_numbers(input, 'disk_io') == input + input = {'disk1': nt(5, 5, 5), 'disk2': nt(7, 7, 7)} + assert wrap_numbers(input, 'disk_io') == input + input = {'disk1': nt(8, 8, 8)} + assert wrap_numbers(input, 'disk_io') == input + + def test_changing_keys_w_wrap(self): + input = {'disk1': nt(50, 50, 50), 'disk2': nt(100, 100, 100)} + assert wrap_numbers(input, 'disk_io') == input + # disk 2 wraps + input = {'disk1': nt(50, 50, 50), 'disk2': nt(100, 100, 10)} + assert wrap_numbers(input, 'disk_io') == { + 'disk1': nt(50, 50, 50), + 'disk2': nt(100, 100, 110), + } + # disk 2 disappears + input = {'disk1': nt(50, 50, 50)} + assert wrap_numbers(input, 'disk_io') == input + + # then it appears again; the old wrap is supposed to be + # gone. + input = {'disk1': nt(50, 50, 50), 'disk2': nt(100, 100, 100)} + assert wrap_numbers(input, 'disk_io') == input + # remains the same + input = {'disk1': nt(50, 50, 50), 'disk2': nt(100, 100, 100)} + assert wrap_numbers(input, 'disk_io') == input + # and then wraps again + input = {'disk1': nt(50, 50, 50), 'disk2': nt(100, 100, 10)} + assert wrap_numbers(input, 'disk_io') == { + 'disk1': nt(50, 50, 50), + 'disk2': nt(100, 100, 110), + } + + def test_real_data(self): + d = { + 'nvme0n1': (300, 508, 640, 1571, 5970, 1987, 2049, 451751, 47048), + 'nvme0n1p1': (1171, 2, 5600256, 1024, 516, 0, 0, 0, 8), + 'nvme0n1p2': (54, 54, 2396160, 5165056, 4, 24, 30, 1207, 28), + 'nvme0n1p3': (2389, 4539, 5154, 150, 4828, 1844, 2019, 398, 348), + } + assert wrap_numbers(d, 'disk_io') == d + assert wrap_numbers(d, 'disk_io') == d + # decrease this ↓ + d = { + 'nvme0n1': (100, 508, 640, 1571, 5970, 1987, 2049, 451751, 47048), + 'nvme0n1p1': (1171, 2, 5600256, 1024, 516, 0, 0, 0, 8), + 'nvme0n1p2': (54, 54, 2396160, 5165056, 4, 24, 30, 1207, 28), + 'nvme0n1p3': (2389, 4539, 5154, 150, 4828, 1844, 2019, 398, 348), + } + out = wrap_numbers(d, 'disk_io') + assert out['nvme0n1'][0] == 400 + + # --- cache tests + + def test_cache_first_call(self): + input = {'disk1': nt(5, 5, 5)} + wrap_numbers(input, 'disk_io') + cache = wrap_numbers.cache_info() + assert cache[0] == {'disk_io': input} + assert cache[1] == {'disk_io': {}} + assert cache[2] == {'disk_io': {}} + + def test_cache_call_twice(self): + input = {'disk1': nt(5, 5, 5)} + wrap_numbers(input, 'disk_io') + input = {'disk1': nt(10, 10, 10)} + wrap_numbers(input, 'disk_io') + cache = wrap_numbers.cache_info() + assert cache[0] == {'disk_io': input} + assert cache[1] == { + 'disk_io': {('disk1', 0): 0, ('disk1', 1): 0, ('disk1', 2): 0} + } + assert cache[2] == {'disk_io': {}} + + def test_cache_wrap(self): + # let's say 100 is the threshold + input = {'disk1': nt(100, 100, 100)} + wrap_numbers(input, 'disk_io') + + # first wrap restarts from 10 + input = {'disk1': nt(100, 100, 10)} + wrap_numbers(input, 'disk_io') + cache = wrap_numbers.cache_info() + assert cache[0] == {'disk_io': input} + assert cache[1] == { + 'disk_io': {('disk1', 0): 0, ('disk1', 1): 0, ('disk1', 2): 100} + } + assert cache[2] == {'disk_io': {'disk1': set([('disk1', 2)])}} + + def check_cache_info(): + cache = wrap_numbers.cache_info() + assert cache[1] == { + 'disk_io': { + ('disk1', 0): 0, + ('disk1', 1): 0, + ('disk1', 2): 100, + } + } + assert cache[2] == {'disk_io': {'disk1': set([('disk1', 2)])}} + + # then it remains the same + input = {'disk1': nt(100, 100, 10)} + wrap_numbers(input, 'disk_io') + cache = wrap_numbers.cache_info() + assert cache[0] == {'disk_io': input} + check_cache_info() + + # then it goes up + input = {'disk1': nt(100, 100, 90)} + wrap_numbers(input, 'disk_io') + cache = wrap_numbers.cache_info() + assert cache[0] == {'disk_io': input} + check_cache_info() + + # then it wraps again + input = {'disk1': nt(100, 100, 20)} + wrap_numbers(input, 'disk_io') + cache = wrap_numbers.cache_info() + assert cache[0] == {'disk_io': input} + assert cache[1] == { + 'disk_io': {('disk1', 0): 0, ('disk1', 1): 0, ('disk1', 2): 190} + } + assert cache[2] == {'disk_io': {'disk1': set([('disk1', 2)])}} + + def test_cache_changing_keys(self): + input = {'disk1': nt(5, 5, 5)} + wrap_numbers(input, 'disk_io') + input = {'disk1': nt(5, 5, 5), 'disk2': nt(7, 7, 7)} + wrap_numbers(input, 'disk_io') + cache = wrap_numbers.cache_info() + assert cache[0] == {'disk_io': input} + assert cache[1] == { + 'disk_io': {('disk1', 0): 0, ('disk1', 1): 0, ('disk1', 2): 0} + } + assert cache[2] == {'disk_io': {}} + + def test_cache_clear(self): + input = {'disk1': nt(5, 5, 5)} + wrap_numbers(input, 'disk_io') + wrap_numbers(input, 'disk_io') + wrap_numbers.cache_clear('disk_io') + assert wrap_numbers.cache_info() == ({}, {}, {}) + wrap_numbers.cache_clear('disk_io') + wrap_numbers.cache_clear('?!?') + + @pytest.mark.skipif(not HAS_NET_IO_COUNTERS, reason="not supported") + def test_cache_clear_public_apis(self): + if not psutil.disk_io_counters() or not psutil.net_io_counters(): + raise pytest.skip("no disks or NICs available") + psutil.disk_io_counters() + psutil.net_io_counters() + caches = wrap_numbers.cache_info() + for cache in caches: + assert 'psutil.disk_io_counters' in cache + assert 'psutil.net_io_counters' in cache + + psutil.disk_io_counters.cache_clear() + caches = wrap_numbers.cache_info() + for cache in caches: + assert 'psutil.net_io_counters' in cache + assert 'psutil.disk_io_counters' not in cache + + psutil.net_io_counters.cache_clear() + caches = wrap_numbers.cache_info() + assert caches == ({}, {}, {}) + + +# =================================================================== +# --- Example script tests +# =================================================================== + + +@pytest.mark.skipif( + not os.path.exists(SCRIPTS_DIR), reason="can't locate scripts directory" +) +class TestScripts(PsutilTestCase): + """Tests for scripts in the "scripts" directory.""" + + @staticmethod + def assert_stdout(exe, *args, **kwargs): + kwargs.setdefault("env", PYTHON_EXE_ENV) + exe = '%s' % os.path.join(SCRIPTS_DIR, exe) + cmd = [PYTHON_EXE, exe] + for arg in args: + cmd.append(arg) + try: + out = sh(cmd, **kwargs).strip() + except RuntimeError as err: + if 'AccessDenied' in str(err): + return str(err) + else: + raise + assert out, out + return out + + @staticmethod + def assert_syntax(exe): + exe = os.path.join(SCRIPTS_DIR, exe) + with open(exe, encoding="utf8") if PY3 else open(exe) as f: + src = f.read() + ast.parse(src) + + def test_coverage(self): + # make sure all example scripts have a test method defined + meths = dir(self) + for name in os.listdir(SCRIPTS_DIR): + if name.endswith('.py'): + if 'test_' + os.path.splitext(name)[0] not in meths: + # self.assert_stdout(name) + raise self.fail( + 'no test defined for %r script' + % os.path.join(SCRIPTS_DIR, name) + ) + + @pytest.mark.skipif(not POSIX, reason="POSIX only") + def test_executable(self): + for root, dirs, files in os.walk(SCRIPTS_DIR): + for file in files: + if file.endswith('.py'): + path = os.path.join(root, file) + if not stat.S_IXUSR & os.stat(path)[stat.ST_MODE]: + raise self.fail('%r is not executable' % path) + + def test_disk_usage(self): + self.assert_stdout('disk_usage.py') + + def test_free(self): + self.assert_stdout('free.py') + + def test_meminfo(self): + self.assert_stdout('meminfo.py') + + def test_procinfo(self): + self.assert_stdout('procinfo.py', str(os.getpid())) + + @pytest.mark.skipif(CI_TESTING and not psutil.users(), reason="no users") + def test_who(self): + self.assert_stdout('who.py') + + def test_ps(self): + self.assert_stdout('ps.py') + + def test_pstree(self): + self.assert_stdout('pstree.py') + + def test_netstat(self): + self.assert_stdout('netstat.py') + + @pytest.mark.skipif(QEMU_USER, reason="QEMU user not supported") + def test_ifconfig(self): + self.assert_stdout('ifconfig.py') + + @pytest.mark.skipif(not HAS_MEMORY_MAPS, reason="not supported") + def test_pmap(self): + self.assert_stdout('pmap.py', str(os.getpid())) + + def test_procsmem(self): + if 'uss' not in psutil.Process().memory_full_info()._fields: + raise pytest.skip("not supported") + self.assert_stdout('procsmem.py') + + def test_killall(self): + self.assert_syntax('killall.py') + + def test_nettop(self): + self.assert_syntax('nettop.py') + + def test_top(self): + self.assert_syntax('top.py') + + def test_iotop(self): + self.assert_syntax('iotop.py') + + def test_pidof(self): + output = self.assert_stdout('pidof.py', psutil.Process().name()) + assert str(os.getpid()) in output + + @pytest.mark.skipif(not WINDOWS, reason="WINDOWS only") + def test_winservices(self): + self.assert_stdout('winservices.py') + + def test_cpu_distribution(self): + self.assert_syntax('cpu_distribution.py') + + @pytest.mark.skipif(not HAS_SENSORS_TEMPERATURES, reason="not supported") + def test_temperatures(self): + if not psutil.sensors_temperatures(): + raise pytest.skip("no temperatures") + self.assert_stdout('temperatures.py') + + @pytest.mark.skipif(not HAS_SENSORS_FANS, reason="not supported") + def test_fans(self): + if not psutil.sensors_fans(): + raise pytest.skip("no fans") + self.assert_stdout('fans.py') + + @pytest.mark.skipif(not HAS_SENSORS_BATTERY, reason="not supported") + @pytest.mark.skipif(not HAS_BATTERY, reason="no battery") + def test_battery(self): + self.assert_stdout('battery.py') + + @pytest.mark.skipif(not HAS_SENSORS_BATTERY, reason="not supported") + @pytest.mark.skipif(not HAS_BATTERY, reason="no battery") + def test_sensors(self): + self.assert_stdout('sensors.py') |