feat: release tokio runtime on driver/executor exit#4734
Conversation
b23f457 to
fb1f23c
Compare
mbutrovich
left a comment
There was a problem hiding this comment.
Thanks for tracking this down, the diagnosis is correct. The runtime was a
static, statics are never dropped, and the tokio workers stay attached to the
JVM as non-daemon threads for their whole lifetime, so the JVM cannot exit. The
Mutex<Option<Runtime>> + take() change is the right fix for that leak, and
the iceberg-rust side looks right too since iceberg only borrows our runtime
handle and never spawns threads of its own.
Two things I would like to see before merge. First, a note on why
shutdown_background() over a bounded shutdown_timeout, since the background
variant detaches the workers asynchronously after we return. Second, a short
comment on the double-release in local mode where both plugins share one runtime.
The one larger question is whether plugin shutdown() is enough on its own. It
only runs on a clean SparkContext.stop(), so a JVM that exits without stopping
the context would still hang. Attaching the workers as daemon threads would make
the fix robust to that case. That could be a follow-up if you would rather keep
this PR focused on the teardown path.
| /// Must not be called from within the runtime's own worker threads, otherwise the shutdown | ||
| /// would deadlock/panic. | ||
| pub fn release_runtime() { | ||
| let runtime = TOKIO_RUNTIME.lock().take(); |
There was a problem hiding this comment.
Nice catch on the root cause. The runtime lived in a static, and Rust never
drops statics, so the worker threads never exited. Moving to
Mutex<Option<Runtime>> + take() is what finally lets the runtime drop. That
is the real fix.
One question on shutdown_background(). It signals the workers and returns
without joining them, so the JVM-exit only unblocks asynchronously as each
worker runs its thread-local detach on the way out. Did you consider
shutdown_timeout(...) instead? It would make teardown deterministic, and the
plugin shutdown thread is not latency sensitive. Not blocking, just curious
about the rationale so we can capture it in the doc comment.
| } | ||
|
|
||
| override def shutdown(): Unit = { | ||
| logInfo("CometExecutorPlugin shutdown") |
There was a problem hiding this comment.
In local mode both plugins live in the same JVM and share the one native
runtime, so both shutdown() paths call NativeBase.release(). It works
because release_runtime() does take(), which makes the second call a no-op.
Could we add a short comment noting that the double release is expected and
safe? It is not obvious when reading either plugin on its own. The executor
plugin shuts down before the driver plugin in local mode, so the executor wins
the take().
| @@ -118,7 +117,7 @@ use std::sync::OnceLock; | |||
| #[cfg(feature = "jemalloc")] | |||
| use tikv_jemalloc_ctl::{epoch, stats}; | |||
|
|
|||
There was a problem hiding this comment.
docs/source/contributor-guide/development.md is now stale in two spots and
could be updated separately once this lands:
- Line 87 calls the runtime a
Lazy<Runtime>static. It isOnceLocktoday and
becomesMutex<Option<Runtime>>here, torn down on plugin shutdown. - Line 60 says the
AttachGuarddetaches the thread when dropped. The
attachment is actually cached in thread-local storage and only released when
the worker thread exits. That detail is exactly why the runtime has to be shut
down for the JVM to exit, so it is worth correcting.
Which issue does this PR close?
Closes #4725
Rationale for this change
The tokio runtime worker threads attach to non-daemon JVM threads, and these threads are not detached or shut down, which prevents the JVM from exiting.
What changes are included in this PR?
How are these changes tested?
The first is this commit , which ran successfully. The second is last commit of main branch, , which hung.
https://github.com/wForget/benchmarks-spark-native/actions/runs/28219472994/job/83600457105
https://github.com/wForget/benchmarks-spark-native/actions/runs/28220915819/job/83601883415