Source code for fusion.modules.routing.xt_aware

"""
Cross-talk (XT) aware routing algorithm implementation.
"""

from typing import Any

import networkx as nx

from fusion.core.properties import RoutingProps, SDNProps
from fusion.interfaces.router import AbstractRoutingAlgorithm
from fusion.modules.routing.utils import RoutingHelpers
from fusion.utils.data import sort_nested_dict_values
from fusion.utils.network import find_path_length, get_path_modulation
from fusion.utils.spectrum import find_free_slots


[docs] class XTAwareRouting(AbstractRoutingAlgorithm): """ Cross-talk aware routing algorithm. This algorithm finds paths by considering intra-core crosstalk interference, selecting the path with the least amount of cross-talk. """
[docs] def __init__(self, engine_props: dict[str, Any], sdn_props: SDNProps) -> None: """ Initialize XT-aware routing algorithm. :param engine_props: Dictionary containing engine configuration. :type engine_props: dict[str, Any] :param sdn_props: Object containing SDN controller properties. :type sdn_props: Any """ super().__init__(engine_props, sdn_props) self.route_props = RoutingProps() self.route_help_obj = RoutingHelpers( engine_props=self.engine_props, sdn_props=self.sdn_props, route_props=self.route_props, ) self._path_count = 0 self._total_xt = 0.0
@property def algorithm_name(self) -> str: """ Get the name of the routing algorithm. :return: The algorithm name 'xt_aware'. :rtype: str """ return "xt_aware" @property def supported_topologies(self) -> list[str]: """ Get the list of supported topology types. :return: List of supported topology names including NSFNet, USBackbone60, Pan-European, and Generic. :rtype: list[str] """ return ["NSFNet", "USBackbone60", "Pan-European", "Generic"]
[docs] def validate_environment(self, topology: Any) -> bool: """ Validate that the routing algorithm can work with the given topology. :param topology: NetworkX graph representing the network topology. :type topology: Any :return: True if the algorithm can route in this environment. :rtype: bool """ return hasattr(topology, "nodes") and hasattr(topology, "edges") and hasattr(self.sdn_props, "network_spectrum_dict")
[docs] def route(self, source: Any, destination: Any, request: Any) -> None: """ Find a route from source to destination considering cross-talk. Results are stored in route_props (paths_matrix, modulation_formats_matrix, weights_list). Consumers should access route_props.paths_matrix for paths. :param source: Source node identifier. :type source: Any :param destination: Destination node identifier. :type destination: Any :param request: Request object containing traffic demand details. :type request: Any """ # Store source/destination in sdn_props for compatibility self.sdn_props.source = source self.sdn_props.destination = destination # Reset paths matrix for new calculation self.route_props.paths_matrix = [] self.route_props.weights_list = [] self.route_props.modulation_formats_matrix = [] try: # Update XT costs for all links self._update_xt_costs() # Find least XT path self._find_least_weight("xt_cost") if self.route_props.paths_matrix: path = self.route_props.paths_matrix[0] self._path_count += 1 # Calculate XT metric for this path xt = self._calculate_path_xt(path) self._total_xt += float(xt) except (nx.NetworkXNoPath, nx.NodeNotFound): pass
def _update_xt_costs(self) -> None: """ Update cross-talk costs for all links in the topology. Calculates and assigns cross-talk scores to all network links based on current spectrum utilization and XT calculation type. Updates both directions of bidirectional links with the same XT cost. """ topology = self.engine_props.get("topology", getattr(self.sdn_props, "topology", None)) # At the moment, we have identical bidirectional links # (no need to loop over all links) network_spectrum_dict = getattr(self.sdn_props, "network_spectrum_dict", {}) for link_tuple in list(network_spectrum_dict.keys())[::2]: source_node, destination_node = link_tuple[0], link_tuple[1] span_count = 1.0 if topology is not None: span_count = topology[source_node][destination_node]["length"] / self.route_props.span_length available_slots_dict = find_free_slots( network_spectrum_dict=network_spectrum_dict, link_tuple=link_tuple, ) crosstalk_cost = self.route_help_obj.find_xt_link_cost(free_slots_dict=available_slots_dict, link_list=link_tuple) # Consider XT type configuration xt_calculation_type = self.engine_props.get("xt_type") beta_coefficient = self.engine_props.get("beta", 0.5) if xt_calculation_type == "with_length": if self.route_props.max_link_length is None: self.route_help_obj.get_max_link_length() normalized_length = 1.0 if topology is not None: normalized_length = topology[source_node][destination_node]["length"] / self.route_props.max_link_length length_weighted_cost = normalized_length * beta_coefficient crosstalk_weighted_cost = (1 - beta_coefficient) * crosstalk_cost final_link_cost = length_weighted_cost + crosstalk_weighted_cost elif xt_calculation_type == "without_length": final_link_cost = span_count * crosstalk_cost else: # Default behavior final_link_cost = crosstalk_cost if topology is not None and hasattr(topology, "edges"): topology[source_node][destination_node]["xt_cost"] = final_link_cost topology[destination_node][source_node]["xt_cost"] = final_link_cost def _find_least_weight(self, weight: str) -> None: """ Find the path with the least weight based on cross-talk cost. Updates the route properties with the path having minimum cross-talk, along with its weight and modulation format. :param weight: The edge attribute name to use for weight calculation, typically 'xt_cost'. :type weight: str """ topology = self.engine_props.get("topology", getattr(self.sdn_props, "topology", None)) paths_generator = nx.shortest_simple_paths( G=topology, source=self.sdn_props.source, target=self.sdn_props.destination, weight=weight, ) for path_list in paths_generator: # Calculate path weight as sum across the path path_weight = 0.0 if topology is not None: path_weight = sum(topology[path_list[i]][path_list[i + 1]][weight] for i in range(len(path_list) - 1)) # Calculate actual path length for modulation format selection path_length = find_path_length(path_list=path_list, topology=topology) # Get modulation formats based on path length has_mod_dict = hasattr(self.sdn_props, "modulation_formats_dict") if has_mod_dict and self.sdn_props.modulation_formats_dict is not None: # Use sorted modulation formats mod_formats_dict = sort_nested_dict_values( original_dict=self.sdn_props.modulation_formats_dict, nested_key="max_length", ) mod_format_list: list[str | bool] = [] for mod_format in mod_formats_dict: if self.sdn_props.modulation_formats_dict[mod_format]["max_length"] >= path_length: mod_format_list.append(mod_format) else: mod_format_list.append(False) else: # Fallback to simple modulation selection mod_formats = getattr(self.sdn_props, "mod_formats", {}) modulation_format = get_path_modulation(mod_formats, path_length) if modulation_format and modulation_format is not True: mod_format_list = [str(modulation_format)] else: mod_format_list = ["QPSK"] self.route_props.weights_list.append(path_weight) self.route_props.paths_matrix.append(path_list) self.route_props.modulation_formats_matrix.append(mod_format_list) # For XT-aware, we typically take the first (best) path break def _calculate_path_xt(self, path: list[Any]) -> float: """ Calculate the cross-talk metric for a given path. :param path: List of node identifiers representing the path. :type path: list[Any] :return: Cross-talk metric value for the path. Returns 0.0 if the path is invalid or has less than 2 nodes. :rtype: float """ if not path or len(path) < 2: return 0.0 topology = self.engine_props.get("topology", getattr(self.sdn_props, "topology", None)) total_xt = 0.0 for i in range(len(path) - 1): source, destination = path[i], path[i + 1] if topology is not None and hasattr(topology, "edges") and topology.has_edge(source, destination): xt_cost = topology[source][destination].get("xt_cost", 0.0) total_xt += xt_cost return total_xt
[docs] def get_paths(self, source: Any, destination: Any, k: int = 1) -> list[list[Any]]: """ Get k paths ordered by cross-talk level. :param source: Source node identifier. :type source: Any :param destination: Destination node identifier. :type destination: Any :param k: Number of paths to return. :type k: int :return: List of k paths ordered by cross-talk (least XT first). :rtype: list[list[Any]] """ # Update XT costs first self._update_xt_costs() topology = self.engine_props.get("topology", getattr(self.sdn_props, "topology", None)) try: paths_generator = nx.shortest_simple_paths(topology, source, destination, weight="xt_cost") paths = [] for i, path in enumerate(paths_generator): if i >= k: break paths.append(list(path)) return paths except (nx.NetworkXNoPath, nx.NodeNotFound): return []
[docs] def update_weights(self, topology: Any) -> None: """ Update cross-talk weights based on current spectrum state. :param topology: NetworkX graph to update weights for. :type topology: Any """ # Recalculate XT costs for all links network_spectrum_dict = getattr(self.sdn_props, "network_spectrum_dict", {}) for link_list in list(network_spectrum_dict.keys())[::2]: source, destination = link_list[0], link_list[1] num_spans = topology[source][destination]["length"] / self.route_props.span_length free_slots_dict = find_free_slots( network_spectrum_dict=network_spectrum_dict, link_tuple=link_list, ) xt_cost = self.route_help_obj.find_xt_link_cost(free_slots_dict=free_slots_dict, link_list=link_list) # Apply XT type configuration if self.engine_props.get("xt_type") == "with_length": if self.route_props.max_link_length is None: self.route_help_obj.get_max_link_length() link_cost = topology[source][destination]["length"] / self.route_props.max_link_length link_cost *= self.engine_props.get("beta", 0.5) link_cost += (1 - self.engine_props.get("beta", 0.5)) * xt_cost elif self.engine_props.get("xt_type") == "without_length": link_cost = num_spans * xt_cost else: link_cost = xt_cost if hasattr(topology, "edges"): topology[source][destination]["xt_cost"] = link_cost topology[destination][source]["xt_cost"] = link_cost
[docs] def get_metrics(self) -> dict[str, Any]: """ Get routing algorithm performance metrics. :return: Dictionary containing algorithm-specific metrics including algorithm name, paths computed, average XT, total XT considered, and XT calculation type. :rtype: dict[str, Any] """ avg_xt = self._total_xt / self._path_count if self._path_count > 0 else 0 return { "algorithm": self.algorithm_name, "paths_computed": self._path_count, "average_xt": avg_xt, "total_xt_considered": self._total_xt, "xt_type": self.engine_props.get("xt_type", "default"), }
[docs] def reset(self) -> None: """Reset the routing algorithm state.""" self._path_count = 0 self._total_xt = 0.0 self.route_props = RoutingProps()