// cluster.go
  1. package costmodel
  2. import (
  3. "fmt"
  4. "net"
  5. "strconv"
  6. "time"
  7. "github.com/opencost/opencost/pkg/cloud/provider"
  8. prometheus "github.com/prometheus/client_golang/api"
  9. "golang.org/x/exp/slices"
  10. "github.com/opencost/opencost/core/pkg/log"
  11. "github.com/opencost/opencost/core/pkg/opencost"
  12. "github.com/opencost/opencost/core/pkg/util/timeutil"
  13. "github.com/opencost/opencost/pkg/cloud/models"
  14. "github.com/opencost/opencost/pkg/env"
  15. "github.com/opencost/opencost/pkg/prom"
  16. )
// Prometheus query templates for cluster-level cost rollups. Each takes
// printf-style placeholders for a cluster filter, a range duration, an
// optional offset, and the cluster label used for grouping. Hourly rates
// are scaled by 730 (hours per month) to produce monthly costs.
const (
	// queryClusterCores: monthly CPU + GPU cost per cluster, derived from
	// node CPU capacity times hourly CPU cost, plus hourly GPU cost.
	queryClusterCores = `sum(
		avg(avg_over_time(kube_node_status_capacity_cpu_cores{%s}[%s] %s)) by (node, %s) * avg(avg_over_time(node_cpu_hourly_cost{%s}[%s] %s)) by (node, %s) * 730 +
		avg(avg_over_time(node_gpu_hourly_cost{%s}[%s] %s)) by (node, %s) * 730
	) by (%s)`

	// queryClusterRAM: monthly RAM cost per cluster, from node memory
	// capacity (converted bytes -> GiB) times hourly RAM cost.
	queryClusterRAM = `sum(
		avg(avg_over_time(kube_node_status_capacity_memory_bytes{%s}[%s] %s)) by (node, %s) / 1024 / 1024 / 1024 * avg(avg_over_time(node_ram_hourly_cost{%s}[%s] %s)) by (node, %s) * 730
	) by (%s)`

	// queryStorage: monthly PersistentVolume cost per cluster, from hourly
	// PV cost times PV capacity (bytes -> GiB).
	queryStorage = `sum(
		avg(avg_over_time(pv_hourly_cost{%s}[%s] %s)) by (persistentvolume, %s) * 730
		* avg(avg_over_time(kube_persistentvolume_capacity_bytes{%s}[%s] %s)) by (persistentvolume, %s) / 1024 / 1024 / 1024
	) by (%s) %s`

	// queryTotal: total monthly cost = node cost plus PV storage cost.
	queryTotal = `sum(avg(node_total_hourly_cost{%s}) by (node, %s)) * 730 +
	sum(
		avg(avg_over_time(pv_hourly_cost{%s}[1h])) by (persistentvolume, %s) * 730
		* avg(avg_over_time(kube_persistentvolume_capacity_bytes{%s}[1h])) by (persistentvolume, %s) / 1024 / 1024 / 1024
	) by (%s) %s`

	// queryNodes: total monthly node cost per cluster.
	queryNodes = `sum(avg(node_total_hourly_cost{%s}) by (node, %s)) * 730 %s`
)

// MAX_LOCAL_STORAGE_SIZE is the upper bound (1 TiB, in bytes) on local
// storage device size; devices reporting more than this are ignored by
// ClusterDisks.
const MAX_LOCAL_STORAGE_SIZE = 1024 * 1024 * 1024 * 1024
// ClusterCosts represents cumulative and monthly cluster costs over a given
// duration. Costs are broken down by cores, memory, and storage.
type ClusterCosts struct {
	Start             *time.Time             `json:"startTime"`
	End               *time.Time             `json:"endTime"`
	CPUCumulative     float64                `json:"cpuCumulativeCost"`
	CPUMonthly        float64                `json:"cpuMonthlyCost"`
	CPUBreakdown      *ClusterCostsBreakdown `json:"cpuBreakdown"`
	GPUCumulative     float64                `json:"gpuCumulativeCost"`
	GPUMonthly        float64                `json:"gpuMonthlyCost"`
	RAMCumulative     float64                `json:"ramCumulativeCost"`
	RAMMonthly        float64                `json:"ramMonthlyCost"`
	RAMBreakdown      *ClusterCostsBreakdown `json:"ramBreakdown"`
	StorageCumulative float64                `json:"storageCumulativeCost"`
	StorageMonthly    float64                `json:"storageMonthlyCost"`
	StorageBreakdown  *ClusterCostsBreakdown `json:"storageBreakdown"`
	TotalCumulative   float64                `json:"totalCumulativeCost"`
	TotalMonthly      float64                `json:"totalMonthlyCost"`
	// DataMinutes has no json tag and is not serialized.
	// NOTE(review): presumably the minutes of underlying data the costs
	// cover — confirm against the code that populates it.
	DataMinutes float64
}
// ClusterCostsBreakdown provides percentage-based breakdown of a resource by
// categories: user for user-space (i.e. non-system) usage, system, and idle.
type ClusterCostsBreakdown struct {
	Idle   float64 `json:"idle"`
	Other  float64 `json:"other"`
	System float64 `json:"system"`
	User   float64 `json:"user"`
}
  65. // NewClusterCostsFromCumulative takes cumulative cost data over a given time range, computes
  66. // the associated monthly rate data, and returns the Costs.
  67. func NewClusterCostsFromCumulative(cpu, gpu, ram, storage float64, window, offset time.Duration, dataHours float64) (*ClusterCosts, error) {
  68. start, end := timeutil.ParseTimeRange(window, offset)
  69. // If the number of hours is not given (i.e. is zero) compute one from the window and offset
  70. if dataHours == 0 {
  71. dataHours = end.Sub(start).Hours()
  72. }
  73. // Do not allow zero-length windows to prevent divide-by-zero issues
  74. if dataHours == 0 {
  75. return nil, fmt.Errorf("illegal time range: window %s, offset %s", window, offset)
  76. }
  77. cc := &ClusterCosts{
  78. Start: &start,
  79. End: &end,
  80. CPUCumulative: cpu,
  81. GPUCumulative: gpu,
  82. RAMCumulative: ram,
  83. StorageCumulative: storage,
  84. TotalCumulative: cpu + gpu + ram + storage,
  85. CPUMonthly: cpu / dataHours * (timeutil.HoursPerMonth),
  86. GPUMonthly: gpu / dataHours * (timeutil.HoursPerMonth),
  87. RAMMonthly: ram / dataHours * (timeutil.HoursPerMonth),
  88. StorageMonthly: storage / dataHours * (timeutil.HoursPerMonth),
  89. }
  90. cc.TotalMonthly = cc.CPUMonthly + cc.GPUMonthly + cc.RAMMonthly + cc.StorageMonthly
  91. return cc, nil
  92. }
// Disk holds cost, capacity, utilization, and active-time data for a single
// disk: either a PersistentVolume or a node's local storage device.
type Disk struct {
	Cluster        string
	Name           string
	ProviderID     string
	StorageClass   string
	VolumeName     string
	ClaimName      string
	ClaimNamespace string
	Cost           float64
	Bytes          float64

	// These two fields may not be available at all times because they rely on
	// a new set of metrics that may or may not be available. Thus, they must
	// be nilable to represent the complete absence of the data.
	//
	// In other words, nilability here lets us distinguish between
	// "metric is not available" and "metric is available but is 0".
	//
	// They end in "Ptr" to distinguish from an earlier version in order to
	// ensure that all usages are checked for nil.
	BytesUsedAvgPtr *float64
	BytesUsedMaxPtr *float64

	// Local is true for node-local (ephemeral) storage, false for PVs.
	Local     bool
	Start     time.Time
	End       time.Time
	Minutes   float64
	Breakdown *ClusterCostsBreakdown
}
// DiskIdentifier uniquely identifies a disk by its cluster and name.
type DiskIdentifier struct {
	Cluster string
	Name    string
}
  124. func ClusterDisks(client prometheus.Client, cp models.Provider, start, end time.Time) (map[DiskIdentifier]*Disk, error) {
  125. // Start from the time "end", querying backwards
  126. t := end
  127. // minsPerResolution determines accuracy and resource use for the following
  128. // queries. Smaller values (higher resolution) result in better accuracy,
  129. // but more expensive queries, and vice-a-versa.
  130. resolution := env.GetETLResolution()
  131. //Ensuring if ETL_RESOLUTION_SECONDS is less than 60s default it to 1m
  132. var minsPerResolution int
  133. if minsPerResolution = int(resolution.Minutes()); int(resolution.Minutes()) == 0 {
  134. minsPerResolution = 1
  135. log.DedupedWarningf(3, "ClusterDisks(): Configured ETL resolution (%d seconds) is below the 60 seconds threshold. Overriding with 1 minute.", int(resolution.Seconds()))
  136. }
  137. durStr := timeutil.DurationString(end.Sub(start))
  138. if durStr == "" {
  139. return nil, fmt.Errorf("illegal duration value for %s", opencost.NewClosedWindow(start, end))
  140. }
  141. ctx := prom.NewNamedContext(client, prom.ClusterContextName)
  142. queryPVCost := fmt.Sprintf(`avg(avg_over_time(pv_hourly_cost{%s}[%s])) by (%s, persistentvolume,provider_id)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  143. queryPVSize := fmt.Sprintf(`avg(avg_over_time(kube_persistentvolume_capacity_bytes{%s}[%s])) by (%s, persistentvolume)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  144. queryActiveMins := fmt.Sprintf(`avg(kube_persistentvolume_capacity_bytes{%s}) by (%s, persistentvolume)[%s:%dm]`, env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, minsPerResolution)
  145. queryPVStorageClass := fmt.Sprintf(`avg(avg_over_time(kubecost_pv_info{%s}[%s])) by (%s, persistentvolume, storageclass)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  146. queryPVUsedAvg := fmt.Sprintf(`avg(avg_over_time(kubelet_volume_stats_used_bytes{%s}[%s])) by (%s, persistentvolumeclaim, namespace)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  147. queryPVUsedMax := fmt.Sprintf(`max(max_over_time(kubelet_volume_stats_used_bytes{%s}[%s])) by (%s, persistentvolumeclaim, namespace)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  148. queryPVCInfo := fmt.Sprintf(`avg(avg_over_time(kube_persistentvolumeclaim_info{%s}[%s])) by (%s, volumename, persistentvolumeclaim, namespace)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  149. resChPVCost := ctx.QueryAtTime(queryPVCost, t)
  150. resChPVSize := ctx.QueryAtTime(queryPVSize, t)
  151. resChActiveMins := ctx.QueryAtTime(queryActiveMins, t)
  152. resChPVStorageClass := ctx.QueryAtTime(queryPVStorageClass, t)
  153. resChPVUsedAvg := ctx.QueryAtTime(queryPVUsedAvg, t)
  154. resChPVUsedMax := ctx.QueryAtTime(queryPVUsedMax, t)
  155. resChPVCInfo := ctx.QueryAtTime(queryPVCInfo, t)
  156. resPVCost, _ := resChPVCost.Await()
  157. resPVSize, _ := resChPVSize.Await()
  158. resActiveMins, _ := resChActiveMins.Await()
  159. resPVStorageClass, _ := resChPVStorageClass.Await()
  160. resPVUsedAvg, _ := resChPVUsedAvg.Await()
  161. resPVUsedMax, _ := resChPVUsedMax.Await()
  162. resPVCInfo, _ := resChPVCInfo.Await()
  163. // Cloud providers do not always charge for a node's local disk costs (i.e.
  164. // ephemeral storage). Provide an option to opt out of calculating &
  165. // allocating local disk costs. Note, that this does not affect
  166. // PersistentVolume costs.
  167. //
  168. // Ref:
  169. // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/RootDeviceStorage.html
  170. // https://learn.microsoft.com/en-us/azure/virtual-machines/managed-disks-overview#temporary-disk
  171. // https://cloud.google.com/compute/docs/disks/local-ssd
  172. resLocalStorageCost := []*prom.QueryResult{}
  173. resLocalStorageUsedCost := []*prom.QueryResult{}
  174. resLocalStorageUsedAvg := []*prom.QueryResult{}
  175. resLocalStorageUsedMax := []*prom.QueryResult{}
  176. resLocalStorageBytes := []*prom.QueryResult{}
  177. resLocalActiveMins := []*prom.QueryResult{}
  178. if env.GetAssetIncludeLocalDiskCost() {
  179. // hourlyToCumulative is a scaling factor that, when multiplied by an
  180. // hourly value, converts it to a cumulative value; i.e. [$/hr] *
  181. // [min/res]*[hr/min] = [$/res]
  182. hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
  183. costPerGBHr := 0.04 / 730.0
  184. // container_fs metrics contains metrics for disks that are not local storage of the node. While not perfect to
  185. // attempt to identify the correct device which is being used as local storage we first filter for devices mounted
  186. // at paths `/dev/nvme.*` or `/dev/sda.*`. There still may be multiple devices mounted at paths matching the regex
  187. // so later on we will select the device with the highest `container_fs_limit_bytes` per instance to create a local disk asset
  188. queryLocalStorageCost := fmt.Sprintf(`sum_over_time(sum(container_fs_limit_bytes{device=~"/dev/(nvme|sda).*", id="/", %s}) by (instance, device, %s)[%s:%dm]) / 1024 / 1024 / 1024 * %f * %f`, env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, minsPerResolution, hourlyToCumulative, costPerGBHr)
  189. queryLocalStorageUsedCost := fmt.Sprintf(`sum_over_time(sum(container_fs_usage_bytes{device=~"/dev/(nvme|sda).*", id="/", %s}) by (instance, device, %s)[%s:%dm]) / 1024 / 1024 / 1024 * %f * %f`, env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, minsPerResolution, hourlyToCumulative, costPerGBHr)
  190. queryLocalStorageUsedAvg := fmt.Sprintf(`avg(sum(avg_over_time(container_fs_usage_bytes{device=~"/dev/(nvme|sda).*", id="/", %s}[%s])) by (instance, device, %s, job)) by (instance, device, %s)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  191. queryLocalStorageUsedMax := fmt.Sprintf(`max(sum(max_over_time(container_fs_usage_bytes{device=~"/dev/(nvme|sda).*", id="/", %s}[%s])) by (instance, device, %s, job)) by (instance, device, %s)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  192. queryLocalStorageBytes := fmt.Sprintf(`avg_over_time(sum(container_fs_limit_bytes{device=~"/dev/(nvme|sda).*", id="/", %s}) by (instance, device, %s)[%s:%dm])`, env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, minsPerResolution)
  193. queryLocalActiveMins := fmt.Sprintf(`count(node_total_hourly_cost{%s}) by (%s, node)[%s:%dm]`, env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, minsPerResolution)
  194. resChLocalStorageCost := ctx.QueryAtTime(queryLocalStorageCost, t)
  195. resChLocalStorageUsedCost := ctx.QueryAtTime(queryLocalStorageUsedCost, t)
  196. resChLocalStoreageUsedAvg := ctx.QueryAtTime(queryLocalStorageUsedAvg, t)
  197. resChLocalStoreageUsedMax := ctx.QueryAtTime(queryLocalStorageUsedMax, t)
  198. resChLocalStorageBytes := ctx.QueryAtTime(queryLocalStorageBytes, t)
  199. resChLocalActiveMins := ctx.QueryAtTime(queryLocalActiveMins, t)
  200. resLocalStorageCost, _ = resChLocalStorageCost.Await()
  201. resLocalStorageUsedCost, _ = resChLocalStorageUsedCost.Await()
  202. resLocalStorageUsedAvg, _ = resChLocalStoreageUsedAvg.Await()
  203. resLocalStorageUsedMax, _ = resChLocalStoreageUsedMax.Await()
  204. resLocalStorageBytes, _ = resChLocalStorageBytes.Await()
  205. resLocalActiveMins, _ = resChLocalActiveMins.Await()
  206. }
  207. if ctx.HasErrors() {
  208. return nil, ctx.ErrorCollection()
  209. }
  210. diskMap := map[DiskIdentifier]*Disk{}
  211. for _, result := range resPVCInfo {
  212. cluster, err := result.GetString(env.GetPromClusterLabel())
  213. if err != nil {
  214. cluster = env.GetClusterID()
  215. }
  216. volumeName, err := result.GetString("volumename")
  217. if err != nil {
  218. log.Debugf("ClusterDisks: pv claim data missing volumename")
  219. continue
  220. }
  221. claimName, err := result.GetString("persistentvolumeclaim")
  222. if err != nil {
  223. log.Debugf("ClusterDisks: pv claim data missing persistentvolumeclaim")
  224. continue
  225. }
  226. claimNamespace, err := result.GetString("namespace")
  227. if err != nil {
  228. log.Debugf("ClusterDisks: pv claim data missing namespace")
  229. continue
  230. }
  231. key := DiskIdentifier{cluster, volumeName}
  232. if _, ok := diskMap[key]; !ok {
  233. diskMap[key] = &Disk{
  234. Cluster: cluster,
  235. Name: volumeName,
  236. Breakdown: &ClusterCostsBreakdown{},
  237. }
  238. }
  239. diskMap[key].VolumeName = volumeName
  240. diskMap[key].ClaimName = claimName
  241. diskMap[key].ClaimNamespace = claimNamespace
  242. }
  243. pvCosts(diskMap, resolution, resActiveMins, resPVSize, resPVCost, resPVUsedAvg, resPVUsedMax, resPVCInfo, cp, opencost.NewClosedWindow(start, end))
  244. type localStorage struct {
  245. device string
  246. disk *Disk
  247. }
  248. localStorageDisks := map[DiskIdentifier]localStorage{}
  249. // Start with local storage bytes so that the device with the largest size which has passed the
  250. // query filters can be determined
  251. for _, result := range resLocalStorageBytes {
  252. cluster, err := result.GetString(env.GetPromClusterLabel())
  253. if err != nil {
  254. cluster = env.GetClusterID()
  255. }
  256. name, err := result.GetString("instance")
  257. if err != nil {
  258. log.Warnf("ClusterDisks: local storage data missing instance")
  259. continue
  260. }
  261. device, err := result.GetString("device")
  262. if err != nil {
  263. log.Warnf("ClusterDisks: local storage data missing device")
  264. continue
  265. }
  266. bytes := result.Values[0].Value
  267. // Ignore disks that are larger than the max size
  268. if bytes > MAX_LOCAL_STORAGE_SIZE {
  269. continue
  270. }
  271. key := DiskIdentifier{cluster, name}
  272. // only keep the device with the most bytes per instance
  273. if current, ok := localStorageDisks[key]; !ok || current.disk.Bytes < bytes {
  274. localStorageDisks[key] = localStorage{
  275. device: device,
  276. disk: &Disk{
  277. Cluster: cluster,
  278. Name: name,
  279. Breakdown: &ClusterCostsBreakdown{},
  280. Local: true,
  281. StorageClass: opencost.LocalStorageClass,
  282. Bytes: bytes,
  283. },
  284. }
  285. }
  286. }
  287. for _, result := range resLocalStorageCost {
  288. cluster, err := result.GetString(env.GetPromClusterLabel())
  289. if err != nil {
  290. cluster = env.GetClusterID()
  291. }
  292. name, err := result.GetString("instance")
  293. if err != nil {
  294. log.Warnf("ClusterDisks: local storage data missing instance")
  295. continue
  296. }
  297. device, err := result.GetString("device")
  298. if err != nil {
  299. log.Warnf("ClusterDisks: local storage data missing device")
  300. continue
  301. }
  302. cost := result.Values[0].Value
  303. key := DiskIdentifier{cluster, name}
  304. ls, ok := localStorageDisks[key]
  305. if !ok || ls.device != device {
  306. continue
  307. }
  308. ls.disk.Cost = cost
  309. }
  310. for _, result := range resLocalStorageUsedCost {
  311. cluster, err := result.GetString(env.GetPromClusterLabel())
  312. if err != nil {
  313. cluster = env.GetClusterID()
  314. }
  315. name, err := result.GetString("instance")
  316. if err != nil {
  317. log.Warnf("ClusterDisks: local storage usage data missing instance")
  318. continue
  319. }
  320. device, err := result.GetString("device")
  321. if err != nil {
  322. log.Warnf("ClusterDisks: local storage data missing device")
  323. continue
  324. }
  325. cost := result.Values[0].Value
  326. key := DiskIdentifier{cluster, name}
  327. ls, ok := localStorageDisks[key]
  328. if !ok || ls.device != device {
  329. continue
  330. }
  331. ls.disk.Breakdown.System = cost / ls.disk.Cost
  332. }
  333. for _, result := range resLocalStorageUsedAvg {
  334. cluster, err := result.GetString(env.GetPromClusterLabel())
  335. if err != nil {
  336. cluster = env.GetClusterID()
  337. }
  338. name, err := result.GetString("instance")
  339. if err != nil {
  340. log.Warnf("ClusterDisks: local storage data missing instance")
  341. continue
  342. }
  343. device, err := result.GetString("device")
  344. if err != nil {
  345. log.Warnf("ClusterDisks: local storage data missing device")
  346. continue
  347. }
  348. bytesAvg := result.Values[0].Value
  349. key := DiskIdentifier{cluster, name}
  350. ls, ok := localStorageDisks[key]
  351. if !ok || ls.device != device {
  352. continue
  353. }
  354. ls.disk.BytesUsedAvgPtr = &bytesAvg
  355. }
  356. for _, result := range resLocalStorageUsedMax {
  357. cluster, err := result.GetString(env.GetPromClusterLabel())
  358. if err != nil {
  359. cluster = env.GetClusterID()
  360. }
  361. name, err := result.GetString("instance")
  362. if err != nil {
  363. log.Warnf("ClusterDisks: local storage data missing instance")
  364. continue
  365. }
  366. device, err := result.GetString("device")
  367. if err != nil {
  368. log.Warnf("ClusterDisks: local storage data missing device")
  369. continue
  370. }
  371. bytesMax := result.Values[0].Value
  372. key := DiskIdentifier{cluster, name}
  373. ls, ok := localStorageDisks[key]
  374. if !ok || ls.device != device {
  375. continue
  376. }
  377. ls.disk.BytesUsedMaxPtr = &bytesMax
  378. }
  379. for _, result := range resLocalActiveMins {
  380. cluster, err := result.GetString(env.GetPromClusterLabel())
  381. if err != nil {
  382. cluster = env.GetClusterID()
  383. }
  384. name, err := result.GetString("node")
  385. if err != nil {
  386. log.DedupedWarningf(5, "ClusterDisks: local active mins data missing instance")
  387. continue
  388. }
  389. providerID, err := result.GetString("provider_id")
  390. if err != nil {
  391. log.DedupedWarningf(5, "ClusterDisks: local active mins data missing instance")
  392. continue
  393. }
  394. key := DiskIdentifier{cluster, name}
  395. ls, ok := localStorageDisks[key]
  396. if !ok {
  397. continue
  398. }
  399. ls.disk.ProviderID = provider.ParseLocalDiskID(providerID)
  400. if len(result.Values) == 0 {
  401. continue
  402. }
  403. s := time.Unix(int64(result.Values[0].Timestamp), 0)
  404. e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
  405. mins := e.Sub(s).Minutes()
  406. // TODO niko/assets if mins >= threshold, interpolate for missing data?
  407. ls.disk.End = e
  408. ls.disk.Start = s
  409. ls.disk.Minutes = mins
  410. }
  411. // move local storage disks to main disk map
  412. for key, ls := range localStorageDisks {
  413. diskMap[key] = ls.disk
  414. }
  415. var unTracedDiskLogData []DiskIdentifier
  416. //Iterating through Persistent Volume given by custom metrics kubecost_pv_info and assign the storage class if known and __unknown__ if not populated.
  417. for _, result := range resPVStorageClass {
  418. cluster, err := result.GetString(env.GetPromClusterLabel())
  419. if err != nil {
  420. cluster = env.GetClusterID()
  421. }
  422. name, _ := result.GetString("persistentvolume")
  423. key := DiskIdentifier{cluster, name}
  424. if _, ok := diskMap[key]; !ok {
  425. if !slices.Contains(unTracedDiskLogData, key) {
  426. unTracedDiskLogData = append(unTracedDiskLogData, key)
  427. }
  428. continue
  429. }
  430. if len(result.Values) == 0 {
  431. continue
  432. }
  433. storageClass, err := result.GetString("storageclass")
  434. if err != nil {
  435. diskMap[key].StorageClass = opencost.UnknownStorageClass
  436. } else {
  437. diskMap[key].StorageClass = storageClass
  438. }
  439. }
  440. // Logging the unidentified disk information outside the loop
  441. for _, unIdentifiedDisk := range unTracedDiskLogData {
  442. log.Warnf("ClusterDisks: Cluster %s has Storage Class information for unidentified disk %s or disk deleted from analysis", unIdentifiedDisk.Cluster, unIdentifiedDisk.Name)
  443. }
  444. for _, disk := range diskMap {
  445. // Apply all remaining RAM to Idle
  446. disk.Breakdown.Idle = 1.0 - (disk.Breakdown.System + disk.Breakdown.Other + disk.Breakdown.User)
  447. // Set provider Id to the name for reconciliation
  448. if disk.ProviderID == "" {
  449. disk.ProviderID = disk.Name
  450. }
  451. }
  452. return diskMap, nil
  453. }
// NodeOverhead holds per-resource overhead fractions for a node.
// NOTE(review): presumably the fraction of node capacity consumed by system
// overhead (not allocatable to pods) — confirm against the code that
// populates these fields.
type NodeOverhead struct {
	CpuOverheadFraction float64
	RamOverheadFraction float64
}
// Node holds the cost, capacity, and activity data computed for a single
// cluster node over a query window.
type Node struct {
	Cluster      string
	Name         string
	ProviderID   string
	NodeType     string
	CPUCost      float64
	CPUCores     float64
	GPUCost      float64
	GPUCount     float64
	RAMCost      float64
	RAMBytes     float64
	Discount     float64
	Preemptible  bool
	CPUBreakdown *ClusterCostsBreakdown
	RAMBreakdown *ClusterCostsBreakdown
	Start        time.Time
	End          time.Time
	Minutes      float64
	Labels       map[string]string
	// Per-resource hourly rates.
	CostPerCPUHr    float64
	CostPerRAMGiBHr float64
	CostPerGPUHr    float64
	// Overhead may be nil when overhead metrics are unavailable.
	Overhead *NodeOverhead
}
// GKE lies about the number of cores e2 nodes have. This table
// contains a mapping from node type -> actual CPU cores
// for those cases.
var partialCPUMap = map[string]float64{
	"e2-micro":  0.25,
	"e2-small":  0.5,
	"e2-medium": 1.0,
}
// NodeIdentifier uniquely identifies a node by cluster, name, and provider ID.
type NodeIdentifier struct {
	Cluster    string
	Name       string
	ProviderID string
}
// nodeIdentifierNoProviderID identifies a node by cluster and name only; it is
// used to join metric results that do not carry a provider_id label.
type nodeIdentifierNoProviderID struct {
	Cluster string
	Name    string
}
  499. func costTimesMinuteAndCount(activeDataMap map[NodeIdentifier]activeData, costMap map[NodeIdentifier]float64, resourceCountMap map[nodeIdentifierNoProviderID]float64) {
  500. for k, v := range activeDataMap {
  501. keyNon := nodeIdentifierNoProviderID{
  502. Cluster: k.Cluster,
  503. Name: k.Name,
  504. }
  505. if cost, ok := costMap[k]; ok {
  506. minutes := v.minutes
  507. count := 1.0
  508. if c, ok := resourceCountMap[keyNon]; ok {
  509. count = c
  510. }
  511. costMap[k] = cost * (minutes / 60) * count
  512. }
  513. }
  514. }
  515. func costTimesMinute(activeDataMap map[NodeIdentifier]activeData, costMap map[NodeIdentifier]float64) {
  516. for k, v := range activeDataMap {
  517. if cost, ok := costMap[k]; ok {
  518. minutes := v.minutes
  519. costMap[k] = cost * (minutes / 60)
  520. }
  521. }
  522. }
  523. func ClusterNodes(cp models.Provider, client prometheus.Client, start, end time.Time) (map[NodeIdentifier]*Node, error) {
  524. // Start from the time "end", querying backwards
  525. t := end
  526. // minsPerResolution determines accuracy and resource use for the following
  527. // queries. Smaller values (higher resolution) result in better accuracy,
  528. // but more expensive queries, and vice-a-versa.
  529. resolution := env.GetETLResolution()
  530. //Ensuring if ETL_RESOLUTION_SECONDS is less than 60s default it to 1m
  531. var minsPerResolution int
  532. if minsPerResolution = int(resolution.Minutes()); int(resolution.Minutes()) == 0 {
  533. minsPerResolution = 1
  534. log.DedupedWarningf(3, "ClusterNodes(): Configured ETL resolution (%d seconds) is below the 60 seconds threshold. Overriding with 1 minute.", int(resolution.Seconds()))
  535. }
  536. durStr := timeutil.DurationString(end.Sub(start))
  537. if durStr == "" {
  538. return nil, fmt.Errorf("illegal duration value for %s", opencost.NewClosedWindow(start, end))
  539. }
  540. requiredCtx := prom.NewNamedContext(client, prom.ClusterContextName)
  541. optionalCtx := prom.NewNamedContext(client, prom.ClusterOptionalContextName)
  542. queryNodeCPUHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_cpu_hourly_cost{%s}[%s])) by (%s, node, instance_type, provider_id)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  543. queryNodeCPUCoresCapacity := fmt.Sprintf(`avg(avg_over_time(kube_node_status_capacity_cpu_cores{%s}[%s])) by (%s, node)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  544. queryNodeCPUCoresAllocatable := fmt.Sprintf(`avg(avg_over_time(kube_node_status_allocatable_cpu_cores{%s}[%s])) by (%s, node)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  545. queryNodeRAMHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_ram_hourly_cost{%s}[%s])) by (%s, node, instance_type, provider_id) / 1024 / 1024 / 1024`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  546. queryNodeRAMBytesCapacity := fmt.Sprintf(`avg(avg_over_time(kube_node_status_capacity_memory_bytes{%s}[%s])) by (%s, node)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  547. queryNodeRAMBytesAllocatable := fmt.Sprintf(`avg(avg_over_time(kube_node_status_allocatable_memory_bytes{%s}[%s])) by (%s, node)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  548. queryNodeGPUCount := fmt.Sprintf(`avg(avg_over_time(node_gpu_count{%s}[%s])) by (%s, node, provider_id)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  549. queryNodeGPUHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_gpu_hourly_cost{%s}[%s])) by (%s, node, instance_type, provider_id)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  550. queryNodeCPUModeTotal := fmt.Sprintf(`sum(rate(node_cpu_seconds_total{%s}[%s:%dm])) by (kubernetes_node, %s, mode)`, env.GetPromClusterFilter(), durStr, minsPerResolution, env.GetPromClusterLabel())
  551. queryNodeRAMSystemPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace="kube-system", %s}[%s:%dm])) by (instance, %s) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes{%s}[%s:%dm])) by (node, %s), "instance", "$1", "node", "(.*)")) by (instance, %s)`, env.GetPromClusterFilter(), durStr, minsPerResolution, env.GetPromClusterLabel(), env.GetPromClusterFilter(), durStr, minsPerResolution, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  552. queryNodeRAMUserPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace!="kube-system", %s}[%s:%dm])) by (instance, %s) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes{%s}[%s:%dm])) by (node, %s), "instance", "$1", "node", "(.*)")) by (instance, %s)`, env.GetPromClusterFilter(), durStr, minsPerResolution, env.GetPromClusterLabel(), env.GetPromClusterFilter(), durStr, minsPerResolution, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  553. queryActiveMins := fmt.Sprintf(`avg(node_total_hourly_cost{%s}) by (node, %s, provider_id)[%s:%dm]`, env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, minsPerResolution)
  554. queryIsSpot := fmt.Sprintf(`avg_over_time(kubecost_node_is_spot{%s}[%s:%dm])`, env.GetPromClusterFilter(), durStr, minsPerResolution)
  555. queryLabels := fmt.Sprintf(`count_over_time(kube_node_labels{%s}[%s:%dm])`, env.GetPromClusterFilter(), durStr, minsPerResolution)
  556. // Return errors if these fail
  557. resChNodeCPUHourlyCost := requiredCtx.QueryAtTime(queryNodeCPUHourlyCost, t)
  558. resChNodeCPUCoresCapacity := requiredCtx.QueryAtTime(queryNodeCPUCoresCapacity, t)
  559. resChNodeCPUCoresAllocatable := requiredCtx.QueryAtTime(queryNodeCPUCoresAllocatable, t)
  560. resChNodeRAMHourlyCost := requiredCtx.QueryAtTime(queryNodeRAMHourlyCost, t)
  561. resChNodeRAMBytesCapacity := requiredCtx.QueryAtTime(queryNodeRAMBytesCapacity, t)
  562. resChNodeRAMBytesAllocatable := requiredCtx.QueryAtTime(queryNodeRAMBytesAllocatable, t)
  563. resChNodeGPUCount := requiredCtx.QueryAtTime(queryNodeGPUCount, t)
  564. resChNodeGPUHourlyCost := requiredCtx.QueryAtTime(queryNodeGPUHourlyCost, t)
  565. resChActiveMins := requiredCtx.QueryAtTime(queryActiveMins, t)
  566. resChIsSpot := requiredCtx.QueryAtTime(queryIsSpot, t)
  567. // Do not return errors if these fail, but log warnings
  568. resChNodeCPUModeTotal := optionalCtx.QueryAtTime(queryNodeCPUModeTotal, t)
  569. resChNodeRAMSystemPct := optionalCtx.QueryAtTime(queryNodeRAMSystemPct, t)
  570. resChNodeRAMUserPct := optionalCtx.QueryAtTime(queryNodeRAMUserPct, t)
  571. resChLabels := optionalCtx.QueryAtTime(queryLabels, t)
  572. resNodeCPUHourlyCost, _ := resChNodeCPUHourlyCost.Await()
  573. resNodeCPUCoresCapacity, _ := resChNodeCPUCoresCapacity.Await()
  574. resNodeCPUCoresAllocatable, _ := resChNodeCPUCoresAllocatable.Await()
  575. resNodeGPUCount, _ := resChNodeGPUCount.Await()
  576. resNodeGPUHourlyCost, _ := resChNodeGPUHourlyCost.Await()
  577. resNodeRAMHourlyCost, _ := resChNodeRAMHourlyCost.Await()
  578. resNodeRAMBytesCapacity, _ := resChNodeRAMBytesCapacity.Await()
  579. resNodeRAMBytesAllocatable, _ := resChNodeRAMBytesAllocatable.Await()
  580. resIsSpot, _ := resChIsSpot.Await()
  581. resNodeCPUModeTotal, _ := resChNodeCPUModeTotal.Await()
  582. resNodeRAMSystemPct, _ := resChNodeRAMSystemPct.Await()
  583. resNodeRAMUserPct, _ := resChNodeRAMUserPct.Await()
  584. resActiveMins, _ := resChActiveMins.Await()
  585. resLabels, _ := resChLabels.Await()
  586. if optionalCtx.HasErrors() {
  587. for _, err := range optionalCtx.Errors() {
  588. log.Warnf("ClusterNodes: %s", err)
  589. }
  590. }
  591. if requiredCtx.HasErrors() {
  592. for _, err := range requiredCtx.Errors() {
  593. log.Errorf("ClusterNodes: %s", err)
  594. }
  595. return nil, requiredCtx.ErrorCollection()
  596. }
  597. activeDataMap := buildActiveDataMap(resActiveMins, resolution, opencost.NewClosedWindow(start, end))
  598. gpuCountMap := buildGPUCountMap(resNodeGPUCount)
  599. preemptibleMap := buildPreemptibleMap(resIsSpot)
  600. cpuCostMap, clusterAndNameToType1 := buildCPUCostMap(resNodeCPUHourlyCost, cp, preemptibleMap)
  601. ramCostMap, clusterAndNameToType2 := buildRAMCostMap(resNodeRAMHourlyCost, cp, preemptibleMap)
  602. gpuCostMap, clusterAndNameToType3 := buildGPUCostMap(resNodeGPUHourlyCost, gpuCountMap, cp, preemptibleMap)
  603. clusterAndNameToTypeIntermediate := mergeTypeMaps(clusterAndNameToType1, clusterAndNameToType2)
  604. clusterAndNameToType := mergeTypeMaps(clusterAndNameToTypeIntermediate, clusterAndNameToType3)
  605. cpuCoresCapacityMap := buildCPUCoresMap(resNodeCPUCoresCapacity)
  606. ramBytesCapacityMap := buildRAMBytesMap(resNodeRAMBytesCapacity)
  607. cpuCoresAllocatableMap := buildCPUCoresMap(resNodeCPUCoresAllocatable)
  608. ramBytesAllocatableMap := buildRAMBytesMap(resNodeRAMBytesAllocatable)
  609. overheadMap := buildOverheadMap(ramBytesCapacityMap, ramBytesAllocatableMap, cpuCoresCapacityMap, cpuCoresAllocatableMap)
  610. ramUserPctMap := buildRAMUserPctMap(resNodeRAMUserPct)
  611. ramSystemPctMap := buildRAMSystemPctMap(resNodeRAMSystemPct)
  612. cpuBreakdownMap := buildCPUBreakdownMap(resNodeCPUModeTotal)
  613. labelsMap := buildLabelsMap(resLabels)
  614. costTimesMinuteAndCount(activeDataMap, cpuCostMap, cpuCoresCapacityMap)
  615. costTimesMinuteAndCount(activeDataMap, ramCostMap, ramBytesCapacityMap)
  616. costTimesMinute(activeDataMap, gpuCostMap) // there's no need to do a weird "nodeIdentifierNoProviderID" type match since gpuCounts have a providerID
  617. nodeMap := buildNodeMap(
  618. cpuCostMap, ramCostMap, gpuCostMap, gpuCountMap,
  619. cpuCoresCapacityMap, ramBytesCapacityMap, ramUserPctMap,
  620. ramSystemPctMap,
  621. cpuBreakdownMap,
  622. activeDataMap,
  623. preemptibleMap,
  624. labelsMap,
  625. clusterAndNameToType,
  626. resolution,
  627. overheadMap,
  628. )
  629. c, err := cp.GetConfig()
  630. if err != nil {
  631. return nil, err
  632. }
  633. discount, err := ParsePercentString(c.Discount)
  634. if err != nil {
  635. return nil, err
  636. }
  637. negotiatedDiscount, err := ParsePercentString(c.NegotiatedDiscount)
  638. if err != nil {
  639. return nil, err
  640. }
  641. for _, node := range nodeMap {
  642. // TODO take GKE Reserved Instances into account
  643. node.Discount = cp.CombinedDiscountForNode(node.NodeType, node.Preemptible, discount, negotiatedDiscount)
  644. // Apply all remaining resources to Idle
  645. node.CPUBreakdown.Idle = 1.0 - (node.CPUBreakdown.System + node.CPUBreakdown.Other + node.CPUBreakdown.User)
  646. node.RAMBreakdown.Idle = 1.0 - (node.RAMBreakdown.System + node.RAMBreakdown.Other + node.RAMBreakdown.User)
  647. }
  648. return nodeMap, nil
  649. }
// LoadBalancerIdentifier uniquely identifies a load balancer by its
// cluster, namespace, and service name. It is used as the key of the
// map returned by ClusterLoadBalancers.
type LoadBalancerIdentifier struct {
	Cluster   string
	Namespace string
	Name      string
}
// LoadBalancer holds the cost and activity data collected for a single
// load balancer over a query window.
type LoadBalancer struct {
	Cluster    string
	Namespace  string
	Name       string  // "namespace/service" (kept in this combined form for backwards compatibility)
	ProviderID string  // parsed from the ingress_ip label via provider.ParseLBID
	Cost       float64 // accumulated cost: price-per-hour * active hours
	Start      time.Time // timestamp of the first observed sample
	End        time.Time // timestamp of the last observed sample
	Minutes    float64   // End - Start, in minutes
	Private    bool      // whether Ip parses as a private address (see privateIPCheck)
	Ip         string    // ingress IP; exported name kept as-is for interface compatibility
}
  667. func ClusterLoadBalancers(client prometheus.Client, start, end time.Time) (map[LoadBalancerIdentifier]*LoadBalancer, error) {
  668. // Start from the time "end", querying backwards
  669. t := end
  670. // minsPerResolution determines accuracy and resource use for the following
  671. // queries. Smaller values (higher resolution) result in better accuracy,
  672. // but more expensive queries, and vice-a-versa.
  673. resolution := env.GetETLResolution()
  674. //Ensuring if ETL_RESOLUTION_SECONDS is less than 60s default it to 1m
  675. var minsPerResolution int
  676. if minsPerResolution = int(resolution.Minutes()); int(resolution.Minutes()) == 0 {
  677. minsPerResolution = 1
  678. log.DedupedWarningf(3, "ClusterLoadBalancers(): Configured ETL resolution (%d seconds) is below the 60 seconds threshold. Overriding with 1 minute.", int(resolution.Seconds()))
  679. }
  680. // Query for the duration between start and end
  681. durStr := timeutil.DurationString(end.Sub(start))
  682. if durStr == "" {
  683. return nil, fmt.Errorf("illegal duration value for %s", opencost.NewClosedWindow(start, end))
  684. }
  685. ctx := prom.NewNamedContext(client, prom.ClusterContextName)
  686. queryLBCost := fmt.Sprintf(`avg(avg_over_time(kubecost_load_balancer_cost{%s}[%s])) by (namespace, service_name, %s, ingress_ip)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  687. queryActiveMins := fmt.Sprintf(`avg(kubecost_load_balancer_cost{%s}) by (namespace, service_name, %s, ingress_ip)[%s:%dm]`, env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, minsPerResolution)
  688. resChLBCost := ctx.QueryAtTime(queryLBCost, t)
  689. resChActiveMins := ctx.QueryAtTime(queryActiveMins, t)
  690. resLBCost, _ := resChLBCost.Await()
  691. resActiveMins, _ := resChActiveMins.Await()
  692. if ctx.HasErrors() {
  693. return nil, ctx.ErrorCollection()
  694. }
  695. loadBalancerMap := make(map[LoadBalancerIdentifier]*LoadBalancer, len(resActiveMins))
  696. for _, result := range resActiveMins {
  697. cluster, err := result.GetString(env.GetPromClusterLabel())
  698. if err != nil {
  699. cluster = env.GetClusterID()
  700. }
  701. namespace, err := result.GetString("namespace")
  702. if err != nil {
  703. log.Warnf("ClusterLoadBalancers: LB cost data missing namespace")
  704. continue
  705. }
  706. name, err := result.GetString("service_name")
  707. if err != nil {
  708. log.Warnf("ClusterLoadBalancers: LB cost data missing service_name")
  709. continue
  710. }
  711. providerID, err := result.GetString("ingress_ip")
  712. if err != nil {
  713. log.DedupedWarningf(5, "ClusterLoadBalancers: LB cost data missing ingress_ip")
  714. providerID = ""
  715. }
  716. key := LoadBalancerIdentifier{
  717. Cluster: cluster,
  718. Namespace: namespace,
  719. Name: name,
  720. }
  721. // Skip if there are no data
  722. if len(result.Values) == 0 {
  723. continue
  724. }
  725. // Add load balancer to the set of load balancers
  726. if _, ok := loadBalancerMap[key]; !ok {
  727. loadBalancerMap[key] = &LoadBalancer{
  728. Cluster: cluster,
  729. Namespace: namespace,
  730. Name: fmt.Sprintf("%s/%s", namespace, name), // TODO:ETL this is kept for backwards-compatibility, but not good
  731. ProviderID: provider.ParseLBID(providerID),
  732. }
  733. }
  734. // Append start, end, and minutes. This should come before all other data.
  735. s := time.Unix(int64(result.Values[0].Timestamp), 0)
  736. e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
  737. loadBalancerMap[key].Start = s
  738. loadBalancerMap[key].End = e
  739. loadBalancerMap[key].Minutes = e.Sub(s).Minutes()
  740. // Fill in Provider ID if it is available and missing in the loadBalancerMap
  741. // Prevents there from being a duplicate LoadBalancers on the same day
  742. if providerID != "" && loadBalancerMap[key].ProviderID == "" {
  743. loadBalancerMap[key].ProviderID = providerID
  744. }
  745. }
  746. for _, result := range resLBCost {
  747. cluster, err := result.GetString(env.GetPromClusterLabel())
  748. if err != nil {
  749. cluster = env.GetClusterID()
  750. }
  751. namespace, err := result.GetString("namespace")
  752. if err != nil {
  753. log.Warnf("ClusterLoadBalancers: LB cost data missing namespace")
  754. continue
  755. }
  756. name, err := result.GetString("service_name")
  757. if err != nil {
  758. log.Warnf("ClusterLoadBalancers: LB cost data missing service_name")
  759. continue
  760. }
  761. providerID, err := result.GetString("ingress_ip")
  762. if err != nil {
  763. log.DedupedWarningf(5, "ClusterLoadBalancers: LB cost data missing ingress_ip")
  764. // only update asset cost when an actual IP was returned
  765. continue
  766. }
  767. key := LoadBalancerIdentifier{
  768. Cluster: cluster,
  769. Namespace: namespace,
  770. Name: name,
  771. }
  772. // Apply cost as price-per-hour * hours
  773. if lb, ok := loadBalancerMap[key]; ok {
  774. lbPricePerHr := result.Values[0].Value
  775. // interpolate any missing data
  776. resultMins := lb.Minutes
  777. if resultMins > 0 {
  778. scaleFactor := (resultMins + resolution.Minutes()) / resultMins
  779. hrs := (lb.Minutes * scaleFactor) / 60.0
  780. lb.Cost += lbPricePerHr * hrs
  781. } else {
  782. log.DedupedWarningf(20, "ClusterLoadBalancers: found zero minutes for key: %v", key)
  783. }
  784. if lb.Ip != "" && lb.Ip != providerID {
  785. log.DedupedWarningf(5, "ClusterLoadBalancers: multiple IPs per load balancer not supported, using most recent IP")
  786. }
  787. lb.Ip = providerID
  788. lb.Private = privateIPCheck(providerID)
  789. } else {
  790. log.DedupedWarningf(20, "ClusterLoadBalancers: found minutes for key that does not exist: %v", key)
  791. }
  792. }
  793. return loadBalancerMap, nil
  794. }
  795. // Check if an ip is private.
  796. func privateIPCheck(ip string) bool {
  797. ipAddress := net.ParseIP(ip)
  798. return ipAddress.IsPrivate()
  799. }
  800. // ComputeClusterCosts gives the cumulative and monthly-rate cluster costs over a window of time for all clusters.
  801. func (a *Accesses) ComputeClusterCosts(client prometheus.Client, provider models.Provider, window, offset time.Duration, withBreakdown bool) (map[string]*ClusterCosts, error) {
  802. if window < 10*time.Minute {
  803. return nil, fmt.Errorf("minimum window of 10m required; got %s", window)
  804. }
  805. // Compute number of minutes in the full interval, for use interpolating missed scrapes or scaling missing data
  806. start, end := timeutil.ParseTimeRange(window, offset)
  807. mins := end.Sub(start).Minutes()
  808. // minsPerResolution determines accuracy and resource use for the following
  809. // queries. Smaller values (higher resolution) result in better accuracy,
  810. // but more expensive queries, and vice-a-versa.
  811. resolution := env.GetETLResolution()
  812. //Ensuring if ETL_RESOLUTION_SECONDS is less than 60s default it to 1m
  813. var minsPerResolution int
  814. if minsPerResolution = int(resolution.Minutes()); int(resolution.Minutes()) < 1 {
  815. minsPerResolution = 1
  816. log.DedupedWarningf(3, "ComputeClusterCosts(): Configured ETL resolution (%d seconds) is below the 60 seconds threshold. Overriding with 1 minute.", int(resolution.Seconds()))
  817. }
  818. windowStr := timeutil.DurationString(window)
  819. // hourlyToCumulative is a scaling factor that, when multiplied by an hourly
  820. // value, converts it to a cumulative value; i.e.
  821. // [$/hr] * [min/res]*[hr/min] = [$/res]
  822. hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
  823. const fmtQueryDataCount = `
  824. count_over_time(sum(kube_node_status_capacity_cpu_cores{%s}) by (%s)[%s:%dm]%s) * %d
  825. `
  826. const fmtQueryTotalGPU = `
  827. sum(
  828. sum_over_time(node_gpu_hourly_cost{%s}[%s:%dm]%s) * %f
  829. ) by (%s)
  830. `
  831. const fmtQueryTotalCPU = `
  832. sum(
  833. sum_over_time(avg(kube_node_status_capacity_cpu_cores{%s}) by (node, %s)[%s:%dm]%s) *
  834. avg(avg_over_time(node_cpu_hourly_cost{%s}[%s:%dm]%s)) by (node, %s) * %f
  835. ) by (%s)
  836. `
  837. const fmtQueryTotalRAM = `
  838. sum(
  839. sum_over_time(avg(kube_node_status_capacity_memory_bytes{%s}) by (node, %s)[%s:%dm]%s) / 1024 / 1024 / 1024 *
  840. avg(avg_over_time(node_ram_hourly_cost{%s}[%s:%dm]%s)) by (node, %s) * %f
  841. ) by (%s)
  842. `
  843. const fmtQueryTotalStorage = `
  844. sum(
  845. sum_over_time(avg(kube_persistentvolume_capacity_bytes{%s}) by (persistentvolume, %s)[%s:%dm]%s) / 1024 / 1024 / 1024 *
  846. avg(avg_over_time(pv_hourly_cost{%s}[%s:%dm]%s)) by (persistentvolume, %s) * %f
  847. ) by (%s)
  848. `
  849. const fmtQueryCPUModePct = `
  850. sum(rate(node_cpu_seconds_total{%s}[%s]%s)) by (%s, mode) / ignoring(mode)
  851. group_left sum(rate(node_cpu_seconds_total{%s}[%s]%s)) by (%s)
  852. `
  853. const fmtQueryRAMSystemPct = `
  854. sum(sum_over_time(container_memory_usage_bytes{container_name!="",namespace="kube-system", %s}[%s:%dm]%s)) by (%s)
  855. / sum(sum_over_time(kube_node_status_capacity_memory_bytes{%s}[%s:%dm]%s)) by (%s)
  856. `
  857. const fmtQueryRAMUserPct = `
  858. sum(sum_over_time(kubecost_cluster_memory_working_set_bytes{%s}[%s:%dm]%s)) by (%s)
  859. / sum(sum_over_time(kube_node_status_capacity_memory_bytes{%s}[%s:%dm]%s)) by (%s)
  860. `
  861. // TODO niko/clustercost metric "kubelet_volume_stats_used_bytes" was deprecated in 1.12, then seems to have come back in 1.17
  862. // const fmtQueryPVStorageUsePct = `(sum(kube_persistentvolumeclaim_info) by (persistentvolumeclaim, storageclass,namespace) + on (persistentvolumeclaim,namespace)
  863. // group_right(storageclass) sum(kubelet_volume_stats_used_bytes) by (persistentvolumeclaim,namespace))`
  864. queryUsedLocalStorage := provider.GetLocalStorageQuery(window, offset, false, true)
  865. queryTotalLocalStorage := provider.GetLocalStorageQuery(window, offset, false, false)
  866. if queryTotalLocalStorage != "" {
  867. queryTotalLocalStorage = fmt.Sprintf(" + %s", queryTotalLocalStorage)
  868. }
  869. fmtOffset := timeutil.DurationToPromOffsetString(offset)
  870. queryDataCount := fmt.Sprintf(fmtQueryDataCount, env.GetPromClusterFilter(), env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, minsPerResolution)
  871. queryTotalGPU := fmt.Sprintf(fmtQueryTotalGPU, env.GetPromClusterFilter(), windowStr, minsPerResolution, fmtOffset, hourlyToCumulative, env.GetPromClusterLabel())
  872. queryTotalCPU := fmt.Sprintf(fmtQueryTotalCPU, env.GetPromClusterFilter(), env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterFilter(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), hourlyToCumulative, env.GetPromClusterLabel())
  873. queryTotalRAM := fmt.Sprintf(fmtQueryTotalRAM, env.GetPromClusterFilter(), env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterFilter(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), hourlyToCumulative, env.GetPromClusterLabel())
  874. queryTotalStorage := fmt.Sprintf(fmtQueryTotalStorage, env.GetPromClusterFilter(), env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterFilter(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), hourlyToCumulative, env.GetPromClusterLabel())
  875. ctx := prom.NewNamedContext(client, prom.ClusterContextName)
  876. resChs := ctx.QueryAll(
  877. queryDataCount,
  878. queryTotalGPU,
  879. queryTotalCPU,
  880. queryTotalRAM,
  881. queryTotalStorage,
  882. )
  883. // Only submit the local storage query if it is valid. Otherwise Prometheus
  884. // will return errors. Always append something to resChs, regardless, to
  885. // maintain indexing.
  886. if queryTotalLocalStorage != "" {
  887. resChs = append(resChs, ctx.Query(queryTotalLocalStorage))
  888. } else {
  889. resChs = append(resChs, nil)
  890. }
  891. if withBreakdown {
  892. queryCPUModePct := fmt.Sprintf(fmtQueryCPUModePct, env.GetPromClusterFilter(), windowStr, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterFilter(), windowStr, fmtOffset, env.GetPromClusterLabel())
  893. queryRAMSystemPct := fmt.Sprintf(fmtQueryRAMSystemPct, env.GetPromClusterFilter(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterFilter(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel())
  894. queryRAMUserPct := fmt.Sprintf(fmtQueryRAMUserPct, env.GetPromClusterFilter(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterFilter(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel())
  895. bdResChs := ctx.QueryAll(
  896. queryCPUModePct,
  897. queryRAMSystemPct,
  898. queryRAMUserPct,
  899. )
  900. // Only submit the local storage query if it is valid. Otherwise Prometheus
  901. // will return errors. Always append something to resChs, regardless, to
  902. // maintain indexing.
  903. if queryUsedLocalStorage != "" {
  904. bdResChs = append(bdResChs, ctx.Query(queryUsedLocalStorage))
  905. } else {
  906. bdResChs = append(bdResChs, nil)
  907. }
  908. resChs = append(resChs, bdResChs...)
  909. }
  910. resDataCount, _ := resChs[0].Await()
  911. resTotalGPU, _ := resChs[1].Await()
  912. resTotalCPU, _ := resChs[2].Await()
  913. resTotalRAM, _ := resChs[3].Await()
  914. resTotalStorage, _ := resChs[4].Await()
  915. if ctx.HasErrors() {
  916. return nil, ctx.ErrorCollection()
  917. }
  918. defaultClusterID := env.GetClusterID()
  919. dataMinsByCluster := map[string]float64{}
  920. for _, result := range resDataCount {
  921. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  922. if clusterID == "" {
  923. clusterID = defaultClusterID
  924. }
  925. dataMins := mins
  926. if len(result.Values) > 0 {
  927. dataMins = result.Values[0].Value
  928. } else {
  929. log.Warnf("Cluster cost data count returned no results for cluster %s", clusterID)
  930. }
  931. dataMinsByCluster[clusterID] = dataMins
  932. }
  933. // Determine combined discount
  934. discount, customDiscount := 0.0, 0.0
  935. c, err := a.CloudProvider.GetConfig()
  936. if err == nil {
  937. discount, err = ParsePercentString(c.Discount)
  938. if err != nil {
  939. discount = 0.0
  940. }
  941. customDiscount, err = ParsePercentString(c.NegotiatedDiscount)
  942. if err != nil {
  943. customDiscount = 0.0
  944. }
  945. }
  946. // Intermediate structure storing mapping of [clusterID][type ∈ {cpu, ram, storage, total}]=cost
  947. costData := make(map[string]map[string]float64)
  948. // Helper function to iterate over Prom query results, parsing the raw values into
  949. // the intermediate costData structure.
  950. setCostsFromResults := func(costData map[string]map[string]float64, results []*prom.QueryResult, name string, discount float64, customDiscount float64) {
  951. for _, result := range results {
  952. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  953. if clusterID == "" {
  954. clusterID = defaultClusterID
  955. }
  956. if _, ok := costData[clusterID]; !ok {
  957. costData[clusterID] = map[string]float64{}
  958. }
  959. if len(result.Values) > 0 {
  960. costData[clusterID][name] += result.Values[0].Value * (1.0 - discount) * (1.0 - customDiscount)
  961. costData[clusterID]["total"] += result.Values[0].Value * (1.0 - discount) * (1.0 - customDiscount)
  962. }
  963. }
  964. }
  965. // Apply both sustained use and custom discounts to RAM and CPU
  966. setCostsFromResults(costData, resTotalCPU, "cpu", discount, customDiscount)
  967. setCostsFromResults(costData, resTotalRAM, "ram", discount, customDiscount)
  968. // Apply only custom discount to GPU and storage
  969. setCostsFromResults(costData, resTotalGPU, "gpu", 0.0, customDiscount)
  970. setCostsFromResults(costData, resTotalStorage, "storage", 0.0, customDiscount)
  971. if queryTotalLocalStorage != "" {
  972. resTotalLocalStorage, err := resChs[5].Await()
  973. if err != nil {
  974. return nil, err
  975. }
  976. setCostsFromResults(costData, resTotalLocalStorage, "localstorage", 0.0, customDiscount)
  977. }
  978. cpuBreakdownMap := map[string]*ClusterCostsBreakdown{}
  979. ramBreakdownMap := map[string]*ClusterCostsBreakdown{}
  980. pvUsedCostMap := map[string]float64{}
  981. if withBreakdown {
  982. resCPUModePct, _ := resChs[6].Await()
  983. resRAMSystemPct, _ := resChs[7].Await()
  984. resRAMUserPct, _ := resChs[8].Await()
  985. if ctx.HasErrors() {
  986. return nil, ctx.ErrorCollection()
  987. }
  988. for _, result := range resCPUModePct {
  989. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  990. if clusterID == "" {
  991. clusterID = defaultClusterID
  992. }
  993. if _, ok := cpuBreakdownMap[clusterID]; !ok {
  994. cpuBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
  995. }
  996. cpuBD := cpuBreakdownMap[clusterID]
  997. mode, err := result.GetString("mode")
  998. if err != nil {
  999. log.Warnf("ComputeClusterCosts: unable to read CPU mode: %s", err)
  1000. mode = "other"
  1001. }
  1002. switch mode {
  1003. case "idle":
  1004. cpuBD.Idle += result.Values[0].Value
  1005. case "system":
  1006. cpuBD.System += result.Values[0].Value
  1007. case "user":
  1008. cpuBD.User += result.Values[0].Value
  1009. default:
  1010. cpuBD.Other += result.Values[0].Value
  1011. }
  1012. }
  1013. for _, result := range resRAMSystemPct {
  1014. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  1015. if clusterID == "" {
  1016. clusterID = defaultClusterID
  1017. }
  1018. if _, ok := ramBreakdownMap[clusterID]; !ok {
  1019. ramBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
  1020. }
  1021. ramBD := ramBreakdownMap[clusterID]
  1022. ramBD.System += result.Values[0].Value
  1023. }
  1024. for _, result := range resRAMUserPct {
  1025. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  1026. if clusterID == "" {
  1027. clusterID = defaultClusterID
  1028. }
  1029. if _, ok := ramBreakdownMap[clusterID]; !ok {
  1030. ramBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
  1031. }
  1032. ramBD := ramBreakdownMap[clusterID]
  1033. ramBD.User += result.Values[0].Value
  1034. }
  1035. for _, ramBD := range ramBreakdownMap {
  1036. remaining := 1.0
  1037. remaining -= ramBD.Other
  1038. remaining -= ramBD.System
  1039. remaining -= ramBD.User
  1040. ramBD.Idle = remaining
  1041. }
  1042. if queryUsedLocalStorage != "" {
  1043. resUsedLocalStorage, err := resChs[9].Await()
  1044. if err != nil {
  1045. return nil, err
  1046. }
  1047. for _, result := range resUsedLocalStorage {
  1048. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  1049. if clusterID == "" {
  1050. clusterID = defaultClusterID
  1051. }
  1052. pvUsedCostMap[clusterID] += result.Values[0].Value
  1053. }
  1054. }
  1055. }
  1056. if ctx.HasErrors() {
  1057. for _, err := range ctx.Errors() {
  1058. log.Errorf("ComputeClusterCosts: %s", err)
  1059. }
  1060. return nil, ctx.ErrorCollection()
  1061. }
  1062. // Convert intermediate structure to Costs instances
  1063. costsByCluster := map[string]*ClusterCosts{}
  1064. for id, cd := range costData {
  1065. dataMins, ok := dataMinsByCluster[id]
  1066. if !ok {
  1067. dataMins = mins
  1068. log.Warnf("Cluster cost data count not found for cluster %s", id)
  1069. }
  1070. costs, err := NewClusterCostsFromCumulative(cd["cpu"], cd["gpu"], cd["ram"], cd["storage"]+cd["localstorage"], window, offset, dataMins/timeutil.MinsPerHour)
  1071. if err != nil {
  1072. log.Warnf("Failed to parse cluster costs on %s (%s) from cumulative data: %+v", window, offset, cd)
  1073. return nil, err
  1074. }
  1075. if cpuBD, ok := cpuBreakdownMap[id]; ok {
  1076. costs.CPUBreakdown = cpuBD
  1077. }
  1078. if ramBD, ok := ramBreakdownMap[id]; ok {
  1079. costs.RAMBreakdown = ramBD
  1080. }
  1081. costs.StorageBreakdown = &ClusterCostsBreakdown{}
  1082. if pvUC, ok := pvUsedCostMap[id]; ok {
  1083. costs.StorageBreakdown.Idle = (costs.StorageCumulative - pvUC) / costs.StorageCumulative
  1084. costs.StorageBreakdown.User = pvUC / costs.StorageCumulative
  1085. }
  1086. costs.DataMinutes = dataMins
  1087. costsByCluster[id] = costs
  1088. }
  1089. return costsByCluster, nil
  1090. }
// Totals holds cluster cost time series. Each inner []string is a
// [timestamp, value] pair, both rendered with %f by resultToTotals.
type Totals struct {
	TotalCost   [][]string `json:"totalcost"`
	CPUCost     [][]string `json:"cpucost"`
	MemCost     [][]string `json:"memcost"`
	StorageCost [][]string `json:"storageCost"` // tag casing differs from the others; changing it would break API consumers
}
  1097. func resultToTotals(qrs []*prom.QueryResult) ([][]string, error) {
  1098. if len(qrs) == 0 {
  1099. return [][]string{}, fmt.Errorf("Not enough data available in the selected time range")
  1100. }
  1101. result := qrs[0]
  1102. totals := [][]string{}
  1103. for _, value := range result.Values {
  1104. d0 := fmt.Sprintf("%f", value.Timestamp)
  1105. d1 := fmt.Sprintf("%f", value.Value)
  1106. toAppend := []string{
  1107. d0,
  1108. d1,
  1109. }
  1110. totals = append(totals, toAppend)
  1111. }
  1112. return totals, nil
  1113. }
  1114. // ClusterCostsOverTime gives the full cluster costs over time
  1115. func ClusterCostsOverTime(cli prometheus.Client, provider models.Provider, startString, endString string, window, offset time.Duration) (*Totals, error) {
  1116. localStorageQuery := provider.GetLocalStorageQuery(window, offset, true, false)
  1117. if localStorageQuery != "" {
  1118. localStorageQuery = fmt.Sprintf("+ %s", localStorageQuery)
  1119. }
  1120. layout := "2006-01-02T15:04:05.000Z"
  1121. start, err := time.Parse(layout, startString)
  1122. if err != nil {
  1123. log.Errorf("Error parsing time %s. Error: %s", startString, err.Error())
  1124. return nil, err
  1125. }
  1126. end, err := time.Parse(layout, endString)
  1127. if err != nil {
  1128. log.Errorf("Error parsing time %s. Error: %s", endString, err.Error())
  1129. return nil, err
  1130. }
  1131. fmtWindow := timeutil.DurationString(window)
  1132. if fmtWindow == "" {
  1133. err := fmt.Errorf("window value invalid or missing")
  1134. log.Errorf("Error parsing time %v. Error: %s", window, err.Error())
  1135. return nil, err
  1136. }
  1137. fmtOffset := timeutil.DurationToPromOffsetString(offset)
  1138. qCores := fmt.Sprintf(queryClusterCores, env.GetPromClusterFilter(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterFilter(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterFilter(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  1139. qRAM := fmt.Sprintf(queryClusterRAM, env.GetPromClusterFilter(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterFilter(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  1140. qStorage := fmt.Sprintf(queryStorage, env.GetPromClusterFilter(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterFilter(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterLabel(), localStorageQuery)
  1141. qTotal := fmt.Sprintf(queryTotal, env.GetPromClusterFilter(), env.GetPromClusterLabel(), env.GetPromClusterFilter(), env.GetPromClusterLabel(), env.GetPromClusterFilter(), env.GetPromClusterLabel(), env.GetPromClusterLabel(), localStorageQuery)
  1142. ctx := prom.NewNamedContext(cli, prom.ClusterContextName)
  1143. resChClusterCores := ctx.QueryRange(qCores, start, end, window)
  1144. resChClusterRAM := ctx.QueryRange(qRAM, start, end, window)
  1145. resChStorage := ctx.QueryRange(qStorage, start, end, window)
  1146. resChTotal := ctx.QueryRange(qTotal, start, end, window)
  1147. resultClusterCores, err := resChClusterCores.Await()
  1148. if err != nil {
  1149. return nil, err
  1150. }
  1151. resultClusterRAM, err := resChClusterRAM.Await()
  1152. if err != nil {
  1153. return nil, err
  1154. }
  1155. resultStorage, err := resChStorage.Await()
  1156. if err != nil {
  1157. return nil, err
  1158. }
  1159. resultTotal, err := resChTotal.Await()
  1160. if err != nil {
  1161. return nil, err
  1162. }
  1163. coreTotal, err := resultToTotals(resultClusterCores)
  1164. if err != nil {
  1165. log.Infof("[Warning] ClusterCostsOverTime: no cpu data: %s", err)
  1166. return nil, err
  1167. }
  1168. ramTotal, err := resultToTotals(resultClusterRAM)
  1169. if err != nil {
  1170. log.Infof("[Warning] ClusterCostsOverTime: no ram data: %s", err)
  1171. return nil, err
  1172. }
  1173. storageTotal, err := resultToTotals(resultStorage)
  1174. if err != nil {
  1175. log.Infof("[Warning] ClusterCostsOverTime: no storage data: %s", err)
  1176. }
  1177. clusterTotal, err := resultToTotals(resultTotal)
  1178. if err != nil {
  1179. // If clusterTotal query failed, it's likely because there are no PVs, which
  1180. // causes the qTotal query to return no data. Instead, query only node costs.
  1181. // If that fails, return an error because something is actually wrong.
  1182. qNodes := fmt.Sprintf(queryNodes, env.GetPromClusterFilter(), env.GetPromClusterLabel(), localStorageQuery)
  1183. resultNodes, warnings, err := ctx.QueryRangeSync(qNodes, start, end, window)
  1184. for _, warning := range warnings {
  1185. log.Warnf(warning)
  1186. }
  1187. if err != nil {
  1188. return nil, err
  1189. }
  1190. clusterTotal, err = resultToTotals(resultNodes)
  1191. if err != nil {
  1192. log.Infof("[Warning] ClusterCostsOverTime: no node data: %s", err)
  1193. return nil, err
  1194. }
  1195. }
  1196. return &Totals{
  1197. TotalCost: clusterTotal,
  1198. CPUCost: coreTotal,
  1199. MemCost: ramTotal,
  1200. StorageCost: storageTotal,
  1201. }, nil
  1202. }
  1203. func pvCosts(diskMap map[DiskIdentifier]*Disk, resolution time.Duration, resActiveMins, resPVSize, resPVCost, resPVUsedAvg, resPVUsedMax, resPVCInfo []*prom.QueryResult, cp models.Provider, window opencost.Window) {
  1204. for _, result := range resActiveMins {
  1205. cluster, err := result.GetString(env.GetPromClusterLabel())
  1206. if err != nil {
  1207. cluster = env.GetClusterID()
  1208. }
  1209. name, err := result.GetString("persistentvolume")
  1210. if err != nil {
  1211. log.Warnf("ClusterDisks: active mins missing pv name")
  1212. continue
  1213. }
  1214. if len(result.Values) == 0 {
  1215. continue
  1216. }
  1217. key := DiskIdentifier{cluster, name}
  1218. if _, ok := diskMap[key]; !ok {
  1219. diskMap[key] = &Disk{
  1220. Cluster: cluster,
  1221. Name: name,
  1222. Breakdown: &ClusterCostsBreakdown{},
  1223. }
  1224. }
  1225. s, e := calculateStartAndEnd(result, resolution, window)
  1226. mins := e.Sub(s).Minutes()
  1227. diskMap[key].End = e
  1228. diskMap[key].Start = s
  1229. diskMap[key].Minutes = mins
  1230. }
  1231. for _, result := range resPVSize {
  1232. cluster, err := result.GetString(env.GetPromClusterLabel())
  1233. if err != nil {
  1234. cluster = env.GetClusterID()
  1235. }
  1236. name, err := result.GetString("persistentvolume")
  1237. if err != nil {
  1238. log.Warnf("ClusterDisks: PV size data missing persistentvolume")
  1239. continue
  1240. }
  1241. // TODO niko/assets storage class
  1242. bytes := result.Values[0].Value
  1243. key := DiskIdentifier{cluster, name}
  1244. if _, ok := diskMap[key]; !ok {
  1245. diskMap[key] = &Disk{
  1246. Cluster: cluster,
  1247. Name: name,
  1248. Breakdown: &ClusterCostsBreakdown{},
  1249. }
  1250. }
  1251. diskMap[key].Bytes = bytes
  1252. }
  1253. customPricingEnabled := provider.CustomPricesEnabled(cp)
  1254. customPricingConfig, err := cp.GetConfig()
  1255. if err != nil {
  1256. log.Warnf("ClusterDisks: failed to load custom pricing: %s", err)
  1257. }
  1258. for _, result := range resPVCost {
  1259. cluster, err := result.GetString(env.GetPromClusterLabel())
  1260. if err != nil {
  1261. cluster = env.GetClusterID()
  1262. }
  1263. name, err := result.GetString("persistentvolume")
  1264. if err != nil {
  1265. log.Warnf("ClusterDisks: PV cost data missing persistentvolume")
  1266. continue
  1267. }
  1268. // TODO niko/assets storage class
  1269. var cost float64
  1270. if customPricingEnabled && customPricingConfig != nil {
  1271. customPVCostStr := customPricingConfig.Storage
  1272. customPVCost, err := strconv.ParseFloat(customPVCostStr, 64)
  1273. if err != nil {
  1274. log.Warnf("ClusterDisks: error parsing custom PV price: %s", customPVCostStr)
  1275. }
  1276. cost = customPVCost
  1277. } else {
  1278. cost = result.Values[0].Value
  1279. }
  1280. key := DiskIdentifier{cluster, name}
  1281. if _, ok := diskMap[key]; !ok {
  1282. diskMap[key] = &Disk{
  1283. Cluster: cluster,
  1284. Name: name,
  1285. Breakdown: &ClusterCostsBreakdown{},
  1286. }
  1287. }
  1288. diskMap[key].Cost = cost * (diskMap[key].Bytes / 1024 / 1024 / 1024) * (diskMap[key].Minutes / 60)
  1289. providerID, _ := result.GetString("provider_id") // just put the providerID set up here, it's the simplest query.
  1290. if providerID != "" {
  1291. diskMap[key].ProviderID = provider.ParsePVID(providerID)
  1292. }
  1293. }
  1294. for _, result := range resPVUsedAvg {
  1295. cluster, err := result.GetString(env.GetPromClusterLabel())
  1296. if err != nil {
  1297. cluster = env.GetClusterID()
  1298. }
  1299. claimName, err := result.GetString("persistentvolumeclaim")
  1300. if err != nil {
  1301. log.Debugf("ClusterDisks: pv usage data missing persistentvolumeclaim")
  1302. continue
  1303. }
  1304. claimNamespace, err := result.GetString("namespace")
  1305. if err != nil {
  1306. log.Debugf("ClusterDisks: pv usage data missing namespace")
  1307. continue
  1308. }
  1309. var volumeName string
  1310. for _, thatRes := range resPVCInfo {
  1311. thatCluster, err := thatRes.GetString(env.GetPromClusterLabel())
  1312. if err != nil {
  1313. thatCluster = env.GetClusterID()
  1314. }
  1315. thatVolumeName, err := thatRes.GetString("volumename")
  1316. if err != nil {
  1317. log.Debugf("ClusterDisks: pv claim data missing volumename")
  1318. continue
  1319. }
  1320. thatClaimName, err := thatRes.GetString("persistentvolumeclaim")
  1321. if err != nil {
  1322. log.Debugf("ClusterDisks: pv claim data missing persistentvolumeclaim")
  1323. continue
  1324. }
  1325. thatClaimNamespace, err := thatRes.GetString("namespace")
  1326. if err != nil {
  1327. log.Debugf("ClusterDisks: pv claim data missing namespace")
  1328. continue
  1329. }
  1330. if cluster == thatCluster && claimName == thatClaimName && claimNamespace == thatClaimNamespace {
  1331. volumeName = thatVolumeName
  1332. }
  1333. }
  1334. usage := result.Values[0].Value
  1335. key := DiskIdentifier{cluster, volumeName}
  1336. if _, ok := diskMap[key]; !ok {
  1337. diskMap[key] = &Disk{
  1338. Cluster: cluster,
  1339. Name: volumeName,
  1340. Breakdown: &ClusterCostsBreakdown{},
  1341. }
  1342. }
  1343. diskMap[key].BytesUsedAvgPtr = &usage
  1344. }
  1345. for _, result := range resPVUsedMax {
  1346. cluster, err := result.GetString(env.GetPromClusterLabel())
  1347. if err != nil {
  1348. cluster = env.GetClusterID()
  1349. }
  1350. claimName, err := result.GetString("persistentvolumeclaim")
  1351. if err != nil {
  1352. log.Debugf("ClusterDisks: pv usage data missing persistentvolumeclaim")
  1353. continue
  1354. }
  1355. claimNamespace, err := result.GetString("namespace")
  1356. if err != nil {
  1357. log.Debugf("ClusterDisks: pv usage data missing namespace")
  1358. continue
  1359. }
  1360. var volumeName string
  1361. for _, thatRes := range resPVCInfo {
  1362. thatCluster, err := thatRes.GetString(env.GetPromClusterLabel())
  1363. if err != nil {
  1364. thatCluster = env.GetClusterID()
  1365. }
  1366. thatVolumeName, err := thatRes.GetString("volumename")
  1367. if err != nil {
  1368. log.Debugf("ClusterDisks: pv claim data missing volumename")
  1369. continue
  1370. }
  1371. thatClaimName, err := thatRes.GetString("persistentvolumeclaim")
  1372. if err != nil {
  1373. log.Debugf("ClusterDisks: pv claim data missing persistentvolumeclaim")
  1374. continue
  1375. }
  1376. thatClaimNamespace, err := thatRes.GetString("namespace")
  1377. if err != nil {
  1378. log.Debugf("ClusterDisks: pv claim data missing namespace")
  1379. continue
  1380. }
  1381. if cluster == thatCluster && claimName == thatClaimName && claimNamespace == thatClaimNamespace {
  1382. volumeName = thatVolumeName
  1383. }
  1384. }
  1385. usage := result.Values[0].Value
  1386. key := DiskIdentifier{cluster, volumeName}
  1387. if _, ok := diskMap[key]; !ok {
  1388. diskMap[key] = &Disk{
  1389. Cluster: cluster,
  1390. Name: volumeName,
  1391. Breakdown: &ClusterCostsBreakdown{},
  1392. }
  1393. }
  1394. diskMap[key].BytesUsedMaxPtr = &usage
  1395. }
  1396. }