aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/networkx/utils/tests
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/networkx/utils/tests
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are hereHEADmaster
Diffstat (limited to '.venv/lib/python3.12/site-packages/networkx/utils/tests')
-rw-r--r--.venv/lib/python3.12/site-packages/networkx/utils/tests/__init__.py0
-rw-r--r--.venv/lib/python3.12/site-packages/networkx/utils/tests/test__init.py11
-rw-r--r--.venv/lib/python3.12/site-packages/networkx/utils/tests/test_backends.py170
-rw-r--r--.venv/lib/python3.12/site-packages/networkx/utils/tests/test_config.py231
-rw-r--r--.venv/lib/python3.12/site-packages/networkx/utils/tests/test_decorators.py510
-rw-r--r--.venv/lib/python3.12/site-packages/networkx/utils/tests/test_heaps.py131
-rw-r--r--.venv/lib/python3.12/site-packages/networkx/utils/tests/test_mapped_queue.py268
-rw-r--r--.venv/lib/python3.12/site-packages/networkx/utils/tests/test_misc.py268
-rw-r--r--.venv/lib/python3.12/site-packages/networkx/utils/tests/test_random_sequence.py38
-rw-r--r--.venv/lib/python3.12/site-packages/networkx/utils/tests/test_rcm.py63
-rw-r--r--.venv/lib/python3.12/site-packages/networkx/utils/tests/test_unionfind.py55
11 files changed, 1745 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/networkx/utils/tests/__init__.py b/.venv/lib/python3.12/site-packages/networkx/utils/tests/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/networkx/utils/tests/__init__.py
diff --git a/.venv/lib/python3.12/site-packages/networkx/utils/tests/test__init.py b/.venv/lib/python3.12/site-packages/networkx/utils/tests/test__init.py
new file mode 100644
index 00000000..ecbcce36
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/networkx/utils/tests/test__init.py
@@ -0,0 +1,11 @@
+import pytest
+
+
+def test_utils_namespace():
+ """Ensure objects are not unintentionally exposed in utils namespace."""
+ with pytest.raises(ImportError):
+ from networkx.utils import nx
+ with pytest.raises(ImportError):
+ from networkx.utils import sys
+ with pytest.raises(ImportError):
+ from networkx.utils import defaultdict, deque
diff --git a/.venv/lib/python3.12/site-packages/networkx/utils/tests/test_backends.py b/.venv/lib/python3.12/site-packages/networkx/utils/tests/test_backends.py
new file mode 100644
index 00000000..ad006f00
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/networkx/utils/tests/test_backends.py
@@ -0,0 +1,170 @@
+import pickle
+
+import pytest
+
+import networkx as nx
+
+sp = pytest.importorskip("scipy")
+pytest.importorskip("numpy")
+
+
+def test_dispatch_kwds_vs_args():
+ G = nx.path_graph(4)
+ nx.pagerank(G)
+ nx.pagerank(G=G)
+ with pytest.raises(TypeError):
+ nx.pagerank()
+
+
+def test_pickle():
+ count = 0
+ for name, func in nx.utils.backends._registered_algorithms.items():
+ pickled = pickle.dumps(func.__wrapped__)
+ assert pickle.loads(pickled) is func.__wrapped__
+ try:
+ # Some functions can't be pickled, but it's not b/c of _dispatchable
+ pickled = pickle.dumps(func)
+ except pickle.PicklingError:
+ continue
+ assert pickle.loads(pickled) is func
+ count += 1
+ assert count > 0
+ assert pickle.loads(pickle.dumps(nx.inverse_line_graph)) is nx.inverse_line_graph
+
+
+@pytest.mark.skipif(
+ "not nx.config.backend_priority.algos "
+ "or nx.config.backend_priority.algos[0] != 'nx_loopback'"
+)
+def test_graph_converter_needs_backend():
+ # When testing, `nx.from_scipy_sparse_array` will *always* call the backend
+ # implementation if it's implemented. If `backend=` isn't given, then the result
+ # will be converted back to NetworkX via `convert_to_nx`.
+ # If not testing, then calling `nx.from_scipy_sparse_array` w/o `backend=` will
+ # always call the original version. `backend=` is *required* to call the backend.
+ from networkx.classes.tests.dispatch_interface import (
+ LoopbackBackendInterface,
+ LoopbackGraph,
+ )
+
+ A = sp.sparse.coo_array([[0, 3, 2], [3, 0, 1], [2, 1, 0]])
+
+ side_effects = []
+
+ def from_scipy_sparse_array(self, *args, **kwargs):
+ side_effects.append(1) # Just to prove this was called
+ return self.convert_from_nx(
+ self.__getattr__("from_scipy_sparse_array")(*args, **kwargs),
+ preserve_edge_attrs=True,
+ preserve_node_attrs=True,
+ preserve_graph_attrs=True,
+ )
+
+ @staticmethod
+ def convert_to_nx(obj, *, name=None):
+ if type(obj) is nx.Graph:
+ return obj
+ return nx.Graph(obj)
+
+ # *This mutates LoopbackBackendInterface!*
+ orig_convert_to_nx = LoopbackBackendInterface.convert_to_nx
+ LoopbackBackendInterface.convert_to_nx = convert_to_nx
+ LoopbackBackendInterface.from_scipy_sparse_array = from_scipy_sparse_array
+
+ try:
+ assert side_effects == []
+ assert type(nx.from_scipy_sparse_array(A)) is nx.Graph
+ assert side_effects == [1]
+ assert (
+ type(nx.from_scipy_sparse_array(A, backend="nx_loopback")) is LoopbackGraph
+ )
+ assert side_effects == [1, 1]
+ # backend="networkx" is default implementation
+ assert type(nx.from_scipy_sparse_array(A, backend="networkx")) is nx.Graph
+ assert side_effects == [1, 1]
+ finally:
+ LoopbackBackendInterface.convert_to_nx = staticmethod(orig_convert_to_nx)
+ del LoopbackBackendInterface.from_scipy_sparse_array
+ with pytest.raises(ImportError, match="backend is not installed"):
+ nx.from_scipy_sparse_array(A, backend="bad-backend-name")
+
+
+@pytest.mark.skipif(
+ "not nx.config.backend_priority.algos "
+ "or nx.config.backend_priority.algos[0] != 'nx_loopback'"
+)
+def test_networkx_backend():
+ """Test using `backend="networkx"` in a dispatchable function."""
+ # (Implementing this test is harder than it should be)
+ from networkx.classes.tests.dispatch_interface import (
+ LoopbackBackendInterface,
+ LoopbackGraph,
+ )
+
+ G = LoopbackGraph()
+ G.add_edges_from([(0, 1), (1, 2), (1, 3), (2, 4)])
+
+ @staticmethod
+ def convert_to_nx(obj, *, name=None):
+ if isinstance(obj, LoopbackGraph):
+ new_graph = nx.Graph()
+ new_graph.__dict__.update(obj.__dict__)
+ return new_graph
+ return obj
+
+ # *This mutates LoopbackBackendInterface!*
+ # This uses the same trick as in the previous test.
+ orig_convert_to_nx = LoopbackBackendInterface.convert_to_nx
+ LoopbackBackendInterface.convert_to_nx = convert_to_nx
+ try:
+ G2 = nx.ego_graph(G, 0, backend="networkx")
+ assert type(G2) is nx.Graph
+ finally:
+ LoopbackBackendInterface.convert_to_nx = staticmethod(orig_convert_to_nx)
+
+
+def test_dispatchable_are_functions():
+ assert type(nx.pagerank) is type(nx.pagerank.orig_func)
+
+
+@pytest.mark.skipif("not nx.utils.backends.backends")
+def test_mixing_backend_graphs():
+ from networkx.classes.tests import dispatch_interface
+
+ G = nx.Graph()
+ G.add_edge(1, 2)
+ G.add_edge(2, 3)
+ H = nx.Graph()
+ H.add_edge(2, 3)
+ rv = nx.intersection(G, H)
+ assert set(nx.intersection(G, H)) == {2, 3}
+ G2 = dispatch_interface.convert(G)
+ H2 = dispatch_interface.convert(H)
+ if "nx_loopback" in nx.config.backend_priority:
+ # Auto-convert
+ assert set(nx.intersection(G2, H)) == {2, 3}
+ assert set(nx.intersection(G, H2)) == {2, 3}
+ elif not nx.config.backend_priority and "nx_loopback" not in nx.config.backends:
+ # G2 and H2 are backend objects for a backend that is not registered!
+ with pytest.raises(ImportError, match="backend is not installed"):
+ nx.intersection(G2, H)
+ with pytest.raises(ImportError, match="backend is not installed"):
+ nx.intersection(G, H2)
+ # It would be nice to test passing graphs from *different* backends,
+ # but we are not set up to do this yet.
+
+
+def test_bad_backend_name():
+ """Using `backend=` raises with unknown backend even if there are no backends."""
+ with pytest.raises(
+ ImportError, match="'this_backend_does_not_exist' backend is not installed"
+ ):
+ nx.null_graph(backend="this_backend_does_not_exist")
+
+
+def test_fallback_to_nx():
+ with pytest.warns(DeprecationWarning, match="_fallback_to_nx"):
+ # Check as class property
+ assert nx._dispatchable._fallback_to_nx == nx.config.fallback_to_nx
+ # Check as instance property
+ assert nx.pagerank.__wrapped__._fallback_to_nx == nx.config.fallback_to_nx
diff --git a/.venv/lib/python3.12/site-packages/networkx/utils/tests/test_config.py b/.venv/lib/python3.12/site-packages/networkx/utils/tests/test_config.py
new file mode 100644
index 00000000..7416b0ac
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/networkx/utils/tests/test_config.py
@@ -0,0 +1,231 @@
+import collections
+import pickle
+
+import pytest
+
+import networkx as nx
+from networkx.utils.configs import BackendPriorities, Config
+
+
+# Define this at module level so we can test pickling
+class ExampleConfig(Config):
+ """Example configuration."""
+
+ x: int
+ y: str
+
+ def _on_setattr(self, key, value):
+ if key == "x" and value <= 0:
+ raise ValueError("x must be positive")
+ if key == "y" and not isinstance(value, str):
+ raise TypeError("y must be a str")
+ return value
+
+
+class EmptyConfig(Config):
+ pass
+
+
+@pytest.mark.parametrize("cfg", [EmptyConfig(), Config()])
+def test_config_empty(cfg):
+ assert dir(cfg) == []
+ with pytest.raises(AttributeError):
+ cfg.x = 1
+ with pytest.raises(KeyError):
+ cfg["x"] = 1
+ with pytest.raises(AttributeError):
+ cfg.x
+ with pytest.raises(KeyError):
+ cfg["x"]
+ assert len(cfg) == 0
+ assert "x" not in cfg
+ assert cfg == cfg
+ assert cfg.get("x", 2) == 2
+ assert set(cfg.keys()) == set()
+ assert set(cfg.values()) == set()
+ assert set(cfg.items()) == set()
+ cfg2 = pickle.loads(pickle.dumps(cfg))
+ assert cfg == cfg2
+ assert isinstance(cfg, collections.abc.Collection)
+ assert isinstance(cfg, collections.abc.Mapping)
+
+
+def test_config_subclass():
+ with pytest.raises(TypeError, match="missing 2 required keyword-only"):
+ ExampleConfig()
+ with pytest.raises(ValueError, match="x must be positive"):
+ ExampleConfig(x=0, y="foo")
+ with pytest.raises(TypeError, match="unexpected keyword"):
+ ExampleConfig(x=1, y="foo", z="bad config")
+ with pytest.raises(TypeError, match="unexpected keyword"):
+ EmptyConfig(z="bad config")
+ cfg = ExampleConfig(x=1, y="foo")
+ assert cfg.x == 1
+ assert cfg["x"] == 1
+ assert cfg["y"] == "foo"
+ assert cfg.y == "foo"
+ assert "x" in cfg
+ assert "y" in cfg
+ assert "z" not in cfg
+ assert len(cfg) == 2
+ assert set(iter(cfg)) == {"x", "y"}
+ assert set(cfg.keys()) == {"x", "y"}
+ assert set(cfg.values()) == {1, "foo"}
+ assert set(cfg.items()) == {("x", 1), ("y", "foo")}
+ assert dir(cfg) == ["x", "y"]
+ cfg.x = 2
+ cfg["y"] = "bar"
+ assert cfg["x"] == 2
+ assert cfg.y == "bar"
+ with pytest.raises(TypeError, match="can't be deleted"):
+ del cfg.x
+ with pytest.raises(TypeError, match="can't be deleted"):
+ del cfg["y"]
+ assert cfg.x == 2
+ assert cfg == cfg
+ assert cfg == ExampleConfig(x=2, y="bar")
+ assert cfg != ExampleConfig(x=3, y="baz")
+ assert cfg != Config(x=2, y="bar")
+ with pytest.raises(TypeError, match="y must be a str"):
+ cfg["y"] = 5
+ with pytest.raises(ValueError, match="x must be positive"):
+ cfg.x = -5
+ assert cfg.get("x", 10) == 2
+ with pytest.raises(AttributeError):
+ cfg.z = 5
+ with pytest.raises(KeyError):
+ cfg["z"] = 5
+ with pytest.raises(AttributeError):
+ cfg.z
+ with pytest.raises(KeyError):
+ cfg["z"]
+ cfg2 = pickle.loads(pickle.dumps(cfg))
+ assert cfg == cfg2
+ assert cfg.__doc__ == "Example configuration."
+ assert cfg2.__doc__ == "Example configuration."
+
+
+def test_config_defaults():
+ class DefaultConfig(Config):
+ x: int = 0
+ y: int
+
+ cfg = DefaultConfig(y=1)
+ assert cfg.x == 0
+ cfg = DefaultConfig(x=2, y=1)
+ assert cfg.x == 2
+
+
+def test_nxconfig():
+ assert isinstance(nx.config.backend_priority, BackendPriorities)
+ assert isinstance(nx.config.backend_priority.algos, list)
+ assert isinstance(nx.config.backends, Config)
+ with pytest.raises(TypeError, match="must be a list of backend names"):
+ nx.config.backend_priority.algos = "nx_loopback"
+ with pytest.raises(ValueError, match="Unknown backend when setting"):
+ nx.config.backend_priority.algos = ["this_almost_certainly_is_not_a_backend"]
+ with pytest.raises(TypeError, match="must be a Config of backend configs"):
+ nx.config.backends = {}
+ with pytest.raises(TypeError, match="must be a Config of backend configs"):
+ nx.config.backends = Config(plausible_backend_name={})
+ with pytest.raises(ValueError, match="Unknown backend when setting"):
+ nx.config.backends = Config(this_almost_certainly_is_not_a_backend=Config())
+ with pytest.raises(TypeError, match="must be True or False"):
+ nx.config.cache_converted_graphs = "bad value"
+ with pytest.raises(TypeError, match="must be a set of "):
+ nx.config.warnings_to_ignore = 7
+ with pytest.raises(ValueError, match="Unknown warning "):
+ nx.config.warnings_to_ignore = {"bad value"}
+
+
+def test_not_strict():
+ class FlexibleConfig(Config, strict=False):
+ x: int
+
+ cfg = FlexibleConfig(x=1)
+ assert "_strict" not in cfg
+ assert len(cfg) == 1
+ assert list(cfg) == ["x"]
+ assert list(cfg.keys()) == ["x"]
+ assert list(cfg.values()) == [1]
+ assert list(cfg.items()) == [("x", 1)]
+ assert cfg.x == 1
+ assert cfg["x"] == 1
+ assert "x" in cfg
+ assert hasattr(cfg, "x")
+ assert "FlexibleConfig(x=1)" in repr(cfg)
+ assert cfg == FlexibleConfig(x=1)
+ del cfg.x
+ assert "FlexibleConfig()" in repr(cfg)
+ assert len(cfg) == 0
+ assert not hasattr(cfg, "x")
+ assert "x" not in cfg
+ assert not hasattr(cfg, "y")
+ assert "y" not in cfg
+ cfg.y = 2
+ assert len(cfg) == 1
+ assert list(cfg) == ["y"]
+ assert list(cfg.keys()) == ["y"]
+ assert list(cfg.values()) == [2]
+ assert list(cfg.items()) == [("y", 2)]
+ assert cfg.y == 2
+ assert cfg["y"] == 2
+ assert hasattr(cfg, "y")
+ assert "y" in cfg
+ del cfg["y"]
+ assert len(cfg) == 0
+ assert list(cfg) == []
+ with pytest.raises(AttributeError, match="y"):
+ del cfg.y
+ with pytest.raises(KeyError, match="y"):
+ del cfg["y"]
+ with pytest.raises(TypeError, match="missing 1 required keyword-only"):
+ FlexibleConfig()
+ # Be strict when first creating the config object
+ with pytest.raises(TypeError, match="unexpected keyword argument 'y'"):
+ FlexibleConfig(x=1, y=2)
+
+ class FlexibleConfigWithDefault(Config, strict=False):
+ x: int = 0
+
+ assert FlexibleConfigWithDefault().x == 0
+ assert FlexibleConfigWithDefault(x=1)["x"] == 1
+
+
+def test_context():
+ cfg = Config(x=1)
+ with cfg(x=2) as c:
+ assert c.x == 2
+ c.x = 3
+ assert cfg.x == 3
+ assert cfg.x == 1
+
+ with cfg(x=2) as c:
+ assert c == cfg
+ assert cfg.x == 2
+ with cfg(x=3) as c2:
+ assert c2 == cfg
+ assert cfg.x == 3
+ with pytest.raises(RuntimeError, match="context manager without"):
+ with cfg as c3: # Forgot to call `cfg(...)`
+ pass
+ assert cfg.x == 3
+ assert cfg.x == 2
+ assert cfg.x == 1
+
+ c = cfg(x=4) # Not yet as context (not recommended, but possible)
+ assert c == cfg
+ assert cfg.x == 4
+ # Cheat by looking at internal data; context stack should only grow with __enter__
+ assert cfg._prev is not None
+ assert cfg._context_stack == []
+ with c:
+ assert c == cfg
+ assert cfg.x == 4
+ assert cfg.x == 1
+ # Cheat again; there was no preceding `cfg(...)` call this time
+ assert cfg._prev is None
+ with pytest.raises(RuntimeError, match="context manager without"):
+ with cfg:
+ pass
+ assert cfg.x == 1
diff --git a/.venv/lib/python3.12/site-packages/networkx/utils/tests/test_decorators.py b/.venv/lib/python3.12/site-packages/networkx/utils/tests/test_decorators.py
new file mode 100644
index 00000000..0a4aeabf
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/networkx/utils/tests/test_decorators.py
@@ -0,0 +1,510 @@
+import os
+import pathlib
+import random
+import tempfile
+
+import pytest
+
+import networkx as nx
+from networkx.utils.decorators import (
+ argmap,
+ not_implemented_for,
+ np_random_state,
+ open_file,
+ py_random_state,
+)
+from networkx.utils.misc import PythonRandomInterface, PythonRandomViaNumpyBits
+
+
+def test_not_implemented_decorator():
+ @not_implemented_for("directed")
+ def test_d(G):
+ pass
+
+ test_d(nx.Graph())
+ with pytest.raises(nx.NetworkXNotImplemented):
+ test_d(nx.DiGraph())
+
+ @not_implemented_for("undirected")
+ def test_u(G):
+ pass
+
+ test_u(nx.DiGraph())
+ with pytest.raises(nx.NetworkXNotImplemented):
+ test_u(nx.Graph())
+
+ @not_implemented_for("multigraph")
+ def test_m(G):
+ pass
+
+ test_m(nx.Graph())
+ with pytest.raises(nx.NetworkXNotImplemented):
+ test_m(nx.MultiGraph())
+
+ @not_implemented_for("graph")
+ def test_g(G):
+ pass
+
+ test_g(nx.MultiGraph())
+ with pytest.raises(nx.NetworkXNotImplemented):
+ test_g(nx.Graph())
+
+ # not MultiDiGraph (multiple arguments => AND)
+ @not_implemented_for("directed", "multigraph")
+ def test_not_md(G):
+ pass
+
+ test_not_md(nx.Graph())
+ test_not_md(nx.DiGraph())
+ test_not_md(nx.MultiGraph())
+ with pytest.raises(nx.NetworkXNotImplemented):
+ test_not_md(nx.MultiDiGraph())
+
+ # Graph only (multiple decorators => OR)
+ @not_implemented_for("directed")
+ @not_implemented_for("multigraph")
+ def test_graph_only(G):
+ pass
+
+ test_graph_only(nx.Graph())
+ with pytest.raises(nx.NetworkXNotImplemented):
+ test_graph_only(nx.DiGraph())
+ with pytest.raises(nx.NetworkXNotImplemented):
+ test_graph_only(nx.MultiGraph())
+ with pytest.raises(nx.NetworkXNotImplemented):
+ test_graph_only(nx.MultiDiGraph())
+
+ with pytest.raises(ValueError):
+ not_implemented_for("directed", "undirected")
+
+ with pytest.raises(ValueError):
+ not_implemented_for("multigraph", "graph")
+
+
+def test_not_implemented_decorator_key():
+ with pytest.raises(KeyError):
+
+ @not_implemented_for("foo")
+ def test1(G):
+ pass
+
+ test1(nx.Graph())
+
+
+def test_not_implemented_decorator_raise():
+ with pytest.raises(nx.NetworkXNotImplemented):
+
+ @not_implemented_for("graph")
+ def test1(G):
+ pass
+
+ test1(nx.Graph())
+
+
+class TestOpenFileDecorator:
+ def setup_method(self):
+ self.text = ["Blah... ", "BLAH ", "BLAH!!!!"]
+ self.fobj = tempfile.NamedTemporaryFile("wb+", delete=False)
+ self.name = self.fobj.name
+
+ def teardown_method(self):
+ self.fobj.close()
+ os.unlink(self.name)
+
+ def write(self, path):
+ for text in self.text:
+ path.write(text.encode("ascii"))
+
+ @open_file(1, "r")
+ def read(self, path):
+ return path.readlines()[0]
+
+ @staticmethod
+ @open_file(0, "wb")
+ def writer_arg0(path):
+ path.write(b"demo")
+
+ @open_file(1, "wb+")
+ def writer_arg1(self, path):
+ self.write(path)
+
+ @open_file(2, "wb")
+ def writer_arg2default(self, x, path=None):
+ if path is None:
+ with tempfile.NamedTemporaryFile("wb+") as fh:
+ self.write(fh)
+ else:
+ self.write(path)
+
+ @open_file(4, "wb")
+ def writer_arg4default(self, x, y, other="hello", path=None, **kwargs):
+ if path is None:
+ with tempfile.NamedTemporaryFile("wb+") as fh:
+ self.write(fh)
+ else:
+ self.write(path)
+
+ @open_file("path", "wb")
+ def writer_kwarg(self, **kwargs):
+ path = kwargs.get("path", None)
+ if path is None:
+ with tempfile.NamedTemporaryFile("wb+") as fh:
+ self.write(fh)
+ else:
+ self.write(path)
+
+ def test_writer_arg0_str(self):
+ self.writer_arg0(self.name)
+
+ def test_writer_arg0_fobj(self):
+ self.writer_arg0(self.fobj)
+
+ def test_writer_arg0_pathlib(self):
+ self.writer_arg0(pathlib.Path(self.name))
+
+ def test_writer_arg1_str(self):
+ self.writer_arg1(self.name)
+ assert self.read(self.name) == "".join(self.text)
+
+ def test_writer_arg1_fobj(self):
+ self.writer_arg1(self.fobj)
+ assert not self.fobj.closed
+ self.fobj.close()
+ assert self.read(self.name) == "".join(self.text)
+
+ def test_writer_arg2default_str(self):
+ self.writer_arg2default(0, path=None)
+ self.writer_arg2default(0, path=self.name)
+ assert self.read(self.name) == "".join(self.text)
+
+ def test_writer_arg2default_fobj(self):
+ self.writer_arg2default(0, path=self.fobj)
+ assert not self.fobj.closed
+ self.fobj.close()
+ assert self.read(self.name) == "".join(self.text)
+
+ def test_writer_arg2default_fobj_path_none(self):
+ self.writer_arg2default(0, path=None)
+
+ def test_writer_arg4default_fobj(self):
+ self.writer_arg4default(0, 1, dog="dog", other="other")
+ self.writer_arg4default(0, 1, dog="dog", other="other", path=self.name)
+ assert self.read(self.name) == "".join(self.text)
+
+ def test_writer_kwarg_str(self):
+ self.writer_kwarg(path=self.name)
+ assert self.read(self.name) == "".join(self.text)
+
+ def test_writer_kwarg_fobj(self):
+ self.writer_kwarg(path=self.fobj)
+ self.fobj.close()
+ assert self.read(self.name) == "".join(self.text)
+
+ def test_writer_kwarg_path_none(self):
+ self.writer_kwarg(path=None)
+
+
+class TestRandomState:
+ @classmethod
+ def setup_class(cls):
+ global np
+ np = pytest.importorskip("numpy")
+
+ @np_random_state(1)
+ def instantiate_np_random_state(self, random_state):
+ allowed = (np.random.RandomState, np.random.Generator)
+ assert isinstance(random_state, allowed)
+ return random_state.random()
+
+ @py_random_state(1)
+ def instantiate_py_random_state(self, random_state):
+ allowed = (random.Random, PythonRandomInterface, PythonRandomViaNumpyBits)
+ assert isinstance(random_state, allowed)
+ return random_state.random()
+
+ def test_random_state_None(self):
+ np.random.seed(42)
+ rv = np.random.random()
+ np.random.seed(42)
+ assert rv == self.instantiate_np_random_state(None)
+
+ random.seed(42)
+ rv = random.random()
+ random.seed(42)
+ assert rv == self.instantiate_py_random_state(None)
+
+ def test_random_state_np_random(self):
+ np.random.seed(42)
+ rv = np.random.random()
+ np.random.seed(42)
+ assert rv == self.instantiate_np_random_state(np.random)
+ np.random.seed(42)
+ assert rv == self.instantiate_py_random_state(np.random)
+
+ def test_random_state_int(self):
+ np.random.seed(42)
+ np_rv = np.random.random()
+ random.seed(42)
+ py_rv = random.random()
+
+ np.random.seed(42)
+ seed = 1
+ rval = self.instantiate_np_random_state(seed)
+ rval_expected = np.random.RandomState(seed).rand()
+ assert rval == rval_expected
+ # test that global seed wasn't changed in function
+ assert np_rv == np.random.random()
+
+ random.seed(42)
+ rval = self.instantiate_py_random_state(seed)
+ rval_expected = random.Random(seed).random()
+ assert rval == rval_expected
+ # test that global seed wasn't changed in function
+ assert py_rv == random.random()
+
+ def test_random_state_np_random_Generator(self):
+ np.random.seed(42)
+ np_rv = np.random.random()
+ np.random.seed(42)
+ seed = 1
+
+ rng = np.random.default_rng(seed)
+ rval = self.instantiate_np_random_state(rng)
+ rval_expected = np.random.default_rng(seed).random()
+ assert rval == rval_expected
+
+ rval = self.instantiate_py_random_state(rng)
+ rval_expected = np.random.default_rng(seed).random(size=2)[1]
+ assert rval == rval_expected
+ # test that global seed wasn't changed in function
+ assert np_rv == np.random.random()
+
+ def test_random_state_np_random_RandomState(self):
+ np.random.seed(42)
+ np_rv = np.random.random()
+ np.random.seed(42)
+ seed = 1
+
+ rng = np.random.RandomState(seed)
+ rval = self.instantiate_np_random_state(rng)
+ rval_expected = np.random.RandomState(seed).random()
+ assert rval == rval_expected
+
+ rval = self.instantiate_py_random_state(rng)
+ rval_expected = np.random.RandomState(seed).random(size=2)[1]
+ assert rval == rval_expected
+ # test that global seed wasn't changed in function
+ assert np_rv == np.random.random()
+
+ def test_random_state_py_random(self):
+ seed = 1
+ rng = random.Random(seed)
+ rv = self.instantiate_py_random_state(rng)
+ assert rv == random.Random(seed).random()
+
+ pytest.raises(ValueError, self.instantiate_np_random_state, rng)
+
+
+def test_random_state_string_arg_index():
+ with pytest.raises(nx.NetworkXError):
+
+ @np_random_state("a")
+ def make_random_state(rs):
+ pass
+
+ rstate = make_random_state(1)
+
+
+def test_py_random_state_string_arg_index():
+ with pytest.raises(nx.NetworkXError):
+
+ @py_random_state("a")
+ def make_random_state(rs):
+ pass
+
+ rstate = make_random_state(1)
+
+
+def test_random_state_invalid_arg_index():
+ with pytest.raises(nx.NetworkXError):
+
+ @np_random_state(2)
+ def make_random_state(rs):
+ pass
+
+ rstate = make_random_state(1)
+
+
+def test_py_random_state_invalid_arg_index():
+ with pytest.raises(nx.NetworkXError):
+
+ @py_random_state(2)
+ def make_random_state(rs):
+ pass
+
+ rstate = make_random_state(1)
+
+
+class TestArgmap:
+ class ArgmapError(RuntimeError):
+ pass
+
+ def test_trivial_function(self):
+ def do_not_call(x):
+ raise ArgmapError("do not call this function")
+
+ @argmap(do_not_call)
+ def trivial_argmap():
+ return 1
+
+ assert trivial_argmap() == 1
+
+ def test_trivial_iterator(self):
+ def do_not_call(x):
+ raise ArgmapError("do not call this function")
+
+ @argmap(do_not_call)
+ def trivial_argmap():
+ yield from (1, 2, 3)
+
+ assert tuple(trivial_argmap()) == (1, 2, 3)
+
+ def test_contextmanager(self):
+ container = []
+
+ def contextmanager(x):
+ nonlocal container
+ return x, lambda: container.append(x)
+
+ @argmap(contextmanager, 0, 1, 2, try_finally=True)
+ def foo(x, y, z):
+ return x, y, z
+
+ x, y, z = foo("a", "b", "c")
+
+ # context exits are called in reverse
+ assert container == ["c", "b", "a"]
+
+ def test_tryfinally_generator(self):
+ container = []
+
+ def singleton(x):
+ return (x,)
+
+ with pytest.raises(nx.NetworkXError):
+
+ @argmap(singleton, 0, 1, 2, try_finally=True)
+ def foo(x, y, z):
+ yield from (x, y, z)
+
+ @argmap(singleton, 0, 1, 2)
+ def foo(x, y, z):
+ return x + y + z
+
+ q = foo("a", "b", "c")
+
+ assert q == ("a", "b", "c")
+
+ def test_actual_vararg(self):
+ @argmap(lambda x: -x, 4)
+ def foo(x, y, *args):
+ return (x, y) + tuple(args)
+
+ assert foo(1, 2, 3, 4, 5, 6) == (1, 2, 3, 4, -5, 6)
+
+ def test_signature_destroying_intermediate_decorator(self):
+ def add_one_to_first_bad_decorator(f):
+ """Bad because it doesn't wrap the f signature (clobbers it)"""
+
+ def decorated(a, *args, **kwargs):
+ return f(a + 1, *args, **kwargs)
+
+ return decorated
+
+ add_two_to_second = argmap(lambda b: b + 2, 1)
+
+ @add_two_to_second
+ @add_one_to_first_bad_decorator
+ def add_one_and_two(a, b):
+ return a, b
+
+ assert add_one_and_two(5, 5) == (6, 7)
+
+ def test_actual_kwarg(self):
+ @argmap(lambda x: -x, "arg")
+ def foo(*, arg):
+ return arg
+
+ assert foo(arg=3) == -3
+
+ def test_nested_tuple(self):
+ def xform(x, y):
+ u, v = y
+ return x + u + v, (x + u, x + v)
+
+ # we're testing args and kwargs here, too
+ @argmap(xform, (0, ("t", 2)))
+ def foo(a, *args, **kwargs):
+ return a, args, kwargs
+
+ a, args, kwargs = foo(1, 2, 3, t=4)
+
+ assert a == 1 + 4 + 3
+ assert args == (2, 1 + 3)
+ assert kwargs == {"t": 1 + 4}
+
+ def test_flatten(self):
+ assert tuple(argmap._flatten([[[[[], []], [], []], [], [], []]], set())) == ()
+
+ rlist = ["a", ["b", "c"], [["d"], "e"], "f"]
+ assert "".join(argmap._flatten(rlist, set())) == "abcdef"
+
+ def test_indent(self):
+ code = "\n".join(
+ argmap._indent(
+ *[
+ "try:",
+ "try:",
+ "pass#",
+ "finally:",
+ "pass#",
+ "#",
+ "finally:",
+ "pass#",
+ ]
+ )
+ )
+ assert (
+ code
+ == """try:
+ try:
+ pass#
+ finally:
+ pass#
+ #
+finally:
+ pass#"""
+ )
+
+ def test_immediate_raise(self):
+ @not_implemented_for("directed")
+ def yield_nodes(G):
+ yield from G
+
+ G = nx.Graph([(1, 2)])
+ D = nx.DiGraph()
+
+ # test first call (argmap is compiled and executed)
+ with pytest.raises(nx.NetworkXNotImplemented):
+ node_iter = yield_nodes(D)
+
+ # test second call (argmap is only executed)
+ with pytest.raises(nx.NetworkXNotImplemented):
+ node_iter = yield_nodes(D)
+
+ # ensure that generators still make generators
+ node_iter = yield_nodes(G)
+ next(node_iter)
+ next(node_iter)
+ with pytest.raises(StopIteration):
+ next(node_iter)
diff --git a/.venv/lib/python3.12/site-packages/networkx/utils/tests/test_heaps.py b/.venv/lib/python3.12/site-packages/networkx/utils/tests/test_heaps.py
new file mode 100644
index 00000000..5ea38716
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/networkx/utils/tests/test_heaps.py
@@ -0,0 +1,131 @@
+import pytest
+
+import networkx as nx
+from networkx.utils import BinaryHeap, PairingHeap
+
+
+class X:
+ def __eq__(self, other):
+ raise self is other
+
+ def __ne__(self, other):
+ raise self is not other
+
+ def __lt__(self, other):
+ raise TypeError("cannot compare")
+
+ def __le__(self, other):
+ raise TypeError("cannot compare")
+
+ def __ge__(self, other):
+ raise TypeError("cannot compare")
+
+ def __gt__(self, other):
+ raise TypeError("cannot compare")
+
+ def __hash__(self):
+ return hash(id(self))
+
+
+x = X()
+
+
+data = [ # min should not invent an element.
+ ("min", nx.NetworkXError),
+ # Popping an empty heap should fail.
+ ("pop", nx.NetworkXError),
+ # Getting nonexisting elements should return None.
+ ("get", 0, None),
+ ("get", x, None),
+ ("get", None, None),
+ # Inserting a new key should succeed.
+ ("insert", x, 1, True),
+ ("get", x, 1),
+ ("min", (x, 1)),
+ # min should not pop the top element.
+ ("min", (x, 1)),
+ # Inserting a new key of different type should succeed.
+ ("insert", 1, -2.0, True),
+ # int and float values should interop.
+ ("min", (1, -2.0)),
+ # pop removes minimum-valued element.
+ ("insert", 3, -(10**100), True),
+ ("insert", 4, 5, True),
+ ("pop", (3, -(10**100))),
+ ("pop", (1, -2.0)),
+ # Decrease-insert should succeed.
+ ("insert", 4, -50, True),
+ ("insert", 4, -60, False, True),
+ # Decrease-insert should not create duplicate keys.
+ ("pop", (4, -60)),
+ ("pop", (x, 1)),
+ # Popping all elements should empty the heap.
+ ("min", nx.NetworkXError),
+ ("pop", nx.NetworkXError),
+ # Non-value-changing insert should fail.
+ ("insert", x, 0, True),
+ ("insert", x, 0, False, False),
+ ("min", (x, 0)),
+ ("insert", x, 0, True, False),
+ ("min", (x, 0)),
+ # Failed insert should not create duplicate keys.
+ ("pop", (x, 0)),
+ ("pop", nx.NetworkXError),
+ # Increase-insert should succeed when allowed.
+ ("insert", None, 0, True),
+ ("insert", 2, -1, True),
+ ("min", (2, -1)),
+ ("insert", 2, 1, True, False),
+ ("min", (None, 0)),
+ # Increase-insert should fail when disallowed.
+ ("insert", None, 2, False, False),
+ ("min", (None, 0)),
+ # Failed increase-insert should not create duplicate keys.
+ ("pop", (None, 0)),
+ ("pop", (2, 1)),
+ ("min", nx.NetworkXError),
+ ("pop", nx.NetworkXError),
+]
+
+
+def _test_heap_class(cls, *args, **kwargs):
+ heap = cls(*args, **kwargs)
+ # Basic behavioral test
+ for op in data:
+ if op[-1] is not nx.NetworkXError:
+ assert op[-1] == getattr(heap, op[0])(*op[1:-1])
+ else:
+ pytest.raises(op[-1], getattr(heap, op[0]), *op[1:-1])
+ # Coverage test.
+ for i in range(99, -1, -1):
+ assert heap.insert(i, i)
+ for i in range(50):
+ assert heap.pop() == (i, i)
+ for i in range(100):
+ assert heap.insert(i, i) == (i < 50)
+ for i in range(100):
+ assert not heap.insert(i, i + 1)
+ for i in range(50):
+ assert heap.pop() == (i, i)
+ for i in range(100):
+ assert heap.insert(i, i + 1) == (i < 50)
+ for i in range(49):
+ assert heap.pop() == (i, i + 1)
+ assert sorted([heap.pop(), heap.pop()]) == [(49, 50), (50, 50)]
+ for i in range(51, 100):
+ assert not heap.insert(i, i + 1, True)
+ for i in range(51, 70):
+ assert heap.pop() == (i, i + 1)
+ for i in range(100):
+ assert heap.insert(i, i)
+ for i in range(100):
+ assert heap.pop() == (i, i)
+ pytest.raises(nx.NetworkXError, heap.pop)
+
+
+def test_PairingHeap():
+ _test_heap_class(PairingHeap)
+
+
+def test_BinaryHeap():
+ _test_heap_class(BinaryHeap)
diff --git a/.venv/lib/python3.12/site-packages/networkx/utils/tests/test_mapped_queue.py b/.venv/lib/python3.12/site-packages/networkx/utils/tests/test_mapped_queue.py
new file mode 100644
index 00000000..ca9b7e42
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/networkx/utils/tests/test_mapped_queue.py
@@ -0,0 +1,268 @@
+import pytest
+
+from networkx.utils.mapped_queue import MappedQueue, _HeapElement
+
+
+def test_HeapElement_gtlt():
+ bar = _HeapElement(1.1, "a")
+ foo = _HeapElement(1, "b")
+ assert foo < bar
+ assert bar > foo
+ assert foo < 1.1
+ assert 1 < bar
+
+
+def test_HeapElement_gtlt_tied_priority():
+ bar = _HeapElement(1, "a")
+ foo = _HeapElement(1, "b")
+ assert foo > bar
+ assert bar < foo
+
+
+def test_HeapElement_eq():
+ bar = _HeapElement(1.1, "a")
+ foo = _HeapElement(1, "a")
+ assert foo == bar
+ assert bar == foo
+ assert foo == "a"
+
+
+def test_HeapElement_iter():
+ foo = _HeapElement(1, "a")
+ bar = _HeapElement(1.1, (3, 2, 1))
+ assert list(foo) == [1, "a"]
+ assert list(bar) == [1.1, 3, 2, 1]
+
+
+def test_HeapElement_getitem():
+ foo = _HeapElement(1, "a")
+ bar = _HeapElement(1.1, (3, 2, 1))
+ assert foo[1] == "a"
+ assert foo[0] == 1
+ assert bar[0] == 1.1
+ assert bar[2] == 2
+ assert bar[3] == 1
+ pytest.raises(IndexError, bar.__getitem__, 4)
+ pytest.raises(IndexError, foo.__getitem__, 2)
+
+
+class TestMappedQueue:
+ def setup_method(self):
+ pass
+
+ def _check_map(self, q):
+ assert q.position == {elt: pos for pos, elt in enumerate(q.heap)}
+
+ def _make_mapped_queue(self, h):
+ q = MappedQueue()
+ q.heap = h
+ q.position = {elt: pos for pos, elt in enumerate(h)}
+ return q
+
+ def test_heapify(self):
+ h = [5, 4, 3, 2, 1, 0]
+ q = self._make_mapped_queue(h)
+ q._heapify()
+ self._check_map(q)
+
+ def test_init(self):
+ h = [5, 4, 3, 2, 1, 0]
+ q = MappedQueue(h)
+ self._check_map(q)
+
+ def test_incomparable(self):
+ h = [5, 4, "a", 2, 1, 0]
+ pytest.raises(TypeError, MappedQueue, h)
+
+ def test_len(self):
+ h = [5, 4, 3, 2, 1, 0]
+ q = MappedQueue(h)
+ self._check_map(q)
+ assert len(q) == 6
+
+ def test_siftup_leaf(self):
+ h = [2]
+ h_sifted = [2]
+ q = self._make_mapped_queue(h)
+ q._siftup(0)
+ assert q.heap == h_sifted
+ self._check_map(q)
+
+ def test_siftup_one_child(self):
+ h = [2, 0]
+ h_sifted = [0, 2]
+ q = self._make_mapped_queue(h)
+ q._siftup(0)
+ assert q.heap == h_sifted
+ self._check_map(q)
+
+ def test_siftup_left_child(self):
+ h = [2, 0, 1]
+ h_sifted = [0, 2, 1]
+ q = self._make_mapped_queue(h)
+ q._siftup(0)
+ assert q.heap == h_sifted
+ self._check_map(q)
+
+ def test_siftup_right_child(self):
+ h = [2, 1, 0]
+ h_sifted = [0, 1, 2]
+ q = self._make_mapped_queue(h)
+ q._siftup(0)
+ assert q.heap == h_sifted
+ self._check_map(q)
+
+ def test_siftup_multiple(self):
+ h = [0, 1, 2, 4, 3, 5, 6]
+ h_sifted = [0, 1, 2, 4, 3, 5, 6]
+ q = self._make_mapped_queue(h)
+ q._siftup(0)
+ assert q.heap == h_sifted
+ self._check_map(q)
+
+ def test_siftdown_leaf(self):
+ h = [2]
+ h_sifted = [2]
+ q = self._make_mapped_queue(h)
+ q._siftdown(0, 0)
+ assert q.heap == h_sifted
+ self._check_map(q)
+
+ def test_siftdown_single(self):
+ h = [1, 0]
+ h_sifted = [0, 1]
+ q = self._make_mapped_queue(h)
+ q._siftdown(0, len(h) - 1)
+ assert q.heap == h_sifted
+ self._check_map(q)
+
+ def test_siftdown_multiple(self):
+ h = [1, 2, 3, 4, 5, 6, 7, 0]
+ h_sifted = [0, 1, 3, 2, 5, 6, 7, 4]
+ q = self._make_mapped_queue(h)
+ q._siftdown(0, len(h) - 1)
+ assert q.heap == h_sifted
+ self._check_map(q)
+
+ def test_push(self):
+ to_push = [6, 1, 4, 3, 2, 5, 0]
+ h_sifted = [0, 2, 1, 6, 3, 5, 4]
+ q = MappedQueue()
+ for elt in to_push:
+ q.push(elt)
+ assert q.heap == h_sifted
+ self._check_map(q)
+
+ def test_push_duplicate(self):
+ to_push = [2, 1, 0]
+ h_sifted = [0, 2, 1]
+ q = MappedQueue()
+ for elt in to_push:
+ inserted = q.push(elt)
+ assert inserted
+ assert q.heap == h_sifted
+ self._check_map(q)
+ inserted = q.push(1)
+ assert not inserted
+
+ def test_pop(self):
+ h = [3, 4, 6, 0, 1, 2, 5]
+ h_sorted = sorted(h)
+ q = self._make_mapped_queue(h)
+ q._heapify()
+ popped = [q.pop() for _ in range(len(h))]
+ assert popped == h_sorted
+ self._check_map(q)
+
+ def test_remove_leaf(self):
+ h = [0, 2, 1, 6, 3, 5, 4]
+ h_removed = [0, 2, 1, 6, 4, 5]
+ q = self._make_mapped_queue(h)
+ removed = q.remove(3)
+ assert q.heap == h_removed
+
+ def test_remove_root(self):
+ h = [0, 2, 1, 6, 3, 5, 4]
+ h_removed = [1, 2, 4, 6, 3, 5]
+ q = self._make_mapped_queue(h)
+ removed = q.remove(0)
+ assert q.heap == h_removed
+
+ def test_update_leaf(self):
+ h = [0, 20, 10, 60, 30, 50, 40]
+ h_updated = [0, 15, 10, 60, 20, 50, 40]
+ q = self._make_mapped_queue(h)
+ removed = q.update(30, 15)
+ assert q.heap == h_updated
+
+ def test_update_root(self):
+ h = [0, 20, 10, 60, 30, 50, 40]
+ h_updated = [10, 20, 35, 60, 30, 50, 40]
+ q = self._make_mapped_queue(h)
+ removed = q.update(0, 35)
+ assert q.heap == h_updated
+
+
+class TestMappedDict(TestMappedQueue):
+ def _make_mapped_queue(self, h):
+ priority_dict = {elt: elt for elt in h}
+ return MappedQueue(priority_dict)
+
+ def test_init(self):
+ d = {5: 0, 4: 1, "a": 2, 2: 3, 1: 4}
+ q = MappedQueue(d)
+ assert q.position == d
+
+ def test_ties(self):
+ d = {5: 0, 4: 1, 3: 2, 2: 3, 1: 4}
+ q = MappedQueue(d)
+ assert q.position == {elt: pos for pos, elt in enumerate(q.heap)}
+
+ def test_pop(self):
+ d = {5: 0, 4: 1, 3: 2, 2: 3, 1: 4}
+ q = MappedQueue(d)
+ assert q.pop() == _HeapElement(0, 5)
+ assert q.position == {elt: pos for pos, elt in enumerate(q.heap)}
+
+ def test_empty_pop(self):
+ q = MappedQueue()
+ pytest.raises(IndexError, q.pop)
+
+ def test_incomparable_ties(self):
+ d = {5: 0, 4: 0, "a": 0, 2: 0, 1: 0}
+ pytest.raises(TypeError, MappedQueue, d)
+
+ def test_push(self):
+ to_push = [6, 1, 4, 3, 2, 5, 0]
+ h_sifted = [0, 2, 1, 6, 3, 5, 4]
+ q = MappedQueue()
+ for elt in to_push:
+ q.push(elt, priority=elt)
+ assert q.heap == h_sifted
+ self._check_map(q)
+
+ def test_push_duplicate(self):
+ to_push = [2, 1, 0]
+ h_sifted = [0, 2, 1]
+ q = MappedQueue()
+ for elt in to_push:
+ inserted = q.push(elt, priority=elt)
+ assert inserted
+ assert q.heap == h_sifted
+ self._check_map(q)
+ inserted = q.push(1, priority=1)
+ assert not inserted
+
+ def test_update_leaf(self):
+ h = [0, 20, 10, 60, 30, 50, 40]
+ h_updated = [0, 15, 10, 60, 20, 50, 40]
+ q = self._make_mapped_queue(h)
+ removed = q.update(30, 15, priority=15)
+ assert q.heap == h_updated
+
+ def test_update_root(self):
+ h = [0, 20, 10, 60, 30, 50, 40]
+ h_updated = [10, 20, 35, 60, 30, 50, 40]
+ q = self._make_mapped_queue(h)
+ removed = q.update(0, 35, priority=35)
+ assert q.heap == h_updated
diff --git a/.venv/lib/python3.12/site-packages/networkx/utils/tests/test_misc.py b/.venv/lib/python3.12/site-packages/networkx/utils/tests/test_misc.py
new file mode 100644
index 00000000..eff36b2a
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/networkx/utils/tests/test_misc.py
@@ -0,0 +1,268 @@
+import random
+from copy import copy
+
+import pytest
+
+import networkx as nx
+from networkx.utils import (
+ PythonRandomInterface,
+ PythonRandomViaNumpyBits,
+ arbitrary_element,
+ create_py_random_state,
+ create_random_state,
+ dict_to_numpy_array,
+ discrete_sequence,
+ flatten,
+ groups,
+ make_list_of_ints,
+ pairwise,
+ powerlaw_sequence,
+)
+from networkx.utils.misc import _dict_to_numpy_array1, _dict_to_numpy_array2
+
+nested_depth = (
+ 1,
+ 2,
+ (3, 4, ((5, 6, (7,), (8, (9, 10), 11), (12, 13, (14, 15)), 16), 17), 18, 19),
+ 20,
+)
+
+nested_set = {
+ (1, 2, 3, 4),
+ (5, 6, 7, 8, 9),
+ (10, 11, (12, 13, 14), (15, 16, 17, 18)),
+ 19,
+ 20,
+}
+
+nested_mixed = [
+ 1,
+ (2, 3, {4, (5, 6), 7}, [8, 9]),
+ {10: "foo", 11: "bar", (12, 13): "baz"},
+ {(14, 15): "qwe", 16: "asd"},
+ (17, (18, "19"), 20),
+]
+
+
+@pytest.mark.parametrize("result", [None, [], ["existing"], ["existing1", "existing2"]])
+@pytest.mark.parametrize("nested", [nested_depth, nested_mixed, nested_set])
+def test_flatten(nested, result):
+ if result is None:
+ val = flatten(nested, result)
+ assert len(val) == 20
+ else:
+ _result = copy(result) # because pytest passes parameters as is
+ nexisting = len(_result)
+ val = flatten(nested, _result)
+ assert len(val) == len(_result) == 20 + nexisting
+
+ assert issubclass(type(val), tuple)
+
+
+def test_make_list_of_ints():
+ mylist = [1, 2, 3.0, 42, -2]
+ assert make_list_of_ints(mylist) is mylist
+ assert make_list_of_ints(mylist) == mylist
+ assert type(make_list_of_ints(mylist)[2]) is int
+ pytest.raises(nx.NetworkXError, make_list_of_ints, [1, 2, 3, "kermit"])
+ pytest.raises(nx.NetworkXError, make_list_of_ints, [1, 2, 3.1])
+
+
+def test_random_number_distribution():
+ # smoke test only
+ z = powerlaw_sequence(20, exponent=2.5)
+ z = discrete_sequence(20, distribution=[0, 0, 0, 0, 1, 1, 1, 1, 2, 2, 3])
+
+
+class TestNumpyArray:
+ @classmethod
+ def setup_class(cls):
+ global np
+ np = pytest.importorskip("numpy")
+
+ def test_numpy_to_list_of_ints(self):
+ a = np.array([1, 2, 3], dtype=np.int64)
+ b = np.array([1.0, 2, 3])
+ c = np.array([1.1, 2, 3])
+ assert type(make_list_of_ints(a)) == list
+ assert make_list_of_ints(b) == list(b)
+ B = make_list_of_ints(b)
+ assert type(B[0]) == int
+ pytest.raises(nx.NetworkXError, make_list_of_ints, c)
+
+ def test__dict_to_numpy_array1(self):
+ d = {"a": 1, "b": 2}
+ a = _dict_to_numpy_array1(d, mapping={"a": 0, "b": 1})
+ np.testing.assert_allclose(a, np.array([1, 2]))
+ a = _dict_to_numpy_array1(d, mapping={"b": 0, "a": 1})
+ np.testing.assert_allclose(a, np.array([2, 1]))
+
+ a = _dict_to_numpy_array1(d)
+ np.testing.assert_allclose(a.sum(), 3)
+
+ def test__dict_to_numpy_array2(self):
+ d = {"a": {"a": 1, "b": 2}, "b": {"a": 10, "b": 20}}
+
+ mapping = {"a": 1, "b": 0}
+ a = _dict_to_numpy_array2(d, mapping=mapping)
+ np.testing.assert_allclose(a, np.array([[20, 10], [2, 1]]))
+
+ a = _dict_to_numpy_array2(d)
+ np.testing.assert_allclose(a.sum(), 33)
+
+ def test_dict_to_numpy_array_a(self):
+ d = {"a": {"a": 1, "b": 2}, "b": {"a": 10, "b": 20}}
+
+ mapping = {"a": 0, "b": 1}
+ a = dict_to_numpy_array(d, mapping=mapping)
+ np.testing.assert_allclose(a, np.array([[1, 2], [10, 20]]))
+
+ mapping = {"a": 1, "b": 0}
+ a = dict_to_numpy_array(d, mapping=mapping)
+ np.testing.assert_allclose(a, np.array([[20, 10], [2, 1]]))
+
+ a = _dict_to_numpy_array2(d)
+ np.testing.assert_allclose(a.sum(), 33)
+
+ def test_dict_to_numpy_array_b(self):
+ d = {"a": 1, "b": 2}
+
+ mapping = {"a": 0, "b": 1}
+ a = dict_to_numpy_array(d, mapping=mapping)
+ np.testing.assert_allclose(a, np.array([1, 2]))
+
+ a = _dict_to_numpy_array1(d)
+ np.testing.assert_allclose(a.sum(), 3)
+
+
+def test_pairwise():
+ nodes = range(4)
+ node_pairs = [(0, 1), (1, 2), (2, 3)]
+ node_pairs_cycle = node_pairs + [(3, 0)]
+ assert list(pairwise(nodes)) == node_pairs
+ assert list(pairwise(iter(nodes))) == node_pairs
+ assert list(pairwise(nodes, cyclic=True)) == node_pairs_cycle
+ empty_iter = iter(())
+ assert list(pairwise(empty_iter)) == []
+ empty_iter = iter(())
+ assert list(pairwise(empty_iter, cyclic=True)) == []
+
+
+def test_groups():
+ many_to_one = dict(zip("abcde", [0, 0, 1, 1, 2]))
+ actual = groups(many_to_one)
+ expected = {0: {"a", "b"}, 1: {"c", "d"}, 2: {"e"}}
+ assert actual == expected
+ assert {} == groups({})
+
+
+def test_create_random_state():
+ np = pytest.importorskip("numpy")
+ rs = np.random.RandomState
+
+ assert isinstance(create_random_state(1), rs)
+ assert isinstance(create_random_state(None), rs)
+ assert isinstance(create_random_state(np.random), rs)
+ assert isinstance(create_random_state(rs(1)), rs)
+ # Support for numpy.random.Generator
+ rng = np.random.default_rng()
+ assert isinstance(create_random_state(rng), np.random.Generator)
+ pytest.raises(ValueError, create_random_state, "a")
+
+ assert np.all(rs(1).rand(10) == create_random_state(1).rand(10))
+
+
+def test_create_py_random_state():
+ pyrs = random.Random
+
+ assert isinstance(create_py_random_state(1), pyrs)
+ assert isinstance(create_py_random_state(None), pyrs)
+ assert isinstance(create_py_random_state(pyrs(1)), pyrs)
+ pytest.raises(ValueError, create_py_random_state, "a")
+
+ np = pytest.importorskip("numpy")
+
+ rs = np.random.RandomState
+ rng = np.random.default_rng(1000)
+ rng_explicit = np.random.Generator(np.random.SFC64())
+ old_nprs = PythonRandomInterface
+ nprs = PythonRandomViaNumpyBits
+ assert isinstance(create_py_random_state(np.random), nprs)
+ assert isinstance(create_py_random_state(rs(1)), old_nprs)
+ assert isinstance(create_py_random_state(rng), nprs)
+ assert isinstance(create_py_random_state(rng_explicit), nprs)
+ # test default rng input
+ assert isinstance(PythonRandomInterface(), old_nprs)
+ assert isinstance(PythonRandomViaNumpyBits(), nprs)
+
+ # VeryLargeIntegers Smoke test (they raise error for np.random)
+ int64max = 9223372036854775807 # from np.iinfo(np.int64).max
+ for r in (rng, rs(1)):
+ prs = create_py_random_state(r)
+ prs.randrange(3, int64max + 5)
+ prs.randint(3, int64max + 5)
+
+
+def test_PythonRandomInterface_RandomState():
+ np = pytest.importorskip("numpy")
+
+ seed = 42
+ rs = np.random.RandomState
+ rng = PythonRandomInterface(rs(seed))
+ rs42 = rs(seed)
+
+ # make sure these functions are same as expected outcome
+ assert rng.randrange(3, 5) == rs42.randint(3, 5)
+ assert rng.choice([1, 2, 3]) == rs42.choice([1, 2, 3])
+ assert rng.gauss(0, 1) == rs42.normal(0, 1)
+ assert rng.expovariate(1.5) == rs42.exponential(1 / 1.5)
+ assert np.all(rng.shuffle([1, 2, 3]) == rs42.shuffle([1, 2, 3]))
+ assert np.all(
+ rng.sample([1, 2, 3], 2) == rs42.choice([1, 2, 3], (2,), replace=False)
+ )
+ assert np.all(
+ [rng.randint(3, 5) for _ in range(100)]
+ == [rs42.randint(3, 6) for _ in range(100)]
+ )
+ assert rng.random() == rs42.random_sample()
+
+
+def test_PythonRandomInterface_Generator():
+ np = pytest.importorskip("numpy")
+
+ seed = 42
+ rng = np.random.default_rng(seed)
+ pri = PythonRandomInterface(np.random.default_rng(seed))
+
+ # make sure these functions are same as expected outcome
+ assert pri.randrange(3, 5) == rng.integers(3, 5)
+ assert pri.choice([1, 2, 3]) == rng.choice([1, 2, 3])
+ assert pri.gauss(0, 1) == rng.normal(0, 1)
+ assert pri.expovariate(1.5) == rng.exponential(1 / 1.5)
+ assert np.all(pri.shuffle([1, 2, 3]) == rng.shuffle([1, 2, 3]))
+ assert np.all(
+ pri.sample([1, 2, 3], 2) == rng.choice([1, 2, 3], (2,), replace=False)
+ )
+ assert np.all(
+ [pri.randint(3, 5) for _ in range(100)]
+ == [rng.integers(3, 6) for _ in range(100)]
+ )
+ assert pri.random() == rng.random()
+
+
+@pytest.mark.parametrize(
+ ("iterable_type", "expected"), ((list, 1), (tuple, 1), (str, "["), (set, 1))
+)
+def test_arbitrary_element(iterable_type, expected):
+ iterable = iterable_type([1, 2, 3])
+ assert arbitrary_element(iterable) == expected
+
+
+@pytest.mark.parametrize(
+ "iterator",
+ ((i for i in range(3)), iter([1, 2, 3])), # generator
+)
+def test_arbitrary_element_raises(iterator):
+ """Value error is raised when input is an iterator."""
+ with pytest.raises(ValueError, match="from an iterator"):
+ arbitrary_element(iterator)
diff --git a/.venv/lib/python3.12/site-packages/networkx/utils/tests/test_random_sequence.py b/.venv/lib/python3.12/site-packages/networkx/utils/tests/test_random_sequence.py
new file mode 100644
index 00000000..1d1b9579
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/networkx/utils/tests/test_random_sequence.py
@@ -0,0 +1,38 @@
+import pytest
+
+from networkx.utils import (
+ powerlaw_sequence,
+ random_weighted_sample,
+ weighted_choice,
+ zipf_rv,
+)
+
+
+def test_degree_sequences():
+ seq = powerlaw_sequence(10, seed=1)
+ seq = powerlaw_sequence(10)
+ assert len(seq) == 10
+
+
+def test_zipf_rv():
+ r = zipf_rv(2.3, xmin=2, seed=1)
+ r = zipf_rv(2.3, 2, 1)
+ r = zipf_rv(2.3)
+ assert type(r), int
+ pytest.raises(ValueError, zipf_rv, 0.5)
+ pytest.raises(ValueError, zipf_rv, 2, xmin=0)
+
+
+def test_random_weighted_sample():
+ mapping = {"a": 10, "b": 20}
+ s = random_weighted_sample(mapping, 2, seed=1)
+ s = random_weighted_sample(mapping, 2)
+ assert sorted(s) == sorted(mapping.keys())
+ pytest.raises(ValueError, random_weighted_sample, mapping, 3)
+
+
+def test_random_weighted_choice():
+ mapping = {"a": 10, "b": 0}
+ c = weighted_choice(mapping, seed=1)
+ c = weighted_choice(mapping)
+ assert c == "a"
diff --git a/.venv/lib/python3.12/site-packages/networkx/utils/tests/test_rcm.py b/.venv/lib/python3.12/site-packages/networkx/utils/tests/test_rcm.py
new file mode 100644
index 00000000..88702b36
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/networkx/utils/tests/test_rcm.py
@@ -0,0 +1,63 @@
+import networkx as nx
+from networkx.utils import reverse_cuthill_mckee_ordering
+
+
+def test_reverse_cuthill_mckee():
+ # example graph from
+ # http://www.boost.org/doc/libs/1_37_0/libs/graph/example/cuthill_mckee_ordering.cpp
+ G = nx.Graph(
+ [
+ (0, 3),
+ (0, 5),
+ (1, 2),
+ (1, 4),
+ (1, 6),
+ (1, 9),
+ (2, 3),
+ (2, 4),
+ (3, 5),
+ (3, 8),
+ (4, 6),
+ (5, 6),
+ (5, 7),
+ (6, 7),
+ ]
+ )
+ rcm = list(reverse_cuthill_mckee_ordering(G))
+ assert rcm in [[0, 8, 5, 7, 3, 6, 2, 4, 1, 9], [0, 8, 5, 7, 3, 6, 4, 2, 1, 9]]
+
+
+def test_rcm_alternate_heuristic():
+ # example from
+ G = nx.Graph(
+ [
+ (0, 0),
+ (0, 4),
+ (1, 1),
+ (1, 2),
+ (1, 5),
+ (1, 7),
+ (2, 2),
+ (2, 4),
+ (3, 3),
+ (3, 6),
+ (4, 4),
+ (5, 5),
+ (5, 7),
+ (6, 6),
+ (7, 7),
+ ]
+ )
+
+ answers = [
+ [6, 3, 5, 7, 1, 2, 4, 0],
+ [6, 3, 7, 5, 1, 2, 4, 0],
+ [7, 5, 1, 2, 4, 0, 6, 3],
+ ]
+
+ def smallest_degree(G):
+ deg, node = min((d, n) for n, d in G.degree())
+ return node
+
+ rcm = list(reverse_cuthill_mckee_ordering(G, heuristic=smallest_degree))
+ assert rcm in answers
diff --git a/.venv/lib/python3.12/site-packages/networkx/utils/tests/test_unionfind.py b/.venv/lib/python3.12/site-packages/networkx/utils/tests/test_unionfind.py
new file mode 100644
index 00000000..2d30580f
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/networkx/utils/tests/test_unionfind.py
@@ -0,0 +1,55 @@
+import networkx as nx
+
+
+def test_unionfind():
+ # Fixed by: 2cddd5958689bdecdcd89b91ac9aaf6ce0e4f6b8
+ # Previously (in 2.x), the UnionFind class could handle mixed types.
+ # But in Python 3.x, this causes a TypeError such as:
+ # TypeError: unorderable types: str() > int()
+ #
+ # Now we just make sure that no exception is raised.
+ x = nx.utils.UnionFind()
+ x.union(0, "a")
+
+
+def test_subtree_union():
+ # See https://github.com/networkx/networkx/pull/3224
+ # (35db1b551ee65780794a357794f521d8768d5049).
+ # Test if subtree unions hare handled correctly by to_sets().
+ uf = nx.utils.UnionFind()
+ uf.union(1, 2)
+ uf.union(3, 4)
+ uf.union(4, 5)
+ uf.union(1, 5)
+ assert list(uf.to_sets()) == [{1, 2, 3, 4, 5}]
+
+
+def test_unionfind_weights():
+ # Tests if weights are computed correctly with unions of many elements
+ uf = nx.utils.UnionFind()
+ uf.union(1, 4, 7)
+ uf.union(2, 5, 8)
+ uf.union(3, 6, 9)
+ uf.union(1, 2, 3, 4, 5, 6, 7, 8, 9)
+ assert uf.weights[uf[1]] == 9
+
+
+def test_unbalanced_merge_weights():
+ # Tests if the largest set's root is used as the new root when merging
+ uf = nx.utils.UnionFind()
+ uf.union(1, 2, 3)
+ uf.union(4, 5, 6, 7, 8, 9)
+ assert uf.weights[uf[1]] == 3
+ assert uf.weights[uf[4]] == 6
+ largest_root = uf[4]
+ uf.union(1, 4)
+ assert uf[1] == largest_root
+ assert uf.weights[largest_root] == 9
+
+
+def test_empty_union():
+ # Tests if a null-union does nothing.
+ uf = nx.utils.UnionFind((0, 1))
+ uf.union()
+ assert uf[0] == 0
+ assert uf[1] == 1