Răsfoiți Sursa

Produce GPU saturation and device metrics from both data sources

Wire the GPU saturation/device telemetry query layer behind the
DataSource interface so the kubemodel hydration (next change) can consume
it identically regardless of data source.

- core/source: add the GPU saturation and device-metric query methods to
  MetricsQuerier (throttle violation/reason, framebuffer occupancy avg/max,
  memory pressure, XID errors, DRAM/SM activity, PCIe/NVLink rates; device
  power/temperature/utilization/memory), plus the GPUSaturationResult and
  GPUDeviceMetricResult decoders. Series are container-attributed and keyed
  by pod_uid to match the kubemodel convention.
- prometheus-source: DCGM-derived implementations (gpusaturationquerier,
  gpudevicequerier). Throttle reasons decode the clock-throttle-reasons
  bitmask; memory pressure compares framebuffer occupancy against
  GPU_MEMORY_SATURATION_THRESHOLD. All selectors are cluster-filter scoped.
- collector-source: parallel implementation over the native collectors,
  with three reusable metric aggregators (above-threshold ratio, bitset
  ratio, changes) and a synthetic framebuffer memory-pressure ratio so the
  collector path emits the same signal as the Prometheus path.

Query builders, decoders, and aggregators are covered by unit tests.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Signed-off-by: Cliff Colvin <clifford.colvin@gmail.com>
Cliff Colvin 1 săptămână în urmă
părinte
comite
73d55a3b22
25 a modificat fișierele cu 2177 adăugiri și 0 ștergeri
  1. 27 0
      core/pkg/source/datasource.go
  2. 69 0
      core/pkg/source/decoders.go
  3. 93 0
      core/pkg/source/decoders_test.go
  4. 6 0
      modules/collector-source/pkg/collector/collector.go
  5. 1 0
      modules/collector-source/pkg/collector/datasource.go
  6. 150 0
      modules/collector-source/pkg/collector/gpusaturation.go
  7. 202 0
      modules/collector-source/pkg/collector/gpusaturationquerier.go
  8. 235 0
      modules/collector-source/pkg/collector/gpusaturationquerier_test.go
  9. 68 0
      modules/collector-source/pkg/metric/aggregator/abovethresholdratio.go
  10. 83 0
      modules/collector-source/pkg/metric/aggregator/abovethresholdratio_test.go
  11. 71 0
      modules/collector-source/pkg/metric/aggregator/bitsetratio.go
  12. 88 0
      modules/collector-source/pkg/metric/aggregator/bitsetratio_test.go
  13. 56 0
      modules/collector-source/pkg/metric/aggregator/changes.go
  14. 75 0
      modules/collector-source/pkg/metric/aggregator/changes_test.go
  15. 30 0
      modules/collector-source/pkg/metric/collector.go
  16. 33 0
      modules/collector-source/pkg/metric/metrics.go
  17. 104 0
      modules/collector-source/pkg/metric/synthetic/gpumemory.go
  18. 163 0
      modules/collector-source/pkg/metric/synthetic/gpumemory_test.go
  19. 20 0
      modules/collector-source/pkg/scrape/dcgm.go
  20. 4 0
      modules/prometheus-source/go.mod
  21. 9 0
      modules/prometheus-source/go.sum
  22. 7 0
      modules/prometheus-source/pkg/prom/config.go
  23. 67 0
      modules/prometheus-source/pkg/prom/gpudevicequerier.go
  24. 282 0
      modules/prometheus-source/pkg/prom/gpusaturationquerier.go
  25. 234 0
      modules/prometheus-source/pkg/prom/gpusaturationquerier_test.go

+ 27 - 0
core/pkg/source/datasource.go

@@ -93,6 +93,33 @@ type MetricsQuerier interface {
 	QueryDCGMContainerUsageAvg(start, end time.Time) *Future[DCGMDeviceContainerUsageResult]
 	QueryDCGMContainerUsageMax(start, end time.Time) *Future[DCGMDeviceContainerUsageResult]
 
+	// GPU saturation (USE method). Sources return no results when the
+	// underlying DCGM fields are unavailable; callers must treat a missing
+	// result as "signal absent", never as zero.
+	QueryGPUThrottleViolationRatio(start, end time.Time) *Future[GPUSaturationResult]
+	QueryGPUThrottleReasonRatio(start, end time.Time) *Future[GPUSaturationResult]
+	QueryGPUMemoryUsedRatioAvg(start, end time.Time) *Future[GPUSaturationResult]
+	QueryGPUMemoryUsedRatioMax(start, end time.Time) *Future[GPUSaturationResult]
+	QueryGPUMemoryPressureRatio(start, end time.Time) *Future[GPUSaturationResult]
+	QueryGPUXIDErrorCount(start, end time.Time) *Future[GPUSaturationResult]
+	QueryGPUDRAMActiveAvg(start, end time.Time) *Future[GPUSaturationResult]
+	QueryGPUDRAMActiveMax(start, end time.Time) *Future[GPUSaturationResult]
+	QueryGPUSMActiveAvg(start, end time.Time) *Future[GPUSaturationResult]
+	QueryGPUSMOccupancyAvg(start, end time.Time) *Future[GPUSaturationResult]
+	QueryGPUPCIeTxBytesAvg(start, end time.Time) *Future[GPUSaturationResult]
+	QueryGPUPCIeRxBytesAvg(start, end time.Time) *Future[GPUSaturationResult]
+	QueryGPUNVLinkTxBytesAvg(start, end time.Time) *Future[GPUSaturationResult]
+	QueryGPUNVLinkRxBytesAvg(start, end time.Time) *Future[GPUSaturationResult]
+
+	// Device-level GPU metrics (DeviceInfo / DevicePerformance support):
+	// grouped by device identity only, all from the default dcgm-exporter
+	// configuration
+	QueryGPUDevicePowerAvg(start, end time.Time) *Future[GPUDeviceMetricResult]
+	QueryGPUDeviceTempAvg(start, end time.Time) *Future[GPUDeviceMetricResult]
+	QueryGPUDeviceUsageAvg(start, end time.Time) *Future[GPUDeviceMetricResult]
+	QueryGPUDeviceUsageMax(start, end time.Time) *Future[GPUDeviceMetricResult]
+	QueryGPUDeviceMemoryUsedAvg(start, end time.Time) *Future[GPUDeviceMetricResult]
+	QueryGPUDeviceMemoryUsedMax(start, end time.Time) *Future[GPUDeviceMetricResult]
+
 	// PVC
 	QueryPodPVCAllocation(start, end time.Time) *Future[PodPVCAllocationResult]
 	QueryPVCBytesRequested(start, end time.Time) *Future[PVCBytesRequestedResult]

+ 69 - 0
core/pkg/source/decoders.go

@@ -46,6 +46,9 @@ const (
 	HostNameLabel        = "Hostname"
 	UUIDLabel            = "UUID"
 	ResourceLabel        = "resource"
+	ReasonLabel          = "reason"
+	MIGProfileLabel      = "GPU_I_PROFILE"
+	MIGInstanceLabel     = "GPU_I_ID"
 	DeploymentLabel      = "deployment"
 	StatefulSetLabel     = "statefulSet"
 	DaemonSetLabel       = "daemonset"
@@ -1090,6 +1093,72 @@ func DecodeGPUInfoResult(result *QueryResult) *GPUInfoResult {
 	}
 }
 
+// GPUDeviceMetricResult is the result shape for device-level GPU metric
+// queries (power, temperature, device-level utilization, memory used).
+// It is an alias of GPUSaturationResult and shares its label decoding:
+// device identity plus MIG instance labels; container attribution fields
+// stay empty for device-level groupings.
+type GPUDeviceMetricResult = GPUSaturationResult
+
+// DecodeGPUDeviceMetricResult decodes a device-level GPU metric result.
+func DecodeGPUDeviceMetricResult(result *QueryResult) *GPUDeviceMetricResult {
+	return DecodeGPUSaturationResult(result)
+}
+
+// GPUSaturationResult is the shared result shape for every GPU saturation
+// query. The signal queried (throttle ratio, memory pressure, etc.) is
+// determined by the DataSource method that produced the result; Reason is
+// only populated by the throttle queries, and the MIG labels only when
+// dcgm-exporter reports MIG instances as distinct devices.
+type GPUSaturationResult struct {
+	UID         string
+	Cluster     string
+	Namespace   string
+	Pod         string
+	Container   string
+	Device      string
+	ModelName   string
+	UUID        string
+	MIGProfile  string
+	MIGInstance string
+	Reason      string
+	Data        []*util.Vector
+}
+
+// DecodeGPUSaturationResult copies the labels and values of a single query
+// result into a GPUSaturationResult. Label lookup errors are discarded, so
+// each field simply holds whatever its accessor returned.
+func DecodeGPUSaturationResult(result *QueryResult) *GPUSaturationResult {
+	decoded := &GPUSaturationResult{}
+
+	// The kubemodel scrape convention labels the pod UID as pod_uid on DCGM
+	// series; older configs used the legacy uid label, so try that second.
+	var err error
+	if decoded.UID, err = result.GetString(PodUIDLabel); err != nil {
+		decoded.UID, _ = result.GetString(UIDLabel)
+	}
+
+	// container attribution
+	decoded.Cluster, _ = result.GetCluster()
+	decoded.Namespace, _ = result.GetNamespace()
+	decoded.Pod, _ = result.GetPod()
+	decoded.Container, _ = result.GetContainer()
+
+	// device identity and MIG instance
+	decoded.Device, _ = result.GetString(DeviceLabel)
+	decoded.ModelName, _ = result.GetString(ModelNameLabel)
+	decoded.UUID, _ = result.GetString(UUIDLabel)
+	decoded.MIGProfile, _ = result.GetString(MIGProfileLabel)
+	decoded.MIGInstance, _ = result.GetString(MIGInstanceLabel)
+
+	// throttle reason tag, when the query attached one
+	decoded.Reason, _ = result.GetString(ReasonLabel)
+
+	decoded.Data = result.Values
+	return decoded
+}
+
 type IsGPUSharedResult struct {
 	UID       string
 	Cluster   string

+ 93 - 0
core/pkg/source/decoders_test.go

@@ -0,0 +1,93 @@
+package source
+
+import (
+	"testing"
+
+	"github.com/opencost/opencost/core/pkg/util"
+)
+
+// TestDecodeGPUSaturationResult checks that DecodeGPUSaturationResult copies
+// every label it knows about, passes the values through untouched, and
+// tolerates series on which the optional labels are missing.
+func TestDecodeGPUSaturationResult(t *testing.T) {
+	values := []*util.Vector{{Timestamp: 1000, Value: 0.25}}
+
+	t.Run("all labels present", func(t *testing.T) {
+		result := NewQueryResult(map[string]any{
+			"pod_uid":       "pod-uid-1",
+			"cluster_id":    "cluster-1",
+			"namespace":     "gpu-ns",
+			"pod":           "gpu-pod",
+			"container":     "gpu-container",
+			"device":        "nvidia0",
+			"modelName":     "NVIDIA A100-SXM4-40GB",
+			"UUID":          "GPU-1234",
+			"GPU_I_PROFILE": "1g.5gb",
+			"GPU_I_ID":      "3",
+			"reason":        "sw_power_cap",
+		}, values, nil)
+
+		decoded := DecodeGPUSaturationResult(result)
+
+		if decoded.UID != "pod-uid-1" {
+			t.Errorf("UID = %q, want %q", decoded.UID, "pod-uid-1")
+		}
+		if decoded.Cluster != "cluster-1" {
+			t.Errorf("Cluster = %q, want %q", decoded.Cluster, "cluster-1")
+		}
+		if decoded.Namespace != "gpu-ns" {
+			t.Errorf("Namespace = %q, want %q", decoded.Namespace, "gpu-ns")
+		}
+		if decoded.Pod != "gpu-pod" {
+			t.Errorf("Pod = %q, want %q", decoded.Pod, "gpu-pod")
+		}
+		if decoded.Container != "gpu-container" {
+			t.Errorf("Container = %q, want %q", decoded.Container, "gpu-container")
+		}
+		if decoded.Device != "nvidia0" {
+			t.Errorf("Device = %q, want %q", decoded.Device, "nvidia0")
+		}
+		if decoded.ModelName != "NVIDIA A100-SXM4-40GB" {
+			t.Errorf("ModelName = %q, want %q", decoded.ModelName, "NVIDIA A100-SXM4-40GB")
+		}
+		if decoded.UUID != "GPU-1234" {
+			t.Errorf("UUID = %q, want %q", decoded.UUID, "GPU-1234")
+		}
+		if decoded.MIGProfile != "1g.5gb" {
+			t.Errorf("MIGProfile = %q, want %q", decoded.MIGProfile, "1g.5gb")
+		}
+		if decoded.MIGInstance != "3" {
+			t.Errorf("MIGInstance = %q, want %q", decoded.MIGInstance, "3")
+		}
+		if decoded.Reason != "sw_power_cap" {
+			t.Errorf("Reason = %q, want %q", decoded.Reason, "sw_power_cap")
+		}
+		if len(decoded.Data) != 1 || decoded.Data[0].Value != 0.25 {
+			t.Errorf("Data not passed through: %+v", decoded.Data)
+		}
+	})
+
+	t.Run("optional labels absent", func(t *testing.T) {
+		// non-MIG GPU without a reason-labeled query: those labels simply
+		// do not exist on the series and must decode to empty strings.
+		// Legacy scrape configs label the pod UID as "uid"; the decoder
+		// falls back to it when pod_uid is absent.
+		result := NewQueryResult(map[string]any{
+			"uid":        "pod-uid-1",
+			"cluster_id": "cluster-1",
+			"namespace":  "gpu-ns",
+			"pod":        "gpu-pod",
+			"container":  "gpu-container",
+			"UUID":       "GPU-1234",
+		}, values, nil)
+
+		decoded := DecodeGPUSaturationResult(result)
+
+		// the legacy-label fallback is the point of using "uid" above, so
+		// pin it explicitly rather than leaving it unchecked
+		if decoded.UID != "pod-uid-1" {
+			t.Errorf("UID = %q, want %q (fallback from legacy uid label)", decoded.UID, "pod-uid-1")
+		}
+		if decoded.MIGProfile != "" || decoded.MIGInstance != "" || decoded.Reason != "" {
+			t.Errorf("expected absent labels to decode to empty strings, got %+v", decoded)
+		}
+		if decoded.Device != "" || decoded.ModelName != "" {
+			t.Errorf("expected absent device labels to decode to empty strings, got %+v", decoded)
+		}
+		if decoded.UUID != "GPU-1234" {
+			t.Errorf("UUID = %q, want %q", decoded.UUID, "GPU-1234")
+		}
+	})
+}

+ 6 - 0
modules/collector-source/pkg/collector/collector.go

@@ -74,6 +74,12 @@ func NewOpenCostMetricStore() metric.MetricStore {
 	memStore.Register(NewDCGMUptimeMetricCollector())
 	memStore.Register(NewDCGMContainerUsageAvgMetricCollector())
 	memStore.Register(NewDCGMContainerUsageMaxMetricCollector())
+	for _, gpuSaturationCollector := range NewGPUSaturationMetricCollectors() {
+		memStore.Register(gpuSaturationCollector)
+	}
+	for _, gpuDeviceCollector := range NewGPUDeviceMetricCollectors() {
+		memStore.Register(gpuDeviceCollector)
+	}
 	memStore.Register(NewNodeCPUPricePerHourMetricCollector())
 	memStore.Register(NewNodeRAMPricePerGiBHourMetricCollector())
 	memStore.Register(NewNodeGPUPricePerHourMetricCollector())

+ 1 - 0
modules/collector-source/pkg/collector/datasource.go

@@ -88,6 +88,7 @@ func NewCollectorDataSource(
 		updater,
 		synthetic.NewContainerMemoryAllocationSynthesizer(),
 		synthetic.NewContainerCpuAllocationSynthesizer(),
+		synthetic.NewGPUMemoryUsedRatioSynthesizer(),
 	)
 	updater = metricSynthesizer
 

+ 150 - 0
modules/collector-source/pkg/collector/gpusaturation.go

@@ -0,0 +1,150 @@
+package collector
+
+import (
+	coreenv "github.com/opencost/opencost/core/pkg/env"
+	"github.com/opencost/opencost/core/pkg/opencost"
+	"github.com/opencost/opencost/core/pkg/source"
+	"github.com/opencost/opencost/modules/collector-source/pkg/metric"
+	"github.com/opencost/opencost/modules/collector-source/pkg/metric/aggregator"
+)
+
+// GPU saturation collectors
+//
+// These collectors aggregate USE-method GPU saturation signals from
+// dcgm-exporter scrapes. When a DCGM field is absent from the scrape (not in
+// the exporter config, or no DCP profiling support) the corresponding
+// collector simply accumulates nothing and its query returns no results,
+// which downstream treats as "signal absent" rather than zero.
+
+// gpuSaturationLabels is the label set shared by every GPU saturation
+// collector: container attribution, GPU identity, and MIG instance labels.
+// Labels absent from a scrape resolve to empty strings.
+var gpuSaturationLabels = []string{
+	source.NamespaceLabel,
+	source.PodLabel,
+	source.PodUIDLabel,
+	source.ContainerLabel,
+	source.DeviceLabel,
+	source.ModelNameLabel,
+	source.UUIDLabel,
+	source.MIGProfileLabel,
+	source.MIGInstanceLabel,
+}
+
+// gpuSaturationFilter reports whether a label set carries a non-empty
+// container label. A missing label and an empty one are both rejected.
+func gpuSaturationFilter(labels map[string]string) bool {
+	container, found := labels[source.ContainerLabel]
+	return found && container != ""
+}
+
+// gpuThrottleViolationCollectors lists each violation collector ID with its
+// DCGM source metric and reason tag. The counters accumulate microseconds
+// spent throttled; the querier converts the windowed increase into a
+// fraction of the window and tags the reason.
+var gpuThrottleViolationCollectors = []struct {
+	ID     metric.MetricCollectorID
+	Metric string
+	Reason string
+}{
+	{ID: metric.GPUThrottleViolationPowerID, Metric: metric.DCGMFIDEVPOWERVIOLATION, Reason: opencost.GPUThrottleViolationPower},
+	{ID: metric.GPUThrottleViolationThermalID, Metric: metric.DCGMFIDEVTHERMALVIOLATION, Reason: opencost.GPUThrottleViolationThermal},
+	{ID: metric.GPUThrottleViolationSyncBoostID, Metric: metric.DCGMFIDEVSYNCBOOSTVIOLATION, Reason: opencost.GPUThrottleViolationSyncBoost},
+	{ID: metric.GPUThrottleViolationBoardLimitID, Metric: metric.DCGMFIDEVBOARDLIMITVIOLATION, Reason: opencost.GPUThrottleViolationBoardLimit},
+}
+
+// gpuThrottleBitmaskMetrics enumerates both names of the DCGM clock throttle
+// reasons bitmask field (renamed in DCGM 3.3+); at most one is scraped per
+// dcgm-exporter version, so only one family of collectors ever accumulates
+// data and the querier's merge of both ID families is effectively a union
+// with one empty side. Registering per-name collectors was chosen over
+// renaming at scrape time because the TargetScraper is a generic
+// name-filtered pipe with no transform hook; if scrape-time normalization
+// is ever added, collapse this to the canonical name and halve the
+// collectors.
+var gpuThrottleBitmaskMetrics = []string{
+	metric.DCGMFIDEVCLOCKTHROTTLEREASONS,
+	metric.DCGMFIDEVCLOCKSEVENTREASONS,
+}
+
+// newGPUSaturationCollector builds a collector for one saturation signal:
+// it always uses the shared gpuSaturationLabels label set and the
+// gpuSaturationFilter, varying only the ID, source metric, and aggregator.
+func newGPUSaturationCollector(id metric.MetricCollectorID, metricName string, factory aggregator.MetricAggregatorFactory) *metric.MetricCollector {
+	return metric.NewMetricCollector(id, metricName, gpuSaturationLabels, factory, gpuSaturationFilter)
+}
+
+// gpuDeviceLabels groups device-level metrics by device identity (and MIG
+// instance) without container attribution: power, temperature, and
+// device-level utilization describe the whole device regardless of which
+// containers share it.
+var gpuDeviceLabels = []string{
+	source.DeviceLabel,
+	source.ModelNameLabel,
+	source.UUIDLabel,
+	source.MIGProfileLabel,
+	source.MIGInstanceLabel,
+}
+
+// newGPUDeviceCollector builds a collector for one device-level metric. It
+// groups by gpuDeviceLabels and, unlike newGPUSaturationCollector, passes a
+// nil filter.
+func newGPUDeviceCollector(id metric.MetricCollectorID, metricName string, factory aggregator.MetricAggregatorFactory) *metric.MetricCollector {
+	return metric.NewMetricCollector(
+		id,
+		metricName,
+		gpuDeviceLabels,
+		factory,
+		nil,
+	)
+}
+
+// NewGPUDeviceMetricCollectors returns the device-level collectors behind
+// the DeviceInfo / DevicePerformance contracts, in a fixed order: power,
+// temperature, compute utilization (avg, max), and framebuffer used
+// (avg, max).
+func NewGPUDeviceMetricCollectors() []*metric.MetricCollector {
+	specs := []struct {
+		id         metric.MetricCollectorID
+		metricName string
+		factory    aggregator.MetricAggregatorFactory
+	}{
+		{metric.GPUDevicePowerAvgID, metric.DCGMFIDEVPOWERUSAGE, aggregator.AverageOverTime},
+		{metric.GPUDeviceTempAvgID, metric.DCGMFIDEVGPUTEMP, aggregator.AverageOverTime},
+		{metric.GPUDeviceUsageAvgID, metric.DCGMFIPROFGRENGINEACTIVE, aggregator.AverageOverTime},
+		{metric.GPUDeviceUsageMaxID, metric.DCGMFIPROFGRENGINEACTIVE, aggregator.MaxOverTime},
+		{metric.GPUDeviceMemoryUsedAvgID, metric.DCGMFIDEVFBUSED, aggregator.AverageOverTime},
+		{metric.GPUDeviceMemoryUsedMaxID, metric.DCGMFIDEVFBUSED, aggregator.MaxOverTime},
+	}
+
+	collectors := make([]*metric.MetricCollector, 0, len(specs))
+	for _, spec := range specs {
+		collectors = append(collectors, newGPUDeviceCollector(spec.id, spec.metricName, spec.factory))
+	}
+	return collectors
+}
+
+// NewGPUSaturationMetricCollectors returns every collector needed for the
+// GPU saturation signals, in a fixed order: framebuffer occupancy, XID
+// errors, DCP profiling gauges, DCP byte-counter rates, throttle violation
+// counters, and finally the throttle-reason bitmask collectors.
+func NewGPUSaturationMetricCollectors() []*metric.MetricCollector {
+	var collectors []*metric.MetricCollector
+	add := func(id metric.MetricCollectorID, metricName string, factory aggregator.MetricAggregatorFactory) {
+		collectors = append(collectors, newGPUSaturationCollector(id, metricName, factory))
+	}
+
+	// Framebuffer occupancy: all three read the synthetic per-sample ratio
+	// metric joined from FB_USED/FB_FREE at scrape time (see
+	// synthetic.GPUMemoryUsedRatioSynthesizer).
+	add(metric.GPUMemoryUsedAvgID, metric.OpencostGPUMemoryUsedRatio, aggregator.AverageOverTime)
+	add(metric.GPUMemoryUsedMaxID, metric.OpencostGPUMemoryUsedRatio, aggregator.MaxOverTime)
+	add(metric.GPUMemoryPressureRatioID, metric.OpencostGPUMemoryUsedRatio, aggregator.AboveThresholdRatio(coreenv.GetGPUMemorySaturationThreshold()))
+
+	// XID error events: count value transitions of the last-error gauge.
+	add(metric.GPUXIDErrorCountID, metric.DCGMFIDEVXIDERRORS, aggregator.Changes)
+
+	// DCP profiling gauges.
+	add(metric.GPUDRAMActiveAvgID, metric.DCGMFIPROFDRAMACTIVE, aggregator.AverageOverTime)
+	add(metric.GPUDRAMActiveMaxID, metric.DCGMFIPROFDRAMACTIVE, aggregator.MaxOverTime)
+	add(metric.GPUSMActiveAvgID, metric.DCGMFIPROFSMACTIVE, aggregator.AverageOverTime)
+	add(metric.GPUSMOccupancyAvgID, metric.DCGMFIPROFSMOCCUPANCY, aggregator.AverageOverTime)
+
+	// DCP byte counters, reported as average bytes/sec.
+	add(metric.GPUPCIeTxBytesAvgID, metric.DCGMFIPROFPCIETXBYTES, aggregator.Rate)
+	add(metric.GPUPCIeRxBytesAvgID, metric.DCGMFIPROFPCIERXBYTES, aggregator.Rate)
+	add(metric.GPUNVLinkTxBytesAvgID, metric.DCGMFIPROFNVLINKTXBYTES, aggregator.Rate)
+	add(metric.GPUNVLinkRxBytesAvgID, metric.DCGMFIPROFNVLINKRXBYTES, aggregator.Rate)
+
+	// Throttle violation counters: collect the windowed increase; the
+	// querier normalizes it by the window length.
+	for _, violation := range gpuThrottleViolationCollectors {
+		add(violation.ID, violation.Metric, aggregator.Increase)
+	}
+
+	// Throttle reasons bitmask: one bit-ratio collector per
+	// (metric name, saturation-relevant reason) pair.
+	for _, metricName := range gpuThrottleBitmaskMetrics {
+		for _, reason := range opencost.GPUThrottleReasons {
+			id := metric.GPUThrottleReasonCollectorID(metricName, reason.Name)
+			add(id, metricName, aggregator.BitSetRatio(reason.Bit))
+		}
+	}
+
+	return collectors
+}

+ 202 - 0
modules/collector-source/pkg/collector/gpusaturationquerier.go

@@ -0,0 +1,202 @@
+package collector
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/opencost/opencost/core/pkg/opencost"
+	"github.com/opencost/opencost/core/pkg/source"
+	"github.com/opencost/opencost/modules/collector-source/pkg/metric"
+	"github.com/opencost/opencost/modules/collector-source/pkg/metric/aggregator"
+)
+
+// GPU saturation queries over the collector store. These mirror the
+// prometheus-source queries: same DataSource method names, same
+// GPUSaturationResult semantics. Signals whose DCGM field was never scraped
+// produce no results, which downstream treats as absent rather than zero.
+
+// gpuSaturationResultsFuture packages already-computed MetricResults, plus
+// any query error, into the shared GPUSaturationResult future shape. The
+// results are converted with ToQueryResult and delivered through a channel
+// that already holds the single payload when the future is created.
+func gpuSaturationResultsFuture(name string, results []*aggregator.MetricResult, err error) *source.Future[source.GPUSaturationResult] {
+	wrapped := source.NewQueryResults(name)
+	wrapped.Error = err
+	for i := range results {
+		wrapped.Results = append(wrapped.Results, results[i].ToQueryResult())
+	}
+
+	// capacity 1 so the send below completes without a waiting receiver
+	resultsChan := make(source.QueryResultsChan, 1)
+	resultsChan <- wrapped
+
+	return source.NewFuture(source.DecodeGPUSaturationResult, resultsChan)
+}
+
+// queryGPUReasonTagged queries one collector per entry of idReasons, stamps
+// every returned result with that entry's reason label, and, when transform
+// is non-nil, rewrites each value through it.
+//
+// Results are modified in place (labels and values). Collectors whose query
+// fails are skipped; the first such error is reported on the future
+// alongside whatever results the other collectors produced. Because
+// idReasons is a map, the order of the combined results is unspecified.
+func (c *collectorMetricsQuerier) queryGPUReasonTagged(name string, start, end time.Time, idReasons map[metric.MetricCollectorID]string, transform func(float64) float64) *source.Future[source.GPUSaturationResult] {
+	store := c.collectorProvider.GetStore(start, end)
+	if store == nil {
+		// no store for this window: an empty, error-free result
+		return gpuSaturationResultsFuture(name, nil, nil)
+	}
+
+	var (
+		tagged   []*aggregator.MetricResult
+		firstErr error
+	)
+	for id, reason := range idReasons {
+		results, err := store.Query(id)
+		if err != nil {
+			if firstErr == nil {
+				firstErr = err
+			}
+			continue
+		}
+
+		for _, result := range results {
+			if result.MetricLabels == nil {
+				result.MetricLabels = make(map[string]string)
+			}
+			result.MetricLabels[source.ReasonLabel] = reason
+
+			if transform != nil {
+				for i := range result.Values {
+					result.Values[i].Value = transform(result.Values[i].Value)
+				}
+			}
+
+			tagged = append(tagged, result)
+		}
+	}
+
+	return gpuSaturationResultsFuture(name, tagged, firstErr)
+}
+
+// QueryGPUThrottleViolationRatio reports, per throttle reason, the fraction
+// of the window each GPU spent throttled. It divides the windowed increase
+// of each DCGM violation microsecond counter by the window length in
+// microseconds. A window that is empty or reversed yields a future carrying
+// an error and no results.
+func (c *collectorMetricsQuerier) QueryGPUThrottleViolationRatio(start, end time.Time) *source.Future[source.GPUSaturationResult] {
+	const queryName = "GPUThrottleViolationRatio"
+
+	windowMicros := float64(end.Sub(start).Microseconds())
+	if windowMicros <= 0 {
+		return gpuSaturationResultsFuture(queryName, nil, fmt.Errorf("invalid window for GPUThrottleViolationRatio: %s to %s", start, end))
+	}
+
+	idReasons := make(map[metric.MetricCollectorID]string, len(gpuThrottleViolationCollectors))
+	for _, violation := range gpuThrottleViolationCollectors {
+		idReasons[violation.ID] = violation.Reason
+	}
+
+	// counter increase (microseconds throttled) -> fraction of the window
+	toWindowFraction := func(increaseMicros float64) float64 {
+		return increaseMicros / windowMicros
+	}
+	return c.queryGPUReasonTagged(queryName, start, end, idReasons, toWindowFraction)
+}
+
+// QueryGPUThrottleReasonRatio reports, per saturation-relevant reason, the
+// fraction of scraped samples in which that bit of the clock throttle
+// reasons bitmask was set. Collectors for every name in
+// gpuThrottleBitmaskMetrics are queried; at most one name is ever scraped.
+// Values are returned as collected, with no transform applied.
+func (c *collectorMetricsQuerier) QueryGPUThrottleReasonRatio(start, end time.Time) *source.Future[source.GPUSaturationResult] {
+	// one collector per (metric name, reason) pair
+	pairs := len(gpuThrottleBitmaskMetrics) * len(opencost.GPUThrottleReasons)
+	idReasons := make(map[metric.MetricCollectorID]string, pairs)
+
+	for _, metricName := range gpuThrottleBitmaskMetrics {
+		for _, reason := range opencost.GPUThrottleReasons {
+			id := metric.GPUThrottleReasonCollectorID(metricName, reason.Name)
+			idReasons[id] = reason.Name
+		}
+	}
+
+	return c.queryGPUReasonTagged("GPUThrottleReasonRatio", start, end, idReasons, nil)
+}
+
+// QueryGPUMemoryUsedRatioAvg reports average framebuffer occupancy over the
+// window: FB_USED / (FB_USED + FB_FREE), aggregated from the per-sample ratio
+// synthesized at scrape time (collector ID metric.GPUMemoryUsedAvgID).
+func (c *collectorMetricsQuerier) QueryGPUMemoryUsedRatioAvg(start, end time.Time) *source.Future[source.GPUSaturationResult] {
+	return queryCollector(c, start, end, metric.GPUMemoryUsedAvgID, source.DecodeGPUSaturationResult)
+}
+
+// QueryGPUMemoryUsedRatioMax reports peak framebuffer occupancy over the
+// window (collector ID metric.GPUMemoryUsedMaxID).
+func (c *collectorMetricsQuerier) QueryGPUMemoryUsedRatioMax(start, end time.Time) *source.Future[source.GPUSaturationResult] {
+	return queryCollector(c, start, end, metric.GPUMemoryUsedMaxID, source.DecodeGPUSaturationResult)
+}
+
+// QueryGPUMemoryPressureRatio reports the fraction of scraped samples in
+// which framebuffer occupancy was at or above the configured threshold, from
+// the same synthesized ratio (collector ID metric.GPUMemoryPressureRatioID).
+func (c *collectorMetricsQuerier) QueryGPUMemoryPressureRatio(start, end time.Time) *source.Future[source.GPUSaturationResult] {
+	return queryCollector(c, start, end, metric.GPUMemoryPressureRatioID, source.DecodeGPUSaturationResult)
+}
+
+// QueryGPUXIDErrorCount reports the number of XID error transitions
+// observed in the window (collector ID metric.GPUXIDErrorCountID).
+func (c *collectorMetricsQuerier) QueryGPUXIDErrorCount(start, end time.Time) *source.Future[source.GPUSaturationResult] {
+	return queryCollector(c, start, end, metric.GPUXIDErrorCountID, source.DecodeGPUSaturationResult)
+}
+
+// QueryGPUDRAMActiveAvg reports the average ratio of cycles the device memory
+// interface was active (metric.GPUDRAMActiveAvgID). Requires DCP profiling.
+func (c *collectorMetricsQuerier) QueryGPUDRAMActiveAvg(start, end time.Time) *source.Future[source.GPUSaturationResult] {
+	return queryCollector(c, start, end, metric.GPUDRAMActiveAvgID, source.DecodeGPUSaturationResult)
+}
+
+// QueryGPUDRAMActiveMax reports the peak ratio of cycles the device memory
+// interface was active (metric.GPUDRAMActiveMaxID). Requires DCP profiling.
+func (c *collectorMetricsQuerier) QueryGPUDRAMActiveMax(start, end time.Time) *source.Future[source.GPUSaturationResult] {
+	return queryCollector(c, start, end, metric.GPUDRAMActiveMaxID, source.DecodeGPUSaturationResult)
+}
+
+// QueryGPUSMActiveAvg reports the average ratio of cycles at least one warp was
+// resident on any SM (metric.GPUSMActiveAvgID). Requires DCP profiling and explicit enablement.
+func (c *collectorMetricsQuerier) QueryGPUSMActiveAvg(start, end time.Time) *source.Future[source.GPUSaturationResult] {
+	return queryCollector(c, start, end, metric.GPUSMActiveAvgID, source.DecodeGPUSaturationResult)
+}
+
+// QueryGPUSMOccupancyAvg reports the average ratio of resident warps to the SM
+// maximum (metric.GPUSMOccupancyAvgID). Requires DCP profiling and explicit enablement.
+func (c *collectorMetricsQuerier) QueryGPUSMOccupancyAvg(start, end time.Time) *source.Future[source.GPUSaturationResult] {
+	return queryCollector(c, start, end, metric.GPUSMOccupancyAvgID, source.DecodeGPUSaturationResult)
+}
+
+// QueryGPUPCIeTxBytesAvg reports average PCIe transmit throughput in
+// bytes/sec (metric.GPUPCIeTxBytesAvgID). Requires DCP profiling.
+func (c *collectorMetricsQuerier) QueryGPUPCIeTxBytesAvg(start, end time.Time) *source.Future[source.GPUSaturationResult] {
+	return queryCollector(c, start, end, metric.GPUPCIeTxBytesAvgID, source.DecodeGPUSaturationResult)
+}
+
+// QueryGPUPCIeRxBytesAvg reports average PCIe receive throughput in
+// bytes/sec (metric.GPUPCIeRxBytesAvgID). Requires DCP profiling.
+func (c *collectorMetricsQuerier) QueryGPUPCIeRxBytesAvg(start, end time.Time) *source.Future[source.GPUSaturationResult] {
+	return queryCollector(c, start, end, metric.GPUPCIeRxBytesAvgID, source.DecodeGPUSaturationResult)
+}
+
+// QueryGPUNVLinkTxBytesAvg reports average NVLink transmit throughput in bytes/sec
+// (metric.GPUNVLinkTxBytesAvgID). Requires DCP profiling and explicit enablement.
+func (c *collectorMetricsQuerier) QueryGPUNVLinkTxBytesAvg(start, end time.Time) *source.Future[source.GPUSaturationResult] {
+	return queryCollector(c, start, end, metric.GPUNVLinkTxBytesAvgID, source.DecodeGPUSaturationResult)
+}
+
+// QueryGPUNVLinkRxBytesAvg reports average NVLink receive throughput in bytes/sec
+// (metric.GPUNVLinkRxBytesAvgID). Requires DCP profiling and explicit enablement.
+func (c *collectorMetricsQuerier) QueryGPUNVLinkRxBytesAvg(start, end time.Time) *source.Future[source.GPUSaturationResult] {
+	return queryCollector(c, start, end, metric.GPUNVLinkRxBytesAvgID, source.DecodeGPUSaturationResult)
+}
+
+// Device-level GPU metric queries (DeviceInfo / DevicePerformance support).
+
+// QueryGPUDevicePowerAvg reports average device power draw in watts (metric.GPUDevicePowerAvgID).
+func (c *collectorMetricsQuerier) QueryGPUDevicePowerAvg(start, end time.Time) *source.Future[source.GPUDeviceMetricResult] {
+	return queryCollector(c, start, end, metric.GPUDevicePowerAvgID, source.DecodeGPUDeviceMetricResult)
+}
+
+// QueryGPUDeviceTempAvg reports average device temperature in Celsius (metric.GPUDeviceTempAvgID).
+func (c *collectorMetricsQuerier) QueryGPUDeviceTempAvg(start, end time.Time) *source.Future[source.GPUDeviceMetricResult] {
+	return queryCollector(c, start, end, metric.GPUDeviceTempAvgID, source.DecodeGPUDeviceMetricResult)
+}
+
+// QueryGPUDeviceUsageAvg reports average device-level compute utilization
+// as a 0-1 ratio (collector ID metric.GPUDeviceUsageAvgID).
+func (c *collectorMetricsQuerier) QueryGPUDeviceUsageAvg(start, end time.Time) *source.Future[source.GPUDeviceMetricResult] {
+	return queryCollector(c, start, end, metric.GPUDeviceUsageAvgID, source.DecodeGPUDeviceMetricResult)
+}
+
+// QueryGPUDeviceUsageMax reports peak device-level compute utilization as a
+// 0-1 ratio (collector ID metric.GPUDeviceUsageMaxID).
+func (c *collectorMetricsQuerier) QueryGPUDeviceUsageMax(start, end time.Time) *source.Future[source.GPUDeviceMetricResult] {
+	return queryCollector(c, start, end, metric.GPUDeviceUsageMaxID, source.DecodeGPUDeviceMetricResult)
+}
+
+// QueryGPUDeviceMemoryUsedAvg reports average framebuffer used in MiB (metric.GPUDeviceMemoryUsedAvgID).
+func (c *collectorMetricsQuerier) QueryGPUDeviceMemoryUsedAvg(start, end time.Time) *source.Future[source.GPUDeviceMetricResult] {
+	return queryCollector(c, start, end, metric.GPUDeviceMemoryUsedAvgID, source.DecodeGPUDeviceMetricResult)
+}
+
+// QueryGPUDeviceMemoryUsedMax reports peak framebuffer used in MiB (metric.GPUDeviceMemoryUsedMaxID).
+func (c *collectorMetricsQuerier) QueryGPUDeviceMemoryUsedMax(start, end time.Time) *source.Future[source.GPUDeviceMetricResult] {
+	return queryCollector(c, start, end, metric.GPUDeviceMemoryUsedMaxID, source.DecodeGPUDeviceMetricResult)
+}

+ 235 - 0
modules/collector-source/pkg/collector/gpusaturationquerier_test.go

@@ -0,0 +1,235 @@
+package collector
+
+import (
+	"math"
+	"testing"
+	"time"
+
+	"github.com/opencost/opencost/core/pkg/opencost"
+	"github.com/opencost/opencost/core/pkg/source"
+	"github.com/opencost/opencost/modules/collector-source/pkg/metric"
+)
+
+// gpuSaturationMockProvider builds a store with one hour of DCGM saturation
+// samples (one at each end of the window) for a single GPU container and
+// returns it together with the window's start and end. SM_ACTIVE,
+// SM_OCCUPANCY, PCIe rx, and NVLink metrics are deliberately never updated
+// to exercise absent-signal behavior. The memory saturation threshold is
+// pinned to 0.6 for the duration of the calling test. The test fails
+// immediately if either fixture timestamp cannot be parsed, rather than
+// silently continuing with zero-valued times.
+func gpuSaturationMockProvider(t *testing.T) (StoreProvider, time.Time, time.Time) {
+	t.Helper()
+	t.Setenv("GPU_MEMORY_SATURATION_THRESHOLD", "0.6")
+
+	start, err := time.Parse(time.RFC3339, Start1Str)
+	if err != nil {
+		t.Fatalf("parsing Start1Str %q: %v", Start1Str, err)
+	}
+	end, err := time.Parse(time.RFC3339, End1Str)
+	if err != nil {
+		t.Fatalf("parsing End1Str %q: %v", End1Str, err)
+	}
+
+	gpuInfo := map[string]string{
+		source.NamespaceLabel:   "namespace1",
+		source.PodLabel:         "pod1",
+		source.PodUIDLabel:      "pod-uuid1",
+		source.ContainerLabel:   "container1",
+		source.DeviceLabel:      "nvidia0",
+		source.ModelNameLabel:   "Tesla T4",
+		source.UUIDLabel:        "GPU-1",
+		source.MIGProfileLabel:  "",
+		source.MIGInstanceLabel: "",
+	}
+
+	store := NewOpenCostMetricStore()
+
+	// power violation counter: 1.8e9us accumulated over a 3.6e9us window
+	store.Update(metric.DCGMFIDEVPOWERVIOLATION, gpuInfo, 0, start, nil)
+	store.Update(metric.DCGMFIDEVPOWERVIOLATION, gpuInfo, 1.8e9, end, nil)
+
+	// throttle bitmask (legacy field name): sw_power_cap set in 1 of 2 samples
+	store.Update(metric.DCGMFIDEVCLOCKTHROTTLEREASONS, gpuInfo, 0x4, start, nil)
+	store.Update(metric.DCGMFIDEVCLOCKTHROTTLEREASONS, gpuInfo, 0x0, end, nil)
+
+	// framebuffer occupancy ratio, as synthesized from FB_USED/FB_FREE per
+	// scrape by GPUMemoryUsedRatioSynthesizer (see synthetic package tests
+	// for the join itself): avg 0.625, max 0.75, half of samples >= 0.6
+	store.Update(metric.OpencostGPUMemoryUsedRatio, gpuInfo, 0.5, start, nil)
+	store.Update(metric.OpencostGPUMemoryUsedRatio, gpuInfo, 0.75, end, nil)
+
+	// one XID error transition
+	store.Update(metric.DCGMFIDEVXIDERRORS, gpuInfo, 0, start, nil)
+	store.Update(metric.DCGMFIDEVXIDERRORS, gpuInfo, 13, end, nil)
+
+	// DRAM activity gauge
+	store.Update(metric.DCGMFIPROFDRAMACTIVE, gpuInfo, 0.5, start, nil)
+	store.Update(metric.DCGMFIPROFDRAMACTIVE, gpuInfo, 0.7, end, nil)
+
+	// PCIe tx counter: 3.6e12 bytes over 3600s = 1e9 bytes/sec
+	store.Update(metric.DCGMFIPROFPCIETXBYTES, gpuInfo, 0, start, nil)
+	store.Update(metric.DCGMFIPROFPCIETXBYTES, gpuInfo, 3.6e12, end, nil)
+
+	return &MockStoreProvider{metricsCollector: store}, start, end
+}
+
+// awaitGPUSaturation blocks on f and hands back its results, failing the
+// test immediately if the future resolved with an error.
+func awaitGPUSaturation(t *testing.T, f *source.Future[source.GPUSaturationResult]) []*source.GPUSaturationResult {
+	t.Helper()
+
+	results, awaitErr := f.Await()
+	if awaitErr == nil {
+		return results
+	}
+	t.Fatalf("unexpected error: %v", awaitErr)
+	return nil
+}
+
+// requireValue asserts that results holds exactly one series, that the
+// series' first data point equals want (within 1e-9), and that the series
+// still carries the mock GPU's UUID and container labels. A wrong series
+// count or a series with no data points fails the test immediately instead
+// of panicking on an out-of-range index.
+func requireValue(t *testing.T, results []*source.GPUSaturationResult, want float64) {
+	t.Helper()
+	if len(results) != 1 {
+		t.Fatalf("expected 1 result, got %d", len(results))
+	}
+	res := results[0]
+	if len(res.Data) == 0 {
+		t.Fatalf("result has no data points: %+v", res)
+	}
+	got := res.Data[0].Value
+	if math.Abs(got-want) > 1e-9 {
+		t.Errorf("value = %v, want %v", got, want)
+	}
+	if res.UUID != "GPU-1" || res.Container != "container1" {
+		t.Errorf("result lost GPU identity labels: %+v", res)
+	}
+}
+
+// TestCollectorMetricsQuerier_GPUThrottleViolationRatio checks that the
+// violation ratio query yields a single series for the power violation
+// counter, the only violation counter present in the mock store.
+func TestCollectorMetricsQuerier_GPUThrottleViolationRatio(t *testing.T) {
+	provider, start, end := gpuSaturationMockProvider(t)
+	querier := &collectorMetricsQuerier{collectorProvider: provider}
+
+	results := awaitGPUSaturation(t, querier.QueryGPUThrottleViolationRatio(start, end))
+
+	// 1.8e9us of violation accumulated over the 3.6e9us window
+	requireValue(t, results, 0.5)
+
+	if got, want := results[0].Reason, opencost.GPUThrottleViolationPower; got != want {
+		t.Errorf("Reason = %q, want %q", got, want)
+	}
+}
+
+func TestCollectorMetricsQuerier_GPUThrottleReasonRatio(t *testing.T) {
+	provider, start, end := gpuSaturationMockProvider(t)
+	c := collectorMetricsQuerier{collectorProvider: provider}
+
+	results := awaitGPUSaturation(t, c.QueryGPUThrottleReasonRatio(start, end))
+
+	// the legacy bitmask field was scraped, so every reason bit reports
+	got := map[string]float64{}
+	for _, res := range results {
+		got[res.Reason] = res.Data[0].Value
+	}
+	if len(got) != len(opencost.GPUThrottleReasons) {
+		t.Fatalf("expected %d reasons, got %d: %v", len(opencost.GPUThrottleReasons), len(got), got)
+	}
+	for _, reason := range opencost.GPUThrottleReasons {
+		want := 0.0
+		if reason.Name == opencost.GPUThrottleReasonSwPowerCap {
+			want = 0.5
+		}
+		if math.Abs(got[reason.Name]-want) > 1e-9 {
+			t.Errorf("reason %q ratio = %v, want %v", reason.Name, got[reason.Name], want)
+		}
+	}
+}
+
+// TestCollectorMetricsQuerier_GPUMemoryUsedRatio checks the average and the
+// maximum of the two per-sample occupancy ratios (0.5 and 0.75).
+func TestCollectorMetricsQuerier_GPUMemoryUsedRatio(t *testing.T) {
+	provider, start, end := gpuSaturationMockProvider(t)
+	querier := &collectorMetricsQuerier{collectorProvider: provider}
+
+	avg := awaitGPUSaturation(t, querier.QueryGPUMemoryUsedRatioAvg(start, end))
+	requireValue(t, avg, 0.625)
+
+	peak := awaitGPUSaturation(t, querier.QueryGPUMemoryUsedRatioMax(start, end))
+	requireValue(t, peak, 0.75)
+}
+
+// TestCollectorMetricsQuerier_GPUMemoryPressureRatio checks the fraction of
+// samples at or above the 0.6 threshold configured by the mock provider:
+// of the two samples (0.5, 0.75) only the second qualifies.
+func TestCollectorMetricsQuerier_GPUMemoryPressureRatio(t *testing.T) {
+	provider, start, end := gpuSaturationMockProvider(t)
+	querier := &collectorMetricsQuerier{collectorProvider: provider}
+
+	pressure := awaitGPUSaturation(t, querier.QueryGPUMemoryPressureRatio(start, end))
+	requireValue(t, pressure, 0.5)
+}
+
+// TestCollectorMetricsQuerier_GPUXIDErrorCount checks that the single XID
+// value transition in the mock store (0 -> 13) is counted once.
+func TestCollectorMetricsQuerier_GPUXIDErrorCount(t *testing.T) {
+	provider, start, end := gpuSaturationMockProvider(t)
+	querier := &collectorMetricsQuerier{collectorProvider: provider}
+
+	xid := awaitGPUSaturation(t, querier.QueryGPUXIDErrorCount(start, end))
+	requireValue(t, xid, 1)
+}
+
+// TestCollectorMetricsQuerier_GPUDRAMActive checks the average and maximum
+// of the two DRAM activity samples (0.5 and 0.7).
+func TestCollectorMetricsQuerier_GPUDRAMActive(t *testing.T) {
+	provider, start, end := gpuSaturationMockProvider(t)
+	querier := &collectorMetricsQuerier{collectorProvider: provider}
+
+	avg := awaitGPUSaturation(t, querier.QueryGPUDRAMActiveAvg(start, end))
+	requireValue(t, avg, 0.6)
+
+	peak := awaitGPUSaturation(t, querier.QueryGPUDRAMActiveMax(start, end))
+	requireValue(t, peak, 0.7)
+}
+
+// TestCollectorMetricsQuerier_GPUPCIeTxBytesAvg checks the PCIe transmit
+// rate: the mock counter grows by 3.6e12 bytes over 3600s, i.e. 1e9 bytes/sec.
+func TestCollectorMetricsQuerier_GPUPCIeTxBytesAvg(t *testing.T) {
+	provider, start, end := gpuSaturationMockProvider(t)
+	querier := &collectorMetricsQuerier{collectorProvider: provider}
+
+	rate := awaitGPUSaturation(t, querier.QueryGPUPCIeTxBytesAvg(start, end))
+	requireValue(t, rate, 1e9)
+}
+
+// TestCollectorMetricsQuerier_GPUDeviceMetrics verifies the device-level
+// queries aggregate from the device-labeled DCGM series. Each query must
+// yield exactly one series whose first data point matches the expected
+// aggregate and which still carries the device UUID.
+func TestCollectorMetricsQuerier_GPUDeviceMetrics(t *testing.T) {
+	start, err := time.Parse(time.RFC3339, Start1Str)
+	if err != nil {
+		t.Fatalf("parsing Start1Str %q: %v", Start1Str, err)
+	}
+	end, err := time.Parse(time.RFC3339, End1Str)
+	if err != nil {
+		t.Fatalf("parsing End1Str %q: %v", End1Str, err)
+	}
+
+	deviceInfo := map[string]string{
+		source.DeviceLabel:      "nvidia0",
+		source.ModelNameLabel:   "Tesla T4",
+		source.UUIDLabel:        "GPU-1",
+		source.MIGProfileLabel:  "",
+		source.MIGInstanceLabel: "",
+	}
+	store := NewOpenCostMetricStore()
+	store.Update(metric.DCGMFIDEVPOWERUSAGE, deviceInfo, 120, start, nil)
+	store.Update(metric.DCGMFIDEVPOWERUSAGE, deviceInfo, 160, end, nil)
+	// temperature is sampled once, so its average is that single value
+	store.Update(metric.DCGMFIDEVGPUTEMP, deviceInfo, 55, start, nil)
+	store.Update(metric.DCGMFIPROFGRENGINEACTIVE, deviceInfo, 0.4, start, nil)
+	store.Update(metric.DCGMFIPROFGRENGINEACTIVE, deviceInfo, 0.9, end, nil)
+	store.Update(metric.DCGMFIDEVFBUSED, deviceInfo, 1024, start, nil)
+	store.Update(metric.DCGMFIDEVFBUSED, deviceInfo, 2048, end, nil)
+
+	c := collectorMetricsQuerier{collectorProvider: &MockStoreProvider{metricsCollector: store}}
+
+	checks := map[string]struct {
+		future *source.Future[source.GPUDeviceMetricResult]
+		want   float64
+	}{
+		"power avg":  {c.QueryGPUDevicePowerAvg(start, end), 140},
+		"temp avg":   {c.QueryGPUDeviceTempAvg(start, end), 55},
+		"usage avg":  {c.QueryGPUDeviceUsageAvg(start, end), 0.65},
+		"usage max":  {c.QueryGPUDeviceUsageMax(start, end), 0.9},
+		"memory avg": {c.QueryGPUDeviceMemoryUsedAvg(start, end), 1536},
+		"memory max": {c.QueryGPUDeviceMemoryUsedMax(start, end), 2048},
+	}
+	for name, check := range checks {
+		// Await the device-metric future directly: awaitGPUSaturation is
+		// typed for GPUSaturationResult futures, not GPUDeviceMetricResult.
+		results, err := check.future.Await()
+		if err != nil {
+			t.Fatalf("%s: unexpected error: %v", name, err)
+		}
+		if len(results) != 1 {
+			t.Fatalf("%s: expected 1 result, got %d", name, len(results))
+		}
+		if len(results[0].Data) == 0 {
+			t.Fatalf("%s: result has no data points: %+v", name, results[0])
+		}
+		if got := results[0].Data[0].Value; math.Abs(got-check.want) > 1e-9 {
+			t.Errorf("%s = %v, want %v", name, got, check.want)
+		}
+		if results[0].UUID != "GPU-1" {
+			t.Errorf("%s: lost device identity: %+v", name, results[0])
+		}
+	}
+}
+
+// TestCollectorMetricsQuerier_GPUSaturationAbsentSignals verifies that a
+// query over a DCGM field the mock store never received resolves to an
+// empty result set rather than a zero-valued series.
+func TestCollectorMetricsQuerier_GPUSaturationAbsentSignals(t *testing.T) {
+	provider, start, end := gpuSaturationMockProvider(t)
+	querier := &collectorMetricsQuerier{collectorProvider: provider}
+
+	unscraped := map[string]*source.Future[source.GPUSaturationResult]{
+		"SMActiveAvg":      querier.QueryGPUSMActiveAvg(start, end),
+		"SMOccupancyAvg":   querier.QueryGPUSMOccupancyAvg(start, end),
+		"PCIeRxBytesAvg":   querier.QueryGPUPCIeRxBytesAvg(start, end),
+		"NVLinkTxBytesAvg": querier.QueryGPUNVLinkTxBytesAvg(start, end),
+		"NVLinkRxBytesAvg": querier.QueryGPUNVLinkRxBytesAvg(start, end),
+	}
+	for signal, pending := range unscraped {
+		got := awaitGPUSaturation(t, pending)
+		if n := len(got); n != 0 {
+			t.Errorf("%s: expected no results for unscraped metric, got %d", signal, n)
+		}
+	}
+}

+ 68 - 0
modules/collector-source/pkg/metric/aggregator/abovethresholdratio.go

@@ -0,0 +1,68 @@
+package aggregator
+
+import (
+	"math"
+	"sync"
+	"time"
+)
+
+// aboveThresholdRatioAggregator is a MetricAggregator reporting which
+// fraction of its samples were at or above a fixed threshold, where updates
+// sharing the timestamp of the previous update are folded into one sample.
+// It backs time-over-threshold pressure signals such as GPU memory pressure.
+type aboveThresholdRatioAggregator struct {
+	// lock serializes Update and Value
+	lock sync.Mutex
+
+	// fixed at construction
+	labelValues []string
+	threshold   float64
+
+	// count is the number of samples seen; aboveCount is how many of them
+	// were at or above threshold
+	count      int
+	aboveCount int
+
+	// currentTime is the timestamp of the sample being accumulated (nil
+	// before the first update); currentHit records whether that sample has
+	// already been counted in aboveCount
+	currentTime *time.Time
+	currentHit  bool
+}
+
+// AboveThresholdRatio builds a MetricAggregatorFactory whose aggregators
+// report the fraction of samples with a value >= threshold.
+func AboveThresholdRatio(threshold float64) MetricAggregatorFactory {
+	newAggregator := func(labelValues []string) MetricAggregator {
+		agg := new(aboveThresholdRatioAggregator)
+		agg.labelValues = labelValues
+		agg.threshold = threshold
+		return agg
+	}
+	return newAggregator
+}
+
+func (a *aboveThresholdRatioAggregator) AdditionInfo() map[string]string {
+	return nil // this aggregator keeps no additional info
+}
+
+func (a *aboveThresholdRatioAggregator) LabelValues() []string {
+	return a.labelValues // the label values the factory was invoked with
+}
+
+// Update folds one observation into the ratio. An update whose timestamp
+// differs from the previous update's opens a new sample; further updates at
+// that same timestamp belong to the same sample, which counts as above the
+// threshold if any of them is. NaN values never count as above.
+func (a *aboveThresholdRatioAggregator) Update(value float64, timestamp time.Time, additionalInfo map[string]string) {
+	a.lock.Lock()
+	defer a.lock.Unlock()
+
+	// a timestamp change (or the very first update) starts a new sample
+	if startsSample := a.currentTime == nil || !timestamp.Equal(*a.currentTime); startsSample {
+		a.count++
+		a.currentHit = false
+		a.currentTime = &timestamp
+	}
+
+	// the current sample has already been counted as above
+	if a.currentHit {
+		return
+	}
+
+	atOrAbove := !math.IsNaN(value) && value >= a.threshold
+	if !atOrAbove {
+		return
+	}
+	a.currentHit = true
+	a.aboveCount++
+}
+
+func (a *aboveThresholdRatioAggregator) Value() []MetricValue {
+	a.lock.Lock()
+	defer a.lock.Unlock()
+	if a.count == 0 {
+		return []MetricValue{{Value: 0}}
+	}
+	return []MetricValue{
+		{Value: float64(a.aboveCount) / float64(a.count)},
+	}
+}

+ 83 - 0
modules/collector-source/pkg/metric/aggregator/abovethresholdratio_test.go

@@ -0,0 +1,83 @@
+package aggregator
+
+import (
+	"math"
+	"reflect"
+	"testing"
+	"time"
+)
+
+// TestAboveThresholdRatioAggregator_Value feeds each case's samples to a
+// fresh aggregator and compares Value() with the expected ratio.
+func TestAboveThresholdRatioAggregator_Value(t *testing.T) {
+	// at returns the sample instant at the given minute of a fixed hour
+	at := func(minute int) time.Time {
+		return time.Date(1, 1, 1, 0, minute, 0, 0, time.UTC)
+	}
+
+	type sample struct {
+		value     float64
+		timestamp time.Time
+	}
+	cases := []struct {
+		name      string
+		threshold float64
+		samples   []sample
+		want      []MetricValue
+	}{
+		{
+			name:      "no update",
+			threshold: 0.9,
+			samples:   []sample{},
+			want:      []MetricValue{{Value: 0}},
+		},
+		{
+			name:      "all above",
+			threshold: 0.9,
+			samples: []sample{
+				{value: 0.95, timestamp: at(0)},
+				{value: 0.99, timestamp: at(1)},
+			},
+			want: []MetricValue{{Value: 1}},
+		},
+		{
+			name:      "threshold is inclusive",
+			threshold: 0.9,
+			samples:   []sample{{value: 0.9, timestamp: at(0)}},
+			want:      []MetricValue{{Value: 1}},
+		},
+		{
+			name:      "quarter above",
+			threshold: 0.9,
+			samples: []sample{
+				{value: 0.95, timestamp: at(0)},
+				{value: 0.5, timestamp: at(1)},
+				{value: 0.89, timestamp: at(2)},
+				{value: 0.1, timestamp: at(3)},
+			},
+			want: []MetricValue{{Value: 0.25}},
+		},
+		{
+			name:      "duplicate timestamp counts once",
+			threshold: 0.9,
+			samples: []sample{
+				{value: 0.95, timestamp: at(0)},
+				{value: 0.1, timestamp: at(0)},
+				{value: 0.1, timestamp: at(1)},
+			},
+			want: []MetricValue{{Value: 0.5}},
+		},
+		{
+			name:      "NaN counts as below",
+			threshold: 0.9,
+			samples: []sample{
+				{value: math.NaN(), timestamp: at(0)},
+				{value: 0.95, timestamp: at(1)},
+			},
+			want: []MetricValue{{Value: 0.5}},
+		},
+	}
+
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			agg := AboveThresholdRatio(tc.threshold)([]string{})
+			for _, s := range tc.samples {
+				agg.Update(s.value, s.timestamp, nil)
+			}
+			got := agg.Value()
+			if !reflect.DeepEqual(got, tc.want) {
+				t.Errorf("Value() = %v, want %v", got, tc.want)
+			}
+		})
+	}
+}

+ 71 - 0
modules/collector-source/pkg/metric/aggregator/bitsetratio.go

@@ -0,0 +1,71 @@
+package aggregator
+
+import (
+	"math"
+	"sync"
+	"time"
+)
+
+// bitSetRatioAggregator is a MetricAggregator that interprets each sample
+// value as an integer bitmask and reports the fraction of samples in which
+// a chosen bit was set, where updates sharing the timestamp of the previous
+// update are folded into one sample. It is used to derive how much of a
+// window a GPU spent throttled for a specific reason from the DCGM clock
+// throttle reasons bitmask.
+type bitSetRatioAggregator struct {
+	// lock serializes Update and Value
+	lock sync.Mutex
+
+	// fixed at construction; bit is the mask tested against each sample
+	labelValues []string
+	bit         uint64
+
+	// count is the number of samples seen; setCount is how many of them
+	// had the bit set
+	count    int
+	setCount int
+
+	// currentTime is the timestamp of the sample being accumulated (nil
+	// before the first update); currentSet records whether that sample has
+	// already been counted in setCount
+	currentTime *time.Time
+	currentSet  bool
+}
+
+// BitSetRatio builds a MetricAggregatorFactory whose aggregators report the
+// fraction of samples that had the given bit set.
+func BitSetRatio(bit uint64) MetricAggregatorFactory {
+	newAggregator := func(labelValues []string) MetricAggregator {
+		agg := new(bitSetRatioAggregator)
+		agg.labelValues = labelValues
+		agg.bit = bit
+		return agg
+	}
+	return newAggregator
+}
+
+func (a *bitSetRatioAggregator) AdditionInfo() map[string]string {
+	return nil // this aggregator keeps no additional info
+}
+
+func (a *bitSetRatioAggregator) LabelValues() []string {
+	return a.labelValues // the label values the factory was invoked with
+}
+
+func (a *bitSetRatioAggregator) Update(value float64, timestamp time.Time, additionalInfo map[string]string) {
+	a.lock.Lock()
+	defer a.lock.Unlock()
+
+	// NaN or negative values cannot be valid bitmasks; count the sample as
+	// bit-clear rather than guessing
+	set := !math.IsNaN(value) && value >= 0 && uint64(value)&a.bit != 0
+
+	if a.currentTime == nil || !timestamp.Equal(*a.currentTime) {
+		a.currentTime = &timestamp
+		a.currentSet = false
+		a.count++
+	}
+	if set && !a.currentSet {
+		a.currentSet = true
+		a.setCount++
+	}
+}
+
+func (a *bitSetRatioAggregator) Value() []MetricValue {
+	a.lock.Lock()
+	defer a.lock.Unlock()
+	if a.count == 0 {
+		return []MetricValue{{Value: 0}}
+	}
+	return []MetricValue{
+		{Value: float64(a.setCount) / float64(a.count)},
+	}
+}

+ 88 - 0
modules/collector-source/pkg/metric/aggregator/bitsetratio_test.go

@@ -0,0 +1,88 @@
+package aggregator
+
+import (
+	"math"
+	"reflect"
+	"testing"
+	"time"
+)
+
+// TestBitSetRatioAggregator_Value feeds each case's samples to a fresh
+// aggregator and compares Value() with the expected ratio.
+func TestBitSetRatioAggregator_Value(t *testing.T) {
+	// at returns the sample instant at the given minute of a fixed hour
+	at := func(minute int) time.Time {
+		return time.Date(1, 1, 1, 0, minute, 0, 0, time.UTC)
+	}
+
+	type sample struct {
+		value     float64
+		timestamp time.Time
+	}
+	cases := []struct {
+		name    string
+		bit     uint64
+		samples []sample
+		want    []MetricValue
+	}{
+		{
+			name:    "no update",
+			bit:     0x8,
+			samples: []sample{},
+			want:    []MetricValue{{Value: 0}},
+		},
+		{
+			name:    "single sample bit set",
+			bit:     0x8,
+			samples: []sample{{value: 8, timestamp: at(0)}},
+			want:    []MetricValue{{Value: 1}},
+		},
+		{
+			name:    "single sample bit clear",
+			bit:     0x8,
+			samples: []sample{{value: 4, timestamp: at(0)}},
+			want:    []MetricValue{{Value: 0}},
+		},
+		{
+			name: "half of samples set",
+			bit:  0x4,
+			samples: []sample{
+				{value: 0x4, timestamp: at(0)},
+				{value: 0x0, timestamp: at(1)},
+				{value: 0x4 | 0x8, timestamp: at(2)},
+				{value: 0x8, timestamp: at(3)},
+			},
+			want: []MetricValue{{Value: 0.5}},
+		},
+		{
+			name: "other bits do not count",
+			bit:  0x40,
+			samples: []sample{
+				{value: 0x1 | 0x2 | 0x4 | 0x8 | 0x10 | 0x20 | 0x80, timestamp: at(0)},
+			},
+			want: []MetricValue{{Value: 0}},
+		},
+		{
+			name: "duplicate timestamp counts once",
+			bit:  0x4,
+			samples: []sample{
+				{value: 0x4, timestamp: at(0)},
+				{value: 0x0, timestamp: at(0)},
+				{value: 0x0, timestamp: at(1)},
+			},
+			want: []MetricValue{{Value: 0.5}},
+		},
+		{
+			name: "invalid values are treated as bit clear",
+			bit:  0x4,
+			samples: []sample{
+				{value: math.NaN(), timestamp: at(0)},
+				{value: -4, timestamp: at(1)},
+				{value: 0x4, timestamp: at(2)},
+			},
+			want: []MetricValue{{Value: 1.0 / 3.0}},
+		},
+	}
+
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			agg := BitSetRatio(tc.bit)([]string{})
+			for _, s := range tc.samples {
+				agg.Update(s.value, s.timestamp, nil)
+			}
+			got := agg.Value()
+			if !reflect.DeepEqual(got, tc.want) {
+				t.Errorf("Value() = %v, want %v", got, tc.want)
+			}
+		})
+	}
+}

+ 56 - 0
modules/collector-source/pkg/metric/aggregator/changes.go

@@ -0,0 +1,56 @@
+package aggregator
+
+import (
+	"sync"
+	"time"
+)
+
+// changesAggregator is a MetricAggregator counting value transitions
+// between consecutive samples, in the manner of PromQL's changes(). It
+// expects updates in timestamp order: an update whose timestamp is not
+// strictly after the last accepted one is dropped.
+type changesAggregator struct {
+	// lock serializes Update and Value
+	lock sync.Mutex
+
+	// fixed at construction
+	labelValues []string
+
+	// initialized is false until the first update is accepted; lastValue
+	// and lastTime describe the most recently accepted update
+	initialized bool
+	lastValue   float64
+	lastTime    time.Time
+
+	// changes is the running count of value transitions
+	changes float64
+}
+
+// Changes creates a MetricAggregator for the given label values that counts
+// how many times the sample value changed between consecutive samples.
+func Changes(labelValues []string) MetricAggregator {
+	agg := new(changesAggregator)
+	agg.labelValues = labelValues
+	return agg
+}
+
+func (a *changesAggregator) AdditionInfo() map[string]string {
+	return nil // this aggregator keeps no additional info
+}
+
+func (a *changesAggregator) LabelValues() []string {
+	return a.labelValues // the label values passed to Changes
+}
+
+// Update records one sample. The first sample only establishes the
+// baseline. Afterwards, a sample whose timestamp is not strictly after the
+// last accepted one is ignored; otherwise the change counter is incremented
+// when the value differs from the previous one. Two consecutive NaN samples
+// are treated as unchanged, so a run of NaNs is not counted as a stream of
+// changes (NaN never compares equal to itself).
+func (a *changesAggregator) Update(value float64, timestamp time.Time, additionalInfo map[string]string) {
+	a.lock.Lock()
+	defer a.lock.Unlock()
+
+	if !a.initialized {
+		a.initialized = true
+		a.lastValue = value
+		a.lastTime = timestamp
+		return
+	}
+	if !timestamp.After(a.lastTime) {
+		return
+	}
+
+	// x != x holds only for NaN; this is the same test math.IsNaN performs
+	bothNaN := value != value && a.lastValue != a.lastValue
+	if value != a.lastValue && !bothNaN {
+		a.changes++
+	}
+	a.lastValue = value
+	a.lastTime = timestamp
+}
+
+// Value returns a single MetricValue holding the number of value
+// transitions counted so far.
+func (a *changesAggregator) Value() []MetricValue {
+	a.lock.Lock()
+	defer a.lock.Unlock()
+
+	total := a.changes
+	return []MetricValue{{Value: total}}
+}

+ 75 - 0
modules/collector-source/pkg/metric/aggregator/changes_test.go

@@ -0,0 +1,75 @@
+package aggregator
+
+import (
+	"reflect"
+	"testing"
+	"time"
+)
+
+// TestChangesAggregator_Value feeds each case's samples to a fresh
+// aggregator and compares Value() with the expected transition count.
+func TestChangesAggregator_Value(t *testing.T) {
+	// at returns the sample instant at the given minute of a fixed hour
+	at := func(minute int) time.Time {
+		return time.Date(1, 1, 1, 0, minute, 0, 0, time.UTC)
+	}
+
+	type sample struct {
+		value     float64
+		timestamp time.Time
+	}
+	cases := []struct {
+		name    string
+		samples []sample
+		want    []MetricValue
+	}{
+		{
+			name:    "no update",
+			samples: []sample{},
+			want:    []MetricValue{{Value: 0}},
+		},
+		{
+			name:    "single sample is zero changes",
+			samples: []sample{{value: 13, timestamp: at(0)}},
+			want:    []MetricValue{{Value: 0}},
+		},
+		{
+			name: "constant value is zero changes",
+			samples: []sample{
+				{value: 0, timestamp: at(0)},
+				{value: 0, timestamp: at(1)},
+				{value: 0, timestamp: at(2)},
+			},
+			want: []MetricValue{{Value: 0}},
+		},
+		{
+			name: "each transition counts",
+			samples: []sample{
+				{value: 0, timestamp: at(0)},
+				{value: 13, timestamp: at(1)},
+				{value: 13, timestamp: at(2)},
+				{value: 31, timestamp: at(3)},
+			},
+			want: []MetricValue{{Value: 2}},
+		},
+		{
+			name: "out of order updates are ignored",
+			samples: []sample{
+				{value: 0, timestamp: at(1)},
+				{value: 13, timestamp: at(0)},
+			},
+			want: []MetricValue{{Value: 0}},
+		},
+		{
+			name: "duplicate timestamp is ignored",
+			samples: []sample{
+				{value: 0, timestamp: at(0)},
+				{value: 13, timestamp: at(0)},
+			},
+			want: []MetricValue{{Value: 0}},
+		},
+	}
+
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			agg := Changes([]string{})
+			for _, s := range tc.samples {
+				agg.Update(s.value, s.timestamp, nil)
+			}
+			got := agg.Value()
+			if !reflect.DeepEqual(got, tc.want) {
+				t.Errorf("Value() = %v, want %v", got, tc.want)
+			}
+		})
+	}
+}

+ 30 - 0
modules/collector-source/pkg/metric/collector.go

@@ -75,6 +75,28 @@ const (
 	GPUsAllocatedID                            MetricCollectorID = "GPUsAllocated"
 	IsGPUSharedID                              MetricCollectorID = "IsGPUShared"
 	GPUInfoID                                  MetricCollectorID = "GPUInfo"
+	GPUThrottleViolationPowerID                MetricCollectorID = "GPUThrottleViolationPower"
+	GPUThrottleViolationThermalID              MetricCollectorID = "GPUThrottleViolationThermal"
+	GPUThrottleViolationSyncBoostID            MetricCollectorID = "GPUThrottleViolationSyncBoost"
+	GPUThrottleViolationBoardLimitID           MetricCollectorID = "GPUThrottleViolationBoardLimit"
+	GPUMemoryUsedAvgID                         MetricCollectorID = "GPUMemoryUsedAvg"
+	GPUMemoryUsedMaxID                         MetricCollectorID = "GPUMemoryUsedMax"
+	GPUMemoryPressureRatioID                   MetricCollectorID = "GPUMemoryPressureRatio"
+	GPUXIDErrorCountID                         MetricCollectorID = "GPUXIDErrorCount"
+	GPUDRAMActiveAvgID                         MetricCollectorID = "GPUDRAMActiveAvg"
+	GPUDRAMActiveMaxID                         MetricCollectorID = "GPUDRAMActiveMax"
+	GPUSMActiveAvgID                           MetricCollectorID = "GPUSMActiveAvg"
+	GPUSMOccupancyAvgID                        MetricCollectorID = "GPUSMOccupancyAvg"
+	GPUPCIeTxBytesAvgID                        MetricCollectorID = "GPUPCIeTxBytesAvg"
+	GPUPCIeRxBytesAvgID                        MetricCollectorID = "GPUPCIeRxBytesAvg"
+	GPUNVLinkTxBytesAvgID                      MetricCollectorID = "GPUNVLinkTxBytesAvg"
+	GPUNVLinkRxBytesAvgID                      MetricCollectorID = "GPUNVLinkRxBytesAvg"
+	GPUDevicePowerAvgID                        MetricCollectorID = "GPUDevicePowerAvg"
+	GPUDeviceTempAvgID                         MetricCollectorID = "GPUDeviceTempAvg"
+	GPUDeviceUsageAvgID                        MetricCollectorID = "GPUDeviceUsageAvg"
+	GPUDeviceUsageMaxID                        MetricCollectorID = "GPUDeviceUsageMax"
+	GPUDeviceMemoryUsedAvgID                   MetricCollectorID = "GPUDeviceMemoryUsedAvg"
+	GPUDeviceMemoryUsedMaxID                   MetricCollectorID = "GPUDeviceMemoryUsedMax"
 	NodeCPUPricePerHourID                      MetricCollectorID = "NodeCPUPricePerHour"
 	NodeRAMPricePerGiBHourID                   MetricCollectorID = "NodeRAMPricePerGiBHour"
 	NodeGPUPricePerHourID                      MetricCollectorID = "NodeGPUPricePerHour"
@@ -170,6 +192,14 @@ const (
 	ResourceQuotaStatusUsedRAMLimitMaxID       MetricCollectorID = "ResourceQuotaStatusUsedRAMLimitMax"
 )
 
+// GPUThrottleReasonCollectorID derives the collector ID of the throttle
+// reason bit collector for a given DCGM bitmask metric name and reason, in
+// the form "GPUThrottleReason/<metricName>/<reason>". The ID is a pure
+// function of its inputs. A collector exists per (bitmask metric name,
+// reason) pair because the DCGM field was renamed in 3.3+ and only one of
+// the two names is ever scraped.
+func GPUThrottleReasonCollectorID(metricName, reason string) MetricCollectorID {
+	id := "GPUThrottleReason/" + metricName + "/" + reason
+	return MetricCollectorID(id)
+}
+
 // MetricCollector is a data structure that represents a specific MetricCollector metric instance that contains its own breakdown
 // of stored metrics by a specific label set.
 type MetricCollector struct {

+ 33 - 0
modules/collector-source/pkg/metric/metrics.go

@@ -63,6 +63,39 @@ const (
 	DCGMFIPROFGRENGINEACTIVE = "DCGM_FI_PROF_GR_ENGINE_ACTIVE"
 	DCGMFIDEVDECUTIL         = "DCGM_FI_DEV_DEC_UTIL"
 
+	// DCGM saturation metrics (default dcgm-exporter configuration)
+	DCGMFIDEVPOWERVIOLATION      = "DCGM_FI_DEV_POWER_VIOLATION"
+	DCGMFIDEVTHERMALVIOLATION    = "DCGM_FI_DEV_THERMAL_VIOLATION"
+	DCGMFIDEVSYNCBOOSTVIOLATION  = "DCGM_FI_DEV_SYNC_BOOST_VIOLATION"
+	DCGMFIDEVBOARDLIMITVIOLATION = "DCGM_FI_DEV_BOARD_LIMIT_VIOLATION"
+	DCGMFIDEVFBUSED              = "DCGM_FI_DEV_FB_USED"
+	DCGMFIDEVFBFREE              = "DCGM_FI_DEV_FB_FREE"
+	DCGMFIDEVXIDERRORS           = "DCGM_FI_DEV_XID_ERRORS"
+	DCGMFIDEVPOWERUSAGE          = "DCGM_FI_DEV_POWER_USAGE"
+	DCGMFIDEVGPUTEMP             = "DCGM_FI_DEV_GPU_TEMP"
+
+	// DCGM saturation metrics requiring explicit enablement in the
+	// dcgm-exporter configuration. The clock throttle reasons bitmask was
+	// renamed in DCGM 3.3+; both names are scraped, at most one exists.
+	DCGMFIDEVCLOCKTHROTTLEREASONS = "DCGM_FI_DEV_CLOCK_THROTTLE_REASONS"
+	DCGMFIDEVCLOCKSEVENTREASONS   = "DCGM_FI_DEV_CLOCKS_EVENT_REASONS"
+
+	// DCGM DCP profiling saturation metrics (require Volta+ GPUs;
+	// SM_ACTIVE, SM_OCCUPANCY, and NVLINK additionally require explicit
+	// enablement in the dcgm-exporter configuration)
+	DCGMFIPROFDRAMACTIVE    = "DCGM_FI_PROF_DRAM_ACTIVE"
+	DCGMFIPROFSMACTIVE      = "DCGM_FI_PROF_SM_ACTIVE"
+	DCGMFIPROFSMOCCUPANCY   = "DCGM_FI_PROF_SM_OCCUPANCY"
+	DCGMFIPROFPCIETXBYTES   = "DCGM_FI_PROF_PCIE_TX_BYTES"
+	DCGMFIPROFPCIERXBYTES   = "DCGM_FI_PROF_PCIE_RX_BYTES"
+	DCGMFIPROFNVLINKTXBYTES = "DCGM_FI_PROF_NVLINK_TX_BYTES"
+	DCGMFIPROFNVLINKRXBYTES = "DCGM_FI_PROF_NVLINK_RX_BYTES"
+
+	// Synthetic metrics generated from DCGM scrapes (see pkg/metric/synthetic)
+	// OpencostGPUMemoryUsedRatio is the per-sample framebuffer occupancy
+	// ratio FB_USED / (FB_USED + FB_FREE), joined per scrape
+	OpencostGPUMemoryUsedRatio = "opencost_gpu_memory_used_ratio"
+
 	// Network Metrics
 	KubecostPodNetworkEgressBytesTotal  = "kubecost_pod_network_egress_bytes_total"
 	KubecostPodNetworkIngressBytesTotal = "kubecost_pod_network_ingress_bytes_total"

+ 104 - 0
modules/collector-source/pkg/metric/synthetic/gpumemory.go

@@ -0,0 +1,104 @@
+package synthetic
+
+import (
+	"maps"
+	"math"
+	"strings"
+	"time"
+
+	"github.com/opencost/opencost/core/pkg/source"
+	"github.com/opencost/opencost/modules/collector-source/pkg/metric"
+)
+
+// gpuFramebufferSample holds the two halves of a framebuffer reading for a
+// single GPU (or MIG instance) and container, as seen within one scrape.
+// Either half stays nil until its metric has been processed.
+type gpuFramebufferSample struct {
+	// used is the DCGM_FI_DEV_FB_USED update
+	used *metric.Update
+	// free is the DCGM_FI_DEV_FB_FREE update
+	free *metric.Update
+}
+
+// GPUMemoryUsedRatioSynthesizer pairs DCGM_FI_DEV_FB_USED with
+// DCGM_FI_DEV_FB_FREE inside each scrape and turns every pair into a
+// per-sample framebuffer occupancy ratio, used / (used + free). The join
+// has to happen per scrape for time-over-threshold memory pressure to be
+// computable downstream; a join done after aggregation cannot recover the
+// per-sample ratio.
+type GPUMemoryUsedRatioSynthesizer struct {
+	// byDevice maps a gpuDeviceKey to the used/free pair collected so far
+	byDevice map[string]*gpuFramebufferSample
+}
+
+// NewGPUMemoryUsedRatioSynthesizer returns a synthesizer with empty
+// per-scrape state, ready to produce OpencostGPUMemoryUsedRatio updates
+// from the DCGM framebuffer metrics.
+func NewGPUMemoryUsedRatioSynthesizer() *GPUMemoryUsedRatioSynthesizer {
+	s := new(GPUMemoryUsedRatioSynthesizer)
+	s.byDevice = make(map[string]*gpuFramebufferSample)
+	return s
+}
+
+// gpuDeviceKey builds the join key for one GPU (or MIG instance) attached
+// to one container, since dcgm-exporter emits one used/free series per such
+// pairing. The key is the "|"-joined UUID, MIG instance, pod UID, and
+// container label values, in that order.
+func gpuDeviceKey(labels map[string]string) string {
+	parts := []string{
+		labels[source.UUIDLabel],
+		labels[source.MIGInstanceLabel],
+		labels[source.PodUIDLabel],
+		labels[source.ContainerLabel],
+	}
+	return strings.Join(parts, "|")
+}
+
+// Process records framebuffer used/free updates; all other metrics are
+// ignored.
+func (s *GPUMemoryUsedRatioSynthesizer) Process(t time.Time, update *metric.Update) {
+	var sample *gpuFramebufferSample
+	switch update.Name {
+	case metric.DCGMFIDEVFBUSED, metric.DCGMFIDEVFBFREE:
+		key := gpuDeviceKey(update.Labels)
+		if _, ok := s.byDevice[key]; !ok {
+			s.byDevice[key] = &gpuFramebufferSample{}
+		}
+		sample = s.byDevice[key]
+	default:
+		return
+	}
+
+	if update.Name == metric.DCGMFIDEVFBUSED {
+		sample.used = update
+	} else {
+		sample.free = update
+	}
+}
+
+// Synthesize returns one OpencostGPUMemoryUsedRatio update for every device
+// key that received both framebuffer metrics. A key is skipped when either
+// half is missing, when used + free is NaN, infinite, or not positive, or
+// when either component is negative. Emitted updates carry a copy of the
+// used update's labels.
+func (s *GPUMemoryUsedRatioSynthesizer) Synthesize() []metric.Update {
+	var out []metric.Update
+
+	for _, pair := range s.byDevice {
+		if pair.used == nil || pair.free == nil {
+			continue
+		}
+
+		u, f := pair.used.Value, pair.free.Value
+		sum := u + f
+		switch {
+		case math.IsNaN(sum), math.IsInf(sum, 0):
+			continue
+		case sum <= 0, u < 0, f < 0:
+			// The components are checked individually as well: a negative
+			// free value (e.g. used=100, free=-50) leaves the sum positive
+			// but would yield a ratio above 1, outside the [0, 1] range.
+			continue
+		}
+
+		out = append(out, metric.Update{
+			Name:   metric.OpencostGPUMemoryUsedRatio,
+			Labels: maps.Clone(pair.used.Labels),
+			Value:  u / sum,
+		})
+	}
+
+	return out
+}
+
+// Clear drops every collected used/free pair by installing a fresh, empty map.
+func (s *GPUMemoryUsedRatioSynthesizer) Clear() {
+	s.byDevice = make(map[string]*gpuFramebufferSample)
+}

+ 163 - 0
modules/collector-source/pkg/metric/synthetic/gpumemory_test.go

@@ -0,0 +1,163 @@
+package synthetic
+
+import (
+	"math"
+	"testing"
+	"time"
+
+	"github.com/opencost/opencost/modules/collector-source/pkg/metric"
+)
+
+// capturingUpdater records the UpdateSet handed to the next stage of the
+// synthesizer pipeline.
+type capturingUpdater struct {
+	// set is the most recent UpdateSet passed to Update; nil until then.
+	set *metric.UpdateSet
+}
+
+// Update stores the received set, overwriting any previously captured one.
+func (c *capturingUpdater) Update(set *metric.UpdateSet) {
+	c.set = set
+}
+
+// gpuFBUpdate builds a metric update with the given name and value for the
+// device identified by uuid and migInstance. The pod, container, and
+// namespace labels are fixed fixture values shared by every test case.
+func gpuFBUpdate(name string, uuid, migInstance string, value float64) *metric.Update {
+	labels := map[string]string{
+		"UUID":      uuid,
+		"GPU_I_ID":  migInstance,
+		"pod_uid":   "pod-uuid1",
+		"container": "container1",
+		"namespace": "namespace1",
+		"pod":       "pod1",
+	}
+
+	fixture := metric.Update{Name: name, Labels: labels, Value: value}
+	return &fixture
+}
+
+// TestGPUMemoryUsedRatioSynthesizer exercises the synthesizer both directly
+// and through the MetricSynthesizers pipeline: joining used/free into a
+// ratio, per-MIG-instance separation, rejection of incomplete or invalid
+// pairs, pass-through of unrelated metrics, and state reset via Clear.
+func TestGPUMemoryUsedRatioSynthesizer(t *testing.T) {
+	now := time.Now()
+
+	t.Run("joins used and free into a ratio", func(t *testing.T) {
+		s := NewGPUMemoryUsedRatioSynthesizer()
+		s.Process(now, gpuFBUpdate(metric.DCGMFIDEVFBUSED, "GPU-1", "", 12000))
+		s.Process(now, gpuFBUpdate(metric.DCGMFIDEVFBFREE, "GPU-1", "", 4000))
+
+		updates := s.Synthesize()
+		if len(updates) != 1 {
+			t.Fatalf("expected 1 synthetic update, got %d", len(updates))
+		}
+		got := updates[0]
+		if got.Name != metric.OpencostGPUMemoryUsedRatio {
+			t.Errorf("Name = %q, want %q", got.Name, metric.OpencostGPUMemoryUsedRatio)
+		}
+		if got.Value != 0.75 {
+			t.Errorf("Value = %v, want 0.75", got.Value)
+		}
+		if got.Labels["UUID"] != "GPU-1" || got.Labels["container"] != "container1" {
+			t.Errorf("labels not carried through: %v", got.Labels)
+		}
+	})
+
+	t.Run("MIG instances synthesize independently", func(t *testing.T) {
+		s := NewGPUMemoryUsedRatioSynthesizer()
+		s.Process(now, gpuFBUpdate(metric.DCGMFIDEVFBUSED, "GPU-1", "1", 5000))
+		s.Process(now, gpuFBUpdate(metric.DCGMFIDEVFBFREE, "GPU-1", "1", 5000))
+		s.Process(now, gpuFBUpdate(metric.DCGMFIDEVFBUSED, "GPU-1", "2", 2000))
+		s.Process(now, gpuFBUpdate(metric.DCGMFIDEVFBFREE, "GPU-1", "2", 8000))
+
+		updates := s.Synthesize()
+		if len(updates) != 2 {
+			t.Fatalf("expected 2 synthetic updates, got %d", len(updates))
+		}
+		byInstance := map[string]float64{}
+		for _, u := range updates {
+			byInstance[u.Labels["GPU_I_ID"]] = u.Value
+		}
+		if byInstance["1"] != 0.5 || byInstance["2"] != 0.2 {
+			t.Errorf("per-instance ratios = %v, want {1:0.5, 2:0.2}", byInstance)
+		}
+	})
+
+	t.Run("missing half emits nothing", func(t *testing.T) {
+		s := NewGPUMemoryUsedRatioSynthesizer()
+		s.Process(now, gpuFBUpdate(metric.DCGMFIDEVFBUSED, "GPU-1", "", 12000))
+		if updates := s.Synthesize(); len(updates) != 0 {
+			t.Errorf("expected no updates without FB_FREE, got %v", updates)
+		}
+	})
+
+	t.Run("invalid totals emit nothing", func(t *testing.T) {
+		// each entry is {used, free}
+		cases := map[string][2]float64{
+			"zero total":     {0, 0},
+			"negative used":  {-1, 100},
+			"NaN free":       {100, math.NaN()},
+			"infinite total": {math.Inf(1), 100},
+			// negative free with positive total would yield ratio > 1
+			// (100/(100-50) = 2.0) if only the total were validated
+			"negative free": {100, -50},
+		}
+		for name, values := range cases {
+			// one subtest per case so a failure is reported under the
+			// offending input's name and can be selected with -run
+			t.Run(name, func(t *testing.T) {
+				s := NewGPUMemoryUsedRatioSynthesizer()
+				s.Process(now, gpuFBUpdate(metric.DCGMFIDEVFBUSED, "GPU-1", "", values[0]))
+				s.Process(now, gpuFBUpdate(metric.DCGMFIDEVFBFREE, "GPU-1", "", values[1]))
+				if updates := s.Synthesize(); len(updates) != 0 {
+					t.Errorf("expected no updates, got %v", updates)
+				}
+			})
+		}
+	})
+
+	t.Run("unrelated metrics are ignored", func(t *testing.T) {
+		s := NewGPUMemoryUsedRatioSynthesizer()
+		s.Process(now, gpuFBUpdate(metric.DCGMFIPROFGRENGINEACTIVE, "GPU-1", "", 0.9))
+		if updates := s.Synthesize(); len(updates) != 0 {
+			t.Errorf("expected no updates for unrelated metric, got %v", updates)
+		}
+	})
+
+	t.Run("joins correctly through the MetricSynthesizers pipeline", func(t *testing.T) {
+		// Exercises the real dispatch path: MetricSynthesizers.Update copies
+		// each Update into a loop-body variable and passes its address to
+		// Process. The body-scoped declaration yields a distinct allocation
+		// per iteration, so stored pointers never alias; this test pins that
+		// by pushing two devices' used/free pairs through one UpdateSet and
+		// asserting each synthesized ratio reflects its own samples.
+		captured := &capturingUpdater{}
+		pipeline := NewMetricSynthesizers(captured, NewGPUMemoryUsedRatioSynthesizer())
+
+		pipeline.Update(&metric.UpdateSet{
+			Timestamp: now,
+			Updates: []metric.Update{
+				*gpuFBUpdate(metric.DCGMFIDEVFBUSED, "GPU-1", "", 12000),
+				*gpuFBUpdate(metric.DCGMFIDEVFBFREE, "GPU-1", "", 4000),
+				*gpuFBUpdate(metric.DCGMFIDEVFBUSED, "GPU-2", "", 2000),
+				*gpuFBUpdate(metric.DCGMFIDEVFBFREE, "GPU-2", "", 8000),
+			},
+		})
+
+		// guard the dereferences below: a pipeline that never forwards the
+		// set should fail the test with a message, not panic it
+		if captured.set == nil {
+			t.Fatal("pipeline did not forward an UpdateSet to the next stage")
+		}
+
+		ratios := map[string]float64{}
+		for _, u := range captured.set.Updates {
+			if u.Name == metric.OpencostGPUMemoryUsedRatio {
+				ratios[u.Labels["UUID"]] = u.Value
+			}
+		}
+		if len(ratios) != 2 {
+			t.Fatalf("expected 2 synthesized ratios, got %d: %v", len(ratios), ratios)
+		}
+		if ratios["GPU-1"] != 0.75 || ratios["GPU-2"] != 0.2 {
+			t.Errorf("ratios = %v, want {GPU-1:0.75, GPU-2:0.2}", ratios)
+		}
+		// original updates must pass through untouched alongside synthetics
+		if len(captured.set.Updates) != 6 {
+			t.Errorf("expected 4 originals + 2 synthetics, got %d", len(captured.set.Updates))
+		}
+	})
+
+	t.Run("Clear resets state between scrapes", func(t *testing.T) {
+		s := NewGPUMemoryUsedRatioSynthesizer()
+		s.Process(now, gpuFBUpdate(metric.DCGMFIDEVFBUSED, "GPU-1", "", 12000))
+		s.Clear()
+		s.Process(now, gpuFBUpdate(metric.DCGMFIDEVFBFREE, "GPU-1", "", 4000))
+		if updates := s.Synthesize(); len(updates) != 0 {
+			t.Errorf("expected no join across Clear, got %v", updates)
+		}
+	})
+}

+ 20 - 0
modules/collector-source/pkg/scrape/dcgm.go

@@ -26,6 +26,26 @@ func newDCGMTargetScraper(provider target.TargetProvider) *TargetScraper {
 		[]string{
 			metric.DCGMFIPROFGRENGINEACTIVE,
 			metric.DCGMFIDEVDECUTIL,
+			// GPU saturation signals; fields absent from the dcgm-exporter
+			// configuration simply never appear in the scrape
+			metric.DCGMFIDEVPOWERVIOLATION,
+			metric.DCGMFIDEVTHERMALVIOLATION,
+			metric.DCGMFIDEVSYNCBOOSTVIOLATION,
+			metric.DCGMFIDEVBOARDLIMITVIOLATION,
+			metric.DCGMFIDEVFBUSED,
+			metric.DCGMFIDEVFBFREE,
+			metric.DCGMFIDEVXIDERRORS,
+			metric.DCGMFIDEVPOWERUSAGE,
+			metric.DCGMFIDEVGPUTEMP,
+			metric.DCGMFIDEVCLOCKTHROTTLEREASONS,
+			metric.DCGMFIDEVCLOCKSEVENTREASONS,
+			metric.DCGMFIPROFDRAMACTIVE,
+			metric.DCGMFIPROFSMACTIVE,
+			metric.DCGMFIPROFSMOCCUPANCY,
+			metric.DCGMFIPROFPCIETXBYTES,
+			metric.DCGMFIPROFPCIERXBYTES,
+			metric.DCGMFIPROFNVLINKTXBYTES,
+			metric.DCGMFIPROFNVLINKRXBYTES,
 		},
 		true)
 }

+ 4 - 0
modules/prometheus-source/go.mod

@@ -25,12 +25,15 @@ require (
 	github.com/go-viper/mapstructure/v2 v2.5.0 // indirect
 	github.com/goccy/go-json v0.10.5 // indirect
 	github.com/google/uuid v1.6.0 // indirect
+	github.com/hashicorp/errwrap v1.1.0 // indirect
+	github.com/hashicorp/go-multierror v1.1.1 // indirect
 	github.com/json-iterator/go v1.1.12 // indirect
 	github.com/mattn/go-colorable v0.1.14 // indirect
 	github.com/mattn/go-isatty v0.0.20 // indirect
 	github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
 	github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee // indirect
 	github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
+	github.com/patrickmn/go-cache v2.1.0+incompatible // indirect
 	github.com/pelletier/go-toml/v2 v2.2.4 // indirect
 	github.com/prometheus/client_model v0.6.2 // indirect
 	github.com/prometheus/common v0.67.5 // indirect
@@ -43,6 +46,7 @@ require (
 	github.com/x448/float16 v0.8.4 // indirect
 	go.yaml.in/yaml/v2 v2.4.3 // indirect
 	go.yaml.in/yaml/v3 v3.0.4 // indirect
+	golang.org/x/exp v0.0.0-20260218203240-3dfff04db8fa // indirect
 	golang.org/x/net v0.52.0 // indirect
 	golang.org/x/oauth2 v0.35.0 // indirect
 	golang.org/x/sys v0.42.0 // indirect

+ 9 - 0
modules/prometheus-source/go.sum

@@ -61,6 +61,11 @@ github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX
 github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
 github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
 github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
+github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
+github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
+github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
+github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
 github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA=
 github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
 github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
@@ -88,6 +93,8 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq
 github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
 github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f h1:KUppIJq7/+SVif2QVs3tOP0zanoHgBEVAwHxUSIzRqU=
 github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
+github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
+github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
 github.com/pelletier/go-toml/v2 v2.2.4 h1:mye9XuhQ6gvn5h28+VilKrrPoQVanw5PMw/TB0t5Ec4=
 github.com/pelletier/go-toml/v2 v2.2.4/go.mod h1:2gIqNv+qfxSVS7cM2xJQKtLSTLUE9V8t9Stt+h56mCY=
 github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
@@ -131,6 +138,8 @@ go.yaml.in/yaml/v2 v2.4.3 h1:6gvOSjQoTB3vt1l+CU+tSyi/HOjfOjRLJ4YwYZGwRO0=
 go.yaml.in/yaml/v2 v2.4.3/go.mod h1:zSxWcmIDjOzPXpjlTTbAsKokqkDNAVtZO0WOMiT90s8=
 go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc=
 go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg=
+golang.org/x/exp v0.0.0-20260218203240-3dfff04db8fa h1:Zt3DZoOFFYkKhDT3v7Lm9FDMEV06GpzjG2jrqW+QTE0=
+golang.org/x/exp v0.0.0-20260218203240-3dfff04db8fa/go.mod h1:K79w1Vqn7PoiZn+TkNpx3BUWUQksGO3JcVX6qIjytmA=
 golang.org/x/net v0.52.0 h1:He/TN1l0e4mmR3QqHMT2Xab3Aj3L9qjbhRm78/6jrW0=
 golang.org/x/net v0.52.0/go.mod h1:R1MAz7uMZxVMualyPXb+VaqGSa3LIaUqk0eEt3w36Sw=
 golang.org/x/oauth2 v0.35.0 h1:Mv2mzuHuZuY2+bkyWXIHMfhNdJAdwW3FuWeCPYN5GVQ=

+ 7 - 0
modules/prometheus-source/pkg/prom/config.go

@@ -34,6 +34,9 @@ type OpenCostPrometheusConfig struct {
 	ClusterFilter         string
 	DataResolution        time.Duration
 	DataResolutionMinutes int
+	// GPUMemorySaturationThreshold is the framebuffer occupancy ratio in
+	// (0, 1] above which GPU memory is considered pressured.
+	GPUMemorySaturationThreshold float64
 }
 
 func (ocpc *OpenCostPrometheusConfig) IsRateLimitRetryEnabled() bool {
@@ -151,5 +154,9 @@ func NewOpenCostPrometheusConfigFromEnv() (*OpenCostPrometheusConfig, error) {
 		ClusterFilter:         clusterFilter,
 		DataResolution:        dataResolution,
 		DataResolutionMinutes: resolutionMinutes,
+
+		// shared with collector-source via core env so both data sources
+		// apply the identical threshold
+		GPUMemorySaturationThreshold: coreenv.GetGPUMemorySaturationThreshold(),
 	}, nil
 }

+ 67 - 0
modules/prometheus-source/pkg/prom/gpudevicequerier.go

@@ -0,0 +1,67 @@
+package prom
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/opencost/opencost/core/pkg/source"
+)
+
+// Device-level GPU metric queries supporting the DeviceInfo and
+// DevicePerformance contracts on DCGMDevice: power draw, temperature,
+// device-level compute utilization, and absolute memory used. Unlike the
+// container-attributed saturation queries these group by device identity
+// only, since the values describe the whole device (or MIG instance)
+// regardless of which containers share it. All source fields are in the
+// default dcgm-exporter configuration.
+
+// gpuDeviceByLabels groups series by device identity (and MIG instance)
+// without container attribution.
+const gpuDeviceByLabels = `device, modelName, UUID, GPU_I_PROFILE, GPU_I_ID`
+
+// queryGPUDeviceGauge issues an agg(agg_over_time(...)) query for a
+// device-level DCGM gauge. The agg argument (the callers in this file pass
+// "avg" or "max") names both the inner *_over_time function and the outer
+// aggregation. Series are selected with the configured cluster filter only
+// (no container matcher) and grouped by gpuDeviceByLabels plus the cluster
+// label. Panics via mustDurationString when the window cannot be expressed
+// as a duration string.
+//
+// NOTE(review): the query is dispatched through queryGPUSaturation, whose
+// declared return type is *source.Future[source.GPUSaturationResult];
+// returning it here presumably relies on GPUDeviceMetricResult being
+// interchangeable with that type — confirm against core/pkg/source.
+func (pds *PrometheusMetricsQuerier) queryGPUDeviceGauge(queryName, metric, agg string, start, end time.Time) *source.Future[source.GPUDeviceMetricResult] {
+	const queryFmt = `%s(%s_over_time(%s{%s}[%s])) by (%s, %s)`
+	cfg := pds.promConfig
+
+	durStr := mustDurationString(queryName, start, end)
+	query := fmt.Sprintf(queryFmt, agg, agg, metric, cfg.ClusterFilter, durStr, gpuDeviceByLabels, cfg.ClusterLabel)
+	return pds.queryGPUSaturation(queryName, query, end)
+}
+
+// QueryGPUDevicePowerAvg queries average device power draw in watts
+// (DCGM_FI_DEV_POWER_USAGE) over the window from start to end.
+func (pds *PrometheusMetricsQuerier) QueryGPUDevicePowerAvg(start, end time.Time) *source.Future[source.GPUDeviceMetricResult] {
+	return pds.queryGPUDeviceGauge("QueryGPUDevicePowerAvg", "DCGM_FI_DEV_POWER_USAGE", "avg", start, end)
+}
+
+// QueryGPUDeviceTempAvg queries average device temperature in degrees
+// Celsius (DCGM_FI_DEV_GPU_TEMP) over the window from start to end.
+func (pds *PrometheusMetricsQuerier) QueryGPUDeviceTempAvg(start, end time.Time) *source.Future[source.GPUDeviceMetricResult] {
+	return pds.queryGPUDeviceGauge("QueryGPUDeviceTempAvg", "DCGM_FI_DEV_GPU_TEMP", "avg", start, end)
+}
+
+// QueryGPUDeviceUsageAvg queries average device-level compute utilization
+// as a 0-1 ratio (DCGM_FI_PROF_GR_ENGINE_ACTIVE without container
+// attribution) over the window from start to end.
+func (pds *PrometheusMetricsQuerier) QueryGPUDeviceUsageAvg(start, end time.Time) *source.Future[source.GPUDeviceMetricResult] {
+	return pds.queryGPUDeviceGauge("QueryGPUDeviceUsageAvg", "DCGM_FI_PROF_GR_ENGINE_ACTIVE", "avg", start, end)
+}
+
+// QueryGPUDeviceUsageMax queries peak device-level compute utilization as a
+// 0-1 ratio; same metric as QueryGPUDeviceUsageAvg, aggregated with max.
+func (pds *PrometheusMetricsQuerier) QueryGPUDeviceUsageMax(start, end time.Time) *source.Future[source.GPUDeviceMetricResult] {
+	return pds.queryGPUDeviceGauge("QueryGPUDeviceUsageMax", "DCGM_FI_PROF_GR_ENGINE_ACTIVE", "max", start, end)
+}
+
+// QueryGPUDeviceMemoryUsedAvg queries average framebuffer used in MiB
+// (DCGM_FI_DEV_FB_USED); hydration converts to bytes.
+func (pds *PrometheusMetricsQuerier) QueryGPUDeviceMemoryUsedAvg(start, end time.Time) *source.Future[source.GPUDeviceMetricResult] {
+	return pds.queryGPUDeviceGauge("QueryGPUDeviceMemoryUsedAvg", "DCGM_FI_DEV_FB_USED", "avg", start, end)
+}
+
+// QueryGPUDeviceMemoryUsedMax queries peak framebuffer used in MiB; same
+// metric as QueryGPUDeviceMemoryUsedAvg, aggregated with max.
+func (pds *PrometheusMetricsQuerier) QueryGPUDeviceMemoryUsedMax(start, end time.Time) *source.Future[source.GPUDeviceMetricResult] {
+	return pds.queryGPUDeviceGauge("QueryGPUDeviceMemoryUsedMax", "DCGM_FI_DEV_FB_USED", "max", start, end)
+}

+ 282 - 0
modules/prometheus-source/pkg/prom/gpusaturationquerier.go

@@ -0,0 +1,282 @@
+package prom
+
+import (
+	"fmt"
+	"strings"
+	"time"
+
+	"github.com/opencost/opencost/core/pkg/log"
+	"github.com/opencost/opencost/core/pkg/opencost"
+	"github.com/opencost/opencost/core/pkg/source"
+	"github.com/opencost/opencost/core/pkg/util/timeutil"
+)
+
+// GPU saturation queries
+//
+// These queries derive USE-method saturation signals for GPUs from
+// dcgm-exporter metrics. Each query is an independent primitive; when the
+// underlying DCGM field is not collected (no dcgm-exporter, the field is
+// disabled in its config, or the GPU lacks DCP profiling support) the query
+// simply returns no series, which downstream code treats as "signal absent"
+// rather than zero.
+//
+// Sources by dcgm-exporter configuration:
+//   - default config: DCGM_FI_DEV_*_VIOLATION, DCGM_FI_DEV_FB_USED/FREE,
+//     DCGM_FI_DEV_XID_ERRORS, DCGM_FI_PROF_DRAM_ACTIVE,
+//     DCGM_FI_PROF_PCIE_TX/RX_BYTES
+//   - requires explicit enablement: DCGM_FI_DEV_CLOCK_THROTTLE_REASONS
+//     (DCGM_FI_DEV_CLOCKS_EVENT_REASONS in DCGM 3.3+),
+//     DCGM_FI_PROF_SM_ACTIVE, DCGM_FI_PROF_SM_OCCUPANCY,
+//     DCGM_FI_PROF_NVLINK_TX/RX_BYTES
+//
+// All DCGM_FI_PROF_* fields additionally require DCP profiling (Volta+).
+
+// gpuSaturationByLabels is the grouping shared by every GPU saturation
+// query: container attribution, GPU identity, and MIG instance labels.
+// Grouping by a label that is absent from a series yields an empty value,
+// so non-MIG series pass through unchanged.
+const gpuSaturationByLabels = `container, pod, namespace, device, modelName, UUID, GPU_I_PROFILE, GPU_I_ID, pod_uid`
+
+// gpuThrottleViolationMetrics maps each DCGM violation counter to its
+// canonical reason name. The counters accumulate microseconds spent
+// throttled, so increase(counter[window]) / window-in-microseconds is the
+// fraction of the window spent throttled for that reason.
+var gpuThrottleViolationMetrics = []struct {
+	Metric string
+	Reason string
+}{
+	{Metric: "DCGM_FI_DEV_POWER_VIOLATION", Reason: opencost.GPUThrottleViolationPower},
+	{Metric: "DCGM_FI_DEV_THERMAL_VIOLATION", Reason: opencost.GPUThrottleViolationThermal},
+	{Metric: "DCGM_FI_DEV_SYNC_BOOST_VIOLATION", Reason: opencost.GPUThrottleViolationSyncBoost},
+	{Metric: "DCGM_FI_DEV_BOARD_LIMIT_VIOLATION", Reason: opencost.GPUThrottleViolationBoardLimit},
+}
+
+// buildGPUThrottleViolationQuery returns one query producing a series per
+// (GPU, container, reason): the fraction of the window each violation
+// counter accumulated. Branches for the four violation metrics are joined
+// with "or" and tagged with a constant "reason" label so a single decoder
+// handles all of them.
+func buildGPUThrottleViolationQuery(clusterFilter, durStr, clusterLabel string, windowSeconds float64) string {
+	windowMicros := windowSeconds * 1e6
+	branches := make([]string, 0, len(gpuThrottleViolationMetrics))
+	for _, violation := range gpuThrottleViolationMetrics {
+		branches = append(branches, fmt.Sprintf(
+			`label_replace(avg(increase(%s{container!="",%s}[%s])) by (%s, %s) / %g, "reason", "%s", "", "")`,
+			violation.Metric, clusterFilter, durStr, gpuSaturationByLabels, clusterLabel, windowMicros, violation.Reason,
+		))
+	}
+	return strings.Join(branches, " or ")
+}
+
+// buildGPUThrottleReasonQuery returns one query producing a series per
+// (GPU, container, reason): the fraction of window samples in which the
+// reason bit was set in the clock throttle reasons bitmask. The bit test
+// floor(mask / bit) % 2 is evaluated per sample via a subquery at the
+// configured resolution (PromQL has no bitwise operators, hence the
+// arithmetic). Both the pre-3.3 and post-3.3 DCGM field names are queried
+// via `or`; at most one exists per dcgm-exporter version. The rename is
+// handled here at the query layer because the prometheus source has no
+// ingest step where the metric name could be normalized once — Prometheus
+// stores whatever dcgm-exporter exposed. If a normalization layer ever
+// exists, collapse these to the canonical name there.
+func buildGPUThrottleReasonQuery(clusterFilter, durStr, clusterLabel string, minsPerResolution int) string {
+	branches := make([]string, 0, len(opencost.GPUThrottleReasons))
+	for _, reason := range opencost.GPUThrottleReasons {
+		branches = append(branches, fmt.Sprintf(
+			`label_replace(avg(avg_over_time(((floor((DCGM_FI_DEV_CLOCK_THROTTLE_REASONS{container!="",%s} or DCGM_FI_DEV_CLOCKS_EVENT_REASONS{container!="",%s}) / %d)) %% 2)[%s:%dm])) by (%s, %s), "reason", "%s", "", "")`,
+			clusterFilter, clusterFilter, reason.Bit, durStr, minsPerResolution, gpuSaturationByLabels, clusterLabel, reason.Name,
+		))
+	}
+	return strings.Join(branches, " or ")
+}
+
+// queryGPUSaturation centralizes the shared shape of every GPU saturation
+// query method: log the query at debug level, then issue it as a query
+// evaluated at the window end on a context named AllocationContextName.
+// The pending result is wrapped in a Future decoded by
+// source.DecodeGPUSaturationResult.
+func (pds *PrometheusMetricsQuerier) queryGPUSaturation(queryName, query string, end time.Time) *source.Future[source.GPUSaturationResult] {
+	log.Debugf(PrometheusMetricsQueryLogFormat, queryName, end.Unix(), query)
+
+	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
+	return source.NewFuture(source.DecodeGPUSaturationResult, ctx.QueryAtTime(query, end))
+}
+
+// mustDurationString renders the start-to-end window as a duration string.
+// When timeutil.DurationString yields an empty string it panics, naming the
+// calling query, in line with the other queriers.
+func mustDurationString(queryName string, start, end time.Time) string {
+	if window := timeutil.DurationString(end.Sub(start)); window != "" {
+		return window
+	}
+	panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
+}
+
+// QueryGPUThrottleViolationRatio queries the fraction of the window each GPU
+// spent throttled, per reason, from the DCGM violation counters (default
+// dcgm-exporter configuration). The window length end-start is passed to
+// the query builder in seconds for normalization. Panics via
+// mustDurationString when the window has no duration string.
+func (pds *PrometheusMetricsQuerier) QueryGPUThrottleViolationRatio(start, end time.Time) *source.Future[source.GPUSaturationResult] {
+	const queryName = "QueryGPUThrottleViolationRatio"
+	cfg := pds.promConfig
+
+	durStr := mustDurationString(queryName, start, end)
+	query := buildGPUThrottleViolationQuery(cfg.ClusterFilter, durStr, cfg.ClusterLabel, end.Sub(start).Seconds())
+	return pds.queryGPUSaturation(queryName, query, end)
+}
+
+// QueryGPUThrottleReasonRatio reports, per saturation-relevant bit of the
+// clock throttle reasons bitmask, the fraction of the window during which
+// that bit was set. The signal is only present when
+// DCGM_FI_DEV_CLOCK_THROTTLE_REASONS (or its DCGM 3.3+ rename) is enabled
+// in the dcgm-exporter configuration. Panics when the window cannot be
+// rendered as a duration string.
+func (pds *PrometheusMetricsQuerier) QueryGPUThrottleReasonRatio(start, end time.Time) *source.Future[source.GPUSaturationResult] {
+	const queryName = "QueryGPUThrottleReasonRatio"
+	cfg := pds.promConfig
+	resolutionMins := cfg.DataResolutionMinutes
+
+	window := pds.durationStringFor(start, end, resolutionMins, false)
+	if window == "" {
+		panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
+	}
+
+	return pds.queryGPUSaturation(
+		queryName,
+		buildGPUThrottleReasonQuery(cfg.ClusterFilter, window, cfg.ClusterLabel, resolutionMins),
+		end,
+	)
+}
+
+// QueryGPUMemoryUsedRatioAvg queries average framebuffer occupancy over the
+// window: FB_USED / (FB_USED + FB_FREE). Default dcgm-exporter configuration.
+// Numerator and both denominator terms are window averages grouped by
+// gpuSaturationByLabels plus the cluster label. Panics via
+// mustDurationString when the window has no duration string.
+func (pds *PrometheusMetricsQuerier) QueryGPUMemoryUsedRatioAvg(start, end time.Time) *source.Future[source.GPUSaturationResult] {
+	const queryName = "QueryGPUMemoryUsedRatioAvg"
+	const queryFmt = `avg(avg_over_time(DCGM_FI_DEV_FB_USED{container!="",%s}[%s])) by (%s, %s) / (avg(avg_over_time(DCGM_FI_DEV_FB_USED{container!="",%s}[%s])) by (%s, %s) + avg(avg_over_time(DCGM_FI_DEV_FB_FREE{container!="",%s}[%s])) by (%s, %s))`
+	cfg := pds.promConfig
+
+	durStr := mustDurationString(queryName, start, end)
+	// one argument row per selector, in order: numerator FB_USED,
+	// denominator FB_USED, denominator FB_FREE
+	query := fmt.Sprintf(queryFmt,
+		cfg.ClusterFilter, durStr, gpuSaturationByLabels, cfg.ClusterLabel,
+		cfg.ClusterFilter, durStr, gpuSaturationByLabels, cfg.ClusterLabel,
+		cfg.ClusterFilter, durStr, gpuSaturationByLabels, cfg.ClusterLabel,
+	)
+	return pds.queryGPUSaturation(queryName, query, end)
+}
+
+// QueryGPUMemoryUsedRatioMax queries peak framebuffer occupancy over the
+// window: the window maximum of FB_USED divided by the window averages of
+// FB_USED + FB_FREE. Using averages in the denominator is exact as long as
+// FB_USED + FB_FREE stays equal to a fixed framebuffer capacity across the
+// window (the assumption this query relies on). Panics via
+// mustDurationString when the window has no duration string.
+func (pds *PrometheusMetricsQuerier) QueryGPUMemoryUsedRatioMax(start, end time.Time) *source.Future[source.GPUSaturationResult] {
+	const queryName = "QueryGPUMemoryUsedRatioMax"
+	const queryFmt = `max(max_over_time(DCGM_FI_DEV_FB_USED{container!="",%s}[%s])) by (%s, %s) / (avg(avg_over_time(DCGM_FI_DEV_FB_USED{container!="",%s}[%s])) by (%s, %s) + avg(avg_over_time(DCGM_FI_DEV_FB_FREE{container!="",%s}[%s])) by (%s, %s))`
+	cfg := pds.promConfig
+
+	durStr := mustDurationString(queryName, start, end)
+	// one argument row per selector, in order: numerator FB_USED (max),
+	// denominator FB_USED (avg), denominator FB_FREE (avg)
+	query := fmt.Sprintf(queryFmt,
+		cfg.ClusterFilter, durStr, gpuSaturationByLabels, cfg.ClusterLabel,
+		cfg.ClusterFilter, durStr, gpuSaturationByLabels, cfg.ClusterLabel,
+		cfg.ClusterFilter, durStr, gpuSaturationByLabels, cfg.ClusterLabel,
+	)
+	return pds.queryGPUSaturation(queryName, query, end)
+}
+
+// QueryGPUMemoryPressureRatio queries the fraction of window samples in
+// which framebuffer occupancy met or exceeded the configured threshold.
+// The comparison is `>= bool`, so a sample exactly at
+// GPUMemorySaturationThreshold counts as pressured. Occupancy is evaluated
+// per sample as FB_USED / (FB_USED + FB_FREE) through a subquery stepped at
+// the configured resolution, and the resulting 0/1 values are averaged over
+// the window. Panics when the window cannot be rendered as a duration
+// string.
+func (pds *PrometheusMetricsQuerier) QueryGPUMemoryPressureRatio(start, end time.Time) *source.Future[source.GPUSaturationResult] {
+	const queryName = "QueryGPUMemoryPressureRatio"
+	const queryFmt = `avg(avg_over_time(((DCGM_FI_DEV_FB_USED{container!="",%s} / (DCGM_FI_DEV_FB_USED{container!="",%s} + DCGM_FI_DEV_FB_FREE{container!="",%s})) >= bool %g)[%s:%dm])) by (%s, %s)`
+	cfg := pds.promConfig
+
+	minsPerResolution := cfg.DataResolutionMinutes
+	durStr := pds.durationStringFor(start, end, minsPerResolution, false)
+	if durStr == "" {
+		panic(fmt.Sprintf("failed to parse duration string passed to %s", queryName))
+	}
+
+	// argument order follows the verbs: three selector filters, the
+	// threshold, the subquery range and step, then the grouping labels
+	query := fmt.Sprintf(queryFmt,
+		cfg.ClusterFilter, cfg.ClusterFilter, cfg.ClusterFilter,
+		cfg.GPUMemorySaturationThreshold, durStr, minsPerResolution,
+		gpuSaturationByLabels, cfg.ClusterLabel,
+	)
+	return pds.queryGPUSaturation(queryName, query, end)
+}
+
+// QueryGPUXIDErrorCount queries the number of XID error events observed in
+// the window. DCGM_FI_DEV_XID_ERRORS reports the most recent XID code, so
+// this counts changes of that value (PromQL changes(), summed per group);
+// consecutive identical errors are undercounted. Default dcgm-exporter
+// configuration. Panics via mustDurationString when the window has no
+// duration string.
+func (pds *PrometheusMetricsQuerier) QueryGPUXIDErrorCount(start, end time.Time) *source.Future[source.GPUSaturationResult] {
+	const queryName = "QueryGPUXIDErrorCount"
+	const queryFmt = `sum(changes(DCGM_FI_DEV_XID_ERRORS{container!="",%s}[%s])) by (%s, %s)`
+	cfg := pds.promConfig
+
+	durStr := mustDurationString(queryName, start, end)
+	query := fmt.Sprintf(queryFmt, cfg.ClusterFilter, durStr, gpuSaturationByLabels, cfg.ClusterLabel)
+	return pds.queryGPUSaturation(queryName, query, end)
+}
+
+// queryGPUGaugeOverTime is the shared implementation for the DCP profiling
+// gauges that need only an avg or max over the window. The agg argument
+// names both the inner *_over_time function and the outer aggregation;
+// series are restricted to non-empty container labels and grouped by
+// gpuSaturationByLabels plus the cluster label. Panics via
+// mustDurationString when the window has no duration string.
+func (pds *PrometheusMetricsQuerier) queryGPUGaugeOverTime(queryName, metric, agg string, start, end time.Time) *source.Future[source.GPUSaturationResult] {
+	const queryFmt = `%s(%s_over_time(%s{container!="",%s}[%s])) by (%s, %s)`
+	cfg := pds.promConfig
+
+	durStr := mustDurationString(queryName, start, end)
+	query := fmt.Sprintf(queryFmt, agg, agg, metric, cfg.ClusterFilter, durStr, gpuSaturationByLabels, cfg.ClusterLabel)
+	return pds.queryGPUSaturation(queryName, query, end)
+}
+
+// queryGPUCounterRate is the shared implementation for the DCP byte
+// counters reported as average bytes/sec over the window: rate() of the
+// named metric across the whole window, averaged per group. Series are
+// restricted to non-empty container labels and grouped by
+// gpuSaturationByLabels plus the cluster label. Panics via
+// mustDurationString when the window has no duration string.
+func (pds *PrometheusMetricsQuerier) queryGPUCounterRate(queryName, metric string, start, end time.Time) *source.Future[source.GPUSaturationResult] {
+	const queryFmt = `avg(rate(%s{container!="",%s}[%s])) by (%s, %s)`
+	cfg := pds.promConfig
+
+	durStr := mustDurationString(queryName, start, end)
+	query := fmt.Sprintf(queryFmt, metric, cfg.ClusterFilter, durStr, gpuSaturationByLabels, cfg.ClusterLabel)
+	return pds.queryGPUSaturation(queryName, query, end)
+}
+
+// QueryGPUDRAMActiveAvg queries the average ratio of cycles the device
+// memory interface was active (DCGM_FI_PROF_DRAM_ACTIVE). Requires DCP
+// profiling.
+func (pds *PrometheusMetricsQuerier) QueryGPUDRAMActiveAvg(start, end time.Time) *source.Future[source.GPUSaturationResult] {
+	return pds.queryGPUGaugeOverTime("QueryGPUDRAMActiveAvg", "DCGM_FI_PROF_DRAM_ACTIVE", "avg", start, end)
+}
+
+// QueryGPUDRAMActiveMax queries the peak ratio of cycles the device memory
+// interface was active (DCGM_FI_PROF_DRAM_ACTIVE). Requires DCP profiling.
+func (pds *PrometheusMetricsQuerier) QueryGPUDRAMActiveMax(start, end time.Time) *source.Future[source.GPUSaturationResult] {
+	return pds.queryGPUGaugeOverTime("QueryGPUDRAMActiveMax", "DCGM_FI_PROF_DRAM_ACTIVE", "max", start, end)
+}
+
+// QueryGPUSMActiveAvg queries the average ratio of cycles at least one warp
+// was assigned to any SM (DCGM_FI_PROF_SM_ACTIVE). Requires DCP profiling
+// and explicit enablement in the dcgm-exporter configuration.
+func (pds *PrometheusMetricsQuerier) QueryGPUSMActiveAvg(start, end time.Time) *source.Future[source.GPUSaturationResult] {
+	return pds.queryGPUGaugeOverTime("QueryGPUSMActiveAvg", "DCGM_FI_PROF_SM_ACTIVE", "avg", start, end)
+}
+
+// QueryGPUSMOccupancyAvg queries the average ratio of resident warps to the
+// SM maximum (DCGM_FI_PROF_SM_OCCUPANCY). Requires DCP profiling and
+// explicit enablement in the dcgm-exporter configuration.
+func (pds *PrometheusMetricsQuerier) QueryGPUSMOccupancyAvg(start, end time.Time) *source.Future[source.GPUSaturationResult] {
+	return pds.queryGPUGaugeOverTime("QueryGPUSMOccupancyAvg", "DCGM_FI_PROF_SM_OCCUPANCY", "avg", start, end)
+}
+
+// QueryGPUPCIeTxBytesAvg queries average PCIe transmit throughput in
+// bytes/sec (rate of DCGM_FI_PROF_PCIE_TX_BYTES). Requires DCP profiling.
+func (pds *PrometheusMetricsQuerier) QueryGPUPCIeTxBytesAvg(start, end time.Time) *source.Future[source.GPUSaturationResult] {
+	return pds.queryGPUCounterRate("QueryGPUPCIeTxBytesAvg", "DCGM_FI_PROF_PCIE_TX_BYTES", start, end)
+}
+
+// QueryGPUPCIeRxBytesAvg queries average PCIe receive throughput in
+// bytes/sec (rate of DCGM_FI_PROF_PCIE_RX_BYTES). Requires DCP profiling.
+func (pds *PrometheusMetricsQuerier) QueryGPUPCIeRxBytesAvg(start, end time.Time) *source.Future[source.GPUSaturationResult] {
+	return pds.queryGPUCounterRate("QueryGPUPCIeRxBytesAvg", "DCGM_FI_PROF_PCIE_RX_BYTES", start, end)
+}
+
+// QueryGPUNVLinkTxBytesAvg queries average NVLink transmit throughput in
+// bytes/sec (rate of DCGM_FI_PROF_NVLINK_TX_BYTES). Requires DCP profiling
+// and explicit enablement in the dcgm-exporter configuration.
+func (pds *PrometheusMetricsQuerier) QueryGPUNVLinkTxBytesAvg(start, end time.Time) *source.Future[source.GPUSaturationResult] {
+	return pds.queryGPUCounterRate("QueryGPUNVLinkTxBytesAvg", "DCGM_FI_PROF_NVLINK_TX_BYTES", start, end)
+}
+
+// QueryGPUNVLinkRxBytesAvg queries average NVLink receive throughput in
+// bytes/sec (rate of DCGM_FI_PROF_NVLINK_RX_BYTES). Requires DCP profiling
+// and explicit enablement in the dcgm-exporter configuration.
+func (pds *PrometheusMetricsQuerier) QueryGPUNVLinkRxBytesAvg(start, end time.Time) *source.Future[source.GPUSaturationResult] {
+	return pds.queryGPUCounterRate("QueryGPUNVLinkRxBytesAvg", "DCGM_FI_PROF_NVLINK_RX_BYTES", start, end)
+}

+ 234 - 0
modules/prometheus-source/pkg/prom/gpusaturationquerier_test.go

@@ -0,0 +1,234 @@
+package prom
+
+import (
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/rs/zerolog"
+	zerologger "github.com/rs/zerolog/log"
+)
+
+func TestBuildGPUThrottleViolationQuery(t *testing.T) {
+	query := buildGPUThrottleViolationQuery(`cluster_id="c1"`, "1h", "cluster_id", 3600)
+
+	branches := strings.Split(query, " or ")
+	if len(branches) != 4 {
+		t.Fatalf("expected 4 violation branches, got %d: %s", len(branches), query)
+	}
+
+	// one hour is 3.6e9 microseconds: each branch must normalize by it
+	wantBranch := `label_replace(avg(increase(DCGM_FI_DEV_POWER_VIOLATION{container!="",cluster_id="c1"}[1h])) by (container, pod, namespace, device, modelName, UUID, GPU_I_PROFILE, GPU_I_ID, pod_uid, cluster_id) / 3.6e+09, "reason", "power", "", "")`
+	if branches[0] != wantBranch {
+		t.Errorf("violation branch mismatch:\n got %s\nwant %s", branches[0], wantBranch)
+	}
+
+	for metric, reason := range map[string]string{
+		"DCGM_FI_DEV_POWER_VIOLATION":       "power",
+		"DCGM_FI_DEV_THERMAL_VIOLATION":     "thermal",
+		"DCGM_FI_DEV_SYNC_BOOST_VIOLATION":  "sync_boost",
+		"DCGM_FI_DEV_BOARD_LIMIT_VIOLATION": "board_limit",
+	} {
+		if !strings.Contains(query, metric) {
+			t.Errorf("expected query to reference %s", metric)
+		}
+		if !strings.Contains(query, `"reason", "`+reason+`"`) {
+			t.Errorf("expected query to tag reason %q", reason)
+		}
+	}
+}
+
+func TestBuildGPUThrottleReasonQuery(t *testing.T) {
+	query := buildGPUThrottleReasonQuery(`cluster_id="c1"`, "1h", "cluster_id", 5)
+
+	// one label_replace-wrapped branch per saturation-relevant reason bit
+	if got := strings.Count(query, "label_replace"); got != 6 {
+		t.Fatalf("expected 6 reason branches, got %d: %s", got, query)
+	}
+
+	// the first branch tests the sw_power_cap bit (0x4 == 4) per sample at
+	// the subquery resolution, then averages the 0/1 results over the window
+	wantBranch := `label_replace(avg(avg_over_time(((floor((DCGM_FI_DEV_CLOCK_THROTTLE_REASONS{container!="",cluster_id="c1"} or DCGM_FI_DEV_CLOCKS_EVENT_REASONS{container!="",cluster_id="c1"}) / 4)) % 2)[1h:5m])) by (container, pod, namespace, device, modelName, UUID, GPU_I_PROFILE, GPU_I_ID, pod_uid, cluster_id), "reason", "sw_power_cap", "", "")`
+	if !strings.HasPrefix(query, wantBranch+" or ") {
+		t.Errorf("reason query does not start with expected sw_power_cap branch:\n got %s\nwant prefix %s", query, wantBranch)
+	}
+
+	for reason, bit := range map[string]string{
+		"sw_power_cap":   "/ 4",
+		"hw_slowdown":    "/ 8",
+		"sync_boost":     "/ 16",
+		"sw_thermal":     "/ 32",
+		"hw_thermal":     "/ 64",
+		"hw_power_brake": "/ 128",
+	} {
+		if !strings.Contains(query, `"reason", "`+reason+`"`) {
+			t.Errorf("expected query to tag reason %q", reason)
+		}
+		if !strings.Contains(query, bit) {
+			t.Errorf("expected query to test bit via %q for reason %q", bit, reason)
+		}
+	}
+}
+
+// TestGPUSaturationQueries runs every saturation and device query against the
+// no-op client and inspects the query text captured from the debug log.
+//
+// Saturation queries must log their query name, reference the expected DCGM
+// source metric (plus an optional extra fragment), carry the cluster filter,
+// and group by the saturation label set. Device queries must reference their
+// metric, carry the cluster filter, and group by the device label set without
+// container attribution.
+//
+// The captured log is cleared before each query so that a query which logs
+// nothing fails its own subtest instead of being checked against output left
+// behind by an earlier one.
+func TestGPUSaturationQueries(t *testing.T) {
+	initLogging(t, "debug", false)
+
+	// Route the global zerolog logger into logWriter so each subtest can read
+	// back the text logged by the query it just issued.
+	logWriter := new(SingleLogWriter)
+	zerologger.Logger = zerologger.Output(zerolog.ConsoleWriter{
+		Out:        logWriter,
+		TimeFormat: "",
+		NoColor:    true,
+		PartsExclude: []string{
+			zerolog.TimestampFieldName,
+			zerolog.LevelFieldName,
+			zerolog.CallerFieldName,
+		},
+	})
+	// NOTE(review): presumably re-installs the default logger once the test
+	// is done; confirm against initLogging.
+	defer initLogging(t, "debug", false)
+
+	t.Setenv("PROMETHEUS_SERVER_ENDPOINT", "nowhere")
+	t.Setenv("CURRENT_CLUSTER_ID_FILTER_ENABLED", "true")
+	t.Setenv("CLUSTER_ID", "test-cluster")
+	t.Setenv("GPU_MEMORY_SATURATION_THRESHOLD", "0.8")
+
+	config, err := NewOpenCostPrometheusConfigFromEnv()
+	if err != nil {
+		t.Fatalf("Failed to create OpenCost Prometheus config: %v", err)
+	}
+
+	mock := new(NoOpPromClient)
+	contextFactory := NewContextFactory(mock, config)
+	querier := newPrometheusMetricsQuerier(config, mock, contextFactory)
+
+	queryEnd := time.Now().UTC().Truncate(time.Hour).Add(time.Hour)
+	queryStart := queryEnd.Add(-24 * time.Hour)
+
+	// runQuery clears the captured log, issues the query over the test window
+	// and returns whatever was logged while it ran. Without the reset, a
+	// silent query would be judged on a previous subtest's output; because
+	// the tables below are maps with random iteration order, such a false
+	// pass would also be intermittent.
+	runQuery := func(query func(time.Time, time.Time)) string {
+		logWriter.Log = ""
+		query(queryStart, queryEnd)
+		return logWriter.Log
+	}
+
+	// Both query families are scoped by the same cluster filter, derived from
+	// the CLUSTER_ID value set above.
+	const wantFilter = `cluster_id="test-cluster"`
+
+	tests := map[string]struct {
+		query      func(time.Time, time.Time)
+		wantMetric string
+		wantExtra  string
+	}{
+		"QueryGPUThrottleViolationRatio": {
+			query:      func(s, e time.Time) { querier.QueryGPUThrottleViolationRatio(s, e) },
+			wantMetric: "DCGM_FI_DEV_POWER_VIOLATION",
+		},
+		"QueryGPUThrottleReasonRatio": {
+			query:      func(s, e time.Time) { querier.QueryGPUThrottleReasonRatio(s, e) },
+			wantMetric: "DCGM_FI_DEV_CLOCK_THROTTLE_REASONS",
+			wantExtra:  "DCGM_FI_DEV_CLOCKS_EVENT_REASONS",
+		},
+		"QueryGPUMemoryUsedRatioAvg": {
+			query:      func(s, e time.Time) { querier.QueryGPUMemoryUsedRatioAvg(s, e) },
+			wantMetric: "DCGM_FI_DEV_FB_USED",
+			wantExtra:  "DCGM_FI_DEV_FB_FREE",
+		},
+		"QueryGPUMemoryUsedRatioMax": {
+			query:      func(s, e time.Time) { querier.QueryGPUMemoryUsedRatioMax(s, e) },
+			wantMetric: "DCGM_FI_DEV_FB_USED",
+			wantExtra:  "max_over_time",
+		},
+		"QueryGPUMemoryPressureRatio": {
+			query:      func(s, e time.Time) { querier.QueryGPUMemoryPressureRatio(s, e) },
+			wantMetric: "DCGM_FI_DEV_FB_USED",
+			wantExtra:  ">= bool 0.8",
+		},
+		"QueryGPUXIDErrorCount": {
+			query:      func(s, e time.Time) { querier.QueryGPUXIDErrorCount(s, e) },
+			wantMetric: "DCGM_FI_DEV_XID_ERRORS",
+			wantExtra:  "changes(",
+		},
+		"QueryGPUDRAMActiveAvg": {
+			query:      func(s, e time.Time) { querier.QueryGPUDRAMActiveAvg(s, e) },
+			wantMetric: "DCGM_FI_PROF_DRAM_ACTIVE",
+			wantExtra:  "avg_over_time",
+		},
+		"QueryGPUDRAMActiveMax": {
+			query:      func(s, e time.Time) { querier.QueryGPUDRAMActiveMax(s, e) },
+			wantMetric: "DCGM_FI_PROF_DRAM_ACTIVE",
+			wantExtra:  "max_over_time",
+		},
+		"QueryGPUSMActiveAvg": {
+			query:      func(s, e time.Time) { querier.QueryGPUSMActiveAvg(s, e) },
+			wantMetric: "DCGM_FI_PROF_SM_ACTIVE",
+		},
+		"QueryGPUSMOccupancyAvg": {
+			query:      func(s, e time.Time) { querier.QueryGPUSMOccupancyAvg(s, e) },
+			wantMetric: "DCGM_FI_PROF_SM_OCCUPANCY",
+		},
+		"QueryGPUPCIeTxBytesAvg": {
+			query:      func(s, e time.Time) { querier.QueryGPUPCIeTxBytesAvg(s, e) },
+			wantMetric: "DCGM_FI_PROF_PCIE_TX_BYTES",
+			wantExtra:  "rate(",
+		},
+		"QueryGPUPCIeRxBytesAvg": {
+			query:      func(s, e time.Time) { querier.QueryGPUPCIeRxBytesAvg(s, e) },
+			wantMetric: "DCGM_FI_PROF_PCIE_RX_BYTES",
+		},
+		"QueryGPUNVLinkTxBytesAvg": {
+			query:      func(s, e time.Time) { querier.QueryGPUNVLinkTxBytesAvg(s, e) },
+			wantMetric: "DCGM_FI_PROF_NVLINK_TX_BYTES",
+		},
+		"QueryGPUNVLinkRxBytesAvg": {
+			query:      func(s, e time.Time) { querier.QueryGPUNVLinkRxBytesAvg(s, e) },
+			wantMetric: "DCGM_FI_PROF_NVLINK_RX_BYTES",
+		},
+	}
+
+	deviceTests := map[string]struct {
+		query      func(time.Time, time.Time)
+		wantMetric string
+	}{
+		"QueryGPUDevicePowerAvg":      {func(s, e time.Time) { querier.QueryGPUDevicePowerAvg(s, e) }, "DCGM_FI_DEV_POWER_USAGE"},
+		"QueryGPUDeviceTempAvg":       {func(s, e time.Time) { querier.QueryGPUDeviceTempAvg(s, e) }, "DCGM_FI_DEV_GPU_TEMP"},
+		"QueryGPUDeviceUsageAvg":      {func(s, e time.Time) { querier.QueryGPUDeviceUsageAvg(s, e) }, "DCGM_FI_PROF_GR_ENGINE_ACTIVE"},
+		"QueryGPUDeviceUsageMax":      {func(s, e time.Time) { querier.QueryGPUDeviceUsageMax(s, e) }, "DCGM_FI_PROF_GR_ENGINE_ACTIVE"},
+		"QueryGPUDeviceMemoryUsedAvg": {func(s, e time.Time) { querier.QueryGPUDeviceMemoryUsedAvg(s, e) }, "DCGM_FI_DEV_FB_USED"},
+		"QueryGPUDeviceMemoryUsedMax": {func(s, e time.Time) { querier.QueryGPUDeviceMemoryUsedMax(s, e) }, "DCGM_FI_DEV_FB_USED"},
+	}
+
+	for testName, tc := range deviceTests {
+		t.Run(testName, func(t *testing.T) {
+			logged := runQuery(tc.query)
+			if logged == "" {
+				t.Fatalf("expected %s to log its query, but nothing was captured", testName)
+			}
+
+			if !strings.Contains(logged, tc.wantMetric) {
+				t.Errorf("expected query to reference %q, got: %s", tc.wantMetric, logged)
+			}
+			if !strings.Contains(logged, wantFilter) {
+				t.Errorf("expected query to contain cluster filter %q, got: %s", wantFilter, logged)
+			}
+			// device-level grouping: no container attribution
+			if !strings.Contains(logged, gpuDeviceByLabels) || strings.Contains(logged, "container,") {
+				t.Errorf("expected device-level grouping %q without container, got: %s", gpuDeviceByLabels, logged)
+			}
+		})
+	}
+
+	for testName, tc := range tests {
+		t.Run(testName, func(t *testing.T) {
+			logged := runQuery(tc.query)
+			if logged == "" {
+				t.Fatalf("expected %s to log its query, but nothing was captured", testName)
+			}
+
+			if !strings.Contains(logged, testName) {
+				t.Errorf("expected log to contain query name %q, got: %s", testName, logged)
+			}
+			if !strings.Contains(logged, tc.wantMetric) {
+				t.Errorf("expected query to reference %q, got: %s", tc.wantMetric, logged)
+			}
+			if tc.wantExtra != "" && !strings.Contains(logged, tc.wantExtra) {
+				t.Errorf("expected query to contain %q, got: %s", tc.wantExtra, logged)
+			}
+			if !strings.Contains(logged, wantFilter) {
+				t.Errorf("expected query to contain cluster filter %q, got: %s", wantFilter, logged)
+			}
+			if !strings.Contains(logged, gpuSaturationByLabels) {
+				t.Errorf("expected query to group by %q, got: %s", gpuSaturationByLabels, logged)
+			}
+		})
+	}
+}