Source code for fusion.core.simulation

"""
Simulation engine module for running optical network simulations.

This module provides the main simulation engine functionality for running
optical network simulations with support for ML/RL models and various metrics.
"""

from __future__ import annotations

import copy
import os
import signal
import time
from typing import TYPE_CHECKING, Any

import networkx as nx
import numpy as np

from fusion.core.metrics import SimStats
from fusion.core.ml_metrics import MLMetricsCollector
from fusion.core.persistence import StatsPersistence
from fusion.core.request import get_requests
from fusion.core.sdn_controller import SDNController
from fusion.modules.failures import FailureManager
from fusion.modules.ml import load_model
from fusion.reporting.dataset_logger import DatasetLogger
from fusion.reporting.simulation_reporter import SimulationReporter
from fusion.utils.logging_config import get_logger, log_message
from fusion.utils.os import create_directory

if TYPE_CHECKING:
    from fusion.core.orchestrator import SDNOrchestrator
    from fusion.domain.config import SimulationConfig
    from fusion.domain.network_state import NetworkState
    from fusion.domain.request import Request
    from fusion.domain.results import AllocationResult

logger = get_logger(__name__)

ENV_VAR_USE_ORCHESTRATOR = "FUSION_USE_ORCHESTRATOR"
DEFAULT_USE_ORCHESTRATOR = False

# Feature-flag combinations that are invalid when use_orchestrator=True
# (enforced in _validate_orchestrator_config)
INVALID_ORCHESTRATOR_COMBINATIONS = [
    ("protection_enabled", "slicing_enabled"),  # Protection + Slicing not supported yet
]


def _resolve_use_orchestrator(engine_props: dict[str, Any]) -> bool:
    """
    Resolve the use_orchestrator feature flag.

    Priority: (1) env var FUSION_USE_ORCHESTRATOR, (2) engine_props, (3) default False.

    :param engine_props: Engine configuration dictionary
    :type engine_props: dict[str, Any]
    :return: True if orchestrator path should be used, False for legacy path
    :rtype: bool
    """
    # Priority 1: Environment variable
    env_value = os.environ.get(ENV_VAR_USE_ORCHESTRATOR)
    if env_value is not None:
        resolved = env_value.lower() in ("true", "1", "yes", "on")
        logger.debug(
            "use_orchestrator resolved from env var %s=%s -> %s",
            ENV_VAR_USE_ORCHESTRATOR,
            env_value,
            resolved,
        )
        return resolved

    # Priority 2: engine_props
    if "use_orchestrator" in engine_props:
        resolved = bool(engine_props["use_orchestrator"])
        logger.debug(
            "use_orchestrator resolved from engine_props -> %s",
            resolved,
        )
        return resolved

    # Priority 3: Default
    logger.debug(
        "use_orchestrator using default -> %s",
        DEFAULT_USE_ORCHESTRATOR,
    )
    return DEFAULT_USE_ORCHESTRATOR
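
# Illustrative resolution order (a minimal sketch, not executed at import):
# the environment variable wins over engine_props, which wins over the default.
#
#   >>> os.environ[ENV_VAR_USE_ORCHESTRATOR] = "yes"
#   >>> _resolve_use_orchestrator({"use_orchestrator": False})
#   True
#   >>> del os.environ[ENV_VAR_USE_ORCHESTRATOR]
#   >>> _resolve_use_orchestrator({"use_orchestrator": True})
#   True
#   >>> _resolve_use_orchestrator({})  # falls back to DEFAULT_USE_ORCHESTRATOR
#   False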


def _validate_orchestrator_config(engine_props: dict[str, Any]) -> None:
    """
    Validate configuration for orchestrator mode.

    Checks for invalid feature flag combinations when use_orchestrator=True.
    Logs warnings for suboptimal but allowed configurations.

    :param engine_props: Engine configuration dictionary
    :type engine_props: dict[str, Any]
    :raises ValueError: If invalid combination detected
    """
    # Extract feature flags
    protection_enabled = engine_props.get("route_method") == "1plus1_protection"
    slicing_enabled = engine_props.get("max_segments", 1) > 1
    grooming_enabled = engine_props.get("is_grooming_enabled", False)
    snr_enabled = engine_props.get("snr_type") is not None
    snr_recheck = engine_props.get("snr_recheck", False)
    node_disjoint = engine_props.get("node_disjoint_protection", False)

    errors: list[str] = []
    warnings: list[str] = []

    if protection_enabled and slicing_enabled:
        errors.append("protection_enabled and slicing_enabled cannot both be True. Protection requires single lightpath per request.")

    # ERROR: node_disjoint without protection
    if node_disjoint and not protection_enabled:
        errors.append("node_disjoint_protection requires protection_enabled=True (route_method='1plus1_protection')")

    # WARNING: slicing without grooming (P3.3.e - Combination 1)
    if slicing_enabled and not grooming_enabled:
        warnings.append("slicing_enabled without grooming_enabled may cause inefficient allocations (slices may use different paths)")

    # WARNING: congestion check (snr_recheck) without SNR (P3.3.e - Combination 3)
    if snr_recheck and not snr_enabled:
        warnings.append("snr_recheck has no effect without snr_enabled (snr_type must be set)")

    # Log warnings
    for warning in warnings:
        logger.warning("Config warning: %s", warning)

    # Raise on errors
    if errors:
        error_msg = "Invalid orchestrator configuration:\n" + "\n".join(f"  - {e}" for e in errors)
        raise ValueError(error_msg)

    logger.debug("Orchestrator configuration validation passed")


def seed_request_generation(seed: int) -> None:
    """
    Seed random number generators used for request generation.

    This function seeds ONLY NumPy, which is used for generating traffic
    patterns (arrivals, departures, bandwidth selection, etc.). This allows
    request generation to vary per iteration while keeping RL/ML models
    deterministic across iterations.

    :param seed: Random seed (integer)
    :type seed: int
    """
    logger.debug("Seeding request generation (NumPy) with seed=%d", seed)
    np.random.seed(seed)

def seed_rl_components(seed: int) -> None:
    """
    Seed random number generators used for RL/ML components.

    This function seeds Python's random module and PyTorch (CPU and CUDA),
    which are used by RL agents and ML models. This allows RL components to
    remain deterministic even when request generation seeds vary.

    Also sets PyTorch to deterministic mode to prevent non-deterministic
    operations in neural network computations.

    :param seed: Random seed (integer)
    :type seed: int
    """
    logger.debug("Seeding RL components (random, PyTorch) with seed=%d", seed)

    # Seed Python's random module
    import random

    random.seed(seed)

    # Seed PyTorch (if available)
    try:
        import torch

        # Check if torch is properly loaded (not broken by architecture mismatch)
        if not hasattr(torch, "manual_seed"):
            # Torch imported but is broken (e.g., architecture mismatch)
            logger.debug("PyTorch imported but broken, skipping PyTorch seeding")
        else:
            torch.manual_seed(seed)
            if torch.cuda.is_available():
                torch.cuda.manual_seed(seed)
                torch.cuda.manual_seed_all(seed)
            # Enforce deterministic behavior
            torch.backends.cudnn.deterministic = True
            torch.backends.cudnn.benchmark = False
            # Use deterministic algorithms where possible
            torch.use_deterministic_algorithms(True, warn_only=True)
    except (ImportError, AttributeError, OSError):
        # PyTorch not installed or broken, skip
        logger.debug("PyTorch not available, skipping PyTorch seeding")

def seed_all_rngs(seed: int) -> None:
    """
    Seed all random number generators for reproducibility.

    Seeds:
    - Python's built-in random module
    - NumPy's random state
    - PyTorch's random state (CPU and CUDA)

    Also sets PyTorch to deterministic mode to prevent non-deterministic
    operations.

    .. note::
        For finer control, use :func:`seed_request_generation` and
        :func:`seed_rl_components` separately. This allows different seeding
        strategies for traffic generation vs RL/ML models.

    :param seed: Random seed (integer)
    :type seed: int
    """
    logger.debug("Seeding all RNGs with seed=%d", seed)
    seed_request_generation(seed)
    seed_rl_components(seed)

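# Illustrative usage (a minimal sketch of the intended seeding split): hold the
# RL/ML stack deterministic across iterations while the traffic pattern varies.
#
#   >>> seed_rl_components(42)                       # constant across iterations
#   >>> for iteration in range(3):
#   ...     seed_request_generation(iteration + 1)   # varies per iteration
#   ...     # request generation now draws a fresh, but reproducible, pattern
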
def generate_seed_from_time() -> int:
    """
    Generate a seed from current time.

    Used when no seed is explicitly provided.

    :return: Seed value
    :rtype: int
    """
    return int(time.time() * 1000) % (2**31 - 1)

def validate_seed(seed: int) -> int:
    """
    Validate and normalize seed value.

    Ensures seed is in valid range for all RNGs.

    :param seed: Seed value
    :type seed: int
    :return: Validated seed
    :rtype: int
    :raises ValueError: If seed is out of valid range
    """
    if seed < 0:
        raise ValueError(f"Seed must be non-negative, got {seed}")
    if seed > 2**31 - 1:
        raise ValueError(f"Seed must be < 2^31, got {seed}")
    return seed

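# Illustrative round trip (a minimal sketch): time-derived seeds already land
# in [0, 2**31 - 2] because of the modulo above, so validate_seed() passes
# them through unchanged.
#
#   >>> seed = validate_seed(generate_seed_from_time())
#   >>> 0 <= seed <= 2**31 - 1
#   True
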
class SimulationEngine:
    """
    Controls a single simulation.

    This class manages the execution of a single optical network simulation,
    including topology creation, request processing, and statistics collection.

    :param engine_props: Engine configuration properties
    :type engine_props: dict[str, Any]
    """

    def __init__(self, engine_props: dict[str, Any]) -> None:
        self.engine_props = engine_props
        self.network_spectrum_dict: dict[tuple[Any, Any], dict[str, Any]] = {}
        self.reqs_dict: dict[tuple[int, float], dict[str, Any]] | None = None
        self.reqs_status_dict: dict[int, dict[str, Any]] = {}
        self.iteration = 0
        self.topology = nx.Graph()
        self.sim_info = os.path.join(
            self.engine_props["network"],
            self.engine_props["date"],
            self.engine_props["sim_start"],
        )

        self.use_orchestrator = _resolve_use_orchestrator(engine_props)
        if self.use_orchestrator:
            _validate_orchestrator_config(engine_props)

        self._sim_config: SimulationConfig | None = None
        self._network_state: NetworkState | None = None
        self._orchestrator: SDNOrchestrator | None = None
        self._v5_requests: dict[tuple[int, float], Request] | None = None

        # =====================================================================
        # Legacy Path Initialization (always needed for backwards compatibility)
        # =====================================================================
        self.sdn_obj = SDNController(engine_props=self.engine_props)
        # FailureManager reference will be set after topology creation
        self.sdn_obj.failure_manager = None
        self.stats_obj = SimStats(engine_props=self.engine_props, sim_info=self.sim_info)
        self.reporter = SimulationReporter(logger=logger)
        self.persistence = StatsPersistence(engine_props=self.engine_props, sim_info=self.sim_info)

        # Initialize ML metrics collector if needed
        self.ml_metrics: MLMetricsCollector | None = (
            MLMetricsCollector(engine_props=self.engine_props, sim_info=self.sim_info)
            if engine_props.get("output_train_data", False)
            else None
        )

        # Initialize dataset logger if enabled
        self.dataset_logger: DatasetLogger | None = None
        if engine_props.get("log_offline_dataset", False):
            # Build path matching output structure exactly:
            # data/training_data/{network}/{date}/{sim_start}/{thread}/
            #     dataset_erlang_{erlang}.jsonl
            erlang_value = int(engine_props["erlang"])
            dataset_dir = os.path.join(
                "data",
                "training_data",
                self.sim_info,
                engine_props.get("thread_num", "s1"),
            )
            create_directory(dataset_dir)
            # Include erlang value in filename
            dataset_filename = f"dataset_erlang_{erlang_value}.jsonl"
            output_path = os.path.join(dataset_dir, dataset_filename)
            self.dataset_logger = DatasetLogger(output_path, engine_props)
            logger.info(f"Dataset logging enabled: {output_path}")

        self.ml_model = None
        self.stop_flag = engine_props.get("stop_flag")
        self.grooming_stats: dict[str, Any] = {}

        # Validate grooming configuration
        self._validate_grooming_config()

        # Initialize FailureManager (will be set up after topology is created)
        self.failure_manager: FailureManager | None = None

        # Log mode selection
        if self.use_orchestrator:
            logger.info("SimulationEngine initialized in ORCHESTRATOR mode (v5)")
        else:
            logger.debug("SimulationEngine initialized in LEGACY mode")

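    # Illustrative output layout (a minimal sketch with assumed values): with
    # network="NSFNet", date="0606", sim_start="12_00_00", thread_num="s1",
    # and erlang=250.0, the dataset logger above would write to:
    #
    #   data/training_data/NSFNet/0606/12_00_00/s1/dataset_erlang_250.jsonl
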
    def update_arrival_params(self, current_time: tuple[int, float]) -> None:
        """
        Update parameters for a request after attempted allocation.

        :param current_time: The current simulated time as (iteration, time) tuple
        :type current_time: tuple[int, float]
        """
        if self.reqs_dict is None or current_time not in self.reqs_dict:
            return

        sdn_props = self.sdn_obj.sdn_props
        self.stats_obj.iter_update(
            req_data=self.reqs_dict[current_time],
            sdn_data=sdn_props,
            network_spectrum_dict=self.network_spectrum_dict,
        )

        # Track grooming outcomes if enabled
        if self.engine_props.get("is_grooming_enabled", False):
            if not self.grooming_stats:
                self.grooming_stats = {
                    "fully_groomed": 0,
                    "partially_groomed": 0,
                    "not_groomed": 0,
                    "lightpaths_created": 0,
                    "lightpaths_released": 0,
                    "avg_lightpath_utilization": [],
                }

            was_groomed = hasattr(sdn_props, "was_groomed") and sdn_props.was_groomed
            was_partially = hasattr(sdn_props, "was_partially_groomed") and sdn_props.was_partially_groomed

            if was_groomed:
                self.grooming_stats["fully_groomed"] += 1
            elif was_partially:
                self.grooming_stats["partially_groomed"] += 1
            else:
                self.grooming_stats["not_groomed"] += 1

            # Track new lightpaths
            has_new_lp = hasattr(sdn_props, "was_new_lp_established")
            if has_new_lp and sdn_props.was_new_lp_established:
                self.grooming_stats["lightpaths_created"] += len(sdn_props.was_new_lp_established)

        if sdn_props.was_routed:
            if sdn_props.number_of_transponders is not None:
                self.stats_obj.current_transponders = sdn_props.number_of_transponders
            self.reqs_status_dict.update(
                {
                    self.reqs_dict[current_time]["req_id"]: {
                        "mod_format": sdn_props.modulation_list,
                        "path": sdn_props.path_list,
                        "is_sliced": sdn_props.is_sliced,
                        "was_routed": sdn_props.was_routed,
                        "core_list": sdn_props.core_list,
                        "band": sdn_props.band_list,
                        "start_slot_list": sdn_props.start_slot_list,
                        "end_slot_list": sdn_props.end_slot_list,
                        "bandwidth_list": sdn_props.bandwidth_list,
                        "snr_cost": sdn_props.snr_list if self.engine_props.get("snr_type") != "None" else None,
                        "xt_cost": sdn_props.xt_list if self.engine_props.get("snr_type") != "None" else None,
                        "lightpath_id_list": sdn_props.lightpath_id_list,
                        "lightpath_bandwidth_list": sdn_props.lightpath_bandwidth_list,
                        "was_new_lp_established": sdn_props.was_new_lp_established,
                    }
                }
            )

    def handle_arrival(
        self,
        current_time: tuple[int, float],
        force_route_matrix: list[Any] | None = None,
        force_core: int | None = None,
        force_slicing: bool = False,
        forced_index: int | None = None,
        force_mod_format: str | None = None,
    ) -> None:
        """
        Update the SDN controller to handle an arrival request.

        In orchestrator mode, delegates to _handle_arrival_orchestrator.
        In legacy mode, uses SDNController.

        :param current_time: The arrival time of the request as (iteration, time) tuple
        :type current_time: tuple[int, float]
        :param force_route_matrix: Passes forced routes to the SDN controller
        :type force_route_matrix: list[Any] | None
        :param force_core: Force a certain core for allocation
        :type force_core: int | None
        :param force_slicing: Forces slicing in the SDN controller
        :type force_slicing: bool
        :param forced_index: Forces an index in the SDN controller
        :type forced_index: int | None
        :param force_mod_format: Forces a modulation format
        :type force_mod_format: str | None
        """
        if self.reqs_dict is None or current_time not in self.reqs_dict:
            return

        if self.use_orchestrator:
            self._handle_arrival_orchestrator(current_time, force_route_matrix)
            return

        # =====================================================================
        # Legacy Path
        # =====================================================================
        for request_key, request_value in self.reqs_dict[current_time].items():
            if request_key == "mod_formats":
                request_key = "modulation_formats_dict"
            elif request_key == "req_id":
                request_key = "request_id"
            self.sdn_obj.sdn_props.update_params(
                key=request_key,
                spectrum_key=None,
                spectrum_obj=None,
                value=request_value,
            )

        self.sdn_obj.handle_event(
            self.reqs_dict[current_time],
            request_type="arrival",
            force_route_matrix=force_route_matrix,
            force_slicing=force_slicing,
            forced_index=forced_index,
            force_core=force_core,
            ml_model=self.ml_model,
            force_mod_format=force_mod_format,
        )

        if self.sdn_obj.sdn_props.network_spectrum_dict is not None:
            self.network_spectrum_dict = self.sdn_obj.sdn_props.network_spectrum_dict
        self.update_arrival_params(current_time=current_time)

        # Log dataset transition if enabled
        self._log_dataset_transition(current_time=current_time)

    def _handle_arrival_orchestrator(
        self,
        current_time: tuple[int, float],
        forced_path: list[str] | None = None,
    ) -> None:
        """
        Handle arrival using the v5 orchestrator path.

        Converts the legacy request dict to a v5 Request domain object, calls
        orchestrator.handle_arrival(), and updates stats from the result.

        :param current_time: Tuple of (request_id, time)
        :type current_time: tuple[int, float]
        :param forced_path: Optional forced path for allocation
        :type forced_path: list[str] | None
        """
        if self.reqs_dict is None or current_time not in self.reqs_dict:
            return
        if self._orchestrator is None or self._network_state is None:
            logger.error("Orchestrator not initialized, falling back to legacy")
            return

        # Get or create Request from legacy dict
        request = self._get_or_create_v5_request(current_time)
        if request is None:
            return

        # Call orchestrator
        result = self._orchestrator.handle_arrival(
            request,
            self._network_state,
            forced_path=forced_path,
        )

        # Update stats from result
        self._update_stats_from_result(current_time, request, result)

        # Sync network state back to legacy dict for compatibility
        if self._network_state is not None:
            self.network_spectrum_dict = self._network_state.network_spectrum_dict

    # TODO: Rename "v5" references to something clearer. The "v5" naming is a
    # legacy artifact from internal versioning and is confusing. Consider
    # renaming to "_get_or_create_request" and "_requests_cache" (drop the
    # "v5" prefix entirely).
    def _get_or_create_v5_request(self, current_time: tuple[int, float]) -> Request | None:
        """
        Get or create a Request domain object from the legacy request dict.

        :param current_time: Tuple of (request_id, time)
        :type current_time: tuple[int, float]
        :return: Request object or None if request not found
        :rtype: Request | None
        """
        from fusion.domain.request import Request

        if self.reqs_dict is None or current_time not in self.reqs_dict:
            return None

        # TODO: Rename _v5_requests to _requests_cache (see TODO above)
        if self._v5_requests is None:
            self._v5_requests = {}

        # Check cache first
        if current_time in self._v5_requests:
            return self._v5_requests[current_time]

        # Create new Request from legacy dict
        legacy_dict = self.reqs_dict[current_time]
        request = Request.from_legacy_dict(current_time, legacy_dict)

        # Cache it
        self._v5_requests[current_time] = request
        return request

    def _update_stats_from_result(
        self,
        current_time: tuple[int, float],
        request: Request,
        result: AllocationResult,
    ) -> None:
        """
        Update statistics from an orchestrator AllocationResult.

        Uses the record_arrival method for stats tracking, which handles
        SNR_RECHECK_FAIL support and rollback-aware utilization tracking.

        :param current_time: Request time key
        :type current_time: tuple[int, float]
        :param request: The v5 Request that was processed
        :type request: Request
        :param result: AllocationResult from orchestrator
        :type result: AllocationResult
        """
        if self.reqs_dict is None or current_time not in self.reqs_dict:
            return

        self.stats_obj.record_arrival(
            request=request,
            result=result,
            network_state=self._network_state,
            was_rollback=False,  # Will be set by orchestrator if rollback occurred
        )

        if result.success:
            # Track in reqs_status_dict for release handling
            self.reqs_status_dict[request.request_id] = {
                "mod_format": list(result.modulations) if result.modulations else [],
                "path": self._get_paths_from_result(result),
                "is_sliced": result.is_sliced,
                "was_routed": True,
                "core_list": list(result.cores) if result.cores else [],
                "band": list(result.bands) if result.bands else [],
                "start_slot_list": list(result.start_slots) if result.start_slots else [],
                "end_slot_list": list(result.end_slots) if result.end_slots else [],
                "bandwidth_list": list(result.bandwidth_allocations) if result.bandwidth_allocations else [request.bandwidth_gbps],
                "lightpath_id_list": list(result.all_lightpath_ids) if result.all_lightpath_ids else [],
                "lightpath_bandwidth_list": list(result.lightpath_bandwidths) if result.lightpath_bandwidths else [],
                "was_new_lp_established": list(result.lightpaths_created) if result.lightpaths_created else [],
            }

            # Track grooming if applicable
            if result.is_groomed or result.is_partially_groomed:
                if self.grooming_stats:
                    if result.is_groomed and not result.is_partially_groomed:
                        self.grooming_stats["fully_groomed"] += 1
                    elif result.is_partially_groomed:
                        self.grooming_stats["partially_groomed"] += 1
        else:
            # For blocked requests, grooming stats tracking
            if self.grooming_stats:
                self.grooming_stats["not_groomed"] += 1

    def _get_paths_from_result(self, result: AllocationResult) -> list[list[str]]:
        """
        Extract paths from AllocationResult lightpath IDs.

        :param result: The allocation result
        :type result: AllocationResult
        :return: List of paths (each path is a list of node IDs)
        :rtype: list[list[str]]
        """
        paths: list[list[str]] = []
        if not result.all_lightpath_ids or self._network_state is None:
            return paths
        for lp_id in result.all_lightpath_ids:
            lp = self._network_state.get_lightpath(lp_id)
            if lp and lp.path:
                paths.append(list(lp.path))
        return paths

    def handle_release(self, current_time: tuple[int, float]) -> None:
        """
        Update the SDN controller to handle the release of a request.

        In orchestrator mode (v5), delegates to _handle_release_orchestrator.
        In legacy mode, uses SDNController.

        :param current_time: The arrival time of the request as (iteration, time) tuple
        :type current_time: tuple[int, float]
        """
        if self.reqs_dict is None or current_time not in self.reqs_dict:
            return

        if self.use_orchestrator:
            self._handle_release_orchestrator(current_time)
            return

        # =====================================================================
        # Legacy Path
        # =====================================================================
        for request_key, request_value in self.reqs_dict[current_time].items():
            if request_key == "mod_formats":
                request_key = "modulation_formats_dict"
            elif request_key == "req_id":
                request_key = "request_id"
            self.sdn_obj.sdn_props.update_params(
                key=request_key,
                spectrum_key=None,
                spectrum_obj=None,
                value=request_value,
            )

        if (
            self.reqs_dict is not None
            and current_time in self.reqs_dict
            and self.reqs_dict[current_time]["req_id"] in self.reqs_status_dict
        ):
            # Restore request state from reqs_status_dict
            req_id = self.reqs_dict[current_time]["req_id"]
            req_status = self.reqs_status_dict[req_id]
            self.sdn_obj.sdn_props.path_list = req_status["path"]
            self.sdn_obj.sdn_props.lightpath_id_list = req_status.get("lightpath_id_list", [])
            self.sdn_obj.sdn_props.lightpath_bandwidth_list = req_status.get("lightpath_bandwidth_list", [])
            self.sdn_obj.sdn_props.was_new_lp_established = req_status.get("was_new_lp_established", [])

            # Initialize dict before handle_event so sdn_controller can populate it
            if self.sdn_obj.sdn_props.lp_bw_utilization_dict is None:
                self.sdn_obj.sdn_props.lp_bw_utilization_dict = {}

            self.sdn_obj.handle_event(self.reqs_dict[current_time], request_type="release")

            # Update lightpath bandwidth utilization statistics.
            # For non-grooming cases, populate utilization from reqs_status_dict if not already done.
            if not self.engine_props.get("is_grooming_enabled", False):
                # Check if sdn_controller already populated utilization (e.g., via dynamic slicing)
                if self.sdn_obj.sdn_props.lp_bw_utilization_dict is None or len(self.sdn_obj.sdn_props.lp_bw_utilization_dict) == 0:
                    # Only use 100% fallback if dict is empty (not already calculated)
                    lightpath_ids = req_status.get("lightpath_id_list", [])
                    bandwidth_list = req_status.get("bandwidth_list", [])
                    core_list = req_status.get("core_list", [])
                    band_list = req_status.get("band", [])

                    # Fallback: use request bandwidth if bandwidth_list is not available
                    if not bandwidth_list or all(bw is None for bw in bandwidth_list):
                        req_bandwidth = self.reqs_dict[current_time].get("bandwidth")
                        bandwidth_list = [req_bandwidth] * len(lightpath_ids)

                    self.sdn_obj.sdn_props.lp_bw_utilization_dict = {}
                    # For non-grooming non-slicing, each lightpath is 100% utilized
                    for i, lp_id in enumerate(lightpath_ids):
                        if i < len(bandwidth_list) and i < len(core_list) and i < len(band_list):
                            self.sdn_obj.sdn_props.lp_bw_utilization_dict[lp_id] = {
                                "band": band_list[i],
                                "core": core_list[i],
                                "bit_rate": bandwidth_list[i],
                                "utilization": 100.0,  # 100% for non-grooming non-slicing
                            }

            if self.sdn_obj.sdn_props.lp_bw_utilization_dict is not None and len(self.sdn_obj.sdn_props.lp_bw_utilization_dict) > 0:
                self.stats_obj.update_utilization_dict(self.sdn_obj.sdn_props.lp_bw_utilization_dict)
                # Clear the dict after aggregation to prevent duplicate counting
                self.sdn_obj.sdn_props.lp_bw_utilization_dict = {}

            sdn_spectrum_dict = self.sdn_obj.sdn_props.network_spectrum_dict
            if sdn_spectrum_dict is not None:
                self.network_spectrum_dict = sdn_spectrum_dict
        # Request was blocked, nothing to release

    def _handle_release_orchestrator(
        self,
        current_time: tuple[int, float],
    ) -> None:
        """
        Handle release using the orchestrator path.

        Retrieves the Request from cache and calls orchestrator.handle_release().

        :param current_time: Tuple of (request_id, time)
        :type current_time: tuple[int, float]
        """
        if self.reqs_dict is None or current_time not in self.reqs_dict:
            return
        if self._orchestrator is None or self._network_state is None:
            logger.error("Orchestrator not initialized for release")
            return

        # Get the arrival time key for this request
        req_id = self.reqs_dict[current_time]["req_id"]

        # Check if request was routed (exists in reqs_status_dict)
        if req_id not in self.reqs_status_dict:
            # Request was blocked, nothing to release
            return

        # Find the corresponding Request object in cache
        request = None
        if self._v5_requests:
            for _time_key, cached_req in self._v5_requests.items():
                if cached_req.request_id == req_id:
                    request = cached_req
                    break
        if request is None:
            logger.warning("Request %d not found in v5 cache for release", req_id)
            return

        # Collect utilization stats before release (need LP objects to exist)
        req_status = self.reqs_status_dict[req_id]
        lightpath_ids = req_status.get("lightpath_id_list", [])

        # Get departure time for time-weighted average calculation
        departure_time = self.reqs_dict[current_time].get("depart", current_time[1])

        # Import average_bandwidth_usage for time-weighted calculation
        from fusion.utils.network import average_bandwidth_usage

        # Collect utilization data BEFORE release (LP objects still exist)
        pre_release_util: dict[int, dict[str, Any]] = {}
        for lp_id in lightpath_ids:
            lp = self._network_state.get_lightpath(lp_id)
            if lp:
                # Calculate time-weighted average utilization using history.
                # Don't add an entry at departure_time here - release_bandwidth() handles that.
                if lp.time_bw_usage:
                    utilization = average_bandwidth_usage(lp.time_bw_usage, departure_time)
                else:
                    # Fallback to point-in-time if no history
                    utilization = lp.utilization * 100.0
                pre_release_util[lp_id] = {
                    "band": lp.band,
                    "core": lp.core,
                    "bit_rate": lp.total_bandwidth_gbps,
                    "utilization": utilization,
                }

        # Call orchestrator release
        self._orchestrator.handle_release(request, self._network_state)

        # Only record utilization for LPs that are fully released (match legacy behavior).
        # Legacy only records utilization when an LP has no remaining users.
        utilization_dict: dict[int, dict[str, Any]] = {}
        for lp_id in lightpath_ids:
            lp_after = self._network_state.get_lightpath(lp_id)
            is_fully_released = lp_after is None
            if is_fully_released and lp_id in pre_release_util:
                utilization_dict[lp_id] = pre_release_util[lp_id]

        # Update utilization stats
        if utilization_dict:
            self.stats_obj.update_utilization_dict(utilization_dict)

        # Sync network state back to legacy dict for compatibility
        self.network_spectrum_dict = self._network_state.network_spectrum_dict

        # Clean up from reqs_status_dict
        del self.reqs_status_dict[req_id]

    def _log_dataset_transition(self, current_time: tuple[int, float]) -> None:
        """
        Log a dataset transition for offline RL training.

        :param current_time: The arrival time as (iteration, time) tuple
        :type current_time: tuple[int, float]
        """
        if not self.dataset_logger or self.reqs_dict is None:
            return
        if current_time not in self.reqs_dict:
            return

        request = self.reqs_dict[current_time]
        sdn_props = self.sdn_obj.sdn_props
        route_props = self.sdn_obj.route_obj.route_props

        # Extract k-paths from routing
        k_paths = route_props.paths_matrix if route_props.paths_matrix else []

        # Build state dict with available information
        state = {
            "src": request.get("source", sdn_props.source),
            "dst": request.get("destination", sdn_props.destination),
            "slots_needed": request.get("slots_needed", 0),
            "bandwidth": request.get("bandwidth", 0),
            "k_paths": k_paths,
            "num_paths": len(k_paths),
        }

        # Determine selected path index and action mask.
        # Simple approach: mark all paths as feasible initially.
        action_mask = [True] * len(k_paths) if k_paths else [False]

        if sdn_props.was_routed and sdn_props.path_list:
            # Find which path was selected by matching path_list
            selected_path_index = -1
            for idx, path in enumerate(k_paths):
                if path == sdn_props.path_list:
                    selected_path_index = idx
                    break
            action = selected_path_index
        else:
            # Request was blocked
            action = -1
            # Mark all paths as infeasible since routing failed
            action_mask = [False] * len(action_mask)

        # TODO: Reward values are hardcoded. Should be configurable via config or
        # computed based on metrics (e.g., bandwidth efficiency, path length, SNR margin).
        reward = 1.0 if sdn_props.was_routed else -1.0

        # Build metadata
        meta = {
            "request_id": request.get("req_id", -1),
            "arrival_time": current_time,
            "erlang": self.engine_props["erlang"],
            "iteration": self.iteration,
            "decision_time_ms": (sdn_props.route_time * 1000) if hasattr(sdn_props, "route_time") and sdn_props.route_time else 0.0,
        }

        # Log the transition
        self.dataset_logger.log_transition(
            state=state,
            action=action,
            reward=reward,
            next_state=None,
            action_mask=action_mask,
            meta=meta,
        )

    def create_topology(self) -> None:
        """
        Create the physical topology of the simulation.

        In orchestrator mode, also initializes:
        - SimulationConfig from engine_props
        - NetworkState with topology
        - SDNOrchestrator with pipelines
        """
        self.network_spectrum_dict = {}
        self.topology.add_nodes_from(self.engine_props["topology_info"]["nodes"])

        self.engine_props["band_list"] = []
        for band in ["c", "l", "s", "o", "e"]:
            try:
                if self.engine_props[f"{band}_band"]:
                    self.engine_props["band_list"].append(band)
            except KeyError:
                continue

        for link_num, link_data in self.engine_props["topology_info"]["links"].items():
            source = link_data["source"]
            dest = link_data["destination"]

            cores_matrix: dict[str, np.ndarray] = {}
            for band in self.engine_props["band_list"]:
                band_slots = self.engine_props[f"{band}_band"]
                cores_matrix[band] = np.zeros((link_data["fiber"]["num_cores"], band_slots))

            self.network_spectrum_dict[(source, dest)] = {
                "cores_matrix": cores_matrix,
                "link_num": int(link_num),
                "usage_count": 0,
                "throughput": 0,
            }
            self.network_spectrum_dict[(dest, source)] = {
                "cores_matrix": cores_matrix,
                "link_num": int(link_num),
                "usage_count": 0,
                "throughput": 0,
            }
            self.topology.add_edge(source, dest, length=link_data["length"], nli_cost=None)

        self.engine_props["topology"] = self.topology
        self.stats_obj.topology = self.topology
        self.sdn_obj.sdn_props.network_spectrum_dict = self.network_spectrum_dict
        self.sdn_obj.sdn_props.topology = self.topology

        if self.use_orchestrator:
            self._init_orchestrator_path()

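    # Illustrative spectrum layout (a minimal sketch, assuming an engine with
    # c_band=320 slots, 7-core fiber, and string node labels "1" and "2"):
    # each enabled band gets a zeroed (num_cores x band_slots) occupancy
    # matrix per link, and both directions of a link share the same matrices:
    #
    #   >>> engine.network_spectrum_dict[("1", "2")]["cores_matrix"]["c"].shape
    #   (7, 320)
    #   >>> fwd = engine.network_spectrum_dict[("1", "2")]["cores_matrix"]
    #   >>> rev = engine.network_spectrum_dict[("2", "1")]["cores_matrix"]
    #   >>> fwd is rev
    #   True
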
    def _init_orchestrator_path(self) -> None:
        """
        Initialize orchestrator path components.

        Creates:
        - SimulationConfig from engine_props
        - NetworkState with the current topology
        - PipelineSet via PipelineFactory
        - SDNOrchestrator with pipelines

        Called from create_topology() when use_orchestrator=True.
        """
        from fusion.core.pipeline_factory import PipelineFactory
        from fusion.domain.config import SimulationConfig
        from fusion.domain.network_state import NetworkState

        # Create SimulationConfig from engine_props
        self._sim_config = SimulationConfig.from_engine_props(self.engine_props)
        logger.debug(
            "Created SimulationConfig: network=%s, k_paths=%d, grooming=%s",
            self._sim_config.network_name,
            self._sim_config.k_paths,
            self._sim_config.grooming_enabled,
        )

        # Create NetworkState with topology
        self._network_state = NetworkState(self.topology, self._sim_config)
        logger.debug(
            "Created NetworkState: nodes=%d, links=%d",
            self._network_state.node_count,
            self._network_state.link_count,
        )

        # Create the orchestrator with pipelines via the factory
        self._orchestrator = PipelineFactory.create_orchestrator(self._sim_config)
        logger.info(
            "Orchestrator path initialized: %s",
            type(self._orchestrator).__name__,
        )

    def generate_requests(self, seed: int) -> None:
        """
        Call the request generator to generate requests.

        :param seed: The seed to use for the random generation
        :type seed: int
        """
        self.reqs_dict = get_requests(seed=seed, engine_props=self.engine_props)
        # Sort by time (second element of tuple key) to match v5 behavior
        self.reqs_dict = dict(sorted(self.reqs_dict.items(), key=lambda x: x[0][1]))

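    # Illustrative ordering (a minimal sketch): keys are (request_id, time)
    # tuples, and the re-sort above orders events chronologically even when
    # the IDs are not in time order.
    #
    #   >>> reqs = {(2, 0.5): "arrival", (1, 1.0): "release"}
    #   >>> list(dict(sorted(reqs.items(), key=lambda x: x[0][1])))
    #   [(2, 0.5), (1, 1.0)]
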
    def handle_request(self, current_time: tuple[int, float], request_number: int) -> None:
        """
        Carry out arrival or departure functions for a given request.

        :param current_time: The current simulated time as (iteration, time) tuple
        :type current_time: tuple[int, float]
        :param request_number: The request number
        :type request_number: int
        """
        if self.reqs_dict is None or current_time not in self.reqs_dict:
            return

        request_type = self.reqs_dict[current_time]["request_type"]
        if request_type == "arrival":
            old_network_spectrum_dict = copy.deepcopy(self.network_spectrum_dict)
            old_request_info_dict = copy.deepcopy(self.reqs_dict[current_time])
            self.handle_arrival(current_time=current_time)

            if self.engine_props["save_snapshots"] and request_number % self.engine_props["snapshot_step"] == 0:
                self.stats_obj.update_snapshot(
                    network_spectrum_dict=self.network_spectrum_dict,
                    request_number=request_number,
                )

            if self.engine_props["output_train_data"]:
                was_routed = self.sdn_obj.sdn_props.was_routed
                if was_routed:
                    if self.reqs_dict is not None and current_time in self.reqs_dict:
                        request_info_dict = self.reqs_status_dict[self.reqs_dict[current_time]["req_id"]]
                        if self.ml_metrics:
                            self.ml_metrics.update_train_data(
                                old_request_info_dict=old_request_info_dict,
                                request_info_dict=request_info_dict,
                                network_spectrum_dict=old_network_spectrum_dict,
                                current_transponders=self.stats_obj.current_transponders,
                            )
        elif request_type == "release":
            self.handle_release(current_time=current_time)
        else:
            raise NotImplementedError(f"Request type unrecognized. Expected arrival or release, got: {request_type}")

    def reset(self) -> None:
        """
        Reset simulation state for a new iteration.

        Clears all tracking dictionaries and counters to prepare for a fresh
        simulation run. In orchestrator mode, also resets NetworkState.
        """
        # Reset network spectrum
        for link_key in self.network_spectrum_dict:
            self.network_spectrum_dict[link_key]["usage_count"] = 0
            self.network_spectrum_dict[link_key]["throughput"] = 0

        # Reset request tracking
        self.reqs_status_dict = {}

        if self.use_orchestrator:
            # Clear v5 request cache
            self._v5_requests = {}

            # Re-create NetworkState with fresh spectrum (if topology exists)
            if self._sim_config is not None and self.topology.number_of_nodes() > 0:
                from fusion.domain.network_state import NetworkState

                self._network_state = NetworkState(self.topology, self._sim_config)
                logger.debug(
                    "Reset NetworkState for new iteration: nodes=%d, links=%d",
                    self._network_state.node_count,
                    self._network_state.link_count,
                )

        # =====================================================================
        # Legacy Path Reset
        # =====================================================================
        # Initialize lightpath tracking on first iteration only (must persist across iterations)
        if self.sdn_obj.sdn_props.lightpath_status_dict is None:
            self.sdn_obj.sdn_props.lightpath_status_dict = {}
        if self.sdn_obj.sdn_props.lp_bw_utilization_dict is None:
            self.sdn_obj.sdn_props.lp_bw_utilization_dict = {}

        # Reset the lightpath ID counter each iteration
        self.sdn_obj.sdn_props.reset_lightpath_id_counter()

        # Reset grooming structures if enabled
        if self.engine_props.get("is_grooming_enabled", False):
            if hasattr(self.sdn_obj, "grooming_obj"):
                self.sdn_obj.grooming_obj.grooming_props.lightpath_status_dict = {}
            logger.debug("Reset grooming structures for new iteration")

    def end_iter(self, iteration: int, print_flag: bool = True, base_file_path: str | None = None) -> bool:
        """
        Update iteration statistics.

        :param iteration: The current iteration
        :type iteration: int
        :param print_flag: Whether to print or not
        :type print_flag: bool
        :param base_file_path: The base file path to save output statistics
        :type base_file_path: str | None
        :return: Whether the confidence interval has been reached
        :rtype: bool
        """
        self.stats_obj.calculate_blocking_statistics()
        self.stats_obj.finalize_iteration_statistics()

        # Collect grooming statistics if enabled
        if self.engine_props.get("is_grooming_enabled", False):
            self._collect_grooming_stats()

        # If some form of ML/RL is being used, ignore confidence intervals
        # for training and testing
        if not self.engine_props["is_training"]:
            resp = bool(self.stats_obj.calculate_confidence_interval())
        else:
            resp = False

        if (iteration + 1) % self.engine_props["print_step"] == 0 or iteration == 0 or (iteration + 1) == self.engine_props["max_iters"]:
            # Use the reporter for output instead of the metrics class
            if hasattr(self, "reporter"):
                self.reporter.report_iteration_stats(
                    iteration=iteration,
                    max_iterations=self.engine_props["max_iters"],
                    erlang=self.engine_props["erlang"],
                    blocking_list=self.stats_obj.stats_props.simulation_blocking_list,
                    print_flag=print_flag,
                )

        # Always save on the first and last iteration, plus every save_step
        is_first_iter = iteration == 0
        is_last_iter = (iteration + 1) == self.engine_props["max_iters"]
        is_save_step = (iteration + 1) % self.engine_props["save_step"] == 0
        if is_first_iter or is_last_iter or is_save_step:
            self._save_all_stats(base_file_path or "data")

        return resp

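    # Illustrative save cadence (a minimal sketch, assuming max_iters=10 and
    # save_step=4): stats are written after iteration 0 (first), iterations
    # 3 and 7 (since (iteration + 1) % 4 == 0), and iteration 9 (last).
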
    def init_iter(
        self,
        iteration: int,
        seed: int | None = None,
        print_flag: bool = True,
        trial: int | None = None,
    ) -> None:
        """
        Initialize an iteration.

        Seeds all random number generators (Python random, NumPy, PyTorch)
        to ensure reproducible results across iterations.

        :param iteration: The current iteration number
        :type iteration: int
        :param seed: The seed to use for the random generation
        :type seed: int | None
        :param print_flag: Flag to determine printing iteration info
        :type print_flag: bool
        :param trial: The trial number
        :type trial: int | None
        """
        if trial is not None:
            self.engine_props["thread_num"] = f"s{trial + 1}"

        self.iteration = iteration
        self.engine_props["current_iteration"] = iteration

        # Reset state for new iteration (matches v5 pattern)
        self.reset()

        # Set iteration on orchestrator for debug purposes
        if self.use_orchestrator and self._orchestrator is not None:
            self._orchestrator.current_iteration = iteration

        self.stats_obj.iteration = iteration
        self.stats_obj.init_iter_stats()

        for link_key in self.network_spectrum_dict:
            self.network_spectrum_dict[link_key]["usage_count"] = 0
            self.network_spectrum_dict[link_key]["throughput"] = 0

        # Initialize transponder usage per node if enabled
        if self.engine_props.get("transponder_usage_per_node", False):
            self._init_transponder_usage()

        # To prevent incomplete saves
        try:
            signal.signal(signal.SIGINT, self._signal_save_handler)
            signal.signal(signal.SIGTERM, self._signal_save_handler)
        except ValueError:
            # Signals can only be registered in the main thread
            pass

        if iteration == 0 and print_flag:
            logger.info(
                "Simulation started for Erlang: %s simulation number: %s",
                self.engine_props["erlang"],
                self.engine_props["thread_num"],
            )

        if self.engine_props["deploy_model"]:
            self.ml_model = load_model(engine_properties=self.engine_props)

        # Request generation seed (typically varies per iteration for diverse traffic)
        if seed is not None:
            # Explicit seed parameter overrides everything
            request_seed = seed
        elif self.engine_props.get("request_seeds"):
            # Use explicit request_seeds list (one seed per iteration)
            request_seed = self.engine_props["request_seeds"][iteration]
            logger.info(
                "Using request_seed=%d from request_seeds list (iteration=%d)",
                request_seed,
                iteration,
            )
        elif self.engine_props.get("seeds"):
            # Backwards compatibility: use seeds list (deprecated, use request_seeds)
            request_seed = self.engine_props["seeds"][iteration]
            logger.info(
                "Using request_seed=%d from seeds list (iteration=%d)",
                request_seed,
                iteration,
            )
        else:
            # Default: iteration + 1 (varies per iteration)
            request_seed = iteration + 1
            logger.debug("Using default request_seed=%d (iteration+1)", request_seed)

        # RL component seed (constant across iterations for deterministic training)
        if self.engine_props.get("rl_seed") is not None:
            # Use explicit RL seed (constant across iterations)
            rl_seed = self.engine_props["rl_seed"]
            logger.info("Using constant rl_seed=%d for RL components", rl_seed)
            seed_rl_components(rl_seed)
        elif self.engine_props.get("seed") is not None:
            # Use general seed as constant RL seed
            rl_seed = self.engine_props["seed"]
            logger.info("Using constant rl_seed=%d from general seed for RL components", rl_seed)
            seed_rl_components(rl_seed)
        else:
            # No RL seed specified - use same as request seed (varies per iteration)
            logger.debug(
                "No rl_seed specified, using request_seed=%d for RL (varies per iteration)",
                request_seed,
            )
            seed_rl_components(request_seed)

        # Seed request generation (varies per iteration by default)
        seed_request_generation(request_seed)
        self.generate_requests(request_seed)

        # Schedule failures AFTER requests are generated (in every iteration)
        if self.failure_manager:
            # Clear any previous failures before scheduling new ones
            self.failure_manager.clear_all_failures()
            self._schedule_failure()

    def _initialize_failure_manager(self) -> None:
        """
        Initialize the FailureManager and schedule failures if configured.

        This method is called after topology creation to set up failure
        injection based on the failure_settings configuration. The actual
        failure is scheduled after request generation in init_iter() to use
        real Poisson arrival times rather than indices.
        """
        failure_type = self.engine_props.get("failure_type", "none")
        if failure_type == "none":
            logger.info("No failures configured for this simulation")
            return

        # Debug: Log topology info
        logger.info(f"Topology has {self.topology.number_of_nodes()} nodes and {self.topology.number_of_edges()} edges")
        logger.debug(f"Topology nodes: {sorted(self.topology.nodes())}")
        logger.debug(f"Topology edges (first 10): {list(self.topology.edges())[:10]}")

        # Create FailureManager with topology
        self.failure_manager = FailureManager(self.engine_props, self.topology)
        logger.info(f"FailureManager initialized for failure type: {failure_type}")

        # Pass FailureManager to SDNController for path feasibility checking
        self.sdn_obj.failure_manager = self.failure_manager

        # Note: Failure will be scheduled in init_iter() after requests are generated

    def _schedule_failure(self) -> None:
        """
        Schedule a failure event based on configuration.

        Reads failure settings from engine_props and injects the appropriate
        failure type at the configured time. Uses actual Poisson arrival
        times from the generated requests.
        """
        if not self.failure_manager or not self.reqs_dict:
            return

        failure_type = self.engine_props.get("failure_type", "none")

        # Get actual arrival times from generated requests.
        # Keys are (request_id, time) tuples, so extract the float time component.
        arrival_times = sorted(time_val for (_, time_val), req in self.reqs_dict.items() if req.get("request_type") == "arrival")
        if not arrival_times:
            logger.warning("No arrival times available to schedule failure")
            return

        # Determine failure time based on arrival index
        t_fail_arrival_index = self.engine_props.get("t_fail_arrival_index", -1)

        # If -1, inject at the midpoint
        if t_fail_arrival_index == -1:
            t_fail_arrival_index = len(arrival_times) // 2

        # Clamp to valid range
        t_fail_arrival_index = max(0, min(t_fail_arrival_index, len(arrival_times) - 1))

        # Get actual failure time
        t_fail = arrival_times[t_fail_arrival_index]

        # Determine repair time
        t_repair_after_arrivals = self.engine_props.get("t_repair_after_arrivals", 2)
        t_repair_arrival_index = t_fail_arrival_index + t_repair_after_arrivals

        # Validate that the repair index is within bounds
        if t_repair_arrival_index >= len(arrival_times):
            logger.error(
                f"Invalid failure configuration: repair would occur at arrival "
                f"index {t_repair_arrival_index}, but only {len(arrival_times)} "
                f"requests exist. Increase num_requests or reduce "
                f"t_fail_arrival_index/t_repair_after_arrivals."
            )
            raise ValueError(
                f"Repair index {t_repair_arrival_index} exceeds number of "
                f"arrivals ({len(arrival_times)}). Need at least "
                f"{t_repair_arrival_index + 1} requests."
            )
        t_repair = arrival_times[t_repair_arrival_index]

        logger.info(
            f"Scheduling {failure_type} failure at arrival index "
            f"{t_fail_arrival_index} (t={t_fail:.4f}), repair at index "
            f"{t_repair_arrival_index} (t={t_repair:.4f})"
        )

        # Inject failure based on type
        try:
            if failure_type == "link":
                # Convert link node IDs to match the topology node type.
                # Topology nodes might be strings, so we need to match that type.
                failed_src = self.engine_props["failed_link_src"]
                failed_dst = self.engine_props["failed_link_dst"]
                # Try to match the type of nodes in the topology
                if self.topology.number_of_nodes() > 0:
                    sample_node = next(iter(self.topology.nodes()))
                    if isinstance(sample_node, str):
                        failed_src = str(failed_src)
                        failed_dst = str(failed_dst)
                event = self.failure_manager.inject_failure(
                    "link",
                    t_fail=t_fail,
                    t_repair=t_repair,
                    link_id=(failed_src, failed_dst),
                )
                logger.info(f"Link failure scheduled: {event['failed_links']} from t={t_fail:.2f} to t={t_repair:.2f}")
            elif failure_type == "srlg":
                srlg_links = self.engine_props.get("srlg_links", [])
                event = self.failure_manager.inject_failure("srlg", t_fail=t_fail, t_repair=t_repair, srlg_links=srlg_links)
                logger.info(f"SRLG failure scheduled: {len(event['failed_links'])} links from t={t_fail:.2f} to t={t_repair:.2f}")
            elif failure_type == "geo":
                # Convert the center node ID to match the topology node type
                center_node = self.engine_props["geo_center_node"]
                if self.topology.number_of_nodes() > 0:
                    sample_node = next(iter(self.topology.nodes()))
                    if isinstance(sample_node, str):
                        center_node = str(center_node)
                event = self.failure_manager.inject_failure(
                    "geo",
                    t_fail=t_fail,
                    t_repair=t_repair,
                    center_node=center_node,
                    hop_radius=self.engine_props["geo_hop_radius"],
                )
                logger.info(f"Geographic failure scheduled: {len(event['failed_links'])} links from t={t_fail:.2f} to t={t_repair:.2f}")
            elif failure_type == "node":
                # Convert the node ID to match the topology node type
                node_id = self.engine_props.get("failed_node_id")
                if node_id is not None and self.topology.number_of_nodes() > 0:
                    sample_node = next(iter(self.topology.nodes()))
                    if isinstance(sample_node, str):
                        node_id = str(node_id)
                event = self.failure_manager.inject_failure(
                    "node",
                    t_fail=t_fail,
                    t_repair=t_repair,
                    node_id=node_id,
                )
                logger.info(f"Node failure scheduled: {len(event['failed_links'])} links from t={t_fail:.2f} to t={t_repair:.2f}")
        except Exception as e:
            logger.error(f"Failed to schedule {failure_type} failure: {e}")
            raise

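    # Illustrative index math (a minimal sketch, assuming 100 arrivals with
    # t_fail_arrival_index=-1 and t_repair_after_arrivals=2): the failure
    # index resolves to the midpoint (50), the repair to index 52, and both
    # are then mapped onto the actual Poisson arrival times for injection.
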
    def run(self, seed: int | None = None) -> int:
        """
        Run the simulation by creating the topology and processing requests.

        This method creates the topology, processes requests, and sends
        iteration-based updates to the parent's queue.

        :param seed: Optional seed for random generation
        :type seed: int | None
        :return: Number of completed iteration units
        :rtype: int
        """
        simulation_context = self._setup_simulation_context()
        self._log_simulation_start(simulation_context)

        # Iterations are 0-indexed internally (0, 1, 2, ..., max_iters - 1);
        # max_iters specifies the total count
        # (e.g., max_iters=2 runs iterations 0 and 1).
        for iteration in range(self.engine_props["max_iters"]):
            if self._should_stop_simulation(simulation_context):
                break

            simulation_context["done_units"] = self._run_single_iteration(iteration, seed, simulation_context)

            if simulation_context["end_iter"]:
                break

        self._log_simulation_complete(simulation_context)
        return int(simulation_context["done_units"])

def _setup_simulation_context(self) -> dict[str, Any]: """ Initialize simulation context with necessary parameters. :return: Dictionary containing simulation context parameters :rtype: Dict[str, Any] """ self.create_topology() # Initialize FailureManager if failures are configured self._initialize_failure_manager() return { "log_queue": self.engine_props.get("log_queue"), "max_iters": self.engine_props["max_iters"], "progress_queue": self.engine_props.get("progress_queue"), "thread_num": self.engine_props.get("thread_num", "unknown"), "my_iteration_units": self.engine_props.get("my_iteration_units", self.engine_props["max_iters"]), "done_offset": self.engine_props.get("done_offset", 0), "done_units": self.engine_props.get("done_offset", 0), "end_iter": False, } def _log_simulation_start(self, context: dict[str, Any]) -> None: """ Log simulation start message. :param context: Simulation context dictionary :type context: Dict[str, Any] """ log_message( message=( f"[Engine] thread={context['thread_num']}, " f"offset={context['done_offset']}, " f"my_iteration_units={context['my_iteration_units']}, " f"erlang={self.engine_props['erlang']}\n" ), log_queue=context["log_queue"], ) def _should_stop_simulation(self, context: dict[str, Any]) -> bool: """ Check if simulation should be stopped. :param context: Simulation context dictionary :type context: Dict[str, Any] :return: True if simulation should stop :rtype: bool """ if self.stop_flag is not None and self.stop_flag.is_set(): log_message( message=(f"Simulation stopped for Erlang: {self.engine_props['erlang']} simulation number: {context['thread_num']}.\n"), log_queue=context["log_queue"], ) return True return False def _run_single_iteration(self, iteration: int, seed: int | None, context: dict[str, Any]) -> int: """ Execute a single simulation iteration. :param iteration: Current iteration number :type iteration: int :param seed: Random seed for request generation :type seed: Optional[int] :param context: Simulation context dictionary :type context: Dict[str, Any] :return: Updated done_units count :rtype: int """ self.init_iter(iteration=iteration, seed=seed) self._process_all_requests() context["end_iter"] = self.end_iter(iteration=iteration) context["done_units"] += 1 self._update_progress(iteration, context) time.sleep(0.2) return int(context["done_units"]) def _find_affected_requests(self, failed_links: list[tuple[Any, Any]]) -> list[dict[str, Any]]: """ Find all allocated requests affected by failed links. A request is affected if any link in its current active path (primary or backup) matches a failed link. 
:param failed_links: List of failed link tuples :type failed_links: list[tuple[Any, Any]] :return: List of affected request info dictionaries :rtype: list[dict[str, Any]] """ affected = [] failed_links_set = set(failed_links) # Also include reverse direction since links are bidirectional for link in failed_links: failed_links_set.add((link[1], link[0])) for _request_id, request_info in self.sdn_obj.sdn_props.allocated_requests.items(): # Determine which path to check based on active_path active_path_key = "primary_path" if request_info.get("active_path") == "primary" else "backup_path" active_path = request_info.get(active_path_key) if active_path is None: continue # Check if any link in the active path is failed for i in range(len(active_path) - 1): link = (active_path[i], active_path[i + 1]) if link in failed_links_set: affected.append(request_info) break return affected def _is_path_feasible(self, path: list[int] | None) -> bool: """ Check if a path is feasible given current failures. :param path: Path to check as list of node IDs :type path: list[int] | None :return: True if path is feasible, False otherwise :rtype: bool """ if path is None: return False if not self.failure_manager: return True return self.failure_manager.is_path_feasible(path) def _handle_failure_impact(self, current_time: tuple[int, float], failed_links: list[tuple[Any, Any]]) -> None: """ Handle impact of failures on already-allocated requests. For each affected request: - If protected and backup path is viable: switch to backup - Otherwise: release spectrum and count as blocked :param current_time: Current simulation time as (iteration, time) tuple :type current_time: float :param failed_links: List of newly failed links :type failed_links: list[tuple[Any, Any]] """ affected_requests = self._find_affected_requests(failed_links) if not affected_requests: logger.debug("No allocated requests affected by failures") return logger.info(f"Found {len(affected_requests)} allocated request(s) affected by failures at t={current_time:.2f}") switchover_count = 0 dropped_count = 0 for request_info in affected_requests: request_id = request_info["request_id"] # Check if this is a protected request with viable backup if request_info.get("is_protected") and request_info.get("active_path") == "primary": backup_path = request_info.get("backup_path") if self._is_path_feasible(backup_path): # Backup path is viable - switch to it self._switch_to_backup(request_info, current_time) switchover_count += 1 logger.info(f"Request {request_id}: Switched to backup path {backup_path} at t={current_time:.2f}") else: # Backup path also failed - release and count as blocked self._release_failed_request(request_info, current_time) dropped_count += 1 logger.warning(f"Request {request_id}: Both primary and backup paths failed, releasing at t={current_time:.2f}") else: # Unprotected request or already on backup - release self._release_failed_request(request_info, current_time) dropped_count += 1 logger.warning(f"Request {request_id}: Unprotected request failed, releasing at t={current_time:.2f}") logger.info(f"Failure impact: {switchover_count} switchovers, {dropped_count} dropped requests") def _switch_to_backup(self, request_info: dict[str, Any], current_time: tuple[int, float]) -> None: """ Switch a protected request to its backup path. 
:param request_info: Request information dictionary :type request_info: dict[str, Any] :param current_time: Current simulation time :type current_time: float """ request_id = request_info["request_id"] # Update active path in tracking self.sdn_obj.sdn_props.allocated_requests[request_id]["active_path"] = "backup" # Update switchover metrics in SDN props # Extract time value from tuple for metrics storage _, time_val = current_time self.sdn_obj.sdn_props.switchover_count += 1 self.sdn_obj.sdn_props.last_switchover_time = time_val # Update stats metrics self.stats_obj.stats_props.protection_switchovers += 1 self.stats_obj.stats_props.switchover_times.append(time_val) # Note: Spectrum is already allocated on backup path, no need to reallocate def _release_failed_request(self, request_info: dict[str, Any], current_time: tuple[int, float]) -> None: """ Release a request that failed due to link failures. This releases the spectrum and counts the request as blocked. :param request_info: Request information dictionary :type request_info: dict[str, Any] :param current_time: Current simulation time as (iteration, time) tuple :type current_time: tuple[int, float] """ request_id = request_info["request_id"] # Set up SDN props for release # Extract time value from tuple for SDN props _, time_val = current_time self.sdn_obj.sdn_props.request_id = request_id self.sdn_obj.sdn_props.path_list = request_info.get("primary_path") self.sdn_obj.sdn_props.arrive = request_info.get("arrive_time") self.sdn_obj.sdn_props.depart = time_val # Use failure time as depart time # NOTE: Do NOT set bandwidth from request_info during release - it may contain # allocated bandwidth (e.g., 100) instead of original request bandwidth (e.g., 800), # which corrupts sdn_props.bandwidth for subsequent operations. 
# self.sdn_obj.sdn_props.bandwidth = request_info.get("bandwidth") # Release spectrum on primary path self.sdn_obj.release() # If protected, also release backup path if request_info.get("is_protected"): if request_info.get("backup_path"): self.sdn_obj.sdn_props.path_list = request_info.get("backup_path") self.sdn_obj.release() # Count as protection failure (both paths failed) self.stats_obj.stats_props.protection_failures += 1 # Count as blocked due to failure # CRITICAL: Increment the main blocked_requests counter for blocking probability self.stats_obj.blocked_requests += 1 # Also update failure-specific counters current_failure_count = self.stats_obj.stats_props.block_reasons_dict.get("failure", 0) self.stats_obj.stats_props.block_reasons_dict["failure"] = (current_failure_count if current_failure_count is not None else 0) + 1 self.stats_obj.stats_props.failure_induced_blocks += 1 # Update bit rate blocking if bandwidth info available if request_info.get("bandwidth") is not None: bandwidth = int(request_info["bandwidth"]) self.stats_obj.bit_rate_blocked += bandwidth # Note: bit_rate_request was already incremented when request was initially allocated def _process_all_requests(self) -> None: """Process all requests for the current iteration.""" request_number = 1 if self.reqs_dict is None: return for current_time in self.reqs_dict: # current_time is tuple (iteration, time) - extract time for logging/failure checks _, time_val = current_time # Check for scheduled failure activations at this time if self.failure_manager: activated_links = self.failure_manager.activate_failures(time_val) if activated_links: logger.info(f"Activated {len(activated_links)} failed link(s) at time {time_val:.2f}: {activated_links}") # Handle already-allocated requests affected by failures self._handle_failure_impact(current_time, activated_links) # Check for scheduled repairs at this time if self.failure_manager: repaired_links = self.failure_manager.repair_failures(time_val) if repaired_links: logger.info(f"Repaired {len(repaired_links)} link(s) at time {time_val:.2f}: {repaired_links}") self.handle_request(current_time=current_time, request_number=request_number) if self.reqs_dict is not None and current_time in self.reqs_dict and self.reqs_dict[current_time]["request_type"] == "arrival": request_number += 1 def _update_progress(self, iteration: int, context: dict[str, Any]) -> None: """ Update progress tracking and logging. :param iteration: Current iteration number :type iteration: int :param context: Simulation context dictionary :type context: Dict[str, Any] """ if context["progress_queue"]: context["progress_queue"].put((context["thread_num"], context["done_units"])) log_message( message=(f"CHILD={context['thread_num']} iteration={iteration}, done_units={context['done_units']}\n"), log_queue=context["log_queue"], ) def _log_simulation_complete(self, context: dict[str, Any]) -> None: """ Log simulation completion message. :param context: Simulation context dictionary :type context: Dict[str, Any] """ log_message( message=( f"Simulation finished for Erlang: {self.engine_props['erlang']} finished for simulation number: {context['thread_num']}.\n" ), log_queue=context["log_queue"], ) # Close dataset logger if enabled if self.dataset_logger: self.dataset_logger.close() logger.info("Dataset logger closed") def _validate_grooming_config(self) -> None: """ Validate grooming-related configuration. Checks that grooming configuration options are consistent and compatible with other simulation settings. 
""" if not self.engine_props.get("is_grooming_enabled", False): return # Check for required settings if "transponders_per_node" not in self.engine_props: logger.warning("transponders_per_node not set, using default value of 10") self.engine_props["transponders_per_node"] = 10 # Validate SNR rechecking settings if self.engine_props.get("snr_recheck", False): if self.engine_props.get("snr_type") in ["None", None]: logger.warning("snr_recheck enabled but snr_type is None - rechecking will be skipped") # Validate partial service setting if self.engine_props.get("can_partially_serve", False): logger.info("Partial service allocation enabled") logger.debug("Grooming configuration validated") def _init_transponder_usage(self) -> None: """ Initialize transponder usage tracking for all nodes. Sets up the transponder_usage_dict with initial transponder counts for each node in the network. """ if self.sdn_obj.sdn_props.topology is None: logger.warning("Cannot initialize transponder usage: topology not set") return self.sdn_obj.sdn_props.transponder_usage_dict = {} # Get initial transponder count from config initial_transponders = self.engine_props.get("transponders_per_node", 10) for node in self.sdn_obj.sdn_props.topology.nodes(): self.sdn_obj.sdn_props.transponder_usage_dict[node] = { "available_transponder": initial_transponders, "total_transponder": initial_transponders, } logger.debug( "Initialized transponder usage for %d nodes (%d transponders each)", len(self.sdn_obj.sdn_props.transponder_usage_dict), initial_transponders, ) def _collect_grooming_stats(self) -> None: """ Collect grooming-specific statistics. Calculates and stores metrics related to traffic grooming performance including grooming success rate and bandwidth utilization. """ if not self.grooming_stats: self.grooming_stats = { "fully_groomed": 0, "partially_groomed": 0, "not_groomed": 0, "lightpaths_created": 0, "lightpaths_released": 0, "avg_lightpath_utilization": [], } # Calculate average lightpath utilization if self.sdn_obj.sdn_props.lp_bw_utilization_dict: utilizations = [lp_info["utilization"] for lp_info in self.sdn_obj.sdn_props.lp_bw_utilization_dict.values()] avg_util = sum(utilizations) / len(utilizations) if utilizations else 0.0 self.grooming_stats["avg_lightpath_utilization"].append(avg_util) logger.info( "Grooming stats: %d lightpaths, avg utilization: %.2f%%", len(utilizations), avg_util, ) def _save_all_stats(self, base_file_path: str = "data") -> None: """ Save all statistics using the persistence module. :param base_file_path: Base path for output files :type base_file_path: str """ # Create save dictionary with iteration stats save_dict: dict[str, Any] = {"iter_stats": {}} # Get blocking statistics from metrics blocking_stats = self.stats_obj.get_blocking_statistics() # Save main statistics self.persistence.save_stats( stats_dict=save_dict, stats_props=self.stats_obj.stats_props, blocking_stats=blocking_stats, base_file_path=base_file_path, ) # Save ML training data if available if self.ml_metrics: self.ml_metrics.save_train_data( iteration=self.stats_obj.iteration or 0, max_iterations=self.engine_props["max_iters"], base_file_path=base_file_path, ) def _signal_save_handler(self, signum: int, frame: Any) -> None: # pylint: disable=unused-argument """ Handle save operation when receiving signals. 
        :param signum: Signal number
        :param frame: Current stack frame
        """
        logger.warning("Received signal %d, saving statistics...", signum)
        self._save_all_stats()
        logger.info("Statistics saved due to signal")

    @property
    def num_requests(self) -> int:
        """
        Total number of arrival requests in the current episode.

        :return: Number of arrival requests (excludes release events)
        :rtype: int
        """
        if self.reqs_dict is None:
            return 0
        return sum(1 for req in self.reqs_dict.values() if req.get("request_type") == "arrival")

    @property
    def network_state(self) -> NetworkState | None:
        """
        Access to the v5 NetworkState object.

        :return: NetworkState if orchestrator mode enabled, None otherwise
        :rtype: NetworkState | None
        """
        return self._network_state
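    # Illustrative sketch (an assumption, not part of this module):
    # _signal_save_handler above matches the stdlib ``signal`` handler
    # signature, so a run/setup method could register it to persist
    # statistics when a run is interrupted, e.g.:
    #
    #     signal.signal(signal.SIGINT, self._signal_save_handler)
    #     signal.signal(signal.SIGTERM, self._signal_save_handler)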
    def get_next_request(self) -> Request | None:
        """
        Get the next unprocessed arrival request for RL.

        This method is used by UnifiedSimEnv to get requests one at a time
        for step-by-step RL decision making. Requires orchestrator mode.

        :return: Next Request object, or None if all arrivals processed
        :rtype: Request | None
        """
        if not self.use_orchestrator or self.reqs_dict is None:
            return None

        # Get sorted arrival times
        arrival_times = sorted(t for t, req in self.reqs_dict.items() if req.get("request_type") == "arrival")

        # Find next unprocessed request using _rl_request_index
        if not hasattr(self, "_rl_request_index"):
            self._rl_request_index = 0

        if self._rl_request_index >= len(arrival_times):
            return None

        current_time = arrival_times[self._rl_request_index]
        return self._get_or_create_v5_request(current_time)
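    # Worked example with hypothetical values: reqs_dict is keyed by
    # (iteration, time) tuples, so sorted() orders arrivals first by
    # iteration and then by time within an iteration; _rl_request_index is
    # simply a cursor into that sorted list:
    #
    #     >>> sorted([(0, 7.5), (1, 2.0), (0, 3.1)])
    #     [(0, 3.1), (0, 7.5), (1, 2.0)]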
    def process_releases_until(self, time: float) -> None:
        """
        Process all release events due at or before the given time.

        This method is used by UnifiedSimEnv to process lightpath releases
        between RL decision points. Requires orchestrator mode.

        :param time: Process all releases with depart_time <= time
        :type time: float
        """
        if not self.use_orchestrator or self.reqs_dict is None:
            return
        if self._network_state is None or self._orchestrator is None:
            return

        # Find and process all release events at or before the given time
        # NOTE: Use <= to match legacy behavior where releases at exactly
        # the same time as the next arrival are processed before that arrival
        release_times = sorted(t for t, req in self.reqs_dict.items() if req.get("request_type") == "release" and t[1] <= time)

        # Lazily create the processed-release set on first use
        if not hasattr(self, "_processed_releases"):
            self._processed_releases: set[tuple[int, float]] = set()

        for release_time in release_times:
            # Skip releases we've already processed
            if release_time in self._processed_releases:
                continue

            # Process the release
            self._handle_release_orchestrator(release_time)
            self._processed_releases.add(release_time)
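    # Worked example with hypothetical times: if a release is scheduled at
    # t=12.0 and the next arrival also occurs at t=12.0, the <= comparison
    # above frees the expiring lightpath before the agent sees the arrival:
    #
    #     engine.process_releases_until(12.0)  # release at t=12.0 is processed
    #     request = engine.get_next_request()  # arrival at t=12.0 sees freed spectrum
    #
    # A strict < comparison would instead leave that spectrum occupied,
    # diverging from the legacy event ordering.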
    def record_allocation_result(
        self,
        request: Request,
        result: AllocationResult,
    ) -> None:
        """
        Record allocation result for RL statistics.

        This method is used by UnifiedSimEnv to record the result of each
        RL decision and advance to the next request. Updates statistics
        counters, schedules the release event if successful, and advances
        the request index.

        :param request: The Request that was processed
        :type request: Request
        :param result: AllocationResult from orchestrator
        :type result: AllocationResult
        """
        if not self.use_orchestrator or self.reqs_dict is None:
            return

        # Find the time key for this request
        time_key: tuple[int, float] | None = None
        for t, req in self.reqs_dict.items():
            if req.get("req_id") == request.request_id:
                time_key = t
                break

        if time_key is None:
            logger.warning("Could not find time key for request %d", request.request_id)
            return

        # Update stats using existing method
        self._update_stats_from_result(time_key, request, result)

        # Track the allocation for release processing
        if result.success:
            self.reqs_status_dict[request.request_id] = {
                "mod_format": list(result.modulations) if result.modulations else [],
                "path": self._get_paths_from_result(result),
                "is_sliced": result.is_sliced,
                "was_routed": True,
                "core_list": list(result.cores) if result.cores else [],
                "band": list(result.bands) if result.bands else [],
                "start_slot_list": list(result.start_slots) if result.start_slots else [],
                "end_slot_list": list(result.end_slots) if result.end_slots else [],
                "bandwidth_list": (list(result.bandwidth_allocations) if result.bandwidth_allocations else [request.bandwidth_gbps]),
                "lightpath_id_list": list(result.all_lightpath_ids) if result.all_lightpath_ids else [],
                "lightpath_bandwidth_list": list(result.lightpath_bandwidths) if result.lightpath_bandwidths else [],
                "was_new_lp_established": list(result.lightpaths_created) if result.lightpaths_created else [],
            }

        # Advance to next request
        if not hasattr(self, "_rl_request_index"):
            self._rl_request_index = 0
        self._rl_request_index += 1
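    # Usage sketch (assumed caller, not part of this module): the environment
    # reports each decision's outcome, and the cursor advances whether or not
    # the allocation succeeded:
    #
    #     result = orchestrator.allocate(request)          # hypothetical call
    #     engine.record_allocation_result(request, result)
    #     # result.success      -> reqs_status_dict entry added for later release
    #     # not result.success  -> stats updated only; index still advances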
    def reset_rl_state(self) -> None:
        """Reset RL-specific state for new episode.

        Called by UnifiedSimEnv.reset() to prepare for a new episode.
        """
        self._rl_request_index = 0
        self._processed_releases = set()

        # Reset v5 request cache
        self._v5_requests = {}

        # Reset network state if in orchestrator mode
        if self.use_orchestrator and self._network_state is not None:
            self._network_state.reset()
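    # End-to-end sketch (illustrative only) of the RL stepping API above, as
    # a UnifiedSimEnv-style driver might use it. ``engine``, ``agent``,
    # ``orchestrator`` and the attribute/method names on them are assumptions,
    # not part of this module:
    #
    #     engine.reset_rl_state()
    #     while (request := engine.get_next_request()) is not None:
    #         engine.process_releases_until(request.arrival_time)  # attr name assumed
    #         action = agent.choose_action(request)                # hypothetical
    #         result = orchestrator.allocate(request, action)      # hypothetical
    #         engine.record_allocation_result(request, result)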