cachegroup.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. package cache
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. "github.com/kubecost/opencost/pkg/util/interval"
  7. "golang.org/x/sync/singleflight"
  8. )
  9. // cacheEntry contains a T item and the time it was added to the cache
  10. type cacheEntry[T comparable] struct {
  11. item T
  12. ts time.Time
  13. }
  14. // CacheGroup provides single flighting for grouping repeated calls for the same workload, as well
  15. // as a cache that extends the lifetime of the returned result by a specific duration.
  16. type CacheGroup[T comparable] struct {
  17. lock sync.Mutex
  18. cache map[string]*cacheEntry[T]
  19. group singleflight.Group
  20. expirationLock sync.Mutex
  21. expirationRunner *interval.IntervalRunner
  22. expiry time.Duration
  23. max int
  24. }
  25. // NewCacheGroup[T] creates a new cache group instance given the max number of keys to cache.
  26. // If a new cache entry is added that exceeds the maximum, the oldest entry is evicted.
  27. func NewCacheGroup[T comparable](max int) *CacheGroup[T] {
  28. return &CacheGroup[T]{
  29. cache: make(map[string]*cacheEntry[T]),
  30. max: max,
  31. }
  32. }
  33. // Do accepts a group key and a factory function to execute a workload request. Any executions
  34. // of Do() using an identical key will wait on the originating request rather than executing a
  35. // new request, and the final result will be shared among any callers sharing the same key.
  36. // Additionally, once returned, the workload for that key will remained cached. An expiration
  37. // policy can be added for this cache by calling the WithExpiration method.
  38. func (cg *CacheGroup[T]) Do(key string, factory func() (T, error)) (T, error) {
  39. // Check cache for existing data using the group key
  40. cg.lock.Lock()
  41. if result, ok := cg.cache[key]; ok {
  42. cg.lock.Unlock()
  43. return result.item, nil
  44. }
  45. cg.lock.Unlock()
  46. // single flight the group using the group key
  47. item, err, _ := cg.group.Do(key, func() (any, error) {
  48. i, err := factory()
  49. if err != nil {
  50. return nil, err
  51. }
  52. // assign cache once a result for the group key is returned
  53. cg.lock.Lock()
  54. cg.removeOldestBeyondCapacity()
  55. cg.cache[key] = &cacheEntry[T]{
  56. item: i,
  57. ts: time.Now(),
  58. }
  59. cg.lock.Unlock()
  60. return i, nil
  61. })
  62. if err != nil {
  63. return defaultValue[T](), err
  64. }
  65. tItem, ok := item.(T)
  66. if !ok {
  67. return defaultValue[T](), fmt.Errorf("Failed to convert single flight result")
  68. }
  69. return tItem, nil
  70. }
  71. // WithExpiration assigns a cache expiration to cached entries, and starts an eviction process,
  72. // which runs on the specified interval.
  73. func (cg *CacheGroup[T]) WithExpiration(expiry time.Duration, evictionInterval time.Duration) *CacheGroup[T] {
  74. cg.expirationLock.Lock()
  75. defer cg.expirationLock.Unlock()
  76. if cg.expirationRunner == nil {
  77. cg.expirationRunner = interval.NewIntervalRunner(func() {
  78. cg.lock.Lock()
  79. defer cg.lock.Unlock()
  80. cg.removeExpired()
  81. }, evictionInterval)
  82. }
  83. if cg.expirationRunner.Start() {
  84. cg.expiry = expiry
  85. }
  86. return cg
  87. }
  88. // DisableExpiration will shutdown the expiration process which allows cache entries to remain until 'max' is
  89. // exceeded.
  90. func (cg *CacheGroup[T]) DisableExpiration() {
  91. cg.expirationLock.Lock()
  92. defer cg.expirationLock.Unlock()
  93. if cg.expirationRunner == nil {
  94. cg.expirationRunner.Stop()
  95. cg.expirationRunner = nil
  96. }
  97. }
  98. // locates the oldest entry and removes it from the map. caller should lock
  99. // prior to calling
  100. func (cg *CacheGroup[T]) removeOldestBeyondCapacity() {
  101. // only remove the oldest entries if we're at max capacity
  102. if len(cg.cache) < cg.max {
  103. return
  104. }
  105. oldest := time.Now()
  106. oldestKey := ""
  107. for k, v := range cg.cache {
  108. if v.ts.Before(oldest) {
  109. oldest = v.ts
  110. oldestKey = k
  111. }
  112. }
  113. delete(cg.cache, oldestKey)
  114. }
  115. // removes any entries that have expired from the map. caller should lock prior
  116. // to calling
  117. func (cg *CacheGroup[T]) removeExpired() {
  118. if len(cg.cache) == 0 {
  119. return
  120. }
  121. now := time.Now()
  122. for k, v := range cg.cache {
  123. if now.Sub(v.ts) >= cg.expiry {
  124. delete(cg.cache, k)
  125. }
  126. }
  127. }
  128. // default value helper function to returns the initialized value for a T instance
  129. // (ie: for value types, typically the 0 value. For pointer types, nil)
  130. func defaultValue[T any]() T {
  131. var t T
  132. return t
  133. }