repository.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. package metric
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. "github.com/opencost/opencost/core/pkg/log"
  7. "github.com/opencost/opencost/modules/collector-source/pkg/util"
  8. )
  9. // MetricRepository is an MetricUpdater which applies calls to update to all resolutions being tracked. It holds the
  10. // MetricStore instances for each resolution.
  11. type MetricRepository struct {
  12. lock sync.Mutex
  13. resolutionStores map[string]*resolutionStores
  14. }
  15. func NewMetricRepository(
  16. resolutions []*util.Resolution,
  17. storeFactory MetricStoreFactory,
  18. ) *MetricRepository {
  19. resoluationCollectors := make(map[string]*resolutionStores)
  20. var limitResolution *util.Resolution
  21. for _, resolution := range resolutions {
  22. if limitResolution == nil || resolution.Limit().Before(limitResolution.Limit()) {
  23. limitResolution = resolution
  24. }
  25. resCollector, err := newResolutionStores(resolution, storeFactory)
  26. if err != nil {
  27. log.Errorf("NewMetricRepository: failed to init resolution metric: %s", err.Error())
  28. continue
  29. }
  30. resoluationCollectors[resolution.Interval()] = resCollector
  31. }
  32. return &MetricRepository{
  33. resolutionStores: resoluationCollectors,
  34. }
  35. }
  36. func (r *MetricRepository) GetCollector(interval string, t time.Time) (MetricStore, error) {
  37. r.lock.Lock()
  38. defer r.lock.Unlock()
  39. resCollector, ok := r.resolutionStores[interval]
  40. if !ok {
  41. return nil, fmt.Errorf("failed to find resolution for key %s", interval)
  42. }
  43. return resCollector.getCollector(t)
  44. }
  45. // Update calls Update on the collectors for each resolution
  46. func (r *MetricRepository) Update(
  47. updateSet *UpdateSet,
  48. ) {
  49. r.lock.Lock()
  50. defer r.lock.Unlock()
  51. if updateSet == nil {
  52. return
  53. }
  54. // Call update on the collectors for each resolution
  55. for _, resCollector := range r.resolutionStores {
  56. resCollector.update(updateSet)
  57. }
  58. }
  59. func (r *MetricRepository) Coverage() map[string][]time.Time {
  60. r.lock.Lock()
  61. defer r.lock.Unlock()
  62. result := make(map[string][]time.Time)
  63. for resKey, resCollector := range r.resolutionStores {
  64. var windowStarts []time.Time
  65. for _, key := range resCollector.getKeys() {
  66. windowStarts = append(windowStarts, time.UnixMilli(key).UTC())
  67. }
  68. result[resKey] = windowStarts
  69. }
  70. return result
  71. }
  72. // resolutionStores is a grouping of a resolution and the instances of MetricStore that it is used to manage
  73. type resolutionStores struct {
  74. lock sync.Mutex
  75. resolution *util.Resolution
  76. collectors map[int64]MetricStore
  77. factory func() MetricStore
  78. }
  79. func newResolutionStores(resolution *util.Resolution, factory MetricStoreFactory) (*resolutionStores, error) {
  80. resCol := &resolutionStores{
  81. resolution: resolution,
  82. collectors: map[int64]MetricStore{},
  83. factory: factory,
  84. }
  85. // Start loop which will remove expired MetricStore
  86. go func() {
  87. for {
  88. time.Sleep(resCol.resolution.Next().Sub(time.Now().UTC()))
  89. resCol.clean()
  90. }
  91. }()
  92. return resCol, nil
  93. }
  94. func (r *resolutionStores) clean() {
  95. r.lock.Lock()
  96. defer r.lock.Unlock()
  97. limitKey := r.resolution.Limit().UnixMilli()
  98. for key := range r.collectors {
  99. if key < limitKey {
  100. delete(r.collectors, key)
  101. }
  102. }
  103. }
  104. func (r *resolutionStores) update(
  105. updateSet *UpdateSet,
  106. ) {
  107. r.lock.Lock()
  108. defer r.lock.Unlock()
  109. limit := r.resolution.Limit()
  110. if updateSet.Timestamp.Before(limit) {
  111. log.Debugf(
  112. "skipping update on resolution '%s' because Timestamp '%s' is before the limit '%s",
  113. r.resolution.Interval(),
  114. updateSet.Timestamp.Format(time.RFC3339),
  115. limit.Format(time.RFC3339),
  116. )
  117. return
  118. }
  119. resolutionStart := r.resolution.Get(updateSet.Timestamp)
  120. key := resolutionStart.UnixMilli()
  121. collector, ok := r.collectors[key]
  122. if !ok {
  123. collector = r.factory()
  124. r.collectors[key] = collector
  125. }
  126. for _, update := range updateSet.Updates {
  127. collector.Update(update.Name, update.Labels, update.Value, updateSet.Timestamp, update.AdditionalInfo)
  128. }
  129. // check if update needs to be applied to previous collector, because some aggregators are inclusive
  130. if resolutionStart.Equal(updateSet.Timestamp) {
  131. prevKey := r.resolution.Get(updateSet.Timestamp.Add(-1)).UnixMilli()
  132. if prevCollector, ok := r.collectors[prevKey]; ok {
  133. for _, update := range updateSet.Updates {
  134. prevCollector.Update(update.Name, update.Labels, update.Value, updateSet.Timestamp, update.AdditionalInfo)
  135. }
  136. }
  137. }
  138. }
  139. func (r *resolutionStores) getCollector(t time.Time) (MetricStore, error) {
  140. r.lock.Lock()
  141. defer r.lock.Unlock()
  142. if t.Before(r.resolution.Limit()) {
  143. return nil, fmt.Errorf(
  144. "request for metric at time '%s' for resolution '%s' is past limit of '%s'",
  145. t.Format(time.RFC3339),
  146. r.resolution.Interval(),
  147. r.resolution.Limit().Format(time.RFC3339),
  148. )
  149. }
  150. key := r.resolution.Get(t).UnixMilli()
  151. collector, ok := r.collectors[key]
  152. if !ok {
  153. return nil, fmt.Errorf("failed to find MetricCollector for interval '%s' for time '%s'", r.resolution.Interval(), t.Format(time.RFC3339))
  154. }
  155. return collector, nil
  156. }
  157. func (r *resolutionStores) getKeys() []int64 {
  158. r.lock.Lock()
  159. defer r.lock.Unlock()
  160. var keys []int64
  161. for key := range r.collectors {
  162. keys = append(keys, key)
  163. }
  164. return keys
  165. }