aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/networkx/algorithms/connectivity/tests
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/networkx/algorithms/connectivity/tests')
-rw-r--r--.venv/lib/python3.12/site-packages/networkx/algorithms/connectivity/tests/__init__.py0
-rw-r--r--.venv/lib/python3.12/site-packages/networkx/algorithms/connectivity/tests/test_connectivity.py421
-rw-r--r--.venv/lib/python3.12/site-packages/networkx/algorithms/connectivity/tests/test_cuts.py309
-rw-r--r--.venv/lib/python3.12/site-packages/networkx/algorithms/connectivity/tests/test_disjoint_paths.py249
-rw-r--r--.venv/lib/python3.12/site-packages/networkx/algorithms/connectivity/tests/test_edge_augmentation.py502
-rw-r--r--.venv/lib/python3.12/site-packages/networkx/algorithms/connectivity/tests/test_edge_kcomponents.py488
-rw-r--r--.venv/lib/python3.12/site-packages/networkx/algorithms/connectivity/tests/test_kcomponents.py296
-rw-r--r--.venv/lib/python3.12/site-packages/networkx/algorithms/connectivity/tests/test_kcutsets.py273
-rw-r--r--.venv/lib/python3.12/site-packages/networkx/algorithms/connectivity/tests/test_stoer_wagner.py102
9 files changed, 2640 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/networkx/algorithms/connectivity/tests/__init__.py b/.venv/lib/python3.12/site-packages/networkx/algorithms/connectivity/tests/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/networkx/algorithms/connectivity/tests/__init__.py
diff --git a/.venv/lib/python3.12/site-packages/networkx/algorithms/connectivity/tests/test_connectivity.py b/.venv/lib/python3.12/site-packages/networkx/algorithms/connectivity/tests/test_connectivity.py
new file mode 100644
index 00000000..7aef2477
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/networkx/algorithms/connectivity/tests/test_connectivity.py
@@ -0,0 +1,421 @@
+import itertools
+
+import pytest
+
+import networkx as nx
+from networkx.algorithms import flow
+from networkx.algorithms.connectivity import (
+ local_edge_connectivity,
+ local_node_connectivity,
+)
+
+flow_funcs = [
+ flow.boykov_kolmogorov,
+ flow.dinitz,
+ flow.edmonds_karp,
+ flow.preflow_push,
+ flow.shortest_augmenting_path,
+]
+
+
+# helper functions for tests
+
+
+def _generate_no_biconnected(max_attempts=50):
+ attempts = 0
+ while True:
+ G = nx.fast_gnp_random_graph(100, 0.0575, seed=42)
+ if nx.is_connected(G) and not nx.is_biconnected(G):
+ attempts = 0
+ yield G
+ else:
+ if attempts >= max_attempts:
+ msg = f"Tried {max_attempts} times: no suitable Graph."
+ raise Exception(msg)
+ else:
+ attempts += 1
+
+
+def test_average_connectivity():
+ # figure 1 from:
+ # Beineke, L., O. Oellermann, and R. Pippert (2002). The average
+ # connectivity of a graph. Discrete mathematics 252(1-3), 31-45
+ # http://www.sciencedirect.com/science/article/pii/S0012365X01001807
+ G1 = nx.path_graph(3)
+ G1.add_edges_from([(1, 3), (1, 4)])
+ G2 = nx.path_graph(3)
+ G2.add_edges_from([(1, 3), (1, 4), (0, 3), (0, 4), (3, 4)])
+ G3 = nx.Graph()
+ for flow_func in flow_funcs:
+ kwargs = {"flow_func": flow_func}
+ errmsg = f"Assertion failed in function: {flow_func.__name__}"
+ assert nx.average_node_connectivity(G1, **kwargs) == 1, errmsg
+ assert nx.average_node_connectivity(G2, **kwargs) == 2.2, errmsg
+ assert nx.average_node_connectivity(G3, **kwargs) == 0, errmsg
+
+
+def test_average_connectivity_directed():
+ G = nx.DiGraph([(1, 3), (1, 4), (1, 5)])
+ for flow_func in flow_funcs:
+ errmsg = f"Assertion failed in function: {flow_func.__name__}"
+ assert nx.average_node_connectivity(G) == 0.25, errmsg
+
+
+def test_articulation_points():
+ Ggen = _generate_no_biconnected()
+ for flow_func in flow_funcs:
+ for i in range(3):
+ G = next(Ggen)
+ errmsg = f"Assertion failed in function: {flow_func.__name__}"
+ assert nx.node_connectivity(G, flow_func=flow_func) == 1, errmsg
+
+
+def test_brandes_erlebach():
+ # Figure 1 chapter 7: Connectivity
+ # http://www.informatik.uni-augsburg.de/thi/personen/kammer/Graph_Connectivity.pdf
+ G = nx.Graph()
+ G.add_edges_from(
+ [
+ (1, 2),
+ (1, 3),
+ (1, 4),
+ (1, 5),
+ (2, 3),
+ (2, 6),
+ (3, 4),
+ (3, 6),
+ (4, 6),
+ (4, 7),
+ (5, 7),
+ (6, 8),
+ (6, 9),
+ (7, 8),
+ (7, 10),
+ (8, 11),
+ (9, 10),
+ (9, 11),
+ (10, 11),
+ ]
+ )
+ for flow_func in flow_funcs:
+ kwargs = {"flow_func": flow_func}
+ errmsg = f"Assertion failed in function: {flow_func.__name__}"
+ assert 3 == local_edge_connectivity(G, 1, 11, **kwargs), errmsg
+ assert 3 == nx.edge_connectivity(G, 1, 11, **kwargs), errmsg
+ assert 2 == local_node_connectivity(G, 1, 11, **kwargs), errmsg
+ assert 2 == nx.node_connectivity(G, 1, 11, **kwargs), errmsg
+ assert 2 == nx.edge_connectivity(G, **kwargs), errmsg
+ assert 2 == nx.node_connectivity(G, **kwargs), errmsg
+ if flow_func is flow.preflow_push:
+ assert 3 == nx.edge_connectivity(G, 1, 11, cutoff=2, **kwargs), errmsg
+ else:
+ assert 2 == nx.edge_connectivity(G, 1, 11, cutoff=2, **kwargs), errmsg
+
+
+def test_white_harary_1():
+ # Figure 1b white and harary (2001)
+ # https://doi.org/10.1111/0081-1750.00098
+ # A graph with high adhesion (edge connectivity) and low cohesion
+ # (vertex connectivity)
+ G = nx.disjoint_union(nx.complete_graph(4), nx.complete_graph(4))
+ G.remove_node(7)
+ for i in range(4, 7):
+ G.add_edge(0, i)
+ G = nx.disjoint_union(G, nx.complete_graph(4))
+ G.remove_node(G.order() - 1)
+ for i in range(7, 10):
+ G.add_edge(0, i)
+ for flow_func in flow_funcs:
+ errmsg = f"Assertion failed in function: {flow_func.__name__}"
+ assert 1 == nx.node_connectivity(G, flow_func=flow_func), errmsg
+ assert 3 == nx.edge_connectivity(G, flow_func=flow_func), errmsg
+
+
+def test_white_harary_2():
+ # Figure 8 white and harary (2001)
+ # https://doi.org/10.1111/0081-1750.00098
+ G = nx.disjoint_union(nx.complete_graph(4), nx.complete_graph(4))
+ G.add_edge(0, 4)
+ # kappa <= lambda <= delta
+ assert 3 == min(nx.core_number(G).values())
+ for flow_func in flow_funcs:
+ errmsg = f"Assertion failed in function: {flow_func.__name__}"
+ assert 1 == nx.node_connectivity(G, flow_func=flow_func), errmsg
+ assert 1 == nx.edge_connectivity(G, flow_func=flow_func), errmsg
+
+
+def test_complete_graphs():
+ for n in range(5, 20, 5):
+ for flow_func in flow_funcs:
+ G = nx.complete_graph(n)
+ errmsg = f"Assertion failed in function: {flow_func.__name__}"
+ assert n - 1 == nx.node_connectivity(G, flow_func=flow_func), errmsg
+ assert n - 1 == nx.node_connectivity(
+ G.to_directed(), flow_func=flow_func
+ ), errmsg
+ assert n - 1 == nx.edge_connectivity(G, flow_func=flow_func), errmsg
+ assert n - 1 == nx.edge_connectivity(
+ G.to_directed(), flow_func=flow_func
+ ), errmsg
+
+
+def test_empty_graphs():
+ for k in range(5, 25, 5):
+ G = nx.empty_graph(k)
+ for flow_func in flow_funcs:
+ errmsg = f"Assertion failed in function: {flow_func.__name__}"
+ assert 0 == nx.node_connectivity(G, flow_func=flow_func), errmsg
+ assert 0 == nx.edge_connectivity(G, flow_func=flow_func), errmsg
+
+
+def test_petersen():
+ G = nx.petersen_graph()
+ for flow_func in flow_funcs:
+ errmsg = f"Assertion failed in function: {flow_func.__name__}"
+ assert 3 == nx.node_connectivity(G, flow_func=flow_func), errmsg
+ assert 3 == nx.edge_connectivity(G, flow_func=flow_func), errmsg
+
+
+def test_tutte():
+ G = nx.tutte_graph()
+ for flow_func in flow_funcs:
+ errmsg = f"Assertion failed in function: {flow_func.__name__}"
+ assert 3 == nx.node_connectivity(G, flow_func=flow_func), errmsg
+ assert 3 == nx.edge_connectivity(G, flow_func=flow_func), errmsg
+
+
+def test_dodecahedral():
+ G = nx.dodecahedral_graph()
+ for flow_func in flow_funcs:
+ errmsg = f"Assertion failed in function: {flow_func.__name__}"
+ assert 3 == nx.node_connectivity(G, flow_func=flow_func), errmsg
+ assert 3 == nx.edge_connectivity(G, flow_func=flow_func), errmsg
+
+
+def test_octahedral():
+ G = nx.octahedral_graph()
+ for flow_func in flow_funcs:
+ errmsg = f"Assertion failed in function: {flow_func.__name__}"
+ assert 4 == nx.node_connectivity(G, flow_func=flow_func), errmsg
+ assert 4 == nx.edge_connectivity(G, flow_func=flow_func), errmsg
+
+
+def test_icosahedral():
+ G = nx.icosahedral_graph()
+ for flow_func in flow_funcs:
+ errmsg = f"Assertion failed in function: {flow_func.__name__}"
+ assert 5 == nx.node_connectivity(G, flow_func=flow_func), errmsg
+ assert 5 == nx.edge_connectivity(G, flow_func=flow_func), errmsg
+
+
+def test_missing_source():
+ G = nx.path_graph(4)
+ for flow_func in flow_funcs:
+ pytest.raises(
+ nx.NetworkXError, nx.node_connectivity, G, 10, 1, flow_func=flow_func
+ )
+
+
+def test_missing_target():
+ G = nx.path_graph(4)
+ for flow_func in flow_funcs:
+ pytest.raises(
+ nx.NetworkXError, nx.node_connectivity, G, 1, 10, flow_func=flow_func
+ )
+
+
+def test_edge_missing_source():
+ G = nx.path_graph(4)
+ for flow_func in flow_funcs:
+ pytest.raises(
+ nx.NetworkXError, nx.edge_connectivity, G, 10, 1, flow_func=flow_func
+ )
+
+
+def test_edge_missing_target():
+ G = nx.path_graph(4)
+ for flow_func in flow_funcs:
+ pytest.raises(
+ nx.NetworkXError, nx.edge_connectivity, G, 1, 10, flow_func=flow_func
+ )
+
+
+def test_not_weakly_connected():
+ G = nx.DiGraph()
+ nx.add_path(G, [1, 2, 3])
+ nx.add_path(G, [4, 5])
+ for flow_func in flow_funcs:
+ errmsg = f"Assertion failed in function: {flow_func.__name__}"
+ assert nx.node_connectivity(G) == 0, errmsg
+ assert nx.edge_connectivity(G) == 0, errmsg
+
+
+def test_not_connected():
+ G = nx.Graph()
+ nx.add_path(G, [1, 2, 3])
+ nx.add_path(G, [4, 5])
+ for flow_func in flow_funcs:
+ errmsg = f"Assertion failed in function: {flow_func.__name__}"
+ assert nx.node_connectivity(G) == 0, errmsg
+ assert nx.edge_connectivity(G) == 0, errmsg
+
+
+def test_directed_edge_connectivity():
+ G = nx.cycle_graph(10, create_using=nx.DiGraph()) # only one direction
+ D = nx.cycle_graph(10).to_directed() # 2 reciprocal edges
+ for flow_func in flow_funcs:
+ errmsg = f"Assertion failed in function: {flow_func.__name__}"
+ assert 1 == nx.edge_connectivity(G, flow_func=flow_func), errmsg
+ assert 1 == local_edge_connectivity(G, 1, 4, flow_func=flow_func), errmsg
+ assert 1 == nx.edge_connectivity(G, 1, 4, flow_func=flow_func), errmsg
+ assert 2 == nx.edge_connectivity(D, flow_func=flow_func), errmsg
+ assert 2 == local_edge_connectivity(D, 1, 4, flow_func=flow_func), errmsg
+ assert 2 == nx.edge_connectivity(D, 1, 4, flow_func=flow_func), errmsg
+
+
+def test_cutoff():
+ G = nx.complete_graph(5)
+ for local_func in [local_edge_connectivity, local_node_connectivity]:
+ for flow_func in flow_funcs:
+ if flow_func is flow.preflow_push:
+ # cutoff is not supported by preflow_push
+ continue
+ for cutoff in [3, 2, 1]:
+ result = local_func(G, 0, 4, flow_func=flow_func, cutoff=cutoff)
+ assert cutoff == result, f"cutoff error in {flow_func.__name__}"
+
+
+def test_invalid_auxiliary():
+ G = nx.complete_graph(5)
+ pytest.raises(nx.NetworkXError, local_node_connectivity, G, 0, 3, auxiliary=G)
+
+
+def test_interface_only_source():
+ G = nx.complete_graph(5)
+ for interface_func in [nx.node_connectivity, nx.edge_connectivity]:
+ pytest.raises(nx.NetworkXError, interface_func, G, s=0)
+
+
+def test_interface_only_target():
+ G = nx.complete_graph(5)
+ for interface_func in [nx.node_connectivity, nx.edge_connectivity]:
+ pytest.raises(nx.NetworkXError, interface_func, G, t=3)
+
+
+def test_edge_connectivity_flow_vs_stoer_wagner():
+ graph_funcs = [nx.icosahedral_graph, nx.octahedral_graph, nx.dodecahedral_graph]
+ for graph_func in graph_funcs:
+ G = graph_func()
+ assert nx.stoer_wagner(G)[0] == nx.edge_connectivity(G)
+
+
+class TestAllPairsNodeConnectivity:
+ @classmethod
+ def setup_class(cls):
+ cls.path = nx.path_graph(7)
+ cls.directed_path = nx.path_graph(7, create_using=nx.DiGraph())
+ cls.cycle = nx.cycle_graph(7)
+ cls.directed_cycle = nx.cycle_graph(7, create_using=nx.DiGraph())
+ cls.gnp = nx.gnp_random_graph(30, 0.1, seed=42)
+ cls.directed_gnp = nx.gnp_random_graph(30, 0.1, directed=True, seed=42)
+ cls.K20 = nx.complete_graph(20)
+ cls.K10 = nx.complete_graph(10)
+ cls.K5 = nx.complete_graph(5)
+ cls.G_list = [
+ cls.path,
+ cls.directed_path,
+ cls.cycle,
+ cls.directed_cycle,
+ cls.gnp,
+ cls.directed_gnp,
+ cls.K10,
+ cls.K5,
+ cls.K20,
+ ]
+
+ def test_cycles(self):
+ K_undir = nx.all_pairs_node_connectivity(self.cycle)
+ for source in K_undir:
+ for target, k in K_undir[source].items():
+ assert k == 2
+ K_dir = nx.all_pairs_node_connectivity(self.directed_cycle)
+ for source in K_dir:
+ for target, k in K_dir[source].items():
+ assert k == 1
+
+ def test_complete(self):
+ for G in [self.K10, self.K5, self.K20]:
+ K = nx.all_pairs_node_connectivity(G)
+ for source in K:
+ for target, k in K[source].items():
+ assert k == len(G) - 1
+
+ def test_paths(self):
+ K_undir = nx.all_pairs_node_connectivity(self.path)
+ for source in K_undir:
+ for target, k in K_undir[source].items():
+ assert k == 1
+ K_dir = nx.all_pairs_node_connectivity(self.directed_path)
+ for source in K_dir:
+ for target, k in K_dir[source].items():
+ if source < target:
+ assert k == 1
+ else:
+ assert k == 0
+
+ def test_all_pairs_connectivity_nbunch(self):
+ G = nx.complete_graph(5)
+ nbunch = [0, 2, 3]
+ C = nx.all_pairs_node_connectivity(G, nbunch=nbunch)
+ assert len(C) == len(nbunch)
+
+ def test_all_pairs_connectivity_icosahedral(self):
+ G = nx.icosahedral_graph()
+ C = nx.all_pairs_node_connectivity(G)
+ assert all(5 == C[u][v] for u, v in itertools.combinations(G, 2))
+
+ def test_all_pairs_connectivity(self):
+ G = nx.Graph()
+ nodes = [0, 1, 2, 3]
+ nx.add_path(G, nodes)
+ A = {n: {} for n in G}
+ for u, v in itertools.combinations(nodes, 2):
+ A[u][v] = A[v][u] = nx.node_connectivity(G, u, v)
+ C = nx.all_pairs_node_connectivity(G)
+ assert sorted((k, sorted(v)) for k, v in A.items()) == sorted(
+ (k, sorted(v)) for k, v in C.items()
+ )
+
+ def test_all_pairs_connectivity_directed(self):
+ G = nx.DiGraph()
+ nodes = [0, 1, 2, 3]
+ nx.add_path(G, nodes)
+ A = {n: {} for n in G}
+ for u, v in itertools.permutations(nodes, 2):
+ A[u][v] = nx.node_connectivity(G, u, v)
+ C = nx.all_pairs_node_connectivity(G)
+ assert sorted((k, sorted(v)) for k, v in A.items()) == sorted(
+ (k, sorted(v)) for k, v in C.items()
+ )
+
+ def test_all_pairs_connectivity_nbunch_combinations(self):
+ G = nx.complete_graph(5)
+ nbunch = [0, 2, 3]
+ A = {n: {} for n in nbunch}
+ for u, v in itertools.combinations(nbunch, 2):
+ A[u][v] = A[v][u] = nx.node_connectivity(G, u, v)
+ C = nx.all_pairs_node_connectivity(G, nbunch=nbunch)
+ assert sorted((k, sorted(v)) for k, v in A.items()) == sorted(
+ (k, sorted(v)) for k, v in C.items()
+ )
+
+ def test_all_pairs_connectivity_nbunch_iter(self):
+ G = nx.complete_graph(5)
+ nbunch = [0, 2, 3]
+ A = {n: {} for n in nbunch}
+ for u, v in itertools.combinations(nbunch, 2):
+ A[u][v] = A[v][u] = nx.node_connectivity(G, u, v)
+ C = nx.all_pairs_node_connectivity(G, nbunch=iter(nbunch))
+ assert sorted((k, sorted(v)) for k, v in A.items()) == sorted(
+ (k, sorted(v)) for k, v in C.items()
+ )
diff --git a/.venv/lib/python3.12/site-packages/networkx/algorithms/connectivity/tests/test_cuts.py b/.venv/lib/python3.12/site-packages/networkx/algorithms/connectivity/tests/test_cuts.py
new file mode 100644
index 00000000..7a485be3
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/networkx/algorithms/connectivity/tests/test_cuts.py
@@ -0,0 +1,309 @@
+import pytest
+
+import networkx as nx
+from networkx.algorithms import flow
+from networkx.algorithms.connectivity import minimum_st_edge_cut, minimum_st_node_cut
+from networkx.utils import arbitrary_element
+
+flow_funcs = [
+ flow.boykov_kolmogorov,
+ flow.dinitz,
+ flow.edmonds_karp,
+ flow.preflow_push,
+ flow.shortest_augmenting_path,
+]
+
+# Tests for node and edge cutsets
+
+
+def _generate_no_biconnected(max_attempts=50):
+ attempts = 0
+ while True:
+ G = nx.fast_gnp_random_graph(100, 0.0575, seed=42)
+ if nx.is_connected(G) and not nx.is_biconnected(G):
+ attempts = 0
+ yield G
+ else:
+ if attempts >= max_attempts:
+ msg = f"Tried {attempts} times: no suitable Graph."
+ raise Exception(msg)
+ else:
+ attempts += 1
+
+
+def test_articulation_points():
+ Ggen = _generate_no_biconnected()
+ for flow_func in flow_funcs:
+ errmsg = f"Assertion failed in function: {flow_func.__name__}"
+ for i in range(1): # change 1 to 3 or more for more realizations.
+ G = next(Ggen)
+ cut = nx.minimum_node_cut(G, flow_func=flow_func)
+ assert len(cut) == 1, errmsg
+ assert cut.pop() in set(nx.articulation_points(G)), errmsg
+
+
+def test_brandes_erlebach_book():
+ # Figure 1 chapter 7: Connectivity
+ # http://www.informatik.uni-augsburg.de/thi/personen/kammer/Graph_Connectivity.pdf
+ G = nx.Graph()
+ G.add_edges_from(
+ [
+ (1, 2),
+ (1, 3),
+ (1, 4),
+ (1, 5),
+ (2, 3),
+ (2, 6),
+ (3, 4),
+ (3, 6),
+ (4, 6),
+ (4, 7),
+ (5, 7),
+ (6, 8),
+ (6, 9),
+ (7, 8),
+ (7, 10),
+ (8, 11),
+ (9, 10),
+ (9, 11),
+ (10, 11),
+ ]
+ )
+ for flow_func in flow_funcs:
+ kwargs = {"flow_func": flow_func}
+ errmsg = f"Assertion failed in function: {flow_func.__name__}"
+ # edge cutsets
+ assert 3 == len(nx.minimum_edge_cut(G, 1, 11, **kwargs)), errmsg
+ edge_cut = nx.minimum_edge_cut(G, **kwargs)
+ # Node 5 has only two edges
+ assert 2 == len(edge_cut), errmsg
+ H = G.copy()
+ H.remove_edges_from(edge_cut)
+ assert not nx.is_connected(H), errmsg
+ # node cuts
+ assert {6, 7} == minimum_st_node_cut(G, 1, 11, **kwargs), errmsg
+ assert {6, 7} == nx.minimum_node_cut(G, 1, 11, **kwargs), errmsg
+ node_cut = nx.minimum_node_cut(G, **kwargs)
+ assert 2 == len(node_cut), errmsg
+ H = G.copy()
+ H.remove_nodes_from(node_cut)
+ assert not nx.is_connected(H), errmsg
+
+
+def test_white_harary_paper():
+ # Figure 1b white and harary (2001)
+ # https://doi.org/10.1111/0081-1750.00098
+ # A graph with high adhesion (edge connectivity) and low cohesion
+ # (node connectivity)
+ G = nx.disjoint_union(nx.complete_graph(4), nx.complete_graph(4))
+ G.remove_node(7)
+ for i in range(4, 7):
+ G.add_edge(0, i)
+ G = nx.disjoint_union(G, nx.complete_graph(4))
+ G.remove_node(G.order() - 1)
+ for i in range(7, 10):
+ G.add_edge(0, i)
+ for flow_func in flow_funcs:
+ kwargs = {"flow_func": flow_func}
+ errmsg = f"Assertion failed in function: {flow_func.__name__}"
+ # edge cuts
+ edge_cut = nx.minimum_edge_cut(G, **kwargs)
+ assert 3 == len(edge_cut), errmsg
+ H = G.copy()
+ H.remove_edges_from(edge_cut)
+ assert not nx.is_connected(H), errmsg
+ # node cuts
+ node_cut = nx.minimum_node_cut(G, **kwargs)
+ assert {0} == node_cut, errmsg
+ H = G.copy()
+ H.remove_nodes_from(node_cut)
+ assert not nx.is_connected(H), errmsg
+
+
+def test_petersen_cutset():
+ G = nx.petersen_graph()
+ for flow_func in flow_funcs:
+ kwargs = {"flow_func": flow_func}
+ errmsg = f"Assertion failed in function: {flow_func.__name__}"
+ # edge cuts
+ edge_cut = nx.minimum_edge_cut(G, **kwargs)
+ assert 3 == len(edge_cut), errmsg
+ H = G.copy()
+ H.remove_edges_from(edge_cut)
+ assert not nx.is_connected(H), errmsg
+ # node cuts
+ node_cut = nx.minimum_node_cut(G, **kwargs)
+ assert 3 == len(node_cut), errmsg
+ H = G.copy()
+ H.remove_nodes_from(node_cut)
+ assert not nx.is_connected(H), errmsg
+
+
+def test_octahedral_cutset():
+ G = nx.octahedral_graph()
+ for flow_func in flow_funcs:
+ kwargs = {"flow_func": flow_func}
+ errmsg = f"Assertion failed in function: {flow_func.__name__}"
+ # edge cuts
+ edge_cut = nx.minimum_edge_cut(G, **kwargs)
+ assert 4 == len(edge_cut), errmsg
+ H = G.copy()
+ H.remove_edges_from(edge_cut)
+ assert not nx.is_connected(H), errmsg
+ # node cuts
+ node_cut = nx.minimum_node_cut(G, **kwargs)
+ assert 4 == len(node_cut), errmsg
+ H = G.copy()
+ H.remove_nodes_from(node_cut)
+ assert not nx.is_connected(H), errmsg
+
+
+def test_icosahedral_cutset():
+ G = nx.icosahedral_graph()
+ for flow_func in flow_funcs:
+ kwargs = {"flow_func": flow_func}
+ errmsg = f"Assertion failed in function: {flow_func.__name__}"
+ # edge cuts
+ edge_cut = nx.minimum_edge_cut(G, **kwargs)
+ assert 5 == len(edge_cut), errmsg
+ H = G.copy()
+ H.remove_edges_from(edge_cut)
+ assert not nx.is_connected(H), errmsg
+ # node cuts
+ node_cut = nx.minimum_node_cut(G, **kwargs)
+ assert 5 == len(node_cut), errmsg
+ H = G.copy()
+ H.remove_nodes_from(node_cut)
+ assert not nx.is_connected(H), errmsg
+
+
+def test_node_cutset_exception():
+ G = nx.Graph()
+ G.add_edges_from([(1, 2), (3, 4)])
+ for flow_func in flow_funcs:
+ pytest.raises(nx.NetworkXError, nx.minimum_node_cut, G, flow_func=flow_func)
+
+
+def test_node_cutset_random_graphs():
+ for flow_func in flow_funcs:
+ errmsg = f"Assertion failed in function: {flow_func.__name__}"
+ for i in range(3):
+ G = nx.fast_gnp_random_graph(50, 0.25, seed=42)
+ if not nx.is_connected(G):
+ ccs = iter(nx.connected_components(G))
+ start = arbitrary_element(next(ccs))
+ G.add_edges_from((start, arbitrary_element(c)) for c in ccs)
+ cutset = nx.minimum_node_cut(G, flow_func=flow_func)
+ assert nx.node_connectivity(G) == len(cutset), errmsg
+ G.remove_nodes_from(cutset)
+ assert not nx.is_connected(G), errmsg
+
+
+def test_edge_cutset_random_graphs():
+ for flow_func in flow_funcs:
+ errmsg = f"Assertion failed in function: {flow_func.__name__}"
+ for i in range(3):
+ G = nx.fast_gnp_random_graph(50, 0.25, seed=42)
+ if not nx.is_connected(G):
+ ccs = iter(nx.connected_components(G))
+ start = arbitrary_element(next(ccs))
+ G.add_edges_from((start, arbitrary_element(c)) for c in ccs)
+ cutset = nx.minimum_edge_cut(G, flow_func=flow_func)
+ assert nx.edge_connectivity(G) == len(cutset), errmsg
+ G.remove_edges_from(cutset)
+ assert not nx.is_connected(G), errmsg
+
+
+def test_empty_graphs():
+ G = nx.Graph()
+ D = nx.DiGraph()
+ for interface_func in [nx.minimum_node_cut, nx.minimum_edge_cut]:
+ for flow_func in flow_funcs:
+ pytest.raises(
+ nx.NetworkXPointlessConcept, interface_func, G, flow_func=flow_func
+ )
+ pytest.raises(
+ nx.NetworkXPointlessConcept, interface_func, D, flow_func=flow_func
+ )
+
+
+def test_unbounded():
+ G = nx.complete_graph(5)
+ for flow_func in flow_funcs:
+ assert 4 == len(minimum_st_edge_cut(G, 1, 4, flow_func=flow_func))
+
+
+def test_missing_source():
+ G = nx.path_graph(4)
+ for interface_func in [nx.minimum_edge_cut, nx.minimum_node_cut]:
+ for flow_func in flow_funcs:
+ pytest.raises(
+ nx.NetworkXError, interface_func, G, 10, 1, flow_func=flow_func
+ )
+
+
+def test_missing_target():
+ G = nx.path_graph(4)
+ for interface_func in [nx.minimum_edge_cut, nx.minimum_node_cut]:
+ for flow_func in flow_funcs:
+ pytest.raises(
+ nx.NetworkXError, interface_func, G, 1, 10, flow_func=flow_func
+ )
+
+
+def test_not_weakly_connected():
+ G = nx.DiGraph()
+ nx.add_path(G, [1, 2, 3])
+ nx.add_path(G, [4, 5])
+ for interface_func in [nx.minimum_edge_cut, nx.minimum_node_cut]:
+ for flow_func in flow_funcs:
+ pytest.raises(nx.NetworkXError, interface_func, G, flow_func=flow_func)
+
+
+def test_not_connected():
+ G = nx.Graph()
+ nx.add_path(G, [1, 2, 3])
+ nx.add_path(G, [4, 5])
+ for interface_func in [nx.minimum_edge_cut, nx.minimum_node_cut]:
+ for flow_func in flow_funcs:
+ pytest.raises(nx.NetworkXError, interface_func, G, flow_func=flow_func)
+
+
+def tests_min_cut_complete():
+ G = nx.complete_graph(5)
+ for interface_func in [nx.minimum_edge_cut, nx.minimum_node_cut]:
+ for flow_func in flow_funcs:
+ assert 4 == len(interface_func(G, flow_func=flow_func))
+
+
+def tests_min_cut_complete_directed():
+ G = nx.complete_graph(5)
+ G = G.to_directed()
+ for interface_func in [nx.minimum_edge_cut, nx.minimum_node_cut]:
+ for flow_func in flow_funcs:
+ assert 4 == len(interface_func(G, flow_func=flow_func))
+
+
+def tests_minimum_st_node_cut():
+ G = nx.Graph()
+ G.add_nodes_from([0, 1, 2, 3, 7, 8, 11, 12])
+ G.add_edges_from([(7, 11), (1, 11), (1, 12), (12, 8), (0, 1)])
+ nodelist = minimum_st_node_cut(G, 7, 11)
+ assert nodelist == {}
+
+
+def test_invalid_auxiliary():
+ G = nx.complete_graph(5)
+ pytest.raises(nx.NetworkXError, minimum_st_node_cut, G, 0, 3, auxiliary=G)
+
+
+def test_interface_only_source():
+ G = nx.complete_graph(5)
+ for interface_func in [nx.minimum_node_cut, nx.minimum_edge_cut]:
+ pytest.raises(nx.NetworkXError, interface_func, G, s=0)
+
+
+def test_interface_only_target():
+ G = nx.complete_graph(5)
+ for interface_func in [nx.minimum_node_cut, nx.minimum_edge_cut]:
+ pytest.raises(nx.NetworkXError, interface_func, G, t=3)
diff --git a/.venv/lib/python3.12/site-packages/networkx/algorithms/connectivity/tests/test_disjoint_paths.py b/.venv/lib/python3.12/site-packages/networkx/algorithms/connectivity/tests/test_disjoint_paths.py
new file mode 100644
index 00000000..0c0fad9f
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/networkx/algorithms/connectivity/tests/test_disjoint_paths.py
@@ -0,0 +1,249 @@
+import pytest
+
+import networkx as nx
+from networkx.algorithms import flow
+from networkx.utils import pairwise
+
+flow_funcs = [
+ flow.boykov_kolmogorov,
+ flow.edmonds_karp,
+ flow.dinitz,
+ flow.preflow_push,
+ flow.shortest_augmenting_path,
+]
+
+
+def is_path(G, path):
+ return all(v in G[u] for u, v in pairwise(path))
+
+
+def are_edge_disjoint_paths(G, paths):
+ if not paths:
+ return False
+ for path in paths:
+ assert is_path(G, path)
+ paths_edges = [list(pairwise(p)) for p in paths]
+ num_of_edges = sum(len(e) for e in paths_edges)
+ num_unique_edges = len(set.union(*[set(es) for es in paths_edges]))
+ if num_of_edges == num_unique_edges:
+ return True
+ return False
+
+
+def are_node_disjoint_paths(G, paths):
+ if not paths:
+ return False
+ for path in paths:
+ assert is_path(G, path)
+ # first and last nodes are source and target
+ st = {paths[0][0], paths[0][-1]}
+ num_of_nodes = len([n for path in paths for n in path if n not in st])
+ num_unique_nodes = len({n for path in paths for n in path if n not in st})
+ if num_of_nodes == num_unique_nodes:
+ return True
+ return False
+
+
+def test_graph_from_pr_2053():
+ G = nx.Graph()
+ G.add_edges_from(
+ [
+ ("A", "B"),
+ ("A", "D"),
+ ("A", "F"),
+ ("A", "G"),
+ ("B", "C"),
+ ("B", "D"),
+ ("B", "G"),
+ ("C", "D"),
+ ("C", "E"),
+ ("C", "Z"),
+ ("D", "E"),
+ ("D", "F"),
+ ("E", "F"),
+ ("E", "Z"),
+ ("F", "Z"),
+ ("G", "Z"),
+ ]
+ )
+ for flow_func in flow_funcs:
+ kwargs = {"flow_func": flow_func}
+ errmsg = f"Assertion failed in function: {flow_func.__name__}"
+ # edge disjoint paths
+ edge_paths = list(nx.edge_disjoint_paths(G, "A", "Z", **kwargs))
+ assert are_edge_disjoint_paths(G, edge_paths), errmsg
+ assert nx.edge_connectivity(G, "A", "Z") == len(edge_paths), errmsg
+ # node disjoint paths
+ node_paths = list(nx.node_disjoint_paths(G, "A", "Z", **kwargs))
+ assert are_node_disjoint_paths(G, node_paths), errmsg
+ assert nx.node_connectivity(G, "A", "Z") == len(node_paths), errmsg
+
+
+def test_florentine_families():
+ G = nx.florentine_families_graph()
+ for flow_func in flow_funcs:
+ kwargs = {"flow_func": flow_func}
+ errmsg = f"Assertion failed in function: {flow_func.__name__}"
+ # edge disjoint paths
+ edge_dpaths = list(nx.edge_disjoint_paths(G, "Medici", "Strozzi", **kwargs))
+ assert are_edge_disjoint_paths(G, edge_dpaths), errmsg
+ assert nx.edge_connectivity(G, "Medici", "Strozzi") == len(edge_dpaths), errmsg
+ # node disjoint paths
+ node_dpaths = list(nx.node_disjoint_paths(G, "Medici", "Strozzi", **kwargs))
+ assert are_node_disjoint_paths(G, node_dpaths), errmsg
+ assert nx.node_connectivity(G, "Medici", "Strozzi") == len(node_dpaths), errmsg
+
+
+def test_karate():
+ G = nx.karate_club_graph()
+ for flow_func in flow_funcs:
+ kwargs = {"flow_func": flow_func}
+ errmsg = f"Assertion failed in function: {flow_func.__name__}"
+ # edge disjoint paths
+ edge_dpaths = list(nx.edge_disjoint_paths(G, 0, 33, **kwargs))
+ assert are_edge_disjoint_paths(G, edge_dpaths), errmsg
+ assert nx.edge_connectivity(G, 0, 33) == len(edge_dpaths), errmsg
+ # node disjoint paths
+ node_dpaths = list(nx.node_disjoint_paths(G, 0, 33, **kwargs))
+ assert are_node_disjoint_paths(G, node_dpaths), errmsg
+ assert nx.node_connectivity(G, 0, 33) == len(node_dpaths), errmsg
+
+
+def test_petersen_disjoint_paths():
+ G = nx.petersen_graph()
+ for flow_func in flow_funcs:
+ kwargs = {"flow_func": flow_func}
+ errmsg = f"Assertion failed in function: {flow_func.__name__}"
+ # edge disjoint paths
+ edge_dpaths = list(nx.edge_disjoint_paths(G, 0, 6, **kwargs))
+ assert are_edge_disjoint_paths(G, edge_dpaths), errmsg
+ assert 3 == len(edge_dpaths), errmsg
+ # node disjoint paths
+ node_dpaths = list(nx.node_disjoint_paths(G, 0, 6, **kwargs))
+ assert are_node_disjoint_paths(G, node_dpaths), errmsg
+ assert 3 == len(node_dpaths), errmsg
+
+
+def test_octahedral_disjoint_paths():
+ G = nx.octahedral_graph()
+ for flow_func in flow_funcs:
+ kwargs = {"flow_func": flow_func}
+ errmsg = f"Assertion failed in function: {flow_func.__name__}"
+ # edge disjoint paths
+ edge_dpaths = list(nx.edge_disjoint_paths(G, 0, 5, **kwargs))
+ assert are_edge_disjoint_paths(G, edge_dpaths), errmsg
+ assert 4 == len(edge_dpaths), errmsg
+ # node disjoint paths
+ node_dpaths = list(nx.node_disjoint_paths(G, 0, 5, **kwargs))
+ assert are_node_disjoint_paths(G, node_dpaths), errmsg
+ assert 4 == len(node_dpaths), errmsg
+
+
+def test_icosahedral_disjoint_paths():
+ G = nx.icosahedral_graph()
+ for flow_func in flow_funcs:
+ kwargs = {"flow_func": flow_func}
+ errmsg = f"Assertion failed in function: {flow_func.__name__}"
+ # edge disjoint paths
+ edge_dpaths = list(nx.edge_disjoint_paths(G, 0, 6, **kwargs))
+ assert are_edge_disjoint_paths(G, edge_dpaths), errmsg
+ assert 5 == len(edge_dpaths), errmsg
+ # node disjoint paths
+ node_dpaths = list(nx.node_disjoint_paths(G, 0, 6, **kwargs))
+ assert are_node_disjoint_paths(G, node_dpaths), errmsg
+ assert 5 == len(node_dpaths), errmsg
+
+
+def test_cutoff_disjoint_paths():
+ G = nx.icosahedral_graph()
+ for flow_func in flow_funcs:
+ kwargs = {"flow_func": flow_func}
+ errmsg = f"Assertion failed in function: {flow_func.__name__}"
+ for cutoff in [2, 4]:
+ kwargs["cutoff"] = cutoff
+ # edge disjoint paths
+ edge_dpaths = list(nx.edge_disjoint_paths(G, 0, 6, **kwargs))
+ assert are_edge_disjoint_paths(G, edge_dpaths), errmsg
+ assert cutoff == len(edge_dpaths), errmsg
+ # node disjoint paths
+ node_dpaths = list(nx.node_disjoint_paths(G, 0, 6, **kwargs))
+ assert are_node_disjoint_paths(G, node_dpaths), errmsg
+ assert cutoff == len(node_dpaths), errmsg
+
+
+def test_missing_source_edge_paths():
+ with pytest.raises(nx.NetworkXError):
+ G = nx.path_graph(4)
+ list(nx.edge_disjoint_paths(G, 10, 1))
+
+
+def test_missing_source_node_paths():
+ with pytest.raises(nx.NetworkXError):
+ G = nx.path_graph(4)
+ list(nx.node_disjoint_paths(G, 10, 1))
+
+
+def test_missing_target_edge_paths():
+ with pytest.raises(nx.NetworkXError):
+ G = nx.path_graph(4)
+ list(nx.edge_disjoint_paths(G, 1, 10))
+
+
+def test_missing_target_node_paths():
+ with pytest.raises(nx.NetworkXError):
+ G = nx.path_graph(4)
+ list(nx.node_disjoint_paths(G, 1, 10))
+
+
+def test_not_weakly_connected_edges():
+ with pytest.raises(nx.NetworkXNoPath):
+ G = nx.DiGraph()
+ nx.add_path(G, [1, 2, 3])
+ nx.add_path(G, [4, 5])
+ list(nx.edge_disjoint_paths(G, 1, 5))
+
+
+def test_not_weakly_connected_nodes():
+ with pytest.raises(nx.NetworkXNoPath):
+ G = nx.DiGraph()
+ nx.add_path(G, [1, 2, 3])
+ nx.add_path(G, [4, 5])
+ list(nx.node_disjoint_paths(G, 1, 5))
+
+
+def test_not_connected_edges():
+ with pytest.raises(nx.NetworkXNoPath):
+ G = nx.Graph()
+ nx.add_path(G, [1, 2, 3])
+ nx.add_path(G, [4, 5])
+ list(nx.edge_disjoint_paths(G, 1, 5))
+
+
+def test_not_connected_nodes():
+ with pytest.raises(nx.NetworkXNoPath):
+ G = nx.Graph()
+ nx.add_path(G, [1, 2, 3])
+ nx.add_path(G, [4, 5])
+ list(nx.node_disjoint_paths(G, 1, 5))
+
+
+def test_isolated_edges():
+ with pytest.raises(nx.NetworkXNoPath):
+ G = nx.Graph()
+ G.add_node(1)
+ nx.add_path(G, [4, 5])
+ list(nx.edge_disjoint_paths(G, 1, 5))
+
+
+def test_isolated_nodes():
+ with pytest.raises(nx.NetworkXNoPath):
+ G = nx.Graph()
+ G.add_node(1)
+ nx.add_path(G, [4, 5])
+ list(nx.node_disjoint_paths(G, 1, 5))
+
+
+def test_invalid_auxiliary():
+ with pytest.raises(nx.NetworkXError):
+ G = nx.complete_graph(5)
+ list(nx.node_disjoint_paths(G, 0, 3, auxiliary=G))
diff --git a/.venv/lib/python3.12/site-packages/networkx/algorithms/connectivity/tests/test_edge_augmentation.py b/.venv/lib/python3.12/site-packages/networkx/algorithms/connectivity/tests/test_edge_augmentation.py
new file mode 100644
index 00000000..e1d92d99
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/networkx/algorithms/connectivity/tests/test_edge_augmentation.py
@@ -0,0 +1,502 @@
+import itertools as it
+import random
+
+import pytest
+
+import networkx as nx
+from networkx.algorithms.connectivity import k_edge_augmentation
+from networkx.algorithms.connectivity.edge_augmentation import (
+ _unpack_available_edges,
+ collapse,
+ complement_edges,
+ is_k_edge_connected,
+ is_locally_k_edge_connected,
+)
+from networkx.utils import pairwise
+
+# This should be set to the largest k for which an efficient algorithm is
+# explicitly defined.
+MAX_EFFICIENT_K = 2
+
+
+def tarjan_bridge_graph():
+ # graph from tarjan paper
+ # RE Tarjan - "A note on finding the bridges of a graph"
+ # Information Processing Letters, 1974 - Elsevier
+ # doi:10.1016/0020-0190(74)90003-9.
+ # define 2-connected components and bridges
+ ccs = [
+ (1, 2, 4, 3, 1, 4),
+ (5, 6, 7, 5),
+ (8, 9, 10, 8),
+ (17, 18, 16, 15, 17),
+ (11, 12, 14, 13, 11, 14),
+ ]
+ bridges = [(4, 8), (3, 5), (3, 17)]
+ G = nx.Graph(it.chain(*(pairwise(path) for path in ccs + bridges)))
+ return G
+
+
+def test_weight_key():
+ G = nx.Graph()
+ G.add_nodes_from([1, 2, 3, 4, 5, 6, 7, 8, 9])
+ G.add_edges_from([(3, 8), (1, 2), (2, 3)])
+ impossible = {(3, 6), (3, 9)}
+ rng = random.Random(0)
+ avail_uv = list(set(complement_edges(G)) - impossible)
+ avail = [(u, v, {"cost": rng.random()}) for u, v in avail_uv]
+
+ _augment_and_check(G, k=1)
+ _augment_and_check(G, k=1, avail=avail_uv)
+ _augment_and_check(G, k=1, avail=avail, weight="cost")
+
+ _check_augmentations(G, avail, weight="cost")
+
+
+def test_is_locally_k_edge_connected_exceptions():
+ pytest.raises(nx.NetworkXNotImplemented, is_k_edge_connected, nx.DiGraph(), k=0)
+ pytest.raises(nx.NetworkXNotImplemented, is_k_edge_connected, nx.MultiGraph(), k=0)
+ pytest.raises(ValueError, is_k_edge_connected, nx.Graph(), k=0)
+
+
+def test_is_k_edge_connected():
+ G = nx.barbell_graph(10, 0)
+ assert is_k_edge_connected(G, k=1)
+ assert not is_k_edge_connected(G, k=2)
+
+ G = nx.Graph()
+ G.add_nodes_from([5, 15])
+ assert not is_k_edge_connected(G, k=1)
+ assert not is_k_edge_connected(G, k=2)
+
+ G = nx.complete_graph(5)
+ assert is_k_edge_connected(G, k=1)
+ assert is_k_edge_connected(G, k=2)
+ assert is_k_edge_connected(G, k=3)
+ assert is_k_edge_connected(G, k=4)
+
+ G = nx.compose(nx.complete_graph([0, 1, 2]), nx.complete_graph([3, 4, 5]))
+ assert not is_k_edge_connected(G, k=1)
+ assert not is_k_edge_connected(G, k=2)
+ assert not is_k_edge_connected(G, k=3)
+
+
+def test_is_k_edge_connected_exceptions():
+ pytest.raises(
+ nx.NetworkXNotImplemented, is_locally_k_edge_connected, nx.DiGraph(), 1, 2, k=0
+ )
+ pytest.raises(
+ nx.NetworkXNotImplemented,
+ is_locally_k_edge_connected,
+ nx.MultiGraph(),
+ 1,
+ 2,
+ k=0,
+ )
+ pytest.raises(ValueError, is_locally_k_edge_connected, nx.Graph(), 1, 2, k=0)
+
+
+def test_is_locally_k_edge_connected():
+ G = nx.barbell_graph(10, 0)
+ assert is_locally_k_edge_connected(G, 5, 15, k=1)
+ assert not is_locally_k_edge_connected(G, 5, 15, k=2)
+
+ G = nx.Graph()
+ G.add_nodes_from([5, 15])
+ assert not is_locally_k_edge_connected(G, 5, 15, k=2)
+
+
+def test_null_graph():
+ G = nx.Graph()
+ _check_augmentations(G, max_k=MAX_EFFICIENT_K + 2)
+
+
+def test_cliques():
+ for n in range(1, 10):
+ G = nx.complete_graph(n)
+ _check_augmentations(G, max_k=MAX_EFFICIENT_K + 2)
+
+
+def test_clique_and_node():
+ for n in range(1, 10):
+ G = nx.complete_graph(n)
+ G.add_node(n + 1)
+ _check_augmentations(G, max_k=MAX_EFFICIENT_K + 2)
+
+
+def test_point_graph():
+ G = nx.Graph()
+ G.add_node(1)
+ _check_augmentations(G, max_k=MAX_EFFICIENT_K + 2)
+
+
+def test_edgeless_graph():
+ G = nx.Graph()
+ G.add_nodes_from([1, 2, 3, 4])
+ _check_augmentations(G)
+
+
+def test_invalid_k():
+ G = nx.Graph()
+ pytest.raises(ValueError, list, k_edge_augmentation(G, k=-1))
+ pytest.raises(ValueError, list, k_edge_augmentation(G, k=0))
+
+
+def test_unfeasible():
+ G = tarjan_bridge_graph()
+ pytest.raises(nx.NetworkXUnfeasible, list, k_edge_augmentation(G, k=1, avail=[]))
+
+ pytest.raises(nx.NetworkXUnfeasible, list, k_edge_augmentation(G, k=2, avail=[]))
+
+ pytest.raises(
+ nx.NetworkXUnfeasible, list, k_edge_augmentation(G, k=2, avail=[(7, 9)])
+ )
+
+ # partial solutions should not error if real solutions are infeasible
+ aug_edges = list(k_edge_augmentation(G, k=2, avail=[(7, 9)], partial=True))
+ assert aug_edges == [(7, 9)]
+
+ _check_augmentations(G, avail=[], max_k=MAX_EFFICIENT_K + 2)
+
+ _check_augmentations(G, avail=[(7, 9)], max_k=MAX_EFFICIENT_K + 2)
+
+
+def test_tarjan():
+ G = tarjan_bridge_graph()
+
+ aug_edges = set(_augment_and_check(G, k=2)[0])
+ print(f"aug_edges = {aug_edges!r}")
+ # can't assert edge exactly equality due to non-determinant edge order
+ # but we do know the size of the solution must be 3
+ assert len(aug_edges) == 3
+
+ avail = [
+ (9, 7),
+ (8, 5),
+ (2, 10),
+ (6, 13),
+ (11, 18),
+ (1, 17),
+ (2, 3),
+ (16, 17),
+ (18, 14),
+ (15, 14),
+ ]
+ aug_edges = set(_augment_and_check(G, avail=avail, k=2)[0])
+
+ # Can't assert exact length since approximation depends on the order of a
+ # dict traversal.
+ assert len(aug_edges) <= 3 * 2
+
+ _check_augmentations(G, avail)
+
+
+def test_configuration():
+ # seeds = [2718183590, 2470619828, 1694705158, 3001036531, 2401251497]
+ seeds = [1001, 1002, 1003, 1004]
+ for seed in seeds:
+ deg_seq = nx.random_powerlaw_tree_sequence(20, seed=seed, tries=5000)
+ G = nx.Graph(nx.configuration_model(deg_seq, seed=seed))
+ G.remove_edges_from(nx.selfloop_edges(G))
+ _check_augmentations(G)
+
+
+def test_shell():
+ # seeds = [2057382236, 3331169846, 1840105863, 476020778, 2247498425]
+ seeds = [18]
+ for seed in seeds:
+ constructor = [(12, 70, 0.8), (15, 40, 0.6)]
+ G = nx.random_shell_graph(constructor, seed=seed)
+ _check_augmentations(G)
+
+
+def test_karate():
+ G = nx.karate_club_graph()
+ _check_augmentations(G)
+
+
+def test_star():
+ G = nx.star_graph(3)
+ _check_augmentations(G)
+
+ G = nx.star_graph(5)
+ _check_augmentations(G)
+
+ G = nx.star_graph(10)
+ _check_augmentations(G)
+
+
+def test_barbell():
+ G = nx.barbell_graph(5, 0)
+ _check_augmentations(G)
+
+ G = nx.barbell_graph(5, 2)
+ _check_augmentations(G)
+
+ G = nx.barbell_graph(5, 3)
+ _check_augmentations(G)
+
+ G = nx.barbell_graph(5, 4)
+ _check_augmentations(G)
+
+
+def test_bridge():
+ G = nx.Graph([(2393, 2257), (2393, 2685), (2685, 2257), (1758, 2257)])
+ _check_augmentations(G)
+
+
+def test_gnp_augmentation():
+ rng = random.Random(0)
+ G = nx.gnp_random_graph(30, 0.005, seed=0)
+ # Randomly make edges available
+ avail = {
+ (u, v): 1 + rng.random() for u, v in complement_edges(G) if rng.random() < 0.25
+ }
+ _check_augmentations(G, avail)
+
+
+def _assert_solution_properties(G, aug_edges, avail_dict=None):
+ """Checks that aug_edges are consistently formatted"""
+ if avail_dict is not None:
+ assert all(
+ e in avail_dict for e in aug_edges
+ ), "when avail is specified aug-edges should be in avail"
+
+ unique_aug = set(map(tuple, map(sorted, aug_edges)))
+ unique_aug = list(map(tuple, map(sorted, aug_edges)))
+ assert len(aug_edges) == len(unique_aug), "edges should be unique"
+
+ assert not any(u == v for u, v in unique_aug), "should be no self-edges"
+
+ assert not any(
+ G.has_edge(u, v) for u, v in unique_aug
+ ), "aug edges and G.edges should be disjoint"
+
+
+def _augment_and_check(
+ G, k, avail=None, weight=None, verbose=False, orig_k=None, max_aug_k=None
+):
+ """
+ Does one specific augmentation and checks for properties of the result
+ """
+ if orig_k is None:
+ try:
+ orig_k = nx.edge_connectivity(G)
+ except nx.NetworkXPointlessConcept:
+ orig_k = 0
+ info = {}
+ try:
+ if avail is not None:
+ # ensure avail is in dict form
+ avail_dict = dict(zip(*_unpack_available_edges(avail, weight=weight)))
+ else:
+ avail_dict = None
+ try:
+ # Find the augmentation if possible
+ generator = nx.k_edge_augmentation(G, k=k, weight=weight, avail=avail)
+ assert not isinstance(generator, list), "should always return an iter"
+ aug_edges = []
+ for edge in generator:
+ aug_edges.append(edge)
+ except nx.NetworkXUnfeasible:
+ infeasible = True
+ info["infeasible"] = True
+ assert len(aug_edges) == 0, "should not generate anything if unfeasible"
+
+ if avail is None:
+ n_nodes = G.number_of_nodes()
+ assert n_nodes <= k, (
+ "unconstrained cases are only unfeasible if |V| <= k. "
+ f"Got |V|={n_nodes} and k={k}"
+ )
+ else:
+ if max_aug_k is None:
+ G_aug_all = G.copy()
+ G_aug_all.add_edges_from(avail_dict.keys())
+ try:
+ max_aug_k = nx.edge_connectivity(G_aug_all)
+ except nx.NetworkXPointlessConcept:
+ max_aug_k = 0
+
+ assert max_aug_k < k, (
+ "avail should only be unfeasible if using all edges "
+ "does not achieve k-edge-connectivity"
+ )
+
+ # Test for a partial solution
+ partial_edges = list(
+ nx.k_edge_augmentation(G, k=k, weight=weight, partial=True, avail=avail)
+ )
+
+ info["n_partial_edges"] = len(partial_edges)
+
+ if avail_dict is None:
+ assert set(partial_edges) == set(
+ complement_edges(G)
+ ), "unweighted partial solutions should be the complement"
+ elif len(avail_dict) > 0:
+ H = G.copy()
+
+ # Find the partial / full augmented connectivity
+ H.add_edges_from(partial_edges)
+ partial_conn = nx.edge_connectivity(H)
+
+ H.add_edges_from(set(avail_dict.keys()))
+ full_conn = nx.edge_connectivity(H)
+
+ # Full connectivity should be no better than our partial
+ # solution.
+ assert (
+ partial_conn == full_conn
+ ), "adding more edges should not increase k-conn"
+
+ # Find the new edge-connectivity after adding the augmenting edges
+ aug_edges = partial_edges
+ else:
+ infeasible = False
+
+ # Find the weight of the augmentation
+ num_edges = len(aug_edges)
+ if avail is not None:
+ total_weight = sum(avail_dict[e] for e in aug_edges)
+ else:
+ total_weight = num_edges
+
+ info["total_weight"] = total_weight
+ info["num_edges"] = num_edges
+
+ # Find the new edge-connectivity after adding the augmenting edges
+ G_aug = G.copy()
+ G_aug.add_edges_from(aug_edges)
+ try:
+ aug_k = nx.edge_connectivity(G_aug)
+ except nx.NetworkXPointlessConcept:
+ aug_k = 0
+ info["aug_k"] = aug_k
+
+ # Do checks
+ if not infeasible and orig_k < k:
+ assert info["aug_k"] >= k, f"connectivity should increase to k={k} or more"
+
+ assert info["aug_k"] >= orig_k, "augmenting should never reduce connectivity"
+
+ _assert_solution_properties(G, aug_edges, avail_dict)
+
+ except Exception:
+ info["failed"] = True
+ print(f"edges = {list(G.edges())}")
+ print(f"nodes = {list(G.nodes())}")
+ print(f"aug_edges = {list(aug_edges)}")
+ print(f"info = {info}")
+ raise
+ else:
+ if verbose:
+ print(f"info = {info}")
+
+ if infeasible:
+ aug_edges = None
+ return aug_edges, info
+
+
+def _check_augmentations(G, avail=None, max_k=None, weight=None, verbose=False):
+ """Helper to check weighted/unweighted cases with multiple values of k"""
+ # Using all available edges, find the maximum edge-connectivity
+ try:
+ orig_k = nx.edge_connectivity(G)
+ except nx.NetworkXPointlessConcept:
+ orig_k = 0
+
+ if avail is not None:
+ all_aug_edges = _unpack_available_edges(avail, weight=weight)[0]
+ G_aug_all = G.copy()
+ G_aug_all.add_edges_from(all_aug_edges)
+ try:
+ max_aug_k = nx.edge_connectivity(G_aug_all)
+ except nx.NetworkXPointlessConcept:
+ max_aug_k = 0
+ else:
+ max_aug_k = G.number_of_nodes() - 1
+
+ if max_k is None:
+ max_k = min(4, max_aug_k)
+
+ avail_uniform = {e: 1 for e in complement_edges(G)}
+
+ if verbose:
+ print("\n=== CHECK_AUGMENTATION ===")
+ print(f"G.number_of_nodes = {G.number_of_nodes()!r}")
+ print(f"G.number_of_edges = {G.number_of_edges()!r}")
+ print(f"max_k = {max_k!r}")
+ print(f"max_aug_k = {max_aug_k!r}")
+ print(f"orig_k = {orig_k!r}")
+
+ # check augmentation for multiple values of k
+ for k in range(1, max_k + 1):
+ if verbose:
+ print("---------------")
+ print(f"Checking k = {k}")
+
+ # Check the unweighted version
+ if verbose:
+ print("unweighted case")
+ aug_edges1, info1 = _augment_and_check(G, k=k, verbose=verbose, orig_k=orig_k)
+
+ # Check that the weighted version with all available edges and uniform
+ # weights gives a similar solution to the unweighted case.
+ if verbose:
+ print("weighted uniform case")
+ aug_edges2, info2 = _augment_and_check(
+ G,
+ k=k,
+ avail=avail_uniform,
+ verbose=verbose,
+ orig_k=orig_k,
+ max_aug_k=G.number_of_nodes() - 1,
+ )
+
+ # Check the weighted version
+ if avail is not None:
+ if verbose:
+ print("weighted case")
+ aug_edges3, info3 = _augment_and_check(
+ G,
+ k=k,
+ avail=avail,
+ weight=weight,
+ verbose=verbose,
+ max_aug_k=max_aug_k,
+ orig_k=orig_k,
+ )
+
+ if aug_edges1 is not None:
+ # Check approximation ratios
+ if k == 1:
+ # when k=1, both solutions should be optimal
+ assert info2["total_weight"] == info1["total_weight"]
+ if k == 2:
+ # when k=2, the weighted version is an approximation
+ if orig_k == 0:
+ # the approximation ratio is 3 if G is not connected
+ assert info2["total_weight"] <= info1["total_weight"] * 3
+ else:
+ # the approximation ratio is 2 if G is was connected
+ assert info2["total_weight"] <= info1["total_weight"] * 2
+ _check_unconstrained_bridge_property(G, info1)
+
+
+def _check_unconstrained_bridge_property(G, info1):
+ # Check Theorem 5 from Eswaran and Tarjan. (1975) Augmentation problems
+ import math
+
+ bridge_ccs = list(nx.connectivity.bridge_components(G))
+ # condense G into an forest C
+ C = collapse(G, bridge_ccs)
+
+ p = len([n for n, d in C.degree() if d == 1]) # leafs
+ q = len([n for n, d in C.degree() if d == 0]) # isolated
+ if p + q > 1:
+ size_target = math.ceil(p / 2) + q
+ size_aug = info1["num_edges"]
+ assert (
+ size_aug == size_target
+ ), "augmentation size is different from what theory predicts"
diff --git a/.venv/lib/python3.12/site-packages/networkx/algorithms/connectivity/tests/test_edge_kcomponents.py b/.venv/lib/python3.12/site-packages/networkx/algorithms/connectivity/tests/test_edge_kcomponents.py
new file mode 100644
index 00000000..4a1f681a
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/networkx/algorithms/connectivity/tests/test_edge_kcomponents.py
@@ -0,0 +1,488 @@
+import itertools as it
+
+import pytest
+
+import networkx as nx
+from networkx.algorithms.connectivity import EdgeComponentAuxGraph, bridge_components
+from networkx.algorithms.connectivity.edge_kcomponents import general_k_edge_subgraphs
+from networkx.utils import pairwise
+
+# ----------------
+# Helper functions
+# ----------------
+
+
+def fset(list_of_sets):
+ """allows == to be used for list of sets"""
+ return set(map(frozenset, list_of_sets))
+
+
+def _assert_subgraph_edge_connectivity(G, ccs_subgraph, k):
+ """
+ tests properties of k-edge-connected subgraphs
+
+ the actual edge connectivity should be no less than k unless the cc is a
+ single node.
+ """
+ for cc in ccs_subgraph:
+ C = G.subgraph(cc)
+ if len(cc) > 1:
+ connectivity = nx.edge_connectivity(C)
+ assert connectivity >= k
+
+
+def _memo_connectivity(G, u, v, memo):
+ edge = (u, v)
+ if edge in memo:
+ return memo[edge]
+ if not G.is_directed():
+ redge = (v, u)
+ if redge in memo:
+ return memo[redge]
+ memo[edge] = nx.edge_connectivity(G, *edge)
+ return memo[edge]
+
+
+def _all_pairs_connectivity(G, cc, k, memo):
+ # Brute force check
+ for u, v in it.combinations(cc, 2):
+ # Use a memoization dict to save on computation
+ connectivity = _memo_connectivity(G, u, v, memo)
+ if G.is_directed():
+ connectivity = min(connectivity, _memo_connectivity(G, v, u, memo))
+ assert connectivity >= k
+
+
+def _assert_local_cc_edge_connectivity(G, ccs_local, k, memo):
+ """
+ tests properties of k-edge-connected components
+
+ the local edge connectivity between each pair of nodes in the original
+ graph should be no less than k unless the cc is a single node.
+ """
+ for cc in ccs_local:
+ if len(cc) > 1:
+ # Strategy for testing a bit faster: If the subgraph has high edge
+ # connectivity then it must have local connectivity
+ C = G.subgraph(cc)
+ connectivity = nx.edge_connectivity(C)
+ if connectivity < k:
+ # Otherwise do the brute force (with memoization) check
+ _all_pairs_connectivity(G, cc, k, memo)
+
+
+# Helper function
+def _check_edge_connectivity(G):
+ """
+ Helper - generates all k-edge-components using the aux graph. Checks the
+ both local and subgraph edge connectivity of each cc. Also checks that
+ alternate methods of computing the k-edge-ccs generate the same result.
+ """
+ # Construct the auxiliary graph that can be used to make each k-cc or k-sub
+ aux_graph = EdgeComponentAuxGraph.construct(G)
+
+ # memoize the local connectivity in this graph
+ memo = {}
+
+ for k in it.count(1):
+ # Test "local" k-edge-components and k-edge-subgraphs
+ ccs_local = fset(aux_graph.k_edge_components(k))
+ ccs_subgraph = fset(aux_graph.k_edge_subgraphs(k))
+
+ # Check connectivity properties that should be guaranteed by the
+ # algorithms.
+ _assert_local_cc_edge_connectivity(G, ccs_local, k, memo)
+ _assert_subgraph_edge_connectivity(G, ccs_subgraph, k)
+
+ if k == 1 or k == 2 and not G.is_directed():
+ assert (
+ ccs_local == ccs_subgraph
+ ), "Subgraphs and components should be the same when k == 1 or (k == 2 and not G.directed())"
+
+ if G.is_directed():
+ # Test special case methods are the same as the aux graph
+ if k == 1:
+ alt_sccs = fset(nx.strongly_connected_components(G))
+ assert alt_sccs == ccs_local, "k=1 failed alt"
+ assert alt_sccs == ccs_subgraph, "k=1 failed alt"
+ else:
+ # Test special case methods are the same as the aux graph
+ if k == 1:
+ alt_ccs = fset(nx.connected_components(G))
+ assert alt_ccs == ccs_local, "k=1 failed alt"
+ assert alt_ccs == ccs_subgraph, "k=1 failed alt"
+ elif k == 2:
+ alt_bridge_ccs = fset(bridge_components(G))
+ assert alt_bridge_ccs == ccs_local, "k=2 failed alt"
+ assert alt_bridge_ccs == ccs_subgraph, "k=2 failed alt"
+ # if new methods for k == 3 or k == 4 are implemented add them here
+
+ # Check the general subgraph method works by itself
+ alt_subgraph_ccs = fset(
+ [set(C.nodes()) for C in general_k_edge_subgraphs(G, k=k)]
+ )
+ assert alt_subgraph_ccs == ccs_subgraph, "alt subgraph method failed"
+
+ # Stop once k is larger than all special case methods
+ # and we cannot break down ccs any further.
+ if k > 2 and all(len(cc) == 1 for cc in ccs_local):
+ break
+
+
+# ----------------
+# Misc tests
+# ----------------
+
+
+def test_zero_k_exception():
+ G = nx.Graph()
+ # functions that return generators error immediately
+ pytest.raises(ValueError, nx.k_edge_components, G, k=0)
+ pytest.raises(ValueError, nx.k_edge_subgraphs, G, k=0)
+
+ # actual generators only error when you get the first item
+ aux_graph = EdgeComponentAuxGraph.construct(G)
+ pytest.raises(ValueError, list, aux_graph.k_edge_components(k=0))
+ pytest.raises(ValueError, list, aux_graph.k_edge_subgraphs(k=0))
+
+ pytest.raises(ValueError, list, general_k_edge_subgraphs(G, k=0))
+
+
+def test_empty_input():
+ G = nx.Graph()
+ assert [] == list(nx.k_edge_components(G, k=5))
+ assert [] == list(nx.k_edge_subgraphs(G, k=5))
+
+ G = nx.DiGraph()
+ assert [] == list(nx.k_edge_components(G, k=5))
+ assert [] == list(nx.k_edge_subgraphs(G, k=5))
+
+
+def test_not_implemented():
+ G = nx.MultiGraph()
+ pytest.raises(nx.NetworkXNotImplemented, EdgeComponentAuxGraph.construct, G)
+ pytest.raises(nx.NetworkXNotImplemented, nx.k_edge_components, G, k=2)
+ pytest.raises(nx.NetworkXNotImplemented, nx.k_edge_subgraphs, G, k=2)
+ with pytest.raises(nx.NetworkXNotImplemented):
+ next(bridge_components(G))
+ with pytest.raises(nx.NetworkXNotImplemented):
+ next(bridge_components(nx.DiGraph()))
+
+
+def test_general_k_edge_subgraph_quick_return():
+ # tests quick return optimization
+ G = nx.Graph()
+ G.add_node(0)
+ subgraphs = list(general_k_edge_subgraphs(G, k=1))
+ assert len(subgraphs) == 1
+ for subgraph in subgraphs:
+ assert subgraph.number_of_nodes() == 1
+
+ G.add_node(1)
+ subgraphs = list(general_k_edge_subgraphs(G, k=1))
+ assert len(subgraphs) == 2
+ for subgraph in subgraphs:
+ assert subgraph.number_of_nodes() == 1
+
+
+# ----------------
+# Undirected tests
+# ----------------
+
+
+def test_random_gnp():
+ # seeds = [1550709854, 1309423156, 4208992358, 2785630813, 1915069929]
+ seeds = [12, 13]
+
+ for seed in seeds:
+ G = nx.gnp_random_graph(20, 0.2, seed=seed)
+ _check_edge_connectivity(G)
+
+
+def test_configuration():
+ # seeds = [2718183590, 2470619828, 1694705158, 3001036531, 2401251497]
+ seeds = [14, 15]
+ for seed in seeds:
+ deg_seq = nx.random_powerlaw_tree_sequence(20, seed=seed, tries=5000)
+ G = nx.Graph(nx.configuration_model(deg_seq, seed=seed))
+ G.remove_edges_from(nx.selfloop_edges(G))
+ _check_edge_connectivity(G)
+
+
+def test_shell():
+ # seeds = [2057382236, 3331169846, 1840105863, 476020778, 2247498425]
+ seeds = [20]
+ for seed in seeds:
+ constructor = [(12, 70, 0.8), (15, 40, 0.6)]
+ G = nx.random_shell_graph(constructor, seed=seed)
+ _check_edge_connectivity(G)
+
+
+def test_karate():
+ G = nx.karate_club_graph()
+ _check_edge_connectivity(G)
+
+
+def test_tarjan_bridge():
+ # graph from tarjan paper
+ # RE Tarjan - "A note on finding the bridges of a graph"
+ # Information Processing Letters, 1974 - Elsevier
+ # doi:10.1016/0020-0190(74)90003-9.
+ # define 2-connected components and bridges
+ ccs = [
+ (1, 2, 4, 3, 1, 4),
+ (5, 6, 7, 5),
+ (8, 9, 10, 8),
+ (17, 18, 16, 15, 17),
+ (11, 12, 14, 13, 11, 14),
+ ]
+ bridges = [(4, 8), (3, 5), (3, 17)]
+ G = nx.Graph(it.chain(*(pairwise(path) for path in ccs + bridges)))
+ _check_edge_connectivity(G)
+
+
+def test_bridge_cc():
+ # define 2-connected components and bridges
+ cc2 = [(1, 2, 4, 3, 1, 4), (8, 9, 10, 8), (11, 12, 13, 11)]
+ bridges = [(4, 8), (3, 5), (20, 21), (22, 23, 24)]
+ G = nx.Graph(it.chain(*(pairwise(path) for path in cc2 + bridges)))
+ bridge_ccs = fset(bridge_components(G))
+ target_ccs = fset(
+ [{1, 2, 3, 4}, {5}, {8, 9, 10}, {11, 12, 13}, {20}, {21}, {22}, {23}, {24}]
+ )
+ assert bridge_ccs == target_ccs
+ _check_edge_connectivity(G)
+
+
+def test_undirected_aux_graph():
+ # Graph similar to the one in
+ # http://journals.plos.org/plosone/article?id=10.1371/journal.pone.0136264
+ a, b, c, d, e, f, g, h, i = "abcdefghi"
+ paths = [
+ (a, d, b, f, c),
+ (a, e, b),
+ (a, e, b, c, g, b, a),
+ (c, b),
+ (f, g, f),
+ (h, i),
+ ]
+ G = nx.Graph(it.chain(*[pairwise(path) for path in paths]))
+ aux_graph = EdgeComponentAuxGraph.construct(G)
+
+ components_1 = fset(aux_graph.k_edge_subgraphs(k=1))
+ target_1 = fset([{a, b, c, d, e, f, g}, {h, i}])
+ assert target_1 == components_1
+
+ # Check that the undirected case for k=1 agrees with CCs
+ alt_1 = fset(nx.k_edge_subgraphs(G, k=1))
+ assert alt_1 == components_1
+
+ components_2 = fset(aux_graph.k_edge_subgraphs(k=2))
+ target_2 = fset([{a, b, c, d, e, f, g}, {h}, {i}])
+ assert target_2 == components_2
+
+ # Check that the undirected case for k=2 agrees with bridge components
+ alt_2 = fset(nx.k_edge_subgraphs(G, k=2))
+ assert alt_2 == components_2
+
+ components_3 = fset(aux_graph.k_edge_subgraphs(k=3))
+ target_3 = fset([{a}, {b, c, f, g}, {d}, {e}, {h}, {i}])
+ assert target_3 == components_3
+
+ components_4 = fset(aux_graph.k_edge_subgraphs(k=4))
+ target_4 = fset([{a}, {b}, {c}, {d}, {e}, {f}, {g}, {h}, {i}])
+ assert target_4 == components_4
+
+ _check_edge_connectivity(G)
+
+
+def test_local_subgraph_difference():
+ paths = [
+ (11, 12, 13, 14, 11, 13, 14, 12), # first 4-clique
+ (21, 22, 23, 24, 21, 23, 24, 22), # second 4-clique
+ # paths connecting each node of the 4 cliques
+ (11, 101, 21),
+ (12, 102, 22),
+ (13, 103, 23),
+ (14, 104, 24),
+ ]
+ G = nx.Graph(it.chain(*[pairwise(path) for path in paths]))
+ aux_graph = EdgeComponentAuxGraph.construct(G)
+
+ # Each clique is returned separately in k-edge-subgraphs
+ subgraph_ccs = fset(aux_graph.k_edge_subgraphs(3))
+ subgraph_target = fset(
+ [{101}, {102}, {103}, {104}, {21, 22, 23, 24}, {11, 12, 13, 14}]
+ )
+ assert subgraph_ccs == subgraph_target
+
+ # But in k-edge-ccs they are returned together
+ # because they are locally 3-edge-connected
+ local_ccs = fset(aux_graph.k_edge_components(3))
+ local_target = fset([{101}, {102}, {103}, {104}, {11, 12, 13, 14, 21, 22, 23, 24}])
+ assert local_ccs == local_target
+
+
+def test_local_subgraph_difference_directed():
+ dipaths = [(1, 2, 3, 4, 1), (1, 3, 1)]
+ G = nx.DiGraph(it.chain(*[pairwise(path) for path in dipaths]))
+
+ assert fset(nx.k_edge_components(G, k=1)) == fset(nx.k_edge_subgraphs(G, k=1))
+
+ # Unlike undirected graphs, when k=2, for directed graphs there is a case
+ # where the k-edge-ccs are not the same as the k-edge-subgraphs.
+ # (in directed graphs ccs and subgraphs are the same when k=2)
+ assert fset(nx.k_edge_components(G, k=2)) != fset(nx.k_edge_subgraphs(G, k=2))
+
+ assert fset(nx.k_edge_components(G, k=3)) == fset(nx.k_edge_subgraphs(G, k=3))
+
+ _check_edge_connectivity(G)
+
+
+def test_triangles():
+ paths = [
+ (11, 12, 13, 11), # first 3-clique
+ (21, 22, 23, 21), # second 3-clique
+ (11, 21), # connected by an edge
+ ]
+ G = nx.Graph(it.chain(*[pairwise(path) for path in paths]))
+
+ # subgraph and ccs are the same in all cases here
+ assert fset(nx.k_edge_components(G, k=1)) == fset(nx.k_edge_subgraphs(G, k=1))
+
+ assert fset(nx.k_edge_components(G, k=2)) == fset(nx.k_edge_subgraphs(G, k=2))
+
+ assert fset(nx.k_edge_components(G, k=3)) == fset(nx.k_edge_subgraphs(G, k=3))
+
+ _check_edge_connectivity(G)
+
+
+def test_four_clique():
+ paths = [
+ (11, 12, 13, 14, 11, 13, 14, 12), # first 4-clique
+ (21, 22, 23, 24, 21, 23, 24, 22), # second 4-clique
+ # paths connecting the 4 cliques such that they are
+ # 3-connected in G, but not in the subgraph.
+ # Case where the nodes bridging them do not have degree less than 3.
+ (100, 13),
+ (12, 100, 22),
+ (13, 200, 23),
+ (14, 300, 24),
+ ]
+ G = nx.Graph(it.chain(*[pairwise(path) for path in paths]))
+
+ # The subgraphs and ccs are different for k=3
+ local_ccs = fset(nx.k_edge_components(G, k=3))
+ subgraphs = fset(nx.k_edge_subgraphs(G, k=3))
+ assert local_ccs != subgraphs
+
+ # The cliques ares in the same cc
+ clique1 = frozenset(paths[0])
+ clique2 = frozenset(paths[1])
+ assert clique1.union(clique2).union({100}) in local_ccs
+
+ # but different subgraphs
+ assert clique1 in subgraphs
+ assert clique2 in subgraphs
+
+ assert G.degree(100) == 3
+
+ _check_edge_connectivity(G)
+
+
+def test_five_clique():
+ # Make a graph that can be disconnected less than 4 edges, but no node has
+ # degree less than 4.
+ G = nx.disjoint_union(nx.complete_graph(5), nx.complete_graph(5))
+ paths = [
+ # add aux-connections
+ (1, 100, 6),
+ (2, 100, 7),
+ (3, 200, 8),
+ (4, 200, 100),
+ ]
+ G.add_edges_from(it.chain(*[pairwise(path) for path in paths]))
+ assert min(dict(nx.degree(G)).values()) == 4
+
+ # For k=3 they are the same
+ assert fset(nx.k_edge_components(G, k=3)) == fset(nx.k_edge_subgraphs(G, k=3))
+
+ # For k=4 they are the different
+ # the aux nodes are in the same CC as clique 1 but no the same subgraph
+ assert fset(nx.k_edge_components(G, k=4)) != fset(nx.k_edge_subgraphs(G, k=4))
+
+ # For k=5 they are not the same
+ assert fset(nx.k_edge_components(G, k=5)) != fset(nx.k_edge_subgraphs(G, k=5))
+
+ # For k=6 they are the same
+ assert fset(nx.k_edge_components(G, k=6)) == fset(nx.k_edge_subgraphs(G, k=6))
+ _check_edge_connectivity(G)
+
+
+# ----------------
+# Undirected tests
+# ----------------
+
+
+def test_directed_aux_graph():
+ # Graph similar to the one in
+ # http://journals.plos.org/plosone/article?id=10.1371/journal.pone.0136264
+ a, b, c, d, e, f, g, h, i = "abcdefghi"
+ dipaths = [
+ (a, d, b, f, c),
+ (a, e, b),
+ (a, e, b, c, g, b, a),
+ (c, b),
+ (f, g, f),
+ (h, i),
+ ]
+ G = nx.DiGraph(it.chain(*[pairwise(path) for path in dipaths]))
+ aux_graph = EdgeComponentAuxGraph.construct(G)
+
+ components_1 = fset(aux_graph.k_edge_subgraphs(k=1))
+ target_1 = fset([{a, b, c, d, e, f, g}, {h}, {i}])
+ assert target_1 == components_1
+
+ # Check that the directed case for k=1 agrees with SCCs
+ alt_1 = fset(nx.strongly_connected_components(G))
+ assert alt_1 == components_1
+
+ components_2 = fset(aux_graph.k_edge_subgraphs(k=2))
+ target_2 = fset([{i}, {e}, {d}, {b, c, f, g}, {h}, {a}])
+ assert target_2 == components_2
+
+ components_3 = fset(aux_graph.k_edge_subgraphs(k=3))
+ target_3 = fset([{a}, {b}, {c}, {d}, {e}, {f}, {g}, {h}, {i}])
+ assert target_3 == components_3
+
+
+def test_random_gnp_directed():
+ # seeds = [3894723670, 500186844, 267231174, 2181982262, 1116750056]
+ seeds = [21]
+ for seed in seeds:
+ G = nx.gnp_random_graph(20, 0.2, directed=True, seed=seed)
+ _check_edge_connectivity(G)
+
+
+def test_configuration_directed():
+ # seeds = [671221681, 2403749451, 124433910, 672335939, 1193127215]
+ seeds = [67]
+ for seed in seeds:
+ deg_seq = nx.random_powerlaw_tree_sequence(20, seed=seed, tries=5000)
+ G = nx.DiGraph(nx.configuration_model(deg_seq, seed=seed))
+ G.remove_edges_from(nx.selfloop_edges(G))
+ _check_edge_connectivity(G)
+
+
+def test_shell_directed():
+ # seeds = [3134027055, 4079264063, 1350769518, 1405643020, 530038094]
+ seeds = [31]
+ for seed in seeds:
+ constructor = [(12, 70, 0.8), (15, 40, 0.6)]
+ G = nx.random_shell_graph(constructor, seed=seed).to_directed()
+ _check_edge_connectivity(G)
+
+
+def test_karate_directed():
+ G = nx.karate_club_graph().to_directed()
+ _check_edge_connectivity(G)
diff --git a/.venv/lib/python3.12/site-packages/networkx/algorithms/connectivity/tests/test_kcomponents.py b/.venv/lib/python3.12/site-packages/networkx/algorithms/connectivity/tests/test_kcomponents.py
new file mode 100644
index 00000000..f4436acd
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/networkx/algorithms/connectivity/tests/test_kcomponents.py
@@ -0,0 +1,296 @@
+# Test for Moody and White k-components algorithm
+import pytest
+
+import networkx as nx
+from networkx.algorithms.connectivity.kcomponents import (
+ _consolidate,
+ build_k_number_dict,
+)
+
+##
+# A nice synthetic graph
+##
+
+
+def torrents_and_ferraro_graph():
+ # Graph from https://arxiv.org/pdf/1503.04476v1 p.26
+ G = nx.convert_node_labels_to_integers(
+ nx.grid_graph([5, 5]), label_attribute="labels"
+ )
+ rlabels = nx.get_node_attributes(G, "labels")
+ labels = {v: k for k, v in rlabels.items()}
+
+ for nodes in [(labels[(0, 4)], labels[(1, 4)]), (labels[(3, 4)], labels[(4, 4)])]:
+ new_node = G.order() + 1
+ # Petersen graph is triconnected
+ P = nx.petersen_graph()
+ G = nx.disjoint_union(G, P)
+ # Add two edges between the grid and P
+ G.add_edge(new_node + 1, nodes[0])
+ G.add_edge(new_node, nodes[1])
+ # K5 is 4-connected
+ K = nx.complete_graph(5)
+ G = nx.disjoint_union(G, K)
+ # Add three edges between P and K5
+ G.add_edge(new_node + 2, new_node + 11)
+ G.add_edge(new_node + 3, new_node + 12)
+ G.add_edge(new_node + 4, new_node + 13)
+ # Add another K5 sharing a node
+ G = nx.disjoint_union(G, K)
+ nbrs = G[new_node + 10]
+ G.remove_node(new_node + 10)
+ for nbr in nbrs:
+ G.add_edge(new_node + 17, nbr)
+ # This edge makes the graph biconnected; it's
+ # needed because K5s share only one node.
+ G.add_edge(new_node + 16, new_node + 8)
+
+ for nodes in [(labels[(0, 0)], labels[(1, 0)]), (labels[(3, 0)], labels[(4, 0)])]:
+ new_node = G.order() + 1
+ # Petersen graph is triconnected
+ P = nx.petersen_graph()
+ G = nx.disjoint_union(G, P)
+ # Add two edges between the grid and P
+ G.add_edge(new_node + 1, nodes[0])
+ G.add_edge(new_node, nodes[1])
+ # K5 is 4-connected
+ K = nx.complete_graph(5)
+ G = nx.disjoint_union(G, K)
+ # Add three edges between P and K5
+ G.add_edge(new_node + 2, new_node + 11)
+ G.add_edge(new_node + 3, new_node + 12)
+ G.add_edge(new_node + 4, new_node + 13)
+ # Add another K5 sharing two nodes
+ G = nx.disjoint_union(G, K)
+ nbrs = G[new_node + 10]
+ G.remove_node(new_node + 10)
+ for nbr in nbrs:
+ G.add_edge(new_node + 17, nbr)
+ nbrs2 = G[new_node + 9]
+ G.remove_node(new_node + 9)
+ for nbr in nbrs2:
+ G.add_edge(new_node + 18, nbr)
+ return G
+
+
+def test_directed():
+ with pytest.raises(nx.NetworkXNotImplemented):
+ G = nx.gnp_random_graph(10, 0.2, directed=True, seed=42)
+ nx.k_components(G)
+
+
+# Helper function
+def _check_connectivity(G, k_components):
+ for k, components in k_components.items():
+ if k < 3:
+ continue
+ # check that k-components have node connectivity >= k.
+ for component in components:
+ C = G.subgraph(component)
+ K = nx.node_connectivity(C)
+ assert K >= k
+
+
+@pytest.mark.slow
+def test_torrents_and_ferraro_graph():
+ G = torrents_and_ferraro_graph()
+ result = nx.k_components(G)
+ _check_connectivity(G, result)
+
+ # In this example graph there are 8 3-components, 4 with 15 nodes
+ # and 4 with 5 nodes.
+ assert len(result[3]) == 8
+ assert len([c for c in result[3] if len(c) == 15]) == 4
+ assert len([c for c in result[3] if len(c) == 5]) == 4
+ # There are also 8 4-components all with 5 nodes.
+ assert len(result[4]) == 8
+ assert all(len(c) == 5 for c in result[4])
+
+
+@pytest.mark.slow
+def test_random_gnp():
+ G = nx.gnp_random_graph(50, 0.2, seed=42)
+ result = nx.k_components(G)
+ _check_connectivity(G, result)
+
+
+@pytest.mark.slow
+def test_shell():
+ constructor = [(20, 80, 0.8), (80, 180, 0.6)]
+ G = nx.random_shell_graph(constructor, seed=42)
+ result = nx.k_components(G)
+ _check_connectivity(G, result)
+
+
+def test_configuration():
+ deg_seq = nx.random_powerlaw_tree_sequence(100, tries=5, seed=72)
+ G = nx.Graph(nx.configuration_model(deg_seq))
+ G.remove_edges_from(nx.selfloop_edges(G))
+ result = nx.k_components(G)
+ _check_connectivity(G, result)
+
+
+def test_karate():
+ G = nx.karate_club_graph()
+ result = nx.k_components(G)
+ _check_connectivity(G, result)
+
+
+def test_karate_component_number():
+ karate_k_num = {
+ 0: 4,
+ 1: 4,
+ 2: 4,
+ 3: 4,
+ 4: 3,
+ 5: 3,
+ 6: 3,
+ 7: 4,
+ 8: 4,
+ 9: 2,
+ 10: 3,
+ 11: 1,
+ 12: 2,
+ 13: 4,
+ 14: 2,
+ 15: 2,
+ 16: 2,
+ 17: 2,
+ 18: 2,
+ 19: 3,
+ 20: 2,
+ 21: 2,
+ 22: 2,
+ 23: 3,
+ 24: 3,
+ 25: 3,
+ 26: 2,
+ 27: 3,
+ 28: 3,
+ 29: 3,
+ 30: 4,
+ 31: 3,
+ 32: 4,
+ 33: 4,
+ }
+ G = nx.karate_club_graph()
+ k_components = nx.k_components(G)
+ k_num = build_k_number_dict(k_components)
+ assert karate_k_num == k_num
+
+
+def test_davis_southern_women():
+ G = nx.davis_southern_women_graph()
+ result = nx.k_components(G)
+ _check_connectivity(G, result)
+
+
+def test_davis_southern_women_detail_3_and_4():
+ solution = {
+ 3: [
+ {
+ "Nora Fayette",
+ "E10",
+ "Myra Liddel",
+ "E12",
+ "E14",
+ "Frances Anderson",
+ "Evelyn Jefferson",
+ "Ruth DeSand",
+ "Helen Lloyd",
+ "Eleanor Nye",
+ "E9",
+ "E8",
+ "E5",
+ "E4",
+ "E7",
+ "E6",
+ "E1",
+ "Verne Sanderson",
+ "E3",
+ "E2",
+ "Theresa Anderson",
+ "Pearl Oglethorpe",
+ "Katherina Rogers",
+ "Brenda Rogers",
+ "E13",
+ "Charlotte McDowd",
+ "Sylvia Avondale",
+ "Laura Mandeville",
+ }
+ ],
+ 4: [
+ {
+ "Nora Fayette",
+ "E10",
+ "Verne Sanderson",
+ "E12",
+ "Frances Anderson",
+ "Evelyn Jefferson",
+ "Ruth DeSand",
+ "Helen Lloyd",
+ "Eleanor Nye",
+ "E9",
+ "E8",
+ "E5",
+ "E4",
+ "E7",
+ "E6",
+ "Myra Liddel",
+ "E3",
+ "Theresa Anderson",
+ "Katherina Rogers",
+ "Brenda Rogers",
+ "Charlotte McDowd",
+ "Sylvia Avondale",
+ "Laura Mandeville",
+ }
+ ],
+ }
+ G = nx.davis_southern_women_graph()
+ result = nx.k_components(G)
+ for k, components in result.items():
+ if k < 3:
+ continue
+ assert len(components) == len(solution[k])
+ for component in components:
+ assert component in solution[k]
+
+
+def test_set_consolidation_rosettacode():
+ # Tests from http://rosettacode.org/wiki/Set_consolidation
+ def list_of_sets_equal(result, solution):
+ assert {frozenset(s) for s in result} == {frozenset(s) for s in solution}
+
+ question = [{"A", "B"}, {"C", "D"}]
+ solution = [{"A", "B"}, {"C", "D"}]
+ list_of_sets_equal(_consolidate(question, 1), solution)
+ question = [{"A", "B"}, {"B", "C"}]
+ solution = [{"A", "B", "C"}]
+ list_of_sets_equal(_consolidate(question, 1), solution)
+ question = [{"A", "B"}, {"C", "D"}, {"D", "B"}]
+ solution = [{"A", "C", "B", "D"}]
+ list_of_sets_equal(_consolidate(question, 1), solution)
+ question = [{"H", "I", "K"}, {"A", "B"}, {"C", "D"}, {"D", "B"}, {"F", "G", "H"}]
+ solution = [{"A", "C", "B", "D"}, {"G", "F", "I", "H", "K"}]
+ list_of_sets_equal(_consolidate(question, 1), solution)
+ question = [
+ {"A", "H"},
+ {"H", "I", "K"},
+ {"A", "B"},
+ {"C", "D"},
+ {"D", "B"},
+ {"F", "G", "H"},
+ ]
+ solution = [{"A", "C", "B", "D", "G", "F", "I", "H", "K"}]
+ list_of_sets_equal(_consolidate(question, 1), solution)
+ question = [
+ {"H", "I", "K"},
+ {"A", "B"},
+ {"C", "D"},
+ {"D", "B"},
+ {"F", "G", "H"},
+ {"A", "H"},
+ ]
+ solution = [{"A", "C", "B", "D", "G", "F", "I", "H", "K"}]
+ list_of_sets_equal(_consolidate(question, 1), solution)
diff --git a/.venv/lib/python3.12/site-packages/networkx/algorithms/connectivity/tests/test_kcutsets.py b/.venv/lib/python3.12/site-packages/networkx/algorithms/connectivity/tests/test_kcutsets.py
new file mode 100644
index 00000000..4b4b5494
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/networkx/algorithms/connectivity/tests/test_kcutsets.py
@@ -0,0 +1,273 @@
+# Jordi Torrents
+# Test for k-cutsets
+import itertools
+
+import pytest
+
+import networkx as nx
+from networkx.algorithms import flow
+from networkx.algorithms.connectivity.kcutsets import _is_separating_set
+
+MAX_CUTSETS_TO_TEST = 4 # originally 100. cut to decrease testing time
+
+flow_funcs = [
+ flow.boykov_kolmogorov,
+ flow.dinitz,
+ flow.edmonds_karp,
+ flow.preflow_push,
+ flow.shortest_augmenting_path,
+]
+
+
+##
+# Some nice synthetic graphs
+##
+def graph_example_1():
+ G = nx.convert_node_labels_to_integers(
+ nx.grid_graph([5, 5]), label_attribute="labels"
+ )
+ rlabels = nx.get_node_attributes(G, "labels")
+ labels = {v: k for k, v in rlabels.items()}
+
+ for nodes in [
+ (labels[(0, 0)], labels[(1, 0)]),
+ (labels[(0, 4)], labels[(1, 4)]),
+ (labels[(3, 0)], labels[(4, 0)]),
+ (labels[(3, 4)], labels[(4, 4)]),
+ ]:
+ new_node = G.order() + 1
+ # Petersen graph is triconnected
+ P = nx.petersen_graph()
+ G = nx.disjoint_union(G, P)
+ # Add two edges between the grid and P
+ G.add_edge(new_node + 1, nodes[0])
+ G.add_edge(new_node, nodes[1])
+ # K5 is 4-connected
+ K = nx.complete_graph(5)
+ G = nx.disjoint_union(G, K)
+ # Add three edges between P and K5
+ G.add_edge(new_node + 2, new_node + 11)
+ G.add_edge(new_node + 3, new_node + 12)
+ G.add_edge(new_node + 4, new_node + 13)
+ # Add another K5 sharing a node
+ G = nx.disjoint_union(G, K)
+ nbrs = G[new_node + 10]
+ G.remove_node(new_node + 10)
+ for nbr in nbrs:
+ G.add_edge(new_node + 17, nbr)
+ G.add_edge(new_node + 16, new_node + 5)
+ return G
+
+
+def torrents_and_ferraro_graph():
+ G = nx.convert_node_labels_to_integers(
+ nx.grid_graph([5, 5]), label_attribute="labels"
+ )
+ rlabels = nx.get_node_attributes(G, "labels")
+ labels = {v: k for k, v in rlabels.items()}
+
+ for nodes in [(labels[(0, 4)], labels[(1, 4)]), (labels[(3, 4)], labels[(4, 4)])]:
+ new_node = G.order() + 1
+ # Petersen graph is triconnected
+ P = nx.petersen_graph()
+ G = nx.disjoint_union(G, P)
+ # Add two edges between the grid and P
+ G.add_edge(new_node + 1, nodes[0])
+ G.add_edge(new_node, nodes[1])
+ # K5 is 4-connected
+ K = nx.complete_graph(5)
+ G = nx.disjoint_union(G, K)
+ # Add three edges between P and K5
+ G.add_edge(new_node + 2, new_node + 11)
+ G.add_edge(new_node + 3, new_node + 12)
+ G.add_edge(new_node + 4, new_node + 13)
+ # Add another K5 sharing a node
+ G = nx.disjoint_union(G, K)
+ nbrs = G[new_node + 10]
+ G.remove_node(new_node + 10)
+ for nbr in nbrs:
+ G.add_edge(new_node + 17, nbr)
+ # Commenting this makes the graph not biconnected !!
+ # This stupid mistake make one reviewer very angry :P
+ G.add_edge(new_node + 16, new_node + 8)
+
+ for nodes in [(labels[(0, 0)], labels[(1, 0)]), (labels[(3, 0)], labels[(4, 0)])]:
+ new_node = G.order() + 1
+ # Petersen graph is triconnected
+ P = nx.petersen_graph()
+ G = nx.disjoint_union(G, P)
+ # Add two edges between the grid and P
+ G.add_edge(new_node + 1, nodes[0])
+ G.add_edge(new_node, nodes[1])
+ # K5 is 4-connected
+ K = nx.complete_graph(5)
+ G = nx.disjoint_union(G, K)
+ # Add three edges between P and K5
+ G.add_edge(new_node + 2, new_node + 11)
+ G.add_edge(new_node + 3, new_node + 12)
+ G.add_edge(new_node + 4, new_node + 13)
+ # Add another K5 sharing two nodes
+ G = nx.disjoint_union(G, K)
+ nbrs = G[new_node + 10]
+ G.remove_node(new_node + 10)
+ for nbr in nbrs:
+ G.add_edge(new_node + 17, nbr)
+ nbrs2 = G[new_node + 9]
+ G.remove_node(new_node + 9)
+ for nbr in nbrs2:
+ G.add_edge(new_node + 18, nbr)
+ return G
+
+
+# Helper function
+def _check_separating_sets(G):
+ for cc in nx.connected_components(G):
+ if len(cc) < 3:
+ continue
+ Gc = G.subgraph(cc)
+ node_conn = nx.node_connectivity(Gc)
+ all_cuts = nx.all_node_cuts(Gc)
+ # Only test a limited number of cut sets to reduce test time.
+ for cut in itertools.islice(all_cuts, MAX_CUTSETS_TO_TEST):
+ assert node_conn == len(cut)
+ assert not nx.is_connected(nx.restricted_view(G, cut, []))
+
+
+@pytest.mark.slow
+def test_torrents_and_ferraro_graph():
+ G = torrents_and_ferraro_graph()
+ _check_separating_sets(G)
+
+
+def test_example_1():
+ G = graph_example_1()
+ _check_separating_sets(G)
+
+
+def test_random_gnp():
+ G = nx.gnp_random_graph(100, 0.1, seed=42)
+ _check_separating_sets(G)
+
+
+def test_shell():
+ constructor = [(20, 80, 0.8), (80, 180, 0.6)]
+ G = nx.random_shell_graph(constructor, seed=42)
+ _check_separating_sets(G)
+
+
+def test_configuration():
+ deg_seq = nx.random_powerlaw_tree_sequence(100, tries=5, seed=72)
+ G = nx.Graph(nx.configuration_model(deg_seq))
+ G.remove_edges_from(nx.selfloop_edges(G))
+ _check_separating_sets(G)
+
+
+def test_karate():
+ G = nx.karate_club_graph()
+ _check_separating_sets(G)
+
+
+def _generate_no_biconnected(max_attempts=50):
+ attempts = 0
+ while True:
+ G = nx.fast_gnp_random_graph(100, 0.0575, seed=42)
+ if nx.is_connected(G) and not nx.is_biconnected(G):
+ attempts = 0
+ yield G
+ else:
+ if attempts >= max_attempts:
+ msg = f"Tried {attempts} times: no suitable Graph."
+ raise Exception(msg)
+ else:
+ attempts += 1
+
+
+def test_articulation_points():
+ Ggen = _generate_no_biconnected()
+ for i in range(1): # change 1 to 3 or more for more realizations.
+ G = next(Ggen)
+ articulation_points = [{a} for a in nx.articulation_points(G)]
+ for cut in nx.all_node_cuts(G):
+ assert cut in articulation_points
+
+
+def test_grid_2d_graph():
+ # All minimum node cuts of a 2d grid
+ # are the four pairs of nodes that are
+ # neighbors of the four corner nodes.
+ G = nx.grid_2d_graph(5, 5)
+ solution = [{(0, 1), (1, 0)}, {(3, 0), (4, 1)}, {(3, 4), (4, 3)}, {(0, 3), (1, 4)}]
+ for cut in nx.all_node_cuts(G):
+ assert cut in solution
+
+
+def test_disconnected_graph():
+ G = nx.fast_gnp_random_graph(100, 0.01, seed=42)
+ cuts = nx.all_node_cuts(G)
+ pytest.raises(nx.NetworkXError, next, cuts)
+
+
+@pytest.mark.slow
+def test_alternative_flow_functions():
+ graphs = [nx.grid_2d_graph(4, 4), nx.cycle_graph(5)]
+ for G in graphs:
+ node_conn = nx.node_connectivity(G)
+ for flow_func in flow_funcs:
+ all_cuts = nx.all_node_cuts(G, flow_func=flow_func)
+ # Only test a limited number of cut sets to reduce test time.
+ for cut in itertools.islice(all_cuts, MAX_CUTSETS_TO_TEST):
+ assert node_conn == len(cut)
+ assert not nx.is_connected(nx.restricted_view(G, cut, []))
+
+
+def test_is_separating_set_complete_graph():
+ G = nx.complete_graph(5)
+ assert _is_separating_set(G, {0, 1, 2, 3})
+
+
+def test_is_separating_set():
+ for i in [5, 10, 15]:
+ G = nx.star_graph(i)
+ max_degree_node = max(G, key=G.degree)
+ assert _is_separating_set(G, {max_degree_node})
+
+
+def test_non_repeated_cuts():
+ # The algorithm was repeating the cut {0, 1} for the giant biconnected
+ # component of the Karate club graph.
+ K = nx.karate_club_graph()
+ bcc = max(list(nx.biconnected_components(K)), key=len)
+ G = K.subgraph(bcc)
+ solution = [{32, 33}, {2, 33}, {0, 3}, {0, 1}, {29, 33}]
+ cuts = list(nx.all_node_cuts(G))
+ if len(solution) != len(cuts):
+ print(f"Solution: {solution}")
+ print(f"Result: {cuts}")
+ assert len(solution) == len(cuts)
+ for cut in cuts:
+ assert cut in solution
+
+
+def test_cycle_graph():
+ G = nx.cycle_graph(5)
+ solution = [{0, 2}, {0, 3}, {1, 3}, {1, 4}, {2, 4}]
+ cuts = list(nx.all_node_cuts(G))
+ assert len(solution) == len(cuts)
+ for cut in cuts:
+ assert cut in solution
+
+
+def test_complete_graph():
+ G = nx.complete_graph(5)
+ assert nx.node_connectivity(G) == 4
+ assert list(nx.all_node_cuts(G)) == []
+
+
+def test_all_node_cuts_simple_case():
+ G = nx.complete_graph(5)
+ G.remove_edges_from([(0, 1), (3, 4)])
+ expected = [{0, 1, 2}, {2, 3, 4}]
+ actual = list(nx.all_node_cuts(G))
+ assert len(actual) == len(expected)
+ for cut in actual:
+ assert cut in expected
diff --git a/.venv/lib/python3.12/site-packages/networkx/algorithms/connectivity/tests/test_stoer_wagner.py b/.venv/lib/python3.12/site-packages/networkx/algorithms/connectivity/tests/test_stoer_wagner.py
new file mode 100644
index 00000000..2b9e2bab
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/networkx/algorithms/connectivity/tests/test_stoer_wagner.py
@@ -0,0 +1,102 @@
+from itertools import chain
+
+import pytest
+
+import networkx as nx
+
+
+def _check_partition(G, cut_value, partition, weight):
+ assert isinstance(partition, tuple)
+ assert len(partition) == 2
+ assert isinstance(partition[0], list)
+ assert isinstance(partition[1], list)
+ assert len(partition[0]) > 0
+ assert len(partition[1]) > 0
+ assert sum(map(len, partition)) == len(G)
+ assert set(chain.from_iterable(partition)) == set(G)
+ partition = tuple(map(set, partition))
+ w = 0
+ for u, v, e in G.edges(data=True):
+ if (u in partition[0]) == (v in partition[1]):
+ w += e.get(weight, 1)
+ assert w == cut_value
+
+
+def _test_stoer_wagner(G, answer, weight="weight"):
+ cut_value, partition = nx.stoer_wagner(G, weight, heap=nx.utils.PairingHeap)
+ assert cut_value == answer
+ _check_partition(G, cut_value, partition, weight)
+ cut_value, partition = nx.stoer_wagner(G, weight, heap=nx.utils.BinaryHeap)
+ assert cut_value == answer
+ _check_partition(G, cut_value, partition, weight)
+
+
+def test_graph1():
+ G = nx.Graph()
+ G.add_edge("x", "a", weight=3)
+ G.add_edge("x", "b", weight=1)
+ G.add_edge("a", "c", weight=3)
+ G.add_edge("b", "c", weight=5)
+ G.add_edge("b", "d", weight=4)
+ G.add_edge("d", "e", weight=2)
+ G.add_edge("c", "y", weight=2)
+ G.add_edge("e", "y", weight=3)
+ _test_stoer_wagner(G, 4)
+
+
+def test_graph2():
+ G = nx.Graph()
+ G.add_edge("x", "a")
+ G.add_edge("x", "b")
+ G.add_edge("a", "c")
+ G.add_edge("b", "c")
+ G.add_edge("b", "d")
+ G.add_edge("d", "e")
+ G.add_edge("c", "y")
+ G.add_edge("e", "y")
+ _test_stoer_wagner(G, 2)
+
+
+def test_graph3():
+ # Source:
+ # Stoer, M. and Wagner, F. (1997). "A simple min-cut algorithm". Journal of
+ # the ACM 44 (4), 585-591.
+ G = nx.Graph()
+ G.add_edge(1, 2, weight=2)
+ G.add_edge(1, 5, weight=3)
+ G.add_edge(2, 3, weight=3)
+ G.add_edge(2, 5, weight=2)
+ G.add_edge(2, 6, weight=2)
+ G.add_edge(3, 4, weight=4)
+ G.add_edge(3, 7, weight=2)
+ G.add_edge(4, 7, weight=2)
+ G.add_edge(4, 8, weight=2)
+ G.add_edge(5, 6, weight=3)
+ G.add_edge(6, 7, weight=1)
+ G.add_edge(7, 8, weight=3)
+ _test_stoer_wagner(G, 4)
+
+
+def test_weight_name():
+ G = nx.Graph()
+ G.add_edge(1, 2, weight=1, cost=8)
+ G.add_edge(1, 3, cost=2)
+ G.add_edge(2, 3, cost=4)
+ _test_stoer_wagner(G, 6, weight="cost")
+
+
+def test_exceptions():
+ G = nx.Graph()
+ pytest.raises(nx.NetworkXError, nx.stoer_wagner, G)
+ G.add_node(1)
+ pytest.raises(nx.NetworkXError, nx.stoer_wagner, G)
+ G.add_node(2)
+ pytest.raises(nx.NetworkXError, nx.stoer_wagner, G)
+ G.add_edge(1, 2, weight=-2)
+ pytest.raises(nx.NetworkXError, nx.stoer_wagner, G)
+ G = nx.DiGraph()
+ pytest.raises(nx.NetworkXNotImplemented, nx.stoer_wagner, G)
+ G = nx.MultiGraph()
+ pytest.raises(nx.NetworkXNotImplemented, nx.stoer_wagner, G)
+ G = nx.MultiDiGraph()
+ pytest.raises(nx.NetworkXNotImplemented, nx.stoer_wagner, G)