Răsfoiți Sursa

Add returns on error/FAILED state for athena query wait

Kaelan Patel 4 ani în urmă
părinte
comite
dfd7c20a83
1 a modificat fișierele cu 15 adăugiri și 5 ștergeri
  1. 15 5
      pkg/cloud/awsprovider.go

+ 15 - 5
pkg/cloud/awsprovider.go

@@ -1608,7 +1608,10 @@ func (aws *AWS) QueryAthenaPaginated(ctx context.Context, query string, fn func(
 	if err != nil {
 	if err != nil {
 		log.Errorf(err.Error())
 		log.Errorf(err.Error())
 	}
 	}
-	waitForQueryToComplete(ctx, cli, startQueryExecutionOutput.QueryExecutionId)
+	err = waitForQueryToComplete(ctx, cli, startQueryExecutionOutput.QueryExecutionId)
+	if err != nil {
+		log.Errorf("QueryAthenaPaginated: query execution error: %s", err.Error())
+	}
 	queryResultsInput := &athena.GetQueryResultsInput{
 	queryResultsInput := &athena.GetQueryResultsInput{
 		QueryExecutionId: startQueryExecutionOutput.QueryExecutionId,
 		QueryExecutionId: startQueryExecutionOutput.QueryExecutionId,
 	}
 	}
@@ -1624,18 +1627,25 @@ func (aws *AWS) QueryAthenaPaginated(ctx context.Context, query string, fn func(
 	return nil
 	return nil
 }
 }
 
 
-func waitForQueryToComplete(ctx context.Context, client *athena.Client, queryExecutionID *string) {
+func waitForQueryToComplete(ctx context.Context, client *athena.Client, queryExecutionID *string) error {
 	inp := &athena.GetQueryExecutionInput{
 	inp := &athena.GetQueryExecutionInput{
 		QueryExecutionId: queryExecutionID,
 		QueryExecutionId: queryExecutionID,
 	}
 	}
 	isQueryStillRunning := true
 	isQueryStillRunning := true
 	for isQueryStillRunning {
 	for isQueryStillRunning {
-		qe, _ := client.GetQueryExecution(ctx, inp)
+		qe, err := client.GetQueryExecution(ctx, inp)
+		if err != nil {
+			return err
+		}
+		if qe.QueryExecution.Status.State != "RUNNING" && qe.QueryExecution.Status.State != "QUEUED" {
+			return fmt.Errorf("no query results available for query %s", *queryExecutionID)
+		}
 		if qe.QueryExecution.Status.State == "SUCCEEDED" {
 		if qe.QueryExecution.Status.State == "SUCCEEDED" {
 			isQueryStillRunning = false
 			isQueryStillRunning = false
 		}
 		}
 		time.Sleep(2 * time.Second)
 		time.Sleep(2 * time.Second)
 	}
 	}
+	return nil
 }
 }
 
 
 type SavingsPlanData struct {
 type SavingsPlanData struct {
@@ -1756,9 +1766,9 @@ func (aws *AWS) GetReservationDataFromAthena() error {
 	columns, _ := aws.fetchColumns()
 	columns, _ := aws.fetchColumns()
 
 
 	if !columns["reservation_reservation_a_r_n"] || !columns["reservation_effective_cost"] {
 	if !columns["reservation_reservation_a_r_n"] || !columns["reservation_effective_cost"] {
-		err = fmt.Errorf("No reservation data available in Athena")
+		err = fmt.Errorf("no reservation data available in Athena")
 		aws.RIPricingError = err
 		aws.RIPricingError = err
-		log.Infof(err.Error())
+		return err
 	}
 	}
 	if aws.RIPricingByInstanceID == nil {
 	if aws.RIPricingByInstanceID == nil {
 		aws.RIPricingByInstanceID = make(map[string]*RIData)
 		aws.RIPricingByInstanceID = make(map[string]*RIData)