# Run Lifecycle

This document describes how simulation runs are created, executed, monitored, and terminated.

## State Machine

See `diagrams/run-state-machine.txt` for a visual representation.

### States
| State | Description | Transitions To |
|---|---|---|
| `PENDING` | Run created, not yet started | `RUNNING`, `CANCELLED` |
| `RUNNING` | Simulation subprocess is active | `COMPLETED`, `FAILED`, `CANCELLED` |
| `COMPLETED` | Simulation finished successfully | (terminal) |
| `FAILED` | Simulation crashed or errored | (terminal) |
| `CANCELLED` | User requested cancellation | (terminal) |
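How these transitions might be enforced in code, as a sketch: the `ALLOWED_TRANSITIONS` map and `assert_transition` helper below are illustrative, derived from the table, not existing backend code.

```python
# Legal status transitions, derived from the state table above (illustrative).
ALLOWED_TRANSITIONS: dict[str, set[str]] = {
    "PENDING": {"RUNNING", "CANCELLED"},
    "RUNNING": {"COMPLETED", "FAILED", "CANCELLED"},
    "COMPLETED": set(),  # terminal
    "FAILED": set(),     # terminal
    "CANCELLED": set(),  # terminal
}

def assert_transition(current: str, new: str) -> None:
    """Raise if a status change is not permitted by the state machine."""
    if new not in ALLOWED_TRANSITIONS[current]:
        raise ValueError(f"Illegal run-state transition: {current} -> {new}")
```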
## Lifecycle Flow
### 1. Run Creation (`POST /api/runs`)

```python
import uuid

# Backend receives the request
request = RunCreateRequest(
    name="My Simulation",
    config={"general_settings": {...}, ...},
    template="default",  # optional: loads template, then applies overrides
)

# Generate run ID (first 12 hex chars of a UUID)
run_id = uuid.uuid4().hex[:12]

# Create run directory structure
# See: 03-run-directory-contract.md

# Write frozen config
config_path = f"data/gui_runs/{run_id}/config.ini"
write_config(request.config, config_path)

# Insert database record (PENDING)
db.insert(Run(id=run_id, status="PENDING", ...))

# Return immediately
return {"id": run_id, "status": "PENDING"}
```
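Tying these steps together, a sketch of the route handler: the `create_run_record` helper and the `BackgroundTasks` wiring are assumptions about how the pieces above could be assembled, not the actual backend code.

```python
from fastapi import APIRouter, BackgroundTasks

router = APIRouter(prefix="/api")

@router.post("/runs", status_code=201)
async def create_run(request: RunCreateRequest, background: BackgroundTasks) -> dict:
    run_id = create_run_record(request)      # hypothetical: the creation steps above
    background.add_task(start_run, run_id)   # runs start immediately by default (step 2)
    return {"id": run_id, "status": "PENDING"}
```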
### 2. Run Start (automatic or manual)

By default, runs start immediately after creation. A future `autostart=False` option could defer the start.
```python
import os
import subprocess
import sys
from datetime import datetime

def start_run(run_id: str) -> None:
    run = db.get(run_id)

    # Create log file
    log_path = f"data/gui_runs/{run_id}/logs/sim.log"
    log_file = open(log_path, "w")

    # Start subprocess in a NEW SESSION (critical for clean kill)
    process = subprocess.Popen(
        [
            sys.executable, "-m", "fusion.cli.run_sim",
            "--config", f"data/gui_runs/{run_id}/config.ini",
            "--output-dir", f"data/gui_runs/{run_id}/output",
            "--progress-file", f"data/gui_runs/{run_id}/progress.jsonl",
        ],
        stdout=log_file,
        stderr=subprocess.STDOUT,
        start_new_session=True,  # CRITICAL: creates a new process group
    )

    # Update database
    run.status = "RUNNING"
    run.pid = process.pid
    run.pgid = os.getpgid(process.pid)  # store process group ID
    run.started_at = datetime.utcnow()
    db.commit()

    # Start background watcher (checks for completion)
    start_watcher(run_id, process)
```
**Why `start_new_session=True`?**

The simulator uses `multiprocessing.Pool` internally, spawning child processes. If we killed only the parent PID, the children would become orphans. Creating a new session gives all descendants a shared process group ID (PGID) that we can kill atomically.
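The effect is easy to observe on POSIX (a standalone illustration, not backend code): with `start_new_session=True`, the child becomes the leader of a fresh process group, so its PGID equals its own PID and differs from the parent's.

```python
import os
import subprocess
import sys

# The child leads a new process group when start_new_session=True (POSIX only).
child = subprocess.Popen(
    [sys.executable, "-c", "import time; time.sleep(60)"],
    start_new_session=True,
)
print(os.getpgid(child.pid) == child.pid)      # True: child is its own group leader
print(os.getpgid(child.pid) == os.getpgid(0))  # False: not in our process group
child.terminate()
child.wait()
```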
### 3. Progress Reporting

The simulator writes structured progress events to `progress.jsonl`:
{"type":"start","timestamp":"2024-01-15T10:00:00Z","total_erlangs":10,"total_iterations":100}
{"type":"erlang_start","timestamp":"2024-01-15T10:00:01Z","erlang":50,"erlang_index":0}
{"type":"iteration","timestamp":"2024-01-15T10:00:05Z","erlang":50,"iteration":1,"blocking_prob":0.023}
{"type":"iteration","timestamp":"2024-01-15T10:00:10Z","erlang":50,"iteration":2,"blocking_prob":0.021}
{"type":"erlang_complete","timestamp":"2024-01-15T10:01:00Z","erlang":50,"mean_blocking":0.022}
{"type":"complete","timestamp":"2024-01-15T10:10:00Z","exit_code":0}
**Why not parse logs?**

- Logs are for humans; progress is for machines.
- The log format may change, breaking parsers.
- Structured JSON is unambiguous.
- It enables a rich progress UI (progress bars, charts).
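On the consumer side, each line is one JSON event, so a reader can fold the file into a status snapshot. A minimal sketch (the `summarize_progress` helper is hypothetical, not part of the backend):

```python
import json

def summarize_progress(path: str) -> dict:
    """Fold progress.jsonl events into a coarse status snapshot (illustrative)."""
    snapshot = {"done": False, "iterations_seen": 0, "total_iterations": None}
    with open(path) as f:
        for line in f:
            event = json.loads(line)
            if event["type"] == "start":
                snapshot["total_iterations"] = event["total_iterations"]
            elif event["type"] == "iteration":
                snapshot["iterations_seen"] += 1
            elif event["type"] == "complete":
                snapshot["done"] = event["exit_code"] == 0
    return snapshot
```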
### 4. Log Streaming

Logs stream over SSE by tailing `sim.log`:
```python
import asyncio

import aiofiles
from sse_starlette.sse import EventSourceResponse

@router.get("/runs/{run_id}/logs")
async def stream_logs(run_id: str):
    get_run_or_404(run_id)  # 404 before opening the stream
    log_path = f"data/gui_runs/{run_id}/logs/sim.log"

    async def generate():
        async with aiofiles.open(log_path, mode="r") as f:
            # Send existing content first
            content = await f.read()
            if content:
                yield {"event": "log", "data": content}

            # Then tail for new content
            while True:
                line = await f.readline()
                if line:
                    yield {"event": "log", "data": line}
                else:
                    # No new output: check whether the run is still active
                    run = db.get(run_id)
                    if run.status not in ("PENDING", "RUNNING"):
                        yield {"event": "end", "data": run.status}
                        break
                    await asyncio.sleep(0.3)

    return EventSourceResponse(generate())
```
**Why not subprocess `PIPE`?**

Using `stdout=PIPE` with a long-running process invites deadlock: once the OS pipe buffer fills and nothing drains it, the child blocks on write. Redirecting to a file and tailing it is safe regardless of output volume.
### 5. Run Completion

The background watcher detects completion:
```python
import asyncio
import subprocess
from datetime import datetime

async def watch_run(run_id: str, process: subprocess.Popen) -> None:
    while True:
        return_code = process.poll()
        if return_code is not None:
            run = db.get(run_id)
            run.completed_at = datetime.utcnow()
            if return_code == 0:
                run.status = "COMPLETED"
            else:
                run.status = "FAILED"
                run.error_message = f"Exit code: {return_code}"
            db.commit()
            break
        await asyncio.sleep(1.0)
```
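The `start_watcher` call from step 2 is not spelled out in this document; one plausible implementation (an assumption, not the actual backend code) schedules `watch_run` as a task on the running event loop:

```python
import asyncio
import subprocess

def start_watcher(run_id: str, process: subprocess.Popen) -> None:
    # Assumes this is called from code already running inside the event loop
    # (e.g. an async FastAPI handler); from another thread, use
    # asyncio.run_coroutine_threadsafe instead.
    asyncio.get_running_loop().create_task(watch_run(run_id, process))
```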
### 6. Cancellation

```python
import os
import signal
import time
from datetime import datetime

def cancel_run(run_id: str) -> bool:
    run = db.get(run_id)
    if run.status not in ("PENDING", "RUNNING"):
        return False  # cannot cancel completed/failed runs

    if run.status == "PENDING":
        run.status = "CANCELLED"
        db.commit()
        return True

    # RUNNING: kill the entire process group
    try:
        os.killpg(run.pgid, signal.SIGTERM)
        # Wait briefly for graceful shutdown
        time.sleep(2.0)
        # Force kill if still alive
        try:
            os.killpg(run.pgid, signal.SIGKILL)
        except ProcessLookupError:
            pass  # already dead
        run.status = "CANCELLED"
        run.completed_at = datetime.utcnow()
        db.commit()
        return True
    except ProcessLookupError:
        # Process group already gone
        run.status = "CANCELLED"
        db.commit()
        return True
```
**Why `os.killpg()` instead of `process.terminate()`?**

`process.terminate()` signals only the direct child; the simulator's multiprocessing workers would become orphans and keep consuming resources. `os.killpg()` signals the whole process group, taking down the entire tree.
### 7. Server Restart Recovery

On startup, the server reconciles database state with reality:
```python
import os
from datetime import datetime

def recover_orphaned_runs() -> None:
    """Mark stale RUNNING jobs as FAILED."""
    running_runs = db.query(Run).filter(Run.status == "RUNNING").all()
    for run in running_runs:
        if not is_process_alive(run.pgid):
            run.status = "FAILED"
            run.error_message = "Server restarted while run was active"
            run.completed_at = datetime.utcnow()
            db.commit()

def is_process_alive(pgid: int) -> bool:
    try:
        os.killpg(pgid, 0)  # signal 0 checks existence without sending anything
        return True
    except (ProcessLookupError, PermissionError):
        return False
```
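This recovery routine must run before the API serves requests. A sketch of wiring it into FastAPI startup (the lifespan hook shown is an assumption about how the app is assembled):

```python
from contextlib import asynccontextmanager

from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Reconcile stale RUNNING rows before accepting any traffic.
    recover_orphaned_runs()
    yield

app = FastAPI(lifespan=lifespan)
```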
## Concurrency Limits

### MVP: Single Active Run

For simplicity, the MVP allows only one RUNNING job at a time:
```python
def can_start_run() -> bool:
    active = db.query(Run).filter(Run.status == "RUNNING").count()
    return active == 0
```
### Future: Configurable Concurrency

```python
import os

MAX_CONCURRENT_RUNS = int(os.getenv("FUSION_GUI_MAX_RUNS", "1"))

def can_start_run() -> bool:
    active = db.query(Run).filter(Run.status == "RUNNING").count()
    return active < MAX_CONCURRENT_RUNS
```
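Callers would consult `can_start_run()` before launching; a hypothetical guard (the `start_run_checked` wrapper and the 409 response are illustrative choices, not existing code):

```python
from fastapi import HTTPException

def start_run_checked(run_id: str) -> None:
    # Illustrative: reject the start rather than queueing when at capacity.
    if not can_start_run():
        raise HTTPException(status_code=409, detail="Run limit reached; try again later")
    start_run(run_id)
```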
## Database Schema

```sql
CREATE TABLE runs (
    id TEXT PRIMARY KEY,
    name TEXT,
    status TEXT NOT NULL CHECK (status IN ('PENDING','RUNNING','COMPLETED','FAILED','CANCELLED')),
    config_json TEXT NOT NULL,

    -- Process tracking
    pid INTEGER,
    pgid INTEGER,

    -- Timestamps
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    started_at TIMESTAMP,
    completed_at TIMESTAMP,

    -- Error info
    error_message TEXT,

    -- Progress cache (updated periodically)
    current_erlang REAL,
    total_erlangs INTEGER,
    current_iteration INTEGER,
    total_iterations INTEGER
);

CREATE INDEX idx_runs_status ON runs(status);
CREATE INDEX idx_runs_created_at ON runs(created_at DESC);
```
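The `Run` objects used throughout the code above map onto this table. A SQLAlchemy model sketch for reference (column types mirror the DDL; the declarative setup is an assumption about the backend's ORM layer):

```python
from sqlalchemy import Column, DateTime, Float, Integer, Text, text
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class Run(Base):
    """ORM mapping for the runs table (sketch mirroring the DDL above)."""
    __tablename__ = "runs"

    id = Column(Text, primary_key=True)
    name = Column(Text)
    status = Column(Text, nullable=False)  # CHECK constraint lives in the DDL
    config_json = Column(Text, nullable=False)

    # Process tracking
    pid = Column(Integer)
    pgid = Column(Integer)

    # Timestamps
    created_at = Column(DateTime, nullable=False, server_default=text("CURRENT_TIMESTAMP"))
    started_at = Column(DateTime)
    completed_at = Column(DateTime)

    # Error info
    error_message = Column(Text)

    # Progress cache (updated periodically)
    current_erlang = Column(Float)
    total_erlangs = Column(Integer)
    current_iteration = Column(Integer)
    total_iterations = Column(Integer)
```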