|
|
@@ -885,17 +885,13 @@ const minCostDataLength = 2
|
|
|
// EmptyDataError describes an error caused by empty cost data for some
|
|
|
// defined interval
|
|
|
type EmptyDataError struct {
|
|
|
- err error
|
|
|
- duration string
|
|
|
- offset string
|
|
|
+ err error
|
|
|
+ window kubecost.Window
|
|
|
}
|
|
|
|
|
|
// Error implements the error interface
|
|
|
func (ede *EmptyDataError) Error() string {
|
|
|
- err := fmt.Sprintf("empty data for range: %s", ede.duration)
|
|
|
- if ede.offset != "" {
|
|
|
- err += fmt.Sprintf(" offset %s", ede.offset)
|
|
|
- }
|
|
|
+ err := fmt.Sprintf("empty data for range: %s", ede.window)
|
|
|
if ede.err != nil {
|
|
|
err += fmt.Sprintf(": %s", ede.err)
|
|
|
}
|
|
|
@@ -995,14 +991,99 @@ func compressVectorSeries(vs []*util.Vector, resolutionHours float64) []*util.Ve
|
|
|
return compressed
|
|
|
}
|
|
|
|
|
|
+type AggregateQueryOpts struct {
|
|
|
+ Rate string
|
|
|
+ Filters map[string]string
|
|
|
+ SharedResources *SharedResourceInfo
|
|
|
+ ShareSplit string
|
|
|
+ AllocateIdle bool
|
|
|
+ IncludeTimeSeries bool
|
|
|
+ IncludeEfficiency bool
|
|
|
+ DisableCache bool
|
|
|
+ ClearCache bool
|
|
|
+ NoCache bool
|
|
|
+ NoExpireCache bool
|
|
|
+ RemoteEnabled bool
|
|
|
+ DisableSharedOverhead bool
|
|
|
+ UseETLAdapter bool
|
|
|
+}
|
|
|
+
|
|
|
+func DefaultAggregateQueryOpts() *AggregateQueryOpts {
|
|
|
+ return &AggregateQueryOpts{
|
|
|
+ Rate: "",
|
|
|
+ Filters: map[string]string{},
|
|
|
+ SharedResources: nil,
|
|
|
+ ShareSplit: SplitTypeWeighted,
|
|
|
+ AllocateIdle: false,
|
|
|
+ IncludeTimeSeries: true,
|
|
|
+ IncludeEfficiency: true,
|
|
|
+ DisableCache: false,
|
|
|
+ ClearCache: false,
|
|
|
+ NoCache: false,
|
|
|
+ NoExpireCache: false,
|
|
|
+ RemoteEnabled: env.IsRemoteEnabled(),
|
|
|
+ DisableSharedOverhead: false,
|
|
|
+ UseETLAdapter: false,
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
// ComputeAggregateCostModel computes cost data for the given window, then aggregates it by the given fields.
|
|
|
// Data is cached on two levels: the aggregation is cached as well as the underlying cost data.
|
|
|
-func (a *Accesses) ComputeAggregateCostModel(promClient prometheusClient.Client, duration, offset, field string, subfields []string, rate string, filters map[string]string, sri *SharedResourceInfo, shared string, allocateIdle, includeTimeSeries, includeEfficiency, disableCache, clearCache, noCache, noExpireCache, remoteEnabled, disableSharedOverhead, useETLAdapter bool) (map[string]*Aggregation, string, error) {
|
|
|
- profileBaseName := fmt.Sprintf("ComputeAggregateCostModel(duration=%s, offet=%s, field=%s)", duration, offset, field)
|
|
|
- defer measureTime(time.Now(), profileThreshold, profileBaseName)
|
|
|
+func (a *Accesses) ComputeAggregateCostModel(promClient prometheusClient.Client, window kubecost.Window, field string, subfields []string, opts *AggregateQueryOpts) (map[string]*Aggregation, string, error) {
|
|
|
+ // Window must be closed, i.e. neither start nor end can be nil
|
|
|
+ if window.IsOpen() {
|
|
|
+ return nil, "", fmt.Errorf("illegal window: %s", window)
|
|
|
+ }
|
|
|
|
|
|
- // parse cost data filters into FilterFuncs
|
|
|
- filterFuncs := []FilterFunc{}
|
|
|
+ // Window is the range of the query, i.e. (start, end)
|
|
|
+
|
|
|
+ // Resolution is the duration of each datum in the cost model range query,
|
|
|
+ // which corresponds to both the step size given to Prometheus query_range
|
|
|
+ // and to the window passed to the range queries.
|
|
|
+ // i.e. by default, we support 1h resolution for queries of windows defined
|
|
|
+ // in terms of days or integer multiples of hours (e.g. 1d, 12h)
|
|
|
+
|
|
|
+ // Determine resolution by size of duration and divisibility of window.
|
|
|
+ // By default, resolution is 1hr. If the window is smaller than 1hr, then
|
|
|
+ // resolution goes down to 1m. If the window is not a multiple of 1hr, then
|
|
|
+ // resolution goes down to 1m. If the window is greater than 1d, then
|
|
|
+ // resolution gets scaled up to improve performance by reducing the amount
|
|
|
+ // of data being computed.
|
|
|
+ resolution := time.Hour
|
|
|
+ durMins := int64(math.Trunc(window.Minutes()))
|
|
|
+
|
|
|
+ if durMins < 24*60 { // less than 1d
|
|
|
+ if durMins%60 != 0 { // not divisible by 1h
|
|
|
+ resolution = time.Minute
|
|
|
+ }
|
|
|
+ } else { // greater than 1d
|
|
|
+ if durMins >= 7*24*60 { // greater than (or equal to) 7 days
|
|
|
+ resolution = 24.0 * time.Hour
|
|
|
+ } else if durMins >= 2*24*60 { // greater than (or equal to) 2 days
|
|
|
+ resolution = 3.0 * time.Hour
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Parse options
|
|
|
+ if opts == nil {
|
|
|
+ opts = DefaultAggregateQueryOpts()
|
|
|
+ }
|
|
|
+ rate := opts.Rate
|
|
|
+ filters := opts.Filters
|
|
|
+ sri := opts.SharedResources
|
|
|
+ shared := opts.ShareSplit
|
|
|
+ allocateIdle := opts.AllocateIdle
|
|
|
+ includeTimeSeries := opts.IncludeTimeSeries
|
|
|
+ includeEfficiency := opts.IncludeEfficiency
|
|
|
+ disableCache := opts.DisableCache
|
|
|
+ clearCache := opts.ClearCache
|
|
|
+ noCache := opts.NoCache
|
|
|
+ noExpireCache := opts.NoExpireCache
|
|
|
+ remoteEnabled := opts.RemoteEnabled
|
|
|
+ disableSharedOverhead := opts.DisableSharedOverhead
|
|
|
+
|
|
|
+ // retainFuncs override filterFuncs. Make sure shared resources do not
|
|
|
+ // get filtered out.
|
|
|
retainFuncs := []FilterFunc{}
|
|
|
retainFuncs = append(retainFuncs, func(cd *CostData) (bool, string) {
|
|
|
if sri != nil {
|
|
|
@@ -1010,6 +1091,10 @@ func (a *Accesses) ComputeAggregateCostModel(promClient prometheusClient.Client,
|
|
|
}
|
|
|
return false, ""
|
|
|
})
|
|
|
+
|
|
|
+ // Parse cost data filters into FilterFuncs
|
|
|
+ filterFuncs := []FilterFunc{}
|
|
|
+
|
|
|
aggregateEnvironment := func(costDatum *CostData) string {
|
|
|
if field == "cluster" {
|
|
|
return costDatum.ClusterID
|
|
|
@@ -1109,6 +1194,7 @@ func (a *Accesses) ComputeAggregateCostModel(promClient prometheusClient.Client,
|
|
|
return false, aggEnv
|
|
|
})
|
|
|
}
|
|
|
+
|
|
|
if filters["cluster"] != "" {
|
|
|
// clusters may be comma-separated, e.g. cluster-one,cluster-two
|
|
|
// multiple clusters are evaluated as an OR relationship
|
|
|
@@ -1130,6 +1216,7 @@ func (a *Accesses) ComputeAggregateCostModel(promClient prometheusClient.Client,
|
|
|
return false, aggEnv
|
|
|
})
|
|
|
}
|
|
|
+
|
|
|
if filters["labels"] != "" {
|
|
|
// labels are expected to be comma-separated and to take the form key=value
|
|
|
// e.g. app=cost-analyzer,app.kubernetes.io/instance=kubecost
|
|
|
@@ -1186,101 +1273,55 @@ func (a *Accesses) ComputeAggregateCostModel(promClient prometheusClient.Client,
|
|
|
a.CostDataCache.Flush()
|
|
|
}
|
|
|
|
|
|
- cacheExpiry := a.GetCacheExpiration(duration)
|
|
|
+ cacheExpiry := a.GetCacheExpiration(window.Duration())
|
|
|
if noExpireCache {
|
|
|
cacheExpiry = cache.NoExpiration
|
|
|
}
|
|
|
|
|
|
// parametrize cache key by all request parameters
|
|
|
- aggKey := GenerateAggKey(aggKeyParams{
|
|
|
- duration: duration,
|
|
|
- offset: offset,
|
|
|
- filters: filters,
|
|
|
- field: field,
|
|
|
- subfields: subfields,
|
|
|
- rate: rate,
|
|
|
- sri: sri,
|
|
|
- shareType: shared,
|
|
|
- idle: allocateIdle,
|
|
|
- timeSeries: includeTimeSeries,
|
|
|
- efficiency: includeEfficiency,
|
|
|
- })
|
|
|
-
|
|
|
- // convert duration and offset to start and end times
|
|
|
- startTime, endTime, err := ParseTimeRange(duration, offset)
|
|
|
- if err != nil {
|
|
|
- return nil, "", fmt.Errorf("Error parsing duration (%s) and offset (%s): %s", duration, offset, err)
|
|
|
- }
|
|
|
- durationHours := endTime.Sub(*startTime).Hours()
|
|
|
+ aggKey := GenerateAggKey(window, field, subfields, opts)
|
|
|
|
|
|
thanosOffset := time.Now().Add(-thanos.OffsetDuration())
|
|
|
- if a.ThanosClient != nil && endTime.After(thanosOffset) {
|
|
|
+ if a.ThanosClient != nil && window.End().After(thanosOffset) {
|
|
|
klog.V(4).Infof("Setting end time backwards to first present data")
|
|
|
|
|
|
// Apply offsets to both end and start times to maintain correct time range
|
|
|
- deltaDuration := endTime.Sub(thanosOffset)
|
|
|
- *startTime = startTime.Add(-1 * deltaDuration)
|
|
|
- *endTime = time.Now().Add(-thanos.OffsetDuration())
|
|
|
- }
|
|
|
-
|
|
|
- // determine resolution by size of duration
|
|
|
- resolutionHours := durationHours
|
|
|
- if durationHours >= 2160 {
|
|
|
- // 90 days
|
|
|
- resolutionHours = 72.0
|
|
|
- } else if durationHours >= 720 {
|
|
|
- // 30 days
|
|
|
- resolutionHours = 24.0
|
|
|
- } else if durationHours >= 168 {
|
|
|
- // 7 days
|
|
|
- resolutionHours = 24.0
|
|
|
- } else if durationHours >= 48 {
|
|
|
- // 2 days
|
|
|
- resolutionHours = 2.0
|
|
|
- } else if durationHours >= 1 {
|
|
|
- resolutionHours = 1.0
|
|
|
- }
|
|
|
-
|
|
|
- key := fmt.Sprintf(`%s:%s:%fh:%t`, duration, offset, resolutionHours, remoteEnabled)
|
|
|
+ deltaDuration := window.End().Sub(thanosOffset)
|
|
|
+ s := window.Start().Add(-1 * deltaDuration)
|
|
|
+ e := time.Now().Add(-thanos.OffsetDuration())
|
|
|
+ window.Set(&s, &e)
|
|
|
+ }
|
|
|
+
|
|
|
+ key := fmt.Sprintf(`%s:%fh:%t`, window, resolution.Hours(), remoteEnabled)
|
|
|
|
|
|
// report message about which of the two caches hit. by default report a miss
|
|
|
cacheMessage := fmt.Sprintf("L1 cache miss: %s L2 cache miss: %s", aggKey, key)
|
|
|
|
|
|
- log.Infof("Open Source Accesses: AggregateCache=%v", a.AggregateCache)
|
|
|
-
|
|
|
// check the cache for aggregated response; if cache is hit and not disabled, return response
|
|
|
if value, found := a.AggregateCache.Get(aggKey); found && !disableCache && !noCache {
|
|
|
result, ok := value.(map[string]*Aggregation)
|
|
|
if !ok {
|
|
|
// disable cache and recompute if type cast fails
|
|
|
klog.Errorf("caching error: failed to cast aggregate data to struct: %s", aggKey)
|
|
|
- return a.ComputeAggregateCostModel(promClient, duration, offset, field, subfields, rate, filters, sri, shared, allocateIdle, includeTimeSeries, includeEfficiency, true, false, noExpireCache, noCache, remoteEnabled, disableSharedOverhead, useETLAdapter)
|
|
|
+ return a.ComputeAggregateCostModel(promClient, window, field, subfields, opts)
|
|
|
}
|
|
|
return result, fmt.Sprintf("aggregate cache hit: %s", aggKey), nil
|
|
|
}
|
|
|
|
|
|
- profileStart := time.Now()
|
|
|
- profileName := profileBaseName + ": "
|
|
|
-
|
|
|
- window := duration
|
|
|
- if durationHours >= 1 {
|
|
|
- window = fmt.Sprintf("%dh", int(resolutionHours))
|
|
|
+ if window.Hours() >= 1.0 {
|
|
|
// exclude the last window of the time frame to match Prometheus definitions of range, offset, and resolution
|
|
|
- *startTime = startTime.Add(time.Duration(resolutionHours) * time.Hour)
|
|
|
+ start := window.Start().Add(resolution)
|
|
|
+ window.Set(&start, window.End())
|
|
|
} else {
|
|
|
// don't cache requests for durations of less than one hour
|
|
|
- klog.Infof("key %s has durationhours %f", key, durationHours)
|
|
|
disableCache = true
|
|
|
}
|
|
|
|
|
|
- profileBaseName = fmt.Sprintf("ComputeAggregateCostModel(duration=%s, offset=%s, field=%s, window=%s)", duration, offset, field, window)
|
|
|
-
|
|
|
// attempt to retrieve cost data from cache
|
|
|
var costData map[string]*CostData
|
|
|
+ var err error
|
|
|
cacheData, found := a.CostDataCache.Get(key)
|
|
|
if found && !disableCache && !noCache {
|
|
|
- profileName += "get cost data from cache"
|
|
|
-
|
|
|
ok := false
|
|
|
costData, ok = cacheData.(map[string]*CostData)
|
|
|
cacheMessage = fmt.Sprintf("L1 cache miss: %s, L2 cost data cache hit: %s", aggKey, key)
|
|
|
@@ -1296,12 +1337,7 @@ func (a *Accesses) ComputeAggregateCostModel(promClient prometheusClient.Client,
|
|
|
klog.V(3).Infof("Cache item: %s", k)
|
|
|
}
|
|
|
|
|
|
- profileName += "compute cost data"
|
|
|
-
|
|
|
- start := startTime.Format(RFC3339Milli)
|
|
|
- end := endTime.Format(RFC3339Milli)
|
|
|
-
|
|
|
- costData, err = a.Model.ComputeCostDataRange(promClient, a.CloudProvider, start, end, window, resolutionHours, "", "", remoteEnabled, offset)
|
|
|
+ costData, err = a.Model.ComputeCostDataRange(promClient, a.CloudProvider, window, resolution, "", "", remoteEnabled)
|
|
|
if err != nil {
|
|
|
if prom.IsErrorCollection(err) {
|
|
|
return nil, "", err
|
|
|
@@ -1310,7 +1346,7 @@ func (a *Accesses) ComputeAggregateCostModel(promClient prometheusClient.Client,
|
|
|
return nil, "", pce
|
|
|
}
|
|
|
if strings.Contains(err.Error(), "data is empty") {
|
|
|
- return nil, "", &EmptyDataError{err: err, duration: duration, offset: offset}
|
|
|
+ return nil, "", &EmptyDataError{err: err, window: window}
|
|
|
}
|
|
|
return nil, "", err
|
|
|
}
|
|
|
@@ -1319,13 +1355,13 @@ func (a *Accesses) ComputeAggregateCostModel(promClient prometheusClient.Client,
|
|
|
// aggregates and cache if the length is sufficiently high
|
|
|
costDataLen := costDataTimeSeriesLength(costData)
|
|
|
|
|
|
- if durationHours < 1.0 {
|
|
|
+ if window.Hours() < 1.0 {
|
|
|
// scale hourly cost data down to fractional hour
|
|
|
- costData = ScaleHourlyCostData(costData, resolutionHours)
|
|
|
+ costData = ScaleHourlyCostData(costData, resolution.Hours())
|
|
|
}
|
|
|
|
|
|
if costDataLen == 0 {
|
|
|
- return nil, "", &EmptyDataError{duration: duration, offset: offset}
|
|
|
+ return nil, "", &EmptyDataError{window: window}
|
|
|
}
|
|
|
if costDataLen >= minCostDataLength && !noCache {
|
|
|
klog.Infof("Setting L2 cache: %s", key)
|
|
|
@@ -1333,8 +1369,6 @@ func (a *Accesses) ComputeAggregateCostModel(promClient prometheusClient.Client,
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- measureTime(profileStart, profileThreshold, profileName)
|
|
|
-
|
|
|
c, err := a.CloudProvider.GetConfig()
|
|
|
if err != nil {
|
|
|
return nil, "", err
|
|
|
@@ -1352,7 +1386,7 @@ func (a *Accesses) ComputeAggregateCostModel(promClient prometheusClient.Client,
|
|
|
if !disableSharedOverhead {
|
|
|
for key, val := range c.SharedCosts {
|
|
|
cost, err := strconv.ParseFloat(val, 64)
|
|
|
- durationCoefficient := durationHours / util.HoursPerMonth
|
|
|
+ durationCoefficient := window.Hours() / util.HoursPerMonth
|
|
|
if err != nil {
|
|
|
return nil, "", fmt.Errorf("Unable to parse shared cost %s: %s", val, err.Error())
|
|
|
}
|
|
|
@@ -1363,13 +1397,10 @@ func (a *Accesses) ComputeAggregateCostModel(promClient prometheusClient.Client,
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- profileStart = time.Now()
|
|
|
- profileName = profileBaseName + ": compute idle coefficient"
|
|
|
-
|
|
|
idleCoefficients := make(map[string]float64)
|
|
|
if allocateIdle {
|
|
|
- idleDurationCalcHours := durationHours
|
|
|
- if durationHours < 1 {
|
|
|
+ idleDurationCalcHours := window.Hours()
|
|
|
+ if window.Hours() < 1 {
|
|
|
idleDurationCalcHours = 1
|
|
|
}
|
|
|
windowStr := fmt.Sprintf("%dh", int(idleDurationCalcHours))
|
|
|
@@ -1392,28 +1423,18 @@ func (a *Accesses) ComputeAggregateCostModel(promClient prometheusClient.Client,
|
|
|
totalContainerCost = GetTotalContainerCost(costData, rate, a.CloudProvider, discount, customDiscount, idleCoefficients)
|
|
|
}
|
|
|
|
|
|
- measureTime(profileStart, profileThreshold, profileName)
|
|
|
-
|
|
|
- profileStart = time.Now()
|
|
|
- profileName = profileBaseName + ": filter cost data"
|
|
|
-
|
|
|
// filter cost data by namespace and cluster after caching for maximal cache hits
|
|
|
costData, filteredContainerCount, filteredEnvironments := FilterCostData(costData, retainFuncs, filterFuncs)
|
|
|
|
|
|
- measureTime(profileStart, profileThreshold, profileName)
|
|
|
-
|
|
|
- profileStart = time.Now()
|
|
|
- profileName = profileBaseName + ": aggregate cost data"
|
|
|
-
|
|
|
// aggregate cost model data by given fields and cache the result for the default expiration
|
|
|
- opts := &AggregationOptions{
|
|
|
+ aggOpts := &AggregationOptions{
|
|
|
Discount: discount,
|
|
|
CustomDiscount: customDiscount,
|
|
|
IdleCoefficients: idleCoefficients,
|
|
|
IncludeEfficiency: includeEfficiency,
|
|
|
IncludeTimeSeries: includeTimeSeries,
|
|
|
Rate: rate,
|
|
|
- ResolutionHours: resolutionHours,
|
|
|
+ ResolutionHours: resolution.Hours(),
|
|
|
SharedResourceInfo: sri,
|
|
|
SharedCosts: sc,
|
|
|
FilteredContainerCount: filteredContainerCount,
|
|
|
@@ -1421,29 +1442,27 @@ func (a *Accesses) ComputeAggregateCostModel(promClient prometheusClient.Client,
|
|
|
TotalContainerCost: totalContainerCost,
|
|
|
SharedSplit: shared,
|
|
|
}
|
|
|
- result := AggregateCostData(costData, field, subfields, a.CloudProvider, opts)
|
|
|
+ result := AggregateCostData(costData, field, subfields, a.CloudProvider, aggOpts)
|
|
|
|
|
|
// If sending time series data back, switch scale back to hourly data. At this point,
|
|
|
// resolutionHours may have converted our hourly data to more- or less-than hourly data.
|
|
|
if includeTimeSeries {
|
|
|
for _, aggs := range result {
|
|
|
- ScaleAggregationTimeSeries(aggs, resolutionHours)
|
|
|
+ ScaleAggregationTimeSeries(aggs, resolution.Hours())
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// compute length of the time series in the cost data and only cache
|
|
|
// aggregation results if the length is sufficiently high
|
|
|
costDataLen := costDataTimeSeriesLength(costData)
|
|
|
- if costDataLen >= minCostDataLength && durationHours > 1 && !noCache {
|
|
|
+ if costDataLen >= minCostDataLength && window.Hours() > 1.0 && !noCache {
|
|
|
// Set the result map (rather than a pointer to it) because map is a reference type
|
|
|
klog.Infof("Caching key in aggregate cache: %s", key)
|
|
|
a.AggregateCache.Set(aggKey, result, cacheExpiry)
|
|
|
} else {
|
|
|
- klog.Infof("Not caching for key %s. Not enough data: %t, Duration less than 1h: %t, noCache: %t", key, costDataLen < minCostDataLength, durationHours < 1, noCache)
|
|
|
+ klog.Infof("Not caching for key %s. Not enough data: %t, Duration less than 1h: %t, noCache: %t", key, costDataLen < minCostDataLength, window.Hours() < 1, noCache)
|
|
|
}
|
|
|
|
|
|
- measureTime(profileStart, profileThreshold, profileName)
|
|
|
-
|
|
|
return result, cacheMessage, nil
|
|
|
}
|
|
|
|
|
|
@@ -1521,10 +1540,14 @@ type aggKeyParams struct {
|
|
|
}
|
|
|
|
|
|
// GenerateAggKey generates a parameter-unique key for caching the aggregate cost model
|
|
|
-func GenerateAggKey(ps aggKeyParams) string {
|
|
|
+func GenerateAggKey(window kubecost.Window, field string, subfields []string, opts *AggregateQueryOpts) string {
|
|
|
+ if opts == nil {
|
|
|
+ opts = DefaultAggregateQueryOpts()
|
|
|
+ }
|
|
|
+
|
|
|
// parse, trim, and sort podprefix filters
|
|
|
podPrefixFilters := []string{}
|
|
|
- if ppfs, ok := ps.filters["podprefix"]; ok && ppfs != "" {
|
|
|
+ if ppfs, ok := opts.Filters["podprefix"]; ok && ppfs != "" {
|
|
|
for _, psf := range strings.Split(ppfs, ",") {
|
|
|
podPrefixFilters = append(podPrefixFilters, strings.TrimSpace(psf))
|
|
|
}
|
|
|
@@ -1534,7 +1557,7 @@ func GenerateAggKey(ps aggKeyParams) string {
|
|
|
|
|
|
// parse, trim, and sort namespace filters
|
|
|
nsFilters := []string{}
|
|
|
- if nsfs, ok := ps.filters["namespace"]; ok && nsfs != "" {
|
|
|
+ if nsfs, ok := opts.Filters["namespace"]; ok && nsfs != "" {
|
|
|
for _, nsf := range strings.Split(nsfs, ",") {
|
|
|
nsFilters = append(nsFilters, strings.TrimSpace(nsf))
|
|
|
}
|
|
|
@@ -1544,7 +1567,7 @@ func GenerateAggKey(ps aggKeyParams) string {
|
|
|
|
|
|
// parse, trim, and sort node filters
|
|
|
nodeFilters := []string{}
|
|
|
- if nodefs, ok := ps.filters["node"]; ok && nodefs != "" {
|
|
|
+ if nodefs, ok := opts.Filters["node"]; ok && nodefs != "" {
|
|
|
for _, nodef := range strings.Split(nodefs, ",") {
|
|
|
nodeFilters = append(nodeFilters, strings.TrimSpace(nodef))
|
|
|
}
|
|
|
@@ -1554,7 +1577,7 @@ func GenerateAggKey(ps aggKeyParams) string {
|
|
|
|
|
|
// parse, trim, and sort cluster filters
|
|
|
cFilters := []string{}
|
|
|
- if cfs, ok := ps.filters["cluster"]; ok && cfs != "" {
|
|
|
+ if cfs, ok := opts.Filters["cluster"]; ok && cfs != "" {
|
|
|
for _, cf := range strings.Split(cfs, ",") {
|
|
|
cFilters = append(cFilters, strings.TrimSpace(cf))
|
|
|
}
|
|
|
@@ -1564,7 +1587,7 @@ func GenerateAggKey(ps aggKeyParams) string {
|
|
|
|
|
|
// parse, trim, and sort label filters
|
|
|
lFilters := []string{}
|
|
|
- if lfs, ok := ps.filters["labels"]; ok && lfs != "" {
|
|
|
+ if lfs, ok := opts.Filters["labels"]; ok && lfs != "" {
|
|
|
for _, lf := range strings.Split(lfs, ",") {
|
|
|
// trim whitespace from the label name and the label value
|
|
|
// of each label name/value pair, then reconstruct
|
|
|
@@ -1585,19 +1608,20 @@ func GenerateAggKey(ps aggKeyParams) string {
|
|
|
|
|
|
filterStr := fmt.Sprintf("%s:%s:%s:%s:%s", nsFilterStr, nodeFilterStr, cFilterStr, lFilterStr, podPrefixFiltersStr)
|
|
|
|
|
|
- sort.Strings(ps.subfields)
|
|
|
- fieldStr := fmt.Sprintf("%s:%s", ps.field, strings.Join(ps.subfields, ","))
|
|
|
+ sort.Strings(subfields)
|
|
|
+ fieldStr := fmt.Sprintf("%s:%s", field, strings.Join(subfields, ","))
|
|
|
|
|
|
- return fmt.Sprintf("%s:%s:%s:%s:%s:%s:%s:%t:%t:%t", ps.duration, ps.offset, filterStr, fieldStr, ps.rate,
|
|
|
- ps.sri, ps.shareType, ps.idle, ps.timeSeries, ps.efficiency)
|
|
|
+ // TODO convert window back to 1d
|
|
|
+ return fmt.Sprintf("%s:%s:%s:%s:%s:%s:%s:%t:%t:%t", window, filterStr, fieldStr, opts.Rate,
|
|
|
+ opts.SharedResources, opts.ShareSplit, opts.AllocateIdle, opts.IncludeTimeSeries,
|
|
|
+ opts.IncludeEfficiency)
|
|
|
}
|
|
|
|
|
|
// Aggregator is capable of computing the aggregated cost model. This is
|
|
|
// a brutal interface, which should be cleaned up, but it's necessary for
|
|
|
// being able to swap in an ETL-backed implementation.
|
|
|
-// TODO clean up, simplify
|
|
|
type Aggregator interface {
|
|
|
- ComputeAggregateCostModel(promClient prometheusClient.Client, duration, offset, field string, subfields []string, rate string, filters map[string]string, sri *SharedResourceInfo, shared string, allocateIdle, includeTimeSeries, includeEfficiency, disableCache, clearCache, noCache, noExpireCache, remoteEnabled, disableSharedOverhead, useETLAdapter bool) (map[string]*Aggregation, string, error)
|
|
|
+ ComputeAggregateCostModel(promClient prometheusClient.Client, window kubecost.Window, field string, subfields []string, opts *AggregateQueryOpts) (map[string]*Aggregation, string, error)
|
|
|
}
|
|
|
|
|
|
func (a *Accesses) warmAggregateCostModelCache() {
|
|
|
@@ -1609,52 +1633,44 @@ func (a *Accesses) warmAggregateCostModelCache() {
|
|
|
// if the default parameters change, the old cached defaults with eventually expire. Thus, the
|
|
|
// timing of the cache expiry/refresh is the only mechanism ensuring 100% cache warmth.
|
|
|
warmFunc := func(duration, durationHrs, offset string, cacheEfficiencyData bool) (error, error) {
|
|
|
+ promClient := a.GetPrometheusClient(true)
|
|
|
+
|
|
|
+ windowStr := fmt.Sprintf("%s offset %s", duration, offset)
|
|
|
+ window, err := kubecost.ParseWindowUTC(windowStr)
|
|
|
+ if err != nil {
|
|
|
+ return nil, fmt.Errorf("invalid window from window string: %s", windowStr)
|
|
|
+ }
|
|
|
+
|
|
|
field := "namespace"
|
|
|
subfields := []string{}
|
|
|
- rate := ""
|
|
|
- filters := map[string]string{}
|
|
|
- includeTimeSeries := false
|
|
|
- includeEfficiency := true
|
|
|
- disableCache := true
|
|
|
- clearCache := false
|
|
|
- noCache := false
|
|
|
- noExpireCache := false
|
|
|
- remote := true
|
|
|
- shareSplit := "weighted"
|
|
|
- remoteAvailable := env.IsRemoteEnabled()
|
|
|
- remoteEnabled := remote && remoteAvailable
|
|
|
- promClient := a.GetPrometheusClient(remote)
|
|
|
- allocateIdle := cloud.AllocateIdleByDefault(a.CloudProvider)
|
|
|
+
|
|
|
+ aggOpts := DefaultAggregateQueryOpts()
|
|
|
+ aggOpts.Rate = ""
|
|
|
+ aggOpts.Filters = map[string]string{}
|
|
|
+ aggOpts.IncludeTimeSeries = false
|
|
|
+ aggOpts.IncludeEfficiency = true
|
|
|
+ aggOpts.DisableCache = true
|
|
|
+ aggOpts.ClearCache = false
|
|
|
+ aggOpts.NoCache = false
|
|
|
+ aggOpts.NoExpireCache = false
|
|
|
+ aggOpts.ShareSplit = SplitTypeWeighted
|
|
|
+ aggOpts.RemoteEnabled = env.IsRemoteEnabled()
|
|
|
+ aggOpts.AllocateIdle = cloud.AllocateIdleByDefault(a.CloudProvider)
|
|
|
|
|
|
sharedNamespaces := cloud.SharedNamespaces(a.CloudProvider)
|
|
|
sharedLabelNames, sharedLabelValues := cloud.SharedLabels(a.CloudProvider)
|
|
|
|
|
|
- var sri *SharedResourceInfo
|
|
|
if len(sharedNamespaces) > 0 || len(sharedLabelNames) > 0 {
|
|
|
- sri = NewSharedResourceInfo(true, sharedNamespaces, sharedLabelNames, sharedLabelValues)
|
|
|
- }
|
|
|
-
|
|
|
- aggKey := GenerateAggKey(aggKeyParams{
|
|
|
- duration: duration,
|
|
|
- offset: offset,
|
|
|
- filters: filters,
|
|
|
- field: field,
|
|
|
- subfields: subfields,
|
|
|
- rate: rate,
|
|
|
- sri: sri,
|
|
|
- shareType: shareSplit,
|
|
|
- idle: allocateIdle,
|
|
|
- timeSeries: includeTimeSeries,
|
|
|
- efficiency: includeEfficiency,
|
|
|
- })
|
|
|
+ aggOpts.SharedResources = NewSharedResourceInfo(true, sharedNamespaces, sharedLabelNames, sharedLabelValues)
|
|
|
+ }
|
|
|
+
|
|
|
+ aggKey := GenerateAggKey(window, field, subfields, aggOpts)
|
|
|
log.Infof("aggregation: cache warming defaults: %s", aggKey)
|
|
|
key := fmt.Sprintf("%s:%s", durationHrs, offset)
|
|
|
|
|
|
- _, _, aggErr := a.ComputeAggregateCostModel(promClient, duration, offset, field, subfields, rate, filters,
|
|
|
- sri, shareSplit, allocateIdle, includeTimeSeries, includeEfficiency, disableCache,
|
|
|
- clearCache, noCache, noExpireCache, remoteEnabled, false, false)
|
|
|
+ _, _, aggErr := a.ComputeAggregateCostModel(promClient, window, field, subfields, aggOpts)
|
|
|
if aggErr != nil {
|
|
|
- log.Infof("Error building cache %s: %s", key, aggErr)
|
|
|
+ log.Infof("Error building cache %s: %s", window, aggErr)
|
|
|
}
|
|
|
if a.ThanosClient != nil {
|
|
|
offset = thanos.Offset()
|
|
|
@@ -1671,8 +1687,8 @@ func (a *Accesses) warmAggregateCostModelCache() {
|
|
|
}
|
|
|
}
|
|
|
if len(totals) > 0 && maxMinutesWithData > clusterCostsCacheMinutes {
|
|
|
- a.ClusterCostsCache.Set(key, totals, a.GetCacheExpiration(duration))
|
|
|
- log.Infof("caching %s cluster costs for %s", duration, a.GetCacheExpiration(duration))
|
|
|
+ a.ClusterCostsCache.Set(key, totals, a.GetCacheExpiration(window.Duration()))
|
|
|
+ log.Infof("caching %s cluster costs for %s", duration, a.GetCacheExpiration(window.Duration()))
|
|
|
} else {
|
|
|
log.Warningf("not caching %s cluster costs: no data or less than %f minutes data ", duration, clusterCostsCacheMinutes)
|
|
|
}
|
|
|
@@ -1683,17 +1699,18 @@ func (a *Accesses) warmAggregateCostModelCache() {
|
|
|
go func(sem *util.Semaphore) {
|
|
|
defer errors.HandlePanic()
|
|
|
|
|
|
- for {
|
|
|
- duration := "1d"
|
|
|
- offset := "1m"
|
|
|
- durHrs := "24h"
|
|
|
+ duration := "1d"
|
|
|
+ offset := "1m"
|
|
|
+ durHrs := "24h"
|
|
|
+ dur := 24 * time.Hour
|
|
|
|
|
|
+ for {
|
|
|
sem.Acquire()
|
|
|
warmFunc(duration, durHrs, offset, true)
|
|
|
sem.Return()
|
|
|
|
|
|
log.Infof("aggregation: warm cache: %s", duration)
|
|
|
- time.Sleep(a.GetCacheRefresh(duration))
|
|
|
+ time.Sleep(a.GetCacheRefresh(dur))
|
|
|
}
|
|
|
}(sem)
|
|
|
|
|
|
@@ -1701,17 +1718,18 @@ func (a *Accesses) warmAggregateCostModelCache() {
|
|
|
go func(sem *util.Semaphore) {
|
|
|
defer errors.HandlePanic()
|
|
|
|
|
|
- for {
|
|
|
- duration := "2d"
|
|
|
- offset := "1m"
|
|
|
- durHrs := "48h"
|
|
|
+ duration := "2d"
|
|
|
+ offset := "1m"
|
|
|
+ durHrs := "48h"
|
|
|
+ dur := 2 * 24 * time.Hour
|
|
|
|
|
|
+ for {
|
|
|
sem.Acquire()
|
|
|
warmFunc(duration, durHrs, offset, false)
|
|
|
sem.Return()
|
|
|
|
|
|
log.Infof("aggregation: warm cache: %s", duration)
|
|
|
- time.Sleep(a.GetCacheRefresh(duration))
|
|
|
+ time.Sleep(a.GetCacheRefresh(dur))
|
|
|
}
|
|
|
}(sem)
|
|
|
|
|
|
@@ -1720,18 +1738,19 @@ func (a *Accesses) warmAggregateCostModelCache() {
|
|
|
go func(sem *util.Semaphore) {
|
|
|
defer errors.HandlePanic()
|
|
|
|
|
|
- for {
|
|
|
- duration := "7d"
|
|
|
- offset := "1m"
|
|
|
- durHrs := "168h"
|
|
|
+ duration := "7d"
|
|
|
+ offset := "1m"
|
|
|
+ durHrs := "168h"
|
|
|
+ dur := 7 * 24 * time.Hour
|
|
|
|
|
|
+ for {
|
|
|
sem.Acquire()
|
|
|
aggErr, err := warmFunc(duration, durHrs, offset, false)
|
|
|
sem.Return()
|
|
|
|
|
|
log.Infof("aggregation: warm cache: %s", duration)
|
|
|
if aggErr == nil && err == nil {
|
|
|
- time.Sleep(a.GetCacheRefresh(duration))
|
|
|
+ time.Sleep(a.GetCacheRefresh(dur))
|
|
|
} else {
|
|
|
time.Sleep(5 * time.Minute)
|
|
|
}
|
|
|
@@ -1746,12 +1765,13 @@ func (a *Accesses) warmAggregateCostModelCache() {
|
|
|
duration := "30d"
|
|
|
offset := "1m"
|
|
|
durHrs := "720h"
|
|
|
+ dur := 30 * 24 * time.Hour
|
|
|
|
|
|
sem.Acquire()
|
|
|
aggErr, err := warmFunc(duration, durHrs, offset, false)
|
|
|
sem.Return()
|
|
|
if aggErr == nil && err == nil {
|
|
|
- time.Sleep(a.GetCacheRefresh(duration))
|
|
|
+ time.Sleep(a.GetCacheRefresh(dur))
|
|
|
} else {
|
|
|
time.Sleep(5 * time.Minute)
|
|
|
}
|
|
|
@@ -1790,14 +1810,13 @@ func (a *Accesses) AggregateCostModelHandler(w http.ResponseWriter, r *http.Requ
|
|
|
http.Error(w, fmt.Sprintf("invalid window: %s", err), http.StatusBadRequest)
|
|
|
return
|
|
|
}
|
|
|
- duration, offset := window.ToDurationOffset()
|
|
|
|
|
|
durRegex := regexp.MustCompile(`^(\d+)(m|h|d|s)$`)
|
|
|
isDurationStr := durRegex.MatchString(windowStr)
|
|
|
|
|
|
// legacy offset option should override window offset
|
|
|
if r.URL.Query().Get("offset") != "" {
|
|
|
- offset = r.URL.Query().Get("offset")
|
|
|
+ offset := r.URL.Query().Get("offset")
|
|
|
// Shift window by offset, but only when manually set with separate
|
|
|
// parameter and window was provided as a duration string. Otherwise,
|
|
|
// do not alter the (duration, offset) from ParseWindowWithOffset.
|
|
|
@@ -1821,10 +1840,7 @@ func (a *Accesses) AggregateCostModelHandler(w http.ResponseWriter, r *http.Requ
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- // redirect requests with no offset to a 1m offset to improve cache hits
|
|
|
- if offset == "" {
|
|
|
- offset = "1m"
|
|
|
- }
|
|
|
+ opts := DefaultAggregateQueryOpts()
|
|
|
|
|
|
// parse remaining query parameters
|
|
|
namespace := r.URL.Query().Get("namespace")
|
|
|
@@ -1835,14 +1851,11 @@ func (a *Accesses) AggregateCostModelHandler(w http.ResponseWriter, r *http.Requ
|
|
|
labelArray[0] = strings.ReplaceAll(labelArray[0], "-", "_")
|
|
|
labels = strings.Join(labelArray, "=")
|
|
|
field := r.URL.Query().Get("aggregation")
|
|
|
- subfieldStr := r.URL.Query().Get("aggregationSubfield")
|
|
|
- rate := r.URL.Query().Get("rate")
|
|
|
- idleFlag := r.URL.Query().Get("allocateIdle")
|
|
|
sharedNamespaces := r.URL.Query().Get("sharedNamespaces")
|
|
|
sharedLabelNames := r.URL.Query().Get("sharedLabelNames")
|
|
|
sharedLabelValues := r.URL.Query().Get("sharedLabelValues")
|
|
|
remote := r.URL.Query().Get("remote") != "false"
|
|
|
- shared := r.URL.Query().Get("sharedSplit")
|
|
|
+ subfieldStr := r.URL.Query().Get("aggregationSubfield")
|
|
|
subfields := []string{}
|
|
|
if len(subfieldStr) > 0 {
|
|
|
s := strings.Split(r.URL.Query().Get("aggregationSubfield"), ",")
|
|
|
@@ -1851,38 +1864,42 @@ func (a *Accesses) AggregateCostModelHandler(w http.ResponseWriter, r *http.Requ
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- var allocateIdle bool
|
|
|
+ idleFlag := r.URL.Query().Get("allocateIdle")
|
|
|
if idleFlag == "default" {
|
|
|
c, _ := a.CloudProvider.GetConfig()
|
|
|
- allocateIdle = (c.DefaultIdle == "true")
|
|
|
+ opts.AllocateIdle = (c.DefaultIdle == "true")
|
|
|
} else {
|
|
|
- allocateIdle = (idleFlag == "true")
|
|
|
+ opts.AllocateIdle = (idleFlag == "true")
|
|
|
}
|
|
|
|
|
|
+ opts.Rate = r.URL.Query().Get("rate")
|
|
|
+
|
|
|
+ opts.ShareSplit = r.URL.Query().Get("sharedSplit")
|
|
|
+
|
|
|
// timeSeries == true maintains the time series dimension of the data,
|
|
|
// which by default gets summed over the entire interval
|
|
|
- includeTimeSeries := r.URL.Query().Get("timeSeries") == "true"
|
|
|
-
|
|
|
- // efficiency == true aggregates and returns usage and efficiency data
|
|
|
- // includeEfficiency := r.URL.Query().Get("efficiency") == "true"
|
|
|
+ opts.IncludeTimeSeries = r.URL.Query().Get("timeSeries") == "true"
|
|
|
|
|
|
// efficiency has been deprecated in favor of a default to always send efficiency
|
|
|
- includeEfficiency := true
|
|
|
+ opts.IncludeEfficiency = true
|
|
|
|
|
|
// TODO niko/caching rename "recomputeCache"
|
|
|
// disableCache, if set to "true", tells this function to recompute and
|
|
|
// cache the requested data
|
|
|
- disableCache := r.URL.Query().Get("disableCache") == "true"
|
|
|
+ opts.DisableCache = r.URL.Query().Get("disableCache") == "true"
|
|
|
|
|
|
// clearCache, if set to "true", tells this function to flush the cache,
|
|
|
// then recompute and cache the requested data
|
|
|
- clearCache := r.URL.Query().Get("clearCache") == "true"
|
|
|
+ opts.ClearCache = r.URL.Query().Get("clearCache") == "true"
|
|
|
|
|
|
// noCache avoids the cache altogether, both reading from and writing to
|
|
|
- noCache := r.URL.Query().Get("noCache") == "true"
|
|
|
+ opts.NoCache = r.URL.Query().Get("noCache") == "true"
|
|
|
|
|
|
// noExpireCache should only be used by cache warming to set non-expiring caches
|
|
|
- noExpireCache := false
|
|
|
+ opts.NoExpireCache = false
|
|
|
+
|
|
|
+ // etl triggers ETL adapter
|
|
|
+ opts.UseETLAdapter = r.URL.Query().Get("etl") == "true"
|
|
|
|
|
|
// aggregation field is required
|
|
|
if field == "" {
|
|
|
@@ -1897,7 +1914,7 @@ func (a *Accesses) AggregateCostModelHandler(w http.ResponseWriter, r *http.Requ
|
|
|
}
|
|
|
|
|
|
// enforce one of four available rate options
|
|
|
- if rate != "" && rate != "hourly" && rate != "daily" && rate != "monthly" {
|
|
|
+ if opts.Rate != "" && opts.Rate != "hourly" && opts.Rate != "daily" && opts.Rate != "monthly" {
|
|
|
http.Error(w, "If set, rate parameter must be one of: 'hourly', 'daily', 'monthly'", http.StatusBadRequest)
|
|
|
return
|
|
|
}
|
|
|
@@ -1906,7 +1923,7 @@ func (a *Accesses) AggregateCostModelHandler(w http.ResponseWriter, r *http.Requ
|
|
|
// namespace and cluster are exact-string-matches
|
|
|
// labels are expected to be comma-separated and to take the form key=value
|
|
|
// e.g. app=cost-analyzer,app.kubernetes.io/instance=kubecost
|
|
|
- filters := map[string]string{
|
|
|
+ opts.Filters = map[string]string{
|
|
|
"namespace": namespace,
|
|
|
"cluster": cluster,
|
|
|
"labels": labels,
|
|
|
@@ -1928,21 +1945,18 @@ func (a *Accesses) AggregateCostModelHandler(w http.ResponseWriter, r *http.Requ
|
|
|
return
|
|
|
}
|
|
|
}
|
|
|
- var sr *SharedResourceInfo
|
|
|
if len(sn) > 0 || len(sln) > 0 {
|
|
|
- sr = NewSharedResourceInfo(true, sn, sln, slv)
|
|
|
+ opts.SharedResources = NewSharedResourceInfo(true, sn, sln, slv)
|
|
|
}
|
|
|
|
|
|
// enable remote if it is available and not disabled
|
|
|
- remoteEnabled := remote && env.IsRemoteEnabled()
|
|
|
+ opts.RemoteEnabled = remote && env.IsRemoteEnabled()
|
|
|
|
|
|
promClient := a.GetPrometheusClient(remote)
|
|
|
|
|
|
- useETLAdapter := r.URL.Query().Get("etl") == "true"
|
|
|
-
|
|
|
var data map[string]*Aggregation
|
|
|
var message string
|
|
|
- data, message, err = a.AggAPI.ComputeAggregateCostModel(promClient, duration, offset, field, subfields, rate, filters, sr, shared, allocateIdle, includeTimeSeries, includeEfficiency, disableCache, clearCache, noCache, noExpireCache, remoteEnabled, false, useETLAdapter)
|
|
|
+ data, message, err = a.AggAPI.ComputeAggregateCostModel(promClient, window, field, subfields, opts)
|
|
|
|
|
|
// Find any warnings in http request context
|
|
|
warning, _ := util.GetWarning(r)
|