Source code for fusion.core.sdn_controller

"""
Software-defined network controller module for managing network requests.

This module provides the main SDN controller functionality for routing and spectrum
allocation in software-defined optical networks.
"""

import copy
import time
from typing import Any

import numpy as np

from fusion.core.grooming import Grooming
from fusion.core.properties import SDNProps
from fusion.core.routing import Routing
from fusion.core.snr_measurements import SnrMeasurements
from fusion.core.spectrum_assignment import SpectrumAssignment
from fusion.modules.ml import get_ml_obs
from fusion.modules.spectrum.light_path_slicing import LightPathSlicingManager
from fusion.utils.logging_config import get_logger

logger = get_logger(__name__)


[docs] class SDNController: """ Software-defined network controller for managing network requests. This class provides functionality for routing, spectrum allocation, and resource management in software-defined optical networks. It handles request allocation, release, and various slicing strategies. :param engine_props: Engine configuration properties :type engine_props: dict[str, Any] """
[docs] def __init__(self, engine_props: dict[str, Any]) -> None: self.engine_props = engine_props self.sdn_props = SDNProps() self.ai_obj = None self.route_obj = Routing(engine_props=self.engine_props, sdn_props=self.sdn_props) self.spectrum_obj = SpectrumAssignment( engine_props=self.engine_props, sdn_props=self.sdn_props, route_props=self.route_obj.route_props, ) self.slicing_manager = LightPathSlicingManager( engine_props=self.engine_props, sdn_props=self.sdn_props, spectrum_obj=self.spectrum_obj, ) self.grooming_obj = Grooming(engine_props=self.engine_props, sdn_props=self.sdn_props) self.stats = None # FailureManager reference for path feasibility checking # (set by SimulationEngine) self.failure_manager: Any | None = None
def _update_throughput(self) -> None: """ Update throughput statistics for the current request. Should be called once per request release, before releasing individual lightpaths. This prevents overcounting throughput when multiple lightpaths are used for a single groomed request. """ try: if ( self.sdn_props.depart is None or self.sdn_props.arrive is None or self.sdn_props.bandwidth is None or self.sdn_props.path_list is None or self.sdn_props.network_spectrum_dict is None ): logger.warning("Missing data for throughput calculation") return duration = self.sdn_props.depart - self.sdn_props.arrive # seconds bandwidth = int(self.sdn_props.bandwidth) # Gbps data_transferred = bandwidth * duration # Gbps·s for source, dest in zip(self.sdn_props.path_list, self.sdn_props.path_list[1:], strict=False): self.sdn_props.network_spectrum_dict[(source, dest)]["throughput"] += data_transferred self.sdn_props.network_spectrum_dict[(dest, source)]["throughput"] += data_transferred except (TypeError, ValueError) as e: logger.warning("Throughput update skipped: %s", e)
[docs] def release(self, lightpath_id: int | None = None, slicing_flag: bool = False, skip_validation: bool = False) -> None: """ Remove a previously allocated request from the network. :param lightpath_id: Specific lightpath ID to release (for grooming) :type lightpath_id: int | None :param slicing_flag: If True, only release spectrum, not transponders :type slicing_flag: bool """ if lightpath_id is None: raise ValueError("Lightpath ID is none") if self.sdn_props.path_list is None: raise ValueError("Path list is not initialized") for source, dest in zip(self.sdn_props.path_list, self.sdn_props.path_list[1:], strict=False): for band in self.engine_props["band_list"]: for core_num in range(self.engine_props["cores_per_link"]): if self.sdn_props.network_spectrum_dict is None: continue core_array = self.sdn_props.network_spectrum_dict[(source, dest)]["cores_matrix"][band][core_num] request_id_indices = np.where(core_array == lightpath_id) guard_band_indices = np.where(core_array == (lightpath_id * -1)) for request_index in request_id_indices[0]: self.sdn_props.network_spectrum_dict[(source, dest)]["cores_matrix"][band][core_num][request_index] = 0 self.sdn_props.network_spectrum_dict[(dest, source)]["cores_matrix"][band][core_num][request_index] = 0 for guard_band_index in guard_band_indices[0]: self.sdn_props.network_spectrum_dict[(source, dest)]["cores_matrix"][band][core_num][guard_band_index] = 0 self.sdn_props.network_spectrum_dict[(dest, source)]["cores_matrix"][band][core_num][guard_band_index] = 0 # Handle grooming-specific cleanup and dynamic slicing bandwidth tracking if lightpath_id is not None: self._release_lightpath_resources(lightpath_id, skip_validation=skip_validation)
def _release_lightpath_resources(self, lightpath_id: int, skip_validation: bool = False) -> None: """ Release transponders and update lightpath status dict. :param lightpath_id: ID of lightpath to release :type lightpath_id: int :param skip_validation: Skip validation checks (for rollback scenarios) :type skip_validation: bool """ # Early return only if path_list or lightpath_status_dict are None # (transponder_usage_dict is optional and only needed for transponder tracking) if self.sdn_props.path_list is None or self.sdn_props.lightpath_status_dict is None: return # Always update transponders # for node in [self.sdn_props.source, self.sdn_props.destination]: # if node not in self.sdn_props.transponder_usage_dict: # logger.warning("Node %s not in transponder usage dict", node) # continue # self.sdn_props.transponder_usage_dict[node]["available_transponder"] += 1 # Update transponders if tracking is enabled and dict is available if self.engine_props.get("transponder_usage_per_node", None) and self.sdn_props.transponder_usage_dict is not None: for node in [self.sdn_props.source, self.sdn_props.destination]: if node not in self.sdn_props.transponder_usage_dict: raise KeyError(f"Node '{node}' not found in transponder usage dictionary.") self.sdn_props.transponder_usage_dict[node]["available_transponder"] += 1 light_id = tuple(sorted([self.sdn_props.path_list[0], self.sdn_props.path_list[-1]])) # Check if light_id exists if light_id not in self.sdn_props.lightpath_status_dict: return # Handle lightpath status dict if light_id in self.sdn_props.lightpath_status_dict and lightpath_id in self.sdn_props.lightpath_status_dict[light_id]: # Calculate bandwidth utilization stats try: if self.sdn_props.lp_bw_utilization_dict is None: return lp_status = self.sdn_props.lightpath_status_dict[light_id][lightpath_id] average_bw_usage = 0.0 # Note: average_bandwidth_usage may not exist yet # Skip if not available try: from fusion.utils.network import ( # type: ignore[attr-defined] average_bandwidth_usage, ) if self.sdn_props.depart is not None: average_bw_usage = average_bandwidth_usage( bw_dict=lp_status["time_bw_usage"], departure_time=self.sdn_props.depart, ) except (ImportError, AttributeError): pass # Only record utilization for normal releases, not SNR rollbacks # (rolled-back lightpaths never served traffic, so no utilization stats) if not skip_validation: self.sdn_props.lp_bw_utilization_dict.update( { lightpath_id: { "band": lp_status["band"], "core": lp_status["core"], "bit_rate": lp_status["lightpath_bandwidth"], "utilization": average_bw_usage, } } ) except (TypeError, ValueError, KeyError) as e: logger.warning("Average BW update skipped: %s", e) # Grooming validation - ensure no active requests (skip during rollback) if ( not skip_validation and self.sdn_props.lightpath_status_dict[light_id][lightpath_id]["requests_dict"] and self.engine_props["is_grooming_enabled"] ): raise ValueError(f"Lightpath {lightpath_id} still has active requests") # Remove from status dict self.sdn_props.lightpath_status_dict[light_id].pop(lightpath_id) def _allocate_guard_band( self, band: str, core_matrix: dict[str, Any], reverse_core_matrix: dict[str, Any], core_num: int, end_slot: int, lightpath_id: int, ) -> None: """ Allocate guard band slots for spectrum isolation. :param band: Spectral band identifier :type band: str :param core_matrix: Core matrix for forward direction :type core_matrix: list[Any] :param reverse_core_matrix: Core matrix for reverse direction :type reverse_core_matrix: list[Any] :param core_num: Core number to allocate on :type core_num: int :param end_slot: End slot position for guard band :type end_slot: int :param lightpath_id: Lightpath ID to use for guard band marking :type lightpath_id: int :raises BufferError: If attempting to allocate already taken spectrum """ if core_matrix[band][core_num][end_slot] != 0.0 or reverse_core_matrix[band][core_num][end_slot] != 0.0: raise BufferError("Attempted to allocate a taken spectrum.") # Use lightpath_id with negative sign for guard bands core_matrix[band][core_num][end_slot] = lightpath_id * -1 reverse_core_matrix[band][core_num][end_slot] = lightpath_id * -1 def _allocate_on_path(self, path: list[int]) -> None: """ Allocate spectrum on a specific path (bidirectionally). This helper method contains the core allocation logic for a single path. It validates spectrum availability, updates usage counts, allocates slots on both forward and reverse links, and handles guard bands. :param path: List of node IDs representing the path :type path: list[int] :raises BufferError: If attempting to allocate already taken spectrum :raises ValueError: If no spectrum is detected during allocation """ # Get allocation parameters from spectrum_props start_slot = self.spectrum_obj.spectrum_props.start_slot end_slot = self.spectrum_obj.spectrum_props.end_slot core_num = self.spectrum_obj.spectrum_props.core_number band = self.spectrum_obj.spectrum_props.current_band lightpath_id = self.spectrum_obj.spectrum_props.lightpath_id # Lightpath ID must always be set before allocation if lightpath_id is None: raise ValueError( "lightpath_id must be set in spectrum_props before calling allocate(). " "This is a programming error - ensure get_lightpath_id() is called " "and assigned to spectrum_props.lightpath_id before allocation." ) # Validate all required parameters are present if start_slot is None or end_slot is None or core_num is None or band is None: raise ValueError("Missing required spectrum allocation parameters") if self.sdn_props.network_spectrum_dict is None: raise ValueError("Network spectrum dictionary is None") logger.debug(f"Attempting allocation on path {path}: slots [{start_slot}:{end_slot}], band={band}, core={core_num}") # Guard slot adjustment if self.engine_props["guard_slots"] != 0: end_slot = end_slot - 1 else: end_slot = end_slot + 1 # Allocate on each link in the path for link_tuple in zip(path, path[1:], strict=False): link_dict = self.sdn_props.network_spectrum_dict[(link_tuple[0], link_tuple[1])] reverse_link_dict = self.sdn_props.network_spectrum_dict[(link_tuple[1], link_tuple[0])] # Validate spectrum is free on both directions spectrum_slots_set = set(link_dict["cores_matrix"][band][core_num][start_slot:end_slot]) reverse_spectrum_slots_set = set(reverse_link_dict["cores_matrix"][band][core_num][start_slot:end_slot]) if spectrum_slots_set == {} or reverse_spectrum_slots_set == {}: raise ValueError("Nothing detected on the spectrum when allocating.") if spectrum_slots_set != {0.0} or reverse_spectrum_slots_set != {0.0}: logger.error( f"Spectrum already taken on link {link_tuple}: " f"forward={spectrum_slots_set}, reverse={reverse_spectrum_slots_set}. " f"Path: {path}, slots [{start_slot}:{end_slot}], band={band}, core={core_num}" ) raise BufferError("Attempted to allocate a taken spectrum.") # Update usage counts if link_tuple in self.sdn_props.network_spectrum_dict: self.sdn_props.network_spectrum_dict[link_tuple]["usage_count"] += 1 self.sdn_props.network_spectrum_dict[(link_tuple[1], link_tuple[0])]["usage_count"] += 1 # Allocate spectrum on both directions core_matrix = link_dict["cores_matrix"] reverse_core_matrix = reverse_link_dict["cores_matrix"] # Use lightpath_id instead of request_id core_matrix[band][core_num][start_slot:end_slot] = lightpath_id reverse_core_matrix[band][core_num][start_slot:end_slot] = lightpath_id # Handle guard bands if self.engine_props["guard_slots"]: self._allocate_guard_band( band=band, core_matrix=core_matrix, reverse_core_matrix=reverse_core_matrix, core_num=core_num, end_slot=end_slot, lightpath_id=lightpath_id, )
[docs] def allocate(self) -> None: """ Allocate spectrum resources for a network request. Assigns spectrum slots to the current request across all links in the path, including guard bands if configured. For 1+1 protected requests, allocates on both primary and backup paths. :raises BufferError: If attempting to allocate already taken spectrum :raises ValueError: If no spectrum is detected during allocation """ if ( self.spectrum_obj.spectrum_props.start_slot is None or self.spectrum_obj.spectrum_props.end_slot is None or self.spectrum_obj.spectrum_props.core_number is None or self.spectrum_obj.spectrum_props.current_band is None or self.sdn_props.path_list is None or self.sdn_props.network_spectrum_dict is None ): raise ValueError("Missing required spectrum or path information") # Allocate on primary path self._allocate_on_path(self.sdn_props.path_list) # If protected, also allocate on backup path backup_path = self.spectrum_obj.spectrum_props.backup_path if backup_path is not None: logger.debug(f"1+1 protection: Allocating spectrum on backup path {backup_path}") self._allocate_on_path(backup_path) # Track allocated request for failure handling self._track_allocated_request()
def _track_allocated_request(self) -> None: """ Track an allocated request for failure handling. Records request information including paths, protection status, and timing to enable proper handling when failures occur. """ if self.sdn_props.request_id is None: return # Get backup path if this is a protected request backup_path = self.spectrum_obj.spectrum_props.backup_path # Store request information self.sdn_props.allocated_requests[self.sdn_props.request_id] = { "request_id": self.sdn_props.request_id, "primary_path": self.sdn_props.path_list.copy() if self.sdn_props.path_list else None, "backup_path": backup_path.copy() if backup_path else None, "is_protected": self.sdn_props.is_protected, "active_path": self.sdn_props.active_path, "arrive_time": self.sdn_props.arrive, "depart_time": self.sdn_props.depart, "bandwidth": self.sdn_props.bandwidth, "source": self.sdn_props.source, "destination": self.sdn_props.destination, "start_slot": self.spectrum_obj.spectrum_props.start_slot, "end_slot": self.spectrum_obj.spectrum_props.end_slot, "core_number": self.spectrum_obj.spectrum_props.core_number, "current_band": self.spectrum_obj.spectrum_props.current_band, } logger.debug( f"Tracked allocated request {self.sdn_props.request_id}: " f"protected={self.sdn_props.is_protected}, " f"primary={self.sdn_props.path_list}, backup={backup_path}" ) def _update_request_statistics(self, bandwidth: float | None) -> None: """ Update request statistics with allocation results. :param bandwidth: Allocated bandwidth for the request :type bandwidth: str """ if bandwidth is not None: self.sdn_props.bandwidth_list.append(bandwidth) for stat_key in self.sdn_props.stat_key_list: # Skip remaining_bw - it's tracked separately for grooming if stat_key == "remaining_bw": continue # Map stat_key to corresponding spectrum_props attribute spectrum_key = stat_key.split("_", maxsplit=1)[0] if spectrum_key == "crosstalk": spectrum_key = "crosstalk_cost" elif spectrum_key == "snr": spectrum_key = "crosstalk_cost" # SNR cost is same as crosstalk_cost elif spectrum_key == "xt": spectrum_key = "crosstalk_cost" # XT cost is same as crosstalk_cost elif spectrum_key == "core": spectrum_key = "core_number" elif spectrum_key == "band": spectrum_key = "current_band" elif spectrum_key == "start": spectrum_key = "start_slot" elif spectrum_key == "end": spectrum_key = "end_slot" elif spectrum_key == "modulation": spectrum_key = "modulation" elif stat_key == "lightpath_id_list": spectrum_key = "lightpath_id" elif stat_key == "lightpath_bandwidth_list": spectrum_key = "lightpath_bandwidth" self.sdn_props.update_params( key=stat_key, spectrum_key=spectrum_key, spectrum_obj=self.spectrum_obj.spectrum_props, ) # Backward compatibility alias for tests def _update_req_stats(self, bandwidth: float | None = None, remaining: str | None = None) -> None: """Legacy method name for _update_request_statistics.""" # Note: remaining parameter is accepted for compatibility but not currently used self._update_request_statistics(bandwidth) def _update_grooming_stats(self) -> None: """Update grooming statistics after request processing.""" if not hasattr(self, "stats") or self.stats is None: return if not hasattr(self.stats, "grooming_stats") or self.stats.grooming_stats is None: return # Determine grooming outcome was_groomed = getattr(self.sdn_props, "was_groomed", False) or False was_partially_groomed = getattr(self.sdn_props, "was_partially_groomed", False) # Get bandwidth bandwidth = float(self.sdn_props.bandwidth) if self.sdn_props.bandwidth else 0.0 # Count new lightpaths established was_new_lps = getattr(self.sdn_props, "was_new_lp_established", []) new_lightpaths = len(was_new_lps) if isinstance(was_new_lps, list) else 0 self.stats.grooming_stats.update_grooming_outcome( was_groomed=was_groomed, was_partially_groomed=was_partially_groomed, bandwidth=bandwidth, new_lightpaths=new_lightpaths, ) def _handle_slicing_request( self, path_list: list[Any], path_index: int, forced_segments: int, force_slicing: bool, ) -> bool: """ Handle slicing request using the dedicated slicing manager. :param path_list: List of nodes in the routing path :type path_list: list[Any] :param path_index: Index of the current path being processed :type path_index: int :param forced_segments: Number of segments to force (-1 for auto) :type forced_segments: int :param force_slicing: Whether slicing is forced :type force_slicing: bool :return: True if slicing was successful :rtype: bool """ if self.engine_props["dynamic_lps"]: return self.slicing_manager.handle_dynamic_slicing_direct( path_list=path_list, path_index=path_index, forced_segments=forced_segments, sdn_controller=self, ) return self.slicing_manager.handle_static_slicing_direct(path_list=path_list, forced_segments=forced_segments, sdn_controller=self) def _check_snr_after_allocation(self, lightpath_id: int) -> bool: """ Recheck SNR after spectrum allocation (for grooming). :param lightpath_id: ID of newly allocated lightpath :type lightpath_id: int :return: True if SNR is acceptable, False otherwise :rtype: bool """ if not self.engine_props.get("snr_recheck", False): return True # Build lightpath info for SNR recheck new_lp_info = { "id": lightpath_id, "path": self.sdn_props.path_list, "spectrum": ( self.spectrum_obj.spectrum_props.start_slot, self.spectrum_obj.spectrum_props.end_slot, ), "core": self.spectrum_obj.spectrum_props.core_number, "band": self.spectrum_obj.spectrum_props.current_band, "mod_format": self.spectrum_obj.spectrum_props.modulation, } # Create SNR checker instance snr_checker = SnrMeasurements( engine_props_dict=self.engine_props, sdn_props=self.sdn_props, spectrum_props=copy.deepcopy(self.spectrum_obj.spectrum_props), route_props=self.route_obj.route_props, ) # Perform SNR recheck recheck_enable, violations = snr_checker.snr_recheck_after_allocation(new_lp_info) if recheck_enable: return True # SNR recheck failed - rollback the allocation logger.warning(f"SNR recheck failed for lightpath {lightpath_id} - rolling back allocation. Violations: {violations}") # Release the lightpath from the network (skip validation for rollback) self.release(lightpath_id=lightpath_id, slicing_flag=True, skip_validation=True) # Remove from tracking lists if lightpath_id in self.sdn_props.lightpath_id_list: idx = self.sdn_props.lightpath_id_list.index(lightpath_id) # Only restore remaining bandwidth for sliced/partial requests (v5 behavior) # For non-sliced requests, remaining_bw should stay None/0 since no bandwidth was served if self.sdn_props.is_sliced or self.sdn_props.was_partially_groomed: bw_to_restore = int(float(self.sdn_props.bandwidth_list[idx])) if self.sdn_props.remaining_bw is None or self.sdn_props.remaining_bw == 0: self.sdn_props.remaining_bw = bw_to_restore else: # Handle remaining_bw being str/float type current_remaining = int(float(self.sdn_props.remaining_bw)) self.sdn_props.remaining_bw = current_remaining + bw_to_restore # Remove from all tracking lists (with safety check for list length) tracking_lists: list[list] = [ self.sdn_props.lightpath_id_list, self.sdn_props.lightpath_bandwidth_list, self.sdn_props.start_slot_list, self.sdn_props.band_list, self.sdn_props.core_list, self.sdn_props.end_slot_list, self.sdn_props.xt_list, self.sdn_props.crosstalk_list, self.sdn_props.snr_list, self.sdn_props.bandwidth_list, self.sdn_props.modulation_list, ] for tracking_list in tracking_lists: if idx < len(tracking_list): tracking_list.pop(idx) # Remove from new lightpath list if lightpath_id in self.sdn_props.was_new_lp_established: self.sdn_props.was_new_lp_established.remove(lightpath_id) # Mark request as blocked self.sdn_props.was_routed = False self.sdn_props.block_reason = "snr_recheck_failed" return False def _handle_congestion_with_grooming(self, remaining_bw: int) -> None: """ Handle allocation failure with grooming rollback. If partial grooming occurred but remaining bandwidth cannot be allocated, rollback the newly created lightpaths but keep the groomed portion. :param remaining_bw: Remaining bandwidth that could not be allocated :type remaining_bw: int """ if self.sdn_props.bandwidth is not None and remaining_bw != int(self.sdn_props.bandwidth): # Rollback newly established lightpaths (skip_validation since never served traffic) was_new_lps = getattr(self.sdn_props, "was_new_lp_established", []) if isinstance(was_new_lps, list): for lpid in list(was_new_lps): self.release(lightpath_id=lpid, slicing_flag=True, skip_validation=True) was_new_lps.remove(lpid) # Remove from tracking lists lp_idx = self.sdn_props.lightpath_id_list.index(lpid) self.sdn_props.lightpath_id_list.pop(lp_idx) self.sdn_props.lightpath_bandwidth_list.pop(lp_idx) self.sdn_props.start_slot_list.pop(lp_idx) self.sdn_props.band_list.pop(lp_idx) self.sdn_props.core_list.pop(lp_idx) self.sdn_props.end_slot_list.pop(lp_idx) self.sdn_props.xt_list.pop(lp_idx) self.sdn_props.crosstalk_list.pop(lp_idx) self.sdn_props.snr_list.pop(lp_idx) self.sdn_props.bandwidth_list.pop(lp_idx) self.sdn_props.modulation_list.pop(lp_idx) self.sdn_props.number_of_transponders = 1 self.sdn_props.is_sliced = False self.sdn_props.was_partially_routed = False if getattr(self.sdn_props, "was_partially_groomed", False): if self.sdn_props.bandwidth is not None: self.sdn_props.remaining_bw = int(self.sdn_props.bandwidth) - sum(map(int, self.sdn_props.bandwidth_list)) else: if self.sdn_props.bandwidth is not None: self.sdn_props.remaining_bw = int(self.sdn_props.bandwidth) self.sdn_props.was_new_lp_established = [] def _handle_congestion(self, remaining_bandwidth: int | None = None, remaining_bw: int | None = None) -> None: """ Handle allocation failure due to network congestion. :param remaining_bandwidth: Remaining bandwidth that could not be allocated :type remaining_bandwidth: int :param remaining_bw: Legacy parameter name for remaining_bandwidth :type remaining_bw: int """ # Handle backward compatibility if remaining_bw is not None: remaining_bandwidth = remaining_bw if remaining_bandwidth is None: raise ValueError("Must provide remaining_bandwidth") self.sdn_props.was_routed = False self.sdn_props.block_reason = "congestion" # Reset to 0 (not 1) - when congestion happens, no transponders are committed # Previously = 1 caused over-counting when slicing recovered from SNR recheck failures self.sdn_props.number_of_transponders = 0 if self.sdn_props.bandwidth is not None and remaining_bandwidth != int(self.sdn_props.bandwidth): # Release all newly established lightpaths for this request (skip_validation since never served traffic) was_new_lps = getattr(self.sdn_props, "was_new_lp_established", []) if isinstance(was_new_lps, list) and self.sdn_props.path_list: for lpid in list(was_new_lps): # Remove current request from lightpath's requests_dict before releasing light_id = tuple(sorted([self.sdn_props.path_list[0], self.sdn_props.path_list[-1]])) if ( self.sdn_props.lightpath_status_dict is not None and light_id in self.sdn_props.lightpath_status_dict and lpid in self.sdn_props.lightpath_status_dict[light_id] ): lp_info = self.sdn_props.lightpath_status_dict[light_id][lpid] if self.sdn_props.request_id in lp_info["requests_dict"]: # Restore remaining bandwidth before removing request allocated_bw = lp_info["requests_dict"][self.sdn_props.request_id] lp_info["remaining_bandwidth"] += allocated_bw lp_info["requests_dict"].pop(self.sdn_props.request_id) self.release(lightpath_id=lpid, slicing_flag=True, skip_validation=True) was_new_lps.remove(lpid) # Remove from tracking lists if lpid in self.sdn_props.lightpath_id_list: lp_idx = self.sdn_props.lightpath_id_list.index(lpid) self.sdn_props.lightpath_id_list.pop(lp_idx) self.sdn_props.lightpath_bandwidth_list.pop(lp_idx) self.sdn_props.start_slot_list.pop(lp_idx) self.sdn_props.band_list.pop(lp_idx) self.sdn_props.core_list.pop(lp_idx) self.sdn_props.end_slot_list.pop(lp_idx) self.sdn_props.xt_list.pop(lp_idx) self.sdn_props.crosstalk_list.pop(lp_idx) self.sdn_props.snr_list.pop(lp_idx) self.sdn_props.bandwidth_list.pop(lp_idx) self.sdn_props.modulation_list.pop(lp_idx) self.sdn_props.is_sliced = False self.sdn_props.was_partially_routed = False # Reset remaining_bw to match v5 behavior if self.sdn_props.bandwidth is not None: if self.sdn_props.was_partially_groomed: self.sdn_props.remaining_bw = int(self.sdn_props.bandwidth) - sum(map(int, self.sdn_props.bandwidth_list)) else: self.sdn_props.remaining_bw = int(self.sdn_props.bandwidth) self.sdn_props.was_new_lp_established = [] def _initialize_request_statistics(self) -> None: """Initialize request statistics for a new request.""" self.sdn_props.bandwidth_list = [] self.sdn_props.reset_params() def _setup_routing(self, force_route_matrix: list[Any] | None, force_mod_format: str | list[str] | None) -> tuple[list[Any], float]: """ Setup routing for the request. :param force_route_matrix: Optional forced routing matrix :type force_route_matrix: list[Any] | None :param force_mod_format: Optional forced modulation format (single or list) :type force_mod_format: str | list[str] | None :return: Tuple of (route_matrix, route_time) :rtype: tuple[list[Any], float] """ start_time = time.time() if force_route_matrix is None: self.route_obj.get_route() route_matrix = self.route_obj.route_props.paths_matrix else: route_matrix = force_route_matrix if force_mod_format: # Handle both string and list formats for force_mod_format formats_matrix: list[list[str | bool]] if isinstance(force_mod_format, list): formats_matrix = [force_mod_format] # type: ignore[list-item] else: formats_matrix = [[force_mod_format]] else: formats_matrix = [[]] self.route_obj.route_props.modulation_formats_matrix = formats_matrix # For partially groomed requests, use path_weight from groomed lightpath # This ensures new lightpaths on the same path get the correct weight if getattr(self.sdn_props, "was_partially_groomed", False) and self.sdn_props.path_weight is not None: self.route_obj.route_props.weights_list = [self.sdn_props.path_weight] else: self.route_obj.route_props.weights_list = [0.0] route_time = time.time() - start_time return route_matrix, route_time def _get_ml_prediction(self, ml_model: Any | None, request_dict: dict[str, Any]) -> float: """ Get ML model prediction for forced segments. :param ml_model: Optional machine learning model :type ml_model: Any | None :param request_dict: Request dictionary :type request_dict: dict[str, Any] :return: Forced segments prediction (-1 for auto) :rtype: float """ if ml_model is not None: input_df = get_ml_obs( request_dict=request_dict, engine_properties=self.engine_props, sdn_properties=self.sdn_props, ) prediction = ml_model.predict(input_df)[0] return float(prediction) return -1.0 def _process_single_path( self, path_list: list[Any], path_index: int, mod_format_list: list[str], forced_segments: float, force_slicing: bool, segment_slicing: bool, forced_index: int | None, force_core: int | None, forced_band: str | None, backup_mod_format_list: list[str] | None = None, ) -> bool: """ Process allocation for a single path. :param path_list: List of nodes in the routing path :type path_list: list[Any] :param path_index: Index of the current path :type path_index: int :param mod_format_list: List of modulation formats for primary path :type mod_format_list: list[str] :param forced_segments: Number of forced segments :type forced_segments: float :param force_slicing: Whether to force slicing :type force_slicing: bool :param segment_slicing: Whether segment slicing is enabled :type segment_slicing: bool :param forced_index: Optional forced spectrum index :type forced_index: int | None :param force_core: Optional forced core number :type force_core: int | None :param forced_band: Optional forced band :type forced_band: str | None :param backup_mod_format_list: Optional modulation formats for backup path (1+1) :type backup_mod_format_list: list[str] | None :return: True if path processing was successful :rtype: bool """ self.sdn_props.path_list = path_list self.sdn_props.path_index = path_index # Handle slicing scenarios # Match v5 behavior: set force_slicing=True when segment_slicing is enabled if segment_slicing or force_slicing or forced_segments > 1: force_slicing = True success = self._handle_slicing_request(path_list, path_index, int(forced_segments), force_slicing) if not success: self.sdn_props.number_of_transponders = 1 return False else: # Handle standard allocation self.spectrum_obj.spectrum_props.forced_index = forced_index self.spectrum_obj.spectrum_props.forced_core = force_core self.spectrum_obj.spectrum_props.path_list = path_list self.spectrum_obj.spectrum_props.forced_band = forced_band self.spectrum_obj.get_spectrum( mod_format_list=mod_format_list, backup_mod_format_list=backup_mod_format_list, ) if self.spectrum_obj.spectrum_props.is_free is not True: self.sdn_props.block_reason = "congestion" return False # Generate and assign unique lightpath ID for this allocation lp_id = self.sdn_props.get_lightpath_id() self.spectrum_obj.spectrum_props.lightpath_id = lp_id self.sdn_props.was_new_lp_established.append(lp_id) # Determine bandwidth for statistics and lightpath (handle partial grooming) allocate_bandwidth: float | None if self.sdn_props.was_partially_groomed: # For partial grooming, allocate_bandwidth is the actual bandwidth used for statistics allocate_bandwidth = float(self.sdn_props.remaining_bw) if self.sdn_props.remaining_bw else None else: allocate_bandwidth = self.sdn_props.bandwidth # Always use original request bandwidth for lightpath (matches v5 behavior) # This ensures lightpath is tracked in the correct bandwidth tier for modulation statistics lightpath_bandwidth = self.sdn_props.bandwidth # Set lightpath bandwidth only if not already set by slicing code if ( not hasattr(self.spectrum_obj.spectrum_props, "lightpath_bandwidth") or self.spectrum_obj.spectrum_props.lightpath_bandwidth is None ): self.spectrum_obj.spectrum_props.lightpath_bandwidth = lightpath_bandwidth self._update_request_statistics(bandwidth=allocate_bandwidth) # Create lightpath entry in status dict for grooming/tracking self.spectrum_obj._update_lightpath_status() return True def _finalize_successful_allocation( self, path_index: int, route_time: float, force_slicing: bool, segment_slicing: bool, ) -> bool: """ Finalize a successful allocation. :param path_index: Index of the successful path :type path_index: int :param route_time: Time taken for routing :type route_time: float :param force_slicing: Whether slicing was forced :type force_slicing: bool :param segment_slicing: Whether segment slicing was used :type segment_slicing: bool :return: True if finalization succeeded (including SNR recheck), False otherwise :rtype: bool """ self.sdn_props.was_routed = True self.sdn_props.route_time = route_time self.sdn_props.path_weight = self.route_obj.route_props.weights_list[path_index] self.sdn_props.spectrum_object = self.spectrum_obj.spectrum_props if not segment_slicing and not force_slicing: # Set is_sliced=True for partial grooming (v5 behavior) if self.sdn_props.was_partially_groomed: self.sdn_props.is_sliced = True self.sdn_props.remaining_bw = 0 # Request fully served else: self.sdn_props.is_sliced = False self.allocate() # Check SNR after allocation for newly created lightpaths # For sliced requests, SNR recheck is done in the slicing loop (v5 behavior) # to prevent orphaned allocations. Only do finalize SNR recheck for non-sliced. if not segment_slicing and not force_slicing: if self.sdn_props.was_new_lp_established: for lp_id in self.sdn_props.was_new_lp_established: if not self._check_snr_after_allocation(lp_id): # SNR recheck failed - allocation was rolled back return False # Update grooming statistics if self.engine_props.get("is_grooming_enabled", False): self._update_grooming_stats() return True
[docs] def handle_event( self, request_dict: dict[str, Any], request_type: str, force_slicing: bool = False, force_route_matrix: list[Any] | None = None, forced_index: int | None = None, force_core: int | None = None, ml_model: Any | None = None, force_mod_format: str | list[str] | None = None, forced_band: str | None = None, ) -> None: """ Handle any event that occurs in the simulation. Controls the main flow of request processing including routing, spectrum allocation, and various slicing strategies. :param request_dict: Request dictionary containing request parameters :type request_dict: dict[str, Any] :param request_type: Type of request ('arrival' or 'release') :type request_type: str :param force_slicing: Whether to force light path segment slicing :type force_slicing: bool :param force_route_matrix: Optional forced routing matrix :type force_route_matrix: list[Any] | None :param forced_index: Optional forced start index for spectrum allocation :type forced_index: int | None :param force_mod_format: Optional forced modulation format :type force_mod_format: str | None :param force_core: Optional forced core number :type force_core: int | None :param ml_model: Optional machine learning model for predictions :type ml_model: Any | None :param forced_band: Optional forced spectral band :type forced_band: str | None """ # Handle release requests if request_type == "release": lightpath_id_list: list[int | None] = [] if self.engine_props.get("is_grooming_enabled", False): groom_result = self.grooming_obj.handle_grooming(request_type) if isinstance(groom_result, list): # Convert list[int] to list[int | None] lightpath_id_list = list(groom_result) else: # Convert list[int] to list[int | None] lightpath_id_list = list(self.sdn_props.lightpath_id_list) # Update throughput once per request, then release each lightpath self._update_throughput() for lightpath_id in lightpath_id_list: self.release(lightpath_id=lightpath_id) return self._initialize_request_statistics() self.sdn_props.number_of_transponders = 1 # Try grooming first if enabled if self.engine_props.get("is_grooming_enabled", False): # Set lightpath status dict for grooming object if hasattr(self.grooming_obj, "lightpath_status_dict"): self.grooming_obj.lightpath_status_dict = self.sdn_props.lightpath_status_dict groom_result = self.grooming_obj.handle_grooming(request_type) if groom_result: # Fully groomed - done! self._update_grooming_stats() return # Not groomed or partially groomed self.sdn_props.was_new_lp_established = [] if getattr(self.sdn_props, "was_partially_groomed", False): # Force route on same path as groomed portion force_route_matrix = [self.sdn_props.path_list] # Get modulation formats for remaining bandwidth from fusion.utils.data import sort_nested_dict_values if self.sdn_props.modulation_formats_dict is not None: mod_formats_dict = sort_nested_dict_values( original_dict=self.sdn_props.modulation_formats_dict, nested_key="max_length", ) # Use all modulation formats sorted by max_length (v5 behavior) force_mod_format = list(mod_formats_dict.keys()) # Setup routing route_matrix, route_time = self._setup_routing(force_route_matrix, force_mod_format) # Get ML prediction if available forced_segments = self._get_ml_prediction(ml_model, request_dict) # Try allocation with different strategies segment_slicing = False while True: for path_index, path_list in enumerate(route_matrix): if path_list is not False: # Check path feasibility if failures are active if self.failure_manager and not self.failure_manager.is_path_feasible(path_list): logger.debug(f"Path {path_list} (index {path_index}) is infeasible due to active failures") continue # Skip this path and try next one # Check backup path feasibility for protected requests # Get corresponding backup path from backup_paths_matrix if available backup_path = None if ( hasattr(self.route_obj.route_props, "backup_paths_matrix") and len(self.route_obj.route_props.backup_paths_matrix) > path_index ): backup_path = self.route_obj.route_props.backup_paths_matrix[path_index] elif self.sdn_props.backup_path is not None: # Fallback to sdn_props.backup_path for backward compatibility backup_path = self.sdn_props.backup_path if self.failure_manager and backup_path is not None and not self.failure_manager.is_path_feasible(backup_path): logger.debug(f"Backup path {backup_path} (for primary {path_list}) is infeasible due to active failures") continue # Skip this path pair and try next one # Set backup path in sdn_props for spectrum assignment backup_mod_format_list = None if backup_path is not None: self.sdn_props.backup_path = backup_path self.sdn_props.is_protected = True # Get backup modulation formats for 1+1 protection validation if ( hasattr( self.route_obj.route_props, "backup_modulation_formats_matrix", ) and len(self.route_obj.route_props.backup_modulation_formats_matrix) > path_index ): backup_mod_format_list = self.route_obj.route_props.backup_modulation_formats_matrix[path_index] mod_format_list = self.route_obj.route_props.modulation_formats_matrix[path_index] # Match v5: set force_slicing=True when segment_slicing or forced_segments > 1 if segment_slicing or force_slicing or forced_segments > 1: force_slicing = True # Set path_weight for this path attempt BEFORE processing # This ensures any lightpaths created during this attempt (even if it fails # but is accepted via partial serving) will store the correct path_weight self.sdn_props.path_weight = self.route_obj.route_props.weights_list[path_index] # Process the path success = self._process_single_path( path_list, path_index, mod_format_list, # type: ignore[arg-type] forced_segments, force_slicing, segment_slicing, forced_index, force_core, forced_band, backup_mod_format_list, # type: ignore[arg-type] ) if success: # Try to finalize - includes SNR recheck finalize_success = self._finalize_successful_allocation(path_index, route_time, force_slicing, segment_slicing) if finalize_success: return # If finalize failed (SNR recheck), continue to next path # Try segment slicing if not already tried if self.engine_props["max_segments"] > 1 and self.sdn_props.bandwidth != "25" and not segment_slicing: segment_slicing = True continue # All paths exhausted self.sdn_props.block_reason = "distance" self.sdn_props.was_routed = False # FEATURE: Support partial serving (v5 behavior) # If can_partially_serve is enabled and SOME bandwidth was allocated, accept partial service if self.engine_props.get("can_partially_serve", False): # Check if any bandwidth was allocated (either through grooming or new lightpaths) has_allocated_bandwidth = ( len(self.sdn_props.lightpath_id_list) > 0 or len(getattr(self.sdn_props, "was_new_lp_established", [])) > 0 ) if has_allocated_bandwidth: # Check if this is a partially groomed request OR the last path attempt if ( getattr(self.sdn_props, "was_partially_groomed", False) or self.sdn_props.path_index >= self.engine_props.get("k_paths", 1) - 1 ): # Accept partial service - mark as successfully routed self.sdn_props.was_routed = True self.sdn_props.was_partially_routed = True self.sdn_props.is_sliced = True return # Success! # CRITICAL FIX: Release groomed bandwidth if request was partially groomed but blocked if getattr(self.sdn_props, "was_partially_groomed", False): if self.sdn_props.lightpath_id_list and self.sdn_props.source is not None and self.sdn_props.destination is not None: # Release groomed bandwidth from all lightpaths light_id = tuple(sorted([self.sdn_props.source, self.sdn_props.destination])) lp_status = self.sdn_props.lightpath_status_dict for lp_id in self.sdn_props.lightpath_id_list[:]: if lp_status is not None and light_id in lp_status: if lp_id in lp_status[light_id]: lp_info = lp_status[light_id][lp_id] if self.sdn_props.request_id in lp_info["requests_dict"]: # Get allocated bandwidth and release it req_bw = lp_info["requests_dict"][self.sdn_props.request_id] lp_info["requests_dict"].pop(self.sdn_props.request_id) lp_info["remaining_bandwidth"] += req_bw # Clear the lightpath tracking lists self.sdn_props.lightpath_id_list = [] self.sdn_props.lightpath_bandwidth_list = [] return
# Backward compatibility methods for tests def _allocate_slicing(self, num_segments: int, mod_format: str, path_list: list[Any], bandwidth: str) -> None: """ Backward compatibility wrapper for allocate_slicing method. :param num_segments: Number of segments to allocate :type num_segments: int :param mod_format: Modulation format to use :type mod_format: str :param path_list: List of nodes in the routing path :type path_list: list[Any] :param bandwidth: Bandwidth requirement for each segment :type bandwidth: str """ self.slicing_manager.allocate_slicing_direct( num_segments=num_segments, mod_format=mod_format, path_list=path_list, bandwidth=bandwidth, sdn_controller=self, ) def _handle_dynamic_slicing(self, path_list: list[Any], path_index: int, forced_segments: int) -> None: """ Backward compatibility wrapper for handle_dynamic_slicing method. :param path_list: List of nodes in the routing path :type path_list: list[Any] :param path_index: Index of the current path being processed :type path_index: int :param forced_segments: Number of forced segments (unused) :type forced_segments: int """ self.slicing_manager.handle_dynamic_slicing_direct( path_list=path_list, path_index=path_index, forced_segments=forced_segments, sdn_controller=self, )