"""
Congestion-aware routing algorithm implementation.
"""
from itertools import islice
from typing import Any

import networkx as nx
import numpy as np

from fusion.core.properties import RoutingProps, SDNProps
from fusion.interfaces.router import AbstractRoutingAlgorithm
from fusion.modules.routing.utils import RoutingHelpers
from fusion.utils.data import sort_nested_dict_values
from fusion.utils.network import (
    find_path_congestion,
    find_path_length,
    get_path_modulation,
)
# (Sphinx "[docs]" viewcode link marker stood here; it is HTML extraction residue, not Python code.)
class CongestionAwareRouting(AbstractRoutingAlgorithm):
    """
    Congestion-aware routing algorithm.

    Candidate paths are the first ``k_paths`` shortest simple paths by link
    ``length``. Each candidate is scored as a weighted blend of its mean
    congestion and its normalized length, and the candidates are published
    through ``route_props`` in ascending score order (most promising first).
    """

    def __init__(self, engine_props: dict[str, Any], sdn_props: SDNProps) -> None:
        """
        Initialize congestion-aware routing algorithm.

        :param engine_props: Dictionary containing engine configuration.
        :type engine_props: dict[str, Any]
        :param sdn_props: Object containing SDN controller properties.
        :type sdn_props: SDNProps
        """
        super().__init__(engine_props, sdn_props)
        self._init_route_state()
        self._path_count = 0
        self._total_congestion = 0.0

    def _init_route_state(self) -> None:
        """Create a fresh ``RoutingProps`` and a ``RoutingHelpers`` bound to it."""
        self.route_props = RoutingProps()
        self.route_help_obj = RoutingHelpers(
            route_props=self.route_props,
            engine_props=self.engine_props,
            sdn_props=self.sdn_props,
        )

    @property
    def algorithm_name(self) -> str:
        """
        Get the name of the routing algorithm.

        :return: The algorithm name 'congestion_aware'.
        :rtype: str
        """
        return "congestion_aware"

    @property
    def supported_topologies(self) -> list[str]:
        """
        Get the list of supported topology types.

        :return: List of supported topology names including NSFNet,
            USBackbone60, Pan-European, and Generic.
        :rtype: list[str]
        """
        return ["NSFNet", "USBackbone60", "Pan-European", "Generic"]

    def validate_environment(self, topology: Any) -> bool:
        """
        Validate that the routing algorithm can work with the given topology.

        :param topology: NetworkX graph representing the network topology.
        :type topology: Any
        :return: True if ``topology`` exposes ``nodes`` and ``edges`` and
            ``sdn_props`` exposes ``network_spectrum_dict``.
        :rtype: bool
        """
        return (
            hasattr(topology, "nodes")
            and hasattr(topology, "edges")
            and hasattr(self.sdn_props, "network_spectrum_dict")
        )

    def route(self, source: Any, destination: Any, request: Any) -> None:
        """
        Find a route from source to destination using congestion-aware k-shortest.

        For the first k shortest-length candidate paths we compute:
            score = alpha * mean_path_congestion + (1 - alpha) * (path_len / max_len)

        Results are stored in route_props (paths_matrix, modulation_formats_matrix,
        weights_list). Consumers should access route_props.paths_matrix for paths.
        When NetworkX reports no path or an unknown node, those lists are left
        empty and no exception is propagated.

        :param source: Source node identifier.
        :type source: Any
        :param destination: Destination node identifier.
        :type destination: Any
        :param request: Request object containing traffic demand details
            (not read by this method).
        :type request: Any
        """
        # Store source/destination in sdn_props for compatibility
        self.sdn_props.source = source
        self.sdn_props.destination = destination

        # Reset the result lists for the new calculation
        self.route_props.paths_matrix = []
        self.route_props.modulation_formats_matrix = []
        self.route_props.weights_list = []

        try:
            self._find_congestion_aware_paths()
        except (nx.NetworkXNoPath, nx.NodeNotFound):
            # Deliberate best effort: an unreachable or unknown endpoint is
            # reported to callers as "no paths", i.e. empty result lists.
            return

        if not self.route_props.paths_matrix:
            return

        # Only requests that produced at least one path feed the metrics.
        self._path_count += 1
        best_path = self.route_props.paths_matrix[0]
        self._total_congestion += float(self._calculate_path_congestion(best_path))

    def _find_congestion_aware_paths(self) -> None:
        """
        Gather, score and publish the k shortest-length candidate paths.

        For each candidate:
            score = alpha * mean_path_congestion + (1 - alpha) * (path_len / max_len)

        All candidates are stored in route_props.*, sorted by score so the
        downstream allocator will try the most promising path first. If no
        candidate was gathered, ``sdn_props.block_reason`` is set to
        ``"congestion"`` and route_props is left untouched.
        """
        candidate_paths_data = self._gather_candidate_paths()
        # The dict itself always has three keys, so test the path list.
        if not candidate_paths_data["paths"]:
            self.sdn_props.block_reason = "congestion"
            return

        scored_paths = self._calculate_path_scores(candidate_paths_data)
        self._populate_route_properties(scored_paths)

    def _gather_candidate_paths(self) -> dict[str, list[Any]]:
        """
        Gather k shortest-length candidate paths with their metrics.

        ``k`` is ``engine_props["k_paths"]`` (default 1); values below zero are
        treated as zero.

        :return: Dictionary containing 'paths', 'lengths', and 'congestions' lists.
        :rtype: dict[str, list[Any]]
        """
        k_paths = max(int(self.engine_props.get("k_paths", 1)), 0)
        topology = self.engine_props.get("topology", getattr(self.sdn_props, "topology", None))
        network_spectrum = getattr(self.sdn_props, "network_spectrum_dict", {})

        paths_iterator = nx.shortest_simple_paths(
            G=topology,
            source=self.sdn_props.source,
            target=self.sdn_props.destination,
            weight="length",
        )

        candidate_paths: list[Any] = []
        path_lengths: list[Any] = []
        path_congestions: list[Any] = []
        # islice stops after exactly k paths, so the generator is never asked
        # to compute a (k+1)-th path that would only be thrown away.
        for path in islice(paths_iterator, k_paths):
            candidate_paths.append(path)
            path_lengths.append(find_path_length(path_list=path, topology=topology))
            mean_congestion, _ = find_path_congestion(
                path_list=path,
                network_spectrum=network_spectrum,
            )
            path_congestions.append(mean_congestion)

        return {
            "paths": candidate_paths,
            "lengths": path_lengths,
            "congestions": path_congestions,
        }

    def _calculate_path_scores(self, candidate_paths_data: dict[str, list[Any]]) -> list[tuple]:
        """
        Calculate congestion-aware scores for candidate paths.

        The weight ``alpha`` is ``engine_props["ca_alpha"]`` (default 0.3).

        :param candidate_paths_data: Dictionary with paths, lengths, and congestions.
        :type candidate_paths_data: dict[str, list[Any]]
        :return: List of tuples (path, length, score) sorted by ascending score;
            empty when there are no candidates.
        :rtype: list[tuple]
        """
        paths = candidate_paths_data["paths"]
        lengths = candidate_paths_data["lengths"]
        if not paths:
            # max() of an empty array would raise; nothing to score anyway.
            return []

        alpha = float(self.engine_props.get("ca_alpha", 0.3))
        lengths_array = np.asarray(lengths, dtype=float)
        congestions_array = np.asarray(candidate_paths_data["congestions"], dtype=float)

        longest = lengths_array.max()
        # Avoid dividing by zero when every candidate has zero length.
        max_length = longest if longest > 0 else 1.0
        normalized_lengths = lengths_array / max_length
        scores = alpha * congestions_array + (1.0 - alpha) * normalized_lengths

        # A stable sort keeps equally scored paths in their shortest-first order.
        return [
            (paths[idx], lengths[idx], float(scores[idx]))
            for idx in scores.argsort(kind="stable")
        ]

    def _populate_route_properties(self, scored_paths: list[tuple]) -> None:
        """
        Populate route properties with scored paths in order.

        :param scored_paths: List of (path, length, score) tuples sorted by score.
        :type scored_paths: list[tuple]
        """
        chosen_bandwidth = getattr(self.sdn_props, "bandwidth", None)
        for path, path_length, score in scored_paths:
            modulation_list = self._get_modulation_formats(path_length, chosen_bandwidth)
            self.route_props.paths_matrix.append(path)
            self.route_props.modulation_formats_matrix.append(modulation_list)
            self.route_props.weights_list.append(score)

    def _get_modulation_formats(self, path_length: float, chosen_bandwidth: Any) -> list[str | bool]:
        """
        Get appropriate modulation formats for the given path length.

        Without ``pre_calc_mod_selection`` a single format is chosen through
        ``get_path_modulation``; a falsy or ``True`` result falls back to
        ``"QPSK"``. With it, every key of ``sdn_props.mod_formats_dict`` is
        returned, ordered by ``sort_nested_dict_values`` on ``max_length``.

        :param path_length: Length of the path.
        :type path_length: float
        :param chosen_bandwidth: Selected bandwidth for the connection.
        :type chosen_bandwidth: Any
        :return: List of modulation format names.
        :rtype: list[str | bool]
        """
        if not self.engine_props.get("pre_calc_mod_selection", False):
            modulation_format = get_path_modulation(
                mods_dict=self.engine_props["mod_per_bw"][chosen_bandwidth],
                path_len=path_length,
            )
            if modulation_format and modulation_format is not True:
                return [str(modulation_format)]
            return ["QPSK"]

        modulation_formats_sorted = sort_nested_dict_values(
            original_dict=getattr(self.sdn_props, "mod_formats_dict", {}),
            nested_key="max_length",
        )
        return list(modulation_formats_sorted.keys())

    def _find_least_congested_path(self) -> list[Any] | None:
        """
        Find the least congested path using the original algorithm logic.

        Considers every simple path whose node count is at most one more than
        that of the shortest path, records each one's bottleneck link, and
        keeps the path whose bottleneck has the most free slots. This method
        overwrites ``route_props.paths_matrix`` and ``route_props.weights_list``.

        :return: Path with the least congested bottleneck link, or None if no path
            found.
        :rtype: list[Any] | None
        """
        topology = self.engine_props.get("topology", getattr(self.sdn_props, "topology", None))
        # Start clean: the helpers below store dict entries in paths_matrix and
        # cannot rank them next to plain path lists left by an earlier call.
        self.route_props.paths_matrix = []

        min_hops = None
        try:
            for path_list in nx.shortest_simple_paths(
                topology, self.sdn_props.source, self.sdn_props.destination
            ):
                num_hops = len(path_list)
                if min_hops is None:
                    min_hops = num_hops
                elif num_hops > min_hops + 1:
                    # Paths arrive shortest first, so none of the rest qualify.
                    break
                self._find_most_cong_link(path_list=path_list)
        except (nx.NetworkXNoPath, nx.NodeNotFound):
            self.route_props.paths_matrix = []
            return None

        if not self.route_props.paths_matrix:
            return None

        # Select here rather than inside the loop so that a topology whose
        # every path qualifies still yields a result.
        self._find_least_cong()
        return list(self.route_props.paths_matrix[0])

    @staticmethod
    def _iter_core_arrays(link_dict: dict[str, Any]) -> Any:
        """Yield each core's slot array from ``link_dict["cores_matrix"]``, band by band."""
        cores_by_band = link_dict["cores_matrix"]
        for band in cores_by_band:
            yield from cores_by_band[band]

    @classmethod
    def _link_slot_usage(cls, link_dict: dict[str, Any]) -> tuple[int, int]:
        """Return ``(used_slots, total_slots)`` for one link; a slot is used when non-zero."""
        used_slots = 0
        total_slots = 0
        for core_arr in cls._iter_core_arrays(link_dict):
            total_slots += len(core_arr)
            used_slots += int(np.sum(core_arr != 0))
        return used_slots, total_slots

    def _find_most_cong_link(self, path_list: list[Any]) -> None:
        """
        Find the most congested link along a path.

        Identifies the link with the least free slots along the given path
        and appends a ``{"path_list", "link_dict"}`` entry for it to
        ``route_props.paths_matrix``.

        :param path_list: List of node identifiers representing the path.
        :type path_list: list[Any]
        """
        network_spectrum_dict = getattr(self.sdn_props, "network_spectrum_dict", {})
        most_cong_link = None
        most_cong_slots = -1

        for node_a, node_b in zip(path_list, path_list[1:]):
            link_dict = network_spectrum_dict[(node_a, node_b)]
            free_slots = sum(
                int(np.sum(core_arr == 0)) for core_arr in self._iter_core_arrays(link_dict)
            )
            # The first link always seeds the minimum; later ones must beat it.
            if most_cong_link is None or free_slots < most_cong_slots:
                most_cong_slots = free_slots
                most_cong_link = link_dict

        self.route_props.paths_matrix.append(
            {
                "path_list": path_list,
                "link_dict": {"link": most_cong_link, "free_slots": most_cong_slots},
            }
        )

    def _find_least_cong(self) -> None:
        """
        Select the path with the least congested bottleneck link.

        Picks, among the entries stored by ``_find_most_cong_link``, the one
        whose bottleneck has the most free slots (the earliest entry wins a
        tie) and replaces ``paths_matrix`` / ``weights_list`` with that single
        result. Does nothing when there are no entries.
        """
        candidates = self.route_props.paths_matrix
        if not candidates:
            return

        best_entry = max(candidates, key=lambda entry: entry["link_dict"]["free_slots"])
        self.route_props.paths_matrix = [best_entry["path_list"]]
        self.route_props.weights_list = [int(best_entry["link_dict"]["free_slots"])]

    def _calculate_path_congestion(self, path: list[Any]) -> float:
        """
        Calculate the congestion metric for a given path.

        Links that are missing from ``network_spectrum_dict`` are skipped.

        :param path: List of node identifiers representing the path.
        :type path: list[Any]
        :return: Congestion ratio (used slots / total slots) for the path.
            Returns 0.0 if the path is invalid or has less than 2 nodes.
        :rtype: float
        """
        if not path or len(path) < 2:
            return 0.0

        network_spectrum_dict = getattr(self.sdn_props, "network_spectrum_dict", {})
        total_used_slots = 0
        total_slots = 0
        for link_key in zip(path, path[1:]):
            if link_key not in network_spectrum_dict:
                continue
            used, total = self._link_slot_usage(network_spectrum_dict[link_key])
            total_used_slots += used
            total_slots += total

        return total_used_slots / total_slots if total_slots > 0 else 0.0

    def get_paths(self, source: Any, destination: Any, k: int = 1) -> list[list[Any]]:
        """
        Get up to k paths ordered by score.

        Runs :meth:`route` and returns the best ``k`` entries of
        ``route_props.paths_matrix``. The number of candidates that exist is
        bounded by ``engine_props["k_paths"]``, so fewer than ``k`` paths may
        be returned.

        :param source: Source node identifier.
        :type source: Any
        :param destination: Destination node identifier.
        :type destination: Any
        :param k: Maximum number of paths to return.
        :type k: int
        :return: List of at most k paths, best score first; empty if none found.
        :rtype: list[list[Any]]
        """
        self.route(source, destination, None)
        # Clamp so a negative k cannot turn into a "drop from the end" slice.
        return list(self.route_props.paths_matrix[: max(k, 0)])

    def update_weights(self, topology: Any) -> None:
        """
        Update congestion weights based on current network state.

        Writes a ``cong_cost`` attribute on both directions of every link
        found in ``network_spectrum_dict``. Does nothing when ``topology`` has
        no ``edges`` attribute.

        :param topology: NetworkX graph to update weights for.
        :type topology: Any
        """
        if not hasattr(topology, "edges"):
            return

        network_spectrum_dict = getattr(self.sdn_props, "network_spectrum_dict", {})
        # Each undirected link is handled once, from whichever direction shows
        # up first, without relying on (u, v)/(v, u) keys being interleaved.
        handled_links: set[frozenset] = set()
        for source, destination in network_spectrum_dict:
            undirected_link = frozenset((source, destination))
            if undirected_link in handled_links:
                continue
            handled_links.add(undirected_link)

            congestion = self._calculate_link_congestion(source, destination)
            # Update both directions
            topology[source][destination]["cong_cost"] = congestion
            topology[destination][source]["cong_cost"] = congestion

    def _calculate_link_congestion(self, source: Any, destination: Any) -> float:
        """
        Calculate congestion level for a specific link.

        :param source: Source node identifier.
        :type source: Any
        :param destination: Destination node identifier.
        :type destination: Any
        :return: Congestion ratio (used slots / total slots) for the link;
            0.0 when the link is unknown or has no slots.
        :rtype: float
        """
        network_spectrum_dict = getattr(self.sdn_props, "network_spectrum_dict", {})
        link_key = (source, destination)
        if link_key not in network_spectrum_dict:
            return 0.0

        used_slots, total_slots = self._link_slot_usage(network_spectrum_dict[link_key])
        return used_slots / total_slots if total_slots > 0 else 0.0

    def get_metrics(self) -> dict[str, Any]:
        """
        Get routing algorithm performance metrics.

        :return: Dictionary containing algorithm-specific metrics including
            algorithm name, paths computed, average congestion, and total
            congestion considered.
        :rtype: dict[str, Any]
        """
        if self._path_count > 0:
            avg_congestion = self._total_congestion / self._path_count
        else:
            avg_congestion = 0.0

        return {
            "algorithm": self.algorithm_name,
            "paths_computed": self._path_count,
            "average_congestion": avg_congestion,
            "total_congestion_considered": self._total_congestion,
        }

    def reset(self) -> None:
        """Reset the routing algorithm state."""
        self._path_count = 0
        self._total_congestion = 0.0
        # Rebuild the helper together with route_props so it does not keep
        # pointing at the discarded RoutingProps instance.
        self._init_route_state()