|
|
@@ -24,7 +24,6 @@ import (
|
|
|
"github.com/opencost/opencost/core/pkg/util/timeutil"
|
|
|
"github.com/opencost/opencost/pkg/cloud/models"
|
|
|
"github.com/opencost/opencost/pkg/env"
|
|
|
- "github.com/opencost/opencost/pkg/errors"
|
|
|
"github.com/opencost/opencost/pkg/prom"
|
|
|
"github.com/opencost/opencost/pkg/thanos"
|
|
|
)
|
|
|
@@ -1049,9 +1048,10 @@ func DefaultAggregateQueryOpts() *AggregateQueryOpts {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+// TODO: deprecate and delete? Threading tenantID through this function is absurd.
|
|
|
// ComputeAggregateCostModel computes cost data for the given window, then aggregates it by the given fields.
|
|
|
// Data is cached on two levels: the aggregation is cached as well as the underlying cost data.
|
|
|
-func (a *Accesses) ComputeAggregateCostModel(promClient prometheusClient.Client, window opencost.Window, field string, subfields []string, opts *AggregateQueryOpts) (map[string]*Aggregation, string, error) {
|
|
|
+func (a *Accesses) ComputeAggregateCostModel(promClient prometheusClient.Client, tenantID string, window opencost.Window, field string, subfields []string, opts *AggregateQueryOpts) (map[string]*Aggregation, string, error) {
|
|
|
// Window is the range of the query, i.e. (start, end)
|
|
|
// It must be closed, i.e. neither start nor end can be nil
|
|
|
if window.IsOpen() {
|
|
|
@@ -1384,7 +1384,7 @@ func (a *Accesses) ComputeAggregateCostModel(promClient prometheusClient.Client,
|
|
|
if !ok {
|
|
|
// disable cache and recompute if type cast fails
|
|
|
log.Errorf("ComputeAggregateCostModel: caching error: failed to cast aggregate data to struct: %s", aggKey)
|
|
|
- return a.ComputeAggregateCostModel(promClient, window, field, subfields, opts)
|
|
|
+ return a.ComputeAggregateCostModel(promClient, tenantID, window, field, subfields, opts)
|
|
|
}
|
|
|
return result, fmt.Sprintf("aggregate cache hit: %s", aggKey), nil
|
|
|
}
|
|
|
@@ -1725,162 +1725,12 @@ func GenerateAggKey(window opencost.Window, field string, subfields []string, op
|
|
|
opts.IncludeEfficiency)
|
|
|
}
|
|
|
|
|
|
+// TODO: confirm whether this interface is deprecated and, if so, delete it.
|
|
|
// Aggregator is capable of computing the aggregated cost model. This is
|
|
|
// a brutal interface, which should be cleaned up, but it's necessary for
|
|
|
// being able to swap in an ETL-backed implementation.
|
|
|
type Aggregator interface {
|
|
|
- ComputeAggregateCostModel(promClient prometheusClient.Client, window opencost.Window, field string, subfields []string, opts *AggregateQueryOpts) (map[string]*Aggregation, string, error)
|
|
|
-}
|
|
|
-
|
|
|
-func (a *Accesses) warmAggregateCostModelCache() {
|
|
|
- // Only allow one concurrent cache-warming operation
|
|
|
- sem := util.NewSemaphore(1)
|
|
|
-
|
|
|
- // Set default values, pulling them from application settings where applicable, and warm the cache
|
|
|
- // for the given duration. Cache is intentionally set to expire (i.e. noExpireCache=false) so that
|
|
|
- // if the default parameters change, the old cached defaults with eventually expire. Thus, the
|
|
|
- // timing of the cache expiry/refresh is the only mechanism ensuring 100% cache warmth.
|
|
|
- warmFunc := func(duration, offset time.Duration, cacheEfficiencyData bool) (error, error) {
|
|
|
- if a.ThanosClient != nil {
|
|
|
- duration = thanos.OffsetDuration()
|
|
|
- log.Infof("Setting Offset to %s", duration)
|
|
|
- }
|
|
|
- fmtDuration, fmtOffset := timeutil.DurationOffsetStrings(duration, offset)
|
|
|
- durationHrs, err := timeutil.FormatDurationStringDaysToHours(fmtDuration)
|
|
|
- promClient := a.GetPrometheusClient(true)
|
|
|
-
|
|
|
- windowStr := fmt.Sprintf("%s offset %s", fmtDuration, fmtOffset)
|
|
|
- window, err := opencost.ParseWindowUTC(windowStr)
|
|
|
- if err != nil {
|
|
|
- return nil, fmt.Errorf("invalid window from window string: %s", windowStr)
|
|
|
- }
|
|
|
-
|
|
|
- field := "namespace"
|
|
|
- subfields := []string{}
|
|
|
-
|
|
|
- aggOpts := DefaultAggregateQueryOpts()
|
|
|
- aggOpts.Rate = ""
|
|
|
- aggOpts.Filters = map[string]string{}
|
|
|
- aggOpts.IncludeTimeSeries = false
|
|
|
- aggOpts.IncludeEfficiency = true
|
|
|
- aggOpts.DisableAggregateCostModelCache = true
|
|
|
- aggOpts.ClearCache = false
|
|
|
- aggOpts.NoCache = false
|
|
|
- aggOpts.NoExpireCache = false
|
|
|
- aggOpts.ShareSplit = SplitTypeWeighted
|
|
|
- aggOpts.RemoteEnabled = env.IsRemoteEnabled()
|
|
|
- aggOpts.AllocateIdle = provider.AllocateIdleByDefault(a.CloudProvider)
|
|
|
-
|
|
|
- sharedNamespaces := provider.SharedNamespaces(a.CloudProvider)
|
|
|
- sharedLabelNames, sharedLabelValues := provider.SharedLabels(a.CloudProvider)
|
|
|
-
|
|
|
- if len(sharedNamespaces) > 0 || len(sharedLabelNames) > 0 {
|
|
|
- aggOpts.SharedResources = NewSharedResourceInfo(true, sharedNamespaces, sharedLabelNames, sharedLabelValues)
|
|
|
- }
|
|
|
-
|
|
|
- aggKey := GenerateAggKey(window, field, subfields, aggOpts)
|
|
|
- log.Infof("aggregation: cache warming defaults: %s", aggKey)
|
|
|
- key := fmt.Sprintf("%s:%s", durationHrs, fmtOffset)
|
|
|
-
|
|
|
- _, _, aggErr := a.ComputeAggregateCostModel(promClient, window, field, subfields, aggOpts)
|
|
|
- if aggErr != nil {
|
|
|
- log.Infof("Error building cache %s: %s", window, aggErr)
|
|
|
- }
|
|
|
-
|
|
|
- totals, err := a.ComputeClusterCosts(promClient, a.CloudProvider, duration, offset, cacheEfficiencyData)
|
|
|
- if err != nil {
|
|
|
- log.Infof("Error building cluster costs cache %s", key)
|
|
|
- }
|
|
|
- maxMinutesWithData := 0.0
|
|
|
- for _, cluster := range totals {
|
|
|
- if cluster.DataMinutes > maxMinutesWithData {
|
|
|
- maxMinutesWithData = cluster.DataMinutes
|
|
|
- }
|
|
|
- }
|
|
|
- if len(totals) > 0 && maxMinutesWithData > clusterCostsCacheMinutes {
|
|
|
- a.ClusterCostsCache.Set(key, totals, a.GetCacheExpiration(window.Duration()))
|
|
|
- log.Infof("caching %s cluster costs for %s", fmtDuration, a.GetCacheExpiration(window.Duration()))
|
|
|
- } else {
|
|
|
- log.Warnf("not caching %s cluster costs: no data or less than %f minutes data ", fmtDuration, clusterCostsCacheMinutes)
|
|
|
- }
|
|
|
- return aggErr, err
|
|
|
- }
|
|
|
-
|
|
|
- // 1 day
|
|
|
- go func(sem *util.Semaphore) {
|
|
|
- defer errors.HandlePanic()
|
|
|
-
|
|
|
- offset := time.Minute
|
|
|
- duration := 24 * time.Hour
|
|
|
-
|
|
|
- for {
|
|
|
- sem.Acquire()
|
|
|
- warmFunc(duration, offset, true)
|
|
|
- sem.Return()
|
|
|
-
|
|
|
- log.Infof("aggregation: warm cache: %s", timeutil.DurationString(duration))
|
|
|
- time.Sleep(a.GetCacheRefresh(duration))
|
|
|
- }
|
|
|
- }(sem)
|
|
|
-
|
|
|
- if !env.IsETLEnabled() {
|
|
|
- // 2 day
|
|
|
- go func(sem *util.Semaphore) {
|
|
|
- defer errors.HandlePanic()
|
|
|
-
|
|
|
- offset := time.Minute
|
|
|
- duration := 2 * 24 * time.Hour
|
|
|
-
|
|
|
- for {
|
|
|
- sem.Acquire()
|
|
|
- warmFunc(duration, offset, false)
|
|
|
- sem.Return()
|
|
|
-
|
|
|
- log.Infof("aggregation: warm cache: %s", timeutil.DurationString(duration))
|
|
|
- time.Sleep(a.GetCacheRefresh(duration))
|
|
|
- }
|
|
|
- }(sem)
|
|
|
-
|
|
|
- // 7 day
|
|
|
- go func(sem *util.Semaphore) {
|
|
|
- defer errors.HandlePanic()
|
|
|
-
|
|
|
- offset := time.Minute
|
|
|
- duration := 7 * 24 * time.Hour
|
|
|
-
|
|
|
- for {
|
|
|
- sem.Acquire()
|
|
|
- aggErr, err := warmFunc(duration, offset, false)
|
|
|
- sem.Return()
|
|
|
-
|
|
|
- log.Infof("aggregation: warm cache: %s", timeutil.DurationString(duration))
|
|
|
- if aggErr == nil && err == nil {
|
|
|
- time.Sleep(a.GetCacheRefresh(duration))
|
|
|
- } else {
|
|
|
- time.Sleep(5 * time.Minute)
|
|
|
- }
|
|
|
- }
|
|
|
- }(sem)
|
|
|
-
|
|
|
- // 30 day
|
|
|
- go func(sem *util.Semaphore) {
|
|
|
- defer errors.HandlePanic()
|
|
|
-
|
|
|
- for {
|
|
|
- offset := time.Minute
|
|
|
- duration := 30 * 24 * time.Hour
|
|
|
-
|
|
|
- sem.Acquire()
|
|
|
- aggErr, err := warmFunc(duration, offset, false)
|
|
|
- sem.Return()
|
|
|
- if aggErr == nil && err == nil {
|
|
|
- time.Sleep(a.GetCacheRefresh(duration))
|
|
|
- } else {
|
|
|
- time.Sleep(5 * time.Minute)
|
|
|
- }
|
|
|
- }
|
|
|
- }(sem)
|
|
|
- }
|
|
|
+ ComputeAggregateCostModel(promClient prometheusClient.Client, tenantID string, window opencost.Window, field string, subfields []string, opts *AggregateQueryOpts) (map[string]*Aggregation, string, error)
|
|
|
}
|
|
|
|
|
|
var (
|
|
|
@@ -1897,11 +1747,19 @@ var (
|
|
|
percentRegex = regexp.MustCompile(`(\d+\.*\d*)%`)
|
|
|
)
|
|
|
|
|
|
+// TODO tenantID for this function is absurd. Can I delete?
|
|
|
// AggregateCostModelHandler handles requests to the aggregated cost model API. See
|
|
|
// ComputeAggregateCostModel for details.
|
|
|
func (a *Accesses) AggregateCostModelHandler(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
|
|
|
|
+ // TODO replace with real auth service (token, authentication, etc.)
|
|
|
+ tenantID := r.Header.Get("Tenant")
|
|
|
+ if tenantID == "" {
|
|
|
+ http.Error(w, "Unauthorized", http.StatusUnauthorized)
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
windowStr := r.URL.Query().Get("window")
|
|
|
|
|
|
match := rfc3339Regex.FindStringSubmatch(windowStr)
|
|
|
@@ -2063,7 +1921,7 @@ func (a *Accesses) AggregateCostModelHandler(w http.ResponseWriter, r *http.Requ
|
|
|
|
|
|
var data map[string]*Aggregation
|
|
|
var message string
|
|
|
- data, message, err = a.AggAPI.ComputeAggregateCostModel(promClient, window, field, subfields, opts)
|
|
|
+ data, message, err = a.AggAPI.ComputeAggregateCostModel(promClient, tenantID, window, field, subfields, opts)
|
|
|
|
|
|
// Find any warnings in http request context
|
|
|
warning, _ := httputil.GetWarning(r)
|