Procházet zdrojové kódy

Merge branch 'develop' into Rajpratik71-patch-1

Pratik Raj před 3 roky
rodič
revize
047bab3c3e

+ 35 - 35
pkg/costmodel/aggregation.go

@@ -1012,38 +1012,38 @@ func compressVectorSeries(vs []*util.Vector, resolutionHours float64) []*util.Ve
 }
 
 type AggregateQueryOpts struct {
-	Rate                  string
-	Filters               map[string]string
-	SharedResources       *SharedResourceInfo
-	ShareSplit            string
-	AllocateIdle          bool
-	IncludeTimeSeries     bool
-	IncludeEfficiency     bool
-	DisableCache          bool
-	ClearCache            bool
-	NoCache               bool
-	NoExpireCache         bool
-	RemoteEnabled         bool
-	DisableSharedOverhead bool
-	UseETLAdapter         bool
+	Rate                           string
+	Filters                        map[string]string
+	SharedResources                *SharedResourceInfo
+	ShareSplit                     string
+	AllocateIdle                   bool
+	IncludeTimeSeries              bool
+	IncludeEfficiency              bool
+	DisableAggregateCostModelCache bool
+	ClearCache                     bool
+	NoCache                        bool
+	NoExpireCache                  bool
+	RemoteEnabled                  bool
+	DisableSharedOverhead          bool
+	UseETLAdapter                  bool
 }
 
 func DefaultAggregateQueryOpts() *AggregateQueryOpts {
 	return &AggregateQueryOpts{
-		Rate:                  "",
-		Filters:               map[string]string{},
-		SharedResources:       nil,
-		ShareSplit:            SplitTypeWeighted,
-		AllocateIdle:          false,
-		IncludeTimeSeries:     true,
-		IncludeEfficiency:     true,
-		DisableCache:          false,
-		ClearCache:            false,
-		NoCache:               false,
-		NoExpireCache:         false,
-		RemoteEnabled:         env.IsRemoteEnabled(),
-		DisableSharedOverhead: false,
-		UseETLAdapter:         false,
+		Rate:                           "",
+		Filters:                        map[string]string{},
+		SharedResources:                nil,
+		ShareSplit:                     SplitTypeWeighted,
+		AllocateIdle:                   false,
+		IncludeTimeSeries:              true,
+		IncludeEfficiency:              true,
+		DisableAggregateCostModelCache: env.IsAggregateCostModelCacheDisabled(),
+		ClearCache:                     false,
+		NoCache:                        false,
+		NoExpireCache:                  false,
+		RemoteEnabled:                  env.IsRemoteEnabled(),
+		DisableSharedOverhead:          false,
+		UseETLAdapter:                  false,
 	}
 }
 
@@ -1095,7 +1095,7 @@ func (a *Accesses) ComputeAggregateCostModel(promClient prometheusClient.Client,
 	allocateIdle := opts.AllocateIdle
 	includeTimeSeries := opts.IncludeTimeSeries
 	includeEfficiency := opts.IncludeEfficiency
-	disableCache := opts.DisableCache
+	disableAggregateCostModelCache := opts.DisableAggregateCostModelCache
 	clearCache := opts.ClearCache
 	noCache := opts.NoCache
 	noExpireCache := opts.NoExpireCache
@@ -1377,7 +1377,7 @@ func (a *Accesses) ComputeAggregateCostModel(promClient prometheusClient.Client,
 	cacheMessage := fmt.Sprintf("ComputeAggregateCostModel: L1 cache miss: %s L2 cache miss: %s", aggKey, key)
 
 	// check the cache for aggregated response; if cache is hit and not disabled, return response
-	if value, found := a.AggregateCache.Get(aggKey); found && !disableCache && !noCache {
+	if value, found := a.AggregateCache.Get(aggKey); found && !disableAggregateCostModelCache && !noCache {
 		result, ok := value.(map[string]*Aggregation)
 		if !ok {
 			// disable cache and recompute if type cast fails
@@ -1393,14 +1393,14 @@ func (a *Accesses) ComputeAggregateCostModel(promClient prometheusClient.Client,
 		window.Set(&start, window.End())
 	} else {
 		// don't cache requests for durations of less than one hour
-		disableCache = true
+		disableAggregateCostModelCache = true
 	}
 
 	// attempt to retrieve cost data from cache
 	var costData map[string]*CostData
 	var err error
 	cacheData, found := a.CostDataCache.Get(key)
-	if found && !disableCache && !noCache {
+	if found && !disableAggregateCostModelCache && !noCache {
 		ok := false
 		costData, ok = cacheData.(map[string]*CostData)
 		cacheMessage = fmt.Sprintf("ComputeAggregateCostModel: L1 cache miss: %s, L2 cost data cache hit: %s", aggKey, key)
@@ -1408,7 +1408,7 @@ func (a *Accesses) ComputeAggregateCostModel(promClient prometheusClient.Client,
 			log.Errorf("ComputeAggregateCostModel: caching error: failed to cast cost data to struct: %s", key)
 		}
 	} else {
-		log.Infof("ComputeAggregateCostModel: missed cache: %s (found %t, disableCache %t, noCache %t)", key, found, disableCache, noCache)
+		log.Infof("ComputeAggregateCostModel: missed cache: %s (found %t, disableAggregateCostModelCache %t, noCache %t)", key, found, disableAggregateCostModelCache, noCache)
 
 		costData, err = a.Model.ComputeCostDataRange(promClient, a.CloudProvider, window, resolution, "", "", remoteEnabled)
 		if err != nil {
@@ -1761,7 +1761,7 @@ func (a *Accesses) warmAggregateCostModelCache() {
 		aggOpts.Filters = map[string]string{}
 		aggOpts.IncludeTimeSeries = false
 		aggOpts.IncludeEfficiency = true
-		aggOpts.DisableCache = true
+		aggOpts.DisableAggregateCostModelCache = true
 		aggOpts.ClearCache = false
 		aggOpts.NoCache = false
 		aggOpts.NoExpireCache = false
@@ -1990,7 +1990,7 @@ func (a *Accesses) AggregateCostModelHandler(w http.ResponseWriter, r *http.Requ
 	// TODO niko/caching rename "recomputeCache"
 	// disableCache, if set to "true", tells this function to recompute and
 	// cache the requested data
-	opts.DisableCache = r.URL.Query().Get("disableCache") == "true"
+	opts.DisableAggregateCostModelCache = r.URL.Query().Get("disableCache") == "true"
 
 	// clearCache, if set to "true", tells this function to flush the cache,
 	// then recompute and cache the requested data

+ 7 - 0
pkg/env/costmodelenv.go

@@ -32,6 +32,7 @@ const (
 	CSVPathEnvVar                  = "CSV_PATH"
 	ConfigPathEnvVar               = "CONFIG_PATH"
 	CloudProviderAPIKeyEnvVar      = "CLOUD_PROVIDER_API_KEY"
+	DisableAggregateCostModelCache = "DISABLE_AGGREGATE_COST_MODEL_CACHE"
 
 	EmitPodAnnotationsMetricEnvVar       = "EMIT_POD_ANNOTATIONS_METRIC"
 	EmitNamespaceAnnotationsMetricEnvVar = "EMIT_NAMESPACE_ANNOTATIONS_METRIC"
@@ -239,6 +240,12 @@ func GetInsecureSkipVerify() bool {
 	return GetBool(InsecureSkipVerify, false)
 }
 
+// IsAggregateCostModelCacheDisabled returns the environment variable value for DisableAggregateCostModelCache which
+// will inform the aggregator on whether to load cached data. Defaults to false
+func IsAggregateCostModelCacheDisabled() bool {
+	return GetBool(DisableAggregateCostModelCache, false)
+}
+
 // IsRemoteEnabled returns the environment variable value for RemoteEnabledEnvVar which represents whether
 // or not remote write is enabled for prometheus for use with SQL backed persistent storage.
 func IsRemoteEnabled() bool {

+ 43 - 0
pkg/env/costmodelenv_test.go

@@ -0,0 +1,43 @@
+package env
+
+import (
+	"os"
+	"testing"
+)
+
+func TestIsCacheDisabled(t *testing.T) {
+	tests := []struct {
+		name string
+		want bool
+		pre  func()
+	}{
+		{
+			name: "Ensure the default value is false",
+			want: false,
+		},
+		{
+			name: "Ensure the value is false when DISABLE_AGGREGATE_COST_MODEL_CACHE is set to false",
+			want: false,
+			pre: func() {
+				os.Setenv("DISABLE_AGGREGATE_COST_MODEL_CACHE", "false")
+			},
+		},
+		{
+			name: "Ensure the value is true when DISABLE_AGGREGATE_COST_MODEL_CACHE is set to true",
+			want: true,
+			pre: func() {
+				os.Setenv("DISABLE_AGGREGATE_COST_MODEL_CACHE", "true")
+			},
+		},
+	}
+	for _, tt := range tests {
+		if tt.pre != nil {
+			tt.pre()
+		}
+		t.Run(tt.name, func(t *testing.T) {
+			if got := IsAggregateCostModelCacheDisabled(); got != tt.want {
+				t.Errorf("IsAggregateCostModelCacheDisabled() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}

+ 23 - 0
pkg/prom/query.go

@@ -258,9 +258,21 @@ func (ctx *Context) query(query string, t time.Time) (interface{}, v1.Warnings,
 	return toReturn, warnings, nil
 }
 
+// isRequestStepAligned will check if the start and end times are aligned with the step
+func (ctx *Context) isRequestStepAligned(start, end time.Time, step time.Duration) bool {
+	startInUnix := start.Unix()
+	endInUnix := end.Unix()
+	stepInSeconds := step.Milliseconds() / 1e3
+	return startInUnix%stepInSeconds == 0 && endInUnix%stepInSeconds == 0
+}
+
 func (ctx *Context) QueryRange(query string, start, end time.Time, step time.Duration) QueryResultsChan {
 	resCh := make(QueryResultsChan)
 
+	if !ctx.isRequestStepAligned(start, end, step) {
+		start, end = ctx.alignWindow(start, end, step)
+	}
+
 	go runQueryRange(query, start, end, step, ctx, resCh, "")
 
 	return resCh
@@ -357,6 +369,7 @@ func (ctx *Context) RawQueryRange(query string, start, end time.Time, step time.
 
 func (ctx *Context) queryRange(query string, start, end time.Time, step time.Duration) (interface{}, v1.Warnings, error) {
 	body, err := ctx.RawQueryRange(query, start, end, step)
+
 	if err != nil {
 		return nil, nil, err
 	}
@@ -382,6 +395,16 @@ func (ctx *Context) queryRange(query string, start, end time.Time, step time.Dur
 	return toReturn, warnings, nil
 }
 
+// alignWindow will update the start and end times to be aligned with the step duration.
+// Current implementation will always floor the start/end times
+func (ctx *Context) alignWindow(start time.Time, end time.Time, step time.Duration) (time.Time, time.Time) {
+	// Convert the step duration from Milliseconds to Seconds to match the Unix timestamp, which is in seconds
+	stepInSeconds := step.Milliseconds() / 1e3
+	alignedStart := (start.Unix() / stepInSeconds) * stepInSeconds
+	alignedEnd := (end.Unix() / stepInSeconds) * stepInSeconds
+	return time.Unix(alignedStart, 0).UTC(), time.Unix(alignedEnd, 0).UTC()
+}
+
 // Extracts the warnings from the resulting json if they exist (part of the prometheus response api).
 func warningsFrom(result interface{}) v1.Warnings {
 	var warnings v1.Warnings

+ 159 - 1
pkg/prom/query_test.go

@@ -1,6 +1,11 @@
 package prom
 
-import "testing"
+import (
+	"github.com/prometheus/client_golang/api"
+	"reflect"
+	"testing"
+	"time"
+)
 
 func TestWarningsFrom(t *testing.T) {
 	var results interface{}
@@ -25,3 +30,156 @@ func TestWarningsFrom(t *testing.T) {
 		t.Errorf("Unexpected second warning: %s", warnings[1])
 	}
 }
+
+func TestContext_isRequestStepAligned(t *testing.T) {
+	type fields struct {
+		Client         api.Client
+		name           string
+		errorCollector *QueryErrorCollector
+	}
+	type args struct {
+		start time.Time
+		end   time.Time
+		step  time.Duration
+	}
+	tests := []struct {
+		name   string
+		fields fields
+		args   args
+		want   bool
+	}{
+		{
+			name:   "Test with times that are not step aligned to the hour",
+			fields: fields{},
+			args: args{
+				start: time.Date(2022, 11, 7, 4, 59, 30, 0, time.UTC),
+				end:   time.Date(2022, 11, 8, 4, 59, 30, 0, time.UTC),
+				step:  time.Hour,
+			},
+			want: false,
+		},
+		{
+			name:   "Test with times that are step aligned to the hour",
+			fields: fields{},
+			args: args{
+				start: time.Date(2022, 11, 7, 4, 0, 0, 0, time.UTC),
+				end:   time.Date(2022, 11, 8, 4, 0, 0, 0, time.UTC),
+				step:  time.Hour,
+			},
+			want: true,
+		},
+		{
+			name:   "Test with times where start is aligned to the hour but end is not",
+			fields: fields{},
+			args: args{
+				start: time.Date(2022, 11, 7, 4, 0, 0, 0, time.UTC),
+				end:   time.Date(2022, 11, 8, 4, 59, 0, 0, time.UTC),
+				step:  time.Hour,
+			},
+			want: false,
+		},
+		{
+			name:   "Test with times where end is aligned to the hour but start is not",
+			fields: fields{},
+			args: args{
+				start: time.Date(2022, 11, 7, 4, 59, 0, 0, time.UTC),
+				end:   time.Date(2022, 11, 8, 4, 0, 0, 0, time.UTC),
+				step:  time.Hour,
+			},
+			want: false,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			ctx := &Context{
+				Client:         tt.fields.Client,
+				name:           tt.fields.name,
+				errorCollector: tt.fields.errorCollector,
+			}
+			if got := ctx.isRequestStepAligned(tt.args.start, tt.args.end, tt.args.step); got != tt.want {
+				t.Errorf("isRequestStepAligned() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+func TestContext_alignWindow(t *testing.T) {
+	type fields struct {
+		Client         api.Client
+		name           string
+		errorCollector *QueryErrorCollector
+	}
+	type args struct {
+		start time.Time
+		end   time.Time
+		step  time.Duration
+	}
+	tests := []struct {
+		name      string
+		fields    fields
+		args      args
+		wantStart time.Time
+		wantEnd   time.Time
+	}{
+		{
+			name:   "Do not update the start and end when step-aligned",
+			fields: fields{},
+			args: args{
+				start: time.Date(2022, 11, 7, 4, 0, 0, 0, time.UTC),
+				end:   time.Date(2022, 11, 8, 4, 0, 0, 0, time.UTC),
+				step:  time.Hour,
+			},
+			wantStart: time.Date(2022, 11, 7, 4, 0, 0, 0, time.UTC),
+			wantEnd:   time.Date(2022, 11, 8, 4, 0, 0, 0, time.UTC),
+		},
+		{
+			name:   "Update start to be step-aligned and leave end the same",
+			fields: fields{},
+			args: args{
+				start: time.Date(2022, 11, 7, 4, 59, 0, 0, time.UTC),
+				end:   time.Date(2022, 11, 8, 4, 0, 0, 0, time.UTC),
+				step:  time.Hour,
+			},
+			wantStart: time.Date(2022, 11, 7, 4, 0, 0, 0, time.UTC),
+			wantEnd:   time.Date(2022, 11, 8, 4, 0, 0, 0, time.UTC),
+		},
+		{
+			name:   "Update end to be step-aligned and leave start the same",
+			fields: fields{},
+			args: args{
+				start: time.Date(2022, 11, 7, 4, 0, 0, 0, time.UTC),
+				end:   time.Date(2022, 11, 8, 4, 59, 0, 0, time.UTC),
+				step:  time.Hour,
+			},
+			wantStart: time.Date(2022, 11, 7, 4, 0, 0, 0, time.UTC),
+			wantEnd:   time.Date(2022, 11, 8, 4, 0, 0, 0, time.UTC),
+		},
+		{
+			name:   "Update start and end to be step-aligned",
+			fields: fields{},
+			args: args{
+				start: time.Date(2022, 11, 7, 4, 59, 0, 0, time.UTC),
+				end:   time.Date(2022, 11, 8, 4, 59, 0, 0, time.UTC),
+				step:  time.Hour,
+			},
+			wantStart: time.Date(2022, 11, 7, 4, 0, 0, 0, time.UTC),
+			wantEnd:   time.Date(2022, 11, 8, 4, 0, 0, 0, time.UTC),
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			ctx := &Context{
+				Client:         tt.fields.Client,
+				name:           tt.fields.name,
+				errorCollector: tt.fields.errorCollector,
+			}
+			got, got1 := ctx.alignWindow(tt.args.start, tt.args.end, tt.args.step)
+			if !reflect.DeepEqual(got, tt.wantStart) {
+				t.Errorf("alignWindow() got = %v, want %v", got, tt.wantStart)
+			}
+			if !reflect.DeepEqual(got1, tt.wantEnd) {
+				t.Errorf("alignWindow() got1 = %v, want %v", got1, tt.wantEnd)
+			}
+		})
+	}
+}