cluster.go 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023
  1. package costmodel
  2. import (
  3. "net"
  4. "strconv"
  5. "strings"
  6. "time"
  7. "github.com/opencost/opencost/pkg/cloud/provider"
  8. "golang.org/x/exp/slices"
  9. "github.com/opencost/opencost/core/pkg/log"
  10. "github.com/opencost/opencost/core/pkg/opencost"
  11. "github.com/opencost/opencost/core/pkg/source"
  12. "github.com/opencost/opencost/pkg/cloud/models"
  13. "github.com/opencost/opencost/pkg/env"
  14. )
  15. const MAX_LOCAL_STORAGE_SIZE = 1024 * 1024 * 1024 * 1024
  16. // When ASSET_INCLUDE_LOCAL_DISK_COST is set to false, local storage
  17. // provisioned by sig-storage-local-static-provisioner is excluded
  18. // by checking if the volume is prefixed by "local-pv-".
  19. //
  20. // This is based on the sig-storage-local-static-provisioner implementation,
  21. // which creates all PVs with the "local-pv-" prefix. For reference, see:
  22. // https://github.com/kubernetes-sigs/sig-storage-local-static-provisioner/blob/b6f465027bd059e92c0032c81dd1e1d90e35c909/pkg/discovery/discovery.go#L410-L417
  23. const SIG_STORAGE_LOCAL_PROVISIONER_PREFIX = "local-pv-"
  24. // Costs represents cumulative and monthly cluster costs over a given duration. Costs
  25. // are broken down by cores, memory, and storage.
  26. type ClusterCosts struct {
  27. Start *time.Time `json:"startTime"`
  28. End *time.Time `json:"endTime"`
  29. CPUCumulative float64 `json:"cpuCumulativeCost"`
  30. CPUMonthly float64 `json:"cpuMonthlyCost"`
  31. CPUBreakdown *ClusterCostsBreakdown `json:"cpuBreakdown"`
  32. GPUCumulative float64 `json:"gpuCumulativeCost"`
  33. GPUMonthly float64 `json:"gpuMonthlyCost"`
  34. RAMCumulative float64 `json:"ramCumulativeCost"`
  35. RAMMonthly float64 `json:"ramMonthlyCost"`
  36. RAMBreakdown *ClusterCostsBreakdown `json:"ramBreakdown"`
  37. StorageCumulative float64 `json:"storageCumulativeCost"`
  38. StorageMonthly float64 `json:"storageMonthlyCost"`
  39. StorageBreakdown *ClusterCostsBreakdown `json:"storageBreakdown"`
  40. TotalCumulative float64 `json:"totalCumulativeCost"`
  41. TotalMonthly float64 `json:"totalMonthlyCost"`
  42. DataMinutes float64
  43. }
  44. // ClusterCostsBreakdown provides percentage-based breakdown of a resource by
  45. // categories: user for user-space (i.e. non-system) usage, system, and idle.
  46. type ClusterCostsBreakdown struct {
  47. Idle float64 `json:"idle"`
  48. Other float64 `json:"other"`
  49. System float64 `json:"system"`
  50. User float64 `json:"user"`
  51. }
  52. type Disk struct {
  53. Cluster string
  54. Name string
  55. ProviderID string
  56. StorageClass string
  57. VolumeName string
  58. ClaimName string
  59. ClaimNamespace string
  60. Cost float64
  61. Bytes float64
  62. // These two fields may not be available at all times because they rely on
  63. // a new set of metrics that may or may not be available. Thus, they must
  64. // be nilable to represent the complete absence of the data.
  65. //
  66. // In other words, nilability here lets us distinguish between
  67. // "metric is not available" and "metric is available but is 0".
  68. //
  69. // They end in "Ptr" to distinguish from an earlier version in order to
  70. // ensure that all usages are checked for nil.
  71. BytesUsedAvgPtr *float64
  72. BytesUsedMaxPtr *float64
  73. Local bool
  74. Start time.Time
  75. End time.Time
  76. Minutes float64
  77. Breakdown *ClusterCostsBreakdown
  78. }
  79. type DiskIdentifier struct {
  80. Cluster string
  81. Name string
  82. }
  83. func ClusterDisks(dataSource source.OpenCostDataSource, cp models.Provider, start, end time.Time) (map[DiskIdentifier]*Disk, error) {
  84. resolution := env.GetETLResolution()
  85. grp := source.NewQueryGroup()
  86. mq := dataSource.Metrics()
  87. resChPVCost := source.WithGroup(grp, mq.QueryPVPricePerGiBHour(start, end))
  88. resChPVSize := source.WithGroup(grp, mq.QueryPVBytes(start, end))
  89. resChActiveMins := source.WithGroup(grp, mq.QueryPVActiveMinutes(start, end))
  90. resChPVStorageClass := source.WithGroup(grp, mq.QueryPVInfo(start, end))
  91. resChPVUsedAvg := source.WithGroup(grp, mq.QueryPVUsedAverage(start, end))
  92. resChPVUsedMax := source.WithGroup(grp, mq.QueryPVUsedMax(start, end))
  93. resChPVCInfo := source.WithGroup(grp, mq.QueryPVCInfo(start, end))
  94. resPVCost, _ := resChPVCost.Await()
  95. resPVSize, _ := resChPVSize.Await()
  96. resActiveMins, _ := resChActiveMins.Await()
  97. resPVStorageClass, _ := resChPVStorageClass.Await()
  98. resPVUsedAvg, _ := resChPVUsedAvg.Await()
  99. resPVUsedMax, _ := resChPVUsedMax.Await()
  100. resPVCInfo, _ := resChPVCInfo.Await()
  101. // Cloud providers do not always charge for a node's local disk costs (i.e.
  102. // ephemeral storage). Provide an option to opt out of calculating &
  103. // allocating local disk costs. Note, that this does not affect
  104. // PersistentVolume costs.
  105. //
  106. // Ref:
  107. // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/RootDeviceStorage.html
  108. // https://learn.microsoft.com/en-us/azure/virtual-machines/managed-disks-overview#temporary-disk
  109. // https://cloud.google.com/compute/docs/disks/local-ssd
  110. resLocalStorageCost := []*source.LocalStorageCostResult{}
  111. resLocalStorageUsedCost := []*source.LocalStorageUsedCostResult{}
  112. resLocalStorageUsedAvg := []*source.LocalStorageUsedAvgResult{}
  113. resLocalStorageUsedMax := []*source.LocalStorageUsedMaxResult{}
  114. resLocalStorageBytes := []*source.LocalStorageBytesResult{}
  115. resLocalActiveMins := []*source.LocalStorageActiveMinutesResult{}
  116. if env.IsAssetIncludeLocalDiskCost() {
  117. resChLocalStorageCost := source.WithGroup(grp, mq.QueryLocalStorageCost(start, end))
  118. resChLocalStorageUsedCost := source.WithGroup(grp, mq.QueryLocalStorageUsedCost(start, end))
  119. resChLocalStoreageUsedAvg := source.WithGroup(grp, mq.QueryLocalStorageUsedAvg(start, end))
  120. resChLocalStoreageUsedMax := source.WithGroup(grp, mq.QueryLocalStorageUsedMax(start, end))
  121. resChLocalStorageBytes := source.WithGroup(grp, mq.QueryLocalStorageBytes(start, end))
  122. resChLocalActiveMins := source.WithGroup(grp, mq.QueryLocalStorageActiveMinutes(start, end))
  123. resLocalStorageCost, _ = resChLocalStorageCost.Await()
  124. resLocalStorageUsedCost, _ = resChLocalStorageUsedCost.Await()
  125. resLocalStorageUsedAvg, _ = resChLocalStoreageUsedAvg.Await()
  126. resLocalStorageUsedMax, _ = resChLocalStoreageUsedMax.Await()
  127. resLocalStorageBytes, _ = resChLocalStorageBytes.Await()
  128. resLocalActiveMins, _ = resChLocalActiveMins.Await()
  129. }
  130. if grp.HasErrors() {
  131. return nil, grp.Error()
  132. }
  133. diskMap := buildAssetsPVCMap(resPVCInfo)
  134. pvCosts(diskMap, resolution, resActiveMins, resPVSize, resPVCost, resPVUsedAvg, resPVUsedMax, resPVCInfo, cp, opencost.NewClosedWindow(start, end))
  135. type localStorage struct {
  136. device string
  137. disk *Disk
  138. }
  139. localStorageDisks := map[DiskIdentifier]localStorage{}
  140. // Start with local storage bytes so that the device with the largest size which has passed the
  141. // query filters can be determined
  142. for _, result := range resLocalStorageBytes {
  143. cluster := result.Cluster
  144. if cluster == "" {
  145. cluster = env.GetClusterID()
  146. }
  147. name := result.Instance
  148. if name == "" {
  149. log.Warnf("ClusterDisks: local storage data missing instance")
  150. continue
  151. }
  152. device := result.Device
  153. if device == "" {
  154. log.Warnf("ClusterDisks: local storage data missing device")
  155. continue
  156. }
  157. bytes := result.Data[0].Value
  158. // Ignore disks that are larger than the max size
  159. if bytes > MAX_LOCAL_STORAGE_SIZE {
  160. continue
  161. }
  162. key := DiskIdentifier{cluster, name}
  163. // only keep the device with the most bytes per instance
  164. if current, ok := localStorageDisks[key]; !ok || current.disk.Bytes < bytes {
  165. localStorageDisks[key] = localStorage{
  166. device: device,
  167. disk: &Disk{
  168. Cluster: cluster,
  169. Name: name,
  170. Breakdown: &ClusterCostsBreakdown{},
  171. Local: true,
  172. StorageClass: opencost.LocalStorageClass,
  173. Bytes: bytes,
  174. },
  175. }
  176. }
  177. }
  178. for _, result := range resLocalStorageCost {
  179. cluster := result.Cluster
  180. if cluster == "" {
  181. cluster = env.GetClusterID()
  182. }
  183. name := result.Instance
  184. if name == "" {
  185. log.Warnf("ClusterDisks: local storage data missing instance")
  186. continue
  187. }
  188. device := result.Device
  189. if device == "" {
  190. log.Warnf("ClusterDisks: local storage data missing device")
  191. continue
  192. }
  193. cost := result.Data[0].Value
  194. key := DiskIdentifier{cluster, name}
  195. ls, ok := localStorageDisks[key]
  196. if !ok || ls.device != device {
  197. continue
  198. }
  199. ls.disk.Cost = cost
  200. }
  201. for _, result := range resLocalStorageUsedCost {
  202. cluster := result.Cluster
  203. if cluster == "" {
  204. cluster = env.GetClusterID()
  205. }
  206. name := result.Instance
  207. if name == "" {
  208. log.Warnf("ClusterDisks: local storage data missing instance")
  209. continue
  210. }
  211. device := result.Device
  212. if device == "" {
  213. log.Warnf("ClusterDisks: local storage data missing device")
  214. continue
  215. }
  216. cost := result.Data[0].Value
  217. key := DiskIdentifier{cluster, name}
  218. ls, ok := localStorageDisks[key]
  219. if !ok || ls.device != device {
  220. continue
  221. }
  222. ls.disk.Breakdown.System = cost / ls.disk.Cost
  223. }
  224. for _, result := range resLocalStorageUsedAvg {
  225. cluster := result.Cluster
  226. if cluster == "" {
  227. cluster = env.GetClusterID()
  228. }
  229. name := result.Instance
  230. if name == "" {
  231. log.Warnf("ClusterDisks: local storage data missing instance")
  232. continue
  233. }
  234. device := result.Device
  235. if device == "" {
  236. log.Warnf("ClusterDisks: local storage data missing device")
  237. continue
  238. }
  239. bytesAvg := result.Data[0].Value
  240. key := DiskIdentifier{cluster, name}
  241. ls, ok := localStorageDisks[key]
  242. if !ok || ls.device != device {
  243. continue
  244. }
  245. ls.disk.BytesUsedAvgPtr = &bytesAvg
  246. }
  247. for _, result := range resLocalStorageUsedMax {
  248. cluster := result.Cluster
  249. if cluster == "" {
  250. cluster = env.GetClusterID()
  251. }
  252. name := result.Instance
  253. if name == "" {
  254. log.Warnf("ClusterDisks: local storage data missing instance")
  255. continue
  256. }
  257. device := result.Device
  258. if device == "" {
  259. log.Warnf("ClusterDisks: local storage data missing device")
  260. continue
  261. }
  262. bytesMax := result.Data[0].Value
  263. key := DiskIdentifier{cluster, name}
  264. ls, ok := localStorageDisks[key]
  265. if !ok || ls.device != device {
  266. continue
  267. }
  268. ls.disk.BytesUsedMaxPtr = &bytesMax
  269. }
  270. for _, result := range resLocalActiveMins {
  271. cluster := result.Cluster
  272. if cluster == "" {
  273. cluster = env.GetClusterID()
  274. }
  275. name := result.Node
  276. if name == "" {
  277. log.DedupedWarningf(5, "ClusterDisks: local active mins data missing instance")
  278. continue
  279. }
  280. providerID := result.ProviderID
  281. if providerID == "" {
  282. log.DedupedWarningf(5, "ClusterDisks: local active mins data missing provider_id")
  283. continue
  284. }
  285. key := DiskIdentifier{cluster, name}
  286. ls, ok := localStorageDisks[key]
  287. if !ok {
  288. continue
  289. }
  290. ls.disk.ProviderID = provider.ParseLocalDiskID(providerID)
  291. if len(result.Data) == 0 {
  292. continue
  293. }
  294. s := time.Unix(int64(result.Data[0].Timestamp), 0)
  295. e := time.Unix(int64(result.Data[len(result.Data)-1].Timestamp), 0)
  296. mins := e.Sub(s).Minutes()
  297. // TODO niko/assets if mins >= threshold, interpolate for missing data?
  298. ls.disk.End = e
  299. ls.disk.Start = s
  300. ls.disk.Minutes = mins
  301. }
  302. // move local storage disks to main disk map
  303. for key, ls := range localStorageDisks {
  304. diskMap[key] = ls.disk
  305. }
  306. var unTracedDiskLogData []DiskIdentifier
  307. //Iterating through Persistent Volume given by custom metrics kubecost_pv_info and assign the storage class if known and __unknown__ if not populated.
  308. for _, result := range resPVStorageClass {
  309. cluster := result.Cluster
  310. if cluster == "" {
  311. cluster = env.GetClusterID()
  312. }
  313. name := result.PersistentVolume
  314. key := DiskIdentifier{cluster, name}
  315. if _, ok := diskMap[key]; !ok {
  316. if !slices.Contains(unTracedDiskLogData, key) {
  317. unTracedDiskLogData = append(unTracedDiskLogData, key)
  318. }
  319. continue
  320. }
  321. if len(result.Data) == 0 {
  322. continue
  323. }
  324. storageClass := result.StorageClass
  325. if storageClass == "" {
  326. diskMap[key].StorageClass = opencost.UnknownStorageClass
  327. } else {
  328. diskMap[key].StorageClass = storageClass
  329. }
  330. }
  331. // Logging the unidentified disk information outside the loop
  332. for _, unIdentifiedDisk := range unTracedDiskLogData {
  333. log.Warnf("ClusterDisks: Cluster %s has Storage Class information for unidentified disk %s or disk deleted from analysis", unIdentifiedDisk.Cluster, unIdentifiedDisk.Name)
  334. }
  335. for _, disk := range diskMap {
  336. // Apply all remaining RAM to Idle
  337. disk.Breakdown.Idle = 1.0 - (disk.Breakdown.System + disk.Breakdown.Other + disk.Breakdown.User)
  338. // Set provider Id to the name for reconciliation
  339. if disk.ProviderID == "" {
  340. disk.ProviderID = disk.Name
  341. }
  342. }
  343. if !env.IsAssetIncludeLocalDiskCost() {
  344. return filterOutLocalPVs(diskMap), nil
  345. }
  346. return diskMap, nil
  347. }
  348. type NodeOverhead struct {
  349. CpuOverheadFraction float64
  350. RamOverheadFraction float64
  351. }
  352. type Node struct {
  353. Cluster string
  354. Name string
  355. ProviderID string
  356. NodeType string
  357. CPUCost float64
  358. CPUCores float64
  359. GPUCost float64
  360. GPUCount float64
  361. RAMCost float64
  362. RAMBytes float64
  363. Discount float64
  364. Preemptible bool
  365. CPUBreakdown *ClusterCostsBreakdown
  366. RAMBreakdown *ClusterCostsBreakdown
  367. Start time.Time
  368. End time.Time
  369. Minutes float64
  370. Labels map[string]string
  371. CostPerCPUHr float64
  372. CostPerRAMGiBHr float64
  373. CostPerGPUHr float64
  374. Overhead *NodeOverhead
  375. }
  376. // GKE lies about the number of cores e2 nodes have. This table
  377. // contains a mapping from node type -> actual CPU cores
  378. // for those cases.
  379. var partialCPUMap = map[string]float64{
  380. "e2-micro": 0.25,
  381. "e2-small": 0.5,
  382. "e2-medium": 1.0,
  383. }
  384. type NodeIdentifier struct {
  385. Cluster string
  386. Name string
  387. ProviderID string
  388. }
  389. type nodeIdentifierNoProviderID struct {
  390. Cluster string
  391. Name string
  392. }
  393. type ClusterManagementIdentifier struct {
  394. Cluster string
  395. Provisioner string
  396. }
  397. type ClusterManagementCost struct {
  398. Cluster string
  399. Provisioner string
  400. Cost float64
  401. }
  402. func costTimesMinuteAndCount(activeDataMap map[NodeIdentifier]activeData, costMap map[NodeIdentifier]float64, resourceCountMap map[nodeIdentifierNoProviderID]float64) {
  403. for k, v := range activeDataMap {
  404. keyNon := nodeIdentifierNoProviderID{
  405. Cluster: k.Cluster,
  406. Name: k.Name,
  407. }
  408. if cost, ok := costMap[k]; ok {
  409. minutes := v.minutes
  410. count := 1.0
  411. if c, ok := resourceCountMap[keyNon]; ok {
  412. count = c
  413. }
  414. costMap[k] = cost * (minutes / 60.0) * count
  415. }
  416. }
  417. }
  418. func costTimesMinute[T comparable](activeDataMap map[T]activeData, costMap map[T]float64) {
  419. for k, v := range activeDataMap {
  420. if cost, ok := costMap[k]; ok {
  421. minutes := v.minutes
  422. costMap[k] = cost * (minutes / 60)
  423. }
  424. }
  425. }
  426. func ClusterNodes(dataSource source.OpenCostDataSource, cp models.Provider, start, end time.Time) (map[NodeIdentifier]*Node, error) {
  427. mq := dataSource.Metrics()
  428. resolution := env.GetETLResolution()
  429. requiredGrp := source.NewQueryGroup()
  430. optionalGrp := source.NewQueryGroup()
  431. // return errors if these fail
  432. resChNodeCPUHourlyCost := source.WithGroup(requiredGrp, mq.QueryNodeCPUPricePerHr(start, end))
  433. resChNodeCPUCoresCapacity := source.WithGroup(requiredGrp, mq.QueryNodeCPUCoresCapacity(start, end))
  434. resChNodeCPUCoresAllocatable := source.WithGroup(requiredGrp, mq.QueryNodeCPUCoresAllocatable(start, end))
  435. resChNodeRAMHourlyCost := source.WithGroup(requiredGrp, mq.QueryNodeRAMPricePerGiBHr(start, end))
  436. resChNodeRAMBytesCapacity := source.WithGroup(requiredGrp, mq.QueryNodeRAMBytesCapacity(start, end))
  437. resChNodeRAMBytesAllocatable := source.WithGroup(requiredGrp, mq.QueryNodeRAMBytesAllocatable(start, end))
  438. resChNodeGPUCount := source.WithGroup(requiredGrp, mq.QueryNodeGPUCount(start, end))
  439. resChNodeGPUHourlyPrice := source.WithGroup(requiredGrp, mq.QueryNodeGPUPricePerHr(start, end))
  440. resChActiveMins := source.WithGroup(requiredGrp, mq.QueryNodeActiveMinutes(start, end))
  441. resChIsSpot := source.WithGroup(requiredGrp, mq.QueryNodeIsSpot(start, end))
  442. // Do not return errors if these fail, but log warnings
  443. resChNodeCPUModeTotal := source.WithGroup(optionalGrp, mq.QueryNodeCPUModeTotal(start, end))
  444. resChNodeRAMSystemPct := source.WithGroup(optionalGrp, mq.QueryNodeRAMSystemPercent(start, end))
  445. resChNodeRAMUserPct := source.WithGroup(optionalGrp, mq.QueryNodeRAMUserPercent(start, end))
  446. resChLabels := source.WithGroup(optionalGrp, mq.QueryNodeLabels(start, end))
  447. resNodeCPUHourlyCost, _ := resChNodeCPUHourlyCost.Await()
  448. resNodeCPUCoresCapacity, _ := resChNodeCPUCoresCapacity.Await()
  449. resNodeCPUCoresAllocatable, _ := resChNodeCPUCoresAllocatable.Await()
  450. resNodeGPUCount, _ := resChNodeGPUCount.Await()
  451. resNodeGPUHourlyPrice, _ := resChNodeGPUHourlyPrice.Await()
  452. resNodeRAMHourlyCost, _ := resChNodeRAMHourlyCost.Await()
  453. resNodeRAMBytesCapacity, _ := resChNodeRAMBytesCapacity.Await()
  454. resNodeRAMBytesAllocatable, _ := resChNodeRAMBytesAllocatable.Await()
  455. resIsSpot, _ := resChIsSpot.Await()
  456. resNodeCPUModeTotal, _ := resChNodeCPUModeTotal.Await()
  457. resNodeRAMSystemPct, _ := resChNodeRAMSystemPct.Await()
  458. resNodeRAMUserPct, _ := resChNodeRAMUserPct.Await()
  459. resActiveMins, _ := resChActiveMins.Await()
  460. resLabels, _ := resChLabels.Await()
  461. if optionalGrp.HasErrors() {
  462. for _, err := range optionalGrp.Errors() {
  463. log.Warnf("ClusterNodes: %s", err)
  464. }
  465. }
  466. if requiredGrp.HasErrors() {
  467. for _, err := range requiredGrp.Errors() {
  468. log.Errorf("ClusterNodes: %s", err)
  469. }
  470. return nil, requiredGrp.Error()
  471. }
  472. activeDataMap := buildActiveDataMap(resActiveMins, nodeKeyGen, nodeValues, resolution, opencost.NewClosedWindow(start, end))
  473. gpuCountMap := buildGPUCountMap(resNodeGPUCount)
  474. preemptibleMap := buildPreemptibleMap(resIsSpot)
  475. cpuCostMap, clusterAndNameToType1 := buildCPUCostMap(resNodeCPUHourlyCost, cp, preemptibleMap)
  476. ramCostMap, clusterAndNameToType2 := buildRAMCostMap(resNodeRAMHourlyCost, cp, preemptibleMap)
  477. gpuCostMap, clusterAndNameToType3 := buildGPUCostMap(resNodeGPUHourlyPrice, gpuCountMap, cp, preemptibleMap)
  478. clusterAndNameToTypeIntermediate := mergeTypeMaps(clusterAndNameToType1, clusterAndNameToType2)
  479. clusterAndNameToType := mergeTypeMaps(clusterAndNameToTypeIntermediate, clusterAndNameToType3)
  480. cpuCoresCapacityMap := buildCPUCoresMap(resNodeCPUCoresCapacity)
  481. ramBytesCapacityMap := buildRAMBytesMap(resNodeRAMBytesCapacity)
  482. cpuCoresAllocatableMap := buildCPUCoresMap(resNodeCPUCoresAllocatable)
  483. ramBytesAllocatableMap := buildRAMBytesMap(resNodeRAMBytesAllocatable)
  484. overheadMap := buildOverheadMap(ramBytesCapacityMap, ramBytesAllocatableMap, cpuCoresCapacityMap, cpuCoresAllocatableMap)
  485. ramUserPctMap := buildRAMUserPctMap(resNodeRAMUserPct)
  486. ramSystemPctMap := buildRAMSystemPctMap(resNodeRAMSystemPct)
  487. cpuBreakdownMap := buildCPUBreakdownMap(resNodeCPUModeTotal)
  488. labelsMap := buildLabelsMap(resLabels)
  489. costTimesMinuteAndCount(activeDataMap, cpuCostMap, cpuCoresCapacityMap)
  490. costTimesMinuteAndCount(activeDataMap, ramCostMap, ramBytesCapacityMap)
  491. costTimesMinute(activeDataMap, gpuCostMap) // there's no need to do a weird "nodeIdentifierNoProviderID" type match since gpuCounts have a providerID
  492. nodeMap := buildNodeMap(
  493. cpuCostMap, ramCostMap, gpuCostMap, gpuCountMap,
  494. cpuCoresCapacityMap, ramBytesCapacityMap, ramUserPctMap,
  495. ramSystemPctMap,
  496. cpuBreakdownMap,
  497. activeDataMap,
  498. preemptibleMap,
  499. labelsMap,
  500. clusterAndNameToType,
  501. overheadMap,
  502. )
  503. c, err := cp.GetConfig()
  504. if err != nil {
  505. return nil, err
  506. }
  507. discount, err := ParsePercentString(c.Discount)
  508. if err != nil {
  509. return nil, err
  510. }
  511. negotiatedDiscount, err := ParsePercentString(c.NegotiatedDiscount)
  512. if err != nil {
  513. return nil, err
  514. }
  515. for _, node := range nodeMap {
  516. // TODO take GKE Reserved Instances into account
  517. node.Discount = cp.CombinedDiscountForNode(node.NodeType, node.Preemptible, discount, negotiatedDiscount)
  518. // Apply all remaining resources to Idle
  519. node.CPUBreakdown.Idle = 1.0 - (node.CPUBreakdown.System + node.CPUBreakdown.Other + node.CPUBreakdown.User)
  520. node.RAMBreakdown.Idle = 1.0 - (node.RAMBreakdown.System + node.RAMBreakdown.Other + node.RAMBreakdown.User)
  521. }
  522. return nodeMap, nil
  523. }
  524. type LoadBalancerIdentifier struct {
  525. Cluster string
  526. Namespace string
  527. Name string
  528. IngressIP string
  529. }
  530. type LoadBalancer struct {
  531. Cluster string
  532. Namespace string
  533. Name string
  534. ProviderID string
  535. Cost float64
  536. Start time.Time
  537. End time.Time
  538. Minutes float64
  539. Private bool
  540. Ip string
  541. }
  542. func ClusterLoadBalancers(dataSource source.OpenCostDataSource, start, end time.Time) (map[LoadBalancerIdentifier]*LoadBalancer, error) {
  543. resolution := env.GetETLResolution()
  544. grp := source.NewQueryGroup()
  545. mq := dataSource.Metrics()
  546. resChLBCost := source.WithGroup(grp, mq.QueryLBPricePerHr(start, end))
  547. resChActiveMins := source.WithGroup(grp, mq.QueryLBActiveMinutes(start, end))
  548. resLBCost, _ := resChLBCost.Await()
  549. resActiveMins, _ := resChActiveMins.Await()
  550. if grp.HasErrors() {
  551. return nil, grp.Error()
  552. }
  553. loadBalancerMap := make(map[LoadBalancerIdentifier]*LoadBalancer, len(resActiveMins))
  554. activeMap := buildActiveDataMap(resActiveMins, loadBalancerKeyGen, lbValues, resolution, opencost.NewClosedWindow(start, end))
  555. for _, result := range resLBCost {
  556. key, ok := loadBalancerKeyGen(result)
  557. if !ok {
  558. continue
  559. }
  560. lbPricePerHr := result.Data[0].Value
  561. lb := &LoadBalancer{
  562. Cluster: key.Cluster,
  563. Namespace: key.Namespace,
  564. Name: key.Name,
  565. Cost: lbPricePerHr, // default to hourly cost, overwrite if active entry exists
  566. Ip: key.IngressIP,
  567. Private: privateIPCheck(key.IngressIP),
  568. ProviderID: provider.ParseLBID(key.IngressIP),
  569. }
  570. if active, ok := activeMap[key]; ok {
  571. lb.Start = active.start
  572. lb.End = active.end
  573. lb.Minutes = active.minutes
  574. if lb.Minutes > 0 {
  575. lb.Cost = lbPricePerHr * (lb.Minutes / 60.0)
  576. } else {
  577. log.DedupedWarningf(20, "ClusterLoadBalancers: found zero minutes for key: %v", key)
  578. }
  579. }
  580. loadBalancerMap[key] = lb
  581. }
  582. return loadBalancerMap, nil
  583. }
  584. func ClusterManagement(dataSource source.OpenCostDataSource, start, end time.Time) (map[ClusterManagementIdentifier]*ClusterManagementCost, error) {
  585. resolution := env.GetETLResolution()
  586. grp := source.NewQueryGroup()
  587. mq := dataSource.Metrics()
  588. resChCMPrice := source.WithGroup(grp, mq.QueryClusterManagementPricePerHr(start, end))
  589. resChCMDur := source.WithGroup(grp, mq.QueryClusterManagementDuration(start, end))
  590. resCMPrice, _ := resChCMPrice.Await()
  591. resCMDur, _ := resChCMDur.Await()
  592. if grp.HasErrors() {
  593. return nil, grp.Error()
  594. }
  595. clusterManagementPriceMap := make(map[ClusterManagementIdentifier]*ClusterManagementCost, len(resCMDur))
  596. activeMap := buildActiveDataMap(resCMDur, clusterManagementKeyGen, clusterManagementValues, resolution, opencost.NewClosedWindow(start, end))
  597. for _, result := range resCMPrice {
  598. key, ok := clusterManagementKeyGen(result)
  599. if !ok {
  600. continue
  601. }
  602. cmPricePerHr := result.Data[0].Value
  603. cm := &ClusterManagementCost{
  604. Cluster: key.Cluster,
  605. Provisioner: key.Provisioner,
  606. Cost: cmPricePerHr, // default to hourly cost, overwrite if active entry exists
  607. }
  608. if active, ok := activeMap[key]; ok {
  609. if active.minutes > 0 {
  610. cm.Cost = cmPricePerHr * (active.minutes / 60.0)
  611. } else {
  612. log.DedupedWarningf(20, "ClusterManagement: found zero minutes for key: %v", key)
  613. }
  614. }
  615. clusterManagementPriceMap[key] = cm
  616. }
  617. return clusterManagementPriceMap, nil
  618. }
  619. // Check if an ip is private.
  620. func privateIPCheck(ip string) bool {
  621. ipAddress := net.ParseIP(ip)
  622. return ipAddress.IsPrivate()
  623. }
  624. func pvCosts(
  625. diskMap map[DiskIdentifier]*Disk,
  626. resolution time.Duration,
  627. resActiveMins []*source.PVActiveMinutesResult,
  628. resPVSize []*source.PVBytesResult,
  629. resPVCost []*source.PVPricePerGiBHourResult,
  630. resPVUsedAvg []*source.PVUsedAvgResult,
  631. resPVUsedMax []*source.PVUsedMaxResult,
  632. resPVCInfo []*source.PVCInfoResult,
  633. cp models.Provider,
  634. window opencost.Window,
  635. ) {
  636. for _, result := range resActiveMins {
  637. cluster := result.Cluster
  638. if cluster == "" {
  639. cluster = env.GetClusterID()
  640. }
  641. name := result.PersistentVolume
  642. if name == "" {
  643. log.Warnf("ClusterDisks: active mins missing pv name")
  644. continue
  645. }
  646. if len(result.Data) == 0 {
  647. continue
  648. }
  649. key := DiskIdentifier{
  650. Cluster: cluster,
  651. Name: name,
  652. }
  653. if _, ok := diskMap[key]; !ok {
  654. diskMap[key] = &Disk{
  655. Cluster: cluster,
  656. Name: name,
  657. Breakdown: &ClusterCostsBreakdown{},
  658. }
  659. }
  660. s, e := calculateStartAndEnd(result.Data, resolution, window)
  661. mins := e.Sub(s).Minutes()
  662. diskMap[key].End = e
  663. diskMap[key].Start = s
  664. diskMap[key].Minutes = mins
  665. }
  666. for _, result := range resPVSize {
  667. cluster := result.Cluster
  668. if cluster == "" {
  669. cluster = env.GetClusterID()
  670. }
  671. name := result.PersistentVolume
  672. if name == "" {
  673. log.Warnf("ClusterDisks: PV size data missing persistentvolume")
  674. continue
  675. }
  676. // TODO niko/assets storage class
  677. bytes := result.Data[0].Value
  678. key := DiskIdentifier{cluster, name}
  679. if _, ok := diskMap[key]; !ok {
  680. diskMap[key] = &Disk{
  681. Cluster: cluster,
  682. Name: name,
  683. Breakdown: &ClusterCostsBreakdown{},
  684. }
  685. }
  686. diskMap[key].Bytes = bytes
  687. }
  688. customPricingEnabled := provider.CustomPricesEnabled(cp)
  689. customPricingConfig, err := cp.GetConfig()
  690. if err != nil {
  691. log.Warnf("ClusterDisks: failed to load custom pricing: %s", err)
  692. }
  693. for _, result := range resPVCost {
  694. cluster := result.Cluster
  695. if cluster == "" {
  696. cluster = env.GetClusterID()
  697. }
  698. name := result.PersistentVolume
  699. if name == "" {
  700. log.Warnf("ClusterDisks: PV cost data missing persistentvolume")
  701. continue
  702. }
  703. // TODO niko/assets storage class
  704. var cost float64
  705. if customPricingEnabled && customPricingConfig != nil {
  706. customPVCostStr := customPricingConfig.Storage
  707. customPVCost, err := strconv.ParseFloat(customPVCostStr, 64)
  708. if err != nil {
  709. log.Warnf("ClusterDisks: error parsing custom PV price: %s", customPVCostStr)
  710. }
  711. cost = customPVCost
  712. } else {
  713. cost = result.Data[0].Value
  714. }
  715. key := DiskIdentifier{cluster, name}
  716. if _, ok := diskMap[key]; !ok {
  717. diskMap[key] = &Disk{
  718. Cluster: cluster,
  719. Name: name,
  720. Breakdown: &ClusterCostsBreakdown{},
  721. }
  722. }
  723. diskMap[key].Cost = cost * (diskMap[key].Bytes / 1024 / 1024 / 1024) * (diskMap[key].Minutes / 60)
  724. providerID := result.ProviderID // just put the providerID set up here, it's the simplest query.
  725. if providerID != "" {
  726. diskMap[key].ProviderID = provider.ParsePVID(providerID)
  727. }
  728. }
  729. for _, result := range resPVUsedAvg {
  730. cluster := result.Cluster
  731. if cluster == "" {
  732. cluster = env.GetClusterID()
  733. }
  734. claimName := result.PersistentVolumeClaim
  735. if claimName == "" {
  736. log.Debugf("ClusterDisks: pv usage data missing persistentvolumeclaim")
  737. continue
  738. }
  739. claimNamespace := result.Namespace
  740. if claimNamespace == "" {
  741. log.Debugf("ClusterDisks: pv usage data missing namespace")
  742. continue
  743. }
  744. var volumeName string
  745. for _, thatRes := range resPVCInfo {
  746. thatCluster := thatRes.Cluster
  747. if thatCluster == "" {
  748. thatCluster = env.GetClusterID()
  749. }
  750. thatVolumeName := thatRes.VolumeName
  751. if thatVolumeName == "" {
  752. log.Debugf("ClusterDisks: pv claim data missing volumename")
  753. continue
  754. }
  755. thatClaimName := thatRes.PersistentVolumeClaim
  756. if thatClaimName == "" {
  757. log.Debugf("ClusterDisks: pv claim data missing persistentvolumeclaim")
  758. continue
  759. }
  760. thatClaimNamespace := thatRes.Namespace
  761. if thatClaimNamespace == "" {
  762. log.Debugf("ClusterDisks: pv claim data missing namespace")
  763. continue
  764. }
  765. if cluster == thatCluster && claimName == thatClaimName && claimNamespace == thatClaimNamespace {
  766. volumeName = thatVolumeName
  767. }
  768. }
  769. usage := result.Data[0].Value
  770. key := DiskIdentifier{
  771. Cluster: cluster,
  772. Name: volumeName,
  773. }
  774. if _, ok := diskMap[key]; !ok {
  775. diskMap[key] = &Disk{
  776. Cluster: cluster,
  777. Name: volumeName,
  778. Breakdown: &ClusterCostsBreakdown{},
  779. }
  780. }
  781. diskMap[key].BytesUsedAvgPtr = &usage
  782. }
  783. for _, result := range resPVUsedMax {
  784. cluster := result.Cluster
  785. if cluster == "" {
  786. cluster = env.GetClusterID()
  787. }
  788. claimName := result.PersistentVolumeClaim
  789. if claimName == "" {
  790. log.Debugf("ClusterDisks: pv usage data missing persistentvolumeclaim")
  791. continue
  792. }
  793. claimNamespace := result.Namespace
  794. if claimNamespace == "" {
  795. log.Debugf("ClusterDisks: pv usage data missing namespace")
  796. continue
  797. }
  798. var volumeName string
  799. for _, thatRes := range resPVCInfo {
  800. thatCluster := thatRes.Cluster
  801. if thatCluster == "" {
  802. thatCluster = env.GetClusterID()
  803. }
  804. thatVolumeName := thatRes.VolumeName
  805. if thatVolumeName == "" {
  806. log.Debugf("ClusterDisks: pv claim data missing volumename")
  807. continue
  808. }
  809. thatClaimName := thatRes.PersistentVolumeClaim
  810. if thatClaimName == "" {
  811. log.Debugf("ClusterDisks: pv claim data missing persistentvolumeclaim")
  812. continue
  813. }
  814. thatClaimNamespace := thatRes.Namespace
  815. if thatClaimNamespace == "" {
  816. log.Debugf("ClusterDisks: pv claim data missing namespace")
  817. continue
  818. }
  819. if cluster == thatCluster && claimName == thatClaimName && claimNamespace == thatClaimNamespace {
  820. volumeName = thatVolumeName
  821. }
  822. }
  823. usage := result.Data[0].Value
  824. key := DiskIdentifier{cluster, volumeName}
  825. if _, ok := diskMap[key]; !ok {
  826. diskMap[key] = &Disk{
  827. Cluster: cluster,
  828. Name: volumeName,
  829. Breakdown: &ClusterCostsBreakdown{},
  830. }
  831. }
  832. diskMap[key].BytesUsedMaxPtr = &usage
  833. }
  834. }
  835. // filterOutLocalPVs removes local Persistent Volumes (PVs) from the given disk map.
  836. // Local PVs are identified by the prefix "local-pv-" in their names, which is the
  837. // convention used by sig-storage-local-static-provisioner.
  838. //
  839. // Parameters:
  840. // - diskMap: A map of DiskIdentifier to Disk pointers, representing all PVs.
  841. //
  842. // Returns:
  843. // - A new map of DiskIdentifier to Disk pointers, containing only non-local PVs.
  844. func filterOutLocalPVs(diskMap map[DiskIdentifier]*Disk) map[DiskIdentifier]*Disk {
  845. nonLocalPVDiskMap := map[DiskIdentifier]*Disk{}
  846. for key, val := range diskMap {
  847. if !strings.HasPrefix(key.Name, SIG_STORAGE_LOCAL_PROVISIONER_PREFIX) {
  848. nonLocalPVDiskMap[key] = val
  849. }
  850. }
  851. return nonLocalPVDiskMap
  852. }