cluster.go 42 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381
  1. package costmodel
  2. import (
  3. "fmt"
  4. "net"
  5. "strconv"
  6. "strings"
  7. "time"
  8. "github.com/opencost/opencost/pkg/cloud/provider"
  9. "golang.org/x/exp/slices"
  10. "github.com/opencost/opencost/core/pkg/log"
  11. "github.com/opencost/opencost/core/pkg/opencost"
  12. "github.com/opencost/opencost/core/pkg/source"
  13. "github.com/opencost/opencost/core/pkg/util/timeutil"
  14. "github.com/opencost/opencost/pkg/cloud/models"
  15. "github.com/opencost/opencost/pkg/env"
  16. )
  // MAX_LOCAL_STORAGE_SIZE is the size cutoff (2^40, i.e. 1024^4) applied to
  // local storage devices: ClusterDisks skips any device whose reported byte
  // count exceeds this value.
  const MAX_LOCAL_STORAGE_SIZE = 1 << 40
  // SIG_STORAGE_LOCAL_PROVISIONER_PREFIX is the volume-name prefix used to
  // recognize local storage provisioned by sig-storage-local-static-provisioner.
  // When ASSET_INCLUDE_LOCAL_DISK_COST is set to false, volumes whose name
  // starts with this prefix are excluded.
  //
  // The check relies on the sig-storage-local-static-provisioner
  // implementation, which names every PV it creates with the "local-pv-"
  // prefix. For reference, see:
  // https://github.com/kubernetes-sigs/sig-storage-local-static-provisioner/blob/b6f465027bd059e92c0032c81dd1e1d90e35c909/pkg/discovery/discovery.go#L410-L417
  const SIG_STORAGE_LOCAL_PROVISIONER_PREFIX = "local-pv-"
  26. // Costs represents cumulative and monthly cluster costs over a given duration. Costs
  27. // are broken down by cores, memory, and storage.
  28. type ClusterCosts struct {
  29. Start *time.Time `json:"startTime"`
  30. End *time.Time `json:"endTime"`
  31. CPUCumulative float64 `json:"cpuCumulativeCost"`
  32. CPUMonthly float64 `json:"cpuMonthlyCost"`
  33. CPUBreakdown *ClusterCostsBreakdown `json:"cpuBreakdown"`
  34. GPUCumulative float64 `json:"gpuCumulativeCost"`
  35. GPUMonthly float64 `json:"gpuMonthlyCost"`
  36. RAMCumulative float64 `json:"ramCumulativeCost"`
  37. RAMMonthly float64 `json:"ramMonthlyCost"`
  38. RAMBreakdown *ClusterCostsBreakdown `json:"ramBreakdown"`
  39. StorageCumulative float64 `json:"storageCumulativeCost"`
  40. StorageMonthly float64 `json:"storageMonthlyCost"`
  41. StorageBreakdown *ClusterCostsBreakdown `json:"storageBreakdown"`
  42. TotalCumulative float64 `json:"totalCumulativeCost"`
  43. TotalMonthly float64 `json:"totalMonthlyCost"`
  44. DataMinutes float64
  45. }
  // ClusterCostsBreakdown splits a resource into fractional shares by category:
  // idle, other, system, and user (user-space, i.e. non-system, usage).
  type ClusterCostsBreakdown struct {
	Idle   float64 `json:"idle"`   // share not attributed to any other category
	Other  float64 `json:"other"`  // share attributed to neither system nor user
	System float64 `json:"system"` // share attributed to system usage
	User   float64 `json:"user"`   // share attributed to user-space usage
  }
  54. // NewClusterCostsFromCumulative takes cumulative cost data over a given time range, computes
  55. // the associated monthly rate data, and returns the Costs.
  56. func NewClusterCostsFromCumulative(cpu, gpu, ram, storage float64, window, offset time.Duration, dataHours float64) (*ClusterCosts, error) {
  57. start, end := timeutil.ParseTimeRange(window, offset)
  58. // If the number of hours is not given (i.e. is zero) compute one from the window and offset
  59. if dataHours == 0 {
  60. dataHours = end.Sub(start).Hours()
  61. }
  62. // Do not allow zero-length windows to prevent divide-by-zero issues
  63. if dataHours == 0 {
  64. return nil, fmt.Errorf("illegal time range: window %s, offset %s", window, offset)
  65. }
  66. cc := &ClusterCosts{
  67. Start: &start,
  68. End: &end,
  69. CPUCumulative: cpu,
  70. GPUCumulative: gpu,
  71. RAMCumulative: ram,
  72. StorageCumulative: storage,
  73. TotalCumulative: cpu + gpu + ram + storage,
  74. CPUMonthly: cpu / dataHours * (timeutil.HoursPerMonth),
  75. GPUMonthly: gpu / dataHours * (timeutil.HoursPerMonth),
  76. RAMMonthly: ram / dataHours * (timeutil.HoursPerMonth),
  77. StorageMonthly: storage / dataHours * (timeutil.HoursPerMonth),
  78. }
  79. cc.TotalMonthly = cc.CPUMonthly + cc.GPUMonthly + cc.RAMMonthly + cc.StorageMonthly
  80. return cc, nil
  81. }
  82. type Disk struct {
  83. Cluster string
  84. Name string
  85. ProviderID string
  86. StorageClass string
  87. VolumeName string
  88. ClaimName string
  89. ClaimNamespace string
  90. Cost float64
  91. Bytes float64
  92. // These two fields may not be available at all times because they rely on
  93. // a new set of metrics that may or may not be available. Thus, they must
  94. // be nilable to represent the complete absence of the data.
  95. //
  96. // In other words, nilability here lets us distinguish between
  97. // "metric is not available" and "metric is available but is 0".
  98. //
  99. // They end in "Ptr" to distinguish from an earlier version in order to
  100. // ensure that all usages are checked for nil.
  101. BytesUsedAvgPtr *float64
  102. BytesUsedMaxPtr *float64
  103. Local bool
  104. Start time.Time
  105. End time.Time
  106. Minutes float64
  107. Breakdown *ClusterCostsBreakdown
  108. }
  // DiskIdentifier is the map key for a Disk: the cluster it belongs to plus
  // its name.
  type DiskIdentifier struct {
	Cluster, Name string
  }
  113. func ClusterDisks(dataSource source.OpenCostDataSource, cp models.Provider, start, end time.Time) (map[DiskIdentifier]*Disk, error) {
  114. resolution := env.GetETLResolution()
  115. grp := source.NewQueryGroup()
  116. resChPVCost := source.WithGroup(grp, dataSource.QueryPVPricePerGiBHour(start, end))
  117. resChPVSize := source.WithGroup(grp, dataSource.QueryPVBytes(start, end))
  118. resChActiveMins := source.WithGroup(grp, dataSource.QueryPVActiveMinutes(start, end))
  119. resChPVStorageClass := source.WithGroup(grp, dataSource.QueryPVInfo(start, end))
  120. resChPVUsedAvg := source.WithGroup(grp, dataSource.QueryPVUsedAverage(start, end))
  121. resChPVUsedMax := source.WithGroup(grp, dataSource.QueryPVUsedMax(start, end))
  122. resChPVCInfo := source.WithGroup(grp, dataSource.QueryPVCInfo(start, end))
  123. resPVCost, _ := resChPVCost.Await()
  124. resPVSize, _ := resChPVSize.Await()
  125. resActiveMins, _ := resChActiveMins.Await()
  126. resPVStorageClass, _ := resChPVStorageClass.Await()
  127. resPVUsedAvg, _ := resChPVUsedAvg.Await()
  128. resPVUsedMax, _ := resChPVUsedMax.Await()
  129. resPVCInfo, _ := resChPVCInfo.Await()
  130. // Cloud providers do not always charge for a node's local disk costs (i.e.
  131. // ephemeral storage). Provide an option to opt out of calculating &
  132. // allocating local disk costs. Note, that this does not affect
  133. // PersistentVolume costs.
  134. //
  135. // Ref:
  136. // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/RootDeviceStorage.html
  137. // https://learn.microsoft.com/en-us/azure/virtual-machines/managed-disks-overview#temporary-disk
  138. // https://cloud.google.com/compute/docs/disks/local-ssd
  139. resLocalStorageCost := []*source.LocalStorageCostResult{}
  140. resLocalStorageUsedCost := []*source.LocalStorageUsedCostResult{}
  141. resLocalStorageUsedAvg := []*source.LocalStorageUsedAvgResult{}
  142. resLocalStorageUsedMax := []*source.LocalStorageUsedMaxResult{}
  143. resLocalStorageBytes := []*source.LocalStorageBytesResult{}
  144. resLocalActiveMins := []*source.LocalStorageActiveMinutesResult{}
  145. if env.GetAssetIncludeLocalDiskCost() {
  146. resChLocalStorageCost := source.WithGroup(grp, dataSource.QueryLocalStorageCost(start, end))
  147. resChLocalStorageUsedCost := source.WithGroup(grp, dataSource.QueryLocalStorageUsedCost(start, end))
  148. resChLocalStoreageUsedAvg := source.WithGroup(grp, dataSource.QueryLocalStorageUsedAvg(start, end))
  149. resChLocalStoreageUsedMax := source.WithGroup(grp, dataSource.QueryLocalStorageUsedMax(start, end))
  150. resChLocalStorageBytes := source.WithGroup(grp, dataSource.QueryLocalStorageBytes(start, end))
  151. resChLocalActiveMins := source.WithGroup(grp, dataSource.QueryLocalStorageActiveMinutes(start, end))
  152. resLocalStorageCost, _ = resChLocalStorageCost.Await()
  153. resLocalStorageUsedCost, _ = resChLocalStorageUsedCost.Await()
  154. resLocalStorageUsedAvg, _ = resChLocalStoreageUsedAvg.Await()
  155. resLocalStorageUsedMax, _ = resChLocalStoreageUsedMax.Await()
  156. resLocalStorageBytes, _ = resChLocalStorageBytes.Await()
  157. resLocalActiveMins, _ = resChLocalActiveMins.Await()
  158. }
  159. if grp.HasErrors() {
  160. return nil, grp.Error()
  161. }
  162. diskMap := buildAssetsPVCMap(resPVCInfo)
  163. pvCosts(diskMap, resolution, resActiveMins, resPVSize, resPVCost, resPVUsedAvg, resPVUsedMax, resPVCInfo, cp, opencost.NewClosedWindow(start, end))
  164. type localStorage struct {
  165. device string
  166. disk *Disk
  167. }
  168. localStorageDisks := map[DiskIdentifier]localStorage{}
  169. // Start with local storage bytes so that the device with the largest size which has passed the
  170. // query filters can be determined
  171. for _, result := range resLocalStorageBytes {
  172. cluster := result.Cluster
  173. if cluster == "" {
  174. cluster = env.GetClusterID()
  175. }
  176. name := result.Instance
  177. if name == "" {
  178. log.Warnf("ClusterDisks: local storage data missing instance")
  179. continue
  180. }
  181. device := result.Device
  182. if device == "" {
  183. log.Warnf("ClusterDisks: local storage data missing device")
  184. continue
  185. }
  186. bytes := result.Data[0].Value
  187. // Ignore disks that are larger than the max size
  188. if bytes > MAX_LOCAL_STORAGE_SIZE {
  189. continue
  190. }
  191. key := DiskIdentifier{cluster, name}
  192. // only keep the device with the most bytes per instance
  193. if current, ok := localStorageDisks[key]; !ok || current.disk.Bytes < bytes {
  194. localStorageDisks[key] = localStorage{
  195. device: device,
  196. disk: &Disk{
  197. Cluster: cluster,
  198. Name: name,
  199. Breakdown: &ClusterCostsBreakdown{},
  200. Local: true,
  201. StorageClass: opencost.LocalStorageClass,
  202. Bytes: bytes,
  203. },
  204. }
  205. }
  206. }
  207. for _, result := range resLocalStorageCost {
  208. cluster := result.Cluster
  209. if cluster == "" {
  210. cluster = env.GetClusterID()
  211. }
  212. name := result.Instance
  213. if name == "" {
  214. log.Warnf("ClusterDisks: local storage data missing instance")
  215. continue
  216. }
  217. device := result.Device
  218. if device == "" {
  219. log.Warnf("ClusterDisks: local storage data missing device")
  220. continue
  221. }
  222. cost := result.Data[0].Value
  223. key := DiskIdentifier{cluster, name}
  224. ls, ok := localStorageDisks[key]
  225. if !ok || ls.device != device {
  226. continue
  227. }
  228. ls.disk.Cost = cost
  229. }
  230. for _, result := range resLocalStorageUsedCost {
  231. cluster := result.Cluster
  232. if cluster == "" {
  233. cluster = env.GetClusterID()
  234. }
  235. name := result.Instance
  236. if name == "" {
  237. log.Warnf("ClusterDisks: local storage data missing instance")
  238. continue
  239. }
  240. device := result.Device
  241. if device == "" {
  242. log.Warnf("ClusterDisks: local storage data missing device")
  243. continue
  244. }
  245. cost := result.Data[0].Value
  246. key := DiskIdentifier{cluster, name}
  247. ls, ok := localStorageDisks[key]
  248. if !ok || ls.device != device {
  249. continue
  250. }
  251. ls.disk.Breakdown.System = cost / ls.disk.Cost
  252. }
  253. for _, result := range resLocalStorageUsedAvg {
  254. cluster := result.Cluster
  255. if cluster == "" {
  256. cluster = env.GetClusterID()
  257. }
  258. name := result.Instance
  259. if name == "" {
  260. log.Warnf("ClusterDisks: local storage data missing instance")
  261. continue
  262. }
  263. device := result.Device
  264. if device == "" {
  265. log.Warnf("ClusterDisks: local storage data missing device")
  266. continue
  267. }
  268. bytesAvg := result.Data[0].Value
  269. key := DiskIdentifier{cluster, name}
  270. ls, ok := localStorageDisks[key]
  271. if !ok || ls.device != device {
  272. continue
  273. }
  274. ls.disk.BytesUsedAvgPtr = &bytesAvg
  275. }
  276. for _, result := range resLocalStorageUsedMax {
  277. cluster := result.Cluster
  278. if cluster == "" {
  279. cluster = env.GetClusterID()
  280. }
  281. name := result.Instance
  282. if name == "" {
  283. log.Warnf("ClusterDisks: local storage data missing instance")
  284. continue
  285. }
  286. device := result.Device
  287. if device == "" {
  288. log.Warnf("ClusterDisks: local storage data missing device")
  289. continue
  290. }
  291. bytesMax := result.Data[0].Value
  292. key := DiskIdentifier{cluster, name}
  293. ls, ok := localStorageDisks[key]
  294. if !ok || ls.device != device {
  295. continue
  296. }
  297. ls.disk.BytesUsedMaxPtr = &bytesMax
  298. }
  299. for _, result := range resLocalActiveMins {
  300. cluster := result.Cluster
  301. if cluster == "" {
  302. cluster = env.GetClusterID()
  303. }
  304. name := result.Node
  305. if name == "" {
  306. log.DedupedWarningf(5, "ClusterDisks: local active mins data missing instance")
  307. continue
  308. }
  309. providerID := result.ProviderID
  310. if providerID == "" {
  311. log.DedupedWarningf(5, "ClusterDisks: local active mins data missing instance")
  312. continue
  313. }
  314. key := DiskIdentifier{cluster, name}
  315. ls, ok := localStorageDisks[key]
  316. if !ok {
  317. continue
  318. }
  319. ls.disk.ProviderID = provider.ParseLocalDiskID(providerID)
  320. if len(result.Data) == 0 {
  321. continue
  322. }
  323. s := time.Unix(int64(result.Data[0].Timestamp), 0)
  324. e := time.Unix(int64(result.Data[len(result.Data)-1].Timestamp), 0)
  325. mins := e.Sub(s).Minutes()
  326. // TODO niko/assets if mins >= threshold, interpolate for missing data?
  327. ls.disk.End = e
  328. ls.disk.Start = s
  329. ls.disk.Minutes = mins
  330. }
  331. // move local storage disks to main disk map
  332. for key, ls := range localStorageDisks {
  333. diskMap[key] = ls.disk
  334. }
  335. var unTracedDiskLogData []DiskIdentifier
  336. //Iterating through Persistent Volume given by custom metrics kubecost_pv_info and assign the storage class if known and __unknown__ if not populated.
  337. for _, result := range resPVStorageClass {
  338. cluster := result.Cluster
  339. if cluster == "" {
  340. cluster = env.GetClusterID()
  341. }
  342. name := result.PersistentVolume
  343. key := DiskIdentifier{cluster, name}
  344. if _, ok := diskMap[key]; !ok {
  345. if !slices.Contains(unTracedDiskLogData, key) {
  346. unTracedDiskLogData = append(unTracedDiskLogData, key)
  347. }
  348. continue
  349. }
  350. if len(result.Data) == 0 {
  351. continue
  352. }
  353. storageClass := result.StorageClass
  354. if storageClass == "" {
  355. diskMap[key].StorageClass = opencost.UnknownStorageClass
  356. } else {
  357. diskMap[key].StorageClass = storageClass
  358. }
  359. }
  360. // Logging the unidentified disk information outside the loop
  361. for _, unIdentifiedDisk := range unTracedDiskLogData {
  362. log.Warnf("ClusterDisks: Cluster %s has Storage Class information for unidentified disk %s or disk deleted from analysis", unIdentifiedDisk.Cluster, unIdentifiedDisk.Name)
  363. }
  364. for _, disk := range diskMap {
  365. // Apply all remaining RAM to Idle
  366. disk.Breakdown.Idle = 1.0 - (disk.Breakdown.System + disk.Breakdown.Other + disk.Breakdown.User)
  367. // Set provider Id to the name for reconciliation
  368. if disk.ProviderID == "" {
  369. disk.ProviderID = disk.Name
  370. }
  371. }
  372. if !env.GetAssetIncludeLocalDiskCost() {
  373. return filterOutLocalPVs(diskMap), nil
  374. }
  375. return diskMap, nil
  376. }
  // NodeOverhead holds the CPU and RAM overhead of a node, each expressed as a
  // fraction.
  type NodeOverhead struct {
	CpuOverheadFraction, RamOverheadFraction float64
  }
  381. type Node struct {
  382. Cluster string
  383. Name string
  384. ProviderID string
  385. NodeType string
  386. CPUCost float64
  387. CPUCores float64
  388. GPUCost float64
  389. GPUCount float64
  390. RAMCost float64
  391. RAMBytes float64
  392. Discount float64
  393. Preemptible bool
  394. CPUBreakdown *ClusterCostsBreakdown
  395. RAMBreakdown *ClusterCostsBreakdown
  396. Start time.Time
  397. End time.Time
  398. Minutes float64
  399. Labels map[string]string
  400. CostPerCPUHr float64
  401. CostPerRAMGiBHr float64
  402. CostPerGPUHr float64
  403. Overhead *NodeOverhead
  404. }
  // partialCPUMap maps a node type to its actual number of CPU cores. It exists
  // because GKE misreports the core count of e2 nodes; the entries below cover
  // those cases.
  var partialCPUMap = map[string]float64{
	"e2-micro":  0.25,
	"e2-small":  0.5,
	"e2-medium": 1,
  }
  // NodeIdentifier is the map key for a Node: its cluster, name, and provider
  // ID.
  type NodeIdentifier struct {
	Cluster, Name, ProviderID string
  }
  // nodeIdentifierNoProviderID identifies a node by cluster and name only, for
  // data that is keyed without a provider ID.
  type nodeIdentifierNoProviderID struct {
	Cluster, Name string
  }
  // ClusterManagementIdentifier is the map key for a ClusterManagementCost: the
  // cluster plus its provisioner.
  type ClusterManagementIdentifier struct {
	Cluster, Provisioner string
  }
  // ClusterManagementCost is the management cost of one cluster for a given
  // provisioner.
  type ClusterManagementCost struct {
	Cluster, Provisioner string
	Cost                 float64
  }
  431. func costTimesMinuteAndCount(activeDataMap map[NodeIdentifier]activeData, costMap map[NodeIdentifier]float64, resourceCountMap map[nodeIdentifierNoProviderID]float64) {
  432. for k, v := range activeDataMap {
  433. keyNon := nodeIdentifierNoProviderID{
  434. Cluster: k.Cluster,
  435. Name: k.Name,
  436. }
  437. if cost, ok := costMap[k]; ok {
  438. minutes := v.minutes
  439. count := 1.0
  440. if c, ok := resourceCountMap[keyNon]; ok {
  441. count = c
  442. }
  443. costMap[k] = cost * (minutes / 60.0) * count
  444. }
  445. }
  446. }
  447. func costTimesMinute[T comparable](activeDataMap map[T]activeData, costMap map[T]float64) {
  448. for k, v := range activeDataMap {
  449. if cost, ok := costMap[k]; ok {
  450. minutes := v.minutes
  451. costMap[k] = cost * (minutes / 60)
  452. }
  453. }
  454. }
  455. func ClusterNodes(dataSource source.OpenCostDataSource, cp models.Provider, start, end time.Time) (map[NodeIdentifier]*Node, error) {
  456. resolution := env.GetETLResolution()
  457. requiredGrp := source.NewQueryGroup()
  458. optionalGrp := source.NewQueryGroup()
  459. // return errors if these fail
  460. resChNodeCPUHourlyCost := source.WithGroup(requiredGrp, dataSource.QueryNodeCPUPricePerHr(start, end))
  461. resChNodeCPUCoresCapacity := source.WithGroup(requiredGrp, dataSource.QueryNodeCPUCoresCapacity(start, end))
  462. resChNodeCPUCoresAllocatable := source.WithGroup(requiredGrp, dataSource.QueryNodeCPUCoresAllocatable(start, end))
  463. resChNodeRAMHourlyCost := source.WithGroup(requiredGrp, dataSource.QueryNodeRAMPricePerGiBHr(start, end))
  464. resChNodeRAMBytesCapacity := source.WithGroup(requiredGrp, dataSource.QueryNodeRAMBytesCapacity(start, end))
  465. resChNodeRAMBytesAllocatable := source.WithGroup(requiredGrp, dataSource.QueryNodeRAMBytesAllocatable(start, end))
  466. resChNodeGPUCount := source.WithGroup(requiredGrp, dataSource.QueryNodeGPUCount(start, end))
  467. resChNodeGPUHourlyPrice := source.WithGroup(requiredGrp, dataSource.QueryNodeGPUPricePerHr(start, end))
  468. resChActiveMins := source.WithGroup(requiredGrp, dataSource.QueryNodeActiveMinutes(start, end))
  469. resChIsSpot := source.WithGroup(requiredGrp, dataSource.QueryNodeIsSpot(start, end))
  470. // Do not return errors if these fail, but log warnings
  471. resChNodeCPUModeTotal := source.WithGroup(optionalGrp, dataSource.QueryNodeCPUModeTotal(start, end))
  472. resChNodeRAMSystemPct := source.WithGroup(optionalGrp, dataSource.QueryNodeRAMSystemPercent(start, end))
  473. resChNodeRAMUserPct := source.WithGroup(optionalGrp, dataSource.QueryNodeRAMUserPercent(start, end))
  474. resChLabels := source.WithGroup(optionalGrp, dataSource.QueryNodeLabels(start, end))
  475. resNodeCPUHourlyCost, _ := resChNodeCPUHourlyCost.Await()
  476. resNodeCPUCoresCapacity, _ := resChNodeCPUCoresCapacity.Await()
  477. resNodeCPUCoresAllocatable, _ := resChNodeCPUCoresAllocatable.Await()
  478. resNodeGPUCount, _ := resChNodeGPUCount.Await()
  479. resNodeGPUHourlyPrice, _ := resChNodeGPUHourlyPrice.Await()
  480. resNodeRAMHourlyCost, _ := resChNodeRAMHourlyCost.Await()
  481. resNodeRAMBytesCapacity, _ := resChNodeRAMBytesCapacity.Await()
  482. resNodeRAMBytesAllocatable, _ := resChNodeRAMBytesAllocatable.Await()
  483. resIsSpot, _ := resChIsSpot.Await()
  484. resNodeCPUModeTotal, _ := resChNodeCPUModeTotal.Await()
  485. resNodeRAMSystemPct, _ := resChNodeRAMSystemPct.Await()
  486. resNodeRAMUserPct, _ := resChNodeRAMUserPct.Await()
  487. resActiveMins, _ := resChActiveMins.Await()
  488. resLabels, _ := resChLabels.Await()
  489. if optionalGrp.HasErrors() {
  490. for _, err := range optionalGrp.Errors() {
  491. log.Warnf("ClusterNodes: %s", err)
  492. }
  493. }
  494. if requiredGrp.HasErrors() {
  495. for _, err := range requiredGrp.Errors() {
  496. log.Errorf("ClusterNodes: %s", err)
  497. }
  498. return nil, requiredGrp.Error()
  499. }
  500. activeDataMap := buildActiveDataMap(resActiveMins, nodeKeyGen, nodeValues, resolution, opencost.NewClosedWindow(start, end))
  501. gpuCountMap := buildGPUCountMap(resNodeGPUCount)
  502. preemptibleMap := buildPreemptibleMap(resIsSpot)
  503. cpuCostMap, clusterAndNameToType1 := buildCPUCostMap(resNodeCPUHourlyCost, cp, preemptibleMap)
  504. ramCostMap, clusterAndNameToType2 := buildRAMCostMap(resNodeRAMHourlyCost, cp, preemptibleMap)
  505. gpuCostMap, clusterAndNameToType3 := buildGPUCostMap(resNodeGPUHourlyPrice, gpuCountMap, cp, preemptibleMap)
  506. clusterAndNameToTypeIntermediate := mergeTypeMaps(clusterAndNameToType1, clusterAndNameToType2)
  507. clusterAndNameToType := mergeTypeMaps(clusterAndNameToTypeIntermediate, clusterAndNameToType3)
  508. cpuCoresCapacityMap := buildCPUCoresMap(resNodeCPUCoresCapacity)
  509. ramBytesCapacityMap := buildRAMBytesMap(resNodeRAMBytesCapacity)
  510. cpuCoresAllocatableMap := buildCPUCoresMap(resNodeCPUCoresAllocatable)
  511. ramBytesAllocatableMap := buildRAMBytesMap(resNodeRAMBytesAllocatable)
  512. overheadMap := buildOverheadMap(ramBytesCapacityMap, ramBytesAllocatableMap, cpuCoresCapacityMap, cpuCoresAllocatableMap)
  513. ramUserPctMap := buildRAMUserPctMap(resNodeRAMUserPct)
  514. ramSystemPctMap := buildRAMSystemPctMap(resNodeRAMSystemPct)
  515. cpuBreakdownMap := buildCPUBreakdownMap(resNodeCPUModeTotal)
  516. labelsMap := buildLabelsMap(resLabels)
  517. costTimesMinuteAndCount(activeDataMap, cpuCostMap, cpuCoresCapacityMap)
  518. costTimesMinuteAndCount(activeDataMap, ramCostMap, ramBytesCapacityMap)
  519. costTimesMinute(activeDataMap, gpuCostMap) // there's no need to do a weird "nodeIdentifierNoProviderID" type match since gpuCounts have a providerID
  520. nodeMap := buildNodeMap(
  521. cpuCostMap, ramCostMap, gpuCostMap, gpuCountMap,
  522. cpuCoresCapacityMap, ramBytesCapacityMap, ramUserPctMap,
  523. ramSystemPctMap,
  524. cpuBreakdownMap,
  525. activeDataMap,
  526. preemptibleMap,
  527. labelsMap,
  528. clusterAndNameToType,
  529. overheadMap,
  530. )
  531. c, err := cp.GetConfig()
  532. if err != nil {
  533. return nil, err
  534. }
  535. discount, err := ParsePercentString(c.Discount)
  536. if err != nil {
  537. return nil, err
  538. }
  539. negotiatedDiscount, err := ParsePercentString(c.NegotiatedDiscount)
  540. if err != nil {
  541. return nil, err
  542. }
  543. for _, node := range nodeMap {
  544. // TODO take GKE Reserved Instances into account
  545. node.Discount = cp.CombinedDiscountForNode(node.NodeType, node.Preemptible, discount, negotiatedDiscount)
  546. // Apply all remaining resources to Idle
  547. node.CPUBreakdown.Idle = 1.0 - (node.CPUBreakdown.System + node.CPUBreakdown.Other + node.CPUBreakdown.User)
  548. node.RAMBreakdown.Idle = 1.0 - (node.RAMBreakdown.System + node.RAMBreakdown.Other + node.RAMBreakdown.User)
  549. }
  550. return nodeMap, nil
  551. }
  // LoadBalancerIdentifier is the map key for a LoadBalancer: its cluster,
  // namespace, name, and ingress IP.
  type LoadBalancerIdentifier struct {
	Cluster, Namespace, Name, IngressIP string
  }
  // LoadBalancer describes a single load balancer: its identity, cost, the time
  // span it was observed over, its IP address, and whether that address is
  // private.
  type LoadBalancer struct {
	Cluster, Namespace, Name, ProviderID string

	Cost       float64
	Start, End time.Time
	Minutes    float64

	Private bool
	Ip      string
  }
  570. func ClusterLoadBalancers(dataSource source.OpenCostDataSource, start, end time.Time) (map[LoadBalancerIdentifier]*LoadBalancer, error) {
  571. resolution := env.GetETLResolution()
  572. grp := source.NewQueryGroup()
  573. resChLBCost := source.WithGroup(grp, dataSource.QueryLBPricePerHr(start, end))
  574. resChActiveMins := source.WithGroup(grp, dataSource.QueryLBActiveMinutes(start, end))
  575. resLBCost, _ := resChLBCost.Await()
  576. resActiveMins, _ := resChActiveMins.Await()
  577. if grp.HasErrors() {
  578. return nil, grp.Error()
  579. }
  580. loadBalancerMap := make(map[LoadBalancerIdentifier]*LoadBalancer, len(resActiveMins))
  581. activeMap := buildActiveDataMap(resActiveMins, loadBalancerKeyGen, lbValues, resolution, opencost.NewClosedWindow(start, end))
  582. for _, result := range resLBCost {
  583. key, ok := loadBalancerKeyGen(result)
  584. if !ok {
  585. continue
  586. }
  587. lbPricePerHr := result.Data[0].Value
  588. lb := &LoadBalancer{
  589. Cluster: key.Cluster,
  590. Namespace: key.Namespace,
  591. Name: key.Name,
  592. Cost: lbPricePerHr, // default to hourly cost, overwrite if active entry exists
  593. Ip: key.IngressIP,
  594. Private: privateIPCheck(key.IngressIP),
  595. }
  596. if active, ok := activeMap[key]; ok {
  597. lb.Start = active.start
  598. lb.End = active.end
  599. lb.Minutes = active.minutes
  600. if active.minutes > 0 {
  601. lb.Cost = lbPricePerHr * (active.minutes / 60.0)
  602. } else {
  603. log.DedupedWarningf(20, "ClusterLoadBalancers: found zero minutes for key: %v", key)
  604. }
  605. }
  606. loadBalancerMap[key] = lb
  607. }
  608. return loadBalancerMap, nil
  609. }
  610. func ClusterManagement(dataSource source.OpenCostDataSource, start, end time.Time) (map[ClusterManagementIdentifier]*ClusterManagementCost, error) {
  611. resolution := env.GetETLResolution()
  612. grp := source.NewQueryGroup()
  613. resChCMPrice := source.WithGroup(grp, dataSource.QueryClusterManagementPricePerHr(start, end))
  614. resChCMDur := source.WithGroup(grp, dataSource.QueryClusterManagementDuration(start, end))
  615. resCMPrice, _ := resChCMPrice.Await()
  616. resCMDur, _ := resChCMDur.Await()
  617. if grp.HasErrors() {
  618. return nil, grp.Error()
  619. }
  620. clusterManagementPriceMap := make(map[ClusterManagementIdentifier]*ClusterManagementCost, len(resCMDur))
  621. activeMap := buildActiveDataMap(resCMDur, clusterManagementKeyGen, clusterManagementValues, resolution, opencost.NewClosedWindow(start, end))
  622. for _, result := range resCMPrice {
  623. key, ok := clusterManagementKeyGen(result)
  624. if !ok {
  625. continue
  626. }
  627. cmPricePerHr := result.Data[0].Value
  628. cm := &ClusterManagementCost{
  629. Cluster: key.Cluster,
  630. Provisioner: key.Provisioner,
  631. Cost: cmPricePerHr, // default to hourly cost, overwrite if active entry exists
  632. }
  633. if active, ok := activeMap[key]; ok {
  634. if active.minutes > 0 {
  635. cm.Cost = cmPricePerHr * (active.minutes / 60.0)
  636. } else {
  637. log.DedupedWarningf(20, "ClusterManagement: found zero minutes for key: %v", key)
  638. }
  639. }
  640. clusterManagementPriceMap[key] = cm
  641. }
  642. return clusterManagementPriceMap, nil
  643. }
  // privateIPCheck reports whether ip parses as an IP address that is private
  // according to net.IP.IsPrivate. Text that does not parse as an IP address
  // yields false.
  func privateIPCheck(ip string) bool {
	parsed := net.ParseIP(ip)
	if parsed == nil {
		return false
	}
	return parsed.IsPrivate()
  }
  649. // ComputeClusterCosts gives the cumulative and monthly-rate cluster costs over a window of time for all clusters.
  650. func (a *Accesses) ComputeClusterCosts(dataSource source.OpenCostDataSource, provider models.Provider, window, offset time.Duration, withBreakdown bool) (map[string]*ClusterCosts, error) {
  651. if window < 10*time.Minute {
  652. return nil, fmt.Errorf("minimum window of 10m required; got %s", window)
  653. }
  654. // Compute number of minutes in the full interval, for use interpolating missed scrapes or scaling missing data
  655. start, end := timeutil.ParseTimeRange(window, offset)
  656. mins := end.Sub(start).Minutes()
  657. providerName := ""
  658. if clusterInfo, err := provider.ClusterInfo(); err != nil {
  659. providerName = clusterInfo["provider"]
  660. }
  661. grp := source.NewQueryGroup()
  662. queryDataCount := source.WithGroup(grp, dataSource.QueryDataCount(start, end))
  663. queryTotalGPU := source.WithGroup(grp, dataSource.QueryTotalGPU(start, end))
  664. queryTotalCPU := source.WithGroup(grp, dataSource.QueryTotalCPU(start, end))
  665. queryTotalRAM := source.WithGroup(grp, dataSource.QueryTotalRAM(start, end))
  666. queryTotalStorage := source.WithGroup(grp, dataSource.QueryTotalStorage(start, end))
  667. queryTotalLocalStorage := source.WithGroup(grp, dataSource.QueryLocalStorageBytesByProvider(providerName, start, end))
  668. var queryCPUModePct *source.QueryGroupFuture[source.NodeCPUModePercentResult]
  669. var queryRAMSystemPct *source.QueryGroupFuture[source.NodeRAMSystemPercentResult]
  670. var queryRAMUserPct *source.QueryGroupFuture[source.NodeRAMUserPercentResult]
  671. var queryUsedLocalStorage *source.QueryGroupFuture[source.LocalStorageUsedByProviderResult]
  672. if withBreakdown {
  673. queryCPUModePct = source.WithGroup(grp, dataSource.QueryNodeCPUModePercent(start, end))
  674. queryRAMSystemPct = source.WithGroup(grp, dataSource.QueryNodeRAMSystemPercent(start, end))
  675. queryRAMUserPct = source.WithGroup(grp, dataSource.QueryNodeRAMUserPercent(start, end))
  676. queryUsedLocalStorage = source.WithGroup(grp, dataSource.QueryLocalStorageUsedByProvider(providerName, start, end))
  677. }
  678. resDataCount, _ := queryDataCount.Await()
  679. resTotalGPU, _ := queryTotalGPU.Await()
  680. resTotalCPU, _ := queryTotalCPU.Await()
  681. resTotalRAM, _ := queryTotalRAM.Await()
  682. resTotalStorage, _ := queryTotalStorage.Await()
  683. if grp.HasErrors() {
  684. return nil, grp.Error()
  685. }
  686. defaultClusterID := env.GetClusterID()
  687. dataMinsByCluster := map[string]float64{}
  688. for _, result := range resDataCount {
  689. clusterID := result.Cluster
  690. if clusterID == "" {
  691. clusterID = defaultClusterID
  692. }
  693. dataMins := mins
  694. if len(result.Data) > 0 {
  695. dataMins = result.Data[0].Value
  696. } else {
  697. log.Warnf("Cluster cost data count returned no results for cluster %s", clusterID)
  698. }
  699. dataMinsByCluster[clusterID] = dataMins
  700. }
  701. // Determine combined discount
  702. discount, customDiscount := 0.0, 0.0
  703. c, err := a.CloudProvider.GetConfig()
  704. if err == nil {
  705. discount, err = ParsePercentString(c.Discount)
  706. if err != nil {
  707. discount = 0.0
  708. }
  709. customDiscount, err = ParsePercentString(c.NegotiatedDiscount)
  710. if err != nil {
  711. customDiscount = 0.0
  712. }
  713. }
  714. // Intermediate structure storing mapping of [clusterID][type ∈ {cpu, ram, storage, total}]=cost
  715. costData := make(map[string]map[string]float64)
  716. // Helper function to iterate over Prom query results, parsing the raw values into
  717. // the intermediate costData structure.
  718. setCostsFromResults := func(costData map[string]map[string]float64, results []*source.TotalResult, name string, discount float64, customDiscount float64) {
  719. for _, result := range results {
  720. clusterID := result.Cluster
  721. if clusterID == "" {
  722. clusterID = defaultClusterID
  723. }
  724. if _, ok := costData[clusterID]; !ok {
  725. costData[clusterID] = map[string]float64{}
  726. }
  727. if len(result.Data) > 0 {
  728. costData[clusterID][name] += result.Data[0].Value * (1.0 - discount) * (1.0 - customDiscount)
  729. costData[clusterID]["total"] += result.Data[0].Value * (1.0 - discount) * (1.0 - customDiscount)
  730. }
  731. }
  732. }
  733. // Apply both sustained use and custom discounts to RAM and CPU
  734. setCostsFromResults(costData, resTotalCPU, "cpu", discount, customDiscount)
  735. setCostsFromResults(costData, resTotalRAM, "ram", discount, customDiscount)
  736. // Apply only custom discount to GPU and storage
  737. setCostsFromResults(costData, resTotalGPU, "gpu", 0.0, customDiscount)
  738. setCostsFromResults(costData, resTotalStorage, "storage", 0.0, customDiscount)
  739. resTotalLocalStorage, err := queryTotalLocalStorage.Await()
  740. if err != nil {
  741. return nil, err
  742. }
  743. if len(resTotalLocalStorage) > 0 {
  744. setCostsFromResults(costData, resTotalLocalStorage, "localstorage", 0.0, customDiscount)
  745. }
  746. cpuBreakdownMap := map[string]*ClusterCostsBreakdown{}
  747. ramBreakdownMap := map[string]*ClusterCostsBreakdown{}
  748. pvUsedCostMap := map[string]float64{}
  749. if withBreakdown {
  750. resCPUModePct, _ := queryCPUModePct.Await()
  751. resRAMSystemPct, _ := queryRAMSystemPct.Await()
  752. resRAMUserPct, _ := queryRAMUserPct.Await()
  753. if grp.HasErrors() {
  754. return nil, grp.Error()
  755. }
  756. for _, result := range resCPUModePct {
  757. clusterID := result.Cluster
  758. if clusterID == "" {
  759. clusterID = defaultClusterID
  760. }
  761. if _, ok := cpuBreakdownMap[clusterID]; !ok {
  762. cpuBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
  763. }
  764. cpuBD := cpuBreakdownMap[clusterID]
  765. mode := result.Mode
  766. if mode == "" {
  767. log.Warnf("ComputeClusterCosts: unable to read CPU mode: %s", err)
  768. mode = "other"
  769. }
  770. switch mode {
  771. case "idle":
  772. cpuBD.Idle += result.Data[0].Value
  773. case "system":
  774. cpuBD.System += result.Data[0].Value
  775. case "user":
  776. cpuBD.User += result.Data[0].Value
  777. default:
  778. cpuBD.Other += result.Data[0].Value
  779. }
  780. }
  781. for _, result := range resRAMSystemPct {
  782. clusterID := result.Cluster
  783. if clusterID == "" {
  784. clusterID = defaultClusterID
  785. }
  786. if _, ok := ramBreakdownMap[clusterID]; !ok {
  787. ramBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
  788. }
  789. ramBD := ramBreakdownMap[clusterID]
  790. ramBD.System += result.Data[0].Value
  791. }
  792. for _, result := range resRAMUserPct {
  793. clusterID := result.Cluster
  794. if clusterID == "" {
  795. clusterID = defaultClusterID
  796. }
  797. if _, ok := ramBreakdownMap[clusterID]; !ok {
  798. ramBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
  799. }
  800. ramBD := ramBreakdownMap[clusterID]
  801. ramBD.User += result.Data[0].Value
  802. }
  803. for _, ramBD := range ramBreakdownMap {
  804. remaining := 1.0
  805. remaining -= ramBD.Other
  806. remaining -= ramBD.System
  807. remaining -= ramBD.User
  808. ramBD.Idle = remaining
  809. }
  810. resUsedLocalStorage, err := queryUsedLocalStorage.Await()
  811. if err != nil {
  812. return nil, err
  813. }
  814. for _, result := range resUsedLocalStorage {
  815. clusterID := result.Cluster
  816. if clusterID == "" {
  817. clusterID = defaultClusterID
  818. }
  819. pvUsedCostMap[clusterID] += result.Data[0].Value
  820. }
  821. }
  822. if grp.HasErrors() {
  823. for _, err := range grp.Errors() {
  824. log.Errorf("ComputeClusterCosts: %s", err)
  825. }
  826. return nil, grp.Error()
  827. }
  828. // Convert intermediate structure to Costs instances
  829. costsByCluster := map[string]*ClusterCosts{}
  830. for id, cd := range costData {
  831. dataMins, ok := dataMinsByCluster[id]
  832. if !ok {
  833. dataMins = mins
  834. log.Warnf("Cluster cost data count not found for cluster %s", id)
  835. }
  836. costs, err := NewClusterCostsFromCumulative(cd["cpu"], cd["gpu"], cd["ram"], cd["storage"]+cd["localstorage"], window, offset, dataMins/timeutil.MinsPerHour)
  837. if err != nil {
  838. log.Warnf("Failed to parse cluster costs on %s (%s) from cumulative data: %+v", window, offset, cd)
  839. return nil, err
  840. }
  841. if cpuBD, ok := cpuBreakdownMap[id]; ok {
  842. costs.CPUBreakdown = cpuBD
  843. }
  844. if ramBD, ok := ramBreakdownMap[id]; ok {
  845. costs.RAMBreakdown = ramBD
  846. }
  847. costs.StorageBreakdown = &ClusterCostsBreakdown{}
  848. if pvUC, ok := pvUsedCostMap[id]; ok {
  849. costs.StorageBreakdown.Idle = (costs.StorageCumulative - pvUC) / costs.StorageCumulative
  850. costs.StorageBreakdown.User = pvUC / costs.StorageCumulative
  851. }
  852. costs.DataMinutes = dataMins
  853. costsByCluster[id] = costs
  854. }
  855. return costsByCluster, nil
  856. }
// Totals groups the cost series returned by ClusterCostsOverTime. Each field
// is a list of rows; as produced by resultToTotals, every row is a
// two-element slice holding a formatted timestamp followed by a formatted
// value.
type Totals struct {
	// TotalCost is the cluster total cost series.
	TotalCost [][]string `json:"totalcost"`
	// CPUCost is the CPU cost series.
	CPUCost [][]string `json:"cpucost"`
	// MemCost is the RAM cost series.
	MemCost [][]string `json:"memcost"`
	// StorageCost is the storage cost series. Its JSON key is camel-cased,
	// unlike the other fields.
	StorageCost [][]string `json:"storageCost"`
}
  863. func resultToTotals(qrs []*source.ClusterResult) ([][]string, error) {
  864. if len(qrs) == 0 {
  865. return [][]string{}, fmt.Errorf("not enough data available in the selected time range")
  866. }
  867. result := qrs[0]
  868. totals := [][]string{}
  869. for _, value := range result.Data {
  870. d0 := fmt.Sprintf("%f", value.Timestamp)
  871. d1 := fmt.Sprintf("%f", value.Value)
  872. toAppend := []string{
  873. d0,
  874. d1,
  875. }
  876. totals = append(totals, toAppend)
  877. }
  878. return totals, nil
  879. }
  880. // ClusterCostsOverTime gives the full cluster costs over time
  881. func ClusterCostsOverTime(dataSource source.OpenCostDataSource, provider models.Provider, start, end time.Time, window, offset time.Duration) (*Totals, error) {
  882. providerName := ""
  883. if clusterInfo, err := provider.ClusterInfo(); err != nil {
  884. providerName = clusterInfo["provider"]
  885. }
  886. grp := source.NewQueryGroup()
  887. qCores := source.WithGroup(grp, dataSource.QueryClusterCores(start, end, window))
  888. qRAM := source.WithGroup(grp, dataSource.QueryClusterRAM(start, end, window))
  889. qStorage := source.WithGroup(grp, dataSource.QueryClusterStorageByProvider(providerName, start, end, window))
  890. qTotal := source.WithGroup(grp, dataSource.QueryClusterTotalByProvider(providerName, start, end, window))
  891. resultClusterCores, _ := qCores.Await()
  892. resultClusterRAM, _ := qRAM.Await()
  893. resultStorage, _ := qStorage.Await()
  894. resultTotal, _ := qTotal.Await()
  895. if grp.HasErrors() {
  896. return nil, grp.Error()
  897. }
  898. coreTotal, err := resultToTotals(resultClusterCores)
  899. if err != nil {
  900. log.Infof("[Warning] ClusterCostsOverTime: no cpu data: %s", err)
  901. return nil, err
  902. }
  903. ramTotal, err := resultToTotals(resultClusterRAM)
  904. if err != nil {
  905. log.Infof("[Warning] ClusterCostsOverTime: no ram data: %s", err)
  906. return nil, err
  907. }
  908. storageTotal, err := resultToTotals(resultStorage)
  909. if err != nil {
  910. log.Infof("[Warning] ClusterCostsOverTime: no storage data: %s", err)
  911. }
  912. clusterTotal, err := resultToTotals(resultTotal)
  913. if err != nil {
  914. // If clusterTotal query failed, it's likely because there are no PVs, which
  915. // causes the qTotal query to return no data. Instead, query only node costs.
  916. // If that fails, return an error because something is actually wrong.
  917. qNodes := source.WithGroup(grp, dataSource.QueryClusterNodesByProvider(providerName, start, end, window))
  918. resultNodes, err := qNodes.Await()
  919. if err != nil {
  920. return nil, err
  921. }
  922. clusterTotal, err = resultToTotals(resultNodes)
  923. if err != nil {
  924. log.Infof("[Warning] ClusterCostsOverTime: no node data: %s", err)
  925. return nil, err
  926. }
  927. }
  928. return &Totals{
  929. TotalCost: clusterTotal,
  930. CPUCost: coreTotal,
  931. MemCost: ramTotal,
  932. StorageCost: storageTotal,
  933. }, nil
  934. }
  935. func pvCosts(
  936. diskMap map[DiskIdentifier]*Disk,
  937. resolution time.Duration,
  938. resActiveMins []*source.PVActiveMinutesResult,
  939. resPVSize []*source.PVBytesResult,
  940. resPVCost []*source.PVPricePerGiBHourResult,
  941. resPVUsedAvg []*source.PVUsedAvgResult,
  942. resPVUsedMax []*source.PVUsedMaxResult,
  943. resPVCInfo []*source.PVCInfoResult,
  944. cp models.Provider,
  945. window opencost.Window,
  946. ) {
  947. for _, result := range resActiveMins {
  948. cluster := result.Cluster
  949. if cluster == "" {
  950. cluster = env.GetClusterID()
  951. }
  952. name := result.PersistentVolume
  953. if name == "" {
  954. log.Warnf("ClusterDisks: active mins missing pv name")
  955. continue
  956. }
  957. if len(result.Data) == 0 {
  958. continue
  959. }
  960. key := DiskIdentifier{cluster, name}
  961. if _, ok := diskMap[key]; !ok {
  962. diskMap[key] = &Disk{
  963. Cluster: cluster,
  964. Name: name,
  965. Breakdown: &ClusterCostsBreakdown{},
  966. }
  967. }
  968. s, e := calculateStartAndEnd(result.Data, resolution, window)
  969. mins := e.Sub(s).Minutes()
  970. diskMap[key].End = e
  971. diskMap[key].Start = s
  972. diskMap[key].Minutes = mins
  973. }
  974. for _, result := range resPVSize {
  975. cluster := result.Cluster
  976. if cluster == "" {
  977. cluster = env.GetClusterID()
  978. }
  979. name := result.PersistentVolume
  980. if name == "" {
  981. log.Warnf("ClusterDisks: PV size data missing persistentvolume")
  982. continue
  983. }
  984. // TODO niko/assets storage class
  985. bytes := result.Data[0].Value
  986. key := DiskIdentifier{cluster, name}
  987. if _, ok := diskMap[key]; !ok {
  988. diskMap[key] = &Disk{
  989. Cluster: cluster,
  990. Name: name,
  991. Breakdown: &ClusterCostsBreakdown{},
  992. }
  993. }
  994. diskMap[key].Bytes = bytes
  995. }
  996. customPricingEnabled := provider.CustomPricesEnabled(cp)
  997. customPricingConfig, err := cp.GetConfig()
  998. if err != nil {
  999. log.Warnf("ClusterDisks: failed to load custom pricing: %s", err)
  1000. }
  1001. for _, result := range resPVCost {
  1002. cluster := result.Cluster
  1003. if cluster == "" {
  1004. cluster = env.GetClusterID()
  1005. }
  1006. name := result.PersistentVolume
  1007. if name == "" {
  1008. log.Warnf("ClusterDisks: PV cost data missing persistentvolume")
  1009. continue
  1010. }
  1011. // TODO niko/assets storage class
  1012. var cost float64
  1013. if customPricingEnabled && customPricingConfig != nil {
  1014. customPVCostStr := customPricingConfig.Storage
  1015. customPVCost, err := strconv.ParseFloat(customPVCostStr, 64)
  1016. if err != nil {
  1017. log.Warnf("ClusterDisks: error parsing custom PV price: %s", customPVCostStr)
  1018. }
  1019. cost = customPVCost
  1020. } else {
  1021. cost = result.Data[0].Value
  1022. }
  1023. key := DiskIdentifier{cluster, name}
  1024. if _, ok := diskMap[key]; !ok {
  1025. diskMap[key] = &Disk{
  1026. Cluster: cluster,
  1027. Name: name,
  1028. Breakdown: &ClusterCostsBreakdown{},
  1029. }
  1030. }
  1031. diskMap[key].Cost = cost * (diskMap[key].Bytes / 1024 / 1024 / 1024) * (diskMap[key].Minutes / 60)
  1032. providerID := result.ProviderID // just put the providerID set up here, it's the simplest query.
  1033. if providerID != "" {
  1034. diskMap[key].ProviderID = provider.ParsePVID(providerID)
  1035. }
  1036. }
  1037. for _, result := range resPVUsedAvg {
  1038. cluster := result.Cluster
  1039. if cluster == "" {
  1040. cluster = env.GetClusterID()
  1041. }
  1042. claimName := result.PersistentVolumeClaim
  1043. if claimName == "" {
  1044. log.Debugf("ClusterDisks: pv usage data missing persistentvolumeclaim")
  1045. continue
  1046. }
  1047. claimNamespace := result.Namespace
  1048. if claimNamespace == "" {
  1049. log.Debugf("ClusterDisks: pv usage data missing namespace")
  1050. continue
  1051. }
  1052. var volumeName string
  1053. for _, thatRes := range resPVCInfo {
  1054. thatCluster := thatRes.Cluster
  1055. if thatCluster == "" {
  1056. thatCluster = env.GetClusterID()
  1057. }
  1058. thatVolumeName := thatRes.VolumeName
  1059. if thatVolumeName == "" {
  1060. log.Debugf("ClusterDisks: pv claim data missing volumename")
  1061. continue
  1062. }
  1063. thatClaimName := thatRes.PersistentVolumeClaim
  1064. if thatClaimName == "" {
  1065. log.Debugf("ClusterDisks: pv claim data missing persistentvolumeclaim")
  1066. continue
  1067. }
  1068. thatClaimNamespace := thatRes.Namespace
  1069. if thatClaimNamespace == "" {
  1070. log.Debugf("ClusterDisks: pv claim data missing namespace")
  1071. continue
  1072. }
  1073. if cluster == thatCluster && claimName == thatClaimName && claimNamespace == thatClaimNamespace {
  1074. volumeName = thatVolumeName
  1075. }
  1076. }
  1077. usage := result.Data[0].Value
  1078. key := DiskIdentifier{cluster, volumeName}
  1079. if _, ok := diskMap[key]; !ok {
  1080. diskMap[key] = &Disk{
  1081. Cluster: cluster,
  1082. Name: volumeName,
  1083. Breakdown: &ClusterCostsBreakdown{},
  1084. }
  1085. }
  1086. diskMap[key].BytesUsedAvgPtr = &usage
  1087. }
  1088. for _, result := range resPVUsedMax {
  1089. cluster := result.Cluster
  1090. if cluster == "" {
  1091. cluster = env.GetClusterID()
  1092. }
  1093. claimName := result.PersistentVolumeClaim
  1094. if claimName == "" {
  1095. log.Debugf("ClusterDisks: pv usage data missing persistentvolumeclaim")
  1096. continue
  1097. }
  1098. claimNamespace := result.Namespace
  1099. if claimNamespace == "" {
  1100. log.Debugf("ClusterDisks: pv usage data missing namespace")
  1101. continue
  1102. }
  1103. var volumeName string
  1104. for _, thatRes := range resPVCInfo {
  1105. thatCluster := thatRes.Cluster
  1106. if thatCluster == "" {
  1107. thatCluster = env.GetClusterID()
  1108. }
  1109. thatVolumeName := thatRes.VolumeName
  1110. if thatVolumeName == "" {
  1111. log.Debugf("ClusterDisks: pv claim data missing volumename")
  1112. continue
  1113. }
  1114. thatClaimName := thatRes.PersistentVolumeClaim
  1115. if thatClaimName == "" {
  1116. log.Debugf("ClusterDisks: pv claim data missing persistentvolumeclaim")
  1117. continue
  1118. }
  1119. thatClaimNamespace := thatRes.Namespace
  1120. if thatClaimNamespace == "" {
  1121. log.Debugf("ClusterDisks: pv claim data missing namespace")
  1122. continue
  1123. }
  1124. if cluster == thatCluster && claimName == thatClaimName && claimNamespace == thatClaimNamespace {
  1125. volumeName = thatVolumeName
  1126. }
  1127. }
  1128. usage := result.Data[0].Value
  1129. key := DiskIdentifier{cluster, volumeName}
  1130. if _, ok := diskMap[key]; !ok {
  1131. diskMap[key] = &Disk{
  1132. Cluster: cluster,
  1133. Name: volumeName,
  1134. Breakdown: &ClusterCostsBreakdown{},
  1135. }
  1136. }
  1137. diskMap[key].BytesUsedMaxPtr = &usage
  1138. }
  1139. }
  1140. // filterOutLocalPVs removes local Persistent Volumes (PVs) from the given disk map.
  1141. // Local PVs are identified by the prefix "local-pv-" in their names, which is the
  1142. // convention used by sig-storage-local-static-provisioner.
  1143. //
  1144. // Parameters:
  1145. // - diskMap: A map of DiskIdentifier to Disk pointers, representing all PVs.
  1146. //
  1147. // Returns:
  1148. // - A new map of DiskIdentifier to Disk pointers, containing only non-local PVs.
  1149. func filterOutLocalPVs(diskMap map[DiskIdentifier]*Disk) map[DiskIdentifier]*Disk {
  1150. nonLocalPVDiskMap := map[DiskIdentifier]*Disk{}
  1151. for key, val := range diskMap {
  1152. if !strings.HasPrefix(key.Name, SIG_STORAGE_LOCAL_PROVISIONER_PREFIX) {
  1153. nonLocalPVDiskMap[key] = val
  1154. }
  1155. }
  1156. return nonLocalPVDiskMap
  1157. }