cluster.go 57 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577
  1. package costmodel
  2. import (
  3. "fmt"
  4. "net"
  5. "strconv"
  6. "time"
  7. "github.com/opencost/opencost/pkg/cloud/provider"
  8. prometheus "github.com/prometheus/client_golang/api"
  9. "golang.org/x/exp/slices"
  10. "github.com/opencost/opencost/core/pkg/log"
  11. "github.com/opencost/opencost/core/pkg/opencost"
  12. "github.com/opencost/opencost/core/pkg/util/timeutil"
  13. "github.com/opencost/opencost/pkg/cloud/models"
  14. "github.com/opencost/opencost/pkg/env"
  15. "github.com/opencost/opencost/pkg/prom"
  16. )
const (
	// queryClusterCores is a PromQL template (filled in via fmt.Sprintf —
	// presumably with a cluster filter, range duration, offset, and cluster
	// label; confirm against the callers) that prices cluster CPU and GPU
	// capacity: cores * hourly CPU cost * 730 hrs/month, plus GPU hourly
	// cost * 730.
	queryClusterCores = `sum(
avg(avg_over_time(kube_node_status_capacity_cpu_cores{%s}[%s] %s)) by (node, %s) * avg(avg_over_time(node_cpu_hourly_cost{%s}[%s] %s)) by (node, %s) * 730 +
avg(avg_over_time(node_gpu_hourly_cost{%s}[%s] %s)) by (node, %s) * 730
) by (%s)`

	// queryClusterRAM prices cluster memory capacity: bytes converted to GiB
	// times the hourly RAM cost, monthly-rated at 730 hrs/month.
	queryClusterRAM = `sum(
avg(avg_over_time(kube_node_status_capacity_memory_bytes{%s}[%s] %s)) by (node, %s) / 1024 / 1024 / 1024 * avg(avg_over_time(node_ram_hourly_cost{%s}[%s] %s)) by (node, %s) * 730
) by (%s)`

	// queryStorage prices persistent-volume storage: hourly PV cost * 730
	// times capacity in GiB.
	queryStorage = `sum(
avg(avg_over_time(pv_hourly_cost{%s}[%s] %s)) by (persistentvolume, %s) * 730
* avg(avg_over_time(kube_persistentvolume_capacity_bytes{%s}[%s] %s)) by (persistentvolume, %s) / 1024 / 1024 / 1024
) by (%s) %s`

	// queryTotal is the combined monthly-rated node cost plus PV storage cost.
	queryTotal = `sum(avg(node_total_hourly_cost{%s}) by (node, %s)) * 730 +
sum(
avg(avg_over_time(pv_hourly_cost{%s}[1h])) by (persistentvolume, %s) * 730
* avg(avg_over_time(kube_persistentvolume_capacity_bytes{%s}[1h])) by (persistentvolume, %s) / 1024 / 1024 / 1024
) by (%s) %s`

	// queryNodes is the monthly-rated total node cost alone.
	queryNodes = `sum(avg(node_total_hourly_cost{%s}) by (node, %s)) * 730 %s`
)

// maxLocalDiskSize caps the local-disk sizes (in GiB) considered by the disk
// analysis. AWS limits root disks to 100 Gi, and occasional metric errors in
// filesystem size should not contribute to large costs.
const maxLocalDiskSize = 200
// ClusterCosts represents cumulative and monthly cluster costs over a given
// duration. Costs are broken down by cores, memory, and storage.
type ClusterCosts struct {
	Start             *time.Time             `json:"startTime"`
	End               *time.Time             `json:"endTime"`
	CPUCumulative     float64                `json:"cpuCumulativeCost"`
	CPUMonthly        float64                `json:"cpuMonthlyCost"`
	CPUBreakdown      *ClusterCostsBreakdown `json:"cpuBreakdown"`
	GPUCumulative     float64                `json:"gpuCumulativeCost"`
	GPUMonthly        float64                `json:"gpuMonthlyCost"`
	RAMCumulative     float64                `json:"ramCumulativeCost"`
	RAMMonthly        float64                `json:"ramMonthlyCost"`
	RAMBreakdown      *ClusterCostsBreakdown `json:"ramBreakdown"`
	StorageCumulative float64                `json:"storageCumulativeCost"`
	StorageMonthly    float64                `json:"storageMonthlyCost"`
	StorageBreakdown  *ClusterCostsBreakdown `json:"storageBreakdown"`
	TotalCumulative   float64                `json:"totalCumulativeCost"`
	TotalMonthly      float64                `json:"totalMonthlyCost"`
	// DataMinutes is the number of minutes of underlying data; it is not
	// serialized to JSON.
	DataMinutes float64
}
// ClusterCostsBreakdown provides percentage-based breakdown of a resource by
// categories: user for user-space (i.e. non-system) usage, system, and idle.
// Fractions are expected to sum to 1.0 across the four categories.
type ClusterCostsBreakdown struct {
	Idle   float64 `json:"idle"`
	Other  float64 `json:"other"`
	System float64 `json:"system"`
	User   float64 `json:"user"`
}
  65. // NewClusterCostsFromCumulative takes cumulative cost data over a given time range, computes
  66. // the associated monthly rate data, and returns the Costs.
  67. func NewClusterCostsFromCumulative(cpu, gpu, ram, storage float64, window, offset time.Duration, dataHours float64) (*ClusterCosts, error) {
  68. start, end := timeutil.ParseTimeRange(window, offset)
  69. // If the number of hours is not given (i.e. is zero) compute one from the window and offset
  70. if dataHours == 0 {
  71. dataHours = end.Sub(start).Hours()
  72. }
  73. // Do not allow zero-length windows to prevent divide-by-zero issues
  74. if dataHours == 0 {
  75. return nil, fmt.Errorf("illegal time range: window %s, offset %s", window, offset)
  76. }
  77. cc := &ClusterCosts{
  78. Start: &start,
  79. End: &end,
  80. CPUCumulative: cpu,
  81. GPUCumulative: gpu,
  82. RAMCumulative: ram,
  83. StorageCumulative: storage,
  84. TotalCumulative: cpu + gpu + ram + storage,
  85. CPUMonthly: cpu / dataHours * (timeutil.HoursPerMonth),
  86. GPUMonthly: gpu / dataHours * (timeutil.HoursPerMonth),
  87. RAMMonthly: ram / dataHours * (timeutil.HoursPerMonth),
  88. StorageMonthly: storage / dataHours * (timeutil.HoursPerMonth),
  89. }
  90. cc.TotalMonthly = cc.CPUMonthly + cc.GPUMonthly + cc.RAMMonthly + cc.StorageMonthly
  91. return cc, nil
  92. }
// Disk is a record of a single storage asset: either a Kubernetes persistent
// volume or a node-local disk, with its observed capacity, cost, and cost
// breakdown over a window of time.
type Disk struct {
	Cluster        string
	Name           string
	ProviderID     string
	StorageClass   string
	VolumeName     string
	ClaimName      string
	ClaimNamespace string
	Cost           float64
	Bytes          float64

	// These two fields may not be available at all times because they rely on
	// a new set of metrics that may or may not be available. Thus, they must
	// be nilable to represent the complete absence of the data.
	//
	// In other words, nilability here lets us distinguish between
	// "metric is not available" and "metric is available but is 0".
	//
	// They end in "Ptr" to distinguish from an earlier version in order to
	// ensure that all usages are checked for nil.
	BytesUsedAvgPtr *float64
	BytesUsedMaxPtr *float64

	// Local marks node-local (root) disks, as opposed to persistent volumes.
	Local     bool
	Start     time.Time
	End       time.Time
	Minutes   float64
	Breakdown *ClusterCostsBreakdown
}
// DiskIdentifier uniquely identifies a disk by cluster and disk name.
type DiskIdentifier struct {
	Cluster string
	Name    string
}
  124. func ClusterDisks(client prometheus.Client, provider models.Provider, start, end time.Time) (map[DiskIdentifier]*Disk, error) {
  125. // Start from the time "end", querying backwards
  126. t := end
  127. // minsPerResolution determines accuracy and resource use for the following
  128. // queries. Smaller values (higher resolution) result in better accuracy,
  129. // but more expensive queries, and vice-a-versa.
  130. resolution := env.GetETLResolution()
  131. //Ensuring if ETL_RESOLUTION_SECONDS is less than 60s default it to 1m
  132. var minsPerResolution int
  133. if minsPerResolution = int(resolution.Minutes()); int(resolution.Minutes()) == 0 {
  134. minsPerResolution = 1
  135. log.DedupedWarningf(3, "ClusterDisks(): Configured ETL resolution (%d seconds) is below the 60 seconds threshold. Overriding with 1 minute.", int(resolution.Seconds()))
  136. }
  137. durStr := timeutil.DurationString(end.Sub(start))
  138. if durStr == "" {
  139. return nil, fmt.Errorf("illegal duration value for %s", opencost.NewClosedWindow(start, end))
  140. }
  141. // hourlyToCumulative is a scaling factor that, when multiplied by an hourly
  142. // value, converts it to a cumulative value; i.e.
  143. // [$/hr] * [min/res]*[hr/min] = [$/res]
  144. hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
  145. // TODO niko/assets how do we not hard-code this price?
  146. costPerGBHr := 0.04 / 730.0
  147. ctx := prom.NewNamedContext(client, prom.ClusterContextName)
  148. queryPVCost := fmt.Sprintf(`avg(avg_over_time(pv_hourly_cost{%s}[%s])) by (%s, persistentvolume,provider_id)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  149. queryPVSize := fmt.Sprintf(`avg(avg_over_time(kube_persistentvolume_capacity_bytes{%s}[%s])) by (%s, persistentvolume)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  150. queryActiveMins := fmt.Sprintf(`avg(kube_persistentvolume_capacity_bytes{%s}) by (%s, persistentvolume)[%s:%dm]`, env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, minsPerResolution)
  151. queryPVStorageClass := fmt.Sprintf(`avg(avg_over_time(kubecost_pv_info{%s}[%s])) by (%s, persistentvolume, storageclass)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  152. queryPVUsedAvg := fmt.Sprintf(`avg(avg_over_time(kubelet_volume_stats_used_bytes{%s}[%s])) by (%s, persistentvolumeclaim, namespace)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  153. queryPVUsedMax := fmt.Sprintf(`max(max_over_time(kubelet_volume_stats_used_bytes{%s}[%s])) by (%s, persistentvolumeclaim, namespace)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  154. queryPVCInfo := fmt.Sprintf(`avg(avg_over_time(kube_persistentvolumeclaim_info{%s}[%s])) by (%s, volumename, persistentvolumeclaim, namespace)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  155. queryLocalStorageCost := fmt.Sprintf(`sum_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/", %s}) by (instance, %s)[%s:%dm]) / 1024 / 1024 / 1024 * %f * %f`, env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, minsPerResolution, hourlyToCumulative, costPerGBHr)
  156. queryLocalStorageUsedCost := fmt.Sprintf(`sum_over_time(sum(container_fs_usage_bytes{device!="tmpfs", id="/", %s}) by (instance, %s)[%s:%dm]) / 1024 / 1024 / 1024 * %f * %f`, env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, minsPerResolution, hourlyToCumulative, costPerGBHr)
  157. queryLocalStorageUsedAvg := fmt.Sprintf(`avg(sum(avg_over_time(container_fs_usage_bytes{device!="tmpfs", id="/", %s}[%s])) by (instance, %s, job)) by (instance, %s)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  158. queryLocalStorageUsedMax := fmt.Sprintf(`max(sum(max_over_time(container_fs_usage_bytes{device!="tmpfs", id="/", %s}[%s])) by (instance, %s, job)) by (instance, %s)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  159. queryLocalStorageBytes := fmt.Sprintf(`avg_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/", %s}) by (instance, %s)[%s:%dm])`, env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, minsPerResolution)
  160. queryLocalActiveMins := fmt.Sprintf(`count(node_total_hourly_cost{%s}) by (%s, node)[%s:%dm]`, env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, minsPerResolution)
  161. resChPVCost := ctx.QueryAtTime(queryPVCost, t)
  162. resChPVSize := ctx.QueryAtTime(queryPVSize, t)
  163. resChActiveMins := ctx.QueryAtTime(queryActiveMins, t)
  164. resChPVStorageClass := ctx.QueryAtTime(queryPVStorageClass, t)
  165. resChPVUsedAvg := ctx.QueryAtTime(queryPVUsedAvg, t)
  166. resChPVUsedMax := ctx.QueryAtTime(queryPVUsedMax, t)
  167. resChPVCInfo := ctx.QueryAtTime(queryPVCInfo, t)
  168. resChLocalStorageCost := ctx.QueryAtTime(queryLocalStorageCost, t)
  169. resChLocalStorageUsedCost := ctx.QueryAtTime(queryLocalStorageUsedCost, t)
  170. resChLocalStoreageUsedAvg := ctx.QueryAtTime(queryLocalStorageUsedAvg, t)
  171. resChLocalStoreageUsedMax := ctx.QueryAtTime(queryLocalStorageUsedMax, t)
  172. resChLocalStorageBytes := ctx.QueryAtTime(queryLocalStorageBytes, t)
  173. resChLocalActiveMins := ctx.QueryAtTime(queryLocalActiveMins, t)
  174. resPVCost, _ := resChPVCost.Await()
  175. resPVSize, _ := resChPVSize.Await()
  176. resActiveMins, _ := resChActiveMins.Await()
  177. resPVStorageClass, _ := resChPVStorageClass.Await()
  178. resPVUsedAvg, _ := resChPVUsedAvg.Await()
  179. resPVUsedMax, _ := resChPVUsedMax.Await()
  180. resPVCInfo, _ := resChPVCInfo.Await()
  181. resLocalStorageCost, _ := resChLocalStorageCost.Await()
  182. resLocalStorageUsedCost, _ := resChLocalStorageUsedCost.Await()
  183. resLocalStorageUsedAvg, _ := resChLocalStoreageUsedAvg.Await()
  184. resLocalStorageUsedMax, _ := resChLocalStoreageUsedMax.Await()
  185. resLocalStorageBytes, _ := resChLocalStorageBytes.Await()
  186. resLocalActiveMins, _ := resChLocalActiveMins.Await()
  187. if ctx.HasErrors() {
  188. return nil, ctx.ErrorCollection()
  189. }
  190. diskMap := map[DiskIdentifier]*Disk{}
  191. for _, result := range resPVCInfo {
  192. cluster, err := result.GetString(env.GetPromClusterLabel())
  193. if err != nil {
  194. cluster = env.GetClusterID()
  195. }
  196. volumeName, err := result.GetString("volumename")
  197. if err != nil {
  198. log.Debugf("ClusterDisks: pv claim data missing volumename")
  199. continue
  200. }
  201. claimName, err := result.GetString("persistentvolumeclaim")
  202. if err != nil {
  203. log.Debugf("ClusterDisks: pv claim data missing persistentvolumeclaim")
  204. continue
  205. }
  206. claimNamespace, err := result.GetString("namespace")
  207. if err != nil {
  208. log.Debugf("ClusterDisks: pv claim data missing namespace")
  209. continue
  210. }
  211. key := DiskIdentifier{cluster, volumeName}
  212. if _, ok := diskMap[key]; !ok {
  213. diskMap[key] = &Disk{
  214. Cluster: cluster,
  215. Name: volumeName,
  216. Breakdown: &ClusterCostsBreakdown{},
  217. }
  218. }
  219. diskMap[key].VolumeName = volumeName
  220. diskMap[key].ClaimName = claimName
  221. diskMap[key].ClaimNamespace = claimNamespace
  222. }
  223. pvCosts(diskMap, resolution, resActiveMins, resPVSize, resPVCost, resPVUsedAvg, resPVUsedMax, resPVCInfo, provider, opencost.NewClosedWindow(start, end))
  224. for _, result := range resLocalStorageCost {
  225. cluster, err := result.GetString(env.GetPromClusterLabel())
  226. if err != nil {
  227. cluster = env.GetClusterID()
  228. }
  229. name, err := result.GetString("instance")
  230. if err != nil {
  231. log.Warnf("ClusterDisks: local storage data missing instance")
  232. continue
  233. }
  234. cost := result.Values[0].Value
  235. key := DiskIdentifier{cluster, name}
  236. if _, ok := diskMap[key]; !ok {
  237. diskMap[key] = &Disk{
  238. Cluster: cluster,
  239. Name: name,
  240. Breakdown: &ClusterCostsBreakdown{},
  241. Local: true,
  242. }
  243. }
  244. diskMap[key].Cost += cost
  245. //Assigning explicitly the storage class of local storage to local
  246. diskMap[key].StorageClass = opencost.LocalStorageClass
  247. }
  248. for _, result := range resLocalStorageUsedCost {
  249. cluster, err := result.GetString(env.GetPromClusterLabel())
  250. if err != nil {
  251. cluster = env.GetClusterID()
  252. }
  253. name, err := result.GetString("instance")
  254. if err != nil {
  255. log.Warnf("ClusterDisks: local storage usage data missing instance")
  256. continue
  257. }
  258. cost := result.Values[0].Value
  259. key := DiskIdentifier{cluster, name}
  260. if _, ok := diskMap[key]; !ok {
  261. diskMap[key] = &Disk{
  262. Cluster: cluster,
  263. Name: name,
  264. Breakdown: &ClusterCostsBreakdown{},
  265. Local: true,
  266. }
  267. }
  268. diskMap[key].Breakdown.System = cost / diskMap[key].Cost
  269. }
  270. for _, result := range resLocalStorageUsedAvg {
  271. cluster, err := result.GetString(env.GetPromClusterLabel())
  272. if err != nil {
  273. cluster = env.GetClusterID()
  274. }
  275. name, err := result.GetString("instance")
  276. if err != nil {
  277. log.Warnf("ClusterDisks: local storage data missing instance")
  278. continue
  279. }
  280. bytesAvg := result.Values[0].Value
  281. key := DiskIdentifier{cluster, name}
  282. if _, ok := diskMap[key]; !ok {
  283. diskMap[key] = &Disk{
  284. Cluster: cluster,
  285. Name: name,
  286. Breakdown: &ClusterCostsBreakdown{},
  287. Local: true,
  288. }
  289. }
  290. diskMap[key].BytesUsedAvgPtr = &bytesAvg
  291. }
  292. for _, result := range resLocalStorageUsedMax {
  293. cluster, err := result.GetString(env.GetPromClusterLabel())
  294. if err != nil {
  295. cluster = env.GetClusterID()
  296. }
  297. name, err := result.GetString("instance")
  298. if err != nil {
  299. log.Warnf("ClusterDisks: local storage data missing instance")
  300. continue
  301. }
  302. bytesMax := result.Values[0].Value
  303. key := DiskIdentifier{cluster, name}
  304. if _, ok := diskMap[key]; !ok {
  305. diskMap[key] = &Disk{
  306. Cluster: cluster,
  307. Name: name,
  308. Breakdown: &ClusterCostsBreakdown{},
  309. Local: true,
  310. }
  311. }
  312. diskMap[key].BytesUsedMaxPtr = &bytesMax
  313. }
  314. for _, result := range resLocalStorageBytes {
  315. cluster, err := result.GetString(env.GetPromClusterLabel())
  316. if err != nil {
  317. cluster = env.GetClusterID()
  318. }
  319. name, err := result.GetString("instance")
  320. if err != nil {
  321. log.Warnf("ClusterDisks: local storage data missing instance")
  322. continue
  323. }
  324. bytes := result.Values[0].Value
  325. key := DiskIdentifier{cluster, name}
  326. if _, ok := diskMap[key]; !ok {
  327. diskMap[key] = &Disk{
  328. Cluster: cluster,
  329. Name: name,
  330. Breakdown: &ClusterCostsBreakdown{},
  331. Local: true,
  332. }
  333. }
  334. diskMap[key].Bytes = bytes
  335. if bytes/1024/1024/1024 > maxLocalDiskSize {
  336. log.DedupedWarningf(5, "Deleting large root disk/localstorage disk from analysis")
  337. delete(diskMap, key)
  338. }
  339. }
  340. for _, result := range resLocalActiveMins {
  341. cluster, err := result.GetString(env.GetPromClusterLabel())
  342. if err != nil {
  343. cluster = env.GetClusterID()
  344. }
  345. name, err := result.GetString("node")
  346. if err != nil {
  347. log.DedupedWarningf(5, "ClusterDisks: local active mins data missing instance")
  348. continue
  349. }
  350. key := DiskIdentifier{cluster, name}
  351. if _, ok := diskMap[key]; !ok {
  352. log.DedupedWarningf(5, "ClusterDisks: local active mins for unidentified disk or disk deleted from analysis")
  353. continue
  354. }
  355. if len(result.Values) == 0 {
  356. continue
  357. }
  358. s := time.Unix(int64(result.Values[0].Timestamp), 0)
  359. e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
  360. mins := e.Sub(s).Minutes()
  361. // TODO niko/assets if mins >= threshold, interpolate for missing data?
  362. diskMap[key].End = e
  363. diskMap[key].Start = s
  364. diskMap[key].Minutes = mins
  365. }
  366. var unTracedDiskLogData []DiskIdentifier
  367. //Iterating through Persistent Volume given by custom metrics kubecost_pv_info and assign the storage class if known and __unknown__ if not populated.
  368. for _, result := range resPVStorageClass {
  369. cluster, err := result.GetString(env.GetPromClusterLabel())
  370. if err != nil {
  371. cluster = env.GetClusterID()
  372. }
  373. name, _ := result.GetString("persistentvolume")
  374. key := DiskIdentifier{cluster, name}
  375. if _, ok := diskMap[key]; !ok {
  376. if !slices.Contains(unTracedDiskLogData, key) {
  377. unTracedDiskLogData = append(unTracedDiskLogData, key)
  378. }
  379. continue
  380. }
  381. if len(result.Values) == 0 {
  382. continue
  383. }
  384. storageClass, err := result.GetString("storageclass")
  385. if err != nil {
  386. diskMap[key].StorageClass = opencost.UnknownStorageClass
  387. } else {
  388. diskMap[key].StorageClass = storageClass
  389. }
  390. }
  391. // Logging the unidentified disk information outside the loop
  392. for _, unIdentifiedDisk := range unTracedDiskLogData {
  393. log.Warnf("ClusterDisks: Cluster %s has Storage Class information for unidentified disk %s or disk deleted from analysis", unIdentifiedDisk.Cluster, unIdentifiedDisk.Name)
  394. }
  395. for _, disk := range diskMap {
  396. // Apply all remaining RAM to Idle
  397. disk.Breakdown.Idle = 1.0 - (disk.Breakdown.System + disk.Breakdown.Other + disk.Breakdown.User)
  398. // Set provider Id to the name for reconciliation
  399. if disk.ProviderID == "" {
  400. disk.ProviderID = disk.Name
  401. }
  402. }
  403. return diskMap, nil
  404. }
// NodeOverhead holds the fraction of a node's CPU and RAM capacity that is
// consumed as overhead (i.e. not allocatable to workloads).
type NodeOverhead struct {
	CpuOverheadFraction float64
	RamOverheadFraction float64
}
// Node is a record of a cluster node's capacity, hourly rates, and costs
// (CPU, GPU, RAM) over a window of time, with per-resource cost breakdowns.
type Node struct {
	Cluster      string
	Name         string
	ProviderID   string
	NodeType     string
	CPUCost      float64
	CPUCores     float64
	GPUCost      float64
	GPUCount     float64
	RAMCost      float64
	RAMBytes     float64
	Discount     float64
	Preemptible  bool
	CPUBreakdown *ClusterCostsBreakdown
	RAMBreakdown *ClusterCostsBreakdown
	Start        time.Time
	End          time.Time
	Minutes      float64
	Labels       map[string]string
	// Per-unit hourly rates used to derive the cost fields above.
	CostPerCPUHr    float64
	CostPerRAMGiBHr float64
	CostPerGPUHr    float64
	Overhead        *NodeOverhead
}
// GKE lies about the number of cores e2 nodes have. This table
// contains a mapping from node type -> actual CPU cores
// for those cases.
var partialCPUMap = map[string]float64{
	"e2-micro":  0.25,
	"e2-small":  0.5,
	"e2-medium": 1.0,
}
// NodeIdentifier uniquely identifies a node by cluster, node name, and
// cloud provider ID.
type NodeIdentifier struct {
	Cluster    string
	Name       string
	ProviderID string
}

// nodeIdentifierNoProviderID identifies a node by cluster and name only,
// for metrics that do not carry a provider_id label.
type nodeIdentifierNoProviderID struct {
	Cluster string
	Name    string
}
  450. func costTimesMinuteAndCount(activeDataMap map[NodeIdentifier]activeData, costMap map[NodeIdentifier]float64, resourceCountMap map[nodeIdentifierNoProviderID]float64) {
  451. for k, v := range activeDataMap {
  452. keyNon := nodeIdentifierNoProviderID{
  453. Cluster: k.Cluster,
  454. Name: k.Name,
  455. }
  456. if cost, ok := costMap[k]; ok {
  457. minutes := v.minutes
  458. count := 1.0
  459. if c, ok := resourceCountMap[keyNon]; ok {
  460. count = c
  461. }
  462. costMap[k] = cost * (minutes / 60) * count
  463. }
  464. }
  465. }
  466. func costTimesMinute(activeDataMap map[NodeIdentifier]activeData, costMap map[NodeIdentifier]float64) {
  467. for k, v := range activeDataMap {
  468. if cost, ok := costMap[k]; ok {
  469. minutes := v.minutes
  470. costMap[k] = cost * (minutes / 60)
  471. }
  472. }
  473. }
  474. func ClusterNodes(cp models.Provider, client prometheus.Client, start, end time.Time) (map[NodeIdentifier]*Node, error) {
  475. // Start from the time "end", querying backwards
  476. t := end
  477. // minsPerResolution determines accuracy and resource use for the following
  478. // queries. Smaller values (higher resolution) result in better accuracy,
  479. // but more expensive queries, and vice-a-versa.
  480. resolution := env.GetETLResolution()
  481. //Ensuring if ETL_RESOLUTION_SECONDS is less than 60s default it to 1m
  482. var minsPerResolution int
  483. if minsPerResolution = int(resolution.Minutes()); int(resolution.Minutes()) == 0 {
  484. minsPerResolution = 1
  485. log.DedupedWarningf(3, "ClusterNodes(): Configured ETL resolution (%d seconds) is below the 60 seconds threshold. Overriding with 1 minute.", int(resolution.Seconds()))
  486. }
  487. durStr := timeutil.DurationString(end.Sub(start))
  488. if durStr == "" {
  489. return nil, fmt.Errorf("illegal duration value for %s", opencost.NewClosedWindow(start, end))
  490. }
  491. requiredCtx := prom.NewNamedContext(client, prom.ClusterContextName)
  492. optionalCtx := prom.NewNamedContext(client, prom.ClusterOptionalContextName)
  493. queryNodeCPUHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_cpu_hourly_cost{%s}[%s])) by (%s, node, instance_type, provider_id)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  494. queryNodeCPUCoresCapacity := fmt.Sprintf(`avg(avg_over_time(kube_node_status_capacity_cpu_cores{%s}[%s])) by (%s, node)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  495. queryNodeCPUCoresAllocatable := fmt.Sprintf(`avg(avg_over_time(kube_node_status_allocatable_cpu_cores{%s}[%s])) by (%s, node)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  496. queryNodeRAMHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_ram_hourly_cost{%s}[%s])) by (%s, node, instance_type, provider_id) / 1024 / 1024 / 1024`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  497. queryNodeRAMBytesCapacity := fmt.Sprintf(`avg(avg_over_time(kube_node_status_capacity_memory_bytes{%s}[%s])) by (%s, node)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  498. queryNodeRAMBytesAllocatable := fmt.Sprintf(`avg(avg_over_time(kube_node_status_allocatable_memory_bytes{%s}[%s])) by (%s, node)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  499. queryNodeGPUCount := fmt.Sprintf(`avg(avg_over_time(node_gpu_count{%s}[%s])) by (%s, node, provider_id)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  500. queryNodeGPUHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_gpu_hourly_cost{%s}[%s])) by (%s, node, instance_type, provider_id)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  501. queryNodeCPUModeTotal := fmt.Sprintf(`sum(rate(node_cpu_seconds_total{%s}[%s:%dm])) by (kubernetes_node, %s, mode)`, env.GetPromClusterFilter(), durStr, minsPerResolution, env.GetPromClusterLabel())
  502. queryNodeRAMSystemPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace="kube-system", %s}[%s:%dm])) by (instance, %s) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes{%s}[%s:%dm])) by (node, %s), "instance", "$1", "node", "(.*)")) by (instance, %s)`, env.GetPromClusterFilter(), durStr, minsPerResolution, env.GetPromClusterLabel(), env.GetPromClusterFilter(), durStr, minsPerResolution, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  503. queryNodeRAMUserPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace!="kube-system", %s}[%s:%dm])) by (instance, %s) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes{%s}[%s:%dm])) by (node, %s), "instance", "$1", "node", "(.*)")) by (instance, %s)`, env.GetPromClusterFilter(), durStr, minsPerResolution, env.GetPromClusterLabel(), env.GetPromClusterFilter(), durStr, minsPerResolution, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  504. queryActiveMins := fmt.Sprintf(`avg(node_total_hourly_cost{%s}) by (node, %s, provider_id)[%s:%dm]`, env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, minsPerResolution)
  505. queryIsSpot := fmt.Sprintf(`avg_over_time(kubecost_node_is_spot{%s}[%s:%dm])`, env.GetPromClusterFilter(), durStr, minsPerResolution)
  506. queryLabels := fmt.Sprintf(`count_over_time(kube_node_labels{%s}[%s:%dm])`, env.GetPromClusterFilter(), durStr, minsPerResolution)
  507. // Return errors if these fail
  508. resChNodeCPUHourlyCost := requiredCtx.QueryAtTime(queryNodeCPUHourlyCost, t)
  509. resChNodeCPUCoresCapacity := requiredCtx.QueryAtTime(queryNodeCPUCoresCapacity, t)
  510. resChNodeCPUCoresAllocatable := requiredCtx.QueryAtTime(queryNodeCPUCoresAllocatable, t)
  511. resChNodeRAMHourlyCost := requiredCtx.QueryAtTime(queryNodeRAMHourlyCost, t)
  512. resChNodeRAMBytesCapacity := requiredCtx.QueryAtTime(queryNodeRAMBytesCapacity, t)
  513. resChNodeRAMBytesAllocatable := requiredCtx.QueryAtTime(queryNodeRAMBytesAllocatable, t)
  514. resChNodeGPUCount := requiredCtx.QueryAtTime(queryNodeGPUCount, t)
  515. resChNodeGPUHourlyCost := requiredCtx.QueryAtTime(queryNodeGPUHourlyCost, t)
  516. resChActiveMins := requiredCtx.QueryAtTime(queryActiveMins, t)
  517. resChIsSpot := requiredCtx.QueryAtTime(queryIsSpot, t)
  518. // Do not return errors if these fail, but log warnings
  519. resChNodeCPUModeTotal := optionalCtx.QueryAtTime(queryNodeCPUModeTotal, t)
  520. resChNodeRAMSystemPct := optionalCtx.QueryAtTime(queryNodeRAMSystemPct, t)
  521. resChNodeRAMUserPct := optionalCtx.QueryAtTime(queryNodeRAMUserPct, t)
  522. resChLabels := optionalCtx.QueryAtTime(queryLabels, t)
  523. resNodeCPUHourlyCost, _ := resChNodeCPUHourlyCost.Await()
  524. resNodeCPUCoresCapacity, _ := resChNodeCPUCoresCapacity.Await()
  525. resNodeCPUCoresAllocatable, _ := resChNodeCPUCoresAllocatable.Await()
  526. resNodeGPUCount, _ := resChNodeGPUCount.Await()
  527. resNodeGPUHourlyCost, _ := resChNodeGPUHourlyCost.Await()
  528. resNodeRAMHourlyCost, _ := resChNodeRAMHourlyCost.Await()
  529. resNodeRAMBytesCapacity, _ := resChNodeRAMBytesCapacity.Await()
  530. resNodeRAMBytesAllocatable, _ := resChNodeRAMBytesAllocatable.Await()
  531. resIsSpot, _ := resChIsSpot.Await()
  532. resNodeCPUModeTotal, _ := resChNodeCPUModeTotal.Await()
  533. resNodeRAMSystemPct, _ := resChNodeRAMSystemPct.Await()
  534. resNodeRAMUserPct, _ := resChNodeRAMUserPct.Await()
  535. resActiveMins, _ := resChActiveMins.Await()
  536. resLabels, _ := resChLabels.Await()
  537. if optionalCtx.HasErrors() {
  538. for _, err := range optionalCtx.Errors() {
  539. log.Warnf("ClusterNodes: %s", err)
  540. }
  541. }
  542. if requiredCtx.HasErrors() {
  543. for _, err := range requiredCtx.Errors() {
  544. log.Errorf("ClusterNodes: %s", err)
  545. }
  546. return nil, requiredCtx.ErrorCollection()
  547. }
  548. activeDataMap := buildActiveDataMap(resActiveMins, resolution, opencost.NewClosedWindow(start, end))
  549. gpuCountMap := buildGPUCountMap(resNodeGPUCount)
  550. preemptibleMap := buildPreemptibleMap(resIsSpot)
  551. cpuCostMap, clusterAndNameToType1 := buildCPUCostMap(resNodeCPUHourlyCost, cp, preemptibleMap)
  552. ramCostMap, clusterAndNameToType2 := buildRAMCostMap(resNodeRAMHourlyCost, cp, preemptibleMap)
  553. gpuCostMap, clusterAndNameToType3 := buildGPUCostMap(resNodeGPUHourlyCost, gpuCountMap, cp, preemptibleMap)
  554. clusterAndNameToTypeIntermediate := mergeTypeMaps(clusterAndNameToType1, clusterAndNameToType2)
  555. clusterAndNameToType := mergeTypeMaps(clusterAndNameToTypeIntermediate, clusterAndNameToType3)
  556. cpuCoresCapacityMap := buildCPUCoresMap(resNodeCPUCoresCapacity)
  557. ramBytesCapacityMap := buildRAMBytesMap(resNodeRAMBytesCapacity)
  558. cpuCoresAllocatableMap := buildCPUCoresMap(resNodeCPUCoresAllocatable)
  559. ramBytesAllocatableMap := buildRAMBytesMap(resNodeRAMBytesAllocatable)
  560. overheadMap := buildOverheadMap(ramBytesCapacityMap, ramBytesAllocatableMap, cpuCoresCapacityMap, cpuCoresAllocatableMap)
  561. ramUserPctMap := buildRAMUserPctMap(resNodeRAMUserPct)
  562. ramSystemPctMap := buildRAMSystemPctMap(resNodeRAMSystemPct)
  563. cpuBreakdownMap := buildCPUBreakdownMap(resNodeCPUModeTotal)
  564. labelsMap := buildLabelsMap(resLabels)
  565. costTimesMinuteAndCount(activeDataMap, cpuCostMap, cpuCoresCapacityMap)
  566. costTimesMinuteAndCount(activeDataMap, ramCostMap, ramBytesCapacityMap)
  567. costTimesMinute(activeDataMap, gpuCostMap) // there's no need to do a weird "nodeIdentifierNoProviderID" type match since gpuCounts have a providerID
  568. nodeMap := buildNodeMap(
  569. cpuCostMap, ramCostMap, gpuCostMap, gpuCountMap,
  570. cpuCoresCapacityMap, ramBytesCapacityMap, ramUserPctMap,
  571. ramSystemPctMap,
  572. cpuBreakdownMap,
  573. activeDataMap,
  574. preemptibleMap,
  575. labelsMap,
  576. clusterAndNameToType,
  577. resolution,
  578. overheadMap,
  579. )
  580. c, err := cp.GetConfig()
  581. if err != nil {
  582. return nil, err
  583. }
  584. discount, err := ParsePercentString(c.Discount)
  585. if err != nil {
  586. return nil, err
  587. }
  588. negotiatedDiscount, err := ParsePercentString(c.NegotiatedDiscount)
  589. if err != nil {
  590. return nil, err
  591. }
  592. for _, node := range nodeMap {
  593. // TODO take GKE Reserved Instances into account
  594. node.Discount = cp.CombinedDiscountForNode(node.NodeType, node.Preemptible, discount, negotiatedDiscount)
  595. // Apply all remaining resources to Idle
  596. node.CPUBreakdown.Idle = 1.0 - (node.CPUBreakdown.System + node.CPUBreakdown.Other + node.CPUBreakdown.User)
  597. node.RAMBreakdown.Idle = 1.0 - (node.RAMBreakdown.System + node.RAMBreakdown.Other + node.RAMBreakdown.User)
  598. }
  599. return nodeMap, nil
  600. }
// LoadBalancerIdentifier uniquely identifies a load balancer by its cluster,
// namespace, and service name. Used as the key for load balancer maps.
type LoadBalancerIdentifier struct {
	Cluster   string
	Namespace string
	Name      string
}
// LoadBalancer describes the cost and active window of a single load balancer
// observed over a query window.
type LoadBalancer struct {
	Cluster    string
	Namespace  string
	Name       string
	ProviderID string
	// Cost is the cumulative cost over the active window (price/hr * hours).
	Cost  float64
	Start time.Time
	End   time.Time
	// Minutes is the span between the first and last observed data points.
	Minutes float64
	// Private reports whether Ip parses as a private address.
	Private bool
	Ip      string
}
  618. func ClusterLoadBalancers(client prometheus.Client, start, end time.Time) (map[LoadBalancerIdentifier]*LoadBalancer, error) {
  619. // Start from the time "end", querying backwards
  620. t := end
  621. // minsPerResolution determines accuracy and resource use for the following
  622. // queries. Smaller values (higher resolution) result in better accuracy,
  623. // but more expensive queries, and vice-a-versa.
  624. resolution := env.GetETLResolution()
  625. //Ensuring if ETL_RESOLUTION_SECONDS is less than 60s default it to 1m
  626. var minsPerResolution int
  627. if minsPerResolution = int(resolution.Minutes()); int(resolution.Minutes()) == 0 {
  628. minsPerResolution = 1
  629. log.DedupedWarningf(3, "ClusterLoadBalancers(): Configured ETL resolution (%d seconds) is below the 60 seconds threshold. Overriding with 1 minute.", int(resolution.Seconds()))
  630. }
  631. // Query for the duration between start and end
  632. durStr := timeutil.DurationString(end.Sub(start))
  633. if durStr == "" {
  634. return nil, fmt.Errorf("illegal duration value for %s", opencost.NewClosedWindow(start, end))
  635. }
  636. ctx := prom.NewNamedContext(client, prom.ClusterContextName)
  637. queryLBCost := fmt.Sprintf(`avg(avg_over_time(kubecost_load_balancer_cost{%s}[%s])) by (namespace, service_name, %s, ingress_ip)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  638. queryActiveMins := fmt.Sprintf(`avg(kubecost_load_balancer_cost{%s}) by (namespace, service_name, %s, ingress_ip)[%s:%dm]`, env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, minsPerResolution)
  639. resChLBCost := ctx.QueryAtTime(queryLBCost, t)
  640. resChActiveMins := ctx.QueryAtTime(queryActiveMins, t)
  641. resLBCost, _ := resChLBCost.Await()
  642. resActiveMins, _ := resChActiveMins.Await()
  643. if ctx.HasErrors() {
  644. return nil, ctx.ErrorCollection()
  645. }
  646. loadBalancerMap := make(map[LoadBalancerIdentifier]*LoadBalancer, len(resActiveMins))
  647. for _, result := range resActiveMins {
  648. cluster, err := result.GetString(env.GetPromClusterLabel())
  649. if err != nil {
  650. cluster = env.GetClusterID()
  651. }
  652. namespace, err := result.GetString("namespace")
  653. if err != nil {
  654. log.Warnf("ClusterLoadBalancers: LB cost data missing namespace")
  655. continue
  656. }
  657. name, err := result.GetString("service_name")
  658. if err != nil {
  659. log.Warnf("ClusterLoadBalancers: LB cost data missing service_name")
  660. continue
  661. }
  662. providerID, err := result.GetString("ingress_ip")
  663. if err != nil {
  664. log.DedupedWarningf(5, "ClusterLoadBalancers: LB cost data missing ingress_ip")
  665. providerID = ""
  666. }
  667. key := LoadBalancerIdentifier{
  668. Cluster: cluster,
  669. Namespace: namespace,
  670. Name: name,
  671. }
  672. // Skip if there are no data
  673. if len(result.Values) == 0 {
  674. continue
  675. }
  676. // Add load balancer to the set of load balancers
  677. if _, ok := loadBalancerMap[key]; !ok {
  678. loadBalancerMap[key] = &LoadBalancer{
  679. Cluster: cluster,
  680. Namespace: namespace,
  681. Name: fmt.Sprintf("%s/%s", namespace, name), // TODO:ETL this is kept for backwards-compatibility, but not good
  682. ProviderID: provider.ParseLBID(providerID),
  683. }
  684. }
  685. // Append start, end, and minutes. This should come before all other data.
  686. s := time.Unix(int64(result.Values[0].Timestamp), 0)
  687. e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
  688. loadBalancerMap[key].Start = s
  689. loadBalancerMap[key].End = e
  690. loadBalancerMap[key].Minutes = e.Sub(s).Minutes()
  691. // Fill in Provider ID if it is available and missing in the loadBalancerMap
  692. // Prevents there from being a duplicate LoadBalancers on the same day
  693. if providerID != "" && loadBalancerMap[key].ProviderID == "" {
  694. loadBalancerMap[key].ProviderID = providerID
  695. }
  696. }
  697. for _, result := range resLBCost {
  698. cluster, err := result.GetString(env.GetPromClusterLabel())
  699. if err != nil {
  700. cluster = env.GetClusterID()
  701. }
  702. namespace, err := result.GetString("namespace")
  703. if err != nil {
  704. log.Warnf("ClusterLoadBalancers: LB cost data missing namespace")
  705. continue
  706. }
  707. name, err := result.GetString("service_name")
  708. if err != nil {
  709. log.Warnf("ClusterLoadBalancers: LB cost data missing service_name")
  710. continue
  711. }
  712. providerID, err := result.GetString("ingress_ip")
  713. if err != nil {
  714. log.DedupedWarningf(5, "ClusterLoadBalancers: LB cost data missing ingress_ip")
  715. // only update asset cost when an actual IP was returned
  716. continue
  717. }
  718. key := LoadBalancerIdentifier{
  719. Cluster: cluster,
  720. Namespace: namespace,
  721. Name: name,
  722. }
  723. // Apply cost as price-per-hour * hours
  724. if lb, ok := loadBalancerMap[key]; ok {
  725. lbPricePerHr := result.Values[0].Value
  726. // interpolate any missing data
  727. resultMins := lb.Minutes
  728. if resultMins > 0 {
  729. scaleFactor := (resultMins + resolution.Minutes()) / resultMins
  730. hrs := (lb.Minutes * scaleFactor) / 60.0
  731. lb.Cost += lbPricePerHr * hrs
  732. } else {
  733. log.DedupedWarningf(20, "ClusterLoadBalancers: found zero minutes for key: %v", key)
  734. }
  735. if lb.Ip != "" && lb.Ip != providerID {
  736. log.DedupedWarningf(5, "ClusterLoadBalancers: multiple IPs per load balancer not supported, using most recent IP")
  737. }
  738. lb.Ip = providerID
  739. lb.Private = privateIPCheck(providerID)
  740. } else {
  741. log.DedupedWarningf(20, "ClusterLoadBalancers: found minutes for key that does not exist: %v", key)
  742. }
  743. }
  744. return loadBalancerMap, nil
  745. }
  746. // Check if an ip is private.
  747. func privateIPCheck(ip string) bool {
  748. ipAddress := net.ParseIP(ip)
  749. return ipAddress.IsPrivate()
  750. }
  751. // ComputeClusterCosts gives the cumulative and monthly-rate cluster costs over a window of time for all clusters.
  752. func (a *Accesses) ComputeClusterCosts(client prometheus.Client, provider models.Provider, window, offset time.Duration, withBreakdown bool) (map[string]*ClusterCosts, error) {
  753. if window < 10*time.Minute {
  754. return nil, fmt.Errorf("minimum window of 10m required; got %s", window)
  755. }
  756. // Compute number of minutes in the full interval, for use interpolating missed scrapes or scaling missing data
  757. start, end := timeutil.ParseTimeRange(window, offset)
  758. mins := end.Sub(start).Minutes()
  759. // minsPerResolution determines accuracy and resource use for the following
  760. // queries. Smaller values (higher resolution) result in better accuracy,
  761. // but more expensive queries, and vice-a-versa.
  762. resolution := env.GetETLResolution()
  763. //Ensuring if ETL_RESOLUTION_SECONDS is less than 60s default it to 1m
  764. var minsPerResolution int
  765. if minsPerResolution = int(resolution.Minutes()); int(resolution.Minutes()) < 1 {
  766. minsPerResolution = 1
  767. log.DedupedWarningf(3, "ComputeClusterCosts(): Configured ETL resolution (%d seconds) is below the 60 seconds threshold. Overriding with 1 minute.", int(resolution.Seconds()))
  768. }
  769. windowStr := timeutil.DurationString(window)
  770. // hourlyToCumulative is a scaling factor that, when multiplied by an hourly
  771. // value, converts it to a cumulative value; i.e.
  772. // [$/hr] * [min/res]*[hr/min] = [$/res]
  773. hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
  774. const fmtQueryDataCount = `
  775. count_over_time(sum(kube_node_status_capacity_cpu_cores{%s}) by (%s)[%s:%dm]%s) * %d
  776. `
  777. const fmtQueryTotalGPU = `
  778. sum(
  779. sum_over_time(node_gpu_hourly_cost{%s}[%s:%dm]%s) * %f
  780. ) by (%s)
  781. `
  782. const fmtQueryTotalCPU = `
  783. sum(
  784. sum_over_time(avg(kube_node_status_capacity_cpu_cores{%s}) by (node, %s)[%s:%dm]%s) *
  785. avg(avg_over_time(node_cpu_hourly_cost{%s}[%s:%dm]%s)) by (node, %s) * %f
  786. ) by (%s)
  787. `
  788. const fmtQueryTotalRAM = `
  789. sum(
  790. sum_over_time(avg(kube_node_status_capacity_memory_bytes{%s}) by (node, %s)[%s:%dm]%s) / 1024 / 1024 / 1024 *
  791. avg(avg_over_time(node_ram_hourly_cost{%s}[%s:%dm]%s)) by (node, %s) * %f
  792. ) by (%s)
  793. `
  794. const fmtQueryTotalStorage = `
  795. sum(
  796. sum_over_time(avg(kube_persistentvolume_capacity_bytes{%s}) by (persistentvolume, %s)[%s:%dm]%s) / 1024 / 1024 / 1024 *
  797. avg(avg_over_time(pv_hourly_cost{%s}[%s:%dm]%s)) by (persistentvolume, %s) * %f
  798. ) by (%s)
  799. `
  800. const fmtQueryCPUModePct = `
  801. sum(rate(node_cpu_seconds_total{%s}[%s]%s)) by (%s, mode) / ignoring(mode)
  802. group_left sum(rate(node_cpu_seconds_total{%s}[%s]%s)) by (%s)
  803. `
  804. const fmtQueryRAMSystemPct = `
  805. sum(sum_over_time(container_memory_usage_bytes{container_name!="",namespace="kube-system", %s}[%s:%dm]%s)) by (%s)
  806. / sum(sum_over_time(kube_node_status_capacity_memory_bytes{%s}[%s:%dm]%s)) by (%s)
  807. `
  808. const fmtQueryRAMUserPct = `
  809. sum(sum_over_time(kubecost_cluster_memory_working_set_bytes{%s}[%s:%dm]%s)) by (%s)
  810. / sum(sum_over_time(kube_node_status_capacity_memory_bytes{%s}[%s:%dm]%s)) by (%s)
  811. `
  812. // TODO niko/clustercost metric "kubelet_volume_stats_used_bytes" was deprecated in 1.12, then seems to have come back in 1.17
  813. // const fmtQueryPVStorageUsePct = `(sum(kube_persistentvolumeclaim_info) by (persistentvolumeclaim, storageclass,namespace) + on (persistentvolumeclaim,namespace)
  814. // group_right(storageclass) sum(kubelet_volume_stats_used_bytes) by (persistentvolumeclaim,namespace))`
  815. queryUsedLocalStorage := provider.GetLocalStorageQuery(window, offset, false, true)
  816. queryTotalLocalStorage := provider.GetLocalStorageQuery(window, offset, false, false)
  817. if queryTotalLocalStorage != "" {
  818. queryTotalLocalStorage = fmt.Sprintf(" + %s", queryTotalLocalStorage)
  819. }
  820. fmtOffset := timeutil.DurationToPromOffsetString(offset)
  821. queryDataCount := fmt.Sprintf(fmtQueryDataCount, env.GetPromClusterFilter(), env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, minsPerResolution)
  822. queryTotalGPU := fmt.Sprintf(fmtQueryTotalGPU, env.GetPromClusterFilter(), windowStr, minsPerResolution, fmtOffset, hourlyToCumulative, env.GetPromClusterLabel())
  823. queryTotalCPU := fmt.Sprintf(fmtQueryTotalCPU, env.GetPromClusterFilter(), env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterFilter(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), hourlyToCumulative, env.GetPromClusterLabel())
  824. queryTotalRAM := fmt.Sprintf(fmtQueryTotalRAM, env.GetPromClusterFilter(), env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterFilter(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), hourlyToCumulative, env.GetPromClusterLabel())
  825. queryTotalStorage := fmt.Sprintf(fmtQueryTotalStorage, env.GetPromClusterFilter(), env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterFilter(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), hourlyToCumulative, env.GetPromClusterLabel())
  826. ctx := prom.NewNamedContext(client, prom.ClusterContextName)
  827. resChs := ctx.QueryAll(
  828. queryDataCount,
  829. queryTotalGPU,
  830. queryTotalCPU,
  831. queryTotalRAM,
  832. queryTotalStorage,
  833. )
  834. // Only submit the local storage query if it is valid. Otherwise Prometheus
  835. // will return errors. Always append something to resChs, regardless, to
  836. // maintain indexing.
  837. if queryTotalLocalStorage != "" {
  838. resChs = append(resChs, ctx.Query(queryTotalLocalStorage))
  839. } else {
  840. resChs = append(resChs, nil)
  841. }
  842. if withBreakdown {
  843. queryCPUModePct := fmt.Sprintf(fmtQueryCPUModePct, env.GetPromClusterFilter(), windowStr, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterFilter(), windowStr, fmtOffset, env.GetPromClusterLabel())
  844. queryRAMSystemPct := fmt.Sprintf(fmtQueryRAMSystemPct, env.GetPromClusterFilter(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterFilter(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel())
  845. queryRAMUserPct := fmt.Sprintf(fmtQueryRAMUserPct, env.GetPromClusterFilter(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterFilter(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel())
  846. bdResChs := ctx.QueryAll(
  847. queryCPUModePct,
  848. queryRAMSystemPct,
  849. queryRAMUserPct,
  850. )
  851. // Only submit the local storage query if it is valid. Otherwise Prometheus
  852. // will return errors. Always append something to resChs, regardless, to
  853. // maintain indexing.
  854. if queryUsedLocalStorage != "" {
  855. bdResChs = append(bdResChs, ctx.Query(queryUsedLocalStorage))
  856. } else {
  857. bdResChs = append(bdResChs, nil)
  858. }
  859. resChs = append(resChs, bdResChs...)
  860. }
  861. resDataCount, _ := resChs[0].Await()
  862. resTotalGPU, _ := resChs[1].Await()
  863. resTotalCPU, _ := resChs[2].Await()
  864. resTotalRAM, _ := resChs[3].Await()
  865. resTotalStorage, _ := resChs[4].Await()
  866. if ctx.HasErrors() {
  867. return nil, ctx.ErrorCollection()
  868. }
  869. defaultClusterID := env.GetClusterID()
  870. dataMinsByCluster := map[string]float64{}
  871. for _, result := range resDataCount {
  872. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  873. if clusterID == "" {
  874. clusterID = defaultClusterID
  875. }
  876. dataMins := mins
  877. if len(result.Values) > 0 {
  878. dataMins = result.Values[0].Value
  879. } else {
  880. log.Warnf("Cluster cost data count returned no results for cluster %s", clusterID)
  881. }
  882. dataMinsByCluster[clusterID] = dataMins
  883. }
  884. // Determine combined discount
  885. discount, customDiscount := 0.0, 0.0
  886. c, err := a.CloudProvider.GetConfig()
  887. if err == nil {
  888. discount, err = ParsePercentString(c.Discount)
  889. if err != nil {
  890. discount = 0.0
  891. }
  892. customDiscount, err = ParsePercentString(c.NegotiatedDiscount)
  893. if err != nil {
  894. customDiscount = 0.0
  895. }
  896. }
  897. // Intermediate structure storing mapping of [clusterID][type ∈ {cpu, ram, storage, total}]=cost
  898. costData := make(map[string]map[string]float64)
  899. // Helper function to iterate over Prom query results, parsing the raw values into
  900. // the intermediate costData structure.
  901. setCostsFromResults := func(costData map[string]map[string]float64, results []*prom.QueryResult, name string, discount float64, customDiscount float64) {
  902. for _, result := range results {
  903. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  904. if clusterID == "" {
  905. clusterID = defaultClusterID
  906. }
  907. if _, ok := costData[clusterID]; !ok {
  908. costData[clusterID] = map[string]float64{}
  909. }
  910. if len(result.Values) > 0 {
  911. costData[clusterID][name] += result.Values[0].Value * (1.0 - discount) * (1.0 - customDiscount)
  912. costData[clusterID]["total"] += result.Values[0].Value * (1.0 - discount) * (1.0 - customDiscount)
  913. }
  914. }
  915. }
  916. // Apply both sustained use and custom discounts to RAM and CPU
  917. setCostsFromResults(costData, resTotalCPU, "cpu", discount, customDiscount)
  918. setCostsFromResults(costData, resTotalRAM, "ram", discount, customDiscount)
  919. // Apply only custom discount to GPU and storage
  920. setCostsFromResults(costData, resTotalGPU, "gpu", 0.0, customDiscount)
  921. setCostsFromResults(costData, resTotalStorage, "storage", 0.0, customDiscount)
  922. if queryTotalLocalStorage != "" {
  923. resTotalLocalStorage, err := resChs[5].Await()
  924. if err != nil {
  925. return nil, err
  926. }
  927. setCostsFromResults(costData, resTotalLocalStorage, "localstorage", 0.0, customDiscount)
  928. }
  929. cpuBreakdownMap := map[string]*ClusterCostsBreakdown{}
  930. ramBreakdownMap := map[string]*ClusterCostsBreakdown{}
  931. pvUsedCostMap := map[string]float64{}
  932. if withBreakdown {
  933. resCPUModePct, _ := resChs[6].Await()
  934. resRAMSystemPct, _ := resChs[7].Await()
  935. resRAMUserPct, _ := resChs[8].Await()
  936. if ctx.HasErrors() {
  937. return nil, ctx.ErrorCollection()
  938. }
  939. for _, result := range resCPUModePct {
  940. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  941. if clusterID == "" {
  942. clusterID = defaultClusterID
  943. }
  944. if _, ok := cpuBreakdownMap[clusterID]; !ok {
  945. cpuBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
  946. }
  947. cpuBD := cpuBreakdownMap[clusterID]
  948. mode, err := result.GetString("mode")
  949. if err != nil {
  950. log.Warnf("ComputeClusterCosts: unable to read CPU mode: %s", err)
  951. mode = "other"
  952. }
  953. switch mode {
  954. case "idle":
  955. cpuBD.Idle += result.Values[0].Value
  956. case "system":
  957. cpuBD.System += result.Values[0].Value
  958. case "user":
  959. cpuBD.User += result.Values[0].Value
  960. default:
  961. cpuBD.Other += result.Values[0].Value
  962. }
  963. }
  964. for _, result := range resRAMSystemPct {
  965. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  966. if clusterID == "" {
  967. clusterID = defaultClusterID
  968. }
  969. if _, ok := ramBreakdownMap[clusterID]; !ok {
  970. ramBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
  971. }
  972. ramBD := ramBreakdownMap[clusterID]
  973. ramBD.System += result.Values[0].Value
  974. }
  975. for _, result := range resRAMUserPct {
  976. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  977. if clusterID == "" {
  978. clusterID = defaultClusterID
  979. }
  980. if _, ok := ramBreakdownMap[clusterID]; !ok {
  981. ramBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
  982. }
  983. ramBD := ramBreakdownMap[clusterID]
  984. ramBD.User += result.Values[0].Value
  985. }
  986. for _, ramBD := range ramBreakdownMap {
  987. remaining := 1.0
  988. remaining -= ramBD.Other
  989. remaining -= ramBD.System
  990. remaining -= ramBD.User
  991. ramBD.Idle = remaining
  992. }
  993. if queryUsedLocalStorage != "" {
  994. resUsedLocalStorage, err := resChs[9].Await()
  995. if err != nil {
  996. return nil, err
  997. }
  998. for _, result := range resUsedLocalStorage {
  999. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  1000. if clusterID == "" {
  1001. clusterID = defaultClusterID
  1002. }
  1003. pvUsedCostMap[clusterID] += result.Values[0].Value
  1004. }
  1005. }
  1006. }
  1007. if ctx.HasErrors() {
  1008. for _, err := range ctx.Errors() {
  1009. log.Errorf("ComputeClusterCosts: %s", err)
  1010. }
  1011. return nil, ctx.ErrorCollection()
  1012. }
  1013. // Convert intermediate structure to Costs instances
  1014. costsByCluster := map[string]*ClusterCosts{}
  1015. for id, cd := range costData {
  1016. dataMins, ok := dataMinsByCluster[id]
  1017. if !ok {
  1018. dataMins = mins
  1019. log.Warnf("Cluster cost data count not found for cluster %s", id)
  1020. }
  1021. costs, err := NewClusterCostsFromCumulative(cd["cpu"], cd["gpu"], cd["ram"], cd["storage"]+cd["localstorage"], window, offset, dataMins/timeutil.MinsPerHour)
  1022. if err != nil {
  1023. log.Warnf("Failed to parse cluster costs on %s (%s) from cumulative data: %+v", window, offset, cd)
  1024. return nil, err
  1025. }
  1026. if cpuBD, ok := cpuBreakdownMap[id]; ok {
  1027. costs.CPUBreakdown = cpuBD
  1028. }
  1029. if ramBD, ok := ramBreakdownMap[id]; ok {
  1030. costs.RAMBreakdown = ramBD
  1031. }
  1032. costs.StorageBreakdown = &ClusterCostsBreakdown{}
  1033. if pvUC, ok := pvUsedCostMap[id]; ok {
  1034. costs.StorageBreakdown.Idle = (costs.StorageCumulative - pvUC) / costs.StorageCumulative
  1035. costs.StorageBreakdown.User = pvUC / costs.StorageCumulative
  1036. }
  1037. costs.DataMinutes = dataMins
  1038. costsByCluster[id] = costs
  1039. }
  1040. return costsByCluster, nil
  1041. }
// Totals holds cumulative cluster cost time series. Each inner slice is a
// [timestamp, value] pair of float-formatted strings (see resultToTotals).
type Totals struct {
	TotalCost   [][]string `json:"totalcost"`
	CPUCost     [][]string `json:"cpucost"`
	MemCost     [][]string `json:"memcost"`
	StorageCost [][]string `json:"storageCost"`
}
  1048. func resultToTotals(qrs []*prom.QueryResult) ([][]string, error) {
  1049. if len(qrs) == 0 {
  1050. return [][]string{}, fmt.Errorf("Not enough data available in the selected time range")
  1051. }
  1052. result := qrs[0]
  1053. totals := [][]string{}
  1054. for _, value := range result.Values {
  1055. d0 := fmt.Sprintf("%f", value.Timestamp)
  1056. d1 := fmt.Sprintf("%f", value.Value)
  1057. toAppend := []string{
  1058. d0,
  1059. d1,
  1060. }
  1061. totals = append(totals, toAppend)
  1062. }
  1063. return totals, nil
  1064. }
  1065. // ClusterCostsOverTime gives the full cluster costs over time
  1066. func ClusterCostsOverTime(cli prometheus.Client, provider models.Provider, startString, endString string, window, offset time.Duration) (*Totals, error) {
  1067. localStorageQuery := provider.GetLocalStorageQuery(window, offset, true, false)
  1068. if localStorageQuery != "" {
  1069. localStorageQuery = fmt.Sprintf("+ %s", localStorageQuery)
  1070. }
  1071. layout := "2006-01-02T15:04:05.000Z"
  1072. start, err := time.Parse(layout, startString)
  1073. if err != nil {
  1074. log.Errorf("Error parsing time %s. Error: %s", startString, err.Error())
  1075. return nil, err
  1076. }
  1077. end, err := time.Parse(layout, endString)
  1078. if err != nil {
  1079. log.Errorf("Error parsing time %s. Error: %s", endString, err.Error())
  1080. return nil, err
  1081. }
  1082. fmtWindow := timeutil.DurationString(window)
  1083. if fmtWindow == "" {
  1084. err := fmt.Errorf("window value invalid or missing")
  1085. log.Errorf("Error parsing time %v. Error: %s", window, err.Error())
  1086. return nil, err
  1087. }
  1088. fmtOffset := timeutil.DurationToPromOffsetString(offset)
  1089. qCores := fmt.Sprintf(queryClusterCores, env.GetPromClusterFilter(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterFilter(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterFilter(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  1090. qRAM := fmt.Sprintf(queryClusterRAM, env.GetPromClusterFilter(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterFilter(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  1091. qStorage := fmt.Sprintf(queryStorage, env.GetPromClusterFilter(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterFilter(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterLabel(), localStorageQuery)
  1092. qTotal := fmt.Sprintf(queryTotal, env.GetPromClusterFilter(), env.GetPromClusterLabel(), env.GetPromClusterFilter(), env.GetPromClusterLabel(), env.GetPromClusterFilter(), env.GetPromClusterLabel(), env.GetPromClusterLabel(), localStorageQuery)
  1093. ctx := prom.NewNamedContext(cli, prom.ClusterContextName)
  1094. resChClusterCores := ctx.QueryRange(qCores, start, end, window)
  1095. resChClusterRAM := ctx.QueryRange(qRAM, start, end, window)
  1096. resChStorage := ctx.QueryRange(qStorage, start, end, window)
  1097. resChTotal := ctx.QueryRange(qTotal, start, end, window)
  1098. resultClusterCores, err := resChClusterCores.Await()
  1099. if err != nil {
  1100. return nil, err
  1101. }
  1102. resultClusterRAM, err := resChClusterRAM.Await()
  1103. if err != nil {
  1104. return nil, err
  1105. }
  1106. resultStorage, err := resChStorage.Await()
  1107. if err != nil {
  1108. return nil, err
  1109. }
  1110. resultTotal, err := resChTotal.Await()
  1111. if err != nil {
  1112. return nil, err
  1113. }
  1114. coreTotal, err := resultToTotals(resultClusterCores)
  1115. if err != nil {
  1116. log.Infof("[Warning] ClusterCostsOverTime: no cpu data: %s", err)
  1117. return nil, err
  1118. }
  1119. ramTotal, err := resultToTotals(resultClusterRAM)
  1120. if err != nil {
  1121. log.Infof("[Warning] ClusterCostsOverTime: no ram data: %s", err)
  1122. return nil, err
  1123. }
  1124. storageTotal, err := resultToTotals(resultStorage)
  1125. if err != nil {
  1126. log.Infof("[Warning] ClusterCostsOverTime: no storage data: %s", err)
  1127. }
  1128. clusterTotal, err := resultToTotals(resultTotal)
  1129. if err != nil {
  1130. // If clusterTotal query failed, it's likely because there are no PVs, which
  1131. // causes the qTotal query to return no data. Instead, query only node costs.
  1132. // If that fails, return an error because something is actually wrong.
  1133. qNodes := fmt.Sprintf(queryNodes, env.GetPromClusterFilter(), env.GetPromClusterLabel(), localStorageQuery)
  1134. resultNodes, warnings, err := ctx.QueryRangeSync(qNodes, start, end, window)
  1135. for _, warning := range warnings {
  1136. log.Warnf(warning)
  1137. }
  1138. if err != nil {
  1139. return nil, err
  1140. }
  1141. clusterTotal, err = resultToTotals(resultNodes)
  1142. if err != nil {
  1143. log.Infof("[Warning] ClusterCostsOverTime: no node data: %s", err)
  1144. return nil, err
  1145. }
  1146. }
  1147. return &Totals{
  1148. TotalCost: clusterTotal,
  1149. CPUCost: coreTotal,
  1150. MemCost: ramTotal,
  1151. StorageCost: storageTotal,
  1152. }, nil
  1153. }
  1154. func pvCosts(diskMap map[DiskIdentifier]*Disk, resolution time.Duration, resActiveMins, resPVSize, resPVCost, resPVUsedAvg, resPVUsedMax, resPVCInfo []*prom.QueryResult, cp models.Provider, window opencost.Window) {
  1155. for _, result := range resActiveMins {
  1156. cluster, err := result.GetString(env.GetPromClusterLabel())
  1157. if err != nil {
  1158. cluster = env.GetClusterID()
  1159. }
  1160. name, err := result.GetString("persistentvolume")
  1161. if err != nil {
  1162. log.Warnf("ClusterDisks: active mins missing pv name")
  1163. continue
  1164. }
  1165. if len(result.Values) == 0 {
  1166. continue
  1167. }
  1168. key := DiskIdentifier{cluster, name}
  1169. if _, ok := diskMap[key]; !ok {
  1170. diskMap[key] = &Disk{
  1171. Cluster: cluster,
  1172. Name: name,
  1173. Breakdown: &ClusterCostsBreakdown{},
  1174. }
  1175. }
  1176. s, e := calculateStartAndEnd(result, resolution, window)
  1177. mins := e.Sub(s).Minutes()
  1178. diskMap[key].End = e
  1179. diskMap[key].Start = s
  1180. diskMap[key].Minutes = mins
  1181. }
  1182. for _, result := range resPVSize {
  1183. cluster, err := result.GetString(env.GetPromClusterLabel())
  1184. if err != nil {
  1185. cluster = env.GetClusterID()
  1186. }
  1187. name, err := result.GetString("persistentvolume")
  1188. if err != nil {
  1189. log.Warnf("ClusterDisks: PV size data missing persistentvolume")
  1190. continue
  1191. }
  1192. // TODO niko/assets storage class
  1193. bytes := result.Values[0].Value
  1194. key := DiskIdentifier{cluster, name}
  1195. if _, ok := diskMap[key]; !ok {
  1196. diskMap[key] = &Disk{
  1197. Cluster: cluster,
  1198. Name: name,
  1199. Breakdown: &ClusterCostsBreakdown{},
  1200. }
  1201. }
  1202. diskMap[key].Bytes = bytes
  1203. }
  1204. customPricingEnabled := provider.CustomPricesEnabled(cp)
  1205. customPricingConfig, err := cp.GetConfig()
  1206. if err != nil {
  1207. log.Warnf("ClusterDisks: failed to load custom pricing: %s", err)
  1208. }
  1209. for _, result := range resPVCost {
  1210. cluster, err := result.GetString(env.GetPromClusterLabel())
  1211. if err != nil {
  1212. cluster = env.GetClusterID()
  1213. }
  1214. name, err := result.GetString("persistentvolume")
  1215. if err != nil {
  1216. log.Warnf("ClusterDisks: PV cost data missing persistentvolume")
  1217. continue
  1218. }
  1219. // TODO niko/assets storage class
  1220. var cost float64
  1221. if customPricingEnabled && customPricingConfig != nil {
  1222. customPVCostStr := customPricingConfig.Storage
  1223. customPVCost, err := strconv.ParseFloat(customPVCostStr, 64)
  1224. if err != nil {
  1225. log.Warnf("ClusterDisks: error parsing custom PV price: %s", customPVCostStr)
  1226. }
  1227. cost = customPVCost
  1228. } else {
  1229. cost = result.Values[0].Value
  1230. }
  1231. key := DiskIdentifier{cluster, name}
  1232. if _, ok := diskMap[key]; !ok {
  1233. diskMap[key] = &Disk{
  1234. Cluster: cluster,
  1235. Name: name,
  1236. Breakdown: &ClusterCostsBreakdown{},
  1237. }
  1238. }
  1239. diskMap[key].Cost = cost * (diskMap[key].Bytes / 1024 / 1024 / 1024) * (diskMap[key].Minutes / 60)
  1240. providerID, _ := result.GetString("provider_id") // just put the providerID set up here, it's the simplest query.
  1241. if providerID != "" {
  1242. diskMap[key].ProviderID = provider.ParsePVID(providerID)
  1243. }
  1244. }
  1245. for _, result := range resPVUsedAvg {
  1246. cluster, err := result.GetString(env.GetPromClusterLabel())
  1247. if err != nil {
  1248. cluster = env.GetClusterID()
  1249. }
  1250. claimName, err := result.GetString("persistentvolumeclaim")
  1251. if err != nil {
  1252. log.Debugf("ClusterDisks: pv usage data missing persistentvolumeclaim")
  1253. continue
  1254. }
  1255. claimNamespace, err := result.GetString("namespace")
  1256. if err != nil {
  1257. log.Debugf("ClusterDisks: pv usage data missing namespace")
  1258. continue
  1259. }
  1260. var volumeName string
  1261. for _, thatRes := range resPVCInfo {
  1262. thatCluster, err := thatRes.GetString(env.GetPromClusterLabel())
  1263. if err != nil {
  1264. thatCluster = env.GetClusterID()
  1265. }
  1266. thatVolumeName, err := thatRes.GetString("volumename")
  1267. if err != nil {
  1268. log.Debugf("ClusterDisks: pv claim data missing volumename")
  1269. continue
  1270. }
  1271. thatClaimName, err := thatRes.GetString("persistentvolumeclaim")
  1272. if err != nil {
  1273. log.Debugf("ClusterDisks: pv claim data missing persistentvolumeclaim")
  1274. continue
  1275. }
  1276. thatClaimNamespace, err := thatRes.GetString("namespace")
  1277. if err != nil {
  1278. log.Debugf("ClusterDisks: pv claim data missing namespace")
  1279. continue
  1280. }
  1281. if cluster == thatCluster && claimName == thatClaimName && claimNamespace == thatClaimNamespace {
  1282. volumeName = thatVolumeName
  1283. }
  1284. }
  1285. usage := result.Values[0].Value
  1286. key := DiskIdentifier{cluster, volumeName}
  1287. if _, ok := diskMap[key]; !ok {
  1288. diskMap[key] = &Disk{
  1289. Cluster: cluster,
  1290. Name: volumeName,
  1291. Breakdown: &ClusterCostsBreakdown{},
  1292. }
  1293. }
  1294. diskMap[key].BytesUsedAvgPtr = &usage
  1295. }
  1296. for _, result := range resPVUsedMax {
  1297. cluster, err := result.GetString(env.GetPromClusterLabel())
  1298. if err != nil {
  1299. cluster = env.GetClusterID()
  1300. }
  1301. claimName, err := result.GetString("persistentvolumeclaim")
  1302. if err != nil {
  1303. log.Debugf("ClusterDisks: pv usage data missing persistentvolumeclaim")
  1304. continue
  1305. }
  1306. claimNamespace, err := result.GetString("namespace")
  1307. if err != nil {
  1308. log.Debugf("ClusterDisks: pv usage data missing namespace")
  1309. continue
  1310. }
  1311. var volumeName string
  1312. for _, thatRes := range resPVCInfo {
  1313. thatCluster, err := thatRes.GetString(env.GetPromClusterLabel())
  1314. if err != nil {
  1315. thatCluster = env.GetClusterID()
  1316. }
  1317. thatVolumeName, err := thatRes.GetString("volumename")
  1318. if err != nil {
  1319. log.Debugf("ClusterDisks: pv claim data missing volumename")
  1320. continue
  1321. }
  1322. thatClaimName, err := thatRes.GetString("persistentvolumeclaim")
  1323. if err != nil {
  1324. log.Debugf("ClusterDisks: pv claim data missing persistentvolumeclaim")
  1325. continue
  1326. }
  1327. thatClaimNamespace, err := thatRes.GetString("namespace")
  1328. if err != nil {
  1329. log.Debugf("ClusterDisks: pv claim data missing namespace")
  1330. continue
  1331. }
  1332. if cluster == thatCluster && claimName == thatClaimName && claimNamespace == thatClaimNamespace {
  1333. volumeName = thatVolumeName
  1334. }
  1335. }
  1336. usage := result.Values[0].Value
  1337. key := DiskIdentifier{cluster, volumeName}
  1338. if _, ok := diskMap[key]; !ok {
  1339. diskMap[key] = &Disk{
  1340. Cluster: cluster,
  1341. Name: volumeName,
  1342. Breakdown: &ClusterCostsBreakdown{},
  1343. }
  1344. }
  1345. diskMap[key].BytesUsedMaxPtr = &usage
  1346. }
  1347. }