[SPARK-57523][SQL] Declarative Pipelines should not retry flows that fail due to streaming source changes#56588
Open
LuciferYang wants to merge 1 commit into
Conversation
…fail due to streaming source changes When a streaming flow fails because its set of sources changed since the last run (the "There are [N] sources in the checkpoint offsets and now there are [M] sources requested by the query. Cannot continue." assertion), the failure is unrecoverable without a full refresh. The flow's retry decision is made by GraphExecution.determineFlowExecutionActionFromError, which only looked at the retry count, so the flow was retried maxFlowRetryAttempts times against the same checkpoint, re-failing identically each time, and the run ended up reporting the unrelated "has failed more than N times" reason. Centralize the decision in determineFlowExecutionActionFromError - the intended single source of truth for retryability per its own TODO - which now recognizes the source-change error and stops the flow immediately with a dedicated StreamingSourcesChanged reason. The source-change predicate is factored into PipelinesErrors.streamingSourcesChanged, and the now-redundant special-case branch in checkStreamingErrorsAndRetry is removed so the handling flows through the single path. The existing source-change test now also asserts the flow fails exactly once (not maxFlowRetryAttempts + 1 times) and that the run no longer terminates with "has failed more than N times".
b6fb169 to
eb5c54c
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What changes were proposed in this pull request?
When a Declarative Pipelines streaming flow fails because its set of sources changed since the last run, the failure is unrecoverable without a full refresh. The retry decision in
GraphExecution.determineFlowExecutionActionFromErroronly considered the retry count, so such a flow was retriedmaxFlowRetryAttemptstimes against the same checkpoint, re-failing identically each time, and the run ended up reporting the unrelated "has failed more than N times" reason.This PR teaches
determineFlowExecutionActionFromError- the method the code already nominates as the single source of truth for retryability - to recognize the source-change error and stop the flow immediately with a dedicatedStreamingSourcesChangedreason. The predicate is factored intoPipelinesErrors.streamingSourcesChanged, and the now-redundant special-case branch incheckStreamingErrorsAndRetryis removed so the handling flows through the single path.Why are the changes needed?
A streaming source-set change cannot be recovered by retrying against the same checkpoint - every retry hits the identical
AssertionError("There are [N] sources in the checkpoint offsets and now there are [M] sources requested by the query. Cannot continue."). Retrying it wastes work, emits the same error repeatedly, and reports a misleading terminal reason instead of the actual cause.Does this PR introduce any user-facing change?
Yes. A flow that fails because its streaming sources changed now fails immediately without retrying, and the run terminates with the source-change failure rather than "has failed more than N times". A full refresh is still required, as before. This is a behavior change within the unreleased/maintained branches, not relative to a different released version.
How was this patch tested?
Extended
TriggeredGraphExecutionSuite("stream failure on deletes and updates gives clear error") to assert the flow fails exactly once (notmaxFlowRetryAttempts + 1times) and that the run no longer terminates with "has failed more than N times". The full suite (15 tests) passes.Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Claude Opus 4.8)