# Services
This page documents the service layer in `fusion/api/services/`. Services contain
the business logic for the API, keeping it separate from the route handlers.
## run_manager.py - Simulation Process Lifecycle

- **Location:** `fusion/api/services/run_manager.py`
- **Class:** `RunManager`
Manages the full lifecycle of simulation runs: creation, process spawning, monitoring, and cleanup.
### Class Overview

```python
class RunManager:
    """Manages simulation run lifecycle."""

    def __init__(self, db: Session):
        """Initialize with database session."""

    def create_run(self, data: RunCreate) -> Run:
        """Create a new run and spawn the simulation process."""

    def cancel_or_delete(self, run_id: str) -> Run | None:
        """Cancel running simulation or delete completed run."""
```
### Key Methods
#### `create_run(data: RunCreate) -> Run`

Creates a new simulation run:

1. Generates a unique 12-character hex ID
2. Writes the configuration to a temporary INI file
3. Creates the database record with PENDING status
4. Spawns the simulation subprocess with `fusion-sim`
5. Updates the database with the process PID and RUNNING status
6. Starts a background thread to monitor process completion
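The first two steps above can be sketched with the standard library; `generate_run_id` and `write_config` are hypothetical helper names for illustration, not necessarily the real implementation:

```python
import secrets
import tempfile


def generate_run_id() -> str:
    """Generate a unique 12-character hex run ID (6 random bytes)."""
    return secrets.token_hex(6)


def write_config(config_text: str) -> str:
    """Write the run configuration to a temporary INI file, returning its path."""
    with tempfile.NamedTemporaryFile(mode="w", suffix=".ini", delete=False) as f:
        f.write(config_text)
        return f.name
```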
#### `cancel_or_delete(run_id: str) -> Run | None`

Handles run cancellation and deletion:

- For RUNNING status: sends SIGTERM to the process group
- For COMPLETED/FAILED/CANCELLED: deletes the database record
- Returns None if the run is not found
#### `recover_orphaned_runs() -> None`

Module-level function called at startup. Marks any RUNNING/PENDING runs as FAILED if their process is no longer alive. This handles cases where the API server crashed or was killed.
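A common way to implement the liveness check is a zero-signal probe. This is a sketch of the technique (POSIX-only), not necessarily the module's exact code:

```python
import os


def process_alive(pid: int) -> bool:
    """Check whether a process with this PID exists.

    Signal 0 performs the existence/permission check without
    actually delivering a signal to the process.
    """
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False  # no such process: the run is orphaned
    except PermissionError:
        return True  # process exists but belongs to another user
    return True
```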
### Process Management
Simulations run as subprocesses with these characteristics:

- New process group (`start_new_session=True`) for clean cancellation
- Stdout/stderr redirected to `logs/{run_id}.log`
- Process tracked by both PID and PGID (process group ID)
- Cancellation sends SIGTERM to the entire process group
```python
# Process spawning
process = subprocess.Popen(
    ["fusion-sim", "--config_path", config_path, "--run_id", run_id],
    stdout=log_file,
    stderr=subprocess.STDOUT,
    start_new_session=True,  # run in a new process group
)

# Cancellation (kills the entire process group)
os.killpg(run.pgid, signal.SIGTERM)
```
## progress_watcher.py - Progress Streaming

- **Location:** `fusion/api/services/progress_watcher.py`
- **Function:** `stream_progress`
Monitors simulation progress files and streams updates via SSE.
### How It Works
The simulation writes progress to a JSONL file (`progress.jsonl`) in the run
output directory. Each line is a JSON object with the current progress:

```jsonl
{"erlang": 300.0, "iteration": 1, "total_iterations": 2, "timestamp": "..."}
{"erlang": 300.0, "iteration": 2, "total_iterations": 2, "timestamp": "..."}
{"erlang": 400.0, "iteration": 1, "total_iterations": 2, "timestamp": "..."}
```
The watcher:

1. Opens the progress file (or waits for it to exist)
2. Reads existing lines and yields them as SSE events
3. Polls for new lines at 1-second intervals
4. Yields a `done` event when the run completes
```python
import asyncio
from collections.abc import AsyncGenerator


async def stream_progress(run_id: str) -> AsyncGenerator[dict, None]:
    """Stream progress events for a run."""
    progress_file = get_progress_file_path(run_id)

    # Wait for the file to exist
    while not progress_file.exists():
        if is_run_complete(run_id):
            yield {"event": "done", "data": "{}"}
            return
        await asyncio.sleep(1)

    # Stream progress updates
    with open(progress_file) as f:
        while True:
            line = f.readline()
            if line:
                yield {"event": "progress", "data": line.strip()}
            elif is_run_complete(run_id):
                yield {"event": "done", "data": "{}"}
                return
            else:
                await asyncio.sleep(1)
```
### Usage in Route Handler
```python
from sse_starlette.sse import EventSourceResponse

from .services.progress_watcher import stream_progress


@router.get("/{run_id}/progress")
async def stream_run_progress(run_id: str) -> EventSourceResponse:
    return EventSourceResponse(stream_progress(run_id))
```
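On the wire, each dict yielded by the generator becomes one SSE frame. The framing looks roughly like this; the formatter below illustrates the SSE text format itself, not sse-starlette's actual code:

```python
def format_sse(event: str, data: str) -> str:
    """Render one Server-Sent Events frame: field lines ending with a blank line."""
    return f"event: {event}\ndata: {data}\n\n"
```

A browser `EventSource` (or any SSE client) splits the stream on blank lines and dispatches each frame by its `event` field.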
## artifact_service.py - Output File Handling

- **Location:** `fusion/api/services/artifact_service.py`
- **Functions:** `list_artifacts`, `get_artifact_path`
Manages access to simulation output files.
### Functions
#### `list_artifacts(run_id: str) -> list[ArtifactInfo]`

Lists all output files for a run:

1. Determines the output directory from the run configuration
2. Recursively scans for files (excluding hidden files/directories)
3. Returns file metadata (name, path, size, modified time)
```python
artifacts = list_artifacts("my_run_id")
# Returns:
# [
#     ArtifactInfo(name="erlang_results.json", path="...", size=4096, ...),
#     ArtifactInfo(name="blocking_probability.csv", path="...", size=1024, ...),
# ]
```
#### `get_artifact_path(run_id: str, relative_path: str) -> Path | None`

Resolves and validates an artifact path:

1. Joins the output directory with the relative path
2. Validates that the path is within the output directory (prevents traversal)
3. Checks that the file exists
4. Returns the absolute path, or None
```python
path = get_artifact_path("my_run", "results/erlang.json")
if path:
    return FileResponse(path)
```
### Security

The `get_artifact_path` function includes path traversal protection:
```python
def get_artifact_path(run_id: str, relative_path: str) -> Path | None:
    output_dir = get_output_directory(run_id)
    full_path = (output_dir / relative_path).resolve()

    # Prevent path traversal attacks
    if not str(full_path).startswith(str(output_dir.resolve())):
        return None

    if not full_path.exists() or not full_path.is_file():
        return None

    return full_path
```
## Log Streaming

- **Location:** `fusion/api/services/run_manager.py`
- **Function:** `stream_run_logs`
Streams simulation log output via SSE. Similar to progress streaming, but reads
from the log file (`logs/{run_id}.log`).
```python
import asyncio
import json
from collections.abc import AsyncGenerator


async def stream_run_logs(
    run_id: str,
    from_start: bool = True,
) -> AsyncGenerator[dict, None]:
    """Stream log lines for a run."""
    log_file = get_log_file_path(run_id)

    with open(log_file) as f:
        if not from_start:
            f.seek(0, 2)  # Seek to end of file

        while True:
            line = f.readline()
            if line:
                yield {"event": "log", "data": json.dumps({"line": line.rstrip()})}
            elif is_run_complete(run_id):
                yield {"event": "done", "data": "{}"}
                return
            else:
                await asyncio.sleep(0.1)  # 100 ms poll interval
```
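The `from_start=False` branch relies on seeking to end-of-file so only newly appended lines are streamed. The same offset-based tailing can be sketched with plain file I/O (the helper name is illustrative):

```python
def read_new_lines(path: str, offset: int) -> tuple[list[str], int]:
    """Read lines appended after `offset`; return them plus the new offset."""
    with open(path) as f:
        f.seek(offset)
        lines = [line.rstrip("\n") for line in f.readlines()]
        return lines, f.tell()
```

Tracking the offset between calls (as `readline` in the streaming loop does implicitly via the open file handle) means each poll returns only lines written since the last one.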