feat: persist and rehydrate scheduling state across root restarts#64
Conversation
|
@kaiitunnz The purpose of this PR looks sound, but I think the restart contract needs a bit more clarification before we rely on it operationally. It would be helpful to include an e2e pipeline that exercises the intended restart order and demonstrates that queued, dispatched, and in-flight tasks preserve state correctly. A few design questions I’d like to clarify:
|
|
@timzsu This is an initial work on decoupling cluster states from the root server so that they can outlive root restarts, so this PR will cover only basic features. New features or advanced restart semantics can be added in future PRs. Below is my reply to your concerns.
On the e2e pipeline, I have built my own e2e test suites and use them locally for my recent PRs. For this PR, the test covers queued and completed tasks across a server restart. It will be extended to exercise dispatched and in-flight tasks and the intended restart order. I will push the e2e test in this PR, but we have to work on standardizing e2e regression tests in a future PR. |
|
@kaiitunnz Thanks for the clarification, which looks good to me. May you clarify what the worker's server mean? Is it a RunMesh term? |
|
@timzsu Worker server is a FlowMesh term. A FlowMesh cluster consists of two kinds of nodes: a root node and a worker node. The root node holds the cluster states and the Redis instances, while the worker nodes only manage the workers. A worker server is just a server in a worker node. |
First-class drain primitive over destroy_all_workers() so the rolling-restart flow (release in-flight tasks before recreating the worker-managing service) is part of the SDK surface. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
…t [SERVICE]` `flowmesh stack restart server` drains managed workers then recreates only the server/supervisor (--no-deps --force-recreate), leaving Redis and the rest of the stack running. This is the per-node primitive for a rolling image update: recreate the server on each node in turn (root last). No service arg keeps the whole-stack drain/down/up behavior. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
The dispatcher's task DAG, ready queue, and epoch state lived only in the root server's memory, so a restart lost all in-flight scheduling. Persist each task's mutable state (status, attempts, assigned worker, failed workers, merge linkage) plus its dependency edges and epoch index to Redis on every transition, and persist per-workflow epoch ordering / frontier. TaskRuntime.rehydrate() rebuilds every live workflow from these durable records on startup: it restores the DAG, re-derives pending deps and the ready queue, and reconstructs epoch frontiers. In-flight tasks stay assigned to their worker — completions that occur during the restart are replayed via the task event stream, and departed workers are reclaimed by the watchdog. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
Task lifecycle events flowed over Redis pub/sub, so any TASK_SUCCEEDED / TASK_FAILED emitted while the root server was down was dropped — the dispatcher never learned the outcome. Publish them to a durable Redis stream instead (supervisor relay and watchdog producers), and consume via a blocking stream read that resumes from a persisted cursor, so events emitted during a rolling restart are replayed on startup. mark_succeeded / mark_failed now short-circuit on an already-terminal task so a replayed event can't double-count usage or re-cascade dependents. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
Rebuild the scheduler from durable records before the dispatch loop and event consumer start, so a restarted root resumes in-flight workflows. Because this runs inside the ASGI lifespan before it yields, the server does not accept traffic (and its healthcheck does not pass) until rehydration completes — readiness is implicit, no separate probe needed. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
Add docs/ROLLING_UPDATES.md (operator flow, root survivability, constraints) and an ARCHITECTURE.md pointer. Tests cover scheduler-state persistence and rehydration (completed/ready restore, in-flight tasks staying dispatched, epoch frontier restore, replay idempotency), the SDK drain_workers() call, and the service-scoped `stack restart` (drain-then-recreate, --no-pull, redis skips drain, unknown-service guard). Extend the runtime test doubles with the new persistence methods. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
The server flushed the entire Redis DB on shutdown (atexit + lifespan stop), which wiped the durable scheduler state and event stream a restart rehydrates from — so a rolling image update silently dropped every in-flight workflow. State lifetime belongs to the Redis volume, not the server process: stopping or recreating the server now preserves it, and a planned down/up resumes the queue. Reset to a clean slate via `flowmesh stack clean` (removes the Redis volumes). Removes the now-unused clear_redis_state cleanup module. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
A replayed or late TASK_DISPATCHED / TASK_STARTED / progress update must not regress a task that already reached DONE/FAILED/CANCELLED. Without the guard, replay after a root restart could move a terminal task back to DISPATCHED and double-count its usage. Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
…contract Persist the stream cursor after each handled entry rather than once per batch, shrinking the window of events replayed after a crash. Spell out the at-least-once replay contract (persist transition → emit → handle → advance cursor; idempotent handlers) in the consumer docstring and the rolling-update doc. Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
…restart Worker heartbeats are pub/sub and dropped while the root is down, so a surviving worker looks briefly stale once the root restarts. The watchdog would then reap its rehydrated in-flight tasks and needlessly requeue them. Track tasks rehydrated as DISPATCHED/CANCELLING and extend the watchdog's death grace for the owning worker until WORKER_REHYDRATION_GRACE_SEC (default 120s) has elapsed since rehydration, giving the worker time to re-register first. Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
The task-event stream is length-bounded, so a consumer that stays down long enough for its cursor to fall behind the trim horizon silently loses the events in between. On resume, compare the persisted cursor against the stream's oldest surviving entry and log a warning when entries were trimmed before they were consumed. Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
The echo executor now honors spec.data.delay_sec and sleeps for that many seconds before producing its result, keeping the task RUNNING long enough to exercise dispatch and recovery paths (e.g. restarting the server mid-task). Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
A handler exception previously propagated out of the consumer loop and killed the thread, silently stopping every later completion from being processed — a single poison event could wedge the whole control plane. Process each batch in a helper that logs and skips a failing entry (advancing the cursor past it) so the stream keeps flowing; a task left stuck by the skip is reclaimed by the watchdog. Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
Remove the dead TASK_EVENT_CHANNEL constant (all producers/consumers use the stream); document why rehydrate subtracts only completed deps, not failed ones; trim the startup comment to the non-obvious ordering rationale; drop a stale noqa on test stubs that are no longer over-length. Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
The no-worker grace window is ephemeral scheduler state that is not persisted, so it resets on a root restart by design. Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
e3fd27a to
52bb58c
Compare
PersistedTask and WorkflowSched are now frozen Pydantic models that own their JSON (de)serialization, replacing the hand-built dict helpers; the only special case is re-injecting TaskRecord.failed_workers, which is excluded from its API dump but must survive a restart. rehydrate and the per-task / sched loads are async, using the async Redis client directly instead of offloading sync calls to a thread. Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
The watchdog's inputs are already typed by WatchdogConfig, so the defensive bool()/int() coercions add nothing; also simplify the stream-row unpacking and fix a docstring typo in the consumer. Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
`stack restart` now accepts zero or more SERVICE arguments: it validates the whole set up front, drains workers once if any of them manages workers, then recreates them all in a single compose up. No argument still restarts the whole stack. Also repoints the drain helper at destroy_all_workers ahead of removing the drain_workers alias. Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
drain_workers was a pure pass-through to destroy_all_workers; two identically behaving public methods are a footgun. The CLI's own _drain_workers helper already carries the drain-before-restart intent and now calls destroy_all_workers directly. Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
The knob existed only to hold a task in flight for a local rolling-restart e2e; it is not needed in the product, so drop it and its test. Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
Rename ROLLING_UPDATES.md to SERVICE_RESTARTS.md and present the feature as fine-grained, in-place restarts that preserve in-flight work, with rolling image updates demoted to one use case. Also expands the state-lifetime explanation (named volumes + RDB persistence) and drops the stale echo delay_sec mention. Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
The restart command is a general in-place restart primitive, not a rolling-update-only tool; the docstring now describes it as such. Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
The co-located-worker constraint implied all in-flight tasks requeue on a restart drain. Only batch tasks do; SSH is the one executor with real cancellation, so its in-flight task is cancelled instead. Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
GHSA-rrmf-rvhw-rf47 (torch) has no published fix; GHSA-3ww4-5jv9-j5gm (vllm) is fixed in 0.22.0 but vllm stays pinned at 0.18.0 by the vllm-omni compatibility blocker. Both are ignored under the same rationale as the existing torch / vllm entries. Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
timzsu
left a comment
There was a problem hiding this comment.
Some comments, feel free to push back. In addition, EXECUTORS.md is not modified, yet the PR description said it has been changed.
A replayed TASK_SUCCEEDED / TASK_FAILED / cancellation could flip a task that is already in a different terminal state, regressing the result or double-counting usage. Each transition now ignores an already-terminal task and logs a warning instead. Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
Advancing the cursor past a handler exception silently dropped the transition, and the staleness watchdog only reclaims dead workers, not a task stuck under a live one. Handler failures now leave the cursor unadvanced and retry on the next read, dead-lettering only after TASK_EVENT_HANDLER_MAX_ATTEMPTS; parse failures are still skipped as deterministic poison. A missing cursor also falls back to "0-0" (replay from start) rather than "$" (tail), so unconsumed events are not lost. Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
Type the stack-restart helper's arguments explicitly, back the node-client tests with httpx.MockTransport, and assert get_record is not None before use — removing the # type: ignore comments these worked around. Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
timzsu
left a comment
There was a problem hiding this comment.
Two things to discuss. In addition, the PR description contains some changelog-like text (e.g., "falls back to "0-0" (replay from start), not "$" (tail)"), which you may want to clean up.
Terminal transitions (mark_succeeded / mark_failed / mark_cancelled) now apply every in-memory mutation before any Redis write, so a failed persist leaves the in-memory scheduler state fully applied rather than half-mutated. A replayed terminal event re-persists the workflow's terminal records, so the event consumer never advances its cursor past a transition whose persist failed after the in-memory commit. Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
timzsu
left a comment
There was a problem hiding this comment.
One comment. In addition, it seems that a crash during persistence is still not recoverable. Is it acceptable?
cancel_workflow split its terminal CANCELLED tasks from CANCELLING ones and persisted the terminal set membership in a post-lock loop, after the record save and outside the lock, so a CANCELLED record could be observed or persisted without its cancelled-set membership. Terminal tasks now persist through _persist_terminal_locked as the single last step inside the lock, matching the mark_succeeded / mark_failed / mark_cancelled pattern; CANCELLING tasks keep their non-terminal record persisted separately. Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
_persist_terminal_locked issued one mark_task_done / mark_task_failed / mark_task_cancelled round-trip per task. It now groups task ids by workflow and status and issues at most one call per group, collapsing N per-task Redis transactions into one per status and shrinking the lock-hold time of a terminal transition. Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
rehydrate loaded each workflow's task states with one GET per task, costing a round-trip per task before the server accepts traffic on restart. The registry now exposes load_task_states / load_task_states_async, which fetch a workflow's task states in a single mget, and rehydrate loads each workflow's set in one round-trip. Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
Improving the reliability of |
Purpose
The root holds the dispatcher's scheduling state in memory and task events flowed over fire-and-forget pub/sub, so any restart of the root server — a crash, a deploy, an image bump — lost every in-flight workflow. This makes the root restart-survivable: per-task scheduling state is persisted to Redis on each transition and rebuilt on startup, and task lifecycle events flow through a durable, replayable stream consumed from a persisted cursor. The cost is a brief control-plane pause while the server recreates and rehydrates; workers keep running their tasks throughout.
Rolling node image updates — recreate one node at a time, root last, with no full-cluster teardown — fall out as one application of this, via a per-node
stack restartprimitive. The rollout itself stays operator-driven.Changes
task/runtime.py,registries/workflow.py,clients/redis.py) — persist per-task state, deps, and epoch index to Redis on each transition;TaskRuntime.rehydrate()rebuilds the DAG / ready queue / epoch frontiers on startup.supervisor/services/relay_service.py,services/watchdog.py,services/monitoring.py) — task events move to a durable Redis stream consumed from a persisted cursor (advanced per entry);mark_succeeded/mark_failedare idempotent against replay.main.py) — state lifetime follows the Redis volume, not the process, so a restart rehydrates instead of starting empty; reset viastack clean. Removes the deadcleanup.py.main.py) — rehydrate before the lifespan yields, so readiness is implicit (no traffic until scheduling state is restored).mark_started/mark_dispatched/mark_updated,mark_succeeded/mark_failed/mark_cancelled) is guarded so a replayed or conflicting event cannot regress a terminal task or double-count usage; rehydrated in-flight tasks get a heartbeat grace (WORKER_REHYDRATION_GRACE_SEC, default 120s) so a worker still catching up after a restart is not reaped while its pub/sub heartbeats are missing; and the task-event consumer warns when its cursor trails the bounded stream's trim horizon, resumes from the stream start when no cursor is persisted, drops malformed entries, and retries a failing handler up toTASK_EVENT_HANDLER_MAX_ATTEMPTSbefore dead-lettering it.stack restart [SERVICE ...](cli/stack/.../stack.py) recreates one or more Compose services in place (--no-deps --force-recreate, optional--pull), leaving Redis up and draining managed workers first (once, viadestroy_all_workers()).docs/SERVICE_RESTARTS.md(generic in-place-restart framing),ARCHITECTURE.md,CLI.md,ENV.md; persistence/rehydration, terminal-guard, rehydration-grace, trim-detection, consumer retry/dead-letter, and multi-servicestack restarttests.Design
State is rebuilt, not snapshotted: task IDs are random per parse, so the DAG is reconstructed from persisted per-task records and overlaid with status. Events use a stream + persisted cursor (like the existing log streams) rather than consumer groups; the persist-transition → advance-cursor ordering gives at-least-once delivery, made safe by idempotent handlers and terminal-status guards. In-flight tasks stay assigned on rehydrate — survivors' completions arrive via the stream (no double execution), and departed workers are reclaimed by the watchdog only after the rehydration grace elapses (heartbeats drop during downtime, so a survivor looks briefly stale on resume). Only the server is recreated on a node; Redis stays up so durable state survives.
On a drain, a batch task's in-flight work requeues and re-runs (its executor
cancel()is a no-op, so the worker's departure is recovered viaWORKER_UNREGISTER); an SSH task — the one executor with real cancellation — is cancelled instead, since a live session cannot resume on another node.Test Plan
A local single-node e2e (root + one CPU worker, driven outside the repo tree) exercises the
stack restartprimitive end-to-end — see Test Result.Test Result
pre-commit run --all-filespassed (incl. workspace mypy). Unit tests pass acrosstests/server/+tests/shared/+ the SDK/CLI cases above; the fulltests/suite (GPU worker paths) was not run this session.The single-node e2e drives
stack restartthrough four scenarios, all green (16/16 assertions):PENDING, survivesstack restart server(only the server container is recreated; Redis untouched), and runs toDONEonce a worker joins.DONEworkflow is stillDONEafter a restart (idempotent replay, not re-run).sleepcaught atDISPATCHEDis cleanly cancelled by the drain (terminal, not orphaned).restart redis_control serverrecreates both; aDONEworkflow survives (state lives on the Redis volume).The batch in-flight requeue-and-rerun path is covered by the rehydrate/recovery unit tests and a multi-node manual procedure.
Pre-submission Checklist
pre-commit run --all-filesand fixed any issues.uv run pytest tests/passes locally.uv sync --all-packages --group ci --frozen).[BREAKING]and described migration steps above.