|
|
@@ -111,6 +111,8 @@ func (a *Allocation) Clone() *Allocation {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+// Equal returns true if the values held in the given Allocation precisely
|
|
|
+// match those of the receiving Allocation. nil does not match nil.
|
|
|
func (a *Allocation) Equal(that *Allocation) bool {
|
|
|
if a == nil || that == nil {
|
|
|
return false
|
|
|
@@ -995,8 +997,6 @@ func (alloc *Allocation) generateKey(properties Properties) (string, error) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- // TODO aggregate by annotation
|
|
|
-
|
|
|
return strings.Join(names, "/"), nil
|
|
|
}
|
|
|
|
|
|
@@ -1032,6 +1032,115 @@ func (as *AllocationSet) Clone() *AllocationSet {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+// ComputeIdleAllocations computes the idle allocations for the AllocationSet,
|
|
|
+// given a set of Assets. Ideally, assetSet should contain only Nodes, but if
|
|
|
+// it contains other Assets, they will be ignored; only CPU, GPU and RAM are
|
|
|
+// considered for idle allocation. One idle allocation per-cluster will be
|
|
|
+// computed and returned, keyed by cluster_id.
|
|
|
+func (as *AllocationSet) ComputeIdleAllocations(assetSet *AssetSet) (map[string]*Allocation, error) {
|
|
|
+ if as == nil {
|
|
|
+ return nil, fmt.Errorf("cannot compute idle allocation for nil AllocationSet")
|
|
|
+ }
|
|
|
+
|
|
|
+ // TODO niko/allocation-etl remove after testing and benchmarking
|
|
|
+ profStart := time.Now()
|
|
|
+ defer log.Profile(profStart, fmt.Sprintf("ComputeIdleAllocations: %s", as.Window))
|
|
|
+
|
|
|
+ if assetSet == nil {
|
|
|
+ return nil, fmt.Errorf("cannot compute idle allocation with nil AssetSet")
|
|
|
+ }
|
|
|
+
|
|
|
+ if !as.Window.Equal(assetSet.Window) {
|
|
|
+ return nil, fmt.Errorf("cannot compute idle allocation for sets with mismatched windows: %s != %s", as.Window, assetSet.Window)
|
|
|
+ }
|
|
|
+
|
|
|
+ window := as.Window
|
|
|
+
|
|
|
+ // Build a map of cumulative cluster asset costs, per resource; i.e.
|
|
|
+ // cluster-to-{cpu|gpu|ram}-to-cost.
|
|
|
+ assetClusterResourceCosts := map[string]map[string]float64{}
|
|
|
+ assetSet.Each(func(key string, a Asset) {
|
|
|
+ if node, ok := a.(*Node); ok {
|
|
|
+ if _, ok := assetClusterResourceCosts[node.Properties().Cluster]; !ok {
|
|
|
+ assetClusterResourceCosts[node.Properties().Cluster] = map[string]float64{}
|
|
|
+ }
|
|
|
+ assetClusterResourceCosts[node.Properties().Cluster]["cpu"] += node.CPUCost * (1.0 - node.Discount)
|
|
|
+ assetClusterResourceCosts[node.Properties().Cluster]["gpu"] += node.GPUCost * (1.0 - node.Discount)
|
|
|
+ assetClusterResourceCosts[node.Properties().Cluster]["ram"] += node.RAMCost * (1.0 - node.Discount)
|
|
|
+ }
|
|
|
+ })
|
|
|
+
|
|
|
+ // Determine start, end on a per-cluster basis
|
|
|
+ clusterStarts := map[string]time.Time{}
|
|
|
+ clusterEnds := map[string]time.Time{}
|
|
|
+
|
|
|
+ // Subtract allocated costs from asset costs, leaving only the remaining
|
|
|
+ // idle costs.
|
|
|
+ as.Each(func(name string, a *Allocation) {
|
|
|
+ cluster, err := a.Properties.GetCluster()
|
|
|
+ if err != nil {
|
|
|
+ // Failed to find allocation's cluster
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ if _, ok := assetClusterResourceCosts[cluster]; !ok {
|
|
|
+ // Failed to find assets for allocation's cluster
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ // Set cluster (start, end) if they are either not currently set,
|
|
|
+ // or if the detected (start, end) of the current allocation falls
|
|
|
+ // before or after, respectively, the current values.
|
|
|
+ if s, ok := clusterStarts[cluster]; !ok || a.Start.Before(s) {
|
|
|
+ clusterStarts[cluster] = a.Start
|
|
|
+ }
|
|
|
+ if e, ok := clusterEnds[cluster]; !ok || a.End.Before(e) {
|
|
|
+ clusterEnds[cluster] = a.End
|
|
|
+ }
|
|
|
+
|
|
|
+ assetClusterResourceCosts[cluster]["cpu"] -= a.CPUCost
|
|
|
+ assetClusterResourceCosts[cluster]["gpu"] -= a.GPUCost
|
|
|
+ assetClusterResourceCosts[cluster]["ram"] -= a.RAMCost
|
|
|
+ })
|
|
|
+
|
|
|
+ // Turn remaining un-allocated asset costs into idle allocations
|
|
|
+ idleAllocs := map[string]*Allocation{}
|
|
|
+ for cluster, resources := range assetClusterResourceCosts {
|
|
|
+ // Default start and end to the (start, end) of the given window, but
|
|
|
+ // use the actual, detected (start, end) pair if they are available.
|
|
|
+ start := *window.Start()
|
|
|
+ if s, ok := clusterStarts[cluster]; ok && window.Contains(s) {
|
|
|
+ start = s
|
|
|
+ }
|
|
|
+ end := *window.End()
|
|
|
+ if e, ok := clusterEnds[cluster]; ok && window.Contains(e) {
|
|
|
+ end = e
|
|
|
+ }
|
|
|
+
|
|
|
+ idleAlloc := &Allocation{
|
|
|
+ Name: fmt.Sprintf("%s/%s", cluster, IdleSuffix),
|
|
|
+ Properties: Properties{ClusterProp: cluster},
|
|
|
+ Start: start,
|
|
|
+ End: end,
|
|
|
+ Minutes: end.Sub(start).Minutes(), // TODO deprecate w/ niko/allocation-minutes
|
|
|
+ CPUCost: resources["cpu"],
|
|
|
+ GPUCost: resources["gpu"],
|
|
|
+ RAMCost: resources["ram"],
|
|
|
+ }
|
|
|
+ idleAlloc.TotalCost = idleAlloc.CPUCost + idleAlloc.GPUCost + idleAlloc.RAMCost
|
|
|
+
|
|
|
+ // Do not continue if multiple idle allocations are computed for a
|
|
|
+ // single cluster.
|
|
|
+ if _, ok := idleAllocs[cluster]; ok {
|
|
|
+ return nil, fmt.Errorf("duplicate idle allocations for cluster %s", cluster)
|
|
|
+ }
|
|
|
+
|
|
|
+ idleAllocs[cluster] = idleAlloc
|
|
|
+ }
|
|
|
+
|
|
|
+ return idleAllocs, nil
|
|
|
+}
|
|
|
+
|
|
|
// Delete removes the allocation with the given name from the set
|
|
|
func (as *AllocationSet) Delete(name string) {
|
|
|
if as == nil {
|
|
|
@@ -1179,6 +1288,8 @@ func (as *AllocationSet) Resolution() time.Duration {
|
|
|
return as.Window.Duration()
|
|
|
}
|
|
|
|
|
|
+// Set uses the given Allocation to overwrite the existing entry in the
|
|
|
+// AllocationSet under the Allocation's name.
|
|
|
func (as *AllocationSet) Set(alloc *Allocation) error {
|
|
|
if as.IsEmpty() {
|
|
|
as.Lock()
|
|
|
@@ -1238,6 +1349,7 @@ func (as *AllocationSet) TotalCost() float64 {
|
|
|
return tc
|
|
|
}
|
|
|
|
|
|
+// UTCOffset returns the AllocationSet's configured UTCOffset.
|
|
|
func (as *AllocationSet) UTCOffset() time.Duration {
|
|
|
_, zone := as.Start().Zone()
|
|
|
return time.Duration(zone) * time.Second
|
|
|
@@ -1305,11 +1417,17 @@ func (as *AllocationSet) accumulate(that *AllocationSet) (*AllocationSet, error)
|
|
|
return acc, nil
|
|
|
}
|
|
|
|
|
|
+// AllocationSetRange is a thread-safe slice of AllocationSets. It is meant to
|
|
|
+// be used such that the AllocationSets held are consecutive and coherent with
|
|
|
+// respect to using the same aggregation properties, UTC offset, and
|
|
|
+// resolution. However these rules are not necessarily enforced, so use wisely.
|
|
|
type AllocationSetRange struct {
|
|
|
sync.RWMutex
|
|
|
allocations []*AllocationSet
|
|
|
}
|
|
|
|
|
|
+// NewAllocationSetRange instantiates a new range composed of the given
|
|
|
+// AllocationSets in the order provided.
|
|
|
func NewAllocationSetRange(allocs ...*AllocationSet) *AllocationSetRange {
|
|
|
return &AllocationSetRange{
|
|
|
allocations: allocs,
|
|
|
@@ -1338,6 +1456,8 @@ func (asr *AllocationSetRange) Accumulate() (*AllocationSet, error) {
|
|
|
// TODO niko/etl accumulate into lower-resolution chunks of the given resolution
|
|
|
// func (asr *AllocationSetRange) AccumulateBy(resolution time.Duration) *AllocationSetRange
|
|
|
|
|
|
+// AggregateBy aggregates each AllocationSet in the range by the given
|
|
|
+// properties and options.
|
|
|
func (asr *AllocationSetRange) AggregateBy(properties Properties, options *AllocationAggregationOptions) error {
|
|
|
aggRange := &AllocationSetRange{allocations: []*AllocationSet{}}
|
|
|
|
|
|
@@ -1357,6 +1477,8 @@ func (asr *AllocationSetRange) AggregateBy(properties Properties, options *Alloc
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
+// Append appends the given AllocationSet to the end of the range. It does not
|
|
|
+// validate whether or not that violates window continuity.
|
|
|
func (asr *AllocationSetRange) Append(that *AllocationSet) {
|
|
|
asr.Lock()
|
|
|
defer asr.Unlock()
|
|
|
@@ -1374,6 +1496,7 @@ func (asr *AllocationSetRange) Each(f func(int, *AllocationSet)) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+// Get retrieves the AllocationSet at the given index of the range.
|
|
|
func (asr *AllocationSetRange) Get(i int) (*AllocationSet, error) {
|
|
|
if i < 0 || i >= len(asr.allocations) {
|
|
|
return nil, fmt.Errorf("AllocationSetRange: index out of range: %d", i)
|
|
|
@@ -1441,6 +1564,7 @@ func (asr *AllocationSetRange) InsertRange(that *AllocationSetRange) error {
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
+// Length returns the length of the range, which is zero if nil
|
|
|
func (asr *AllocationSetRange) Length() int {
|
|
|
if asr == nil || asr.allocations == nil {
|
|
|
return 0
|
|
|
@@ -1451,12 +1575,15 @@ func (asr *AllocationSetRange) Length() int {
|
|
|
return len(asr.allocations)
|
|
|
}
|
|
|
|
|
|
+// MarshalJSON JSON-encodes the range
|
|
|
func (asr *AllocationSetRange) MarshalJSON() ([]byte, error) {
|
|
|
asr.RLock()
|
|
|
asr.RUnlock()
|
|
|
return json.Marshal(asr.allocations)
|
|
|
}
|
|
|
|
|
|
+// Slice copies the underlying slice of AllocationSets, maintaining order,
|
|
|
+// and returns the copied slice.
|
|
|
func (asr *AllocationSetRange) Slice() []*AllocationSet {
|
|
|
if asr == nil || asr.allocations == nil {
|
|
|
return nil
|
|
|
@@ -1479,6 +1606,9 @@ func (asr *AllocationSetRange) String() string {
|
|
|
return fmt.Sprintf("AllocationSetRange{length: %d}", asr.Length())
|
|
|
}
|
|
|
|
|
|
+// UTCOffset returns the detected UTCOffset of the AllocationSets within the
|
|
|
+// range. Defaults to 0 if the range is nil or empty. Does not warn if there
|
|
|
+// are sets with conflicting UTCOffsets (just returns the first).
|
|
|
func (asr *AllocationSetRange) UTCOffset() time.Duration {
|
|
|
if asr.Length() == 0 {
|
|
|
return 0
|