"""
Result objects for FUSION pipeline stages.
This module defines frozen dataclasses for all pipeline outputs:
- RouteResult: Routing pipeline output (candidate paths)
- SpectrumResult: Spectrum assignment output
- GroomingResult: Grooming pipeline output
- SlicingResult: Slicing pipeline output
- SNRResult: SNR validation output
- SNRRecheckResult: SNR recheck output for lightpaths affected by a new allocation
- ProtectionResult: 1+1 protection establishment / switchover output
- AllocationResult: Final orchestrator output (SINGLE SOURCE OF TRUTH)
All result objects are immutable (frozen dataclasses) to ensure
consistency throughout the processing pipeline.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from fusion.domain.request import BlockReason
# =============================================================================
# RouteResult
# =============================================================================
@dataclass(frozen=True)
class RouteResult:
    """
    Output of the routing stage: ranked candidate paths with modulations.

    Candidates are stored best-first. ``paths``, ``weights_km`` and
    ``modulations`` are parallel tuples, so index ``i`` of each describes
    the same candidate. Optional backup tuples carry the disjoint paths
    used for 1+1 protection.

    Attributes:
        paths: Candidate paths; each path is a tuple of node IDs.
        weights_km: Length/weight of each candidate in kilometers.
        modulations: Valid modulation formats for each candidate.
        backup_paths: Disjoint backup paths for 1+1 protection (optional).
        backup_weights_km: Lengths of the backup paths (optional).
        backup_modulations: Modulation formats of the backup paths (optional).
        strategy_name: Name of the routing algorithm that produced the result.
        metadata: Algorithm-specific metadata.
        connection_index: External routing index for pre-calculated SNR lookup.

    Raises:
        ValueError: If the parallel tuples differ in length, a path has
            fewer than two nodes, a weight is negative, or ``backup_paths``
            does not have one entry per primary path.

    Example:
        >>> result = RouteResult(
        ...     paths=(("0", "2", "5"),),
        ...     weights_km=(100.0,),
        ...     modulations=(("QPSK", "16-QAM"),),
        ...     strategy_name="k_shortest_path",
        ... )
        >>> result.is_empty
        False
        >>> result.best_path
        ('0', '2', '5')
    """

    # --- primary candidates (parallel tuples) ---
    paths: tuple[tuple[str, ...], ...] = ()
    weights_km: tuple[float, ...] = ()
    modulations: tuple[tuple[str, ...], ...] = ()

    # --- 1+1 protection candidates (all optional) ---
    backup_paths: tuple[tuple[str, ...], ...] | None = None
    backup_weights_km: tuple[float, ...] | None = None
    backup_modulations: tuple[tuple[str, ...], ...] | None = None

    # --- bookkeeping ---
    strategy_name: str = ""
    metadata: dict[str, Any] = field(default_factory=dict)

    # Index into externally supplied routing data (pre-calculated SNR lookup)
    connection_index: int | None = None

    def __post_init__(self) -> None:
        """Check the structural invariants of a freshly built result."""
        candidate_count = len(self.paths)

        # The three primary tuples must line up index by index.
        if candidate_count != len(self.weights_km):
            raise ValueError("paths and weights_km must have same length")
        if candidate_count != len(self.modulations):
            raise ValueError("paths and modulations must have same length")

        # A path needs at least a source and a destination.
        if any(len(candidate) < 2 for candidate in self.paths):
            raise ValueError("Each path must have at least 2 nodes")

        if any(distance < 0 for distance in self.weights_km):
            raise ValueError("Weights must be non-negative")

        # When backups are supplied there must be one slot per primary path.
        if self.backup_paths is not None and len(self.backup_paths) != candidate_count:
            raise ValueError("backup_paths must match paths length")

    @property
    def is_empty(self) -> bool:
        """Whether routing produced no candidate at all."""
        return self.num_paths == 0

    @property
    def num_paths(self) -> int:
        """How many candidate paths are held."""
        return len(self.paths)

    @property
    def has_protection(self) -> bool:
        """Whether a non-empty collection of backup paths is attached."""
        if self.backup_paths is None:
            return False
        return len(self.backup_paths) > 0

    @property
    def best_path(self) -> tuple[str, ...] | None:
        """First (best-ranked) candidate, or ``None`` when there is none."""
        if self.paths:
            return self.paths[0]
        return None

    @property
    def best_weight(self) -> float | None:
        """Weight of the first candidate, or ``None`` when there is none."""
        if self.weights_km:
            return self.weights_km[0]
        return None

    def get_path(self, index: int) -> tuple[str, ...]:
        """Return the candidate path stored at ``index``."""
        return self.paths[index]

    def get_modulations_for_path(self, index: int) -> tuple[str, ...]:
        """Return the modulation formats valid for the path at ``index``."""
        return self.modulations[index]

    @classmethod
    def empty(cls, strategy_name: str = "") -> RouteResult:
        """Build a result that holds no paths (routing found nothing)."""
        # The path/weight/modulation fields already default to empty tuples.
        return cls(strategy_name=strategy_name)

    @classmethod
    def from_routing_props(cls, routing_props: Any) -> RouteResult:
        """
        Build a result from a legacy RoutingProps-style object.

        Node identifiers are converted to ``str`` and every list is frozen
        into a tuple. A missing or falsy ``paths_matrix`` yields an empty
        result.

        :param routing_props: Legacy RoutingProps object.
        :type routing_props: Any
        :return: New RouteResult instance.
        :rtype: RouteResult
        """
        primary_matrix = getattr(routing_props, "paths_matrix", None)
        if not primary_matrix:
            return cls.empty()

        primary_paths = tuple(tuple(str(node) for node in hop_list) for hop_list in primary_matrix)
        primary_weights = tuple(routing_props.weights_list)
        primary_mods = tuple(tuple(formats) for formats in routing_props.modulation_formats_matrix)

        protect_paths = None
        protect_weights = None
        protect_mods = None

        # Backup data is only read when a non-empty backup matrix exists.
        backup_matrix = getattr(routing_props, "backup_paths_matrix", None)
        if backup_matrix:
            # A falsy entry means "no backup for this candidate" -> empty tuple.
            protect_paths = tuple(
                tuple(str(node) for node in hop_list) if hop_list else ()
                for hop_list in backup_matrix
            )
            if hasattr(routing_props, "backup_weights_list"):
                protect_weights = tuple(routing_props.backup_weights_list)
            if hasattr(routing_props, "backup_modulation_formats_matrix"):
                protect_mods = tuple(
                    tuple(formats) for formats in routing_props.backup_modulation_formats_matrix
                )

        return cls(
            paths=primary_paths,
            weights_km=primary_weights,
            modulations=primary_mods,
            backup_paths=protect_paths,
            backup_weights_km=protect_weights,
            backup_modulations=protect_mods,
        )
# =============================================================================
# SpectrumResult
# =============================================================================
@dataclass(frozen=True)
class SpectrumResult:
    """
    Result of spectrum assignment pipeline.

    Indicates whether contiguous spectrum was found and provides
    the allocation details (slot range, core, band, modulation).

    Attributes:
        is_free: Whether spectrum assignment was successful.
        start_slot: First slot index (valid only if is_free=True).
        end_slot: Last slot index exclusive (valid only if is_free=True).
        core: Core number for MCF (valid only if is_free=True).
        band: Frequency band (valid only if is_free=True).
        modulation: Selected modulation format.
        slots_needed: Number of slots required (including guard band).
        achieved_bandwidth_gbps: Achieved bandwidth in dynamic_lps mode.
        snr_db: SNR value calculated during spectrum assignment.
        backup_start_slot: Backup spectrum start for 1+1 protection.
        backup_end_slot: Backup spectrum end for 1+1 protection.
        backup_core: Backup core number for 1+1 protection.
        backup_band: Backup frequency band for 1+1 protection.

    Raises:
        ValueError: If ``is_free`` is true but ``end_slot <= start_slot``
            or ``slots_needed <= 0``.

    Example:
        >>> result = SpectrumResult(
        ...     is_free=True,
        ...     start_slot=100,
        ...     end_slot=108,
        ...     core=0,
        ...     band="c",
        ...     modulation="QPSK",
        ...     slots_needed=8,
        ... )
        >>> result.num_slots
        8
    """

    is_free: bool

    # Primary allocation (meaningful only if is_free=True)
    start_slot: int = 0
    end_slot: int = 0
    core: int = 0
    band: str = "c"
    modulation: str = ""
    slots_needed: int = 0

    # Achieved bandwidth in dynamic_lps mode (may be less than requested)
    # When None, assumed to equal the requested bandwidth
    achieved_bandwidth_gbps: int | None = None

    # SNR value calculated during spectrum assignment (for metrics tracking)
    # This is the value from legacy spectrum_props.crosstalk_cost
    snr_db: float | None = None

    # Backup allocation (for 1+1 protection)
    backup_start_slot: int | None = None
    backup_end_slot: int | None = None
    backup_core: int | None = None
    backup_band: str | None = None

    def __post_init__(self) -> None:
        """Validate spectrum result after creation."""
        # Slot fields are only checked for successful assignments.
        if self.is_free:
            if self.end_slot <= self.start_slot:
                raise ValueError("end_slot must be > start_slot when is_free=True")
            if self.slots_needed <= 0:
                raise ValueError("slots_needed must be > 0 when is_free=True")

    @property
    def num_slots(self) -> int:
        """Number of slots allocated (0 if not free)."""
        if not self.is_free:
            return 0
        # end_slot is exclusive, so the difference is the slot count.
        return self.end_slot - self.start_slot

    @property
    def has_backup(self) -> bool:
        """True if backup spectrum is allocated."""
        return self.backup_start_slot is not None

    @classmethod
    def not_found(cls, slots_needed: int = 0) -> SpectrumResult:
        """Create result for failed spectrum assignment."""
        return cls(is_free=False, slots_needed=slots_needed)

    @classmethod
    def from_spectrum_props(cls, spectrum_props: Any) -> SpectrumResult:
        """
        Create from legacy SpectrumProps or dict.

        Both input shapes are normalized the same way: a missing, ``None``
        or otherwise falsy value for a slot, core, band or modulation field
        falls back to that field's default, so the result never carries
        ``None`` in a field annotated as ``int``/``str``.

        For dicts the core is read from ``"core_number"`` (falling back to
        ``"core"`` when that key is absent) and the band from ``"band"``;
        for objects they are read from ``core_number`` and ``current_band``.

        :param spectrum_props: Legacy SpectrumProps object or dict.
        :type spectrum_props: Any
        :return: New SpectrumResult instance.
        :rtype: SpectrumResult
        :raises ValueError: If the props claim ``is_free`` but the slot
            range or ``slots_needed`` is not valid.
        """
        if isinstance(spectrum_props, dict):
            # ``.get(key, default)`` only covers *missing* keys; legacy dicts
            # can hold explicit None placeholders, hence the ``or default``.
            raw_core = spectrum_props.get("core_number", spectrum_props.get("core", 0))
            return cls(
                is_free=spectrum_props.get("is_free", False),
                start_slot=spectrum_props.get("start_slot") or 0,
                end_slot=spectrum_props.get("end_slot") or 0,
                core=int(raw_core or 0),
                band=spectrum_props.get("band") or "c",
                modulation=spectrum_props.get("modulation") or "",
                slots_needed=spectrum_props.get("slots_needed") or 0,
            )

        # Handle SpectrumProps object
        return cls(
            is_free=getattr(spectrum_props, "is_free", False),
            start_slot=getattr(spectrum_props, "start_slot", 0) or 0,
            end_slot=getattr(spectrum_props, "end_slot", 0) or 0,
            core=getattr(spectrum_props, "core_number", 0) or 0,
            band=getattr(spectrum_props, "current_band", "c") or "c",
            modulation=getattr(spectrum_props, "modulation", "") or "",
            slots_needed=getattr(spectrum_props, "slots_needed", 0) or 0,
        )

    def to_allocation_dict(self) -> dict[str, Any]:
        """
        Convert to allocation dictionary for legacy compatibility.

        Only the primary allocation fields are exported; the core is
        written under the legacy key ``"core_number"``.

        :return: Dictionary with allocation details.
        :rtype: dict[str, Any]
        """
        return {
            "is_free": self.is_free,
            "start_slot": self.start_slot,
            "end_slot": self.end_slot,
            "core_number": self.core,
            "band": self.band,
            "modulation": self.modulation,
            "slots_needed": self.slots_needed,
        }
# =============================================================================
# GroomingResult
# =============================================================================
@dataclass(frozen=True)
class GroomingResult:
    """
    Outcome of the grooming stage (reuse of existing lightpath capacity).

    Records whether a request was served entirely, partly, or not at all
    from lightpaths that already exist, and how much bandwidth is left
    for a new lightpath.

    Attributes:
        fully_groomed: The whole request fit into existing lightpaths.
        partially_groomed: Part of the request was groomed; the rest
            needs a new lightpath.
        bandwidth_groomed_gbps: Bandwidth served by grooming.
        remaining_bandwidth_gbps: Bandwidth still requiring a new lightpath.
        lightpaths_used: IDs of the lightpaths used for grooming.
        forced_path: Path the new lightpath must follow (partial case).
        snr_list: SNR values from grooming attempts (legacy compatibility).
        modulation_list: Modulation formats from grooming attempts.

    Raises:
        ValueError: If both grooming flags are set, if ``fully_groomed``
            is set with leftover bandwidth, or if a groomed result lists
            no lightpaths.

    Example:
        >>> result = GroomingResult.full(100, [1, 2])
        >>> result.fully_groomed
        True
        >>> result.needs_new_lightpath
        False
    """

    fully_groomed: bool = False
    partially_groomed: bool = False
    bandwidth_groomed_gbps: int = 0
    remaining_bandwidth_gbps: int = 0
    lightpaths_used: tuple[int, ...] = ()
    forced_path: tuple[str, ...] | None = None

    # Kept for legacy consumers: values gathered during grooming attempts
    snr_list: tuple[float, ...] = ()
    modulation_list: tuple[str, ...] = ()

    def __post_init__(self) -> None:
        """Reject flag/bandwidth/lightpath combinations that contradict each other."""
        if self.fully_groomed:
            if self.partially_groomed:
                raise ValueError("fully_groomed and partially_groomed are mutually exclusive")
            if self.remaining_bandwidth_gbps != 0:
                raise ValueError("fully_groomed requires remaining_bandwidth_gbps == 0")
        # Any kind of grooming must name the lightpaths it reused.
        if self.was_groomed and len(self.lightpaths_used) == 0:
            raise ValueError("groomed requests must have lightpaths_used")

    @property
    def was_groomed(self) -> bool:
        """Whether grooming happened at all (fully or partially)."""
        return self.fully_groomed or self.partially_groomed

    @property
    def needs_new_lightpath(self) -> bool:
        """Whether bandwidth remains that a new lightpath must carry."""
        if self.fully_groomed:
            return False
        return self.remaining_bandwidth_gbps > 0

    @classmethod
    def no_grooming(cls, bandwidth_gbps: int) -> GroomingResult:
        """Build the result for a request that could not be groomed at all."""
        # Every other field keeps its "nothing groomed" default.
        return cls(remaining_bandwidth_gbps=bandwidth_gbps)

    @classmethod
    def full(
        cls,
        bandwidth_gbps: int,
        lightpath_ids: list[int],
        snr_list: list[float] | None = None,
        modulation_list: list[str] | None = None,
    ) -> GroomingResult:
        """Build the result for a request served entirely by existing lightpaths."""
        frozen_snr = tuple(snr_list) if snr_list else ()
        frozen_mods = tuple(modulation_list) if modulation_list else ()
        return cls(
            fully_groomed=True,
            partially_groomed=False,
            bandwidth_groomed_gbps=bandwidth_gbps,
            remaining_bandwidth_gbps=0,
            lightpaths_used=tuple(lightpath_ids),
            snr_list=frozen_snr,
            modulation_list=frozen_mods,
        )

    @classmethod
    def partial(
        cls,
        bandwidth_groomed: int,
        remaining: int,
        lightpath_ids: list[int],
        forced_path: list[str] | None = None,
        snr_list: list[float] | None = None,
        modulation_list: list[str] | None = None,
    ) -> GroomingResult:
        """Build the result for a request only partly served by existing lightpaths."""
        # Empty or missing optional lists collapse to their "absent" value.
        frozen_path = tuple(forced_path) if forced_path else None
        frozen_snr = tuple(snr_list) if snr_list else ()
        frozen_mods = tuple(modulation_list) if modulation_list else ()
        return cls(
            fully_groomed=False,
            partially_groomed=True,
            bandwidth_groomed_gbps=bandwidth_groomed,
            remaining_bandwidth_gbps=remaining,
            lightpaths_used=tuple(lightpath_ids),
            forced_path=frozen_path,
            snr_list=frozen_snr,
            modulation_list=frozen_mods,
        )
# =============================================================================
# SlicingResult
# =============================================================================
@dataclass(frozen=True)
class SlicingResult:
    """
    Outcome of the slicing stage (one request split over several lightpaths).

    A request that is too large for a single lightpath (modulation
    limitations) may be served as several smaller slices.

    Attributes:
        success: Whether slicing succeeded.
        num_slices: Number of slices created.
        slice_bandwidth_gbps: Bandwidth carried by each slice.
        lightpaths_created: IDs of the lightpaths that were created.
        total_bandwidth_gbps: Bandwidth summed over all slices.
        failed_attempt_snr_values: SNR values from failed attempts (legacy).

    Raises:
        ValueError: If ``success`` is true but ``num_slices`` is zero or
            differs from the number of created lightpaths.

    Example:
        >>> result = SlicingResult.sliced(
        ...     num_slices=4,
        ...     slice_bandwidth=25,
        ...     lightpath_ids=[1, 2, 3, 4],
        ... )
        >>> result.is_sliced
        True
        >>> result.total_bandwidth_gbps
        100
    """

    success: bool = False
    num_slices: int = 0
    slice_bandwidth_gbps: int = 0
    lightpaths_created: tuple[int, ...] = ()
    total_bandwidth_gbps: int = 0
    # SNR values from failed attempts (for Legacy compatibility)
    failed_attempt_snr_values: tuple[float, ...] = ()

    def __post_init__(self) -> None:
        """Check slice bookkeeping; failed results are not validated."""
        if not self.success:
            return
        if self.num_slices == 0:
            raise ValueError("success=True requires num_slices > 0")
        # Exactly one lightpath per slice.
        if len(self.lightpaths_created) != self.num_slices:
            raise ValueError("lightpaths_created length must match num_slices")

    @property
    def is_sliced(self) -> bool:
        """Whether the request succeeded using more than one lightpath."""
        return self.success and self.num_slices > 1

    @classmethod
    def failed(cls) -> SlicingResult:
        """Build the result for a slicing attempt that did not succeed."""
        return cls(success=False)

    @classmethod
    def single_lightpath(cls, bandwidth_gbps: int, lightpath_id: int) -> SlicingResult:
        """Build the result for a request that fit into one lightpath."""
        # One "slice" carrying the whole bandwidth.
        return cls(
            success=True,
            num_slices=1,
            slice_bandwidth_gbps=bandwidth_gbps,
            lightpaths_created=(lightpath_id,),
            total_bandwidth_gbps=bandwidth_gbps,
        )

    @classmethod
    def sliced(
        cls,
        num_slices: int,
        slice_bandwidth: int,
        lightpath_ids: list[int],
    ) -> SlicingResult:
        """Build the result for a request split into equally sized slices."""
        combined_bandwidth = num_slices * slice_bandwidth
        return cls(
            success=True,
            num_slices=num_slices,
            slice_bandwidth_gbps=slice_bandwidth,
            lightpaths_created=tuple(lightpath_ids),
            total_bandwidth_gbps=combined_bandwidth,
        )
# =============================================================================
# SNRResult
# =============================================================================
@dataclass(frozen=True)
class SNRResult:
    """
    Outcome of the SNR validation stage.

    States whether the calculated signal-to-noise ratio reaches the
    threshold required by the selected modulation format.

    Attributes:
        passed: Whether the SNR meets the threshold (primary success flag).
        snr_db: Calculated SNR in dB.
        required_snr_db: Threshold required for the modulation format.
        margin_db: SNR margin (snr_db - required_snr_db).
        failure_reason: Explanation when validation failed.
        link_snr_values: Per-link SNR breakdown for debugging.

    Example:
        >>> result = SNRResult.success(snr_db=18.5, required_snr_db=15.0)
        >>> result.passed
        True
        >>> result.margin_db
        3.5
    """

    passed: bool
    snr_db: float = 0.0
    required_snr_db: float = 0.0
    margin_db: float = 0.0
    failure_reason: str | None = None
    link_snr_values: dict[tuple[str, str], float] = field(default_factory=dict)

    @property
    def is_degraded(self) -> bool:
        """Whether the check passed with a margin in the range [0, 1) dB."""
        return self.passed and (0 <= self.margin_db and self.margin_db < 1.0)

    @property
    def has_link_breakdown(self) -> bool:
        """Whether any per-link SNR values were recorded."""
        return len(self.link_snr_values) != 0

    @classmethod
    def success(
        cls,
        snr_db: float,
        required_snr_db: float,
        link_snr_values: dict[tuple[str, str], float] | None = None,
    ) -> SNRResult:
        """Build a passing result; the margin is derived from the two SNR values."""
        per_link = link_snr_values if link_snr_values else {}
        return cls(
            passed=True,
            snr_db=snr_db,
            required_snr_db=required_snr_db,
            margin_db=snr_db - required_snr_db,
            failure_reason=None,
            link_snr_values=per_link,
        )

    @classmethod
    def failure(
        cls,
        snr_db: float,
        required_snr_db: float,
        reason: str = "SNR below threshold",
        link_snr_values: dict[tuple[str, str], float] | None = None,
    ) -> SNRResult:
        """Build a failing result; the margin is derived from the two SNR values."""
        per_link = link_snr_values if link_snr_values else {}
        return cls(
            passed=False,
            snr_db=snr_db,
            required_snr_db=required_snr_db,
            margin_db=snr_db - required_snr_db,
            failure_reason=reason,
            link_snr_values=per_link,
        )

    @classmethod
    def skipped(cls) -> SNRResult:
        """Build the result used when SNR validation is disabled."""
        # Reported as passed, with all numeric fields zeroed.
        return cls(
            passed=True,
            snr_db=0.0,
            required_snr_db=0.0,
            margin_db=0.0,
            failure_reason=None,
        )

    def get_weakest_link(self) -> tuple[tuple[str, str], float] | None:
        """
        Find the link with the lowest recorded SNR.

        :return: Tuple of (link, snr_db) or None if no link breakdown.
        :rtype: tuple[tuple[str, str], float] | None
        """
        if self.link_snr_values:
            return min(self.link_snr_values.items(), key=lambda entry: entry[1])
        return None
# =============================================================================
# SNRRecheckResult
# =============================================================================
@dataclass(frozen=True)
class SNRRecheckResult:
    """
    Result of SNR recheck after new lightpath allocation.

    When a new lightpath is allocated, it may cause interference with
    existing lightpaths. This result captures the outcome of rechecking
    SNR for affected lightpaths.

    Attributes:
        all_pass: True if all affected lightpaths still meet SNR threshold.
        degraded_lightpath_ids: IDs of lightpaths now below threshold.
        violations: Mapping of lightpath_id to SNR shortfall in dB.
        checked_count: Number of lightpaths that were checked.

    Raises:
        ValueError: If ``all_pass`` is true while degraded lightpaths are
            listed, or false while none are listed.

    Example:
        >>> result = SNRRecheckResult.success()
        >>> result.all_pass
        True
        >>> result.num_degraded
        0
    """

    all_pass: bool
    degraded_lightpath_ids: tuple[int, ...] = ()
    violations: dict[int, float] = field(default_factory=dict)
    checked_count: int = 0

    def __post_init__(self) -> None:
        """Validate SNR recheck result after creation."""
        # all_pass and the degraded list must tell the same story.
        if self.all_pass and len(self.degraded_lightpath_ids) > 0:
            raise ValueError("all_pass=True requires no degraded lightpaths")
        if not self.all_pass and len(self.degraded_lightpath_ids) == 0:
            raise ValueError("all_pass=False requires at least one degraded lightpath")

    @property
    def num_degraded(self) -> int:
        """Number of lightpaths that are now degraded."""
        return len(self.degraded_lightpath_ids)

    @property
    def has_violations(self) -> bool:
        """True if any lightpaths are degraded."""
        return len(self.degraded_lightpath_ids) > 0

    def get_worst_violation(self) -> tuple[int, float] | None:
        """
        Get lightpath with largest SNR shortfall.

        The entry is selected by magnitude, so the result is the largest
        shortfall whether callers record shortfalls as positive dB values
        or as negative margins. (A plain ``min`` would return the
        *smallest* shortfall for positive values.)
        NOTE(review): the sign convention of ``violations`` is not fixed
        anywhere in this module — confirm against the producers.

        :return: Tuple of (lightpath_id, shortfall_db) or None if no violations.
        :rtype: tuple[int, float] | None
        """
        if not self.violations:
            return None
        return max(self.violations.items(), key=lambda item: abs(item[1]))

    @classmethod
    def success(cls, checked_count: int = 0) -> SNRRecheckResult:
        """Create result when all affected lightpaths still pass."""
        return cls(
            all_pass=True,
            degraded_lightpath_ids=(),
            violations={},
            checked_count=checked_count,
        )

    @classmethod
    def degraded(
        cls,
        degraded_ids: list[int],
        violations: dict[int, float],
        checked_count: int = 0,
    ) -> SNRRecheckResult:
        """
        Create result when some lightpaths are now degraded.

        ``violations`` is copied so that later mutation of the caller's
        dict cannot alter this frozen result. When ``checked_count`` is
        not positive, the number of degraded lightpaths is used instead.
        """
        return cls(
            all_pass=False,
            degraded_lightpath_ids=tuple(degraded_ids),
            violations=dict(violations or {}),
            checked_count=checked_count if checked_count > 0 else len(degraded_ids),
        )
# =============================================================================
# ProtectionResult
# =============================================================================
@dataclass(frozen=True)
class ProtectionResult:
    """
    Outcome of establishing 1+1 protection paths or of a switchover.

    Tracks which of the primary and backup paths were established,
    their spectrum allocations, and the details of any switchover event.

    Attributes:
        primary_established: Whether the primary path was established.
        backup_established: Whether the backup path was established.
        primary_spectrum: Spectrum allocation of the primary path.
        backup_spectrum: Spectrum allocation of the backup path.
        switchover_triggered: Whether a switchover event occurred.
        switchover_success: Whether the switchover completed successfully.
        switchover_time_ms: Switchover time in milliseconds.
        failure_type: Type of failure that triggered the switchover.
        recovery_start: Simulation time at which recovery started.
        recovery_end: Simulation time at which recovery completed.
        recovery_type: Type of recovery ("protection" or "restoration").

    Example:
        >>> result = ProtectionResult.established(
        ...     primary_spectrum=spectrum1,
        ...     backup_spectrum=spectrum2,
        ... )
        >>> result.is_fully_protected
        True
    """

    # --- path establishment ---
    primary_established: bool = False
    backup_established: bool = False
    primary_spectrum: SpectrumResult | None = None
    backup_spectrum: SpectrumResult | None = None

    # --- switchover event ---
    switchover_triggered: bool = False
    switchover_success: bool = False
    switchover_time_ms: float | None = None
    failure_type: str | None = None

    # --- recovery timing ---
    recovery_start: float | None = None
    recovery_end: float | None = None
    recovery_type: str | None = None

    @property
    def is_fully_protected(self) -> bool:
        """Whether the primary and the backup path are both established."""
        return self.primary_established and self.backup_established

    @property
    def recovery_duration_ms(self) -> float | None:
        """Recovery end minus start, multiplied by 1000; ``None`` unless both are set."""
        if self.recovery_start is None or self.recovery_end is None:
            return None
        elapsed = self.recovery_end - self.recovery_start
        return elapsed * 1000

    @classmethod
    def established(
        cls,
        primary_spectrum: SpectrumResult,
        backup_spectrum: SpectrumResult | None = None,
    ) -> ProtectionResult:
        """Build the result for an established primary path with optional backup."""
        # The backup counts as established exactly when a spectrum was given.
        backup_present = backup_spectrum is not None
        return cls(
            primary_established=True,
            backup_established=backup_present,
            primary_spectrum=primary_spectrum,
            backup_spectrum=backup_spectrum,
        )

    @classmethod
    def primary_only(cls, primary_spectrum: SpectrumResult) -> ProtectionResult:
        """Build the result for the case where only the primary path came up."""
        return cls(
            primary_established=True,
            backup_established=False,
            primary_spectrum=primary_spectrum,
        )

    @classmethod
    def failed(cls) -> ProtectionResult:
        """Build the result for the case where neither path was established."""
        return cls(primary_established=False, backup_established=False)

    @classmethod
    def switchover(
        cls,
        success: bool,
        switchover_time_ms: float,
        failure_type: str,
        recovery_type: str = "protection",
    ) -> ProtectionResult:
        """Build the result describing a switchover event."""
        # The switchover time is only recorded for a successful switchover.
        recorded_time = switchover_time_ms if success else None
        return cls(
            primary_established=True,
            backup_established=True,
            switchover_triggered=True,
            switchover_success=success,
            switchover_time_ms=recorded_time,
            failure_type=failure_type,
            recovery_type=recovery_type,
        )
# =============================================================================
# AllocationResult - FINAL AUTHORITY
# =============================================================================
@dataclass(frozen=True)
class AllocationResult:
    """
    Final outcome of allocating a request - SINGLE SOURCE OF TRUTH.

    This is the authoritative result handed back by the orchestrator.
    The ``success`` field decides whether the request counts as served
    (fully or partially).

    Attributes:
        success: FINAL AUTHORITY - True if the request was served.
        lightpaths_created: IDs of newly created lightpaths.
        lightpaths_groomed: IDs of existing lightpaths that were reused.
        total_bandwidth_allocated_gbps: Total bandwidth allocated.
        is_groomed: The request used existing lightpath capacity.
        is_partially_groomed: Mix of existing and new lightpaths.
        is_sliced: The request was split over multiple lightpaths.
        is_protected: The request has 1+1 protection.
        block_reason: BlockReason enum when success=False.
        bandwidth_allocations: Bandwidth allocated per segment.
        modulations: Modulation format per segment.
        cores: Core number per segment.
        bands: Frequency band per segment.
        start_slots: Start slot per segment.
        end_slots: End slot per segment.
        xt_costs: Crosstalk cost per segment (from crosstalk_list).
        xt_values: Crosstalk values per segment (from xt_list).
        snr_values: SNR value per segment.
        lightpath_bandwidths: Total bandwidth per lightpath.
        failed_attempt_snr_values: SNR values from failed allocation attempts.
        path_index: Which k-path was selected (0, 1, 2...).
        route_result: Routing pipeline output.
        spectrum_result: Spectrum pipeline output.
        grooming_result: Grooming pipeline output.
        slicing_result: Slicing pipeline output.
        snr_result: SNR validation output.
        protection_result: Protection establishment output.

    Raises:
        ValueError: If a successful result has no lightpath or no positive
            bandwidth, or a failed result has no ``block_reason``.

    Example:
        >>> result = AllocationResult.success_new_lightpath(
        ...     lightpath_id=42,
        ...     bandwidth_gbps=100,
        ... )
        >>> result.success
        True
        >>> result.num_lightpaths
        1
    """

    success: bool

    # --- which lightpaths served the request ---
    lightpaths_created: tuple[int, ...] = ()
    lightpaths_groomed: tuple[int, ...] = ()
    total_bandwidth_allocated_gbps: int = 0

    # --- how the request was served ---
    is_groomed: bool = False
    is_partially_groomed: bool = False
    is_sliced: bool = False
    is_protected: bool = False

    # --- why it was blocked (failure only) ---
    block_reason: BlockReason | None = None

    # --- per-segment details for sliced/multi-lightpath allocations ---
    bandwidth_allocations: tuple[int, ...] = ()
    modulations: tuple[str, ...] = ()
    cores: tuple[int, ...] = ()
    bands: tuple[str, ...] = ()
    start_slots: tuple[int, ...] = ()
    end_slots: tuple[int, ...] = ()
    xt_costs: tuple[float, ...] = ()  # From legacy crosstalk_list
    xt_values: tuple[float, ...] = ()  # From legacy xt_list
    snr_values: tuple[float, ...] = ()
    lightpath_bandwidths: tuple[int, ...] = ()

    # Legacy compatibility: SNR values of attempts that passed at first
    # but then failed the SNR recheck
    failed_attempt_snr_values: tuple[float, ...] = ()

    # Index of the selected k-path (0, 1, 2...)
    path_index: int = 0

    # --- stage outputs kept for debugging/tracing ---
    route_result: RouteResult | None = None
    spectrum_result: SpectrumResult | None = None
    grooming_result: GroomingResult | None = None
    slicing_result: SlicingResult | None = None
    snr_result: SNRResult | None = None
    protection_result: ProtectionResult | None = None

    def __post_init__(self) -> None:
        """Enforce that success and failure results carry the data they need."""
        if not self.success:
            # A blocked request has to say why it was blocked.
            if self.block_reason is None:
                raise ValueError("success=False requires block_reason")
            return

        # A served request must reference a lightpath and carry bandwidth.
        if self.num_lightpaths == 0:
            raise ValueError("success=True requires at least one lightpath")
        if self.total_bandwidth_allocated_gbps <= 0:
            raise ValueError("success=True requires total_bandwidth > 0")

    @property
    def all_lightpath_ids(self) -> tuple[int, ...]:
        """Created lightpath IDs followed by groomed lightpath IDs."""
        return self.lightpaths_created + self.lightpaths_groomed

    @property
    def num_lightpaths(self) -> int:
        """Count of created plus groomed lightpaths."""
        created_count = len(self.lightpaths_created)
        groomed_count = len(self.lightpaths_groomed)
        return created_count + groomed_count

    @property
    def used_grooming(self) -> bool:
        """Whether the request was fully or partially groomed."""
        return self.is_groomed or self.is_partially_groomed

    @classmethod
    def blocked(
        cls,
        reason: BlockReason,
        route_result: RouteResult | None = None,
        spectrum_result: SpectrumResult | None = None,
        snr_result: SNRResult | None = None,
    ) -> AllocationResult:
        """Build the result for a request that could not be served."""
        stage_outputs = {
            "route_result": route_result,
            "spectrum_result": spectrum_result,
            "snr_result": snr_result,
        }
        return cls(success=False, block_reason=reason, **stage_outputs)

    @classmethod
    def success_new_lightpath(
        cls,
        lightpath_id: int,
        bandwidth_gbps: int,
        route_result: RouteResult | None = None,
        spectrum_result: SpectrumResult | None = None,
        snr_result: SNRResult | None = None,
        is_protected: bool = False,
    ) -> AllocationResult:
        """Build the result for a request served by one new lightpath."""
        stage_outputs = {
            "route_result": route_result,
            "spectrum_result": spectrum_result,
            "snr_result": snr_result,
        }
        return cls(
            success=True,
            lightpaths_created=(lightpath_id,),
            total_bandwidth_allocated_gbps=bandwidth_gbps,
            is_protected=is_protected,
            **stage_outputs,
        )

    @classmethod
    def success_groomed(
        cls,
        lightpath_ids: list[int],
        bandwidth_gbps: int,
        grooming_result: GroomingResult | None = None,
    ) -> AllocationResult:
        """Build the result for a request served entirely by existing lightpaths."""
        reused_ids = tuple(lightpath_ids)
        return cls(
            success=True,
            lightpaths_groomed=reused_ids,
            total_bandwidth_allocated_gbps=bandwidth_gbps,
            is_groomed=True,
            grooming_result=grooming_result,
        )

    @classmethod
    def success_partial_groom(
        cls,
        groomed_ids: list[int],
        new_lightpath_id: int,
        total_bandwidth: int,
        grooming_result: GroomingResult | None = None,
        route_result: RouteResult | None = None,
        spectrum_result: SpectrumResult | None = None,
        snr_result: SNRResult | None = None,
    ) -> AllocationResult:
        """Build the result for a request served by reused lightpaths plus a new one."""
        stage_outputs = {
            "grooming_result": grooming_result,
            "route_result": route_result,
            "spectrum_result": spectrum_result,
            "snr_result": snr_result,
        }
        return cls(
            success=True,
            lightpaths_created=(new_lightpath_id,),
            lightpaths_groomed=tuple(groomed_ids),
            total_bandwidth_allocated_gbps=total_bandwidth,
            is_partially_groomed=True,
            **stage_outputs,
        )

    @classmethod
    def success_sliced(
        cls,
        lightpath_ids: list[int],
        bandwidth_gbps: int,
        slicing_result: SlicingResult | None = None,
        route_result: RouteResult | None = None,
    ) -> AllocationResult:
        """Build the result for a request split over several new lightpaths."""
        slice_ids = tuple(lightpath_ids)
        return cls(
            success=True,
            lightpaths_created=slice_ids,
            total_bandwidth_allocated_gbps=bandwidth_gbps,
            is_sliced=True,
            slicing_result=slicing_result,
            route_result=route_result,
        )