Procházet zdrojové kódy

Merge pull request #584 from kubecost/AjayTripathy-return-aws-errors

Ajay tripathy return aws errors
Ajay Tripathy před 5 roky
rodič
revize
80946f6145

+ 1 - 0
go.sum

@@ -295,6 +295,7 @@ github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDf
 github.com/prometheus/client_golang v1.0.0 h1:vrDKnkGzuGvhNAL56c7DBz29ZL+KxnoR0x7enabFceM=
 github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
 github.com/prometheus/client_golang v1.7.1 h1:NTGy1Ja9pByO+xAeH/qiWnLrKtr3hJPNjaVUwnjpdpA=
+github.com/prometheus/client_golang v1.8.0 h1:zvJNkoCFAnYFNC24FV8nW4JdRJ3GIFcLbg65lL/JDcw=
 github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
 github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f h1:BVwpUVJDADN2ufcGik7W992pyps0wZ888b/y9GXcLTU=
 github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=

+ 151 - 92
pkg/cloud/awsprovider.go

@@ -42,6 +42,44 @@ const awsReservedInstancePricePerHour = 0.0287
 const supportedSpotFeedVersion = "1"
 const SpotInfoUpdateType = "spotinfo"
 const AthenaInfoUpdateType = "athenainfo"
+const PreemptibleType = "preemptible"
+
+const APIPricingSource = "Public API"
+const SpotPricingSource = "Spot Data Feed"
+const ReservedInstancePricingSource = "Savings Plan, Reservied Instance, and Out-Of-Cluster"
+
+func (aws *AWS) PricingSourceStatus() map[string]*PricingSource {
+
+	sources := make(map[string]*PricingSource)
+
+	sps := &PricingSource{
+		Name: SpotPricingSource,
+	}
+	sps.Error = aws.SpotPricingStatus
+	if sps.Error != "" {
+		sps.Available = false
+	} else if len(aws.SpotPricingByInstanceID) > 0 {
+		sps.Available = true
+	} else {
+		sps.Error = "No spot instances detected"
+	}
+	sources[SpotPricingSource] = sps
+
+	rps := &PricingSource{
+		Name: ReservedInstancePricingSource,
+	}
+	rps.Error = aws.RIPricingStatus
+	if rps.Error != "" {
+		rps.Available = false
+	} else if len(aws.RIPricingByInstanceID) > 0 {
+		rps.Available = true
+	} else {
+		rps.Error = "No reserved instances detected"
+	}
+	sources[ReservedInstancePricingSource] = rps
+	return sources
+
+}
 
 // How often spot data is refreshed
 const SpotRefreshDuration = 15 * time.Minute
@@ -81,7 +119,9 @@ type AWS struct {
 	SpotPricingUpdatedAt        *time.Time
 	SpotRefreshRunning          bool
 	SpotPricingLock             sync.RWMutex
+	SpotPricingStatus           string
 	RIPricingByInstanceID       map[string]*RIData
+	RIPricingStatus             string
 	RIDataRunning               bool
 	RIDataLock                  sync.RWMutex
 	SavingsPlanDataByInstanceID map[string]*SavingsPlanData
@@ -432,7 +472,7 @@ func (k *awsKey) Features() string {
 	region := k.Labels[v1.LabelZoneRegion]
 
 	key := region + "," + instanceType + "," + operatingSystem
-	usageType := "preemptible"
+	usageType := PreemptibleType
 	spotKey := key + "," + usageType
 	if l, ok := k.Labels["lifecycle"]; ok && l == "EC2Spot" {
 		return spotKey
@@ -515,7 +555,7 @@ func (aws *AWS) GetKey(labels map[string]string, n *v1.Node) Key {
 
 func (aws *AWS) isPreemptible(key string) bool {
 	s := strings.Split(key, ",")
-	if len(s) == 4 && s[3] == "preemptible" {
+	if len(s) == 4 && s[3] == PreemptibleType {
 		return true
 	}
 	return false
@@ -646,7 +686,6 @@ func (aws *AWS) DownloadPricingData() error {
 		klog.V(2).Infof("Bogus fetch of \"%s\": %v", pricingURL, err)
 		return err
 	}
-	klog.V(2).Infof("Finished downloading \"%s\"", pricingURL)
 
 	dec := json.NewDecoder(resp.Body)
 	for {
@@ -781,6 +820,7 @@ func (aws *AWS) DownloadPricingData() error {
 			}
 		}
 	}
+	klog.V(2).Infof("Finished downloading \"%s\"", pricingURL)
 
 	// Always run spot pricing refresh when performing download
 	aws.refreshSpotPricing(true)
@@ -820,8 +860,10 @@ func (aws *AWS) refreshSpotPricing(force bool) {
 	sp, err := aws.parseSpotData(aws.SpotDataBucket, aws.SpotDataPrefix, aws.ProjectID, aws.SpotDataRegion)
 	if err != nil {
 		klog.V(1).Infof("Skipping AWS spot data download: %s", err.Error())
+		aws.SpotPricingStatus = err.Error()
 		return
 	}
+	aws.SpotPricingStatus = ""
 
 	// update time last updated
 	aws.SpotPricingUpdatedAt = &now
@@ -925,10 +967,10 @@ func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k Key) (*No
 			BaseCPUPrice: aws.BaseCPUPrice,
 			BaseRAMPrice: aws.BaseRAMPrice,
 			BaseGPUPrice: aws.BaseGPUPrice,
-			UsageType:    usageType,
+			UsageType:    PreemptibleType,
 		}, nil
 	} else if aws.isPreemptible(key) { // Preemptible but we don't have any data in the pricing report.
-		log.DedupedWarningf(5, "Node %s marked preemitible but we have no data in spot feed", k.ID())
+		log.DedupedWarningf(5, "Node %s marked preemptible but we have no data in spot feed", k.ID())
 		return &Node{
 			VCPU:         terms.VCpu,
 			VCPUCost:     aws.BaseSpotCPUPrice,
@@ -939,7 +981,7 @@ func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k Key) (*No
 			BaseCPUPrice: aws.BaseCPUPrice,
 			BaseRAMPrice: aws.BaseRAMPrice,
 			BaseGPUPrice: aws.BaseGPUPrice,
-			UsageType:    usageType,
+			UsageType:    PreemptibleType,
 		}, nil
 	} else if sp, ok := aws.savingsPlanPricing(k.ID()); ok {
 		strCost := fmt.Sprintf("%f", sp.EffectiveCost)
@@ -996,7 +1038,7 @@ func (aws *AWS) NodePricing(k Key) (*Node, error) {
 	key := k.Features()
 	usageType := "ondemand"
 	if aws.isPreemptible(key) {
-		usageType = "preemptible"
+		usageType = PreemptibleType
 	}
 
 	terms, ok := aws.Pricing[key]
@@ -1478,6 +1520,73 @@ func generateAWSGroupBy(lastIdx int) string {
 	return strings.Join(sequence, ",")
 }
 
+func (a *AWS) QueryAthenaPaginated(query string) (*athena.GetQueryResultsInput, *athena.Athena, error) {
+	customPricing, err := a.GetConfig()
+	if err != nil {
+		return nil, nil, err
+	}
+	a.ConfigureAuthWith(customPricing)
+	region := aws.String(customPricing.AthenaRegion)
+	resultsBucket := customPricing.AthenaBucketName
+	database := customPricing.AthenaDatabase
+	c := &aws.Config{
+		Region: region,
+	}
+	s := session.Must(session.NewSession(c))
+	svc := athena.New(s)
+	if customPricing.MasterPayerARN != "" {
+		creds := stscreds.NewCredentials(s, customPricing.MasterPayerARN)
+		svc = athena.New(s, &aws.Config{
+			Region:      region,
+			Credentials: creds,
+		})
+	}
+
+	var e athena.StartQueryExecutionInput
+
+	var r athena.ResultConfiguration
+	r.SetOutputLocation(resultsBucket)
+	e.SetResultConfiguration(&r)
+
+	e.SetQueryString(query)
+	var q athena.QueryExecutionContext
+	q.SetDatabase(database)
+	e.SetQueryExecutionContext(&q)
+
+	res, err := svc.StartQueryExecution(&e)
+	if err != nil {
+		return nil, svc, err
+	}
+
+	klog.V(2).Infof("StartQueryExecution result:")
+	klog.V(2).Infof(res.GoString())
+
+	var qri athena.GetQueryExecutionInput
+	qri.SetQueryExecutionId(*res.QueryExecutionId)
+
+	var qrop *athena.GetQueryExecutionOutput
+	duration := time.Duration(2) * time.Second // Pause for 2 seconds
+
+	for {
+		qrop, err = svc.GetQueryExecution(&qri)
+		if err != nil {
+			return nil, svc, err
+		}
+		if *qrop.QueryExecution.Status.State != "RUNNING" && *qrop.QueryExecution.Status.State != "QUEUED" {
+			break
+		}
+		time.Sleep(duration)
+	}
+	if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
+
+		var ip athena.GetQueryResultsInput
+		ip.SetQueryExecutionId(*res.QueryExecutionId)
+		return &ip, svc, nil
+	} else {
+		return nil, svc, fmt.Errorf("No results available for %s", query)
+	}
+}
+
 func (a *AWS) QueryAthenaBillingData(query string) (*athena.GetQueryResultsOutput, error) {
 	customPricing, err := a.GetConfig()
 	if err != nil {
@@ -1652,8 +1761,10 @@ func (a *AWS) GetReservationDataFromAthena() error {
 	query := fmt.Sprintf(q, cfg.AthenaTable, start, end)
 	op, err := a.QueryAthenaBillingData(query)
 	if err != nil {
+		a.RIPricingStatus = err.Error()
 		return fmt.Errorf("Error fetching Reserved Instance Data: %s", err)
 	}
+	a.RIPricingStatus = ""
 	klog.Infof("Fetching RI data...")
 	if len(op.ResultSet.Rows) > 1 {
 		a.RIDataLock.Lock()
@@ -1735,96 +1846,44 @@ func (a *AWS) ExternalAllocations(start string, end string, aggregators []string
 		WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s' AND (%s)
 		GROUP BY %s`, aggregatorNames, customPricing.AthenaTable, start, end, aggregatorOr, groupby)
 	}
-
-	klog.V(3).Infof("Running Query: %s", query)
-
-	a.ConfigureAuthWith(customPricing) // load aws authentication from configuration or secret
-
-	region := aws.String(customPricing.AthenaRegion)
-	resultsBucket := customPricing.AthenaBucketName
-	database := customPricing.AthenaDatabase
-	c := &aws.Config{
-		Region: region,
-	}
-	s := session.Must(session.NewSession(c))
-	svc := athena.New(s)
-	if customPricing.MasterPayerARN != "" {
-		creds := stscreds.NewCredentials(s, customPricing.MasterPayerARN)
-		svc = athena.New(s, &aws.Config{
-			Region:      region,
-			Credentials: creds,
-		})
-	}
-
-	var e athena.StartQueryExecutionInput
-
-	var r athena.ResultConfiguration
-	r.SetOutputLocation(resultsBucket)
-	e.SetResultConfiguration(&r)
-
-	e.SetQueryString(query)
-	var q athena.QueryExecutionContext
-	q.SetDatabase(database)
-	e.SetQueryExecutionContext(&q)
-
-	res, err := svc.StartQueryExecution(&e)
-	if err != nil {
-		return nil, err
-	}
-
-	klog.V(2).Infof("StartQueryExecution result:")
-	klog.V(2).Infof(res.GoString())
-
-	var qri athena.GetQueryExecutionInput
-	qri.SetQueryExecutionId(*res.QueryExecutionId)
-
-	var qrop *athena.GetQueryExecutionOutput
-	duration := time.Duration(2) * time.Second // Pause for 2 seconds
-
-	for {
-		qrop, err = svc.GetQueryExecution(&qri)
-		if err != nil {
-			return nil, err
-		}
-		if *qrop.QueryExecution.Status.State != "RUNNING" && *qrop.QueryExecution.Status.State != "QUEUED" {
-			break
-		}
-		time.Sleep(duration)
-	}
 	var oocAllocs []*OutOfClusterAllocation
-	if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
-
-		var ip athena.GetQueryResultsInput
-		ip.SetQueryExecutionId(*res.QueryExecutionId)
-
-		op, err := svc.GetQueryResults(&ip)
-		if err != nil {
-			return nil, err
+	page := 0
+	processResults := func(op *athena.GetQueryResultsOutput, lastpage bool) bool {
+		iter := op.ResultSet.Rows
+		if page == 0 && len(iter) > 1 {
+			iter = op.ResultSet.Rows[1:(len(op.ResultSet.Rows) - 1)]
 		}
-		if len(op.ResultSet.Rows) > 1 {
-			for _, r := range op.ResultSet.Rows[1:(len(op.ResultSet.Rows))] {
-				cost, err := strconv.ParseFloat(*r.Data[lastIdx].VarCharValue, 64)
-				if err != nil {
-					return nil, err
-				}
-				environment := ""
-				for _, d := range r.Data[1 : len(formattedAggregators)+1] {
-					if *d.VarCharValue != "" {
-						environment = *d.VarCharValue // just set to the first nonempty match
-					}
-					break
-				}
-				ooc := &OutOfClusterAllocation{
-					Aggregator:  strings.Join(aggregators, ","),
-					Environment: environment,
-					Service:     *r.Data[len(formattedAggregators)+1].VarCharValue,
-					Cost:        cost,
+		page++
+		for _, r := range iter {
+			cost, err := strconv.ParseFloat(*r.Data[lastIdx].VarCharValue, 64)
+			if err != nil {
+				klog.Infof("Error converting cost `%s` from float ", *r.Data[lastIdx].VarCharValue)
+			}
+			environment := ""
+			for _, d := range r.Data[1 : len(formattedAggregators)+1] {
+				if *d.VarCharValue != "" {
+					environment = *d.VarCharValue // just set to the first nonempty match
 				}
-				oocAllocs = append(oocAllocs, ooc)
+				break
 			}
-		} else {
-			klog.V(1).Infof("No results available for %s at database %s between %s and %s", strings.Join(formattedAggregators, ","), customPricing.AthenaTable, start, end)
+			ooc := &OutOfClusterAllocation{
+				Aggregator:  strings.Join(aggregators, ","),
+				Environment: environment,
+				Service:     *r.Data[len(formattedAggregators)+1].VarCharValue,
+				Cost:        cost,
+			}
+			oocAllocs = append(oocAllocs, ooc)
 		}
+		return true
+	}
+
+	klog.V(3).Infof("Running Query: %s", query)
+	ip, svc, err := a.QueryAthenaPaginated(query)
+
+	athenaErr := svc.GetQueryResultsPages(ip, processResults)
+	if athenaErr != nil {
+		klog.Infof("RETURNING ATHENA ERROR")
+		return nil, athenaErr
 	}
 
 	if customPricing.BillingDataDataset != "" && !crossCluster { // There is GCP data, meaning someone has tried to configure a GCP out-of-cluster allocation.

+ 5 - 0
pkg/cloud/azureprovider.go

@@ -835,6 +835,11 @@ func (az *Azure) ServiceAccountStatus() *ServiceAccountStatus {
 		Checks: []*ServiceAccountCheck{},
 	}
 }
+
+func (az *Azure) PricingSourceStatus() map[string]*PricingSource {
+	return make(map[string]*PricingSource)
+}
+
 func (*Azure) ClusterManagementPricing() (string, float64, error) {
 	return "", 0.0, nil
 }

+ 4 - 0
pkg/cloud/customprovider.go

@@ -304,6 +304,10 @@ func (cp *CustomProvider) ServiceAccountStatus() *ServiceAccountStatus {
 	}
 }
 
+func (cp *CustomProvider) PricingSourceStatus() map[string]*PricingSource {
+	return make(map[string]*PricingSource)
+}
+
 func (cp *CustomProvider) CombinedDiscountForNode(instanceType string, isPreemptible bool, defaultDiscount, negotiatedDiscount float64) float64 {
 	return 1.0 - ((1.0 - defaultDiscount) * (1.0 - negotiatedDiscount))
 }

+ 4 - 0
pkg/cloud/gcpprovider.go

@@ -1429,6 +1429,10 @@ func (gcp *GCP) ServiceAccountStatus() *ServiceAccountStatus {
 	}
 }
 
+func (gcp *GCP) PricingSourceStatus() map[string]*PricingSource {
+	return make(map[string]*PricingSource)
+}
+
 func (gcp *GCP) CombinedDiscountForNode(instanceType string, isPreemptible bool, defaultDiscount, negotiatedDiscount float64) float64 {
 	class := strings.Split(instanceType, "-")[0]
 	return 1.0 - ((1.0 - sustainedUseDiscount(class, defaultDiscount, isPreemptible)) * (1.0 - negotiatedDiscount))

+ 11 - 0
pkg/cloud/provider.go

@@ -185,6 +185,16 @@ type ServiceAccountCheck struct {
 	AdditionalInfo string `json:additionalInfo`
 }
 
+type PricingSources struct {
+	PricingSources map[string]*PricingSource
+}
+
+type PricingSource struct {
+	Name      string `json:"name"`
+	Available bool   `json:"available"`
+	Error     string `json:"error"`
+}
+
 // Provider represents a k8s provider.
 type Provider interface {
 	ClusterInfo() (map[string]string, error)
@@ -206,6 +216,7 @@ type Provider interface {
 	ExternalAllocations(string, string, []string, string, string, bool) ([]*OutOfClusterAllocation, error)
 	ApplyReservedInstancePricing(map[string]*Node)
 	ServiceAccountStatus() *ServiceAccountStatus
+	PricingSourceStatus() map[string]*PricingSource
 	ClusterManagementPricing() (string, float64, error)
 	CombinedDiscountForNode(string, bool, float64, float64) float64
 	ParseID(string) string

+ 8 - 0
pkg/costmodel/router.go

@@ -637,6 +637,13 @@ func (p *Accesses) GetServiceAccountStatus(w http.ResponseWriter, _ *http.Reques
 	w.Write(WrapData(A.Cloud.ServiceAccountStatus(), nil))
 }
 
+func (p *Accesses) GetPricingSourceStatus(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
+	w.Header().Set("Content-Type", "application/json")
+	w.Header().Set("Access-Control-Allow-Origin", "*")
+
+	w.Write(WrapData(A.Cloud.PricingSourceStatus(), nil))
+}
+
 func (p *Accesses) GetPrometheusMetadata(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Access-Control-Allow-Origin", "*")
@@ -1019,6 +1026,7 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 	Router.GET("/clusterInfo", A.ClusterInfo)
 	Router.GET("/clusterInfoMap", A.GetClusterInfoMap)
 	Router.GET("/serviceAccountStatus", A.GetServiceAccountStatus)
+	Router.GET("/pricingSourceStatus", A.GetPricingSourceStatus)
 
 	// cluster manager endpoints
 	Router.GET("/clusters", managerEndpoints.GetAllClusters)