// cluster.go
  1. package costmodel
  2. import (
  3. "fmt"
  4. "net"
  5. "strconv"
  6. "strings"
  7. "time"
  8. "github.com/opencost/opencost/pkg/cloud/provider"
  9. prometheus "github.com/prometheus/client_golang/api"
  10. "golang.org/x/exp/slices"
  11. "github.com/opencost/opencost/core/pkg/log"
  12. "github.com/opencost/opencost/core/pkg/opencost"
  13. "github.com/opencost/opencost/core/pkg/util/timeutil"
  14. "github.com/opencost/opencost/pkg/cloud/models"
  15. "github.com/opencost/opencost/pkg/env"
  16. "github.com/opencost/opencost/pkg/prom"
  17. )
// Prometheus query templates for cluster-level cost aggregation. Each query
// multiplies an hourly rate by 730 (hours per month) to produce a monthly
// cost, and is parameterized (via fmt.Sprintf) with a cluster filter, a
// duration, an optional offset, and the cluster grouping label.
const (
	// queryClusterCores sums the monthly cost of node CPU capacity
	// (capacity cores x hourly CPU cost x 730) plus monthly GPU cost
	// (hourly GPU cost x 730), grouped by the cluster label.
	queryClusterCores = `sum(
avg(avg_over_time(kube_node_status_capacity_cpu_cores{%s}[%s] %s)) by (node, %s) * avg(avg_over_time(node_cpu_hourly_cost{%s}[%s] %s)) by (node, %s) * 730 +
avg(avg_over_time(node_gpu_hourly_cost{%s}[%s] %s)) by (node, %s) * 730
) by (%s)`

	// queryClusterRAM sums the monthly cost of node memory capacity
	// (capacity bytes converted to GiB x hourly RAM cost x 730).
	queryClusterRAM = `sum(
avg(avg_over_time(kube_node_status_capacity_memory_bytes{%s}[%s] %s)) by (node, %s) / 1024 / 1024 / 1024 * avg(avg_over_time(node_ram_hourly_cost{%s}[%s] %s)) by (node, %s) * 730
) by (%s)`

	// queryStorage sums the monthly cost of PersistentVolume capacity
	// (hourly PV cost x 730 x capacity GiB); trailing %s allows an
	// appended suffix clause.
	queryStorage = `sum(
avg(avg_over_time(pv_hourly_cost{%s}[%s] %s)) by (persistentvolume, %s) * 730
* avg(avg_over_time(kube_persistentvolume_capacity_bytes{%s}[%s] %s)) by (persistentvolume, %s) / 1024 / 1024 / 1024
) by (%s) %s`

	// queryTotal is the total monthly cluster cost: per-node total hourly
	// cost x 730, plus monthly PV storage cost.
	queryTotal = `sum(avg(node_total_hourly_cost{%s}) by (node, %s)) * 730 +
sum(
avg(avg_over_time(pv_hourly_cost{%s}[1h])) by (persistentvolume, %s) * 730
* avg(avg_over_time(kube_persistentvolume_capacity_bytes{%s}[1h])) by (persistentvolume, %s) / 1024 / 1024 / 1024
) by (%s) %s`

	// queryNodes is the monthly node cost: per-node total hourly cost x 730.
	queryNodes = `sum(avg(node_total_hourly_cost{%s}) by (node, %s)) * 730 %s`
)
// MAX_LOCAL_STORAGE_SIZE is the largest local disk size, in bytes (1 TiB),
// considered when building local-storage disk assets; devices reporting a
// larger capacity are ignored by ClusterDisks.
const MAX_LOCAL_STORAGE_SIZE = 1024 * 1024 * 1024 * 1024

// When ASSET_INCLUDE_LOCAL_DISK_COST is set to false, local storage
// provisioned by sig-storage-local-static-provisioner is excluded
// by checking if the volume is prefixed by "local-pv-".
//
// This is based on the sig-storage-local-static-provisioner implementation,
// which creates all PVs with the "local-pv-" prefix. For reference, see:
// https://github.com/kubernetes-sigs/sig-storage-local-static-provisioner/blob/b6f465027bd059e92c0032c81dd1e1d90e35c909/pkg/discovery/discovery.go#L410-L417
const SIG_STORAGE_LOCAL_PROVISIONER_PREFIX = "local-pv-"
// ClusterCosts represents cumulative and monthly cluster costs over a given
// duration. Costs are broken down by cores, memory, and storage.
type ClusterCosts struct {
	Start             *time.Time             `json:"startTime"`
	End               *time.Time             `json:"endTime"`
	CPUCumulative     float64                `json:"cpuCumulativeCost"`  // total CPU cost over [Start, End]
	CPUMonthly        float64                `json:"cpuMonthlyCost"`     // CPU cost extrapolated to a monthly rate
	CPUBreakdown      *ClusterCostsBreakdown `json:"cpuBreakdown"`
	GPUCumulative     float64                `json:"gpuCumulativeCost"`
	GPUMonthly        float64                `json:"gpuMonthlyCost"`
	RAMCumulative     float64                `json:"ramCumulativeCost"`
	RAMMonthly        float64                `json:"ramMonthlyCost"`
	RAMBreakdown      *ClusterCostsBreakdown `json:"ramBreakdown"`
	StorageCumulative float64                `json:"storageCumulativeCost"`
	StorageMonthly    float64                `json:"storageMonthlyCost"`
	StorageBreakdown  *ClusterCostsBreakdown `json:"storageBreakdown"`
	TotalCumulative   float64                `json:"totalCumulativeCost"` // sum of CPU, GPU, RAM, and storage cumulative costs
	TotalMonthly      float64                `json:"totalMonthlyCost"`    // sum of CPU, GPU, RAM, and storage monthly costs
	DataMinutes       float64                // minutes of data underlying the costs; not serialized
}
// ClusterCostsBreakdown provides percentage-based breakdown of a resource by
// categories: user for user-space (i.e. non-system) usage, system, and idle.
// Fractions are expected to sum to 1.0.
type ClusterCostsBreakdown struct {
	Idle   float64 `json:"idle"`   // fraction not attributed to any usage
	Other  float64 `json:"other"`  // fraction attributed to uncategorized usage
	System float64 `json:"system"` // fraction attributed to system (e.g. kube-system) usage
	User   float64 `json:"user"`   // fraction attributed to user workloads
}
  74. // NewClusterCostsFromCumulative takes cumulative cost data over a given time range, computes
  75. // the associated monthly rate data, and returns the Costs.
  76. func NewClusterCostsFromCumulative(cpu, gpu, ram, storage float64, window, offset time.Duration, dataHours float64) (*ClusterCosts, error) {
  77. start, end := timeutil.ParseTimeRange(window, offset)
  78. // If the number of hours is not given (i.e. is zero) compute one from the window and offset
  79. if dataHours == 0 {
  80. dataHours = end.Sub(start).Hours()
  81. }
  82. // Do not allow zero-length windows to prevent divide-by-zero issues
  83. if dataHours == 0 {
  84. return nil, fmt.Errorf("illegal time range: window %s, offset %s", window, offset)
  85. }
  86. cc := &ClusterCosts{
  87. Start: &start,
  88. End: &end,
  89. CPUCumulative: cpu,
  90. GPUCumulative: gpu,
  91. RAMCumulative: ram,
  92. StorageCumulative: storage,
  93. TotalCumulative: cpu + gpu + ram + storage,
  94. CPUMonthly: cpu / dataHours * (timeutil.HoursPerMonth),
  95. GPUMonthly: gpu / dataHours * (timeutil.HoursPerMonth),
  96. RAMMonthly: ram / dataHours * (timeutil.HoursPerMonth),
  97. StorageMonthly: storage / dataHours * (timeutil.HoursPerMonth),
  98. }
  99. cc.TotalMonthly = cc.CPUMonthly + cc.GPUMonthly + cc.RAMMonthly + cc.StorageMonthly
  100. return cc, nil
  101. }
// Disk represents one disk asset — a PersistentVolume, or a node's local
// disk — with its observed cost, capacity, usage, and active window.
type Disk struct {
	Cluster        string  // cluster ID the disk belongs to
	Name           string  // PV name, or node/instance name for local disks
	ProviderID     string  // cloud provider asset ID; defaults to Name when unknown
	StorageClass   string  // storage class name, or a sentinel for local/unknown
	VolumeName     string  // PV name as reported by the PVC info metric
	ClaimName      string  // bound PVC name, if any
	ClaimNamespace string  // namespace of the bound PVC, if any
	Cost           float64 // cumulative cost over the window
	Bytes          float64 // disk capacity in bytes

	// These two fields may not be available at all times because they rely on
	// a new set of metrics that may or may not be available. Thus, they must
	// be nilable to represent the complete absence of the data.
	//
	// In other words, nilability here lets us distinguish between
	// "metric is not available" and "metric is available but is 0".
	//
	// They end in "Ptr" to distinguish from an earlier version in order to
	// ensure that all usages are checked for nil.
	BytesUsedAvgPtr *float64
	BytesUsedMaxPtr *float64

	Local     bool                   // true for node-local (ephemeral) disks
	Start     time.Time              // first observed sample time
	End       time.Time              // last observed sample time
	Minutes   float64                // active minutes between Start and End
	Breakdown *ClusterCostsBreakdown // usage breakdown (system/user/idle/other)
}
// DiskIdentifier uniquely identifies a Disk by its cluster ID and name.
type DiskIdentifier struct {
	Cluster string
	Name    string
}
  133. func ClusterDisks(client prometheus.Client, cp models.Provider, start, end time.Time) (map[DiskIdentifier]*Disk, error) {
  134. // Start from the time "end", querying backwards
  135. t := end
  136. // minsPerResolution determines accuracy and resource use for the following
  137. // queries. Smaller values (higher resolution) result in better accuracy,
  138. // but more expensive queries, and vice-a-versa.
  139. resolution := env.GetETLResolution()
  140. //Ensuring if ETL_RESOLUTION_SECONDS is less than 60s default it to 1m
  141. var minsPerResolution int
  142. if minsPerResolution = int(resolution.Minutes()); int(resolution.Minutes()) == 0 {
  143. minsPerResolution = 1
  144. log.DedupedWarningf(3, "ClusterDisks(): Configured ETL resolution (%d seconds) is below the 60 seconds threshold. Overriding with 1 minute.", int(resolution.Seconds()))
  145. }
  146. durStr := timeutil.DurationString(end.Sub(start))
  147. if durStr == "" {
  148. return nil, fmt.Errorf("illegal duration value for %s", opencost.NewClosedWindow(start, end))
  149. }
  150. ctx := prom.NewNamedContext(client, prom.ClusterContextName)
  151. queryPVCost := fmt.Sprintf(`avg(avg_over_time(pv_hourly_cost{%s}[%s])) by (%s, persistentvolume,provider_id)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  152. queryPVSize := fmt.Sprintf(`avg(avg_over_time(kube_persistentvolume_capacity_bytes{%s}[%s])) by (%s, persistentvolume)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  153. queryActiveMins := fmt.Sprintf(`avg(kube_persistentvolume_capacity_bytes{%s}) by (%s, persistentvolume)[%s:%dm]`, env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, minsPerResolution)
  154. queryPVStorageClass := fmt.Sprintf(`avg(avg_over_time(kubecost_pv_info{%s}[%s])) by (%s, persistentvolume, storageclass)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  155. queryPVUsedAvg := fmt.Sprintf(`avg(avg_over_time(kubelet_volume_stats_used_bytes{%s}[%s])) by (%s, persistentvolumeclaim, namespace)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  156. queryPVUsedMax := fmt.Sprintf(`max(max_over_time(kubelet_volume_stats_used_bytes{%s}[%s])) by (%s, persistentvolumeclaim, namespace)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  157. queryPVCInfo := fmt.Sprintf(`avg(avg_over_time(kube_persistentvolumeclaim_info{%s}[%s])) by (%s, volumename, persistentvolumeclaim, namespace)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  158. resChPVCost := ctx.QueryAtTime(queryPVCost, t)
  159. resChPVSize := ctx.QueryAtTime(queryPVSize, t)
  160. resChActiveMins := ctx.QueryAtTime(queryActiveMins, t)
  161. resChPVStorageClass := ctx.QueryAtTime(queryPVStorageClass, t)
  162. resChPVUsedAvg := ctx.QueryAtTime(queryPVUsedAvg, t)
  163. resChPVUsedMax := ctx.QueryAtTime(queryPVUsedMax, t)
  164. resChPVCInfo := ctx.QueryAtTime(queryPVCInfo, t)
  165. resPVCost, _ := resChPVCost.Await()
  166. resPVSize, _ := resChPVSize.Await()
  167. resActiveMins, _ := resChActiveMins.Await()
  168. resPVStorageClass, _ := resChPVStorageClass.Await()
  169. resPVUsedAvg, _ := resChPVUsedAvg.Await()
  170. resPVUsedMax, _ := resChPVUsedMax.Await()
  171. resPVCInfo, _ := resChPVCInfo.Await()
  172. // Cloud providers do not always charge for a node's local disk costs (i.e.
  173. // ephemeral storage). Provide an option to opt out of calculating &
  174. // allocating local disk costs. Note, that this does not affect
  175. // PersistentVolume costs.
  176. //
  177. // Ref:
  178. // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/RootDeviceStorage.html
  179. // https://learn.microsoft.com/en-us/azure/virtual-machines/managed-disks-overview#temporary-disk
  180. // https://cloud.google.com/compute/docs/disks/local-ssd
  181. resLocalStorageCost := []*prom.QueryResult{}
  182. resLocalStorageUsedCost := []*prom.QueryResult{}
  183. resLocalStorageUsedAvg := []*prom.QueryResult{}
  184. resLocalStorageUsedMax := []*prom.QueryResult{}
  185. resLocalStorageBytes := []*prom.QueryResult{}
  186. resLocalActiveMins := []*prom.QueryResult{}
  187. if env.GetAssetIncludeLocalDiskCost() {
  188. // hourlyToCumulative is a scaling factor that, when multiplied by an
  189. // hourly value, converts it to a cumulative value; i.e. [$/hr] *
  190. // [min/res]*[hr/min] = [$/res]
  191. hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
  192. costPerGBHr := 0.04 / 730.0
  193. // container_fs metrics contains metrics for disks that are not local storage of the node. While not perfect to
  194. // attempt to identify the correct device which is being used as local storage we first filter for devices mounted
  195. // at paths `/dev/nvme.*` or `/dev/sda.*`. There still may be multiple devices mounted at paths matching the regex
  196. // so later on we will select the device with the highest `container_fs_limit_bytes` per instance to create a local disk asset
  197. queryLocalStorageCost := fmt.Sprintf(`sum_over_time(sum(container_fs_limit_bytes{device=~"/dev/(nvme|sda).*", id="/", %s}) by (instance, device, %s)[%s:%dm]) / 1024 / 1024 / 1024 * %f * %f`, env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, minsPerResolution, hourlyToCumulative, costPerGBHr)
  198. queryLocalStorageUsedCost := fmt.Sprintf(`sum_over_time(sum(container_fs_usage_bytes{device=~"/dev/(nvme|sda).*", id="/", %s}) by (instance, device, %s)[%s:%dm]) / 1024 / 1024 / 1024 * %f * %f`, env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, minsPerResolution, hourlyToCumulative, costPerGBHr)
  199. queryLocalStorageUsedAvg := fmt.Sprintf(`avg(sum(avg_over_time(container_fs_usage_bytes{device=~"/dev/(nvme|sda).*", id="/", %s}[%s])) by (instance, device, %s, job)) by (instance, device, %s)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  200. queryLocalStorageUsedMax := fmt.Sprintf(`max(sum(max_over_time(container_fs_usage_bytes{device=~"/dev/(nvme|sda).*", id="/", %s}[%s])) by (instance, device, %s, job)) by (instance, device, %s)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  201. queryLocalStorageBytes := fmt.Sprintf(`avg_over_time(sum(container_fs_limit_bytes{device=~"/dev/(nvme|sda).*", id="/", %s}) by (instance, device, %s)[%s:%dm])`, env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, minsPerResolution)
  202. queryLocalActiveMins := fmt.Sprintf(`count(node_total_hourly_cost{%s}) by (%s, node)[%s:%dm]`, env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, minsPerResolution)
  203. resChLocalStorageCost := ctx.QueryAtTime(queryLocalStorageCost, t)
  204. resChLocalStorageUsedCost := ctx.QueryAtTime(queryLocalStorageUsedCost, t)
  205. resChLocalStoreageUsedAvg := ctx.QueryAtTime(queryLocalStorageUsedAvg, t)
  206. resChLocalStoreageUsedMax := ctx.QueryAtTime(queryLocalStorageUsedMax, t)
  207. resChLocalStorageBytes := ctx.QueryAtTime(queryLocalStorageBytes, t)
  208. resChLocalActiveMins := ctx.QueryAtTime(queryLocalActiveMins, t)
  209. resLocalStorageCost, _ = resChLocalStorageCost.Await()
  210. resLocalStorageUsedCost, _ = resChLocalStorageUsedCost.Await()
  211. resLocalStorageUsedAvg, _ = resChLocalStoreageUsedAvg.Await()
  212. resLocalStorageUsedMax, _ = resChLocalStoreageUsedMax.Await()
  213. resLocalStorageBytes, _ = resChLocalStorageBytes.Await()
  214. resLocalActiveMins, _ = resChLocalActiveMins.Await()
  215. }
  216. if ctx.HasErrors() {
  217. return nil, ctx.ErrorCollection()
  218. }
  219. diskMap := map[DiskIdentifier]*Disk{}
  220. for _, result := range resPVCInfo {
  221. cluster, err := result.GetString(env.GetPromClusterLabel())
  222. if err != nil {
  223. cluster = env.GetClusterID()
  224. }
  225. volumeName, err := result.GetString("volumename")
  226. if err != nil {
  227. log.Debugf("ClusterDisks: pv claim data missing volumename")
  228. continue
  229. }
  230. claimName, err := result.GetString("persistentvolumeclaim")
  231. if err != nil {
  232. log.Debugf("ClusterDisks: pv claim data missing persistentvolumeclaim")
  233. continue
  234. }
  235. claimNamespace, err := result.GetString("namespace")
  236. if err != nil {
  237. log.Debugf("ClusterDisks: pv claim data missing namespace")
  238. continue
  239. }
  240. key := DiskIdentifier{cluster, volumeName}
  241. if _, ok := diskMap[key]; !ok {
  242. diskMap[key] = &Disk{
  243. Cluster: cluster,
  244. Name: volumeName,
  245. Breakdown: &ClusterCostsBreakdown{},
  246. }
  247. }
  248. diskMap[key].VolumeName = volumeName
  249. diskMap[key].ClaimName = claimName
  250. diskMap[key].ClaimNamespace = claimNamespace
  251. }
  252. pvCosts(diskMap, resolution, resActiveMins, resPVSize, resPVCost, resPVUsedAvg, resPVUsedMax, resPVCInfo, cp, opencost.NewClosedWindow(start, end))
  253. type localStorage struct {
  254. device string
  255. disk *Disk
  256. }
  257. localStorageDisks := map[DiskIdentifier]localStorage{}
  258. // Start with local storage bytes so that the device with the largest size which has passed the
  259. // query filters can be determined
  260. for _, result := range resLocalStorageBytes {
  261. cluster, err := result.GetString(env.GetPromClusterLabel())
  262. if err != nil {
  263. cluster = env.GetClusterID()
  264. }
  265. name, err := result.GetString("instance")
  266. if err != nil {
  267. log.Warnf("ClusterDisks: local storage data missing instance")
  268. continue
  269. }
  270. device, err := result.GetString("device")
  271. if err != nil {
  272. log.Warnf("ClusterDisks: local storage data missing device")
  273. continue
  274. }
  275. bytes := result.Values[0].Value
  276. // Ignore disks that are larger than the max size
  277. if bytes > MAX_LOCAL_STORAGE_SIZE {
  278. continue
  279. }
  280. key := DiskIdentifier{cluster, name}
  281. // only keep the device with the most bytes per instance
  282. if current, ok := localStorageDisks[key]; !ok || current.disk.Bytes < bytes {
  283. localStorageDisks[key] = localStorage{
  284. device: device,
  285. disk: &Disk{
  286. Cluster: cluster,
  287. Name: name,
  288. Breakdown: &ClusterCostsBreakdown{},
  289. Local: true,
  290. StorageClass: opencost.LocalStorageClass,
  291. Bytes: bytes,
  292. },
  293. }
  294. }
  295. }
  296. for _, result := range resLocalStorageCost {
  297. cluster, err := result.GetString(env.GetPromClusterLabel())
  298. if err != nil {
  299. cluster = env.GetClusterID()
  300. }
  301. name, err := result.GetString("instance")
  302. if err != nil {
  303. log.Warnf("ClusterDisks: local storage data missing instance")
  304. continue
  305. }
  306. device, err := result.GetString("device")
  307. if err != nil {
  308. log.Warnf("ClusterDisks: local storage data missing device")
  309. continue
  310. }
  311. cost := result.Values[0].Value
  312. key := DiskIdentifier{cluster, name}
  313. ls, ok := localStorageDisks[key]
  314. if !ok || ls.device != device {
  315. continue
  316. }
  317. ls.disk.Cost = cost
  318. }
  319. for _, result := range resLocalStorageUsedCost {
  320. cluster, err := result.GetString(env.GetPromClusterLabel())
  321. if err != nil {
  322. cluster = env.GetClusterID()
  323. }
  324. name, err := result.GetString("instance")
  325. if err != nil {
  326. log.Warnf("ClusterDisks: local storage usage data missing instance")
  327. continue
  328. }
  329. device, err := result.GetString("device")
  330. if err != nil {
  331. log.Warnf("ClusterDisks: local storage data missing device")
  332. continue
  333. }
  334. cost := result.Values[0].Value
  335. key := DiskIdentifier{cluster, name}
  336. ls, ok := localStorageDisks[key]
  337. if !ok || ls.device != device {
  338. continue
  339. }
  340. ls.disk.Breakdown.System = cost / ls.disk.Cost
  341. }
  342. for _, result := range resLocalStorageUsedAvg {
  343. cluster, err := result.GetString(env.GetPromClusterLabel())
  344. if err != nil {
  345. cluster = env.GetClusterID()
  346. }
  347. name, err := result.GetString("instance")
  348. if err != nil {
  349. log.Warnf("ClusterDisks: local storage data missing instance")
  350. continue
  351. }
  352. device, err := result.GetString("device")
  353. if err != nil {
  354. log.Warnf("ClusterDisks: local storage data missing device")
  355. continue
  356. }
  357. bytesAvg := result.Values[0].Value
  358. key := DiskIdentifier{cluster, name}
  359. ls, ok := localStorageDisks[key]
  360. if !ok || ls.device != device {
  361. continue
  362. }
  363. ls.disk.BytesUsedAvgPtr = &bytesAvg
  364. }
  365. for _, result := range resLocalStorageUsedMax {
  366. cluster, err := result.GetString(env.GetPromClusterLabel())
  367. if err != nil {
  368. cluster = env.GetClusterID()
  369. }
  370. name, err := result.GetString("instance")
  371. if err != nil {
  372. log.Warnf("ClusterDisks: local storage data missing instance")
  373. continue
  374. }
  375. device, err := result.GetString("device")
  376. if err != nil {
  377. log.Warnf("ClusterDisks: local storage data missing device")
  378. continue
  379. }
  380. bytesMax := result.Values[0].Value
  381. key := DiskIdentifier{cluster, name}
  382. ls, ok := localStorageDisks[key]
  383. if !ok || ls.device != device {
  384. continue
  385. }
  386. ls.disk.BytesUsedMaxPtr = &bytesMax
  387. }
  388. for _, result := range resLocalActiveMins {
  389. cluster, err := result.GetString(env.GetPromClusterLabel())
  390. if err != nil {
  391. cluster = env.GetClusterID()
  392. }
  393. name, err := result.GetString("node")
  394. if err != nil {
  395. log.DedupedWarningf(3, "ClusterDisks: local active mins data missing 'node' label")
  396. continue
  397. }
  398. providerID, err := result.GetString("provider_id")
  399. if err != nil {
  400. log.DedupedWarningf(3, "ClusterDisks: local active mins data missing 'provider_id' label")
  401. continue
  402. }
  403. key := DiskIdentifier{cluster, name}
  404. ls, ok := localStorageDisks[key]
  405. if !ok {
  406. continue
  407. }
  408. ls.disk.ProviderID = provider.ParseLocalDiskID(providerID)
  409. if len(result.Values) == 0 {
  410. continue
  411. }
  412. s := time.Unix(int64(result.Values[0].Timestamp), 0)
  413. e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
  414. mins := e.Sub(s).Minutes()
  415. // TODO niko/assets if mins >= threshold, interpolate for missing data?
  416. ls.disk.End = e
  417. ls.disk.Start = s
  418. ls.disk.Minutes = mins
  419. }
  420. // move local storage disks to main disk map
  421. for key, ls := range localStorageDisks {
  422. diskMap[key] = ls.disk
  423. }
  424. var unTracedDiskLogData []DiskIdentifier
  425. //Iterating through Persistent Volume given by custom metrics kubecost_pv_info and assign the storage class if known and __unknown__ if not populated.
  426. for _, result := range resPVStorageClass {
  427. cluster, err := result.GetString(env.GetPromClusterLabel())
  428. if err != nil {
  429. cluster = env.GetClusterID()
  430. }
  431. name, _ := result.GetString("persistentvolume")
  432. key := DiskIdentifier{cluster, name}
  433. if _, ok := diskMap[key]; !ok {
  434. if !slices.Contains(unTracedDiskLogData, key) {
  435. unTracedDiskLogData = append(unTracedDiskLogData, key)
  436. }
  437. continue
  438. }
  439. if len(result.Values) == 0 {
  440. continue
  441. }
  442. storageClass, err := result.GetString("storageclass")
  443. if err != nil {
  444. diskMap[key].StorageClass = opencost.UnknownStorageClass
  445. } else {
  446. diskMap[key].StorageClass = storageClass
  447. }
  448. }
  449. // Logging the unidentified disk information outside the loop
  450. for _, unIdentifiedDisk := range unTracedDiskLogData {
  451. log.Warnf("ClusterDisks: Cluster %s has Storage Class information for unidentified disk %s or disk deleted from analysis", unIdentifiedDisk.Cluster, unIdentifiedDisk.Name)
  452. }
  453. for _, disk := range diskMap {
  454. // Apply all remaining RAM to Idle
  455. disk.Breakdown.Idle = 1.0 - (disk.Breakdown.System + disk.Breakdown.Other + disk.Breakdown.User)
  456. // Set provider Id to the name for reconciliation
  457. if disk.ProviderID == "" {
  458. disk.ProviderID = disk.Name
  459. }
  460. }
  461. if !env.GetAssetIncludeLocalDiskCost() {
  462. return filterOutLocalPVs(diskMap), nil
  463. }
  464. return diskMap, nil
  465. }
// NodeOverhead holds the fraction of a node's CPU and RAM attributed to
// overhead (presumably capacity minus allocatable — computed elsewhere;
// confirm at the population site).
type NodeOverhead struct {
	CpuOverheadFraction float64
	RamOverheadFraction float64
}
// Node represents a single cluster node with its observed costs, capacities,
// usage breakdowns, and active window over a query period.
type Node struct {
	Cluster         string  // cluster ID the node belongs to
	Name            string  // node name
	ProviderID      string  // cloud provider asset ID
	NodeType        string  // instance type (e.g. machine type)
	CPUCost         float64 // cumulative CPU cost over the window
	CPUCores        float64 // CPU core count
	GPUCost         float64 // cumulative GPU cost over the window
	GPUCount        float64 // attached GPU count
	RAMCost         float64 // cumulative RAM cost over the window
	RAMBytes        float64 // RAM capacity in bytes
	Discount        float64 // discount fraction applied to the node's cost
	Preemptible     bool    // true for preemptible/spot instances
	CPUBreakdown    *ClusterCostsBreakdown // CPU usage split (system/user/idle)
	RAMBreakdown    *ClusterCostsBreakdown // RAM usage split (system/user/idle)
	Start           time.Time
	End             time.Time
	Minutes         float64 // active minutes between Start and End
	Labels          map[string]string
	CostPerCPUHr    float64 // hourly rate per CPU core
	CostPerRAMGiBHr float64 // hourly rate per GiB of RAM
	CostPerGPUHr    float64 // hourly rate per GPU
	Overhead        *NodeOverhead // nil when overhead data is unavailable
}
// GKE lies about the number of cores e2 nodes have. This table
// contains a mapping from node type -> actual CPU cores
// for those cases. Keys are GCE e2 shared-core machine types.
var partialCPUMap = map[string]float64{
	"e2-micro":  0.25,
	"e2-small":  0.5,
	"e2-medium": 1.0,
}
// NodeIdentifier uniquely identifies a node by cluster ID, node name, and
// cloud provider ID.
type NodeIdentifier struct {
	Cluster    string
	Name       string
	ProviderID string
}
// nodeIdentifierNoProviderID identifies a node by cluster ID and name only,
// for joining against metrics that do not carry a provider_id label.
type nodeIdentifierNoProviderID struct {
	Cluster string
	Name    string
}
  511. func costTimesMinuteAndCount(activeDataMap map[NodeIdentifier]activeData, costMap map[NodeIdentifier]float64, resourceCountMap map[nodeIdentifierNoProviderID]float64) {
  512. for k, v := range activeDataMap {
  513. keyNon := nodeIdentifierNoProviderID{
  514. Cluster: k.Cluster,
  515. Name: k.Name,
  516. }
  517. if cost, ok := costMap[k]; ok {
  518. minutes := v.minutes
  519. count := 1.0
  520. if c, ok := resourceCountMap[keyNon]; ok {
  521. count = c
  522. }
  523. costMap[k] = cost * (minutes / 60) * count
  524. }
  525. }
  526. }
  527. func costTimesMinute(activeDataMap map[NodeIdentifier]activeData, costMap map[NodeIdentifier]float64) {
  528. for k, v := range activeDataMap {
  529. if cost, ok := costMap[k]; ok {
  530. minutes := v.minutes
  531. costMap[k] = cost * (minutes / 60)
  532. }
  533. }
  534. }
  535. func ClusterNodes(cp models.Provider, client prometheus.Client, start, end time.Time) (map[NodeIdentifier]*Node, error) {
  536. // Start from the time "end", querying backwards
  537. t := end
  538. // minsPerResolution determines accuracy and resource use for the following
  539. // queries. Smaller values (higher resolution) result in better accuracy,
  540. // but more expensive queries, and vice-a-versa.
  541. resolution := env.GetETLResolution()
  542. //Ensuring if ETL_RESOLUTION_SECONDS is less than 60s default it to 1m
  543. var minsPerResolution int
  544. if minsPerResolution = int(resolution.Minutes()); int(resolution.Minutes()) == 0 {
  545. minsPerResolution = 1
  546. log.DedupedWarningf(3, "ClusterNodes(): Configured ETL resolution (%d seconds) is below the 60 seconds threshold. Overriding with 1 minute.", int(resolution.Seconds()))
  547. }
  548. durStr := timeutil.DurationString(end.Sub(start))
  549. if durStr == "" {
  550. return nil, fmt.Errorf("illegal duration value for %s", opencost.NewClosedWindow(start, end))
  551. }
  552. requiredCtx := prom.NewNamedContext(client, prom.ClusterContextName)
  553. optionalCtx := prom.NewNamedContext(client, prom.ClusterOptionalContextName)
  554. queryNodeCPUHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_cpu_hourly_cost{%s}[%s])) by (%s, node, instance_type, provider_id)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  555. queryNodeCPUCoresCapacity := fmt.Sprintf(`avg(avg_over_time(kube_node_status_capacity_cpu_cores{%s}[%s])) by (%s, node)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  556. queryNodeCPUCoresAllocatable := fmt.Sprintf(`avg(avg_over_time(kube_node_status_allocatable_cpu_cores{%s}[%s])) by (%s, node)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  557. queryNodeRAMHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_ram_hourly_cost{%s}[%s])) by (%s, node, instance_type, provider_id) / 1024 / 1024 / 1024`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  558. queryNodeRAMBytesCapacity := fmt.Sprintf(`avg(avg_over_time(kube_node_status_capacity_memory_bytes{%s}[%s])) by (%s, node)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  559. queryNodeRAMBytesAllocatable := fmt.Sprintf(`avg(avg_over_time(kube_node_status_allocatable_memory_bytes{%s}[%s])) by (%s, node)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  560. queryNodeGPUCount := fmt.Sprintf(`avg(avg_over_time(node_gpu_count{%s}[%s])) by (%s, node, provider_id)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  561. queryNodeGPUHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_gpu_hourly_cost{%s}[%s])) by (%s, node, instance_type, provider_id)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  562. queryNodeCPUModeTotal := fmt.Sprintf(`sum(rate(node_cpu_seconds_total{%s}[%s:%dm])) by (kubernetes_node, %s, mode)`, env.GetPromClusterFilter(), durStr, minsPerResolution, env.GetPromClusterLabel())
  563. queryNodeRAMSystemPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace="kube-system", %s}[%s:%dm])) by (instance, %s) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes{%s}[%s:%dm])) by (node, %s), "instance", "$1", "node", "(.*)")) by (instance, %s)`, env.GetPromClusterFilter(), durStr, minsPerResolution, env.GetPromClusterLabel(), env.GetPromClusterFilter(), durStr, minsPerResolution, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  564. queryNodeRAMUserPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace!="kube-system", %s}[%s:%dm])) by (instance, %s) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes{%s}[%s:%dm])) by (node, %s), "instance", "$1", "node", "(.*)")) by (instance, %s)`, env.GetPromClusterFilter(), durStr, minsPerResolution, env.GetPromClusterLabel(), env.GetPromClusterFilter(), durStr, minsPerResolution, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  565. queryActiveMins := fmt.Sprintf(`avg(node_total_hourly_cost{%s}) by (node, %s, provider_id)[%s:%dm]`, env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, minsPerResolution)
  566. queryIsSpot := fmt.Sprintf(`avg_over_time(kubecost_node_is_spot{%s}[%s:%dm])`, env.GetPromClusterFilter(), durStr, minsPerResolution)
  567. queryLabels := fmt.Sprintf(`count_over_time(kube_node_labels{%s}[%s:%dm])`, env.GetPromClusterFilter(), durStr, minsPerResolution)
  568. // Return errors if these fail
  569. resChNodeCPUHourlyCost := requiredCtx.QueryAtTime(queryNodeCPUHourlyCost, t)
  570. resChNodeCPUCoresCapacity := requiredCtx.QueryAtTime(queryNodeCPUCoresCapacity, t)
  571. resChNodeCPUCoresAllocatable := requiredCtx.QueryAtTime(queryNodeCPUCoresAllocatable, t)
  572. resChNodeRAMHourlyCost := requiredCtx.QueryAtTime(queryNodeRAMHourlyCost, t)
  573. resChNodeRAMBytesCapacity := requiredCtx.QueryAtTime(queryNodeRAMBytesCapacity, t)
  574. resChNodeRAMBytesAllocatable := requiredCtx.QueryAtTime(queryNodeRAMBytesAllocatable, t)
  575. resChNodeGPUCount := requiredCtx.QueryAtTime(queryNodeGPUCount, t)
  576. resChNodeGPUHourlyCost := requiredCtx.QueryAtTime(queryNodeGPUHourlyCost, t)
  577. resChActiveMins := requiredCtx.QueryAtTime(queryActiveMins, t)
  578. resChIsSpot := requiredCtx.QueryAtTime(queryIsSpot, t)
  579. // Do not return errors if these fail, but log warnings
  580. resChNodeCPUModeTotal := optionalCtx.QueryAtTime(queryNodeCPUModeTotal, t)
  581. resChNodeRAMSystemPct := optionalCtx.QueryAtTime(queryNodeRAMSystemPct, t)
  582. resChNodeRAMUserPct := optionalCtx.QueryAtTime(queryNodeRAMUserPct, t)
  583. resChLabels := optionalCtx.QueryAtTime(queryLabels, t)
  584. resNodeCPUHourlyCost, _ := resChNodeCPUHourlyCost.Await()
  585. resNodeCPUCoresCapacity, _ := resChNodeCPUCoresCapacity.Await()
  586. resNodeCPUCoresAllocatable, _ := resChNodeCPUCoresAllocatable.Await()
  587. resNodeGPUCount, _ := resChNodeGPUCount.Await()
  588. resNodeGPUHourlyCost, _ := resChNodeGPUHourlyCost.Await()
  589. resNodeRAMHourlyCost, _ := resChNodeRAMHourlyCost.Await()
  590. resNodeRAMBytesCapacity, _ := resChNodeRAMBytesCapacity.Await()
  591. resNodeRAMBytesAllocatable, _ := resChNodeRAMBytesAllocatable.Await()
  592. resIsSpot, _ := resChIsSpot.Await()
  593. resNodeCPUModeTotal, _ := resChNodeCPUModeTotal.Await()
  594. resNodeRAMSystemPct, _ := resChNodeRAMSystemPct.Await()
  595. resNodeRAMUserPct, _ := resChNodeRAMUserPct.Await()
  596. resActiveMins, _ := resChActiveMins.Await()
  597. resLabels, _ := resChLabels.Await()
  598. if optionalCtx.HasErrors() {
  599. for _, err := range optionalCtx.Errors() {
  600. log.Warnf("ClusterNodes: %s", err)
  601. }
  602. }
  603. if requiredCtx.HasErrors() {
  604. for _, err := range requiredCtx.Errors() {
  605. log.Errorf("ClusterNodes: %s", err)
  606. }
  607. return nil, requiredCtx.ErrorCollection()
  608. }
  609. activeDataMap := buildActiveDataMap(resActiveMins, resolution, opencost.NewClosedWindow(start, end))
  610. gpuCountMap := buildGPUCountMap(resNodeGPUCount)
  611. preemptibleMap := buildPreemptibleMap(resIsSpot)
  612. cpuCostMap, clusterAndNameToType1 := buildCPUCostMap(resNodeCPUHourlyCost, cp, preemptibleMap)
  613. ramCostMap, clusterAndNameToType2 := buildRAMCostMap(resNodeRAMHourlyCost, cp, preemptibleMap)
  614. gpuCostMap, clusterAndNameToType3 := buildGPUCostMap(resNodeGPUHourlyCost, gpuCountMap, cp, preemptibleMap)
  615. clusterAndNameToTypeIntermediate := mergeTypeMaps(clusterAndNameToType1, clusterAndNameToType2)
  616. clusterAndNameToType := mergeTypeMaps(clusterAndNameToTypeIntermediate, clusterAndNameToType3)
  617. cpuCoresCapacityMap := buildCPUCoresMap(resNodeCPUCoresCapacity)
  618. ramBytesCapacityMap := buildRAMBytesMap(resNodeRAMBytesCapacity)
  619. cpuCoresAllocatableMap := buildCPUCoresMap(resNodeCPUCoresAllocatable)
  620. ramBytesAllocatableMap := buildRAMBytesMap(resNodeRAMBytesAllocatable)
  621. overheadMap := buildOverheadMap(ramBytesCapacityMap, ramBytesAllocatableMap, cpuCoresCapacityMap, cpuCoresAllocatableMap)
  622. ramUserPctMap := buildRAMUserPctMap(resNodeRAMUserPct)
  623. ramSystemPctMap := buildRAMSystemPctMap(resNodeRAMSystemPct)
  624. cpuBreakdownMap := buildCPUBreakdownMap(resNodeCPUModeTotal)
  625. labelsMap := buildLabelsMap(resLabels)
  626. costTimesMinuteAndCount(activeDataMap, cpuCostMap, cpuCoresCapacityMap)
  627. costTimesMinuteAndCount(activeDataMap, ramCostMap, ramBytesCapacityMap)
  628. costTimesMinute(activeDataMap, gpuCostMap) // there's no need to do a weird "nodeIdentifierNoProviderID" type match since gpuCounts have a providerID
  629. nodeMap := buildNodeMap(
  630. cpuCostMap, ramCostMap, gpuCostMap, gpuCountMap,
  631. cpuCoresCapacityMap, ramBytesCapacityMap, ramUserPctMap,
  632. ramSystemPctMap,
  633. cpuBreakdownMap,
  634. activeDataMap,
  635. preemptibleMap,
  636. labelsMap,
  637. clusterAndNameToType,
  638. resolution,
  639. overheadMap,
  640. )
  641. c, err := cp.GetConfig()
  642. if err != nil {
  643. return nil, err
  644. }
  645. discount, err := ParsePercentString(c.Discount)
  646. if err != nil {
  647. return nil, err
  648. }
  649. negotiatedDiscount, err := ParsePercentString(c.NegotiatedDiscount)
  650. if err != nil {
  651. return nil, err
  652. }
  653. for _, node := range nodeMap {
  654. // TODO take GKE Reserved Instances into account
  655. node.Discount = cp.CombinedDiscountForNode(node.NodeType, node.Preemptible, discount, negotiatedDiscount)
  656. // Apply all remaining resources to Idle
  657. node.CPUBreakdown.Idle = 1.0 - (node.CPUBreakdown.System + node.CPUBreakdown.Other + node.CPUBreakdown.User)
  658. node.RAMBreakdown.Idle = 1.0 - (node.RAMBreakdown.System + node.RAMBreakdown.Other + node.RAMBreakdown.User)
  659. }
  660. return nodeMap, nil
  661. }
// LoadBalancerIdentifier uniquely identifies a load balancer by its cluster,
// namespace, and service name.
type LoadBalancerIdentifier struct {
	Cluster   string
	Namespace string
	Name      string
}
// LoadBalancer holds the cost and activity data computed for a single load
// balancer over a queried window.
type LoadBalancer struct {
	Cluster    string
	Namespace  string
	Name       string // stored as "namespace/name" for backwards-compatibility (see ClusterLoadBalancers)
	ProviderID string
	Cost       float64
	Start      time.Time
	End        time.Time
	Minutes    float64 // active minutes between Start and End
	Private    bool    // whether Ip is a private address
	Ip         string  // most recently observed ingress IP
}
  679. func ClusterLoadBalancers(client prometheus.Client, start, end time.Time) (map[LoadBalancerIdentifier]*LoadBalancer, error) {
  680. // Start from the time "end", querying backwards
  681. t := end
  682. // minsPerResolution determines accuracy and resource use for the following
  683. // queries. Smaller values (higher resolution) result in better accuracy,
  684. // but more expensive queries, and vice-a-versa.
  685. resolution := env.GetETLResolution()
  686. //Ensuring if ETL_RESOLUTION_SECONDS is less than 60s default it to 1m
  687. var minsPerResolution int
  688. if minsPerResolution = int(resolution.Minutes()); int(resolution.Minutes()) == 0 {
  689. minsPerResolution = 1
  690. log.DedupedWarningf(3, "ClusterLoadBalancers(): Configured ETL resolution (%d seconds) is below the 60 seconds threshold. Overriding with 1 minute.", int(resolution.Seconds()))
  691. }
  692. // Query for the duration between start and end
  693. durStr := timeutil.DurationString(end.Sub(start))
  694. if durStr == "" {
  695. return nil, fmt.Errorf("illegal duration value for %s", opencost.NewClosedWindow(start, end))
  696. }
  697. ctx := prom.NewNamedContext(client, prom.ClusterContextName)
  698. queryLBCost := fmt.Sprintf(`avg(avg_over_time(kubecost_load_balancer_cost{%s}[%s])) by (namespace, service_name, %s, ingress_ip)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  699. queryActiveMins := fmt.Sprintf(`avg(kubecost_load_balancer_cost{%s}) by (namespace, service_name, %s, ingress_ip)[%s:%dm]`, env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, minsPerResolution)
  700. resChLBCost := ctx.QueryAtTime(queryLBCost, t)
  701. resChActiveMins := ctx.QueryAtTime(queryActiveMins, t)
  702. resLBCost, _ := resChLBCost.Await()
  703. resActiveMins, _ := resChActiveMins.Await()
  704. if ctx.HasErrors() {
  705. return nil, ctx.ErrorCollection()
  706. }
  707. loadBalancerMap := make(map[LoadBalancerIdentifier]*LoadBalancer, len(resActiveMins))
  708. for _, result := range resActiveMins {
  709. cluster, err := result.GetString(env.GetPromClusterLabel())
  710. if err != nil {
  711. cluster = env.GetClusterID()
  712. }
  713. namespace, err := result.GetString("namespace")
  714. if err != nil {
  715. log.Warnf("ClusterLoadBalancers: LB cost data missing namespace")
  716. continue
  717. }
  718. name, err := result.GetString("service_name")
  719. if err != nil {
  720. log.Warnf("ClusterLoadBalancers: LB cost data missing service_name")
  721. continue
  722. }
  723. providerID, err := result.GetString("ingress_ip")
  724. if err != nil {
  725. log.DedupedWarningf(5, "ClusterLoadBalancers: LB cost data missing ingress_ip")
  726. providerID = ""
  727. }
  728. key := LoadBalancerIdentifier{
  729. Cluster: cluster,
  730. Namespace: namespace,
  731. Name: name,
  732. }
  733. // Skip if there are no data
  734. if len(result.Values) == 0 {
  735. continue
  736. }
  737. // Add load balancer to the set of load balancers
  738. if _, ok := loadBalancerMap[key]; !ok {
  739. loadBalancerMap[key] = &LoadBalancer{
  740. Cluster: cluster,
  741. Namespace: namespace,
  742. Name: fmt.Sprintf("%s/%s", namespace, name), // TODO:ETL this is kept for backwards-compatibility, but not good
  743. ProviderID: provider.ParseLBID(providerID),
  744. }
  745. }
  746. // Append start, end, and minutes. This should come before all other data.
  747. s, e := calculateStartAndEnd(result, resolution, opencost.NewClosedWindow(start, end))
  748. loadBalancerMap[key].Start = s
  749. loadBalancerMap[key].End = e
  750. loadBalancerMap[key].Minutes = e.Sub(s).Minutes()
  751. // Fill in Provider ID if it is available and missing in the loadBalancerMap
  752. // Prevents there from being a duplicate LoadBalancers on the same day
  753. if providerID != "" && loadBalancerMap[key].ProviderID == "" {
  754. loadBalancerMap[key].ProviderID = providerID
  755. }
  756. }
  757. for _, result := range resLBCost {
  758. cluster, err := result.GetString(env.GetPromClusterLabel())
  759. if err != nil {
  760. cluster = env.GetClusterID()
  761. }
  762. namespace, err := result.GetString("namespace")
  763. if err != nil {
  764. log.Warnf("ClusterLoadBalancers: LB cost data missing namespace")
  765. continue
  766. }
  767. name, err := result.GetString("service_name")
  768. if err != nil {
  769. log.Warnf("ClusterLoadBalancers: LB cost data missing service_name")
  770. continue
  771. }
  772. providerID, err := result.GetString("ingress_ip")
  773. if err != nil {
  774. log.DedupedWarningf(5, "ClusterLoadBalancers: LB cost data missing ingress_ip")
  775. // only update asset cost when an actual IP was returned
  776. continue
  777. }
  778. key := LoadBalancerIdentifier{
  779. Cluster: cluster,
  780. Namespace: namespace,
  781. Name: name,
  782. }
  783. // Apply cost as price-per-hour * hours
  784. if lb, ok := loadBalancerMap[key]; ok {
  785. lbPricePerHr := result.Values[0].Value
  786. // interpolate any missing data
  787. resultMins := lb.Minutes
  788. if resultMins > 0 {
  789. scaleFactor := (resultMins + resolution.Minutes()) / resultMins
  790. hrs := (lb.Minutes * scaleFactor) / 60.0
  791. lb.Cost += lbPricePerHr * hrs
  792. } else {
  793. log.DedupedWarningf(20, "ClusterLoadBalancers: found zero minutes for key: %v", key)
  794. }
  795. if lb.Ip != "" && lb.Ip != providerID {
  796. log.DedupedWarningf(5, "ClusterLoadBalancers: multiple IPs per load balancer not supported, using most recent IP")
  797. }
  798. lb.Ip = providerID
  799. lb.Private = privateIPCheck(providerID)
  800. } else {
  801. log.DedupedWarningf(20, "ClusterLoadBalancers: found minutes for key that does not exist: %v", key)
  802. }
  803. }
  804. return loadBalancerMap, nil
  805. }
  806. // Check if an ip is private.
  807. func privateIPCheck(ip string) bool {
  808. ipAddress := net.ParseIP(ip)
  809. return ipAddress.IsPrivate()
  810. }
// ComputeClusterCosts gives the cumulative and monthly-rate cluster costs over a window of time for all clusters.
//
// The result channels in resChs are awaited by fixed index (0-4 base queries,
// 5 total local storage, 6-8 breakdown queries, 9 used local storage), so nil
// placeholders are appended for skipped queries to preserve indexing.
func (a *Accesses) ComputeClusterCosts(client prometheus.Client, provider models.Provider, window, offset time.Duration, withBreakdown bool) (map[string]*ClusterCosts, error) {
	if window < 10*time.Minute {
		return nil, fmt.Errorf("minimum window of 10m required; got %s", window)
	}

	// Compute number of minutes in the full interval, for use interpolating missed scrapes or scaling missing data
	start, end := timeutil.ParseTimeRange(window, offset)
	mins := end.Sub(start).Minutes()

	// minsPerResolution determines accuracy and resource use for the following
	// queries. Smaller values (higher resolution) result in better accuracy,
	// but more expensive queries, and vice-a-versa.
	resolution := env.GetETLResolution()

	//Ensuring if ETL_RESOLUTION_SECONDS is less than 60s default it to 1m
	var minsPerResolution int
	if minsPerResolution = int(resolution.Minutes()); int(resolution.Minutes()) < 1 {
		minsPerResolution = 1
		log.DedupedWarningf(3, "ComputeClusterCosts(): Configured ETL resolution (%d seconds) is below the 60 seconds threshold. Overriding with 1 minute.", int(resolution.Seconds()))
	}

	windowStr := timeutil.DurationString(window)

	// hourlyToCumulative is a scaling factor that, when multiplied by an hourly
	// value, converts it to a cumulative value; i.e.
	// [$/hr] * [min/res]*[hr/min] = [$/res]
	hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)

	// Counts scrapes of node CPU capacity per cluster, scaled to minutes of data.
	const fmtQueryDataCount = `
		count_over_time(sum(kube_node_status_capacity_cpu_cores{%s}) by (%s)[%s:%dm]%s) * %d
	`

	// Cumulative GPU cost per cluster.
	const fmtQueryTotalGPU = `
		sum(
			sum_over_time(node_gpu_hourly_cost{%s}[%s:%dm]%s) * %f
		) by (%s)
	`

	// Cumulative CPU cost per cluster: capacity * hourly price, accumulated.
	const fmtQueryTotalCPU = `
		sum(
			sum_over_time(avg(kube_node_status_capacity_cpu_cores{%s}) by (node, %s)[%s:%dm]%s) *
			avg(avg_over_time(node_cpu_hourly_cost{%s}[%s:%dm]%s)) by (node, %s) * %f
		) by (%s)
	`

	// Cumulative RAM cost per cluster: capacity (GiB) * hourly price, accumulated.
	const fmtQueryTotalRAM = `
		sum(
			sum_over_time(avg(kube_node_status_capacity_memory_bytes{%s}) by (node, %s)[%s:%dm]%s) / 1024 / 1024 / 1024 *
			avg(avg_over_time(node_ram_hourly_cost{%s}[%s:%dm]%s)) by (node, %s) * %f
		) by (%s)
	`

	// Cumulative PV storage cost per cluster: capacity (GiB) * hourly price, accumulated.
	const fmtQueryTotalStorage = `
		sum(
			sum_over_time(avg(kube_persistentvolume_capacity_bytes{%s}) by (persistentvolume, %s)[%s:%dm]%s) / 1024 / 1024 / 1024 *
			avg(avg_over_time(pv_hourly_cost{%s}[%s:%dm]%s)) by (persistentvolume, %s) * %f
		) by (%s)
	`

	// Fraction of CPU time spent in each mode (idle/system/user/other) per cluster.
	const fmtQueryCPUModePct = `
		sum(rate(node_cpu_seconds_total{%s}[%s]%s)) by (%s, mode) / ignoring(mode)
		group_left sum(rate(node_cpu_seconds_total{%s}[%s]%s)) by (%s)
	`

	// Fraction of RAM capacity used by the kube-system namespace per cluster.
	const fmtQueryRAMSystemPct = `
		sum(sum_over_time(container_memory_usage_bytes{container_name!="",namespace="kube-system", %s}[%s:%dm]%s)) by (%s)
		/ sum(sum_over_time(kube_node_status_capacity_memory_bytes{%s}[%s:%dm]%s)) by (%s)
	`

	// Fraction of RAM capacity in use by workloads per cluster.
	const fmtQueryRAMUserPct = `
		sum(sum_over_time(kubecost_cluster_memory_working_set_bytes{%s}[%s:%dm]%s)) by (%s)
		/ sum(sum_over_time(kube_node_status_capacity_memory_bytes{%s}[%s:%dm]%s)) by (%s)
	`

	// TODO niko/clustercost metric "kubelet_volume_stats_used_bytes" was deprecated in 1.12, then seems to have come back in 1.17
	// const fmtQueryPVStorageUsePct = `(sum(kube_persistentvolumeclaim_info) by (persistentvolumeclaim, storageclass,namespace) + on (persistentvolumeclaim,namespace)
	// group_right(storageclass) sum(kubelet_volume_stats_used_bytes) by (persistentvolumeclaim,namespace))`

	// Local storage queries are provider-specific; empty string means unsupported.
	queryUsedLocalStorage := provider.GetLocalStorageQuery(window, offset, false, true)

	queryTotalLocalStorage := provider.GetLocalStorageQuery(window, offset, false, false)
	if queryTotalLocalStorage != "" {
		queryTotalLocalStorage = fmt.Sprintf(" + %s", queryTotalLocalStorage)
	}

	fmtOffset := timeutil.DurationToPromOffsetString(offset)

	queryDataCount := fmt.Sprintf(fmtQueryDataCount, env.GetPromClusterFilter(), env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, minsPerResolution)
	queryTotalGPU := fmt.Sprintf(fmtQueryTotalGPU, env.GetPromClusterFilter(), windowStr, minsPerResolution, fmtOffset, hourlyToCumulative, env.GetPromClusterLabel())
	queryTotalCPU := fmt.Sprintf(fmtQueryTotalCPU, env.GetPromClusterFilter(), env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterFilter(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), hourlyToCumulative, env.GetPromClusterLabel())
	queryTotalRAM := fmt.Sprintf(fmtQueryTotalRAM, env.GetPromClusterFilter(), env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterFilter(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), hourlyToCumulative, env.GetPromClusterLabel())
	queryTotalStorage := fmt.Sprintf(fmtQueryTotalStorage, env.GetPromClusterFilter(), env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterFilter(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), hourlyToCumulative, env.GetPromClusterLabel())

	ctx := prom.NewNamedContext(client, prom.ClusterContextName)

	// Indices 0-4 of resChs: data count, GPU, CPU, RAM, storage.
	resChs := ctx.QueryAll(
		queryDataCount,
		queryTotalGPU,
		queryTotalCPU,
		queryTotalRAM,
		queryTotalStorage,
	)

	// Only submit the local storage query if it is valid. Otherwise Prometheus
	// will return errors. Always append something to resChs, regardless, to
	// maintain indexing.
	if queryTotalLocalStorage != "" {
		resChs = append(resChs, ctx.Query(queryTotalLocalStorage))
	} else {
		resChs = append(resChs, nil)
	}

	if withBreakdown {
		queryCPUModePct := fmt.Sprintf(fmtQueryCPUModePct, env.GetPromClusterFilter(), windowStr, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterFilter(), windowStr, fmtOffset, env.GetPromClusterLabel())
		queryRAMSystemPct := fmt.Sprintf(fmtQueryRAMSystemPct, env.GetPromClusterFilter(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterFilter(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel())
		queryRAMUserPct := fmt.Sprintf(fmtQueryRAMUserPct, env.GetPromClusterFilter(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterFilter(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel())

		// Indices 6-8 of resChs: CPU mode pct, RAM system pct, RAM user pct.
		bdResChs := ctx.QueryAll(
			queryCPUModePct,
			queryRAMSystemPct,
			queryRAMUserPct,
		)

		// Only submit the local storage query if it is valid. Otherwise Prometheus
		// will return errors. Always append something to resChs, regardless, to
		// maintain indexing.
		if queryUsedLocalStorage != "" {
			bdResChs = append(bdResChs, ctx.Query(queryUsedLocalStorage))
		} else {
			bdResChs = append(bdResChs, nil)
		}

		resChs = append(resChs, bdResChs...)
	}

	// Errors are collected on ctx and checked below, so individual Await errors
	// are intentionally discarded here.
	resDataCount, _ := resChs[0].Await()
	resTotalGPU, _ := resChs[1].Await()
	resTotalCPU, _ := resChs[2].Await()
	resTotalRAM, _ := resChs[3].Await()
	resTotalStorage, _ := resChs[4].Await()
	if ctx.HasErrors() {
		return nil, ctx.ErrorCollection()
	}

	defaultClusterID := env.GetClusterID()

	// Determine how many minutes of data each cluster actually reported,
	// falling back to the full window when no data count is available.
	dataMinsByCluster := map[string]float64{}
	for _, result := range resDataCount {
		clusterID, _ := result.GetString(env.GetPromClusterLabel())
		if clusterID == "" {
			clusterID = defaultClusterID
		}
		dataMins := mins
		if len(result.Values) > 0 {
			dataMins = result.Values[0].Value
		} else {
			log.Warnf("Cluster cost data count returned no results for cluster %s", clusterID)
		}
		dataMinsByCluster[clusterID] = dataMins
	}

	// Determine combined discount
	discount, customDiscount := 0.0, 0.0
	c, err := a.CloudProvider.GetConfig()
	if err == nil {
		discount, err = ParsePercentString(c.Discount)
		if err != nil {
			discount = 0.0
		}
		customDiscount, err = ParsePercentString(c.NegotiatedDiscount)
		if err != nil {
			customDiscount = 0.0
		}
	}

	// Intermediate structure storing mapping of [clusterID][type ∈ {cpu, ram, storage, total}]=cost
	costData := make(map[string]map[string]float64)

	// Helper function to iterate over Prom query results, parsing the raw values into
	// the intermediate costData structure.
	setCostsFromResults := func(costData map[string]map[string]float64, results []*prom.QueryResult, name string, discount float64, customDiscount float64) {
		for _, result := range results {
			clusterID, _ := result.GetString(env.GetPromClusterLabel())
			if clusterID == "" {
				clusterID = defaultClusterID
			}
			if _, ok := costData[clusterID]; !ok {
				costData[clusterID] = map[string]float64{}
			}
			if len(result.Values) > 0 {
				costData[clusterID][name] += result.Values[0].Value * (1.0 - discount) * (1.0 - customDiscount)
				costData[clusterID]["total"] += result.Values[0].Value * (1.0 - discount) * (1.0 - customDiscount)
			}
		}
	}

	// Apply both sustained use and custom discounts to RAM and CPU
	setCostsFromResults(costData, resTotalCPU, "cpu", discount, customDiscount)
	setCostsFromResults(costData, resTotalRAM, "ram", discount, customDiscount)

	// Apply only custom discount to GPU and storage
	setCostsFromResults(costData, resTotalGPU, "gpu", 0.0, customDiscount)
	setCostsFromResults(costData, resTotalStorage, "storage", 0.0, customDiscount)
	if queryTotalLocalStorage != "" {
		resTotalLocalStorage, err := resChs[5].Await()
		if err != nil {
			return nil, err
		}
		setCostsFromResults(costData, resTotalLocalStorage, "localstorage", 0.0, customDiscount)
	}

	cpuBreakdownMap := map[string]*ClusterCostsBreakdown{}
	ramBreakdownMap := map[string]*ClusterCostsBreakdown{}
	pvUsedCostMap := map[string]float64{}
	if withBreakdown {
		resCPUModePct, _ := resChs[6].Await()
		resRAMSystemPct, _ := resChs[7].Await()
		resRAMUserPct, _ := resChs[8].Await()
		if ctx.HasErrors() {
			return nil, ctx.ErrorCollection()
		}

		// Accumulate CPU time fractions by mode for each cluster.
		// NOTE(review): result.Values[0] is indexed without a length guard here
		// and in the RAM loops below; assumes these queries always return at
		// least one sample — verify upstream.
		for _, result := range resCPUModePct {
			clusterID, _ := result.GetString(env.GetPromClusterLabel())
			if clusterID == "" {
				clusterID = defaultClusterID
			}
			if _, ok := cpuBreakdownMap[clusterID]; !ok {
				cpuBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
			}
			cpuBD := cpuBreakdownMap[clusterID]

			mode, err := result.GetString("mode")
			if err != nil {
				log.Warnf("ComputeClusterCosts: unable to read CPU mode: %s", err)
				mode = "other"
			}

			switch mode {
			case "idle":
				cpuBD.Idle += result.Values[0].Value
			case "system":
				cpuBD.System += result.Values[0].Value
			case "user":
				cpuBD.User += result.Values[0].Value
			default:
				cpuBD.Other += result.Values[0].Value
			}
		}

		// Accumulate the system share of RAM usage for each cluster.
		for _, result := range resRAMSystemPct {
			clusterID, _ := result.GetString(env.GetPromClusterLabel())
			if clusterID == "" {
				clusterID = defaultClusterID
			}
			if _, ok := ramBreakdownMap[clusterID]; !ok {
				ramBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
			}
			ramBD := ramBreakdownMap[clusterID]
			ramBD.System += result.Values[0].Value
		}

		// Accumulate the user share of RAM usage for each cluster.
		for _, result := range resRAMUserPct {
			clusterID, _ := result.GetString(env.GetPromClusterLabel())
			if clusterID == "" {
				clusterID = defaultClusterID
			}
			if _, ok := ramBreakdownMap[clusterID]; !ok {
				ramBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
			}
			ramBD := ramBreakdownMap[clusterID]
			ramBD.User += result.Values[0].Value
		}

		// Whatever RAM share remains after system and user is considered idle.
		for _, ramBD := range ramBreakdownMap {
			remaining := 1.0
			remaining -= ramBD.Other
			remaining -= ramBD.System
			remaining -= ramBD.User
			ramBD.Idle = remaining
		}

		if queryUsedLocalStorage != "" {
			resUsedLocalStorage, err := resChs[9].Await()
			if err != nil {
				return nil, err
			}
			for _, result := range resUsedLocalStorage {
				clusterID, _ := result.GetString(env.GetPromClusterLabel())
				if clusterID == "" {
					clusterID = defaultClusterID
				}
				pvUsedCostMap[clusterID] += result.Values[0].Value
			}
		}
	}

	if ctx.HasErrors() {
		for _, err := range ctx.Errors() {
			log.Errorf("ComputeClusterCosts: %s", err)
		}
		return nil, ctx.ErrorCollection()
	}

	// Convert intermediate structure to Costs instances
	costsByCluster := map[string]*ClusterCosts{}
	for id, cd := range costData {
		dataMins, ok := dataMinsByCluster[id]
		if !ok {
			dataMins = mins
			log.Warnf("Cluster cost data count not found for cluster %s", id)
		}
		costs, err := NewClusterCostsFromCumulative(cd["cpu"], cd["gpu"], cd["ram"], cd["storage"]+cd["localstorage"], window, offset, dataMins/timeutil.MinsPerHour)
		if err != nil {
			log.Warnf("Failed to parse cluster costs on %s (%s) from cumulative data: %+v", window, offset, cd)
			return nil, err
		}

		if cpuBD, ok := cpuBreakdownMap[id]; ok {
			costs.CPUBreakdown = cpuBD
		}
		if ramBD, ok := ramBreakdownMap[id]; ok {
			costs.RAMBreakdown = ramBD
		}
		costs.StorageBreakdown = &ClusterCostsBreakdown{}
		if pvUC, ok := pvUsedCostMap[id]; ok {
			costs.StorageBreakdown.Idle = (costs.StorageCumulative - pvUC) / costs.StorageCumulative
			costs.StorageBreakdown.User = pvUC / costs.StorageCumulative
		}
		costs.DataMinutes = dataMins
		costsByCluster[id] = costs
	}

	return costsByCluster, nil
}
// Totals holds cluster cost time series as [timestamp, value] string pairs,
// one series per cost category.
type Totals struct {
	TotalCost   [][]string `json:"totalcost"`
	CPUCost     [][]string `json:"cpucost"`
	MemCost     [][]string `json:"memcost"`
	StorageCost [][]string `json:"storageCost"`
}
  1108. func resultToTotals(qrs []*prom.QueryResult) ([][]string, error) {
  1109. if len(qrs) == 0 {
  1110. return [][]string{}, fmt.Errorf("not enough data available in the selected time range")
  1111. }
  1112. result := qrs[0]
  1113. totals := [][]string{}
  1114. for _, value := range result.Values {
  1115. d0 := fmt.Sprintf("%f", value.Timestamp)
  1116. d1 := fmt.Sprintf("%f", value.Value)
  1117. toAppend := []string{
  1118. d0,
  1119. d1,
  1120. }
  1121. totals = append(totals, toAppend)
  1122. }
  1123. return totals, nil
  1124. }
  1125. // ClusterCostsOverTime gives the full cluster costs over time
  1126. func ClusterCostsOverTime(cli prometheus.Client, provider models.Provider, startString, endString string, window, offset time.Duration) (*Totals, error) {
  1127. localStorageQuery := provider.GetLocalStorageQuery(window, offset, true, false)
  1128. if localStorageQuery != "" {
  1129. localStorageQuery = fmt.Sprintf("+ %s", localStorageQuery)
  1130. }
  1131. layout := "2006-01-02T15:04:05.000Z"
  1132. start, err := time.Parse(layout, startString)
  1133. if err != nil {
  1134. log.Errorf("Error parsing time %s. Error: %s", startString, err.Error())
  1135. return nil, err
  1136. }
  1137. end, err := time.Parse(layout, endString)
  1138. if err != nil {
  1139. log.Errorf("Error parsing time %s. Error: %s", endString, err.Error())
  1140. return nil, err
  1141. }
  1142. fmtWindow := timeutil.DurationString(window)
  1143. if fmtWindow == "" {
  1144. err := fmt.Errorf("window value invalid or missing")
  1145. log.Errorf("Error parsing time %v. Error: %s", window, err.Error())
  1146. return nil, err
  1147. }
  1148. fmtOffset := timeutil.DurationToPromOffsetString(offset)
  1149. qCores := fmt.Sprintf(queryClusterCores, env.GetPromClusterFilter(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterFilter(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterFilter(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  1150. qRAM := fmt.Sprintf(queryClusterRAM, env.GetPromClusterFilter(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterFilter(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  1151. qStorage := fmt.Sprintf(queryStorage, env.GetPromClusterFilter(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterFilter(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterLabel(), localStorageQuery)
  1152. qTotal := fmt.Sprintf(queryTotal, env.GetPromClusterFilter(), env.GetPromClusterLabel(), env.GetPromClusterFilter(), env.GetPromClusterLabel(), env.GetPromClusterFilter(), env.GetPromClusterLabel(), env.GetPromClusterLabel(), localStorageQuery)
  1153. ctx := prom.NewNamedContext(cli, prom.ClusterContextName)
  1154. resChClusterCores := ctx.QueryRange(qCores, start, end, window)
  1155. resChClusterRAM := ctx.QueryRange(qRAM, start, end, window)
  1156. resChStorage := ctx.QueryRange(qStorage, start, end, window)
  1157. resChTotal := ctx.QueryRange(qTotal, start, end, window)
  1158. resultClusterCores, err := resChClusterCores.Await()
  1159. if err != nil {
  1160. return nil, err
  1161. }
  1162. resultClusterRAM, err := resChClusterRAM.Await()
  1163. if err != nil {
  1164. return nil, err
  1165. }
  1166. resultStorage, err := resChStorage.Await()
  1167. if err != nil {
  1168. return nil, err
  1169. }
  1170. resultTotal, err := resChTotal.Await()
  1171. if err != nil {
  1172. return nil, err
  1173. }
  1174. coreTotal, err := resultToTotals(resultClusterCores)
  1175. if err != nil {
  1176. log.Infof("[Warning] ClusterCostsOverTime: no cpu data: %s", err)
  1177. return nil, err
  1178. }
  1179. ramTotal, err := resultToTotals(resultClusterRAM)
  1180. if err != nil {
  1181. log.Infof("[Warning] ClusterCostsOverTime: no ram data: %s", err)
  1182. return nil, err
  1183. }
  1184. storageTotal, err := resultToTotals(resultStorage)
  1185. if err != nil {
  1186. log.Infof("[Warning] ClusterCostsOverTime: no storage data: %s", err)
  1187. }
  1188. clusterTotal, err := resultToTotals(resultTotal)
  1189. if err != nil {
  1190. // If clusterTotal query failed, it's likely because there are no PVs, which
  1191. // causes the qTotal query to return no data. Instead, query only node costs.
  1192. // If that fails, return an error because something is actually wrong.
  1193. qNodes := fmt.Sprintf(queryNodes, env.GetPromClusterFilter(), env.GetPromClusterLabel(), localStorageQuery)
  1194. resultNodes, warnings, err := ctx.QueryRangeSync(qNodes, start, end, window)
  1195. for _, warning := range warnings {
  1196. log.Warnf(warning)
  1197. }
  1198. if err != nil {
  1199. return nil, err
  1200. }
  1201. clusterTotal, err = resultToTotals(resultNodes)
  1202. if err != nil {
  1203. log.Infof("[Warning] ClusterCostsOverTime: no node data: %s", err)
  1204. return nil, err
  1205. }
  1206. }
  1207. return &Totals{
  1208. TotalCost: clusterTotal,
  1209. CPUCost: coreTotal,
  1210. MemCost: ramTotal,
  1211. StorageCost: storageTotal,
  1212. }, nil
  1213. }
  1214. func pvCosts(diskMap map[DiskIdentifier]*Disk, resolution time.Duration, resActiveMins, resPVSize, resPVCost, resPVUsedAvg, resPVUsedMax, resPVCInfo []*prom.QueryResult, cp models.Provider, window opencost.Window) {
  1215. for _, result := range resActiveMins {
  1216. cluster, err := result.GetString(env.GetPromClusterLabel())
  1217. if err != nil {
  1218. cluster = env.GetClusterID()
  1219. }
  1220. name, err := result.GetString("persistentvolume")
  1221. if err != nil {
  1222. log.Warnf("ClusterDisks: active mins missing pv name")
  1223. continue
  1224. }
  1225. if len(result.Values) == 0 {
  1226. continue
  1227. }
  1228. key := DiskIdentifier{cluster, name}
  1229. if _, ok := diskMap[key]; !ok {
  1230. diskMap[key] = &Disk{
  1231. Cluster: cluster,
  1232. Name: name,
  1233. Breakdown: &ClusterCostsBreakdown{},
  1234. }
  1235. }
  1236. s, e := calculateStartAndEnd(result, resolution, window)
  1237. mins := e.Sub(s).Minutes()
  1238. diskMap[key].End = e
  1239. diskMap[key].Start = s
  1240. diskMap[key].Minutes = mins
  1241. }
  1242. for _, result := range resPVSize {
  1243. cluster, err := result.GetString(env.GetPromClusterLabel())
  1244. if err != nil {
  1245. cluster = env.GetClusterID()
  1246. }
  1247. name, err := result.GetString("persistentvolume")
  1248. if err != nil {
  1249. log.Warnf("ClusterDisks: PV size data missing persistentvolume")
  1250. continue
  1251. }
  1252. // TODO niko/assets storage class
  1253. bytes := result.Values[0].Value
  1254. key := DiskIdentifier{cluster, name}
  1255. if _, ok := diskMap[key]; !ok {
  1256. diskMap[key] = &Disk{
  1257. Cluster: cluster,
  1258. Name: name,
  1259. Breakdown: &ClusterCostsBreakdown{},
  1260. }
  1261. }
  1262. diskMap[key].Bytes = bytes
  1263. }
  1264. customPricingEnabled := provider.CustomPricesEnabled(cp)
  1265. customPricingConfig, err := cp.GetConfig()
  1266. if err != nil {
  1267. log.Warnf("ClusterDisks: failed to load custom pricing: %s", err)
  1268. }
  1269. for _, result := range resPVCost {
  1270. cluster, err := result.GetString(env.GetPromClusterLabel())
  1271. if err != nil {
  1272. cluster = env.GetClusterID()
  1273. }
  1274. name, err := result.GetString("persistentvolume")
  1275. if err != nil {
  1276. log.Warnf("ClusterDisks: PV cost data missing persistentvolume")
  1277. continue
  1278. }
  1279. // TODO niko/assets storage class
  1280. var cost float64
  1281. if customPricingEnabled && customPricingConfig != nil {
  1282. customPVCostStr := customPricingConfig.Storage
  1283. customPVCost, err := strconv.ParseFloat(customPVCostStr, 64)
  1284. if err != nil {
  1285. log.Warnf("ClusterDisks: error parsing custom PV price: %s", customPVCostStr)
  1286. }
  1287. cost = customPVCost
  1288. } else {
  1289. cost = result.Values[0].Value
  1290. }
  1291. key := DiskIdentifier{cluster, name}
  1292. if _, ok := diskMap[key]; !ok {
  1293. diskMap[key] = &Disk{
  1294. Cluster: cluster,
  1295. Name: name,
  1296. Breakdown: &ClusterCostsBreakdown{},
  1297. }
  1298. }
  1299. diskMap[key].Cost = cost * (diskMap[key].Bytes / 1024 / 1024 / 1024) * (diskMap[key].Minutes / 60)
  1300. providerID, _ := result.GetString("provider_id") // just put the providerID set up here, it's the simplest query.
  1301. if providerID != "" {
  1302. diskMap[key].ProviderID = provider.ParsePVID(providerID)
  1303. }
  1304. }
  1305. for _, result := range resPVUsedAvg {
  1306. cluster, err := result.GetString(env.GetPromClusterLabel())
  1307. if err != nil {
  1308. cluster = env.GetClusterID()
  1309. }
  1310. claimName, err := result.GetString("persistentvolumeclaim")
  1311. if err != nil {
  1312. log.Debugf("ClusterDisks: pv usage data missing persistentvolumeclaim")
  1313. continue
  1314. }
  1315. claimNamespace, err := result.GetString("namespace")
  1316. if err != nil {
  1317. log.Debugf("ClusterDisks: pv usage data missing namespace")
  1318. continue
  1319. }
  1320. var volumeName string
  1321. for _, thatRes := range resPVCInfo {
  1322. thatCluster, err := thatRes.GetString(env.GetPromClusterLabel())
  1323. if err != nil {
  1324. thatCluster = env.GetClusterID()
  1325. }
  1326. thatVolumeName, err := thatRes.GetString("volumename")
  1327. if err != nil {
  1328. log.Debugf("ClusterDisks: pv claim data missing volumename")
  1329. continue
  1330. }
  1331. thatClaimName, err := thatRes.GetString("persistentvolumeclaim")
  1332. if err != nil {
  1333. log.Debugf("ClusterDisks: pv claim data missing persistentvolumeclaim")
  1334. continue
  1335. }
  1336. thatClaimNamespace, err := thatRes.GetString("namespace")
  1337. if err != nil {
  1338. log.Debugf("ClusterDisks: pv claim data missing namespace")
  1339. continue
  1340. }
  1341. if cluster == thatCluster && claimName == thatClaimName && claimNamespace == thatClaimNamespace {
  1342. volumeName = thatVolumeName
  1343. }
  1344. }
  1345. usage := result.Values[0].Value
  1346. key := DiskIdentifier{cluster, volumeName}
  1347. if _, ok := diskMap[key]; !ok {
  1348. diskMap[key] = &Disk{
  1349. Cluster: cluster,
  1350. Name: volumeName,
  1351. Breakdown: &ClusterCostsBreakdown{},
  1352. }
  1353. }
  1354. diskMap[key].BytesUsedAvgPtr = &usage
  1355. }
  1356. for _, result := range resPVUsedMax {
  1357. cluster, err := result.GetString(env.GetPromClusterLabel())
  1358. if err != nil {
  1359. cluster = env.GetClusterID()
  1360. }
  1361. claimName, err := result.GetString("persistentvolumeclaim")
  1362. if err != nil {
  1363. log.Debugf("ClusterDisks: pv usage data missing persistentvolumeclaim")
  1364. continue
  1365. }
  1366. claimNamespace, err := result.GetString("namespace")
  1367. if err != nil {
  1368. log.Debugf("ClusterDisks: pv usage data missing namespace")
  1369. continue
  1370. }
  1371. var volumeName string
  1372. for _, thatRes := range resPVCInfo {
  1373. thatCluster, err := thatRes.GetString(env.GetPromClusterLabel())
  1374. if err != nil {
  1375. thatCluster = env.GetClusterID()
  1376. }
  1377. thatVolumeName, err := thatRes.GetString("volumename")
  1378. if err != nil {
  1379. log.Debugf("ClusterDisks: pv claim data missing volumename")
  1380. continue
  1381. }
  1382. thatClaimName, err := thatRes.GetString("persistentvolumeclaim")
  1383. if err != nil {
  1384. log.Debugf("ClusterDisks: pv claim data missing persistentvolumeclaim")
  1385. continue
  1386. }
  1387. thatClaimNamespace, err := thatRes.GetString("namespace")
  1388. if err != nil {
  1389. log.Debugf("ClusterDisks: pv claim data missing namespace")
  1390. continue
  1391. }
  1392. if cluster == thatCluster && claimName == thatClaimName && claimNamespace == thatClaimNamespace {
  1393. volumeName = thatVolumeName
  1394. }
  1395. }
  1396. usage := result.Values[0].Value
  1397. key := DiskIdentifier{cluster, volumeName}
  1398. if _, ok := diskMap[key]; !ok {
  1399. diskMap[key] = &Disk{
  1400. Cluster: cluster,
  1401. Name: volumeName,
  1402. Breakdown: &ClusterCostsBreakdown{},
  1403. }
  1404. }
  1405. diskMap[key].BytesUsedMaxPtr = &usage
  1406. }
  1407. }
  1408. // filterOutLocalPVs removes local Persistent Volumes (PVs) from the given disk map.
  1409. // Local PVs are identified by the prefix "local-pv-" in their names, which is the
  1410. // convention used by sig-storage-local-static-provisioner.
  1411. //
  1412. // Parameters:
  1413. // - diskMap: A map of DiskIdentifier to Disk pointers, representing all PVs.
  1414. //
  1415. // Returns:
  1416. // - A new map of DiskIdentifier to Disk pointers, containing only non-local PVs.
  1417. func filterOutLocalPVs(diskMap map[DiskIdentifier]*Disk) map[DiskIdentifier]*Disk {
  1418. nonLocalPVDiskMap := map[DiskIdentifier]*Disk{}
  1419. for key, val := range diskMap {
  1420. if !strings.HasPrefix(key.Name, SIG_STORAGE_LOCAL_PROVISIONER_PREFIX) {
  1421. nonLocalPVDiskMap[key] = val
  1422. }
  1423. }
  1424. return nonLocalPVDiskMap
  1425. }