Bläddra i källkod

Aggregate API: select resolution by duration to limit number of time series data points to cache; turn RFC 3339 timestamp layout into a constant

Niko Kovacevic 6 år sedan
förälder
incheckning
e9074a5f18
1 ändrade filer med 114 tillägg och 111 borttagningar
  1. 114 111
      costmodel/router.go

+ 114 - 111
costmodel/router.go

@@ -30,6 +30,7 @@ import (
 const (
 	prometheusServerEndpointEnvVar = "PROMETHEUS_SERVER_ENDPOINT"
 	prometheusTroubleshootingEp    = "http://docs.kubecost.com/custom-prom#troubleshoot"
+	RFC3339Milli                   = "2006-01-02T15:04:05.000Z"
 )
 
 var (
@@ -73,6 +74,45 @@ type DataEnvelope struct {
 	Message string      `json:"message,omitempty"`
 }
 
+// filterCostData allows through only CostData that matches the given filters for namespace and clusterId
+func filterCostData(data map[string]*CostData, namespace, clusterId string) map[string]*CostData {
+	result := make(map[string]*CostData)
+	for key, datum := range data {
+		if costDataPassesFilters(datum, namespace, clusterId) {
+			result[key] = datum
+		}
+	}
+	return result
+}
+
+func filterFields(fields string, data map[string]*CostData) map[string]CostData {
+	fs := strings.Split(fields, ",")
+	fmap := make(map[string]bool)
+	for _, f := range fs {
+		fieldNameLower := strings.ToLower(f) // convert to go struct name by uppercasing first letter
+		klog.V(1).Infof("to delete: %s", fieldNameLower)
+		fmap[fieldNameLower] = true
+	}
+	filteredData := make(map[string]CostData)
+	for cname, costdata := range data {
+		s := reflect.TypeOf(*costdata)
+		val := reflect.ValueOf(*costdata)
+		costdata2 := CostData{}
+		cd2 := reflect.New(reflect.Indirect(reflect.ValueOf(costdata2)).Type()).Elem()
+		n := s.NumField()
+		for i := 0; i < n; i++ {
+			field := s.Field(i)
+			value := val.Field(i)
+			value2 := cd2.Field(i)
+			if _, ok := fmap[strings.ToLower(field.Name)]; !ok {
+				value2.Set(reflect.Value(value))
+			}
+		}
+		filteredData[cname] = cd2.Interface().(CostData)
+	}
+	return filteredData
+}
+
 func normalizeTimeParam(param string) (string, error) {
 	// convert days to hours
 	if param[len(param)-1:] == "d" {
@@ -88,7 +128,7 @@ func normalizeTimeParam(param string) (string, error) {
 	return param, nil
 }
 
-// parseDuration converts a Prometheus-style resolution string into a Duration
+// parseDuration converts a Prometheus-style duration string into a Duration
 func parseDuration(duration string) (*time.Duration, error) {
 	unitStr := duration[len(duration)-1:]
 	var unit time.Duration
@@ -204,34 +244,6 @@ func (a *Accesses) RefreshPricingData(w http.ResponseWriter, r *http.Request, ps
 	w.Write(wrapData(nil, err))
 }
 
-func filterFields(fields string, data map[string]*CostData) map[string]CostData {
-	fs := strings.Split(fields, ",")
-	fmap := make(map[string]bool)
-	for _, f := range fs {
-		fieldNameLower := strings.ToLower(f) // convert to go struct name by uppercasing first letter
-		klog.V(1).Infof("to delete: %s", fieldNameLower)
-		fmap[fieldNameLower] = true
-	}
-	filteredData := make(map[string]CostData)
-	for cname, costdata := range data {
-		s := reflect.TypeOf(*costdata)
-		val := reflect.ValueOf(*costdata)
-		costdata2 := CostData{}
-		cd2 := reflect.New(reflect.Indirect(reflect.ValueOf(costdata2)).Type()).Elem()
-		n := s.NumField()
-		for i := 0; i < n; i++ {
-			field := s.Field(i)
-			value := val.Field(i)
-			value2 := cd2.Field(i)
-			if _, ok := fmap[strings.ToLower(field.Name)]; !ok {
-				value2.Set(reflect.Value(value))
-			}
-		}
-		filteredData[cname] = cd2.Interface().(CostData)
-	}
-	return filteredData
-}
-
 func (a *Accesses) CostDataModel(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Access-Control-Allow-Origin", "*")
@@ -321,7 +333,7 @@ func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Access-Control-Allow-Origin", "*")
 
-	window := r.URL.Query().Get("window")
+	duration := r.URL.Query().Get("window")
 	offset := r.URL.Query().Get("offset")
 	namespace := r.URL.Query().Get("namespace")
 	cluster := r.URL.Query().Get("cluster")
@@ -348,7 +360,7 @@ func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps
 
 	// disableCache, if set to "true", tells this function to recompute and
 	// cache the requested data
-	disableCache := r.URL.Query().Get("disableCache") == "true" || allocateIdle
+	disableCache := r.URL.Query().Get("disableCache") == "true"
 
 	// clearCache, if set to "true", tells this function to flush the cache,
 	// then recompute and cache the requested data
@@ -384,7 +396,7 @@ func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps
 
 	// parametrize cache key by all request parameters
 	aggKey := fmt.Sprintf("%s:%s:%s:%s:%s:%s:%s:%t:%t:%t",
-		window, offset, namespace, cluster, field, strings.Join(subfields, ","), rate,
+		duration, offset, namespace, cluster, field, strings.Join(subfields, ","), rate,
 		allocateIdle, includeTimeSeries, includeEfficiency)
 
 	// check the cache for aggregated response; if cache is hit and not disabled, return response
@@ -405,24 +417,76 @@ func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps
 		pClient = a.PrometheusClient
 	}
 
-	resolution := "1h"
-	data, err := a.CostDataRangeWithCache(pClient, window, offset, resolution, remoteEnabled, disableCache)
+	// convert duration and offset to start and end times
+	startTime, endTime, err := parseTimeRange(duration, offset)
 	if err != nil {
-		w.Write(wrapData(nil, err))
+		w.WriteHeader(http.StatusBadRequest)
+		w.Write(wrapData(nil, fmt.Errorf("Error parsing duration (%s) and offset (%s)", duration, offset)))
 		return
 	}
-	data = FilterCostData(data, namespace, cluster)
+	durationHours := endTime.Sub(*startTime).Hours()
 
-	startTime, endTime, err := parseTimeRange(window, offset)
+	threeHoursAgo := time.Now().Add(-3 * time.Hour)
+	if a.ThanosClient != nil && endTime.After(threeHoursAgo) {
+		klog.Infof("Setting end time backwards to first present data")
+		*endTime = time.Now().Add(-3 * time.Hour)
+	}
+
+	// determine resolution by size of duration
+	resolution := "1h"
+	if durationHours >= 2160 {
+		// 90 days
+		resolution = "72h"
+	} else if durationHours >= 720 {
+		// 30 days
+		resolution = "24h"
+	} else if durationHours >= 168 {
+		// 7 days
+		resolution = "6h"
+	} else if durationHours >= 48 {
+		// 2 days
+		resolution = "2h"
+	}
+	klog.V(1).Infof("resolution: %s", resolution)
+	resolutionDuration, err := parseDuration(resolution)
+	resolutionHours := resolutionDuration.Hours()
 	if err != nil {
 		w.WriteHeader(http.StatusBadRequest)
-		w.Write(wrapData(nil, err))
+		w.Write(wrapData(nil, fmt.Errorf("Error parsing resolution (%s)", resolution)))
 		return
 	}
-	dur := endTime.Sub(*startTime)
-	// dataLength is the number of time series data expected for the given interval,
-	// which we compute because Prometheus time series vectors omit zero values.
-	dataCount := int(dur.Hours())
+
+	// exclude the last window of the time frame to match Prometheus definitions of range, offset, and resolution
+	//   e.g. requesting duration=2d, offset=1d, resolution=1h on Jan 4 12:00:00 should provide data for Jan 1 12:00 - Jan 3 12:00
+	//        which has the equivalent start and end times of Jan 1 1:00 and Jan 3 12:00, respectively.
+	*startTime = startTime.Add(1 * *resolutionDuration)
+
+	// attempt to retrieve cost data from cache
+	var costData map[string]*CostData
+	key := fmt.Sprintf(`%s:%s:%s:%t`, duration, offset, resolution, remoteEnabled)
+	cacheData, found := a.CostDataCache.Get(key)
+	if found && !disableCache {
+		klog.V(1).Infof("cost data cache hit: %s", key)
+		ok := false
+		costData, ok = cacheData.(map[string]*CostData)
+		if !ok {
+			klog.Errorf("caching error: failed to cast cost data to struct: %s", key)
+		}
+	} else {
+		klog.V(1).Infof("cost data cache miss: %s", key)
+		start := startTime.Format(RFC3339Milli)
+		end := endTime.Format(RFC3339Milli)
+		costData, err = a.Model.ComputeCostDataRange(pClient, a.KubeClientSet, a.Cloud, start, end, resolution, "", "", remoteEnabled)
+		if err != nil {
+			w.Write(wrapData(nil, err))
+			return
+		}
+
+		a.CostDataCache.Set(key, costData, cache.DefaultExpiration)
+	}
+
+	// filter cost data by namespace and cluster after caching for maximal cache hits
+	costData = filterCostData(costData, namespace, cluster)
 
 	c, err := a.Cloud.GetConfig()
 	if err != nil {
@@ -438,12 +502,12 @@ func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps
 
 	idleCoefficients := make(map[string]float64)
 	if allocateIdle {
-		windowStr := fmt.Sprintf("%dh", int(dur.Hours()))
+		windowStr := fmt.Sprintf("%dh", int(durationHours))
 		if a.ThanosClient != nil {
 			klog.Infof("Setting offset to 3h")
 			offset = "3h"
 		}
-		idleCoefficients, err = ComputeIdleCoefficient(data, pClient, a.Cloud, discount, windowStr, offset)
+		idleCoefficients, err = ComputeIdleCoefficient(costData, pClient, a.Cloud, discount, windowStr, offset)
 		if err != nil {
 			klog.V(1).Infof("error computing idle coefficient: windowString=%s, offset=%s, err=%s", windowStr, offset, err)
 			w.Write(wrapData(nil, err))
@@ -474,6 +538,9 @@ func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps
 		klog.Infof("Idle Coeff: %s: %f", cid, idleCoefficient)
 	}
 
+	dataCount := int(durationHours / resolutionHours)
+	klog.V(1).Infof("data count = %d for duration (%fh) resolution (%fh)", dataCount, durationHours, resolutionHours)
+
 	// aggregate cost model data by given fields and cache the result for the default expiration
 	opts := &AggregationOptions{
 		DataCount:          dataCount,
@@ -484,74 +551,12 @@ func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps
 		Rate:               rate,
 		SharedResourceInfo: sr,
 	}
-	result := AggregateCostData(data, field, subfields, a.Cloud, opts)
+	result := AggregateCostData(costData, field, subfields, a.Cloud, opts)
 	a.AggregateCache.Set(aggKey, result, cache.DefaultExpiration)
 
 	w.Write(wrapDataWithMessage(result, nil, fmt.Sprintf("aggregate cache miss: %s", aggKey)))
 }
 
-// CostDataRangeWithCache attempts to retrieve cost data for the given range from a cache, computing and caching the data
-// if it is not already available. Filtering by namespace and cluster is disallowed here to maximize likelihood of
-// cache hits.
-func (a *Accesses) CostDataRangeWithCache(pc prometheusClient.Client, duration, offset, resolution string, remoteEnabled, disableCache bool) (map[string]*CostData, error) {
-	// convert duration and offset to start and end times
-	startTime, endTime, err := parseTimeRange(duration, offset)
-	if err != nil {
-		return nil, err
-	}
-
-	threeHoursAgo := time.Now().Add(-3 * time.Hour)
-	if a.ThanosClient != nil && endTime.After(threeHoursAgo) {
-		klog.Infof("Setting end time backwards to first present data")
-		*endTime = time.Now().Add(-3 * time.Hour)
-	}
-
-	resolutionDuration, err := parseDuration(resolution)
-	if err != nil {
-		return nil, err
-	}
-
-	// exclude the last window of the time frame to match Prometheus definitions of range, offset, and resolution
-	//   e.g. requesting duration=2d, offset=1d, resolution=1h on Jan 4 12:00:00 should provide data for Jan 1 12:00 - Jan 3 12:00
-	//        which has the equivalent start and end times of Jan 1 1:00 and Jan 3 12:00, respectively.
-	*startTime = startTime.Add(1 * *resolutionDuration)
-
-	// attempt to retrieve cost data from cache
-	key := fmt.Sprintf(`%s:%s:%s:%t`, duration, offset, resolution, remoteEnabled)
-	if value, found := a.CostDataCache.Get(key); found && !disableCache {
-		klog.V(1).Infof("cost data cache hit: %s", key)
-		if data, ok := value.(map[string]*CostData); ok {
-			return data, nil
-		}
-		klog.Errorf("caching error: failed to cast data to struct: %s", key)
-	}
-	klog.V(1).Infof("cost data cache miss: %s", key)
-
-	// cache miss; compute data and cache it
-	layout := "2006-01-02T15:04:05.000Z"
-	start := startTime.Format(layout)
-	end := endTime.Format(layout)
-	data, err := a.Model.ComputeCostDataRange(pc, a.KubeClientSet, a.Cloud, start, end, resolution, "", "", remoteEnabled)
-	if err != nil {
-		return nil, err
-	}
-
-	a.CostDataCache.Set(key, data, cache.DefaultExpiration)
-
-	return data, nil
-}
-
-// FilterCostData allows through only CostData that matches the given filters for namespace and clusterId
-func FilterCostData(data map[string]*CostData, namespace, clusterId string) map[string]*CostData {
-	result := make(map[string]*CostData)
-	for key, datum := range data {
-		if costDataPassesFilters(datum, namespace, clusterId) {
-			result[key] = datum
-		}
-	}
-	return result
-}
-
 func (a *Accesses) CostDataModelRange(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Access-Control-Allow-Origin", "*")
@@ -622,8 +627,6 @@ func (a *Accesses) CostDataModelRangeLarge(w http.ResponseWriter, r *http.Reques
 	endString := r.URL.Query().Get("end")
 	windowString := r.URL.Query().Get("window")
 
-	layout := "2006-01-02T15:04:05.000Z"
-
 	var start time.Time
 	var end time.Time
 	var err error
@@ -632,7 +635,7 @@ func (a *Accesses) CostDataModelRangeLarge(w http.ResponseWriter, r *http.Reques
 		windowString = "1h"
 	}
 	if startString != "" {
-		start, err = time.Parse(layout, startString)
+		start, err = time.Parse(RFC3339Milli, startString)
 		if err != nil {
 			klog.V(1).Infof("Error parsing time " + startString + ". Error: " + err.Error())
 			w.Write(wrapData(nil, err))
@@ -646,7 +649,7 @@ func (a *Accesses) CostDataModelRangeLarge(w http.ResponseWriter, r *http.Reques
 		start = time.Now().Add(-2 * window)
 	}
 	if endString != "" {
-		end, err = time.Parse(layout, endString)
+		end, err = time.Parse(RFC3339Milli, endString)
 		if err != nil {
 			klog.V(1).Infof("Error parsing time " + endString + ". Error: " + err.Error())
 			w.Write(wrapData(nil, err))