Ver código fonte

AWS Reserved Instance pricing per region.

Matt Bolt 6 anos atrás
pai
commit
66ed60d32b
1 arquivos alterados com 234 adições e 53 exclusões
  1. 234 53
      cloud/awsprovider.go

+ 234 - 53
cloud/awsprovider.go

@@ -35,6 +35,7 @@ import (
 
 
 const awsAccessKeyIDEnvVar = "AWS_ACCESS_KEY_ID"
 const awsAccessKeyIDEnvVar = "AWS_ACCESS_KEY_ID"
 const awsAccessKeySecretEnvVar = "AWS_SECRET_ACCESS_KEY"
 const awsAccessKeySecretEnvVar = "AWS_SECRET_ACCESS_KEY"
+const awsReservedInstancePricePerHour = 0.0287
 const supportedSpotFeedVersion = "1"
 const supportedSpotFeedVersion = "1"
 const SpotInfoUpdateType = "spotinfo"
 const SpotInfoUpdateType = "spotinfo"
 const AthenaInfoUpdateType = "athenainfo"
 const AthenaInfoUpdateType = "athenainfo"
@@ -59,6 +60,7 @@ type AWS struct {
 	SpotDataPrefix          string
 	SpotDataPrefix          string
 	ProjectID               string
 	ProjectID               string
 	DownloadPricingDataLock sync.RWMutex
 	DownloadPricingDataLock sync.RWMutex
+	ReservedInstances       []*AWSReservedInstance
 	*CustomProvider
 	*CustomProvider
 }
 }
 
 
@@ -506,6 +508,17 @@ func (aws *AWS) DownloadPricingData() error {
 		pvkeys[key.Features()] = key
 		pvkeys[key.Features()] = key
 	}
 	}
 
 
+	reserved, err := aws.getReservedInstances()
+	if err != nil {
+		klog.V(1).Infof("Failed to lookup reserved instance data: %s", err.Error())
+	} else {
+		klog.V(1).Infof("Found %d reserved instances", len(reserved))
+		aws.ReservedInstances = reserved
+		for _, r := range reserved {
+			klog.V(1).Infof("Reserved: CPU: %d, RAM: %d, Region: %s, Start: %s, End: %s", r.ReservedCPU, r.ReservedRAM, r.Region, r.StartDate.String(), r.EndDate.String())
+		}
+	}
+
 	aws.Pricing = make(map[string]*AWSProductTerms)
 	aws.Pricing = make(map[string]*AWSProductTerms)
 	aws.ValidPricingKeys = make(map[string]bool)
 	aws.ValidPricingKeys = make(map[string]bool)
 	skusToKeys := make(map[string]string)
 	skusToKeys := make(map[string]string)
@@ -909,31 +922,39 @@ func (*AWS) AddServiceKey(formValues url.Values) error {
 	return ioutil.WriteFile("/var/configs/key.json", result, 0644)
 	return ioutil.WriteFile("/var/configs/key.json", result, 0644)
 }
 }
 
 
-// GetDisks returns the AWS disks backing PVs. Useful because sometimes k8s will not clean up PVs correctly. Requires a json config in /var/configs with key region.
-func (*AWS) GetDisks() ([]byte, error) {
-	jsonFile, err := os.Open("/var/configs/key.json")
-	if err == nil {
-		byteValue, _ := ioutil.ReadAll(jsonFile)
-		var result map[string]string
-		err := json.Unmarshal([]byte(byteValue), &result)
-		if err != nil {
-			return nil, err
-		}
-		err = os.Setenv(awsAccessKeyIDEnvVar, result["access_key_ID"])
-		if err != nil {
-			return nil, err
-		}
-		err = os.Setenv(awsAccessKeySecretEnvVar, result["secret_access_key"])
-		if err != nil {
-			return nil, err
+func configureAWSAuth(keyFile string) error {
+	jsonFile, err := os.Open(keyFile)
+	if err != nil {
+		if os.IsNotExist(err) {
+			klog.V(2).Infof("Using Default Credentials")
+			return nil
 		}
 		}
-	} else if os.IsNotExist(err) {
-		klog.V(2).Infof("Using Default Credentials")
-	} else {
-		return nil, err
+
+		return err
 	}
 	}
 	defer jsonFile.Close()
 	defer jsonFile.Close()
-	clusterConfig, err := os.Open("/var/configs/cluster.json")
+
+	byteValue, _ := ioutil.ReadAll(jsonFile)
+	var result map[string]string
+	err = json.Unmarshal([]byte(byteValue), &result)
+	if err != nil {
+		return err
+	}
+
+	err = os.Setenv(awsAccessKeyIDEnvVar, result["access_key_id"])
+	if err != nil {
+		return err
+	}
+
+	err = os.Setenv(awsAccessKeySecretEnvVar, result["secret_access_key"])
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+func getClusterConfig(ccFile string) (map[string]string, error) {
+	clusterConfig, err := os.Open(ccFile)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
@@ -947,7 +968,23 @@ func (*AWS) GetDisks() ([]byte, error) {
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
-	region := aws.String(clusterConf["region"])
+
+	return clusterConf, nil
+}
+
+// GetDisks returns the AWS disks backing PVs. Useful because sometimes k8s will not clean up PVs correctly. Requires a json config in /var/configs with key region.
+func (*AWS) GetDisks() ([]byte, error) {
+	err := configureAWSAuth("/var/configs/key.json")
+	if err != nil {
+		return nil, err
+	}
+
+	clusterConfig, err := getClusterConfig("/var/configs/cluster.json")
+	if err != nil {
+		return nil, err
+	}
+
+	region := aws.String(clusterConfig["region"])
 	c := &aws.Config{
 	c := &aws.Config{
 		Region: region,
 		Region: region,
 	}
 	}
@@ -1398,8 +1435,108 @@ func parseSpotData(bucket string, prefix string, projectID string, region string
 	return spots, nil
 	return spots, nil
 }
 }
 
 
+func (a *AWS) ApplyReservedInstancePricing(nodes map[string]*Node) {
+	numReserved := len(a.ReservedInstances)
+
+	// Early return if no reserved instance data loaded
+	if numReserved == 0 {
+		klog.V(1).Infof("[Reserved] No Reserved Instances")
+		return
+	}
+
+	now := time.Now()
+
+	instances := make(map[string][]*AWSReservedInstance)
+	for _, r := range a.ReservedInstances {
+		if now.Before(r.StartDate) || now.After(r.EndDate) {
+			klog.V(1).Infof("[Reserved] Skipped Reserved Instance due to dates")
+			continue
+		}
+
+		_, ok := instances[r.Region]
+		if !ok {
+			instances[r.Region] = []*AWSReservedInstance{r}
+		} else {
+			instances[r.Region] = append(instances[r.Region], r)
+		}
+	}
+
+	awsNodes := make(map[string]*v1.Node)
+	currentNodes := a.Clientset.GetAllNodes()
+
+	// Create a node name -> node map
+	for _, awsNode := range currentNodes {
+		awsNodes[awsNode.GetName()] = awsNode
+	}
+
+	// go through all provider nodes using k8s nodes for region
+	for nodeName, node := range nodes {
+		// Reset reserved allocation to prevent double allocation
+		node.Reserved = nil
+
+		kNode, ok := awsNodes[nodeName]
+		if !ok {
+			klog.V(1).Infof("[Reserved] Could not find K8s Node with name: %s", nodeName)
+			continue
+		}
+
+		nodeRegion, ok := kNode.Labels[v1.LabelZoneRegion]
+		if !ok {
+			klog.V(1).Infof("[Reserved] Could not find node region")
+			continue
+		}
+
+		reservedInstances, ok := instances[nodeRegion]
+		if !ok {
+			klog.V(1).Infof("[Reserved] Could not find counters for region: %s", nodeRegion)
+			continue
+		}
+
+		// Determine the InstanceType of the node
+		instanceType, ok := kNode.Labels["beta.kubernetes.io/instance-type"]
+		if !ok {
+			continue
+		}
+
+		ramBytes, err := strconv.ParseFloat(node.RAMBytes, 64)
+		if err != nil {
+			continue
+		}
+		ramGB := ramBytes / 1024 / 1024 / 1024
+
+		cpu, err := strconv.ParseFloat(node.VCPU, 64)
+		if err != nil {
+			continue
+		}
+
+		node.Reserved = &ReservedInstanceData{
+			ReservedCPU: 0,
+			ReservedRAM: 0,
+		}
+
+		for i, reservedInstance := range reservedInstances {
+			if reservedInstance.InstanceType == instanceType {
+				// Use < 0 to mark as ALL
+				node.Reserved.ReservedCPU = -1
+				node.Reserved.ReservedRAM = -1
+
+				// Set Costs -- Split evenly for RAM/CPU (resolves during aggregation)
+				costPerResource := reservedInstance.PricePerHour / (ramGB + cpu)
+				node.Reserved.CPUCost = costPerResource
+				node.Reserved.RAMCost = costPerResource
+
+				// Remove the reserve from the temporary slice to prevent
+				// being reallocated
+				instances[nodeRegion] = append(reservedInstances[:i], reservedInstances[i+1:]...)
+				break
+			}
+		}
+	}
+}
+
 type AWSReservedInstance struct {
 type AWSReservedInstance struct {
 	Zone           string
 	Zone           string
+	Region         string
 	InstanceType   string
 	InstanceType   string
 	InstanceCount  int64
 	InstanceCount  int64
 	InstanceTenacy string
 	InstanceTenacy string
@@ -1408,45 +1545,48 @@ type AWSReservedInstance struct {
 	PricePerHour   float64
 	PricePerHour   float64
 }
 }
 
 
-type AWSReservedCounter struct {
-	RemainingInstances int64
-	Instance           *AWSReservedInstance
+func (ari *AWSReservedInstance) String() string {
+	return fmt.Sprintf("[Zone: %s, Region: %s, Type: %s, Count: %d, Tenacy: %s, Start: %+v, End: %+v, Price: %f]", ari.Zone, ari.Region, ari.InstanceType, ari.InstanceCount, ari.InstanceTenacy, ari.StartDate, ari.EndDate, ari.PricePerHour)
 }
 }
 
 
-func (aws *AWS) ApplyReservedInstancePricing(nodes map[string]*Node) {
-
+func isReservedInstanceHourlyPrice(rc *ec2.RecurringCharge) bool {
+	return rc != nil && rc.Frequency != nil && *rc.Frequency == "Hourly"
 }
 }
 
 
-func (a *AWS) getReservedInstances() ([]*AWSReservedInstance, error) {
-	customPricing, err := a.GetConfig()
-	if err != nil {
-		return nil, err
-	}
-	if customPricing.ServiceKeyName != "" {
-		err = os.Setenv(awsAccessKeyIDEnvVar, customPricing.ServiceKeyName)
-		if err != nil {
-			return nil, err
-		}
-		err = os.Setenv(awsAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
-		if err != nil {
-			return nil, err
+func getReservedInstancePrice(ri *ec2.ReservedInstances) (float64, error) {
+	var pricePerHour float64
+	if len(ri.RecurringCharges) > 0 {
+		for _, rc := range ri.RecurringCharges {
+			if isReservedInstanceHourlyPrice(rc) {
+				pricePerHour = *rc.Amount
+				break
+			}
 		}
 		}
 	}
 	}
-	athenaConfigs, err := os.Open("/var/configs/athena.json")
-	if err != nil {
-		return nil, err
+
+	// If we're still unable to resolve hourly price, try fixed -> hourly
+	if pricePerHour == 0 {
+		if ri.Duration != nil && ri.FixedPrice != nil {
+			var durHours float64
+			durSeconds := float64(*ri.Duration)
+			fixedPrice := float64(*ri.FixedPrice)
+			if durSeconds != 0 && fixedPrice != 0 {
+				durHours = durSeconds / 60 / 60
+				pricePerHour = fixedPrice / durHours
+			}
+		}
 	}
 	}
-	defer athenaConfigs.Close()
-	b, err := ioutil.ReadAll(athenaConfigs)
-	if err != nil {
-		return nil, err
+
+	if pricePerHour == 0 {
+		return 0, fmt.Errorf("Failed to resolve an hourly price from FixedPrice or Recurring Costs")
 	}
 	}
-	var athenaConf map[string]string
-	json.Unmarshal([]byte(b), &athenaConf)
-	region := aws.String(customPricing.AthenaRegion)
 
 
+	return pricePerHour, nil
+}
+
+func getRegionReservedInstances(region string) ([]*AWSReservedInstance, error) {
 	c := &aws.Config{
 	c := &aws.Config{
-		Region: region,
+		Region: aws.String(region),
 	}
 	}
 	s := session.Must(session.NewSession(c))
 	s := session.Must(session.NewSession(c))
 	svc := ec2.New(s)
 	svc := ec2.New(s)
@@ -1458,16 +1598,57 @@ func (a *AWS) getReservedInstances() ([]*AWSReservedInstance, error) {
 
 
 	var reservedInstances []*AWSReservedInstance
 	var reservedInstances []*AWSReservedInstance
 	for _, ri := range response.ReservedInstances {
 	for _, ri := range response.ReservedInstances {
+		var zone string
+		if ri.AvailabilityZone != nil {
+			zone = *ri.AvailabilityZone
+		}
+		pricePerHour, err := getReservedInstancePrice(ri)
+		if err != nil {
+			klog.V(1).Infof("Error Resolving Price: %s", err.Error())
+			continue
+		}
 		reservedInstances = append(reservedInstances, &AWSReservedInstance{
 		reservedInstances = append(reservedInstances, &AWSReservedInstance{
-			Zone:           *ri.AvailabilityZone,
+			Zone:           zone,
+			Region:         region,
 			InstanceType:   *ri.InstanceType,
 			InstanceType:   *ri.InstanceType,
 			InstanceCount:  *ri.InstanceCount,
 			InstanceCount:  *ri.InstanceCount,
 			InstanceTenacy: *ri.InstanceTenancy,
 			InstanceTenacy: *ri.InstanceTenancy,
 			StartDate:      *ri.Start,
 			StartDate:      *ri.Start,
 			EndDate:        *ri.End,
 			EndDate:        *ri.End,
+			PricePerHour:   pricePerHour,
 		})
 		})
 	}
 	}
 
 
 	return reservedInstances, nil
 	return reservedInstances, nil
+}
 
 
+func (a *AWS) getReservedInstances() ([]*AWSReservedInstance, error) {
+	err := configureAWSAuth("/var/configs/key.json")
+	if err != nil {
+		return nil, err
+	}
+
+	var reservedInstances []*AWSReservedInstance
+
+	nodes := a.Clientset.GetAllNodes()
+	regionsSeen := make(map[string]bool)
+	for _, node := range nodes {
+		region, ok := node.Labels[v1.LabelZoneRegion]
+		if !ok {
+			continue
+		}
+		if regionsSeen[region] {
+			continue
+		}
+
+		ris, err := getRegionReservedInstances(region)
+		if err != nil {
+			klog.V(3).Infof("Error getting reserved instances: %s", err.Error())
+			continue
+		}
+		regionsSeen[region] = true
+		reservedInstances = append(reservedInstances, ris...)
+	}
+
+	return reservedInstances, nil
 }
 }