totals.go 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630
  1. package kubecost
  2. import (
  3. "errors"
  4. "fmt"
  5. "strconv"
  6. "time"
  7. "github.com/kubecost/cost-model/pkg/log"
  8. "github.com/patrickmn/go-cache"
  9. )
  10. // AllocationTotals represents aggregate costs of all Allocations for
  11. // a given cluster or tuple of (cluster, node) between a given start and end
  12. // time, where the costs are aggregated per-resource. AllocationTotals
  13. // is designed to be used as a pre-computed intermediate data structure when
  14. // contextual knowledge is required to carry out a task, but computing totals
  15. // on-the-fly would be expensive; e.g. idle allocation; sharing coefficients
  16. // for idle or shared resources, etc.
  17. type AllocationTotals struct {
  18. Start time.Time `json:"start"`
  19. End time.Time `json:"end"`
  20. Cluster string `json:"cluster"`
  21. Node string `json:"node"`
  22. Count int `json:"count"`
  23. CPUCost float64 `json:"cpuCost"`
  24. CPUCostAdjustment float64 `json:"cpuCostAdjustment"`
  25. GPUCost float64 `json:"gpuCost"`
  26. GPUCostAdjustment float64 `json:"gpuCostAdjustment"`
  27. LoadBalancerCost float64 `json:"loadBalancerCost"`
  28. LoadBalancerCostAdjustment float64 `json:"loadBalancerCostAdjustment"`
  29. NetworkCost float64 `json:"networkCost"`
  30. NetworkCostAdjustment float64 `json:"networkCostAdjustment"`
  31. PersistentVolumeCost float64 `json:"persistentVolumeCost"`
  32. PersistentVolumeCostAdjustment float64 `json:"persistentVolumeCostAdjustment"`
  33. RAMCost float64 `json:"ramCost"`
  34. RAMCostAdjustment float64 `json:"ramCostAdjustment"`
  35. }
  36. // ClearAdjustments sets all adjustment fields to 0.0
  37. func (art *AllocationTotals) ClearAdjustments() {
  38. art.CPUCostAdjustment = 0.0
  39. art.GPUCostAdjustment = 0.0
  40. art.RAMCostAdjustment = 0.0
  41. }
  42. // TotalCPUCost returns CPU cost with adjustment.
  43. func (art *AllocationTotals) TotalCPUCost() float64 {
  44. return art.CPUCost + art.CPUCostAdjustment
  45. }
  46. // TotalGPUCost returns GPU cost with adjustment.
  47. func (art *AllocationTotals) TotalGPUCost() float64 {
  48. return art.GPUCost + art.GPUCostAdjustment
  49. }
  50. // TotalLoadBalancerCost returns LoadBalancer cost with adjustment.
  51. func (art *AllocationTotals) TotalLoadBalancerCost() float64 {
  52. return art.LoadBalancerCost + art.LoadBalancerCostAdjustment
  53. }
  54. // TotalNetworkCost returns Network cost with adjustment.
  55. func (art *AllocationTotals) TotalNetworkCost() float64 {
  56. return art.NetworkCost + art.NetworkCostAdjustment
  57. }
  58. // TotalPersistentVolumeCost returns PersistentVolume cost with adjustment.
  59. func (art *AllocationTotals) TotalPersistentVolumeCost() float64 {
  60. return art.PersistentVolumeCost + art.PersistentVolumeCostAdjustment
  61. }
  62. // TotalRAMCost returns RAM cost with adjustment.
  63. func (art *AllocationTotals) TotalRAMCost() float64 {
  64. return art.RAMCost + art.RAMCostAdjustment
  65. }
  66. // TotalCost returns the sum of all costs.
  67. func (art *AllocationTotals) TotalCost() float64 {
  68. return art.TotalCPUCost() + art.TotalGPUCost() + art.TotalLoadBalancerCost() +
  69. art.TotalNetworkCost() + art.TotalPersistentVolumeCost() + art.TotalRAMCost()
  70. }
  71. // ComputeAllocationTotals totals the resource costs of the given AllocationSet
  72. // using the given property, i.e. cluster or node, where "node" really means to
  73. // use the fully-qualified (cluster, node) tuple.
  74. func ComputeAllocationTotals(as *AllocationSet, prop string) map[string]*AllocationTotals {
  75. arts := map[string]*AllocationTotals{}
  76. as.Each(func(name string, alloc *Allocation) {
  77. // Do not count idle or unmounted allocations
  78. if alloc.IsIdle() || alloc.IsUnmounted() {
  79. return
  80. }
  81. // Default to computing totals by Cluster, but allow override to use Node.
  82. key := alloc.Properties.Cluster
  83. if prop == AllocationNodeProp {
  84. key = fmt.Sprintf("%s/%s", alloc.Properties.Cluster, alloc.Properties.Node)
  85. }
  86. if _, ok := arts[key]; !ok {
  87. arts[key] = &AllocationTotals{
  88. Start: alloc.Start,
  89. End: alloc.End,
  90. Cluster: alloc.Properties.Cluster,
  91. Node: alloc.Properties.Node,
  92. }
  93. }
  94. if arts[key].Start.After(alloc.Start) {
  95. arts[key].Start = alloc.Start
  96. }
  97. if arts[key].End.Before(alloc.End) {
  98. arts[key].End = alloc.End
  99. }
  100. if arts[key].Node != alloc.Properties.Node {
  101. arts[key].Node = ""
  102. }
  103. arts[key].Count++
  104. arts[key].CPUCost += alloc.CPUCost
  105. arts[key].CPUCostAdjustment += alloc.CPUCostAdjustment
  106. arts[key].GPUCost += alloc.GPUCost
  107. arts[key].GPUCostAdjustment += alloc.GPUCostAdjustment
  108. arts[key].LoadBalancerCost += alloc.LoadBalancerCost
  109. arts[key].LoadBalancerCostAdjustment += alloc.LoadBalancerCostAdjustment
  110. arts[key].NetworkCost += alloc.NetworkCost
  111. arts[key].NetworkCostAdjustment += alloc.NetworkCostAdjustment
  112. arts[key].PersistentVolumeCost += alloc.PVCost() // NOTE: PVCost() does not include adjustment
  113. arts[key].PersistentVolumeCostAdjustment += alloc.PVCostAdjustment
  114. arts[key].RAMCost += alloc.RAMCost
  115. arts[key].RAMCostAdjustment += alloc.RAMCostAdjustment
  116. })
  117. return arts
  118. }
  119. // AssetTotals represents aggregate costs of all Assets for a given
  120. // cluster or tuple of (cluster, node) between a given start and end time,
  121. // where the costs are aggregated per-resource. AssetTotals is designed
  122. // to be used as a pre-computed intermediate data structure when contextual
  123. // knowledge is required to carry out a task, but computing totals on-the-fly
  124. // would be expensive; e.g. idle allocation, shared tenancy costs
  125. type AssetTotals struct {
  126. Start time.Time `json:"start"`
  127. End time.Time `json:"end"`
  128. Cluster string `json:"cluster"`
  129. Node string `json:"node"`
  130. Count int `json:"count"`
  131. AttachedVolumeCost float64 `json:"attachedVolumeCost"`
  132. AttachedVolumeCostAdjustment float64 `json:"attachedVolumeCostAdjustment"`
  133. ClusterManagementCost float64 `json:"clusterManagementCost"`
  134. ClusterManagementCostAdjustment float64 `json:"clusterManagementCostAdjustment"`
  135. CPUCost float64 `json:"cpuCost"`
  136. CPUCostAdjustment float64 `json:"cpuCostAdjustment"`
  137. GPUCost float64 `json:"gpuCost"`
  138. GPUCostAdjustment float64 `json:"gpuCostAdjustment"`
  139. LoadBalancerCost float64 `json:"loadBalancerCost"`
  140. LoadBalancerCostAdjustment float64 `json:"loadBalancerCostAdjustment"`
  141. PersistentVolumeCost float64 `json:"persistentVolumeCost"`
  142. PersistentVolumeCostAdjustment float64 `json:"persistentVolumeCostAdjustment"`
  143. RAMCost float64 `json:"ramCost"`
  144. RAMCostAdjustment float64 `json:"ramCostAdjustment"`
  145. }
  146. // ClearAdjustments sets all adjustment fields to 0.0
  147. func (art *AssetTotals) ClearAdjustments() {
  148. art.AttachedVolumeCostAdjustment = 0.0
  149. art.ClusterManagementCostAdjustment = 0.0
  150. art.CPUCostAdjustment = 0.0
  151. art.GPUCostAdjustment = 0.0
  152. art.LoadBalancerCostAdjustment = 0.0
  153. art.PersistentVolumeCostAdjustment = 0.0
  154. art.RAMCostAdjustment = 0.0
  155. }
  156. // TotalAttachedVolumeCost returns CPU cost with adjustment.
  157. func (art *AssetTotals) TotalAttachedVolumeCost() float64 {
  158. return art.AttachedVolumeCost + art.AttachedVolumeCostAdjustment
  159. }
  160. // TotalClusterManagementCost returns ClusterManagement cost with adjustment.
  161. func (art *AssetTotals) TotalClusterManagementCost() float64 {
  162. return art.ClusterManagementCost + art.ClusterManagementCostAdjustment
  163. }
  164. // TotalCPUCost returns CPU cost with adjustment.
  165. func (art *AssetTotals) TotalCPUCost() float64 {
  166. return art.CPUCost + art.CPUCostAdjustment
  167. }
  168. // TotalGPUCost returns GPU cost with adjustment.
  169. func (art *AssetTotals) TotalGPUCost() float64 {
  170. return art.GPUCost + art.GPUCostAdjustment
  171. }
  172. // TotalLoadBalancerCost returns LoadBalancer cost with adjustment.
  173. func (art *AssetTotals) TotalLoadBalancerCost() float64 {
  174. return art.LoadBalancerCost + art.LoadBalancerCostAdjustment
  175. }
  176. // TotalPersistentVolumeCost returns PersistentVolume cost with adjustment.
  177. func (art *AssetTotals) TotalPersistentVolumeCost() float64 {
  178. return art.PersistentVolumeCost + art.PersistentVolumeCostAdjustment
  179. }
  180. // TotalRAMCost returns RAM cost with adjustment.
  181. func (art *AssetTotals) TotalRAMCost() float64 {
  182. return art.RAMCost + art.RAMCostAdjustment
  183. }
  184. // TotalCost returns the sum of all costs
  185. func (art *AssetTotals) TotalCost() float64 {
  186. return art.TotalAttachedVolumeCost() + art.TotalClusterManagementCost() +
  187. art.TotalCPUCost() + art.TotalGPUCost() + art.TotalLoadBalancerCost() +
  188. art.TotalPersistentVolumeCost() + art.TotalRAMCost()
  189. }
  190. // ComputeAssetTotals totals the resource costs of the given AssetSet,
  191. // using the given property, i.e. cluster or node, where "node" really means to
  192. // use the fully-qualified (cluster, node) tuple.
  193. // NOTE: we're not capturing LoadBalancers here yet, but only because we don't
  194. // yet need them. They could be added.
  195. func ComputeAssetTotals(as *AssetSet, prop AssetProperty) map[string]*AssetTotals {
  196. arts := map[string]*AssetTotals{}
  197. // Attached disks are tracked by matching their name with the name of the
  198. // node, as is standard for attached disks.
  199. nodeNames := map[string]bool{}
  200. disks := map[string]*Disk{}
  201. as.Each(func(name string, asset Asset) {
  202. if node, ok := asset.(*Node); ok {
  203. // Default to computing totals by Cluster, but allow override to use Node.
  204. key := node.Properties().Cluster
  205. if prop == AssetNodeProp {
  206. key = fmt.Sprintf("%s/%s", node.Properties().Cluster, node.Properties().Name)
  207. }
  208. // Add node name to list of node names, but only if aggregating
  209. // by node. (These are to be used later for attached volumes.)
  210. nodeNames[key] = true
  211. // adjustmentRate is used to scale resource costs proportionally
  212. // by the adjustment. This is necessary because we only get one
  213. // adjustment per Node, not one per-resource-per-Node.
  214. //
  215. // e.g. total cost = $90 (cost = $100, adjustment = -$10) => 0.9000 ( 90 / 100)
  216. // e.g. total cost = $150 (cost = $450, adjustment = -$300) => 0.3333 (150 / 450)
  217. // e.g. total cost = $150 (cost = $100, adjustment = $50) => 1.5000 (150 / 100)
  218. adjustmentRate := 1.0
  219. if node.TotalCost()-node.Adjustment() == 0 {
  220. // If (totalCost - adjustment) is 0.0 then adjustment cancels
  221. // the entire node cost and we should make everything 0
  222. // without dividing by 0.
  223. adjustmentRate = 0.0
  224. log.DedupedWarningf(5, "ComputeTotals: node cost adjusted to $0.00 for %s", node.Properties().Name)
  225. } else if node.Adjustment() != 0.0 {
  226. // adjustmentRate is the ratio of cost-with-adjustment (i.e. TotalCost)
  227. // to cost-without-adjustment (i.e. TotalCost - Adjustment).
  228. adjustmentRate = node.TotalCost() / (node.TotalCost() - node.Adjustment())
  229. }
  230. // 1. Start with raw, measured resource cost
  231. // 2. Apply discount to get discounted resource cost
  232. // 3. Apply adjustment to get final "adjusted" resource cost
  233. // 4. Subtract (3 - 2) to get adjustment in doller-terms
  234. // 5. Use (2 + 4) as total cost, so (2) is "cost" and (4) is "adjustment"
  235. // Example:
  236. // - node.CPUCost = 10.00
  237. // - node.Discount = 0.20 // We assume a 20% discount
  238. // - adjustmentRate = 0.75 // CUR says we need to reduce to 75% of our post-discount node cost
  239. //
  240. // 1. See above
  241. // 2. discountedCPUCost = 10.00 * (1.0 - 0.2) = 8.00
  242. // 3. adjustedCPUCost = 8.00 * 0.75 = 6.00 // this is the actual cost according to the CUR
  243. // 4. adjustment = 6.00 - 8.00 = -2.00
  244. // 5. totalCost = 6.00, which is the sum of (2) cost = 8.00 and (4) adjustment = -2.00
  245. discountedCPUCost := node.CPUCost * (1.0 - node.Discount)
  246. adjustedCPUCost := discountedCPUCost * adjustmentRate
  247. cpuCostAdjustment := adjustedCPUCost - discountedCPUCost
  248. discountedGPUCost := node.GPUCost * (1.0 - node.Discount)
  249. adjustedGPUCost := discountedGPUCost * adjustmentRate
  250. gpuCostAdjustment := discountedGPUCost - adjustedGPUCost
  251. discountedRAMCost := node.RAMCost * (1.0 - node.Discount)
  252. adjustedRAMCost := discountedRAMCost * adjustmentRate
  253. ramCostAdjustment := adjustedRAMCost - discountedRAMCost
  254. if _, ok := arts[key]; !ok {
  255. arts[key] = &AssetTotals{
  256. Start: node.Start(),
  257. End: node.End(),
  258. Cluster: node.Properties().Cluster,
  259. Node: node.Properties().Name,
  260. }
  261. }
  262. if arts[key].Start.After(node.Start()) {
  263. arts[key].Start = node.Start()
  264. }
  265. if arts[key].End.Before(node.End()) {
  266. arts[key].End = node.End()
  267. }
  268. if arts[key].Node != node.Properties().Name {
  269. arts[key].Node = ""
  270. }
  271. arts[key].Count++
  272. // TotalCPUCost will be discounted cost + adjustment
  273. arts[key].CPUCost += discountedCPUCost
  274. arts[key].CPUCostAdjustment += cpuCostAdjustment
  275. // TotalRAMCost will be discounted cost + adjustment
  276. arts[key].RAMCost += discountedRAMCost
  277. arts[key].RAMCostAdjustment += ramCostAdjustment
  278. // TotalGPUCost will be discounted cost + adjustment
  279. arts[key].GPUCost += discountedGPUCost
  280. arts[key].GPUCostAdjustment += gpuCostAdjustment
  281. } else if lb, ok := asset.(*LoadBalancer); ok && prop == AssetClusterProp {
  282. // Only record load balancers when prop is Cluster because we
  283. // can't break down LoadBalancer by node.
  284. key := lb.Properties().Cluster
  285. if _, ok := arts[key]; !ok {
  286. arts[key] = &AssetTotals{
  287. Start: lb.Start(),
  288. End: lb.End(),
  289. Cluster: lb.Properties().Cluster,
  290. }
  291. }
  292. arts[key].Count++
  293. arts[key].LoadBalancerCost += lb.Cost
  294. arts[key].LoadBalancerCost += lb.adjustment
  295. } else if cm, ok := asset.(*ClusterManagement); ok && prop == AssetClusterProp {
  296. // Only record cluster management when prop is Cluster because we
  297. // can't break down ClusterManagement by node.
  298. key := cm.Properties().Cluster
  299. if _, ok := arts[key]; !ok {
  300. arts[key] = &AssetTotals{
  301. Start: cm.Start(),
  302. End: cm.End(),
  303. Cluster: cm.Properties().Cluster,
  304. }
  305. }
  306. arts[key].Count++
  307. arts[key].ClusterManagementCost += cm.TotalCost()
  308. } else if disk, ok := asset.(*Disk); ok {
  309. // Record disks in an intermediate structure, which will be
  310. // processed after all assets have been seen.
  311. key := fmt.Sprintf("%s/%s", disk.Properties().Cluster, disk.Properties().Name)
  312. disks[key] = disk
  313. }
  314. })
  315. // Record all disks as either attached volumes or persistent volumes.
  316. for name, disk := range disks {
  317. // By default, the key will be the name, which is the tuple of
  318. // cluster/node. But if we're aggregating by cluster only, then
  319. // reset the key to just the cluster.
  320. key := name
  321. if prop == AssetClusterProp {
  322. key = disk.Properties().Cluster
  323. }
  324. if _, ok := arts[key]; !ok {
  325. arts[key] = &AssetTotals{
  326. Start: disk.Start(),
  327. End: disk.End(),
  328. Cluster: disk.Properties().Cluster,
  329. }
  330. if prop == AssetNodeProp {
  331. arts[key].Node = disk.Properties().Name
  332. }
  333. }
  334. _, isAttached := nodeNames[name]
  335. if isAttached {
  336. // Record attached volume data at the cluster and node level, using
  337. // name matching to distinguish from PersistentVolumes.
  338. // TODO can we make a stronger match at the underlying ETL layer?
  339. arts[key].Count++
  340. arts[key].AttachedVolumeCost += disk.Cost
  341. arts[key].AttachedVolumeCostAdjustment += disk.adjustment
  342. } else if prop == AssetClusterProp {
  343. // Only record PersistentVolume data at the cluster level
  344. arts[key].Count++
  345. arts[key].PersistentVolumeCost += disk.Cost
  346. arts[key].PersistentVolumeCostAdjustment += disk.adjustment
  347. }
  348. }
  349. return arts
  350. }
  351. // ComputeIdleCoefficients returns the idle coefficients for CPU, GPU, and RAM
  352. // (in that order) for the given resource costs and totals.
  353. func ComputeIdleCoefficients(shareSplit, key string, cpuCost, gpuCost, ramCost float64, allocationTotals map[string]*AllocationTotals) (float64, float64, float64) {
  354. if shareSplit == ShareNone {
  355. return 0.0, 0.0, 0.0
  356. }
  357. if shareSplit != ShareEven {
  358. shareSplit = ShareWeighted
  359. }
  360. var cpuCoeff, gpuCoeff, ramCoeff float64
  361. if _, ok := allocationTotals[key]; !ok {
  362. return 0.0, 0.0, 0.0
  363. }
  364. if shareSplit == ShareEven {
  365. coeff := 1.0 / float64(allocationTotals[key].Count)
  366. return coeff, coeff, coeff
  367. }
  368. if allocationTotals[key].CPUCost > 0 {
  369. cpuCoeff = cpuCost / allocationTotals[key].TotalCPUCost()
  370. }
  371. if allocationTotals[key].GPUCost > 0 {
  372. gpuCoeff = gpuCost / allocationTotals[key].TotalGPUCost()
  373. }
  374. if allocationTotals[key].RAMCost > 0 {
  375. ramCoeff = ramCost / allocationTotals[key].TotalRAMCost()
  376. }
  377. return cpuCoeff, gpuCoeff, ramCoeff
  378. }
  379. // TotalsStore acts as both an AllocationTotalsStore and an
  380. // AssetTotalsStore.
  381. type TotalsStore interface {
  382. AllocationTotalsStore
  383. AssetTotalsStore
  384. }
  385. // AllocationTotalsStore allows for storing (i.e. setting and
  386. // getting) AllocationTotals by cluster and by node.
  387. type AllocationTotalsStore interface {
  388. GetAllocationTotalsByCluster(start, end time.Time) (map[string]*AllocationTotals, bool)
  389. GetAllocationTotalsByNode(start, end time.Time) (map[string]*AllocationTotals, bool)
  390. SetAllocationTotalsByCluster(start, end time.Time, rts map[string]*AllocationTotals)
  391. SetAllocationTotalsByNode(start, end time.Time, rts map[string]*AllocationTotals)
  392. }
  393. // UpdateAllocationTotalsStore updates an AllocationTotalsStore
  394. // by totaling the given AllocationSet and saving the totals.
  395. func UpdateAllocationTotalsStore(arts AllocationTotalsStore, as *AllocationSet) error {
  396. if arts == nil {
  397. return errors.New("cannot update nil AllocationTotalsStore")
  398. }
  399. if as == nil {
  400. return errors.New("cannot update AllocationTotalsStore from nil AllocationSet")
  401. }
  402. if as.Window.IsOpen() {
  403. return errors.New("cannot update AllocationTotalsStore from AllocationSet with open window")
  404. }
  405. start := *as.Window.Start()
  406. end := *as.Window.End()
  407. artsByCluster := ComputeAllocationTotals(as, AllocationClusterProp)
  408. arts.SetAllocationTotalsByCluster(start, end, artsByCluster)
  409. artsByNode := ComputeAllocationTotals(as, AllocationNodeProp)
  410. arts.SetAllocationTotalsByNode(start, end, artsByNode)
  411. log.Infof("ETL: Allocation: updated resource totals for %s", as.Window)
  412. return nil
  413. }
  414. // AssetTotalsStore allows for storing (i.e. setting and getting)
  415. // AssetTotals by cluster and by node.
  416. type AssetTotalsStore interface {
  417. GetAssetTotalsByCluster(start, end time.Time) (map[string]*AssetTotals, bool)
  418. GetAssetTotalsByNode(start, end time.Time) (map[string]*AssetTotals, bool)
  419. SetAssetTotalsByCluster(start, end time.Time, rts map[string]*AssetTotals)
  420. SetAssetTotalsByNode(start, end time.Time, rts map[string]*AssetTotals)
  421. }
  422. // UpdateAssetTotalsStore updates an AssetTotalsStore
  423. // by totaling the given AssetSet and saving the totals.
  424. func UpdateAssetTotalsStore(arts AssetTotalsStore, as *AssetSet) error {
  425. if arts == nil {
  426. return errors.New("cannot update nil AssetTotalsStore")
  427. }
  428. if as == nil {
  429. return errors.New("cannot update AssetTotalsStore from nil AssetSet")
  430. }
  431. if as.Window.IsOpen() {
  432. return errors.New("cannot update AssetTotalsStore from AssetSet with open window")
  433. }
  434. start := *as.Window.Start()
  435. end := *as.Window.End()
  436. artsByCluster := ComputeAssetTotals(as, AssetClusterProp)
  437. arts.SetAssetTotalsByCluster(start, end, artsByCluster)
  438. artsByNode := ComputeAssetTotals(as, AssetNodeProp)
  439. arts.SetAssetTotalsByNode(start, end, artsByNode)
  440. log.Infof("ETL: Asset: updated resource totals for %s", as.Window)
  441. return nil
  442. }
  443. // MemoryTotalsStore is an in-memory cache TotalsStore
  444. type MemoryTotalsStore struct {
  445. allocTotalsByCluster *cache.Cache
  446. allocTotalsByNode *cache.Cache
  447. assetTotalsByCluster *cache.Cache
  448. assetTotalsByNode *cache.Cache
  449. }
  450. // NewMemoryTotalsStore instantiates a new MemoryTotalsStore,
  451. // which is composed of four in-memory caches.
  452. func NewMemoryTotalsStore() *MemoryTotalsStore {
  453. return &MemoryTotalsStore{
  454. allocTotalsByCluster: cache.New(cache.NoExpiration, cache.NoExpiration),
  455. allocTotalsByNode: cache.New(cache.NoExpiration, cache.NoExpiration),
  456. assetTotalsByCluster: cache.New(cache.NoExpiration, cache.NoExpiration),
  457. assetTotalsByNode: cache.New(cache.NoExpiration, cache.NoExpiration),
  458. }
  459. }
  460. // GetAllocationTotalsByCluster retrieves the AllocationTotals
  461. // by cluster for the given start and end times.
  462. func (mts *MemoryTotalsStore) GetAllocationTotalsByCluster(start time.Time, end time.Time) (map[string]*AllocationTotals, bool) {
  463. k := storeKey(start, end)
  464. if raw, ok := mts.allocTotalsByCluster.Get(k); ok {
  465. return raw.(map[string]*AllocationTotals), true
  466. } else {
  467. return map[string]*AllocationTotals{}, false
  468. }
  469. }
  470. // GetAllocationTotalsByNode retrieves the AllocationTotals
  471. // by node for the given start and end times.
  472. func (mts *MemoryTotalsStore) GetAllocationTotalsByNode(start time.Time, end time.Time) (map[string]*AllocationTotals, bool) {
  473. k := storeKey(start, end)
  474. if raw, ok := mts.allocTotalsByNode.Get(k); ok {
  475. return raw.(map[string]*AllocationTotals), true
  476. } else {
  477. return map[string]*AllocationTotals{}, false
  478. }
  479. }
  480. // SetAllocationTotalsByCluster set the per-cluster AllocationTotals
  481. // to the given values for the given start and end times.
  482. func (mts *MemoryTotalsStore) SetAllocationTotalsByCluster(start time.Time, end time.Time, arts map[string]*AllocationTotals) {
  483. k := storeKey(start, end)
  484. mts.allocTotalsByCluster.Set(k, arts, cache.NoExpiration)
  485. }
  486. // SetAllocationTotalsByNode set the per-node AllocationTotals
  487. // to the given values for the given start and end times.
  488. func (mts *MemoryTotalsStore) SetAllocationTotalsByNode(start time.Time, end time.Time, arts map[string]*AllocationTotals) {
  489. k := storeKey(start, end)
  490. mts.allocTotalsByNode.Set(k, arts, cache.NoExpiration)
  491. }
  492. // GetAssetTotalsByCluster retrieves the AssetTotals
  493. // by cluster for the given start and end times.
  494. func (mts *MemoryTotalsStore) GetAssetTotalsByCluster(start time.Time, end time.Time) (map[string]*AssetTotals, bool) {
  495. k := storeKey(start, end)
  496. if raw, ok := mts.assetTotalsByCluster.Get(k); ok {
  497. return raw.(map[string]*AssetTotals), true
  498. } else {
  499. return map[string]*AssetTotals{}, false
  500. }
  501. }
  502. // GetAssetTotalsByNode retrieves the AssetTotals
  503. // by node for the given start and end times.
  504. func (mts *MemoryTotalsStore) GetAssetTotalsByNode(start time.Time, end time.Time) (map[string]*AssetTotals, bool) {
  505. k := storeKey(start, end)
  506. if raw, ok := mts.assetTotalsByNode.Get(k); ok {
  507. return raw.(map[string]*AssetTotals), true
  508. } else {
  509. return map[string]*AssetTotals{}, false
  510. }
  511. }
  512. // SetAssetTotalsByCluster set the per-cluster AssetTotals
  513. // to the given values for the given start and end times.
  514. func (mts *MemoryTotalsStore) SetAssetTotalsByCluster(start time.Time, end time.Time, arts map[string]*AssetTotals) {
  515. k := storeKey(start, end)
  516. mts.assetTotalsByCluster.Set(k, arts, cache.NoExpiration)
  517. }
  518. // SetAssetTotalsByNode set the per-node AssetTotals
  519. // to the given values for the given start and end times.
  520. func (mts *MemoryTotalsStore) SetAssetTotalsByNode(start time.Time, end time.Time, arts map[string]*AssetTotals) {
  521. k := storeKey(start, end)
  522. mts.assetTotalsByNode.Set(k, arts, cache.NoExpiration)
  523. }
  524. // storeKey creates a storage key based on start and end times
  525. func storeKey(start, end time.Time) string {
  526. startStr := strconv.FormatInt(start.Unix(), 10)
  527. endStr := strconv.FormatInt(end.Unix(), 10)
  528. return fmt.Sprintf("%s-%s", startStr, endStr)
  529. }