Просмотр исходного кода

Prevent Athena Queries from running if there are missing columns

Sean Holcomb 5 лет назад
Родитель
Сommit
de0408d85a
1 изменённый файл: 125 добавлений и 56 удалений
  1. 125 56
      pkg/cloud/awsprovider.go

+ 125 - 56
pkg/cloud/awsprovider.go

@@ -1789,14 +1789,20 @@ func (a *AWS) GetReservationDataFromAthena() error {
 	if cfg.AthenaBucketName == "" {
 		return fmt.Errorf("No Athena Bucket configured")
 	}
-	if a.RIPricingByInstanceID == nil {
-		a.RIPricingByInstanceID = make(map[string]*RIData)
-	}
-	tNow := time.Now()
-	tOneDayAgo := tNow.Add(time.Duration(-25) * time.Hour) // Also get files from one day ago to avoid boundary conditions
-	start := tOneDayAgo.Format("2006-01-02")
-	end := tNow.Format("2006-01-02")
-	q := `SELECT   
+
+	// Query for all column names in advance in order to validate configured
+	// label columns
+	columns, _ := a.ShowAthenaColumns()
+
+	if columns["reservation_reservation_a_r_n"] && columns["reservation_effective_cost"] {
+		if a.RIPricingByInstanceID == nil {
+			a.RIPricingByInstanceID = make(map[string]*RIData)
+		}
+		tNow := time.Now()
+		tOneDayAgo := tNow.Add(time.Duration(-25) * time.Hour) // Also get files from one day ago to avoid boundary conditions
+		start := tOneDayAgo.Format("2006-01-02")
+		end := tNow.Format("2006-01-02")
+		q := `SELECT   
 		line_item_usage_start_date,
 		reservation_reservation_a_r_n,
 		line_item_resource_id,
@@ -1805,47 +1811,95 @@ func (a *AWS) GetReservationDataFromAthena() error {
 	WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s'
 	AND reservation_reservation_a_r_n <> '' ORDER BY 
 	line_item_usage_start_date DESC`
-	query := fmt.Sprintf(q, cfg.AthenaTable, start, end)
-	op, err := a.QueryAthenaBillingData(query)
-	if err != nil {
-		a.RIPricingStatus = err.Error()
-		return fmt.Errorf("Error fetching Reserved Instance Data: %s", err)
-	}
-	a.RIPricingStatus = ""
-	klog.Infof("Fetching RI data...")
-	if len(op.ResultSet.Rows) > 1 {
-		a.RIDataLock.Lock()
-		mostRecentDate := ""
-		for _, r := range op.ResultSet.Rows[1:(len(op.ResultSet.Rows) - 1)] {
-			d := *r.Data[0].VarCharValue
-			if mostRecentDate == "" {
-				mostRecentDate = d
-			} else if mostRecentDate != d { // Get all most recent assignments
-				break
-			}
-			cost, err := strconv.ParseFloat(*r.Data[3].VarCharValue, 64)
-			if err != nil {
-				klog.Infof("Error converting `%s` from float ", *r.Data[3].VarCharValue)
+		query := fmt.Sprintf(q, cfg.AthenaTable, start, end)
+		op, err := a.QueryAthenaBillingData(query)
+		if err != nil {
+			a.RIPricingStatus = err.Error()
+			return fmt.Errorf("Error fetching Reserved Instance Data: %s", err)
+		}
+		a.RIPricingStatus = ""
+		klog.Infof("Fetching RI data...")
+		if len(op.ResultSet.Rows) > 1 {
+			a.RIDataLock.Lock()
+			mostRecentDate := ""
+			for _, r := range op.ResultSet.Rows[1:(len(op.ResultSet.Rows) - 1)] {
+				d := *r.Data[0].VarCharValue
+				if mostRecentDate == "" {
+					mostRecentDate = d
+				} else if mostRecentDate != d { // Get all most recent assignments
+					break
+				}
+				cost, err := strconv.ParseFloat(*r.Data[3].VarCharValue, 64)
+				if err != nil {
+					klog.Infof("Error converting `%s` from float ", *r.Data[3].VarCharValue)
+				}
+				r := &RIData{
+					ResourceID:     *r.Data[2].VarCharValue,
+					EffectiveCost:  cost,
+					ReservationARN: *r.Data[1].VarCharValue,
+					MostRecentDate: d,
+				}
+				a.RIPricingByInstanceID[r.ResourceID] = r
 			}
-			r := &RIData{
-				ResourceID:     *r.Data[2].VarCharValue,
-				EffectiveCost:  cost,
-				ReservationARN: *r.Data[1].VarCharValue,
-				MostRecentDate: d,
+			klog.V(1).Infof("Found %d reserved instances", len(a.RIPricingByInstanceID))
+			for k, r := range a.RIPricingByInstanceID {
+				log.DedupedInfof(5, "Reserved Instance Data found for node %s : %f at time %s", k, r.EffectiveCost, r.MostRecentDate)
 			}
-			a.RIPricingByInstanceID[r.ResourceID] = r
-		}
-		klog.V(1).Infof("Found %d reserved instances", len(a.RIPricingByInstanceID))
-		for k, r := range a.RIPricingByInstanceID {
-			log.DedupedInfof(5, "Reserved Instance Data found for node %s : %f at time %s", k, r.EffectiveCost, r.MostRecentDate)
+			a.RIDataLock.Unlock()
+		} else {
+			klog.Infof("No reserved instance data found")
 		}
-		a.RIDataLock.Unlock()
 	} else {
-		klog.Infof("No reserved instance data found")
+		klog.Infof("No reserved data available in Athena")
+		a.RIPricingStatus = ""
 	}
 	return nil
 }
 
+// ShowAthenaColumns returns the set of column names reported by Athena for the
+// configured table, as a map from column name to true.
+//
+// It runs `SHOW COLUMNS IN <AthenaTable>` and records the first field of every
+// row on every result page. Rows with no fields or a nil first value are
+// skipped rather than dereferenced.
+//
+// An error is returned when the configuration cannot be loaded, when
+// AthenaTable or AthenaBucketName is empty, when the query cannot be started,
+// or when paging through the results fails. The returned map is nil for the
+// configuration and query-start failures and empty for a paging failure, so
+// callers that only index the result can do so safely in every case.
+func (a *AWS) ShowAthenaColumns() (map[string]bool, error) {
+	// Configure Athena query
+	cfg, err := a.GetConfig()
+	if err != nil {
+		return nil, err
+	}
+	if cfg.AthenaTable == "" {
+		return nil, fmt.Errorf("AthenaTable not configured")
+	}
+	if cfg.AthenaBucketName == "" {
+		return nil, fmt.Errorf("AthenaBucketName not configured")
+	}
+
+	query := fmt.Sprintf(`SHOW COLUMNS IN  %s`, cfg.AthenaTable)
+	results, svc, err := a.QueryAthenaPaginated(query)
+	if err != nil {
+		// Stop here: if the query could not be started, svc and results may
+		// not be usable and the paging call below would dereference them.
+		// The failure is logged because the callers visible in this file
+		// discard the returned error.
+		log.Warningf("Error starting Athena columns query: %s", err)
+		return nil, err
+	}
+
+	columnSet := map[string]bool{}
+	athenaErr := svc.GetQueryResultsPages(results, func(page *athena.GetQueryResultsOutput, _ bool) bool {
+		if page == nil || page.ResultSet == nil {
+			return true
+		}
+		for _, row := range page.ResultSet.Rows {
+			if row == nil || len(row.Data) == 0 || row.Data[0] == nil || row.Data[0].VarCharValue == nil {
+				continue
+			}
+			columnSet[*row.Data[0].VarCharValue] = true
+		}
+
+		// Always ask for the next page; paging ends once the last page has
+		// been delivered.
+		return true
+	})
+	if athenaErr != nil {
+		log.Warningf("Error getting Athena columns: %s", athenaErr)
+		// Return an empty set so a partially read listing is never mistaken
+		// for the complete one.
+		return map[string]bool{}, athenaErr
+	}
+
+	return columnSet, nil
+}
+
+
 // ExternalAllocations represents tagged assets outside the scope of kubernetes.
 // "start" and "end" are dates of the format YYYY-MM-DD
 // "aggregator" is the tag used to determine how to allocate those assets, ie namespace, pod, etc.
@@ -1923,26 +1977,41 @@ func (a *AWS) ExternalAllocations(start string, end string, aggregators []string
 		}
 		return true
 	}
+	// Query for all column names in advance in order to validate configured
+	// label columns
+	columns, _ := a.ShowAthenaColumns()
 
-	klog.V(3).Infof("Running Query: %s", query)
-	ip, svc, err := a.QueryAthenaPaginated(query)
-
-	athenaErr := svc.GetQueryResultsPages(ip, processResults)
-	if athenaErr != nil {
-		klog.Infof("RETURNING ATHENA ERROR")
-		return nil, athenaErr
+	// Check for all aggregators being formatted into the query
+	containsColumns := true
+	for _, agg := range formattedAggregators {
+		if columns[agg] != true {
+			containsColumns = false
+			klog.Warningf("Athena missing column: %s", agg)
+		}
 	}
+	if containsColumns {
+		klog.V(3).Infof("Running Query: %s", query)
+		ip, svc, _ := a.QueryAthenaPaginated(query)
 
-	if customPricing.BillingDataDataset != "" && !crossCluster { // There is GCP data, meaning someone has tried to configure a GCP out-of-cluster allocation.
-		gcp, err := NewCrossClusterProvider("gcp", "aws.json", a.Clientset)
-		if err != nil {
-			klog.Infof("Could not instantiate cross-cluster provider %s", err.Error())
+		athenaErr := svc.GetQueryResultsPages(ip, processResults)
+		if athenaErr != nil {
+			klog.Infof("RETURNING ATHENA ERROR")
+			return nil, athenaErr
 		}
-		gcpOOC, err := gcp.ExternalAllocations(start, end, aggregators, filterType, filterValue, true)
-		if err != nil {
-			klog.Infof("Could not fetch cross-cluster costs %s", err.Error())
+
+		if customPricing.BillingDataDataset != "" && !crossCluster { // There is GCP data, meaning someone has tried to configure a GCP out-of-cluster allocation.
+			gcp, err := NewCrossClusterProvider("gcp", "aws.json", a.Clientset)
+			if err != nil {
+				klog.Infof("Could not instantiate cross-cluster provider %s", err.Error())
+			}
+			gcpOOC, err := gcp.ExternalAllocations(start, end, aggregators, filterType, filterValue, true)
+			if err != nil {
+				klog.Infof("Could not fetch cross-cluster costs %s", err.Error())
+			}
+			oocAllocs = append(oocAllocs, gcpOOC...)
 		}
-		oocAllocs = append(oocAllocs, gcpOOC...)
+	} else {
+		klog.Infof("External Allocations: Athena Query skipped due to missing columns")
 	}
 	return oocAllocs, nil
 }