Просмотр исходного кода

Merge pull request #447 from kubecost/develop

Merge develop into master
Ajay Tripathy 6 лет назад
Родитель
Сommit
e9174c4143
8 измененных файлов с 294 добавлено и 62 удалено
  1. 39 3
      pkg/cloud/csvprovider.go
  2. 19 7
      pkg/cloud/provider.go
  3. 44 20
      pkg/costmodel/costmodel.go
  4. 16 11
      pkg/costmodel/router.go
  5. 60 0
      pkg/log/log.go
  6. 18 16
      pkg/util/pool.go
  7. 5 5
      pkg/util/vector.go
  8. 93 0
      test/keytuple_test.go

+ 39 - 3
pkg/cloud/csvprovider.go

@@ -7,13 +7,19 @@ import (
 	"os"
 	"strings"
 	"sync"
+	"time"
 
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/aws/session"
+	"github.com/aws/aws-sdk-go/service/s3"
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/klog"
 
 	"github.com/jszwec/csvutil"
 )
 
+const refreshMinutes = 60
+
 type CSVProvider struct {
 	*CustomProvider
 	CSVLocation             string
@@ -47,13 +53,42 @@ func (c *CSVProvider) DownloadPricingData() error {
 		return err
 	}
 	fieldsPerRecord := len(header)
-	csvr, err := GetCsv(c.CSVLocation)
+	var csvr io.Reader
+	var csverr error
+	if strings.HasPrefix(c.CSVLocation, "s3://") {
+		region := os.Getenv("CSV_REGION")
+		conf := aws.NewConfig().WithRegion(region).WithCredentialsChainVerboseErrors(true)
+		s3Client := s3.New(session.New(conf))
+		bucketAndKey := strings.Split(strings.TrimPrefix(c.CSVLocation, "s3://"), "/")
+		if len(bucketAndKey) == 2 {
+			out, err := s3Client.GetObject(&s3.GetObjectInput{
+				Bucket: aws.String(bucketAndKey[0]),
+				Key:    aws.String(bucketAndKey[1]),
+			})
+			csverr = err
+			csvr = out.Body
+		} else {
+			c.Pricing = pricing
+			c.PricingPV = pvpricing
+			return fmt.Errorf("Invalid s3 URI: %s", c.CSVLocation)
+		}
+	} else {
+		csvr, csverr = GetCsv(c.CSVLocation)
+	}
+	if csverr != nil {
+		klog.Infof("Error reading csv at %s: %s", c.CSVLocation, csverr)
+		c.Pricing = pricing
+		c.PricingPV = pvpricing
+		return nil
+	}
 	csvReader := csv.NewReader(csvr)
 	csvReader.Comma = ','
 	csvReader.FieldsPerRecord = fieldsPerRecord
 
 	dec, err := csvutil.NewDecoder(csvReader, header...)
 	if err != nil {
+		c.Pricing = pricing
+		c.PricingPV = pvpricing
 		return err
 	}
 	for {
@@ -97,6 +132,7 @@ func (c *CSVProvider) DownloadPricingData() error {
 	} else {
 		klog.Infof("[WARNING] No data received from csv")
 	}
+	time.AfterFunc(refreshMinutes*time.Minute, func() { c.DownloadPricingData() })
 	return nil
 }
 
@@ -161,11 +197,11 @@ func PVValueFromMapField(m string, n *v1.PersistentVolume) string {
 			akey := strings.Join(mf[2:len(mf)], "")
 			return n.Annotations[akey]
 		} else {
-			klog.Infof("[ERROR] Unsupported InstanceIDField %s in CSV For PV", m)
+			klog.V(4).Infof("[ERROR] Unsupported InstanceIDField %s in CSV For PV", m)
 			return ""
 		}
 	} else {
-		klog.Infof("[ERROR] Unsupported InstanceIDField %s in CSV For PV", m)
+		klog.V(4).Infof("[ERROR] Unsupported InstanceIDField %s in CSV For PV", m)
 		return ""
 	}
 }

+ 19 - 7
pkg/cloud/provider.go

@@ -230,13 +230,31 @@ func NewCrossClusterProvider(ctype string, overrideConfigPath string, cache clus
 
 // NewProvider looks at the nodespec or provider metadata server to decide which provider to instantiate.
 func NewProvider(cache clustercache.ClusterCache, apiKey string) (Provider, error) {
+	nodes := cache.GetAllNodes()
+	if len(nodes) == 0 {
+		return nil, fmt.Errorf("Could not locate any nodes for cluster.")
+	}
+
+	provider := strings.ToLower(nodes[0].Spec.ProviderID)
+
 	if os.Getenv("USE_CSV_PROVIDER") == "true" {
 		klog.Infof("Using CSV Provider with CSV at %s", os.Getenv("CSV_PATH"))
+		configFileName := ""
+		if metadata.OnGCE() {
+			configFileName = "gcp.json"
+		} else if strings.HasPrefix(provider, "aws") {
+			configFileName = "aws.json"
+		} else if strings.HasPrefix(provider, "azure") {
+			configFileName = "azure.json"
+
+		} else {
+			configFileName = "default.json"
+		}
 		return &CSVProvider{
 			CSVLocation: os.Getenv("CSV_PATH"),
 			CustomProvider: &CustomProvider{
 				Clientset: cache,
-				Config:    NewProviderConfig("default.json"),
+				Config:    NewProviderConfig(configFileName),
 			},
 		}, nil
 	}
@@ -252,12 +270,6 @@ func NewProvider(cache clustercache.ClusterCache, apiKey string) (Provider, erro
 		}, nil
 	}
 
-	nodes := cache.GetAllNodes()
-	if len(nodes) == 0 {
-		return nil, fmt.Errorf("Could not locate any nodes for cluster.")
-	}
-
-	provider := strings.ToLower(nodes[0].Spec.ProviderID)
 	if strings.HasPrefix(provider, "aws") {
 		klog.V(2).Info("Found ProviderID starting with \"aws\", using AWS Provider")
 		return &AWS{

+ 44 - 20
pkg/costmodel/costmodel.go

@@ -1489,9 +1489,9 @@ func getPodDeploymentsWithMetrics(deploymentLabels map[string]map[string]string,
 			continue
 		}
 
-		namespace := kt.Namespace
-		name := kt.Key
-		clusterID := kt.ClusterID
+		namespace := kt.Namespace()
+		name := kt.Key()
+		clusterID := kt.ClusterID()
 
 		key := namespace + "," + clusterID
 		if _, ok := podDeploymentsMapping[key]; !ok {
@@ -1503,9 +1503,9 @@ func getPodDeploymentsWithMetrics(deploymentLabels map[string]map[string]string,
 			if err != nil {
 				continue
 			}
-			podNamespace := pkey.Namespace
-			podName := pkey.Key
-			podClusterID := pkey.ClusterID
+			podNamespace := pkey.Namespace()
+			podName := pkey.Key()
+			podClusterID := pkey.ClusterID()
 
 			labelSet := labels.Set(pLabels)
 			if s.Matches(labelSet) && podNamespace == namespace && podClusterID == clusterID {
@@ -1534,9 +1534,9 @@ func getPodServicesWithMetrics(serviceLabels map[string]map[string]string, podLa
 			continue
 		}
 
-		namespace := kt.Namespace
-		name := kt.Key
-		clusterID := kt.ClusterID
+		namespace := kt.Namespace()
+		name := kt.Key()
+		clusterID := kt.ClusterID()
 
 		key := namespace + "," + clusterID
 		if _, ok := podServicesMapping[key]; !ok {
@@ -1552,9 +1552,9 @@ func getPodServicesWithMetrics(serviceLabels map[string]map[string]string, podLa
 			if err != nil {
 				continue
 			}
-			podNamespace := pkey.Namespace
-			podName := pkey.Key
-			podClusterID := pkey.ClusterID
+			podNamespace := pkey.Namespace()
+			podName := pkey.Key()
+			podClusterID := pkey.ClusterID()
 
 			labelSet := labels.Set(pLabels)
 			if s.Matches(labelSet) && podNamespace == namespace && podClusterID == clusterID {
@@ -2817,20 +2817,44 @@ func newContainerMetricFromPrometheus(metrics map[string]interface{}, defaultClu
 }
 
 type KeyTuple struct {
-	Namespace string
-	Key       string
-	ClusterID string
+	key    string
+	kIndex int
+	cIndex int
+}
+
+func (kt *KeyTuple) Namespace() string {
+	return kt.key[0 : kt.kIndex-1]
+}
+
+func (kt *KeyTuple) Key() string {
+	return kt.key[kt.kIndex : kt.cIndex-1]
+}
+
+func (kt *KeyTuple) ClusterID() string {
+	return kt.key[kt.cIndex:]
 }
 
 func NewKeyTuple(key string) (*KeyTuple, error) {
-	r := strings.Split(key, ",")
-	if len(r) != 3 {
+	kIndex := strings.IndexRune(key, ',')
+	if kIndex < 0 {
 		return nil, fmt.Errorf("NewKeyTuple() Provided key not containing exactly 3 components.")
 	}
+	kIndex += 1
+
+	subIndex := strings.IndexRune(key[kIndex:], ',')
+	if subIndex < 0 {
+		return nil, fmt.Errorf("NewKeyTuple() Provided key not containing exactly 3 components.")
+	}
+	cIndex := kIndex + subIndex + 1
+
+	if strings.ContainsRune(key[cIndex:], ',') {
+		return nil, fmt.Errorf("NewKeyTuple() Provided key not containing exactly 3 components.")
+	}
+
 	return &KeyTuple{
-		Namespace: r[0],
-		Key:       r[1],
-		ClusterID: r[2],
+		key:    key,
+		kIndex: kIndex,
+		cIndex: cIndex,
 	}, nil
 }
 

+ 16 - 11
pkg/costmodel/router.go

@@ -943,18 +943,23 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 	}
 	promCli, _ := prometheusClient.NewClient(pc)
 
-	api := prometheusAPI.NewAPI(promCli)
-	_, err = api.Config(context.Background())
-	if err != nil {
-		klog.Fatalf("No valid prometheus config file at %s. Error: %s . Troubleshooting help available at: %s", address, err.Error(), prometheusTroubleshootingEp)
-	}
-	klog.V(1).Info("Success: retrieved a prometheus config file from: " + address)
-
-	_, err = ValidatePrometheus(promCli, false)
-	if err != nil {
-		klog.Fatalf("Failed to query prometheus at %s. Error: %s . Troubleshooting help available at: %s", address, err.Error(), prometheusTroubleshootingEp)
+	m, err := ValidatePrometheus(promCli, false)
+	if err != nil || m.Running == false {
+		if err != nil {
+			klog.Errorf("Failed to query prometheus at %s. Error: %s . Troubleshooting help available at: %s", address, err.Error(), prometheusTroubleshootingEp)
+		} else if m.Running == false {
+			klog.Errorf("Prometheus at %s is not running. Troubleshooting help available at: %s", address, prometheusTroubleshootingEp)
+		}
+		api := prometheusAPI.NewAPI(promCli)
+		_, err = api.Config(context.Background())
+		if err != nil {
+			klog.Infof("No valid prometheus config file at %s. Error: %s . Troubleshooting help available at: %s. Ignore if using cortex/thanos here.", address, err.Error(), prometheusTroubleshootingEp)
+		} else {
+			klog.V(1).Info("Retrieved a prometheus config file from: " + address)
+		}
+	} else {
+		klog.V(1).Info("Success: retrieved the 'up' query against prometheus at: " + address)
 	}
-	klog.V(1).Info("Success: retrieved the 'up' query against prometheus at: " + address)
 
 	// Kubernetes API setup
 	kc, err := rest.InClusterConfig()

+ 60 - 0
pkg/log/log.go

@@ -38,3 +38,63 @@ func ProfileWithThreshold(start time.Time, threshold time.Duration, name string)
 		Profilef("%s: %s", elapsed, name)
 	}
 }
+
+type Profiler struct {
+	profiles map[string]time.Duration
+	starts   map[string]time.Time
+}
+
+func NewProfiler() *Profiler {
+	return &Profiler{
+		profiles: map[string]time.Duration{},
+		starts:   map[string]time.Time{},
+	}
+}
+
+func (p *Profiler) Start(name string) {
+	if p == nil {
+		return
+	}
+	p.starts[name] = time.Now()
+}
+
+func (p *Profiler) Stop(name string) time.Duration {
+	if p == nil {
+		return 0
+	}
+	if start, ok := p.starts[name]; ok {
+		elapsed := time.Since(start)
+		p.profiles[name] += elapsed
+		return elapsed
+	}
+	return 0
+}
+
+func (p *Profiler) Log(name string) {
+	if p == nil {
+		return
+	}
+	Profilef("%s: %s", p.profiles[name], name)
+}
+
+func (p *Profiler) LogAll() {
+	if p == nil {
+		return
+	}
+
+	// Print profiles, largest to smallest. (Inefficienct, but shouldn't matter.)
+	print := map[string]time.Duration{}
+	for name, value := range p.profiles {
+		print[name] = value
+	}
+	for len(print) > 0 {
+		largest := ""
+		for name := range print {
+			if print[name] > print[largest] {
+				largest = name
+			}
+		}
+		Profilef("%s: %s", print[largest], largest)
+		delete(print, largest)
+	}
+}

+ 18 - 16
pkg/util/pool.go

@@ -7,8 +7,8 @@ import (
 // A pool of vector maps for mapping float64 timestamps
 // to float64 values
 type VectorMapPool interface {
-	Get() map[float64]float64
-	Put(map[float64]float64)
+	Get() map[uint64]float64
+	Put(map[uint64]float64)
 }
 
 // ------------
@@ -19,17 +19,17 @@ type VectorMapPool interface {
 // maps will block until one is available. You will be unable to
 // Put() a map if the buffer is full.
 type FixedMapPool struct {
-	maps chan map[float64]float64
+	maps chan map[uint64]float64
 	size int
 }
 
 // Returns a map from the pool. Blocks if no maps are available for re-use
-func (mp *FixedMapPool) Get() map[float64]float64 {
+func (mp *FixedMapPool) Get() map[uint64]float64 {
 	return <-mp.maps
 }
 
 // Adds a map back to the pool if there is room. Does not block on overflow.
-func (mp *FixedMapPool) Put(m map[float64]float64) {
+func (mp *FixedMapPool) Put(m map[uint64]float64) {
 	if len(mp.maps) >= mp.size {
 		return
 	}
@@ -44,13 +44,13 @@ func (mp *FixedMapPool) Put(m map[float64]float64) {
 // Creates a new fixed map pool which maintains a fixed pool size
 func NewFixedMapPool(size int) VectorMapPool {
 	mp := &FixedMapPool{
-		maps: make(chan map[float64]float64, size),
+		maps: make(chan map[uint64]float64, size),
 		size: size,
 	}
 
 	// Pre-Populate the buffer with maps
 	for i := 0; i < size; i++ {
-		mp.maps <- make(map[float64]float64)
+		mp.maps <- make(map[uint64]float64)
 	}
 
 	return mp
@@ -64,21 +64,21 @@ func NewFixedMapPool(size int) VectorMapPool {
 // not block if maps are over requested, but will only maintain
 // a buffer up the size limitation.
 type FlexibleMapPool struct {
-	maps chan map[float64]float64
+	maps chan map[uint64]float64
 }
 
 // Returns a map from the pool. Does not block on over-request.
-func (mp *FlexibleMapPool) Get() map[float64]float64 {
+func (mp *FlexibleMapPool) Get() map[uint64]float64 {
 	select {
 	case m := <-mp.maps:
 		return m
 	default:
-		return make(map[float64]float64)
+		return make(map[uint64]float64)
 	}
 }
 
 // Adds a map back to the pool if there is room. Does not block on overflow.
-func (mp *FlexibleMapPool) Put(m map[float64]float64) {
+func (mp *FlexibleMapPool) Put(m map[uint64]float64) {
 	for k := range m {
 		delete(m, k)
 	}
@@ -86,14 +86,16 @@ func (mp *FlexibleMapPool) Put(m map[float64]float64) {
 	// Either return the map to the buffered channel, or do nothing
 	select {
 	case mp.maps <- m:
+		return
 	default:
+		return
 	}
 }
 
 // Creates a new fixed map pool which maintains a fixed pool size
 func NewFlexibleMapPool(size int) VectorMapPool {
 	return &FlexibleMapPool{
-		maps: make(chan map[float64]float64, size),
+		maps: make(chan map[uint64]float64, size),
 	}
 }
 
@@ -105,12 +107,12 @@ type UnboundedMapPool struct {
 }
 
 // Returns a map from the pool. Does not block on over-request.
-func (mp *UnboundedMapPool) Get() map[float64]float64 {
-	return mp.maps.Get().(map[float64]float64)
+func (mp *UnboundedMapPool) Get() map[uint64]float64 {
+	return mp.maps.Get().(map[uint64]float64)
 }
 
 // Adds a map back to the pool if there is room. Does not block on overflow.
-func (mp *UnboundedMapPool) Put(m map[float64]float64) {
+func (mp *UnboundedMapPool) Put(m map[uint64]float64) {
 	for k := range m {
 		delete(m, k)
 	}
@@ -124,7 +126,7 @@ func NewUnboundedMapPool() VectorMapPool {
 	return &UnboundedMapPool{
 		maps: &sync.Pool{
 			New: func() interface{} {
-				return make(map[float64]float64)
+				return make(map[uint64]float64)
 			},
 		},
 	}

+ 5 - 5
pkg/util/vector.go

@@ -68,7 +68,7 @@ func ApplyVectorOp(xvs []*Vector, yvs []*Vector, op VectorJoinOp) []*Vector {
 		// round all non-zero timestamps to the nearest 10 second mark
 		xv.Timestamp = roundTimestamp(xv.Timestamp, 10.0)
 
-		xMap[xv.Timestamp] = xv.Value
+		xMap[uint64(xv.Timestamp)] = xv.Value
 		timestamps = append(timestamps, &Vector{
 			Timestamp: xv.Timestamp,
 		})
@@ -85,8 +85,8 @@ func ApplyVectorOp(xvs []*Vector, yvs []*Vector, op VectorJoinOp) []*Vector {
 		// round all non-zero timestamps to the nearest 10 second mark
 		yv.Timestamp = roundTimestamp(yv.Timestamp, 10.0)
 
-		yMap[yv.Timestamp] = yv.Value
-		if _, ok := xMap[yv.Timestamp]; !ok {
+		yMap[uint64(yv.Timestamp)] = yv.Value
+		if _, ok := xMap[uint64(yv.Timestamp)]; !ok {
 			// no need to double add, since we'll range over sorted timestamps and check.
 			timestamps = append(timestamps, &Vector{
 				Timestamp: yv.Timestamp,
@@ -98,8 +98,8 @@ func ApplyVectorOp(xvs []*Vector, yvs []*Vector, op VectorJoinOp) []*Vector {
 	// reuse the existing slice to reduce allocations
 	result := timestamps[:0]
 	for _, sv := range timestamps {
-		x, okX := xMap[sv.Timestamp]
-		y, okY := yMap[sv.Timestamp]
+		x, okX := xMap[uint64(sv.Timestamp)]
+		y, okY := yMap[uint64(sv.Timestamp)]
 
 		if op(sv, VectorValue(x, okX), VectorValue(y, okY)) {
 			result = append(result, sv)

+ 93 - 0
test/keytuple_test.go

@@ -0,0 +1,93 @@
+package costmodel_test
+
+import (
+	"strings"
+	"testing"
+
+	"github.com/kubecost/cost-model/pkg/costmodel"
+)
+
+func TestKeyTupleSplit(t *testing.T) {
+	const (
+		ns        = "kubecost"
+		key       = "my-pod"
+		clusterID = "cluster-one"
+		fullKey   = "kubecost,my-pod,cluster-one"
+	)
+
+	kt, err := costmodel.NewKeyTuple(fullKey)
+	if err != nil {
+		t.Errorf("Error: %s\n", err)
+		return
+	}
+
+	t.Logf("Namespace: %s, Key: %s, ClusterID: %s\n", kt.Namespace(), kt.Key(), kt.ClusterID())
+
+	if !strings.EqualFold(kt.Namespace(), ns) {
+		t.Errorf("Namespace: \"%s\" != \"%s\"", kt.Namespace(), ns)
+		return
+	}
+	if !strings.EqualFold(kt.Key(), key) {
+		t.Errorf("Key: \"%s\" != \"%s\"\n", kt.Key(), key)
+		return
+	}
+	if !strings.EqualFold(kt.ClusterID(), clusterID) {
+		t.Errorf("ClusterID: \"%s\" != \"%s\"\n", kt.ClusterID(), clusterID)
+		return
+	}
+}
+
+func TestKeyTupleSingleFail(t *testing.T) {
+	_, err := costmodel.NewKeyTuple("foo")
+	if err == nil {
+		t.Errorf("Error was non-nil for single element!")
+		return
+	}
+}
+
+func TestKeyTupleDoubleFail(t *testing.T) {
+	_, err := costmodel.NewKeyTuple("foo,bar")
+	if err == nil {
+		t.Errorf("Error was non-nil for two elements!")
+		return
+	}
+}
+
+func TestKeyTupleMoreThanThreeFail(t *testing.T) {
+	_, err := costmodel.NewKeyTuple("foo,bar,fizz,buzz")
+	if err == nil {
+		t.Errorf("Error was non-nil for two elements!")
+		return
+	}
+}
+
+func TestOnlyCommas(t *testing.T) {
+	kt, err := costmodel.NewKeyTuple(",,")
+	if err != nil {
+		t.Errorf("Error: %s\n", err)
+		return
+	}
+
+	t.Logf("Namespace: \"%s\", Key: \"%s\", ClusterID: \"%s\"\n", kt.Namespace(), kt.Key(), kt.ClusterID())
+
+	if !strings.EqualFold(kt.Namespace(), "") {
+		t.Errorf("Namespace: \"%s\" != \"%s\"", kt.Namespace(), "")
+		return
+	}
+	if !strings.EqualFold(kt.Key(), "") {
+		t.Errorf("Key: \"%s\" != \"%s\"\n", kt.Key(), "")
+		return
+	}
+	if !strings.EqualFold(kt.ClusterID(), "") {
+		t.Errorf("ClusterID: \"%s\" != \"%s\"\n", kt.ClusterID(), "")
+		return
+	}
+}
+
+func TestManyEntrys(t *testing.T) {
+	_, err := costmodel.NewKeyTuple("a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p")
+	if err == nil {
+		t.Errorf("Error was non-nil for single element!")
+		return
+	}
+}