Source code for fusion.modules.spectrum.first_fit

"""
First Fit spectrum assignment algorithm implementation.
"""

from typing import Any

import numpy as np

from fusion.core.properties import SDNProps, SpectrumProps
from fusion.interfaces.spectrum import AbstractSpectrumAssigner
from fusion.modules.spectrum.utils import SpectrumHelpers


[docs] class FirstFitSpectrum(AbstractSpectrumAssigner): """ First Fit spectrum assignment algorithm. This algorithm assigns spectrum by finding the first available contiguous set of slots that can accommodate the request. """
[docs] def __init__(self, engine_props: dict, sdn_props: SDNProps, route_props: object): """ Initialize First Fit spectrum assignment algorithm. :param engine_props: Dictionary containing engine configuration :param sdn_props: Object containing SDN controller properties :param route_props: Object containing routing properties """ super().__init__(engine_props, sdn_props, route_props) self.spectrum_props = SpectrumProps() self.spec_help_obj = SpectrumHelpers( engine_props=self.engine_props, sdn_props=self.sdn_props, spectrum_props=self.spectrum_props, ) self._assignments_made = 0 self._total_slots_assigned = 0
@property def algorithm_name(self) -> str: """Return the name of the spectrum assignment algorithm.""" return "first_fit" @property def supports_multiband(self) -> bool: """Indicate whether this algorithm supports multi-band assignment.""" return True
[docs] def assign(self, path: list[Any], request: Any) -> dict[str, Any] | None: """ Assign spectrum resources along the given path for the request. :param path: List of nodes representing the path :param request: Request object containing traffic demand and spectrum requirements :return: Dictionary containing spectrum assignment details or None if assignment fails :raises ValueError: If path is empty or request is invalid """ # Validate inputs if not path: raise ValueError("Path cannot be empty") if request is None: raise ValueError("Request cannot be None") # Store path and request info self.spectrum_props.path_list = path # Get slots needed for the request if hasattr(request, "slots_needed"): self.spectrum_props.slots_needed = request.slots_needed elif hasattr(request, "bandwidth"): # Calculate slots needed based on bandwidth self.spectrum_props.slots_needed = self._calculate_slots_needed(request.bandwidth) else: self.spectrum_props.slots_needed = 1 # Reset assignment state self.spectrum_props.is_free = False self.spectrum_props.start_slot = None self.spectrum_props.end_slot = None self.spectrum_props.core_number = None self.spectrum_props.current_band = None # Try first fit allocation success = self._find_first_fit() if success: self._assignments_made += 1 self._total_slots_assigned += self.spectrum_props.slots_needed return { "start_slot": self.spectrum_props.start_slot, "end_slot": self.spectrum_props.end_slot, "core_number": self.spectrum_props.core_number, "band": self.spectrum_props.current_band, "is_free": self.spectrum_props.is_free, "slots_needed": self.spectrum_props.slots_needed, } return None
def _calculate_slots_needed(self, bandwidth: float) -> int: """ Calculate number of slots needed for given bandwidth. :param bandwidth: Required bandwidth in bps :return: Number of spectrum slots required :note: This is a simplified calculation. In practice, this would depend on modulation format, guard bands, forward error correction overhead, etc. """ # This is a simplified calculation - in reality this would depend on # modulation format, guard bands, etc. slots_per_gbps = self.engine_props.get("slots_per_gbps", 1) return int(np.ceil(bandwidth * slots_per_gbps)) def _find_first_fit(self) -> bool: """ Find first available spectrum slots using first fit strategy. :return: True if spectrum assignment successful, False otherwise Updates: spectrum_props: Updates start_slot, end_slot, core_number, current_band, is_free """ # Set up cores and bands to check core_matrix = [] if self.spectrum_props.forced_core is not None: core_list = [self.spectrum_props.forced_core] else: core_list = list(range(0, self.engine_props.get("cores_per_link", 1))) if self.spectrum_props.forced_band is not None: band_list = [self.spectrum_props.forced_band] else: band_list = self.engine_props.get("band_list", ["c"]) # Build core matrix for spectrum search for curr_core in core_list: band_cores = [] for band in band_list: cores_matrix = getattr(self.spectrum_props, "cores_matrix", None) if cores_matrix is not None and band in cores_matrix: band_cores.append(cores_matrix[band][curr_core]) if band_cores: core_matrix.append(band_cores) # Search through cores and bands for first fit for core_index, core_bands in enumerate(core_matrix): for band_index, core_array in enumerate(core_bands): band = band_list[band_index] core_num = core_list[core_index] # Find contiguous free slots if self._find_contiguous_slots(core_array, core_num, band): return True # If we have path information, try direct link search path_list = getattr(self.spectrum_props, "path_list", None) if path_list is not None and len(path_list) > 1: return self._find_first_fit_on_path() return False def _find_contiguous_slots(self, core_array: Any, core_num: int, band: str) -> bool: """ Find contiguous free slots in a core array. :param core_array: Array representing spectrum usage in a core :param core_num: Core number being checked :param band: Frequency band being checked :return: True if contiguous slots found and assigned, False otherwise """ if not hasattr(core_array, "__len__"): return False # Find all free slots (value 0) free_slots = np.where(np.array(core_array) == 0)[0] slots_needed = self.spectrum_props.slots_needed if slots_needed is None: raise ValueError("slots_needed must not be None") if len(free_slots) < slots_needed: return False # Look for contiguous slots for i in range(len(free_slots) - slots_needed + 1): start_slot = free_slots[i] required_slots = list(range(start_slot, start_slot + slots_needed)) # Check if all required slots are available if all(slot in free_slots for slot in required_slots): # Check continuity constraint is_contiguous = True for j in range(len(required_slots) - 1): if required_slots[j + 1] - required_slots[j] != 1: is_contiguous = False break if is_contiguous: self.spectrum_props.start_slot = start_slot self.spectrum_props.end_slot = start_slot + slots_needed - 1 self.spectrum_props.core_number = core_num self.spectrum_props.current_band = band self.spectrum_props.is_free = True return True return False def _find_first_fit_on_path(self) -> bool: """Find first fit spectrum assignment along the entire path.""" # Check all links in the path for spectrum availability path_list = self.spectrum_props.path_list if path_list is None: raise ValueError("path_list must not be None") for i in range(len(path_list) - 1): source = path_list[i] dest = path_list[i + 1] link_key = (source, dest) network_dict = self.sdn_props.network_spectrum_dict if network_dict is None: return False if link_key not in network_dict: return False link_dict = network_dict[link_key] # Try to find assignment for this link if self._try_cores_and_bands_for_link(link_dict): return True return False def _try_cores_and_bands_for_link(self, link_dict: dict) -> bool: """Try each core and band combination for a given link.""" for core_num in range(self.engine_props.get("cores_per_link", 1)): for band in self.engine_props.get("band_list", ["c"]): if self._try_assignment_for_core_band(link_dict, core_num, band): return True return False def _try_assignment_for_core_band(self, link_dict: dict, core_num: int, band: str) -> bool: """Try spectrum assignment for specific core and band.""" if band not in link_dict["cores_matrix"]: return False core_array = link_dict["cores_matrix"][band][core_num] if not self._find_contiguous_slots(core_array, core_num, band): return False # Verify this assignment works for entire path start_slot = self.spectrum_props.start_slot end_slot = self.spectrum_props.end_slot if start_slot is None or end_slot is None: raise ValueError("start_slot and end_slot must not be None") return self._verify_path_assignment(start_slot, end_slot, core_num, band) def _verify_path_assignment(self, start_slot: int, end_slot: int, core_num: int, band: str) -> bool: """Verify spectrum assignment is available along entire path.""" path_list = self.spectrum_props.path_list if path_list is None: raise ValueError("path_list must not be None") for i in range(len(path_list) - 1): source = path_list[i] dest = path_list[i + 1] if not self.check_spectrum_availability([source, dest], start_slot, end_slot, core_num, band): return False return True
[docs] def check_spectrum_availability(self, path: list[Any], start_slot: int, end_slot: int, core_num: int, band: str) -> bool: """ Check if spectrum slots are available along the entire path. :param path: List of nodes representing the path :param start_slot: Starting slot index :param end_slot: Ending slot index :param core_num: Core number to check :param band: Frequency band to check :return: True if all slots are available along the path, False otherwise """ for i in range(len(path) - 1): source, dest = path[i], path[i + 1] link_key = (source, dest) network_dict = self.sdn_props.network_spectrum_dict if network_dict is None: return False if link_key not in network_dict: return False link_dict = network_dict[link_key] if band not in link_dict["cores_matrix"] or core_num >= len(link_dict["cores_matrix"][band]): return False core_array = link_dict["cores_matrix"][band][core_num] # Check if all required slots are free for slot in range(start_slot, end_slot + 1): if slot >= len(core_array) or core_array[slot] != 0: return False return True
[docs] def allocate_spectrum( self, path: list[Any], start_slot: int, end_slot: int, core_num: int, band: str, request_id: Any, ) -> bool: """ Allocate spectrum resources along the path. :param path: List of nodes representing the path :param start_slot: Starting slot index to allocate :param end_slot: Ending slot index to allocate :param core_num: Core number to allocate on :param band: Frequency band to allocate in :param request_id: Unique identifier for the request :return: True if allocation successful, False otherwise """ for i in range(len(path) - 1): source, dest = path[i], path[i + 1] link_key = (source, dest) network_dict = self.sdn_props.network_spectrum_dict if network_dict is None: return False if link_key not in network_dict: return False link_dict = network_dict[link_key] core_array = link_dict["cores_matrix"][band][core_num] # Allocate slots with request ID for slot in range(start_slot, end_slot + 1): core_array[slot] = request_id return True
[docs] def deallocate_spectrum(self, path: list[Any], start_slot: int, end_slot: int, core_num: int, band: str) -> bool: """Deallocate spectrum resources along the path.""" for i in range(len(path) - 1): source, dest = path[i], path[i + 1] link_key = (source, dest) network_dict = self.sdn_props.network_spectrum_dict if network_dict is None: return False if link_key not in network_dict: return False link_dict = network_dict[link_key] core_array = link_dict["cores_matrix"][band][core_num] # Free slots by setting to 0 for slot in range(start_slot, end_slot + 1): core_array[slot] = 0 return True
[docs] def get_fragmentation_metric(self, path: list[Any]) -> float: """Calculate fragmentation metric for the given path.""" total_fragmentation = 0.0 link_count = 0 for i in range(len(path) - 1): source, dest = path[i], path[i + 1] link_key = (source, dest) network_dict = self.sdn_props.network_spectrum_dict if network_dict is not None and link_key in network_dict: link_dict = network_dict[link_key] link_fragmentation = self._calculate_link_fragmentation(link_dict) total_fragmentation += link_fragmentation link_count += 1 return total_fragmentation / link_count if link_count > 0 else 0.0
def _calculate_link_fragmentation(self, link_dict: dict) -> float: """Calculate fragmentation for a single link.""" total_free_blocks = 0 total_free_slots = 0 for band in link_dict["cores_matrix"]: for core_array in link_dict["cores_matrix"][band]: blocks, free_slots = self._analyze_core_array(core_array) total_free_blocks += blocks total_free_slots += free_slots # Fragmentation = 1 - (largest_block_size / total_free_slots) # Simplified: use number of blocks as fragmentation indicator if total_free_slots == 0: return 0.0 return 1.0 - (1.0 / total_free_blocks) if total_free_blocks > 0 else 0.0 def _analyze_core_array(self, core_array: list) -> tuple[int, int]: """Analyze core array and return free blocks and total free slots.""" free_slots = np.where(np.array(core_array) == 0)[0] if len(free_slots) == 0: return 0, 0 # Count contiguous blocks blocks = self._count_contiguous_blocks(core_array) return blocks, len(free_slots) def _count_contiguous_blocks(self, core_array: list) -> int: """Count the number of contiguous free blocks in a core array.""" blocks = 0 in_block = False for slot in core_array: if slot == 0: # Free slot if not in_block: blocks += 1 in_block = True else: # Occupied slot in_block = False return blocks
[docs] def get_metrics(self) -> dict[str, Any]: """Get spectrum assignment algorithm performance metrics.""" avg_slots = self._total_slots_assigned / self._assignments_made if self._assignments_made > 0 else 0 return { "algorithm": self.algorithm_name, "assignments_made": self._assignments_made, "total_slots_assigned": self._total_slots_assigned, "average_slots_per_assignment": avg_slots, "supports_multiband": self.supports_multiband, }
[docs] def reset(self) -> None: """Reset the spectrum assignment algorithm state.""" self._assignments_made = 0 self._total_slots_assigned = 0 self.spectrum_props = SpectrumProps()