Ver Fonte

Merge pull request #534 from kubecost/niko/assets

ClusterDisks, ClusterNodes: updates to support Assets ETL
Niko Kovacevic há 5 anos atrás
pai
commit
fa7ac1a6d6
3 ficheiros alterados com 295 adições e 40 exclusões
  1. 3 1
      pkg/cloud/awsprovider.go
  2. 3 1
      pkg/cloud/gcpprovider.go
  3. 289 38
      pkg/costmodel/cluster.go

+ 3 - 1
pkg/cloud/awsprovider.go

@@ -2419,7 +2419,9 @@ func (aws *AWS) ParseID(id string) string {
 	rx := regexp.MustCompile("aws://[^/]*/[^/]*/([^/]+)")
 	match := rx.FindStringSubmatch(id)
 	if len(match) < 2 {
-		log.Infof("awsprovider.ParseID: failed to parse %s", id)
+		if id != "" {
+			log.Infof("awsprovider.ParseID: failed to parse %s", id)
+		}
 		return id
 	}
 

+ 3 - 1
pkg/cloud/gcpprovider.go

@@ -1431,7 +1431,9 @@ func (gcp *GCP) ParseID(id string) string {
 	rx := regexp.MustCompile("gce://[^/]*/[^/]*/([^/]+)")
 	match := rx.FindStringSubmatch(id)
 	if len(match) < 2 {
-		log.Infof("gcpprovider.ParseID: failed to parse %s", id)
+		if id != "" {
+			log.Infof("gcpprovider.ParseID: failed to parse %s", id)
+		}
 		return id
 	}
 

+ 289 - 38
pkg/costmodel/cluster.go

@@ -111,6 +111,9 @@ type Disk struct {
 	Cost       float64
 	Bytes      float64
 	Local      bool
+	Start      time.Time
+	Minutes    float64
+	Breakdown  *ClusterCostsBreakdown
 }
 
 func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, offset time.Duration) (map[string]*Disk, []error) {
@@ -136,18 +139,28 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, o
 	ctx := prom.NewContext(client)
 	queryPVCost := fmt.Sprintf(`sum_over_time((avg(kube_persistentvolume_capacity_bytes) by (cluster_id, persistentvolume) * avg(pv_hourly_cost) by (cluster_id, persistentvolume))[%s:%dm]%s)/1024/1024/1024 * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative)
 	queryPVSize := fmt.Sprintf(`avg_over_time(kube_persistentvolume_capacity_bytes[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
+	queryActiveMins := fmt.Sprintf(`count(pv_hourly_cost) by (cluster_id, persistentvolume)[%s:%dm]%s`, durationStr, minsPerResolution, offsetStr)
+
 	queryLocalStorageCost := fmt.Sprintf(`sum_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/"}) by (instance, cluster_id)[%s:%dm]%s) / 1024 / 1024 / 1024 * %f * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative, costPerGBHr)
+	queryLocalStorageUsedCost := fmt.Sprintf(`sum_over_time(sum(container_fs_usage_bytes{device!="tmpfs", id="/"}) by (instance, cluster_id)[%s:%dm]%s) / 1024 / 1024 / 1024 * %f * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative, costPerGBHr)
 	queryLocalStorageBytes := fmt.Sprintf(`avg_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/"}) by (instance, cluster_id)[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
+	queryLocalActiveMins := fmt.Sprintf(`count(node_total_hourly_cost) by (cluster_id, node)[%s:%dm]%s`, durationStr, minsPerResolution, offsetStr)
 
 	resChPVCost := ctx.Query(queryPVCost)
 	resChPVSize := ctx.Query(queryPVSize)
+	resChActiveMins := ctx.Query(queryActiveMins)
 	resChLocalStorageCost := ctx.Query(queryLocalStorageCost)
+	resChLocalStorageUsedCost := ctx.Query(queryLocalStorageUsedCost)
 	resChLocalStorageBytes := ctx.Query(queryLocalStorageBytes)
+	resChLocalActiveMins := ctx.Query(queryLocalActiveMins)
 
 	resPVCost, _ := resChPVCost.Await()
 	resPVSize, _ := resChPVSize.Await()
+	resActiveMins, _ := resChActiveMins.Await()
 	resLocalStorageCost, _ := resChLocalStorageCost.Await()
+	resLocalStorageUsedCost, _ := resChLocalStorageUsedCost.Await()
 	resLocalStorageBytes, _ := resChLocalStorageBytes.Await()
+	resLocalActiveMins, _ := resChLocalActiveMins.Await()
 	if ctx.ErrorCollector.IsError() {
 		return nil, ctx.Errors()
 	}
@@ -172,8 +185,9 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, o
 		key := fmt.Sprintf("%s/%s", cluster, name)
 		if _, ok := diskMap[key]; !ok {
 			diskMap[key] = &Disk{
-				Cluster: cluster,
-				Name:    name,
+				Cluster:   cluster,
+				Name:      name,
+				Breakdown: &ClusterCostsBreakdown{},
 			}
 		}
 		diskMap[key].Cost += cost
@@ -197,8 +211,9 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, o
 		key := fmt.Sprintf("%s/%s", cluster, name)
 		if _, ok := diskMap[key]; !ok {
 			diskMap[key] = &Disk{
-				Cluster: cluster,
-				Name:    name,
+				Cluster:   cluster,
+				Name:      name,
+				Breakdown: &ClusterCostsBreakdown{},
 			}
 		}
 		diskMap[key].Bytes = bytes
@@ -216,20 +231,44 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, o
 			continue
 		}
 
-		// TODO niko/assets storage class?
-
 		cost := result.Values[0].Value
 		key := fmt.Sprintf("%s/%s", cluster, name)
 		if _, ok := diskMap[key]; !ok {
 			diskMap[key] = &Disk{
-				Cluster: cluster,
-				Name:    name,
-				Local:   true,
+				Cluster:   cluster,
+				Name:      name,
+				Breakdown: &ClusterCostsBreakdown{},
+				Local:     true,
 			}
 		}
 		diskMap[key].Cost += cost
 	}
 
+	for _, result := range resLocalStorageUsedCost {
+		cluster, err := result.GetString("cluster_id")
+		if err != nil {
+			cluster = env.GetClusterID()
+		}
+
+		name, err := result.GetString("instance")
+		if err != nil {
+			log.Warningf("ClusterDisks: local storage usage data missing instance")
+			continue
+		}
+
+		cost := result.Values[0].Value
+		key := fmt.Sprintf("%s/%s", cluster, name)
+		if _, ok := diskMap[key]; !ok {
+			diskMap[key] = &Disk{
+				Cluster:   cluster,
+				Name:      name,
+				Breakdown: &ClusterCostsBreakdown{},
+				Local:     true,
+			}
+		}
+		diskMap[key].Breakdown.System = cost / diskMap[key].Cost
+	}
+
 	for _, result := range resLocalStorageBytes {
 		cluster, err := result.GetString("cluster_id")
 		if err != nil {
@@ -242,8 +281,6 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, o
 			continue
 		}
 
-		// TODO niko/assets storage class
-
 		bytes := result.Values[0].Value
 		key := fmt.Sprintf("%s/%s", cluster, name)
 		if _, ok := diskMap[key]; !ok {
@@ -256,21 +293,94 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, o
 		diskMap[key].Bytes = bytes
 	}
 
+	for _, result := range resActiveMins {
+		cluster, err := result.GetString("cluster_id")
+		if err != nil {
+			cluster = env.GetClusterID()
+		}
+
+		name, err := result.GetString("persistentvolume")
+		if err != nil {
+			log.Warningf("ClusterDisks: active mins missing instance")
+			continue
+		}
+
+		key := fmt.Sprintf("%s/%s", cluster, name)
+		if _, ok := diskMap[key]; !ok {
+			log.Warningf("ClusterDisks: active mins for unidentified disk")
+			continue
+		}
+
+		if len(result.Values) == 0 {
+			continue
+		}
+
+		s := time.Unix(int64(result.Values[0].Timestamp), 0)
+		e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
+		mins := e.Sub(s).Minutes()
+
+		// TODO niko/assets if mins >= threshold, interpolate for missing data?
+
+		diskMap[key].Start = s
+		diskMap[key].Minutes = mins
+	}
+
+	for _, result := range resLocalActiveMins {
+		cluster, err := result.GetString("cluster_id")
+		if err != nil {
+			cluster = env.GetClusterID()
+		}
+
+		name, err := result.GetString("node")
+		if err != nil {
+			log.Warningf("ClusterDisks: local active mins data missing instance")
+			continue
+		}
+
+		key := fmt.Sprintf("%s/%s", cluster, name)
+		if _, ok := diskMap[key]; !ok {
+			log.Warningf("ClusterDisks: local active mins for unidentified disk")
+			continue
+		}
+
+		if len(result.Values) == 0 {
+			continue
+		}
+
+		s := time.Unix(int64(result.Values[0].Timestamp), 0)
+		e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
+		mins := e.Sub(s).Minutes()
+
+		// TODO niko/assets if mins >= threshold, interpolate for missing data?
+
+		diskMap[key].Start = s
+		diskMap[key].Minutes = mins
+	}
+
+	for _, disk := range diskMap {
+		// Apply all remaining RAM to Idle
+		disk.Breakdown.Idle = 1.0 - (disk.Breakdown.System + disk.Breakdown.Other + disk.Breakdown.User)
+	}
+
 	return diskMap, nil
 }
 
 type Node struct {
-	Cluster     string
-	Name        string
-	ProviderID  string
-	NodeType    string
-	CPUCost     float64
-	CPUCores    float64
-	GPUCost     float64
-	RAMCost     float64
-	RAMBytes    float64
-	Discount    float64
-	Preemptible bool
+	Cluster      string
+	Name         string
+	ProviderID   string
+	NodeType     string
+	CPUCost      float64
+	CPUCores     float64
+	GPUCost      float64
+	RAMCost      float64
+	RAMBytes     float64
+	Discount     float64
+	Preemptible  bool
+	CPUBreakdown *ClusterCostsBreakdown
+	RAMBreakdown *ClusterCostsBreakdown
+	Start        time.Time
+	Minutes      float64
 }
 
 func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset time.Duration) (map[string]*Node, []error) {
@@ -297,6 +407,10 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 	queryNodeRAMBytes := fmt.Sprintf(`avg_over_time(avg(kube_node_status_capacity_memory_bytes) by (cluster_id, node)[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
 	queryNodeGPUCost := fmt.Sprintf(`sum_over_time((avg(node_gpu_hourly_cost) by (cluster_id, node, provider_id))[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
 	queryNodeLabels := fmt.Sprintf(`avg_over_time(kubecost_node_is_spot[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
+	queryNodeCPUModePct := fmt.Sprintf(`sum(rate(node_cpu_seconds_total[%s:%dm]%s)) by (kubernetes_node, cluster_id, mode) / ignoring(mode) group_left sum(rate(node_cpu_seconds_total[%s:%dm]%s)) by (kubernetes_node, cluster_id)`, durationStr, minsPerResolution, offsetStr, durationStr, minsPerResolution, offsetStr)
+	queryNodeRAMSystemPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace="kube-system"}[%s:%dm]%s)) by (instance, cluster_id) / sum(sum_over_time(label_replace(kube_node_status_capacity_memory_bytes, "instance", "$1", "node", "(.*)")[%s:%dm]%s)) by (instance, cluster_id)`, durationStr, minsPerResolution, offsetStr, durationStr, minsPerResolution, offsetStr)
+	queryNodeRAMUserPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace!="kube-system"}[%s:%dm]%s)) by (instance, cluster_id) / sum(sum_over_time(label_replace(kube_node_status_capacity_memory_bytes, "instance", "$1", "node", "(.*)")[%s:%dm]%s)) by (instance, cluster_id)`, durationStr, minsPerResolution, offsetStr, durationStr, minsPerResolution, offsetStr)
+	queryActiveMins := fmt.Sprintf(`count(node_total_hourly_cost) by (cluster_id, node)[%s:%dm]%s`, durationStr, minsPerResolution, offsetStr)
 
 	resChNodeCPUCost := ctx.Query(queryNodeCPUCost)
 	resChNodeCPUCores := ctx.Query(queryNodeCPUCores)
@@ -304,6 +418,10 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 	resChNodeRAMBytes := ctx.Query(queryNodeRAMBytes)
 	resChNodeGPUCost := ctx.Query(queryNodeGPUCost)
 	resChNodeLabels := ctx.Query(queryNodeLabels)
+	resChNodeCPUModePct := ctx.Query(queryNodeCPUModePct)
+	resChNodeRAMSystemPct := ctx.Query(queryNodeRAMSystemPct)
+	resChNodeRAMUserPct := ctx.Query(queryNodeRAMUserPct)
+	resChActiveMins := ctx.Query(queryActiveMins)
 
 	resNodeCPUCost, _ := resChNodeCPUCost.Await()
 	resNodeCPUCores, _ := resChNodeCPUCores.Await()
@@ -311,6 +429,10 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 	resNodeRAMCost, _ := resChNodeRAMCost.Await()
 	resNodeRAMBytes, _ := resChNodeRAMBytes.Await()
 	resNodeLabels, _ := resChNodeLabels.Await()
+	resNodeCPUModePct, _ := resChNodeCPUModePct.Await()
+	resNodeRAMSystemPct, _ := resChNodeRAMSystemPct.Await()
+	resNodeRAMUserPct, _ := resChNodeRAMUserPct.Await()
+	resActiveMins, _ := resChActiveMins.Await()
 	if ctx.ErrorCollector.IsError() {
 		return nil, ctx.Errors()
 	}
@@ -337,10 +459,12 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 		key := fmt.Sprintf("%s/%s", cluster, name)
 		if _, ok := nodeMap[key]; !ok {
 			nodeMap[key] = &Node{
-				Cluster:    cluster,
-				Name:       name,
-				NodeType:   nodeType,
-				ProviderID: cp.ParseID(providerID),
+				Cluster:      cluster,
+				Name:         name,
+				NodeType:     nodeType,
+				ProviderID:   cp.ParseID(providerID),
+				CPUBreakdown: &ClusterCostsBreakdown{},
+				RAMBreakdown: &ClusterCostsBreakdown{},
 			}
 		}
 		nodeMap[key].CPUCost += cpuCost
@@ -368,8 +492,10 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 		key := fmt.Sprintf("%s/%s", cluster, name)
 		if _, ok := nodeMap[key]; !ok {
 			nodeMap[key] = &Node{
-				Cluster: cluster,
-				Name:    name,
+				Cluster:      cluster,
+				Name:         name,
+				CPUBreakdown: &ClusterCostsBreakdown{},
+				RAMBreakdown: &ClusterCostsBreakdown{},
 			}
 		}
 		node := nodeMap[key]
@@ -405,10 +531,12 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 		key := fmt.Sprintf("%s/%s", cluster, name)
 		if _, ok := nodeMap[key]; !ok {
 			nodeMap[key] = &Node{
-				Cluster:    cluster,
-				Name:       name,
-				NodeType:   nodeType,
-				ProviderID: cp.ParseID(providerID),
+				Cluster:      cluster,
+				Name:         name,
+				NodeType:     nodeType,
+				ProviderID:   cp.ParseID(providerID),
+				CPUBreakdown: &ClusterCostsBreakdown{},
+				RAMBreakdown: &ClusterCostsBreakdown{},
 			}
 		}
 		nodeMap[key].RAMCost += ramCost
@@ -432,8 +560,10 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 		key := fmt.Sprintf("%s/%s", cluster, name)
 		if _, ok := nodeMap[key]; !ok {
 			nodeMap[key] = &Node{
-				Cluster: cluster,
-				Name:    name,
+				Cluster:      cluster,
+				Name:         name,
+				CPUBreakdown: &ClusterCostsBreakdown{},
+				RAMBreakdown: &ClusterCostsBreakdown{},
 			}
 		}
 		nodeMap[key].RAMBytes = ramBytes
@@ -459,15 +589,133 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 		key := fmt.Sprintf("%s/%s", cluster, name)
 		if _, ok := nodeMap[key]; !ok {
 			nodeMap[key] = &Node{
-				Cluster:    cluster,
-				Name:       name,
-				NodeType:   nodeType,
-				ProviderID: cp.ParseID(providerID),
+				Cluster:      cluster,
+				Name:         name,
+				NodeType:     nodeType,
+				ProviderID:   cp.ParseID(providerID),
+				CPUBreakdown: &ClusterCostsBreakdown{},
+				RAMBreakdown: &ClusterCostsBreakdown{},
 			}
 		}
 		nodeMap[key].GPUCost += gpuCost
 	}
 
+	for _, result := range resNodeCPUModePct {
+		cluster, err := result.GetString("cluster_id")
+		if err != nil {
+			cluster = env.GetClusterID()
+		}
+
+		name, err := result.GetString("kubernetes_node")
+		if err != nil {
+			log.Warningf("ClusterNodes: CPU mode data missing node")
+			continue
+		}
+
+		mode, err := result.GetString("mode")
+		if err != nil {
+			log.Warningf("ClusterNodes: unable to read CPU mode: %s", err)
+			mode = "other"
+		}
+
+		pct := result.Values[0].Value
+
+		key := fmt.Sprintf("%s/%s", cluster, name)
+		if _, ok := nodeMap[key]; !ok {
+			log.Warningf("ClusterNodes: CPU mode data for unidentified node")
+			continue
+		}
+
+		switch mode {
+		case "idle":
+			nodeMap[key].CPUBreakdown.Idle += pct
+		case "system":
+			nodeMap[key].CPUBreakdown.System += pct
+		case "user":
+			nodeMap[key].CPUBreakdown.User += pct
+		default:
+			nodeMap[key].CPUBreakdown.Other += pct
+		}
+	}
+
+	for _, result := range resNodeRAMSystemPct {
+		cluster, err := result.GetString("cluster_id")
+		if err != nil {
+			cluster = env.GetClusterID()
+		}
+
+		name, err := result.GetString("instance")
+		if err != nil {
+			log.Warningf("ClusterNodes: RAM system percent missing node")
+			continue
+		}
+
+		pct := result.Values[0].Value
+
+		key := fmt.Sprintf("%s/%s", cluster, name)
+		if _, ok := nodeMap[key]; !ok {
+			log.Warningf("ClusterNodes: RAM system percent for unidentified node")
+			continue
+		}
+
+		nodeMap[key].RAMBreakdown.System += pct
+	}
+
+	for _, result := range resNodeRAMUserPct {
+		cluster, err := result.GetString("cluster_id")
+		if err != nil {
+			cluster = env.GetClusterID()
+		}
+
+		name, err := result.GetString("instance")
+		if err != nil {
+			log.Warningf("ClusterNodes: RAM system percent missing node")
+			continue
+		}
+
+		pct := result.Values[0].Value
+
+		key := fmt.Sprintf("%s/%s", cluster, name)
+		if _, ok := nodeMap[key]; !ok {
+			log.Warningf("ClusterNodes: RAM system percent for unidentified node")
+			continue
+		}
+
+		nodeMap[key].RAMBreakdown.User += pct
+	}
+
+	for _, result := range resActiveMins {
+		cluster, err := result.GetString("cluster_id")
+		if err != nil {
+			cluster = env.GetClusterID()
+		}
+
+		name, err := result.GetString("node")
+		if err != nil {
+			log.Warningf("ClusterNodes: active mins missing node")
+			continue
+		}
+
+		key := fmt.Sprintf("%s/%s", cluster, name)
+		if _, ok := nodeMap[key]; !ok {
+			log.Warningf("ClusterNodes: active mins for unidentified node")
+			continue
+		}
+
+		if len(result.Values) == 0 {
+			continue
+		}
+
+		s := time.Unix(int64(result.Values[0].Timestamp), 0)
+		e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
+		mins := e.Sub(s).Minutes()
+
+		// TODO niko/assets if mins >= threshold, interpolate for missing data?
+
+		nodeMap[key].Start = s
+		nodeMap[key].Minutes = mins
+	}
+
 	// Determine preemptibility with node labels
 	for _, result := range resNodeLabels {
 		nodeName, err := result.GetString("node")
@@ -510,6 +758,9 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 	for _, node := range nodeMap {
 		// TODO take RI into account
 		node.Discount = cp.CombinedDiscountForNode(node.NodeType, node.Preemptible, discount, negotiatedDiscount)
+
+		// Apply all remaining RAM to Idle
+		node.RAMBreakdown.Idle = 1.0 - (node.RAMBreakdown.System + node.RAMBreakdown.Other + node.RAMBreakdown.User)
 	}
 
 	return nodeMap, nil