"""
Best Fit spectrum assignment algorithm implementation.
"""
import itertools
from operator import itemgetter
from typing import Any
import numpy as np
from fusion.core.properties import SDNProps, SpectrumProps
from fusion.interfaces.spectrum import AbstractSpectrumAssigner
from fusion.modules.spectrum.utils import SpectrumHelpers
[docs]
class BestFitSpectrum(AbstractSpectrumAssigner):
"""
Best Fit spectrum assignment algorithm.
This algorithm assigns spectrum by finding the smallest available contiguous
set of slots that can accommodate the request, minimizing fragmentation.
"""
[docs]
def __init__(self, engine_props: dict, sdn_props: SDNProps, route_props: object):
"""
Initialize Best Fit spectrum assignment algorithm.
:param engine_props: Dictionary containing engine configuration
:param sdn_props: Object containing SDN controller properties
:param route_props: Object containing routing properties
"""
super().__init__(engine_props, sdn_props, route_props)
self.spectrum_props = SpectrumProps()
self.spec_help_obj = SpectrumHelpers(
engine_props=self.engine_props,
sdn_props=self.sdn_props,
spectrum_props=self.spectrum_props,
)
self._assignments_made = 0
self._total_slots_assigned = 0
@property
def algorithm_name(self) -> str:
"""Return the name of the spectrum assignment algorithm."""
return "best_fit"
@property
def supports_multiband(self) -> bool:
"""Indicate whether this algorithm supports multi-band assignment."""
return True
[docs]
def assign(self, path: list[Any], request: Any) -> dict[str, Any] | None:
"""
Assign spectrum resources along the given path for the request.
:param path: List of nodes representing the path
:param request: Request object containing traffic demand and spectrum
requirements
:return: Dictionary containing spectrum assignment details or None if
assignment fails
"""
# Store path and request info
self.spectrum_props.path_list = path
# Get slots needed for the request
if hasattr(request, "slots_needed"):
self.spectrum_props.slots_needed = request.slots_needed
elif hasattr(request, "bandwidth"):
self.spectrum_props.slots_needed = self._calculate_slots_needed(request.bandwidth)
else:
self.spectrum_props.slots_needed = 1
# Reset assignment state
self.spectrum_props.is_free = False
self.spectrum_props.start_slot = None
self.spectrum_props.end_slot = None
self.spectrum_props.core_number = None
self.spectrum_props.current_band = None
# Try best fit allocation
success = self._find_best_fit()
if success:
self._assignments_made += 1
self._total_slots_assigned += self.spectrum_props.slots_needed
return {
"start_slot": self.spectrum_props.start_slot,
"end_slot": self.spectrum_props.end_slot,
"core_number": self.spectrum_props.core_number,
"band": self.spectrum_props.current_band,
"is_free": self.spectrum_props.is_free,
"slots_needed": self.spectrum_props.slots_needed,
}
return None
def _calculate_slots_needed(self, bandwidth: float) -> int:
"""Calculate number of slots needed for given bandwidth."""
slots_per_gbps = self.engine_props.get("slots_per_gbps", 1)
return int(np.ceil(bandwidth * slots_per_gbps))
def _find_best_fit(self) -> bool:
"""Find best fit spectrum assignment using the original algorithm logic."""
channels_list = []
# Get all potential super channels
path_list = self.spectrum_props.path_list
if path_list is None:
raise ValueError("path_list must not be None")
for i in range(len(path_list) - 1):
src = path_list[i]
dest = path_list[i + 1]
for core_num in range(self.engine_props.get("cores_per_link", 1)):
forced_core = getattr(self.spectrum_props, "forced_core", None)
if forced_core is not None and forced_core != core_num:
continue
for band in self.engine_props.get("band_list", ["c"]):
forced_band = getattr(self.spectrum_props, "forced_band", None)
if forced_band is not None and forced_band != band:
continue
link_key = (src, dest)
network_dict = self.sdn_props.network_spectrum_dict
if network_dict is None:
continue
if link_key not in network_dict:
continue
link_dict = network_dict[link_key]
if band not in link_dict["cores_matrix"]:
continue
core_arr = link_dict["cores_matrix"][band][core_num]
open_slots_arr = np.where(np.array(core_arr) == 0)[0]
# Group consecutive slots into channels
tmp_matrix = [
list(map(itemgetter(1), g)) for k, g in itertools.groupby(enumerate(open_slots_arr), lambda i_x: i_x[0] - i_x[1])
]
slots_needed = self.spectrum_props.slots_needed
if slots_needed is None:
raise ValueError("slots_needed must not be None")
for channel_list in tmp_matrix:
if len(channel_list) >= slots_needed:
channels_list.append(
{
"link": (src, dest),
"core": core_num,
"channel": channel_list,
"band": band,
}
)
# Sort the list of candidate super channels by size (best fit = smallest fit)
channels_list = sorted(channels_list, key=lambda d: len(d["channel"]))
return self._allocate_best_fit(channels_list)
def _allocate_best_fit(self, channels_list: list) -> bool:
"""Allocate spectrum using best fit from sorted channel list."""
for channel_dict in channels_list:
for start_index in channel_dict["channel"]:
slots_needed = self.spectrum_props.slots_needed
end_index = (start_index + slots_needed + self.engine_props.get("guard_slots", 0)) - 1
if end_index not in channel_dict["channel"]:
break
# Check if this assignment works for multi-hop paths
path_list = self.spectrum_props.path_list
if path_list is None:
raise ValueError("path_list must not be None")
if len(path_list) > 2:
self.spec_help_obj.start_index = start_index
self.spec_help_obj.end_index = end_index
self.spec_help_obj.core_number = channel_dict["core"]
self.spec_help_obj.current_band = channel_dict["band"]
self.spec_help_obj.check_other_links()
if self.spectrum_props.is_free or len(path_list) <= 2:
self.spectrum_props.is_free = True
self.spectrum_props.start_slot = start_index
self.spectrum_props.end_slot = end_index
self.spectrum_props.core_number = channel_dict["core"]
self.spectrum_props.current_band = channel_dict["band"]
return True
return False
[docs]
def check_spectrum_availability(self, path: list[Any], start_slot: int, end_slot: int, core_num: int, band: str) -> bool:
"""Check if spectrum slots are available along the entire path."""
for i in range(len(path) - 1):
source, dest = path[i], path[i + 1]
link_key = (source, dest)
network_dict = self.sdn_props.network_spectrum_dict
if network_dict is None:
return False
if link_key not in network_dict:
return False
link_dict = network_dict[link_key]
if band not in link_dict["cores_matrix"] or core_num >= len(link_dict["cores_matrix"][band]):
return False
core_array = link_dict["cores_matrix"][band][core_num]
# Check if all required slots are free
for slot in range(start_slot, end_slot + 1):
if slot >= len(core_array) or core_array[slot] != 0:
return False
return True
[docs]
def allocate_spectrum(
self,
path: list[Any],
start_slot: int,
end_slot: int,
core_num: int,
band: str,
request_id: Any,
) -> bool:
"""Allocate spectrum resources along the path."""
for i in range(len(path) - 1):
source, dest = path[i], path[i + 1]
link_key = (source, dest)
network_dict = self.sdn_props.network_spectrum_dict
if network_dict is None:
return False
if link_key not in network_dict:
return False
link_dict = network_dict[link_key]
core_array = link_dict["cores_matrix"][band][core_num]
# Allocate slots with request ID
for slot in range(start_slot, end_slot + 1):
core_array[slot] = request_id
return True
[docs]
def deallocate_spectrum(self, path: list[Any], start_slot: int, end_slot: int, core_num: int, band: str) -> bool:
"""Deallocate spectrum resources along the path."""
for i in range(len(path) - 1):
source, dest = path[i], path[i + 1]
link_key = (source, dest)
network_dict = self.sdn_props.network_spectrum_dict
if network_dict is None:
return False
if link_key not in network_dict:
return False
link_dict = network_dict[link_key]
core_array = link_dict["cores_matrix"][band][core_num]
# Free slots by setting to 0
for slot in range(start_slot, end_slot + 1):
core_array[slot] = 0
return True
[docs]
def get_fragmentation_metric(self, path: list[Any]) -> float:
"""Calculate fragmentation metric for the given path."""
total_fragmentation = 0.0
link_count = 0
for i in range(len(path) - 1):
source, dest = path[i], path[i + 1]
link_key = (source, dest)
network_dict = self.sdn_props.network_spectrum_dict
if network_dict is not None and link_key in network_dict:
link_dict = network_dict[link_key]
link_fragmentation = self._calculate_link_fragmentation(link_dict)
total_fragmentation += link_fragmentation
link_count += 1
return total_fragmentation / link_count if link_count > 0 else 0.0
def _calculate_link_fragmentation(self, link_dict: dict) -> float:
"""Calculate fragmentation for a single link."""
total_segments = 0
total_free_slots = 0
for band in link_dict["cores_matrix"]:
for core_array in link_dict["cores_matrix"][band]:
core_arr = np.array(core_array)
free_slots = np.where(core_arr == 0)[0]
if len(free_slots) > 0:
total_free_slots += len(free_slots)
# Count number of contiguous segments
segments = 1
for i in range(1, len(free_slots)):
if free_slots[i] - free_slots[i - 1] > 1:
segments += 1
total_segments += segments
# Higher fragmentation = more segments for same number of free slots
if total_free_slots == 0:
return 0.0
return total_segments / total_free_slots
[docs]
def get_metrics(self) -> dict[str, Any]:
"""Get spectrum assignment algorithm performance metrics."""
avg_slots = self._total_slots_assigned / self._assignments_made if self._assignments_made > 0 else 0
return {
"algorithm": self.algorithm_name,
"assignments_made": self._assignments_made,
"total_slots_assigned": self._total_slots_assigned,
"average_slots_per_assignment": avg_slots,
"supports_multiband": self.supports_multiband,
"fragmentation_optimized": True,
}
[docs]
def reset(self) -> None:
"""Reset the spectrum assignment algorithm state."""
self._assignments_made = 0
self._total_slots_assigned = 0
self.spectrum_props = SpectrumProps()