Pipeline Protocols
This document provides detailed documentation for each pipeline protocol defined
in fusion/interfaces/pipelines.py and fusion/interfaces/control_policy.py.
RoutingPipeline
- Location: fusion/interfaces/pipelines.py
- Purpose: Find candidate routes between source and destination nodes
The RoutingPipeline protocol defines how routing algorithms expose their
functionality to the orchestrator.
Protocol Definition
@runtime_checkable
class RoutingPipeline(Protocol):
    def find_routes(
        self,
        source: str,
        destination: str,
        bandwidth_gbps: int,
        network_state: NetworkState,
        *,
        forced_path: list[str] | None = None,
    ) -> RouteResult:
        ...
Parameters
| Parameter | Type | Description |
|---|---|---|
| source | str | Source node identifier |
| destination | str | Destination node identifier |
| bandwidth_gbps | int | Required bandwidth (used for modulation selection) |
| network_state | NetworkState | Current network state (topology, configuration) |
| forced_path | list[str] or None | If provided, use this path instead of searching (for partial grooming) |
Return Value
Returns a RouteResult (from fusion.domain.results) containing:
- paths: Candidate paths as tuples of node IDs
- weights_km: Path distances in kilometers
- modulations: Valid modulation formats per path
- strategy_name: Name of routing algorithm used
Returns an empty RouteResult (is_empty=True) if no routes are found.
Implementation Notes
- Number of paths is limited by network_state.config.k_paths
- Modulation formats are filtered by reach based on path weight
- Must NOT modify network_state (pure query method)
- Must NOT store network_state as an instance attribute
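The reach filtering noted above can be sketched as follows. This is a minimal illustration, not the FUSION implementation: the `REACH_KM` table, its values, and the helper name `valid_modulations` are assumptions made for this example.

```python
# Hypothetical maximum reach per modulation format, in km (illustrative values only).
REACH_KM = {"QPSK": 2000.0, "16-QAM": 1000.0, "64-QAM": 500.0}


def valid_modulations(weights_km: tuple[float, ...]) -> tuple[tuple[str, ...], ...]:
    """Return, for each path, the formats whose reach covers the path length."""
    return tuple(
        tuple(name for name, reach in REACH_KM.items() if weight <= reach)
        for weight in weights_km
    )


# One entry per candidate path, in the same order as the path weights.
per_path = valid_modulations((400.0, 1500.0, 2500.0))
```

A path longer than every reach gets an empty tuple, so a caller can tell that no format is usable on it.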
Example Implementation
from fusion.domain.network_state import NetworkState
from fusion.domain.results import RouteResult


class KShortestPathRouting:
    """K-shortest paths routing implementation."""

    # The helpers _calculate_weight, _find_k_shortest and
    # _get_valid_modulations are omitted from this example.

    def find_routes(
        self,
        source: str,
        destination: str,
        bandwidth_gbps: int,
        network_state: NetworkState,
        *,
        forced_path: list[str] | None = None,
    ) -> RouteResult:
        if forced_path is not None:
            # Use forced path from grooming
            paths = (tuple(forced_path),)
            weights = (self._calculate_weight(forced_path, network_state),)
        else:
            # Find k-shortest paths
            k = network_state.config.k_paths
            paths, weights = self._find_k_shortest(
                network_state.topology, source, destination, k
            )

        # Filter modulations by reach
        modulations = self._get_valid_modulations(weights, bandwidth_gbps)

        return RouteResult(
            paths=paths,
            weights_km=weights,
            modulations=modulations,
            strategy_name="KShortestPath",
        )
SpectrumPipeline
- Location: fusion/interfaces/pipelines.py
- Purpose: Find available spectrum slots along a path
The SpectrumPipeline protocol defines how spectrum assignment algorithms
find free spectrum for lightpath establishment.
Protocol Definition
@runtime_checkable
class SpectrumPipeline(Protocol):
    def find_spectrum(
        self,
        path: list[str],
        modulation: str | list[str],
        bandwidth_gbps: int,
        network_state: NetworkState,
        *,
        connection_index: int | None = None,
        path_index: int = 0,
        use_dynamic_slicing: bool = False,
        snr_bandwidth: int | None = None,
        request_id: int | None = None,
        slice_bandwidth: int | None = None,
    ) -> SpectrumResult:
        ...

    def find_protected_spectrum(
        self,
        primary_path: list[str],
        backup_path: list[str],
        modulation: str,
        bandwidth_gbps: int,
        network_state: NetworkState,
    ) -> SpectrumResult:
        ...
Key Parameters
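| Parameter | Type | Description |
|---|---|---|
| path | list[str] | Ordered list of node IDs forming the route |
| modulation | str or list[str] | Modulation format(s) to try |
| bandwidth_gbps | int | Required bandwidth in Gbps |
| network_state | NetworkState | Current network state |
| use_dynamic_slicing | bool | If True, may return partial allocation |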
Return Value
Returns a SpectrumResult containing:
- is_free: True if spectrum was found
- start_slot, end_slot: Slot range (if found)
- core: Core index (if found)
- band: Band identifier (if found)
- modulation: Confirmed modulation format
- slots_needed: Number of slots required
Implementation Notes
- Does NOT allocate spectrum (caller uses NetworkState.create_lightpath())
- Searches across all bands and cores based on allocation policy
- slots_needed includes guard band slots
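A read-only search of this kind might look like the first-fit sketch below. It covers a single link, core, and band only, and it is not the FUSION algorithm: `SlotSearchResult`, `first_fit`, the boolean occupancy list, and the exclusive `end_slot` convention are all assumptions made for illustration.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class SlotSearchResult:
    """Hypothetical stand-in for a few SpectrumResult fields."""

    is_free: bool
    start_slot: int | None = None
    end_slot: int | None = None  # exclusive in this sketch
    slots_needed: int = 0


def first_fit(occupied: list[bool], data_slots: int, guard_slots: int = 1) -> SlotSearchResult:
    """Find the first contiguous free run; nothing is allocated or modified."""
    slots_needed = data_slots + guard_slots  # guard band is counted in slots_needed
    run_start, run_length = 0, 0
    for index, busy in enumerate(occupied):
        if busy:
            # A busy slot ends the current run; the next run starts after it.
            run_start, run_length = index + 1, 0
            continue
        run_length += 1
        if run_length == slots_needed:
            return SlotSearchResult(True, run_start, run_start + slots_needed, slots_needed)
    return SlotSearchResult(False, slots_needed=slots_needed)


link = [True, False, False, True, False, False, False, False]
found = first_fit(link, data_slots=3)
blocked = first_fit(link, data_slots=5)
```

Because the function only reads `occupied`, the caller stays responsible for committing the allocation, which matches the query-only contract described above.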
GroomingPipeline
- Location: fusion/interfaces/pipelines.py
- Purpose: Pack requests onto existing lightpaths with available capacity
Protocol Definition
@runtime_checkable
class GroomingPipeline(Protocol):
    def try_groom(
        self,
        request: Request,
        network_state: NetworkState,
    ) -> GroomingResult:
        ...

    def rollback_groom(
        self,
        request: Request,
        lightpath_ids: list[int],
        network_state: NetworkState,
    ) -> None:
        ...
Return Value
Returns a GroomingResult containing:
- fully_groomed: True if entire request was groomed
- partially_groomed: True if some bandwidth was groomed
- bandwidth_groomed_gbps: Amount successfully groomed
- remaining_bandwidth_gbps: Amount still needing allocation
- lightpaths_used: IDs of lightpaths used
- forced_path: Suggested path for remainder (if partial)
Implementation Notes
- MAY have side effects (modifies lightpath bandwidth allocations)
- Must support rollback via rollback_groom()
- Does NOT modify spectrum (lightpaths already exist)
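The side-effect-plus-rollback contract can be illustrated with a small sketch. The `LightpathStub` class, the plain dict standing in for the network state, and the integer request IDs are hypothetical simplifications; the real Request, NetworkState, and GroomingResult types are not modeled here.

```python
from dataclasses import dataclass, field


@dataclass
class LightpathStub:
    """Hypothetical lightpath: total capacity plus per-request usage."""

    capacity_gbps: int
    allocations: dict[int, int] = field(default_factory=dict)  # request_id -> Gbps

    @property
    def spare_gbps(self) -> int:
        return self.capacity_gbps - sum(self.allocations.values())


def try_groom(
    request_id: int, bandwidth_gbps: int, lightpaths: dict[int, LightpathStub]
) -> tuple[int, list[int]]:
    """Pack bandwidth onto spare capacity; return (groomed Gbps, lightpath IDs used)."""
    remaining = bandwidth_gbps
    used: list[int] = []
    for lightpath_id, lightpath in lightpaths.items():
        take = min(lightpath.spare_gbps, remaining)
        if take > 0:
            lightpath.allocations[request_id] = take  # side effect on shared state
            used.append(lightpath_id)
            remaining -= take
    return bandwidth_gbps - remaining, used


def rollback_groom(
    request_id: int, lightpath_ids: list[int], lightpaths: dict[int, LightpathStub]
) -> None:
    """Undo the request's bandwidth on the given lightpaths; spectrum is untouched."""
    for lightpath_id in lightpath_ids:
        lightpaths[lightpath_id].allocations.pop(request_id, None)


lightpaths = {1: LightpathStub(100, {7: 60}), 2: LightpathStub(100)}
groomed, used = try_groom(42, 100, lightpaths)
spare_after_groom = [lp.spare_gbps for lp in lightpaths.values()]
rollback_groom(42, used, lightpaths)
spare_after_rollback = [lp.spare_gbps for lp in lightpaths.values()]
```

Keeping the per-request record on the lightpath rather than on the groomer means rollback needs only the request and the lightpath IDs, as in the protocol signature.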
SNRPipeline
- Location: fusion/interfaces/pipelines.py
- Purpose: Validate signal-to-noise ratio meets modulation requirements
Protocol Definition
@runtime_checkable
class SNRPipeline(Protocol):
    def validate(
        self,
        lightpath: Lightpath,
        network_state: NetworkState,
    ) -> SNRResult:
        ...

    def recheck_affected(
        self,
        new_lightpath_id: int,
        network_state: NetworkState,
        *,
        affected_range_slots: int = 5,
        slicing_flag: bool = False,
    ) -> SNRRecheckResult:
        ...
Return Values
validate() returns SNRResult:
- passed: True if SNR meets threshold
- snr_db: Measured/calculated SNR value
- required_snr_db: Threshold for modulation
- margin_db: SNR margin above threshold
- failure_reason: Why it failed (if applicable)
recheck_affected() returns SNRRecheckResult:
- all_pass: True if all affected lightpaths still pass
- degraded_lightpath_ids: Lightpaths now failing
- violations: Dict mapping lightpath_id to SNR shortfall
Implementation Notes
- Both methods are pure query methods (no side effects)
- recheck_affected() identifies existing lightpaths degraded by new allocation
- Used to trigger rollback if existing lightpaths become invalid
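The relationship between the result fields can be sketched as below, taking the margin as measured SNR minus the required SNR and the shortfall as the negated margin of a failing lightpath. The threshold table, its values, and the names `SnrCheck`, `check_snr`, and `recheck` are assumptions for illustration; the sketch does not compute SNR from the physical layer.

```python
from dataclasses import dataclass

# Hypothetical SNR thresholds per modulation format, in dB (illustrative values only).
REQUIRED_SNR_DB = {"QPSK": 8.5, "16-QAM": 15.0}


@dataclass(frozen=True)
class SnrCheck:
    """Hypothetical stand-in for a few SNRResult fields."""

    passed: bool
    snr_db: float
    required_snr_db: float
    margin_db: float


def check_snr(snr_db: float, modulation: str) -> SnrCheck:
    """Compare a given SNR value with the threshold of its modulation format."""
    required = REQUIRED_SNR_DB[modulation]
    margin = snr_db - required
    return SnrCheck(margin >= 0.0, snr_db, required, margin)


def recheck(snr_by_lightpath: dict[int, tuple[float, str]]) -> dict[int, float]:
    """Map each failing lightpath ID to its SNR shortfall in dB."""
    violations: dict[int, float] = {}
    for lightpath_id, (snr_db, modulation) in snr_by_lightpath.items():
        result = check_snr(snr_db, modulation)
        if not result.passed:
            violations[lightpath_id] = -result.margin_db
    return violations


ok = check_snr(16.0, "16-QAM")
violations = recheck({1: (9.0, "QPSK"), 2: (14.0, "16-QAM")})
all_pass = not violations
```

An orchestrator could treat a non-empty `violations` mapping as the signal to roll back the new allocation.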
SlicingPipeline
- Location: fusion/interfaces/pipelines.py
- Purpose: Divide large requests into multiple smaller allocations
Protocol Definition
@runtime_checkable
class SlicingPipeline(Protocol):
    def try_slice(
        self,
        request: Request,
        path: list[str],
        modulation: str,
        bandwidth_gbps: int,
        network_state: NetworkState,
        *,
        max_slices: int | None = None,
        spectrum_pipeline: SpectrumPipeline | None = None,
        snr_pipeline: SNRPipeline | None = None,
        connection_index: int | None = None,
        path_index: int = 0,
        snr_accumulator: list[float] | None = None,
        path_weight: float | None = None,
    ) -> SlicingResult:
        ...

    def rollback_slices(
        self,
        lightpath_ids: list[int],
        network_state: NetworkState,
    ) -> None:
        ...
Return Value
Returns a SlicingResult containing:
- success: True if slicing is possible
- num_slices: Number of slices needed
- slice_bandwidth_gbps: Bandwidth per slice
- lightpaths_created: IDs if allocated, empty for feasibility check
- total_bandwidth_gbps: Total bandwidth allocated
Implementation Notes
- Only called when single lightpath allocation fails
- By default, just checks feasibility (does not allocate)
- When spectrum_pipeline and snr_pipeline are provided, performs full allocation
- Limited by max_slices or config.max_slices
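The feasibility-only mode could be as simple as the arithmetic below. This sketch assumes equal-sized slices of a fixed bandwidth, which the protocol does not specify; `plan_slices` and its parameters are hypothetical names.

```python
def plan_slices(
    bandwidth_gbps: int, slice_bandwidth_gbps: int, max_slices: int
) -> tuple[bool, int]:
    """Return (feasible, num_slices) without allocating anything."""
    # Ceiling division in integers: slices needed to cover the full request.
    num_slices = -(-bandwidth_gbps // slice_bandwidth_gbps)
    return num_slices <= max_slices, num_slices


fits = plan_slices(400, 100, max_slices=4)
too_many = plan_slices(450, 100, max_slices=4)
```

A request that is not a multiple of the slice size needs one extra slice, which is what pushes the second call over the limit.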
ControlPolicy
- Location: fusion/interfaces/control_policy.py
- Purpose: Select actions for resource allocation from available options
The ControlPolicy protocol defines how decision-making policies (heuristics,
RL agents, or supervised/unsupervised models) interface with the orchestrator.
Protocol Definition
@runtime_checkable
class ControlPolicy(Protocol):
    def select_action(
        self,
        request: Request,
        options: list[PathOption],
        network_state: NetworkState,
    ) -> int:
        ...

    def update(self, request: Request, action: int, reward: float) -> None:
        ...

    def get_name(self) -> str:
        ...
Methods
select_action()
Select which path option to use for the given request.
- param request: The incoming request to serve
- param options: List of PathOption objects with feasibility info
- param network_state: Current network state (read-only)
- return: Path index (0 to len(options) - 1), or -1 if no valid action
Note
Policies MUST only return indices where options[i].is_feasible is True.
Returning an infeasible index is undefined behavior.
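Since an infeasible index is undefined behavior, a caller or test harness may want to check it explicitly. The guard below is a sketch of such a check, not part of the protocol; `OptionStub` and `checked_action` are hypothetical names, and only the `is_feasible` attribute comes from the text above.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class OptionStub:
    """Hypothetical stand-in exposing only PathOption.is_feasible."""

    is_feasible: bool


def checked_action(action: int, options: list[OptionStub]) -> int:
    """Return the action unchanged if it is legal, otherwise raise ValueError."""
    if action == -1:
        return action  # the "no valid action" sentinel is always accepted here
    if not 0 <= action < len(options):
        raise ValueError(f"action {action} is out of range for {len(options)} options")
    if not options[action].is_feasible:
        raise ValueError(f"action {action} refers to an infeasible option")
    return action


options = [OptionStub(False), OptionStub(True)]
accepted = checked_action(1, options)
sentinel = checked_action(-1, options)
```

Wrapping `select_action()` results this way turns a silent contract violation into an immediate error during development.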
update()
Update policy based on experience (for online learning).
- param request:
The request that was served
- param action:
The action (path index) that was taken
- param reward:
The reward received
Note
- Heuristic policies: implement as pass
- RL policies: update replay buffers, internal state
- Supervised/unsupervised policies: implement as pass (inference only)
get_name()
Return the policy name for logging and metrics.
- return: Human-readable policy name
Examples: "FirstFeasiblePolicy", "RLPolicy(PPO)", "SupervisedPolicy(pytorch)"
Example Implementations
Heuristic Policy (First Feasible)
class FirstFeasiblePolicy:
    """Select the first feasible path option."""

    def select_action(self, request, options, network_state) -> int:
        for i, opt in enumerate(options):
            if opt.is_feasible:
                return i
        return -1  # No feasible option

    def update(self, request, action, reward) -> None:
        pass  # Heuristic - no learning

    def get_name(self) -> str:
        return "FirstFeasiblePolicy"
RL Policy Wrapper
class RLControlPolicy:
    """Wrap an RL agent as a ControlPolicy."""

    def __init__(self, agent, observation_builder):
        self.agent = agent
        self.obs_builder = observation_builder

    def select_action(self, request, options, network_state) -> int:
        obs = self.obs_builder.build(request, options, network_state)
        action = self.agent.predict(obs)

        # Ensure action is feasible
        if 0 <= action < len(options) and options[action].is_feasible:
            return action

        # Fallback to first feasible
        for i, opt in enumerate(options):
            if opt.is_feasible:
                return i
        return -1

    def update(self, request, action, reward) -> None:
        self.agent.store_transition(action, reward)

    def get_name(self) -> str:
        return f"RLPolicy({self.agent.algorithm_name})"
Protocol Compliance Checklist
When implementing a pipeline protocol, verify:
| Check | How to Verify |
|---|---|
| Method names match exactly | Compare with protocol definition |
| Parameter names match | Include all required and optional parameters |
| Parameter types match | Use same types or compatible subtypes |
| Return type matches | Return the exact result type the protocol declares |
| No stored NetworkState | Receive state as parameter, never as an instance attribute |
| Passes isinstance() check | Check the instance against the runtime_checkable protocol class |
| Passes mypy | Run mypy on the implementation |
Testing Protocol Compliance
from fusion.domain.results import RouteResult
from fusion.interfaces import RoutingPipeline


def test_my_router_satisfies_protocol():
    """Verify implementation satisfies RoutingPipeline protocol."""
    # MyCustomRouter and mock_state are assumed to be provided by the test module.
    router = MyCustomRouter()

    # Runtime check
    assert isinstance(router, RoutingPipeline)

    # Call with minimal arguments
    result = router.find_routes(
        source="0",
        destination="5",
        bandwidth_gbps=100,
        network_state=mock_state,
    )

    # Verify return type
    assert isinstance(result, RouteResult)
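One caveat about the runtime check: for a `typing.runtime_checkable` protocol, `isinstance()` only verifies that the required methods exist, not that their signatures or return types match. The standalone sketch below demonstrates this with a cut-down protocol; `Router`, `WrongSignature`, and `MissingMethod` are hypothetical names used only for this example.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class Router(Protocol):
    """Cut-down stand-in for a pipeline protocol."""

    def find_routes(self, source: str, destination: str) -> tuple: ...


class WrongSignature:
    def find_routes(self):  # takes no arguments, yet the method name matches
        return ()


class MissingMethod:
    pass


# Only the presence of find_routes is checked at runtime.
accepts_wrong_signature = isinstance(WrongSignature(), Router)
accepts_missing_method = isinstance(MissingMethod(), Router)
```

This is why the checklist pairs the `isinstance()` check with mypy and with an actual call: the static checker and the call catch signature mismatches that the runtime check lets through.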