
[SPARK-56919][SQL] Move setupJob before materializeAdaptiveSparkPlan to prevent table path loss #56126

Open
shrirangmhalgi wants to merge 3 commits into apache:master from shrirangmhalgi:SPARK-56919-setupJob-before-materialize


@shrirangmhalgi shrirangmhalgi commented May 26, 2026

Contributor

What changes were proposed in this pull request?

  1. Move committer.setupJob(job) from inside writeAndCommit() to before materializeAdaptiveSparkPlan() in FileFormatWriter.write(), so the output path is recreated before anything can throw.
  2. Wrap the post-setupJob body in try { ... } catch { committer.abortJob(job); throw } so the staging dir is cleaned up on any failure (e.g., AQE shuffle stage failure in materializeAdaptiveSparkPlan).
  3. Remove writeAndCommit's inner try / catch + abortJob, since the outer catch now handles it. This avoids calling abortJob twice on write / commit failures.
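
The three steps above can be sketched as a toy control flow. This is a minimal sketch, not the actual FileFormatWriter code: RecordingCommitter, WriteFlowSketch, and the materialize / runTasks parameters are hypothetical stand-ins that only record which committer calls happen.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for the commit protocol; it only records the calls it receives.
final class RecordingCommitter {
  val calls: ArrayBuffer[String] = ArrayBuffer.empty[String]
  def setupJob(): Unit = { calls += "setupJob" }
  def commitJob(): Unit = { calls += "commitJob" }
  def abortJob(): Unit = { calls += "abortJob" }
}

object WriteFlowSketch {
  // Mirrors the proposed ordering: setupJob first, then everything else under one try/catch.
  def write(committer: RecordingCommitter)(materialize: () => Unit, runTasks: () => Unit): Unit = {
    committer.setupJob() // outside the try: a failure here does not trigger abortJob
    try {
      materialize()      // stand-in for materializeAdaptiveSparkPlan
      runTasks()         // stand-in for the write tasks run by writeAndCommit
      committer.commitJob()
    } catch {
      case cause: Throwable =>
        committer.abortJob() // the only abort site for post-setup failures
        throw cause
    }
  }
}
```

With a materialize function that throws, the recorded calls are setupJob followed by a single abortJob; with no failure they are setupJob followed by commitJob.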

Why are the changes needed?

INSERT OVERWRITE deletes the output path before calling write(). When materializeAdaptiveSparkPlan throws (AQE shuffle stage failure), writeAndCommit is never reached, so setupJob never recreates the path. The table path is permanently lost. The outer try / catch ensures abortJob cleans up the staging dir (_temporary / .spark-staging-*) on any failure after setupJob.
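
The failure mode can be reproduced in miniature on a local temp directory. The following is a hedged sketch under simplifying assumptions: PathLossSketch and its helpers are hypothetical, setupJob is reduced to "recreate the output directory", and the AQE failure is a plain exception.

```scala
import java.nio.file.{Files, Path}

object PathLossSketch {
  // Stand-in for setupJob: recreates the output directory.
  private def setupJob(out: Path): Unit = { Files.createDirectories(out) }

  // Stand-in for materializeAdaptiveSparkPlan failing in a shuffle stage.
  private def failingMaterialize(): Unit = throw new RuntimeException("shuffle stage failed")

  /** Returns whether the output path still exists after a failed overwrite. */
  def pathSurvives(setupFirst: Boolean): Boolean = {
    val out = Files.createTempDirectory("table")
    Files.delete(out) // INSERT OVERWRITE removes the path before write() starts
    try {
      if (setupFirst) setupJob(out)  // new position: runs before anything can throw
      failingMaterialize()
      if (!setupFirst) setupJob(out) // old position: never reached on failure
    } catch {
      case _: RuntimeException => () // the job fails either way
    }
    val exists = Files.exists(out)
    if (exists) Files.delete(out)    // clean up after the sketch
    exists
  }
}
```

Calling pathSurvives(setupFirst = false) models the old ordering and pathSurvives(setupFirst = true) models the reordered one.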

Does this PR introduce any user-facing change?

Yes. Previously, a failed INSERT OVERWRITE with AQE could permanently delete the table path. Now the path survives the failure.

How was this patch tested?

Added regression test in InsertSuite that uses a failing UDF in a shuffle stage to trigger AQE failure during materializeAdaptiveSparkPlan. Verifies the table path exists after the failed overwrite.

Was this patch authored or co-authored using generative AI tooling?

Yes. Authored using Claude Opus 4.6.

@shrirangmhalgi shrirangmhalgi marked this pull request as ready for review May 26, 2026 18:25
@shrirangmhalgi

Contributor Author

@dongjoon-hyun / @gengliangwang. Could you please review? This is a one-line fix - moves setupJob before materializeAdaptiveSparkPlan to prevent permanent table path loss when AQE fails during INSERT OVERWRITE.

@shrirangmhalgi

Contributor Author

Gentle ping @cloud-fan @gengliangwang - this is a one-line fix for a data loss bug: INSERT OVERWRITE with AQE failure permanently deletes the table path because setupJob is never reached.

@yadavay-amzn yadavay-amzn left a comment

Contributor

Comments (no approval)

Assessment: The fix correctly identifies a real data-loss bug: INSERT OVERWRITE deletes the table path before calling FileFormatWriter.write(), and if materializeAdaptiveSparkPlan throws (AQE shuffle failure), setupJob was never reached, so the output directory was never recreated — permanent path loss.

Moving setupJob before materializeAdaptiveSparkPlan ensures the directory is recreated before anything can throw. The test is well-constructed and exercises the actual runtime failure path (not just plan shape).

Blocking concern:

  • Staging directory leak on AQE failure: With the new ordering, if materializeAdaptiveSparkPlan throws after setupJob but before writeAndCommit, neither commitJob nor abortJob is called. setupJob (via Hadoop's FileOutputCommitter) creates a staging directory at outputPath/.spark-staging-{jobId}/_temporary/, and this directory will be leaked. In the old code, setupJob was intentionally placed outside the try/catch block in writeAndCommit, but once setupJob succeeded, execution always entered writeAndCommit's try block, so abortJob would fire on any subsequent failure. Now there's a gap. Consider wrapping the post-setupJob section of write() in a try/catch that calls committer.abortJob(job) on failure, e.g.:

    committer.setupJob(job)
    try {
      // materializeAdaptiveSparkPlan + the rest of write()
      ...
    } catch { case cause: Throwable =>
      committer.abortJob(job)
      throw cause
    }

    Or at minimum, document that the staging dir may leak in this failure window and explain why that's acceptable (it arguably is — temp dirs are ephemeral and the data-loss fix is more important).

Non-blocking:

  • nit: The removed comment ("This call shouldn't be put into the try block below because it only initializes and prepares the job, any exception thrown from here shouldn't cause abortJob() to be called") was historically accurate. Now that setupJob is in a different location entirely, the deletion of that comment is correct, but the new comment could mention why it's safe to not have abortJob protection here (staging dir is acceptable to leak vs. losing the table path).

  • nit: The test uses "spark.sql.optimizer.plannedWrite.enabled" as a string literal. Consider using SQLConf.PLANNED_WRITE_ENABLED.key for consistency with the other config references.

  • The fix also affects FileStreamSink (another caller of FileFormatWriter.write). This should be benign since streaming sinks don't do the delete-before-write pattern, but noting for awareness.

What I verified

  1. Code-traced the full write path: InsertIntoHadoopFsRelationCommand.run() → deleteMatchingPartitions (deletes table path) → FileFormatWriter.write() → (old: materialize → writeAndCommit → setupJob; new: setupJob → materialize → writeAndCommit). Confirmed the ordering change is the minimal fix.

  2. Verified setupJob semantics: HadoopMapReduceCommitProtocol.setupJob creates job/task IDs in config, instantiates the OutputCommitter, and calls Hadoop's committer.setupJob which creates the output directory. Nothing between the new setupJob position and writeAndCommit depends on config set by setupJob, and writeJobUUID (set after setupJob) is not consumed by setupJob.

  3. Verified no double-call: setupJob is removed from writeAndCommit and added once in write(), covering both the planned-write and unplanned-write paths via the two executeWrite overloads.

  4. Verified test exercises real bug path: plannedWrite.enabled=false ensures materializeAdaptiveSparkPlan is called; repartition(2) forces shuffle; fail_udf throws during shuffle execution inside AdaptiveSparkPlanExec.finalPhysicalPlan; the assertion checks the actual filesystem path existence. This is a proper runtime regression test, not just a plan-shape check. Rating: ADEQUATE — exercises the exact failure mode.

  5. Did not run the test locally — assessed via code trace only.

@cloud-fan cloud-fan left a comment

Contributor

Review summary: correct, minimal fix for a real data-loss bug — 0 blocking, 1 non-blocking, 2 nits.

Design / architecture (non-blocking)

  • FileFormatWriter.write now calls setupJob outside writeAndCommit's try/catch, so if materializeAdaptiveSparkPlan throws, neither commitJob nor abortJob runs and the committer staging dir (_temporary / .spark-staging-{jobId}) is leaked. This is the concern @yadavay-amzn already raised as blocking — I'd rate it non-blocking: the leaked dirs are _/.-prefixed (filtered from FileIndex reads) and self-heal on the next overwrite's deleteMatchingPartitions, and preventing the table-path loss is the priority. The clean form is to wrap materialize + the rest of write() in try { ... } catch { case c: Throwable => committer.abortJob(job); throw c }; I confirmed HadoopMapReduceCommitProtocol.abortJob deletes only _temporary/stagingDir, not the output path, so this preserves the fix rather than re-introducing the bug.

Nits (both already noted by @yadavay-amzn):

  • Test: prefer SQLConf.PLANNED_WRITE_ENABLED.key over the "spark.sql.optimizer.plannedWrite.enabled" string literal.
  • The new comment could note why there's no abortJob protection at the new setupJob site (a leaked temp dir is acceptable vs. losing the table path).

Verification

Independently confirmed the reorder is safe:

  • setupJob does not read spark.sql.sources.writeJobUUID (set on the job config at line 174, after setupJob) — its jobId comes from the ctor arg / createJobID, so moving it earlier introduces no ordering dependency.
  • setupJob is still invoked exactly once: removed from writeAndCommit (the shared path for both executeWrite overloads) and added once in write().
  • The other caller, FileStreamSink, doesn't delete-before-write, so the earlier setupJob is benign there.

Since @yadavay-amzn's review already covers every point, I'm not adding duplicate inline comments.

@shrirangmhalgi shrirangmhalgi force-pushed the SPARK-56919-setupJob-before-materialize branch from 65ae865 to 92d3b62 Compare June 15, 2026 06:00
@shrirangmhalgi

Contributor Author

Thanks @yadavay-amzn and @cloud-fan for the reviews! Addressed all points in the latest commit:

  • Wrapped post-setupJob in try { ... } catch { committer.abortJob(job); throw } so staging dirs are cleaned up if materializeAdaptiveSparkPlan throws
  • Replaced string literal with SQLConf.PLANNED_WRITE_ENABLED.key
  • Added comment explaining why a leaked staging dir is acceptable (dot/underscore-prefixed, filtered from reads, self-heals on next overwrite)

@cloud-fan cloud-fan left a comment

Contributor

3 addressed, 0 remaining, 4 new. (4 = 4 newly introduced, 0 late catches.)

0 blocking, 3 non-blocking, 1 nit.
Correct, minimal fix for a real data-loss bug; the prior round's concerns are resolved. The new items are all consequences of the latest commit's additions - none blocks merge.

Design / architecture (1)

  • FileFormatWriter.scala:208: outer catch double-calls abortJob on write/commit failure (writeAndCommit already aborts) - see inline

Correctness (1)

  • FileFormatWriter.scala:153: "leaked staging dir is acceptable" comment is stale - the added abortJob cleans it up - see inline

Suggestions (1)

  • FileFormatWriter.scala:158: try body not re-indented (optional; diff-size tradeoff) - see inline

Nits: 1 minor item (see inline comments).

Verification

Confirmed the bug and fix by code trace: InsertIntoHadoopFsRelationCommand eagerly deletes the output path (deleteMatchingPartitions -> fs.delete, line 136) before FileFormatWriter.write; pre-fix, a materializeAdaptiveSparkPlan throw skipped setupJob, so the dir was never recreated. Moving setupJob earlier recreates it before materialize can throw, and setupJob doesn't read writeJobUUID (set later at line 178), so the reorder is safe. HadoopMapReduceCommitProtocol.abortJob deletes only _temporary/staging, not the output path, so the new catch preserves the fix.
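
The claim that aborting removes only the staging area can be illustrated with a local-filesystem toy. This is a sketch under stated assumptions, not HadoopMapReduceCommitProtocol itself: AbortScopeSketch, its setupJob / abortJob helpers, and the part-0 file name are hypothetical.

```scala
import java.nio.file.{Files, Path}

object AbortScopeSketch {
  // Hypothetical stand-in for setupJob: creates the output dir and a staging dir under it.
  def setupJob(out: Path): Path = Files.createDirectories(out.resolve("_temporary"))

  // Hypothetical stand-in for abortJob: removes the staging dir and its files only.
  def abortJob(out: Path): Unit = {
    val staging = out.resolve("_temporary").toFile
    Option(staging.listFiles()).foreach(_.foreach(_.delete()))
    staging.delete()
  }

  /** Returns (outputExists, stagingExists) after setup, a partial write, and an abort. */
  def run(): (Boolean, Boolean) = {
    val out = Files.createTempDirectory("table")
    val staging = setupJob(out)
    Files.createFile(staging.resolve("part-0")) // partial task output
    abortJob(out)
    val result = (Files.exists(out), Files.exists(staging))
    Files.delete(out) // clean up the temp dir used by this sketch
    result
  }
}
```

In this toy, the output directory outlives the abort while the staging directory does not, which is the property the new catch block relies on.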

PR description suggestions

  • Document: the try/catch + abortJob cleanup added in the latest commit. The "What changes were proposed" section still describes only the original one-line setupJob move; the abort-on-failure handling (and that abortJob now runs when materializeAdaptiveSparkPlan throws) is a substantive part of the change and is undocumented.

      }
    } catch {
      case cause: Throwable =>
        committer.abortJob(job)

Contributor

writeAndCommit already calls abortJob and rethrows on a write- or commit-failure, and both executeWrite calls sit inside this new try - so on those paths abortJob now runs twice (it was once before this PR). It's harmless for the built-in committers (their abortJob is idempotent), but it's a behavior change to the FileCommitProtocol.abortJob contract, and a non-idempotent custom committer could double-clean. The materialize-failure path you're fixing only hits this outer catch, so it's unaffected. Simplest single-abort form: drop writeAndCommit's now-redundant try/catch and move its logError here.
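
The double call is easy to see in a stripped-down model. A minimal sketch, assuming a hypothetical CountingCommitter whose abortJob is not idempotent (it just counts calls); nested models an inner catch inside an outer catch, and single models the suggested single-abort form.

```scala
object DoubleAbortSketch {
  // Hypothetical committer that counts abortJob calls instead of cleaning anything up.
  final class CountingCommitter {
    var aborts: Int = 0
    def abortJob(): Unit = { aborts += 1 }
  }

  // Stand-in for a write task or commit failing.
  private def failingTasks(): Unit = throw new RuntimeException("task failed")

  // Inner catch (as in writeAndCommit) plus the new outer catch: abortJob runs twice.
  def nested(c: CountingCommitter): Unit = {
    try {
      try {
        failingTasks()
      } catch {
        case e: Throwable => c.abortJob(); throw e
      }
    } catch {
      case e: Throwable => c.abortJob(); throw e
    }
  }

  // Only the outer catch aborts: abortJob runs once.
  def single(c: CountingCommitter): Unit = {
    try {
      failingTasks()
    } catch {
      case e: Throwable => c.abortJob(); throw e
    }
  }
}
```

After a failed nested call the counter is 2, and after a failed single call it is 1; both still rethrow the original exception.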

Comment on lines +153 to +155
// A leaked staging dir (_temporary / .spark-staging-*) is acceptable vs. losing the table
// path — these dirs are dot/underscore-prefixed (filtered from reads) and self-heal on the
// next overwrite.

Contributor

This rationale is now stale: the try/catch you added below calls abortJob on any failure after setupJob, and HadoopMapReduceCommitProtocol.abortJob deletes _temporary / .spark-staging-* - so the staging dir isn't actually leaked on the materialize-failure path. The only leak window is setupJob itself throwing (it's outside the try). The rewrite below also drops the non-ASCII em-dash that would otherwise fail scalastyle.

Suggested change
- // A leaked staging dir (_temporary / .spark-staging-*) is acceptable vs. losing the table
- // path — these dirs are dot/underscore-prefixed (filtered from reads) and self-heal on the
- // next overwrite.
+ // setupJob is outside the try below because it only initializes the job; the try/catch
+ // calls abortJob on any later failure (e.g. materialize throwing), which cleans up the
+ // staging dir (_temporary / .spark-staging-*).

// next overwrite.
committer.setupJob(job)

try {

Contributor

Optional: the body here isn't re-indented, so def materializeAdaptiveSparkPlan and the locals read as method-level rather than inside the try. Re-indenting ~47 lines bloats the diff, so leaving it is a defensible call - your judgment on the project norm.

…to prevent table path loss

INSERT OVERWRITE deletes the output path before calling FileFormatWriter.write(). If materializeAdaptiveSparkPlan throws (e.g., AQE shuffle stage failure), setupJob inside writeAndCommit is never reached, leaving the path permanently deleted.

Move committer.setupJob(job) to before the materializeAdaptiveSparkPlan call so the output path is recreated regardless of whether AQE succeeds.

Closes SPARK-56919
@shrirangmhalgi shrirangmhalgi force-pushed the SPARK-56919-setupJob-before-materialize branch from 92d3b62 to e4736d8 Compare June 18, 2026 15:48
@shrirangmhalgi

Contributor Author

Thanks @cloud-fan for the review! Addressed all items in the latest commit:

  • Single abortJob in outer catch only (removed writeAndCommit's inner try/catch to avoid double-call)
  • Updated stale comment per your suggestion
  • Moved logError to outer catch
  • Re-indented the try body for cleaner read
  • Updated PR description to document the abort-on-failure handling
