repository.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. package metric
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. "github.com/opencost/opencost/core/pkg/log"
  7. "github.com/opencost/opencost/modules/collector-source/pkg/util"
  8. )
  9. // MetricRepository is an MetricUpdater which applies calls to update to all resolutions being tracked. It holds the
  10. // MetricStore instances for each resolution.
  11. type MetricRepository struct {
  12. lock sync.Mutex
  13. resolutionStores map[string]*resolutionStores
  14. }
  15. func NewMetricRepository(
  16. resolutions []*util.Resolution,
  17. storeFactory MetricStoreFactory,
  18. ) *MetricRepository {
  19. resoluationCollectors := make(map[string]*resolutionStores)
  20. var limitResolution *util.Resolution
  21. for _, resolution := range resolutions {
  22. if limitResolution == nil || resolution.Limit().Before(limitResolution.Limit()) {
  23. limitResolution = resolution
  24. }
  25. resCollector, err := newResolutionStores(resolution, storeFactory)
  26. if err != nil {
  27. log.Errorf("NewMetricRepository: failed to init resolution metric: %s", err.Error())
  28. continue
  29. }
  30. resoluationCollectors[resolution.Interval()] = resCollector
  31. }
  32. return &MetricRepository{
  33. resolutionStores: resoluationCollectors,
  34. }
  35. }
  36. func (r *MetricRepository) GetCollector(interval string, t time.Time) (MetricStore, error) {
  37. r.lock.Lock()
  38. defer r.lock.Unlock()
  39. resCollector, ok := r.resolutionStores[interval]
  40. if !ok {
  41. return nil, fmt.Errorf("failed to find resolution for key %s", interval)
  42. }
  43. return resCollector.getCollector(t)
  44. }
  45. // Update calls Update on the collectors for each resolution
  46. func (r *MetricRepository) Update(
  47. updateSet *UpdateSet,
  48. ) {
  49. r.lock.Lock()
  50. defer r.lock.Unlock()
  51. if updateSet == nil {
  52. return
  53. }
  54. // Call update on the collectors for each resolution
  55. for _, resCollector := range r.resolutionStores {
  56. resCollector.update(updateSet)
  57. }
  58. }
  59. func (r *MetricRepository) Coverage() map[string][]time.Time {
  60. r.lock.Lock()
  61. defer r.lock.Unlock()
  62. result := make(map[string][]time.Time)
  63. for resKey, resCollector := range r.resolutionStores {
  64. var windowStarts []time.Time
  65. for _, key := range resCollector.getKeys() {
  66. windowStarts = append(windowStarts, time.UnixMilli(key).UTC())
  67. }
  68. result[resKey] = windowStarts
  69. }
  70. return result
  71. }
  72. // resolutionStores is a grouping of a resolution and the instances of MetricStore that it is used to manage
  73. type resolutionStores struct {
  74. lock sync.Mutex
  75. resolution *util.Resolution
  76. collectors map[int64]MetricStore
  77. factory func() MetricStore
  78. }
  79. func newResolutionStores(resolution *util.Resolution, factory MetricStoreFactory) (*resolutionStores, error) {
  80. resCol := &resolutionStores{
  81. resolution: resolution,
  82. collectors: map[int64]MetricStore{},
  83. factory: factory,
  84. }
  85. // Start loop which will remove expired MetricStore
  86. go func() {
  87. for {
  88. time.Sleep(resCol.resolution.Next().Sub(time.Now().UTC()))
  89. resCol.clean()
  90. }
  91. }()
  92. return resCol, nil
  93. }
  94. func (r *resolutionStores) clean() {
  95. r.lock.Lock()
  96. defer r.lock.Unlock()
  97. limitKey := r.resolution.Limit().UnixMilli()
  98. for key := range r.collectors {
  99. if key < limitKey {
  100. delete(r.collectors, key)
  101. }
  102. }
  103. }
  104. func (r *resolutionStores) update(
  105. updateSet *UpdateSet,
  106. ) {
  107. r.lock.Lock()
  108. defer r.lock.Unlock()
  109. limit := r.resolution.Limit()
  110. if updateSet.Timestamp.Before(limit) {
  111. return
  112. }
  113. resolutionStart := r.resolution.Get(updateSet.Timestamp)
  114. key := resolutionStart.UnixMilli()
  115. collector, ok := r.collectors[key]
  116. if !ok {
  117. collector = r.factory()
  118. r.collectors[key] = collector
  119. }
  120. for _, update := range updateSet.Updates {
  121. collector.Update(update.Name, update.Labels, update.Value, updateSet.Timestamp, update.AdditionalInfo)
  122. }
  123. // check if update needs to be applied to previous collector, because some aggregators are inclusive
  124. if resolutionStart.Equal(updateSet.Timestamp) {
  125. prevKey := r.resolution.Get(updateSet.Timestamp.Add(-1)).UnixMilli()
  126. if prevCollector, ok := r.collectors[prevKey]; ok {
  127. for _, update := range updateSet.Updates {
  128. prevCollector.Update(update.Name, update.Labels, update.Value, updateSet.Timestamp, update.AdditionalInfo)
  129. }
  130. }
  131. }
  132. }
  133. func (r *resolutionStores) getCollector(t time.Time) (MetricStore, error) {
  134. r.lock.Lock()
  135. defer r.lock.Unlock()
  136. if t.Before(r.resolution.Limit()) {
  137. return nil, fmt.Errorf(
  138. "request for metric at time '%s' for resolution '%s' is past limit of '%s'",
  139. t.Format(time.RFC3339),
  140. r.resolution.Interval(),
  141. r.resolution.Limit().Format(time.RFC3339),
  142. )
  143. }
  144. key := r.resolution.Get(t).UnixMilli()
  145. collector, ok := r.collectors[key]
  146. if !ok {
  147. return nil, fmt.Errorf("failed to find MetricCollector for interval '%s' for time '%s'", r.resolution.Interval(), t.Format(time.RFC3339))
  148. }
  149. return collector, nil
  150. }
  151. func (r *resolutionStores) getKeys() []int64 {
  152. r.lock.Lock()
  153. defer r.lock.Unlock()
  154. var keys []int64
  155. for key := range r.collectors {
  156. keys = append(keys, key)
  157. }
  158. return keys
  159. }