Skip to content

Kafka Connect: make sink cleanup robust#16843

Open
hkwi wants to merge 2 commits into
apache:mainfrom
hkwi:codex/iceberg-connect-cleanup
Open

Kafka Connect: make sink cleanup robust#16843
hkwi wants to merge 2 commits into
apache:mainfrom
hkwi:codex/iceberg-connect-cleanup

Conversation

@hkwi

@hkwi hkwi commented Jun 17, 2026

Copy link
Copy Markdown

Summary

This makes Iceberg Kafka Connect sink cleanup more robust when shutdown happens after an internal Kafka client or coordinator operation has already failed.

The change keeps the original failure visible to Kafka Connect, but it no longer lets that first failure skip the remaining cleanup steps. In particular:

  • close the sink catalog even when committer shutdown throws
  • close channel producer and consumer independently
  • wait for the coordinator thread to finish after termination
  • continue coordinator shutdown and offset seek after CommitterImpl.close(...) fails while checking leader ownership
  • remove an unused AdminClient from Channel, avoiding one extra internal Kafka client per worker/coordinator channel
  • close Worker channel resources and SinkWriter independently

Report

We observed a connector deletion path where the REST delete succeeded, but task shutdown then failed in CommitterImpl.close(...):

DELETE /connectors/<connector> HTTP/1.1 204
WorkerSinkTask{id=<connector>-0} After being scheduled for shutdown, task threw an uncaught exception
ConnectException: Cannot retrieve members for consumer group: <connector>-iceberg
  at KafkaUtils.consumerGroupDescription
  at CommitterImpl.hasLeaderPartition
  at CommitterImpl.close
  at IcebergSinkTask.close
Caused by: GroupAuthorizationException

After that, the connector was gone from the REST API, but internal Kafka clients using the connector-derived client id continued logging authentication/metadata errors. The practical failure mode is similar to the "zombie coordinator" class of bugs: a shutdown path hits an exception and leaves internal resources alive longer than intended.

Related Issues And PRs

This is related to, but not identical to, the existing Kafka Connect cleanup reports:

  • iceberg-kafka-connect zombie coordinator thread when writing to S3 fails #16016 reports a zombie coordinator thread after S3 write failures. The common point is that a sink task failure can leave coordinator-side resources operating after the task/catalog lifecycle has moved on. The difference is that this PR covers shutdown triggered by connector/task close, where leader detection fails during CommitterImpl.close(...).
  • Fix zombie coordinator thread when S3 writes fail #16020 proposes joining the coordinator thread after termination and closing the catalog in a finally block. This PR includes the same cleanup direction, and also continues cleanup across worker/channel close failures and the hasLeaderPartition(...) failure path.
  • Kafka Connect: Prevent zombie coordinator during rebalance #16156 addresses a rebalance overlap where the previous coordinator is not fully stopped before another coordinator can start. This PR also makes coordinator shutdown synchronous, but the motivating failure here is connector deletion/task close with authorization failure during leader lookup.
  • Kafka Connect: Handle no coordinator and data loss in ICR mode #12372 fixed no-coordinator/data-loss behavior around incremental cooperative rebalancing. That is why this PR does not silently swallow leader lookup failures: the original error is still propagated after cleanup, so real coordinator-election problems remain visible.
  • Improve coordinator election logging in Iceberg Kafka Connect Sink #12610 discusses better diagnostics when the configured Iceberg connect group does not match the actual Kafka Connect consumer group. This PR does not change that behavior; it only prevents cleanup from being skipped once such a lookup fails.

I did not find an existing Apache Iceberg issue that specifically reports the connector-delete + GroupAuthorizationException/Cannot retrieve members for consumer group shutdown path.

Tests

./gradlew :iceberg-kafka-connect:iceberg-kafka-connect:test \
  --tests 'org.apache.iceberg.connect.channel.TestCommitterImpl' \
  --tests 'org.apache.iceberg.connect.channel.TestCoordinatorThread' \
  --tests 'org.apache.iceberg.connect.channel.TestWorker'

git diff --check


// Normal close: if leader partition is lost, stop coordinator.
if (hasLeaderPartition(closedPartitions)) {
boolean stopCoordinator = false;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This local boolean stopCoordinator shares its name with the stopCoordinator() method called a few lines below, so the branch reads as if (stopCoordinator) { ... stopCoordinator(); }. Rename the flag to a predicate such as shouldStopCoordinator (or leaderPartitionLost) to separate the condition from the action it triggers.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for pointing this out. I renamed the local flag to shouldStopCoordinator in c852c17 so the predicate is clearly separated from the stopCoordinator() action.

}
}

private RuntimeException appendFailure(RuntimeException failure, RuntimeException next) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This appendFailure (set-as-primary if none yet, else addSuppressed) is the same merge logic hand-inlined in the catch blocks of Channel.stop(), Worker.stop(), and CoordinatorThread.terminate() (two branches). All four classes live in org.apache.iceberg.connect.channel; consider promoting one package-private static helper and calling it from each so the suppressed-exception handling stays in one place.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you, that makes sense. I kept the follow-up small and moved the merge logic into a package-private static helper on the existing Channel class in c852c17, then reused it from CommitterImpl, Channel, Worker, and CoordinatorThread. That keeps the suppression handling in one place without adding another helper class.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants