Source code for fusion.core.orchestrator

"""
SDN Orchestrator for FUSION simulation.

This module provides SDNOrchestrator, a thin coordination layer
that routes requests through pipelines without implementing
algorithm logic.

RULES (enforced by code review):
- No algorithm logic (K-shortest-path, first-fit, SNR calculation)
- No direct numpy access
- No hardcoded slicing/grooming logic
- Receives NetworkState per call, never stores it
"""

from __future__ import annotations

import logging
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from fusion.core.pipeline_factory import PipelineSet
    from fusion.domain.config import SimulationConfig
    from fusion.domain.network_state import NetworkState
    from fusion.domain.request import BlockReason, Request
    from fusion.domain.results import AllocationResult, RouteResult, SpectrumResult
    from fusion.interfaces.control_policy import ControlPolicy
    from fusion.interfaces.pipelines import (
        GroomingPipeline,
        RoutingPipeline,
        SlicingPipeline,
        SNRPipeline,
        SpectrumPipeline,
    )
    from fusion.modules.rl.adapter import RLSimulationAdapter
    from fusion.pipelines.protection_pipeline import ProtectionPipeline

logger = logging.getLogger(__name__)
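
# Illustrative usage (not part of the module): a minimal sketch of how the
# orchestrator is typically wired and driven. `build_pipelines` and
# `event_stream` are hypothetical names for this sketch, not APIs confirmed
# by this file; only SDNOrchestrator's own constructor and handlers below
# are real.
#
#     config = SimulationConfig(...)                 # simulation settings
#     pipelines = build_pipelines(config)            # hypothetical PipelineSet factory
#     orchestrator = SDNOrchestrator(config, pipelines)
#     for event_type, request in event_stream:       # hypothetical event source
#         if event_type == "arrival":
#             result = orchestrator.handle_arrival(request, network_state)
#         else:
#             orchestrator.handle_release(request, network_state)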


class SDNOrchestrator:
    """
    Thin coordination layer for request handling.

    The orchestrator sequences pipeline calls and combines their results.
    It does NOT implement algorithm logic - all computation is delegated
    to pipelines. It does NOT store network_state - it receives it per
    call only.

    :ivar config: Simulation configuration
    :ivar routing: Pipeline for finding candidate routes
    :ivar spectrum: Pipeline for spectrum assignment
    :ivar grooming: Pipeline for traffic grooming (optional)
    :ivar snr: Pipeline for SNR validation (optional)
    :ivar slicing: Pipeline for request slicing (optional)
    """

    __slots__ = (
        "config",
        "routing",
        "spectrum",
        "grooming",
        "snr",
        "slicing",
        "_failed_attempt_snr_list",
        "_last_path_index",
        "current_iteration",
        "_policy",
        "_rl_adapter",
        "_protection_pipeline",
    )

    def __init__(
        self,
        config: SimulationConfig,
        pipelines: PipelineSet,
        policy: ControlPolicy | None = None,
        rl_adapter: RLSimulationAdapter | None = None,
        protection_pipeline: ProtectionPipeline | None = None,
    ) -> None:
        """
        Initialize orchestrator with config and pipelines.

        :param config: Simulation configuration
        :type config: SimulationConfig
        :param pipelines: Container with all pipeline implementations
        :type pipelines: PipelineSet
        :param policy: Optional control policy for path selection
        :type policy: ControlPolicy | None
        :param rl_adapter: Optional RL adapter for building options
        :type rl_adapter: RLSimulationAdapter | None
        :param protection_pipeline: Optional protection pipeline
        :type protection_pipeline: ProtectionPipeline | None
        """
        self.config: SimulationConfig = config
        self.routing: RoutingPipeline = pipelines.routing
        self.spectrum: SpectrumPipeline = pipelines.spectrum
        self.grooming: GroomingPipeline | None = pipelines.grooming
        self.snr: SNRPipeline | None = pipelines.snr
        self.slicing: SlicingPipeline | None = pipelines.slicing

        # Track the last path_index like Legacy does (for groomed request
        # path_index matching).
        self._last_path_index: int = 0

        # Current iteration for debug purposes (set by the simulation).
        self.current_iteration: int = 0

        # Policy integration (all optional).
        self._policy: ControlPolicy | None = policy
        self._rl_adapter: RLSimulationAdapter | None = rl_adapter
        self._protection_pipeline: ProtectionPipeline | None = protection_pipeline

    def handle_arrival(
        self,
        request: Request,
        network_state: NetworkState,
        forced_path: list[str] | None = None,
        forced_modulation: str | None = None,
        forced_path_index: int | None = None,
    ) -> AllocationResult:
        """
        Handle request arrival by coordinating pipelines.

        :param request: The incoming request to process
        :type request: Request
        :param network_state: Current network state (passed per call)
        :type network_state: NetworkState
        :param forced_path: Optional forced path from external source
        :type forced_path: list[str] | None
        :param forced_modulation: Optional forced modulation format (for RL)
        :type forced_modulation: str | None
        :param forced_path_index: Optional path index for stats tracking (for RL)
        :type forced_path_index: int | None
        :return: AllocationResult with success/failure and details
        :rtype: AllocationResult
        """
        # Branch for protection-enabled requests (P3.2.g).
        if self.config.protection_enabled and getattr(request, "protection_required", False):
            return self._handle_protected_arrival(request, network_state)

        # Standard unprotected flow.
        return self._handle_unprotected_arrival(
            request, network_state, forced_path, forced_modulation, forced_path_index
        )
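
    # Illustrative call (not part of the module): an external RL driver can
    # pin the orchestrator to one candidate path, as a sketch; `chosen` is a
    # hypothetical PathOption-like object with .path and .modulation
    # attributes, and `action` is its index among the candidates.
    #
    #     result = orchestrator.handle_arrival(
    #         request,
    #         network_state,
    #         forced_path=list(chosen.path),
    #         forced_modulation=chosen.modulation,
    #         forced_path_index=action,
    #     )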

    def _handle_unprotected_arrival(
        self,
        request: Request,
        network_state: NetworkState,
        forced_path: list[str] | None = None,
        forced_modulation: str | None = None,
        forced_path_index: int | None = None,
    ) -> AllocationResult:
        """Handle standard (unprotected) request arrival."""
        from fusion.domain.request import BlockReason, RequestStatus
        from fusion.domain.results import AllocationResult

        groomed_lightpaths: list[int] = []
        remaining_bw = request.bandwidth_gbps
        was_partially_groomed = False

        # Track SNR values from failed allocation attempts (for Legacy
        # compatibility). Legacy adds the SNR to snr_list before the SNR
        # recheck; if the recheck fails, the value stays.
        self._failed_attempt_snr_list: list[float] = []

        # Stage 1: Grooming (if enabled)
        if self.grooming and self.config.grooming_enabled:
            groom_result = self.grooming.try_groom(request, network_state)
            if groom_result.fully_groomed:
                request.status = RequestStatus.GROOMED
                # CRITICAL: Add groomed lightpath IDs to the request so
                # release can return bandwidth.
                request.lightpath_ids.extend(groom_result.lightpaths_used)
                # Use _last_path_index to match Legacy behavior (Legacy keeps
                # a stale path_index).
                return AllocationResult(
                    success=True,
                    is_groomed=True,
                    lightpaths_groomed=tuple(groom_result.lightpaths_used),
                    total_bandwidth_allocated_gbps=request.bandwidth_gbps,
                    grooming_result=groom_result,
                    path_index=self._last_path_index,
                )
            if groom_result.partially_groomed:
                groomed_lightpaths = list(groom_result.lightpaths_used)
                remaining_bw = groom_result.remaining_bandwidth_gbps
                was_partially_groomed = True
                if groom_result.forced_path:
                    forced_path = list(groom_result.forced_path)

        # LP capacity is determined by spectrum assignment (modulation-based
        # capacity), NOT by the original request bandwidth. For slicing, the
        # spectrum/slicing code sets lightpath_bandwidth based on what the
        # modulation can support.
        lp_capacity_override = None

        # Stage 2: Routing
        route_result = self.routing.find_routes(
            request.source,
            request.destination,
            remaining_bw,
            network_state,
            forced_path=forced_path,
        )
        if route_result.is_empty:
            return self._handle_failure(request, groomed_lightpaths, BlockReason.NO_PATH, network_state)

        # Override modulations with forced_modulation if provided (for RL).
        # This mimics legacy RL behavior where get_path_modulation selects
        # ONE modulation.
        if forced_modulation:
            # Create a new RouteResult with just the forced modulation for
            # each path.
            from dataclasses import replace

            route_result = replace(
                route_result,
                modulations=tuple((forced_modulation,) for _ in route_result.paths),
            )

        # For partial grooming, LEGACY uses the original request bandwidth
        # for spectrum allocation (not remaining_bw). This affects the
        # slots_needed calculation and SNR checks.
        spectrum_bw = request.bandwidth_gbps if was_partially_groomed else remaining_bw

        # Track the last path index tried (for blocked request reporting).
        last_path_idx = 0

        # Stage 3: Try standard allocation on ALL paths first (no slicing).
        # This applies to ALL modes including dynamic_lps - legacy tries
        # standard first and only falls back to dynamic slicing if standard
        # fails on ALL paths.
        for loop_idx, path in enumerate(route_result.paths):
            # Use forced_path_index for stats tracking when provided (RL mode).
            path_idx = forced_path_index if forced_path_index is not None else loop_idx
            last_path_idx = path_idx
            self._last_path_index = path_idx  # Track for groomed requests (Legacy behavior)

            modulations = route_result.modulations[loop_idx]
            weight_km = route_result.weights_km[loop_idx]

            # For partial grooming, actual_bw_to_allocate = remaining_bw
            # (what we need to serve), but spectrum_bw = the original request
            # bandwidth (for the spectrum calculation).
            actual_bw_to_allocate = remaining_bw if was_partially_groomed else spectrum_bw

            result = self._try_allocate_on_path(
                request,
                path,
                modulations,
                weight_km,
                spectrum_bw,
                network_state,
                allow_slicing=False,  # Don't try slicing yet
                connection_index=route_result.connection_index,
                path_index=path_idx,
                lp_capacity_override=lp_capacity_override,
                actual_bw_to_allocate=actual_bw_to_allocate,
                num_paths=len(route_result.paths),
            )
            if result is not None:
                groomed_bw = request.bandwidth_gbps - remaining_bw
                return self._combine_results(
                    request,
                    groomed_lightpaths,
                    result,
                    route_result,
                    path_index=path_idx,
                    groomed_bandwidth_gbps=groomed_bw,
                )

        # Stage 4: Try dynamic_lps slicing on ALL paths (FIXED-GRID ONLY).
        # For flex-grid, skip to Stage 5, which uses the slicing pipeline's
        # _try_allocate_dynamic method that properly handles flex-grid
        # dynamic_lps.
        # Note: This uses use_dynamic_slicing=True, which behaves differently
        # from legacy's handle_dynamic_slicing_direct but achieves similar
        # blocking rates.
        # Legacy behavior: when slicing fails on a path, _handle_congestion
        # pops SNR entries for rolled-back lightpaths. We replicate this by
        # reverting _failed_attempt_snr_list to its state before each failed
        # path attempt.
        if self.config.dynamic_lps and self.config.fixed_grid:
            for loop_idx, path in enumerate(route_result.paths):
                # Use forced_path_index for stats tracking when provided (RL mode).
                path_idx = forced_path_index if forced_path_index is not None else loop_idx
                last_path_idx = path_idx
                self._last_path_index = path_idx  # Track for groomed requests (Legacy behavior)

                # Record the SNR count BEFORE this path's slicing attempt.
                # If slicing fails, we revert to this count (Legacy behavior).
                snr_count_before_path = (
                    len(self._failed_attempt_snr_list)
                    if hasattr(self, "_failed_attempt_snr_list")
                    else 0
                )

                modulations = route_result.modulations[loop_idx]
                weight_km = route_result.weights_km[loop_idx]

                # For dynamic slicing, use remaining_bw for the slicing loop
                # iteration (how much bandwidth to actually serve), but
                # spectrum_bw for spectrum allocation (modulation/slots
                # calculation). Legacy uses sdn_props.remaining_bw for the
                # slicing loop when partially groomed.
                slicing_target_bw = remaining_bw if was_partially_groomed else spectrum_bw
                # For partial grooming, actual_bw_to_allocate = remaining_bw
                # (for stats tracking).
                actual_bw_to_allocate = remaining_bw if was_partially_groomed else spectrum_bw

                result = self._try_allocate_on_path(
                    request,
                    path,
                    modulations,
                    weight_km,
                    spectrum_bw,
                    network_state,
                    allow_slicing=True,  # Now try dynamic slicing
                    slicing_only=False,  # Need to run through modulations to get spectrum
                    connection_index=route_result.connection_index,
                    path_index=path_idx,
                    use_dynamic_slicing=True,  # Use dynamic slicing spectrum method
                    lp_capacity_override=lp_capacity_override,
                    slicing_target_bw=slicing_target_bw,  # Actual bandwidth to serve via slicing
                    actual_bw_to_allocate=actual_bw_to_allocate,  # For stats tracking
                    num_paths=len(route_result.paths),
                )
                if result is not None:
                    groomed_bw = request.bandwidth_gbps - remaining_bw
                    return self._combine_results(
                        request,
                        groomed_lightpaths,
                        result,
                        route_result,
                        path_index=path_idx,
                        groomed_bandwidth_gbps=groomed_bw,
                    )
                else:
                    # Slicing failed on this path - simulate Legacy's
                    # pop(lp_idx) behavior. Legacy's _handle_congestion pops
                    # snr_list[lp_idx] for each rolled-back LP. Since
                    # lightpath_id_list doesn't have Stage 3 entries (they
                    # were popped on SNR recheck failure), lp_idx=0 removes
                    # the FIRST snr_list entry, which is Stage 3's SNR. This
                    # causes Stage 3 entries to be removed before slicing
                    # entries. We replicate this by removing N entries from
                    # the START of the list, where N = the number of entries
                    # added during this path's attempt.
                    entries_added = len(self._failed_attempt_snr_list) - snr_count_before_path
                    if entries_added > 0 and hasattr(self, "_failed_attempt_snr_list"):
                        self._failed_attempt_snr_list = self._failed_attempt_snr_list[entries_added:]

        # Stage 5: Try the segment slicing pipeline on ALL paths (only if
        # standard allocation failed). This also handles flex-grid
        # dynamic_lps (skipped in Stage 4) via the slicing pipeline's
        # _try_allocate_dynamic method.
        # Legacy behavior: when slicing fails on a path, _handle_congestion
        # pops snr_list[lp_idx] for each rolled-back LP. We replicate this by
        # removing entries from the START of the list (matching Legacy's
        # pop(0) behavior).
        # IMPORTANT: Skip Stage 5 when Stage 4 already handled fixed-grid
        # dynamic_lps. Stage 4 handles fixed_grid + dynamic_lps, so Stage 5
        # should NOT also try slicing in that case (it would cause double
        # allocation on different code paths).
        stage_4_handled = self.config.dynamic_lps and self.config.fixed_grid
        use_slicing = (
            self.slicing
            and not stage_4_handled
            and (
                self.config.slicing_enabled
                or (self.config.dynamic_lps and not self.config.fixed_grid)
            )
        )
        if use_slicing:
            for loop_idx, path in enumerate(route_result.paths):
                # Use forced_path_index for stats tracking when provided (RL mode).
                path_idx = forced_path_index if forced_path_index is not None else loop_idx
                last_path_idx = path_idx
                self._last_path_index = path_idx  # Track for groomed requests (Legacy behavior)

                # Record the SNR count BEFORE this path's slicing attempt.
                snr_count_before_path = (
                    len(self._failed_attempt_snr_list)
                    if hasattr(self, "_failed_attempt_snr_list")
                    else 0
                )

                modulations = route_result.modulations[loop_idx]
                weight_km = route_result.weights_km[loop_idx]

                # For partial grooming, actual_bw_to_allocate = remaining_bw.
                actual_bw_to_allocate = remaining_bw if was_partially_groomed else spectrum_bw

                result = self._try_allocate_on_path(
                    request,
                    path,
                    modulations,
                    weight_km,
                    spectrum_bw,
                    network_state,
                    allow_slicing=True,  # Now try slicing
                    slicing_only=True,  # Skip standard allocation (already tried)
                    connection_index=route_result.connection_index,
                    path_index=path_idx,
                    lp_capacity_override=lp_capacity_override,
                    actual_bw_to_allocate=actual_bw_to_allocate,
                    num_paths=len(route_result.paths),
                )
                if result is not None:
                    groomed_bw = request.bandwidth_gbps - remaining_bw
                    return self._combine_results(
                        request,
                        groomed_lightpaths,
                        result,
                        route_result,
                        path_index=path_idx,
                        groomed_bandwidth_gbps=groomed_bw,
                    )
                else:
                    # Slicing failed on this path - simulate Legacy's
                    # pop(lp_idx) behavior: remove N entries from the START
                    # of the list (Stage 3 entries first).
                    entries_added = len(self._failed_attempt_snr_list) - snr_count_before_path
                    if entries_added > 0 and hasattr(self, "_failed_attempt_snr_list"):
                        self._failed_attempt_snr_list = self._failed_attempt_snr_list[entries_added:]

        # Stage 6: All paths failed.
        return self._handle_failure(
            request,
            groomed_lightpaths,
            BlockReason.CONGESTION,
            network_state,
            path_index=last_path_idx,
        )

    def _handle_protected_arrival(
        self,
        request: Request,
        network_state: NetworkState,
    ) -> AllocationResult:
        """
        Handle arrival for a protected (1+1) request.

        Implements protection pipeline integration.
""" from fusion.domain.request import BlockReason, RequestStatus from fusion.domain.results import AllocationResult # Reset SNR tracking for this request (Legacy compatibility) self._failed_attempt_snr_list = [] # Stage 1: Find working path working_routes = self.routing.find_routes( request.source, request.destination, request.bandwidth_gbps, network_state, ) if working_routes.is_empty: return AllocationResult( success=False, block_reason=BlockReason.NO_PATH, route_result=working_routes, ) # Check if routing returned backup paths (protection-aware routing) if not working_routes.has_protection: return AllocationResult( success=False, block_reason=BlockReason.PROTECTION_FAIL, route_result=working_routes, ) # Try each working/backup path pair for idx, working_path in enumerate(working_routes.paths): backup_path = working_routes.backup_paths[idx] if working_routes.backup_paths else None if not backup_path: continue result = self._try_protected_allocation( request, working_path, backup_path, working_routes.modulations[idx], working_routes.backup_modulations[idx] if working_routes.backup_modulations else working_routes.modulations[idx], working_routes.weights_km[idx], working_routes.backup_weights_km[idx] if working_routes.backup_weights_km else working_routes.weights_km[idx], network_state, connection_index=working_routes.connection_index, path_index=idx, ) if result is not None and result.success: request.status = RequestStatus.ALLOCATED return result return AllocationResult( success=False, block_reason=BlockReason.PROTECTION_FAIL, route_result=working_routes, ) def _try_protected_allocation( self, request: Request, working_path: tuple[str, ...], backup_path: tuple[str, ...], working_mods: tuple[str, ...], backup_mods: tuple[str, ...], working_weight: float, backup_weight: float, network_state: NetworkState, connection_index: int | None = None, path_index: int = 0, ) -> AllocationResult | None: """Try to allocate both working and backup paths atomically.""" from fusion.domain.results import AllocationResult # Find spectrum for working path working_spectrum = self.spectrum.find_spectrum( list(working_path), working_mods[0], request.bandwidth_gbps, network_state, connection_index=connection_index, path_index=path_index, ) if not working_spectrum.is_free: return None # Create working lightpath working_lp = network_state.create_lightpath( path=list(working_path), start_slot=working_spectrum.start_slot, end_slot=working_spectrum.end_slot, core=working_spectrum.core, band=working_spectrum.band, modulation=working_spectrum.modulation, bandwidth_gbps=request.bandwidth_gbps, path_weight_km=working_weight, guard_slots=self.config.guard_slots, connection_index=connection_index, arrive_time=request.arrive_time, ) # Find spectrum for backup path backup_spectrum = self.spectrum.find_spectrum( list(backup_path), backup_mods[0], request.bandwidth_gbps, network_state, connection_index=connection_index, path_index=path_index, ) if not backup_spectrum.is_free: # Rollback working network_state.release_lightpath(working_lp.lightpath_id) return None # Create backup lightpath backup_lp = network_state.create_lightpath( path=list(backup_path), start_slot=backup_spectrum.start_slot, end_slot=backup_spectrum.end_slot, core=backup_spectrum.core, band=backup_spectrum.band, modulation=backup_spectrum.modulation, bandwidth_gbps=request.bandwidth_gbps, path_weight_km=backup_weight, guard_slots=self.config.guard_slots, connection_index=connection_index, arrive_time=request.arrive_time, ) # SNR validation for both (if 
        if self.snr and self.config.snr_enabled:
            working_snr = self.snr.validate(working_lp, network_state)
            if not working_snr.passed:
                network_state.release_lightpath(backup_lp.lightpath_id)
                network_state.release_lightpath(working_lp.lightpath_id)
                return None
            backup_snr = self.snr.validate(backup_lp, network_state)
            if not backup_snr.passed:
                network_state.release_lightpath(backup_lp.lightpath_id)
                network_state.release_lightpath(working_lp.lightpath_id)
                return None

        # Link the working and backup LPs.
        working_lp.protection_lp_id = backup_lp.lightpath_id
        backup_lp.working_lp_id = working_lp.lightpath_id

        # Update the request.
        request.lightpath_ids.extend([working_lp.lightpath_id, backup_lp.lightpath_id])

        return AllocationResult(
            success=True,
            lightpaths_created=(working_lp.lightpath_id, backup_lp.lightpath_id),
            total_bandwidth_allocated_gbps=request.bandwidth_gbps,
            is_protected=True,
            spectrum_result=working_spectrum,
        )

    def _try_allocate_on_path(
        self,
        request: Request,
        path: tuple[str, ...],
        modulations: tuple[str, ...],
        weight_km: float,
        bandwidth_gbps: int,
        network_state: NetworkState,
        allow_slicing: bool = True,
        slicing_only: bool = False,
        connection_index: int | None = None,
        path_index: int = 0,
        use_dynamic_slicing: bool = False,
        lp_capacity_override: int | None = None,
        snr_bandwidth: int | None = None,
        slicing_target_bw: int | None = None,
        actual_bw_to_allocate: int | None = None,
        num_paths: int = 3,
    ) -> AllocationResult | None:
        """
        Try to allocate on a single path.

        :param request: The request to allocate
        :param path: Path to try allocation on
        :param modulations: Valid modulations for this path
        :param weight_km: Path weight in km
        :param bandwidth_gbps: Bandwidth for spectrum calculation
        :param network_state: Current network state
        :param allow_slicing: Whether slicing fallback is allowed
        :param slicing_only: Skip standard allocation (only try slicing)
        :param connection_index: External routing index for pre-calculated SNR lookup
        :param path_index: Index of which k-path is being tried (0, 1, 2...)
        :param use_dynamic_slicing: Use dynamic slicing spectrum method
        :param lp_capacity_override: Override LP capacity (for partial grooming)
        :param snr_bandwidth: Bandwidth for SNR checks
        :param slicing_target_bw: Actual bandwidth to serve via dynamic slicing
        :param actual_bw_to_allocate: Actual bandwidth to allocate/track
        :param num_paths: Total number of paths being tried
        :return: AllocationResult if successful, None otherwise
        :rtype: AllocationResult | None
        """
        # Try standard allocation with ALL modulations at once (like legacy).
        # Filter out False/None values (modulations that don't reach the path
        # distance).
        if not slicing_only:
            valid_mods = [mod for mod in modulations if mod and mod is not False]
            if valid_mods:
                spectrum_result = self.spectrum.find_spectrum(
                    list(path),
                    valid_mods,
                    bandwidth_gbps,
                    network_state,
                    connection_index=connection_index,
                    path_index=path_index,
                    use_dynamic_slicing=use_dynamic_slicing,
                    snr_bandwidth=snr_bandwidth,
                )
                if spectrum_result.is_free:
                    # Handle dynamic_lps mode: achieved_bandwidth may be less
                    # than requested.
                    achieved_bw = spectrum_result.achieved_bandwidth_gbps
                    if (
                        allow_slicing
                        and self.config.dynamic_lps
                        and achieved_bw is not None
                        and achieved_bw < bandwidth_gbps
                    ):
                        # Dynamic slicing: create multiple lightpaths. Only do
                        # this in the slicing stage (allow_slicing=True), not
                        # in standard allocation. Use slicing_target_bw if
                        # provided (for partial grooming), otherwise use
                        # bandwidth_gbps.
                        actual_remaining = (
                            slicing_target_bw if slicing_target_bw is not None else bandwidth_gbps
                        )
                        result = self._allocate_dynamic_slices(
                            request=request,
                            path=path,
                            weight_km=weight_km,
                            remaining_bw=actual_remaining,
                            slice_bw=achieved_bw,
                            first_spectrum_result=spectrum_result,
                            network_state=network_state,
                            connection_index=connection_index,
                            path_index=path_index,
                            num_paths=num_paths,
                        )
                        if result is not None:
                            return result
                    elif (
                        not self.config.dynamic_lps
                        or achieved_bw is None
                        or achieved_bw >= bandwidth_gbps
                    ):
                        # Standard allocation: single lightpath. Only allocate
                        # if we have the full bandwidth (or are not in
                        # dynamic_lps mode).
                        # When use_dynamic_slicing=True (Stage 4), use
                        # slicing_flag=True for the SNR recheck to match
                        # Legacy's segment_slicing behavior.
                        # Use actual_bw_to_allocate for allocation tracking
                        # (defaults to bandwidth_gbps).
                        bw_to_allocate = (
                            actual_bw_to_allocate
                            if actual_bw_to_allocate is not None
                            else bandwidth_gbps
                        )
                        alloc_result = self._allocate_and_validate(
                            request,
                            path,
                            spectrum_result,
                            weight_km,
                            bw_to_allocate,  # Actual bandwidth to allocate, not spectrum bandwidth
                            network_state,
                            connection_index=connection_index,
                            lp_capacity_override=lp_capacity_override,
                            path_index=path_index,
                            slicing_flag=use_dynamic_slicing,
                        )
                        if alloc_result is not None:
                            return alloc_result

        # Fall back to slicing (if enabled and allowed). Also allow slicing
        # for flex-grid + dynamic_lps (handled by the slicing pipeline).
        slicing_allowed = self.config.slicing_enabled or (
            self.config.dynamic_lps and not self.config.fixed_grid
        )
        # Don't fall through to the slicing pipeline if we already tried
        # fixed-grid dynamic slicing (use_dynamic_slicing=True means
        # _allocate_dynamic_slices was already called).
        if use_dynamic_slicing and self.config.fixed_grid:
            slicing_allowed = False

        if allow_slicing and self.slicing and slicing_allowed:
            # Get the first valid modulation for slicing.
            first_valid_mod = next((m for m in modulations if m and m is not False), "")
            # Use actual_bw_to_allocate for slicing (defaults to bandwidth_gbps).
            bw_for_slicing = (
                actual_bw_to_allocate if actual_bw_to_allocate is not None else bandwidth_gbps
            )
            slicing_result = self.slicing.try_slice(
                request,
                list(path),
                first_valid_mod,
                bw_for_slicing,  # Actual bandwidth to serve via slicing
                network_state,
                spectrum_pipeline=self.spectrum,
                snr_pipeline=self.snr,
                connection_index=connection_index,
                path_index=path_index,
                snr_accumulator=self._failed_attempt_snr_list,  # Accumulate SNR across try_slice calls
                path_weight=weight_km,  # Pass routing weight for metrics tracking
            )
            # Convert the SlicingResult to an AllocationResult.
            if slicing_result.success:
                from fusion.domain.results import AllocationResult

                # With the accumulator, slicing appends directly to
                # _failed_attempt_snr_list, so we use the accumulator values
                # (not slicing_result.failed_attempt_snr_values).
                accumulated_snr = tuple(self._failed_attempt_snr_list)
                return AllocationResult(
                    success=True,
                    lightpaths_created=slicing_result.lightpaths_created,
                    is_sliced=slicing_result.is_sliced,
                    total_bandwidth_allocated_gbps=slicing_result.total_bandwidth_gbps,
                    failed_attempt_snr_values=accumulated_snr,
                )

        return None

    def _allocate_and_validate(
        self,
        request: Request,
        path: tuple[str, ...],
        spectrum_result: SpectrumResult,
        weight_km: float,
        bandwidth_gbps: int,
        network_state: NetworkState,
        connection_index: int | None = None,
        lp_capacity_override: int | None = None,
        path_index: int = 0,
        slicing_flag: bool = False,
    ) -> AllocationResult | None:
        """Allocate a lightpath and validate SNR with congestion handling."""
        from fusion.domain.results import AllocationResult

        # Determine the lightpath capacity:
        # 1. If lp_capacity_override is set, use it.
        # 2. Elif achieved_bandwidth_gbps is set (from
        #    spectrum_props.lightpath_bandwidth), use it. This is set by
        #    spectrum_adapter to match LEGACY behavior (sdn_props.bandwidth).
        # 3. Else fall back to the requested bandwidth_gbps.
        if lp_capacity_override is not None:
            lp_capacity = lp_capacity_override
        elif spectrum_result.achieved_bandwidth_gbps is not None:
            lp_capacity = spectrum_result.achieved_bandwidth_gbps
        else:
            lp_capacity = bandwidth_gbps

        # Create the lightpath.
        # Note: end_slot from spectrum_result includes guard slots, so we
        # must pass guard_slots for correct release behavior.
        lightpath = network_state.create_lightpath(
            path=list(path),
            start_slot=spectrum_result.start_slot,
            end_slot=spectrum_result.end_slot,
            core=spectrum_result.core,
            band=spectrum_result.band,
            modulation=spectrum_result.modulation,
            bandwidth_gbps=lp_capacity,
            path_weight_km=weight_km,
            guard_slots=self.config.guard_slots,
            connection_index=connection_index,
            arrive_time=request.arrive_time,
        )

        # Store the SNR value from spectrum assignment for metrics tracking.
        # This is the SNR calculated during spectrum assignment (before
        # lightpath creation), which matches legacy behavior.
        if spectrum_result.snr_db is not None:
            lightpath.snr_db = spectrum_result.snr_db

        # SNR recheck: check whether existing lightpaths would be degraded by
        # the new allocation.
        # NOTE: We do NOT re-validate the new lightpath's SNR here because:
        # 1. SNR was already validated during spectrum assignment (if snr_type is set).
        # 2. Legacy snr_recheck_after_allocation only checks EXISTING lightpaths.
        # 3. Modulations are chosen for capacity, not SNR margin after allocation.
        if self.snr and self.config.snr_recheck:
            recheck_result = self.snr.recheck_affected(
                lightpath.lightpath_id, network_state, slicing_flag=slicing_flag
            )
            if not recheck_result.all_pass:
                # Rollback: an existing LP would fail SNR.
                logger.debug(
                    f"SNR recheck failed for request {request.request_id}: "
                    f"affected LPs {recheck_result.degraded_lightpath_ids}"
                )
                # Fixed legacy behavior: the SNR value is removed when the
                # recheck fails. (Previously we kept stale SNR values to
                # match legacy's bug.)
                network_state.release_lightpath(lightpath.lightpath_id)
                return None

        # Success: link the request to the lightpath and update the remaining
        # bandwidth. Use allocate_bandwidth to properly track time_bw_usage
        # for utilization stats.
        lightpath.allocate_bandwidth(request.request_id, bandwidth_gbps, timestamp=request.arrive_time)
        request.lightpath_ids.append(lightpath.lightpath_id)

        # Track the successful attempt's SNR for stats tracking.
        if spectrum_result.snr_db is not None and hasattr(self, "_failed_attempt_snr_list"):
            self._failed_attempt_snr_list.append(spectrum_result.snr_db)

        # Include accumulated SNR values in the result for stats tracking.
        failed_snr = (
            tuple(self._failed_attempt_snr_list)
            if hasattr(self, "_failed_attempt_snr_list")
            else ()
        )

        return AllocationResult(
            success=True,
            lightpaths_created=(lightpath.lightpath_id,),
            total_bandwidth_allocated_gbps=bandwidth_gbps,
            spectrum_result=spectrum_result,
            failed_attempt_snr_values=failed_snr,
        )

    def _allocate_dynamic_slices(
        self,
        request: Request,
        path: tuple[str, ...],
        weight_km: float,
        remaining_bw: int,
        slice_bw: int,
        first_spectrum_result: SpectrumResult,
        network_state: NetworkState,
        connection_index: int | None = None,
        path_index: int = 0,
        num_paths: int = 3,
    ) -> AllocationResult | None:
        """
        Allocate multiple lightpaths for dynamic_lps mode.

        When dynamic slicing returns achieved_bandwidth < requested_bandwidth,
        create multiple lightpaths to satisfy the full request.
        :param request: The request being processed
        :param path: Path for all lightpaths
        :param weight_km: Path weight in km
        :param remaining_bw: Total bandwidth still needed
        :param slice_bw: Bandwidth per lightpath (from dynamic slicing)
        :param first_spectrum_result: First spectrum result to use
        :param network_state: Current network state
        :param connection_index: External routing index
        :param path_index: Path index for SNR lookup
        :param num_paths: Total number of paths being tried
        :return: AllocationResult if successful, None otherwise
        :rtype: AllocationResult | None
        """
        from fusion.domain.results import AllocationResult

        allocated_lightpaths: list[int] = []
        total_allocated = 0
        spectrum_result = first_spectrum_result
        max_iterations = 20  # Safety limit

        # Capture the initial state BEFORE allocating - for the partial
        # serving check.
        initial_lightpath_count = len(request.lightpath_ids)

        iteration = 0
        while remaining_bw > 0 and iteration < max_iterations:
            iteration += 1
            if not spectrum_result.is_free:
                # No more spectrum available.
                break

            # Get the actual slice bandwidth (use achieved_bw if available,
            # else slice_bw).
            actual_slice_bw = spectrum_result.achieved_bandwidth_gbps or slice_bw

            # Create a lightpath for this slice.
            lightpath = network_state.create_lightpath(
                path=list(path),
                start_slot=spectrum_result.start_slot,
                end_slot=spectrum_result.end_slot,
                core=spectrum_result.core,
                band=spectrum_result.band,
                modulation=spectrum_result.modulation,
                bandwidth_gbps=actual_slice_bw,
                path_weight_km=weight_km,
                guard_slots=self.config.guard_slots,
                connection_index=connection_index,
                arrive_time=request.arrive_time,
            )

            # Store the SNR value from spectrum assignment for metrics
            # tracking. This is the SNR calculated during spectrum assignment
            # (before lightpath creation).
            snr_was_added = False
            if spectrum_result.snr_db is not None:
                lightpath.snr_db = spectrum_result.snr_db
                # Track SNR for metrics (add before the recheck, like legacy does).
                if hasattr(self, "_failed_attempt_snr_list"):
                    self._failed_attempt_snr_list.append(spectrum_result.snr_db)
                    snr_was_added = True

            # SNR recheck for affected existing lightpaths (legacy behavior).
            # NOTE: Legacy does NOT re-validate the new LP's SNR here - it
            # only checks existing LPs.
            if self.snr and self.config.snr_recheck:
                recheck_result = self.snr.recheck_affected(
                    lightpath.lightpath_id, network_state, slicing_flag=True
                )
                if not recheck_result.all_pass:
                    # Rollback: an existing LP would fail SNR.
                    logger.debug(
                        f"Dynamic slice SNR recheck failed for request {request.request_id}: "
                        f"affected LPs {recheck_result.degraded_lightpath_ids}"
                    )
                    # Fixed legacy behavior: remove the SNR value when the
                    # recheck fails.
                    if snr_was_added and self._failed_attempt_snr_list:
                        self._failed_attempt_snr_list.pop()
                    network_state.release_lightpath(lightpath.lightpath_id)
                    break

            # Calculate how much bandwidth to dedicate to this lightpath.
            # Don't over-allocate: only use what the request actually needs.
            dedicated_bw = min(actual_slice_bw, remaining_bw)

            # Success: link the request to the lightpath. Use
            # allocate_bandwidth to properly track time_bw_usage.
            lightpath.allocate_bandwidth(request.request_id, dedicated_bw, timestamp=request.arrive_time)
            request.lightpath_ids.append(lightpath.lightpath_id)
            allocated_lightpaths.append(lightpath.lightpath_id)
            total_allocated += dedicated_bw
            remaining_bw -= dedicated_bw

            if remaining_bw > 0:
                # Validate the modulation before the next iteration. It can
                # be empty/False if the previous spectrum assignment failed
                # internally.
                mod_for_next = spectrum_result.modulation
                if not mod_for_next or mod_for_next is False:
                    break
                # Find spectrum for the next slice.
                spectrum_result = self.spectrum.find_spectrum(
                    list(path),
                    mod_for_next,
                    remaining_bw,
                    network_state,
                    connection_index=connection_index,
                    path_index=path_index,
                    use_dynamic_slicing=True,
                )

        if total_allocated > 0:
            # Legacy behavior: only accept partial allocation on the LAST
            # path (unless the request was already partially groomed). This
            # ensures we try ALL paths fully before settling for partial.
            is_partial = remaining_bw > 0
            is_last_path = path_index >= num_paths - 1
            was_partially_groomed = initial_lightpath_count > 0  # Had groomed LPs BEFORE this allocation

            if is_partial and not is_last_path and not was_partially_groomed:
                # Not on the last path and didn't fully allocate - roll back
                # and try the next path.
                for lp_id in allocated_lightpaths:
                    # Remove from the request's lightpath_ids if added.
                    if lp_id in request.lightpath_ids:
                        request.lightpath_ids.remove(lp_id)
                    network_state.release_lightpath(lp_id)
                return None

            # Include accumulated SNR values for Legacy compatibility.
            failed_snr = (
                tuple(self._failed_attempt_snr_list)
                if hasattr(self, "_failed_attempt_snr_list")
                else ()
            )
            return AllocationResult(
                success=True,
                lightpaths_created=tuple(allocated_lightpaths),
                is_sliced=len(allocated_lightpaths) > 1,
                total_bandwidth_allocated_gbps=total_allocated,
                spectrum_result=first_spectrum_result,
                failed_attempt_snr_values=failed_snr,
                path_index=path_index,
            )

        # Complete failure - roll back any allocated lightpaths.
        for lp_id in allocated_lightpaths:
            network_state.release_lightpath(lp_id)
        return None

    def _handle_failure(
        self,
        request: Request,
        groomed_lightpaths: list[int],
        reason: BlockReason,
        network_state: NetworkState,
        path_index: int = 0,
    ) -> AllocationResult:
        """Handle allocation failure with grooming rollback (P3.2.f)."""
        from fusion.domain.request import RequestStatus
        from fusion.domain.results import AllocationResult

        # Accept partial grooming if allowed.
        if groomed_lightpaths and self.config.can_partially_serve:
            groomed_bw = self._sum_groomed_bandwidth(groomed_lightpaths, request, network_state)
            # Only return success if we actually allocated some bandwidth.
            if groomed_bw > 0:
                request.status = RequestStatus.PARTIALLY_GROOMED
                # CRITICAL: Add groomed lightpath IDs so release can return
                # bandwidth.
                request.lightpath_ids.extend(groomed_lightpaths)
                return AllocationResult(
                    success=True,
                    lightpaths_groomed=tuple(groomed_lightpaths),
                    is_groomed=True,
                    is_partially_groomed=True,
                    total_bandwidth_allocated_gbps=groomed_bw,
                    path_index=path_index,
                )

        # Roll back grooming, if any (P3.2.f).
        if groomed_lightpaths and self.grooming:
            logger.debug(
                f"Rolling back {len(groomed_lightpaths)} groomed LPs for request {request.request_id}"
            )
            self.grooming.rollback_groom(request, groomed_lightpaths, network_state)

        request.status = RequestStatus.BLOCKED
        request.block_reason = reason
        return AllocationResult(success=False, block_reason=reason, path_index=path_index)

    def _sum_groomed_bandwidth(
        self,
        lightpath_ids: list[int],
        request: Request,
        network_state: NetworkState,
    ) -> int:
        """Sum bandwidth allocated to the request from groomed lightpaths."""
        total = 0
        for lp_id in lightpath_ids:
            lp = network_state.get_lightpath(lp_id)
            if lp and request.request_id in lp.request_allocations:
                total += lp.request_allocations[request.request_id]
        return total

    def _combine_results(
        self,
        request: Request,
        groomed_lightpaths: list[int],
        alloc_result: AllocationResult,
        route_result: RouteResult | None = None,
        path_index: int = 0,
        groomed_bandwidth_gbps: int = 0,
    ) -> AllocationResult:
        """Combine groomed and allocated results."""
        from fusion.domain.request import RequestStatus
        from fusion.domain.results import AllocationResult

        if groomed_lightpaths:
            request.status = RequestStatus.PARTIALLY_GROOMED
            # CRITICAL: Add groomed lightpath IDs so release can return bandwidth.
            request.lightpath_ids.extend(groomed_lightpaths)
        else:
            request.status = RequestStatus.ALLOCATED

        # Total bandwidth = groomed + newly allocated.
        total_bw = groomed_bandwidth_gbps + alloc_result.total_bandwidth_allocated_gbps

        # Include failed attempt SNR values for Legacy compatibility. With
        # snr_accumulator, slicing appends directly to
        # _failed_attempt_snr_list, so we just use the orchestrator's list
        # (which has all values).
        failed_snr = (
            tuple(self._failed_attempt_snr_list)
            if hasattr(self, "_failed_attempt_snr_list")
            else ()
        )

        return AllocationResult(
            success=True,
            lightpaths_created=alloc_result.lightpaths_created,
            lightpaths_groomed=tuple(groomed_lightpaths),
            is_groomed=len(groomed_lightpaths) > 0,
            is_partially_groomed=(len(groomed_lightpaths) > 0 and len(alloc_result.lightpaths_created) > 0),
            is_sliced=alloc_result.is_sliced,
            total_bandwidth_allocated_gbps=total_bw,
            path_index=path_index,
            route_result=route_result,
            spectrum_result=alloc_result.spectrum_result,
            failed_attempt_snr_values=failed_snr,
        )

    def handle_release(
        self,
        request: Request,
        network_state: NetworkState,
    ) -> None:
        """
        Handle request release.

        Releases bandwidth from all lightpaths associated with the request.
        If a lightpath has no remaining allocations, it is fully released.
        """
        from fusion.domain.request import RequestStatus

        for lp_id in list(request.lightpath_ids):
            lp = network_state.get_lightpath(lp_id)
            if lp is None:
                continue

            # Release this request's bandwidth using release_bandwidth to
            # track time_bw_usage.
            if request.request_id in lp.request_allocations:
                lp.release_bandwidth(request.request_id, timestamp=request.depart_time)

            # Handle protection: release the backup if this is the working LP.
            if hasattr(lp, "protection_lp_id") and lp.protection_lp_id:
                backup = network_state.get_lightpath(lp.protection_lp_id)
                if backup and not backup.request_allocations:
                    network_state.release_lightpath(lp.protection_lp_id)

            # Release the lightpath if it has no more allocations.
            if not lp.request_allocations:
                network_state.release_lightpath(lp_id)

        request.lightpath_ids.clear()
        request.status = RequestStatus.RELEASED
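
    # Illustrative lifecycle (not part of the module): release is symmetric
    # with arrival because handle_arrival records every created or groomed
    # lightpath ID on request.lightpath_ids. A sketch of the pairing:
    #
    #     result = orchestrator.handle_arrival(request, network_state)
    #     ...  # simulation advances to request.depart_time
    #     orchestrator.handle_release(request, network_state)
    #     assert request.lightpath_ids == []  # cleared by handle_release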

    @property
    def policy(self) -> ControlPolicy | None:
        """Get the current control policy (None if not set)."""
        return self._policy

    @policy.setter
    def policy(self, value: ControlPolicy | None) -> None:
        """Set the control policy."""
        self._policy = value

    @property
    def rl_adapter(self) -> RLSimulationAdapter | None:
        """Get the RL adapter (None if not set)."""
        return self._rl_adapter

    @rl_adapter.setter
    def rl_adapter(self, value: RLSimulationAdapter | None) -> None:
        """Set the RL adapter."""
        self._rl_adapter = value

    @property
    def protection_pipeline(self) -> ProtectionPipeline | None:
        """Get the protection pipeline (None if not set)."""
        return self._protection_pipeline

    @protection_pipeline.setter
    def protection_pipeline(self, value: ProtectionPipeline | None) -> None:
        """Set the protection pipeline."""
        self._protection_pipeline = value

    def has_policy(self) -> bool:
        """Check whether a policy is configured."""
        return self._policy is not None

    def handle_arrival_with_policy(
        self,
        request: Request,
        network_state: NetworkState,
    ) -> AllocationResult:
        """
        Handle request arrival using the configured policy.

        This method provides policy-driven path selection while maintaining
        backward compatibility with handle_arrival(). It builds a PathOption
        list via the RL adapter, calls policy.select_action() to get the
        selected path, validates the action, and routes through standard
        allocation with a forced path.

        If no policy is configured, it delegates to handle_arrival(). The
        protection pipeline is only used when protection_enabled is True in
        the config, request.protection_required is True, and
        protection_pipeline is set.

        :param request: The incoming request to process
        :type request: Request
        :param network_state: Current network state (passed per call)
        :type network_state: NetworkState
        :return: AllocationResult with success/failure and details
        :rtype: AllocationResult
        """
        from fusion.domain.request import BlockReason
        from fusion.domain.results import AllocationResult

        # If no policy is configured, delegate to the standard handler.
        if self._policy is None:
            return self.handle_arrival(request, network_state)

        # Create the RL adapter lazily if not set.
        if self._rl_adapter is None:
            from fusion.modules.rl.adapter import RLSimulationAdapter

            self._rl_adapter = RLSimulationAdapter(self)

        # Log the policy being used.
        policy_name = self._policy.get_name()
        logger.debug("Request %s: Using policy %s", request.request_id, policy_name)

        # Check for a protected request with a protection pipeline.
        if (
            self.config.protection_enabled
            and getattr(request, "protection_required", False)
            and self._protection_pipeline is not None
        ):
            return self._handle_protected_with_policy(request, network_state)

        # Build path options via the adapter.
        options = self._rl_adapter.get_path_options(request, network_state)
        if not options:
            logger.debug("Request %s: No path options available", request.request_id)
            return AllocationResult(
                success=False,
                block_reason=BlockReason.NO_PATH,
            )

        # Call the policy to select an action.
        action = self._policy.select_action(request, options, network_state)
        logger.debug(
            "Request %s: Policy %s selected action %d (of %d options)",
            request.request_id,
            policy_name,
            action,
            len(options),
        )

        # Validate the action.
        if action < 0 or action >= len(options):
            logger.debug("Request %s: Invalid action %d, blocking", request.request_id, action)
            return AllocationResult(
                success=False,
                block_reason=BlockReason.CONGESTION,
            )

        # Check feasibility.
        selected_option = options[action]
        if not selected_option.is_feasible:
            logger.debug(
                "Request %s: Selected action %d is infeasible, blocking",
                request.request_id,
                action,
            )
            return AllocationResult(
                success=False,
                block_reason=BlockReason.CONGESTION,
            )

        # Route through the standard handler with a forced path.
        result = self.handle_arrival(
            request,
            network_state,
            forced_path=list(selected_option.path),
            forced_modulation=selected_option.modulation,
            forced_path_index=action,
        )

        # TODO: Reward values are hardcoded. They should be configurable via
        # config or computed from metrics (e.g., bandwidth efficiency, path
        # length, SNR margin).
        # Update the policy with the outcome (for learning policies).
        reward = 1.0 if result.success else -1.0
        logger.warning("Using hardcoded reward values (success=1.0, failure=-1.0) for policy update")
        self._policy.update(request, action, reward)

        return result
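
    # One possible shape for the configurable reward flagged in the TODO
    # above - a sketch only; `reward_success`, `reward_failure`, and
    # `length_penalty_per_km` are hypothetical config fields, not existing
    # SimulationConfig attributes:
    #
    #     def _compute_reward(self, result: AllocationResult, weight_km: float) -> float:
    #         if not result.success:
    #             return self.config.reward_failure  # e.g., -1.0
    #         # Favor successful allocations on shorter paths.
    #         return self.config.reward_success - self.config.length_penalty_per_km * weight_km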

    def _handle_protected_with_policy(
        self,
        request: Request,
        network_state: NetworkState,
    ) -> AllocationResult:
        """
        Handle a protected request with policy selection.

        When protection is enabled and a protection pipeline is configured,
        this finds disjoint path pairs, builds PathOptions with protection
        info, calls the policy to select among protected options, and
        allocates the same spectrum on both paths.

        :param request: The protected request
        :type request: Request
        :param network_state: Current network state
        :type network_state: NetworkState
        :return: AllocationResult indicating success/failure
        :rtype: AllocationResult
        """
        from fusion.domain.request import BlockReason
        from fusion.domain.results import AllocationResult

        # Should only be called when protection_pipeline is set.
        if self._protection_pipeline is None:
            return self._handle_protected_arrival(request, network_state)

        # Get the topology from the network state.
        if not hasattr(network_state, "topology"):
            logger.warning("NetworkState has no topology, falling back to standard protection")
            return self._handle_protected_arrival(request, network_state)
        topology = network_state.topology

        # Find a disjoint path pair.
        paths = self._protection_pipeline.find_protected_paths(
            topology, request.source, request.destination
        )
        if paths is None:
            logger.debug("Request %s: No disjoint paths found for protection", request.request_id)
            return AllocationResult(
                success=False,
                block_reason=BlockReason.PROTECTION_FAIL,
            )
        primary_path, backup_path = paths

        # Verify disjointness.
        if not self._protection_pipeline.verify_disjointness(primary_path, backup_path):
            logger.warning(
                "Request %s: Path pair failed disjointness verification",
                request.request_id,
            )
            return AllocationResult(
                success=False,
                block_reason=BlockReason.PROTECTION_FAIL,
            )

        # TODO: The slots calculation is hardcoded and incorrect. It should
        # use the modulation format, slot width (e.g., 12.5 GHz), and
        # spectral efficiency to compute the actual slots needed. This
        # placeholder assumes 25 Gbps per slot, which is not accurate.
        slots_needed = request.bandwidth_gbps // 25 + 1
        logger.warning("Using hardcoded slots calculation (bandwidth // 25 + 1) for protected allocation")

        # Try to allocate protected spectrum.
        alloc_result = self._protection_pipeline.allocate_protected(
            primary_path=primary_path,
            backup_path=backup_path,
            slots_needed=slots_needed,
            network_state=network_state,
        )
        if not alloc_result.success:
            logger.debug(
                "Request %s: No common spectrum for protection: %s",
                request.request_id,
                alloc_result.failure_reason,
            )
            return AllocationResult(
                success=False,
                block_reason=BlockReason.PROTECTION_FAIL,
            )

        # Create the protected lightpath: delegate to the standard protected
        # allocation for the actual lightpath creation.
        return self._handle_protected_arrival(request, network_state)
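
# A modulation-aware replacement for the hardcoded slots calculation in
# _handle_protected_with_policy could look like this sketch. The 12.5-GBaud
# per-slot symbol rate and the bits-per-symbol table are assumptions for
# illustration (QPSK then gives 25 Gbps per slot, matching the current
# placeholder), not values taken from FUSION:
#
#     import math
#
#     BITS_PER_SYMBOL = {"BPSK": 1, "QPSK": 2, "8-QAM": 3, "16-QAM": 4}
#
#     def slots_for(bandwidth_gbps: float, modulation: str,
#                   baud_per_slot_gbd: float = 12.5) -> int:
#         # One slot carries roughly symbol_rate * bits_per_symbol Gbps.
#         slot_capacity_gbps = baud_per_slot_gbd * BITS_PER_SYMBOL[modulation]
#         return math.ceil(bandwidth_gbps / slot_capacity_gbps)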