collectorprovider.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. package collector
  2. import (
  3. "fmt"
  4. "time"
  5. "github.com/opencost/opencost/core/pkg/log"
  6. "github.com/opencost/opencost/core/pkg/util/timeutil"
  7. "github.com/opencost/opencost/modules/collector-source/pkg/metric"
  8. "github.com/opencost/opencost/modules/collector-source/pkg/util"
  9. )
  10. // StoreProvider returns an appropriate store for the given window. This is meant to bridge the mismatch of a system
  11. // that was designed to make queries against a continuous datasource but which now stores its data in discrete blocks.
  12. type StoreProvider interface {
  13. GetStore(start, end time.Time) metric.MetricStore // store selected for the window between start and end
  14. GetDailyDataCoverage(limitDays int) (time.Time, time.Time, error) // start and end of the available daily data; limitDays bounds how far back the start may reach
  15. }
  16. // repoStoreProvider is a StoreProvider implementation which uses a Repository and the Intervals of the Resolutions it is
  17. // configured with to return the most appropriate store.
  18. type repoStoreProvider struct {
  19. repo *metric.MetricRepository // repository that stores and coverage are read from
  20. intervals map[string]util.Interval // parsed intervals, keyed by the interval string of their resolution
  21. }
  22. func newRepoStoreProvider(repo *metric.MetricRepository, resoluationConfigs []util.ResolutionConfiguration) *repoStoreProvider {
  23. intervals := make(map[string]util.Interval)
  24. for _, resConf := range resoluationConfigs {
  25. interval, err := util.NewInterval(resConf.Interval)
  26. if err != nil {
  27. continue
  28. }
  29. intervals[resConf.Interval] = interval
  30. }
  31. return &repoStoreProvider{
  32. repo: repo,
  33. intervals: intervals,
  34. }
  35. }
  36. func (r *repoStoreProvider) GetStore(start, end time.Time) metric.MetricStore {
  37. resKey, start := r.getStoreKeys(start, end)
  38. store, err := r.repo.GetCollector(resKey, start)
  39. if err != nil {
  40. log.Debugf("failed to get Store for window '%s - %s': %s", start, end, err)
  41. }
  42. return store
  43. }
  44. // getStoreKeys compares the given start and end against each resolution by truncating the start time and
  45. // add one interval to the truncated value. The duration between start and end is compared with the duration
  46. // between the interval generated times, with the lowest
  47. func (r *repoStoreProvider) getStoreKeys(start, end time.Time) (string, time.Time) {
  48. windowDuration := int64(end.Sub(start))
  49. var minDiff *int64
  50. var minKey string
  51. var minStart time.Time
  52. for key, interval := range r.intervals {
  53. intStart := interval.Truncate(start)
  54. intEnd := interval.Add(start, 1)
  55. intDuration := int64(intEnd.Sub(intStart))
  56. diffDuration := windowDuration - intDuration
  57. if diffDuration < 0 {
  58. diffDuration = -diffDuration
  59. }
  60. if minDiff == nil || diffDuration < *minDiff {
  61. minDiff = &diffDuration
  62. minKey = key
  63. minStart = intStart
  64. }
  65. }
  66. return minKey, minStart
  67. }
  68. // GetDailyDataCoverage this is a bit of a hacky add-on to help fulfill the metricsquerier interface
  69. func (r *repoStoreProvider) GetDailyDataCoverage(limitDays int) (time.Time, time.Time, error) {
  70. coverage := r.repo.Coverage()
  71. dailyCoverage, ok := coverage["1d"]
  72. if !ok {
  73. return time.Time{}, time.Time{}, fmt.Errorf("daily resolution is not configured")
  74. }
  75. if len(dailyCoverage) == 0 {
  76. // If daily coverage is not available, fallback to a reasonable time range
  77. // This prevents CSV export from failing when the metric doesn't exist yet
  78. log.Warnf("GetDailyDataCoverage: daily coverage not available, using fallback time range")
  79. // Use a reasonable fallback: start from 1 day ago to account for metric collection delay
  80. fallbackEnd := time.Now().UTC().Truncate(timeutil.Day)
  81. fallbackStart := fallbackEnd.AddDate(0, 0, -1) // 1 day ago
  82. return fallbackStart, fallbackEnd, nil
  83. }
  84. start := dailyCoverage[0]
  85. end := dailyCoverage[0]
  86. for _, window := range dailyCoverage {
  87. if start.After(window) {
  88. start = window
  89. }
  90. if end.Before(window) {
  91. end = window
  92. }
  93. }
  94. limit := time.Now().UTC().Truncate(timeutil.Day).Add(-timeutil.Day * time.Duration(limitDays))
  95. if start.Before(limit) {
  96. start = limit
  97. }
  98. // since all times that we have been looking at are window start times,
  99. // add a day to end time to create the actual coverage
  100. end = end.Add(timeutil.Day)
  101. return start, end, nil
  102. }