"""
SDN Orchestrator for FUSION simulation.
This module provides SDNOrchestrator, a thin coordination layer
that routes requests through pipelines without implementing
algorithm logic.
RULES (enforced by code review):
- No algorithm logic (K-shortest-path, first-fit, SNR calculation)
- No direct numpy access
- No hardcoded slicing/grooming logic
- Receives NetworkState per call, never stores it
"""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from fusion.core.pipeline_factory import PipelineSet
from fusion.domain.config import SimulationConfig
from fusion.domain.network_state import NetworkState
from fusion.domain.request import BlockReason, Request
from fusion.domain.results import AllocationResult, RouteResult, SpectrumResult
from fusion.interfaces.control_policy import ControlPolicy
from fusion.interfaces.pipelines import (
GroomingPipeline,
RoutingPipeline,
SlicingPipeline,
SNRPipeline,
SpectrumPipeline,
)
from fusion.modules.rl.adapter import RLSimulationAdapter
from fusion.pipelines.protection_pipeline import ProtectionPipeline
logger = logging.getLogger(__name__)
class SDNOrchestrator:
"""
Thin coordination layer for request handling.
The orchestrator sequences pipeline calls and combines their
results. It does NOT implement algorithm logic; all computation
is delegated to pipelines.
Does NOT store network_state - receives per call only.
:ivar config: Simulation configuration
:ivar routing: Pipeline for finding candidate routes
:ivar spectrum: Pipeline for spectrum assignment
:ivar grooming: Pipeline for traffic grooming (optional)
:ivar snr: Pipeline for SNR validation (optional)
:ivar slicing: Pipeline for request slicing (optional)
"""
__slots__ = (
"config",
"routing",
"spectrum",
"grooming",
"snr",
"slicing",
"_failed_attempt_snr_list",
"_last_path_index",
"current_iteration",
"_policy",
"_rl_adapter",
"_protection_pipeline",
)
def __init__(
self,
config: SimulationConfig,
pipelines: PipelineSet,
policy: ControlPolicy | None = None,
rl_adapter: RLSimulationAdapter | None = None,
protection_pipeline: ProtectionPipeline | None = None,
) -> None:
"""
Initialize orchestrator with config and pipelines.
:param config: Simulation configuration
:type config: SimulationConfig
:param pipelines: Container with all pipeline implementations
:type pipelines: PipelineSet
:param policy: Optional control policy for path selection
:type policy: ControlPolicy | None
:param rl_adapter: Optional RL adapter for building options
:type rl_adapter: RLSimulationAdapter | None
:param protection_pipeline: Optional protection pipeline
:type protection_pipeline: ProtectionPipeline | None
"""
self.config: SimulationConfig = config
self.routing: RoutingPipeline = pipelines.routing
self.spectrum: SpectrumPipeline = pipelines.spectrum
self.grooming: GroomingPipeline | None = pipelines.grooming
self.snr: SNRPipeline | None = pipelines.snr
self.slicing: SlicingPipeline | None = pipelines.slicing
# Track last path_index like Legacy does (for groomed request path_index matching)
self._last_path_index: int = 0
# Current iteration for debug purposes (set by simulation)
self.current_iteration: int = 0
# Policy integration (all optional)
self._policy: ControlPolicy | None = policy
self._rl_adapter: RLSimulationAdapter | None = rl_adapter
self._protection_pipeline: ProtectionPipeline | None = protection_pipeline
# SNR values from failed allocation attempts (reset at the start of each arrival)
self._failed_attempt_snr_list: list[float] = []
def handle_arrival(
self,
request: Request,
network_state: NetworkState,
forced_path: list[str] | None = None,
forced_modulation: str | None = None,
forced_path_index: int | None = None,
) -> AllocationResult:
"""
Handle request arrival by coordinating pipelines.
:param request: The incoming request to process
:type request: Request
:param network_state: Current network state (passed per call)
:type network_state: NetworkState
:param forced_path: Optional forced path from external source
:type forced_path: list[str] | None
:param forced_modulation: Optional forced modulation format (for RL)
:type forced_modulation: str | None
:param forced_path_index: Optional path index for stats tracking (for RL)
:type forced_path_index: int | None
:return: AllocationResult with success/failure and details
:rtype: AllocationResult
"""
# Branch for protection-enabled requests (P3.2.g)
if self.config.protection_enabled and getattr(request, "protection_required", False):
return self._handle_protected_arrival(request, network_state)
# Standard unprotected flow
return self._handle_unprotected_arrival(request, network_state, forced_path, forced_modulation, forced_path_index)
def _handle_unprotected_arrival(
self,
request: Request,
network_state: NetworkState,
forced_path: list[str] | None = None,
forced_modulation: str | None = None,
forced_path_index: int | None = None,
) -> AllocationResult:
"""Handle standard (unprotected) request arrival."""
from fusion.domain.request import BlockReason, RequestStatus
from fusion.domain.results import AllocationResult
groomed_lightpaths: list[int] = []
remaining_bw = request.bandwidth_gbps
was_partially_groomed = False
# Track SNR values from failed allocation attempts (for Legacy compatibility)
# Legacy adds SNR to snr_list before SNR recheck; if recheck fails, value stays
self._failed_attempt_snr_list: list[float] = []
# Stage 1: Grooming (if enabled)
if self.grooming and self.config.grooming_enabled:
groom_result = self.grooming.try_groom(request, network_state)
if groom_result.fully_groomed:
request.status = RequestStatus.GROOMED
# CRITICAL: Add groomed lightpath IDs to request so release can return bandwidth
request.lightpath_ids.extend(groom_result.lightpaths_used)
# Use _last_path_index to match Legacy behavior (Legacy keeps stale path_index)
return AllocationResult(
success=True,
is_groomed=True,
lightpaths_groomed=tuple(groom_result.lightpaths_used),
total_bandwidth_allocated_gbps=request.bandwidth_gbps,
grooming_result=groom_result,
path_index=self._last_path_index,
)
if groom_result.partially_groomed:
groomed_lightpaths = list(groom_result.lightpaths_used)
remaining_bw = groom_result.remaining_bandwidth_gbps
was_partially_groomed = True
if groom_result.forced_path:
forced_path = list(groom_result.forced_path)
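# Worked example of the grooming outcomes above (numbers are illustrative):
# a 400 Gbps request that grooms 100 Gbps onto existing lightpaths leaves
# remaining_bw = 300 and was_partially_groomed = True; a fully groomed
# request returns above with is_groomed=True, skipping routing and spectrum.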
# LP capacity is determined by spectrum assignment (modulation-based capacity)
# NOT by original request bandwidth. For slicing, spectrum/slicing code sets
# lightpath_bandwidth based on what the modulation can support.
lp_capacity_override = None
# Stage 2: Routing
route_result = self.routing.find_routes(
request.source,
request.destination,
remaining_bw,
network_state,
forced_path=forced_path,
)
if route_result.is_empty:
return self._handle_failure(request, groomed_lightpaths, BlockReason.NO_PATH, network_state)
# Override modulations with forced_modulation if provided (for RL)
# This mimics legacy RL behavior where get_path_modulation selects ONE modulation
if forced_modulation:
# Create new RouteResult with just the forced modulation for each path
from dataclasses import replace
route_result = replace(route_result, modulations=tuple((forced_modulation,) for _ in route_result.paths))
# For partial grooming, LEGACY uses original request bandwidth for spectrum allocation
# (not remaining_bw). This affects slots_needed calculation and SNR checks.
spectrum_bw = request.bandwidth_gbps if was_partially_groomed else remaining_bw
# Track the last path index tried (for blocked request reporting)
last_path_idx = 0
# Stage 3: Try standard allocation on ALL paths first (no slicing)
# This applies to ALL modes including dynamic_lps - legacy tries standard first
# and only falls back to dynamic slicing if standard fails on ALL paths
for loop_idx, path in enumerate(route_result.paths):
# Use forced_path_index for stats tracking when provided (RL mode)
path_idx = forced_path_index if forced_path_index is not None else loop_idx
last_path_idx = path_idx
self._last_path_index = path_idx # Track for groomed requests (Legacy behavior)
modulations = route_result.modulations[loop_idx]
weight_km = route_result.weights_km[loop_idx]
# For partial grooming, actual_bw_to_allocate = remaining_bw (what we need to serve)
# but spectrum_bw = original request (for spectrum calculation)
actual_bw_to_allocate = remaining_bw if was_partially_groomed else spectrum_bw
result = self._try_allocate_on_path(
request,
path,
modulations,
weight_km,
spectrum_bw,
network_state,
allow_slicing=False, # Don't try slicing yet
connection_index=route_result.connection_index,
path_index=path_idx,
lp_capacity_override=lp_capacity_override,
actual_bw_to_allocate=actual_bw_to_allocate,
num_paths=len(route_result.paths),
)
if result is not None:
groomed_bw = request.bandwidth_gbps - remaining_bw
return self._combine_results(
request, groomed_lightpaths, result, route_result, path_index=path_idx, groomed_bandwidth_gbps=groomed_bw
)
# Stage 4: Try dynamic_lps slicing on ALL paths (FIXED-GRID ONLY)
# For flex-grid, skip to Stage 5 which uses the slicing pipeline's
# _try_allocate_dynamic method that properly handles flex-grid dynamic_lps.
# Note: This uses use_dynamic_slicing=True which behaves differently from
# legacy's handle_dynamic_slicing_direct, but achieves similar blocking rates.
# Legacy behavior: when slicing fails on a path, _handle_congestion pops
# SNR entries for rolled-back lightpaths. We replicate this by reverting
# _failed_attempt_snr_list to its state before each failed path attempt.
if self.config.dynamic_lps and self.config.fixed_grid:
for loop_idx, path in enumerate(route_result.paths):
# Use forced_path_index for stats tracking when provided (RL mode)
path_idx = forced_path_index if forced_path_index is not None else loop_idx
last_path_idx = path_idx
self._last_path_index = path_idx # Track for groomed requests (Legacy behavior)
# Record SNR count BEFORE this path's slicing attempt
# If slicing fails, we'll revert to this count (Legacy behavior)
snr_count_before_path = len(self._failed_attempt_snr_list)
modulations = route_result.modulations[loop_idx]
weight_km = route_result.weights_km[loop_idx]
# For dynamic slicing, use remaining_bw for the slicing loop iteration
# (how much bandwidth to actually serve), but spectrum_bw for spectrum
# allocation (modulation/slots calculation). Legacy uses sdn_props.remaining_bw
# for the slicing loop when partially groomed.
slicing_target_bw = remaining_bw if was_partially_groomed else spectrum_bw
# For partial grooming, actual_bw_to_allocate = remaining_bw (for stats tracking)
actual_bw_to_allocate = remaining_bw if was_partially_groomed else spectrum_bw
result = self._try_allocate_on_path(
request,
path,
modulations,
weight_km,
spectrum_bw,
network_state,
allow_slicing=True, # Now try dynamic slicing
slicing_only=False, # Need to run through modulations to get spectrum
connection_index=route_result.connection_index,
path_index=path_idx,
use_dynamic_slicing=True, # Use dynamic slicing spectrum method
lp_capacity_override=lp_capacity_override,
slicing_target_bw=slicing_target_bw, # Actual bandwidth to serve via slicing
actual_bw_to_allocate=actual_bw_to_allocate, # For stats tracking
num_paths=len(route_result.paths),
)
if result is not None:
groomed_bw = request.bandwidth_gbps - remaining_bw
return self._combine_results(
request, groomed_lightpaths, result, route_result, path_index=path_idx, groomed_bandwidth_gbps=groomed_bw
)
else:
# Slicing failed on this path - simulate Legacy's pop(lp_idx) behavior
# Legacy's _handle_congestion pops snr_list[lp_idx] for each rolled-back LP.
# Since lightpath_id_list doesn't have Stage 3 entries (they were popped on
# SNR recheck fail), lp_idx=0 removes the FIRST snr_list entry, which is
# Stage 3's SNR. This causes Stage 3 entries to be removed before slicing
# entries. We replicate by removing N entries from the START of the list,
# where N = number of entries added during this path's attempt.
entries_added = len(self._failed_attempt_snr_list) - snr_count_before_path
if entries_added > 0:
self._failed_attempt_snr_list = self._failed_attempt_snr_list[entries_added:]
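# Worked example of the revert above (illustrative values): if the list was
# [snr_stage3] before this path (count 1) and slicing appended two entries,
# giving [snr_stage3, snr_a, snr_b], then entries_added = 2 and the slice
# [2:] leaves [snr_b]; entries drop from the FRONT, mirroring Legacy's pop(0).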
# Stage 5: Try segment slicing pipeline on ALL paths (only if standard allocation failed)
# Also handles flex-grid dynamic_lps (skipped in Stage 4) via slicing pipeline's
# _try_allocate_dynamic method.
# Legacy behavior: when slicing fails on a path, _handle_congestion pops
# snr_list[lp_idx] for each rolled-back LP. We replicate by removing entries
# from the START of the list (matching Legacy's pop(0) behavior).
# IMPORTANT: Skip Stage 5 when Stage 4 already handled fixed-grid dynamic_lps.
# Stage 4 handles fixed_grid + dynamic_lps, so Stage 5 should NOT also try slicing
# in that case (it would cause double allocation on different code paths).
stage_4_handled = self.config.dynamic_lps and self.config.fixed_grid
use_slicing = (
self.slicing
and not stage_4_handled
and (self.config.slicing_enabled or (self.config.dynamic_lps and not self.config.fixed_grid))
)
if use_slicing:
for loop_idx, path in enumerate(route_result.paths):
# Use forced_path_index for stats tracking when provided (RL mode)
path_idx = forced_path_index if forced_path_index is not None else loop_idx
last_path_idx = path_idx
self._last_path_index = path_idx # Track for groomed requests (Legacy behavior)
# Record SNR count BEFORE this path's slicing attempt
snr_count_before_path = len(self._failed_attempt_snr_list)
modulations = route_result.modulations[loop_idx]
weight_km = route_result.weights_km[loop_idx]
# For partial grooming, actual_bw_to_allocate = remaining_bw
actual_bw_to_allocate = remaining_bw if was_partially_groomed else spectrum_bw
result = self._try_allocate_on_path(
request,
path,
modulations,
weight_km,
spectrum_bw,
network_state,
allow_slicing=True, # Now try slicing
slicing_only=True, # Skip standard allocation (already tried)
connection_index=route_result.connection_index,
path_index=path_idx,
lp_capacity_override=lp_capacity_override,
actual_bw_to_allocate=actual_bw_to_allocate,
num_paths=len(route_result.paths),
)
if result is not None:
groomed_bw = request.bandwidth_gbps - remaining_bw
return self._combine_results(
request, groomed_lightpaths, result, route_result, path_index=path_idx, groomed_bandwidth_gbps=groomed_bw
)
else:
# Slicing failed on this path - simulate Legacy's pop(lp_idx) behavior
# Remove N entries from the START of the list (Stage 3 entries first)
entries_added = len(self._failed_attempt_snr_list) - snr_count_before_path
if entries_added > 0:
self._failed_attempt_snr_list = self._failed_attempt_snr_list[entries_added:]
# Stage 6: All paths failed
return self._handle_failure(request, groomed_lightpaths, BlockReason.CONGESTION, network_state, path_index=last_path_idx)
def _handle_protected_arrival(
self,
request: Request,
network_state: NetworkState,
) -> AllocationResult:
"""
Handle arrival for protected (1+1) request.
Implements protection pipeline integration.
"""
from fusion.domain.request import BlockReason, RequestStatus
from fusion.domain.results import AllocationResult
# Reset SNR tracking for this request (Legacy compatibility)
self._failed_attempt_snr_list = []
# Stage 1: Find working path
working_routes = self.routing.find_routes(
request.source,
request.destination,
request.bandwidth_gbps,
network_state,
)
if working_routes.is_empty:
return AllocationResult(
success=False,
block_reason=BlockReason.NO_PATH,
route_result=working_routes,
)
# Check if routing returned backup paths (protection-aware routing)
if not working_routes.has_protection:
return AllocationResult(
success=False,
block_reason=BlockReason.PROTECTION_FAIL,
route_result=working_routes,
)
# Try each working/backup path pair
for idx, working_path in enumerate(working_routes.paths):
backup_path = working_routes.backup_paths[idx] if working_routes.backup_paths else None
if not backup_path:
continue
result = self._try_protected_allocation(
request,
working_path,
backup_path,
working_routes.modulations[idx],
working_routes.backup_modulations[idx] if working_routes.backup_modulations else working_routes.modulations[idx],
working_routes.weights_km[idx],
working_routes.backup_weights_km[idx] if working_routes.backup_weights_km else working_routes.weights_km[idx],
network_state,
connection_index=working_routes.connection_index,
path_index=idx,
)
if result is not None and result.success:
request.status = RequestStatus.ALLOCATED
return result
return AllocationResult(
success=False,
block_reason=BlockReason.PROTECTION_FAIL,
route_result=working_routes,
)
def _try_protected_allocation(
self,
request: Request,
working_path: tuple[str, ...],
backup_path: tuple[str, ...],
working_mods: tuple[str, ...],
backup_mods: tuple[str, ...],
working_weight: float,
backup_weight: float,
network_state: NetworkState,
connection_index: int | None = None,
path_index: int = 0,
) -> AllocationResult | None:
"""Try to allocate both working and backup paths atomically."""
from fusion.domain.results import AllocationResult
# Find spectrum for working path
working_spectrum = self.spectrum.find_spectrum(
list(working_path),
working_mods[0],
request.bandwidth_gbps,
network_state,
connection_index=connection_index,
path_index=path_index,
)
if not working_spectrum.is_free:
return None
# Create working lightpath
working_lp = network_state.create_lightpath(
path=list(working_path),
start_slot=working_spectrum.start_slot,
end_slot=working_spectrum.end_slot,
core=working_spectrum.core,
band=working_spectrum.band,
modulation=working_spectrum.modulation,
bandwidth_gbps=request.bandwidth_gbps,
path_weight_km=working_weight,
guard_slots=self.config.guard_slots,
connection_index=connection_index,
arrive_time=request.arrive_time,
)
# Find spectrum for backup path
backup_spectrum = self.spectrum.find_spectrum(
list(backup_path),
backup_mods[0],
request.bandwidth_gbps,
network_state,
connection_index=connection_index,
path_index=path_index,
)
if not backup_spectrum.is_free:
# Rollback working
network_state.release_lightpath(working_lp.lightpath_id)
return None
# Create backup lightpath
backup_lp = network_state.create_lightpath(
path=list(backup_path),
start_slot=backup_spectrum.start_slot,
end_slot=backup_spectrum.end_slot,
core=backup_spectrum.core,
band=backup_spectrum.band,
modulation=backup_spectrum.modulation,
bandwidth_gbps=request.bandwidth_gbps,
path_weight_km=backup_weight,
guard_slots=self.config.guard_slots,
connection_index=connection_index,
arrive_time=request.arrive_time,
)
# SNR validation for both (if enabled)
if self.snr and self.config.snr_enabled:
working_snr = self.snr.validate(working_lp, network_state)
if not working_snr.passed:
network_state.release_lightpath(backup_lp.lightpath_id)
network_state.release_lightpath(working_lp.lightpath_id)
return None
backup_snr = self.snr.validate(backup_lp, network_state)
if not backup_snr.passed:
network_state.release_lightpath(backup_lp.lightpath_id)
network_state.release_lightpath(working_lp.lightpath_id)
return None
# Link working and backup LPs
working_lp.protection_lp_id = backup_lp.lightpath_id
backup_lp.working_lp_id = working_lp.lightpath_id
# Update request
request.lightpath_ids.extend([working_lp.lightpath_id, backup_lp.lightpath_id])
return AllocationResult(
success=True,
lightpaths_created=(working_lp.lightpath_id, backup_lp.lightpath_id),
total_bandwidth_allocated_gbps=request.bandwidth_gbps,
is_protected=True,
spectrum_result=working_spectrum,
)
def _try_allocate_on_path(
self,
request: Request,
path: tuple[str, ...],
modulations: tuple[str, ...],
weight_km: float,
bandwidth_gbps: int,
network_state: NetworkState,
allow_slicing: bool = True,
slicing_only: bool = False,
connection_index: int | None = None,
path_index: int = 0,
use_dynamic_slicing: bool = False,
lp_capacity_override: int | None = None,
snr_bandwidth: int | None = None,
slicing_target_bw: int | None = None,
actual_bw_to_allocate: int | None = None,
num_paths: int = 3,
) -> AllocationResult | None:
"""
Try to allocate on a single path.
:param request: The request to allocate
:param path: Path to try allocation on
:param modulations: Valid modulations for this path
:param weight_km: Path weight in km
:param bandwidth_gbps: Bandwidth for spectrum calculation
:param network_state: Current network state
:param allow_slicing: Whether slicing fallback is allowed
:param slicing_only: Skip standard allocation (only try slicing)
:param connection_index: External routing index for pre-calculated SNR lookup
:param path_index: Index of which k-path is being tried (0, 1, 2...)
:param use_dynamic_slicing: Use dynamic slicing spectrum method
:param lp_capacity_override: Override LP capacity (for partial grooming)
:param snr_bandwidth: Bandwidth for SNR checks
:param slicing_target_bw: Actual bandwidth to serve via dynamic slicing
:param actual_bw_to_allocate: Actual bandwidth to allocate/track
:param num_paths: Total number of paths being tried
:return: AllocationResult if successful, None otherwise
:rtype: AllocationResult | None
"""
# Try standard allocation with ALL modulations at once (like legacy)
# Filter out False/None values (modulations that don't reach path distance)
if not slicing_only:
valid_mods = [mod for mod in modulations if mod]
if valid_mods:
spectrum_result = self.spectrum.find_spectrum(
list(path),
valid_mods,
bandwidth_gbps,
network_state,
connection_index=connection_index,
path_index=path_index,
use_dynamic_slicing=use_dynamic_slicing,
snr_bandwidth=snr_bandwidth,
)
if spectrum_result.is_free:
# Handle dynamic_lps mode: achieved_bandwidth may be < requested
achieved_bw = spectrum_result.achieved_bandwidth_gbps
if allow_slicing and self.config.dynamic_lps and achieved_bw is not None and achieved_bw < bandwidth_gbps:
# Dynamic slicing: create multiple lightpaths
# Only do this in slicing stage (allow_slicing=True), not standard allocation
# Use slicing_target_bw if provided (for partial grooming),
# otherwise use bandwidth_gbps
actual_remaining = slicing_target_bw if slicing_target_bw is not None else bandwidth_gbps
result = self._allocate_dynamic_slices(
request=request,
path=path,
weight_km=weight_km,
remaining_bw=actual_remaining,
slice_bw=achieved_bw,
first_spectrum_result=spectrum_result,
network_state=network_state,
connection_index=connection_index,
path_index=path_index,
num_paths=num_paths,
)
if result is not None:
return result
elif not self.config.dynamic_lps or achieved_bw is None or achieved_bw >= bandwidth_gbps:
# Standard allocation: single lightpath
# Only allocate if we have full bandwidth (or not in dynamic_lps mode)
# When use_dynamic_slicing=True (Stage 4), use slicing_flag=True for SNR
# recheck to match Legacy's segment_slicing behavior
# Use actual_bw_to_allocate for allocation tracking (defaults to bandwidth_gbps)
bw_to_allocate = actual_bw_to_allocate if actual_bw_to_allocate is not None else bandwidth_gbps
alloc_result = self._allocate_and_validate(
request,
path,
spectrum_result,
weight_km,
bw_to_allocate, # Use actual bandwidth to allocate, not spectrum bandwidth
network_state,
connection_index=connection_index,
lp_capacity_override=lp_capacity_override,
path_index=path_index,
slicing_flag=use_dynamic_slicing,
)
if alloc_result is not None:
return alloc_result
# Fallback to slicing (if enabled and allowed)
# Also allow slicing for flex-grid + dynamic_lps (handled by slicing pipeline)
slicing_allowed = self.config.slicing_enabled or (self.config.dynamic_lps and not self.config.fixed_grid)
# Don't fall through to slicing pipeline if we already tried fixed-grid dynamic slicing
# (use_dynamic_slicing=True means _allocate_dynamic_slices was already called)
if use_dynamic_slicing and self.config.fixed_grid:
slicing_allowed = False
if allow_slicing and self.slicing and slicing_allowed:
# Get first valid modulation for slicing
first_valid_mod = next((m for m in modulations if m), "")
# Use actual_bw_to_allocate for slicing (defaults to bandwidth_gbps)
bw_for_slicing = actual_bw_to_allocate if actual_bw_to_allocate is not None else bandwidth_gbps
slicing_result = self.slicing.try_slice(
request,
list(path),
first_valid_mod,
bw_for_slicing, # Use actual bandwidth to serve for slicing
network_state,
spectrum_pipeline=self.spectrum,
snr_pipeline=self.snr,
connection_index=connection_index,
path_index=path_index,
snr_accumulator=self._failed_attempt_snr_list, # Accumulate SNR across try_slice calls
path_weight=weight_km, # Pass routing weight for metrics tracking
)
# Convert SlicingResult to AllocationResult
if slicing_result.success:
from fusion.domain.results import AllocationResult
# With accumulator, slicing appends directly to _failed_attempt_snr_list
# so we use the accumulator values (not slicing_result.failed_attempt_snr_values)
accumulated_snr = tuple(self._failed_attempt_snr_list)
return AllocationResult(
success=True,
lightpaths_created=slicing_result.lightpaths_created,
is_sliced=slicing_result.is_sliced,
total_bandwidth_allocated_gbps=slicing_result.total_bandwidth_gbps,
failed_attempt_snr_values=accumulated_snr,
)
return None
def _allocate_and_validate(
self,
request: Request,
path: tuple[str, ...],
spectrum_result: SpectrumResult,
weight_km: float,
bandwidth_gbps: int,
network_state: NetworkState,
connection_index: int | None = None,
lp_capacity_override: int | None = None,
path_index: int = 0,
slicing_flag: bool = False,
) -> AllocationResult | None:
"""Allocate lightpath and validate SNR with congestion handling."""
from fusion.domain.results import AllocationResult
# Determine lightpath capacity:
# 1. If lp_capacity_override is set, use it
# 2. Elif achieved_bandwidth_gbps is set (from spectrum_props.lightpath_bandwidth), use it
# This is set by spectrum_adapter to match LEGACY behavior (sdn_props.bandwidth)
# 3. Else fallback to the requested bandwidth_gbps
if lp_capacity_override is not None:
lp_capacity = lp_capacity_override
elif spectrum_result.achieved_bandwidth_gbps is not None:
lp_capacity = spectrum_result.achieved_bandwidth_gbps
else:
lp_capacity = bandwidth_gbps
# Create lightpath
# Note: end_slot from spectrum_result includes guard slots,
# so we must pass guard_slots for correct release behavior
lightpath = network_state.create_lightpath(
path=list(path),
start_slot=spectrum_result.start_slot,
end_slot=spectrum_result.end_slot,
core=spectrum_result.core,
band=spectrum_result.band,
modulation=spectrum_result.modulation,
bandwidth_gbps=lp_capacity,
path_weight_km=weight_km,
guard_slots=self.config.guard_slots,
connection_index=connection_index,
arrive_time=request.arrive_time,
)
# Store SNR value from spectrum assignment for metrics tracking
# This is the SNR calculated during spectrum assignment (before lightpath creation)
# which matches legacy behavior
if spectrum_result.snr_db is not None:
lightpath.snr_db = spectrum_result.snr_db
# SNR recheck: check if existing lightpaths would be degraded by new allocation
# NOTE: We do NOT re-validate the new lightpath's SNR here because:
# 1. SNR was already validated during spectrum assignment (if snr_type is set)
# 2. Legacy snr_recheck_after_allocation only checks EXISTING lightpaths
# 3. Modulations are chosen for capacity, not SNR margin after allocation
if self.snr and self.config.snr_recheck:
recheck_result = self.snr.recheck_affected(lightpath.lightpath_id, network_state, slicing_flag=slicing_flag)
if not recheck_result.all_pass:
# Rollback: existing LP would fail SNR
logger.debug(f"SNR recheck failed for request {request.request_id}: affected LPs {recheck_result.degraded_lightpath_ids}")
# Fixed legacy behavior: SNR value is removed when recheck fails
# (Previously we kept stale SNR values to match legacy's bug)
network_state.release_lightpath(lightpath.lightpath_id)
return None
# Success: link request to lightpath and update remaining bandwidth
# Use allocate_bandwidth to properly track time_bw_usage for utilization stats
lightpath.allocate_bandwidth(request.request_id, bandwidth_gbps, timestamp=request.arrive_time)
request.lightpath_ids.append(lightpath.lightpath_id)
# Track the successful attempt's SNR for stats tracking
if spectrum_result.snr_db is not None:
self._failed_attempt_snr_list.append(spectrum_result.snr_db)
# Include accumulated SNR values in result for stats tracking
failed_snr = tuple(self._failed_attempt_snr_list)
return AllocationResult(
success=True,
lightpaths_created=(lightpath.lightpath_id,),
total_bandwidth_allocated_gbps=bandwidth_gbps,
spectrum_result=spectrum_result,
failed_attempt_snr_values=failed_snr,
)
def _allocate_dynamic_slices(
self,
request: Request,
path: tuple[str, ...],
weight_km: float,
remaining_bw: int,
slice_bw: int,
first_spectrum_result: SpectrumResult,
network_state: NetworkState,
connection_index: int | None = None,
path_index: int = 0,
num_paths: int = 3,
) -> AllocationResult | None:
"""
Allocate multiple lightpaths for dynamic_lps mode.
When dynamic slicing returns achieved_bandwidth < requested_bandwidth,
create multiple lightpaths to satisfy the full request.
:param request: The request being processed
:param path: Path for all lightpaths
:param weight_km: Path weight in km
:param remaining_bw: Total bandwidth still needed
:param slice_bw: Bandwidth per lightpath (from dynamic slicing)
:param first_spectrum_result: First spectrum result to use
:param network_state: Current network state
:param connection_index: External routing index
:param path_index: Path index for SNR lookup
:param num_paths: Total number of paths being tried
:return: AllocationResult if successful, None otherwise
:rtype: AllocationResult | None
"""
from fusion.domain.results import AllocationResult
allocated_lightpaths: list[int] = []
total_allocated = 0
spectrum_result = first_spectrum_result
max_iterations = 20 # Safety limit
# Capture initial state BEFORE allocating - for partial serving check
initial_lightpath_count = len(request.lightpath_ids)
iteration = 0
while remaining_bw > 0 and iteration < max_iterations:
iteration += 1
if not spectrum_result.is_free:
# No more spectrum available
break
# Get actual slice bandwidth (use achieved_bw if available, else slice_bw)
actual_slice_bw = spectrum_result.achieved_bandwidth_gbps or slice_bw
# Create lightpath for this slice
lightpath = network_state.create_lightpath(
path=list(path),
start_slot=spectrum_result.start_slot,
end_slot=spectrum_result.end_slot,
core=spectrum_result.core,
band=spectrum_result.band,
modulation=spectrum_result.modulation,
bandwidth_gbps=actual_slice_bw,
path_weight_km=weight_km,
guard_slots=self.config.guard_slots,
connection_index=connection_index,
arrive_time=request.arrive_time,
)
# Store SNR value from spectrum assignment for metrics tracking
# This is the SNR calculated during spectrum assignment (before lightpath creation)
snr_was_added = False
if spectrum_result.snr_db is not None:
lightpath.snr_db = spectrum_result.snr_db
# Track SNR for metrics (add before recheck, like legacy does)
self._failed_attempt_snr_list.append(spectrum_result.snr_db)
snr_was_added = True
# SNR recheck for affected existing lightpaths (legacy behavior)
# NOTE: Legacy does NOT re-validate the new LP's SNR here - only checks existing LPs
if self.snr and self.config.snr_recheck:
recheck_result = self.snr.recheck_affected(lightpath.lightpath_id, network_state, slicing_flag=True)
if not recheck_result.all_pass:
# Rollback: existing LP would fail SNR
logger.debug(
"Dynamic slice SNR recheck failed for request %s: affected LPs %s",
request.request_id,
recheck_result.degraded_lightpath_ids,
)
# Fixed legacy behavior: remove SNR value when recheck fails
if snr_was_added and self._failed_attempt_snr_list:
self._failed_attempt_snr_list.pop()
network_state.release_lightpath(lightpath.lightpath_id)
break
# Calculate how much bandwidth to dedicate to this lightpath
# Don't over-allocate: only use what the request actually needs
dedicated_bw = min(actual_slice_bw, remaining_bw)
# Success: link request to lightpath
# Use allocate_bandwidth to properly track time_bw_usage
lightpath.allocate_bandwidth(request.request_id, dedicated_bw, timestamp=request.arrive_time)
request.lightpath_ids.append(lightpath.lightpath_id)
allocated_lightpaths.append(lightpath.lightpath_id)
total_allocated += dedicated_bw
remaining_bw -= dedicated_bw
if remaining_bw > 0:
# Validate modulation before next iteration
# Can be empty/False if previous spectrum assignment failed internally
mod_for_next = spectrum_result.modulation
if not mod_for_next:
break
# Find spectrum for next slice
spectrum_result = self.spectrum.find_spectrum(
list(path),
mod_for_next,
remaining_bw,
network_state,
connection_index=connection_index,
path_index=path_index,
use_dynamic_slicing=True,
)
if total_allocated > 0:
# Legacy behavior: only accept partial allocation on the LAST path
# (unless request was already partially groomed)
# This ensures we try ALL paths fully before settling for partial
is_partial = remaining_bw > 0
is_last_path = path_index >= num_paths - 1
was_partially_groomed = initial_lightpath_count > 0 # Had groomed LPs BEFORE this allocation
if is_partial and not is_last_path and not was_partially_groomed:
# Not on last path and didn't fully allocate - rollback and try next path
for lp_id in allocated_lightpaths:
# Remove from request's lightpath_ids if added
if lp_id in request.lightpath_ids:
request.lightpath_ids.remove(lp_id)
network_state.release_lightpath(lp_id)
return None
# Include accumulated SNR values for Legacy compatibility
failed_snr = tuple(self._failed_attempt_snr_list)
return AllocationResult(
success=True,
lightpaths_created=tuple(allocated_lightpaths),
is_sliced=len(allocated_lightpaths) > 1,
total_bandwidth_allocated_gbps=total_allocated,
spectrum_result=first_spectrum_result,
failed_attempt_snr_values=failed_snr,
path_index=path_index,
)
# Complete failure - rollback any allocated lightpaths
for lp_id in allocated_lightpaths:
network_state.release_lightpath(lp_id)
return None
def _handle_failure(
self,
request: Request,
groomed_lightpaths: list[int],
reason: BlockReason,
network_state: NetworkState,
path_index: int = 0,
) -> AllocationResult:
"""Handle allocation failure with grooming rollback (P3.2.f)."""
from fusion.domain.request import RequestStatus
from fusion.domain.results import AllocationResult
# Accept partial grooming if allowed
if groomed_lightpaths and self.config.can_partially_serve:
groomed_bw = self._sum_groomed_bandwidth(groomed_lightpaths, request, network_state)
# Only return success if we actually allocated some bandwidth
if groomed_bw > 0:
request.status = RequestStatus.PARTIALLY_GROOMED
# CRITICAL: Add groomed lightpath IDs so release can return bandwidth
request.lightpath_ids.extend(groomed_lightpaths)
return AllocationResult(
success=True,
lightpaths_groomed=tuple(groomed_lightpaths),
is_groomed=True,
is_partially_groomed=True,
total_bandwidth_allocated_gbps=groomed_bw,
path_index=path_index,
)
# Rollback grooming if any (P3.2.f)
if groomed_lightpaths and self.grooming:
logger.debug(f"Rolling back {len(groomed_lightpaths)} groomed LPs for request {request.request_id}")
self.grooming.rollback_groom(request, groomed_lightpaths, network_state)
request.status = RequestStatus.BLOCKED
request.block_reason = reason
return AllocationResult(success=False, block_reason=reason, path_index=path_index)
def _sum_groomed_bandwidth(
self,
lightpath_ids: list[int],
request: Request,
network_state: NetworkState,
) -> int:
"""Sum bandwidth allocated to request from groomed lightpaths."""
total = 0
for lp_id in lightpath_ids:
lp = network_state.get_lightpath(lp_id)
if lp and request.request_id in lp.request_allocations:
total += lp.request_allocations[request.request_id]
return total
def _combine_results(
self,
request: Request,
groomed_lightpaths: list[int],
alloc_result: AllocationResult,
route_result: RouteResult | None = None,
path_index: int = 0,
groomed_bandwidth_gbps: int = 0,
) -> AllocationResult:
"""Combine groomed and allocated results."""
from fusion.domain.request import RequestStatus
from fusion.domain.results import AllocationResult
if groomed_lightpaths:
request.status = RequestStatus.PARTIALLY_GROOMED
# CRITICAL: Add groomed lightpath IDs so release can return bandwidth
request.lightpath_ids.extend(groomed_lightpaths)
else:
request.status = RequestStatus.ALLOCATED
# Total bandwidth = groomed + newly allocated
total_bw = groomed_bandwidth_gbps + alloc_result.total_bandwidth_allocated_gbps
# Include failed attempt SNR values for Legacy compatibility
# With snr_accumulator, slicing appends directly to _failed_attempt_snr_list
# so we just use the orchestrator's list (which has all values)
failed_snr = tuple(self._failed_attempt_snr_list)
return AllocationResult(
success=True,
lightpaths_created=alloc_result.lightpaths_created,
lightpaths_groomed=tuple(groomed_lightpaths),
is_groomed=len(groomed_lightpaths) > 0,
is_partially_groomed=(len(groomed_lightpaths) > 0 and len(alloc_result.lightpaths_created) > 0),
is_sliced=alloc_result.is_sliced,
total_bandwidth_allocated_gbps=total_bw,
path_index=path_index,
route_result=route_result,
spectrum_result=alloc_result.spectrum_result,
failed_attempt_snr_values=failed_snr,
)
def handle_release(
self,
request: Request,
network_state: NetworkState,
) -> None:
"""
Handle request release.
Releases bandwidth from all lightpaths associated with the
request. If a lightpath has no remaining allocations, it
is fully released.
"""
from fusion.domain.request import RequestStatus
for lp_id in list(request.lightpath_ids):
lp = network_state.get_lightpath(lp_id)
if lp is None:
continue
# Release this request's bandwidth using release_bandwidth to track time_bw_usage
if request.request_id in lp.request_allocations:
lp.release_bandwidth(request.request_id, timestamp=request.depart_time)
# Handle protection: release backup if this is working
if hasattr(lp, "protection_lp_id") and lp.protection_lp_id:
backup = network_state.get_lightpath(lp.protection_lp_id)
if backup and not backup.request_allocations:
network_state.release_lightpath(lp.protection_lp_id)
# Release lightpath if no more allocations
if not lp.request_allocations:
network_state.release_lightpath(lp_id)
request.lightpath_ids.clear()
request.status = RequestStatus.RELEASED
@property
def policy(self) -> ControlPolicy | None:
"""Get the current control policy (None if not set)."""
return self._policy
@policy.setter
def policy(self, value: ControlPolicy | None) -> None:
"""Set the control policy."""
self._policy = value
@property
def rl_adapter(self) -> RLSimulationAdapter | None:
"""Get the RL adapter (None if not set)."""
return self._rl_adapter
@rl_adapter.setter
def rl_adapter(self, value: RLSimulationAdapter | None) -> None:
"""Set the RL adapter."""
self._rl_adapter = value
@property
def protection_pipeline(self) -> ProtectionPipeline | None:
"""Get the protection pipeline (None if not set)."""
return self._protection_pipeline
@protection_pipeline.setter
def protection_pipeline(self, value: ProtectionPipeline | None) -> None:
"""Set the protection pipeline."""
self._protection_pipeline = value
def has_policy(self) -> bool:
"""Check if a policy is configured."""
return self._policy is not None
def handle_arrival_with_policy(
self,
request: Request,
network_state: NetworkState,
) -> AllocationResult:
"""
Handle request arrival using the configured policy.
This method provides policy-driven path selection while maintaining
backward compatibility with handle_arrival(). It builds PathOption list
via RL adapter, calls policy.select_action() to get selected path,
validates action, and routes through standard allocation with forced path.
If no policy is configured, delegates to handle_arrival().
Protection pipeline is only used when protection_enabled is True in config,
request.protection_required is True, and protection_pipeline is set.
:param request: The incoming request to process
:type request: Request
:param network_state: Current network state (passed per call)
:type network_state: NetworkState
:return: AllocationResult with success/failure and details
:rtype: AllocationResult
"""
from fusion.domain.request import BlockReason
from fusion.domain.results import AllocationResult
# If no policy configured, delegate to standard handler
if self._policy is None:
return self.handle_arrival(request, network_state)
# Create RL adapter lazily if not set
if self._rl_adapter is None:
from fusion.modules.rl.adapter import RLSimulationAdapter
self._rl_adapter = RLSimulationAdapter(self)
# Log policy being used
policy_name = self._policy.get_name()
logger.debug(
"Request %s: Using policy %s",
request.request_id,
policy_name,
)
# Check for protected request with protection pipeline
if self.config.protection_enabled and getattr(request, "protection_required", False) and self._protection_pipeline is not None:
return self._handle_protected_with_policy(request, network_state)
# Build path options via adapter
options = self._rl_adapter.get_path_options(request, network_state)
if not options:
logger.debug(
"Request %s: No path options available",
request.request_id,
)
return AllocationResult(
success=False,
block_reason=BlockReason.NO_PATH,
)
# Call policy to select action
action = self._policy.select_action(request, options, network_state)
logger.debug(
"Request %s: Policy %s selected action %d (of %d options)",
request.request_id,
policy_name,
action,
len(options),
)
# Validate action
if action < 0 or action >= len(options):
logger.debug(
"Request %s: Invalid action %d, blocking",
request.request_id,
action,
)
return AllocationResult(
success=False,
block_reason=BlockReason.CONGESTION,
)
# Check feasibility
selected_option = options[action]
if not selected_option.is_feasible:
logger.debug(
"Request %s: Selected action %d is infeasible, blocking",
request.request_id,
action,
)
return AllocationResult(
success=False,
block_reason=BlockReason.CONGESTION,
)
# Route through standard handler with forced path
result = self.handle_arrival(
request,
network_state,
forced_path=list(selected_option.path),
forced_modulation=selected_option.modulation,
forced_path_index=action,
)
# TODO: Reward values are hardcoded. Should be configurable via config or
# computed based on metrics (e.g., bandwidth efficiency, path length, SNR margin).
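# A hedged sketch of a configurable alternative (the config fields shown,
# e.g. reward_success/reward_failure, are assumptions and do not exist yet):
#
#   reward_success = getattr(self.config, "reward_success", 1.0)
#   reward_failure = getattr(self.config, "reward_failure", -1.0)
#   reward = reward_success if result.success else reward_failure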
# Update policy with outcome (for learning policies)
reward = 1.0 if result.success else -1.0
logger.warning("Using hardcoded reward values (success=1.0, failure=-1.0) for policy update")
self._policy.update(request, action, reward)
return result
def _handle_protected_with_policy(
self,
request: Request,
network_state: NetworkState,
) -> AllocationResult:
"""
Handle protected request with policy selection.
When protection is enabled and a protection pipeline is configured,
finds disjoint path pairs, builds PathOptions with protection info,
calls policy to select among protected options, and allocates same
spectrum on both paths.
:param request: The protected request
:type request: Request
:param network_state: Current network state
:type network_state: NetworkState
:return: AllocationResult indicating success/failure
:rtype: AllocationResult
"""
from fusion.domain.request import BlockReason
from fusion.domain.results import AllocationResult
# Should only be called when protection_pipeline is set
if self._protection_pipeline is None:
return self._handle_protected_arrival(request, network_state)
# Get topology from network state
if not hasattr(network_state, "topology"):
logger.warning("NetworkState has no topology, falling back to standard protection")
return self._handle_protected_arrival(request, network_state)
topology = network_state.topology
# Find disjoint path pair
paths = self._protection_pipeline.find_protected_paths(topology, request.source, request.destination)
if paths is None:
logger.debug(
"Request %s: No disjoint paths found for protection",
request.request_id,
)
return AllocationResult(
success=False,
block_reason=BlockReason.PROTECTION_FAIL,
)
primary_path, backup_path = paths
# Verify disjointness
if not self._protection_pipeline.verify_disjointness(primary_path, backup_path):
logger.warning(
"Request %s: Path pair failed disjointness verification",
request.request_id,
)
return AllocationResult(
success=False,
block_reason=BlockReason.PROTECTION_FAIL,
)
# TODO: Slots calculation is hardcoded and incorrect. Should use modulation format,
# slot width (e.g., 12.5 GHz), and spectral efficiency to compute actual slots needed.
# This placeholder assumes 25 Gbps per slot which is not accurate.
slots_needed = request.bandwidth_gbps // 25 + 1
logger.warning("Using hardcoded slots calculation (bandwidth // 25 + 1) for protected allocation")
# Try to allocate protected spectrum
alloc_result = self._protection_pipeline.allocate_protected(
primary_path=primary_path,
backup_path=backup_path,
slots_needed=slots_needed,
network_state=network_state,
)
if not alloc_result.success:
logger.debug(
"Request %s: No common spectrum for protection: %s",
request.request_id,
alloc_result.failure_reason,
)
return AllocationResult(
success=False,
block_reason=BlockReason.PROTECTION_FAIL,
)
# Create protected lightpath
# Delegate to standard protected allocation for actual lightpath creation
return self._handle_protected_arrival(request, network_state)