Sfoglia il codice sorgente

Add cpu usage counter reset protection to prevent negative usage samples from being propagated

Matt Bolt 2 giorni fa
parent
commit
1e063c6bda

+ 13 - 4
modules/collector-source/pkg/metric/synthetic/cpuallocation.go

@@ -64,15 +64,24 @@ func (usage *CpuUsageMetric) Value() float64 {
 		return 0.0
 	}
 
-	v1, t1 := usage.current.update.Value, usage.current.timestamp
-	v2, t2 := usage.prev.update.Value, usage.prev.timestamp
+	curr, t1 := usage.current.update.Value, usage.current.timestamp
+	prev, t2 := usage.prev.update.Value, usage.prev.timestamp
+
+	// handle case where current value is less than the previous value, signalling
+	// that the running total was reset, or overflowed.
+	if curr < prev {
+		return 0.0
+	}
+
 	seconds := t1.Sub(t2).Seconds()
+
+	// ensure positive non-zero duration between samples
 	if seconds <= 0.0 {
 		return 0.0
 	}
 
-	irate := (v1 - v2) / seconds
-	return irate
+	irate := (curr - prev) / seconds
+	return max(0.0, irate)
 }
 
 // Shift will set the previous to the current metric, and set the current metric to nil.

+ 176 - 0
modules/collector-source/pkg/metric/synthetic/metricsynthesizer_test.go

@@ -2,6 +2,7 @@ package synthetic
 
 import (
 	"maps"
+	"math"
 	"testing"
 	"time"
 
@@ -420,3 +421,178 @@ func TestMetricSynthesizerCPUAllocation(t *testing.T) {
 	metricSynth.Update(updateSet2)
 	metricSynth.Update(updateSet3)
 }
+
+func TestMetricSynthesizerCPUAllocation_UsageOverflow(t *testing.T) {
+	container1Info := map[string]string{
+		source.NamespaceLabel: "namespace1",
+		source.NodeLabel:      "node1",
+		source.InstanceLabel:  "node1",
+		source.PodLabel:       "pod1",
+		source.UIDLabel:       "pod-uuid1",
+		source.ContainerLabel: "container1",
+	}
+
+	// start a max uint64 nanoseconds -> seconds
+	// since the source metrics use nanoseconds, that's where overflow would occur.
+	var startingCPUNanoSeconds uint64 = math.MaxUint64
+
+	const nanosIncrement uint64 = 40 * 1e9
+
+	toSeconds := func(nanos uint64) float64 {
+		return float64(nanos) * 1e-9
+	}
+
+	updateSet1 := &metric.UpdateSet{
+		Timestamp: time.Date(2026, time.January, 1, 0, 0, 0, 0, time.UTC),
+		Updates: []metric.Update{
+			// First Update has requests AND 1 usage sample
+			{
+				Name:   metric.KubePodContainerResourceRequests,
+				Labels: toCpuResource(container1Info),
+				Value:  0.2,
+			},
+			{
+				Name:   metric.ContainerCPUUsageSecondsTotal,
+				Labels: maps.Clone(container1Info),
+				Value:  toSeconds(startingCPUNanoSeconds),
+			},
+		},
+	}
+
+	updateSet2 := &metric.UpdateSet{
+		Timestamp: time.Date(2026, time.January, 1, 0, 0, 30, 0, time.UTC),
+		Updates: []metric.Update{
+			// Second Update doesn't have request, and has the second usage sample
+			{
+				Name:   metric.ContainerCPUUsageSecondsTotal,
+				Labels: maps.Clone(container1Info),
+				Value:  toSeconds(startingCPUNanoSeconds + nanosIncrement),
+			},
+		},
+	}
+
+	updateSet3 := &metric.UpdateSet{
+		Timestamp: time.Date(2026, time.January, 1, 0, 1, 0, 0, time.UTC),
+		Updates: []metric.Update{
+			{
+				Name:   metric.ContainerCPUUsageSecondsTotal,
+				Labels: maps.Clone(container1Info),
+				Value:  toSeconds(startingCPUNanoSeconds + nanosIncrement + nanosIncrement),
+			},
+		},
+	}
+
+	scrape := 0
+	updater := NewFuncUpdater(func(us *metric.UpdateSet) {
+		// first scrape:
+		//  - container1: alloc = request
+		if scrape == 0 {
+			assertMetricValue(t, us, metric.ContainerCPUAllocation, "container1", 0.2)
+		}
+
+		// second scrape
+		//  - container1: alloc = overflow, reset to current sample
+		if scrape == 1 {
+			assertMetricValue(t, us, metric.ContainerCPUAllocation, "container1", 0.0)
+		}
+
+		// third scrape
+		//  - container1: alloc = 40.0/30s = 1.33333
+		if scrape == 2 {
+			assertMetricValue(t, us, metric.ContainerCPUAllocation, "container1", 1.33333333)
+		}
+
+		scrape += 1
+	})
+
+	metricSynth := NewMetricSynthesizers(updater, NewContainerCpuAllocationSynthesizer(), NewContainerMemoryAllocationSynthesizer())
+
+	metricSynth.Update(updateSet1)
+	metricSynth.Update(updateSet2)
+	metricSynth.Update(updateSet3)
+}
+
+func TestMetricSynthesizerCPUAllocation_UsageCounterReset(t *testing.T) {
+	const nanosIncrement uint64 = 40 * 1e9
+
+	container1Info := map[string]string{
+		source.NamespaceLabel: "namespace1",
+		source.NodeLabel:      "node1",
+		source.InstanceLabel:  "node1",
+		source.PodLabel:       "pod1",
+		source.UIDLabel:       "pod-uuid1",
+		source.ContainerLabel: "container1",
+	}
+
+	// Starting CPU Total Seconds
+	const startingCPUSeconds float64 = 506000.0
+
+	updateSet1 := &metric.UpdateSet{
+		Timestamp: time.Date(2026, time.January, 1, 0, 0, 0, 0, time.UTC),
+		Updates: []metric.Update{
+			// First Update has requests AND 1 usage sample
+			{
+				Name:   metric.KubePodContainerResourceRequests,
+				Labels: toCpuResource(container1Info),
+				Value:  0.2,
+			},
+			{
+				Name:   metric.ContainerCPUUsageSecondsTotal,
+				Labels: maps.Clone(container1Info),
+				Value:  startingCPUSeconds,
+			},
+		},
+	}
+
+	updateSet2 := &metric.UpdateSet{
+		Timestamp: time.Date(2026, time.January, 1, 0, 0, 30, 0, time.UTC),
+		Updates: []metric.Update{
+			// Second Update doesn't have request, and has the second usage sample
+			{
+				Name:   metric.ContainerCPUUsageSecondsTotal,
+				Labels: maps.Clone(container1Info),
+				Value:  startingCPUSeconds - 1000.0,
+			},
+		},
+	}
+
+	updateSet3 := &metric.UpdateSet{
+		Timestamp: time.Date(2026, time.January, 1, 0, 1, 0, 0, time.UTC),
+		Updates: []metric.Update{
+			{
+				Name:   metric.ContainerCPUUsageSecondsTotal,
+				Labels: maps.Clone(container1Info),
+				Value:  (startingCPUSeconds - 1000.0) + 40.0,
+			},
+		},
+	}
+
+	scrape := 0
+	updater := NewFuncUpdater(func(us *metric.UpdateSet) {
+		// first scrape:
+		//  - container1: alloc = request
+		if scrape == 0 {
+			assertMetricValue(t, us, metric.ContainerCPUAllocation, "container1", 0.2)
+		}
+
+		// second scrape
+		//  - container1: alloc = (subtract 1000s - usage sample is less than last recorded), reset to 0.0
+		if scrape == 1 {
+			assertMetricValue(t, us, metric.ContainerCPUAllocation, "container1", 0.0)
+		}
+
+		// third scrape
+		//  - container1: alloc = 40.0/30s = 1.33333
+		if scrape == 2 {
+			assertMetricValue(t, us, metric.ContainerCPUAllocation, "container1", 1.33333333)
+		}
+
+		scrape += 1
+	})
+
+	metricSynth := NewMetricSynthesizers(updater, NewContainerCpuAllocationSynthesizer(), NewContainerMemoryAllocationSynthesizer())
+
+	metricSynth.Update(updateSet1)
+	metricSynth.Update(updateSet2)
+	metricSynth.Update(updateSet3)
+}