Procházet zdrojové kódy

Add version detection and duration adjustment for resolution queries.

Matt Bolt před 1 rokem
rodič
revize
cc6b357436

+ 1 - 0
go.mod

@@ -60,6 +60,7 @@ require (
 )
 
 require (
+	github.com/Masterminds/semver v1.5.0 // indirect
 	github.com/fxamacker/cbor/v2 v2.7.0 // indirect
 	github.com/go-ini/ini v1.67.0 // indirect
 	github.com/gofrs/flock v0.8.1 // indirect

+ 2 - 0
go.sum

@@ -100,6 +100,8 @@ github.com/AzureAD/microsoft-authentication-library-for-go v1.3.3 h1:H5xDQaE3Xow
 github.com/AzureAD/microsoft-authentication-library-for-go v1.3.3/go.mod h1:wP83P5OoQ5p6ip3ScPr0BAq0BvuPAvacpEuSzyouqAI=
 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
+github.com/Masterminds/semver v1.5.0 h1:H65muMkzWKEuNDnfl9d70GUjFniHKHRbFPGBuZ3QEww=
+github.com/Masterminds/semver v1.5.0/go.mod h1:MB6lktGJrhw8PrUyiEoblNEGEQ+RzHPF078ddwwvV3Y=
 github.com/aliyun/alibaba-cloud-sdk-go v1.62.3 h1:kWY5c/9JOhSYBogi3mtNG7G9TxXS0CddtQ6RKOI3mvY=
 github.com/aliyun/alibaba-cloud-sdk-go v1.62.3/go.mod h1:Api2AkmMgGaSUAhmk76oaFObkoeCPc/bKAqcyplPODs=
 github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=

+ 1 - 0
modules/prometheus-source/go.mod

@@ -8,6 +8,7 @@ replace (
 )
 
 require (
+	github.com/Masterminds/semver v1.5.0
 	github.com/julienschmidt/httprouter v1.3.0
 	github.com/opencost/opencost/core v0.0.0-20241211165149-ee44b80e2fd0
 	github.com/prometheus/client_golang v1.20.5

+ 2 - 0
modules/prometheus-source/go.sum

@@ -39,6 +39,8 @@ cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9
 dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
+github.com/Masterminds/semver v1.5.0 h1:H65muMkzWKEuNDnfl9d70GUjFniHKHRbFPGBuZ3QEww=
+github.com/Masterminds/semver v1.5.0/go.mod h1:MB6lktGJrhw8PrUyiEoblNEGEQ+RzHPF078ddwwvV3Y=
 github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
 github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
 github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=

+ 17 - 11
modules/prometheus-source/pkg/prom/config.go

@@ -19,6 +19,8 @@ const (
 
 type OpenCostPrometheusConfig struct {
 	ServerEndpoint        string
+	Version               string
+	IsOffsetResolution    bool
 	ClientConfig          *PrometheusClientConfig
 	ScrapeInterval        time.Duration
 	JobName               string
@@ -117,6 +119,8 @@ func NewOpenCostPrometheusConfigFromEnv() (*OpenCostPrometheusConfig, error) {
 
 	return &OpenCostPrometheusConfig{
 		ServerEndpoint:        serverEndpoint,
+		Version:               "0.0.0",
+		IsOffsetResolution:    false,
 		ClientConfig:          clientConfig,
 		ScrapeInterval:        scrapeInterval,
 		JobName:               jobName,
@@ -188,17 +192,19 @@ func NewOpenCostThanosConfigFromEnv() (*OpenCostThanosConfig, error) {
 
 	return &OpenCostThanosConfig{
 		OpenCostPrometheusConfig: &OpenCostPrometheusConfig{
-			ServerEndpoint:   serverEndpoint,
-			ClientConfig:     clientConfig,
-			ScrapeInterval:   scrapeInterval,
-			JobName:          jobName,
-			Offset:           thanosQueryOffset,
-			QueryOffset:      d,
-			MaxQueryDuration: maxQueryDuration,
-			ClusterID:        "", // thanos is multi-cluster
-			ClusterFilter:    "", // thanos is multi-cluster
-			ClusterLabel:     clusterLabel,
-			DataResolution:   dataResolution,
+			ServerEndpoint:     serverEndpoint,
+			Version:            "0.0.0",
+			IsOffsetResolution: false,
+			ClientConfig:       clientConfig,
+			ScrapeInterval:     scrapeInterval,
+			JobName:            jobName,
+			Offset:             thanosQueryOffset,
+			QueryOffset:        d,
+			MaxQueryDuration:   maxQueryDuration,
+			ClusterID:          "", // thanos is multi-cluster
+			ClusterFilter:      "", // thanos is multi-cluster
+			ClusterLabel:       clusterLabel,
+			DataResolution:     dataResolution,
 		},
 		MaxSourceResulution: env.GetThanosMaxSourceResolution(),
 	}, nil

+ 13 - 1
modules/prometheus-source/pkg/prom/datasource.go

@@ -7,6 +7,7 @@ import (
 	"strconv"
 	"time"
 
+	"github.com/Masterminds/semver"
 	"github.com/julienschmidt/httprouter"
 	"github.com/opencost/opencost/modules/prometheus-source/pkg/env"
 
@@ -132,11 +133,22 @@ func NewPrometheusDataSource(infoProvider clusters.ClusterInfoProvider, promConf
 
 	// we don't consider this a fatal error, but we log for visibility
 	api := prometheusAPI.NewAPI(promClient)
-	_, err = api.Buildinfo(context.Background())
+	bi, err := api.Buildinfo(context.Background())
+
 	if err != nil {
 		log.Infof("No valid prometheus config file at %s. Error: %s.\nTroubleshooting help available at: %s.\n**Ignore if using cortex/mimir/thanos here**", promConfig.ServerEndpoint, err.Error(), PrometheusTroubleshootingURL)
 	} else {
 		log.Infof("Retrieved a prometheus config file from: %s", promConfig.ServerEndpoint)
+		promConfig.Version = bi.Version
+
+		// for versions of prometheus >= 3.0.0, we need to offset the resolution for range queries
+		// due to a breaking change in prometheus lookback and range query alignment
+		v, err := semver.NewVersion(promConfig.Version)
+		if err != nil {
+			log.Warnf("Failed to parse prometheus version %s. Error: %s", promConfig.Version, err.Error())
+		} else {
+			promConfig.IsOffsetResolution = v.Major() >= 3
+		}
 	}
 
 	// Fix scrape interval if zero by attempting to lookup the interval for the configured job

+ 24 - 11
modules/prometheus-source/pkg/prom/metricsquerier.go

@@ -115,7 +115,7 @@ func (pds *PrometheusMetricsQuerier) QueryPVActiveMinutes(start, end time.Time)
 	cfg := pds.promConfig
 	minsPerResolution := cfg.DataResolutionMinutes
 
-	durStr := timeutil.DurationString(end.Sub(start))
+	durStr := pds.durationStringFor(start, end, minsPerResolution)
 	if durStr == "" {
 		panic("failed to parse duration string passed to QueryPVActiveMinutes")
 	}
@@ -134,7 +134,7 @@ func (pds *PrometheusMetricsQuerier) QueryLocalStorageCost(start, end time.Time)
 	cfg := pds.promConfig
 	minsPerResolution := cfg.DataResolutionMinutes
 
-	durStr := timeutil.DurationString(end.Sub(start))
+	durStr := pds.durationStringFor(start, end, minsPerResolution)
 	if durStr == "" {
 		panic("failed to parse duration string passed to QueryLocalStorageCost")
 	}
@@ -159,7 +159,7 @@ func (pds *PrometheusMetricsQuerier) QueryLocalStorageUsedCost(start, end time.T
 	cfg := pds.promConfig
 	minsPerResolution := cfg.DataResolutionMinutes
 
-	durStr := timeutil.DurationString(end.Sub(start))
+	durStr := pds.durationStringFor(start, end, minsPerResolution)
 	if durStr == "" {
 		panic("failed to parse duration string passed to QueryLocalStorageUsedCost")
 	}
@@ -219,7 +219,7 @@ func (pds *PrometheusMetricsQuerier) QueryLocalStorageBytes(start, end time.Time
 	cfg := pds.promConfig
 	minsPerResolution := cfg.DataResolutionMinutes
 
-	durStr := timeutil.DurationString(end.Sub(start))
+	durStr := pds.durationStringFor(start, end, minsPerResolution)
 	if durStr == "" {
 		panic("failed to parse duration string passed to QueryLocalStorageBytes")
 	}
@@ -238,7 +238,7 @@ func (pds *PrometheusMetricsQuerier) QueryLocalStorageActiveMinutes(start, end t
 	cfg := pds.promConfig
 	minsPerResolution := cfg.DataResolutionMinutes
 
-	durStr := timeutil.DurationString(end.Sub(start))
+	durStr := pds.durationStringFor(start, end, minsPerResolution)
 	if durStr == "" {
 		panic("failed to parse duration string passed to QueryLocalStorageActiveMinutes")
 	}
@@ -357,7 +357,7 @@ func (pds *PrometheusMetricsQuerier) QueryNodeActiveMinutes(start, end time.Time
 	cfg := pds.promConfig
 	minsPerResolution := cfg.DataResolutionMinutes
 
-	durStr := timeutil.DurationString(end.Sub(start))
+	durStr := pds.durationStringFor(start, end, minsPerResolution)
 	if durStr == "" {
 		panic("failed to parse duration string passed to QueryNodeActiveMinutes")
 	}
@@ -375,7 +375,7 @@ func (pds *PrometheusMetricsQuerier) QueryNodeCPUModeTotal(start, end time.Time)
 	cfg := pds.promConfig
 	minsPerResolution := cfg.DataResolutionMinutes
 
-	durStr := timeutil.DurationString(end.Sub(start))
+	durStr := pds.durationStringFor(start, end, minsPerResolution)
 	if durStr == "" {
 		panic("failed to parse duration string passed to QueryNodeCPUModeTotal")
 	}
@@ -392,7 +392,7 @@ func (pds *PrometheusMetricsQuerier) QueryNodeRAMSystemPercent(start, end time.T
 	cfg := pds.promConfig
 	minsPerResolution := cfg.DataResolutionMinutes
 
-	durStr := timeutil.DurationString(end.Sub(start))
+	durStr := pds.durationStringFor(start, end, minsPerResolution)
 	if durStr == "" {
 		panic("failed to parse duration string passed to QueryNodeRAMSystemPercent")
 	}
@@ -410,7 +410,7 @@ func (pds *PrometheusMetricsQuerier) QueryNodeRAMUserPercent(start, end time.Tim
 	cfg := pds.promConfig
 	minsPerResolution := cfg.DataResolutionMinutes
 
-	durStr := timeutil.DurationString(end.Sub(start))
+	durStr := pds.durationStringFor(start, end, minsPerResolution)
 	if durStr == "" {
 		panic("failed to parse duration string passed to QueryNodeRAMUserPercent")
 	}
@@ -443,7 +443,7 @@ func (pds *PrometheusMetricsQuerier) QueryLBActiveMinutes(start, end time.Time)
 	cfg := pds.promConfig
 	minsPerResolution := cfg.DataResolutionMinutes
 
-	durStr := timeutil.DurationString(end.Sub(start))
+	durStr := pds.durationStringFor(start, end, minsPerResolution)
 	if durStr == "" {
 		panic("failed to parse duration string passed to QueryLBActiveMinutes")
 	}
@@ -459,7 +459,7 @@ func (pds *PrometheusMetricsQuerier) QueryClusterManagementDuration(start, end t
 	cfg := pds.promConfig
 	minsPerResolution := cfg.DataResolutionMinutes
 
-	durStr := timeutil.DurationString(end.Sub(start))
+	durStr := pds.durationStringFor(start, end, minsPerResolution)
 	if durStr == "" {
 		panic("failed to parse duration string passed to QueryClusterManagementDuration")
 	}
@@ -1368,6 +1368,19 @@ func (pds *PrometheusMetricsQuerier) QueryDataCoverage(limitDays int) (time.Time
 	return oldest, newest, nil
 }
 
+func (pds *PrometheusMetricsQuerier) durationStringFor(start, end time.Time, minsPerResolution int) string {
+	dur := end.Sub(start)
+
+	// if using a version of prometheus where the resolution needs duration offset,
+	// we need to apply that here
+	if pds.promConfig.IsOffsetResolution {
+		// increase the query time by the resolution
+		dur = dur + (time.Duration(minsPerResolution) * time.Minute)
+	}
+
+	return timeutil.DurationString(dur)
+}
+
 func newEmptyResult[T any](decoder source.ResultDecoder[T]) *source.Future[T] {
 	ch := make(source.QueryResultsChan)
 	go func() {