Source code for fusion.interfaces.pipelines

"""
Pipeline protocol definitions for FUSION architecture.

This module defines typing.Protocol classes for all pipeline components:

- RoutingPipeline: Find candidate routes between nodes
- SpectrumPipeline: Find available spectrum along paths
- GroomingPipeline: Pack requests onto existing lightpaths
- SNRPipeline: Validate signal quality
- SlicingPipeline: Divide requests into smaller allocations

Design Principles:

- Protocols are type-only definitions (no runtime behavior)
- All pipelines receive NetworkState as method parameter
- Pipelines do NOT store NetworkState as instance attribute
- Return structured result objects from fusion.domain.results

Example usage::

    # Type hint a parameter as accepting any routing implementation
    def process_request(
        router: RoutingPipeline,
        network_state: NetworkState,
    ) -> RouteResult:
        return router.find_routes("A", "B", 100, network_state)

    # Check if object implements protocol (runtime_checkable)
    if isinstance(my_router, RoutingPipeline):
        result = my_router.find_routes(...)
"""

from __future__ import annotations

from typing import TYPE_CHECKING, Protocol, runtime_checkable

if TYPE_CHECKING:
    from fusion.domain.lightpath import Lightpath
    from fusion.domain.network_state import NetworkState
    from fusion.domain.request import Request
    from fusion.domain.results import (
        GroomingResult,
        RouteResult,
        SlicingResult,
        SNRRecheckResult,
        SNRResult,
        SpectrumResult,
    )


# =============================================================================
# RoutingPipeline
# =============================================================================


[docs] @runtime_checkable class RoutingPipeline(Protocol): """ Protocol for route finding algorithms. Implementations find candidate routes between source and destination nodes, returning paths with their weights and valid modulation formats. Design Notes: - Implementations should NOT store NetworkState as instance attribute - Receive NetworkState as method parameter for each call - Return RouteResult without modifying NetworkState - Configuration accessed via network_state.config Common Implementations: - K-shortest paths routing - Congestion-aware routing - Fragmentation-aware routing - NLI-aware routing (non-linear interference) Example:: class KShortestPathRouter: def find_routes( self, source: str, destination: str, bandwidth_gbps: int, network_state: NetworkState, *, forced_path: list[str] | None = None, ) -> RouteResult: if forced_path is not None: return RouteResult(paths=(tuple(forced_path),), ...) k = network_state.config.k_paths paths = find_k_shortest(network_state.topology, source, destination, k) return RouteResult(paths=paths, ...) """
[docs] def find_routes( self, source: str, destination: str, bandwidth_gbps: int, network_state: NetworkState, *, forced_path: list[str] | None = None, ) -> RouteResult: """ Find candidate routes between source and destination. :param source: Source node identifier :type source: str :param destination: Destination node identifier :type destination: str :param bandwidth_gbps: Required bandwidth (used for modulation selection) :type bandwidth_gbps: int :param network_state: Current network state (topology, config) :type network_state: NetworkState :param forced_path: If provided, use this path instead of searching (typically from partial grooming) :type forced_path: list[str] | None :return: RouteResult containing paths, weights_km, modulations, and strategy_name. Returns empty RouteResult if no routes found. :rtype: RouteResult .. note:: - Number of paths limited by network_state.config.k_paths - Modulation formats filtered by reach based on path weight - This is a pure query method with no side effects """ ...
# ============================================================================= # SpectrumPipeline # =============================================================================
[docs] @runtime_checkable class SpectrumPipeline(Protocol): """ Protocol for spectrum assignment algorithms. Implementations find available spectrum slots along a path for a given bandwidth and modulation format. Design Notes: - Implementations should NOT store NetworkState as instance attribute - Return SpectrumResult without modifying NetworkState - Actual allocation done by NetworkState.create_lightpath() Common Implementations: - First-fit: Allocate lowest available slot range - Best-fit: Allocate smallest sufficient gap - Last-fit: Allocate highest available slot range Example:: class FirstFitSpectrum: def find_spectrum( self, path: list[str], modulation: str, bandwidth_gbps: int, network_state: NetworkState, ) -> SpectrumResult: slots_needed = calculate_slots(bandwidth_gbps, modulation) for band in network_state.config.band_list: for core in range(network_state.config.cores_per_link): start = find_first_free(path, slots_needed, core, band) if start is not None: return SpectrumResult(is_free=True, start_slot=start, ...) return SpectrumResult.not_found(slots_needed) """
[docs] def find_spectrum( self, path: list[str], modulation: str | list[str], bandwidth_gbps: int, network_state: NetworkState, *, connection_index: int | None = None, path_index: int = 0, use_dynamic_slicing: bool = False, snr_bandwidth: int | None = None, request_id: int | None = None, slice_bandwidth: int | None = None, excluded_modulations: set[str] | None = None, ) -> SpectrumResult: """ Find available spectrum along a path. :param path: Ordered list of node IDs forming the route :type path: list[str] :param modulation: Modulation format name (e.g., "QPSK") or list of valid modulations to try :type modulation: str | list[str] :param bandwidth_gbps: Required bandwidth in Gbps :type bandwidth_gbps: int :param network_state: Current network state :type network_state: NetworkState :param connection_index: External routing index for pre-calculated SNR lookup :type connection_index: int | None :param path_index: Index of which k-path is being tried (0, 1, 2...) :type path_index: int :param use_dynamic_slicing: If True, use dynamic slicing to find spectrum :type use_dynamic_slicing: bool :param snr_bandwidth: Override bandwidth for SNR calculation :type snr_bandwidth: int | None :param request_id: Request ID for tracking/logging :type request_id: int | None :param slice_bandwidth: Bandwidth per slice when slicing is active :type slice_bandwidth: int | None :param excluded_modulations: Modulations to exclude from dynamic slicing :type excluded_modulations: set[str] | None :return: SpectrumResult with is_free, start_slot, end_slot, core, band, modulation, and slots_needed fields :rtype: SpectrumResult .. note:: This is a pure query method - does NOT allocate spectrum. Caller must use NetworkState.create_lightpath() to allocate. """ ...
[docs] def find_protected_spectrum( self, primary_path: list[str], backup_path: list[str], modulation: str, bandwidth_gbps: int, network_state: NetworkState, ) -> SpectrumResult: """ Find spectrum for both primary and backup paths (1+1 protection). :param primary_path: Primary route node sequence :type primary_path: list[str] :param backup_path: Backup route node sequence (should be disjoint) :type backup_path: list[str] :param modulation: Modulation format name :type modulation: str :param bandwidth_gbps: Required bandwidth :type bandwidth_gbps: int :param network_state: Current network state :type network_state: NetworkState :return: SpectrumResult with primary allocation in main fields and backup allocation in backup_* fields. Returns is_free=False if either path lacks spectrum. :rtype: SpectrumResult .. note:: This is a pure query method - does NOT allocate spectrum. Both paths must have free spectrum for success. """ ...
# ============================================================================= # GroomingPipeline # =============================================================================
[docs] @runtime_checkable class GroomingPipeline(Protocol): """ Protocol for traffic grooming algorithms. Grooming attempts to pack multiple requests onto existing lightpaths that have available capacity, reducing new lightpath establishment. Design Notes: - MAY have side effects (modifies lightpath bandwidth allocations) - Must support rollback for failed allocations - Returns GroomingResult indicating success/partial/failure Grooming Strategies: - Full grooming: Entire request fits on existing lightpath(s) - Partial grooming: Some bandwidth groomed, rest needs new lightpath - No grooming: No suitable lightpaths found Example:: class SimpleGrooming: def try_groom( self, request: Request, network_state: NetworkState, ) -> GroomingResult: candidates = network_state.get_lightpaths_between( request.source, request.destination ) for lp in candidates: if lp.can_accommodate(request.bandwidth_gbps): lp.allocate_bandwidth(request.request_id, request.bandwidth_gbps) return GroomingResult.full(request.bandwidth_gbps, [lp.lightpath_id]) return GroomingResult.no_grooming(request.bandwidth_gbps) """
[docs] def try_groom( self, request: Request, network_state: NetworkState, ) -> GroomingResult: """ Attempt to groom request onto existing lightpaths. :param request: The request to groom (source, destination, bandwidth) :type request: Request :param network_state: Current network state with active lightpaths :type network_state: NetworkState :return: GroomingResult with fully_groomed, partially_groomed, bandwidth_groomed_gbps, remaining_bandwidth_gbps, lightpaths_used, and forced_path fields :rtype: GroomingResult .. note:: If grooming succeeds (full or partial), modifies lightpath bandwidth allocations via Lightpath.allocate_bandwidth(). """ ...
[docs] def rollback_groom( self, request: Request, lightpath_ids: list[int], network_state: NetworkState, ) -> None: """ Rollback grooming allocations (e.g., after downstream failure). :param request: The request that was groomed :type request: Request :param lightpath_ids: Lightpath IDs to rollback :type lightpath_ids: list[int] :param network_state: Current network state :type network_state: NetworkState .. note:: Releases bandwidth from specified lightpaths via Lightpath.release_bandwidth(). Safe to call even if request wasn't groomed on all lightpaths. """ ...
# ============================================================================= # SNRPipeline # =============================================================================
[docs] @runtime_checkable class SNRPipeline(Protocol): """ Protocol for signal-to-noise ratio validation. Validates that lightpaths meet SNR requirements for their modulation format, considering interference from other channels. Design Notes: - validate() is a pure query method (no side effects) - recheck_affected() checks existing lightpaths after new allocation - Returns SNRResult/SNRRecheckResult with pass/fail and measurements SNR Considerations: - ASE noise from amplifiers - Non-linear interference (NLI) from other channels - Crosstalk in multi-core fibers (MCF) - Modulation-dependent thresholds Example:: class GNModelSNR: def validate( self, lightpath: Lightpath, network_state: NetworkState, ) -> SNRResult: required_snr = get_threshold(lightpath.modulation) snr_db = calculate_gn_model_snr(lightpath, network_state) if snr_db >= required_snr: return SNRResult.success(snr_db, required_snr) return SNRResult.failure(snr_db, required_snr, "SNR below threshold") """
[docs] def validate( self, lightpath: Lightpath, network_state: NetworkState, ) -> SNRResult: """ Validate SNR for a lightpath. :param lightpath: The lightpath to validate (may be newly created or existing lightpath being rechecked) :type lightpath: Lightpath :param network_state: Current network state with spectrum allocations :type network_state: NetworkState :return: SNRResult with passed, snr_db, required_snr_db, margin_db, failure_reason, and link_snr_values fields :rtype: SNRResult .. note:: This is a pure query method. Uses lightpath.modulation to determine threshold and considers interference from all other active lightpaths. """ ...
[docs] def recheck_affected( self, new_lightpath_id: int, network_state: NetworkState, *, _affected_range_slots: int = 5, slicing_flag: bool = False, ) -> SNRRecheckResult: """ Recheck SNR of existing lightpaths after new allocation. Some existing lightpaths may be degraded by the new allocation's interference. This method identifies them. :param new_lightpath_id: ID of newly created lightpath :type new_lightpath_id: int :param network_state: Current network state :type network_state: NetworkState :param affected_range_slots: Consider lightpaths within this many slots of the new allocation (default: 5) :type affected_range_slots: int :param slicing_flag: If True, indicates this is a slicing context :type slicing_flag: bool :return: SNRRecheckResult with all_pass, degraded_lightpath_ids, violations, and checked_count fields :rtype: SNRRecheckResult .. note:: This is a pure query method. Only checks lightpaths on same links and in nearby spectrum. Used to trigger rollback if existing lightpaths are degraded. """ ...
# ============================================================================= # SlicingPipeline # =============================================================================
[docs] @runtime_checkable class SlicingPipeline(Protocol): """ Protocol for request slicing algorithms. Slicing divides a large request into multiple smaller lightpaths when a single lightpath cannot accommodate the full bandwidth. Design Notes: - try_slice() is a pure query (checks feasibility) - rollback_slices() removes created lightpaths on failure - Used when normal allocation fails due to spectrum fragmentation - Limited by config.max_slices Slicing Strategies: - Static slicing: Fixed slice size (e.g., 50 Gbps per slice) - Dynamic slicing: Adaptive based on availability Example:: class DynamicSlicing: def try_slice( self, request: Request, path: list[str], modulation: str, bandwidth_gbps: int, network_state: NetworkState, *, max_slices: int | None = None, ) -> SlicingResult: limit = max_slices or network_state.config.max_slices for num_slices in range(2, limit + 1): slice_bw = bandwidth_gbps // num_slices if can_allocate_slices(path, slice_bw, num_slices, network_state): return SlicingResult.sliced(num_slices, slice_bw, ...) return SlicingResult.failed() """
[docs] def try_slice( self, request: Request, path: list[str], modulation: str, bandwidth_gbps: int, network_state: NetworkState, *, max_slices: int | None = None, spectrum_pipeline: SpectrumPipeline | None = None, snr_pipeline: SNRPipeline | None = None, connection_index: int | None = None, path_index: int = 0, snr_accumulator: list[float] | None = None, path_weight: float | None = None, ) -> SlicingResult: """ Attempt to slice request into multiple smaller allocations. :param request: The request being processed :type request: Request :param path: Route to use for slicing :type path: list[str] :param modulation: Modulation format for slices :type modulation: str :param bandwidth_gbps: Total bandwidth to slice :type bandwidth_gbps: int :param network_state: Current network state :type network_state: NetworkState :param max_slices: Override config.max_slices :type max_slices: int | None :param spectrum_pipeline: Pipeline for finding spectrum per slice :type spectrum_pipeline: SpectrumPipeline | None :param snr_pipeline: Pipeline for validating each slice :type snr_pipeline: SNRPipeline | None :param connection_index: External routing index for pre-calculated SNR lookup :type connection_index: int | None :param path_index: Index of which k-path is being tried (0, 1, 2...) :type path_index: int :param snr_accumulator: List to accumulate SNR values from failed attempts :type snr_accumulator: list[float] | None :param path_weight: Path weight in km (from routing, for metrics tracking) :type path_weight: float | None :return: SlicingResult with success, num_slices, slice_bandwidth_gbps, lightpaths_created, and total_bandwidth_gbps fields :rtype: SlicingResult .. note:: Pure query method by default. When spectrum_pipeline/snr_pipeline provided, may allocate slices. """ ...
[docs] def rollback_slices( self, lightpath_ids: list[int], network_state: NetworkState, ) -> None: """ Rollback all slices on failure. Called when partial slice allocation fails and all created slices must be released. :param lightpath_ids: IDs of lightpaths to release :type lightpath_ids: list[int] :param network_state: Network state to modify :type network_state: NetworkState .. note:: Releases/deletes specified lightpaths from network_state. Safe to call with empty list. """ ...
# ============================================================================= # Exports # ============================================================================= __all__ = [ "RoutingPipeline", "SpectrumPipeline", "GroomingPipeline", "SNRPipeline", "SlicingPipeline", ]