Просмотр исходного кода

Merge pull request #595 from kubecost/AjayTripathy-downsampling-safe-queries

Ajay tripathy downsampling safe queries
Ajay Tripathy 5 лет назад
Родитель
Commit
95fe9cc210
2 изменённых файла: 17 добавлений и 56 удалений
  1. 2 2
      pkg/cloud/awsprovider.go
  2. 15 54
      pkg/costmodel/costmodel.go

+ 2 - 2
pkg/cloud/awsprovider.go

@@ -1881,8 +1881,8 @@ func (a *AWS) ExternalAllocations(start string, end string, aggregators []string
 	page := 0
 	processResults := func(op *athena.GetQueryResultsOutput, lastpage bool) bool {
 		iter := op.ResultSet.Rows
-		if page == 0 && len(iter) > 1 {
-			iter = op.ResultSet.Rows[1:(len(op.ResultSet.Rows) - 1)]
+		if page == 0 && len(iter) > 0 {
+			iter = op.ResultSet.Rows[1:len(op.ResultSet.Rows)]
 		}
 		page++
 		for _, r := range iter {

+ 15 - 54
pkg/costmodel/costmodel.go

@@ -181,36 +181,31 @@ const (
 	sum(kube_persistentvolumeclaim_resource_requests_storage_bytes) by (persistentvolumeclaim, namespace, cluster_id, kubernetes_name)) by (persistentvolumeclaim, storageclass, namespace, volumename, cluster_id)`
 	// queryRAMAllocationByteHours yields the total byte-hour RAM allocation over the given
 	// window, aggregated by container.
-	//  [line 3]     sum_over_time(each byte*min in window) / (min/hr kubecost up) = [byte*hour] by metric, adjusted for kubecost downtime
-	//  [lines 2,4]  sum(") by unique container key = [byte*hour] by container
+	//  [line 3]     sum_over_time(each byte) = [byte*scrape] by metric
+	//  [line 4]     scalar(avg(prometheus_target_interval_length_seconds)) = [seconds/scrape]; / 60 / 60 = [hours/scrape]
+	//  [lines 2,4]  sum(") by unique container key, then [byte*scrape] * [hours/scrape] = [byte*hour] by container
 	//  [lines 1,5]  relabeling
 	queryRAMAllocationByteHours = `
 		label_replace(label_replace(
 			sum(
-				sum_over_time(container_memory_allocation_bytes{container!="",container!="POD", node!=""}[%s:1m]) / %f 
-			) by (namespace,container,pod,node,cluster_id)
+				sum_over_time(container_memory_allocation_bytes{container!="",container!="POD", node!=""}[%s])
+			) by (namespace,container,pod,node,cluster_id) * (scalar(avg(prometheus_target_interval_length_seconds)) / 60 / 60)
 		, "container_name","$1","container","(.+)"), "pod_name","$1","pod","(.+)")`
 	// queryCPUAllocationVCPUHours yields the total VCPU-hour CPU allocation over the given
 	// window, aggregated by container.
-	//  [line 3]     sum_over_time(each VCPU*mins in window) / (min/hr kubecost up) = [VCPU*hour] by metric, adjusted for kubecost downtime
-	//  [lines 2,4]  sum(") by unique container key = [VCPU*hour] by container
+	//  [line 3]     sum_over_time(each VCPU) = [VCPU*scrape] by metric
+	//  [line 4]     scalar(avg(prometheus_target_interval_length_seconds)) = [seconds/scrape]; / 60 / 60 = [hours/scrape]
+	//  [lines 2,4]  sum(") by unique container key, then [VCPU*scrape] * [hours/scrape] = [VCPU*hour] by container
 	//  [lines 1,5]  relabeling
 	queryCPUAllocationVCPUHours = `
 		label_replace(label_replace(
 			sum(
-				sum_over_time(container_cpu_allocation{container!="",container!="POD", node!=""}[%s:1m]) / %f
-			) by (namespace,container,pod,node,cluster_id)
+				sum_over_time(container_cpu_allocation{container!="",container!="POD", node!=""}[%s])
+			) by (namespace,container,pod,node,cluster_id) * (scalar(avg(prometheus_target_interval_length_seconds)) / 60 / 60)
 		, "container_name","$1","container","(.+)"), "pod_name","$1","pod","(.+)")`
 	// queryPVCAllocationFmt yields the total byte-hour PVC allocation over the given window.
-	//  sum(all VCPU measurements within given window) = [byte*min] by metric
-	//  (") / 60 = [byte*hour] by metric, assuming no missed scrapes
-	//  (") * (normalization factor) = [byte*hour] by metric, normalized for missed scrapes
-	//  sum(") by unique pvc = [VCPU*hour] by (cluster, namespace, pod, pv, pvc)
-	// Note: normalization factor is 1.0 if no scrapes are missed and has an upper bound determined by minExpectedScrapeRate
-	// so that coarse resolutions don't push normalization factors too high; e.g. 24h resolution with 1h of data would make
-	// for a normalization factor of 24. With a minimumExpectedScrapeRate of 0.95, that caps the norm factor at
-	queryPVCAllocationFmt = `sum(sum_over_time(pod_pvc_allocation[%s:1m])) by (cluster_id, namespace, pod, persistentvolume, persistentvolumeclaim) / 60
-		* 60 / clamp_min(count_over_time(sum(pod_pvc_allocation) by (cluster_id, namespace, pod, persistentvolume, persistentvolumeclaim)[%s:1m])/%f, 60 * %f)`
+	// sum(sum_over_time(each byte)) = [byte*scrape] by (cluster, namespace, pod, pv, pvc); scalar(avg(prometheus_target_interval_length_seconds)) / 60 / 60 = [hours/scrape]; their product = [byte*hour]
+	queryPVCAllocationFmt     = `sum(sum_over_time(pod_pvc_allocation[%s])) by (cluster_id, namespace, pod, persistentvolume, persistentvolumeclaim) * scalar(avg(prometheus_target_interval_length_seconds)/60/60)`
 	queryPVHourlyCostFmt      = `avg_over_time(pv_hourly_cost[%s])`
 	queryNSLabels             = `avg_over_time(kube_namespace_labels[%s])`
 	queryPodLabels            = `avg_over_time(kube_pod_labels[%s])`
@@ -223,7 +218,6 @@ const (
 	queryRegionNetworkUsage   = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="false"}[%s] %s)) by (namespace,pod_name,cluster_id) / 1024 / 1024 / 1024`
 	queryInternetNetworkUsage = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="true"}[%s] %s)) by (namespace,pod_name,cluster_id) / 1024 / 1024 / 1024`
 	normalizationStr          = `max(count_over_time(kube_pod_container_resource_requests_memory_bytes{}[%s] %s))`
-	kubecostUpMinsPerHourStr  = `max(count_over_time(node_cpu_hourly_cost[%s:1m])) / %f`
 )
 
 func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kubernetes.Interface, cp costAnalyzerCloud.Provider, window string, offset string, filterNamespace string) (map[string]*CostData, error) {
@@ -1505,48 +1499,15 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 
 	ctx := prom.NewContext(cli)
 
-	// Query for the average number of minutes per hour that Kubecost was up
-	// in the given range by averaging the number of up minutes-per-hour for
-	// each window in the range. Use that number in the RAM and CPU allocation
-	// queries as the adjutsment factor, scaling only if Kubecost was down
-	// for fewer than 3 minutes (as a heuristic for a reasonable amount of
-	// time to interpolate). Otherwise, use 60 minutes per hour and assume
-	// that this period of time is during Kubecost start-up or a long-term
-	// downtime for which we don't want to interpolate.
-	queryKubecostUpMinsPerHour := fmt.Sprintf(kubecostUpMinsPerHourStr, windowString, window.Hours())
-	resKubecostUp, err := ctx.QueryRangeSync(queryKubecostUpMinsPerHour, start, end, window)
-	if err != nil {
-		log.Errorf("costDataRange: error querying Kubecost up: %s", err)
-		return nil, err
-	}
-
-	kubecostMinsPerHour := 0.0
-	num := 0
-	if len(resKubecostUp) > 0 {
-		for _, val := range resKubecostUp[0].Values {
-			kubecostMinsPerHour += val.Value
-			num++
-		}
-		kubecostMinsPerHour /= float64(num)
-	}
-	if kubecostMinsPerHour <= 57.0 {
-		kubecostMinsPerHour = 60.0
-	}
-
-	// TODO niko/queryfix rewrite PVCAllocation query too, and remove this
-	// Use a heuristic to tell the difference between missed scrapes and an incomplete window
-	// of data due to fresh install, etc.
-	minimumExpectedScrapeRate := 0.95
-
-	queryRAMAlloc := fmt.Sprintf(queryRAMAllocationByteHours, windowString, kubecostMinsPerHour)
-	queryCPUAlloc := fmt.Sprintf(queryCPUAllocationVCPUHours, windowString, kubecostMinsPerHour)
+	queryRAMAlloc := fmt.Sprintf(queryRAMAllocationByteHours, windowString)
+	queryCPUAlloc := fmt.Sprintf(queryCPUAllocationVCPUHours, windowString)
 	queryRAMRequests := fmt.Sprintf(queryRAMRequestsStr, windowString, "", windowString, "")
 	queryRAMUsage := fmt.Sprintf(queryRAMUsageStr, windowString, "", windowString, "")
 	queryCPURequests := fmt.Sprintf(queryCPURequestsStr, windowString, "", windowString, "")
 	queryCPUUsage := fmt.Sprintf(queryCPUUsageStr, windowString, "")
 	queryGPURequests := fmt.Sprintf(queryGPURequestsStr, windowString, "", windowString, "", resolutionHours, windowString, "")
 	queryPVRequests := fmt.Sprintf(queryPVRequestsStr)
-	queryPVCAllocation := fmt.Sprintf(queryPVCAllocationFmt, windowString, windowString, resolutionHours, minimumExpectedScrapeRate)
+	queryPVCAllocation := fmt.Sprintf(queryPVCAllocationFmt, windowString)
 	queryPVHourlyCost := fmt.Sprintf(queryPVHourlyCostFmt, windowString)
 	queryNetZoneRequests := fmt.Sprintf(queryZoneNetworkUsage, windowString, "")
 	queryNetRegionRequests := fmt.Sprintf(queryRegionNetworkUsage, windowString, "")