querier.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. package customcost
  2. import (
  3. "context"
  4. "fmt"
  5. "sync"
  6. "time"
  7. "github.com/opencost/opencost/core/pkg/opencost"
  8. "github.com/opencost/opencost/core/pkg/util/timeutil"
  9. )
  10. type Querier struct {
  11. hourlyRepo Repository
  12. dailyRepo Repository
  13. hourlyDuration time.Duration
  14. dailyDuration time.Duration
  15. }
  16. func NewQuerier(hourlyRepo, dailyRepo Repository, hourlyDuration, dailyDuration time.Duration) *Querier {
  17. return &Querier{
  18. hourlyRepo: hourlyRepo,
  19. dailyRepo: dailyRepo,
  20. hourlyDuration: hourlyDuration,
  21. dailyDuration: dailyDuration,
  22. }
  23. }
  24. func (q *Querier) QueryTotal(request CostTotalRequest, ctx context.Context) (*CostResponse, error) {
  25. repo, start, end, step := q.parseRequest(request.Start, request.End, request.Step)
  26. domains, err := repo.Keys()
  27. if err != nil {
  28. return nil, fmt.Errorf("QueryTotal: %w", err)
  29. }
  30. requestWindow := opencost.NewClosedWindow(request.Start, request.End)
  31. ccs := NewCustomCostSet(requestWindow)
  32. queryStart := start
  33. for queryStart.Before(end) {
  34. queryEnd := queryStart.Add(step)
  35. for _, domain := range domains {
  36. ccResponse, err := repo.Get(queryStart, domain)
  37. if err != nil {
  38. return nil, fmt.Errorf("QueryTotal: %w", err)
  39. } else if ccResponse == nil {
  40. continue
  41. }
  42. customCosts := ParseCustomCostResponse(ccResponse)
  43. ccs.Add(customCosts)
  44. }
  45. queryStart = queryEnd
  46. }
  47. err = ccs.Aggregate(request.AggregateBy)
  48. if err != nil {
  49. return nil, err
  50. }
  51. return NewCostResponse(ccs), nil
  52. }
  53. func (q *Querier) QueryTimeseries(request CostTimeseriesRequest, ctx context.Context) (*CostTimeseriesResponse, error) {
  54. _, start, end, step := q.parseRequest(request.Start, request.End, request.Step)
  55. windows, err := opencost.GetWindows(start, end, step)
  56. if err != nil {
  57. return nil, fmt.Errorf("error getting timeseries windows: %w", err)
  58. }
  59. totals := make([]*CostResponse, len(windows))
  60. errors := make([]error, len(windows))
  61. // Query concurrently for each result, error
  62. var wg sync.WaitGroup
  63. wg.Add(len(windows))
  64. for i, w := range windows {
  65. go func(i int, window opencost.Window, res []*CostResponse) {
  66. defer wg.Done()
  67. totals[i], errors[i] = q.QueryTotal(CostTotalRequest{
  68. Start: *window.Start(),
  69. End: *window.End(),
  70. AggregateBy: request.AggregateBy,
  71. Filter: request.Filter,
  72. Step: step,
  73. }, ctx)
  74. }(i, w, totals)
  75. }
  76. wg.Wait()
  77. // Return an error if any errors occurred
  78. for i, err := range errors {
  79. if err != nil {
  80. return nil, fmt.Errorf("one of %d errors: error querying costs for %s: %w", numErrors(errors), windows[i], err)
  81. }
  82. }
  83. result := &CostTimeseriesResponse{
  84. Window: opencost.NewClosedWindow(start, end),
  85. Timeseries: totals,
  86. }
  87. return result, nil
  88. }
  89. func (q *Querier) parseRequest(requestStart, requestEnd time.Time, requestStep time.Duration) (Repository, time.Time, time.Time, time.Duration) {
  90. oldestHourlyData := time.Now().UTC().Add(-q.hourlyDuration)
  91. var step time.Duration
  92. var repo Repository
  93. if (requestStart.After(oldestHourlyData) || (requestStep == time.Hour)) &&
  94. (requestStep != timeutil.Day) {
  95. step = time.Hour
  96. repo = q.hourlyRepo
  97. } else {
  98. step = timeutil.Day
  99. repo = q.dailyRepo
  100. }
  101. start := opencost.RoundBack(requestStart, step)
  102. end := opencost.RoundBack(requestEnd, step)
  103. if requestStep != 0 {
  104. step = requestStep
  105. }
  106. return repo, start, end, step
  107. }
  108. func numErrors(errors []error) int {
  109. numErrs := 0
  110. for i := range errors {
  111. if errors[i] != nil {
  112. numErrs++
  113. }
  114. }
  115. return numErrs
  116. }