"""
Pipeline factory for FUSION simulation.
This module provides PipelineFactory for creating pipelines based on
SimulationConfig, and PipelineSet for holding pipeline references.
Design Principles:
- Factory is stateless (static/class methods only)
- Lazy imports to avoid circular dependencies
- Config-driven pipeline selection
- Optional pipelines return None when disabled
Usage:
>>> config = SimulationConfig.from_engine_props(engine_props)
>>> pipelines = PipelineFactory.create_pipeline_set(config)
>>> orchestrator = PipelineFactory.create_orchestrator(config)
"""
from __future__ import annotations
import logging
from dataclasses import dataclass
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from fusion.domain.config import SimulationConfig
from fusion.interfaces.pipelines import (
GroomingPipeline,
RoutingPipeline,
SlicingPipeline,
SNRPipeline,
SpectrumPipeline,
)
logger = logging.getLogger(__name__)
# =============================================================================
# PipelineSet
# =============================================================================
[docs]
@dataclass(frozen=False, slots=True)
class PipelineSet:
"""
Container for all pipelines used by SDNOrchestrator.
This dataclass holds references to pipeline implementations that
will be called by the orchestrator during request handling.
Required pipelines (routing, spectrum) are always present.
Optional pipelines (grooming, snr, slicing) may be None if disabled.
:ivar routing: RoutingPipeline implementation (required)
:ivar spectrum: SpectrumPipeline implementation (required)
:ivar grooming: GroomingPipeline or None if disabled
:ivar snr: SNRPipeline or None if disabled
:ivar slicing: SlicingPipeline or None if disabled
"""
# Required pipelines
routing: RoutingPipeline
spectrum: SpectrumPipeline
# Optional pipelines (None if feature disabled)
grooming: GroomingPipeline | None = None
snr: SNRPipeline | None = None
slicing: SlicingPipeline | None = None
def __post_init__(self) -> None:
"""Validate required pipelines are present."""
if self.routing is None:
raise ValueError("PipelineSet requires a routing pipeline")
if self.spectrum is None:
raise ValueError("PipelineSet requires a spectrum pipeline")
@property
def has_grooming(self) -> bool:
"""Check if grooming is available."""
return self.grooming is not None
@property
def has_snr(self) -> bool:
"""Check if SNR validation is available."""
return self.snr is not None
@property
def has_slicing(self) -> bool:
"""Check if slicing is available."""
return self.slicing is not None
def __repr__(self) -> str:
"""Readable representation showing pipeline types."""
parts = [
f"routing={type(self.routing).__name__}",
f"spectrum={type(self.spectrum).__name__}",
]
if self.grooming:
parts.append(f"grooming={type(self.grooming).__name__}")
if self.snr:
parts.append(f"snr={type(self.snr).__name__}")
if self.slicing:
parts.append(f"slicing={type(self.slicing).__name__}")
return f"PipelineSet({', '.join(parts)})"
# =============================================================================
# PipelineFactory
# =============================================================================
[docs]
class PipelineFactory:
"""
Factory for creating pipelines based on SimulationConfig.
This factory uses lazy imports to avoid circular dependencies and
selects between legacy adapters and new pipeline implementations
based on configuration values. All methods are static or class methods
(stateless factory pattern).
"""
[docs]
@staticmethod
def create_routing(config: SimulationConfig) -> RoutingPipeline:
"""
Create routing pipeline based on config.
Returns ProtectedRoutingPipeline for 1+1 protection, otherwise RoutingAdapter.
:param config: Simulation configuration
:type config: SimulationConfig
:return: RoutingPipeline implementation
:rtype: RoutingPipeline
"""
route_method = getattr(config, "route_method", "k_shortest_path")
if route_method == "1plus1_protection":
logger.debug("Creating ProtectedRoutingPipeline for 1+1 protection")
from fusion.pipelines.routing_pipeline import ProtectedRoutingPipeline
return ProtectedRoutingPipeline(config)
else:
logger.debug(f"Creating RoutingAdapter for method: {route_method}")
from fusion.core.adapters.routing_adapter import RoutingAdapter
return RoutingAdapter(config)
[docs]
@staticmethod
def create_spectrum(config: SimulationConfig) -> SpectrumPipeline:
"""
Create spectrum pipeline based on config.
Currently always returns SpectrumAdapter (wraps legacy).
:param config: Simulation configuration
:type config: SimulationConfig
:return: SpectrumPipeline implementation
:rtype: SpectrumPipeline
"""
allocation_method = getattr(config, "allocation_method", "first_fit")
# For now, always use adapter
# Future: Add new implementations for specific allocation methods
logger.debug(f"Creating SpectrumAdapter for method: {allocation_method}")
from fusion.core.adapters.spectrum_adapter import SpectrumAdapter
return SpectrumAdapter(config)
[docs]
@staticmethod
def create_grooming(config: SimulationConfig) -> GroomingPipeline | None:
"""
Create grooming pipeline if enabled.
Returns GroomingAdapter if grooming_enabled, otherwise None.
:param config: Simulation configuration
:type config: SimulationConfig
:return: GroomingPipeline if enabled, None otherwise
:rtype: GroomingPipeline | None
"""
grooming_enabled = getattr(config, "grooming_enabled", False)
if not grooming_enabled:
logger.debug("Grooming disabled, returning None")
return None
logger.debug("Creating GroomingAdapter")
from fusion.core.adapters.grooming_adapter import GroomingAdapter
return GroomingAdapter(config)
[docs]
@staticmethod
def create_snr(config: SimulationConfig) -> SNRPipeline | None:
"""
Create SNR pipeline if enabled.
Returns SNRAdapter if snr_enabled, otherwise None.
:param config: Simulation configuration
:type config: SimulationConfig
:return: SNRPipeline if enabled, None otherwise
:rtype: SNRPipeline | None
"""
snr_enabled = getattr(config, "snr_enabled", False)
if not snr_enabled:
logger.debug("SNR validation disabled, returning None")
return None
logger.debug("Creating SNRAdapter")
from fusion.core.adapters.snr_adapter import SNRAdapter
return SNRAdapter(config)
[docs]
@staticmethod
def create_slicing(config: SimulationConfig) -> SlicingPipeline | None:
"""
Create slicing pipeline if enabled.
Returns StandardSlicingPipeline if slicing_enabled, otherwise None.
:param config: Simulation configuration
:type config: SimulationConfig
:return: SlicingPipeline if enabled, None otherwise
:rtype: SlicingPipeline | None
"""
slicing_enabled = getattr(config, "slicing_enabled", False)
if not slicing_enabled:
logger.debug("Slicing disabled, returning None")
return None
logger.debug("Creating StandardSlicingPipeline")
from fusion.pipelines.slicing_pipeline import StandardSlicingPipeline
return StandardSlicingPipeline(config)
[docs]
@classmethod
def create_pipeline_set(cls, config: SimulationConfig) -> PipelineSet:
"""
Create complete pipeline set from configuration.
Creates all pipelines based on configuration settings. Required pipelines
(routing, spectrum) are always created. Optional pipelines may be None.
:param config: Simulation configuration
:type config: SimulationConfig
:return: PipelineSet with all pipelines configured
:rtype: PipelineSet
"""
logger.info("Creating pipeline set from configuration")
return PipelineSet(
routing=cls.create_routing(config),
spectrum=cls.create_spectrum(config),
grooming=cls.create_grooming(config),
snr=cls.create_snr(config),
slicing=cls.create_slicing(config),
)
[docs]
@classmethod
def create_orchestrator(cls, config: SimulationConfig) -> SDNOrchestrator:
"""
Create orchestrator with configured pipelines.
Convenience method that creates both pipeline set and orchestrator.
The orchestrator is the main entry point for request handling.
:param config: Simulation configuration
:type config: SimulationConfig
:return: SDNOrchestrator instance with configured pipelines
:rtype: SDNOrchestrator
"""
from fusion.core.orchestrator import SDNOrchestrator
pipelines = cls.create_pipeline_set(config)
return SDNOrchestrator(config, pipelines)
# =============================================================================
# Type alias for forward reference
# =============================================================================
if TYPE_CHECKING:
from fusion.core.orchestrator import SDNOrchestrator