Преглед изворни кода

Merge pull request #452 from kubecost/develop

Merge develop into master
Ajay Tripathy пре 5 година
родитељ
комит
d05c511675
4 измењених фајлова са 86 додато и 10 уклоњено
  1. 67 10
      pkg/cloud/awsprovider.go
  2. 7 0
      pkg/cloud/csvprovider.go
  3. 2 0
      pkg/cloud/customprovider.go
  4. 10 0
      test/cloud_test.go

+ 67 - 10
pkg/cloud/awsprovider.go

@@ -75,6 +75,9 @@ var awsRegions = []string{
 type AWS struct {
 	Pricing                 map[string]*AWSProductTerms
 	SpotPricingByInstanceID map[string]*spotInfo
+	SpotPricingUpdatedAt    *time.Time
+	SpotRefreshRunning      bool
+	SpotPricingLock         sync.RWMutex
 	RIPricingByInstanceID   map[string]*RIData
 	RIDataRunning           bool
 	RIDataLock              sync.RWMutex
@@ -558,6 +561,9 @@ func (aws *AWS) DownloadPricingData() error {
 		key := aws.GetPVKey(pv, params, "")
 		pvkeys[key.Features()] = key
 	}
+
+	// RIDataRunning establishes the existance of the goroutine. Since it's possible we
+	// run multiple downloads, we don't want to create multiple go routines if one already exists
 	if !aws.RIDataRunning && c.AthenaBucketName != "" {
 		err = aws.GetReservationDataFromAthena() // Block until one run has completed.
 		if err != nil {
@@ -565,9 +571,9 @@ func (aws *AWS) DownloadPricingData() error {
 		} else { // If we make one successful run, check on new reservation data every hour
 			go func() {
 				defer errors.HandlePanic()
+				aws.RIDataRunning = true
 
 				for {
-					aws.RIDataRunning = true
 					klog.Infof("Reserved Instance watcher running... next update in 1h")
 					time.Sleep(time.Hour)
 					err := aws.GetReservationDataFromAthena()
@@ -726,14 +732,50 @@ func (aws *AWS) DownloadPricingData() error {
 		}
 	}
 
+	// Always run spot pricing refresh when performing download
+	aws.refreshSpotPricing(true)
+
+	// Only start a single refresh goroutine
+	if !aws.SpotRefreshRunning {
+		aws.SpotRefreshRunning = true
+
+		go func() {
+			defer errors.HandlePanic()
+
+			for {
+				klog.Infof("Spot Pricing Refresh scheduled in 1 hr.")
+				time.Sleep(time.Hour)
+
+				// Reoccurring refresh checks update times
+				aws.refreshSpotPricing(false)
+			}
+		}()
+	}
+
+	return nil
+}
+
+func (aws *AWS) refreshSpotPricing(force bool) {
+	aws.SpotPricingLock.Lock()
+	defer aws.SpotPricingLock.Unlock()
+
+	now := time.Now().UTC()
+	updateTime := now.Add(-time.Hour)
+
+	// Return if there was an update time set and an hour hasn't elapsed
+	if !force && aws.SpotPricingUpdatedAt != nil && aws.SpotPricingUpdatedAt.After(updateTime) {
+		return
+	}
+
 	sp, err := parseSpotData(aws.SpotDataBucket, aws.SpotDataPrefix, aws.ProjectID, aws.SpotDataRegion, aws.ServiceKeyName, aws.ServiceKeySecret)
 	if err != nil {
 		klog.V(1).Infof("Skipping AWS spot data download: %s", err.Error())
-	} else {
-		aws.SpotPricingByInstanceID = sp
+		return
 	}
 
-	return nil
+	// update time last updated
+	aws.SpotPricingUpdatedAt = &now
+	aws.SpotPricingByInstanceID = sp
 }
 
 // Stubbed NetworkPricing for AWS. Pull directly from aws.json for now
@@ -769,11 +811,26 @@ func (aws *AWS) AllNodePricing() (interface{}, error) {
 	return aws.Pricing, nil
 }
 
-func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k Key) (*Node, error) {
-	key := k.Features()
+func (aws *AWS) spotPricing(instanceID string) (*spotInfo, bool) {
+	aws.SpotPricingLock.RLock()
+	defer aws.SpotPricingLock.RUnlock()
+
+	info, ok := aws.SpotPricingByInstanceID[instanceID]
+	return info, ok
+}
+
+func (aws *AWS) reservedInstancePricing(instanceID string) (*RIData, bool) {
 	aws.RIDataLock.RLock()
 	defer aws.RIDataLock.RUnlock()
-	if spotInfo, ok := aws.SpotPricingByInstanceID[k.ID()]; ok {
+
+	data, ok := aws.RIPricingByInstanceID[instanceID]
+	return data, ok
+}
+
+func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k Key) (*Node, error) {
+	key := k.Features()
+
+	if spotInfo, ok := aws.spotPricing(k.ID()); ok {
 		var spotcost string
 		klog.V(3).Infof("Looking up spot data from feed for node %s", k.ID())
 		arr := strings.Split(spotInfo.Charge, " ")
@@ -807,7 +864,7 @@ func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k Key) (*No
 			BaseGPUPrice: aws.BaseGPUPrice,
 			UsageType:    usageType,
 		}, nil
-	} else if ri, ok := aws.RIPricingByInstanceID[k.ID()]; ok {
+	} else if ri, ok := aws.reservedInstancePricing(k.ID()); ok {
 		strCost := fmt.Sprintf("%f", ri.EffectiveCost)
 		return &Node{
 			Cost:         strCost,
@@ -1776,8 +1833,8 @@ func (f fnames) Less(i, j int) bool {
 }
 
 func parseSpotData(bucket string, prefix string, projectID string, region string, accessKeyID string, accessKeySecret string) (map[string]*spotInfo, error) {
-
-	if accessKeyID != "" && accessKeySecret != "" { // credentials may exist on the actual AWS node-- if so, use those. If not, override with the service key
+	// credentials may exist on the actual AWS node-- if so, use those. If not, override with the service key
+	if accessKeyID != "" && accessKeySecret != "" {
 		err := os.Setenv(awsAccessKeyIDEnvVar, accessKeyID)
 		if err != nil {
 			return nil, err

+ 7 - 0
pkg/cloud/csvprovider.go

@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"io"
 	"os"
+	"regexp"
 	"strings"
 	"sync"
 	"time"
@@ -165,6 +166,12 @@ func (c *CSVProvider) NodePricing(key Key) (*Node, error) {
 func NodeValueFromMapField(m string, n *v1.Node) string {
 	mf := strings.Split(m, ".")
 	if len(mf) == 2 && mf[0] == "spec" && mf[1] == "providerID" {
+		provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)") // It's of the form aws:///us-east-2a/i-0fea4fd46592d050b and we want i-0fea4fd46592d050b, if it exists
+		for matchNum, group := range provIdRx.FindStringSubmatch(n.Spec.ProviderID) {
+			if matchNum == 2 {
+				return group
+			}
+		}
 		return n.Spec.ProviderID
 	} else if len(mf) > 1 && mf[0] == "metadata" {
 		if mf[1] == "name" {

+ 2 - 0
pkg/cloud/customprovider.go

@@ -3,6 +3,7 @@ package cloud
 import (
 	"encoding/json"
 	"io"
+	"os"
 	"strconv"
 	"strings"
 	"sync"
@@ -105,6 +106,7 @@ func (cp *CustomProvider) ClusterInfo() (map[string]string, error) {
 		m["name"] = conf.ClusterName
 	}
 	m["provider"] = "custom"
+	m["id"] = os.Getenv(clusterIDKey)
 	return m, nil
 }
 

+ 10 - 0
test/cloud_test.go

@@ -11,6 +11,16 @@ const(
 	nameMap = "metadata.name"
 	labelMapFoo = "metadata.labels.foo"
 )
+func TestTransformedValueFromMapField(t *testing.T) {
+	providerIDWant := "i-05445591e0d182d42"
+	n := &v1.Node{}
+	n.Spec.ProviderID = "aws:///us-east-1a/i-05445591e0d182d42"
+	got := cloud.NodeValueFromMapField(providerIDMap, n)
+	if got != providerIDWant {
+		t.Errorf("Assert on '%s' want '%s' got '%s'", providerIDMap, providerIDWant, got)
+	}
+}
+
 func TestNodeValueFromMapField(t *testing.T) {
 	providerIDWant := "providerid"
 	nameWant := "gke-standard-cluster-1-pool-1-91dc432d-cg69"