Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -6178,6 +6178,12 @@
],
"sqlState" : "0A000"
},
"PIPELINE_RUN_FAILED" : {
"message" : [
"<message>"
],
"sqlState" : "58000"
},
"PIPELINE_SQL_GRAPH_ELEMENT_REGISTRATION_ERROR" : {
"message" : [
"<message>",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import scala.util.Using

import io.grpc.stub.StreamObserver

import org.apache.spark.SparkException
import org.apache.spark.connect.proto
import org.apache.spark.connect.proto.{ExecutePlanResponse, PipelineCommandResult, Relation, ResolvedIdentifier}
import org.apache.spark.connect.proto.PipelineCommand.DefineFlow.AutoCdcFlowDetails
Expand Down Expand Up @@ -563,12 +564,26 @@ private[connect] object PipelinesHandler extends Logging {

// Rethrow any exceptions that caused the pipeline run to fail so that the exception is
// propagated back to the SC client / CLI.
runFailureEvent.foreach { event =>
throw event.error.get
}
runFailureEvent.foreach(throwRunFailure)
}
}

/**
* Rethrows the failure behind a terminal run-failure event so it reaches the Spark Connect
* client. Most failures carry the underlying cause (e.g. a flow's QueryExecutionFailure), but
* some termination reasons (UnexpectedRunFailure, FailureStoppingFlow) have none. When the cause
* is absent, throw a PIPELINE_RUN_FAILED error built from the event message rather than calling
* Option.get, which would throw a NoSuchElementException and hide the real failure. Using
* PIPELINE_RUN_FAILED instead of INTERNAL_ERROR avoids mislabeling operational failures as bugs.
*/
private[connect] def throwRunFailure(failureEvent: PipelineEvent): Nothing = {
throw failureEvent.error.getOrElse(
new SparkException(
errorClass = "PIPELINE_RUN_FAILED",
messageParameters = Map("message" -> failureEvent.message),
cause = null))
}

/**
* Creates the table filters for the full refresh and refresh operations based on the StartRun
* command user provided. Also validates the command parameters to ensure that they are
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connect.pipelines

import org.apache.spark.{SparkException, SparkFunSuite}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.pipelines.common.RunState
import org.apache.spark.sql.pipelines.graph.{FailureStoppingFlow, UnexpectedRunFailure}
import org.apache.spark.sql.pipelines.logging.{
ConstructPipelineEvent, EventLevel, PipelineEventOrigin, RunProgress}

class PipelinesHandlerSuite extends SparkFunSuite {

private def runFailedEvent(message: String, error: Option[Throwable]) =
ConstructPipelineEvent(
origin = PipelineEventOrigin(datasetName = None, flowName = None, sourceCodeLocation = None),
// throwRunFailure only reads message and exception; the remaining fields are filled with
// valid placeholder values to construct the event.
level = EventLevel.INFO,
message = message,
details = RunProgress(RunState.FAILED),
exception = error)

// Use the real no-cause termination-reason messages so the tests break if their wording drifts.
private val unexpectedRunFailureMessage = UnexpectedRunFailure().message

private val failureStoppingFlowMessage =
FailureStoppingFlow(
Seq(TableIdentifier("t1", Some("db")), TableIdentifier("t2", Some("db")))).message

// Regression guard rather than a fix-validation test: the old buggy code (throw error.get) also
// rethrew the cause unchanged, so this case passes against both implementations. The no-cause
// test below is the one that genuinely exercises this PR's fix.
test("throwRunFailure rethrows the underlying cause when present") {
val cause = new RuntimeException("boom")
val thrown = intercept[RuntimeException] {
PipelinesHandler.throwRunFailure(runFailedEvent("Run failed.", Some(cause)))
}
assert(thrown eq cause)
}

test("throwRunFailure surfaces the message when the failure has no cause") {
// No-cause reasons must fall back to a PIPELINE_RUN_FAILED error built from the event message
// rather than raising NoSuchElementException; the message is forwarded verbatim.
Seq(unexpectedRunFailureMessage, failureStoppingFlowMessage).foreach { message =>
val thrown = intercept[SparkException] {
PipelinesHandler.throwRunFailure(runFailedEvent(message, None))
}
checkError(
thrown,
condition = "PIPELINE_RUN_FAILED",
parameters = Map("message" -> message))
}
}
}