"""
Protection pipeline for 1+1 dedicated path protection.
This module provides ProtectionPipeline which handles:
- Finding disjoint path pairs (link or node disjoint)
- Allocating same spectrum on both primary and backup paths
- Creating protected lightpaths via NetworkState
"""
from __future__ import annotations
import logging
from dataclasses import dataclass
from typing import TYPE_CHECKING, Protocol, runtime_checkable
import numpy as np
from fusion.pipelines.disjoint_path_finder import DisjointnessType, DisjointPathFinder
if TYPE_CHECKING:
import networkx as nx
from fusion.domain.lightpath import Lightpath
from fusion.domain.network_state import NetworkState
@runtime_checkable
class LinkSpectrumLike(Protocol):
"""Protocol for link spectrum objects used in protection allocation."""
def get_slot_count(self, band: str) -> int:
"""Get number of slots for a band."""
...
def get_spectrum_array(self, band: str) -> np.ndarray:
"""Get spectrum array for a band."""
...
@runtime_checkable
class NetworkStateLike(Protocol):
"""Protocol for network state objects used in protection allocation."""
def get_link_spectrum(self, link: tuple[str, str]) -> LinkSpectrumLike:
"""Get link spectrum for a link."""
...
logger = logging.getLogger(__name__)
[docs]
@dataclass
class ProtectedAllocationResult:
"""
Result of protected spectrum allocation.
:ivar success: Whether allocation succeeded.
:vartype success: bool
:ivar start_slot: Starting spectrum slot (same on both paths).
:vartype start_slot: int
:ivar end_slot: Ending spectrum slot (exclusive).
:vartype end_slot: int
:ivar failure_reason: Reason for failure if success=False.
:vartype failure_reason: str | None
"""
success: bool
start_slot: int = -1
end_slot: int = -1
failure_reason: str | None = None
[docs]
@classmethod
def no_disjoint_paths(cls) -> ProtectedAllocationResult:
"""Create result for no disjoint paths found."""
return cls(success=False, failure_reason="no_disjoint_paths")
[docs]
@classmethod
def no_common_spectrum(cls) -> ProtectedAllocationResult:
"""Create result for no common spectrum available."""
return cls(success=False, failure_reason="no_common_spectrum")
[docs]
@classmethod
def allocated(cls, start_slot: int, end_slot: int) -> ProtectedAllocationResult:
"""Create result for successful allocation."""
return cls(success=True, start_slot=start_slot, end_slot=end_slot)
[docs]
class ProtectionPipeline:
"""
Pipeline for 1+1 dedicated path protection.
Provides methods to:
- Find disjoint path pairs (link or node disjoint)
- Allocate the SAME spectrum on both primary and backup paths
- Integrate with NetworkState for lightpath creation
1+1 Dedicated Protection:
- Both primary and backup paths are pre-provisioned
- SAME spectrum slots are allocated on BOTH paths
- Traffic is transmitted on both paths simultaneously
- Receiver monitors primary and switches to backup on failure
- Fast switchover (typically < 50ms)
:ivar disjoint_finder: DisjointPathFinder for path computation.
:vartype disjoint_finder: DisjointPathFinder
:ivar switchover_latency_ms: Protection switchover latency (default 50ms).
:vartype switchover_latency_ms: float
Example:
>>> pipeline = ProtectionPipeline(DisjointnessType.LINK)
>>> result = pipeline.allocate_protected(
... primary_path=["A", "B", "C"],
... backup_path=["A", "D", "C"],
... slots_needed=8,
... network_state=state,
... core=0,
... band="c",
... )
>>> if result.success:
... print(f"Allocated slots {result.start_slot}-{result.end_slot}")
"""
[docs]
def __init__(
self,
disjointness: DisjointnessType = DisjointnessType.LINK,
switchover_latency_ms: float = 50.0,
) -> None:
"""
Initialize ProtectionPipeline.
:param disjointness: Type of path disjointness (LINK or NODE).
:type disjointness: DisjointnessType
:param switchover_latency_ms: Switchover latency in milliseconds.
:type switchover_latency_ms: float
"""
self.disjoint_finder = DisjointPathFinder(disjointness)
self.switchover_latency_ms = switchover_latency_ms
@property
def disjointness(self) -> DisjointnessType:
"""Current disjointness mode."""
return self.disjoint_finder.disjointness
[docs]
def find_protected_paths(
self,
topology: nx.Graph,
source: str,
destination: str,
) -> tuple[list[str], list[str]] | None:
"""
Find a disjoint path pair for protection.
:param topology: Network topology graph.
:type topology: nx.Graph
:param source: Source node ID.
:type source: str
:param destination: Destination node ID.
:type destination: str
:return: Tuple of (primary_path, backup_path) or None if not possible.
:rtype: tuple[list[str], list[str]] | None
"""
return self.disjoint_finder.find_disjoint_pair(topology, source, destination)
[docs]
def allocate_protected(
self,
primary_path: list[str],
backup_path: list[str],
slots_needed: int,
network_state: NetworkStateLike,
core: int = 0,
band: str = "c",
) -> ProtectedAllocationResult:
"""
Allocate spectrum on both primary and backup paths.
For 1+1 dedicated protection, allocates the SAME spectrum slots
on both paths to enable fast switchover.
:param primary_path: Primary path node sequence.
:type primary_path: list[str]
:param backup_path: Backup path node sequence.
:type backup_path: list[str]
:param slots_needed: Number of spectrum slots needed.
:type slots_needed: int
:param network_state: Current network state.
:type network_state: NetworkStateLike
:param core: Core index (default 0).
:type core: int
:param band: Band identifier (default "c").
:type band: str
:return: ProtectedAllocationResult with spectrum assignment or failure reason.
:rtype: ProtectedAllocationResult
Algorithm:
1. Get spectrum availability on both paths (bitwise arrays)
2. Compute intersection (common free blocks)
3. Find first-fit contiguous block satisfying slots_needed
4. Return allocation details
"""
# Get spectrum availability on both paths
primary_available = self._get_path_spectrum_availability(primary_path, network_state, core, band)
backup_available = self._get_path_spectrum_availability(backup_path, network_state, core, band)
if primary_available is None or backup_available is None:
return ProtectedAllocationResult.no_common_spectrum()
# Find common free blocks (bitwise AND)
common_free = primary_available & backup_available
# Find contiguous block using first-fit
start_slot = self._find_first_fit_block(common_free, slots_needed)
if start_slot < 0:
logger.debug(f"No common spectrum block of {slots_needed} slots available on both paths")
return ProtectedAllocationResult.no_common_spectrum()
end_slot = start_slot + slots_needed
logger.debug(f"Protected allocation: slots [{start_slot}:{end_slot}] on primary {primary_path} and backup {backup_path}")
return ProtectedAllocationResult.allocated(start_slot, end_slot)
[docs]
def create_protected_lightpath(
self,
network_state: NetworkState,
primary_path: list[str],
backup_path: list[str],
start_slot: int,
end_slot: int,
core: int,
band: str,
modulation: str,
bandwidth_gbps: int,
path_weight_km: float,
guard_slots: int = 0,
) -> Lightpath:
"""
Create a protected lightpath with both primary and backup paths.
Uses NetworkState.create_lightpath() which already supports protection.
Allocates spectrum on both paths with the same slot range.
:param network_state: Network state to create lightpath in.
:type network_state: NetworkState
:param primary_path: Primary path node sequence.
:type primary_path: list[str]
:param backup_path: Backup path node sequence.
:type backup_path: list[str]
:param start_slot: Starting spectrum slot.
:type start_slot: int
:param end_slot: Ending spectrum slot (exclusive).
:type end_slot: int
:param core: Core index.
:type core: int
:param band: Band identifier.
:type band: str
:param modulation: Modulation format.
:type modulation: str
:param bandwidth_gbps: Bandwidth in Gbps.
:type bandwidth_gbps: int
:param path_weight_km: Primary path weight in km.
:type path_weight_km: float
:param guard_slots: Number of guard slots (default 0).
:type guard_slots: int
:return: Created Lightpath with protection fields set.
:rtype: Lightpath
:raises ValueError: If spectrum not available on either path.
"""
return network_state.create_lightpath(
path=primary_path,
start_slot=start_slot,
end_slot=end_slot,
core=core,
band=band,
modulation=modulation,
bandwidth_gbps=bandwidth_gbps,
path_weight_km=path_weight_km,
guard_slots=guard_slots,
# Protection fields
backup_path=backup_path,
backup_start_slot=start_slot, # Same slot for 1+1
backup_end_slot=end_slot,
backup_core=core,
backup_band=band,
)
[docs]
def verify_disjointness(
self,
path1: list[str],
path2: list[str],
) -> bool:
"""
Verify that two paths meet the disjointness requirement.
:param path1: First path.
:type path1: list[str]
:param path2: Second path.
:type path2: list[str]
:return: True if paths are disjoint according to current mode.
:rtype: bool
"""
if self.disjointness == DisjointnessType.LINK:
return self.disjoint_finder.are_link_disjoint(path1, path2)
else:
return self.disjoint_finder.are_node_disjoint(path1, path2)
def _get_path_spectrum_availability(
self,
path: list[str],
network_state: NetworkStateLike,
core: int,
band: str,
) -> np.ndarray | None:
"""
Get combined spectrum availability for a path (AND of all links).
:param path: Path as list of node IDs.
:type path: list[str]
:param network_state: Network state.
:type network_state: NetworkStateLike
:param core: Core index.
:type core: int
:param band: Band identifier.
:type band: str
:return: Boolean array where True = free, or None if path invalid.
:rtype: np.ndarray | None
"""
if len(path) < 2:
return None
# Get spectrum size from first link
try:
first_link = (path[0], path[1])
link_spectrum = network_state.get_link_spectrum(first_link)
num_slots = link_spectrum.get_slot_count(band)
except KeyError:
return None
# Start with all slots available
combined = np.ones(num_slots, dtype=bool)
# AND with each link's availability
for i in range(len(path) - 1):
link = (path[i], path[i + 1])
try:
link_spectrum = network_state.get_link_spectrum(link)
spectrum_array = link_spectrum.get_spectrum_array(band)
# 0 = free, nonzero = occupied
# We want True for free slots
link_available = spectrum_array[core] == 0
combined &= link_available
except (KeyError, IndexError):
return None
return combined
def _find_first_fit_block(
self,
available: np.ndarray,
slots_needed: int,
) -> int:
"""
Find first contiguous block of free slots.
:param available: Boolean array (True = free).
:type available: np.ndarray
:param slots_needed: Number of contiguous slots needed.
:type slots_needed: int
:return: Start slot index, or -1 if no fit found.
:rtype: int
"""
consecutive = 0
start = -1
for i, free in enumerate(available):
if free:
if consecutive == 0:
start = i
consecutive += 1
if consecutive >= slots_needed:
return start
else:
consecutive = 0
start = -1
return -1
[docs]
def get_switchover_latency(self) -> float:
"""Get the protection switchover latency in milliseconds."""
return self.switchover_latency_ms