Przeglądaj źródła

Merge pull request #498 from kubecost/niko/assets

WIP ClusterDisks and ClusterNodes
Niko Kovacevic 5 lat temu
rodzic
commit
6da0d345c4
1 zmienionych plików z 392 dodań i 0 usunięć
  1. 392 0
      pkg/costmodel/cluster.go

+ 392 - 0
pkg/costmodel/cluster.go

@@ -6,6 +6,7 @@ import (
 
 	"github.com/kubecost/cost-model/pkg/cloud"
 	"github.com/kubecost/cost-model/pkg/env"
+	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/prom"
 	"github.com/kubecost/cost-model/pkg/util"
 
@@ -103,6 +104,397 @@ func NewClusterCostsFromCumulative(cpu, gpu, ram, storage float64, window, offse
 	return cc, nil
 }
 
+// Disk is the per-disk record produced by ClusterDisks. It describes either a
+// persistent volume or a node's local storage, identified by cluster and name.
+type Disk struct {
+	Cluster    string  // cluster_id label, or the configured cluster ID when the label is absent
+	Name       string  // persistentvolume name, or instance name for local storage
+	ProviderID string  // provider-side identifier; not populated by ClusterDisks
+	Cost       float64 // cost accumulated over the queried window
+	Bytes      float64 // average capacity in bytes over the queried window
+	Local      bool    // true when the record was created from local-storage data
+}
+
+// ClusterDisks queries Prometheus for the cost and size of every persistent
+// volume and every node-local disk over the given duration, ending offset ago.
+//
+// The returned map is keyed by "<cluster>/<name>", where name is the
+// persistentvolume label for PVs and the instance label for local storage.
+// When a series has no cluster_id label, env.GetClusterID() is used instead.
+// Series missing the name label, or carrying no samples, are logged and
+// skipped. If any query fails, the map is nil and the collected query errors
+// are returned.
+//
+// The provider argument is currently unused.
+func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, offset time.Duration) (map[string]*Disk, []error) {
+	durationStr := fmt.Sprintf("%dm", int64(duration.Minutes()))
+	offsetStr := fmt.Sprintf(" offset %dm", int64(offset.Minutes()))
+	if offset < time.Minute {
+		// Offsets under a minute would format as "offset 0m"; omit the
+		// modifier entirely instead.
+		offsetStr = ""
+	}
+
+	// minsPerResolution determines accuracy and resource use for the following
+	// queries. Smaller values (higher resolution) result in better accuracy,
+	// but more expensive queries, and vice versa.
+	minsPerResolution := 1
+
+	// hourlyToCumulative is a scaling factor that, when multiplied by an hourly
+	// value, converts it to a cumulative value; i.e.
+	// [$/hr] * [min/res]*[hr/min] = [$/res]
+	hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
+
+	// TODO niko/assets how do we not hard-code this price?
+	// NOTE(review): presumably $0.04 per GB-month spread over 730 hours per
+	// month - confirm. The value is formatted with %f below, which keeps six
+	// decimal places, so the query receives 0.000055.
+	costPerGBHr := 0.04 / 730.0
+
+	ctx := prom.NewContext(client)
+	queryPVCost := fmt.Sprintf(`sum_over_time((avg(kube_persistentvolume_capacity_bytes) by (cluster_id, persistentvolume) * avg(pv_hourly_cost) by (cluster_id, persistentvolume))[%s:%dm]%s)/1024/1024/1024 * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative)
+	queryPVSize := fmt.Sprintf(`avg_over_time(kube_persistentvolume_capacity_bytes[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
+	queryLocalStorageCost := fmt.Sprintf(`sum_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/"}) by (instance, cluster_id)[%s:%dm]%s) / 1024 / 1024 / 1024 * %f * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative, costPerGBHr)
+	queryLocalStorageBytes := fmt.Sprintf(`avg_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/"}) by (instance, cluster_id)[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
+
+	// Issue every query before awaiting any of them.
+	resChPVCost := ctx.Query(queryPVCost)
+	resChPVSize := ctx.Query(queryPVSize)
+	resChLocalStorageCost := ctx.Query(queryLocalStorageCost)
+	resChLocalStorageBytes := ctx.Query(queryLocalStorageBytes)
+
+	resPVCost := resChPVCost.Await()
+	resPVSize := resChPVSize.Await()
+	resLocalStorageCost := resChLocalStorageCost.Await()
+	resLocalStorageBytes := resChLocalStorageBytes.Await()
+	if ctx.ErrorCollector.IsError() {
+		return nil, ctx.Errors()
+	}
+
+	diskMap := map[string]*Disk{}
+
+	// diskFor returns the entry for cluster/name, creating it on first use.
+	// The local flag only applies to newly created entries.
+	diskFor := func(cluster, name string, local bool) *Disk {
+		key := fmt.Sprintf("%s/%s", cluster, name)
+		disk, ok := diskMap[key]
+		if !ok {
+			disk = &Disk{
+				Cluster: cluster,
+				Name:    name,
+				Local:   local,
+			}
+			diskMap[key] = disk
+		}
+		return disk
+	}
+
+	for _, result := range resPVCost {
+		cluster, err := result.GetString("cluster_id")
+		if err != nil {
+			cluster = env.GetClusterID()
+		}
+
+		name, err := result.GetString("persistentvolume")
+		if err != nil {
+			log.Warningf("ClusterDisks: PV cost data missing persistentvolume")
+			continue
+		}
+
+		// Guard the index below: a series without samples would panic.
+		if len(result.Values) == 0 {
+			log.Warningf("ClusterDisks: PV cost data missing values for %s/%s", cluster, name)
+			continue
+		}
+
+		// TODO niko/assets storage class
+
+		diskFor(cluster, name, false).Cost = result.Values[0].Value
+	}
+
+	for _, result := range resPVSize {
+		cluster, err := result.GetString("cluster_id")
+		if err != nil {
+			cluster = env.GetClusterID()
+		}
+
+		name, err := result.GetString("persistentvolume")
+		if err != nil {
+			log.Warningf("ClusterDisks: PV size data missing persistentvolume")
+			continue
+		}
+
+		if len(result.Values) == 0 {
+			log.Warningf("ClusterDisks: PV size data missing values for %s/%s", cluster, name)
+			continue
+		}
+
+		// TODO niko/assets storage class
+
+		diskFor(cluster, name, false).Bytes = result.Values[0].Value
+	}
+
+	for _, result := range resLocalStorageCost {
+		cluster, err := result.GetString("cluster_id")
+		if err != nil {
+			cluster = env.GetClusterID()
+		}
+
+		name, err := result.GetString("instance")
+		if err != nil {
+			log.Warningf("ClusterDisks: local storage data missing instance")
+			continue
+		}
+
+		if len(result.Values) == 0 {
+			log.Warningf("ClusterDisks: local storage cost data missing values for %s/%s", cluster, name)
+			continue
+		}
+
+		// TODO niko/assets storage class?
+
+		diskFor(cluster, name, true).Cost = result.Values[0].Value
+	}
+
+	for _, result := range resLocalStorageBytes {
+		cluster, err := result.GetString("cluster_id")
+		if err != nil {
+			cluster = env.GetClusterID()
+		}
+
+		name, err := result.GetString("instance")
+		if err != nil {
+			log.Warningf("ClusterDisks: local storage data missing instance")
+			continue
+		}
+
+		if len(result.Values) == 0 {
+			log.Warningf("ClusterDisks: local storage size data missing values for %s/%s", cluster, name)
+			continue
+		}
+
+		// TODO niko/assets storage class
+
+		diskFor(cluster, name, true).Bytes = result.Values[0].Value
+	}
+
+	return diskMap, nil
+}
+
+// Node is the per-node record produced by ClusterNodes, identified by cluster
+// and node name and carrying that node's capacity, cost, and discount data.
+type Node struct {
+	Cluster     string  // cluster_id label, or the configured cluster ID when the label is absent
+	Name        string  // node label of the underlying series
+	ProviderID  string  // provider-side identifier; not populated by ClusterNodes
+	NodeType    string  // instance_type label reported alongside the CPU/RAM cost series
+	CPUCost     float64 // CPU cost accumulated over the queried window
+	CPUCores    float64 // average CPU core capacity over the queried window
+	GPUCost     float64 // GPU cost accumulated over the queried window
+	RAMCost     float64 // RAM cost accumulated over the queried window
+	RAMBytes    float64 // average memory capacity in bytes over the queried window
+	Discount    float64 // combined discount fraction; left at zero for preemptible nodes
+	Preemptible bool    // true when the node carries the GKE preemptible label
+}
+
+// ClusterNodes queries Prometheus for per-node CPU, RAM, and GPU cost and
+// capacity over the given duration, ending offset ago, and annotates each
+// node with its preemptible status and discount.
+//
+// The returned map is keyed by "<cluster>/<node>". When a series has no
+// cluster_id label, env.GetClusterID() is used instead. Series missing the
+// node label, or carrying no samples, are skipped. Non-preemptible nodes
+// receive the combined discount 1 - (1-discount)*(1-negotiatedDiscount) taken
+// from the provider config; preemptible nodes keep a zero discount.
+//
+// If any query fails, the map is nil and the collected query errors are
+// returned. A failure to read or parse the provider config is returned as a
+// single-element error slice.
+func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset time.Duration) (map[string]*Node, []error) {
+	durationStr := fmt.Sprintf("%dm", int64(duration.Minutes()))
+	offsetStr := fmt.Sprintf(" offset %dm", int64(offset.Minutes()))
+	if offset < time.Minute {
+		// Offsets under a minute would format as "offset 0m"; omit the
+		// modifier entirely instead.
+		offsetStr = ""
+	}
+
+	// minsPerResolution determines accuracy and resource use for the following
+	// queries. Smaller values (higher resolution) result in better accuracy,
+	// but more expensive queries, and vice versa.
+	minsPerResolution := 1
+
+	// hourlyToCumulative is a scaling factor that, when multiplied by an hourly
+	// value, converts it to a cumulative value; i.e.
+	// [$/hr] * [min/res]*[hr/min] = [$/res]
+	hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
+
+	ctx := prom.NewContext(client)
+	queryNodeCPUCost := fmt.Sprintf(`sum_over_time((avg(kube_node_status_capacity_cpu_cores) by (cluster_id, node) * on(node, cluster_id) group_right avg(node_cpu_hourly_cost) by (cluster_id, node, instance_type))[%s:%dm]%s) * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative)
+	queryNodeCPUCores := fmt.Sprintf(`avg_over_time(avg(kube_node_status_capacity_cpu_cores) by (cluster_id, node)[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
+	queryNodeRAMCost := fmt.Sprintf(`sum_over_time((avg(kube_node_status_capacity_memory_bytes) by (cluster_id, node) * on(cluster_id, node) group_right avg(node_ram_hourly_cost) by (cluster_id, node, instance_type))[%s:%dm]%s) / 1024 / 1024 / 1024 * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative)
+	queryNodeRAMBytes := fmt.Sprintf(`avg_over_time(avg(kube_node_status_capacity_memory_bytes) by (cluster_id, node)[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
+	// The GPU series is an hourly rate sampled once per resolution step, so it
+	// needs the same hourly-to-cumulative scaling as the CPU and RAM costs.
+	queryNodeGPUCost := fmt.Sprintf(`sum_over_time((avg(node_gpu_hourly_cost) by (cluster_id, node))[%s:%dm]%s) * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative)
+	queryNodeLabels := fmt.Sprintf(`count_over_time(kube_node_labels[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
+
+	// Issue every query before awaiting any of them.
+	resChNodeCPUCost := ctx.Query(queryNodeCPUCost)
+	resChNodeCPUCores := ctx.Query(queryNodeCPUCores)
+	resChNodeRAMCost := ctx.Query(queryNodeRAMCost)
+	resChNodeRAMBytes := ctx.Query(queryNodeRAMBytes)
+	resChNodeGPUCost := ctx.Query(queryNodeGPUCost)
+	resChNodeLabels := ctx.Query(queryNodeLabels)
+
+	resNodeCPUCost := resChNodeCPUCost.Await()
+	resNodeCPUCores := resChNodeCPUCores.Await()
+	resNodeGPUCost := resChNodeGPUCost.Await()
+	resNodeRAMCost := resChNodeRAMCost.Await()
+	resNodeRAMBytes := resChNodeRAMBytes.Await()
+	resNodeLabels := resChNodeLabels.Await()
+	if ctx.ErrorCollector.IsError() {
+		return nil, ctx.Errors()
+	}
+
+	nodeMap := map[string]*Node{}
+
+	// nodeFor returns the entry for cluster/name, creating it on first use.
+	nodeFor := func(cluster, name string) *Node {
+		key := fmt.Sprintf("%s/%s", cluster, name)
+		node, ok := nodeMap[key]
+		if !ok {
+			node = &Node{
+				Cluster: cluster,
+				Name:    name,
+			}
+			nodeMap[key] = node
+		}
+		return node
+	}
+
+	for _, result := range resNodeCPUCost {
+		cluster, err := result.GetString("cluster_id")
+		if err != nil {
+			cluster = env.GetClusterID()
+		}
+
+		name, err := result.GetString("node")
+		if err != nil {
+			log.Warningf("ClusterNodes: CPU cost data missing node")
+			continue
+		}
+
+		nodeType, err := result.GetString("instance_type")
+		if err != nil {
+			log.Warningf("ClusterNodes: CPU cost data missing node type")
+		}
+
+		// Guard the index below: a series without samples would panic.
+		if len(result.Values) == 0 {
+			log.Warningf("ClusterNodes: CPU cost data missing values for %s/%s", cluster, name)
+			continue
+		}
+
+		node := nodeFor(cluster, name)
+		node.CPUCost = result.Values[0].Value
+		// Do not let a missing label erase a type learned from another query.
+		if nodeType != "" {
+			node.NodeType = nodeType
+		}
+	}
+
+	for _, result := range resNodeCPUCores {
+		cluster, err := result.GetString("cluster_id")
+		if err != nil {
+			cluster = env.GetClusterID()
+		}
+
+		name, err := result.GetString("node")
+		if err != nil {
+			log.Warningf("ClusterNodes: CPU cores data missing node")
+			continue
+		}
+
+		if len(result.Values) == 0 {
+			log.Warningf("ClusterNodes: CPU cores data missing values for %s/%s", cluster, name)
+			continue
+		}
+
+		nodeFor(cluster, name).CPUCores = result.Values[0].Value
+	}
+
+	for _, result := range resNodeRAMCost {
+		cluster, err := result.GetString("cluster_id")
+		if err != nil {
+			cluster = env.GetClusterID()
+		}
+
+		name, err := result.GetString("node")
+		if err != nil {
+			log.Warningf("ClusterNodes: RAM cost data missing node")
+			continue
+		}
+
+		nodeType, err := result.GetString("instance_type")
+		if err != nil {
+			log.Warningf("ClusterNodes: RAM cost data missing node type")
+		}
+
+		if len(result.Values) == 0 {
+			log.Warningf("ClusterNodes: RAM cost data missing values for %s/%s", cluster, name)
+			continue
+		}
+
+		node := nodeFor(cluster, name)
+		node.RAMCost = result.Values[0].Value
+		if nodeType != "" {
+			node.NodeType = nodeType
+		}
+	}
+
+	for _, result := range resNodeRAMBytes {
+		cluster, err := result.GetString("cluster_id")
+		if err != nil {
+			cluster = env.GetClusterID()
+		}
+
+		name, err := result.GetString("node")
+		if err != nil {
+			log.Warningf("ClusterNodes: RAM bytes data missing node")
+			continue
+		}
+
+		if len(result.Values) == 0 {
+			log.Warningf("ClusterNodes: RAM bytes data missing values for %s/%s", cluster, name)
+			continue
+		}
+
+		nodeFor(cluster, name).RAMBytes = result.Values[0].Value
+	}
+
+	for _, result := range resNodeGPUCost {
+		cluster, err := result.GetString("cluster_id")
+		if err != nil {
+			cluster = env.GetClusterID()
+		}
+
+		name, err := result.GetString("node")
+		if err != nil {
+			log.Warningf("ClusterNodes: GPU cost data missing node")
+			continue
+		}
+
+		if len(result.Values) == 0 {
+			log.Warningf("ClusterNodes: GPU cost data missing values for %s/%s", cluster, name)
+			continue
+		}
+
+		nodeFor(cluster, name).GPUCost = result.Values[0].Value
+	}
+
+	// node_labels label_cloud_google_com_gke_preemptible
+	for _, result := range resNodeLabels {
+		cluster, err := result.GetString("cluster_id")
+		if err != nil {
+			cluster = env.GetClusterID()
+		}
+
+		nodeName, err := result.GetString("node")
+		if err != nil {
+			continue
+		}
+
+		// GCP preemptible label. The error is deliberately ignored: a node
+		// without the label is simply treated as not preemptible.
+		pre, _ := result.GetString("label_cloud_google_com_gke_preemptible")
+		if pre == "true" {
+			// nodeMap is keyed by "cluster/name", so the lookup must use the
+			// same composite key; the bare node name never matches.
+			if node, ok := nodeMap[fmt.Sprintf("%s/%s", cluster, nodeName)]; ok {
+				node.Preemptible = true
+			}
+		}
+
+		// TODO AWS preemptible
+		// TODO Azure preemptible
+	}
+
+	c, err := cp.GetConfig()
+	if err != nil {
+		return nil, []error{err}
+	}
+	discount, err := ParsePercentString(c.Discount)
+	if err != nil {
+		return nil, []error{err}
+	}
+	negotiatedDiscount, err := ParsePercentString(c.NegotiatedDiscount)
+	if err != nil {
+		return nil, []error{err}
+	}
+
+	for _, node := range nodeMap {
+		if !node.Preemptible {
+			// TODO determine discount(s) based on:
+			// - custom settings
+			// - node RI data
+			// - provider-specific rules, e.g.
+			//   cp.GetDiscount(instanceType string) float64
+			node.Discount = (1.0 - (1.0-discount)*(1.0-negotiatedDiscount))
+		}
+	}
+
+	return nodeMap, nil
+}
+
 // ComputeClusterCosts gives the cumulative and monthly-rate cluster costs over a window of time for all clusters.
 func ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, window, offset string, withBreakdown bool) (map[string]*ClusterCosts, error) {
 	// Compute number of minutes in the full interval, for use interpolating missed scrapes or scaling missing data