| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205 |
- package customcost
- import (
- "context"
- "fmt"
- "sync"
- "time"
- "github.com/opencost/opencost/core/pkg/opencost"
- "github.com/opencost/opencost/core/pkg/util/timeutil"
- "github.com/opencost/opencost/pkg/env"
- )
- type RepositoryQuerier struct {
- hourlyRepo Repository
- dailyRepo Repository
- hourlyDuration time.Duration
- dailyDuration time.Duration
- }
- func NewRepositoryQuerier(hourlyRepo, dailyRepo Repository, hourlyDuration, dailyDuration time.Duration) *RepositoryQuerier {
- return &RepositoryQuerier{
- hourlyRepo: hourlyRepo,
- dailyRepo: dailyRepo,
- hourlyDuration: hourlyDuration,
- dailyDuration: dailyDuration,
- }
- }
- func (rq *RepositoryQuerier) QueryTotal(ctx context.Context, request CostTotalRequest) (*CostResponse, error) {
- repo := rq.dailyRepo
- step := timeutil.Day
- if request.Accumulate == opencost.AccumulateOptionHour {
- repo = rq.hourlyRepo
- step = time.Hour
- }
- domains, err := repo.Keys()
- if err != nil {
- return nil, fmt.Errorf("QueryTotal: %w", err)
- }
- compiler := NewCustomCostMatchCompiler()
- matcher, err := compiler.Compile(request.Filter)
- if err != nil {
- return nil, fmt.Errorf("RepositoryQuerier: Query: failed to compile filters: %w", err)
- }
- requestWindow := opencost.NewClosedWindow(request.Start, request.End)
- ccs := NewCustomCostSet(requestWindow)
- queryStart := request.Start
- for queryStart.Before(request.End) {
- queryEnd := queryStart.Add(step)
- for _, domain := range domains {
- ccResponse, err := repo.Get(queryStart, domain)
- if err != nil {
- return nil, fmt.Errorf("QueryTotal: %w", err)
- } else if ccResponse == nil || ccResponse.Start == nil || ccResponse.End == nil {
- continue
- }
- customCosts := ParseCustomCostResponse(ccResponse)
- for _, customCost := range customCosts {
- if matcher.Matches(customCost) {
- ccs.Add(customCost)
- }
- }
- }
- queryStart = queryEnd
- }
- err = ccs.Aggregate(request.AggregateBy)
- if err != nil {
- return nil, err
- }
- return NewCostResponse(ccs), nil
- }
- var allSteppedAccumulateOptions = []opencost.AccumulateOption{
- opencost.AccumulateOptionHour,
- opencost.AccumulateOptionDay,
- }
- func hasHourly(opts []opencost.AccumulateOption) bool {
- for _, opt := range opts {
- if opt == opencost.AccumulateOptionHour {
- return true
- }
- }
- return false
- }
- func hasDaily(opts []opencost.AccumulateOption) bool {
- for _, opt := range opts {
- if opt == opencost.AccumulateOptionDay {
- return true
- }
- }
- return false
- }
- // GetCustomCostAccumulateOption determines defaults in a way that matches options presented in the UI
- func getCustomCostAccumulateOption(window opencost.Window, from []opencost.AccumulateOption) (opencost.AccumulateOption, error) {
- if window.IsOpen() || window.IsNegative() {
- return opencost.AccumulateOptionNone, fmt.Errorf("invalid window '%s'", window.String())
- }
- if len(from) == 0 {
- from = allSteppedAccumulateOptions
- }
- hourlyStoreHours := env.GetDataRetentionHourlyResolutionHours()
- hourlySteps := time.Duration(hourlyStoreHours) * time.Hour
- oldestHourly := time.Now().Add(-1 * hourlySteps)
- // Use hourly if...
- // (1) hourly is an option;
- // (2) we have hourly store coverage; and
- // (3) the window duration is less than the hourly break point.
- if hasHourly(from) && oldestHourly.Before(*window.Start()) && window.Duration() <= hourlySteps {
- return opencost.AccumulateOptionHour, nil
- }
- dailyStoreDays := env.GetDataRetentionDailyResolutionDays()
- dailySteps := time.Duration(dailyStoreDays) * timeutil.Day
- oldestDaily := time.Now().Add(-1 * dailySteps)
- // Use daily if...
- // (1) daily is an option; and
- // (2) we have daily store coverage
- if hasDaily(from) && oldestDaily.Before(*window.Start()) {
- return opencost.AccumulateOptionDay, nil
- }
- if oldestDaily.After(*window.Start()) {
- return opencost.AccumulateOptionNone, fmt.Errorf("data store does not have coverage for %v", window)
- }
- return opencost.AccumulateOptionNone, fmt.Errorf("no valid accumulate option in %v for %s", from, window)
- }
- func (rq *RepositoryQuerier) QueryTimeseries(ctx context.Context, request CostTimeseriesRequest) (*CostTimeseriesResponse, error) {
- window, _ := opencost.NewClosedWindow(request.Start, request.End).GetAccumulateWindow(request.Accumulate)
- var err error
- if request.Accumulate == opencost.AccumulateOptionNone {
- request.Accumulate, err = getCustomCostAccumulateOption(window, nil)
- if err != nil {
- return nil, fmt.Errorf("error determining accumulation option: %v", err)
- }
- }
- windows, err := window.GetAccumulateWindows(request.Accumulate)
- if err != nil {
- return nil, fmt.Errorf("error getting timeseries windows: %w", err)
- }
- totals := make([]*CostResponse, len(windows))
- errors := make([]error, len(windows))
- // Query concurrently for each result, error
- var wg sync.WaitGroup
- wg.Add(len(windows))
- for i, w := range windows {
- go func(i int, window opencost.Window, res []*CostResponse) {
- defer wg.Done()
- totals[i], errors[i] = rq.QueryTotal(ctx, CostTotalRequest{
- Start: *window.Start(),
- End: *window.End(),
- AggregateBy: request.AggregateBy,
- Filter: request.Filter,
- Accumulate: request.Accumulate,
- })
- }(i, w, totals)
- }
- wg.Wait()
- // Return an error if any errors occurred
- for i, err := range errors {
- if err != nil {
- return nil, fmt.Errorf("one of %d errors: error querying costs for %s: %w", numErrors(errors), windows[i], err)
- }
- }
- result := &CostTimeseriesResponse{
- Window: window,
- Timeseries: totals,
- }
- return result, nil
- }
- func numErrors(errors []error) int {
- numErrs := 0
- for i := range errors {
- if errors[i] != nil {
- numErrs++
- }
- }
- return numErrs
- }
|