|
@@ -436,32 +436,29 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
|
|
|
shareSet := &AllocationSet{
|
|
shareSet := &AllocationSet{
|
|
|
Window: as.Window.Clone(),
|
|
Window: as.Window.Clone(),
|
|
|
}
|
|
}
|
|
|
- flatShareSet := &AllocationSet{
|
|
|
|
|
- Window: as.Window.Clone(),
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- // Convert SharedHourlyCosts to Allocations in the shareSet
|
|
|
|
|
- for name, cost := range options.SharedHourlyCosts {
|
|
|
|
|
- if cost > 0.0 {
|
|
|
|
|
- hours := as.Resolution().Hours()
|
|
|
|
|
-
|
|
|
|
|
- // If set ends in the future, adjust hours accordingly
|
|
|
|
|
- diff := time.Now().Sub(as.End())
|
|
|
|
|
- if diff < 0.0 {
|
|
|
|
|
- hours += diff.Hours()
|
|
|
|
|
- }
|
|
|
|
|
|
|
|
|
|
- totalSharedCost := cost * hours
|
|
|
|
|
-
|
|
|
|
|
- flatShareSet.Insert(&Allocation{
|
|
|
|
|
- Name: fmt.Sprintf("%s/%s", name, SharedSuffix),
|
|
|
|
|
- Start: as.Start(),
|
|
|
|
|
- End: as.End(),
|
|
|
|
|
- SharedCost: totalSharedCost,
|
|
|
|
|
- TotalCost: totalSharedCost,
|
|
|
|
|
- })
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ // // Convert SharedHourlyCosts to Allocations in the shareSet
|
|
|
|
|
+ // for name, cost := range options.SharedHourlyCosts {
|
|
|
|
|
+ // if cost > 0.0 {
|
|
|
|
|
+ // hours := as.Resolution().Hours()
|
|
|
|
|
+
|
|
|
|
|
+ // // If set ends in the future, adjust hours accordingly
|
|
|
|
|
+ // diff := time.Now().Sub(as.End())
|
|
|
|
|
+ // if diff < 0.0 {
|
|
|
|
|
+ // hours += diff.Hours()
|
|
|
|
|
+ // }
|
|
|
|
|
+
|
|
|
|
|
+ // totalSharedCost := cost * hours
|
|
|
|
|
+
|
|
|
|
|
+ // shareSet.Insert(&Allocation{
|
|
|
|
|
+ // Name: fmt.Sprintf("%s/%s", name, SharedSuffix),
|
|
|
|
|
+ // Start: as.Start(),
|
|
|
|
|
+ // End: as.End(),
|
|
|
|
|
+ // SharedCost: totalSharedCost,
|
|
|
|
|
+ // TotalCost: totalSharedCost,
|
|
|
|
|
+ // })
|
|
|
|
|
+ // }
|
|
|
|
|
+ // }
|
|
|
|
|
|
|
|
as.Lock()
|
|
as.Lock()
|
|
|
defer as.Unlock()
|
|
defer as.Unlock()
|
|
@@ -480,11 +477,11 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
|
|
|
continue
|
|
continue
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- cluster, err := alloc.Properties.GetCluster()
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- log.Warningf("AllocationSet.AggregateBy: missing cluster for allocation: %s", alloc.Name)
|
|
|
|
|
- return err
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ // cluster, err := alloc.Properties.GetCluster()
|
|
|
|
|
+ // if err != nil {
|
|
|
|
|
+ // log.Warningf("AllocationSet.AggregateBy: missing cluster for allocation: %s", alloc.Name)
|
|
|
|
|
+ // return err
|
|
|
|
|
+ // }
|
|
|
|
|
|
|
|
// Idle allocations should be separated into idleSet if they are to be
|
|
// Idle allocations should be separated into idleSet if they are to be
|
|
|
// shared later on. If they are not to be shared, then aggregate them.
|
|
// shared later on. If they are not to be shared, then aggregate them.
|
|
@@ -509,7 +506,7 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
|
|
|
delete(as.idleKeys, alloc.Name)
|
|
delete(as.idleKeys, alloc.Name)
|
|
|
delete(as.allocations, alloc.Name)
|
|
delete(as.allocations, alloc.Name)
|
|
|
|
|
|
|
|
- alloc.Name = fmt.Sprintf("%s/%s", cluster, SharedSuffix)
|
|
|
|
|
|
|
+ // alloc.Name = fmt.Sprintf("%s/%s", cluster, SharedSuffix)
|
|
|
shareSet.Insert(alloc)
|
|
shareSet.Insert(alloc)
|
|
|
break
|
|
break
|
|
|
}
|
|
}
|
|
@@ -519,7 +516,7 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
|
|
|
// It's possible that no more un-shared, non-idle, non-external allocations
|
|
// It's possible that no more un-shared, non-idle, non-external allocations
|
|
|
// remain at this point. This always results in an emptySet.
|
|
// remain at this point. This always results in an emptySet.
|
|
|
if len(as.allocations) == 0 {
|
|
if len(as.allocations) == 0 {
|
|
|
- log.Warningf("ETL: AggregateBy: no allocations to aggregate")
|
|
|
|
|
|
|
+ log.Warningf("AllocationSet.AggregateBy: no allocations to aggregate")
|
|
|
emptySet := &AllocationSet{
|
|
emptySet := &AllocationSet{
|
|
|
Window: as.Window.Clone(),
|
|
Window: as.Window.Clone(),
|
|
|
}
|
|
}
|
|
@@ -534,9 +531,8 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
|
|
|
// idleCoefficients are organized by [cluster][allocation][resource]=coeff
|
|
// idleCoefficients are organized by [cluster][allocation][resource]=coeff
|
|
|
var idleCoefficients map[string]map[string]map[string]float64
|
|
var idleCoefficients map[string]map[string]map[string]float64
|
|
|
|
|
|
|
|
- // shareCoefficients are organized by [allocation][resource]=coeff (no cluster)
|
|
|
|
|
|
|
+ // shareCoefficients are organized by [allocation]=coeff
|
|
|
var shareCoefficients map[string]float64
|
|
var shareCoefficients map[string]float64
|
|
|
- var flatShareCoefficients map[string]float64
|
|
|
|
|
|
|
|
|
|
var err error
|
|
var err error
|
|
|
|
|
|
|
@@ -544,20 +540,13 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
|
|
|
// coefficients for sharing the cost among the non-idle, non-aggregated
|
|
// coefficients for sharing the cost among the non-idle, non-aggregated
|
|
|
// allocations.
|
|
// allocations.
|
|
|
if idleSet.Length() > 0 && options.ShareIdle != ShareNone {
|
|
if idleSet.Length() > 0 && options.ShareIdle != ShareNone {
|
|
|
- idleCoefficients, err = computeIdleCoeffs(properties, options, as)
|
|
|
|
|
|
|
+ idleCoefficients, err = computeIdleCoeffs(properties, options, as, shareSet)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
log.Warningf("AllocationSet.AggregateBy: compute idle coeff: %s", err)
|
|
log.Warningf("AllocationSet.AggregateBy: compute idle coeff: %s", err)
|
|
|
return fmt.Errorf("error computing idle coefficients: %s", err)
|
|
return fmt.Errorf("error computing idle coefficients: %s", err)
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- flatShareCoefficients, err = computeShareCoeffs(properties, options, as, false)
|
|
|
|
|
- log.Errorf("%+v", flatShareCoefficients)
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- log.Warningf("AllocationSet.AggregateBy: compute shared coeff: missing cluster ID: %s", err)
|
|
|
|
|
- return err
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
// (2b) If we're not sharing idle and we're filtering, we need to track the
|
|
// (2b) If we're not sharing idle and we're filtering, we need to track the
|
|
|
// amount of each idle allocation to "delete" in order to maintain parity
|
|
// amount of each idle allocation to "delete" in order to maintain parity
|
|
|
// with the idle-allocated results. That is, we want to return only the
|
|
// with the idle-allocated results. That is, we want to return only the
|
|
@@ -565,13 +554,44 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
|
|
|
// the results, not the full idle cost.
|
|
// the results, not the full idle cost.
|
|
|
var idleFiltrationCoefficients map[string]map[string]map[string]float64
|
|
var idleFiltrationCoefficients map[string]map[string]map[string]float64
|
|
|
if len(options.FilterFuncs) > 0 && options.ShareIdle == ShareNone {
|
|
if len(options.FilterFuncs) > 0 && options.ShareIdle == ShareNone {
|
|
|
- idleFiltrationCoefficients, err = computeIdleCoeffs(properties, options, as)
|
|
|
|
|
|
|
+ idleFiltrationCoefficients, err = computeIdleCoeffs(properties, options, as, shareSet)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
- log.Warningf("AllocationSet.AggregateBy: compute idle coeff: %s", err)
|
|
|
|
|
return fmt.Errorf("error computing idle filtration coefficients: %s", err)
|
|
return fmt.Errorf("error computing idle filtration coefficients: %s", err)
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ // Convert SharedHourlyCosts to Allocations in the shareSet
|
|
|
|
|
+ // TODO comment: why do we have to do this here, after computing idle?
|
|
|
|
|
+ for name, cost := range options.SharedHourlyCosts {
|
|
|
|
|
+ if cost > 0.0 {
|
|
|
|
|
+ hours := as.Resolution().Hours()
|
|
|
|
|
+
|
|
|
|
|
+ // If set ends in the future, adjust hours accordingly
|
|
|
|
|
+ diff := time.Now().Sub(as.End())
|
|
|
|
|
+ if diff < 0.0 {
|
|
|
|
|
+ hours += diff.Hours()
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ totalSharedCost := cost * hours
|
|
|
|
|
+
|
|
|
|
|
+ shareSet.Insert(&Allocation{
|
|
|
|
|
+ Name: fmt.Sprintf("%s/%s", name, SharedSuffix),
|
|
|
|
|
+ Start: as.Start(),
|
|
|
|
|
+ End: as.End(),
|
|
|
|
|
+ SharedCost: totalSharedCost,
|
|
|
|
|
+ TotalCost: totalSharedCost,
|
|
|
|
|
+ })
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // (2c) Compute share coefficients (TODO comment)
|
|
|
|
|
+ if shareSet.Length() > 0 {
|
|
|
|
|
+ shareCoefficients, err = computeShareCoeffs(properties, options, as)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return fmt.Errorf("error computing share coefficients: %s", err)
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
// (3-5) Filter, distribute idle cost, and aggregate (in that order)
|
|
// (3-5) Filter, distribute idle cost, and aggregate (in that order)
|
|
|
for _, alloc := range as.allocations {
|
|
for _, alloc := range as.allocations {
|
|
|
cluster, err := alloc.Properties.GetCluster()
|
|
cluster, err := alloc.Properties.GetCluster()
|
|
@@ -623,11 +643,11 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
|
|
|
|
|
|
|
|
// Make sure idle coefficients exist
|
|
// Make sure idle coefficients exist
|
|
|
if _, ok := idleCoefficients[cluster]; !ok {
|
|
if _, ok := idleCoefficients[cluster]; !ok {
|
|
|
- log.Errorf("ETL: share (idle) allocation: error getting allocation coefficient [no cluster: '%s' in coefficients] for '%s'", cluster, alloc.Name)
|
|
|
|
|
|
|
+ log.Warningf("AllocationSet.AggregateBy: error getting idle coefficient: no cluster '%s' for '%s'", cluster, alloc.Name)
|
|
|
continue
|
|
continue
|
|
|
}
|
|
}
|
|
|
if _, ok := idleCoefficients[cluster][alloc.Name]; !ok {
|
|
if _, ok := idleCoefficients[cluster][alloc.Name]; !ok {
|
|
|
- log.Errorf("ETL: share (idle) allocation: error getting allocation coefficienct for '%s'", alloc.Name)
|
|
|
|
|
|
|
+ log.Warningf("AllocationSet.AggregateBy: error getting idle coefficient for '%s'", alloc.Name)
|
|
|
continue
|
|
continue
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -661,6 +681,52 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
|
|
|
aggSet.Insert(alloc)
|
|
aggSet.Insert(alloc)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ // TODO comment
|
|
|
|
|
+ if idleSet.Length() > 0 {
|
|
|
|
|
+ for _, alloc := range shareSet.allocations {
|
|
|
|
|
+ cluster, err := alloc.Properties.GetCluster()
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ log.Warningf("AllocationSet.AggregateBy: missing cluster for allocation: %s", alloc.Name)
|
|
|
|
|
+ return err
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // Distribute idle allocations by coefficient per-cluster, per-allocation
|
|
|
|
|
+ for _, idleAlloc := range idleSet.allocations {
|
|
|
|
|
+ // Only share idle if the cluster matches; i.e. the allocation
|
|
|
|
|
+ // is from the same cluster as the idle costs
|
|
|
|
|
+ idleCluster, err := idleAlloc.Properties.GetCluster()
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return err
|
|
|
|
|
+ }
|
|
|
|
|
+ if idleCluster != cluster {
|
|
|
|
|
+ continue
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // Make sure idle coefficients exist
|
|
|
|
|
+ if _, ok := idleCoefficients[cluster]; !ok {
|
|
|
|
|
+ log.Warningf("AllocationSet.AggregateBy: error getting idle coefficient: no cluster '%s' for '%s'", cluster, alloc.Name)
|
|
|
|
|
+ continue
|
|
|
|
|
+ }
|
|
|
|
|
+ if _, ok := idleCoefficients[cluster][alloc.Name]; !ok {
|
|
|
|
|
+ log.Warningf("AllocationSet.AggregateBy: error getting idle coefficient for '%s'", alloc.Name)
|
|
|
|
|
+ continue
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ alloc.CPUCoreHours += idleAlloc.CPUCoreHours * idleCoefficients[cluster][alloc.Name]["cpu"]
|
|
|
|
|
+ alloc.GPUHours += idleAlloc.GPUHours * idleCoefficients[cluster][alloc.Name]["gpu"]
|
|
|
|
|
+ alloc.RAMByteHours += idleAlloc.RAMByteHours * idleCoefficients[cluster][alloc.Name]["ram"]
|
|
|
|
|
+
|
|
|
|
|
+ idleCPUCost := idleAlloc.CPUCost * idleCoefficients[cluster][alloc.Name]["cpu"]
|
|
|
|
|
+ idleGPUCost := idleAlloc.GPUCost * idleCoefficients[cluster][alloc.Name]["gpu"]
|
|
|
|
|
+ idleRAMCost := idleAlloc.RAMCost * idleCoefficients[cluster][alloc.Name]["ram"]
|
|
|
|
|
+ alloc.CPUCost += idleCPUCost
|
|
|
|
|
+ alloc.GPUCost += idleGPUCost
|
|
|
|
|
+ alloc.RAMCost += idleRAMCost
|
|
|
|
|
+ alloc.TotalCost += idleCPUCost + idleGPUCost + idleRAMCost
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
// clusterIdleFiltrationCoeffs is used to track per-resource idle
|
|
// clusterIdleFiltrationCoeffs is used to track per-resource idle
|
|
|
// coefficients on a cluster-by-cluster basis. It is, essentailly, an
|
|
// coefficients on a cluster-by-cluster basis. It is, essentailly, an
|
|
|
// aggregation of idleFiltrationCoefficients after they have been
|
|
// aggregation of idleFiltrationCoefficients after they have been
|
|
@@ -711,34 +777,7 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// (7) Split shared allocations and distribute among aggregated allocations
|
|
// (7) Split shared allocations and distribute among aggregated allocations
|
|
|
- if shareSet.Length() > 0 || flatShareSet.Length() > 0 {
|
|
|
|
|
- shareCoefficients, err = computeShareCoeffs(properties, options, aggSet, true)
|
|
|
|
|
- log.Errorf("Share Coefficients %+v", shareCoefficients)
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- log.Warningf("AllocationSet.AggregateBy: compute shared coeff: missing cluster ID: %s", err)
|
|
|
|
|
- return err
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- for _, alloc := range aggSet.allocations {
|
|
|
|
|
- if alloc.IsIdle() {
|
|
|
|
|
- // Skip idle allocations (they do not receive shared allocation)
|
|
|
|
|
- continue
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- // Distribute shared allocations by coefficient per-allocation
|
|
|
|
|
- // NOTE: share coefficients do not partition by cluster, like
|
|
|
|
|
- // idle coefficients do.
|
|
|
|
|
- for _, sharedAlloc := range flatShareSet.allocations {
|
|
|
|
|
- if _, ok := flatShareCoefficients[alloc.Name]; !ok {
|
|
|
|
|
- log.Errorf("ETL: flat share allocation: error getting allocation coefficienct for '%s'", alloc.Name)
|
|
|
|
|
- continue
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- alloc.SharedCost += sharedAlloc.TotalCost * flatShareCoefficients[alloc.Name]
|
|
|
|
|
- alloc.TotalCost += sharedAlloc.TotalCost * flatShareCoefficients[alloc.Name]
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
|
|
+ if shareSet.Length() > 0 {
|
|
|
for _, alloc := range aggSet.allocations {
|
|
for _, alloc := range aggSet.allocations {
|
|
|
if alloc.IsIdle() {
|
|
if alloc.IsIdle() {
|
|
|
// Skip idle allocations (they do not receive shared allocation)
|
|
// Skip idle allocations (they do not receive shared allocation)
|
|
@@ -797,8 +836,7 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
|
|
|
return nil
|
|
return nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-// TODO niko/etl deprecate the use of a map of resources here, we only use totals
|
|
|
|
|
-func computeShareCoeffs(properties Properties, options *AllocationAggregationOptions, as *AllocationSet, aggregated bool) (map[string]float64, error) {
|
|
|
|
|
|
|
+func computeShareCoeffs(properties Properties, options *AllocationAggregationOptions, as *AllocationSet) (map[string]float64, error) {
|
|
|
// Compute coeffs by totalling per-allocation, then dividing by the total.
|
|
// Compute coeffs by totalling per-allocation, then dividing by the total.
|
|
|
coeffs := map[string]float64{}
|
|
coeffs := map[string]float64{}
|
|
|
|
|
|
|
@@ -810,22 +848,44 @@ func computeShareCoeffs(properties Properties, options *AllocationAggregationOpt
|
|
|
shareType := options.ShareSplit
|
|
shareType := options.ShareSplit
|
|
|
|
|
|
|
|
// Record allocation values first, then normalize by totals to get percentages
|
|
// Record allocation values first, then normalize by totals to get percentages
|
|
|
- for name, alloc := range as.allocations {
|
|
|
|
|
|
|
+ for n, alloc := range as.allocations {
|
|
|
if alloc.IsIdle() {
|
|
if alloc.IsIdle() {
|
|
|
// Skip idle allocations in coefficient calculation
|
|
// Skip idle allocations in coefficient calculation
|
|
|
continue
|
|
continue
|
|
|
}
|
|
}
|
|
|
- if !aggregated {
|
|
|
|
|
- name, _ = alloc.generateKey(properties)
|
|
|
|
|
|
|
+
|
|
|
|
|
+ // Determine the post-aggregation key under which the allocation will
|
|
|
|
|
+ // be shared.
|
|
|
|
|
+ name, err := alloc.generateKey(properties)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return nil, fmt.Errorf(`failed to generate key for shared allocation "%s": %s`, n, err)
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // If the current allocation will be filtered out in step 3, contribute
|
|
|
|
|
+ // its share of the shared coefficient to a "__filtered__" bin, which
|
|
|
|
|
+ // will ultimately be dropped. This step ensures that the shared cost
|
|
|
|
|
+ // of a non-filtered allocation will be conserved even when the filter
|
|
|
|
|
+ // is removed. (Otherwise, all the shared cost will get redistributed
|
|
|
|
|
+ // over the unfiltered results, inflating their shared costs.)
|
|
|
|
|
+ filtered := false
|
|
|
|
|
+ for _, ff := range options.FilterFuncs {
|
|
|
|
|
+ if !ff(alloc) {
|
|
|
|
|
+ filtered = true
|
|
|
|
|
+ break
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ if filtered {
|
|
|
|
|
+ name = "__filtered__"
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
if shareType == ShareEven {
|
|
if shareType == ShareEven {
|
|
|
- // Not additive - set to 1.0 for even distribution
|
|
|
|
|
|
|
+ // Even distribution is not additive - set to 1.0 for everything
|
|
|
coeffs[name] = 1.0
|
|
coeffs[name] = 1.0
|
|
|
- // Total is always additive
|
|
|
|
|
- total += 1.0
|
|
|
|
|
|
|
+ // Total for even distribution is always the number of coefficients
|
|
|
|
|
+ total = float64(len(coeffs))
|
|
|
} else {
|
|
} else {
|
|
|
- // Both are additive for weighted distribution
|
|
|
|
|
|
|
+ // Both are additive for weighted distribution, where each
|
|
|
|
|
+ // cumulative coefficient will be divided by the total.
|
|
|
coeffs[name] += alloc.TotalCost
|
|
coeffs[name] += alloc.TotalCost
|
|
|
total += alloc.TotalCost
|
|
total += alloc.TotalCost
|
|
|
}
|
|
}
|
|
@@ -844,7 +904,7 @@ func computeShareCoeffs(properties Properties, options *AllocationAggregationOpt
|
|
|
return coeffs, nil
|
|
return coeffs, nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func computeIdleCoeffs(properties Properties, options *AllocationAggregationOptions, as *AllocationSet) (map[string]map[string]map[string]float64, error) {
|
|
|
|
|
|
|
+func computeIdleCoeffs(properties Properties, options *AllocationAggregationOptions, as *AllocationSet, shareSet *AllocationSet) (map[string]map[string]map[string]float64, error) {
|
|
|
types := []string{"cpu", "gpu", "ram"}
|
|
types := []string{"cpu", "gpu", "ram"}
|
|
|
|
|
|
|
|
// Compute idle coefficients, then save them in AllocationAggregationOptions
|
|
// Compute idle coefficients, then save them in AllocationAggregationOptions
|
|
@@ -864,17 +924,50 @@ func computeIdleCoeffs(properties Properties, options *AllocationAggregationOpti
|
|
|
continue
|
|
continue
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- // If any of the share funcs succeed, share the allocation. Do this
|
|
|
|
|
- // prior to filtering so that shared namespaces, etc do not get
|
|
|
|
|
- // filtered out before we have a chance to share them.
|
|
|
|
|
- skip := false
|
|
|
|
|
- for _, sf := range options.ShareFuncs {
|
|
|
|
|
- if sf(alloc) {
|
|
|
|
|
- skip = true
|
|
|
|
|
- break
|
|
|
|
|
|
|
+ // We need to key the allocations by cluster id
|
|
|
|
|
+ clusterID, err := alloc.Properties.GetCluster()
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return nil, err
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // get the name key for the allocation
|
|
|
|
|
+ name := alloc.Name
|
|
|
|
|
+
|
|
|
|
|
+ // Create cluster based tables if they don't exist
|
|
|
|
|
+ if _, ok := coeffs[clusterID]; !ok {
|
|
|
|
|
+ coeffs[clusterID] = map[string]map[string]float64{}
|
|
|
|
|
+ }
|
|
|
|
|
+ if _, ok := totals[clusterID]; !ok {
|
|
|
|
|
+ totals[clusterID] = map[string]float64{}
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ if _, ok := coeffs[clusterID][name]; !ok {
|
|
|
|
|
+ coeffs[clusterID][name] = map[string]float64{}
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ if shareType == ShareEven {
|
|
|
|
|
+ for _, r := range types {
|
|
|
|
|
+ // Not additive - hard set to 1.0
|
|
|
|
|
+ coeffs[clusterID][name][r] = 1.0
|
|
|
|
|
+
|
|
|
|
|
+ // totals are additive
|
|
|
|
|
+ totals[clusterID][r] += 1.0
|
|
|
}
|
|
}
|
|
|
|
|
+ } else {
|
|
|
|
|
+ coeffs[clusterID][name]["cpu"] += alloc.CPUCost
|
|
|
|
|
+ coeffs[clusterID][name]["gpu"] += alloc.GPUCost
|
|
|
|
|
+ coeffs[clusterID][name]["ram"] += alloc.RAMCost
|
|
|
|
|
+
|
|
|
|
|
+ totals[clusterID]["cpu"] += alloc.CPUCost
|
|
|
|
|
+ totals[clusterID]["gpu"] += alloc.GPUCost
|
|
|
|
|
+ totals[clusterID]["ram"] += alloc.RAMCost
|
|
|
}
|
|
}
|
|
|
- if skip {
|
|
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // Do the same for shared allocations
|
|
|
|
|
+ for _, alloc := range shareSet.allocations {
|
|
|
|
|
+ if alloc.IsIdle() {
|
|
|
|
|
+ // Skip idle allocations in coefficient calculation
|
|
|
continue
|
|
continue
|
|
|
}
|
|
}
|
|
|
|
|
|