From dd2a997ff0454351578773a9265eea5d9809787e Mon Sep 17 00:00:00 2001 From: Yoni Romm Date: Wed, 3 Jun 2026 10:41:02 +0300 Subject: [PATCH] feat: support multi secret namespace watch --- injection/context.go | 30 +- injection/context_test.go | 34 ++ injection/multinamespace/factory.go | 226 +++++++++++ injection/multinamespace/factory_test.go | 483 +++++++++++++++++++++++ injection/multinamespace/informer.go | 375 ++++++++++++++++++ injection/multinamespace/inject.go | 70 ++++ 6 files changed, 1217 insertions(+), 1 deletion(-) create mode 100644 injection/multinamespace/factory.go create mode 100644 injection/multinamespace/factory_test.go create mode 100644 injection/multinamespace/informer.go create mode 100644 injection/multinamespace/inject.go diff --git a/injection/context.go b/injection/context.go index af8bf22a9d..1ed4c35727 100644 --- a/injection/context.go +++ b/injection/context.go @@ -22,10 +22,14 @@ import ( "k8s.io/client-go/rest" ) -// nsKey is the key that namespaces are associated with on +// nsKey is the key that a single namespace is associated with on // contexts returned by WithNamespaceScope. type nsKey struct{} +// nsScopesKey is the key that multiple namespaces are associated with on +// contexts returned by WithNamespaceScopes. +type nsScopesKey struct{} + // WithNamespaceScope associates a namespace scoping with the // provided context, which will scope the informers produced // by the downstream informer factories. @@ -50,6 +54,30 @@ func GetNamespaceScope(ctx context.Context) string { return value.(string) } +// WithNamespaceScopes associates a multi-namespace allowlist with the +// provided context. When non-empty, opt-in informer factory overrides may +// scope specific informer types (for example secrets) to these namespaces. +func WithNamespaceScopes(ctx context.Context, namespaces ...string) context.Context { + if len(namespaces) == 0 { + return ctx + } + return context.WithValue(ctx, nsScopesKey{}, namespaces) +} + +// GetNamespaceScopes returns the multi-namespace allowlist from +// WithNamespaceScopes. When unset, it falls back to a one-element slice from +// GetNamespaceScope when a single namespace scope is present; otherwise nil. +func GetNamespaceScopes(ctx context.Context) []string { + value := ctx.Value(nsScopesKey{}) + if value != nil { + return value.([]string) + } + if ns := GetNamespaceScope(ctx); ns != "" { + return []string{ns} + } + return nil +} + // cfgKey is the key that the config is associated with. type cfgKey struct{} diff --git a/injection/context_test.go b/injection/context_test.go index 8106090cc6..8052208ffe 100644 --- a/injection/context_test.go +++ b/injection/context_test.go @@ -42,6 +42,40 @@ func TestContextNamespace(t *testing.T) { } } +func TestContextNamespaceScopes(t *testing.T) { + ctx := context.Background() + + if got := GetNamespaceScopes(ctx); got != nil { + t.Errorf("GetNamespaceScopes() = %v, wanted nil", got) + } + + want := []string{"ns-a", "ns-b"} + ctx = WithNamespaceScopes(ctx, want...) + + if got := GetNamespaceScopes(ctx); len(got) != len(want) || got[0] != want[0] || got[1] != want[1] { + t.Errorf("GetNamespaceScopes() = %v, wanted %v", got, want) + } +} + +func TestContextNamespaceScopesFallsBackToSingleScope(t *testing.T) { + ctx := WithNamespaceScope(context.Background(), "only-ns") + + got := GetNamespaceScopes(ctx) + if len(got) != 1 || got[0] != "only-ns" { + t.Errorf("GetNamespaceScopes() = %v, wanted [only-ns]", got) + } +} + +func TestContextNamespaceScopesPreferMultiOverSingle(t *testing.T) { + ctx := WithNamespaceScope(context.Background(), "single-ns") + ctx = WithNamespaceScopes(ctx, "multi-a", "multi-b") + + got := GetNamespaceScopes(ctx) + if len(got) != 2 || got[0] != "multi-a" || got[1] != "multi-b" { + t.Errorf("GetNamespaceScopes() = %v, wanted [multi-a multi-b]", got) + } +} + func TestContextConfig(t *testing.T) { ctx := context.Background() diff --git a/injection/multinamespace/factory.go b/injection/multinamespace/factory.go new file mode 100644 index 0000000000..a46b3049f1 --- /dev/null +++ b/injection/multinamespace/factory.go @@ -0,0 +1,226 @@ +/* +Copyright 2025 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package multinamespace + +import ( + "reflect" + "sync" + "time" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/informers" + admissionregistration "k8s.io/client-go/informers/admissionregistration" + apiserverinternal "k8s.io/client-go/informers/apiserverinternal" + apps "k8s.io/client-go/informers/apps" + autoscaling "k8s.io/client-go/informers/autoscaling" + batch "k8s.io/client-go/informers/batch" + certificates "k8s.io/client-go/informers/certificates" + coordination "k8s.io/client-go/informers/coordination" + core "k8s.io/client-go/informers/core" + discovery "k8s.io/client-go/informers/discovery" + events "k8s.io/client-go/informers/events" + extensions "k8s.io/client-go/informers/extensions" + flowcontrol "k8s.io/client-go/informers/flowcontrol" + internalinterfaces "k8s.io/client-go/informers/internalinterfaces" + networking "k8s.io/client-go/informers/networking" + node "k8s.io/client-go/informers/node" + policy "k8s.io/client-go/informers/policy" + rbac "k8s.io/client-go/informers/rbac" + resource "k8s.io/client-go/informers/resource" + scheduling "k8s.io/client-go/informers/scheduling" + storage "k8s.io/client-go/informers/storage" + storagemigration "k8s.io/client-go/informers/storagemigration" + kubernetes "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" +) + +// scopedFactory is a self-contained SharedInformerFactory. It intercepts only +// the secret informer, replacing it with a merged view over per-namespace +// sub-factories, and delegates every other informer type to an internal default +// factory (cluster-wide, or single-namespace when defaultNamespace is set). +type scopedFactory struct { + defaultFactory informers.SharedInformerFactory + + namespaces []string + subFactories []informers.SharedInformerFactory + + mu sync.Mutex + cachedSecret cache.SharedIndexInformer +} + +// NewScopedFactory creates a scopedFactory that restricts the secret informer +// to the given namespaces. A separate SharedInformerFactory scoped to each +// namespace is created from client with the provided resync period. +// +// defaultNamespace, when non-empty, scopes the internal default factory used +// for non-secret types (matching injection.WithNamespaceScope). When empty, +// non-secret informers are cluster-wide. +func NewScopedFactory( + client kubernetes.Interface, + resync time.Duration, + namespaces []string, + defaultNamespace string, +) informers.SharedInformerFactory { + opts := make([]informers.SharedInformerOption, 0, 1) + if defaultNamespace != "" { + opts = append(opts, informers.WithNamespace(defaultNamespace)) + } + defaultFactory := informers.NewSharedInformerFactoryWithOptions(client, resync, opts...) + + subs := make([]informers.SharedInformerFactory, 0, len(namespaces)) + for _, ns := range namespaces { + subs = append(subs, informers.NewSharedInformerFactoryWithOptions( + client, resync, informers.WithNamespace(ns), + )) + } + return &scopedFactory{ + defaultFactory: defaultFactory, + namespaces: namespaces, + subFactories: subs, + } +} + +func (f *scopedFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer { + if _, isSecret := obj.(*corev1.Secret); isSecret { + f.mu.Lock() + defer f.mu.Unlock() + if f.cachedSecret != nil { + return f.cachedSecret + } + nsInformers := make([]cache.SharedIndexInformer, 0, len(f.subFactories)) + for _, sf := range f.subFactories { + nsInformers = append(nsInformers, sf.Core().V1().Secrets().Informer()) + } + f.cachedSecret = newMergedInformer(f.namespaces, nsInformers) + return f.cachedSecret + } + return f.defaultFactory.InformerFor(obj, newFunc) +} + +func (f *scopedFactory) Start(stopCh <-chan struct{}) { + f.defaultFactory.Start(stopCh) + for _, sf := range f.subFactories { + sf.Start(stopCh) + } +} + +func (f *scopedFactory) Shutdown() { + f.defaultFactory.Shutdown() + for _, sf := range f.subFactories { + sf.Shutdown() + } +} + +func (f *scopedFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool { + result := f.defaultFactory.WaitForCacheSync(stopCh) + for _, sf := range f.subFactories { + for k, v := range sf.WaitForCacheSync(stopCh) { + if existing, ok := result[k]; ok { + result[k] = existing && v + } else { + result[k] = v + } + } + } + return result +} + +func (f *scopedFactory) ForResource(gvr schema.GroupVersionResource) (informers.GenericInformer, error) { + return f.defaultFactory.ForResource(gvr) +} + +func (f *scopedFactory) Core() core.Interface { + return core.New(f, "", nil) +} + +func (f *scopedFactory) Admissionregistration() admissionregistration.Interface { + return f.defaultFactory.Admissionregistration() +} + +func (f *scopedFactory) Internal() apiserverinternal.Interface { + return f.defaultFactory.Internal() +} + +func (f *scopedFactory) Apps() apps.Interface { + return f.defaultFactory.Apps() +} + +func (f *scopedFactory) Autoscaling() autoscaling.Interface { + return f.defaultFactory.Autoscaling() +} + +func (f *scopedFactory) Batch() batch.Interface { + return f.defaultFactory.Batch() +} + +func (f *scopedFactory) Certificates() certificates.Interface { + return f.defaultFactory.Certificates() +} + +func (f *scopedFactory) Coordination() coordination.Interface { + return f.defaultFactory.Coordination() +} + +func (f *scopedFactory) Discovery() discovery.Interface { + return f.defaultFactory.Discovery() +} + +func (f *scopedFactory) Events() events.Interface { + return f.defaultFactory.Events() +} + +func (f *scopedFactory) Extensions() extensions.Interface { + return f.defaultFactory.Extensions() +} + +func (f *scopedFactory) Flowcontrol() flowcontrol.Interface { + return f.defaultFactory.Flowcontrol() +} + +func (f *scopedFactory) Networking() networking.Interface { + return f.defaultFactory.Networking() +} + +func (f *scopedFactory) Node() node.Interface { + return f.defaultFactory.Node() +} + +func (f *scopedFactory) Policy() policy.Interface { + return f.defaultFactory.Policy() +} + +func (f *scopedFactory) Rbac() rbac.Interface { + return f.defaultFactory.Rbac() +} + +func (f *scopedFactory) Resource() resource.Interface { + return f.defaultFactory.Resource() +} + +func (f *scopedFactory) Scheduling() scheduling.Interface { + return f.defaultFactory.Scheduling() +} + +func (f *scopedFactory) Storage() storage.Interface { + return f.defaultFactory.Storage() +} + +func (f *scopedFactory) Storagemigration() storagemigration.Interface { + return f.defaultFactory.Storagemigration() +} diff --git a/injection/multinamespace/factory_test.go b/injection/multinamespace/factory_test.go new file mode 100644 index 0000000000..718707f519 --- /dev/null +++ b/injection/multinamespace/factory_test.go @@ -0,0 +1,483 @@ +/* +Copyright 2025 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package multinamespace + +import ( + "context" + "reflect" + "testing" + "time" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/informers" + fakekube "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/cache" +) + +func TestInformerForSecretReturnsMergedInformer(t *testing.T) { + client := fakekube.NewSimpleClientset() + namespaces := []string{"ns-a", "ns-b"} + + factory := NewScopedFactory(client, 0, namespaces, "") + + inf1 := factory.InformerFor(&corev1.Secret{}, nil) + inf2 := factory.InformerFor(&corev1.Secret{}, nil) + + if inf1 == nil { + t.Fatal("InformerFor(Secret) returned nil") + } + if inf1 != inf2 { + t.Error("InformerFor(Secret) should return the same cached instance on repeated calls") + } +} + +func TestInformerForNonSecretDelegatesToDefaultFactory(t *testing.T) { + client := fakekube.NewSimpleClientset() + namespaces := []string{"ns-a"} + + factory := NewScopedFactory(client, 0, namespaces, "") + + inf := factory.Core().V1().ConfigMaps().Informer() + if inf == nil { + t.Fatal("Core().V1().ConfigMaps().Informer() returned nil") + } + + secretInf := factory.InformerFor(&corev1.Secret{}, nil) + if inf == secretInf { + t.Error("ConfigMap informer should not be the same as the secret informer") + } +} + +func TestWaitForCacheSyncIncludesSecretType(t *testing.T) { + client := fakekube.NewSimpleClientset() + namespaces := []string{"ns-a"} + + factory := NewScopedFactory(client, 0, namespaces, "") + + factory.InformerFor(&corev1.Secret{}, nil) + + stopCh := make(chan struct{}) + defer close(stopCh) + factory.Start(stopCh) + + result := factory.WaitForCacheSync(stopCh) + secretType := reflect.TypeOf(&corev1.Secret{}) + if _, ok := result[secretType]; !ok { + t.Errorf("WaitForCacheSync result missing *corev1.Secret key; got keys: %v", result) + } +} + +func TestNewScopedFactoryCreatesOneSubFactoryPerNamespace(t *testing.T) { + client := fakekube.NewSimpleClientset() + namespaces := []string{"ns-a", "ns-b", "ns-c"} + + f := NewScopedFactory(client, 0, namespaces, "").(*scopedFactory) + + if len(f.subFactories) != len(namespaces) { + t.Errorf("expected %d sub-factories, got %d", len(namespaces), len(f.subFactories)) + } + if !reflect.DeepEqual(f.namespaces, namespaces) { + t.Errorf("namespaces = %v, want %v", f.namespaces, namespaces) + } + if f.defaultFactory == nil { + t.Error("defaultFactory should be non-nil") + } +} + +func TestScopedFactoryShutdown(t *testing.T) { + client := fakekube.NewSimpleClientset() + factory := NewScopedFactory(client, 0, []string{"ns-a"}, "") + + stopCh := make(chan struct{}) + factory.Start(stopCh) + close(stopCh) + factory.Shutdown() +} + +func TestScopedFactoryResync(t *testing.T) { + client := fakekube.NewSimpleClientset() + resync := 30 * time.Second + + f := NewScopedFactory(client, resync, []string{"ns-a"}, "").(*scopedFactory) + + if len(f.subFactories) != 1 { + t.Fatalf("expected 1 sub-factory, got %d", len(f.subFactories)) + } +} + +func makeSecretInIndexer(t *testing.T, namespace, name string) cache.Indexer { + t.Helper() + idx := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{ + cache.NamespaceIndex: cache.MetaNamespaceIndexFunc, + }) + secret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{Namespace: namespace, Name: name}, + } + if err := idx.Add(secret); err != nil { + t.Fatalf("failed to add secret to indexer: %v", err) + } + return idx +} + +func TestMergedIndexerList(t *testing.T) { + idxA := makeSecretInIndexer(t, "ns-a", "secret-a") + idxB := makeSecretInIndexer(t, "ns-b", "secret-b") + + m := &mergedIndexer{ + subs: []cache.Indexer{idxA, idxB}, + byNamespace: map[string]cache.Indexer{"ns-a": idxA, "ns-b": idxB}, + } + + items := m.List() + if len(items) != 2 { + t.Errorf("List() returned %d items, want 2", len(items)) + } +} + +func TestMergedIndexerGetByKey(t *testing.T) { + idxA := makeSecretInIndexer(t, "ns-a", "secret-a") + idxB := makeSecretInIndexer(t, "ns-b", "secret-b") + + m := &mergedIndexer{ + subs: []cache.Indexer{idxA, idxB}, + byNamespace: map[string]cache.Indexer{"ns-a": idxA, "ns-b": idxB}, + } + + tests := []struct { + key string + wantFound bool + }{ + {"ns-a/secret-a", true}, + {"ns-b/secret-b", true}, + {"ns-a/secret-b", false}, + {"ns-c/secret-x", false}, + } + + for _, tt := range tests { + t.Run(tt.key, func(t *testing.T) { + obj, found, err := m.GetByKey(tt.key) + if err != nil { + t.Fatalf("GetByKey(%q) error: %v", tt.key, err) + } + if found != tt.wantFound { + t.Errorf("GetByKey(%q) found=%v, want %v", tt.key, found, tt.wantFound) + } + if tt.wantFound && obj == nil { + t.Errorf("GetByKey(%q) returned nil object", tt.key) + } + }) + } +} + +func TestMergedIndexerGet(t *testing.T) { + idxA := makeSecretInIndexer(t, "ns-a", "secret-a") + idxB := makeSecretInIndexer(t, "ns-b", "secret-b") + + m := &mergedIndexer{ + subs: []cache.Indexer{idxA, idxB}, + byNamespace: map[string]cache.Indexer{"ns-a": idxA, "ns-b": idxB}, + } + + secretA := &corev1.Secret{ObjectMeta: metav1.ObjectMeta{Namespace: "ns-a", Name: "secret-a"}} + _, found, err := m.Get(secretA) + if err != nil || !found { + t.Errorf("Get(secret-a) found=%v, err=%v; want found=true, err=nil", found, err) + } + + missing := &corev1.Secret{ObjectMeta: metav1.ObjectMeta{Namespace: "ns-a", Name: "missing"}} + _, found, err = m.Get(missing) + if err != nil || found { + t.Errorf("Get(missing) found=%v, err=%v; want found=false, err=nil", found, err) + } +} + +func TestMergedIndexerListKeys(t *testing.T) { + idxA := makeSecretInIndexer(t, "ns-a", "secret-a") + idxB := makeSecretInIndexer(t, "ns-b", "secret-b") + + m := &mergedIndexer{ + subs: []cache.Indexer{idxA, idxB}, + byNamespace: map[string]cache.Indexer{"ns-a": idxA, "ns-b": idxB}, + } + + keys := m.ListKeys() + if len(keys) != 2 { + t.Errorf("ListKeys() returned %d keys, want 2: %v", len(keys), keys) + } +} + +func TestMergedIndexerByIndex(t *testing.T) { + idxA := makeSecretInIndexer(t, "ns-a", "secret-a") + idxB := makeSecretInIndexer(t, "ns-b", "secret-b") + + m := &mergedIndexer{ + subs: []cache.Indexer{idxA, idxB}, + byNamespace: map[string]cache.Indexer{"ns-a": idxA, "ns-b": idxB}, + } + + items, err := m.ByIndex(cache.NamespaceIndex, "ns-a") + if err != nil { + t.Fatalf("ByIndex error: %v", err) + } + if len(items) != 1 { + t.Errorf("ByIndex(ns-a) returned %d items, want 1", len(items)) + } +} + +func TestMergedIndexerWritesPanic(t *testing.T) { + m := &mergedIndexer{} + + assertPanics := func(name string, fn func()) { + t.Helper() + defer func() { + if r := recover(); r == nil { + t.Errorf("%s: expected panic but did not panic", name) + } + }() + fn() + } + + assertPanics("Add", func() { _ = m.Add(nil) }) + assertPanics("Update", func() { _ = m.Update(nil) }) + assertPanics("Delete", func() { _ = m.Delete(nil) }) + assertPanics("Replace", func() { _ = m.Replace(nil, "") }) +} + +type fakeSharedIndexInformer struct { + cache.SharedIndexInformer + synced bool + indexer cache.Indexer + handlers []cache.ResourceEventHandler +} + +func newFakeInformer(synced bool, idx cache.Indexer) *fakeSharedIndexInformer { + return &fakeSharedIndexInformer{synced: synced, indexer: idx} +} + +func (f *fakeSharedIndexInformer) HasSynced() bool { return f.synced } +func (f *fakeSharedIndexInformer) GetIndexer() cache.Indexer { return f.indexer } +func (f *fakeSharedIndexInformer) Run(_ <-chan struct{}) {} +func (f *fakeSharedIndexInformer) RunWithContext(_ context.Context) {} +func (f *fakeSharedIndexInformer) AddEventHandler(h cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) { + f.handlers = append(f.handlers, h) + return &fakeRegistration{}, nil +} +func (f *fakeSharedIndexInformer) AddEventHandlerWithResyncPeriod(h cache.ResourceEventHandler, _ time.Duration) (cache.ResourceEventHandlerRegistration, error) { + return f.AddEventHandler(h) +} +func (f *fakeSharedIndexInformer) RemoveEventHandler(_ cache.ResourceEventHandlerRegistration) error { + return nil +} +func (f *fakeSharedIndexInformer) AddIndexers(_ cache.Indexers) error { return nil } +func (f *fakeSharedIndexInformer) SetWatchErrorHandler(_ cache.WatchErrorHandler) error { return nil } +func (f *fakeSharedIndexInformer) IsStopped() bool { return false } +func (f *fakeSharedIndexInformer) LastSyncResourceVersion() string { return "" } + +type fakeRegistration struct{} + +func (r *fakeRegistration) HasSynced() bool { return true } + +func TestMergedInformerHasSyncedAllTrue(t *testing.T) { + inf := &mergedInformer{ + subs: []cache.SharedIndexInformer{ + newFakeInformer(true, nil), + newFakeInformer(true, nil), + }, + } + if !inf.HasSynced() { + t.Error("HasSynced() = false, want true when all sub-informers are synced") + } +} + +func TestMergedInformerHasSyncedOneFalse(t *testing.T) { + inf := &mergedInformer{ + subs: []cache.SharedIndexInformer{ + newFakeInformer(true, nil), + newFakeInformer(false, nil), + }, + } + if inf.HasSynced() { + t.Error("HasSynced() = true, want false when any sub-informer is not synced") + } +} + +func TestMergedInformerAddEventHandlerFansOut(t *testing.T) { + subA := newFakeInformer(true, nil) + subB := newFakeInformer(true, nil) + + inf := &mergedInformer{ + subs: []cache.SharedIndexInformer{subA, subB}, + } + + handler := cache.ResourceEventHandlerFuncs{} + reg, err := inf.AddEventHandler(handler) + if err != nil { + t.Fatalf("AddEventHandler error: %v", err) + } + if reg == nil { + t.Fatal("AddEventHandler returned nil registration") + } + if len(subA.handlers) != 1 || len(subB.handlers) != 1 { + t.Errorf("handlers not fanned out: subA=%d subB=%d", len(subA.handlers), len(subB.handlers)) + } +} + +func TestMergedInformerGetIndexer(t *testing.T) { + idxA := makeSecretInIndexer(t, "ns-a", "secret-a") + idxB := makeSecretInIndexer(t, "ns-b", "secret-b") + + inf := &mergedInformer{ + namespaces: []string{"ns-a", "ns-b"}, + subs: []cache.SharedIndexInformer{ + newFakeInformer(true, idxA), + newFakeInformer(true, idxB), + }, + } + + merged := inf.GetIndexer() + if merged == nil { + t.Fatal("GetIndexer() returned nil") + } + + items := merged.List() + if len(items) != 2 { + t.Errorf("GetIndexer().List() = %d items, want 2", len(items)) + } +} + +func TestMergedInformerCacheLookup(t *testing.T) { + idxA := makeSecretInIndexer(t, "ns-a", "auth-secret") + idxB := makeSecretInIndexer(t, "ns-b", "other-secret") + + inf := &mergedInformer{ + namespaces: []string{"ns-a", "ns-b"}, + subs: []cache.SharedIndexInformer{ + newFakeInformer(true, idxA), + newFakeInformer(true, idxB), + }, + } + + indexer := inf.GetIndexer() + + tests := []struct { + key string + wantFound bool + }{ + {"ns-a/auth-secret", true}, + {"ns-b/other-secret", true}, + {"ns-a/other-secret", false}, + {"ns-b/auth-secret", false}, + {"ns-c/auth-secret", false}, + } + + for _, tt := range tests { + t.Run(tt.key, func(t *testing.T) { + obj, found, err := indexer.GetByKey(tt.key) + if err != nil { + t.Fatalf("GetByKey(%q) unexpected error: %v", tt.key, err) + } + if found != tt.wantFound { + t.Errorf("GetByKey(%q): found=%v, want %v", tt.key, found, tt.wantFound) + } + if tt.wantFound && obj == nil { + t.Errorf("GetByKey(%q): found=true but obj is nil", tt.key) + } + }) + } +} + +func TestMergedInformerGetStore(t *testing.T) { + inf := &mergedInformer{ + namespaces: []string{"ns-a"}, + subs: []cache.SharedIndexInformer{ + newFakeInformer(true, makeSecretInIndexer(t, "ns-a", "s")), + }, + } + store := inf.GetStore() + if store == nil { + t.Fatal("GetStore() returned nil") + } +} + +func TestMergedInformerRemoveEventHandler(t *testing.T) { + subA := newFakeInformer(true, nil) + inf := &mergedInformer{subs: []cache.SharedIndexInformer{subA}} + + reg, _ := inf.AddEventHandler(cache.ResourceEventHandlerFuncs{}) + if err := inf.RemoveEventHandler(reg); err != nil { + t.Errorf("RemoveEventHandler error: %v", err) + } +} + +func TestNamespaceFromKey(t *testing.T) { + tests := []struct { + key string + want string + }{ + {"ns-a/secret-a", "ns-a"}, + {"secret-a", ""}, + {"", ""}, + {"ns/name/extra", "ns"}, + } + for _, tt := range tests { + t.Run(tt.key, func(t *testing.T) { + if got := namespaceFromKey(tt.key); got != tt.want { + t.Errorf("namespaceFromKey(%q) = %q, want %q", tt.key, got, tt.want) + } + }) + } +} + +func TestNamespaceOf(t *testing.T) { + secret := &corev1.Secret{ObjectMeta: metav1.ObjectMeta{Namespace: "ns-a", Name: "s"}} + if got := namespaceOf(secret); got != "ns-a" { + t.Errorf("namespaceOf(secret) = %q, want %q", got, "ns-a") + } + if got := namespaceOf(nil); got != "" { + t.Errorf("namespaceOf(nil) = %q, want empty", got) + } + if got := namespaceOf("not-an-object"); got != "" { + t.Errorf("namespaceOf(string) = %q, want empty", got) + } +} + +func TestSubForNamespace(t *testing.T) { + idxA := makeSecretInIndexer(t, "ns-a", "s") + m := &mergedIndexer{ + subs: []cache.Indexer{idxA}, + byNamespace: map[string]cache.Indexer{"ns-a": idxA}, + } + + if got := m.subForNamespace("ns-a"); got != idxA { + t.Error("subForNamespace(ns-a) did not return the expected indexer") + } + if got := m.subForNamespace("ns-z"); got != nil { + t.Error("subForNamespace(unknown) should return nil") + } + if got := m.subForNamespace(""); got != nil { + t.Error("subForNamespace('') should return nil") + } +} + +var _ cache.SharedIndexInformer = (*fakeSharedIndexInformer)(nil) +var _ informers.SharedInformerFactory = (*scopedFactory)(nil) +var _ cache.Indexer = (*mergedIndexer)(nil) +var _ cache.SharedIndexInformer = (*mergedInformer)(nil) +var _ cache.ResourceEventHandlerRegistration = (*fakeRegistration)(nil) +var _ runtime.Object = (*corev1.Secret)(nil) diff --git a/injection/multinamespace/informer.go b/injection/multinamespace/informer.go new file mode 100644 index 0000000000..838cc03539 --- /dev/null +++ b/injection/multinamespace/informer.go @@ -0,0 +1,375 @@ +/* +Copyright 2025 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package multinamespace + +import ( + "context" + "strings" + "time" + + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/client-go/tools/cache" +) + +// newMergedInformer returns a SharedIndexInformer that aggregates N +// per-namespace sub-informers into a single logical informer. +// +// namespaces and subInformers are parallel slices: subInformers[i] watches +// namespaces[i]. This association lets the merged indexer route per-namespace +// lookups to the owning sub-indexer in O(1). +// +// Event handlers are fanned out to all sub-informers. The merged indexer +// provides a unified read-only view: writes panic because the sub-informers +// own their own stores. +func newMergedInformer(namespaces []string, subInformers []cache.SharedIndexInformer) cache.SharedIndexInformer { + return &mergedInformer{namespaces: namespaces, subs: subInformers} +} + +// mergedInformer implements cache.SharedIndexInformer by delegating to N +// per-namespace sub-informers. namespaces[i] is the namespace watched by +// subs[i]. +type mergedInformer struct { + namespaces []string + subs []cache.SharedIndexInformer +} + +// multiRegistration fans out RemoveEventHandler to all N registrations. +type multiRegistration struct { + regs []cache.ResourceEventHandlerRegistration +} + +func (m *multiRegistration) HasSynced() bool { + for _, r := range m.regs { + if !r.HasSynced() { + return false + } + } + return true +} + +func (inf *mergedInformer) AddEventHandler(handler cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) { + regs := make([]cache.ResourceEventHandlerRegistration, 0, len(inf.subs)) + for _, si := range inf.subs { + reg, err := si.AddEventHandler(handler) + if err != nil { + return nil, err + } + regs = append(regs, reg) + } + return &multiRegistration{regs: regs}, nil +} + +func (inf *mergedInformer) AddEventHandlerWithResyncPeriod(handler cache.ResourceEventHandler, resyncPeriod time.Duration) (cache.ResourceEventHandlerRegistration, error) { + regs := make([]cache.ResourceEventHandlerRegistration, 0, len(inf.subs)) + for _, si := range inf.subs { + reg, err := si.AddEventHandlerWithResyncPeriod(handler, resyncPeriod) + if err != nil { + return nil, err + } + regs = append(regs, reg) + } + return &multiRegistration{regs: regs}, nil +} + +func (inf *mergedInformer) AddEventHandlerWithOptions(handler cache.ResourceEventHandler, opts cache.HandlerOptions) (cache.ResourceEventHandlerRegistration, error) { + regs := make([]cache.ResourceEventHandlerRegistration, 0, len(inf.subs)) + for _, si := range inf.subs { + reg, err := si.AddEventHandlerWithOptions(handler, opts) + if err != nil { + return nil, err + } + regs = append(regs, reg) + } + return &multiRegistration{regs: regs}, nil +} + +func (inf *mergedInformer) RemoveEventHandler(handle cache.ResourceEventHandlerRegistration) error { + mr, ok := handle.(*multiRegistration) + if !ok { + for _, si := range inf.subs { + _ = si.RemoveEventHandler(handle) + } + return nil + } + for i, si := range inf.subs { + if i < len(mr.regs) { + if err := si.RemoveEventHandler(mr.regs[i]); err != nil { + return err + } + } + } + return nil +} + +func (inf *mergedInformer) GetStore() cache.Store { + return inf.GetIndexer() +} + +func (inf *mergedInformer) GetController() cache.Controller { + return nil +} + +func (inf *mergedInformer) Run(stopCh <-chan struct{}) { + for _, si := range inf.subs { + go si.Run(stopCh) + } + <-stopCh +} + +func (inf *mergedInformer) RunWithContext(ctx context.Context) { + for _, si := range inf.subs { + go si.RunWithContext(ctx) + } + <-ctx.Done() +} + +func (inf *mergedInformer) HasSynced() bool { + for _, si := range inf.subs { + if !si.HasSynced() { + return false + } + } + return true +} + +func (inf *mergedInformer) LastSyncResourceVersion() string { + if len(inf.subs) > 0 { + return inf.subs[0].LastSyncResourceVersion() + } + return "" +} + +func (inf *mergedInformer) SetWatchErrorHandler(handler cache.WatchErrorHandler) error { + for _, si := range inf.subs { + if err := si.SetWatchErrorHandler(handler); err != nil { + return err + } + } + return nil +} + +func (inf *mergedInformer) SetWatchErrorHandlerWithContext(handler cache.WatchErrorHandlerWithContext) error { + for _, si := range inf.subs { + if err := si.SetWatchErrorHandlerWithContext(handler); err != nil { + return err + } + } + return nil +} + +func (inf *mergedInformer) SetTransform(fn cache.TransformFunc) error { + for _, si := range inf.subs { + if err := si.SetTransform(fn); err != nil { + return err + } + } + return nil +} + +func (inf *mergedInformer) IsStopped() bool { + for _, si := range inf.subs { + if !si.IsStopped() { + return false + } + } + return true +} + +func (inf *mergedInformer) AddIndexers(indexers cache.Indexers) error { + for _, si := range inf.subs { + if err := si.AddIndexers(indexers); err != nil { + return err + } + } + return nil +} + +func (inf *mergedInformer) GetIndexer() cache.Indexer { + indexers := make([]cache.Indexer, 0, len(inf.subs)) + byNamespace := make(map[string]cache.Indexer, len(inf.subs)) + for i, si := range inf.subs { + idx := si.GetIndexer() + indexers = append(indexers, idx) + if i < len(inf.namespaces) { + byNamespace[inf.namespaces[i]] = idx + } + } + return &mergedIndexer{subs: indexers, byNamespace: byNamespace} +} + +// mergedIndexer implements cache.Indexer over N per-namespace sub-indexers. +// It is read-only: the per-namespace informers own their underlying stores. +type mergedIndexer struct { + subs []cache.Indexer + byNamespace map[string]cache.Indexer +} + +func (m *mergedIndexer) Add(obj interface{}) error { + panic("multinamespace: mergedIndexer is read-only") +} + +func (m *mergedIndexer) Update(obj interface{}) error { + panic("multinamespace: mergedIndexer is read-only") +} + +func (m *mergedIndexer) Delete(obj interface{}) error { + panic("multinamespace: mergedIndexer is read-only") +} + +func (m *mergedIndexer) Replace(objs []interface{}, resourceVersion string) error { + panic("multinamespace: mergedIndexer is read-only") +} + +func (m *mergedIndexer) Resync() error { return nil } + +func (m *mergedIndexer) List() []interface{} { + var all []interface{} + for _, s := range m.subs { + all = append(all, s.List()...) + } + return all +} + +func (m *mergedIndexer) ListKeys() []string { + var all []string + for _, s := range m.subs { + all = append(all, s.ListKeys()...) + } + return all +} + +func (m *mergedIndexer) Get(obj interface{}) (interface{}, bool, error) { + ns := namespaceOf(obj) + sub := m.subForNamespace(ns) + if sub != nil { + return sub.Get(obj) + } + for _, s := range m.subs { + item, exists, err := s.Get(obj) + if err != nil || exists { + return item, exists, err + } + } + return nil, false, nil +} + +func (m *mergedIndexer) GetByKey(key string) (interface{}, bool, error) { + ns := namespaceFromKey(key) + sub := m.subForNamespace(ns) + if sub != nil { + return sub.GetByKey(key) + } + for _, s := range m.subs { + item, exists, err := s.GetByKey(key) + if err != nil || exists { + return item, exists, err + } + } + return nil, false, nil +} + +func (m *mergedIndexer) Index(indexName string, obj interface{}) ([]interface{}, error) { + var all []interface{} + for _, s := range m.subs { + items, err := s.Index(indexName, obj) + if err != nil { + return nil, err + } + all = append(all, items...) + } + return all, nil +} + +func (m *mergedIndexer) IndexKeys(indexName, indexedValue string) ([]string, error) { + var all []string + for _, s := range m.subs { + keys, err := s.IndexKeys(indexName, indexedValue) + if err != nil { + return nil, err + } + all = append(all, keys...) + } + return all, nil +} + +func (m *mergedIndexer) ListIndexFuncValues(indexName string) []string { + seen := make(map[string]struct{}) + var all []string + for _, s := range m.subs { + for _, v := range s.ListIndexFuncValues(indexName) { + if _, ok := seen[v]; !ok { + seen[v] = struct{}{} + all = append(all, v) + } + } + } + return all +} + +func (m *mergedIndexer) ByIndex(indexName, indexedValue string) ([]interface{}, error) { + var all []interface{} + for _, s := range m.subs { + items, err := s.ByIndex(indexName, indexedValue) + if err != nil { + return nil, err + } + all = append(all, items...) + } + return all, nil +} + +func (m *mergedIndexer) GetIndexers() cache.Indexers { + if len(m.subs) > 0 { + return m.subs[0].GetIndexers() + } + return cache.Indexers{} +} + +func (m *mergedIndexer) AddIndexers(newIndexers cache.Indexers) error { + for _, s := range m.subs { + if err := s.AddIndexers(newIndexers); err != nil { + return err + } + } + return nil +} + +func (m *mergedIndexer) subForNamespace(ns string) cache.Indexer { + if ns == "" { + return nil + } + return m.byNamespace[ns] +} + +func namespaceOf(obj interface{}) string { + if obj == nil { + return "" + } + acc, err := meta.Accessor(obj) + if err != nil { + return "" + } + return acc.GetNamespace() +} + +func namespaceFromKey(key string) string { + parts := strings.SplitN(key, "/", 2) + if len(parts) == 2 { + return parts[0] + } + return "" +} diff --git a/injection/multinamespace/inject.go b/injection/multinamespace/inject.go new file mode 100644 index 0000000000..5a36a42896 --- /dev/null +++ b/injection/multinamespace/inject.go @@ -0,0 +1,70 @@ +/* +Copyright 2025 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package multinamespace provides an opt-in kube SharedInformerFactory override +// that scopes the secret informer to multiple namespaces while leaving other +// informer types on a self-contained default factory (cluster-wide, or +// single-namespace when injection.WithNamespaceScope is set). +// +// Usage from main(): +// +// import ( +// "knative.dev/pkg/injection" +// "knative.dev/pkg/injection/multinamespace" +// "knative.dev/pkg/injection/sharedmain" +// "knative.dev/pkg/signals" +// ) +// +// func main() { +// ctx := signals.NewContext() +// ctx = injection.WithNamespaceScopes(ctx, "tenant-a", "tenant-b") +// multinamespace.RegisterScopeOverride(10 * time.Minute) +// sharedmain.MainWithContext(ctx, "mycomponent", controllers...) +// } +// +// RegisterScopeOverride must be called after the generated kube factory is +// linked (typically from main before sharedmain) so its injector runs last and +// overwrites kubefactory.Key{} when len(injection.GetNamespaceScopes(ctx)) > 1. +package multinamespace + +import ( + "context" + "time" + + kubeclient "knative.dev/pkg/client/injection/kube/client" + kubefactory "knative.dev/pkg/client/injection/kube/informers/factory" + "knative.dev/pkg/injection" +) + +// RegisterScopeOverride registers an InformerFactoryInjector with +// injection.Default that, when more than one namespace scope is set on the +// context, replaces the kube SharedInformerFactory with a self-contained +// scopedFactory (secrets merged across namespaces; other types unchanged). +func RegisterScopeOverride(resync time.Duration) { + injection.Default.RegisterInformerFactory(func(ctx context.Context) context.Context { + namespaces := injection.GetNamespaceScopes(ctx) + if len(namespaces) <= 1 { + return ctx + } + scoped := NewScopedFactory( + kubeclient.Get(ctx), + resync, + namespaces, + injection.GetNamespaceScope(ctx), + ) + return context.WithValue(ctx, kubefactory.Key{}, scoped) + }) +}