inference_queries.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366
  1. package prom
  2. import (
  3. "fmt"
  4. "strings"
  5. "time"
  6. "github.com/opencost/opencost/core/pkg/log"
  7. "github.com/opencost/opencost/core/pkg/source"
  8. "github.com/opencost/opencost/core/pkg/util"
  9. )
  10. // QueryInferencePromptTokens implements MetricsQuerier.QueryInferencePromptTokens
  11. func (pds *PrometheusMetricsQuerier) QueryInferencePromptTokens(start, end time.Time) *source.Future[source.InferenceTokensResult] {
  12. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  13. // Create a channel for the async result
  14. resultsChan := make(source.QueryResultsChan, 1)
  15. // Execute query asynchronously
  16. go func() {
  17. values, err := queryCounterDelta(ctx, "vllm:prompt_tokens_total", start, end)
  18. if err != nil {
  19. resultsChan <- &source.QueryResults{Error: err}
  20. return
  21. }
  22. // Convert map to QueryResults format
  23. results := mapToQueryResults(values)
  24. resultsChan <- &source.QueryResults{Results: results}
  25. }()
  26. return source.NewFuture(decodeInferenceTokensResult, resultsChan)
  27. }
  28. // QueryInferenceGenerationTokens implements MetricsQuerier.QueryInferenceGenerationTokens
  29. func (pds *PrometheusMetricsQuerier) QueryInferenceGenerationTokens(start, end time.Time) *source.Future[source.InferenceTokensResult] {
  30. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  31. resultsChan := make(source.QueryResultsChan, 1)
  32. go func() {
  33. values, err := queryCounterDelta(ctx, "vllm:generation_tokens_total", start, end)
  34. if err != nil {
  35. resultsChan <- &source.QueryResults{Error: err}
  36. return
  37. }
  38. results := mapToQueryResults(values)
  39. resultsChan <- &source.QueryResults{Results: results}
  40. }()
  41. return source.NewFuture(decodeInferenceTokensResult, resultsChan)
  42. }
  43. // QueryInferenceInputProcessingTime implements MetricsQuerier.QueryInferenceInputProcessingTime
  44. func (pds *PrometheusMetricsQuerier) QueryInferenceInputProcessingTime(start, end time.Time) *source.Future[source.InferenceProcessingTimeResult] {
  45. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  46. resultsChan := make(source.QueryResultsChan, 1)
  47. go func() {
  48. values, err := queryCounterDelta(ctx, "vllm:request_prefill_time_seconds_sum", start, end)
  49. if err != nil {
  50. resultsChan <- &source.QueryResults{Error: err}
  51. return
  52. }
  53. results := mapToQueryResults(values)
  54. resultsChan <- &source.QueryResults{Results: results}
  55. }()
  56. return source.NewFuture(decodeInferenceProcessingTimeResult, resultsChan)
  57. }
  58. // QueryInferenceOutputProcessingTime implements MetricsQuerier.QueryInferenceOutputProcessingTime
  59. func (pds *PrometheusMetricsQuerier) QueryInferenceOutputProcessingTime(start, end time.Time) *source.Future[source.InferenceProcessingTimeResult] {
  60. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  61. resultsChan := make(source.QueryResultsChan, 1)
  62. go func() {
  63. values, err := queryCounterDelta(ctx, "vllm:request_time_per_output_token_seconds_sum", start, end)
  64. if err != nil {
  65. resultsChan <- &source.QueryResults{Error: err}
  66. return
  67. }
  68. results := mapToQueryResults(values)
  69. resultsChan <- &source.QueryResults{Results: results}
  70. }()
  71. return source.NewFuture(decodeInferenceProcessingTimeResult, resultsChan)
  72. }
  73. // QueryInferenceCachedTokens implements MetricsQuerier.QueryInferenceCachedTokens
  74. func (pds *PrometheusMetricsQuerier) QueryInferenceCachedTokens(start, end time.Time) *source.Future[source.InferenceTokensResult] {
  75. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  76. resultsChan := make(source.QueryResultsChan, 1)
  77. go func() {
  78. values, err := queryCounterDelta(ctx, "vllm:prefix_cache_hits_total", start, end)
  79. if err != nil {
  80. resultsChan <- &source.QueryResults{Error: err}
  81. return
  82. }
  83. results := mapToQueryResults(values)
  84. resultsChan <- &source.QueryResults{Results: results}
  85. }()
  86. return source.NewFuture(decodeInferenceTokensResult, resultsChan)
  87. }
  88. // QueryInferenceCacheConfig implements MetricsQuerier.QueryInferenceCacheConfig
  89. func (pds *PrometheusMetricsQuerier) QueryInferenceCacheConfig(t time.Time) *source.Future[source.InferenceCacheConfigResult] {
  90. ctx := pds.promContexts.NewNamedContext(ClusterContextName)
  91. resultsChan := make(source.QueryResultsChan, 1)
  92. go func() {
  93. configs, err := queryCacheConfigs(ctx, t)
  94. if err != nil {
  95. resultsChan <- &source.QueryResults{Error: err}
  96. return
  97. }
  98. results := cacheConfigMapToQueryResults(configs)
  99. resultsChan <- &source.QueryResults{Results: results}
  100. }()
  101. return source.NewFuture(decodeInferenceCacheConfigResult, resultsChan)
  102. }
  103. // Decoder functions
  104. func decodeInferenceTokensResult(result *source.QueryResult) *source.InferenceTokensResult {
  105. key, _ := result.GetString("key")
  106. value := result.Values[0].Value
  107. return &source.InferenceTokensResult{
  108. Values: map[string]float64{key: value},
  109. }
  110. }
  111. func decodeInferenceProcessingTimeResult(result *source.QueryResult) *source.InferenceProcessingTimeResult {
  112. key, _ := result.GetString("key")
  113. value := result.Values[0].Value
  114. return &source.InferenceProcessingTimeResult{
  115. Values: map[string]float64{key: value},
  116. }
  117. }
  118. func decodeInferenceCacheConfigResult(result *source.QueryResult) *source.InferenceCacheConfigResult {
  119. key, _ := result.GetString("key")
  120. enabled := result.Values[0].Value > 0
  121. return &source.InferenceCacheConfigResult{
  122. Configs: map[string]*source.InferenceCacheConfig{
  123. key: {PrefixCachingEnabled: enabled},
  124. },
  125. }
  126. }
  127. // Helper functions
  128. // mapToQueryResults converts a map[string]float64 to []*QueryResult
  129. func mapToQueryResults(values map[string]float64) []*source.QueryResult {
  130. results := make([]*source.QueryResult, 0, len(values))
  131. for key, value := range values {
  132. result := source.NewQueryResult(
  133. map[string]any{"key": key},
  134. []*util.Vector{{Value: value}},
  135. nil,
  136. )
  137. results = append(results, result)
  138. }
  139. return results
  140. }
  141. // cacheConfigMapToQueryResults converts a map[string]*InferenceCacheConfig to []*QueryResult
  142. func cacheConfigMapToQueryResults(configs map[string]*source.InferenceCacheConfig) []*source.QueryResult {
  143. results := make([]*source.QueryResult, 0, len(configs))
  144. for key, config := range configs {
  145. value := 0.0
  146. if config.PrefixCachingEnabled {
  147. value = 1.0
  148. }
  149. result := source.NewQueryResult(
  150. map[string]any{"key": key},
  151. []*util.Vector{{Value: value}},
  152. nil,
  153. )
  154. results = append(results, result)
  155. }
  156. return results
  157. }
  158. // queryCounterDelta returns the net increase of a monotonic counter metric
  159. // over [start, end] per (model_name, namespace).
  160. //
  161. // It uses the @ modifier to pin two instant queries to start and end,
  162. // then subtracts. This avoids the extrapolation inflation produced by
  163. // increase(metric[Xm]) when a series has fewer samples than the window
  164. // (e.g. a pod that restarted mid-window, or a sum across many replicas
  165. // where Prometheus extrapolates each series independently before summing).
  166. //
  167. // last_over_time(metric[2m] @ t) fetches the most recent sample within 2
  168. // minutes of t. 2 minutes covers the default 30s scrape interval with margin.
  169. // Series with no sample near start get a start-value of 0 (treated as new),
  170. // which is the correct behaviour for pods that started mid-window.
  171. // Negative deltas (counter resets) are treated as resets and the delta is set to the end value (post-reset activity).
  172. func queryCounterDelta(ctx *Context, metric string, start, end time.Time) (map[string]float64, error) {
  173. startUnix := start.Unix()
  174. // Clamp end to now: last_over_time with a future @ timestamp returns no results.
  175. effectiveEnd := end
  176. if now := time.Now(); end.After(now) {
  177. effectiveEnd = now
  178. }
  179. endUnix := effectiveEnd.Unix()
  180. // The lookback for last_over_time must span the full window duration.
  181. // A model that was active earlier in the window but idle at query time
  182. // will have its last sample somewhere within the window — a narrow 2m
  183. // lookback would miss it entirely. Using the window duration as the
  184. // lookback guarantees we find the last sample that existed anywhere in
  185. // the window, while the @ pin ensures we don't extrapolate past end.
  186. windowDuration := effectiveEnd.Sub(start)
  187. windowMinutes := int(windowDuration.Minutes())
  188. if windowMinutes < 2 {
  189. windowMinutes = 2
  190. }
  191. // Query counter value at the end of the window.
  192. endQuery := fmt.Sprintf(`sum by (model_name, namespace) (last_over_time(%s[%dm] @ %d))`, metric, windowMinutes, endUnix)
  193. endVals, err := queryInstantMetric(ctx, endQuery, effectiveEnd)
  194. if err != nil {
  195. return nil, fmt.Errorf("end-of-window query for %s: %w", metric, err)
  196. }
  197. // Query counter value at the start of the window.
  198. // Use a narrow 2m lookback here: we want the value just before the window
  199. // opens, not a stale value from much earlier that would undercount the delta.
  200. startQuery := fmt.Sprintf(`sum by (model_name, namespace) (last_over_time(%s[2m] @ %d))`, metric, startUnix)
  201. startVals, err := queryInstantMetric(ctx, startQuery, effectiveEnd)
  202. if err != nil {
  203. return nil, fmt.Errorf("start-of-window query for %s: %w", metric, err)
  204. }
  205. // Delta = end - start. If negative (counter reset), use endVal as a
  206. // lower bound to capture post-reset activity rather than reporting 0.
  207. out := make(map[string]float64, len(endVals))
  208. for key, endVal := range endVals {
  209. delta := endVal - startVals[key]
  210. if delta < 0 {
  211. // Counter reset detected: use endVal to capture post-reset activity
  212. delta = endVal
  213. }
  214. out[key] = delta
  215. }
  216. return out, nil
  217. }
  218. // queryCacheConfigs queries vllm:cache_config_info joined with token metrics
  219. // to get enable_prefix_caching per (model_name, namespace).
  220. // When the join produces no results for a model that has token data, a warning
  221. // is emitted to aid diagnosis of pod-label mismatches.
  222. func queryCacheConfigs(ctx *Context, t time.Time) (map[string]*source.InferenceCacheConfig, error) {
  223. // Join cache_config_info (has enable_prefix_caching label) with
  224. // prompt_tokens_total (has model_name) using namespace+pod as the join key.
  225. query := `
  226. max by (model_name, namespace, enable_prefix_caching) (
  227. sum by (model_name, namespace, pod) (vllm:prompt_tokens_total)
  228. * on (namespace, pod) group_left(enable_prefix_caching)
  229. max by (namespace, pod, enable_prefix_caching) (vllm:cache_config_info)
  230. )
  231. `
  232. raw, _, err := ctx.query(query, t)
  233. if err != nil {
  234. return nil, err
  235. }
  236. results := NewQueryResults(query, raw, source.ClusterKeyWithDefaults(ctx.config.ClusterLabel))
  237. if results.Error != nil {
  238. return nil, results.Error
  239. }
  240. out := make(map[string]*source.InferenceCacheConfig)
  241. for _, result := range results.Results {
  242. modelName, err := result.GetString("model_name")
  243. if err != nil || modelName == "" {
  244. continue
  245. }
  246. namespace, err := result.GetString("namespace")
  247. if err != nil || namespace == "" {
  248. namespace = "unknown"
  249. }
  250. enablePrefixCaching, err := result.GetString("enable_prefix_caching")
  251. if err != nil {
  252. continue
  253. }
  254. prefixCachingEnabled := strings.EqualFold(enablePrefixCaching, "true")
  255. key := modelNamespaceKey(modelName, namespace)
  256. out[key] = &source.InferenceCacheConfig{PrefixCachingEnabled: prefixCachingEnabled}
  257. }
  258. // Check for models that have token data but no cache config — likely a join
  259. // failure due to pod-label mismatch between cache_config_info and prompt_tokens_total.
  260. // Only run the diagnostic query when the join produced nothing; skip it on the happy path.
  261. if len(out) == 0 {
  262. rawQuery := `max by (namespace) (vllm:cache_config_info)`
  263. rawResult, _, rawErr := ctx.query(rawQuery, t)
  264. if rawErr == nil {
  265. diagResults := NewQueryResults(rawQuery, rawResult, source.ClusterKeyWithDefaults(ctx.config.ClusterLabel))
  266. if diagResults.Error == nil && len(diagResults.Results) > 0 {
  267. log.Warnf("InferenceCost: vllm:cache_config_info exists in Prometheus but the join with "+
  268. "vllm:prompt_tokens_total produced no results — likely a pod-label mismatch between "+
  269. "the two metrics (check that both carry matching 'namespace' and 'pod' labels). "+
  270. "prefix_caching_off detection will be disabled; allocation method will be 'compute_time'.")
  271. }
  272. }
  273. }
  274. return out, nil
  275. }
  276. // queryInstantMetric runs a Prometheus instant query evaluated at t and returns a
  277. // map[model_name:namespace]value.
  278. func queryInstantMetric(ctx *Context, query string, t time.Time) (map[string]float64, error) {
  279. raw, _, err := ctx.query(query, t)
  280. if err != nil {
  281. return nil, err
  282. }
  283. results := NewQueryResults(query, raw, source.ClusterKeyWithDefaults(ctx.config.ClusterLabel))
  284. if results.Error != nil {
  285. return nil, results.Error
  286. }
  287. out := make(map[string]float64, len(results.Results))
  288. for _, result := range results.Results {
  289. modelName, err := result.GetString("model_name")
  290. if err != nil || modelName == "" {
  291. continue
  292. }
  293. namespace, err := result.GetString("namespace")
  294. if err != nil || namespace == "" {
  295. namespace = "unknown"
  296. }
  297. if len(result.Values) == 0 {
  298. continue
  299. }
  300. value := result.Values[0].Value
  301. out[modelNamespaceKey(modelName, namespace)] = value
  302. }
  303. return out, nil
  304. }
  305. func modelNamespaceKey(modelName, namespace string) string {
  306. return modelName + ":" + namespace
  307. }
  308. // Made with Bob