about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/psutil/tests/test_connections.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/psutil/tests/test_connections.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-4a52a71956a8d46fcb7294ac71734504bb09bcc2.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/psutil/tests/test_connections.py')
-rw-r--r--.venv/lib/python3.12/site-packages/psutil/tests/test_connections.py567
1 files changed, 567 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/psutil/tests/test_connections.py b/.venv/lib/python3.12/site-packages/psutil/tests/test_connections.py
new file mode 100644
index 00000000..bca12ff4
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/psutil/tests/test_connections.py
@@ -0,0 +1,567 @@
+#!/usr/bin/env python3
+
+# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Tests for psutil.net_connections() and Process.net_connections() APIs."""
+
+import os
+import socket
+import textwrap
+from contextlib import closing
+from socket import AF_INET
+from socket import AF_INET6
+from socket import SOCK_DGRAM
+from socket import SOCK_STREAM
+
+import psutil
+from psutil import FREEBSD
+from psutil import LINUX
+from psutil import MACOS
+from psutil import NETBSD
+from psutil import OPENBSD
+from psutil import POSIX
+from psutil import SUNOS
+from psutil import WINDOWS
+from psutil._common import supports_ipv6
+from psutil._compat import PY3
+from psutil.tests import AF_UNIX
+from psutil.tests import HAS_NET_CONNECTIONS_UNIX
+from psutil.tests import SKIP_SYSCONS
+from psutil.tests import PsutilTestCase
+from psutil.tests import bind_socket
+from psutil.tests import bind_unix_socket
+from psutil.tests import check_connection_ntuple
+from psutil.tests import create_sockets
+from psutil.tests import filter_proc_net_connections
+from psutil.tests import pytest
+from psutil.tests import reap_children
+from psutil.tests import retry_on_failure
+from psutil.tests import skip_on_access_denied
+from psutil.tests import tcp_socketpair
+from psutil.tests import unix_socketpair
+from psutil.tests import wait_for_file
+
+
+SOCK_SEQPACKET = getattr(socket, "SOCK_SEQPACKET", object())
+
+
+def this_proc_net_connections(kind):
+    cons = psutil.Process().net_connections(kind=kind)
+    if kind in {"all", "unix"}:
+        return filter_proc_net_connections(cons)
+    return cons
+
+
+@pytest.mark.xdist_group(name="serial")
+class ConnectionTestCase(PsutilTestCase):
+    def setUp(self):
+        assert this_proc_net_connections(kind='all') == []
+
+    def tearDown(self):
+        # Make sure we closed all resources.
+        assert this_proc_net_connections(kind='all') == []
+
+    def compare_procsys_connections(self, pid, proc_cons, kind='all'):
+        """Given a process PID and its list of connections compare
+        those against system-wide connections retrieved via
+        psutil.net_connections.
+        """
+        try:
+            sys_cons = psutil.net_connections(kind=kind)
+        except psutil.AccessDenied:
+            # On MACOS, system-wide connections are retrieved by iterating
+            # over all processes
+            if MACOS:
+                return
+            else:
+                raise
+        # Filter for this proc PID and exlucde PIDs from the tuple.
+        sys_cons = [c[:-1] for c in sys_cons if c.pid == pid]
+        sys_cons.sort()
+        proc_cons.sort()
+        assert proc_cons == sys_cons
+
+
+class TestBasicOperations(ConnectionTestCase):
+    @pytest.mark.skipif(SKIP_SYSCONS, reason="requires root")
+    def test_system(self):
+        with create_sockets():
+            for conn in psutil.net_connections(kind='all'):
+                check_connection_ntuple(conn)
+
+    def test_process(self):
+        with create_sockets():
+            for conn in this_proc_net_connections(kind='all'):
+                check_connection_ntuple(conn)
+
+    def test_invalid_kind(self):
+        with pytest.raises(ValueError):
+            this_proc_net_connections(kind='???')
+        with pytest.raises(ValueError):
+            psutil.net_connections(kind='???')
+
+
+@pytest.mark.xdist_group(name="serial")
+class TestUnconnectedSockets(ConnectionTestCase):
+    """Tests sockets which are open but not connected to anything."""
+
+    def get_conn_from_sock(self, sock):
+        cons = this_proc_net_connections(kind='all')
+        smap = dict([(c.fd, c) for c in cons])
+        if NETBSD or FREEBSD:
+            # NetBSD opens a UNIX socket to /var/log/run
+            # so there may be more connections.
+            return smap[sock.fileno()]
+        else:
+            assert len(cons) == 1
+            if cons[0].fd != -1:
+                assert smap[sock.fileno()].fd == sock.fileno()
+            return cons[0]
+
+    def check_socket(self, sock):
+        """Given a socket, makes sure it matches the one obtained
+        via psutil. It assumes this process created one connection
+        only (the one supposed to be checked).
+        """
+        conn = self.get_conn_from_sock(sock)
+        check_connection_ntuple(conn)
+
+        # fd, family, type
+        if conn.fd != -1:
+            assert conn.fd == sock.fileno()
+        assert conn.family == sock.family
+        # see: http://bugs.python.org/issue30204
+        assert conn.type == sock.getsockopt(socket.SOL_SOCKET, socket.SO_TYPE)
+
+        # local address
+        laddr = sock.getsockname()
+        if not laddr and PY3 and isinstance(laddr, bytes):
+            # See: http://bugs.python.org/issue30205
+            laddr = laddr.decode()
+        if sock.family == AF_INET6:
+            laddr = laddr[:2]
+        assert conn.laddr == laddr
+
+        # XXX Solaris can't retrieve system-wide UNIX sockets
+        if sock.family == AF_UNIX and HAS_NET_CONNECTIONS_UNIX:
+            cons = this_proc_net_connections(kind='all')
+            self.compare_procsys_connections(os.getpid(), cons, kind='all')
+        return conn
+
+    def test_tcp_v4(self):
+        addr = ("127.0.0.1", 0)
+        with closing(bind_socket(AF_INET, SOCK_STREAM, addr=addr)) as sock:
+            conn = self.check_socket(sock)
+            assert conn.raddr == ()
+            assert conn.status == psutil.CONN_LISTEN
+
+    @pytest.mark.skipif(not supports_ipv6(), reason="IPv6 not supported")
+    def test_tcp_v6(self):
+        addr = ("::1", 0)
+        with closing(bind_socket(AF_INET6, SOCK_STREAM, addr=addr)) as sock:
+            conn = self.check_socket(sock)
+            assert conn.raddr == ()
+            assert conn.status == psutil.CONN_LISTEN
+
+    def test_udp_v4(self):
+        addr = ("127.0.0.1", 0)
+        with closing(bind_socket(AF_INET, SOCK_DGRAM, addr=addr)) as sock:
+            conn = self.check_socket(sock)
+            assert conn.raddr == ()
+            assert conn.status == psutil.CONN_NONE
+
+    @pytest.mark.skipif(not supports_ipv6(), reason="IPv6 not supported")
+    def test_udp_v6(self):
+        addr = ("::1", 0)
+        with closing(bind_socket(AF_INET6, SOCK_DGRAM, addr=addr)) as sock:
+            conn = self.check_socket(sock)
+            assert conn.raddr == ()
+            assert conn.status == psutil.CONN_NONE
+
+    @pytest.mark.skipif(not POSIX, reason="POSIX only")
+    def test_unix_tcp(self):
+        testfn = self.get_testfn()
+        with closing(bind_unix_socket(testfn, type=SOCK_STREAM)) as sock:
+            conn = self.check_socket(sock)
+            assert conn.raddr == ""  # noqa
+            assert conn.status == psutil.CONN_NONE
+
+    @pytest.mark.skipif(not POSIX, reason="POSIX only")
+    def test_unix_udp(self):
+        testfn = self.get_testfn()
+        with closing(bind_unix_socket(testfn, type=SOCK_STREAM)) as sock:
+            conn = self.check_socket(sock)
+            assert conn.raddr == ""  # noqa
+            assert conn.status == psutil.CONN_NONE
+
+
+@pytest.mark.xdist_group(name="serial")
+class TestConnectedSocket(ConnectionTestCase):
+    """Test socket pairs which are actually connected to
+    each other.
+    """
+
+    # On SunOS, even after we close() it, the server socket stays around
+    # in TIME_WAIT state.
+    @pytest.mark.skipif(SUNOS, reason="unreliable on SUONS")
+    def test_tcp(self):
+        addr = ("127.0.0.1", 0)
+        assert this_proc_net_connections(kind='tcp4') == []
+        server, client = tcp_socketpair(AF_INET, addr=addr)
+        try:
+            cons = this_proc_net_connections(kind='tcp4')
+            assert len(cons) == 2
+            assert cons[0].status == psutil.CONN_ESTABLISHED
+            assert cons[1].status == psutil.CONN_ESTABLISHED
+            # May not be fast enough to change state so it stays
+            # commenteed.
+            # client.close()
+            # cons = this_proc_net_connections(kind='all')
+            # self.assertEqual(len(cons), 1)
+            # self.assertEqual(cons[0].status, psutil.CONN_CLOSE_WAIT)
+        finally:
+            server.close()
+            client.close()
+
+    @pytest.mark.skipif(not POSIX, reason="POSIX only")
+    def test_unix(self):
+        testfn = self.get_testfn()
+        server, client = unix_socketpair(testfn)
+        try:
+            cons = this_proc_net_connections(kind='unix')
+            assert not (cons[0].laddr and cons[0].raddr), cons
+            assert not (cons[1].laddr and cons[1].raddr), cons
+            if NETBSD or FREEBSD:
+                # On NetBSD creating a UNIX socket will cause
+                # a UNIX connection to  /var/run/log.
+                cons = [c for c in cons if c.raddr != '/var/run/log']
+            assert len(cons) == 2
+            if LINUX or FREEBSD or SUNOS or OPENBSD:
+                # remote path is never set
+                assert cons[0].raddr == ""  # noqa
+                assert cons[1].raddr == ""  # noqa
+                # one local address should though
+                assert testfn == (cons[0].laddr or cons[1].laddr)
+            else:
+                # On other systems either the laddr or raddr
+                # of both peers are set.
+                assert (cons[0].laddr or cons[1].laddr) == testfn
+        finally:
+            server.close()
+            client.close()
+
+
+class TestFilters(ConnectionTestCase):
+    def test_filters(self):
+        def check(kind, families, types):
+            for conn in this_proc_net_connections(kind=kind):
+                assert conn.family in families
+                assert conn.type in types
+            if not SKIP_SYSCONS:
+                for conn in psutil.net_connections(kind=kind):
+                    assert conn.family in families
+                    assert conn.type in types
+
+        with create_sockets():
+            check(
+                'all',
+                [AF_INET, AF_INET6, AF_UNIX],
+                [SOCK_STREAM, SOCK_DGRAM, SOCK_SEQPACKET],
+            )
+            check('inet', [AF_INET, AF_INET6], [SOCK_STREAM, SOCK_DGRAM])
+            check('inet4', [AF_INET], [SOCK_STREAM, SOCK_DGRAM])
+            check('tcp', [AF_INET, AF_INET6], [SOCK_STREAM])
+            check('tcp4', [AF_INET], [SOCK_STREAM])
+            check('tcp6', [AF_INET6], [SOCK_STREAM])
+            check('udp', [AF_INET, AF_INET6], [SOCK_DGRAM])
+            check('udp4', [AF_INET], [SOCK_DGRAM])
+            check('udp6', [AF_INET6], [SOCK_DGRAM])
+            if HAS_NET_CONNECTIONS_UNIX:
+                check(
+                    'unix',
+                    [AF_UNIX],
+                    [SOCK_STREAM, SOCK_DGRAM, SOCK_SEQPACKET],
+                )
+
+    @skip_on_access_denied(only_if=MACOS)
+    def test_combos(self):
+        reap_children()
+
+        def check_conn(proc, conn, family, type, laddr, raddr, status, kinds):
+            all_kinds = (
+                "all",
+                "inet",
+                "inet4",
+                "inet6",
+                "tcp",
+                "tcp4",
+                "tcp6",
+                "udp",
+                "udp4",
+                "udp6",
+            )
+            check_connection_ntuple(conn)
+            assert conn.family == family
+            assert conn.type == type
+            assert conn.laddr == laddr
+            assert conn.raddr == raddr
+            assert conn.status == status
+            for kind in all_kinds:
+                cons = proc.net_connections(kind=kind)
+                if kind in kinds:
+                    assert cons != []
+                else:
+                    assert cons == []
+            # compare against system-wide connections
+            # XXX Solaris can't retrieve system-wide UNIX
+            # sockets.
+            if HAS_NET_CONNECTIONS_UNIX:
+                self.compare_procsys_connections(proc.pid, [conn])
+
+        tcp_template = textwrap.dedent("""
+            import socket, time
+            s = socket.socket({family}, socket.SOCK_STREAM)
+            s.bind(('{addr}', 0))
+            s.listen(5)
+            with open('{testfn}', 'w') as f:
+                f.write(str(s.getsockname()[:2]))
+            [time.sleep(0.1) for x in range(100)]
+            """)
+
+        udp_template = textwrap.dedent("""
+            import socket, time
+            s = socket.socket({family}, socket.SOCK_DGRAM)
+            s.bind(('{addr}', 0))
+            with open('{testfn}', 'w') as f:
+                f.write(str(s.getsockname()[:2]))
+            [time.sleep(0.1) for x in range(100)]
+            """)
+
+        # must be relative on Windows
+        testfile = os.path.basename(self.get_testfn(dir=os.getcwd()))
+        tcp4_template = tcp_template.format(
+            family=int(AF_INET), addr="127.0.0.1", testfn=testfile
+        )
+        udp4_template = udp_template.format(
+            family=int(AF_INET), addr="127.0.0.1", testfn=testfile
+        )
+        tcp6_template = tcp_template.format(
+            family=int(AF_INET6), addr="::1", testfn=testfile
+        )
+        udp6_template = udp_template.format(
+            family=int(AF_INET6), addr="::1", testfn=testfile
+        )
+
+        # launch various subprocess instantiating a socket of various
+        # families and types to enrich psutil results
+        tcp4_proc = self.pyrun(tcp4_template)
+        tcp4_addr = eval(wait_for_file(testfile, delete=True))  # noqa
+        udp4_proc = self.pyrun(udp4_template)
+        udp4_addr = eval(wait_for_file(testfile, delete=True))  # noqa
+        if supports_ipv6():
+            tcp6_proc = self.pyrun(tcp6_template)
+            tcp6_addr = eval(wait_for_file(testfile, delete=True))  # noqa
+            udp6_proc = self.pyrun(udp6_template)
+            udp6_addr = eval(wait_for_file(testfile, delete=True))  # noqa
+        else:
+            tcp6_proc = None
+            udp6_proc = None
+            tcp6_addr = None
+            udp6_addr = None
+
+        for p in psutil.Process().children():
+            cons = p.net_connections()
+            assert len(cons) == 1
+            for conn in cons:
+                # TCP v4
+                if p.pid == tcp4_proc.pid:
+                    check_conn(
+                        p,
+                        conn,
+                        AF_INET,
+                        SOCK_STREAM,
+                        tcp4_addr,
+                        (),
+                        psutil.CONN_LISTEN,
+                        ("all", "inet", "inet4", "tcp", "tcp4"),
+                    )
+                # UDP v4
+                elif p.pid == udp4_proc.pid:
+                    check_conn(
+                        p,
+                        conn,
+                        AF_INET,
+                        SOCK_DGRAM,
+                        udp4_addr,
+                        (),
+                        psutil.CONN_NONE,
+                        ("all", "inet", "inet4", "udp", "udp4"),
+                    )
+                # TCP v6
+                elif p.pid == getattr(tcp6_proc, "pid", None):
+                    check_conn(
+                        p,
+                        conn,
+                        AF_INET6,
+                        SOCK_STREAM,
+                        tcp6_addr,
+                        (),
+                        psutil.CONN_LISTEN,
+                        ("all", "inet", "inet6", "tcp", "tcp6"),
+                    )
+                # UDP v6
+                elif p.pid == getattr(udp6_proc, "pid", None):
+                    check_conn(
+                        p,
+                        conn,
+                        AF_INET6,
+                        SOCK_DGRAM,
+                        udp6_addr,
+                        (),
+                        psutil.CONN_NONE,
+                        ("all", "inet", "inet6", "udp", "udp6"),
+                    )
+
+    def test_count(self):
+        with create_sockets():
+            # tcp
+            cons = this_proc_net_connections(kind='tcp')
+            assert len(cons) == (2 if supports_ipv6() else 1)
+            for conn in cons:
+                assert conn.family in {AF_INET, AF_INET6}
+                assert conn.type == SOCK_STREAM
+            # tcp4
+            cons = this_proc_net_connections(kind='tcp4')
+            assert len(cons) == 1
+            assert cons[0].family == AF_INET
+            assert cons[0].type == SOCK_STREAM
+            # tcp6
+            if supports_ipv6():
+                cons = this_proc_net_connections(kind='tcp6')
+                assert len(cons) == 1
+                assert cons[0].family == AF_INET6
+                assert cons[0].type == SOCK_STREAM
+            # udp
+            cons = this_proc_net_connections(kind='udp')
+            assert len(cons) == (2 if supports_ipv6() else 1)
+            for conn in cons:
+                assert conn.family in {AF_INET, AF_INET6}
+                assert conn.type == SOCK_DGRAM
+            # udp4
+            cons = this_proc_net_connections(kind='udp4')
+            assert len(cons) == 1
+            assert cons[0].family == AF_INET
+            assert cons[0].type == SOCK_DGRAM
+            # udp6
+            if supports_ipv6():
+                cons = this_proc_net_connections(kind='udp6')
+                assert len(cons) == 1
+                assert cons[0].family == AF_INET6
+                assert cons[0].type == SOCK_DGRAM
+            # inet
+            cons = this_proc_net_connections(kind='inet')
+            assert len(cons) == (4 if supports_ipv6() else 2)
+            for conn in cons:
+                assert conn.family in {AF_INET, AF_INET6}
+                assert conn.type in {SOCK_STREAM, SOCK_DGRAM}
+            # inet6
+            if supports_ipv6():
+                cons = this_proc_net_connections(kind='inet6')
+                assert len(cons) == 2
+                for conn in cons:
+                    assert conn.family == AF_INET6
+                    assert conn.type in {SOCK_STREAM, SOCK_DGRAM}
+            # Skipped on BSD becayse by default the Python process
+            # creates a UNIX socket to '/var/run/log'.
+            if HAS_NET_CONNECTIONS_UNIX and not (FREEBSD or NETBSD):
+                cons = this_proc_net_connections(kind='unix')
+                assert len(cons) == 3
+                for conn in cons:
+                    assert conn.family == AF_UNIX
+                    assert conn.type in {SOCK_STREAM, SOCK_DGRAM}
+
+
+@pytest.mark.skipif(SKIP_SYSCONS, reason="requires root")
+class TestSystemWideConnections(ConnectionTestCase):
+    """Tests for net_connections()."""
+
+    def test_it(self):
+        def check(cons, families, types_):
+            for conn in cons:
+                assert conn.family in families
+                if conn.family != AF_UNIX:
+                    assert conn.type in types_
+                check_connection_ntuple(conn)
+
+        with create_sockets():
+            from psutil._common import conn_tmap
+
+            for kind, groups in conn_tmap.items():
+                # XXX: SunOS does not retrieve UNIX sockets.
+                if kind == 'unix' and not HAS_NET_CONNECTIONS_UNIX:
+                    continue
+                families, types_ = groups
+                cons = psutil.net_connections(kind)
+                assert len(cons) == len(set(cons))
+                check(cons, families, types_)
+
+    @retry_on_failure()
+    def test_multi_sockets_procs(self):
+        # Creates multiple sub processes, each creating different
+        # sockets. For each process check that proc.net_connections()
+        # and psutil.net_connections() return the same results.
+        # This is done mainly to check whether net_connections()'s
+        # pid is properly set, see:
+        # https://github.com/giampaolo/psutil/issues/1013
+        with create_sockets() as socks:
+            expected = len(socks)
+        pids = []
+        times = 10
+        fnames = []
+        for _ in range(times):
+            fname = self.get_testfn()
+            fnames.append(fname)
+            src = textwrap.dedent("""\
+                import time, os
+                from psutil.tests import create_sockets
+                with create_sockets():
+                    with open(r'%s', 'w') as f:
+                        f.write("hello")
+                    [time.sleep(0.1) for x in range(100)]
+                """ % fname)
+            sproc = self.pyrun(src)
+            pids.append(sproc.pid)
+
+        # sync
+        for fname in fnames:
+            wait_for_file(fname)
+
+        syscons = [
+            x for x in psutil.net_connections(kind='all') if x.pid in pids
+        ]
+        for pid in pids:
+            assert len([x for x in syscons if x.pid == pid]) == expected
+            p = psutil.Process(pid)
+            assert len(p.net_connections('all')) == expected
+
+
+class TestMisc(PsutilTestCase):
+    def test_net_connection_constants(self):
+        ints = []
+        strs = []
+        for name in dir(psutil):
+            if name.startswith('CONN_'):
+                num = getattr(psutil, name)
+                str_ = str(num)
+                assert str_.isupper(), str_
+                assert str not in strs
+                assert num not in ints
+                ints.append(num)
+                strs.append(str_)
+        if SUNOS:
+            psutil.CONN_IDLE  # noqa
+            psutil.CONN_BOUND  # noqa
+        if WINDOWS:
+            psutil.CONN_DELETE_TCB  # noqa