totals.go 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759
  1. package kubecost
  2. import (
  3. "errors"
  4. "fmt"
  5. "strconv"
  6. "time"
  7. "github.com/opencost/opencost/pkg/log"
  8. "github.com/patrickmn/go-cache"
  9. )
  // AllocationTotals represents aggregate costs of all Allocations for
  // a given cluster or tuple of (cluster, node) between a given start and end
  // time, where the costs are aggregated per-resource. AllocationTotals
  // is designed to be used as a pre-computed intermediate data structure when
  // contextual knowledge is required to carry out a task, but computing totals
  // on-the-fly would be expensive; e.g. idle allocation, sharing coefficients
  // for idle or shared resources, etc.
  //
  // Every resource has a base cost field and a matching *Adjustment field; the
  // Total*Cost methods return the sum of the two.
  type AllocationTotals struct {
  	// Start and End bound the totaled period. ComputeAllocationTotals widens
  	// them to the earliest start and latest end of the allocations it sums.
  	Start time.Time `json:"start"`
  	End   time.Time `json:"end"`
  	// Cluster and Node identify what was totaled. ComputeAllocationTotals
  	// blanks Node when allocations from different nodes share one total.
  	Cluster string `json:"cluster"`
  	Node    string `json:"node"`
  	// Count is the number of allocations summed into this total.
  	Count int `json:"count"`
  	// CPU cost and its adjustment.
  	CPUCost           float64 `json:"cpuCost"`
  	CPUCostAdjustment float64 `json:"cpuCostAdjustment"`
  	// GPU cost and its adjustment.
  	GPUCost           float64 `json:"gpuCost"`
  	GPUCostAdjustment float64 `json:"gpuCostAdjustment"`
  	// Load balancer cost and its adjustment.
  	LoadBalancerCost           float64 `json:"loadBalancerCost"`
  	LoadBalancerCostAdjustment float64 `json:"loadBalancerCostAdjustment"`
  	// Network cost and its adjustment.
  	NetworkCost           float64 `json:"networkCost"`
  	NetworkCostAdjustment float64 `json:"networkCostAdjustment"`
  	// Persistent volume cost and its adjustment.
  	PersistentVolumeCost           float64 `json:"persistentVolumeCost"`
  	PersistentVolumeCostAdjustment float64 `json:"persistentVolumeCostAdjustment"`
  	// RAM cost and its adjustment.
  	RAMCost           float64 `json:"ramCost"`
  	RAMCostAdjustment float64 `json:"ramCostAdjustment"`
  }
  36. // ClearAdjustments sets all adjustment fields to 0.0
  37. func (art *AllocationTotals) ClearAdjustments() {
  38. art.CPUCostAdjustment = 0.0
  39. art.GPUCostAdjustment = 0.0
  40. art.LoadBalancerCostAdjustment = 0.0
  41. art.NetworkCostAdjustment = 0.0
  42. art.PersistentVolumeCostAdjustment = 0.0
  43. art.RAMCostAdjustment = 0.0
  44. }
  45. // Clone deep copies the AllocationTotals
  46. func (art *AllocationTotals) Clone() *AllocationTotals {
  47. return &AllocationTotals{
  48. Start: art.Start,
  49. End: art.End,
  50. Cluster: art.Cluster,
  51. Node: art.Node,
  52. Count: art.Count,
  53. CPUCost: art.CPUCost,
  54. CPUCostAdjustment: art.CPUCostAdjustment,
  55. GPUCost: art.GPUCost,
  56. GPUCostAdjustment: art.GPUCostAdjustment,
  57. LoadBalancerCost: art.LoadBalancerCost,
  58. LoadBalancerCostAdjustment: art.LoadBalancerCostAdjustment,
  59. NetworkCost: art.NetworkCost,
  60. NetworkCostAdjustment: art.NetworkCostAdjustment,
  61. PersistentVolumeCost: art.PersistentVolumeCost,
  62. PersistentVolumeCostAdjustment: art.PersistentVolumeCostAdjustment,
  63. RAMCost: art.RAMCost,
  64. RAMCostAdjustment: art.RAMCostAdjustment,
  65. }
  66. }
  67. // TotalCPUCost returns CPU cost with adjustment.
  68. func (art *AllocationTotals) TotalCPUCost() float64 {
  69. return art.CPUCost + art.CPUCostAdjustment
  70. }
  71. // TotalGPUCost returns GPU cost with adjustment.
  72. func (art *AllocationTotals) TotalGPUCost() float64 {
  73. return art.GPUCost + art.GPUCostAdjustment
  74. }
  75. // TotalLoadBalancerCost returns LoadBalancer cost with adjustment.
  76. func (art *AllocationTotals) TotalLoadBalancerCost() float64 {
  77. return art.LoadBalancerCost + art.LoadBalancerCostAdjustment
  78. }
  79. // TotalNetworkCost returns Network cost with adjustment.
  80. func (art *AllocationTotals) TotalNetworkCost() float64 {
  81. return art.NetworkCost + art.NetworkCostAdjustment
  82. }
  83. // TotalPersistentVolumeCost returns PersistentVolume cost with adjustment.
  84. func (art *AllocationTotals) TotalPersistentVolumeCost() float64 {
  85. return art.PersistentVolumeCost + art.PersistentVolumeCostAdjustment
  86. }
  87. // TotalRAMCost returns RAM cost with adjustment.
  88. func (art *AllocationTotals) TotalRAMCost() float64 {
  89. return art.RAMCost + art.RAMCostAdjustment
  90. }
  91. // TotalCost returns the sum of all costs.
  92. func (art *AllocationTotals) TotalCost() float64 {
  93. return art.TotalCPUCost() + art.TotalGPUCost() + art.TotalLoadBalancerCost() +
  94. art.TotalNetworkCost() + art.TotalPersistentVolumeCost() + art.TotalRAMCost()
  95. }
  96. // ComputeAllocationTotals totals the resource costs of the given AllocationSet
  97. // using the given property, i.e. cluster or node, where "node" really means to
  98. // use the fully-qualified (cluster, node) tuple.
  99. func ComputeAllocationTotals(as *AllocationSet, prop string) map[string]*AllocationTotals {
  100. arts := map[string]*AllocationTotals{}
  101. as.Each(func(name string, alloc *Allocation) {
  102. // Do not count idle or unmounted allocations
  103. if alloc.IsIdle() || alloc.IsUnmounted() {
  104. return
  105. }
  106. // Default to computing totals by Cluster, but allow override to use Node.
  107. key := alloc.Properties.Cluster
  108. if prop == AllocationNodeProp {
  109. key = fmt.Sprintf("%s/%s", alloc.Properties.Cluster, alloc.Properties.Node)
  110. }
  111. if _, ok := arts[key]; !ok {
  112. arts[key] = &AllocationTotals{
  113. Start: alloc.Start,
  114. End: alloc.End,
  115. Cluster: alloc.Properties.Cluster,
  116. Node: alloc.Properties.Node,
  117. }
  118. }
  119. if arts[key].Start.After(alloc.Start) {
  120. arts[key].Start = alloc.Start
  121. }
  122. if arts[key].End.Before(alloc.End) {
  123. arts[key].End = alloc.End
  124. }
  125. if arts[key].Node != alloc.Properties.Node {
  126. arts[key].Node = ""
  127. }
  128. arts[key].Count++
  129. arts[key].CPUCost += alloc.CPUCost
  130. arts[key].CPUCostAdjustment += alloc.CPUCostAdjustment
  131. arts[key].GPUCost += alloc.GPUCost
  132. arts[key].GPUCostAdjustment += alloc.GPUCostAdjustment
  133. arts[key].LoadBalancerCost += alloc.LoadBalancerCost
  134. arts[key].LoadBalancerCostAdjustment += alloc.LoadBalancerCostAdjustment
  135. arts[key].NetworkCost += alloc.NetworkCost
  136. arts[key].NetworkCostAdjustment += alloc.NetworkCostAdjustment
  137. arts[key].PersistentVolumeCost += alloc.PVCost() // NOTE: PVCost() does not include adjustment
  138. arts[key].PersistentVolumeCostAdjustment += alloc.PVCostAdjustment
  139. arts[key].RAMCost += alloc.RAMCost
  140. arts[key].RAMCostAdjustment += alloc.RAMCostAdjustment
  141. })
  142. return arts
  143. }
  // AllocationTotalsSet represents totals, summed by both "cluster" and "node"
  // for a given window of time.
  type AllocationTotalsSet struct {
  	// Cluster holds the totals computed per cluster.
  	Cluster map[string]*AllocationTotals `json:"cluster"`
  	// Node holds the totals computed per (cluster, node) tuple.
  	Node map[string]*AllocationTotals `json:"node"`
  	// Window is the period of time the totals cover.
  	Window Window `json:"window"`
  }
  151. func NewAllocationTotalsSet(window Window, byCluster, byNode map[string]*AllocationTotals) *AllocationTotalsSet {
  152. return &AllocationTotalsSet{
  153. Cluster: byCluster,
  154. Node: byNode,
  155. Window: window.Clone(),
  156. }
  157. }
  // AssetTotals represents aggregate costs of all Assets for a given
  // cluster or tuple of (cluster, node) between a given start and end time,
  // where the costs are aggregated per-resource. AssetTotals is designed
  // to be used as a pre-computed intermediate data structure when contextual
  // knowledge is required to carry out a task, but computing totals on-the-fly
  // would be expensive; e.g. idle allocation, shared tenancy costs
  //
  // Every resource has a base cost field and a matching *Adjustment field; the
  // Total*Cost methods return the sum of the two.
  type AssetTotals struct {
  	// Start and End bound the totaled period.
  	Start time.Time `json:"start"`
  	End   time.Time `json:"end"`
  	// Cluster and Node identify what was totaled. ComputeAssetTotals blanks
  	// Node when more than one node contributes to the same total.
  	Cluster string `json:"cluster"`
  	Node    string `json:"node"`
  	// Count is the number of assets (nodes, load balancers, cluster
  	// management, disks) recorded in this total by ComputeAssetTotals.
  	Count int `json:"count"`
  	// Attached volume cost and its adjustment: disks whose name matches a node.
  	AttachedVolumeCost           float64 `json:"attachedVolumeCost"`
  	AttachedVolumeCostAdjustment float64 `json:"attachedVolumeCostAdjustment"`
  	// Cluster management cost and its adjustment.
  	ClusterManagementCost           float64 `json:"clusterManagementCost"`
  	ClusterManagementCostAdjustment float64 `json:"clusterManagementCostAdjustment"`
  	// CPU cost and its adjustment.
  	CPUCost           float64 `json:"cpuCost"`
  	CPUCostAdjustment float64 `json:"cpuCostAdjustment"`
  	// GPU cost and its adjustment.
  	GPUCost           float64 `json:"gpuCost"`
  	GPUCostAdjustment float64 `json:"gpuCostAdjustment"`
  	// Load balancer cost and its adjustment.
  	LoadBalancerCost           float64 `json:"loadBalancerCost"`
  	LoadBalancerCostAdjustment float64 `json:"loadBalancerCostAdjustment"`
  	// Persistent volume cost and its adjustment: disks not matched to a node.
  	PersistentVolumeCost           float64 `json:"persistentVolumeCost"`
  	PersistentVolumeCostAdjustment float64 `json:"persistentVolumeCostAdjustment"`
  	// RAM cost and its adjustment.
  	RAMCost           float64 `json:"ramCost"`
  	RAMCostAdjustment float64 `json:"ramCostAdjustment"`
  }
  185. // ClearAdjustments sets all adjustment fields to 0.0
  186. func (art *AssetTotals) ClearAdjustments() {
  187. art.AttachedVolumeCostAdjustment = 0.0
  188. art.ClusterManagementCostAdjustment = 0.0
  189. art.CPUCostAdjustment = 0.0
  190. art.GPUCostAdjustment = 0.0
  191. art.LoadBalancerCostAdjustment = 0.0
  192. art.PersistentVolumeCostAdjustment = 0.0
  193. art.RAMCostAdjustment = 0.0
  194. }
  195. // Clone deep copies the AssetTotals
  196. func (art *AssetTotals) Clone() *AssetTotals {
  197. return &AssetTotals{
  198. Start: art.Start,
  199. End: art.End,
  200. Cluster: art.Cluster,
  201. Node: art.Node,
  202. Count: art.Count,
  203. AttachedVolumeCost: art.AttachedVolumeCost,
  204. AttachedVolumeCostAdjustment: art.AttachedVolumeCostAdjustment,
  205. ClusterManagementCost: art.ClusterManagementCost,
  206. ClusterManagementCostAdjustment: art.ClusterManagementCostAdjustment,
  207. CPUCost: art.CPUCost,
  208. CPUCostAdjustment: art.CPUCostAdjustment,
  209. GPUCost: art.GPUCost,
  210. GPUCostAdjustment: art.GPUCostAdjustment,
  211. LoadBalancerCost: art.LoadBalancerCost,
  212. LoadBalancerCostAdjustment: art.LoadBalancerCostAdjustment,
  213. PersistentVolumeCost: art.PersistentVolumeCost,
  214. PersistentVolumeCostAdjustment: art.PersistentVolumeCostAdjustment,
  215. RAMCost: art.RAMCost,
  216. RAMCostAdjustment: art.RAMCostAdjustment,
  217. }
  218. }
  219. // TotalAttachedVolumeCost returns CPU cost with adjustment.
  220. func (art *AssetTotals) TotalAttachedVolumeCost() float64 {
  221. return art.AttachedVolumeCost + art.AttachedVolumeCostAdjustment
  222. }
  223. // TotalClusterManagementCost returns ClusterManagement cost with adjustment.
  224. func (art *AssetTotals) TotalClusterManagementCost() float64 {
  225. return art.ClusterManagementCost + art.ClusterManagementCostAdjustment
  226. }
  227. // TotalCPUCost returns CPU cost with adjustment.
  228. func (art *AssetTotals) TotalCPUCost() float64 {
  229. return art.CPUCost + art.CPUCostAdjustment
  230. }
  231. // TotalGPUCost returns GPU cost with adjustment.
  232. func (art *AssetTotals) TotalGPUCost() float64 {
  233. return art.GPUCost + art.GPUCostAdjustment
  234. }
  235. // TotalLoadBalancerCost returns LoadBalancer cost with adjustment.
  236. func (art *AssetTotals) TotalLoadBalancerCost() float64 {
  237. return art.LoadBalancerCost + art.LoadBalancerCostAdjustment
  238. }
  239. // TotalPersistentVolumeCost returns PersistentVolume cost with adjustment.
  240. func (art *AssetTotals) TotalPersistentVolumeCost() float64 {
  241. return art.PersistentVolumeCost + art.PersistentVolumeCostAdjustment
  242. }
  243. // TotalRAMCost returns RAM cost with adjustment.
  244. func (art *AssetTotals) TotalRAMCost() float64 {
  245. return art.RAMCost + art.RAMCostAdjustment
  246. }
  247. // TotalCost returns the sum of all costs
  248. func (art *AssetTotals) TotalCost() float64 {
  249. return art.TotalAttachedVolumeCost() + art.TotalClusterManagementCost() +
  250. art.TotalCPUCost() + art.TotalGPUCost() + art.TotalLoadBalancerCost() +
  251. art.TotalPersistentVolumeCost() + art.TotalRAMCost()
  252. }
  253. // ComputeAssetTotals totals the resource costs of the given AssetSet,
  254. // using the given property, i.e. cluster or node, where "node" really means to
  255. // use the fully-qualified (cluster, node) tuple.
  256. // NOTE: we're not capturing LoadBalancers here yet, but only because we don't
  257. // yet need them. They could be added.
  258. func ComputeAssetTotals(as *AssetSet, prop AssetProperty) map[string]*AssetTotals {
  259. arts := map[string]*AssetTotals{}
  260. // Attached disks are tracked by matching their name with the name of the
  261. // node, as is standard for attached disks.
  262. nodeNames := map[string]bool{}
  263. disks := map[string]*Disk{}
  264. as.Each(func(name string, asset Asset) {
  265. if node, ok := asset.(*Node); ok {
  266. // Default to computing totals by Cluster, but allow override to use Node.
  267. key := node.Properties().Cluster
  268. if prop == AssetNodeProp {
  269. key = fmt.Sprintf("%s/%s", node.Properties().Cluster, node.Properties().Name)
  270. }
  271. // Add node name to list of node names. (These are to be used later
  272. // for attached volumes.)
  273. nodeNames[fmt.Sprintf("%s/%s", node.Properties().Cluster, node.Properties().Name)] = true
  274. // adjustmentRate is used to scale resource costs proportionally
  275. // by the adjustment. This is necessary because we only get one
  276. // adjustment per Node, not one per-resource-per-Node.
  277. //
  278. // e.g. total cost = $90 (cost = $100, adjustment = -$10) => 0.9000 ( 90 / 100)
  279. // e.g. total cost = $150 (cost = $450, adjustment = -$300) => 0.3333 (150 / 450)
  280. // e.g. total cost = $150 (cost = $100, adjustment = $50) => 1.5000 (150 / 100)
  281. adjustmentRate := 1.0
  282. if node.TotalCost()-node.Adjustment() == 0 {
  283. // If (totalCost - adjustment) is 0.0 then adjustment cancels
  284. // the entire node cost and we should make everything 0
  285. // without dividing by 0.
  286. adjustmentRate = 0.0
  287. log.DedupedWarningf(5, "ComputeTotals: node cost adjusted to $0.00 for %s", node.Properties().Name)
  288. } else if node.Adjustment() != 0.0 {
  289. // adjustmentRate is the ratio of cost-with-adjustment (i.e. TotalCost)
  290. // to cost-without-adjustment (i.e. TotalCost - Adjustment).
  291. adjustmentRate = node.TotalCost() / (node.TotalCost() - node.Adjustment())
  292. }
  293. // 1. Start with raw, measured resource cost
  294. // 2. Apply discount to get discounted resource cost
  295. // 3. Apply adjustment to get final "adjusted" resource cost
  296. // 4. Subtract (3 - 2) to get adjustment in doller-terms
  297. // 5. Use (2 + 4) as total cost, so (2) is "cost" and (4) is "adjustment"
  298. // Example:
  299. // - node.CPUCost = 10.00
  300. // - node.Discount = 0.20 // We assume a 20% discount
  301. // - adjustmentRate = 0.75 // CUR says we need to reduce to 75% of our post-discount node cost
  302. //
  303. // 1. See above
  304. // 2. discountedCPUCost = 10.00 * (1.0 - 0.2) = 8.00
  305. // 3. adjustedCPUCost = 8.00 * 0.75 = 6.00 // this is the actual cost according to the CUR
  306. // 4. adjustment = 6.00 - 8.00 = -2.00
  307. // 5. totalCost = 6.00, which is the sum of (2) cost = 8.00 and (4) adjustment = -2.00
  308. discountedCPUCost := node.CPUCost * (1.0 - node.Discount)
  309. adjustedCPUCost := discountedCPUCost * adjustmentRate
  310. cpuCostAdjustment := adjustedCPUCost - discountedCPUCost
  311. discountedRAMCost := node.RAMCost * (1.0 - node.Discount)
  312. adjustedRAMCost := discountedRAMCost * adjustmentRate
  313. ramCostAdjustment := adjustedRAMCost - discountedRAMCost
  314. adjustedGPUCost := node.GPUCost * adjustmentRate
  315. gpuCostAdjustment := adjustedGPUCost - node.GPUCost
  316. if _, ok := arts[key]; !ok {
  317. arts[key] = &AssetTotals{
  318. Start: node.Start(),
  319. End: node.End(),
  320. Cluster: node.Properties().Cluster,
  321. Node: node.Properties().Name,
  322. }
  323. }
  324. if arts[key].Start.After(node.Start()) {
  325. arts[key].Start = node.Start()
  326. }
  327. if arts[key].End.Before(node.End()) {
  328. arts[key].End = node.End()
  329. }
  330. if arts[key].Node != node.Properties().Name {
  331. arts[key].Node = ""
  332. }
  333. arts[key].Count++
  334. // TotalCPUCost will be discounted cost + adjustment
  335. arts[key].CPUCost += discountedCPUCost
  336. arts[key].CPUCostAdjustment += cpuCostAdjustment
  337. // TotalRAMCost will be discounted cost + adjustment
  338. arts[key].RAMCost += discountedRAMCost
  339. arts[key].RAMCostAdjustment += ramCostAdjustment
  340. // TotalGPUCost will be discounted cost + adjustment
  341. arts[key].GPUCost += node.GPUCost
  342. arts[key].GPUCostAdjustment += gpuCostAdjustment
  343. } else if lb, ok := asset.(*LoadBalancer); ok && prop == AssetClusterProp {
  344. // Only record load balancers when prop is Cluster because we
  345. // can't break down LoadBalancer by node.
  346. key := lb.Properties().Cluster
  347. if _, ok := arts[key]; !ok {
  348. arts[key] = &AssetTotals{
  349. Start: lb.Start(),
  350. End: lb.End(),
  351. Cluster: lb.Properties().Cluster,
  352. }
  353. }
  354. arts[key].Count++
  355. arts[key].LoadBalancerCost += lb.Cost
  356. arts[key].LoadBalancerCostAdjustment += lb.adjustment
  357. } else if cm, ok := asset.(*ClusterManagement); ok && prop == AssetClusterProp {
  358. // Only record cluster management when prop is Cluster because we
  359. // can't break down ClusterManagement by node.
  360. key := cm.Properties().Cluster
  361. if _, ok := arts[key]; !ok {
  362. arts[key] = &AssetTotals{
  363. Start: cm.Start(),
  364. End: cm.End(),
  365. Cluster: cm.Properties().Cluster,
  366. }
  367. }
  368. arts[key].Count++
  369. arts[key].ClusterManagementCost += cm.Cost
  370. arts[key].ClusterManagementCostAdjustment += cm.adjustment
  371. } else if disk, ok := asset.(*Disk); ok {
  372. // Record disks in an intermediate structure, which will be
  373. // processed after all assets have been seen.
  374. key := fmt.Sprintf("%s/%s", disk.Properties().Cluster, disk.Properties().Name)
  375. disks[key] = disk
  376. }
  377. })
  378. // Record all disks as either attached volumes or persistent volumes.
  379. for name, disk := range disks {
  380. // By default, the key will be the name, which is the tuple of
  381. // cluster/node. But if we're aggregating by cluster only, then
  382. // reset the key to just the cluster.
  383. key := name
  384. if prop == AssetClusterProp {
  385. key = disk.Properties().Cluster
  386. }
  387. if _, ok := arts[key]; !ok {
  388. arts[key] = &AssetTotals{
  389. Start: disk.Start(),
  390. End: disk.End(),
  391. Cluster: disk.Properties().Cluster,
  392. }
  393. if prop == AssetNodeProp {
  394. arts[key].Node = disk.Properties().Name
  395. }
  396. }
  397. _, isAttached := nodeNames[name]
  398. if isAttached {
  399. // Record attached volume data at the cluster and node level, using
  400. // name matching to distinguish from PersistentVolumes.
  401. // TODO can we make a stronger match at the underlying ETL layer?
  402. arts[key].Count++
  403. arts[key].AttachedVolumeCost += disk.Cost
  404. arts[key].AttachedVolumeCostAdjustment += disk.adjustment
  405. } else if prop == AssetClusterProp {
  406. // Here, we're looking at a PersistentVolume because we're not
  407. // looking at an AttachedVolume. Only record PersistentVolume data
  408. // at the cluster level (i.e. prop == AssetClusterProp).
  409. arts[key].Count++
  410. arts[key].PersistentVolumeCost += disk.Cost
  411. arts[key].PersistentVolumeCostAdjustment += disk.adjustment
  412. }
  413. }
  414. return arts
  415. }
  // AssetTotalsSet represents totals, summed by both "cluster" and "node"
  // for a given window of time.
  type AssetTotalsSet struct {
  	// Cluster holds the totals computed per cluster.
  	Cluster map[string]*AssetTotals `json:"cluster"`
  	// Node holds the totals computed per (cluster, node) tuple.
  	Node map[string]*AssetTotals `json:"node"`
  	// Window is the period of time the totals cover.
  	Window Window `json:"window"`
  }
  423. func NewAssetTotalsSet(window Window, byCluster, byNode map[string]*AssetTotals) *AssetTotalsSet {
  424. return &AssetTotalsSet{
  425. Cluster: byCluster,
  426. Node: byNode,
  427. Window: window.Clone(),
  428. }
  429. }
  430. // ComputeIdleCoefficients returns the idle coefficients for CPU, GPU, and RAM
  431. // (in that order) for the given resource costs and totals.
  432. func ComputeIdleCoefficients(shareSplit, key string, cpuCost, gpuCost, ramCost float64, allocationTotals map[string]*AllocationTotals) (float64, float64, float64) {
  433. if shareSplit == ShareNone {
  434. return 0.0, 0.0, 0.0
  435. }
  436. if shareSplit != ShareEven {
  437. shareSplit = ShareWeighted
  438. }
  439. var cpuCoeff, gpuCoeff, ramCoeff float64
  440. if _, ok := allocationTotals[key]; !ok {
  441. return 0.0, 0.0, 0.0
  442. }
  443. if shareSplit == ShareEven {
  444. coeff := 1.0 / float64(allocationTotals[key].Count)
  445. return coeff, coeff, coeff
  446. }
  447. if allocationTotals[key].TotalCPUCost() > 0 {
  448. cpuCoeff = cpuCost / allocationTotals[key].TotalCPUCost()
  449. }
  450. if allocationTotals[key].TotalGPUCost() > 0 {
  451. gpuCoeff = gpuCost / allocationTotals[key].TotalGPUCost()
  452. }
  453. if allocationTotals[key].TotalRAMCost() > 0 {
  454. ramCoeff = ramCost / allocationTotals[key].TotalRAMCost()
  455. }
  456. return cpuCoeff, gpuCoeff, ramCoeff
  457. }
  // TotalsStore acts as both an AllocationTotalsStore and an
  // AssetTotalsStore: it embeds both interfaces, so an implementation must
  // provide the getters and setters of each.
  type TotalsStore interface {
  	AllocationTotalsStore
  	AssetTotalsStore
  }
  464. // AllocationTotalsStore allows for storing (i.e. setting and
  465. // getting) AllocationTotals by cluster and by node.
  466. type AllocationTotalsStore interface {
  467. GetAllocationTotalsByCluster(start, end time.Time) (map[string]*AllocationTotals, bool)
  468. GetAllocationTotalsByNode(start, end time.Time) (map[string]*AllocationTotals, bool)
  469. SetAllocationTotalsByCluster(start, end time.Time, rts map[string]*AllocationTotals)
  470. SetAllocationTotalsByNode(start, end time.Time, rts map[string]*AllocationTotals)
  471. }
  472. // UpdateAllocationTotalsStore updates an AllocationTotalsStore
  473. // by totaling the given AllocationSet and saving the totals.
  474. func UpdateAllocationTotalsStore(arts AllocationTotalsStore, as *AllocationSet) (*AllocationTotalsSet, error) {
  475. if arts == nil {
  476. return nil, errors.New("cannot update nil AllocationTotalsStore")
  477. }
  478. if as == nil {
  479. return nil, errors.New("cannot update AllocationTotalsStore from nil AllocationSet")
  480. }
  481. if as.Window.IsOpen() {
  482. return nil, errors.New("cannot update AllocationTotalsStore from AllocationSet with open window")
  483. }
  484. start := *as.Window.Start()
  485. end := *as.Window.End()
  486. artsByCluster := ComputeAllocationTotals(as, AllocationClusterProp)
  487. arts.SetAllocationTotalsByCluster(start, end, artsByCluster)
  488. artsByNode := ComputeAllocationTotals(as, AllocationNodeProp)
  489. arts.SetAllocationTotalsByNode(start, end, artsByNode)
  490. log.Debugf("ETL: Allocation: updated resource totals for %s", as.Window)
  491. win := NewClosedWindow(start, end)
  492. abc := map[string]*AllocationTotals{}
  493. for key, val := range artsByCluster {
  494. abc[key] = val.Clone()
  495. }
  496. abn := map[string]*AllocationTotals{}
  497. for key, val := range artsByNode {
  498. abn[key] = val.Clone()
  499. }
  500. return NewAllocationTotalsSet(win, abc, abn), nil
  501. }
  502. // AssetTotalsStore allows for storing (i.e. setting and getting)
  503. // AssetTotals by cluster and by node.
  504. type AssetTotalsStore interface {
  505. GetAssetTotalsByCluster(start, end time.Time) (map[string]*AssetTotals, bool)
  506. GetAssetTotalsByNode(start, end time.Time) (map[string]*AssetTotals, bool)
  507. SetAssetTotalsByCluster(start, end time.Time, rts map[string]*AssetTotals)
  508. SetAssetTotalsByNode(start, end time.Time, rts map[string]*AssetTotals)
  509. }
  510. // UpdateAssetTotalsStore updates an AssetTotalsStore
  511. // by totaling the given AssetSet and saving the totals.
  512. func UpdateAssetTotalsStore(arts AssetTotalsStore, as *AssetSet) (*AssetTotalsSet, error) {
  513. if arts == nil {
  514. return nil, errors.New("cannot update nil AssetTotalsStore")
  515. }
  516. if as == nil {
  517. return nil, errors.New("cannot update AssetTotalsStore from nil AssetSet")
  518. }
  519. if as.Window.IsOpen() {
  520. return nil, errors.New("cannot update AssetTotalsStore from AssetSet with open window")
  521. }
  522. start := *as.Window.Start()
  523. end := *as.Window.End()
  524. artsByCluster := ComputeAssetTotals(as, AssetClusterProp)
  525. arts.SetAssetTotalsByCluster(start, end, artsByCluster)
  526. artsByNode := ComputeAssetTotals(as, AssetNodeProp)
  527. arts.SetAssetTotalsByNode(start, end, artsByNode)
  528. log.Debugf("ETL: Asset: updated resource totals for %s", as.Window)
  529. win := NewClosedWindow(start, end)
  530. abc := map[string]*AssetTotals{}
  531. for key, val := range artsByCluster {
  532. abc[key] = val.Clone()
  533. }
  534. abn := map[string]*AssetTotals{}
  535. for key, val := range artsByNode {
  536. abn[key] = val.Clone()
  537. }
  538. return NewAssetTotalsSet(win, abc, abn), nil
  539. }
  // MemoryTotalsStore is an in-memory cache TotalsStore. Each cache maps a
  // storeKey (derived from a start and end time) to a map of totals.
  type MemoryTotalsStore struct {
  	// allocTotalsByCluster caches map[string]*AllocationTotals keyed by cluster.
  	allocTotalsByCluster *cache.Cache
  	// allocTotalsByNode caches map[string]*AllocationTotals keyed by node.
  	allocTotalsByNode *cache.Cache
  	// assetTotalsByCluster caches map[string]*AssetTotals keyed by cluster.
  	assetTotalsByCluster *cache.Cache
  	// assetTotalsByNode caches map[string]*AssetTotals keyed by node.
  	assetTotalsByNode *cache.Cache
  }
  547. // NewMemoryTotalsStore instantiates a new MemoryTotalsStore,
  548. // which is composed of four in-memory caches.
  549. func NewMemoryTotalsStore() *MemoryTotalsStore {
  550. return &MemoryTotalsStore{
  551. allocTotalsByCluster: cache.New(cache.NoExpiration, cache.NoExpiration),
  552. allocTotalsByNode: cache.New(cache.NoExpiration, cache.NoExpiration),
  553. assetTotalsByCluster: cache.New(cache.NoExpiration, cache.NoExpiration),
  554. assetTotalsByNode: cache.New(cache.NoExpiration, cache.NoExpiration),
  555. }
  556. }
  557. // GetAllocationTotalsByCluster retrieves the AllocationTotals
  558. // by cluster for the given start and end times.
  559. func (mts *MemoryTotalsStore) GetAllocationTotalsByCluster(start time.Time, end time.Time) (map[string]*AllocationTotals, bool) {
  560. k := storeKey(start, end)
  561. if raw, ok := mts.allocTotalsByCluster.Get(k); !ok {
  562. return map[string]*AllocationTotals{}, false
  563. } else {
  564. original := raw.(map[string]*AllocationTotals)
  565. totals := make(map[string]*AllocationTotals, len(original))
  566. for k, v := range original {
  567. totals[k] = v.Clone()
  568. }
  569. return totals, true
  570. }
  571. }
  572. // GetAllocationTotalsByNode retrieves the AllocationTotals
  573. // by node for the given start and end times.
  574. func (mts *MemoryTotalsStore) GetAllocationTotalsByNode(start time.Time, end time.Time) (map[string]*AllocationTotals, bool) {
  575. k := storeKey(start, end)
  576. if raw, ok := mts.allocTotalsByNode.Get(k); !ok {
  577. return map[string]*AllocationTotals{}, false
  578. } else {
  579. original := raw.(map[string]*AllocationTotals)
  580. totals := make(map[string]*AllocationTotals, len(original))
  581. for k, v := range original {
  582. totals[k] = v.Clone()
  583. }
  584. return totals, true
  585. }
  586. }
  587. // SetAllocationTotalsByCluster set the per-cluster AllocationTotals
  588. // to the given values for the given start and end times.
  589. func (mts *MemoryTotalsStore) SetAllocationTotalsByCluster(start time.Time, end time.Time, arts map[string]*AllocationTotals) {
  590. k := storeKey(start, end)
  591. mts.allocTotalsByCluster.Set(k, arts, cache.NoExpiration)
  592. }
  593. // SetAllocationTotalsByNode set the per-node AllocationTotals
  594. // to the given values for the given start and end times.
  595. func (mts *MemoryTotalsStore) SetAllocationTotalsByNode(start time.Time, end time.Time, arts map[string]*AllocationTotals) {
  596. k := storeKey(start, end)
  597. mts.allocTotalsByNode.Set(k, arts, cache.NoExpiration)
  598. }
  599. // GetAssetTotalsByCluster retrieves the AssetTotals
  600. // by cluster for the given start and end times.
  601. func (mts *MemoryTotalsStore) GetAssetTotalsByCluster(start time.Time, end time.Time) (map[string]*AssetTotals, bool) {
  602. k := storeKey(start, end)
  603. if raw, ok := mts.assetTotalsByCluster.Get(k); !ok {
  604. return map[string]*AssetTotals{}, false
  605. } else {
  606. original := raw.(map[string]*AssetTotals)
  607. totals := make(map[string]*AssetTotals, len(original))
  608. for k, v := range original {
  609. totals[k] = v.Clone()
  610. }
  611. return totals, true
  612. }
  613. }
  614. // GetAssetTotalsByNode retrieves the AssetTotals
  615. // by node for the given start and end times.
  616. func (mts *MemoryTotalsStore) GetAssetTotalsByNode(start time.Time, end time.Time) (map[string]*AssetTotals, bool) {
  617. k := storeKey(start, end)
  618. if raw, ok := mts.assetTotalsByNode.Get(k); !ok {
  619. return map[string]*AssetTotals{}, false
  620. } else {
  621. original := raw.(map[string]*AssetTotals)
  622. totals := make(map[string]*AssetTotals, len(original))
  623. for k, v := range original {
  624. totals[k] = v.Clone()
  625. }
  626. return totals, true
  627. }
  628. }
  629. // SetAssetTotalsByCluster set the per-cluster AssetTotals
  630. // to the given values for the given start and end times.
  631. func (mts *MemoryTotalsStore) SetAssetTotalsByCluster(start time.Time, end time.Time, arts map[string]*AssetTotals) {
  632. k := storeKey(start, end)
  633. mts.assetTotalsByCluster.Set(k, arts, cache.NoExpiration)
  634. }
  635. // SetAssetTotalsByNode set the per-node AssetTotals
  636. // to the given values for the given start and end times.
  637. func (mts *MemoryTotalsStore) SetAssetTotalsByNode(start time.Time, end time.Time, arts map[string]*AssetTotals) {
  638. k := storeKey(start, end)
  639. mts.assetTotalsByNode.Set(k, arts, cache.NoExpiration)
  640. }
  // storeKey creates a storage key of the form "<start>-<end>", where both
  // bounds are rendered as base-10 Unix seconds. Sub-second precision is
  // dropped, so times within the same second produce the same key.
  func storeKey(start, end time.Time) string {
  	unixSeconds := func(t time.Time) string {
  		return strconv.FormatInt(t.Unix(), 10)
  	}
  	return fmt.Sprintf("%s-%s", unixSeconds(start), unixSeconds(end))
  }