Kaynağa Gözat

Merge branch 'develop' into niko/assets

Niko Kovacevic 5 yıl önce
ebeveyn
işleme
703114eccb
4 değiştirilmiş dosya ile 122 ekleme ve 86 silme
  1. 3 10
      pkg/cloud/awsprovider.go
  2. 74 48
      pkg/costmodel/costmodel.go
  3. 14 10
      pkg/prom/query.go
  4. 31 18
      pkg/prom/result.go

+ 3 - 10
pkg/cloud/awsprovider.go

@@ -990,16 +990,9 @@ func (aws *AWS) NodePricing(k Key) (*Node, error) {
 			}, fmt.Errorf("Unable to find any Pricing data for \"%s\"", key)
 		}
 		return aws.createNode(terms, usageType, k)
-	} else { // Fall back to base pricing if we can't find the key.
-		klog.V(1).Infof("Invalid Pricing Key \"%s\"", key)
-		return &Node{
-			Cost:             aws.BaseCPUPrice,
-			BaseCPUPrice:     aws.BaseCPUPrice,
-			BaseRAMPrice:     aws.BaseRAMPrice,
-			BaseGPUPrice:     aws.BaseGPUPrice,
-			UsageType:        usageType,
-			UsesBaseCPUPrice: true,
-		}, nil
+	} else { // Fall back to base pricing if we can't find the key. Base pricing is handled at the costmodel level.
+		return nil, fmt.Errorf("Invalid Pricing Key \"%s\"", key)
+
 	}
 }
 

+ 74 - 48
pkg/costmodel/costmodel.go

@@ -178,33 +178,27 @@ const (
 	sum(kube_persistentvolumeclaim_resource_requests_storage_bytes) by (persistentvolumeclaim, namespace, cluster_id, kubernetes_name)) by (persistentvolumeclaim, storageclass, namespace, volumename, cluster_id)`
 	// queryRAMAllocationByteHours yields the total byte-hour RAM allocation over the given
 	// window, aggregated by container.
-	//  [line 3]     sum(all byte measurements) = [byte*scrape] by metric
-	//  [lines 4-6]  (") / (approximate scrape count per hour) = [byte*hour] by metric
-	//  [lines 2,7]     sum(") by unique container key = [byte*hour] by container
-	//  [lines 1,8]  relabeling
-	queryRAMAllocationByteHours = `label_replace(label_replace(
-		sum(
-			sum_over_time(container_memory_allocation_bytes{container!="",container!="POD", node!=""}[%s])
-			/ clamp_min(
-				count_over_time(container_memory_allocation_bytes{container!="",container!="POD", node!=""}[%s])/%f,
-				scalar(avg(avg_over_time(prometheus_target_interval_length_seconds[%s])))*%f)
-		) by (namespace,container,pod,node,cluster_id)
-	, "container_name","$1","container","(.+)"), "pod_name","$1","pod","(.+)")`
+	//  [line 3]     sum_over_time(each byte*min in window) / (min/hr kubecost up) = [byte*hour] by metric, adjusted for kubecost downtime
+	//  [lines 2,4]  sum(") by unique container key = [byte*hour] by container
+	//  [lines 1,5]  relabeling
+	queryRAMAllocationByteHours = `
+		label_replace(label_replace(
+			sum(
+				sum_over_time(container_memory_allocation_bytes{container!="",container!="POD", node!=""}[%s:1m]) / %f 
+			) by (namespace,container,pod,node,cluster_id)
+		, "container_name","$1","container","(.+)"), "pod_name","$1","pod","(.+)")`
 	// queryCPUAllocationVCPUHours yields the total VCPU-hour CPU allocation over the given
 	// window, aggregated by container.
-	//  [line 3]     sum(all VCPU measurements within given window) = [VCPU*scrape] by metric
-	//  [lines 4-6]  (") / (approximate scrape count per hour) = [VCPU*hour] by metric
-	//  [lines 2,7]     sum(") by unique container key = [VCPU*hour] by container
-	//  [lines 1,8]  relabeling
-	queryCPUAllocationVCPUHours = `label_replace(label_replace(
-		sum(
-			sum_over_time(container_cpu_allocation{container!="",container!="POD", node!=""}[%s])
-			/ clamp_min(
-				count_over_time(container_cpu_allocation{container!="",container!="POD", node!=""}[%s])/%f,
-				scalar(avg(avg_over_time(prometheus_target_interval_length_seconds[%s])))*%f)
-		) by (namespace,container,pod,node,cluster_id)
-	, "container_name","$1","container","(.+)"), "pod_name","$1","pod","(.+)")`
-	// queryPVCAllocationFmt yields the total byte-hour CPU allocation over the given window.
+	//  [line 3]     sum_over_time(each VCPU*mins in window) / (min/hr kubecost up) = [VCPU*hour] by metric, adjusted for kubecost downtime
+	//  [lines 2,4]  sum(") by unique container key = [VCPU*hour] by container
+	//  [lines 1,5]  relabeling
+	queryCPUAllocationVCPUHours = `
+		label_replace(label_replace(
+			sum(
+				sum_over_time(container_cpu_allocation{container!="",container!="POD", node!=""}[%s:1m]) / %f
+			) by (namespace,container,pod,node,cluster_id)
+		, "container_name","$1","container","(.+)"), "pod_name","$1","pod","(.+)")`
+	// queryPVCAllocationFmt yields the total byte-hour PVC allocation over the given window.
 	//  sum(all VCPU measurements within given window) = [byte*min] by metric
 	//  (") / 60 = [byte*hour] by metric, assuming no missed scrapes
 	//  (") * (normalization factor) = [byte*hour] by metric, normalized for missed scrapes
@@ -226,6 +220,7 @@ const (
 	queryRegionNetworkUsage   = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="false"}[%s] %s)) by (namespace,pod_name,cluster_id) / 1024 / 1024 / 1024`
 	queryInternetNetworkUsage = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="true"}[%s] %s)) by (namespace,pod_name,cluster_id) / 1024 / 1024 / 1024`
 	normalizationStr          = `max(count_over_time(kube_pod_container_resource_requests_memory_bytes{}[%s] %s))`
+	kubecostUpMinsPerHourStr  = `max(count_over_time(node_cpu_hourly_cost[%s:1m])) / %f`
 )
 
 type PrometheusMetadata struct {
@@ -1492,28 +1487,7 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 	return data, err
 }
 
-func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubernetes.Interface, cp costAnalyzerCloud.Provider,
-	startString, endString, windowString string, resolutionHours float64, filterNamespace string, filterCluster string, remoteEnabled bool) (map[string]*CostData, error) {
-
-	// Use a heuristic to tell the difference between missed scrapes and an incomplete window
-	// of data due to fresh install, etc.
-	minimumExpectedScrapeRate := 0.95
-
-	queryRAMAlloc := fmt.Sprintf(queryRAMAllocationByteHours, windowString, windowString, resolutionHours, windowString, minimumExpectedScrapeRate)
-	queryCPUAlloc := fmt.Sprintf(queryCPUAllocationVCPUHours, windowString, windowString, resolutionHours, windowString, minimumExpectedScrapeRate)
-	queryRAMRequests := fmt.Sprintf(queryRAMRequestsStr, windowString, "", windowString, "")
-	queryRAMUsage := fmt.Sprintf(queryRAMUsageStr, windowString, "", windowString, "")
-	queryCPURequests := fmt.Sprintf(queryCPURequestsStr, windowString, "", windowString, "")
-	queryCPUUsage := fmt.Sprintf(queryCPUUsageStr, windowString, "")
-	queryGPURequests := fmt.Sprintf(queryGPURequestsStr, windowString, "", windowString, "", resolutionHours, windowString, "")
-	queryPVRequests := fmt.Sprintf(queryPVRequestsStr)
-	queryPVCAllocation := fmt.Sprintf(queryPVCAllocationFmt, windowString, windowString, resolutionHours, minimumExpectedScrapeRate)
-	queryPVHourlyCost := fmt.Sprintf(queryPVHourlyCostFmt, windowString)
-	queryNetZoneRequests := fmt.Sprintf(queryZoneNetworkUsage, windowString, "")
-	queryNetRegionRequests := fmt.Sprintf(queryRegionNetworkUsage, windowString, "")
-	queryNetInternetRequests := fmt.Sprintf(queryInternetNetworkUsage, windowString, "")
-	queryNormalization := fmt.Sprintf(normalizationStr, windowString, "")
-
+func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubernetes.Interface, cp costAnalyzerCloud.Provider, startString, endString, windowString string, resolutionHours float64, filterNamespace string, filterCluster string, remoteEnabled bool) (map[string]*CostData, error) {
 	layout := "2006-01-02T15:04:05.000Z"
 
 	start, err := time.Parse(layout, startString)
@@ -1521,16 +1495,19 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 		klog.V(1).Infof("Error parsing time " + startString + ". Error: " + err.Error())
 		return nil, err
 	}
+
 	end, err := time.Parse(layout, endString)
 	if err != nil {
 		klog.V(1).Infof("Error parsing time " + endString + ". Error: " + err.Error())
 		return nil, err
 	}
+
 	window, err := time.ParseDuration(windowString)
 	if err != nil {
 		klog.V(1).Infof("Error parsing time " + windowString + ". Error: " + err.Error())
 		return nil, err
 	}
+
 	clusterID := env.GetClusterID()
 
 	durHrs := end.Sub(start).Hours() + 1
@@ -1543,10 +1520,59 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 		return CostDataRangeFromSQL("", "", windowString, remoteStartStr, remoteEndStr)
 	}
 
+	ctx := prom.NewContext(cli)
+
+	// Query for the average number of minutes per hour that Kubecost was up
+	// in the given range by averaging the number of up minutes-per-hour for
+	// each window in the range. Use that number in the RAM and CPU allocation
+	// queries as the adjutsment factor, scaling only if Kubecost was down
+	// for fewer than 3 minutes (as a heuristic for a reasonable amount of
+	// time to interpolate). Otherwise, use 60 minutes per hour and assume
+	// that this period of time is during Kubecost start-up or a long-term
+	// downtime for which we don't want to interpolate.
+	queryKubecostUpMinsPerHour := fmt.Sprintf(kubecostUpMinsPerHourStr, windowString, window.Hours())
+	resKubecostUp, err := ctx.QueryRangeSync(queryKubecostUpMinsPerHour, start, end, window)
+	if err != nil {
+		log.Errorf("costDataRange: error querying Kubecost up: %s", err)
+		return nil, err
+	}
+
+	kubecostMinsPerHour := 0.0
+	num := 0
+	if len(resKubecostUp) > 0 {
+		for _, val := range resKubecostUp[0].Values {
+			kubecostMinsPerHour += val.Value
+			num++
+		}
+		kubecostMinsPerHour /= float64(num)
+	}
+	if kubecostMinsPerHour <= 57.0 {
+		kubecostMinsPerHour = 60.0
+	}
+
+	// TODO niko/queryfix rewrite PVCAllocation query too, and remove this
+	// Use a heuristic to tell the difference between missed scrapes and an incomplete window
+	// of data due to fresh install, etc.
+	minimumExpectedScrapeRate := 0.95
+
+	queryRAMAlloc := fmt.Sprintf(queryRAMAllocationByteHours, windowString, kubecostMinsPerHour)
+	queryCPUAlloc := fmt.Sprintf(queryCPUAllocationVCPUHours, windowString, kubecostMinsPerHour)
+	queryRAMRequests := fmt.Sprintf(queryRAMRequestsStr, windowString, "", windowString, "")
+	queryRAMUsage := fmt.Sprintf(queryRAMUsageStr, windowString, "", windowString, "")
+	queryCPURequests := fmt.Sprintf(queryCPURequestsStr, windowString, "", windowString, "")
+	queryCPUUsage := fmt.Sprintf(queryCPUUsageStr, windowString, "")
+	queryGPURequests := fmt.Sprintf(queryGPURequestsStr, windowString, "", windowString, "", resolutionHours, windowString, "")
+	queryPVRequests := fmt.Sprintf(queryPVRequestsStr)
+	queryPVCAllocation := fmt.Sprintf(queryPVCAllocationFmt, windowString, windowString, resolutionHours, minimumExpectedScrapeRate)
+	queryPVHourlyCost := fmt.Sprintf(queryPVHourlyCostFmt, windowString)
+	queryNetZoneRequests := fmt.Sprintf(queryZoneNetworkUsage, windowString, "")
+	queryNetRegionRequests := fmt.Sprintf(queryRegionNetworkUsage, windowString, "")
+	queryNetInternetRequests := fmt.Sprintf(queryInternetNetworkUsage, windowString, "")
+	queryNormalization := fmt.Sprintf(normalizationStr, windowString, "")
+
 	queryProfileStart := time.Now()
 
 	// Submit all queries for concurrent evaluation
-	ctx := prom.NewContext(cli)
 	resChRAMRequests := ctx.QueryRange(queryRAMRequests, start, end, window)
 	resChRAMUsage := ctx.QueryRange(queryRAMUsage, start, end, window)
 	resChRAMAlloc := ctx.QueryRange(queryRAMAlloc, start, end, window)

+ 14 - 10
pkg/prom/query.go

@@ -60,8 +60,10 @@ func (ctx *Context) Query(query string) QueryResultsChan {
 		raw, promErr := ctx.query(query)
 		ctx.ErrorCollector.Report(promErr)
 
-		results, parseErr := NewQueryResults(query, raw)
-		ctx.ErrorCollector.Report(parseErr)
+		results := NewQueryResults(query, raw)
+		if results.Error != nil {
+			ctx.ErrorCollector.Report(results.Error)
+		}
 
 		resCh <- results
 	}(ctx, resCh)
@@ -89,9 +91,9 @@ func (ctx *Context) QuerySync(query string) ([]*QueryResult, error) {
 		return nil, err
 	}
 
-	results, err := NewQueryResults(query, raw)
-	if err != nil {
-		return nil, err
+	results := NewQueryResults(query, raw)
+	if results.Error != nil {
+		return nil, results.Error
 	}
 
 	return results.Results, nil
@@ -143,8 +145,10 @@ func (ctx *Context) QueryRange(query string, start, end time.Time, step time.Dur
 		raw, promErr := ctx.queryRange(query, start, end, step)
 		ctx.ErrorCollector.Report(promErr)
 
-		results, parseErr := NewQueryResults(query, raw)
-		ctx.ErrorCollector.Report(parseErr)
+		results := NewQueryResults(query, raw)
+		if results.Error != nil {
+			ctx.ErrorCollector.Report(results.Error)
+		}
 
 		resCh <- results
 	}(ctx, resCh)
@@ -158,9 +162,9 @@ func (ctx *Context) QueryRangeSync(query string, start, end time.Time, step time
 		return nil, err
 	}
 
-	results, err := NewQueryResults(query, raw)
-	if err != nil {
-		return nil, err
+	results := NewQueryResults(query, raw)
+	if results.Error != nil {
+		return nil, results.Error
 	}
 
 	return results.Results, nil

+ 31 - 18
pkg/prom/result.go

@@ -63,32 +63,40 @@ type QueryResult struct {
 
 // NewQueryResults accepts the raw prometheus query result and returns an array of
 // QueryResult objects
-func NewQueryResults(query string, queryResult interface{}) (*QueryResults, error) {
+func NewQueryResults(query string, queryResult interface{}) *QueryResults {
+	qrs := &QueryResults{Query: query}
+
 	if queryResult == nil {
-		return nil, QueryResultNilErr
+		qrs.Error = QueryResultNilErr
+		return qrs
 	}
 
 	data, ok := queryResult.(map[string]interface{})["data"]
 	if !ok {
 		e, err := wrapPrometheusError(queryResult)
 		if err != nil {
-			return nil, err
+			qrs.Error = err
+			return qrs
 		}
-		return nil, fmt.Errorf(e)
+		qrs.Error = fmt.Errorf(e)
+		return qrs
 	}
 
 	// Deep Check for proper formatting
 	d, ok := data.(map[string]interface{})
 	if !ok {
-		return nil, DataFieldFormatErr
+		qrs.Error = DataFieldFormatErr
+		return qrs
 	}
 	resultData, ok := d["result"]
 	if !ok {
-		return nil, ResultFieldDoesNotExistErr
+		qrs.Error = ResultFieldDoesNotExistErr
+		return qrs
 	}
 	resultsData, ok := resultData.([]interface{})
 	if !ok {
-		return nil, ResultFieldFormatErr
+		qrs.Error = ResultFieldFormatErr
+		return qrs
 	}
 
 	// Result vectors from the query
@@ -98,16 +106,19 @@ func NewQueryResults(query string, queryResult interface{}) (*QueryResults, erro
 	for _, val := range resultsData {
 		resultInterface, ok := val.(map[string]interface{})
 		if !ok {
-			return nil, ResultFormatErr
+			qrs.Error = ResultFormatErr
+			return qrs
 		}
 
 		metricInterface, ok := resultInterface["metric"]
 		if !ok {
-			return nil, MetricFieldDoesNotExistErr
+			qrs.Error = MetricFieldDoesNotExistErr
+			return qrs
 		}
 		metricMap, ok := metricInterface.(map[string]interface{})
 		if !ok {
-			return nil, MetricFieldFormatErr
+			qrs.Error = MetricFieldFormatErr
+			return qrs
 		}
 
 		// Define label string for values to ensure that we only run labelsForMetric once
@@ -121,13 +132,15 @@ func NewQueryResults(query string, queryResult interface{}) (*QueryResults, erro
 		if !isRange {
 			dataPoint, ok := resultInterface["value"]
 			if !ok {
-				return nil, ValueFieldDoesNotExistErr
+				qrs.Error = ValueFieldDoesNotExistErr
+				return qrs
 			}
 
 			// Append new data point, log warnings
 			v, warn, err := parseDataPoint(dataPoint)
 			if err != nil {
-				return nil, err
+				qrs.Error = err
+				return qrs
 			}
 			if warn != nil {
 				log.Warningf("%s\nQuery: %s\nLabels: %s", warn.Message(), query, labelsForMetric(metricMap))
@@ -137,14 +150,16 @@ func NewQueryResults(query string, queryResult interface{}) (*QueryResults, erro
 		} else {
 			values, ok := resultInterface["values"].([]interface{})
 			if !ok {
-				return nil, fmt.Errorf("Values field is improperly formatted")
+				qrs.Error = fmt.Errorf("Values field is improperly formatted")
+				return qrs
 			}
 
 			// Append new data points, log warnings
 			for _, value := range values {
 				v, warn, err := parseDataPoint(value)
 				if err != nil {
-					return nil, err
+					qrs.Error = err
+					return qrs
 				}
 				if warn != nil {
 					if labelString == "" {
@@ -163,10 +178,8 @@ func NewQueryResults(query string, queryResult interface{}) (*QueryResults, erro
 		})
 	}
 
-	return &QueryResults{
-		Query:   query,
-		Results: results,
-	}, nil
+	qrs.Results = results
+	return qrs
 }
 
 // GetString returns the requested field, or an error if it does not exist