Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.sql.catalyst.csv

import com.univocity.parsers.common.AbstractParser
import com.univocity.parsers.common.{AbstractParser, TextParsingException}
import com.univocity.parsers.csv.{CsvParser, CsvParserSettings}

import org.apache.spark.SparkIllegalArgumentException
Expand Down Expand Up @@ -122,15 +122,24 @@ class CSVHeaderChecker(
def checkHeaderColumnNames(line: String): Unit = {
if (options.headerFlag) {
val parser = new CsvParser(options.asParserSettings)
checkHeaderColumnNames(parser.parseLine(line))
checkHeaderColumnNames(UnivocityParser.parseLine(parser, line))
}
}

// This is currently only used to parse CSV with multiLine mode.
private[csv] def checkHeaderColumnNames(tokenizer: AbstractParser[CsvParserSettings]): Unit = {
assert(options.multiLine, "This method should be executed with multiLine.")
if (options.headerFlag) {
val firstRecord = tokenizer.parseNext()
val firstRecord = try {
tokenizer.parseNext()
} catch {
// scalastyle:off line.size.limit
case e: TextParsingException if e.getCause.isInstanceOf[ArrayIndexOutOfBoundsException] =>
// scalastyle:on line.size.limit
throw UnivocityParser.malformedCsvRecord(e, Option(e.getParsedContent).getOrElse(""))
case e: ArrayIndexOutOfBoundsException =>
throw UnivocityParser.malformedCsvRecord(e, "")
}
checkHeaderColumnNames(firstRecord)
}
setHeaderForSingleVariantColumn.foreach(f => f(headerColumnNames))
Expand All @@ -146,9 +155,20 @@ class CSVHeaderChecker(
// be not extracted.
if (options.headerFlag && isStartOfFile) {
CSVExprUtils.extractHeader(lines, options).foreach { header =>
checkHeaderColumnNames(tokenizer.parseLine(header))
val tokens = try {
tokenizer.parseLine(header)
} catch {
// scalastyle:off line.size.limit
case e: TextParsingException if e.getCause.isInstanceOf[ArrayIndexOutOfBoundsException] =>
// scalastyle:on line.size.limit
throw UnivocityParser.malformedCsvRecord(e, header)
case e: ArrayIndexOutOfBoundsException =>
throw UnivocityParser.malformedCsvRecord(e, header)
}
checkHeaderColumnNames(tokens)
}
}
setHeaderForSingleVariantColumn.foreach(f => f(headerColumnNames))
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,7 @@ private[sql] object UnivocityParser {
* is bounded to CSVOptions.MAX_ERROR_CONTENT_LENGTH so an oversized value cannot produce a huge
* error message (SPARK-28431).
*/
private def malformedCsvRecord(cause: Throwable, badRecord: String): SparkRuntimeException = {
private[csv] def malformedCsvRecord(cause: Throwable, badRecord: String): SparkRuntimeException = {
val boundedRecord = if (badRecord.length > CSVOptions.MAX_ERROR_CONTENT_LENGTH) {
badRecord.take(CSVOptions.MAX_ERROR_CONTENT_LENGTH) + "..."
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3588,6 +3588,72 @@ abstract class CSVSuite
matchPVals = true)
}

test("SPARK-57515: non-multiLine CSV read with header exceeding maxColumns surfaces " +
"MALFORMED_CSV_RECORD") {
// CSVHeaderChecker called tokenizer.parseLine(header) directly without the AIOOBE guard
// that UnivocityParser.parseLine wraps. A header line wider than maxColumns must surface
// as MALFORMED_CSV_RECORD, not a raw ArrayIndexOutOfBoundsException.
withTempPath { path =>
Files.write(path.toPath, "a,b,c\n1,2,3\n".getBytes(StandardCharsets.UTF_8))
val e = intercept[SparkRuntimeException] {
spark.read
.option("header", "true")
.option("maxColumns", "2")
.csv(path.getAbsolutePath)
.collect()
}
checkError(
exception = e,
condition = "MALFORMED_CSV_RECORD",
sqlState = Some("KD000"),
parameters = Map("badRecord" -> "a,b,c"),
matchPVals = false)
}
}

test("SPARK-57515: multiLine CSV read with header exceeding maxColumns surfaces " +
"MALFORMED_CSV_RECORD") {
// Same gap as the non-multiLine path: the tokenizer.parseNext() call in CSVHeaderChecker
// was unguarded. A multiLine header wider than maxColumns must surface as MALFORMED_CSV_RECORD.
withTempPath { path =>
Files.write(path.toPath, "a,b,c\n1,2,3\n".getBytes(StandardCharsets.UTF_8))
val e = intercept[SparkRuntimeException] {
spark.read
.option("header", "true")
.option("multiLine", "true")
.option("maxColumns", "2")
.csv(path.getAbsolutePath)
.collect()
}
checkError(
exception = e,
condition = "MALFORMED_CSV_RECORD",
sqlState = Some("KD000"),
parameters = Map("badRecord" -> ".*"),
matchPVals = true)
}
}

test("SPARK-57515: Dataset[String] CSV read with header exceeding maxColumns surfaces " +
"MALFORMED_CSV_RECORD") {
// The Dataset[String] path creates a fresh CsvParser in CSVHeaderChecker and called
// parser.parseLine(line) directly, bypassing the AIOOBE guard.
val lines = spark.createDataset(Seq("a,b,c", "1,2,3"))
val e = intercept[SparkRuntimeException] {
spark.read
.option("header", "true")
.option("maxColumns", "2")
.csv(lines)
.collect()
}
checkError(
exception = e,
condition = "MALFORMED_CSV_RECORD",
sqlState = Some("KD000"),
parameters = Map("badRecord" -> "a,b,c"),
matchPVals = false)
}

test("csv with variant") {
withTempPath { path =>
val data =
Expand Down