Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -261,12 +261,32 @@ object GraphExecution extends Logging {
}

/**
 * Stop reason for a `FlowExecution` whose streaming flow had its set of sources change since
 * the last run. Recovering from this requires a full refresh, so the flow must not be retried
 * regardless of the remaining retry budget.
 *
 * @param cause The exception that surfaced the source change.
 * @param flowDisplayName The user facing flow name with the error.
 */
private case class StreamingSourcesChanged(cause: Throwable, flowDisplayName: String)
    extends FlowExecutionStopReason {

  // Reported as a query execution failure with maxRetries pinned to 0.
  override lazy val runTerminationReason: RunTerminationReason =
    QueryExecutionFailure(flowDisplayName, maxRetries = 0, Option(cause))

  // User-facing explanation that also tells the user how to recover (full refresh).
  override lazy val failureMessage: String =
    s"Flow '$flowDisplayName' had streaming sources added or removed. It will not be " +
      s"retried. Please perform a full refresh to rebuild it against the current sources."
}

/**
* Analyze the exception thrown by flow execution and decide whether to retry the execution or
* stop it. The result is either RetryFlowExecution or StopFlowExecution; this function does not
* reanalyze the flow itself.
* This should be the narrow waist for all exception analysis in flow execution.
* Currently it handles max retries and streaming source changes; other non-retryable errors are
* still routed through the retry path.
* TODO: extend this to include other non-retryable exceptions as well so we can have a single
* source of truth (SoT) for all of this error-matching logic.
* @param ex Exception to analyze.
* @param flowDisplayName The user facing flow name with the error.
* @param currentNumTries Number of times the flow has been tried.
Expand All @@ -278,8 +298,12 @@ object GraphExecution extends Logging {
currentNumTries: => Int,
maxAllowedRetries: => Int
): FlowExecutionAction = {
val flowExecutionNonRetryableReasonOpt = if (currentNumTries > maxAllowedRetries) {
Some(MaxRetryExceeded(ex, flowDisplayName, maxAllowedRetries))
val error = ex
val flowExecutionNonRetryableReasonOpt = if (PipelinesErrors.streamingSourcesChanged(error)) {
// Source-set changes need a full refresh, so they are never retried.
Some(StreamingSourcesChanged(error, flowDisplayName))
} else if (currentNumTries > maxAllowedRetries) {
Some(MaxRetryExceeded(error, flowDisplayName, maxAllowedRetries))
} else {
None
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,23 @@ object PipelinesErrors extends Logging {
getExceptionChain(throwable).exists(check)
}

/**
 * Reports whether `ex` (or any of its causes) signals that a streaming flow's set of sources
 * changed since the last run. Such a failure is unrecoverable without a full refresh, so a flow
 * that fails with it must not be retried.
 *
 * The error is recognized by an `AssertionError` whose message contains both fragments below.
 *
 * @param ex Exception to inspect.
 * @return true if a matching `AssertionError` is found, false otherwise.
 */
private[graph] def streamingSourcesChanged(ex: Throwable): Boolean = {
  val checkpointFragment = "sources in the checkpoint offsets and now there are"
  val queryFragment = "sources requested by the query. Cannot continue."

  checkCauses(
    throwable = ex,
    check = {
      case assertion: AssertionError =>
        // Option(...) guards against a null message; both fragments must be present.
        Option(assertion.getMessage).exists { msg =>
          msg.contains(checkpointFragment) && msg.contains(queryFragment)
        }
      case _ => false
    }
  )
}

/**
* Checks an error for streaming specific handling. This is a pretty messy signature as a result
* of unifying some divergences between the triggered caller in TriggeredGraphExecution and the
Expand All @@ -87,28 +104,7 @@ object PipelinesErrors extends Logging {
maxRetries: Int,
onRetry: => Unit
): Unit = {
if (PipelinesErrors.checkCauses(
throwable = ex,
check = ex => {
ex.isInstanceOf[AssertionError] &&
ex.getMessage != null &&
ex.getMessage.contains("sources in the checkpoint offsets and now there are") &&
ex.getMessage.contains("sources requested by the query. Cannot continue.")
}
)) {
val message = s"""
|Flow '${flow.displayName}' had streaming sources added or removed. Please perform a
|full refresh in order to rebuild '${flow.displayName}' against the current set of
|sources.
|""".stripMargin

env.flowProgressEventLogger.recordFailed(
flow = flow,
exception = ex,
logAsWarn = false,
messageOpt = Option(message)
)
} else if (flow.once && ex == null) {
if (flow.once && ex == null) {
// No need to do anything if this is a ONCE flow with no exception. That just means it's done.
} else {
val actionFromError = GraphExecution.determineFlowExecutionActionFromError(
Expand All @@ -120,7 +116,8 @@ object PipelinesErrors extends Logging {
actionFromError match {
// Simply retry
case GraphExecution.RetryFlowExecution => onRetry
// Non-retryable stop reason (max retries exceeded, streaming sources changed, ...).
// When shouldRethrow is true, this rethrows so the run stops eagerly on these reasons.
case GraphExecution.StopFlowExecution(reason) =>
val msg = reason.failureMessage
if (reason.warnInsteadOfError) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -462,13 +462,26 @@ class TriggeredGraphExecutionSuite extends ExecutionTest with SharedSparkSession
updateContext2.pipelineExecution.runPipeline()
updateContext2.pipelineExecution.awaitCompletion()

// A streaming source change is unrecoverable without a full refresh, so the flow must not be
// retried: we should see exactly one failure rather than maxFlowRetryAttempts + 1 of them.
assertFlowProgressEvent(
eventBuffer = updateContext2.eventBuffer,
identifier = fullyQualifiedIdentifier("input_table"),
expectedFlowStatus = FlowStatus.FAILED,
expectedEventLevel = EventLevel.ERROR,
msgChecker = _.contains(
s"Flow '${eventLogName("input_table")}' had streaming sources added or removed."
),
expectedNumOfEvents = Option(1)
)

// The run should fail because of the source change, not because the flow exhausted its retries.
assertRunProgressEvent(
eventBuffer = updateContext2.eventBuffer,
state = RunState.FAILED,
expectedEventLevel = EventLevel.ERROR,
msgChecker = _.contains(
s"flow '${eventLogName("input_table")}' has failed."
)
)
}
Expand Down