Переглянути джерело

Migrate Agg API WIP: refactor CDMR to use window complete

Niko Kovacevic 5 роки тому
батько
коміт
a60c663d20
3 змінених файлів з 80 додано та 115 видалено
  1. 11 9
      pkg/costmodel/aggregation.go
  2. 68 103
      pkg/costmodel/costmodel.go
  3. 1 3
      pkg/costmodel/router.go

+ 11 - 9
pkg/costmodel/aggregation.go

@@ -1030,18 +1030,18 @@ func DefaultAggregateQueryOpts() *AggregateQueryOpts {
 // ComputeAggregateCostModel computes cost data for the given window, then aggregates it by the given fields.
 // Data is cached on two levels: the aggregation is cached as well as the underlying cost data.
 func (a *Accesses) ComputeAggregateCostModel(promClient prometheusClient.Client, window kubecost.Window, field string, subfields []string, opts *AggregateQueryOpts) (map[string]*Aggregation, string, error) {
-	// Window must be closed, i.e. neither start nor end can be nil
+	// Window is the range of the query, i.e. (start, end)
+	// It must be closed, i.e. neither start nor end can be nil
 	if window.IsOpen() {
 		return nil, "", fmt.Errorf("illegal window: %s", window)
 	}
 
-	// Window is the range of the query, i.e. (start, end)
-
 	// Resolution is the duration of each datum in the cost model range query,
 	// which corresponds to both the step size given to Prometheus query_range
 	// and to the window passed to the range queries.
 	// i.e. by default, we support 1h resolution for queries of windows defined
 	// in terms of days or integer multiples of hours (e.g. 1d, 12h)
+	resolution := time.Hour
 
 	// Determine resolution by size of duration and divisibility of window.
 	// By default, resolution is 1hr. If the window is smaller than 1hr, then
@@ -1049,9 +1049,7 @@ func (a *Accesses) ComputeAggregateCostModel(promClient prometheusClient.Client,
 	// resolution goes down to 1m. If the window is greater than 1d, then
 	// resolution gets scaled up to improve performance by reducing the amount
 	// of data being computed.
-	resolution := time.Hour
 	durMins := int64(math.Trunc(window.Minutes()))
-
 	if durMins < 24*60 { // less than 1d
 		if durMins%60 != 0 { // not divisible by 1h
 			resolution = time.Minute
@@ -1337,7 +1335,7 @@ func (a *Accesses) ComputeAggregateCostModel(promClient prometheusClient.Client,
 			klog.V(3).Infof("Cache item: %s", k)
 		}
 
-		costData, err = a.Model.ComputeCostDataRange(promClient, a.CloudProvider, window, resolution, "", "", remoteEnabled)
+		costData, err = a.Model.ComputeCostDataRange(promClient, a.CloudProvider, window, resolution, "", "")
 		if err != nil {
 			if prom.IsErrorCollection(err) {
 				return nil, "", err
@@ -1399,18 +1397,22 @@ func (a *Accesses) ComputeAggregateCostModel(promClient prometheusClient.Client,
 
 	idleCoefficients := make(map[string]float64)
 	if allocateIdle {
+		duration, offset := window.ToDurationOffset()
+
 		idleDurationCalcHours := window.Hours()
 		if window.Hours() < 1 {
 			idleDurationCalcHours = 1
 		}
-		windowStr := fmt.Sprintf("%dh", int(idleDurationCalcHours))
+		duration = fmt.Sprintf("%dh", int(idleDurationCalcHours))
+
 		if a.ThanosClient != nil {
 			offset = thanos.Offset()
 			klog.Infof("Setting offset to %s", offset)
 		}
-		idleCoefficients, err = a.ComputeIdleCoefficient(costData, promClient, a.CloudProvider, discount, customDiscount, windowStr, offset)
+
+		idleCoefficients, err = a.ComputeIdleCoefficient(costData, promClient, a.CloudProvider, discount, customDiscount, duration, offset)
 		if err != nil {
-			klog.Errorf("error computing idle coefficient: windowString=%s, offset=%s, err=%s", windowStr, offset, err)
+			klog.Errorf("error computing idle coefficient: windowString=%s, offset=%s, err=%s", duration, offset, err)
 			return nil, "", err
 		}
 	}

+ 68 - 103
pkg/costmodel/costmodel.go

@@ -22,7 +22,6 @@ import (
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/klog"
 
-	"github.com/google/uuid"
 	"golang.org/x/sync/singleflight"
 )
 
@@ -1428,36 +1427,24 @@ func floorMultiple(value int64, multiple int64) int64 {
 
 // requestKeyFor builds the request-grouping key for a cost data range query. The window start and duration are snapped
 // down to 5 minute multiples and formatted at minute precision, so requests over nearly identical ranges share a key.
-func requestKeyFor(startString string, endString string, windowString string, filterNamespace string, filterCluster string, remoteEnabled bool) string {
-	fullLayout := "2006-01-02T15:04:05.000Z"
+func requestKeyFor(window kubecost.Window, resolution time.Duration, filterNamespace string, filterCluster string) string {
 	keyLayout := "2006-01-02T15:04Z"
 
-	sTime, err := time.Parse(fullLayout, startString)
-	if err != nil {
-		klog.V(1).Infof("[Warning] Start=%s failed to parse when generating request key: %s", startString, err.Error())
-		return uuid.New().String()
-	}
-	eTime, err := time.Parse(fullLayout, endString)
-	if err != nil {
-		klog.V(1).Infof("[Warning] End=%s failed to parse when generating request key: %s", endString, err.Error())
-		return uuid.New().String()
-	}
-
 	// We "snap" start time and duration to their closest 5 min multiple less than itself, by
 	// applying a snapped duration to a snapped start time.
-	durMins := int64(eTime.Sub(sTime).Minutes())
+	durMins := int64(window.Minutes()) // whole minutes covered by the window
 	durMins = floorMultiple(durMins, 5)
 
-	sMins := int64(sTime.Minute())
+	sMins := int64(window.Start().Minute()) // NOTE(review): assumes a closed window; Start() is used without a nil check
 	sOffset := sMins - floorMultiple(sMins, 5)
 
-	sTime = sTime.Add(-time.Duration(sOffset) * time.Minute)
-	eTime = sTime.Add(time.Duration(durMins) * time.Minute)
+	sTime := window.Start().Add(-time.Duration(sOffset) * time.Minute)
+	eTime := sTime.Add(time.Duration(durMins) * time.Minute) // end is derived from the snapped start, not the raw start
 
 	startKey := sTime.Format(keyLayout)
 	endKey := eTime.Format(keyLayout)
 
-	return fmt.Sprintf("%s,%s,%s,%s,%s,%t", startKey, endKey, windowString, filterNamespace, filterCluster, remoteEnabled)
+	return fmt.Sprintf("%s,%s,%s,%s,%s", startKey, endKey, resolution.String(), filterNamespace, filterCluster) // one verb per argument
 }
 
 // func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, cp costAnalyzerCloud.Provider,
@@ -1466,17 +1453,17 @@ func requestKeyFor(startString string, endString string, windowString string, fi
 
 // ComputeCostDataRange executes a range query for cost data.
 // The window and resolution define the queried range and step; filterNamespace and filterCluster are passed through unchanged.
-func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, cp costAnalyzerCloud.Provider, window kubecost.Window, resolution time.Duration, filterNamespace string, filterCluster string, remoteEnabled bool) (map[string]*CostData, error) {
+func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, cp costAnalyzerCloud.Provider, window kubecost.Window, resolution time.Duration, filterNamespace string, filterCluster string) (map[string]*CostData, error) {
 	// Create a request key for request grouping. This key will be used to represent the cost-model result
 	// for the specific inputs to prevent multiple queries for identical data.
-	key := requestKeyFor(window, resolution, filterNamespace, filterCluster, remoteEnabled)
+	key := requestKeyFor(window, resolution, filterNamespace, filterCluster)
 
 	klog.V(4).Infof("ComputeCostDataRange with Key: %s", key)
 
 	// If there is already a request out that uses the same data, wait for it to return to share the results.
 	// Otherwise, start executing.
 	result, err, _ := cm.RequestGroup.Do(key, func() (interface{}, error) {
-		return cm.costDataRange(cli, cp, window, resolution, filterNamespace, filterCluster, remoteEnabled, offset)
+		return cm.costDataRange(cli, cp, window, resolution, filterNamespace, filterCluster)
 	})
 
 	data, ok := result.(map[string]*CostData)
@@ -1487,62 +1474,73 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, cp costAn
 	return data, err
 }
 
-func (cm *CostModel) costDataRange(cli prometheusClient.Client, cp costAnalyzerCloud.Provider, window kubecost.Window, resolution time.Duration, filterNamespace string, filterCluster string, remoteEnabled bool) (map[string]*CostData, error) {
+func (cm *CostModel) costDataRange(cli prometheusClient.Client, cp costAnalyzerCloud.Provider, window kubecost.Window, resolution time.Duration, filterNamespace string, filterCluster string) (map[string]*CostData, error) {
 	clusterID := env.GetClusterID()
 
-	durHrs := end.Sub(start).Hours() + 1
+	// durHrs := end.Sub(start).Hours() + 1
+
+	if window.IsOpen() {
+		return nil, fmt.Errorf("illegal window: %s", window)
+	}
+	start := *window.Start()
+	end := *window.End()
 
-	if remoteEnabled == true {
-		remoteLayout := "2006-01-02T15:04:05Z"
-		remoteStartStr := start.Format(remoteLayout)
-		remoteEndStr := end.Format(remoteLayout)
-		klog.V(1).Infof("Using remote database for query from %s to %s with window %s", startString, endString, windowString)
-		return CostDataRangeFromSQL("", "", windowString, remoteStartStr, remoteEndStr)
+	// Snap resolution to the nearest minute
+	resMins := int64(math.Trunc(resolution.Minutes()))
+	if resMins == 0 {
+		return nil, fmt.Errorf("resolution must be greater than 0.0")
 	}
+	resolution = time.Duration(resMins) * time.Minute
+
+	// Convert to Prometheus-style duration string in terms of m or h
+	resStr := fmt.Sprintf("%sm", resMins)
+	if resMins%60 == 0 {
+		resStr = fmt.Sprintf("%sh", resMins/60)
+	}
+
+	log.Infof("CostDataRange(%s, %s, %f)", window, resStr, resolution.Hours())
 
 	scrapeIntervalSeconds := cm.ScrapeInterval.Seconds()
 
 	ctx := prom.NewContext(cli)
 
-	queryRAMAlloc := fmt.Sprintf(queryRAMAllocationByteHours, windowString, scrapeIntervalSeconds)
-	queryCPUAlloc := fmt.Sprintf(queryCPUAllocationVCPUHours, windowString, scrapeIntervalSeconds)
-	queryRAMRequests := fmt.Sprintf(queryRAMRequestsStr, windowString, "", windowString, "")
-	queryRAMUsage := fmt.Sprintf(queryRAMUsageStr, windowString, "", windowString, "")
-	queryCPURequests := fmt.Sprintf(queryCPURequestsStr, windowString, "", windowString, "")
-	queryCPUUsage := fmt.Sprintf(queryCPUUsageStr, windowString, "")
-	queryGPURequests := fmt.Sprintf(queryGPURequestsStr, windowString, "", windowString, "", resolutionHours, windowString, "")
+	queryRAMAlloc := fmt.Sprintf(queryRAMAllocationByteHours, resStr, scrapeIntervalSeconds)
+	queryCPUAlloc := fmt.Sprintf(queryCPUAllocationVCPUHours, resStr, scrapeIntervalSeconds)
+	queryRAMRequests := fmt.Sprintf(queryRAMRequestsStr, resStr, "", resStr, "")
+	queryRAMUsage := fmt.Sprintf(queryRAMUsageStr, resStr, "", resStr, "")
+	queryCPURequests := fmt.Sprintf(queryCPURequestsStr, resStr, "", resStr, "")
+	queryCPUUsage := fmt.Sprintf(queryCPUUsageStr, resStr, "")
+	queryGPURequests := fmt.Sprintf(queryGPURequestsStr, resStr, "", resStr, "", resolution.Hours(), resStr, "")
 	queryPVRequests := fmt.Sprintf(queryPVRequestsStr)
-	queryPVCAllocation := fmt.Sprintf(queryPVCAllocationFmt, windowString, scrapeIntervalSeconds)
-	queryPVHourlyCost := fmt.Sprintf(queryPVHourlyCostFmt, windowString)
-	queryNetZoneRequests := fmt.Sprintf(queryZoneNetworkUsage, windowString, "")
-	queryNetRegionRequests := fmt.Sprintf(queryRegionNetworkUsage, windowString, "")
-	queryNetInternetRequests := fmt.Sprintf(queryInternetNetworkUsage, windowString, "")
-	queryNormalization := fmt.Sprintf(normalizationStr, windowString, "")
-
-	queryProfileStart := time.Now()
+	queryPVCAllocation := fmt.Sprintf(queryPVCAllocationFmt, resStr, scrapeIntervalSeconds)
+	queryPVHourlyCost := fmt.Sprintf(queryPVHourlyCostFmt, resStr)
+	queryNetZoneRequests := fmt.Sprintf(queryZoneNetworkUsage, resStr, "")
+	queryNetRegionRequests := fmt.Sprintf(queryRegionNetworkUsage, resStr, "")
+	queryNetInternetRequests := fmt.Sprintf(queryInternetNetworkUsage, resStr, "")
+	queryNormalization := fmt.Sprintf(normalizationStr, resStr, "")
 
 	// Submit all queries for concurrent evaluation
-	resChRAMRequests := ctx.QueryRange(queryRAMRequests, start, end, window)
-	resChRAMUsage := ctx.QueryRange(queryRAMUsage, start, end, window)
-	resChRAMAlloc := ctx.QueryRange(queryRAMAlloc, start, end, window)
-	resChCPURequests := ctx.QueryRange(queryCPURequests, start, end, window)
-	resChCPUUsage := ctx.QueryRange(queryCPUUsage, start, end, window)
-	resChCPUAlloc := ctx.QueryRange(queryCPUAlloc, start, end, window)
-	resChGPURequests := ctx.QueryRange(queryGPURequests, start, end, window)
-	resChPVRequests := ctx.QueryRange(queryPVRequests, start, end, window)
-	resChPVCAlloc := ctx.QueryRange(queryPVCAllocation, start, end, window)
-	resChPVHourlyCost := ctx.QueryRange(queryPVHourlyCost, start, end, window)
-	resChNetZoneRequests := ctx.QueryRange(queryNetZoneRequests, start, end, window)
-	resChNetRegionRequests := ctx.QueryRange(queryNetRegionRequests, start, end, window)
-	resChNetInternetRequests := ctx.QueryRange(queryNetInternetRequests, start, end, window)
-	resChNSLabels := ctx.QueryRange(fmt.Sprintf(queryNSLabels, windowString), start, end, window)
-	resChPodLabels := ctx.QueryRange(fmt.Sprintf(queryPodLabels, windowString), start, end, window)
-	resChServiceLabels := ctx.QueryRange(fmt.Sprintf(queryServiceLabels, windowString), start, end, window)
-	resChDeploymentLabels := ctx.QueryRange(fmt.Sprintf(queryDeploymentLabels, windowString), start, end, window)
-	resChStatefulsetLabels := ctx.QueryRange(fmt.Sprintf(queryStatefulsetLabels, windowString), start, end, window)
-	resChJobs := ctx.QueryRange(queryPodJobs, start, end, window)
-	resChDaemonsets := ctx.QueryRange(queryPodDaemonsets, start, end, window)
-	resChNormalization := ctx.QueryRange(queryNormalization, start, end, window)
+	resChRAMRequests := ctx.QueryRange(queryRAMRequests, start, end, resolution)
+	resChRAMUsage := ctx.QueryRange(queryRAMUsage, start, end, resolution)
+	resChRAMAlloc := ctx.QueryRange(queryRAMAlloc, start, end, resolution)
+	resChCPURequests := ctx.QueryRange(queryCPURequests, start, end, resolution)
+	resChCPUUsage := ctx.QueryRange(queryCPUUsage, start, end, resolution)
+	resChCPUAlloc := ctx.QueryRange(queryCPUAlloc, start, end, resolution)
+	resChGPURequests := ctx.QueryRange(queryGPURequests, start, end, resolution)
+	resChPVRequests := ctx.QueryRange(queryPVRequests, start, end, resolution)
+	resChPVCAlloc := ctx.QueryRange(queryPVCAllocation, start, end, resolution)
+	resChPVHourlyCost := ctx.QueryRange(queryPVHourlyCost, start, end, resolution)
+	resChNetZoneRequests := ctx.QueryRange(queryNetZoneRequests, start, end, resolution)
+	resChNetRegionRequests := ctx.QueryRange(queryNetRegionRequests, start, end, resolution)
+	resChNetInternetRequests := ctx.QueryRange(queryNetInternetRequests, start, end, resolution)
+	resChNSLabels := ctx.QueryRange(fmt.Sprintf(queryNSLabels, resStr), start, end, resolution)
+	resChPodLabels := ctx.QueryRange(fmt.Sprintf(queryPodLabels, resStr), start, end, resolution)
+	resChServiceLabels := ctx.QueryRange(fmt.Sprintf(queryServiceLabels, resStr), start, end, resolution)
+	resChDeploymentLabels := ctx.QueryRange(fmt.Sprintf(queryDeploymentLabels, resStr), start, end, resolution)
+	resChStatefulsetLabels := ctx.QueryRange(fmt.Sprintf(queryStatefulsetLabels, resStr), start, end, resolution)
+	resChJobs := ctx.QueryRange(queryPodJobs, start, end, resolution)
+	resChDaemonsets := ctx.QueryRange(queryPodDaemonsets, start, end, resolution)
+	resChNormalization := ctx.QueryRange(queryNormalization, start, end, resolution)
 
 	// Pull k8s pod, controller, service, and namespace details
 	podlist := cm.Cache.GetAllPods()
@@ -1590,9 +1588,6 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, cp costAnalyzerC
 	resJobs, _ := resChJobs.Await()
 	resNormalization, _ := resChNormalization.Await()
 
-	measureTime(queryProfileStart, profileThreshold, fmt.Sprintf("costDataRange(%fh): Prom/k8s Queries", durHrs))
-	defer measureTime(time.Now(), profileThreshold, fmt.Sprintf("costDataRange(%fh): Processing Query Data", durHrs))
-
 	// NOTE: The way we currently handle errors and warnings only early returns if there is an error. Warnings
 	// NOTE: will not propagate unless coupled with errors.
 	if ctx.HasErrors() {
@@ -1611,18 +1606,12 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, cp costAnalyzerC
 		return nil, ctx.ErrorCollection()
 	}
 
-	profileStart := time.Now()
-
 	normalizationValue, err := getNormalizations(resNormalization)
 	if err != nil {
-		msg := fmt.Sprintf("error computing normalization for start=%s, end=%s, window=%s, res=%f", start, end, window, resolutionHours*60*60)
+		msg := fmt.Sprintf("error computing normalization for start=%s, end=%s, res=%s", start, end, resolution)
 		return nil, prom.WrapError(err, msg)
 	}
 
-	measureTime(profileStart, profileThreshold, fmt.Sprintf("costDataRange(%fh): compute normalizations", durHrs))
-
-	profileStart = time.Now()
-
 	pvClaimMapping, err := GetPVInfo(resPVRequests, clusterID)
 	if err != nil {
 		// Just log for compatibility with KSM less than 1.6
@@ -1652,10 +1641,6 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, cp costAnalyzerC
 		}
 	}
 
-	measureTime(profileStart, profileThreshold, fmt.Sprintf("costDataRange(%fh): process PV data", durHrs))
-
-	profileStart = time.Now()
-
 	nsLabels, err := GetNamespaceLabelsMetrics(resNSLabels, clusterID)
 	if err != nil {
 		klog.V(1).Infof("Unable to get Namespace Labels for Metrics: %s", err.Error())
@@ -1684,10 +1669,6 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, cp costAnalyzerC
 		klog.V(1).Infof("Unable to get Deployment Match Labels for Metrics: %s", err.Error())
 	}
 
-	measureTime(profileStart, profileThreshold, fmt.Sprintf("costDataRange(%fh): process labels", durHrs))
-
-	profileStart = time.Now()
-
 	podStatefulsetMetricsMapping, err := getPodDeploymentsWithMetrics(statefulsetLabels, podLabels)
 	if err != nil {
 		klog.V(1).Infof("Unable to get match Statefulset Labels Metrics to Pods: %s", err.Error())
@@ -1722,10 +1703,6 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, cp costAnalyzerC
 		networkUsageMap = make(map[string]*NetworkUsageData)
 	}
 
-	measureTime(profileStart, profileThreshold, fmt.Sprintf("costDataRange(%fh): process deployments, services, and network usage", durHrs))
-
-	profileStart = time.Now()
-
 	containerNameCost := make(map[string]*CostData)
 	containers := make(map[string]bool)
 	otherClusterPVRecorded := make(map[string]bool)
@@ -1788,20 +1765,12 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, cp costAnalyzerC
 		containers[key] = true
 	}
 
-	measureTime(profileStart, profileThreshold, fmt.Sprintf("costDataRange(%fh): GetContainerMetricVectors", durHrs))
-
-	profileStart = time.Now()
-
 	// Request metrics can show up after pod eviction and completion.
 	// This method synchronizes requests to allocations such that when
 	// allocation is 0, so are requests
 	applyAllocationToRequests(RAMAllocMap, RAMReqMap)
 	applyAllocationToRequests(CPUAllocMap, CPUReqMap)
 
-	measureTime(profileStart, profileThreshold, fmt.Sprintf("costDataRange(%fh): applyAllocationToRequests", durHrs))
-
-	profileStart = time.Now()
-
 	missingNodes := make(map[string]*costAnalyzerCloud.Node)
 	missingContainers := make(map[string]*CostData)
 	for key := range containers {
@@ -1977,8 +1946,6 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, cp costAnalyzerC
 		}
 	}
 
-	measureTime(profileStart, profileThreshold, fmt.Sprintf("costDataRange(%fh): build CostData map", durHrs))
-
 	unmounted := findUnmountedPVCostData(cm.ClusterMap, unmountedPVs, namespaceLabelsMapping)
 	for k, costs := range unmounted {
 		klog.V(4).Infof("Unmounted PVs in Namespace/ClusterID: %s/%s", costs.Namespace, costs.ClusterID)
@@ -1988,11 +1955,9 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, cp costAnalyzerC
 		}
 	}
 
-	w := end.Sub(start)
-	w += window
-	if w.Minutes() > 0 {
-		wStr := fmt.Sprintf("%dm", int(w.Minutes()))
-		err = findDeletedNodeInfo(cli, missingNodes, wStr, offset)
+	if window.Minutes() > 0 {
+		dur, off := window.ToDurationOffset()
+		err = findDeletedNodeInfo(cli, missingNodes, dur, off)
 		if err != nil {
 			klog.V(1).Infof("Error fetching historical node data: %s", err.Error())
 		}

+ 1 - 3
pkg/costmodel/router.go

@@ -485,8 +485,6 @@ func (a *Accesses) CostDataModelRange(w http.ResponseWriter, r *http.Request, ps
 		http.Error(w, fmt.Sprintf("invalid date range: %s", wStr), http.StatusBadRequest)
 	}
 
-	remoteEnabled := env.IsRemoteEnabled() && remote != "false"
-
 	// Use Thanos Client if it exists (enabled) and remote flag set
 	var pClient prometheusClient.Client
 	if remote != "false" && a.ThanosClient != nil {
@@ -496,7 +494,7 @@ func (a *Accesses) CostDataModelRange(w http.ResponseWriter, r *http.Request, ps
 	}
 
 	resolution := time.Hour
-	data, err := a.Model.ComputeCostDataRange(pClient, a.CloudProvider, window, resolution, namespace, cluster, remoteEnabled)
+	data, err := a.Model.ComputeCostDataRange(pClient, a.CloudProvider, window, resolution, namespace, cluster)
 	if err != nil {
 		w.Write(WrapData(nil, err))
 	}