فهرست منبع

merge develop and add serviceaccount checks field

Ajay Tripathy 5 سال پیش
والد
کامیت
3ab9c72b5c
3 فایلهای تغییر یافته به همراه 203 افزوده شده و 27 حذف شده
  1. 140 26
      pkg/cloud/awsprovider.go
  2. 27 1
      pkg/costmodel/costmodel.go
  3. 36 0
      pkg/costmodel/promparsers.go

+ 140 - 26
pkg/cloud/awsprovider.go

@@ -76,32 +76,35 @@ var awsRegions = []string{
 
 // AWS represents an Amazon Provider
 type AWS struct {
-	Pricing                 map[string]*AWSProductTerms
-	SpotPricingByInstanceID map[string]*spotInfo
-	SpotPricingUpdatedAt    *time.Time
-	SpotRefreshRunning      bool
-	SpotPricingLock         sync.RWMutex
-	RIPricingByInstanceID   map[string]*RIData
-	RIDataRunning           bool
-	RIDataLock              sync.RWMutex
-	ValidPricingKeys        map[string]bool
-	Clientset               clustercache.ClusterCache
-	BaseCPUPrice            string
-	BaseRAMPrice            string
-	BaseGPUPrice            string
-	BaseSpotCPUPrice        string
-	BaseSpotRAMPrice        string
-	SpotLabelName           string
-	SpotLabelValue          string
-	ServiceKeyName          string
-	ServiceKeySecret        string
-	SpotDataRegion          string
-	SpotDataBucket          string
-	SpotDataPrefix          string
-	ProjectID               string
-	DownloadPricingDataLock sync.RWMutex
-	Config                  *ProviderConfig
-	ServiceAccountChecks    map[string]*ServiceAccountCheck
+	Pricing                     map[string]*AWSProductTerms
+	SpotPricingByInstanceID     map[string]*spotInfo
+	SpotPricingUpdatedAt        *time.Time
+	SpotRefreshRunning          bool
+	SpotPricingLock             sync.RWMutex
+	RIPricingByInstanceID       map[string]*RIData
+	RIDataRunning               bool
+	RIDataLock                  sync.RWMutex
+	SavingsPlanDataByInstanceID map[string]*SavingsPlanData
+	SavingsPlanDataRunning      bool
+	SavingsPlanDataLock         sync.RWMutex
+	ValidPricingKeys            map[string]bool
+	Clientset                   clustercache.ClusterCache
+	BaseCPUPrice                string
+	BaseRAMPrice                string
+	BaseGPUPrice                string
+	BaseSpotCPUPrice            string
+	BaseSpotRAMPrice            string
+	SpotLabelName               string
+	SpotLabelValue              string
+	ServiceKeyName              string
+	ServiceKeySecret            string
+	SpotDataRegion              string
+	SpotDataBucket              string
+	SpotDataPrefix              string
+	ProjectID                   string
+	DownloadPricingDataLock     sync.RWMutex
+	Config                      *ProviderConfig
+	ServiceAccountChecks        map[string]*ServiceAccountCheck
 	*CustomProvider
 }
 
@@ -594,6 +597,25 @@ func (aws *AWS) DownloadPricingData() error {
 			}()
 		}
 	}
+	if !aws.SavingsPlanDataRunning && c.AthenaBucketName != "" {
+		err = aws.GetSavingsPlanDataFromAthena()
+		if err != nil {
+			klog.V(1).Infof("Failed to lookup savings plan data: %s", err.Error())
+		} else {
+			go func() {
+				defer errors.HandlePanic()
+				aws.SavingsPlanDataRunning = true
+				for {
+					klog.Infof("Savings Plan watcher running... next update in 1h")
+					time.Sleep(time.Hour)
+					err := aws.GetSavingsPlanDataFromAthena()
+					if err != nil {
+						klog.Infof("Error updating Savings Plan data: %s", err.Error())
+					}
+				}
+			}()
+		}
+	}
 
 	aws.Pricing = make(map[string]*AWSProductTerms)
 	aws.ValidPricingKeys = make(map[string]bool)
@@ -837,6 +859,14 @@ func (aws *AWS) reservedInstancePricing(instanceID string) (*RIData, bool) {
 	return data, ok
 }
 
+// savingsPlanPricing looks up the cached Savings Plan record for instanceID.
+// The lookup runs under the read side of SavingsPlanDataLock. The boolean
+// result reports whether SavingsPlanDataByInstanceID has an entry for the
+// instance; when it does not, the returned record is nil.
+func (aws *AWS) savingsPlanPricing(instanceID string) (*SavingsPlanData, bool) {
+	aws.SavingsPlanDataLock.RLock()
+	defer aws.SavingsPlanDataLock.RUnlock()
+
+	if entry, found := aws.SavingsPlanDataByInstanceID[instanceID]; found {
+		return entry, true
+	}
+	return nil, false
+}
+
 func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k Key) (*Node, error) {
 	key := k.Features()
 
@@ -874,6 +904,20 @@ func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k Key) (*No
 			BaseGPUPrice: aws.BaseGPUPrice,
 			UsageType:    usageType,
 		}, nil
+	} else if sp, ok := aws.savingsPlanPricing(k.ID()); ok {
+		strCost := fmt.Sprintf("%f", sp.EffectiveCost)
+		return &Node{
+			Cost:         strCost,
+			VCPU:         terms.VCpu,
+			RAM:          terms.Memory,
+			GPU:          terms.GPU,
+			Storage:      terms.Storage,
+			BaseCPUPrice: aws.BaseCPUPrice,
+			BaseRAMPrice: aws.BaseRAMPrice,
+			BaseGPUPrice: aws.BaseGPUPrice,
+			UsageType:    usageType,
+		}, nil
+
 	} else if ri, ok := aws.reservedInstancePricing(k.ID()); ok {
 		strCost := fmt.Sprintf("%f", ri.EffectiveCost)
 		return &Node{
@@ -1503,6 +1547,76 @@ func (a *AWS) QueryAthenaBillingData(query string) (*athena.GetQueryResultsOutpu
 	}
 }
 
+// SavingsPlanData is one Savings Plan record for a single resource, as built
+// by GetSavingsPlanDataFromAthena from a row of the Athena billing table.
+type SavingsPlanData struct {
+	// ResourceID is taken from the line_item_resource_id column and is used
+	// as the key in SavingsPlanDataByInstanceID.
+	ResourceID     string
+	// EffectiveCost is the savings_plan_savings_plan_effective_cost column
+	// parsed as a float64. NOTE(review): unit/period of this cost is not
+	// visible here - confirm against the billing report definition.
+	EffectiveCost  float64
+	// SavingsPlanARN is taken from the savings_plan_savings_plan_a_r_n column.
+	SavingsPlanARN string
+	// MostRecentDate is the line_item_usage_start_date value of the row,
+	// kept as the raw string returned by Athena.
+	MostRecentDate string
+}
+
+func (a *AWS) GetSavingsPlanDataFromAthena() error {
+	cfg, err := a.GetConfig()
+	if err != nil {
+		return err
+	}
+	if cfg.AthenaBucketName == "" {
+		return fmt.Errorf("No Athena Bucket configured")
+	}
+	if a.SavingsPlanDataByInstanceID == nil {
+		a.SavingsPlanDataByInstanceID = make(map[string]*SavingsPlanData)
+	}
+	tNow := time.Now()
+	tOneDayAgo := tNow.Add(time.Duration(-25) * time.Hour) // Also get files from one day ago to avoid boundary conditions
+	start := tOneDayAgo.Format("2006-01-02")
+	end := tNow.Format("2006-01-02")
+	q := `SELECT   
+		line_item_usage_start_date,
+		savings_plan_savings_plan_a_r_n,
+		line_item_resource_id,
+		savings_plan_savings_plan_effective_cost
+	FROM %s as cost_data
+	WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s'
+	AND line_item_line_item_type = 'SavingsPlanCoveredUsage' ORDER BY 
+	line_item_usage_start_date DESC`
+	query := fmt.Sprintf(q, cfg.AthenaTable, start, end)
+	op, err := a.QueryAthenaBillingData(query)
+	if err != nil {
+		return fmt.Errorf("Error fetching Savings Plan Data: %s", err)
+	}
+	klog.Infof("Fetching SavingsPlan data...")
+	if len(op.ResultSet.Rows) > 1 {
+		a.SavingsPlanDataLock.Lock()
+		mostRecentDate := ""
+		for _, r := range op.ResultSet.Rows[1:(len(op.ResultSet.Rows) - 1)] {
+			d := *r.Data[0].VarCharValue
+			if mostRecentDate == "" {
+				mostRecentDate = d
+			} else if mostRecentDate != d { // Get all most recent assignments
+				break
+			}
+			cost, err := strconv.ParseFloat(*r.Data[3].VarCharValue, 64)
+			if err != nil {
+				klog.Infof("Error converting `%s` from float ", *r.Data[3].VarCharValue)
+			}
+			r := &SavingsPlanData{
+				ResourceID:     *r.Data[2].VarCharValue,
+				EffectiveCost:  cost,
+				SavingsPlanARN: *r.Data[1].VarCharValue,
+				MostRecentDate: d,
+			}
+			a.SavingsPlanDataByInstanceID[r.ResourceID] = r
+		}
+		klog.V(1).Infof("Found %d savings plan applied instances", len(a.SavingsPlanDataByInstanceID))
+		for k, r := range a.SavingsPlanDataByInstanceID {
+			klog.V(1).Infof("Reserved Instance Data found for node %s : %f at time %s", k, r.EffectiveCost, r.MostRecentDate)
+		}
+		a.SavingsPlanDataLock.Unlock()
+	} else {
+		klog.Infof("No savings plan applied instance data found")
+	}
+	return nil
+}
+
 type RIData struct {
 	ResourceID     string
 	EffectiveCost  float64

+ 27 - 1
pkg/costmodel/costmodel.go

@@ -218,6 +218,7 @@ const (
 	queryDeploymentLabels     = `avg_over_time(deployment_match_labels[%s])`
 	queryStatefulsetLabels    = `avg_over_time(statefulSet_match_labels[%s])`
 	queryPodDaemonsets        = `sum(kube_pod_owner{owner_kind="DaemonSet"}) by (namespace,pod,owner_name,cluster_id)`
+	queryPodJobs              = `sum(kube_pod_owner{owner_kind="Job"}) by (namespace,pod,owner_name,cluster_id)`
 	queryServiceLabels        = `avg_over_time(service_selector_labels[%s])`
 	queryZoneNetworkUsage     = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="true"}[%s] %s)) by (namespace,pod_name,cluster_id) / 1024 / 1024 / 1024`
 	queryRegionNetworkUsage   = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="false"}[%s] %s)) by (namespace,pod_name,cluster_id) / 1024 / 1024 / 1024`
@@ -1738,7 +1739,7 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 		return CostDataRangeFromSQL("", "", windowString, remoteStartStr, remoteEndStr)
 	}
 
-	numQueries := 21
+	numQueries := 22
 
 	var wg sync.WaitGroup
 	wg.Add(numQueries)
@@ -1981,6 +1982,19 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 			ec.Report(fmt.Errorf("Daemonsets: %s", promErr))
 		}
 	}()
+	var jobResults interface{}
+	go func() {
+		defer wg.Done()
+		defer measureTimeAsync(time.Now(), profileThreshold, "Jobs", queryProfileCh)
+		defer errors.HandlePanic()
+
+		var promErr error
+		jobResults, promErr = QueryRange(cli, fmt.Sprintf(queryPodJobs), start, end, window)
+
+		if promErr != nil {
+			ec.Report(fmt.Errorf("Jobs: %s", promErr))
+		}
+	}()
 	var statefulsetLabelsResults interface{}
 	go func() {
 		defer wg.Done()
@@ -2032,6 +2046,7 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 		if k8sErr != nil {
 			return
 		}
+
 		namespaceLabelsMapping, k8sErr = getNamespaceLabels(cm.Cache, clusterID)
 		if k8sErr != nil {
 			return
@@ -2158,6 +2173,11 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 		klog.V(1).Infof("Unable to get Pod Daemonsets for Metrics: %s", err.Error())
 	}
 
+	podJobs, err := GetPodJobsWithMetrics(jobResults, clusterID)
+	if err != nil {
+		klog.V(1).Infof("Unable to get Pod Jobs for Metrics: %s", err.Error())
+	}
+
 	podServicesMetricsMapping, err := getPodServicesWithMetrics(serviceLabels, podLabels)
 	if err != nil {
 		klog.V(1).Infof("Unable to get match Service Labels Metrics to Pods: %s", err.Error())
@@ -2407,6 +2427,11 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 			pds = []string{ds}
 		}
 
+		jobs := []string{}
+		if job, ok := podJobs[podKey]; ok {
+			jobs = []string{job}
+		}
+
 		costs := &CostData{
 			Name:            c.ContainerName,
 			PodName:         c.PodName,
@@ -2417,6 +2442,7 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 			Deployments:     podDeployments,
 			Daemonsets:      pds,
 			Statefulsets:    podStatefulSets,
+			Jobs:            jobs,
 			RAMReq:          RAMReqV,
 			RAMUsed:         RAMUsedV,
 			CPUReq:          CPUReqV,

+ 36 - 0
pkg/costmodel/promparsers.go

@@ -283,6 +283,42 @@ func GetPodDaemonsetsWithMetrics(queryResult interface{}, defaultClusterID strin
 	return toReturn, nil
 }
 
+// GetPodJobsWithMetrics parses queryResult and returns a map from
+// "namespace,pod,clusterID" to the value of the result's owner_name label.
+// When a result has an empty cluster_id, defaultClusterID is used in the key.
+// A failure to parse the results, or a result missing owner_name, namespace
+// or pod, stops processing and returns the map built so far with the error.
+func GetPodJobsWithMetrics(queryResult interface{}, defaultClusterID string) (map[string]string, error) {
+	podJobs := make(map[string]string)
+
+	// TODO: Pass actual query instead of PodJobsWithMetrics
+	parsed, err := prom.NewQueryResults("PodJobsWithMetrics", queryResult)
+	if err != nil {
+		return podJobs, err
+	}
+
+	for _, res := range parsed.Results {
+		jobName, err := res.GetString("owner_name")
+		if err != nil {
+			return podJobs, err
+		}
+
+		namespace, err := res.GetString("namespace")
+		if err != nil {
+			return podJobs, err
+		}
+
+		// The cluster_id lookup error is not checked; an empty value falls
+		// back to the default cluster ID instead.
+		clusterID, _ := res.GetString("cluster_id")
+		if clusterID == "" {
+			clusterID = defaultClusterID
+		}
+
+		podName, err := res.GetString("pod")
+		if err != nil {
+			return podJobs, err
+		}
+
+		podJobs[namespace+","+podName+","+clusterID] = jobName
+	}
+
+	return podJobs, nil
+}
+
 func GetDeploymentMatchLabelsMetrics(queryResult interface{}, defaultClusterID string) (map[string]map[string]string, error) {
 	toReturn := make(map[string]map[string]string)