Explorar o código

initial commit of csv provider

AjayTripathy %!s(int64=6) %!d(string=hai) anos
pai
achega
a03cb3a14b

+ 2 - 2
pkg/cloud/awsprovider.go

@@ -482,7 +482,7 @@ func (key *awsPVKey) Features() string {
 }
 
 // GetKey maps node labels to information needed to retrieve pricing data
-func (aws *AWS) GetKey(labels map[string]string) Key {
+func (aws *AWS) GetKey(labels map[string]string, n *v1.Node) Key {
 	return &awsKey{
 		SpotLabelName:  aws.SpotLabelName,
 		SpotLabelValue: aws.SpotLabelValue,
@@ -531,7 +531,7 @@ func (aws *AWS) DownloadPricingData() error {
 	inputkeys := make(map[string]bool)
 	for _, n := range nodeList {
 		labels := n.GetObjectMeta().GetLabels()
-		key := aws.GetKey(labels)
+		key := aws.GetKey(labels, n)
 		inputkeys[key.Features()] = true
 	}
 

+ 1 - 1
pkg/cloud/azureprovider.go

@@ -285,7 +285,7 @@ func (az *Azure) loadAzureAuthSecret(force bool) (*AzureServiceKey, error) {
 	return azureSecret, nil
 }
 
-func (az *Azure) GetKey(labels map[string]string) Key {
+func (az *Azure) GetKey(labels map[string]string, n *v1.Node) Key {
 	cfg, err := az.GetConfig()
 	if err != nil {
 		klog.Infof("Error loading azure custom pricing information")

+ 223 - 0
pkg/cloud/csvprovider.go

@@ -0,0 +1,223 @@
+package cloud
+
+import (
+	"encoding/csv"
+	"io"
+	"os"
+	"strings"
+	"sync"
+
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/klog"
+
+	"github.com/jszwec/csvutil"
+)
+
+type CSVProvider struct {
+	*CustomProvider
+	CSVLocation             string
+	Pricing                 map[string]*price
+	NodeMapField            string
+	PricingPV               map[string]*price
+	PVMapField              string
+	DownloadPricingDataLock sync.RWMutex
+}
+type price struct {
+	EndTimestamp      string `csv:"EndTimestamp"`
+	InstanceID        string `csv:"InstanceID"`
+	AssetClass        string `csv:"AssetClass"`
+	InstanceIDField   string `csv:"InstanceIDField"`
+	InstanceType      string `csv:"InstanceType"`
+	MarketPriceHourly string `csv:"MarketPriceHourly"`
+	Version           string `csv:"Version"`
+}
+
+func parseMapField(mf string) {
+
+}
+
+func GetCsv(location string) (io.Reader, error) {
+	return os.Open(location)
+}
+
+func (c *CSVProvider) DownloadPricingData() error {
+	c.DownloadPricingDataLock.Lock()
+	defer c.DownloadPricingDataLock.Unlock()
+	pricing := make(map[string]*price)
+	pvpricing := make(map[string]*price)
+	header, err := csvutil.Header(price{}, "csv")
+	if err != nil {
+		return err
+	}
+	fieldsPerRecord := len(header)
+	csvr, err := GetCsv(c.CSVLocation)
+	csvReader := csv.NewReader(csvr)
+	csvReader.Comma = '\t'
+	csvReader.FieldsPerRecord = fieldsPerRecord
+
+	dec, err := csvutil.NewDecoder(csvReader, header...)
+	if err != nil {
+		return err
+	}
+	for {
+		p := price{}
+		err := dec.Decode(&p)
+		csvParseErr, isCsvParseErr := err.(*csv.ParseError)
+		if err == io.EOF {
+			break
+		} else if err == csvutil.ErrFieldCount || (isCsvParseErr && csvParseErr.Err == csv.ErrFieldCount) {
+			rec := dec.Record()
+			if len(rec) != 1 {
+				klog.V(2).Infof("Expected %d price info fields but received %d: %s", fieldsPerRecord, len(rec), rec)
+				continue
+			}
+			if strings.Index(rec[0], "#") == 0 {
+				continue
+			} else {
+				klog.V(3).Infof("skipping non-CSV line: %s", rec)
+				continue
+			}
+		} else if err != nil {
+			klog.V(2).Infof("Error during spot info decode: %+v", err)
+			continue
+		}
+
+		klog.V(4).Infof("Found price info %+v", p)
+		if p.AssetClass == "pv" {
+			pvpricing[p.InstanceID] = &p
+			c.PVMapField = p.InstanceIDField
+		} else if p.AssetClass == "node" {
+			pricing[p.InstanceID] = &p
+			c.NodeMapField = p.InstanceIDField
+		} else {
+			klog.Infof("Unrecognized asset class %s, defaulting to node", p.AssetClass)
+			pricing[p.InstanceID] = &p
+			c.NodeMapField = p.InstanceIDField
+		}
+	}
+	c.Pricing = pricing
+	c.PricingPV = pvpricing
+	return nil
+}
+
+type csvKey struct {
+	Labels     map[string]string
+	ProviderID string
+}
+
+func (k *csvKey) Features() string {
+	return ""
+}
+func (k *csvKey) GPUType() string {
+	return ""
+}
+func (k *csvKey) ID() string {
+	return k.ProviderID
+}
+
+func (c *CSVProvider) NodePricing(key Key) (*Node, error) {
+	c.DownloadPricingDataLock.RLock()
+	defer c.DownloadPricingDataLock.RUnlock()
+	if p, ok := c.Pricing[key.ID()]; ok {
+		return &Node{
+			Cost: p.MarketPriceHourly,
+		}, nil
+	} else {
+		klog.Infof("Unable to find Node matching %s", key.ID())
+		return &Node{}, nil
+	}
+}
+
+func NodeValueFromMapField(m string, n *v1.Node) string {
+	mf := strings.Split(m, ".")
+	if len(mf) == 2 && mf[0] == "spec" && mf[1] == "providerID" {
+		return n.Spec.ProviderID
+	} else if len(mf) > 1 && mf[0] == "metadata" {
+		if mf[1] == "name" {
+			return n.Name
+		} else if mf[1] == "labels" {
+			lkey := strings.Join(mf[2:len(mf)], "")
+			return n.Labels[lkey]
+		} else if mf[1] == "annotations" {
+			akey := strings.Join(mf[2:len(mf)], "")
+			return n.Annotations[akey]
+		} else {
+			klog.Infof("[ERROR] Unsupported InstanceIDField %s in CSV For Node", m)
+			return ""
+		}
+	} else {
+		klog.Infof("[ERROR] Unsupported InstanceIDField %s in CSV For Node", m)
+		return ""
+	}
+}
+
+func PVValueFromMapField(m string, n *v1.PersistentVolume) string {
+	mf := strings.Split(m, ".")
+	if len(mf) > 1 && mf[0] == "metadata" {
+		if mf[1] == "name" {
+			return n.Name
+		} else if mf[1] == "labels" {
+			lkey := strings.Join(mf[2:len(mf)], "")
+			return n.Labels[lkey]
+		} else if mf[1] == "annotations" {
+			akey := strings.Join(mf[2:len(mf)], "")
+			return n.Annotations[akey]
+		} else {
+			klog.Infof("[ERROR] Unsupported InstanceIDField %s in CSV For PV", m)
+			return ""
+		}
+	} else {
+		klog.Infof("[ERROR] Unsupported InstanceIDField %s in CSV For PV", m)
+		return ""
+	}
+}
+
+func (c *CSVProvider) GetKey(l map[string]string, n *v1.Node) Key {
+	id := NodeValueFromMapField(c.NodeMapField, n)
+	return &csvKey{
+		ProviderID: id,
+		Labels:     l,
+	}
+}
+
+type csvPVKey struct {
+	Labels                 map[string]string
+	ProviderID             string
+	StorageClassName       string
+	StorageClassParameters map[string]string
+	Name                   string
+	DefaultRegion          string
+}
+
+func (key *csvPVKey) GetStorageClass() string {
+	return key.StorageClassName
+}
+
+func (key *csvPVKey) Features() string {
+	return key.ProviderID
+}
+
+func (c *CSVProvider) GetPVKey(pv *v1.PersistentVolume, parameters map[string]string, defaultRegion string) PVKey {
+	id := PVValueFromMapField(c.PVMapField, pv)
+	return &csvPVKey{
+		Labels:                 pv.Labels,
+		ProviderID:             id,
+		StorageClassName:       pv.Spec.StorageClassName,
+		StorageClassParameters: parameters,
+		Name:                   pv.Name,
+		DefaultRegion:          defaultRegion,
+	}
+}
+
+func (c *CSVProvider) PVPricing(pvk PVKey) (*PV, error) {
+	c.DownloadPricingDataLock.RLock()
+	defer c.DownloadPricingDataLock.RUnlock()
+	pricing, ok := c.PricingPV[pvk.Features()]
+	if !ok {
+		klog.V(4).Infof("Persistent Volume pricing not found for %s: %s", pvk.GetStorageClass(), pvk.Features())
+		return &PV{}, nil
+	}
+	return &PV{
+		Cost: pricing.MarketPriceHourly,
+	}, nil
+}

+ 1 - 1
pkg/cloud/customprovider.go

@@ -177,7 +177,7 @@ func (cp *CustomProvider) DownloadPricingData() error {
 	return nil
 }
 
-func (cp *CustomProvider) GetKey(labels map[string]string) Key {
+func (cp *CustomProvider) GetKey(labels map[string]string, n *v1.Node) Key {
 	return &customProviderKey{
 		SpotLabel:      cp.SpotLabel,
 		SpotLabelValue: cp.SpotLabelValue,

+ 2 - 2
pkg/cloud/gcpprovider.go

@@ -943,7 +943,7 @@ func (gcp *GCP) DownloadPricingData() error {
 
 	for _, n := range nodeList {
 		labels := n.GetObjectMeta().GetLabels()
-		key := gcp.GetKey(labels)
+		key := gcp.GetKey(labels, n)
 		inputkeys[key.Features()] = key
 	}
 
@@ -1285,7 +1285,7 @@ type gcpKey struct {
 	Labels map[string]string
 }
 
-func (gcp *GCP) GetKey(labels map[string]string) Key {
+func (gcp *GCP) GetKey(labels map[string]string, n *v1.Node) Key {
 	return &gcpKey{
 		Labels: labels,
 	}

+ 11 - 1
pkg/cloud/provider.go

@@ -169,7 +169,7 @@ type Provider interface {
 	NetworkPricing() (*Network, error)
 	AllNodePricing() (interface{}, error)
 	DownloadPricingData() error
-	GetKey(map[string]string) Key
+	GetKey(map[string]string, *v1.Node) Key
 	GetPVKey(*v1.PersistentVolume, map[string]string, string) PVKey
 	UpdateConfig(r io.Reader, updateType string) (*CustomPricing, error)
 	UpdateConfigFromConfigMap(map[string]string) (*CustomPricing, error)
@@ -230,6 +230,16 @@ func NewCrossClusterProvider(ctype string, overrideConfigPath string, cache clus
 
 // NewProvider looks at the nodespec or provider metadata server to decide which provider to instantiate.
 func NewProvider(cache clustercache.ClusterCache, apiKey string) (Provider, error) {
+	if os.Getenv("USE_CSV_PROVIDER") == "true" {
+		klog.Infof("Using CSV Provider with CSV at %s", os.Getenv("CSV_PATH"))
+		return &CSVProvider{
+			CSVLocation: os.Getenv("CSV_PATH"),
+			CustomProvider: &CustomProvider{
+				Clientset: cache,
+				Config:    NewProviderConfig("default.json"),
+			},
+		}, nil
+	}
 	if metadata.OnGCE() {
 		klog.V(3).Info("metadata reports we are in GCE")
 		if apiKey == "" {

+ 3 - 3
pkg/costmodel/costmodel.go

@@ -1137,7 +1137,7 @@ func (cm *CostModel) GetNodeCost(cp costAnalyzerCloud.Provider) (map[string]*cos
 		nodeLabels := n.GetObjectMeta().GetLabels()
 		nodeLabels["providerID"] = n.Spec.ProviderID
 
-		cnode, err := cp.NodePricing(cp.GetKey(nodeLabels))
+		cnode, err := cp.NodePricing(cp.GetKey(nodeLabels, n))
 		if err != nil {
 			klog.V(1).Infof("[Warning] Error getting node pricing. Error: " + err.Error())
 			if cnode != nil {
@@ -1209,7 +1209,7 @@ func (cm *CostModel) GetNodeCost(cp costAnalyzerCloud.Provider) (map[string]*cos
 
 		if newCnode.GPU != "" && newCnode.GPUCost == "" {
 			// We couldn't find a gpu cost, so fix cpu and ram, then accordingly
-			klog.V(4).Infof("GPU without cost found for %s, calculating...", cp.GetKey(nodeLabels).Features())
+			klog.V(4).Infof("GPU without cost found for %s, calculating...", cp.GetKey(nodeLabels, n).Features())
 
 			defaultCPU, err := strconv.ParseFloat(cfg.CPU, 64)
 			if err != nil {
@@ -1299,7 +1299,7 @@ func (cm *CostModel) GetNodeCost(cp costAnalyzerCloud.Provider) (map[string]*cos
 			newCnode.GPUCost = fmt.Sprintf("%f", gpuPrice)
 		} else if newCnode.RAMCost == "" {
 			// We couldn't find a ramcost, so fix cpu and allocate ram accordingly
-			klog.V(4).Infof("No RAM cost found for %s, calculating...", cp.GetKey(nodeLabels).Features())
+			klog.V(4).Infof("No RAM cost found for %s, calculating...", cp.GetKey(nodeLabels, n).Features())
 
 			defaultCPU, err := strconv.ParseFloat(cfg.CPU, 64)
 			if err != nil {

+ 43 - 0
test/cloud_test.go

@@ -0,0 +1,43 @@
+package test
+
+import (
+	"testing"
+	"github.com/kubecost/cost-model/pkg/cloud"
+	v1 "k8s.io/api/core/v1"
+)
+
+const(
+	providerIDMap = "spec.providerID"
+	nameMap = "metadata.name"
+	labelMapFoo = "metadata.labels.foo"
+)
+func TestNodeValueFromMapField(t *testing.T) {
+	providerIDWant := "providerid"
+	nameWant := "name"
+	labelFooWant := "labelfoo"
+
+	
+	n := &v1.Node{}
+	n.Spec.ProviderID = providerIDWant
+	n.Name = nameWant
+	n.Labels = make(map[string]string)
+	n.Labels["foo"] = labelFooWant
+
+	got := cloud.NodeValueFromMapField(providerIDMap, n)
+	if got != providerIDWant {
+		t.Errorf("Assert on '%s' want '%s' got '%s'", providerIDMap, providerIDWant, got)
+	}
+	
+	got = cloud.NodeValueFromMapField(nameMap, n)
+	if got != nameWant {
+		t.Errorf("Assert on '%s' want '%s' got '%s'", nameMap, nameWant, got)
+	}
+
+	got = cloud.NodeValueFromMapField(labelMapFoo, n)
+	if got != labelFooWant {
+		t.Errorf("Assert on '%s' want '%s' got '%s'", labelMapFoo, labelFooWant, got)
+	}
+
+}
+
+