cluster.go 29 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977
  1. package costmodel
  2. import (
  3. "net"
  4. "strconv"
  5. "strings"
  6. "time"
  7. coreenv "github.com/opencost/opencost/core/pkg/env"
  8. "github.com/opencost/opencost/pkg/cloud/provider"
  9. "golang.org/x/exp/slices"
  10. "github.com/opencost/opencost/core/pkg/log"
  11. "github.com/opencost/opencost/core/pkg/opencost"
  12. "github.com/opencost/opencost/core/pkg/source"
  13. "github.com/opencost/opencost/pkg/cloud/models"
  14. "github.com/opencost/opencost/pkg/env"
  15. )
  16. const MAX_LOCAL_STORAGE_SIZE = 1024 * 1024 * 1024 * 1024
  17. const localStoragePricePerGBHr = 0.04 / 730.0
  18. // When ASSET_INCLUDE_LOCAL_DISK_COST is set to false, local storage
  19. // provisioned by sig-storage-local-static-provisioner is excluded
  20. // by checking if the volume is prefixed by "local-pv-".
  21. //
  22. // This is based on the sig-storage-local-static-provisioner implementation,
  23. // which creates all PVs with the "local-pv-" prefix. For reference, see:
  24. // https://github.com/kubernetes-sigs/sig-storage-local-static-provisioner/blob/b6f465027bd059e92c0032c81dd1e1d90e35c909/pkg/discovery/discovery.go#L410-L417
  25. const SIG_STORAGE_LOCAL_PROVISIONER_PREFIX = "local-pv-"
// ClusterCosts represents cumulative and monthly cluster costs over a given
// duration. Costs are broken down by cores (CPU), GPU, memory (RAM), and
// storage; CPU, RAM, and storage additionally carry a usage breakdown.
type ClusterCosts struct {
	Start             *time.Time             `json:"startTime"`
	End               *time.Time             `json:"endTime"`
	CPUCumulative     float64                `json:"cpuCumulativeCost"`
	CPUMonthly        float64                `json:"cpuMonthlyCost"`
	CPUBreakdown      *ClusterCostsBreakdown `json:"cpuBreakdown"`
	GPUCumulative     float64                `json:"gpuCumulativeCost"`
	GPUMonthly        float64                `json:"gpuMonthlyCost"`
	RAMCumulative     float64                `json:"ramCumulativeCost"`
	RAMMonthly        float64                `json:"ramMonthlyCost"`
	RAMBreakdown      *ClusterCostsBreakdown `json:"ramBreakdown"`
	StorageCumulative float64                `json:"storageCumulativeCost"`
	StorageMonthly    float64                `json:"storageMonthlyCost"`
	StorageBreakdown  *ClusterCostsBreakdown `json:"storageBreakdown"`
	TotalCumulative   float64                `json:"totalCumulativeCost"`
	TotalMonthly      float64                `json:"totalMonthlyCost"`
	// DataMinutes is the only field without a JSON tag.
	DataMinutes float64
}
// ClusterCostsBreakdown provides a fraction-based breakdown of a resource by
// categories: user for user-space (i.e. non-system) usage, system, other, and
// idle. Callers in this file compute Idle as 1.0 minus the sum of the other
// three fields.
type ClusterCostsBreakdown struct {
	Idle   float64 `json:"idle"`
	Other  float64 `json:"other"`
	System float64 `json:"system"`
	User   float64 `json:"user"`
}
  54. type Disk struct {
  55. Cluster string
  56. Name string
  57. ProviderID string
  58. StorageClass string
  59. VolumeName string
  60. ClaimName string
  61. ClaimNamespace string
  62. Cost float64
  63. Bytes float64
  64. // These two fields may not be available at all times because they rely on
  65. // a new set of metrics that may or may not be available. Thus, they must
  66. // be nilable to represent the complete absence of the data.
  67. //
  68. // In other words, nilability here lets us distinguish between
  69. // "metric is not available" and "metric is available but is 0".
  70. //
  71. // They end in "Ptr" to distinguish from an earlier version in order to
  72. // ensure that all usages are checked for nil.
  73. BytesUsedAvgPtr *float64
  74. BytesUsedMaxPtr *float64
  75. Local bool
  76. Start time.Time
  77. End time.Time
  78. Minutes float64
  79. Breakdown *ClusterCostsBreakdown
  80. }
  81. type DiskIdentifier struct {
  82. Cluster string
  83. Name string
  84. }
  85. func ClusterDisks(dataSource source.OpenCostDataSource, cp models.Provider, start, end time.Time) (map[DiskIdentifier]*Disk, error) {
  86. resolution := dataSource.Resolution()
  87. grp := source.NewQueryGroup()
  88. mq := dataSource.Metrics()
  89. resChPVCost := source.WithGroup(grp, mq.QueryPVPricePerGiBHour(start, end))
  90. resChPVSize := source.WithGroup(grp, mq.QueryPVBytes(start, end))
  91. resChActiveMins := source.WithGroup(grp, mq.QueryPVActiveMinutes(start, end))
  92. resChPVStorageClass := source.WithGroup(grp, mq.QueryPVInfo(start, end))
  93. resChPVUsedAvg := source.WithGroup(grp, mq.QueryPVUsedAverage(start, end))
  94. resChPVUsedMax := source.WithGroup(grp, mq.QueryPVUsedMax(start, end))
  95. resChPVCInfo := source.WithGroup(grp, mq.QueryPVCInfo(start, end))
  96. resPVCost, _ := resChPVCost.Await()
  97. resPVSize, _ := resChPVSize.Await()
  98. resActiveMins, _ := resChActiveMins.Await()
  99. resPVStorageClass, _ := resChPVStorageClass.Await()
  100. resPVUsedAvg, _ := resChPVUsedAvg.Await()
  101. resPVUsedMax, _ := resChPVUsedMax.Await()
  102. resPVCInfo, _ := resChPVCInfo.Await()
  103. // Cloud providers do not always charge for a node's local disk costs (i.e.
  104. // ephemeral storage). Provide an option to opt out of calculating &
  105. // allocating local disk costs. Note, that this does not affect
  106. // PersistentVolume costs.
  107. //
  108. // Ref:
  109. // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/RootDeviceStorage.html
  110. // https://learn.microsoft.com/en-us/azure/virtual-machines/managed-disks-overview#temporary-disk
  111. // https://cloud.google.com/compute/docs/disks/local-ssd
  112. resLocalStorageUsedAvg := []*source.LocalStorageUsedAvgResult{}
  113. resLocalStorageUsedMax := []*source.LocalStorageUsedMaxResult{}
  114. resLocalStorageBytes := []*source.LocalStorageBytesResult{}
  115. resLocalActiveMins := []*source.LocalStorageActiveMinutesResult{}
  116. if env.IsAssetIncludeLocalDiskCost() {
  117. resChLocalStoreageUsedAvg := source.WithGroup(grp, mq.QueryLocalStorageUsedAvg(start, end))
  118. resChLocalStoreageUsedMax := source.WithGroup(grp, mq.QueryLocalStorageUsedMax(start, end))
  119. resChLocalStorageBytes := source.WithGroup(grp, mq.QueryLocalStorageBytes(start, end))
  120. resChLocalActiveMins := source.WithGroup(grp, mq.QueryLocalStorageActiveMinutes(start, end))
  121. resLocalStorageUsedAvg, _ = resChLocalStoreageUsedAvg.Await()
  122. resLocalStorageUsedMax, _ = resChLocalStoreageUsedMax.Await()
  123. resLocalStorageBytes, _ = resChLocalStorageBytes.Await()
  124. resLocalActiveMins, _ = resChLocalActiveMins.Await()
  125. }
  126. if grp.HasErrors() {
  127. return nil, grp.Error()
  128. }
  129. diskMap := buildAssetsPVCMap(resPVCInfo)
  130. pvCosts(diskMap, resolution, resActiveMins, resPVSize, resPVCost, resPVUsedAvg, resPVUsedMax, resPVCInfo, cp, opencost.NewClosedWindow(start, end))
  131. type localStorage struct {
  132. device string
  133. disk *Disk
  134. }
  135. localStorageDisks := map[DiskIdentifier]localStorage{}
  136. // Start with local storage bytes so that the device with the largest size which has passed the
  137. // query filters can be determined
  138. for _, result := range resLocalStorageBytes {
  139. cluster := result.Cluster
  140. if cluster == "" {
  141. cluster = coreenv.GetClusterID()
  142. }
  143. name := result.Instance
  144. if name == "" {
  145. log.Warnf("ClusterDisks: local storage data missing instance")
  146. continue
  147. }
  148. device := result.Device
  149. if device == "" {
  150. log.Warnf("ClusterDisks: local storage data missing device")
  151. continue
  152. }
  153. bytes := result.Data[0].Value
  154. // Ignore disks that are larger than the max size
  155. if bytes > MAX_LOCAL_STORAGE_SIZE {
  156. continue
  157. }
  158. key := DiskIdentifier{cluster, name}
  159. // only keep the device with the most bytes per instance
  160. if current, ok := localStorageDisks[key]; !ok || current.disk.Bytes < bytes {
  161. localStorageDisks[key] = localStorage{
  162. device: device,
  163. disk: &Disk{
  164. Cluster: cluster,
  165. Name: name,
  166. Breakdown: &ClusterCostsBreakdown{},
  167. Local: true,
  168. StorageClass: opencost.LocalStorageClass,
  169. Bytes: bytes,
  170. },
  171. }
  172. }
  173. }
  174. for _, result := range resLocalStorageUsedAvg {
  175. cluster := result.Cluster
  176. if cluster == "" {
  177. cluster = coreenv.GetClusterID()
  178. }
  179. name := result.Instance
  180. if name == "" {
  181. log.Warnf("ClusterDisks: local storage data missing instance")
  182. continue
  183. }
  184. device := result.Device
  185. if device == "" {
  186. log.Warnf("ClusterDisks: local storage data missing device")
  187. continue
  188. }
  189. bytesAvg := result.Data[0].Value
  190. key := DiskIdentifier{cluster, name}
  191. ls, ok := localStorageDisks[key]
  192. if !ok || ls.device != device {
  193. continue
  194. }
  195. ls.disk.BytesUsedAvgPtr = &bytesAvg
  196. }
  197. for _, result := range resLocalStorageUsedMax {
  198. cluster := result.Cluster
  199. if cluster == "" {
  200. cluster = coreenv.GetClusterID()
  201. }
  202. name := result.Instance
  203. if name == "" {
  204. log.Warnf("ClusterDisks: local storage data missing instance")
  205. continue
  206. }
  207. device := result.Device
  208. if device == "" {
  209. log.Warnf("ClusterDisks: local storage data missing device")
  210. continue
  211. }
  212. bytesMax := result.Data[0].Value
  213. key := DiskIdentifier{cluster, name}
  214. ls, ok := localStorageDisks[key]
  215. if !ok || ls.device != device {
  216. continue
  217. }
  218. ls.disk.BytesUsedMaxPtr = &bytesMax
  219. }
  220. for _, result := range resLocalActiveMins {
  221. cluster := result.Cluster
  222. if cluster == "" {
  223. cluster = coreenv.GetClusterID()
  224. }
  225. name := result.Node
  226. if name == "" {
  227. log.DedupedWarningf(5, "ClusterDisks: local active mins data missing instance")
  228. continue
  229. }
  230. providerID := result.ProviderID
  231. if providerID == "" {
  232. log.DedupedWarningf(5, "ClusterDisks: local active mins data missing provider_id")
  233. continue
  234. }
  235. key := DiskIdentifier{cluster, name}
  236. ls, ok := localStorageDisks[key]
  237. if !ok {
  238. continue
  239. }
  240. ls.disk.ProviderID = provider.ParseLocalDiskID(providerID)
  241. if len(result.Data) == 0 {
  242. continue
  243. }
  244. s := time.Unix(int64(result.Data[0].Timestamp), 0)
  245. e := time.Unix(int64(result.Data[len(result.Data)-1].Timestamp), 0)
  246. mins := e.Sub(s).Minutes()
  247. ls.disk.End = e
  248. ls.disk.Start = s
  249. ls.disk.Minutes = mins
  250. // Cost = GiB * hours * $-per-GB-hour
  251. ls.disk.Cost = (ls.disk.Bytes / 1024 / 1024 / 1024) * (ls.disk.Minutes / 60) * localStoragePricePerGBHr
  252. bytesUsedAvg := 0.0
  253. if ls.disk.BytesUsedAvgPtr != nil {
  254. bytesUsedAvg = *ls.disk.BytesUsedAvgPtr
  255. }
  256. // Used Cost = Used GiB * hours * $-per-GB-hour
  257. if ls.disk.Cost > 0 {
  258. ls.disk.Breakdown.System = ((bytesUsedAvg / 1024 / 1024 / 1024) * (ls.disk.Minutes / 60) * localStoragePricePerGBHr) / ls.disk.Cost
  259. } else {
  260. ls.disk.Breakdown.System = 0
  261. }
  262. }
  263. // move local storage disks to main disk map
  264. for key, ls := range localStorageDisks {
  265. diskMap[key] = ls.disk
  266. }
  267. var unTracedDiskLogData []DiskIdentifier
  268. //Iterating through Persistent Volume given by custom metrics kubecost_pv_info and assign the storage class if known and __unknown__ if not populated.
  269. for _, result := range resPVStorageClass {
  270. cluster := result.Cluster
  271. if cluster == "" {
  272. cluster = coreenv.GetClusterID()
  273. }
  274. name := result.PersistentVolume
  275. key := DiskIdentifier{cluster, name}
  276. if _, ok := diskMap[key]; !ok {
  277. if !slices.Contains(unTracedDiskLogData, key) {
  278. unTracedDiskLogData = append(unTracedDiskLogData, key)
  279. }
  280. continue
  281. }
  282. if len(result.Data) == 0 {
  283. continue
  284. }
  285. storageClass := result.StorageClass
  286. if storageClass == "" {
  287. diskMap[key].StorageClass = opencost.UnknownStorageClass
  288. } else {
  289. diskMap[key].StorageClass = storageClass
  290. }
  291. }
  292. // Logging the unidentified disk information outside the loop
  293. for _, unIdentifiedDisk := range unTracedDiskLogData {
  294. log.Warnf("ClusterDisks: Cluster %s has Storage Class information for unidentified disk %s or disk deleted from analysis", unIdentifiedDisk.Cluster, unIdentifiedDisk.Name)
  295. }
  296. for _, disk := range diskMap {
  297. // Apply all remaining RAM to Idle
  298. disk.Breakdown.Idle = 1.0 - (disk.Breakdown.System + disk.Breakdown.Other + disk.Breakdown.User)
  299. // Set provider Id to the name for reconciliation
  300. if disk.ProviderID == "" {
  301. disk.ProviderID = disk.Name
  302. }
  303. }
  304. if !env.IsAssetIncludeLocalDiskCost() {
  305. return filterOutLocalPVs(diskMap), nil
  306. }
  307. return diskMap, nil
  308. }
  309. type NodeOverhead struct {
  310. CpuOverheadFraction float64
  311. RamOverheadFraction float64
  312. }
  313. type Node struct {
  314. Cluster string
  315. Name string
  316. ProviderID string
  317. NodeType string
  318. CPUCost float64
  319. CPUCores float64
  320. GPUCost float64
  321. GPUCount float64
  322. RAMCost float64
  323. RAMBytes float64
  324. Discount float64
  325. Preemptible bool
  326. CPUBreakdown *ClusterCostsBreakdown
  327. RAMBreakdown *ClusterCostsBreakdown
  328. Start time.Time
  329. End time.Time
  330. Minutes float64
  331. Labels map[string]string
  332. CostPerCPUHr float64
  333. CostPerRAMGiBHr float64
  334. CostPerGPUHr float64
  335. Overhead *NodeOverhead
  336. }
// partialCPUMap maps a node type to its actual number of CPU cores.
//
// GKE lies about the number of cores e2 nodes have. This table
// contains a mapping from node type -> actual CPU cores
// for those cases.
var partialCPUMap = map[string]float64{
	"e2-micro":  0.25,
	"e2-small":  0.5,
	"e2-medium": 1.0,
}
  345. type NodeIdentifier struct {
  346. Cluster string
  347. Name string
  348. ProviderID string
  349. }
  350. type nodeIdentifierNoProviderID struct {
  351. Cluster string
  352. Name string
  353. }
  354. type ClusterManagementIdentifier struct {
  355. Cluster string
  356. Provisioner string
  357. }
  358. type ClusterManagementCost struct {
  359. Cluster string
  360. Provisioner string
  361. Cost float64
  362. }
  363. func costTimesMinuteAndCount(activeDataMap map[NodeIdentifier]activeData, costMap map[NodeIdentifier]float64, resourceCountMap map[nodeIdentifierNoProviderID]float64) {
  364. for k, v := range activeDataMap {
  365. keyNon := nodeIdentifierNoProviderID{
  366. Cluster: k.Cluster,
  367. Name: k.Name,
  368. }
  369. if cost, ok := costMap[k]; ok {
  370. minutes := v.minutes
  371. count := 1.0
  372. if c, ok := resourceCountMap[keyNon]; ok {
  373. count = c
  374. }
  375. costMap[k] = cost * (minutes / 60.0) * count
  376. }
  377. }
  378. }
  379. func costTimesMinute[T comparable](activeDataMap map[T]activeData, costMap map[T]float64) {
  380. for k, v := range activeDataMap {
  381. if cost, ok := costMap[k]; ok {
  382. minutes := v.minutes
  383. costMap[k] = cost * (minutes / 60)
  384. }
  385. }
  386. }
  387. func ClusterNodes(dataSource source.OpenCostDataSource, cp models.Provider, start, end time.Time) (map[NodeIdentifier]*Node, error) {
  388. mq := dataSource.Metrics()
  389. resolution := dataSource.Resolution()
  390. requiredGrp := source.NewQueryGroup()
  391. optionalGrp := source.NewQueryGroup()
  392. // return errors if these fail
  393. resChNodeCPUHourlyCost := source.WithGroup(requiredGrp, mq.QueryNodeCPUPricePerHr(start, end))
  394. resChNodeCPUCoresCapacity := source.WithGroup(requiredGrp, mq.QueryNodeCPUCoresCapacity(start, end))
  395. resChNodeCPUCoresAllocatable := source.WithGroup(requiredGrp, mq.QueryNodeCPUCoresAllocatable(start, end))
  396. resChNodeRAMHourlyCost := source.WithGroup(requiredGrp, mq.QueryNodeRAMPricePerGiBHr(start, end))
  397. resChNodeRAMBytesCapacity := source.WithGroup(requiredGrp, mq.QueryNodeRAMBytesCapacity(start, end))
  398. resChNodeRAMBytesAllocatable := source.WithGroup(requiredGrp, mq.QueryNodeRAMBytesAllocatable(start, end))
  399. resChNodeGPUCount := source.WithGroup(requiredGrp, mq.QueryNodeGPUCount(start, end))
  400. resChNodeGPUHourlyPrice := source.WithGroup(requiredGrp, mq.QueryNodeGPUPricePerHr(start, end))
  401. resChActiveMins := source.WithGroup(requiredGrp, mq.QueryNodeActiveMinutes(start, end))
  402. resChIsSpot := source.WithGroup(requiredGrp, mq.QueryNodeIsSpot(start, end))
  403. // Do not return errors if these fail, but log warnings
  404. resChNodeCPUModeTotal := source.WithGroup(optionalGrp, mq.QueryNodeCPUModeTotal(start, end))
  405. resChNodeRAMSystemPct := source.WithGroup(optionalGrp, mq.QueryNodeRAMSystemPercent(start, end))
  406. resChNodeRAMUserPct := source.WithGroup(optionalGrp, mq.QueryNodeRAMUserPercent(start, end))
  407. resChLabels := source.WithGroup(optionalGrp, mq.QueryNodeLabels(start, end))
  408. resNodeCPUHourlyCost, _ := resChNodeCPUHourlyCost.Await()
  409. resNodeCPUCoresCapacity, _ := resChNodeCPUCoresCapacity.Await()
  410. resNodeCPUCoresAllocatable, _ := resChNodeCPUCoresAllocatable.Await()
  411. resNodeGPUCount, _ := resChNodeGPUCount.Await()
  412. resNodeGPUHourlyPrice, _ := resChNodeGPUHourlyPrice.Await()
  413. resNodeRAMHourlyCost, _ := resChNodeRAMHourlyCost.Await()
  414. resNodeRAMBytesCapacity, _ := resChNodeRAMBytesCapacity.Await()
  415. resNodeRAMBytesAllocatable, _ := resChNodeRAMBytesAllocatable.Await()
  416. resIsSpot, _ := resChIsSpot.Await()
  417. resNodeCPUModeTotal, _ := resChNodeCPUModeTotal.Await()
  418. resNodeRAMSystemPct, _ := resChNodeRAMSystemPct.Await()
  419. resNodeRAMUserPct, _ := resChNodeRAMUserPct.Await()
  420. resActiveMins, _ := resChActiveMins.Await()
  421. resLabels, _ := resChLabels.Await()
  422. if optionalGrp.HasErrors() {
  423. for _, err := range optionalGrp.Errors() {
  424. log.Warnf("ClusterNodes: %s", err)
  425. }
  426. }
  427. if requiredGrp.HasErrors() {
  428. for _, err := range requiredGrp.Errors() {
  429. log.Errorf("ClusterNodes: %s", err)
  430. }
  431. return nil, requiredGrp.Error()
  432. }
  433. activeDataMap := buildActiveDataMap(resActiveMins, nodeKeyGen, nodeValues, resolution, opencost.NewClosedWindow(start, end))
  434. gpuCountMap := buildGPUCountMap(resNodeGPUCount)
  435. preemptibleMap := buildPreemptibleMap(resIsSpot)
  436. cpuCostMap, clusterAndNameToType1 := buildCPUCostMap(resNodeCPUHourlyCost, cp, preemptibleMap)
  437. ramCostMap, clusterAndNameToType2 := buildRAMCostMap(resNodeRAMHourlyCost, cp, preemptibleMap)
  438. gpuCostMap, clusterAndNameToType3 := buildGPUCostMap(resNodeGPUHourlyPrice, gpuCountMap, cp, preemptibleMap)
  439. clusterAndNameToTypeIntermediate := mergeTypeMaps(clusterAndNameToType1, clusterAndNameToType2)
  440. clusterAndNameToType := mergeTypeMaps(clusterAndNameToTypeIntermediate, clusterAndNameToType3)
  441. cpuCoresCapacityMap := buildCPUCoresMap(resNodeCPUCoresCapacity)
  442. ramBytesCapacityMap := buildRAMBytesMap(resNodeRAMBytesCapacity)
  443. cpuCoresAllocatableMap := buildCPUCoresMap(resNodeCPUCoresAllocatable)
  444. ramBytesAllocatableMap := buildRAMBytesMap(resNodeRAMBytesAllocatable)
  445. overheadMap := buildOverheadMap(ramBytesCapacityMap, ramBytesAllocatableMap, cpuCoresCapacityMap, cpuCoresAllocatableMap)
  446. ramUserPctMap := buildRAMUserPctMap(resNodeRAMUserPct)
  447. ramSystemPctMap := buildRAMSystemPctMap(resNodeRAMSystemPct)
  448. cpuBreakdownMap := buildCPUBreakdownMap(resNodeCPUModeTotal)
  449. labelsMap := buildLabelsMap(resLabels)
  450. costTimesMinuteAndCount(activeDataMap, cpuCostMap, cpuCoresCapacityMap)
  451. costTimesMinuteAndCount(activeDataMap, ramCostMap, ramBytesCapacityMap)
  452. costTimesMinute(activeDataMap, gpuCostMap) // there's no need to do a weird "nodeIdentifierNoProviderID" type match since gpuCounts have a providerID
  453. nodeMap := buildNodeMap(
  454. cpuCostMap, ramCostMap, gpuCostMap, gpuCountMap,
  455. cpuCoresCapacityMap, ramBytesCapacityMap, ramUserPctMap,
  456. ramSystemPctMap,
  457. cpuBreakdownMap,
  458. activeDataMap,
  459. preemptibleMap,
  460. labelsMap,
  461. clusterAndNameToType,
  462. overheadMap,
  463. )
  464. c, err := cp.GetConfig()
  465. if err != nil {
  466. return nil, err
  467. }
  468. discount, err := ParsePercentString(c.Discount)
  469. if err != nil {
  470. return nil, err
  471. }
  472. negotiatedDiscount, err := ParsePercentString(c.NegotiatedDiscount)
  473. if err != nil {
  474. return nil, err
  475. }
  476. for _, node := range nodeMap {
  477. // TODO take GKE Reserved Instances into account
  478. node.Discount = cp.CombinedDiscountForNode(node.NodeType, node.Preemptible, discount, negotiatedDiscount)
  479. // Apply all remaining resources to Idle
  480. node.CPUBreakdown.Idle = 1.0 - (node.CPUBreakdown.System + node.CPUBreakdown.Other + node.CPUBreakdown.User)
  481. node.RAMBreakdown.Idle = 1.0 - (node.RAMBreakdown.System + node.RAMBreakdown.Other + node.RAMBreakdown.User)
  482. }
  483. return nodeMap, nil
  484. }
  485. type LoadBalancerIdentifier struct {
  486. Cluster string
  487. Namespace string
  488. Name string
  489. IngressIP string
  490. }
  491. type LoadBalancer struct {
  492. Cluster string
  493. Namespace string
  494. Name string
  495. ProviderID string
  496. Cost float64
  497. Start time.Time
  498. End time.Time
  499. Minutes float64
  500. Private bool
  501. Ip string
  502. }
  503. func ClusterLoadBalancers(dataSource source.OpenCostDataSource, start, end time.Time) (map[LoadBalancerIdentifier]*LoadBalancer, error) {
  504. resolution := dataSource.Resolution()
  505. grp := source.NewQueryGroup()
  506. mq := dataSource.Metrics()
  507. resChLBCost := source.WithGroup(grp, mq.QueryLBPricePerHr(start, end))
  508. resChActiveMins := source.WithGroup(grp, mq.QueryLBActiveMinutes(start, end))
  509. resLBCost, _ := resChLBCost.Await()
  510. resActiveMins, _ := resChActiveMins.Await()
  511. if grp.HasErrors() {
  512. return nil, grp.Error()
  513. }
  514. loadBalancerMap := make(map[LoadBalancerIdentifier]*LoadBalancer, len(resActiveMins))
  515. activeMap := buildActiveDataMap(resActiveMins, loadBalancerKeyGen, lbValues, resolution, opencost.NewClosedWindow(start, end))
  516. for _, result := range resLBCost {
  517. key, ok := loadBalancerKeyGen(result)
  518. if !ok {
  519. continue
  520. }
  521. lbPricePerHr := result.Data[0].Value
  522. lb := &LoadBalancer{
  523. Cluster: key.Cluster,
  524. Namespace: key.Namespace,
  525. Name: key.Name,
  526. Cost: lbPricePerHr, // default to hourly cost, overwrite if active entry exists
  527. Ip: key.IngressIP,
  528. Private: privateIPCheck(key.IngressIP),
  529. ProviderID: provider.ParseLBID(key.IngressIP),
  530. }
  531. if active, ok := activeMap[key]; ok {
  532. lb.Start = active.start
  533. lb.End = active.end
  534. lb.Minutes = active.minutes
  535. if lb.Minutes > 0 {
  536. lb.Cost = lbPricePerHr * (lb.Minutes / 60.0)
  537. } else {
  538. log.DedupedWarningf(20, "ClusterLoadBalancers: found zero minutes for key: %v", key)
  539. }
  540. }
  541. loadBalancerMap[key] = lb
  542. }
  543. return loadBalancerMap, nil
  544. }
  545. func ClusterManagement(dataSource source.OpenCostDataSource, start, end time.Time) (map[ClusterManagementIdentifier]*ClusterManagementCost, error) {
  546. resolution := dataSource.Resolution()
  547. grp := source.NewQueryGroup()
  548. mq := dataSource.Metrics()
  549. resChCMPrice := source.WithGroup(grp, mq.QueryClusterManagementPricePerHr(start, end))
  550. resChCMDur := source.WithGroup(grp, mq.QueryClusterManagementDuration(start, end))
  551. resCMPrice, _ := resChCMPrice.Await()
  552. resCMDur, _ := resChCMDur.Await()
  553. if grp.HasErrors() {
  554. return nil, grp.Error()
  555. }
  556. clusterManagementPriceMap := make(map[ClusterManagementIdentifier]*ClusterManagementCost, len(resCMDur))
  557. activeMap := buildActiveDataMap(resCMDur, clusterManagementKeyGen, clusterManagementValues, resolution, opencost.NewClosedWindow(start, end))
  558. for _, result := range resCMPrice {
  559. key, ok := clusterManagementKeyGen(result)
  560. if !ok {
  561. continue
  562. }
  563. cmPricePerHr := result.Data[0].Value
  564. cm := &ClusterManagementCost{
  565. Cluster: key.Cluster,
  566. Provisioner: key.Provisioner,
  567. Cost: cmPricePerHr, // default to hourly cost, overwrite if active entry exists
  568. }
  569. if active, ok := activeMap[key]; ok {
  570. if active.minutes > 0 {
  571. cm.Cost = cmPricePerHr * (active.minutes / 60.0)
  572. } else {
  573. log.DedupedWarningf(20, "ClusterManagement: found zero minutes for key: %v", key)
  574. }
  575. }
  576. clusterManagementPriceMap[key] = cm
  577. }
  578. return clusterManagementPriceMap, nil
  579. }
  580. // Check if an ip is private.
  581. func privateIPCheck(ip string) bool {
  582. ipAddress := net.ParseIP(ip)
  583. return ipAddress.IsPrivate()
  584. }
  585. func pvCosts(
  586. diskMap map[DiskIdentifier]*Disk,
  587. resolution time.Duration,
  588. resActiveMins []*source.PVActiveMinutesResult,
  589. resPVSize []*source.PVBytesResult,
  590. resPVCost []*source.PVPricePerGiBHourResult,
  591. resPVUsedAvg []*source.PVUsedAvgResult,
  592. resPVUsedMax []*source.PVUsedMaxResult,
  593. resPVCInfo []*source.PVCInfoResult,
  594. cp models.Provider,
  595. window opencost.Window,
  596. ) {
  597. for _, result := range resActiveMins {
  598. cluster := result.Cluster
  599. if cluster == "" {
  600. cluster = coreenv.GetClusterID()
  601. }
  602. name := result.PersistentVolume
  603. if name == "" {
  604. log.Warnf("ClusterDisks: active mins missing pv name")
  605. continue
  606. }
  607. if len(result.Data) == 0 {
  608. continue
  609. }
  610. key := DiskIdentifier{
  611. Cluster: cluster,
  612. Name: name,
  613. }
  614. if _, ok := diskMap[key]; !ok {
  615. diskMap[key] = &Disk{
  616. Cluster: cluster,
  617. Name: name,
  618. Breakdown: &ClusterCostsBreakdown{},
  619. }
  620. }
  621. s, e := calculateStartAndEnd(result.Data, resolution, window)
  622. mins := e.Sub(s).Minutes()
  623. diskMap[key].End = e
  624. diskMap[key].Start = s
  625. diskMap[key].Minutes = mins
  626. }
  627. for _, result := range resPVSize {
  628. cluster := result.Cluster
  629. if cluster == "" {
  630. cluster = coreenv.GetClusterID()
  631. }
  632. name := result.PersistentVolume
  633. if name == "" {
  634. log.Warnf("ClusterDisks: PV size data missing persistentvolume")
  635. continue
  636. }
  637. // TODO niko/assets storage class
  638. bytes := result.Data[0].Value
  639. key := DiskIdentifier{cluster, name}
  640. if _, ok := diskMap[key]; !ok {
  641. diskMap[key] = &Disk{
  642. Cluster: cluster,
  643. Name: name,
  644. Breakdown: &ClusterCostsBreakdown{},
  645. }
  646. }
  647. diskMap[key].Bytes = bytes
  648. }
  649. customPricingEnabled := provider.CustomPricesEnabled(cp)
  650. customPricingConfig, err := cp.GetConfig()
  651. if err != nil {
  652. log.Warnf("ClusterDisks: failed to load custom pricing: %s", err)
  653. }
  654. for _, result := range resPVCost {
  655. cluster := result.Cluster
  656. if cluster == "" {
  657. cluster = coreenv.GetClusterID()
  658. }
  659. name := result.PersistentVolume
  660. if name == "" {
  661. log.Warnf("ClusterDisks: PV cost data missing persistentvolume")
  662. continue
  663. }
  664. // TODO niko/assets storage class
  665. var cost float64
  666. if customPricingEnabled && customPricingConfig != nil {
  667. customPVCostStr := customPricingConfig.Storage
  668. customPVCost, err := strconv.ParseFloat(customPVCostStr, 64)
  669. if err != nil {
  670. log.Warnf("ClusterDisks: error parsing custom PV price: %s", customPVCostStr)
  671. }
  672. cost = customPVCost
  673. } else {
  674. cost = result.Data[0].Value
  675. }
  676. key := DiskIdentifier{cluster, name}
  677. if _, ok := diskMap[key]; !ok {
  678. diskMap[key] = &Disk{
  679. Cluster: cluster,
  680. Name: name,
  681. Breakdown: &ClusterCostsBreakdown{},
  682. }
  683. }
  684. diskMap[key].Cost = cost * (diskMap[key].Bytes / 1024 / 1024 / 1024) * (diskMap[key].Minutes / 60)
  685. providerID := result.ProviderID // just put the providerID set up here, it's the simplest query.
  686. if providerID != "" {
  687. diskMap[key].ProviderID = provider.ParsePVID(providerID)
  688. }
  689. }
  690. for _, result := range resPVUsedAvg {
  691. cluster := result.Cluster
  692. if cluster == "" {
  693. cluster = coreenv.GetClusterID()
  694. }
  695. claimName := result.PersistentVolumeClaim
  696. if claimName == "" {
  697. log.Debugf("ClusterDisks: pv usage data missing persistentvolumeclaim")
  698. continue
  699. }
  700. claimNamespace := result.Namespace
  701. if claimNamespace == "" {
  702. log.Debugf("ClusterDisks: pv usage data missing namespace")
  703. continue
  704. }
  705. var volumeName string
  706. for _, thatRes := range resPVCInfo {
  707. thatCluster := thatRes.Cluster
  708. if thatCluster == "" {
  709. thatCluster = coreenv.GetClusterID()
  710. }
  711. thatVolumeName := thatRes.VolumeName
  712. if thatVolumeName == "" {
  713. log.Debugf("ClusterDisks: pv claim data missing volumename")
  714. continue
  715. }
  716. thatClaimName := thatRes.PersistentVolumeClaim
  717. if thatClaimName == "" {
  718. log.Debugf("ClusterDisks: pv claim data missing persistentvolumeclaim")
  719. continue
  720. }
  721. thatClaimNamespace := thatRes.Namespace
  722. if thatClaimNamespace == "" {
  723. log.Debugf("ClusterDisks: pv claim data missing namespace")
  724. continue
  725. }
  726. if cluster == thatCluster && claimName == thatClaimName && claimNamespace == thatClaimNamespace {
  727. volumeName = thatVolumeName
  728. }
  729. }
  730. usage := result.Data[0].Value
  731. key := DiskIdentifier{
  732. Cluster: cluster,
  733. Name: volumeName,
  734. }
  735. if _, ok := diskMap[key]; !ok {
  736. diskMap[key] = &Disk{
  737. Cluster: cluster,
  738. Name: volumeName,
  739. Breakdown: &ClusterCostsBreakdown{},
  740. }
  741. }
  742. diskMap[key].BytesUsedAvgPtr = &usage
  743. }
  744. for _, result := range resPVUsedMax {
  745. cluster := result.Cluster
  746. if cluster == "" {
  747. cluster = coreenv.GetClusterID()
  748. }
  749. claimName := result.PersistentVolumeClaim
  750. if claimName == "" {
  751. log.Debugf("ClusterDisks: pv usage data missing persistentvolumeclaim")
  752. continue
  753. }
  754. claimNamespace := result.Namespace
  755. if claimNamespace == "" {
  756. log.Debugf("ClusterDisks: pv usage data missing namespace")
  757. continue
  758. }
  759. var volumeName string
  760. for _, thatRes := range resPVCInfo {
  761. thatCluster := thatRes.Cluster
  762. if thatCluster == "" {
  763. thatCluster = coreenv.GetClusterID()
  764. }
  765. thatVolumeName := thatRes.VolumeName
  766. if thatVolumeName == "" {
  767. log.Debugf("ClusterDisks: pv claim data missing volumename")
  768. continue
  769. }
  770. thatClaimName := thatRes.PersistentVolumeClaim
  771. if thatClaimName == "" {
  772. log.Debugf("ClusterDisks: pv claim data missing persistentvolumeclaim")
  773. continue
  774. }
  775. thatClaimNamespace := thatRes.Namespace
  776. if thatClaimNamespace == "" {
  777. log.Debugf("ClusterDisks: pv claim data missing namespace")
  778. continue
  779. }
  780. if cluster == thatCluster && claimName == thatClaimName && claimNamespace == thatClaimNamespace {
  781. volumeName = thatVolumeName
  782. }
  783. }
  784. usage := result.Data[0].Value
  785. key := DiskIdentifier{cluster, volumeName}
  786. if _, ok := diskMap[key]; !ok {
  787. diskMap[key] = &Disk{
  788. Cluster: cluster,
  789. Name: volumeName,
  790. Breakdown: &ClusterCostsBreakdown{},
  791. }
  792. }
  793. diskMap[key].BytesUsedMaxPtr = &usage
  794. }
  795. }
  796. // filterOutLocalPVs removes local Persistent Volumes (PVs) from the given disk map.
  797. // Local PVs are identified by the prefix "local-pv-" in their names, which is the
  798. // convention used by sig-storage-local-static-provisioner.
  799. //
  800. // Parameters:
  801. // - diskMap: A map of DiskIdentifier to Disk pointers, representing all PVs.
  802. //
  803. // Returns:
  804. // - A new map of DiskIdentifier to Disk pointers, containing only non-local PVs.
  805. func filterOutLocalPVs(diskMap map[DiskIdentifier]*Disk) map[DiskIdentifier]*Disk {
  806. nonLocalPVDiskMap := map[DiskIdentifier]*Disk{}
  807. for key, val := range diskMap {
  808. if !strings.HasPrefix(key.Name, SIG_STORAGE_LOCAL_PROVISIONER_PREFIX) {
  809. nonLocalPVDiskMap[key] = val
  810. }
  811. }
  812. return nonLocalPVDiskMap
  813. }