[SPARK-57479][SQL] Read and infer XML schema from tar archives #56572
akshatshenoi-db wants to merge 7 commits into
### What changes were proposed in this pull request?

SPARK-57135 added reading CSV files packed in tar archives (`.tar`/`.tar.gz`/`.tgz`), SPARK-57321 added CSV schema inference, SPARK-57419 extended both to JSON, and SPARK-57478 to text, all gated by `spark.sql.files.archive.reader.enabled`. This extends the same capability to the XML data source.

When the flag is enabled, the V1 XML data source reads a tar archive as if it were a directory of its entries: each entry is streamed through `ArchiveReader` (never unpacked to disk) and parsed exactly like a standalone XML file (`XmlDataSource.readArchive` -> `StaxXmlParser.parseStream`). Schema inference reads every archive entry together with any loose files in a single `XmlInferSchema` pass (`inferWithArchives`), so the inferred schema matches a directory read of the same files. The whole archive is one non-splittable unit (`XmlFileFormat.isSplitable` returns false), and a corrupt/missing archive is skipped as a unit under `ignoreCorruptFiles`/`ignoreMissingFiles`. XML has no DSv2 reader, so the archive scan is V1-only and no `Table` change is needed.

This also adjusts the `readArchive` entry point (JSON and XML) to take a parser factory and build a fresh parser for each archive entry -- matching the per-file parser of a non-archive read -- rather than sharing one parser across all entries.

### Why are the changes needed?

To let XML ingestion read tar archives without unpacking them to disk, matching the CSV, JSON, and text behavior already in Spark.

### Does this PR introduce _any_ user-facing change?

Yes. With `spark.sql.files.archive.reader.enabled=true` (default false), the XML data source can read and infer schemas from `.tar`/`.tar.gz`/`.tgz` files.

### How was this patch tested?

New `XMLTarArchiveReadSuite` (mixing `XMLArchiveReadBase` with the shared `ArchiveReadSuiteBase` and `TarArchiveReadBase`), exercising the shared archive read/inference/complex-type tests plus XML-specific tests: multi-line records, attributes, and single-pass null-field widening against a loose file.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code
Override readArchive per data source instead of one mode-agnostic parseStream, mirroring readFile. TextInputXmlDataSource (single-line) splits each entry into lines and runs them through a FailureSafeParser, so a single-line archive entry gets the same per-record corrupt-record handling as a non-archive single-line read. MultiLineXmlDataSource parses each entry as a whole document: it buffers the entry's bytes and re-opens over them for the optimized parser (which re-reads its input to echo the corrupt-record text on a parse failure, which a single-use entry stream cannot do), and reads the entry stream directly for the legacy parser.
… base data source Move inferWithArchives and createBaseRdd onto the base XmlDataSource so both single-line and multi-line inference share one implementation, and infer over all inputs (archives plus any loose files) in a single pass keyed off whether any input is an archive -- mirroring JsonDataSource.inferWithArchives. No behavior change: archive entries and loose files were already inferred in one XmlInferSchema pass; this drops the archive/non-archive partition and the two-RDD union in favor of a single per-input-branch pass.
Branch inferWithArchives on multiLine so inference matches this mode's scan, mirroring JSON's inferWithArchives: multi-line splits each input's whole stream into rowTag-delimited records (StaxXmlParser.tokenizeStream), single-line treats each line as a record (ArchiveReader.lineIterator), so a single-line archive read infers the same schema a single-line directory read does. Previously inference was always rowTag-tokenized, which diverged from the now per-mode scan for single-line input. Factor the per-input open/skip into a `perInput` helper, as JSON does. Add a single-line read+infer parity test.
AI code review (self-review via spark-dev)
Ran an automated code review at head. Verdict: 0 blocking, 0 non-blocking, 0 nits (clean). Checked:
Added a single-line read+infer parity test. Not built locally; CI is the gate.
    (file: PartitionedFile) => {
    -  val parser = new JacksonParser(
    +  def parser() = new JacksonParser(
I think you can make this lazy val instead
cloud-fan left a comment
0 blocking, 2 non-blocking, 0 nits.
A clean, faithful port of the established JSON/CSV/text archive pattern with thorough tests; both findings are optional simplifications, one echoing an existing reviewer comment.
Suggestions (2)
- XmlFileFormat.scala:123: parser-factory change appears unnecessary (pre-PR shared parser was safe); aligns with @HyukjinKwon's comment — see inline
- XmlDataSource.scala:338: optimized multi-line read buffers each whole entry in memory — see inline
Verification
Traced the TarArchiveInputStream cursor contract on every new consumption path (the entry stream is a CloseShieldInputStream view valid only until the cursor advances, and readEntries advances only once an entry's iterator is exhausted): multi-line inference emits fully-materialized String records (tokenizeStream), both single-line paths copy each line into a new String(...), the legacy multi-line read consumes its lazy iterator before advancing, and the optimized multi-line read buffers via readAllBytes() then re-opens a ByteArrayInputStream — so no path reads from an advanced cursor or aliases a reused buffer.
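The aliasing hazard that this verification rules out can be illustrated with a small, self-contained sketch. It does not use `ArchiveReader` or `TarArchiveInputStream`; `ReusingLineIterator` is a hypothetical stand-in for any reader that overwrites one shared buffer each time it advances, and the two helpers only contrast keeping a reference with copying before the advance.

```scala
object ReusedBufferDemo {
  // Hypothetical line reader (an assumption for illustration): like a reused
  // text buffer, it overwrites one shared StringBuilder on every next().
  final class ReusingLineIterator(lines: Seq[String]) extends Iterator[CharSequence] {
    private val shared = new java.lang.StringBuilder
    private val underlying = lines.iterator
    def hasNext: Boolean = underlying.hasNext
    def next(): CharSequence = {
      shared.setLength(0)
      shared.append(underlying.next())
      shared // the same object is returned every time
    }
  }

  // Unsafe: retains references to the shared buffer, so after the iterator
  // has advanced every retained element shows the last line.
  def aliased(lines: Seq[String]): List[String] = {
    val kept = new ReusingLineIterator(lines).toList
    kept.map(_.toString)
  }

  // Safe: copies each line into a fresh String before the iterator advances.
  def copied(lines: Seq[String]): List[String] =
    new ReusingLineIterator(lines).map(_.toString).toList
}
```

With three input lines, `aliased` yields the last line three times, while `copied` preserves each line, which is the property the review checks for on the single-line paths.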
    (file: PartitionedFile) => {
    -  val parser = new StaxXmlParser(
    +  def parser() = new StaxXmlParser(
The parser-factory change (() => StaxXmlParser / () => JacksonParser, fresh parser per entry) looks unnecessary. On master readArchive took a single shared parser and reused it for every entry, and that shipped via SPARK-57419. StaxXmlParser holds no per-stream mutable state — only lazy val/val parsing utilities and a reusable val parse ("intentionally a val to create a function once and reuse") — and parseStream/parseStreamOptimized build their reader and FailureSafeParser locally per call without mutating this; the same is true of JacksonParser. So a shared parser is safe across entries (just as one parser already parses many records within a single file), and the factory just adds a per-entry allocation and a () => parser() double-indirection here.
This is the XML side of @HyukjinKwon's comment on JsonFileFormat.scala. One nuance for that thread: a plain lazy val parser would not preserve the stated "fresh per entry" semantics — the () => parser closure would hand the single memoized instance to every entry — so if you take the simplification, the clean form is to revert readArchive to a shared parser (as on master) rather than change def to lazy val. Non-blocking; the current code is correct either way.
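The `def` versus `lazy val` nuance is plain Scala semantics and can be checked in isolation. The sketch below is not Spark code: `FakeParser` and `idsSeen` are hypothetical stand-ins, and the counter exists only to make parser instances distinguishable.

```scala
object ParserFactoryDemo {
  // Hypothetical stand-in for a parser (an assumption for illustration).
  final class FakeParser(val id: Int)

  // Parser ids that `entries` archive entries would see when each entry
  // asks the factory for its parser.
  def idsSeen(factory: () => FakeParser, entries: Int): List[Int] =
    List.fill(entries)(factory().id)

  def withDef(entries: Int): List[Int] = {
    var built = 0
    def parser() = { built += 1; new FakeParser(built) } // re-evaluated on every call
    idsSeen(() => parser(), entries)
  }

  def withLazyVal(entries: Int): List[Int] = {
    var built = 0
    lazy val parser = { built += 1; new FakeParser(built) } // evaluated once, then memoized
    idsSeen(() => parser, entries)
  }
}
```

For three entries, `withDef` hands out three distinct instances and `withLazyVal` hands out the same one three times, so a `lazy val` behaves like the shared parser rather than a per-entry factory.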
The per-entry factory is intended to keep parity with the non-archive read path: if parser construction ever becomes file- or entry-dependent, the archive path stays correct without needing a revisit.
    if (entryParser.options.useLegacyXMLParser) {
      entryParser.parseStream(in, schema)
    } else {
      val bytes = in.readAllBytes()
The default (optimized) multi-line parser re-reads its input to echo the corrupt-record text on a parse failure, which a single-use archive entry stream can't do, so the entry is fully buffered here and re-opened over a ByteArrayInputStream. A non-archive multi-line read instead streams from disk and re-opens the file, and the legacy-parser branch above streams the entry directly without buffering — so this is the one read path that holds a whole entry in memory at once.
This is inherent to single-use entry streams + the optimized parser's re-read requirement, so likely an accepted trade-off, but worth confirming it's fine for a large single XML document packed in an archive, and maybe noting the limitation in the readArchive Scaladoc. Non-blocking.
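The trade-off can be sketched with the JDK stream classes alone. This is an illustration under stated assumptions, not the PR's code: `parseOrEcho` is a hypothetical parser stand-in that, on failure, re-reads its input from the start to echo the raw record, and `parseEntry` shows why a single-use stream has to be drained first.

```scala
import java.io.{ByteArrayInputStream, InputStream}
import java.nio.charset.StandardCharsets

object BufferedEntryDemo {
  // Hypothetical parser stand-in: "fails" when the text does not end with a
  // closing '>', and on failure opens its input a second time to echo it.
  def parseOrEcho(open: () => InputStream): Either[String, String] = {
    val text = new String(open().readAllBytes(), StandardCharsets.UTF_8)
    if (text.trim.endsWith(">")) Right(text)
    else Left(new String(open().readAllBytes(), StandardCharsets.UTF_8)) // second pass
  }

  // A single-use entry stream cannot be re-opened, so drain it once and give
  // the parser a factory over the buffered bytes. The whole entry is in memory.
  def parseEntry(entry: InputStream): Either[String, String] = {
    val bytes = entry.readAllBytes()
    parseOrEcho(() => new ByteArrayInputStream(bytes))
  }
}
```

The memory cost is one entry at a time: `bytes` is scoped to a single `parseEntry` call, which matches the observation that only the size of the largest entry matters, not the size of the archive.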
02bd85e to fc6c690
…ive buffering Note in MultiLineXmlDataSource.readArchive's Scaladoc that buffering one whole entry in memory under the optimized parser is an intended trade-off -- the optimized parser needs a re-readable input, so a single very large XML document packed in an archive is materialized in full, while a non-archive read streams from and re-opens the file. Entries are still read one at a time, so archive size itself stays bounded.
cloud-fan left a comment
1 addressed, 1 remaining, 2 new. (2 new = 0 newly introduced, 2 late catches.)
0 blocking, 2 non-blocking, 0 nits.
Faithful port; the prior round's optimized-buffering note was addressed. Two optional follow-ups.
Design / architecture (1)
- XmlDataSource.scala:68: `readArchive` is overridden whole per mode, duplicating the `readEntries` wrapper the JSON peer centralizes behind an abstract `readStream` (see inline)
Suggestions (1)
- XMLArchiveReadBase.scala:116: no malformed-record test for the archive read paths; the JSON peer has one — see inline
Verification
Re-traced the five new consumers of ArchiveReader.readEntries against the single shared TarArchiveInputStream cursor (the entry stream is valid only until the iterator advances): both multi-line reads (legacy lazy-streams the entry; optimized drains via readAllBytes then parses off an independent ByteArrayInputStream), both single-line paths (each line copied into a new String off the reused Text buffer), and the tokenizing inference (tokenizeStream materializes each record as a String) fully consume or materialize each entry before the cursor advances — no use-after-advance, no buffer aliasing. Unchanged this round.
     * each entry's bytes are parsed exactly like a standalone XML file. Single-line and multi-line
     * parse an entry's bytes differently (mirroring [[readFile]]), so each data source overrides it.
     *
     * Kept separate from [[readFile]] (rather than dispatched inside it) because only the V1
Non-blocking: the JSON archive support you're porting keeps readArchive concrete in the base — it calls an abstract readStream(in, parser(), schema) that each mode overrides, so the ArchiveReader.readEntries wiring lives in one place. Here readArchive is left abstract and both modes (TextInputXmlDataSource :239, MultiLineXmlDataSource :332) re-implement the ArchiveReader(file.toPath).readEntries(conf) { ... } wrapper, differing only in the per-entry body.
Mirroring JSON — a concrete base readArchive over an abstract readStream (single-line readStream = lines + FailureSafeParser; multi-line readStream = the legacy/optimized branch) — would centralize that wiring and match the peer. Behavior is identical either way, so this is purely a maintainability/consistency call.
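The suggested shape is a template method, which can be sketched without any Spark types. Everything below is a hypothetical stand-in: `Archive` replaces the real `ArchiveReader`, records are plain strings, and the two modes are reduced to "one record per line" and "one record per entry".

```scala
import java.io.{ByteArrayInputStream, InputStream}
import java.nio.charset.StandardCharsets.UTF_8

object ReadStreamDemo {
  // Hypothetical stand-in for an archive: entry name -> entry bytes.
  type Archive = Seq[(String, Array[Byte])]

  abstract class DataSourceSketch {
    // Concrete in the base: the only place that walks archive entries.
    final def readArchive(archive: Archive): Seq[String] =
      archive.flatMap { case (_, bytes) => readStream(new ByteArrayInputStream(bytes)) }

    // Each mode supplies only the per-entry body.
    protected def readStream(in: InputStream): Seq[String]
  }

  // Single-line mode: every non-empty line of the entry is a record.
  object SingleLine extends DataSourceSketch {
    protected def readStream(in: InputStream): Seq[String] =
      new String(in.readAllBytes(), UTF_8).split("\n").toSeq.filter(_.nonEmpty)
  }

  // Multi-line mode: the whole entry is one document.
  object MultiLine extends DataSourceSketch {
    protected def readStream(in: InputStream): Seq[String] =
      Seq(new String(in.readAllBytes(), UTF_8))
  }
}
```

Because `readArchive` is `final` in the base, a future change to how entries are iterated touches one method, and the modes cannot drift apart in their wiring.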
        }
      }

      test("XML: single-line mode reads and infers an archive like a directory") {
Non-blocking: no test exercises a malformed XML record through the archive read paths. The single-line readArchive override wires its own FailureSafeParser specifically for per-record corrupt-record handling, and the multi-line path buffers bytes to echo the corrupt record — but the shared ArchiveReadSuiteBase only covers corrupt archive files (file-granular), not corrupt records.
The JSON peer has a direct analogue (JSONArchiveReadBase: "a malformed record in an archive entry matches a directory read (both modes)"). Consider adding the XML equivalent: a malformed single-line record and a malformed whole document in multiLine, each asserted against a directory read with a _corrupt_record column.
…r an abstract readStream Mirror the JSON peer: keep readArchive concrete in the base XmlDataSource (holding the single ArchiveReader.readEntries wiring) and add an abstract readStream that each mode overrides -- single-line splits the entry into lines through a FailureSafeParser, multi-line parses the whole entry. Behavior is unchanged.
…hive read paths Mirror JSONArchiveReadBase's malformed-record test: assert that a corrupt single-line record and a corrupt whole multi-line document in an archive entry produce the same _corrupt_record output as a directory read of the same files, covering the FailureSafeParser wiring in the single-line readStream and the byte-buffering in the multi-line path.
### What changes were proposed in this pull request?

SPARK-57135 added reading CSV files packed in tar archives (`.tar`/`.tar.gz`/`.tgz`), SPARK-57321 added CSV schema inference, SPARK-57419 extended both to JSON, and SPARK-57478 to text, all gated by `spark.sql.files.archive.reader.enabled`. This extends the same capability to the XML data source.

When the flag is enabled, the V1 XML data source reads a tar archive as if it were a directory of its entries: each entry is streamed through `ArchiveReader` (never unpacked to disk) and parsed exactly like a standalone XML file. `readArchive` is overridden per data source to mirror `readFile`: single-line entries are split into lines and run through a `FailureSafeParser` (so they get the same per-record corrupt-record handling as a non-archive read), while multi-line entries are parsed as whole documents. Schema inference reads every archive entry together with any loose files in a single `XmlInferSchema` pass (`inferWithArchives`), so the inferred schema matches a directory read of the same files. The whole archive is one non-splittable unit (`XmlFileFormat.isSplitable` returns false), and a corrupt/missing archive is skipped as a unit under `ignoreCorruptFiles`/`ignoreMissingFiles`. XML has no DSv2 reader, so the archive scan is V1-only and no `Table` change is needed.

This also adjusts the `readArchive` entry point (JSON and XML) to take a parser factory and build a fresh parser for each archive entry -- matching the per-file parser of a non-archive read -- rather than sharing one parser across all entries.

### Why are the changes needed?

To let XML ingestion read tar archives without unpacking them to disk, matching the CSV, JSON, and text behavior already in Spark.

### Does this PR introduce _any_ user-facing change?

Yes. With `spark.sql.files.archive.reader.enabled=true` (default false), the XML data source can read and infer schemas from `.tar`/`.tar.gz`/`.tgz` files.

### How was this patch tested?

New `XMLTarArchiveReadSuite` (mixing `XMLArchiveReadBase` with the shared `ArchiveReadSuiteBase` and `TarArchiveReadBase`), exercising the shared archive read/inference/complex-type tests plus XML-specific tests: multi-line records, attributes, and single-pass null-field widening against a loose file.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code