Просмотр исходного кода

[cherry-pick] Add offset when extracting start time from Prometheus query (#3094) (#3107)

Signed-off-by: thomasvn <thomasvn.dev@gmail.com>
Thomas Nguyen 1 год назад
Родитель
Commit
bc90a723b4

+ 11 - 10
pkg/costmodel/allocation_helpers.go

@@ -1458,7 +1458,6 @@ func getLoadBalancerCosts(lbMap map[serviceKey]*lbCost, resLBCost, resLBActiveMi
 			continue
 		}
 
-		// load balancers have interpolation for costs, we don't need to offset the resolution
 		lbStart, lbEnd := calculateStartAndEnd(res, resolution, window)
 		if lbStart.IsZero() || lbEnd.IsZero() {
 			log.Warnf("CostModel.ComputeAllocation: pvc %s has no running time", serviceKey)
@@ -2353,18 +2352,20 @@ func getUnmountedPodForNamespace(window opencost.Window, podMap map[podKey]*pod,
 
 func calculateStartAndEnd(result *prom.QueryResult, resolution time.Duration, window opencost.Window) (time.Time, time.Time) {
 	// Start and end for a range vector are pulled from the timestamps of the
-	// first and final values in the range. There is no "offsetting" required
-	// of the start or the end, as we used to do. If you query for a duration
-	// of time that is divisible by the given resolution, and set the end time
-	// to be precisely the end of the window, Prometheus should give all the
-	// relevant timestamps.
-	//
-	// E.g. avg(kube_pod_container_status_running{}) by (pod, namespace)[1h:1m]
-	// with time=01:00:00 will return, for a pod running the entire time,
-	// 61 timestamps where the first is 00:00:00 and the last is 01:00:00.
+	// first and final values in the range.
 	s := time.Unix(int64(result.Values[0].Timestamp), 0).UTC()
 	e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0).UTC()
 
+	// As of Prometheus v3, we have had to reintroduce the "offsetting" of the
+	// start time.
+	//
+	// E.g. avg(node_total_hourly_cost{}) by (node, provider_id)[1h:5m] with
+	// time=01:00:00 will return, for a node running the entire time, 12
+	// timestamps where the first is 00:05:00 and the last is 01:00:00.
+	if IsPrometheusVersionGTE3() {
+		s = s.Add(-resolution)
+	}
+
 	// The only corner-case here is what to do if you only get one timestamp.
 	// This dilemma still requires the use of the resolution, and can be
 	// clamped using the window. In this case, we want to honor the existence

+ 89 - 31
pkg/costmodel/allocation_helpers_test.go

@@ -14,7 +14,8 @@ const Ki = 1024
 const Mi = Ki * 1024
 const Gi = Mi * 1024
 
-const minute = 60.0
+const second = 1.0
+const minute = second * 60.0
 const hour = minute * 60.0
 
 var windowStart = time.Date(2020, 6, 16, 0, 0, 0, 0, time.UTC)
@@ -201,7 +202,6 @@ var pvMap1 = map[pvKey]*pv{
 	},
 }
 
-/* pv/pvc Helpers */
 func TestBuildPVMap(t *testing.T) {
 	pvMap1NoBytes := make(map[pvKey]*pv, len(pvMap1))
 	for thisPVKey, thisPV := range pvMap1 {
@@ -211,6 +211,9 @@ func TestBuildPVMap(t *testing.T) {
 		pvMap1NoBytes[thisPVKey] = clonePV
 	}
 
+	// These test cases are mocking behavior from Prometheus v3+
+	prometheusVersion = "3.0.0"
+
 	testCases := map[string]struct {
 		resolution              time.Duration
 		resultsPVCostPerGiBHour []*prom.QueryResult
@@ -272,9 +275,6 @@ func TestBuildPVMap(t *testing.T) {
 						"persistentvolume": "pv1",
 					},
 					Values: []*util.Vector{
-						{
-							Timestamp: startFloat,
-						},
 						{
 							Timestamp: startFloat + (hour * 6),
 						},
@@ -292,9 +292,6 @@ func TestBuildPVMap(t *testing.T) {
 						"persistentvolume": "pv2",
 					},
 					Values: []*util.Vector{
-						{
-							Timestamp: startFloat,
-						},
 						{
 							Timestamp: startFloat + (hour * 6),
 						},
@@ -315,9 +312,6 @@ func TestBuildPVMap(t *testing.T) {
 						"persistentvolume": "pv3",
 					},
 					Values: []*util.Vector{
-						{
-							Timestamp: startFloat + (hour * 6),
-						},
 						{
 							Timestamp: startFloat + (hour * 12),
 						},
@@ -332,9 +326,6 @@ func TestBuildPVMap(t *testing.T) {
 						"persistentvolume": "pv4",
 					},
 					Values: []*util.Vector{
-						{
-							Timestamp: startFloat,
-						},
 						{
 							Timestamp: startFloat + (hour * 6),
 						},
@@ -372,8 +363,6 @@ func TestBuildPVMap(t *testing.T) {
 	}
 }
 
-/* Helper Helpers */
-
 func TestGetUnmountedPodForCluster(t *testing.T) {
 	testCases := map[string]struct {
 		window   opencost.Window
@@ -455,37 +444,38 @@ func TestGetUnmountedPodForCluster(t *testing.T) {
 }
 
 func TestCalculateStartAndEnd(t *testing.T) {
+	// These test cases are mocking behavior from Prometheus v3+
+	prometheusVersion = "3.0.0"
 
 	testCases := map[string]struct {
-		resolution    time.Duration
+		resolution    time.Duration   // User defined config when querying Prometheus
+		window        opencost.Window // User defined config when querying Allocations/Assets
 		expectedStart time.Time
 		expectedEnd   time.Time
 		result        *prom.QueryResult
 	}{
+		// Example: avg(node_total_hourly_cost{}) by (node, provider_id)[1h:1h]
 		"1 hour resolution, 1 hour window": {
 			resolution:    time.Hour,
+			window:        opencost.NewClosedWindow(windowStart, windowStart.Add(time.Hour)),
 			expectedStart: windowStart,
 			expectedEnd:   windowStart.Add(time.Hour),
 			result: &prom.QueryResult{
 				Values: []*util.Vector{
-					{
-						Timestamp: startFloat,
-					},
 					{
 						Timestamp: startFloat + (minute * 60),
 					},
 				},
 			},
 		},
+		// Example: avg(node_total_hourly_cost{}) by (node, provider_id)[1h:30m]
 		"30 minute resolution, 1 hour window": {
 			resolution:    time.Minute * 30,
+			window:        opencost.NewClosedWindow(windowStart, windowStart.Add(time.Hour)),
 			expectedStart: windowStart,
 			expectedEnd:   windowStart.Add(time.Hour),
 			result: &prom.QueryResult{
 				Values: []*util.Vector{
-					{
-						Timestamp: startFloat,
-					},
 					{
 						Timestamp: startFloat + (minute * 30),
 					},
@@ -495,36 +485,83 @@ func TestCalculateStartAndEnd(t *testing.T) {
 				},
 			},
 		},
+		// Example: avg(node_total_hourly_cost{}) by (node, provider_id)[45m:15m]
 		"15 minute resolution, 45 minute window": {
 			resolution:    time.Minute * 15,
+			window:        opencost.NewClosedWindow(windowStart, windowStart.Add(time.Minute*45)),
 			expectedStart: windowStart,
 			expectedEnd:   windowStart.Add(time.Minute * 45),
 			result: &prom.QueryResult{
 				Values: []*util.Vector{
 					{
-						Timestamp: startFloat + (minute * 0),
+						Timestamp: startFloat + (minute * 15),
+					},
+					{
+						Timestamp: startFloat + (minute * 30),
+					},
+					{
+						Timestamp: startFloat + (minute * 45),
+					},
+				},
+			},
+		},
+		// Example: avg(node_total_hourly_cost{}) by (node, provider_id)[30m:5m]
+		"5 minute resolution, 30 minute window": {
+			resolution:    time.Minute * 5,
+			window:        opencost.NewClosedWindow(windowStart, windowStart.Add(time.Minute*30)),
+			expectedStart: windowStart,
+			expectedEnd:   windowStart.Add(time.Minute * 30),
+			result: &prom.QueryResult{
+				Values: []*util.Vector{
+					{
+						Timestamp: startFloat + (minute * 5),
+					},
+					{
+						Timestamp: startFloat + (minute * 10),
 					},
 					{
 						Timestamp: startFloat + (minute * 15),
 					},
+					{
+						Timestamp: startFloat + (minute * 20),
+					},
+					{
+						Timestamp: startFloat + (minute * 25),
+					},
 					{
 						Timestamp: startFloat + (minute * 30),
 					},
+				},
+			},
+		},
+		// Example: avg(node_total_hourly_cost{}) by (node, provider_id)[30m:5m]
+		"5 minute resolution, 30 minute window, partial data": {
+			resolution:    time.Minute * 5,
+			window:        opencost.NewClosedWindow(windowStart, windowStart.Add(time.Minute*30)),
+			expectedStart: windowStart.Add(time.Minute * 5),
+			expectedEnd:   windowStart.Add(time.Minute * 20),
+			result: &prom.QueryResult{
+				Values: []*util.Vector{
 					{
-						Timestamp: startFloat + (minute * 45),
+						Timestamp: startFloat + (minute * 10),
+					},
+					{
+						Timestamp: startFloat + (minute * 15),
+					},
+					{
+						Timestamp: startFloat + (minute * 20),
 					},
 				},
 			},
 		},
+		// Example: avg(node_total_hourly_cost{}) by (node, provider_id)[5m:1m]
 		"1 minute resolution, 5 minute window": {
 			resolution:    time.Minute,
+			window:        opencost.NewClosedWindow(windowStart.Add(time.Minute*15), windowStart.Add(time.Minute*20)),
 			expectedStart: windowStart.Add(time.Minute * 15),
 			expectedEnd:   windowStart.Add(time.Minute * 20),
 			result: &prom.QueryResult{
 				Values: []*util.Vector{
-					{
-						Timestamp: startFloat + (minute * 15),
-					},
 					{
 						Timestamp: startFloat + (minute * 16),
 					},
@@ -543,26 +580,47 @@ func TestCalculateStartAndEnd(t *testing.T) {
 				},
 			},
 		},
+		// Example: avg(node_total_hourly_cost{}) by (node, provider_id)[5m:1m]
+		"1 minute resolution, 5 minute window, partial data": {
+			resolution:    time.Minute,
+			window:        opencost.NewClosedWindow(windowStart.Add(time.Minute*15), windowStart.Add(time.Minute*20)),
+			expectedStart: windowStart.Add(time.Minute * 18),
+			expectedEnd:   windowStart.Add(time.Minute * 20),
+			result: &prom.QueryResult{
+				Values: []*util.Vector{
+					{
+						Timestamp: startFloat + (minute * 19),
+					},
+					{
+						Timestamp: startFloat + (minute * 20),
+					},
+				},
+			},
+		},
+		// Example: avg(node_total_hourly_cost{}) by (node, provider_id)[1m:1m]
 		"1 minute resolution, 1 minute window": {
 			resolution:    time.Minute,
+			window:        opencost.NewClosedWindow(windowStart.Add(time.Minute*14).Add(time.Second*30), windowStart.Add(time.Minute*15).Add(time.Second*30)),
 			expectedStart: windowStart.Add(time.Minute * 14).Add(time.Second * 30),
 			expectedEnd:   windowStart.Add(time.Minute * 15).Add(time.Second * 30),
 			result: &prom.QueryResult{
 				Values: []*util.Vector{
 					{
-						Timestamp: startFloat + (minute * 15),
+						Timestamp: startFloat + (minute * 15) + (second * 30),
 					},
 				},
 			},
 		},
+		// Example: avg(node_total_hourly_cost{}) by (node, provider_id)[1m:1m]
 		"1 minute resolution, 1 minute window, at window start": {
 			resolution:    time.Minute,
+			window:        opencost.NewClosedWindow(windowStart, windowStart.Add(time.Second*30)),
 			expectedStart: windowStart,
 			expectedEnd:   windowStart.Add(time.Second * 30),
 			result: &prom.QueryResult{
 				Values: []*util.Vector{
 					{
-						Timestamp: startFloat,
+						Timestamp: startFloat + (second * 30),
 					},
 				},
 			},
@@ -571,7 +629,7 @@ func TestCalculateStartAndEnd(t *testing.T) {
 
 	for name, testCase := range testCases {
 		t.Run(name, func(t *testing.T) {
-			start, end := calculateStartAndEnd(testCase.result, testCase.resolution, window)
+			start, end := calculateStartAndEnd(testCase.result, testCase.resolution, testCase.window)
 			if !start.Equal(testCase.expectedStart) {
 				t.Errorf("start does not match: expected %v; got %v", testCase.expectedStart, start)
 			}

+ 4 - 5
pkg/costmodel/cluster.go

@@ -457,13 +457,13 @@ func ClusterDisks(client prometheus.Client, cp models.Provider, start, end time.
 
 		name, err := result.GetString("node")
 		if err != nil {
-			log.DedupedWarningf(5, "ClusterDisks: local active mins data missing instance")
+			log.DedupedWarningf(3, "ClusterDisks: local active mins data missing 'node' label")
 			continue
 		}
 
 		providerID, err := result.GetString("provider_id")
 		if err != nil {
-			log.DedupedWarningf(5, "ClusterDisks: local active mins data missing instance")
+			log.DedupedWarningf(3, "ClusterDisks: local active mins data missing 'provider_id' label")
 			continue
 		}
 
@@ -879,8 +879,7 @@ func ClusterLoadBalancers(client prometheus.Client, start, end time.Time) (map[L
 		}
 
 		// Append start, end, and minutes. This should come before all other data.
-		s := time.Unix(int64(result.Values[0].Timestamp), 0)
-		e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
+		s, e := calculateStartAndEnd(result, resolution, opencost.NewClosedWindow(start, end))
 		loadBalancerMap[key].Start = s
 		loadBalancerMap[key].End = e
 		loadBalancerMap[key].Minutes = e.Sub(s).Minutes()
@@ -1297,7 +1296,7 @@ type Totals struct {
 
 func resultToTotals(qrs []*prom.QueryResult) ([][]string, error) {
 	if len(qrs) == 0 {
-		return [][]string{}, fmt.Errorf("Not enough data available in the selected time range")
+		return [][]string{}, fmt.Errorf("not enough data available in the selected time range")
 	}
 
 	result := qrs[0]

+ 0 - 1
pkg/costmodel/cluster_helpers.go

@@ -531,7 +531,6 @@ func buildActiveDataMap(resActiveMins []*prom.QueryResult, resolution time.Durat
 		s, e := calculateStartAndEnd(result, resolution, window)
 		mins := e.Sub(s).Minutes()
 
-		// TODO niko/assets if mins >= threshold, interpolate for missing data?
 		m[key] = activeData{
 			start:   s,
 			end:     e,

+ 19 - 0
pkg/costmodel/promparsers.go

@@ -3,6 +3,8 @@ package costmodel
 import (
 	"errors"
 	"fmt"
+	"strconv"
+	"strings"
 	"time"
 
 	"github.com/opencost/opencost/core/pkg/log"
@@ -13,6 +15,23 @@ import (
 	"github.com/opencost/opencost/pkg/prom"
 )
 
+var (
+	// prometheusVersion stores the Prometheus server version (major.minor.patch).
+	// Defaults to "0.0.0" if the version cannot be retrieved.
+	prometheusVersion = "0.0.0"
+)
+
+// IsPrometheusVersionGTE3 returns true if the Prometheus server's major version
+// is 3 or higher.
+func IsPrometheusVersionGTE3() bool {
+	if v := strings.Split(prometheusVersion, "."); len(v) > 0 {
+		if major, err := strconv.Atoi(v[0]); err == nil && major >= 3 {
+			return true
+		}
+	}
+	return false
+}
+
 func GetPVInfoLocal(cache clustercache.ClusterCache, defaultClusterID string) (map[string]*PersistentVolumeClaimData, error) {
 	toReturn := make(map[string]*PersistentVolumeClaimData)
 

+ 2 - 1
pkg/costmodel/router.go

@@ -1223,11 +1223,12 @@ func Initialize(router *httprouter.Router, additionalConfigWatchers ...*watcher.
 	}
 
 	api := prometheusAPI.NewAPI(promCli)
-	_, err = api.Buildinfo(context.Background())
+	result, err := api.Buildinfo(context.Background())
 	if err != nil {
 		log.Infof("No valid prometheus config file at %s. Error: %s . Troubleshooting help available at: %s. Ignore if using cortex/mimir/thanos here.", address, err.Error(), prom.PrometheusTroubleshootingURL)
 	} else {
 		log.Infof("Retrieved a prometheus config file from: %s", address)
+		prometheusVersion = result.Version
 	}
 
 	if scrapeInterval == 0 {