Răsfoiți Sursa

initial commit

AjayTripathy 6 ani în urmă
părinte
comite
63e024812e
1 a modificat fișierele cu 235 adăugiri și 96 ștergeri
  1. 235 96
      pkg/cloud/awsprovider.go

+ 235 - 96
pkg/cloud/awsprovider.go

@@ -44,6 +44,7 @@ const AthenaInfoUpdateType = "athenainfo"
 type AWS struct {
 type AWS struct {
 	Pricing                 map[string]*AWSProductTerms
 	Pricing                 map[string]*AWSProductTerms
 	SpotPricingByInstanceID map[string]*spotInfo
 	SpotPricingByInstanceID map[string]*spotInfo
+	RIPricingByInstanceID   map[string]*RIData
 	ValidPricingKeys        map[string]bool
 	ValidPricingKeys        map[string]bool
 	Clientset               clustercache.ClusterCache
 	Clientset               clustercache.ClusterCache
 	BaseCPUPrice            string
 	BaseCPUPrice            string
@@ -60,7 +61,6 @@ type AWS struct {
 	SpotDataPrefix          string
 	SpotDataPrefix          string
 	ProjectID               string
 	ProjectID               string
 	DownloadPricingDataLock sync.RWMutex
 	DownloadPricingDataLock sync.RWMutex
-	ReservedInstances       []*AWSReservedInstance
 	Config                  *ProviderConfig
 	Config                  *ProviderConfig
 	*CustomProvider
 	*CustomProvider
 }
 }
@@ -511,14 +511,13 @@ func (aws *AWS) DownloadPricingData() error {
 		pvkeys[key.Features()] = key
 		pvkeys[key.Features()] = key
 	}
 	}
 
 
-	reserved, err := aws.getReservedInstances()
+	err = aws.GetReservationDataFromAthena()
 	if err != nil {
 	if err != nil {
 		klog.V(1).Infof("Failed to lookup reserved instance data: %s", err.Error())
 		klog.V(1).Infof("Failed to lookup reserved instance data: %s", err.Error())
 	} else {
 	} else {
-		klog.V(1).Infof("Found %d reserved instances", len(reserved))
-		aws.ReservedInstances = reserved
-		for _, r := range reserved {
-			klog.V(1).Infof("%s", r)
+		klog.V(1).Infof("Found %d reserved instances", len(aws.RIPricingByInstanceID))
+		for k, r := range aws.RIPricingByInstanceID {
+			klog.V(1).Infof("%s : %s", k, r)
 		}
 		}
 	}
 	}
 
 
@@ -747,6 +746,20 @@ func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k Key) (*No
 			BaseGPUPrice: aws.BaseGPUPrice,
 			BaseGPUPrice: aws.BaseGPUPrice,
 			UsageType:    usageType,
 			UsageType:    usageType,
 		}, nil
 		}, nil
+	} else if ri, ok := aws.RIPricingByInstanceID[k.ID()]; ok {
+		strCost := fmt.Sprintf("%f", ri.EffectiveCost)
+		return &Node{
+			Cost:         strCost,
+			VCPU:         terms.VCpu,
+			RAM:          terms.Memory,
+			GPU:          terms.GPU,
+			Storage:      terms.Storage,
+			BaseCPUPrice: aws.BaseCPUPrice,
+			BaseRAMPrice: aws.BaseRAMPrice,
+			BaseGPUPrice: aws.BaseGPUPrice,
+			UsageType:    usageType,
+		}, nil
+
 	}
 	}
 	c, ok := terms.OnDemand.PriceDimensions[terms.Sku+OnDemandRateCode+HourlyRateCode]
 	c, ok := terms.OnDemand.PriceDimensions[terms.Sku+OnDemandRateCode+HourlyRateCode]
 	if !ok {
 	if !ok {
@@ -1051,6 +1064,131 @@ func generateAWSGroupBy(lastIdx int) string {
 	return strings.Join(sequence, ",")
 	return strings.Join(sequence, ",")
 }
 }
 
 
+func (a *AWS) QueryAthenaBillingData(query string) (*athena.GetQueryResultsOutput, error) {
+	customPricing, err := a.GetConfig()
+	if err != nil {
+		return nil, err
+	}
+	if customPricing.ServiceKeyName != "" {
+		err = os.Setenv(awsAccessKeyIDEnvVar, customPricing.ServiceKeyName)
+		if err != nil {
+			return nil, err
+		}
+		err = os.Setenv(awsAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
+		if err != nil {
+			return nil, err
+		}
+	}
+	region := aws.String(customPricing.AthenaRegion)
+	resultsBucket := customPricing.AthenaBucketName
+	database := customPricing.AthenaDatabase
+	c := &aws.Config{
+		Region: region,
+	}
+	s := session.Must(session.NewSession(c))
+	svc := athena.New(s)
+
+	var e athena.StartQueryExecutionInput
+
+	var r athena.ResultConfiguration
+	r.SetOutputLocation(resultsBucket)
+	e.SetResultConfiguration(&r)
+
+	e.SetQueryString(query)
+	var q athena.QueryExecutionContext
+	q.SetDatabase(database)
+	e.SetQueryExecutionContext(&q)
+
+	res, err := svc.StartQueryExecution(&e)
+	if err != nil {
+		return nil, err
+	}
+
+	klog.V(2).Infof("StartQueryExecution result:")
+	klog.V(2).Infof(res.GoString())
+
+	var qri athena.GetQueryExecutionInput
+	qri.SetQueryExecutionId(*res.QueryExecutionId)
+
+	var qrop *athena.GetQueryExecutionOutput
+	duration := time.Duration(2) * time.Second // Pause for 2 seconds
+
+	for {
+		qrop, err = svc.GetQueryExecution(&qri)
+		if err != nil {
+			return nil, err
+		}
+		if *qrop.QueryExecution.Status.State != "RUNNING" {
+			break
+		}
+		time.Sleep(duration)
+	}
+	if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
+
+		var ip athena.GetQueryResultsInput
+		ip.SetQueryExecutionId(*res.QueryExecutionId)
+
+		return svc.GetQueryResults(&ip)
+	} else {
+		return nil, fmt.Errorf("No results available for %s", query)
+	}
+}
+
+type RIData struct {
+	ResourceID     string
+	EffectiveCost  float64
+	ReservationARN string
+	MostRecentDate string
+}
+
+func (a *AWS) GetReservationDataFromAthena() error {
+	a.RIPricingByInstanceID = make(map[string]*RIData)
+	tNow := time.Now()
+	tOneDayAgo := tNow.Add(time.Duration(-25) * time.Hour) // Also get files from one day ago to avoid boundary conditions
+	start := tOneDayAgo.Format("2006-01-02")
+	end := tNow.Format("2006-01-02")
+	q := `SELECT   
+		line_item_usage_start_date,
+		reservation_reservation_a_r_n,
+		line_item_resource_id,
+		reservation_effective_cost
+	FROM athena_test as cost_data
+	WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s'
+	AND reservation_reservation_a_r_n <> '' ORDER BY 
+	line_item_usage_start_date DESC`
+	query := fmt.Sprintf(q, start, end)
+	op, err := a.QueryAthenaBillingData(query)
+	if err != nil {
+		return fmt.Errorf("Error fetching Reserved Instance Data: %s", err)
+	}
+	if len(op.ResultSet.Rows) > 1 {
+		mostRecentDate := ""
+		for _, r := range op.ResultSet.Rows[1:(len(op.ResultSet.Rows) - 1)] {
+			d := *r.Data[0].VarCharValue
+			if mostRecentDate == "" {
+				mostRecentDate = d
+			} else if mostRecentDate != d { // Get all most recent assignments
+				break
+			}
+			cost, err := strconv.ParseFloat(*r.Data[3].VarCharValue, 64)
+			if err != nil {
+				klog.Infof("Error converting `%s` from float ", *r.Data[3].VarCharValue)
+			}
+			r := &RIData{
+				ResourceID:     *r.Data[2].VarCharValue,
+				EffectiveCost:  cost,
+				ReservationARN: *r.Data[1].VarCharValue,
+				MostRecentDate: d,
+			}
+			a.RIPricingByInstanceID[r.ResourceID] = r
+		}
+	} else {
+		klog.Infof("No reserved instance data found")
+		return nil
+	}
+	return nil
+}
+
 // ExternalAllocations represents tagged assets outside the scope of kubernetes.
 // ExternalAllocations represents tagged assets outside the scope of kubernetes.
 // "start" and "end" are dates of the format YYYY-MM-DD
 // "start" and "end" are dates of the format YYYY-MM-DD
 // "aggregator" is the tag used to determine how to allocate those assets, ie namespace, pod, etc.
 // "aggregator" is the tag used to determine how to allocate those assets, ie namespace, pod, etc.
@@ -1482,119 +1620,120 @@ func parseSpotData(bucket string, prefix string, projectID string, region string
 }
 }
 
 
 func (a *AWS) ApplyReservedInstancePricing(nodes map[string]*Node) {
 func (a *AWS) ApplyReservedInstancePricing(nodes map[string]*Node) {
-	numReserved := len(a.ReservedInstances)
+	/*
+		numReserved := len(a.ReservedInstances)
 
 
-	// Early return if no reserved instance data loaded
-	if numReserved == 0 {
-		klog.V(4).Infof("[Reserved] No Reserved Instances")
-		return
-	}
+		// Early return if no reserved instance data loaded
+		if numReserved == 0 {
+			klog.V(4).Infof("[Reserved] No Reserved Instances")
+			return
+		}
 
 
-	cfg, err := a.GetConfig()
-	defaultCPU, err := strconv.ParseFloat(cfg.CPU, 64)
-	if err != nil {
-		klog.V(3).Infof("Could not parse default cpu price")
-		defaultCPU = 0.031611
-	}
+		cfg, err := a.GetConfig()
+		defaultCPU, err := strconv.ParseFloat(cfg.CPU, 64)
+		if err != nil {
+			klog.V(3).Infof("Could not parse default cpu price")
+			defaultCPU = 0.031611
+		}
 
 
-	defaultRAM, err := strconv.ParseFloat(cfg.RAM, 64)
-	if err != nil {
-		klog.V(3).Infof("Could not parse default ram price")
-		defaultRAM = 0.004237
-	}
+		defaultRAM, err := strconv.ParseFloat(cfg.RAM, 64)
+		if err != nil {
+			klog.V(3).Infof("Could not parse default ram price")
+			defaultRAM = 0.004237
+		}
 
 
-	cpuToRAMRatio := defaultCPU / defaultRAM
+		cpuToRAMRatio := defaultCPU / defaultRAM
 
 
-	now := time.Now()
+		now := time.Now()
 
 
-	instances := make(map[string][]*AWSReservedInstance)
-	for _, r := range a.ReservedInstances {
-		if now.Before(r.StartDate) || now.After(r.EndDate) {
-			klog.V(1).Infof("[Reserved] Skipped Reserved Instance due to dates")
-			continue
-		}
+		instances := make(map[string][]*AWSReservedInstance)
+		for _, r := range a.ReservedInstances {
+			if now.Before(r.StartDate) || now.After(r.EndDate) {
+				klog.V(1).Infof("[Reserved] Skipped Reserved Instance due to dates")
+				continue
+			}
 
 
-		_, ok := instances[r.Region]
-		if !ok {
-			instances[r.Region] = []*AWSReservedInstance{r}
-		} else {
-			instances[r.Region] = append(instances[r.Region], r)
+			_, ok := instances[r.Region]
+			if !ok {
+				instances[r.Region] = []*AWSReservedInstance{r}
+			} else {
+				instances[r.Region] = append(instances[r.Region], r)
+			}
 		}
 		}
-	}
 
 
-	awsNodes := make(map[string]*v1.Node)
-	currentNodes := a.Clientset.GetAllNodes()
+		awsNodes := make(map[string]*v1.Node)
+		currentNodes := a.Clientset.GetAllNodes()
 
 
-	// Create a node name -> node map
-	for _, awsNode := range currentNodes {
-		awsNodes[awsNode.GetName()] = awsNode
-	}
+		// Create a node name -> node map
+		for _, awsNode := range currentNodes {
+			awsNodes[awsNode.GetName()] = awsNode
+		}
 
 
-	// go through all provider nodes using k8s nodes for region
-	for nodeName, node := range nodes {
-		// Reset reserved allocation to prevent double allocation
-		node.Reserved = nil
+		// go through all provider nodes using k8s nodes for region
+		for nodeName, node := range nodes {
+			// Reset reserved allocation to prevent double allocation
+			node.Reserved = nil
 
 
-		kNode, ok := awsNodes[nodeName]
-		if !ok {
-			klog.V(1).Infof("[Reserved] Could not find K8s Node with name: %s", nodeName)
-			continue
-		}
+			kNode, ok := awsNodes[nodeName]
+			if !ok {
+				klog.V(1).Infof("[Reserved] Could not find K8s Node with name: %s", nodeName)
+				continue
+			}
 
 
-		nodeRegion, ok := kNode.Labels[v1.LabelZoneRegion]
-		if !ok {
-			klog.V(1).Infof("[Reserved] Could not find node region")
-			continue
-		}
+			nodeRegion, ok := kNode.Labels[v1.LabelZoneRegion]
+			if !ok {
+				klog.V(1).Infof("[Reserved] Could not find node region")
+				continue
+			}
 
 
-		reservedInstances, ok := instances[nodeRegion]
-		if !ok {
-			klog.V(1).Infof("[Reserved] Could not find counters for region: %s", nodeRegion)
-			continue
-		}
+			reservedInstances, ok := instances[nodeRegion]
+			if !ok {
+				klog.V(1).Infof("[Reserved] Could not find counters for region: %s", nodeRegion)
+				continue
+			}
 
 
-		// Determine the InstanceType of the node
-		instanceType, ok := kNode.Labels["beta.kubernetes.io/instance-type"]
-		if !ok {
-			continue
-		}
+			// Determine the InstanceType of the node
+			instanceType, ok := kNode.Labels["beta.kubernetes.io/instance-type"]
+			if !ok {
+				continue
+			}
 
 
-		ramBytes, err := strconv.ParseFloat(node.RAMBytes, 64)
-		if err != nil {
-			continue
-		}
-		ramGB := ramBytes / 1024 / 1024 / 1024
+			ramBytes, err := strconv.ParseFloat(node.RAMBytes, 64)
+			if err != nil {
+				continue
+			}
+			ramGB := ramBytes / 1024 / 1024 / 1024
 
 
-		cpu, err := strconv.ParseFloat(node.VCPU, 64)
-		if err != nil {
-			continue
-		}
+			cpu, err := strconv.ParseFloat(node.VCPU, 64)
+			if err != nil {
+				continue
+			}
 
 
-		ramMultiple := cpu*cpuToRAMRatio + ramGB
+			ramMultiple := cpu*cpuToRAMRatio + ramGB
 
 
-		node.Reserved = &ReservedInstanceData{
-			ReservedCPU: 0,
-			ReservedRAM: 0,
-		}
+			node.Reserved = &ReservedInstanceData{
+				ReservedCPU: 0,
+				ReservedRAM: 0,
+			}
 
 
-		for i, reservedInstance := range reservedInstances {
-			if reservedInstance.InstanceType == instanceType {
-				// Use < 0 to mark as ALL
-				node.Reserved.ReservedCPU = -1
-				node.Reserved.ReservedRAM = -1
+			for i, reservedInstance := range reservedInstances {
+				if reservedInstance.InstanceType == instanceType {
+					// Use < 0 to mark as ALL
+					node.Reserved.ReservedCPU = -1
+					node.Reserved.ReservedRAM = -1
 
 
-				// Set Costs based on CPU/RAM ratios
-				ramPrice := reservedInstance.PricePerHour / ramMultiple
-				node.Reserved.CPUCost = ramPrice * cpuToRAMRatio
-				node.Reserved.RAMCost = ramPrice
+					// Set Costs based on CPU/RAM ratios
+					ramPrice := reservedInstance.PricePerHour / ramMultiple
+					node.Reserved.CPUCost = ramPrice * cpuToRAMRatio
+					node.Reserved.RAMCost = ramPrice
 
 
-				// Remove the reserve from the temporary slice to prevent
-				// being reallocated
-				instances[nodeRegion] = append(reservedInstances[:i], reservedInstances[i+1:]...)
-				break
+					// Remove the reserve from the temporary slice to prevent
+					// being reallocated
+					instances[nodeRegion] = append(reservedInstances[:i], reservedInstances[i+1:]...)
+					break
+				}
 			}
 			}
-		}
-	}
+		}*/
 }
 }
 
 
 type AWSReservedInstance struct {
 type AWSReservedInstance struct {