Backend Standards

This document defines conventions for the Python/FastAPI backend.

Technology Stack

| Layer       | Choice                  | Rationale                          |
|-------------|-------------------------|------------------------------------|
| Framework   | FastAPI                 | Async, auto-docs, Pydantic native  |
| Server      | Uvicorn                 | ASGI, production-ready             |
| Database    | SQLite + SQLAlchemy 2.0 | Zero config, already in deps       |
| Validation  | Pydantic v2             | FastAPI native, excellent DX       |
| SSE         | sse-starlette           | Mature, async, FastAPI integration |
| Async Files | aiofiles                | Non-blocking file I/O              |

Project Structure

fusion/api/
├── __init__.py
├── main.py                     # FastAPI app, lifespan, static serving
├── config.py                   # Settings (pydantic-settings)
├── dependencies.py             # Dependency injection
│
├── routes/                     # API endpoints
│   ├── __init__.py             # Router aggregation
│   ├── runs.py
│   ├── configs.py
│   ├── artifacts.py
│   ├── topology.py
│   └── system.py
│
├── schemas/                    # Pydantic models (request/response)
│   ├── __init__.py
│   ├── run.py
│   ├── config.py
│   └── common.py
│
├── services/                   # Business logic
│   ├── __init__.py
│   ├── run_manager.py          # Job lifecycle
│   ├── config_service.py       # Config validation
│   ├── artifact_service.py     # Safe file access
│   └── progress_watcher.py     # Progress monitoring
│
├── db/                         # Database layer
│   ├── __init__.py
│   ├── database.py             # Engine, session factory
│   └── models.py               # SQLAlchemy ORM models
│
└── static/                     # Built React (gitignored except .gitkeep)
    └── .gitkeep
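
main.py imports the endpoint modules via routes/__init__.py, the "router aggregation" noted in the tree. A minimal sketch of that file (the re-export list is inferred from the imports in main.py below):

# fusion/api/routes/__init__.py
from . import artifacts, configs, runs, system, topology

__all__ = ["artifacts", "configs", "runs", "system", "topology"]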

Application Setup

Main Application

# fusion/api/main.py
from contextlib import asynccontextmanager
from pathlib import Path

from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles

from .config import settings
from .db.database import init_db
from .routes import runs, configs, artifacts, topology, system
from .services.run_manager import recover_orphaned_runs


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Startup and shutdown events."""
    # Startup
    init_db()
    recover_orphaned_runs()
    yield
    # Shutdown (cleanup if needed)


app = FastAPI(
    title="FUSION GUI API",
    version="1.0.0",
    lifespan=lifespan,
)

# API routes
app.include_router(runs.router, prefix="/api/runs", tags=["runs"])
app.include_router(configs.router, prefix="/api/configs", tags=["configs"])
app.include_router(artifacts.router, prefix="/api", tags=["artifacts"])
app.include_router(topology.router, prefix="/api/topology", tags=["topology"])
app.include_router(system.router, prefix="/api", tags=["system"])


# Static file serving with SPA fallback
static_dir = Path(__file__).parent / "static"

if static_dir.exists() and (static_dir / "index.html").exists():
    # Mount hashed assets (js, css, images) BEFORE registering the catch-all
    # route below, so /assets/* is never shadowed by it
    app.mount("/assets", StaticFiles(directory=static_dir / "assets"), name="assets")

    @app.get("/{path:path}")
    async def serve_spa(request: Request, path: str):
        # API routers are registered earlier and match first; guard anyway
        if path.startswith("api/"):
            raise HTTPException(status_code=404, detail="Not found")

        # Try to serve a static file; resolve() blocks traversal out of static_dir
        file_path = (static_dir / path).resolve()
        if file_path.is_file() and file_path.is_relative_to(static_dir.resolve()):
            return FileResponse(file_path)

        # SPA fallback: serve index.html for all other routes
        return FileResponse(static_dir / "index.html")

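For local development the app can be launched with Uvicorn, the server named in the Technology Stack. A minimal sketch, assuming it is appended to main.py:

# Local development entry point; run with: python -m fusion.api.main
import uvicorn

if __name__ == "__main__":
    uvicorn.run("fusion.api.main:app", host=settings.host, port=settings.port)
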
Configuration

# fusion/api/config.py
from pathlib import Path
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    """Application settings with environment variable support."""

    # Server
    host: str = "127.0.0.1"
    port: int = 8765

    # Database
    database_url: str = "sqlite:///data/gui_runs/runs.db"

    # Paths
    runs_dir: Path = Path("data/gui_runs")
    templates_dir: Path = Path("fusion/configs/templates")

    # Limits
    max_concurrent_runs: int = 1
    max_log_size_bytes: int = 10 * 1024 * 1024  # 10MB

    model_config = SettingsConfigDict(env_prefix="FUSION_GUI_")


settings = Settings()
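
Every field can be overridden by an environment variable with the FUSION_GUI_ prefix (matching is case-insensitive by default). A quick illustration; the values are hypothetical:

import os

os.environ["FUSION_GUI_PORT"] = "9000"

assert Settings().port == 9000  # prefix + field name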

Database Setup

# fusion/api/db/database.py
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, DeclarativeBase

from ..config import settings


class Base(DeclarativeBase):
    pass


engine = create_engine(
    settings.database_url,
    connect_args={"check_same_thread": False},  # SQLite specific
)

SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)


def init_db():
    """Create tables if they don't exist."""
    settings.runs_dir.mkdir(parents=True, exist_ok=True)
    Base.metadata.create_all(bind=engine)


def get_db():
    """Dependency for database sessions."""
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

ORM Models

# fusion/api/db/models.py
from datetime import datetime
from sqlalchemy import String, Integer, Text, DateTime, Float
from sqlalchemy.orm import Mapped, mapped_column

from .database import Base


class Run(Base):
    __tablename__ = "runs"

    id: Mapped[str] = mapped_column(String(12), primary_key=True)
    name: Mapped[str | None] = mapped_column(String(255), nullable=True)
    status: Mapped[str] = mapped_column(String(20), nullable=False, index=True)
    config_json: Mapped[str] = mapped_column(Text, nullable=False)

    # Process tracking
    pid: Mapped[int | None] = mapped_column(Integer, nullable=True)
    pgid: Mapped[int | None] = mapped_column(Integer, nullable=True)

    # Timestamps
    created_at: Mapped[datetime] = mapped_column(
        DateTime, nullable=False, default=datetime.utcnow
    )
    started_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)
    completed_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)

    # Error info
    error_message: Mapped[str | None] = mapped_column(Text, nullable=True)

    # Progress cache
    current_erlang: Mapped[float | None] = mapped_column(Float, nullable=True)
    total_erlangs: Mapped[int | None] = mapped_column(Integer, nullable=True)
    current_iteration: Mapped[int | None] = mapped_column(Integer, nullable=True)
    total_iterations: Mapped[int | None] = mapped_column(Integer, nullable=True)
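
The status column takes one of five string values, inferred from the service layer below. A str-backed Enum is one way to keep them in a single place while staying compatible with the plain String(20) column (a sketch, not part of the current schema):

from enum import Enum

class RunStatus(str, Enum):
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    CANCELLED = "CANCELLED"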

Pydantic Schemas

# fusion/api/schemas/run.py
from datetime import datetime
from pydantic import BaseModel, ConfigDict, Field


class RunProgress(BaseModel):
    current_erlang: float | None = None
    total_erlangs: int | None = None
    current_iteration: int | None = None
    total_iterations: int | None = None
    percent_complete: float | None = None
    latest_metrics: dict | None = None


class RunBase(BaseModel):
    name: str | None = None


class RunCreate(RunBase):
    template: str = "default"
    config: dict = Field(default_factory=dict)


class RunResponse(RunBase):
    id: str
    status: str
    created_at: datetime
    started_at: datetime | None = None
    completed_at: datetime | None = None
    error_message: str | None = None
    progress: RunProgress | None = None

    model_config = ConfigDict(from_attributes=True)


class RunListResponse(BaseModel):
    runs: list[RunResponse]
    total: int
    limit: int
    offset: int
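
With from_attributes enabled, an ORM row converts directly into the response schema using the Pydantic v2 API. A usage sketch (db is a Session, as in the routes below):

run_orm = db.query(Run).first()
payload = RunResponse.model_validate(run_orm)  # reads attributes off the ORM object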

Route Conventions

# fusion/api/routes/runs.py
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from sse_starlette.sse import EventSourceResponse

from ..db.database import get_db
from ..db.models import Run
from ..schemas.run import RunCreate, RunResponse, RunListResponse
from ..services.run_manager import RunManager, stream_run_logs

router = APIRouter()


@router.post("", response_model=RunResponse, status_code=status.HTTP_201_CREATED)
def create_run(
    data: RunCreate,
    db: Session = Depends(get_db),
):
    """Create and start a new simulation run."""
    manager = RunManager(db)
    run = manager.create_run(data)
    return run


@router.get("", response_model=RunListResponse)
def list_runs(
    status: str | None = None,
    limit: int = 50,
    offset: int = 0,
    db: Session = Depends(get_db),
):
    """List all runs with optional filtering."""
    query = db.query(Run).order_by(Run.created_at.desc())

    if status:
        statuses = status.split(",")
        query = query.filter(Run.status.in_(statuses))

    limit = min(limit, 100)  # Clamp so the echoed limit matches the query
    total = query.count()
    runs = query.offset(offset).limit(limit).all()

    return RunListResponse(runs=runs, total=total, limit=limit, offset=offset)


@router.get("/{run_id}", response_model=RunResponse)
def get_run(run_id: str, db: Session = Depends(get_db)):
    """Get details for a specific run."""
    run = db.query(Run).filter(Run.id == run_id).first()
    if not run:
        raise HTTPException(status_code=404, detail=f"Run not found: {run_id}")
    return run


@router.delete("/{run_id}", response_model=RunResponse)
def cancel_run(run_id: str, db: Session = Depends(get_db)):
    """Cancel a running job or delete a completed one."""
    manager = RunManager(db)
    run = manager.cancel_or_delete(run_id)
    if not run:
        raise HTTPException(status_code=404, detail=f"Run not found: {run_id}")
    return run


@router.get("/{run_id}/logs")
async def stream_logs(run_id: str, from_start: bool = True, db: Session = Depends(get_db)):
    """Stream logs via Server-Sent Events."""
    run = db.query(Run).filter(Run.id == run_id).first()
    if not run:
        raise HTTPException(status_code=404, detail=f"Run not found: {run_id}")

    return EventSourceResponse(stream_run_logs(run_id, from_start))

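Example calls against these routes, using httpx for illustration (any HTTP client works; the payload values are hypothetical):

import httpx

base = "http://127.0.0.1:8765/api"

# Create and start a run
created = httpx.post(f"{base}/runs", json={"name": "demo", "template": "default"})
run_id = created.json()["id"]

# List only active runs (comma-separated status filter)
active = httpx.get(f"{base}/runs", params={"status": "PENDING,RUNNING"}).json()

# Cancel or delete
httpx.delete(f"{base}/runs/{run_id}")
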
Service Layer

# fusion/api/services/run_manager.py
import asyncio
import json
import os
import shutil
import signal
import subprocess
import sys
import time
import uuid
from datetime import datetime
from pathlib import Path
from typing import AsyncGenerator

import aiofiles
from sqlalchemy.orm import Session

from ..config import settings
from ..db.models import Run
from ..schemas.run import RunCreate


class RunManager:
    def __init__(self, db: Session):
        self.db = db

    def create_run(self, data: RunCreate) -> Run:
        """Create a new run and start the simulation."""
        # Check concurrency limit
        active = self.db.query(Run).filter(Run.status == "RUNNING").count()
        if active >= settings.max_concurrent_runs:
            raise ValueError("Maximum concurrent runs reached")

        # Generate ID and paths
        run_id = uuid.uuid4().hex[:12]
        run_dir = settings.runs_dir / run_id
        run_dir.mkdir(parents=True)
        (run_dir / "logs").mkdir()
        (run_dir / "output").mkdir()

        # Write config
        config_path = run_dir / "config.ini"
        self._write_config(data, config_path)

        # Create database record
        run = Run(
            id=run_id,
            name=data.name or f"Run {run_id[:6]}",
            status="PENDING",
            config_json=json.dumps(data.config),
        )
        self.db.add(run)
        self.db.commit()

        # Start simulation
        self._start_process(run, config_path)
        return run

    def _start_process(self, run: Run, config_path: Path):
        """Start the simulation subprocess."""
        run_dir = settings.runs_dir / run.id
        log_path = run_dir / "logs" / "sim.log"
        progress_path = run_dir / "progress.jsonl"

        with open(log_path, "w") as log_file:
            process = subprocess.Popen(
                [
                    sys.executable, "-m", "fusion.cli.run_sim",
                    "--config", str(config_path),
                    "--output-dir", str(run_dir / "output"),
                    "--progress-file", str(progress_path),
                ],
                stdout=log_file,
                stderr=subprocess.STDOUT,
                start_new_session=True,  # New process group
            )

        run.status = "RUNNING"
        run.pid = process.pid
        run.pgid = os.getpgid(process.pid)
        run.started_at = datetime.utcnow()
        self.db.commit()

        # Start background watcher (in production, use proper task queue)
        # For MVP, this is handled by periodic polling

    def cancel_or_delete(self, run_id: str) -> Run | None:
        """Cancel a running job, or delete a finished one."""
        run = self.db.query(Run).filter(Run.id == run_id).first()
        if not run:
            return None

        if run.status == "RUNNING":
            # Cancel: stop the process group but keep the record
            self._kill_process(run)
            self.db.commit()
            return run

        # Delete: remove artifacts, then the database record
        run_dir = settings.runs_dir / run_id
        if run_dir.exists():
            shutil.rmtree(run_dir)

        self.db.delete(run)
        self.db.commit()
        return run

    def _kill_process(self, run: Run):
        """Kill the entire process group."""
        if not run.pgid:
            return

        try:
            os.killpg(run.pgid, signal.SIGTERM)
            time.sleep(2)  # Give graceful shutdown time
            try:
                os.killpg(run.pgid, signal.SIGKILL)
            except ProcessLookupError:
                pass
        except ProcessLookupError:
            pass

        run.status = "CANCELLED"
        run.completed_at = datetime.utcnow()

    def _write_config(self, data: RunCreate, path: Path):
        """Write configuration to INI file."""
        # Load template
        template_path = settings.templates_dir / f"{data.template}.ini"
        if not template_path.exists():
            template_path = settings.templates_dir / "default.ini"

        # TODO: Merge template with overrides
        shutil.copy(template_path, path)


def recover_orphaned_runs():
    """Mark stale RUNNING jobs as FAILED on startup."""
    from ..db.database import SessionLocal

    db = SessionLocal()
    try:
        running = db.query(Run).filter(Run.status == "RUNNING").all()
        for run in running:
            if not _is_process_alive(run.pgid):
                run.status = "FAILED"
                run.error_message = "Server restarted while run was active"
                run.completed_at = datetime.utcnow()
        db.commit()
    finally:
        db.close()


def _is_process_alive(pgid: int | None) -> bool:
    if not pgid:
        return False
    try:
        os.killpg(pgid, 0)
        return True
    except (ProcessLookupError, PermissionError):
        return False


async def stream_run_logs(run_id: str, from_start: bool) -> AsyncGenerator[dict, None]:
    """Stream log file content via SSE."""
    log_path = settings.runs_dir / run_id / "logs" / "sim.log"

    if not log_path.exists():
        yield {"event": "error", "data": "Log file not found"}
        return

    async with aiofiles.open(log_path, mode="r") as f:
        if from_start:
            content = await f.read()
            if content:
                yield {"event": "log", "data": content}
        else:
            await f.seek(0, 2)  # End of file

        while True:
            line = await f.readline()
            if line:
                yield {"event": "log", "data": line.rstrip()}
            else:
                # Check if run is still active
                from ..db.database import SessionLocal
                db = SessionLocal()
                try:
                    run = db.query(Run).filter(Run.id == run_id).first()
                finally:
                    db.close()

                if run and run.status not in ("PENDING", "RUNNING"):
                    yield {"event": "end", "data": run.status}
                    break

                await asyncio.sleep(0.3)
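
The periodic polling mentioned in _start_process can be a small coroutine that copies the latest progress.jsonl entry onto the Run row. A sketch continuing the same file, assuming the simulator writes one JSON object per line with keys matching the progress columns on the model:

async def poll_progress(run_id: str) -> None:
    """Cache the last progress.jsonl entry on the runs table."""
    from ..db.database import SessionLocal

    progress_path = settings.runs_dir / run_id / "progress.jsonl"
    if not progress_path.exists():
        return

    async with aiofiles.open(progress_path, mode="r") as f:
        content = await f.read()
    lines = content.strip().splitlines()
    if not lines:
        return

    snapshot = json.loads(lines[-1])
    db = SessionLocal()
    try:
        run = db.query(Run).filter(Run.id == run_id).first()
        if run:
            run.current_erlang = snapshot.get("current_erlang")
            run.total_erlangs = snapshot.get("total_erlangs")
            run.current_iteration = snapshot.get("current_iteration")
            run.total_iterations = snapshot.get("total_iterations")
            db.commit()
    finally:
        db.close()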

Process Tree Termination

The simulator uses multiprocessing.Pool internally, creating child processes. We must kill the entire process tree, not just the parent.

POSIX (Linux/macOS)

import os
import signal
import subprocess
import time

def start_simulation(cmd: list[str], log_file) -> subprocess.Popen:
    """Start simulation in a new process session."""
    return subprocess.Popen(
        cmd,
        stdout=log_file,
        stderr=subprocess.STDOUT,
        start_new_session=True,  # Creates new process group (PGID = PID)
    )

def kill_process_tree_posix(pgid: int) -> None:
    """Kill entire process group on POSIX systems."""
    try:
        os.killpg(pgid, signal.SIGTERM)
        time.sleep(2)  # Grace period
        os.killpg(pgid, signal.SIGKILL)  # Force kill stragglers
    except ProcessLookupError:
        pass  # Already dead

Windows

Windows has no direct equivalent of POSIX process groups and killpg, so tree termination needs another mechanism. Options:

Option A: Job Objects (recommended)

import subprocess

# Create a job object that auto-kills children when handle closes
def start_simulation_windows(cmd: list[str], log_file) -> subprocess.Popen:
    """Start simulation with Windows Job Object for tree termination."""
    # CREATE_NEW_PROCESS_GROUP allows Ctrl+Break signal
    process = subprocess.Popen(
        cmd,
        stdout=log_file,
        stderr=subprocess.STDOUT,
        creationflags=subprocess.CREATE_NEW_PROCESS_GROUP,
    )

    # Assign to a job object configured with JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE
    # so all children die with the job handle (see the ctypes sketch below)
    # See: https://docs.microsoft.com/en-us/windows/win32/procthread/job-objects

    return process
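
Option A's omitted step can be written with ctypes alone. A sketch using the documented Win32 calls (CreateJobObjectW, SetInformationJobObject, AssignProcessToJobObject); the struct layouts mirror winnt.h, and process._handle is a CPython implementation detail:

import ctypes
import subprocess
from ctypes import wintypes

JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE = 0x2000
JobObjectExtendedLimitInformation = 9

class IO_COUNTERS(ctypes.Structure):
    _fields_ = [(name, ctypes.c_ulonglong) for name in (
        "ReadOperationCount", "WriteOperationCount", "OtherOperationCount",
        "ReadTransferCount", "WriteTransferCount", "OtherTransferCount",
    )]

class JOBOBJECT_BASIC_LIMIT_INFORMATION(ctypes.Structure):
    _fields_ = [
        ("PerProcessUserTimeLimit", wintypes.LARGE_INTEGER),
        ("PerJobUserTimeLimit", wintypes.LARGE_INTEGER),
        ("LimitFlags", wintypes.DWORD),
        ("MinimumWorkingSetSize", ctypes.c_size_t),
        ("MaximumWorkingSetSize", ctypes.c_size_t),
        ("ActiveProcessLimit", wintypes.DWORD),
        ("Affinity", ctypes.c_size_t),  # ULONG_PTR
        ("PriorityClass", wintypes.DWORD),
        ("SchedulingClass", wintypes.DWORD),
    ]

class JOBOBJECT_EXTENDED_LIMIT_INFORMATION(ctypes.Structure):
    _fields_ = [
        ("BasicLimitInformation", JOBOBJECT_BASIC_LIMIT_INFORMATION),
        ("IoInfo", IO_COUNTERS),
        ("ProcessMemoryLimit", ctypes.c_size_t),
        ("JobMemoryLimit", ctypes.c_size_t),
        ("PeakProcessMemoryUsed", ctypes.c_size_t),
        ("PeakJobMemoryUsed", ctypes.c_size_t),
    ]

def assign_to_kill_on_close_job(process: subprocess.Popen) -> wintypes.HANDLE:
    """Place the process in a job that kills every member when the handle closes."""
    kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
    kernel32.CreateJobObjectW.restype = wintypes.HANDLE
    job = kernel32.CreateJobObjectW(None, None)

    info = JOBOBJECT_EXTENDED_LIMIT_INFORMATION()
    info.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE
    kernel32.SetInformationJobObject(
        job, JobObjectExtendedLimitInformation,
        ctypes.byref(info), ctypes.sizeof(info),
    )

    kernel32.AssignProcessToJobObject(job, wintypes.HANDLE(process._handle))
    return job  # Keep this handle alive for the lifetime of the run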

Option B: taskkill fallback (simpler, less reliable)

import subprocess

def kill_process_tree_windows(pid: int) -> None:
    """Kill process tree using taskkill."""
    subprocess.run(
        ["taskkill", "/T", "/F", "/PID", str(pid)],
        capture_output=True,
    )

Option C: psutil (cross-platform, adds dependency)

import psutil

def kill_process_tree_psutil(pid: int) -> None:
    """Kill process tree using psutil (cross-platform)."""
    try:
        parent = psutil.Process(pid)
        children = parent.children(recursive=True)
        for child in children:
            child.terminate()
        parent.terminate()
        gone, alive = psutil.wait_procs(children + [parent], timeout=3)
        for p in alive:
            p.kill()
    except psutil.NoSuchProcess:
        pass

Platform Detection

import platform

from fusion.api.db.models import Run

def kill_simulation(run: Run) -> None:
    """Kill simulation process tree (cross-platform)."""
    if platform.system() == "Windows":
        kill_process_tree_windows(run.pid)
    else:
        kill_process_tree_posix(run.pgid)

Artifact Security

# fusion/api/services/artifact_service.py
from datetime import datetime
from pathlib import Path

from fastapi import HTTPException

from ..config import settings


def get_safe_path(run_id: str, relative_path: str) -> Path:
    """
    Validate and return safe artifact path.

    Security checks:
    1. Normalize path to prevent traversal (../)
    2. Resolve symlinks via realpath()
    3. Verify resolved path is within run directory
    4. Reject symlinks that escape the run directory

    Raises:
        403: Path traversal or symlink escape attempt
        404: File does not exist
    """
    base = (settings.runs_dir / run_id).resolve()

    # Join and resolve (follows symlinks)
    requested = (base / relative_path).resolve()

    # Security: ensure resolved path is within run directory
    if not requested.is_relative_to(base):
        raise HTTPException(
            status_code=403,
            detail="Access denied: path escapes run directory"
        )

    if not requested.exists():
        raise HTTPException(status_code=404, detail="File not found")

    return requested


def list_directory(run_id: str, relative_path: str = "") -> list[dict]:
    """List directory contents safely."""
    dir_path = get_safe_path(run_id, relative_path) if relative_path else (
        settings.runs_dir / run_id
    ).resolve()

    if not dir_path.is_dir():
        raise HTTPException(status_code=400, detail="Not a directory")

    entries = []
    for item in sorted(dir_path.iterdir()):
        stat = item.stat()
        entries.append({
            "name": item.name,
            "type": "directory" if item.is_dir() else "file",
            "size_bytes": stat.st_size if item.is_file() else None,
            "modified_at": datetime.fromtimestamp(stat.st_mtime).isoformat(),
        })

    return entries
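
A sketch of how routes/artifacts.py (not shown in this document) could wire these helpers up; the URL shape matches main.py's /api prefix and the security tests below:

# fusion/api/routes/artifacts.py (sketch)
from fastapi import APIRouter
from fastapi.responses import FileResponse

from ..services.artifact_service import get_safe_path, list_directory

router = APIRouter()


@router.get("/runs/{run_id}/artifacts/{path:path}")
def get_artifact(run_id: str, path: str):
    """Serve a validated artifact file, or list it if it is a directory."""
    target = get_safe_path(run_id, path)
    if target.is_dir():
        return list_directory(run_id, path)
    return FileResponse(target)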

Artifact Security Tests

# fusion/api/tests/test_artifact_security.py
import pytest


@pytest.fixture
def run_with_symlink(tmp_path, monkeypatch):
    """Create a run directory containing a malicious symlink."""
    # Point the app at the temp directory so the routes see this run
    from fusion.api.config import settings
    monkeypatch.setattr(settings, "runs_dir", tmp_path)

    run_dir = tmp_path / "test_run"
    run_dir.mkdir()

    # Create a symlink pointing outside the run directory
    secret_file = tmp_path / "secret.txt"
    secret_file.write_text("sensitive data")

    symlink = run_dir / "escape.txt"
    symlink.symlink_to(secret_file)

    return run_dir

def test_symlink_escape_blocked(client, run_with_symlink):
    """Symlinks pointing outside the run directory should be rejected."""
    response = client.get("/api/runs/test_run/artifacts/escape.txt")
    assert response.status_code == 403

def test_path_traversal_blocked(client):
    """Path traversal attempts should be rejected."""
    response = client.get("/api/runs/test_run/artifacts/../../../etc/passwd")
    assert response.status_code == 403

def test_double_encoded_traversal_blocked(client):
    """Double-encoded traversal should be rejected."""
    response = client.get("/api/runs/test_run/artifacts/..%252f..%252fetc/passwd")
    assert response.status_code in (403, 404)
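
The client fixture used above is not defined in this file; a minimal conftest.py sketch that provides it (test_runs.py below declares its own equivalent):

# fusion/api/tests/conftest.py (sketch)
import pytest
from fastapi.testclient import TestClient

from fusion.api.db.database import Base, engine
from fusion.api.main import app


@pytest.fixture
def client():
    """Shared test client with a clean database per test."""
    Base.metadata.create_all(bind=engine)
    with TestClient(app) as c:
        yield c
    Base.metadata.drop_all(bind=engine)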

Linting and Testing

Ruff Configuration

The project already uses ruff. Add API-specific paths to existing config:

# pyproject.toml (additions)
[tool.ruff]
extend-include = ["fusion/api/**/*.py"]

Pytest Tests

# fusion/api/tests/test_runs.py
import pytest
from fastapi.testclient import TestClient

from fusion.api.main import app
from fusion.api.db.database import Base, engine


@pytest.fixture
def client():
    """Test client with clean database."""
    Base.metadata.create_all(bind=engine)
    yield TestClient(app)
    Base.metadata.drop_all(bind=engine)


def test_create_run(client):
    response = client.post("/api/runs", json={
        "name": "Test Run",
        "config": {"general_settings": {"network": "nsfnet"}}
    })

    assert response.status_code == 201
    data = response.json()
    assert data["name"] == "Test Run"
    assert data["status"] == "PENDING"


def test_list_runs(client):
    # Create a run first
    client.post("/api/runs", json={"name": "Test"})

    response = client.get("/api/runs")
    assert response.status_code == 200
    data = response.json()
    assert data["total"] == 1


def test_artifact_path_traversal(client):
    # Create a run
    create_resp = client.post("/api/runs", json={"name": "Test"})
    run_id = create_resp.json()["id"]

    # Attempt path traversal
    response = client.get(f"/api/runs/{run_id}/artifacts/../../../etc/passwd")
    assert response.status_code == 403

Type Hints

All functions must have type hints:

# Good
def get_run(run_id: str, db: Session) -> Run | None:
    return db.query(Run).filter(Run.id == run_id).first()

# Bad
def get_run(run_id, db):
    return db.query(Run).filter(Run.id == run_id).first()

Logging

Use the existing FUSION logging setup:

from fusion.utils.logging_config import get_logger

logger = get_logger(__name__)

def create_run(data: RunCreate) -> Run:
    logger.info("Creating run: %s", data.name)
    # ...
    logger.debug("Run created with ID: %s", run.id)