|
|
@@ -11,6 +11,14 @@ import (
|
|
|
"github.com/kubecost/cost-model/pkg/log"
|
|
|
)
|
|
|
|
|
|
+// TODO Clean-up use of IsEmpty; nil checks should be separated for safety.
|
|
|
+
|
|
|
+// TODO Consider making Allocation an interface, which is fulfilled by structs
|
|
|
+// like KubernetesAllocation, IdleAllocation, and ExternalAllocation.
|
|
|
+
|
|
|
+// ExternalSuffix indicates an external allocation
|
|
|
+const ExternalSuffix = "__external__"
|
|
|
+
|
|
|
// IdleSuffix indicates an idle allocation property
|
|
|
const IdleSuffix = "__idle__"
|
|
|
|
|
|
@@ -53,9 +61,9 @@ type Allocation struct {
|
|
|
RAMCost float64 `json:"ramCost"`
|
|
|
RAMEfficiency float64 `json:"ramEfficiency"`
|
|
|
SharedCost float64 `json:"sharedCost"`
|
|
|
+ ExternalCost float64 `json:"externalCost"`
|
|
|
TotalCost float64 `json:"totalCost"`
|
|
|
TotalEfficiency float64 `json:"totalEfficiency"`
|
|
|
- // Profiler *log.Profiler `json:"-"`
|
|
|
}
|
|
|
|
|
|
// AllocationMatchFunc is a function that can be used to match Allocations by
|
|
|
@@ -75,7 +83,6 @@ func (a *Allocation) Add(that *Allocation) (*Allocation, error) {
|
|
|
}
|
|
|
|
|
|
agg := a.Clone()
|
|
|
- // agg.Profiler = a.Profiler
|
|
|
agg.add(that, false, false)
|
|
|
|
|
|
return agg, nil
|
|
|
@@ -106,11 +113,14 @@ func (a *Allocation) Clone() *Allocation {
|
|
|
RAMCost: a.RAMCost,
|
|
|
RAMEfficiency: a.RAMEfficiency,
|
|
|
SharedCost: a.SharedCost,
|
|
|
+ ExternalCost: a.ExternalCost,
|
|
|
TotalCost: a.TotalCost,
|
|
|
TotalEfficiency: a.TotalEfficiency,
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+// Equal returns true if the values held in the given Allocation precisely
|
|
|
+// match those of the receiving Allocation. nil does not match nil.
|
|
|
func (a *Allocation) Equal(that *Allocation) bool {
|
|
|
if a == nil || that == nil {
|
|
|
return false
|
|
|
@@ -167,6 +177,9 @@ func (a *Allocation) Equal(that *Allocation) bool {
|
|
|
if a.SharedCost != that.SharedCost {
|
|
|
return false
|
|
|
}
|
|
|
+ if a.ExternalCost != that.ExternalCost {
|
|
|
+ return false
|
|
|
+ }
|
|
|
if a.TotalCost != that.TotalCost {
|
|
|
return false
|
|
|
}
|
|
|
@@ -191,6 +204,11 @@ func (a *Allocation) IsAggregated() bool {
|
|
|
return a == nil || a.Properties == nil
|
|
|
}
|
|
|
|
|
|
+// IsExternal is true if the given Allocation represents external costs.
|
|
|
+func (a *Allocation) IsExternal() bool {
|
|
|
+ return strings.Contains(a.Name, ExternalSuffix)
|
|
|
+}
|
|
|
+
|
|
|
// IsIdle is true if the given Allocation represents idle costs.
|
|
|
func (a *Allocation) IsIdle() bool {
|
|
|
return strings.Contains(a.Name, IdleSuffix)
|
|
|
@@ -201,45 +219,6 @@ func (a *Allocation) IsUnallocated() bool {
|
|
|
return strings.Contains(a.Name, UnallocatedSuffix)
|
|
|
}
|
|
|
|
|
|
-// MatchesFilter returns true if the Allocation passes the given AllocationFilter
|
|
|
-func (a *Allocation) MatchesFilter(f AllocationMatchFunc) bool {
|
|
|
- return f(a)
|
|
|
-}
|
|
|
-
|
|
|
-// MatchesAll takes a variadic list of Properties, returning true iff the
|
|
|
-// Allocation matches each set of Properties.
|
|
|
-func (a *Allocation) MatchesAll(ps ...Properties) bool {
|
|
|
- // nil Allocation don't match any Properties
|
|
|
- if a == nil {
|
|
|
- return false
|
|
|
- }
|
|
|
-
|
|
|
- for _, p := range ps {
|
|
|
- if !a.Properties.Matches(p) {
|
|
|
- return false
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- return true
|
|
|
-}
|
|
|
-
|
|
|
-// MatchesOne takes a variadic list of Properties, returning true iff the
|
|
|
-// Allocation matches at least one of the set of Properties.
|
|
|
-func (a *Allocation) MatchesOne(ps ...Properties) bool {
|
|
|
- // nil Allocation don't match any Properties
|
|
|
- if a == nil {
|
|
|
- return false
|
|
|
- }
|
|
|
-
|
|
|
- for _, p := range ps {
|
|
|
- if a.Properties.Matches(p) {
|
|
|
- return true
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- return false
|
|
|
-}
|
|
|
-
|
|
|
// Share works like Add, but converts the entire cost of the given Allocation
|
|
|
// to SharedCost, rather than adding to the individual resource costs.
|
|
|
func (a *Allocation) Share(that *Allocation) (*Allocation, error) {
|
|
|
@@ -267,13 +246,7 @@ func (a *Allocation) String() string {
|
|
|
|
|
|
func (a *Allocation) add(that *Allocation, isShared, isAccumulating bool) {
|
|
|
if a == nil {
|
|
|
- a = that
|
|
|
-
|
|
|
- // reset properties
|
|
|
- thatCluster, _ := that.Properties.GetCluster()
|
|
|
- thatNode, _ := that.Properties.GetNode()
|
|
|
- a.Properties = Properties{ClusterProp: thatCluster, NodeProp: thatNode}
|
|
|
-
|
|
|
+ log.Warningf("Allocation.AggregateBy: trying to add a nil receiver")
|
|
|
return
|
|
|
}
|
|
|
|
|
|
@@ -349,6 +322,7 @@ func (a *Allocation) add(that *Allocation, isShared, isAccumulating bool) {
|
|
|
}
|
|
|
|
|
|
a.SharedCost += that.SharedCost
|
|
|
+ a.ExternalCost += that.ExternalCost
|
|
|
a.CPUCost += that.CPUCost
|
|
|
a.GPUCost += that.GPUCost
|
|
|
a.NetworkCost += that.NetworkCost
|
|
|
@@ -363,20 +337,22 @@ func (a *Allocation) add(that *Allocation, isShared, isAccumulating bool) {
|
|
|
// a window. An AllocationSet is mutable, so treat it like a threadsafe map.
|
|
|
type AllocationSet struct {
|
|
|
sync.RWMutex
|
|
|
- // Profiler *log.Profiler
|
|
|
- allocations map[string]*Allocation
|
|
|
- idleKeys map[string]bool
|
|
|
- Window Window
|
|
|
- Warnings []string
|
|
|
- Errors []string
|
|
|
+ allocations map[string]*Allocation
|
|
|
+ externalKeys map[string]bool
|
|
|
+ idleKeys map[string]bool
|
|
|
+ Window Window
|
|
|
+ Warnings []string
|
|
|
+ Errors []string
|
|
|
}
|
|
|
|
|
|
// NewAllocationSet instantiates a new AllocationSet and, optionally, inserts
|
|
|
// the given list of Allocations
|
|
|
func NewAllocationSet(start, end time.Time, allocs ...*Allocation) *AllocationSet {
|
|
|
as := &AllocationSet{
|
|
|
- allocations: map[string]*Allocation{},
|
|
|
- Window: NewWindow(&start, &end),
|
|
|
+ allocations: map[string]*Allocation{},
|
|
|
+ externalKeys: map[string]bool{},
|
|
|
+ idleKeys: map[string]bool{},
|
|
|
+ Window: NewWindow(&start, &end),
|
|
|
}
|
|
|
|
|
|
for _, a := range allocs {
|
|
|
@@ -407,20 +383,25 @@ type AllocationAggregationOptions struct {
|
|
|
// given Property; e.g. Containers can be divided by Namespace, but not vice-a-versa.
|
|
|
func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationAggregationOptions) error {
|
|
|
// The order of operations for aggregating allocations is as follows:
|
|
|
- // 1. move shared and/or idle allocations to separate sets if options
|
|
|
- // indicate that they should be shared
|
|
|
- // 2. idle coefficients
|
|
|
- // 2.a) if idle allocation is to be shared, compute idle coefficients
|
|
|
- // (do not compute shared coefficients here, see step 5)
|
|
|
- // 2.b) if idle allocation is NOT shared, but filters are present, compute
|
|
|
- // idle filtration coefficients for the purpose of only returning the
|
|
|
- // portion of idle allocation that would have been shared with the
|
|
|
- // unfiltered results set. (See unit tests 5.a,b,c)
|
|
|
- // 3. ignore allocation if it fails any of the FilterFuncs
|
|
|
- // 4. generate aggregation key and insert allocation into the output set
|
|
|
- // 5. if there are shared allocations, compute sharing coefficients on
|
|
|
+ // 1. Partition external, idle, and shared allocations into separate sets
|
|
|
+ // 2. Compute idle coefficients (if necessary)
|
|
|
+ // a) if idle allocation is to be shared, compute idle coefficients
|
|
|
+ // (do not compute shared coefficients here, see step 5)
|
|
|
+ // b) if idle allocation is NOT shared, but filters are present, compute
|
|
|
+ // idle filtration coefficients for the purpose of only returning the
|
|
|
+ // portion of idle allocation that would have been shared with the
|
|
|
+ // unfiltered results set. (See unit tests 5.a,b,c)
|
|
|
+ // 3. Ignore allocation if it fails any of the FilterFuncs
|
|
|
+ // 4. Distribute idle allocations among remaining non-idle, non-external
|
|
|
+ // allocations
|
|
|
+ // 5. Generate aggregation key and insert allocation into the output set
|
|
|
+ // 6. Scale un-aggregated idle coefficients by filtration coefficient
|
|
|
+ // 7. If there are shared allocations, compute sharing coefficients on
|
|
|
// the aggregated set, then share allocation accordingly
|
|
|
- // 6. if the merge idle option is enabled, merge any remaining idle
|
|
|
+ // 8. If there are external allocations that can be aggregated into
|
|
|
+ // the output (i.e. they can be used to generate a valid key for
|
|
|
+ // the given properties) then aggregate; otherwise... ignore them?
|
|
|
+ // 9. If the merge idle option is enabled, merge any remaining idle
|
|
|
// allocations into a single idle allocation
|
|
|
|
|
|
// TODO niko/etl revisit (ShareIdle: ShareEven) case, which is probably wrong
|
|
|
@@ -436,24 +417,27 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
|
|
|
|
|
|
// aggSet will collect the aggregated allocations
|
|
|
aggSet := &AllocationSet{
|
|
|
- // Profiler: as.Profiler,
|
|
|
+ Window: as.Window.Clone(),
|
|
|
+ }
|
|
|
+
|
|
|
+ // externalSet will collect external allocations
|
|
|
+ externalSet := &AllocationSet{
|
|
|
Window: as.Window.Clone(),
|
|
|
}
|
|
|
|
|
|
// idleSet will be shared among aggSet after initial aggregation
|
|
|
// is complete
|
|
|
idleSet := &AllocationSet{
|
|
|
- // Profiler: as.Profiler,
|
|
|
Window: as.Window.Clone(),
|
|
|
}
|
|
|
|
|
|
// shareSet will be shared among aggSet after initial aggregation
|
|
|
// is complete
|
|
|
shareSet := &AllocationSet{
|
|
|
- // Profiler: as.Profiler
|
|
|
Window: as.Window.Clone(),
|
|
|
}
|
|
|
|
|
|
+ // Convert SharedHourlyCosts to Allocations in the shareSet
|
|
|
for name, cost := range options.SharedHourlyCosts {
|
|
|
if cost > 0.0 {
|
|
|
hours := as.Resolution().Hours()
|
|
|
@@ -479,21 +463,29 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
|
|
|
as.Lock()
|
|
|
defer as.Unlock()
|
|
|
|
|
|
- // Loop and find all of the idle and shared allocations initially. Add
|
|
|
- // them to their respective sets, removing them from the set of
|
|
|
- // allocations to aggregate.
|
|
|
+ // (1) Loop and find all of the external, idle, and shared allocations. Add
|
|
|
+ // them to their respective sets, removing them from the set of allocations
|
|
|
+ // to aggregate.
|
|
|
for _, alloc := range as.allocations {
|
|
|
+ // External allocations get aggregated post-hoc (see step 6) and do
|
|
|
+ // not necessarily contain complete sets of properties, so they are
|
|
|
+ // moved to a separate AllocationSet.
|
|
|
+ if alloc.IsExternal() {
|
|
|
+ delete(as.externalKeys, alloc.Name)
|
|
|
+ delete(as.allocations, alloc.Name)
|
|
|
+ externalSet.Insert(alloc)
|
|
|
+ continue
|
|
|
+ }
|
|
|
+
|
|
|
cluster, err := alloc.Properties.GetCluster()
|
|
|
if err != nil {
|
|
|
log.Warningf("AllocationSet.AggregateBy: missing cluster for allocation: %s", alloc.Name)
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
- // Idle allocation doesn't get aggregated, so it can be passed through,
|
|
|
- // whether or not it is shared. If it is shared, it is put in idleSet
|
|
|
- // because shareSet may be split by different rules (even/weighted).
|
|
|
+ // Idle allocations should be separated into idleSet if they are to be
|
|
|
+ // shared later on. If they are not to be shared, then aggregate them.
|
|
|
if alloc.IsIdle() {
|
|
|
- // Can't recursively call Delete() due to lock acquisition
|
|
|
delete(as.idleKeys, alloc.Name)
|
|
|
delete(as.allocations, alloc.Name)
|
|
|
|
|
|
@@ -502,14 +494,15 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
|
|
|
} else {
|
|
|
aggSet.Insert(alloc)
|
|
|
}
|
|
|
+
|
|
|
+ continue
|
|
|
}
|
|
|
|
|
|
- // If any of the share funcs succeed, share the allocation. Do this
|
|
|
- // prior to filtering so that shared namespaces, etc do not get
|
|
|
- // filtered out before we have a chance to share them.
|
|
|
+ // Shared allocations must be identified and separated prior to
|
|
|
+ // aggregation and filtering. That is, if any of the ShareFuncs
|
|
|
+ // return true, then move the allocation to shareSet.
|
|
|
for _, sf := range options.ShareFuncs {
|
|
|
if sf(alloc) {
|
|
|
- // Can't recursively call Delete() due to lock acquisition
|
|
|
delete(as.idleKeys, alloc.Name)
|
|
|
delete(as.allocations, alloc.Name)
|
|
|
|
|
|
@@ -520,6 +513,8 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ // It's possible that no more un-shared, non-idle, non-external allocations
|
|
|
+ // remain at this point. This always results in an emptySet.
|
|
|
if len(as.allocations) == 0 {
|
|
|
log.Warningf("ETL: AggregateBy: no allocations to aggregate")
|
|
|
emptySet := &AllocationSet{
|
|
|
@@ -529,24 +524,30 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
- // In order to correctly apply idle and shared resource coefficients appropriately,
|
|
|
- // we need to determine the coefficients for the full set of data. The ensures that
|
|
|
- // the ratios are maintained through filtering.
|
|
|
+ // (2) In order to correctly apply idle and shared resource coefficients
|
|
|
+ // appropriately, we need to determine the coefficients for the full set
|
|
|
+ // of data. The ensures that the ratios are maintained through filtering.
|
|
|
+
|
|
|
// idleCoefficients are organized by [cluster][allocation][resource]=coeff
|
|
|
var idleCoefficients map[string]map[string]map[string]float64
|
|
|
+
|
|
|
// shareCoefficients are organized by [allocation][resource]=coeff (no cluster)
|
|
|
var shareCoefficients map[string]float64
|
|
|
+
|
|
|
var err error
|
|
|
|
|
|
+ // (2a) If there are idle costs and we intend to share them, compute the
|
|
|
+ // coefficients for sharing the cost among the non-idle, non-aggregated
|
|
|
+ // allocations.
|
|
|
if idleSet.Length() > 0 && options.ShareIdle != ShareNone {
|
|
|
idleCoefficients, err = computeIdleCoeffs(properties, options, as)
|
|
|
if err != nil {
|
|
|
log.Warningf("AllocationSet.AggregateBy: compute idle coeff: %s", err)
|
|
|
- return err
|
|
|
+ return fmt.Errorf("error computing idle coefficients: %s", err)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- // If we're not sharing idle and we're filtering, we need to track the
|
|
|
+ // (2b) If we're not sharing idle and we're filtering, we need to track the
|
|
|
// amount of each idle allocation to "delete" in order to maintain parity
|
|
|
// with the idle-allocated results. That is, we want to return only the
|
|
|
// idle cost that would have been shared with the unfiltered portion of
|
|
|
@@ -556,10 +557,11 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
|
|
|
idleFiltrationCoefficients, err = computeIdleCoeffs(properties, options, as)
|
|
|
if err != nil {
|
|
|
log.Warningf("AllocationSet.AggregateBy: compute idle coeff: %s", err)
|
|
|
- return err
|
|
|
+ return fmt.Errorf("error computing idle filtration coefficients: %s", err)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ // (3-5) Filter, distribute idle cost, and aggregate (in that order)
|
|
|
for _, alloc := range as.allocations {
|
|
|
cluster, err := alloc.Properties.GetCluster()
|
|
|
if err != nil {
|
|
|
@@ -569,7 +571,7 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
|
|
|
|
|
|
skip := false
|
|
|
|
|
|
- // If any of the filter funcs fail, immediately skip the allocation.
|
|
|
+ // (3) If any of the filter funcs fail, immediately skip the allocation.
|
|
|
for _, ff := range options.FilterFuncs {
|
|
|
if !ff(alloc) {
|
|
|
skip = true
|
|
|
@@ -590,9 +592,11 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
- // Split idle allocations and distribute among aggregated allocations
|
|
|
- // NOTE: if idle allocation is off (i.e. ShareIdle == ShareNone) then all
|
|
|
- // idle allocations will be in the aggSet at this point.
|
|
|
+ // (4) Split idle allocations and distribute among remaining
|
|
|
+ // un-aggregated allocations.
|
|
|
+ // NOTE: if idle allocation is off (i.e. ShareIdle == ShareNone) then
|
|
|
+ // all idle allocations will be in the aggSet at this point, so idleSet
|
|
|
+ // will be empty and we won't enter this block.
|
|
|
if idleSet.Length() > 0 {
|
|
|
// Distribute idle allocations by coefficient per-cluster, per-allocation
|
|
|
for _, idleAlloc := range idleSet.allocations {
|
|
|
@@ -630,6 +634,7 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ // (5) generate key to use for aggregation-by-key and allocation name
|
|
|
key, err := alloc.generateKey(properties)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
@@ -640,9 +645,15 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
|
|
|
alloc.Name = UnallocatedSuffix
|
|
|
}
|
|
|
|
|
|
+ // Inserting the allocation with the generated key for a name will
|
|
|
+ // perform the actual basic aggregation step.
|
|
|
aggSet.Insert(alloc)
|
|
|
}
|
|
|
|
|
|
+ // clusterIdleFiltrationCoeffs is used to track per-resource idle
|
|
|
+ // coefficients on a cluster-by-cluster basis. It is, essentailly, an
|
|
|
+ // aggregation of idleFiltrationCoefficients after they have been
|
|
|
+ // filtered above (in step 3)
|
|
|
var clusterIdleFiltrationCoeffs map[string]map[string]float64
|
|
|
if idleFiltrationCoefficients != nil {
|
|
|
clusterIdleFiltrationCoeffs = map[string]map[string]float64{}
|
|
|
@@ -664,9 +675,10 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- // If we have filters, and so have computed coefficients for scaling idle
|
|
|
- // allocation costs by cluster, then use those coefficients to scale down
|
|
|
- // each idle coefficient in the aggSet.
|
|
|
+ // (6) If we have both un-shared idle allocations and idle filtration
|
|
|
+ // coefficients (i.e. we have computed coefficients for scaling idle
|
|
|
+ // allocation costs by cluster) then use those coefficients to scale down
|
|
|
+ // each idle allocation.
|
|
|
if len(aggSet.idleKeys) > 0 && clusterIdleFiltrationCoeffs != nil {
|
|
|
for idleKey := range aggSet.idleKeys {
|
|
|
idleAlloc := aggSet.Get(idleKey)
|
|
|
@@ -687,7 +699,7 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- // Split shared allocations and distribute among aggregated allocations
|
|
|
+ // (7) Split shared allocations and distribute among aggregated allocations
|
|
|
if shareSet.Length() > 0 {
|
|
|
shareCoefficients, err = computeShareCoeffs(properties, options, aggSet)
|
|
|
if err != nil {
|
|
|
@@ -716,7 +728,21 @@ func (as *AllocationSet) AggregateBy(properties Properties, options *AllocationA
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- // Combine all idle allocations into a single "__idle__" allocation
|
|
|
+ // (8) Aggregate external allocations into aggregated allocations. This may
|
|
|
+ // not be possible for every external allocation, but attempt to find an
|
|
|
+ // exact key match, given each external allocation's proerties, and
|
|
|
+ // aggregate if an exact match is found.
|
|
|
+ for _, alloc := range externalSet.allocations {
|
|
|
+ key, err := alloc.generateKey(properties)
|
|
|
+ if err != nil {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+
|
|
|
+ alloc.Name = key
|
|
|
+ aggSet.Insert(alloc)
|
|
|
+ }
|
|
|
+
|
|
|
+ // (9) Combine all idle allocations into a single "__idle__" allocation
|
|
|
if !options.SplitIdle {
|
|
|
for _, idleAlloc := range aggSet.IdleAllocations() {
|
|
|
aggSet.Delete(idleAlloc.Name)
|
|
|
@@ -1034,6 +1060,7 @@ func (alloc *Allocation) generateKey(properties Properties) (string, error) {
|
|
|
return strings.Join(names, "/"), nil
|
|
|
}
|
|
|
|
|
|
+// TODO clean up
|
|
|
// Helper function to check for slice membership. Not sure if repeated elsewhere in our codebase.
|
|
|
func indexOf(v string, arr []string) int {
|
|
|
for i, s := range arr {
|
|
|
@@ -1060,12 +1087,133 @@ func (as *AllocationSet) Clone() *AllocationSet {
|
|
|
allocs[k] = v.Clone()
|
|
|
}
|
|
|
|
|
|
+ externalKeys := map[string]bool{}
|
|
|
+ for k, v := range as.externalKeys {
|
|
|
+ externalKeys[k] = v
|
|
|
+ }
|
|
|
+
|
|
|
+ idleKeys := map[string]bool{}
|
|
|
+ for k, v := range as.idleKeys {
|
|
|
+ idleKeys[k] = v
|
|
|
+ }
|
|
|
+
|
|
|
return &AllocationSet{
|
|
|
- allocations: allocs,
|
|
|
- Window: as.Window.Clone(),
|
|
|
+ allocations: allocs,
|
|
|
+ externalKeys: externalKeys,
|
|
|
+ idleKeys: idleKeys,
|
|
|
+ Window: as.Window.Clone(),
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+// ComputeIdleAllocations computes the idle allocations for the AllocationSet,
|
|
|
+// given a set of Assets. Ideally, assetSet should contain only Nodes, but if
|
|
|
+// it contains other Assets, they will be ignored; only CPU, GPU and RAM are
|
|
|
+// considered for idle allocation. One idle allocation per-cluster will be
|
|
|
+// computed and returned, keyed by cluster_id.
|
|
|
+func (as *AllocationSet) ComputeIdleAllocations(assetSet *AssetSet) (map[string]*Allocation, error) {
|
|
|
+ if as == nil {
|
|
|
+ return nil, fmt.Errorf("cannot compute idle allocation for nil AllocationSet")
|
|
|
+ }
|
|
|
+
|
|
|
+ // TODO: external allocation: remove after testing and benchmarking
|
|
|
+ profStart := time.Now()
|
|
|
+ defer log.Profile(profStart, fmt.Sprintf("ComputeIdleAllocations: %s", as.Window))
|
|
|
+
|
|
|
+ if assetSet == nil {
|
|
|
+ return nil, fmt.Errorf("cannot compute idle allocation with nil AssetSet")
|
|
|
+ }
|
|
|
+
|
|
|
+ if !as.Window.Equal(assetSet.Window) {
|
|
|
+ return nil, fmt.Errorf("cannot compute idle allocation for sets with mismatched windows: %s != %s", as.Window, assetSet.Window)
|
|
|
+ }
|
|
|
+
|
|
|
+ window := as.Window
|
|
|
+
|
|
|
+ // Build a map of cumulative cluster asset costs, per resource; i.e.
|
|
|
+ // cluster-to-{cpu|gpu|ram}-to-cost.
|
|
|
+ assetClusterResourceCosts := map[string]map[string]float64{}
|
|
|
+ assetSet.Each(func(key string, a Asset) {
|
|
|
+ if node, ok := a.(*Node); ok {
|
|
|
+ if _, ok := assetClusterResourceCosts[node.Properties().Cluster]; !ok {
|
|
|
+ assetClusterResourceCosts[node.Properties().Cluster] = map[string]float64{}
|
|
|
+ }
|
|
|
+ assetClusterResourceCosts[node.Properties().Cluster]["cpu"] += node.CPUCost * (1.0 - node.Discount)
|
|
|
+ assetClusterResourceCosts[node.Properties().Cluster]["gpu"] += node.GPUCost * (1.0 - node.Discount)
|
|
|
+ assetClusterResourceCosts[node.Properties().Cluster]["ram"] += node.RAMCost * (1.0 - node.Discount)
|
|
|
+ }
|
|
|
+ })
|
|
|
+
|
|
|
+ // Determine start, end on a per-cluster basis
|
|
|
+ clusterStarts := map[string]time.Time{}
|
|
|
+ clusterEnds := map[string]time.Time{}
|
|
|
+
|
|
|
+ // Subtract allocated costs from asset costs, leaving only the remaining
|
|
|
+ // idle costs.
|
|
|
+ as.Each(func(name string, a *Allocation) {
|
|
|
+ cluster, err := a.Properties.GetCluster()
|
|
|
+ if err != nil {
|
|
|
+ // Failed to find allocation's cluster
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ if _, ok := assetClusterResourceCosts[cluster]; !ok {
|
|
|
+ // Failed to find assets for allocation's cluster
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ // Set cluster (start, end) if they are either not currently set,
|
|
|
+ // or if the detected (start, end) of the current allocation falls
|
|
|
+ // before or after, respectively, the current values.
|
|
|
+ if s, ok := clusterStarts[cluster]; !ok || a.Start.Before(s) {
|
|
|
+ clusterStarts[cluster] = a.Start
|
|
|
+ }
|
|
|
+ if e, ok := clusterEnds[cluster]; !ok || a.End.Before(e) {
|
|
|
+ clusterEnds[cluster] = a.End
|
|
|
+ }
|
|
|
+
|
|
|
+ assetClusterResourceCosts[cluster]["cpu"] -= a.CPUCost
|
|
|
+ assetClusterResourceCosts[cluster]["gpu"] -= a.GPUCost
|
|
|
+ assetClusterResourceCosts[cluster]["ram"] -= a.RAMCost
|
|
|
+ })
|
|
|
+
|
|
|
+ // Turn remaining un-allocated asset costs into idle allocations
|
|
|
+ idleAllocs := map[string]*Allocation{}
|
|
|
+ for cluster, resources := range assetClusterResourceCosts {
|
|
|
+ // Default start and end to the (start, end) of the given window, but
|
|
|
+ // use the actual, detected (start, end) pair if they are available.
|
|
|
+ start := *window.Start()
|
|
|
+ if s, ok := clusterStarts[cluster]; ok && window.Contains(s) {
|
|
|
+ start = s
|
|
|
+ }
|
|
|
+ end := *window.End()
|
|
|
+ if e, ok := clusterEnds[cluster]; ok && window.Contains(e) {
|
|
|
+ end = e
|
|
|
+ }
|
|
|
+
|
|
|
+ idleAlloc := &Allocation{
|
|
|
+ Name: fmt.Sprintf("%s/%s", cluster, IdleSuffix),
|
|
|
+ Properties: Properties{ClusterProp: cluster},
|
|
|
+ Start: start,
|
|
|
+ End: end,
|
|
|
+ Minutes: end.Sub(start).Minutes(), // TODO deprecate w/ niko/allocation-minutes
|
|
|
+ CPUCost: resources["cpu"],
|
|
|
+ GPUCost: resources["gpu"],
|
|
|
+ RAMCost: resources["ram"],
|
|
|
+ }
|
|
|
+ idleAlloc.TotalCost = idleAlloc.CPUCost + idleAlloc.GPUCost + idleAlloc.RAMCost
|
|
|
+
|
|
|
+ // Do not continue if multiple idle allocations are computed for a
|
|
|
+ // single cluster.
|
|
|
+ if _, ok := idleAllocs[cluster]; ok {
|
|
|
+ return nil, fmt.Errorf("duplicate idle allocations for cluster %s", cluster)
|
|
|
+ }
|
|
|
+
|
|
|
+ idleAllocs[cluster] = idleAlloc
|
|
|
+ }
|
|
|
+
|
|
|
+ return idleAllocs, nil
|
|
|
+}
|
|
|
+
|
|
|
// Delete removes the allocation with the given name from the set
|
|
|
func (as *AllocationSet) Delete(name string) {
|
|
|
if as == nil {
|
|
|
@@ -1074,6 +1222,7 @@ func (as *AllocationSet) Delete(name string) {
|
|
|
|
|
|
as.Lock()
|
|
|
defer as.Unlock()
|
|
|
+ delete(as.externalKeys, name)
|
|
|
delete(as.idleKeys, name)
|
|
|
delete(as.allocations, name)
|
|
|
}
|
|
|
@@ -1114,6 +1263,44 @@ func (as *AllocationSet) Get(key string) *Allocation {
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
+// ExternalAllocations returns a map of the external allocations in the set.
|
|
|
+// Returns clones of the actual Allocations, so mutability is not a problem.
|
|
|
+func (as *AllocationSet) ExternalAllocations() map[string]*Allocation {
|
|
|
+ externals := map[string]*Allocation{}
|
|
|
+
|
|
|
+ if as.IsEmpty() {
|
|
|
+ return externals
|
|
|
+ }
|
|
|
+
|
|
|
+ as.RLock()
|
|
|
+ defer as.RUnlock()
|
|
|
+
|
|
|
+ for key := range as.externalKeys {
|
|
|
+ if alloc, ok := as.allocations[key]; ok {
|
|
|
+ externals[key] = alloc.Clone()
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return externals
|
|
|
+}
|
|
|
+
|
|
|
+// ExternalCost returns the total aggregated external costs of the set
|
|
|
+func (as *AllocationSet) ExternalCost() float64 {
|
|
|
+ if as.IsEmpty() {
|
|
|
+ return 0.0
|
|
|
+ }
|
|
|
+
|
|
|
+ as.RLock()
|
|
|
+ defer as.RUnlock()
|
|
|
+
|
|
|
+ externalCost := 0.0
|
|
|
+ for _, alloc := range as.allocations {
|
|
|
+ externalCost += alloc.ExternalCost
|
|
|
+ }
|
|
|
+
|
|
|
+ return externalCost
|
|
|
+}
|
|
|
+
|
|
|
// IdleAllocations returns a map of the idle allocations in the AllocationSet.
|
|
|
// Returns clones of the actual Allocations, so mutability is not a problem.
|
|
|
func (as *AllocationSet) IdleAllocations() map[string]*Allocation {
|
|
|
@@ -1143,16 +1330,25 @@ func (as *AllocationSet) Insert(that *Allocation) error {
|
|
|
}
|
|
|
|
|
|
func (as *AllocationSet) insert(that *Allocation, accumulate bool) error {
|
|
|
- if as.IsEmpty() {
|
|
|
- as.Lock()
|
|
|
- as.allocations = map[string]*Allocation{}
|
|
|
- as.idleKeys = map[string]bool{}
|
|
|
- as.Unlock()
|
|
|
+ if as == nil {
|
|
|
+ return fmt.Errorf("cannot insert into nil AllocationSet")
|
|
|
}
|
|
|
|
|
|
as.Lock()
|
|
|
defer as.Unlock()
|
|
|
|
|
|
+ if as.allocations == nil {
|
|
|
+ as.allocations = map[string]*Allocation{}
|
|
|
+ }
|
|
|
+
|
|
|
+ if as.externalKeys == nil {
|
|
|
+ as.externalKeys = map[string]bool{}
|
|
|
+ }
|
|
|
+
|
|
|
+ if as.idleKeys == nil {
|
|
|
+ as.idleKeys = map[string]bool{}
|
|
|
+ }
|
|
|
+
|
|
|
// Add the given Allocation to the existing entry, if there is one;
|
|
|
// otherwise just set directly into allocations
|
|
|
if _, ok := as.allocations[that.Name]; !ok {
|
|
|
@@ -1161,6 +1357,11 @@ func (as *AllocationSet) insert(that *Allocation, accumulate bool) error {
|
|
|
as.allocations[that.Name].add(that, false, accumulate)
|
|
|
}
|
|
|
|
|
|
+ // If the given Allocation is an external one, record that
|
|
|
+ if that.IsExternal() {
|
|
|
+ as.externalKeys[that.Name] = true
|
|
|
+ }
|
|
|
+
|
|
|
// If the given Allocation is an idle one, record that
|
|
|
if that.IsIdle() {
|
|
|
as.idleKeys[that.Name] = true
|
|
|
@@ -1213,10 +1414,13 @@ func (as *AllocationSet) Resolution() time.Duration {
|
|
|
return as.Window.Duration()
|
|
|
}
|
|
|
|
|
|
+// Set uses the given Allocation to overwrite the existing entry in the
|
|
|
+// AllocationSet under the Allocation's name.
|
|
|
func (as *AllocationSet) Set(alloc *Allocation) error {
|
|
|
if as.IsEmpty() {
|
|
|
as.Lock()
|
|
|
as.allocations = map[string]*Allocation{}
|
|
|
+ as.externalKeys = map[string]bool{}
|
|
|
as.idleKeys = map[string]bool{}
|
|
|
as.Unlock()
|
|
|
}
|
|
|
@@ -1226,6 +1430,11 @@ func (as *AllocationSet) Set(alloc *Allocation) error {
|
|
|
|
|
|
as.allocations[alloc.Name] = alloc
|
|
|
|
|
|
+ // If the given Allocation is an external one, record that
|
|
|
+ if alloc.IsExternal() {
|
|
|
+ as.externalKeys[alloc.Name] = true
|
|
|
+ }
|
|
|
+
|
|
|
// If the given Allocation is an idle one, record that
|
|
|
if alloc.IsIdle() {
|
|
|
as.idleKeys[alloc.Name] = true
|
|
|
@@ -1272,6 +1481,7 @@ func (as *AllocationSet) TotalCost() float64 {
|
|
|
return tc
|
|
|
}
|
|
|
|
|
|
+// UTCOffset returns the AllocationSet's configured UTCOffset.
|
|
|
func (as *AllocationSet) UTCOffset() time.Duration {
|
|
|
_, zone := as.Start().Zone()
|
|
|
return time.Duration(zone) * time.Second
|
|
|
@@ -1339,11 +1549,17 @@ func (as *AllocationSet) accumulate(that *AllocationSet) (*AllocationSet, error)
|
|
|
return acc, nil
|
|
|
}
|
|
|
|
|
|
+// AllocationSetRange is a thread-safe slice of AllocationSets. It is meant to
|
|
|
+// be used such that the AllocationSets held are consecutive and coherent with
|
|
|
+// respect to using the same aggregation properties, UTC offset, and
|
|
|
+// resolution. However these rules are not necessarily enforced, so use wisely.
|
|
|
type AllocationSetRange struct {
|
|
|
sync.RWMutex
|
|
|
allocations []*AllocationSet
|
|
|
}
|
|
|
|
|
|
+// NewAllocationSetRange instantiates a new range composed of the given
|
|
|
+// AllocationSets in the order provided.
|
|
|
func NewAllocationSetRange(allocs ...*AllocationSet) *AllocationSetRange {
|
|
|
return &AllocationSetRange{
|
|
|
allocations: allocs,
|
|
|
@@ -1372,6 +1588,8 @@ func (asr *AllocationSetRange) Accumulate() (*AllocationSet, error) {
|
|
|
// TODO niko/etl accumulate into lower-resolution chunks of the given resolution
|
|
|
// func (asr *AllocationSetRange) AccumulateBy(resolution time.Duration) *AllocationSetRange
|
|
|
|
|
|
+// AggregateBy aggregates each AllocationSet in the range by the given
|
|
|
+// properties and options.
|
|
|
func (asr *AllocationSetRange) AggregateBy(properties Properties, options *AllocationAggregationOptions) error {
|
|
|
aggRange := &AllocationSetRange{allocations: []*AllocationSet{}}
|
|
|
|
|
|
@@ -1391,6 +1609,8 @@ func (asr *AllocationSetRange) AggregateBy(properties Properties, options *Alloc
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
+// Append appends the given AllocationSet to the end of the range. It does not
|
|
|
+// validate whether or not that violates window continuity.
|
|
|
func (asr *AllocationSetRange) Append(that *AllocationSet) {
|
|
|
asr.Lock()
|
|
|
defer asr.Unlock()
|
|
|
@@ -1408,6 +1628,7 @@ func (asr *AllocationSetRange) Each(f func(int, *AllocationSet)) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+// Get retrieves the AllocationSet at the given index of the range.
|
|
|
func (asr *AllocationSetRange) Get(i int) (*AllocationSet, error) {
|
|
|
if i < 0 || i >= len(asr.allocations) {
|
|
|
return nil, fmt.Errorf("AllocationSetRange: index out of range: %d", i)
|
|
|
@@ -1418,6 +1639,64 @@ func (asr *AllocationSetRange) Get(i int) (*AllocationSet, error) {
|
|
|
return asr.allocations[i], nil
|
|
|
}
|
|
|
|
|
|
+// InsertRange merges the given AllocationSetRange into the receiving one by
|
|
|
+// lining up sets with matching windows, then inserting each allocation from
|
|
|
+// the given ASR into the respective set in the receiving ASR. If the given
|
|
|
+// ASR contains an AllocationSet from a window that does not exist in the
|
|
|
+// receiving ASR, then an error is returned. However, the given ASR does not
|
|
|
+// need to cover the full range of the receiver.
|
|
|
+func (asr *AllocationSetRange) InsertRange(that *AllocationSetRange) error {
|
|
|
+ if asr == nil {
|
|
|
+ return fmt.Errorf("cannot insert range into nil AllocationSetRange")
|
|
|
+ }
|
|
|
+
|
|
|
+ // keys maps window to index in asr
|
|
|
+ keys := map[string]int{}
|
|
|
+ asr.Each(func(i int, as *AllocationSet) {
|
|
|
+ if as == nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ keys[as.Window.String()] = i
|
|
|
+ })
|
|
|
+
|
|
|
+ // Nothing to merge, so simply return
|
|
|
+ if len(keys) == 0 {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+
|
|
|
+ var err error
|
|
|
+ that.Each(func(j int, thatAS *AllocationSet) {
|
|
|
+ if thatAS == nil || err != nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ // Find matching AllocationSet in asr
|
|
|
+ i, ok := keys[thatAS.Window.String()]
|
|
|
+ if !ok {
|
|
|
+ err = fmt.Errorf("cannot merge AllocationSet into window that does not exist: %s", thatAS.Window.String())
|
|
|
+ return
|
|
|
+ }
|
|
|
+ as, err := asr.Get(i)
|
|
|
+ if err != nil {
|
|
|
+ err = fmt.Errorf("AllocationSetRange index does not exist: %d", i)
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ // Insert each Allocation from the given set
|
|
|
+ thatAS.Each(func(k string, alloc *Allocation) {
|
|
|
+ err = as.Insert(alloc)
|
|
|
+ if err != nil {
|
|
|
+ err = fmt.Errorf("error inserting allocation: %s", err)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ })
|
|
|
+ })
|
|
|
+
|
|
|
+ // err might be nil
|
|
|
+ return err
|
|
|
+}
|
|
|
+
|
|
|
+// Length returns the length of the range, which is zero if nil
|
|
|
func (asr *AllocationSetRange) Length() int {
|
|
|
if asr == nil || asr.allocations == nil {
|
|
|
return 0
|
|
|
@@ -1428,12 +1707,15 @@ func (asr *AllocationSetRange) Length() int {
|
|
|
return len(asr.allocations)
|
|
|
}
|
|
|
|
|
|
+// MarshalJSON JSON-encodes the range
|
|
|
func (asr *AllocationSetRange) MarshalJSON() ([]byte, error) {
|
|
|
asr.RLock()
|
|
|
asr.RUnlock()
|
|
|
return json.Marshal(asr.allocations)
|
|
|
}
|
|
|
|
|
|
+// Slice copies the underlying slice of AllocationSets, maintaining order,
|
|
|
+// and returns the copied slice.
|
|
|
func (asr *AllocationSetRange) Slice() []*AllocationSet {
|
|
|
if asr == nil || asr.allocations == nil {
|
|
|
return nil
|
|
|
@@ -1456,6 +1738,9 @@ func (asr *AllocationSetRange) String() string {
|
|
|
return fmt.Sprintf("AllocationSetRange{length: %d}", asr.Length())
|
|
|
}
|
|
|
|
|
|
+// UTCOffset returns the detected UTCOffset of the AllocationSets within the
|
|
|
+// range. Defaults to 0 if the range is nil or empty. Does not warn if there
|
|
|
+// are sets with conflicting UTCOffsets (just returns the first).
|
|
|
func (asr *AllocationSetRange) UTCOffset() time.Duration {
|
|
|
if asr.Length() == 0 {
|
|
|
return 0
|