Explorar o código

add mutex, always run RI synchronously once

AjayTripathy %!s(int64=6) %!d(string=hai) anos
pai
achega
bbfb19ab25
Modificáronse 1 ficheiros con 66 adicións e 53 borrados
  1. 66 53
      pkg/cloud/awsprovider.go

+ 66 - 53
pkg/cloud/awsprovider.go

@@ -46,6 +46,7 @@ type AWS struct {
 	SpotPricingByInstanceID map[string]*spotInfo
 	RIPricingByInstanceID   map[string]*RIData
 	RIDataRunning           bool
+	RIDataLock              sync.RWMutex
 	ValidPricingKeys        map[string]bool
 	Clientset               clustercache.ClusterCache
 	BaseCPUPrice            string
@@ -511,10 +512,22 @@ func (aws *AWS) DownloadPricingData() error {
 		key := aws.GetPVKey(pv, params, "")
 		pvkeys[key.Features()] = key
 	}
-	if !aws.RIDataRunning {
-		err = aws.GetReservationDataFromAthena()
+	if !aws.RIDataRunning && c.AthenaBucketName != "" {
+		err = aws.GetReservationDataFromAthena() // Block until one run has completed.
 		if err != nil {
 			klog.V(1).Infof("Failed to lookup reserved instance data: %s", err.Error())
+		} else { // If we make one successful run, check on new reservation data every hour
+			go func() {
+				for {
+					aws.RIDataRunning = true
+					klog.Infof("Reserved Instance watcher running... next update in 1h")
+					time.Sleep(5 * time.Minute)
+					err := aws.GetReservationDataFromAthena()
+					if err != nil {
+						klog.Infof("Error updating RI data: %s", err.Error())
+					}
+				}
+			}()
 		}
 	}
 
@@ -710,6 +723,8 @@ func (aws *AWS) AllNodePricing() (interface{}, error) {
 
 func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k Key) (*Node, error) {
 	key := k.Features()
+	aws.RIDataLock.RLock()
+	defer aws.RIDataLock.RUnlock()
 	if aws.isPreemptible(key) {
 		if spotInfo, ok := aws.SpotPricingByInstanceID[k.ID()]; ok { // try and match directly to an ID for pricing. We'll still need the features
 			var spotcost string
@@ -1144,62 +1159,60 @@ func (a *AWS) GetReservationDataFromAthena() error {
 		return err
 	}
 	if cfg.AthenaBucketName == "" {
-		return nil
+		return fmt.Errorf("No Athena Bucket configured")
 	}
-	go func() {
-		for {
-			a.RIDataRunning = true
-			a.RIPricingByInstanceID = make(map[string]*RIData)
-			tNow := time.Now()
-			tOneDayAgo := tNow.Add(time.Duration(-25) * time.Hour) // Also get files from one day ago to avoid boundary conditions
-			start := tOneDayAgo.Format("2006-01-02")
-			end := tNow.Format("2006-01-02")
-			q := `SELECT   
-				line_item_usage_start_date,
-				reservation_reservation_a_r_n,
-				line_item_resource_id,
-				reservation_effective_cost
-			FROM athena_test as cost_data
-			WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s'
-			AND reservation_reservation_a_r_n <> '' ORDER BY 
-			line_item_usage_start_date DESC`
-			query := fmt.Sprintf(q, start, end)
-			op, err := a.QueryAthenaBillingData(query)
+	if a.RIPricingByInstanceID == nil {
+		a.RIPricingByInstanceID = make(map[string]*RIData)
+	}
+	tNow := time.Now()
+	tOneDayAgo := tNow.Add(time.Duration(-25) * time.Hour) // Also get files from one day ago to avoid boundary conditions
+	start := tOneDayAgo.Format("2006-01-02")
+	end := tNow.Format("2006-01-02")
+	q := `SELECT   
+		line_item_usage_start_date,
+		reservation_reservation_a_r_n,
+		line_item_resource_id,
+		reservation_effective_cost
+	FROM athena_test as cost_data
+	WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s'
+	AND reservation_reservation_a_r_n <> '' ORDER BY 
+	line_item_usage_start_date DESC`
+	query := fmt.Sprintf(q, start, end)
+	op, err := a.QueryAthenaBillingData(query)
+	if err != nil {
+		return fmt.Errorf("Error fetching Reserved Instance Data: %s", err)
+	}
+	klog.Infof("Fetching RI data...")
+	if len(op.ResultSet.Rows) > 1 {
+		a.RIDataLock.Lock()
+		mostRecentDate := ""
+		for _, r := range op.ResultSet.Rows[1:(len(op.ResultSet.Rows) - 1)] {
+			d := *r.Data[0].VarCharValue
+			if mostRecentDate == "" {
+				mostRecentDate = d
+			} else if mostRecentDate != d { // Get all most recent assignments
+				break
+			}
+			cost, err := strconv.ParseFloat(*r.Data[3].VarCharValue, 64)
 			if err != nil {
-				klog.Infof("Error fetching Reserved Instance Data: %s", err)
+				klog.Infof("Error converting `%s` from float ", *r.Data[3].VarCharValue)
 			}
-			klog.Infof("Fetching RI data...")
-			if len(op.ResultSet.Rows) > 1 {
-				mostRecentDate := ""
-				for _, r := range op.ResultSet.Rows[1:(len(op.ResultSet.Rows) - 1)] {
-					d := *r.Data[0].VarCharValue
-					if mostRecentDate == "" {
-						mostRecentDate = d
-					} else if mostRecentDate != d { // Get all most recent assignments
-						break
-					}
-					cost, err := strconv.ParseFloat(*r.Data[3].VarCharValue, 64)
-					if err != nil {
-						klog.Infof("Error converting `%s` from float ", *r.Data[3].VarCharValue)
-					}
-					r := &RIData{
-						ResourceID:     *r.Data[2].VarCharValue,
-						EffectiveCost:  cost,
-						ReservationARN: *r.Data[1].VarCharValue,
-						MostRecentDate: d,
-					}
-					a.RIPricingByInstanceID[r.ResourceID] = r
-				}
-				klog.V(1).Infof("Found %d reserved instances", len(a.RIPricingByInstanceID))
-				for k, r := range a.RIPricingByInstanceID {
-					klog.V(1).Infof("Reserved Instance Data found for node %s : %f", k, r.EffectiveCost)
-				}
-			} else {
-				klog.Infof("No reserved instance data found")
+			r := &RIData{
+				ResourceID:     *r.Data[2].VarCharValue,
+				EffectiveCost:  cost,
+				ReservationARN: *r.Data[1].VarCharValue,
+				MostRecentDate: d,
 			}
-			time.Sleep(time.Hour)
+			a.RIPricingByInstanceID[r.ResourceID] = r
+		}
+		klog.V(1).Infof("Found %d reserved instances", len(a.RIPricingByInstanceID))
+		for k, r := range a.RIPricingByInstanceID {
+			klog.V(1).Infof("Reserved Instance Data found for node %s : %f at time %s", k, r.EffectiveCost, r.MostRecentDate)
 		}
-	}()
+		a.RIDataLock.Unlock()
+	} else {
+		klog.Infof("No reserved instance data found")
+	}
 	return nil
 }