2
0

store.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. package clustercache
import (
	"fmt"
	"sync"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
)
  10. // GenericStore is a generic store implementation. It converts objects to a different type using a transform function.
  11. // The main purpose is to reduce a memory footprint by storing only the necessary data.
  12. type GenericStore[Input UIDGetter, Output any] struct {
  13. mutex sync.RWMutex
  14. items map[types.UID]Output
  15. transformFunc func(input Input) Output
  16. // storing this cyclic reflector allows us to defer watching
  17. reflector *cache.Reflector
  18. onInit func()
  19. }
  20. // NewGenericStore creates a new instance of GenericStore.
  21. func NewGenericStore[Input UIDGetter, Output any](transformFunc func(input Input) Output) *GenericStore[Input, Output] {
  22. return &GenericStore[Input, Output]{
  23. items: make(map[types.UID]Output),
  24. transformFunc: transformFunc,
  25. }
  26. }
  27. type UIDGetter interface {
  28. GetUID() types.UID
  29. }
  30. func CreateStore[Input UIDGetter, Output any](
  31. restClient rest.Interface,
  32. resource string,
  33. transformFunc func(input Input) Output,
  34. ) *GenericStore[Input, Output] {
  35. lw := cache.NewListWatchFromClient(restClient, resource, v1.NamespaceAll, fields.Everything())
  36. store := NewGenericStore(transformFunc)
  37. var zeroValue Input
  38. store.reflector = cache.NewReflector(lw, zeroValue, store, 0)
  39. return store
  40. }
  41. func (s *GenericStore[Input, Output]) Watch(stopCh <-chan struct{}, onInit func()) {
  42. s.onInit = onInit
  43. // reflector.Run() will eventually call Replace() on the store with the initial contents
  44. // of the resource list. we'll call onInit after that happens the _first_ time
  45. go s.reflector.Run(stopCh)
  46. }
  47. // Add inserts an object into the store.
  48. func (s *GenericStore[Input, Output]) Add(obj any) error {
  49. return s.Update(obj)
  50. }
  51. // Update updates the existing entry in the store.
  52. func (s *GenericStore[Input, Output]) Update(obj any) error {
  53. s.mutex.Lock()
  54. defer s.mutex.Unlock()
  55. item := obj.(Input)
  56. s.items[item.GetUID()] = s.transformFunc(item)
  57. return nil
  58. }
  59. // Delete removes an object from the store.
  60. func (s *GenericStore[Input, Output]) Delete(obj any) error {
  61. s.mutex.Lock()
  62. defer s.mutex.Unlock()
  63. item := obj.(Input)
  64. delete(s.items, item.GetUID())
  65. return nil
  66. }
  67. // GetAll returns all stored objects.
  68. func (s *GenericStore[Input, Output]) GetAll() []Output {
  69. s.mutex.RLock()
  70. defer s.mutex.RUnlock()
  71. allItems := make([]Output, 0, len(s.items))
  72. for _, item := range s.items {
  73. allItems = append(allItems, item)
  74. }
  75. return allItems
  76. }
  77. // Replace replaces the current list of items in the store.
  78. func (s *GenericStore[Input, Output]) Replace(list []any, _ string) error {
  79. s.mutex.Lock()
  80. s.items = make(map[types.UID]Output, len(list))
  81. s.mutex.Unlock()
  82. for _, o := range list {
  83. err := s.Add(o)
  84. if err != nil {
  85. return err
  86. }
  87. }
  88. // call onInit after the initial list has been processed
  89. if s.onInit != nil {
  90. s.onInit()
  91. s.onInit = nil
  92. }
  93. return nil
  94. }
  95. // Stubs to satisfy the cache.Store interface
  96. func (s *GenericStore[Input, Output]) List() []interface{} {
  97. return nil
  98. }
  99. func (s *GenericStore[Input, Output]) ListKeys() []string {
  100. return nil
  101. }
  102. func (s *GenericStore[Input, Output]) Get(_ interface{}) (item interface{}, exists bool, err error) {
  103. return nil, false, nil
  104. }
  105. func (s *GenericStore[Input, Output]) GetByKey(_ string) (item interface{}, exists bool, err error) {
  106. return nil, false, nil
  107. }
  108. func (s *GenericStore[Input, Output]) Resync() error {
  109. return nil
  110. }