import inspect
import random

import networkx as nx
import nx_parallel as nxp
from joblib import Parallel, delayed

__all__ = ["maximal_independent_set"]

# Fully unwrap the NetworkX implementation (not the dispatcher) so the
# sequential fallback calls the raw algorithm directly instead of re-entering
# the backend dispatch machinery.
from networkx.algorithms.mis import maximal_independent_set as _nx_mis_dispatcher

_nx_mis = inspect.unwrap(_nx_mis_dispatcher)


@nxp._configure_if_nx_active(should_run=nxp.should_run_if_large(50000))
def maximal_independent_set(G, nodes=None, seed=None, get_chunks="chunks"):
    """Return a random maximal independent set containing the given nodes.

    An independent set is a set of nodes such that the subgraph of G induced
    by these nodes contains no edges. A *maximal* independent set is one to
    which no further node can be added. This does NOT solve the (NP-hard)
    maximum independent set problem.

    Parallel strategy: candidate nodes are split into chunks; each chunk
    builds a local independent set in a separate job, then the local results
    are merged sequentially, rejecting cross-chunk conflicts. A final
    sequential sweep guarantees maximality. For graphs below the size
    threshold (50000 nodes) the NetworkX sequential algorithm is used.

    networkx.maximal_independent_set:
    https://networkx.org/documentation/stable/reference/algorithms/generated/networkx.algorithms.mis.maximal_independent_set.html

    Parameters
    ----------
    G : NetworkX graph
        An undirected graph.
    nodes : list or iterable, optional
        Nodes that must be part of the independent set. This set of nodes
        must itself be independent.
    seed : integer, random_state, or None (default)
        Indicator of random number generation state. See :ref:`Randomness`.
    get_chunks : str, function (default = "chunks")
        A function that takes in a list of nodes and returns chunks. The
        default divides the nodes into `n_jobs` chunks.

    Returns
    -------
    indep_nodes : list
        Nodes forming a maximal independent set.

    Raises
    ------
    NetworkXUnfeasible
        If `nodes` is not a subset of G or is not an independent set.
    NetworkXNotImplemented
        If `G` is directed.

    Examples
    --------
    >>> import networkx as nx
    >>> import nx_parallel as nxp
    >>> G = nx.path_graph(5)
    >>> nxp.maximal_independent_set(G)  # doctest: +SKIP
    [4, 0, 2]
    >>> nxp.maximal_independent_set(G, [1])  # doctest: +SKIP
    [1, 3]
    """
    if hasattr(G, "graph_object"):
        G = G.graph_object

    if G.is_directed():
        raise nx.NetworkXNotImplemented("Not implemented for directed graphs.")

    # Normalize `seed` into a random.Random-like object; needed both for the
    # sequential fallback (the unwrapped nx function expects an RNG object)
    # and for the parallel path.
    if seed is None:
        rng = random.Random()
    elif hasattr(seed, "random"):
        # Already a Random/RandomState-like object.
        rng = seed
    else:
        rng = random.Random(seed)

    # Honour the should_run policy even when this backend was requested
    # explicitly (the dispatcher skips the check in that case).
    # NOTE(review): assumes `_configure_if_nx_active` attaches a `should_run`
    # attribute to the decorated function — confirm against nx_parallel.
    should_run_result = maximal_independent_set.should_run(G, nodes, seed)
    if should_run_result is not True:
        return _nx_mis(G, nodes=nodes, seed=rng)

    # Validate the required nodes: must exist in G and be mutually independent.
    if nodes is not None:
        nodes_set = set(nodes)
        if not nodes_set.issubset(G):
            raise nx.NetworkXUnfeasible(f"{nodes} is not a subset of the nodes of G")
        neighbors = (
            set.union(*[set(G.adj[v]) for v in nodes_set]) if nodes_set else set()
        )
        if neighbors & nodes_set:
            raise nx.NetworkXUnfeasible(f"{nodes} is not an independent set of G")
    else:
        nodes_set = set()

    n_jobs = nxp.get_n_jobs()

    # Required nodes and their neighbors are fixed; only the rest compete.
    # Iterating G's node order (instead of a set) keeps the candidate order
    # deterministic for a given graph and seed.
    if nodes_set:
        blocked = set(nodes_set)
        for node in nodes_set:
            blocked.update(G.neighbors(node))
        available = [v for v in G.nodes() if v not in blocked]
    else:
        available = list(G.nodes())

    rng.shuffle(available)

    if get_chunks == "chunks":
        chunks = list(nxp.chunks(available, n_jobs))
    else:
        chunks = list(get_chunks(available))

    # Precompute adjacency once; shared read-only by all jobs.
    adj_dict = {node: set(G.neighbors(node)) for node in G.nodes()}

    def _process_chunk_independent(chunk, chunk_seed):
        """Build a local independent set restricted to `chunk`, independently
        of all other chunks (cross-chunk conflicts are resolved at merge)."""
        local_rng = random.Random(chunk_seed)
        chunk_list = list(chunk)
        local_rng.shuffle(chunk_list)
        chunk_set = set(chunk_list)  # O(1) membership instead of list scans
        local_mis = []
        local_excluded = set()
        for node in chunk_list:
            if node in local_excluded:
                continue
            local_mis.append(node)
            local_excluded.add(node)
            # Exclude only in-chunk neighbours.
            local_excluded.update(adj_dict[node] & chunk_set)
        return local_mis

    # Independent seed per chunk keeps the run reproducible for a given seed.
    chunk_seeds = [rng.randint(0, 2**31 - 1) for _ in range(len(chunks))]

    results = Parallel(n_jobs=n_jobs)(
        delayed(_process_chunk_independent)(chunk, chunk_seeds[i])
        for i, chunk in enumerate(chunks)
    )

    # Merge: start from the required nodes, then greedily accept local picks
    # that do not conflict with anything already accepted.
    indep_set = list(nodes_set)
    excluded = set(nodes_set)
    for node in nodes_set:
        excluded.update(adj_dict[node])

    for local_mis in results:
        for node in local_mis:
            if node not in excluded:
                indep_set.append(node)
                excluded.add(node)
                excluded.update(adj_dict[node])

    # Final sequential sweep guarantees maximality: a node excluded locally by
    # a chunk-mate that later lost a cross-chunk conflict would otherwise
    # never be reconsidered, yielding a non-maximal set.
    for node in available:
        if node not in excluded:
            indep_set.append(node)
            excluded.add(node)
            excluded.update(adj_dict[node])

    return indep_set
import networkx as nx
import nx_parallel as nxp
import pytest


def _assert_maximal_independent(G, result):
    """Check that `result` is independent (no two chosen nodes are adjacent)
    and maximal (every unchosen node has a chosen neighbour) in G."""
    chosen = set(result)
    for node in chosen:
        assert chosen.isdisjoint(G.neighbors(node))
    for node in G:
        if node not in chosen:
            assert not chosen.isdisjoint(G.neighbors(node))


def _assert_independent(G, result):
    """Check only the independence property of `result` in G."""
    chosen = set(result)
    for node in chosen:
        assert chosen.isdisjoint(G.neighbors(node))


def test_maximal_independent_set_basic():
    G = nx.path_graph(5)
    result = nxp.maximal_independent_set(nxp.ParallelGraph(G))
    _assert_maximal_independent(G, result)


def test_maximal_independent_set_with_required_nodes():
    G = nx.path_graph(7)
    result = nxp.maximal_independent_set(nxp.ParallelGraph(G), nodes=[1, 3])
    assert {1, 3} <= set(result)
    _assert_independent(G, result)


def test_maximal_independent_set_invalid_nodes():
    H = nxp.ParallelGraph(nx.path_graph(5))
    # Nodes absent from the graph.
    with pytest.raises(nx.NetworkXUnfeasible):
        nxp.maximal_independent_set(H, nodes=[10, 20])
    # Adjacent nodes cannot seed an independent set.
    with pytest.raises(nx.NetworkXUnfeasible):
        nxp.maximal_independent_set(H, nodes=[0, 1])


def test_maximal_independent_set_directed_graph():
    H = nxp.ParallelGraph(nx.DiGraph([(0, 1), (1, 2)]))
    with pytest.raises(nx.NetworkXNotImplemented):
        nxp.maximal_independent_set(H)


def test_maximal_independent_set_deterministic_with_seed():
    H = nxp.ParallelGraph(nx.karate_club_graph())
    first = nxp.maximal_independent_set(H, seed=42)
    second = nxp.maximal_independent_set(H, seed=42)
    assert first == second


def test_maximal_independent_set_different_seeds():
    G = nx.karate_club_graph()
    H = nxp.ParallelGraph(G)
    for s in (42, 100):
        _assert_independent(G, nxp.maximal_independent_set(H, seed=s))


def test_maximal_independent_set_complete_graph():
    # In a complete graph any two nodes are adjacent, so the MIS is a single node.
    result = nxp.maximal_independent_set(nxp.ParallelGraph(nx.complete_graph(5)))
    assert len(result) == 1


def test_maximal_independent_set_empty_graph():
    # With no edges, every node belongs to the (unique) MIS.
    result = nxp.maximal_independent_set(nxp.ParallelGraph(nx.empty_graph(5)))
    assert len(result) == 5


def test_maximal_independent_set_large_graph():
    G = nx.fast_gnp_random_graph(150, 0.1, seed=42)
    result = nxp.maximal_independent_set(nxp.ParallelGraph(G), seed=42)
    _assert_maximal_independent(G, result)


def test_maximal_independent_set_random_graph():
    G = nx.fast_gnp_random_graph(50, 0.1, seed=42)
    result = nxp.maximal_independent_set(nxp.ParallelGraph(G), seed=42)
    _assert_maximal_independent(G, result)
def should_skip_parallel(*_, **__):
    """Policy: always prefer the sequential implementation.

    Accepts and ignores any positional/keyword arguments the dispatcher
    forwards, so it can be used uniformly with the other policies.
    """
    return "Fast algorithm; skip parallel execution"


def should_run_if_large(nodes_threshold=200, *_, **__):
    """Size-gate policy for parallel execution.

    Supports two calling conventions:

    * Factory (preferred): ``should_run=should_run_if_large(50000)`` returns
      a policy function that approves graphs with at least ``nodes_threshold``
      nodes.
    * Direct (legacy): ``should_run=should_run_if_large`` — the first argument
      is then the graph itself and the default threshold of 200 applies.

    Returns ``True`` when the graph is large enough, otherwise a string
    explaining why the parallel backend should not run.
    """

    def _gate(G, threshold):
        # Unwrap nx_parallel's ParallelGraph wrapper if present.
        if hasattr(G, "graph_object"):
            G = G.graph_object
        if len(G) < threshold:
            return "Graph too small for parallel execution"
        return True

    # Legacy direct usage: the "threshold" is actually a graph-like object.
    # NOTE(review): assumes every graph passed here exposes both __len__ and
    # .nodes (true for nx.Graph) — confirm for ParallelGraph wrappers.
    if hasattr(nodes_threshold, "__len__") and hasattr(nodes_threshold, "nodes"):
        return _gate(nodes_threshold, 200)

    # Factory usage: close over the numeric threshold and return the policy.
    def wrapper(G, *_, **__):
        return _gate(G, nodes_threshold)

    return wrapper