repositoryquerier.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. package customcost
  2. import (
  3. "context"
  4. "fmt"
  5. "sync"
  6. "time"
  7. "github.com/opencost/opencost/core/pkg/opencost"
  8. "github.com/opencost/opencost/core/pkg/util/timeutil"
  9. )
  10. type RepositoryQuerier struct {
  11. hourlyRepo Repository
  12. dailyRepo Repository
  13. hourlyDuration time.Duration
  14. dailyDuration time.Duration
  15. }
  16. func NewRepositoryQuerier(hourlyRepo, dailyRepo Repository, hourlyDuration, dailyDuration time.Duration) *RepositoryQuerier {
  17. return &RepositoryQuerier{
  18. hourlyRepo: hourlyRepo,
  19. dailyRepo: dailyRepo,
  20. hourlyDuration: hourlyDuration,
  21. dailyDuration: dailyDuration,
  22. }
  23. }
  24. func (rq *RepositoryQuerier) QueryTotal(ctx context.Context, request CostTotalRequest) (*CostResponse, error) {
  25. window := opencost.NewClosedWindow(request.Start, request.End)
  26. window, accumulate, err := GetCustomCostWindowAccumulation(window, request.Accumulate)
  27. if err != nil {
  28. return nil, fmt.Errorf("error getting custom cost total window accumulation: %w", err)
  29. }
  30. repo := rq.dailyRepo
  31. step := timeutil.Day
  32. if accumulate == opencost.AccumulateOptionHour {
  33. repo = rq.hourlyRepo
  34. step = time.Hour
  35. }
  36. domains, err := repo.Keys()
  37. if err != nil {
  38. return nil, fmt.Errorf("QueryTotal: %w", err)
  39. }
  40. compiler := NewCustomCostMatchCompiler()
  41. matcher, err := compiler.Compile(request.Filter)
  42. if err != nil {
  43. return nil, fmt.Errorf("RepositoryQuerier: Query: failed to compile filters: %w", err)
  44. }
  45. ccs := NewCustomCostSet(window)
  46. queryStart := *window.Start()
  47. for queryStart.Before(*window.End()) {
  48. queryEnd := queryStart.Add(step)
  49. for _, domain := range domains {
  50. ccResponse, err := repo.Get(queryStart, domain)
  51. if err != nil {
  52. return nil, fmt.Errorf("QueryTotal: %w", err)
  53. } else if ccResponse == nil || ccResponse.Start == nil || ccResponse.End == nil {
  54. continue
  55. }
  56. customCosts := ParseCustomCostResponse(ccResponse, request.CostType)
  57. for _, customCost := range customCosts {
  58. if matcher.Matches(customCost) {
  59. ccs.Add(customCost)
  60. }
  61. }
  62. }
  63. queryStart = queryEnd
  64. }
  65. err = ccs.Aggregate(request.AggregateBy)
  66. if err != nil {
  67. return nil, err
  68. }
  69. ccs.Sort(request.SortBy, request.SortDirection)
  70. return NewCostResponse(ccs, request.CostType), nil
  71. }
  72. func (rq *RepositoryQuerier) QueryTimeseries(ctx context.Context, request CostTimeseriesRequest) (*CostTimeseriesResponse, error) {
  73. window := opencost.NewClosedWindow(request.Start, request.End)
  74. window, accumulate, err := GetCustomCostWindowAccumulation(window, request.Accumulate)
  75. if err != nil {
  76. return nil, fmt.Errorf("error getting custom cost timeseries window accumulation: %w", err)
  77. }
  78. windows, err := window.GetAccumulateWindows(accumulate)
  79. if err != nil {
  80. return nil, fmt.Errorf("error getting timeseries windows: %w", err)
  81. }
  82. totals := make([]*CostResponse, len(windows))
  83. errors := make([]error, len(windows))
  84. // Query concurrently for each result, error
  85. var wg sync.WaitGroup
  86. wg.Add(len(windows))
  87. for i, w := range windows {
  88. go func(i int, window opencost.Window, res []*CostResponse) {
  89. defer wg.Done()
  90. totals[i], errors[i] = rq.QueryTotal(ctx, CostTotalRequest{
  91. Start: *window.Start(),
  92. End: *window.End(),
  93. AggregateBy: request.AggregateBy,
  94. Filter: request.Filter,
  95. Accumulate: accumulate,
  96. CostType: request.CostType,
  97. SortBy: request.SortBy,
  98. SortDirection: request.SortDirection,
  99. })
  100. }(i, w, totals)
  101. }
  102. wg.Wait()
  103. // Return an error if any errors occurred
  104. for i, err := range errors {
  105. if err != nil {
  106. return nil, fmt.Errorf("one of %d errors: error querying costs for %s: %w", numErrors(errors), windows[i], err)
  107. }
  108. }
  109. result := &CostTimeseriesResponse{
  110. Window: window,
  111. Timeseries: totals,
  112. }
  113. return result, nil
  114. }
  // numErrors reports how many entries of errors are non-nil. A nil or empty
  // slice yields 0.
  func numErrors(errors []error) int {
  	count := 0
  	for _, e := range errors {
  		if e == nil {
  			continue
  		}
  		count++
  	}
  	return count
  }
  123. }