store.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. package clustercache
  import (
  	"context"
  	"fmt"
  	"sync"

  	v1 "k8s.io/api/core/v1"
  	"k8s.io/apimachinery/pkg/fields"
  	"k8s.io/apimachinery/pkg/types"
  	"k8s.io/client-go/rest"
  	"k8s.io/client-go/tools/cache"
  )
  11. // GenericStore is a generic store implementation. It converts objects to a different type using a transform function.
  12. // The main purpose is to reduce a memory footprint by storing only the necessary data.
  13. type GenericStore[Input UIDGetter, Output any] struct {
  14. mutex sync.RWMutex
  15. items map[types.UID]Output
  16. transformFunc func(input Input) Output
  17. }
  18. // NewGenericStore creates a new instance of GenericStore.
  19. func NewGenericStore[Input UIDGetter, Output any](transformFunc func(input Input) Output) *GenericStore[Input, Output] {
  20. return &GenericStore[Input, Output]{
  21. items: make(map[types.UID]Output),
  22. transformFunc: transformFunc,
  23. }
  24. }
  25. type UIDGetter interface {
  26. GetUID() types.UID
  27. }
  28. func CreateStoreAndWatch[Input UIDGetter, Output any](
  29. ctx context.Context,
  30. restClient rest.Interface,
  31. resource string,
  32. transformFunc func(input Input) Output,
  33. ) *GenericStore[Input, Output] {
  34. lw := cache.NewListWatchFromClient(restClient, resource, v1.NamespaceAll, fields.Everything())
  35. store := NewGenericStore(transformFunc)
  36. var zeroValue Input
  37. reflector := cache.NewReflector(lw, zeroValue, store, 0)
  38. go reflector.Run(ctx.Done())
  39. return store
  40. }
  41. // Add inserts an object into the store.
  42. func (s *GenericStore[Input, Output]) Add(obj any) error {
  43. return s.Update(obj)
  44. }
  45. // Update updates the existing entry in the store.
  46. func (s *GenericStore[Input, Output]) Update(obj any) error {
  47. s.mutex.Lock()
  48. defer s.mutex.Unlock()
  49. item := obj.(Input)
  50. s.items[item.GetUID()] = s.transformFunc(item)
  51. return nil
  52. }
  53. // Delete removes an object from the store.
  54. func (s *GenericStore[Input, Output]) Delete(obj any) error {
  55. s.mutex.Lock()
  56. defer s.mutex.Unlock()
  57. item := obj.(Input)
  58. delete(s.items, item.GetUID())
  59. return nil
  60. }
  61. // GetAll returns all stored objects.
  62. func (s *GenericStore[Input, Output]) GetAll() []Output {
  63. s.mutex.RLock()
  64. defer s.mutex.RUnlock()
  65. allItems := make([]Output, 0, len(s.items))
  66. for _, item := range s.items {
  67. allItems = append(allItems, item)
  68. }
  69. return allItems
  70. }
  71. // Replace replaces the current list of items in the store.
  72. func (s *GenericStore[Input, Output]) Replace(list []any, _ string) error {
  73. s.mutex.Lock()
  74. s.items = make(map[types.UID]Output, len(list))
  75. s.mutex.Unlock()
  76. for _, o := range list {
  77. err := s.Add(o)
  78. if err != nil {
  79. return err
  80. }
  81. }
  82. return nil
  83. }
  84. // Stubs to satisfy the cache.Store interface
  85. func (s *GenericStore[Input, Output]) List() []interface{} { return nil }
  86. func (s *GenericStore[Input, Output]) ListKeys() []string { return nil }
  87. func (s *GenericStore[Input, Output]) Get(_ interface{}) (item interface{}, exists bool, err error) {
  88. return nil, false, nil
  89. }
  90. func (s *GenericStore[Input, Output]) GetByKey(_ string) (item interface{}, exists bool, err error) {
  91. return nil, false, nil
  92. }
  93. func (s *GenericStore[Input, Output]) Resync() error { return nil }