Ver código fonte

sth/querier-tweaks merge

Signed-off-by: Alex Meijer <ameijer@kubecost.com>
Alex Meijer 2 anos atrás
pai
commit
c682ac084b

+ 1 - 1
pkg/costmodel/router.go

@@ -1784,7 +1784,7 @@ func Initialize(additionalConfigWatchers ...*watcher.ConfigMapWatcher) *Accesses
 			return nil
 		}
 
-		customCostQuerier := customcost.NewQuerier(hourlyRepo, dailyRepo, ingConfig.HourlyDuration, ingConfig.DailyDuration)
+		customCostQuerier := customcost.NewRepositoryQuerier(hourlyRepo, dailyRepo, ingConfig.HourlyDuration, ingConfig.DailyDuration)
 		a.CustomCostQueryService = customcost.NewQueryService(customCostQuerier)
 	}
 

+ 3 - 136
pkg/customcost/querier.go

@@ -2,142 +2,9 @@ package customcost
 
 import (
 	"context"
-	"fmt"
-	"sync"
-	"time"
-
-	"github.com/opencost/opencost/core/pkg/opencost"
-	"github.com/opencost/opencost/core/pkg/util/timeutil"
 )
 
-type Querier struct {
-	hourlyRepo     Repository
-	dailyRepo      Repository
-	hourlyDuration time.Duration
-	dailyDuration  time.Duration
-}
-
-func NewQuerier(hourlyRepo, dailyRepo Repository, hourlyDuration, dailyDuration time.Duration) *Querier {
-	return &Querier{
-		hourlyRepo:     hourlyRepo,
-		dailyRepo:      dailyRepo,
-		hourlyDuration: hourlyDuration,
-		dailyDuration:  dailyDuration,
-	}
-}
-
-func (q *Querier) QueryTotal(request CostTotalRequest, ctx context.Context) (*CostResponse, error) {
-	repo, start, end, step := q.parseRequest(request.Start, request.End, request.Step)
-
-	domains, err := repo.Keys()
-	if err != nil {
-		return nil, fmt.Errorf("QueryTotal: %w", err)
-	}
-
-	requestWindow := opencost.NewClosedWindow(request.Start, request.End)
-	ccs := NewCustomCostSet(requestWindow)
-
-	queryStart := start
-	for queryStart.Before(end) {
-		queryEnd := queryStart.Add(step)
-
-		for _, domain := range domains {
-			ccResponse, err := repo.Get(queryStart, domain)
-			if err != nil {
-				return nil, fmt.Errorf("QueryTotal: %w", err)
-			} else if ccResponse == nil {
-				continue
-			}
-
-			customCosts := ParseCustomCostResponse(ccResponse)
-			ccs.Add(customCosts)
-		}
-
-		queryStart = queryEnd
-	}
-
-	err = ccs.Aggregate(request.AggregateBy)
-	if err != nil {
-		return nil, err
-	}
-
-	return NewCostResponse(ccs), nil
-}
-
-func (q *Querier) QueryTimeseries(request CostTimeseriesRequest, ctx context.Context) (*CostTimeseriesResponse, error) {
-	_, start, end, step := q.parseRequest(request.Start, request.End, request.Step)
-
-	windows, err := opencost.GetWindows(start, end, step)
-	if err != nil {
-		return nil, fmt.Errorf("error getting timeseries windows: %w", err)
-	}
-
-	totals := make([]*CostResponse, len(windows))
-	errors := make([]error, len(windows))
-
-	// Query concurrently for each result, error
-	var wg sync.WaitGroup
-	wg.Add(len(windows))
-
-	for i, w := range windows {
-		go func(i int, window opencost.Window, res []*CostResponse) {
-			defer wg.Done()
-			totals[i], errors[i] = q.QueryTotal(CostTotalRequest{
-				Start:       *window.Start(),
-				End:         *window.End(),
-				AggregateBy: request.AggregateBy,
-				Filter:      request.Filter,
-				Step:        step,
-			}, ctx)
-		}(i, w, totals)
-	}
-
-	wg.Wait()
-
-	// Return an error if any errors occurred
-	for i, err := range errors {
-		if err != nil {
-			return nil, fmt.Errorf("one of %d errors: error querying costs for %s: %w", numErrors(errors), windows[i], err)
-		}
-	}
-
-	result := &CostTimeseriesResponse{
-		Window:     opencost.NewClosedWindow(start, end),
-		Timeseries: totals,
-	}
-
-	return result, nil
-}
-
-func (q *Querier) parseRequest(requestStart, requestEnd time.Time, requestStep time.Duration) (Repository, time.Time, time.Time, time.Duration) {
-	oldestHourlyData := time.Now().UTC().Add(-q.hourlyDuration)
-
-	var step time.Duration
-	var repo Repository
-	if (requestStart.After(oldestHourlyData) || (requestStep == time.Hour)) &&
-		(requestStep != timeutil.Day) {
-		step = time.Hour
-		repo = q.hourlyRepo
-	} else {
-		step = timeutil.Day
-		repo = q.dailyRepo
-	}
-	start := opencost.RoundBack(requestStart, step)
-	end := opencost.RoundBack(requestEnd, step)
-
-	if requestStep != 0 {
-		step = requestStep
-	}
-
-	return repo, start, end, step
-}
-
-func numErrors(errors []error) int {
-	numErrs := 0
-	for i := range errors {
-		if errors[i] != nil {
-			numErrs++
-		}
-	}
-	return numErrs
+type Querier interface {
+	QueryTotal(ctx context.Context, request CostTotalRequest) (*CostResponse, error)
+	QueryTimeseries(ctx context.Context, request CostTimeseriesRequest) (*CostTimeseriesResponse, error)
 }

+ 4 - 4
pkg/customcost/queryservice.go

@@ -12,10 +12,10 @@ import (
 const tracerName = "github.com/opencost/opencost/pkg/customcost"
 
 type QueryService struct {
-	Querier *Querier
+	Querier Querier
 }
 
-func NewQueryService(querier *Querier) *QueryService {
+func NewQueryService(querier Querier) *QueryService {
 	return &QueryService{
 		Querier: querier,
 	}
@@ -45,7 +45,7 @@ func (qs *QueryService) GetCustomCostTotalHandler() func(w http.ResponseWriter,
 			return
 		}
 
-		resp, err := qs.Querier.QueryTotal(*request, ctx)
+		resp, err := qs.Querier.QueryTotal(ctx, *request)
 		if err != nil {
 			http.Error(w, fmt.Sprintf("Internal server error: %s", err), http.StatusInternalServerError)
 			return
@@ -82,7 +82,7 @@ func (qs *QueryService) GetCustomCostTimeseriesHandler() func(w http.ResponseWri
 			return
 		}
 
-		resp, err := qs.Querier.QueryTimeseries(*request, ctx)
+		resp, err := qs.Querier.QueryTimeseries(ctx, *request)
 		if err != nil {
 			http.Error(w, fmt.Sprintf("Internal server error: %s", err), http.StatusInternalServerError)
 			return

+ 2 - 2
pkg/customcost/queryservice_helper.go

@@ -68,7 +68,7 @@ func ParseCustomCostTimeseriesRequest(qp httputil.QueryParams) (*CostTimeseriesR
 		return nil, err
 	}
 
-	step := qp.GetDuration("step", 0)
+	accumulate := opencost.ParseAccumulate(qp.Get("accumulate", ""))
 
 	var filter filter.Filter
 	//filterString := qp.Get("filter", "")
@@ -84,7 +84,7 @@ func ParseCustomCostTimeseriesRequest(qp httputil.QueryParams) (*CostTimeseriesR
 		Start:       *window.Start(),
 		End:         *window.End(),
 		AggregateBy: aggregateBy,
-		Step:        step,
+		Accumulate:  accumulate,
 		Filter:      filter,
 	}
 

+ 123 - 0
pkg/customcost/repositoryquerier.go

@@ -0,0 +1,123 @@
+package customcost
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/opencost/opencost/core/pkg/opencost"
+	"github.com/opencost/opencost/core/pkg/util/timeutil"
+)
+
+type RepositoryQuerier struct {
+	hourlyRepo     Repository
+	dailyRepo      Repository
+	hourlyDuration time.Duration
+	dailyDuration  time.Duration
+}
+
+func NewRepositoryQuerier(hourlyRepo, dailyRepo Repository, hourlyDuration, dailyDuration time.Duration) *RepositoryQuerier {
+	return &RepositoryQuerier{
+		hourlyRepo:     hourlyRepo,
+		dailyRepo:      dailyRepo,
+		hourlyDuration: hourlyDuration,
+		dailyDuration:  dailyDuration,
+	}
+}
+
+func (rq *RepositoryQuerier) QueryTotal(ctx context.Context, request CostTotalRequest) (*CostResponse, error) {
+	repo := rq.dailyRepo
+	step := timeutil.Day
+	if request.Accumulate == opencost.AccumulateOptionHour {
+		repo = rq.hourlyRepo
+		step = time.Hour
+	}
+	domains, err := repo.Keys()
+	if err != nil {
+		return nil, fmt.Errorf("QueryTotal: %w", err)
+	}
+
+	requestWindow := opencost.NewClosedWindow(request.Start, request.End)
+	ccs := NewCustomCostSet(requestWindow)
+	queryStart := request.Start
+	for queryStart.Before(request.End) {
+		queryEnd := queryStart.Add(step)
+
+		for _, domain := range domains {
+			ccResponse, err := repo.Get(queryStart, domain)
+			if err != nil {
+				return nil, fmt.Errorf("QueryTotal: %w", err)
+			} else if ccResponse == nil {
+				continue
+			}
+
+			customCosts := ParseCustomCostResponse(ccResponse)
+			ccs.Add(customCosts)
+		}
+
+		queryStart = queryEnd
+	}
+
+	err = ccs.Aggregate(request.AggregateBy)
+	if err != nil {
+		return nil, err
+	}
+
+	return NewCostResponse(ccs), nil
+}
+
+func (rq *RepositoryQuerier) QueryTimeseries(ctx context.Context, request CostTimeseriesRequest) (*CostTimeseriesResponse, error) {
+	window, _ := opencost.NewClosedWindow(request.Start, request.End).GetAccumulateWindow(request.Accumulate)
+
+	windows, err := window.GetAccumulateWindows(request.Accumulate)
+	if err != nil {
+		return nil, fmt.Errorf("error getting timeseries windows: %w", err)
+	}
+
+	totals := make([]*CostResponse, len(windows))
+	errors := make([]error, len(windows))
+
+	// Query concurrently for each result, error
+	var wg sync.WaitGroup
+	wg.Add(len(windows))
+
+	for i, w := range windows {
+		go func(i int, window opencost.Window, res []*CostResponse) {
+			defer wg.Done()
+			totals[i], errors[i] = rq.QueryTotal(ctx, CostTotalRequest{
+				Start:       *window.Start(),
+				End:         *window.End(),
+				AggregateBy: request.AggregateBy,
+				Filter:      request.Filter,
+				Accumulate:  request.Accumulate,
+			})
+		}(i, w, totals)
+	}
+
+	wg.Wait()
+
+	// Return an error if any errors occurred
+	for i, err := range errors {
+		if err != nil {
+			return nil, fmt.Errorf("one of %d errors: error querying costs for %s: %w", numErrors(errors), windows[i], err)
+		}
+	}
+
+	result := &CostTimeseriesResponse{
+		Window:     window,
+		Timeseries: totals,
+	}
+
+	return result, nil
+}
+
+func numErrors(errors []error) int {
+	numErrs := 0
+	for i := range errors {
+		if errors[i] != nil {
+			numErrs++
+		}
+	}
+	return numErrs
+}

+ 2 - 2
pkg/customcost/types.go

@@ -14,7 +14,7 @@ type CostTotalRequest struct {
 	Start       time.Time
 	End         time.Time
 	AggregateBy []string
-	Step        time.Duration
+	Accumulate  opencost.AccumulateOption
 	Filter      filter.Filter
 }
 
@@ -22,7 +22,7 @@ type CostTimeseriesRequest struct {
 	Start       time.Time
 	End         time.Time
 	AggregateBy []string
-	Step        time.Duration
+	Accumulate  opencost.AccumulateOption
 	Filter      filter.Filter
 }