allocation.go 67 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981
  1. package costmodel
  2. import (
  3. "fmt"
  4. "math"
  5. "strconv"
  6. "strings"
  7. "time"
  8. "github.com/kubecost/cost-model/pkg/cloud"
  9. "github.com/kubecost/cost-model/pkg/env"
  10. "github.com/kubecost/cost-model/pkg/kubecost"
  11. "github.com/kubecost/cost-model/pkg/log"
  12. "github.com/kubecost/cost-model/pkg/prom"
  13. "github.com/kubecost/cost-model/pkg/util"
  14. "k8s.io/apimachinery/pkg/labels"
  15. )
  // PromQL query format strings used to compute Allocations.
  //
  // Unless noted otherwise, each format takes two %s verbs: the range
  // duration and the offset clause, in that order, as produced by
  // kubecost.Window.DurationOffsetForPrometheus. Formats ending in
  // "[%s:%s]%s" are subqueries and take three verbs: duration, resolution
  // (the subquery step), and offset.

  // Pod running status. Subquery: duration, resolution, offset.
  const queryFmtPods = `avg(kube_pod_container_status_running{}) by (pod, namespace, cluster_id)[%s:%s]%s`

  // Container RAM: allocation, requests, and working-set usage (avg and max).
  const (
  	queryFmtRAMBytesAllocated = `avg(avg_over_time(container_memory_allocation_bytes{container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, cluster_id)`
  	queryFmtRAMRequests       = `avg(avg_over_time(kube_pod_container_resource_requests_memory_bytes{container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, cluster_id)`
  	queryFmtRAMUsageAvg       = `avg(avg_over_time(container_memory_working_set_bytes{container_name!="", container_name!="POD", instance!=""}[%s]%s)) by (container_name, pod_name, namespace, instance, cluster_id)`
  	queryFmtRAMUsageMax       = `max(max_over_time(container_memory_working_set_bytes{container_name!="", container_name!="POD", instance!=""}[%s]%s)) by (container_name, pod_name, namespace, instance, cluster_id)`
  )

  // Container CPU: allocation, requests, and usage (avg and max).
  const (
  	queryFmtCPUCoresAllocated = `avg(avg_over_time(container_cpu_allocation{container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, cluster_id)`
  	queryFmtCPURequests       = `avg(avg_over_time(kube_pod_container_resource_requests_cpu_cores{container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, cluster_id)`
  	queryFmtCPUUsageAvg       = `avg(rate(container_cpu_usage_seconds_total{container_name!="", container_name!="POD", instance!=""}[%s]%s)) by (container_name, pod_name, namespace, instance, cluster_id)`

  	// queryFmtCPUUsageMax relies on the recording rule
  	// "kubecost_savings_container_cpu_usage_seconds". It could be written
  	// without that rule using a rate() subquery, but that should wait until
  	// the performance tradeoffs of subqueries are acceptable (likely with
  	// hourly ETL).
  	//
  	// See PromQL subquery documentation for a rate example:
  	// https://prometheus.io/blog/2019/01/28/subquery-support/#examples
  	queryFmtCPUUsageMax = `
max(
max_over_time(
kubecost_savings_container_cpu_usage_seconds[%s]%s
)
) by (container_name, pod_name, namespace, instance, cluster_id)`
  )

  // Container GPU requests (nvidia.com/gpu resource).
  const queryFmtGPUsRequested = `avg(avg_over_time(kube_pod_container_resource_requests{resource="nvidia_com_gpu", container!="",container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, cluster_id)`

  // Node hourly resource prices and spot status.
  const (
  	queryFmtNodeCostPerCPUHr    = `avg(avg_over_time(node_cpu_hourly_cost[%s]%s)) by (node, cluster_id, instance_type)`
  	queryFmtNodeCostPerRAMGiBHr = `avg(avg_over_time(node_ram_hourly_cost[%s]%s)) by (node, cluster_id, instance_type)`
  	queryFmtNodeCostPerGPUHr    = `avg(avg_over_time(node_gpu_hourly_cost[%s]%s)) by (node, cluster_id, instance_type)`
  	queryFmtNodeIsSpot          = `avg_over_time(kubecost_node_is_spot[%s]%s)`
  )

  // Persistent volumes and claims. queryFmtPVCInfo is a subquery
  // (duration, resolution, offset).
  const (
  	queryFmtPVCInfo           = `avg(kube_persistentvolumeclaim_info{volumename != ""}) by (persistentvolumeclaim, storageclass, volumename, namespace, cluster_id)[%s:%s]%s`
  	queryFmtPVBytes           = `avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s]%s)) by (persistentvolume, cluster_id)`
  	queryFmtPodPVCAllocation  = `avg(avg_over_time(pod_pvc_allocation[%s]%s)) by (persistentvolume, persistentvolumeclaim, pod, namespace, cluster_id)`
  	queryFmtPVCBytesRequested = `avg(avg_over_time(kube_persistentvolumeclaim_resource_requests_storage_bytes{}[%s]%s)) by (persistentvolumeclaim, namespace, cluster_id)`
  	queryFmtPVCostPerGiBHour  = `avg(avg_over_time(pv_hourly_cost[%s]%s)) by (volumename, cluster_id)`
  )

  // Network egress (converted from bytes to GiB) and the matching per-GiB
  // prices, split into cross-zone (same region), cross-region, and internet
  // traffic.
  const (
  	queryFmtNetZoneGiB            = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="true"}[%s]%s)) by (pod_name, namespace, cluster_id) / 1024 / 1024 / 1024`
  	queryFmtNetZoneCostPerGiB     = `avg(avg_over_time(kubecost_network_zone_egress_cost{}[%s]%s)) by (cluster_id)`
  	queryFmtNetRegionGiB          = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="false"}[%s]%s)) by (pod_name, namespace, cluster_id) / 1024 / 1024 / 1024`
  	queryFmtNetRegionCostPerGiB   = `avg(avg_over_time(kubecost_network_region_egress_cost{}[%s]%s)) by (cluster_id)`
  	queryFmtNetInternetGiB        = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="true"}[%s]%s)) by (pod_name, namespace, cluster_id) / 1024 / 1024 / 1024`
  	queryFmtNetInternetCostPerGiB = `avg(avg_over_time(kubecost_network_internet_egress_cost{}[%s]%s)) by (cluster_id)`
  )

  // Labels, annotations, selectors, and controller ownership.
  const (
  	queryFmtNamespaceLabels      = `avg_over_time(kube_namespace_labels[%s]%s)`
  	queryFmtNamespaceAnnotations = `avg_over_time(kube_namespace_annotations[%s]%s)`
  	queryFmtPodLabels            = `avg_over_time(kube_pod_labels[%s]%s)`
  	queryFmtPodAnnotations       = `avg_over_time(kube_pod_annotations[%s]%s)`
  	queryFmtServiceLabels        = `avg_over_time(service_selector_labels[%s]%s)`
  	queryFmtDeploymentLabels     = `avg_over_time(deployment_match_labels[%s]%s)`
  	queryFmtStatefulSetLabels    = `avg_over_time(statefulSet_match_labels[%s]%s)`
  	queryFmtDaemonSetLabels      = `sum(avg_over_time(kube_pod_owner{owner_kind="DaemonSet"}[%s]%s)) by (pod, owner_name, namespace, cluster_id)`
  	queryFmtJobLabels            = `sum(avg_over_time(kube_pod_owner{owner_kind="Job"}[%s]%s)) by (pod, owner_name, namespace ,cluster_id)`
  )

  // Load balancer hourly cost and activity. queryFmtLBActiveMins is a
  // subquery (duration, resolution, offset).
  const (
  	queryFmtLBCostPerHr  = `avg(avg_over_time(kubecost_load_balancer_cost[%s]%s)) by (namespace, service_name, cluster_id)`
  	queryFmtLBActiveMins = `count(kubecost_load_balancer_cost) by (namespace, service_name, cluster_id)[%s:%s]%s`
  )
  67. // ComputeAllocation uses the CostModel instance to compute an AllocationSet
  68. // for the window defined by the given start and end times. The Allocations
  69. // returned are unaggregated (i.e. down to the container level).
  70. func (cm *CostModel) ComputeAllocation(start, end time.Time, resolution time.Duration) (*kubecost.AllocationSet, error) {
  71. // 1. Build out Pod map from resolution-tuned, batched Pod start/end query
  72. // 2. Run and apply the results of the remaining queries to
  73. // 3. Build out AllocationSet from completed Pod map
  74. // Create a window spanning the requested query
  75. window := kubecost.NewWindow(&start, &end)
  76. // Create an empty AllocationSet. For safety, in the case of an error, we
  77. // should prefer to return this empty set with the error. (In the case of
  78. // no error, of course we populate the set and return it.)
  79. allocSet := kubecost.NewAllocationSet(start, end)
  80. // (1) Build out Pod map
  81. // Build out a map of Allocations as a mapping from pod-to-container-to-
  82. // underlying-Allocation instance, starting with (start, end) so that we
  83. // begin with minutes, from which we compute resource allocation and cost
  84. // totals from measured rate data.
  85. podMap := map[podKey]*Pod{}
  86. // clusterStarts and clusterEnds record the earliest start and latest end
  87. // times, respectively, on a cluster-basis. These are used for unmounted
  88. // PVs and other "virtual" Allocations so that minutes are maximally
  89. // accurate during start-up or spin-down of a cluster
  90. clusterStart := map[string]time.Time{}
  91. clusterEnd := map[string]time.Time{}
  92. cm.buildPodMap(window, resolution, env.GetETLMaxBatchDuration(), podMap, clusterStart, clusterEnd)
  93. // (2) Run and apply remaining queries
  94. // Convert window (start, end) to (duration, offset) for querying Prometheus,
  95. // including handling Thanos offset
  96. durStr, offStr, err := window.DurationOffsetForPrometheus()
  97. if err != nil {
  98. // Negative duration, so return empty set
  99. return allocSet, nil
  100. }
  101. // Convert resolution duration to a query-ready string
  102. resStr := util.DurationString(resolution)
  103. ctx := prom.NewContext(cm.PrometheusClient)
  104. queryRAMBytesAllocated := fmt.Sprintf(queryFmtRAMBytesAllocated, durStr, offStr)
  105. resChRAMBytesAllocated := ctx.Query(queryRAMBytesAllocated)
  106. queryRAMRequests := fmt.Sprintf(queryFmtRAMRequests, durStr, offStr)
  107. resChRAMRequests := ctx.Query(queryRAMRequests)
  108. queryRAMUsageAvg := fmt.Sprintf(queryFmtRAMUsageAvg, durStr, offStr)
  109. resChRAMUsageAvg := ctx.Query(queryRAMUsageAvg)
  110. queryRAMUsageMax := fmt.Sprintf(queryFmtRAMUsageMax, durStr, offStr)
  111. resChRAMUsageMax := ctx.Query(queryRAMUsageMax)
  112. queryCPUCoresAllocated := fmt.Sprintf(queryFmtCPUCoresAllocated, durStr, offStr)
  113. resChCPUCoresAllocated := ctx.Query(queryCPUCoresAllocated)
  114. queryCPURequests := fmt.Sprintf(queryFmtCPURequests, durStr, offStr)
  115. resChCPURequests := ctx.Query(queryCPURequests)
  116. queryCPUUsageAvg := fmt.Sprintf(queryFmtCPUUsageAvg, durStr, offStr)
  117. resChCPUUsageAvg := ctx.Query(queryCPUUsageAvg)
  118. queryCPUUsageMax := fmt.Sprintf(queryFmtCPUUsageMax, durStr, offStr)
  119. resChCPUUsageMax := ctx.Query(queryCPUUsageMax)
  120. queryGPUsRequested := fmt.Sprintf(queryFmtGPUsRequested, durStr, offStr)
  121. resChGPUsRequested := ctx.Query(queryGPUsRequested)
  122. queryNodeCostPerCPUHr := fmt.Sprintf(queryFmtNodeCostPerCPUHr, durStr, offStr)
  123. resChNodeCostPerCPUHr := ctx.Query(queryNodeCostPerCPUHr)
  124. queryNodeCostPerRAMGiBHr := fmt.Sprintf(queryFmtNodeCostPerRAMGiBHr, durStr, offStr)
  125. resChNodeCostPerRAMGiBHr := ctx.Query(queryNodeCostPerRAMGiBHr)
  126. queryNodeCostPerGPUHr := fmt.Sprintf(queryFmtNodeCostPerGPUHr, durStr, offStr)
  127. resChNodeCostPerGPUHr := ctx.Query(queryNodeCostPerGPUHr)
  128. queryNodeIsSpot := fmt.Sprintf(queryFmtNodeIsSpot, durStr, offStr)
  129. resChNodeIsSpot := ctx.Query(queryNodeIsSpot)
  130. queryPVCInfo := fmt.Sprintf(queryFmtPVCInfo, durStr, resStr, offStr)
  131. resChPVCInfo := ctx.Query(queryPVCInfo)
  132. queryPVBytes := fmt.Sprintf(queryFmtPVBytes, durStr, offStr)
  133. resChPVBytes := ctx.Query(queryPVBytes)
  134. queryPodPVCAllocation := fmt.Sprintf(queryFmtPodPVCAllocation, durStr, offStr)
  135. resChPodPVCAllocation := ctx.Query(queryPodPVCAllocation)
  136. queryPVCBytesRequested := fmt.Sprintf(queryFmtPVCBytesRequested, durStr, offStr)
  137. resChPVCBytesRequested := ctx.Query(queryPVCBytesRequested)
  138. queryPVCostPerGiBHour := fmt.Sprintf(queryFmtPVCostPerGiBHour, durStr, offStr)
  139. resChPVCostPerGiBHour := ctx.Query(queryPVCostPerGiBHour)
  140. queryNetZoneGiB := fmt.Sprintf(queryFmtNetZoneGiB, durStr, offStr)
  141. resChNetZoneGiB := ctx.Query(queryNetZoneGiB)
  142. queryNetZoneCostPerGiB := fmt.Sprintf(queryFmtNetZoneCostPerGiB, durStr, offStr)
  143. resChNetZoneCostPerGiB := ctx.Query(queryNetZoneCostPerGiB)
  144. queryNetRegionGiB := fmt.Sprintf(queryFmtNetRegionGiB, durStr, offStr)
  145. resChNetRegionGiB := ctx.Query(queryNetRegionGiB)
  146. queryNetRegionCostPerGiB := fmt.Sprintf(queryFmtNetRegionCostPerGiB, durStr, offStr)
  147. resChNetRegionCostPerGiB := ctx.Query(queryNetRegionCostPerGiB)
  148. queryNetInternetGiB := fmt.Sprintf(queryFmtNetInternetGiB, durStr, offStr)
  149. resChNetInternetGiB := ctx.Query(queryNetInternetGiB)
  150. queryNetInternetCostPerGiB := fmt.Sprintf(queryFmtNetInternetCostPerGiB, durStr, offStr)
  151. resChNetInternetCostPerGiB := ctx.Query(queryNetInternetCostPerGiB)
  152. queryNamespaceLabels := fmt.Sprintf(queryFmtNamespaceLabels, durStr, offStr)
  153. resChNamespaceLabels := ctx.Query(queryNamespaceLabels)
  154. queryNamespaceAnnotations := fmt.Sprintf(queryFmtNamespaceAnnotations, durStr, offStr)
  155. resChNamespaceAnnotations := ctx.Query(queryNamespaceAnnotations)
  156. queryPodLabels := fmt.Sprintf(queryFmtPodLabels, durStr, offStr)
  157. resChPodLabels := ctx.Query(queryPodLabels)
  158. queryPodAnnotations := fmt.Sprintf(queryFmtPodAnnotations, durStr, offStr)
  159. resChPodAnnotations := ctx.Query(queryPodAnnotations)
  160. queryServiceLabels := fmt.Sprintf(queryFmtServiceLabels, durStr, offStr)
  161. resChServiceLabels := ctx.Query(queryServiceLabels)
  162. queryDeploymentLabels := fmt.Sprintf(queryFmtDeploymentLabels, durStr, offStr)
  163. resChDeploymentLabels := ctx.Query(queryDeploymentLabels)
  164. queryStatefulSetLabels := fmt.Sprintf(queryFmtStatefulSetLabels, durStr, offStr)
  165. resChStatefulSetLabels := ctx.Query(queryStatefulSetLabels)
  166. queryDaemonSetLabels := fmt.Sprintf(queryFmtDaemonSetLabels, durStr, offStr)
  167. resChDaemonSetLabels := ctx.Query(queryDaemonSetLabels)
  168. queryJobLabels := fmt.Sprintf(queryFmtJobLabels, durStr, offStr)
  169. resChJobLabels := ctx.Query(queryJobLabels)
  170. queryLBCostPerHr := fmt.Sprintf(queryFmtLBCostPerHr, durStr, offStr)
  171. resChLBCostPerHr := ctx.Query(queryLBCostPerHr)
  172. queryLBActiveMins := fmt.Sprintf(queryFmtLBActiveMins, durStr, resStr, offStr)
  173. resChLBActiveMins := ctx.Query(queryLBActiveMins)
  174. resCPUCoresAllocated, _ := resChCPUCoresAllocated.Await()
  175. resCPURequests, _ := resChCPURequests.Await()
  176. resCPUUsageAvg, _ := resChCPUUsageAvg.Await()
  177. resCPUUsageMax, _ := resChCPUUsageMax.Await()
  178. resRAMBytesAllocated, _ := resChRAMBytesAllocated.Await()
  179. resRAMRequests, _ := resChRAMRequests.Await()
  180. resRAMUsageAvg, _ := resChRAMUsageAvg.Await()
  181. resRAMUsageMax, _ := resChRAMUsageMax.Await()
  182. resGPUsRequested, _ := resChGPUsRequested.Await()
  183. resNodeCostPerCPUHr, _ := resChNodeCostPerCPUHr.Await()
  184. resNodeCostPerRAMGiBHr, _ := resChNodeCostPerRAMGiBHr.Await()
  185. resNodeCostPerGPUHr, _ := resChNodeCostPerGPUHr.Await()
  186. resNodeIsSpot, _ := resChNodeIsSpot.Await()
  187. resPVBytes, _ := resChPVBytes.Await()
  188. resPVCostPerGiBHour, _ := resChPVCostPerGiBHour.Await()
  189. resPVCInfo, _ := resChPVCInfo.Await()
  190. resPVCBytesRequested, _ := resChPVCBytesRequested.Await()
  191. resPodPVCAllocation, _ := resChPodPVCAllocation.Await()
  192. resNetZoneGiB, _ := resChNetZoneGiB.Await()
  193. resNetZoneCostPerGiB, _ := resChNetZoneCostPerGiB.Await()
  194. resNetRegionGiB, _ := resChNetRegionGiB.Await()
  195. resNetRegionCostPerGiB, _ := resChNetRegionCostPerGiB.Await()
  196. resNetInternetGiB, _ := resChNetInternetGiB.Await()
  197. resNetInternetCostPerGiB, _ := resChNetInternetCostPerGiB.Await()
  198. resNamespaceLabels, _ := resChNamespaceLabels.Await()
  199. resNamespaceAnnotations, _ := resChNamespaceAnnotations.Await()
  200. resPodLabels, _ := resChPodLabels.Await()
  201. resPodAnnotations, _ := resChPodAnnotations.Await()
  202. resServiceLabels, _ := resChServiceLabels.Await()
  203. resDeploymentLabels, _ := resChDeploymentLabels.Await()
  204. resStatefulSetLabels, _ := resChStatefulSetLabels.Await()
  205. resDaemonSetLabels, _ := resChDaemonSetLabels.Await()
  206. resJobLabels, _ := resChJobLabels.Await()
  207. resLBCostPerHr, _ := resChLBCostPerHr.Await()
  208. resLBActiveMins, _ := resChLBActiveMins.Await()
  209. if ctx.HasErrors() {
  210. for _, err := range ctx.Errors() {
  211. log.Errorf("CostModel.ComputeAllocation: %s", err)
  212. }
  213. return allocSet, ctx.ErrorCollection()
  214. }
  215. // We choose to apply allocation before requests in the cases of RAM and
  216. // CPU so that we can assert that allocation should always be greater than
  217. // or equal to request.
  218. applyCPUCoresAllocated(podMap, resCPUCoresAllocated)
  219. applyCPUCoresRequested(podMap, resCPURequests)
  220. applyCPUCoresUsedAvg(podMap, resCPUUsageAvg)
  221. applyCPUCoresUsedMax(podMap, resCPUUsageMax)
  222. applyRAMBytesAllocated(podMap, resRAMBytesAllocated)
  223. applyRAMBytesRequested(podMap, resRAMRequests)
  224. applyRAMBytesUsedAvg(podMap, resRAMUsageAvg)
  225. applyRAMBytesUsedMax(podMap, resRAMUsageMax)
  226. applyGPUsRequested(podMap, resGPUsRequested)
  227. applyNetworkAllocation(podMap, resNetZoneGiB, resNetZoneCostPerGiB)
  228. applyNetworkAllocation(podMap, resNetRegionGiB, resNetRegionCostPerGiB)
  229. applyNetworkAllocation(podMap, resNetInternetGiB, resNetInternetCostPerGiB)
  230. namespaceLabels := resToNamespaceLabels(resNamespaceLabels)
  231. podLabels := resToPodLabels(resPodLabels)
  232. namespaceAnnotations := resToNamespaceAnnotations(resNamespaceAnnotations)
  233. podAnnotations := resToPodAnnotations(resPodAnnotations)
  234. applyLabels(podMap, namespaceLabels, podLabels)
  235. applyAnnotations(podMap, namespaceAnnotations, podAnnotations)
  236. serviceLabels := getServiceLabels(resServiceLabels)
  237. allocsByService := map[serviceKey][]*kubecost.Allocation{}
  238. applyServicesToPods(podMap, podLabels, allocsByService, serviceLabels)
  239. podDeploymentMap := labelsToPodControllerMap(podLabels, resToDeploymentLabels(resDeploymentLabels))
  240. podStatefulSetMap := labelsToPodControllerMap(podLabels, resToStatefulSetLabels(resStatefulSetLabels))
  241. podDaemonSetMap := resToPodDaemonSetMap(resDaemonSetLabels)
  242. podJobMap := resToPodJobMap(resJobLabels)
  243. applyControllersToPods(podMap, podDeploymentMap)
  244. applyControllersToPods(podMap, podStatefulSetMap)
  245. applyControllersToPods(podMap, podDaemonSetMap)
  246. applyControllersToPods(podMap, podJobMap)
  247. // TODO breakdown network costs?
  248. // Build out a map of Nodes with resource costs, discounts, and node types
  249. // for converting resource allocation data to cumulative costs.
  250. nodeMap := map[nodeKey]*NodePricing{}
  251. applyNodeCostPerCPUHr(nodeMap, resNodeCostPerCPUHr)
  252. applyNodeCostPerRAMGiBHr(nodeMap, resNodeCostPerRAMGiBHr)
  253. applyNodeCostPerGPUHr(nodeMap, resNodeCostPerGPUHr)
  254. applyNodeSpot(nodeMap, resNodeIsSpot)
  255. applyNodeDiscount(nodeMap, cm)
  256. // Build out the map of all PVs with class, size and cost-per-hour.
  257. // Note: this does not record time running, which we may want to
  258. // include later for increased PV precision. (As long as the PV has
  259. // a PVC, we get time running there, so this is only inaccurate
  260. // for short-lived, unmounted PVs.)
  261. pvMap := map[pvKey]*PV{}
  262. buildPVMap(pvMap, resPVCostPerGiBHour)
  263. applyPVBytes(pvMap, resPVBytes)
  264. // Build out the map of all PVCs with time running, bytes requested,
  265. // and connect to the correct PV from pvMap. (If no PV exists, that
  266. // is noted, but does not result in any allocation/cost.)
  267. pvcMap := map[pvcKey]*PVC{}
  268. buildPVCMap(window, pvcMap, pvMap, resPVCInfo)
  269. applyPVCBytesRequested(pvcMap, resPVCBytesRequested)
  270. // Build out the relationships of pods to their PVCs. This step
  271. // populates the PVC.Count field so that PVC allocation can be
  272. // split appropriately among each pod's container allocation.
  273. podPVCMap := map[podKey][]*PVC{}
  274. buildPodPVCMap(podPVCMap, pvMap, pvcMap, podMap, resPodPVCAllocation)
  275. // Identify unmounted PVs (PVs without PVCs) and add one Allocation per
  276. // cluster representing each cluster's unmounted PVs (if necessary).
  277. applyUnmountedPVs(window, podMap, pvMap, pvcMap)
  278. lbMap := getLoadBalancerCosts(resLBCostPerHr, resLBActiveMins, resolution)
  279. applyLoadBalancersToPods(lbMap, allocsByService)
  280. // (3) Build out AllocationSet from Pod map
  281. for _, pod := range podMap {
  282. for _, alloc := range pod.Allocations {
  283. cluster, _ := alloc.Properties.GetCluster()
  284. nodeName, _ := alloc.Properties.GetNode()
  285. namespace, _ := alloc.Properties.GetNamespace()
  286. pod, _ := alloc.Properties.GetPod()
  287. container, _ := alloc.Properties.GetContainer()
  288. podKey := newPodKey(cluster, namespace, pod)
  289. nodeKey := newNodeKey(cluster, nodeName)
  290. node := cm.getNodePricing(nodeMap, nodeKey)
  291. alloc.CPUCost = alloc.CPUCoreHours * node.CostPerCPUHr
  292. alloc.RAMCost = (alloc.RAMByteHours / 1024 / 1024 / 1024) * node.CostPerRAMGiBHr
  293. alloc.GPUCost = alloc.GPUHours * node.CostPerGPUHr
  294. if pvcs, ok := podPVCMap[podKey]; ok {
  295. for _, pvc := range pvcs {
  296. // Determine the (start, end) of the relationship between the
  297. // given PVC and the associated Allocation so that a precise
  298. // number of hours can be used to compute cumulative cost.
  299. s, e := alloc.Start, alloc.End
  300. if pvc.Start.After(alloc.Start) {
  301. s = pvc.Start
  302. }
  303. if pvc.End.Before(alloc.End) {
  304. e = pvc.End
  305. }
  306. minutes := e.Sub(s).Minutes()
  307. hrs := minutes / 60.0
  308. count := float64(pvc.Count)
  309. if pvc.Count < 1 {
  310. count = 1
  311. }
  312. gib := pvc.Bytes / 1024 / 1024 / 1024
  313. cost := pvc.Volume.CostPerGiBHour * gib * hrs
  314. // Apply the size and cost of the PV to the allocation, each
  315. // weighted by count (i.e. the number of containers in the pod)
  316. alloc.PVByteHours += pvc.Bytes * hrs / count
  317. alloc.PVCost += cost / count
  318. }
  319. }
  320. // Make sure that the name is correct (node may not be present at this
  321. // point due to it missing from queryMinutes) then insert.
  322. alloc.Name = fmt.Sprintf("%s/%s/%s/%s/%s", cluster, nodeName, namespace, pod, container)
  323. allocSet.Set(alloc)
  324. }
  325. }
  326. return allocSet, nil
  327. }
  328. func (cm *CostModel) buildPodMap(window kubecost.Window, resolution, maxBatchSize time.Duration, podMap map[podKey]*Pod, clusterStart, clusterEnd map[string]time.Time) error {
  329. // Assumes that window is positive and closed
  330. start, end := *window.Start(), *window.End()
  331. // Convert resolution duration to a query-ready string
  332. resStr := util.DurationString(resolution)
  333. ctx := prom.NewContext(cm.PrometheusClient)
  334. // Query for (start, end) by (pod, namespace, cluster) over the given
  335. // window, using the given resolution, and if necessary in batches no
  336. // larger than the given maximum batch size. If working in batches, track
  337. // overall progress by starting with (window.start, window.start) and
  338. // querying in batches no larger than maxBatchSize from start-to-end,
  339. // folding each result set into podMap as the results come back.
  340. coverage := kubecost.NewWindow(&start, &start)
  341. numQuery := 1
  342. for coverage.End().Before(end) {
  343. // Determine the (start, end) of the current batch
  344. batchStart := *coverage.End()
  345. batchEnd := coverage.End().Add(maxBatchSize)
  346. if batchEnd.After(end) {
  347. batchEnd = end
  348. }
  349. batchWindow := kubecost.NewWindow(&batchStart, &batchEnd)
  350. var resPods []*prom.QueryResult
  351. var err error
  352. maxTries := 3
  353. numTries := 0
  354. for resPods == nil && numTries < maxTries {
  355. numTries++
  356. // Convert window (start, end) to (duration, offset) for querying Prometheus,
  357. // including handling Thanos offset
  358. durStr, offStr, err := batchWindow.DurationOffsetForPrometheus()
  359. if err != nil || durStr == "" {
  360. // Negative duration, so set empty results and don't query
  361. resPods = []*prom.QueryResult{}
  362. err = nil
  363. break
  364. }
  365. // Submit and profile query
  366. queryPods := fmt.Sprintf(queryFmtPods, durStr, resStr, offStr)
  367. queryProfile := time.Now()
  368. resPods, err = ctx.Query(queryPods).Await()
  369. if err != nil {
  370. log.Profile(queryProfile, fmt.Sprintf("CostModel.ComputeAllocation: pod query %d try %d failed: %s", numQuery, numTries, queryPods))
  371. resPods = nil
  372. }
  373. }
  374. if err != nil {
  375. return err
  376. }
  377. applyPodResults(window, resolution, podMap, clusterStart, clusterEnd, resPods)
  378. coverage = coverage.ExpandEnd(batchEnd)
  379. numQuery++
  380. }
  381. return nil
  382. }
// applyPodResults turns the results of the pod "running minutes" query into
// entries of podMap, and widens the per-cluster time bounds recorded in
// clusterStart and clusterEnd.
//
// For each result, the earliest and latest samples whose value is > 0 and
// whose timestamp falls inside window determine the pod's start and end.
// A sample's value is treated as the fraction of its resolution-wide
// interval during which the pod was running, so values below 1.0 at the
// first/last sample trim the computed interval (see comments below).
//
// Parameters:
//   - window: only samples with timestamps inside this window are counted;
//     it is also cloned into newly created Pod entries.
//   - resolution: the query resolution; each sample's timestamp is treated
//     as the END of a resolution-wide interval.
//   - podMap: updated in place. Unseen pods are inserted; pods already
//     present have Start/End widened to cover the new interval.
//   - clusterStart, clusterEnd: updated in place with the earliest start and
//     latest end observed per cluster.
//   - resPods: the query results. Results with no values, or lacking the
//     "namespace"/"pod" labels, are skipped with a warning. A missing
//     "cluster_id" falls back to env.GetClusterID().
func applyPodResults(window kubecost.Window, resolution time.Duration, podMap map[podKey]*Pod, clusterStart, clusterEnd map[string]time.Time, resPods []*prom.QueryResult) {
	for _, res := range resPods {
		if len(res.Values) == 0 {
			log.Warningf("CostModel.ComputeAllocation: empty minutes result")
			continue
		}

		cluster, err := res.GetString("cluster_id")
		if err != nil {
			cluster = env.GetClusterID()
		}

		// NOTE: this local shadows the imported `labels` package for the
		// rest of the loop body.
		labels, err := res.GetStrings("namespace", "pod")
		if err != nil {
			log.Warningf("CostModel.ComputeAllocation: minutes query result missing field: %s", err)
			continue
		}

		namespace := labels["namespace"]
		pod := labels["pod"]
		key := newPodKey(cluster, namespace, pod)

		// allocStart and allocEnd are the timestamps of the first and last
		// minutes the pod was running, respectively. We subtract one resolution
		// from allocStart because this point will actually represent the end
		// of the first minute. We don't subtract from allocEnd because it
		// already represents the end of the last minute.
		var allocStart, allocEnd time.Time
		startAdjustmentCoeff, endAdjustmentCoeff := 1.0, 1.0
		for _, datum := range res.Values {
			t := time.Unix(int64(datum.Timestamp), 0)

			if allocStart.IsZero() && datum.Value > 0 && window.Contains(t) {
				// Set the start timestamp to the earliest non-zero timestamp
				allocStart = t

				// Record adjustment coefficient, i.e. the portion of the start
				// timestamp to "ignore". That is, sometimes the value will be
				// 0.5, meaning that we should discount the time running by
				// half of the resolution the timestamp stands for.
				startAdjustmentCoeff = (1.0 - datum.Value)
			}

			if datum.Value > 0 && window.Contains(t) {
				// Set the end timestamp to the latest non-zero timestamp
				allocEnd = t

				// Record adjustment coefficient, i.e. the portion of the end
				// timestamp to "ignore". (See explanation above for start.)
				endAdjustmentCoeff = (1.0 - datum.Value)
			}
		}

		// No in-window sample had a positive value, so there is no running
		// interval to record for this result.
		if allocStart.IsZero() || allocEnd.IsZero() {
			continue
		}

		// Adjust timestamps according to the resolution and the adjustment
		// coefficients, as described above. That is, count the start timestamp
		// from the beginning of the resolution, not the end. Then "reduce" the
		// start and end by the correct amount, in the case that the "running"
		// value of the first or last timestamp was not a full 1.0.
		allocStart = allocStart.Add(-resolution)

		// Note: the *100 and /100 are necessary because Duration is an int, so
		// 0.5, for instance, will be truncated, resulting in no adjustment.
		// The float->Duration conversion truncates, so the adjustment is
		// effectively quantized to whole percent of the resolution.
		allocStart = allocStart.Add(time.Duration(startAdjustmentCoeff*100) * resolution / time.Duration(100))
		allocEnd = allocEnd.Add(-time.Duration(endAdjustmentCoeff*100) * resolution / time.Duration(100))

		// If there is only one point with a value <= 0.5 that the start and
		// end timestamps both share, then we will enter this case because at
		// least half of a resolution will be subtracted from both the start
		// and the end. If that is the case, then add back half of each side
		// so that the pod is said to run for half a resolution total.
		// e.g. For resolution 1m and a value of 0.5 at one timestamp, we'll
		// end up with allocEnd == allocStart and each coeff == 0.5. In
		// that case, add 0.25m to each side, resulting in 0.5m duration.
		if !allocEnd.After(allocStart) {
			allocStart = allocStart.Add(-time.Duration(50*startAdjustmentCoeff) * resolution / time.Duration(100))
			allocEnd = allocEnd.Add(time.Duration(50*endAdjustmentCoeff) * resolution / time.Duration(100))
		}

		// Set start if unset or this datum's start time is earlier than the
		// current earliest time.
		if _, ok := clusterStart[cluster]; !ok || allocStart.Before(clusterStart[cluster]) {
			clusterStart[cluster] = allocStart
		}

		// Set end if unset or this datum's end time is later than the
		// current latest time.
		if _, ok := clusterEnd[cluster]; !ok || allocEnd.After(clusterEnd[cluster]) {
			clusterEnd[cluster] = allocEnd
		}

		if pod, ok := podMap[key]; ok {
			// Pod has already been recorded, so update it accordingly
			if allocStart.Before(pod.Start) {
				pod.Start = allocStart
			}
			if allocEnd.After(pod.End) {
				pod.End = allocEnd
			}
		} else {
			// Pod has not been recorded yet, so insert it
			podMap[key] = &Pod{
				Window:      window.Clone(),
				Start:       allocStart,
				End:         allocEnd,
				Key:         key,
				Allocations: map[string]*kubecost.Allocation{},
			}
		}
	}
}
  482. func applyCPUCoresAllocated(podMap map[podKey]*Pod, resCPUCoresAllocated []*prom.QueryResult) {
  483. for _, res := range resCPUCoresAllocated {
  484. key, err := resultPodKey(res, "cluster_id", "namespace", "pod")
  485. if err != nil {
  486. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU allocation result missing field: %s", err)
  487. continue
  488. }
  489. pod, ok := podMap[key]
  490. if !ok {
  491. continue
  492. }
  493. container, err := res.GetString("container")
  494. if err != nil {
  495. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU allocation query result missing 'container': %s", key)
  496. continue
  497. }
  498. if _, ok := pod.Allocations[container]; !ok {
  499. pod.AppendContainer(container)
  500. }
  501. cpuCores := res.Values[0].Value
  502. hours := pod.Allocations[container].Minutes() / 60.0
  503. pod.Allocations[container].CPUCoreHours = cpuCores * hours
  504. node, err := res.GetString("node")
  505. if err != nil {
  506. log.Warningf("CostModel.ComputeAllocation: CPU allocation query result missing 'node': %s", key)
  507. continue
  508. }
  509. pod.Allocations[container].Properties.SetNode(node)
  510. }
  511. }
  512. func applyCPUCoresRequested(podMap map[podKey]*Pod, resCPUCoresRequested []*prom.QueryResult) {
  513. for _, res := range resCPUCoresRequested {
  514. key, err := resultPodKey(res, "cluster_id", "namespace", "pod")
  515. if err != nil {
  516. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU request result missing field: %s", err)
  517. continue
  518. }
  519. pod, ok := podMap[key]
  520. if !ok {
  521. continue
  522. }
  523. container, err := res.GetString("container")
  524. if err != nil {
  525. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU request query result missing 'container': %s", key)
  526. continue
  527. }
  528. if _, ok := pod.Allocations[container]; !ok {
  529. pod.AppendContainer(container)
  530. }
  531. pod.Allocations[container].CPUCoreRequestAverage = res.Values[0].Value
  532. // If CPU allocation is less than requests, set CPUCoreHours to
  533. // request level.
  534. if pod.Allocations[container].CPUCores() < res.Values[0].Value {
  535. pod.Allocations[container].CPUCoreHours = res.Values[0].Value * (pod.Allocations[container].Minutes() / 60.0)
  536. }
  537. node, err := res.GetString("node")
  538. if err != nil {
  539. log.Warningf("CostModel.ComputeAllocation: CPU request query result missing 'node': %s", key)
  540. continue
  541. }
  542. pod.Allocations[container].Properties.SetNode(node)
  543. }
  544. }
  545. func applyCPUCoresUsedAvg(podMap map[podKey]*Pod, resCPUCoresUsedAvg []*prom.QueryResult) {
  546. for _, res := range resCPUCoresUsedAvg {
  547. key, err := resultPodKey(res, "cluster_id", "namespace", "pod_name")
  548. if err != nil {
  549. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage avg result missing field: %s", err)
  550. continue
  551. }
  552. pod, ok := podMap[key]
  553. if !ok {
  554. continue
  555. }
  556. container, err := res.GetString("container_name")
  557. if err != nil {
  558. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage avg query result missing 'container': %s", key)
  559. continue
  560. }
  561. if _, ok := pod.Allocations[container]; !ok {
  562. pod.AppendContainer(container)
  563. }
  564. pod.Allocations[container].CPUCoreUsageAverage = res.Values[0].Value
  565. }
  566. }
  567. func applyCPUCoresUsedMax(podMap map[podKey]*Pod, resCPUCoresUsedMax []*prom.QueryResult) {
  568. for _, res := range resCPUCoresUsedMax {
  569. key, err := resultPodKey(res, "cluster_id", "namespace", "pod_name")
  570. if err != nil {
  571. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage max result missing field: %s", err)
  572. continue
  573. }
  574. pod, ok := podMap[key]
  575. if !ok {
  576. continue
  577. }
  578. container, err := res.GetString("container_name")
  579. if err != nil {
  580. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage max query result missing 'container': %s", key)
  581. continue
  582. }
  583. if _, ok := pod.Allocations[container]; !ok {
  584. pod.AppendContainer(container)
  585. }
  586. pod.Allocations[container].CPUCoreUsageMax = res.Values[0].Value
  587. }
  588. }
  589. func applyRAMBytesAllocated(podMap map[podKey]*Pod, resRAMBytesAllocated []*prom.QueryResult) {
  590. for _, res := range resRAMBytesAllocated {
  591. key, err := resultPodKey(res, "cluster_id", "namespace", "pod")
  592. if err != nil {
  593. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM allocation result missing field: %s", err)
  594. continue
  595. }
  596. pod, ok := podMap[key]
  597. if !ok {
  598. continue
  599. }
  600. container, err := res.GetString("container")
  601. if err != nil {
  602. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM allocation query result missing 'container': %s", key)
  603. continue
  604. }
  605. if _, ok := pod.Allocations[container]; !ok {
  606. pod.AppendContainer(container)
  607. }
  608. ramBytes := res.Values[0].Value
  609. hours := pod.Allocations[container].Minutes() / 60.0
  610. pod.Allocations[container].RAMByteHours = ramBytes * hours
  611. node, err := res.GetString("node")
  612. if err != nil {
  613. log.Warningf("CostModel.ComputeAllocation: RAM allocation query result missing 'node': %s", key)
  614. continue
  615. }
  616. pod.Allocations[container].Properties.SetNode(node)
  617. }
  618. }
  619. func applyRAMBytesRequested(podMap map[podKey]*Pod, resRAMBytesRequested []*prom.QueryResult) {
  620. for _, res := range resRAMBytesRequested {
  621. key, err := resultPodKey(res, "cluster_id", "namespace", "pod")
  622. if err != nil {
  623. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM request result missing field: %s", err)
  624. continue
  625. }
  626. pod, ok := podMap[key]
  627. if !ok {
  628. continue
  629. }
  630. container, err := res.GetString("container")
  631. if err != nil {
  632. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM request query result missing 'container': %s", key)
  633. continue
  634. }
  635. if _, ok := pod.Allocations[container]; !ok {
  636. pod.AppendContainer(container)
  637. }
  638. pod.Allocations[container].RAMBytesRequestAverage = res.Values[0].Value
  639. // If RAM allocation is less than requests, set RAMByteHours to
  640. // request level.
  641. if pod.Allocations[container].RAMBytes() < res.Values[0].Value {
  642. pod.Allocations[container].RAMByteHours = res.Values[0].Value * (pod.Allocations[container].Minutes() / 60.0)
  643. }
  644. node, err := res.GetString("node")
  645. if err != nil {
  646. log.Warningf("CostModel.ComputeAllocation: RAM request query result missing 'node': %s", key)
  647. continue
  648. }
  649. pod.Allocations[container].Properties.SetNode(node)
  650. }
  651. }
  652. func applyRAMBytesUsedAvg(podMap map[podKey]*Pod, resRAMBytesUsedAvg []*prom.QueryResult) {
  653. for _, res := range resRAMBytesUsedAvg {
  654. key, err := resultPodKey(res, "cluster_id", "namespace", "pod_name")
  655. if err != nil {
  656. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM avg usage result missing field: %s", err)
  657. continue
  658. }
  659. pod, ok := podMap[key]
  660. if !ok {
  661. continue
  662. }
  663. container, err := res.GetString("container_name")
  664. if err != nil {
  665. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM usage avg query result missing 'container': %s", key)
  666. continue
  667. }
  668. if _, ok := pod.Allocations[container]; !ok {
  669. pod.AppendContainer(container)
  670. }
  671. pod.Allocations[container].RAMBytesUsageAverage = res.Values[0].Value
  672. }
  673. }
  674. func applyRAMBytesUsedMax(podMap map[podKey]*Pod, resRAMBytesUsedMax []*prom.QueryResult) {
  675. for _, res := range resRAMBytesUsedMax {
  676. key, err := resultPodKey(res, "cluster_id", "namespace", "pod_name")
  677. if err != nil {
  678. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM usage max result missing field: %s", err)
  679. continue
  680. }
  681. pod, ok := podMap[key]
  682. if !ok {
  683. continue
  684. }
  685. container, err := res.GetString("container_name")
  686. if err != nil {
  687. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM usage max query result missing 'container': %s", key)
  688. continue
  689. }
  690. if _, ok := pod.Allocations[container]; !ok {
  691. pod.AppendContainer(container)
  692. }
  693. pod.Allocations[container].RAMBytesUsageMax = res.Values[0].Value
  694. }
  695. }
  696. func applyGPUsRequested(podMap map[podKey]*Pod, resGPUsRequested []*prom.QueryResult) {
  697. for _, res := range resGPUsRequested {
  698. key, err := resultPodKey(res, "cluster_id", "namespace", "pod")
  699. if err != nil {
  700. log.DedupedWarningf(10, "CostModel.ComputeAllocation: GPU request result missing field: %s", err)
  701. continue
  702. }
  703. pod, ok := podMap[key]
  704. if !ok {
  705. continue
  706. }
  707. container, err := res.GetString("container")
  708. if err != nil {
  709. log.DedupedWarningf(10, "CostModel.ComputeAllocation: GPU request query result missing 'container': %s", key)
  710. continue
  711. }
  712. if _, ok := pod.Allocations[container]; !ok {
  713. pod.AppendContainer(container)
  714. }
  715. hrs := pod.Allocations[container].Minutes() / 60.0
  716. pod.Allocations[container].GPUHours = res.Values[0].Value * hrs
  717. }
  718. }
  719. func applyNetworkAllocation(podMap map[podKey]*Pod, resNetworkGiB []*prom.QueryResult, resNetworkCostPerGiB []*prom.QueryResult) {
  720. costPerGiBByCluster := map[string]float64{}
  721. for _, res := range resNetworkCostPerGiB {
  722. cluster, err := res.GetString("cluster_id")
  723. if err != nil {
  724. cluster = env.GetClusterID()
  725. }
  726. costPerGiBByCluster[cluster] = res.Values[0].Value
  727. }
  728. for _, res := range resNetworkGiB {
  729. podKey, err := resultPodKey(res, "cluster_id", "namespace", "pod_name")
  730. if err != nil {
  731. log.DedupedWarningf(10, "CostModel.ComputeAllocation: Network allocation query result missing field: %s", err)
  732. continue
  733. }
  734. pod, ok := podMap[podKey]
  735. if !ok {
  736. continue
  737. }
  738. for _, alloc := range pod.Allocations {
  739. gib := res.Values[0].Value / float64(len(pod.Allocations))
  740. costPerGiB := costPerGiBByCluster[podKey.Cluster]
  741. alloc.NetworkCost = gib * costPerGiB
  742. }
  743. }
  744. }
  745. func resToNamespaceLabels(resNamespaceLabels []*prom.QueryResult) map[namespaceKey]map[string]string {
  746. namespaceLabels := map[namespaceKey]map[string]string{}
  747. for _, res := range resNamespaceLabels {
  748. nsKey, err := resultNamespaceKey(res, "cluster_id", "namespace")
  749. if err != nil {
  750. continue
  751. }
  752. if _, ok := namespaceLabels[nsKey]; !ok {
  753. namespaceLabels[nsKey] = map[string]string{}
  754. }
  755. for k, l := range res.GetLabels() {
  756. namespaceLabels[nsKey][k] = l
  757. }
  758. }
  759. return namespaceLabels
  760. }
  761. func resToPodLabels(resPodLabels []*prom.QueryResult) map[podKey]map[string]string {
  762. podLabels := map[podKey]map[string]string{}
  763. for _, res := range resPodLabels {
  764. podKey, err := resultPodKey(res, "cluster_id", "namespace", "pod")
  765. if err != nil {
  766. continue
  767. }
  768. if _, ok := podLabels[podKey]; !ok {
  769. podLabels[podKey] = map[string]string{}
  770. }
  771. for k, l := range res.GetLabels() {
  772. podLabels[podKey][k] = l
  773. }
  774. }
  775. return podLabels
  776. }
  777. func resToNamespaceAnnotations(resNamespaceAnnotations []*prom.QueryResult) map[string]map[string]string {
  778. namespaceAnnotations := map[string]map[string]string{}
  779. for _, res := range resNamespaceAnnotations {
  780. namespace, err := res.GetString("namespace")
  781. if err != nil {
  782. continue
  783. }
  784. if _, ok := namespaceAnnotations[namespace]; !ok {
  785. namespaceAnnotations[namespace] = map[string]string{}
  786. }
  787. for k, l := range res.GetAnnotations() {
  788. namespaceAnnotations[namespace][k] = l
  789. }
  790. }
  791. return namespaceAnnotations
  792. }
  793. func resToPodAnnotations(resPodAnnotations []*prom.QueryResult) map[podKey]map[string]string {
  794. podAnnotations := map[podKey]map[string]string{}
  795. for _, res := range resPodAnnotations {
  796. podKey, err := resultPodKey(res, "cluster_id", "namespace", "pod")
  797. if err != nil {
  798. continue
  799. }
  800. if _, ok := podAnnotations[podKey]; !ok {
  801. podAnnotations[podKey] = map[string]string{}
  802. }
  803. for k, l := range res.GetAnnotations() {
  804. podAnnotations[podKey][k] = l
  805. }
  806. }
  807. return podAnnotations
  808. }
  809. func applyLabels(podMap map[podKey]*Pod, namespaceLabels map[namespaceKey]map[string]string, podLabels map[podKey]map[string]string) {
  810. for podKey, pod := range podMap {
  811. for _, alloc := range pod.Allocations {
  812. allocLabels, err := alloc.Properties.GetLabels()
  813. if err != nil {
  814. allocLabels = map[string]string{}
  815. }
  816. // Apply namespace labels first, then pod labels so that pod labels
  817. // overwrite namespace labels.
  818. nsKey := newNamespaceKey(podKey.Cluster, podKey.Namespace)
  819. if labels, ok := namespaceLabels[nsKey]; ok {
  820. for k, v := range labels {
  821. allocLabels[k] = v
  822. }
  823. }
  824. if labels, ok := podLabels[podKey]; ok {
  825. for k, v := range labels {
  826. allocLabels[k] = v
  827. }
  828. }
  829. alloc.Properties.SetLabels(allocLabels)
  830. }
  831. }
  832. }
  833. func applyAnnotations(podMap map[podKey]*Pod, namespaceAnnotations map[string]map[string]string, podAnnotations map[podKey]map[string]string) {
  834. for key, pod := range podMap {
  835. for _, alloc := range pod.Allocations {
  836. allocAnnotations, err := alloc.Properties.GetAnnotations()
  837. if err != nil {
  838. allocAnnotations = map[string]string{}
  839. }
  840. // Apply namespace annotations first, then pod annotations so that
  841. // pod labels overwrite namespace labels.
  842. if labels, ok := namespaceAnnotations[key.Namespace]; ok {
  843. for k, v := range labels {
  844. allocAnnotations[k] = v
  845. }
  846. }
  847. if labels, ok := podAnnotations[key]; ok {
  848. for k, v := range labels {
  849. allocAnnotations[k] = v
  850. }
  851. }
  852. alloc.Properties.SetAnnotations(allocAnnotations)
  853. }
  854. }
  855. }
  856. func getServiceLabels(resServiceLabels []*prom.QueryResult) map[serviceKey]map[string]string {
  857. serviceLabels := map[serviceKey]map[string]string{}
  858. for _, res := range resServiceLabels {
  859. serviceKey, err := resultServiceKey(res, "cluster_id", "namespace", "service")
  860. if err != nil {
  861. continue
  862. }
  863. if _, ok := serviceLabels[serviceKey]; !ok {
  864. serviceLabels[serviceKey] = map[string]string{}
  865. }
  866. for k, l := range res.GetLabels() {
  867. serviceLabels[serviceKey][k] = l
  868. }
  869. }
  870. // Prune duplicate services. That is, if the same service exists with
  871. // hyphens instead of underscores, keep the one that uses hyphens.
  872. for key := range serviceLabels {
  873. if strings.Contains(key.Service, "_") {
  874. duplicateService := strings.Replace(key.Service, "_", "-", -1)
  875. duplicateKey := newServiceKey(key.Cluster, key.Namespace, duplicateService)
  876. if _, ok := serviceLabels[duplicateKey]; ok {
  877. delete(serviceLabels, key)
  878. }
  879. }
  880. }
  881. return serviceLabels
  882. }
  883. func resToDeploymentLabels(resDeploymentLabels []*prom.QueryResult) map[controllerKey]map[string]string {
  884. deploymentLabels := map[controllerKey]map[string]string{}
  885. for _, res := range resDeploymentLabels {
  886. controllerKey, err := resultDeploymentKey(res, "cluster_id", "namespace", "deployment")
  887. if err != nil {
  888. continue
  889. }
  890. if _, ok := deploymentLabels[controllerKey]; !ok {
  891. deploymentLabels[controllerKey] = map[string]string{}
  892. }
  893. for k, l := range res.GetLabels() {
  894. deploymentLabels[controllerKey][k] = l
  895. }
  896. }
  897. // Prune duplicate deployments. That is, if the same deployment exists with
  898. // hyphens instead of underscores, keep the one that uses hyphens.
  899. for key := range deploymentLabels {
  900. if strings.Contains(key.Controller, "_") {
  901. duplicateController := strings.Replace(key.Controller, "_", "-", -1)
  902. duplicateKey := newControllerKey(key.Cluster, key.Namespace, key.ControllerKind, duplicateController)
  903. if _, ok := deploymentLabels[duplicateKey]; ok {
  904. delete(deploymentLabels, key)
  905. }
  906. }
  907. }
  908. return deploymentLabels
  909. }
  910. func resToStatefulSetLabels(resStatefulSetLabels []*prom.QueryResult) map[controllerKey]map[string]string {
  911. statefulSetLabels := map[controllerKey]map[string]string{}
  912. for _, res := range resStatefulSetLabels {
  913. controllerKey, err := resultStatefulSetKey(res, "cluster_id", "namespace", "statefulSet")
  914. if err != nil {
  915. continue
  916. }
  917. if _, ok := statefulSetLabels[controllerKey]; !ok {
  918. statefulSetLabels[controllerKey] = map[string]string{}
  919. }
  920. for k, l := range res.GetLabels() {
  921. statefulSetLabels[controllerKey][k] = l
  922. }
  923. }
  924. // Prune duplicate stateful sets. That is, if the same stateful set exists
  925. // with hyphens instead of underscores, keep the one that uses hyphens.
  926. for key := range statefulSetLabels {
  927. if strings.Contains(key.Controller, "_") {
  928. duplicateController := strings.Replace(key.Controller, "_", "-", -1)
  929. duplicateKey := newControllerKey(key.Cluster, key.Namespace, key.ControllerKind, duplicateController)
  930. if _, ok := statefulSetLabels[duplicateKey]; ok {
  931. delete(statefulSetLabels, key)
  932. }
  933. }
  934. }
  935. return statefulSetLabels
  936. }
  937. func labelsToPodControllerMap(podLabels map[podKey]map[string]string, controllerLabels map[controllerKey]map[string]string) map[podKey]controllerKey {
  938. podControllerMap := map[podKey]controllerKey{}
  939. // For each controller, turn the labels into a selector and attempt to
  940. // match it with each set of pod labels. A match indicates that the pod
  941. // belongs to the controller.
  942. for cKey, cLabels := range controllerLabels {
  943. selector := labels.Set(cLabels).AsSelectorPreValidated()
  944. for pKey, pLabels := range podLabels {
  945. // If the pod is in a different cluster or namespace, there is
  946. // no need to compare the labels.
  947. if cKey.Cluster != pKey.Cluster || cKey.Namespace != pKey.Namespace {
  948. continue
  949. }
  950. podLabelSet := labels.Set(pLabels)
  951. if selector.Matches(podLabelSet) {
  952. if _, ok := podControllerMap[pKey]; ok {
  953. log.DedupedWarningf(5, "CostModel.ComputeAllocation: PodControllerMap match already exists: %s matches %s and %s", pKey, podControllerMap[pKey], cKey)
  954. }
  955. podControllerMap[pKey] = cKey
  956. }
  957. }
  958. }
  959. return podControllerMap
  960. }
  961. func resToPodDaemonSetMap(resDaemonSetLabels []*prom.QueryResult) map[podKey]controllerKey {
  962. daemonSetLabels := map[podKey]controllerKey{}
  963. for _, res := range resDaemonSetLabels {
  964. controllerKey, err := resultDaemonSetKey(res, "cluster_id", "namespace", "owner_name")
  965. if err != nil {
  966. continue
  967. }
  968. pod, err := res.GetString("pod")
  969. if err != nil {
  970. log.Warningf("CostModel.ComputeAllocation: DaemonSetLabel result without pod: %s", controllerKey)
  971. }
  972. podKey := newPodKey(controllerKey.Cluster, controllerKey.Namespace, pod)
  973. daemonSetLabels[podKey] = controllerKey
  974. }
  975. return daemonSetLabels
  976. }
  977. func resToPodJobMap(resJobLabels []*prom.QueryResult) map[podKey]controllerKey {
  978. jobLabels := map[podKey]controllerKey{}
  979. for _, res := range resJobLabels {
  980. controllerKey, err := resultJobKey(res, "cluster_id", "namespace", "owner_name")
  981. if err != nil {
  982. continue
  983. }
  984. // Convert the name of Jobs generated by CronJobs to the name of the
  985. // CronJob by stripping the timestamp off the end.
  986. match := isCron.FindStringSubmatch(controllerKey.Controller)
  987. if match != nil {
  988. controllerKey.Controller = match[1]
  989. }
  990. pod, err := res.GetString("pod")
  991. if err != nil {
  992. log.Warningf("CostModel.ComputeAllocation: JobLabel result without pod: %s", controllerKey)
  993. }
  994. podKey := newPodKey(controllerKey.Cluster, controllerKey.Namespace, pod)
  995. jobLabels[podKey] = controllerKey
  996. }
  997. return jobLabels
  998. }
  999. func applyServicesToPods(podMap map[podKey]*Pod, podLabels map[podKey]map[string]string, allocsByService map[serviceKey][]*kubecost.Allocation, serviceLabels map[serviceKey]map[string]string) {
  1000. podServicesMap := map[podKey][]serviceKey{}
  1001. // For each service, turn the labels into a selector and attempt to
  1002. // match it with each set of pod labels. A match indicates that the pod
  1003. // belongs to the service.
  1004. for sKey, sLabels := range serviceLabels {
  1005. selector := labels.Set(sLabels).AsSelectorPreValidated()
  1006. for pKey, pLabels := range podLabels {
  1007. // If the pod is in a different cluster or namespace, there is
  1008. // no need to compare the labels.
  1009. if sKey.Cluster != pKey.Cluster || sKey.Namespace != pKey.Namespace {
  1010. continue
  1011. }
  1012. podLabelSet := labels.Set(pLabels)
  1013. if selector.Matches(podLabelSet) {
  1014. if _, ok := podServicesMap[pKey]; !ok {
  1015. podServicesMap[pKey] = []serviceKey{}
  1016. }
  1017. podServicesMap[pKey] = append(podServicesMap[pKey], sKey)
  1018. }
  1019. }
  1020. }
  1021. // For each allocation in each pod, attempt to find and apply the list of
  1022. // services associated with the allocation's pod.
  1023. for key, pod := range podMap {
  1024. for _, alloc := range pod.Allocations {
  1025. if sKeys, ok := podServicesMap[key]; ok {
  1026. services := []string{}
  1027. for _, sKey := range sKeys {
  1028. services = append(services, sKey.Service)
  1029. allocsByService[sKey] = append(allocsByService[sKey], alloc)
  1030. }
  1031. alloc.Properties.SetServices(services)
  1032. }
  1033. }
  1034. }
  1035. }
  1036. func applyControllersToPods(podMap map[podKey]*Pod, podControllerMap map[podKey]controllerKey) {
  1037. for key, pod := range podMap {
  1038. for _, alloc := range pod.Allocations {
  1039. if controllerKey, ok := podControllerMap[key]; ok {
  1040. alloc.Properties.SetControllerKind(controllerKey.ControllerKind)
  1041. alloc.Properties.SetController(controllerKey.Controller)
  1042. }
  1043. }
  1044. }
  1045. }
  1046. func applyNodeCostPerCPUHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerCPUHr []*prom.QueryResult) {
  1047. for _, res := range resNodeCostPerCPUHr {
  1048. cluster, err := res.GetString("cluster_id")
  1049. if err != nil {
  1050. cluster = env.GetClusterID()
  1051. }
  1052. node, err := res.GetString("node")
  1053. if err != nil {
  1054. log.Warningf("CostModel.ComputeAllocation: Node CPU cost query result missing field: %s", err)
  1055. continue
  1056. }
  1057. instanceType, err := res.GetString("instance_type")
  1058. if err != nil {
  1059. log.Warningf("CostModel.ComputeAllocation: Node CPU cost query result missing field: %s", err)
  1060. continue
  1061. }
  1062. key := newNodeKey(cluster, node)
  1063. if _, ok := nodeMap[key]; !ok {
  1064. nodeMap[key] = &NodePricing{
  1065. Name: node,
  1066. NodeType: instanceType,
  1067. }
  1068. }
  1069. nodeMap[key].CostPerCPUHr = res.Values[0].Value
  1070. }
  1071. }
  1072. func applyNodeCostPerRAMGiBHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerRAMGiBHr []*prom.QueryResult) {
  1073. for _, res := range resNodeCostPerRAMGiBHr {
  1074. cluster, err := res.GetString("cluster_id")
  1075. if err != nil {
  1076. cluster = env.GetClusterID()
  1077. }
  1078. node, err := res.GetString("node")
  1079. if err != nil {
  1080. log.Warningf("CostModel.ComputeAllocation: Node RAM cost query result missing field: %s", err)
  1081. continue
  1082. }
  1083. instanceType, err := res.GetString("instance_type")
  1084. if err != nil {
  1085. log.Warningf("CostModel.ComputeAllocation: Node RAM cost query result missing field: %s", err)
  1086. continue
  1087. }
  1088. key := newNodeKey(cluster, node)
  1089. if _, ok := nodeMap[key]; !ok {
  1090. nodeMap[key] = &NodePricing{
  1091. Name: node,
  1092. NodeType: instanceType,
  1093. }
  1094. }
  1095. nodeMap[key].CostPerRAMGiBHr = res.Values[0].Value
  1096. }
  1097. }
  1098. func applyNodeCostPerGPUHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerGPUHr []*prom.QueryResult) {
  1099. for _, res := range resNodeCostPerGPUHr {
  1100. cluster, err := res.GetString("cluster_id")
  1101. if err != nil {
  1102. cluster = env.GetClusterID()
  1103. }
  1104. node, err := res.GetString("node")
  1105. if err != nil {
  1106. log.Warningf("CostModel.ComputeAllocation: Node GPU cost query result missing field: %s", err)
  1107. continue
  1108. }
  1109. instanceType, err := res.GetString("instance_type")
  1110. if err != nil {
  1111. log.Warningf("CostModel.ComputeAllocation: Node GPU cost query result missing field: %s", err)
  1112. continue
  1113. }
  1114. key := newNodeKey(cluster, node)
  1115. if _, ok := nodeMap[key]; !ok {
  1116. nodeMap[key] = &NodePricing{
  1117. Name: node,
  1118. NodeType: instanceType,
  1119. }
  1120. }
  1121. nodeMap[key].CostPerGPUHr = res.Values[0].Value
  1122. }
  1123. }
  1124. func applyNodeSpot(nodeMap map[nodeKey]*NodePricing, resNodeIsSpot []*prom.QueryResult) {
  1125. for _, res := range resNodeIsSpot {
  1126. cluster, err := res.GetString("cluster_id")
  1127. if err != nil {
  1128. cluster = env.GetClusterID()
  1129. }
  1130. node, err := res.GetString("node")
  1131. if err != nil {
  1132. log.Warningf("CostModel.ComputeAllocation: Node spot query result missing field: %s", err)
  1133. continue
  1134. }
  1135. key := newNodeKey(cluster, node)
  1136. if _, ok := nodeMap[key]; !ok {
  1137. log.Warningf("CostModel.ComputeAllocation: Node spot query result for missing node: %s", key)
  1138. continue
  1139. }
  1140. nodeMap[key].Preemptible = res.Values[0].Value > 0
  1141. }
  1142. }
  1143. func applyNodeDiscount(nodeMap map[nodeKey]*NodePricing, cm *CostModel) {
  1144. if cm == nil {
  1145. return
  1146. }
  1147. c, err := cm.Provider.GetConfig()
  1148. if err != nil {
  1149. log.Errorf("CostModel.ComputeAllocation: applyNodeDiscount: %s", err)
  1150. return
  1151. }
  1152. discount, err := ParsePercentString(c.Discount)
  1153. if err != nil {
  1154. log.Errorf("CostModel.ComputeAllocation: applyNodeDiscount: %s", err)
  1155. return
  1156. }
  1157. negotiatedDiscount, err := ParsePercentString(c.NegotiatedDiscount)
  1158. if err != nil {
  1159. log.Errorf("CostModel.ComputeAllocation: applyNodeDiscount: %s", err)
  1160. return
  1161. }
  1162. for _, node := range nodeMap {
  1163. // TODO GKE Reserved Instances into account
  1164. node.Discount = cm.Provider.CombinedDiscountForNode(node.NodeType, node.Preemptible, discount, negotiatedDiscount)
  1165. node.CostPerCPUHr *= (1.0 - node.Discount)
  1166. node.CostPerRAMGiBHr *= (1.0 - node.Discount)
  1167. }
  1168. }
  1169. func buildPVMap(pvMap map[pvKey]*PV, resPVCostPerGiBHour []*prom.QueryResult) {
  1170. for _, res := range resPVCostPerGiBHour {
  1171. cluster, err := res.GetString("cluster_id")
  1172. if err != nil {
  1173. cluster = env.GetClusterID()
  1174. }
  1175. name, err := res.GetString("volumename")
  1176. if err != nil {
  1177. log.Warningf("CostModel.ComputeAllocation: PV cost without volumename")
  1178. continue
  1179. }
  1180. key := newPVKey(cluster, name)
  1181. pvMap[key] = &PV{
  1182. Cluster: cluster,
  1183. Name: name,
  1184. CostPerGiBHour: res.Values[0].Value,
  1185. }
  1186. }
  1187. }
  1188. func applyPVBytes(pvMap map[pvKey]*PV, resPVBytes []*prom.QueryResult) {
  1189. for _, res := range resPVBytes {
  1190. key, err := resultPVKey(res, "cluster_id", "persistentvolume")
  1191. if err != nil {
  1192. log.Warningf("CostModel.ComputeAllocation: PV bytes query result missing field: %s", err)
  1193. continue
  1194. }
  1195. if _, ok := pvMap[key]; !ok {
  1196. log.Warningf("CostModel.ComputeAllocation: PV bytes result for missing PV: %s", err)
  1197. continue
  1198. }
  1199. pvMap[key].Bytes = res.Values[0].Value
  1200. }
  1201. }
  1202. func buildPVCMap(window kubecost.Window, pvcMap map[pvcKey]*PVC, pvMap map[pvKey]*PV, resPVCInfo []*prom.QueryResult) {
  1203. for _, res := range resPVCInfo {
  1204. cluster, err := res.GetString("cluster_id")
  1205. if err != nil {
  1206. cluster = env.GetClusterID()
  1207. }
  1208. values, err := res.GetStrings("persistentvolumeclaim", "storageclass", "volumename", "namespace")
  1209. if err != nil {
  1210. log.DedupedWarningf(10, "CostModel.ComputeAllocation: PVC info query result missing field: %s", err)
  1211. continue
  1212. }
  1213. namespace := values["namespace"]
  1214. name := values["persistentvolumeclaim"]
  1215. volume := values["volumename"]
  1216. storageClass := values["storageclass"]
  1217. pvKey := newPVKey(cluster, volume)
  1218. pvcKey := newPVCKey(cluster, namespace, name)
  1219. // pvcStart and pvcEnd are the timestamps of the first and last minutes
  1220. // the PVC was running, respectively. We subtract 1m from pvcStart
  1221. // because this point will actually represent the end of the first
  1222. // minute. We don't subtract from pvcEnd because it already represents
  1223. // the end of the last minute.
  1224. var pvcStart, pvcEnd time.Time
  1225. for _, datum := range res.Values {
  1226. t := time.Unix(int64(datum.Timestamp), 0)
  1227. if pvcStart.IsZero() && datum.Value > 0 && window.Contains(t) {
  1228. pvcStart = t
  1229. }
  1230. if datum.Value > 0 && window.Contains(t) {
  1231. pvcEnd = t
  1232. }
  1233. }
  1234. if pvcStart.IsZero() || pvcEnd.IsZero() {
  1235. log.Warningf("CostModel.ComputeAllocation: PVC %s has no running time", pvcKey)
  1236. }
  1237. pvcStart = pvcStart.Add(-time.Minute)
  1238. if _, ok := pvMap[pvKey]; !ok {
  1239. continue
  1240. }
  1241. pvMap[pvKey].StorageClass = storageClass
  1242. if _, ok := pvcMap[pvcKey]; !ok {
  1243. pvcMap[pvcKey] = &PVC{}
  1244. }
  1245. pvcMap[pvcKey].Name = name
  1246. pvcMap[pvcKey].Namespace = namespace
  1247. pvcMap[pvcKey].Volume = pvMap[pvKey]
  1248. pvcMap[pvcKey].Start = pvcStart
  1249. pvcMap[pvcKey].End = pvcEnd
  1250. }
  1251. }
  1252. func applyPVCBytesRequested(pvcMap map[pvcKey]*PVC, resPVCBytesRequested []*prom.QueryResult) {
  1253. for _, res := range resPVCBytesRequested {
  1254. key, err := resultPVCKey(res, "cluster_id", "namespace", "persistentvolumeclaim")
  1255. if err != nil {
  1256. continue
  1257. }
  1258. if _, ok := pvcMap[key]; !ok {
  1259. continue
  1260. }
  1261. pvcMap[key].Bytes = res.Values[0].Value
  1262. }
  1263. }
  1264. func buildPodPVCMap(podPVCMap map[podKey][]*PVC, pvMap map[pvKey]*PV, pvcMap map[pvcKey]*PVC, podMap map[podKey]*Pod, resPodPVCAllocation []*prom.QueryResult) {
  1265. for _, res := range resPodPVCAllocation {
  1266. cluster, err := res.GetString("cluster_id")
  1267. if err != nil {
  1268. cluster = env.GetClusterID()
  1269. }
  1270. values, err := res.GetStrings("persistentvolume", "persistentvolumeclaim", "pod", "namespace")
  1271. if err != nil {
  1272. log.DedupedWarningf(5, "CostModel.ComputeAllocation: PVC allocation query result missing field: %s", err)
  1273. continue
  1274. }
  1275. namespace := values["namespace"]
  1276. pod := values["pod"]
  1277. name := values["persistentvolumeclaim"]
  1278. volume := values["persistentvolume"]
  1279. podKey := newPodKey(cluster, namespace, pod)
  1280. pvKey := newPVKey(cluster, volume)
  1281. pvcKey := newPVCKey(cluster, namespace, name)
  1282. if _, ok := pvMap[pvKey]; !ok {
  1283. log.DedupedWarningf(5, "CostModel.ComputeAllocation: PV missing for PVC allocation query result: %s", pvKey)
  1284. continue
  1285. }
  1286. if _, ok := podPVCMap[podKey]; !ok {
  1287. podPVCMap[podKey] = []*PVC{}
  1288. }
  1289. pvc, ok := pvcMap[pvcKey]
  1290. if !ok {
  1291. log.DedupedWarningf(5, "CostModel.ComputeAllocation: PVC missing for PVC allocation query: %s", pvcKey)
  1292. continue
  1293. }
  1294. count := 1
  1295. if pod, ok := podMap[podKey]; ok && len(pod.Allocations) > 0 {
  1296. count = len(pod.Allocations)
  1297. } else {
  1298. log.DedupedWarningf(10, "CostModel.ComputeAllocation: PVC %s for missing pod %s", pvcKey, podKey)
  1299. }
  1300. pvc.Count = count
  1301. pvc.Mounted = true
  1302. podPVCMap[podKey] = append(podPVCMap[podKey], pvc)
  1303. }
  1304. }
  1305. func applyUnmountedPVs(window kubecost.Window, podMap map[podKey]*Pod, pvMap map[pvKey]*PV, pvcMap map[pvcKey]*PVC) {
  1306. unmountedPVBytes := map[string]float64{}
  1307. unmountedPVCost := map[string]float64{}
  1308. for _, pv := range pvMap {
  1309. mounted := false
  1310. for _, pvc := range pvcMap {
  1311. if pvc.Volume == nil {
  1312. continue
  1313. }
  1314. if pvc.Volume == pv {
  1315. mounted = true
  1316. break
  1317. }
  1318. }
  1319. if !mounted {
  1320. gib := pv.Bytes / 1024 / 1024 / 1024
  1321. hrs := window.Minutes() / 60.0 // TODO improve with PV hours, not window hours
  1322. cost := pv.CostPerGiBHour * gib * hrs
  1323. unmountedPVCost[pv.Cluster] += cost
  1324. unmountedPVBytes[pv.Cluster] += pv.Bytes
  1325. }
  1326. }
  1327. for cluster, amount := range unmountedPVCost {
  1328. container := kubecost.UnmountedSuffix
  1329. pod := kubecost.UnmountedSuffix
  1330. namespace := kubecost.UnmountedSuffix
  1331. node := ""
  1332. key := newPodKey(cluster, namespace, pod)
  1333. podMap[key] = &Pod{
  1334. Window: window.Clone(),
  1335. Start: *window.Start(),
  1336. End: *window.End(),
  1337. Key: key,
  1338. Allocations: map[string]*kubecost.Allocation{},
  1339. }
  1340. podMap[key].AppendContainer(container)
  1341. podMap[key].Allocations[container].Properties.SetCluster(cluster)
  1342. podMap[key].Allocations[container].Properties.SetNode(node)
  1343. podMap[key].Allocations[container].Properties.SetNamespace(namespace)
  1344. podMap[key].Allocations[container].Properties.SetPod(pod)
  1345. podMap[key].Allocations[container].Properties.SetContainer(container)
  1346. podMap[key].Allocations[container].PVByteHours = unmountedPVBytes[cluster] * window.Minutes() / 60.0
  1347. podMap[key].Allocations[container].PVCost = amount
  1348. }
  1349. }
  1350. func applyUnmountedPVCs(window kubecost.Window, podMap map[podKey]*Pod, pvcMap map[pvcKey]*PVC) {
  1351. unmountedPVCBytes := map[namespaceKey]float64{}
  1352. unmountedPVCCost := map[namespaceKey]float64{}
  1353. for _, pvc := range pvcMap {
  1354. if !pvc.Mounted && pvc.Volume != nil {
  1355. key := newNamespaceKey(pvc.Cluster, pvc.Namespace)
  1356. gib := pvc.Volume.Bytes / 1024 / 1024 / 1024
  1357. hrs := pvc.Minutes() / 60.0
  1358. cost := pvc.Volume.CostPerGiBHour * gib * hrs
  1359. unmountedPVCCost[key] += cost
  1360. unmountedPVCBytes[key] += pvc.Volume.Bytes
  1361. }
  1362. }
  1363. for key, amount := range unmountedPVCCost {
  1364. container := kubecost.UnmountedSuffix
  1365. pod := kubecost.UnmountedSuffix
  1366. namespace := key.Namespace
  1367. node := ""
  1368. cluster := key.Cluster
  1369. podKey := newPodKey(cluster, namespace, pod)
  1370. podMap[podKey] = &Pod{
  1371. Window: window.Clone(),
  1372. Start: *window.Start(),
  1373. End: *window.End(),
  1374. Key: podKey,
  1375. Allocations: map[string]*kubecost.Allocation{},
  1376. }
  1377. podMap[podKey].AppendContainer(container)
  1378. podMap[podKey].Allocations[container].Properties.SetCluster(cluster)
  1379. podMap[podKey].Allocations[container].Properties.SetNode(node)
  1380. podMap[podKey].Allocations[container].Properties.SetNamespace(namespace)
  1381. podMap[podKey].Allocations[container].Properties.SetPod(pod)
  1382. podMap[podKey].Allocations[container].Properties.SetContainer(container)
  1383. podMap[podKey].Allocations[container].PVByteHours = unmountedPVCBytes[key] * window.Minutes() / 60.0
  1384. podMap[podKey].Allocations[container].PVCost = amount
  1385. }
  1386. }
  1387. // LB describes the start and end time of a Load Balancer along with cost
  1388. type LB struct {
  1389. TotalCost float64
  1390. Start time.Time
  1391. End time.Time
  1392. }
  1393. func getLoadBalancerCosts(resLBCost, resLBActiveMins []*prom.QueryResult, resolution time.Duration) map[serviceKey]*LB {
  1394. lbMap := make(map[serviceKey]*LB)
  1395. lbHourlyCosts := make(map[serviceKey]float64)
  1396. for _, res := range resLBCost {
  1397. serviceKey, err := resultServiceKey(res, "cluster_id", "namespace", "service_name")
  1398. if err != nil {
  1399. continue
  1400. }
  1401. lbHourlyCosts[serviceKey] = res.Values[0].Value
  1402. }
  1403. for _, res := range resLBActiveMins {
  1404. serviceKey, err := resultServiceKey(res, "cluster_id", "namespace", "service_name")
  1405. if err != nil || len(res.Values) == 0 {
  1406. continue
  1407. }
  1408. if _, ok := lbHourlyCosts[serviceKey]; !ok {
  1409. log.Warningf("CostModel: failed to find hourly cost for Load Balancer: %v", serviceKey)
  1410. continue
  1411. }
  1412. s := time.Unix(int64(res.Values[0].Timestamp), 0)
  1413. // subtract resolution from start time to cover full time period
  1414. s = s.Add(-resolution)
  1415. e := time.Unix(int64(res.Values[len(res.Values)-1].Timestamp), 0)
  1416. hours := e.Sub(s).Hours()
  1417. lbMap[serviceKey] = &LB{
  1418. TotalCost: lbHourlyCosts[serviceKey] * hours,
  1419. Start: s,
  1420. End: e,
  1421. }
  1422. }
  1423. return lbMap
  1424. }
  1425. func applyLoadBalancersToPods(lbMap map[serviceKey]*LB, allocsByService map[serviceKey][]*kubecost.Allocation) {
  1426. for sKey, lb := range lbMap {
  1427. totalHours := 0.0
  1428. allocHours := make(map[*kubecost.Allocation]float64)
  1429. // Add portion of load balancing cost to each allocation
  1430. // proportional to the total number of hours allocations used the load balancer
  1431. for _, alloc := range allocsByService[sKey] {
  1432. // Determine the (start, end) of the relationship between the
  1433. // given LB and the associated Allocation so that a precise
  1434. // number of hours can be used to compute cumulative cost.
  1435. s, e := alloc.Start, alloc.End
  1436. if lb.Start.After(alloc.Start) {
  1437. s = lb.Start
  1438. }
  1439. if lb.End.Before(alloc.End) {
  1440. e = lb.End
  1441. }
  1442. hours := e.Sub(s).Hours()
  1443. // A negative number of hours signifies no overlap between the windows
  1444. if hours > 0 {
  1445. totalHours += hours
  1446. allocHours[alloc] = hours
  1447. }
  1448. }
  1449. // Distribute cost of service once total hours is calculated
  1450. for alloc, hours := range allocHours {
  1451. alloc.LoadBalancerCost += lb.TotalCost * hours / totalHours
  1452. }
  1453. }
  1454. }
  1455. // getNodePricing determines node pricing, given a key and a mapping from keys
  1456. // to their NodePricing instances, as well as the custom pricing configuration
  1457. // inherent to the CostModel instance. If custom pricing is set, use that. If
  1458. // not, use the pricing defined by the given key. If that doesn't exist, fall
  1459. // back on custom pricing as a default.
  1460. func (cm *CostModel) getNodePricing(nodeMap map[nodeKey]*NodePricing, nodeKey nodeKey) *NodePricing {
  1461. // Find the relevant NodePricing, if it exists. If not, substitute the
  1462. // custom NodePricing as a default.
  1463. node, ok := nodeMap[nodeKey]
  1464. if !ok || node == nil {
  1465. if nodeKey.Node != "" {
  1466. log.DedupedWarningf(5, "CostModel: failed to find node for %s", nodeKey)
  1467. }
  1468. return cm.getCustomNodePricing(false)
  1469. }
  1470. // If custom pricing is enabled and can be retrieved, override detected
  1471. // node pricing with the custom values.
  1472. customPricingConfig, err := cm.Provider.GetConfig()
  1473. if err != nil {
  1474. log.Warningf("CostModel: failed to load custom pricing: %s", err)
  1475. }
  1476. if cloud.CustomPricesEnabled(cm.Provider) && customPricingConfig != nil {
  1477. return cm.getCustomNodePricing(node.Preemptible)
  1478. }
  1479. node.Source = "prometheus"
  1480. // If any of the values are NaN or zero, replace them with the custom
  1481. // values as default.
  1482. // TODO:CLEANUP can't we parse these custom prices once? why do we store
  1483. // them as strings like this?
  1484. if node.CostPerCPUHr == 0 || math.IsNaN(node.CostPerCPUHr) {
  1485. log.Warningf("CostModel: node pricing has illegal CostPerCPUHr; replacing with custom pricing: %s", nodeKey)
  1486. cpuCostStr := customPricingConfig.CPU
  1487. if node.Preemptible {
  1488. cpuCostStr = customPricingConfig.SpotCPU
  1489. }
  1490. costPerCPUHr, err := strconv.ParseFloat(cpuCostStr, 64)
  1491. if err != nil {
  1492. log.Warningf("CostModel: custom pricing has illegal CPU cost: %s", cpuCostStr)
  1493. }
  1494. node.CostPerCPUHr = costPerCPUHr
  1495. node.Source += "/customCPU"
  1496. }
  1497. if math.IsNaN(node.CostPerGPUHr) {
  1498. log.Warningf("CostModel: node pricing has illegal CostPerGPUHr; replacing with custom pricing: %s", nodeKey)
  1499. gpuCostStr := customPricingConfig.GPU
  1500. if node.Preemptible {
  1501. gpuCostStr = customPricingConfig.SpotGPU
  1502. }
  1503. costPerGPUHr, err := strconv.ParseFloat(gpuCostStr, 64)
  1504. if err != nil {
  1505. log.Warningf("CostModel: custom pricing has illegal GPU cost: %s", gpuCostStr)
  1506. }
  1507. node.CostPerGPUHr = costPerGPUHr
  1508. node.Source += "/customGPU"
  1509. }
  1510. if node.CostPerRAMGiBHr == 0 || math.IsNaN(node.CostPerRAMGiBHr) {
  1511. log.Warningf("CostModel: node pricing has illegal CostPerRAMHr; replacing with custom pricing: %s", nodeKey)
  1512. ramCostStr := customPricingConfig.RAM
  1513. if node.Preemptible {
  1514. ramCostStr = customPricingConfig.SpotRAM
  1515. }
  1516. costPerRAMHr, err := strconv.ParseFloat(ramCostStr, 64)
  1517. if err != nil {
  1518. log.Warningf("CostModel: custom pricing has illegal RAM cost: %s", ramCostStr)
  1519. }
  1520. node.CostPerRAMGiBHr = costPerRAMHr
  1521. node.Source += "/customRAM"
  1522. }
  1523. return node
  1524. }
  1525. // getCustomNodePricing converts the CostModel's configured custom pricing
  1526. // values into a NodePricing instance.
  1527. func (cm *CostModel) getCustomNodePricing(spot bool) *NodePricing {
  1528. customPricingConfig, err := cm.Provider.GetConfig()
  1529. if err != nil {
  1530. return nil
  1531. }
  1532. cpuCostStr := customPricingConfig.CPU
  1533. gpuCostStr := customPricingConfig.GPU
  1534. ramCostStr := customPricingConfig.RAM
  1535. if spot {
  1536. cpuCostStr = customPricingConfig.SpotCPU
  1537. gpuCostStr = customPricingConfig.SpotGPU
  1538. ramCostStr = customPricingConfig.SpotRAM
  1539. }
  1540. node := &NodePricing{Source: "custom"}
  1541. costPerCPUHr, err := strconv.ParseFloat(cpuCostStr, 64)
  1542. if err != nil {
  1543. log.Warningf("CostModel: custom pricing has illegal CPU cost: %s", cpuCostStr)
  1544. }
  1545. node.CostPerCPUHr = costPerCPUHr
  1546. costPerGPUHr, err := strconv.ParseFloat(gpuCostStr, 64)
  1547. if err != nil {
  1548. log.Warningf("CostModel: custom pricing has illegal GPU cost: %s", gpuCostStr)
  1549. }
  1550. node.CostPerGPUHr = costPerGPUHr
  1551. costPerRAMHr, err := strconv.ParseFloat(ramCostStr, 64)
  1552. if err != nil {
  1553. log.Warningf("CostModel: custom pricing has illegal RAM cost: %s", ramCostStr)
  1554. }
  1555. node.CostPerRAMGiBHr = costPerRAMHr
  1556. return node
  1557. }
  1558. // NodePricing describes the resource costs associated with a given node, as
  1559. // well as the source of the information (e.g. prometheus, custom)
  1560. type NodePricing struct {
  1561. Name string
  1562. NodeType string
  1563. Preemptible bool
  1564. CostPerCPUHr float64
  1565. CostPerRAMGiBHr float64
  1566. CostPerGPUHr float64
  1567. Discount float64
  1568. Source string
  1569. }
  1570. // Pod describes a running pod's start and end time within a Window and
  1571. // all the Allocations (i.e. containers) contained within it.
  1572. type Pod struct {
  1573. Window kubecost.Window
  1574. Start time.Time
  1575. End time.Time
  1576. Key podKey
  1577. Allocations map[string]*kubecost.Allocation
  1578. }
  1579. // AppendContainer adds an entry for the given container name to the Pod.
  1580. func (p Pod) AppendContainer(container string) {
  1581. name := fmt.Sprintf("%s/%s/%s/%s", p.Key.Cluster, p.Key.Namespace, p.Key.Pod, container)
  1582. alloc := &kubecost.Allocation{
  1583. Name: name,
  1584. Properties: kubecost.Properties{},
  1585. Window: p.Window.Clone(),
  1586. Start: p.Start,
  1587. End: p.End,
  1588. }
  1589. alloc.Properties.SetContainer(container)
  1590. alloc.Properties.SetPod(p.Key.Pod)
  1591. alloc.Properties.SetNamespace(p.Key.Namespace)
  1592. alloc.Properties.SetCluster(p.Key.Cluster)
  1593. p.Allocations[container] = alloc
  1594. }
  1595. // PVC describes a PersistentVolumeClaim
  1596. // TODO:CLEANUP move to pkg/kubecost?
  1597. // TODO:CLEANUP add PersistentVolumeClaims field to type Allocation?
  1598. type PVC struct {
  1599. Bytes float64 `json:"bytes"`
  1600. Count int `json:"count"`
  1601. Name string `json:"name"`
  1602. Cluster string `json:"cluster"`
  1603. Namespace string `json:"namespace"`
  1604. Volume *PV `json:"persistentVolume"`
  1605. Mounted bool `json:"mounted"`
  1606. Start time.Time `json:"start"`
  1607. End time.Time `json:"end"`
  1608. }
  1609. // Cost computes the cumulative cost of the PVC
  1610. func (pvc *PVC) Cost() float64 {
  1611. if pvc == nil || pvc.Volume == nil {
  1612. return 0.0
  1613. }
  1614. gib := pvc.Bytes / 1024 / 1024 / 1024
  1615. hrs := pvc.Minutes() / 60.0
  1616. return pvc.Volume.CostPerGiBHour * gib * hrs
  1617. }
  1618. // Minutes computes the number of minutes over which the PVC is defined
  1619. func (pvc *PVC) Minutes() float64 {
  1620. if pvc == nil {
  1621. return 0.0
  1622. }
  1623. return pvc.End.Sub(pvc.Start).Minutes()
  1624. }
  1625. // String returns a string representation of the PVC
  1626. func (pvc *PVC) String() string {
  1627. if pvc == nil {
  1628. return "<nil>"
  1629. }
  1630. return fmt.Sprintf("%s/%s/%s{Bytes:%.2f, Cost:%.6f, Start,End:%s}", pvc.Cluster, pvc.Namespace, pvc.Name, pvc.Bytes, pvc.Cost(), kubecost.NewWindow(&pvc.Start, &pvc.End))
  1631. }
  1632. // PV describes a PersistentVolume
  1633. // TODO:CLEANUP move to pkg/kubecost?
  1634. type PV struct {
  1635. Bytes float64 `json:"bytes"`
  1636. CostPerGiBHour float64 `json:"costPerGiBHour"`
  1637. Cluster string `json:"cluster"`
  1638. Name string `json:"name"`
  1639. StorageClass string `json:"storageClass"`
  1640. }
  1641. // String returns a string representation of the PV
  1642. func (pv *PV) String() string {
  1643. if pv == nil {
  1644. return "<nil>"
  1645. }
  1646. return fmt.Sprintf("%s/%s{Bytes:%.2f, Cost/GiB*Hr:%.6f, StorageClass:%s}", pv.Cluster, pv.Name, pv.Bytes, pv.CostPerGiBHour, pv.StorageClass)
  1647. }