|
|
@@ -480,26 +480,30 @@ type AllocationAggregationOptions struct {
|
|
|
// given Property; e.g. Containers can be divided by Namespace, but not vice-a-versa.
|
|
|
func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationAggregationOptions) error {
|
|
|
// The order of operations for aggregating allocations is as follows:
|
|
|
- // 1. Partition external, idle, and shared allocations into separate sets
|
|
|
- // 2. Compute idle coefficients (if necessary)
|
|
|
- // a) if idle allocation is to be shared, compute idle coefficients
|
|
|
- // (do not compute shared coefficients here, see step 5)
|
|
|
- // b) if idle allocation is NOT shared, but filters are present, compute
|
|
|
- // idle filtration coefficients for the purpose of only returning the
|
|
|
- // portion of idle allocation that would have been shared with the
|
|
|
- // unfiltered results set. (See unit tests 5.a,b,c)
|
|
|
- // 3. Ignore allocation if it fails any of the FilterFuncs
|
|
|
- // 4. Distribute idle allocations among remaining non-idle, non-external
|
|
|
- // allocations
|
|
|
- // 5. Generate aggregation key and insert allocation into the output set
|
|
|
- // 6. Scale un-aggregated idle coefficients by filtration coefficient
|
|
|
- // 7. If there are shared allocations, compute sharing coefficients on
|
|
|
- // the aggregated set, then share allocation accordingly
|
|
|
- // 8. If there are external allocations that can be aggregated into
|
|
|
- // the output (i.e. they can be used to generate a valid key for
|
|
|
- // the given properties) then aggregate; otherwise... ignore them?
|
|
|
- // 9. If the merge idle option is enabled, merge any remaining idle
|
|
|
- // allocations into a single idle allocation
|
|
|
+ // 1. Partition external, idle, and shared allocations into separate sets.
|
|
|
+ // Also, create the aggSet into which the results will be aggregated.
|
|
|
+ // 2. Compute sharing coefficients for idle and shared resources
|
|
|
+ // a) if idle allocation is to be shared, compute idle coefficients
|
|
|
+ // b) if idle allocation is NOT shared, but filters are present, compute
|
|
|
+ // idle filtration coefficients for the purpose of only returning the
|
|
|
+ // portion of idle allocation that would have been shared with the
|
|
|
+ // unfiltered results. (See unit tests 5.a,b,c)
|
|
|
+ // c) generate shared allocation for then given shared overhead, which
|
|
|
+ // must happen after (2a) and (2b)
|
|
|
+ // d) if there are shared resources, compute share coefficients
|
|
|
+ // 3. Drop any allocation that fails any of the filters
|
|
|
+ // 4. Distribute idle allocations according to the idle coefficients
|
|
|
+ // 5. Generate aggregation key and insert allocation into the output set
|
|
|
+ // 6. If idle is shared and resources are shared, some idle might be shared
|
|
|
+ // with a shared resource. Distribute that to the shared resources
|
|
|
+ // prior to sharing them with the aggregated results.
|
|
|
+ // 7. Apply idle filtration coefficients from step (2b)
|
|
|
+ // 8. Distribute shared allocations according to the share coefficients.
|
|
|
+ // 9. If there are external allocations that can be aggregated into
|
|
|
+ // the output (i.e. they can be used to generate a valid key for
|
|
|
+ // the given properties) then aggregate; otherwise... ignore them?
|
|
|
+ // 10. If the merge idle option is enabled, merge any remaining idle
|
|
|
+ // allocations into a single idle allocation
|
|
|
|
|
|
if options == nil {
|
|
|
options = &AllocationAggregationOptions{}
|
|
|
@@ -531,28 +535,6 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
|
|
|
Window: as.Window.Clone(),
|
|
|
}
|
|
|
|
|
|
- // Convert SharedHourlyCosts to Allocations in the shareSet
|
|
|
- for name, cost := range options.SharedHourlyCosts {
|
|
|
- if cost > 0.0 {
|
|
|
- hours := as.Resolution().Hours()
|
|
|
-
|
|
|
- // If set ends in the future, adjust hours accordingly
|
|
|
- diff := time.Now().Sub(as.End())
|
|
|
- if diff < 0.0 {
|
|
|
- hours += diff.Hours()
|
|
|
- }
|
|
|
-
|
|
|
- totalSharedCost := cost * hours
|
|
|
-
|
|
|
- shareSet.Insert(&Allocation{
|
|
|
- Name: fmt.Sprintf("%s/%s", name, SharedSuffix),
|
|
|
- Start: as.Start(),
|
|
|
- End: as.End(),
|
|
|
- SharedCost: totalSharedCost,
|
|
|
- })
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
as.Lock()
|
|
|
defer as.Unlock()
|
|
|
|
|
|
@@ -570,14 +552,9 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
- cluster, err := alloc.Properties.GetCluster()
|
|
|
- if err != nil {
|
|
|
- log.Warningf("AllocationSet.AggregateBy: missing cluster for allocation: %s", alloc.Name)
|
|
|
- return err
|
|
|
- }
|
|
|
-
|
|
|
// Idle allocations should be separated into idleSet if they are to be
|
|
|
- // shared later on. If they are not to be shared, then aggregate them.
|
|
|
+ // shared later on. If they are not to be shared, then add them to the
|
|
|
+ // aggSet like any other allocation.
|
|
|
if alloc.IsIdle() {
|
|
|
delete(as.idleKeys, alloc.Name)
|
|
|
delete(as.allocations, alloc.Name)
|
|
|
@@ -592,14 +569,12 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
|
|
|
}
|
|
|
|
|
|
// Shared allocations must be identified and separated prior to
|
|
|
- // aggregation and filtering. That is, if any of the ShareFuncs
|
|
|
- // return true, then move the allocation to shareSet.
|
|
|
+ // aggregation and filtering. That is, if any of the ShareFuncs return
|
|
|
+ // true for the allocation, then move it to shareSet.
|
|
|
for _, sf := range options.ShareFuncs {
|
|
|
if sf(alloc) {
|
|
|
delete(as.idleKeys, alloc.Name)
|
|
|
delete(as.allocations, alloc.Name)
|
|
|
-
|
|
|
- alloc.Name = fmt.Sprintf("%s/%s", cluster, SharedSuffix)
|
|
|
shareSet.Insert(alloc)
|
|
|
break
|
|
|
}
|
|
|
@@ -607,9 +582,8 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
|
|
|
}
|
|
|
|
|
|
// It's possible that no more un-shared, non-idle, non-external allocations
|
|
|
- // remain at this point. This always results in an emptySet.
|
|
|
+ // remain at this point. This always results in an emptySet, so return early.
|
|
|
if len(as.allocations) == 0 {
|
|
|
- log.Warningf("ETL: AggregateBy: no allocations to aggregate")
|
|
|
emptySet := &AllocationSet{
|
|
|
Window: as.Window.Clone(),
|
|
|
}
|
|
|
@@ -617,43 +591,115 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
- // (2) In order to correctly apply idle and shared resource coefficients
|
|
|
- // appropriately, we need to determine the coefficients for the full set
|
|
|
- // of data. The ensures that the ratios are maintained through filtering.
|
|
|
-
|
|
|
- // idleCoefficients are organized by [cluster][allocation][resource]=coeff
|
|
|
- var idleCoefficients map[string]map[string]map[string]float64
|
|
|
-
|
|
|
- // shareCoefficients are organized by [allocation][resource]=coeff (no cluster)
|
|
|
- var shareCoefficients map[string]float64
|
|
|
+ // (2) In order to correctly share idle and shared costs, we first compute
|
|
|
+ // sharing coefficients, which represent the proportion of each cost to
|
|
|
+ // share with each allocation. Idle allocations are shared per-cluster,
|
|
|
+ // per-allocation, and per-resource, while shared resources are shared per-
|
|
|
+ // allocation only.
|
|
|
+ //
|
|
|
+ // For an idleCoefficient example, the entries:
|
|
|
+ // [cluster1][cluster1/namespace1/pod1/container1][cpu] = 0.166667
|
|
|
+ // [cluster1][cluster1/namespace1/pod1/container1][gpu] = 0.166667
|
|
|
+ // [cluster1][cluster1/namespace1/pod1/container1][ram] = 0.687500
|
|
|
+ // mean that the allocation "cluster1/namespace1/pod1/container1" will
|
|
|
+ // receive 16.67% of cluster1's idle CPU and GPU costs and 68.75% of its
|
|
|
+ // RAM costs.
|
|
|
+ //
|
|
|
+ // For a shareCoefficient example, the entries:
|
|
|
+ // [namespace2] = 0.666667
|
|
|
+ // [__filtered__] = 0.333333
|
|
|
+ // mean that the post-aggregation allocation "namespace2" will receive
|
|
|
+ // 66.67% of the shared resource costs, while the remaining 33.33% will
|
|
|
+ // be filtered out, as they were shared with allocations that did not pass
|
|
|
+ // one of the given filters.
|
|
|
+ //
|
|
|
+ // In order to maintain stable results when multiple operations are being
|
|
|
+ // carried out (e.g. sharing idle, sharing resources, and filtering) these
|
|
|
+ // coefficients are computed for the full set of allocaitons prior to
|
|
|
+ // adding shared overhead and prior to applying filters.
|
|
|
|
|
|
var err error
|
|
|
|
|
|
- // (2a) If there are idle costs and we intend to share them, compute the
|
|
|
- // coefficients for sharing the cost among the non-idle, non-aggregated
|
|
|
- // allocations.
|
|
|
+ // (2a) If there are idle costs to be shared, compute the coefficients for
|
|
|
+ // sharing them among the non-idle, non-aggregated allocations (including
|
|
|
+ // the shared allocations).
|
|
|
+ var idleCoefficients map[string]map[string]map[string]float64
|
|
|
if idleSet.Length() > 0 && options.ShareIdle != ShareNone {
|
|
|
- idleCoefficients, err = computeIdleCoeffs(properties, options, as)
|
|
|
+ idleCoefficients, err = computeIdleCoeffs(properties, options, as, shareSet)
|
|
|
if err != nil {
|
|
|
log.Warningf("AllocationSet.AggregateBy: compute idle coeff: %s", err)
|
|
|
return fmt.Errorf("error computing idle coefficients: %s", err)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- // (2b) If we're not sharing idle and we're filtering, we need to track the
|
|
|
- // amount of each idle allocation to "delete" in order to maintain parity
|
|
|
- // with the idle-allocated results. That is, we want to return only the
|
|
|
- // idle cost that would have been shared with the unfiltered portion of
|
|
|
- // the results, not the full idle cost.
|
|
|
+ // (2b) If idle costs are not to be shared, but there are filters, then we
|
|
|
+ // need to track the amount of each idle allocation to "filter" in order to
|
|
|
+ // maintain parity with the results when idle is shared. That is, we want
|
|
|
+ // to return only the idle costs that would have been shared with the given
|
|
|
+ // results, even if the filter had not been applied.
|
|
|
+ //
|
|
|
+ // For example, consider these results from aggregating by namespace with
|
|
|
+ // two clusters:
|
|
|
+ //
|
|
|
+ // namespace1: 25.00
|
|
|
+ // namespace2: 30.00
|
|
|
+ // namespace3: 15.00
|
|
|
+ // idle: 30.00
|
|
|
+ //
|
|
|
+ // When we then filter by cluster==cluster1, namespaces 2 and 3 are
|
|
|
+ // reduced by the amount that existed on cluster2. Then, idle must also be
|
|
|
+ // reduced by the relevant amount:
|
|
|
+ //
|
|
|
+ // namespace1: 25.00
|
|
|
+ // namespace2: 15.00
|
|
|
+ // idle: 20.00
|
|
|
+ //
|
|
|
+ // Note that this can happen for any field, not just cluster, so we again
|
|
|
+ // need to track this on a per-cluster, per-allocation, per-resource basis.
|
|
|
var idleFiltrationCoefficients map[string]map[string]map[string]float64
|
|
|
if len(options.FilterFuncs) > 0 && options.ShareIdle == ShareNone {
|
|
|
- idleFiltrationCoefficients, err = computeIdleCoeffs(properties, options, as)
|
|
|
+ idleFiltrationCoefficients, err = computeIdleCoeffs(properties, options, as, shareSet)
|
|
|
if err != nil {
|
|
|
- log.Warningf("AllocationSet.AggregateBy: compute idle coeff: %s", err)
|
|
|
return fmt.Errorf("error computing idle filtration coefficients: %s", err)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ // (2c) Convert SharedHourlyCosts to Allocations in the shareSet. This must
|
|
|
+ // come after idle coefficients are computes so that allocations generated
|
|
|
+ // by shared overhead do not skew the idle coefficient computation.
|
|
|
+ for name, cost := range options.SharedHourlyCosts {
|
|
|
+ if cost > 0.0 {
|
|
|
+ hours := as.Resolution().Hours()
|
|
|
+
|
|
|
+ // If set ends in the future, adjust hours accordingly
|
|
|
+ diff := time.Now().Sub(as.End())
|
|
|
+ if diff < 0.0 {
|
|
|
+ hours += diff.Hours()
|
|
|
+ }
|
|
|
+
|
|
|
+ totalSharedCost := cost * hours
|
|
|
+
|
|
|
+ shareSet.Insert(&Allocation{
|
|
|
+ Name: fmt.Sprintf("%s/%s", name, SharedSuffix),
|
|
|
+ Start: as.Start(),
|
|
|
+ End: as.End(),
|
|
|
+ SharedCost: totalSharedCost,
|
|
|
+ TotalCost: totalSharedCost,
|
|
|
+ })
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // (2d) Compute share coefficients for shared resources. These are computed
|
|
|
+ // after idle coefficients, and are computed for the aggregated allocations
|
|
|
+ // of the main allocation set. See above for details and an example.
|
|
|
+ var shareCoefficients map[string]float64
|
|
|
+ if shareSet.Length() > 0 {
|
|
|
+ shareCoefficients, err = computeShareCoeffs(properties, options, as)
|
|
|
+ if err != nil {
|
|
|
+ return fmt.Errorf("error computing share coefficients: %s", err)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
// (3-5) Filter, distribute idle cost, and aggregate (in that order)
|
|
|
for _, alloc := range as.allocations {
|
|
|
cluster, err := alloc.Properties.GetCluster()
|
|
|
@@ -685,8 +731,7 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
- // (4) Split idle allocations and distribute among remaining
|
|
|
- // un-aggregated allocations.
|
|
|
+ // (4) Distribute idle allocations according to the idle coefficients
|
|
|
// NOTE: if idle allocation is off (i.e. ShareIdle == ShareNone) then
|
|
|
// all idle allocations will be in the aggSet at this point, so idleSet
|
|
|
// will be empty and we won't enter this block.
|
|
|
@@ -705,11 +750,11 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
|
|
|
|
|
|
// Make sure idle coefficients exist
|
|
|
if _, ok := idleCoefficients[cluster]; !ok {
|
|
|
- log.Errorf("ETL: share (idle) allocation: error getting allocation coefficient [no cluster: '%s' in coefficients] for '%s'", cluster, alloc.Name)
|
|
|
+ log.Warningf("AllocationSet.AggregateBy: error getting idle coefficient: no cluster '%s' for '%s'", cluster, alloc.Name)
|
|
|
continue
|
|
|
}
|
|
|
if _, ok := idleCoefficients[cluster][alloc.Name]; !ok {
|
|
|
- log.Errorf("ETL: share (idle) allocation: error getting allocation coefficienct for '%s'", alloc.Name)
|
|
|
+ log.Warningf("AllocationSet.AggregateBy: error getting idle coefficient for '%s'", alloc.Name)
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
@@ -742,6 +787,55 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
|
|
|
aggSet.Insert(alloc)
|
|
|
}
|
|
|
|
|
|
+ // (6) If idle is shared and resources are shared, it's possible that some
|
|
|
+ // amount of idle cost will be shared with a shared resource. Distribute
|
|
|
+ // that idle allocation, if it exists, to the respective shared allocations
|
|
|
+ // before sharing with the aggregated allocations.
|
|
|
+ if idleSet.Length() > 0 && shareSet.Length() > 0 {
|
|
|
+ for _, alloc := range shareSet.allocations {
|
|
|
+ cluster, err := alloc.Properties.GetCluster()
|
|
|
+ if err != nil {
|
|
|
+ log.Warningf("AllocationSet.AggregateBy: missing cluster for allocation: %s", alloc.Name)
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
+ // Distribute idle allocations by coefficient per-cluster, per-allocation
|
|
|
+ for _, idleAlloc := range idleSet.allocations {
|
|
|
+ // Only share idle if the cluster matches; i.e. the allocation
|
|
|
+ // is from the same cluster as the idle costs
|
|
|
+ idleCluster, err := idleAlloc.Properties.GetCluster()
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ if idleCluster != cluster {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+
|
|
|
+ // Make sure idle coefficients exist
|
|
|
+ if _, ok := idleCoefficients[cluster]; !ok {
|
|
|
+ log.Warningf("AllocationSet.AggregateBy: error getting idle coefficient: no cluster '%s' for '%s'", cluster, alloc.Name)
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ if _, ok := idleCoefficients[cluster][alloc.Name]; !ok {
|
|
|
+ log.Warningf("AllocationSet.AggregateBy: error getting idle coefficient for '%s'", alloc.Name)
|
|
|
+ continue
|
|
|
+ }
|
|
|
+
|
|
|
+ alloc.CPUCoreHours += idleAlloc.CPUCoreHours * idleCoefficients[cluster][alloc.Name]["cpu"]
|
|
|
+ alloc.GPUHours += idleAlloc.GPUHours * idleCoefficients[cluster][alloc.Name]["gpu"]
|
|
|
+ alloc.RAMByteHours += idleAlloc.RAMByteHours * idleCoefficients[cluster][alloc.Name]["ram"]
|
|
|
+
|
|
|
+ idleCPUCost := idleAlloc.CPUCost * idleCoefficients[cluster][alloc.Name]["cpu"]
|
|
|
+ idleGPUCost := idleAlloc.GPUCost * idleCoefficients[cluster][alloc.Name]["gpu"]
|
|
|
+ idleRAMCost := idleAlloc.RAMCost * idleCoefficients[cluster][alloc.Name]["ram"]
|
|
|
+ alloc.CPUCost += idleCPUCost
|
|
|
+ alloc.GPUCost += idleGPUCost
|
|
|
+ alloc.RAMCost += idleRAMCost
|
|
|
+ alloc.TotalCost += idleCPUCost + idleGPUCost + idleRAMCost
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
// clusterIdleFiltrationCoeffs is used to track per-resource idle
|
|
|
// coefficients on a cluster-by-cluster basis. It is, essentailly, an
|
|
|
// aggregation of idleFiltrationCoefficients after they have been
|
|
|
@@ -767,17 +861,16 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- // (6) If we have both un-shared idle allocations and idle filtration
|
|
|
- // coefficients (i.e. we have computed coefficients for scaling idle
|
|
|
- // allocation costs by cluster) then use those coefficients to scale down
|
|
|
- // each idle allocation.
|
|
|
+ // (7) If we have both un-shared idle allocations and idle filtration
|
|
|
+ // coefficients then apply those. See step (2b) for an example.
|
|
|
if len(aggSet.idleKeys) > 0 && clusterIdleFiltrationCoeffs != nil {
|
|
|
for idleKey := range aggSet.idleKeys {
|
|
|
idleAlloc := aggSet.Get(idleKey)
|
|
|
|
|
|
cluster, err := idleAlloc.Properties.GetCluster()
|
|
|
if err != nil {
|
|
|
- log.Warningf("AggregateBy: idle allocation without cluster: %s", idleAlloc)
|
|
|
+ log.Warningf("AllocationSet.AggregateBy: idle allocation without cluster: %s", idleAlloc)
|
|
|
+ continue
|
|
|
}
|
|
|
|
|
|
if resourceCoeffs, ok := clusterIdleFiltrationCoeffs[cluster]; ok {
|
|
|
@@ -786,30 +879,15 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
|
|
|
idleAlloc.RAMCost *= resourceCoeffs["ram"]
|
|
|
idleAlloc.RAMByteHours *= resourceCoeffs["ram"]
|
|
|
}
|
|
|
-
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- // (7) Split shared allocations and distribute among aggregated allocations
|
|
|
+ // (8) Distribute shared allocations according to the share coefficients.
|
|
|
if shareSet.Length() > 0 {
|
|
|
- shareCoefficients, err = computeShareCoeffs(properties, options, aggSet)
|
|
|
- if err != nil {
|
|
|
- log.Warningf("AllocationSet.AggregateBy: compute shared coeff: missing cluster ID: %s", err)
|
|
|
- return err
|
|
|
- }
|
|
|
-
|
|
|
for _, alloc := range aggSet.allocations {
|
|
|
- if alloc.IsIdle() {
|
|
|
- // Skip idle allocations (they do not receive shared allocation)
|
|
|
- continue
|
|
|
- }
|
|
|
-
|
|
|
- // Distribute shared allocations by coefficient per-allocation
|
|
|
- // NOTE: share coefficients do not partition by cluster, like
|
|
|
- // idle coefficients do.
|
|
|
for _, sharedAlloc := range shareSet.allocations {
|
|
|
if _, ok := shareCoefficients[alloc.Name]; !ok {
|
|
|
- log.Errorf("ETL: share allocation: error getting allocation coefficienct for '%s'", alloc.Name)
|
|
|
+ log.Warningf("AllocationSet.AggregateBy: error getting share coefficienct for '%s'", alloc.Name)
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
@@ -818,7 +896,7 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- // (8) Aggregate external allocations into aggregated allocations. This may
|
|
|
+ // (9) Aggregate external allocations into aggregated allocations. This may
|
|
|
// not be possible for every external allocation, but attempt to find an
|
|
|
// exact key match, given each external allocation's proerties, and
|
|
|
// aggregate if an exact match is found.
|
|
|
@@ -841,7 +919,7 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- // (9) Combine all idle allocations into a single "__idle__" allocation
|
|
|
+ // (10) Combine all idle allocations into a single "__idle__" allocation
|
|
|
if !options.SplitIdle {
|
|
|
for _, idleAlloc := range aggSet.IdleAllocations() {
|
|
|
aggSet.Delete(idleAlloc.Name)
|
|
|
@@ -867,19 +945,44 @@ func computeShareCoeffs(properties Properties, options *AllocationAggregationOpt
|
|
|
shareType := options.ShareSplit
|
|
|
|
|
|
// Record allocation values first, then normalize by totals to get percentages
|
|
|
- for name, alloc := range as.allocations {
|
|
|
+ for n, alloc := range as.allocations {
|
|
|
if alloc.IsIdle() {
|
|
|
// Skip idle allocations in coefficient calculation
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
+ // Determine the post-aggregation key under which the allocation will
|
|
|
+ // be shared.
|
|
|
+ name, err := alloc.generateKey(properties)
|
|
|
+ if err != nil {
|
|
|
+ return nil, fmt.Errorf(`failed to generate key for shared allocation "%s": %s`, n, err)
|
|
|
+ }
|
|
|
+
|
|
|
+ // If the current allocation will be filtered out in step 3, contribute
|
|
|
+ // its share of the shared coefficient to a "__filtered__" bin, which
|
|
|
+ // will ultimately be dropped. This step ensures that the shared cost
|
|
|
+ // of a non-filtered allocation will be conserved even when the filter
|
|
|
+ // is removed. (Otherwise, all the shared cost will get redistributed
|
|
|
+ // over the unfiltered results, inflating their shared costs.)
|
|
|
+ filtered := false
|
|
|
+ for _, ff := range options.FilterFuncs {
|
|
|
+ if !ff(alloc) {
|
|
|
+ filtered = true
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if filtered {
|
|
|
+ name = "__filtered__"
|
|
|
+ }
|
|
|
+
|
|
|
if shareType == ShareEven {
|
|
|
- // Not additive - set to 1.0 for even distribution
|
|
|
+ // Even distribution is not additive - set to 1.0 for everything
|
|
|
coeffs[name] = 1.0
|
|
|
- // Total is always additive
|
|
|
- total += 1.0
|
|
|
+ // Total for even distribution is always the number of coefficients
|
|
|
+ total = float64(len(coeffs))
|
|
|
} else {
|
|
|
- // Both are additive for weighted distribution
|
|
|
+ // Both are additive for weighted distribution, where each
|
|
|
+ // cumulative coefficient will be divided by the total.
|
|
|
coeffs[name] += alloc.TotalCost()
|
|
|
total += alloc.TotalCost()
|
|
|
}
|
|
|
@@ -898,7 +1001,7 @@ func computeShareCoeffs(properties Properties, options *AllocationAggregationOpt
|
|
|
return coeffs, nil
|
|
|
}
|
|
|
|
|
|
-func computeIdleCoeffs(properties Properties, options *AllocationAggregationOptions, as *AllocationSet) (map[string]map[string]map[string]float64, error) {
|
|
|
+func computeIdleCoeffs(properties Properties, options *AllocationAggregationOptions, as *AllocationSet, shareSet *AllocationSet) (map[string]map[string]map[string]float64, error) {
|
|
|
types := []string{"cpu", "gpu", "ram"}
|
|
|
|
|
|
// Compute idle coefficients, then save them in AllocationAggregationOptions
|
|
|
@@ -918,17 +1021,50 @@ func computeIdleCoeffs(properties Properties, options *AllocationAggregationOpti
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
- // If any of the share funcs succeed, share the allocation. Do this
|
|
|
- // prior to filtering so that shared namespaces, etc do not get
|
|
|
- // filtered out before we have a chance to share them.
|
|
|
- skip := false
|
|
|
- for _, sf := range options.ShareFuncs {
|
|
|
- if sf(alloc) {
|
|
|
- skip = true
|
|
|
- break
|
|
|
+ // We need to key the allocations by cluster id
|
|
|
+ clusterID, err := alloc.Properties.GetCluster()
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ // get the name key for the allocation
|
|
|
+ name := alloc.Name
|
|
|
+
|
|
|
+ // Create cluster based tables if they don't exist
|
|
|
+ if _, ok := coeffs[clusterID]; !ok {
|
|
|
+ coeffs[clusterID] = map[string]map[string]float64{}
|
|
|
+ }
|
|
|
+ if _, ok := totals[clusterID]; !ok {
|
|
|
+ totals[clusterID] = map[string]float64{}
|
|
|
+ }
|
|
|
+
|
|
|
+ if _, ok := coeffs[clusterID][name]; !ok {
|
|
|
+ coeffs[clusterID][name] = map[string]float64{}
|
|
|
+ }
|
|
|
+
|
|
|
+ if shareType == ShareEven {
|
|
|
+ for _, r := range types {
|
|
|
+ // Not additive - hard set to 1.0
|
|
|
+ coeffs[clusterID][name][r] = 1.0
|
|
|
+
|
|
|
+ // totals are additive
|
|
|
+ totals[clusterID][r] += 1.0
|
|
|
}
|
|
|
+ } else {
|
|
|
+ coeffs[clusterID][name]["cpu"] += alloc.CPUCost
|
|
|
+ coeffs[clusterID][name]["gpu"] += alloc.GPUCost
|
|
|
+ coeffs[clusterID][name]["ram"] += alloc.RAMCost
|
|
|
+
|
|
|
+ totals[clusterID]["cpu"] += alloc.CPUCost
|
|
|
+ totals[clusterID]["gpu"] += alloc.GPUCost
|
|
|
+ totals[clusterID]["ram"] += alloc.RAMCost
|
|
|
}
|
|
|
- if skip {
|
|
|
+ }
|
|
|
+
|
|
|
+ // Do the same for shared allocations
|
|
|
+ for _, alloc := range shareSet.allocations {
|
|
|
+ if alloc.IsIdle() {
|
|
|
+ // Skip idle allocations in coefficient calculation
|
|
|
continue
|
|
|
}
|
|
|
|