repositoryquerier.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. package customcost
  2. import (
  3. "context"
  4. "fmt"
  5. "sync"
  6. "time"
  7. "github.com/opencost/opencost/core/pkg/opencost"
  8. "github.com/opencost/opencost/core/pkg/util/timeutil"
  9. )
  10. type RepositoryQuerier struct {
  11. hourlyRepo Repository
  12. dailyRepo Repository
  13. hourlyDuration time.Duration
  14. dailyDuration time.Duration
  15. }
  16. func NewRepositoryQuerier(hourlyRepo, dailyRepo Repository, hourlyDuration, dailyDuration time.Duration) *RepositoryQuerier {
  17. return &RepositoryQuerier{
  18. hourlyRepo: hourlyRepo,
  19. dailyRepo: dailyRepo,
  20. hourlyDuration: hourlyDuration,
  21. dailyDuration: dailyDuration,
  22. }
  23. }
  24. func (rq *RepositoryQuerier) QueryTotal(ctx context.Context, request CostTotalRequest) (*CostResponse, error) {
  25. repo := rq.dailyRepo
  26. step := timeutil.Day
  27. if request.Accumulate == opencost.AccumulateOptionHour {
  28. repo = rq.hourlyRepo
  29. step = time.Hour
  30. }
  31. domains, err := repo.Keys()
  32. if err != nil {
  33. return nil, fmt.Errorf("QueryTotal: %w", err)
  34. }
  35. requestWindow := opencost.NewClosedWindow(request.Start, request.End)
  36. ccs := NewCustomCostSet(requestWindow)
  37. queryStart := request.Start
  38. for queryStart.Before(request.End) {
  39. queryEnd := queryStart.Add(step)
  40. for _, domain := range domains {
  41. ccResponse, err := repo.Get(queryStart, domain)
  42. if err != nil {
  43. return nil, fmt.Errorf("QueryTotal: %w", err)
  44. } else if ccResponse == nil || ccResponse.Start == nil || ccResponse.End == nil {
  45. continue
  46. }
  47. customCosts := ParseCustomCostResponse(ccResponse)
  48. ccs.Add(customCosts)
  49. }
  50. queryStart = queryEnd
  51. }
  52. err = ccs.Aggregate(request.AggregateBy)
  53. if err != nil {
  54. return nil, err
  55. }
  56. return NewCostResponse(ccs), nil
  57. }
  58. func (rq *RepositoryQuerier) QueryTimeseries(ctx context.Context, request CostTimeseriesRequest) (*CostTimeseriesResponse, error) {
  59. window, _ := opencost.NewClosedWindow(request.Start, request.End).GetAccumulateWindow(request.Accumulate)
  60. windows, err := window.GetAccumulateWindows(request.Accumulate)
  61. if err != nil {
  62. return nil, fmt.Errorf("error getting timeseries windows: %w", err)
  63. }
  64. totals := make([]*CostResponse, len(windows))
  65. errors := make([]error, len(windows))
  66. // Query concurrently for each result, error
  67. var wg sync.WaitGroup
  68. wg.Add(len(windows))
  69. for i, w := range windows {
  70. go func(i int, window opencost.Window, res []*CostResponse) {
  71. defer wg.Done()
  72. totals[i], errors[i] = rq.QueryTotal(ctx, CostTotalRequest{
  73. Start: *window.Start(),
  74. End: *window.End(),
  75. AggregateBy: request.AggregateBy,
  76. Filter: request.Filter,
  77. Accumulate: request.Accumulate,
  78. })
  79. }(i, w, totals)
  80. }
  81. wg.Wait()
  82. // Return an error if any errors occurred
  83. for i, err := range errors {
  84. if err != nil {
  85. return nil, fmt.Errorf("one of %d errors: error querying costs for %s: %w", numErrors(errors), windows[i], err)
  86. }
  87. }
  88. result := &CostTimeseriesResponse{
  89. Window: window,
  90. Timeseries: totals,
  91. }
  92. return result, nil
  93. }
  94. func numErrors(errors []error) int {
  95. numErrs := 0
  96. for i := range errors {
  97. if errors[i] != nil {
  98. numErrs++
  99. }
  100. }
  101. return numErrs
  102. }