cluster.go 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024
  1. package costmodel
  2. import (
  3. "net"
  4. "strconv"
  5. "strings"
  6. "time"
  7. coreenv "github.com/opencost/opencost/core/pkg/env"
  8. "github.com/opencost/opencost/pkg/cloud/provider"
  9. "golang.org/x/exp/slices"
  10. "github.com/opencost/opencost/core/pkg/log"
  11. "github.com/opencost/opencost/core/pkg/opencost"
  12. "github.com/opencost/opencost/core/pkg/source"
  13. "github.com/opencost/opencost/pkg/cloud/models"
  14. "github.com/opencost/opencost/pkg/env"
  15. )
  const (
	// MAX_LOCAL_STORAGE_SIZE is the largest local storage device, in bytes
	// (1 TiB), that ClusterDisks will consider when choosing the device that
	// represents a node's local disk; larger devices are ignored.
	MAX_LOCAL_STORAGE_SIZE = 1 << 40

	// SIG_STORAGE_LOCAL_PROVISIONER_PREFIX is the name prefix given to every
	// PersistentVolume created by sig-storage-local-static-provisioner. When
	// ASSET_INCLUDE_LOCAL_DISK_COST is false, volumes carrying this prefix are
	// treated as local storage and excluded. See:
	// https://github.com/kubernetes-sigs/sig-storage-local-static-provisioner/blob/b6f465027bd059e92c0032c81dd1e1d90e35c909/pkg/discovery/discovery.go#L410-L417
	SIG_STORAGE_LOCAL_PROVISIONER_PREFIX = "local-pv-"
)
  25. // Costs represents cumulative and monthly cluster costs over a given duration. Costs
  26. // are broken down by cores, memory, and storage.
  27. type ClusterCosts struct {
  28. Start *time.Time `json:"startTime"`
  29. End *time.Time `json:"endTime"`
  30. CPUCumulative float64 `json:"cpuCumulativeCost"`
  31. CPUMonthly float64 `json:"cpuMonthlyCost"`
  32. CPUBreakdown *ClusterCostsBreakdown `json:"cpuBreakdown"`
  33. GPUCumulative float64 `json:"gpuCumulativeCost"`
  34. GPUMonthly float64 `json:"gpuMonthlyCost"`
  35. RAMCumulative float64 `json:"ramCumulativeCost"`
  36. RAMMonthly float64 `json:"ramMonthlyCost"`
  37. RAMBreakdown *ClusterCostsBreakdown `json:"ramBreakdown"`
  38. StorageCumulative float64 `json:"storageCumulativeCost"`
  39. StorageMonthly float64 `json:"storageMonthlyCost"`
  40. StorageBreakdown *ClusterCostsBreakdown `json:"storageBreakdown"`
  41. TotalCumulative float64 `json:"totalCumulativeCost"`
  42. TotalMonthly float64 `json:"totalMonthlyCost"`
  43. DataMinutes float64
  44. }
  // ClusterCostsBreakdown splits a resource's cost into fractional shares by
// usage category: User (user-space, i.e. non-system, usage), System, Other,
// and Idle. The shares are fractions of 1.0 rather than percentages; callers
// in this file fill Idle with whatever System, Other, and User leave over.
type ClusterCostsBreakdown struct {
	Idle   float64 `json:"idle"`   // share not accounted for by the other categories
	Other  float64 `json:"other"`  // share attributed to the "other" category
	System float64 `json:"system"` // share attributed to system usage
	User   float64 `json:"user"`   // share attributed to user-space usage
}
  53. type Disk struct {
  54. Cluster string
  55. Name string
  56. ProviderID string
  57. StorageClass string
  58. VolumeName string
  59. ClaimName string
  60. ClaimNamespace string
  61. Cost float64
  62. Bytes float64
  63. // These two fields may not be available at all times because they rely on
  64. // a new set of metrics that may or may not be available. Thus, they must
  65. // be nilable to represent the complete absence of the data.
  66. //
  67. // In other words, nilability here lets us distinguish between
  68. // "metric is not available" and "metric is available but is 0".
  69. //
  70. // They end in "Ptr" to distinguish from an earlier version in order to
  71. // ensure that all usages are checked for nil.
  72. BytesUsedAvgPtr *float64
  73. BytesUsedMaxPtr *float64
  74. Local bool
  75. Start time.Time
  76. End time.Time
  77. Minutes float64
  78. Breakdown *ClusterCostsBreakdown
  79. }
  80. type DiskIdentifier struct {
  81. Cluster string
  82. Name string
  83. }
  84. func ClusterDisks(dataSource source.OpenCostDataSource, cp models.Provider, start, end time.Time) (map[DiskIdentifier]*Disk, error) {
  85. resolution := dataSource.Resolution()
  86. grp := source.NewQueryGroup()
  87. mq := dataSource.Metrics()
  88. resChPVCost := source.WithGroup(grp, mq.QueryPVPricePerGiBHour(start, end))
  89. resChPVSize := source.WithGroup(grp, mq.QueryPVBytes(start, end))
  90. resChActiveMins := source.WithGroup(grp, mq.QueryPVActiveMinutes(start, end))
  91. resChPVStorageClass := source.WithGroup(grp, mq.QueryPVInfo(start, end))
  92. resChPVUsedAvg := source.WithGroup(grp, mq.QueryPVUsedAverage(start, end))
  93. resChPVUsedMax := source.WithGroup(grp, mq.QueryPVUsedMax(start, end))
  94. resChPVCInfo := source.WithGroup(grp, mq.QueryPVCInfo(start, end))
  95. resPVCost, _ := resChPVCost.Await()
  96. resPVSize, _ := resChPVSize.Await()
  97. resActiveMins, _ := resChActiveMins.Await()
  98. resPVStorageClass, _ := resChPVStorageClass.Await()
  99. resPVUsedAvg, _ := resChPVUsedAvg.Await()
  100. resPVUsedMax, _ := resChPVUsedMax.Await()
  101. resPVCInfo, _ := resChPVCInfo.Await()
  102. // Cloud providers do not always charge for a node's local disk costs (i.e.
  103. // ephemeral storage). Provide an option to opt out of calculating &
  104. // allocating local disk costs. Note, that this does not affect
  105. // PersistentVolume costs.
  106. //
  107. // Ref:
  108. // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/RootDeviceStorage.html
  109. // https://learn.microsoft.com/en-us/azure/virtual-machines/managed-disks-overview#temporary-disk
  110. // https://cloud.google.com/compute/docs/disks/local-ssd
  111. resLocalStorageCost := []*source.LocalStorageCostResult{}
  112. resLocalStorageUsedCost := []*source.LocalStorageUsedCostResult{}
  113. resLocalStorageUsedAvg := []*source.LocalStorageUsedAvgResult{}
  114. resLocalStorageUsedMax := []*source.LocalStorageUsedMaxResult{}
  115. resLocalStorageBytes := []*source.LocalStorageBytesResult{}
  116. resLocalActiveMins := []*source.LocalStorageActiveMinutesResult{}
  117. if env.IsAssetIncludeLocalDiskCost() {
  118. resChLocalStorageCost := source.WithGroup(grp, mq.QueryLocalStorageCost(start, end))
  119. resChLocalStorageUsedCost := source.WithGroup(grp, mq.QueryLocalStorageUsedCost(start, end))
  120. resChLocalStoreageUsedAvg := source.WithGroup(grp, mq.QueryLocalStorageUsedAvg(start, end))
  121. resChLocalStoreageUsedMax := source.WithGroup(grp, mq.QueryLocalStorageUsedMax(start, end))
  122. resChLocalStorageBytes := source.WithGroup(grp, mq.QueryLocalStorageBytes(start, end))
  123. resChLocalActiveMins := source.WithGroup(grp, mq.QueryLocalStorageActiveMinutes(start, end))
  124. resLocalStorageCost, _ = resChLocalStorageCost.Await()
  125. resLocalStorageUsedCost, _ = resChLocalStorageUsedCost.Await()
  126. resLocalStorageUsedAvg, _ = resChLocalStoreageUsedAvg.Await()
  127. resLocalStorageUsedMax, _ = resChLocalStoreageUsedMax.Await()
  128. resLocalStorageBytes, _ = resChLocalStorageBytes.Await()
  129. resLocalActiveMins, _ = resChLocalActiveMins.Await()
  130. }
  131. if grp.HasErrors() {
  132. return nil, grp.Error()
  133. }
  134. diskMap := buildAssetsPVCMap(resPVCInfo)
  135. pvCosts(diskMap, resolution, resActiveMins, resPVSize, resPVCost, resPVUsedAvg, resPVUsedMax, resPVCInfo, cp, opencost.NewClosedWindow(start, end))
  136. type localStorage struct {
  137. device string
  138. disk *Disk
  139. }
  140. localStorageDisks := map[DiskIdentifier]localStorage{}
  141. // Start with local storage bytes so that the device with the largest size which has passed the
  142. // query filters can be determined
  143. for _, result := range resLocalStorageBytes {
  144. cluster := result.Cluster
  145. if cluster == "" {
  146. cluster = coreenv.GetClusterID()
  147. }
  148. name := result.Instance
  149. if name == "" {
  150. log.Warnf("ClusterDisks: local storage data missing instance")
  151. continue
  152. }
  153. device := result.Device
  154. if device == "" {
  155. log.Warnf("ClusterDisks: local storage data missing device")
  156. continue
  157. }
  158. bytes := result.Data[0].Value
  159. // Ignore disks that are larger than the max size
  160. if bytes > MAX_LOCAL_STORAGE_SIZE {
  161. continue
  162. }
  163. key := DiskIdentifier{cluster, name}
  164. // only keep the device with the most bytes per instance
  165. if current, ok := localStorageDisks[key]; !ok || current.disk.Bytes < bytes {
  166. localStorageDisks[key] = localStorage{
  167. device: device,
  168. disk: &Disk{
  169. Cluster: cluster,
  170. Name: name,
  171. Breakdown: &ClusterCostsBreakdown{},
  172. Local: true,
  173. StorageClass: opencost.LocalStorageClass,
  174. Bytes: bytes,
  175. },
  176. }
  177. }
  178. }
  179. for _, result := range resLocalStorageCost {
  180. cluster := result.Cluster
  181. if cluster == "" {
  182. cluster = coreenv.GetClusterID()
  183. }
  184. name := result.Instance
  185. if name == "" {
  186. log.Warnf("ClusterDisks: local storage data missing instance")
  187. continue
  188. }
  189. device := result.Device
  190. if device == "" {
  191. log.Warnf("ClusterDisks: local storage data missing device")
  192. continue
  193. }
  194. cost := result.Data[0].Value
  195. key := DiskIdentifier{cluster, name}
  196. ls, ok := localStorageDisks[key]
  197. if !ok || ls.device != device {
  198. continue
  199. }
  200. ls.disk.Cost = cost
  201. }
  202. for _, result := range resLocalStorageUsedCost {
  203. cluster := result.Cluster
  204. if cluster == "" {
  205. cluster = coreenv.GetClusterID()
  206. }
  207. name := result.Instance
  208. if name == "" {
  209. log.Warnf("ClusterDisks: local storage data missing instance")
  210. continue
  211. }
  212. device := result.Device
  213. if device == "" {
  214. log.Warnf("ClusterDisks: local storage data missing device")
  215. continue
  216. }
  217. cost := result.Data[0].Value
  218. key := DiskIdentifier{cluster, name}
  219. ls, ok := localStorageDisks[key]
  220. if !ok || ls.device != device {
  221. continue
  222. }
  223. ls.disk.Breakdown.System = cost / ls.disk.Cost
  224. }
  225. for _, result := range resLocalStorageUsedAvg {
  226. cluster := result.Cluster
  227. if cluster == "" {
  228. cluster = coreenv.GetClusterID()
  229. }
  230. name := result.Instance
  231. if name == "" {
  232. log.Warnf("ClusterDisks: local storage data missing instance")
  233. continue
  234. }
  235. device := result.Device
  236. if device == "" {
  237. log.Warnf("ClusterDisks: local storage data missing device")
  238. continue
  239. }
  240. bytesAvg := result.Data[0].Value
  241. key := DiskIdentifier{cluster, name}
  242. ls, ok := localStorageDisks[key]
  243. if !ok || ls.device != device {
  244. continue
  245. }
  246. ls.disk.BytesUsedAvgPtr = &bytesAvg
  247. }
  248. for _, result := range resLocalStorageUsedMax {
  249. cluster := result.Cluster
  250. if cluster == "" {
  251. cluster = coreenv.GetClusterID()
  252. }
  253. name := result.Instance
  254. if name == "" {
  255. log.Warnf("ClusterDisks: local storage data missing instance")
  256. continue
  257. }
  258. device := result.Device
  259. if device == "" {
  260. log.Warnf("ClusterDisks: local storage data missing device")
  261. continue
  262. }
  263. bytesMax := result.Data[0].Value
  264. key := DiskIdentifier{cluster, name}
  265. ls, ok := localStorageDisks[key]
  266. if !ok || ls.device != device {
  267. continue
  268. }
  269. ls.disk.BytesUsedMaxPtr = &bytesMax
  270. }
  271. for _, result := range resLocalActiveMins {
  272. cluster := result.Cluster
  273. if cluster == "" {
  274. cluster = coreenv.GetClusterID()
  275. }
  276. name := result.Node
  277. if name == "" {
  278. log.DedupedWarningf(5, "ClusterDisks: local active mins data missing instance")
  279. continue
  280. }
  281. providerID := result.ProviderID
  282. if providerID == "" {
  283. log.DedupedWarningf(5, "ClusterDisks: local active mins data missing provider_id")
  284. continue
  285. }
  286. key := DiskIdentifier{cluster, name}
  287. ls, ok := localStorageDisks[key]
  288. if !ok {
  289. continue
  290. }
  291. ls.disk.ProviderID = provider.ParseLocalDiskID(providerID)
  292. if len(result.Data) == 0 {
  293. continue
  294. }
  295. s := time.Unix(int64(result.Data[0].Timestamp), 0)
  296. e := time.Unix(int64(result.Data[len(result.Data)-1].Timestamp), 0)
  297. mins := e.Sub(s).Minutes()
  298. // TODO niko/assets if mins >= threshold, interpolate for missing data?
  299. ls.disk.End = e
  300. ls.disk.Start = s
  301. ls.disk.Minutes = mins
  302. }
  303. // move local storage disks to main disk map
  304. for key, ls := range localStorageDisks {
  305. diskMap[key] = ls.disk
  306. }
  307. var unTracedDiskLogData []DiskIdentifier
  308. //Iterating through Persistent Volume given by custom metrics kubecost_pv_info and assign the storage class if known and __unknown__ if not populated.
  309. for _, result := range resPVStorageClass {
  310. cluster := result.Cluster
  311. if cluster == "" {
  312. cluster = coreenv.GetClusterID()
  313. }
  314. name := result.PersistentVolume
  315. key := DiskIdentifier{cluster, name}
  316. if _, ok := diskMap[key]; !ok {
  317. if !slices.Contains(unTracedDiskLogData, key) {
  318. unTracedDiskLogData = append(unTracedDiskLogData, key)
  319. }
  320. continue
  321. }
  322. if len(result.Data) == 0 {
  323. continue
  324. }
  325. storageClass := result.StorageClass
  326. if storageClass == "" {
  327. diskMap[key].StorageClass = opencost.UnknownStorageClass
  328. } else {
  329. diskMap[key].StorageClass = storageClass
  330. }
  331. }
  332. // Logging the unidentified disk information outside the loop
  333. for _, unIdentifiedDisk := range unTracedDiskLogData {
  334. log.Warnf("ClusterDisks: Cluster %s has Storage Class information for unidentified disk %s or disk deleted from analysis", unIdentifiedDisk.Cluster, unIdentifiedDisk.Name)
  335. }
  336. for _, disk := range diskMap {
  337. // Apply all remaining RAM to Idle
  338. disk.Breakdown.Idle = 1.0 - (disk.Breakdown.System + disk.Breakdown.Other + disk.Breakdown.User)
  339. // Set provider Id to the name for reconciliation
  340. if disk.ProviderID == "" {
  341. disk.ProviderID = disk.Name
  342. }
  343. }
  344. if !env.IsAssetIncludeLocalDiskCost() {
  345. return filterOutLocalPVs(diskMap), nil
  346. }
  347. return diskMap, nil
  348. }
  349. type NodeOverhead struct {
  350. CpuOverheadFraction float64
  351. RamOverheadFraction float64
  352. }
  353. type Node struct {
  354. Cluster string
  355. Name string
  356. ProviderID string
  357. NodeType string
  358. CPUCost float64
  359. CPUCores float64
  360. GPUCost float64
  361. GPUCount float64
  362. RAMCost float64
  363. RAMBytes float64
  364. Discount float64
  365. Preemptible bool
  366. CPUBreakdown *ClusterCostsBreakdown
  367. RAMBreakdown *ClusterCostsBreakdown
  368. Start time.Time
  369. End time.Time
  370. Minutes float64
  371. Labels map[string]string
  372. CostPerCPUHr float64
  373. CostPerRAMGiBHr float64
  374. CostPerGPUHr float64
  375. Overhead *NodeOverhead
  376. }
  377. // GKE lies about the number of cores e2 nodes have. This table
  378. // contains a mapping from node type -> actual CPU cores
  379. // for those cases.
  380. var partialCPUMap = map[string]float64{
  381. "e2-micro": 0.25,
  382. "e2-small": 0.5,
  383. "e2-medium": 1.0,
  384. }
  385. type NodeIdentifier struct {
  386. Cluster string
  387. Name string
  388. ProviderID string
  389. }
  390. type nodeIdentifierNoProviderID struct {
  391. Cluster string
  392. Name string
  393. }
  394. type ClusterManagementIdentifier struct {
  395. Cluster string
  396. Provisioner string
  397. }
  398. type ClusterManagementCost struct {
  399. Cluster string
  400. Provisioner string
  401. Cost float64
  402. }
  403. func costTimesMinuteAndCount(activeDataMap map[NodeIdentifier]activeData, costMap map[NodeIdentifier]float64, resourceCountMap map[nodeIdentifierNoProviderID]float64) {
  404. for k, v := range activeDataMap {
  405. keyNon := nodeIdentifierNoProviderID{
  406. Cluster: k.Cluster,
  407. Name: k.Name,
  408. }
  409. if cost, ok := costMap[k]; ok {
  410. minutes := v.minutes
  411. count := 1.0
  412. if c, ok := resourceCountMap[keyNon]; ok {
  413. count = c
  414. }
  415. costMap[k] = cost * (minutes / 60.0) * count
  416. }
  417. }
  418. }
  419. func costTimesMinute[T comparable](activeDataMap map[T]activeData, costMap map[T]float64) {
  420. for k, v := range activeDataMap {
  421. if cost, ok := costMap[k]; ok {
  422. minutes := v.minutes
  423. costMap[k] = cost * (minutes / 60)
  424. }
  425. }
  426. }
  427. func ClusterNodes(dataSource source.OpenCostDataSource, cp models.Provider, start, end time.Time) (map[NodeIdentifier]*Node, error) {
  428. mq := dataSource.Metrics()
  429. resolution := dataSource.Resolution()
  430. requiredGrp := source.NewQueryGroup()
  431. optionalGrp := source.NewQueryGroup()
  432. // return errors if these fail
  433. resChNodeCPUHourlyCost := source.WithGroup(requiredGrp, mq.QueryNodeCPUPricePerHr(start, end))
  434. resChNodeCPUCoresCapacity := source.WithGroup(requiredGrp, mq.QueryNodeCPUCoresCapacity(start, end))
  435. resChNodeCPUCoresAllocatable := source.WithGroup(requiredGrp, mq.QueryNodeCPUCoresAllocatable(start, end))
  436. resChNodeRAMHourlyCost := source.WithGroup(requiredGrp, mq.QueryNodeRAMPricePerGiBHr(start, end))
  437. resChNodeRAMBytesCapacity := source.WithGroup(requiredGrp, mq.QueryNodeRAMBytesCapacity(start, end))
  438. resChNodeRAMBytesAllocatable := source.WithGroup(requiredGrp, mq.QueryNodeRAMBytesAllocatable(start, end))
  439. resChNodeGPUCount := source.WithGroup(requiredGrp, mq.QueryNodeGPUCount(start, end))
  440. resChNodeGPUHourlyPrice := source.WithGroup(requiredGrp, mq.QueryNodeGPUPricePerHr(start, end))
  441. resChActiveMins := source.WithGroup(requiredGrp, mq.QueryNodeActiveMinutes(start, end))
  442. resChIsSpot := source.WithGroup(requiredGrp, mq.QueryNodeIsSpot(start, end))
  443. // Do not return errors if these fail, but log warnings
  444. resChNodeCPUModeTotal := source.WithGroup(optionalGrp, mq.QueryNodeCPUModeTotal(start, end))
  445. resChNodeRAMSystemPct := source.WithGroup(optionalGrp, mq.QueryNodeRAMSystemPercent(start, end))
  446. resChNodeRAMUserPct := source.WithGroup(optionalGrp, mq.QueryNodeRAMUserPercent(start, end))
  447. resChLabels := source.WithGroup(optionalGrp, mq.QueryNodeLabels(start, end))
  448. resNodeCPUHourlyCost, _ := resChNodeCPUHourlyCost.Await()
  449. resNodeCPUCoresCapacity, _ := resChNodeCPUCoresCapacity.Await()
  450. resNodeCPUCoresAllocatable, _ := resChNodeCPUCoresAllocatable.Await()
  451. resNodeGPUCount, _ := resChNodeGPUCount.Await()
  452. resNodeGPUHourlyPrice, _ := resChNodeGPUHourlyPrice.Await()
  453. resNodeRAMHourlyCost, _ := resChNodeRAMHourlyCost.Await()
  454. resNodeRAMBytesCapacity, _ := resChNodeRAMBytesCapacity.Await()
  455. resNodeRAMBytesAllocatable, _ := resChNodeRAMBytesAllocatable.Await()
  456. resIsSpot, _ := resChIsSpot.Await()
  457. resNodeCPUModeTotal, _ := resChNodeCPUModeTotal.Await()
  458. resNodeRAMSystemPct, _ := resChNodeRAMSystemPct.Await()
  459. resNodeRAMUserPct, _ := resChNodeRAMUserPct.Await()
  460. resActiveMins, _ := resChActiveMins.Await()
  461. resLabels, _ := resChLabels.Await()
  462. if optionalGrp.HasErrors() {
  463. for _, err := range optionalGrp.Errors() {
  464. log.Warnf("ClusterNodes: %s", err)
  465. }
  466. }
  467. if requiredGrp.HasErrors() {
  468. for _, err := range requiredGrp.Errors() {
  469. log.Errorf("ClusterNodes: %s", err)
  470. }
  471. return nil, requiredGrp.Error()
  472. }
  473. activeDataMap := buildActiveDataMap(resActiveMins, nodeKeyGen, nodeValues, resolution, opencost.NewClosedWindow(start, end))
  474. gpuCountMap := buildGPUCountMap(resNodeGPUCount)
  475. preemptibleMap := buildPreemptibleMap(resIsSpot)
  476. cpuCostMap, clusterAndNameToType1 := buildCPUCostMap(resNodeCPUHourlyCost, cp, preemptibleMap)
  477. ramCostMap, clusterAndNameToType2 := buildRAMCostMap(resNodeRAMHourlyCost, cp, preemptibleMap)
  478. gpuCostMap, clusterAndNameToType3 := buildGPUCostMap(resNodeGPUHourlyPrice, gpuCountMap, cp, preemptibleMap)
  479. clusterAndNameToTypeIntermediate := mergeTypeMaps(clusterAndNameToType1, clusterAndNameToType2)
  480. clusterAndNameToType := mergeTypeMaps(clusterAndNameToTypeIntermediate, clusterAndNameToType3)
  481. cpuCoresCapacityMap := buildCPUCoresMap(resNodeCPUCoresCapacity)
  482. ramBytesCapacityMap := buildRAMBytesMap(resNodeRAMBytesCapacity)
  483. cpuCoresAllocatableMap := buildCPUCoresMap(resNodeCPUCoresAllocatable)
  484. ramBytesAllocatableMap := buildRAMBytesMap(resNodeRAMBytesAllocatable)
  485. overheadMap := buildOverheadMap(ramBytesCapacityMap, ramBytesAllocatableMap, cpuCoresCapacityMap, cpuCoresAllocatableMap)
  486. ramUserPctMap := buildRAMUserPctMap(resNodeRAMUserPct)
  487. ramSystemPctMap := buildRAMSystemPctMap(resNodeRAMSystemPct)
  488. cpuBreakdownMap := buildCPUBreakdownMap(resNodeCPUModeTotal)
  489. labelsMap := buildLabelsMap(resLabels)
  490. costTimesMinuteAndCount(activeDataMap, cpuCostMap, cpuCoresCapacityMap)
  491. costTimesMinuteAndCount(activeDataMap, ramCostMap, ramBytesCapacityMap)
  492. costTimesMinute(activeDataMap, gpuCostMap) // there's no need to do a weird "nodeIdentifierNoProviderID" type match since gpuCounts have a providerID
  493. nodeMap := buildNodeMap(
  494. cpuCostMap, ramCostMap, gpuCostMap, gpuCountMap,
  495. cpuCoresCapacityMap, ramBytesCapacityMap, ramUserPctMap,
  496. ramSystemPctMap,
  497. cpuBreakdownMap,
  498. activeDataMap,
  499. preemptibleMap,
  500. labelsMap,
  501. clusterAndNameToType,
  502. overheadMap,
  503. )
  504. c, err := cp.GetConfig()
  505. if err != nil {
  506. return nil, err
  507. }
  508. discount, err := ParsePercentString(c.Discount)
  509. if err != nil {
  510. return nil, err
  511. }
  512. negotiatedDiscount, err := ParsePercentString(c.NegotiatedDiscount)
  513. if err != nil {
  514. return nil, err
  515. }
  516. for _, node := range nodeMap {
  517. // TODO take GKE Reserved Instances into account
  518. node.Discount = cp.CombinedDiscountForNode(node.NodeType, node.Preemptible, discount, negotiatedDiscount)
  519. // Apply all remaining resources to Idle
  520. node.CPUBreakdown.Idle = 1.0 - (node.CPUBreakdown.System + node.CPUBreakdown.Other + node.CPUBreakdown.User)
  521. node.RAMBreakdown.Idle = 1.0 - (node.RAMBreakdown.System + node.RAMBreakdown.Other + node.RAMBreakdown.User)
  522. }
  523. return nodeMap, nil
  524. }
  525. type LoadBalancerIdentifier struct {
  526. Cluster string
  527. Namespace string
  528. Name string
  529. IngressIP string
  530. }
  531. type LoadBalancer struct {
  532. Cluster string
  533. Namespace string
  534. Name string
  535. ProviderID string
  536. Cost float64
  537. Start time.Time
  538. End time.Time
  539. Minutes float64
  540. Private bool
  541. Ip string
  542. }
  543. func ClusterLoadBalancers(dataSource source.OpenCostDataSource, start, end time.Time) (map[LoadBalancerIdentifier]*LoadBalancer, error) {
  544. resolution := dataSource.Resolution()
  545. grp := source.NewQueryGroup()
  546. mq := dataSource.Metrics()
  547. resChLBCost := source.WithGroup(grp, mq.QueryLBPricePerHr(start, end))
  548. resChActiveMins := source.WithGroup(grp, mq.QueryLBActiveMinutes(start, end))
  549. resLBCost, _ := resChLBCost.Await()
  550. resActiveMins, _ := resChActiveMins.Await()
  551. if grp.HasErrors() {
  552. return nil, grp.Error()
  553. }
  554. loadBalancerMap := make(map[LoadBalancerIdentifier]*LoadBalancer, len(resActiveMins))
  555. activeMap := buildActiveDataMap(resActiveMins, loadBalancerKeyGen, lbValues, resolution, opencost.NewClosedWindow(start, end))
  556. for _, result := range resLBCost {
  557. key, ok := loadBalancerKeyGen(result)
  558. if !ok {
  559. continue
  560. }
  561. lbPricePerHr := result.Data[0].Value
  562. lb := &LoadBalancer{
  563. Cluster: key.Cluster,
  564. Namespace: key.Namespace,
  565. Name: key.Name,
  566. Cost: lbPricePerHr, // default to hourly cost, overwrite if active entry exists
  567. Ip: key.IngressIP,
  568. Private: privateIPCheck(key.IngressIP),
  569. ProviderID: provider.ParseLBID(key.IngressIP),
  570. }
  571. if active, ok := activeMap[key]; ok {
  572. lb.Start = active.start
  573. lb.End = active.end
  574. lb.Minutes = active.minutes
  575. if lb.Minutes > 0 {
  576. lb.Cost = lbPricePerHr * (lb.Minutes / 60.0)
  577. } else {
  578. log.DedupedWarningf(20, "ClusterLoadBalancers: found zero minutes for key: %v", key)
  579. }
  580. }
  581. loadBalancerMap[key] = lb
  582. }
  583. return loadBalancerMap, nil
  584. }
  585. func ClusterManagement(dataSource source.OpenCostDataSource, start, end time.Time) (map[ClusterManagementIdentifier]*ClusterManagementCost, error) {
  586. resolution := dataSource.Resolution()
  587. grp := source.NewQueryGroup()
  588. mq := dataSource.Metrics()
  589. resChCMPrice := source.WithGroup(grp, mq.QueryClusterManagementPricePerHr(start, end))
  590. resChCMDur := source.WithGroup(grp, mq.QueryClusterManagementDuration(start, end))
  591. resCMPrice, _ := resChCMPrice.Await()
  592. resCMDur, _ := resChCMDur.Await()
  593. if grp.HasErrors() {
  594. return nil, grp.Error()
  595. }
  596. clusterManagementPriceMap := make(map[ClusterManagementIdentifier]*ClusterManagementCost, len(resCMDur))
  597. activeMap := buildActiveDataMap(resCMDur, clusterManagementKeyGen, clusterManagementValues, resolution, opencost.NewClosedWindow(start, end))
  598. for _, result := range resCMPrice {
  599. key, ok := clusterManagementKeyGen(result)
  600. if !ok {
  601. continue
  602. }
  603. cmPricePerHr := result.Data[0].Value
  604. cm := &ClusterManagementCost{
  605. Cluster: key.Cluster,
  606. Provisioner: key.Provisioner,
  607. Cost: cmPricePerHr, // default to hourly cost, overwrite if active entry exists
  608. }
  609. if active, ok := activeMap[key]; ok {
  610. if active.minutes > 0 {
  611. cm.Cost = cmPricePerHr * (active.minutes / 60.0)
  612. } else {
  613. log.DedupedWarningf(20, "ClusterManagement: found zero minutes for key: %v", key)
  614. }
  615. }
  616. clusterManagementPriceMap[key] = cm
  617. }
  618. return clusterManagementPriceMap, nil
  619. }
  620. // Check if an ip is private.
  621. func privateIPCheck(ip string) bool {
  622. ipAddress := net.ParseIP(ip)
  623. return ipAddress.IsPrivate()
  624. }
  625. func pvCosts(
  626. diskMap map[DiskIdentifier]*Disk,
  627. resolution time.Duration,
  628. resActiveMins []*source.PVActiveMinutesResult,
  629. resPVSize []*source.PVBytesResult,
  630. resPVCost []*source.PVPricePerGiBHourResult,
  631. resPVUsedAvg []*source.PVUsedAvgResult,
  632. resPVUsedMax []*source.PVUsedMaxResult,
  633. resPVCInfo []*source.PVCInfoResult,
  634. cp models.Provider,
  635. window opencost.Window,
  636. ) {
  637. for _, result := range resActiveMins {
  638. cluster := result.Cluster
  639. if cluster == "" {
  640. cluster = coreenv.GetClusterID()
  641. }
  642. name := result.PersistentVolume
  643. if name == "" {
  644. log.Warnf("ClusterDisks: active mins missing pv name")
  645. continue
  646. }
  647. if len(result.Data) == 0 {
  648. continue
  649. }
  650. key := DiskIdentifier{
  651. Cluster: cluster,
  652. Name: name,
  653. }
  654. if _, ok := diskMap[key]; !ok {
  655. diskMap[key] = &Disk{
  656. Cluster: cluster,
  657. Name: name,
  658. Breakdown: &ClusterCostsBreakdown{},
  659. }
  660. }
  661. s, e := calculateStartAndEnd(result.Data, resolution, window)
  662. mins := e.Sub(s).Minutes()
  663. diskMap[key].End = e
  664. diskMap[key].Start = s
  665. diskMap[key].Minutes = mins
  666. }
  667. for _, result := range resPVSize {
  668. cluster := result.Cluster
  669. if cluster == "" {
  670. cluster = coreenv.GetClusterID()
  671. }
  672. name := result.PersistentVolume
  673. if name == "" {
  674. log.Warnf("ClusterDisks: PV size data missing persistentvolume")
  675. continue
  676. }
  677. // TODO niko/assets storage class
  678. bytes := result.Data[0].Value
  679. key := DiskIdentifier{cluster, name}
  680. if _, ok := diskMap[key]; !ok {
  681. diskMap[key] = &Disk{
  682. Cluster: cluster,
  683. Name: name,
  684. Breakdown: &ClusterCostsBreakdown{},
  685. }
  686. }
  687. diskMap[key].Bytes = bytes
  688. }
  689. customPricingEnabled := provider.CustomPricesEnabled(cp)
  690. customPricingConfig, err := cp.GetConfig()
  691. if err != nil {
  692. log.Warnf("ClusterDisks: failed to load custom pricing: %s", err)
  693. }
  694. for _, result := range resPVCost {
  695. cluster := result.Cluster
  696. if cluster == "" {
  697. cluster = coreenv.GetClusterID()
  698. }
  699. name := result.PersistentVolume
  700. if name == "" {
  701. log.Warnf("ClusterDisks: PV cost data missing persistentvolume")
  702. continue
  703. }
  704. // TODO niko/assets storage class
  705. var cost float64
  706. if customPricingEnabled && customPricingConfig != nil {
  707. customPVCostStr := customPricingConfig.Storage
  708. customPVCost, err := strconv.ParseFloat(customPVCostStr, 64)
  709. if err != nil {
  710. log.Warnf("ClusterDisks: error parsing custom PV price: %s", customPVCostStr)
  711. }
  712. cost = customPVCost
  713. } else {
  714. cost = result.Data[0].Value
  715. }
  716. key := DiskIdentifier{cluster, name}
  717. if _, ok := diskMap[key]; !ok {
  718. diskMap[key] = &Disk{
  719. Cluster: cluster,
  720. Name: name,
  721. Breakdown: &ClusterCostsBreakdown{},
  722. }
  723. }
  724. diskMap[key].Cost = cost * (diskMap[key].Bytes / 1024 / 1024 / 1024) * (diskMap[key].Minutes / 60)
  725. providerID := result.ProviderID // just put the providerID set up here, it's the simplest query.
  726. if providerID != "" {
  727. diskMap[key].ProviderID = provider.ParsePVID(providerID)
  728. }
  729. }
  730. for _, result := range resPVUsedAvg {
  731. cluster := result.Cluster
  732. if cluster == "" {
  733. cluster = coreenv.GetClusterID()
  734. }
  735. claimName := result.PersistentVolumeClaim
  736. if claimName == "" {
  737. log.Debugf("ClusterDisks: pv usage data missing persistentvolumeclaim")
  738. continue
  739. }
  740. claimNamespace := result.Namespace
  741. if claimNamespace == "" {
  742. log.Debugf("ClusterDisks: pv usage data missing namespace")
  743. continue
  744. }
  745. var volumeName string
  746. for _, thatRes := range resPVCInfo {
  747. thatCluster := thatRes.Cluster
  748. if thatCluster == "" {
  749. thatCluster = coreenv.GetClusterID()
  750. }
  751. thatVolumeName := thatRes.VolumeName
  752. if thatVolumeName == "" {
  753. log.Debugf("ClusterDisks: pv claim data missing volumename")
  754. continue
  755. }
  756. thatClaimName := thatRes.PersistentVolumeClaim
  757. if thatClaimName == "" {
  758. log.Debugf("ClusterDisks: pv claim data missing persistentvolumeclaim")
  759. continue
  760. }
  761. thatClaimNamespace := thatRes.Namespace
  762. if thatClaimNamespace == "" {
  763. log.Debugf("ClusterDisks: pv claim data missing namespace")
  764. continue
  765. }
  766. if cluster == thatCluster && claimName == thatClaimName && claimNamespace == thatClaimNamespace {
  767. volumeName = thatVolumeName
  768. }
  769. }
  770. usage := result.Data[0].Value
  771. key := DiskIdentifier{
  772. Cluster: cluster,
  773. Name: volumeName,
  774. }
  775. if _, ok := diskMap[key]; !ok {
  776. diskMap[key] = &Disk{
  777. Cluster: cluster,
  778. Name: volumeName,
  779. Breakdown: &ClusterCostsBreakdown{},
  780. }
  781. }
  782. diskMap[key].BytesUsedAvgPtr = &usage
  783. }
  784. for _, result := range resPVUsedMax {
  785. cluster := result.Cluster
  786. if cluster == "" {
  787. cluster = coreenv.GetClusterID()
  788. }
  789. claimName := result.PersistentVolumeClaim
  790. if claimName == "" {
  791. log.Debugf("ClusterDisks: pv usage data missing persistentvolumeclaim")
  792. continue
  793. }
  794. claimNamespace := result.Namespace
  795. if claimNamespace == "" {
  796. log.Debugf("ClusterDisks: pv usage data missing namespace")
  797. continue
  798. }
  799. var volumeName string
  800. for _, thatRes := range resPVCInfo {
  801. thatCluster := thatRes.Cluster
  802. if thatCluster == "" {
  803. thatCluster = coreenv.GetClusterID()
  804. }
  805. thatVolumeName := thatRes.VolumeName
  806. if thatVolumeName == "" {
  807. log.Debugf("ClusterDisks: pv claim data missing volumename")
  808. continue
  809. }
  810. thatClaimName := thatRes.PersistentVolumeClaim
  811. if thatClaimName == "" {
  812. log.Debugf("ClusterDisks: pv claim data missing persistentvolumeclaim")
  813. continue
  814. }
  815. thatClaimNamespace := thatRes.Namespace
  816. if thatClaimNamespace == "" {
  817. log.Debugf("ClusterDisks: pv claim data missing namespace")
  818. continue
  819. }
  820. if cluster == thatCluster && claimName == thatClaimName && claimNamespace == thatClaimNamespace {
  821. volumeName = thatVolumeName
  822. }
  823. }
  824. usage := result.Data[0].Value
  825. key := DiskIdentifier{cluster, volumeName}
  826. if _, ok := diskMap[key]; !ok {
  827. diskMap[key] = &Disk{
  828. Cluster: cluster,
  829. Name: volumeName,
  830. Breakdown: &ClusterCostsBreakdown{},
  831. }
  832. }
  833. diskMap[key].BytesUsedMaxPtr = &usage
  834. }
  835. }
  836. // filterOutLocalPVs removes local Persistent Volumes (PVs) from the given disk map.
  837. // Local PVs are identified by the prefix "local-pv-" in their names, which is the
  838. // convention used by sig-storage-local-static-provisioner.
  839. //
  840. // Parameters:
  841. // - diskMap: A map of DiskIdentifier to Disk pointers, representing all PVs.
  842. //
  843. // Returns:
  844. // - A new map of DiskIdentifier to Disk pointers, containing only non-local PVs.
  845. func filterOutLocalPVs(diskMap map[DiskIdentifier]*Disk) map[DiskIdentifier]*Disk {
  846. nonLocalPVDiskMap := map[DiskIdentifier]*Disk{}
  847. for key, val := range diskMap {
  848. if !strings.HasPrefix(key.Name, SIG_STORAGE_LOCAL_PROVISIONER_PREFIX) {
  849. nonLocalPVDiskMap[key] = val
  850. }
  851. }
  852. return nonLocalPVDiskMap
  853. }