Selaa lähdekoodia

Caching layer for full cost model data and out of cluster costs; fix for converting time durations to start and end dates (e.g. make 1d = 24h, not 25h)

Niko Kovacevic 6 vuotta sitten
vanhempi
sitoutus
1344a92823
2 muutettua tiedostoa jossa 240 lisäystä ja 124 poistoa
  1. 31 5
      costmodel/aggregations.go
  2. 209 119
      costmodel/router.go

+ 31 - 5
costmodel/aggregations.go

@@ -11,6 +11,11 @@ import (
 	"k8s.io/klog"
 )
 
+// Hour-based conversion factors.
+const (
+	// hoursPerDay is the number of hours in one day.
+	hoursPerDay = 24.0
+	// hoursPerMonth is the number of hours assumed per month; presumably
+	// 8760 hours per year divided by 12 — TODO confirm.
+	hoursPerMonth = 730.0
+)
+
 type Aggregation struct {
 	Aggregator           string    `json:"aggregation"`
 	Subfields            []string  `json:"subfields,omitempty"`
@@ -40,10 +45,27 @@ type Aggregation struct {
 	TotalCost            float64   `json:"totalCost"`
 }
 
-const (
-	hoursPerDay   = 24.0
-	hoursPerMonth = 730.0
-)
+// GetDataCount reports the length of the longest cost vector held by the
+// aggregation, considering the CPU, RAM, PV, GPU, and network cost vectors.
+// It returns 0 when all of them are empty.
+func (a *Aggregation) GetDataCount() int {
+	sizes := [...]int{
+		len(a.CPUCostVector),
+		len(a.RAMCostVector),
+		len(a.PVCostVector),
+		len(a.GPUCostVector),
+		len(a.NetworkCostVector),
+	}
+
+	longest := 0
+	for _, size := range sizes {
+		if size > longest {
+			longest = size
+		}
+	}
+
+	return longest
+}
 
 type SharedResourceInfo struct {
 	ShareResources  bool
@@ -140,7 +162,7 @@ func ComputeIdleCoefficient(costData map[string]*CostData, cli prometheusClient.
 
 // AggregationOptions provides optional parameters to AggregateCostData, allowing callers to perform more complex operations
 type AggregationOptions struct {
-	DataCount          int64              // number of cost data points expected; ensures proper rate calculation if data is incomplete
+	DataCount          int                // number of cost data points expected; ensures proper rate calculation if data is incomplete
 	Discount           float64            // percent by which to discount CPU, RAM, and GPU cost
 	IdleCoefficients   map[string]float64 // scales costs by amount of idle resources on a per-cluster basis
 	IncludeEfficiency  bool               // set to true to receive efficiency/usage data
@@ -229,6 +251,10 @@ func AggregateCostData(costData map[string]*CostData, field string, subfields []
 		agg.NetworkCost = totalVectors(agg.NetworkCostVector)
 		agg.SharedCost = sharedResourceCost / float64(len(aggregations))
 
+		if dataCount == 0 {
+			dataCount = agg.GetDataCount()
+		}
+
 		if rate != "" && dataCount > 0 {
 			agg.CPUCost /= float64(dataCount)
 			agg.RAMCost /= float64(dataCount)

+ 209 - 119
costmodel/router.go

@@ -61,7 +61,9 @@ type Accesses struct {
 	ServiceSelectorRecorder       *prometheus.GaugeVec
 	DeploymentSelectorRecorder    *prometheus.GaugeVec
 	Model                         *CostModel
-	Cache                         *cache.Cache
+	AggregateCache                *cache.Cache
+	CostDataCache                 *cache.Cache
+	OutOfClusterCache             *cache.Cache
 }
 
 type DataEnvelope struct {
@@ -86,6 +88,65 @@ func normalizeTimeParam(param string) (string, error) {
 	return param, nil
 }
 
+// parseDuration converts a Prometheus-style resolution string into a Duration.
+// The expected form is a base-10 integer followed by exactly one unit
+// character: s (seconds), m (minutes), h (hours), or d (days, treated as
+// exactly 24 hours), e.g. "30s", "1h", "2d".
+//
+// An error is returned when the string is empty, the unit is not recognized,
+// or the amount is not a valid integer.
+func parseDuration(duration string) (*time.Duration, error) {
+	// an empty string has no unit character; reject it explicitly so that the
+	// slice expressions below cannot panic
+	if duration == "" {
+		return nil, fmt.Errorf("error parsing duration: %s did not match expected format [0-9]+(s|m|d|h)", duration)
+	}
+
+	// the final byte is the unit; everything before it is the amount
+	last := len(duration) - 1
+
+	var unit time.Duration
+	switch duration[last:] {
+	case "s":
+		unit = time.Second
+	case "m":
+		unit = time.Minute
+	case "h":
+		unit = time.Hour
+	case "d":
+		unit = 24 * time.Hour
+	default:
+		return nil, fmt.Errorf("error parsing duration: %s did not match expected format [0-9]+(s|m|d|h)", duration)
+	}
+
+	amount, err := strconv.ParseInt(duration[:last], 10, 64)
+	if err != nil {
+		return nil, fmt.Errorf("error parsing duration: %s did not match expected format [0-9]+(s|m|d|h)", duration)
+	}
+
+	dur := time.Duration(amount) * unit
+	return &dur, nil
+}
+
+// parseTimeRange returns a start and end time, respectively, which are converted from
+// a duration and offset, defined as strings with Prometheus-style syntax.
+//
+// The end time is the current time shifted back by offset (when offset is non-empty);
+// the start time precedes the end time by duration. An error is returned, with nil
+// times, when duration is empty or when either string cannot be parsed.
+func parseTimeRange(duration, offset string) (*time.Time, *time.Time, error) {
+	// an empty duration can never be parsed (time.ParseDuration rejects it), so fail
+	// fast with a clear error instead of handing it to normalizeTimeParam
+	if duration == "" {
+		return nil, nil, fmt.Errorf("error parsing duration (%s): duration must not be empty", duration)
+	}
+
+	// endTime defaults to the current time, unless an offset is explicitly declared,
+	// in which case it shifts endTime back by the given offset
+	endTime := time.Now()
+	if offset != "" {
+		o, err := time.ParseDuration(offset)
+		if err != nil {
+			// time.ParseDuration has no day unit; fall back to the Prometheus-style
+			// parser so that offsets such as "1d" are accepted. The original error
+			// is reported if the fallback fails as well.
+			d, fallbackErr := parseDuration(offset)
+			if fallbackErr != nil {
+				return nil, nil, fmt.Errorf("error parsing offset (%s): %s", offset, err)
+			}
+			o = *d
+		}
+		endTime = endTime.Add(-1 * o)
+	}
+
+	// if duration is defined in terms of days, convert to hours
+	// e.g. convert "2d" to "48h"
+	durationNorm, err := normalizeTimeParam(duration)
+	if err != nil {
+		return nil, nil, fmt.Errorf("error parsing duration (%s): %s", duration, err)
+	}
+
+	// derive the start time by stepping back from the end time by the
+	// normalized duration
+	dur, err := time.ParseDuration(durationNorm)
+	if err != nil {
+		return nil, nil, fmt.Errorf("error parsing duration (%s): %s", durationNorm, err)
+	}
+	startTime := endTime.Add(-1 * dur)
+
+	return &startTime, &endTime, nil
+}
+
 func wrapDataWithMessage(data interface{}, err error, message string) []byte {
 	var resp []byte
 
@@ -209,7 +270,7 @@ func (a *Accesses) CostDataModel(w http.ResponseWriter, r *http.Request, ps http
 		// dataCount is the number of time series data expected for the given interval,
 		// which we compute because Prometheus time series vectors omit zero values.
 		// This assumes hourly data, incremented by one to capture the 0th data point.
-		dataCount := int64(dur.Hours()) + 1
+		dataCount := int(dur.Hours()) + 1
 		klog.V(1).Infof("for duration %s dataCount = %d", dur.String(), dataCount)
 
 		opts := &AggregationOptions{
@@ -271,7 +332,7 @@ func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps
 	sharedNamespaces := r.URL.Query().Get("sharedNamespaces")
 	sharedLabelNames := r.URL.Query().Get("sharedLabelNames")
 	sharedLabelValues := r.URL.Query().Get("sharedLabelValues")
-	remote := r.URL.Query().Get("remote")
+	remote := r.URL.Query().Get("remote") != "false"
 
 	subfields := []string{}
 	if len(subfieldStr) > 0 {
@@ -293,79 +354,6 @@ func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps
 	// then recompute and cache the requested data
 	clearCache := r.URL.Query().Get("clearCache") == "true"
 
-	// time window must be defined, whether by window and offset or by manually
-	// setting the start and end times as ISO time strings
-	var start, end string
-	var dur time.Duration
-	layout := "2006-01-02T15:04:05.000Z"
-	if window == "" {
-		start = r.URL.Query().Get("start")
-		startTime, err := time.Parse(layout, start)
-		if err != nil {
-			w.WriteHeader(http.StatusBadRequest)
-			w.Write(wrapData(nil, fmt.Errorf("Invalid start parameter: %s", start)))
-			return
-		}
-
-		end = r.URL.Query().Get("end")
-		endTime, err := time.Parse(layout, end)
-		if err != nil {
-			w.WriteHeader(http.StatusBadRequest)
-			w.Write(wrapData(nil, fmt.Errorf("Invalid end parameter: %s", end)))
-			return
-		}
-
-		dur = endTime.Sub(startTime)
-	} else {
-		// endTime defaults to the current time, unless an offset is explicity declared,
-		// in which case it shifts endTime back by given duration
-		endTime := time.Now()
-		if offset != "" {
-			o, err := time.ParseDuration(offset)
-			if err != nil {
-				klog.V(1).Infof("error parsing offset: %s", err)
-				w.Write(wrapData(nil, err))
-				return
-			}
-			endTime = endTime.Add(-1 * o)
-		}
-
-		if a.ThanosClient != nil {
-			if endTime.After(time.Now().Add(-3 * time.Hour)) {
-				klog.Infof("Setting end time backwards to first present data")
-				endTime = time.Now().Add(-3 * time.Hour)
-			}
-		}
-
-		// if window is defined in terms of days, convert to hours
-		// e.g. convert "2d" to "48h"
-		window, err := normalizeTimeParam(window)
-		if err != nil {
-			w.Write(wrapData(nil, err))
-			return
-		}
-
-		// convert time window into start and end times, formatted
-		// as ISO datetime strings
-		dur, err = time.ParseDuration(window)
-		if err != nil {
-			w.Write(wrapData(nil, err))
-			return
-		}
-		startTime := endTime.Add(-1 * dur)
-
-		start = startTime.Format(layout)
-		end = endTime.Format(layout)
-
-		klog.V(1).Infof("start: %s, end: %s", start, end)
-	}
-
-	// dataCount is the number of time series data expected for the given interval,
-	// which we compute because Prometheus time series vectors omit zero values.
-	// This assumes hourly data, incremented by one to capture the 0th data point.
-	dataCount := int64(dur.Hours()) + 1
-	klog.V(1).Infof("for duration %s dataCount = %d", dur.String(), dataCount)
-
 	// aggregation field is required
 	if field == "" {
 		w.WriteHeader(http.StatusBadRequest)
@@ -390,39 +378,51 @@ func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps
 	// clear cache prior to checking the cache so that a clearCache=true
 	// request always returns a freshly computed value
 	if clearCache {
-		a.Cache.Flush()
+		a.AggregateCache.Flush()
+		a.CostDataCache.Flush()
 	}
 
 	// parametrize cache key by all request parameters
-	aggKey := fmt.Sprintf("aggregate:%s:%s:%s:%s:%s:%s:%s:%t:%t:%t",
-		window, offset, namespace, cluster, field, strings.Join(subfields, ","), rate, allocateIdle, includeTimeSeries, includeEfficiency)
+	aggKey := fmt.Sprintf("%s:%s:%s:%s:%s:%s:%s:%t:%t:%t",
+		window, offset, namespace, cluster, field, strings.Join(subfields, ","), rate,
+		allocateIdle, includeTimeSeries, includeEfficiency)
 
 	// check the cache for aggregated response; if cache is hit and not disabled, return response
-	if result, found := a.Cache.Get(aggKey); found && !disableCache {
-		w.Write(wrapDataWithMessage(result, nil, fmt.Sprintf("cache hit: %s", aggKey)))
+	if result, found := a.AggregateCache.Get(aggKey); found && !disableCache {
+		w.Write(wrapDataWithMessage(result, nil, fmt.Sprintf("aggregate cache hit: %s", aggKey)))
 		return
 	}
 
+	// enable remote if it is available and not disabled
 	remoteAvailable := os.Getenv(remoteEnabled) == "true"
-	remoteEnabled := false
-	if remoteAvailable && remote != "false" {
-		remoteEnabled = true
-	}
+	remoteEnabled := remote && remoteAvailable
 
 	// Use Thanos Client if it exists (enabled) and remote flag not set
 	var pClient prometheusClient.Client
-	if remote != "false" && a.ThanosClient != nil {
+	if remote && a.ThanosClient != nil {
 		pClient = a.ThanosClient
 	} else {
 		pClient = a.PrometheusClient
 	}
 
-	data, err := a.Model.ComputeCostDataRange(pClient, a.KubeClientSet, a.Cloud, start, end, "1h", namespace, cluster, remoteEnabled)
+	resolution := "1h"
+	data, err := a.CostDataRangeWithCache(pClient, window, offset, resolution, remoteEnabled, disableCache)
 	if err != nil {
-		klog.V(1).Infof("error computing cost data range: start=%s, end=%s, err=%s", start, end, err)
 		w.Write(wrapData(nil, err))
 		return
 	}
+	data = FilterCostData(data, namespace, cluster)
+
+	startTime, endTime, err := parseTimeRange(window, offset)
+	if err != nil {
+		w.WriteHeader(http.StatusBadRequest)
+		w.Write(wrapData(nil, err))
+		return
+	}
+	dur := endTime.Sub(*startTime)
+	// dataCount is the number of time series data points expected for the given interval,
+	// which we compute because Prometheus time series vectors omit zero values.
+	dataCount := int(dur.Hours())
 
 	c, err := a.Cloud.GetConfig()
 	if err != nil {
@@ -485,9 +485,71 @@ func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps
 		SharedResourceInfo: sr,
 	}
 	result := AggregateCostData(data, field, subfields, a.Cloud, opts)
-	a.Cache.Set(aggKey, result, cache.DefaultExpiration)
+	a.AggregateCache.Set(aggKey, result, cache.DefaultExpiration)
 
-	w.Write(wrapDataWithMessage(result, nil, fmt.Sprintf("cache miss: %s", aggKey)))
+	w.Write(wrapDataWithMessage(result, nil, fmt.Sprintf("aggregate cache miss: %s", aggKey)))
+}
+
+// CostDataRangeWithCache attempts to retrieve cost data for the given range from a cache, computing and caching the data
+// if it is not already available. Filtering by namespace and cluster is disallowed here to maximize likelihood of
+// cache hits.
+//
+// duration and offset are passed to parseTimeRange and resolution to parseDuration; a parse failure of any of them is
+// returned as the error. When disableCache is true the cache is not read, but the freshly computed data is still
+// written to it.
+func (a *Accesses) CostDataRangeWithCache(pc prometheusClient.Client, duration, offset, resolution string, remoteEnabled, disableCache bool) (map[string]*CostData, error) {
+	// convert duration and offset to start and end times
+	startTime, endTime, err := parseTimeRange(duration, offset)
+	if err != nil {
+		return nil, err
+	}
+
+	// when a Thanos client is configured, do not query the most recent three hours.
+	// Shift the whole window back by the same amount so that it keeps its requested
+	// length; moving only the end would shorten the window and, for short durations,
+	// could place the start after the end.
+	threeHoursAgo := time.Now().Add(-3 * time.Hour)
+	if a.ThanosClient != nil && endTime.After(threeHoursAgo) {
+		klog.Infof("Setting end time backwards to first present data")
+		shift := endTime.Sub(threeHoursAgo)
+		*startTime = startTime.Add(-1 * shift)
+		*endTime = threeHoursAgo
+	}
+
+	resolutionDuration, err := parseDuration(resolution)
+	if err != nil {
+		return nil, err
+	}
+
+	// exclude the first window of the time frame to match Prometheus definitions of range, offset, and resolution
+	//   e.g. requesting duration=2d, offset=1d, resolution=1h on Jan 4 12:00:00 should provide data for Jan 1 12:00 - Jan 3 12:00
+	//        which has the equivalent start and end times of Jan 1 13:00 and Jan 3 12:00, respectively.
+	*startTime = startTime.Add(1 * *resolutionDuration)
+
+	// attempt to retrieve cost data from cache
+	key := fmt.Sprintf(`%s:%s:%s:%t`, duration, offset, resolution, remoteEnabled)
+	if value, found := a.CostDataCache.Get(key); found && !disableCache {
+		klog.V(1).Infof("cost data cache hit: %s", key)
+		if data, ok := value.(map[string]*CostData); ok {
+			return data, nil
+		}
+		// a cached value of an unexpected type is treated as a miss and recomputed
+		klog.Errorf("caching error: failed to cast data to struct: %s", key)
+	}
+	klog.V(1).Infof("cost data cache miss: %s", key)
+
+	// cache miss; compute data and cache it
+	layout := "2006-01-02T15:04:05.000Z"
+	start := startTime.Format(layout)
+	end := endTime.Format(layout)
+	data, err := a.Model.ComputeCostDataRange(pc, a.KubeClientSet, a.Cloud, start, end, resolution, "", "", remoteEnabled)
+	if err != nil {
+		return nil, err
+	}
+
+	a.CostDataCache.Set(key, data, cache.DefaultExpiration)
+
+	return data, nil
+}
+
+// FilterCostData builds a new map containing only those entries of data for which
+// costDataPassesFilters accepts the given namespace and clusterId.
+func FilterCostData(data map[string]*CostData, namespace, clusterId string) map[string]*CostData {
+	filtered := map[string]*CostData{}
+	for k, cd := range data {
+		if !costDataPassesFilters(cd, namespace, clusterId) {
+			continue
+		}
+		filtered[k] = cd
+	}
+	return filtered
 }
 
 func (a *Accesses) CostDataModelRange(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
@@ -535,33 +597,7 @@ func (a *Accesses) CostDataModelRange(w http.ResponseWriter, r *http.Request, ps
 		}
 		discount = discount * 0.01
 
-		layout := "2006-01-02T15:04:05.000Z"
-		startTime, err := time.Parse(layout, start)
-		if err != nil {
-			w.Write(wrapData(nil, err))
-			return
-		}
-		endTime, err := time.Parse(layout, end)
-		if err != nil {
-			w.Write(wrapData(nil, err))
-			return
-		}
-
-		dur := endTime.Sub(startTime)
-		if err != nil {
-			w.Write(wrapData(nil, err))
-			return
-		}
-		windowHrs, err := strconv.ParseInt(window[:len(window)-1], 10, 64)
-
-		// dataCount is the number of time series data expected for the given interval,
-		// which we compute because Prometheus time series vectors omit zero values.
-		// This assumes hourly data, incremented by one to capture the 0th data point.
-		dataCount := (int64(dur.Hours()) / windowHrs) + 1
-		klog.V(1).Infof("for duration %s dataCount = %d", dur.String(), dataCount)
-
 		opts := &AggregationOptions{
-			DataCount:        dataCount,
 			Discount:         discount,
 			IdleCoefficients: make(map[string]float64),
 		}
@@ -646,6 +682,56 @@ func (a *Accesses) OutofClusterCosts(w http.ResponseWriter, r *http.Request, ps
 	w.Write(wrapData(data, err))
 }
 
+// OutOfClusterCostsWithCache is an HTTP handler that writes out-of-cluster cost allocations as a JSON response,
+// serving them from OutOfClusterCache when possible.
+//
+// Query parameters read: start, end, aggregator, customAggregation, disableCache, clearCache. Results are cached
+// under a key built from start, end, and the effective aggregation; only successful lookups are cached.
+func (a *Accesses) OutOfClusterCostsWithCache(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
+	w.Header().Set("Content-Type", "application/json")
+	w.Header().Set("Access-Control-Allow-Origin", "*")
+
+	// start date for which to query costs, inclusive; format YYYY-MM-DD
+	start := r.URL.Query().Get("start")
+	// end date for which to query costs, inclusive; format YYYY-MM-DD
+	end := r.URL.Query().Get("end")
+	// aggregator sets the field by which to aggregate; default, prepended by "kubernetes_"
+	kubernetesAggregation := r.URL.Query().Get("aggregator")
+	// customAggregation allows full customization of aggregator w/o prepending
+	customAggregation := r.URL.Query().Get("customAggregation")
+	// disableCache, if set to "true", tells this function to recompute and
+	// cache the requested data
+	disableCache := r.URL.Query().Get("disableCache") == "true"
+	// clearCache, if set to "true", tells this function to flush the cache,
+	// then recompute and cache the requested data
+	clearCache := r.URL.Query().Get("clearCache") == "true"
+
+	// customAggregation, when given, takes precedence over the prefixed aggregator
+	aggregation := "kubernetes_" + kubernetesAggregation
+	if customAggregation != "" {
+		aggregation = customAggregation
+	}
+
+	// clear cache prior to checking the cache so that a clearCache=true
+	// request always returns a freshly computed value
+	if clearCache {
+		a.OutOfClusterCache.Flush()
+	}
+
+	// attempt to retrieve cost data from cache
+	key := fmt.Sprintf(`%s:%s:%s`, start, end, aggregation)
+	if value, found := a.OutOfClusterCache.Get(key); found && !disableCache {
+		klog.V(1).Infof("out of cluster cache hit: %s", key)
+		if data, ok := value.([]*costAnalyzerCloud.OutOfClusterAllocation); ok {
+			w.Write(wrapDataWithMessage(data, nil, fmt.Sprintf("out of cluster cache hit: %s", key)))
+			return
+		}
+		// a cached value of an unexpected type is treated as a miss and recomputed
+		klog.Errorf("caching error: failed to type cast data: %s", key)
+	}
+	klog.V(1).Infof("out of cluster cache miss: %s", key)
+
+	// cache miss; query the cloud provider and cache the result only on success
+	data, err := a.Cloud.ExternalAllocations(start, end, aggregation)
+	if err == nil {
+		a.OutOfClusterCache.Set(key, data, cache.DefaultExpiration)
+	}
+
+	w.Write(wrapDataWithMessage(data, err, fmt.Sprintf("out of cluster cache miss: %s", key)))
+}
+
 func (p *Accesses) GetAllNodePricing(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Access-Control-Allow-Origin", "*")
@@ -1059,7 +1145,9 @@ func init() {
 	})
 
 	// cache responses from model for a default of 5 minutes; clear expired responses every 10 minutes
-	modelCache := cache.New(time.Minute*5, time.Minute*10)
+	aggregateCache := cache.New(time.Minute*5, time.Minute*10)
+	costDataCache := cache.New(time.Minute*5, time.Minute*10)
+	outOfClusterCache := cache.New(time.Minute*5, time.Minute*10)
 
 	A = Accesses{
 		PrometheusClient:              promCli,
@@ -1079,7 +1167,9 @@ func init() {
 		NetworkInternetEgressRecorder: NetworkInternetEgressRecorder,
 		PersistentVolumePriceRecorder: pvGv,
 		Model:                         NewCostModel(kubeClientset),
-		Cache:                         modelCache,
+		AggregateCache:                aggregateCache,
+		CostDataCache:                 costDataCache,
+		OutOfClusterCache:             outOfClusterCache,
 	}
 
 	remoteEnabled := os.Getenv(remoteEnabled)
@@ -1138,7 +1228,7 @@ func init() {
 	Router.GET("/costDataModel", A.CostDataModel)
 	Router.GET("/costDataModelRange", A.CostDataModelRange)
 	Router.GET("/costDataModelRangeLarge", A.CostDataModelRangeLarge)
-	Router.GET("/outOfClusterCosts", A.OutofClusterCosts)
+	Router.GET("/outOfClusterCosts", A.OutOfClusterCostsWithCache)
 	Router.GET("/allNodePricing", A.GetAllNodePricing)
 	Router.GET("/healthz", Healthz)
 	Router.GET("/getConfigs", A.GetConfigs)