Source code for fusion.modules.routing.one_plus_one_protection

"""
1+1 disjoint protection routing implementation.

This module provides a routing algorithm that computes link-disjoint primary
and backup paths for survivability. On failure, traffic switches to the backup
path with fixed protection switchover latency.
"""

import logging
from typing import Any, cast

import networkx as nx

from fusion.core.properties import SDNProps
from fusion.interfaces.router import AbstractRoutingAlgorithm
from fusion.utils.data import sort_nested_dict_values
from fusion.utils.network import find_path_length, get_path_modulation

logger = logging.getLogger(__name__)


[docs] class OnePlusOneProtection(AbstractRoutingAlgorithm): """ Traditional 1+1 disjoint protection routing for optical networks. Implements standard 1+1 protection as used in optical networking: - Each request gets ONE pair of link-disjoint paths (primary + backup) - Uses max-flow algorithm (Suurballe's) for optimal disjoint pair finding - Spectrum allocated on BOTH paths simultaneously with same slots - Traffic transmitted on both paths, receiver uses primary signal - Fast switchover on failure (protection switchover latency: default 50ms) Traditional Flow: 1. Find ONE optimal disjoint pair using max-flow algorithm 2. Validate BOTH paths are feasible (modulation format constraints) 3. Allocate spectrum on BOTH paths (shared protection spectrum) 4. Transmit on both, receiver monitors primary, switches to backup on failure Key features: - Max-flow based optimal disjoint path computation - Link-disjoint path validation (no shared links) - Modulation format feasibility check for both paths - Simultaneous dual allocation (shared spectrum) - Automatic failure detection and switchover - Optional revert-to-primary after repair :param engine_props: Engine configuration :type engine_props: dict[str, Any] :param sdn_props: SDN controller properties :type sdn_props: SDNProps """
[docs] def __init__(self, engine_props: dict[str, Any], sdn_props: SDNProps) -> None: """ Initialize 1+1 protection router. :param engine_props: Engine configuration :type engine_props: dict[str, Any] :param sdn_props: SDN properties :type sdn_props: SDNProps """ super().__init__(engine_props, sdn_props) self.topology = engine_props.get("topology", sdn_props.topology) self.protection_switchover_ms = engine_props.get("protection_settings", {}).get("protection_switchover_ms", 50.0) self.revert_to_primary = engine_props.get("protection_settings", {}).get("revert_to_primary", False) self._disjoint_paths_found = 0 self._disjoint_paths_failed = 0
@property def algorithm_name(self) -> str: """ Get the name of the routing algorithm. :return: The algorithm name '1plus1_protection'. :rtype: str """ return "1plus1_protection" @property def supported_topologies(self) -> list[str]: """ Get the list of supported topology types. :return: List of supported topology names. :rtype: list[str] """ return ["NSFNet", "USBackbone60", "Pan-European", "Generic"]
[docs] def validate_environment(self, topology: nx.Graph) -> bool: """ Validate that the routing algorithm can work with the given topology. 1+1 protection requires a connected graph with sufficient edge connectivity for disjoint paths. :param topology: NetworkX graph representing the network topology. :type topology: nx.Graph :return: True if the algorithm can route in this environment. :rtype: bool """ try: # Check if graph is connected if not nx.is_connected(topology): logger.warning("Topology is not connected") return False # Check if graph has sufficient edge connectivity # At least 2-edge-connected for disjoint paths edge_connectivity = nx.edge_connectivity(topology) if edge_connectivity < 2: logger.warning(f"Topology edge connectivity ({edge_connectivity}) is too low for 1+1 protection (need >= 2)") return False return True except Exception as e: logger.error(f"Error validating environment: {e}") return False
[docs] def route(self, source: Any, destination: Any, request: Any) -> None: """ Find ONE link-disjoint path pair for traditional 1+1 protection. Traditional 1+1 protection flow: 1. Use max-flow algorithm to find optimal disjoint pair 2. Validate BOTH paths are feasible (modulation format check) 3. Pass the single pair to SDN controller 4. SDN controller allocates spectrum on BOTH paths simultaneously 5. Traffic transmitted on both paths, receiver uses primary (fast switchover) :param source: Source node ID :type source: Any :param destination: Destination node ID :type destination: Any :param request: Request details (optional) :type request: Any Example: >>> router = OnePlusOneProtection(props, sdn_props) >>> router.route(0, 5) >>> # route_props contains ONE path pair >>> # SDN controller will allocate on BOTH primary and backup paths """ # Clear previous route properties self.route_props.paths_matrix = [] self.route_props.modulation_formats_matrix = [] self.route_props.weights_list = [] self.route_props.backup_paths_matrix = [] self.route_props.backup_modulation_formats_matrix = [] # Find all disjoint paths using max-flow algorithm (Suurballe's) all_disjoint_paths = self.find_all_disjoint_paths(source, destination) if len(all_disjoint_paths) < 2: logger.warning(f"1+1 protection: Could not find at least 2 disjoint paths for {source} -> {destination}") self._disjoint_paths_failed += 1 return # Try all possible pairs from disjoint paths, find first feasible pair # Suurballe's returns paths in optimal order (shortest first) # We try pairs in order: (P1,P2), (P1,P3), (P2,P3), ... until both are feasible primary_path = None backup_path = None primary_mods = None backup_mods = None for i in range(len(all_disjoint_paths)): for j in range(i + 1, len(all_disjoint_paths)): candidate_primary = all_disjoint_paths[i] candidate_backup = all_disjoint_paths[j] # Calculate modulation formats for BOTH paths in this pair temp_primary_mods = self._get_modulation_formats_for_path(candidate_primary) temp_backup_mods = self._get_modulation_formats_for_path(candidate_backup) # Check if BOTH paths have feasible modulation formats primary_feasible = any(mod and mod is not False for mod in temp_primary_mods) backup_feasible = any(mod and mod is not False for mod in temp_backup_mods) if primary_feasible and backup_feasible: # Found a feasible pair! primary_path = candidate_primary backup_path = candidate_backup primary_mods = temp_primary_mods backup_mods = temp_backup_mods logger.debug( f"1+1 protection: Found feasible pair (paths {i + 1}, {j + 1}) from {len(all_disjoint_paths)} disjoint paths" ) break if primary_path is not None: break # Found feasible pair, exit outer loop # Check if we found a feasible pair # Note: primary_mods and backup_mods are always set together with paths if primary_path is None or backup_path is None or primary_mods is None or backup_mods is None: logger.warning( f"1+1 protection: Found {len(all_disjoint_paths)} disjoint paths but " f"no pair where BOTH paths are feasible for {source} -> {destination}" ) self._disjoint_paths_failed += 1 return # Store paths in SDN properties self.sdn_props.primary_path = primary_path self.sdn_props.backup_path = backup_path self.sdn_props.is_protected = True self.sdn_props.active_path = "primary" # Populate route_props with the feasible pair # SDN controller will allocate spectrum on BOTH paths self.route_props.paths_matrix.append(primary_path) self.route_props.backup_paths_matrix.append(backup_path) self.route_props.modulation_formats_matrix.append(primary_mods) self.route_props.backup_modulation_formats_matrix.append(backup_mods) self.route_props.weights_list.append(len(primary_path) - 1) self._disjoint_paths_found += 1 logger.debug( f"1+1 protection: Using feasible pair for {source}->{destination}:\n" f" Primary: {primary_path} ({len(primary_path) - 1} hops, mods={primary_mods})\n" f" Backup: {backup_path} ({len(backup_path) - 1} hops, mods={backup_mods})" )
[docs] def find_all_disjoint_paths(self, source: Any, destination: Any) -> list[list[int]]: """ Find all link-disjoint paths using max-flow algorithm. Uses NetworkX's edge_disjoint_paths which implements Suurballe's algorithm to find all edge-disjoint paths between source and destination. :param source: Source node :type source: Any :param destination: Destination node :type destination: Any :return: List of all disjoint paths found (empty if none exist) :rtype: list[list[int]] Example: >>> all_paths = router.find_all_disjoint_paths(0, 5) >>> # Returns: [[0,1,5], [0,2,5], [0,3,4,5], ...] """ try: paths = list( nx.edge_disjoint_paths( self.topology, source, destination, flow_func=None, # Use default (shortest augmenting path) ) ) return [list(path) for path in paths] except (AttributeError, nx.NetworkXNoPath, nx.NetworkXError): return []
[docs] def find_disjoint_paths(self, source: Any, destination: Any) -> tuple[list[int] | None, list[int] | None]: """ Find link-disjoint primary and backup paths. Uses NetworkX's edge_disjoint_paths function for optimal disjoint path finding. Returns the first two paths found. :param source: Source node :type source: Any :param destination: Destination node :type destination: Any :return: (primary_path, backup_path) or (None, None) :rtype: tuple[list[int] | None, list[int] | None] Example: >>> primary, backup = router.find_disjoint_paths(0, 5) >>> # Verify disjointness >>> primary_links = set(zip(primary[:-1], primary[1:])) >>> backup_links = set(zip(backup[:-1], backup[1:])) >>> assert primary_links.isdisjoint(backup_links) """ all_paths = self.find_all_disjoint_paths(source, destination) if len(all_paths) >= 2: return all_paths[0], all_paths[1] return None, None
[docs] def find_disjoint_paths_k_shortest(self, source: Any, destination: Any, k: int = 10) -> tuple[list[int] | None, list[int] | None]: """ Find disjoint paths using K-shortest paths (alternative method). This is an alternative to edge_disjoint_paths. It finds the shortest path as primary, then finds the shortest path on a graph with primary links removed. Note: This method is retained for potential future use or comparison, but find_disjoint_paths() should be preferred as it uses a max-flow algorithm which is more robust. :param source: Source node :type source: Any :param destination: Destination node :type destination: Any :param k: Max paths to consider :type k: int :return: (primary, backup) paths :rtype: tuple[list[int] | None, list[int] | None] """ # Find K shortest paths try: k_paths = list(nx.shortest_simple_paths(self.topology, source, destination)) except nx.NetworkXNoPath: return None, None if len(k_paths) < 2: return None, None # Take first path as primary primary = k_paths[0] primary_links = set(zip(primary[:-1], primary[1:], strict=False)) # Find first path that is link-disjoint with primary for candidate in k_paths[1:k]: candidate_links = set(zip(candidate[:-1], candidate[1:], strict=False)) # Check for link-disjointness (both directions) is_disjoint = True for link in candidate_links: if link in primary_links or (link[1], link[0]) in primary_links: is_disjoint = False break if is_disjoint: return primary, candidate return None, None
[docs] def select_best_path_pair(self, path_pairs: list[tuple[list[int], list[int]]]) -> tuple[list[int], list[int]]: """ Select the best path pair from multiple options. Currently selects based on shortest combined path length (primary + backup). This minimizes total resource usage while maintaining protection. :param path_pairs: List of (primary, backup) path tuples :type path_pairs: list[tuple[list[int], list[int]]] :return: Best (primary, backup) path pair :rtype: tuple[list[int], list[int]] Example: >>> pairs = [([0,1,2], [0,3,2]), ([0,4,5,2], [0,3,2])] >>> best = router.select_best_path_pair(pairs) >>> print(best) ([0, 1, 2], [0, 3, 2]) # Shortest combined length """ if not path_pairs: raise ValueError("No path pairs to select from") # Select pair with shortest combined length best_pair = min(path_pairs, key=lambda pair: len(pair[0]) + len(pair[1])) return best_pair
[docs] def handle_failure(self, current_time: float, affected_requests: list[dict[str, Any]]) -> list[dict[str, Any]]: """ Handle failure by switching protected requests to backup. :param current_time: Current simulation time :type current_time: float :param affected_requests: Requests on failed links :type affected_requests: list[dict[str, Any]] :return: Recovery actions performed :rtype: list[dict[str, Any]] Example: >>> actions = router.handle_failure(100.0, affected_requests) >>> for action in actions: ... print(f"Request {action['request_id']}: " ... f"switched in {action['recovery_time_ms']}ms") """ recovery_actions = [] for request in affected_requests: if request.get("is_protected", False): # Switch to backup recovery_time_ms = self.protection_switchover_ms recovery_actions.append( { "request_id": request["id"], "action": "switchover", "recovery_time_ms": recovery_time_ms, "from_path": "primary", "to_path": "backup", } ) logger.info(f"Request {request['id']}: 1+1 switchover ({recovery_time_ms}ms)") return recovery_actions
[docs] def get_paths(self, source: Any, destination: Any, k: int = 2) -> list[list[Any]]: """ Get k shortest paths between source and destination. For 1+1 protection, this returns the primary and backup paths. :param source: Source node identifier :type source: Any :param destination: Destination node identifier :type destination: Any :param k: Number of paths to return (default 2 for 1+1) :type k: int :return: List of k paths, where each path is a list of nodes :rtype: list[list[Any]] """ primary, backup = self.find_disjoint_paths(source, destination) # Return both paths if available if primary and backup: return [primary, backup] if primary: return [primary] return []
[docs] def update_weights(self, topology: nx.Graph) -> None: """ Update edge weights based on current network state. For 1+1 protection, weights are typically uniform (hop count). :param topology: NetworkX graph to update weights for :type topology: nx.Graph """ # Set uniform weights (hop count) for u, v in topology.edges(): topology[u][v]["weight"] = 1.0
[docs] def get_metrics(self) -> dict[str, Any]: """ Get routing algorithm performance metrics. :return: Dictionary containing algorithm-specific metrics :rtype: dict[str, Any] """ total_attempts = self._disjoint_paths_found + self._disjoint_paths_failed success_rate = self._disjoint_paths_found / total_attempts if total_attempts > 0 else 0.0 return { "algorithm": self.algorithm_name, "disjoint_paths_found": self._disjoint_paths_found, "disjoint_paths_failed": self._disjoint_paths_failed, "success_rate": success_rate, "protection_switchover_ms": self.protection_switchover_ms, "revert_to_primary": self.revert_to_primary, }
def _get_modulation_formats_for_path(self, path: list[Any]) -> list[str | bool]: """ Get modulation formats for a given path. Determines appropriate modulation formats based on path length, bandwidth requirements, and available modulation format configurations. Returns False for infeasible modulation formats. :param path: List of nodes representing the path :type path: list[Any] :return: List of modulation format strings or False for infeasible :rtype: list[str | bool] """ path_length = find_path_length(path_list=path, topology=self.topology) chosen_bandwidth = getattr(self.sdn_props, "bandwidth", None) if chosen_bandwidth and not self.engine_props.get("pre_calc_mod_selection", False): # Use mod_per_bw if available if "mod_per_bw" in self.engine_props and chosen_bandwidth in self.engine_props["mod_per_bw"]: modulation_format = get_path_modulation( mods_dict=self.engine_props["mod_per_bw"][chosen_bandwidth], path_len=path_length, ) return [str(modulation_format)] else: # Fallback to mod_formats modulation_formats = getattr(self.sdn_props, "mod_formats", {}) modulation_format = get_path_modulation(modulation_formats, path_length) return [str(modulation_format)] else: # Use all modulation formats sorted by max_length has_mod_dict = hasattr(self.sdn_props, "modulation_formats_dict") if has_mod_dict and self.sdn_props.modulation_formats_dict is not None: modulation_formats_dict = sort_nested_dict_values( original_dict=self.sdn_props.modulation_formats_dict, nested_key="max_length", ) # Ensure all keys are strings result = [str(key) for key in modulation_formats_dict.keys()][::-1] return cast(list[str | bool], result) else: # Fallback to simple list return ["QPSK"]
[docs] def reset(self) -> None: """ Reset the routing algorithm state. Clears metrics and counters. """ self._disjoint_paths_found = 0 self._disjoint_paths_failed = 0