Niko Kovacevic 5 лет назад
Родитель
Сommit
5f375f9e64
2 измененных файлов с 436 добавлено и 8 удалено
  1. 427 8
      pkg/costmodel/cluster.go
  2. 9 0
      pkg/costmodel/costmodel.go

+ 427 - 8
pkg/costmodel/cluster.go

@@ -6,6 +6,7 @@ import (
 
 	"github.com/kubecost/cost-model/pkg/cloud"
 	"github.com/kubecost/cost-model/pkg/env"
+	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/prom"
 	"github.com/kubecost/cost-model/pkg/util"
 
@@ -103,6 +104,397 @@ func NewClusterCostsFromCumulative(cpu, gpu, ram, storage float64, window, offse
 	return cc, nil
 }
 
+// Disk describes one storage asset as assembled by ClusterDisks: either a
+// persistent volume or the local (root filesystem) storage of an instance.
+// Entries are keyed in the ClusterDisks result map as "<Cluster>/<Name>".
+type Disk struct {
+	// Cluster is the "cluster_id" label of the series, or env.GetClusterID()
+	// when the label is missing.
+	Cluster    string
+	// Name is the "persistentvolume" label for PVs, or the "instance" label
+	// for local storage.
+	Name       string
+	// ProviderID is not populated by ClusterDisks in this revision.
+	ProviderID string
+	// Cost is the value returned by the PV or local storage cost query, i.e.
+	// the cost accumulated over the queried window.
+	Cost       float64
+	// Bytes is the value returned by the avg_over_time size query.
+	Bytes      float64
+	// Local is true when the entry was created from local storage results.
+	Local      bool
+}
+
+// ClusterDisks queries Prometheus for the cost and size of every persistent
+// volume and of each instance's local (root filesystem) storage over the given
+// window, and returns the results as Disks keyed by "<cluster>/<name>".
+//
+// duration is the length of the window and offset is how far back from now the
+// window ends; both are truncated to whole minutes, and an offset under one
+// minute is omitted from the queries. provider is currently unused.
+//
+// If any query fails, the returned map is nil and the collected query errors
+// are returned. Series missing their name label or carrying no values are
+// skipped with a warning.
+func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, offset time.Duration) (map[string]*Disk, []error) {
+	durationStr := fmt.Sprintf("%dm", int64(duration.Minutes()))
+	offsetStr := fmt.Sprintf(" offset %dm", int64(offset.Minutes()))
+	if offset < time.Minute {
+		offsetStr = ""
+	}
+
+	// minsPerResolution determines accuracy and resource use for the following
+	// queries. Smaller values (higher resolution) result in better accuracy,
+	// but more expensive queries, and vice versa.
+	minsPerResolution := 1
+
+	// hourlyToCumulative is a scaling factor that, when multiplied by an hourly
+	// value, converts it to a cumulative value; i.e.
+	// [$/hr] * [min/res]*[hr/min] = [$/res]
+	hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
+
+	// TODO niko/assets how do we not hard-code this price?
+	costPerGBHr := 0.04 / 730.0
+
+	// The scaling factors are formatted with %g rather than %f: %f keeps only
+	// six decimal places, which would round costPerGBHr (~5.479e-05) to
+	// 0.000055 and skew local storage cost by roughly 0.4%.
+	ctx := prom.NewContext(client)
+	queryPVCost := fmt.Sprintf(`sum_over_time((avg(kube_persistentvolume_capacity_bytes) by (cluster_id, persistentvolume) * avg(pv_hourly_cost) by (cluster_id, persistentvolume))[%s:%dm]%s)/1024/1024/1024 * %g`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative)
+	queryPVSize := fmt.Sprintf(`avg_over_time(kube_persistentvolume_capacity_bytes[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
+	queryLocalStorageCost := fmt.Sprintf(`sum_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/"}) by (instance, cluster_id)[%s:%dm]%s) / 1024 / 1024 / 1024 * %g * %g`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative, costPerGBHr)
+	queryLocalStorageBytes := fmt.Sprintf(`avg_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/"}) by (instance, cluster_id)[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
+
+	// Issue all queries before awaiting any of them.
+	resChPVCost := ctx.Query(queryPVCost)
+	resChPVSize := ctx.Query(queryPVSize)
+	resChLocalStorageCost := ctx.Query(queryLocalStorageCost)
+	resChLocalStorageBytes := ctx.Query(queryLocalStorageBytes)
+
+	// Per-query errors are deliberately ignored here; they are checked in
+	// aggregate through the context's error collector below.
+	resPVCost, _ := resChPVCost.Await()
+	resPVSize, _ := resChPVSize.Await()
+	resLocalStorageCost, _ := resChLocalStorageCost.Await()
+	resLocalStorageBytes, _ := resChLocalStorageBytes.Await()
+	if ctx.ErrorCollector.IsError() {
+		return nil, ctx.Errors()
+	}
+
+	diskMap := map[string]*Disk{}
+
+	// getDisk returns the Disk stored under "<cluster>/<name>", creating it
+	// first if necessary. local only applies to a newly created entry.
+	getDisk := func(cluster, name string, local bool) *Disk {
+		key := fmt.Sprintf("%s/%s", cluster, name)
+		disk, ok := diskMap[key]
+		if !ok {
+			disk = &Disk{
+				Cluster: cluster,
+				Name:    name,
+				Local:   local,
+			}
+			diskMap[key] = disk
+		}
+		return disk
+	}
+
+	for _, result := range resPVCost {
+		cluster, err := result.GetString("cluster_id")
+		if err != nil {
+			cluster = env.GetClusterID()
+		}
+
+		name, err := result.GetString("persistentvolume")
+		if err != nil {
+			log.Warningf("ClusterDisks: PV cost data missing persistentvolume")
+			continue
+		}
+
+		// Guard against a series with no samples before indexing.
+		if len(result.Values) == 0 {
+			log.Warningf("ClusterDisks: PV cost data missing values")
+			continue
+		}
+
+		// TODO niko/assets storage class
+
+		getDisk(cluster, name, false).Cost = result.Values[0].Value
+	}
+
+	for _, result := range resPVSize {
+		cluster, err := result.GetString("cluster_id")
+		if err != nil {
+			cluster = env.GetClusterID()
+		}
+
+		name, err := result.GetString("persistentvolume")
+		if err != nil {
+			log.Warningf("ClusterDisks: PV size data missing persistentvolume")
+			continue
+		}
+
+		if len(result.Values) == 0 {
+			log.Warningf("ClusterDisks: PV size data missing values")
+			continue
+		}
+
+		// TODO niko/assets storage class
+
+		getDisk(cluster, name, false).Bytes = result.Values[0].Value
+	}
+
+	for _, result := range resLocalStorageCost {
+		cluster, err := result.GetString("cluster_id")
+		if err != nil {
+			cluster = env.GetClusterID()
+		}
+
+		name, err := result.GetString("instance")
+		if err != nil {
+			log.Warningf("ClusterDisks: local storage data missing instance")
+			continue
+		}
+
+		if len(result.Values) == 0 {
+			log.Warningf("ClusterDisks: local storage cost data missing values")
+			continue
+		}
+
+		// TODO niko/assets storage class?
+
+		getDisk(cluster, name, true).Cost = result.Values[0].Value
+	}
+
+	for _, result := range resLocalStorageBytes {
+		cluster, err := result.GetString("cluster_id")
+		if err != nil {
+			cluster = env.GetClusterID()
+		}
+
+		name, err := result.GetString("instance")
+		if err != nil {
+			log.Warningf("ClusterDisks: local storage data missing instance")
+			continue
+		}
+
+		if len(result.Values) == 0 {
+			log.Warningf("ClusterDisks: local storage bytes data missing values")
+			continue
+		}
+
+		// TODO niko/assets storage class
+
+		getDisk(cluster, name, true).Bytes = result.Values[0].Value
+	}
+
+	return diskMap, nil
+}
+
+// Node describes one cluster node as assembled by ClusterNodes. Entries are
+// keyed in the ClusterNodes result map as "<Cluster>/<Name>".
+type Node struct {
+	// Cluster is the "cluster_id" label of the series, or env.GetClusterID()
+	// when the label is missing.
+	Cluster     string
+	// Name is the "node" label of the series.
+	Name        string
+	// ProviderID is not populated by ClusterNodes in this revision.
+	ProviderID  string
+	// NodeType is the "instance_type" label taken from the CPU and RAM cost
+	// query results.
+	NodeType    string
+	// CPUCost is the value returned by the node CPU cost query.
+	CPUCost     float64
+	// CPUCores is the value returned by the avg_over_time CPU cores query.
+	CPUCores    float64
+	// GPUCost is the value returned by the node GPU cost query.
+	GPUCost     float64
+	// RAMCost is the value returned by the node RAM cost query.
+	RAMCost     float64
+	// RAMBytes is the value returned by the avg_over_time memory bytes query.
+	RAMBytes    float64
+	// Discount is 1 - (1-discount)*(1-negotiatedDiscount) from the provider
+	// config; it is only assigned for nodes that are not Preemptible.
+	Discount    float64
+	// Preemptible is intended to be true for nodes whose kube_node_labels
+	// series has label_cloud_google_com_gke_preemptible="true".
+	Preemptible bool
+}
+
+// ClusterNodes queries Prometheus for per-node CPU, RAM and GPU cost, CPU core
+// and RAM byte capacity, and node labels over the given window, and returns
+// the results as Nodes keyed by "<cluster>/<name>".
+//
+// duration is the length of the window and offset is how far back from now the
+// window ends; both are truncated to whole minutes, and an offset under one
+// minute is omitted from the queries. cp supplies the discount configuration
+// applied to every node that is not preemptible.
+//
+// If any query fails, or the provider config cannot be read or parsed, the
+// returned map is nil and the errors are returned. Series missing the node
+// label or carrying no values are skipped.
+func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset time.Duration) (map[string]*Node, []error) {
+	durationStr := fmt.Sprintf("%dm", int64(duration.Minutes()))
+	offsetStr := fmt.Sprintf(" offset %dm", int64(offset.Minutes()))
+	if offset < time.Minute {
+		offsetStr = ""
+	}
+
+	// minsPerResolution determines accuracy and resource use for the following
+	// queries. Smaller values (higher resolution) result in better accuracy,
+	// but more expensive queries, and vice versa.
+	minsPerResolution := 1
+
+	// hourlyToCumulative is a scaling factor that, when multiplied by an hourly
+	// value, converts it to a cumulative value; i.e.
+	// [$/hr] * [min/res]*[hr/min] = [$/res]
+	hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
+
+	ctx := prom.NewContext(client)
+	queryNodeCPUCost := fmt.Sprintf(`sum_over_time((avg(kube_node_status_capacity_cpu_cores) by (cluster_id, node) * on(node, cluster_id) group_right avg(node_cpu_hourly_cost) by (cluster_id, node, instance_type))[%s:%dm]%s) * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative)
+	queryNodeCPUCores := fmt.Sprintf(`avg_over_time(avg(kube_node_status_capacity_cpu_cores) by (cluster_id, node)[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
+	queryNodeRAMCost := fmt.Sprintf(`sum_over_time((avg(kube_node_status_capacity_memory_bytes) by (cluster_id, node) * on(cluster_id, node) group_right avg(node_ram_hourly_cost) by (cluster_id, node, instance_type))[%s:%dm]%s) / 1024 / 1024 / 1024 * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative)
+	queryNodeRAMBytes := fmt.Sprintf(`avg_over_time(avg(kube_node_status_capacity_memory_bytes) by (cluster_id, node)[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
+	// The GPU hourly cost is summed once per resolution step, so it must be
+	// scaled by hourlyToCumulative like the CPU and RAM cost queries above;
+	// otherwise the result is overstated by a factor of 60/minsPerResolution.
+	queryNodeGPUCost := fmt.Sprintf(`sum_over_time((avg(node_gpu_hourly_cost) by (cluster_id, node))[%s:%dm]%s) * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative)
+	queryNodeLabels := fmt.Sprintf(`count_over_time(kube_node_labels[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
+
+	// Issue all queries before awaiting any of them.
+	resChNodeCPUCost := ctx.Query(queryNodeCPUCost)
+	resChNodeCPUCores := ctx.Query(queryNodeCPUCores)
+	resChNodeRAMCost := ctx.Query(queryNodeRAMCost)
+	resChNodeRAMBytes := ctx.Query(queryNodeRAMBytes)
+	resChNodeGPUCost := ctx.Query(queryNodeGPUCost)
+	resChNodeLabels := ctx.Query(queryNodeLabels)
+
+	// Per-query errors are deliberately ignored here; they are checked in
+	// aggregate through the context's error collector below.
+	resNodeCPUCost, _ := resChNodeCPUCost.Await()
+	resNodeCPUCores, _ := resChNodeCPUCores.Await()
+	resNodeGPUCost, _ := resChNodeGPUCost.Await()
+	resNodeRAMCost, _ := resChNodeRAMCost.Await()
+	resNodeRAMBytes, _ := resChNodeRAMBytes.Await()
+	resNodeLabels, _ := resChNodeLabels.Await()
+	if ctx.ErrorCollector.IsError() {
+		return nil, ctx.Errors()
+	}
+
+	nodeMap := map[string]*Node{}
+
+	// getNode returns the Node stored under "<cluster>/<name>", creating it
+	// first if necessary.
+	getNode := func(cluster, name string) *Node {
+		key := fmt.Sprintf("%s/%s", cluster, name)
+		node, ok := nodeMap[key]
+		if !ok {
+			node = &Node{
+				Cluster: cluster,
+				Name:    name,
+			}
+			nodeMap[key] = node
+		}
+		return node
+	}
+
+	for _, result := range resNodeCPUCost {
+		cluster, err := result.GetString("cluster_id")
+		if err != nil {
+			cluster = env.GetClusterID()
+		}
+
+		name, err := result.GetString("node")
+		if err != nil {
+			log.Warningf("ClusterNodes: CPU cost data missing node")
+			continue
+		}
+
+		nodeType, err := result.GetString("instance_type")
+		if err != nil {
+			log.Warningf("ClusterNodes: CPU cost data missing node type")
+		}
+
+		// Guard against a series with no samples before indexing.
+		if len(result.Values) == 0 {
+			log.Warningf("ClusterNodes: CPU cost data missing values")
+			continue
+		}
+
+		node := getNode(cluster, name)
+		node.CPUCost = result.Values[0].Value
+		// Never replace a known node type with an empty one.
+		if nodeType != "" {
+			node.NodeType = nodeType
+		}
+	}
+
+	for _, result := range resNodeCPUCores {
+		cluster, err := result.GetString("cluster_id")
+		if err != nil {
+			cluster = env.GetClusterID()
+		}
+
+		name, err := result.GetString("node")
+		if err != nil {
+			log.Warningf("ClusterNodes: CPU cores data missing node")
+			continue
+		}
+
+		if len(result.Values) == 0 {
+			log.Warningf("ClusterNodes: CPU cores data missing values")
+			continue
+		}
+
+		getNode(cluster, name).CPUCores = result.Values[0].Value
+	}
+
+	for _, result := range resNodeRAMCost {
+		cluster, err := result.GetString("cluster_id")
+		if err != nil {
+			cluster = env.GetClusterID()
+		}
+
+		name, err := result.GetString("node")
+		if err != nil {
+			log.Warningf("ClusterNodes: RAM cost data missing node")
+			continue
+		}
+
+		nodeType, err := result.GetString("instance_type")
+		if err != nil {
+			log.Warningf("ClusterNodes: RAM cost data missing node type")
+		}
+
+		if len(result.Values) == 0 {
+			log.Warningf("ClusterNodes: RAM cost data missing values")
+			continue
+		}
+
+		node := getNode(cluster, name)
+		node.RAMCost = result.Values[0].Value
+		// Never replace a known node type with an empty one.
+		if nodeType != "" {
+			node.NodeType = nodeType
+		}
+	}
+
+	for _, result := range resNodeRAMBytes {
+		cluster, err := result.GetString("cluster_id")
+		if err != nil {
+			cluster = env.GetClusterID()
+		}
+
+		name, err := result.GetString("node")
+		if err != nil {
+			log.Warningf("ClusterNodes: RAM bytes data missing node")
+			continue
+		}
+
+		if len(result.Values) == 0 {
+			log.Warningf("ClusterNodes: RAM bytes data missing values")
+			continue
+		}
+
+		getNode(cluster, name).RAMBytes = result.Values[0].Value
+	}
+
+	for _, result := range resNodeGPUCost {
+		cluster, err := result.GetString("cluster_id")
+		if err != nil {
+			cluster = env.GetClusterID()
+		}
+
+		name, err := result.GetString("node")
+		if err != nil {
+			log.Warningf("ClusterNodes: GPU cost data missing node")
+			continue
+		}
+
+		if len(result.Values) == 0 {
+			log.Warningf("ClusterNodes: GPU cost data missing values")
+			continue
+		}
+
+		getNode(cluster, name).GPUCost = result.Values[0].Value
+	}
+
+	// node_labels label_cloud_google_com_gke_preemptible
+	for _, result := range resNodeLabels {
+		cluster, err := result.GetString("cluster_id")
+		if err != nil {
+			cluster = env.GetClusterID()
+		}
+
+		nodeName, err := result.GetString("node")
+		if err != nil {
+			continue
+		}
+
+		// GCP preemptible label. A missing label yields "", which is simply
+		// treated as not preemptible, so the error is intentionally ignored.
+		pre, _ := result.GetString("label_cloud_google_com_gke_preemptible")
+		if pre != "true" {
+			continue
+		}
+
+		// nodeMap is keyed by "<cluster>/<name>", so the lookup must use the
+		// same composite key; the bare node name never matches an entry.
+		if node, ok := nodeMap[fmt.Sprintf("%s/%s", cluster, nodeName)]; ok {
+			node.Preemptible = true
+		}
+
+		// TODO AWS preemptible
+		// TODO Azure preemptible
+	}
+
+	c, err := cp.GetConfig()
+	if err != nil {
+		return nil, []error{err}
+	}
+	discount, err := ParsePercentString(c.Discount)
+	if err != nil {
+		return nil, []error{err}
+	}
+	negotiatedDiscount, err := ParsePercentString(c.NegotiatedDiscount)
+	if err != nil {
+		return nil, []error{err}
+	}
+
+	for _, node := range nodeMap {
+		if !node.Preemptible {
+			// TODO determine discount(s) based on:
+			// - custom settings
+			// - node RI data
+			// - provider-specific rules, e.g.
+			//   cp.GetDiscount(instanceType string) float64
+			node.Discount = (1.0 - (1.0-discount)*(1.0-negotiatedDiscount))
+		}
+	}
+
+	return nodeMap, nil
+}
+
 // ComputeClusterCosts gives the cumulative and monthly-rate cluster costs over a window of time for all clusters.
 func ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, window, offset string, withBreakdown bool) (map[string]*ClusterCosts, error) {
 	// Compute number of minutes in the full interval, for use interpolating missed scrapes or scaling missing data
@@ -198,9 +590,17 @@ func ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, wind
 		queryTotalCPU,
 		queryTotalRAM,
 		queryTotalStorage,
-		queryTotalLocalStorage,
 	)
 
+	// Only submit the local storage query if it is valid. Otherwise Prometheus
+	// will return errors. Always append something to resChs, regardless, to
+	// maintain indexing.
+	if queryTotalLocalStorage != "" {
+		resChs = append(resChs, ctx.Query(queryTotalLocalStorage))
+	} else {
+		resChs = append(resChs, nil)
+	}
+
 	if withBreakdown {
 		queryCPUModePct := fmt.Sprintf(fmtQueryCPUModePct, window, fmtOffset, window, fmtOffset)
 		queryRAMSystemPct := fmt.Sprintf(fmtQueryRAMSystemPct, window, minsPerResolution, fmtOffset, window, minsPerResolution, fmtOffset)
@@ -210,9 +610,17 @@ func ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, wind
 			queryCPUModePct,
 			queryRAMSystemPct,
 			queryRAMUserPct,
-			queryUsedLocalStorage,
 		)
 
+		// Only submit the local storage query if it is valid. Otherwise Prometheus
+		// will return errors. Always append something to bdResChs, regardless, to
+		// maintain indexing.
+		if queryUsedLocalStorage != "" {
+			bdResChs = append(bdResChs, ctx.Query(queryUsedLocalStorage))
+		} else {
+			bdResChs = append(bdResChs, nil)
+		}
+
 		resChs = append(resChs, bdResChs...)
 	}
 
@@ -283,7 +691,9 @@ func ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, wind
 	// Apply only custom discount to GPU and storage
 	setCostsFromResults(costData, resTotalGPU, "gpu", 0.0, customDiscount)
 	setCostsFromResults(costData, resTotalStorage, "storage", 0.0, customDiscount)
-	setCostsFromResults(costData, resTotalLocalStorage, "localstorage", 0.0, customDiscount)
+	if queryTotalLocalStorage != "" {
+		setCostsFromResults(costData, resTotalLocalStorage, "localstorage", 0.0, customDiscount)
+	}
 
 	cpuBreakdownMap := map[string]*ClusterCostsBreakdown{}
 	ramBreakdownMap := map[string]*ClusterCostsBreakdown{}
@@ -355,15 +765,24 @@ func ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, wind
 			ramBD.Idle = remaining
 		}
 
-		for _, result := range resUsedLocalStorage {
-			clusterID, _ := result.GetString("cluster_id")
-			if clusterID == "" {
-				clusterID = defaultClusterID
+		if queryUsedLocalStorage != "" {
+			for _, result := range resUsedLocalStorage {
+				clusterID, _ := result.GetString("cluster_id")
+				if clusterID == "" {
+					clusterID = defaultClusterID
+				}
+				pvUsedCostMap[clusterID] += result.Values[0].Value
 			}
-			pvUsedCostMap[clusterID] += result.Values[0].Value
 		}
 	}
 
+	if ctx.ErrorCollector.IsError() {
+		for _, err := range ctx.Errors() {
+			log.Errorf("ComputeClusterCosts: %s", err)
+		}
+		return nil, ctx.Errors()[0]
+	}
+
 	// Convert intermediate structure to Costs instances
 	costsByCluster := map[string]*ClusterCosts{}
 	for id, cd := range costData {

+ 9 - 0
pkg/costmodel/costmodel.go

@@ -3,6 +3,7 @@ package costmodel
 import (
 	"fmt"
 	"math"
+	"regexp"
 	"strconv"
 	"strings"
 	"time"
@@ -42,6 +43,9 @@ const (
 	epFlags           = apiPrefix + "/status/flags"
 )
 
+// isCron matches a Job name that ends in a hyphen followed by exactly ten
+// digits, and captures everything before that suffix as the first submatch.
+// NOTE(review): the ten digits are presumably the Unix timestamp that the
+// CronJob controller appends to the CronJob's name — confirm.
+var isCron = regexp.MustCompile(`^(.+)-\d{10}$`)
+
 type CostModel struct {
 	Cache        clustercache.ClusterCache
 	RequestGroup *singleflight.Group
@@ -108,6 +112,11 @@ func (cd *CostData) GetController() (name string, kind string, hasController boo
 		name = cd.Jobs[0]
 		kind = "job"
 		hasController = true
+
+		match := isCron.FindStringSubmatch(name)
+		if match != nil {
+			name = match[1]
+		}
 	}
 
 	return name, kind, hasController