Run Lifecycle

This document describes how simulation runs are created, executed, monitored, and terminated.

State Machine

See diagrams/run-state-machine.txt for a visual representation.

States

State      Description                        Transitions To
---------  ---------------------------------  ----------------------------
PENDING    Run created, not yet started       RUNNING, CANCELLED
RUNNING    Simulation subprocess is active    COMPLETED, FAILED, CANCELLED
COMPLETED  Simulation finished successfully   (terminal)
FAILED     Simulation crashed or errored      (terminal)
CANCELLED  User requested cancellation        (terminal)
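
Enforcing these transitions in the backend can be a small lookup table; a minimal sketch (names are illustrative, not the actual implementation):

# Allowed transitions, mirroring the table above
VALID_TRANSITIONS = {
    "PENDING": {"RUNNING", "CANCELLED"},
    "RUNNING": {"COMPLETED", "FAILED", "CANCELLED"},
    "COMPLETED": set(),  # terminal
    "FAILED": set(),     # terminal
    "CANCELLED": set(),  # terminal
}

def assert_transition(current: str, new: str) -> None:
    if new not in VALID_TRANSITIONS[current]:
        raise ValueError(f"Illegal transition: {current} -> {new}")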

Lifecycle Flow

1. Run Creation (POST /api/runs)

# Backend receives request
request = RunCreateRequest(
    name="My Simulation",
    config={"general_settings": {...}, ...},
    template="default"  # optional, loads template then applies overrides
)

# Generate a short run ID (first 12 hex chars of a UUID4)
run_id = uuid.uuid4().hex[:12]

# Create run directory structure
# See: 03-run-directory-contract.md

# Write frozen config
config_path = f"data/gui_runs/{run_id}/config.ini"
write_config(request.config, config_path)

# Insert database record (PENDING)
db.insert(Run(id=run_id, status="PENDING", ...))

# Return immediately
return {"id": run_id, "status": "PENDING"}

2. Run Start (automatic or manual)

By default, runs start immediately after creation. Future: add autostart=False option.

def start_run(run_id: str) -> None:
    run = db.get(run_id)

    # Create log file
    log_path = f"data/gui_runs/{run_id}/logs/sim.log"
    log_file = open(log_path, "w")

    # Start subprocess in NEW SESSION (critical for clean kill)
    process = subprocess.Popen(
        [
            sys.executable, "-m", "fusion.cli.run_sim",
            "--config", f"data/gui_runs/{run_id}/config.ini",
            "--output-dir", f"data/gui_runs/{run_id}/output",
            "--progress-file", f"data/gui_runs/{run_id}/progress.jsonl",
        ],
        stdout=log_file,
        stderr=subprocess.STDOUT,
        start_new_session=True,  # CRITICAL: Creates new process group
    )
    log_file.close()  # the child holds its own copy of the descriptor

    # Update database
    run.status = "RUNNING"
    run.pid = process.pid
    run.pgid = os.getpgid(process.pid)  # Store process group ID
    run.started_at = datetime.utcnow()
    db.commit()

    # Start background watcher (checks for completion)
    start_watcher(run_id, process)

Why start_new_session=True?

The simulator uses multiprocessing.Pool internally, spawning child processes. If we only kill the parent PID, children become orphans. By creating a new session, all descendants share a process group ID (PGID) that we can kill atomically.
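
A quick sanity check of that claim (POSIX-only, throwaway code): with start_new_session=True the child becomes its own session and process-group leader, so its PGID equals its own PID and is inherited by every worker it forks.

import os
import subprocess
import sys

child = subprocess.Popen(
    [sys.executable, "-c", "import time; time.sleep(60)"],
    start_new_session=True,
)
assert os.getpgid(child.pid) == child.pid      # child leads its own group
assert os.getpgid(child.pid) != os.getpgid(0)  # distinct from our own group
child.terminate()
child.wait()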

3. Progress Reporting

The simulator writes structured progress to progress.jsonl:

{"type":"start","timestamp":"2024-01-15T10:00:00Z","total_erlangs":10,"total_iterations":100}
{"type":"erlang_start","timestamp":"2024-01-15T10:00:01Z","erlang":50,"erlang_index":0}
{"type":"iteration","timestamp":"2024-01-15T10:00:05Z","erlang":50,"iteration":1,"blocking_prob":0.023}
{"type":"iteration","timestamp":"2024-01-15T10:00:10Z","erlang":50,"iteration":2,"blocking_prob":0.021}
{"type":"erlang_complete","timestamp":"2024-01-15T10:01:00Z","erlang":50,"mean_blocking":0.022}
{"type":"complete","timestamp":"2024-01-15T10:10:00Z","exit_code":0}

Why not parse logs?

  • Logs are for humans, progress is for machines

  • Log format may change, breaking parsers

  • Structured JSON is unambiguous

  • Enables rich progress UI (progress bars, charts)
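
On the simulator side, emitting an event can be as small as appending one JSON object per line; a sketch (emit_progress is a made-up helper name):

import json
from datetime import datetime, timezone

def emit_progress(path: str, **event) -> None:
    event.setdefault(
        "timestamp",
        datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
    )
    with open(path, "a") as f:  # append mode: one JSON object per line
        f.write(json.dumps(event) + "\n")

emit_progress("progress.jsonl", type="iteration",
              erlang=50, iteration=1, blocking_prob=0.023)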

4. Log Streaming

Logs stream via SSE by tailing sim.log:

@router.get("/runs/{run_id}/logs")
async def stream_logs(run_id: str):
    run = get_run_or_404(run_id)
    log_path = f"data/gui_runs/{run_id}/logs/sim.log"

    async def generate():
        async with aiofiles.open(log_path, mode="r") as f:
            # Send existing content first
            content = await f.read()
            if content:
                yield {"event": "log", "data": content}

            # Then tail for new content
            while True:
                line = await f.readline()
                if line:
                    yield {"event": "log", "data": line}
                else:
                    # Check if run is still active
                    run = db.get(run_id)
                    if run.status not in ("PENDING", "RUNNING"):
                        yield {"event": "end", "data": run.status}
                        break
                    await asyncio.sleep(0.3)

    return EventSourceResponse(generate())

Why not subprocess PIPE?

Using stdout=PIPE with a long-running process deadlocks once the OS pipe buffer fills and nothing drains it. Redirecting output to a file and tailing the file is safe regardless of output volume.
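
For illustration, the failure mode in miniature (deliberately broken, do not ship):

import subprocess
import sys

# Child writes ~1 MB, far more than the ~64 KB OS pipe buffer.
noisy = [sys.executable, "-c", "print('x' * 1_000_000)"]

proc = subprocess.Popen(noisy, stdout=subprocess.PIPE)
proc.wait()  # deadlock: child blocks on write(), wait() never returns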

5. Run Completion

The background watcher detects completion:

async def watch_run(run_id: str, process: subprocess.Popen):
    while True:
        return_code = process.poll()
        if return_code is not None:
            run = db.get(run_id)
            run.completed_at = datetime.utcnow()

            if return_code == 0:
                run.status = "COMPLETED"
            else:
                run.status = "FAILED"
                run.error_message = f"Exit code: {return_code}"

            db.commit()
            break

        await asyncio.sleep(1.0)
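
start_watcher, referenced in step 2, just schedules this coroutine; a sketch assuming start_run is called on the server's event loop:

import asyncio
import subprocess

_watcher_tasks: dict[str, asyncio.Task] = {}

def start_watcher(run_id: str, process: subprocess.Popen) -> None:
    # Keep a reference so the fire-and-forget task isn't garbage-collected
    _watcher_tasks[run_id] = asyncio.create_task(watch_run(run_id, process))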

6. Cancellation

def cancel_run(run_id: str) -> bool:
    run = db.get(run_id)

    if run.status not in ("PENDING", "RUNNING"):
        return False  # Cannot cancel completed/failed runs

    if run.status == "PENDING":
        run.status = "CANCELLED"
        db.commit()
        return True

    # RUNNING: Kill entire process group
    try:
        os.killpg(run.pgid, signal.SIGTERM)

        # Wait briefly for graceful shutdown
        time.sleep(2.0)

        # Force kill if still alive
        try:
            os.killpg(run.pgid, signal.SIGKILL)
        except ProcessLookupError:
            pass  # Already dead

        run.status = "CANCELLED"
        run.completed_at = datetime.utcnow()
        db.commit()
        return True

    except ProcessLookupError:
        # Process already gone
        run.status = "CANCELLED"
        run.completed_at = datetime.utcnow()
        db.commit()
        return True

Why os.killpg() instead of process.terminate()?

process.terminate() only signals the direct child. The simulator’s multiprocessing workers would become orphans and keep consuming resources. os.killpg() signals the whole process group, which, thanks to start_new_session=True, includes every descendant.

7. Server Restart Recovery

On startup, the server reconciles database state with reality:

def recover_orphaned_runs():
    """Mark stale RUNNING jobs as FAILED."""
    running_runs = db.query(Run).filter(Run.status == "RUNNING").all()

    for run in running_runs:
        if not is_process_alive(run.pgid):
            run.status = "FAILED"
            run.error_message = "Server restarted while run was active"
            run.completed_at = datetime.utcnow()

    db.commit()

def is_process_alive(pgid: int) -> bool:
    try:
        os.killpg(pgid, 0)  # Signal 0 = check existence
        return True
    except (ProcessLookupError, PermissionError):
        return False
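
Wiring this into startup, assuming the FastAPI app implied by the router in step 4, might look like this (a sketch using the lifespan API):

from contextlib import asynccontextmanager
from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    recover_orphaned_runs()  # reconcile DB state before serving any requests
    yield

app = FastAPI(lifespan=lifespan)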

Concurrency Limits

MVP: Single Active Run

For simplicity, the MVP allows only one RUNNING job at a time:

def can_start_run() -> bool:
    active = db.query(Run).filter(Run.status == "RUNNING").count()
    return active == 0

Future: Configurable Concurrency

MAX_CONCURRENT_RUNS = int(os.getenv("FUSION_GUI_MAX_RUNS", "1"))

def can_start_run() -> bool:
    active = db.query(Run).filter(Run.status == "RUNNING").count()
    return active < MAX_CONCURRENT_RUNS
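
Note that calling can_start_run() and then starting the run is a check-then-act race if two create requests arrive together; serializing the pair closes it (a sketch):

import threading

_start_lock = threading.Lock()

def try_start_run(run_id: str) -> bool:
    with _start_lock:  # make the check and the start a single atomic step
        if not can_start_run():
            return False
        start_run(run_id)
        return True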

Database Schema

CREATE TABLE runs (
    id TEXT PRIMARY KEY,
    name TEXT,
    status TEXT NOT NULL CHECK (status IN ('PENDING','RUNNING','COMPLETED','FAILED','CANCELLED')),
    config_json TEXT NOT NULL,

    -- Process tracking
    pid INTEGER,
    pgid INTEGER,

    -- Timestamps
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    started_at TIMESTAMP,
    completed_at TIMESTAMP,

    -- Error info
    error_message TEXT,

    -- Progress cache (updated periodically)
    current_erlang REAL,
    total_erlangs INTEGER,
    current_iteration INTEGER,
    total_iterations INTEGER
);

CREATE INDEX idx_runs_status ON runs(status);
CREATE INDEX idx_runs_created_at ON runs(created_at DESC);