Просмотр исходного кода

Merge pull request #251 from kubecost/bolt/reserved-instances

Reserved Instance AWS and GCP Support
Matt Bolt 6 лет назад
Родитель
Сommit
7fe7fd6f74
2 измененных файлов с 285 добавлено и 52 удалено
  1. 275 46
      cloud/awsprovider.go
  2. 10 6
      cloud/gcpprovider.go

+ 275 - 46
cloud/awsprovider.go

@@ -35,6 +35,7 @@ import (
 
 const awsAccessKeyIDEnvVar = "AWS_ACCESS_KEY_ID"
 const awsAccessKeySecretEnvVar = "AWS_SECRET_ACCESS_KEY"
+const awsReservedInstancePricePerHour = 0.0287
 const supportedSpotFeedVersion = "1"
 const SpotInfoUpdateType = "spotinfo"
 const AthenaInfoUpdateType = "athenainfo"
@@ -59,6 +60,7 @@ type AWS struct {
 	SpotDataPrefix          string
 	ProjectID               string
 	DownloadPricingDataLock sync.RWMutex
+	ReservedInstances       []*AWSReservedInstance
 	*CustomProvider
 }
 
@@ -509,6 +511,17 @@ func (aws *AWS) DownloadPricingData() error {
 		pvkeys[key.Features()] = key
 	}
 
+	reserved, err := aws.getReservedInstances()
+	if err != nil {
+		klog.V(1).Infof("Failed to lookup reserved instance data: %s", err.Error())
+	} else {
+		klog.V(1).Infof("Found %d reserved instances", len(reserved))
+		aws.ReservedInstances = reserved
+		for _, r := range reserved {
+			klog.V(1).Infof("%s", r)
+		}
+	}
+
 	aws.Pricing = make(map[string]*AWSProductTerms)
 	aws.ValidPricingKeys = make(map[string]bool)
 	skusToKeys := make(map[string]string)
@@ -912,31 +925,39 @@ func (*AWS) AddServiceKey(formValues url.Values) error {
 	return ioutil.WriteFile("/var/configs/key.json", result, 0644)
 }
 
-// GetDisks returns the AWS disks backing PVs. Useful because sometimes k8s will not clean up PVs correctly. Requires a json config in /var/configs with key region.
-func (*AWS) GetDisks() ([]byte, error) {
-	jsonFile, err := os.Open("/var/configs/key.json")
-	if err == nil {
-		byteValue, _ := ioutil.ReadAll(jsonFile)
-		var result map[string]string
-		err := json.Unmarshal([]byte(byteValue), &result)
-		if err != nil {
-			return nil, err
-		}
-		err = os.Setenv(awsAccessKeyIDEnvVar, result["access_key_ID"])
-		if err != nil {
-			return nil, err
-		}
-		err = os.Setenv(awsAccessKeySecretEnvVar, result["secret_access_key"])
-		if err != nil {
-			return nil, err
+func configureAWSAuth(keyFile string) error {
+	jsonFile, err := os.Open(keyFile)
+	if err != nil {
+		if os.IsNotExist(err) {
+			klog.V(2).Infof("Using Default Credentials")
+			return nil
 		}
-	} else if os.IsNotExist(err) {
-		klog.V(2).Infof("Using Default Credentials")
-	} else {
-		return nil, err
+
+		return err
 	}
 	defer jsonFile.Close()
-	clusterConfig, err := os.Open("/var/configs/cluster.json")
+
+	byteValue, _ := ioutil.ReadAll(jsonFile)
+	var result map[string]string
+	err = json.Unmarshal([]byte(byteValue), &result)
+	if err != nil {
+		return err
+	}
+
+	err = os.Setenv(awsAccessKeyIDEnvVar, result["awsServiceKeyName"])
+	if err != nil {
+		return err
+	}
+
+	err = os.Setenv(awsAccessKeySecretEnvVar, result["awsServiceKeySecret"])
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+func getClusterConfig(ccFile string) (map[string]string, error) {
+	clusterConfig, err := os.Open(ccFile)
 	if err != nil {
 		return nil, err
 	}
@@ -950,7 +971,23 @@ func (*AWS) GetDisks() ([]byte, error) {
 	if err != nil {
 		return nil, err
 	}
-	region := aws.String(clusterConf["region"])
+
+	return clusterConf, nil
+}
+
+// GetDisks returns the AWS disks backing PVs. Useful because sometimes k8s will not clean up PVs correctly. Requires a json config in /var/configs with key region.
+func (*AWS) GetDisks() ([]byte, error) {
+	err := configureAWSAuth("/var/configs/key.json")
+	if err != nil {
+		return nil, err
+	}
+
+	clusterConfig, err := getClusterConfig("/var/configs/cluster.json")
+	if err != nil {
+		return nil, err
+	}
+
+	region := aws.String(clusterConfig["region"])
 	c := &aws.Config{
 		Region: region,
 	}
@@ -1401,45 +1438,237 @@ func parseSpotData(bucket string, prefix string, projectID string, region string
 	return spots, nil
 }
 
-func (aws *AWS) ApplyReservedInstancePricing(nodes map[string]*Node) {
+func (a *AWS) ApplyReservedInstancePricing(nodes map[string]*Node) {
+	numReserved := len(a.ReservedInstances)
 
-}
+	// Early return if no reserved instance data loaded
+	if numReserved == 0 {
+		klog.V(1).Infof("[Reserved] No Reserved Instances")
+		return
+	}
 
-/*
-func (aws *AWS) getReservedInstances() ([]interface{}, error) {
-	customPricing, err := a.GetConfig()
+	cfg, err := a.GetConfig()
+	defaultCPU, err := strconv.ParseFloat(cfg.CPU, 64)
 	if err != nil {
-		return nil, err
+		klog.V(3).Infof("Could not parse default cpu price")
+		defaultCPU = 0.031611
 	}
-	if customPricing.ServiceKeyName != "" {
-		err = os.Setenv(awsAccessKeyIDEnvVar, customPricing.ServiceKeyName)
+
+	defaultRAM, err := strconv.ParseFloat(cfg.RAM, 64)
+	if err != nil {
+		klog.V(3).Infof("Could not parse default ram price")
+		defaultRAM = 0.004237
+	}
+
+	cpuToRAMRatio := defaultCPU / defaultRAM
+
+	now := time.Now()
+
+	instances := make(map[string][]*AWSReservedInstance)
+	for _, r := range a.ReservedInstances {
+		if now.Before(r.StartDate) || now.After(r.EndDate) {
+			klog.V(1).Infof("[Reserved] Skipped Reserved Instance due to dates")
+			continue
+		}
+
+		_, ok := instances[r.Region]
+		if !ok {
+			instances[r.Region] = []*AWSReservedInstance{r}
+		} else {
+			instances[r.Region] = append(instances[r.Region], r)
+		}
+	}
+
+	awsNodes := make(map[string]*v1.Node)
+	currentNodes := a.Clientset.GetAllNodes()
+
+	// Create a node name -> node map
+	for _, awsNode := range currentNodes {
+		awsNodes[awsNode.GetName()] = awsNode
+	}
+
+	// go through all provider nodes using k8s nodes for region
+	for nodeName, node := range nodes {
+		// Reset reserved allocation to prevent double allocation
+		node.Reserved = nil
+
+		kNode, ok := awsNodes[nodeName]
+		if !ok {
+			klog.V(1).Infof("[Reserved] Could not find K8s Node with name: %s", nodeName)
+			continue
+		}
+
+		nodeRegion, ok := kNode.Labels[v1.LabelZoneRegion]
+		if !ok {
+			klog.V(1).Infof("[Reserved] Could not find node region")
+			continue
+		}
+
+		reservedInstances, ok := instances[nodeRegion]
+		if !ok {
+			klog.V(1).Infof("[Reserved] Could not find counters for region: %s", nodeRegion)
+			continue
+		}
+
+		// Determine the InstanceType of the node
+		instanceType, ok := kNode.Labels["beta.kubernetes.io/instance-type"]
+		if !ok {
+			continue
+		}
+
+		ramBytes, err := strconv.ParseFloat(node.RAMBytes, 64)
 		if err != nil {
-			return nil, err
+			continue
 		}
-		err = os.Setenv(awsAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
+		ramGB := ramBytes / 1024 / 1024 / 1024
+
+		cpu, err := strconv.ParseFloat(node.VCPU, 64)
 		if err != nil {
-			return nil, err
+			continue
+		}
+
+		ramMultiple := cpu*cpuToRAMRatio + ramGB
+
+		node.Reserved = &ReservedInstanceData{
+			ReservedCPU: 0,
+			ReservedRAM: 0,
+		}
+
+		for i, reservedInstance := range reservedInstances {
+			if reservedInstance.InstanceType == instanceType {
+				// Use < 0 to mark as ALL
+				node.Reserved.ReservedCPU = -1
+				node.Reserved.ReservedRAM = -1
+
+				// Set Costs based on CPU/RAM ratios
+				ramPrice := reservedInstance.PricePerHour / ramMultiple
+				node.Reserved.CPUCost = ramPrice * cpuToRAMRatio
+				node.Reserved.RAMCost = ramPrice
+
+				// Remove the reserve from the temporary slice to prevent
+				// being reallocated
+				instances[nodeRegion] = append(reservedInstances[:i], reservedInstances[i+1:]...)
+				break
+			}
 		}
 	}
-	athenaConfigs, err := os.Open("/var/configs/athena.json")
+}
+
+type AWSReservedInstance struct {
+	Zone           string
+	Region         string
+	InstanceType   string
+	InstanceCount  int64
+	InstanceTenacy string
+	StartDate      time.Time
+	EndDate        time.Time
+	PricePerHour   float64
+}
+
+func (ari *AWSReservedInstance) String() string {
+	return fmt.Sprintf("[Zone: %s, Region: %s, Type: %s, Count: %d, Tenacy: %s, Start: %+v, End: %+v, Price: %f]", ari.Zone, ari.Region, ari.InstanceType, ari.InstanceCount, ari.InstanceTenacy, ari.StartDate, ari.EndDate, ari.PricePerHour)
+}
+
+func isReservedInstanceHourlyPrice(rc *ec2.RecurringCharge) bool {
+	return rc != nil && rc.Frequency != nil && *rc.Frequency == "Hourly"
+}
+
+func getReservedInstancePrice(ri *ec2.ReservedInstances) (float64, error) {
+	var pricePerHour float64
+	if len(ri.RecurringCharges) > 0 {
+		for _, rc := range ri.RecurringCharges {
+			if isReservedInstanceHourlyPrice(rc) {
+				pricePerHour = *rc.Amount
+				break
+			}
+		}
+	}
+
+	// If we're still unable to resolve hourly price, try fixed -> hourly
+	if pricePerHour == 0 {
+		if ri.Duration != nil && ri.FixedPrice != nil {
+			var durHours float64
+			durSeconds := float64(*ri.Duration)
+			fixedPrice := float64(*ri.FixedPrice)
+			if durSeconds != 0 && fixedPrice != 0 {
+				durHours = durSeconds / 60 / 60
+				pricePerHour = fixedPrice / durHours
+			}
+		}
+	}
+
+	if pricePerHour == 0 {
+		return 0, fmt.Errorf("Failed to resolve an hourly price from FixedPrice or Recurring Costs")
+	}
+
+	return pricePerHour, nil
+}
+
+func getRegionReservedInstances(region string) ([]*AWSReservedInstance, error) {
+	c := &aws.Config{
+		Region: aws.String(region),
+	}
+	s := session.Must(session.NewSession(c))
+	svc := ec2.New(s)
+
+	response, err := svc.DescribeReservedInstances(&ec2.DescribeReservedInstancesInput{})
 	if err != nil {
 		return nil, err
 	}
-	defer athenaConfigs.Close()
-	b, err := ioutil.ReadAll(athenaConfigs)
+
+	var reservedInstances []*AWSReservedInstance
+	for _, ri := range response.ReservedInstances {
+		var zone string
+		if ri.AvailabilityZone != nil {
+			zone = *ri.AvailabilityZone
+		}
+		pricePerHour, err := getReservedInstancePrice(ri)
+		if err != nil {
+			klog.V(1).Infof("Error Resolving Price: %s", err.Error())
+			continue
+		}
+		reservedInstances = append(reservedInstances, &AWSReservedInstance{
+			Zone:           zone,
+			Region:         region,
+			InstanceType:   *ri.InstanceType,
+			InstanceCount:  *ri.InstanceCount,
+			InstanceTenacy: *ri.InstanceTenancy,
+			StartDate:      *ri.Start,
+			EndDate:        *ri.End,
+			PricePerHour:   pricePerHour,
+		})
+	}
+
+	return reservedInstances, nil
+}
+
+func (a *AWS) getReservedInstances() ([]*AWSReservedInstance, error) {
+	err := configureAWSAuth("/var/configs/aws.json")
 	if err != nil {
 		return nil, err
 	}
-	var athenaConf map[string]string
-	json.Unmarshal([]byte(b), &athenaConf)
-	region := aws.String(customPricing.AthenaRegion)
 
-	c := &aws.Config{
-		Region: region,
+	var reservedInstances []*AWSReservedInstance
+
+	nodes := a.Clientset.GetAllNodes()
+	regionsSeen := make(map[string]bool)
+	for _, node := range nodes {
+		region, ok := node.Labels[v1.LabelZoneRegion]
+		if !ok {
+			continue
+		}
+		if regionsSeen[region] {
+			continue
+		}
+
+		ris, err := getRegionReservedInstances(region)
+		if err != nil {
+			klog.V(3).Infof("Error getting reserved instances: %s", err.Error())
+			continue
+		}
+		regionsSeen[region] = true
+		reservedInstances = append(reservedInstances, ris...)
 	}
-	s := session.Must(session.NewSession(c))
-	svc := ec2.New(s)
 
-	svc.DescribeReservedInstances()
+	return reservedInstances, nil
 }
-*/

+ 10 - 6
cloud/gcpprovider.go

@@ -710,7 +710,7 @@ func (gcp *GCP) DownloadPricingData() error {
 		klog.V(1).Infof("Found %d reserved instances", len(reserved))
 		gcp.ReservedInstances = reserved
 		for _, r := range reserved {
-			klog.V(1).Infof("Reserved: CPU: %d, RAM: %d, Region: %s, Start: %s, End: %s", r.ReservedCPU, r.ReservedRAM, r.Region, r.StartDate.String(), r.EndDate.String())
+			klog.V(1).Infof("%s", r)
 		}
 	}
 
@@ -783,14 +783,18 @@ type GCPReservedInstance struct {
 	Region      string
 }
 
-type ReservedCounter struct {
+func (r *GCPReservedInstance) String() string {
+	return fmt.Sprintf("[CPU: %d, RAM: %d, Region: %s, Start: %s, End: %s]", r.ReservedCPU, r.ReservedRAM, r.Region, r.StartDate.String(), r.EndDate.String())
+}
+
+type GCPReservedCounter struct {
 	RemainingCPU int64
 	RemainingRAM int64
 	Instance     *GCPReservedInstance
 }
 
-func newReservedCounter(instance *GCPReservedInstance) *ReservedCounter {
-	return &ReservedCounter{
+func newReservedCounter(instance *GCPReservedInstance) *GCPReservedCounter {
+	return &GCPReservedCounter{
 		RemainingCPU: instance.ReservedCPU,
 		RemainingRAM: instance.ReservedRAM,
 		Instance:     instance,
@@ -822,7 +826,7 @@ func (gcp *GCP) ApplyReservedInstancePricing(nodes map[string]*Node) {
 
 	now := time.Now()
 
-	counters := make(map[string][]*ReservedCounter)
+	counters := make(map[string][]*GCPReservedCounter)
 	for _, r := range gcp.ReservedInstances {
 		if now.Before(r.StartDate) || now.After(r.EndDate) {
 			klog.V(1).Infof("[Reserved] Skipped Reserved Instance due to dates")
@@ -832,7 +836,7 @@ func (gcp *GCP) ApplyReservedInstancePricing(nodes map[string]*Node) {
 		_, ok := counters[r.Region]
 		counter := newReservedCounter(r)
 		if !ok {
-			counters[r.Region] = []*ReservedCounter{counter}
+			counters[r.Region] = []*GCPReservedCounter{counter}
 		} else {
 			counters[r.Region] = append(counters[r.Region], counter)
 		}