Source code for fusion.analysis.network_analysis

"""
Network analysis utilities for FUSION.

Provides functions for analyzing network topology, link usage,
and other network-related metrics.

TODO (legacy migration): This module currently expects the legacy dict format
for network_spectrum (dict mapping (src, dst) tuples to dicts with 'cores_matrix',
'usage_count', 'throughput', etc.). This format is provided by
NetworkState.network_spectrum_dict for backwards compatibility with v5.5.0.

This module should be updated to work directly with NetworkState and LinkSpectrum
objects (v6.1.0) instead of raw dicts. See fusion/domain/network_state.py for
the new data structures.
"""

from typing import Any

import numpy as np

from fusion.utils.logging_config import get_logger

logger = get_logger(__name__)


[docs] class NetworkAnalyzer: """ Analyzes network topology and usage patterns. This class provides utilities for examining network characteristics, link utilization, and traffic patterns. """
[docs] def __init__(self) -> None: """Initialize the network analyzer."""
[docs] @staticmethod def analyze_network_congestion(network_spectrum: dict, specific_paths: list | None = None) -> dict[str, Any]: """ Analyze network congestion levels. :param network_spectrum: Network spectrum database :param specific_paths: Optional specific paths to analyze :return: Congestion analysis results """ total_occupied_slots = 0 total_guard_slots = 0 active_requests = set() links_analyzed = 0 # Skip by two because the link is bidirectional for link in list(network_spectrum.keys())[::2]: if specific_paths is not None and link not in specific_paths: continue link_data = network_spectrum[link] links_analyzed += 1 for core in link_data["cores_matrix"]: requests = set(core[core > 0]) active_requests.update(requests) total_occupied_slots += len(np.where(core != 0)[0]) total_guard_slots += len(np.where(core < 0)[0]) return { "total_occupied_slots": total_occupied_slots, "total_guard_slots": total_guard_slots, "active_requests": len(active_requests), "links_analyzed": links_analyzed, "avg_occupied_per_link": (total_occupied_slots / links_analyzed if links_analyzed > 0 else 0), "avg_guard_per_link": (total_guard_slots / links_analyzed if links_analyzed > 0 else 0), }
[docs] @staticmethod def get_network_utilization_stats(network_spectrum: dict) -> dict[str, float]: """ Calculate network-wide utilization statistics. :param network_spectrum: Network spectrum database :return: Dictionary of utilization statistics """ total_slots = 0 occupied_slots = 0 link_utilization_list = [] # Process each bidirectional link once processed_links = set() for (src, dst), link_data in network_spectrum.items(): link_key = f"{min(src, dst)}-{max(src, dst)}" if link_key in processed_links: continue processed_links.add(link_key) cores_matrix = link_data.get("cores_matrix", []) for core in cores_matrix: core_total = len(core) core_occupied = len(np.where(core != 0)[0]) total_slots += core_total occupied_slots += core_occupied if core_total > 0: link_utilization_list.append(core_occupied / core_total) overall_utilization = occupied_slots / total_slots if total_slots > 0 else 0.0 return { "overall_utilization": overall_utilization, "average_link_utilization": (float(np.mean(link_utilization_list)) if link_utilization_list else 0.0), "max_link_utilization": (float(np.max(link_utilization_list)) if link_utilization_list else 0.0), "min_link_utilization": (float(np.min(link_utilization_list)) if link_utilization_list else 0.0), "total_slots": total_slots, "occupied_slots": occupied_slots, "links_processed": len(processed_links), }