Bladeren bron

Issue #173 enable custom pricing for nodes and storage; add filter by cluster name; update tests

Niko Kovacevic 6 jaren geleden
bovenliggende
commit
8a44927cef
7 gewijzigde bestanden met toevoegingen van 80 en 53 verwijderingen
  1. 3 25
      cloud/customprovider.go
  2. 49 0
      cloud/provider.go
  3. 15 17
      costmodel/costmodel.go
  4. 4 2
      main.go
  5. 1 1
      test/aggregation_test.go
  6. 4 4
      test/historical_pod_test.go
  7. 4 4
      test/remote_cluster_test.go

+ 3 - 25
cloud/customprovider.go

@@ -129,32 +129,10 @@ func (cp *CustomProvider) NodePricing(key Key) (*Node, error) {
 		gpuCount = "1" // TODO: support more than one gpu.
 	}
 
-	cpu := cp.Pricing[k].CPU
-	ram := cp.Pricing[k].RAM
-	gpu := cp.Pricing[k].GPU
-
-	// If custom pricing is enabled and can be retrieved, replace
-	// default cost values with custom values
-	customPricing, err := cp.GetConfig()
-	if CustomPricesEnabled(cp) && err == nil {
-		// TODO nikovacevic-caching how to determine if it is spot?
-		isSpot := false
-
-		if isSpot {
-			cpu = customPricing.SpotCPU
-			ram = customPricing.SpotRAM
-			gpu = customPricing.SpotGPU
-		} else {
-			cpu = customPricing.CPU
-			ram = customPricing.RAM
-			gpu = customPricing.GPU
-		}
-	}
-
 	return &Node{
-		VCPUCost: cpu,
-		RAMCost:  ram,
-		GPUCost:  gpu,
+		VCPUCost: cp.Pricing[k].CPU,
+		RAMCost:  cp.Pricing[k].RAM,
+		GPUCost:  cp.Pricing[k].GPU,
 		GPU:      gpuCount,
 	}, nil
 }

+ 49 - 0
cloud/provider.go

@@ -55,6 +55,11 @@ type Node struct {
 	GPUCost          string `json:"gpuCost"`
 }
 
+// IsSpot determines whether or not a Node uses spot by usage type
+func (n *Node) IsSpot() bool {
+	return strings.Contains(n.UsageType, "spot") || strings.Contains(n.UsageType, "emptible")
+}
+
 // Network is the interface by which the provider and cost model communicate network egress prices.
 // The provider will best-effort try to fill out this struct.
 type Network struct {
@@ -180,6 +185,50 @@ func CustomPricesEnabled(p Provider) bool {
 	return config.CustomPricesEnabled == "true"
 }
 
+// NodePricing pulls pricing data from the given provider for the node at the given key.
+// If custom pricing is enabled, those pricing data are returned instead.
+func NodePricing(p Provider, k Key) (*Node, error) {
+	node, err := p.NodePricing(k)
+	if err != nil {
+		return nil, err
+	}
+
+	// If custom pricing is enabled and can be retrieved, replace
+	// default cost values with custom values
+	customPricing, err := p.GetConfig()
+	if CustomPricesEnabled(p) && err == nil {
+		if node.IsSpot() {
+			node.VCPUCost = customPricing.SpotCPU
+			node.RAMCost = customPricing.SpotRAM
+			node.GPUCost = customPricing.SpotGPU
+		} else {
+			node.VCPUCost = customPricing.CPU
+			node.RAMCost = customPricing.RAM
+			node.GPUCost = customPricing.GPU
+		}
+	}
+
+	return node, nil
+}
+
+// PVPricing pulls pricing data from the given provider for the persisten volume at the
+// given key. If custom pricing is enabled, those pricing data are returned instead.
+func PVPricing(p Provider, k PVKey) (*PV, error) {
+	pv, err := p.PVPricing(k)
+	if err != nil {
+		return nil, err
+	}
+
+	// If custom pricing is enabled and can be retrieved, replace
+	// default cost values with custom values
+	customPricing, err := p.GetConfig()
+	if CustomPricesEnabled(p) && err == nil {
+		pv.Cost = customPricing.Storage
+	}
+
+	return pv, nil
+}
+
 // GetDefaultPricingData will search for a json file representing pricing data in /models/ and use it for base pricing info.
 func GetDefaultPricingData(fname string) (*CustomPricing, error) {
 	path := os.Getenv("CONFIG_PATH")

+ 15 - 17
costmodel/costmodel.go

@@ -912,13 +912,13 @@ func addPVData(cache ClusterCache, pvClaimMapping map[string]*PersistentVolumeCl
 	return nil
 }
 
-func GetPVCost(pv *costAnalyzerCloud.PV, kpv *v1.PersistentVolume, cloud costAnalyzerCloud.Provider) error {
-	cfg, err := cloud.GetConfig()
+func GetPVCost(pv *costAnalyzerCloud.PV, kpv *v1.PersistentVolume, cp costAnalyzerCloud.Provider) error {
+	cfg, err := cp.GetConfig()
 	if err != nil {
 		return err
 	}
-	key := cloud.GetPVKey(kpv, pv.Parameters)
-	pvWithCost, err := cloud.PVPricing(key)
+	key := cp.GetPVKey(kpv, pv.Parameters)
+	pvWithCost, err := cloud.PVPricing(cp, key)
 	if err != nil {
 		pv.Cost = cfg.Storage
 		return err
@@ -944,7 +944,7 @@ func getNodeCost(cache ClusterCache, cp costAnalyzerCloud.Provider) (map[string]
 		nodeLabels := n.GetObjectMeta().GetLabels()
 		nodeLabels["providerID"] = n.Spec.ProviderID
 
-		cnode, err := cp.NodePricing(cp.GetKey(nodeLabels))
+		cnode, err := cloud.NodePricing(cp, cp.GetKey(nodeLabels))
 		if err != nil {
 			klog.V(1).Infof("Error getting node. Error: " + err.Error())
 			nodes[name] = cnode
@@ -1093,8 +1093,15 @@ func getPodDeployments(cache ClusterCache, podList []*v1.Pod) (map[string]map[st
 	return podDeploymentsMapping, nil
 }
 
+func costDataPassesFilters(costs *CostData, namespace string, cluster string) bool {
+	passesNamespace := namespace == "" || costs.Namespace == namespace
+	passesCluster := cluster == "" || costs.ClusterID == cluster
+
+	return passesNamespace && passesCluster
+}
+
 func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset kubernetes.Interface, cp costAnalyzerCloud.Provider,
-	startString, endString, windowString string, filterNamespace string, remoteEnabled bool) (map[string]*CostData, error) {
+	startString, endString, windowString string, filterNamespace string, filterCluster string, remoteEnabled bool) (map[string]*CostData, error) {
 	queryRAMRequests := fmt.Sprintf(queryRAMRequestsStr, windowString, "", windowString, "")
 	queryRAMUsage := fmt.Sprintf(queryRAMUsageStr, windowString, "", windowString, "")
 	queryCPURequests := fmt.Sprintf(queryCPURequestsStr, windowString, "", windowString, "")
@@ -1432,13 +1439,9 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 				costs.CPUAllocation = getContainerAllocation(costs.CPUReq, costs.CPUUsed)
 				costs.RAMAllocation = getContainerAllocation(costs.RAMReq, costs.RAMUsed)
 
-				if filterNamespace == "" {
-					containerNameCost[newKey] = costs
-				} else if costs.Namespace == filterNamespace {
+				if costDataPassesFilters(costs, filterNamespace, filterCluster) {
 					containerNameCost[newKey] = costs
 				}
-
-				// TODO nikovacevic-caching filter cluster ID
 			}
 
 		} else {
@@ -1502,15 +1505,10 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 			costs.CPUAllocation = getContainerAllocation(costs.CPUReq, costs.CPUUsed)
 			costs.RAMAllocation = getContainerAllocation(costs.RAMReq, costs.RAMUsed)
 
-			if filterNamespace == "" {
-				containerNameCost[key] = costs
-				missingContainers[key] = costs
-			} else if costs.Namespace == filterNamespace {
+			if costDataPassesFilters(costs, filterNamespace, filterCluster) {
 				containerNameCost[key] = costs
 				missingContainers[key] = costs
 			}
-
-			// TODO nikovacevic-caching filter cluster ID
 		}
 	}
 

+ 4 - 2
main.go

@@ -252,6 +252,7 @@ func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps
 	window := r.URL.Query().Get("window")
 	offset := r.URL.Query().Get("offset")
 	namespace := r.URL.Query().Get("namespace")
+	cluster := r.URL.Query().Get("cluster")
 	field := r.URL.Query().Get("aggregation")
 	subfield := r.URL.Query().Get("aggregationSubfield")
 	allocateIdle := r.URL.Query().Get("allocateIdle")
@@ -347,7 +348,7 @@ func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps
 	}
 	klog.Infof("REMOTE ENABLED: %t", remoteEnabled)
 
-	data, err := a.Model.ComputeCostDataRange(a.PrometheusClient, a.KubeClientSet, a.Cloud, start, end, "1h", namespace, remoteEnabled)
+	data, err := a.Model.ComputeCostDataRange(a.PrometheusClient, a.KubeClientSet, a.Cloud, start, end, "1h", namespace, cluster, remoteEnabled)
 	if err != nil {
 		w.Write(wrapData(nil, err))
 		return
@@ -408,6 +409,7 @@ func (a *Accesses) CostDataModelRange(w http.ResponseWriter, r *http.Request, ps
 	window := r.URL.Query().Get("window")
 	fields := r.URL.Query().Get("filterFields")
 	namespace := r.URL.Query().Get("namespace")
+	cluster := r.URL.Query().Get("cluster")
 	aggregationField := r.URL.Query().Get("aggregation")
 	aggregationSubField := r.URL.Query().Get("aggregationSubfield")
 	remote := r.URL.Query().Get("remote")
@@ -417,7 +419,7 @@ func (a *Accesses) CostDataModelRange(w http.ResponseWriter, r *http.Request, ps
 	if remoteAvailable == "true" && remote != "false" {
 		remoteEnabled = true
 	}
-	data, err := a.Model.ComputeCostDataRange(a.PrometheusClient, a.KubeClientSet, a.Cloud, start, end, window, namespace, remoteEnabled)
+	data, err := a.Model.ComputeCostDataRange(a.PrometheusClient, a.KubeClientSet, a.Cloud, start, end, window, namespace, cluster, remoteEnabled)
 	if err != nil {
 		w.Write(wrapData(nil, err))
 	}

+ 1 - 1
test/aggregation_test.go

@@ -101,7 +101,7 @@ func TestAggregation(t *testing.T) {
 	costData := make(map[string]*costModel.CostData)
 	costData["test1,foo,nginx,testnode"] = cd1
 	costData["test1,bar,nginx,testnode"] = cd2
-	agg := costModel.AggregateCostModel(costData, 0.0, 1.0, nil, "namespace", "")
+	agg := costModel.AggregateCostModel(costData, "namespace", "", false, 0.0, 1.0, nil)
 	log.Printf("agg: %+v", agg["test1"])
 	assert.Equal(t, agg["test1"].TotalCost, 8.0)
 }

+ 4 - 4
test/historical_pod_test.go

@@ -190,11 +190,11 @@ func TestPodUpDown(t *testing.T) {
 	log.Printf("Starting at %s \n", startStr)
 	log.Printf("Ending at %s \n", endStr)
 	provider.DownloadPricingData()
-	data, err := cm.ComputeCostDataRange(promCli, rclient, provider, startStr, endStr, "1m", "", false)
+	data, err := cm.ComputeCostDataRange(promCli, rclient, provider, startStr, endStr, "1m", "", "", false)
 	if err != nil {
 		panic(err)
 	}
-	agg := costModel.AggregateCostModel(data, 0.0, 1.0, nil, "namespace", "")
+	agg := costModel.AggregateCostModel(data, "namespace", "", false, 0.0, 1.0, nil)
 	_, ok := agg["test"]
 	assert.Assert(t, ok)
 
@@ -202,11 +202,11 @@ func TestPodUpDown(t *testing.T) {
 	if err != nil {
 		panic(err)
 	}
-	agg2 := costModel.AggregateCostModel(data2, 0.0, 1.0, nil, "namespace", "")
+	agg2 := costModel.AggregateCostModel(data2, "namespace", "", false, 0.0, 1.0, nil)
 	_, ok2 := agg2["test"]
 	assert.Assert(t, ok2)
 
-	agg3 := costModel.AggregateCostModel(data, 0.0, 1.0, nil, "label", "testaggregation")
+	agg3 := costModel.AggregateCostModel(data, "label", "testaggregation", false, 0.0, 1.0, nil)
 	_, ok3 := agg3["foo"]
 	assert.Assert(t, ok3)
 }

+ 4 - 4
test/remote_cluster_test.go

@@ -55,7 +55,7 @@ func TestClusterConvergence(t *testing.T) {
 	log.Printf("Ending at %s \n", endStr)
 	provider.DownloadPricingData()
 
-	data, err := cm.ComputeCostDataRange(promCli, rclient, provider, startStr, endStr, "1h", "", false)
+	data, err := cm.ComputeCostDataRange(promCli, rclient, provider, startStr, endStr, "1h", "", "", false)
 	if err != nil {
 		panic(err)
 	}
@@ -63,13 +63,13 @@ func TestClusterConvergence(t *testing.T) {
 	os.Setenv("SQL_ADDRESS", "ab5cfc235d64e11e9b8280265f54018f-778641917.us-east-2.elb.amazonaws.com")
 	os.Setenv("REMOTE_WRITE_PASSWORD", "savemoney123")
 
-	data2, err := cm.ComputeCostDataRange(promCli, rclient, provider, startStr, endStr, "1h", "", true)
+	data2, err := cm.ComputeCostDataRange(promCli, rclient, provider, startStr, endStr, "1h", "", "", true)
 	if err != nil {
 		panic(err)
 	}
 
-	agg := costModel.AggregateCostModel(data, 0.0, 1.0, nil, "namespace", "")
-	agg2 := costModel.AggregateCostModel(data2, 0.0, 1.0, nil, "namespace", "")
+	agg := costModel.AggregateCostModel(data, "namespace", "", false, 0.0, 1.0, nil)
+	agg2 := costModel.AggregateCostModel(data2, "namespace", "", false, 0.0, 1.0, nil)
 
 	assert.Equal(t, agg["kubecost"].TotalCost, agg2["kubecost"].TotalCost)