repository.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. package metric
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. "github.com/opencost/opencost/core/pkg/log"
  7. "github.com/opencost/opencost/modules/collector-source/pkg/util"
  8. )
  9. // MetricRepository is an MetricUpdater which applies calls to update to all resolutions being tracked. It holds the
  10. // MetricStore instances for each resolution.
  11. type MetricRepository struct {
  12. lock sync.Mutex
  13. resolutionStores map[string]*resolutionStores
  14. }
  15. func NewMetricRepository(
  16. resolutions []*util.Resolution,
  17. storeFactory MetricStoreFactory,
  18. ) *MetricRepository {
  19. resoluationCollectors := make(map[string]*resolutionStores)
  20. var limitResolution *util.Resolution
  21. for _, resolution := range resolutions {
  22. if limitResolution == nil || resolution.Limit().Before(limitResolution.Limit()) {
  23. limitResolution = resolution
  24. }
  25. resCollector, err := newResolutionStores(resolution, storeFactory)
  26. if err != nil {
  27. log.Errorf("NewMetricRepository: failed to init resolution metric: %s", err.Error())
  28. continue
  29. }
  30. resoluationCollectors[resolution.Interval()] = resCollector
  31. }
  32. return &MetricRepository{
  33. resolutionStores: resoluationCollectors,
  34. }
  35. }
  36. func (r *MetricRepository) GetCollector(interval string, t time.Time) (MetricStore, error) {
  37. r.lock.Lock()
  38. defer r.lock.Unlock()
  39. resCollector, ok := r.resolutionStores[interval]
  40. if !ok {
  41. return nil, fmt.Errorf("failed to find resolution for key %s", interval)
  42. }
  43. return resCollector.getCollector(t)
  44. }
  45. // Update calls Update on the collectors for each resolution
  46. func (r *MetricRepository) Update(
  47. updates []Update,
  48. timestamp time.Time,
  49. ) {
  50. r.lock.Lock()
  51. defer r.lock.Unlock()
  52. for _, update := range updates {
  53. // Call update on the collectors for each resolution
  54. for _, resCollector := range r.resolutionStores {
  55. resCollector.update(update.Name, update.Labels, update.Value, timestamp, update.AdditionalInfo)
  56. }
  57. }
  58. }
  59. func (r *MetricRepository) Coverage() map[string][]time.Time {
  60. r.lock.Lock()
  61. defer r.lock.Unlock()
  62. result := make(map[string][]time.Time)
  63. for resKey, resCollector := range r.resolutionStores {
  64. var windowStarts []time.Time
  65. for _, key := range resCollector.getKeys() {
  66. windowStarts = append(windowStarts, time.Unix(key, 0).UTC())
  67. }
  68. result[resKey] = windowStarts
  69. }
  70. return result
  71. }
  72. // resolutionStores is a grouping of a resolution and the instances of MetricStore that it is used to manage
  73. type resolutionStores struct {
  74. lock sync.Mutex
  75. resolution *util.Resolution
  76. collectors map[int64]MetricStore
  77. factory func() MetricStore
  78. }
  79. func newResolutionStores(resolution *util.Resolution, factory MetricStoreFactory) (*resolutionStores, error) {
  80. resCol := &resolutionStores{
  81. resolution: resolution,
  82. collectors: map[int64]MetricStore{},
  83. factory: factory,
  84. }
  85. // Start loop which will remove expired MetricStore
  86. go func() {
  87. for {
  88. time.Sleep(resCol.resolution.Next().Sub(time.Now().UTC()))
  89. resCol.clean()
  90. }
  91. }()
  92. return resCol, nil
  93. }
  94. func (r *resolutionStores) clean() {
  95. r.lock.Lock()
  96. defer r.lock.Unlock()
  97. limitKey := r.resolution.Limit().UnixMilli()
  98. for key := range r.collectors {
  99. if key < limitKey {
  100. delete(r.collectors, key)
  101. }
  102. }
  103. }
  104. func (r *resolutionStores) update(
  105. metricName string,
  106. labels map[string]string,
  107. value float64,
  108. timestamp time.Time,
  109. additionalInformation map[string]string,
  110. ) {
  111. r.lock.Lock()
  112. defer r.lock.Unlock()
  113. limit := r.resolution.Limit()
  114. if timestamp.Before(limit) {
  115. log.Debugf(
  116. "failed to call update on resolution '%s' because Timestamp '%s' is before the limit '%s",
  117. r.resolution.Interval(),
  118. timestamp.Format(time.RFC3339),
  119. limit.Format(time.RFC3339),
  120. )
  121. return
  122. }
  123. key := r.resolution.Get(timestamp).UnixMilli()
  124. collector, ok := r.collectors[key]
  125. if !ok {
  126. collector = r.factory()
  127. r.collectors[key] = collector
  128. }
  129. collector.Update(metricName, labels, value, timestamp, additionalInformation)
  130. }
  131. func (r *resolutionStores) getCollector(t time.Time) (MetricStore, error) {
  132. r.lock.Lock()
  133. defer r.lock.Unlock()
  134. if t.Before(r.resolution.Limit()) {
  135. return nil, fmt.Errorf(
  136. "request for metric at time '%s' for resolution '%s' is past limit of '%s'",
  137. t.Format(time.RFC3339),
  138. r.resolution.Interval(),
  139. r.resolution.Limit().Format(time.RFC3339),
  140. )
  141. }
  142. key := r.resolution.Get(t).UnixMilli()
  143. collector, ok := r.collectors[key]
  144. if !ok {
  145. return nil, fmt.Errorf("failed to find MetricCollector for interval '%s' for time '%s'", r.resolution.Interval(), t.Format(time.RFC3339))
  146. }
  147. return collector, nil
  148. }
  149. func (r *resolutionStores) getKeys() []int64 {
  150. r.lock.Lock()
  151. defer r.lock.Unlock()
  152. var keys []int64
  153. for key := range r.collectors {
  154. keys = append(keys, key)
  155. }
  156. return keys
  157. }