Ajay Tripathy 5 лет назад
Родитель
Сommit
325a33eefc
2 измененных файлов с 114 добавлено и 95 удалено
  1. 1 0
      go.sum
  2. 113 95
      pkg/cloud/awsprovider.go

+ 1 - 0
go.sum

@@ -295,6 +295,7 @@ github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDf
 github.com/prometheus/client_golang v1.0.0 h1:vrDKnkGzuGvhNAL56c7DBz29ZL+KxnoR0x7enabFceM=
 github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
 github.com/prometheus/client_golang v1.7.1 h1:NTGy1Ja9pByO+xAeH/qiWnLrKtr3hJPNjaVUwnjpdpA=
+github.com/prometheus/client_golang v1.8.0 h1:zvJNkoCFAnYFNC24FV8nW4JdRJ3GIFcLbg65lL/JDcw=
 github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
 github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f h1:BVwpUVJDADN2ufcGik7W992pyps0wZ888b/y9GXcLTU=
 github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=

+ 113 - 95
pkg/cloud/awsprovider.go

@@ -650,7 +650,6 @@ func (aws *AWS) DownloadPricingData() error {
 		klog.V(2).Infof("Bogus fetch of \"%s\": %v", pricingURL, err)
 		return err
 	}
-	klog.V(2).Infof("Finished downloading \"%s\"", pricingURL)
 
 	dec := json.NewDecoder(resp.Body)
 	for {
@@ -785,6 +784,7 @@ func (aws *AWS) DownloadPricingData() error {
 			}
 		}
 	}
+	klog.V(2).Infof("Finished downloading \"%s\"", pricingURL)
 
 	// Always run spot pricing refresh when performing download
 	aws.refreshSpotPricing(true)
@@ -1514,6 +1514,84 @@ func generateAWSGroupBy(lastIdx int) string {
 	return strings.Join(sequence, ",")
 }
 
+func (a *AWS) QueryAthenaPaginated(query string) (*athena.GetQueryResultsInput, *athena.Athena, error) {
+	customPricing, err := a.GetConfig()
+	if err != nil {
+		return nil, nil, err
+	}
+	if customPricing.ServiceKeyName != "" {
+		err = env.Set(env.AWSAccessKeyIDEnvVar, customPricing.ServiceKeyName)
+		if err != nil {
+			return nil, nil, err
+		}
+		err = env.Set(env.AWSAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
+		if err != nil {
+			return nil, nil, err
+		}
+	}
+	region := aws.String(customPricing.AthenaRegion)
+	resultsBucket := customPricing.AthenaBucketName
+	database := customPricing.AthenaDatabase
+	c := &aws.Config{
+		Region: region,
+	}
+	s := session.Must(session.NewSession(c))
+	svc := athena.New(s)
+	if customPricing.MasterPayerARN != "" {
+		creds := stscreds.NewCredentials(s, customPricing.MasterPayerARN)
+		svc = athena.New(s, &aws.Config{
+			Region:      region,
+			Credentials: creds,
+		})
+	}
+
+	var e athena.StartQueryExecutionInput
+
+	var r athena.ResultConfiguration
+	r.SetOutputLocation(resultsBucket)
+	e.SetResultConfiguration(&r)
+
+	e.SetQueryString(query)
+	var q athena.QueryExecutionContext
+	q.SetDatabase(database)
+	e.SetQueryExecutionContext(&q)
+
+	res, err := svc.StartQueryExecution(&e)
+	if err != nil {
+		return nil, svc, err
+	}
+
+	klog.V(2).Infof("StartQueryExecution result:")
+	klog.V(2).Infof(res.GoString())
+
+	var qri athena.GetQueryExecutionInput
+	qri.SetQueryExecutionId(*res.QueryExecutionId)
+
+	var qrop *athena.GetQueryExecutionOutput
+	duration := time.Duration(2) * time.Second // Pause for 2 seconds
+
+	for {
+		qrop, err = svc.GetQueryExecution(&qri)
+		if err != nil {
+			return nil, svc, err
+		}
+		if *qrop.QueryExecution.Status.State != "RUNNING" && *qrop.QueryExecution.Status.State != "QUEUED" {
+			break
+		}
+		time.Sleep(duration)
+	}
+	if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
+
+		var ip athena.GetQueryResultsInput
+		ip.SetQueryExecutionId(*res.QueryExecutionId)
+		klog.Infof("RETURNED SQID")
+		return &ip, svc, nil
+	} else {
+		klog.Infof("FAILED TO QUERY ATHENA AAAA %s", *qrop.QueryExecution.Status.State)
+		return nil, svc, fmt.Errorf("No results available for %s", query)
+	}
+}
+
 func (a *AWS) QueryAthenaBillingData(query string) (*athena.GetQueryResultsOutput, error) {
 	customPricing, err := a.GetConfig()
 	if err != nil {
@@ -1778,104 +1856,44 @@ func (a *AWS) ExternalAllocations(start string, end string, aggregators []string
 		WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s' AND (%s)
 		GROUP BY %s`, aggregatorNames, customPricing.AthenaTable, start, end, aggregatorOr, groupby)
 	}
-
-	klog.V(3).Infof("Running Query: %s", query)
-
-	if customPricing.ServiceKeyName != "" {
-		err = env.Set(env.AWSAccessKeyIDEnvVar, customPricing.ServiceKeyName)
-		if err != nil {
-			return nil, err
-		}
-		err = env.Set(env.AWSAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
-		if err != nil {
-			return nil, err
-		}
-	}
-	region := aws.String(customPricing.AthenaRegion)
-	resultsBucket := customPricing.AthenaBucketName
-	database := customPricing.AthenaDatabase
-	c := &aws.Config{
-		Region: region,
-	}
-	s := session.Must(session.NewSession(c))
-	svc := athena.New(s)
-	if customPricing.MasterPayerARN != "" {
-		creds := stscreds.NewCredentials(s, customPricing.MasterPayerARN)
-		svc = athena.New(s, &aws.Config{
-			Region:      region,
-			Credentials: creds,
-		})
-	}
-
-	var e athena.StartQueryExecutionInput
-
-	var r athena.ResultConfiguration
-	r.SetOutputLocation(resultsBucket)
-	e.SetResultConfiguration(&r)
-
-	e.SetQueryString(query)
-	var q athena.QueryExecutionContext
-	q.SetDatabase(database)
-	e.SetQueryExecutionContext(&q)
-
-	res, err := svc.StartQueryExecution(&e)
-	if err != nil {
-		return nil, err
-	}
-
-	klog.V(2).Infof("StartQueryExecution result:")
-	klog.V(2).Infof(res.GoString())
-
-	var qri athena.GetQueryExecutionInput
-	qri.SetQueryExecutionId(*res.QueryExecutionId)
-
-	var qrop *athena.GetQueryExecutionOutput
-	duration := time.Duration(2) * time.Second // Pause for 2 seconds
-
-	for {
-		qrop, err = svc.GetQueryExecution(&qri)
-		if err != nil {
-			return nil, err
-		}
-		if *qrop.QueryExecution.Status.State != "RUNNING" && *qrop.QueryExecution.Status.State != "QUEUED" {
-			break
-		}
-		time.Sleep(duration)
-	}
 	var oocAllocs []*OutOfClusterAllocation
-	if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
-
-		var ip athena.GetQueryResultsInput
-		ip.SetQueryExecutionId(*res.QueryExecutionId)
-
-		op, err := svc.GetQueryResults(&ip)
-		if err != nil {
-			return nil, err
-		}
-		if len(op.ResultSet.Rows) > 1 {
-			for _, r := range op.ResultSet.Rows[1:(len(op.ResultSet.Rows))] {
-				cost, err := strconv.ParseFloat(*r.Data[lastIdx].VarCharValue, 64)
-				if err != nil {
-					return nil, err
-				}
-				environment := ""
-				for _, d := range r.Data[1 : len(formattedAggregators)+1] {
-					if *d.VarCharValue != "" {
-						environment = *d.VarCharValue // just set to the first nonempty match
-					}
-					break
-				}
-				ooc := &OutOfClusterAllocation{
-					Aggregator:  strings.Join(aggregators, ","),
-					Environment: environment,
-					Service:     *r.Data[len(formattedAggregators)+1].VarCharValue,
-					Cost:        cost,
+	page := 0
+	processResults := func(op *athena.GetQueryResultsOutput, lastpage bool) bool {
+		iter := op.ResultSet.Rows
+		if page == 0 && len(iter) > 1 {
+			iter = op.ResultSet.Rows[1:(len(op.ResultSet.Rows) - 1)]
+		}
+		page++
+		for _, r := range iter {
+			cost, err := strconv.ParseFloat(*r.Data[lastIdx].VarCharValue, 64)
+			if err != nil {
+				klog.Infof("Error converting cost `%s` from float ", *r.Data[lastIdx].VarCharValue)
+			}
+			environment := ""
+			for _, d := range r.Data[1 : len(formattedAggregators)+1] {
+				if *d.VarCharValue != "" {
+					environment = *d.VarCharValue // just set to the first nonempty match
 				}
-				oocAllocs = append(oocAllocs, ooc)
+				break
 			}
-		} else {
-			klog.V(1).Infof("No results available for %s at database %s between %s and %s", strings.Join(formattedAggregators, ","), customPricing.AthenaTable, start, end)
+			ooc := &OutOfClusterAllocation{
+				Aggregator:  strings.Join(aggregators, ","),
+				Environment: environment,
+				Service:     *r.Data[len(formattedAggregators)+1].VarCharValue,
+				Cost:        cost,
+			}
+			oocAllocs = append(oocAllocs, ooc)
 		}
+		return true
+	}
+
+	klog.V(3).Infof("Running Query: %s", query)
+	ip, svc, err := a.QueryAthenaPaginated(query)
+
+	athenaErr := svc.GetQueryResultsPages(ip, processResults)
+	if athenaErr != nil {
+		klog.Infof("RETURNING ATHENA ERROR")
+		return nil, athenaErr
 	}
 
 	if customPricing.BillingDataDataset != "" && !crossCluster { // There is GCP data, meaning someone has tried to configure a GCP out-of-cluster allocation.