diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json
index 8e7125a07c1e8..8afdcea716477 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -6178,6 +6178,12 @@
     ],
     "sqlState" : "0A000"
   },
+  "PIPELINE_RUN_FAILED" : {
+    "message" : [
+      "<message>"
+    ],
+    "sqlState" : "58000"
+  },
   "PIPELINE_SQL_GRAPH_ELEMENT_REGISTRATION_ERROR" : {
     "message" : [
       "<message>",
diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala
index f8edbc9928000..8eca9afa42675 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala
@@ -22,6 +22,7 @@ import scala.util.Using
 
 import io.grpc.stub.StreamObserver
 
+import org.apache.spark.SparkException
 import org.apache.spark.connect.proto
 import org.apache.spark.connect.proto.{ExecutePlanResponse, PipelineCommandResult, Relation, ResolvedIdentifier}
 import org.apache.spark.connect.proto.PipelineCommand.DefineFlow.AutoCdcFlowDetails
@@ -563,12 +564,26 @@ private[connect] object PipelinesHandler extends Logging {
 
       // Rethrow any exceptions that caused the pipeline run to fail so that the exception is
       // propagated back to the SC client / CLI.
-      runFailureEvent.foreach { event =>
-        throw event.error.get
-      }
+      runFailureEvent.foreach(throwRunFailure)
     }
   }
 
+  /**
+   * Rethrows the failure behind a terminal run-failure event so it reaches the Spark Connect
+   * client. Most failures carry the underlying cause (e.g. a flow's QueryExecutionFailure), but
+   * some termination reasons (UnexpectedRunFailure, FailureStoppingFlow) have none. When the cause
+   * is absent, throw a PIPELINE_RUN_FAILED error built from the event message rather than calling
+   * Option.get, which would throw a NoSuchElementException and hide the real failure. Using
+   * PIPELINE_RUN_FAILED instead of INTERNAL_ERROR avoids mislabeling operational failures as bugs.
+   */
+  private[connect] def throwRunFailure(failureEvent: PipelineEvent): Nothing = {
+    throw failureEvent.error.getOrElse(
+      new SparkException(
+        errorClass = "PIPELINE_RUN_FAILED",
+        messageParameters = Map("message" -> failureEvent.message),
+        cause = null))
+  }
+
   /**
    * Creates the table filters for the full refresh and refresh operations based on the StartRun
    * command user provided. Also validates the command parameters to ensure that they are
diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandlerSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandlerSuite.scala
new file mode 100644
index 0000000000000..fd90ce8eacc42
--- /dev/null
+++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandlerSuite.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.pipelines
+
+import org.apache.spark.{SparkException, SparkFunSuite}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.pipelines.common.RunState
+import org.apache.spark.sql.pipelines.graph.{FailureStoppingFlow, UnexpectedRunFailure}
+import org.apache.spark.sql.pipelines.logging.{
+  ConstructPipelineEvent, EventLevel, PipelineEventOrigin, RunProgress}
+
+class PipelinesHandlerSuite extends SparkFunSuite {
+
+  // Builds a terminal run-failure event. throwRunFailure only reads the message and exception,
+  // so the remaining fields are filled with valid placeholder values.
+  private def runFailedEvent(message: String, error: Option[Throwable]) = {
+    val placeholderOrigin =
+      PipelineEventOrigin(datasetName = None, flowName = None, sourceCodeLocation = None)
+    ConstructPipelineEvent(
+      origin = placeholderOrigin,
+      level = EventLevel.INFO,
+      message = message,
+      details = RunProgress(RunState.FAILED),
+      exception = error)
+  }
+
+  // Taken from the real no-cause termination reasons so the tests break if their wording drifts.
+  private val noCauseMessages: Seq[String] = Seq(
+    UnexpectedRunFailure().message,
+    FailureStoppingFlow(
+      Seq(TableIdentifier("t1", Some("db")), TableIdentifier("t2", Some("db")))).message)
+
+  // Regression guard rather than a fix-validation test: the previous code (throw error.get) also
+  // rethrew the cause unchanged, so this case passes against both implementations. Only the
+  // no-cause test below genuinely exercises the fix.
+  test("throwRunFailure rethrows the underlying cause when present") {
+    val boom = new RuntimeException("boom")
+    val caught = intercept[RuntimeException] {
+      PipelinesHandler.throwRunFailure(runFailedEvent("Run failed.", Some(boom)))
+    }
+    assert(caught eq boom)
+  }
+
+  test("throwRunFailure surfaces the message when the failure has no cause") {
+    // With no cause, the fallback must be a PIPELINE_RUN_FAILED error that forwards the event
+    // message verbatim, not a NoSuchElementException.
+    for (expected <- noCauseMessages) {
+      val caught = intercept[SparkException] {
+        PipelinesHandler.throwRunFailure(runFailedEvent(expected, None))
+      }
+      checkError(
+        caught, condition = "PIPELINE_RUN_FAILED", parameters = Map("message" -> expected))
+    }
+  }
+}