Sfoglia il codice sorgente

Merge pull request #481 from kubecost/AjayTripathy-initial-savingsplan

first pass of savings plan implementation
Ajay Tripathy 5 anni fa
parent
commit
66128afb41
1 ha cambiato i file con 139 aggiunte e 25 eliminazioni
  1. 139 25
      pkg/cloud/awsprovider.go

+ 139 - 25
pkg/cloud/awsprovider.go

@@ -76,31 +76,34 @@ var awsRegions = []string{
 
 // AWS represents an Amazon Provider
 type AWS struct {
-	Pricing                 map[string]*AWSProductTerms
-	SpotPricingByInstanceID map[string]*spotInfo
-	SpotPricingUpdatedAt    *time.Time
-	SpotRefreshRunning      bool
-	SpotPricingLock         sync.RWMutex
-	RIPricingByInstanceID   map[string]*RIData
-	RIDataRunning           bool
-	RIDataLock              sync.RWMutex
-	ValidPricingKeys        map[string]bool
-	Clientset               clustercache.ClusterCache
-	BaseCPUPrice            string
-	BaseRAMPrice            string
-	BaseGPUPrice            string
-	BaseSpotCPUPrice        string
-	BaseSpotRAMPrice        string
-	SpotLabelName           string
-	SpotLabelValue          string
-	ServiceKeyName          string
-	ServiceKeySecret        string
-	SpotDataRegion          string
-	SpotDataBucket          string
-	SpotDataPrefix          string
-	ProjectID               string
-	DownloadPricingDataLock sync.RWMutex
-	Config                  *ProviderConfig
+	Pricing                     map[string]*AWSProductTerms
+	SpotPricingByInstanceID     map[string]*spotInfo
+	SpotPricingUpdatedAt        *time.Time
+	SpotRefreshRunning          bool
+	SpotPricingLock             sync.RWMutex
+	RIPricingByInstanceID       map[string]*RIData
+	RIDataRunning               bool
+	RIDataLock                  sync.RWMutex
+	SavingsPlanDataByInstanceID map[string]*SavingsPlanData
+	SavingsPlanDataRunning      bool
+	SavingsPlanDataLock         sync.RWMutex
+	ValidPricingKeys            map[string]bool
+	Clientset                   clustercache.ClusterCache
+	BaseCPUPrice                string
+	BaseRAMPrice                string
+	BaseGPUPrice                string
+	BaseSpotCPUPrice            string
+	BaseSpotRAMPrice            string
+	SpotLabelName               string
+	SpotLabelValue              string
+	ServiceKeyName              string
+	ServiceKeySecret            string
+	SpotDataRegion              string
+	SpotDataBucket              string
+	SpotDataPrefix              string
+	ProjectID                   string
+	DownloadPricingDataLock     sync.RWMutex
+	Config                      *ProviderConfig
 	*CustomProvider
 }
 
@@ -590,6 +593,25 @@ func (aws *AWS) DownloadPricingData() error {
 			}()
 		}
 	}
+	if !aws.SavingsPlanDataRunning && c.AthenaBucketName != "" {
+		err = aws.GetSavingsPlanDataFromAthena()
+		if err != nil {
+			klog.V(1).Infof("Failed to lookup savings plan data: %s", err.Error())
+		} else {
+			go func() {
+				defer errors.HandlePanic()
+				aws.SavingsPlanDataRunning = true
+				for {
+					klog.Infof("Savings Plan watcher running... next update in 1h")
+					time.Sleep(time.Hour)
+					err := aws.GetSavingsPlanDataFromAthena()
+					if err != nil {
+						klog.Infof("Error updating Savings Plan data: %s", err.Error())
+					}
+				}
+			}()
+		}
+	}
 
 	aws.Pricing = make(map[string]*AWSProductTerms)
 	aws.ValidPricingKeys = make(map[string]bool)
@@ -833,6 +855,14 @@ func (aws *AWS) reservedInstancePricing(instanceID string) (*RIData, bool) {
 	return data, ok
 }
 
+func (aws *AWS) savingsPlanPricing(instanceID string) (*SavingsPlanData, bool) {
+	aws.SavingsPlanDataLock.RLock()
+	defer aws.SavingsPlanDataLock.RUnlock()
+
+	data, ok := aws.SavingsPlanDataByInstanceID[instanceID]
+	return data, ok
+}
+
 func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k Key) (*Node, error) {
 	key := k.Features()
 
@@ -870,6 +900,20 @@ func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k Key) (*No
 			BaseGPUPrice: aws.BaseGPUPrice,
 			UsageType:    usageType,
 		}, nil
+	} else if sp, ok := aws.savingsPlanPricing(k.ID()); ok {
+		strCost := fmt.Sprintf("%f", sp.EffectiveCost)
+		return &Node{
+			Cost:         strCost,
+			VCPU:         terms.VCpu,
+			RAM:          terms.Memory,
+			GPU:          terms.GPU,
+			Storage:      terms.Storage,
+			BaseCPUPrice: aws.BaseCPUPrice,
+			BaseRAMPrice: aws.BaseRAMPrice,
+			BaseGPUPrice: aws.BaseGPUPrice,
+			UsageType:    usageType,
+		}, nil
+
 	} else if ri, ok := aws.reservedInstancePricing(k.ID()); ok {
 		strCost := fmt.Sprintf("%f", ri.EffectiveCost)
 		return &Node{
@@ -1485,6 +1529,76 @@ func (a *AWS) QueryAthenaBillingData(query string) (*athena.GetQueryResultsOutpu
 	}
 }
 
+type SavingsPlanData struct {
+	ResourceID     string
+	EffectiveCost  float64
+	SavingsPlanARN string
+	MostRecentDate string
+}
+
+func (a *AWS) GetSavingsPlanDataFromAthena() error {
+	cfg, err := a.GetConfig()
+	if err != nil {
+		return err
+	}
+	if cfg.AthenaBucketName == "" {
+		return fmt.Errorf("No Athena Bucket configured")
+	}
+	if a.SavingsPlanDataByInstanceID == nil {
+		a.SavingsPlanDataByInstanceID = make(map[string]*SavingsPlanData)
+	}
+	tNow := time.Now()
+	tOneDayAgo := tNow.Add(time.Duration(-25) * time.Hour) // Also get files from one day ago to avoid boundary conditions
+	start := tOneDayAgo.Format("2006-01-02")
+	end := tNow.Format("2006-01-02")
+	q := `SELECT   
+		line_item_usage_start_date,
+		savings_plan_savings_plan_a_r_n,
+		line_item_resource_id,
+		savings_plan_savings_plan_effective_cost
+	FROM %s as cost_data
+	WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s'
+	AND line_item_line_item_type = 'SavingsPlanCoveredUsage' ORDER BY 
+	line_item_usage_start_date DESC`
+	query := fmt.Sprintf(q, cfg.AthenaTable, start, end)
+	op, err := a.QueryAthenaBillingData(query)
+	if err != nil {
+		return fmt.Errorf("Error fetching Savings Plan Data: %s", err)
+	}
+	klog.Infof("Fetching SavingsPlan data...")
+	if len(op.ResultSet.Rows) > 1 {
+		a.SavingsPlanDataLock.Lock()
+		mostRecentDate := ""
+		for _, r := range op.ResultSet.Rows[1:(len(op.ResultSet.Rows) - 1)] {
+			d := *r.Data[0].VarCharValue
+			if mostRecentDate == "" {
+				mostRecentDate = d
+			} else if mostRecentDate != d { // Get all most recent assignments
+				break
+			}
+			cost, err := strconv.ParseFloat(*r.Data[3].VarCharValue, 64)
+			if err != nil {
+				klog.Infof("Error converting `%s` from float ", *r.Data[3].VarCharValue)
+			}
+			r := &SavingsPlanData{
+				ResourceID:     *r.Data[2].VarCharValue,
+				EffectiveCost:  cost,
+				SavingsPlanARN: *r.Data[1].VarCharValue,
+				MostRecentDate: d,
+			}
+			a.SavingsPlanDataByInstanceID[r.ResourceID] = r
+		}
+		klog.V(1).Infof("Found %d savings plan applied instances", len(a.SavingsPlanDataByInstanceID))
+		for k, r := range a.SavingsPlanDataByInstanceID {
+			klog.V(1).Infof("Reserved Instance Data found for node %s : %f at time %s", k, r.EffectiveCost, r.MostRecentDate)
+		}
+		a.SavingsPlanDataLock.Unlock()
+	} else {
+		klog.Infof("No savings plan applied instance data found")
+	}
+	return nil
+}
+
 type RIData struct {
 	ResourceID     string
 	EffectiveCost  float64