فهرست منبع

merge pricing fixes

Ajay Tripathy 5 سال پیش
والد
کامیت
0a3e2bc558
1فایلهای تغییر یافته به همراه38 افزوده شده و 329 حذف شده
  1. 38 329
      pkg/cloud/awsprovider.go

+ 38 - 329
pkg/cloud/awsprovider.go

@@ -136,8 +136,6 @@ type AWS struct {
 	BaseSpotRAMPrice            string
 	BaseSpotRAMPrice            string
 	SpotLabelName               string
 	SpotLabelName               string
 	SpotLabelValue              string
 	SpotLabelValue              string
-	ServiceKeyName              string
-	ServiceKeySecret            string
 	SpotDataRegion              string
 	SpotDataRegion              string
 	SpotDataBucket              string
 	SpotDataBucket              string
 	SpotDataPrefix              string
 	SpotDataPrefix              string
@@ -590,9 +588,7 @@ func (aws *AWS) DownloadPricingData() error {
 	aws.ProjectID = c.ProjectID
 	aws.ProjectID = c.ProjectID
 	aws.SpotDataRegion = c.SpotDataRegion
 	aws.SpotDataRegion = c.SpotDataRegion
 
 
-	skn, sks := aws.getAWSAuth(false, c)
-	aws.ServiceKeyName = skn
-	aws.ServiceKeySecret = sks
+	aws.ConfigureAuthWith(c) // load aws authentication from configuration or secret
 
 
 	if len(aws.SpotDataBucket) != 0 && len(aws.ProjectID) == 0 {
 	if len(aws.SpotDataBucket) != 0 && len(aws.ProjectID) == 0 {
 		klog.V(1).Infof("using SpotDataBucket \"%s\" without ProjectID will not end well", aws.SpotDataBucket)
 		klog.V(1).Infof("using SpotDataBucket \"%s\" without ProjectID will not end well", aws.SpotDataBucket)
@@ -861,7 +857,7 @@ func (aws *AWS) refreshSpotPricing(force bool) {
 		return
 		return
 	}
 	}
 
 
-	sp, err := aws.parseSpotData(aws.SpotDataBucket, aws.SpotDataPrefix, aws.ProjectID, aws.SpotDataRegion, aws.ServiceKeyName, aws.ServiceKeySecret)
+	sp, err := aws.parseSpotData(aws.SpotDataBucket, aws.SpotDataPrefix, aws.ProjectID, aws.SpotDataRegion)
 	if err != nil {
 	if err != nil {
 		klog.V(1).Infof("Skipping AWS spot data download: %s", err.Error())
 		klog.V(1).Infof("Skipping AWS spot data download: %s", err.Error())
 		aws.SpotPricingStatus = err.Error()
 		aws.SpotPricingStatus = err.Error()
@@ -1172,6 +1168,31 @@ func (awsProvider *AWS) ClusterInfo() (map[string]string, error) {
 	return makeStructure(defaultClusterName)
 	return makeStructure(defaultClusterName)
 }
 }
 
 
+// updates the authentication to the latest values (via config or secret)
+func (aws *AWS) ConfigureAuth() error {
+	c, err := aws.Config.GetCustomPricingData()
+	if err != nil {
+		klog.V(1).Infof("Error downloading default pricing data: %s", err.Error())
+	}
+	return aws.ConfigureAuthWith(c)
+}
+
+// updates the authentication to the latest values (via config or secret)
+func (aws *AWS) ConfigureAuthWith(config *CustomPricing) error {
+	accessKeyID, accessKeySecret := aws.getAWSAuth(false, config)
+	if accessKeyID != "" && accessKeySecret != "" { // credentials may exist on the actual AWS node-- if so, use those. If not, override with the service key
+		err := env.Set(env.AWSAccessKeyIDEnvVar, accessKeyID)
+		if err != nil {
+			return err
+		}
+		err = env.Set(env.AWSAccessKeySecretEnvVar, accessKeySecret)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
 // Gets the aws key id and secret
 // Gets the aws key id and secret
 func (aws *AWS) getAWSAuth(forceReload bool, cp *CustomPricing) (string, string) {
 func (aws *AWS) getAWSAuth(forceReload bool, cp *CustomPricing) (string, string) {
 	if aws.ServiceAccountChecks == nil { // safety in case checks don't exist
 	if aws.ServiceAccountChecks == nil { // safety in case checks don't exist
@@ -1241,22 +1262,6 @@ func (aws *AWS) loadAWSAuthSecret(force bool) (*AWSAccessKey, error) {
 	return awsSecret, nil
 	return awsSecret, nil
 }
 }
 
 
-func (aws *AWS) configureAWSAuth() error {
-	accessKeyID := aws.ServiceKeyName
-	accessKeySecret := aws.ServiceKeySecret
-	if accessKeyID != "" && accessKeySecret != "" { // credentials may exist on the actual AWS node-- if so, use those. If not, override with the service key
-		err := env.Set(env.AWSAccessKeyIDEnvVar, accessKeyID)
-		if err != nil {
-			return err
-		}
-		err = env.Set(env.AWSAccessKeySecretEnvVar, accessKeySecret)
-		if err != nil {
-			return err
-		}
-	}
-	return nil
-}
-
 func getClusterConfig(ccFile string) (map[string]string, error) {
 func getClusterConfig(ccFile string) (map[string]string, error) {
 	clusterConfig, err := os.Open(ccFile)
 	clusterConfig, err := os.Open(ccFile)
 	if err != nil {
 	if err != nil {
@@ -1276,43 +1281,6 @@ func getClusterConfig(ccFile string) (map[string]string, error) {
 	return clusterConf, nil
 	return clusterConf, nil
 }
 }
 
 
-// SetKeyEnv ensures that the two environment variables necessary to configure
-// a new AWS Session are set.
-func (a *AWS) SetKeyEnv() error {
-	// TODO add this to the helm chart, mirroring the cost-model
-	// configPath := env.GetConfigPath()
-	configPath := defaultConfigPath
-	path := configPath + "aws.json"
-
-	if _, err := os.Stat(path); err != nil {
-		if os.IsNotExist(err) {
-			log.DedupedErrorf(5, "file %s does not exist", path)
-		} else {
-			log.DedupedErrorf(5, "other file open error: %s", err)
-		}
-		return err
-	}
-
-	jsonFile, err := os.Open(path)
-	defer jsonFile.Close()
-
-	configMap := map[string]string{}
-	configBytes, err := ioutil.ReadAll(jsonFile)
-	if err != nil {
-		return err
-	}
-	json.Unmarshal([]byte(configBytes), &configMap)
-
-	keyName := configMap["awsServiceKeyName"]
-	keySecret := configMap["awsServiceKeySecret"]
-
-	// These are required before calling NewEnvCredentials below
-	env.Set(env.AWSAccessKeyIDEnvVar, keyName)
-	env.Set(env.AWSAccessKeySecretEnvVar, keySecret)
-
-	return nil
-}
-
 func (a *AWS) getAddressesForRegion(region string) (*ec2.DescribeAddressesOutput, error) {
 func (a *AWS) getAddressesForRegion(region string) (*ec2.DescribeAddressesOutput, error) {
 	sess, err := session.NewSession(&aws.Config{
 	sess, err := session.NewSession(&aws.Config{
 		Region:      aws.String(region),
 		Region:      aws.String(region),
@@ -1327,9 +1295,7 @@ func (a *AWS) getAddressesForRegion(region string) (*ec2.DescribeAddressesOutput
 }
 }
 
 
 func (a *AWS) GetAddresses() ([]byte, error) {
 func (a *AWS) GetAddresses() ([]byte, error) {
-	if err := a.SetKeyEnv(); err != nil {
-		return nil, err
-	}
+	a.ConfigureAuth() // load authentication data into env vars
 
 
 	addressCh := make(chan *ec2.DescribeAddressesOutput, len(awsRegions))
 	addressCh := make(chan *ec2.DescribeAddressesOutput, len(awsRegions))
 	errorCh := make(chan error, len(awsRegions))
 	errorCh := make(chan error, len(awsRegions))
@@ -1414,9 +1380,7 @@ func (a *AWS) getDisksForRegion(region string, maxResults int64, nextToken *stri
 
 
 // GetDisks returns the AWS disks backing PVs. Useful because sometimes k8s will not clean up PVs correctly. Requires a json config in /var/configs with key region.
 // GetDisks returns the AWS disks backing PVs. Useful because sometimes k8s will not clean up PVs correctly. Requires a json config in /var/configs with key region.
 func (a *AWS) GetDisks() ([]byte, error) {
 func (a *AWS) GetDisks() ([]byte, error) {
-	if err := a.SetKeyEnv(); err != nil {
-		return nil, err
-	}
+	a.ConfigureAuth() // load authentication data into env vars
 
 
 	volumeCh := make(chan *ec2.DescribeVolumesOutput, len(awsRegions))
 	volumeCh := make(chan *ec2.DescribeVolumesOutput, len(awsRegions))
 	errorCh := make(chan error, len(awsRegions))
 	errorCh := make(chan error, len(awsRegions))
@@ -1637,16 +1601,9 @@ func (a *AWS) QueryAthenaBillingData(query string) (*athena.GetQueryResultsOutpu
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
-	if customPricing.ServiceKeyName != "" {
-		err = env.Set(env.AWSAccessKeyIDEnvVar, customPricing.ServiceKeyName)
-		if err != nil {
-			return nil, err
-		}
-		err = env.Set(env.AWSAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
-		if err != nil {
-			return nil, err
-		}
-	}
+
+	a.ConfigureAuthWith(customPricing) // load aws authentication from configuration or secret
+
 	region := aws.String(customPricing.AthenaRegion)
 	region := aws.String(customPricing.AthenaRegion)
 	resultsBucket := customPricing.AthenaBucketName
 	resultsBucket := customPricing.AthenaBucketName
 	database := customPricing.AthenaDatabase
 	database := customPricing.AthenaDatabase
@@ -1960,16 +1917,9 @@ func (a *AWS) QuerySQL(query string) ([]byte, error) {
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
-	if customPricing.ServiceKeyName != "" {
-		err = env.Set(env.AWSAccessKeyIDEnvVar, customPricing.ServiceKeyName)
-		if err != nil {
-			return nil, err
-		}
-		err = env.Set(env.AWSAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
-		if err != nil {
-			return nil, err
-		}
-	}
+
+	a.ConfigureAuthWith(customPricing) // load aws authentication from configuration or secret
+
 	athenaConfigs, err := os.Open("/var/configs/athena.json")
 	athenaConfigs, err := os.Open("/var/configs/athena.json")
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
@@ -2084,22 +2034,13 @@ func (f fnames) Less(i, j int) bool {
 	return t1.Before(t2)
 	return t1.Before(t2)
 }
 }
 
 
-func (a *AWS) parseSpotData(bucket string, prefix string, projectID string, region string, accessKeyID string, accessKeySecret string) (map[string]*spotInfo, error) {
+func (a *AWS) parseSpotData(bucket string, prefix string, projectID string, region string) (map[string]*spotInfo, error) {
 	if a.ServiceAccountChecks == nil { // Set up checks to store error/success states
 	if a.ServiceAccountChecks == nil { // Set up checks to store error/success states
 		a.ServiceAccountChecks = make(map[string]*ServiceAccountCheck)
 		a.ServiceAccountChecks = make(map[string]*ServiceAccountCheck)
 	}
 	}
 
 
-	// credentials may exist on the actual AWS node-- if so, use those. If not, override with the service key
-	if accessKeyID != "" && accessKeySecret != "" {
-		err := env.Set(env.AWSAccessKeyIDEnvVar, accessKeyID)
-		if err != nil {
-			return nil, err
-		}
-		err = env.Set(env.AWSAccessKeySecretEnvVar, accessKeySecret)
-		if err != nil {
-			return nil, err
-		}
-	}
+	a.ConfigureAuth() // configure aws api authentication by setting env vars
+
 	s3Prefix := projectID
 	s3Prefix := projectID
 	if len(prefix) != 0 {
 	if len(prefix) != 0 {
 		s3Prefix = prefix + "/" + s3Prefix
 		s3Prefix = prefix + "/" + s3Prefix
@@ -2254,239 +2195,7 @@ func (a *AWS) parseSpotData(bucket string, prefix string, projectID string, regi
 }
 }
 
 
 func (a *AWS) ApplyReservedInstancePricing(nodes map[string]*Node) {
 func (a *AWS) ApplyReservedInstancePricing(nodes map[string]*Node) {
-	/*
-		numReserved := len(a.ReservedInstances)
-
-		// Early return if no reserved instance data loaded
-		if numReserved == 0 {
-			klog.V(4).Infof("[Reserved] No Reserved Instances")
-			return
-		}
-
-		cfg, err := a.GetConfig()
-		defaultCPU, err := strconv.ParseFloat(cfg.CPU, 64)
-		if err != nil {
-			klog.V(3).Infof("Could not parse default cpu price")
-			defaultCPU = 0.031611
-		}
-
-		defaultRAM, err := strconv.ParseFloat(cfg.RAM, 64)
-		if err != nil {
-			klog.V(3).Infof("Could not parse default ram price")
-			defaultRAM = 0.004237
-		}
-
-		cpuToRAMRatio := defaultCPU / defaultRAM
-
-		now := time.Now()
-
-		instances := make(map[string][]*AWSReservedInstance)
-		for _, r := range a.ReservedInstances {
-			if now.Before(r.StartDate) || now.After(r.EndDate) {
-				klog.V(1).Infof("[Reserved] Skipped Reserved Instance due to dates")
-				continue
-			}
-
-			_, ok := instances[r.Region]
-			if !ok {
-				instances[r.Region] = []*AWSReservedInstance{r}
-			} else {
-				instances[r.Region] = append(instances[r.Region], r)
-			}
-		}
-
-		awsNodes := make(map[string]*v1.Node)
-		currentNodes := a.Clientset.GetAllNodes()
-
-		// Create a node name -> node map
-		for _, awsNode := range currentNodes {
-			awsNodes[awsNode.GetName()] = awsNode
-		}
-
-		// go through all provider nodes using k8s nodes for region
-		for nodeName, node := range nodes {
-			// Reset reserved allocation to prevent double allocation
-			node.Reserved = nil
-
-			kNode, ok := awsNodes[nodeName]
-			if !ok {
-				klog.V(1).Infof("[Reserved] Could not find K8s Node with name: %s", nodeName)
-				continue
-			}
-
-			nodeRegion, ok := kNode.Labels[v1.LabelZoneRegion]
-			if !ok {
-				klog.V(1).Infof("[Reserved] Could not find node region")
-				continue
-			}
-
-			reservedInstances, ok := instances[nodeRegion]
-			if !ok {
-				klog.V(1).Infof("[Reserved] Could not find counters for region: %s", nodeRegion)
-				continue
-			}
-
-			// Determine the InstanceType of the node
-			instanceType, ok := kNode.Labels["beta.kubernetes.io/instance-type"]
-			if !ok {
-				continue
-			}
-
-			ramBytes, err := strconv.ParseFloat(node.RAMBytes, 64)
-			if err != nil {
-				continue
-			}
-			ramGB := ramBytes / 1024 / 1024 / 1024
-
-			cpu, err := strconv.ParseFloat(node.VCPU, 64)
-			if err != nil {
-				continue
-			}
-
-			ramMultiple := cpu*cpuToRAMRatio + ramGB
-
-			node.Reserved = &ReservedInstanceData{
-				ReservedCPU: 0,
-				ReservedRAM: 0,
-			}
-
-			for i, reservedInstance := range reservedInstances {
-				if reservedInstance.InstanceType == instanceType {
-					// Use < 0 to mark as ALL
-					node.Reserved.ReservedCPU = -1
-					node.Reserved.ReservedRAM = -1
-
-					// Set Costs based on CPU/RAM ratios
-					ramPrice := reservedInstance.PricePerHour / ramMultiple
-					node.Reserved.CPUCost = ramPrice * cpuToRAMRatio
-					node.Reserved.RAMCost = ramPrice
-
-					// Remove the reserve from the temporary slice to prevent
-					// being reallocated
-					instances[nodeRegion] = append(reservedInstances[:i], reservedInstances[i+1:]...)
-					break
-				}
-			}
-		}*/
-}
-
-type AWSReservedInstance struct {
-	Zone           string
-	Region         string
-	InstanceType   string
-	InstanceCount  int64
-	InstanceTenacy string
-	StartDate      time.Time
-	EndDate        time.Time
-	PricePerHour   float64
-}
-
-func (ari *AWSReservedInstance) String() string {
-	return fmt.Sprintf("[Zone: %s, Region: %s, Type: %s, Count: %d, Tenacy: %s, Start: %+v, End: %+v, Price: %f]", ari.Zone, ari.Region, ari.InstanceType, ari.InstanceCount, ari.InstanceTenacy, ari.StartDate, ari.EndDate, ari.PricePerHour)
-}
-
-func isReservedInstanceHourlyPrice(rc *ec2.RecurringCharge) bool {
-	return rc != nil && rc.Frequency != nil && *rc.Frequency == "Hourly"
-}
-
-func getReservedInstancePrice(ri *ec2.ReservedInstances) (float64, error) {
-	var pricePerHour float64
-	if len(ri.RecurringCharges) > 0 {
-		for _, rc := range ri.RecurringCharges {
-			if isReservedInstanceHourlyPrice(rc) {
-				pricePerHour = *rc.Amount
-				break
-			}
-		}
-	}
-
-	// If we're still unable to resolve hourly price, try fixed -> hourly
-	if pricePerHour == 0 {
-		if ri.Duration != nil && ri.FixedPrice != nil {
-			var durHours float64
-			durSeconds := float64(*ri.Duration)
-			fixedPrice := float64(*ri.FixedPrice)
-			if durSeconds != 0 && fixedPrice != 0 {
-				durHours = durSeconds / 60 / 60
-				pricePerHour = fixedPrice / durHours
-			}
-		}
-	}
-
-	if pricePerHour == 0 {
-		return 0, fmt.Errorf("Failed to resolve an hourly price from FixedPrice or Recurring Costs")
-	}
-
-	return pricePerHour, nil
-}
-
-func getRegionReservedInstances(region string) ([]*AWSReservedInstance, error) {
-	c := &aws.Config{
-		Region: aws.String(region),
-	}
-	s := session.Must(session.NewSession(c))
-	svc := ec2.New(s)
-
-	response, err := svc.DescribeReservedInstances(&ec2.DescribeReservedInstancesInput{})
-	if err != nil {
-		return nil, err
-	}
-
-	var reservedInstances []*AWSReservedInstance
-	for _, ri := range response.ReservedInstances {
-		var zone string
-		if ri.AvailabilityZone != nil {
-			zone = *ri.AvailabilityZone
-		}
-		pricePerHour, err := getReservedInstancePrice(ri)
-		if err != nil {
-			klog.V(1).Infof("Error Resolving Price: %s", err.Error())
-			continue
-		}
-		reservedInstances = append(reservedInstances, &AWSReservedInstance{
-			Zone:           zone,
-			Region:         region,
-			InstanceType:   *ri.InstanceType,
-			InstanceCount:  *ri.InstanceCount,
-			InstanceTenacy: *ri.InstanceTenancy,
-			StartDate:      *ri.Start,
-			EndDate:        *ri.End,
-			PricePerHour:   pricePerHour,
-		})
-	}
-
-	return reservedInstances, nil
-}
-
-func (a *AWS) getReservedInstances() ([]*AWSReservedInstance, error) {
-	err := a.configureAWSAuth()
-	if err != nil {
-		return nil, fmt.Errorf("Error Configuring aws auth: %s", err.Error())
-	}
-
-	var reservedInstances []*AWSReservedInstance
-
-	nodes := a.Clientset.GetAllNodes()
-	regionsSeen := make(map[string]bool)
-	for _, node := range nodes {
-		region, ok := node.Labels[v1.LabelZoneRegion]
-		if !ok {
-			continue
-		}
-		if regionsSeen[region] {
-			continue
-		}
-
-		ris, err := getRegionReservedInstances(region)
-		if err != nil {
-			klog.V(3).Infof("Error getting reserved instances: %s", err.Error())
-			continue
-		}
-		regionsSeen[region] = true
-		reservedInstances = append(reservedInstances, ris...)
-	}
 
 
-	return reservedInstances, nil
 }
 }
 
 
 func (a *AWS) ServiceAccountStatus() *ServiceAccountStatus {
 func (a *AWS) ServiceAccountStatus() *ServiceAccountStatus {