Просмотр исходного кода

Merge pull request #1121 from kubecost/kaelan-add-v1.91.2-patch-to-dev

Cherry-pick v1.91.2 changes into dev
Kaelan Patel 4 лет назад
Родитель
Сommit
8b454c503b
1 измененных файлов с 17 добавлено и 6 удалено
  1. 17 6
      pkg/cloud/awsprovider.go

+ 17 - 6
pkg/cloud/awsprovider.go

@@ -1606,9 +1606,12 @@ func (aws *AWS) QueryAthenaPaginated(ctx context.Context, query string, fn func(
 	// Query Athena
 	startQueryExecutionOutput, err := cli.StartQueryExecution(ctx, startQueryExecutionInput)
 	if err != nil {
-		log.Errorf(err.Error())
+		return fmt.Errorf("QueryAthenaPaginated: start query error: %s", err.Error())
+	}
+	err = waitForQueryToComplete(ctx, cli, startQueryExecutionOutput.QueryExecutionId)
+	if err != nil {
+		return fmt.Errorf("QueryAthenaPaginated: query execution error: %s", err.Error())
 	}
-	waitForQueryToComplete(ctx, cli, startQueryExecutionOutput.QueryExecutionId)
 	queryResultsInput := &athena.GetQueryResultsInput{
 		QueryExecutionId: startQueryExecutionOutput.QueryExecutionId,
 	}
@@ -1624,18 +1627,26 @@ func (aws *AWS) QueryAthenaPaginated(ctx context.Context, query string, fn func(
 	return nil
 }
 
-func waitForQueryToComplete(ctx context.Context, client *athena.Client, queryExecutionID *string) {
+func waitForQueryToComplete(ctx context.Context, client *athena.Client, queryExecutionID *string) error {
 	inp := &athena.GetQueryExecutionInput{
 		QueryExecutionId: queryExecutionID,
 	}
 	isQueryStillRunning := true
 	for isQueryStillRunning {
-		qe, _ := client.GetQueryExecution(ctx, inp)
+		qe, err := client.GetQueryExecution(ctx, inp)
+		if err != nil {
+			return err
+		}
 		if qe.QueryExecution.Status.State == "SUCCEEDED" {
 			isQueryStillRunning = false
+			continue
+		}
+		if qe.QueryExecution.Status.State != "RUNNING" && qe.QueryExecution.Status.State != "QUEUED" {
+			return fmt.Errorf("no query results available for query %s", *queryExecutionID)
 		}
 		time.Sleep(2 * time.Second)
 	}
+	return nil
 }
 
 type SavingsPlanData struct {
@@ -1756,9 +1767,9 @@ func (aws *AWS) GetReservationDataFromAthena() error {
 	columns, _ := aws.fetchColumns()
 
 	if !columns["reservation_reservation_a_r_n"] || !columns["reservation_effective_cost"] {
-		err = fmt.Errorf("No reservation data available in Athena")
+		err = fmt.Errorf("no reservation data available in Athena")
 		aws.RIPricingError = err
-		log.Infof(err.Error())
+		return err
 	}
 	if aws.RIPricingByInstanceID == nil {
 		aws.RIPricingByInstanceID = make(map[string]*RIData)