| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111 |
- package clustercache
- import (
- "context"
- "sync"
- v1 "k8s.io/api/core/v1"
- "k8s.io/apimachinery/pkg/fields"
- "k8s.io/apimachinery/pkg/types"
- "k8s.io/client-go/rest"
- "k8s.io/client-go/tools/cache"
- )
- // GenericStore is a generic store implementation. It converts objects to a different type using a transform function.
- // The main purpose is to reduce a memory footprint by storing only the necessary data.
- type GenericStore[Input UIDGetter, Output any] struct {
- mutex sync.RWMutex
- items map[types.UID]Output
- transformFunc func(input Input) Output
- }
- // NewGenericStore creates a new instance of GenericStore.
- func NewGenericStore[Input UIDGetter, Output any](transformFunc func(input Input) Output) *GenericStore[Input, Output] {
- return &GenericStore[Input, Output]{
- items: make(map[types.UID]Output),
- transformFunc: transformFunc,
- }
- }
- type UIDGetter interface {
- GetUID() types.UID
- }
- func CreateStoreAndWatch[Input UIDGetter, Output any](
- ctx context.Context,
- restClient rest.Interface,
- resource string,
- transformFunc func(input Input) Output,
- ) *GenericStore[Input, Output] {
- lw := cache.NewListWatchFromClient(restClient, resource, v1.NamespaceAll, fields.Everything())
- store := NewGenericStore(transformFunc)
- var zeroValue Input
- reflector := cache.NewReflector(lw, zeroValue, store, 0)
- go reflector.Run(ctx.Done())
- return store
- }
- // Add inserts an object into the store.
- func (s *GenericStore[Input, Output]) Add(obj any) error {
- return s.Update(obj)
- }
- // Update updates the existing entry in the store.
- func (s *GenericStore[Input, Output]) Update(obj any) error {
- s.mutex.Lock()
- defer s.mutex.Unlock()
- item := obj.(Input)
- s.items[item.GetUID()] = s.transformFunc(item)
- return nil
- }
- // Delete removes an object from the store.
- func (s *GenericStore[Input, Output]) Delete(obj any) error {
- s.mutex.Lock()
- defer s.mutex.Unlock()
- item := obj.(Input)
- delete(s.items, item.GetUID())
- return nil
- }
- // GetAll returns all stored objects.
- func (s *GenericStore[Input, Output]) GetAll() []Output {
- s.mutex.RLock()
- defer s.mutex.RUnlock()
- allItems := make([]Output, 0, len(s.items))
- for _, item := range s.items {
- allItems = append(allItems, item)
- }
- return allItems
- }
- // Replace replaces the current list of items in the store.
- func (s *GenericStore[Input, Output]) Replace(list []any, _ string) error {
- s.mutex.Lock()
- s.items = make(map[types.UID]Output, len(list))
- s.mutex.Unlock()
- for _, o := range list {
- err := s.Add(o)
- if err != nil {
- return err
- }
- }
- return nil
- }
- // Stubs to satisfy the cache.Store interface
- func (s *GenericStore[Input, Output]) List() []interface{} { return nil }
- func (s *GenericStore[Input, Output]) ListKeys() []string { return nil }
- func (s *GenericStore[Input, Output]) Get(_ interface{}) (item interface{}, exists bool, err error) {
- return nil, false, nil
- }
- func (s *GenericStore[Input, Output]) GetByKey(_ string) (item interface{}, exists bool, err error) {
- return nil, false, nil
- }
- func (s *GenericStore[Input, Output]) Resync() error { return nil }
|