Backend Standards
This document defines conventions for the Python/FastAPI backend.
Technology Stack
Layer |
Choice |
Rationale |
|---|---|---|
Framework |
FastAPI |
Async, auto-docs, Pydantic native |
Server |
Uvicorn |
ASGI, production-ready |
Database |
SQLite + SQLAlchemy 2.0 |
Zero config, already in deps |
Validation |
Pydantic v2 |
FastAPI native, excellent DX |
SSE |
sse-starlette |
Mature, async, FastAPI integration |
Async Files |
aiofiles |
Non-blocking file I/O |
Project Structure
fusion/api/
├── __init__.py
├── main.py # FastAPI app, lifespan, static serving
├── config.py # Settings (pydantic-settings)
├── dependencies.py # Dependency injection
│
├── routes/ # API endpoints
│ ├── __init__.py # Router aggregation
│ ├── runs.py
│ ├── configs.py
│ ├── artifacts.py
│ ├── topology.py
│ └── system.py
│
├── schemas/ # Pydantic models (request/response)
│ ├── __init__.py
│ ├── run.py
│ ├── config.py
│ └── common.py
│
├── services/ # Business logic
│ ├── __init__.py
│ ├── run_manager.py # Job lifecycle
│ ├── config_service.py # Config validation
│ ├── artifact_service.py # Safe file access
│ └── progress_watcher.py # Progress monitoring
│
├── db/ # Database layer
│ ├── __init__.py
│ ├── database.py # Engine, session factory
│ └── models.py # SQLAlchemy ORM models
│
└── static/ # Built React (gitignored except .gitkeep)
└── .gitkeep
Application Setup
Main Application
# fusion/api/main.py
from contextlib import asynccontextmanager
from pathlib import Path
from fastapi import FastAPI, Request
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from .config import settings
from .db.database import init_db
from .routes import runs, configs, artifacts, topology, system
from .services.run_manager import recover_orphaned_runs
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Startup and shutdown events."""
# Startup
init_db()
recover_orphaned_runs()
yield
# Shutdown (cleanup if needed)
app = FastAPI(
title="FUSION GUI API",
version="1.0.0",
lifespan=lifespan,
)
# API routes
app.include_router(runs.router, prefix="/api/runs", tags=["runs"])
app.include_router(configs.router, prefix="/api/configs", tags=["configs"])
app.include_router(artifacts.router, prefix="/api", tags=["artifacts"])
app.include_router(topology.router, prefix="/api/topology", tags=["topology"])
app.include_router(system.router, prefix="/api", tags=["system"])
# Static file serving with SPA fallback
static_dir = Path(__file__).parent / "static"
if static_dir.exists() and (static_dir / "index.html").exists():
# Serve static files for known extensions
@app.get("/{path:path}")
async def serve_spa(request: Request, path: str):
# Check if it's an API route (shouldn't reach here, but safety)
if path.startswith("api/"):
return {"detail": "Not found"}, 404
# Try to serve static file
file_path = static_dir / path
if file_path.exists() and file_path.is_file():
return FileResponse(file_path)
# SPA fallback: serve index.html for all other routes
return FileResponse(static_dir / "index.html")
# Mount static assets (js, css, images)
app.mount("/assets", StaticFiles(directory=static_dir / "assets"), name="assets")
Configuration
# fusion/api/config.py
from pathlib import Path
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
"""Application settings with environment variable support."""
# Server
host: str = "127.0.0.1"
port: int = 8765
# Database
database_url: str = "sqlite:///data/gui_runs/runs.db"
# Paths
runs_dir: Path = Path("data/gui_runs")
templates_dir: Path = Path("fusion/configs/templates")
# Limits
max_concurrent_runs: int = 1
max_log_size_bytes: int = 10 * 1024 * 1024 # 10MB
class Config:
env_prefix = "FUSION_GUI_"
settings = Settings()
Database Setup
# fusion/api/db/database.py
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, DeclarativeBase
from ..config import settings
class Base(DeclarativeBase):
pass
engine = create_engine(
settings.database_url,
connect_args={"check_same_thread": False}, # SQLite specific
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
def init_db():
"""Create tables if they don't exist."""
settings.runs_dir.mkdir(parents=True, exist_ok=True)
Base.metadata.create_all(bind=engine)
def get_db():
"""Dependency for database sessions."""
db = SessionLocal()
try:
yield db
finally:
db.close()
ORM Models
# fusion/api/db/models.py
from datetime import datetime
from sqlalchemy import String, Integer, Text, DateTime, Float
from sqlalchemy.orm import Mapped, mapped_column
from .database import Base
class Run(Base):
__tablename__ = "runs"
id: Mapped[str] = mapped_column(String(12), primary_key=True)
name: Mapped[str] = mapped_column(String(255), nullable=True)
status: Mapped[str] = mapped_column(String(20), nullable=False, index=True)
config_json: Mapped[str] = mapped_column(Text, nullable=False)
# Process tracking
pid: Mapped[int | None] = mapped_column(Integer, nullable=True)
pgid: Mapped[int | None] = mapped_column(Integer, nullable=True)
# Timestamps
created_at: Mapped[datetime] = mapped_column(
DateTime, nullable=False, default=datetime.utcnow
)
started_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)
completed_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)
# Error info
error_message: Mapped[str | None] = mapped_column(Text, nullable=True)
# Progress cache
current_erlang: Mapped[float | None] = mapped_column(Float, nullable=True)
total_erlangs: Mapped[int | None] = mapped_column(Integer, nullable=True)
current_iteration: Mapped[int | None] = mapped_column(Integer, nullable=True)
total_iterations: Mapped[int | None] = mapped_column(Integer, nullable=True)
Pydantic Schemas
# fusion/api/schemas/run.py
from datetime import datetime
from pydantic import BaseModel, Field
class RunProgress(BaseModel):
current_erlang: float | None = None
total_erlangs: int | None = None
current_iteration: int | None = None
total_iterations: int | None = None
percent_complete: float | None = None
latest_metrics: dict | None = None
class RunBase(BaseModel):
name: str | None = None
class RunCreate(RunBase):
template: str = "default"
config: dict = Field(default_factory=dict)
class RunResponse(RunBase):
id: str
status: str
created_at: datetime
started_at: datetime | None = None
completed_at: datetime | None = None
error_message: str | None = None
progress: RunProgress | None = None
class Config:
from_attributes = True
class RunListResponse(BaseModel):
runs: list[RunResponse]
total: int
limit: int
offset: int
Route Conventions
# fusion/api/routes/runs.py
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from sse_starlette.sse import EventSourceResponse
from ..db.database import get_db
from ..db.models import Run
from ..schemas.run import RunCreate, RunResponse, RunListResponse
from ..services.run_manager import RunManager
router = APIRouter()
@router.post("", response_model=RunResponse, status_code=status.HTTP_201_CREATED)
def create_run(
data: RunCreate,
db: Session = Depends(get_db),
):
"""Create and start a new simulation run."""
manager = RunManager(db)
run = manager.create_run(data)
return run
@router.get("", response_model=RunListResponse)
def list_runs(
status: str | None = None,
limit: int = 50,
offset: int = 0,
db: Session = Depends(get_db),
):
"""List all runs with optional filtering."""
query = db.query(Run).order_by(Run.created_at.desc())
if status:
statuses = status.split(",")
query = query.filter(Run.status.in_(statuses))
total = query.count()
runs = query.offset(offset).limit(min(limit, 100)).all()
return RunListResponse(runs=runs, total=total, limit=limit, offset=offset)
@router.get("/{run_id}", response_model=RunResponse)
def get_run(run_id: str, db: Session = Depends(get_db)):
"""Get details for a specific run."""
run = db.query(Run).filter(Run.id == run_id).first()
if not run:
raise HTTPException(status_code=404, detail=f"Run not found: {run_id}")
return run
@router.delete("/{run_id}", response_model=RunResponse)
def cancel_run(run_id: str, db: Session = Depends(get_db)):
"""Cancel a running job or delete a completed one."""
manager = RunManager(db)
run = manager.cancel_or_delete(run_id)
if not run:
raise HTTPException(status_code=404, detail=f"Run not found: {run_id}")
return run
@router.get("/{run_id}/logs")
async def stream_logs(run_id: str, from_start: bool = True, db: Session = Depends(get_db)):
"""Stream logs via Server-Sent Events."""
run = db.query(Run).filter(Run.id == run_id).first()
if not run:
raise HTTPException(status_code=404, detail=f"Run not found: {run_id}")
from ..services.run_manager import stream_run_logs
return EventSourceResponse(stream_run_logs(run_id, from_start))
Service Layer
# fusion/api/services/run_manager.py
import asyncio
import json
import os
import signal
import subprocess
import sys
import uuid
from datetime import datetime
from pathlib import Path
from typing import AsyncGenerator
import aiofiles
from sqlalchemy.orm import Session
from ..config import settings
from ..db.models import Run
from ..schemas.run import RunCreate
class RunManager:
def __init__(self, db: Session):
self.db = db
def create_run(self, data: RunCreate) -> Run:
"""Create a new run and start the simulation."""
# Check concurrency limit
active = self.db.query(Run).filter(Run.status == "RUNNING").count()
if active >= settings.max_concurrent_runs:
raise ValueError("Maximum concurrent runs reached")
# Generate ID and paths
run_id = uuid.uuid4().hex[:12]
run_dir = settings.runs_dir / run_id
run_dir.mkdir(parents=True)
(run_dir / "logs").mkdir()
(run_dir / "output").mkdir()
# Write config
config_path = run_dir / "config.ini"
self._write_config(data, config_path)
# Create database record
run = Run(
id=run_id,
name=data.name or f"Run {run_id[:6]}",
status="PENDING",
config_json=json.dumps(data.config),
)
self.db.add(run)
self.db.commit()
# Start simulation
self._start_process(run, config_path)
return run
def _start_process(self, run: Run, config_path: Path):
"""Start the simulation subprocess."""
run_dir = settings.runs_dir / run.id
log_path = run_dir / "logs" / "sim.log"
progress_path = run_dir / "progress.jsonl"
with open(log_path, "w") as log_file:
process = subprocess.Popen(
[
sys.executable, "-m", "fusion.cli.run_sim",
"--config", str(config_path),
"--output-dir", str(run_dir / "output"),
"--progress-file", str(progress_path),
],
stdout=log_file,
stderr=subprocess.STDOUT,
start_new_session=True, # New process group
)
run.status = "RUNNING"
run.pid = process.pid
run.pgid = os.getpgid(process.pid)
run.started_at = datetime.utcnow()
self.db.commit()
# Start background watcher (in production, use proper task queue)
# For MVP, this is handled by periodic polling
def cancel_or_delete(self, run_id: str) -> Run | None:
"""Cancel running job or delete completed job."""
run = self.db.query(Run).filter(Run.id == run_id).first()
if not run:
return None
if run.status == "RUNNING":
self._kill_process(run)
if run.status in ("COMPLETED", "FAILED", "CANCELLED"):
# Delete artifacts
run_dir = settings.runs_dir / run_id
if run_dir.exists():
import shutil
shutil.rmtree(run_dir)
self.db.delete(run)
self.db.commit()
return run
def _kill_process(self, run: Run):
"""Kill the entire process group."""
if not run.pgid:
return
try:
os.killpg(run.pgid, signal.SIGTERM)
# Give graceful shutdown time
import time
time.sleep(2)
try:
os.killpg(run.pgid, signal.SIGKILL)
except ProcessLookupError:
pass
except ProcessLookupError:
pass
run.status = "CANCELLED"
run.completed_at = datetime.utcnow()
def _write_config(self, data: RunCreate, path: Path):
"""Write configuration to INI file."""
# Load template
template_path = settings.templates_dir / f"{data.template}.ini"
if not template_path.exists():
template_path = settings.templates_dir / "default.ini"
# TODO: Merge template with overrides
import shutil
shutil.copy(template_path, path)
def recover_orphaned_runs():
"""Mark stale RUNNING jobs as FAILED on startup."""
from ..db.database import SessionLocal
db = SessionLocal()
try:
running = db.query(Run).filter(Run.status == "RUNNING").all()
for run in running:
if not _is_process_alive(run.pgid):
run.status = "FAILED"
run.error_message = "Server restarted while run was active"
run.completed_at = datetime.utcnow()
db.commit()
finally:
db.close()
def _is_process_alive(pgid: int | None) -> bool:
if not pgid:
return False
try:
os.killpg(pgid, 0)
return True
except (ProcessLookupError, PermissionError):
return False
async def stream_run_logs(run_id: str, from_start: bool) -> AsyncGenerator[dict, None]:
"""Stream log file content via SSE."""
log_path = settings.runs_dir / run_id / "logs" / "sim.log"
if not log_path.exists():
yield {"event": "error", "data": "Log file not found"}
return
async with aiofiles.open(log_path, mode="r") as f:
if from_start:
content = await f.read()
if content:
yield {"event": "log", "data": content}
else:
await f.seek(0, 2) # End of file
while True:
line = await f.readline()
if line:
yield {"event": "log", "data": line.rstrip()}
else:
# Check if run is still active
from ..db.database import SessionLocal
db = SessionLocal()
run = db.query(Run).filter(Run.id == run_id).first()
db.close()
if run and run.status not in ("PENDING", "RUNNING"):
yield {"event": "end", "data": run.status}
break
await asyncio.sleep(0.3)
Process Tree Termination
The simulator uses multiprocessing.Pool internally, creating child processes. We must kill the entire process tree, not just the parent.
POSIX (Linux/macOS)
import os
import signal
import subprocess
def start_simulation(cmd: list[str], log_file) -> subprocess.Popen:
"""Start simulation in a new process session."""
return subprocess.Popen(
cmd,
stdout=log_file,
stderr=subprocess.STDOUT,
start_new_session=True, # Creates new process group (PGID = PID)
)
def kill_process_tree_posix(pgid: int) -> None:
"""Kill entire process group on POSIX systems."""
try:
os.killpg(pgid, signal.SIGTERM)
time.sleep(2) # Grace period
os.killpg(pgid, signal.SIGKILL) # Force kill stragglers
except ProcessLookupError:
pass # Already dead
Windows
Windows does not have process groups like POSIX. Options:
Option A: Job Objects (recommended)
import subprocess
import ctypes
from ctypes import wintypes
# Create a job object that auto-kills children when handle closes
def start_simulation_windows(cmd: list[str], log_file) -> subprocess.Popen:
"""Start simulation with Windows Job Object for tree termination."""
# CREATE_NEW_PROCESS_GROUP allows Ctrl+Break signal
process = subprocess.Popen(
cmd,
stdout=log_file,
stderr=subprocess.STDOUT,
creationflags=subprocess.CREATE_NEW_PROCESS_GROUP,
)
# Assign to job object (requires win32job or ctypes)
# Job object configured with JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE
# See: https://docs.microsoft.com/en-us/windows/win32/procthread/job-objects
return process
Option B: taskkill fallback (simpler, less reliable)
import subprocess
def kill_process_tree_windows(pid: int) -> None:
"""Kill process tree using taskkill."""
subprocess.run(
["taskkill", "/T", "/F", "/PID", str(pid)],
capture_output=True,
)
Option C: psutil (cross-platform, adds dependency)
import psutil
def kill_process_tree_psutil(pid: int) -> None:
"""Kill process tree using psutil (cross-platform)."""
try:
parent = psutil.Process(pid)
children = parent.children(recursive=True)
for child in children:
child.terminate()
parent.terminate()
gone, alive = psutil.wait_procs(children + [parent], timeout=3)
for p in alive:
p.kill()
except psutil.NoSuchProcess:
pass
Platform Detection
import platform
import os
def kill_simulation(run: Run) -> None:
"""Kill simulation process tree (cross-platform)."""
if platform.system() == "Windows":
kill_process_tree_windows(run.pid)
else:
kill_process_tree_posix(run.pgid)
Artifact Security
# fusion/api/services/artifact_service.py
from pathlib import Path
from fastapi import HTTPException
from ..config import settings
def get_safe_path(run_id: str, relative_path: str) -> Path:
"""
Validate and return safe artifact path.
Security checks:
1. Normalize path to prevent traversal (../)
2. Resolve symlinks via realpath()
3. Verify resolved path is within run directory
4. Reject symlinks that escape the run directory
Raises:
403: Path traversal or symlink escape attempt
404: File does not exist
"""
base = (settings.runs_dir / run_id).resolve()
# Join and resolve (follows symlinks)
requested = (base / relative_path).resolve()
# Security: ensure resolved path is within run directory
if not requested.is_relative_to(base):
raise HTTPException(
status_code=403,
detail="Access denied: path escapes run directory"
)
if not requested.exists():
raise HTTPException(status_code=404, detail="File not found")
return requested
def list_directory(run_id: str, relative_path: str = "") -> list[dict]:
"""List directory contents safely."""
dir_path = get_safe_path(run_id, relative_path) if relative_path else (
settings.runs_dir / run_id
).resolve()
if not dir_path.is_dir():
raise HTTPException(status_code=400, detail="Not a directory")
entries = []
for item in sorted(dir_path.iterdir()):
stat = item.stat()
entries.append({
"name": item.name,
"type": "directory" if item.is_dir() else "file",
"size_bytes": stat.st_size if item.is_file() else None,
"modified_at": datetime.fromtimestamp(stat.st_mtime).isoformat(),
})
return entries
Artifact Security Tests
# fusion/api/tests/test_artifact_security.py
import os
import pytest
from pathlib import Path
from fastapi.testclient import TestClient
from fusion.api.main import app
@pytest.fixture
def run_with_symlink(tmp_path):
"""Create a run directory with a malicious symlink."""
run_dir = tmp_path / "test_run"
run_dir.mkdir()
# Create a symlink pointing outside the run directory
secret_file = tmp_path / "secret.txt"
secret_file.write_text("sensitive data")
symlink = run_dir / "escape.txt"
symlink.symlink_to(secret_file)
return run_dir
def test_symlink_escape_blocked(client, run_with_symlink):
"""Symlinks pointing outside run directory should be rejected."""
response = client.get(f"/api/runs/test_run/artifacts/escape.txt")
assert response.status_code == 403
def test_path_traversal_blocked(client):
"""Path traversal attempts should be rejected."""
response = client.get("/api/runs/test_run/artifacts/../../../etc/passwd")
assert response.status_code == 403
def test_double_encoded_traversal_blocked(client):
"""Double-encoded traversal should be rejected."""
response = client.get("/api/runs/test_run/artifacts/..%252f..%252fetc/passwd")
assert response.status_code in (403, 404)
Linting and Testing
Ruff Configuration
The project already uses ruff. Add API-specific paths to existing config:
# pyproject.toml (additions)
[tool.ruff]
extend-include = ["fusion/api/**/*.py"]
Pytest Tests
# fusion/api/tests/test_runs.py
import pytest
from fastapi.testclient import TestClient
from fusion.api.main import app
from fusion.api.db.database import get_db, Base, engine
@pytest.fixture
def client():
"""Test client with clean database."""
Base.metadata.create_all(bind=engine)
yield TestClient(app)
Base.metadata.drop_all(bind=engine)
def test_create_run(client):
response = client.post("/api/runs", json={
"name": "Test Run",
"config": {"general_settings": {"network": "nsfnet"}}
})
assert response.status_code == 201
data = response.json()
assert data["name"] == "Test Run"
assert data["status"] == "PENDING"
def test_list_runs(client):
# Create a run first
client.post("/api/runs", json={"name": "Test"})
response = client.get("/api/runs")
assert response.status_code == 200
data = response.json()
assert data["total"] == 1
def test_artifact_path_traversal(client):
# Create a run
create_resp = client.post("/api/runs", json={"name": "Test"})
run_id = create_resp.json()["id"]
# Attempt path traversal
response = client.get(f"/api/runs/{run_id}/artifacts/../../../etc/passwd")
assert response.status_code == 403
Type Hints
All functions must have type hints:
# Good
def get_run(run_id: str, db: Session) -> Run | None:
return db.query(Run).filter(Run.id == run_id).first()
# Bad
def get_run(run_id, db):
return db.query(Run).filter(Run.id == run_id).first()
Logging
Use the existing FUSION logging setup:
from fusion.utils.logging_config import get_logger
logger = get_logger(__name__)
def create_run(data: RunCreate) -> Run:
logger.info("Creating run: %s", data.name)
# ...
logger.debug("Run created with ID: %s", run.id)