cluster.go 44 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450
  1. package costmodel
  2. import (
  3. "fmt"
  4. "net"
  5. "strconv"
  6. "strings"
  7. "time"
  8. "github.com/opencost/opencost/pkg/cloud/provider"
  9. "golang.org/x/exp/slices"
  10. "github.com/opencost/opencost/core/pkg/log"
  11. "github.com/opencost/opencost/core/pkg/opencost"
  12. "github.com/opencost/opencost/core/pkg/source"
  13. "github.com/opencost/opencost/core/pkg/util/timeutil"
  14. "github.com/opencost/opencost/pkg/cloud/models"
  15. "github.com/opencost/opencost/pkg/env"
  16. )
  17. const (
  18. queryClusterCores = `sum(
  19. avg(avg_over_time(kube_node_status_capacity_cpu_cores{%s}[%s] %s)) by (node, %s) * avg(avg_over_time(node_cpu_hourly_cost{%s}[%s] %s)) by (node, %s) * 730 +
  20. avg(avg_over_time(node_gpu_hourly_cost{%s}[%s] %s)) by (node, %s) * 730
  21. ) by (%s)`
  22. queryClusterRAM = `sum(
  23. avg(avg_over_time(kube_node_status_capacity_memory_bytes{%s}[%s] %s)) by (node, %s) / 1024 / 1024 / 1024 * avg(avg_over_time(node_ram_hourly_cost{%s}[%s] %s)) by (node, %s) * 730
  24. ) by (%s)`
  25. queryStorage = `sum(
  26. avg(avg_over_time(pv_hourly_cost{%s}[%s] %s)) by (persistentvolume, %s) * 730
  27. * avg(avg_over_time(kube_persistentvolume_capacity_bytes{%s}[%s] %s)) by (persistentvolume, %s) / 1024 / 1024 / 1024
  28. ) by (%s) %s`
  29. queryTotal = `sum(avg(node_total_hourly_cost{%s}) by (node, %s)) * 730 +
  30. sum(
  31. avg(avg_over_time(pv_hourly_cost{%s}[1h])) by (persistentvolume, %s) * 730
  32. * avg(avg_over_time(kube_persistentvolume_capacity_bytes{%s}[1h])) by (persistentvolume, %s) / 1024 / 1024 / 1024
  33. ) by (%s) %s`
  34. queryNodes = `sum(avg(node_total_hourly_cost{%s}) by (node, %s)) * 730 %s`
  35. )
  36. const MAX_LOCAL_STORAGE_SIZE = 1024 * 1024 * 1024 * 1024
  37. // When ASSET_INCLUDE_LOCAL_DISK_COST is set to false, local storage
  38. // provisioned by sig-storage-local-static-provisioner is excluded
  39. // by checking if the volume is prefixed by "local-pv-".
  40. //
  41. // This is based on the sig-storage-local-static-provisioner implementation,
  42. // which creates all PVs with the "local-pv-" prefix. For reference, see:
  43. // https://github.com/kubernetes-sigs/sig-storage-local-static-provisioner/blob/b6f465027bd059e92c0032c81dd1e1d90e35c909/pkg/discovery/discovery.go#L410-L417
  44. const SIG_STORAGE_LOCAL_PROVISIONER_PREFIX = "local-pv-"
  45. // Costs represents cumulative and monthly cluster costs over a given duration. Costs
  46. // are broken down by cores, memory, and storage.
  47. type ClusterCosts struct {
  48. Start *time.Time `json:"startTime"`
  49. End *time.Time `json:"endTime"`
  50. CPUCumulative float64 `json:"cpuCumulativeCost"`
  51. CPUMonthly float64 `json:"cpuMonthlyCost"`
  52. CPUBreakdown *ClusterCostsBreakdown `json:"cpuBreakdown"`
  53. GPUCumulative float64 `json:"gpuCumulativeCost"`
  54. GPUMonthly float64 `json:"gpuMonthlyCost"`
  55. RAMCumulative float64 `json:"ramCumulativeCost"`
  56. RAMMonthly float64 `json:"ramMonthlyCost"`
  57. RAMBreakdown *ClusterCostsBreakdown `json:"ramBreakdown"`
  58. StorageCumulative float64 `json:"storageCumulativeCost"`
  59. StorageMonthly float64 `json:"storageMonthlyCost"`
  60. StorageBreakdown *ClusterCostsBreakdown `json:"storageBreakdown"`
  61. TotalCumulative float64 `json:"totalCumulativeCost"`
  62. TotalMonthly float64 `json:"totalMonthlyCost"`
  63. DataMinutes float64
  64. }
  65. // ClusterCostsBreakdown provides percentage-based breakdown of a resource by
  66. // categories: user for user-space (i.e. non-system) usage, system, and idle.
  67. type ClusterCostsBreakdown struct {
  68. Idle float64 `json:"idle"`
  69. Other float64 `json:"other"`
  70. System float64 `json:"system"`
  71. User float64 `json:"user"`
  72. }
  73. // NewClusterCostsFromCumulative takes cumulative cost data over a given time range, computes
  74. // the associated monthly rate data, and returns the Costs.
  75. func NewClusterCostsFromCumulative(cpu, gpu, ram, storage float64, window, offset time.Duration, dataHours float64) (*ClusterCosts, error) {
  76. start, end := timeutil.ParseTimeRange(window, offset)
  77. // If the number of hours is not given (i.e. is zero) compute one from the window and offset
  78. if dataHours == 0 {
  79. dataHours = end.Sub(start).Hours()
  80. }
  81. // Do not allow zero-length windows to prevent divide-by-zero issues
  82. if dataHours == 0 {
  83. return nil, fmt.Errorf("illegal time range: window %s, offset %s", window, offset)
  84. }
  85. cc := &ClusterCosts{
  86. Start: &start,
  87. End: &end,
  88. CPUCumulative: cpu,
  89. GPUCumulative: gpu,
  90. RAMCumulative: ram,
  91. StorageCumulative: storage,
  92. TotalCumulative: cpu + gpu + ram + storage,
  93. CPUMonthly: cpu / dataHours * (timeutil.HoursPerMonth),
  94. GPUMonthly: gpu / dataHours * (timeutil.HoursPerMonth),
  95. RAMMonthly: ram / dataHours * (timeutil.HoursPerMonth),
  96. StorageMonthly: storage / dataHours * (timeutil.HoursPerMonth),
  97. }
  98. cc.TotalMonthly = cc.CPUMonthly + cc.GPUMonthly + cc.RAMMonthly + cc.StorageMonthly
  99. return cc, nil
  100. }
  101. type Disk struct {
  102. Cluster string
  103. Name string
  104. ProviderID string
  105. StorageClass string
  106. VolumeName string
  107. ClaimName string
  108. ClaimNamespace string
  109. Cost float64
  110. Bytes float64
  111. // These two fields may not be available at all times because they rely on
  112. // a new set of metrics that may or may not be available. Thus, they must
  113. // be nilable to represent the complete absence of the data.
  114. //
  115. // In other words, nilability here lets us distinguish between
  116. // "metric is not available" and "metric is available but is 0".
  117. //
  118. // They end in "Ptr" to distinguish from an earlier version in order to
  119. // ensure that all usages are checked for nil.
  120. BytesUsedAvgPtr *float64
  121. BytesUsedMaxPtr *float64
  122. Local bool
  123. Start time.Time
  124. End time.Time
  125. Minutes float64
  126. Breakdown *ClusterCostsBreakdown
  127. }
  128. type DiskIdentifier struct {
  129. Cluster string
  130. Name string
  131. }
  132. func ClusterDisks(dataSource source.OpenCostDataSource, cp models.Provider, start, end time.Time) (map[DiskIdentifier]*Disk, error) {
  133. resolution := env.GetETLResolution()
  134. grp := source.NewQueryGroup()
  135. resChPVCost := grp.With(dataSource.QueryPVCost(start, end))
  136. resChPVSize := grp.With(dataSource.QueryPVSize(start, end))
  137. resChActiveMins := grp.With(dataSource.QueryPVActiveMinutes(start, end))
  138. resChPVStorageClass := grp.With(dataSource.QueryPVStorageClass(start, end))
  139. resChPVUsedAvg := grp.With(dataSource.QueryPVUsedAverage(start, end))
  140. resChPVUsedMax := grp.With(dataSource.QueryPVUsedMax(start, end))
  141. resChPVCInfo := grp.With(dataSource.QueryPVCInfo(start, end))
  142. resPVCost, _ := resChPVCost.Await()
  143. resPVSize, _ := resChPVSize.Await()
  144. resActiveMins, _ := resChActiveMins.Await()
  145. resPVStorageClass, _ := resChPVStorageClass.Await()
  146. resPVUsedAvg, _ := resChPVUsedAvg.Await()
  147. resPVUsedMax, _ := resChPVUsedMax.Await()
  148. resPVCInfo, _ := resChPVCInfo.Await()
  149. // Cloud providers do not always charge for a node's local disk costs (i.e.
  150. // ephemeral storage). Provide an option to opt out of calculating &
  151. // allocating local disk costs. Note, that this does not affect
  152. // PersistentVolume costs.
  153. //
  154. // Ref:
  155. // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/RootDeviceStorage.html
  156. // https://learn.microsoft.com/en-us/azure/virtual-machines/managed-disks-overview#temporary-disk
  157. // https://cloud.google.com/compute/docs/disks/local-ssd
  158. resLocalStorageCost := []*source.QueryResult{}
  159. resLocalStorageUsedCost := []*source.QueryResult{}
  160. resLocalStorageUsedAvg := []*source.QueryResult{}
  161. resLocalStorageUsedMax := []*source.QueryResult{}
  162. resLocalStorageBytes := []*source.QueryResult{}
  163. resLocalActiveMins := []*source.QueryResult{}
  164. if env.GetAssetIncludeLocalDiskCost() {
  165. resChLocalStorageCost := grp.With(dataSource.QueryLocalStorageCost(start, end))
  166. resChLocalStorageUsedCost := grp.With(dataSource.QueryLocalStorageUsedCost(start, end))
  167. resChLocalStoreageUsedAvg := grp.With(dataSource.QueryLocalStorageUsedAvg(start, end))
  168. resChLocalStoreageUsedMax := grp.With(dataSource.QueryLocalStorageUsedMax(start, end))
  169. resChLocalStorageBytes := grp.With(dataSource.QueryLocalStorageBytes(start, end))
  170. resChLocalActiveMins := grp.With(dataSource.QueryLocalStorageActiveMinutes(start, end))
  171. resLocalStorageCost, _ = resChLocalStorageCost.Await()
  172. resLocalStorageUsedCost, _ = resChLocalStorageUsedCost.Await()
  173. resLocalStorageUsedAvg, _ = resChLocalStoreageUsedAvg.Await()
  174. resLocalStorageUsedMax, _ = resChLocalStoreageUsedMax.Await()
  175. resLocalStorageBytes, _ = resChLocalStorageBytes.Await()
  176. resLocalActiveMins, _ = resChLocalActiveMins.Await()
  177. }
  178. if grp.HasErrors() {
  179. return nil, grp.Error()
  180. }
  181. diskMap := map[DiskIdentifier]*Disk{}
  182. for _, result := range resPVCInfo {
  183. cluster, err := result.GetCluster()
  184. if err != nil {
  185. cluster = env.GetClusterID()
  186. }
  187. volumeName, err := result.GetString("volumename")
  188. if err != nil {
  189. log.Debugf("ClusterDisks: pv claim data missing volumename")
  190. continue
  191. }
  192. claimName, err := result.GetString("persistentvolumeclaim")
  193. if err != nil {
  194. log.Debugf("ClusterDisks: pv claim data missing persistentvolumeclaim")
  195. continue
  196. }
  197. claimNamespace, err := result.GetNamespace()
  198. if err != nil {
  199. log.Debugf("ClusterDisks: pv claim data missing namespace")
  200. continue
  201. }
  202. key := DiskIdentifier{cluster, volumeName}
  203. if _, ok := diskMap[key]; !ok {
  204. diskMap[key] = &Disk{
  205. Cluster: cluster,
  206. Name: volumeName,
  207. Breakdown: &ClusterCostsBreakdown{},
  208. }
  209. }
  210. diskMap[key].VolumeName = volumeName
  211. diskMap[key].ClaimName = claimName
  212. diskMap[key].ClaimNamespace = claimNamespace
  213. }
  214. pvCosts(diskMap, resolution, resActiveMins, resPVSize, resPVCost, resPVUsedAvg, resPVUsedMax, resPVCInfo, cp, opencost.NewClosedWindow(start, end))
  215. type localStorage struct {
  216. device string
  217. disk *Disk
  218. }
  219. localStorageDisks := map[DiskIdentifier]localStorage{}
  220. // Start with local storage bytes so that the device with the largest size which has passed the
  221. // query filters can be determined
  222. for _, result := range resLocalStorageBytes {
  223. cluster, err := result.GetCluster()
  224. if err != nil {
  225. cluster = env.GetClusterID()
  226. }
  227. name, err := result.GetInstance()
  228. if err != nil {
  229. log.Warnf("ClusterDisks: local storage data missing instance")
  230. continue
  231. }
  232. device, err := result.GetDevice()
  233. if err != nil {
  234. log.Warnf("ClusterDisks: local storage data missing device")
  235. continue
  236. }
  237. bytes := result.Values[0].Value
  238. // Ignore disks that are larger than the max size
  239. if bytes > MAX_LOCAL_STORAGE_SIZE {
  240. continue
  241. }
  242. key := DiskIdentifier{cluster, name}
  243. // only keep the device with the most bytes per instance
  244. if current, ok := localStorageDisks[key]; !ok || current.disk.Bytes < bytes {
  245. localStorageDisks[key] = localStorage{
  246. device: device,
  247. disk: &Disk{
  248. Cluster: cluster,
  249. Name: name,
  250. Breakdown: &ClusterCostsBreakdown{},
  251. Local: true,
  252. StorageClass: opencost.LocalStorageClass,
  253. Bytes: bytes,
  254. },
  255. }
  256. }
  257. }
  258. for _, result := range resLocalStorageCost {
  259. cluster, err := result.GetCluster()
  260. if err != nil {
  261. cluster = env.GetClusterID()
  262. }
  263. name, err := result.GetInstance()
  264. if err != nil {
  265. log.Warnf("ClusterDisks: local storage data missing instance")
  266. continue
  267. }
  268. device, err := result.GetDevice()
  269. if err != nil {
  270. log.Warnf("ClusterDisks: local storage data missing device")
  271. continue
  272. }
  273. cost := result.Values[0].Value
  274. key := DiskIdentifier{cluster, name}
  275. ls, ok := localStorageDisks[key]
  276. if !ok || ls.device != device {
  277. continue
  278. }
  279. ls.disk.Cost = cost
  280. }
  281. for _, result := range resLocalStorageUsedCost {
  282. cluster, err := result.GetCluster()
  283. if err != nil {
  284. cluster = env.GetClusterID()
  285. }
  286. name, err := result.GetInstance()
  287. if err != nil {
  288. log.Warnf("ClusterDisks: local storage usage data missing instance")
  289. continue
  290. }
  291. device, err := result.GetDevice()
  292. if err != nil {
  293. log.Warnf("ClusterDisks: local storage data missing device")
  294. continue
  295. }
  296. cost := result.Values[0].Value
  297. key := DiskIdentifier{cluster, name}
  298. ls, ok := localStorageDisks[key]
  299. if !ok || ls.device != device {
  300. continue
  301. }
  302. ls.disk.Breakdown.System = cost / ls.disk.Cost
  303. }
  304. for _, result := range resLocalStorageUsedAvg {
  305. cluster, err := result.GetCluster()
  306. if err != nil {
  307. cluster = env.GetClusterID()
  308. }
  309. name, err := result.GetInstance()
  310. if err != nil {
  311. log.Warnf("ClusterDisks: local storage data missing instance")
  312. continue
  313. }
  314. device, err := result.GetDevice()
  315. if err != nil {
  316. log.Warnf("ClusterDisks: local storage data missing device")
  317. continue
  318. }
  319. bytesAvg := result.Values[0].Value
  320. key := DiskIdentifier{cluster, name}
  321. ls, ok := localStorageDisks[key]
  322. if !ok || ls.device != device {
  323. continue
  324. }
  325. ls.disk.BytesUsedAvgPtr = &bytesAvg
  326. }
  327. for _, result := range resLocalStorageUsedMax {
  328. cluster, err := result.GetCluster()
  329. if err != nil {
  330. cluster = env.GetClusterID()
  331. }
  332. name, err := result.GetInstance()
  333. if err != nil {
  334. log.Warnf("ClusterDisks: local storage data missing instance")
  335. continue
  336. }
  337. device, err := result.GetDevice()
  338. if err != nil {
  339. log.Warnf("ClusterDisks: local storage data missing device")
  340. continue
  341. }
  342. bytesMax := result.Values[0].Value
  343. key := DiskIdentifier{cluster, name}
  344. ls, ok := localStorageDisks[key]
  345. if !ok || ls.device != device {
  346. continue
  347. }
  348. ls.disk.BytesUsedMaxPtr = &bytesMax
  349. }
  350. for _, result := range resLocalActiveMins {
  351. cluster, err := result.GetCluster()
  352. if err != nil {
  353. cluster = env.GetClusterID()
  354. }
  355. name, err := result.GetNode()
  356. if err != nil {
  357. log.DedupedWarningf(5, "ClusterDisks: local active mins data missing instance")
  358. continue
  359. }
  360. providerID, err := result.GetProviderID()
  361. if err != nil {
  362. log.DedupedWarningf(5, "ClusterDisks: local active mins data missing instance")
  363. continue
  364. }
  365. key := DiskIdentifier{cluster, name}
  366. ls, ok := localStorageDisks[key]
  367. if !ok {
  368. continue
  369. }
  370. ls.disk.ProviderID = provider.ParseLocalDiskID(providerID)
  371. if len(result.Values) == 0 {
  372. continue
  373. }
  374. s := time.Unix(int64(result.Values[0].Timestamp), 0)
  375. e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
  376. mins := e.Sub(s).Minutes()
  377. // TODO niko/assets if mins >= threshold, interpolate for missing data?
  378. ls.disk.End = e
  379. ls.disk.Start = s
  380. ls.disk.Minutes = mins
  381. }
  382. // move local storage disks to main disk map
  383. for key, ls := range localStorageDisks {
  384. diskMap[key] = ls.disk
  385. }
  386. var unTracedDiskLogData []DiskIdentifier
  387. //Iterating through Persistent Volume given by custom metrics kubecost_pv_info and assign the storage class if known and __unknown__ if not populated.
  388. for _, result := range resPVStorageClass {
  389. cluster, err := result.GetCluster()
  390. if err != nil {
  391. cluster = env.GetClusterID()
  392. }
  393. name, _ := result.GetString("persistentvolume")
  394. key := DiskIdentifier{cluster, name}
  395. if _, ok := diskMap[key]; !ok {
  396. if !slices.Contains(unTracedDiskLogData, key) {
  397. unTracedDiskLogData = append(unTracedDiskLogData, key)
  398. }
  399. continue
  400. }
  401. if len(result.Values) == 0 {
  402. continue
  403. }
  404. storageClass, err := result.GetString("storageclass")
  405. if err != nil {
  406. diskMap[key].StorageClass = opencost.UnknownStorageClass
  407. } else {
  408. diskMap[key].StorageClass = storageClass
  409. }
  410. }
  411. // Logging the unidentified disk information outside the loop
  412. for _, unIdentifiedDisk := range unTracedDiskLogData {
  413. log.Warnf("ClusterDisks: Cluster %s has Storage Class information for unidentified disk %s or disk deleted from analysis", unIdentifiedDisk.Cluster, unIdentifiedDisk.Name)
  414. }
  415. for _, disk := range diskMap {
  416. // Apply all remaining RAM to Idle
  417. disk.Breakdown.Idle = 1.0 - (disk.Breakdown.System + disk.Breakdown.Other + disk.Breakdown.User)
  418. // Set provider Id to the name for reconciliation
  419. if disk.ProviderID == "" {
  420. disk.ProviderID = disk.Name
  421. }
  422. }
  423. if !env.GetAssetIncludeLocalDiskCost() {
  424. return filterOutLocalPVs(diskMap), nil
  425. }
  426. return diskMap, nil
  427. }
  428. type NodeOverhead struct {
  429. CpuOverheadFraction float64
  430. RamOverheadFraction float64
  431. }
  432. type Node struct {
  433. Cluster string
  434. Name string
  435. ProviderID string
  436. NodeType string
  437. CPUCost float64
  438. CPUCores float64
  439. GPUCost float64
  440. GPUCount float64
  441. RAMCost float64
  442. RAMBytes float64
  443. Discount float64
  444. Preemptible bool
  445. CPUBreakdown *ClusterCostsBreakdown
  446. RAMBreakdown *ClusterCostsBreakdown
  447. Start time.Time
  448. End time.Time
  449. Minutes float64
  450. Labels map[string]string
  451. CostPerCPUHr float64
  452. CostPerRAMGiBHr float64
  453. CostPerGPUHr float64
  454. Overhead *NodeOverhead
  455. }
  456. // GKE lies about the number of cores e2 nodes have. This table
  457. // contains a mapping from node type -> actual CPU cores
  458. // for those cases.
  459. var partialCPUMap = map[string]float64{
  460. "e2-micro": 0.25,
  461. "e2-small": 0.5,
  462. "e2-medium": 1.0,
  463. }
  464. type NodeIdentifier struct {
  465. Cluster string
  466. Name string
  467. ProviderID string
  468. }
  469. type nodeIdentifierNoProviderID struct {
  470. Cluster string
  471. Name string
  472. }
  473. func costTimesMinuteAndCount(activeDataMap map[NodeIdentifier]activeData, costMap map[NodeIdentifier]float64, resourceCountMap map[nodeIdentifierNoProviderID]float64) {
  474. for k, v := range activeDataMap {
  475. keyNon := nodeIdentifierNoProviderID{
  476. Cluster: k.Cluster,
  477. Name: k.Name,
  478. }
  479. if cost, ok := costMap[k]; ok {
  480. minutes := v.minutes
  481. count := 1.0
  482. if c, ok := resourceCountMap[keyNon]; ok {
  483. count = c
  484. }
  485. costMap[k] = cost * (minutes / 60) * count
  486. }
  487. }
  488. }
  489. func costTimesMinute(activeDataMap map[NodeIdentifier]activeData, costMap map[NodeIdentifier]float64) {
  490. for k, v := range activeDataMap {
  491. if cost, ok := costMap[k]; ok {
  492. minutes := v.minutes
  493. costMap[k] = cost * (minutes / 60)
  494. }
  495. }
  496. }
  497. func ClusterNodes(dataSource source.OpenCostDataSource, cp models.Provider, start, end time.Time) (map[NodeIdentifier]*Node, error) {
  498. resolution := env.GetETLResolution()
  499. requiredGrp := source.NewQueryGroup()
  500. optionalGrp := source.NewQueryGroup()
  501. // return errors if these fail
  502. resChNodeCPUHourlyCost := requiredGrp.With(dataSource.QueryNodeCPUHourlyCost(start, end))
  503. resChNodeCPUCoresCapacity := requiredGrp.With(dataSource.QueryNodeCPUCoresCapacity(start, end))
  504. resChNodeCPUCoresAllocatable := requiredGrp.With(dataSource.QueryNodeCPUCoresAllocatable(start, end))
  505. resChNodeRAMHourlyCost := requiredGrp.With(dataSource.QueryNodeRAMHourlyCost(start, end))
  506. resChNodeRAMBytesCapacity := requiredGrp.With(dataSource.QueryNodeRAMBytesCapacity(start, end))
  507. resChNodeRAMBytesAllocatable := requiredGrp.With(dataSource.QueryNodeRAMBytesAllocatable(start, end))
  508. resChNodeGPUCount := requiredGrp.With(dataSource.QueryNodeGPUCount(start, end))
  509. resChNodeGPUHourlyCost := requiredGrp.With(dataSource.QueryNodeGPUHourlyCost(start, end))
  510. resChActiveMins := requiredGrp.With(dataSource.QueryNodeActiveMinutes(start, end))
  511. resChIsSpot := requiredGrp.With(dataSource.QueryNodeIsSpot(start, end))
  512. // Do not return errors if these fail, but log warnings
  513. resChNodeCPUModeTotal := optionalGrp.With(dataSource.QueryNodeCPUModeTotal(start, end))
  514. resChNodeRAMSystemPct := optionalGrp.With(dataSource.QueryNodeRAMSystemPercent(start, end))
  515. resChNodeRAMUserPct := optionalGrp.With(dataSource.QueryNodeRAMUserPercent(start, end))
  516. resChLabels := optionalGrp.With(dataSource.QueryNodeLabels(start, end))
  517. resNodeCPUHourlyCost, _ := resChNodeCPUHourlyCost.Await()
  518. resNodeCPUCoresCapacity, _ := resChNodeCPUCoresCapacity.Await()
  519. resNodeCPUCoresAllocatable, _ := resChNodeCPUCoresAllocatable.Await()
  520. resNodeGPUCount, _ := resChNodeGPUCount.Await()
  521. resNodeGPUHourlyCost, _ := resChNodeGPUHourlyCost.Await()
  522. resNodeRAMHourlyCost, _ := resChNodeRAMHourlyCost.Await()
  523. resNodeRAMBytesCapacity, _ := resChNodeRAMBytesCapacity.Await()
  524. resNodeRAMBytesAllocatable, _ := resChNodeRAMBytesAllocatable.Await()
  525. resIsSpot, _ := resChIsSpot.Await()
  526. resNodeCPUModeTotal, _ := resChNodeCPUModeTotal.Await()
  527. resNodeRAMSystemPct, _ := resChNodeRAMSystemPct.Await()
  528. resNodeRAMUserPct, _ := resChNodeRAMUserPct.Await()
  529. resActiveMins, _ := resChActiveMins.Await()
  530. resLabels, _ := resChLabels.Await()
  531. if optionalGrp.HasErrors() {
  532. for _, err := range optionalGrp.Errors() {
  533. log.Warnf("ClusterNodes: %s", err)
  534. }
  535. }
  536. if requiredGrp.HasErrors() {
  537. for _, err := range requiredGrp.Errors() {
  538. log.Errorf("ClusterNodes: %s", err)
  539. }
  540. return nil, requiredGrp.Error()
  541. }
  542. activeDataMap := buildActiveDataMap(resActiveMins, resolution, opencost.NewClosedWindow(start, end))
  543. gpuCountMap := buildGPUCountMap(resNodeGPUCount)
  544. preemptibleMap := buildPreemptibleMap(resIsSpot)
  545. cpuCostMap, clusterAndNameToType1 := buildCPUCostMap(resNodeCPUHourlyCost, cp, preemptibleMap)
  546. ramCostMap, clusterAndNameToType2 := buildRAMCostMap(resNodeRAMHourlyCost, cp, preemptibleMap)
  547. gpuCostMap, clusterAndNameToType3 := buildGPUCostMap(resNodeGPUHourlyCost, gpuCountMap, cp, preemptibleMap)
  548. clusterAndNameToTypeIntermediate := mergeTypeMaps(clusterAndNameToType1, clusterAndNameToType2)
  549. clusterAndNameToType := mergeTypeMaps(clusterAndNameToTypeIntermediate, clusterAndNameToType3)
  550. cpuCoresCapacityMap := buildCPUCoresMap(resNodeCPUCoresCapacity)
  551. ramBytesCapacityMap := buildRAMBytesMap(resNodeRAMBytesCapacity)
  552. cpuCoresAllocatableMap := buildCPUCoresMap(resNodeCPUCoresAllocatable)
  553. ramBytesAllocatableMap := buildRAMBytesMap(resNodeRAMBytesAllocatable)
  554. overheadMap := buildOverheadMap(ramBytesCapacityMap, ramBytesAllocatableMap, cpuCoresCapacityMap, cpuCoresAllocatableMap)
  555. ramUserPctMap := buildRAMUserPctMap(resNodeRAMUserPct)
  556. ramSystemPctMap := buildRAMSystemPctMap(resNodeRAMSystemPct)
  557. cpuBreakdownMap := buildCPUBreakdownMap(resNodeCPUModeTotal)
  558. labelsMap := buildLabelsMap(resLabels)
  559. costTimesMinuteAndCount(activeDataMap, cpuCostMap, cpuCoresCapacityMap)
  560. costTimesMinuteAndCount(activeDataMap, ramCostMap, ramBytesCapacityMap)
  561. costTimesMinute(activeDataMap, gpuCostMap) // there's no need to do a weird "nodeIdentifierNoProviderID" type match since gpuCounts have a providerID
  562. nodeMap := buildNodeMap(
  563. cpuCostMap, ramCostMap, gpuCostMap, gpuCountMap,
  564. cpuCoresCapacityMap, ramBytesCapacityMap, ramUserPctMap,
  565. ramSystemPctMap,
  566. cpuBreakdownMap,
  567. activeDataMap,
  568. preemptibleMap,
  569. labelsMap,
  570. clusterAndNameToType,
  571. resolution,
  572. overheadMap,
  573. )
  574. c, err := cp.GetConfig()
  575. if err != nil {
  576. return nil, err
  577. }
  578. discount, err := ParsePercentString(c.Discount)
  579. if err != nil {
  580. return nil, err
  581. }
  582. negotiatedDiscount, err := ParsePercentString(c.NegotiatedDiscount)
  583. if err != nil {
  584. return nil, err
  585. }
  586. for _, node := range nodeMap {
  587. // TODO take GKE Reserved Instances into account
  588. node.Discount = cp.CombinedDiscountForNode(node.NodeType, node.Preemptible, discount, negotiatedDiscount)
  589. // Apply all remaining resources to Idle
  590. node.CPUBreakdown.Idle = 1.0 - (node.CPUBreakdown.System + node.CPUBreakdown.Other + node.CPUBreakdown.User)
  591. node.RAMBreakdown.Idle = 1.0 - (node.RAMBreakdown.System + node.RAMBreakdown.Other + node.RAMBreakdown.User)
  592. }
  593. return nodeMap, nil
  594. }
  595. type LoadBalancerIdentifier struct {
  596. Cluster string
  597. Namespace string
  598. Name string
  599. }
  600. type LoadBalancer struct {
  601. Cluster string
  602. Namespace string
  603. Name string
  604. ProviderID string
  605. Cost float64
  606. Start time.Time
  607. End time.Time
  608. Minutes float64
  609. Private bool
  610. Ip string
  611. }
  612. func ClusterLoadBalancers(dataSource source.OpenCostDataSource, start, end time.Time) (map[LoadBalancerIdentifier]*LoadBalancer, error) {
  613. resolution := env.GetETLResolution()
  614. grp := source.NewQueryGroup()
  615. resChLBCost := grp.With(dataSource.QueryLBCost(start, end))
  616. resChActiveMins := grp.With(dataSource.QueryLBActiveMinutes(start, end))
  617. resLBCost, _ := resChLBCost.Await()
  618. resActiveMins, _ := resChActiveMins.Await()
  619. if grp.HasErrors() {
  620. return nil, grp.Error()
  621. }
  622. loadBalancerMap := make(map[LoadBalancerIdentifier]*LoadBalancer, len(resActiveMins))
  623. for _, result := range resActiveMins {
  624. cluster, err := result.GetCluster()
  625. if err != nil {
  626. cluster = env.GetClusterID()
  627. }
  628. namespace, err := result.GetNamespace()
  629. if err != nil {
  630. log.Warnf("ClusterLoadBalancers: LB cost data missing namespace")
  631. continue
  632. }
  633. name, err := result.GetString("service_name")
  634. if err != nil {
  635. log.Warnf("ClusterLoadBalancers: LB cost data missing service_name")
  636. continue
  637. }
  638. providerID, err := result.GetString("ingress_ip")
  639. if err != nil {
  640. log.DedupedWarningf(5, "ClusterLoadBalancers: LB cost data missing ingress_ip")
  641. providerID = ""
  642. }
  643. key := LoadBalancerIdentifier{
  644. Cluster: cluster,
  645. Namespace: namespace,
  646. Name: name,
  647. }
  648. // Skip if there are no data
  649. if len(result.Values) == 0 {
  650. continue
  651. }
  652. // Add load balancer to the set of load balancers
  653. if _, ok := loadBalancerMap[key]; !ok {
  654. loadBalancerMap[key] = &LoadBalancer{
  655. Cluster: cluster,
  656. Namespace: namespace,
  657. Name: fmt.Sprintf("%s/%s", namespace, name), // TODO:ETL this is kept for backwards-compatibility, but not good
  658. ProviderID: provider.ParseLBID(providerID),
  659. }
  660. }
  661. // Append start, end, and minutes. This should come before all other data.
  662. s := time.Unix(int64(result.Values[0].Timestamp), 0)
  663. e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
  664. loadBalancerMap[key].Start = s
  665. loadBalancerMap[key].End = e
  666. loadBalancerMap[key].Minutes = e.Sub(s).Minutes()
  667. // Fill in Provider ID if it is available and missing in the loadBalancerMap
  668. // Prevents there from being a duplicate LoadBalancers on the same day
  669. if providerID != "" && loadBalancerMap[key].ProviderID == "" {
  670. loadBalancerMap[key].ProviderID = providerID
  671. }
  672. }
  673. for _, result := range resLBCost {
  674. cluster, err := result.GetCluster()
  675. if err != nil {
  676. cluster = env.GetClusterID()
  677. }
  678. namespace, err := result.GetNamespace()
  679. if err != nil {
  680. log.Warnf("ClusterLoadBalancers: LB cost data missing namespace")
  681. continue
  682. }
  683. name, err := result.GetString("service_name")
  684. if err != nil {
  685. log.Warnf("ClusterLoadBalancers: LB cost data missing service_name")
  686. continue
  687. }
  688. providerID, err := result.GetString("ingress_ip")
  689. if err != nil {
  690. log.DedupedWarningf(5, "ClusterLoadBalancers: LB cost data missing ingress_ip")
  691. // only update asset cost when an actual IP was returned
  692. continue
  693. }
  694. key := LoadBalancerIdentifier{
  695. Cluster: cluster,
  696. Namespace: namespace,
  697. Name: name,
  698. }
  699. // Apply cost as price-per-hour * hours
  700. if lb, ok := loadBalancerMap[key]; ok {
  701. lbPricePerHr := result.Values[0].Value
  702. // interpolate any missing data
  703. resultMins := lb.Minutes
  704. if resultMins > 0 {
  705. scaleFactor := (resultMins + resolution.Minutes()) / resultMins
  706. hrs := (lb.Minutes * scaleFactor) / 60.0
  707. lb.Cost += lbPricePerHr * hrs
  708. } else {
  709. log.DedupedWarningf(20, "ClusterLoadBalancers: found zero minutes for key: %v", key)
  710. }
  711. if lb.Ip != "" && lb.Ip != providerID {
  712. log.DedupedWarningf(5, "ClusterLoadBalancers: multiple IPs per load balancer not supported, using most recent IP")
  713. }
  714. lb.Ip = providerID
  715. lb.Private = privateIPCheck(providerID)
  716. } else {
  717. log.DedupedWarningf(20, "ClusterLoadBalancers: found minutes for key that does not exist: %v", key)
  718. }
  719. }
  720. return loadBalancerMap, nil
  721. }
  722. // Check if an ip is private.
  723. func privateIPCheck(ip string) bool {
  724. ipAddress := net.ParseIP(ip)
  725. return ipAddress.IsPrivate()
  726. }
  727. // ComputeClusterCosts gives the cumulative and monthly-rate cluster costs over a window of time for all clusters.
  728. func (a *Accesses) ComputeClusterCosts(dataSource source.OpenCostDataSource, provider models.Provider, window, offset time.Duration, withBreakdown bool) (map[string]*ClusterCosts, error) {
  729. if window < 10*time.Minute {
  730. return nil, fmt.Errorf("minimum window of 10m required; got %s", window)
  731. }
  732. // Compute number of minutes in the full interval, for use interpolating missed scrapes or scaling missing data
  733. start, end := timeutil.ParseTimeRange(window, offset)
  734. mins := end.Sub(start).Minutes()
  735. providerName := ""
  736. if clusterInfo, err := provider.ClusterInfo(); err != nil {
  737. providerName = clusterInfo["provider"]
  738. }
  739. grp := source.NewQueryGroup()
  740. resChs := []*source.QueryGroupAsyncResult{}
  741. queryDataCount := grp.With(dataSource.QueryDataCount(start, end))
  742. queryTotalGPU := grp.With(dataSource.QueryTotalGPU(start, end))
  743. queryTotalCPU := grp.With(dataSource.QueryTotalCPU(start, end))
  744. queryTotalRAM := grp.With(dataSource.QueryTotalRAM(start, end))
  745. queryTotalStorage := grp.With(dataSource.QueryTotalStorage(start, end))
  746. queryTotalLocalStorage := grp.With(dataSource.QueryLocalStorageBytesByProvider(providerName, start, end))
  747. resChs = append(resChs, queryDataCount, queryTotalGPU, queryTotalCPU, queryTotalRAM, queryTotalStorage, queryTotalLocalStorage)
  748. if withBreakdown {
  749. queryCPUModePct := grp.With(dataSource.QueryNodeCPUModePercent(start, end))
  750. queryRAMSystemPct := grp.With(dataSource.QueryNodeRAMSystemPercent(start, end))
  751. queryRAMUserPct := grp.With(dataSource.QueryNodeRAMUserPercent(start, end))
  752. queryUsedLocalStorage := grp.With(dataSource.QueryLocalStorageUsedByProvider(providerName, start, end))
  753. resChs = append(resChs, queryCPUModePct, queryRAMSystemPct, queryRAMUserPct, queryUsedLocalStorage)
  754. }
  755. resDataCount, _ := resChs[0].Await()
  756. resTotalGPU, _ := resChs[1].Await()
  757. resTotalCPU, _ := resChs[2].Await()
  758. resTotalRAM, _ := resChs[3].Await()
  759. resTotalStorage, _ := resChs[4].Await()
  760. if grp.HasErrors() {
  761. return nil, grp.Error()
  762. }
  763. defaultClusterID := env.GetClusterID()
  764. dataMinsByCluster := map[string]float64{}
  765. for _, result := range resDataCount {
  766. clusterID, _ := result.GetCluster()
  767. if clusterID == "" {
  768. clusterID = defaultClusterID
  769. }
  770. dataMins := mins
  771. if len(result.Values) > 0 {
  772. dataMins = result.Values[0].Value
  773. } else {
  774. log.Warnf("Cluster cost data count returned no results for cluster %s", clusterID)
  775. }
  776. dataMinsByCluster[clusterID] = dataMins
  777. }
  778. // Determine combined discount
  779. discount, customDiscount := 0.0, 0.0
  780. c, err := a.CloudProvider.GetConfig()
  781. if err == nil {
  782. discount, err = ParsePercentString(c.Discount)
  783. if err != nil {
  784. discount = 0.0
  785. }
  786. customDiscount, err = ParsePercentString(c.NegotiatedDiscount)
  787. if err != nil {
  788. customDiscount = 0.0
  789. }
  790. }
  791. // Intermediate structure storing mapping of [clusterID][type ∈ {cpu, ram, storage, total}]=cost
  792. costData := make(map[string]map[string]float64)
  793. // Helper function to iterate over Prom query results, parsing the raw values into
  794. // the intermediate costData structure.
  795. setCostsFromResults := func(costData map[string]map[string]float64, results []*source.QueryResult, name string, discount float64, customDiscount float64) {
  796. for _, result := range results {
  797. clusterID, _ := result.GetCluster()
  798. if clusterID == "" {
  799. clusterID = defaultClusterID
  800. }
  801. if _, ok := costData[clusterID]; !ok {
  802. costData[clusterID] = map[string]float64{}
  803. }
  804. if len(result.Values) > 0 {
  805. costData[clusterID][name] += result.Values[0].Value * (1.0 - discount) * (1.0 - customDiscount)
  806. costData[clusterID]["total"] += result.Values[0].Value * (1.0 - discount) * (1.0 - customDiscount)
  807. }
  808. }
  809. }
  810. // Apply both sustained use and custom discounts to RAM and CPU
  811. setCostsFromResults(costData, resTotalCPU, "cpu", discount, customDiscount)
  812. setCostsFromResults(costData, resTotalRAM, "ram", discount, customDiscount)
  813. // Apply only custom discount to GPU and storage
  814. setCostsFromResults(costData, resTotalGPU, "gpu", 0.0, customDiscount)
  815. setCostsFromResults(costData, resTotalStorage, "storage", 0.0, customDiscount)
  816. resTotalLocalStorage, err := resChs[5].Await()
  817. if err != nil {
  818. return nil, err
  819. }
  820. if len(resTotalLocalStorage) > 0 {
  821. setCostsFromResults(costData, resTotalLocalStorage, "localstorage", 0.0, customDiscount)
  822. }
  823. cpuBreakdownMap := map[string]*ClusterCostsBreakdown{}
  824. ramBreakdownMap := map[string]*ClusterCostsBreakdown{}
  825. pvUsedCostMap := map[string]float64{}
  826. if withBreakdown {
  827. resCPUModePct, _ := resChs[6].Await()
  828. resRAMSystemPct, _ := resChs[7].Await()
  829. resRAMUserPct, _ := resChs[8].Await()
  830. if grp.HasErrors() {
  831. return nil, grp.Error()
  832. }
  833. for _, result := range resCPUModePct {
  834. clusterID, _ := result.GetCluster()
  835. if clusterID == "" {
  836. clusterID = defaultClusterID
  837. }
  838. if _, ok := cpuBreakdownMap[clusterID]; !ok {
  839. cpuBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
  840. }
  841. cpuBD := cpuBreakdownMap[clusterID]
  842. mode, err := result.GetString("mode")
  843. if err != nil {
  844. log.Warnf("ComputeClusterCosts: unable to read CPU mode: %s", err)
  845. mode = "other"
  846. }
  847. switch mode {
  848. case "idle":
  849. cpuBD.Idle += result.Values[0].Value
  850. case "system":
  851. cpuBD.System += result.Values[0].Value
  852. case "user":
  853. cpuBD.User += result.Values[0].Value
  854. default:
  855. cpuBD.Other += result.Values[0].Value
  856. }
  857. }
  858. for _, result := range resRAMSystemPct {
  859. clusterID, _ := result.GetCluster()
  860. if clusterID == "" {
  861. clusterID = defaultClusterID
  862. }
  863. if _, ok := ramBreakdownMap[clusterID]; !ok {
  864. ramBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
  865. }
  866. ramBD := ramBreakdownMap[clusterID]
  867. ramBD.System += result.Values[0].Value
  868. }
  869. for _, result := range resRAMUserPct {
  870. clusterID, _ := result.GetCluster()
  871. if clusterID == "" {
  872. clusterID = defaultClusterID
  873. }
  874. if _, ok := ramBreakdownMap[clusterID]; !ok {
  875. ramBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
  876. }
  877. ramBD := ramBreakdownMap[clusterID]
  878. ramBD.User += result.Values[0].Value
  879. }
  880. for _, ramBD := range ramBreakdownMap {
  881. remaining := 1.0
  882. remaining -= ramBD.Other
  883. remaining -= ramBD.System
  884. remaining -= ramBD.User
  885. ramBD.Idle = remaining
  886. }
  887. resUsedLocalStorage, err := resChs[9].Await()
  888. if err != nil {
  889. return nil, err
  890. }
  891. for _, result := range resUsedLocalStorage {
  892. clusterID, _ := result.GetCluster()
  893. if clusterID == "" {
  894. clusterID = defaultClusterID
  895. }
  896. pvUsedCostMap[clusterID] += result.Values[0].Value
  897. }
  898. }
  899. if grp.HasErrors() {
  900. for _, err := range grp.Errors() {
  901. log.Errorf("ComputeClusterCosts: %s", err)
  902. }
  903. return nil, grp.Error()
  904. }
  905. // Convert intermediate structure to Costs instances
  906. costsByCluster := map[string]*ClusterCosts{}
  907. for id, cd := range costData {
  908. dataMins, ok := dataMinsByCluster[id]
  909. if !ok {
  910. dataMins = mins
  911. log.Warnf("Cluster cost data count not found for cluster %s", id)
  912. }
  913. costs, err := NewClusterCostsFromCumulative(cd["cpu"], cd["gpu"], cd["ram"], cd["storage"]+cd["localstorage"], window, offset, dataMins/timeutil.MinsPerHour)
  914. if err != nil {
  915. log.Warnf("Failed to parse cluster costs on %s (%s) from cumulative data: %+v", window, offset, cd)
  916. return nil, err
  917. }
  918. if cpuBD, ok := cpuBreakdownMap[id]; ok {
  919. costs.CPUBreakdown = cpuBD
  920. }
  921. if ramBD, ok := ramBreakdownMap[id]; ok {
  922. costs.RAMBreakdown = ramBD
  923. }
  924. costs.StorageBreakdown = &ClusterCostsBreakdown{}
  925. if pvUC, ok := pvUsedCostMap[id]; ok {
  926. costs.StorageBreakdown.Idle = (costs.StorageCumulative - pvUC) / costs.StorageCumulative
  927. costs.StorageBreakdown.User = pvUC / costs.StorageCumulative
  928. }
  929. costs.DataMinutes = dataMins
  930. costsByCluster[id] = costs
  931. }
  932. return costsByCluster, nil
  933. }
// Totals holds the cluster cost time series returned by ClusterCostsOverTime.
// Each field is a list of rows as produced by resultToTotals: a two-element
// slice holding a timestamp and a value, both formatted as strings.
type Totals struct {
	// TotalCost is the series for the total cluster cost.
	TotalCost [][]string `json:"totalcost"`
	// CPUCost is the series built from the cluster cores query.
	CPUCost [][]string `json:"cpucost"`
	// MemCost is the series built from the cluster RAM query.
	MemCost [][]string `json:"memcost"`
	// StorageCost is the series built from the cluster storage query.
	StorageCost [][]string `json:"storageCost"`
}
  940. func resultToTotals(qrs []*source.QueryResult) ([][]string, error) {
  941. if len(qrs) == 0 {
  942. return [][]string{}, fmt.Errorf("Not enough data available in the selected time range")
  943. }
  944. result := qrs[0]
  945. totals := [][]string{}
  946. for _, value := range result.Values {
  947. d0 := fmt.Sprintf("%f", value.Timestamp)
  948. d1 := fmt.Sprintf("%f", value.Value)
  949. toAppend := []string{
  950. d0,
  951. d1,
  952. }
  953. totals = append(totals, toAppend)
  954. }
  955. return totals, nil
  956. }
  957. // ClusterCostsOverTime gives the full cluster costs over time
  958. func ClusterCostsOverTime(dataSource source.OpenCostDataSource, provider models.Provider, start, end time.Time, window, offset time.Duration) (*Totals, error) {
  959. providerName := ""
  960. if clusterInfo, err := provider.ClusterInfo(); err != nil {
  961. providerName = clusterInfo["provider"]
  962. }
  963. grp := source.NewQueryGroup()
  964. qCores := grp.With(dataSource.QueryClusterCores(start, end, window))
  965. qRAM := grp.With(dataSource.QueryClusterRAM(start, end, window))
  966. qStorage := grp.With(dataSource.QueryClusterStorageByProvider(providerName, start, end, window))
  967. qTotal := grp.With(dataSource.QueryClusterTotalByProvider(providerName, start, end, window))
  968. resultClusterCores, _ := qCores.Await()
  969. resultClusterRAM, _ := qRAM.Await()
  970. resultStorage, _ := qStorage.Await()
  971. resultTotal, _ := qTotal.Await()
  972. if grp.HasErrors() {
  973. return nil, grp.Error()
  974. }
  975. coreTotal, err := resultToTotals(resultClusterCores)
  976. if err != nil {
  977. log.Infof("[Warning] ClusterCostsOverTime: no cpu data: %s", err)
  978. return nil, err
  979. }
  980. ramTotal, err := resultToTotals(resultClusterRAM)
  981. if err != nil {
  982. log.Infof("[Warning] ClusterCostsOverTime: no ram data: %s", err)
  983. return nil, err
  984. }
  985. storageTotal, err := resultToTotals(resultStorage)
  986. if err != nil {
  987. log.Infof("[Warning] ClusterCostsOverTime: no storage data: %s", err)
  988. }
  989. clusterTotal, err := resultToTotals(resultTotal)
  990. if err != nil {
  991. // If clusterTotal query failed, it's likely because there are no PVs, which
  992. // causes the qTotal query to return no data. Instead, query only node costs.
  993. // If that fails, return an error because something is actually wrong.
  994. qNodes := grp.With(dataSource.QueryClusterNodesByProvider(providerName, start, end, window))
  995. resultNodes, err := qNodes.Await()
  996. if err != nil {
  997. return nil, err
  998. }
  999. clusterTotal, err = resultToTotals(resultNodes)
  1000. if err != nil {
  1001. log.Infof("[Warning] ClusterCostsOverTime: no node data: %s", err)
  1002. return nil, err
  1003. }
  1004. }
  1005. return &Totals{
  1006. TotalCost: clusterTotal,
  1007. CPUCost: coreTotal,
  1008. MemCost: ramTotal,
  1009. StorageCost: storageTotal,
  1010. }, nil
  1011. }
  1012. func pvCosts(diskMap map[DiskIdentifier]*Disk, resolution time.Duration, resActiveMins, resPVSize, resPVCost, resPVUsedAvg, resPVUsedMax, resPVCInfo []*source.QueryResult, cp models.Provider, window opencost.Window) {
  1013. for _, result := range resActiveMins {
  1014. cluster, err := result.GetCluster()
  1015. if err != nil {
  1016. cluster = env.GetClusterID()
  1017. }
  1018. name, err := result.GetString("persistentvolume")
  1019. if err != nil {
  1020. log.Warnf("ClusterDisks: active mins missing pv name")
  1021. continue
  1022. }
  1023. if len(result.Values) == 0 {
  1024. continue
  1025. }
  1026. key := DiskIdentifier{cluster, name}
  1027. if _, ok := diskMap[key]; !ok {
  1028. diskMap[key] = &Disk{
  1029. Cluster: cluster,
  1030. Name: name,
  1031. Breakdown: &ClusterCostsBreakdown{},
  1032. }
  1033. }
  1034. s, e := calculateStartAndEnd(result, resolution, window)
  1035. mins := e.Sub(s).Minutes()
  1036. diskMap[key].End = e
  1037. diskMap[key].Start = s
  1038. diskMap[key].Minutes = mins
  1039. }
  1040. for _, result := range resPVSize {
  1041. cluster, err := result.GetCluster()
  1042. if err != nil {
  1043. cluster = env.GetClusterID()
  1044. }
  1045. name, err := result.GetString("persistentvolume")
  1046. if err != nil {
  1047. log.Warnf("ClusterDisks: PV size data missing persistentvolume")
  1048. continue
  1049. }
  1050. // TODO niko/assets storage class
  1051. bytes := result.Values[0].Value
  1052. key := DiskIdentifier{cluster, name}
  1053. if _, ok := diskMap[key]; !ok {
  1054. diskMap[key] = &Disk{
  1055. Cluster: cluster,
  1056. Name: name,
  1057. Breakdown: &ClusterCostsBreakdown{},
  1058. }
  1059. }
  1060. diskMap[key].Bytes = bytes
  1061. }
  1062. customPricingEnabled := provider.CustomPricesEnabled(cp)
  1063. customPricingConfig, err := cp.GetConfig()
  1064. if err != nil {
  1065. log.Warnf("ClusterDisks: failed to load custom pricing: %s", err)
  1066. }
  1067. for _, result := range resPVCost {
  1068. cluster, err := result.GetCluster()
  1069. if err != nil {
  1070. cluster = env.GetClusterID()
  1071. }
  1072. name, err := result.GetString("persistentvolume")
  1073. if err != nil {
  1074. log.Warnf("ClusterDisks: PV cost data missing persistentvolume")
  1075. continue
  1076. }
  1077. // TODO niko/assets storage class
  1078. var cost float64
  1079. if customPricingEnabled && customPricingConfig != nil {
  1080. customPVCostStr := customPricingConfig.Storage
  1081. customPVCost, err := strconv.ParseFloat(customPVCostStr, 64)
  1082. if err != nil {
  1083. log.Warnf("ClusterDisks: error parsing custom PV price: %s", customPVCostStr)
  1084. }
  1085. cost = customPVCost
  1086. } else {
  1087. cost = result.Values[0].Value
  1088. }
  1089. key := DiskIdentifier{cluster, name}
  1090. if _, ok := diskMap[key]; !ok {
  1091. diskMap[key] = &Disk{
  1092. Cluster: cluster,
  1093. Name: name,
  1094. Breakdown: &ClusterCostsBreakdown{},
  1095. }
  1096. }
  1097. diskMap[key].Cost = cost * (diskMap[key].Bytes / 1024 / 1024 / 1024) * (diskMap[key].Minutes / 60)
  1098. providerID, _ := result.GetProviderID() // just put the providerID set up here, it's the simplest query.
  1099. if providerID != "" {
  1100. diskMap[key].ProviderID = provider.ParsePVID(providerID)
  1101. }
  1102. }
  1103. for _, result := range resPVUsedAvg {
  1104. cluster, err := result.GetCluster()
  1105. if err != nil {
  1106. cluster = env.GetClusterID()
  1107. }
  1108. claimName, err := result.GetString("persistentvolumeclaim")
  1109. if err != nil {
  1110. log.Debugf("ClusterDisks: pv usage data missing persistentvolumeclaim")
  1111. continue
  1112. }
  1113. claimNamespace, err := result.GetNamespace()
  1114. if err != nil {
  1115. log.Debugf("ClusterDisks: pv usage data missing namespace")
  1116. continue
  1117. }
  1118. var volumeName string
  1119. for _, thatRes := range resPVCInfo {
  1120. thatCluster, err := thatRes.GetCluster()
  1121. if err != nil {
  1122. thatCluster = env.GetClusterID()
  1123. }
  1124. thatVolumeName, err := thatRes.GetString("volumename")
  1125. if err != nil {
  1126. log.Debugf("ClusterDisks: pv claim data missing volumename")
  1127. continue
  1128. }
  1129. thatClaimName, err := thatRes.GetString("persistentvolumeclaim")
  1130. if err != nil {
  1131. log.Debugf("ClusterDisks: pv claim data missing persistentvolumeclaim")
  1132. continue
  1133. }
  1134. thatClaimNamespace, err := thatRes.GetNamespace()
  1135. if err != nil {
  1136. log.Debugf("ClusterDisks: pv claim data missing namespace")
  1137. continue
  1138. }
  1139. if cluster == thatCluster && claimName == thatClaimName && claimNamespace == thatClaimNamespace {
  1140. volumeName = thatVolumeName
  1141. }
  1142. }
  1143. usage := result.Values[0].Value
  1144. key := DiskIdentifier{cluster, volumeName}
  1145. if _, ok := diskMap[key]; !ok {
  1146. diskMap[key] = &Disk{
  1147. Cluster: cluster,
  1148. Name: volumeName,
  1149. Breakdown: &ClusterCostsBreakdown{},
  1150. }
  1151. }
  1152. diskMap[key].BytesUsedAvgPtr = &usage
  1153. }
  1154. for _, result := range resPVUsedMax {
  1155. cluster, err := result.GetCluster()
  1156. if err != nil {
  1157. cluster = env.GetClusterID()
  1158. }
  1159. claimName, err := result.GetString("persistentvolumeclaim")
  1160. if err != nil {
  1161. log.Debugf("ClusterDisks: pv usage data missing persistentvolumeclaim")
  1162. continue
  1163. }
  1164. claimNamespace, err := result.GetNamespace()
  1165. if err != nil {
  1166. log.Debugf("ClusterDisks: pv usage data missing namespace")
  1167. continue
  1168. }
  1169. var volumeName string
  1170. for _, thatRes := range resPVCInfo {
  1171. thatCluster, err := thatRes.GetCluster()
  1172. if err != nil {
  1173. thatCluster = env.GetClusterID()
  1174. }
  1175. thatVolumeName, err := thatRes.GetString("volumename")
  1176. if err != nil {
  1177. log.Debugf("ClusterDisks: pv claim data missing volumename")
  1178. continue
  1179. }
  1180. thatClaimName, err := thatRes.GetString("persistentvolumeclaim")
  1181. if err != nil {
  1182. log.Debugf("ClusterDisks: pv claim data missing persistentvolumeclaim")
  1183. continue
  1184. }
  1185. thatClaimNamespace, err := thatRes.GetNamespace()
  1186. if err != nil {
  1187. log.Debugf("ClusterDisks: pv claim data missing namespace")
  1188. continue
  1189. }
  1190. if cluster == thatCluster && claimName == thatClaimName && claimNamespace == thatClaimNamespace {
  1191. volumeName = thatVolumeName
  1192. }
  1193. }
  1194. usage := result.Values[0].Value
  1195. key := DiskIdentifier{cluster, volumeName}
  1196. if _, ok := diskMap[key]; !ok {
  1197. diskMap[key] = &Disk{
  1198. Cluster: cluster,
  1199. Name: volumeName,
  1200. Breakdown: &ClusterCostsBreakdown{},
  1201. }
  1202. }
  1203. diskMap[key].BytesUsedMaxPtr = &usage
  1204. }
  1205. }
  1206. // filterOutLocalPVs removes local Persistent Volumes (PVs) from the given disk map.
  1207. // Local PVs are identified by the prefix "local-pv-" in their names, which is the
  1208. // convention used by sig-storage-local-static-provisioner.
  1209. //
  1210. // Parameters:
  1211. // - diskMap: A map of DiskIdentifier to Disk pointers, representing all PVs.
  1212. //
  1213. // Returns:
  1214. // - A new map of DiskIdentifier to Disk pointers, containing only non-local PVs.
  1215. func filterOutLocalPVs(diskMap map[DiskIdentifier]*Disk) map[DiskIdentifier]*Disk {
  1216. nonLocalPVDiskMap := map[DiskIdentifier]*Disk{}
  1217. for key, val := range diskMap {
  1218. if !strings.HasPrefix(key.Name, SIG_STORAGE_LOCAL_PROVISIONER_PREFIX) {
  1219. nonLocalPVDiskMap[key] = val
  1220. }
  1221. }
  1222. return nonLocalPVDiskMap
  1223. }