allocation.go 71 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053
  1. package costmodel
  2. import (
  3. "fmt"
  4. "math"
  5. "strconv"
  6. "strings"
  7. "time"
  8. "github.com/kubecost/cost-model/pkg/cloud"
  9. "github.com/kubecost/cost-model/pkg/env"
  10. "github.com/kubecost/cost-model/pkg/kubecost"
  11. "github.com/kubecost/cost-model/pkg/log"
  12. "github.com/kubecost/cost-model/pkg/prom"
  13. "github.com/kubecost/cost-model/pkg/util"
  14. "k8s.io/apimachinery/pkg/labels"
  15. )
  16. const (
  17. queryFmtPods = `avg(kube_pod_container_status_running{}) by (pod, namespace, %s)[%s:%s]%s`
  18. queryFmtRAMBytesAllocated = `avg(avg_over_time(container_memory_allocation_bytes{container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, %s, provider_id)`
  19. queryFmtRAMRequests = `avg(avg_over_time(kube_pod_container_resource_requests{resource="memory", unit="byte", container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, %s)`
  20. queryFmtRAMUsageAvg = `avg(avg_over_time(container_memory_working_set_bytes{container_name!="", container_name!="POD", instance!=""}[%s]%s)) by (container_name, pod_name, namespace, instance, %s)`
  21. queryFmtRAMUsageMax = `max(max_over_time(container_memory_working_set_bytes{container_name!="", container_name!="POD", instance!=""}[%s]%s)) by (container_name, pod_name, namespace, instance, %s)`
  22. queryFmtCPUCoresAllocated = `avg(avg_over_time(container_cpu_allocation{container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, %s)`
  23. queryFmtCPURequests = `avg(avg_over_time(kube_pod_container_resource_requests{resource="cpu", unit="core", container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, %s)`
  24. queryFmtCPUUsageAvg = `avg(rate(container_cpu_usage_seconds_total{container_name!="", container_name!="POD", instance!=""}[%s]%s)) by (container_name, pod_name, namespace, instance, %s)`
  25. // This query could be written without the recording rule
  26. // "kubecost_savings_container_cpu_usage_seconds", but we should
  27. // only do that when we're ready to incur the performance tradeoffs
  28. // with subqueries which would probably be in the world of hourly
  29. // ETL.
  30. //
  31. // See PromQL subquery documentation for a rate example:
  32. // https://prometheus.io/blog/2019/01/28/subquery-support/#examples
  33. queryFmtCPUUsageMax = `max(max_over_time(kubecost_savings_container_cpu_usage_seconds[%s]%s)) by (container_name, pod_name, namespace, instance, %s)`
  34. queryFmtGPUsRequested = `avg(avg_over_time(kube_pod_container_resource_requests{resource="nvidia_com_gpu", container!="",container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, %s)`
  35. queryFmtNodeCostPerCPUHr = `avg(avg_over_time(node_cpu_hourly_cost[%s]%s)) by (node, %s, instance_type, provider_id)`
  36. queryFmtNodeCostPerRAMGiBHr = `avg(avg_over_time(node_ram_hourly_cost[%s]%s)) by (node, %s, instance_type, provider_id)`
  37. queryFmtNodeCostPerGPUHr = `avg(avg_over_time(node_gpu_hourly_cost[%s]%s)) by (node, %s, instance_type, provider_id)`
  38. queryFmtNodeIsSpot = `avg_over_time(kubecost_node_is_spot[%s]%s)`
  39. queryFmtPVCInfo = `avg(kube_persistentvolumeclaim_info{volumename != ""}) by (persistentvolumeclaim, storageclass, volumename, namespace, %s)[%s:%s]%s`
  40. queryFmtPVBytes = `avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s]%s)) by (persistentvolume, %s)`
  41. queryFmtPodPVCAllocation = `avg(avg_over_time(pod_pvc_allocation[%s]%s)) by (persistentvolume, persistentvolumeclaim, pod, namespace, %s)`
  42. queryFmtPVCBytesRequested = `avg(avg_over_time(kube_persistentvolumeclaim_resource_requests_storage_bytes{}[%s]%s)) by (persistentvolumeclaim, namespace, %s)`
  43. queryFmtPVCostPerGiBHour = `avg(avg_over_time(pv_hourly_cost[%s]%s)) by (volumename, %s)`
  44. queryFmtNetZoneGiB = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="true"}[%s]%s)) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
  45. queryFmtNetZoneCostPerGiB = `avg(avg_over_time(kubecost_network_zone_egress_cost{}[%s]%s)) by (%s)`
  46. queryFmtNetRegionGiB = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="false"}[%s]%s)) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
  47. queryFmtNetRegionCostPerGiB = `avg(avg_over_time(kubecost_network_region_egress_cost{}[%s]%s)) by (%s)`
  48. queryFmtNetInternetGiB = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="true"}[%s]%s)) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
  49. queryFmtNetInternetCostPerGiB = `avg(avg_over_time(kubecost_network_internet_egress_cost{}[%s]%s)) by (%s)`
  50. queryFmtNamespaceLabels = `avg_over_time(kube_namespace_labels[%s]%s)`
  51. queryFmtNamespaceAnnotations = `avg_over_time(kube_namespace_annotations[%s]%s)`
  52. queryFmtPodLabels = `avg_over_time(kube_pod_labels[%s]%s)`
  53. queryFmtPodAnnotations = `avg_over_time(kube_pod_annotations[%s]%s)`
  54. queryFmtServiceLabels = `avg_over_time(service_selector_labels[%s]%s)`
  55. queryFmtDeploymentLabels = `avg_over_time(deployment_match_labels[%s]%s)`
  56. queryFmtStatefulSetLabels = `avg_over_time(statefulSet_match_labels[%s]%s)`
  57. queryFmtDaemonSetLabels = `sum(avg_over_time(kube_pod_owner{owner_kind="DaemonSet"}[%s]%s)) by (pod, owner_name, namespace, %s)`
  58. queryFmtJobLabels = `sum(avg_over_time(kube_pod_owner{owner_kind="Job"}[%s]%s)) by (pod, owner_name, namespace ,%s)`
  59. queryFmtLBCostPerHr = `avg(avg_over_time(kubecost_load_balancer_cost[%s]%s)) by (namespace, service_name, %s)`
  60. queryFmtLBActiveMins = `count(kubecost_load_balancer_cost) by (namespace, service_name, %s)[%s:%s]%s`
  61. )
  62. // CanCompute should return true if CostModel can act as a valid source for the
  63. // given time range. In the case of CostModel we want to attempt to compute as
  64. // long as the range starts in the past. If the CostModel ends up not having
  65. // data to match, that's okay, and should be communicated with an error
  66. // response from ComputeAllocation.
  67. func (cm *CostModel) CanCompute(start, end time.Time) bool {
  68. return start.Before(time.Now())
  69. }
  70. // Name returns the name of the Source
  71. func (cm *CostModel) Name() string {
  72. return "CostModel"
  73. }
  74. // ComputeAllocation uses the CostModel instance to compute an AllocationSet
  75. // for the window defined by the given start and end times. The Allocations
  76. // returned are unaggregated (i.e. down to the container level).
  77. func (cm *CostModel) ComputeAllocation(start, end time.Time, resolution time.Duration) (*kubecost.AllocationSet, error) {
  78. // 1. Build out Pod map from resolution-tuned, batched Pod start/end query
  79. // 2. Run and apply the results of the remaining queries to
  80. // 3. Build out AllocationSet from completed Pod map
  81. // Create a window spanning the requested query
  82. window := kubecost.NewWindow(&start, &end)
  83. // Create an empty AllocationSet. For safety, in the case of an error, we
  84. // should prefer to return this empty set with the error. (In the case of
  85. // no error, of course we populate the set and return it.)
  86. allocSet := kubecost.NewAllocationSet(start, end)
  87. // (1) Build out Pod map
  88. // Build out a map of Allocations as a mapping from pod-to-container-to-
  89. // underlying-Allocation instance, starting with (start, end) so that we
  90. // begin with minutes, from which we compute resource allocation and cost
  91. // totals from measured rate data.
  92. podMap := map[podKey]*Pod{}
  93. // clusterStarts and clusterEnds record the earliest start and latest end
  94. // times, respectively, on a cluster-basis. These are used for unmounted
  95. // PVs and other "virtual" Allocations so that minutes are maximally
  96. // accurate during start-up or spin-down of a cluster
  97. clusterStart := map[string]time.Time{}
  98. clusterEnd := map[string]time.Time{}
  99. cm.buildPodMap(window, resolution, env.GetETLMaxBatchDuration(), podMap, clusterStart, clusterEnd)
  100. // (2) Run and apply remaining queries
  101. // Convert window (start, end) to (duration, offset) for querying Prometheus,
  102. // including handling Thanos offset
  103. durStr, offStr, err := window.DurationOffsetForPrometheus()
  104. if err != nil {
  105. // Negative duration, so return empty set
  106. return allocSet, nil
  107. }
  108. // Convert resolution duration to a query-ready string
  109. resStr := util.DurationString(resolution)
  110. ctx := prom.NewContext(cm.PrometheusClient)
  111. queryRAMBytesAllocated := fmt.Sprintf(queryFmtRAMBytesAllocated, durStr, offStr, env.GetPromClusterLabel())
  112. resChRAMBytesAllocated := ctx.Query(queryRAMBytesAllocated)
  113. queryRAMRequests := fmt.Sprintf(queryFmtRAMRequests, durStr, offStr, env.GetPromClusterLabel())
  114. resChRAMRequests := ctx.Query(queryRAMRequests)
  115. queryRAMUsageAvg := fmt.Sprintf(queryFmtRAMUsageAvg, durStr, offStr, env.GetPromClusterLabel())
  116. resChRAMUsageAvg := ctx.Query(queryRAMUsageAvg)
  117. queryRAMUsageMax := fmt.Sprintf(queryFmtRAMUsageMax, durStr, offStr, env.GetPromClusterLabel())
  118. resChRAMUsageMax := ctx.Query(queryRAMUsageMax)
  119. queryCPUCoresAllocated := fmt.Sprintf(queryFmtCPUCoresAllocated, durStr, offStr, env.GetPromClusterLabel())
  120. resChCPUCoresAllocated := ctx.Query(queryCPUCoresAllocated)
  121. queryCPURequests := fmt.Sprintf(queryFmtCPURequests, durStr, offStr, env.GetPromClusterLabel())
  122. resChCPURequests := ctx.Query(queryCPURequests)
  123. queryCPUUsageAvg := fmt.Sprintf(queryFmtCPUUsageAvg, durStr, offStr, env.GetPromClusterLabel())
  124. resChCPUUsageAvg := ctx.Query(queryCPUUsageAvg)
  125. queryCPUUsageMax := fmt.Sprintf(queryFmtCPUUsageMax, durStr, offStr, env.GetPromClusterLabel())
  126. resChCPUUsageMax := ctx.Query(queryCPUUsageMax)
  127. queryGPUsRequested := fmt.Sprintf(queryFmtGPUsRequested, durStr, offStr, env.GetPromClusterLabel())
  128. resChGPUsRequested := ctx.Query(queryGPUsRequested)
  129. queryNodeCostPerCPUHr := fmt.Sprintf(queryFmtNodeCostPerCPUHr, durStr, offStr, env.GetPromClusterLabel())
  130. resChNodeCostPerCPUHr := ctx.Query(queryNodeCostPerCPUHr)
  131. queryNodeCostPerRAMGiBHr := fmt.Sprintf(queryFmtNodeCostPerRAMGiBHr, durStr, offStr, env.GetPromClusterLabel())
  132. resChNodeCostPerRAMGiBHr := ctx.Query(queryNodeCostPerRAMGiBHr)
  133. queryNodeCostPerGPUHr := fmt.Sprintf(queryFmtNodeCostPerGPUHr, durStr, offStr, env.GetPromClusterLabel())
  134. resChNodeCostPerGPUHr := ctx.Query(queryNodeCostPerGPUHr)
  135. queryNodeIsSpot := fmt.Sprintf(queryFmtNodeIsSpot, durStr, offStr)
  136. resChNodeIsSpot := ctx.Query(queryNodeIsSpot)
  137. queryPVCInfo := fmt.Sprintf(queryFmtPVCInfo, env.GetPromClusterLabel(), durStr, resStr, offStr)
  138. resChPVCInfo := ctx.Query(queryPVCInfo)
  139. queryPVBytes := fmt.Sprintf(queryFmtPVBytes, durStr, offStr, env.GetPromClusterLabel())
  140. resChPVBytes := ctx.Query(queryPVBytes)
  141. queryPodPVCAllocation := fmt.Sprintf(queryFmtPodPVCAllocation, durStr, offStr, env.GetPromClusterLabel())
  142. resChPodPVCAllocation := ctx.Query(queryPodPVCAllocation)
  143. queryPVCBytesRequested := fmt.Sprintf(queryFmtPVCBytesRequested, durStr, offStr, env.GetPromClusterLabel())
  144. resChPVCBytesRequested := ctx.Query(queryPVCBytesRequested)
  145. queryPVCostPerGiBHour := fmt.Sprintf(queryFmtPVCostPerGiBHour, durStr, offStr, env.GetPromClusterLabel())
  146. resChPVCostPerGiBHour := ctx.Query(queryPVCostPerGiBHour)
  147. queryNetZoneGiB := fmt.Sprintf(queryFmtNetZoneGiB, durStr, offStr, env.GetPromClusterLabel())
  148. resChNetZoneGiB := ctx.Query(queryNetZoneGiB)
  149. queryNetZoneCostPerGiB := fmt.Sprintf(queryFmtNetZoneCostPerGiB, durStr, offStr, env.GetPromClusterLabel())
  150. resChNetZoneCostPerGiB := ctx.Query(queryNetZoneCostPerGiB)
  151. queryNetRegionGiB := fmt.Sprintf(queryFmtNetRegionGiB, durStr, offStr, env.GetPromClusterLabel())
  152. resChNetRegionGiB := ctx.Query(queryNetRegionGiB)
  153. queryNetRegionCostPerGiB := fmt.Sprintf(queryFmtNetRegionCostPerGiB, durStr, offStr, env.GetPromClusterLabel())
  154. resChNetRegionCostPerGiB := ctx.Query(queryNetRegionCostPerGiB)
  155. queryNetInternetGiB := fmt.Sprintf(queryFmtNetInternetGiB, durStr, offStr, env.GetPromClusterLabel())
  156. resChNetInternetGiB := ctx.Query(queryNetInternetGiB)
  157. queryNetInternetCostPerGiB := fmt.Sprintf(queryFmtNetInternetCostPerGiB, durStr, offStr, env.GetPromClusterLabel())
  158. resChNetInternetCostPerGiB := ctx.Query(queryNetInternetCostPerGiB)
  159. queryNamespaceLabels := fmt.Sprintf(queryFmtNamespaceLabels, durStr, offStr)
  160. resChNamespaceLabels := ctx.Query(queryNamespaceLabels)
  161. queryNamespaceAnnotations := fmt.Sprintf(queryFmtNamespaceAnnotations, durStr, offStr)
  162. resChNamespaceAnnotations := ctx.Query(queryNamespaceAnnotations)
  163. queryPodLabels := fmt.Sprintf(queryFmtPodLabels, durStr, offStr)
  164. resChPodLabels := ctx.Query(queryPodLabels)
  165. queryPodAnnotations := fmt.Sprintf(queryFmtPodAnnotations, durStr, offStr)
  166. resChPodAnnotations := ctx.Query(queryPodAnnotations)
  167. queryServiceLabels := fmt.Sprintf(queryFmtServiceLabels, durStr, offStr)
  168. resChServiceLabels := ctx.Query(queryServiceLabels)
  169. queryDeploymentLabels := fmt.Sprintf(queryFmtDeploymentLabels, durStr, offStr)
  170. resChDeploymentLabels := ctx.Query(queryDeploymentLabels)
  171. queryStatefulSetLabels := fmt.Sprintf(queryFmtStatefulSetLabels, durStr, offStr)
  172. resChStatefulSetLabels := ctx.Query(queryStatefulSetLabels)
  173. queryDaemonSetLabels := fmt.Sprintf(queryFmtDaemonSetLabels, durStr, offStr, env.GetPromClusterLabel())
  174. resChDaemonSetLabels := ctx.Query(queryDaemonSetLabels)
  175. queryJobLabels := fmt.Sprintf(queryFmtJobLabels, durStr, offStr, env.GetPromClusterLabel())
  176. resChJobLabels := ctx.Query(queryJobLabels)
  177. queryLBCostPerHr := fmt.Sprintf(queryFmtLBCostPerHr, durStr, offStr, env.GetPromClusterLabel())
  178. resChLBCostPerHr := ctx.Query(queryLBCostPerHr)
  179. queryLBActiveMins := fmt.Sprintf(queryFmtLBActiveMins, env.GetPromClusterLabel(), durStr, resStr, offStr)
  180. resChLBActiveMins := ctx.Query(queryLBActiveMins)
  181. resCPUCoresAllocated, _ := resChCPUCoresAllocated.Await()
  182. resCPURequests, _ := resChCPURequests.Await()
  183. resCPUUsageAvg, _ := resChCPUUsageAvg.Await()
  184. resCPUUsageMax, _ := resChCPUUsageMax.Await()
  185. resRAMBytesAllocated, _ := resChRAMBytesAllocated.Await()
  186. resRAMRequests, _ := resChRAMRequests.Await()
  187. resRAMUsageAvg, _ := resChRAMUsageAvg.Await()
  188. resRAMUsageMax, _ := resChRAMUsageMax.Await()
  189. resGPUsRequested, _ := resChGPUsRequested.Await()
  190. resNodeCostPerCPUHr, _ := resChNodeCostPerCPUHr.Await()
  191. resNodeCostPerRAMGiBHr, _ := resChNodeCostPerRAMGiBHr.Await()
  192. resNodeCostPerGPUHr, _ := resChNodeCostPerGPUHr.Await()
  193. resNodeIsSpot, _ := resChNodeIsSpot.Await()
  194. resPVBytes, _ := resChPVBytes.Await()
  195. resPVCostPerGiBHour, _ := resChPVCostPerGiBHour.Await()
  196. resPVCInfo, _ := resChPVCInfo.Await()
  197. resPVCBytesRequested, _ := resChPVCBytesRequested.Await()
  198. resPodPVCAllocation, _ := resChPodPVCAllocation.Await()
  199. resNetZoneGiB, _ := resChNetZoneGiB.Await()
  200. resNetZoneCostPerGiB, _ := resChNetZoneCostPerGiB.Await()
  201. resNetRegionGiB, _ := resChNetRegionGiB.Await()
  202. resNetRegionCostPerGiB, _ := resChNetRegionCostPerGiB.Await()
  203. resNetInternetGiB, _ := resChNetInternetGiB.Await()
  204. resNetInternetCostPerGiB, _ := resChNetInternetCostPerGiB.Await()
  205. resNamespaceLabels, _ := resChNamespaceLabels.Await()
  206. resNamespaceAnnotations, _ := resChNamespaceAnnotations.Await()
  207. resPodLabels, _ := resChPodLabels.Await()
  208. resPodAnnotations, _ := resChPodAnnotations.Await()
  209. resServiceLabels, _ := resChServiceLabels.Await()
  210. resDeploymentLabels, _ := resChDeploymentLabels.Await()
  211. resStatefulSetLabels, _ := resChStatefulSetLabels.Await()
  212. resDaemonSetLabels, _ := resChDaemonSetLabels.Await()
  213. resJobLabels, _ := resChJobLabels.Await()
  214. resLBCostPerHr, _ := resChLBCostPerHr.Await()
  215. resLBActiveMins, _ := resChLBActiveMins.Await()
  216. if ctx.HasErrors() {
  217. for _, err := range ctx.Errors() {
  218. log.Errorf("CostModel.ComputeAllocation: %s", err)
  219. }
  220. return allocSet, ctx.ErrorCollection()
  221. }
  222. // We choose to apply allocation before requests in the cases of RAM and
  223. // CPU so that we can assert that allocation should always be greater than
  224. // or equal to request.
  225. applyCPUCoresAllocated(podMap, resCPUCoresAllocated)
  226. applyCPUCoresRequested(podMap, resCPURequests)
  227. applyCPUCoresUsedAvg(podMap, resCPUUsageAvg)
  228. applyCPUCoresUsedMax(podMap, resCPUUsageMax)
  229. applyRAMBytesAllocated(podMap, resRAMBytesAllocated)
  230. applyRAMBytesRequested(podMap, resRAMRequests)
  231. applyRAMBytesUsedAvg(podMap, resRAMUsageAvg)
  232. applyRAMBytesUsedMax(podMap, resRAMUsageMax)
  233. applyGPUsRequested(podMap, resGPUsRequested)
  234. applyNetworkAllocation(podMap, resNetZoneGiB, resNetZoneCostPerGiB)
  235. applyNetworkAllocation(podMap, resNetRegionGiB, resNetRegionCostPerGiB)
  236. applyNetworkAllocation(podMap, resNetInternetGiB, resNetInternetCostPerGiB)
  237. namespaceLabels := resToNamespaceLabels(resNamespaceLabels)
  238. podLabels := resToPodLabels(resPodLabels)
  239. namespaceAnnotations := resToNamespaceAnnotations(resNamespaceAnnotations)
  240. podAnnotations := resToPodAnnotations(resPodAnnotations)
  241. applyLabels(podMap, namespaceLabels, podLabels)
  242. applyAnnotations(podMap, namespaceAnnotations, podAnnotations)
  243. serviceLabels := getServiceLabels(resServiceLabels)
  244. allocsByService := map[serviceKey][]*kubecost.Allocation{}
  245. applyServicesToPods(podMap, podLabels, allocsByService, serviceLabels)
  246. podDeploymentMap := labelsToPodControllerMap(podLabels, resToDeploymentLabels(resDeploymentLabels))
  247. podStatefulSetMap := labelsToPodControllerMap(podLabels, resToStatefulSetLabels(resStatefulSetLabels))
  248. podDaemonSetMap := resToPodDaemonSetMap(resDaemonSetLabels)
  249. podJobMap := resToPodJobMap(resJobLabels)
  250. applyControllersToPods(podMap, podDeploymentMap)
  251. applyControllersToPods(podMap, podStatefulSetMap)
  252. applyControllersToPods(podMap, podDaemonSetMap)
  253. applyControllersToPods(podMap, podJobMap)
  254. // TODO breakdown network costs?
  255. // Build out a map of Nodes with resource costs, discounts, and node types
  256. // for converting resource allocation data to cumulative costs.
  257. nodeMap := map[nodeKey]*NodePricing{}
  258. applyNodeCostPerCPUHr(nodeMap, resNodeCostPerCPUHr, cm.Provider.ParseID)
  259. applyNodeCostPerRAMGiBHr(nodeMap, resNodeCostPerRAMGiBHr, cm.Provider.ParseID)
  260. applyNodeCostPerGPUHr(nodeMap, resNodeCostPerGPUHr, cm.Provider.ParseID)
  261. applyNodeSpot(nodeMap, resNodeIsSpot)
  262. applyNodeDiscount(nodeMap, cm)
  263. // Build out the map of all PVs with class, size and cost-per-hour.
  264. // Note: this does not record time running, which we may want to
  265. // include later for increased PV precision. (As long as the PV has
  266. // a PVC, we get time running there, so this is only inaccurate
  267. // for short-lived, unmounted PVs.)
  268. pvMap := map[pvKey]*PV{}
  269. buildPVMap(pvMap, resPVCostPerGiBHour)
  270. applyPVBytes(pvMap, resPVBytes)
  271. // Build out the map of all PVCs with time running, bytes requested,
  272. // and connect to the correct PV from pvMap. (If no PV exists, that
  273. // is noted, but does not result in any allocation/cost.)
  274. pvcMap := map[pvcKey]*PVC{}
  275. buildPVCMap(window, pvcMap, pvMap, resPVCInfo)
  276. applyPVCBytesRequested(pvcMap, resPVCBytesRequested)
  277. // Build out the relationships of pods to their PVCs. This step
  278. // populates the PVC.Count field so that PVC allocation can be
  279. // split appropriately among each pod's container allocation.
  280. podPVCMap := map[podKey][]*PVC{}
  281. buildPodPVCMap(podPVCMap, pvMap, pvcMap, podMap, resPodPVCAllocation)
  282. // Identify unmounted PVs (PVs without PVCs) and add one Allocation per
  283. // cluster representing each cluster's unmounted PVs (if necessary).
  284. applyUnmountedPVs(window, podMap, pvMap, pvcMap)
  285. lbMap := getLoadBalancerCosts(resLBCostPerHr, resLBActiveMins, resolution)
  286. applyLoadBalancersToPods(lbMap, allocsByService)
  287. // (3) Build out AllocationSet from Pod map
  288. for _, pod := range podMap {
  289. for _, alloc := range pod.Allocations {
  290. cluster := alloc.Properties.Cluster
  291. nodeName := alloc.Properties.Node
  292. namespace := alloc.Properties.Namespace
  293. pod := alloc.Properties.Pod
  294. container := alloc.Properties.Container
  295. podKey := newPodKey(cluster, namespace, pod)
  296. nodeKey := newNodeKey(cluster, nodeName)
  297. node := cm.getNodePricing(nodeMap, nodeKey)
  298. alloc.Properties.ProviderID = node.ProviderID
  299. alloc.CPUCost = alloc.CPUCoreHours * node.CostPerCPUHr
  300. alloc.RAMCost = (alloc.RAMByteHours / 1024 / 1024 / 1024) * node.CostPerRAMGiBHr
  301. alloc.GPUCost = alloc.GPUHours * node.CostPerGPUHr
  302. if pvcs, ok := podPVCMap[podKey]; ok {
  303. for _, pvc := range pvcs {
  304. // Determine the (start, end) of the relationship between the
  305. // given PVC and the associated Allocation so that a precise
  306. // number of hours can be used to compute cumulative cost.
  307. s, e := alloc.Start, alloc.End
  308. if pvc.Start.After(alloc.Start) {
  309. s = pvc.Start
  310. }
  311. if pvc.End.Before(alloc.End) {
  312. e = pvc.End
  313. }
  314. minutes := e.Sub(s).Minutes()
  315. hrs := minutes / 60.0
  316. count := float64(pvc.Count)
  317. if pvc.Count < 1 {
  318. count = 1
  319. }
  320. gib := pvc.Bytes / 1024 / 1024 / 1024
  321. cost := pvc.Volume.CostPerGiBHour * gib * hrs
  322. // Apply the size and cost of the PV to the allocation, each
  323. // weighted by count (i.e. the number of containers in the pod)
  324. // record the amount of total PVBytes Hours attributable to a given PV
  325. if alloc.PVs == nil {
  326. alloc.PVs = kubecost.PVAllocations{}
  327. }
  328. pvKey := kubecost.PVKey{
  329. Cluster: pvc.Cluster,
  330. Name: pvc.Volume.Name,
  331. }
  332. alloc.PVs[pvKey] = &kubecost.PVAllocation{
  333. ByteHours: pvc.Bytes * hrs / count,
  334. Cost: cost / count,
  335. }
  336. }
  337. }
  338. // Make sure that the name is correct (node may not be present at this
  339. // point due to it missing from queryMinutes) then insert.
  340. alloc.Name = fmt.Sprintf("%s/%s/%s/%s/%s", cluster, nodeName, namespace, pod, container)
  341. allocSet.Set(alloc)
  342. }
  343. }
  344. return allocSet, nil
  345. }
  346. func (cm *CostModel) buildPodMap(window kubecost.Window, resolution, maxBatchSize time.Duration, podMap map[podKey]*Pod, clusterStart, clusterEnd map[string]time.Time) error {
  347. // Assumes that window is positive and closed
  348. start, end := *window.Start(), *window.End()
  349. // Convert resolution duration to a query-ready string
  350. resStr := util.DurationString(resolution)
  351. ctx := prom.NewContext(cm.PrometheusClient)
  352. // Query for (start, end) by (pod, namespace, cluster) over the given
  353. // window, using the given resolution, and if necessary in batches no
  354. // larger than the given maximum batch size. If working in batches, track
  355. // overall progress by starting with (window.start, window.start) and
  356. // querying in batches no larger than maxBatchSize from start-to-end,
  357. // folding each result set into podMap as the results come back.
  358. coverage := kubecost.NewWindow(&start, &start)
  359. numQuery := 1
  360. for coverage.End().Before(end) {
  361. // Determine the (start, end) of the current batch
  362. batchStart := *coverage.End()
  363. batchEnd := coverage.End().Add(maxBatchSize)
  364. if batchEnd.After(end) {
  365. batchEnd = end
  366. }
  367. batchWindow := kubecost.NewWindow(&batchStart, &batchEnd)
  368. var resPods []*prom.QueryResult
  369. var err error
  370. maxTries := 3
  371. numTries := 0
  372. for resPods == nil && numTries < maxTries {
  373. numTries++
  374. // Convert window (start, end) to (duration, offset) for querying Prometheus,
  375. // including handling Thanos offset
  376. durStr, offStr, err := batchWindow.DurationOffsetForPrometheus()
  377. if err != nil || durStr == "" {
  378. // Negative duration, so set empty results and don't query
  379. resPods = []*prom.QueryResult{}
  380. err = nil
  381. break
  382. }
  383. // Submit and profile query
  384. queryPods := fmt.Sprintf(queryFmtPods, env.GetPromClusterLabel(), durStr, resStr, offStr)
  385. queryProfile := time.Now()
  386. resPods, err = ctx.Query(queryPods).Await()
  387. if err != nil {
  388. log.Profile(queryProfile, fmt.Sprintf("CostModel.ComputeAllocation: pod query %d try %d failed: %s", numQuery, numTries, queryPods))
  389. resPods = nil
  390. }
  391. }
  392. if err != nil {
  393. return err
  394. }
  395. applyPodResults(window, resolution, podMap, clusterStart, clusterEnd, resPods)
  396. coverage = coverage.ExpandEnd(batchEnd)
  397. numQuery++
  398. }
  399. return nil
  400. }
  401. func applyPodResults(window kubecost.Window, resolution time.Duration, podMap map[podKey]*Pod, clusterStart, clusterEnd map[string]time.Time, resPods []*prom.QueryResult) {
  402. for _, res := range resPods {
  403. if len(res.Values) == 0 {
  404. log.Warningf("CostModel.ComputeAllocation: empty minutes result")
  405. continue
  406. }
  407. cluster, err := res.GetString(env.GetPromClusterLabel())
  408. if err != nil {
  409. cluster = env.GetClusterID()
  410. }
  411. labels, err := res.GetStrings("namespace", "pod")
  412. if err != nil {
  413. log.Warningf("CostModel.ComputeAllocation: minutes query result missing field: %s", err)
  414. continue
  415. }
  416. namespace := labels["namespace"]
  417. pod := labels["pod"]
  418. key := newPodKey(cluster, namespace, pod)
  419. // allocStart and allocEnd are the timestamps of the first and last
  420. // minutes the pod was running, respectively. We subtract one resolution
  421. // from allocStart because this point will actually represent the end
  422. // of the first minute. We don't subtract from allocEnd because it
  423. // already represents the end of the last minute.
  424. var allocStart, allocEnd time.Time
  425. startAdjustmentCoeff, endAdjustmentCoeff := 1.0, 1.0
  426. for _, datum := range res.Values {
  427. t := time.Unix(int64(datum.Timestamp), 0)
  428. if allocStart.IsZero() && datum.Value > 0 && window.Contains(t) {
  429. // Set the start timestamp to the earliest non-zero timestamp
  430. allocStart = t
  431. // Record adjustment coefficient, i.e. the portion of the start
  432. // timestamp to "ignore". That is, sometimes the value will be
  433. // 0.5, meaning that we should discount the time running by
  434. // half of the resolution the timestamp stands for.
  435. startAdjustmentCoeff = (1.0 - datum.Value)
  436. }
  437. if datum.Value > 0 && window.Contains(t) {
  438. // Set the end timestamp to the latest non-zero timestamp
  439. allocEnd = t
  440. // Record adjustment coefficient, i.e. the portion of the end
  441. // timestamp to "ignore". (See explanation above for start.)
  442. endAdjustmentCoeff = (1.0 - datum.Value)
  443. }
  444. }
  445. if allocStart.IsZero() || allocEnd.IsZero() {
  446. continue
  447. }
  448. // Adjust timestamps according to the resolution and the adjustment
  449. // coefficients, as described above. That is, count the start timestamp
  450. // from the beginning of the resolution, not the end. Then "reduce" the
  451. // start and end by the correct amount, in the case that the "running"
  452. // value of the first or last timestamp was not a full 1.0.
  453. allocStart = allocStart.Add(-resolution)
  454. // Note: the *100 and /100 are necessary because Duration is an int, so
  455. // 0.5, for instance, will be truncated, resulting in no adjustment.
  456. allocStart = allocStart.Add(time.Duration(startAdjustmentCoeff*100) * resolution / time.Duration(100))
  457. allocEnd = allocEnd.Add(-time.Duration(endAdjustmentCoeff*100) * resolution / time.Duration(100))
  458. // If there is only one point with a value <= 0.5 that the start and
  459. // end timestamps both share, then we will enter this case because at
  460. // least half of a resolution will be subtracted from both the start
  461. // and the end. If that is the case, then add back half of each side
  462. // so that the pod is said to run for half a resolution total.
  463. // e.g. For resolution 1m and a value of 0.5 at one timestamp, we'll
  464. // end up with allocEnd == allocStart and each coeff == 0.5. In
  465. // that case, add 0.25m to each side, resulting in 0.5m duration.
  466. if !allocEnd.After(allocStart) {
  467. allocStart = allocStart.Add(-time.Duration(50*startAdjustmentCoeff) * resolution / time.Duration(100))
  468. allocEnd = allocEnd.Add(time.Duration(50*endAdjustmentCoeff) * resolution / time.Duration(100))
  469. }
  470. // Set start if unset or this datum's start time is earlier than the
  471. // current earliest time.
  472. if _, ok := clusterStart[cluster]; !ok || allocStart.Before(clusterStart[cluster]) {
  473. clusterStart[cluster] = allocStart
  474. }
  475. // Set end if unset or this datum's end time is later than the
  476. // current latest time.
  477. if _, ok := clusterEnd[cluster]; !ok || allocEnd.After(clusterEnd[cluster]) {
  478. clusterEnd[cluster] = allocEnd
  479. }
  480. if pod, ok := podMap[key]; ok {
  481. // Pod has already been recorded, so update it accordingly
  482. if allocStart.Before(pod.Start) {
  483. pod.Start = allocStart
  484. }
  485. if allocEnd.After(pod.End) {
  486. pod.End = allocEnd
  487. }
  488. } else {
  489. // Pod has not been recorded yet, so insert it
  490. podMap[key] = &Pod{
  491. Window: window.Clone(),
  492. Start: allocStart,
  493. End: allocEnd,
  494. Key: key,
  495. Allocations: map[string]*kubecost.Allocation{},
  496. }
  497. }
  498. }
  499. }
  500. func applyCPUCoresAllocated(podMap map[podKey]*Pod, resCPUCoresAllocated []*prom.QueryResult) {
  501. for _, res := range resCPUCoresAllocated {
  502. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace", "pod")
  503. if err != nil {
  504. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU allocation result missing field: %s", err)
  505. continue
  506. }
  507. pod, ok := podMap[key]
  508. if !ok {
  509. continue
  510. }
  511. container, err := res.GetString("container")
  512. if err != nil {
  513. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU allocation query result missing 'container': %s", key)
  514. continue
  515. }
  516. if _, ok := pod.Allocations[container]; !ok {
  517. pod.AppendContainer(container)
  518. }
  519. cpuCores := res.Values[0].Value
  520. hours := pod.Allocations[container].Minutes() / 60.0
  521. pod.Allocations[container].CPUCoreHours = cpuCores * hours
  522. node, err := res.GetString("node")
  523. if err != nil {
  524. log.Warningf("CostModel.ComputeAllocation: CPU allocation query result missing 'node': %s", key)
  525. continue
  526. }
  527. pod.Allocations[container].Properties.Node = node
  528. }
  529. }
  530. func applyCPUCoresRequested(podMap map[podKey]*Pod, resCPUCoresRequested []*prom.QueryResult) {
  531. for _, res := range resCPUCoresRequested {
  532. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace", "pod")
  533. if err != nil {
  534. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU request result missing field: %s", err)
  535. continue
  536. }
  537. pod, ok := podMap[key]
  538. if !ok {
  539. continue
  540. }
  541. container, err := res.GetString("container")
  542. if err != nil {
  543. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU request query result missing 'container': %s", key)
  544. continue
  545. }
  546. if _, ok := pod.Allocations[container]; !ok {
  547. pod.AppendContainer(container)
  548. }
  549. pod.Allocations[container].CPUCoreRequestAverage = res.Values[0].Value
  550. // If CPU allocation is less than requests, set CPUCoreHours to
  551. // request level.
  552. if pod.Allocations[container].CPUCores() < res.Values[0].Value {
  553. pod.Allocations[container].CPUCoreHours = res.Values[0].Value * (pod.Allocations[container].Minutes() / 60.0)
  554. }
  555. node, err := res.GetString("node")
  556. if err != nil {
  557. log.Warningf("CostModel.ComputeAllocation: CPU request query result missing 'node': %s", key)
  558. continue
  559. }
  560. pod.Allocations[container].Properties.Node = node
  561. }
  562. }
  563. func applyCPUCoresUsedAvg(podMap map[podKey]*Pod, resCPUCoresUsedAvg []*prom.QueryResult) {
  564. for _, res := range resCPUCoresUsedAvg {
  565. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace", "pod_name")
  566. if err != nil {
  567. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage avg result missing field: %s", err)
  568. continue
  569. }
  570. pod, ok := podMap[key]
  571. if !ok {
  572. continue
  573. }
  574. container, err := res.GetString("container_name")
  575. if err != nil {
  576. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage avg query result missing 'container': %s", key)
  577. continue
  578. }
  579. if _, ok := pod.Allocations[container]; !ok {
  580. pod.AppendContainer(container)
  581. }
  582. pod.Allocations[container].CPUCoreUsageAverage = res.Values[0].Value
  583. }
  584. }
  585. func applyCPUCoresUsedMax(podMap map[podKey]*Pod, resCPUCoresUsedMax []*prom.QueryResult) {
  586. for _, res := range resCPUCoresUsedMax {
  587. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace", "pod_name")
  588. if err != nil {
  589. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage max result missing field: %s", err)
  590. continue
  591. }
  592. pod, ok := podMap[key]
  593. if !ok {
  594. continue
  595. }
  596. container, err := res.GetString("container_name")
  597. if err != nil {
  598. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage max query result missing 'container': %s", key)
  599. continue
  600. }
  601. if _, ok := pod.Allocations[container]; !ok {
  602. pod.AppendContainer(container)
  603. }
  604. if pod.Allocations[container].RawAllocationOnly == nil {
  605. pod.Allocations[container].RawAllocationOnly = &kubecost.RawAllocationOnlyData{
  606. CPUCoreUsageMax: res.Values[0].Value,
  607. }
  608. } else {
  609. pod.Allocations[container].RawAllocationOnly.CPUCoreUsageMax = res.Values[0].Value
  610. }
  611. }
  612. }
  613. func applyRAMBytesAllocated(podMap map[podKey]*Pod, resRAMBytesAllocated []*prom.QueryResult) {
  614. for _, res := range resRAMBytesAllocated {
  615. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace", "pod")
  616. if err != nil {
  617. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM allocation result missing field: %s", err)
  618. continue
  619. }
  620. pod, ok := podMap[key]
  621. if !ok {
  622. continue
  623. }
  624. container, err := res.GetString("container")
  625. if err != nil {
  626. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM allocation query result missing 'container': %s", key)
  627. continue
  628. }
  629. if _, ok := pod.Allocations[container]; !ok {
  630. pod.AppendContainer(container)
  631. }
  632. ramBytes := res.Values[0].Value
  633. hours := pod.Allocations[container].Minutes() / 60.0
  634. pod.Allocations[container].RAMByteHours = ramBytes * hours
  635. node, err := res.GetString("node")
  636. if err != nil {
  637. log.Warningf("CostModel.ComputeAllocation: RAM allocation query result missing 'node': %s", key)
  638. continue
  639. }
  640. pod.Allocations[container].Properties.Node = node
  641. }
  642. }
  643. func applyRAMBytesRequested(podMap map[podKey]*Pod, resRAMBytesRequested []*prom.QueryResult) {
  644. for _, res := range resRAMBytesRequested {
  645. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace", "pod")
  646. if err != nil {
  647. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM request result missing field: %s", err)
  648. continue
  649. }
  650. pod, ok := podMap[key]
  651. if !ok {
  652. continue
  653. }
  654. container, err := res.GetString("container")
  655. if err != nil {
  656. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM request query result missing 'container': %s", key)
  657. continue
  658. }
  659. if _, ok := pod.Allocations[container]; !ok {
  660. pod.AppendContainer(container)
  661. }
  662. pod.Allocations[container].RAMBytesRequestAverage = res.Values[0].Value
  663. // If RAM allocation is less than requests, set RAMByteHours to
  664. // request level.
  665. if pod.Allocations[container].RAMBytes() < res.Values[0].Value {
  666. pod.Allocations[container].RAMByteHours = res.Values[0].Value * (pod.Allocations[container].Minutes() / 60.0)
  667. }
  668. node, err := res.GetString("node")
  669. if err != nil {
  670. log.Warningf("CostModel.ComputeAllocation: RAM request query result missing 'node': %s", key)
  671. continue
  672. }
  673. pod.Allocations[container].Properties.Node = node
  674. }
  675. }
  676. func applyRAMBytesUsedAvg(podMap map[podKey]*Pod, resRAMBytesUsedAvg []*prom.QueryResult) {
  677. for _, res := range resRAMBytesUsedAvg {
  678. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace", "pod_name")
  679. if err != nil {
  680. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM avg usage result missing field: %s", err)
  681. continue
  682. }
  683. pod, ok := podMap[key]
  684. if !ok {
  685. continue
  686. }
  687. container, err := res.GetString("container_name")
  688. if err != nil {
  689. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM usage avg query result missing 'container': %s", key)
  690. continue
  691. }
  692. if _, ok := pod.Allocations[container]; !ok {
  693. pod.AppendContainer(container)
  694. }
  695. pod.Allocations[container].RAMBytesUsageAverage = res.Values[0].Value
  696. }
  697. }
  698. func applyRAMBytesUsedMax(podMap map[podKey]*Pod, resRAMBytesUsedMax []*prom.QueryResult) {
  699. for _, res := range resRAMBytesUsedMax {
  700. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace", "pod_name")
  701. if err != nil {
  702. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM usage max result missing field: %s", err)
  703. continue
  704. }
  705. pod, ok := podMap[key]
  706. if !ok {
  707. continue
  708. }
  709. container, err := res.GetString("container_name")
  710. if err != nil {
  711. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM usage max query result missing 'container': %s", key)
  712. continue
  713. }
  714. if _, ok := pod.Allocations[container]; !ok {
  715. pod.AppendContainer(container)
  716. }
  717. if pod.Allocations[container].RawAllocationOnly == nil {
  718. pod.Allocations[container].RawAllocationOnly = &kubecost.RawAllocationOnlyData{
  719. RAMBytesUsageMax: res.Values[0].Value,
  720. }
  721. } else {
  722. pod.Allocations[container].RawAllocationOnly.RAMBytesUsageMax = res.Values[0].Value
  723. }
  724. }
  725. }
  726. func applyGPUsRequested(podMap map[podKey]*Pod, resGPUsRequested []*prom.QueryResult) {
  727. for _, res := range resGPUsRequested {
  728. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace", "pod")
  729. if err != nil {
  730. log.DedupedWarningf(10, "CostModel.ComputeAllocation: GPU request result missing field: %s", err)
  731. continue
  732. }
  733. pod, ok := podMap[key]
  734. if !ok {
  735. continue
  736. }
  737. container, err := res.GetString("container")
  738. if err != nil {
  739. log.DedupedWarningf(10, "CostModel.ComputeAllocation: GPU request query result missing 'container': %s", key)
  740. continue
  741. }
  742. if _, ok := pod.Allocations[container]; !ok {
  743. pod.AppendContainer(container)
  744. }
  745. hrs := pod.Allocations[container].Minutes() / 60.0
  746. pod.Allocations[container].GPUHours = res.Values[0].Value * hrs
  747. }
  748. }
  749. func applyNetworkAllocation(podMap map[podKey]*Pod, resNetworkGiB []*prom.QueryResult, resNetworkCostPerGiB []*prom.QueryResult) {
  750. costPerGiBByCluster := map[string]float64{}
  751. for _, res := range resNetworkCostPerGiB {
  752. cluster, err := res.GetString(env.GetPromClusterLabel())
  753. if err != nil {
  754. cluster = env.GetClusterID()
  755. }
  756. costPerGiBByCluster[cluster] = res.Values[0].Value
  757. }
  758. for _, res := range resNetworkGiB {
  759. podKey, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace", "pod_name")
  760. if err != nil {
  761. log.DedupedWarningf(10, "CostModel.ComputeAllocation: Network allocation query result missing field: %s", err)
  762. continue
  763. }
  764. pod, ok := podMap[podKey]
  765. if !ok {
  766. continue
  767. }
  768. for _, alloc := range pod.Allocations {
  769. gib := res.Values[0].Value / float64(len(pod.Allocations))
  770. costPerGiB := costPerGiBByCluster[podKey.Cluster]
  771. alloc.NetworkCost = gib * costPerGiB
  772. }
  773. }
  774. }
  775. func resToNamespaceLabels(resNamespaceLabels []*prom.QueryResult) map[namespaceKey]map[string]string {
  776. namespaceLabels := map[namespaceKey]map[string]string{}
  777. for _, res := range resNamespaceLabels {
  778. nsKey, err := resultNamespaceKey(res, env.GetPromClusterLabel(), "namespace")
  779. if err != nil {
  780. continue
  781. }
  782. if _, ok := namespaceLabels[nsKey]; !ok {
  783. namespaceLabels[nsKey] = map[string]string{}
  784. }
  785. for k, l := range res.GetLabels() {
  786. namespaceLabels[nsKey][k] = l
  787. }
  788. }
  789. return namespaceLabels
  790. }
  791. func resToPodLabels(resPodLabels []*prom.QueryResult) map[podKey]map[string]string {
  792. podLabels := map[podKey]map[string]string{}
  793. for _, res := range resPodLabels {
  794. podKey, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace", "pod")
  795. if err != nil {
  796. continue
  797. }
  798. if _, ok := podLabels[podKey]; !ok {
  799. podLabels[podKey] = map[string]string{}
  800. }
  801. for k, l := range res.GetLabels() {
  802. podLabels[podKey][k] = l
  803. }
  804. }
  805. return podLabels
  806. }
  807. func resToNamespaceAnnotations(resNamespaceAnnotations []*prom.QueryResult) map[string]map[string]string {
  808. namespaceAnnotations := map[string]map[string]string{}
  809. for _, res := range resNamespaceAnnotations {
  810. namespace, err := res.GetString("namespace")
  811. if err != nil {
  812. continue
  813. }
  814. if _, ok := namespaceAnnotations[namespace]; !ok {
  815. namespaceAnnotations[namespace] = map[string]string{}
  816. }
  817. for k, l := range res.GetAnnotations() {
  818. namespaceAnnotations[namespace][k] = l
  819. }
  820. }
  821. return namespaceAnnotations
  822. }
  823. func resToPodAnnotations(resPodAnnotations []*prom.QueryResult) map[podKey]map[string]string {
  824. podAnnotations := map[podKey]map[string]string{}
  825. for _, res := range resPodAnnotations {
  826. podKey, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace", "pod")
  827. if err != nil {
  828. continue
  829. }
  830. if _, ok := podAnnotations[podKey]; !ok {
  831. podAnnotations[podKey] = map[string]string{}
  832. }
  833. for k, l := range res.GetAnnotations() {
  834. podAnnotations[podKey][k] = l
  835. }
  836. }
  837. return podAnnotations
  838. }
  839. func applyLabels(podMap map[podKey]*Pod, namespaceLabels map[namespaceKey]map[string]string, podLabels map[podKey]map[string]string) {
  840. for podKey, pod := range podMap {
  841. for _, alloc := range pod.Allocations {
  842. allocLabels := alloc.Properties.Labels
  843. if allocLabels == nil {
  844. allocLabels = make(map[string]string)
  845. }
  846. // Apply namespace labels first, then pod labels so that pod labels
  847. // overwrite namespace labels.
  848. nsKey := newNamespaceKey(podKey.Cluster, podKey.Namespace)
  849. if labels, ok := namespaceLabels[nsKey]; ok {
  850. for k, v := range labels {
  851. allocLabels[k] = v
  852. }
  853. }
  854. if labels, ok := podLabels[podKey]; ok {
  855. for k, v := range labels {
  856. allocLabels[k] = v
  857. }
  858. }
  859. alloc.Properties.Labels = allocLabels
  860. }
  861. }
  862. }
  863. func applyAnnotations(podMap map[podKey]*Pod, namespaceAnnotations map[string]map[string]string, podAnnotations map[podKey]map[string]string) {
  864. for key, pod := range podMap {
  865. for _, alloc := range pod.Allocations {
  866. allocAnnotations := alloc.Properties.Annotations
  867. if allocAnnotations == nil {
  868. allocAnnotations = make(map[string]string)
  869. }
  870. // Apply namespace annotations first, then pod annotations so that
  871. // pod labels overwrite namespace labels.
  872. if labels, ok := namespaceAnnotations[key.Namespace]; ok {
  873. for k, v := range labels {
  874. allocAnnotations[k] = v
  875. }
  876. }
  877. if labels, ok := podAnnotations[key]; ok {
  878. for k, v := range labels {
  879. allocAnnotations[k] = v
  880. }
  881. }
  882. alloc.Properties.Annotations = allocAnnotations
  883. }
  884. }
  885. }
  886. func getServiceLabels(resServiceLabels []*prom.QueryResult) map[serviceKey]map[string]string {
  887. serviceLabels := map[serviceKey]map[string]string{}
  888. for _, res := range resServiceLabels {
  889. serviceKey, err := resultServiceKey(res, env.GetPromClusterLabel(), "namespace", "service")
  890. if err != nil {
  891. continue
  892. }
  893. if _, ok := serviceLabels[serviceKey]; !ok {
  894. serviceLabels[serviceKey] = map[string]string{}
  895. }
  896. for k, l := range res.GetLabels() {
  897. serviceLabels[serviceKey][k] = l
  898. }
  899. }
  900. // Prune duplicate services. That is, if the same service exists with
  901. // hyphens instead of underscores, keep the one that uses hyphens.
  902. for key := range serviceLabels {
  903. if strings.Contains(key.Service, "_") {
  904. duplicateService := strings.Replace(key.Service, "_", "-", -1)
  905. duplicateKey := newServiceKey(key.Cluster, key.Namespace, duplicateService)
  906. if _, ok := serviceLabels[duplicateKey]; ok {
  907. delete(serviceLabels, key)
  908. }
  909. }
  910. }
  911. return serviceLabels
  912. }
  913. func resToDeploymentLabels(resDeploymentLabels []*prom.QueryResult) map[controllerKey]map[string]string {
  914. deploymentLabels := map[controllerKey]map[string]string{}
  915. for _, res := range resDeploymentLabels {
  916. controllerKey, err := resultDeploymentKey(res, env.GetPromClusterLabel(), "namespace", "deployment")
  917. if err != nil {
  918. continue
  919. }
  920. if _, ok := deploymentLabels[controllerKey]; !ok {
  921. deploymentLabels[controllerKey] = map[string]string{}
  922. }
  923. for k, l := range res.GetLabels() {
  924. deploymentLabels[controllerKey][k] = l
  925. }
  926. }
  927. // Prune duplicate deployments. That is, if the same deployment exists with
  928. // hyphens instead of underscores, keep the one that uses hyphens.
  929. for key := range deploymentLabels {
  930. if strings.Contains(key.Controller, "_") {
  931. duplicateController := strings.Replace(key.Controller, "_", "-", -1)
  932. duplicateKey := newControllerKey(key.Cluster, key.Namespace, key.ControllerKind, duplicateController)
  933. if _, ok := deploymentLabels[duplicateKey]; ok {
  934. delete(deploymentLabels, key)
  935. }
  936. }
  937. }
  938. return deploymentLabels
  939. }
  940. func resToStatefulSetLabels(resStatefulSetLabels []*prom.QueryResult) map[controllerKey]map[string]string {
  941. statefulSetLabels := map[controllerKey]map[string]string{}
  942. for _, res := range resStatefulSetLabels {
  943. controllerKey, err := resultStatefulSetKey(res, env.GetPromClusterLabel(), "namespace", "statefulSet")
  944. if err != nil {
  945. continue
  946. }
  947. if _, ok := statefulSetLabels[controllerKey]; !ok {
  948. statefulSetLabels[controllerKey] = map[string]string{}
  949. }
  950. for k, l := range res.GetLabels() {
  951. statefulSetLabels[controllerKey][k] = l
  952. }
  953. }
  954. // Prune duplicate stateful sets. That is, if the same stateful set exists
  955. // with hyphens instead of underscores, keep the one that uses hyphens.
  956. for key := range statefulSetLabels {
  957. if strings.Contains(key.Controller, "_") {
  958. duplicateController := strings.Replace(key.Controller, "_", "-", -1)
  959. duplicateKey := newControllerKey(key.Cluster, key.Namespace, key.ControllerKind, duplicateController)
  960. if _, ok := statefulSetLabels[duplicateKey]; ok {
  961. delete(statefulSetLabels, key)
  962. }
  963. }
  964. }
  965. return statefulSetLabels
  966. }
  967. func labelsToPodControllerMap(podLabels map[podKey]map[string]string, controllerLabels map[controllerKey]map[string]string) map[podKey]controllerKey {
  968. podControllerMap := map[podKey]controllerKey{}
  969. // For each controller, turn the labels into a selector and attempt to
  970. // match it with each set of pod labels. A match indicates that the pod
  971. // belongs to the controller.
  972. for cKey, cLabels := range controllerLabels {
  973. selector := labels.Set(cLabels).AsSelectorPreValidated()
  974. for pKey, pLabels := range podLabels {
  975. // If the pod is in a different cluster or namespace, there is
  976. // no need to compare the labels.
  977. if cKey.Cluster != pKey.Cluster || cKey.Namespace != pKey.Namespace {
  978. continue
  979. }
  980. podLabelSet := labels.Set(pLabels)
  981. if selector.Matches(podLabelSet) {
  982. if _, ok := podControllerMap[pKey]; ok {
  983. log.DedupedWarningf(5, "CostModel.ComputeAllocation: PodControllerMap match already exists: %s matches %s and %s", pKey, podControllerMap[pKey], cKey)
  984. }
  985. podControllerMap[pKey] = cKey
  986. }
  987. }
  988. }
  989. return podControllerMap
  990. }
  991. func resToPodDaemonSetMap(resDaemonSetLabels []*prom.QueryResult) map[podKey]controllerKey {
  992. daemonSetLabels := map[podKey]controllerKey{}
  993. for _, res := range resDaemonSetLabels {
  994. controllerKey, err := resultDaemonSetKey(res, env.GetPromClusterLabel(), "namespace", "owner_name")
  995. if err != nil {
  996. continue
  997. }
  998. pod, err := res.GetString("pod")
  999. if err != nil {
  1000. log.Warningf("CostModel.ComputeAllocation: DaemonSetLabel result without pod: %s", controllerKey)
  1001. }
  1002. podKey := newPodKey(controllerKey.Cluster, controllerKey.Namespace, pod)
  1003. daemonSetLabels[podKey] = controllerKey
  1004. }
  1005. return daemonSetLabels
  1006. }
  1007. func resToPodJobMap(resJobLabels []*prom.QueryResult) map[podKey]controllerKey {
  1008. jobLabels := map[podKey]controllerKey{}
  1009. for _, res := range resJobLabels {
  1010. controllerKey, err := resultJobKey(res, env.GetPromClusterLabel(), "namespace", "owner_name")
  1011. if err != nil {
  1012. continue
  1013. }
  1014. // Convert the name of Jobs generated by CronJobs to the name of the
  1015. // CronJob by stripping the timestamp off the end.
  1016. match := isCron.FindStringSubmatch(controllerKey.Controller)
  1017. if match != nil {
  1018. controllerKey.Controller = match[1]
  1019. }
  1020. pod, err := res.GetString("pod")
  1021. if err != nil {
  1022. log.Warningf("CostModel.ComputeAllocation: JobLabel result without pod: %s", controllerKey)
  1023. }
  1024. podKey := newPodKey(controllerKey.Cluster, controllerKey.Namespace, pod)
  1025. jobLabels[podKey] = controllerKey
  1026. }
  1027. return jobLabels
  1028. }
  1029. func applyServicesToPods(podMap map[podKey]*Pod, podLabels map[podKey]map[string]string, allocsByService map[serviceKey][]*kubecost.Allocation, serviceLabels map[serviceKey]map[string]string) {
  1030. podServicesMap := map[podKey][]serviceKey{}
  1031. // For each service, turn the labels into a selector and attempt to
  1032. // match it with each set of pod labels. A match indicates that the pod
  1033. // belongs to the service.
  1034. for sKey, sLabels := range serviceLabels {
  1035. selector := labels.Set(sLabels).AsSelectorPreValidated()
  1036. for pKey, pLabels := range podLabels {
  1037. // If the pod is in a different cluster or namespace, there is
  1038. // no need to compare the labels.
  1039. if sKey.Cluster != pKey.Cluster || sKey.Namespace != pKey.Namespace {
  1040. continue
  1041. }
  1042. podLabelSet := labels.Set(pLabels)
  1043. if selector.Matches(podLabelSet) {
  1044. if _, ok := podServicesMap[pKey]; !ok {
  1045. podServicesMap[pKey] = []serviceKey{}
  1046. }
  1047. podServicesMap[pKey] = append(podServicesMap[pKey], sKey)
  1048. }
  1049. }
  1050. }
  1051. // For each allocation in each pod, attempt to find and apply the list of
  1052. // services associated with the allocation's pod.
  1053. for key, pod := range podMap {
  1054. for _, alloc := range pod.Allocations {
  1055. if sKeys, ok := podServicesMap[key]; ok {
  1056. services := []string{}
  1057. for _, sKey := range sKeys {
  1058. services = append(services, sKey.Service)
  1059. allocsByService[sKey] = append(allocsByService[sKey], alloc)
  1060. }
  1061. alloc.Properties.Services = services
  1062. }
  1063. }
  1064. }
  1065. }
  1066. func applyControllersToPods(podMap map[podKey]*Pod, podControllerMap map[podKey]controllerKey) {
  1067. for key, pod := range podMap {
  1068. for _, alloc := range pod.Allocations {
  1069. if controllerKey, ok := podControllerMap[key]; ok {
  1070. alloc.Properties.ControllerKind = controllerKey.ControllerKind
  1071. alloc.Properties.Controller = controllerKey.Controller
  1072. }
  1073. }
  1074. }
  1075. }
  1076. func applyNodeCostPerCPUHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerCPUHr []*prom.QueryResult,
  1077. providerIDParser func(string) string) {
  1078. for _, res := range resNodeCostPerCPUHr {
  1079. cluster, err := res.GetString(env.GetPromClusterLabel())
  1080. if err != nil {
  1081. cluster = env.GetClusterID()
  1082. }
  1083. node, err := res.GetString("node")
  1084. if err != nil {
  1085. log.Warningf("CostModel.ComputeAllocation: Node CPU cost query result missing field: %s", err)
  1086. continue
  1087. }
  1088. instanceType, err := res.GetString("instance_type")
  1089. if err != nil {
  1090. log.Warningf("CostModel.ComputeAllocation: Node CPU cost query result missing field: %s", err)
  1091. continue
  1092. }
  1093. providerID, err := res.GetString("provider_id")
  1094. if err != nil {
  1095. log.Warningf("CostModel.ComputeAllocation: Node CPU cost query result missing field: %s", err)
  1096. continue
  1097. }
  1098. key := newNodeKey(cluster, node)
  1099. if _, ok := nodeMap[key]; !ok {
  1100. nodeMap[key] = &NodePricing{
  1101. Name: node,
  1102. NodeType: instanceType,
  1103. ProviderID: providerIDParser(providerID),
  1104. }
  1105. }
  1106. nodeMap[key].CostPerCPUHr = res.Values[0].Value
  1107. }
  1108. }
  1109. func applyNodeCostPerRAMGiBHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerRAMGiBHr []*prom.QueryResult,
  1110. providerIDParser func(string) string) {
  1111. for _, res := range resNodeCostPerRAMGiBHr {
  1112. cluster, err := res.GetString(env.GetPromClusterLabel())
  1113. if err != nil {
  1114. cluster = env.GetClusterID()
  1115. }
  1116. node, err := res.GetString("node")
  1117. if err != nil {
  1118. log.Warningf("CostModel.ComputeAllocation: Node RAM cost query result missing field: %s", err)
  1119. continue
  1120. }
  1121. instanceType, err := res.GetString("instance_type")
  1122. if err != nil {
  1123. log.Warningf("CostModel.ComputeAllocation: Node RAM cost query result missing field: %s", err)
  1124. continue
  1125. }
  1126. providerID, err := res.GetString("provider_id")
  1127. if err != nil {
  1128. log.Warningf("CostModel.ComputeAllocation: Node RAM cost query result missing field: %s", err)
  1129. continue
  1130. }
  1131. key := newNodeKey(cluster, node)
  1132. if _, ok := nodeMap[key]; !ok {
  1133. nodeMap[key] = &NodePricing{
  1134. Name: node,
  1135. NodeType: instanceType,
  1136. ProviderID: providerIDParser(providerID),
  1137. }
  1138. }
  1139. nodeMap[key].CostPerRAMGiBHr = res.Values[0].Value
  1140. }
  1141. }
  1142. func applyNodeCostPerGPUHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerGPUHr []*prom.QueryResult,
  1143. providerIDParser func(string) string) {
  1144. for _, res := range resNodeCostPerGPUHr {
  1145. cluster, err := res.GetString(env.GetPromClusterLabel())
  1146. if err != nil {
  1147. cluster = env.GetClusterID()
  1148. }
  1149. node, err := res.GetString("node")
  1150. if err != nil {
  1151. log.Warningf("CostModel.ComputeAllocation: Node GPU cost query result missing field: %s", err)
  1152. continue
  1153. }
  1154. instanceType, err := res.GetString("instance_type")
  1155. if err != nil {
  1156. log.Warningf("CostModel.ComputeAllocation: Node GPU cost query result missing field: %s", err)
  1157. continue
  1158. }
  1159. providerID, err := res.GetString("provider_id")
  1160. if err != nil {
  1161. log.Warningf("CostModel.ComputeAllocation: Node GPU cost query result missing field: %s", err)
  1162. continue
  1163. }
  1164. key := newNodeKey(cluster, node)
  1165. if _, ok := nodeMap[key]; !ok {
  1166. nodeMap[key] = &NodePricing{
  1167. Name: node,
  1168. NodeType: instanceType,
  1169. ProviderID: providerIDParser(providerID),
  1170. }
  1171. }
  1172. nodeMap[key].CostPerGPUHr = res.Values[0].Value
  1173. }
  1174. }
  1175. func applyNodeSpot(nodeMap map[nodeKey]*NodePricing, resNodeIsSpot []*prom.QueryResult) {
  1176. for _, res := range resNodeIsSpot {
  1177. cluster, err := res.GetString(env.GetPromClusterLabel())
  1178. if err != nil {
  1179. cluster = env.GetClusterID()
  1180. }
  1181. node, err := res.GetString("node")
  1182. if err != nil {
  1183. log.Warningf("CostModel.ComputeAllocation: Node spot query result missing field: %s", err)
  1184. continue
  1185. }
  1186. key := newNodeKey(cluster, node)
  1187. if _, ok := nodeMap[key]; !ok {
  1188. log.Warningf("CostModel.ComputeAllocation: Node spot query result for missing node: %s", key)
  1189. continue
  1190. }
  1191. nodeMap[key].Preemptible = res.Values[0].Value > 0
  1192. }
  1193. }
  1194. func applyNodeDiscount(nodeMap map[nodeKey]*NodePricing, cm *CostModel) {
  1195. if cm == nil {
  1196. return
  1197. }
  1198. c, err := cm.Provider.GetConfig()
  1199. if err != nil {
  1200. log.Errorf("CostModel.ComputeAllocation: applyNodeDiscount: %s", err)
  1201. return
  1202. }
  1203. discount, err := ParsePercentString(c.Discount)
  1204. if err != nil {
  1205. log.Errorf("CostModel.ComputeAllocation: applyNodeDiscount: %s", err)
  1206. return
  1207. }
  1208. negotiatedDiscount, err := ParsePercentString(c.NegotiatedDiscount)
  1209. if err != nil {
  1210. log.Errorf("CostModel.ComputeAllocation: applyNodeDiscount: %s", err)
  1211. return
  1212. }
  1213. for _, node := range nodeMap {
  1214. // TODO GKE Reserved Instances into account
  1215. node.Discount = cm.Provider.CombinedDiscountForNode(node.NodeType, node.Preemptible, discount, negotiatedDiscount)
  1216. node.CostPerCPUHr *= (1.0 - node.Discount)
  1217. node.CostPerRAMGiBHr *= (1.0 - node.Discount)
  1218. }
  1219. }
  1220. func buildPVMap(pvMap map[pvKey]*PV, resPVCostPerGiBHour []*prom.QueryResult) {
  1221. for _, res := range resPVCostPerGiBHour {
  1222. cluster, err := res.GetString(env.GetPromClusterLabel())
  1223. if err != nil {
  1224. cluster = env.GetClusterID()
  1225. }
  1226. name, err := res.GetString("volumename")
  1227. if err != nil {
  1228. log.Warningf("CostModel.ComputeAllocation: PV cost without volumename")
  1229. continue
  1230. }
  1231. key := newPVKey(cluster, name)
  1232. pvMap[key] = &PV{
  1233. Cluster: cluster,
  1234. Name: name,
  1235. CostPerGiBHour: res.Values[0].Value,
  1236. }
  1237. }
  1238. }
  1239. func applyPVBytes(pvMap map[pvKey]*PV, resPVBytes []*prom.QueryResult) {
  1240. for _, res := range resPVBytes {
  1241. key, err := resultPVKey(res, env.GetPromClusterLabel(), "persistentvolume")
  1242. if err != nil {
  1243. log.Warningf("CostModel.ComputeAllocation: PV bytes query result missing field: %s", err)
  1244. continue
  1245. }
  1246. if _, ok := pvMap[key]; !ok {
  1247. log.Warningf("CostModel.ComputeAllocation: PV bytes result for missing PV: %s", err)
  1248. continue
  1249. }
  1250. pvMap[key].Bytes = res.Values[0].Value
  1251. }
  1252. }
  1253. func buildPVCMap(window kubecost.Window, pvcMap map[pvcKey]*PVC, pvMap map[pvKey]*PV, resPVCInfo []*prom.QueryResult) {
  1254. for _, res := range resPVCInfo {
  1255. cluster, err := res.GetString(env.GetPromClusterLabel())
  1256. if err != nil {
  1257. cluster = env.GetClusterID()
  1258. }
  1259. values, err := res.GetStrings("persistentvolumeclaim", "storageclass", "volumename", "namespace")
  1260. if err != nil {
  1261. log.DedupedWarningf(10, "CostModel.ComputeAllocation: PVC info query result missing field: %s", err)
  1262. continue
  1263. }
  1264. namespace := values["namespace"]
  1265. name := values["persistentvolumeclaim"]
  1266. volume := values["volumename"]
  1267. storageClass := values["storageclass"]
  1268. pvKey := newPVKey(cluster, volume)
  1269. pvcKey := newPVCKey(cluster, namespace, name)
  1270. // pvcStart and pvcEnd are the timestamps of the first and last minutes
  1271. // the PVC was running, respectively. We subtract 1m from pvcStart
  1272. // because this point will actually represent the end of the first
  1273. // minute. We don't subtract from pvcEnd because it already represents
  1274. // the end of the last minute.
  1275. var pvcStart, pvcEnd time.Time
  1276. for _, datum := range res.Values {
  1277. t := time.Unix(int64(datum.Timestamp), 0)
  1278. if pvcStart.IsZero() && datum.Value > 0 && window.Contains(t) {
  1279. pvcStart = t
  1280. }
  1281. if datum.Value > 0 && window.Contains(t) {
  1282. pvcEnd = t
  1283. }
  1284. }
  1285. if pvcStart.IsZero() || pvcEnd.IsZero() {
  1286. log.Warningf("CostModel.ComputeAllocation: PVC %s has no running time", pvcKey)
  1287. }
  1288. pvcStart = pvcStart.Add(-time.Minute)
  1289. if _, ok := pvMap[pvKey]; !ok {
  1290. continue
  1291. }
  1292. pvMap[pvKey].StorageClass = storageClass
  1293. if _, ok := pvcMap[pvcKey]; !ok {
  1294. pvcMap[pvcKey] = &PVC{}
  1295. }
  1296. pvcMap[pvcKey].Name = name
  1297. pvcMap[pvcKey].Namespace = namespace
  1298. pvcMap[pvcKey].Volume = pvMap[pvKey]
  1299. pvcMap[pvcKey].Start = pvcStart
  1300. pvcMap[pvcKey].End = pvcEnd
  1301. }
  1302. }
  1303. func applyPVCBytesRequested(pvcMap map[pvcKey]*PVC, resPVCBytesRequested []*prom.QueryResult) {
  1304. for _, res := range resPVCBytesRequested {
  1305. key, err := resultPVCKey(res, env.GetPromClusterLabel(), "namespace", "persistentvolumeclaim")
  1306. if err != nil {
  1307. continue
  1308. }
  1309. if _, ok := pvcMap[key]; !ok {
  1310. continue
  1311. }
  1312. pvcMap[key].Bytes = res.Values[0].Value
  1313. }
  1314. }
  1315. func buildPodPVCMap(podPVCMap map[podKey][]*PVC, pvMap map[pvKey]*PV, pvcMap map[pvcKey]*PVC, podMap map[podKey]*Pod, resPodPVCAllocation []*prom.QueryResult) {
  1316. for _, res := range resPodPVCAllocation {
  1317. cluster, err := res.GetString(env.GetPromClusterLabel())
  1318. if err != nil {
  1319. cluster = env.GetClusterID()
  1320. }
  1321. values, err := res.GetStrings("persistentvolume", "persistentvolumeclaim", "pod", "namespace")
  1322. if err != nil {
  1323. log.DedupedWarningf(5, "CostModel.ComputeAllocation: PVC allocation query result missing field: %s", err)
  1324. continue
  1325. }
  1326. namespace := values["namespace"]
  1327. pod := values["pod"]
  1328. name := values["persistentvolumeclaim"]
  1329. volume := values["persistentvolume"]
  1330. podKey := newPodKey(cluster, namespace, pod)
  1331. pvKey := newPVKey(cluster, volume)
  1332. pvcKey := newPVCKey(cluster, namespace, name)
  1333. if _, ok := pvMap[pvKey]; !ok {
  1334. log.DedupedWarningf(5, "CostModel.ComputeAllocation: PV missing for PVC allocation query result: %s", pvKey)
  1335. continue
  1336. }
  1337. if _, ok := podPVCMap[podKey]; !ok {
  1338. podPVCMap[podKey] = []*PVC{}
  1339. }
  1340. pvc, ok := pvcMap[pvcKey]
  1341. if !ok {
  1342. log.DedupedWarningf(5, "CostModel.ComputeAllocation: PVC missing for PVC allocation query: %s", pvcKey)
  1343. continue
  1344. }
  1345. count := 1
  1346. if pod, ok := podMap[podKey]; ok && len(pod.Allocations) > 0 {
  1347. count = len(pod.Allocations)
  1348. } else {
  1349. log.DedupedWarningf(10, "CostModel.ComputeAllocation: PVC %s for missing pod %s", pvcKey, podKey)
  1350. }
  1351. pvc.Count = count
  1352. pvc.Mounted = true
  1353. podPVCMap[podKey] = append(podPVCMap[podKey], pvc)
  1354. }
  1355. }
  1356. func applyUnmountedPVs(window kubecost.Window, podMap map[podKey]*Pod, pvMap map[pvKey]*PV, pvcMap map[pvcKey]*PVC) {
  1357. unmountedPVBytes := map[string]float64{}
  1358. unmountedPVCost := map[string]float64{}
  1359. for _, pv := range pvMap {
  1360. mounted := false
  1361. for _, pvc := range pvcMap {
  1362. if pvc.Volume == nil {
  1363. continue
  1364. }
  1365. if pvc.Volume == pv {
  1366. mounted = true
  1367. break
  1368. }
  1369. }
  1370. if !mounted {
  1371. gib := pv.Bytes / 1024 / 1024 / 1024
  1372. hrs := window.Minutes() / 60.0 // TODO improve with PV hours, not window hours
  1373. cost := pv.CostPerGiBHour * gib * hrs
  1374. unmountedPVCost[pv.Cluster] += cost
  1375. unmountedPVBytes[pv.Cluster] += pv.Bytes
  1376. }
  1377. }
  1378. for cluster, amount := range unmountedPVCost {
  1379. container := kubecost.UnmountedSuffix
  1380. pod := kubecost.UnmountedSuffix
  1381. namespace := kubecost.UnmountedSuffix
  1382. node := ""
  1383. key := newPodKey(cluster, namespace, pod)
  1384. podMap[key] = &Pod{
  1385. Window: window.Clone(),
  1386. Start: *window.Start(),
  1387. End: *window.End(),
  1388. Key: key,
  1389. Allocations: map[string]*kubecost.Allocation{},
  1390. }
  1391. podMap[key].AppendContainer(container)
  1392. podMap[key].Allocations[container].Properties.Cluster = cluster
  1393. podMap[key].Allocations[container].Properties.Node = node
  1394. podMap[key].Allocations[container].Properties.Namespace = namespace
  1395. podMap[key].Allocations[container].Properties.Pod = pod
  1396. podMap[key].Allocations[container].Properties.Container = container
  1397. pvKey := kubecost.PVKey{
  1398. Cluster: cluster,
  1399. Name: kubecost.UnmountedSuffix,
  1400. }
  1401. unmountedPVs := kubecost.PVAllocations{
  1402. pvKey: {
  1403. ByteHours: unmountedPVBytes[cluster] * window.Minutes() / 60.0,
  1404. Cost: amount,
  1405. },
  1406. }
  1407. podMap[key].Allocations[container].PVs = podMap[key].Allocations[container].PVs.Add(unmountedPVs)
  1408. }
  1409. }
  1410. func applyUnmountedPVCs(window kubecost.Window, podMap map[podKey]*Pod, pvcMap map[pvcKey]*PVC) {
  1411. unmountedPVCBytes := map[namespaceKey]float64{}
  1412. unmountedPVCCost := map[namespaceKey]float64{}
  1413. for _, pvc := range pvcMap {
  1414. if !pvc.Mounted && pvc.Volume != nil {
  1415. key := newNamespaceKey(pvc.Cluster, pvc.Namespace)
  1416. gib := pvc.Volume.Bytes / 1024 / 1024 / 1024
  1417. hrs := pvc.Minutes() / 60.0
  1418. cost := pvc.Volume.CostPerGiBHour * gib * hrs
  1419. unmountedPVCCost[key] += cost
  1420. unmountedPVCBytes[key] += pvc.Volume.Bytes
  1421. }
  1422. }
  1423. for key, amount := range unmountedPVCCost {
  1424. container := kubecost.UnmountedSuffix
  1425. pod := kubecost.UnmountedSuffix
  1426. namespace := key.Namespace
  1427. node := ""
  1428. cluster := key.Cluster
  1429. podKey := newPodKey(cluster, namespace, pod)
  1430. podMap[podKey] = &Pod{
  1431. Window: window.Clone(),
  1432. Start: *window.Start(),
  1433. End: *window.End(),
  1434. Key: podKey,
  1435. Allocations: map[string]*kubecost.Allocation{},
  1436. }
  1437. podMap[podKey].AppendContainer(container)
  1438. podMap[podKey].Allocations[container].Properties.Cluster = cluster
  1439. podMap[podKey].Allocations[container].Properties.Node = node
  1440. podMap[podKey].Allocations[container].Properties.Namespace = namespace
  1441. podMap[podKey].Allocations[container].Properties.Pod = pod
  1442. podMap[podKey].Allocations[container].Properties.Container = container
  1443. pvKey := kubecost.PVKey{
  1444. Cluster: cluster,
  1445. Name: kubecost.UnmountedSuffix,
  1446. }
  1447. unmountedPVs := kubecost.PVAllocations{
  1448. pvKey: {
  1449. ByteHours: unmountedPVCBytes[key] * window.Minutes() / 60.0,
  1450. Cost: amount,
  1451. },
  1452. }
  1453. podMap[podKey].Allocations[container].PVs = podMap[podKey].Allocations[container].PVs.Add(unmountedPVs)
  1454. }
  1455. }
  // LB holds the total cost of a Load Balancer together with the start and end
  // of the period over which that cost was accumulated.
  type LB struct {
  	// TotalCost is the hourly cost multiplied by the hours between Start and End.
  	TotalCost float64
  	// Start and End bound the period the Load Balancer was active.
  	Start, End time.Time
  }
  1462. func getLoadBalancerCosts(resLBCost, resLBActiveMins []*prom.QueryResult, resolution time.Duration) map[serviceKey]*LB {
  1463. lbMap := make(map[serviceKey]*LB)
  1464. lbHourlyCosts := make(map[serviceKey]float64)
  1465. for _, res := range resLBCost {
  1466. serviceKey, err := resultServiceKey(res, env.GetPromClusterLabel(), "namespace", "service_name")
  1467. if err != nil {
  1468. continue
  1469. }
  1470. lbHourlyCosts[serviceKey] = res.Values[0].Value
  1471. }
  1472. for _, res := range resLBActiveMins {
  1473. serviceKey, err := resultServiceKey(res, env.GetPromClusterLabel(), "namespace", "service_name")
  1474. if err != nil || len(res.Values) == 0 {
  1475. continue
  1476. }
  1477. if _, ok := lbHourlyCosts[serviceKey]; !ok {
  1478. log.Warningf("CostModel: failed to find hourly cost for Load Balancer: %v", serviceKey)
  1479. continue
  1480. }
  1481. s := time.Unix(int64(res.Values[0].Timestamp), 0)
  1482. // subtract resolution from start time to cover full time period
  1483. s = s.Add(-resolution)
  1484. e := time.Unix(int64(res.Values[len(res.Values)-1].Timestamp), 0)
  1485. hours := e.Sub(s).Hours()
  1486. lbMap[serviceKey] = &LB{
  1487. TotalCost: lbHourlyCosts[serviceKey] * hours,
  1488. Start: s,
  1489. End: e,
  1490. }
  1491. }
  1492. return lbMap
  1493. }
  1494. func applyLoadBalancersToPods(lbMap map[serviceKey]*LB, allocsByService map[serviceKey][]*kubecost.Allocation) {
  1495. for sKey, lb := range lbMap {
  1496. totalHours := 0.0
  1497. allocHours := make(map[*kubecost.Allocation]float64)
  1498. // Add portion of load balancing cost to each allocation
  1499. // proportional to the total number of hours allocations used the load balancer
  1500. for _, alloc := range allocsByService[sKey] {
  1501. // Determine the (start, end) of the relationship between the
  1502. // given LB and the associated Allocation so that a precise
  1503. // number of hours can be used to compute cumulative cost.
  1504. s, e := alloc.Start, alloc.End
  1505. if lb.Start.After(alloc.Start) {
  1506. s = lb.Start
  1507. }
  1508. if lb.End.Before(alloc.End) {
  1509. e = lb.End
  1510. }
  1511. hours := e.Sub(s).Hours()
  1512. // A negative number of hours signifies no overlap between the windows
  1513. if hours > 0 {
  1514. totalHours += hours
  1515. allocHours[alloc] = hours
  1516. }
  1517. }
  1518. // Distribute cost of service once total hours is calculated
  1519. for alloc, hours := range allocHours {
  1520. alloc.LoadBalancerCost += lb.TotalCost * hours / totalHours
  1521. }
  1522. }
  1523. }
  1524. // getNodePricing determines node pricing, given a key and a mapping from keys
  1525. // to their NodePricing instances, as well as the custom pricing configuration
  1526. // inherent to the CostModel instance. If custom pricing is set, use that. If
  1527. // not, use the pricing defined by the given key. If that doesn't exist, fall
  1528. // back on custom pricing as a default.
  1529. func (cm *CostModel) getNodePricing(nodeMap map[nodeKey]*NodePricing, nodeKey nodeKey) *NodePricing {
  1530. // Find the relevant NodePricing, if it exists. If not, substitute the
  1531. // custom NodePricing as a default.
  1532. node, ok := nodeMap[nodeKey]
  1533. if !ok || node == nil {
  1534. if nodeKey.Node != "" {
  1535. log.DedupedWarningf(5, "CostModel: failed to find node for %s", nodeKey)
  1536. }
  1537. return cm.getCustomNodePricing(false)
  1538. }
  1539. // If custom pricing is enabled and can be retrieved, override detected
  1540. // node pricing with the custom values.
  1541. customPricingConfig, err := cm.Provider.GetConfig()
  1542. if err != nil {
  1543. log.Warningf("CostModel: failed to load custom pricing: %s", err)
  1544. }
  1545. if cloud.CustomPricesEnabled(cm.Provider) && customPricingConfig != nil {
  1546. return cm.getCustomNodePricing(node.Preemptible)
  1547. }
  1548. node.Source = "prometheus"
  1549. // If any of the values are NaN or zero, replace them with the custom
  1550. // values as default.
  1551. // TODO:CLEANUP can't we parse these custom prices once? why do we store
  1552. // them as strings like this?
  1553. if node.CostPerCPUHr == 0 || math.IsNaN(node.CostPerCPUHr) {
  1554. log.Warningf("CostModel: node pricing has illegal CostPerCPUHr; replacing with custom pricing: %s", nodeKey)
  1555. cpuCostStr := customPricingConfig.CPU
  1556. if node.Preemptible {
  1557. cpuCostStr = customPricingConfig.SpotCPU
  1558. }
  1559. costPerCPUHr, err := strconv.ParseFloat(cpuCostStr, 64)
  1560. if err != nil {
  1561. log.Warningf("CostModel: custom pricing has illegal CPU cost: %s", cpuCostStr)
  1562. }
  1563. node.CostPerCPUHr = costPerCPUHr
  1564. node.Source += "/customCPU"
  1565. }
  1566. if math.IsNaN(node.CostPerGPUHr) {
  1567. log.Warningf("CostModel: node pricing has illegal CostPerGPUHr; replacing with custom pricing: %s", nodeKey)
  1568. gpuCostStr := customPricingConfig.GPU
  1569. if node.Preemptible {
  1570. gpuCostStr = customPricingConfig.SpotGPU
  1571. }
  1572. costPerGPUHr, err := strconv.ParseFloat(gpuCostStr, 64)
  1573. if err != nil {
  1574. log.Warningf("CostModel: custom pricing has illegal GPU cost: %s", gpuCostStr)
  1575. }
  1576. node.CostPerGPUHr = costPerGPUHr
  1577. node.Source += "/customGPU"
  1578. }
  1579. if node.CostPerRAMGiBHr == 0 || math.IsNaN(node.CostPerRAMGiBHr) {
  1580. log.Warningf("CostModel: node pricing has illegal CostPerRAMHr; replacing with custom pricing: %s", nodeKey)
  1581. ramCostStr := customPricingConfig.RAM
  1582. if node.Preemptible {
  1583. ramCostStr = customPricingConfig.SpotRAM
  1584. }
  1585. costPerRAMHr, err := strconv.ParseFloat(ramCostStr, 64)
  1586. if err != nil {
  1587. log.Warningf("CostModel: custom pricing has illegal RAM cost: %s", ramCostStr)
  1588. }
  1589. node.CostPerRAMGiBHr = costPerRAMHr
  1590. node.Source += "/customRAM"
  1591. }
  1592. return node
  1593. }
  1594. // getCustomNodePricing converts the CostModel's configured custom pricing
  1595. // values into a NodePricing instance.
  1596. func (cm *CostModel) getCustomNodePricing(spot bool) *NodePricing {
  1597. customPricingConfig, err := cm.Provider.GetConfig()
  1598. if err != nil {
  1599. return nil
  1600. }
  1601. cpuCostStr := customPricingConfig.CPU
  1602. gpuCostStr := customPricingConfig.GPU
  1603. ramCostStr := customPricingConfig.RAM
  1604. if spot {
  1605. cpuCostStr = customPricingConfig.SpotCPU
  1606. gpuCostStr = customPricingConfig.SpotGPU
  1607. ramCostStr = customPricingConfig.SpotRAM
  1608. }
  1609. node := &NodePricing{Source: "custom"}
  1610. costPerCPUHr, err := strconv.ParseFloat(cpuCostStr, 64)
  1611. if err != nil {
  1612. log.Warningf("CostModel: custom pricing has illegal CPU cost: %s", cpuCostStr)
  1613. }
  1614. node.CostPerCPUHr = costPerCPUHr
  1615. costPerGPUHr, err := strconv.ParseFloat(gpuCostStr, 64)
  1616. if err != nil {
  1617. log.Warningf("CostModel: custom pricing has illegal GPU cost: %s", gpuCostStr)
  1618. }
  1619. node.CostPerGPUHr = costPerGPUHr
  1620. costPerRAMHr, err := strconv.ParseFloat(ramCostStr, 64)
  1621. if err != nil {
  1622. log.Warningf("CostModel: custom pricing has illegal RAM cost: %s", ramCostStr)
  1623. }
  1624. node.CostPerRAMGiBHr = costPerRAMHr
  1625. return node
  1626. }
  // NodePricing holds the per-resource hourly costs of a single node together
  // with the origin of those numbers (e.g. "prometheus", "custom").
  type NodePricing struct {
  	// Identity of the node.
  	Name, NodeType, ProviderID string
  	// Preemptible is true when the node is a spot/preemptible instance.
  	Preemptible bool
  	// Hourly costs per CPU, per GiB of RAM and per GPU, followed by the
  	// discount applied to the node.
  	CostPerCPUHr, CostPerRAMGiBHr, CostPerGPUHr, Discount float64
  	// Source names where the pricing came from.
  	Source string
  }
  1640. // Pod describes a running pod's start and end time within a Window and
  1641. // all the Allocations (i.e. containers) contained within it.
  1642. type Pod struct {
  1643. Window kubecost.Window
  1644. Start time.Time
  1645. End time.Time
  1646. Key podKey
  1647. Allocations map[string]*kubecost.Allocation
  1648. }
  1649. // AppendContainer adds an entry for the given container name to the Pod.
  1650. func (p Pod) AppendContainer(container string) {
  1651. name := fmt.Sprintf("%s/%s/%s/%s", p.Key.Cluster, p.Key.Namespace, p.Key.Pod, container)
  1652. alloc := &kubecost.Allocation{
  1653. Name: name,
  1654. Properties: &kubecost.AllocationProperties{},
  1655. Window: p.Window.Clone(),
  1656. Start: p.Start,
  1657. End: p.End,
  1658. }
  1659. alloc.Properties.Container = container
  1660. alloc.Properties.Pod = p.Key.Pod
  1661. alloc.Properties.Namespace = p.Key.Namespace
  1662. alloc.Properties.Cluster = p.Key.Cluster
  1663. p.Allocations[container] = alloc
  1664. }
  1665. // PVC describes a PersistentVolumeClaim
  1666. // TODO:CLEANUP move to pkg/kubecost?
  1667. // TODO:CLEANUP add PersistentVolumeClaims field to type Allocation?
  1668. type PVC struct {
  1669. Bytes float64 `json:"bytes"`
  1670. Count int `json:"count"`
  1671. Name string `json:"name"`
  1672. Cluster string `json:"cluster"`
  1673. Namespace string `json:"namespace"`
  1674. Volume *PV `json:"persistentVolume"`
  1675. Mounted bool `json:"mounted"`
  1676. Start time.Time `json:"start"`
  1677. End time.Time `json:"end"`
  1678. }
  1679. // Cost computes the cumulative cost of the PVC
  1680. func (pvc *PVC) Cost() float64 {
  1681. if pvc == nil || pvc.Volume == nil {
  1682. return 0.0
  1683. }
  1684. gib := pvc.Bytes / 1024 / 1024 / 1024
  1685. hrs := pvc.Minutes() / 60.0
  1686. return pvc.Volume.CostPerGiBHour * gib * hrs
  1687. }
  1688. // Minutes computes the number of minutes over which the PVC is defined
  1689. func (pvc *PVC) Minutes() float64 {
  1690. if pvc == nil {
  1691. return 0.0
  1692. }
  1693. return pvc.End.Sub(pvc.Start).Minutes()
  1694. }
  1695. // String returns a string representation of the PVC
  1696. func (pvc *PVC) String() string {
  1697. if pvc == nil {
  1698. return "<nil>"
  1699. }
  1700. return fmt.Sprintf("%s/%s/%s{Bytes:%.2f, Cost:%.6f, Start,End:%s}", pvc.Cluster, pvc.Namespace, pvc.Name, pvc.Bytes, pvc.Cost(), kubecost.NewWindow(&pvc.Start, &pvc.End))
  1701. }
  // PV describes a PersistentVolume: its size, its price and where it lives.
  // TODO:CLEANUP move to pkg/kubecost?
  type PV struct {
  	// Bytes is the size of the volume.
  	Bytes float64 `json:"bytes"`
  	// CostPerGiBHour is the price of one GiB of the volume for one hour.
  	CostPerGiBHour float64 `json:"costPerGiBHour"`
  	// Cluster and Name identify the volume.
  	Cluster string `json:"cluster"`
  	Name    string `json:"name"`
  	// StorageClass is the storage class reported for the volume.
  	StorageClass string `json:"storageClass"`
  }
  1711. // String returns a string representation of the PV
  1712. func (pv *PV) String() string {
  1713. if pv == nil {
  1714. return "<nil>"
  1715. }
  1716. return fmt.Sprintf("%s/%s{Bytes:%.2f, Cost/GiB*Hr:%.6f, StorageClass:%s}", pv.Cluster, pv.Name, pv.Bytes, pv.CostPerGiBHour, pv.StorageClass)
  1717. }