Source code for fusion.interfaces.factory

"""
Factory classes for creating algorithm instances using interfaces.
"""

from typing import Any

from fusion.core.properties import SDNProps
from fusion.interfaces.router import AbstractRoutingAlgorithm
from fusion.interfaces.snr import AbstractSNRMeasurer
from fusion.interfaces.spectrum import AbstractSpectrumAssigner

# Import registries (avoid circular imports by importing inside functions)
from fusion.modules.snr.registry import create_snr_algorithm
from fusion.modules.spectrum.registry import create_spectrum_algorithm


[docs] class AlgorithmFactory: """Factory for creating algorithm instances using the interface system."""
[docs] @staticmethod def create_routing_algorithm(name: str, engine_props: dict, sdn_props: SDNProps) -> AbstractRoutingAlgorithm: """ Create a routing algorithm instance. :param name: Name of the routing algorithm :type name: str :param engine_props: Engine configuration properties :type engine_props: dict :param sdn_props: SDN controller properties :type sdn_props: SDNProps :return: Configured routing algorithm instance :rtype: AbstractRoutingAlgorithm :raises ValueError: If algorithm name is not found """ try: from fusion.modules.routing.registry import ( create_algorithm as create_routing_algorithm, ) result = create_routing_algorithm(name, engine_props, sdn_props) return result # type: ignore[no-any-return] except KeyError as e: available = [ "k_shortest_path", "congestion_aware", "fragmentation_aware", "nli_aware", "xt_aware", ] raise ValueError(f"Unknown routing algorithm '{name}'. Available: {available}") from e
[docs] @staticmethod def create_spectrum_algorithm(name: str, engine_props: dict, sdn_props: SDNProps, route_props: object) -> AbstractSpectrumAssigner: """ Create a spectrum assignment algorithm instance. :param name: Name of the spectrum algorithm :type name: str :param engine_props: Engine configuration properties :type engine_props: dict :param sdn_props: SDN controller properties :type sdn_props: SDNProps :param route_props: Routing properties :type route_props: object :return: Configured spectrum assignment algorithm instance :rtype: AbstractSpectrumAssigner :raises ValueError: If algorithm name is not found """ try: return create_spectrum_algorithm(name, engine_props, sdn_props, route_props) except KeyError as e: available = ["first_fit", "best_fit", "last_fit"] raise ValueError(f"Unknown spectrum algorithm '{name}'. Available: {available}") from e
[docs] @staticmethod def create_snr_algorithm( name: str, engine_props: dict, sdn_props: object, spectrum_props: object, route_props: object, ) -> AbstractSNRMeasurer: """ Create an SNR measurement algorithm instance. :param name: Name of the SNR algorithm :type name: str :param engine_props: Engine configuration properties :type engine_props: dict :param sdn_props: SDN controller properties :type sdn_props: object :param spectrum_props: Spectrum assignment properties :type spectrum_props: object :param route_props: Routing properties :type route_props: object :return: Configured SNR measurement algorithm instance :rtype: AbstractSNRMeasurer :raises ValueError: If algorithm name is not found """ try: return create_snr_algorithm(name, engine_props, sdn_props, spectrum_props, route_props) except KeyError as e: available = ["standard_snr"] raise ValueError(f"Unknown SNR algorithm '{name}'. Available: {available}") from e
[docs] class SimulationPipeline: """Complete simulation pipeline using interface-based algorithms."""
[docs] def __init__(self, config: dict[str, Any]): """ Initialize the simulation pipeline. :param config: Configuration dictionary containing algorithm selections and parameters :type config: Dict[str, Any] """ self.config = config self.engine_props = config.get("engine_props", {}) self.sdn_props = config.get("sdn_props") self.route_props = config.get("route_props") self.spectrum_props = config.get("spectrum_props") # Create algorithm instances with error handling try: self.routing_algorithm = self._create_routing_algorithm() self.spectrum_algorithm = self._create_spectrum_algorithm() self.snr_algorithm = self._create_snr_algorithm() except (ValueError, KeyError, RuntimeError) as e: raise RuntimeError(f"Failed to initialize simulation pipeline: {e}") from e
def _create_routing_algorithm(self) -> AbstractRoutingAlgorithm: """Create routing algorithm from configuration.""" routing_name = self.config.get("routing_algorithm", "k_shortest_path") return AlgorithmFactory.create_routing_algorithm( routing_name, self.engine_props, self.sdn_props, # type: ignore[arg-type] ) def _create_spectrum_algorithm(self) -> AbstractSpectrumAssigner: """Create spectrum assignment algorithm from configuration.""" spectrum_name = self.config.get("spectrum_algorithm", "first_fit") if self.sdn_props is None: raise ValueError("sdn_props is required for spectrum algorithm creation") return AlgorithmFactory.create_spectrum_algorithm( spectrum_name, self.engine_props, self.sdn_props, self.route_props, # type: ignore[arg-type] ) def _create_snr_algorithm(self) -> AbstractSNRMeasurer: """Create SNR measurement algorithm from configuration.""" snr_name = self.config.get("snr_algorithm", "standard_snr") return AlgorithmFactory.create_snr_algorithm( snr_name, self.engine_props, self.sdn_props, self.spectrum_props, self.route_props, ) def _create_base_result(self, source: Any, destination: Any) -> dict[str, Any]: """Create base result dictionary.""" return { "source": source, "destination": destination, "success": False, "path": None, "spectrum_assignment": None, "snr": None, "metrics": {}, } def _validate_algorithms(self) -> str: """ Validate that all required algorithms are initialized. :return: Empty string if valid, error message if invalid :rtype: str """ if not hasattr(self, "routing_algorithm") or self.routing_algorithm is None: return "Routing algorithm not initialized" if not hasattr(self, "spectrum_algorithm") or self.spectrum_algorithm is None: return "Spectrum algorithm not initialized" if not hasattr(self, "snr_algorithm") or self.snr_algorithm is None: return "SNR algorithm not initialized" return "" def _collect_metrics(self) -> dict[str, Any]: """Collect metrics from all algorithms.""" try: metrics = {} if hasattr(self.routing_algorithm, "get_metrics"): metrics["routing"] = self.routing_algorithm.get_metrics() else: metrics["routing"] = {"error": "routing algorithm missing get_metrics method"} if hasattr(self.spectrum_algorithm, "get_metrics"): metrics["spectrum"] = self.spectrum_algorithm.get_metrics() else: metrics["spectrum"] = {"error": "spectrum algorithm missing get_metrics method"} if hasattr(self.snr_algorithm, "get_metrics"): metrics["snr"] = self.snr_algorithm.get_metrics() else: metrics["snr"] = {"error": "snr algorithm missing get_metrics method"} return metrics except (AttributeError, TypeError) as e: return {"error": f"Failed to collect metrics: {str(e)}"}
[docs] def process_request(self, source: Any, destination: Any, request: Any) -> dict[str, Any]: """ Process a single network request through the complete pipeline. :param source: Source node identifier :type source: Any :param destination: Destination node identifier :type destination: Any :param request: Request object containing traffic demand details :type request: Any :return: Dictionary containing processing results :rtype: Dict[str, Any] """ result = self._create_base_result(source, destination) # Validate algorithms validation_error = self._validate_algorithms() if validation_error: result["failure_reason"] = validation_error return result try: # Process through the pipeline self._process_pipeline(result, source, destination, request) except (KeyError, AttributeError, TypeError, ValueError) as e: result["failure_reason"] = f"Processing error: {str(e)}" except (RuntimeError, OSError, ImportError) as e: result["failure_reason"] = f"System error: {str(e)}" # Collect metrics from all algorithms result["metrics"] = self._collect_metrics() # Final safety check - ensure result is never None if result is None: result = { "source": source, "destination": destination, "success": False, "path": None, "spectrum_assignment": None, "snr": None, "metrics": {"error": "Unexpected None result"}, "failure_reason": "Internal error: result was None", } return result
def _process_pipeline(self, result: dict[str, Any], source: Any, destination: Any, request: Any) -> None: """Process request through the complete pipeline.""" # Step 1: Routing if not hasattr(self.routing_algorithm, "route"): result["failure_reason"] = "Routing algorithm missing route method" return self.routing_algorithm.route(source, destination, request) # Check if routing found a path via route_props if not self.routing_algorithm.route_props.paths_matrix: result["failure_reason"] = "No path found" return # Use the first path from paths_matrix path = self.routing_algorithm.route_props.paths_matrix[0] result["path"] = path # Step 2: Spectrum Assignment if not hasattr(self.spectrum_algorithm, "assign"): result["failure_reason"] = "Spectrum algorithm missing assign method" return spectrum_assignment = self.spectrum_algorithm.assign(path, request) if not spectrum_assignment: result["failure_reason"] = "No spectrum available" return result["spectrum_assignment"] = spectrum_assignment # Step 3: SNR Check and Allocation self._process_snr_and_allocation(result, path, spectrum_assignment, request) def _process_snr_and_allocation( self, result: dict[str, Any], path: Any, spectrum_assignment: dict[str, Any], request: Any, ) -> None: """Process SNR check and spectrum allocation.""" if not hasattr(self.snr_algorithm, "calculate_snr"): result["failure_reason"] = "SNR algorithm missing calculate_snr method" return snr_value = self.snr_algorithm.calculate_snr(path, spectrum_assignment) result["snr"] = snr_value # Get modulation format for SNR threshold modulation = request.modulation if hasattr(request, "modulation") else "QPSK" topology: Any = self.engine_props.get("topology") if topology is None and self.sdn_props is not None and hasattr(self.sdn_props, "topology"): topology = self.sdn_props.topology if topology is None: result["failure_reason"] = "No topology available" return path_length = sum(topology[path[i]][path[i + 1]]["length"] for i in range(len(path) - 1)) if not hasattr(self.snr_algorithm, "get_required_snr_threshold"): result["failure_reason"] = "SNR algorithm missing get_required_snr_threshold method" return if not hasattr(self.snr_algorithm, "is_snr_acceptable"): result["failure_reason"] = "SNR algorithm missing is_snr_acceptable method" return required_snr = self.snr_algorithm.get_required_snr_threshold(modulation, path_length) snr_margin = self.config.get("snr_margin", 1.0) if self.snr_algorithm.is_snr_acceptable(snr_value, required_snr, snr_margin): self._allocate_spectrum(result, path, spectrum_assignment, request, required_snr, snr_value) else: result["failure_reason"] = f"SNR too low ({snr_value:.1f} dB < {required_snr:.1f} dB)" def _allocate_spectrum( self, result: dict[str, Any], path: Any, spectrum_assignment: dict[str, Any], request: Any, required_snr: float, snr_value: float, ) -> None: """Allocate spectrum for the request.""" if not hasattr(self.spectrum_algorithm, "allocate_spectrum"): result["failure_reason"] = "Spectrum algorithm missing allocate_spectrum method" return request_id = getattr(request, "id", hash((result["source"], result["destination"]))) success = self.spectrum_algorithm.allocate_spectrum( path, spectrum_assignment["start_slot"], spectrum_assignment["end_slot"], spectrum_assignment["core_num"], spectrum_assignment["band"], request_id, ) if success: result["success"] = True result["required_snr"] = required_snr result["snr_margin"] = snr_value - required_snr else: result["failure_reason"] = "Spectrum allocation failed"
[docs] def get_algorithm_info(self) -> dict[str, dict[str, Any]]: """Get information about all algorithms in the pipeline.""" return { "routing": { "name": self.routing_algorithm.algorithm_name, "supported_topologies": self.routing_algorithm.supported_topologies, "class": type(self.routing_algorithm).__name__, }, "spectrum": { "name": self.spectrum_algorithm.algorithm_name, "supports_multiband": self.spectrum_algorithm.supports_multiband, "class": type(self.spectrum_algorithm).__name__, }, "snr": { "name": self.snr_algorithm.algorithm_name, "supports_multicore": self.snr_algorithm.supports_multicore, "class": type(self.snr_algorithm).__name__, }, }
[docs] def reset_all_algorithms(self) -> None: """Reset state for all algorithms.""" self.routing_algorithm.reset() self.spectrum_algorithm.reset() self.snr_algorithm.reset()
[docs] def create_simulation_pipeline(config: dict[str, Any]) -> SimulationPipeline: """ Create a complete simulation pipeline from configuration. :param config: Configuration dictionary :type config: Dict[str, Any] :return: Configured SimulationPipeline instance :rtype: SimulationPipeline """ return SimulationPipeline(config)