|
|
@@ -1,4 +1,4 @@
|
|
|
-// Copyright 2024 the Kilo authors
|
|
|
+// Copyright 2026 the Kilo authors
|
|
|
//
|
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
// you may not use this file except in compliance with the License.
|
|
|
@@ -40,11 +40,17 @@ type sharedInformerFactory struct {
|
|
|
lock sync.Mutex
|
|
|
defaultResync time.Duration
|
|
|
customResync map[reflect.Type]time.Duration
|
|
|
+ transform cache.TransformFunc
|
|
|
|
|
|
informers map[reflect.Type]cache.SharedIndexInformer
|
|
|
// startedInformers is used for tracking which informers have been started.
|
|
|
// This allows Start() to be called multiple times safely.
|
|
|
startedInformers map[reflect.Type]bool
|
|
|
+ // wg tracks how many goroutines were started.
|
|
|
+ wg sync.WaitGroup
|
|
|
+ // shuttingDown is true when Shutdown has been called. It may still be running
|
|
|
+ // because it needs to wait for goroutines.
|
|
|
+ shuttingDown bool
|
|
|
}
|
|
|
|
|
|
// WithCustomResyncConfig sets a custom resync period for the specified informer types.
|
|
|
@@ -73,6 +79,14 @@ func WithNamespace(namespace string) SharedInformerOption {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+// WithTransform sets a transform on all informers.
|
|
|
+func WithTransform(transform cache.TransformFunc) SharedInformerOption {
|
|
|
+ return func(factory *sharedInformerFactory) *sharedInformerFactory {
|
|
|
+ factory.transform = transform
|
|
|
+ return factory
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
// NewSharedInformerFactory constructs a new instance of sharedInformerFactory for all namespaces.
|
|
|
func NewSharedInformerFactory(client versioned.Interface, defaultResync time.Duration) SharedInformerFactory {
|
|
|
return NewSharedInformerFactoryWithOptions(client, defaultResync)
|
|
|
@@ -105,20 +119,39 @@ func NewSharedInformerFactoryWithOptions(client versioned.Interface, defaultResy
|
|
|
return factory
|
|
|
}
|
|
|
|
|
|
-// Start initializes all requested informers.
|
|
|
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
|
|
|
f.lock.Lock()
|
|
|
defer f.lock.Unlock()
|
|
|
|
|
|
+ if f.shuttingDown {
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
for informerType, informer := range f.informers {
|
|
|
if !f.startedInformers[informerType] {
|
|
|
- go informer.Run(stopCh)
|
|
|
+ f.wg.Add(1)
|
|
|
+ // We need a new variable in each loop iteration,
|
|
|
+ // otherwise the goroutine would use the loop variable
|
|
|
+ // and that keeps changing.
|
|
|
+ informer := informer
|
|
|
+ go func() {
|
|
|
+ defer f.wg.Done()
|
|
|
+ informer.Run(stopCh)
|
|
|
+ }()
|
|
|
f.startedInformers[informerType] = true
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-// WaitForCacheSync waits for all started informers' cache were synced.
|
|
|
+func (f *sharedInformerFactory) Shutdown() {
|
|
|
+ f.lock.Lock()
|
|
|
+ f.shuttingDown = true
|
|
|
+ f.lock.Unlock()
|
|
|
+
|
|
|
+ // Will return immediately if there is nothing to wait for.
|
|
|
+ f.wg.Wait()
|
|
|
+}
|
|
|
+
|
|
|
func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool {
|
|
|
informers := func() map[reflect.Type]cache.SharedIndexInformer {
|
|
|
f.lock.Lock()
|
|
|
@@ -140,7 +173,7 @@ func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[ref
|
|
|
return res
|
|
|
}
|
|
|
|
|
|
-// InternalInformerFor returns the SharedIndexInformer for obj using an internal
|
|
|
+// InformerFor returns the SharedIndexInformer for obj using an internal
|
|
|
// client.
|
|
|
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
|
|
|
f.lock.Lock()
|
|
|
@@ -158,6 +191,7 @@ func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internal
|
|
|
}
|
|
|
|
|
|
informer = newFunc(f.client, resyncPeriod)
|
|
|
+ informer.SetTransform(f.transform)
|
|
|
f.informers[informerType] = informer
|
|
|
|
|
|
return informer
|
|
|
@@ -165,11 +199,58 @@ func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internal
|
|
|
|
|
|
// SharedInformerFactory provides shared informers for resources in all known
|
|
|
// API group versions.
|
|
|
+//
|
|
|
+// It is typically used like this:
|
|
|
+//
|
|
|
+// ctx, cancel := context.Background()
|
|
|
+// defer cancel()
|
|
|
+// factory := NewSharedInformerFactory(client, resyncPeriod)
|
|
|
+// defer factory.WaitForStop() // Returns immediately if nothing was started.
|
|
|
+// genericInformer := factory.ForResource(resource)
|
|
|
+// typedInformer := factory.SomeAPIGroup().V1().SomeType()
|
|
|
+// factory.Start(ctx.Done()) // Start processing these informers.
|
|
|
+// synced := factory.WaitForCacheSync(ctx.Done())
|
|
|
+// for v, ok := range synced {
|
|
|
+// if !ok {
|
|
|
+// fmt.Fprintf(os.Stderr, "caches failed to sync: %v", v)
|
|
|
+// return
|
|
|
+// }
|
|
|
+// }
|
|
|
+//
|
|
|
+// // Creating informers can also be created after Start, but then
|
|
|
+// // Start must be called again:
|
|
|
+// anotherGenericInformer := factory.ForResource(resource)
|
|
|
+// factory.Start(ctx.Done())
|
|
|
type SharedInformerFactory interface {
|
|
|
internalinterfaces.SharedInformerFactory
|
|
|
- ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
|
|
|
+
|
|
|
+ // Start initializes all requested informers. They are handled in goroutines
|
|
|
+ // which run until the stop channel gets closed.
|
|
|
+ Start(stopCh <-chan struct{})
|
|
|
+
|
|
|
+ // Shutdown marks a factory as shutting down. At that point no new
|
|
|
+ // informers can be started anymore and Start will return without
|
|
|
+ // doing anything.
|
|
|
+ //
|
|
|
+ // In addition, Shutdown blocks until all goroutines have terminated. For that
|
|
|
+ // to happen, the close channel(s) that they were started with must be closed,
|
|
|
+ // either before Shutdown gets called or while it is waiting.
|
|
|
+ //
|
|
|
+ // Shutdown may be called multiple times, even concurrently. All such calls will
|
|
|
+ // block until all goroutines have terminated.
|
|
|
+ Shutdown()
|
|
|
+
|
|
|
+ // WaitForCacheSync blocks until all started informers' caches were synced
|
|
|
+ // or the stop channel gets closed.
|
|
|
WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
|
|
|
|
|
|
+ // ForResource gives generic access to a shared informer of the matching type.
|
|
|
+ ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
|
|
|
+
|
|
|
+ // InformerFor returns the SharedIndexInformer for obj using an internal
|
|
|
+ // client.
|
|
|
+ InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer
|
|
|
+
|
|
|
Kilo() kilo.Interface
|
|
|
}
|
|
|
|