  1. package costmodel
  2. import (
  3. "fmt"
  4. "net"
  5. "strconv"
  6. "time"
  7. "github.com/opencost/opencost/pkg/cloud/provider"
  8. prometheus "github.com/prometheus/client_golang/api"
  9. "golang.org/x/exp/slices"
  10. "github.com/opencost/opencost/core/pkg/log"
  11. "github.com/opencost/opencost/core/pkg/opencost"
  12. "github.com/opencost/opencost/core/pkg/util/timeutil"
  13. "github.com/opencost/opencost/pkg/cloud/models"
  14. "github.com/opencost/opencost/pkg/env"
  15. "github.com/opencost/opencost/pkg/prom"
  16. )
// Prometheus query templates used for cluster-level cost summaries. Each
// template is completed with fmt.Sprintf: the %s placeholders take (in order)
// cluster filters, range durations, offsets, and grouping labels supplied by
// the caller. The constant 730 converts an hourly rate to a monthly cost
// (≈ hours per month).
const (
	// queryClusterCores: monthly CPU cost (capacity cores × hourly CPU cost)
	// plus monthly GPU cost, summed per cluster grouping label.
	queryClusterCores = `sum(
avg(avg_over_time(kube_node_status_capacity_cpu_cores{%s}[%s] %s)) by (node, %s) * avg(avg_over_time(node_cpu_hourly_cost{%s}[%s] %s)) by (node, %s) * 730 +
avg(avg_over_time(node_gpu_hourly_cost{%s}[%s] %s)) by (node, %s) * 730
) by (%s)`
	// queryClusterRAM: monthly RAM cost (capacity bytes converted to GiB ×
	// hourly RAM cost), summed per cluster grouping label.
	queryClusterRAM = `sum(
avg(avg_over_time(kube_node_status_capacity_memory_bytes{%s}[%s] %s)) by (node, %s) / 1024 / 1024 / 1024 * avg(avg_over_time(node_ram_hourly_cost{%s}[%s] %s)) by (node, %s) * 730
) by (%s)`
	// queryStorage: monthly persistent-volume cost (hourly PV cost × capacity
	// in GiB), summed per grouping label; the trailing %s allows an optional
	// query suffix.
	queryStorage = `sum(
avg(avg_over_time(pv_hourly_cost{%s}[%s] %s)) by (persistentvolume, %s) * 730
* avg(avg_over_time(kube_persistentvolume_capacity_bytes{%s}[%s] %s)) by (persistentvolume, %s) / 1024 / 1024 / 1024
) by (%s) %s`
	// queryTotal: total monthly cost = node hourly cost × 730 plus the PV
	// storage cost over a fixed 1h sampling window.
	queryTotal = `sum(avg(node_total_hourly_cost{%s}) by (node, %s)) * 730 +
sum(
avg(avg_over_time(pv_hourly_cost{%s}[1h])) by (persistentvolume, %s) * 730
* avg(avg_over_time(kube_persistentvolume_capacity_bytes{%s}[1h])) by (persistentvolume, %s) / 1024 / 1024 / 1024
) by (%s) %s`
	// queryNodes: monthly node cost only (no storage); trailing %s is an
	// optional query suffix.
	queryNodes = `sum(avg(node_total_hourly_cost{%s}) by (node, %s)) * 730 %s`
)

// maxLocalDiskSize is in GiB (it is compared against Bytes/1024^3 in
// ClusterDisks). AWS limits root disks to 100 Gi, and occasional metric
// errors in filesystem size should not contribute to large costs.
const maxLocalDiskSize = 200
// ClusterCosts represents cumulative and monthly cluster costs over a given
// duration. Costs are broken down by cores, memory, and storage.
type ClusterCosts struct {
	Start             *time.Time             `json:"startTime"`
	End               *time.Time             `json:"endTime"`
	CPUCumulative     float64                `json:"cpuCumulativeCost"`
	CPUMonthly        float64                `json:"cpuMonthlyCost"`
	CPUBreakdown      *ClusterCostsBreakdown `json:"cpuBreakdown"`
	GPUCumulative     float64                `json:"gpuCumulativeCost"`
	GPUMonthly        float64                `json:"gpuMonthlyCost"`
	RAMCumulative     float64                `json:"ramCumulativeCost"`
	RAMMonthly        float64                `json:"ramMonthlyCost"`
	RAMBreakdown      *ClusterCostsBreakdown `json:"ramBreakdown"`
	StorageCumulative float64                `json:"storageCumulativeCost"`
	StorageMonthly    float64                `json:"storageMonthlyCost"`
	StorageBreakdown  *ClusterCostsBreakdown `json:"storageBreakdown"`
	TotalCumulative   float64                `json:"totalCumulativeCost"`
	TotalMonthly      float64                `json:"totalMonthlyCost"`
	// DataMinutes is not serialized; presumably the number of minutes of
	// underlying data backing these costs — confirm against callers.
	DataMinutes float64
}
// ClusterCostsBreakdown provides percentage-based breakdown of a resource by
// categories: user for user-space (i.e. non-system) usage, system, and idle.
// The four fractions are intended to sum to 1.0.
type ClusterCostsBreakdown struct {
	Idle   float64 `json:"idle"`
	Other  float64 `json:"other"`
	System float64 `json:"system"`
	User   float64 `json:"user"`
}
  65. // NewClusterCostsFromCumulative takes cumulative cost data over a given time range, computes
  66. // the associated monthly rate data, and returns the Costs.
  67. func NewClusterCostsFromCumulative(cpu, gpu, ram, storage float64, window, offset time.Duration, dataHours float64) (*ClusterCosts, error) {
  68. start, end := timeutil.ParseTimeRange(window, offset)
  69. // If the number of hours is not given (i.e. is zero) compute one from the window and offset
  70. if dataHours == 0 {
  71. dataHours = end.Sub(start).Hours()
  72. }
  73. // Do not allow zero-length windows to prevent divide-by-zero issues
  74. if dataHours == 0 {
  75. return nil, fmt.Errorf("illegal time range: window %s, offset %s", window, offset)
  76. }
  77. cc := &ClusterCosts{
  78. Start: &start,
  79. End: &end,
  80. CPUCumulative: cpu,
  81. GPUCumulative: gpu,
  82. RAMCumulative: ram,
  83. StorageCumulative: storage,
  84. TotalCumulative: cpu + gpu + ram + storage,
  85. CPUMonthly: cpu / dataHours * (timeutil.HoursPerMonth),
  86. GPUMonthly: gpu / dataHours * (timeutil.HoursPerMonth),
  87. RAMMonthly: ram / dataHours * (timeutil.HoursPerMonth),
  88. StorageMonthly: storage / dataHours * (timeutil.HoursPerMonth),
  89. }
  90. cc.TotalMonthly = cc.CPUMonthly + cc.GPUMonthly + cc.RAMMonthly + cc.StorageMonthly
  91. return cc, nil
  92. }
// Disk represents a single disk in a cluster: either a PersistentVolume or,
// when Local is true, a node's local/root storage. It carries the disk's
// cost, capacity, usage, and active window over the queried range.
type Disk struct {
	Cluster        string
	Name           string
	ProviderID     string
	StorageClass   string
	VolumeName     string
	ClaimName      string
	ClaimNamespace string
	Cost           float64
	Bytes          float64

	// These two fields may not be available at all times because they rely on
	// a new set of metrics that may or may not be available. Thus, they must
	// be nilable to represent the complete absence of the data.
	//
	// In other words, nilability here lets us distinguish between
	// "metric is not available" and "metric is available but is 0".
	//
	// They end in "Ptr" to distinguish from an earlier version in order to
	// ensure that all usages are checked for nil.
	BytesUsedAvgPtr *float64
	BytesUsedMaxPtr *float64

	// Local is true for node-local (ephemeral/root) storage rather than a
	// PersistentVolume.
	Local     bool
	Start     time.Time
	End       time.Time
	Minutes   float64
	Breakdown *ClusterCostsBreakdown
}
// DiskIdentifier uniquely identifies a disk by its cluster and name; it is
// the key type for the map returned by ClusterDisks.
type DiskIdentifier struct {
	Cluster string
	Name    string
}
// ClusterDisks queries Prometheus for persistent-volume (and, when local disk
// cost is enabled, node-local storage) metrics over the window [start, end]
// and returns one Disk per (cluster, volume/node name).
//
// Parameters:
//   - client: Prometheus API client used for all queries.
//   - provider: cloud provider, forwarded to pvCosts for PV cost resolution.
//   - start, end: the closed window to query; all queries are issued at time
//     "end" looking backwards over the window's duration.
//
// Returns an error if the window duration is invalid or if any of the
// required queries failed.
func ClusterDisks(client prometheus.Client, provider models.Provider, start, end time.Time) (map[DiskIdentifier]*Disk, error) {
	// Start from the time "end", querying backwards
	t := end

	// minsPerResolution determines accuracy and resource use for the following
	// queries. Smaller values (higher resolution) result in better accuracy,
	// but more expensive queries, and vice-a-versa.
	resolution := env.GetETLResolution()

	// Ensuring if ETL_RESOLUTION_SECONDS is less than 60s default it to 1m.
	var minsPerResolution int
	if minsPerResolution = int(resolution.Minutes()); int(resolution.Minutes()) == 0 {
		minsPerResolution = 1
		log.DedupedWarningf(3, "ClusterDisks(): Configured ETL resolution (%d seconds) is below the 60 seconds threshold. Overriding with 1 minute.", int(resolution.Seconds()))
	}

	durStr := timeutil.DurationString(end.Sub(start))
	if durStr == "" {
		return nil, fmt.Errorf("illegal duration value for %s", opencost.NewClosedWindow(start, end))
	}

	ctx := prom.NewNamedContext(client, prom.ClusterContextName)

	// Persistent-volume queries: hourly cost, capacity, activity window,
	// storage class, and (keyed by the PVC) usage statistics.
	queryPVCost := fmt.Sprintf(`avg(avg_over_time(pv_hourly_cost{%s}[%s])) by (%s, persistentvolume,provider_id)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
	queryPVSize := fmt.Sprintf(`avg(avg_over_time(kube_persistentvolume_capacity_bytes{%s}[%s])) by (%s, persistentvolume)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
	queryActiveMins := fmt.Sprintf(`avg(kube_persistentvolume_capacity_bytes{%s}) by (%s, persistentvolume)[%s:%dm]`, env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, minsPerResolution)
	queryPVStorageClass := fmt.Sprintf(`avg(avg_over_time(kubecost_pv_info{%s}[%s])) by (%s, persistentvolume, storageclass)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
	queryPVUsedAvg := fmt.Sprintf(`avg(avg_over_time(kubelet_volume_stats_used_bytes{%s}[%s])) by (%s, persistentvolumeclaim, namespace)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
	queryPVUsedMax := fmt.Sprintf(`max(max_over_time(kubelet_volume_stats_used_bytes{%s}[%s])) by (%s, persistentvolumeclaim, namespace)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
	queryPVCInfo := fmt.Sprintf(`avg(avg_over_time(kube_persistentvolumeclaim_info{%s}[%s])) by (%s, volumename, persistentvolumeclaim, namespace)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())

	// Issue all PV queries concurrently, then await their results.
	resChPVCost := ctx.QueryAtTime(queryPVCost, t)
	resChPVSize := ctx.QueryAtTime(queryPVSize, t)
	resChActiveMins := ctx.QueryAtTime(queryActiveMins, t)
	resChPVStorageClass := ctx.QueryAtTime(queryPVStorageClass, t)
	resChPVUsedAvg := ctx.QueryAtTime(queryPVUsedAvg, t)
	resChPVUsedMax := ctx.QueryAtTime(queryPVUsedMax, t)
	resChPVCInfo := ctx.QueryAtTime(queryPVCInfo, t)

	// Errors are collected on ctx and checked once below via ctx.HasErrors().
	resPVCost, _ := resChPVCost.Await()
	resPVSize, _ := resChPVSize.Await()
	resActiveMins, _ := resChActiveMins.Await()
	resPVStorageClass, _ := resChPVStorageClass.Await()
	resPVUsedAvg, _ := resChPVUsedAvg.Await()
	resPVUsedMax, _ := resChPVUsedMax.Await()
	resPVCInfo, _ := resChPVCInfo.Await()

	// Cloud providers do not always charge for a node's local disk costs (i.e.
	// ephemeral storage). Provide an option to opt out of calculating &
	// allocating local disk costs. Note, that this does not affect
	// PersistentVolume costs.
	//
	// Ref:
	// https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/RootDeviceStorage.html
	// https://learn.microsoft.com/en-us/azure/virtual-machines/managed-disks-overview#temporary-disk
	// https://cloud.google.com/compute/docs/disks/local-ssd
	resLocalStorageCost := []*prom.QueryResult{}
	resLocalStorageUsedCost := []*prom.QueryResult{}
	resLocalStorageUsedAvg := []*prom.QueryResult{}
	resLocalStorageUsedMax := []*prom.QueryResult{}
	resLocalStorageBytes := []*prom.QueryResult{}
	resLocalActiveMins := []*prom.QueryResult{}
	if env.GetAssetIncludeLocalDiskCost() {
		// hourlyToCumulative is a scaling factor that, when multiplied by an
		// hourly value, converts it to a cumulative value; i.e. [$/hr] *
		// [min/res]*[hr/min] = [$/res]
		hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)

		// Local disks are priced at a flat rate of $0.04 per GiB-month,
		// converted here to a per-GiB-hour rate via the 730-hour month.
		costPerGBHr := 0.04 / 730.0

		queryLocalStorageCost := fmt.Sprintf(`sum_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/", %s}) by (instance, %s)[%s:%dm]) / 1024 / 1024 / 1024 * %f * %f`, env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, minsPerResolution, hourlyToCumulative, costPerGBHr)
		queryLocalStorageUsedCost := fmt.Sprintf(`sum_over_time(sum(container_fs_usage_bytes{device!="tmpfs", id="/", %s}) by (instance, %s)[%s:%dm]) / 1024 / 1024 / 1024 * %f * %f`, env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, minsPerResolution, hourlyToCumulative, costPerGBHr)
		queryLocalStorageUsedAvg := fmt.Sprintf(`avg(sum(avg_over_time(container_fs_usage_bytes{device!="tmpfs", id="/", %s}[%s])) by (instance, %s, job)) by (instance, %s)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel(), env.GetPromClusterLabel())
		queryLocalStorageUsedMax := fmt.Sprintf(`max(sum(max_over_time(container_fs_usage_bytes{device!="tmpfs", id="/", %s}[%s])) by (instance, %s, job)) by (instance, %s)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel(), env.GetPromClusterLabel())
		queryLocalStorageBytes := fmt.Sprintf(`avg_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/", %s}) by (instance, %s)[%s:%dm])`, env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, minsPerResolution)
		queryLocalActiveMins := fmt.Sprintf(`count(node_total_hourly_cost{%s}) by (%s, node)[%s:%dm]`, env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, minsPerResolution)

		resChLocalStorageCost := ctx.QueryAtTime(queryLocalStorageCost, t)
		resChLocalStorageUsedCost := ctx.QueryAtTime(queryLocalStorageUsedCost, t)
		resChLocalStoreageUsedAvg := ctx.QueryAtTime(queryLocalStorageUsedAvg, t)
		resChLocalStoreageUsedMax := ctx.QueryAtTime(queryLocalStorageUsedMax, t)
		resChLocalStorageBytes := ctx.QueryAtTime(queryLocalStorageBytes, t)
		resChLocalActiveMins := ctx.QueryAtTime(queryLocalActiveMins, t)

		resLocalStorageCost, _ = resChLocalStorageCost.Await()
		resLocalStorageUsedCost, _ = resChLocalStorageUsedCost.Await()
		resLocalStorageUsedAvg, _ = resChLocalStoreageUsedAvg.Await()
		resLocalStorageUsedMax, _ = resChLocalStoreageUsedMax.Await()
		resLocalStorageBytes, _ = resChLocalStorageBytes.Await()
		resLocalActiveMins, _ = resChLocalActiveMins.Await()
	}

	// Any query error (required PV queries or local-storage queries) aborts.
	if ctx.HasErrors() {
		return nil, ctx.ErrorCollection()
	}

	diskMap := map[DiskIdentifier]*Disk{}

	// Seed the disk map from PVC info so claim metadata (volume name, claim
	// name, namespace) is attached to each persistent volume.
	for _, result := range resPVCInfo {
		cluster, err := result.GetString(env.GetPromClusterLabel())
		if err != nil {
			// Fall back to the locally-configured cluster ID when the
			// cluster label is absent from the series.
			cluster = env.GetClusterID()
		}
		volumeName, err := result.GetString("volumename")
		if err != nil {
			log.Debugf("ClusterDisks: pv claim data missing volumename")
			continue
		}
		claimName, err := result.GetString("persistentvolumeclaim")
		if err != nil {
			log.Debugf("ClusterDisks: pv claim data missing persistentvolumeclaim")
			continue
		}
		claimNamespace, err := result.GetString("namespace")
		if err != nil {
			log.Debugf("ClusterDisks: pv claim data missing namespace")
			continue
		}
		key := DiskIdentifier{cluster, volumeName}
		if _, ok := diskMap[key]; !ok {
			diskMap[key] = &Disk{
				Cluster:   cluster,
				Name:      volumeName,
				Breakdown: &ClusterCostsBreakdown{},
			}
		}
		diskMap[key].VolumeName = volumeName
		diskMap[key].ClaimName = claimName
		diskMap[key].ClaimNamespace = claimNamespace
	}

	// pvCosts (defined elsewhere in this package) fills in cost, capacity,
	// usage, and active-minute data for the PV entries in diskMap.
	pvCosts(diskMap, resolution, resActiveMins, resPVSize, resPVCost, resPVUsedAvg, resPVUsedMax, resPVCInfo, provider, opencost.NewClosedWindow(start, end))

	// Accumulate local (root) storage cost per node instance.
	for _, result := range resLocalStorageCost {
		cluster, err := result.GetString(env.GetPromClusterLabel())
		if err != nil {
			cluster = env.GetClusterID()
		}
		name, err := result.GetString("instance")
		if err != nil {
			log.Warnf("ClusterDisks: local storage data missing instance")
			continue
		}
		cost := result.Values[0].Value
		key := DiskIdentifier{cluster, name}
		if _, ok := diskMap[key]; !ok {
			diskMap[key] = &Disk{
				Cluster:   cluster,
				Name:      name,
				Breakdown: &ClusterCostsBreakdown{},
				Local:     true,
			}
		}
		diskMap[key].Cost += cost
		// Assigning explicitly the storage class of local storage to local
		diskMap[key].StorageClass = opencost.LocalStorageClass
	}

	// Record the fraction of each local disk's cost attributable to usage as
	// the "system" share of its breakdown.
	for _, result := range resLocalStorageUsedCost {
		cluster, err := result.GetString(env.GetPromClusterLabel())
		if err != nil {
			cluster = env.GetClusterID()
		}
		name, err := result.GetString("instance")
		if err != nil {
			log.Warnf("ClusterDisks: local storage usage data missing instance")
			continue
		}
		cost := result.Values[0].Value
		key := DiskIdentifier{cluster, name}
		if _, ok := diskMap[key]; !ok {
			diskMap[key] = &Disk{
				Cluster:   cluster,
				Name:      name,
				Breakdown: &ClusterCostsBreakdown{},
				Local:     true,
			}
		}
		// NOTE(review): if no cost was recorded for this disk in the previous
		// loop, Cost is 0 here and this division yields ±Inf/NaN — confirm
		// that the cost series always accompanies the used-cost series.
		diskMap[key].Breakdown.System = cost / diskMap[key].Cost
	}

	// Average bytes used per local disk (nilable; see Disk.BytesUsedAvgPtr).
	for _, result := range resLocalStorageUsedAvg {
		cluster, err := result.GetString(env.GetPromClusterLabel())
		if err != nil {
			cluster = env.GetClusterID()
		}
		name, err := result.GetString("instance")
		if err != nil {
			log.Warnf("ClusterDisks: local storage data missing instance")
			continue
		}
		bytesAvg := result.Values[0].Value
		key := DiskIdentifier{cluster, name}
		if _, ok := diskMap[key]; !ok {
			diskMap[key] = &Disk{
				Cluster:   cluster,
				Name:      name,
				Breakdown: &ClusterCostsBreakdown{},
				Local:     true,
			}
		}
		diskMap[key].BytesUsedAvgPtr = &bytesAvg
	}

	// Maximum bytes used per local disk (nilable; see Disk.BytesUsedMaxPtr).
	for _, result := range resLocalStorageUsedMax {
		cluster, err := result.GetString(env.GetPromClusterLabel())
		if err != nil {
			cluster = env.GetClusterID()
		}
		name, err := result.GetString("instance")
		if err != nil {
			log.Warnf("ClusterDisks: local storage data missing instance")
			continue
		}
		bytesMax := result.Values[0].Value
		key := DiskIdentifier{cluster, name}
		if _, ok := diskMap[key]; !ok {
			diskMap[key] = &Disk{
				Cluster:   cluster,
				Name:      name,
				Breakdown: &ClusterCostsBreakdown{},
				Local:     true,
			}
		}
		diskMap[key].BytesUsedMaxPtr = &bytesMax
	}

	// Local disk capacity; disks larger than maxLocalDiskSize GiB are dropped
	// from the analysis as presumed metric errors (see maxLocalDiskSize).
	for _, result := range resLocalStorageBytes {
		cluster, err := result.GetString(env.GetPromClusterLabel())
		if err != nil {
			cluster = env.GetClusterID()
		}
		name, err := result.GetString("instance")
		if err != nil {
			log.Warnf("ClusterDisks: local storage data missing instance")
			continue
		}
		bytes := result.Values[0].Value
		key := DiskIdentifier{cluster, name}
		if _, ok := diskMap[key]; !ok {
			diskMap[key] = &Disk{
				Cluster:   cluster,
				Name:      name,
				Breakdown: &ClusterCostsBreakdown{},
				Local:     true,
			}
		}
		diskMap[key].Bytes = bytes
		if bytes/1024/1024/1024 > maxLocalDiskSize {
			log.DedupedWarningf(5, "Deleting large root disk/localstorage disk from analysis")
			delete(diskMap, key)
		}
	}

	// Derive each local disk's active window from the first/last samples of
	// the node cost series.
	for _, result := range resLocalActiveMins {
		cluster, err := result.GetString(env.GetPromClusterLabel())
		if err != nil {
			cluster = env.GetClusterID()
		}
		name, err := result.GetString("node")
		if err != nil {
			log.DedupedWarningf(5, "ClusterDisks: local active mins data missing instance")
			continue
		}
		key := DiskIdentifier{cluster, name}
		if _, ok := diskMap[key]; !ok {
			log.DedupedWarningf(5, "ClusterDisks: local active mins for unidentified disk or disk deleted from analysis")
			continue
		}
		if len(result.Values) == 0 {
			continue
		}
		s := time.Unix(int64(result.Values[0].Timestamp), 0)
		e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
		mins := e.Sub(s).Minutes()
		// TODO niko/assets if mins >= threshold, interpolate for missing data?
		diskMap[key].End = e
		diskMap[key].Start = s
		diskMap[key].Minutes = mins
	}

	var unTracedDiskLogData []DiskIdentifier

	// Iterating through Persistent Volumes given by custom metric
	// kubecost_pv_info and assigning the storage class if known, and
	// __unknown__ if not populated.
	for _, result := range resPVStorageClass {
		cluster, err := result.GetString(env.GetPromClusterLabel())
		if err != nil {
			cluster = env.GetClusterID()
		}
		name, _ := result.GetString("persistentvolume")
		key := DiskIdentifier{cluster, name}
		if _, ok := diskMap[key]; !ok {
			// Collect unknown disks for a single deduplicated log pass below.
			if !slices.Contains(unTracedDiskLogData, key) {
				unTracedDiskLogData = append(unTracedDiskLogData, key)
			}
			continue
		}
		if len(result.Values) == 0 {
			continue
		}
		storageClass, err := result.GetString("storageclass")
		if err != nil {
			diskMap[key].StorageClass = opencost.UnknownStorageClass
		} else {
			diskMap[key].StorageClass = storageClass
		}
	}

	// Logging the unidentified disk information outside the loop
	for _, unIdentifiedDisk := range unTracedDiskLogData {
		log.Warnf("ClusterDisks: Cluster %s has Storage Class information for unidentified disk %s or disk deleted from analysis", unIdentifiedDisk.Cluster, unIdentifiedDisk.Name)
	}

	for _, disk := range diskMap {
		// Apply all remaining (non-system/other/user) fraction to Idle.
		disk.Breakdown.Idle = 1.0 - (disk.Breakdown.System + disk.Breakdown.Other + disk.Breakdown.User)

		// Set provider Id to the name for reconciliation
		if disk.ProviderID == "" {
			disk.ProviderID = disk.Name
		}
	}

	return diskMap, nil
}
// NodeOverhead holds per-node overhead fractions for CPU and RAM —
// presumably the share of node capacity unavailable to workloads; confirm
// against where Node.Overhead is populated.
type NodeOverhead struct {
	CpuOverheadFraction float64
	RamOverheadFraction float64
}
// Node represents a single cluster node's costs, capacities, usage
// breakdowns, and active window over the queried range, as assembled by
// ClusterNodes.
type Node struct {
	Cluster      string
	Name         string
	ProviderID   string
	NodeType     string
	CPUCost      float64
	CPUCores     float64
	GPUCost      float64
	GPUCount     float64
	RAMCost      float64
	RAMBytes     float64
	Discount     float64
	Preemptible  bool
	CPUBreakdown *ClusterCostsBreakdown
	RAMBreakdown *ClusterCostsBreakdown
	Start        time.Time
	End          time.Time
	Minutes      float64
	Labels       map[string]string
	// Per-resource hourly rates underlying the aggregate costs above.
	CostPerCPUHr    float64
	CostPerRAMGiBHr float64
	CostPerGPUHr    float64
	// Overhead is nilable; absent when no overhead data is available.
	Overhead *NodeOverhead
}
// GKE lies about the number of cores e2 nodes have. This table
// contains a mapping from node type -> actual CPU cores
// for those cases.
var partialCPUMap = map[string]float64{
	"e2-micro":  0.25,
	"e2-small":  0.5,
	"e2-medium": 1.0,
}
// NodeIdentifier uniquely identifies a node by cluster, name, and cloud
// provider ID; it is the key type for node cost maps.
type NodeIdentifier struct {
	Cluster    string
	Name       string
	ProviderID string
}
// nodeIdentifierNoProviderID identifies a node by cluster and name only,
// for joining against metrics that lack a provider_id label.
type nodeIdentifierNoProviderID struct {
	Cluster string
	Name    string
}
  466. func costTimesMinuteAndCount(activeDataMap map[NodeIdentifier]activeData, costMap map[NodeIdentifier]float64, resourceCountMap map[nodeIdentifierNoProviderID]float64) {
  467. for k, v := range activeDataMap {
  468. keyNon := nodeIdentifierNoProviderID{
  469. Cluster: k.Cluster,
  470. Name: k.Name,
  471. }
  472. if cost, ok := costMap[k]; ok {
  473. minutes := v.minutes
  474. count := 1.0
  475. if c, ok := resourceCountMap[keyNon]; ok {
  476. count = c
  477. }
  478. costMap[k] = cost * (minutes / 60) * count
  479. }
  480. }
  481. }
  482. func costTimesMinute(activeDataMap map[NodeIdentifier]activeData, costMap map[NodeIdentifier]float64) {
  483. for k, v := range activeDataMap {
  484. if cost, ok := costMap[k]; ok {
  485. minutes := v.minutes
  486. costMap[k] = cost * (minutes / 60)
  487. }
  488. }
  489. }
  490. func ClusterNodes(cp models.Provider, client prometheus.Client, start, end time.Time) (map[NodeIdentifier]*Node, error) {
  491. // Start from the time "end", querying backwards
  492. t := end
  493. // minsPerResolution determines accuracy and resource use for the following
  494. // queries. Smaller values (higher resolution) result in better accuracy,
  495. // but more expensive queries, and vice-a-versa.
  496. resolution := env.GetETLResolution()
  497. //Ensuring if ETL_RESOLUTION_SECONDS is less than 60s default it to 1m
  498. var minsPerResolution int
  499. if minsPerResolution = int(resolution.Minutes()); int(resolution.Minutes()) == 0 {
  500. minsPerResolution = 1
  501. log.DedupedWarningf(3, "ClusterNodes(): Configured ETL resolution (%d seconds) is below the 60 seconds threshold. Overriding with 1 minute.", int(resolution.Seconds()))
  502. }
  503. durStr := timeutil.DurationString(end.Sub(start))
  504. if durStr == "" {
  505. return nil, fmt.Errorf("illegal duration value for %s", opencost.NewClosedWindow(start, end))
  506. }
  507. requiredCtx := prom.NewNamedContext(client, prom.ClusterContextName)
  508. optionalCtx := prom.NewNamedContext(client, prom.ClusterOptionalContextName)
  509. queryNodeCPUHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_cpu_hourly_cost{%s}[%s])) by (%s, node, instance_type, provider_id)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  510. queryNodeCPUCoresCapacity := fmt.Sprintf(`avg(avg_over_time(kube_node_status_capacity_cpu_cores{%s}[%s])) by (%s, node)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  511. queryNodeCPUCoresAllocatable := fmt.Sprintf(`avg(avg_over_time(kube_node_status_allocatable_cpu_cores{%s}[%s])) by (%s, node)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  512. queryNodeRAMHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_ram_hourly_cost{%s}[%s])) by (%s, node, instance_type, provider_id) / 1024 / 1024 / 1024`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  513. queryNodeRAMBytesCapacity := fmt.Sprintf(`avg(avg_over_time(kube_node_status_capacity_memory_bytes{%s}[%s])) by (%s, node)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  514. queryNodeRAMBytesAllocatable := fmt.Sprintf(`avg(avg_over_time(kube_node_status_allocatable_memory_bytes{%s}[%s])) by (%s, node)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  515. queryNodeGPUCount := fmt.Sprintf(`avg(avg_over_time(node_gpu_count{%s}[%s])) by (%s, node, provider_id)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  516. queryNodeGPUHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_gpu_hourly_cost{%s}[%s])) by (%s, node, instance_type, provider_id)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  517. queryNodeCPUModeTotal := fmt.Sprintf(`sum(rate(node_cpu_seconds_total{%s}[%s:%dm])) by (kubernetes_node, %s, mode)`, env.GetPromClusterFilter(), durStr, minsPerResolution, env.GetPromClusterLabel())
  518. queryNodeRAMSystemPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace="kube-system", %s}[%s:%dm])) by (instance, %s) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes{%s}[%s:%dm])) by (node, %s), "instance", "$1", "node", "(.*)")) by (instance, %s)`, env.GetPromClusterFilter(), durStr, minsPerResolution, env.GetPromClusterLabel(), env.GetPromClusterFilter(), durStr, minsPerResolution, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  519. queryNodeRAMUserPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace!="kube-system", %s}[%s:%dm])) by (instance, %s) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes{%s}[%s:%dm])) by (node, %s), "instance", "$1", "node", "(.*)")) by (instance, %s)`, env.GetPromClusterFilter(), durStr, minsPerResolution, env.GetPromClusterLabel(), env.GetPromClusterFilter(), durStr, minsPerResolution, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  520. queryActiveMins := fmt.Sprintf(`avg(node_total_hourly_cost{%s}) by (node, %s, provider_id)[%s:%dm]`, env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, minsPerResolution)
  521. queryIsSpot := fmt.Sprintf(`avg_over_time(kubecost_node_is_spot{%s}[%s:%dm])`, env.GetPromClusterFilter(), durStr, minsPerResolution)
  522. queryLabels := fmt.Sprintf(`count_over_time(kube_node_labels{%s}[%s:%dm])`, env.GetPromClusterFilter(), durStr, minsPerResolution)
  523. // Return errors if these fail
  524. resChNodeCPUHourlyCost := requiredCtx.QueryAtTime(queryNodeCPUHourlyCost, t)
  525. resChNodeCPUCoresCapacity := requiredCtx.QueryAtTime(queryNodeCPUCoresCapacity, t)
  526. resChNodeCPUCoresAllocatable := requiredCtx.QueryAtTime(queryNodeCPUCoresAllocatable, t)
  527. resChNodeRAMHourlyCost := requiredCtx.QueryAtTime(queryNodeRAMHourlyCost, t)
  528. resChNodeRAMBytesCapacity := requiredCtx.QueryAtTime(queryNodeRAMBytesCapacity, t)
  529. resChNodeRAMBytesAllocatable := requiredCtx.QueryAtTime(queryNodeRAMBytesAllocatable, t)
  530. resChNodeGPUCount := requiredCtx.QueryAtTime(queryNodeGPUCount, t)
  531. resChNodeGPUHourlyCost := requiredCtx.QueryAtTime(queryNodeGPUHourlyCost, t)
  532. resChActiveMins := requiredCtx.QueryAtTime(queryActiveMins, t)
  533. resChIsSpot := requiredCtx.QueryAtTime(queryIsSpot, t)
  534. // Do not return errors if these fail, but log warnings
  535. resChNodeCPUModeTotal := optionalCtx.QueryAtTime(queryNodeCPUModeTotal, t)
  536. resChNodeRAMSystemPct := optionalCtx.QueryAtTime(queryNodeRAMSystemPct, t)
  537. resChNodeRAMUserPct := optionalCtx.QueryAtTime(queryNodeRAMUserPct, t)
  538. resChLabels := optionalCtx.QueryAtTime(queryLabels, t)
  539. resNodeCPUHourlyCost, _ := resChNodeCPUHourlyCost.Await()
  540. resNodeCPUCoresCapacity, _ := resChNodeCPUCoresCapacity.Await()
  541. resNodeCPUCoresAllocatable, _ := resChNodeCPUCoresAllocatable.Await()
  542. resNodeGPUCount, _ := resChNodeGPUCount.Await()
  543. resNodeGPUHourlyCost, _ := resChNodeGPUHourlyCost.Await()
  544. resNodeRAMHourlyCost, _ := resChNodeRAMHourlyCost.Await()
  545. resNodeRAMBytesCapacity, _ := resChNodeRAMBytesCapacity.Await()
  546. resNodeRAMBytesAllocatable, _ := resChNodeRAMBytesAllocatable.Await()
  547. resIsSpot, _ := resChIsSpot.Await()
  548. resNodeCPUModeTotal, _ := resChNodeCPUModeTotal.Await()
  549. resNodeRAMSystemPct, _ := resChNodeRAMSystemPct.Await()
  550. resNodeRAMUserPct, _ := resChNodeRAMUserPct.Await()
  551. resActiveMins, _ := resChActiveMins.Await()
  552. resLabels, _ := resChLabels.Await()
  553. if optionalCtx.HasErrors() {
  554. for _, err := range optionalCtx.Errors() {
  555. log.Warnf("ClusterNodes: %s", err)
  556. }
  557. }
  558. if requiredCtx.HasErrors() {
  559. for _, err := range requiredCtx.Errors() {
  560. log.Errorf("ClusterNodes: %s", err)
  561. }
  562. return nil, requiredCtx.ErrorCollection()
  563. }
  564. activeDataMap := buildActiveDataMap(resActiveMins, resolution, opencost.NewClosedWindow(start, end))
  565. gpuCountMap := buildGPUCountMap(resNodeGPUCount)
  566. preemptibleMap := buildPreemptibleMap(resIsSpot)
  567. cpuCostMap, clusterAndNameToType1 := buildCPUCostMap(resNodeCPUHourlyCost, cp, preemptibleMap)
  568. ramCostMap, clusterAndNameToType2 := buildRAMCostMap(resNodeRAMHourlyCost, cp, preemptibleMap)
  569. gpuCostMap, clusterAndNameToType3 := buildGPUCostMap(resNodeGPUHourlyCost, gpuCountMap, cp, preemptibleMap)
  570. clusterAndNameToTypeIntermediate := mergeTypeMaps(clusterAndNameToType1, clusterAndNameToType2)
  571. clusterAndNameToType := mergeTypeMaps(clusterAndNameToTypeIntermediate, clusterAndNameToType3)
  572. cpuCoresCapacityMap := buildCPUCoresMap(resNodeCPUCoresCapacity)
  573. ramBytesCapacityMap := buildRAMBytesMap(resNodeRAMBytesCapacity)
  574. cpuCoresAllocatableMap := buildCPUCoresMap(resNodeCPUCoresAllocatable)
  575. ramBytesAllocatableMap := buildRAMBytesMap(resNodeRAMBytesAllocatable)
  576. overheadMap := buildOverheadMap(ramBytesCapacityMap, ramBytesAllocatableMap, cpuCoresCapacityMap, cpuCoresAllocatableMap)
  577. ramUserPctMap := buildRAMUserPctMap(resNodeRAMUserPct)
  578. ramSystemPctMap := buildRAMSystemPctMap(resNodeRAMSystemPct)
  579. cpuBreakdownMap := buildCPUBreakdownMap(resNodeCPUModeTotal)
  580. labelsMap := buildLabelsMap(resLabels)
  581. costTimesMinuteAndCount(activeDataMap, cpuCostMap, cpuCoresCapacityMap)
  582. costTimesMinuteAndCount(activeDataMap, ramCostMap, ramBytesCapacityMap)
  583. costTimesMinute(activeDataMap, gpuCostMap) // there's no need to do a weird "nodeIdentifierNoProviderID" type match since gpuCounts have a providerID
  584. nodeMap := buildNodeMap(
  585. cpuCostMap, ramCostMap, gpuCostMap, gpuCountMap,
  586. cpuCoresCapacityMap, ramBytesCapacityMap, ramUserPctMap,
  587. ramSystemPctMap,
  588. cpuBreakdownMap,
  589. activeDataMap,
  590. preemptibleMap,
  591. labelsMap,
  592. clusterAndNameToType,
  593. resolution,
  594. overheadMap,
  595. )
  596. c, err := cp.GetConfig()
  597. if err != nil {
  598. return nil, err
  599. }
  600. discount, err := ParsePercentString(c.Discount)
  601. if err != nil {
  602. return nil, err
  603. }
  604. negotiatedDiscount, err := ParsePercentString(c.NegotiatedDiscount)
  605. if err != nil {
  606. return nil, err
  607. }
  608. for _, node := range nodeMap {
  609. // TODO take GKE Reserved Instances into account
  610. node.Discount = cp.CombinedDiscountForNode(node.NodeType, node.Preemptible, discount, negotiatedDiscount)
  611. // Apply all remaining resources to Idle
  612. node.CPUBreakdown.Idle = 1.0 - (node.CPUBreakdown.System + node.CPUBreakdown.Other + node.CPUBreakdown.User)
  613. node.RAMBreakdown.Idle = 1.0 - (node.RAMBreakdown.System + node.RAMBreakdown.Other + node.RAMBreakdown.User)
  614. }
  615. return nodeMap, nil
  616. }
// LoadBalancerIdentifier uniquely identifies a load balancer by its
// cluster, namespace, and service name. It is used as the map key for
// results returned by ClusterLoadBalancers.
type LoadBalancerIdentifier struct {
	Cluster   string
	Namespace string
	Name      string
}
// LoadBalancer describes the cost and activity of a single load balancer
// observed over a query window, as assembled by ClusterLoadBalancers.
type LoadBalancer struct {
	Cluster    string
	Namespace  string
	Name       string // formatted as "<namespace>/<name>" for backwards-compatibility
	ProviderID string
	Cost       float64   // cumulative cost over the active window
	Start      time.Time // first observed datapoint
	End        time.Time // last observed datapoint
	Minutes    float64   // minutes between Start and End
	Private    bool      // true when Ip parses as a private address
	Ip         string    // ingress IP reported by the cost metric
}
  634. func ClusterLoadBalancers(client prometheus.Client, start, end time.Time) (map[LoadBalancerIdentifier]*LoadBalancer, error) {
  635. // Start from the time "end", querying backwards
  636. t := end
  637. // minsPerResolution determines accuracy and resource use for the following
  638. // queries. Smaller values (higher resolution) result in better accuracy,
  639. // but more expensive queries, and vice-a-versa.
  640. resolution := env.GetETLResolution()
  641. //Ensuring if ETL_RESOLUTION_SECONDS is less than 60s default it to 1m
  642. var minsPerResolution int
  643. if minsPerResolution = int(resolution.Minutes()); int(resolution.Minutes()) == 0 {
  644. minsPerResolution = 1
  645. log.DedupedWarningf(3, "ClusterLoadBalancers(): Configured ETL resolution (%d seconds) is below the 60 seconds threshold. Overriding with 1 minute.", int(resolution.Seconds()))
  646. }
  647. // Query for the duration between start and end
  648. durStr := timeutil.DurationString(end.Sub(start))
  649. if durStr == "" {
  650. return nil, fmt.Errorf("illegal duration value for %s", opencost.NewClosedWindow(start, end))
  651. }
  652. ctx := prom.NewNamedContext(client, prom.ClusterContextName)
  653. queryLBCost := fmt.Sprintf(`avg(avg_over_time(kubecost_load_balancer_cost{%s}[%s])) by (namespace, service_name, %s, ingress_ip)`, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  654. queryActiveMins := fmt.Sprintf(`avg(kubecost_load_balancer_cost{%s}) by (namespace, service_name, %s, ingress_ip)[%s:%dm]`, env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, minsPerResolution)
  655. resChLBCost := ctx.QueryAtTime(queryLBCost, t)
  656. resChActiveMins := ctx.QueryAtTime(queryActiveMins, t)
  657. resLBCost, _ := resChLBCost.Await()
  658. resActiveMins, _ := resChActiveMins.Await()
  659. if ctx.HasErrors() {
  660. return nil, ctx.ErrorCollection()
  661. }
  662. loadBalancerMap := make(map[LoadBalancerIdentifier]*LoadBalancer, len(resActiveMins))
  663. for _, result := range resActiveMins {
  664. cluster, err := result.GetString(env.GetPromClusterLabel())
  665. if err != nil {
  666. cluster = env.GetClusterID()
  667. }
  668. namespace, err := result.GetString("namespace")
  669. if err != nil {
  670. log.Warnf("ClusterLoadBalancers: LB cost data missing namespace")
  671. continue
  672. }
  673. name, err := result.GetString("service_name")
  674. if err != nil {
  675. log.Warnf("ClusterLoadBalancers: LB cost data missing service_name")
  676. continue
  677. }
  678. providerID, err := result.GetString("ingress_ip")
  679. if err != nil {
  680. log.DedupedWarningf(5, "ClusterLoadBalancers: LB cost data missing ingress_ip")
  681. providerID = ""
  682. }
  683. key := LoadBalancerIdentifier{
  684. Cluster: cluster,
  685. Namespace: namespace,
  686. Name: name,
  687. }
  688. // Skip if there are no data
  689. if len(result.Values) == 0 {
  690. continue
  691. }
  692. // Add load balancer to the set of load balancers
  693. if _, ok := loadBalancerMap[key]; !ok {
  694. loadBalancerMap[key] = &LoadBalancer{
  695. Cluster: cluster,
  696. Namespace: namespace,
  697. Name: fmt.Sprintf("%s/%s", namespace, name), // TODO:ETL this is kept for backwards-compatibility, but not good
  698. ProviderID: provider.ParseLBID(providerID),
  699. }
  700. }
  701. // Append start, end, and minutes. This should come before all other data.
  702. s := time.Unix(int64(result.Values[0].Timestamp), 0)
  703. e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
  704. loadBalancerMap[key].Start = s
  705. loadBalancerMap[key].End = e
  706. loadBalancerMap[key].Minutes = e.Sub(s).Minutes()
  707. // Fill in Provider ID if it is available and missing in the loadBalancerMap
  708. // Prevents there from being a duplicate LoadBalancers on the same day
  709. if providerID != "" && loadBalancerMap[key].ProviderID == "" {
  710. loadBalancerMap[key].ProviderID = providerID
  711. }
  712. }
  713. for _, result := range resLBCost {
  714. cluster, err := result.GetString(env.GetPromClusterLabel())
  715. if err != nil {
  716. cluster = env.GetClusterID()
  717. }
  718. namespace, err := result.GetString("namespace")
  719. if err != nil {
  720. log.Warnf("ClusterLoadBalancers: LB cost data missing namespace")
  721. continue
  722. }
  723. name, err := result.GetString("service_name")
  724. if err != nil {
  725. log.Warnf("ClusterLoadBalancers: LB cost data missing service_name")
  726. continue
  727. }
  728. providerID, err := result.GetString("ingress_ip")
  729. if err != nil {
  730. log.DedupedWarningf(5, "ClusterLoadBalancers: LB cost data missing ingress_ip")
  731. // only update asset cost when an actual IP was returned
  732. continue
  733. }
  734. key := LoadBalancerIdentifier{
  735. Cluster: cluster,
  736. Namespace: namespace,
  737. Name: name,
  738. }
  739. // Apply cost as price-per-hour * hours
  740. if lb, ok := loadBalancerMap[key]; ok {
  741. lbPricePerHr := result.Values[0].Value
  742. // interpolate any missing data
  743. resultMins := lb.Minutes
  744. if resultMins > 0 {
  745. scaleFactor := (resultMins + resolution.Minutes()) / resultMins
  746. hrs := (lb.Minutes * scaleFactor) / 60.0
  747. lb.Cost += lbPricePerHr * hrs
  748. } else {
  749. log.DedupedWarningf(20, "ClusterLoadBalancers: found zero minutes for key: %v", key)
  750. }
  751. if lb.Ip != "" && lb.Ip != providerID {
  752. log.DedupedWarningf(5, "ClusterLoadBalancers: multiple IPs per load balancer not supported, using most recent IP")
  753. }
  754. lb.Ip = providerID
  755. lb.Private = privateIPCheck(providerID)
  756. } else {
  757. log.DedupedWarningf(20, "ClusterLoadBalancers: found minutes for key that does not exist: %v", key)
  758. }
  759. }
  760. return loadBalancerMap, nil
  761. }
  762. // Check if an ip is private.
  763. func privateIPCheck(ip string) bool {
  764. ipAddress := net.ParseIP(ip)
  765. return ipAddress.IsPrivate()
  766. }
// ComputeClusterCosts gives the cumulative and monthly-rate cluster costs over a window of time for all clusters.
//
// It runs a fixed set of Prometheus queries (CPU, RAM, GPU, storage, and an
// optional provider-specific local-storage query), applies configured
// discounts, and optionally computes CPU/RAM/storage usage breakdowns.
// Results are keyed by cluster ID. Requires a window of at least 10 minutes.
func (a *Accesses) ComputeClusterCosts(client prometheus.Client, provider models.Provider, window, offset time.Duration, withBreakdown bool) (map[string]*ClusterCosts, error) {
	if window < 10*time.Minute {
		return nil, fmt.Errorf("minimum window of 10m required; got %s", window)
	}

	// Compute number of minutes in the full interval, for use interpolating missed scrapes or scaling missing data
	start, end := timeutil.ParseTimeRange(window, offset)
	mins := end.Sub(start).Minutes()

	// minsPerResolution determines accuracy and resource use for the following
	// queries. Smaller values (higher resolution) result in better accuracy,
	// but more expensive queries, and vice-a-versa.
	resolution := env.GetETLResolution()
	//Ensuring if ETL_RESOLUTION_SECONDS is less than 60s default it to 1m
	var minsPerResolution int
	if minsPerResolution = int(resolution.Minutes()); int(resolution.Minutes()) < 1 {
		minsPerResolution = 1
		log.DedupedWarningf(3, "ComputeClusterCosts(): Configured ETL resolution (%d seconds) is below the 60 seconds threshold. Overriding with 1 minute.", int(resolution.Seconds()))
	}

	windowStr := timeutil.DurationString(window)

	// hourlyToCumulative is a scaling factor that, when multiplied by an hourly
	// value, converts it to a cumulative value; i.e.
	// [$/hr] * [min/res]*[hr/min] = [$/res]
	hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)

	// Query templates. The %s/%d verbs are filled with the cluster filter,
	// cluster label, window, resolution, and offset below.
	const fmtQueryDataCount = `
		count_over_time(sum(kube_node_status_capacity_cpu_cores{%s}) by (%s)[%s:%dm]%s) * %d
	`

	const fmtQueryTotalGPU = `
		sum(
			sum_over_time(node_gpu_hourly_cost{%s}[%s:%dm]%s) * %f
		) by (%s)
	`

	const fmtQueryTotalCPU = `
		sum(
			sum_over_time(avg(kube_node_status_capacity_cpu_cores{%s}) by (node, %s)[%s:%dm]%s) *
			avg(avg_over_time(node_cpu_hourly_cost{%s}[%s:%dm]%s)) by (node, %s) * %f
		) by (%s)
	`

	const fmtQueryTotalRAM = `
		sum(
			sum_over_time(avg(kube_node_status_capacity_memory_bytes{%s}) by (node, %s)[%s:%dm]%s) / 1024 / 1024 / 1024 *
			avg(avg_over_time(node_ram_hourly_cost{%s}[%s:%dm]%s)) by (node, %s) * %f
		) by (%s)
	`

	const fmtQueryTotalStorage = `
		sum(
			sum_over_time(avg(kube_persistentvolume_capacity_bytes{%s}) by (persistentvolume, %s)[%s:%dm]%s) / 1024 / 1024 / 1024 *
			avg(avg_over_time(pv_hourly_cost{%s}[%s:%dm]%s)) by (persistentvolume, %s) * %f
		) by (%s)
	`

	const fmtQueryCPUModePct = `
		sum(rate(node_cpu_seconds_total{%s}[%s]%s)) by (%s, mode) / ignoring(mode)
		group_left sum(rate(node_cpu_seconds_total{%s}[%s]%s)) by (%s)
	`

	const fmtQueryRAMSystemPct = `
		sum(sum_over_time(container_memory_usage_bytes{container_name!="",namespace="kube-system", %s}[%s:%dm]%s)) by (%s)
		/ sum(sum_over_time(kube_node_status_capacity_memory_bytes{%s}[%s:%dm]%s)) by (%s)
	`

	const fmtQueryRAMUserPct = `
		sum(sum_over_time(kubecost_cluster_memory_working_set_bytes{%s}[%s:%dm]%s)) by (%s)
		/ sum(sum_over_time(kube_node_status_capacity_memory_bytes{%s}[%s:%dm]%s)) by (%s)
	`

	// TODO niko/clustercost metric "kubelet_volume_stats_used_bytes" was deprecated in 1.12, then seems to have come back in 1.17
	// const fmtQueryPVStorageUsePct = `(sum(kube_persistentvolumeclaim_info) by (persistentvolumeclaim, storageclass,namespace) + on (persistentvolumeclaim,namespace)
	// group_right(storageclass) sum(kubelet_volume_stats_used_bytes) by (persistentvolumeclaim,namespace))`

	// Provider-specific local-storage queries; either may be empty when the
	// provider does not support local storage pricing.
	queryUsedLocalStorage := provider.GetLocalStorageQuery(window, offset, false, true)

	queryTotalLocalStorage := provider.GetLocalStorageQuery(window, offset, false, false)
	if queryTotalLocalStorage != "" {
		queryTotalLocalStorage = fmt.Sprintf(" + %s", queryTotalLocalStorage)
	}

	fmtOffset := timeutil.DurationToPromOffsetString(offset)

	queryDataCount := fmt.Sprintf(fmtQueryDataCount, env.GetPromClusterFilter(), env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, minsPerResolution)
	queryTotalGPU := fmt.Sprintf(fmtQueryTotalGPU, env.GetPromClusterFilter(), windowStr, minsPerResolution, fmtOffset, hourlyToCumulative, env.GetPromClusterLabel())
	queryTotalCPU := fmt.Sprintf(fmtQueryTotalCPU, env.GetPromClusterFilter(), env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterFilter(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), hourlyToCumulative, env.GetPromClusterLabel())
	queryTotalRAM := fmt.Sprintf(fmtQueryTotalRAM, env.GetPromClusterFilter(), env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterFilter(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), hourlyToCumulative, env.GetPromClusterLabel())
	queryTotalStorage := fmt.Sprintf(fmtQueryTotalStorage, env.GetPromClusterFilter(), env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterFilter(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), hourlyToCumulative, env.GetPromClusterLabel())

	ctx := prom.NewNamedContext(client, prom.ClusterContextName)

	// resChs is indexed positionally; the Await calls below depend on this
	// exact ordering: 0=data count, 1=GPU, 2=CPU, 3=RAM, 4=storage,
	// 5=total local storage (nil placeholder when unavailable), and, when
	// withBreakdown is set, 6=CPU mode pct, 7=RAM system pct, 8=RAM user pct,
	// 9=used local storage (nil placeholder when unavailable).
	resChs := ctx.QueryAll(
		queryDataCount,
		queryTotalGPU,
		queryTotalCPU,
		queryTotalRAM,
		queryTotalStorage,
	)

	// Only submit the local storage query if it is valid. Otherwise Prometheus
	// will return errors. Always append something to resChs, regardless, to
	// maintain indexing.
	if queryTotalLocalStorage != "" {
		resChs = append(resChs, ctx.Query(queryTotalLocalStorage))
	} else {
		resChs = append(resChs, nil)
	}

	if withBreakdown {
		queryCPUModePct := fmt.Sprintf(fmtQueryCPUModePct, env.GetPromClusterFilter(), windowStr, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterFilter(), windowStr, fmtOffset, env.GetPromClusterLabel())
		queryRAMSystemPct := fmt.Sprintf(fmtQueryRAMSystemPct, env.GetPromClusterFilter(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterFilter(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel())
		queryRAMUserPct := fmt.Sprintf(fmtQueryRAMUserPct, env.GetPromClusterFilter(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterFilter(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel())

		bdResChs := ctx.QueryAll(
			queryCPUModePct,
			queryRAMSystemPct,
			queryRAMUserPct,
		)

		// Only submit the local storage query if it is valid. Otherwise Prometheus
		// will return errors. Always append something to resChs, regardless, to
		// maintain indexing.
		if queryUsedLocalStorage != "" {
			bdResChs = append(bdResChs, ctx.Query(queryUsedLocalStorage))
		} else {
			bdResChs = append(bdResChs, nil)
		}

		resChs = append(resChs, bdResChs...)
	}

	resDataCount, _ := resChs[0].Await()
	resTotalGPU, _ := resChs[1].Await()
	resTotalCPU, _ := resChs[2].Await()
	resTotalRAM, _ := resChs[3].Await()
	resTotalStorage, _ := resChs[4].Await()
	if ctx.HasErrors() {
		return nil, ctx.ErrorCollection()
	}

	defaultClusterID := env.GetClusterID()

	// dataMinsByCluster records how many minutes of scrape data each cluster
	// actually has; it defaults to the full window when no data was counted.
	dataMinsByCluster := map[string]float64{}
	for _, result := range resDataCount {
		clusterID, _ := result.GetString(env.GetPromClusterLabel())
		if clusterID == "" {
			clusterID = defaultClusterID
		}
		dataMins := mins
		if len(result.Values) > 0 {
			dataMins = result.Values[0].Value
		} else {
			log.Warnf("Cluster cost data count returned no results for cluster %s", clusterID)
		}
		dataMinsByCluster[clusterID] = dataMins
	}

	// Determine combined discount. Parse failures fall back to 0.0 rather
	// than aborting.
	discount, customDiscount := 0.0, 0.0
	c, err := a.CloudProvider.GetConfig()
	if err == nil {
		discount, err = ParsePercentString(c.Discount)
		if err != nil {
			discount = 0.0
		}
		customDiscount, err = ParsePercentString(c.NegotiatedDiscount)
		if err != nil {
			customDiscount = 0.0
		}
	}

	// Intermediate structure storing mapping of [clusterID][type ∈ {cpu, ram, storage, total}]=cost
	costData := make(map[string]map[string]float64)

	// Helper function to iterate over Prom query results, parsing the raw values into
	// the intermediate costData structure.
	setCostsFromResults := func(costData map[string]map[string]float64, results []*prom.QueryResult, name string, discount float64, customDiscount float64) {
		for _, result := range results {
			clusterID, _ := result.GetString(env.GetPromClusterLabel())
			if clusterID == "" {
				clusterID = defaultClusterID
			}
			if _, ok := costData[clusterID]; !ok {
				costData[clusterID] = map[string]float64{}
			}
			if len(result.Values) > 0 {
				costData[clusterID][name] += result.Values[0].Value * (1.0 - discount) * (1.0 - customDiscount)
				costData[clusterID]["total"] += result.Values[0].Value * (1.0 - discount) * (1.0 - customDiscount)
			}
		}
	}

	// Apply both sustained use and custom discounts to RAM and CPU
	setCostsFromResults(costData, resTotalCPU, "cpu", discount, customDiscount)
	setCostsFromResults(costData, resTotalRAM, "ram", discount, customDiscount)

	// Apply only custom discount to GPU and storage
	setCostsFromResults(costData, resTotalGPU, "gpu", 0.0, customDiscount)
	setCostsFromResults(costData, resTotalStorage, "storage", 0.0, customDiscount)
	if queryTotalLocalStorage != "" {
		resTotalLocalStorage, err := resChs[5].Await()
		if err != nil {
			return nil, err
		}
		setCostsFromResults(costData, resTotalLocalStorage, "localstorage", 0.0, customDiscount)
	}

	cpuBreakdownMap := map[string]*ClusterCostsBreakdown{}
	ramBreakdownMap := map[string]*ClusterCostsBreakdown{}
	pvUsedCostMap := map[string]float64{}
	if withBreakdown {
		resCPUModePct, _ := resChs[6].Await()
		resRAMSystemPct, _ := resChs[7].Await()
		resRAMUserPct, _ := resChs[8].Await()
		if ctx.HasErrors() {
			return nil, ctx.ErrorCollection()
		}

		// Accumulate per-cluster CPU time by mode; unknown modes count as "other".
		for _, result := range resCPUModePct {
			clusterID, _ := result.GetString(env.GetPromClusterLabel())
			if clusterID == "" {
				clusterID = defaultClusterID
			}
			if _, ok := cpuBreakdownMap[clusterID]; !ok {
				cpuBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
			}
			cpuBD := cpuBreakdownMap[clusterID]

			mode, err := result.GetString("mode")
			if err != nil {
				log.Warnf("ComputeClusterCosts: unable to read CPU mode: %s", err)
				mode = "other"
			}

			switch mode {
			case "idle":
				cpuBD.Idle += result.Values[0].Value
			case "system":
				cpuBD.System += result.Values[0].Value
			case "user":
				cpuBD.User += result.Values[0].Value
			default:
				cpuBD.Other += result.Values[0].Value
			}
		}

		for _, result := range resRAMSystemPct {
			clusterID, _ := result.GetString(env.GetPromClusterLabel())
			if clusterID == "" {
				clusterID = defaultClusterID
			}
			if _, ok := ramBreakdownMap[clusterID]; !ok {
				ramBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
			}
			ramBD := ramBreakdownMap[clusterID]
			ramBD.System += result.Values[0].Value
		}
		for _, result := range resRAMUserPct {
			clusterID, _ := result.GetString(env.GetPromClusterLabel())
			if clusterID == "" {
				clusterID = defaultClusterID
			}
			if _, ok := ramBreakdownMap[clusterID]; !ok {
				ramBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
			}
			ramBD := ramBreakdownMap[clusterID]
			ramBD.User += result.Values[0].Value
		}

		// Whatever fraction of RAM is not system/user/other is idle.
		for _, ramBD := range ramBreakdownMap {
			remaining := 1.0
			remaining -= ramBD.Other
			remaining -= ramBD.System
			remaining -= ramBD.User
			ramBD.Idle = remaining
		}

		if queryUsedLocalStorage != "" {
			resUsedLocalStorage, err := resChs[9].Await()
			if err != nil {
				return nil, err
			}
			for _, result := range resUsedLocalStorage {
				clusterID, _ := result.GetString(env.GetPromClusterLabel())
				if clusterID == "" {
					clusterID = defaultClusterID
				}
				pvUsedCostMap[clusterID] += result.Values[0].Value
			}
		}
	}

	if ctx.HasErrors() {
		for _, err := range ctx.Errors() {
			log.Errorf("ComputeClusterCosts: %s", err)
		}
		return nil, ctx.ErrorCollection()
	}

	// Convert intermediate structure to Costs instances
	costsByCluster := map[string]*ClusterCosts{}
	for id, cd := range costData {
		dataMins, ok := dataMinsByCluster[id]
		if !ok {
			dataMins = mins
			log.Warnf("Cluster cost data count not found for cluster %s", id)
		}
		costs, err := NewClusterCostsFromCumulative(cd["cpu"], cd["gpu"], cd["ram"], cd["storage"]+cd["localstorage"], window, offset, dataMins/timeutil.MinsPerHour)
		if err != nil {
			log.Warnf("Failed to parse cluster costs on %s (%s) from cumulative data: %+v", window, offset, cd)
			return nil, err
		}

		if cpuBD, ok := cpuBreakdownMap[id]; ok {
			costs.CPUBreakdown = cpuBD
		}
		if ramBD, ok := ramBreakdownMap[id]; ok {
			costs.RAMBreakdown = ramBD
		}
		costs.StorageBreakdown = &ClusterCostsBreakdown{}
		if pvUC, ok := pvUsedCostMap[id]; ok {
			costs.StorageBreakdown.Idle = (costs.StorageCumulative - pvUC) / costs.StorageCumulative
			costs.StorageBreakdown.User = pvUC / costs.StorageCumulative
		}
		costs.DataMinutes = dataMins
		costsByCluster[id] = costs
	}

	return costsByCluster, nil
}
// Totals holds cost time-series for a cluster, where each entry is a
// [timestamp, value] pair of formatted strings (see resultToTotals).
type Totals struct {
	TotalCost   [][]string `json:"totalcost"`
	CPUCost     [][]string `json:"cpucost"`
	MemCost     [][]string `json:"memcost"`
	StorageCost [][]string `json:"storageCost"`
}
  1064. func resultToTotals(qrs []*prom.QueryResult) ([][]string, error) {
  1065. if len(qrs) == 0 {
  1066. return [][]string{}, fmt.Errorf("Not enough data available in the selected time range")
  1067. }
  1068. result := qrs[0]
  1069. totals := [][]string{}
  1070. for _, value := range result.Values {
  1071. d0 := fmt.Sprintf("%f", value.Timestamp)
  1072. d1 := fmt.Sprintf("%f", value.Value)
  1073. toAppend := []string{
  1074. d0,
  1075. d1,
  1076. }
  1077. totals = append(totals, toAppend)
  1078. }
  1079. return totals, nil
  1080. }
// ClusterCostsOverTime gives the full cluster costs over time.
//
// startString and endString must be RFC3339-style timestamps matching the
// layout "2006-01-02T15:04:05.000Z". It issues range queries for cores, RAM,
// storage, and total cost; when the total query yields no data (e.g. no PVs
// exist), it falls back to a nodes-only query before giving up.
func ClusterCostsOverTime(cli prometheus.Client, provider models.Provider, startString, endString string, window, offset time.Duration) (*Totals, error) {
	localStorageQuery := provider.GetLocalStorageQuery(window, offset, true, false)
	if localStorageQuery != "" {
		localStorageQuery = fmt.Sprintf("+ %s", localStorageQuery)
	}

	layout := "2006-01-02T15:04:05.000Z"

	start, err := time.Parse(layout, startString)
	if err != nil {
		log.Errorf("Error parsing time %s. Error: %s", startString, err.Error())
		return nil, err
	}
	end, err := time.Parse(layout, endString)
	if err != nil {
		log.Errorf("Error parsing time %s. Error: %s", endString, err.Error())
		return nil, err
	}
	fmtWindow := timeutil.DurationString(window)

	if fmtWindow == "" {
		err := fmt.Errorf("window value invalid or missing")
		log.Errorf("Error parsing time %v. Error: %s", window, err.Error())
		return nil, err
	}

	fmtOffset := timeutil.DurationToPromOffsetString(offset)

	// Build the four range queries from package-level templates
	// (queryClusterCores, queryClusterRAM, queryStorage, queryTotal).
	qCores := fmt.Sprintf(queryClusterCores, env.GetPromClusterFilter(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterFilter(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterFilter(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterLabel())
	qRAM := fmt.Sprintf(queryClusterRAM, env.GetPromClusterFilter(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterFilter(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterLabel())
	qStorage := fmt.Sprintf(queryStorage, env.GetPromClusterFilter(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterFilter(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterLabel(), localStorageQuery)
	qTotal := fmt.Sprintf(queryTotal, env.GetPromClusterFilter(), env.GetPromClusterLabel(), env.GetPromClusterFilter(), env.GetPromClusterLabel(), env.GetPromClusterFilter(), env.GetPromClusterLabel(), env.GetPromClusterLabel(), localStorageQuery)

	ctx := prom.NewNamedContext(cli, prom.ClusterContextName)
	resChClusterCores := ctx.QueryRange(qCores, start, end, window)
	resChClusterRAM := ctx.QueryRange(qRAM, start, end, window)
	resChStorage := ctx.QueryRange(qStorage, start, end, window)
	resChTotal := ctx.QueryRange(qTotal, start, end, window)

	resultClusterCores, err := resChClusterCores.Await()
	if err != nil {
		return nil, err
	}

	resultClusterRAM, err := resChClusterRAM.Await()
	if err != nil {
		return nil, err
	}

	resultStorage, err := resChStorage.Await()
	if err != nil {
		return nil, err
	}

	resultTotal, err := resChTotal.Await()
	if err != nil {
		return nil, err
	}

	coreTotal, err := resultToTotals(resultClusterCores)
	if err != nil {
		log.Infof("[Warning] ClusterCostsOverTime: no cpu data: %s", err)
		return nil, err
	}

	ramTotal, err := resultToTotals(resultClusterRAM)
	if err != nil {
		log.Infof("[Warning] ClusterCostsOverTime: no ram data: %s", err)
		return nil, err
	}

	// Missing storage data is tolerated: storageTotal may be empty.
	storageTotal, err := resultToTotals(resultStorage)
	if err != nil {
		log.Infof("[Warning] ClusterCostsOverTime: no storage data: %s", err)
	}

	clusterTotal, err := resultToTotals(resultTotal)
	if err != nil {
		// If clusterTotal query failed, it's likely because there are no PVs, which
		// causes the qTotal query to return no data. Instead, query only node costs.
		// If that fails, return an error because something is actually wrong.
		qNodes := fmt.Sprintf(queryNodes, env.GetPromClusterFilter(), env.GetPromClusterLabel(), localStorageQuery)
		resultNodes, warnings, err := ctx.QueryRangeSync(qNodes, start, end, window)
		for _, warning := range warnings {
			log.Warnf(warning)
		}
		if err != nil {
			return nil, err
		}
		clusterTotal, err = resultToTotals(resultNodes)
		if err != nil {
			log.Infof("[Warning] ClusterCostsOverTime: no node data: %s", err)
			return nil, err
		}
	}

	return &Totals{
		TotalCost:   clusterTotal,
		CPUCost:     coreTotal,
		MemCost:     ramTotal,
		StorageCost: storageTotal,
	}, nil
}
  1170. func pvCosts(diskMap map[DiskIdentifier]*Disk, resolution time.Duration, resActiveMins, resPVSize, resPVCost, resPVUsedAvg, resPVUsedMax, resPVCInfo []*prom.QueryResult, cp models.Provider, window opencost.Window) {
  1171. for _, result := range resActiveMins {
  1172. cluster, err := result.GetString(env.GetPromClusterLabel())
  1173. if err != nil {
  1174. cluster = env.GetClusterID()
  1175. }
  1176. name, err := result.GetString("persistentvolume")
  1177. if err != nil {
  1178. log.Warnf("ClusterDisks: active mins missing pv name")
  1179. continue
  1180. }
  1181. if len(result.Values) == 0 {
  1182. continue
  1183. }
  1184. key := DiskIdentifier{cluster, name}
  1185. if _, ok := diskMap[key]; !ok {
  1186. diskMap[key] = &Disk{
  1187. Cluster: cluster,
  1188. Name: name,
  1189. Breakdown: &ClusterCostsBreakdown{},
  1190. }
  1191. }
  1192. s, e := calculateStartAndEnd(result, resolution, window)
  1193. mins := e.Sub(s).Minutes()
  1194. diskMap[key].End = e
  1195. diskMap[key].Start = s
  1196. diskMap[key].Minutes = mins
  1197. }
  1198. for _, result := range resPVSize {
  1199. cluster, err := result.GetString(env.GetPromClusterLabel())
  1200. if err != nil {
  1201. cluster = env.GetClusterID()
  1202. }
  1203. name, err := result.GetString("persistentvolume")
  1204. if err != nil {
  1205. log.Warnf("ClusterDisks: PV size data missing persistentvolume")
  1206. continue
  1207. }
  1208. // TODO niko/assets storage class
  1209. bytes := result.Values[0].Value
  1210. key := DiskIdentifier{cluster, name}
  1211. if _, ok := diskMap[key]; !ok {
  1212. diskMap[key] = &Disk{
  1213. Cluster: cluster,
  1214. Name: name,
  1215. Breakdown: &ClusterCostsBreakdown{},
  1216. }
  1217. }
  1218. diskMap[key].Bytes = bytes
  1219. }
  1220. customPricingEnabled := provider.CustomPricesEnabled(cp)
  1221. customPricingConfig, err := cp.GetConfig()
  1222. if err != nil {
  1223. log.Warnf("ClusterDisks: failed to load custom pricing: %s", err)
  1224. }
  1225. for _, result := range resPVCost {
  1226. cluster, err := result.GetString(env.GetPromClusterLabel())
  1227. if err != nil {
  1228. cluster = env.GetClusterID()
  1229. }
  1230. name, err := result.GetString("persistentvolume")
  1231. if err != nil {
  1232. log.Warnf("ClusterDisks: PV cost data missing persistentvolume")
  1233. continue
  1234. }
  1235. // TODO niko/assets storage class
  1236. var cost float64
  1237. if customPricingEnabled && customPricingConfig != nil {
  1238. customPVCostStr := customPricingConfig.Storage
  1239. customPVCost, err := strconv.ParseFloat(customPVCostStr, 64)
  1240. if err != nil {
  1241. log.Warnf("ClusterDisks: error parsing custom PV price: %s", customPVCostStr)
  1242. }
  1243. cost = customPVCost
  1244. } else {
  1245. cost = result.Values[0].Value
  1246. }
  1247. key := DiskIdentifier{cluster, name}
  1248. if _, ok := diskMap[key]; !ok {
  1249. diskMap[key] = &Disk{
  1250. Cluster: cluster,
  1251. Name: name,
  1252. Breakdown: &ClusterCostsBreakdown{},
  1253. }
  1254. }
  1255. diskMap[key].Cost = cost * (diskMap[key].Bytes / 1024 / 1024 / 1024) * (diskMap[key].Minutes / 60)
  1256. providerID, _ := result.GetString("provider_id") // just put the providerID set up here, it's the simplest query.
  1257. if providerID != "" {
  1258. diskMap[key].ProviderID = provider.ParsePVID(providerID)
  1259. }
  1260. }
  1261. for _, result := range resPVUsedAvg {
  1262. cluster, err := result.GetString(env.GetPromClusterLabel())
  1263. if err != nil {
  1264. cluster = env.GetClusterID()
  1265. }
  1266. claimName, err := result.GetString("persistentvolumeclaim")
  1267. if err != nil {
  1268. log.Debugf("ClusterDisks: pv usage data missing persistentvolumeclaim")
  1269. continue
  1270. }
  1271. claimNamespace, err := result.GetString("namespace")
  1272. if err != nil {
  1273. log.Debugf("ClusterDisks: pv usage data missing namespace")
  1274. continue
  1275. }
  1276. var volumeName string
  1277. for _, thatRes := range resPVCInfo {
  1278. thatCluster, err := thatRes.GetString(env.GetPromClusterLabel())
  1279. if err != nil {
  1280. thatCluster = env.GetClusterID()
  1281. }
  1282. thatVolumeName, err := thatRes.GetString("volumename")
  1283. if err != nil {
  1284. log.Debugf("ClusterDisks: pv claim data missing volumename")
  1285. continue
  1286. }
  1287. thatClaimName, err := thatRes.GetString("persistentvolumeclaim")
  1288. if err != nil {
  1289. log.Debugf("ClusterDisks: pv claim data missing persistentvolumeclaim")
  1290. continue
  1291. }
  1292. thatClaimNamespace, err := thatRes.GetString("namespace")
  1293. if err != nil {
  1294. log.Debugf("ClusterDisks: pv claim data missing namespace")
  1295. continue
  1296. }
  1297. if cluster == thatCluster && claimName == thatClaimName && claimNamespace == thatClaimNamespace {
  1298. volumeName = thatVolumeName
  1299. }
  1300. }
  1301. usage := result.Values[0].Value
  1302. key := DiskIdentifier{cluster, volumeName}
  1303. if _, ok := diskMap[key]; !ok {
  1304. diskMap[key] = &Disk{
  1305. Cluster: cluster,
  1306. Name: volumeName,
  1307. Breakdown: &ClusterCostsBreakdown{},
  1308. }
  1309. }
  1310. diskMap[key].BytesUsedAvgPtr = &usage
  1311. }
  1312. for _, result := range resPVUsedMax {
  1313. cluster, err := result.GetString(env.GetPromClusterLabel())
  1314. if err != nil {
  1315. cluster = env.GetClusterID()
  1316. }
  1317. claimName, err := result.GetString("persistentvolumeclaim")
  1318. if err != nil {
  1319. log.Debugf("ClusterDisks: pv usage data missing persistentvolumeclaim")
  1320. continue
  1321. }
  1322. claimNamespace, err := result.GetString("namespace")
  1323. if err != nil {
  1324. log.Debugf("ClusterDisks: pv usage data missing namespace")
  1325. continue
  1326. }
  1327. var volumeName string
  1328. for _, thatRes := range resPVCInfo {
  1329. thatCluster, err := thatRes.GetString(env.GetPromClusterLabel())
  1330. if err != nil {
  1331. thatCluster = env.GetClusterID()
  1332. }
  1333. thatVolumeName, err := thatRes.GetString("volumename")
  1334. if err != nil {
  1335. log.Debugf("ClusterDisks: pv claim data missing volumename")
  1336. continue
  1337. }
  1338. thatClaimName, err := thatRes.GetString("persistentvolumeclaim")
  1339. if err != nil {
  1340. log.Debugf("ClusterDisks: pv claim data missing persistentvolumeclaim")
  1341. continue
  1342. }
  1343. thatClaimNamespace, err := thatRes.GetString("namespace")
  1344. if err != nil {
  1345. log.Debugf("ClusterDisks: pv claim data missing namespace")
  1346. continue
  1347. }
  1348. if cluster == thatCluster && claimName == thatClaimName && claimNamespace == thatClaimNamespace {
  1349. volumeName = thatVolumeName
  1350. }
  1351. }
  1352. usage := result.Values[0].Value
  1353. key := DiskIdentifier{cluster, volumeName}
  1354. if _, ok := diskMap[key]; !ok {
  1355. diskMap[key] = &Disk{
  1356. Cluster: cluster,
  1357. Name: volumeName,
  1358. Breakdown: &ClusterCostsBreakdown{},
  1359. }
  1360. }
  1361. diskMap[key].BytesUsedMaxPtr = &usage
  1362. }
  1363. }