repositoryquerier.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
  1. package customcost
  2. import (
  3. "context"
  4. "fmt"
  5. "sync"
  6. "time"
  7. "github.com/opencost/opencost/core/pkg/opencost"
  8. "github.com/opencost/opencost/core/pkg/util/timeutil"
  9. "github.com/opencost/opencost/pkg/env"
  10. )
  11. type RepositoryQuerier struct {
  12. hourlyRepo Repository
  13. dailyRepo Repository
  14. hourlyDuration time.Duration
  15. dailyDuration time.Duration
  16. }
  17. func NewRepositoryQuerier(hourlyRepo, dailyRepo Repository, hourlyDuration, dailyDuration time.Duration) *RepositoryQuerier {
  18. return &RepositoryQuerier{
  19. hourlyRepo: hourlyRepo,
  20. dailyRepo: dailyRepo,
  21. hourlyDuration: hourlyDuration,
  22. dailyDuration: dailyDuration,
  23. }
  24. }
  25. func (rq *RepositoryQuerier) QueryTotal(ctx context.Context, request CostTotalRequest) (*CostResponse, error) {
  26. repo := rq.dailyRepo
  27. step := timeutil.Day
  28. if request.Accumulate == opencost.AccumulateOptionHour {
  29. repo = rq.hourlyRepo
  30. step = time.Hour
  31. }
  32. domains, err := repo.Keys()
  33. if err != nil {
  34. return nil, fmt.Errorf("QueryTotal: %w", err)
  35. }
  36. compiler := NewCustomCostMatchCompiler()
  37. matcher, err := compiler.Compile(request.Filter)
  38. if err != nil {
  39. return nil, fmt.Errorf("RepositoryQuerier: Query: failed to compile filters: %w", err)
  40. }
  41. requestWindow := opencost.NewClosedWindow(request.Start, request.End)
  42. ccs := NewCustomCostSet(requestWindow)
  43. queryStart := request.Start
  44. for queryStart.Before(request.End) {
  45. queryEnd := queryStart.Add(step)
  46. for _, domain := range domains {
  47. ccResponse, err := repo.Get(queryStart, domain)
  48. if err != nil {
  49. return nil, fmt.Errorf("QueryTotal: %w", err)
  50. } else if ccResponse == nil || ccResponse.Start == nil || ccResponse.End == nil {
  51. continue
  52. }
  53. customCosts := ParseCustomCostResponse(ccResponse)
  54. for _, customCost := range customCosts {
  55. if matcher.Matches(customCost) {
  56. ccs.Add(customCost)
  57. }
  58. }
  59. }
  60. queryStart = queryEnd
  61. }
  62. err = ccs.Aggregate(request.AggregateBy)
  63. if err != nil {
  64. return nil, err
  65. }
  66. return NewCostResponse(ccs), nil
  67. }
  68. var allSteppedAccumulateOptions = []opencost.AccumulateOption{
  69. opencost.AccumulateOptionHour,
  70. opencost.AccumulateOptionDay,
  71. }
  72. func hasHourly(opts []opencost.AccumulateOption) bool {
  73. for _, opt := range opts {
  74. if opt == opencost.AccumulateOptionHour {
  75. return true
  76. }
  77. }
  78. return false
  79. }
  80. func hasDaily(opts []opencost.AccumulateOption) bool {
  81. for _, opt := range opts {
  82. if opt == opencost.AccumulateOptionDay {
  83. return true
  84. }
  85. }
  86. return false
  87. }
  88. // GetCustomCostAccumulateOption determines defaults in a way that matches options presented in the UI
  89. func getCustomCostAccumulateOption(window opencost.Window, from []opencost.AccumulateOption) (opencost.AccumulateOption, error) {
  90. if window.IsOpen() || window.IsNegative() {
  91. return opencost.AccumulateOptionNone, fmt.Errorf("invalid window '%s'", window.String())
  92. }
  93. if len(from) == 0 {
  94. from = allSteppedAccumulateOptions
  95. }
  96. hourlyStoreHours := env.GetDataRetentionHourlyResolutionHours()
  97. hourlySteps := time.Duration(hourlyStoreHours) * time.Hour
  98. oldestHourly := time.Now().Add(-1 * hourlySteps)
  99. // Use hourly if...
  100. // (1) hourly is an option;
  101. // (2) we have hourly store coverage; and
  102. // (3) the window duration is less than the hourly break point.
  103. if hasHourly(from) && oldestHourly.Before(*window.Start()) && window.Duration() <= hourlySteps {
  104. return opencost.AccumulateOptionHour, nil
  105. }
  106. dailyStoreDays := env.GetDataRetentionDailyResolutionDays()
  107. dailySteps := time.Duration(dailyStoreDays) * timeutil.Day
  108. oldestDaily := time.Now().Add(-1 * dailySteps)
  109. // Use daily if...
  110. // (1) daily is an option; and
  111. // (2) we have daily store coverage
  112. if hasDaily(from) && oldestDaily.Before(*window.Start()) {
  113. return opencost.AccumulateOptionDay, nil
  114. }
  115. if oldestDaily.After(*window.Start()) {
  116. return opencost.AccumulateOptionNone, fmt.Errorf("data store does not have coverage for %v", window)
  117. }
  118. return opencost.AccumulateOptionNone, fmt.Errorf("no valid accumulate option in %v for %s", from, window)
  119. }
  120. func (rq *RepositoryQuerier) QueryTimeseries(ctx context.Context, request CostTimeseriesRequest) (*CostTimeseriesResponse, error) {
  121. window, _ := opencost.NewClosedWindow(request.Start, request.End).GetAccumulateWindow(request.Accumulate)
  122. var err error
  123. if request.Accumulate == opencost.AccumulateOptionNone {
  124. request.Accumulate, err = getCustomCostAccumulateOption(window, nil)
  125. if err != nil {
  126. return nil, fmt.Errorf("error determining accumulation option: %v", err)
  127. }
  128. }
  129. windows, err := window.GetAccumulateWindows(request.Accumulate)
  130. if err != nil {
  131. return nil, fmt.Errorf("error getting timeseries windows: %w", err)
  132. }
  133. totals := make([]*CostResponse, len(windows))
  134. errors := make([]error, len(windows))
  135. // Query concurrently for each result, error
  136. var wg sync.WaitGroup
  137. wg.Add(len(windows))
  138. for i, w := range windows {
  139. go func(i int, window opencost.Window, res []*CostResponse) {
  140. defer wg.Done()
  141. totals[i], errors[i] = rq.QueryTotal(ctx, CostTotalRequest{
  142. Start: *window.Start(),
  143. End: *window.End(),
  144. AggregateBy: request.AggregateBy,
  145. Filter: request.Filter,
  146. Accumulate: request.Accumulate,
  147. })
  148. }(i, w, totals)
  149. }
  150. wg.Wait()
  151. // Return an error if any errors occurred
  152. for i, err := range errors {
  153. if err != nil {
  154. return nil, fmt.Errorf("one of %d errors: error querying costs for %s: %w", numErrors(errors), windows[i], err)
  155. }
  156. }
  157. result := &CostTimeseriesResponse{
  158. Window: window,
  159. Timeseries: totals,
  160. }
  161. return result, nil
  162. }
  163. func numErrors(errors []error) int {
  164. numErrs := 0
  165. for i := range errors {
  166. if errors[i] != nil {
  167. numErrs++
  168. }
  169. }
  170. return numErrs
  171. }