AssociatedNameStrategy support for the Odyssey initiative - CLI#3373
Open
Cynthia Qin (cqin-confluent) wants to merge 8 commits into
Open
AssociatedNameStrategy support for the Odyssey initiative - CLI#3373Cynthia Qin (cqin-confluent) wants to merge 8 commits into
Cynthia Qin (cqin-confluent) wants to merge 8 commits into
Conversation
|
🎉 All Contributor License Agreements have been signed. Ready to merge. |
There was a problem hiding this comment.
Pull request overview
This PR aims to add AssociatedNameStrategy support to the CLI’s Schema Registry serde flows by propagating a Kafka cluster ID into serializer/deserializer initialization and (for produce) resolving the SR subject via the associations API.
Changes:
- Extend serde provider interfaces to accept
kafkaClusterIdand thread it through produce/consume paths. - Configure JSON/Protobuf serializers & deserializers to use
serde.AssociatedNameStrategyTypewhen a Kafka cluster ID is available. - Add helpers to create an SR client and resolve a subject via SR associations (with fallback).
Reviewed changes
Copilot reviewed 13 out of 13 changed files in this pull request and generated 11 comments.
Show a summary per file
| File | Description |
|---|---|
| pkg/serdes/serdes.go | Updates provider interfaces; adds SR client helper and subject resolution via associations. |
| pkg/serdes/protobuf_serialization_provider.go | Enables AssociatedNameStrategy for Protobuf serialization when Kafka cluster ID is provided. |
| pkg/serdes/protobuf_deserialization_provider.go | Enables AssociatedNameStrategy for Protobuf deserialization when Kafka cluster ID is provided. |
| pkg/serdes/json_serialization_provider.go | Enables AssociatedNameStrategy for JSON Schema serialization when Kafka cluster ID is provided. |
| pkg/serdes/json_deserialization_provider.go | Enables AssociatedNameStrategy for JSON Schema deserialization when Kafka cluster ID is provided. |
| pkg/serdes/integer_serialization_provider.go | Adjusts InitSerializer signature to match updated interface. |
| pkg/serdes/integer_deserialization_provider.go | Adjusts InitDeserializer signature to match updated interface. |
| pkg/serdes/double_serialization_provider.go | Adjusts InitSerializer signature to match updated interface. |
| pkg/serdes/double_deserialization_provider.go | Adjusts InitDeserializer signature to match updated interface. |
| internal/kafka/confluent_kafka.go | Threads Kafka cluster ID into deserializer initialization during consume. |
| internal/kafka/command_topic_produce.go | Passes Kafka cluster ID to serializers and resolves subject via associations for registration/lookup. |
| internal/kafka/command_topic_consume.go | Captures Kafka cluster ID in the consume handler for downstream deserializer initialization. |
| internal/asyncapi/command_export.go | Updates deserializer init callsite for the new signature (passes empty Kafka cluster ID). |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Comment on lines
70
to
72
| type SerializationProvider interface { | ||
| InitSerializer(srClientUrl, srClusterId, mode string, schemaId int, srAuth SchemaRegistryAuth) error | ||
| InitSerializer(srClientUrl, srClusterId, kafkaClusterId, mode string, schemaId int, srAuth SchemaRegistryAuth) error | ||
| LoadSchema(string, map[string]string) error |
Comment on lines
79
to
81
| type DeserializationProvider interface { | ||
| InitDeserializer(srClientUrl, srClusterId, mode string, srAuth SchemaRegistryAuth, existingClient schemaregistry.Client) error | ||
| InitDeserializer(srClientUrl, srClusterId, kafkaClusterId, mode string, srAuth SchemaRegistryAuth, existingClient schemaregistry.Client) error | ||
| LoadSchema(string, string, serde.Type, *kafka.Message) error |
Comment on lines
+168
to
+170
| // returns the SR subject for (topic, mode) by querying the associations API with the Kafka cluster id | ||
| // as resource namespace. Falls backt o default TopicNameStrategy (<topic>-<mode>) if unmatched. | ||
| func ResolveSubject(client schemaregistry.Client, kafkaClusterId, topic, mode string) string { |
Comment on lines
+175
to
+177
| associations, err := client.GetAssociationsByResourceName(topic, kafkaClusterId, "topic", []string{mode}, "", 0, -1) | ||
| if err != nil || len(associations) == 0 { | ||
| return fallback |
Comment on lines
+524
to
+529
| // Resolve subject via SR associations, fall back to TopicNameStrategy on miss. | ||
| subject := topicNameStrategy(topic, mode) | ||
| if kafkaClusterId != "" && srEndpoint != "" { | ||
| if client, err := serdes.NewSchemaRegistryClient(srEndpoint, srClusterId, srAuth); err == nil { | ||
| subject = serdes.ResolveSubject(client, kafkaClusterId, topic, mode) | ||
| } |
Comment on lines
79
to
81
| type DeserializationProvider interface { | ||
| InitDeserializer(srClientUrl, srClusterId, mode string, srAuth SchemaRegistryAuth, existingClient schemaregistry.Client) error | ||
| InitDeserializer(srClientUrl, srClusterId, kafkaClusterId, mode string, srAuth SchemaRegistryAuth, existingClient schemaregistry.Client) error | ||
| LoadSchema(string, string, serde.Type, *kafka.Message) error |
Comment on lines
+168
to
+170
| // returns the SR subject for (topic, mode) by querying the associations API with the Kafka cluster id | ||
| // as resource namespace. Falls backt o default TopicNameStrategy (<topic>-<mode>) if unmatched. | ||
| func ResolveSubject(client schemaregistry.Client, kafkaClusterId, topic, mode string) string { |
Comment on lines
+175
to
+177
| associations, err := client.GetAssociationsByResourceName(topic, kafkaClusterId, "topic", []string{mode}, "", 0, -1) | ||
| if err != nil || len(associations) == 0 { | ||
| return fallback |
Comment on lines
+524
to
+529
| // Resolve subject via SR associations, fall back to TopicNameStrategy on miss. | ||
| subject := topicNameStrategy(topic, mode) | ||
| if kafkaClusterId != "" && srEndpoint != "" { | ||
| if client, err := serdes.NewSchemaRegistryClient(srEndpoint, srClusterId, srAuth); err == nil { | ||
| subject = serdes.ResolveSubject(client, kafkaClusterId, topic, mode) | ||
| } |
Comment on lines
+170
to
+176
| func ResolveSubject(client schemaregistry.Client, kafkaClusterId, topic, mode string) string { | ||
| fallback := topic + "-" + mode | ||
| if kafkaClusterId == "" || client == nil { | ||
| return fallback | ||
| } | ||
| associations, err := client.GetAssociationsByResourceName(topic, kafkaClusterId, "topic", []string{mode}, "", 0, -1) | ||
| if err != nil || len(associations) == 0 { |
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.


Release Notes
Breaking Changes
New Features
Bug Fixes
Checklist
Whatsection below whether this PR applies to Confluent Cloud, Confluent Platform, or both.Test & Reviewsection below.Blast Radiussection below.What
Blast Radius
References
Test & Review