Explorar o código

Allocation ETL: on-demand external cost: fix naming convention

Niko Kovacevic %!s(int64=5) %!d(string=hai) anos
pai
achega
f8050e3f82
Modificáronse 3 ficheiros con 174 adicións e 59 borrados
  1. 166 53
      pkg/kubecost/allocation.go
  2. 2 0
      pkg/kubecost/asset.go
  3. 6 6
      pkg/kubecost/asset_test.go

+ 166 - 53
pkg/kubecost/allocation.go

@@ -11,6 +11,14 @@ import (
 	"github.com/kubecost/cost-model/pkg/log"
 )
 
+// TODO Clean-up use of IsEmpty; nil checks should be separated for safety.
+
+// TODO Consider making Allocation an interface, which is fulfilled by structs
+// like KubernetesAllocation, IdleAllocation, and ExternalAllocation.
+
+// ExternalSuffix indicates an external allocation
+const ExternalSuffix = "__external__"
+
 // IdleSuffix indicates an idle allocation property
 const IdleSuffix = "__idle__"
 
@@ -56,7 +64,6 @@ type Allocation struct {
 	ExternalCost    float64    `json:"externalCost"`
 	TotalCost       float64    `json:"totalCost"`
 	TotalEfficiency float64    `json:"totalEfficiency"`
-	// Profiler        *log.Profiler `json:"-"`
 }
 
 // AllocationMatchFunc is a function that can be used to match Allocations by
@@ -76,7 +83,6 @@ func (a *Allocation) Add(that *Allocation) (*Allocation, error) {
 	}
 
 	agg := a.Clone()
-	// agg.Profiler = a.Profiler
 	agg.add(that, false, false)
 
 	return agg, nil
@@ -198,6 +204,11 @@ func (a *Allocation) IsAggregated() bool {
 	return a == nil || a.Properties == nil
 }
 
+// IsExternal is true if the given Allocation represents external costs.
+func (a *Allocation) IsExternal() bool {
+	return strings.Contains(a.Name, ExternalSuffix)
+}
+
 // IsIdle is true if the given Allocation represents idle costs.
 func (a *Allocation) IsIdle() bool {
 	return strings.Contains(a.Name, IdleSuffix)
@@ -334,20 +345,22 @@ func (a *Allocation) add(that *Allocation, isShared, isAccumulating bool) {
 // a window. An AllocationSet is mutable, so treat it like a threadsafe map.
 type AllocationSet struct {
 	sync.RWMutex
-	// Profiler    *log.Profiler
-	allocations map[string]*Allocation
-	idleKeys    map[string]bool
-	Window      Window
-	Warnings    []string
-	Errors      []string
+	allocations  map[string]*Allocation
+	externalKeys map[string]bool
+	idleKeys     map[string]bool
+	Window       Window
+	Warnings     []string
+	Errors       []string
 }
 
 // NewAllocationSet instantiates a new AllocationSet and, optionally, inserts
 // the given list of Allocations
 func NewAllocationSet(start, end time.Time, allocs ...*Allocation) *AllocationSet {
 	as := &AllocationSet{
-		allocations: map[string]*Allocation{},
-		Window:      NewWindow(&start, &end),
+		allocations:  map[string]*Allocation{},
+		externalKeys: map[string]bool{},
+		idleKeys:     map[string]bool{},
+		Window:       NewWindow(&start, &end),
 	}
 
 	for _, a := range allocs {
@@ -378,20 +391,25 @@ type AllocationAggregationOptions struct {
 // given Property; e.g. Containers can be divided by Namespace, but not vice-a-versa.
 func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationAggregationOptions) error {
 	// The order of operations for aggregating allocations is as follows:
-	// 1. move shared and/or idle allocations to separate sets if options
-	//    indicate that they should be shared
-	// 2. idle coefficients
-	// 2.a) if idle allocation is to be shared, compute idle coefficients
-	//      (do not compute shared coefficients here, see step 5)
-	// 2.b) if idle allocation is NOT shared, but filters are present, compute
-	//      idle filtration coefficients for the purpose of only returning the
-	//      portion of idle allocation that would have been shared with the
-	//      unfiltered results set. (See unit tests 5.a,b,c)
-	// 3. ignore allocation if it fails any of the FilterFuncs
-	// 4. generate aggregation key and insert allocation into the output set
-	// 5. if there are shared allocations, compute sharing coefficients on
+	// 1. Partition external, idle, and shared allocations into separate sets
+	// 2. Compute idle coefficients (if necessary)
+	//    a) if idle allocation is to be shared, compute idle coefficients
+	//       (do not compute shared coefficients here, see step 5)
+	//    b) if idle allocation is NOT shared, but filters are present, compute
+	//       idle filtration coefficients for the purpose of only returning the
+	//       portion of idle allocation that would have been shared with the
+	//       unfiltered results set. (See unit tests 5.a,b,c)
+	// 3. Ignore allocation if it fails any of the FilterFuncs
+	// 4. Distribute idle allocations among remaining non-idle, non-external
+	//    allocations
+	// 5. Generate aggregation key and insert allocation into the output set
+	// 6. Scale un-aggregated idle coefficients by filtration coefficient
+	// 7. If there are shared allocations, compute sharing coefficients on
 	//    the aggregated set, then share allocation accordingly
-	// 6. if the merge idle option is enabled, merge any remaining idle
+	// 8. If there are external allocations that can be aggregated into
+	//    the output (i.e. they can be used to generate a valid key for
+	//    the given properties) then aggregate; otherwise... ignore them?
+	// 9. If the merge idle option is enabled, merge any remaining idle
 	//    allocations into a single idle allocation
 
 	// TODO niko/etl revisit (ShareIdle: ShareEven) case, which is probably wrong
@@ -412,24 +430,27 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
 
 	// aggSet will collect the aggregated allocations
 	aggSet := &AllocationSet{
-		// Profiler: as.Profiler,
+		Window: as.Window.Clone(),
+	}
+
+	// externalSet will collect external allocations
+	externalSet := &AllocationSet{
 		Window: as.Window.Clone(),
 	}
 
 	// idleSet will be shared among aggSet after initial aggregation
 	// is complete
 	idleSet := &AllocationSet{
-		// Profiler: as.Profiler,
 		Window: as.Window.Clone(),
 	}
 
 	// shareSet will be shared among aggSet after initial aggregation
 	// is complete
 	shareSet := &AllocationSet{
-		// Profiler: as.Profiler
 		Window: as.Window.Clone(),
 	}
 
+	// Convert SharedHourlyCosts to Allocations in the shareSet
 	for name, cost := range options.SharedHourlyCosts {
 		if cost > 0.0 {
 			hours := as.Resolution().Hours()
@@ -455,21 +476,27 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
 	as.Lock()
 	defer as.Unlock()
 
-	// Loop and find all of the idle and shared allocations initially. Add
-	// them to their respective sets, removing them from the set of
-	// allocations to aggregate.
+	// (1) Loop and find all of the external, idle, and shared allocations. Add
+	// them to their respective sets, removing them from the set of allocations
+	// to aggregate.
 	for _, alloc := range as.allocations {
+		// External allocations get aggregated post-hoc (see step 6) and do
+		// not necessarily contain complete sets of properties, so they are
+		// moved to a separate AllocationSet.
+		if alloc.IsExternal() {
+			delete(as.allocations, alloc.Name)
+			externalSet.Insert(alloc)
+		}
+
 		cluster, err := alloc.Properties.GetCluster()
 		if err != nil {
 			log.Warningf("AllocationSet.AggregateBy: missing cluster for allocation: %s", alloc.Name)
 			return err
 		}
 
-		// Idle allocation doesn't get aggregated, so it can be passed through,
-		// whether or not it is shared. If it is shared, it is put in idleSet
-		// because shareSet may be split by different rules (even/weighted).
+		// Idle allocations should be separated into idleSet if they are to be
+		// shared later on. If they are not to be shared, then aggregate them.
 		if alloc.IsIdle() {
-			// Can't recursively call Delete() due to lock acquisition
 			delete(as.idleKeys, alloc.Name)
 			delete(as.allocations, alloc.Name)
 
@@ -480,12 +507,11 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
 			}
 		}
 
-		// If any of the share funcs succeed, share the allocation. Do this
-		// prior to filtering so that shared namespaces, etc do not get
-		// filtered out before we have a chance to share them.
+		// Shared allocations must be identified and separated prior to
+		// aggregation and filtering. That is, if any of the ShareFuncs
+		// return true, then move the allocation to shareSet.
 		for _, sf := range options.ShareFuncs {
 			if sf(alloc) {
-				// Can't recursively call Delete() due to lock acquisition
 				delete(as.idleKeys, alloc.Name)
 				delete(as.allocations, alloc.Name)
 
@@ -496,6 +522,8 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
 		}
 	}
 
+	// It's possible that no more un-shared, non-idle, non-external allocations
+	// remain at this point. This always results in an emptySet.
 	if len(as.allocations) == 0 {
 		log.Warningf("ETL: AggregateBy: no allocations to aggregate")
 		emptySet := &AllocationSet{
@@ -505,15 +533,21 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
 		return nil
 	}
 
-	// In order to correctly apply idle and shared resource coefficients appropriately,
-	// we need to determine the coefficients for the full set of data. The ensures that
-	// the ratios are maintained through filtering.
+	// (2) In order to correctly apply idle and shared resource coefficients
+	// appropriately, we need to determine the coefficients for the full set
+	// of data. The ensures that the ratios are maintained through filtering.
+
 	// idleCoefficients are organized by [cluster][allocation][resource]=coeff
 	var idleCoefficients map[string]map[string]map[string]float64
+
 	// shareCoefficients are organized by [allocation][resource]=coeff (no cluster)
 	var shareCoefficients map[string]float64
+
 	var err error
 
+	// (2a) If there are idle costs and we intend to share them, compute the
+	// coefficients for sharing the cost among the non-idle, non-aggregated
+	// allocations.
 	if idleSet.Length() > 0 && options.ShareIdle != ShareNone {
 		idleCoefficients, err = computeIdleCoeffs(properties, options, as)
 		if err != nil {
@@ -522,7 +556,7 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
 		}
 	}
 
-	// If we're not sharing idle and we're filtering, we need to track the
+	// (2b) If we're not sharing idle and we're filtering, we need to track the
 	// amount of each idle allocation to "delete" in order to maintain parity
 	// with the idle-allocated results. That is, we want to return only the
 	// idle cost that would have been shared with the unfiltered portion of
@@ -532,10 +566,11 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
 		idleFiltrationCoefficients, err = computeIdleCoeffs(properties, options, as)
 		if err != nil {
 			log.Warningf("AllocationSet.AggregateBy: compute idle coeff: %s", err)
-			return err
+			return fmt.Errorf("error computing idle filtration coefficients: %s", err)
 		}
 	}
 
+	// (3-5) Filter, distribute idle cost, and aggregate (in that order)
 	for _, alloc := range as.allocations {
 		cluster, err := alloc.Properties.GetCluster()
 		if err != nil {
@@ -545,7 +580,7 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
 
 		skip := false
 
-		// If any of the filter funcs fail, immediately skip the allocation.
+		// (3) If any of the filter funcs fail, immediately skip the allocation.
 		for _, ff := range options.FilterFuncs {
 			if !ff(alloc) {
 				skip = true
@@ -566,9 +601,11 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
 			continue
 		}
 
-		// Split idle allocations and distribute among aggregated allocations
-		// NOTE: if idle allocation is off (i.e. ShareIdle == ShareNone) then all
-		// idle allocations will be in the aggSet at this point.
+		// (4) Split idle allocations and distribute among remaining
+		// un-aggregated allocations.
+		// NOTE: if idle allocation is off (i.e. ShareIdle == ShareNone) then
+		// all idle allocations will be in the aggSet at this point, so idleSet
+		// will be empty and we won't enter this block.
 		if idleSet.Length() > 0 {
 			// Distribute idle allocations by coefficient per-cluster, per-allocation
 			for _, idleAlloc := range idleSet.allocations {
@@ -606,6 +643,7 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
 			}
 		}
 
+		// (5) generate key to use for aggregation-by-key and allocation name
 		key, err := alloc.generateKey(properties)
 		if err != nil {
 			return err
@@ -616,9 +654,15 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
 			alloc.Name = UnallocatedSuffix
 		}
 
+		// Inserting the allocation with the generated key for a name will
+		// perform the actual basic aggregation step.
 		aggSet.Insert(alloc)
 	}
 
+	// clusterIdleFiltrationCoeffs is used to track per-resource idle
+	// coefficients on a cluster-by-cluster basis. It is, essentailly, an
+	// aggregation of idleFiltrationCoefficients after they have been
+	// filtered above (in step 3)
 	var clusterIdleFiltrationCoeffs map[string]map[string]float64
 	if idleFiltrationCoefficients != nil {
 		clusterIdleFiltrationCoeffs = map[string]map[string]float64{}
@@ -640,9 +684,10 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
 		}
 	}
 
-	// If we have filters, and so have computed coefficients for scaling idle
-	// allocation costs by cluster, then use those coefficients to scale down
-	// each idle coefficient in the aggSet.
+	// (6) If we have both un-shared idle allocations and idle filtration
+	// coefficients (i.e. we have computed coefficients for scaling idle
+	// allocation costs by cluster) then use those coefficients to scale down
+	// each idle allocation.
 	if len(aggSet.idleKeys) > 0 && clusterIdleFiltrationCoeffs != nil {
 		for idleKey := range aggSet.idleKeys {
 			idleAlloc := aggSet.Get(idleKey)
@@ -663,7 +708,7 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
 		}
 	}
 
-	// Split shared allocations and distribute among aggregated allocations
+	// (7) Split shared allocations and distribute among aggregated allocations
 	if shareSet.Length() > 0 {
 		shareCoefficients, err = computeShareCoeffs(properties, options, aggSet)
 		if err != nil {
@@ -692,7 +737,25 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
 		}
 	}
 
-	// Combine all idle allocations into a single "__idle__" allocation
+	// (8) Aggregate external allocations into aggregated allocations. This may
+	// not be possible for every external allocation, but attempt to find an
+	// exact key match, given each external allocation's proerties, and
+	// aggregate if an exact match is found.
+	for _, alloc := range externalSet.allocations {
+		key, err := alloc.generateKey(properties)
+		if err != nil {
+			// TODO niko/allocation-etl remove log after testing
+			log.Infof("ExternalAllocations: AggregateBy: skipping %s: %s", alloc.Name, err)
+			fmt.Printf(" - skipping %s: %s\n", alloc.Name, err)
+			continue
+		}
+
+		alloc.Name = key
+		fmt.Printf(" - inserting %s: %.5f (%.5f)\n", alloc.Name, alloc.ExternalCost, alloc.TotalCost)
+		aggSet.Insert(alloc)
+	}
+
+	// (9) Combine all idle allocations into a single "__idle__" allocation
 	if !options.SplitIdle {
 		for _, idleAlloc := range aggSet.IdleAllocations() {
 			aggSet.Delete(idleAlloc.Name)
@@ -1010,6 +1073,7 @@ func (alloc *Allocation) generateKey(properties Properties) (string, error) {
 	return strings.Join(names, "/"), nil
 }
 
+// TODO clean up
 // Helper function to check for slice membership. Not sure if repeated elsewhere in our codebase.
 func indexOf(v string, arr []string) int {
 	for i, s := range arr {
@@ -1036,9 +1100,21 @@ func (as *AllocationSet) Clone() *AllocationSet {
 		allocs[k] = v.Clone()
 	}
 
+	externalKeys := map[string]bool{}
+	for k, v := range as.externalKeys {
+		externalKeys[k] = v
+	}
+
+	idleKeys := map[string]bool{}
+	for k, v := range as.idleKeys {
+		idleKeys[k] = v
+	}
+
 	return &AllocationSet{
-		allocations: allocs,
-		Window:      as.Window.Clone(),
+		allocations:  allocs,
+		externalKeys: externalKeys,
+		idleKeys:     idleKeys,
+		Window:       as.Window.Clone(),
 	}
 }
 
@@ -1159,6 +1235,7 @@ func (as *AllocationSet) Delete(name string) {
 
 	as.Lock()
 	defer as.Unlock()
+	delete(as.externalKeys, name)
 	delete(as.idleKeys, name)
 	delete(as.allocations, name)
 }
@@ -1199,6 +1276,27 @@ func (as *AllocationSet) Get(key string) *Allocation {
 	return nil
 }
 
+// ExternalAllocations returns a map of the external allocations in the set.
+// Returns clones of the actual Allocations, so mutability is not a problem.
+func (as *AllocationSet) ExternalAllocations() map[string]*Allocation {
+	externals := map[string]*Allocation{}
+
+	if as.IsEmpty() {
+		return externals
+	}
+
+	as.RLock()
+	defer as.RUnlock()
+
+	for key := range as.externalKeys {
+		if alloc, ok := as.allocations[key]; ok {
+			externals[key] = alloc.Clone()
+		}
+	}
+
+	return externals
+}
+
 // IdleAllocations returns a map of the idle allocations in the AllocationSet.
 // Returns clones of the actual Allocations, so mutability is not a problem.
 func (as *AllocationSet) IdleAllocations() map[string]*Allocation {
@@ -1239,6 +1337,10 @@ func (as *AllocationSet) insert(that *Allocation, accumulate bool) error {
 		as.allocations = map[string]*Allocation{}
 	}
 
+	if as.externalKeys == nil {
+		as.externalKeys = map[string]bool{}
+	}
+
 	if as.idleKeys == nil {
 		as.idleKeys = map[string]bool{}
 	}
@@ -1251,6 +1353,11 @@ func (as *AllocationSet) insert(that *Allocation, accumulate bool) error {
 		as.allocations[that.Name].add(that, false, accumulate)
 	}
 
+	// If the given Allocation is an external one, record that
+	if that.IsExternal() {
+		as.externalKeys[that.Name] = true
+	}
+
 	// If the given Allocation is an idle one, record that
 	if that.IsIdle() {
 		as.idleKeys[that.Name] = true
@@ -1309,6 +1416,7 @@ func (as *AllocationSet) Set(alloc *Allocation) error {
 	if as.IsEmpty() {
 		as.Lock()
 		as.allocations = map[string]*Allocation{}
+		as.externalKeys = map[string]bool{}
 		as.idleKeys = map[string]bool{}
 		as.Unlock()
 	}
@@ -1318,6 +1426,11 @@ func (as *AllocationSet) Set(alloc *Allocation) error {
 
 	as.allocations[alloc.Name] = alloc
 
+	// If the given Allocation is an external one, record that
+	if alloc.IsExternal() {
+		as.externalKeys[alloc.Name] = true
+	}
+
 	// If the given Allocation is an idle one, record that
 	if alloc.IsIdle() {
 		as.idleKeys[alloc.Name] = true

+ 2 - 0
pkg/kubecost/asset.go

@@ -181,6 +181,8 @@ func AssetToExternalAllocation(asset Asset, aggregateBy []string, allocationProp
 		return nil, fmt.Errorf("asset does not qualify as an external allocation")
 	}
 
+	names = append(names, ExternalSuffix)
+
 	// TODO niko/allocation-etl efficiency?
 	// TODO niko/allocation-etl resource totals?
 	return &Allocation{

+ 6 - 6
pkg/kubecost/asset_test.go

@@ -1091,8 +1091,8 @@ func TestAssetToExternalAllocation(t *testing.T) {
 	if err != nil {
 		t.Fatalf("unexpected error: %s", err)
 	}
-	if alloc.Name != "monitoring" {
-		t.Fatalf("expected external allocation with name '%s'; got '%s'", "monitoring", alloc.Name)
+	if alloc.Name != "monitoring/__external__" {
+		t.Fatalf("expected external allocation with name '%s'; got '%s'", "monitoring/__external__", alloc.Name)
 	}
 	if ns, err := alloc.Properties.GetNamespace(); err != nil || ns != "monitoring" {
 		t.Fatalf("expected external allocation with Properties.Namespace '%s'; got '%s' (%s)", "monitoring", ns, err)
@@ -1109,8 +1109,8 @@ func TestAssetToExternalAllocation(t *testing.T) {
 	if err != nil {
 		t.Fatalf("unexpected error: %s", err)
 	}
-	if alloc.Name != "monitoring/env=prod" {
-		t.Fatalf("expected external allocation with name '%s'; got '%s'", "monitoring/env=prod", alloc.Name)
+	if alloc.Name != "monitoring/env=prod/__external__" {
+		t.Fatalf("expected external allocation with name '%s'; got '%s'", "monitoring/env=prod/__external__", alloc.Name)
 	}
 	if ns, err := alloc.Properties.GetNamespace(); err != nil || ns != "monitoring" {
 		t.Fatalf("expected external allocation with Properties.Namespace '%s'; got '%s' (%s)", "monitoring", ns, err)
@@ -1130,8 +1130,8 @@ func TestAssetToExternalAllocation(t *testing.T) {
 	if err != nil {
 		t.Fatalf("unexpected error: %s", err)
 	}
-	if alloc.Name != "monitoring/__unallocated__" {
-		t.Fatalf("expected external allocation with name '%s'; got '%s'", "monitoring/__unallocated__", alloc.Name)
+	if alloc.Name != "monitoring/__unallocated__/__external__" {
+		t.Fatalf("expected external allocation with name '%s'; got '%s'", "monitoring/__unallocated__/__external__", alloc.Name)
 	}
 	if ns, err := alloc.Properties.GetNamespace(); err != nil || ns != "monitoring" {
 		t.Fatalf("expected external allocation with Properties.Namespace '%s'; got '%s' (%s)", "monitoring", ns, err)