RL Environments Module

At a Glance

Purpose:        Gymnasium-compatible RL environment for optical network path selection
Location:       fusion/modules/rl/environments/
Key Classes:    UnifiedSimEnv, ActionMaskWrapper, PathEncoder
Prerequisites:  Gymnasium, numpy; optionally SB3-contrib for MaskablePPO

This module provides UnifiedSimEnv, a Gymnasium-compatible environment that enables reinforcement learning agents to learn optimal path selection for optical network Routing and Spectrum Assignment (RSA).

Architecture Overview

+------------------------------------------------------------------+
|                    ENVIRONMENTS MODULE                           |
+------------------------------------------------------------------+
|                                                                  |
|  UnifiedSimEnv                ActionMaskWrapper    PathEncoder   |
|  --------------                ----------------    -----------   |
|  - Gymnasium env               - SB3 adapter       - Path to     |
|  - Configurable obs            - action_masks()      edge mask   |
|  - Action masking              - MaskablePPO       - GNN format  |
|  - Dual mode support             compatible                      |
|                                                                  |
|  Operating Modes:                                                |
|  ----------------                                                |
|  STANDALONE (testing)    <-->    WIRED (production)              |
|  - Synthetic requests            - Real SimulationEngine         |
|  - Random feasibility            - Actual spectrum checks        |
|  - No dependencies               - Full V4 stack                 |
|                                                                  |
+------------------------------------------------------------------+

Key Invariants

  1. Same Pipelines: Uses the same pipelines as the non-RL simulation (no forked code)

  2. Action Masking: Invalid actions indicated in info["action_mask"]

  3. Reproducibility: Deterministic episodes via seeding

  4. Dual Mode: Works standalone for testing or wired to full simulation

Quick Start: Standalone Mode

The simplest way to use UnifiedSimEnv needs no simulation stack, only Gymnasium and numpy:

from fusion.modules.rl.environments import UnifiedSimEnv

# Create environment (standalone mode - no simulation stack needed)
env = UnifiedSimEnv(num_requests=100)

# Standard Gymnasium loop
obs, info = env.reset(seed=42)
total_reward = 0.0

while True:
    # Get valid actions from mask
    action_mask = info["action_mask"]
    valid_actions = [i for i, valid in enumerate(action_mask) if valid]

    # Select a random valid action; fall back to 0 if every path is masked
    action = int(env.np_random.choice(valid_actions)) if valid_actions else 0

    # Take step
    obs, reward, terminated, truncated, info = env.step(action)
    total_reward += reward

    if terminated or truncated:
        break

print(f"Episode complete. Total reward: {total_reward}")

Understanding the Observation Space

UnifiedSimEnv uses a Dict observation space with configurable features. The default (obs_8) includes all available features:

from fusion.modules.rl.environments import UnifiedSimEnv

env = UnifiedSimEnv()

# Inspect observation space
for name, space in env.observation_space.spaces.items():
    print(f"{name}: shape={space.shape}, dtype={space.dtype}")

# Output:
# source: shape=(14,), dtype=float32
# destination: shape=(14,), dtype=float32
# request_bandwidth: shape=(4,), dtype=float32
# holding_time: shape=(1,), dtype=float32
# slots_needed: shape=(3,), dtype=float32
# path_lengths: shape=(3,), dtype=float32
# congestion: shape=(3,), dtype=float32
# available_slots: shape=(3,), dtype=float32
# is_feasible: shape=(3,), dtype=float32

Feature Descriptions

Feature            Shape              Description
-----------------  -----------------  ----------------------------------------------
source             (num_nodes,)       One-hot encoded source node
destination        (num_nodes,)       One-hot encoded destination node
request_bandwidth  (num_bw_classes,)  One-hot bandwidth class (10/40/100/400 Gbps)
holding_time       (1,)               Normalized request duration [0, 1]
slots_needed       (k_paths,)         Spectrum slots required per path
path_lengths       (k_paths,)         Hop count for each candidate path
congestion         (k_paths,)         Network utilization per path [0, 1]
available_slots    (k_paths,)         Free spectrum ratio per path [0, 1]
is_feasible        (k_paths,)         Binary: can this path accommodate the request?
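As a rough illustration of the request-level encodings in the table, the sketch below builds them by hand. The helper names, the ordering of the bandwidth classes, and the clipping of holding time at the maximum are assumptions made for this example; they are not taken from UnifiedSimEnv.

```python
# Illustrative only: helper names and class ordering are assumptions, not library code.
BANDWIDTH_CLASSES_GBPS = [10, 40, 100, 400]  # assumed ordering of the four classes


def one_hot(index: int, size: int) -> list[float]:
    """Return a vector of length `size` with 1.0 at `index` and 0.0 elsewhere."""
    if not 0 <= index < size:
        raise ValueError(f"index {index} out of range for size {size}")
    return [1.0 if i == index else 0.0 for i in range(size)]


def normalize_holding_time(holding_time: float, max_holding_time: float = 1000.0) -> float:
    """Scale a duration into [0, 1], clipping values that fall outside the range."""
    return min(max(holding_time / max_holding_time, 0.0), 1.0)


def encode_request(source, destination, bandwidth_gbps, holding_time, num_nodes=14):
    """Build the four request-level features as plain Python lists."""
    bw_index = BANDWIDTH_CLASSES_GBPS.index(bandwidth_gbps)
    return {
        "source": one_hot(source, num_nodes),
        "destination": one_hot(destination, num_nodes),
        "request_bandwidth": one_hot(bw_index, len(BANDWIDTH_CLASSES_GBPS)),
        "holding_time": [normalize_holding_time(holding_time)],
    }


obs = encode_request(source=2, destination=9, bandwidth_gbps=100, holding_time=250.0)
print(obs["request_bandwidth"])  # [0.0, 0.0, 1.0, 0.0]
print(obs["holding_time"])       # [0.25]
```

The path-level features (slots_needed through is_feasible) depend on network state and are left out of this sketch.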

Observation Space Configurations

Choose the observation level based on your needs:

Space  Features                      Use Case
-----  ----------------------------  ---------------------
obs_1  source, destination           Minimal routing
obs_2  + bandwidth                   Bandwidth-aware
obs_3  + holding_time                Time-sensitive
obs_4  source, dest, bw, time        Standard (good start)
obs_5  + slots_needed, path_lengths  Resource-aware
obs_6  + congestion                  Congestion-aware
obs_7  + available_slots             Availability-aware
obs_8  + is_feasible                 Complete (default)

from fusion.modules.rl.adapter import RLConfig
from fusion.modules.rl.environments import UnifiedSimEnv

# Use minimal observation space
config = RLConfig(obs_space="obs_1")
env = UnifiedSimEnv(config=config)

# Only source and destination in observation
obs, info = env.reset(seed=42)
print(obs.keys())  # dict_keys(['source', 'destination'])
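One plausible reading of the configuration table is sketched below as a plain mapping from level name to feature keys. This is an interpretation, not the library's definition: it assumes obs_2 and obs_3 each add one feature to obs_1, and that obs_5 through obs_8 each extend the previous level. The names OBS_FEATURES and select_features are hypothetical.

```python
# Assumed interpretation of the table; the authoritative mapping lives in the library.
BASE = ["source", "destination"]

OBS_FEATURES = {
    "obs_1": BASE,
    "obs_2": BASE + ["request_bandwidth"],
    "obs_3": BASE + ["holding_time"],
    "obs_4": BASE + ["request_bandwidth", "holding_time"],
}
# From obs_5 onward, each level is assumed to extend the one before it.
OBS_FEATURES["obs_5"] = OBS_FEATURES["obs_4"] + ["slots_needed", "path_lengths"]
OBS_FEATURES["obs_6"] = OBS_FEATURES["obs_5"] + ["congestion"]
OBS_FEATURES["obs_7"] = OBS_FEATURES["obs_6"] + ["available_slots"]
OBS_FEATURES["obs_8"] = OBS_FEATURES["obs_7"] + ["is_feasible"]


def select_features(full_obs: dict, obs_space: str) -> dict:
    """Keep only the keys that belong to the requested observation level."""
    return {name: full_obs[name] for name in OBS_FEATURES[obs_space]}


# Dummy full observation with one placeholder value per feature.
full_obs = {name: [0.0] for name in OBS_FEATURES["obs_8"]}
print(list(select_features(full_obs, "obs_1")))  # ['source', 'destination']
print(len(OBS_FEATURES["obs_8"]))                # 9
```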

Action Space and Masking

The action space is Discrete(k_paths) - the agent selects which candidate path to use for the current request.

Why Action Masking?

Not all paths are always feasible. A path may be infeasible if:

  • Insufficient contiguous spectrum slots

  • Path doesn’t exist for this source-destination pair

  • Physical constraints (e.g., SNR threshold)

Selecting an infeasible path results in a blocked request and a penalty. Action masking keeps the agent from selecting invalid actions in the first place, so it does not spend training time learning to avoid them.
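Conceptually, a masking-aware policy removes invalid actions from its action distribution before sampling. The sketch below shows that idea with a masked softmax in plain Python. It is a simplified illustration, not SB3-contrib's implementation, and masked_softmax is a name chosen for this example.

```python
import math


def masked_softmax(logits: list[float], mask: list[bool]) -> list[float]:
    """Softmax over `logits` that assigns zero probability to masked-out actions."""
    if len(logits) != len(mask):
        raise ValueError("logits and mask must have the same length")
    if not any(mask):
        raise ValueError("at least one action must be valid")
    # Subtract the largest valid logit for numerical stability.
    max_logit = max(l for l, ok in zip(logits, mask) if ok)
    exps = [math.exp(l - max_logit) if ok else 0.0 for l, ok in zip(logits, mask)]
    total = sum(exps)
    return [e / total for e in exps]


# The third path has the highest logit but is infeasible, so it gets probability 0.
probs = masked_softmax([2.0, 1.0, 3.0], [True, True, False])
print(probs)
```

Because the masked action never appears in the distribution, the agent cannot sample it, and no gradient signal is spent on learning to avoid it.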

Using Action Masks

from fusion.modules.rl.environments import UnifiedSimEnv

env = UnifiedSimEnv(num_requests=10)
obs, info = env.reset(seed=42)

# Action mask is a boolean array
action_mask = info["action_mask"]
print(f"Action mask: {action_mask}")  # e.g., [True, True, False]

# Only select from valid actions
valid_actions = [i for i, valid in enumerate(action_mask) if valid]

if valid_actions:
    action = valid_actions[0]  # Select first valid
else:
    action = 0  # Fallback (will be penalized)

obs, reward, terminated, truncated, info = env.step(action)

Training with MaskablePPO

For production training, use SB3-contrib’s MaskablePPO with the ActionMaskWrapper:

from sb3_contrib import MaskablePPO
from fusion.modules.rl.environments import UnifiedSimEnv, ActionMaskWrapper

# Create and wrap environment
env = UnifiedSimEnv(num_requests=1000)
wrapped = ActionMaskWrapper(env)

# Create MaskablePPO model
model = MaskablePPO(
    "MultiInputPolicy",  # Required for Dict observation space
    wrapped,
    verbose=1,
    n_steps=2048,
    batch_size=64,
    learning_rate=3e-4,
)

# Train
model.learn(total_timesteps=100_000)

# Evaluate
obs, info = wrapped.reset(seed=99)
total_reward = 0.0

while True:
    action, _ = model.predict(obs, action_masks=wrapped.action_masks())
    obs, reward, terminated, truncated, info = wrapped.step(int(action))
    total_reward += reward

    if terminated or truncated:
        break

print(f"Evaluation reward: {total_reward}")

ActionMaskWrapper Reference

The wrapper adapts UnifiedSimEnv for SB3’s MaskablePPO:

from fusion.modules.rl.environments import ActionMaskWrapper

wrapped = ActionMaskWrapper(env)

# Key method: returns current action mask
mask = wrapped.action_masks()  # np.ndarray of bools

# Used by MaskablePPO during predict()
action, _ = model.predict(obs, action_masks=wrapped.action_masks())
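To make the adapter idea concrete, here is a minimal sketch of a wrapper that caches the mask from info and exposes it through action_masks(). It is a hypothetical stand-in written for this example (MaskCachingWrapper and StubEnv are invented names) and does not reproduce the real ActionMaskWrapper.

```python
class MaskCachingWrapper:
    """Hypothetical stand-in showing the idea; not the real ActionMaskWrapper."""

    def __init__(self, env):
        self.env = env
        self._mask = None

    def reset(self, **kwargs):
        obs, info = self.env.reset(**kwargs)
        self._mask = list(info["action_mask"])  # remember the mask for the first request
        return obs, info

    def step(self, action):
        obs, reward, terminated, truncated, info = self.env.step(action)
        self._mask = list(info["action_mask"])  # refresh the mask for the next request
        return obs, reward, terminated, truncated, info

    def action_masks(self):
        if self._mask is None:
            raise RuntimeError("call reset() before action_masks()")
        return self._mask


class StubEnv:
    """Tiny fake environment with a fixed mask sequence, for demonstration only."""

    def __init__(self):
        self._masks = [[True, False, True], [False, True, True]]
        self._t = 0

    def reset(self, seed=None):
        self._t = 0
        return {}, {"action_mask": self._masks[0]}

    def step(self, action):
        self._t += 1
        done = self._t >= len(self._masks)
        mask = self._masks[min(self._t, len(self._masks) - 1)]
        return {}, 0.0, done, False, {"action_mask": mask}


wrapped = MaskCachingWrapper(StubEnv())
wrapped.reset(seed=0)
first_mask = wrapped.action_masks()
print(first_mask)   # [True, False, True]
wrapped.step(0)
second_mask = wrapped.action_masks()
print(second_mask)  # [False, True, True]
```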

GNN Observations

For graph neural network policies, enable GNN observation mode:

from fusion.modules.rl.adapter import RLConfig
from fusion.modules.rl.environments import UnifiedSimEnv

config = RLConfig(
    use_gnn_obs=True,
    num_nodes=14,
    k_paths=5,
)
env = UnifiedSimEnv(config=config)

obs, info = env.reset(seed=42)

# Additional GNN features available
print(obs.keys())
# Includes: edge_index, edge_attr, path_masks, node_features, adjacency

GNN Feature Reference

Feature        Shape                   Description
-------------  ----------------------  ------------------------------------------------
edge_index     (2, num_edges)          PyG format: [source_indices, target_indices]
edge_attr      (num_edges, 2)          Edge properties: [utilization, normalized_length]
path_masks     (k_paths, num_edges)    Binary mask: which edges are used by each path
node_features  (num_nodes, 4)          Node props: [util, degree, centrality, marker]
adjacency      (num_nodes, num_nodes)  Adjacency matrix (optional)
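The edge_index layout can be illustrated without any graph library. The sketch below expands a list of undirected links into two parallel index lists, one row for sources and one for targets. Storing both directions of each link is an assumption for this example, and build_edge_index is a hypothetical helper, not part of the module.

```python
def build_edge_index(links):
    """Expand undirected links into a PyG-style edge_index with both directions."""
    sources, targets = [], []
    for u, v in links:
        sources += [u, v]  # forward edge u->v, then reverse edge v->u
        targets += [v, u]
    return [sources, targets]


links = [(0, 1), (1, 2), (0, 2)]  # a three-node triangle
edge_index = build_edge_index(links)
print(edge_index)  # [[0, 1, 1, 2, 0, 2], [1, 0, 2, 1, 2, 0]]
```

Column i of the result describes one directed edge, so the shape is (2, num_edges) with num_edges equal to twice the number of undirected links.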

PathEncoder for Custom GNN Features

Convert path node sequences to edge masks:

from fusion.modules.rl.environments import PathEncoder

# Create encoder for a network
encoder = PathEncoder(num_nodes=14, num_edges=42)

# Encode a path (list of node IDs)
path = [0, 1, 5, 7]
edge_mask = encoder.encode_path(path)

# Result: binary array where 1 = edge used by path
print(edge_mask.shape)  # (42,)
print(edge_mask.sum())  # 3 (three edges in path)
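The encoding step itself is simple enough to sketch in plain Python. The version below takes an explicit edge list, which is an assumption: the real PathEncoder is constructed from num_nodes and num_edges, and how it maps node pairs to edge indices is not shown here. The function encode_path_sketch is a hypothetical name.

```python
def encode_path_sketch(path, edge_list):
    """Return a 0/1 list marking which edges in `edge_list` the path traverses."""
    edge_to_idx = {edge: i for i, edge in enumerate(edge_list)}
    mask = [0] * len(edge_list)
    # Walk consecutive node pairs (u, v) along the path.
    for u, v in zip(path, path[1:]):
        if (u, v) not in edge_to_idx:
            raise ValueError(f"edge {(u, v)} is not in the edge list")
        mask[edge_to_idx[(u, v)]] = 1
    return mask


# Directed edge list covering the example path in both directions.
edge_list = [(0, 1), (1, 0), (1, 5), (5, 1), (5, 7), (7, 5)]
mask = encode_path_sketch([0, 1, 5, 7], edge_list)
print(mask)       # [1, 0, 1, 0, 1, 0]
print(sum(mask))  # 3
```

A path with n nodes always marks n - 1 edges, which matches the three edges reported for the four-node path above.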

Wired Mode: Full Simulation Integration

For production use with real network simulation:

from fusion.core.simulation import SimulationEngine
from fusion.core.orchestrator import SDNOrchestrator
from fusion.modules.rl.adapter import RLConfig, RLSimulationAdapter
from fusion.modules.rl.environments import UnifiedSimEnv

# Set up simulation components
# (sim_params and engine_props are your simulation settings, defined elsewhere)
engine = SimulationEngine(sim_params)
orchestrator = SDNOrchestrator(engine_props)
config = RLConfig(k_paths=3, obs_space="obs_8")
adapter = RLSimulationAdapter(orchestrator, config)

# Create wired environment
env = UnifiedSimEnv(
    config=config,
    engine=engine,
    orchestrator=orchestrator,
    adapter=adapter,
)

# Now uses real network state and spectrum checks
obs, info = env.reset(seed=42)

Wired vs Standalone Comparison

Aspect         Standalone Mode         Wired Mode
-------------  ----------------------  ------------------------
Dependencies   None (self-contained)   Full V4 simulation stack
Requests       Synthetic (seeded RNG)  From SimulationEngine
Feasibility    Random (~70% feasible)  Real spectrum checks
Network State  Synthetic topology      Actual NetworkState
Use Case       Testing, CI/CD          Training, production
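To give a feel for the standalone column, the sketch below draws a seeded random feasibility mask where each path is feasible with probability 0.7. It only illustrates the "seeded RNG, roughly 70% feasible" behavior described in the table; the sampling scheme and the name sample_feasibility are assumptions, not the environment's actual code.

```python
import random


def sample_feasibility(rng: random.Random, k_paths: int = 3, p_feasible: float = 0.7):
    """Draw one boolean mask; each path is independently feasible with p_feasible."""
    return [rng.random() < p_feasible for _ in range(k_paths)]


# Two generators with the same seed produce the same sequence of masks.
rng_a = random.Random(42)
rng_b = random.Random(42)
masks_a = [sample_feasibility(rng_a) for _ in range(1000)]
masks_b = [sample_feasibility(rng_b) for _ in range(1000)]
print(masks_a == masks_b)  # True

# Empirical feasibility rate over 3000 draws, expected to be close to 0.7.
rate = sum(sum(m) for m in masks_a) / (1000 * 3)
print(round(rate, 2))
```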

Episode Lifecycle

Understanding the Episode Flow

reset(seed=42)
    |
    v
Generate N requests (Poisson arrivals)
    |
    v
request_index = 0
    |
    +---> Build observation for request[0]
    |     Return (obs, info) with action_mask
    |
    v
step(action)
    |
    +---> Apply action (allocate path)
    |     Compute reward (success/block)
    |     Advance request_index
    |     Check termination
    |
    +---> If request_index < N:
    |         Build next observation
    |         Return (obs, reward, False, False, info)
    |
    +---> If request_index >= N:
              terminated = True
              Return (zero_obs, reward, True, False, info)
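The flow above can be sketched as a toy environment in plain Python. ToyEpisodeEnv is a hypothetical stand-in, not the real UnifiedSimEnv: the observation contents, the 0.7 feasibility probability, and the all-False mask returned at termination are assumptions made for this example. Poisson arrivals are generated from exponentially distributed inter-arrival gaps.

```python
import random


class ToyEpisodeEnv:
    """Toy stand-in that mirrors the episode flow; not the real UnifiedSimEnv."""

    def __init__(self, num_requests=5, k_paths=3, arrival_rate=1.0):
        self.num_requests = num_requests
        self.k_paths = k_paths
        self.arrival_rate = arrival_rate
        self.requests = []
        self.request_index = 0

    def reset(self, seed=None):
        rng = random.Random(seed)
        clock = 0.0
        self.requests = []
        for _ in range(self.num_requests):
            # Poisson arrivals: inter-arrival gaps are exponentially distributed.
            clock += rng.expovariate(self.arrival_rate)
            # Assumed feasibility model: each path is feasible with probability 0.7.
            mask = [rng.random() < 0.7 for _ in range(self.k_paths)]
            self.requests.append({"arrival_time": clock, "mask": mask})
        self.request_index = 0
        return self._observe()

    def _observe(self):
        request = self.requests[self.request_index]
        return {"arrival_time": request["arrival_time"]}, {"action_mask": request["mask"]}

    def step(self, action):
        feasible = self.requests[self.request_index]["mask"][action]
        reward = 1.0 if feasible else -1.0  # success vs. block
        self.request_index += 1
        if self.request_index >= self.num_requests:
            # Episode over: zero observation and an all-False mask (assumed).
            info = {"action_mask": [False] * self.k_paths}
            return {"arrival_time": 0.0}, reward, True, False, info
        obs, info = self._observe()
        return obs, reward, False, False, info


env = ToyEpisodeEnv(num_requests=5)
obs, info = env.reset(seed=42)
steps = 0
terminated = False
while not terminated:
    obs, reward, terminated, truncated, info = env.step(0)
    steps += 1
print(steps)  # 5
```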

Episode Properties

env = UnifiedSimEnv(num_requests=100)
obs, info = env.reset(seed=42)

# Track episode progress
print(f"Current request: {env.request_index}")
print(f"Total requests: {env.num_requests}")
print(f"Episode done: {env.is_episode_done}")

# Access current request (standalone mode)
req = env.current_request
print(f"Source: {req.source}, Dest: {req.destination}")

Seeding and Reproducibility

Resetting with the same seed and taking the same actions produces identical episodes:

env = UnifiedSimEnv(num_requests=10)

# First run
obs1, _ = env.reset(seed=42)
rewards1 = []
for _ in range(10):
    obs, reward, term, trunc, info = env.step(0)
    rewards1.append(reward)

# Second run with same seed
obs2, _ = env.reset(seed=42)
rewards2 = []
for _ in range(10):
    obs, reward, term, trunc, info = env.step(0)
    rewards2.append(reward)

assert rewards1 == rewards2  # Identical!

Configuration Reference

RLConfig Parameters

from fusion.modules.rl.adapter import RLConfig

config = RLConfig(
    # Network parameters
    k_paths=3,                  # Candidate paths per request
    num_nodes=14,               # Network node count
    total_slots=320,            # Spectrum slots per link
    num_bandwidth_classes=4,    # Bandwidth quantization levels

    # Observation settings
    obs_space="obs_8",          # Feature set (obs_1 through obs_8)
    use_gnn_obs=False,          # Enable GNN observations
    num_node_features=4,        # GNN node feature dimension

    # Reward shaping
    rl_success_reward=1.0,      # Reward for successful allocation
    rl_block_penalty=-1.0,      # Penalty for blocked request
    rl_grooming_bonus=0.1,      # Bonus for traffic grooming
    rl_slicing_penalty=-0.1,    # Penalty for spectrum slicing

    # Episode settings
    max_holding_time=1000.0,    # Maximum request duration
)

env = UnifiedSimEnv(config=config, num_requests=500)
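The four reward-shaping values could combine in several ways. The sketch below shows one possibility, assuming a blocked request earns only the block penalty and a successful one earns the success reward plus any grooming bonus and slicing penalty. This additive rule and the function compute_reward_sketch are assumptions for illustration; the source does not state how the environment combines these terms.

```python
def compute_reward_sketch(
    success: bool,
    groomed: bool = False,
    sliced: bool = False,
    *,
    success_reward: float = 1.0,
    block_penalty: float = -1.0,
    grooming_bonus: float = 0.1,
    slicing_penalty: float = -0.1,
) -> float:
    """Assumed additive reward: base outcome plus optional shaping terms."""
    if not success:
        return block_penalty
    reward = success_reward
    if groomed:
        reward += grooming_bonus   # small bonus for reusing existing capacity
    if sliced:
        reward += slicing_penalty  # small penalty for splitting the request
    return reward


print(compute_reward_sketch(True))                # 1.0
print(compute_reward_sketch(False))               # -1.0
print(compute_reward_sketch(True, groomed=True))  # 1.1
```

Keeping the shaping terms an order of magnitude smaller than the success and block values, as in the defaults above, means they nudge the policy without outweighing the primary objective.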

UnifiedSimEnv Constructor

env = UnifiedSimEnv(
    config=RLConfig(),          # RL configuration
    num_requests=100,           # Requests per episode
    render_mode=None,           # Gymnasium render mode

    # Wired mode (optional)
    engine=None,                # SimulationEngine instance
    orchestrator=None,          # SDNOrchestrator instance
    adapter=None,               # RLSimulationAdapter instance
)

Troubleshooting

Common Issues

“No valid actions available”

All paths are infeasible for this request. This can happen with:

  • High network load (many active connections)

  • Large bandwidth requests

  • Limited spectrum availability

action_mask = info["action_mask"]
if not any(action_mask):
    # All paths infeasible - must select anyway (will block)
    action = 0
else:
    valid = [i for i, v in enumerate(action_mask) if v]
    action = valid[0]

“Step called after episode terminated”

Call reset() before stepping:

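obs, info = env.reset(seed=42)
while True:
    # Pick the first valid action, or 0 if every path is masked
    valid = [i for i, v in enumerate(info["action_mask"]) if v]
    action = valid[0] if valid else 0
    obs, reward, terminated, truncated, info = env.step(action)
    if terminated or truncated:
        # Must reset before next step
        obs, info = env.reset()
        break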

“MaskablePPO expects ndarray, not dict”

Use MultiInputPolicy for Dict observation spaces:

# Wrong: MlpPolicy expects flat observations
model = MaskablePPO("MlpPolicy", env)  # Error!

# Correct: MultiInputPolicy handles Dict spaces
model = MaskablePPO("MultiInputPolicy", env)

Observation space mismatch after config change

Recreate the environment after changing config:

# Wrong: reusing environment with different config
env.config.obs_space = "obs_4"  # Doesn't update spaces!

# Correct: create new environment
new_config = RLConfig(obs_space="obs_4")
env = UnifiedSimEnv(config=new_config)

File Reference

fusion/modules/rl/environments/
|-- __init__.py              # Public exports
|-- unified_env.py           # UnifiedSimEnv, SimpleRequest, PathEncoder
|-- wrappers.py              # ActionMaskWrapper
`-- tests/
    |-- __init__.py
    `-- test_unified_env.py  # Comprehensive test suite

API Summary

From package (fusion.modules.rl.environments):

from fusion.modules.rl.environments import (
    UnifiedSimEnv,       # Main Gymnasium environment
    ActionMaskWrapper,   # SB3 MaskablePPO adapter
    PathEncoder,         # Path to edge mask converter
)

From unified_env module:

from fusion.modules.rl.environments.unified_env import (
    UnifiedSimEnv,
    SimpleRequest,  # Lightweight request for standalone mode
    PathEncoder,
)