Просмотр исходного кода

Merge pull request #230 from kubecost/Bolt-profiling-thanos

Performance Improvements
Matt Bolt 6 лет назад
Родитель
Коммит
564a81b250
3 измененных файлов с 98 добавлено и 24 удалено
  1. 95 23
      costmodel/costmodel.go
  2. 2 1
      go.mod
  3. 1 0
      go.sum

+ 95 - 23
costmodel/costmodel.go

@@ -20,6 +20,9 @@ import (
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/klog"
+
+	"github.com/google/uuid"
+	"golang.org/x/sync/singleflight"
 )
 
 const (
@@ -45,19 +48,24 @@ const (
 )
 
 type CostModel struct {
-	Cache ClusterCache
+	Cache        ClusterCache
+	RequestGroup *singleflight.Group
 
 	stop chan struct{}
 }
 
 func NewCostModel(client kubernetes.Interface) *CostModel {
+	// request grouping to prevent over-requesting the same data prior to caching
+	requestGroup := new(singleflight.Group)
+
 	stopCh := make(chan struct{})
 	cache := NewKubernetesClusterCache(client)
 	cache.Run(stopCh)
 
 	return &CostModel{
-		Cache: cache,
-		stop:  stopCh,
+		Cache:        cache,
+		RequestGroup: requestGroup,
+		stop:         stopCh,
 	}
 }
 
@@ -1289,7 +1297,51 @@ func costDataPassesFilters(costs *CostData, namespace string, cluster string) bo
 	return passesNamespace && passesCluster
 }
 
+// requestKeyFor builds the key used to group identical cost-data range requests.
+//
+// startString and endString are expected in the layout "2006-01-02T15:04:05.000Z". Both are
+// re-formatted without seconds or milliseconds, so requests whose ranges fall in the same start
+// and end minutes (and share the window, filters and remote flag) produce the same key.
+//
+// If either timestamp fails to parse, a freshly generated UUID string is returned instead, so the
+// request gets a unique key and is not grouped with any other request.
+func requestKeyFor(startString string, endString string, windowString string, filterNamespace string, filterCluster string, remoteEnabled bool) string {
+	fullLayout := "2006-01-02T15:04:05.000Z"
+	keyLayout := "2006-01-02T15:04Z"
+
+	sTime, err := time.Parse(fullLayout, startString)
+	if err != nil {
+		return uuid.New().String()
+	}
+	// The end of the range must come from endString. Parsing startString here would make the key
+	// ignore the end time, so requests with the same start but different ends would be treated as
+	// the same request.
+	eTime, err := time.Parse(fullLayout, endString)
+	if err != nil {
+		return uuid.New().String()
+	}
+
+	startKey := sTime.Format(keyLayout)
+	endKey := eTime.Format(keyLayout)
+
+	return fmt.Sprintf("%s,%s,%s,%s,%s,%t", startKey, endKey, windowString, filterNamespace, filterCluster, remoteEnabled)
+}
+
+// ComputeCostDataRange executes a range query for cost data.
+//
+// The inputs are reduced to a request key via requestKeyFor, and the actual work (costDataRange)
+// is run through cm.RequestGroup under that key, so that a request already in flight for the same
+// key is waited on rather than repeated. The value and error produced by costDataRange are
+// returned as-is; an error is synthesised only if the value is not a map[string]*CostData.
+//
+// NOTE(review): requests sharing a key presumably receive the same map value from the request
+// group — confirm that no caller mutates the returned map.
 func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset kubernetes.Interface, cp costAnalyzerCloud.Provider,
+	startString, endString, windowString string, filterNamespace string, filterCluster string, remoteEnabled bool) (map[string]*CostData, error) {
+	// Identical inputs yield the same key, which stands for the cost-model result of those inputs.
+	requestKey := requestKeyFor(startString, endString, windowString, filterNamespace, filterCluster, remoteEnabled)
+
+	klog.V(3).Infof("ComputeCostDataRange with Key: %s", requestKey)
+
+	// The work to perform when no request with the same key is currently executing.
+	compute := func() (interface{}, error) {
+		return cm.costDataRange(cli, clientset, cp, startString, endString, windowString, filterNamespace, filterCluster, remoteEnabled)
+	}
+
+	shared, err, _ := cm.RequestGroup.Do(requestKey, compute)
+	if costs, ok := shared.(map[string]*CostData); ok {
+		return costs, err
+	}
+
+	return nil, fmt.Errorf("Failed to cast result as map[string]*CostData")
+}
+
+func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubernetes.Interface, cp costAnalyzerCloud.Provider,
 	startString, endString, windowString string, filterNamespace string, filterCluster string, remoteEnabled bool) (map[string]*CostData, error) {
 	queryRAMRequests := fmt.Sprintf(queryRAMRequestsStr, windowString, "", windowString, "")
 	queryRAMUsage := fmt.Sprintf(queryRAMUsageStr, windowString, "", windowString, "")
@@ -1335,83 +1387,99 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 	var promErr error
 	var resultRAMRequests interface{}
 	go func() {
-		resultRAMRequests, promErr = QueryRange(cli, queryRAMRequests, start, end, window)
 		defer wg.Done()
+
+		resultRAMRequests, promErr = QueryRange(cli, queryRAMRequests, start, end, window)
 	}()
 	var resultRAMUsage interface{}
 	go func() {
-		resultRAMUsage, promErr = QueryRange(cli, queryRAMUsage, start, end, window)
 		defer wg.Done()
+
+		resultRAMUsage, promErr = QueryRange(cli, queryRAMUsage, start, end, window)
 	}()
 	var resultCPURequests interface{}
 	go func() {
-		resultCPURequests, promErr = QueryRange(cli, queryCPURequests, start, end, window)
 		defer wg.Done()
+
+		resultCPURequests, promErr = QueryRange(cli, queryCPURequests, start, end, window)
 	}()
 	var resultCPUUsage interface{}
 	go func() {
-		resultCPUUsage, promErr = QueryRange(cli, queryCPUUsage, start, end, window)
 		defer wg.Done()
+
+		resultCPUUsage, promErr = QueryRange(cli, queryCPUUsage, start, end, window)
 	}()
 	var resultGPURequests interface{}
 	go func() {
-		resultGPURequests, promErr = QueryRange(cli, queryGPURequests, start, end, window)
 		defer wg.Done()
+
+		resultGPURequests, promErr = QueryRange(cli, queryGPURequests, start, end, window)
 	}()
 	var resultPVRequests interface{}
 	go func() {
-		resultPVRequests, promErr = QueryRange(cli, queryPVRequests, start, end, window)
 		defer wg.Done()
+
+		resultPVRequests, promErr = QueryRange(cli, queryPVRequests, start, end, window)
 	}()
 	var resultNetZoneRequests interface{}
 	go func() {
-		resultNetZoneRequests, promErr = QueryRange(cli, queryNetZoneRequests, start, end, window)
 		defer wg.Done()
+
+		resultNetZoneRequests, promErr = QueryRange(cli, queryNetZoneRequests, start, end, window)
 	}()
 	var resultNetRegionRequests interface{}
 	go func() {
-		resultNetRegionRequests, promErr = QueryRange(cli, queryNetRegionRequests, start, end, window)
 		defer wg.Done()
+
+		resultNetRegionRequests, promErr = QueryRange(cli, queryNetRegionRequests, start, end, window)
 	}()
 	var resultNetInternetRequests interface{}
 	go func() {
-		resultNetInternetRequests, promErr = QueryRange(cli, queryNetInternetRequests, start, end, window)
 		defer wg.Done()
+
+		resultNetInternetRequests, promErr = QueryRange(cli, queryNetInternetRequests, start, end, window)
 	}()
 	var pvPodAllocationResults interface{}
 	go func() {
-		pvPodAllocationResults, promErr = QueryRange(cli, fmt.Sprintf(queryPVCAllocation, windowString), start, end, window)
 		defer wg.Done()
+
+		pvPodAllocationResults, promErr = QueryRange(cli, fmt.Sprintf(queryPVCAllocation, windowString), start, end, window)
 	}()
 	var pvCostResults interface{}
 	go func() {
-		pvCostResults, promErr = QueryRange(cli, fmt.Sprintf(queryPVHourlyCost, windowString), start, end, window)
 		defer wg.Done()
+
+		pvCostResults, promErr = QueryRange(cli, fmt.Sprintf(queryPVHourlyCost, windowString), start, end, window)
 	}()
 	var nsLabelsResults interface{}
 	go func() {
-		nsLabelsResults, promErr = QueryRange(cli, fmt.Sprintf(queryNSLabels, windowString), start, end, window)
 		defer wg.Done()
+
+		nsLabelsResults, promErr = QueryRange(cli, fmt.Sprintf(queryNSLabels, windowString), start, end, window)
 	}()
 	var podLabelsResults interface{}
 	go func() {
-		podLabelsResults, promErr = QueryRange(cli, fmt.Sprintf(queryPodLabels, windowString), start, end, window)
 		defer wg.Done()
+
+		podLabelsResults, promErr = QueryRange(cli, fmt.Sprintf(queryPodLabels, windowString), start, end, window)
 	}()
 	var serviceLabelsResults interface{}
 	go func() {
-		serviceLabelsResults, promErr = QueryRange(cli, fmt.Sprintf(queryServiceLabels, windowString), start, end, window)
 		defer wg.Done()
+
+		serviceLabelsResults, promErr = QueryRange(cli, fmt.Sprintf(queryServiceLabels, windowString), start, end, window)
 	}()
 	var deploymentLabelsResults interface{}
 	go func() {
-		deploymentLabelsResults, promErr = QueryRange(cli, fmt.Sprintf(queryDeploymentLabels, windowString), start, end, window)
 		defer wg.Done()
+
+		deploymentLabelsResults, promErr = QueryRange(cli, fmt.Sprintf(queryDeploymentLabels, windowString), start, end, window)
 	}()
 	var normalizationResults interface{}
 	go func() {
-		normalizationResults, promErr = QueryRange(cli, normalization, start, end, window)
 		defer wg.Done()
+
+		normalizationResults, promErr = QueryRange(cli, normalization, start, end, window)
 	}()
 
 	podDeploymentsMapping := make(map[string]map[string][]string)
@@ -1439,6 +1507,8 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 
 	wg.Wait()
 
+	defer measureTime(time.Now(), "Processing Query Data")
+
 	if promErr != nil {
 		return nil, fmt.Errorf("Error querying prometheus: %s", promErr.Error())
 	}
@@ -1858,10 +1928,6 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 		if err != nil {
 			klog.V(1).Infof("Error fetching historical node data: %s", err.Error())
 		}
-		err = findDeletedPodInfo(cli, missingContainers, wStr)
-		if err != nil {
-			klog.V(1).Infof("Error fetching historical pod data: %s", err.Error())
-		}
 	}
 
 	return containerNameCost, err
@@ -2560,3 +2626,9 @@ func wrapPrometheusError(qr interface{}) (string, error) {
 	eStr, ok := e.(string)
 	return eStr, nil
 }
+
+// measureTime logs, at klog verbosity level 3, how much time has passed since start, labelled
+// with name. It is meant to be deferred with the current time captured at the defer statement,
+// e.g. `defer measureTime(time.Now(), "label")`, so that the log reports the duration up to the
+// point where the enclosing function returns.
+func measureTime(start time.Time, name string) {
+	klog.V(3).Infof("[Profiler] %s took %s", name, time.Since(start))
+}

+ 2 - 1
go.mod

@@ -12,6 +12,7 @@ require (
 	github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef // indirect
 	github.com/golang/mock v1.2.0
 	github.com/google/martian v2.1.0+incompatible // indirect
+	github.com/google/uuid v1.1.1
 	github.com/googleapis/gax-go v2.0.2+incompatible // indirect
 	github.com/gophercloud/gophercloud v0.2.0 // indirect
 	github.com/imdario/mergo v0.3.7 // indirect
@@ -28,7 +29,7 @@ require (
 	golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529 // indirect
 	golang.org/x/lint v0.0.0-20190909230951-414d861bb4ac // indirect
 	golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
-	golang.org/x/sync v0.0.0-20190423024810-112230192c58 // indirect
+	golang.org/x/sync v0.0.0-20190423024810-112230192c58
 	google.golang.org/api v0.4.0
 	gotest.tools v2.2.0+incompatible
 	k8s.io/api v0.0.0-20190913080256-21721929cffa

+ 1 - 0
go.sum

@@ -93,6 +93,7 @@ github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPg
 github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
 github.com/google/uuid v0.0.0-20171113160352-8c31c18f31ed/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
 github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
 github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
 github.com/googleapis/gax-go v2.0.2+incompatible h1:silFMLAnr330+NRuag/VjIGF7TLp/LBrV2CJKFLWEww=
 github.com/googleapis/gax-go v2.0.2+incompatible/go.mod h1:SFVmujtThgffbyetf+mdk2eWhX2bMyUtNHzFKcPA9HY=