Source code for fusion.core.metrics

import math
from statistics import mean, stdev, variance
from typing import Any

import numpy as np

from fusion.analysis.network_analysis import NetworkAnalyzer
from fusion.core.properties import SNAP_KEYS_LIST, SDNProps, StatsProps
from fusion.utils.logging_config import get_logger
from fusion.utils.network import find_path_length

logger = get_logger(__name__)


class SimStats:
    """The SimStats class finds and stores all relevant statistics in simulations."""
    def __init__(
        self,
        engine_props: dict[str, Any],
        sim_info: str,
        stats_props: StatsProps | None = None,
    ):
        if stats_props is not None:
            self.stats_props = stats_props
        else:
            self.stats_props = StatsProps()

        self.engine_props = engine_props
        self.sim_info = sim_info

        # Used to track transponders for a single allocated request
        self.current_transponders = 0
        # Used to track transponders for an entire simulation on average
        self.total_transponders = 0

        self.blocked_requests = 0
        self.block_mean: float | None = None
        self.block_variance: float | None = None
        self.block_ci: float | None = None
        self.block_ci_percent: float | None = None

        self.bit_rate_request: int = 0
        self.bit_rate_blocked = 0
        self.bit_rate_block_mean: float | None = None
        self.bit_rate_block_variance: float | None = None
        self.bit_rate_block_ci: float | None = None
        self.bit_rate_block_ci_percent: float | None = None

        self.topology: Any = None
        self.iteration: int | None = None

        # Recovery time tracking
        self.recovery_times_ms: list[float] = []
        self.failure_window_bp: list[float] = []
        self.recovery_events: list[dict[str, Any]] = []

        # Failure window parameters
        self.failure_window_size = engine_props.get("recovery_timing_settings", {}).get("failure_window_size", 1000)

        # Fragmentation and decision time metrics
        self.fragmentation_scores: list[float] = []
        self.decision_times_ms: list[float] = []

        self.total_requests: int = 0
        self.groomed_requests: int = 0
        self.sliced_requests: int = 0
        self.protected_requests: int = 0

        self.snapshot_interval: int = engine_props.get("snapshot_interval", 100)
        self.mods_dict_updates_log: list[dict[str, Any]] = []
    @staticmethod
    def _get_snapshot_info(
        network_spectrum_dict: dict[tuple[Any, Any], dict[str, Any]],
        path_list: list[tuple[int, int]] | None,
    ) -> tuple[int, int, int]:
        """
        Retrieves relative information for simulation snapshots.

        :param network_spectrum_dict: The current network spectrum database.
        :param path_list: A path to find snapshot info on; if None, gathers info for the entire network.
        :return: The occupied slots, number of guard bands, and active requests.
        :rtype: tuple
        """
        active_reqs_set = set()
        occupied_slots = 0
        guard_slots = 0

        # Skip by two because the link is bidirectional; no need to check both arrays,
        # e.g., (0, 1) and (1, 0)
        for link in list(network_spectrum_dict.keys())[::2]:
            if path_list is not None and link not in path_list:
                continue
            link_data = network_spectrum_dict[link]
            for core in link_data["cores_matrix"]:
                requests_set = set(core[core > 0])
                for curr_req in requests_set:
                    active_reqs_set.add(curr_req)

                occupied_slots += len(np.where(core != 0)[0])
                guard_slots += len(np.where(core < 0)[0])

        return occupied_slots, guard_slots, len(active_reqs_set)

    # TODO(v6.1): Reimplement update_snapshot with orchestrator-compatible metrics collection
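    # Illustrative sketch (not part of the module): the [::2] stride above assumes the
    # spectrum dict stores each bidirectional link as an adjacent key pair, e.g. (0, 1)
    # immediately followed by (1, 0). Under that assumption, a minimal input and its
    # result look like this -- request IDs are positive, guard bands negative, free
    # slots zero:
    #
    #   >>> import numpy as np
    #   >>> spectrum = {
    #   ...     (0, 1): {"cores_matrix": [np.array([5, 5, -1, 0])]},
    #   ...     (1, 0): {"cores_matrix": [np.array([5, 5, -1, 0])]},
    #   ... }
    #   >>> SimStats._get_snapshot_info(spectrum, path_list=None)
    #   (3, 1, 1)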
    def update_snapshot(
        self,
        network_spectrum_dict: dict[tuple[Any, Any], dict[str, Any]],
        request_number: int,
        path_list: list[tuple[int, int]] | None = None,
    ) -> None:
        """
        Stub for snapshot collection - not currently functional.

        This method is a placeholder that maintains API compatibility but does not
        collect snapshot data. Full implementation is planned for v6.1.

        :param network_spectrum_dict: The current network spectrum database.
        :param request_number: The current request number.
        :param path_list: The desired path to find the occupied slots on.
        :return: None
        """
        logger.debug("update_snapshot called but not functional - will be implemented in v6.1")
    def _init_snapshots(self) -> None:
        for req_num in range(0, self.engine_props["num_requests"] + 1, self.engine_props["snapshot_step"]):
            self.stats_props.snapshots_dict[req_num] = {}
            for key in SNAP_KEYS_LIST:
                self.stats_props.snapshots_dict[req_num][key] = []

    def _init_mods_weights_bws(self) -> None:
        # Initialize weights_dict and modulations_used_dict as nested dicts
        if not isinstance(self.stats_props.weights_dict, dict):
            self.stats_props.weights_dict = {}
        if not isinstance(self.stats_props.modulations_used_dict, dict):
            self.stats_props.modulations_used_dict = {}
        if not isinstance(self.stats_props.demand_realization_ratio, dict):
            self.stats_props.demand_realization_ratio = {}

        # Initialize demand_realization_ratio with an overall key
        self.stats_props.demand_realization_ratio["overall"] = []

        for bandwidth, obj in self.engine_props["mod_per_bw"].items():
            # Convert bandwidth to string to match tracking logic
            bandwidth_key = str(bandwidth)

            # Ensure bandwidth keys exist as dicts
            if bandwidth_key not in self.stats_props.modulations_used_dict:
                self.stats_props.modulations_used_dict[bandwidth_key] = {}
            if bandwidth_key not in self.stats_props.weights_dict:
                self.stats_props.weights_dict[bandwidth_key] = {}

            # Initialize demand_realization_ratio per bandwidth
            self.stats_props.demand_realization_ratio[bandwidth_key] = []

            for modulation in obj.keys():
                # Initialize nested dict structure for weights
                if bandwidth_key not in self.stats_props.weights_dict:
                    self.stats_props.weights_dict[bandwidth_key] = {}
                self.stats_props.weights_dict[bandwidth_key][modulation] = []

                # Initialize nested dict structure for modulations_used
                if bandwidth_key not in self.stats_props.modulations_used_dict:
                    self.stats_props.modulations_used_dict[bandwidth_key] = {}
                self.stats_props.modulations_used_dict[bandwidth_key][modulation] = 0

                # Initialize modulation-specific tracking
                mod_dict = self.stats_props.modulations_used_dict
                mod_entry = mod_dict.get(modulation, {})
                length_entry = mod_entry.get("length", {})
                overall_entry = length_entry.get("overall")
                if (
                    modulation not in mod_dict
                    or not isinstance(mod_entry, dict)
                    or "length" not in mod_entry
                    or isinstance(overall_entry, dict)
                ):
                    self.stats_props.modulations_used_dict[modulation] = {}
                    self.stats_props.modulations_used_dict[modulation]["length"] = {}
                    self.stats_props.modulations_used_dict[modulation]["length"]["overall"] = []
                    self.stats_props.modulations_used_dict[modulation]["hop"] = {}
                    self.stats_props.modulations_used_dict[modulation]["hop"]["overall"] = []
                    self.stats_props.modulations_used_dict[modulation]["snr"] = {}
                    self.stats_props.modulations_used_dict[modulation]["snr"]["overall"] = []
                    self.stats_props.modulations_used_dict[modulation]["xt_cost"] = {}
                    self.stats_props.modulations_used_dict[modulation]["xt_cost"]["overall"] = []
                    for band in self.engine_props["band_list"]:
                        self.stats_props.modulations_used_dict[modulation][band] = 0
                        self.stats_props.modulations_used_dict[modulation]["length"][band] = []
                        self.stats_props.modulations_used_dict[modulation]["hop"][band] = []
                        self.stats_props.modulations_used_dict[modulation]["snr"][band] = []
                        self.stats_props.modulations_used_dict[modulation]["xt_cost"][band] = []

            self.stats_props.bandwidth_blocking_dict[bandwidth_key] = 0

    def _init_frag_dict(self) -> None:
        """Initialize fragmentation dictionary for tracking fragmentation metrics."""
        if not isinstance(self.stats_props.frag_dict, dict):
            self.stats_props.frag_dict = {}

        frag_metrics = self.engine_props.get("fragmentation_metrics", [])
        # Handle case where config parsing returns the string "[]" instead of an empty list
        if isinstance(frag_metrics, str) and frag_metrics in ("[]", ""):
            frag_metrics = []
        elif not isinstance(frag_metrics, list):
            frag_metrics = []

        for method in frag_metrics:
            self.stats_props.frag_dict[method] = {}
            for req_cnt in range(
                self.engine_props.get("frag_calc_step", 100),
                self.engine_props["num_requests"] + 1,
                self.engine_props.get("frag_calc_step", 100),
            ):
                self.stats_props.frag_dict[method][req_cnt] = {"arrival": {}, "release": {}}

    def _init_lp_bw_utilization_dict(self) -> None:
        """Initialize lightpath bandwidth utilization dictionary."""
        if not isinstance(self.stats_props.lp_bw_utilization_dict, dict):
            self.stats_props.lp_bw_utilization_dict = {}

        for bandwidth, _obj in self.engine_props["mod_per_bw"].items():
            bandwidth_key = str(bandwidth)
            self.stats_props.lp_bw_utilization_dict[bandwidth_key] = {}
            for band in self.engine_props["band_list"]:
                self.stats_props.lp_bw_utilization_dict[bandwidth_key][band] = {}
                for core_num in range(self.engine_props["cores_per_link"]):
                    self.stats_props.lp_bw_utilization_dict[bandwidth_key][band][core_num] = []

        self.stats_props.lp_bw_utilization_dict["overall"] = []

    def _init_stat_dicts(self) -> None:
        for stat_key, data_type in vars(self.stats_props).items():
            if not isinstance(data_type, dict):
                continue
            if stat_key in (
                "modulations_used_dict",
                "weights_dict",
                "bandwidth_blocking_dict",
                "demand_realization_ratio",
            ):
                self._init_mods_weights_bws()
            elif stat_key == "frag_dict":
                self._init_frag_dict()
            elif stat_key == "lp_bw_utilization_dict":
                self._init_lp_bw_utilization_dict()
            elif stat_key == "snapshots_dict":
                if self.engine_props["save_snapshots"]:
                    self._init_snapshots()
            elif stat_key == "cores_dict":
                cores_range = range(self.engine_props["cores_per_link"])
                self.stats_props.cores_dict = dict.fromkeys(cores_range, 0)
            elif stat_key == "block_reasons_dict":
                self.stats_props.block_reasons_dict = {
                    "distance": 0,
                    "congestion": 0,
                    "xt_threshold": 0,
                }
            elif stat_key == "link_usage_dict":
                self.stats_props.link_usage_dict = {}
            elif stat_key != "iter_stats":
                raise ValueError("Dictionary statistic was not reset in props.")

    def _init_stat_lists(self) -> None:
        for stat_key in vars(self.stats_props).keys():
            data_type = getattr(self.stats_props, stat_key)
            if isinstance(data_type, list):
                # Only reset sim_block_list when we encounter a new traffic volume
                if self.iteration != 0 and stat_key in [
                    "simulation_blocking_list",
                    "simulation_bitrate_blocking_list",
                ]:
                    continue
                if stat_key == "path_index_list":
                    continue
                setattr(self.stats_props, stat_key, [])
    def init_iter_stats(self) -> None:
        """
        Initializes data structures used in other methods of this class.

        :return: None
        """
        self._init_stat_dicts()
        self._init_stat_lists()

        self.blocked_requests = 0
        self.bit_rate_blocked = 0
        self.bit_rate_request = 0
        self.total_transponders = 0

        k_paths = self.engine_props.get("k_paths", 1)
        if k_paths is not None:
            self.stats_props.path_index_list = [0] * k_paths
        else:
            self.stats_props.path_index_list = [0]
    def calculate_blocking_statistics(self) -> None:
        """
        Gets the current blocking probability.

        :return: None
        """
        if self.engine_props["num_requests"] == 0:
            blocking_prob = 0.0
            bit_rate_blocking_prob = 0.0
        else:
            num_requests = self.engine_props["num_requests"]
            blocking_prob = float(self.blocked_requests) / float(num_requests)
            if self.bit_rate_request > 0:
                bit_rate_blocking_prob = float(self.bit_rate_blocked) / float(self.bit_rate_request)
            else:
                bit_rate_blocking_prob = 0.0

        self.stats_props.simulation_blocking_list.append(blocking_prob)
        self.stats_props.simulation_bitrate_blocking_list.append(bit_rate_blocking_prob)
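    # Worked example (hypothetical numbers): with 100 requests of which 10 were
    # blocked, and 5000 of 50000 Gbps blocked, a call appends a request blocking
    # probability of 10 / 100 = 0.1 and a bit-rate blocking probability of
    # 5000 / 50000 = 0.1 to the per-iteration lists:
    #
    #   >>> stats.blocked_requests, stats.bit_rate_blocked = 10, 5000
    #   >>> stats.bit_rate_request = 50000
    #   >>> stats.calculate_blocking_statistics()  # appends 0.1 and 0.1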
    # TODO: Refactor this ~200-line function into smaller single-responsibility methods.
    # Each stat_key (snr_list, crosstalk_list, core_list, modulation_list, etc.) should
    # have its own handler method. The current implementation violates the 50-line guideline.
    def _handle_iter_lists(self, sdn_data: SDNProps) -> None:
        for stat_key in sdn_data.stat_key_list:
            curr_sdn_data = sdn_data.get_data(key=stat_key)

            # Track overall SNR statistics (mean of newly established lightpaths).
            # Note: crosstalk is handled separately and only when xt_type is configured.
            if stat_key == "snr_list":
                if set(curr_sdn_data) <= {None}:
                    continue  # Skip if all values are None

                # Filter to only newly established lightpaths
                new_lp_values = []
                for i, value in enumerate(curr_sdn_data):
                    if i < len(sdn_data.lightpath_id_list):
                        lp_id = sdn_data.lightpath_id_list[i]
                        if lp_id in sdn_data.was_new_lp_established and value is not None:
                            new_lp_values.append(value)

                # Append the mean of new lightpaths to the overall statistics list
                if len(new_lp_values) > 0:
                    self.stats_props.snr_list.append(mean(new_lp_values))

            if stat_key == "crosstalk_list":
                # FIXME(drl_path_agents): DRL agents may return [None] for crosstalk when
                # XT calculation is disabled or unavailable. This should be handled upstream
                # in the RL adapter by providing a default value or omitting the field entirely.
                if curr_sdn_data == [None]:
                    continue  # Skip this stat_key, don't break the entire loop

            for i, data in enumerate(curr_sdn_data):
                # Only process NEW lightpaths when grooming is enabled (skip EXISTING groomed lightpaths)
                if self.engine_props.get("is_grooming_enabled", False):
                    if i < len(sdn_data.lightpath_id_list):
                        lp_id = sdn_data.lightpath_id_list[i]
                        if lp_id not in sdn_data.was_new_lp_established:
                            continue  # Skip EXISTING lightpaths

                if stat_key == "core_list":
                    if data not in self.stats_props.cores_dict:
                        self.stats_props.cores_dict[data] = 0
                    self.stats_props.cores_dict[data] += 1
                elif stat_key == "modulation_list":
                    bandwidth = sdn_data.lightpath_bandwidth_list[i]  # Use lightpath bandwidth
                    band = sdn_data.band_list[i]

                    # Ensure the nested dict structure exists - convert float to int to match dict keys
                    bandwidth_key = str(int(float(bandwidth))) if bandwidth is not None else None
                    mod_dict = self.stats_props.modulations_used_dict
                    bw_dict = mod_dict.get(bandwidth_key)
                    if bandwidth_key and isinstance(bw_dict, dict):
                        if data in bw_dict:
                            old_bw_count = bw_dict[data]
                            bw_dict[data] += 1
                            self.mods_dict_updates_log.append(
                                {
                                    "req_id": sdn_data.request_id,
                                    "mod": data,
                                    "bw": bandwidth_key,
                                    "band": None,
                                    "action": "bw_count_increment",
                                    "old": old_bw_count,
                                    "new": bw_dict[data],
                                }
                            )
                        else:
                            # Initialize if not present
                            bw_dict[data] = 1
                            self.mods_dict_updates_log.append(
                                {
                                    "req_id": sdn_data.request_id,
                                    "mod": data,
                                    "bw": bandwidth_key,
                                    "band": None,
                                    "action": "bw_count_init",
                                    "value": 1,
                                }
                            )

                    data_mod_dict = mod_dict.get(data)
                    if isinstance(data_mod_dict, dict):
                        if band in data_mod_dict:
                            old_band_count = data_mod_dict[band]
                            data_mod_dict[band] += 1
                            self.mods_dict_updates_log.append(
                                {
                                    "req_id": sdn_data.request_id,
                                    "mod": data,
                                    "bw": None,
                                    "band": band,
                                    "action": "band_count_increment",
                                    "old": old_band_count,
                                    "new": data_mod_dict[band],
                                }
                            )
                        else:
                            # Initialize if not present
                            data_mod_dict[band] = 1
                            self.mods_dict_updates_log.append(
                                {
                                    "req_id": sdn_data.request_id,
                                    "mod": data,
                                    "bw": None,
                                    "band": band,
                                    "action": "band_count_init",
                                    "value": 1,
                                }
                            )

                        length_dict = data_mod_dict.get("length")
                        has_length = "length" in data_mod_dict
                        if has_length and isinstance(length_dict, dict):
                            if band in length_dict:
                                length_dict[band].append(sdn_data.path_weight)
                                self.mods_dict_updates_log.append(
                                    {
                                        "req_id": sdn_data.request_id,
                                        "mod": data,
                                        "bw": None,
                                        "band": band,
                                        "action": "length_append",
                                        "value": sdn_data.path_weight,
                                    }
                                )
                            if "overall" in length_dict:
                                length_dict["overall"].append(sdn_data.path_weight)
                                self.mods_dict_updates_log.append(
                                    {
                                        "req_id": sdn_data.request_id,
                                        "mod": data,
                                        "bw": None,
                                        "band": "overall",
                                        "action": "length_append",
                                        "value": sdn_data.path_weight,
                                    }
                                )

                        # Track hop count
                        hop_dict = data_mod_dict.get("hop")
                        if hop_dict and isinstance(hop_dict, dict) and sdn_data.path_list:
                            num_hops = len(sdn_data.path_list) - 1
                            if band in hop_dict:
                                hop_dict[band].append(num_hops)
                                self.mods_dict_updates_log.append(
                                    {
                                        "req_id": sdn_data.request_id,
                                        "mod": data,
                                        "bw": None,
                                        "band": band,
                                        "action": "hop_append",
                                        "value": num_hops,
                                    }
                                )
                            if "overall" in hop_dict:
                                hop_dict["overall"].append(num_hops)
                                self.mods_dict_updates_log.append(
                                    {
                                        "req_id": sdn_data.request_id,
                                        "mod": data,
                                        "bw": None,
                                        "band": "overall",
                                        "action": "hop_append",
                                        "value": num_hops,
                                    }
                                )

                        # Track SNR or XT cost
                        if self.engine_props.get("snr_type") != "None":
                            if i < len(sdn_data.snr_list) and sdn_data.snr_list[i] is not None:
                                if self.engine_props.get("snr_type") == "xt_calculation":
                                    # Track xt_cost
                                    xt_cost_dict = data_mod_dict.get("xt_cost")
                                    if xt_cost_dict and isinstance(xt_cost_dict, dict):
                                        if band in xt_cost_dict:
                                            xt_cost_dict[band].append(sdn_data.snr_list[i])
                                            self.mods_dict_updates_log.append(
                                                {
                                                    "req_id": sdn_data.request_id,
                                                    "mod": data,
                                                    "bw": None,
                                                    "band": band,
                                                    "action": "xt_cost_append",
                                                    "value": sdn_data.snr_list[i],
                                                }
                                            )
                                        if "overall" in xt_cost_dict:
                                            xt_cost_dict["overall"].append(sdn_data.snr_list[i])
                                            self.mods_dict_updates_log.append(
                                                {
                                                    "req_id": sdn_data.request_id,
                                                    "mod": data,
                                                    "bw": None,
                                                    "band": "overall",
                                                    "action": "xt_cost_append",
                                                    "value": sdn_data.snr_list[i],
                                                }
                                            )
                                else:
                                    # Track snr
                                    snr_val = sdn_data.snr_list[i]
                                    snr_dict = data_mod_dict.get("snr")
                                    if snr_dict and isinstance(snr_dict, dict):
                                        if band in snr_dict:
                                            snr_dict[band].append(snr_val)
                                            self.mods_dict_updates_log.append(
                                                {
                                                    "req_id": sdn_data.request_id,
                                                    "mod": data,
                                                    "bw": None,
                                                    "band": band,
                                                    "action": "snr_append",
                                                    "value": snr_val,
                                                }
                                            )
                                        if "overall" in snr_dict:
                                            snr_dict["overall"].append(snr_val)
                                            self.mods_dict_updates_log.append(
                                                {
                                                    "req_id": sdn_data.request_id,
                                                    "mod": data,
                                                    "bw": None,
                                                    "band": "overall",
                                                    "action": "snr_append",
                                                    "value": snr_val,
                                                }
                                            )
                elif stat_key == "start_slot_list":
                    self.stats_props.start_slot_list.append(int(data))
                elif stat_key == "end_slot_list":
                    self.stats_props.end_slot_list.append(int(data))
                elif stat_key == "modulation_list":
                    # NOTE: unreachable - the earlier "modulation_list" branch catches this key first
                    self.stats_props.modulation_list.append(str(data))
                elif stat_key == "bandwidth_list":
                    self.stats_props.bandwidth_list.append(float(data))
    def iter_update(
        self,
        req_data: dict[str, Any],
        sdn_data: SDNProps,
        network_spectrum_dict: dict[tuple[Any, Any], dict[str, Any]],
    ) -> None:
        """
        Continuously updates the statistical data for each request allocated/blocked
        in the current iteration.

        :param req_data: Holds data relevant to the current request.
        :param sdn_data: Holds the response data from the SDN controller.
        :param network_spectrum_dict: The current network spectrum database.
        :return: None
        """
        # Request was blocked
        if not sdn_data.was_routed:
            self.blocked_requests += 1
            if sdn_data.bandwidth is not None:
                self.bit_rate_blocked += int(sdn_data.bandwidth)
                self.bit_rate_request += int(sdn_data.bandwidth)

            if sdn_data.block_reason is not None and sdn_data.block_reason in self.stats_props.block_reasons_dict:
                block_reason = sdn_data.block_reason
                current_val = self.stats_props.block_reasons_dict[block_reason]
                if current_val is not None:
                    self.stats_props.block_reasons_dict[block_reason] = current_val + 1
                else:
                    self.stats_props.block_reasons_dict[block_reason] = 1

            req_bandwidth = req_data.get("bandwidth")
            bw_block_dict = self.stats_props.bandwidth_blocking_dict
            if req_bandwidth is not None and req_bandwidth in bw_block_dict:
                self.stats_props.bandwidth_blocking_dict[req_bandwidth] += 1
        else:
            # Skip stats tracking for fully groomed requests (v5 behavior)
            if sdn_data.was_groomed:
                if sdn_data.bandwidth is not None:
                    self.bit_rate_request += int(sdn_data.bandwidth)
                return

            # Track bit rate for requests
            if sdn_data.bandwidth is not None:
                self.bit_rate_request += int(sdn_data.bandwidth)

            # Track blocked bandwidth for partial allocations (slicing or grooming).
            # This applies regardless of the grooming setting.
            remaining_bw = getattr(sdn_data, "remaining_bw", None)
            if remaining_bw is not None and remaining_bw != "0":
                self.bit_rate_blocked += int(remaining_bw)

            was_new_lps = getattr(sdn_data, "was_new_lp_established", [])
            if not was_new_lps:
                return

            if sdn_data.path_list is not None:
                num_hops = len(sdn_data.path_list) - 1
                self.stats_props.hops_list.append(float(num_hops))
                path_len = find_path_length(path_list=sdn_data.path_list, topology=self.topology)
                if path_len is not None:
                    self.stats_props.lengths_list.append(round(float(path_len), 2))

            self._handle_iter_lists(sdn_data=sdn_data)

            # Collect modulation usage counts for all bandwidths
            mods_by_bw = {}
            for bw, bw_data in self.stats_props.modulations_used_dict.items():
                if isinstance(bw_data, dict):
                    # Extract only the modulation counts (skip nested dicts like 'length', 'hop', etc.)
                    mods_by_bw[bw] = {mod: count for mod, count in bw_data.items() if isinstance(count, int)}

            if sdn_data.route_time is not None:
                self.stats_props.route_times_list.append(sdn_data.route_time)
            if sdn_data.number_of_transponders is not None:
                self.total_transponders += sdn_data.number_of_transponders

            if sdn_data.modulation_list and len(sdn_data.modulation_list) > 0:
                if sdn_data.path_index is not None and 0 <= sdn_data.path_index < len(self.stats_props.path_index_list):
                    self.stats_props.path_index_list[sdn_data.path_index] += 1

            # Track weights for NEW lightpaths only when grooming is enabled
            if (
                hasattr(sdn_data, "bandwidth_list")
                and hasattr(sdn_data, "lightpath_id_list")
                and hasattr(sdn_data, "modulation_list")
                and sdn_data.path_weight is not None
            ):
                was_new_lps = getattr(sdn_data, "was_new_lp_established", [])
                for i in range(len(sdn_data.bandwidth_list)):
                    # Check if this lightpath should be tracked
                    should_track = False
                    if self.engine_props.get("is_grooming_enabled", False):
                        # When grooming is enabled, only track newly established lightpaths
                        if (
                            isinstance(was_new_lps, list)
                            and i < len(sdn_data.lightpath_id_list)
                            and sdn_data.lightpath_id_list[i] in was_new_lps
                        ):
                            should_track = True
                    else:
                        # When grooming is disabled, track all lightpaths
                        should_track = True

                    if should_track:
                        # FIX: Use lightpath_bandwidth (capacity), not bandwidth (dedicated), for the weights_dict key
                        bandwidth_key = str(sdn_data.lightpath_bandwidth_list[i])
                        mod_format = sdn_data.modulation_list[i] if i < len(sdn_data.modulation_list) else None
                        if mod_format and bandwidth_key in self.stats_props.weights_dict:
                            if mod_format in self.stats_props.weights_dict[bandwidth_key]:
                                self.stats_props.weights_dict[bandwidth_key][mod_format].append(
                                    round(float(sdn_data.path_weight), 2)
                                )
                            else:
                                # Initialize if not present
                                self.stats_props.weights_dict[bandwidth_key][mod_format] = [
                                    round(float(sdn_data.path_weight), 2)
                                ]

            # Track demand realization ratio for partial grooming
            if self.engine_props.get("can_partially_serve"):
                demand_bw_key = str(sdn_data.bandwidth) if sdn_data.bandwidth is not None else None
                if demand_bw_key and demand_bw_key in self.stats_props.demand_realization_ratio:
                    remaining_bw = getattr(sdn_data, "remaining_bw", 0)
                    if remaining_bw is None:
                        remaining_bw = 0
                    if sdn_data.bandwidth is None:
                        raise ValueError("bandwidth must be set for demand realization tracking")
                    original_bw = int(sdn_data.bandwidth)
                    served_bw = original_bw - int(remaining_bw)
                    realization_ratio = served_bw / original_bw
                    self.stats_props.demand_realization_ratio[demand_bw_key].append(realization_ratio)
                    self.stats_props.demand_realization_ratio["overall"].append(realization_ratio)

            self.stats_props.link_usage_dict = NetworkAnalyzer.get_link_usage_summary(network_spectrum_dict)
    def update_utilization_dict(self, utilization_dict: dict[int, dict[str, Any]]) -> None:
        """
        Update lightpath bandwidth utilization statistics.

        Called after lightpath release to track bandwidth utilization metrics
        per bandwidth/band/core combination.

        :param utilization_dict: Dictionary mapping lightpath_id to utilization info
            with keys: 'bit_rate', 'band', 'core', 'utilization'
        """
        for lp_id in utilization_dict:
            lp_info = utilization_dict[lp_id]
            # Convert to int first to match initialization keys (e.g., '600' not '600.0')
            bit_rate_key = str(int(float(lp_info["bit_rate"])))
            band = lp_info["band"]
            core = lp_info["core"]
            utilization = lp_info["utilization"]

            # Track per-bandwidth/band/core.
            # If keys don't exist, a KeyError will surface the configuration issue.
            self.stats_props.lp_bw_utilization_dict[bit_rate_key][band][core].append(utilization)

            # Track overall
            self.stats_props.lp_bw_utilization_dict["overall"].append(utilization)
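    # Illustrative sketch (hypothetical values): each entry maps a lightpath id to its
    # release-time utilization record. Keys must match those created by
    # _init_lp_bw_utilization_dict - the bit rate as an int-valued string, a band from
    # the configured band_list (here assumed to be "c"), and a core index:
    #
    #   >>> stats.update_utilization_dict({
    #   ...     7: {"bit_rate": "600.0", "band": "c", "core": 0, "utilization": 0.85},
    #   ... })
    #   # appends 0.85 to lp_bw_utilization_dict["600"]["c"][0] and to ["overall"]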
    def _get_iter_means(self) -> None:
        for _, curr_snapshot in self.stats_props.snapshots_dict.items():
            for snap_key, data_list in curr_snapshot.items():
                # Skip if already a scalar
                if isinstance(data_list, (int, float)):
                    continue
                # Handle list case (legacy path)
                if data_list:
                    curr_snapshot[snap_key] = mean(data_list)
                else:
                    curr_snapshot[snap_key] = None

        # Process weights_dict if it's properly structured
        if isinstance(self.stats_props.weights_dict, dict):
            for _bandwidth, mod_obj in self.stats_props.weights_dict.items():
                if not isinstance(mod_obj, dict):
                    continue
                for modulation, data_list in mod_obj.items():
                    # Skip if already processed (data_list is already a dict with statistics)
                    if isinstance(data_list, dict):
                        continue
                    # Modulation was never used
                    if len(data_list) == 0:
                        mod_obj[modulation] = {
                            "mean": None,
                            "std": None,
                            "min": None,
                            "max": None,
                        }
                    elif len(data_list) == 1:
                        mod_obj[modulation] = {
                            "mean": mean(data_list),
                            "std": 0.0,
                            "min": min(data_list),
                            "max": max(data_list),
                        }
                    else:
                        mod_obj[modulation] = {
                            "mean": mean(data_list),
                            "std": stdev(data_list),
                            "min": min(data_list),
                            "max": max(data_list),
                        }

                    # Process modulations_used_dict if it has the expected structure
                    mod_used_dict = self.stats_props.modulations_used_dict
                    mod_entry = mod_used_dict.get(modulation, {})
                    mod_length = mod_entry.get("length", {})
                    if (
                        isinstance(mod_used_dict, dict)
                        and modulation in mod_used_dict
                        and isinstance(mod_entry, dict)
                        and "length" in mod_entry
                        and isinstance(mod_length, dict)
                    ):
                        for key, value in mod_length.items():
                            if not isinstance(value, list):
                                continue
                            # Filter out None values before calculating statistics
                            filtered_value = [v for v in value if v is not None]
                            if len(filtered_value) == 0:
                                mod_length[key] = {
                                    "mean": None,
                                    "std": None,
                                    "min": None,
                                    "max": None,
                                }
                            else:
                                if len(filtered_value) == 1:
                                    deviation = 0.0
                                else:
                                    deviation = stdev(filtered_value)
                                self.stats_props.modulations_used_dict[modulation]["length"][key] = {
                                    "mean": round(float(mean(filtered_value)), 2),
                                    "std": round(float(deviation), 2),
                                    "min": round(float(min(filtered_value)), 2),
                                    "max": round(float(max(filtered_value)), 2),
                                }

                    # Process hop and snr (xt_cost remains an empty list, like grooming-new)
                    for route_spec in ["hop", "snr"]:
                        # Skip SNR if snr_type is None
                        if self.engine_props.get("snr_type") == "None" and route_spec == "snr":
                            continue
                        route_dict = mod_entry.get(route_spec, {})
                        if isinstance(route_dict, dict):
                            for key, value in route_dict.items():
                                if not isinstance(value, list):
                                    continue
                                if len(value) == 0:
                                    route_dict[key] = {
                                        "mean": None,
                                        "std": None,
                                        "min": None,
                                        "max": None,
                                    }
                                else:
                                    deviation = 0.0 if len(value) == 1 else stdev(value)
                                    self.stats_props.modulations_used_dict[modulation][route_spec][key] = {
                                        "mean": round(float(mean(value)), 2),
                                        "std": round(float(deviation), 2),
                                        "min": round(float(min(value)), 2),
                                        "max": round(float(max(value)), 2),
                                    }

        # Process demand_realization_ratio
        if self.engine_props.get("can_partially_serve"):
            for bw, bw_obj in self.stats_props.demand_realization_ratio.items():
                if not isinstance(bw_obj, list):
                    continue
                if len(bw_obj) == 0:
                    self.stats_props.demand_realization_ratio[bw] = {
                        "mean": None,
                        "std": None,
                        "min": None,
                        "max": None,
                    }
                    continue
                deviation = 0.0 if len(bw_obj) == 1 else stdev(bw_obj)
                num_full_served = sum(1 for val in bw_obj if val == 1)
                ratio_dict = {
                    "mean": round(mean(bw_obj), 2),
                    "std": round(deviation, 2),
                    "min": min(bw_obj),
                    "max": max(bw_obj),
                    "num_full_served": num_full_served,
                }
                if bw == "overall":
                    ratio_dict["ratio_full_served"] = num_full_served / self.engine_props["num_requests"]
                else:
                    request_dist = self.engine_props.get("request_distribution", {})
                    if bw in request_dist:
                        total_bw_requests = request_dist[bw] * self.engine_props["num_requests"]
                        if total_bw_requests > 0:
                            ratio_dict["ratio_full_served"] = num_full_served / total_bw_requests
                        else:
                            ratio_dict["ratio_full_served"] = 0
                self.stats_props.demand_realization_ratio[bw] = ratio_dict

        # Process lp_bw_utilization_dict
        overall_was_converted = False
        for bw, bw_obj in self.stats_props.lp_bw_utilization_dict.items():
            if bw == "overall":
                if isinstance(bw_obj, list):
                    overall_was_converted = True
                    if len(bw_obj) == 0:
                        self.stats_props.lp_bw_utilization_dict[bw] = {
                            "mean": None,
                            "std": None,
                            "min": None,
                            "max": None,
                        }
                    else:
                        deviation = 0.0 if len(bw_obj) == 1 else stdev(bw_obj)
                        self.stats_props.lp_bw_utilization_dict[bw] = {
                            "mean": round(mean(bw_obj), 2),
                            "std": round(deviation, 2),
                            "min": round(min(bw_obj), 2),
                            "max": round(max(bw_obj), 2),
                        }
            else:
                if isinstance(bw_obj, dict):
                    for _band, band_obj in bw_obj.items():
                        if isinstance(band_obj, dict):
                            for core, data_list in band_obj.items():
                                if isinstance(data_list, list):
                                    if len(data_list) == 0:
                                        band_obj[core] = {
                                            "mean": None,
                                            "std": None,
                                            "min": None,
                                            "max": None,
                                        }
                                    else:
                                        deviation = 0.0 if len(data_list) == 1 else stdev(data_list)
                                        band_obj[core] = {
                                            "mean": round(mean(data_list), 2),
                                            "std": round(deviation, 2),
                                            "min": round(min(data_list), 2),
                                            "max": round(max(data_list), 2),
                                        }

        # Track sim_lp_utilization_list from the lp_bw_utilization_dict overall mean.
        # Only append if we converted the list to a dict in this call (prevents duplicates).
        if overall_was_converted:
            overall_util = self.stats_props.lp_bw_utilization_dict["overall"]
            if isinstance(overall_util, dict) and "mean" in overall_util:
                if overall_util["mean"] is not None:
                    self.stats_props.sim_lp_utilization_list.append(overall_util["mean"])
    def finalize_iteration_statistics(self) -> None:
        """
        Updates relevant stats after an iteration has finished.

        :return: None
        """
        if self.engine_props["num_requests"] == self.blocked_requests:
            self.stats_props.transponders_list.append(0)
        else:
            trans_mean = self.total_transponders / float(self.engine_props["num_requests"] - self.blocked_requests)
            self.stats_props.transponders_list.append(trans_mean)

        if self.blocked_requests > 0:
            # Check if already normalized (values are between 0 and 1)
            current_values = list(self.stats_props.block_reasons_dict.values())
            is_already_normalized = all(
                isinstance(v, float) and 0 <= v <= 1 for v in current_values if v is not None and v > 0
            )
            if not is_already_normalized:
                for (
                    block_type,
                    num_times,
                ) in self.stats_props.block_reasons_dict.items():
                    if num_times is not None:
                        self.stats_props.block_reasons_dict[block_type] = num_times / float(self.blocked_requests)

        self._get_iter_means()
    def calculate_confidence_interval(self) -> bool:
        """
        Get the confidence interval for every iteration so far.

        :return: Whether the simulations should end for this erlang.
        :rtype: bool
        """
        self.block_mean = mean(self.stats_props.simulation_blocking_list)
        self.bit_rate_block_mean = mean(self.stats_props.simulation_bitrate_blocking_list)

        if len(self.stats_props.simulation_blocking_list) <= 1:
            # With only 1 data point, variance is 0.0 (no variation)
            self.block_variance = 0.0
            self.bit_rate_block_variance = 0.0
            self.block_ci = 0.0
            self.block_ci_percent = 0.0
            self.bit_rate_block_ci = 0.0
            self.bit_rate_block_ci_percent = 0.0
            return False

        self.block_variance = variance(self.stats_props.simulation_blocking_list)
        self.bit_rate_block_variance = variance(self.stats_props.simulation_bitrate_blocking_list)

        # Calculate the bit-rate blocking CI separately (always calculate, even when blocking is 0).
        # When variance is 0, this evaluates to 0.0.
        try:
            bit_rate_block_ci = 1.645 * (
                math.sqrt(self.bit_rate_block_variance)
                / math.sqrt(len(self.stats_props.simulation_bitrate_blocking_list))
            )
            self.bit_rate_block_ci = bit_rate_block_ci
            if self.bit_rate_block_mean > 0:
                bit_rate_block_ci_percent = ((2 * bit_rate_block_ci) / self.bit_rate_block_mean) * 100
                self.bit_rate_block_ci_percent = bit_rate_block_ci_percent
        except (ZeroDivisionError, ValueError):
            # If the calculation fails, leave as None
            pass

        if self.block_mean == 0.0:
            # No request blocking means no CI to calculate for request blocking.
            # Request blocking CI values remain None (initialized state).
            return False

        try:
            # Using 1.645 for a 90% confidence level
            block_ci_rate = 1.645 * (
                math.sqrt(self.block_variance) / math.sqrt(len(self.stats_props.simulation_blocking_list))
            )
            self.block_ci = block_ci_rate
            block_ci_percent = ((2 * block_ci_rate) / self.block_mean) * 100
            self.block_ci_percent = block_ci_percent
        except ZeroDivisionError:
            return False

        # TODO: The CI percent threshold (currently hardcoded to 5%) should be configurable
        # via engine_props. See core/TODO.md "Configuration System Integration".
        if block_ci_percent <= 5:
            iter_val = self.iteration
            iteration_display = (iter_val + 1) if iter_val is not None else 1
            logger.info(
                "Confidence interval of %.2f%% reached. %d, ending for Erlang: %s",
                block_ci_percent,
                iteration_display,
                self.engine_props["erlang"],
            )
            return True

        return False
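    # Worked example of the 90% CI math above (hypothetical numbers): with
    # simulation_blocking_list = [0.10, 0.12, 0.11], the mean is 0.11 and the sample
    # variance is 0.0001, so
    #
    #   block_ci         = 1.645 * sqrt(0.0001) / sqrt(3)  ~= 0.0095
    #   block_ci_percent = (2 * 0.0095) / 0.11 * 100       ~= 17.3%
    #
    # which exceeds the hardcoded 5% threshold, so the method returns False and the
    # simulation continues for this erlang.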
    def get_blocking_statistics(self) -> dict[str, Any]:
        """
        Get all blocking-related statistics for persistence.

        :return: Dictionary containing blocking statistics
        :rtype: dict
        """
        return {
            "block_mean": self.block_mean,
            "block_variance": self.block_variance,
            "block_ci": self.block_ci,
            "block_ci_percent": self.block_ci_percent,
            "bit_rate_block_mean": self.bit_rate_block_mean,
            "bit_rate_block_variance": self.bit_rate_block_variance,
            "bit_rate_block_ci": self.bit_rate_block_ci,
            "bit_rate_block_ci_percent": self.bit_rate_block_ci_percent,
            "iteration": self.iteration,
        }
    # Backward compatibility methods
    def end_iter_update(self) -> None:
        """Backward compatibility wrapper for finalize_iteration_statistics."""
        return self.finalize_iteration_statistics()
    def save_stats(self, base_fp: str = "data") -> None:
        """
        Backward compatibility method for saving statistics.

        :param base_fp: Base file path for saving
        """
        # Import here to avoid circular imports
        from fusion.core.persistence import (
            StatsPersistence,  # pylint: disable=import-outside-toplevel
        )

        # Ensure iteration is set to 0 if not initialized (for backward compatibility)
        if self.iteration is None:
            self.iteration = 0

        persistence = StatsPersistence(engine_props=self.engine_props, sim_info=self.sim_info)

        # Prepare the save dict with an iter_stats structure
        save_dict: dict[str, Any] = {"iter_stats": {}}

        # Get blocking statistics
        blocking_stats = self.get_blocking_statistics()

        # Save using the persistence module
        persistence.save_stats(
            stats_dict=save_dict,
            stats_props=self.stats_props,
            blocking_stats=blocking_stats,
            base_file_path=base_fp,
        )
    def record_recovery_event(
        self,
        failure_time: float,
        recovery_time: float,
        affected_requests: int,
        recovery_type: str,  # "protection" or "restoration"
    ) -> None:
        """
        Record a recovery event.

        :param failure_time: Time of failure occurrence
        :type failure_time: float
        :param recovery_time: Time when recovery completed
        :type recovery_time: float
        :param affected_requests: Number of affected requests
        :type affected_requests: int
        :param recovery_type: Type of recovery mechanism
        :type recovery_type: str

        Example:
            >>> stats.record_recovery_event(
            ...     failure_time=100.0,
            ...     recovery_time=100.05,  # 50ms later
            ...     affected_requests=5,
            ...     recovery_type='protection'
            ... )
        """
        # Compute recovery duration and round to avoid floating-point precision errors
        recovery_duration_ms = round((recovery_time - failure_time) * 1000, 10)

        # Store recovery time
        self.recovery_times_ms.append(recovery_duration_ms)

        # Store full event details
        event = {
            "failure_time": failure_time,
            "recovery_time": recovery_time,
            "recovery_duration_ms": recovery_duration_ms,
            "affected_requests": affected_requests,
            "recovery_type": recovery_type,
        }
        self.recovery_events.append(event)

        logger.info(
            "Recovery event: type=%s, duration=%.2fms, affected=%d",
            recovery_type,
            recovery_duration_ms,
            affected_requests,
        )
    def get_recovery_stats(self) -> dict[str, float]:
        """
        Get recovery time statistics.

        Computes mean, P95, and max recovery times across all recovery events.

        :return: Dict with recovery statistics
        :rtype: dict[str, float]

        Example:
            >>> stats.get_recovery_stats()
            {'mean_ms': 52.3, 'p95_ms': 98.5, 'max_ms': 105.2, 'count': 12}
        """
        if not self.recovery_times_ms:
            return {"mean_ms": 0.0, "p95_ms": 0.0, "max_ms": 0.0, "count": 0}

        return {
            "mean_ms": float(np.mean(self.recovery_times_ms)),
            "p95_ms": float(np.percentile(self.recovery_times_ms, 95)),
            "max_ms": float(np.max(self.recovery_times_ms)),
            "count": len(self.recovery_times_ms),
        }
    def compute_failure_window_bp(
        self,
        failure_time: float,
        arrival_times: list[float],
        blocked_requests: list[int],
    ) -> float:
        """
        Compute the blocking probability within the failure window.

        Measures BP in the window [failure_time, failure_time + window_size],
        where window_size is specified in number of arrivals.

        :param failure_time: Failure occurrence time
        :type failure_time: float
        :param arrival_times: List of all request arrival times
        :type arrival_times: list[float]
        :param blocked_requests: List of blocked request indices
        :type blocked_requests: list[int]
        :return: BP within the failure window
        :rtype: float

        Example:
            >>> bp = stats.compute_failure_window_bp(
            ...     failure_time=100.0,
            ...     arrival_times=all_arrivals,
            ...     blocked_requests=blocked_ids
            ... )
            >>> print(f"Failure window BP: {bp:.4f}")
            Failure window BP: 0.0823
        """
        # Find the arrival index at the failure time
        failure_index = np.searchsorted(arrival_times, failure_time)

        # Define the window [failure_index, failure_index + window_size]
        window_end = min(failure_index + self.failure_window_size, len(arrival_times))

        # Count arrivals and blocks in the window
        window_arrivals = window_end - failure_index
        window_blocks = sum(1 for req_id in blocked_requests if failure_index <= req_id < window_end)

        if window_arrivals == 0:
            return 0.0

        bp_value: float = float(window_blocks) / float(window_arrivals)
        self.failure_window_bp.append(bp_value)

        logger.info(
            "Failure window BP: %.4f (%d/%d blocked)",
            bp_value,
            window_blocks,
            window_arrivals,
        )
        return bp_value
    def get_failure_window_stats(self) -> dict[str, float]:
        """
        Get failure window BP statistics.

        :return: Dict with mean and P95 failure window BP
        :rtype: dict[str, float]
        """
        if not self.failure_window_bp:
            return {"mean": 0.0, "p95": 0.0, "count": 0}

        return {
            "mean": float(np.mean(self.failure_window_bp)),
            "p95": float(np.percentile(self.failure_window_bp, 95)),
            "count": len(self.failure_window_bp),
        }
    def get_recovery_csv_row(self) -> dict[str, Any]:
        """
        Export recovery statistics as a CSV row.

        :return: Dict with all recovery metric values
        :rtype: dict[str, Any]
        """
        recovery_stats = self.get_recovery_stats()
        window_stats = self.get_failure_window_stats()

        return {
            # Recovery metrics
            "recovery_time_mean_ms": recovery_stats["mean_ms"],
            "recovery_time_p95_ms": recovery_stats["p95_ms"],
            "recovery_time_max_ms": recovery_stats["max_ms"],
            "recovery_event_count": recovery_stats["count"],
            # Failure window metrics
            "bp_window_fail_mean": window_stats["mean"],
            "bp_window_fail_p95": window_stats["p95"],
            "failure_window_count": window_stats["count"],
        }
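    # Illustrative sketch (standard-library csv module; the file name is hypothetical):
    # the returned dict is flat, so it can feed csv.DictWriter directly:
    #
    #   >>> import csv
    #   >>> row = stats.get_recovery_csv_row()
    #   >>> with open("recovery_metrics.csv", "w", newline="") as handle:
    #   ...     writer = csv.DictWriter(handle, fieldnames=row.keys())
    #   ...     writer.writeheader()
    #   ...     writer.writerow(row)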
    # Fragmentation and Decision Time Methods
    def compute_fragmentation_proxy(
        self,
        path: list[int],
        network_spectrum_dict: dict[tuple[Any, Any], dict[str, Any]],
    ) -> float:
        """
        Compute the fragmentation proxy for a path.

        Fragmentation = 1 - (largest_contiguous_block / total_free_slots)

        Higher values indicate more fragmentation.

        :param path: Path node list
        :type path: list[int]
        :param network_spectrum_dict: Spectrum state
        :type network_spectrum_dict: dict[tuple[Any, Any], dict[str, Any]]
        :return: Fragmentation score in [0, 1]
        :rtype: float

        Example:
            >>> frag = stats.compute_fragmentation_proxy(path, spectrum_dict)
            >>> print(f"Fragmentation: {frag:.3f}")
            Fragmentation: 0.347
        """
        total_free = 0
        largest_contig = 0

        for i in range(len(path) - 1):
            link = (path[i], path[i + 1])
            reverse_link = (path[i + 1], path[i])
            link_spectrum = network_spectrum_dict.get(link, network_spectrum_dict.get(reverse_link, {}))
            if not link_spectrum:
                continue

            # Get the cores matrix
            cores_matrix = link_spectrum.get("cores_matrix", [])
            if not cores_matrix:
                continue

            # Process the first core (for simplicity)
            slots = cores_matrix[0] if len(cores_matrix) > 0 else []

            # Find free blocks
            free_blocks = self._find_free_blocks(np.asarray(slots))
            if free_blocks:
                link_total = sum(block[1] - block[0] for block in free_blocks)
                link_largest = max(block[1] - block[0] for block in free_blocks)
                total_free += link_total
                largest_contig = max(largest_contig, link_largest)

        if total_free == 0:
            return 1.0  # Fully fragmented

        frag = 1.0 - (largest_contig / total_free)
        return frag
    def _find_free_blocks(self, slots: np.ndarray) -> list[tuple[int, int]]:
        """
        Find contiguous free blocks in the spectrum.

        :param slots: Spectrum slot array
        :type slots: np.ndarray
        :return: List of (start, end) tuples for free blocks
        :rtype: list[tuple[int, int]]
        """
        blocks = []
        start = None

        for i, slot in enumerate(slots):
            if slot == 0:  # Free
                if start is None:
                    start = i
            else:  # Occupied
                if start is not None:
                    blocks.append((start, i))
                    start = None

        if start is not None:
            blocks.append((start, len(slots)))

        return blocks
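    # Illustrative sketch (hypothetical instance `stats`): free blocks are half-open
    # (start, end) index ranges over slots equal to 0, so for a 7-slot core with a
    # request in slot 2 and a guard band in slot 4:
    #
    #   >>> import numpy as np
    #   >>> stats._find_free_blocks(np.array([0, 0, 1, 0, -1, 0, 0]))
    #   [(0, 2), (3, 4), (5, 7)]
    #
    # Fed into compute_fragmentation_proxy, this core contributes
    # 1 - largest/total = 1 - 2/5 = 0.6.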
    def record_fragmentation(
        self,
        path: list[int],
        network_spectrum_dict: dict[tuple[Any, Any], dict[str, Any]],
    ) -> None:
        """
        Record the fragmentation score for a path.

        :param path: Path node list
        :type path: list[int]
        :param network_spectrum_dict: Spectrum state
        :type network_spectrum_dict: dict[tuple[Any, Any], dict[str, Any]]
        """
        frag_score = self.compute_fragmentation_proxy(path, network_spectrum_dict)
        self.fragmentation_scores.append(frag_score)
    def record_decision_time(self, decision_time_ms: float) -> None:
        """
        Record policy decision time.

        :param decision_time_ms: Decision time in milliseconds
        :type decision_time_ms: float
        """
        self.decision_times_ms.append(decision_time_ms)
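    # Illustrative sketch (time.perf_counter is standard library; `policy.decide` is a
    # hypothetical caller-side function): callers time the policy decision themselves
    # and pass the elapsed milliseconds in:
    #
    #   >>> import time
    #   >>> start = time.perf_counter()
    #   >>> action = policy.decide(request)  # hypothetical policy call
    #   >>> stats.record_decision_time((time.perf_counter() - start) * 1000)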
    def get_fragmentation_stats(self) -> dict[str, float]:
        """
        Get fragmentation statistics.

        :return: Dict with mean and P95 fragmentation scores
        :rtype: dict[str, float]
        """
        if not self.fragmentation_scores:
            return {"mean": 0.0, "p95": 0.0, "count": 0}

        return {
            "mean": float(np.mean(self.fragmentation_scores)),
            "p95": float(np.percentile(self.fragmentation_scores, 95)),
            "count": len(self.fragmentation_scores),
        }
    def get_decision_time_stats(self) -> dict[str, float]:
        """
        Get decision time statistics.

        :return: Dict with mean and P95 decision times
        :rtype: dict[str, float]
        """
        if not self.decision_times_ms:
            return {"mean": 0.0, "p95": 0.0, "count": 0}

        return {
            "mean": float(np.mean(self.decision_times_ms)),
            "p95": float(np.percentile(self.decision_times_ms, 95)),
            "count": len(self.decision_times_ms),
        }
    def to_csv_row(self) -> dict[str, Any]:
        """
        Export all statistics as a CSV row.

        Includes standard metrics plus survivability-specific metrics.

        :return: Dict with all metric values
        :rtype: dict[str, Any]
        """
        # Get survivability stats
        recovery_stats = self.get_recovery_stats()
        window_stats = self.get_failure_window_stats()
        frag_stats = self.get_fragmentation_stats()
        decision_stats = self.get_decision_time_stats()

        # Get failure settings
        failure_settings = self.engine_props.get("failure_settings", {})
        routing_settings = self.engine_props.get("routing_settings", {})
        rl_settings = self.engine_props.get("offline_rl_settings", {})

        return {
            # Experiment parameters
            "topology": self.engine_props.get("network", "unknown"),
            "load": self.engine_props.get("erlang", 0),
            "failure_type": failure_settings.get("failure_type", "none"),
            "k_paths": routing_settings.get("k_paths", 1),
            "policy": rl_settings.get("policy_type", "ksp_ff"),
            "seed": self.engine_props.get("seed", 0),
            # Standard metrics
            "bp_overall": self.block_mean if self.block_mean is not None else 0.0,
            "bp_variance": self.block_variance if self.block_variance is not None else 0.0,
            "bp_ci_percent": (self.block_ci_percent if self.block_ci_percent is not None else 0.0),
            "bit_rate_bp": (self.bit_rate_block_mean if self.bit_rate_block_mean is not None else 0.0),
            # Failure window metrics
            "bp_window_fail_mean": window_stats["mean"],
            "bp_window_fail_p95": window_stats["p95"],
            # Recovery metrics
            "recovery_time_mean_ms": recovery_stats["mean_ms"],
            "recovery_time_p95_ms": recovery_stats["p95_ms"],
            "recovery_time_max_ms": recovery_stats["max_ms"],
            # Fragmentation
            "frag_proxy_mean": frag_stats["mean"],
            "frag_proxy_p95": frag_stats["p95"],
            # Decision times
            "decision_time_mean_ms": decision_stats["mean"],
            "decision_time_p95_ms": decision_stats["p95"],
        }
    def record_arrival(
        self,
        request: Any,  # fusion.domain.request.Request
        result: Any,  # fusion.domain.results.AllocationResult
        network_state: Any,  # fusion.domain.network_state.NetworkState
        was_rollback: bool = False,
    ) -> None:
        """
        Record statistics from an orchestrator allocation result.

        This method provides the same functionality as iter_update but works
        with domain objects instead of the legacy sdn_props.

        :param request: The Request that was processed
        :type request: Any
        :param result: The AllocationResult from the orchestrator
        :type result: Any
        :param network_state: Current NetworkState for spectrum data
        :type network_state: Any
        :param was_rollback: True if this was a rollback (skip utilization tracking)
        :type was_rollback: bool
        """
        # Increment total requests for tracking
        self.total_requests += 1

        if not result.success:
            self._record_blocked_request_new(request, result)
        else:
            self._record_successful_allocation_new(request, result, network_state, was_rollback)

        # Take a periodic snapshot
        self._maybe_take_snapshot_new(network_state)
    def _record_blocked_request_new(
        self,
        request: Any,  # fusion.domain.request.Request
        result: Any,  # fusion.domain.results.AllocationResult
    ) -> None:
        """
        Record stats for a blocked request (orchestrator path).

        Maps SNR_RECHECK_FAIL to 'xt_threshold' for backwards compatibility.

        :param request: The Request that was blocked
        :type request: Any
        :param result: The AllocationResult with block_reason
        :type result: Any
        """
        self.blocked_requests += 1

        # Track bandwidth blocking
        if hasattr(request, "bandwidth_gbps"):
            self.bit_rate_blocked += int(request.bandwidth_gbps)
            self.bit_rate_request += int(request.bandwidth_gbps)

            # Track by bandwidth class
            bw_class = str(request.bandwidth_gbps)
            if bw_class in self.stats_props.bandwidth_blocking_dict:
                self.stats_props.bandwidth_blocking_dict[bw_class] += 1

        # Update the block reason
        if result.block_reason:
            reason_key = self._map_block_reason_new(result.block_reason)
            current = self.stats_props.block_reasons_dict.get(reason_key)
            if current is None:
                self.stats_props.block_reasons_dict[reason_key] = 1
            else:
                self.stats_props.block_reasons_dict[reason_key] = current + 1

    # TODO: The was_rollback parameter is accepted but not yet used. It should skip
    # utilization tracking for rolled-back lightpaths per P3.6 Gap 5 coverage.
    def _record_successful_allocation_new(
        self,
        request: Any,  # fusion.domain.request.Request
        result: Any,  # fusion.domain.results.AllocationResult
        network_state: Any,  # fusion.domain.network_state.NetworkState
        was_rollback: bool = False,
    ) -> None:
        """
        Record stats for a successful allocation (orchestrator path).

        :param request: The Request that was allocated
        :type request: Any
        :param result: The AllocationResult with allocation details
        :type result: Any
        :param network_state: Current NetworkState
        :type network_state: Any
        :param was_rollback: True if rollback occurred (skip utilization tracking)
        :type was_rollback: bool
        """
        # Track allocation type counters
        if result.is_groomed:
            self.groomed_requests += 1
        if getattr(result, "is_sliced", False):
            self.sliced_requests += 1
        if getattr(result, "is_protected", False):
            self.protected_requests += 1

        # Skip further stats tracking for fully groomed requests (v5 behavior)
        if result.is_groomed and not result.is_partially_groomed:
            if hasattr(request, "bandwidth_gbps"):
                self.bit_rate_request += int(request.bandwidth_gbps)
            return

        # Track bit rate for requests
        if hasattr(request, "bandwidth_gbps"):
            self.bit_rate_request += int(request.bandwidth_gbps)

        # Match legacy behavior: early return if there is remaining bandwidth but no
        # new LPs were created (partial grooming succeeded but new allocation failed)
        remaining_bw = request.bandwidth_gbps - (result.total_bandwidth_allocated_gbps or 0)
        if remaining_bw > 0:
            self.bit_rate_blocked += remaining_bw
            if not result.lightpaths_created:
                return

        # Resource metrics - transponders (count of lightpaths created)
        transponders = len(result.lightpaths_created) if result.lightpaths_created else 0
        self.total_transponders += transponders

        # Track path metrics once (using the first lightpath for path/hops)
        first_lp_details = self._get_lightpath_details_new(result, network_state)
        if first_lp_details.get("path"):
            path = first_lp_details["path"]
            num_hops = len(path) - 1
            self.stats_props.hops_list.append(float(num_hops))
            path_len = self._calculate_path_length_new(path, network_state)
            if path_len is not None:
                self.stats_props.lengths_list.append(round(float(path_len), 2))

        # Track the path index from the allocation result.
        # Count when ANY lightpath is used (created or groomed) - matches the legacy modulation_list check.
        path_idx = getattr(result, "path_index", 0)
        has_lightpaths = result.lightpaths_created or getattr(result, "lightpaths_groomed", ())
        if has_lightpaths and 0 <= path_idx < len(self.stats_props.path_index_list):
            self.stats_props.path_index_list[path_idx] += 1

        # Track demand realization ratio for partial grooming (matches legacy behavior)
        if self.engine_props.get("can_partially_serve"):
            bandwidth_key = str(request.bandwidth_gbps) if hasattr(request, "bandwidth_gbps") else None
            if bandwidth_key and bandwidth_key in self.stats_props.demand_realization_ratio:
                original_bw = int(request.bandwidth_gbps)
                # Calculate served bandwidth from the allocation result
                if result.total_bandwidth_allocated_gbps:
                    served_bw = int(result.total_bandwidth_allocated_gbps)
                else:
                    served_bw = original_bw
                realization_ratio = served_bw / original_bw if original_bw > 0 else 0
                self.stats_props.demand_realization_ratio[bandwidth_key].append(realization_ratio)
                self.stats_props.demand_realization_ratio["overall"].append(realization_ratio)

        # Track per-lightpath stats (core, modulation, weights) for ALL lightpaths.
        # This is critical for sliced requests where multiple lightpaths are created.
        if result.lightpaths_created and network_state is not None:
            # Collect SNR values for all lightpaths in this request.
            # Legacy behavior: append the mean of all lightpaths' SNR for a single request.
            request_snr_values: list[float] = []

            # Legacy compatibility: Build snr_list with failed attempt values first.
            # Legacy's _update_request_statistics adds SNR to snr_list before the SNR
            # recheck, and if the recheck fails, the value stays. This causes snr_list[i]
            # to return "wrong" values from failed attempts.
            failed_snr = list(getattr(result, "failed_attempt_snr_values", ()) or ())

            for lp_idx, lp_id in enumerate(result.lightpaths_created):
                lp = network_state.get_lightpath(lp_id)
                if lp is None:
                    continue

                # Track core usage (like legacy _handle_iter_lists)
                core = getattr(lp, "core", None)
                if core is not None:
                    if core not in self.stats_props.cores_dict:
                        self.stats_props.cores_dict[core] = 0
                    self.stats_props.cores_dict[core] += 1

                # Get lightpath details for modulation tracking
                modulation = getattr(lp, "modulation", None)
                band = getattr(lp, "band", None)
                snr_db = getattr(lp, "snr_db", None)
                lp_path = getattr(lp, "path", None)
                # Use the LIGHTPATH bandwidth, not the request bandwidth (critical for slicing)
                lp_bandwidth = getattr(lp, "total_bandwidth_gbps", None)

                # Legacy compatibility: Use the failed attempt SNR if available.
                # Legacy's snr_list has failed attempt values at lower indices.
                # When iterating modulation_list (which has only successful values),
                # Legacy reads snr_list[i], which may be from a failed attempt.
                snr_for_tracking = snr_db
                if failed_snr and lp_idx < len(failed_snr):
                    # Use the failed attempt SNR (matches Legacy's buggy index behavior)
                    snr_for_tracking = failed_snr[lp_idx]

                # Get the path weight from the lightpath (routing weight, not raw length).
                # This preserves the routing algorithm's weight (e.g., XT-aware normalized cost).
                path_weight = getattr(lp, "path_weight_km", None)
                lp_num_hops = len(lp_path) - 1 if lp_path else None

                # Track modulation with the correct lightpath bandwidth
                if modulation:
                    self._increment_modulation_count(
                        modulation,
                        bandwidth_gbps=lp_bandwidth,
                        band=band,
                        path_weight=path_weight,
                        num_hops=lp_num_hops,
                        snr_value=snr_for_tracking,
                    )

                # Collect SNR for this lightpath (use snr_for_tracking for Legacy compatibility)
                if snr_for_tracking is not None:
                    request_snr_values.append(snr_for_tracking)

            # Append the mean of all lightpaths' SNR for this request (legacy behavior)
            if request_snr_values:
                self.stats_props.snr_list.append(mean(request_snr_values))
        else:
            # Fallback for non-lightpath allocations (shouldn't happen often)
            if first_lp_details.get("modulation"):
                bandwidth = request.bandwidth_gbps if hasattr(request, "bandwidth_gbps") else None
                # Use the stored routing weight, not a recalculated path length
                path_weight = first_lp_details.get("path_weight_km")
                fallback_num_hops = len(first_lp_details["path"]) - 1 if first_lp_details.get("path") else None
                self._increment_modulation_count(
                    first_lp_details["modulation"],
                    bandwidth_gbps=bandwidth,
                    band=first_lp_details.get("band"),
                    path_weight=path_weight,
                    num_hops=fallback_num_hops,
                    snr_value=first_lp_details.get("snr_db"),
                )
                if first_lp_details.get("snr_db") is not None:
                    self.stats_props.snr_list.append(first_lp_details["snr_db"])

    def _get_lightpath_details_new(
        self,
        result: Any,  # fusion.domain.results.AllocationResult
        network_state: Any,  # fusion.domain.network_state.NetworkState
    ) -> dict[str, Any]:
        """
        Extract lightpath details from the result and network state.

        :param result: AllocationResult from the orchestrator
        :type result: Any
        :param network_state: NetworkState for lightpath lookup
        :type network_state: Any
        :return: Dict with path, modulation, snr_db, etc.
        :rtype: dict[str, Any]
        """
        details: dict[str, Any] = {
            "path": None,
            "modulation": None,
            "snr_db": None,
            "crosstalk_db": None,
            "core": None,
            "band": None,
            "start_slot": None,
            "end_slot": None,
            "path_weight_km": None,
        }

        # Try to get details from lightpaths_created
        if result.lightpaths_created and network_state is not None:
            lp_id = result.lightpaths_created[0]
            lp = network_state.get_lightpath(lp_id)
            if lp is not None:
                details["path"] = getattr(lp, "path", None)
                details["modulation"] = getattr(lp, "modulation", None)
                details["snr_db"] = getattr(lp, "snr_db", None)
                details["crosstalk_db"] = getattr(lp, "crosstalk_db", None)
                details["core"] = getattr(lp, "core", None)
                details["band"] = getattr(lp, "band", None)
                details["start_slot"] = getattr(lp, "start_slot", None)
                details["end_slot"] = getattr(lp, "end_slot", None)
                details["path_weight_km"] = getattr(lp, "path_weight_km", None)

        # Fall back to result fields if the lightpath lookup failed
        if details["modulation"] is None and result.modulations:
            details["modulation"] = result.modulations[0]
        if details["snr_db"] is None and result.snr_values:
            details["snr_db"] = result.snr_values[0]

        return details

    def _map_block_reason_new(self, reason: Any) -> str:
        """
        Map a BlockReason enum to a stats dict key.

        SNR_RECHECK_FAIL maps to 'xt_threshold' for backwards compatibility.

        :param reason: BlockReason enum value
        :type reason: Any
        :return: Legacy stats dict key (distance, congestion, xt_threshold, failure)
        :rtype: str
        """
        from fusion.domain.request import BlockReason

        mapping = {
            BlockReason.NO_PATH: "distance",
            BlockReason.DISTANCE: "distance",
            BlockReason.CONGESTION: "congestion",
            BlockReason.SNR_THRESHOLD: "xt_threshold",
            BlockReason.SNR_RECHECK_FAIL: "xt_threshold",  # P3.6 Gap 5
            BlockReason.XT_THRESHOLD: "xt_threshold",
            BlockReason.FAILURE: "failure",
            BlockReason.LINK_FAILURE: "failure",
            BlockReason.NODE_FAILURE: "failure",
            BlockReason.GROOMING_FAIL: "congestion",
            BlockReason.SLICING_FAIL: "congestion",
            BlockReason.PROTECTION_FAIL: "congestion",
        }
        return mapping.get(reason, "congestion")

    def _calculate_path_length_new(
        self,
        path: tuple[str, ...] | list[str],
        network_state: Any,  # fusion.domain.network_state.NetworkState
    ) -> float | None:
        """
        Calculate the total path length in km.

        :param path: Tuple or list of node IDs
        :type path: tuple[str, ...] | list[str]
        :param network_state: NetworkState with topology
        :type network_state: Any
        :return: Total path length in km, or None if it cannot be calculated
        :rtype: float | None
        """
        if len(path) < 2:
            return 0.0
        if network_state is None:
            return None

        topology = getattr(network_state, "topology", None)
        if topology is None:
            return None

        total_length = 0.0
        for i in range(len(path) - 1):
            u, v = path[i], path[i + 1]
            if topology.has_edge(u, v):
                edge_data = topology.edges[u, v]
                total_length += edge_data.get("length", edge_data.get("weight", 0.0))
            elif topology.has_edge(v, u):
                edge_data = topology.edges[v, u]
                total_length += edge_data.get("length", edge_data.get("weight", 0.0))
        return total_length
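    # Illustrative sketch (hypothetical networkx graph): the edge length is read from
    # the "length" attribute, falling back to "weight", and either edge direction
    # matches:
    #
    #   >>> import networkx as nx
    #   >>> g = nx.Graph()
    #   >>> g.add_edge("a", "b", length=80.0)
    #   >>> g.add_edge("b", "c", weight=120.0)  # no "length"; falls back to "weight"
    #   >>> # with network_state.topology == g:
    #   >>> # stats._calculate_path_length_new(("a", "b", "c"), network_state) -> 200.0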
    def record_grooming_rollback(
        self,
        request: Any,  # fusion.domain.request.Request
        rolled_back_lightpath_ids: list[int],
        network_state: Any,  # fusion.domain.network_state.NetworkState
    ) -> None:
        """
        Record that grooming was rolled back - adjust utilization stats.

        Called when partial grooming succeeds but new lightpath allocation fails,
        requiring rollback of the groomed bandwidth.

        :param request: The Request being processed
        :type request: Any
        :param rolled_back_lightpath_ids: IDs of lightpaths that had grooming rolled back
        :type rolled_back_lightpath_ids: list[int]
        :param network_state: Current NetworkState
        :type network_state: Any
        """
        logger.debug(
            "Grooming rollback recorded for request %s on lightpaths %s",
            getattr(request, "request_id", "unknown"),
            rolled_back_lightpath_ids,
        )
    def _maybe_take_snapshot_new(
        self,
        network_state: Any,  # fusion.domain.network_state.NetworkState
    ) -> None:
        """
        Take a periodic network snapshot (orchestrator path).

        Snapshots are taken at configurable intervals to track simulation
        progress over time.

        :param network_state: Current NetworkState for spectrum data
        :type network_state: Any
        """
        # Check if snapshots are enabled
        if not self.engine_props.get("save_snapshots", False):
            return

        # Check if it's time for a snapshot
        snapshot_interval = getattr(self, "snapshot_interval", 100)
        if self.total_requests % snapshot_interval != 0:
            return

        # Skip if no requests yet
        if self.total_requests == 0:
            return

        snapshot = {
            "request_number": self.total_requests,
            "blocked_count": self.blocked_requests,
            "spectrum_utilization": self._calculate_spectrum_utilization_new(network_state),
        }
        self.stats_props.snapshots_dict[self.total_requests] = snapshot

    def _calculate_spectrum_utilization_new(
        self,
        network_state: Any,  # fusion.domain.network_state.NetworkState
    ) -> float:
        """
        Calculate overall spectrum utilization from the network state.

        :param network_state: Current NetworkState
        :type network_state: Any
        :return: Utilization as a fraction (0.0 to 1.0)
        :rtype: float
        """
        if network_state is None:
            return 0.0

        total_slots = 0
        used_slots = 0

        # Get spectrum utilization from the network state
        network_spectrum_dict = getattr(network_state, "network_spectrum_dict", None)
        if network_spectrum_dict is None:
            return 0.0

        for _link_key, link_data in network_spectrum_dict.items():
            cores_matrix = link_data.get("cores_matrix", {})
            for _band, cores in cores_matrix.items():
                for core_spectrum in cores:
                    if hasattr(core_spectrum, "__len__"):
                        total_slots += len(core_spectrum)
                        used_slots += sum(1 for s in core_spectrum if s != 0)

        if total_slots == 0:
            return 0.0
        return used_slots / total_slots

    def _record_bandwidth_new(
        self,
        bandwidth_gbps: int,
        blocked: bool,
    ) -> None:
        """
        Record bandwidth for blocking statistics (orchestrator path).

        :param bandwidth_gbps: Bandwidth of the request
        :type bandwidth_gbps: int
        :param blocked: True if the request was blocked
        :type blocked: bool
        """
        # Track total requested (using the existing bit_rate_request)
        self.bit_rate_request += bandwidth_gbps

        if blocked:
            # Track blocked bandwidth (using the existing bit_rate_blocked)
            self.bit_rate_blocked += bandwidth_gbps

            # Track by bandwidth class
            bw_class = str(bandwidth_gbps)
            current = self.stats_props.bandwidth_blocking_dict.get(bw_class, 0)
            self.stats_props.bandwidth_blocking_dict[bw_class] = current + 1

    def _increment_modulation_count(
        self,
        modulation: str,
        bandwidth_gbps: int | None = None,
        band: str | None = None,
        path_weight: float | None = None,
        num_hops: int | None = None,
        snr_value: float | None = None,
    ) -> None:
        """
        Increment the modulation usage count and track detailed metrics (orchestrator path).

        Updates modulations_used_dict and weights_dict to track modulation usage,
        including length, hop, SNR, and XT cost statistics per band.

        :param modulation: Modulation format name (e.g., 'QPSK', '16-QAM')
        :type modulation: str
        :param bandwidth_gbps: Optional bandwidth for bandwidth-keyed tracking
        :type bandwidth_gbps: int | None
        :param band: Optional band for band-specific tracking
        :type band: str | None
        :param path_weight: Path length/weight in km for length tracking
        :type path_weight: float | None
        :param num_hops: Number of hops for hop tracking
        :type num_hops: int | None
        :param snr_value: SNR or XT cost value for quality tracking
        :type snr_value: float | None
        """
        if modulation is None:
            return

        mod_dict = self.stats_props.modulations_used_dict

        # Track by bandwidth key if provided
        if bandwidth_gbps is not None:
            bandwidth_key = str(int(bandwidth_gbps))
            bw_dict = mod_dict.get(bandwidth_key)
            if isinstance(bw_dict, dict):
                if modulation in bw_dict:
                    bw_dict[modulation] += 1
                else:
                    bw_dict[modulation] = 1

            # Also track in weights_dict (path weight per bandwidth/modulation)
            if path_weight is not None:
                weights_dict = self.stats_props.weights_dict
                if bandwidth_key in weights_dict:
                    if modulation in weights_dict[bandwidth_key]:
                        weights_dict[bandwidth_key][modulation].append(round(float(path_weight), 2))
                    else:
                        weights_dict[bandwidth_key][modulation] = [round(float(path_weight), 2)]

        # Track by modulation name with band
        data_mod_dict = mod_dict.get(modulation)
        if isinstance(data_mod_dict, dict):
            # Increment the band count
            if band and band in data_mod_dict:
                data_mod_dict[band] += 1
            elif band:
                data_mod_dict[band] = 1

            # Track length
            if path_weight is not None:
                length_dict = data_mod_dict.get("length")
                if length_dict and isinstance(length_dict, dict):
                    if band and band in length_dict:
                        length_dict[band].append(path_weight)
                    if "overall" in length_dict:
                        length_dict["overall"].append(path_weight)

            # Track hop count
            if num_hops is not None:
                hop_dict = data_mod_dict.get("hop")
                if hop_dict and isinstance(hop_dict, dict):
                    if band and band in hop_dict:
                        hop_dict[band].append(num_hops)
                    if "overall" in hop_dict:
                        hop_dict["overall"].append(num_hops)

            # Track SNR or XT cost
            if snr_value is not None:
                snr_type = self.engine_props.get("snr_type")
                if snr_type == "xt_calculation":
                    xt_dict = data_mod_dict.get("xt_cost")
                    if xt_dict and isinstance(xt_dict, dict):
                        if band and band in xt_dict:
                            xt_dict[band].append(snr_value)
                        if "overall" in xt_dict:
                            xt_dict["overall"].append(snr_value)
                else:
                    snr_dict = data_mod_dict.get("snr")
                    if snr_dict and isinstance(snr_dict, dict):
                        if band and band in snr_dict:
                            snr_dict[band].append(snr_value)
                        if "overall" in snr_dict:
                            snr_dict["overall"].append(snr_value)