cluster.go 61 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679
  1. package costmodel
  2. import (
  3. "fmt"
  4. "net"
  5. "strconv"
  6. "strings"
  7. "time"
  8. "github.com/opencost/opencost/pkg/cloud/provider"
  9. prometheus "github.com/prometheus/client_golang/api"
  10. "golang.org/x/exp/slices"
  11. "github.com/opencost/opencost/core/pkg/log"
  12. "github.com/opencost/opencost/core/pkg/opencost"
  13. "github.com/opencost/opencost/core/pkg/util/timeutil"
  14. "github.com/opencost/opencost/pkg/cloud/models"
  15. "github.com/opencost/opencost/pkg/env"
  16. "github.com/opencost/opencost/pkg/prom"
  17. )
// Prometheus query templates used to compute cluster-level costs. Each %s
// placeholder is filled in at query time (cluster filter, range duration,
// optional offset, and the cluster aggregation label). The factor 730 is the
// approximate number of hours in a month, converting an hourly rate ($/hr)
// into a monthly cost.
const (
	// queryClusterCores prices node CPU capacity (cores * $/core-hr) plus any
	// GPU hourly cost, aggregated per cluster.
	queryClusterCores = `sum(
avg(avg_over_time(kube_node_status_capacity_cpu_cores{%s}[%s] %s)) by (node, %s) * avg(avg_over_time(node_cpu_hourly_cost{%s}[%s] %s)) by (node, %s) * 730 +
avg(avg_over_time(node_gpu_hourly_cost{%s}[%s] %s)) by (node, %s) * 730
) by (%s)`
	// queryClusterRAM prices node memory capacity, converted bytes -> GiB,
	// times the per-GiB hourly RAM cost, aggregated per cluster.
	queryClusterRAM = `sum(
avg(avg_over_time(kube_node_status_capacity_memory_bytes{%s}[%s] %s)) by (node, %s) / 1024 / 1024 / 1024 * avg(avg_over_time(node_ram_hourly_cost{%s}[%s] %s)) by (node, %s) * 730
) by (%s)`
	// queryStorage prices PersistentVolume capacity (bytes -> GiB) times the
	// per-GiB hourly PV cost, aggregated per cluster.
	queryStorage = `sum(
avg(avg_over_time(pv_hourly_cost{%s}[%s] %s)) by (persistentvolume, %s) * 730
* avg(avg_over_time(kube_persistentvolume_capacity_bytes{%s}[%s] %s)) by (persistentvolume, %s) / 1024 / 1024 / 1024
) by (%s) %s`
	// queryTotal combines total node hourly cost with PV storage cost for a
	// whole-cluster monthly total.
	queryTotal = `sum(avg(node_total_hourly_cost{%s}) by (node, %s)) * 730 +
sum(
avg(avg_over_time(pv_hourly_cost{%s}[1h])) by (persistentvolume, %s) * 730
* avg(avg_over_time(kube_persistentvolume_capacity_bytes{%s}[1h])) by (persistentvolume, %s) / 1024 / 1024 / 1024
) by (%s) %s`
	// queryNodes is the monthly cost of all nodes, from the per-node total
	// hourly cost metric.
	queryNodes = `sum(avg(node_total_hourly_cost{%s}) by (node, %s)) * 730 %s`
)
// MAX_LOCAL_STORAGE_SIZE is the largest device size (1 TiB, in bytes) that
// will be treated as node-local storage when building local disk assets;
// larger devices found in container_fs metrics are ignored.
const MAX_LOCAL_STORAGE_SIZE = 1024 * 1024 * 1024 * 1024

// When ASSET_INCLUDE_LOCAL_DISK_COST is set to false, local storage
// provisioned by sig-storage-local-static-provisioner is excluded
// by checking if the volume is prefixed by "local-pv-".
//
// This is based on the sig-storage-local-static-provisioner implementation,
// which creates all PVs with the "local-pv-" prefix. For reference, see:
// https://github.com/kubernetes-sigs/sig-storage-local-static-provisioner/blob/b6f465027bd059e92c0032c81dd1e1d90e35c909/pkg/discovery/discovery.go#L410-L417
const SIG_STORAGE_LOCAL_PROVISIONER_PREFIX = "local-pv-"
// ClusterCosts represents cumulative and monthly cluster costs over a given
// duration. Costs are broken down by cores, memory, and storage.
type ClusterCosts struct {
	Start             *time.Time             `json:"startTime"`
	End               *time.Time             `json:"endTime"`
	CPUCumulative     float64                `json:"cpuCumulativeCost"`
	CPUMonthly        float64                `json:"cpuMonthlyCost"`
	CPUBreakdown      *ClusterCostsBreakdown `json:"cpuBreakdown"`
	GPUCumulative     float64                `json:"gpuCumulativeCost"`
	GPUMonthly        float64                `json:"gpuMonthlyCost"`
	RAMCumulative     float64                `json:"ramCumulativeCost"`
	RAMMonthly        float64                `json:"ramMonthlyCost"`
	RAMBreakdown      *ClusterCostsBreakdown `json:"ramBreakdown"`
	StorageCumulative float64                `json:"storageCumulativeCost"`
	StorageMonthly    float64                `json:"storageMonthlyCost"`
	StorageBreakdown  *ClusterCostsBreakdown `json:"storageBreakdown"`
	TotalCumulative   float64                `json:"totalCumulativeCost"`
	TotalMonthly      float64                `json:"totalMonthlyCost"`
	// DataMinutes is the number of minutes of data behind the cumulative
	// figures; it is not serialized to JSON.
	DataMinutes float64
}
// ClusterCostsBreakdown provides percentage-based breakdown of a resource by
// categories: user for user-space (i.e. non-system) usage, system, and idle.
// Values are fractions of the total (0.0 - 1.0), not absolute costs.
type ClusterCostsBreakdown struct {
	Idle   float64 `json:"idle"`
	Other  float64 `json:"other"`
	System float64 `json:"system"`
	User   float64 `json:"user"`
}
  74. // NewClusterCostsFromCumulative takes cumulative cost data over a given time range, computes
  75. // the associated monthly rate data, and returns the Costs.
  76. func NewClusterCostsFromCumulative(cpu, gpu, ram, storage float64, window, offset time.Duration, dataHours float64) (*ClusterCosts, error) {
  77. start, end := timeutil.ParseTimeRange(window, offset)
  78. // If the number of hours is not given (i.e. is zero) compute one from the window and offset
  79. if dataHours == 0 {
  80. dataHours = end.Sub(start).Hours()
  81. }
  82. // Do not allow zero-length windows to prevent divide-by-zero issues
  83. if dataHours == 0 {
  84. return nil, fmt.Errorf("illegal time range: window %s, offset %s", window, offset)
  85. }
  86. cc := &ClusterCosts{
  87. Start: &start,
  88. End: &end,
  89. CPUCumulative: cpu,
  90. GPUCumulative: gpu,
  91. RAMCumulative: ram,
  92. StorageCumulative: storage,
  93. TotalCumulative: cpu + gpu + ram + storage,
  94. CPUMonthly: cpu / dataHours * (timeutil.HoursPerMonth),
  95. GPUMonthly: gpu / dataHours * (timeutil.HoursPerMonth),
  96. RAMMonthly: ram / dataHours * (timeutil.HoursPerMonth),
  97. StorageMonthly: storage / dataHours * (timeutil.HoursPerMonth),
  98. }
  99. cc.TotalMonthly = cc.CPUMonthly + cc.GPUMonthly + cc.RAMMonthly + cc.StorageMonthly
  100. return cc, nil
  101. }
// Disk describes a single cluster disk asset — either a PersistentVolume or
// a node-local device — with its cost, capacity, usage, and active lifetime
// over the queried window.
type Disk struct {
	Cluster        string
	Name           string
	ProviderID     string
	StorageClass   string
	VolumeName     string
	ClaimName      string
	ClaimNamespace string
	Cost           float64
	Bytes          float64

	// These two fields may not be available at all times because they rely on
	// a new set of metrics that may or may not be available. Thus, they must
	// be nilable to represent the complete absence of the data.
	//
	// In other words, nilability here lets us distinguish between
	// "metric is not available" and "metric is available but is 0".
	//
	// They end in "Ptr" to distinguish from an earlier version in order to
	// ensure that all usages are checked for nil.
	BytesUsedAvgPtr *float64
	BytesUsedMaxPtr *float64

	// Local is true for node-local storage (ephemeral disks), false for
	// PersistentVolumes.
	Local     bool
	Start     time.Time
	End       time.Time
	Minutes   float64
	Breakdown *ClusterCostsBreakdown
}
// DiskIdentifier uniquely identifies a disk within the set of monitored
// clusters by (cluster ID, disk name).
type DiskIdentifier struct {
	Cluster string
	Name    string
}
  133. func ClusterDisks(client prometheus.Client, cp models.Provider, start, end time.Time) (map[DiskIdentifier]*Disk, error) {
  134. // Start from the time "end", querying backwards
  135. t := end
  136. // minsPerResolution determines accuracy and resource use for the following
  137. // queries. Smaller values (higher resolution) result in better accuracy,
  138. // but more expensive queries, and vice-a-versa.
  139. resolution := env.GetETLResolution()
  140. //Ensuring if ETL_RESOLUTION_SECONDS is less than 60s default it to 1m
  141. var minsPerResolution int
  142. if minsPerResolution = int(resolution.Minutes()); int(resolution.Minutes()) == 0 {
  143. minsPerResolution = 1
  144. log.DedupedWarningf(3, "ClusterDisks(): Configured ETL resolution (%d seconds) is below the 60 seconds threshold. Overriding with 1 minute.", int(resolution.Seconds()))
  145. }
  146. durStr := timeutil.DurationString(end.Sub(start))
  147. if durStr == "" {
  148. return nil, fmt.Errorf("illegal duration value for %s", opencost.NewClosedWindow(start, end))
  149. }
  150. ctx := prom.NewNamedContext(client, prom.ClusterContextName)
  151. queryPVCost := fmt.Sprintf(`avg(avg_over_time(pv_hourly_cost{%s}[%s])) by (%s, persistentvolume,provider_id)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  152. queryPVSize := fmt.Sprintf(`avg(avg_over_time(kube_persistentvolume_capacity_bytes{%s}[%s])) by (%s, persistentvolume)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  153. queryActiveMins := fmt.Sprintf(`avg(kube_persistentvolume_capacity_bytes{%s}) by (%s, persistentvolume)[%s:%dm]`, env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, minsPerResolution)
  154. queryPVStorageClass := fmt.Sprintf(`avg(avg_over_time(kubecost_pv_info{%s}[%s])) by (%s, persistentvolume, storageclass)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  155. queryPVUsedAvg := fmt.Sprintf(`avg(avg_over_time(kubelet_volume_stats_used_bytes{%s}[%s])) by (%s, persistentvolumeclaim, namespace)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  156. queryPVUsedMax := fmt.Sprintf(`max(max_over_time(kubelet_volume_stats_used_bytes{%s}[%s])) by (%s, persistentvolumeclaim, namespace)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  157. queryPVCInfo := fmt.Sprintf(`avg(avg_over_time(kube_persistentvolumeclaim_info{%s}[%s])) by (%s, volumename, persistentvolumeclaim, namespace)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  158. resChPVCost := ctx.QueryAtTime(queryPVCost, t)
  159. resChPVSize := ctx.QueryAtTime(queryPVSize, t)
  160. resChActiveMins := ctx.QueryAtTime(queryActiveMins, t)
  161. resChPVStorageClass := ctx.QueryAtTime(queryPVStorageClass, t)
  162. resChPVUsedAvg := ctx.QueryAtTime(queryPVUsedAvg, t)
  163. resChPVUsedMax := ctx.QueryAtTime(queryPVUsedMax, t)
  164. resChPVCInfo := ctx.QueryAtTime(queryPVCInfo, t)
  165. resPVCost, _ := resChPVCost.Await()
  166. resPVSize, _ := resChPVSize.Await()
  167. resActiveMins, _ := resChActiveMins.Await()
  168. resPVStorageClass, _ := resChPVStorageClass.Await()
  169. resPVUsedAvg, _ := resChPVUsedAvg.Await()
  170. resPVUsedMax, _ := resChPVUsedMax.Await()
  171. resPVCInfo, _ := resChPVCInfo.Await()
  172. // Cloud providers do not always charge for a node's local disk costs (i.e.
  173. // ephemeral storage). Provide an option to opt out of calculating &
  174. // allocating local disk costs. Note, that this does not affect
  175. // PersistentVolume costs.
  176. //
  177. // Ref:
  178. // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/RootDeviceStorage.html
  179. // https://learn.microsoft.com/en-us/azure/virtual-machines/managed-disks-overview#temporary-disk
  180. // https://cloud.google.com/compute/docs/disks/local-ssd
  181. resLocalStorageCost := []*prom.QueryResult{}
  182. resLocalStorageUsedCost := []*prom.QueryResult{}
  183. resLocalStorageUsedAvg := []*prom.QueryResult{}
  184. resLocalStorageUsedMax := []*prom.QueryResult{}
  185. resLocalStorageBytes := []*prom.QueryResult{}
  186. resLocalActiveMins := []*prom.QueryResult{}
  187. if env.GetAssetIncludeLocalDiskCost() {
  188. // hourlyToCumulative is a scaling factor that, when multiplied by an
  189. // hourly value, converts it to a cumulative value; i.e. [$/hr] *
  190. // [min/res]*[hr/min] = [$/res]
  191. hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
  192. costPerGBHr := 0.04 / 730.0
  193. // container_fs metrics contains metrics for disks that are not local storage of the node. While not perfect to
  194. // attempt to identify the correct device which is being used as local storage we first filter for devices mounted
  195. // at paths `/dev/nvme.*` or `/dev/sda.*`. There still may be multiple devices mounted at paths matching the regex
  196. // so later on we will select the device with the highest `container_fs_limit_bytes` per instance to create a local disk asset
  197. queryLocalStorageCost := fmt.Sprintf(`sum_over_time(sum(container_fs_limit_bytes{device=~"/dev/(nvme|sda).*", id="/", %s}) by (instance, device, %s)[%s:%dm]) / 1024 / 1024 / 1024 * %f * %f`, env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, minsPerResolution, hourlyToCumulative, costPerGBHr)
  198. queryLocalStorageUsedCost := fmt.Sprintf(`sum_over_time(sum(container_fs_usage_bytes{device=~"/dev/(nvme|sda).*", id="/", %s}) by (instance, device, %s)[%s:%dm]) / 1024 / 1024 / 1024 * %f * %f`, env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, minsPerResolution, hourlyToCumulative, costPerGBHr)
  199. queryLocalStorageUsedAvg := fmt.Sprintf(`avg(sum(avg_over_time(container_fs_usage_bytes{device=~"/dev/(nvme|sda).*", id="/", %s}[%s])) by (instance, device, %s, job)) by (instance, device, %s)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  200. queryLocalStorageUsedMax := fmt.Sprintf(`max(sum(max_over_time(container_fs_usage_bytes{device=~"/dev/(nvme|sda).*", id="/", %s}[%s])) by (instance, device, %s, job)) by (instance, device, %s)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  201. queryLocalStorageBytes := fmt.Sprintf(`avg_over_time(sum(container_fs_limit_bytes{device=~"/dev/(nvme|sda).*", id="/", %s}) by (instance, device, %s)[%s:%dm])`, env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, minsPerResolution)
  202. queryLocalActiveMins := fmt.Sprintf(`count(node_total_hourly_cost{%s}) by (%s, node)[%s:%dm]`, env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, minsPerResolution)
  203. resChLocalStorageCost := ctx.QueryAtTime(queryLocalStorageCost, t)
  204. resChLocalStorageUsedCost := ctx.QueryAtTime(queryLocalStorageUsedCost, t)
  205. resChLocalStoreageUsedAvg := ctx.QueryAtTime(queryLocalStorageUsedAvg, t)
  206. resChLocalStoreageUsedMax := ctx.QueryAtTime(queryLocalStorageUsedMax, t)
  207. resChLocalStorageBytes := ctx.QueryAtTime(queryLocalStorageBytes, t)
  208. resChLocalActiveMins := ctx.QueryAtTime(queryLocalActiveMins, t)
  209. resLocalStorageCost, _ = resChLocalStorageCost.Await()
  210. resLocalStorageUsedCost, _ = resChLocalStorageUsedCost.Await()
  211. resLocalStorageUsedAvg, _ = resChLocalStoreageUsedAvg.Await()
  212. resLocalStorageUsedMax, _ = resChLocalStoreageUsedMax.Await()
  213. resLocalStorageBytes, _ = resChLocalStorageBytes.Await()
  214. resLocalActiveMins, _ = resChLocalActiveMins.Await()
  215. }
  216. if ctx.HasErrors() {
  217. return nil, ctx.ErrorCollection()
  218. }
  219. diskMap := map[DiskIdentifier]*Disk{}
  220. for _, result := range resPVCInfo {
  221. cluster, err := result.GetString(env.GetPromClusterLabel())
  222. if err != nil {
  223. cluster = env.GetClusterID()
  224. }
  225. volumeName, err := result.GetString("volumename")
  226. if err != nil {
  227. log.Debugf("ClusterDisks: pv claim data missing volumename")
  228. continue
  229. }
  230. claimName, err := result.GetString("persistentvolumeclaim")
  231. if err != nil {
  232. log.Debugf("ClusterDisks: pv claim data missing persistentvolumeclaim")
  233. continue
  234. }
  235. claimNamespace, err := result.GetString("namespace")
  236. if err != nil {
  237. log.Debugf("ClusterDisks: pv claim data missing namespace")
  238. continue
  239. }
  240. key := DiskIdentifier{cluster, volumeName}
  241. if _, ok := diskMap[key]; !ok {
  242. diskMap[key] = &Disk{
  243. Cluster: cluster,
  244. Name: volumeName,
  245. Breakdown: &ClusterCostsBreakdown{},
  246. }
  247. }
  248. diskMap[key].VolumeName = volumeName
  249. diskMap[key].ClaimName = claimName
  250. diskMap[key].ClaimNamespace = claimNamespace
  251. }
  252. pvCosts(diskMap, resolution, resActiveMins, resPVSize, resPVCost, resPVUsedAvg, resPVUsedMax, resPVCInfo, cp, opencost.NewClosedWindow(start, end))
  253. type localStorage struct {
  254. device string
  255. disk *Disk
  256. }
  257. localStorageDisks := map[DiskIdentifier]localStorage{}
  258. // Start with local storage bytes so that the device with the largest size which has passed the
  259. // query filters can be determined
  260. for _, result := range resLocalStorageBytes {
  261. cluster, err := result.GetString(env.GetPromClusterLabel())
  262. if err != nil {
  263. cluster = env.GetClusterID()
  264. }
  265. name, err := result.GetString("instance")
  266. if err != nil {
  267. log.Warnf("ClusterDisks: local storage data missing instance")
  268. continue
  269. }
  270. device, err := result.GetString("device")
  271. if err != nil {
  272. log.Warnf("ClusterDisks: local storage data missing device")
  273. continue
  274. }
  275. bytes := result.Values[0].Value
  276. // Ignore disks that are larger than the max size
  277. if bytes > MAX_LOCAL_STORAGE_SIZE {
  278. continue
  279. }
  280. key := DiskIdentifier{cluster, name}
  281. // only keep the device with the most bytes per instance
  282. if current, ok := localStorageDisks[key]; !ok || current.disk.Bytes < bytes {
  283. localStorageDisks[key] = localStorage{
  284. device: device,
  285. disk: &Disk{
  286. Cluster: cluster,
  287. Name: name,
  288. Breakdown: &ClusterCostsBreakdown{},
  289. Local: true,
  290. StorageClass: opencost.LocalStorageClass,
  291. Bytes: bytes,
  292. },
  293. }
  294. }
  295. }
  296. for _, result := range resLocalStorageCost {
  297. cluster, err := result.GetString(env.GetPromClusterLabel())
  298. if err != nil {
  299. cluster = env.GetClusterID()
  300. }
  301. name, err := result.GetString("instance")
  302. if err != nil {
  303. log.Warnf("ClusterDisks: local storage data missing instance")
  304. continue
  305. }
  306. device, err := result.GetString("device")
  307. if err != nil {
  308. log.Warnf("ClusterDisks: local storage data missing device")
  309. continue
  310. }
  311. cost := result.Values[0].Value
  312. key := DiskIdentifier{cluster, name}
  313. ls, ok := localStorageDisks[key]
  314. if !ok || ls.device != device {
  315. continue
  316. }
  317. ls.disk.Cost = cost
  318. }
  319. for _, result := range resLocalStorageUsedCost {
  320. cluster, err := result.GetString(env.GetPromClusterLabel())
  321. if err != nil {
  322. cluster = env.GetClusterID()
  323. }
  324. name, err := result.GetString("instance")
  325. if err != nil {
  326. log.Warnf("ClusterDisks: local storage usage data missing instance")
  327. continue
  328. }
  329. device, err := result.GetString("device")
  330. if err != nil {
  331. log.Warnf("ClusterDisks: local storage data missing device")
  332. continue
  333. }
  334. cost := result.Values[0].Value
  335. key := DiskIdentifier{cluster, name}
  336. ls, ok := localStorageDisks[key]
  337. if !ok || ls.device != device {
  338. continue
  339. }
  340. ls.disk.Breakdown.System = cost / ls.disk.Cost
  341. }
  342. for _, result := range resLocalStorageUsedAvg {
  343. cluster, err := result.GetString(env.GetPromClusterLabel())
  344. if err != nil {
  345. cluster = env.GetClusterID()
  346. }
  347. name, err := result.GetString("instance")
  348. if err != nil {
  349. log.Warnf("ClusterDisks: local storage data missing instance")
  350. continue
  351. }
  352. device, err := result.GetString("device")
  353. if err != nil {
  354. log.Warnf("ClusterDisks: local storage data missing device")
  355. continue
  356. }
  357. bytesAvg := result.Values[0].Value
  358. key := DiskIdentifier{cluster, name}
  359. ls, ok := localStorageDisks[key]
  360. if !ok || ls.device != device {
  361. continue
  362. }
  363. ls.disk.BytesUsedAvgPtr = &bytesAvg
  364. }
  365. for _, result := range resLocalStorageUsedMax {
  366. cluster, err := result.GetString(env.GetPromClusterLabel())
  367. if err != nil {
  368. cluster = env.GetClusterID()
  369. }
  370. name, err := result.GetString("instance")
  371. if err != nil {
  372. log.Warnf("ClusterDisks: local storage data missing instance")
  373. continue
  374. }
  375. device, err := result.GetString("device")
  376. if err != nil {
  377. log.Warnf("ClusterDisks: local storage data missing device")
  378. continue
  379. }
  380. bytesMax := result.Values[0].Value
  381. key := DiskIdentifier{cluster, name}
  382. ls, ok := localStorageDisks[key]
  383. if !ok || ls.device != device {
  384. continue
  385. }
  386. ls.disk.BytesUsedMaxPtr = &bytesMax
  387. }
  388. for _, result := range resLocalActiveMins {
  389. cluster, err := result.GetString(env.GetPromClusterLabel())
  390. if err != nil {
  391. cluster = env.GetClusterID()
  392. }
  393. name, err := result.GetString("node")
  394. if err != nil {
  395. log.DedupedWarningf(5, "ClusterDisks: local active mins data missing instance")
  396. continue
  397. }
  398. providerID, err := result.GetString("provider_id")
  399. if err != nil {
  400. log.DedupedWarningf(5, "ClusterDisks: local active mins data missing instance")
  401. continue
  402. }
  403. key := DiskIdentifier{cluster, name}
  404. ls, ok := localStorageDisks[key]
  405. if !ok {
  406. continue
  407. }
  408. ls.disk.ProviderID = provider.ParseLocalDiskID(providerID)
  409. if len(result.Values) == 0 {
  410. continue
  411. }
  412. s := time.Unix(int64(result.Values[0].Timestamp), 0)
  413. e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
  414. mins := e.Sub(s).Minutes()
  415. // TODO niko/assets if mins >= threshold, interpolate for missing data?
  416. ls.disk.End = e
  417. ls.disk.Start = s
  418. ls.disk.Minutes = mins
  419. }
  420. // move local storage disks to main disk map
  421. for key, ls := range localStorageDisks {
  422. diskMap[key] = ls.disk
  423. }
  424. var unTracedDiskLogData []DiskIdentifier
  425. //Iterating through Persistent Volume given by custom metrics kubecost_pv_info and assign the storage class if known and __unknown__ if not populated.
  426. for _, result := range resPVStorageClass {
  427. cluster, err := result.GetString(env.GetPromClusterLabel())
  428. if err != nil {
  429. cluster = env.GetClusterID()
  430. }
  431. name, _ := result.GetString("persistentvolume")
  432. key := DiskIdentifier{cluster, name}
  433. if _, ok := diskMap[key]; !ok {
  434. if !slices.Contains(unTracedDiskLogData, key) {
  435. unTracedDiskLogData = append(unTracedDiskLogData, key)
  436. }
  437. continue
  438. }
  439. if len(result.Values) == 0 {
  440. continue
  441. }
  442. storageClass, err := result.GetString("storageclass")
  443. if err != nil {
  444. diskMap[key].StorageClass = opencost.UnknownStorageClass
  445. } else {
  446. diskMap[key].StorageClass = storageClass
  447. }
  448. }
  449. // Logging the unidentified disk information outside the loop
  450. for _, unIdentifiedDisk := range unTracedDiskLogData {
  451. log.Warnf("ClusterDisks: Cluster %s has Storage Class information for unidentified disk %s or disk deleted from analysis", unIdentifiedDisk.Cluster, unIdentifiedDisk.Name)
  452. }
  453. for _, disk := range diskMap {
  454. // Apply all remaining RAM to Idle
  455. disk.Breakdown.Idle = 1.0 - (disk.Breakdown.System + disk.Breakdown.Other + disk.Breakdown.User)
  456. // Set provider Id to the name for reconciliation
  457. if disk.ProviderID == "" {
  458. disk.ProviderID = disk.Name
  459. }
  460. }
  461. if !env.GetAssetIncludeLocalDiskCost() {
  462. return filterOutLocalPVs(diskMap), nil
  463. }
  464. return diskMap, nil
  465. }
// NodeOverhead holds the fraction of a node's CPU and RAM capacity that is
// reserved as system overhead (i.e. not allocatable to workloads).
type NodeOverhead struct {
	CpuOverheadFraction float64
	RamOverheadFraction float64
}
// Node describes a single cluster node asset: its identity, capacity,
// per-resource costs and rates, usage breakdowns, and active lifetime over
// the queried window.
type Node struct {
	Cluster      string
	Name         string
	ProviderID   string
	NodeType     string
	CPUCost      float64
	CPUCores     float64
	GPUCost      float64
	GPUCount     float64
	RAMCost      float64
	RAMBytes     float64
	Discount     float64
	Preemptible  bool
	CPUBreakdown *ClusterCostsBreakdown
	RAMBreakdown *ClusterCostsBreakdown
	Start        time.Time
	End          time.Time
	Minutes      float64
	Labels       map[string]string
	// Per-unit hourly rates used to derive the aggregate costs above.
	CostPerCPUHr    float64
	CostPerRAMGiBHr float64
	CostPerGPUHr    float64
	Overhead        *NodeOverhead
}
// GKE lies about the number of cores e2 nodes have. This table
// contains a mapping from node type -> actual CPU cores
// for those cases.
var partialCPUMap = map[string]float64{
	"e2-micro":  0.25,
	"e2-small":  0.5,
	"e2-medium": 1.0,
}
// NodeIdentifier uniquely identifies a node by cluster, name, and the cloud
// provider's instance ID.
type NodeIdentifier struct {
	Cluster    string
	Name       string
	ProviderID string
}
// nodeIdentifierNoProviderID identifies a node by cluster and name only, for
// joining metric series that do not carry a provider_id label.
type nodeIdentifierNoProviderID struct {
	Cluster string
	Name    string
}
  511. func costTimesMinuteAndCount(activeDataMap map[NodeIdentifier]activeData, costMap map[NodeIdentifier]float64, resourceCountMap map[nodeIdentifierNoProviderID]float64) {
  512. for k, v := range activeDataMap {
  513. keyNon := nodeIdentifierNoProviderID{
  514. Cluster: k.Cluster,
  515. Name: k.Name,
  516. }
  517. if cost, ok := costMap[k]; ok {
  518. minutes := v.minutes
  519. count := 1.0
  520. if c, ok := resourceCountMap[keyNon]; ok {
  521. count = c
  522. }
  523. costMap[k] = cost * (minutes / 60) * count
  524. }
  525. }
  526. }
  527. func costTimesMinute(activeDataMap map[NodeIdentifier]activeData, costMap map[NodeIdentifier]float64) {
  528. for k, v := range activeDataMap {
  529. if cost, ok := costMap[k]; ok {
  530. minutes := v.minutes
  531. costMap[k] = cost * (minutes / 60)
  532. }
  533. }
  534. }
  535. func ClusterNodes(cp models.Provider, client prometheus.Client, start, end time.Time) (map[NodeIdentifier]*Node, error) {
  536. // Start from the time "end", querying backwards
  537. t := end
  538. // minsPerResolution determines accuracy and resource use for the following
  539. // queries. Smaller values (higher resolution) result in better accuracy,
  540. // but more expensive queries, and vice-a-versa.
  541. resolution := env.GetETLResolution()
  542. //Ensuring if ETL_RESOLUTION_SECONDS is less than 60s default it to 1m
  543. var minsPerResolution int
  544. if minsPerResolution = int(resolution.Minutes()); int(resolution.Minutes()) == 0 {
  545. minsPerResolution = 1
  546. log.DedupedWarningf(3, "ClusterNodes(): Configured ETL resolution (%d seconds) is below the 60 seconds threshold. Overriding with 1 minute.", int(resolution.Seconds()))
  547. }
  548. durStr := timeutil.DurationString(end.Sub(start))
  549. if durStr == "" {
  550. return nil, fmt.Errorf("illegal duration value for %s", opencost.NewClosedWindow(start, end))
  551. }
  552. requiredCtx := prom.NewNamedContext(client, prom.ClusterContextName)
  553. optionalCtx := prom.NewNamedContext(client, prom.ClusterOptionalContextName)
  554. queryNodeCPUHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_cpu_hourly_cost{%s}[%s])) by (%s, node, instance_type, provider_id)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  555. queryNodeCPUCoresCapacity := fmt.Sprintf(`avg(avg_over_time(kube_node_status_capacity_cpu_cores{%s}[%s])) by (%s, node)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  556. queryNodeCPUCoresAllocatable := fmt.Sprintf(`avg(avg_over_time(kube_node_status_allocatable_cpu_cores{%s}[%s])) by (%s, node)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  557. queryNodeRAMHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_ram_hourly_cost{%s}[%s])) by (%s, node, instance_type, provider_id) / 1024 / 1024 / 1024`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  558. queryNodeRAMBytesCapacity := fmt.Sprintf(`avg(avg_over_time(kube_node_status_capacity_memory_bytes{%s}[%s])) by (%s, node)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  559. queryNodeRAMBytesAllocatable := fmt.Sprintf(`avg(avg_over_time(kube_node_status_allocatable_memory_bytes{%s}[%s])) by (%s, node)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  560. queryNodeGPUCount := fmt.Sprintf(`avg(avg_over_time(node_gpu_count{%s}[%s])) by (%s, node, provider_id)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  561. queryNodeGPUHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_gpu_hourly_cost{%s}[%s])) by (%s, node, instance_type, provider_id)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  562. queryNodeCPUModeTotal := fmt.Sprintf(`sum(rate(node_cpu_seconds_total{%s}[%s:%dm])) by (kubernetes_node, %s, mode)`, env.GetPromClusterFilter(), durStr, minsPerResolution, env.GetPromClusterLabel())
  563. queryNodeRAMSystemPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace="kube-system", %s}[%s:%dm])) by (instance, %s) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes{%s}[%s:%dm])) by (node, %s), "instance", "$1", "node", "(.*)")) by (instance, %s)`, env.GetPromClusterFilter(), durStr, minsPerResolution, env.GetPromClusterLabel(), env.GetPromClusterFilter(), durStr, minsPerResolution, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  564. queryNodeRAMUserPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace!="kube-system", %s}[%s:%dm])) by (instance, %s) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes{%s}[%s:%dm])) by (node, %s), "instance", "$1", "node", "(.*)")) by (instance, %s)`, env.GetPromClusterFilter(), durStr, minsPerResolution, env.GetPromClusterLabel(), env.GetPromClusterFilter(), durStr, minsPerResolution, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  565. queryActiveMins := fmt.Sprintf(`avg(node_total_hourly_cost{%s}) by (node, %s, provider_id)[%s:%dm]`, env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, minsPerResolution)
  566. queryIsSpot := fmt.Sprintf(`avg_over_time(kubecost_node_is_spot{%s}[%s:%dm])`, env.GetPromClusterFilter(), durStr, minsPerResolution)
  567. queryLabels := fmt.Sprintf(`count_over_time(kube_node_labels{%s}[%s:%dm])`, env.GetPromClusterFilter(), durStr, minsPerResolution)
  568. // Return errors if these fail
  569. resChNodeCPUHourlyCost := requiredCtx.QueryAtTime(queryNodeCPUHourlyCost, t)
  570. resChNodeCPUCoresCapacity := requiredCtx.QueryAtTime(queryNodeCPUCoresCapacity, t)
  571. resChNodeCPUCoresAllocatable := requiredCtx.QueryAtTime(queryNodeCPUCoresAllocatable, t)
  572. resChNodeRAMHourlyCost := requiredCtx.QueryAtTime(queryNodeRAMHourlyCost, t)
  573. resChNodeRAMBytesCapacity := requiredCtx.QueryAtTime(queryNodeRAMBytesCapacity, t)
  574. resChNodeRAMBytesAllocatable := requiredCtx.QueryAtTime(queryNodeRAMBytesAllocatable, t)
  575. resChNodeGPUCount := requiredCtx.QueryAtTime(queryNodeGPUCount, t)
  576. resChNodeGPUHourlyCost := requiredCtx.QueryAtTime(queryNodeGPUHourlyCost, t)
  577. resChActiveMins := requiredCtx.QueryAtTime(queryActiveMins, t)
  578. resChIsSpot := requiredCtx.QueryAtTime(queryIsSpot, t)
  579. // Do not return errors if these fail, but log warnings
  580. resChNodeCPUModeTotal := optionalCtx.QueryAtTime(queryNodeCPUModeTotal, t)
  581. resChNodeRAMSystemPct := optionalCtx.QueryAtTime(queryNodeRAMSystemPct, t)
  582. resChNodeRAMUserPct := optionalCtx.QueryAtTime(queryNodeRAMUserPct, t)
  583. resChLabels := optionalCtx.QueryAtTime(queryLabels, t)
  584. resNodeCPUHourlyCost, _ := resChNodeCPUHourlyCost.Await()
  585. resNodeCPUCoresCapacity, _ := resChNodeCPUCoresCapacity.Await()
  586. resNodeCPUCoresAllocatable, _ := resChNodeCPUCoresAllocatable.Await()
  587. resNodeGPUCount, _ := resChNodeGPUCount.Await()
  588. resNodeGPUHourlyCost, _ := resChNodeGPUHourlyCost.Await()
  589. resNodeRAMHourlyCost, _ := resChNodeRAMHourlyCost.Await()
  590. resNodeRAMBytesCapacity, _ := resChNodeRAMBytesCapacity.Await()
  591. resNodeRAMBytesAllocatable, _ := resChNodeRAMBytesAllocatable.Await()
  592. resIsSpot, _ := resChIsSpot.Await()
  593. resNodeCPUModeTotal, _ := resChNodeCPUModeTotal.Await()
  594. resNodeRAMSystemPct, _ := resChNodeRAMSystemPct.Await()
  595. resNodeRAMUserPct, _ := resChNodeRAMUserPct.Await()
  596. resActiveMins, _ := resChActiveMins.Await()
  597. resLabels, _ := resChLabels.Await()
  598. if optionalCtx.HasErrors() {
  599. for _, err := range optionalCtx.Errors() {
  600. log.Warnf("ClusterNodes: %s", err)
  601. }
  602. }
  603. if requiredCtx.HasErrors() {
  604. for _, err := range requiredCtx.Errors() {
  605. log.Errorf("ClusterNodes: %s", err)
  606. }
  607. return nil, requiredCtx.ErrorCollection()
  608. }
  609. activeDataMap := buildActiveDataMap(resActiveMins, resolution, opencost.NewClosedWindow(start, end))
  610. gpuCountMap := buildGPUCountMap(resNodeGPUCount)
  611. preemptibleMap := buildPreemptibleMap(resIsSpot)
  612. cpuCostMap, clusterAndNameToType1 := buildCPUCostMap(resNodeCPUHourlyCost, cp, preemptibleMap)
  613. ramCostMap, clusterAndNameToType2 := buildRAMCostMap(resNodeRAMHourlyCost, cp, preemptibleMap)
  614. gpuCostMap, clusterAndNameToType3 := buildGPUCostMap(resNodeGPUHourlyCost, gpuCountMap, cp, preemptibleMap)
  615. clusterAndNameToTypeIntermediate := mergeTypeMaps(clusterAndNameToType1, clusterAndNameToType2)
  616. clusterAndNameToType := mergeTypeMaps(clusterAndNameToTypeIntermediate, clusterAndNameToType3)
  617. cpuCoresCapacityMap := buildCPUCoresMap(resNodeCPUCoresCapacity)
  618. ramBytesCapacityMap := buildRAMBytesMap(resNodeRAMBytesCapacity)
  619. cpuCoresAllocatableMap := buildCPUCoresMap(resNodeCPUCoresAllocatable)
  620. ramBytesAllocatableMap := buildRAMBytesMap(resNodeRAMBytesAllocatable)
  621. overheadMap := buildOverheadMap(ramBytesCapacityMap, ramBytesAllocatableMap, cpuCoresCapacityMap, cpuCoresAllocatableMap)
  622. ramUserPctMap := buildRAMUserPctMap(resNodeRAMUserPct)
  623. ramSystemPctMap := buildRAMSystemPctMap(resNodeRAMSystemPct)
  624. cpuBreakdownMap := buildCPUBreakdownMap(resNodeCPUModeTotal)
  625. labelsMap := buildLabelsMap(resLabels)
  626. costTimesMinuteAndCount(activeDataMap, cpuCostMap, cpuCoresCapacityMap)
  627. costTimesMinuteAndCount(activeDataMap, ramCostMap, ramBytesCapacityMap)
  628. costTimesMinute(activeDataMap, gpuCostMap) // there's no need to do a weird "nodeIdentifierNoProviderID" type match since gpuCounts have a providerID
  629. nodeMap := buildNodeMap(
  630. cpuCostMap, ramCostMap, gpuCostMap, gpuCountMap,
  631. cpuCoresCapacityMap, ramBytesCapacityMap, ramUserPctMap,
  632. ramSystemPctMap,
  633. cpuBreakdownMap,
  634. activeDataMap,
  635. preemptibleMap,
  636. labelsMap,
  637. clusterAndNameToType,
  638. resolution,
  639. overheadMap,
  640. )
  641. c, err := cp.GetConfig()
  642. if err != nil {
  643. return nil, err
  644. }
  645. discount, err := ParsePercentString(c.Discount)
  646. if err != nil {
  647. return nil, err
  648. }
  649. negotiatedDiscount, err := ParsePercentString(c.NegotiatedDiscount)
  650. if err != nil {
  651. return nil, err
  652. }
  653. for _, node := range nodeMap {
  654. // TODO take GKE Reserved Instances into account
  655. node.Discount = cp.CombinedDiscountForNode(node.NodeType, node.Preemptible, discount, negotiatedDiscount)
  656. // Apply all remaining resources to Idle
  657. node.CPUBreakdown.Idle = 1.0 - (node.CPUBreakdown.System + node.CPUBreakdown.Other + node.CPUBreakdown.User)
  658. node.RAMBreakdown.Idle = 1.0 - (node.RAMBreakdown.System + node.RAMBreakdown.Other + node.RAMBreakdown.User)
  659. }
  660. return nodeMap, nil
  661. }
// LoadBalancerIdentifier uniquely identifies a load balancer by its cluster,
// namespace, and service name. It is used as the map key in
// ClusterLoadBalancers.
type LoadBalancerIdentifier struct {
	Cluster   string
	Namespace string
	Name      string
}
// LoadBalancer describes the cost and active lifetime of a single cluster
// load balancer over a queried window, as derived from the
// kubecost_load_balancer_cost metric.
type LoadBalancer struct {
	Cluster    string
	Namespace  string
	Name       string  // formatted as "namespace/serviceName" (kept for backwards-compatibility)
	ProviderID string  // parsed LB ID, or raw ingress IP when the parsed value is empty
	Cost       float64 // cumulative cost: price-per-hour * interpolated active hours
	Start      time.Time
	End        time.Time
	Minutes    float64 // active minutes between Start and End
	Private    bool    // true when Ip parses as a private address
	Ip         string  // ingress_ip label from the cost metric
}
  679. func ClusterLoadBalancers(client prometheus.Client, start, end time.Time) (map[LoadBalancerIdentifier]*LoadBalancer, error) {
  680. // Start from the time "end", querying backwards
  681. t := end
  682. // minsPerResolution determines accuracy and resource use for the following
  683. // queries. Smaller values (higher resolution) result in better accuracy,
  684. // but more expensive queries, and vice-a-versa.
  685. resolution := env.GetETLResolution()
  686. //Ensuring if ETL_RESOLUTION_SECONDS is less than 60s default it to 1m
  687. var minsPerResolution int
  688. if minsPerResolution = int(resolution.Minutes()); int(resolution.Minutes()) == 0 {
  689. minsPerResolution = 1
  690. log.DedupedWarningf(3, "ClusterLoadBalancers(): Configured ETL resolution (%d seconds) is below the 60 seconds threshold. Overriding with 1 minute.", int(resolution.Seconds()))
  691. }
  692. // Query for the duration between start and end
  693. durStr := timeutil.DurationString(end.Sub(start))
  694. if durStr == "" {
  695. return nil, fmt.Errorf("illegal duration value for %s", opencost.NewClosedWindow(start, end))
  696. }
  697. ctx := prom.NewNamedContext(client, prom.ClusterContextName)
  698. queryLBCost := fmt.Sprintf(`avg(avg_over_time(kubecost_load_balancer_cost{%s}[%s])) by (namespace, service_name, %s, ingress_ip)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  699. queryActiveMins := fmt.Sprintf(`avg(kubecost_load_balancer_cost{%s}) by (namespace, service_name, %s, ingress_ip)[%s:%dm]`, env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, minsPerResolution)
  700. resChLBCost := ctx.QueryAtTime(queryLBCost, t)
  701. resChActiveMins := ctx.QueryAtTime(queryActiveMins, t)
  702. resLBCost, _ := resChLBCost.Await()
  703. resActiveMins, _ := resChActiveMins.Await()
  704. if ctx.HasErrors() {
  705. return nil, ctx.ErrorCollection()
  706. }
  707. loadBalancerMap := make(map[LoadBalancerIdentifier]*LoadBalancer, len(resActiveMins))
  708. for _, result := range resActiveMins {
  709. cluster, err := result.GetString(env.GetPromClusterLabel())
  710. if err != nil {
  711. cluster = env.GetClusterID()
  712. }
  713. namespace, err := result.GetString("namespace")
  714. if err != nil {
  715. log.Warnf("ClusterLoadBalancers: LB cost data missing namespace")
  716. continue
  717. }
  718. name, err := result.GetString("service_name")
  719. if err != nil {
  720. log.Warnf("ClusterLoadBalancers: LB cost data missing service_name")
  721. continue
  722. }
  723. providerID, err := result.GetString("ingress_ip")
  724. if err != nil {
  725. log.DedupedWarningf(5, "ClusterLoadBalancers: LB cost data missing ingress_ip")
  726. providerID = ""
  727. }
  728. key := LoadBalancerIdentifier{
  729. Cluster: cluster,
  730. Namespace: namespace,
  731. Name: name,
  732. }
  733. // Skip if there are no data
  734. if len(result.Values) == 0 {
  735. continue
  736. }
  737. // Add load balancer to the set of load balancers
  738. if _, ok := loadBalancerMap[key]; !ok {
  739. loadBalancerMap[key] = &LoadBalancer{
  740. Cluster: cluster,
  741. Namespace: namespace,
  742. Name: fmt.Sprintf("%s/%s", namespace, name), // TODO:ETL this is kept for backwards-compatibility, but not good
  743. ProviderID: provider.ParseLBID(providerID),
  744. }
  745. }
  746. // Append start, end, and minutes. This should come before all other data.
  747. s := time.Unix(int64(result.Values[0].Timestamp), 0)
  748. e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
  749. loadBalancerMap[key].Start = s
  750. loadBalancerMap[key].End = e
  751. loadBalancerMap[key].Minutes = e.Sub(s).Minutes()
  752. // Fill in Provider ID if it is available and missing in the loadBalancerMap
  753. // Prevents there from being a duplicate LoadBalancers on the same day
  754. if providerID != "" && loadBalancerMap[key].ProviderID == "" {
  755. loadBalancerMap[key].ProviderID = providerID
  756. }
  757. }
  758. for _, result := range resLBCost {
  759. cluster, err := result.GetString(env.GetPromClusterLabel())
  760. if err != nil {
  761. cluster = env.GetClusterID()
  762. }
  763. namespace, err := result.GetString("namespace")
  764. if err != nil {
  765. log.Warnf("ClusterLoadBalancers: LB cost data missing namespace")
  766. continue
  767. }
  768. name, err := result.GetString("service_name")
  769. if err != nil {
  770. log.Warnf("ClusterLoadBalancers: LB cost data missing service_name")
  771. continue
  772. }
  773. providerID, err := result.GetString("ingress_ip")
  774. if err != nil {
  775. log.DedupedWarningf(5, "ClusterLoadBalancers: LB cost data missing ingress_ip")
  776. // only update asset cost when an actual IP was returned
  777. continue
  778. }
  779. key := LoadBalancerIdentifier{
  780. Cluster: cluster,
  781. Namespace: namespace,
  782. Name: name,
  783. }
  784. // Apply cost as price-per-hour * hours
  785. if lb, ok := loadBalancerMap[key]; ok {
  786. lbPricePerHr := result.Values[0].Value
  787. // interpolate any missing data
  788. resultMins := lb.Minutes
  789. if resultMins > 0 {
  790. scaleFactor := (resultMins + resolution.Minutes()) / resultMins
  791. hrs := (lb.Minutes * scaleFactor) / 60.0
  792. lb.Cost += lbPricePerHr * hrs
  793. } else {
  794. log.DedupedWarningf(20, "ClusterLoadBalancers: found zero minutes for key: %v", key)
  795. }
  796. if lb.Ip != "" && lb.Ip != providerID {
  797. log.DedupedWarningf(5, "ClusterLoadBalancers: multiple IPs per load balancer not supported, using most recent IP")
  798. }
  799. lb.Ip = providerID
  800. lb.Private = privateIPCheck(providerID)
  801. } else {
  802. log.DedupedWarningf(20, "ClusterLoadBalancers: found minutes for key that does not exist: %v", key)
  803. }
  804. }
  805. return loadBalancerMap, nil
  806. }
  807. // Check if an ip is private.
  808. func privateIPCheck(ip string) bool {
  809. ipAddress := net.ParseIP(ip)
  810. return ipAddress.IsPrivate()
  811. }
// ComputeClusterCosts gives the cumulative and monthly-rate cluster costs over a window of time for all clusters.
//
// Costs are assembled per cluster for CPU, RAM, GPU, storage, and (when the
// provider supplies a query) local storage, with provider discounts applied.
// When withBreakdown is true, CPU-mode and RAM system/user breakdowns and a
// PV used-cost breakdown are additionally queried and attached.
//
// NOTE: result channels in resChs are consumed positionally (indices 0-4 for
// the base queries, 5 for total local storage, 6-8 for breakdowns, 9 for used
// local storage); the submission order below must not be changed without
// updating the Await indices.
func (a *Accesses) ComputeClusterCosts(client prometheus.Client, provider models.Provider, window, offset time.Duration, withBreakdown bool) (map[string]*ClusterCosts, error) {
	// Reject windows too small to produce meaningful data.
	if window < 10*time.Minute {
		return nil, fmt.Errorf("minimum window of 10m required; got %s", window)
	}

	// Compute number of minutes in the full interval, for use interpolating missed scrapes or scaling missing data
	start, end := timeutil.ParseTimeRange(window, offset)
	mins := end.Sub(start).Minutes()

	// minsPerResolution determines accuracy and resource use for the following
	// queries. Smaller values (higher resolution) result in better accuracy,
	// but more expensive queries, and vice-a-versa.
	resolution := env.GetETLResolution()

	//Ensuring if ETL_RESOLUTION_SECONDS is less than 60s default it to 1m
	var minsPerResolution int
	if minsPerResolution = int(resolution.Minutes()); int(resolution.Minutes()) < 1 {
		minsPerResolution = 1
		log.DedupedWarningf(3, "ComputeClusterCosts(): Configured ETL resolution (%d seconds) is below the 60 seconds threshold. Overriding with 1 minute.", int(resolution.Seconds()))
	}

	windowStr := timeutil.DurationString(window)

	// hourlyToCumulative is a scaling factor that, when multiplied by an hourly
	// value, converts it to a cumulative value; i.e.
	// [$/hr] * [min/res]*[hr/min] = [$/res]
	hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)

	// Query templates. Each %s/%d is filled below with the cluster filter,
	// cluster label, window, resolution step, offset, and scaling factors.
	const fmtQueryDataCount = `
		count_over_time(sum(kube_node_status_capacity_cpu_cores{%s}) by (%s)[%s:%dm]%s) * %d
	`

	const fmtQueryTotalGPU = `
		sum(
			sum_over_time(node_gpu_hourly_cost{%s}[%s:%dm]%s) * %f
		) by (%s)
	`

	const fmtQueryTotalCPU = `
		sum(
			sum_over_time(avg(kube_node_status_capacity_cpu_cores{%s}) by (node, %s)[%s:%dm]%s) *
			avg(avg_over_time(node_cpu_hourly_cost{%s}[%s:%dm]%s)) by (node, %s) * %f
		) by (%s)
	`

	const fmtQueryTotalRAM = `
		sum(
			sum_over_time(avg(kube_node_status_capacity_memory_bytes{%s}) by (node, %s)[%s:%dm]%s) / 1024 / 1024 / 1024 *
			avg(avg_over_time(node_ram_hourly_cost{%s}[%s:%dm]%s)) by (node, %s) * %f
		) by (%s)
	`

	const fmtQueryTotalStorage = `
		sum(
			sum_over_time(avg(kube_persistentvolume_capacity_bytes{%s}) by (persistentvolume, %s)[%s:%dm]%s) / 1024 / 1024 / 1024 *
			avg(avg_over_time(pv_hourly_cost{%s}[%s:%dm]%s)) by (persistentvolume, %s) * %f
		) by (%s)
	`

	const fmtQueryCPUModePct = `
		sum(rate(node_cpu_seconds_total{%s}[%s]%s)) by (%s, mode) / ignoring(mode)
		group_left sum(rate(node_cpu_seconds_total{%s}[%s]%s)) by (%s)
	`

	const fmtQueryRAMSystemPct = `
		sum(sum_over_time(container_memory_usage_bytes{container_name!="",namespace="kube-system", %s}[%s:%dm]%s)) by (%s)
		/ sum(sum_over_time(kube_node_status_capacity_memory_bytes{%s}[%s:%dm]%s)) by (%s)
	`

	const fmtQueryRAMUserPct = `
		sum(sum_over_time(kubecost_cluster_memory_working_set_bytes{%s}[%s:%dm]%s)) by (%s)
		/ sum(sum_over_time(kube_node_status_capacity_memory_bytes{%s}[%s:%dm]%s)) by (%s)
	`

	// TODO niko/clustercost metric "kubelet_volume_stats_used_bytes" was deprecated in 1.12, then seems to have come back in 1.17
	// const fmtQueryPVStorageUsePct = `(sum(kube_persistentvolumeclaim_info) by (persistentvolumeclaim, storageclass,namespace) + on (persistentvolumeclaim,namespace)
	// group_right(storageclass) sum(kubelet_volume_stats_used_bytes) by (persistentvolumeclaim,namespace))`

	// Provider-specific local storage queries; either may be empty when the
	// provider has no local-storage pricing support.
	queryUsedLocalStorage := provider.GetLocalStorageQuery(window, offset, false, true)

	queryTotalLocalStorage := provider.GetLocalStorageQuery(window, offset, false, false)
	if queryTotalLocalStorage != "" {
		queryTotalLocalStorage = fmt.Sprintf(" + %s", queryTotalLocalStorage)
	}

	fmtOffset := timeutil.DurationToPromOffsetString(offset)

	queryDataCount := fmt.Sprintf(fmtQueryDataCount, env.GetPromClusterFilter(), env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, minsPerResolution)
	queryTotalGPU := fmt.Sprintf(fmtQueryTotalGPU, env.GetPromClusterFilter(), windowStr, minsPerResolution, fmtOffset, hourlyToCumulative, env.GetPromClusterLabel())
	queryTotalCPU := fmt.Sprintf(fmtQueryTotalCPU, env.GetPromClusterFilter(), env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterFilter(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), hourlyToCumulative, env.GetPromClusterLabel())
	queryTotalRAM := fmt.Sprintf(fmtQueryTotalRAM, env.GetPromClusterFilter(), env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterFilter(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), hourlyToCumulative, env.GetPromClusterLabel())
	queryTotalStorage := fmt.Sprintf(fmtQueryTotalStorage, env.GetPromClusterFilter(), env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterFilter(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), hourlyToCumulative, env.GetPromClusterLabel())

	ctx := prom.NewNamedContext(client, prom.ClusterContextName)

	// Submit the base queries; indices 0-4 of resChs.
	resChs := ctx.QueryAll(
		queryDataCount,
		queryTotalGPU,
		queryTotalCPU,
		queryTotalRAM,
		queryTotalStorage,
	)

	// Only submit the local storage query if it is valid. Otherwise Prometheus
	// will return errors. Always append something to resChs, regardless, to
	// maintain indexing.
	if queryTotalLocalStorage != "" {
		resChs = append(resChs, ctx.Query(queryTotalLocalStorage))
	} else {
		resChs = append(resChs, nil)
	}

	if withBreakdown {
		queryCPUModePct := fmt.Sprintf(fmtQueryCPUModePct, env.GetPromClusterFilter(), windowStr, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterFilter(), windowStr, fmtOffset, env.GetPromClusterLabel())
		queryRAMSystemPct := fmt.Sprintf(fmtQueryRAMSystemPct, env.GetPromClusterFilter(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterFilter(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel())
		queryRAMUserPct := fmt.Sprintf(fmtQueryRAMUserPct, env.GetPromClusterFilter(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterFilter(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel())

		// Breakdown queries occupy indices 6-8 of resChs.
		bdResChs := ctx.QueryAll(
			queryCPUModePct,
			queryRAMSystemPct,
			queryRAMUserPct,
		)

		// Only submit the local storage query if it is valid. Otherwise Prometheus
		// will return errors. Always append something to resChs, regardless, to
		// maintain indexing.
		if queryUsedLocalStorage != "" {
			bdResChs = append(bdResChs, ctx.Query(queryUsedLocalStorage))
		} else {
			bdResChs = append(bdResChs, nil)
		}

		resChs = append(resChs, bdResChs...)
	}

	// Await the base results; per-channel errors are collected on ctx.
	resDataCount, _ := resChs[0].Await()
	resTotalGPU, _ := resChs[1].Await()
	resTotalCPU, _ := resChs[2].Await()
	resTotalRAM, _ := resChs[3].Await()
	resTotalStorage, _ := resChs[4].Await()
	if ctx.HasErrors() {
		return nil, ctx.ErrorCollection()
	}

	defaultClusterID := env.GetClusterID()

	// Record the number of minutes of actual data per cluster, falling back
	// to the full window when the count query returns nothing.
	dataMinsByCluster := map[string]float64{}
	for _, result := range resDataCount {
		clusterID, _ := result.GetString(env.GetPromClusterLabel())
		if clusterID == "" {
			clusterID = defaultClusterID
		}
		dataMins := mins
		if len(result.Values) > 0 {
			dataMins = result.Values[0].Value
		} else {
			log.Warnf("Cluster cost data count returned no results for cluster %s", clusterID)
		}
		dataMinsByCluster[clusterID] = dataMins
	}

	// Determine combined discount
	// On any config/parse failure the corresponding discount falls back to 0.
	discount, customDiscount := 0.0, 0.0
	c, err := a.CloudProvider.GetConfig()
	if err == nil {
		discount, err = ParsePercentString(c.Discount)
		if err != nil {
			discount = 0.0
		}
		customDiscount, err = ParsePercentString(c.NegotiatedDiscount)
		if err != nil {
			customDiscount = 0.0
		}
	}

	// Intermediate structure storing mapping of [clusterID][type ∈ {cpu, ram, storage, total}]=cost
	costData := make(map[string]map[string]float64)

	// Helper function to iterate over Prom query results, parsing the raw values into
	// the intermediate costData structure.
	setCostsFromResults := func(costData map[string]map[string]float64, results []*prom.QueryResult, name string, discount float64, customDiscount float64) {
		for _, result := range results {
			clusterID, _ := result.GetString(env.GetPromClusterLabel())
			if clusterID == "" {
				clusterID = defaultClusterID
			}
			if _, ok := costData[clusterID]; !ok {
				costData[clusterID] = map[string]float64{}
			}
			if len(result.Values) > 0 {
				costData[clusterID][name] += result.Values[0].Value * (1.0 - discount) * (1.0 - customDiscount)
				costData[clusterID]["total"] += result.Values[0].Value * (1.0 - discount) * (1.0 - customDiscount)
			}
		}
	}

	// Apply both sustained use and custom discounts to RAM and CPU
	setCostsFromResults(costData, resTotalCPU, "cpu", discount, customDiscount)
	setCostsFromResults(costData, resTotalRAM, "ram", discount, customDiscount)

	// Apply only custom discount to GPU and storage
	setCostsFromResults(costData, resTotalGPU, "gpu", 0.0, customDiscount)
	setCostsFromResults(costData, resTotalStorage, "storage", 0.0, customDiscount)

	if queryTotalLocalStorage != "" {
		resTotalLocalStorage, err := resChs[5].Await()
		if err != nil {
			return nil, err
		}
		setCostsFromResults(costData, resTotalLocalStorage, "localstorage", 0.0, customDiscount)
	}

	cpuBreakdownMap := map[string]*ClusterCostsBreakdown{}
	ramBreakdownMap := map[string]*ClusterCostsBreakdown{}
	pvUsedCostMap := map[string]float64{}
	if withBreakdown {
		resCPUModePct, _ := resChs[6].Await()
		resRAMSystemPct, _ := resChs[7].Await()
		resRAMUserPct, _ := resChs[8].Await()
		if ctx.HasErrors() {
			return nil, ctx.ErrorCollection()
		}

		// Accumulate CPU time per mode into idle/system/user/other buckets.
		for _, result := range resCPUModePct {
			clusterID, _ := result.GetString(env.GetPromClusterLabel())
			if clusterID == "" {
				clusterID = defaultClusterID
			}
			if _, ok := cpuBreakdownMap[clusterID]; !ok {
				cpuBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
			}
			cpuBD := cpuBreakdownMap[clusterID]

			mode, err := result.GetString("mode")
			if err != nil {
				log.Warnf("ComputeClusterCosts: unable to read CPU mode: %s", err)
				mode = "other"
			}

			switch mode {
			case "idle":
				cpuBD.Idle += result.Values[0].Value
			case "system":
				cpuBD.System += result.Values[0].Value
			case "user":
				cpuBD.User += result.Values[0].Value
			default:
				cpuBD.Other += result.Values[0].Value
			}
		}

		for _, result := range resRAMSystemPct {
			clusterID, _ := result.GetString(env.GetPromClusterLabel())
			if clusterID == "" {
				clusterID = defaultClusterID
			}
			if _, ok := ramBreakdownMap[clusterID]; !ok {
				ramBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
			}
			ramBD := ramBreakdownMap[clusterID]
			ramBD.System += result.Values[0].Value
		}
		for _, result := range resRAMUserPct {
			clusterID, _ := result.GetString(env.GetPromClusterLabel())
			if clusterID == "" {
				clusterID = defaultClusterID
			}
			if _, ok := ramBreakdownMap[clusterID]; !ok {
				ramBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
			}
			ramBD := ramBreakdownMap[clusterID]
			ramBD.User += result.Values[0].Value
		}

		// Whatever RAM share is not system/user/other is considered idle.
		for _, ramBD := range ramBreakdownMap {
			remaining := 1.0
			remaining -= ramBD.Other
			remaining -= ramBD.System
			remaining -= ramBD.User
			ramBD.Idle = remaining
		}

		if queryUsedLocalStorage != "" {
			resUsedLocalStorage, err := resChs[9].Await()
			if err != nil {
				return nil, err
			}
			for _, result := range resUsedLocalStorage {
				clusterID, _ := result.GetString(env.GetPromClusterLabel())
				if clusterID == "" {
					clusterID = defaultClusterID
				}
				pvUsedCostMap[clusterID] += result.Values[0].Value
			}
		}
	}

	if ctx.HasErrors() {
		for _, err := range ctx.Errors() {
			log.Errorf("ComputeClusterCosts: %s", err)
		}
		return nil, ctx.ErrorCollection()
	}

	// Convert intermediate structure to Costs instances
	costsByCluster := map[string]*ClusterCosts{}
	for id, cd := range costData {
		dataMins, ok := dataMinsByCluster[id]
		if !ok {
			dataMins = mins
			log.Warnf("Cluster cost data count not found for cluster %s", id)
		}
		costs, err := NewClusterCostsFromCumulative(cd["cpu"], cd["gpu"], cd["ram"], cd["storage"]+cd["localstorage"], window, offset, dataMins/timeutil.MinsPerHour)
		if err != nil {
			log.Warnf("Failed to parse cluster costs on %s (%s) from cumulative data: %+v", window, offset, cd)
			return nil, err
		}

		if cpuBD, ok := cpuBreakdownMap[id]; ok {
			costs.CPUBreakdown = cpuBD
		}
		if ramBD, ok := ramBreakdownMap[id]; ok {
			costs.RAMBreakdown = ramBD
		}
		costs.StorageBreakdown = &ClusterCostsBreakdown{}
		if pvUC, ok := pvUsedCostMap[id]; ok {
			costs.StorageBreakdown.Idle = (costs.StorageCumulative - pvUC) / costs.StorageCumulative
			costs.StorageBreakdown.User = pvUC / costs.StorageCumulative
		}
		costs.DataMinutes = dataMins
		costsByCluster[id] = costs
	}

	return costsByCluster, nil
}
// Totals holds timestamped cost series for a cluster. Each entry is a
// [timestamp, value] pair, both rendered as "%f"-formatted strings
// (see resultToTotals).
type Totals struct {
	TotalCost   [][]string `json:"totalcost"`
	CPUCost     [][]string `json:"cpucost"`
	MemCost     [][]string `json:"memcost"`
	StorageCost [][]string `json:"storageCost"`
}
  1109. func resultToTotals(qrs []*prom.QueryResult) ([][]string, error) {
  1110. if len(qrs) == 0 {
  1111. return [][]string{}, fmt.Errorf("Not enough data available in the selected time range")
  1112. }
  1113. result := qrs[0]
  1114. totals := [][]string{}
  1115. for _, value := range result.Values {
  1116. d0 := fmt.Sprintf("%f", value.Timestamp)
  1117. d1 := fmt.Sprintf("%f", value.Value)
  1118. toAppend := []string{
  1119. d0,
  1120. d1,
  1121. }
  1122. totals = append(totals, toAppend)
  1123. }
  1124. return totals, nil
  1125. }
  1126. // ClusterCostsOverTime gives the full cluster costs over time
  1127. func ClusterCostsOverTime(cli prometheus.Client, provider models.Provider, startString, endString string, window, offset time.Duration) (*Totals, error) {
  1128. localStorageQuery := provider.GetLocalStorageQuery(window, offset, true, false)
  1129. if localStorageQuery != "" {
  1130. localStorageQuery = fmt.Sprintf("+ %s", localStorageQuery)
  1131. }
  1132. layout := "2006-01-02T15:04:05.000Z"
  1133. start, err := time.Parse(layout, startString)
  1134. if err != nil {
  1135. log.Errorf("Error parsing time %s. Error: %s", startString, err.Error())
  1136. return nil, err
  1137. }
  1138. end, err := time.Parse(layout, endString)
  1139. if err != nil {
  1140. log.Errorf("Error parsing time %s. Error: %s", endString, err.Error())
  1141. return nil, err
  1142. }
  1143. fmtWindow := timeutil.DurationString(window)
  1144. if fmtWindow == "" {
  1145. err := fmt.Errorf("window value invalid or missing")
  1146. log.Errorf("Error parsing time %v. Error: %s", window, err.Error())
  1147. return nil, err
  1148. }
  1149. fmtOffset := timeutil.DurationToPromOffsetString(offset)
  1150. qCores := fmt.Sprintf(queryClusterCores, env.GetPromClusterFilter(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterFilter(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterFilter(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  1151. qRAM := fmt.Sprintf(queryClusterRAM, env.GetPromClusterFilter(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterFilter(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  1152. qStorage := fmt.Sprintf(queryStorage, env.GetPromClusterFilter(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterFilter(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterLabel(), localStorageQuery)
  1153. qTotal := fmt.Sprintf(queryTotal, env.GetPromClusterFilter(), env.GetPromClusterLabel(), env.GetPromClusterFilter(), env.GetPromClusterLabel(), env.GetPromClusterFilter(), env.GetPromClusterLabel(), env.GetPromClusterLabel(), localStorageQuery)
  1154. ctx := prom.NewNamedContext(cli, prom.ClusterContextName)
  1155. resChClusterCores := ctx.QueryRange(qCores, start, end, window)
  1156. resChClusterRAM := ctx.QueryRange(qRAM, start, end, window)
  1157. resChStorage := ctx.QueryRange(qStorage, start, end, window)
  1158. resChTotal := ctx.QueryRange(qTotal, start, end, window)
  1159. resultClusterCores, err := resChClusterCores.Await()
  1160. if err != nil {
  1161. return nil, err
  1162. }
  1163. resultClusterRAM, err := resChClusterRAM.Await()
  1164. if err != nil {
  1165. return nil, err
  1166. }
  1167. resultStorage, err := resChStorage.Await()
  1168. if err != nil {
  1169. return nil, err
  1170. }
  1171. resultTotal, err := resChTotal.Await()
  1172. if err != nil {
  1173. return nil, err
  1174. }
  1175. coreTotal, err := resultToTotals(resultClusterCores)
  1176. if err != nil {
  1177. log.Infof("[Warning] ClusterCostsOverTime: no cpu data: %s", err)
  1178. return nil, err
  1179. }
  1180. ramTotal, err := resultToTotals(resultClusterRAM)
  1181. if err != nil {
  1182. log.Infof("[Warning] ClusterCostsOverTime: no ram data: %s", err)
  1183. return nil, err
  1184. }
  1185. storageTotal, err := resultToTotals(resultStorage)
  1186. if err != nil {
  1187. log.Infof("[Warning] ClusterCostsOverTime: no storage data: %s", err)
  1188. }
  1189. clusterTotal, err := resultToTotals(resultTotal)
  1190. if err != nil {
  1191. // If clusterTotal query failed, it's likely because there are no PVs, which
  1192. // causes the qTotal query to return no data. Instead, query only node costs.
  1193. // If that fails, return an error because something is actually wrong.
  1194. qNodes := fmt.Sprintf(queryNodes, env.GetPromClusterFilter(), env.GetPromClusterLabel(), localStorageQuery)
  1195. resultNodes, warnings, err := ctx.QueryRangeSync(qNodes, start, end, window)
  1196. for _, warning := range warnings {
  1197. log.Warnf(warning)
  1198. }
  1199. if err != nil {
  1200. return nil, err
  1201. }
  1202. clusterTotal, err = resultToTotals(resultNodes)
  1203. if err != nil {
  1204. log.Infof("[Warning] ClusterCostsOverTime: no node data: %s", err)
  1205. return nil, err
  1206. }
  1207. }
  1208. return &Totals{
  1209. TotalCost: clusterTotal,
  1210. CPUCost: coreTotal,
  1211. MemCost: ramTotal,
  1212. StorageCost: storageTotal,
  1213. }, nil
  1214. }
  1215. func pvCosts(diskMap map[DiskIdentifier]*Disk, resolution time.Duration, resActiveMins, resPVSize, resPVCost, resPVUsedAvg, resPVUsedMax, resPVCInfo []*prom.QueryResult, cp models.Provider, window opencost.Window) {
  1216. for _, result := range resActiveMins {
  1217. cluster, err := result.GetString(env.GetPromClusterLabel())
  1218. if err != nil {
  1219. cluster = env.GetClusterID()
  1220. }
  1221. name, err := result.GetString("persistentvolume")
  1222. if err != nil {
  1223. log.Warnf("ClusterDisks: active mins missing pv name")
  1224. continue
  1225. }
  1226. if len(result.Values) == 0 {
  1227. continue
  1228. }
  1229. key := DiskIdentifier{cluster, name}
  1230. if _, ok := diskMap[key]; !ok {
  1231. diskMap[key] = &Disk{
  1232. Cluster: cluster,
  1233. Name: name,
  1234. Breakdown: &ClusterCostsBreakdown{},
  1235. }
  1236. }
  1237. s, e := calculateStartAndEnd(result, resolution, window)
  1238. mins := e.Sub(s).Minutes()
  1239. diskMap[key].End = e
  1240. diskMap[key].Start = s
  1241. diskMap[key].Minutes = mins
  1242. }
  1243. for _, result := range resPVSize {
  1244. cluster, err := result.GetString(env.GetPromClusterLabel())
  1245. if err != nil {
  1246. cluster = env.GetClusterID()
  1247. }
  1248. name, err := result.GetString("persistentvolume")
  1249. if err != nil {
  1250. log.Warnf("ClusterDisks: PV size data missing persistentvolume")
  1251. continue
  1252. }
  1253. // TODO niko/assets storage class
  1254. bytes := result.Values[0].Value
  1255. key := DiskIdentifier{cluster, name}
  1256. if _, ok := diskMap[key]; !ok {
  1257. diskMap[key] = &Disk{
  1258. Cluster: cluster,
  1259. Name: name,
  1260. Breakdown: &ClusterCostsBreakdown{},
  1261. }
  1262. }
  1263. diskMap[key].Bytes = bytes
  1264. }
  1265. customPricingEnabled := provider.CustomPricesEnabled(cp)
  1266. customPricingConfig, err := cp.GetConfig()
  1267. if err != nil {
  1268. log.Warnf("ClusterDisks: failed to load custom pricing: %s", err)
  1269. }
  1270. for _, result := range resPVCost {
  1271. cluster, err := result.GetString(env.GetPromClusterLabel())
  1272. if err != nil {
  1273. cluster = env.GetClusterID()
  1274. }
  1275. name, err := result.GetString("persistentvolume")
  1276. if err != nil {
  1277. log.Warnf("ClusterDisks: PV cost data missing persistentvolume")
  1278. continue
  1279. }
  1280. // TODO niko/assets storage class
  1281. var cost float64
  1282. if customPricingEnabled && customPricingConfig != nil {
  1283. customPVCostStr := customPricingConfig.Storage
  1284. customPVCost, err := strconv.ParseFloat(customPVCostStr, 64)
  1285. if err != nil {
  1286. log.Warnf("ClusterDisks: error parsing custom PV price: %s", customPVCostStr)
  1287. }
  1288. cost = customPVCost
  1289. } else {
  1290. cost = result.Values[0].Value
  1291. }
  1292. key := DiskIdentifier{cluster, name}
  1293. if _, ok := diskMap[key]; !ok {
  1294. diskMap[key] = &Disk{
  1295. Cluster: cluster,
  1296. Name: name,
  1297. Breakdown: &ClusterCostsBreakdown{},
  1298. }
  1299. }
  1300. diskMap[key].Cost = cost * (diskMap[key].Bytes / 1024 / 1024 / 1024) * (diskMap[key].Minutes / 60)
  1301. providerID, _ := result.GetString("provider_id") // just put the providerID set up here, it's the simplest query.
  1302. if providerID != "" {
  1303. diskMap[key].ProviderID = provider.ParsePVID(providerID)
  1304. }
  1305. }
  1306. for _, result := range resPVUsedAvg {
  1307. cluster, err := result.GetString(env.GetPromClusterLabel())
  1308. if err != nil {
  1309. cluster = env.GetClusterID()
  1310. }
  1311. claimName, err := result.GetString("persistentvolumeclaim")
  1312. if err != nil {
  1313. log.Debugf("ClusterDisks: pv usage data missing persistentvolumeclaim")
  1314. continue
  1315. }
  1316. claimNamespace, err := result.GetString("namespace")
  1317. if err != nil {
  1318. log.Debugf("ClusterDisks: pv usage data missing namespace")
  1319. continue
  1320. }
  1321. var volumeName string
  1322. for _, thatRes := range resPVCInfo {
  1323. thatCluster, err := thatRes.GetString(env.GetPromClusterLabel())
  1324. if err != nil {
  1325. thatCluster = env.GetClusterID()
  1326. }
  1327. thatVolumeName, err := thatRes.GetString("volumename")
  1328. if err != nil {
  1329. log.Debugf("ClusterDisks: pv claim data missing volumename")
  1330. continue
  1331. }
  1332. thatClaimName, err := thatRes.GetString("persistentvolumeclaim")
  1333. if err != nil {
  1334. log.Debugf("ClusterDisks: pv claim data missing persistentvolumeclaim")
  1335. continue
  1336. }
  1337. thatClaimNamespace, err := thatRes.GetString("namespace")
  1338. if err != nil {
  1339. log.Debugf("ClusterDisks: pv claim data missing namespace")
  1340. continue
  1341. }
  1342. if cluster == thatCluster && claimName == thatClaimName && claimNamespace == thatClaimNamespace {
  1343. volumeName = thatVolumeName
  1344. }
  1345. }
  1346. usage := result.Values[0].Value
  1347. key := DiskIdentifier{cluster, volumeName}
  1348. if _, ok := diskMap[key]; !ok {
  1349. diskMap[key] = &Disk{
  1350. Cluster: cluster,
  1351. Name: volumeName,
  1352. Breakdown: &ClusterCostsBreakdown{},
  1353. }
  1354. }
  1355. diskMap[key].BytesUsedAvgPtr = &usage
  1356. }
  1357. for _, result := range resPVUsedMax {
  1358. cluster, err := result.GetString(env.GetPromClusterLabel())
  1359. if err != nil {
  1360. cluster = env.GetClusterID()
  1361. }
  1362. claimName, err := result.GetString("persistentvolumeclaim")
  1363. if err != nil {
  1364. log.Debugf("ClusterDisks: pv usage data missing persistentvolumeclaim")
  1365. continue
  1366. }
  1367. claimNamespace, err := result.GetString("namespace")
  1368. if err != nil {
  1369. log.Debugf("ClusterDisks: pv usage data missing namespace")
  1370. continue
  1371. }
  1372. var volumeName string
  1373. for _, thatRes := range resPVCInfo {
  1374. thatCluster, err := thatRes.GetString(env.GetPromClusterLabel())
  1375. if err != nil {
  1376. thatCluster = env.GetClusterID()
  1377. }
  1378. thatVolumeName, err := thatRes.GetString("volumename")
  1379. if err != nil {
  1380. log.Debugf("ClusterDisks: pv claim data missing volumename")
  1381. continue
  1382. }
  1383. thatClaimName, err := thatRes.GetString("persistentvolumeclaim")
  1384. if err != nil {
  1385. log.Debugf("ClusterDisks: pv claim data missing persistentvolumeclaim")
  1386. continue
  1387. }
  1388. thatClaimNamespace, err := thatRes.GetString("namespace")
  1389. if err != nil {
  1390. log.Debugf("ClusterDisks: pv claim data missing namespace")
  1391. continue
  1392. }
  1393. if cluster == thatCluster && claimName == thatClaimName && claimNamespace == thatClaimNamespace {
  1394. volumeName = thatVolumeName
  1395. }
  1396. }
  1397. usage := result.Values[0].Value
  1398. key := DiskIdentifier{cluster, volumeName}
  1399. if _, ok := diskMap[key]; !ok {
  1400. diskMap[key] = &Disk{
  1401. Cluster: cluster,
  1402. Name: volumeName,
  1403. Breakdown: &ClusterCostsBreakdown{},
  1404. }
  1405. }
  1406. diskMap[key].BytesUsedMaxPtr = &usage
  1407. }
  1408. }
  1409. // filterOutLocalPVs removes local Persistent Volumes (PVs) from the given disk map.
  1410. // Local PVs are identified by the prefix "local-pv-" in their names, which is the
  1411. // convention used by sig-storage-local-static-provisioner.
  1412. //
  1413. // Parameters:
  1414. // - diskMap: A map of DiskIdentifier to Disk pointers, representing all PVs.
  1415. //
  1416. // Returns:
  1417. // - A new map of DiskIdentifier to Disk pointers, containing only non-local PVs.
  1418. func filterOutLocalPVs(diskMap map[DiskIdentifier]*Disk) map[DiskIdentifier]*Disk {
  1419. nonLocalPVDiskMap := map[DiskIdentifier]*Disk{}
  1420. for key, val := range diskMap {
  1421. if !strings.HasPrefix(key.Name, SIG_STORAGE_LOCAL_PROVISIONER_PREFIX) {
  1422. nonLocalPVDiskMap[key] = val
  1423. }
  1424. }
  1425. return nonLocalPVDiskMap
  1426. }