allocation_incubating.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377
  1. //go:build incubating
  2. package costmodel
  3. import (
  4. "fmt"
  5. "time"
  6. "github.com/opencost/opencost/core/pkg/log"
  7. "github.com/opencost/opencost/core/pkg/opencost"
  8. "github.com/opencost/opencost/pkg/env"
  9. "github.com/opencost/opencost/pkg/prom"
  10. )
const (
	// queryFmtNodeCPUCores averages each node's reported CPU core capacity
	// over the window, grouped by the cluster label and node.
	queryFmtNodeCPUCores = `avg(avg_over_time(kube_node_status_capacity_cpu_cores[%s])) by (%s, node)`
	// queryFmtNodeRAMBytes averages each node's reported memory capacity (bytes)
	// over the window, grouped by the cluster label and node.
	queryFmtNodeRAMBytes = `avg(avg_over_time(kube_node_status_capacity_memory_bytes[%s])) by (%s, node)`
	// queryFmtNodeGPUCount averages each node's GPU count over the window,
	// grouped by the cluster label, node, and provider_id.
	queryFmtNodeGPUCount = `avg(avg_over_time(node_gpu_count[%s])) by (%s, node, provider_id)`
)
// NodeTotals contains the cpu, ram, and gpu costs for a given node over a specific timeframe.
type NodeTotals struct {
	Start   time.Time // start of the window the totals cover
	End     time.Time // end of the window the totals cover
	Cluster string    // cluster identifier the node belongs to
	Node    string    // node name
	CPUCost float64   // accumulated CPU cost over [Start, End)
	RAMCost float64   // accumulated RAM cost over [Start, End)
	GPUCost float64   // accumulated GPU cost over [Start, End)
}
  26. // ComputeAllocationWithNodeTotals uses the CostModel instance to compute an AllocationSet
  27. // for the window defined by the given start and end times. The Allocations returned are unaggregated
  28. // (i.e. down to the container level), and the node totals should contained additional data that can be
  29. // used to calculate the idle costs at the node level.
  30. func (cm *CostModel) ComputeAllocationWithNodeTotals(start, end time.Time, resolution time.Duration) (*opencost.AllocationSet, map[string]*NodeTotals, error) {
  31. nodeMap := make(map[string]*NodeTotals)
  32. // If the duration is short enough, compute the AllocationSet directly
  33. if end.Sub(start) <= cm.MaxPrometheusQueryDuration {
  34. as, nodeData, err := cm.computeAllocation(start, end, resolution)
  35. appendNodeData(nodeMap, start, end, nodeData)
  36. return as, nodeMap, err
  37. }
  38. // If the duration exceeds the configured MaxPrometheusQueryDuration, then
  39. // query for maximum-sized AllocationSets, collect them, and accumulate.
  40. // s and e track the coverage of the entire given window over multiple
  41. // internal queries.
  42. s, e := start, start
  43. // Collect AllocationSets in a range, then accumulate
  44. // TODO optimize by collecting consecutive AllocationSets, accumulating as we go
  45. asr := opencost.NewAllocationSetRange()
  46. for e.Before(end) {
  47. // By default, query for the full remaining duration. But do not let
  48. // any individual query duration exceed the configured max Prometheus
  49. // query duration.
  50. duration := end.Sub(e)
  51. if duration > cm.MaxPrometheusQueryDuration {
  52. duration = cm.MaxPrometheusQueryDuration
  53. }
  54. // Set start and end parameters (s, e) for next individual computation.
  55. e = s.Add(duration)
  56. // Compute the individual AllocationSet for just (s, e)
  57. as, nodeData, err := cm.computeAllocation(s, e, resolution)
  58. appendNodeData(nodeMap, s, e, nodeData)
  59. if err != nil {
  60. return opencost.NewAllocationSet(start, end), nodeMap, fmt.Errorf("error computing allocation for %s: %s", opencost.NewClosedWindow(s, e), err)
  61. }
  62. // Append to the range
  63. asr.Append(as)
  64. // Set s equal to e to set up the next query, if one exists.
  65. s = e
  66. }
  67. // Populate annotations, labels, and services on each Allocation. This is
  68. // necessary because Properties.Intersection does not propagate any values
  69. // stored in maps or slices for performance reasons. In this case, however,
  70. // it is both acceptable and necessary to do so.
  71. allocationAnnotations := map[string]map[string]string{}
  72. allocationLabels := map[string]map[string]string{}
  73. allocationServices := map[string]map[string]bool{}
  74. // Also record errors and warnings, then append them to the results later.
  75. errors := []string{}
  76. warnings := []string{}
  77. for _, as := range asr.Allocations {
  78. for k, a := range as.Allocations {
  79. if len(a.Properties.Annotations) > 0 {
  80. if _, ok := allocationAnnotations[k]; !ok {
  81. allocationAnnotations[k] = map[string]string{}
  82. }
  83. for name, val := range a.Properties.Annotations {
  84. allocationAnnotations[k][name] = val
  85. }
  86. }
  87. if len(a.Properties.Labels) > 0 {
  88. if _, ok := allocationLabels[k]; !ok {
  89. allocationLabels[k] = map[string]string{}
  90. }
  91. for name, val := range a.Properties.Labels {
  92. allocationLabels[k][name] = val
  93. }
  94. }
  95. if len(a.Properties.Services) > 0 {
  96. if _, ok := allocationServices[k]; !ok {
  97. allocationServices[k] = map[string]bool{}
  98. }
  99. for _, val := range a.Properties.Services {
  100. allocationServices[k][val] = true
  101. }
  102. }
  103. }
  104. errors = append(errors, as.Errors...)
  105. warnings = append(warnings, as.Warnings...)
  106. }
  107. // Accumulate to yield the result AllocationSet. After this step, we will
  108. // be nearly complete, but without the raw allocation data, which must be
  109. // recomputed.
  110. resultASR, err := asr.Accumulate(opencost.AccumulateOptionAll)
  111. if err != nil {
  112. return opencost.NewAllocationSet(start, end), nil, fmt.Errorf("error accumulating data for %s: %s", opencost.NewClosedWindow(s, e), err)
  113. }
  114. if resultASR != nil && len(resultASR.Allocations) == 0 {
  115. return opencost.NewAllocationSet(start, end), nil, nil
  116. }
  117. if length := len(resultASR.Allocations); length != 1 {
  118. return opencost.NewAllocationSet(start, end), nil, fmt.Errorf("expected 1 accumulated allocation set, found %d sets", length)
  119. }
  120. result := resultASR.Allocations[0]
  121. // Apply the annotations, labels, and services to the post-accumulation
  122. // results. (See above for why this is necessary.)
  123. for k, a := range result.Allocations {
  124. if annotations, ok := allocationAnnotations[k]; ok {
  125. a.Properties.Annotations = annotations
  126. }
  127. if labels, ok := allocationLabels[k]; ok {
  128. a.Properties.Labels = labels
  129. }
  130. if services, ok := allocationServices[k]; ok {
  131. a.Properties.Services = []string{}
  132. for s := range services {
  133. a.Properties.Services = append(a.Properties.Services, s)
  134. }
  135. }
  136. // Expand the Window of all Allocations within the AllocationSet
  137. // to match the Window of the AllocationSet, which gets expanded
  138. // at the end of this function.
  139. a.Window = a.Window.ExpandStart(start).ExpandEnd(end)
  140. }
  141. // Maintain RAM and CPU max usage values by iterating over the range,
  142. // computing maximums on a rolling basis, and setting on the result set.
  143. for _, as := range asr.Allocations {
  144. for key, alloc := range as.Allocations {
  145. resultAlloc := result.Get(key)
  146. if resultAlloc == nil {
  147. continue
  148. }
  149. if resultAlloc.RawAllocationOnly == nil {
  150. resultAlloc.RawAllocationOnly = &opencost.RawAllocationOnlyData{}
  151. }
  152. if alloc.RawAllocationOnly == nil {
  153. // This will happen inevitably for unmounted disks, but should
  154. // ideally not happen for any allocation with CPU and RAM data.
  155. if !alloc.IsUnmounted() {
  156. log.DedupedWarningf(10, "ComputeAllocation: raw allocation data missing for %s", key)
  157. }
  158. continue
  159. }
  160. if alloc.RawAllocationOnly.CPUCoreUsageMax > resultAlloc.RawAllocationOnly.CPUCoreUsageMax {
  161. resultAlloc.RawAllocationOnly.CPUCoreUsageMax = alloc.RawAllocationOnly.CPUCoreUsageMax
  162. }
  163. if alloc.RawAllocationOnly.RAMBytesUsageMax > resultAlloc.RawAllocationOnly.RAMBytesUsageMax {
  164. resultAlloc.RawAllocationOnly.RAMBytesUsageMax = alloc.RawAllocationOnly.RAMBytesUsageMax
  165. }
  166. }
  167. }
  168. // Expand the window to match the queried time range.
  169. result.Window = result.Window.ExpandStart(start).ExpandEnd(end)
  170. // Append errors and warnings
  171. result.Errors = errors
  172. result.Warnings = warnings
  173. return result, nodeMap, nil
  174. }
  175. func appendNodeData(nodeMap map[string]*NodeTotals, s, e time.Time, nodeData map[nodeKey]*nodePricing) {
  176. for k, v := range nodeData {
  177. key := k.String()
  178. if _, ok := nodeMap[key]; !ok {
  179. nodeMap[key] = &NodeTotals{
  180. Start: s,
  181. End: e,
  182. Cluster: k.Cluster,
  183. Node: k.Node,
  184. CPUCost: 0.0,
  185. RAMCost: 0.0,
  186. GPUCost: 0.0,
  187. }
  188. }
  189. hours := e.Sub(s).Hours()
  190. // NOTE: These theoretically shouldn't overlap due to the way the
  191. // NOTE: metrics are accumulated, so this logic is safe.
  192. if s.Before(nodeMap[key].Start) {
  193. nodeMap[key].Start = s
  194. }
  195. if e.After(nodeMap[key].End) {
  196. nodeMap[key].End = e
  197. }
  198. nodeMap[key].CPUCost += v.CPUCores * (v.CostPerCPUHr * hours)
  199. nodeMap[key].RAMCost += v.RAMGiB * (v.CostPerRAMGiBHr * hours)
  200. nodeMap[key].GPUCost += v.GPUCount * (v.CostPerGPUHr * hours)
  201. }
  202. }
// extendedNodeQueryResults is a place holder data type for the incubating
// feature for extending the node details that can be returned with allocation
// data
type extendedNodeQueryResults struct {
	nodeCPUCoreResults  []*prom.QueryResult // results of the node CPU core capacity query
	nodeRAMByteResults  []*prom.QueryResult // results of the node RAM byte capacity query
	nodeGPUCountResults []*prom.QueryResult // results of the node GPU count query
}
  211. // queryExtendedNodeData makes additional prometheus queries for node data to append on
  212. // the AllocationNodePricing struct.
  213. func queryExtendedNodeData(ctx *prom.Context, start, end time.Time, durStr, resStr string) (*extendedNodeQueryResults, error) {
  214. queryNodeCPUCores := fmt.Sprintf(queryFmtNodeCPUCores, durStr, env.GetPromClusterLabel())
  215. resChQueryNodeCPUCores := ctx.QueryAtTime(queryNodeCPUCores, end)
  216. queryNodeRAMBytes := fmt.Sprintf(queryFmtNodeRAMBytes, durStr, env.GetPromClusterLabel())
  217. resChQueryNodeRAMBytes := ctx.QueryAtTime(queryNodeRAMBytes, end)
  218. queryNodeGPUCount := fmt.Sprintf(queryFmtNodeGPUCount, durStr, env.GetPromClusterLabel())
  219. resChQueryNodeGPUCount := ctx.QueryAtTime(queryNodeGPUCount, end)
  220. nodeCPUCoreResults, _ := resChQueryNodeCPUCores.Await()
  221. nodeRAMByteResults, _ := resChQueryNodeRAMBytes.Await()
  222. nodeGPUCountResults, _ := resChQueryNodeGPUCount.Await()
  223. return &extendedNodeQueryResults{
  224. nodeCPUCoreResults: nodeCPUCoreResults,
  225. nodeRAMByteResults: nodeRAMByteResults,
  226. nodeGPUCountResults: nodeGPUCountResults,
  227. }, nil
  228. }
  229. // applyExtendedNodeData is a place holder function for the incubating feature
  230. // which appends additional node data to the given node map
  231. func applyExtendedNodeData(nodeMap map[nodeKey]*nodePricing, results *extendedNodeQueryResults) {
  232. if results == nil {
  233. log.Warnf("Extended Node Results were nil. Ignoring...")
  234. return
  235. }
  236. applyNodeCPUCores(nodeMap, results.nodeCPUCoreResults)
  237. applyNodeRAMBytes(nodeMap, results.nodeRAMByteResults)
  238. applyNodeGPUCount(nodeMap, results.nodeGPUCountResults)
  239. }
  240. func applyNodeCPUCores(nodeMap map[nodeKey]*nodePricing, nodeCPUCoreResults []*prom.QueryResult) {
  241. for _, res := range nodeCPUCoreResults {
  242. cluster, err := res.GetString(env.GetPromClusterLabel())
  243. if err != nil {
  244. cluster = env.GetClusterID()
  245. }
  246. node, err := res.GetString("node")
  247. if err != nil {
  248. log.Warnf("CostModel.ComputeAllocation: Node CPU Cores query result missing field: %s", err)
  249. continue
  250. }
  251. key := newNodeKey(cluster, node)
  252. if _, ok := nodeMap[key]; !ok {
  253. log.Warnf("Unexpectedly found node key that doesn't exist: %s-%s", cluster, node)
  254. nodeMap[key] = &nodePricing{
  255. Name: node,
  256. }
  257. }
  258. nodeMap[key].CPUCores = res.Values[0].Value
  259. }
  260. }
  261. func applyNodeRAMBytes(nodeMap map[nodeKey]*nodePricing, nodeRAMByteResults []*prom.QueryResult) {
  262. for _, res := range nodeRAMByteResults {
  263. cluster, err := res.GetString(env.GetPromClusterLabel())
  264. if err != nil {
  265. cluster = env.GetClusterID()
  266. }
  267. node, err := res.GetString("node")
  268. if err != nil {
  269. log.Warnf("CostModel.ComputeAllocation: Node CPU Cores query result missing field: %s", err)
  270. continue
  271. }
  272. key := newNodeKey(cluster, node)
  273. if _, ok := nodeMap[key]; !ok {
  274. log.Warnf("Unexpectedly found node key that doesn't exist: %s-%s", cluster, node)
  275. nodeMap[key] = &nodePricing{
  276. Name: node,
  277. }
  278. }
  279. nodeMap[key].RAMGiB = res.Values[0].Value / 1024.0 / 1024.0 / 1024.0
  280. }
  281. }
  282. func applyNodeGPUCount(nodeMap map[nodeKey]*nodePricing, nodeGPUCountResults []*prom.QueryResult) {
  283. for _, res := range nodeGPUCountResults {
  284. cluster, err := res.GetString(env.GetPromClusterLabel())
  285. if err != nil {
  286. cluster = env.GetClusterID()
  287. }
  288. node, err := res.GetString("node")
  289. if err != nil {
  290. log.Warnf("CostModel.ComputeAllocation: Node CPU Cores query result missing field: %s", err)
  291. continue
  292. }
  293. key := newNodeKey(cluster, node)
  294. if _, ok := nodeMap[key]; !ok {
  295. log.Warnf("Unexpectedly found node key that doesn't exist: %s-%s", cluster, node)
  296. nodeMap[key] = &nodePricing{
  297. Name: node,
  298. }
  299. }
  300. nodeMap[key].GPUCount = res.Values[0].Value
  301. }
  302. }
// nodePricing describes the resource costs associated with a given node,
// as well as the source of the information (e.g. prometheus, custom)
type nodePricing struct {
	Name            string  // node name
	NodeType        string  // instance type of the node
	ProviderID      string  // cloud provider identifier for the node
	Preemptible     bool    // whether the node is preemptible/spot
	CPUCores        float64 // CPU core capacity of the node
	CostPerCPUHr    float64 // cost per CPU core hour
	RAMGiB          float64 // RAM capacity of the node, in GiB
	CostPerRAMGiBHr float64 // cost per GiB of RAM per hour
	GPUCount        float64 // number of GPUs on the node
	CostPerGPUHr    float64 // cost per GPU hour
	Discount        float64 // discount applied to the node's costs
	Source          string  // origin of the pricing data (e.g. prometheus, custom)
}