Răsfoiți Sursa

Merge pull request #2619 from nik-kc/nik/plugins

Cliff Colvin 2 ani în urmă
părinte
comite
862911f0d4

+ 2 - 3
core/pkg/version/version.go

@@ -3,9 +3,8 @@ package version
 import "fmt"
 
 var (
-	Version      = "dev"
-	GitCommit    = "HEAD"
-	Architecture = "amd64"
+	Version   = "dev"
+	GitCommit = "HEAD"
 )
 
 func FriendlyVersion() string {

+ 6 - 2
pkg/cmd/costmodel/costmodel.go

@@ -68,8 +68,9 @@ func Execute(opts *CostModelOpts) error {
 		if err != nil {
 			return fmt.Errorf("error instantiating custom cost pipeline service: %v", err)
 		}
-		//repoQuerier := cloudcost.NewRepositoryQuerier(repo)
-		//a.CloudCostQueryService = cloudcost.NewQueryService(repoQuerier, repoQuerier)
+
+		customCostQuerier := customcost.NewQuerier(hourlyRepo, dailyRepo, ingConfig.HourlyDuration, ingConfig.DailyDuration)
+		a.CustomCostQueryService = customcost.NewQueryService(customCostQuerier)
 	}
 
 	rootMux := http.NewServeMux()
@@ -90,6 +91,9 @@ func Execute(opts *CostModelOpts) error {
 	a.Router.GET("/cloudCost/rebuild", a.CloudCostPipelineService.GetCloudCostRebuildHandler())
 	a.Router.GET("/cloudCost/repair", a.CloudCostPipelineService.GetCloudCostRepairHandler())
 
+	a.Router.GET("/customCost/total", a.CustomCostQueryService.GetCustomCostTotalHandler())
+	a.Router.GET("/customCost/timeseries", a.CustomCostQueryService.GetCustomCostTimeseriesHandler())
+
 	if env.IsPProfEnabled() {
 		a.Router.HandlerFunc(http.MethodGet, "/debug/pprof/", pprof.Index)
 		a.Router.HandlerFunc(http.MethodGet, "/debug/pprof/cmdline", pprof.Cmdline)

+ 1 - 0
pkg/costmodel/router.go

@@ -97,6 +97,7 @@ type Accesses struct {
 	CloudConfigController     *cloudconfig.Controller
 	CloudCostPipelineService  *cloudcost.PipelineService
 	CloudCostQueryService     *cloudcost.QueryService
+	CustomCostQueryService    *customcost.QueryService
 	CustomCostPipelineService *customcost.PipelineService
 	ClusterInfoProvider       clusters.ClusterInfoProvider
 	Model                     *CostModel

+ 2 - 2
pkg/customcost/ingestor.go

@@ -159,7 +159,7 @@ func (ing *CustomCostIngestor) BuildWindow(start, end time.Time) {
 }
 
 func (ing *CustomCostIngestor) buildSingleDomain(start, end time.Time, domain string) {
-	req := pb.CustomCostRequest{
+	req := &pb.CustomCostRequest{
 		Start:      timestamppb.New(start),
 		End:        timestamppb.New(end),
 		Resolution: durationpb.New(ing.resolution),
@@ -203,7 +203,7 @@ func (ing *CustomCostIngestor) buildSingleDomain(start, end time.Time, domain st
 		}
 		log.Debugf("BuildWindow[%s]: GetCustomCost: writing custom costs for window %v-%v: %d", domain, ccr.Start, ccr.End, len(ccr.Costs))
 
-		err2 := ing.repo.Put(&ccr)
+		err2 := ing.repo.Put(ccr)
 		if err2 != nil {
 			log.Errorf("CustomCost[%s]: ingestor: failed to save Custom Cost Set with window %v-%v: %s", domain, ccr.Start, ccr.End, err2.Error())
 		}

+ 1 - 1
pkg/customcost/memoryrepository.go

@@ -50,7 +50,7 @@ func (m *MemoryRepository) Get(startTime time.Time, domain string) (*pb.CustomCo
 		return nil, nil
 	}
 
-	var ccr *pb.CustomCostResponse
+	ccr := &pb.CustomCostResponse{}
 	err := proto.Unmarshal(b, ccr)
 	if err != nil {
 		return nil, fmt.Errorf("error unmarshalling data: %w")

+ 3 - 4
pkg/customcost/pipelineservice.go

@@ -16,7 +16,6 @@ import (
 	ocplugin "github.com/opencost/opencost/core/pkg/plugin"
 	proto "github.com/opencost/opencost/core/pkg/protocol"
 	"github.com/opencost/opencost/core/pkg/util/timeutil"
-	"github.com/opencost/opencost/core/pkg/version"
 )
 
 var protocol = proto.HTTP()
@@ -55,11 +54,11 @@ func getRegisteredPlugins(configDir string, execDir string) (map[string]*plugin.
 		return nil, nil
 	}
 
-	log.Infof("requiring plugins matching your architecture: " + version.Architecture)
+	log.Infof("requiring plugins matching your architecture: " + runtime.GOARCH)
 	configs := map[string]*plugin.ClientConfig{}
 	// set up the client config
 	for name, config := range pluginNames {
-		file := fmt.Sprintf(execFmt, execDir, name, runtime.GOOS, version.Architecture)
+		file := fmt.Sprintf(execFmt, execDir, name, runtime.GOOS, runtime.GOARCH)
 		log.Debugf("looking for file: %s", file)
 		if _, err := os.Stat(file); err != nil {
 			msg := fmt.Sprintf("error reading executable for %s plugin. Plugin executables must be in %s and have name format <plugin name>.ocplugin.<os>.<opencost binary archtecture (arm64 or amd64)>", name, execDir)
@@ -86,7 +85,7 @@ func getRegisteredPlugins(configDir string, execDir string) (map[string]*plugin.
 		configs[name] = &plugin.ClientConfig{
 			HandshakeConfig:  handshakeConfig,
 			Plugins:          pluginMap,
-			Cmd:              exec.Command(fmt.Sprintf(execFmt, execDir, name, runtime.GOOS, version.Architecture), config),
+			Cmd:              exec.Command(fmt.Sprintf(execFmt, execDir, name, runtime.GOOS, runtime.GOARCH), config),
 			Logger:           logger,
 			AllowedProtocols: []plugin.Protocol{plugin.ProtocolGRPC},
 		}

+ 2 - 4
pkg/customcost/pipelineservice_test.go

@@ -11,7 +11,6 @@ import (
 
 	"github.com/opencost/opencost/core/pkg/log"
 	"github.com/opencost/opencost/core/pkg/util/timeutil"
-	"github.com/opencost/opencost/core/pkg/version"
 )
 
 func TestPipelineService(t *testing.T) {
@@ -28,7 +27,6 @@ func TestPipelineService(t *testing.T) {
 	if err != nil {
 		t.Fatalf("error creating temp exec dir: %v", err)
 	}
-	version.Architecture = runtime.GOARCH
 	// write DD secrets to config files
 	// write config file to temp dir
 	writeDDConfig(dir+"/config", t)
@@ -134,8 +132,8 @@ func TestPipelineService(t *testing.T) {
 }
 
 func downloadLatestPluginExec(dirName string, t *testing.T) {
-	ddPluginURL := "https://github.com/opencost/opencost-plugins/releases/download/v0.0.3/datadog.ocplugin." + runtime.GOOS + "." + version.Architecture
-	out, err := os.OpenFile(dirName+"/datadog.ocplugin."+runtime.GOOS+"."+version.Architecture, 0755|os.O_CREATE, 0755)
+	ddPluginURL := "https://github.com/opencost/opencost-plugins/releases/download/v0.0.3/datadog.ocplugin." + runtime.GOOS + "." + runtime.GOARCH
+	out, err := os.OpenFile(dirName+"/datadog.ocplugin."+runtime.GOOS+"."+runtime.GOARCH, 0755|os.O_CREATE, 0755)
 	if err != nil {
 		t.Fatalf("error creating executable file: %v", err)
 	}

+ 44 - 0
pkg/customcost/props.go

@@ -0,0 +1,44 @@
+package customcost
+
+import (
+	"fmt"
+	"strings"
+)
+
+type CustomCostProperty string
+
+const (
+	CustomCostDomainProp CustomCostProperty = "domain"
+)
+
+func ParseCustomCostProperties(props []string) ([]string, error) {
+	var properties []string
+	added := make(map[CustomCostProperty]struct{})
+
+	for _, prop := range props {
+		property, err := ParseCustomCostProperty(prop)
+		if err != nil {
+			return nil, fmt.Errorf("Failed to parse property: %w", err)
+		}
+
+		if _, ok := added[property]; !ok {
+			added[property] = struct{}{}
+			properties = append(properties, string(property))
+		}
+	}
+
+	if len(properties) == 0 {
+		properties = []string{string(CustomCostDomainProp)}
+	}
+
+	return properties, nil
+}
+
+func ParseCustomCostProperty(text string) (CustomCostProperty, error) {
+	switch strings.TrimSpace(strings.ToLower(text)) {
+	case strings.TrimSpace(strings.ToLower(string(CustomCostDomainProp))):
+		return CustomCostDomainProp, nil
+	}
+
+	return "", fmt.Errorf("invalid custom cost property: %s", text)
+}

+ 143 - 0
pkg/customcost/querier.go

@@ -0,0 +1,143 @@
+package customcost
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/opencost/opencost/core/pkg/opencost"
+	"github.com/opencost/opencost/core/pkg/util/timeutil"
+)
+
+type Querier struct {
+	hourlyRepo     Repository
+	dailyRepo      Repository
+	hourlyDuration time.Duration
+	dailyDuration  time.Duration
+}
+
+func NewQuerier(hourlyRepo, dailyRepo Repository, hourlyDuration, dailyDuration time.Duration) *Querier {
+	return &Querier{
+		hourlyRepo:     hourlyRepo,
+		dailyRepo:      dailyRepo,
+		hourlyDuration: hourlyDuration,
+		dailyDuration:  dailyDuration,
+	}
+}
+
+func (q *Querier) QueryTotal(request CostTotalRequest, ctx context.Context) (*CostResponse, error) {
+	repo, start, end, step := q.parseRequest(request.Start, request.End, request.Step)
+
+	domains, err := repo.Keys()
+	if err != nil {
+		return nil, fmt.Errorf("QueryTotal: %w", err)
+	}
+
+	requestWindow := opencost.NewClosedWindow(request.Start, request.End)
+	ccs := NewCustomCostSet(requestWindow)
+
+	queryStart := start
+	for queryStart.Before(end) {
+		queryEnd := queryStart.Add(step)
+
+		for _, domain := range domains {
+			ccResponse, err := repo.Get(queryStart, domain)
+			if err != nil {
+				return nil, fmt.Errorf("QueryTotal: %w", err)
+			} else if ccResponse == nil {
+				continue
+			}
+
+			customCosts := ParseCustomCostResponse(ccResponse)
+			ccs.Add(customCosts)
+		}
+
+		queryStart = queryEnd
+	}
+
+	err = ccs.Aggregate(request.AggregateBy)
+	if err != nil {
+		return nil, err
+	}
+
+	return NewCostResponse(ccs), nil
+}
+
+func (q *Querier) QueryTimeseries(request CostTimeseriesRequest, ctx context.Context) (*CostTimeseriesResponse, error) {
+	_, start, end, step := q.parseRequest(request.Start, request.End, request.Step)
+
+	windows, err := opencost.GetWindows(start, end, step)
+	if err != nil {
+		return nil, fmt.Errorf("error getting timeseries windows: %w", err)
+	}
+
+	totals := make([]*CostResponse, len(windows))
+	errors := make([]error, len(windows))
+
+	// Query concurrently for each result, error
+	var wg sync.WaitGroup
+	wg.Add(len(windows))
+
+	for i, w := range windows {
+		go func(i int, window opencost.Window, res []*CostResponse) {
+			defer wg.Done()
+			totals[i], errors[i] = q.QueryTotal(CostTotalRequest{
+				Start:       *window.Start(),
+				End:         *window.End(),
+				AggregateBy: request.AggregateBy,
+				Filter:      request.Filter,
+				Step:        step,
+			}, ctx)
+		}(i, w, totals)
+	}
+
+	wg.Wait()
+
+	// Return an error if any errors occurred
+	for i, err := range errors {
+		if err != nil {
+			return nil, fmt.Errorf("one of %d errors: error querying costs for %s: %w", numErrors(errors), windows[i], err)
+		}
+	}
+
+	result := &CostTimeseriesResponse{
+		Window:     opencost.NewClosedWindow(start, end),
+		Timeseries: totals,
+	}
+
+	return result, nil
+}
+
+func (q *Querier) parseRequest(requestStart, requestEnd time.Time, requestStep time.Duration) (Repository, time.Time, time.Time, time.Duration) {
+	oldestHourlyData := time.Now().UTC().Add(-q.hourlyDuration)
+
+	var step time.Duration
+	var repo Repository
+	if (requestStart.After(oldestHourlyData) || (requestStep == time.Hour)) &&
+		(requestStep != timeutil.Day) {
+		step = time.Hour
+		repo = q.hourlyRepo
+	} else {
+		step = timeutil.Day
+		repo = q.dailyRepo
+	}
+	start := opencost.RoundBack(requestStart, step)
+	end := opencost.RoundBack(requestEnd, step)
+
+	if requestStep != 0 {
+		step = requestStep
+	}
+
+	return repo, start, end, step
+}
+
+func numErrors(errors []error) int {
+	numErrs := 0
+	for i := range errors {
+		if errors[i] != nil {
+			numErrs++
+		}
+	}
+	return numErrs
+}

+ 96 - 0
pkg/customcost/queryservice.go

@@ -0,0 +1,96 @@
+package customcost
+
+import (
+	"fmt"
+	"net/http"
+
+	"github.com/julienschmidt/httprouter"
+	"github.com/opencost/opencost/core/pkg/util/httputil"
+	"go.opentelemetry.io/otel"
+)
+
+const tracerName = "github.com/opencost/opencost/pkg/customcost"
+
+type QueryService struct {
+	Querier *Querier
+}
+
+func NewQueryService(querier *Querier) *QueryService {
+	return &QueryService{
+		Querier: querier,
+	}
+}
+
+func (qs *QueryService) GetCustomCostTotalHandler() func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
+	return func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
+		tracer := otel.Tracer(tracerName)
+		ctx, span := tracer.Start(r.Context(), "Service.GetCustomCostTotalHandler")
+		defer span.End()
+
+		// If Query Service is nil, always return 501
+		if qs == nil {
+			http.Error(w, "Query Service is nil", http.StatusNotImplemented)
+			return
+		}
+
+		if qs.Querier == nil {
+			http.Error(w, "CustomCost Query Service is nil", http.StatusNotImplemented)
+			return
+		}
+
+		qp := httputil.NewQueryParams(r.URL.Query())
+		request, err := ParseCustomCostTotalRequest(qp)
+		if err != nil {
+			http.Error(w, err.Error(), http.StatusBadRequest)
+			return
+		}
+
+		resp, err := qs.Querier.QueryTotal(*request, ctx)
+		if err != nil {
+			http.Error(w, fmt.Sprintf("Internal server error: %s", err), http.StatusInternalServerError)
+			return
+		}
+
+		_, spanResp := tracer.Start(ctx, "write response")
+		w.Header().Set("Content-Type", "application/json")
+		protocol.WriteData(w, resp)
+		spanResp.End()
+	}
+}
+
+func (qs *QueryService) GetCustomCostTimeseriesHandler() func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
+	return func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
+		tracer := otel.Tracer(tracerName)
+		ctx, span := tracer.Start(r.Context(), "Service.GetCustomCostTimeseriesHandler")
+		defer span.End()
+
+		// If Query Service is nil, always return 501
+		if qs == nil {
+			http.Error(w, "Query Service is nil", http.StatusNotImplemented)
+			return
+		}
+
+		if qs.Querier == nil {
+			http.Error(w, "CustomCost Query Service is nil", http.StatusNotImplemented)
+			return
+		}
+
+		qp := httputil.NewQueryParams(r.URL.Query())
+		request, err := ParseCustomCostTimeseriesRequest(qp)
+		if err != nil {
+			http.Error(w, err.Error(), http.StatusBadRequest)
+			return
+		}
+
+		resp, err := qs.Querier.QueryTimeseries(*request, ctx)
+		if err != nil {
+			http.Error(w, fmt.Sprintf("Internal server error: %s", err), http.StatusInternalServerError)
+			return
+		}
+
+		_, spanResp := tracer.Start(ctx, "write response")
+		w.Header().Set("Content-Type", "application/json")
+		protocol.WriteData(w, resp)
+		spanResp.End()
+	}
+}

+ 92 - 0
pkg/customcost/queryservice_helper.go

@@ -0,0 +1,92 @@
+package customcost
+
+import (
+	"fmt"
+
+	"github.com/opencost/opencost/core/pkg/filter"
+	"github.com/opencost/opencost/core/pkg/opencost"
+	"github.com/opencost/opencost/core/pkg/util/httputil"
+)
+
+func ParseCustomCostTotalRequest(qp httputil.QueryParams) (*CostTotalRequest, error) {
+	windowStr := qp.Get("window", "")
+	if windowStr == "" {
+		return nil, fmt.Errorf("missing require window param")
+	}
+
+	window, err := opencost.ParseWindowUTC(windowStr)
+	if err != nil {
+		return nil, fmt.Errorf("invalid window parameter: %w", err)
+	}
+	if window.IsOpen() {
+		return nil, fmt.Errorf("invalid window parameter: %s", window.String())
+	}
+
+	aggregateByRaw := qp.GetList("aggregate", ",")
+	aggregateBy, err := ParseCustomCostProperties(aggregateByRaw)
+	if err != nil {
+		return nil, err
+	}
+
+	var filter filter.Filter
+	//filterString := qp.Get("filter", "")
+	//if filterString != "" {
+	//	parser := cloudcost.NewCloudCostFilterParser()
+	//	filter, err = parser.Parse(filterString)
+	//	if err != nil {
+	//		return nil, fmt.Errorf("parsing 'filter' parameter: %s", err)
+	//	}
+	//}
+
+	opts := &CostTotalRequest{
+		Start:       *window.Start(),
+		End:         *window.End(),
+		AggregateBy: aggregateBy,
+		Filter:      filter,
+	}
+
+	return opts, nil
+}
+
+func ParseCustomCostTimeseriesRequest(qp httputil.QueryParams) (*CostTimeseriesRequest, error) {
+	windowStr := qp.Get("window", "")
+	if windowStr == "" {
+		return nil, fmt.Errorf("missing require window param")
+	}
+
+	window, err := opencost.ParseWindowUTC(windowStr)
+	if err != nil {
+		return nil, fmt.Errorf("invalid window parameter: %w", err)
+	}
+	if window.IsOpen() {
+		return nil, fmt.Errorf("invalid window parameter: %s", window.String())
+	}
+
+	aggregateByRaw := qp.GetList("aggregate", ",")
+	aggregateBy, err := ParseCustomCostProperties(aggregateByRaw)
+	if err != nil {
+		return nil, err
+	}
+
+	step := qp.GetDuration("step", 0)
+
+	var filter filter.Filter
+	//filterString := qp.Get("filter", "")
+	//if filterString != "" {
+	//	parser := cloudcost.NewCloudCostFilterParser()
+	//	filter, err = parser.Parse(filterString)
+	//	if err != nil {
+	//		return nil, fmt.Errorf("parsing 'filter' parameter: %s", err)
+	//	}
+	//}
+
+	opts := &CostTimeseriesRequest{
+		Start:       *window.Start(),
+		End:         *window.End(),
+		AggregateBy: aggregateBy,
+		Step:        step,
+		Filter:      filter,
+	}
+
+	return opts, nil
+}

+ 165 - 0
pkg/customcost/types.go

@@ -0,0 +1,165 @@
+package customcost
+
+import (
+	"fmt"
+	"strings"
+	"time"
+
+	"github.com/opencost/opencost/core/pkg/filter"
+	"github.com/opencost/opencost/core/pkg/model/pb"
+	"github.com/opencost/opencost/core/pkg/opencost"
+)
+
+type CostTotalRequest struct {
+	Start       time.Time
+	End         time.Time
+	AggregateBy []string
+	Step        time.Duration
+	Filter      filter.Filter
+}
+
+type CostTimeseriesRequest struct {
+	Start       time.Time
+	End         time.Time
+	AggregateBy []string
+	Step        time.Duration
+	Filter      filter.Filter
+}
+
+type CostResponse struct {
+	Window          opencost.Window `json:"window"`
+	TotalBilledCost float32         `json:"totalBilledCost"`
+	TotalListCost   float32         `json:"totalListCost"`
+	CustomCosts     []*CustomCost   `json:"customCosts"`
+}
+
+type CustomCost struct {
+	Id             string  `json:"id"`
+	Zone           string  `json:"zone"`
+	AccountName    string  `json:"account_name"`
+	ChargeCategory string  `json:"charge_category"`
+	Description    string  `json:"description"`
+	ResourceName   string  `json:"resource_name"`
+	ResourceType   string  `json:"resource_type"`
+	ProviderId     string  `json:"provider_id"`
+	BilledCost     float32 `json:"billedCost"`
+	ListCost       float32 `json:"listCost"`
+	ListUnitPrice  float32 `json:"list_unit_price"`
+	UsageQuantity  float32 `json:"usage_quantity"`
+	UsageUnit      string  `json:"usage_unit"`
+	Domain         string  `json:"domain"`
+	Aggregate      string  `json:"aggregate"`
+}
+
+type CostTimeseriesResponse struct {
+	Window     opencost.Window `json:"window"`
+	Timeseries []*CostResponse `json:"timeseries"`
+}
+
+func NewCostResponse(ccs *CustomCostSet) *CostResponse {
+	costResponse := &CostResponse{
+		Window:      ccs.Window,
+		CustomCosts: []*CustomCost{},
+	}
+
+	for _, cc := range ccs.CustomCosts {
+		costResponse.TotalBilledCost += cc.BilledCost
+		costResponse.TotalListCost += cc.ListCost
+		costResponse.CustomCosts = append(costResponse.CustomCosts, cc)
+	}
+
+	return costResponse
+}
+
+func ParseCustomCostResponse(ccResponse *pb.CustomCostResponse) []*CustomCost {
+	costs := ccResponse.GetCosts()
+
+	customCosts := make([]*CustomCost, len(costs))
+	for i, cost := range costs {
+		customCosts[i] = &CustomCost{
+			Id:             cost.GetId(),
+			Zone:           cost.GetZone(),
+			AccountName:    cost.GetAccountName(),
+			ChargeCategory: cost.GetChargeCategory(),
+			Description:    cost.GetDescription(),
+			ResourceName:   cost.GetResourceName(),
+			ResourceType:   cost.GetResourceType(),
+			ProviderId:     cost.GetProviderId(),
+			BilledCost:     cost.GetBilledCost(),
+			ListCost:       cost.GetListCost(),
+			ListUnitPrice:  cost.GetListUnitPrice(),
+			UsageQuantity:  cost.GetUsageQuantity(),
+			UsageUnit:      cost.GetUsageUnit(),
+			Domain:         ccResponse.GetDomain(),
+		}
+	}
+
+	return customCosts
+}
+
+func (cc *CustomCost) Add(other *CustomCost) {
+	cc.BilledCost += other.BilledCost
+	cc.ListCost += other.ListCost
+	cc.ListUnitPrice += other.ListUnitPrice
+	cc.UsageQuantity += other.UsageQuantity
+}
+
+type CustomCostSet struct {
+	CustomCosts []*CustomCost
+	Window      opencost.Window
+}
+
+func NewCustomCostSet(window opencost.Window) *CustomCostSet {
+	return &CustomCostSet{
+		CustomCosts: []*CustomCost{},
+		Window:      window,
+	}
+}
+
+func (ccs *CustomCostSet) Add(customCosts []*CustomCost) {
+	ccs.CustomCosts = append(ccs.CustomCosts, customCosts...)
+}
+
+func (ccs *CustomCostSet) Aggregate(aggregateBy []string) error {
+	if len(aggregateBy) == 0 {
+		return fmt.Errorf("found empty aggregateBy")
+	}
+
+	aggMap := make(map[string]*CustomCost)
+	for _, cc := range ccs.CustomCosts {
+		aggKey, err := generateAggKey(cc, aggregateBy)
+		if err != nil {
+			return fmt.Errorf("failed to aggregate CustomCostSet: %w", err)
+		}
+		cc.Aggregate = aggKey
+
+		if existing, ok := aggMap[aggKey]; ok {
+			existing.Add(cc)
+		} else {
+			aggMap[aggKey] = cc
+		}
+	}
+
+	var newCustomCosts []*CustomCost
+	for _, customCost := range aggMap {
+		newCustomCosts = append(newCustomCosts, customCost)
+	}
+	ccs.CustomCosts = newCustomCosts
+
+	return nil
+}
+
+func generateAggKey(cc *CustomCost, aggregateBy []string) (string, error) {
+	var aggKeys []string
+	for _, agg := range aggregateBy {
+		// TODO only domain is supported currently
+		if agg == string(CustomCostDomainProp) {
+			aggKeys = append(aggKeys, cc.Domain)
+		} else {
+			return "", fmt.Errorf("unsupported aggregation type: %s", agg)
+		}
+	}
+	aggKey := strings.Join(aggKeys, "/")
+
+	return aggKey, nil
+}