|
|
@@ -196,7 +196,7 @@ func GetTotalContainerCost(costData map[string]*CostData, rate string, cp cloud.
|
|
|
return totalContainerCost
|
|
|
}
|
|
|
|
|
|
-func (a *Accesses) ComputeIdleCoefficient(costData map[string]*CostData, cli prometheusClient.Client, cp cloud.Provider, discount float64, customDiscount float64, windowString, offset string) (map[string]float64, error) {
|
|
|
+func (a *Accesses) ComputeIdleCoefficient(costData map[string]*CostData, cli prometheusClient.Client, cp cloud.Provider, discount float64, customDiscount float64, window, offset time.Duration) (map[string]float64, error) {
|
|
|
coefficients := make(map[string]float64)
|
|
|
|
|
|
profileName := "ComputeIdleCoefficient: ComputeClusterCosts"
|
|
|
@@ -204,12 +204,12 @@ func (a *Accesses) ComputeIdleCoefficient(costData map[string]*CostData, cli pro
|
|
|
|
|
|
var clusterCosts map[string]*ClusterCosts
|
|
|
var err error
|
|
|
-
|
|
|
- key := fmt.Sprintf("%s:%s", windowString, offset)
|
|
|
+ fmtWindow, fmtOffset := timeutil.DurationOffsetStrings(window, offset)
|
|
|
+ key := fmt.Sprintf("%s:%s", fmtWindow, fmtOffset)
|
|
|
if data, valid := a.ClusterCostsCache.Get(key); valid {
|
|
|
clusterCosts = data.(map[string]*ClusterCosts)
|
|
|
} else {
|
|
|
- clusterCosts, err = a.ComputeClusterCosts(cli, cp, windowString, offset, false)
|
|
|
+ clusterCosts, err = a.ComputeClusterCosts(cli, cp, window, offset, false)
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
@@ -225,7 +225,7 @@ func (a *Accesses) ComputeIdleCoefficient(costData map[string]*CostData, cli pro
|
|
|
}
|
|
|
|
|
|
if costs.TotalCumulative == 0 {
|
|
|
- return nil, fmt.Errorf("TotalCumulative cluster cost for cluster '%s' returned 0 over window '%s' offset '%s'", cid, windowString, offset)
|
|
|
+ return nil, fmt.Errorf("TotalCumulative cluster cost for cluster '%s' returned 0 over window '%s' offset '%s'", cid, fmtWindow, fmtOffset)
|
|
|
}
|
|
|
|
|
|
totalContainerCost := 0.0
|
|
|
@@ -1492,11 +1492,9 @@ func (a *Accesses) ComputeAggregateCostModel(promClient prometheusClient.Client,
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- // Convert to Prometheus-compatible strings
|
|
|
- durStr, offStr := timeutil.DurationOffsetStrings(dur, off)
|
|
|
-
|
|
|
- idleCoefficients, err = a.ComputeIdleCoefficient(costData, promClient, a.CloudProvider, discount, customDiscount, durStr, offStr)
|
|
|
+ idleCoefficients, err = a.ComputeIdleCoefficient(costData, promClient, a.CloudProvider, discount, customDiscount, dur, off)
|
|
|
if err != nil {
|
|
|
+ durStr, offStr := timeutil.DurationOffsetStrings(dur, off)
|
|
|
log.Errorf("ComputeAggregateCostModel: error computing idle coefficient: duration=%s, offset=%s, err=%s", durStr, offStr, err)
|
|
|
return nil, "", err
|
|
|
}
|
|
|
@@ -1744,10 +1742,16 @@ func (a *Accesses) warmAggregateCostModelCache() {
|
|
|
// for the given duration. Cache is intentionally set to expire (i.e. noExpireCache=false) so that
|
|
|
// if the default parameters change, the old cached defaults with eventually expire. Thus, the
|
|
|
// timing of the cache expiry/refresh is the only mechanism ensuring 100% cache warmth.
|
|
|
- warmFunc := func(duration, durationHrs, offset string, cacheEfficiencyData bool) (error, error) {
|
|
|
+ warmFunc := func(duration, offset time.Duration, cacheEfficiencyData bool) (error, error) {
|
|
|
+ if a.ThanosClient != nil {
|
|
|
+ duration = thanos.OffsetDuration()
|
|
|
+ log.Infof("Setting Offset to %s", duration)
|
|
|
+ }
|
|
|
+ fmtDuration, fmtOffset := timeutil.DurationOffsetStrings(duration, offset)
|
|
|
+ durationHrs, err := timeutil.DayDurationToHourDuration(fmtDuration)
|
|
|
promClient := a.GetPrometheusClient(true)
|
|
|
|
|
|
- windowStr := fmt.Sprintf("%s offset %s", duration, offset)
|
|
|
+ windowStr := fmt.Sprintf("%s fmtOffset %s", fmtDuration, fmtOffset)
|
|
|
window, err := kubecost.ParseWindowUTC(windowStr)
|
|
|
if err != nil {
|
|
|
return nil, fmt.Errorf("invalid window from window string: %s", windowStr)
|
|
|
@@ -1778,17 +1782,15 @@ func (a *Accesses) warmAggregateCostModelCache() {
|
|
|
|
|
|
aggKey := GenerateAggKey(window, field, subfields, aggOpts)
|
|
|
log.Infof("aggregation: cache warming defaults: %s", aggKey)
|
|
|
- key := fmt.Sprintf("%s:%s", durationHrs, offset)
|
|
|
+ key := fmt.Sprintf("%s:%s", durationHrs, fmtOffset)
|
|
|
|
|
|
_, _, aggErr := a.ComputeAggregateCostModel(promClient, window, field, subfields, aggOpts)
|
|
|
if aggErr != nil {
|
|
|
log.Infof("Error building cache %s: %s", window, aggErr)
|
|
|
}
|
|
|
- if a.ThanosClient != nil {
|
|
|
- offset = thanos.Offset()
|
|
|
- log.Infof("Setting offset to %s", offset)
|
|
|
- }
|
|
|
- totals, err := a.ComputeClusterCosts(promClient, a.CloudProvider, durationHrs, offset, cacheEfficiencyData)
|
|
|
+
|
|
|
+
|
|
|
+ totals, err := a.ComputeClusterCosts(promClient, a.CloudProvider, duration, offset, cacheEfficiencyData)
|
|
|
if err != nil {
|
|
|
log.Infof("Error building cluster costs cache %s", key)
|
|
|
}
|
|
|
@@ -1800,9 +1802,9 @@ func (a *Accesses) warmAggregateCostModelCache() {
|
|
|
}
|
|
|
if len(totals) > 0 && maxMinutesWithData > clusterCostsCacheMinutes {
|
|
|
a.ClusterCostsCache.Set(key, totals, a.GetCacheExpiration(window.Duration()))
|
|
|
- log.Infof("caching %s cluster costs for %s", duration, a.GetCacheExpiration(window.Duration()))
|
|
|
+ log.Infof("caching %s cluster costs for %s", fmtDuration, a.GetCacheExpiration(window.Duration()))
|
|
|
} else {
|
|
|
- log.Warningf("not caching %s cluster costs: no data or less than %f minutes data ", duration, clusterCostsCacheMinutes)
|
|
|
+ log.Warningf("not caching %s cluster costs: no data or less than %f minutes data ", fmtDuration, clusterCostsCacheMinutes)
|
|
|
}
|
|
|
return aggErr, err
|
|
|
}
|
|
|
@@ -1811,18 +1813,16 @@ func (a *Accesses) warmAggregateCostModelCache() {
|
|
|
go func(sem *util.Semaphore) {
|
|
|
defer errors.HandlePanic()
|
|
|
|
|
|
- duration := "1d"
|
|
|
- offset := "1m"
|
|
|
- durHrs := "24h"
|
|
|
- dur := 24 * time.Hour
|
|
|
+ offset := time.Minute
|
|
|
+ duration := 24 * time.Hour
|
|
|
|
|
|
for {
|
|
|
sem.Acquire()
|
|
|
- warmFunc(duration, durHrs, offset, true)
|
|
|
+ warmFunc(duration, offset, true)
|
|
|
sem.Return()
|
|
|
|
|
|
- log.Infof("aggregation: warm cache: %s", duration)
|
|
|
- time.Sleep(a.GetCacheRefresh(dur))
|
|
|
+ log.Infof("aggregation: warm cache: %s", timeutil.DurationString(duration))
|
|
|
+ time.Sleep(a.GetCacheRefresh(duration))
|
|
|
}
|
|
|
}(sem)
|
|
|
|
|
|
@@ -1831,18 +1831,16 @@ func (a *Accesses) warmAggregateCostModelCache() {
|
|
|
go func(sem *util.Semaphore) {
|
|
|
defer errors.HandlePanic()
|
|
|
|
|
|
- duration := "2d"
|
|
|
- offset := "1m"
|
|
|
- durHrs := "48h"
|
|
|
- dur := 2 * 24 * time.Hour
|
|
|
+ offset := time.Minute
|
|
|
+ duration := 2 * 24 * time.Hour
|
|
|
|
|
|
for {
|
|
|
sem.Acquire()
|
|
|
- warmFunc(duration, durHrs, offset, false)
|
|
|
+ warmFunc(duration, offset, false)
|
|
|
sem.Return()
|
|
|
|
|
|
- log.Infof("aggregation: warm cache: %s", duration)
|
|
|
- time.Sleep(a.GetCacheRefresh(dur))
|
|
|
+ log.Infof("aggregation: warm cache: %s", timeutil.DurationString(duration))
|
|
|
+ time.Sleep(a.GetCacheRefresh(duration))
|
|
|
}
|
|
|
}(sem)
|
|
|
|
|
|
@@ -1850,19 +1848,17 @@ func (a *Accesses) warmAggregateCostModelCache() {
|
|
|
go func(sem *util.Semaphore) {
|
|
|
defer errors.HandlePanic()
|
|
|
|
|
|
- duration := "7d"
|
|
|
- offset := "1m"
|
|
|
- durHrs := "168h"
|
|
|
- dur := 7 * 24 * time.Hour
|
|
|
+ offset := time.Minute
|
|
|
+ duration := 7 * 24 * time.Hour
|
|
|
|
|
|
for {
|
|
|
sem.Acquire()
|
|
|
- aggErr, err := warmFunc(duration, durHrs, offset, false)
|
|
|
+ aggErr, err := warmFunc(duration, offset, false)
|
|
|
sem.Return()
|
|
|
|
|
|
- log.Infof("aggregation: warm cache: %s", duration)
|
|
|
+ log.Infof("aggregation: warm cache: %s", timeutil.DurationString(duration))
|
|
|
if aggErr == nil && err == nil {
|
|
|
- time.Sleep(a.GetCacheRefresh(dur))
|
|
|
+ time.Sleep(a.GetCacheRefresh(duration))
|
|
|
} else {
|
|
|
time.Sleep(5 * time.Minute)
|
|
|
}
|
|
|
@@ -1874,16 +1870,14 @@ func (a *Accesses) warmAggregateCostModelCache() {
|
|
|
defer errors.HandlePanic()
|
|
|
|
|
|
for {
|
|
|
- duration := "30d"
|
|
|
- offset := "1m"
|
|
|
- durHrs := "720h"
|
|
|
- dur := 30 * 24 * time.Hour
|
|
|
+ offset := time.Minute
|
|
|
+ duration := 30 * 24 * time.Hour
|
|
|
|
|
|
sem.Acquire()
|
|
|
- aggErr, err := warmFunc(duration, durHrs, offset, false)
|
|
|
+ aggErr, err := warmFunc(duration, offset, false)
|
|
|
sem.Return()
|
|
|
if aggErr == nil && err == nil {
|
|
|
- time.Sleep(a.GetCacheRefresh(dur))
|
|
|
+ time.Sleep(a.GetCacheRefresh(duration))
|
|
|
} else {
|
|
|
time.Sleep(5 * time.Minute)
|
|
|
}
|