Просмотр исходного кода

Add request grouping for CostDataRange

Matt Bolt 6 лет назад
Родитель
Сommit
08194d8689
3 измененных файлов с 73 добавлено и 52 удалено
  1. 70 51
      costmodel/costmodel.go
  2. 2 1
      go.mod
  3. 1 0
      go.sum

+ 70 - 51
costmodel/costmodel.go

@@ -20,6 +20,9 @@ import (
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/klog"
+
+	"golang.org/x/sync/singleflight"
+	"github.com/google/uuid"
 )
 
 const (
@@ -45,19 +48,24 @@ const (
 )
 
 type CostModel struct {
-	Cache ClusterCache
+	Cache        ClusterCache
+	RequestGroup *singleflight.Group
 
 	stop chan struct{}
 }
 
 func NewCostModel(client kubernetes.Interface) *CostModel {
+	// request grouping to prevent over-requesting the same data prior to caching
+	requestGroup := new(singleflight.Group)
+
 	stopCh := make(chan struct{})
 	cache := NewKubernetesClusterCache(client)
 	cache.Run(stopCh)
 
 	return &CostModel{
-		Cache: cache,
-		stop:  stopCh,
+		Cache:        cache,
+		RequestGroup: requestGroup,
+		stop:         stopCh,
 	}
 }
 
@@ -1289,7 +1297,42 @@ func costDataPassesFilters(costs *CostData, namespace string, cluster string) bo
 	return passesNamespace && passesCluster
 }
 
+// Attempt to create a key for the request. Reduce the times to minutes in order to more easily group requests based on 
+// real time ranges. If for any reason, the key generation fails, return a uuid to ensure uniqueness.
+func requestKeyFor(startString string, endString string, windowString string, filterNamespace string, filterCluster string, remoteEnabled bool) string {
+	fullLayout := "2006-01-02T15:04:05.000Z"
+	keyLayout := "2006-01-02T15:04Z"
+
+	sTime, err := time.Parse(fullLayout, startString)
+	if err != nil {
+		return uuid.New().String()
+	}
+	eTime, err := time.Parse(fullLayout, startString)
+	if err != nil {
+		return uuid.New().String()
+	}
+
+	startKey := sTime.Format(keyLayout)
+	endKey := eTime.Format(keyLayout)
+
+	return fmt.Sprintf("%s,%s,%s,%s,%s,%t", startKey, endKey, windowString, filterNamespace, filterCluster, remoteEnabled)
+}
+
 func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset kubernetes.Interface, cp costAnalyzerCloud.Provider,
+	startString, endString, windowString string, filterNamespace string, filterCluster string, remoteEnabled bool) (map[string]*CostData, error) {
+	key := requestKeyFor(startString, endString, windowString, filterNamespace, filterCluster, remoteEnabled)
+
+	klog.V(1).Infof("ComputeCostDataRange with Key: %s", key)
+	
+	result, err, _ := cm.RequestGroup.Do(key, func() (interface{}, error) {
+		return cm.costDataRange(cli, clientset, cp, startString, endString, windowString, filterNamespace, filterCluster, remoteEnabled)
+	})
+
+	data := result.(map[string]*CostData)
+	return data, err
+}
+
+func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubernetes.Interface, cp costAnalyzerCloud.Provider,
 	startString, endString, windowString string, filterNamespace string, filterCluster string, remoteEnabled bool) (map[string]*CostData, error) {
 	queryRAMRequests := fmt.Sprintf(queryRAMRequestsStr, windowString, "", windowString, "")
 	queryRAMUsage := fmt.Sprintf(queryRAMUsageStr, windowString, "", windowString, "")
@@ -1335,124 +1378,96 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 	var promErr error
 	var resultRAMRequests interface{}
 	go func() {
-		defer measureTime(time.Now(), queryRAMRequests)
 		defer wg.Done()
 
 		resultRAMRequests, promErr = QueryRange(cli, queryRAMRequests, start, end, window)
 	}()
 	var resultRAMUsage interface{}
 	go func() {
-		defer measureTime(time.Now(), queryRAMUsage)
 		defer wg.Done()
 
 		resultRAMUsage, promErr = QueryRange(cli, queryRAMUsage, start, end, window)
 	}()
 	var resultCPURequests interface{}
 	go func() {
-		defer measureTime(time.Now(), queryCPURequests)
 		defer wg.Done()
 
 		resultCPURequests, promErr = QueryRange(cli, queryCPURequests, start, end, window)
 	}()
 	var resultCPUUsage interface{}
 	go func() {
-		defer measureTime(time.Now(), queryCPUUsage)
 		defer wg.Done()
 
 		resultCPUUsage, promErr = QueryRange(cli, queryCPUUsage, start, end, window)
 	}()
 	var resultGPURequests interface{}
 	go func() {
-		defer measureTime(time.Now(), queryGPURequests)
 		defer wg.Done()
 
 		resultGPURequests, promErr = QueryRange(cli, queryGPURequests, start, end, window)
 	}()
 	var resultPVRequests interface{}
 	go func() {
-		defer measureTime(time.Now(), queryPVRequests)
 		defer wg.Done()
 
 		resultPVRequests, promErr = QueryRange(cli, queryPVRequests, start, end, window)
 	}()
 	var resultNetZoneRequests interface{}
 	go func() {
-		defer measureTime(time.Now(), queryNetZoneRequests)
 		defer wg.Done()
 
 		resultNetZoneRequests, promErr = QueryRange(cli, queryNetZoneRequests, start, end, window)
 	}()
 	var resultNetRegionRequests interface{}
 	go func() {
-		defer measureTime(time.Now(), queryNetRegionRequests)
 		defer wg.Done()
 
 		resultNetRegionRequests, promErr = QueryRange(cli, queryNetRegionRequests, start, end, window)
 	}()
 	var resultNetInternetRequests interface{}
 	go func() {
-		defer measureTime(time.Now(), queryNetInternetRequests)
 		defer wg.Done()
 
 		resultNetInternetRequests, promErr = QueryRange(cli, queryNetInternetRequests, start, end, window)
 	}()
 	var pvPodAllocationResults interface{}
 	go func() {
-		pvAllocQuery := fmt.Sprintf(queryPVCAllocation, windowString)
-
-		defer measureTime(time.Now(), pvAllocQuery)
 		defer wg.Done()
 
-		pvPodAllocationResults, promErr = QueryRange(cli, pvAllocQuery, start, end, window)
+		pvPodAllocationResults, promErr = QueryRange(cli, fmt.Sprintf(queryPVCAllocation, windowString), start, end, window)
 	}()
 	var pvCostResults interface{}
 	go func() {
-		pvHourlyCostQuery := fmt.Sprintf(queryPVHourlyCost, windowString)
-
-		defer measureTime(time.Now(), pvHourlyCostQuery)
 		defer wg.Done()
 
-		pvCostResults, promErr = QueryRange(cli, pvHourlyCostQuery, start, end, window)
+		pvCostResults, promErr = QueryRange(cli, fmt.Sprintf(queryPVHourlyCost, windowString), start, end, window)
 	}()
 	var nsLabelsResults interface{}
 	go func() {
-		nsLabelQuery := fmt.Sprintf(queryNSLabels, windowString)
-
-		defer measureTime(time.Now(), nsLabelQuery)
 		defer wg.Done()
 
-		nsLabelsResults, promErr = QueryRange(cli, nsLabelQuery, start, end, window)
+		nsLabelsResults, promErr = QueryRange(cli, fmt.Sprintf(queryNSLabels, windowString), start, end, window)
 	}()
 	var podLabelsResults interface{}
 	go func() {
-		podLabelQuery := fmt.Sprintf(queryPodLabels, windowString)
-
-		defer measureTime(time.Now(), podLabelQuery)
 		defer wg.Done()
 
-		podLabelsResults, promErr = QueryRange(cli, podLabelQuery, start, end, window)
+		podLabelsResults, promErr = QueryRange(cli, fmt.Sprintf(queryPodLabels, windowString), start, end, window)
 	}()
 	var serviceLabelsResults interface{}
 	go func() {
-		serviceLabelQuery := fmt.Sprintf(queryServiceLabels, windowString)
-
-		defer measureTime(time.Now(), serviceLabelQuery)
 		defer wg.Done()
 
-		serviceLabelsResults, promErr = QueryRange(cli, serviceLabelQuery, start, end, window)
+		serviceLabelsResults, promErr = QueryRange(cli, fmt.Sprintf(queryServiceLabels, windowString), start, end, window)
 	}()
 	var deploymentLabelsResults interface{}
 	go func() {
-		depLabelQuery := fmt.Sprintf(queryDeploymentLabels, windowString)
-
-		defer measureTime(time.Now(), depLabelQuery)
 		defer wg.Done()
 
-		deploymentLabelsResults, promErr = QueryRange(cli, depLabelQuery, start, end, window)
+		deploymentLabelsResults, promErr = QueryRange(cli, fmt.Sprintf(queryDeploymentLabels, windowString), start, end, window)
 	}()
 	var normalizationResults interface{}
 	go func() {
-		defer measureTime(time.Now(), normalization)
 		defer wg.Done()
 
 		normalizationResults, promErr = QueryRange(cli, normalization, start, end, window)
@@ -1895,20 +1910,21 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 			}
 		}
 	}
-
-	w := end.Sub(start)
-	w += window
-	if w.Minutes() > 0 {
-		wStr := fmt.Sprintf("%dm", int(w.Minutes()))
-		err = findDeletedNodeInfo(cli, missingNodes, wStr)
-		if err != nil {
-			klog.V(1).Infof("Error fetching historical node data: %s", err.Error())
-		}
-		err = findDeletedPodInfo(cli, missingContainers, wStr)
-		if err != nil {
-			klog.V(1).Infof("Error fetching historical pod data: %s", err.Error())
+	/*
+		w := end.Sub(start)
+		w += window
+		if w.Minutes() > 0 {
+			wStr := fmt.Sprintf("%dm", int(w.Minutes()))
+			err = findDeletedNodeInfo(cli, missingNodes, wStr)
+			if err != nil {
+				klog.V(1).Infof("Error fetching historical node data: %s", err.Error())
+			}
+			err = findDeletedPodInfo(cli, missingContainers, wStr)
+			if err != nil {
+				klog.V(1).Infof("Error fetching historical pod data: %s", err.Error())
+			}
 		}
-	}
+	*/
 
 	return containerNameCost, err
 }
@@ -2258,6 +2274,8 @@ func QueryRange(cli prometheusClient.Client, query string, start, end time.Time,
 	q.Set("step", strconv.FormatFloat(step.Seconds(), 'f', 3, 64))
 	u.RawQuery = q.Encode()
 
+	klog.V(1).Infof("Request: %s", u.String())
+
 	req, err := http.NewRequest(http.MethodPost, u.String(), nil)
 	if err != nil {
 		return nil, err
@@ -2609,5 +2627,6 @@ func wrapPrometheusError(qr interface{}) (string, error) {
 
 func measureTime(start time.Time, name string) {
 	elapsed := time.Since(start)
-	klog.V(1).Infof("[Profile][%s] %s", elapsed, name)
+
+	klog.V(3).Infof("[Profiler] %s took %s", name, elapsed)
 }

+ 2 - 1
go.mod

@@ -12,6 +12,7 @@ require (
 	github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef // indirect
 	github.com/golang/mock v1.2.0
 	github.com/google/martian v2.1.0+incompatible // indirect
+	github.com/google/uuid v1.1.1
 	github.com/googleapis/gax-go v2.0.2+incompatible // indirect
 	github.com/gophercloud/gophercloud v0.2.0 // indirect
 	github.com/imdario/mergo v0.3.7 // indirect
@@ -28,7 +29,7 @@ require (
 	golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529 // indirect
 	golang.org/x/lint v0.0.0-20190909230951-414d861bb4ac // indirect
 	golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
-	golang.org/x/sync v0.0.0-20190423024810-112230192c58 // indirect
+	golang.org/x/sync v0.0.0-20190423024810-112230192c58
 	google.golang.org/api v0.4.0
 	gotest.tools v2.2.0+incompatible
 	k8s.io/api v0.0.0-20190913080256-21721929cffa

+ 1 - 0
go.sum

@@ -93,6 +93,7 @@ github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPg
 github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
 github.com/google/uuid v0.0.0-20171113160352-8c31c18f31ed/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
 github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
 github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
 github.com/googleapis/gax-go v2.0.2+incompatible h1:silFMLAnr330+NRuag/VjIGF7TLp/LBrV2CJKFLWEww=
 github.com/googleapis/gax-go v2.0.2+incompatible/go.mod h1:SFVmujtThgffbyetf+mdk2eWhX2bMyUtNHzFKcPA9HY=