refactor: commit workflow-registry transitions atomically#72
Open
kaiitunnz wants to merge 7 commits into
Open
Conversation
8 tasks
e3261ef to
8f848c1
Compare
Drop update_workflow / update_workflow_async, delete_task_states / delete_task_states_async, and the sync load_workflow_sched — none had any caller. The rehydrate path uses load_workflow_sched_async, which stays. Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
Every TaskRuntime transition persisted its task records and its workflow status-set membership as separate Redis transactions, so a crash mid-persist could leave durable state half-applied. The API-driven workflow cancel was the sharpest case: unlike event-driven transitions, it has no replay to heal a partial write. Introduce WorkflowTransition and WorkflowRegistry.commit_transition, which applies a transition's record upserts, status-set membership moves, and optional schedule snapshot in one atomic control-Redis transaction. Each transition (dispatch, requeue, terminal, cancel, and replay re-persist) now builds one delta and commits it as the single last step, replacing the per-status mark_task_* verbs and the separate record / schedule writes. Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
8f848c1 to
f577fda
Compare
WorkflowTransition added a DTO between every caller and the registry while carrying no behavior of its own. Replace it with keyword-only params on commit_transition so each call site reads as a direct delta. While there, _persist_terminal_locked now collects only terminal tasks, warns when a non-terminal task reaches the path, and skips the commit for a workflow with no terminal move. Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
The dispatcher and epoch-order stubs accepted **kwargs, so a renamed or typo'd commit_transition param would pass tests while breaking against the real registry. Give them the keyword-only signature so such drift fails loudly. Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
timzsu
requested changes
Jun 14, 2026
Comment on lines
+1216
to
+1221
| self._workflow_registry.commit_transition( | ||
| workflow_id, | ||
| records=self._records_locked(*touched), | ||
| cancelled=cancelled, | ||
| sched=self._sched_locked(workflow_id), | ||
| ) |
Collaborator
There was a problem hiding this comment.
Two issues to consider:
cancel_workflowdoes not check for the existence of the workflow. Therefore, if the user passes a wrong ID to the cancel endpoint, we will still commit the transition, which might lead to a partial workflow state. Consider adding an existence check (workflow_exists_async) before it.- Interrupts are also sent via Redis, but they are not republished during rehydration. Can we republish the interrupts when rehydrating the cancellation?
Collaborator
Author
There was a problem hiding this comment.
- Added the workflow existence check by checking the existence of task records corresponding to the workflow ID instead of
workflow_exists_asyncto avoid an additional Redis round-trip. - This is a gap that should be deferred to a future PR because rehydration runs before workers are re-registered, and the interrupts are delivered with Redis pub-sub, causing them to be dropped. This requires a change to the worker lifecycle and more thorough testing. Added this to the PR description.
Comment on lines
+78
to
+84
| def commit_transition( | ||
| self, | ||
| workflow_id: str, | ||
| *, | ||
| records: Sequence[PersistedTask] = (), | ||
| sched: WorkflowSched | None = None, | ||
| **_: Any, |
Collaborator
There was a problem hiding this comment.
Can we match the signature with the runtime? Now it swallows any input which is error-prone.
cancel_workflow committed a schedule snapshot and updated_at bump even when no task matched the id, orphaning workflow keys for an id that never existed and turning the cancel endpoint's intended 404 into a 500 on the partial hash. Return early when the workflow has no in-memory tasks. Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
The rehydrate fake's commit_transition accepted **kwargs, so a renamed or typo'd param would pass tests while breaking against the real registry. Give it the keyword-only signature, matching the dispatcher and epoch-order stubs. Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Purpose
TaskRuntimepersisted each task transition as several independent Redis writes — the task records viasave_task_states, the workflow status-set membership via a per-statusmark_task_*verb, and the schedule snapshot viasave_workflow_sched— each opening its own transaction. A crash between any two left durable state half-applied (e.g. a task in the cancelled set whose persisted record still saysPENDING, so rehydrate re-runs it). The API-driven workflow cancel was the sharpest case: unlike the event-driven transitions, it has no durable event to replay, so nothing heals a partial write. This was raised in review on #64.Changes
src/server/registries/workflow.py: addWorkflowRegistry.commit_transition, which takes a transition delta as explicit keyword params (record upserts, per-status membership moves —dispatched/pending/done/failed/cancelled— and an optionalWorkflowSched) and applies the whole delta in a singlecontrol_pipeline()MULTI/EXEC. Remove the per-status verbs (mark_task_dispatched/pending/done/failed/cancelled, sync + async) and the syncsave_task_states/save_workflow_schedthey were paired with.src/server/task/runtime.py: every transition (dispatch, requeue/mark_pending, terminalmark_succeeded/mark_failed/mark_cancelled,cancel_workflow, and the replay re-persist) now commits onecommit_transitionfrom its already-applied in-memory state as the single last step._persist_terminal_lockedgroups terminal tasks by workflow, folds the schedule snapshot in, warns on (and skips) any non-terminal task, and skips the commit entirely when a workflow has no terminal task to move;cancel_workflowcommits cancelling records + cancelled membership + schedule in one transaction.commit_transition; the five persist-failure tests now fault-inject on that one choke point; two new tests assert the cancel guarantee directly (test_cancel_workflow_commits_atomically_on_crash,test_rehydrate_restores_cancelled_workflow).docs/SERVICE_RESTARTS.md: document the atomic-transition contract and why cancel relies on it alone.update_workflow(_async),delete_task_states(_async), and syncload_workflow_schedfrom the registry.Design
The durable shape of every transition is the same — upsert some records, move some tasks between status sets, maybe snapshot the schedule — so
commit_transitiontakes that delta as explicit params and commits it atomically, rather than scattering it across per-status verbs or a one-offpersist_cancellationhelper. All affected keys live on the control Redis, so a singleMULTI/EXECcovers them: the transition commits in full or not at all, and a crash mid-persist can never leave a half-applied state. Event-driven transitions keep their at-least-once replay backstop; the cancel path, which has none, now relies on this atomicity. The "persist last" invariant is preserved — in-memory mutations all happen before the single commit — so a failed write still leaves in-memory fully applied and replay heals as before. Behavior is otherwise unchanged: each status group reproduces the exact srem/sadd of the old verb,mark_cancelledstill writes no schedule, andupdated_atbumps on the same transitions as before (plus the schedule-only commits, where the durable state did change).Test Plan
End-to-end against a single root + CPU worker on freshly built images carrying this branch (5 scenarios / 22 assertions). It targets what the unit suite cannot reach: the API-driven
workflow cancelpath, which has no event to replay and so relies on the single atomic commit alone, and the workflow status-set membership that derives the reportedDONE/CANCELLEDstatus. Scenarios: API cancel of (1) a queued, (2) an in-flight SSHsleep, and (3) a mixedPENDING+DISPATCHEDworkflow — each asserted to commit the whole cascade in one transaction, rehydrateCANCELLEDacross aflowmesh stack restart server, and never resurrect a task once a worker comes up; (4) completion derivingDONEidempotently across a restart; (5) the_persist_terminal_lockedguard logging zero non-terminal warnings.Test Result
End-to-end suite: 22 passed, 0 failed (exit 0). The API-driven cancel persisted its full cascade and survived a server restart with no task resurrected in all three cancel scenarios (queued, in-flight, mixed-status); completion stayed
DONEacross a restart; and the terminal-persist guard logged 0 non-terminal warnings.Follow-up
Cancel interrupts are delivered over Redis pub/sub, which has no replay. If the root restarts mid-cancel, rehydration restores the task's
CANCELLINGstate but never re-sends the interrupt, so the cancellation can be silently dropped. The fix belongs in the worker lifecycle (re-deliver on re-register) and needs its own testing, so it is deferred to a future PR.Pre-submission Checklist
pre-commit runand fixed any issues.uv run pytest tests/server/passes locally.[BREAKING].