allocation.go 64 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900
  1. package costmodel
  2. import (
  3. "fmt"
  4. "math"
  5. "strconv"
  6. "strings"
  7. "time"
  8. "github.com/kubecost/cost-model/pkg/cloud"
  9. "github.com/kubecost/cost-model/pkg/env"
  10. "github.com/kubecost/cost-model/pkg/kubecost"
  11. "github.com/kubecost/cost-model/pkg/log"
  12. "github.com/kubecost/cost-model/pkg/prom"
  13. "github.com/kubecost/cost-model/pkg/util"
  14. "k8s.io/apimachinery/pkg/labels"
  15. )
  16. const (
  17. queryFmtPods = `avg(kube_pod_container_status_running{}) by (pod, namespace, cluster_id)[%s:%s]%s`
  18. queryFmtRAMBytesAllocated = `avg(avg_over_time(container_memory_allocation_bytes{container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, cluster_id)`
  19. queryFmtRAMRequests = `avg(avg_over_time(kube_pod_container_resource_requests_memory_bytes{container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, cluster_id)`
  20. queryFmtRAMUsage = `avg(avg_over_time(container_memory_working_set_bytes{container_name!="", container_name!="POD", instance!=""}[%s]%s)) by (container_name, pod_name, namespace, instance, cluster_id)`
  21. queryFmtCPUCoresAllocated = `avg(avg_over_time(container_cpu_allocation{container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, cluster_id)`
  22. queryFmtCPURequests = `avg(avg_over_time(kube_pod_container_resource_requests_cpu_cores{container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, cluster_id)`
  23. queryFmtCPUUsage = `avg(rate(container_cpu_usage_seconds_total{container_name!="", container_name!="POD", instance!=""}[%s]%s)) by (container_name, pod_name, namespace, instance, cluster_id)`
  24. queryFmtGPUsRequested = `avg(avg_over_time(kube_pod_container_resource_requests{resource="nvidia_com_gpu", container!="",container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, cluster_id)`
  25. queryFmtNodeCostPerCPUHr = `avg(avg_over_time(node_cpu_hourly_cost[%s]%s)) by (node, cluster_id, instance_type)`
  26. queryFmtNodeCostPerRAMGiBHr = `avg(avg_over_time(node_ram_hourly_cost[%s]%s)) by (node, cluster_id, instance_type)`
  27. queryFmtNodeCostPerGPUHr = `avg(avg_over_time(node_gpu_hourly_cost[%s]%s)) by (node, cluster_id, instance_type)`
  28. queryFmtNodeIsSpot = `avg_over_time(kubecost_node_is_spot[%s]%s)`
  29. queryFmtPVCInfo = `avg(kube_persistentvolumeclaim_info{volumename != ""}) by (persistentvolumeclaim, storageclass, volumename, namespace, cluster_id)[%s:%s]%s`
  30. queryFmtPVBytes = `avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s]%s)) by (persistentvolume, cluster_id)`
  31. queryFmtPodPVCAllocation = `avg(avg_over_time(pod_pvc_allocation[%s]%s)) by (persistentvolume, persistentvolumeclaim, pod, namespace, cluster_id)`
  32. queryFmtPVCBytesRequested = `avg(avg_over_time(kube_persistentvolumeclaim_resource_requests_storage_bytes{}[%s]%s)) by (persistentvolumeclaim, namespace, cluster_id)`
  33. queryFmtPVCostPerGiBHour = `avg(avg_over_time(pv_hourly_cost[%s]%s)) by (volumename, cluster_id)`
  34. queryFmtNetZoneGiB = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="true"}[%s]%s)) by (pod_name, namespace, cluster_id) / 1024 / 1024 / 1024`
  35. queryFmtNetZoneCostPerGiB = `avg(avg_over_time(kubecost_network_zone_egress_cost{}[%s]%s)) by (cluster_id)`
  36. queryFmtNetRegionGiB = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="false"}[%s]%s)) by (pod_name, namespace, cluster_id) / 1024 / 1024 / 1024`
  37. queryFmtNetRegionCostPerGiB = `avg(avg_over_time(kubecost_network_region_egress_cost{}[%s]%s)) by (cluster_id)`
  38. queryFmtNetInternetGiB = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="true"}[%s]%s)) by (pod_name, namespace, cluster_id) / 1024 / 1024 / 1024`
  39. queryFmtNetInternetCostPerGiB = `avg(avg_over_time(kubecost_network_internet_egress_cost{}[%s]%s)) by (cluster_id)`
  40. queryFmtNamespaceLabels = `avg_over_time(kube_namespace_labels[%s]%s)`
  41. queryFmtNamespaceAnnotations = `avg_over_time(kube_namespace_annotations[%s]%s)`
  42. queryFmtPodLabels = `avg_over_time(kube_pod_labels[%s]%s)`
  43. queryFmtPodAnnotations = `avg_over_time(kube_pod_annotations[%s]%s)`
  44. queryFmtServiceLabels = `avg_over_time(service_selector_labels[%s]%s)`
  45. queryFmtDeploymentLabels = `avg_over_time(deployment_match_labels[%s]%s)`
  46. queryFmtStatefulSetLabels = `avg_over_time(statefulSet_match_labels[%s]%s)`
  47. queryFmtDaemonSetLabels = `sum(avg_over_time(kube_pod_owner{owner_kind="DaemonSet"}[%s]%s)) by (pod, owner_name, namespace, cluster_id)`
  48. queryFmtJobLabels = `sum(avg_over_time(kube_pod_owner{owner_kind="Job"}[%s]%s)) by (pod, owner_name, namespace ,cluster_id)`
  49. queryFmtLBCostPerHr = `avg(avg_over_time(kubecost_load_balancer_cost[%s]%s)) by (namespace, service_name, cluster_id)`
  50. queryFmtLBActiveMins = `count(kubecost_load_balancer_cost) by (namespace, service_name, cluster_id)[%s:%s]%s`
  51. )
  52. // ComputeAllocation uses the CostModel instance to compute an AllocationSet
  53. // for the window defined by the given start and end times. The Allocations
  54. // returned are unaggregated (i.e. down to the container level).
  55. func (cm *CostModel) ComputeAllocation(start, end time.Time, resolution time.Duration) (*kubecost.AllocationSet, error) {
  56. // 1. Build out Pod map from resolution-tuned, batched Pod start/end query
  57. // 2. Run and apply the results of the remaining queries to
  58. // 3. Build out AllocationSet from completed Pod map
  59. // Create a window spanning the requested query
  60. window := kubecost.NewWindow(&start, &end)
  61. // Create an empty AllocationSet. For safety, in the case of an error, we
  62. // should prefer to return this empty set with the error. (In the case of
  63. // no error, of course we populate the set and return it.)
  64. allocSet := kubecost.NewAllocationSet(start, end)
  65. // (1) Build out Pod map
  66. // Build out a map of Allocations as a mapping from pod-to-container-to-
  67. // underlying-Allocation instance, starting with (start, end) so that we
  68. // begin with minutes, from which we compute resource allocation and cost
  69. // totals from measured rate data.
  70. podMap := map[podKey]*Pod{}
  71. // clusterStarts and clusterEnds record the earliest start and latest end
  72. // times, respectively, on a cluster-basis. These are used for unmounted
  73. // PVs and other "virtual" Allocations so that minutes are maximally
  74. // accurate during start-up or spin-down of a cluster
  75. clusterStart := map[string]time.Time{}
  76. clusterEnd := map[string]time.Time{}
  77. cm.buildPodMap(window, resolution, env.GetETLMaxBatchDuration(), podMap, clusterStart, clusterEnd)
  78. // (2) Run and apply remaining queries
  79. // Convert window (start, end) to (duration, offset) for querying Prometheus,
  80. // including handling Thanos offset
  81. durStr, offStr, err := window.DurationOffsetForPrometheus()
  82. if err != nil {
  83. // Negative duration, so return empty set
  84. return allocSet, nil
  85. }
  86. // Convert resolution duration to a query-ready string
  87. resStr := util.DurationString(resolution)
  88. ctx := prom.NewContext(cm.PrometheusClient)
  89. queryRAMBytesAllocated := fmt.Sprintf(queryFmtRAMBytesAllocated, durStr, offStr)
  90. resChRAMBytesAllocated := ctx.Query(queryRAMBytesAllocated)
  91. queryRAMRequests := fmt.Sprintf(queryFmtRAMRequests, durStr, offStr)
  92. resChRAMRequests := ctx.Query(queryRAMRequests)
  93. queryRAMUsage := fmt.Sprintf(queryFmtRAMUsage, durStr, offStr)
  94. resChRAMUsage := ctx.Query(queryRAMUsage)
  95. queryCPUCoresAllocated := fmt.Sprintf(queryFmtCPUCoresAllocated, durStr, offStr)
  96. resChCPUCoresAllocated := ctx.Query(queryCPUCoresAllocated)
  97. queryCPURequests := fmt.Sprintf(queryFmtCPURequests, durStr, offStr)
  98. resChCPURequests := ctx.Query(queryCPURequests)
  99. queryCPUUsage := fmt.Sprintf(queryFmtCPUUsage, durStr, offStr)
  100. resChCPUUsage := ctx.Query(queryCPUUsage)
  101. queryGPUsRequested := fmt.Sprintf(queryFmtGPUsRequested, durStr, offStr)
  102. resChGPUsRequested := ctx.Query(queryGPUsRequested)
  103. queryNodeCostPerCPUHr := fmt.Sprintf(queryFmtNodeCostPerCPUHr, durStr, offStr)
  104. resChNodeCostPerCPUHr := ctx.Query(queryNodeCostPerCPUHr)
  105. queryNodeCostPerRAMGiBHr := fmt.Sprintf(queryFmtNodeCostPerRAMGiBHr, durStr, offStr)
  106. resChNodeCostPerRAMGiBHr := ctx.Query(queryNodeCostPerRAMGiBHr)
  107. queryNodeCostPerGPUHr := fmt.Sprintf(queryFmtNodeCostPerGPUHr, durStr, offStr)
  108. resChNodeCostPerGPUHr := ctx.Query(queryNodeCostPerGPUHr)
  109. queryNodeIsSpot := fmt.Sprintf(queryFmtNodeIsSpot, durStr, offStr)
  110. resChNodeIsSpot := ctx.Query(queryNodeIsSpot)
  111. queryPVCInfo := fmt.Sprintf(queryFmtPVCInfo, durStr, resStr, offStr)
  112. resChPVCInfo := ctx.Query(queryPVCInfo)
  113. queryPVBytes := fmt.Sprintf(queryFmtPVBytes, durStr, offStr)
  114. resChPVBytes := ctx.Query(queryPVBytes)
  115. queryPodPVCAllocation := fmt.Sprintf(queryFmtPodPVCAllocation, durStr, offStr)
  116. resChPodPVCAllocation := ctx.Query(queryPodPVCAllocation)
  117. queryPVCBytesRequested := fmt.Sprintf(queryFmtPVCBytesRequested, durStr, offStr)
  118. resChPVCBytesRequested := ctx.Query(queryPVCBytesRequested)
  119. queryPVCostPerGiBHour := fmt.Sprintf(queryFmtPVCostPerGiBHour, durStr, offStr)
  120. resChPVCostPerGiBHour := ctx.Query(queryPVCostPerGiBHour)
  121. queryNetZoneGiB := fmt.Sprintf(queryFmtNetZoneGiB, durStr, offStr)
  122. resChNetZoneGiB := ctx.Query(queryNetZoneGiB)
  123. queryNetZoneCostPerGiB := fmt.Sprintf(queryFmtNetZoneCostPerGiB, durStr, offStr)
  124. resChNetZoneCostPerGiB := ctx.Query(queryNetZoneCostPerGiB)
  125. queryNetRegionGiB := fmt.Sprintf(queryFmtNetRegionGiB, durStr, offStr)
  126. resChNetRegionGiB := ctx.Query(queryNetRegionGiB)
  127. queryNetRegionCostPerGiB := fmt.Sprintf(queryFmtNetRegionCostPerGiB, durStr, offStr)
  128. resChNetRegionCostPerGiB := ctx.Query(queryNetRegionCostPerGiB)
  129. queryNetInternetGiB := fmt.Sprintf(queryFmtNetInternetGiB, durStr, offStr)
  130. resChNetInternetGiB := ctx.Query(queryNetInternetGiB)
  131. queryNetInternetCostPerGiB := fmt.Sprintf(queryFmtNetInternetCostPerGiB, durStr, offStr)
  132. resChNetInternetCostPerGiB := ctx.Query(queryNetInternetCostPerGiB)
  133. queryNamespaceLabels := fmt.Sprintf(queryFmtNamespaceLabels, durStr, offStr)
  134. resChNamespaceLabels := ctx.Query(queryNamespaceLabels)
  135. queryNamespaceAnnotations := fmt.Sprintf(queryFmtNamespaceAnnotations, durStr, offStr)
  136. resChNamespaceAnnotations := ctx.Query(queryNamespaceAnnotations)
  137. queryPodLabels := fmt.Sprintf(queryFmtPodLabels, durStr, offStr)
  138. resChPodLabels := ctx.Query(queryPodLabels)
  139. queryPodAnnotations := fmt.Sprintf(queryFmtPodAnnotations, durStr, offStr)
  140. resChPodAnnotations := ctx.Query(queryPodAnnotations)
  141. queryServiceLabels := fmt.Sprintf(queryFmtServiceLabels, durStr, offStr)
  142. resChServiceLabels := ctx.Query(queryServiceLabels)
  143. queryDeploymentLabels := fmt.Sprintf(queryFmtDeploymentLabels, durStr, offStr)
  144. resChDeploymentLabels := ctx.Query(queryDeploymentLabels)
  145. queryStatefulSetLabels := fmt.Sprintf(queryFmtStatefulSetLabels, durStr, offStr)
  146. resChStatefulSetLabels := ctx.Query(queryStatefulSetLabels)
  147. queryDaemonSetLabels := fmt.Sprintf(queryFmtDaemonSetLabels, durStr, offStr)
  148. resChDaemonSetLabels := ctx.Query(queryDaemonSetLabels)
  149. queryJobLabels := fmt.Sprintf(queryFmtJobLabels, durStr, offStr)
  150. resChJobLabels := ctx.Query(queryJobLabels)
  151. queryLBCostPerHr := fmt.Sprintf(queryFmtLBCostPerHr, durStr, offStr)
  152. resChLBCostPerHr := ctx.Query(queryLBCostPerHr)
  153. queryLBActiveMins := fmt.Sprintf(queryFmtLBActiveMins, durStr, resStr, offStr)
  154. resChLBActiveMins := ctx.Query(queryLBActiveMins)
  155. resCPUCoresAllocated, _ := resChCPUCoresAllocated.Await()
  156. resCPURequests, _ := resChCPURequests.Await()
  157. resCPUUsage, _ := resChCPUUsage.Await()
  158. resRAMBytesAllocated, _ := resChRAMBytesAllocated.Await()
  159. resRAMRequests, _ := resChRAMRequests.Await()
  160. resRAMUsage, _ := resChRAMUsage.Await()
  161. resGPUsRequested, _ := resChGPUsRequested.Await()
  162. resNodeCostPerCPUHr, _ := resChNodeCostPerCPUHr.Await()
  163. resNodeCostPerRAMGiBHr, _ := resChNodeCostPerRAMGiBHr.Await()
  164. resNodeCostPerGPUHr, _ := resChNodeCostPerGPUHr.Await()
  165. resNodeIsSpot, _ := resChNodeIsSpot.Await()
  166. resPVBytes, _ := resChPVBytes.Await()
  167. resPVCostPerGiBHour, _ := resChPVCostPerGiBHour.Await()
  168. resPVCInfo, _ := resChPVCInfo.Await()
  169. resPVCBytesRequested, _ := resChPVCBytesRequested.Await()
  170. resPodPVCAllocation, _ := resChPodPVCAllocation.Await()
  171. resNetZoneGiB, _ := resChNetZoneGiB.Await()
  172. resNetZoneCostPerGiB, _ := resChNetZoneCostPerGiB.Await()
  173. resNetRegionGiB, _ := resChNetRegionGiB.Await()
  174. resNetRegionCostPerGiB, _ := resChNetRegionCostPerGiB.Await()
  175. resNetInternetGiB, _ := resChNetInternetGiB.Await()
  176. resNetInternetCostPerGiB, _ := resChNetInternetCostPerGiB.Await()
  177. resNamespaceLabels, _ := resChNamespaceLabels.Await()
  178. resNamespaceAnnotations, _ := resChNamespaceAnnotations.Await()
  179. resPodLabels, _ := resChPodLabels.Await()
  180. resPodAnnotations, _ := resChPodAnnotations.Await()
  181. resServiceLabels, _ := resChServiceLabels.Await()
  182. resDeploymentLabels, _ := resChDeploymentLabels.Await()
  183. resStatefulSetLabels, _ := resChStatefulSetLabels.Await()
  184. resDaemonSetLabels, _ := resChDaemonSetLabels.Await()
  185. resJobLabels, _ := resChJobLabels.Await()
  186. resLBCostPerHr, _ := resChLBCostPerHr.Await()
  187. resLBActiveMins, _ := resChLBActiveMins.Await()
  188. if ctx.HasErrors() {
  189. for _, err := range ctx.Errors() {
  190. log.Errorf("CostModel.ComputeAllocation: %s", err)
  191. }
  192. return allocSet, ctx.ErrorCollection()
  193. }
  194. // We choose to apply allocation before requests in the cases of RAM and
  195. // CPU so that we can assert that allocation should always be greater than
  196. // or equal to request.
  197. applyCPUCoresAllocated(podMap, resCPUCoresAllocated)
  198. applyCPUCoresRequested(podMap, resCPURequests)
  199. applyCPUCoresUsed(podMap, resCPUUsage)
  200. applyRAMBytesAllocated(podMap, resRAMBytesAllocated)
  201. applyRAMBytesRequested(podMap, resRAMRequests)
  202. applyRAMBytesUsed(podMap, resRAMUsage)
  203. applyGPUsRequested(podMap, resGPUsRequested)
  204. applyNetworkAllocation(podMap, resNetZoneGiB, resNetZoneCostPerGiB)
  205. applyNetworkAllocation(podMap, resNetRegionGiB, resNetRegionCostPerGiB)
  206. applyNetworkAllocation(podMap, resNetInternetGiB, resNetInternetCostPerGiB)
  207. namespaceLabels := resToNamespaceLabels(resNamespaceLabels)
  208. podLabels := resToPodLabels(resPodLabels)
  209. namespaceAnnotations := resToNamespaceAnnotations(resNamespaceAnnotations)
  210. podAnnotations := resToPodAnnotations(resPodAnnotations)
  211. applyLabels(podMap, namespaceLabels, podLabels)
  212. applyAnnotations(podMap, namespaceAnnotations, podAnnotations)
  213. serviceLabels := getServiceLabels(resServiceLabels)
  214. allocsByService := map[serviceKey][]*kubecost.Allocation{}
  215. applyServicesToPods(podMap, podLabels, allocsByService, serviceLabels)
  216. podDeploymentMap := labelsToPodControllerMap(podLabels, resToDeploymentLabels(resDeploymentLabels))
  217. podStatefulSetMap := labelsToPodControllerMap(podLabels, resToStatefulSetLabels(resStatefulSetLabels))
  218. podDaemonSetMap := resToPodDaemonSetMap(resDaemonSetLabels)
  219. podJobMap := resToPodJobMap(resJobLabels)
  220. applyControllersToPods(podMap, podDeploymentMap)
  221. applyControllersToPods(podMap, podStatefulSetMap)
  222. applyControllersToPods(podMap, podDaemonSetMap)
  223. applyControllersToPods(podMap, podJobMap)
  224. // TODO breakdown network costs?
  225. // Build out a map of Nodes with resource costs, discounts, and node types
  226. // for converting resource allocation data to cumulative costs.
  227. nodeMap := map[nodeKey]*NodePricing{}
  228. applyNodeCostPerCPUHr(nodeMap, resNodeCostPerCPUHr)
  229. applyNodeCostPerRAMGiBHr(nodeMap, resNodeCostPerRAMGiBHr)
  230. applyNodeCostPerGPUHr(nodeMap, resNodeCostPerGPUHr)
  231. applyNodeSpot(nodeMap, resNodeIsSpot)
  232. applyNodeDiscount(nodeMap, cm)
  233. // Build out the map of all PVs with class, size and cost-per-hour.
  234. // Note: this does not record time running, which we may want to
  235. // include later for increased PV precision. (As long as the PV has
  236. // a PVC, we get time running there, so this is only inaccurate
  237. // for short-lived, unmounted PVs.)
  238. pvMap := map[pvKey]*PV{}
  239. buildPVMap(pvMap, resPVCostPerGiBHour)
  240. applyPVBytes(pvMap, resPVBytes)
  241. // Build out the map of all PVCs with time running, bytes requested,
  242. // and connect to the correct PV from pvMap. (If no PV exists, that
  243. // is noted, but does not result in any allocation/cost.)
  244. pvcMap := map[pvcKey]*PVC{}
  245. buildPVCMap(window, pvcMap, pvMap, resPVCInfo)
  246. applyPVCBytesRequested(pvcMap, resPVCBytesRequested)
  247. // Build out the relationships of pods to their PVCs. This step
  248. // populates the PVC.Count field so that PVC allocation can be
  249. // split appropriately among each pod's container allocation.
  250. podPVCMap := map[podKey][]*PVC{}
  251. buildPodPVCMap(podPVCMap, pvMap, pvcMap, podMap, resPodPVCAllocation)
  252. // Identify unmounted PVs (PVs without PVCs) and add one Allocation per
  253. // cluster representing each cluster's unmounted PVs (if necessary).
  254. applyUnmountedPVs(window, podMap, pvMap, pvcMap)
  255. lbMap := getLoadBalancerCosts(resLBCostPerHr, resLBActiveMins, resolution)
  256. applyLoadBalancersToPods(lbMap, allocsByService)
  257. // (3) Build out AllocationSet from Pod map
  258. for _, pod := range podMap {
  259. for _, alloc := range pod.Allocations {
  260. cluster, _ := alloc.Properties.GetCluster()
  261. nodeName, _ := alloc.Properties.GetNode()
  262. namespace, _ := alloc.Properties.GetNamespace()
  263. pod, _ := alloc.Properties.GetPod()
  264. container, _ := alloc.Properties.GetContainer()
  265. podKey := newPodKey(cluster, namespace, pod)
  266. nodeKey := newNodeKey(cluster, nodeName)
  267. node := cm.getNodePricing(nodeMap, nodeKey)
  268. alloc.CPUCost = alloc.CPUCoreHours * node.CostPerCPUHr
  269. alloc.RAMCost = (alloc.RAMByteHours / 1024 / 1024 / 1024) * node.CostPerRAMGiBHr
  270. alloc.GPUCost = alloc.GPUHours * node.CostPerGPUHr
  271. if pvcs, ok := podPVCMap[podKey]; ok {
  272. for _, pvc := range pvcs {
  273. // Determine the (start, end) of the relationship between the
  274. // given PVC and the associated Allocation so that a precise
  275. // number of hours can be used to compute cumulative cost.
  276. s, e := alloc.Start, alloc.End
  277. if pvc.Start.After(alloc.Start) {
  278. s = pvc.Start
  279. }
  280. if pvc.End.Before(alloc.End) {
  281. e = pvc.End
  282. }
  283. minutes := e.Sub(s).Minutes()
  284. hrs := minutes / 60.0
  285. count := float64(pvc.Count)
  286. if pvc.Count < 1 {
  287. count = 1
  288. }
  289. gib := pvc.Bytes / 1024 / 1024 / 1024
  290. cost := pvc.Volume.CostPerGiBHour * gib * hrs
  291. // Apply the size and cost of the PV to the allocation, each
  292. // weighted by count (i.e. the number of containers in the pod)
  293. alloc.PVByteHours += pvc.Bytes * hrs / count
  294. alloc.PVCost += cost / count
  295. }
  296. }
  297. // Make sure that the name is correct (node may not be present at this
  298. // point due to it missing from queryMinutes) then insert.
  299. alloc.Name = fmt.Sprintf("%s/%s/%s/%s/%s", cluster, nodeName, namespace, pod, container)
  300. allocSet.Set(alloc)
  301. }
  302. }
  303. return allocSet, nil
  304. }
  305. func (cm *CostModel) buildPodMap(window kubecost.Window, resolution, maxBatchSize time.Duration, podMap map[podKey]*Pod, clusterStart, clusterEnd map[string]time.Time) error {
  306. // Assumes that window is positive and closed
  307. start, end := *window.Start(), *window.End()
  308. // Convert resolution duration to a query-ready string
  309. resStr := util.DurationString(resolution)
  310. ctx := prom.NewContext(cm.PrometheusClient)
  311. // Query for (start, end) by (pod, namespace, cluster) over the given
  312. // window, using the given resolution, and if necessary in batches no
  313. // larger than the given maximum batch size. If working in batches, track
  314. // overall progress by starting with (window.start, window.start) and
  315. // querying in batches no larger than maxBatchSize from start-to-end,
  316. // folding each result set into podMap as the results come back.
  317. coverage := kubecost.NewWindow(&start, &start)
  318. numQuery := 1
  319. for coverage.End().Before(end) {
  320. // Determine the (start, end) of the current batch
  321. batchStart := *coverage.End()
  322. batchEnd := coverage.End().Add(maxBatchSize)
  323. if batchEnd.After(end) {
  324. batchEnd = end
  325. }
  326. batchWindow := kubecost.NewWindow(&batchStart, &batchEnd)
  327. var resPods []*prom.QueryResult
  328. var err error
  329. maxTries := 3
  330. numTries := 0
  331. for resPods == nil && numTries < maxTries {
  332. numTries++
  333. // Convert window (start, end) to (duration, offset) for querying Prometheus,
  334. // including handling Thanos offset
  335. durStr, offStr, err := batchWindow.DurationOffsetForPrometheus()
  336. if err != nil || durStr == "" {
  337. // Negative duration, so set empty results and don't query
  338. resPods = []*prom.QueryResult{}
  339. err = nil
  340. break
  341. }
  342. // Submit and profile query
  343. queryPods := fmt.Sprintf(queryFmtPods, durStr, resStr, offStr)
  344. queryProfile := time.Now()
  345. resPods, err = ctx.Query(queryPods).Await()
  346. if err != nil {
  347. log.Profile(queryProfile, fmt.Sprintf("CostModel.ComputeAllocation: pod query %d try %d failed: %s", numQuery, numTries, queryPods))
  348. resPods = nil
  349. }
  350. }
  351. if err != nil {
  352. return err
  353. }
  354. applyPodResults(window, resolution, podMap, clusterStart, clusterEnd, resPods)
  355. coverage = coverage.ExpandEnd(batchEnd)
  356. numQuery++
  357. }
  358. return nil
  359. }
  360. func applyPodResults(window kubecost.Window, resolution time.Duration, podMap map[podKey]*Pod, clusterStart, clusterEnd map[string]time.Time, resPods []*prom.QueryResult) {
  361. for _, res := range resPods {
  362. if len(res.Values) == 0 {
  363. log.Warningf("CostModel.ComputeAllocation: empty minutes result")
  364. continue
  365. }
  366. cluster, err := res.GetString("cluster_id")
  367. if err != nil {
  368. cluster = env.GetClusterID()
  369. }
  370. labels, err := res.GetStrings("namespace", "pod")
  371. if err != nil {
  372. log.Warningf("CostModel.ComputeAllocation: minutes query result missing field: %s", err)
  373. continue
  374. }
  375. namespace := labels["namespace"]
  376. pod := labels["pod"]
  377. key := newPodKey(cluster, namespace, pod)
  378. // allocStart and allocEnd are the timestamps of the first and last
  379. // minutes the pod was running, respectively. We subtract one resolution
  380. // from allocStart because this point will actually represent the end
  381. // of the first minute. We don't subtract from allocEnd because it
  382. // already represents the end of the last minute.
  383. var allocStart, allocEnd time.Time
  384. startAdjustmentCoeff, endAdjustmentCoeff := 1.0, 1.0
  385. for _, datum := range res.Values {
  386. t := time.Unix(int64(datum.Timestamp), 0)
  387. if allocStart.IsZero() && datum.Value > 0 && window.Contains(t) {
  388. // Set the start timestamp to the earliest non-zero timestamp
  389. allocStart = t
  390. // Record adjustment coefficient, i.e. the portion of the start
  391. // timestamp to "ignore". That is, sometimes the value will be
  392. // 0.5, meaning that we should discount the time running by
  393. // half of the resolution the timestamp stands for.
  394. startAdjustmentCoeff = (1.0 - datum.Value)
  395. }
  396. if datum.Value > 0 && window.Contains(t) {
  397. // Set the end timestamp to the latest non-zero timestamp
  398. allocEnd = t
  399. // Record adjustment coefficient, i.e. the portion of the end
  400. // timestamp to "ignore". (See explanation above for start.)
  401. endAdjustmentCoeff = (1.0 - datum.Value)
  402. }
  403. }
  404. if allocStart.IsZero() || allocEnd.IsZero() {
  405. continue
  406. }
  407. // Adjust timestamps according to the resolution and the adjustment
  408. // coefficients, as described above. That is, count the start timestamp
  409. // from the beginning of the resolution, not the end. Then "reduce" the
  410. // start and end by the correct amount, in the case that the "running"
  411. // value of the first or last timestamp was not a full 1.0.
  412. allocStart = allocStart.Add(-resolution)
  413. // Note: the *100 and /100 are necessary because Duration is an int, so
  414. // 0.5, for instance, will be truncated, resulting in no adjustment.
  415. allocStart = allocStart.Add(time.Duration(startAdjustmentCoeff*100) * resolution / time.Duration(100))
  416. allocEnd = allocEnd.Add(-time.Duration(endAdjustmentCoeff*100) * resolution / time.Duration(100))
  417. // If there is only one point with a value <= 0.5 that the start and
  418. // end timestamps both share, then we will enter this case because at
  419. // least half of a resolution will be subtracted from both the start
  420. // and the end. If that is the case, then add back half of each side
  421. // so that the pod is said to run for half a resolution total.
  422. // e.g. For resolution 1m and a value of 0.5 at one timestamp, we'll
  423. // end up with allocEnd == allocStart and each coeff == 0.5. In
  424. // that case, add 0.25m to each side, resulting in 0.5m duration.
  425. if !allocEnd.After(allocStart) {
  426. allocStart = allocStart.Add(-time.Duration(50*startAdjustmentCoeff) * resolution / time.Duration(100))
  427. allocEnd = allocEnd.Add(time.Duration(50*endAdjustmentCoeff) * resolution / time.Duration(100))
  428. }
  429. // Set start if unset or this datum's start time is earlier than the
  430. // current earliest time.
  431. if _, ok := clusterStart[cluster]; !ok || allocStart.Before(clusterStart[cluster]) {
  432. clusterStart[cluster] = allocStart
  433. }
  434. // Set end if unset or this datum's end time is later than the
  435. // current latest time.
  436. if _, ok := clusterEnd[cluster]; !ok || allocEnd.After(clusterEnd[cluster]) {
  437. clusterEnd[cluster] = allocEnd
  438. }
  439. if pod, ok := podMap[key]; ok {
  440. // Pod has already been recorded, so update it accordingly
  441. if allocStart.Before(pod.Start) {
  442. pod.Start = allocStart
  443. }
  444. if allocEnd.After(pod.End) {
  445. pod.End = allocEnd
  446. }
  447. } else {
  448. // Pod has not been recorded yet, so insert it
  449. podMap[key] = &Pod{
  450. Window: window.Clone(),
  451. Start: allocStart,
  452. End: allocEnd,
  453. Key: key,
  454. Allocations: map[string]*kubecost.Allocation{},
  455. }
  456. }
  457. }
  458. }
  459. func applyCPUCoresAllocated(podMap map[podKey]*Pod, resCPUCoresAllocated []*prom.QueryResult) {
  460. for _, res := range resCPUCoresAllocated {
  461. key, err := resultPodKey(res, "cluster_id", "namespace", "pod")
  462. if err != nil {
  463. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU allocation result missing field: %s", err)
  464. continue
  465. }
  466. pod, ok := podMap[key]
  467. if !ok {
  468. continue
  469. }
  470. container, err := res.GetString("container")
  471. if err != nil {
  472. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU allocation query result missing 'container': %s", key)
  473. continue
  474. }
  475. if _, ok := pod.Allocations[container]; !ok {
  476. pod.AppendContainer(container)
  477. }
  478. cpuCores := res.Values[0].Value
  479. hours := pod.Allocations[container].Minutes() / 60.0
  480. pod.Allocations[container].CPUCoreHours = cpuCores * hours
  481. node, err := res.GetString("node")
  482. if err != nil {
  483. log.Warningf("CostModel.ComputeAllocation: CPU allocation query result missing 'node': %s", key)
  484. continue
  485. }
  486. pod.Allocations[container].Properties.SetNode(node)
  487. }
  488. }
  489. func applyCPUCoresRequested(podMap map[podKey]*Pod, resCPUCoresRequested []*prom.QueryResult) {
  490. for _, res := range resCPUCoresRequested {
  491. key, err := resultPodKey(res, "cluster_id", "namespace", "pod")
  492. if err != nil {
  493. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU request result missing field: %s", err)
  494. continue
  495. }
  496. pod, ok := podMap[key]
  497. if !ok {
  498. continue
  499. }
  500. container, err := res.GetString("container")
  501. if err != nil {
  502. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU request query result missing 'container': %s", key)
  503. continue
  504. }
  505. if _, ok := pod.Allocations[container]; !ok {
  506. pod.AppendContainer(container)
  507. }
  508. pod.Allocations[container].CPUCoreRequestAverage = res.Values[0].Value
  509. // If CPU allocation is less than requests, set CPUCoreHours to
  510. // request level.
  511. if pod.Allocations[container].CPUCores() < res.Values[0].Value {
  512. pod.Allocations[container].CPUCoreHours = res.Values[0].Value * (pod.Allocations[container].Minutes() / 60.0)
  513. }
  514. node, err := res.GetString("node")
  515. if err != nil {
  516. log.Warningf("CostModel.ComputeAllocation: CPU request query result missing 'node': %s", key)
  517. continue
  518. }
  519. pod.Allocations[container].Properties.SetNode(node)
  520. }
  521. }
  522. func applyCPUCoresUsed(podMap map[podKey]*Pod, resCPUCoresUsed []*prom.QueryResult) {
  523. for _, res := range resCPUCoresUsed {
  524. key, err := resultPodKey(res, "cluster_id", "namespace", "pod_name")
  525. if err != nil {
  526. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage result missing field: %s", err)
  527. continue
  528. }
  529. pod, ok := podMap[key]
  530. if !ok {
  531. continue
  532. }
  533. container, err := res.GetString("container_name")
  534. if err != nil {
  535. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage query result missing 'container': %s", key)
  536. continue
  537. }
  538. if _, ok := pod.Allocations[container]; !ok {
  539. pod.AppendContainer(container)
  540. }
  541. pod.Allocations[container].CPUCoreUsageAverage = res.Values[0].Value
  542. }
  543. }
  544. func applyRAMBytesAllocated(podMap map[podKey]*Pod, resRAMBytesAllocated []*prom.QueryResult) {
  545. for _, res := range resRAMBytesAllocated {
  546. key, err := resultPodKey(res, "cluster_id", "namespace", "pod")
  547. if err != nil {
  548. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM allocation result missing field: %s", err)
  549. continue
  550. }
  551. pod, ok := podMap[key]
  552. if !ok {
  553. continue
  554. }
  555. container, err := res.GetString("container")
  556. if err != nil {
  557. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM allocation query result missing 'container': %s", key)
  558. continue
  559. }
  560. if _, ok := pod.Allocations[container]; !ok {
  561. pod.AppendContainer(container)
  562. }
  563. ramBytes := res.Values[0].Value
  564. hours := pod.Allocations[container].Minutes() / 60.0
  565. pod.Allocations[container].RAMByteHours = ramBytes * hours
  566. node, err := res.GetString("node")
  567. if err != nil {
  568. log.Warningf("CostModel.ComputeAllocation: RAM allocation query result missing 'node': %s", key)
  569. continue
  570. }
  571. pod.Allocations[container].Properties.SetNode(node)
  572. }
  573. }
  574. func applyRAMBytesRequested(podMap map[podKey]*Pod, resRAMBytesRequested []*prom.QueryResult) {
  575. for _, res := range resRAMBytesRequested {
  576. key, err := resultPodKey(res, "cluster_id", "namespace", "pod")
  577. if err != nil {
  578. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM request result missing field: %s", err)
  579. continue
  580. }
  581. pod, ok := podMap[key]
  582. if !ok {
  583. continue
  584. }
  585. container, err := res.GetString("container")
  586. if err != nil {
  587. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM request query result missing 'container': %s", key)
  588. continue
  589. }
  590. if _, ok := pod.Allocations[container]; !ok {
  591. pod.AppendContainer(container)
  592. }
  593. pod.Allocations[container].RAMBytesRequestAverage = res.Values[0].Value
  594. // If RAM allocation is less than requests, set RAMByteHours to
  595. // request level.
  596. if pod.Allocations[container].RAMBytes() < res.Values[0].Value {
  597. pod.Allocations[container].RAMByteHours = res.Values[0].Value * (pod.Allocations[container].Minutes() / 60.0)
  598. }
  599. node, err := res.GetString("node")
  600. if err != nil {
  601. log.Warningf("CostModel.ComputeAllocation: RAM request query result missing 'node': %s", key)
  602. continue
  603. }
  604. pod.Allocations[container].Properties.SetNode(node)
  605. }
  606. }
  607. func applyRAMBytesUsed(podMap map[podKey]*Pod, resRAMBytesUsed []*prom.QueryResult) {
  608. for _, res := range resRAMBytesUsed {
  609. key, err := resultPodKey(res, "cluster_id", "namespace", "pod_name")
  610. if err != nil {
  611. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM usage result missing field: %s", err)
  612. continue
  613. }
  614. pod, ok := podMap[key]
  615. if !ok {
  616. continue
  617. }
  618. container, err := res.GetString("container_name")
  619. if err != nil {
  620. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM usage query result missing 'container': %s", key)
  621. continue
  622. }
  623. if _, ok := pod.Allocations[container]; !ok {
  624. pod.AppendContainer(container)
  625. }
  626. pod.Allocations[container].RAMBytesUsageAverage = res.Values[0].Value
  627. }
  628. }
  629. func applyGPUsRequested(podMap map[podKey]*Pod, resGPUsRequested []*prom.QueryResult) {
  630. for _, res := range resGPUsRequested {
  631. key, err := resultPodKey(res, "cluster_id", "namespace", "pod")
  632. if err != nil {
  633. log.DedupedWarningf(10, "CostModel.ComputeAllocation: GPU request result missing field: %s", err)
  634. continue
  635. }
  636. pod, ok := podMap[key]
  637. if !ok {
  638. continue
  639. }
  640. container, err := res.GetString("container")
  641. if err != nil {
  642. log.DedupedWarningf(10, "CostModel.ComputeAllocation: GPU request query result missing 'container': %s", key)
  643. continue
  644. }
  645. if _, ok := pod.Allocations[container]; !ok {
  646. pod.AppendContainer(container)
  647. }
  648. hrs := pod.Allocations[container].Minutes() / 60.0
  649. pod.Allocations[container].GPUHours = res.Values[0].Value * hrs
  650. }
  651. }
  652. func applyNetworkAllocation(podMap map[podKey]*Pod, resNetworkGiB []*prom.QueryResult, resNetworkCostPerGiB []*prom.QueryResult) {
  653. costPerGiBByCluster := map[string]float64{}
  654. for _, res := range resNetworkCostPerGiB {
  655. cluster, err := res.GetString("cluster_id")
  656. if err != nil {
  657. cluster = env.GetClusterID()
  658. }
  659. costPerGiBByCluster[cluster] = res.Values[0].Value
  660. }
  661. for _, res := range resNetworkGiB {
  662. podKey, err := resultPodKey(res, "cluster_id", "namespace", "pod_name")
  663. if err != nil {
  664. log.DedupedWarningf(10, "CostModel.ComputeAllocation: Network allocation query result missing field: %s", err)
  665. continue
  666. }
  667. pod, ok := podMap[podKey]
  668. if !ok {
  669. continue
  670. }
  671. for _, alloc := range pod.Allocations {
  672. gib := res.Values[0].Value / float64(len(pod.Allocations))
  673. costPerGiB := costPerGiBByCluster[podKey.Cluster]
  674. alloc.NetworkCost = gib * costPerGiB
  675. }
  676. }
  677. }
  678. func resToNamespaceLabels(resNamespaceLabels []*prom.QueryResult) map[namespaceKey]map[string]string {
  679. namespaceLabels := map[namespaceKey]map[string]string{}
  680. for _, res := range resNamespaceLabels {
  681. nsKey, err := resultNamespaceKey(res, "cluster_id", "namespace")
  682. if err != nil {
  683. continue
  684. }
  685. if _, ok := namespaceLabels[nsKey]; !ok {
  686. namespaceLabels[nsKey] = map[string]string{}
  687. }
  688. for k, l := range res.GetLabels() {
  689. namespaceLabels[nsKey][k] = l
  690. }
  691. }
  692. return namespaceLabels
  693. }
  694. func resToPodLabels(resPodLabels []*prom.QueryResult) map[podKey]map[string]string {
  695. podLabels := map[podKey]map[string]string{}
  696. for _, res := range resPodLabels {
  697. podKey, err := resultPodKey(res, "cluster_id", "namespace", "pod")
  698. if err != nil {
  699. continue
  700. }
  701. if _, ok := podLabels[podKey]; !ok {
  702. podLabels[podKey] = map[string]string{}
  703. }
  704. for k, l := range res.GetLabels() {
  705. podLabels[podKey][k] = l
  706. }
  707. }
  708. return podLabels
  709. }
  710. func resToNamespaceAnnotations(resNamespaceAnnotations []*prom.QueryResult) map[string]map[string]string {
  711. namespaceAnnotations := map[string]map[string]string{}
  712. for _, res := range resNamespaceAnnotations {
  713. namespace, err := res.GetString("namespace")
  714. if err != nil {
  715. continue
  716. }
  717. if _, ok := namespaceAnnotations[namespace]; !ok {
  718. namespaceAnnotations[namespace] = map[string]string{}
  719. }
  720. for k, l := range res.GetAnnotations() {
  721. namespaceAnnotations[namespace][k] = l
  722. }
  723. }
  724. return namespaceAnnotations
  725. }
  726. func resToPodAnnotations(resPodAnnotations []*prom.QueryResult) map[podKey]map[string]string {
  727. podAnnotations := map[podKey]map[string]string{}
  728. for _, res := range resPodAnnotations {
  729. podKey, err := resultPodKey(res, "cluster_id", "namespace", "pod")
  730. if err != nil {
  731. continue
  732. }
  733. if _, ok := podAnnotations[podKey]; !ok {
  734. podAnnotations[podKey] = map[string]string{}
  735. }
  736. for k, l := range res.GetAnnotations() {
  737. podAnnotations[podKey][k] = l
  738. }
  739. }
  740. return podAnnotations
  741. }
  742. func applyLabels(podMap map[podKey]*Pod, namespaceLabels map[namespaceKey]map[string]string, podLabels map[podKey]map[string]string) {
  743. for podKey, pod := range podMap {
  744. for _, alloc := range pod.Allocations {
  745. allocLabels, err := alloc.Properties.GetLabels()
  746. if err != nil {
  747. allocLabels = map[string]string{}
  748. }
  749. // Apply namespace labels first, then pod labels so that pod labels
  750. // overwrite namespace labels.
  751. nsKey := newNamespaceKey(podKey.Cluster, podKey.Namespace)
  752. if labels, ok := namespaceLabels[nsKey]; ok {
  753. for k, v := range labels {
  754. allocLabels[k] = v
  755. }
  756. }
  757. if labels, ok := podLabels[podKey]; ok {
  758. for k, v := range labels {
  759. allocLabels[k] = v
  760. }
  761. }
  762. alloc.Properties.SetLabels(allocLabels)
  763. }
  764. }
  765. }
  766. func applyAnnotations(podMap map[podKey]*Pod, namespaceAnnotations map[string]map[string]string, podAnnotations map[podKey]map[string]string) {
  767. for key, pod := range podMap {
  768. for _, alloc := range pod.Allocations {
  769. allocAnnotations, err := alloc.Properties.GetAnnotations()
  770. if err != nil {
  771. allocAnnotations = map[string]string{}
  772. }
  773. // Apply namespace annotations first, then pod annotations so that
  774. // pod labels overwrite namespace labels.
  775. if labels, ok := namespaceAnnotations[key.Namespace]; ok {
  776. for k, v := range labels {
  777. allocAnnotations[k] = v
  778. }
  779. }
  780. if labels, ok := podAnnotations[key]; ok {
  781. for k, v := range labels {
  782. allocAnnotations[k] = v
  783. }
  784. }
  785. alloc.Properties.SetAnnotations(allocAnnotations)
  786. }
  787. }
  788. }
  789. func getServiceLabels(resServiceLabels []*prom.QueryResult) map[serviceKey]map[string]string {
  790. serviceLabels := map[serviceKey]map[string]string{}
  791. for _, res := range resServiceLabels {
  792. serviceKey, err := resultServiceKey(res, "cluster_id", "namespace", "service")
  793. if err != nil {
  794. continue
  795. }
  796. if _, ok := serviceLabels[serviceKey]; !ok {
  797. serviceLabels[serviceKey] = map[string]string{}
  798. }
  799. for k, l := range res.GetLabels() {
  800. serviceLabels[serviceKey][k] = l
  801. }
  802. }
  803. // Prune duplicate services. That is, if the same service exists with
  804. // hyphens instead of underscores, keep the one that uses hyphens.
  805. for key := range serviceLabels {
  806. if strings.Contains(key.Service, "_") {
  807. duplicateService := strings.Replace(key.Service, "_", "-", -1)
  808. duplicateKey := newServiceKey(key.Cluster, key.Namespace, duplicateService)
  809. if _, ok := serviceLabels[duplicateKey]; ok {
  810. delete(serviceLabels, key)
  811. }
  812. }
  813. }
  814. return serviceLabels
  815. }
  816. func resToDeploymentLabels(resDeploymentLabels []*prom.QueryResult) map[controllerKey]map[string]string {
  817. deploymentLabels := map[controllerKey]map[string]string{}
  818. for _, res := range resDeploymentLabels {
  819. controllerKey, err := resultDeploymentKey(res, "cluster_id", "namespace", "deployment")
  820. if err != nil {
  821. continue
  822. }
  823. if _, ok := deploymentLabels[controllerKey]; !ok {
  824. deploymentLabels[controllerKey] = map[string]string{}
  825. }
  826. for k, l := range res.GetLabels() {
  827. deploymentLabels[controllerKey][k] = l
  828. }
  829. }
  830. // Prune duplicate deployments. That is, if the same deployment exists with
  831. // hyphens instead of underscores, keep the one that uses hyphens.
  832. for key := range deploymentLabels {
  833. if strings.Contains(key.Controller, "_") {
  834. duplicateController := strings.Replace(key.Controller, "_", "-", -1)
  835. duplicateKey := newControllerKey(key.Cluster, key.Namespace, key.ControllerKind, duplicateController)
  836. if _, ok := deploymentLabels[duplicateKey]; ok {
  837. delete(deploymentLabels, key)
  838. }
  839. }
  840. }
  841. return deploymentLabels
  842. }
  843. func resToStatefulSetLabels(resStatefulSetLabels []*prom.QueryResult) map[controllerKey]map[string]string {
  844. statefulSetLabels := map[controllerKey]map[string]string{}
  845. for _, res := range resStatefulSetLabels {
  846. controllerKey, err := resultStatefulSetKey(res, "cluster_id", "namespace", "statefulSet")
  847. if err != nil {
  848. continue
  849. }
  850. if _, ok := statefulSetLabels[controllerKey]; !ok {
  851. statefulSetLabels[controllerKey] = map[string]string{}
  852. }
  853. for k, l := range res.GetLabels() {
  854. statefulSetLabels[controllerKey][k] = l
  855. }
  856. }
  857. // Prune duplicate stateful sets. That is, if the same stateful set exists
  858. // with hyphens instead of underscores, keep the one that uses hyphens.
  859. for key := range statefulSetLabels {
  860. if strings.Contains(key.Controller, "_") {
  861. duplicateController := strings.Replace(key.Controller, "_", "-", -1)
  862. duplicateKey := newControllerKey(key.Cluster, key.Namespace, key.ControllerKind, duplicateController)
  863. if _, ok := statefulSetLabels[duplicateKey]; ok {
  864. delete(statefulSetLabels, key)
  865. }
  866. }
  867. }
  868. return statefulSetLabels
  869. }
  870. func labelsToPodControllerMap(podLabels map[podKey]map[string]string, controllerLabels map[controllerKey]map[string]string) map[podKey]controllerKey {
  871. podControllerMap := map[podKey]controllerKey{}
  872. // For each controller, turn the labels into a selector and attempt to
  873. // match it with each set of pod labels. A match indicates that the pod
  874. // belongs to the controller.
  875. for cKey, cLabels := range controllerLabels {
  876. selector := labels.Set(cLabels).AsSelectorPreValidated()
  877. for pKey, pLabels := range podLabels {
  878. // If the pod is in a different cluster or namespace, there is
  879. // no need to compare the labels.
  880. if cKey.Cluster != pKey.Cluster || cKey.Namespace != pKey.Namespace {
  881. continue
  882. }
  883. podLabelSet := labels.Set(pLabels)
  884. if selector.Matches(podLabelSet) {
  885. if _, ok := podControllerMap[pKey]; ok {
  886. log.DedupedWarningf(5, "CostModel.ComputeAllocation: PodControllerMap match already exists: %s matches %s and %s", pKey, podControllerMap[pKey], cKey)
  887. }
  888. podControllerMap[pKey] = cKey
  889. }
  890. }
  891. }
  892. return podControllerMap
  893. }
  894. func resToPodDaemonSetMap(resDaemonSetLabels []*prom.QueryResult) map[podKey]controllerKey {
  895. daemonSetLabels := map[podKey]controllerKey{}
  896. for _, res := range resDaemonSetLabels {
  897. controllerKey, err := resultDaemonSetKey(res, "cluster_id", "namespace", "owner_name")
  898. if err != nil {
  899. continue
  900. }
  901. pod, err := res.GetString("pod")
  902. if err != nil {
  903. log.Warningf("CostModel.ComputeAllocation: DaemonSetLabel result without pod: %s", controllerKey)
  904. }
  905. podKey := newPodKey(controllerKey.Cluster, controllerKey.Namespace, pod)
  906. daemonSetLabels[podKey] = controllerKey
  907. }
  908. return daemonSetLabels
  909. }
  910. func resToPodJobMap(resJobLabels []*prom.QueryResult) map[podKey]controllerKey {
  911. jobLabels := map[podKey]controllerKey{}
  912. for _, res := range resJobLabels {
  913. controllerKey, err := resultJobKey(res, "cluster_id", "namespace", "owner_name")
  914. if err != nil {
  915. continue
  916. }
  917. // Convert the name of Jobs generated by CronJobs to the name of the
  918. // CronJob by stripping the timestamp off the end.
  919. match := isCron.FindStringSubmatch(controllerKey.Controller)
  920. if match != nil {
  921. controllerKey.Controller = match[1]
  922. }
  923. pod, err := res.GetString("pod")
  924. if err != nil {
  925. log.Warningf("CostModel.ComputeAllocation: JobLabel result without pod: %s", controllerKey)
  926. }
  927. podKey := newPodKey(controllerKey.Cluster, controllerKey.Namespace, pod)
  928. jobLabels[podKey] = controllerKey
  929. }
  930. return jobLabels
  931. }
  932. func applyServicesToPods(podMap map[podKey]*Pod, podLabels map[podKey]map[string]string, allocsByService map[serviceKey][]*kubecost.Allocation, serviceLabels map[serviceKey]map[string]string) {
  933. podServicesMap := map[podKey][]serviceKey{}
  934. // For each service, turn the labels into a selector and attempt to
  935. // match it with each set of pod labels. A match indicates that the pod
  936. // belongs to the service.
  937. for sKey, sLabels := range serviceLabels {
  938. selector := labels.Set(sLabels).AsSelectorPreValidated()
  939. for pKey, pLabels := range podLabels {
  940. // If the pod is in a different cluster or namespace, there is
  941. // no need to compare the labels.
  942. if sKey.Cluster != pKey.Cluster || sKey.Namespace != pKey.Namespace {
  943. continue
  944. }
  945. podLabelSet := labels.Set(pLabels)
  946. if selector.Matches(podLabelSet) {
  947. if _, ok := podServicesMap[pKey]; !ok {
  948. podServicesMap[pKey] = []serviceKey{}
  949. }
  950. podServicesMap[pKey] = append(podServicesMap[pKey], sKey)
  951. }
  952. }
  953. }
  954. // For each allocation in each pod, attempt to find and apply the list of
  955. // services associated with the allocation's pod.
  956. for key, pod := range podMap {
  957. for _, alloc := range pod.Allocations {
  958. if sKeys, ok := podServicesMap[key]; ok {
  959. services := []string{}
  960. for _, sKey := range sKeys {
  961. services = append(services, sKey.Service)
  962. allocsByService[sKey] = append(allocsByService[sKey], alloc)
  963. }
  964. alloc.Properties.SetServices(services)
  965. }
  966. }
  967. }
  968. }
  969. func applyControllersToPods(podMap map[podKey]*Pod, podControllerMap map[podKey]controllerKey) {
  970. for key, pod := range podMap {
  971. for _, alloc := range pod.Allocations {
  972. if controllerKey, ok := podControllerMap[key]; ok {
  973. alloc.Properties.SetControllerKind(controllerKey.ControllerKind)
  974. alloc.Properties.SetController(controllerKey.Controller)
  975. }
  976. }
  977. }
  978. }
  979. func applyNodeCostPerCPUHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerCPUHr []*prom.QueryResult) {
  980. for _, res := range resNodeCostPerCPUHr {
  981. cluster, err := res.GetString("cluster_id")
  982. if err != nil {
  983. cluster = env.GetClusterID()
  984. }
  985. node, err := res.GetString("node")
  986. if err != nil {
  987. log.Warningf("CostModel.ComputeAllocation: Node CPU cost query result missing field: %s", err)
  988. continue
  989. }
  990. instanceType, err := res.GetString("instance_type")
  991. if err != nil {
  992. log.Warningf("CostModel.ComputeAllocation: Node CPU cost query result missing field: %s", err)
  993. continue
  994. }
  995. key := newNodeKey(cluster, node)
  996. if _, ok := nodeMap[key]; !ok {
  997. nodeMap[key] = &NodePricing{
  998. Name: node,
  999. NodeType: instanceType,
  1000. }
  1001. }
  1002. nodeMap[key].CostPerCPUHr = res.Values[0].Value
  1003. }
  1004. }
  1005. func applyNodeCostPerRAMGiBHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerRAMGiBHr []*prom.QueryResult) {
  1006. for _, res := range resNodeCostPerRAMGiBHr {
  1007. cluster, err := res.GetString("cluster_id")
  1008. if err != nil {
  1009. cluster = env.GetClusterID()
  1010. }
  1011. node, err := res.GetString("node")
  1012. if err != nil {
  1013. log.Warningf("CostModel.ComputeAllocation: Node RAM cost query result missing field: %s", err)
  1014. continue
  1015. }
  1016. instanceType, err := res.GetString("instance_type")
  1017. if err != nil {
  1018. log.Warningf("CostModel.ComputeAllocation: Node RAM cost query result missing field: %s", err)
  1019. continue
  1020. }
  1021. key := newNodeKey(cluster, node)
  1022. if _, ok := nodeMap[key]; !ok {
  1023. nodeMap[key] = &NodePricing{
  1024. Name: node,
  1025. NodeType: instanceType,
  1026. }
  1027. }
  1028. nodeMap[key].CostPerRAMGiBHr = res.Values[0].Value
  1029. }
  1030. }
  1031. func applyNodeCostPerGPUHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerGPUHr []*prom.QueryResult) {
  1032. for _, res := range resNodeCostPerGPUHr {
  1033. cluster, err := res.GetString("cluster_id")
  1034. if err != nil {
  1035. cluster = env.GetClusterID()
  1036. }
  1037. node, err := res.GetString("node")
  1038. if err != nil {
  1039. log.Warningf("CostModel.ComputeAllocation: Node GPU cost query result missing field: %s", err)
  1040. continue
  1041. }
  1042. instanceType, err := res.GetString("instance_type")
  1043. if err != nil {
  1044. log.Warningf("CostModel.ComputeAllocation: Node GPU cost query result missing field: %s", err)
  1045. continue
  1046. }
  1047. key := newNodeKey(cluster, node)
  1048. if _, ok := nodeMap[key]; !ok {
  1049. nodeMap[key] = &NodePricing{
  1050. Name: node,
  1051. NodeType: instanceType,
  1052. }
  1053. }
  1054. nodeMap[key].CostPerGPUHr = res.Values[0].Value
  1055. }
  1056. }
  1057. func applyNodeSpot(nodeMap map[nodeKey]*NodePricing, resNodeIsSpot []*prom.QueryResult) {
  1058. for _, res := range resNodeIsSpot {
  1059. cluster, err := res.GetString("cluster_id")
  1060. if err != nil {
  1061. cluster = env.GetClusterID()
  1062. }
  1063. node, err := res.GetString("node")
  1064. if err != nil {
  1065. log.Warningf("CostModel.ComputeAllocation: Node spot query result missing field: %s", err)
  1066. continue
  1067. }
  1068. key := newNodeKey(cluster, node)
  1069. if _, ok := nodeMap[key]; !ok {
  1070. log.Warningf("CostModel.ComputeAllocation: Node spot query result for missing node: %s", key)
  1071. continue
  1072. }
  1073. nodeMap[key].Preemptible = res.Values[0].Value > 0
  1074. }
  1075. }
  1076. func applyNodeDiscount(nodeMap map[nodeKey]*NodePricing, cm *CostModel) {
  1077. if cm == nil {
  1078. return
  1079. }
  1080. c, err := cm.Provider.GetConfig()
  1081. if err != nil {
  1082. log.Errorf("CostModel.ComputeAllocation: applyNodeDiscount: %s", err)
  1083. return
  1084. }
  1085. discount, err := ParsePercentString(c.Discount)
  1086. if err != nil {
  1087. log.Errorf("CostModel.ComputeAllocation: applyNodeDiscount: %s", err)
  1088. return
  1089. }
  1090. negotiatedDiscount, err := ParsePercentString(c.NegotiatedDiscount)
  1091. if err != nil {
  1092. log.Errorf("CostModel.ComputeAllocation: applyNodeDiscount: %s", err)
  1093. return
  1094. }
  1095. for _, node := range nodeMap {
  1096. // TODO GKE Reserved Instances into account
  1097. node.Discount = cm.Provider.CombinedDiscountForNode(node.NodeType, node.Preemptible, discount, negotiatedDiscount)
  1098. node.CostPerCPUHr *= (1.0 - node.Discount)
  1099. node.CostPerRAMGiBHr *= (1.0 - node.Discount)
  1100. }
  1101. }
  1102. func buildPVMap(pvMap map[pvKey]*PV, resPVCostPerGiBHour []*prom.QueryResult) {
  1103. for _, res := range resPVCostPerGiBHour {
  1104. cluster, err := res.GetString("cluster_id")
  1105. if err != nil {
  1106. cluster = env.GetClusterID()
  1107. }
  1108. name, err := res.GetString("volumename")
  1109. if err != nil {
  1110. log.Warningf("CostModel.ComputeAllocation: PV cost without volumename")
  1111. continue
  1112. }
  1113. key := newPVKey(cluster, name)
  1114. pvMap[key] = &PV{
  1115. Cluster: cluster,
  1116. Name: name,
  1117. CostPerGiBHour: res.Values[0].Value,
  1118. }
  1119. }
  1120. }
  1121. func applyPVBytes(pvMap map[pvKey]*PV, resPVBytes []*prom.QueryResult) {
  1122. for _, res := range resPVBytes {
  1123. key, err := resultPVKey(res, "cluster_id", "persistentvolume")
  1124. if err != nil {
  1125. log.Warningf("CostModel.ComputeAllocation: PV bytes query result missing field: %s", err)
  1126. continue
  1127. }
  1128. if _, ok := pvMap[key]; !ok {
  1129. log.Warningf("CostModel.ComputeAllocation: PV bytes result for missing PV: %s", err)
  1130. continue
  1131. }
  1132. pvMap[key].Bytes = res.Values[0].Value
  1133. }
  1134. }
  1135. func buildPVCMap(window kubecost.Window, pvcMap map[pvcKey]*PVC, pvMap map[pvKey]*PV, resPVCInfo []*prom.QueryResult) {
  1136. for _, res := range resPVCInfo {
  1137. cluster, err := res.GetString("cluster_id")
  1138. if err != nil {
  1139. cluster = env.GetClusterID()
  1140. }
  1141. values, err := res.GetStrings("persistentvolumeclaim", "storageclass", "volumename", "namespace")
  1142. if err != nil {
  1143. log.DedupedWarningf(10, "CostModel.ComputeAllocation: PVC info query result missing field: %s", err)
  1144. continue
  1145. }
  1146. namespace := values["namespace"]
  1147. name := values["persistentvolumeclaim"]
  1148. volume := values["volumename"]
  1149. storageClass := values["storageclass"]
  1150. pvKey := newPVKey(cluster, volume)
  1151. pvcKey := newPVCKey(cluster, namespace, name)
  1152. // pvcStart and pvcEnd are the timestamps of the first and last minutes
  1153. // the PVC was running, respectively. We subtract 1m from pvcStart
  1154. // because this point will actually represent the end of the first
  1155. // minute. We don't subtract from pvcEnd because it already represents
  1156. // the end of the last minute.
  1157. var pvcStart, pvcEnd time.Time
  1158. for _, datum := range res.Values {
  1159. t := time.Unix(int64(datum.Timestamp), 0)
  1160. if pvcStart.IsZero() && datum.Value > 0 && window.Contains(t) {
  1161. pvcStart = t
  1162. }
  1163. if datum.Value > 0 && window.Contains(t) {
  1164. pvcEnd = t
  1165. }
  1166. }
  1167. if pvcStart.IsZero() || pvcEnd.IsZero() {
  1168. log.Warningf("CostModel.ComputeAllocation: PVC %s has no running time", pvcKey)
  1169. }
  1170. pvcStart = pvcStart.Add(-time.Minute)
  1171. if _, ok := pvMap[pvKey]; !ok {
  1172. continue
  1173. }
  1174. pvMap[pvKey].StorageClass = storageClass
  1175. if _, ok := pvcMap[pvcKey]; !ok {
  1176. pvcMap[pvcKey] = &PVC{}
  1177. }
  1178. pvcMap[pvcKey].Name = name
  1179. pvcMap[pvcKey].Namespace = namespace
  1180. pvcMap[pvcKey].Volume = pvMap[pvKey]
  1181. pvcMap[pvcKey].Start = pvcStart
  1182. pvcMap[pvcKey].End = pvcEnd
  1183. }
  1184. }
  1185. func applyPVCBytesRequested(pvcMap map[pvcKey]*PVC, resPVCBytesRequested []*prom.QueryResult) {
  1186. for _, res := range resPVCBytesRequested {
  1187. key, err := resultPVCKey(res, "cluster_id", "namespace", "persistentvolumeclaim")
  1188. if err != nil {
  1189. continue
  1190. }
  1191. if _, ok := pvcMap[key]; !ok {
  1192. continue
  1193. }
  1194. pvcMap[key].Bytes = res.Values[0].Value
  1195. }
  1196. }
  1197. func buildPodPVCMap(podPVCMap map[podKey][]*PVC, pvMap map[pvKey]*PV, pvcMap map[pvcKey]*PVC, podMap map[podKey]*Pod, resPodPVCAllocation []*prom.QueryResult) {
  1198. for _, res := range resPodPVCAllocation {
  1199. cluster, err := res.GetString("cluster_id")
  1200. if err != nil {
  1201. cluster = env.GetClusterID()
  1202. }
  1203. values, err := res.GetStrings("persistentvolume", "persistentvolumeclaim", "pod", "namespace")
  1204. if err != nil {
  1205. log.DedupedWarningf(5, "CostModel.ComputeAllocation: PVC allocation query result missing field: %s", err)
  1206. continue
  1207. }
  1208. namespace := values["namespace"]
  1209. pod := values["pod"]
  1210. name := values["persistentvolumeclaim"]
  1211. volume := values["persistentvolume"]
  1212. podKey := newPodKey(cluster, namespace, pod)
  1213. pvKey := newPVKey(cluster, volume)
  1214. pvcKey := newPVCKey(cluster, namespace, name)
  1215. if _, ok := pvMap[pvKey]; !ok {
  1216. log.DedupedWarningf(5, "CostModel.ComputeAllocation: PV missing for PVC allocation query result: %s", pvKey)
  1217. continue
  1218. }
  1219. if _, ok := podPVCMap[podKey]; !ok {
  1220. podPVCMap[podKey] = []*PVC{}
  1221. }
  1222. pvc, ok := pvcMap[pvcKey]
  1223. if !ok {
  1224. log.DedupedWarningf(5, "CostModel.ComputeAllocation: PVC missing for PVC allocation query: %s", pvcKey)
  1225. continue
  1226. }
  1227. count := 1
  1228. if pod, ok := podMap[podKey]; ok && len(pod.Allocations) > 0 {
  1229. count = len(pod.Allocations)
  1230. } else {
  1231. log.DedupedWarningf(10, "CostModel.ComputeAllocation: PVC %s for missing pod %s", pvcKey, podKey)
  1232. }
  1233. pvc.Count = count
  1234. pvc.Mounted = true
  1235. podPVCMap[podKey] = append(podPVCMap[podKey], pvc)
  1236. }
  1237. }
  1238. func applyUnmountedPVs(window kubecost.Window, podMap map[podKey]*Pod, pvMap map[pvKey]*PV, pvcMap map[pvcKey]*PVC) {
  1239. unmountedPVBytes := map[string]float64{}
  1240. unmountedPVCost := map[string]float64{}
  1241. for _, pv := range pvMap {
  1242. mounted := false
  1243. for _, pvc := range pvcMap {
  1244. if pvc.Volume == nil {
  1245. continue
  1246. }
  1247. if pvc.Volume == pv {
  1248. mounted = true
  1249. break
  1250. }
  1251. }
  1252. if !mounted {
  1253. gib := pv.Bytes / 1024 / 1024 / 1024
  1254. hrs := window.Minutes() / 60.0 // TODO improve with PV hours, not window hours
  1255. cost := pv.CostPerGiBHour * gib * hrs
  1256. unmountedPVCost[pv.Cluster] += cost
  1257. unmountedPVBytes[pv.Cluster] += pv.Bytes
  1258. }
  1259. }
  1260. for cluster, amount := range unmountedPVCost {
  1261. container := kubecost.UnmountedSuffix
  1262. pod := kubecost.UnmountedSuffix
  1263. namespace := kubecost.UnmountedSuffix
  1264. node := ""
  1265. key := newPodKey(cluster, namespace, pod)
  1266. podMap[key] = &Pod{
  1267. Window: window.Clone(),
  1268. Start: *window.Start(),
  1269. End: *window.End(),
  1270. Key: key,
  1271. Allocations: map[string]*kubecost.Allocation{},
  1272. }
  1273. podMap[key].AppendContainer(container)
  1274. podMap[key].Allocations[container].Properties.SetCluster(cluster)
  1275. podMap[key].Allocations[container].Properties.SetNode(node)
  1276. podMap[key].Allocations[container].Properties.SetNamespace(namespace)
  1277. podMap[key].Allocations[container].Properties.SetPod(pod)
  1278. podMap[key].Allocations[container].Properties.SetContainer(container)
  1279. podMap[key].Allocations[container].PVByteHours = unmountedPVBytes[cluster] * window.Minutes() / 60.0
  1280. podMap[key].Allocations[container].PVCost = amount
  1281. }
  1282. }
  1283. func applyUnmountedPVCs(window kubecost.Window, podMap map[podKey]*Pod, pvcMap map[pvcKey]*PVC) {
  1284. unmountedPVCBytes := map[namespaceKey]float64{}
  1285. unmountedPVCCost := map[namespaceKey]float64{}
  1286. for _, pvc := range pvcMap {
  1287. if !pvc.Mounted && pvc.Volume != nil {
  1288. key := newNamespaceKey(pvc.Cluster, pvc.Namespace)
  1289. gib := pvc.Volume.Bytes / 1024 / 1024 / 1024
  1290. hrs := pvc.Minutes() / 60.0
  1291. cost := pvc.Volume.CostPerGiBHour * gib * hrs
  1292. unmountedPVCCost[key] += cost
  1293. unmountedPVCBytes[key] += pvc.Volume.Bytes
  1294. }
  1295. }
  1296. for key, amount := range unmountedPVCCost {
  1297. container := kubecost.UnmountedSuffix
  1298. pod := kubecost.UnmountedSuffix
  1299. namespace := key.Namespace
  1300. node := ""
  1301. cluster := key.Cluster
  1302. podKey := newPodKey(cluster, namespace, pod)
  1303. podMap[podKey] = &Pod{
  1304. Window: window.Clone(),
  1305. Start: *window.Start(),
  1306. End: *window.End(),
  1307. Key: podKey,
  1308. Allocations: map[string]*kubecost.Allocation{},
  1309. }
  1310. podMap[podKey].AppendContainer(container)
  1311. podMap[podKey].Allocations[container].Properties.SetCluster(cluster)
  1312. podMap[podKey].Allocations[container].Properties.SetNode(node)
  1313. podMap[podKey].Allocations[container].Properties.SetNamespace(namespace)
  1314. podMap[podKey].Allocations[container].Properties.SetPod(pod)
  1315. podMap[podKey].Allocations[container].Properties.SetContainer(container)
  1316. podMap[podKey].Allocations[container].PVByteHours = unmountedPVCBytes[key] * window.Minutes() / 60.0
  1317. podMap[podKey].Allocations[container].PVCost = amount
  1318. }
  1319. }
  // LB holds the cumulative cost of a Load Balancer together with the start and
  // end of the period over which that cost was incurred.
  type LB struct {
  	// TotalCost is the hourly cost multiplied by the hours between Start and
  	// End.
  	TotalCost float64
  	// Start and End bound the period during which the Load Balancer was
  	// observed.
  	Start, End time.Time
  }
  1326. func getLoadBalancerCosts(resLBCost, resLBActiveMins []*prom.QueryResult, resolution time.Duration) map[serviceKey]*LB {
  1327. lbMap := make(map[serviceKey]*LB)
  1328. lbHourlyCosts := make(map[serviceKey]float64)
  1329. for _, res := range resLBCost {
  1330. serviceKey, err := resultServiceKey(res, "cluster_id", "namespace", "service_name")
  1331. if err != nil {
  1332. continue
  1333. }
  1334. lbHourlyCosts[serviceKey] = res.Values[0].Value
  1335. }
  1336. for _, res := range resLBActiveMins {
  1337. serviceKey, err := resultServiceKey(res, "cluster_id", "namespace", "service_name")
  1338. if err != nil || len(res.Values) == 0 {
  1339. continue
  1340. }
  1341. if _, ok := lbHourlyCosts[serviceKey]; !ok {
  1342. log.Warningf("CostModel: failed to find hourly cost for Load Balancer: %v", serviceKey)
  1343. continue
  1344. }
  1345. s := time.Unix(int64(res.Values[0].Timestamp), 0)
  1346. // subtract resolution from start time to cover full time period
  1347. s = s.Add(-resolution)
  1348. e := time.Unix(int64(res.Values[len(res.Values)-1].Timestamp), 0)
  1349. hours := e.Sub(s).Hours()
  1350. lbMap[serviceKey] = &LB{
  1351. TotalCost: lbHourlyCosts[serviceKey] * hours,
  1352. Start: s,
  1353. End: e,
  1354. }
  1355. }
  1356. return lbMap
  1357. }
  1358. func applyLoadBalancersToPods(lbMap map[serviceKey]*LB, allocsByService map[serviceKey][]*kubecost.Allocation) {
  1359. for sKey, lb := range lbMap {
  1360. totalHours := 0.0
  1361. allocHours := make(map[*kubecost.Allocation]float64)
  1362. // Add portion of load balancing cost to each allocation
  1363. // proportional to the total number of hours allocations used the load balancer
  1364. for _, alloc := range allocsByService[sKey] {
  1365. // Determine the (start, end) of the relationship between the
  1366. // given LB and the associated Allocation so that a precise
  1367. // number of hours can be used to compute cumulative cost.
  1368. s, e := alloc.Start, alloc.End
  1369. if lb.Start.After(alloc.Start) {
  1370. s = lb.Start
  1371. }
  1372. if lb.End.Before(alloc.End) {
  1373. e = lb.End
  1374. }
  1375. hours := e.Sub(s).Hours()
  1376. // A negative number of hours signifies no overlap between the windows
  1377. if hours > 0 {
  1378. totalHours += hours
  1379. allocHours[alloc] = hours
  1380. }
  1381. }
  1382. // Distribute cost of service once total hours is calculated
  1383. for alloc, hours := range allocHours {
  1384. alloc.LoadBalancerCost += lb.TotalCost * hours / totalHours
  1385. }
  1386. }
  1387. }
  1388. // getNodePricing determines node pricing, given a key and a mapping from keys
  1389. // to their NodePricing instances, as well as the custom pricing configuration
  1390. // inherent to the CostModel instance. If custom pricing is set, use that. If
  1391. // not, use the pricing defined by the given key. If that doesn't exist, fall
  1392. // back on custom pricing as a default.
  1393. func (cm *CostModel) getNodePricing(nodeMap map[nodeKey]*NodePricing, nodeKey nodeKey) *NodePricing {
  1394. // Find the relevant NodePricing, if it exists. If not, substitute the
  1395. // custom NodePricing as a default.
  1396. node, ok := nodeMap[nodeKey]
  1397. if !ok || node == nil {
  1398. if nodeKey.Node != "" {
  1399. log.DedupedWarningf(5, "CostModel: failed to find node for %s", nodeKey)
  1400. }
  1401. return cm.getCustomNodePricing(false)
  1402. }
  1403. // If custom pricing is enabled and can be retrieved, override detected
  1404. // node pricing with the custom values.
  1405. customPricingConfig, err := cm.Provider.GetConfig()
  1406. if err != nil {
  1407. log.Warningf("CostModel: failed to load custom pricing: %s", err)
  1408. }
  1409. if cloud.CustomPricesEnabled(cm.Provider) && customPricingConfig != nil {
  1410. return cm.getCustomNodePricing(node.Preemptible)
  1411. }
  1412. node.Source = "prometheus"
  1413. // If any of the values are NaN or zero, replace them with the custom
  1414. // values as default.
  1415. // TODO:CLEANUP can't we parse these custom prices once? why do we store
  1416. // them as strings like this?
  1417. if node.CostPerCPUHr == 0 || math.IsNaN(node.CostPerCPUHr) {
  1418. log.Warningf("CostModel: node pricing has illegal CostPerCPUHr; replacing with custom pricing: %s", nodeKey)
  1419. cpuCostStr := customPricingConfig.CPU
  1420. if node.Preemptible {
  1421. cpuCostStr = customPricingConfig.SpotCPU
  1422. }
  1423. costPerCPUHr, err := strconv.ParseFloat(cpuCostStr, 64)
  1424. if err != nil {
  1425. log.Warningf("CostModel: custom pricing has illegal CPU cost: %s", cpuCostStr)
  1426. }
  1427. node.CostPerCPUHr = costPerCPUHr
  1428. node.Source += "/customCPU"
  1429. }
  1430. if math.IsNaN(node.CostPerGPUHr) {
  1431. log.Warningf("CostModel: node pricing has illegal CostPerGPUHr; replacing with custom pricing: %s", nodeKey)
  1432. gpuCostStr := customPricingConfig.GPU
  1433. if node.Preemptible {
  1434. gpuCostStr = customPricingConfig.SpotGPU
  1435. }
  1436. costPerGPUHr, err := strconv.ParseFloat(gpuCostStr, 64)
  1437. if err != nil {
  1438. log.Warningf("CostModel: custom pricing has illegal GPU cost: %s", gpuCostStr)
  1439. }
  1440. node.CostPerGPUHr = costPerGPUHr
  1441. node.Source += "/customGPU"
  1442. }
  1443. if node.CostPerRAMGiBHr == 0 || math.IsNaN(node.CostPerRAMGiBHr) {
  1444. log.Warningf("CostModel: node pricing has illegal CostPerRAMHr; replacing with custom pricing: %s", nodeKey)
  1445. ramCostStr := customPricingConfig.RAM
  1446. if node.Preemptible {
  1447. ramCostStr = customPricingConfig.SpotRAM
  1448. }
  1449. costPerRAMHr, err := strconv.ParseFloat(ramCostStr, 64)
  1450. if err != nil {
  1451. log.Warningf("CostModel: custom pricing has illegal RAM cost: %s", ramCostStr)
  1452. }
  1453. node.CostPerRAMGiBHr = costPerRAMHr
  1454. node.Source += "/customRAM"
  1455. }
  1456. return node
  1457. }
  1458. // getCustomNodePricing converts the CostModel's configured custom pricing
  1459. // values into a NodePricing instance.
  1460. func (cm *CostModel) getCustomNodePricing(spot bool) *NodePricing {
  1461. customPricingConfig, err := cm.Provider.GetConfig()
  1462. if err != nil {
  1463. return nil
  1464. }
  1465. cpuCostStr := customPricingConfig.CPU
  1466. gpuCostStr := customPricingConfig.GPU
  1467. ramCostStr := customPricingConfig.RAM
  1468. if spot {
  1469. cpuCostStr = customPricingConfig.SpotCPU
  1470. gpuCostStr = customPricingConfig.SpotGPU
  1471. ramCostStr = customPricingConfig.SpotRAM
  1472. }
  1473. node := &NodePricing{Source: "custom"}
  1474. costPerCPUHr, err := strconv.ParseFloat(cpuCostStr, 64)
  1475. if err != nil {
  1476. log.Warningf("CostModel: custom pricing has illegal CPU cost: %s", cpuCostStr)
  1477. }
  1478. node.CostPerCPUHr = costPerCPUHr
  1479. costPerGPUHr, err := strconv.ParseFloat(gpuCostStr, 64)
  1480. if err != nil {
  1481. log.Warningf("CostModel: custom pricing has illegal GPU cost: %s", gpuCostStr)
  1482. }
  1483. node.CostPerGPUHr = costPerGPUHr
  1484. costPerRAMHr, err := strconv.ParseFloat(ramCostStr, 64)
  1485. if err != nil {
  1486. log.Warningf("CostModel: custom pricing has illegal RAM cost: %s", ramCostStr)
  1487. }
  1488. node.CostPerRAMGiBHr = costPerRAMHr
  1489. return node
  1490. }
  // NodePricing holds the per-resource hourly prices of a node along with a
  // label naming where those prices came from (e.g. prometheus, custom).
  type NodePricing struct {
  	Name, NodeType string
  	// Preemptible selects spot prices when custom pricing is applied.
  	Preemptible bool
  	// Hourly prices per CPU, per GiB of RAM and per GPU.
  	CostPerCPUHr, CostPerRAMGiBHr, CostPerGPUHr float64
  	Discount                                    float64
  	// Source names the origin of the prices; suffixes are appended when
  	// individual prices are replaced with custom values.
  	Source string
  }
  1503. // Pod describes a running pod's start and end time within a Window and
  1504. // all the Allocations (i.e. containers) contained within it.
  1505. type Pod struct {
  1506. Window kubecost.Window
  1507. Start time.Time
  1508. End time.Time
  1509. Key podKey
  1510. Allocations map[string]*kubecost.Allocation
  1511. }
  1512. // AppendContainer adds an entry for the given container name to the Pod.
  1513. func (p Pod) AppendContainer(container string) {
  1514. name := fmt.Sprintf("%s/%s/%s/%s", p.Key.Cluster, p.Key.Namespace, p.Key.Pod, container)
  1515. alloc := &kubecost.Allocation{
  1516. Name: name,
  1517. Properties: kubecost.Properties{},
  1518. Window: p.Window.Clone(),
  1519. Start: p.Start,
  1520. End: p.End,
  1521. }
  1522. alloc.Properties.SetContainer(container)
  1523. alloc.Properties.SetPod(p.Key.Pod)
  1524. alloc.Properties.SetNamespace(p.Key.Namespace)
  1525. alloc.Properties.SetCluster(p.Key.Cluster)
  1526. p.Allocations[container] = alloc
  1527. }
  1528. // PVC describes a PersistentVolumeClaim
  1529. // TODO:CLEANUP move to pkg/kubecost?
  1530. // TODO:CLEANUP add PersistentVolumeClaims field to type Allocation?
  1531. type PVC struct {
  1532. Bytes float64 `json:"bytes"`
  1533. Count int `json:"count"`
  1534. Name string `json:"name"`
  1535. Cluster string `json:"cluster"`
  1536. Namespace string `json:"namespace"`
  1537. Volume *PV `json:"persistentVolume"`
  1538. Mounted bool `json:"mounted"`
  1539. Start time.Time `json:"start"`
  1540. End time.Time `json:"end"`
  1541. }
  1542. // Cost computes the cumulative cost of the PVC
  1543. func (pvc *PVC) Cost() float64 {
  1544. if pvc == nil || pvc.Volume == nil {
  1545. return 0.0
  1546. }
  1547. gib := pvc.Bytes / 1024 / 1024 / 1024
  1548. hrs := pvc.Minutes() / 60.0
  1549. return pvc.Volume.CostPerGiBHour * gib * hrs
  1550. }
  1551. // Minutes computes the number of minutes over which the PVC is defined
  1552. func (pvc *PVC) Minutes() float64 {
  1553. if pvc == nil {
  1554. return 0.0
  1555. }
  1556. return pvc.End.Sub(pvc.Start).Minutes()
  1557. }
  1558. // String returns a string representation of the PVC
  1559. func (pvc *PVC) String() string {
  1560. if pvc == nil {
  1561. return "<nil>"
  1562. }
  1563. return fmt.Sprintf("%s/%s/%s{Bytes:%.2f, Cost:%.6f, Start,End:%s}", pvc.Cluster, pvc.Namespace, pvc.Name, pvc.Bytes, pvc.Cost(), kubecost.NewWindow(&pvc.Start, &pvc.End))
  1564. }
  // PV is the cost model's view of a PersistentVolume.
  // TODO:CLEANUP move to pkg/kubecost?
  type PV struct {
  	// Bytes is the size of the volume.
  	Bytes float64 `json:"bytes"`
  	// CostPerGiBHour is the hourly price of one GiB of the volume.
  	CostPerGiBHour float64 `json:"costPerGiBHour"`
  	Cluster        string  `json:"cluster"`
  	Name           string  `json:"name"`
  	StorageClass   string  `json:"storageClass"`
  }
  1574. // String returns a string representation of the PV
  1575. func (pv *PV) String() string {
  1576. if pv == nil {
  1577. return "<nil>"
  1578. }
  1579. return fmt.Sprintf("%s/%s{Bytes:%.2f, Cost/GiB*Hr:%.6f, StorageClass:%s}", pv.Cluster, pv.Name, pv.Bytes, pv.CostPerGiBHour, pv.StorageClass)
  1580. }