Browse Source

Merge pull request #610 from kubecost/AjayTripathy-fix-minsq

Smooth the active-minutes query across kubecost restarts
Ajay Tripathy 5 years ago
parent
commit
b9a7ff73c7
4 changed files with 17 additions and 14 deletions
  1. 1 1
      pkg/costmodel/cluster.go
  2. 14 11
      pkg/costmodel/costmodel.go
  3. 1 1
      pkg/costmodel/promparsers.go
  4. 1 1
      pkg/costmodel/router.go

+ 1 - 1
pkg/costmodel/cluster.go

@@ -429,7 +429,7 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 	queryNodeCPUModeTotal := fmt.Sprintf(`sum(rate(node_cpu_seconds_total[%s:%dm]%s)) by (kubernetes_node, cluster_id, mode)`, durationStr, minsPerResolution, offsetStr)
 	queryNodeCPUModeTotal := fmt.Sprintf(`sum(rate(node_cpu_seconds_total[%s:%dm]%s)) by (kubernetes_node, cluster_id, mode)`, durationStr, minsPerResolution, offsetStr)
 	queryNodeRAMSystemPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace="kube-system"}[%s:%dm]%s)) by (instance, cluster_id) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (node, cluster_id), "instance", "$1", "node", "(.*)")) by (instance, cluster_id)`, durationStr, minsPerResolution, offsetStr, durationStr, minsPerResolution, offsetStr)
 	queryNodeRAMSystemPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace="kube-system"}[%s:%dm]%s)) by (instance, cluster_id) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (node, cluster_id), "instance", "$1", "node", "(.*)")) by (instance, cluster_id)`, durationStr, minsPerResolution, offsetStr, durationStr, minsPerResolution, offsetStr)
 	queryNodeRAMUserPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace!="kube-system"}[%s:%dm]%s)) by (instance, cluster_id) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (node, cluster_id), "instance", "$1", "node", "(.*)")) by (instance, cluster_id)`, durationStr, minsPerResolution, offsetStr, durationStr, minsPerResolution, offsetStr)
 	queryNodeRAMUserPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace!="kube-system"}[%s:%dm]%s)) by (instance, cluster_id) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (node, cluster_id), "instance", "$1", "node", "(.*)")) by (instance, cluster_id)`, durationStr, minsPerResolution, offsetStr, durationStr, minsPerResolution, offsetStr)
-	queryActiveMins := fmt.Sprintf(`node_total_hourly_cost[%s:%dm]%s`, durationStr, minsPerResolution, offsetStr)
+	queryActiveMins := fmt.Sprintf(`avg(node_total_hourly_cost) by (node,cluster_id)[%s:%dm]%s`, durationStr, minsPerResolution, offsetStr)
 
 
 	// Return errors if these fail
 	// Return errors if these fail
 	resChNodeCPUCost := requiredCtx.Query(queryNodeCPUCost)
 	resChNodeCPUCost := requiredCtx.Query(queryNodeCPUCost)

+ 14 - 11
pkg/costmodel/costmodel.go

@@ -609,7 +609,7 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 		}
 		}
 	}
 	}
 
 
-	err = findDeletedNodeInfo(cli, missingNodes, window)
+	err = findDeletedNodeInfo(cli, missingNodes, window, "")
 	if err != nil {
 	if err != nil {
 		klog.V(1).Infof("Error fetching historical node data: %s", err.Error())
 		klog.V(1).Infof("Error fetching historical node data: %s", err.Error())
 	}
 	}
@@ -698,13 +698,13 @@ func findDeletedPodInfo(cli prometheusClient.Client, missingContainers map[strin
 	return nil
 	return nil
 }
 }
 
 
-func findDeletedNodeInfo(cli prometheusClient.Client, missingNodes map[string]*costAnalyzerCloud.Node, window string) error {
+func findDeletedNodeInfo(cli prometheusClient.Client, missingNodes map[string]*costAnalyzerCloud.Node, window, offset string) error {
 	if len(missingNodes) > 0 {
 	if len(missingNodes) > 0 {
 		defer measureTime(time.Now(), profileThreshold, "Finding Deleted Node Info")
 		defer measureTime(time.Now(), profileThreshold, "Finding Deleted Node Info")
 
 
-		queryHistoricalCPUCost := fmt.Sprintf(`avg_over_time(node_cpu_hourly_cost[%s])`, window)
-		queryHistoricalRAMCost := fmt.Sprintf(`avg_over_time(node_ram_hourly_cost[%s])`, window)
-		queryHistoricalGPUCost := fmt.Sprintf(`avg_over_time(node_gpu_hourly_cost[%s])`, window)
+		queryHistoricalCPUCost := fmt.Sprintf(`avg(avg_over_time(node_cpu_hourly_cost[%s] offset %s)) by (node, instance, cluster_id)`, window, offset)
+		queryHistoricalRAMCost := fmt.Sprintf(`avg(avg_over_time(node_ram_hourly_cost[%s] offset %s)) by (node, instance, cluster_id)`, window, offset)
+		queryHistoricalGPUCost := fmt.Sprintf(`avg(avg_over_time(node_gpu_hourly_cost[%s] offset %s)) by (node, instance, cluster_id)`, window, offset)
 
 
 		ctx := prom.NewContext(cli)
 		ctx := prom.NewContext(cli)
 		cpuCostResCh := ctx.Query(queryHistoricalCPUCost)
 		cpuCostResCh := ctx.Query(queryHistoricalCPUCost)
@@ -738,6 +738,8 @@ func findDeletedNodeInfo(cli prometheusClient.Client, missingNodes map[string]*c
 		for node, costv := range cpuCosts {
 		for node, costv := range cpuCosts {
 			if _, ok := missingNodes[node]; ok {
 			if _, ok := missingNodes[node]; ok {
 				missingNodes[node].VCPUCost = fmt.Sprintf("%f", costv[0].Value)
 				missingNodes[node].VCPUCost = fmt.Sprintf("%f", costv[0].Value)
+			} else {
+				log.DedupedWarningf(5, "Node `%s` in prometheus but not k8s api", node)
 			}
 			}
 		}
 		}
 		for node, costv := range ramCosts {
 		for node, costv := range ramCosts {
@@ -1443,9 +1445,10 @@ func requestKeyFor(startString string, endString string, windowString string, fi
 	return fmt.Sprintf("%s,%s,%s,%s,%s,%t", startKey, endKey, windowString, filterNamespace, filterCluster, remoteEnabled)
 	return fmt.Sprintf("%s,%s,%s,%s,%s,%t", startKey, endKey, windowString, filterNamespace, filterCluster, remoteEnabled)
 }
 }
 
 
-// Executes a range query for cost data
+// ComputeCostDataRange executes a range query for cost data.
+// Note that "offset" represents the time between the function call and "endString"; it is passed separately for convenience.
 func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset kubernetes.Interface, cp costAnalyzerCloud.Provider,
 func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset kubernetes.Interface, cp costAnalyzerCloud.Provider,
-	startString, endString, windowString string, resolutionHours float64, filterNamespace string, filterCluster string, remoteEnabled bool) (map[string]*CostData, error) {
+	startString, endString, windowString string, resolutionHours float64, filterNamespace string, filterCluster string, remoteEnabled bool, offset string) (map[string]*CostData, error) {
 	// Create a request key for request grouping. This key will be used to represent the cost-model result
 	// Create a request key for request grouping. This key will be used to represent the cost-model result
 	// for the specific inputs to prevent multiple queries for identical data.
 	// for the specific inputs to prevent multiple queries for identical data.
 	key := requestKeyFor(startString, endString, windowString, filterNamespace, filterCluster, remoteEnabled)
 	key := requestKeyFor(startString, endString, windowString, filterNamespace, filterCluster, remoteEnabled)
@@ -1455,7 +1458,7 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 	// If there is already a request out that uses the same data, wait for it to return to share the results.
 	// If there is already a request out that uses the same data, wait for it to return to share the results.
 	// Otherwise, start executing.
 	// Otherwise, start executing.
 	result, err, _ := cm.RequestGroup.Do(key, func() (interface{}, error) {
 	result, err, _ := cm.RequestGroup.Do(key, func() (interface{}, error) {
-		return cm.costDataRange(cli, clientset, cp, startString, endString, windowString, resolutionHours, filterNamespace, filterCluster, remoteEnabled)
+		return cm.costDataRange(cli, clientset, cp, startString, endString, windowString, resolutionHours, filterNamespace, filterCluster, remoteEnabled, offset)
 	})
 	})
 
 
 	data, ok := result.(map[string]*CostData)
 	data, ok := result.(map[string]*CostData)
@@ -1466,7 +1469,7 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 	return data, err
 	return data, err
 }
 }
 
 
-func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubernetes.Interface, cp costAnalyzerCloud.Provider, startString, endString, windowString string, resolutionHours float64, filterNamespace string, filterCluster string, remoteEnabled bool) (map[string]*CostData, error) {
+func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubernetes.Interface, cp costAnalyzerCloud.Provider, startString, endString, windowString string, resolutionHours float64, filterNamespace string, filterCluster string, remoteEnabled bool, offset string) (map[string]*CostData, error) {
 	layout := "2006-01-02T15:04:05.000Z"
 	layout := "2006-01-02T15:04:05.000Z"
 
 
 	start, err := time.Parse(layout, startString)
 	start, err := time.Parse(layout, startString)
@@ -1848,7 +1851,7 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 
 
 		namespaceLabels, ok := namespaceLabelsMapping[nsKey]
 		namespaceLabels, ok := namespaceLabelsMapping[nsKey]
 		if !ok {
 		if !ok {
-			klog.V(3).Infof("Missing data for namespace %s", c.Namespace)
+			klog.V(4).Infof("Missing data for namespace %s", c.Namespace)
 		}
 		}
 
 
 		pLabels := podLabels[podKey]
 		pLabels := podLabels[podKey]
@@ -1982,7 +1985,7 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 	w += window
 	w += window
 	if w.Minutes() > 0 {
 	if w.Minutes() > 0 {
 		wStr := fmt.Sprintf("%dm", int(w.Minutes()))
 		wStr := fmt.Sprintf("%dm", int(w.Minutes()))
-		err = findDeletedNodeInfo(cli, missingNodes, wStr)
+		err = findDeletedNodeInfo(cli, missingNodes, wStr, offset)
 		if err != nil {
 		if err != nil {
 			klog.V(1).Infof("Error fetching historical node data: %s", err.Error())
 			klog.V(1).Infof("Error fetching historical node data: %s", err.Error())
 		}
 		}

+ 1 - 1
pkg/costmodel/promparsers.go

@@ -399,7 +399,7 @@ func getCost(qrs []*prom.QueryResult) (map[string][]*util.Vector, error) {
 	toReturn := make(map[string][]*util.Vector)
 	toReturn := make(map[string][]*util.Vector)
 
 
 	for _, val := range qrs {
 	for _, val := range qrs {
-		instance, err := val.GetString("instance")
+		instance, err := val.GetString("node")
 		if err != nil {
 		if err != nil {
 			return toReturn, err
 			return toReturn, err
 		}
 		}

+ 1 - 1
pkg/costmodel/router.go

@@ -386,7 +386,7 @@ func (a *Accesses) CostDataModelRange(w http.ResponseWriter, r *http.Request, ps
 	}
 	}
 
 
 	resolutionHours := 1.0
 	resolutionHours := 1.0
-	data, err := a.Model.ComputeCostDataRange(pClient, a.KubeClientSet, a.Cloud, start, end, window, resolutionHours, namespace, cluster, remoteEnabled)
+	data, err := a.Model.ComputeCostDataRange(pClient, a.KubeClientSet, a.Cloud, start, end, window, resolutionHours, namespace, cluster, remoteEnabled, "")
 	if err != nil {
 	if err != nil {
 		w.Write(WrapData(nil, err))
 		w.Write(WrapData(nil, err))
 	}
 	}