Services

This page documents the service layer in fusion/api/services/. Services contain the API's business logic, keeping it out of the route handlers.


run_manager.py - Simulation Process Lifecycle

Location:

fusion/api/services/run_manager.py

Class:

RunManager

Manages the full lifecycle of simulation runs: creation, process spawning, monitoring, and cleanup.

Class Overview

class RunManager:
    """Manages simulation run lifecycle."""

    def __init__(self, db: Session):
        """Initialize with database session."""

    def create_run(self, data: RunCreate) -> Run:
        """Create a new run and spawn the simulation process."""

    def cancel_or_delete(self, run_id: str) -> Run | None:
        """Cancel running simulation or delete completed run."""

Key Methods

create_run(data: RunCreate) -> Run

Creates a new simulation run (see the sketch after this list):

  1. Generates a unique 12-character hex ID

  2. Writes the configuration to a temporary INI file

  3. Creates the database record with PENDING status

  4. Spawns the simulation subprocess with fusion-sim

  5. Updates the database with process PID and RUNNING status

  6. Starts a background thread to monitor process completion
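
A minimal sketch of this flow. The write_config_ini and monitor_process helpers and the plain-string status values are illustrative assumptions, not the actual implementation:

import os
import secrets
import subprocess
import threading

def create_run(self, data: RunCreate) -> Run:
    run_id = secrets.token_hex(6)  # 6 random bytes -> 12 hex characters
    config_path = write_config_ini(run_id, data)  # hypothetical helper

    run = Run(id=run_id, status="PENDING")
    self.db.add(run)
    self.db.commit()

    log_file = open(f"logs/{run_id}.log", "w")
    process = subprocess.Popen(
        ["fusion-sim", "--config_path", config_path, "--run_id", run_id],
        stdout=log_file,
        stderr=subprocess.STDOUT,
        start_new_session=True,
    )

    run.pid = process.pid
    run.pgid = os.getpgid(process.pid)  # equals the PID in a new session
    run.status = "RUNNING"
    self.db.commit()

    # Watch for completion without blocking the request
    threading.Thread(
        target=monitor_process,  # hypothetical monitor
        args=(run_id, process),
        daemon=True,
    ).start()
    return run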

cancel_or_delete(run_id: str) -> Run | None

Handles run cancellation and deletion (sketched after this list):

  • For RUNNING status: Sends SIGTERM to the process group

  • For COMPLETED/FAILED/CANCELLED: Deletes the database record

  • Returns None if run not found
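
A minimal sketch of the branching, assuming plain-string status values and the pid/pgid columns described under Process Management:

import os
import signal

def cancel_or_delete(self, run_id: str) -> Run | None:
    run = self.db.get(Run, run_id)
    if run is None:
        return None

    if run.status == "RUNNING":
        # Terminate the whole process group, not just the parent PID
        os.killpg(run.pgid, signal.SIGTERM)
        run.status = "CANCELLED"
    else:  # COMPLETED / FAILED / CANCELLED
        self.db.delete(run)

    self.db.commit()
    return run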

recover_orphaned_runs() -> None

Module-level function called at startup. Marks any RUNNING/PENDING runs as FAILED if their process is no longer alive. This handles cases where the API server crashed or was killed.
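
A sketch of the recovery pass, assuming a hypothetical SessionLocal session factory and a small _pid_alive helper:

import os

def recover_orphaned_runs() -> None:
    db = SessionLocal()  # hypothetical session factory
    try:
        stale = db.query(Run).filter(Run.status.in_(["RUNNING", "PENDING"])).all()
        for run in stale:
            if run.pid is None or not _pid_alive(run.pid):
                run.status = "FAILED"
        db.commit()
    finally:
        db.close()

def _pid_alive(pid: int) -> bool:
    try:
        os.kill(pid, 0)  # signal 0 probes for existence; nothing is delivered
    except OSError:
        return False
    return True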

Process Management

Simulations run as subprocesses with these characteristics:

  • New process group (start_new_session=True) for clean cancellation

  • Stdout/stderr redirected to logs/{run_id}.log

  • Process tracked by both PID and PGID (process group ID)

  • Cancellation sends SIGTERM to entire process group

import os
import signal
import subprocess

# Process spawning
process = subprocess.Popen(
    ["fusion-sim", "--config_path", config_path, "--run_id", run_id],
    stdout=log_file,
    stderr=subprocess.STDOUT,
    start_new_session=True,  # New process group
)

# Cancellation (kills entire process group)
os.killpg(run.pgid, signal.SIGTERM)

progress_watcher.py - Progress Streaming

Location:

fusion/api/services/progress_watcher.py

Function:

stream_progress

Monitors simulation progress files and streams updates via SSE.

How It Works

The simulation writes progress to a JSONL file (progress.jsonl) in the run output directory. Each line is a JSON object with current progress:

{"erlang": 300.0, "iteration": 1, "total_iterations": 2, "timestamp": "..."}
{"erlang": 300.0, "iteration": 2, "total_iterations": 2, "timestamp": "..."}
{"erlang": 400.0, "iteration": 1, "total_iterations": 2, "timestamp": "..."}

The watcher:

  1. Opens the progress file (or waits for it to exist)

  2. Reads existing lines and yields them as SSE events

  3. Polls for new lines with 1-second intervals

  4. Yields a done event when the run completes

import asyncio
from collections.abc import AsyncGenerator

async def stream_progress(run_id: str) -> AsyncGenerator[dict, None]:
    """Stream progress events for a run."""
    progress_file = get_progress_file_path(run_id)

    # Wait for the file to exist
    while not progress_file.exists():
        if is_run_complete(run_id):
            yield {"event": "done", "data": "{}"}
            return
        await asyncio.sleep(1)

    # Stream progress updates
    with open(progress_file) as f:
        while True:
            line = f.readline()
            if line:
                yield {"event": "progress", "data": line.strip()}
            elif is_run_complete(run_id):
                yield {"event": "done", "data": "{}"}
                return
            else:
                await asyncio.sleep(1)

Usage in Route Handler

from fastapi import APIRouter
from sse_starlette.sse import EventSourceResponse

from .services.progress_watcher import stream_progress

router = APIRouter()

@router.get("/{run_id}/progress")
async def stream_run_progress(run_id: str) -> EventSourceResponse:
    return EventSourceResponse(stream_progress(run_id))

artifact_service.py - Output File Handling

Location:

fusion/api/services/artifact_service.py

Functions:

list_artifacts, get_artifact_path

Manages access to simulation output files.

Functions

list_artifacts(run_id: str) -> list[ArtifactInfo]

Lists all output files for a run:

  1. Determines the output directory from run configuration

  2. Recursively scans for files (excluding hidden files/directories)

  3. Returns file metadata (name, path, size, modified time)

artifacts = list_artifacts("my_run_id")
# Returns:
# [
#     ArtifactInfo(name="erlang_results.json", path="...", size=4096, ...),
#     ArtifactInfo(name="blocking_probability.csv", path="...", size=1024, ...),
# ]
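
A sketch of the scan; the ArtifactInfo field names are inferred from the example above, and get_output_directory is the same helper used by get_artifact_path below:

def list_artifacts(run_id: str) -> list[ArtifactInfo]:
    output_dir = get_output_directory(run_id)
    artifacts = []
    for path in sorted(output_dir.rglob("*")):
        rel = path.relative_to(output_dir)
        # Skip directories, hidden files, and anything under a hidden directory
        if not path.is_file() or any(part.startswith(".") for part in rel.parts):
            continue
        stat = path.stat()
        artifacts.append(ArtifactInfo(
            name=path.name,
            path=str(rel),
            size=stat.st_size,
            modified=stat.st_mtime,
        ))
    return artifacts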

get_artifact_path(run_id: str, relative_path: str) -> Path | None

Resolves and validates an artifact path:

  1. Joins the output directory with the relative path

  2. Validates the path is within the output directory (prevents traversal)

  3. Checks the file exists

  4. Returns the absolute path or None

path = get_artifact_path("my_run", "results/erlang.json")
if path:
    return FileResponse(path)  # e.g. inside a FastAPI route handler

Security

The get_artifact_path function includes path traversal protection:

def get_artifact_path(run_id: str, relative_path: str) -> Path | None:
    output_dir = get_output_directory(run_id).resolve()
    full_path = (output_dir / relative_path).resolve()

    # Prevent path traversal attacks. A bare startswith() prefix check is
    # not enough: it would also accept sibling paths like "<output_dir>_x".
    if not full_path.is_relative_to(output_dir):
        return None

    if not full_path.exists() or not full_path.is_file():
        return None

    return full_path

Log Streaming

Location:

fusion/api/services/run_manager.py

Function:

stream_run_logs

Streams simulation log output via SSE. Similar to progress streaming but reads from the log file (logs/{run_id}.log).

import asyncio
import json
from collections.abc import AsyncGenerator

async def stream_run_logs(
    run_id: str,
    from_start: bool = True
) -> AsyncGenerator[dict, None]:
    """Stream log lines for a run."""
    log_file = get_log_file_path(run_id)

    with open(log_file) as f:
        if not from_start:
            f.seek(0, 2)  # Seek to end of file

        while True:
            line = f.readline()
            if line:
                yield {"event": "log", "data": json.dumps({"line": line.rstrip()})}
            elif is_run_complete(run_id):
                yield {"event": "done", "data": "{}"}
                return
            else:
                await asyncio.sleep(0.1)  # 100ms poll interval