
Persist execution state to database and recover on startup #1218


@geoffjay

Context

Part of #1216 (track execution state of agents). Once the formal state machine exists (#1217), the execution state must be persisted to the database so it survives orchestrator restarts.

Background

Today, AgentStatus (Pending/Running/Stopped/Failed) is persisted, but ActivityState (Idle/Busy) is tracked only in memory via ConnectionRegistry.activity_states. This means:

  • Orchestrator restart loses all activity state
  • Clients connecting after restart see no indication of what agents were doing
  • No way to distinguish between "agent is idle" and "we don't know because we just restarted"

What to Build

1. Database Migration

Add an execution_state column to the agents table:

ALTER TABLE agents ADD COLUMN execution_state TEXT NOT NULL DEFAULT 'pending';

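The existing status column is kept for backward compatibility, but execution_state becomes the source of truth. A future migration can deprecate the status column. Because existing rows receive the default 'pending', the migration should also backfill execution_state from each row's current status; otherwise startup recovery would treat every pre-existing agent as a spawn that never completed and mark it Failed.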
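Since execution_state is a TEXT column with the default 'pending', the Rust enum needs a stable string encoding that round-trips through the database. A minimal sketch, assuming lowercase strings that match the migration default and the variant names used elsewhere in this issue (Pending, Starting, Idle, Busy, Stopped, Failed); the real enum comes from #1217 and may carry extra data, and `as_str` and `UnknownState` are hypothetical names:

```rust
use std::fmt;
use std::str::FromStr;

/// Hypothetical mirror of the #1217 ExecutionState (unit variants only).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExecutionState {
    Pending,
    Starting,
    Idle,
    Busy,
    Stopped,
    Failed,
}

impl ExecutionState {
    /// Value written to the execution_state TEXT column.
    pub fn as_str(self) -> &'static str {
        match self {
            ExecutionState::Pending => "pending",
            ExecutionState::Starting => "starting",
            ExecutionState::Idle => "idle",
            ExecutionState::Busy => "busy",
            ExecutionState::Stopped => "stopped",
            ExecutionState::Failed => "failed",
        }
    }
}

impl fmt::Display for ExecutionState {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.as_str())
    }
}

/// Returned when the column holds a string that names no known state.
#[derive(Debug, PartialEq, Eq)]
pub struct UnknownState(pub String);

impl FromStr for ExecutionState {
    type Err = UnknownState;

    /// Parses a value read back from the execution_state column.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "pending" => Ok(ExecutionState::Pending),
            "starting" => Ok(ExecutionState::Starting),
            "idle" => Ok(ExecutionState::Idle),
            "busy" => Ok(ExecutionState::Busy),
            "stopped" => Ok(ExecutionState::Stopped),
            "failed" => Ok(ExecutionState::Failed),
            other => Err(UnknownState(other.to_string())),
        }
    }
}
```

Rejecting unknown strings instead of falling back to a default surfaces a corrupt row, or one written by a newer schema, rather than silently reading it as pending. A round-trip over every variant is also a natural first case for the "persistence round-trip" tests in the acceptance criteria.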

2. Write-Through on Transitions

Every ExecutionState transition must be persisted immediately. Add a method to the agent manager (or a new ExecutionStateManager):

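// Assumes chrono::Utc and uuid::Uuid are in scope and that `Result` is the
// crate's single-parameter error alias.

/// Transition an agent's execution state and persist it atomically.
///
/// Returns the new state on success, or an error if the transition
/// is invalid or persistence fails.
///
/// Note: `get` followed by `update` is a read-modify-write. Concurrent
/// transitions for the same agent must be serialized (e.g. a per-agent
/// lock, or an UPDATE conditioned on the previously read state) for the
/// persist to actually be atomic.
pub async fn transition_state(
    &self,
    agent_id: &Uuid,
    target: ExecutionState,
    trigger: TransitionTrigger,
) -> Result<ExecutionState> {
    // `mut` is required: the fields below are reassigned before update().
    let mut agent = self.storage.get(agent_id).await?;
    let new_state = agent.execution_state.transition_to(target)?;

    // Persist to DB first, so the registry and subscribers never observe
    // a state the database does not have.
    agent.execution_state = new_state.clone();
    agent.status = new_state.to_agent_status(); // backward compat
    agent.updated_at = Utc::now();
    self.storage.update(&agent).await?;

    // Update in-memory activity state for existing consumers
    let activity = new_state.to_activity_state();
    self.registry.set_activity_state(agent_id, activity).await;

    // Broadcast state change event
    self.broadcast_state_change(agent_id, &new_state, &trigger).await;

    Ok(new_state)
}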
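The write-through step is only atomic if no other transition for the same agent can run between the read and the write. A minimal synchronous sketch of one way to get that, using std only: a single `Mutex` serializes validate, persist, mirror, and broadcast. The `Manager`, `Inner`, `Event`, and `allowed` names are hypothetical; a `HashMap` stands in for the agents table, another for ConnectionRegistry.activity_states, and a `Vec` for the broadcast channel. The trigger argument is omitted, and the real method is async against the database.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExecutionState {
    Pending,
    Starting,
    Idle,
    Busy,
    Stopped,
    Failed,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ActivityState {
    Idle,
    Busy,
}

/// Stand-in for the broadcast state-change event.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Event {
    pub agent_id: u64,
    pub from: ExecutionState,
    pub to: ExecutionState,
}

/// Hypothetical, simplified stand-in for the #1217 transition table.
fn allowed(from: ExecutionState, to: ExecutionState) -> bool {
    use ExecutionState::*;
    let terminal = matches!(from, Stopped | Failed);
    match (from, to) {
        (Pending, Starting) | (Starting, Idle) | (Idle, Busy) | (Busy, Idle) => true,
        (_, Stopped | Failed) => !terminal,
        _ => false,
    }
}

#[derive(Default)]
struct Inner {
    db: HashMap<u64, ExecutionState>,      // stand-in for the agents table
    activity: HashMap<u64, ActivityState>, // stand-in for activity_states
    events: Vec<Event>,                    // stand-in for the broadcast channel
}

/// One Mutex serializes every transition, so the steps cannot interleave.
#[derive(Default)]
pub struct Manager {
    inner: Mutex<Inner>,
}

impl Manager {
    pub fn insert(&self, id: u64, state: ExecutionState) {
        self.inner.lock().unwrap().db.insert(id, state);
    }

    pub fn state(&self, id: u64) -> Option<ExecutionState> {
        self.inner.lock().unwrap().db.get(&id).copied()
    }

    pub fn activity(&self, id: u64) -> Option<ActivityState> {
        self.inner.lock().unwrap().activity.get(&id).copied()
    }

    pub fn events(&self) -> Vec<Event> {
        self.inner.lock().unwrap().events.clone()
    }

    pub fn transition_state(&self, id: u64, target: ExecutionState) -> Result<ExecutionState, String> {
        let mut inner = self.inner.lock().unwrap();
        let current = *inner.db.get(&id).ok_or_else(|| format!("unknown agent {id}"))?;
        if !allowed(current, target) {
            return Err(format!("invalid transition {current:?} -> {target:?}"));
        }
        // Persist first, so the mirror and subscribers never see a state the store lacks.
        inner.db.insert(id, target);
        // Mirror into the legacy activity map (only Busy maps to Busy in this sketch).
        let activity = if target == ExecutionState::Busy { ActivityState::Busy } else { ActivityState::Idle };
        inner.activity.insert(id, activity);
        // Broadcast last, still under the lock, so events follow commit order.
        inner.events.push(Event { agent_id: id, from: current, to: target });
        Ok(target)
    }
}
```

If eight threads race the same agent from Idle to Busy, exactly one succeeds; the others fail validation because they observe Busy. One global lock is the simplest correct choice; a per-agent lock, or an SQL UPDATE guarded by `WHERE execution_state = <previously read value>`, would avoid serializing unrelated agents.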

3. Wire Into Existing Transition Points

Replace all ad-hoc status mutations in manager.rs with calls to transition_state():

| Current code | New call |
|---|---|
| agent.status = AgentStatus::Running (line 177) | transition_state(id, Starting, Spawn), then transition_state(id, Idle, Connected) |
| agent.status = AgentStatus::Failed (lines 109, 170) | transition_state(id, Failed, ProcessCrashed) |
| agent.status = AgentStatus::Stopped (line 304) | transition_state(id, Stopped, UserTerminated) |
| Reconciliation failures (lines 440-524) | transition_state(id, Failed, Reconciliation) |

Wire into websocket.rs for activity transitions:

| Current code | New call |
|---|---|
| activity_states.insert(id, Busy) (line 313) | transition_state(id, Busy, PromptReceived) |
| activity_states.insert(id, Idle) (line 700) | transition_state(id, Idle, ResultReceived) |
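The replacements in manager.rs and websocket.rs only work if the #1217 state machine accepts every step they issue. A minimal sketch of such a check, assuming the allowed edges are exactly the ones these replacements use (Pending to Starting, Starting to Idle, Idle to Busy and back, and any non-terminal state to Failed or Stopped); the real table is defined in #1217 and may differ. The TransitionTrigger variants are the trigger names listed above, while `InvalidTransition`, `is_terminal`, and `replay` are hypothetical:

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExecutionState {
    Pending,
    Starting,
    Idle,
    Busy,
    Stopped,
    Failed,
}

/// Trigger names taken from the replacements in manager.rs and websocket.rs.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransitionTrigger {
    Spawn,
    Connected,
    ProcessCrashed,
    UserTerminated,
    Reconciliation,
    PromptReceived,
    ResultReceived,
}

#[derive(Debug, PartialEq, Eq)]
pub struct InvalidTransition {
    pub from: ExecutionState,
    pub to: ExecutionState,
}

impl ExecutionState {
    pub fn is_terminal(self) -> bool {
        matches!(self, ExecutionState::Stopped | ExecutionState::Failed)
    }

    /// Returns the target state if the edge is allowed in this sketch.
    pub fn transition_to(self, target: ExecutionState) -> Result<ExecutionState, InvalidTransition> {
        use ExecutionState::*;
        let ok = match (self, target) {
            (Pending, Starting) | (Starting, Idle) | (Idle, Busy) | (Busy, Idle) => true,
            // Crashes, reconciliation, and user termination can end any live state.
            (from, Failed | Stopped) => !from.is_terminal(),
            _ => false,
        };
        if ok {
            Ok(target)
        } else {
            Err(InvalidTransition { from: self, to: target })
        }
    }
}

/// Applies (target, trigger) steps in order, stopping at the first invalid one.
pub fn replay(
    start: ExecutionState,
    steps: &[(ExecutionState, TransitionTrigger)],
) -> Result<ExecutionState, InvalidTransition> {
    steps
        .iter()
        .try_fold(start, |state, &(target, _trigger)| state.transition_to(target))
}
```

Under this table, the old single `AgentStatus::Running` assignment has to become two calls because Pending cannot move directly to Idle; replaying Spawn and then Connected from Pending ends in Idle.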

4. Startup Recovery

During reconcile_all(), read persisted execution_state from the database:

  • Busy in DB but no WebSocket connection: Agent was mid-task when orchestrator restarted. Mark as needing attention (transition to Failed with Reconciliation trigger, or attempt restart).
  • Idle in DB with live session: Agent is healthy, re-register in ConnectionRegistry.
  • Starting in DB: Spawn was interrupted. Transition to Failed and attempt restart.
  • Pending in DB: Spawn never completed. Transition to Failed.
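The recovery rules above reduce to a pure function from (persisted state, whether the agent has a live connection) to an action, which keeps reconcile_all() easy to unit-test. A minimal sketch, assuming "live session" and "WebSocket connection" mean the same thing; `RecoveryAction` and `recover` are hypothetical names. For Busy, the sketch picks the Failed option of the two the list allows. Cases the list does not mention return `None`, and leaving terminal states alone is an assumption:

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExecutionState {
    Pending,
    Starting,
    Idle,
    Busy,
    Stopped,
    Failed,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecoveryAction {
    /// Transition to Failed with the Reconciliation trigger.
    Fail,
    /// Transition to Failed, then attempt a restart.
    FailAndRestart,
    /// Agent is healthy: re-register it in ConnectionRegistry.
    Reregister,
    /// Leave the persisted state unchanged (assumption for terminal states).
    Keep,
}

/// Returns None for combinations the recovery rules do not cover.
pub fn recover(persisted: ExecutionState, connected: bool) -> Option<RecoveryAction> {
    use ExecutionState::*;
    match (persisted, connected) {
        // Mid-task at restart with no connection (restarting is the listed alternative).
        (Busy, false) => Some(RecoveryAction::Fail),
        // Healthy agent with a live session.
        (Idle, true) => Some(RecoveryAction::Reregister),
        // Spawn was interrupted.
        (Starting, _) => Some(RecoveryAction::FailAndRestart),
        // Spawn never completed.
        (Pending, _) => Some(RecoveryAction::Fail),
        // Assumption: terminal states need no recovery.
        (Stopped | Failed, _) => Some(RecoveryAction::Keep),
        // Not specified by the rules: Idle without a connection, Busy with one.
        (Idle, false) | (Busy, true) => None,
    }
}
```

Returning `None` for the uncovered combinations forces the reconcile loop to make an explicit decision for them instead of inheriting an accidental default.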

5. Backward Compatibility

  • Keep status column updated via to_agent_status() on every transition
  • Keep ConnectionRegistry.activity_states updated via to_activity_state()
  • Existing API consumers see no breaking changes
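The two compatibility mappings named above can be sketched as plain match expressions. This is a minimal sketch only: the AgentStatus and ActivityState variants come from the Background section, but the mapping of Starting to Pending, and of every non-Busy state to Idle, are assumptions this issue does not specify:

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExecutionState {
    Pending,
    Starting,
    Idle,
    Busy,
    Stopped,
    Failed,
}

/// Legacy values kept in the status column.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AgentStatus {
    Pending,
    Running,
    Stopped,
    Failed,
}

/// Legacy values kept in ConnectionRegistry.activity_states.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ActivityState {
    Idle,
    Busy,
}

impl ExecutionState {
    /// Value written to the legacy status column on every transition.
    pub fn to_agent_status(self) -> AgentStatus {
        match self {
            // Assumption: Starting still reports Pending; the issue leaves this open.
            ExecutionState::Pending | ExecutionState::Starting => AgentStatus::Pending,
            ExecutionState::Idle | ExecutionState::Busy => AgentStatus::Running,
            ExecutionState::Stopped => AgentStatus::Stopped,
            ExecutionState::Failed => AgentStatus::Failed,
        }
    }

    /// Value mirrored into the in-memory activity map.
    pub fn to_activity_state(self) -> ActivityState {
        // Assumption: only Busy maps to Busy; the legacy map has no "unknown" value.
        if self == ExecutionState::Busy {
            ActivityState::Busy
        } else {
            ActivityState::Idle
        }
    }
}
```

The lossy second mapping is the same "idle versus unknown" ambiguity described in the Background. Existing consumers keep that behavior, while execution_state preserves the full state.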

Acceptance Criteria

  • Database migration adding execution_state TEXT column
  • transition_state() method with atomic persist + broadcast
  • All ad-hoc status mutations in manager.rs replaced with transition_state()
  • All activity state mutations in websocket.rs replaced with transition_state()
  • status column kept in sync via to_agent_status()
  • Startup recovery in reconcile_all() reads persisted execution state
  • Agents that were Busy at crash time are handled gracefully on restart
  • Existing tests pass (backward compatibility)
  • New tests for persistence round-trip and startup recovery

Relevant Files

  • crates/orchestrator/src/manager.rs -- replace all status mutations
  • crates/orchestrator/src/websocket.rs -- replace activity_states mutations
  • crates/orchestrator/src/entity/agent.rs -- add execution_state column
  • crates/orchestrator/src/migration/ -- new migration file
  • crates/orchestrator/src/storage.rs -- update queries

Blocked By

Stack Base

Stack on: feature/autonomous-pipeline
Blocked by: #1217

References

Metadata

Labels: architecture, complexity:large, enhancement, needs-tests, triaged
