Procházet zdrojové kódy

add csv fallback (#641) (#646)

* add csv fallback

* log class match

* add counts by source

* add test for pricing source counter and make the names of sources public
Ajay Tripathy před 5 roky
rodič
revize
a42464d152

+ 1 - 1
go.mod

@@ -10,7 +10,7 @@ require (
 	github.com/aws/aws-sdk-go v1.28.9
 	github.com/dimchansky/utfbom v1.1.0 // indirect
 	github.com/getsentry/sentry-go v0.6.1
-	github.com/google/martian v2.1.0+incompatible // indirect
+	github.com/google/martian v2.1.0+incompatible
 	github.com/google/uuid v1.1.1
 	github.com/googleapis/gax-go v2.0.2+incompatible // indirect
 	github.com/gophercloud/gophercloud v0.2.0 // indirect

+ 1 - 0
go.sum

@@ -433,6 +433,7 @@ k8s.io/apimachinery v0.0.0-20190612205821-1799e75a0719 h1:uV4S5IB5g4Nvi+TBVNf3e9
 k8s.io/apimachinery v0.0.0-20190612205821-1799e75a0719/go.mod h1:I4A+glKBHiTgiEjQiCCQfCAIcIMFGt291SmsvcrFzJA=
 k8s.io/apimachinery v0.0.0-20190913075812-e119e5e154b6 h1:tGU1C/vMoUV2ZakSH6wQq2shk9KiFtjoH2vDDHlhpA4=
 k8s.io/apimachinery v0.0.0-20190913075812-e119e5e154b6/go.mod h1:nL6pwRT8NgfF8TT68DBI8uEePRt89cSvoXUVqbkWHq4=
+k8s.io/apimachinery v0.20.1 h1:LAhz8pKbgR8tUwn7boK+b2HZdt7MiTu2mkYtFMUjTRQ=
 k8s.io/client-go v0.0.0-20190620085101-78d2af792bab h1:E8Fecph0qbNsAbijJJQryKu4Oi9QTp5cVpjTE+nqg6g=
 k8s.io/client-go v0.0.0-20190620085101-78d2af792bab/go.mod h1:E95RaSlHr79aHaX0aGSwcPNfygDiPKOVXdmivCIZT0k=
 k8s.io/client-go v1.5.1 h1:XaX/lo2/u3/pmFau8HN+sB5C/b4dc4Dmm2eXjBH4p1E=

+ 49 - 4
pkg/cloud/csvprovider.go

@@ -6,11 +6,13 @@ import (
 	"io"
 	"os"
 	"regexp"
+	"strconv"
 	"strings"
 	"sync"
 	"time"
 
 	"github.com/kubecost/cost-model/pkg/env"
+	"github.com/kubecost/cost-model/pkg/util"
 
 	"github.com/aws/aws-sdk-go/aws"
 	"github.com/aws/aws-sdk-go/aws/session"
@@ -28,6 +30,8 @@ type CSVProvider struct {
 	*CustomProvider
 	CSVLocation             string
 	Pricing                 map[string]*price
+	NodeClassPricing        map[string]float64
+	NodeClassCount          map[string]float64
 	NodeMapField            string
 	PricingPV               map[string]*price
 	PVMapField              string
@@ -53,6 +57,8 @@ func (c *CSVProvider) DownloadPricingData() error {
 	c.DownloadPricingDataLock.Lock()
 	defer c.DownloadPricingDataLock.Unlock()
 	pricing := make(map[string]*price)
+	nodeclasspricing := make(map[string]float64)
+	nodeclasscount := make(map[string]float64)
 	pvpricing := make(map[string]*price)
 	header, err := csvutil.Header(price{}, "csv")
 	if err != nil {
@@ -75,6 +81,8 @@ func (c *CSVProvider) DownloadPricingData() error {
 			csvr = out.Body
 		} else {
 			c.Pricing = pricing
+			c.NodeClassPricing = nodeclasspricing
+			c.NodeClassCount = nodeclasscount
 			c.PricingPV = pvpricing
 			return fmt.Errorf("Invalid s3 URI: %s", c.CSVLocation)
 		}
@@ -84,6 +92,8 @@ func (c *CSVProvider) DownloadPricingData() error {
 	if csverr != nil {
 		klog.Infof("Error reading csv at %s: %s", c.CSVLocation, csverr)
 		c.Pricing = pricing
+		c.NodeClassPricing = nodeclasspricing
+		c.NodeClassCount = nodeclasscount
 		c.PricingPV = pvpricing
 		return nil
 	}
@@ -94,6 +104,8 @@ func (c *CSVProvider) DownloadPricingData() error {
 	dec, err := csvutil.NewDecoder(csvReader, header...)
 	if err != nil {
 		c.Pricing = pricing
+		c.NodeClassPricing = nodeclasspricing
+		c.NodeClassCount = nodeclasscount
 		c.PricingPV = pvpricing
 		return err
 	}
@@ -130,6 +142,23 @@ func (c *CSVProvider) DownloadPricingData() error {
 			c.PVMapField = p.InstanceIDField
 		} else if p.AssetClass == "node" {
 			pricing[key] = &p
+			// Fold this row into a per-class running average so that nodes whose
+			// ID is absent from the CSV can still be priced by region, instance
+			// type and asset class.
+			classKey := p.Region + "," + p.InstanceType + "," + p.AssetClass
+			cost, err := strconv.ParseFloat(p.MarketPriceHourly, 64)
+			if err != nil {
+				// The row is still stored for exact matching above; it just
+				// cannot contribute to the class average.
+				klog.Infof("Unable to parse price %q for class `%s`: %s", p.MarketPriceHourly, classKey, err)
+			} else {
+				// Missing map entries read as zero, so the first row of a class
+				// yields (0*0 + cost) / 1 = cost with a count of 1.
+				oldPrice := nodeclasspricing[classKey]
+				oldCount := nodeclasscount[classKey]
+				nodeclasspricing[classKey] = ((oldPrice * oldCount) + cost) / (oldCount + 1.0)
+				nodeclasscount[classKey] = oldCount + 1
+			}
+
 			c.NodeMapField = p.InstanceIDField
 		} else {
 			klog.Infof("Unrecognized asset class %s, defaulting to node", p.AssetClass)
@@ -139,6 +168,8 @@ func (c *CSVProvider) DownloadPricingData() error {
 	}
 	if len(pricing) > 0 {
 		c.Pricing = pricing
+		c.NodeClassPricing = nodeclasspricing
+		c.NodeClassCount = nodeclasscount
 		c.PricingPV = pvpricing
 	} else {
 		log.DedupedWarningf(5, "No data received from csv at %s", c.CSVLocation)
@@ -153,7 +184,11 @@ type csvKey struct {
 }
 
+// Features returns the class-match key for this node in the form
+// "region,instanceType,node", built from the key's labels.
 func (k *csvKey) Features() string {
-	return ""
+	// Lookup errors are discarded, so a component is whatever the helper
+	// returned alongside the error (presumably empty — TODO confirm).
+	instanceType, _ := util.GetInstanceType(k.Labels)
+	region, _ := util.GetRegion(k.Labels)
+	class := "node"
+
+	return region + "," + instanceType + "," + class
 }
 func (k *csvKey) GPUType() string {
 	return ""
@@ -167,18 +202,28 @@ func (c *CSVProvider) NodePricing(key Key) (*Node, error) {
 	defer c.DownloadPricingDataLock.RUnlock()
 	if p, ok := c.Pricing[key.ID()]; ok {
 		return &Node{
-			Cost: p.MarketPriceHourly,
+			Cost:        p.MarketPriceHourly,
+			PricingType: CsvExact,
 		}, nil
 	}
 	s := strings.Split(key.ID(), ",") // Try without a region to be sure
 	if len(s) == 2 {
 		if p, ok := c.Pricing[s[1]]; ok {
 			return &Node{
-				Cost: p.MarketPriceHourly,
+				Cost:        p.MarketPriceHourly,
+				PricingType: CsvExact,
 			}, nil
 		}
 	}
-	return nil, fmt.Errorf("Unable to find Node matching %s", key.ID())
+	classKey := key.Features() // Use node attributes to try and do a class match
+	if cost, ok := c.NodeClassPricing[classKey]; ok {
+		klog.Infof("Unable to find provider ID `%s`, using features:`%s`", key.ID(), key.Features())
+		return &Node{
+			Cost:        fmt.Sprintf("%f", cost),
+			PricingType: CsvClass,
+		}, nil
+	}
+	return nil, fmt.Errorf("Unable to find Node matching `%s`:`%s`", key.ID(), key.Features())
 }
 
 func NodeValueFromMapField(m string, n *v1.Node, useRegion bool) string {

+ 18 - 0
pkg/cloud/provider.go

@@ -59,6 +59,7 @@ type Node struct {
 	Region           string                `json:"region,omitempty"`
 	Reserved         *ReservedInstanceData `json:"reserved,omitempty"`
 	ProviderID       string                `json:"providerID,omitempty"`
+	PricingType      PricingType           `json:"pricingType,omitempty"`
 }
 
 // IsSpot determines whether or not a Node uses spot by usage type
@@ -195,6 +196,23 @@ type PricingSource struct {
 	Error     string `json:"error"`
 }
 
+// PricingType names the source a Node's price was taken from.
+type PricingType string
+
+// NOTE(review): only CsvExact and CsvClass are assigned in the visible code;
+// the meaning of the other values is inferred from their names — confirm.
+const (
+	Api           PricingType = "api"
+	Spot          PricingType = "spot"
+	Reserved      PricingType = "reserved"
+	SavingsPlan   PricingType = "savingsPlan"
+	// CsvExact is set by CSVProvider.NodePricing when the node's key ID is
+	// found in the CSV pricing table.
+	CsvExact      PricingType = "csvExact"
+	// CsvClass is set by CSVProvider.NodePricing when it falls back to the
+	// price stored for the node's Features() class key.
+	CsvClass      PricingType = "csvClass"
+	DefaultPrices PricingType = "defaultPrices"
+)
+
+// PricingMatchMetadata summarizes how nodes were priced. It is filled in by
+// CostModel.GetNodeCost while it walks the node list.
+type PricingMatchMetadata struct {
+	// TotalNodes is incremented once for every node visited.
+	TotalNodes        int                 `json:"TotalNodes"`
+	// PricingTypeCounts holds the number of nodes seen per PricingType.
+	PricingTypeCounts map[PricingType]int `json:"PricingType"`
+}
+
 // Provider represents a k8s provider.
 type Provider interface {
 	ClusterInfo() (map[string]string, error)

+ 27 - 5
pkg/costmodel/costmodel.go

@@ -46,10 +46,11 @@ const (
 var isCron = regexp.MustCompile(`^(.+)-\d{10}$`)
 
 type CostModel struct {
-	Cache          clustercache.ClusterCache
-	ClusterMap     clusters.ClusterMap
-	ScrapeInterval time.Duration
-	RequestGroup   *singleflight.Group
+	Cache           clustercache.ClusterCache
+	ClusterMap      clusters.ClusterMap
+	ScrapeInterval  time.Duration
+	RequestGroup    *singleflight.Group
+	pricingMetadata *costAnalyzerCloud.PricingMatchMetadata
 }
 
 func NewCostModel(cache clustercache.ClusterCache, clusterMap clusters.ClusterMap, scrapeInterval time.Duration) *CostModel {
@@ -882,6 +883,14 @@ func GetPVCost(pv *costAnalyzerCloud.PV, kpv *v1.PersistentVolume, cp costAnalyz
 	return nil
 }
 
+// GetPricingSourceCounts returns the pricing-match metadata stored by the most
+// recent GetNodeCost call, or an error when none has been stored yet.
+func (cm *CostModel) GetPricingSourceCounts() (*costAnalyzerCloud.PricingMatchMetadata, error) {
+	if cm.pricingMetadata == nil {
+		return nil, fmt.Errorf("Node costs not yet calculated")
+	}
+	return cm.pricingMetadata, nil
+}
+
 func (cm *CostModel) GetNodeCost(cp costAnalyzerCloud.Provider) (map[string]*costAnalyzerCloud.Node, error) {
 	cfg, err := cp.GetConfig()
 	if err != nil {
@@ -891,11 +900,17 @@ func (cm *CostModel) GetNodeCost(cp costAnalyzerCloud.Provider) (map[string]*cos
 	nodeList := cm.Cache.GetAllNodes()
 	nodes := make(map[string]*costAnalyzerCloud.Node)
 
+	pmd := &costAnalyzerCloud.PricingMatchMetadata{
+		TotalNodes:        0,
+		PricingTypeCounts: make(map[costAnalyzerCloud.PricingType]int),
+	}
 	for _, n := range nodeList {
 		name := n.GetObjectMeta().GetName()
 		nodeLabels := n.GetObjectMeta().GetLabels()
 		nodeLabels["providerID"] = n.Spec.ProviderID
 
+		pmd.TotalNodes++
+
 		cnode, err := cp.NodePricing(cp.GetKey(nodeLabels, n))
 		if err != nil {
 			klog.Infof("Error getting node pricing. Error: %s", err.Error())
@@ -909,6 +924,13 @@ func (cm *CostModel) GetNodeCost(cp costAnalyzerCloud.Provider) (map[string]*cos
 				}
 			}
 		}
+
+		if _, ok := pmd.PricingTypeCounts[cnode.PricingType]; ok {
+			pmd.PricingTypeCounts[cnode.PricingType]++
+		} else {
+			pmd.PricingTypeCounts[cnode.PricingType] = 1
+		}
+
 		newCnode := *cnode
 		if newCnode.InstanceType == "" {
 			it, _ := util.GetInstanceType(n.Labels)
@@ -1145,7 +1167,7 @@ func (cm *CostModel) GetNodeCost(cp costAnalyzerCloud.Provider) (map[string]*cos
 
 		nodes[name] = &newCnode
 	}
-
+	cm.pricingMetadata = pmd
 	cp.ApplyReservedInstancePricing(nodes)
 
 	return nodes, nil

+ 8 - 0
pkg/costmodel/router.go

@@ -721,6 +721,13 @@ func (a *Accesses) GetPricingSourceStatus(w http.ResponseWriter, _ *http.Request
 	w.Write(WrapData(a.CloudProvider.PricingSourceStatus(), nil))
 }
 
+// GetPricingSourceCounts responds with the model's pricing source counts, or
+// the error explaining why they are unavailable, wrapped by WrapData.
+func (a *Accesses) GetPricingSourceCounts(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
+	headers := w.Header()
+	headers.Set("Content-Type", "application/json")
+	headers.Set("Access-Control-Allow-Origin", "*")
+
+	counts, err := a.Model.GetPricingSourceCounts()
+	w.Write(WrapData(counts, err))
+}
+
 func (a *Accesses) GetPrometheusMetadata(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Access-Control-Allow-Origin", "*")
@@ -1067,6 +1074,7 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) *Accesses {
 	a.Router.GET("/clusterInfoMap", a.GetClusterInfoMap)
 	a.Router.GET("/serviceAccountStatus", a.GetServiceAccountStatus)
 	a.Router.GET("/pricingSourceStatus", a.GetPricingSourceStatus)
+	a.Router.GET("/pricingSourceCounts", a.GetPricingSourceCounts)
 
 	// cluster manager endpoints
 	a.Router.GET("/clusters", managerEndpoints.GetAllClusters)

+ 1 - 0
pkg/kubecost/asset.go

@@ -2468,6 +2468,7 @@ func (as *AssetSet) ReconciliationMatch(query Asset) (Asset, bool, error) {
 	var providerIDMatch Asset
 	for _, asset := range as.assets {
 		if key(asset, fullMatchProps) == fullMatchKey {
+			log.DedupedInfof(10, "Asset ETL: Reconciliation[rcnw]: ReconcileRange Match: %s", fullMatchKey)
 			return asset, true, nil
 		}
 		if key(asset, providerIDMatchProps) == providerIDMatchKey {

+ 120 - 1
test/cloud_test.go

@@ -1,6 +1,8 @@
 package test
 
 import (
+	"fmt"
+	"math"
 	"os"
 	"strings"
 	"testing"
@@ -125,7 +127,8 @@ func TestNodePriceFromCSV(t *testing.T) {
 	unknownN.Name = "unknownname"
 	unknownN.Labels = make(map[string]string)
 	unknownN.Labels["foo"] = labelFooWant
-	k2 := c.GetKey(n.Labels, unknownN)
+	unknownN.Labels["topology.kubernetes.io/region"] = "fakeregion"
+	k2 := c.GetKey(unknownN.Labels, unknownN)
 	resN2, _ := c.NodePricing(k2)
 	if resN2 != nil {
 		t.Errorf("CSV provider should return nil on missing node")
@@ -215,6 +218,7 @@ func TestNodePriceFromCSVWithRegion(t *testing.T) {
 	unknownN.Spec.ProviderID = "fake providerID"
 	unknownN.Name = "unknownname"
 	unknownN.Labels = make(map[string]string)
+	unknownN.Labels["topology.kubernetes.io/region"] = "fakeregion"
 	unknownN.Labels["foo"] = labelFooWant
 	k4 := c.GetKey(unknownN.Labels, unknownN)
 	resN4, _ := c.NodePricing(k4)
@@ -283,6 +287,76 @@ func TestNodePriceFromCSVWithBadConfig(t *testing.T) {
 	}
 }
 
+// TestSourceMatchesFromCSV checks that GetNodeCost tallies one node per
+// pricing source: an exact CSV match, a class match and an unmatched node.
+func TestSourceMatchesFromCSV(t *testing.T) {
+	os.Setenv("CONFIG_PATH", "../configs")
+	c := &cloud.CSVProvider{
+		CSVLocation: "../configs/pricing_schema_case.csv",
+		CustomProvider: &cloud.CustomProvider{
+			Config: cloud.NewProviderConfig("/default.json"),
+		},
+	}
+	c.DownloadPricingData()
+
+	n := &v1.Node{}
+	n.Spec.ProviderID = "fake"
+	n.Name = "nameWant"
+	n.Labels = make(map[string]string)
+	n.Labels["foo"] = "labelFooWant"
+	n.Labels[v1.LabelZoneRegion] = "regionone"
+
+	// n2 carries a provider ID that the test expects to be priced at
+	// 0.13370357 by NodePricing.
+	n2 := &v1.Node{}
+	n2.Spec.ProviderID = "azure:///subscriptions/123a7sd-asd-1234-578a9-123abcdef/resourceGroups/case_12_STaGe_TeSt7/providers/Microsoft.Compute/virtualMachineScaleSets/vmss-agent-worker0-12stagetest7-ezggnore/virtualMachines/7"
+	n2.Labels = make(map[string]string)
+	n2.Labels[v1.LabelZoneRegion] = "eastus2"
+	n2.Labels["foo"] = "labelFooWant"
+
+	k := c.GetKey(n2.Labels, n2)
+	resN, err := c.NodePricing(k)
+	if err != nil {
+		t.Errorf("Error in NodePricing: %s", err.Error())
+	} else {
+		wantPrice := "0.13370357"
+		gotPrice := resN.Cost
+		if gotPrice != wantPrice {
+			t.Errorf("Wanted price '%s' got price '%s'", wantPrice, gotPrice)
+		}
+	}
+
+	n3 := &v1.Node{}
+	n3.Spec.ProviderID = "fake"
+	n3.Name = "nameWant"
+	n3.Labels = make(map[string]string)
+	// NOTE(review): these labels are set on n, not n3, overwriting the
+	// "regionone" region above. n therefore carries the region/instance-type
+	// labels and n3 has none — confirm this is intended.
+	n.Labels[v1.LabelZoneRegion] = "eastus2"
+	n.Labels[v1.LabelInstanceType] = "Standard_F32s_v2"
+
+	fc := NewFakeNodeCache([]*v1.Node{n, n2, n3})
+	fm := FakeClusterMap{}
+	d, _ := time.ParseDuration("1m")
+
+	model := costmodel.NewCostModel(fc, fm, d)
+
+	_, err = model.GetNodeCost(c)
+	if err != nil {
+		t.Errorf("Error in node pricing: %s", err)
+	}
+	p, err := model.GetPricingSourceCounts()
+	if err != nil {
+		// p is nil whenever an error is returned; stop here instead of
+		// dereferencing it in the checks below.
+		t.Fatalf("Error in pricing source counts: %s", err)
+	}
+	if p.TotalNodes != 3 {
+		t.Errorf("Wanted 3 nodes got %d", p.TotalNodes)
+	}
+	if p.PricingTypeCounts[""] != 1 {
+		t.Errorf("Wanted 1 default match got %d: %+v", p.PricingTypeCounts[""], p.PricingTypeCounts)
+	}
+	if p.PricingTypeCounts["csvExact"] != 1 {
+		t.Errorf("Wanted 1 exact match got %d: %+v", p.PricingTypeCounts["csvExact"], p.PricingTypeCounts)
+	}
+	if p.PricingTypeCounts["csvClass"] != 1 {
+		t.Errorf("Wanted 1 class match got %d: %+v", p.PricingTypeCounts["csvClass"], p.PricingTypeCounts)
+	}
+}
+
 func TestNodePriceFromCSVWithCase(t *testing.T) {
 	n := &v1.Node{}
 	n.Spec.ProviderID = "azure:///subscriptions/123a7sd-asd-1234-578a9-123abcdef/resourceGroups/case_12_STaGe_TeSt7/providers/Microsoft.Compute/virtualMachineScaleSets/vmss-agent-worker0-12stagetest7-ezggnore/virtualMachines/7"
@@ -310,3 +384,48 @@ func TestNodePriceFromCSVWithCase(t *testing.T) {
 	}
 
 }
+
+// TestNodePriceFromCSVByClass checks that a node with an unknown provider ID
+// is priced through its region/instance-type class, and that a node whose
+// class is also unknown gets no price at all.
+func TestNodePriceFromCSVByClass(t *testing.T) {
+	n := &v1.Node{}
+	n.Spec.ProviderID = "fakeproviderid"
+	n.Labels = make(map[string]string)
+	n.Labels[v1.LabelZoneRegion] = "eastus2"
+	n.Labels[v1.LabelInstanceType] = "Standard_F32s_v2"
+	// The class fallback formats its price with "%f" (six decimals), so the
+	// expected value is rounded the same way.
+	wantpricefloat := 0.13370357
+	wantPrice := fmt.Sprintf("%f", (math.Round(wantpricefloat*1000000) / 1000000))
+
+	c := &cloud.CSVProvider{
+		CSVLocation: "../configs/pricing_schema_case.csv",
+		CustomProvider: &cloud.CustomProvider{
+			Config: cloud.NewProviderConfig("../configs/default.json"),
+		},
+	}
+
+	c.DownloadPricingData()
+
+	k := c.GetKey(n.Labels, n)
+	resN, err := c.NodePricing(k)
+	if err != nil {
+		t.Errorf("Error in NodePricing: %s", err.Error())
+	} else {
+		gotPrice := resN.Cost
+		if gotPrice != wantPrice {
+			t.Errorf("Wanted price '%s' got price '%s'", wantPrice, gotPrice)
+		}
+	}
+
+	n2 := &v1.Node{}
+	n2.Spec.ProviderID = "fakeproviderid"
+	n2.Labels = make(map[string]string)
+	n2.Labels[v1.LabelZoneRegion] = "fakeregion"
+	n2.Labels[v1.LabelInstanceType] = "Standard_F32s_v2"
+	// Build the key from n2 itself rather than from the first node.
+	k2 := c.GetKey(n2.Labels, n2)
+
+	c.DownloadPricingData()
+	resN2, err := c.NodePricing(k2)
+	if err == nil || resN2 != nil {
+		t.Errorf("CSV provider should return nil on missing node, instead returned %+v", resN2)
+	}
+}