Pipeline Protocols

This document provides detailed documentation for each pipeline protocol defined in fusion/interfaces/pipelines.py and fusion/interfaces/control_policy.py.

RoutingPipeline

Location:

fusion/interfaces/pipelines.py

Purpose:

Find candidate routes between source and destination nodes

The RoutingPipeline protocol defines how routing algorithms expose their functionality to the orchestrator.

Protocol Definition

@runtime_checkable
class RoutingPipeline(Protocol):
    def find_routes(
        self,
        source: str,
        destination: str,
        bandwidth_gbps: int,
        network_state: NetworkState,
        *,
        forced_path: list[str] | None = None,
    ) -> RouteResult:
        ...

Parameters

Parameter

Type

Description

source

str

Source node identifier

destination

str

Destination node identifier

bandwidth_gbps

int

Required bandwidth (used for modulation selection)

network_state

NetworkState

Current network state (topology, configuration)

forced_path

list[str] | None

If provided, use this path instead of searching (for partial grooming)

Return Value

Returns a RouteResult (from fusion.domain.results) containing:

  • paths: Candidate paths as tuples of node IDs

  • weights_km: Path distances in kilometers

  • modulations: Valid modulation formats per path

  • strategy_name: Name of routing algorithm used

Returns an empty RouteResult (is_empty=True) if no routes found.

Implementation Notes

  • Number of paths is limited by network_state.config.k_paths

  • Modulation formats are filtered by reach based on path weight

  • Must NOT modify network_state (pure query method)

  • Must NOT store network_state as instance attribute

Example Implementation

from fusion.domain.network_state import NetworkState
from fusion.domain.results import RouteResult

class KShortestPathRouting:
    """K-shortest paths routing implementation."""

    def find_routes(
        self,
        source: str,
        destination: str,
        bandwidth_gbps: int,
        network_state: NetworkState,
        *,
        forced_path: list[str] | None = None,
    ) -> RouteResult:
        if forced_path is not None:
            # Use forced path from grooming
            paths = (tuple(forced_path),)
            weights = (self._calculate_weight(forced_path, network_state),)
        else:
            # Find k-shortest paths
            k = network_state.config.k_paths
            paths, weights = self._find_k_shortest(
                network_state.topology, source, destination, k
            )

        # Filter modulations by reach
        modulations = self._get_valid_modulations(weights, bandwidth_gbps)

        return RouteResult(
            paths=paths,
            weights_km=weights,
            modulations=modulations,
            strategy_name="KShortestPath",
        )

SpectrumPipeline

Location:

fusion/interfaces/pipelines.py

Purpose:

Find available spectrum slots along a path

The SpectrumPipeline protocol defines how spectrum assignment algorithms find free spectrum for lightpath establishment.

Protocol Definition

@runtime_checkable
class SpectrumPipeline(Protocol):
    def find_spectrum(
        self,
        path: list[str],
        modulation: str | list[str],
        bandwidth_gbps: int,
        network_state: NetworkState,
        *,
        connection_index: int | None = None,
        path_index: int = 0,
        use_dynamic_slicing: bool = False,
        snr_bandwidth: int | None = None,
        request_id: int | None = None,
        slice_bandwidth: int | None = None,
    ) -> SpectrumResult:
        ...

    def find_protected_spectrum(
        self,
        primary_path: list[str],
        backup_path: list[str],
        modulation: str,
        bandwidth_gbps: int,
        network_state: NetworkState,
    ) -> SpectrumResult:
        ...

Key Parameters

Parameter

Type

Description

path

list[str]

Ordered list of node IDs forming the route

modulation

str | list[str]

Modulation format(s) to try

bandwidth_gbps

int

Required bandwidth in Gbps

network_state

NetworkState

Current network state

use_dynamic_slicing

bool

If True, may return partial allocation

Return Value

Returns a SpectrumResult containing:

  • is_free: True if spectrum was found

  • start_slot, end_slot: Slot range (if found)

  • core: Core index (if found)

  • band: Band identifier (if found)

  • modulation: Confirmed modulation format

  • slots_needed: Number of slots required

Implementation Notes

  • Does NOT allocate spectrum (caller uses NetworkState.create_lightpath())

  • Searches across all bands and cores based on allocation policy

  • slots_needed includes guard band slots

GroomingPipeline

Location:

fusion/interfaces/pipelines.py

Purpose:

Pack requests onto existing lightpaths with available capacity

Protocol Definition

@runtime_checkable
class GroomingPipeline(Protocol):
    def try_groom(
        self,
        request: Request,
        network_state: NetworkState,
    ) -> GroomingResult:
        ...

    def rollback_groom(
        self,
        request: Request,
        lightpath_ids: list[int],
        network_state: NetworkState,
    ) -> None:
        ...

Return Value

Returns a GroomingResult containing:

  • fully_groomed: True if entire request was groomed

  • partially_groomed: True if some bandwidth was groomed

  • bandwidth_groomed_gbps: Amount successfully groomed

  • remaining_bandwidth_gbps: Amount still needing allocation

  • lightpaths_used: IDs of lightpaths used

  • forced_path: Suggested path for remainder (if partial)

Implementation Notes

  • MAY have side effects (modifies lightpath bandwidth allocations)

  • Must support rollback via rollback_groom()

  • Does NOT modify spectrum (lightpaths already exist)

SNRPipeline

Location:

fusion/interfaces/pipelines.py

Purpose:

Validate signal-to-noise ratio meets modulation requirements

Protocol Definition

@runtime_checkable
class SNRPipeline(Protocol):
    def validate(
        self,
        lightpath: Lightpath,
        network_state: NetworkState,
    ) -> SNRResult:
        ...

    def recheck_affected(
        self,
        new_lightpath_id: int,
        network_state: NetworkState,
        *,
        affected_range_slots: int = 5,
        slicing_flag: bool = False,
    ) -> SNRRecheckResult:
        ...

Return Values

validate() returns SNRResult:

  • passed: True if SNR meets threshold

  • snr_db: Measured/calculated SNR value

  • required_snr_db: Threshold for modulation

  • margin_db: SNR margin above threshold

  • failure_reason: Why it failed (if applicable)

recheck_affected() returns SNRRecheckResult:

  • all_pass: True if all affected lightpaths still pass

  • degraded_lightpath_ids: Lightpaths now failing

  • violations: Dict mapping lightpath_id to SNR shortfall

Implementation Notes

  • Both methods are pure query methods (no side effects)

  • recheck_affected() identifies existing lightpaths degraded by new allocation

  • Used to trigger rollback if existing lightpaths become invalid

SlicingPipeline

Location:

fusion/interfaces/pipelines.py

Purpose:

Divide large requests into multiple smaller allocations

Protocol Definition

@runtime_checkable
class SlicingPipeline(Protocol):
    def try_slice(
        self,
        request: Request,
        path: list[str],
        modulation: str,
        bandwidth_gbps: int,
        network_state: NetworkState,
        *,
        max_slices: int | None = None,
        spectrum_pipeline: SpectrumPipeline | None = None,
        snr_pipeline: SNRPipeline | None = None,
        connection_index: int | None = None,
        path_index: int = 0,
        snr_accumulator: list[float] | None = None,
        path_weight: float | None = None,
    ) -> SlicingResult:
        ...

    def rollback_slices(
        self,
        lightpath_ids: list[int],
        network_state: NetworkState,
    ) -> None:
        ...

Return Value

Returns a SlicingResult containing:

  • success: True if slicing is possible

  • num_slices: Number of slices needed

  • slice_bandwidth_gbps: Bandwidth per slice

  • lightpaths_created: IDs if allocated, empty for feasibility check

  • total_bandwidth_gbps: Total bandwidth allocated

Implementation Notes

  • Only called when single lightpath allocation fails

  • By default, just checks feasibility (doesn’t allocate)

  • When spectrum_pipeline and snr_pipeline provided, performs full allocation

  • Limited by max_slices or config.max_slices

ControlPolicy

Location:

fusion/interfaces/control_policy.py

Purpose:

Select actions for resource allocation from available options

The ControlPolicy protocol defines how decision-making policies (heuristics, RL agents, or supervised/unsupervised models) interface with the orchestrator.

Protocol Definition

@runtime_checkable
class ControlPolicy(Protocol):
    def select_action(
        self,
        request: Request,
        options: list[PathOption],
        network_state: NetworkState,
    ) -> int:
        ...

    def update(self, request: Request, action: int, reward: float) -> None:
        ...

    def get_name(self) -> str:
        ...

Methods

select_action()

Select which path option to use for the given request.

param request:

The incoming request to serve

param options:

List of PathOption objects with feasibility info

param network_state:

Current network state (read-only)

return:

Path index (0 to len-1), or -1 if no valid action

Note

Policies MUST only return indices where options[i].is_feasible is True. Returning an infeasible index is undefined behavior.

update()

Update policy based on experience (for online learning).

param request:

The request that was served

param action:

The action (path index) that was taken

param reward:

The reward received

Note

  • Heuristic policies: implement as pass

  • RL policies: update replay buffers, internal state

  • Supervised/unsupervised policies: implement as pass (inference only)

get_name()

Return the policy name for logging and metrics.

return:

Human-readable policy name

Examples: "FirstFeasiblePolicy", "RLPolicy(PPO)", "SupervisedPolicy(pytorch)"

Example Implementations

Heuristic Policy (First Feasible)

class FirstFeasiblePolicy:
    """Select the first feasible path option."""

    def select_action(self, request, options, network_state) -> int:
        for i, opt in enumerate(options):
            if opt.is_feasible:
                return i
        return -1  # No feasible option

    def update(self, request, action, reward) -> None:
        pass  # Heuristic - no learning

    def get_name(self) -> str:
        return "FirstFeasiblePolicy"

RL Policy Wrapper

class RLControlPolicy:
    """Wrap an RL agent as a ControlPolicy."""

    def __init__(self, agent, observation_builder):
        self.agent = agent
        self.obs_builder = observation_builder

    def select_action(self, request, options, network_state) -> int:
        obs = self.obs_builder.build(request, options, network_state)
        action = self.agent.predict(obs)
        # Ensure action is feasible
        if 0 <= action < len(options) and options[action].is_feasible:
            return action
        # Fallback to first feasible
        for i, opt in enumerate(options):
            if opt.is_feasible:
                return i
        return -1

    def update(self, request, action, reward) -> None:
        self.agent.store_transition(action, reward)

    def get_name(self) -> str:
        return f"RLPolicy({self.agent.algorithm_name})"

Protocol Compliance Checklist

When implementing a pipeline protocol, verify:

Check

How to Verify

Method names match exactly

Compare with protocol definition

Parameter names match

Include all required and optional parameters

Parameter types match

Use same types or compatible subtypes

Return type matches

Return the exact result type from fusion.domain.results

No stored NetworkState

Receive state as parameter, never as self.state

Passes isinstance() check

assert isinstance(impl, ProtocolClass)

Passes mypy

Run mypy on your implementation

Testing Protocol Compliance

from fusion.interfaces import RoutingPipeline

def test_my_router_satisfies_protocol():
    """Verify implementation satisfies RoutingPipeline protocol."""
    router = MyCustomRouter()

    # Runtime check
    assert isinstance(router, RoutingPipeline)

    # Call with minimal arguments
    result = router.find_routes(
        source="0",
        destination="5",
        bandwidth_gbps=100,
        network_state=mock_state,
    )

    # Verify return type
    from fusion.domain.results import RouteResult
    assert isinstance(result, RouteResult)