2
0

allocation_incubating.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366
  1. //go:build incubating
  2. package costmodel
  3. import (
  4. "fmt"
  5. "time"
  6. "github.com/opencost/opencost/core/pkg/log"
  7. "github.com/opencost/opencost/core/pkg/opencost"
  8. "github.com/opencost/opencost/core/pkg/source"
  9. "github.com/opencost/opencost/pkg/env"
  10. )
// NodeTotals contains the cpu, ram, and gpu costs for a given node over a specific timeframe.
type NodeTotals struct {
	Start   time.Time // beginning of the window covered by these totals
	End     time.Time // end of the window covered by these totals
	Cluster string    // cluster ID the node belongs to
	Node    string    // node name
	CPUCost float64   // accumulated CPU cost over [Start, End)
	RAMCost float64   // accumulated RAM cost over [Start, End)
	GPUCost float64   // accumulated GPU cost over [Start, End)
}
  21. // ComputeAllocationWithNodeTotals uses the CostModel instance to compute an AllocationSet
  22. // for the window defined by the given start and end times. The Allocations returned are unaggregated
  23. // (i.e. down to the container level), and the node totals should contained additional data that can be
  24. // used to calculate the idle costs at the node level.
  25. func (cm *CostModel) ComputeAllocationWithNodeTotals(start, end time.Time, resolution time.Duration) (*opencost.AllocationSet, map[string]*NodeTotals, error) {
  26. nodeMap := make(map[string]*NodeTotals)
  27. // If the duration is short enough, compute the AllocationSet directly
  28. if end.Sub(start) <= cm.BatchDuration {
  29. as, nodeData, err := cm.computeAllocation(start, end, resolution)
  30. appendNodeData(nodeMap, start, end, nodeData)
  31. return as, nodeMap, err
  32. }
  33. // If the duration exceeds the configured MaxPrometheusQueryDuration, then
  34. // query for maximum-sized AllocationSets, collect them, and accumulate.
  35. // s and e track the coverage of the entire given window over multiple
  36. // internal queries.
  37. s, e := start, start
  38. // Collect AllocationSets in a range, then accumulate
  39. // TODO optimize by collecting consecutive AllocationSets, accumulating as we go
  40. asr := opencost.NewAllocationSetRange()
  41. for e.Before(end) {
  42. // By default, query for the full remaining duration. But do not let
  43. // any individual query duration exceed the configured max Prometheus
  44. // query duration.
  45. duration := end.Sub(e)
  46. if duration > cm.BatchDuration {
  47. duration = cm.BatchDuration
  48. }
  49. // Set start and end parameters (s, e) for next individual computation.
  50. e = s.Add(duration)
  51. // Compute the individual AllocationSet for just (s, e)
  52. as, nodeData, err := cm.computeAllocation(s, e, resolution)
  53. appendNodeData(nodeMap, s, e, nodeData)
  54. if err != nil {
  55. return opencost.NewAllocationSet(start, end), nodeMap, fmt.Errorf("error computing allocation for %s: %s", opencost.NewClosedWindow(s, e), err)
  56. }
  57. // Append to the range
  58. asr.Append(as)
  59. // Set s equal to e to set up the next query, if one exists.
  60. s = e
  61. }
  62. // Populate annotations, labels, and services on each Allocation. This is
  63. // necessary because Properties.Intersection does not propagate any values
  64. // stored in maps or slices for performance reasons. In this case, however,
  65. // it is both acceptable and necessary to do so.
  66. allocationAnnotations := map[string]map[string]string{}
  67. allocationLabels := map[string]map[string]string{}
  68. allocationServices := map[string]map[string]bool{}
  69. // Also record errors and warnings, then append them to the results later.
  70. errors := []string{}
  71. warnings := []string{}
  72. for _, as := range asr.Allocations {
  73. for k, a := range as.Allocations {
  74. if len(a.Properties.Annotations) > 0 {
  75. if _, ok := allocationAnnotations[k]; !ok {
  76. allocationAnnotations[k] = map[string]string{}
  77. }
  78. for name, val := range a.Properties.Annotations {
  79. allocationAnnotations[k][name] = val
  80. }
  81. }
  82. if len(a.Properties.Labels) > 0 {
  83. if _, ok := allocationLabels[k]; !ok {
  84. allocationLabels[k] = map[string]string{}
  85. }
  86. for name, val := range a.Properties.Labels {
  87. allocationLabels[k][name] = val
  88. }
  89. }
  90. if len(a.Properties.Services) > 0 {
  91. if _, ok := allocationServices[k]; !ok {
  92. allocationServices[k] = map[string]bool{}
  93. }
  94. for _, val := range a.Properties.Services {
  95. allocationServices[k][val] = true
  96. }
  97. }
  98. }
  99. errors = append(errors, as.Errors...)
  100. warnings = append(warnings, as.Warnings...)
  101. }
  102. // Accumulate to yield the result AllocationSet. After this step, we will
  103. // be nearly complete, but without the raw allocation data, which must be
  104. // recomputed.
  105. resultASR, err := asr.Accumulate(opencost.AccumulateOptionAll)
  106. if err != nil {
  107. return opencost.NewAllocationSet(start, end), nil, fmt.Errorf("error accumulating data for %s: %s", opencost.NewClosedWindow(s, e), err)
  108. }
  109. if resultASR != nil && len(resultASR.Allocations) == 0 {
  110. return opencost.NewAllocationSet(start, end), nil, nil
  111. }
  112. if length := len(resultASR.Allocations); length != 1 {
  113. return opencost.NewAllocationSet(start, end), nil, fmt.Errorf("expected 1 accumulated allocation set, found %d sets", length)
  114. }
  115. result := resultASR.Allocations[0]
  116. // Apply the annotations, labels, and services to the post-accumulation
  117. // results. (See above for why this is necessary.)
  118. for k, a := range result.Allocations {
  119. if annotations, ok := allocationAnnotations[k]; ok {
  120. a.Properties.Annotations = annotations
  121. }
  122. if labels, ok := allocationLabels[k]; ok {
  123. a.Properties.Labels = labels
  124. }
  125. if services, ok := allocationServices[k]; ok {
  126. a.Properties.Services = []string{}
  127. for s := range services {
  128. a.Properties.Services = append(a.Properties.Services, s)
  129. }
  130. }
  131. // Expand the Window of all Allocations within the AllocationSet
  132. // to match the Window of the AllocationSet, which gets expanded
  133. // at the end of this function.
  134. a.Window = a.Window.ExpandStart(start).ExpandEnd(end)
  135. }
  136. // Maintain RAM and CPU max usage values by iterating over the range,
  137. // computing maximums on a rolling basis, and setting on the result set.
  138. for _, as := range asr.Allocations {
  139. for key, alloc := range as.Allocations {
  140. resultAlloc := result.Get(key)
  141. if resultAlloc == nil {
  142. continue
  143. }
  144. if resultAlloc.RawAllocationOnly == nil {
  145. resultAlloc.RawAllocationOnly = &opencost.RawAllocationOnlyData{}
  146. }
  147. if alloc.RawAllocationOnly == nil {
  148. // This will happen inevitably for unmounted disks, but should
  149. // ideally not happen for any allocation with CPU and RAM data.
  150. if !alloc.IsUnmounted() {
  151. log.DedupedWarningf(10, "ComputeAllocation: raw allocation data missing for %s", key)
  152. }
  153. continue
  154. }
  155. if alloc.RawAllocationOnly.CPUCoreUsageMax > resultAlloc.RawAllocationOnly.CPUCoreUsageMax {
  156. resultAlloc.RawAllocationOnly.CPUCoreUsageMax = alloc.RawAllocationOnly.CPUCoreUsageMax
  157. }
  158. if alloc.RawAllocationOnly.RAMBytesUsageMax > resultAlloc.RawAllocationOnly.RAMBytesUsageMax {
  159. resultAlloc.RawAllocationOnly.RAMBytesUsageMax = alloc.RawAllocationOnly.RAMBytesUsageMax
  160. }
  161. }
  162. }
  163. // Expand the window to match the queried time range.
  164. result.Window = result.Window.ExpandStart(start).ExpandEnd(end)
  165. // Append errors and warnings
  166. result.Errors = errors
  167. result.Warnings = warnings
  168. return result, nodeMap, nil
  169. }
  170. func appendNodeData(nodeMap map[string]*NodeTotals, s, e time.Time, nodeData map[nodeKey]*nodePricing) {
  171. for k, v := range nodeData {
  172. key := k.String()
  173. if _, ok := nodeMap[key]; !ok {
  174. nodeMap[key] = &NodeTotals{
  175. Start: s,
  176. End: e,
  177. Cluster: k.Cluster,
  178. Node: k.Node,
  179. CPUCost: 0.0,
  180. RAMCost: 0.0,
  181. GPUCost: 0.0,
  182. }
  183. }
  184. hours := e.Sub(s).Hours()
  185. // NOTE: These theoretically shouldn't overlap due to the way the
  186. // NOTE: metrics are accumulated, so this logic is safe.
  187. if s.Before(nodeMap[key].Start) {
  188. nodeMap[key].Start = s
  189. }
  190. if e.After(nodeMap[key].End) {
  191. nodeMap[key].End = e
  192. }
  193. nodeMap[key].CPUCost += v.CPUCores * (v.CostPerCPUHr * hours)
  194. nodeMap[key].RAMCost += v.RAMGiB * (v.CostPerRAMGiBHr * hours)
  195. nodeMap[key].GPUCost += v.GPUCount * (v.CostPerGPUHr * hours)
  196. }
  197. }
// extendedNodeQueryResults is a place holder data type for the incubating
// feature for extending the node details that can be returned with allocation
// data
type extendedNodeQueryResults struct {
	nodeCPUCoreResults  []*source.NodeCPUCoresCapacityResult // per-node CPU core capacity results
	nodeRAMByteResults  []*source.NodeRAMBytesCapacityResult // per-node RAM byte capacity results
	nodeGPUCountResults []*source.NodeGPUCountResult         // per-node GPU count results
}
  206. // queryExtendedNodeData makes additional prometheus queries for node data to append on
  207. // the AllocationNodePricing struct.
  208. func queryExtendedNodeData(grp *source.QueryGroup, ds source.MetricsQuerier, start, end time.Time) (*extendedNodeQueryResults, error) {
  209. resChQueryNodeCPUCores := source.WithGroup(grp, ds.QueryNodeCPUCoresCapacity(start, end))
  210. resChQueryNodeRAMBytes := source.WithGroup(grp, ds.QueryNodeRAMBytesCapacity(start, end))
  211. resChQueryNodeGPUCount := source.WithGroup(grp, ds.QueryNodeGPUCount(start, end))
  212. nodeCPUCoreResults, _ := resChQueryNodeCPUCores.Await()
  213. nodeRAMByteResults, _ := resChQueryNodeRAMBytes.Await()
  214. nodeGPUCountResults, _ := resChQueryNodeGPUCount.Await()
  215. return &extendedNodeQueryResults{
  216. nodeCPUCoreResults: nodeCPUCoreResults,
  217. nodeRAMByteResults: nodeRAMByteResults,
  218. nodeGPUCountResults: nodeGPUCountResults,
  219. }, nil
  220. }
  221. // applyExtendedNodeData is a place holder function for the incubating feature
  222. // which appends additional node data to the given node map
  223. func applyExtendedNodeData(nodeMap map[nodeKey]*nodePricing, results *extendedNodeQueryResults) {
  224. if results == nil {
  225. log.Warnf("Extended Node Results were nil. Ignoring...")
  226. return
  227. }
  228. applyNodeCPUCores(nodeMap, results.nodeCPUCoreResults)
  229. applyNodeRAMBytes(nodeMap, results.nodeRAMByteResults)
  230. applyNodeGPUCount(nodeMap, results.nodeGPUCountResults)
  231. }
  232. func applyNodeCPUCores(nodeMap map[nodeKey]*nodePricing, nodeCPUCoreResults []*source.NodeCPUCoresCapacityResult) {
  233. for _, res := range nodeCPUCoreResults {
  234. cluster := res.Cluster
  235. if cluster == "" {
  236. cluster = env.GetClusterID()
  237. }
  238. node := res.Node
  239. if node == "" {
  240. log.Warnf("CostModel.ComputeAllocation: Node CPU Cores query result missing node field")
  241. continue
  242. }
  243. key := newNodeKey(cluster, node)
  244. if _, ok := nodeMap[key]; !ok {
  245. log.Warnf("Unexpectedly found node key that doesn't exist: %s-%s", cluster, node)
  246. nodeMap[key] = &nodePricing{
  247. Name: node,
  248. }
  249. }
  250. nodeMap[key].CPUCores = res.Data[0].Value
  251. }
  252. }
  253. func applyNodeRAMBytes(nodeMap map[nodeKey]*nodePricing, nodeRAMByteResults []*source.NodeRAMBytesCapacityResult) {
  254. for _, res := range nodeRAMByteResults {
  255. cluster := res.Cluster
  256. if cluster == "" {
  257. cluster = env.GetClusterID()
  258. }
  259. node := res.Node
  260. if node == "" {
  261. log.Warnf("CostModel.ComputeAllocation: Node CPU Cores query result missing node field")
  262. continue
  263. }
  264. key := newNodeKey(cluster, node)
  265. if _, ok := nodeMap[key]; !ok {
  266. log.Warnf("Unexpectedly found node key that doesn't exist: %s-%s", cluster, node)
  267. nodeMap[key] = &nodePricing{
  268. Name: node,
  269. }
  270. }
  271. nodeMap[key].RAMGiB = res.Data[0].Value / 1024.0 / 1024.0 / 1024.0
  272. }
  273. }
  274. func applyNodeGPUCount(nodeMap map[nodeKey]*nodePricing, nodeGPUCountResults []*source.NodeGPUCountResult) {
  275. for _, res := range nodeGPUCountResults {
  276. cluster := res.Cluster
  277. if cluster == "" {
  278. cluster = env.GetClusterID()
  279. }
  280. node := res.Node
  281. if node == "" {
  282. log.Warnf("CostModel.ComputeAllocation: Node CPU Cores query result missing node field")
  283. continue
  284. }
  285. key := newNodeKey(cluster, node)
  286. if _, ok := nodeMap[key]; !ok {
  287. log.Warnf("Unexpectedly found node key that doesn't exist: %s-%s", cluster, node)
  288. nodeMap[key] = &nodePricing{
  289. Name: node,
  290. }
  291. }
  292. nodeMap[key].GPUCount = res.Data[0].Value
  293. }
  294. }
// nodePricing describes the resource costs associated with a given node,
// as well as the source of the information (e.g. prometheus, custom)
type nodePricing struct {
	Name            string  // node name
	NodeType        string  // provider instance type
	ProviderID      string  // cloud provider's ID for the node
	Preemptible     bool    // whether the node is preemptible/spot
	CPUCores        float64 // CPU core capacity
	CostPerCPUHr    float64 // hourly cost per CPU core
	RAMGiB          float64 // RAM capacity in GiB
	CostPerRAMGiBHr float64 // hourly cost per GiB of RAM
	GPUCount        float64 // GPU count
	CostPerGPUHr    float64 // hourly cost per GPU
	Discount        float64 // discount applied to the node's cost
	Source          string  // origin of the pricing data (e.g. prometheus, custom)
}