Просмотр исходного кода

Merge branch 'master' into Bolt-add-requests-usage-back

Matt Bolt 6 лет назад
Родитель
Сommit
70cb043e18
3 измененных файлов с 23 добавлено и 282 удалено
  1. 20 35
      costmodel/aggregations.go
  2. 1 247
      costmodel/router.go
  3. 2 0
      main.go

+ 20 - 35
costmodel/aggregations.go

@@ -125,7 +125,7 @@ func NewSharedResourceInfo(shareResources bool, sharedNamespaces []string, label
 	return sr
 }
 
-func ComputeIdleCoefficient(costData map[string]*CostData, cli prometheusClient.Client, cp cloud.Provider, discount float64, windowString, offset, resolution string) (map[string]float64, error) {
+func ComputeIdleCoefficient(costData map[string]*CostData, cli prometheusClient.Client, cp cloud.Provider, discount float64, windowString, offset string) (map[string]float64, error) {
 	coefficients := make(map[string]float64)
 
 	windowDuration, err := time.ParseDuration(windowString)
@@ -133,12 +133,6 @@ func ComputeIdleCoefficient(costData map[string]*CostData, cli prometheusClient.
 		return nil, err
 	}
 
-	resolutionDuration, err := ParseDuration(resolution)
-	resolutionCoefficient := resolutionDuration.Hours()
-	if resolutionCoefficient < 1 {
-		resolutionCoefficient = 1 // just use 1 hour here, for numbers less than 1.
-	}
-
 	allTotals, err := ClusterCostsForAllClusters(cli, cp, windowString, offset)
 	if err != nil {
 		return nil, err
@@ -171,11 +165,11 @@ func ComputeIdleCoefficient(costData map[string]*CostData, cli prometheusClient.
 		for _, costDatum := range costData {
 			if costDatum.ClusterID == cid {
 				cpuv, ramv, gpuv, pvvs, _ := getPriceVectors(cp, costDatum, "", discount, 1)
-				totalContainerCost += totalVectors(cpuv) * resolutionCoefficient
-				totalContainerCost += totalVectors(ramv) * resolutionCoefficient
-				totalContainerCost += totalVectors(gpuv) * resolutionCoefficient
+				totalContainerCost += totalVectors(cpuv)
+				totalContainerCost += totalVectors(ramv)
+				totalContainerCost += totalVectors(gpuv)
 				for _, pv := range pvvs {
-					totalContainerCost += totalVectors(pv) * resolutionCoefficient
+					totalContainerCost += totalVectors(pv)
 				}
 			}
 
@@ -189,14 +183,13 @@ func ComputeIdleCoefficient(costData map[string]*CostData, cli prometheusClient.
 
 // AggregationOptions provides optional parameters to AggregateCostData, allowing callers to perform more complex operations
 type AggregationOptions struct {
-	DataCount             int                // number of cost data points expected; ensures proper rate calculation if data is incomplete
-	Discount              float64            // percent by which to discount CPU, RAM, and GPU cost
-	IdleCoefficients      map[string]float64 // scales costs by amount of idle resources on a per-cluster basis
-	IncludeEfficiency     bool               // set to true to receive efficiency/usage data
-	IncludeTimeSeries     bool               // set to true to receive time series data
-	Rate                  string             // set to "hourly", "daily", or "monthly" to receive cost rate, rather than cumulative cost
-	ResolutionCoefficient float64            // coefficient for converting hourly costs to per-resolution cost; e.g. 6 for a 6h resolution
-	SharedResourceInfo    *SharedResourceInfo
+	DataCount          int                // number of cost data points expected; ensures proper rate calculation if data is incomplete
+	Discount           float64            // percent by which to discount CPU, RAM, and GPU cost
+	IdleCoefficients   map[string]float64 // scales costs by amount of idle resources on a per-cluster basis
+	IncludeEfficiency  bool               // set to true to receive efficiency/usage data
+	IncludeTimeSeries  bool               // set to true to receive time series data
+	Rate               string             // set to "hourly", "daily", or "monthly" to receive cost rate, rather than cumulative cost
+	SharedResourceInfo *SharedResourceInfo
 }
 
 // Helper method to test request/usgae values against allocation averages for efficiency scores. Generate a warning log if
@@ -233,14 +226,6 @@ func AggregateCostData(costData map[string]*CostData, field string, subfields []
 		idleCoefficients = make(map[string]float64)
 	}
 
-	// resolution coefficient compensates for less-frequent-than-hourly samples by multiplying
-	// cumulative values by the hours between samples. does not apply to rate data and defaults
-	// to 1.0, which matches hourly sampling of hourly data.
-	resolutionCoefficient := opts.ResolutionCoefficient
-	if resolutionCoefficient == 0.0 || rate != "" {
-		resolutionCoefficient = 1.0
-	}
-
 	// aggregations collects key-value pairs of resource group-to-aggregated data
 	// e.g. namespace-to-data or label-value-to-data
 	aggregations := make(map[string]*Aggregation)
@@ -256,12 +241,12 @@ func AggregateCostData(costData map[string]*CostData, field string, subfields []
 		}
 		if sr != nil && sr.ShareResources && sr.IsSharedResource(costDatum) {
 			cpuv, ramv, gpuv, pvvs, netv := getPriceVectors(cp, costDatum, rate, discount, idleCoefficient)
-			sharedResourceCost += totalVectors(cpuv) * resolutionCoefficient
-			sharedResourceCost += totalVectors(ramv) * resolutionCoefficient
-			sharedResourceCost += totalVectors(gpuv) * resolutionCoefficient
+			sharedResourceCost += totalVectors(cpuv)
+			sharedResourceCost += totalVectors(ramv)
+			sharedResourceCost += totalVectors(gpuv)
 			sharedResourceCost += totalVectors(netv)
 			for _, pv := range pvvs {
-				sharedResourceCost += totalVectors(pv) * resolutionCoefficient
+				sharedResourceCost += totalVectors(pv)
 			}
 		} else {
 			if field == "cluster" {
@@ -296,10 +281,10 @@ func AggregateCostData(costData map[string]*CostData, field string, subfields []
 	}
 
 	for key, agg := range aggregations {
-		agg.CPUCost = totalVectors(agg.CPUCostVector) * resolutionCoefficient
-		agg.RAMCost = totalVectors(agg.RAMCostVector) * resolutionCoefficient
-		agg.GPUCost = totalVectors(agg.GPUCostVector) * resolutionCoefficient
-		agg.PVCost = totalVectors(agg.PVCostVector) * resolutionCoefficient
+		agg.CPUCost = totalVectors(agg.CPUCostVector)
+		agg.RAMCost = totalVectors(agg.RAMCostVector)
+		agg.GPUCost = totalVectors(agg.GPUCostVector)
+		agg.PVCost = totalVectors(agg.PVCostVector)
 		agg.NetworkCost = totalVectors(agg.NetworkCostVector)
 		agg.SharedCost = sharedResourceCost / float64(len(aggregations))
 

+ 1 - 247
costmodel/router.go

@@ -62,7 +62,6 @@ type Accesses struct {
 	ServiceSelectorRecorder       *prometheus.GaugeVec
 	DeploymentSelectorRecorder    *prometheus.GaugeVec
 	Model                         *CostModel
-	AggregateCache                *cache.Cache
 	CostDataCache                 *cache.Cache
 	OutOfClusterCache             *cache.Cache
 	SettingsCache                 *cache.Cache
@@ -379,249 +378,6 @@ func (a *Accesses) CustomPricingHasChanged() bool {
 	return true
 }
 
-// AggregateCostModel handles HTTP requests to the aggregated cost model API, which can be parametrized
-// by time period using window and offset, aggregation field and subfield (e.g. grouping by label.app
-// using aggregation=label, aggregationSubfield=app), and filtered by namespace and cluster.
-func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
-
-	w.Header().Set("Content-Type", "application/json")
-	w.Header().Set("Access-Control-Allow-Origin", "*")
-
-	duration := r.URL.Query().Get("window")
-	offset := r.URL.Query().Get("offset")
-	namespace := r.URL.Query().Get("namespace")
-	cluster := r.URL.Query().Get("cluster")
-	field := r.URL.Query().Get("aggregation")
-	subfieldStr := r.URL.Query().Get("aggregationSubfield")
-	rate := r.URL.Query().Get("rate")
-	allocateIdle := r.URL.Query().Get("allocateIdle") == "true"
-	sharedNamespaces := r.URL.Query().Get("sharedNamespaces")
-	sharedLabelNames := r.URL.Query().Get("sharedLabelNames")
-	sharedLabelValues := r.URL.Query().Get("sharedLabelValues")
-	remote := r.URL.Query().Get("remote") != "false"
-
-	subfields := []string{}
-	if len(subfieldStr) > 0 {
-		subfields = strings.Split(r.URL.Query().Get("aggregationSubfield"), ",")
-	}
-
-	// timeSeries == true maintains the time series dimension of the data,
-	// which by default gets summed over the entire interval
-	includeTimeSeries := r.URL.Query().Get("timeSeries") == "true"
-
-	// efficiency == true aggregates and returns usage and efficiency data
-	includeEfficiency := r.URL.Query().Get("efficiency") == "true"
-
-	// disableCache, if set to "true", tells this function to recompute and
-	// cache the requested data
-	disableCache := r.URL.Query().Get("disableCache") == "true"
-
-	// clearCache, if set to "true", tells this function to flush the cache,
-	// then recompute and cache the requested data
-	clearCache := r.URL.Query().Get("clearCache") == "true"
-
-	// aggregation field is required
-	if field == "" {
-		w.WriteHeader(http.StatusBadRequest)
-		w.Write(WrapData(nil, fmt.Errorf("Missing aggregation field parameter")))
-		return
-	}
-
-	// aggregation subfield is required when aggregation field is "label"
-	if field == "label" && len(subfields) == 0 {
-		w.WriteHeader(http.StatusBadRequest)
-		w.Write(WrapData(nil, fmt.Errorf("Missing aggregation subfield parameter for aggregation by label")))
-		return
-	}
-
-	// enforce one of four available rate options
-	if rate != "" && rate != "hourly" && rate != "daily" && rate != "monthly" {
-		w.WriteHeader(http.StatusBadRequest)
-		w.Write(WrapData(nil, fmt.Errorf("If set, rate parameter must be one of: 'hourly', 'daily', 'monthly'")))
-		return
-	}
-
-	// if custom pricing has changed, then clear the cache and recompute data
-	if A.CustomPricingHasChanged() {
-		clearCache = true
-	}
-
-	// clear cache prior to checking the cache so that a clearCache=true
-	// request always returns a freshly computed value
-	if clearCache {
-		A.AggregateCache.Flush()
-		A.CostDataCache.Flush()
-	}
-
-	// parametrize cache key by all request parameters
-	aggKey := fmt.Sprintf("%s:%s:%s:%s:%s:%s:%s:%t:%t:%t",
-		duration, offset, namespace, cluster, field, strings.Join(subfields, ","), rate,
-		allocateIdle, includeTimeSeries, includeEfficiency)
-
-	// check the cache for aggregated response; if cache is hit and not disabled, return response
-	if result, found := A.AggregateCache.Get(aggKey); found && !disableCache {
-		w.Write(WrapDataWithMessage(result, nil, fmt.Sprintf("aggregate cache hit: %s", aggKey)))
-		return
-	}
-
-	// enable remote if it is available and not disabled
-	remoteAvailable := os.Getenv(remoteEnabled) == "true"
-	remoteEnabled := remote && remoteAvailable
-
-	// Use Thanos Client if it exists (enabled) and remote flag set
-	var pClient prometheusClient.Client
-	if remote && A.ThanosClient != nil {
-		pClient = A.ThanosClient
-	} else {
-		pClient = A.PrometheusClient
-	}
-
-	// convert duration and offset to start and end times
-	startTime, endTime, err := ParseTimeRange(duration, offset)
-	if err != nil {
-		w.WriteHeader(http.StatusBadRequest)
-		w.Write(WrapData(nil, fmt.Errorf("Error parsing duration (%s) and offset (%s)", duration, offset)))
-		return
-	}
-	durationHours := endTime.Sub(*startTime).Hours()
-
-	threeHoursAgo := time.Now().Add(-3 * time.Hour)
-	if A.ThanosClient != nil && endTime.After(threeHoursAgo) {
-		klog.Infof("Setting end time backwards to first present data")
-		*endTime = time.Now().Add(-3 * time.Hour)
-	}
-
-	// determine resolution by size of duration
-	resolution := duration
-	if durationHours >= 2160 {
-		// 90 days
-		resolution = "72h"
-	} else if durationHours >= 720 {
-		// 30 days
-		resolution = "24h"
-	} else if durationHours >= 168 {
-		// 7 days
-		resolution = "6h"
-	} else if durationHours >= 48 {
-		// 2 days
-		resolution = "2h"
-	} else if durationHours > 1 {
-		resolution = "1h"
-	}
-	resolutionDuration, err := ParseDuration(resolution)
-	resolutionHours := resolutionDuration.Hours()
-	if resolutionHours < 1 {
-		resolutionHours = 1
-	}
-	if err != nil {
-		w.WriteHeader(http.StatusBadRequest)
-		w.Write(WrapData(nil, fmt.Errorf("Error parsing resolution (%s)", resolution)))
-		return
-	}
-
-	// exclude the last window of the time frame to match Prometheus definitions of range, offset, and resolution
-	//   e.g. requesting duration=2d, offset=1d, resolution=1h on Jan 4 12:00:00 should provide data for Jan 1 12:00 - Jan 3 12:00
-	//        which has the equivalent start and end times of Jan 1 1:00 and Jan 3 12:00, respectively.
-	*startTime = startTime.Add(1 * *resolutionDuration)
-
-	// attempt to retrieve cost data from cache
-	var costData map[string]*CostData
-	key := fmt.Sprintf(`%s:%s:%s:%t`, duration, offset, resolution, remoteEnabled)
-	cacheData, found := A.CostDataCache.Get(key)
-	if found && !disableCache {
-		ok := false
-		costData, ok = cacheData.(map[string]*CostData)
-		if !ok {
-			klog.Errorf("caching error: failed to cast cost data to struct: %s", key)
-		}
-	} else {
-		start := startTime.Format(RFC3339Milli)
-		end := endTime.Format(RFC3339Milli)
-		costData, err = A.Model.ComputeCostDataRange(pClient, A.KubeClientSet, A.Cloud, start, end, resolution, "", "", remoteEnabled)
-		if err != nil {
-			w.Write(WrapData(nil, err))
-			return
-		}
-
-		A.CostDataCache.Set(key, costData, cache.DefaultExpiration)
-	}
-
-	c, err := A.Cloud.GetConfig()
-	if err != nil {
-		w.Write(WrapData(nil, err))
-		return
-	}
-	discount, err := ParsePercentString(c.Discount)
-	if err != nil {
-		w.Write(WrapData(nil, err))
-		return
-	}
-
-	idleCoefficients := make(map[string]float64)
-
-	if allocateIdle {
-		idleDurationCalcHours := durationHours
-		if durationHours < 1 {
-			idleDurationCalcHours = 1
-		}
-		windowStr := fmt.Sprintf("%dh", int(idleDurationCalcHours))
-		if A.ThanosClient != nil {
-			klog.Infof("Setting offset to 3h")
-			offset = "3h"
-		}
-		idleCoefficients, err = ComputeIdleCoefficient(costData, pClient, A.Cloud, discount, windowStr, offset, resolution)
-		if err != nil {
-			klog.Errorf("error computing idle coefficient: windowString=%s, offset=%s, err=%s", windowStr, offset, err)
-			w.Write(WrapData(nil, err))
-			return
-		}
-	}
-
-	sn := []string{}
-	sln := []string{}
-	slv := []string{}
-	if sharedNamespaces != "" {
-		sn = strings.Split(sharedNamespaces, ",")
-	}
-	if sharedLabelNames != "" {
-		sln = strings.Split(sharedLabelNames, ",")
-		slv = strings.Split(sharedLabelValues, ",")
-		if len(sln) != len(slv) || slv[0] == "" {
-			w.Write(WrapData(nil, fmt.Errorf("Supply exacly one label value per label name")))
-			return
-		}
-	}
-	var sr *SharedResourceInfo
-	if len(sn) > 0 || len(sln) > 0 {
-		sr = NewSharedResourceInfo(true, sn, sln, slv)
-	}
-
-	for cid, idleCoefficient := range idleCoefficients {
-		klog.Infof("Idle Coeff: %s: %f", cid, idleCoefficient)
-	}
-
-	// filter cost data by namespace and cluster after caching for maximal cache hits
-	costData = FilterCostData(costData, namespace, cluster)
-
-	dataCount := int(durationHours / resolutionHours)
-
-	// aggregate cost model data by given fields and cache the result for the default expiration
-	opts := &AggregationOptions{
-		DataCount:             dataCount,
-		Discount:              discount,
-		IdleCoefficients:      idleCoefficients,
-		IncludeEfficiency:     includeEfficiency,
-		IncludeTimeSeries:     includeTimeSeries,
-		Rate:                  rate,
-		ResolutionCoefficient: resolutionHours,
-		SharedResourceInfo:    sr,
-	}
-	result := AggregateCostData(costData, field, subfields, A.Cloud, opts)
-	A.AggregateCache.Set(aggKey, result, cache.DefaultExpiration)
-
-	w.Write(WrapDataWithMessage(result, nil, fmt.Sprintf("aggregate cache miss: %s", aggKey)))
-}
-
 func (a *Accesses) CostDataModelRange(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Access-Control-Allow-Origin", "*")
@@ -1076,7 +832,7 @@ func (a *Accesses) recordPrices() {
 	}()
 }
 
-func init() {
+func Initialize() {
 	klog.InitFlags(nil)
 	flag.Set("v", "3")
 	flag.Parse()
@@ -1211,7 +967,6 @@ func init() {
 	})
 
 	// cache responses from model for a default of 5 minutes; clear expired responses every 10 minutes
-	aggregateCache := cache.New(time.Minute*5, time.Minute*10)
 	costDataCache := cache.New(time.Minute*5, time.Minute*10)
 	outOfClusterCache := cache.New(time.Minute*5, time.Minute*10)
 	settingsCache := cache.New(cache.NoExpiration, cache.NoExpiration)
@@ -1234,7 +989,6 @@ func init() {
 		NetworkInternetEgressRecorder: NetworkInternetEgressRecorder,
 		PersistentVolumePriceRecorder: pvGv,
 		Model:                         NewCostModel(kubeClientset),
-		AggregateCache:                aggregateCache,
 		CostDataCache:                 costDataCache,
 		OutOfClusterCache:             outOfClusterCache,
 		SettingsCache:                 settingsCache,

+ 2 - 0
main.go

@@ -9,6 +9,8 @@ import (
 )
 
 func main() {
+	costmodel.Initialize()
+
 	rootMux := http.NewServeMux()
 	rootMux.Handle("/", costmodel.Router)
 	rootMux.Handle("/metrics", promhttp.Handler())