فهرست منبع

Improvements to RI data retrieval and load. Added a spot pricing refresh to AWS to ensure we continually try and load any new pricing updates every hour.

Matt Bolt 6 سال پیش
والد
کامیت
d79fdf0f84
1فایلهای تغییر یافته به همراه67 افزوده شده و 10 حذف شده
  1. 67 10
      pkg/cloud/awsprovider.go

+ 67 - 10
pkg/cloud/awsprovider.go

@@ -75,6 +75,9 @@ var awsRegions = []string{
 type AWS struct {
 	Pricing                 map[string]*AWSProductTerms
 	SpotPricingByInstanceID map[string]*spotInfo
+	SpotPricingUpdatedAt    *time.Time
+	SpotRefreshRunning      bool
+	SpotPricingLock         sync.RWMutex
 	RIPricingByInstanceID   map[string]*RIData
 	RIDataRunning           bool
 	RIDataLock              sync.RWMutex
@@ -558,6 +561,9 @@ func (aws *AWS) DownloadPricingData() error {
 		key := aws.GetPVKey(pv, params, "")
 		pvkeys[key.Features()] = key
 	}
+
+	// RIDataRunning establishes the existance of the goroutine. Since it's possible we
+	// run multiple downloads, we don't want to create multiple go routines if one already exists
 	if !aws.RIDataRunning && c.AthenaBucketName != "" {
 		err = aws.GetReservationDataFromAthena() // Block until one run has completed.
 		if err != nil {
@@ -565,9 +571,9 @@ func (aws *AWS) DownloadPricingData() error {
 		} else { // If we make one successful run, check on new reservation data every hour
 			go func() {
 				defer errors.HandlePanic()
+				aws.RIDataRunning = true
 
 				for {
-					aws.RIDataRunning = true
 					klog.Infof("Reserved Instance watcher running... next update in 1h")
 					time.Sleep(time.Hour)
 					err := aws.GetReservationDataFromAthena()
@@ -726,14 +732,50 @@ func (aws *AWS) DownloadPricingData() error {
 		}
 	}
 
+	// Always run spot pricing refresh when performing download
+	aws.refreshSpotPricing(true)
+
+	// Only start a single refresh goroutine
+	if !aws.SpotRefreshRunning {
+		aws.SpotRefreshRunning = true
+
+		go func() {
+			defer errors.HandlePanic()
+
+			for {
+				klog.Infof("Spot Pricing Refresh scheduled in 1 hr.")
+				time.Sleep(time.Hour)
+
+				// Reoccurring refresh checks update times
+				aws.refreshSpotPricing(false)
+			}
+		}()
+	}
+
+	return nil
+}
+
+func (aws *AWS) refreshSpotPricing(force bool) {
+	aws.SpotPricingLock.Lock()
+	defer aws.SpotPricingLock.Unlock()
+
+	now := time.Now().UTC()
+	updateTime := now.Add(-time.Hour)
+
+	// Return if there was an update time set and an hour hasn't elapsed
+	if !force && aws.SpotPricingUpdatedAt != nil && aws.SpotPricingUpdatedAt.After(updateTime) {
+		return
+	}
+
 	sp, err := parseSpotData(aws.SpotDataBucket, aws.SpotDataPrefix, aws.ProjectID, aws.SpotDataRegion, aws.ServiceKeyName, aws.ServiceKeySecret)
 	if err != nil {
 		klog.V(1).Infof("Skipping AWS spot data download: %s", err.Error())
-	} else {
-		aws.SpotPricingByInstanceID = sp
+		return
 	}
 
-	return nil
+	// update time last updated
+	aws.SpotPricingUpdatedAt = &now
+	aws.SpotPricingByInstanceID = sp
 }
 
 // Stubbed NetworkPricing for AWS. Pull directly from aws.json for now
@@ -769,11 +811,26 @@ func (aws *AWS) AllNodePricing() (interface{}, error) {
 	return aws.Pricing, nil
 }
 
-func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k Key) (*Node, error) {
-	key := k.Features()
+func (aws *AWS) spotPricing(instanceID string) (*spotInfo, bool) {
+	aws.SpotPricingLock.RLock()
+	defer aws.SpotPricingLock.RUnlock()
+
+	info, ok := aws.SpotPricingByInstanceID[instanceID]
+	return info, ok
+}
+
+func (aws *AWS) reservedInstancePricing(instanceID string) (*RIData, bool) {
 	aws.RIDataLock.RLock()
 	defer aws.RIDataLock.RUnlock()
-	if spotInfo, ok := aws.SpotPricingByInstanceID[k.ID()]; ok {
+
+	data, ok := aws.RIPricingByInstanceID[instanceID]
+	return data, ok
+}
+
+func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k Key) (*Node, error) {
+	key := k.Features()
+
+	if spotInfo, ok := aws.spotPricing(k.ID()); ok {
 		var spotcost string
 		klog.V(3).Infof("Looking up spot data from feed for node %s", k.ID())
 		arr := strings.Split(spotInfo.Charge, " ")
@@ -807,7 +864,7 @@ func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k Key) (*No
 			BaseGPUPrice: aws.BaseGPUPrice,
 			UsageType:    usageType,
 		}, nil
-	} else if ri, ok := aws.RIPricingByInstanceID[k.ID()]; ok {
+	} else if ri, ok := aws.reservedInstancePricing(k.ID()); ok {
 		strCost := fmt.Sprintf("%f", ri.EffectiveCost)
 		return &Node{
 			Cost:         strCost,
@@ -1776,8 +1833,8 @@ func (f fnames) Less(i, j int) bool {
 }
 
 func parseSpotData(bucket string, prefix string, projectID string, region string, accessKeyID string, accessKeySecret string) (map[string]*spotInfo, error) {
-
-	if accessKeyID != "" && accessKeySecret != "" { // credentials may exist on the actual AWS node-- if so, use those. If not, override with the service key
+	// credentials may exist on the actual AWS node-- if so, use those. If not, override with the service key
+	if accessKeyID != "" && accessKeySecret != "" {
 		err := os.Setenv(awsAccessKeyIDEnvVar, accessKeyID)
 		if err != nil {
 			return nil, err