diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/networkx/utils/tests')
11 files changed, 1745 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/networkx/utils/tests/__init__.py b/.venv/lib/python3.12/site-packages/networkx/utils/tests/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/.venv/lib/python3.12/site-packages/networkx/utils/tests/__init__.py diff --git a/.venv/lib/python3.12/site-packages/networkx/utils/tests/test__init.py b/.venv/lib/python3.12/site-packages/networkx/utils/tests/test__init.py new file mode 100644 index 00000000..ecbcce36 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/networkx/utils/tests/test__init.py @@ -0,0 +1,11 @@ +import pytest + + +def test_utils_namespace(): + """Ensure objects are not unintentionally exposed in utils namespace.""" + with pytest.raises(ImportError): + from networkx.utils import nx + with pytest.raises(ImportError): + from networkx.utils import sys + with pytest.raises(ImportError): + from networkx.utils import defaultdict, deque diff --git a/.venv/lib/python3.12/site-packages/networkx/utils/tests/test_backends.py b/.venv/lib/python3.12/site-packages/networkx/utils/tests/test_backends.py new file mode 100644 index 00000000..ad006f00 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/networkx/utils/tests/test_backends.py @@ -0,0 +1,170 @@ +import pickle + +import pytest + +import networkx as nx + +sp = pytest.importorskip("scipy") +pytest.importorskip("numpy") + + +def test_dispatch_kwds_vs_args(): + G = nx.path_graph(4) + nx.pagerank(G) + nx.pagerank(G=G) + with pytest.raises(TypeError): + nx.pagerank() + + +def test_pickle(): + count = 0 + for name, func in nx.utils.backends._registered_algorithms.items(): + pickled = pickle.dumps(func.__wrapped__) + assert pickle.loads(pickled) is func.__wrapped__ + try: + # Some functions can't be pickled, but it's not b/c of _dispatchable + pickled = pickle.dumps(func) + except pickle.PicklingError: + continue + assert pickle.loads(pickled) is func + count += 1 + assert count > 0 + assert pickle.loads(pickle.dumps(nx.inverse_line_graph)) is nx.inverse_line_graph + + +@pytest.mark.skipif( + "not nx.config.backend_priority.algos " + "or nx.config.backend_priority.algos[0] != 'nx_loopback'" +) +def test_graph_converter_needs_backend(): + # When testing, `nx.from_scipy_sparse_array` will *always* call the backend + # implementation if it's implemented. If `backend=` isn't given, then the result + # will be converted back to NetworkX via `convert_to_nx`. + # If not testing, then calling `nx.from_scipy_sparse_array` w/o `backend=` will + # always call the original version. `backend=` is *required* to call the backend. + from networkx.classes.tests.dispatch_interface import ( + LoopbackBackendInterface, + LoopbackGraph, + ) + + A = sp.sparse.coo_array([[0, 3, 2], [3, 0, 1], [2, 1, 0]]) + + side_effects = [] + + def from_scipy_sparse_array(self, *args, **kwargs): + side_effects.append(1) # Just to prove this was called + return self.convert_from_nx( + self.__getattr__("from_scipy_sparse_array")(*args, **kwargs), + preserve_edge_attrs=True, + preserve_node_attrs=True, + preserve_graph_attrs=True, + ) + + @staticmethod + def convert_to_nx(obj, *, name=None): + if type(obj) is nx.Graph: + return obj + return nx.Graph(obj) + + # *This mutates LoopbackBackendInterface!* + orig_convert_to_nx = LoopbackBackendInterface.convert_to_nx + LoopbackBackendInterface.convert_to_nx = convert_to_nx + LoopbackBackendInterface.from_scipy_sparse_array = from_scipy_sparse_array + + try: + assert side_effects == [] + assert type(nx.from_scipy_sparse_array(A)) is nx.Graph + assert side_effects == [1] + assert ( + type(nx.from_scipy_sparse_array(A, backend="nx_loopback")) is LoopbackGraph + ) + assert side_effects == [1, 1] + # backend="networkx" is default implementation + assert type(nx.from_scipy_sparse_array(A, backend="networkx")) is nx.Graph + assert side_effects == [1, 1] + finally: + LoopbackBackendInterface.convert_to_nx = staticmethod(orig_convert_to_nx) + del LoopbackBackendInterface.from_scipy_sparse_array + with pytest.raises(ImportError, match="backend is not installed"): + nx.from_scipy_sparse_array(A, backend="bad-backend-name") + + +@pytest.mark.skipif( + "not nx.config.backend_priority.algos " + "or nx.config.backend_priority.algos[0] != 'nx_loopback'" +) +def test_networkx_backend(): + """Test using `backend="networkx"` in a dispatchable function.""" + # (Implementing this test is harder than it should be) + from networkx.classes.tests.dispatch_interface import ( + LoopbackBackendInterface, + LoopbackGraph, + ) + + G = LoopbackGraph() + G.add_edges_from([(0, 1), (1, 2), (1, 3), (2, 4)]) + + @staticmethod + def convert_to_nx(obj, *, name=None): + if isinstance(obj, LoopbackGraph): + new_graph = nx.Graph() + new_graph.__dict__.update(obj.__dict__) + return new_graph + return obj + + # *This mutates LoopbackBackendInterface!* + # This uses the same trick as in the previous test. + orig_convert_to_nx = LoopbackBackendInterface.convert_to_nx + LoopbackBackendInterface.convert_to_nx = convert_to_nx + try: + G2 = nx.ego_graph(G, 0, backend="networkx") + assert type(G2) is nx.Graph + finally: + LoopbackBackendInterface.convert_to_nx = staticmethod(orig_convert_to_nx) + + +def test_dispatchable_are_functions(): + assert type(nx.pagerank) is type(nx.pagerank.orig_func) + + +@pytest.mark.skipif("not nx.utils.backends.backends") +def test_mixing_backend_graphs(): + from networkx.classes.tests import dispatch_interface + + G = nx.Graph() + G.add_edge(1, 2) + G.add_edge(2, 3) + H = nx.Graph() + H.add_edge(2, 3) + rv = nx.intersection(G, H) + assert set(nx.intersection(G, H)) == {2, 3} + G2 = dispatch_interface.convert(G) + H2 = dispatch_interface.convert(H) + if "nx_loopback" in nx.config.backend_priority: + # Auto-convert + assert set(nx.intersection(G2, H)) == {2, 3} + assert set(nx.intersection(G, H2)) == {2, 3} + elif not nx.config.backend_priority and "nx_loopback" not in nx.config.backends: + # G2 and H2 are backend objects for a backend that is not registered! + with pytest.raises(ImportError, match="backend is not installed"): + nx.intersection(G2, H) + with pytest.raises(ImportError, match="backend is not installed"): + nx.intersection(G, H2) + # It would be nice to test passing graphs from *different* backends, + # but we are not set up to do this yet. + + +def test_bad_backend_name(): + """Using `backend=` raises with unknown backend even if there are no backends.""" + with pytest.raises( + ImportError, match="'this_backend_does_not_exist' backend is not installed" + ): + nx.null_graph(backend="this_backend_does_not_exist") + + +def test_fallback_to_nx(): + with pytest.warns(DeprecationWarning, match="_fallback_to_nx"): + # Check as class property + assert nx._dispatchable._fallback_to_nx == nx.config.fallback_to_nx + # Check as instance property + assert nx.pagerank.__wrapped__._fallback_to_nx == nx.config.fallback_to_nx diff --git a/.venv/lib/python3.12/site-packages/networkx/utils/tests/test_config.py b/.venv/lib/python3.12/site-packages/networkx/utils/tests/test_config.py new file mode 100644 index 00000000..7416b0ac --- /dev/null +++ b/.venv/lib/python3.12/site-packages/networkx/utils/tests/test_config.py @@ -0,0 +1,231 @@ +import collections +import pickle + +import pytest + +import networkx as nx +from networkx.utils.configs import BackendPriorities, Config + + +# Define this at module level so we can test pickling +class ExampleConfig(Config): + """Example configuration.""" + + x: int + y: str + + def _on_setattr(self, key, value): + if key == "x" and value <= 0: + raise ValueError("x must be positive") + if key == "y" and not isinstance(value, str): + raise TypeError("y must be a str") + return value + + +class EmptyConfig(Config): + pass + + +@pytest.mark.parametrize("cfg", [EmptyConfig(), Config()]) +def test_config_empty(cfg): + assert dir(cfg) == [] + with pytest.raises(AttributeError): + cfg.x = 1 + with pytest.raises(KeyError): + cfg["x"] = 1 + with pytest.raises(AttributeError): + cfg.x + with pytest.raises(KeyError): + cfg["x"] + assert len(cfg) == 0 + assert "x" not in cfg + assert cfg == cfg + assert cfg.get("x", 2) == 2 + assert set(cfg.keys()) == set() + assert set(cfg.values()) == set() + assert set(cfg.items()) == set() + cfg2 = pickle.loads(pickle.dumps(cfg)) + assert cfg == cfg2 + assert isinstance(cfg, collections.abc.Collection) + assert isinstance(cfg, collections.abc.Mapping) + + +def test_config_subclass(): + with pytest.raises(TypeError, match="missing 2 required keyword-only"): + ExampleConfig() + with pytest.raises(ValueError, match="x must be positive"): + ExampleConfig(x=0, y="foo") + with pytest.raises(TypeError, match="unexpected keyword"): + ExampleConfig(x=1, y="foo", z="bad config") + with pytest.raises(TypeError, match="unexpected keyword"): + EmptyConfig(z="bad config") + cfg = ExampleConfig(x=1, y="foo") + assert cfg.x == 1 + assert cfg["x"] == 1 + assert cfg["y"] == "foo" + assert cfg.y == "foo" + assert "x" in cfg + assert "y" in cfg + assert "z" not in cfg + assert len(cfg) == 2 + assert set(iter(cfg)) == {"x", "y"} + assert set(cfg.keys()) == {"x", "y"} + assert set(cfg.values()) == {1, "foo"} + assert set(cfg.items()) == {("x", 1), ("y", "foo")} + assert dir(cfg) == ["x", "y"] + cfg.x = 2 + cfg["y"] = "bar" + assert cfg["x"] == 2 + assert cfg.y == "bar" + with pytest.raises(TypeError, match="can't be deleted"): + del cfg.x + with pytest.raises(TypeError, match="can't be deleted"): + del cfg["y"] + assert cfg.x == 2 + assert cfg == cfg + assert cfg == ExampleConfig(x=2, y="bar") + assert cfg != ExampleConfig(x=3, y="baz") + assert cfg != Config(x=2, y="bar") + with pytest.raises(TypeError, match="y must be a str"): + cfg["y"] = 5 + with pytest.raises(ValueError, match="x must be positive"): + cfg.x = -5 + assert cfg.get("x", 10) == 2 + with pytest.raises(AttributeError): + cfg.z = 5 + with pytest.raises(KeyError): + cfg["z"] = 5 + with pytest.raises(AttributeError): + cfg.z + with pytest.raises(KeyError): + cfg["z"] + cfg2 = pickle.loads(pickle.dumps(cfg)) + assert cfg == cfg2 + assert cfg.__doc__ == "Example configuration." + assert cfg2.__doc__ == "Example configuration." + + +def test_config_defaults(): + class DefaultConfig(Config): + x: int = 0 + y: int + + cfg = DefaultConfig(y=1) + assert cfg.x == 0 + cfg = DefaultConfig(x=2, y=1) + assert cfg.x == 2 + + +def test_nxconfig(): + assert isinstance(nx.config.backend_priority, BackendPriorities) + assert isinstance(nx.config.backend_priority.algos, list) + assert isinstance(nx.config.backends, Config) + with pytest.raises(TypeError, match="must be a list of backend names"): + nx.config.backend_priority.algos = "nx_loopback" + with pytest.raises(ValueError, match="Unknown backend when setting"): + nx.config.backend_priority.algos = ["this_almost_certainly_is_not_a_backend"] + with pytest.raises(TypeError, match="must be a Config of backend configs"): + nx.config.backends = {} + with pytest.raises(TypeError, match="must be a Config of backend configs"): + nx.config.backends = Config(plausible_backend_name={}) + with pytest.raises(ValueError, match="Unknown backend when setting"): + nx.config.backends = Config(this_almost_certainly_is_not_a_backend=Config()) + with pytest.raises(TypeError, match="must be True or False"): + nx.config.cache_converted_graphs = "bad value" + with pytest.raises(TypeError, match="must be a set of "): + nx.config.warnings_to_ignore = 7 + with pytest.raises(ValueError, match="Unknown warning "): + nx.config.warnings_to_ignore = {"bad value"} + + +def test_not_strict(): + class FlexibleConfig(Config, strict=False): + x: int + + cfg = FlexibleConfig(x=1) + assert "_strict" not in cfg + assert len(cfg) == 1 + assert list(cfg) == ["x"] + assert list(cfg.keys()) == ["x"] + assert list(cfg.values()) == [1] + assert list(cfg.items()) == [("x", 1)] + assert cfg.x == 1 + assert cfg["x"] == 1 + assert "x" in cfg + assert hasattr(cfg, "x") + assert "FlexibleConfig(x=1)" in repr(cfg) + assert cfg == FlexibleConfig(x=1) + del cfg.x + assert "FlexibleConfig()" in repr(cfg) + assert len(cfg) == 0 + assert not hasattr(cfg, "x") + assert "x" not in cfg + assert not hasattr(cfg, "y") + assert "y" not in cfg + cfg.y = 2 + assert len(cfg) == 1 + assert list(cfg) == ["y"] + assert list(cfg.keys()) == ["y"] + assert list(cfg.values()) == [2] + assert list(cfg.items()) == [("y", 2)] + assert cfg.y == 2 + assert cfg["y"] == 2 + assert hasattr(cfg, "y") + assert "y" in cfg + del cfg["y"] + assert len(cfg) == 0 + assert list(cfg) == [] + with pytest.raises(AttributeError, match="y"): + del cfg.y + with pytest.raises(KeyError, match="y"): + del cfg["y"] + with pytest.raises(TypeError, match="missing 1 required keyword-only"): + FlexibleConfig() + # Be strict when first creating the config object + with pytest.raises(TypeError, match="unexpected keyword argument 'y'"): + FlexibleConfig(x=1, y=2) + + class FlexibleConfigWithDefault(Config, strict=False): + x: int = 0 + + assert FlexibleConfigWithDefault().x == 0 + assert FlexibleConfigWithDefault(x=1)["x"] == 1 + + +def test_context(): + cfg = Config(x=1) + with cfg(x=2) as c: + assert c.x == 2 + c.x = 3 + assert cfg.x == 3 + assert cfg.x == 1 + + with cfg(x=2) as c: + assert c == cfg + assert cfg.x == 2 + with cfg(x=3) as c2: + assert c2 == cfg + assert cfg.x == 3 + with pytest.raises(RuntimeError, match="context manager without"): + with cfg as c3: # Forgot to call `cfg(...)` + pass + assert cfg.x == 3 + assert cfg.x == 2 + assert cfg.x == 1 + + c = cfg(x=4) # Not yet as context (not recommended, but possible) + assert c == cfg + assert cfg.x == 4 + # Cheat by looking at internal data; context stack should only grow with __enter__ + assert cfg._prev is not None + assert cfg._context_stack == [] + with c: + assert c == cfg + assert cfg.x == 4 + assert cfg.x == 1 + # Cheat again; there was no preceding `cfg(...)` call this time + assert cfg._prev is None + with pytest.raises(RuntimeError, match="context manager without"): + with cfg: + pass + assert cfg.x == 1 diff --git a/.venv/lib/python3.12/site-packages/networkx/utils/tests/test_decorators.py b/.venv/lib/python3.12/site-packages/networkx/utils/tests/test_decorators.py new file mode 100644 index 00000000..0a4aeabf --- /dev/null +++ b/.venv/lib/python3.12/site-packages/networkx/utils/tests/test_decorators.py @@ -0,0 +1,510 @@ +import os +import pathlib +import random +import tempfile + +import pytest + +import networkx as nx +from networkx.utils.decorators import ( + argmap, + not_implemented_for, + np_random_state, + open_file, + py_random_state, +) +from networkx.utils.misc import PythonRandomInterface, PythonRandomViaNumpyBits + + +def test_not_implemented_decorator(): + @not_implemented_for("directed") + def test_d(G): + pass + + test_d(nx.Graph()) + with pytest.raises(nx.NetworkXNotImplemented): + test_d(nx.DiGraph()) + + @not_implemented_for("undirected") + def test_u(G): + pass + + test_u(nx.DiGraph()) + with pytest.raises(nx.NetworkXNotImplemented): + test_u(nx.Graph()) + + @not_implemented_for("multigraph") + def test_m(G): + pass + + test_m(nx.Graph()) + with pytest.raises(nx.NetworkXNotImplemented): + test_m(nx.MultiGraph()) + + @not_implemented_for("graph") + def test_g(G): + pass + + test_g(nx.MultiGraph()) + with pytest.raises(nx.NetworkXNotImplemented): + test_g(nx.Graph()) + + # not MultiDiGraph (multiple arguments => AND) + @not_implemented_for("directed", "multigraph") + def test_not_md(G): + pass + + test_not_md(nx.Graph()) + test_not_md(nx.DiGraph()) + test_not_md(nx.MultiGraph()) + with pytest.raises(nx.NetworkXNotImplemented): + test_not_md(nx.MultiDiGraph()) + + # Graph only (multiple decorators => OR) + @not_implemented_for("directed") + @not_implemented_for("multigraph") + def test_graph_only(G): + pass + + test_graph_only(nx.Graph()) + with pytest.raises(nx.NetworkXNotImplemented): + test_graph_only(nx.DiGraph()) + with pytest.raises(nx.NetworkXNotImplemented): + test_graph_only(nx.MultiGraph()) + with pytest.raises(nx.NetworkXNotImplemented): + test_graph_only(nx.MultiDiGraph()) + + with pytest.raises(ValueError): + not_implemented_for("directed", "undirected") + + with pytest.raises(ValueError): + not_implemented_for("multigraph", "graph") + + +def test_not_implemented_decorator_key(): + with pytest.raises(KeyError): + + @not_implemented_for("foo") + def test1(G): + pass + + test1(nx.Graph()) + + +def test_not_implemented_decorator_raise(): + with pytest.raises(nx.NetworkXNotImplemented): + + @not_implemented_for("graph") + def test1(G): + pass + + test1(nx.Graph()) + + +class TestOpenFileDecorator: + def setup_method(self): + self.text = ["Blah... ", "BLAH ", "BLAH!!!!"] + self.fobj = tempfile.NamedTemporaryFile("wb+", delete=False) + self.name = self.fobj.name + + def teardown_method(self): + self.fobj.close() + os.unlink(self.name) + + def write(self, path): + for text in self.text: + path.write(text.encode("ascii")) + + @open_file(1, "r") + def read(self, path): + return path.readlines()[0] + + @staticmethod + @open_file(0, "wb") + def writer_arg0(path): + path.write(b"demo") + + @open_file(1, "wb+") + def writer_arg1(self, path): + self.write(path) + + @open_file(2, "wb") + def writer_arg2default(self, x, path=None): + if path is None: + with tempfile.NamedTemporaryFile("wb+") as fh: + self.write(fh) + else: + self.write(path) + + @open_file(4, "wb") + def writer_arg4default(self, x, y, other="hello", path=None, **kwargs): + if path is None: + with tempfile.NamedTemporaryFile("wb+") as fh: + self.write(fh) + else: + self.write(path) + + @open_file("path", "wb") + def writer_kwarg(self, **kwargs): + path = kwargs.get("path", None) + if path is None: + with tempfile.NamedTemporaryFile("wb+") as fh: + self.write(fh) + else: + self.write(path) + + def test_writer_arg0_str(self): + self.writer_arg0(self.name) + + def test_writer_arg0_fobj(self): + self.writer_arg0(self.fobj) + + def test_writer_arg0_pathlib(self): + self.writer_arg0(pathlib.Path(self.name)) + + def test_writer_arg1_str(self): + self.writer_arg1(self.name) + assert self.read(self.name) == "".join(self.text) + + def test_writer_arg1_fobj(self): + self.writer_arg1(self.fobj) + assert not self.fobj.closed + self.fobj.close() + assert self.read(self.name) == "".join(self.text) + + def test_writer_arg2default_str(self): + self.writer_arg2default(0, path=None) + self.writer_arg2default(0, path=self.name) + assert self.read(self.name) == "".join(self.text) + + def test_writer_arg2default_fobj(self): + self.writer_arg2default(0, path=self.fobj) + assert not self.fobj.closed + self.fobj.close() + assert self.read(self.name) == "".join(self.text) + + def test_writer_arg2default_fobj_path_none(self): + self.writer_arg2default(0, path=None) + + def test_writer_arg4default_fobj(self): + self.writer_arg4default(0, 1, dog="dog", other="other") + self.writer_arg4default(0, 1, dog="dog", other="other", path=self.name) + assert self.read(self.name) == "".join(self.text) + + def test_writer_kwarg_str(self): + self.writer_kwarg(path=self.name) + assert self.read(self.name) == "".join(self.text) + + def test_writer_kwarg_fobj(self): + self.writer_kwarg(path=self.fobj) + self.fobj.close() + assert self.read(self.name) == "".join(self.text) + + def test_writer_kwarg_path_none(self): + self.writer_kwarg(path=None) + + +class TestRandomState: + @classmethod + def setup_class(cls): + global np + np = pytest.importorskip("numpy") + + @np_random_state(1) + def instantiate_np_random_state(self, random_state): + allowed = (np.random.RandomState, np.random.Generator) + assert isinstance(random_state, allowed) + return random_state.random() + + @py_random_state(1) + def instantiate_py_random_state(self, random_state): + allowed = (random.Random, PythonRandomInterface, PythonRandomViaNumpyBits) + assert isinstance(random_state, allowed) + return random_state.random() + + def test_random_state_None(self): + np.random.seed(42) + rv = np.random.random() + np.random.seed(42) + assert rv == self.instantiate_np_random_state(None) + + random.seed(42) + rv = random.random() + random.seed(42) + assert rv == self.instantiate_py_random_state(None) + + def test_random_state_np_random(self): + np.random.seed(42) + rv = np.random.random() + np.random.seed(42) + assert rv == self.instantiate_np_random_state(np.random) + np.random.seed(42) + assert rv == self.instantiate_py_random_state(np.random) + + def test_random_state_int(self): + np.random.seed(42) + np_rv = np.random.random() + random.seed(42) + py_rv = random.random() + + np.random.seed(42) + seed = 1 + rval = self.instantiate_np_random_state(seed) + rval_expected = np.random.RandomState(seed).rand() + assert rval == rval_expected + # test that global seed wasn't changed in function + assert np_rv == np.random.random() + + random.seed(42) + rval = self.instantiate_py_random_state(seed) + rval_expected = random.Random(seed).random() + assert rval == rval_expected + # test that global seed wasn't changed in function + assert py_rv == random.random() + + def test_random_state_np_random_Generator(self): + np.random.seed(42) + np_rv = np.random.random() + np.random.seed(42) + seed = 1 + + rng = np.random.default_rng(seed) + rval = self.instantiate_np_random_state(rng) + rval_expected = np.random.default_rng(seed).random() + assert rval == rval_expected + + rval = self.instantiate_py_random_state(rng) + rval_expected = np.random.default_rng(seed).random(size=2)[1] + assert rval == rval_expected + # test that global seed wasn't changed in function + assert np_rv == np.random.random() + + def test_random_state_np_random_RandomState(self): + np.random.seed(42) + np_rv = np.random.random() + np.random.seed(42) + seed = 1 + + rng = np.random.RandomState(seed) + rval = self.instantiate_np_random_state(rng) + rval_expected = np.random.RandomState(seed).random() + assert rval == rval_expected + + rval = self.instantiate_py_random_state(rng) + rval_expected = np.random.RandomState(seed).random(size=2)[1] + assert rval == rval_expected + # test that global seed wasn't changed in function + assert np_rv == np.random.random() + + def test_random_state_py_random(self): + seed = 1 + rng = random.Random(seed) + rv = self.instantiate_py_random_state(rng) + assert rv == random.Random(seed).random() + + pytest.raises(ValueError, self.instantiate_np_random_state, rng) + + +def test_random_state_string_arg_index(): + with pytest.raises(nx.NetworkXError): + + @np_random_state("a") + def make_random_state(rs): + pass + + rstate = make_random_state(1) + + +def test_py_random_state_string_arg_index(): + with pytest.raises(nx.NetworkXError): + + @py_random_state("a") + def make_random_state(rs): + pass + + rstate = make_random_state(1) + + +def test_random_state_invalid_arg_index(): + with pytest.raises(nx.NetworkXError): + + @np_random_state(2) + def make_random_state(rs): + pass + + rstate = make_random_state(1) + + +def test_py_random_state_invalid_arg_index(): + with pytest.raises(nx.NetworkXError): + + @py_random_state(2) + def make_random_state(rs): + pass + + rstate = make_random_state(1) + + +class TestArgmap: + class ArgmapError(RuntimeError): + pass + + def test_trivial_function(self): + def do_not_call(x): + raise ArgmapError("do not call this function") + + @argmap(do_not_call) + def trivial_argmap(): + return 1 + + assert trivial_argmap() == 1 + + def test_trivial_iterator(self): + def do_not_call(x): + raise ArgmapError("do not call this function") + + @argmap(do_not_call) + def trivial_argmap(): + yield from (1, 2, 3) + + assert tuple(trivial_argmap()) == (1, 2, 3) + + def test_contextmanager(self): + container = [] + + def contextmanager(x): + nonlocal container + return x, lambda: container.append(x) + + @argmap(contextmanager, 0, 1, 2, try_finally=True) + def foo(x, y, z): + return x, y, z + + x, y, z = foo("a", "b", "c") + + # context exits are called in reverse + assert container == ["c", "b", "a"] + + def test_tryfinally_generator(self): + container = [] + + def singleton(x): + return (x,) + + with pytest.raises(nx.NetworkXError): + + @argmap(singleton, 0, 1, 2, try_finally=True) + def foo(x, y, z): + yield from (x, y, z) + + @argmap(singleton, 0, 1, 2) + def foo(x, y, z): + return x + y + z + + q = foo("a", "b", "c") + + assert q == ("a", "b", "c") + + def test_actual_vararg(self): + @argmap(lambda x: -x, 4) + def foo(x, y, *args): + return (x, y) + tuple(args) + + assert foo(1, 2, 3, 4, 5, 6) == (1, 2, 3, 4, -5, 6) + + def test_signature_destroying_intermediate_decorator(self): + def add_one_to_first_bad_decorator(f): + """Bad because it doesn't wrap the f signature (clobbers it)""" + + def decorated(a, *args, **kwargs): + return f(a + 1, *args, **kwargs) + + return decorated + + add_two_to_second = argmap(lambda b: b + 2, 1) + + @add_two_to_second + @add_one_to_first_bad_decorator + def add_one_and_two(a, b): + return a, b + + assert add_one_and_two(5, 5) == (6, 7) + + def test_actual_kwarg(self): + @argmap(lambda x: -x, "arg") + def foo(*, arg): + return arg + + assert foo(arg=3) == -3 + + def test_nested_tuple(self): + def xform(x, y): + u, v = y + return x + u + v, (x + u, x + v) + + # we're testing args and kwargs here, too + @argmap(xform, (0, ("t", 2))) + def foo(a, *args, **kwargs): + return a, args, kwargs + + a, args, kwargs = foo(1, 2, 3, t=4) + + assert a == 1 + 4 + 3 + assert args == (2, 1 + 3) + assert kwargs == {"t": 1 + 4} + + def test_flatten(self): + assert tuple(argmap._flatten([[[[[], []], [], []], [], [], []]], set())) == () + + rlist = ["a", ["b", "c"], [["d"], "e"], "f"] + assert "".join(argmap._flatten(rlist, set())) == "abcdef" + + def test_indent(self): + code = "\n".join( + argmap._indent( + *[ + "try:", + "try:", + "pass#", + "finally:", + "pass#", + "#", + "finally:", + "pass#", + ] + ) + ) + assert ( + code + == """try: + try: + pass# + finally: + pass# + # +finally: + pass#""" + ) + + def test_immediate_raise(self): + @not_implemented_for("directed") + def yield_nodes(G): + yield from G + + G = nx.Graph([(1, 2)]) + D = nx.DiGraph() + + # test first call (argmap is compiled and executed) + with pytest.raises(nx.NetworkXNotImplemented): + node_iter = yield_nodes(D) + + # test second call (argmap is only executed) + with pytest.raises(nx.NetworkXNotImplemented): + node_iter = yield_nodes(D) + + # ensure that generators still make generators + node_iter = yield_nodes(G) + next(node_iter) + next(node_iter) + with pytest.raises(StopIteration): + next(node_iter) diff --git a/.venv/lib/python3.12/site-packages/networkx/utils/tests/test_heaps.py b/.venv/lib/python3.12/site-packages/networkx/utils/tests/test_heaps.py new file mode 100644 index 00000000..5ea38716 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/networkx/utils/tests/test_heaps.py @@ -0,0 +1,131 @@ +import pytest + +import networkx as nx +from networkx.utils import BinaryHeap, PairingHeap + + +class X: + def __eq__(self, other): + raise self is other + + def __ne__(self, other): + raise self is not other + + def __lt__(self, other): + raise TypeError("cannot compare") + + def __le__(self, other): + raise TypeError("cannot compare") + + def __ge__(self, other): + raise TypeError("cannot compare") + + def __gt__(self, other): + raise TypeError("cannot compare") + + def __hash__(self): + return hash(id(self)) + + +x = X() + + +data = [ # min should not invent an element. + ("min", nx.NetworkXError), + # Popping an empty heap should fail. + ("pop", nx.NetworkXError), + # Getting nonexisting elements should return None. + ("get", 0, None), + ("get", x, None), + ("get", None, None), + # Inserting a new key should succeed. + ("insert", x, 1, True), + ("get", x, 1), + ("min", (x, 1)), + # min should not pop the top element. + ("min", (x, 1)), + # Inserting a new key of different type should succeed. + ("insert", 1, -2.0, True), + # int and float values should interop. + ("min", (1, -2.0)), + # pop removes minimum-valued element. + ("insert", 3, -(10**100), True), + ("insert", 4, 5, True), + ("pop", (3, -(10**100))), + ("pop", (1, -2.0)), + # Decrease-insert should succeed. + ("insert", 4, -50, True), + ("insert", 4, -60, False, True), + # Decrease-insert should not create duplicate keys. + ("pop", (4, -60)), + ("pop", (x, 1)), + # Popping all elements should empty the heap. + ("min", nx.NetworkXError), + ("pop", nx.NetworkXError), + # Non-value-changing insert should fail. + ("insert", x, 0, True), + ("insert", x, 0, False, False), + ("min", (x, 0)), + ("insert", x, 0, True, False), + ("min", (x, 0)), + # Failed insert should not create duplicate keys. + ("pop", (x, 0)), + ("pop", nx.NetworkXError), + # Increase-insert should succeed when allowed. + ("insert", None, 0, True), + ("insert", 2, -1, True), + ("min", (2, -1)), + ("insert", 2, 1, True, False), + ("min", (None, 0)), + # Increase-insert should fail when disallowed. + ("insert", None, 2, False, False), + ("min", (None, 0)), + # Failed increase-insert should not create duplicate keys. + ("pop", (None, 0)), + ("pop", (2, 1)), + ("min", nx.NetworkXError), + ("pop", nx.NetworkXError), +] + + +def _test_heap_class(cls, *args, **kwargs): + heap = cls(*args, **kwargs) + # Basic behavioral test + for op in data: + if op[-1] is not nx.NetworkXError: + assert op[-1] == getattr(heap, op[0])(*op[1:-1]) + else: + pytest.raises(op[-1], getattr(heap, op[0]), *op[1:-1]) + # Coverage test. + for i in range(99, -1, -1): + assert heap.insert(i, i) + for i in range(50): + assert heap.pop() == (i, i) + for i in range(100): + assert heap.insert(i, i) == (i < 50) + for i in range(100): + assert not heap.insert(i, i + 1) + for i in range(50): + assert heap.pop() == (i, i) + for i in range(100): + assert heap.insert(i, i + 1) == (i < 50) + for i in range(49): + assert heap.pop() == (i, i + 1) + assert sorted([heap.pop(), heap.pop()]) == [(49, 50), (50, 50)] + for i in range(51, 100): + assert not heap.insert(i, i + 1, True) + for i in range(51, 70): + assert heap.pop() == (i, i + 1) + for i in range(100): + assert heap.insert(i, i) + for i in range(100): + assert heap.pop() == (i, i) + pytest.raises(nx.NetworkXError, heap.pop) + + +def test_PairingHeap(): + _test_heap_class(PairingHeap) + + +def test_BinaryHeap(): + _test_heap_class(BinaryHeap) diff --git a/.venv/lib/python3.12/site-packages/networkx/utils/tests/test_mapped_queue.py b/.venv/lib/python3.12/site-packages/networkx/utils/tests/test_mapped_queue.py new file mode 100644 index 00000000..ca9b7e42 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/networkx/utils/tests/test_mapped_queue.py @@ -0,0 +1,268 @@ +import pytest + +from networkx.utils.mapped_queue import MappedQueue, _HeapElement + + +def test_HeapElement_gtlt(): + bar = _HeapElement(1.1, "a") + foo = _HeapElement(1, "b") + assert foo < bar + assert bar > foo + assert foo < 1.1 + assert 1 < bar + + +def test_HeapElement_gtlt_tied_priority(): + bar = _HeapElement(1, "a") + foo = _HeapElement(1, "b") + assert foo > bar + assert bar < foo + + +def test_HeapElement_eq(): + bar = _HeapElement(1.1, "a") + foo = _HeapElement(1, "a") + assert foo == bar + assert bar == foo + assert foo == "a" + + +def test_HeapElement_iter(): + foo = _HeapElement(1, "a") + bar = _HeapElement(1.1, (3, 2, 1)) + assert list(foo) == [1, "a"] + assert list(bar) == [1.1, 3, 2, 1] + + +def test_HeapElement_getitem(): + foo = _HeapElement(1, "a") + bar = _HeapElement(1.1, (3, 2, 1)) + assert foo[1] == "a" + assert foo[0] == 1 + assert bar[0] == 1.1 + assert bar[2] == 2 + assert bar[3] == 1 + pytest.raises(IndexError, bar.__getitem__, 4) + pytest.raises(IndexError, foo.__getitem__, 2) + + +class TestMappedQueue: + def setup_method(self): + pass + + def _check_map(self, q): + assert q.position == {elt: pos for pos, elt in enumerate(q.heap)} + + def _make_mapped_queue(self, h): + q = MappedQueue() + q.heap = h + q.position = {elt: pos for pos, elt in enumerate(h)} + return q + + def test_heapify(self): + h = [5, 4, 3, 2, 1, 0] + q = self._make_mapped_queue(h) + q._heapify() + self._check_map(q) + + def test_init(self): + h = [5, 4, 3, 2, 1, 0] + q = MappedQueue(h) + self._check_map(q) + + def test_incomparable(self): + h = [5, 4, "a", 2, 1, 0] + pytest.raises(TypeError, MappedQueue, h) + + def test_len(self): + h = [5, 4, 3, 2, 1, 0] + q = MappedQueue(h) + self._check_map(q) + assert len(q) == 6 + + def test_siftup_leaf(self): + h = [2] + h_sifted = [2] + q = self._make_mapped_queue(h) + q._siftup(0) + assert q.heap == h_sifted + self._check_map(q) + + def test_siftup_one_child(self): + h = [2, 0] + h_sifted = [0, 2] + q = self._make_mapped_queue(h) + q._siftup(0) + assert q.heap == h_sifted + self._check_map(q) + + def test_siftup_left_child(self): + h = [2, 0, 1] + h_sifted = [0, 2, 1] + q = self._make_mapped_queue(h) + q._siftup(0) + assert q.heap == h_sifted + self._check_map(q) + + def test_siftup_right_child(self): + h = [2, 1, 0] + h_sifted = [0, 1, 2] + q = self._make_mapped_queue(h) + q._siftup(0) + assert q.heap == h_sifted + self._check_map(q) + + def test_siftup_multiple(self): + h = [0, 1, 2, 4, 3, 5, 6] + h_sifted = [0, 1, 2, 4, 3, 5, 6] + q = self._make_mapped_queue(h) + q._siftup(0) + assert q.heap == h_sifted + self._check_map(q) + + def test_siftdown_leaf(self): + h = [2] + h_sifted = [2] + q = self._make_mapped_queue(h) + q._siftdown(0, 0) + assert q.heap == h_sifted + self._check_map(q) + + def test_siftdown_single(self): + h = [1, 0] + h_sifted = [0, 1] + q = self._make_mapped_queue(h) + q._siftdown(0, len(h) - 1) + assert q.heap == h_sifted + self._check_map(q) + + def test_siftdown_multiple(self): + h = [1, 2, 3, 4, 5, 6, 7, 0] + h_sifted = [0, 1, 3, 2, 5, 6, 7, 4] + q = self._make_mapped_queue(h) + q._siftdown(0, len(h) - 1) + assert q.heap == h_sifted + self._check_map(q) + + def test_push(self): + to_push = [6, 1, 4, 3, 2, 5, 0] + h_sifted = [0, 2, 1, 6, 3, 5, 4] + q = MappedQueue() + for elt in to_push: + q.push(elt) + assert q.heap == h_sifted + self._check_map(q) + + def test_push_duplicate(self): + to_push = [2, 1, 0] + h_sifted = [0, 2, 1] + q = MappedQueue() + for elt in to_push: + inserted = q.push(elt) + assert inserted + assert q.heap == h_sifted + self._check_map(q) + inserted = q.push(1) + assert not inserted + + def test_pop(self): + h = [3, 4, 6, 0, 1, 2, 5] + h_sorted = sorted(h) + q = self._make_mapped_queue(h) + q._heapify() + popped = [q.pop() for _ in range(len(h))] + assert popped == h_sorted + self._check_map(q) + + def test_remove_leaf(self): + h = [0, 2, 1, 6, 3, 5, 4] + h_removed = [0, 2, 1, 6, 4, 5] + q = self._make_mapped_queue(h) + removed = q.remove(3) + assert q.heap == h_removed + + def test_remove_root(self): + h = [0, 2, 1, 6, 3, 5, 4] + h_removed = [1, 2, 4, 6, 3, 5] + q = self._make_mapped_queue(h) + removed = q.remove(0) + assert q.heap == h_removed + + def test_update_leaf(self): + h = [0, 20, 10, 60, 30, 50, 40] + h_updated = [0, 15, 10, 60, 20, 50, 40] + q = self._make_mapped_queue(h) + removed = q.update(30, 15) + assert q.heap == h_updated + + def test_update_root(self): + h = [0, 20, 10, 60, 30, 50, 40] + h_updated = [10, 20, 35, 60, 30, 50, 40] + q = self._make_mapped_queue(h) + removed = q.update(0, 35) + assert q.heap == h_updated + + +class TestMappedDict(TestMappedQueue): + def _make_mapped_queue(self, h): + priority_dict = {elt: elt for elt in h} + return MappedQueue(priority_dict) + + def test_init(self): + d = {5: 0, 4: 1, "a": 2, 2: 3, 1: 4} + q = MappedQueue(d) + assert q.position == d + + def test_ties(self): + d = {5: 0, 4: 1, 3: 2, 2: 3, 1: 4} + q = MappedQueue(d) + assert q.position == {elt: pos for pos, elt in enumerate(q.heap)} + + def test_pop(self): + d = {5: 0, 4: 1, 3: 2, 2: 3, 1: 4} + q = MappedQueue(d) + assert q.pop() == _HeapElement(0, 5) + assert q.position == {elt: pos for pos, elt in enumerate(q.heap)} + + def test_empty_pop(self): + q = MappedQueue() + pytest.raises(IndexError, q.pop) + + def test_incomparable_ties(self): + d = {5: 0, 4: 0, "a": 0, 2: 0, 1: 0} + pytest.raises(TypeError, MappedQueue, d) + + def test_push(self): + to_push = [6, 1, 4, 3, 2, 5, 0] + h_sifted = [0, 2, 1, 6, 3, 5, 4] + q = MappedQueue() + for elt in to_push: + q.push(elt, priority=elt) + assert q.heap == h_sifted + self._check_map(q) + + def test_push_duplicate(self): + to_push = [2, 1, 0] + h_sifted = [0, 2, 1] + q = MappedQueue() + for elt in to_push: + inserted = q.push(elt, priority=elt) + assert inserted + assert q.heap == h_sifted + self._check_map(q) + inserted = q.push(1, priority=1) + assert not inserted + + def test_update_leaf(self): + h = [0, 20, 10, 60, 30, 50, 40] + h_updated = [0, 15, 10, 60, 20, 50, 40] + q = self._make_mapped_queue(h) + removed = q.update(30, 15, priority=15) + assert q.heap == h_updated + + def test_update_root(self): + h = [0, 20, 10, 60, 30, 50, 40] + h_updated = [10, 20, 35, 60, 30, 50, 40] + q = self._make_mapped_queue(h) + removed = q.update(0, 35, priority=35) + assert q.heap == h_updated diff --git a/.venv/lib/python3.12/site-packages/networkx/utils/tests/test_misc.py b/.venv/lib/python3.12/site-packages/networkx/utils/tests/test_misc.py new file mode 100644 index 00000000..eff36b2a --- /dev/null +++ b/.venv/lib/python3.12/site-packages/networkx/utils/tests/test_misc.py @@ -0,0 +1,268 @@ +import random +from copy import copy + +import pytest + +import networkx as nx +from networkx.utils import ( + PythonRandomInterface, + PythonRandomViaNumpyBits, + arbitrary_element, + create_py_random_state, + create_random_state, + dict_to_numpy_array, + discrete_sequence, + flatten, + groups, + make_list_of_ints, + pairwise, + powerlaw_sequence, +) +from networkx.utils.misc import _dict_to_numpy_array1, _dict_to_numpy_array2 + +nested_depth = ( + 1, + 2, + (3, 4, ((5, 6, (7,), (8, (9, 10), 11), (12, 13, (14, 15)), 16), 17), 18, 19), + 20, +) + +nested_set = { + (1, 2, 3, 4), + (5, 6, 7, 8, 9), + (10, 11, (12, 13, 14), (15, 16, 17, 18)), + 19, + 20, +} + +nested_mixed = [ + 1, + (2, 3, {4, (5, 6), 7}, [8, 9]), + {10: "foo", 11: "bar", (12, 13): "baz"}, + {(14, 15): "qwe", 16: "asd"}, + (17, (18, "19"), 20), +] + + +@pytest.mark.parametrize("result", [None, [], ["existing"], ["existing1", "existing2"]]) +@pytest.mark.parametrize("nested", [nested_depth, nested_mixed, nested_set]) +def test_flatten(nested, result): + if result is None: + val = flatten(nested, result) + assert len(val) == 20 + else: + _result = copy(result) # because pytest passes parameters as is + nexisting = len(_result) + val = flatten(nested, _result) + assert len(val) == len(_result) == 20 + nexisting + + assert issubclass(type(val), tuple) + + +def test_make_list_of_ints(): + mylist = [1, 2, 3.0, 42, -2] + assert make_list_of_ints(mylist) is mylist + assert make_list_of_ints(mylist) == mylist + assert type(make_list_of_ints(mylist)[2]) is int + pytest.raises(nx.NetworkXError, make_list_of_ints, [1, 2, 3, "kermit"]) + pytest.raises(nx.NetworkXError, make_list_of_ints, [1, 2, 3.1]) + + +def test_random_number_distribution(): + # smoke test only + z = powerlaw_sequence(20, exponent=2.5) + z = discrete_sequence(20, distribution=[0, 0, 0, 0, 1, 1, 1, 1, 2, 2, 3]) + + +class TestNumpyArray: + @classmethod + def setup_class(cls): + global np + np = pytest.importorskip("numpy") + + def test_numpy_to_list_of_ints(self): + a = np.array([1, 2, 3], dtype=np.int64) + b = np.array([1.0, 2, 3]) + c = np.array([1.1, 2, 3]) + assert type(make_list_of_ints(a)) == list + assert make_list_of_ints(b) == list(b) + B = make_list_of_ints(b) + assert type(B[0]) == int + pytest.raises(nx.NetworkXError, make_list_of_ints, c) + + def test__dict_to_numpy_array1(self): + d = {"a": 1, "b": 2} + a = _dict_to_numpy_array1(d, mapping={"a": 0, "b": 1}) + np.testing.assert_allclose(a, np.array([1, 2])) + a = _dict_to_numpy_array1(d, mapping={"b": 0, "a": 1}) + np.testing.assert_allclose(a, np.array([2, 1])) + + a = _dict_to_numpy_array1(d) + np.testing.assert_allclose(a.sum(), 3) + + def test__dict_to_numpy_array2(self): + d = {"a": {"a": 1, "b": 2}, "b": {"a": 10, "b": 20}} + + mapping = {"a": 1, "b": 0} + a = _dict_to_numpy_array2(d, mapping=mapping) + np.testing.assert_allclose(a, np.array([[20, 10], [2, 1]])) + + a = _dict_to_numpy_array2(d) + np.testing.assert_allclose(a.sum(), 33) + + def test_dict_to_numpy_array_a(self): + d = {"a": {"a": 1, "b": 2}, "b": {"a": 10, "b": 20}} + + mapping = {"a": 0, "b": 1} + a = dict_to_numpy_array(d, mapping=mapping) + np.testing.assert_allclose(a, np.array([[1, 2], [10, 20]])) + + mapping = {"a": 1, "b": 0} + a = dict_to_numpy_array(d, mapping=mapping) + np.testing.assert_allclose(a, np.array([[20, 10], [2, 1]])) + + a = _dict_to_numpy_array2(d) + np.testing.assert_allclose(a.sum(), 33) + + def test_dict_to_numpy_array_b(self): + d = {"a": 1, "b": 2} + + mapping = {"a": 0, "b": 1} + a = dict_to_numpy_array(d, mapping=mapping) + np.testing.assert_allclose(a, np.array([1, 2])) + + a = _dict_to_numpy_array1(d) + np.testing.assert_allclose(a.sum(), 3) + + +def test_pairwise(): + nodes = range(4) + node_pairs = [(0, 1), (1, 2), (2, 3)] + node_pairs_cycle = node_pairs + [(3, 0)] + assert list(pairwise(nodes)) == node_pairs + assert list(pairwise(iter(nodes))) == node_pairs + assert list(pairwise(nodes, cyclic=True)) == node_pairs_cycle + empty_iter = iter(()) + assert list(pairwise(empty_iter)) == [] + empty_iter = iter(()) + assert list(pairwise(empty_iter, cyclic=True)) == [] + + +def test_groups(): + many_to_one = dict(zip("abcde", [0, 0, 1, 1, 2])) + actual = groups(many_to_one) + expected = {0: {"a", "b"}, 1: {"c", "d"}, 2: {"e"}} + assert actual == expected + assert {} == groups({}) + + +def test_create_random_state(): + np = pytest.importorskip("numpy") + rs = np.random.RandomState + + assert isinstance(create_random_state(1), rs) + assert isinstance(create_random_state(None), rs) + assert isinstance(create_random_state(np.random), rs) + assert isinstance(create_random_state(rs(1)), rs) + # Support for numpy.random.Generator + rng = np.random.default_rng() + assert isinstance(create_random_state(rng), np.random.Generator) + pytest.raises(ValueError, create_random_state, "a") + + assert np.all(rs(1).rand(10) == create_random_state(1).rand(10)) + + +def test_create_py_random_state(): + pyrs = random.Random + + assert isinstance(create_py_random_state(1), pyrs) + assert isinstance(create_py_random_state(None), pyrs) + assert isinstance(create_py_random_state(pyrs(1)), pyrs) + pytest.raises(ValueError, create_py_random_state, "a") + + np = pytest.importorskip("numpy") + + rs = np.random.RandomState + rng = np.random.default_rng(1000) + rng_explicit = np.random.Generator(np.random.SFC64()) + old_nprs = PythonRandomInterface + nprs = PythonRandomViaNumpyBits + assert isinstance(create_py_random_state(np.random), nprs) + assert isinstance(create_py_random_state(rs(1)), old_nprs) + assert isinstance(create_py_random_state(rng), nprs) + assert isinstance(create_py_random_state(rng_explicit), nprs) + # test default rng input + assert isinstance(PythonRandomInterface(), old_nprs) + assert isinstance(PythonRandomViaNumpyBits(), nprs) + + # VeryLargeIntegers Smoke test (they raise error for np.random) + int64max = 9223372036854775807 # from np.iinfo(np.int64).max + for r in (rng, rs(1)): + prs = create_py_random_state(r) + prs.randrange(3, int64max + 5) + prs.randint(3, int64max + 5) + + +def test_PythonRandomInterface_RandomState(): + np = pytest.importorskip("numpy") + + seed = 42 + rs = np.random.RandomState + rng = PythonRandomInterface(rs(seed)) + rs42 = rs(seed) + + # make sure these functions are same as expected outcome + assert rng.randrange(3, 5) == rs42.randint(3, 5) + assert rng.choice([1, 2, 3]) == rs42.choice([1, 2, 3]) + assert rng.gauss(0, 1) == rs42.normal(0, 1) + assert rng.expovariate(1.5) == rs42.exponential(1 / 1.5) + assert np.all(rng.shuffle([1, 2, 3]) == rs42.shuffle([1, 2, 3])) + assert np.all( + rng.sample([1, 2, 3], 2) == rs42.choice([1, 2, 3], (2,), replace=False) + ) + assert np.all( + [rng.randint(3, 5) for _ in range(100)] + == [rs42.randint(3, 6) for _ in range(100)] + ) + assert rng.random() == rs42.random_sample() + + +def test_PythonRandomInterface_Generator(): + np = pytest.importorskip("numpy") + + seed = 42 + rng = np.random.default_rng(seed) + pri = PythonRandomInterface(np.random.default_rng(seed)) + + # make sure these functions are same as expected outcome + assert pri.randrange(3, 5) == rng.integers(3, 5) + assert pri.choice([1, 2, 3]) == rng.choice([1, 2, 3]) + assert pri.gauss(0, 1) == rng.normal(0, 1) + assert pri.expovariate(1.5) == rng.exponential(1 / 1.5) + assert np.all(pri.shuffle([1, 2, 3]) == rng.shuffle([1, 2, 3])) + assert np.all( + pri.sample([1, 2, 3], 2) == rng.choice([1, 2, 3], (2,), replace=False) + ) + assert np.all( + [pri.randint(3, 5) for _ in range(100)] + == [rng.integers(3, 6) for _ in range(100)] + ) + assert pri.random() == rng.random() + + +@pytest.mark.parametrize( + ("iterable_type", "expected"), ((list, 1), (tuple, 1), (str, "["), (set, 1)) +) +def test_arbitrary_element(iterable_type, expected): + iterable = iterable_type([1, 2, 3]) + assert arbitrary_element(iterable) == expected + + +@pytest.mark.parametrize( + "iterator", + ((i for i in range(3)), iter([1, 2, 3])), # generator +) +def test_arbitrary_element_raises(iterator): + """Value error is raised when input is an iterator.""" + with pytest.raises(ValueError, match="from an iterator"): + arbitrary_element(iterator) diff --git a/.venv/lib/python3.12/site-packages/networkx/utils/tests/test_random_sequence.py b/.venv/lib/python3.12/site-packages/networkx/utils/tests/test_random_sequence.py new file mode 100644 index 00000000..1d1b9579 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/networkx/utils/tests/test_random_sequence.py @@ -0,0 +1,38 @@ +import pytest + +from networkx.utils import ( + powerlaw_sequence, + random_weighted_sample, + weighted_choice, + zipf_rv, +) + + +def test_degree_sequences(): + seq = powerlaw_sequence(10, seed=1) + seq = powerlaw_sequence(10) + assert len(seq) == 10 + + +def test_zipf_rv(): + r = zipf_rv(2.3, xmin=2, seed=1) + r = zipf_rv(2.3, 2, 1) + r = zipf_rv(2.3) + assert type(r), int + pytest.raises(ValueError, zipf_rv, 0.5) + pytest.raises(ValueError, zipf_rv, 2, xmin=0) + + +def test_random_weighted_sample(): + mapping = {"a": 10, "b": 20} + s = random_weighted_sample(mapping, 2, seed=1) + s = random_weighted_sample(mapping, 2) + assert sorted(s) == sorted(mapping.keys()) + pytest.raises(ValueError, random_weighted_sample, mapping, 3) + + +def test_random_weighted_choice(): + mapping = {"a": 10, "b": 0} + c = weighted_choice(mapping, seed=1) + c = weighted_choice(mapping) + assert c == "a" diff --git a/.venv/lib/python3.12/site-packages/networkx/utils/tests/test_rcm.py b/.venv/lib/python3.12/site-packages/networkx/utils/tests/test_rcm.py new file mode 100644 index 00000000..88702b36 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/networkx/utils/tests/test_rcm.py @@ -0,0 +1,63 @@ +import networkx as nx +from networkx.utils import reverse_cuthill_mckee_ordering + + +def test_reverse_cuthill_mckee(): + # example graph from + # http://www.boost.org/doc/libs/1_37_0/libs/graph/example/cuthill_mckee_ordering.cpp + G = nx.Graph( + [ + (0, 3), + (0, 5), + (1, 2), + (1, 4), + (1, 6), + (1, 9), + (2, 3), + (2, 4), + (3, 5), + (3, 8), + (4, 6), + (5, 6), + (5, 7), + (6, 7), + ] + ) + rcm = list(reverse_cuthill_mckee_ordering(G)) + assert rcm in [[0, 8, 5, 7, 3, 6, 2, 4, 1, 9], [0, 8, 5, 7, 3, 6, 4, 2, 1, 9]] + + +def test_rcm_alternate_heuristic(): + # example from + G = nx.Graph( + [ + (0, 0), + (0, 4), + (1, 1), + (1, 2), + (1, 5), + (1, 7), + (2, 2), + (2, 4), + (3, 3), + (3, 6), + (4, 4), + (5, 5), + (5, 7), + (6, 6), + (7, 7), + ] + ) + + answers = [ + [6, 3, 5, 7, 1, 2, 4, 0], + [6, 3, 7, 5, 1, 2, 4, 0], + [7, 5, 1, 2, 4, 0, 6, 3], + ] + + def smallest_degree(G): + deg, node = min((d, n) for n, d in G.degree()) + return node + + rcm = list(reverse_cuthill_mckee_ordering(G, heuristic=smallest_degree)) + assert rcm in answers diff --git a/.venv/lib/python3.12/site-packages/networkx/utils/tests/test_unionfind.py b/.venv/lib/python3.12/site-packages/networkx/utils/tests/test_unionfind.py new file mode 100644 index 00000000..2d30580f --- /dev/null +++ b/.venv/lib/python3.12/site-packages/networkx/utils/tests/test_unionfind.py @@ -0,0 +1,55 @@ +import networkx as nx + + +def test_unionfind(): + # Fixed by: 2cddd5958689bdecdcd89b91ac9aaf6ce0e4f6b8 + # Previously (in 2.x), the UnionFind class could handle mixed types. + # But in Python 3.x, this causes a TypeError such as: + # TypeError: unorderable types: str() > int() + # + # Now we just make sure that no exception is raised. + x = nx.utils.UnionFind() + x.union(0, "a") + + +def test_subtree_union(): + # See https://github.com/networkx/networkx/pull/3224 + # (35db1b551ee65780794a357794f521d8768d5049). + # Test if subtree unions hare handled correctly by to_sets(). + uf = nx.utils.UnionFind() + uf.union(1, 2) + uf.union(3, 4) + uf.union(4, 5) + uf.union(1, 5) + assert list(uf.to_sets()) == [{1, 2, 3, 4, 5}] + + +def test_unionfind_weights(): + # Tests if weights are computed correctly with unions of many elements + uf = nx.utils.UnionFind() + uf.union(1, 4, 7) + uf.union(2, 5, 8) + uf.union(3, 6, 9) + uf.union(1, 2, 3, 4, 5, 6, 7, 8, 9) + assert uf.weights[uf[1]] == 9 + + +def test_unbalanced_merge_weights(): + # Tests if the largest set's root is used as the new root when merging + uf = nx.utils.UnionFind() + uf.union(1, 2, 3) + uf.union(4, 5, 6, 7, 8, 9) + assert uf.weights[uf[1]] == 3 + assert uf.weights[uf[4]] == 6 + largest_root = uf[4] + uf.union(1, 4) + assert uf[1] == largest_root + assert uf.weights[largest_root] == 9 + + +def test_empty_union(): + # Tests if a null-union does nothing. + uf = nx.utils.UnionFind((0, 1)) + uf.union() + assert uf[0] == 0 + assert uf[1] == 1 |