gpusaturationquerier.go 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. package collector
import (
	"errors"
	"fmt"
	"time"

	"github.com/opencost/opencost/core/pkg/opencost"
	"github.com/opencost/opencost/core/pkg/source"
	"github.com/opencost/opencost/modules/collector-source/pkg/metric"
	"github.com/opencost/opencost/modules/collector-source/pkg/metric/aggregator"
)
  10. // GPU saturation queries over the collector store. These mirror the
  11. // prometheus-source queries: same DataSource method names, same
  12. // GPUSaturationResult semantics. Signals whose DCGM field was never scraped
  13. // produce no results, which downstream treats as absent rather than zero.
  14. // gpuSaturationResultsFuture wraps a set of MetricResults into the shared
  15. // GPUSaturationResult future shape.
  16. func gpuSaturationResultsFuture(name string, results []*aggregator.MetricResult, err error) *source.Future[source.GPUSaturationResult] {
  17. queryResults := source.NewQueryResults(name)
  18. queryResults.Error = err
  19. for _, result := range results {
  20. queryResults.Results = append(queryResults.Results, result.ToQueryResult())
  21. }
  22. ch := make(source.QueryResultsChan, 1)
  23. ch <- queryResults
  24. return source.NewFuture(source.DecodeGPUSaturationResult, ch)
  25. }
  26. // queryGPUReasonTagged queries one collector per reason, tags each result
  27. // with the reason label, and optionally transforms every value.
  28. func (c *collectorMetricsQuerier) queryGPUReasonTagged(name string, start, end time.Time, idReasons map[metric.MetricCollectorID]string, transform func(float64) float64) *source.Future[source.GPUSaturationResult] {
  29. var tagged []*aggregator.MetricResult
  30. var firstErr error
  31. collector := c.collectorProvider.GetStore(start, end)
  32. if collector != nil {
  33. for id, reason := range idReasons {
  34. results, err := collector.Query(id)
  35. if err != nil {
  36. if firstErr == nil {
  37. firstErr = err
  38. }
  39. continue
  40. }
  41. for _, result := range results {
  42. if result.MetricLabels == nil {
  43. result.MetricLabels = map[string]string{}
  44. }
  45. result.MetricLabels[source.ReasonLabel] = reason
  46. if transform != nil {
  47. for i := range result.Values {
  48. result.Values[i].Value = transform(result.Values[i].Value)
  49. }
  50. }
  51. tagged = append(tagged, result)
  52. }
  53. }
  54. }
  55. return gpuSaturationResultsFuture(name, tagged, firstErr)
  56. }
  57. // QueryGPUThrottleViolationRatio reports the fraction of the window each GPU
  58. // spent throttled, per reason, from the DCGM violation microsecond counters.
  59. func (c *collectorMetricsQuerier) QueryGPUThrottleViolationRatio(start, end time.Time) *source.Future[source.GPUSaturationResult] {
  60. idReasons := make(map[metric.MetricCollectorID]string, len(gpuThrottleViolationCollectors))
  61. for _, violation := range gpuThrottleViolationCollectors {
  62. idReasons[violation.ID] = violation.Reason
  63. }
  64. windowMicros := float64(end.Sub(start).Microseconds())
  65. if windowMicros <= 0 {
  66. return gpuSaturationResultsFuture("GPUThrottleViolationRatio", nil, fmt.Errorf("invalid window for GPUThrottleViolationRatio: %s to %s", start, end))
  67. }
  68. return c.queryGPUReasonTagged("GPUThrottleViolationRatio", start, end, idReasons, func(increaseMicros float64) float64 {
  69. return increaseMicros / windowMicros
  70. })
  71. }
  72. // QueryGPUThrottleReasonRatio reports the fraction of scraped samples in
  73. // which each saturation-relevant bit of the clock throttle reasons bitmask
  74. // was set. Both DCGM field names are queried; at most one is ever scraped.
  75. func (c *collectorMetricsQuerier) QueryGPUThrottleReasonRatio(start, end time.Time) *source.Future[source.GPUSaturationResult] {
  76. idReasons := make(map[metric.MetricCollectorID]string, 2*len(opencost.GPUThrottleReasons))
  77. for _, metricName := range gpuThrottleBitmaskMetrics {
  78. for _, reason := range opencost.GPUThrottleReasons {
  79. idReasons[metric.GPUThrottleReasonCollectorID(metricName, reason.Name)] = reason.Name
  80. }
  81. }
  82. return c.queryGPUReasonTagged("GPUThrottleReasonRatio", start, end, idReasons, nil)
  83. }
  84. // QueryGPUMemoryUsedRatioAvg reports average framebuffer occupancy over the
  85. // window: FB_USED / (FB_USED + FB_FREE), aggregated from the per-sample
  86. // occupancy ratio synthesized at scrape time.
  87. func (c *collectorMetricsQuerier) QueryGPUMemoryUsedRatioAvg(start, end time.Time) *source.Future[source.GPUSaturationResult] {
  88. return queryCollector(c, start, end, metric.GPUMemoryUsedAvgID, source.DecodeGPUSaturationResult)
  89. }
  90. // QueryGPUMemoryUsedRatioMax reports peak framebuffer occupancy over the
  91. // window.
  92. func (c *collectorMetricsQuerier) QueryGPUMemoryUsedRatioMax(start, end time.Time) *source.Future[source.GPUSaturationResult] {
  93. return queryCollector(c, start, end, metric.GPUMemoryUsedMaxID, source.DecodeGPUSaturationResult)
  94. }
  95. // QueryGPUMemoryPressureRatio reports the fraction of scraped samples in
  96. // which framebuffer occupancy was at or above the configured threshold,
  97. // from the same synthesized per-sample occupancy ratio.
  98. func (c *collectorMetricsQuerier) QueryGPUMemoryPressureRatio(start, end time.Time) *source.Future[source.GPUSaturationResult] {
  99. return queryCollector(c, start, end, metric.GPUMemoryPressureRatioID, source.DecodeGPUSaturationResult)
  100. }
  101. // QueryGPUXIDErrorCount reports the number of XID error transitions
  102. // observed in the window.
  103. func (c *collectorMetricsQuerier) QueryGPUXIDErrorCount(start, end time.Time) *source.Future[source.GPUSaturationResult] {
  104. return queryCollector(c, start, end, metric.GPUXIDErrorCountID, source.DecodeGPUSaturationResult)
  105. }
  106. // QueryGPUDRAMActiveAvg reports the average ratio of cycles the device
  107. // memory interface was active. Requires DCP profiling.
  108. func (c *collectorMetricsQuerier) QueryGPUDRAMActiveAvg(start, end time.Time) *source.Future[source.GPUSaturationResult] {
  109. return queryCollector(c, start, end, metric.GPUDRAMActiveAvgID, source.DecodeGPUSaturationResult)
  110. }
  111. // QueryGPUDRAMActiveMax reports the peak ratio of cycles the device memory
  112. // interface was active. Requires DCP profiling.
  113. func (c *collectorMetricsQuerier) QueryGPUDRAMActiveMax(start, end time.Time) *source.Future[source.GPUSaturationResult] {
  114. return queryCollector(c, start, end, metric.GPUDRAMActiveMaxID, source.DecodeGPUSaturationResult)
  115. }
  116. // QueryGPUSMActiveAvg reports the average ratio of cycles at least one warp
  117. // was resident on any SM. Requires DCP profiling and explicit enablement.
  118. func (c *collectorMetricsQuerier) QueryGPUSMActiveAvg(start, end time.Time) *source.Future[source.GPUSaturationResult] {
  119. return queryCollector(c, start, end, metric.GPUSMActiveAvgID, source.DecodeGPUSaturationResult)
  120. }
  121. // QueryGPUSMOccupancyAvg reports the average ratio of resident warps to the
  122. // SM maximum. Requires DCP profiling and explicit enablement.
  123. func (c *collectorMetricsQuerier) QueryGPUSMOccupancyAvg(start, end time.Time) *source.Future[source.GPUSaturationResult] {
  124. return queryCollector(c, start, end, metric.GPUSMOccupancyAvgID, source.DecodeGPUSaturationResult)
  125. }
  126. // QueryGPUPCIeTxBytesAvg reports average PCIe transmit throughput in
  127. // bytes/sec. Requires DCP profiling.
  128. func (c *collectorMetricsQuerier) QueryGPUPCIeTxBytesAvg(start, end time.Time) *source.Future[source.GPUSaturationResult] {
  129. return queryCollector(c, start, end, metric.GPUPCIeTxBytesAvgID, source.DecodeGPUSaturationResult)
  130. }
  131. // QueryGPUPCIeRxBytesAvg reports average PCIe receive throughput in
  132. // bytes/sec. Requires DCP profiling.
  133. func (c *collectorMetricsQuerier) QueryGPUPCIeRxBytesAvg(start, end time.Time) *source.Future[source.GPUSaturationResult] {
  134. return queryCollector(c, start, end, metric.GPUPCIeRxBytesAvgID, source.DecodeGPUSaturationResult)
  135. }
  136. // QueryGPUNVLinkTxBytesAvg reports average NVLink transmit throughput in
  137. // bytes/sec. Requires DCP profiling and explicit enablement.
  138. func (c *collectorMetricsQuerier) QueryGPUNVLinkTxBytesAvg(start, end time.Time) *source.Future[source.GPUSaturationResult] {
  139. return queryCollector(c, start, end, metric.GPUNVLinkTxBytesAvgID, source.DecodeGPUSaturationResult)
  140. }
  141. // QueryGPUNVLinkRxBytesAvg reports average NVLink receive throughput in
  142. // bytes/sec. Requires DCP profiling and explicit enablement.
  143. func (c *collectorMetricsQuerier) QueryGPUNVLinkRxBytesAvg(start, end time.Time) *source.Future[source.GPUSaturationResult] {
  144. return queryCollector(c, start, end, metric.GPUNVLinkRxBytesAvgID, source.DecodeGPUSaturationResult)
  145. }
  146. // Device-level GPU metric queries (DeviceInfo / DevicePerformance support).
  147. // QueryGPUDevicePowerAvg reports average device power draw in watts.
  148. func (c *collectorMetricsQuerier) QueryGPUDevicePowerAvg(start, end time.Time) *source.Future[source.GPUDeviceMetricResult] {
  149. return queryCollector(c, start, end, metric.GPUDevicePowerAvgID, source.DecodeGPUDeviceMetricResult)
  150. }
  151. // QueryGPUDeviceTempAvg reports average device temperature in Celsius.
  152. func (c *collectorMetricsQuerier) QueryGPUDeviceTempAvg(start, end time.Time) *source.Future[source.GPUDeviceMetricResult] {
  153. return queryCollector(c, start, end, metric.GPUDeviceTempAvgID, source.DecodeGPUDeviceMetricResult)
  154. }
  155. // QueryGPUDeviceUsageAvg reports average device-level compute utilization
  156. // as a 0-1 ratio.
  157. func (c *collectorMetricsQuerier) QueryGPUDeviceUsageAvg(start, end time.Time) *source.Future[source.GPUDeviceMetricResult] {
  158. return queryCollector(c, start, end, metric.GPUDeviceUsageAvgID, source.DecodeGPUDeviceMetricResult)
  159. }
  160. // QueryGPUDeviceUsageMax reports peak device-level compute utilization as a
  161. // 0-1 ratio.
  162. func (c *collectorMetricsQuerier) QueryGPUDeviceUsageMax(start, end time.Time) *source.Future[source.GPUDeviceMetricResult] {
  163. return queryCollector(c, start, end, metric.GPUDeviceUsageMaxID, source.DecodeGPUDeviceMetricResult)
  164. }
  165. // QueryGPUDeviceMemoryUsedAvg reports average framebuffer used in MiB.
  166. func (c *collectorMetricsQuerier) QueryGPUDeviceMemoryUsedAvg(start, end time.Time) *source.Future[source.GPUDeviceMetricResult] {
  167. return queryCollector(c, start, end, metric.GPUDeviceMemoryUsedAvgID, source.DecodeGPUDeviceMetricResult)
  168. }
  169. // QueryGPUDeviceMemoryUsedMax reports peak framebuffer used in MiB.
  170. func (c *collectorMetricsQuerier) QueryGPUDeviceMemoryUsedMax(start, end time.Time) *source.Future[source.GPUDeviceMetricResult] {
  171. return queryCollector(c, start, end, metric.GPUDeviceMemoryUsedMaxID, source.DecodeGPUDeviceMetricResult)
  172. }