Source code for fusion.policies.heuristic_policy

"""
Heuristic policies for path selection.

This module provides deterministic, rule-based path selection strategies
implementing the ControlPolicy protocol. Available policies:

- FirstFeasiblePolicy: Select first feasible path (K-shortest first fit)
- ShortestFeasiblePolicy: Select shortest feasible path by distance
- LeastCongestedPolicy: Select least congested feasible path
- RandomFeasiblePolicy: Randomly select among feasible paths
- LoadBalancedPolicy: Balance path length and congestion

All policies implement the ControlPolicy protocol but do not learn
from experience (update() is a no-op).

Example:
    >>> from fusion.policies.heuristic_policy import ShortestFeasiblePolicy
    >>> policy = ShortestFeasiblePolicy()
    >>> action = policy.select_action(request, options, network_state)

Note:
    Heuristic policies are typically used as:
    - Default selection strategies
    - Baselines for RL/ML comparison
    - Fallback when ML models fail
"""

from __future__ import annotations

from abc import ABC, abstractmethod
from typing import TYPE_CHECKING

import numpy as np

if TYPE_CHECKING:
    from fusion.domain.network_state import NetworkState
    from fusion.domain.request import Request
    from fusion.modules.rl.adapter import PathOption


[docs] class HeuristicPolicy(ABC): """ Abstract base class for heuristic path selection policies. Heuristic policies are deterministic (except RandomFeasiblePolicy), rule-based strategies that select paths without learning. They serve as: 1. Default selection strategies 2. Baselines for RL/ML comparison 3. Fallback policies when ML models fail All subclasses must implement select_action(). The update() method is a no-op since heuristics don't learn. This class implements the ControlPolicy protocol. """
[docs] @abstractmethod def select_action( self, request: Request, options: list[PathOption], network_state: NetworkState, ) -> int: """ Select an action (path index) based on heuristic rule. :param request: The incoming request (may be used for context). :type request: Request :param options: List of available PathOptions. :type options: list[PathOption] :param network_state: Current network state (read-only). :type network_state: NetworkState :return: Selected path index (0 to len(options)-1), or -1 if none valid. :rtype: int """ ...
[docs] def update( # noqa: B027 self, request: Request, action: int, reward: float ) -> None: """ Update policy based on experience. Heuristic policies do not learn, so this is a no-op. This is intentionally not abstract - subclasses inherit this no-op. :param request: The request that was served (ignored). :type request: Request :param action: The action taken (ignored). :type action: int :param reward: The reward received (ignored). :type reward: float """ pass
[docs] def get_name(self) -> str: """ Return the policy name for logging. :return: Policy class name. :rtype: str """ return self.__class__.__name__
def _get_feasible_options( self, options: list[PathOption], ) -> list[PathOption]: """ Filter options to only feasible ones. :param options: List of path options. :type options: list[PathOption] :return: Filtered list containing only feasible options. :rtype: list[PathOption] """ return [opt for opt in options if opt.is_feasible]
[docs] class FirstFeasiblePolicy(HeuristicPolicy): """ Select the first feasible path in index order. This is the simplest heuristic. It iterates through options and returns the first path where is_feasible=True. Equivalent to "K-Shortest Path First Fit" when paths are pre-sorted by length. Selection Logic: 1. Iterate through options in index order 2. Return first option where is_feasible=True 3. Return -1 if no feasible option exists Time Complexity: O(n) worst case, O(1) best case Space Complexity: O(1) """
[docs] def select_action( self, request: Request, options: list[PathOption], network_state: NetworkState, ) -> int: """Select the first feasible path.""" for opt in options: if opt.is_feasible: return opt.path_index return -1
[docs] class ShortestFeasiblePolicy(HeuristicPolicy): """ Select the shortest feasible path by distance. Finds all feasible paths and selects the one with minimum weight_km (path length in kilometers). Selection Logic: 1. Filter to feasible options 2. Find option with minimum weight_km 3. Return its path_index, or -1 if none feasible Tie Breaking: First occurrence when tied on weight_km. Time Complexity: O(n) Space Complexity: O(n) for feasible list """
[docs] def select_action( self, request: Request, options: list[PathOption], network_state: NetworkState, ) -> int: """Select the shortest feasible path by weight_km.""" feasible = self._get_feasible_options(options) if not feasible: return -1 shortest = min(feasible, key=lambda opt: opt.weight_km) return shortest.path_index
[docs] class LeastCongestedPolicy(HeuristicPolicy): """ Select the least congested feasible path. Prioritizes paths with lower congestion values to distribute load across the network and reduce fragmentation. Selection Logic: 1. Filter to feasible options 2. Find option with minimum congestion (0.0 to 1.0) 3. Return its path_index, or -1 if none feasible Tie Breaking: First occurrence when tied on congestion. Time Complexity: O(n) Space Complexity: O(n) for feasible list """
[docs] def select_action( self, request: Request, options: list[PathOption], network_state: NetworkState, ) -> int: """Select the least congested feasible path.""" feasible = self._get_feasible_options(options) if not feasible: return -1 least_congested = min(feasible, key=lambda opt: opt.congestion) return least_congested.path_index
[docs] class RandomFeasiblePolicy(HeuristicPolicy): """ Randomly select among feasible paths. Uniformly samples from all feasible paths. Useful for: - Exploration during training - Baseline comparison (random performance) - Load distribution across multiple paths Uses numpy's random number generator with optional seed. Selection Logic: 1. Filter to feasible options 2. Uniformly sample one option 3. Return its path_index, or -1 if none feasible Time Complexity: O(n) Space Complexity: O(n) for feasible list :ivar _rng: Numpy random generator. :vartype _rng: numpy.random.Generator :ivar _seed: Original seed for reset. :vartype _seed: int | None """
[docs] def __init__(self, seed: int | None = None) -> None: """ Initialize with optional random seed. :param seed: Random seed for reproducibility. If None, uses system entropy (non-reproducible). :type seed: int | None """ self._seed = seed self._rng = np.random.default_rng(seed)
[docs] def select_action( self, request: Request, options: list[PathOption], network_state: NetworkState, ) -> int: """Randomly select a feasible path.""" feasible = self._get_feasible_options(options) if not feasible: return -1 # Use integer index to avoid numpy type issues idx = int(self._rng.integers(0, len(feasible))) return feasible[idx].path_index
[docs] def reset_rng(self, seed: int | None = None) -> None: """ Reset the random number generator. :param seed: New seed. If None, uses the original seed. :type seed: int | None """ if seed is None: seed = self._seed self._rng = np.random.default_rng(seed)
[docs] def get_name(self) -> str: """ Return policy name including seed. :return: Policy name with seed if set. :rtype: str """ if self._seed is not None: return f"RandomFeasiblePolicy(seed={self._seed})" return "RandomFeasiblePolicy"
[docs] class LoadBalancedPolicy(HeuristicPolicy): """ Select path balancing length and congestion. Combines path length (weight_km) and congestion into a weighted score:: score = alpha * normalized_length + (1 - alpha) * congestion Where normalized_length = weight_km / max_weight_km among feasible paths. Alpha Parameter: - 0.0: Pure congestion-based (same as LeastCongestedPolicy) - 0.5: Equal weight to length and congestion (default) - 1.0: Pure length-based (same as ShortestFeasiblePolicy) Selection Logic: 1. Filter to feasible options 2. Normalize weight_km to [0, 1] range 3. Compute weighted score for each option 4. Return option with minimum score Tie Breaking: First occurrence when tied on score. Time Complexity: O(n) Space Complexity: O(n) for feasible list :ivar _alpha: Weight for path length (0.0 to 1.0). :vartype _alpha: float """
[docs] def __init__(self, alpha: float = 0.5) -> None: """ Initialize with load balancing weight. :param alpha: Weight for path length (0.0 to 1.0). 0.0 = prioritize low congestion, 1.0 = prioritize short length, 0.5 = equal balance (default). :type alpha: float :raises ValueError: If alpha is not in [0, 1] range. """ if not 0.0 <= alpha <= 1.0: raise ValueError(f"alpha must be in [0, 1], got {alpha}") self._alpha = alpha
@property def alpha(self) -> float: """ Current alpha value. :return: The alpha weight parameter. :rtype: float """ return self._alpha
[docs] def select_action( self, request: Request, options: list[PathOption], network_state: NetworkState, ) -> int: """Select path with minimum weighted score.""" feasible = self._get_feasible_options(options) if not feasible: return -1 max_weight = max(opt.weight_km for opt in feasible) if max_weight == 0: max_weight = 1.0 # Avoid division by zero def compute_score(opt: PathOption) -> float: normalized_length = opt.weight_km / max_weight return self._alpha * normalized_length + (1 - self._alpha) * opt.congestion best = min(feasible, key=compute_score) return best.path_index
[docs] def get_name(self) -> str: """ Return policy name including alpha. :return: Policy name with alpha parameter. :rtype: str """ return f"LoadBalancedPolicy(alpha={self._alpha})"