Explorar o código

Sth/kcm 4335 (#3243)

Signed-off-by: Sean Holcomb <seanholcomb@gmail.com>
Sean Holcomb hai 9 meses
pai
achega
050ce5a643

+ 23 - 19
modules/collector-source/pkg/metric/aggregator/increase.go

@@ -6,13 +6,13 @@ import (
 )
 
 type increaseAggregator struct {
-	lock        sync.Mutex
-	labelValues []string
-	initialized bool
-	initialTime time.Time
-	currentTime time.Time
-	initial     float64
-	current     float64
+	lock         sync.Mutex
+	labelValues  []string
+	currentTime  time.Time
+	previousTime time.Time
+	previous     float64
+	current      float64
+	increase     float64
 }
 
 func Increase(labelValues []string) MetricAggregator {
@@ -21,6 +21,16 @@ func Increase(labelValues []string) MetricAggregator {
 	}
 }
 
+// getIncrease returns the current increase without updating the state
+func (a *increaseAggregator) getIncrease() float64 {
+	increase := a.increase
+	// ignore decreases and do not return increase if only one sample has been recorded
+	if a.previous < a.current && !a.previousTime.IsZero() {
+		increase += a.current - a.previous
+	}
+	return increase
+}
+
 func (a *increaseAggregator) AdditionInfo() map[string]string {
 	return nil
 }
@@ -32,20 +42,14 @@ func (a *increaseAggregator) LabelValues() []string {
 func (a *increaseAggregator) Update(value float64, timestamp time.Time, additionalInfo map[string]string) {
 	a.lock.Lock()
 	defer a.lock.Unlock()
-	if !a.initialized {
-		a.initialTime = timestamp
-		a.currentTime = timestamp
-		a.initialized = true
-	}
-	if a.initialTime == timestamp {
-		a.initial += value
-	}
-
-	if a.currentTime.Before(timestamp) {
+	if timestamp.After(a.currentTime) {
+		// update state and reset current
+		a.increase = a.getIncrease()
+		a.previousTime = a.currentTime
 		a.currentTime = timestamp
+		a.previous = a.current
 		a.current = 0
 	}
-
 	a.current += value
 }
 
@@ -53,6 +57,6 @@ func (a *increaseAggregator) Value() []MetricValue {
 	a.lock.Lock()
 	defer a.lock.Unlock()
 	return []MetricValue{
-		{Value: a.current - a.initial},
+		{Value: a.getIncrease()},
 	}
 }

+ 29 - 2
modules/collector-source/pkg/metric/aggregator/increase_test.go

@@ -7,8 +7,10 @@ import (
 )
 
 func TestIncreaseAggregator_Value(t *testing.T) {
-	time1 := time.Date(1, 1, 1, 0, 0, 0, 0, time.UTC)
-	time2 := time.Date(1, 1, 1, 0, 15, 0, 0, time.UTC)
+	time1 := time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC)
+	time2 := time.Date(2025, 1, 1, 0, 15, 0, 0, time.UTC)
+	time3 := time.Date(2025, 1, 1, 0, 30, 0, 0, time.UTC)
+	time4 := time.Date(2025, 1, 1, 0, 45, 0, 0, time.UTC)
 	type update struct {
 		value                 float64
 		timestamp             time.Time
@@ -81,6 +83,31 @@ func TestIncreaseAggregator_Value(t *testing.T) {
 				},
 			},
 		},
+		"set restart": {
+			updates: []update{
+				{
+					value:     3,
+					timestamp: time1,
+				},
+				{
+					value:     4,
+					timestamp: time2,
+				},
+				{
+					value:     1,
+					timestamp: time3,
+				},
+				{
+					value:     2,
+					timestamp: time4,
+				},
+			},
+			want: []MetricValue{
+				{
+					Value: 2,
+				},
+			},
+		},
 	}
 	for name, tt := range tests {
 		t.Run(name, func(t *testing.T) {

+ 25 - 0
modules/collector-source/pkg/metric/aggregator/iratemax_test.go

@@ -125,6 +125,31 @@ func TestIRateMaxAggregator_Value(t *testing.T) {
 				},
 			},
 		},
+		"set restart": {
+			updates: []update{
+				{
+					value:     3,
+					timestamp: time1,
+				},
+				{
+					value:     4,
+					timestamp: time2,
+				},
+				{
+					value:     1,
+					timestamp: time3,
+				},
+				{
+					value:     2,
+					timestamp: time4,
+				},
+			},
+			want: []MetricValue{
+				{
+					Value: 1,
+				},
+			},
+		},
 	}
 	for name, tt := range tests {
 		t.Run(name, func(t *testing.T) {

+ 32 - 21
modules/collector-source/pkg/metric/aggregator/rate.go

@@ -8,13 +8,14 @@ import (
 // rateAggregator is a MetricAggregator which returns the average rate per second change of the samples that it tracks.
 // to function properly calls to Update must have a timestamp greater than or equal to the last call to update.
 type rateAggregator struct {
-	lock        sync.Mutex
-	labelValues []string
-	initialized bool
-	initialTime time.Time
-	currentTime time.Time
-	initial     float64
-	current     float64
+	lock         sync.Mutex
+	labelValues  []string
+	previousTime time.Time
+	previous     float64
+	currentTime  time.Time
+	current      float64
+	runningAvg   float64
+	seconds      float64
 }
 
 func Rate(labelValues []string) MetricAggregator {
@@ -23,6 +24,22 @@ func Rate(labelValues []string) MetricAggregator {
 	}
 }
 
+// getRunningAvgSeconds returns the running average without updating the state
+func (a *rateAggregator) getRunningAvgSeconds() (float64, float64) {
+	runningAvg := a.runningAvg
+	seconds := a.seconds
+	// ignore decreases and base case where only one sample has been recorded
+	if a.previous < a.current && !a.previousTime.IsZero() {
+		currentSeconds := a.currentTime.Sub(a.previousTime).Seconds()
+		// ratio used to add the rate since the last recorded timestamp into the running average
+		weightingRatio := currentSeconds / (currentSeconds + seconds)
+		currentRate := (a.current - a.previous) / currentSeconds
+		runningAvg = (runningAvg * (1 - weightingRatio)) + (currentRate * weightingRatio)
+		seconds += currentSeconds
+	}
+	return runningAvg, seconds
+}
+
 func (a *rateAggregator) AdditionInfo() map[string]string {
 	return nil
 }
@@ -34,34 +51,28 @@ func (a *rateAggregator) LabelValues() []string {
 func (a *rateAggregator) Update(value float64, timestamp time.Time, additionalInfo map[string]string) {
 	a.lock.Lock()
 	defer a.lock.Unlock()
-	if !a.initialized {
-		a.initialTime = timestamp
-		a.currentTime = timestamp
-		a.initialized = true
-	}
-	if a.initialTime == timestamp {
-		a.initial += value
-	}
-
-	if a.currentTime.Before(timestamp) {
+	// If samples from a new timestamp finalize current values by moving them to previous
+	if timestamp.After(a.currentTime) {
+		// update state and reset current
+		a.runningAvg, a.seconds = a.getRunningAvgSeconds()
+		a.previous = a.current
+		a.previousTime = a.currentTime
 		a.currentTime = timestamp
 		a.current = 0
 	}
-
 	a.current += value
 }
 
 func (a *rateAggregator) Value() []MetricValue {
 	a.lock.Lock()
 	defer a.lock.Unlock()
-	seconds := a.currentTime.Sub(a.initialTime).Seconds()
+	average, seconds := a.getRunningAvgSeconds()
 	if seconds == 0 {
 		return []MetricValue{
 			{Value: 0},
 		}
 	}
-	increase := a.current - a.initial
 	return []MetricValue{
-		{Value: increase / seconds},
+		{Value: average},
 	}
 }

+ 29 - 3
modules/collector-source/pkg/metric/aggregator/rate_test.go

@@ -7,9 +7,10 @@ import (
 )
 
 func TestRateAggregator_Value(t *testing.T) {
-	time1 := time.Date(1, 1, 1, 0, 0, 0, 0, time.UTC)
-	time2 := time.Date(1, 1, 1, 0, 0, 1, 0, time.UTC)
-	time3 := time.Date(1, 1, 1, 0, 0, 2, 0, time.UTC)
+	time1 := time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC)
+	time2 := time.Date(2025, 1, 1, 0, 0, 1, 0, time.UTC)
+	time3 := time.Date(2025, 1, 1, 0, 0, 2, 0, time.UTC)
+	time4 := time.Date(2025, 1, 1, 0, 0, 3, 0, time.UTC)
 	type update struct {
 		value                 float64
 		timestamp             time.Time
@@ -103,6 +104,31 @@ func TestRateAggregator_Value(t *testing.T) {
 				},
 			},
 		},
+		"set restart": {
+			updates: []update{
+				{
+					value:     3,
+					timestamp: time1,
+				},
+				{
+					value:     4,
+					timestamp: time2,
+				},
+				{
+					value:     1,
+					timestamp: time3,
+				},
+				{
+					value:     2,
+					timestamp: time4,
+				},
+			},
+			want: []MetricValue{
+				{
+					Value: 1,
+				},
+			},
+		},
 	}
 	for name, tt := range tests {
 		t.Run(name, func(t *testing.T) {