Niko Kovacevic 5 лет назад
Родитель
Сommit
ca5115ecf0
2 измененных файлов с 104 добавлено и 100 удалено
  1. 104 98
      pkg/kubecost/allocation.go
  2. 0 2
      pkg/kubecost/allocation_test.go

+ 104 - 98
pkg/kubecost/allocation.go

@@ -383,26 +383,30 @@ type AllocationAggregationOptions struct {
 // given Property; e.g. Containers can be divided by Namespace, but not vice-a-versa.
 // given Property; e.g. Containers can be divided by Namespace, but not vice-a-versa.
 func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationAggregationOptions) error {
 func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationAggregationOptions) error {
 	// The order of operations for aggregating allocations is as follows:
 	// The order of operations for aggregating allocations is as follows:
-	// 1. Partition external, idle, and shared allocations into separate sets
-	// 2. Compute idle coefficients (if necessary)
-	//    a) if idle allocation is to be shared, compute idle coefficients
-	//       (do not compute shared coefficients here, see step 5)
-	//    b) if idle allocation is NOT shared, but filters are present, compute
-	//       idle filtration coefficients for the purpose of only returning the
-	//       portion of idle allocation that would have been shared with the
-	//       unfiltered results set. (See unit tests 5.a,b,c)
-	// 3. Ignore allocation if it fails any of the FilterFuncs
-	// 4. Distribute idle allocations among remaining non-idle, non-external
-	//    allocations
-	// 5. Generate aggregation key and insert allocation into the output set
-	// 6. Scale un-aggregated idle coefficients by filtration coefficient
-	// 7. If there are shared allocations, compute sharing coefficients on
-	//    the aggregated set, then share allocation accordingly
-	// 8. If there are external allocations that can be aggregated into
-	//    the output (i.e. they can be used to generate a valid key for
-	//    the given properties) then aggregate; otherwise... ignore them?
-	// 9. If the merge idle option is enabled, merge any remaining idle
-	//    allocations into a single idle allocation
+	//  1. Partition external, idle, and shared allocations into separate sets.
+	//     Also, create the aggSet into which the results will be aggregated.
+	//  2. Compute sharing coefficients for idle and shared resources
+	//     a) if idle allocation is to be shared, compute idle coefficients
+	//     b) if idle allocation is NOT shared, but filters are present, compute
+	//        idle filtration coefficients for the purpose of only returning the
+	//        portion of idle allocation that would have been shared with the
+	//        unfiltered results. (See unit tests 5.a,b,c)
+	//     c) generate shared allocation for then given shared overhead, which
+	//        must happen after (2a) and (2b)
+	//     d) if there are shared resources, compute share coefficients
+	//  3. Drop any allocation that fails any of the filters
+	//  4. Distribute idle allocations according to the idle coefficients
+	//  5. Generate aggregation key and insert allocation into the output set
+	//  6. If idle is shared and resources are shared, some idle might be shared
+	//     with a shared resource. Distribute that to the shared resources
+	//     prior to sharing them with the aggregated results.
+	//  7. Apply idle filtration coefficients from step (2b)
+	//  8. Distribute shared allocations according to the share coefficients.
+	//  9. If there are external allocations that can be aggregated into
+	//     the output (i.e. they can be used to generate a valid key for
+	//     the given properties) then aggregate; otherwise... ignore them?
+	// 10. If the merge idle option is enabled, merge any remaining idle
+	//     allocations into a single idle allocation
 
 
 	// TODO niko/etl revisit (ShareIdle: ShareEven) case, which is probably wrong
 	// TODO niko/etl revisit (ShareIdle: ShareEven) case, which is probably wrong
 	// (and, frankly, ill-defined; i.e. evenly across clusters? within clusters?)
 	// (and, frankly, ill-defined; i.e. evenly across clusters? within clusters?)
@@ -437,29 +441,6 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
 		Window: as.Window.Clone(),
 		Window: as.Window.Clone(),
 	}
 	}
 
 
-	// // Convert SharedHourlyCosts to Allocations in the shareSet
-	// for name, cost := range options.SharedHourlyCosts {
-	// 	if cost > 0.0 {
-	// 		hours := as.Resolution().Hours()
-
-	// 		// If set ends in the future, adjust hours accordingly
-	// 		diff := time.Now().Sub(as.End())
-	// 		if diff < 0.0 {
-	// 			hours += diff.Hours()
-	// 		}
-
-	// 		totalSharedCost := cost * hours
-
-	// 		shareSet.Insert(&Allocation{
-	// 			Name:       fmt.Sprintf("%s/%s", name, SharedSuffix),
-	// 			Start:      as.Start(),
-	// 			End:        as.End(),
-	// 			SharedCost: totalSharedCost,
-	// 			TotalCost:  totalSharedCost,
-	// 		})
-	// 	}
-	// }
-
 	as.Lock()
 	as.Lock()
 	defer as.Unlock()
 	defer as.Unlock()
 
 
@@ -477,14 +458,9 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
 			continue
 			continue
 		}
 		}
 
 
-		// cluster, err := alloc.Properties.GetCluster()
-		// if err != nil {
-		// 	log.Warningf("AllocationSet.AggregateBy: missing cluster for allocation: %s", alloc.Name)
-		// 	return err
-		// }
-
 		// Idle allocations should be separated into idleSet if they are to be
 		// Idle allocations should be separated into idleSet if they are to be
-		// shared later on. If they are not to be shared, then aggregate them.
+		// shared later on. If they are not to be shared, then add them to the
+		// aggSet like any other allocation.
 		if alloc.IsIdle() {
 		if alloc.IsIdle() {
 			delete(as.idleKeys, alloc.Name)
 			delete(as.idleKeys, alloc.Name)
 			delete(as.allocations, alloc.Name)
 			delete(as.allocations, alloc.Name)
@@ -499,14 +475,12 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
 		}
 		}
 
 
 		// Shared allocations must be identified and separated prior to
 		// Shared allocations must be identified and separated prior to
-		// aggregation and filtering. That is, if any of the ShareFuncs
-		// return true, then move the allocation to shareSet.
+		// aggregation and filtering. That is, if any of the ShareFuncs return
+		// true for the allocation, then move it to shareSet.
 		for _, sf := range options.ShareFuncs {
 		for _, sf := range options.ShareFuncs {
 			if sf(alloc) {
 			if sf(alloc) {
 				delete(as.idleKeys, alloc.Name)
 				delete(as.idleKeys, alloc.Name)
 				delete(as.allocations, alloc.Name)
 				delete(as.allocations, alloc.Name)
-
-				// alloc.Name = fmt.Sprintf("%s/%s", cluster, SharedSuffix)
 				shareSet.Insert(alloc)
 				shareSet.Insert(alloc)
 				break
 				break
 			}
 			}
@@ -514,9 +488,8 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
 	}
 	}
 
 
 	// It's possible that no more un-shared, non-idle, non-external allocations
 	// It's possible that no more un-shared, non-idle, non-external allocations
-	// remain at this point. This always results in an emptySet.
+	// remain at this point. This always results in an emptySet, so return early.
 	if len(as.allocations) == 0 {
 	if len(as.allocations) == 0 {
-		log.Warningf("AllocationSet.AggregateBy: no allocations to aggregate")
 		emptySet := &AllocationSet{
 		emptySet := &AllocationSet{
 			Window: as.Window.Clone(),
 			Window: as.Window.Clone(),
 		}
 		}
@@ -524,21 +497,39 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
 		return nil
 		return nil
 	}
 	}
 
 
-	// (2) In order to correctly apply idle and shared resource coefficients
-	// appropriately, we need to determine the coefficients for the full set
-	// of data. The ensures that the ratios are maintained through filtering.
-
-	// idleCoefficients are organized by [cluster][allocation][resource]=coeff
-	var idleCoefficients map[string]map[string]map[string]float64
-
-	// shareCoefficients are organized by [allocation]=coeff
-	var shareCoefficients map[string]float64
+	// (2) In order to correctly share idle and shared costs, we first compute
+	// sharing coefficients, which represent the proportion of each cost to
+	// share with each allocation. Idle allocations are shared per-cluster,
+	// per-allocation, and per-resource, while shared resources are shared per-
+	// allocation only.
+	//
+	// For an idleCoefficient example, the entries:
+	//   [cluster1][cluster1/namespace1/pod1/container1][cpu] = 0.166667
+	//   [cluster1][cluster1/namespace1/pod1/container1][gpu] = 0.166667
+	//   [cluster1][cluster1/namespace1/pod1/container1][ram] = 0.687500
+	// mean that the allocation "cluster1/namespace1/pod1/container1" will
+	// receive 16.67% of cluster1's idle CPU and GPU costs and 68.75% of its
+	// RAM costs.
+	//
+	// For a shareCoefficient example, the entries:
+	//   [namespace2] = 0.666667
+	//   [__filtered__] = 0.333333
+	// mean that the post-aggregation allocation "namespace2" will receive
+	// 66.67% of the shared resource costs, while the remaining 33.33% will
+	// be filtered out, as they were shared with allocations that did not pass
+	// one of the given filters.
+	//
+	// In order to maintain stable results when multiple operations are being
+	// carried out (e.g. sharing idle, sharing resources, and filtering) these
+	// coefficients are computed for the full set of allocaitons prior to
+	// adding shared overhead and prior to applying filters.
 
 
 	var err error
 	var err error
 
 
-	// (2a) If there are idle costs and we intend to share them, compute the
-	// coefficients for sharing the cost among the non-idle, non-aggregated
-	// allocations.
+	// (2a) If there are idle costs to be shared, compute the coefficients for
+	// sharing them among the non-idle, non-aggregated allocations (including
+	// the shared allocations).
+	var idleCoefficients map[string]map[string]map[string]float64
 	if idleSet.Length() > 0 && options.ShareIdle != ShareNone {
 	if idleSet.Length() > 0 && options.ShareIdle != ShareNone {
 		idleCoefficients, err = computeIdleCoeffs(properties, options, as, shareSet)
 		idleCoefficients, err = computeIdleCoeffs(properties, options, as, shareSet)
 		if err != nil {
 		if err != nil {
@@ -547,11 +538,30 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
 		}
 		}
 	}
 	}
 
 
-	// (2b) If we're not sharing idle and we're filtering, we need to track the
-	// amount of each idle allocation to "delete" in order to maintain parity
-	// with the idle-allocated results. That is, we want to return only the
-	// idle cost that would have been shared with the unfiltered portion of
-	// the results, not the full idle cost.
+	// (2b) If idle costs are not to be shared, but there are filters, then we
+	// need to track the amount of each idle allocation to "filter" in order to
+	// maintain parity with the results when idle is shared. That is, we want
+	// to return only the idle costs that would have been shared with the given
+	// results, even if the filter had not been applied.
+	//
+	// For example, consider these results from aggregating by namespace with
+	// two clusters:
+	//
+	//  namespace1: 25.00
+	// 	namespace2: 30.00
+	// 	namespace3: 15.00
+	// 	idle:       30.00
+	//
+	// When we then filter by cluster==cluster1, namespaces 2 and 3 are
+	// reduced by the amount that existed on cluster2. Then, idle must also be
+	// reduced by the relevant amount:
+	//
+	//  namespace1: 25.00
+	// 	namespace2: 15.00
+	//  idle:       20.00
+	//
+	// Note that this can happen for any field, not just cluster, so we again
+	// need to track this on a per-cluster, per-allocation, per-resource basis.
 	var idleFiltrationCoefficients map[string]map[string]map[string]float64
 	var idleFiltrationCoefficients map[string]map[string]map[string]float64
 	if len(options.FilterFuncs) > 0 && options.ShareIdle == ShareNone {
 	if len(options.FilterFuncs) > 0 && options.ShareIdle == ShareNone {
 		idleFiltrationCoefficients, err = computeIdleCoeffs(properties, options, as, shareSet)
 		idleFiltrationCoefficients, err = computeIdleCoeffs(properties, options, as, shareSet)
@@ -560,8 +570,9 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
 		}
 		}
 	}
 	}
 
 
-	// Convert SharedHourlyCosts to Allocations in the shareSet
-	// TODO comment: why do we have to do this here, after computing idle?
+	// (2c) Convert SharedHourlyCosts to Allocations in the shareSet. This must
+	// come after idle coefficients are computes so that allocations generated
+	// by shared overhead do not skew the idle coefficient computation.
 	for name, cost := range options.SharedHourlyCosts {
 	for name, cost := range options.SharedHourlyCosts {
 		if cost > 0.0 {
 		if cost > 0.0 {
 			hours := as.Resolution().Hours()
 			hours := as.Resolution().Hours()
@@ -584,7 +595,10 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
 		}
 		}
 	}
 	}
 
 
-	// (2c) Compute share coefficients (TODO comment)
+	// (2d) Compute share coefficients for shared resources. These are computed
+	// after idle coefficients, and are computed for the aggregated allocations
+	// of the main allocation set. See above for details and an example.
+	var shareCoefficients map[string]float64
 	if shareSet.Length() > 0 {
 	if shareSet.Length() > 0 {
 		shareCoefficients, err = computeShareCoeffs(properties, options, as)
 		shareCoefficients, err = computeShareCoeffs(properties, options, as)
 		if err != nil {
 		if err != nil {
@@ -623,8 +637,7 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
 			continue
 			continue
 		}
 		}
 
 
-		// (4) Split idle allocations and distribute among remaining
-		// un-aggregated allocations.
+		// (4) Distribute idle allocations according to the idle coefficients
 		// NOTE: if idle allocation is off (i.e. ShareIdle == ShareNone) then
 		// NOTE: if idle allocation is off (i.e. ShareIdle == ShareNone) then
 		// all idle allocations will be in the aggSet at this point, so idleSet
 		// all idle allocations will be in the aggSet at this point, so idleSet
 		// will be empty and we won't enter this block.
 		// will be empty and we won't enter this block.
@@ -681,8 +694,11 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
 		aggSet.Insert(alloc)
 		aggSet.Insert(alloc)
 	}
 	}
 
 
-	// TODO comment
-	if idleSet.Length() > 0 {
+	// (6) If idle is shared and resources are shared, it's possible that some
+	// amount of idle cost will be shared with a shared resource. Distribute
+	// that idle allocation, if it exists, to the respective shared allocations
+	// before sharing with the aggregated allocations.
+	if idleSet.Length() > 0 && shareSet.Length() > 0 {
 		for _, alloc := range shareSet.allocations {
 		for _, alloc := range shareSet.allocations {
 			cluster, err := alloc.Properties.GetCluster()
 			cluster, err := alloc.Properties.GetCluster()
 			if err != nil {
 			if err != nil {
@@ -752,17 +768,16 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
 		}
 		}
 	}
 	}
 
 
-	// (6) If we have both un-shared idle allocations and idle filtration
-	// coefficients (i.e. we have computed coefficients for scaling idle
-	// allocation costs by cluster) then use those coefficients to scale down
-	// each idle allocation.
+	// (7) If we have both un-shared idle allocations and idle filtration
+	// coefficients then apply those. See step (2b) for an example.
 	if len(aggSet.idleKeys) > 0 && clusterIdleFiltrationCoeffs != nil {
 	if len(aggSet.idleKeys) > 0 && clusterIdleFiltrationCoeffs != nil {
 		for idleKey := range aggSet.idleKeys {
 		for idleKey := range aggSet.idleKeys {
 			idleAlloc := aggSet.Get(idleKey)
 			idleAlloc := aggSet.Get(idleKey)
 
 
 			cluster, err := idleAlloc.Properties.GetCluster()
 			cluster, err := idleAlloc.Properties.GetCluster()
 			if err != nil {
 			if err != nil {
-				log.Warningf("AggregateBy: idle allocation without cluster: %s", idleAlloc)
+				log.Warningf("AllocationSet.AggregateBy: idle allocation without cluster: %s", idleAlloc)
+				continue
 			}
 			}
 
 
 			if resourceCoeffs, ok := clusterIdleFiltrationCoeffs[cluster]; ok {
 			if resourceCoeffs, ok := clusterIdleFiltrationCoeffs[cluster]; ok {
@@ -772,24 +787,15 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
 				idleAlloc.RAMByteHours *= resourceCoeffs["ram"]
 				idleAlloc.RAMByteHours *= resourceCoeffs["ram"]
 				idleAlloc.TotalCost = idleAlloc.CPUCost + idleAlloc.RAMCost
 				idleAlloc.TotalCost = idleAlloc.CPUCost + idleAlloc.RAMCost
 			}
 			}
-
 		}
 		}
 	}
 	}
 
 
-	// (7) Split shared allocations and distribute among aggregated allocations
+	// (8) Distribute shared allocations according to the share coefficients.
 	if shareSet.Length() > 0 {
 	if shareSet.Length() > 0 {
 		for _, alloc := range aggSet.allocations {
 		for _, alloc := range aggSet.allocations {
-			if alloc.IsIdle() {
-				// Skip idle allocations (they do not receive shared allocation)
-				continue
-			}
-
-			// Distribute shared allocations by coefficient per-allocation
-			// NOTE: share coefficients do not partition by cluster, like
-			// idle coefficients do.
 			for _, sharedAlloc := range shareSet.allocations {
 			for _, sharedAlloc := range shareSet.allocations {
 				if _, ok := shareCoefficients[alloc.Name]; !ok {
 				if _, ok := shareCoefficients[alloc.Name]; !ok {
-					log.Errorf("ETL: share allocation: error getting allocation coefficienct for '%s'", alloc.Name)
+					log.Warningf("AllocationSet.AggregateBy: error getting share coefficienct for '%s'", alloc.Name)
 					continue
 					continue
 				}
 				}
 
 
@@ -799,7 +805,7 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
 		}
 		}
 	}
 	}
 
 
-	// (8) Aggregate external allocations into aggregated allocations. This may
+	// (9) Aggregate external allocations into aggregated allocations. This may
 	// not be possible for every external allocation, but attempt to find an
 	// not be possible for every external allocation, but attempt to find an
 	// exact key match, given each external allocation's proerties, and
 	// exact key match, given each external allocation's proerties, and
 	// aggregate if an exact match is found.
 	// aggregate if an exact match is found.
@@ -822,7 +828,7 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
 		}
 		}
 	}
 	}
 
 
-	// (9) Combine all idle allocations into a single "__idle__" allocation
+	// (10) Combine all idle allocations into a single "__idle__" allocation
 	if !options.SplitIdle {
 	if !options.SplitIdle {
 		for _, idleAlloc := range aggSet.IdleAllocations() {
 		for _, idleAlloc := range aggSet.IdleAllocations() {
 			aggSet.Delete(idleAlloc.Name)
 			aggSet.Delete(idleAlloc.Name)

+ 0 - 2
pkg/kubecost/allocation_test.go

@@ -937,7 +937,6 @@ func TestAllocationSet_AggregateBy(t *testing.T) {
 		ShareSplit: ShareWeighted,
 		ShareSplit: ShareWeighted,
 		ShareIdle:  ShareWeighted,
 		ShareIdle:  ShareWeighted,
 	})
 	})
-	printAllocationSet("6f", as)
 	assertAllocationSetTotals(t, as, "6f", err, 2, activeTotalCost+idleTotalCost)
 	assertAllocationSetTotals(t, as, "6f", err, 2, activeTotalCost+idleTotalCost)
 	assertAllocationTotals(t, as, "6f", map[string]float64{
 	assertAllocationTotals(t, as, "6f", map[string]float64{
 		"namespace2": 66.77,
 		"namespace2": 66.77,
@@ -945,7 +944,6 @@ func TestAllocationSet_AggregateBy(t *testing.T) {
 	})
 	})
 	assertAllocationWindow(t, as, "6f", startYesterday, endYesterday, 1440.0)
 	assertAllocationWindow(t, as, "6f", startYesterday, endYesterday, 1440.0)
 
 
-	fmt.Println("--6g--")
 	// 6g Share idle, share resources, and filter
 	// 6g Share idle, share resources, and filter
 	//
 	//
 	// First, share idle weighted produces:
 	// First, share idle weighted produces: