store.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. package clustercache
import (
	"fmt"
	"sync"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/fields"
	rt "k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
)
// GenericStore is a generic store implementation. It converts objects to a
// different type using a transform function and keeps only the converted
// value, keyed by the UID of the original object.
// The main purpose is to reduce the memory footprint by storing only the
// necessary data.
type GenericStore[Input UIDGetter, Output any] struct {
	// mutex guards items.
	mutex sync.RWMutex
	// items maps the UID of each input object to its transformed value.
	items map[types.UID]Output
	// transformFunc converts an incoming object into the value that is stored.
	transformFunc func(input Input) Output
	// reflector is the reflector that writes into this store. Keeping this
	// cyclic reference on the store allows watching to be deferred until
	// Watch is called.
	reflector *cache.Reflector
	// onInit, when non-nil, is called once after the first Replace completes.
	onInit func()
}
  21. // NewGenericStore creates a new instance of GenericStore.
  22. func NewGenericStore[Input UIDGetter, Output any](transformFunc func(input Input) Output) *GenericStore[Input, Output] {
  23. return &GenericStore[Input, Output]{
  24. items: make(map[types.UID]Output),
  25. transformFunc: transformFunc,
  26. }
  27. }
// UIDGetter is implemented by any object that can report its UID.
// GenericStore uses that UID as the key under which the transformed object
// is stored.
type UIDGetter interface {
	// GetUID returns the UID identifying the object.
	GetUID() types.UID
}
  31. func CreateStore[Input UIDGetter, Output any](
  32. restClient rest.Interface,
  33. resource string,
  34. transformFunc func(input Input) Output,
  35. ) *GenericStore[Input, Output] {
  36. lw := cache.NewListWatchFromClient(restClient, resource, v1.NamespaceAll, fields.Everything())
  37. store := NewGenericStore(transformFunc)
  38. var zeroValue Input
  39. store.reflector = cache.NewReflector(lw, zeroValue, store, 0)
  40. return store
  41. }
  42. func (s *GenericStore[Input, Output]) Watch(stopCh <-chan struct{}, onInit func()) {
  43. s.onInit = onInit
  44. // reflector.Run() will eventually call Replace() on the store with the initial contents
  45. // of the resource list. we'll call onInit after that happens the _first_ time
  46. go s.reflector.Run(stopCh)
  47. }
// Add inserts an object into the store. Inserting and updating are the same
// operation for this store, so the call is forwarded to Update and its error
// is returned unchanged.
func (s *GenericStore[Input, Output]) Add(obj any) error {
	return s.Update(obj)
}
  52. // Update updates the existing entry in the store.
  53. func (s *GenericStore[Input, Output]) Update(obj any) error {
  54. s.mutex.Lock()
  55. defer s.mutex.Unlock()
  56. item := obj.(Input)
  57. s.items[item.GetUID()] = s.transformFunc(item)
  58. return nil
  59. }
  60. // Delete removes an object from the store.
  61. func (s *GenericStore[Input, Output]) Delete(obj any) error {
  62. s.mutex.Lock()
  63. defer s.mutex.Unlock()
  64. item := obj.(Input)
  65. delete(s.items, item.GetUID())
  66. return nil
  67. }
  68. // GetAll returns all stored objects.
  69. func (s *GenericStore[Input, Output]) GetAll() []Output {
  70. s.mutex.RLock()
  71. defer s.mutex.RUnlock()
  72. // Deep copy the stored items to ensure that callers do not modify
  73. // the original objects in the store.
  74. allItems := make([]Output, 0, len(s.items))
  75. for _, item := range s.items {
  76. if deepCopyable, ok := any(item).(rt.Object); ok {
  77. allItems = append(allItems, deepCopyable.DeepCopyObject().(Output))
  78. }
  79. }
  80. return allItems
  81. }
  82. // Replace replaces the current list of items in the store.
  83. func (s *GenericStore[Input, Output]) Replace(list []any, _ string) error {
  84. s.mutex.Lock()
  85. s.items = make(map[types.UID]Output, len(list))
  86. s.mutex.Unlock()
  87. for _, o := range list {
  88. err := s.Add(o)
  89. if err != nil {
  90. return err
  91. }
  92. }
  93. // call onInit after the initial list has been processed
  94. if s.onInit != nil {
  95. s.onInit()
  96. s.onInit = nil
  97. }
  98. return nil
  99. }
  100. // Stubs to satisfy the cache.Store interface
  101. func (s *GenericStore[Input, Output]) List() []interface{} {
  102. return nil
  103. }
// ListKeys is a stub required by cache.Store. It always returns nil.
func (s *GenericStore[Input, Output]) ListKeys() []string {
	return nil
}
  107. func (s *GenericStore[Input, Output]) Get(_ interface{}) (item interface{}, exists bool, err error) {
  108. return nil, false, nil
  109. }
  110. func (s *GenericStore[Input, Output]) GetByKey(_ string) (item interface{}, exists bool, err error) {
  111. return nil, false, nil
  112. }
// Resync is a stub required by cache.Store. It does nothing and always
// returns nil.
func (s *GenericStore[Input, Output]) Resync() error {
	return nil
}