From 8f017c6eb4b47bf4578f4d2013559250bb345b97 Mon Sep 17 00:00:00 2001 From: Jubin Soni Date: Wed, 17 Jun 2026 15:55:45 -0700 Subject: [PATCH 1/3] [SPARK-57515][SQL] Surface MALFORMED_CSV_RECORD instead of ArrayIndexOutOfBoundsException when CSV header exceeds maxColumns --- .../sql/catalyst/csv/CSVHeaderChecker.scala | 43 ++++++++++-- .../execution/datasources/csv/CSVSuite.scala | 66 +++++++++++++++++++ 2 files changed, 104 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVHeaderChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVHeaderChecker.scala index bec52747dea7c..3d841583f5bae 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVHeaderChecker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVHeaderChecker.scala @@ -17,10 +17,10 @@ package org.apache.spark.sql.catalyst.csv -import com.univocity.parsers.common.AbstractParser +import com.univocity.parsers.common.{AbstractParser, TextParsingException} import com.univocity.parsers.csv.{CsvParser, CsvParserSettings} -import org.apache.spark.SparkIllegalArgumentException +import org.apache.spark.{SparkIllegalArgumentException, SparkRuntimeException} import org.apache.spark.internal.{Logging, MessageWithContext} import org.apache.spark.internal.LogKeys.{CSV_HEADER_COLUMN_NAME, CSV_HEADER_COLUMN_NAMES, CSV_HEADER_LENGTH, CSV_SCHEMA_FIELD_NAME, CSV_SCHEMA_FIELD_NAMES, CSV_SOURCE, NUM_COLUMNS} import org.apache.spark.sql.internal.SQLConf @@ -122,7 +122,7 @@ class CSVHeaderChecker( def checkHeaderColumnNames(line: String): Unit = { if (options.headerFlag) { val parser = new CsvParser(options.asParserSettings) - checkHeaderColumnNames(parser.parseLine(line)) + checkHeaderColumnNames(UnivocityParser.parseLine(parser, line)) } } @@ -130,7 +130,16 @@ class CSVHeaderChecker( private[csv] def checkHeaderColumnNames(tokenizer: AbstractParser[CsvParserSettings]): Unit = { assert(options.multiLine, "This method should be executed with multiLine.") if (options.headerFlag) { - val firstRecord = tokenizer.parseNext() + val firstRecord = try { + tokenizer.parseNext() + } catch { + // scalastyle:off line.size.limit + case e: TextParsingException if e.getCause.isInstanceOf[ArrayIndexOutOfBoundsException] => + // scalastyle:on line.size.limit + throw malformedCsvHeaderRecord(e, Option(e.getParsedContent).getOrElse("")) + case e: ArrayIndexOutOfBoundsException => + throw malformedCsvHeaderRecord(e, "") + } checkHeaderColumnNames(firstRecord) } setHeaderForSingleVariantColumn.foreach(f => f(headerColumnNames)) @@ -146,9 +155,33 @@ class CSVHeaderChecker( // be not extracted. if (options.headerFlag && isStartOfFile) { CSVExprUtils.extractHeader(lines, options).foreach { header => - checkHeaderColumnNames(tokenizer.parseLine(header)) + val tokens = try { + tokenizer.parseLine(header) + } catch { + // scalastyle:off line.size.limit + case e: TextParsingException if e.getCause.isInstanceOf[ArrayIndexOutOfBoundsException] => + // scalastyle:on line.size.limit + throw malformedCsvHeaderRecord(e, header) + case e: ArrayIndexOutOfBoundsException => + throw malformedCsvHeaderRecord(e, header) + } + checkHeaderColumnNames(tokens) } } setHeaderForSingleVariantColumn.foreach(f => f(headerColumnNames)) } + + // scalastyle:off line.size.limit + private def malformedCsvHeaderRecord(cause: Throwable, badRecord: String): SparkRuntimeException = { + // scalastyle:on line.size.limit + val boundedRecord = if (badRecord.length > CSVOptions.MAX_ERROR_CONTENT_LENGTH) { + badRecord.take(CSVOptions.MAX_ERROR_CONTENT_LENGTH) + "..." + } else { + badRecord + } + new SparkRuntimeException( + errorClass = "MALFORMED_CSV_RECORD", + messageParameters = Map("badRecord" -> boundedRecord), + cause = cause) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index e48b453309aa8..b965052da499e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -3588,6 +3588,72 @@ abstract class CSVSuite matchPVals = true) } + test("SPARK-57515: non-multiLine CSV read with header exceeding maxColumns surfaces " + + "MALFORMED_CSV_RECORD") { + // CSVHeaderChecker called tokenizer.parseLine(header) directly without the AIOOBE guard + // that UnivocityParser.parseLine wraps. A header line wider than maxColumns must surface + // as MALFORMED_CSV_RECORD, not a raw ArrayIndexOutOfBoundsException. + withTempPath { path => + Files.write(path.toPath, "a,b,c\n1,2,3\n".getBytes(StandardCharsets.UTF_8)) + val e = intercept[SparkRuntimeException] { + spark.read + .option("header", "true") + .option("maxColumns", "2") + .csv(path.getAbsolutePath) + .collect() + } + checkError( + exception = e, + condition = "MALFORMED_CSV_RECORD", + sqlState = Some("KD000"), + parameters = Map("badRecord" -> ".*"), + matchPVals = true) + } + } + + test("SPARK-57515: multiLine CSV read with header exceeding maxColumns surfaces " + + "MALFORMED_CSV_RECORD") { + // Same gap as the non-multiLine path: the tokenizer.parseNext() call in CSVHeaderChecker + // was unguarded. A multiLine header wider than maxColumns must surface as MALFORMED_CSV_RECORD. + withTempPath { path => + Files.write(path.toPath, "a,b,c\n1,2,3\n".getBytes(StandardCharsets.UTF_8)) + val e = intercept[SparkRuntimeException] { + spark.read + .option("header", "true") + .option("multiLine", "true") + .option("maxColumns", "2") + .csv(path.getAbsolutePath) + .collect() + } + checkError( + exception = e, + condition = "MALFORMED_CSV_RECORD", + sqlState = Some("KD000"), + parameters = Map("badRecord" -> ".*"), + matchPVals = true) + } + } + + test("SPARK-57515: Dataset[String] CSV read with header exceeding maxColumns surfaces " + + "MALFORMED_CSV_RECORD") { + // The Dataset[String] path creates a fresh CsvParser in CSVHeaderChecker and called + // parser.parseLine(line) directly, bypassing the AIOOBE guard. + val lines = spark.createDataset(Seq("a,b,c", "1,2,3")) + val e = intercept[SparkRuntimeException] { + spark.read + .option("header", "true") + .option("maxColumns", "2") + .csv(lines) + .collect() + } + checkError( + exception = e, + condition = "MALFORMED_CSV_RECORD", + sqlState = Some("KD000"), + parameters = Map("badRecord" -> "a,b,c"), + matchPVals = false) + } + test("csv with variant") { withTempPath { path => val data = From ed60ee95b9b2804098374bf96bcb835761ae1356 Mon Sep 17 00:00:00 2001 From: Jubin Soni Date: Thu, 18 Jun 2026 13:00:06 -0700 Subject: [PATCH 2/3] Use UnivocityParser error helper for CSV headers Replace CSVHeaderChecker's local malformedCsvHeaderRecord calls with UnivocityParser.malformedCsvRecord to centralize malformed-record errors. Expose malformedCsvRecord as package-private (private[csv]) so it can be reused. Remove the now-unused malformedCsvHeaderRecord method and an unused SparkRuntimeException import. Update CSVSuite to assert the exact badRecord parameter ("a,b,c") and set matchPVals=false to match the new behavior. --- .../sql/catalyst/csv/CSVHeaderChecker.scala | 23 ++++--------------- .../sql/catalyst/csv/UnivocityParser.scala | 2 +- .../execution/datasources/csv/CSVSuite.scala | 4 ++-- 3 files changed, 8 insertions(+), 21 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVHeaderChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVHeaderChecker.scala index 3d841583f5bae..9f4eb66775ce4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVHeaderChecker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVHeaderChecker.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.csv import com.univocity.parsers.common.{AbstractParser, TextParsingException} import com.univocity.parsers.csv.{CsvParser, CsvParserSettings} -import org.apache.spark.{SparkIllegalArgumentException, SparkRuntimeException} +import org.apache.spark.SparkIllegalArgumentException import org.apache.spark.internal.{Logging, MessageWithContext} import org.apache.spark.internal.LogKeys.{CSV_HEADER_COLUMN_NAME, CSV_HEADER_COLUMN_NAMES, CSV_HEADER_LENGTH, CSV_SCHEMA_FIELD_NAME, CSV_SCHEMA_FIELD_NAMES, CSV_SOURCE, NUM_COLUMNS} import org.apache.spark.sql.internal.SQLConf @@ -136,9 +136,9 @@ class CSVHeaderChecker( // scalastyle:off line.size.limit case e: TextParsingException if e.getCause.isInstanceOf[ArrayIndexOutOfBoundsException] => // scalastyle:on line.size.limit - throw malformedCsvHeaderRecord(e, Option(e.getParsedContent).getOrElse("")) + throw UnivocityParser.malformedCsvRecord(e, Option(e.getParsedContent).getOrElse("")) case e: ArrayIndexOutOfBoundsException => - throw malformedCsvHeaderRecord(e, "") + throw UnivocityParser.malformedCsvRecord(e, "") } checkHeaderColumnNames(firstRecord) } @@ -161,9 +161,9 @@ class CSVHeaderChecker( // scalastyle:off line.size.limit case e: TextParsingException if e.getCause.isInstanceOf[ArrayIndexOutOfBoundsException] => // scalastyle:on line.size.limit - throw malformedCsvHeaderRecord(e, header) + throw UnivocityParser.malformedCsvRecord(e, header) case e: ArrayIndexOutOfBoundsException => - throw malformedCsvHeaderRecord(e, header) + throw UnivocityParser.malformedCsvRecord(e, header) } checkHeaderColumnNames(tokens) } @@ -171,17 +171,4 @@ class CSVHeaderChecker( setHeaderForSingleVariantColumn.foreach(f => f(headerColumnNames)) } - // scalastyle:off line.size.limit - private def malformedCsvHeaderRecord(cause: Throwable, badRecord: String): SparkRuntimeException = { - // scalastyle:on line.size.limit - val boundedRecord = if (badRecord.length > CSVOptions.MAX_ERROR_CONTENT_LENGTH) { - badRecord.take(CSVOptions.MAX_ERROR_CONTENT_LENGTH) + "..." - } else { - badRecord - } - new SparkRuntimeException( - errorClass = "MALFORMED_CSV_RECORD", - messageParameters = Map("badRecord" -> boundedRecord), - cause = cause) - } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index 113f9b088738b..1b28016dd38d2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -654,7 +654,7 @@ private[sql] object UnivocityParser { * is bounded to CSVOptions.MAX_ERROR_CONTENT_LENGTH so an oversized value cannot produce a huge * error message (SPARK-28431). */ - private def malformedCsvRecord(cause: Throwable, badRecord: String): SparkRuntimeException = { + private[csv] def malformedCsvRecord(cause: Throwable, badRecord: String): SparkRuntimeException = { val boundedRecord = if (badRecord.length > CSVOptions.MAX_ERROR_CONTENT_LENGTH) { badRecord.take(CSVOptions.MAX_ERROR_CONTENT_LENGTH) + "..." } else { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index b965052da499e..7144bd4c6c9a6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -3606,8 +3606,8 @@ abstract class CSVSuite exception = e, condition = "MALFORMED_CSV_RECORD", sqlState = Some("KD000"), - parameters = Map("badRecord" -> ".*"), - matchPVals = true) + parameters = Map("badRecord" -> "a,b,c"), + matchPVals = false) } } From 68946f7dd190fc1c05c1a679c41a075b0a3a941c Mon Sep 17 00:00:00 2001 From: Jubin Soni Date: Thu, 18 Jun 2026 19:22:02 -0700 Subject: [PATCH 3/3] Use UnivocityParser.parseLine for header parsing; update tests Route header parsing through UnivocityParser.parseLine to ensure out-of-range column errors surface as MALFORMED_CSV_RECORD (rather than raw exceptions) and to keep error handling consistent. Remove an obsolete comment and adjust tests to expect a wrapped SparkException for multiLine reads, asserting the MALFORMED_CSV_RECORD is present on the causal SparkRuntimeException. Minor comment clarifications and formatting changes included. --- .../sql/catalyst/csv/UnivocityParser.scala | 3 ++- .../datasources/csv/CSVDataSource.scala | 5 +--- .../execution/datasources/csv/CSVSuite.scala | 23 ++++++++++++------- 3 files changed, 18 insertions(+), 13 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index 1b28016dd38d2..a028f77495a48 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -654,7 +654,8 @@ private[sql] object UnivocityParser { * is bounded to CSVOptions.MAX_ERROR_CONTENT_LENGTH so an oversized value cannot produce a huge * error message (SPARK-28431). */ - private[csv] def malformedCsvRecord(cause: Throwable, badRecord: String): SparkRuntimeException = { + private[csv] def malformedCsvRecord( + cause: Throwable, badRecord: String): SparkRuntimeException = { val boundedRecord = if (badRecord.length > CSVOptions.MAX_ERROR_CONTENT_LENGTH) { badRecord.take(CSVOptions.MAX_ERROR_CONTENT_LENGTH) + "..." } else { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala index 7424d43341b26..5f7f046ad49ec 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala @@ -352,7 +352,7 @@ object TextInputCSVDataSource extends CSVDataSource { maybeFirstLine: Option[String], parsedOptions: CSVOptions): StructType = { val csvParser = new CsvParser(parsedOptions.asParserSettings) - maybeFirstLine.map(csvParser.parseLine(_)) match { + maybeFirstLine.map(UnivocityParser.parseLine(csvParser, _)) match { case Some(firstRow) if firstRow != null => val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis val header = CSVUtils.makeSafeHeader(firstRow, caseSensitive, parsedOptions) @@ -362,9 +362,6 @@ object TextInputCSVDataSource extends CSVDataSource { val linesWithoutHeader = CSVUtils.filterHeaderLine(filteredLines, maybeFirstLine.get, parsedOptions) val parser = new CsvParser(parsedOptions.asParserSettings) - // Route data rows through UnivocityParser.parseLine so a too-many-columns row surfaces as - // MALFORMED_CSV_RECORD, not a raw ArrayIndexOutOfBoundsException (SPARK-57195). The - // first-line parse above stays raw to keep SPARK-28431's bounded TextParsingException. linesWithoutHeader.map(UnivocityParser.parseLine(parser, _)) } SQLExecution.withSQLConfPropagated(csv.sparkSession) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 7144bd4c6c9a6..ee775f8e28605 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -3590,9 +3590,9 @@ abstract class CSVSuite test("SPARK-57515: non-multiLine CSV read with header exceeding maxColumns surfaces " + "MALFORMED_CSV_RECORD") { - // CSVHeaderChecker called tokenizer.parseLine(header) directly without the AIOOBE guard + // inferFromDataset called csvParser.parseLine(header) directly without the AIOOBE guard // that UnivocityParser.parseLine wraps. A header line wider than maxColumns must surface - // as MALFORMED_CSV_RECORD, not a raw ArrayIndexOutOfBoundsException. + // as MALFORMED_CSV_RECORD, not a raw TextParsingException. withTempPath { path => Files.write(path.toPath, "a,b,c\n1,2,3\n".getBytes(StandardCharsets.UTF_8)) val e = intercept[SparkRuntimeException] { @@ -3613,11 +3613,12 @@ abstract class CSVSuite test("SPARK-57515: multiLine CSV read with header exceeding maxColumns surfaces " + "MALFORMED_CSV_RECORD") { - // Same gap as the non-multiLine path: the tokenizer.parseNext() call in CSVHeaderChecker - // was unguarded. A multiLine header wider than maxColumns must surface as MALFORMED_CSV_RECORD. + // For multiLine reads, schema inference runs inside an RDD task, so the + // SparkRuntimeException(MALFORMED_CSV_RECORD) is wrapped in SparkException(FAILED_READ_FILE). + // Verify the cause chain surfaces the MALFORMED_CSV_RECORD condition. withTempPath { path => Files.write(path.toPath, "a,b,c\n1,2,3\n".getBytes(StandardCharsets.UTF_8)) - val e = intercept[SparkRuntimeException] { + val e = intercept[SparkException] { spark.read .option("header", "true") .option("multiLine", "true") @@ -3625,8 +3626,14 @@ abstract class CSVSuite .csv(path.getAbsolutePath) .collect() } - checkError( + checkErrorMatchPVals( exception = e, + condition = "FAILED_READ_FILE.NO_HINT", + parameters = Map("path" -> ".*")) + val cause = e.getCause + assert(cause.isInstanceOf[SparkRuntimeException]) + checkError( + exception = cause.asInstanceOf[SparkRuntimeException], condition = "MALFORMED_CSV_RECORD", sqlState = Some("KD000"), parameters = Map("badRecord" -> ".*"), @@ -3636,8 +3643,8 @@ abstract class CSVSuite test("SPARK-57515: Dataset[String] CSV read with header exceeding maxColumns surfaces " + "MALFORMED_CSV_RECORD") { - // The Dataset[String] path creates a fresh CsvParser in CSVHeaderChecker and called - // parser.parseLine(line) directly, bypassing the AIOOBE guard. + // inferFromDataset called csvParser.parseLine(header) directly without the AIOOBE guard + // that UnivocityParser.parseLine wraps. val lines = spark.createDataset(Seq("a,b,c", "1,2,3")) val e = intercept[SparkRuntimeException] { spark.read