2
0

allocation.go 78 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212
  1. package costmodel
  2. import (
  3. "fmt"
  4. "math"
  5. "strconv"
  6. "strings"
  7. "time"
  8. "github.com/kubecost/cost-model/pkg/util/timeutil"
  9. "github.com/kubecost/cost-model/pkg/cloud"
  10. "github.com/kubecost/cost-model/pkg/env"
  11. "github.com/kubecost/cost-model/pkg/kubecost"
  12. "github.com/kubecost/cost-model/pkg/log"
  13. "github.com/kubecost/cost-model/pkg/prom"
  14. "k8s.io/apimachinery/pkg/labels"
  15. "k8s.io/klog"
  16. )
  17. const (
  18. queryFmtPods = `avg(kube_pod_container_status_running{}) by (pod, namespace, %s)[%s:%s]%s`
  19. queryFmtRAMBytesAllocated = `avg(avg_over_time(container_memory_allocation_bytes{container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, %s, provider_id)`
  20. queryFmtRAMRequests = `avg(avg_over_time(kube_pod_container_resource_requests{resource="memory", unit="byte", container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, %s)`
  21. queryFmtRAMUsageAvg = `avg(avg_over_time(container_memory_working_set_bytes{container!="", container_name!="POD", container!="POD"}[%s]%s)) by (container_name, container, pod_name, pod, namespace, instance, %s)`
  22. queryFmtRAMUsageMax = `max(max_over_time(container_memory_working_set_bytes{container!="", container_name!="POD", container!="POD"}[%s]%s)) by (container_name, container, pod_name, pod, namespace, instance, %s)`
  23. queryFmtCPUCoresAllocated = `avg(avg_over_time(container_cpu_allocation{container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, %s)`
  24. queryFmtCPURequests = `avg(avg_over_time(kube_pod_container_resource_requests{resource="cpu", unit="core", container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, %s)`
  25. queryFmtCPUUsageAvg = `avg(rate(container_cpu_usage_seconds_total{container!="", container_name!="POD", container!="POD"}[%s]%s)) by (container_name, container, pod_name, pod, namespace, instance, %s)`
  26. queryFmtCPUUsageMax = `max(rate(container_cpu_usage_seconds_total{container!="", container_name!="POD", container!="POD"}[%s]%s)) by (container_name, container, pod_name, pod, namespace, instance, %s)`
  27. queryFmtGPUsRequested = `avg(avg_over_time(kube_pod_container_resource_requests{resource="nvidia_com_gpu", container!="",container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, %s)`
  28. queryFmtGPUsAllocated = `avg(avg_over_time(container_gpu_allocation{container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, %s)`
  29. queryFmtGPUUsageAvg = `avg(avg_over_time(DCGM_FI_DEV_GPU_UTIL{container!=""}[%s]%s)) by (container, pod, namespace, %s)`
  30. queryFmtNodeCostPerCPUHr = `avg(avg_over_time(node_cpu_hourly_cost[%s]%s)) by (node, %s, instance_type, provider_id)`
  31. queryFmtNodeCostPerRAMGiBHr = `avg(avg_over_time(node_ram_hourly_cost[%s]%s)) by (node, %s, instance_type, provider_id)`
  32. queryFmtNodeCostPerGPUHr = `avg(avg_over_time(node_gpu_hourly_cost[%s]%s)) by (node, %s, instance_type, provider_id)`
  33. queryFmtNodeIsSpot = `avg_over_time(kubecost_node_is_spot[%s]%s)`
  34. queryFmtPVCInfo = `avg(kube_persistentvolumeclaim_info{volumename != ""}) by (persistentvolumeclaim, storageclass, volumename, namespace, %s)[%s:%s]%s`
  35. queryFmtPVBytes = `avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s]%s)) by (persistentvolume, %s)`
  36. queryFmtPodPVCAllocation = `avg(avg_over_time(pod_pvc_allocation[%s]%s)) by (persistentvolume, persistentvolumeclaim, pod, namespace, %s)`
  37. queryFmtPVCBytesRequested = `avg(avg_over_time(kube_persistentvolumeclaim_resource_requests_storage_bytes{}[%s]%s)) by (persistentvolumeclaim, namespace, %s)`
  38. queryFmtPVCostPerGiBHour = `avg(avg_over_time(pv_hourly_cost[%s]%s)) by (volumename, %s)`
  39. queryFmtNetZoneGiB = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="true"}[%s]%s)) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
  40. queryFmtNetZoneCostPerGiB = `avg(avg_over_time(kubecost_network_zone_egress_cost{}[%s]%s)) by (%s)`
  41. queryFmtNetRegionGiB = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="false"}[%s]%s)) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
  42. queryFmtNetRegionCostPerGiB = `avg(avg_over_time(kubecost_network_region_egress_cost{}[%s]%s)) by (%s)`
  43. queryFmtNetInternetGiB = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="true"}[%s]%s)) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
  44. queryFmtNetInternetCostPerGiB = `avg(avg_over_time(kubecost_network_internet_egress_cost{}[%s]%s)) by (%s)`
  45. queryFmtNetReceiveBytes = `sum(increase(container_network_receive_bytes_total{pod!="", container="POD"}[%s]%s)) by (pod_name, pod, namespace, %s)`
  46. queryFmtNetTransferBytes = `sum(increase(container_network_transmit_bytes_total{pod!="", container="POD"}[%s]%s)) by (pod_name, pod, namespace, %s)`
  47. queryFmtNamespaceLabels = `avg_over_time(kube_namespace_labels[%s]%s)`
  48. queryFmtNamespaceAnnotations = `avg_over_time(kube_namespace_annotations[%s]%s)`
  49. queryFmtPodLabels = `avg_over_time(kube_pod_labels[%s]%s)`
  50. queryFmtPodAnnotations = `avg_over_time(kube_pod_annotations[%s]%s)`
  51. queryFmtServiceLabels = `avg_over_time(service_selector_labels[%s]%s)`
  52. queryFmtDeploymentLabels = `avg_over_time(deployment_match_labels[%s]%s)`
  53. queryFmtStatefulSetLabels = `avg_over_time(statefulSet_match_labels[%s]%s)`
  54. queryFmtDaemonSetLabels = `sum(avg_over_time(kube_pod_owner{owner_kind="DaemonSet"}[%s]%s)) by (pod, owner_name, namespace, %s)`
  55. queryFmtJobLabels = `sum(avg_over_time(kube_pod_owner{owner_kind="Job"}[%s]%s)) by (pod, owner_name, namespace ,%s)`
  56. queryFmtPodsWithReplicaSetOwner = `sum(avg_over_time(kube_pod_owner{owner_kind="ReplicaSet"}[%s]%s)) by (pod, owner_name, namespace ,%s)`
  57. queryFmtReplicaSetsWithoutOwners = `avg(avg_over_time(kube_replicaset_owner{owner_kind="<none>", owner_name="<none>"}[%s]%s)) by (replicaset, namespace, %s)`
  58. queryFmtLBCostPerHr = `avg(avg_over_time(kubecost_load_balancer_cost[%s]%s)) by (namespace, service_name, %s)`
  59. queryFmtLBActiveMins = `count(kubecost_load_balancer_cost) by (namespace, service_name, %s)[%s:%s]%s`
  60. )
  61. // This is a bit of a hack to work around garbage data from cadvisor
  62. // Ideally you cap each pod to the max CPU on its node, but that involves a bit more complexity, as it it would need to be done when allocations joins with asset data.
  63. const MAX_CPU_CAP = 512
  64. // CanCompute should return true if CostModel can act as a valid source for the
  65. // given time range. In the case of CostModel we want to attempt to compute as
  66. // long as the range starts in the past. If the CostModel ends up not having
  67. // data to match, that's okay, and should be communicated with an error
  68. // response from ComputeAllocation.
  69. func (cm *CostModel) CanCompute(start, end time.Time) bool {
  70. return start.Before(time.Now())
  71. }
  72. // Name returns the name of the Source
  73. func (cm *CostModel) Name() string {
  74. return "CostModel"
  75. }
  76. // ComputeAllocation uses the CostModel instance to compute an AllocationSet
  77. // for the window defined by the given start and end times. The Allocations
  78. // returned are unaggregated (i.e. down to the container level).
  79. func (cm *CostModel) ComputeAllocation(start, end time.Time, resolution time.Duration) (*kubecost.AllocationSet, error) {
  80. // 1. Build out Pod map from resolution-tuned, batched Pod start/end query
  81. // 2. Run and apply the results of the remaining queries to
  82. // 3. Build out AllocationSet from completed Pod map
  83. // Create a window spanning the requested query
  84. window := kubecost.NewWindow(&start, &end)
  85. // Create an empty AllocationSet. For safety, in the case of an error, we
  86. // should prefer to return this empty set with the error. (In the case of
  87. // no error, of course we populate the set and return it.)
  88. allocSet := kubecost.NewAllocationSet(start, end)
  89. // (1) Build out Pod map
  90. // Build out a map of Allocations as a mapping from pod-to-container-to-
  91. // underlying-Allocation instance, starting with (start, end) so that we
  92. // begin with minutes, from which we compute resource allocation and cost
  93. // totals from measured rate data.
  94. podMap := map[podKey]*Pod{}
  95. // clusterStarts and clusterEnds record the earliest start and latest end
  96. // times, respectively, on a cluster-basis. These are used for unmounted
  97. // PVs and other "virtual" Allocations so that minutes are maximally
  98. // accurate during start-up or spin-down of a cluster
  99. clusterStart := map[string]time.Time{}
  100. clusterEnd := map[string]time.Time{}
  101. cm.buildPodMap(window, resolution, env.GetETLMaxBatchDuration(), podMap, clusterStart, clusterEnd)
  102. // (2) Run and apply remaining queries
  103. // Convert window (start, end) to (duration, offset) for querying Prometheus,
  104. // including handling Thanos offset
  105. durStr, offStr, err := window.DurationOffsetForPrometheus()
  106. if err != nil {
  107. // Negative duration, so return empty set
  108. return allocSet, nil
  109. }
  110. // Convert resolution duration to a query-ready string
  111. resStr := timeutil.DurationString(resolution)
  112. ctx := prom.NewNamedContext(cm.PrometheusClient, prom.AllocationContextName)
  113. queryRAMBytesAllocated := fmt.Sprintf(queryFmtRAMBytesAllocated, durStr, offStr, env.GetPromClusterLabel())
  114. resChRAMBytesAllocated := ctx.Query(queryRAMBytesAllocated)
  115. queryRAMRequests := fmt.Sprintf(queryFmtRAMRequests, durStr, offStr, env.GetPromClusterLabel())
  116. resChRAMRequests := ctx.Query(queryRAMRequests)
  117. queryRAMUsageAvg := fmt.Sprintf(queryFmtRAMUsageAvg, durStr, offStr, env.GetPromClusterLabel())
  118. resChRAMUsageAvg := ctx.Query(queryRAMUsageAvg)
  119. queryRAMUsageMax := fmt.Sprintf(queryFmtRAMUsageMax, durStr, offStr, env.GetPromClusterLabel())
  120. resChRAMUsageMax := ctx.Query(queryRAMUsageMax)
  121. queryCPUCoresAllocated := fmt.Sprintf(queryFmtCPUCoresAllocated, durStr, offStr, env.GetPromClusterLabel())
  122. resChCPUCoresAllocated := ctx.Query(queryCPUCoresAllocated)
  123. queryCPURequests := fmt.Sprintf(queryFmtCPURequests, durStr, offStr, env.GetPromClusterLabel())
  124. resChCPURequests := ctx.Query(queryCPURequests)
  125. queryCPUUsageAvg := fmt.Sprintf(queryFmtCPUUsageAvg, durStr, offStr, env.GetPromClusterLabel())
  126. resChCPUUsageAvg := ctx.Query(queryCPUUsageAvg)
  127. queryCPUUsageMax := fmt.Sprintf(queryFmtCPUUsageMax, durStr, offStr, env.GetPromClusterLabel())
  128. resChCPUUsageMax := ctx.Query(queryCPUUsageMax)
  129. queryGPUsRequested := fmt.Sprintf(queryFmtGPUsRequested, durStr, offStr, env.GetPromClusterLabel())
  130. resChGPUsRequested := ctx.Query(queryGPUsRequested)
  131. queryGPUsAllocated := fmt.Sprintf(queryFmtGPUsAllocated, durStr, offStr, env.GetPromClusterLabel())
  132. resChGPUsAllocated := ctx.Query(queryGPUsAllocated)
  133. queryGPUUsageAvg := fmt.Sprintf(queryFmtGPUUsageAvg, durStr, offStr, env.GetPromClusterLabel())
  134. resChGPUUsageAvg := ctx.Query(queryGPUUsageAvg)
  135. queryNodeCostPerCPUHr := fmt.Sprintf(queryFmtNodeCostPerCPUHr, durStr, offStr, env.GetPromClusterLabel())
  136. resChNodeCostPerCPUHr := ctx.Query(queryNodeCostPerCPUHr)
  137. queryNodeCostPerRAMGiBHr := fmt.Sprintf(queryFmtNodeCostPerRAMGiBHr, durStr, offStr, env.GetPromClusterLabel())
  138. resChNodeCostPerRAMGiBHr := ctx.Query(queryNodeCostPerRAMGiBHr)
  139. queryNodeCostPerGPUHr := fmt.Sprintf(queryFmtNodeCostPerGPUHr, durStr, offStr, env.GetPromClusterLabel())
  140. resChNodeCostPerGPUHr := ctx.Query(queryNodeCostPerGPUHr)
  141. queryNodeIsSpot := fmt.Sprintf(queryFmtNodeIsSpot, durStr, offStr)
  142. resChNodeIsSpot := ctx.Query(queryNodeIsSpot)
  143. queryPVCInfo := fmt.Sprintf(queryFmtPVCInfo, env.GetPromClusterLabel(), durStr, resStr, offStr)
  144. resChPVCInfo := ctx.Query(queryPVCInfo)
  145. queryPVBytes := fmt.Sprintf(queryFmtPVBytes, durStr, offStr, env.GetPromClusterLabel())
  146. resChPVBytes := ctx.Query(queryPVBytes)
  147. queryPodPVCAllocation := fmt.Sprintf(queryFmtPodPVCAllocation, durStr, offStr, env.GetPromClusterLabel())
  148. resChPodPVCAllocation := ctx.Query(queryPodPVCAllocation)
  149. queryPVCBytesRequested := fmt.Sprintf(queryFmtPVCBytesRequested, durStr, offStr, env.GetPromClusterLabel())
  150. resChPVCBytesRequested := ctx.Query(queryPVCBytesRequested)
  151. queryPVCostPerGiBHour := fmt.Sprintf(queryFmtPVCostPerGiBHour, durStr, offStr, env.GetPromClusterLabel())
  152. resChPVCostPerGiBHour := ctx.Query(queryPVCostPerGiBHour)
  153. queryNetTransferBytes := fmt.Sprintf(queryFmtNetTransferBytes, durStr, offStr, env.GetPromClusterLabel())
  154. resChNetTransferBytes := ctx.Query(queryNetTransferBytes)
  155. queryNetReceiveBytes := fmt.Sprintf(queryFmtNetReceiveBytes, durStr, offStr, env.GetPromClusterLabel())
  156. resChNetReceiveBytes := ctx.Query(queryNetReceiveBytes)
  157. queryNetZoneGiB := fmt.Sprintf(queryFmtNetZoneGiB, durStr, offStr, env.GetPromClusterLabel())
  158. resChNetZoneGiB := ctx.Query(queryNetZoneGiB)
  159. queryNetZoneCostPerGiB := fmt.Sprintf(queryFmtNetZoneCostPerGiB, durStr, offStr, env.GetPromClusterLabel())
  160. resChNetZoneCostPerGiB := ctx.Query(queryNetZoneCostPerGiB)
  161. queryNetRegionGiB := fmt.Sprintf(queryFmtNetRegionGiB, durStr, offStr, env.GetPromClusterLabel())
  162. resChNetRegionGiB := ctx.Query(queryNetRegionGiB)
  163. queryNetRegionCostPerGiB := fmt.Sprintf(queryFmtNetRegionCostPerGiB, durStr, offStr, env.GetPromClusterLabel())
  164. resChNetRegionCostPerGiB := ctx.Query(queryNetRegionCostPerGiB)
  165. queryNetInternetGiB := fmt.Sprintf(queryFmtNetInternetGiB, durStr, offStr, env.GetPromClusterLabel())
  166. resChNetInternetGiB := ctx.Query(queryNetInternetGiB)
  167. queryNetInternetCostPerGiB := fmt.Sprintf(queryFmtNetInternetCostPerGiB, durStr, offStr, env.GetPromClusterLabel())
  168. resChNetInternetCostPerGiB := ctx.Query(queryNetInternetCostPerGiB)
  169. queryNamespaceLabels := fmt.Sprintf(queryFmtNamespaceLabels, durStr, offStr)
  170. resChNamespaceLabels := ctx.Query(queryNamespaceLabels)
  171. queryNamespaceAnnotations := fmt.Sprintf(queryFmtNamespaceAnnotations, durStr, offStr)
  172. resChNamespaceAnnotations := ctx.Query(queryNamespaceAnnotations)
  173. queryPodLabels := fmt.Sprintf(queryFmtPodLabels, durStr, offStr)
  174. resChPodLabels := ctx.Query(queryPodLabels)
  175. queryPodAnnotations := fmt.Sprintf(queryFmtPodAnnotations, durStr, offStr)
  176. resChPodAnnotations := ctx.Query(queryPodAnnotations)
  177. queryServiceLabels := fmt.Sprintf(queryFmtServiceLabels, durStr, offStr)
  178. resChServiceLabels := ctx.Query(queryServiceLabels)
  179. queryDeploymentLabels := fmt.Sprintf(queryFmtDeploymentLabels, durStr, offStr)
  180. resChDeploymentLabels := ctx.Query(queryDeploymentLabels)
  181. queryStatefulSetLabels := fmt.Sprintf(queryFmtStatefulSetLabels, durStr, offStr)
  182. resChStatefulSetLabels := ctx.Query(queryStatefulSetLabels)
  183. queryDaemonSetLabels := fmt.Sprintf(queryFmtDaemonSetLabels, durStr, offStr, env.GetPromClusterLabel())
  184. resChDaemonSetLabels := ctx.Query(queryDaemonSetLabels)
  185. queryPodsWithReplicaSetOwner := fmt.Sprintf(queryFmtPodsWithReplicaSetOwner, durStr, offStr, env.GetPromClusterLabel())
  186. resChPodsWithReplicaSetOwner := ctx.Query(queryPodsWithReplicaSetOwner)
  187. queryReplicaSetsWithoutOwners := fmt.Sprintf(queryFmtReplicaSetsWithoutOwners, durStr, offStr, env.GetPromClusterLabel())
  188. resChReplicaSetsWithoutOwners := ctx.Query(queryReplicaSetsWithoutOwners)
  189. queryJobLabels := fmt.Sprintf(queryFmtJobLabels, durStr, offStr, env.GetPromClusterLabel())
  190. resChJobLabels := ctx.Query(queryJobLabels)
  191. queryLBCostPerHr := fmt.Sprintf(queryFmtLBCostPerHr, durStr, offStr, env.GetPromClusterLabel())
  192. resChLBCostPerHr := ctx.Query(queryLBCostPerHr)
  193. queryLBActiveMins := fmt.Sprintf(queryFmtLBActiveMins, env.GetPromClusterLabel(), durStr, resStr, offStr)
  194. resChLBActiveMins := ctx.Query(queryLBActiveMins)
  195. resCPUCoresAllocated, _ := resChCPUCoresAllocated.Await()
  196. resCPURequests, _ := resChCPURequests.Await()
  197. resCPUUsageAvg, _ := resChCPUUsageAvg.Await()
  198. resCPUUsageMax, _ := resChCPUUsageMax.Await()
  199. resRAMBytesAllocated, _ := resChRAMBytesAllocated.Await()
  200. resRAMRequests, _ := resChRAMRequests.Await()
  201. resRAMUsageAvg, _ := resChRAMUsageAvg.Await()
  202. resRAMUsageMax, _ := resChRAMUsageMax.Await()
  203. resGPUsRequested, _ := resChGPUsRequested.Await()
  204. resGPUsAllocated, _ := resChGPUsAllocated.Await()
  205. resGPUUsageAvg, _ := resChGPUUsageAvg.Await()
  206. resNodeCostPerCPUHr, _ := resChNodeCostPerCPUHr.Await()
  207. resNodeCostPerRAMGiBHr, _ := resChNodeCostPerRAMGiBHr.Await()
  208. resNodeCostPerGPUHr, _ := resChNodeCostPerGPUHr.Await()
  209. resNodeIsSpot, _ := resChNodeIsSpot.Await()
  210. resPVBytes, _ := resChPVBytes.Await()
  211. resPVCostPerGiBHour, _ := resChPVCostPerGiBHour.Await()
  212. resPVCInfo, _ := resChPVCInfo.Await()
  213. resPVCBytesRequested, _ := resChPVCBytesRequested.Await()
  214. resPodPVCAllocation, _ := resChPodPVCAllocation.Await()
  215. resNetTransferBytes, _ := resChNetTransferBytes.Await()
  216. resNetReceiveBytes, _ := resChNetReceiveBytes.Await()
  217. resNetZoneGiB, _ := resChNetZoneGiB.Await()
  218. resNetZoneCostPerGiB, _ := resChNetZoneCostPerGiB.Await()
  219. resNetRegionGiB, _ := resChNetRegionGiB.Await()
  220. resNetRegionCostPerGiB, _ := resChNetRegionCostPerGiB.Await()
  221. resNetInternetGiB, _ := resChNetInternetGiB.Await()
  222. resNetInternetCostPerGiB, _ := resChNetInternetCostPerGiB.Await()
  223. resNamespaceLabels, _ := resChNamespaceLabels.Await()
  224. resNamespaceAnnotations, _ := resChNamespaceAnnotations.Await()
  225. resPodLabels, _ := resChPodLabels.Await()
  226. resPodAnnotations, _ := resChPodAnnotations.Await()
  227. resServiceLabels, _ := resChServiceLabels.Await()
  228. resDeploymentLabels, _ := resChDeploymentLabels.Await()
  229. resStatefulSetLabels, _ := resChStatefulSetLabels.Await()
  230. resDaemonSetLabels, _ := resChDaemonSetLabels.Await()
  231. resPodsWithReplicaSetOwner, _ := resChPodsWithReplicaSetOwner.Await()
  232. resReplicaSetsWithoutOwners, _ := resChReplicaSetsWithoutOwners.Await()
  233. resJobLabels, _ := resChJobLabels.Await()
  234. resLBCostPerHr, _ := resChLBCostPerHr.Await()
  235. resLBActiveMins, _ := resChLBActiveMins.Await()
  236. if ctx.HasErrors() {
  237. for _, err := range ctx.Errors() {
  238. log.Errorf("CostModel.ComputeAllocation: %s", err)
  239. }
  240. return allocSet, ctx.ErrorCollection()
  241. }
  242. // We choose to apply allocation before requests in the cases of RAM and
  243. // CPU so that we can assert that allocation should always be greater than
  244. // or equal to request.
  245. applyCPUCoresAllocated(podMap, resCPUCoresAllocated)
  246. applyCPUCoresRequested(podMap, resCPURequests)
  247. applyCPUCoresUsedAvg(podMap, resCPUUsageAvg)
  248. applyCPUCoresUsedMax(podMap, resCPUUsageMax)
  249. applyRAMBytesAllocated(podMap, resRAMBytesAllocated)
  250. applyRAMBytesRequested(podMap, resRAMRequests)
  251. applyRAMBytesUsedAvg(podMap, resRAMUsageAvg)
  252. applyRAMBytesUsedMax(podMap, resRAMUsageMax)
  253. applyGPUsAllocated(podMap, resGPUsRequested, resGPUsAllocated)
  254. applyGPUUsageAvg(podMap, resGPUUsageAvg)
  255. applyNetworkTotals(podMap, resNetTransferBytes, resNetReceiveBytes)
  256. applyNetworkAllocation(podMap, resNetZoneGiB, resNetZoneCostPerGiB)
  257. applyNetworkAllocation(podMap, resNetRegionGiB, resNetRegionCostPerGiB)
  258. applyNetworkAllocation(podMap, resNetInternetGiB, resNetInternetCostPerGiB)
  259. namespaceLabels := resToNamespaceLabels(resNamespaceLabels)
  260. podLabels := resToPodLabels(resPodLabels)
  261. namespaceAnnotations := resToNamespaceAnnotations(resNamespaceAnnotations)
  262. podAnnotations := resToPodAnnotations(resPodAnnotations)
  263. applyLabels(podMap, namespaceLabels, podLabels)
  264. applyAnnotations(podMap, namespaceAnnotations, podAnnotations)
  265. serviceLabels := getServiceLabels(resServiceLabels)
  266. allocsByService := map[serviceKey][]*kubecost.Allocation{}
  267. applyServicesToPods(podMap, podLabels, allocsByService, serviceLabels)
  268. podDeploymentMap := labelsToPodControllerMap(podLabels, resToDeploymentLabels(resDeploymentLabels))
  269. podStatefulSetMap := labelsToPodControllerMap(podLabels, resToStatefulSetLabels(resStatefulSetLabels))
  270. podDaemonSetMap := resToPodDaemonSetMap(resDaemonSetLabels)
  271. podJobMap := resToPodJobMap(resJobLabels)
  272. podReplicaSetMap := resToPodReplicaSetMap(resPodsWithReplicaSetOwner, resReplicaSetsWithoutOwners)
  273. applyControllersToPods(podMap, podDeploymentMap)
  274. applyControllersToPods(podMap, podStatefulSetMap)
  275. applyControllersToPods(podMap, podDaemonSetMap)
  276. applyControllersToPods(podMap, podJobMap)
  277. applyControllersToPods(podMap, podReplicaSetMap)
  278. // TODO breakdown network costs?
  279. // Build out a map of Nodes with resource costs, discounts, and node types
  280. // for converting resource allocation data to cumulative costs.
  281. nodeMap := map[nodeKey]*NodePricing{}
  282. applyNodeCostPerCPUHr(nodeMap, resNodeCostPerCPUHr)
  283. applyNodeCostPerRAMGiBHr(nodeMap, resNodeCostPerRAMGiBHr)
  284. applyNodeCostPerGPUHr(nodeMap, resNodeCostPerGPUHr)
  285. applyNodeSpot(nodeMap, resNodeIsSpot)
  286. applyNodeDiscount(nodeMap, cm)
  287. // Build out the map of all PVs with class, size and cost-per-hour.
  288. // Note: this does not record time running, which we may want to
  289. // include later for increased PV precision. (As long as the PV has
  290. // a PVC, we get time running there, so this is only inaccurate
  291. // for short-lived, unmounted PVs.)
  292. pvMap := map[pvKey]*PV{}
  293. buildPVMap(pvMap, resPVCostPerGiBHour)
  294. applyPVBytes(pvMap, resPVBytes)
  295. // Build out the map of all PVCs with time running, bytes requested,
  296. // and connect to the correct PV from pvMap. (If no PV exists, that
  297. // is noted, but does not result in any allocation/cost.)
  298. pvcMap := map[pvcKey]*PVC{}
  299. buildPVCMap(window, pvcMap, pvMap, resPVCInfo)
  300. applyPVCBytesRequested(pvcMap, resPVCBytesRequested)
  301. // Build out the relationships of pods to their PVCs. This step
  302. // populates the PVC.Count field so that PVC allocation can be
  303. // split appropriately among each pod's container allocation.
  304. podPVCMap := map[podKey][]*PVC{}
  305. buildPodPVCMap(podPVCMap, pvMap, pvcMap, podMap, resPodPVCAllocation)
  306. // Identify unmounted PVs (PVs without PVCs) and add one Allocation per
  307. // cluster representing each cluster's unmounted PVs (if necessary).
  308. applyUnmountedPVs(window, podMap, pvMap, pvcMap)
  309. lbMap := getLoadBalancerCosts(resLBCostPerHr, resLBActiveMins, resolution)
  310. applyLoadBalancersToPods(lbMap, allocsByService)
  311. // (3) Build out AllocationSet from Pod map
  312. for _, pod := range podMap {
  313. for _, alloc := range pod.Allocations {
  314. cluster := alloc.Properties.Cluster
  315. nodeName := alloc.Properties.Node
  316. namespace := alloc.Properties.Namespace
  317. pod := alloc.Properties.Pod
  318. container := alloc.Properties.Container
  319. podKey := newPodKey(cluster, namespace, pod)
  320. nodeKey := newNodeKey(cluster, nodeName)
  321. node := cm.getNodePricing(nodeMap, nodeKey)
  322. alloc.Properties.ProviderID = node.ProviderID
  323. alloc.CPUCost = alloc.CPUCoreHours * node.CostPerCPUHr
  324. alloc.RAMCost = (alloc.RAMByteHours / 1024 / 1024 / 1024) * node.CostPerRAMGiBHr
  325. alloc.GPUCost = alloc.GPUHours * node.CostPerGPUHr
  326. if pvcs, ok := podPVCMap[podKey]; ok {
  327. for _, pvc := range pvcs {
  328. // Determine the (start, end) of the relationship between the
  329. // given PVC and the associated Allocation so that a precise
  330. // number of hours can be used to compute cumulative cost.
  331. s, e := alloc.Start, alloc.End
  332. if pvc.Start.After(alloc.Start) {
  333. s = pvc.Start
  334. }
  335. if pvc.End.Before(alloc.End) {
  336. e = pvc.End
  337. }
  338. minutes := e.Sub(s).Minutes()
  339. hrs := minutes / 60.0
  340. count := float64(pvc.Count)
  341. if pvc.Count < 1 {
  342. count = 1
  343. }
  344. gib := pvc.Bytes / 1024 / 1024 / 1024
  345. cost := pvc.Volume.CostPerGiBHour * gib * hrs
  346. // Apply the size and cost of the PV to the allocation, each
  347. // weighted by count (i.e. the number of containers in the pod)
  348. // record the amount of total PVBytes Hours attributable to a given PV
  349. if alloc.PVs == nil {
  350. alloc.PVs = kubecost.PVAllocations{}
  351. }
  352. pvKey := kubecost.PVKey{
  353. Cluster: pvc.Cluster,
  354. Name: pvc.Volume.Name,
  355. }
  356. alloc.PVs[pvKey] = &kubecost.PVAllocation{
  357. ByteHours: pvc.Bytes * hrs / count,
  358. Cost: cost / count,
  359. }
  360. }
  361. }
  362. // Make sure that the name is correct (node may not be present at this
  363. // point due to it missing from queryMinutes) then insert.
  364. alloc.Name = fmt.Sprintf("%s/%s/%s/%s/%s", cluster, nodeName, namespace, pod, container)
  365. allocSet.Set(alloc)
  366. }
  367. }
  368. return allocSet, nil
  369. }
  370. func (cm *CostModel) buildPodMap(window kubecost.Window, resolution, maxBatchSize time.Duration, podMap map[podKey]*Pod, clusterStart, clusterEnd map[string]time.Time) error {
  371. // Assumes that window is positive and closed
  372. start, end := *window.Start(), *window.End()
  373. // Convert resolution duration to a query-ready string
  374. resStr := timeutil.DurationString(resolution)
  375. ctx := prom.NewNamedContext(cm.PrometheusClient, prom.AllocationContextName)
  376. // Query for (start, end) by (pod, namespace, cluster) over the given
  377. // window, using the given resolution, and if necessary in batches no
  378. // larger than the given maximum batch size. If working in batches, track
  379. // overall progress by starting with (window.start, window.start) and
  380. // querying in batches no larger than maxBatchSize from start-to-end,
  381. // folding each result set into podMap as the results come back.
  382. coverage := kubecost.NewWindow(&start, &start)
  383. numQuery := 1
  384. for coverage.End().Before(end) {
  385. // Determine the (start, end) of the current batch
  386. batchStart := *coverage.End()
  387. batchEnd := coverage.End().Add(maxBatchSize)
  388. if batchEnd.After(end) {
  389. batchEnd = end
  390. }
  391. batchWindow := kubecost.NewWindow(&batchStart, &batchEnd)
  392. var resPods []*prom.QueryResult
  393. var err error
  394. maxTries := 3
  395. numTries := 0
  396. for resPods == nil && numTries < maxTries {
  397. numTries++
  398. // Convert window (start, end) to (duration, offset) for querying Prometheus,
  399. // including handling Thanos offset
  400. durStr, offStr, err := batchWindow.DurationOffsetForPrometheus()
  401. if err != nil || durStr == "" {
  402. // Negative duration, so set empty results and don't query
  403. resPods = []*prom.QueryResult{}
  404. err = nil
  405. break
  406. }
  407. // Submit and profile query
  408. queryPods := fmt.Sprintf(queryFmtPods, env.GetPromClusterLabel(), durStr, resStr, offStr)
  409. queryProfile := time.Now()
  410. resPods, err = ctx.Query(queryPods).Await()
  411. if err != nil {
  412. log.Profile(queryProfile, fmt.Sprintf("CostModel.ComputeAllocation: pod query %d try %d failed: %s", numQuery, numTries, queryPods))
  413. resPods = nil
  414. }
  415. }
  416. if err != nil {
  417. return err
  418. }
  419. applyPodResults(window, resolution, podMap, clusterStart, clusterEnd, resPods)
  420. coverage = coverage.ExpandEnd(batchEnd)
  421. numQuery++
  422. }
  423. return nil
  424. }
  425. func applyPodResults(window kubecost.Window, resolution time.Duration, podMap map[podKey]*Pod, clusterStart, clusterEnd map[string]time.Time, resPods []*prom.QueryResult) {
  426. for _, res := range resPods {
  427. if len(res.Values) == 0 {
  428. log.Warningf("CostModel.ComputeAllocation: empty minutes result")
  429. continue
  430. }
  431. cluster, err := res.GetString(env.GetPromClusterLabel())
  432. if err != nil {
  433. cluster = env.GetClusterID()
  434. }
  435. labels, err := res.GetStrings("namespace", "pod")
  436. if err != nil {
  437. log.Warningf("CostModel.ComputeAllocation: minutes query result missing field: %s", err)
  438. continue
  439. }
  440. namespace := labels["namespace"]
  441. pod := labels["pod"]
  442. key := newPodKey(cluster, namespace, pod)
  443. // allocStart and allocEnd are the timestamps of the first and last
  444. // minutes the pod was running, respectively. We subtract one resolution
  445. // from allocStart because this point will actually represent the end
  446. // of the first minute. We don't subtract from allocEnd because it
  447. // already represents the end of the last minute.
  448. var allocStart, allocEnd time.Time
  449. startAdjustmentCoeff, endAdjustmentCoeff := 1.0, 1.0
  450. for _, datum := range res.Values {
  451. t := time.Unix(int64(datum.Timestamp), 0)
  452. if allocStart.IsZero() && datum.Value > 0 && window.Contains(t) {
  453. // Set the start timestamp to the earliest non-zero timestamp
  454. allocStart = t
  455. // Record adjustment coefficient, i.e. the portion of the start
  456. // timestamp to "ignore". That is, sometimes the value will be
  457. // 0.5, meaning that we should discount the time running by
  458. // half of the resolution the timestamp stands for.
  459. startAdjustmentCoeff = (1.0 - datum.Value)
  460. }
  461. if datum.Value > 0 && window.Contains(t) {
  462. // Set the end timestamp to the latest non-zero timestamp
  463. allocEnd = t
  464. // Record adjustment coefficient, i.e. the portion of the end
  465. // timestamp to "ignore". (See explanation above for start.)
  466. endAdjustmentCoeff = (1.0 - datum.Value)
  467. }
  468. }
  469. if allocStart.IsZero() || allocEnd.IsZero() {
  470. continue
  471. }
  472. // Adjust timestamps according to the resolution and the adjustment
  473. // coefficients, as described above. That is, count the start timestamp
  474. // from the beginning of the resolution, not the end. Then "reduce" the
  475. // start and end by the correct amount, in the case that the "running"
  476. // value of the first or last timestamp was not a full 1.0.
  477. allocStart = allocStart.Add(-resolution)
  478. // Note: the *100 and /100 are necessary because Duration is an int, so
  479. // 0.5, for instance, will be truncated, resulting in no adjustment.
  480. allocStart = allocStart.Add(time.Duration(startAdjustmentCoeff*100) * resolution / time.Duration(100))
  481. allocEnd = allocEnd.Add(-time.Duration(endAdjustmentCoeff*100) * resolution / time.Duration(100))
  482. // If there is only one point with a value <= 0.5 that the start and
  483. // end timestamps both share, then we will enter this case because at
  484. // least half of a resolution will be subtracted from both the start
  485. // and the end. If that is the case, then add back half of each side
  486. // so that the pod is said to run for half a resolution total.
  487. // e.g. For resolution 1m and a value of 0.5 at one timestamp, we'll
  488. // end up with allocEnd == allocStart and each coeff == 0.5. In
  489. // that case, add 0.25m to each side, resulting in 0.5m duration.
  490. if !allocEnd.After(allocStart) {
  491. allocStart = allocStart.Add(-time.Duration(50*startAdjustmentCoeff) * resolution / time.Duration(100))
  492. allocEnd = allocEnd.Add(time.Duration(50*endAdjustmentCoeff) * resolution / time.Duration(100))
  493. }
  494. // Set start if unset or this datum's start time is earlier than the
  495. // current earliest time.
  496. if _, ok := clusterStart[cluster]; !ok || allocStart.Before(clusterStart[cluster]) {
  497. clusterStart[cluster] = allocStart
  498. }
  499. // Set end if unset or this datum's end time is later than the
  500. // current latest time.
  501. if _, ok := clusterEnd[cluster]; !ok || allocEnd.After(clusterEnd[cluster]) {
  502. clusterEnd[cluster] = allocEnd
  503. }
  504. if pod, ok := podMap[key]; ok {
  505. // Pod has already been recorded, so update it accordingly
  506. if allocStart.Before(pod.Start) {
  507. pod.Start = allocStart
  508. }
  509. if allocEnd.After(pod.End) {
  510. pod.End = allocEnd
  511. }
  512. } else {
  513. // Pod has not been recorded yet, so insert it
  514. podMap[key] = &Pod{
  515. Window: window.Clone(),
  516. Start: allocStart,
  517. End: allocEnd,
  518. Key: key,
  519. Allocations: map[string]*kubecost.Allocation{},
  520. }
  521. }
  522. }
  523. }
  524. func applyCPUCoresAllocated(podMap map[podKey]*Pod, resCPUCoresAllocated []*prom.QueryResult) {
  525. for _, res := range resCPUCoresAllocated {
  526. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  527. if err != nil {
  528. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU allocation result missing field: %s", err)
  529. continue
  530. }
  531. pod, ok := podMap[key]
  532. if !ok {
  533. continue
  534. }
  535. container, err := res.GetString("container")
  536. if err != nil {
  537. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU allocation query result missing 'container': %s", key)
  538. continue
  539. }
  540. if _, ok := pod.Allocations[container]; !ok {
  541. pod.AppendContainer(container)
  542. }
  543. cpuCores := res.Values[0].Value
  544. if cpuCores > MAX_CPU_CAP {
  545. klog.Infof("[WARNING] Very large cpu allocation, clamping to %f", res.Values[0].Value*(pod.Allocations[container].Minutes()/60.0))
  546. cpuCores = 0.0
  547. }
  548. hours := pod.Allocations[container].Minutes() / 60.0
  549. pod.Allocations[container].CPUCoreHours = cpuCores * hours
  550. node, err := res.GetString("node")
  551. if err != nil {
  552. log.Warningf("CostModel.ComputeAllocation: CPU allocation query result missing 'node': %s", key)
  553. continue
  554. }
  555. pod.Allocations[container].Properties.Node = node
  556. }
  557. }
  558. func applyCPUCoresRequested(podMap map[podKey]*Pod, resCPUCoresRequested []*prom.QueryResult) {
  559. for _, res := range resCPUCoresRequested {
  560. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  561. if err != nil {
  562. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU request result missing field: %s", err)
  563. continue
  564. }
  565. pod, ok := podMap[key]
  566. if !ok {
  567. continue
  568. }
  569. container, err := res.GetString("container")
  570. if err != nil {
  571. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU request query result missing 'container': %s", key)
  572. continue
  573. }
  574. if _, ok := pod.Allocations[container]; !ok {
  575. pod.AppendContainer(container)
  576. }
  577. pod.Allocations[container].CPUCoreRequestAverage = res.Values[0].Value
  578. // If CPU allocation is less than requests, set CPUCoreHours to
  579. // request level.
  580. if pod.Allocations[container].CPUCores() < res.Values[0].Value {
  581. pod.Allocations[container].CPUCoreHours = res.Values[0].Value * (pod.Allocations[container].Minutes() / 60.0)
  582. }
  583. if pod.Allocations[container].CPUCores() > MAX_CPU_CAP {
  584. klog.Infof("[WARNING] Very large cpu allocation, clamping! to %f", res.Values[0].Value*(pod.Allocations[container].Minutes()/60.0))
  585. pod.Allocations[container].CPUCoreHours = res.Values[0].Value * (pod.Allocations[container].Minutes() / 60.0)
  586. }
  587. node, err := res.GetString("node")
  588. if err != nil {
  589. log.Warningf("CostModel.ComputeAllocation: CPU request query result missing 'node': %s", key)
  590. continue
  591. }
  592. pod.Allocations[container].Properties.Node = node
  593. }
  594. }
  595. func applyCPUCoresUsedAvg(podMap map[podKey]*Pod, resCPUCoresUsedAvg []*prom.QueryResult) {
  596. for _, res := range resCPUCoresUsedAvg {
  597. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  598. if err != nil {
  599. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage avg result missing field: %s", err)
  600. continue
  601. }
  602. pod, ok := podMap[key]
  603. if !ok {
  604. continue
  605. }
  606. container, err := res.GetString("container")
  607. if container == "" || err != nil {
  608. container, err = res.GetString("container_name")
  609. if err != nil {
  610. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage avg query result missing 'container': %s", key)
  611. continue
  612. }
  613. }
  614. if _, ok := pod.Allocations[container]; !ok {
  615. pod.AppendContainer(container)
  616. }
  617. pod.Allocations[container].CPUCoreUsageAverage = res.Values[0].Value
  618. if res.Values[0].Value > MAX_CPU_CAP {
  619. klog.Infof("[WARNING] Very large cpu USAGE, dropping outlier")
  620. pod.Allocations[container].CPUCoreUsageAverage = 0.0
  621. }
  622. }
  623. }
  624. func applyCPUCoresUsedMax(podMap map[podKey]*Pod, resCPUCoresUsedMax []*prom.QueryResult) {
  625. for _, res := range resCPUCoresUsedMax {
  626. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  627. if err != nil {
  628. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage max result missing field: %s", err)
  629. continue
  630. }
  631. pod, ok := podMap[key]
  632. if !ok {
  633. continue
  634. }
  635. container, err := res.GetString("container")
  636. if container == "" || err != nil {
  637. container, err = res.GetString("container_name")
  638. if err != nil {
  639. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage max query result missing 'container': %s", key)
  640. continue
  641. }
  642. }
  643. if _, ok := pod.Allocations[container]; !ok {
  644. pod.AppendContainer(container)
  645. }
  646. if pod.Allocations[container].RawAllocationOnly == nil {
  647. pod.Allocations[container].RawAllocationOnly = &kubecost.RawAllocationOnlyData{
  648. CPUCoreUsageMax: res.Values[0].Value,
  649. }
  650. } else {
  651. pod.Allocations[container].RawAllocationOnly.CPUCoreUsageMax = res.Values[0].Value
  652. }
  653. }
  654. }
  655. func applyRAMBytesAllocated(podMap map[podKey]*Pod, resRAMBytesAllocated []*prom.QueryResult) {
  656. for _, res := range resRAMBytesAllocated {
  657. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  658. if err != nil {
  659. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM allocation result missing field: %s", err)
  660. continue
  661. }
  662. pod, ok := podMap[key]
  663. if !ok {
  664. continue
  665. }
  666. container, err := res.GetString("container")
  667. if err != nil {
  668. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM allocation query result missing 'container': %s", key)
  669. continue
  670. }
  671. if _, ok := pod.Allocations[container]; !ok {
  672. pod.AppendContainer(container)
  673. }
  674. ramBytes := res.Values[0].Value
  675. hours := pod.Allocations[container].Minutes() / 60.0
  676. pod.Allocations[container].RAMByteHours = ramBytes * hours
  677. node, err := res.GetString("node")
  678. if err != nil {
  679. log.Warningf("CostModel.ComputeAllocation: RAM allocation query result missing 'node': %s", key)
  680. continue
  681. }
  682. pod.Allocations[container].Properties.Node = node
  683. }
  684. }
  685. func applyRAMBytesRequested(podMap map[podKey]*Pod, resRAMBytesRequested []*prom.QueryResult) {
  686. for _, res := range resRAMBytesRequested {
  687. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  688. if err != nil {
  689. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM request result missing field: %s", err)
  690. continue
  691. }
  692. pod, ok := podMap[key]
  693. if !ok {
  694. continue
  695. }
  696. container, err := res.GetString("container")
  697. if err != nil {
  698. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM request query result missing 'container': %s", key)
  699. continue
  700. }
  701. if _, ok := pod.Allocations[container]; !ok {
  702. pod.AppendContainer(container)
  703. }
  704. pod.Allocations[container].RAMBytesRequestAverage = res.Values[0].Value
  705. // If RAM allocation is less than requests, set RAMByteHours to
  706. // request level.
  707. if pod.Allocations[container].RAMBytes() < res.Values[0].Value {
  708. pod.Allocations[container].RAMByteHours = res.Values[0].Value * (pod.Allocations[container].Minutes() / 60.0)
  709. }
  710. node, err := res.GetString("node")
  711. if err != nil {
  712. log.Warningf("CostModel.ComputeAllocation: RAM request query result missing 'node': %s", key)
  713. continue
  714. }
  715. pod.Allocations[container].Properties.Node = node
  716. }
  717. }
  718. func applyRAMBytesUsedAvg(podMap map[podKey]*Pod, resRAMBytesUsedAvg []*prom.QueryResult) {
  719. for _, res := range resRAMBytesUsedAvg {
  720. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  721. if err != nil {
  722. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM avg usage result missing field: %s", err)
  723. continue
  724. }
  725. pod, ok := podMap[key]
  726. if !ok {
  727. continue
  728. }
  729. container, err := res.GetString("container")
  730. if container == "" || err != nil {
  731. container, err = res.GetString("container_name")
  732. if err != nil {
  733. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM usage avg query result missing 'container': %s", key)
  734. continue
  735. }
  736. }
  737. if _, ok := pod.Allocations[container]; !ok {
  738. pod.AppendContainer(container)
  739. }
  740. pod.Allocations[container].RAMBytesUsageAverage = res.Values[0].Value
  741. }
  742. }
  743. func applyRAMBytesUsedMax(podMap map[podKey]*Pod, resRAMBytesUsedMax []*prom.QueryResult) {
  744. for _, res := range resRAMBytesUsedMax {
  745. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  746. if err != nil {
  747. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM usage max result missing field: %s", err)
  748. continue
  749. }
  750. pod, ok := podMap[key]
  751. if !ok {
  752. continue
  753. }
  754. container, err := res.GetString("container")
  755. if container == "" || err != nil {
  756. container, err = res.GetString("container_name")
  757. if err != nil {
  758. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM usage max query result missing 'container': %s", key)
  759. continue
  760. }
  761. }
  762. if _, ok := pod.Allocations[container]; !ok {
  763. pod.AppendContainer(container)
  764. }
  765. if pod.Allocations[container].RawAllocationOnly == nil {
  766. pod.Allocations[container].RawAllocationOnly = &kubecost.RawAllocationOnlyData{
  767. RAMBytesUsageMax: res.Values[0].Value,
  768. }
  769. } else {
  770. pod.Allocations[container].RawAllocationOnly.RAMBytesUsageMax = res.Values[0].Value
  771. }
  772. }
  773. }
  774. func applyGPUsAllocated(podMap map[podKey]*Pod, resGPUsRequested []*prom.QueryResult, resGPUsAllocated []*prom.QueryResult) {
  775. if len(resGPUsAllocated) > 0 { // Use the new query, when it's become available in a window
  776. resGPUsRequested = resGPUsAllocated
  777. }
  778. for _, res := range resGPUsRequested {
  779. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  780. if err != nil {
  781. log.DedupedWarningf(10, "CostModel.ComputeAllocation: GPU request result missing field: %s", err)
  782. continue
  783. }
  784. pod, ok := podMap[key]
  785. if !ok {
  786. continue
  787. }
  788. container, err := res.GetString("container")
  789. if err != nil {
  790. log.DedupedWarningf(10, "CostModel.ComputeAllocation: GPU request query result missing 'container': %s", key)
  791. continue
  792. }
  793. if _, ok := pod.Allocations[container]; !ok {
  794. pod.AppendContainer(container)
  795. }
  796. hrs := pod.Allocations[container].Minutes() / 60.0
  797. pod.Allocations[container].GPUHours = res.Values[0].Value * hrs
  798. pod.Allocations[container].GPURequestAverage = res.Values[0].Value
  799. }
  800. }
  801. func applyGPUUsageAvg(podMap map[podKey]*Pod, resGPUUsageAvg []*prom.QueryResult) {
  802. for _, res := range resGPUUsageAvg {
  803. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  804. if err != nil {
  805. log.DedupedWarningf(10, "CostModel.ComputeAllocation: GPU usage avg result missing field: %s", err)
  806. continue
  807. }
  808. pod, ok := podMap[key]
  809. if !ok {
  810. continue
  811. }
  812. container, err := res.GetString("container")
  813. if err != nil {
  814. log.DedupedWarningf(10, "CostModel.ComputeAllocation: GPU usage avg query result missing 'container': %s", key)
  815. continue
  816. }
  817. if _, ok := pod.Allocations[container]; !ok {
  818. pod.AppendContainer(container)
  819. }
  820. log.Infof("COMPUTEALLOCATION TESTING: GPU AVERAGE FOR POD CONTAINER %s: %f", container, res.Values[0].Value)
  821. pod.Allocations[container].GPUUsageAverage = res.Values[0].Value * 0.1
  822. }
  823. }
  824. func applyNetworkTotals(podMap map[podKey]*Pod, resNetworkTransferBytes []*prom.QueryResult, resNetworkReceiveBytes []*prom.QueryResult) {
  825. for _, res := range resNetworkTransferBytes {
  826. podKey, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  827. if err != nil {
  828. log.DedupedWarningf(10, "CostModel.ComputeAllocation: Network Transfer Bytes query result missing field: %s", err)
  829. continue
  830. }
  831. pod, ok := podMap[podKey]
  832. if !ok {
  833. continue
  834. }
  835. for _, alloc := range pod.Allocations {
  836. alloc.NetworkTransferBytes = res.Values[0].Value / float64(len(pod.Allocations))
  837. }
  838. }
  839. for _, res := range resNetworkReceiveBytes {
  840. podKey, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  841. if err != nil {
  842. log.DedupedWarningf(10, "CostModel.ComputeAllocation: Network Receive Bytes query result missing field: %s", err)
  843. continue
  844. }
  845. pod, ok := podMap[podKey]
  846. if !ok {
  847. continue
  848. }
  849. for _, alloc := range pod.Allocations {
  850. alloc.NetworkReceiveBytes = res.Values[0].Value / float64(len(pod.Allocations))
  851. }
  852. }
  853. }
  854. func applyNetworkAllocation(podMap map[podKey]*Pod, resNetworkGiB []*prom.QueryResult, resNetworkCostPerGiB []*prom.QueryResult) {
  855. costPerGiBByCluster := map[string]float64{}
  856. for _, res := range resNetworkCostPerGiB {
  857. cluster, err := res.GetString(env.GetPromClusterLabel())
  858. if err != nil {
  859. cluster = env.GetClusterID()
  860. }
  861. costPerGiBByCluster[cluster] = res.Values[0].Value
  862. }
  863. for _, res := range resNetworkGiB {
  864. podKey, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  865. if err != nil {
  866. log.DedupedWarningf(10, "CostModel.ComputeAllocation: Network allocation query result missing field: %s", err)
  867. continue
  868. }
  869. pod, ok := podMap[podKey]
  870. if !ok {
  871. continue
  872. }
  873. for _, alloc := range pod.Allocations {
  874. gib := res.Values[0].Value / float64(len(pod.Allocations))
  875. costPerGiB := costPerGiBByCluster[podKey.Cluster]
  876. alloc.NetworkCost = gib * costPerGiB
  877. }
  878. }
  879. }
  880. func resToNamespaceLabels(resNamespaceLabels []*prom.QueryResult) map[namespaceKey]map[string]string {
  881. namespaceLabels := map[namespaceKey]map[string]string{}
  882. for _, res := range resNamespaceLabels {
  883. nsKey, err := resultNamespaceKey(res, env.GetPromClusterLabel(), "namespace")
  884. if err != nil {
  885. continue
  886. }
  887. if _, ok := namespaceLabels[nsKey]; !ok {
  888. namespaceLabels[nsKey] = map[string]string{}
  889. }
  890. for k, l := range res.GetLabels() {
  891. namespaceLabels[nsKey][k] = l
  892. }
  893. }
  894. return namespaceLabels
  895. }
  896. func resToPodLabels(resPodLabels []*prom.QueryResult) map[podKey]map[string]string {
  897. podLabels := map[podKey]map[string]string{}
  898. for _, res := range resPodLabels {
  899. podKey, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  900. if err != nil {
  901. continue
  902. }
  903. if _, ok := podLabels[podKey]; !ok {
  904. podLabels[podKey] = map[string]string{}
  905. }
  906. for k, l := range res.GetLabels() {
  907. podLabels[podKey][k] = l
  908. }
  909. }
  910. return podLabels
  911. }
  912. func resToNamespaceAnnotations(resNamespaceAnnotations []*prom.QueryResult) map[string]map[string]string {
  913. namespaceAnnotations := map[string]map[string]string{}
  914. for _, res := range resNamespaceAnnotations {
  915. namespace, err := res.GetString("namespace")
  916. if err != nil {
  917. continue
  918. }
  919. if _, ok := namespaceAnnotations[namespace]; !ok {
  920. namespaceAnnotations[namespace] = map[string]string{}
  921. }
  922. for k, l := range res.GetAnnotations() {
  923. namespaceAnnotations[namespace][k] = l
  924. }
  925. }
  926. return namespaceAnnotations
  927. }
  928. func resToPodAnnotations(resPodAnnotations []*prom.QueryResult) map[podKey]map[string]string {
  929. podAnnotations := map[podKey]map[string]string{}
  930. for _, res := range resPodAnnotations {
  931. podKey, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  932. if err != nil {
  933. continue
  934. }
  935. if _, ok := podAnnotations[podKey]; !ok {
  936. podAnnotations[podKey] = map[string]string{}
  937. }
  938. for k, l := range res.GetAnnotations() {
  939. podAnnotations[podKey][k] = l
  940. }
  941. }
  942. return podAnnotations
  943. }
  944. func applyLabels(podMap map[podKey]*Pod, namespaceLabels map[namespaceKey]map[string]string, podLabels map[podKey]map[string]string) {
  945. for podKey, pod := range podMap {
  946. for _, alloc := range pod.Allocations {
  947. allocLabels := alloc.Properties.Labels
  948. if allocLabels == nil {
  949. allocLabels = make(map[string]string)
  950. }
  951. // Apply namespace labels first, then pod labels so that pod labels
  952. // overwrite namespace labels.
  953. nsKey := podKey.namespaceKey // newNamespaceKey(podKey.Cluster, podKey.Namespace)
  954. if labels, ok := namespaceLabels[nsKey]; ok {
  955. for k, v := range labels {
  956. allocLabels[k] = v
  957. }
  958. }
  959. if labels, ok := podLabels[podKey]; ok {
  960. for k, v := range labels {
  961. allocLabels[k] = v
  962. }
  963. }
  964. alloc.Properties.Labels = allocLabels
  965. }
  966. }
  967. }
  968. func applyAnnotations(podMap map[podKey]*Pod, namespaceAnnotations map[string]map[string]string, podAnnotations map[podKey]map[string]string) {
  969. for key, pod := range podMap {
  970. for _, alloc := range pod.Allocations {
  971. allocAnnotations := alloc.Properties.Annotations
  972. if allocAnnotations == nil {
  973. allocAnnotations = make(map[string]string)
  974. }
  975. // Apply namespace annotations first, then pod annotations so that
  976. // pod labels overwrite namespace labels.
  977. if labels, ok := namespaceAnnotations[key.Namespace]; ok {
  978. for k, v := range labels {
  979. allocAnnotations[k] = v
  980. }
  981. }
  982. if labels, ok := podAnnotations[key]; ok {
  983. for k, v := range labels {
  984. allocAnnotations[k] = v
  985. }
  986. }
  987. alloc.Properties.Annotations = allocAnnotations
  988. }
  989. }
  990. }
  991. func getServiceLabels(resServiceLabels []*prom.QueryResult) map[serviceKey]map[string]string {
  992. serviceLabels := map[serviceKey]map[string]string{}
  993. for _, res := range resServiceLabels {
  994. serviceKey, err := resultServiceKey(res, env.GetPromClusterLabel(), "namespace", "service")
  995. if err != nil {
  996. continue
  997. }
  998. if _, ok := serviceLabels[serviceKey]; !ok {
  999. serviceLabels[serviceKey] = map[string]string{}
  1000. }
  1001. for k, l := range res.GetLabels() {
  1002. serviceLabels[serviceKey][k] = l
  1003. }
  1004. }
  1005. // Prune duplicate services. That is, if the same service exists with
  1006. // hyphens instead of underscores, keep the one that uses hyphens.
  1007. for key := range serviceLabels {
  1008. if strings.Contains(key.Service, "_") {
  1009. duplicateService := strings.Replace(key.Service, "_", "-", -1)
  1010. duplicateKey := newServiceKey(key.Cluster, key.Namespace, duplicateService)
  1011. if _, ok := serviceLabels[duplicateKey]; ok {
  1012. delete(serviceLabels, key)
  1013. }
  1014. }
  1015. }
  1016. return serviceLabels
  1017. }
  1018. func resToDeploymentLabels(resDeploymentLabels []*prom.QueryResult) map[controllerKey]map[string]string {
  1019. deploymentLabels := map[controllerKey]map[string]string{}
  1020. for _, res := range resDeploymentLabels {
  1021. controllerKey, err := resultDeploymentKey(res, env.GetPromClusterLabel(), "namespace", "deployment")
  1022. if err != nil {
  1023. continue
  1024. }
  1025. if _, ok := deploymentLabels[controllerKey]; !ok {
  1026. deploymentLabels[controllerKey] = map[string]string{}
  1027. }
  1028. for k, l := range res.GetLabels() {
  1029. deploymentLabels[controllerKey][k] = l
  1030. }
  1031. }
  1032. // Prune duplicate deployments. That is, if the same deployment exists with
  1033. // hyphens instead of underscores, keep the one that uses hyphens.
  1034. for key := range deploymentLabels {
  1035. if strings.Contains(key.Controller, "_") {
  1036. duplicateController := strings.Replace(key.Controller, "_", "-", -1)
  1037. duplicateKey := newControllerKey(key.Cluster, key.Namespace, key.ControllerKind, duplicateController)
  1038. if _, ok := deploymentLabels[duplicateKey]; ok {
  1039. delete(deploymentLabels, key)
  1040. }
  1041. }
  1042. }
  1043. return deploymentLabels
  1044. }
  1045. func resToStatefulSetLabels(resStatefulSetLabels []*prom.QueryResult) map[controllerKey]map[string]string {
  1046. statefulSetLabels := map[controllerKey]map[string]string{}
  1047. for _, res := range resStatefulSetLabels {
  1048. controllerKey, err := resultStatefulSetKey(res, env.GetPromClusterLabel(), "namespace", "statefulSet")
  1049. if err != nil {
  1050. continue
  1051. }
  1052. if _, ok := statefulSetLabels[controllerKey]; !ok {
  1053. statefulSetLabels[controllerKey] = map[string]string{}
  1054. }
  1055. for k, l := range res.GetLabels() {
  1056. statefulSetLabels[controllerKey][k] = l
  1057. }
  1058. }
  1059. // Prune duplicate stateful sets. That is, if the same stateful set exists
  1060. // with hyphens instead of underscores, keep the one that uses hyphens.
  1061. for key := range statefulSetLabels {
  1062. if strings.Contains(key.Controller, "_") {
  1063. duplicateController := strings.Replace(key.Controller, "_", "-", -1)
  1064. duplicateKey := newControllerKey(key.Cluster, key.Namespace, key.ControllerKind, duplicateController)
  1065. if _, ok := statefulSetLabels[duplicateKey]; ok {
  1066. delete(statefulSetLabels, key)
  1067. }
  1068. }
  1069. }
  1070. return statefulSetLabels
  1071. }
  1072. func labelsToPodControllerMap(podLabels map[podKey]map[string]string, controllerLabels map[controllerKey]map[string]string) map[podKey]controllerKey {
  1073. podControllerMap := map[podKey]controllerKey{}
  1074. // For each controller, turn the labels into a selector and attempt to
  1075. // match it with each set of pod labels. A match indicates that the pod
  1076. // belongs to the controller.
  1077. for cKey, cLabels := range controllerLabels {
  1078. selector := labels.Set(cLabels).AsSelectorPreValidated()
  1079. for pKey, pLabels := range podLabels {
  1080. // If the pod is in a different cluster or namespace, there is
  1081. // no need to compare the labels.
  1082. if cKey.Cluster != pKey.Cluster || cKey.Namespace != pKey.Namespace {
  1083. continue
  1084. }
  1085. podLabelSet := labels.Set(pLabels)
  1086. if selector.Matches(podLabelSet) {
  1087. if _, ok := podControllerMap[pKey]; ok {
  1088. log.DedupedWarningf(5, "CostModel.ComputeAllocation: PodControllerMap match already exists: %s matches %s and %s", pKey, podControllerMap[pKey], cKey)
  1089. }
  1090. podControllerMap[pKey] = cKey
  1091. }
  1092. }
  1093. }
  1094. return podControllerMap
  1095. }
  1096. func resToPodDaemonSetMap(resDaemonSetLabels []*prom.QueryResult) map[podKey]controllerKey {
  1097. daemonSetLabels := map[podKey]controllerKey{}
  1098. for _, res := range resDaemonSetLabels {
  1099. controllerKey, err := resultDaemonSetKey(res, env.GetPromClusterLabel(), "namespace", "owner_name")
  1100. if err != nil {
  1101. continue
  1102. }
  1103. pod, err := res.GetString("pod")
  1104. if err != nil {
  1105. log.Warningf("CostModel.ComputeAllocation: DaemonSetLabel result without pod: %s", controllerKey)
  1106. }
  1107. podKey := newPodKey(controllerKey.Cluster, controllerKey.Namespace, pod)
  1108. daemonSetLabels[podKey] = controllerKey
  1109. }
  1110. return daemonSetLabels
  1111. }
  1112. func resToPodJobMap(resJobLabels []*prom.QueryResult) map[podKey]controllerKey {
  1113. jobLabels := map[podKey]controllerKey{}
  1114. for _, res := range resJobLabels {
  1115. controllerKey, err := resultJobKey(res, env.GetPromClusterLabel(), "namespace", "owner_name")
  1116. if err != nil {
  1117. continue
  1118. }
  1119. // Convert the name of Jobs generated by CronJobs to the name of the
  1120. // CronJob by stripping the timestamp off the end.
  1121. match := isCron.FindStringSubmatch(controllerKey.Controller)
  1122. if match != nil {
  1123. controllerKey.Controller = match[1]
  1124. }
  1125. pod, err := res.GetString("pod")
  1126. if err != nil {
  1127. log.Warningf("CostModel.ComputeAllocation: JobLabel result without pod: %s", controllerKey)
  1128. }
  1129. podKey := newPodKey(controllerKey.Cluster, controllerKey.Namespace, pod)
  1130. jobLabels[podKey] = controllerKey
  1131. }
  1132. return jobLabels
  1133. }
  1134. func resToPodReplicaSetMap(resPodsWithReplicaSetOwner []*prom.QueryResult, resReplicaSetsWithoutOwners []*prom.QueryResult) map[podKey]controllerKey {
  1135. // Build out set of ReplicaSets that have no owners, themselves, such that
  1136. // the ReplicaSet should be used as the owner of the Pods it controls.
  1137. // (This should exclude, for example, ReplicaSets that are controlled by
  1138. // Deployments, in which case the Deployment should be the Pod's owner.)
  1139. replicaSets := map[controllerKey]struct{}{}
  1140. for _, res := range resReplicaSetsWithoutOwners {
  1141. controllerKey, err := resultReplicaSetKey(res, env.GetPromClusterLabel(), "namespace", "replicaset")
  1142. if err != nil {
  1143. continue
  1144. }
  1145. replicaSets[controllerKey] = struct{}{}
  1146. }
  1147. // Create the mapping of Pods to ReplicaSets, ignoring any ReplicaSets that
  1148. // to not appear in the set of uncontrolled ReplicaSets above.
  1149. podToReplicaSet := map[podKey]controllerKey{}
  1150. for _, res := range resPodsWithReplicaSetOwner {
  1151. controllerKey, err := resultReplicaSetKey(res, env.GetPromClusterLabel(), "namespace", "owner_name")
  1152. if err != nil {
  1153. continue
  1154. }
  1155. if _, ok := replicaSets[controllerKey]; !ok {
  1156. continue
  1157. }
  1158. pod, err := res.GetString("pod")
  1159. if err != nil {
  1160. log.Warningf("CostModel.ComputeAllocation: ReplicaSet result without pod: %s", controllerKey)
  1161. }
  1162. podKey := newPodKey(controllerKey.Cluster, controllerKey.Namespace, pod)
  1163. podToReplicaSet[podKey] = controllerKey
  1164. }
  1165. return podToReplicaSet
  1166. }
  1167. func applyServicesToPods(podMap map[podKey]*Pod, podLabels map[podKey]map[string]string, allocsByService map[serviceKey][]*kubecost.Allocation, serviceLabels map[serviceKey]map[string]string) {
  1168. podServicesMap := map[podKey][]serviceKey{}
  1169. // For each service, turn the labels into a selector and attempt to
  1170. // match it with each set of pod labels. A match indicates that the pod
  1171. // belongs to the service.
  1172. for sKey, sLabels := range serviceLabels {
  1173. selector := labels.Set(sLabels).AsSelectorPreValidated()
  1174. for pKey, pLabels := range podLabels {
  1175. // If the pod is in a different cluster or namespace, there is
  1176. // no need to compare the labels.
  1177. if sKey.Cluster != pKey.Cluster || sKey.Namespace != pKey.Namespace {
  1178. continue
  1179. }
  1180. podLabelSet := labels.Set(pLabels)
  1181. if selector.Matches(podLabelSet) {
  1182. if _, ok := podServicesMap[pKey]; !ok {
  1183. podServicesMap[pKey] = []serviceKey{}
  1184. }
  1185. podServicesMap[pKey] = append(podServicesMap[pKey], sKey)
  1186. }
  1187. }
  1188. }
  1189. // For each allocation in each pod, attempt to find and apply the list of
  1190. // services associated with the allocation's pod.
  1191. for key, pod := range podMap {
  1192. for _, alloc := range pod.Allocations {
  1193. if sKeys, ok := podServicesMap[key]; ok {
  1194. services := []string{}
  1195. for _, sKey := range sKeys {
  1196. services = append(services, sKey.Service)
  1197. allocsByService[sKey] = append(allocsByService[sKey], alloc)
  1198. }
  1199. alloc.Properties.Services = services
  1200. }
  1201. }
  1202. }
  1203. }
  1204. func applyControllersToPods(podMap map[podKey]*Pod, podControllerMap map[podKey]controllerKey) {
  1205. for key, pod := range podMap {
  1206. for _, alloc := range pod.Allocations {
  1207. if controllerKey, ok := podControllerMap[key]; ok {
  1208. alloc.Properties.ControllerKind = controllerKey.ControllerKind
  1209. alloc.Properties.Controller = controllerKey.Controller
  1210. }
  1211. }
  1212. }
  1213. }
  1214. func applyNodeCostPerCPUHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerCPUHr []*prom.QueryResult) {
  1215. for _, res := range resNodeCostPerCPUHr {
  1216. cluster, err := res.GetString(env.GetPromClusterLabel())
  1217. if err != nil {
  1218. cluster = env.GetClusterID()
  1219. }
  1220. node, err := res.GetString("node")
  1221. if err != nil {
  1222. log.Warningf("CostModel.ComputeAllocation: Node CPU cost query result missing field: %s", err)
  1223. continue
  1224. }
  1225. instanceType, err := res.GetString("instance_type")
  1226. if err != nil {
  1227. log.Warningf("CostModel.ComputeAllocation: Node CPU cost query result missing field: %s", err)
  1228. continue
  1229. }
  1230. providerID, err := res.GetString("provider_id")
  1231. if err != nil {
  1232. log.Warningf("CostModel.ComputeAllocation: Node CPU cost query result missing field: %s", err)
  1233. continue
  1234. }
  1235. key := newNodeKey(cluster, node)
  1236. if _, ok := nodeMap[key]; !ok {
  1237. nodeMap[key] = &NodePricing{
  1238. Name: node,
  1239. NodeType: instanceType,
  1240. ProviderID: cloud.ParseID(providerID),
  1241. }
  1242. }
  1243. nodeMap[key].CostPerCPUHr = res.Values[0].Value
  1244. }
  1245. }
  1246. func applyNodeCostPerRAMGiBHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerRAMGiBHr []*prom.QueryResult) {
  1247. for _, res := range resNodeCostPerRAMGiBHr {
  1248. cluster, err := res.GetString(env.GetPromClusterLabel())
  1249. if err != nil {
  1250. cluster = env.GetClusterID()
  1251. }
  1252. node, err := res.GetString("node")
  1253. if err != nil {
  1254. log.Warningf("CostModel.ComputeAllocation: Node RAM cost query result missing field: %s", err)
  1255. continue
  1256. }
  1257. instanceType, err := res.GetString("instance_type")
  1258. if err != nil {
  1259. log.Warningf("CostModel.ComputeAllocation: Node RAM cost query result missing field: %s", err)
  1260. continue
  1261. }
  1262. providerID, err := res.GetString("provider_id")
  1263. if err != nil {
  1264. log.Warningf("CostModel.ComputeAllocation: Node RAM cost query result missing field: %s", err)
  1265. continue
  1266. }
  1267. key := newNodeKey(cluster, node)
  1268. if _, ok := nodeMap[key]; !ok {
  1269. nodeMap[key] = &NodePricing{
  1270. Name: node,
  1271. NodeType: instanceType,
  1272. ProviderID: cloud.ParseID(providerID),
  1273. }
  1274. }
  1275. nodeMap[key].CostPerRAMGiBHr = res.Values[0].Value
  1276. }
  1277. }
  1278. func applyNodeCostPerGPUHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerGPUHr []*prom.QueryResult) {
  1279. for _, res := range resNodeCostPerGPUHr {
  1280. cluster, err := res.GetString(env.GetPromClusterLabel())
  1281. if err != nil {
  1282. cluster = env.GetClusterID()
  1283. }
  1284. node, err := res.GetString("node")
  1285. if err != nil {
  1286. log.Warningf("CostModel.ComputeAllocation: Node GPU cost query result missing field: %s", err)
  1287. continue
  1288. }
  1289. instanceType, err := res.GetString("instance_type")
  1290. if err != nil {
  1291. log.Warningf("CostModel.ComputeAllocation: Node GPU cost query result missing field: %s", err)
  1292. continue
  1293. }
  1294. providerID, err := res.GetString("provider_id")
  1295. if err != nil {
  1296. log.Warningf("CostModel.ComputeAllocation: Node GPU cost query result missing field: %s", err)
  1297. continue
  1298. }
  1299. key := newNodeKey(cluster, node)
  1300. if _, ok := nodeMap[key]; !ok {
  1301. nodeMap[key] = &NodePricing{
  1302. Name: node,
  1303. NodeType: instanceType,
  1304. ProviderID: cloud.ParseID(providerID),
  1305. }
  1306. }
  1307. nodeMap[key].CostPerGPUHr = res.Values[0].Value
  1308. }
  1309. }
  1310. func applyNodeSpot(nodeMap map[nodeKey]*NodePricing, resNodeIsSpot []*prom.QueryResult) {
  1311. for _, res := range resNodeIsSpot {
  1312. cluster, err := res.GetString(env.GetPromClusterLabel())
  1313. if err != nil {
  1314. cluster = env.GetClusterID()
  1315. }
  1316. node, err := res.GetString("node")
  1317. if err != nil {
  1318. log.Warningf("CostModel.ComputeAllocation: Node spot query result missing field: %s", err)
  1319. continue
  1320. }
  1321. key := newNodeKey(cluster, node)
  1322. if _, ok := nodeMap[key]; !ok {
  1323. log.Warningf("CostModel.ComputeAllocation: Node spot query result for missing node: %s", key)
  1324. continue
  1325. }
  1326. nodeMap[key].Preemptible = res.Values[0].Value > 0
  1327. }
  1328. }
  1329. func applyNodeDiscount(nodeMap map[nodeKey]*NodePricing, cm *CostModel) {
  1330. if cm == nil {
  1331. return
  1332. }
  1333. c, err := cm.Provider.GetConfig()
  1334. if err != nil {
  1335. log.Errorf("CostModel.ComputeAllocation: applyNodeDiscount: %s", err)
  1336. return
  1337. }
  1338. discount, err := ParsePercentString(c.Discount)
  1339. if err != nil {
  1340. log.Errorf("CostModel.ComputeAllocation: applyNodeDiscount: %s", err)
  1341. return
  1342. }
  1343. negotiatedDiscount, err := ParsePercentString(c.NegotiatedDiscount)
  1344. if err != nil {
  1345. log.Errorf("CostModel.ComputeAllocation: applyNodeDiscount: %s", err)
  1346. return
  1347. }
  1348. for _, node := range nodeMap {
  1349. // TODO GKE Reserved Instances into account
  1350. node.Discount = cm.Provider.CombinedDiscountForNode(node.NodeType, node.Preemptible, discount, negotiatedDiscount)
  1351. node.CostPerCPUHr *= (1.0 - node.Discount)
  1352. node.CostPerRAMGiBHr *= (1.0 - node.Discount)
  1353. }
  1354. }
  1355. func buildPVMap(pvMap map[pvKey]*PV, resPVCostPerGiBHour []*prom.QueryResult) {
  1356. for _, res := range resPVCostPerGiBHour {
  1357. cluster, err := res.GetString(env.GetPromClusterLabel())
  1358. if err != nil {
  1359. cluster = env.GetClusterID()
  1360. }
  1361. name, err := res.GetString("volumename")
  1362. if err != nil {
  1363. log.Warningf("CostModel.ComputeAllocation: PV cost without volumename")
  1364. continue
  1365. }
  1366. key := newPVKey(cluster, name)
  1367. pvMap[key] = &PV{
  1368. Cluster: cluster,
  1369. Name: name,
  1370. CostPerGiBHour: res.Values[0].Value,
  1371. }
  1372. }
  1373. }
  1374. func applyPVBytes(pvMap map[pvKey]*PV, resPVBytes []*prom.QueryResult) {
  1375. for _, res := range resPVBytes {
  1376. key, err := resultPVKey(res, env.GetPromClusterLabel(), "persistentvolume")
  1377. if err != nil {
  1378. log.Warningf("CostModel.ComputeAllocation: PV bytes query result missing field: %s", err)
  1379. continue
  1380. }
  1381. if _, ok := pvMap[key]; !ok {
  1382. log.Warningf("CostModel.ComputeAllocation: PV bytes result for missing PV: %s", err)
  1383. continue
  1384. }
  1385. pvMap[key].Bytes = res.Values[0].Value
  1386. }
  1387. }
  1388. func buildPVCMap(window kubecost.Window, pvcMap map[pvcKey]*PVC, pvMap map[pvKey]*PV, resPVCInfo []*prom.QueryResult) {
  1389. for _, res := range resPVCInfo {
  1390. cluster, err := res.GetString(env.GetPromClusterLabel())
  1391. if err != nil {
  1392. cluster = env.GetClusterID()
  1393. }
  1394. values, err := res.GetStrings("persistentvolumeclaim", "storageclass", "volumename", "namespace")
  1395. if err != nil {
  1396. log.DedupedWarningf(10, "CostModel.ComputeAllocation: PVC info query result missing field: %s", err)
  1397. continue
  1398. }
  1399. namespace := values["namespace"]
  1400. name := values["persistentvolumeclaim"]
  1401. volume := values["volumename"]
  1402. storageClass := values["storageclass"]
  1403. pvKey := newPVKey(cluster, volume)
  1404. pvcKey := newPVCKey(cluster, namespace, name)
  1405. // pvcStart and pvcEnd are the timestamps of the first and last minutes
  1406. // the PVC was running, respectively. We subtract 1m from pvcStart
  1407. // because this point will actually represent the end of the first
  1408. // minute. We don't subtract from pvcEnd because it already represents
  1409. // the end of the last minute.
  1410. var pvcStart, pvcEnd time.Time
  1411. for _, datum := range res.Values {
  1412. t := time.Unix(int64(datum.Timestamp), 0)
  1413. if pvcStart.IsZero() && datum.Value > 0 && window.Contains(t) {
  1414. pvcStart = t
  1415. }
  1416. if datum.Value > 0 && window.Contains(t) {
  1417. pvcEnd = t
  1418. }
  1419. }
  1420. if pvcStart.IsZero() || pvcEnd.IsZero() {
  1421. log.Warningf("CostModel.ComputeAllocation: PVC %s has no running time", pvcKey)
  1422. }
  1423. pvcStart = pvcStart.Add(-time.Minute)
  1424. if _, ok := pvMap[pvKey]; !ok {
  1425. continue
  1426. }
  1427. pvMap[pvKey].StorageClass = storageClass
  1428. if _, ok := pvcMap[pvcKey]; !ok {
  1429. pvcMap[pvcKey] = &PVC{}
  1430. }
  1431. pvcMap[pvcKey].Name = name
  1432. pvcMap[pvcKey].Namespace = namespace
  1433. pvcMap[pvcKey].Volume = pvMap[pvKey]
  1434. pvcMap[pvcKey].Start = pvcStart
  1435. pvcMap[pvcKey].End = pvcEnd
  1436. }
  1437. }
  1438. func applyPVCBytesRequested(pvcMap map[pvcKey]*PVC, resPVCBytesRequested []*prom.QueryResult) {
  1439. for _, res := range resPVCBytesRequested {
  1440. key, err := resultPVCKey(res, env.GetPromClusterLabel(), "namespace", "persistentvolumeclaim")
  1441. if err != nil {
  1442. continue
  1443. }
  1444. if _, ok := pvcMap[key]; !ok {
  1445. continue
  1446. }
  1447. pvcMap[key].Bytes = res.Values[0].Value
  1448. }
  1449. }
  1450. func buildPodPVCMap(podPVCMap map[podKey][]*PVC, pvMap map[pvKey]*PV, pvcMap map[pvcKey]*PVC, podMap map[podKey]*Pod, resPodPVCAllocation []*prom.QueryResult) {
  1451. for _, res := range resPodPVCAllocation {
  1452. cluster, err := res.GetString(env.GetPromClusterLabel())
  1453. if err != nil {
  1454. cluster = env.GetClusterID()
  1455. }
  1456. values, err := res.GetStrings("persistentvolume", "persistentvolumeclaim", "pod", "namespace")
  1457. if err != nil {
  1458. log.DedupedWarningf(5, "CostModel.ComputeAllocation: PVC allocation query result missing field: %s", err)
  1459. continue
  1460. }
  1461. namespace := values["namespace"]
  1462. pod := values["pod"]
  1463. name := values["persistentvolumeclaim"]
  1464. volume := values["persistentvolume"]
  1465. podKey := newPodKey(cluster, namespace, pod)
  1466. pvKey := newPVKey(cluster, volume)
  1467. pvcKey := newPVCKey(cluster, namespace, name)
  1468. if _, ok := pvMap[pvKey]; !ok {
  1469. log.DedupedWarningf(5, "CostModel.ComputeAllocation: PV missing for PVC allocation query result: %s", pvKey)
  1470. continue
  1471. }
  1472. if _, ok := podPVCMap[podKey]; !ok {
  1473. podPVCMap[podKey] = []*PVC{}
  1474. }
  1475. pvc, ok := pvcMap[pvcKey]
  1476. if !ok {
  1477. log.DedupedWarningf(5, "CostModel.ComputeAllocation: PVC missing for PVC allocation query: %s", pvcKey)
  1478. continue
  1479. }
  1480. count := 1
  1481. if pod, ok := podMap[podKey]; ok && len(pod.Allocations) > 0 {
  1482. count = len(pod.Allocations)
  1483. } else {
  1484. log.DedupedWarningf(10, "CostModel.ComputeAllocation: PVC %s for missing pod %s", pvcKey, podKey)
  1485. }
  1486. pvc.Count = count
  1487. pvc.Mounted = true
  1488. podPVCMap[podKey] = append(podPVCMap[podKey], pvc)
  1489. }
  1490. }
  1491. func applyUnmountedPVs(window kubecost.Window, podMap map[podKey]*Pod, pvMap map[pvKey]*PV, pvcMap map[pvcKey]*PVC) {
  1492. unmountedPVBytes := map[string]float64{}
  1493. unmountedPVCost := map[string]float64{}
  1494. for _, pv := range pvMap {
  1495. mounted := false
  1496. for _, pvc := range pvcMap {
  1497. if pvc.Volume == nil {
  1498. continue
  1499. }
  1500. if pvc.Volume == pv {
  1501. mounted = true
  1502. break
  1503. }
  1504. }
  1505. if !mounted {
  1506. gib := pv.Bytes / 1024 / 1024 / 1024
  1507. hrs := window.Minutes() / 60.0 // TODO improve with PV hours, not window hours
  1508. cost := pv.CostPerGiBHour * gib * hrs
  1509. unmountedPVCost[pv.Cluster] += cost
  1510. unmountedPVBytes[pv.Cluster] += pv.Bytes
  1511. }
  1512. }
  1513. for cluster, amount := range unmountedPVCost {
  1514. container := kubecost.UnmountedSuffix
  1515. pod := kubecost.UnmountedSuffix
  1516. namespace := kubecost.UnmountedSuffix
  1517. node := ""
  1518. key := newPodKey(cluster, namespace, pod)
  1519. podMap[key] = &Pod{
  1520. Window: window.Clone(),
  1521. Start: *window.Start(),
  1522. End: *window.End(),
  1523. Key: key,
  1524. Allocations: map[string]*kubecost.Allocation{},
  1525. }
  1526. podMap[key].AppendContainer(container)
  1527. podMap[key].Allocations[container].Properties.Cluster = cluster
  1528. podMap[key].Allocations[container].Properties.Node = node
  1529. podMap[key].Allocations[container].Properties.Namespace = namespace
  1530. podMap[key].Allocations[container].Properties.Pod = pod
  1531. podMap[key].Allocations[container].Properties.Container = container
  1532. pvKey := kubecost.PVKey{
  1533. Cluster: cluster,
  1534. Name: kubecost.UnmountedSuffix,
  1535. }
  1536. unmountedPVs := kubecost.PVAllocations{
  1537. pvKey: {
  1538. ByteHours: unmountedPVBytes[cluster] * window.Minutes() / 60.0,
  1539. Cost: amount,
  1540. },
  1541. }
  1542. podMap[key].Allocations[container].PVs = podMap[key].Allocations[container].PVs.Add(unmountedPVs)
  1543. }
  1544. }
  1545. func applyUnmountedPVCs(window kubecost.Window, podMap map[podKey]*Pod, pvcMap map[pvcKey]*PVC) {
  1546. unmountedPVCBytes := map[namespaceKey]float64{}
  1547. unmountedPVCCost := map[namespaceKey]float64{}
  1548. for _, pvc := range pvcMap {
  1549. if !pvc.Mounted && pvc.Volume != nil {
  1550. key := newNamespaceKey(pvc.Cluster, pvc.Namespace)
  1551. gib := pvc.Volume.Bytes / 1024 / 1024 / 1024
  1552. hrs := pvc.Minutes() / 60.0
  1553. cost := pvc.Volume.CostPerGiBHour * gib * hrs
  1554. unmountedPVCCost[key] += cost
  1555. unmountedPVCBytes[key] += pvc.Volume.Bytes
  1556. }
  1557. }
  1558. for key, amount := range unmountedPVCCost {
  1559. container := kubecost.UnmountedSuffix
  1560. pod := kubecost.UnmountedSuffix
  1561. namespace := key.Namespace
  1562. node := ""
  1563. cluster := key.Cluster
  1564. podKey := newPodKey(cluster, namespace, pod)
  1565. podMap[podKey] = &Pod{
  1566. Window: window.Clone(),
  1567. Start: *window.Start(),
  1568. End: *window.End(),
  1569. Key: podKey,
  1570. Allocations: map[string]*kubecost.Allocation{},
  1571. }
  1572. podMap[podKey].AppendContainer(container)
  1573. podMap[podKey].Allocations[container].Properties.Cluster = cluster
  1574. podMap[podKey].Allocations[container].Properties.Node = node
  1575. podMap[podKey].Allocations[container].Properties.Namespace = namespace
  1576. podMap[podKey].Allocations[container].Properties.Pod = pod
  1577. podMap[podKey].Allocations[container].Properties.Container = container
  1578. pvKey := kubecost.PVKey{
  1579. Cluster: cluster,
  1580. Name: kubecost.UnmountedSuffix,
  1581. }
  1582. unmountedPVs := kubecost.PVAllocations{
  1583. pvKey: {
  1584. ByteHours: unmountedPVCBytes[key] * window.Minutes() / 60.0,
  1585. Cost: amount,
  1586. },
  1587. }
  1588. podMap[podKey].Allocations[container].PVs = podMap[podKey].Allocations[container].PVs.Add(unmountedPVs)
  1589. }
  1590. }
  // LB describes the start and end time of a Load Balancer along with cost.
// As built by getLoadBalancerCosts, TotalCost is the hourly price multiplied
// by the number of hours between Start and End.
type LB struct {
	TotalCost  float64   // cumulative cost over [Start, End]
	Start, End time.Time // observed active period
}
  1597. func getLoadBalancerCosts(resLBCost, resLBActiveMins []*prom.QueryResult, resolution time.Duration) map[serviceKey]*LB {
  1598. lbMap := make(map[serviceKey]*LB)
  1599. lbHourlyCosts := make(map[serviceKey]float64)
  1600. for _, res := range resLBCost {
  1601. serviceKey, err := resultServiceKey(res, env.GetPromClusterLabel(), "namespace", "service_name")
  1602. if err != nil {
  1603. continue
  1604. }
  1605. lbHourlyCosts[serviceKey] = res.Values[0].Value
  1606. }
  1607. for _, res := range resLBActiveMins {
  1608. serviceKey, err := resultServiceKey(res, env.GetPromClusterLabel(), "namespace", "service_name")
  1609. if err != nil || len(res.Values) == 0 {
  1610. continue
  1611. }
  1612. if _, ok := lbHourlyCosts[serviceKey]; !ok {
  1613. log.Warningf("CostModel: failed to find hourly cost for Load Balancer: %v", serviceKey)
  1614. continue
  1615. }
  1616. s := time.Unix(int64(res.Values[0].Timestamp), 0)
  1617. // subtract resolution from start time to cover full time period
  1618. s = s.Add(-resolution)
  1619. e := time.Unix(int64(res.Values[len(res.Values)-1].Timestamp), 0)
  1620. hours := e.Sub(s).Hours()
  1621. lbMap[serviceKey] = &LB{
  1622. TotalCost: lbHourlyCosts[serviceKey] * hours,
  1623. Start: s,
  1624. End: e,
  1625. }
  1626. }
  1627. return lbMap
  1628. }
  1629. func applyLoadBalancersToPods(lbMap map[serviceKey]*LB, allocsByService map[serviceKey][]*kubecost.Allocation) {
  1630. for sKey, lb := range lbMap {
  1631. totalHours := 0.0
  1632. allocHours := make(map[*kubecost.Allocation]float64)
  1633. // Add portion of load balancing cost to each allocation
  1634. // proportional to the total number of hours allocations used the load balancer
  1635. for _, alloc := range allocsByService[sKey] {
  1636. // Determine the (start, end) of the relationship between the
  1637. // given LB and the associated Allocation so that a precise
  1638. // number of hours can be used to compute cumulative cost.
  1639. s, e := alloc.Start, alloc.End
  1640. if lb.Start.After(alloc.Start) {
  1641. s = lb.Start
  1642. }
  1643. if lb.End.Before(alloc.End) {
  1644. e = lb.End
  1645. }
  1646. hours := e.Sub(s).Hours()
  1647. // A negative number of hours signifies no overlap between the windows
  1648. if hours > 0 {
  1649. totalHours += hours
  1650. allocHours[alloc] = hours
  1651. }
  1652. }
  1653. // Distribute cost of service once total hours is calculated
  1654. for alloc, hours := range allocHours {
  1655. alloc.LoadBalancerCost += lb.TotalCost * hours / totalHours
  1656. }
  1657. }
  1658. }
  1659. // getNodePricing determines node pricing, given a key and a mapping from keys
  1660. // to their NodePricing instances, as well as the custom pricing configuration
  1661. // inherent to the CostModel instance. If custom pricing is set, use that. If
  1662. // not, use the pricing defined by the given key. If that doesn't exist, fall
  1663. // back on custom pricing as a default.
  1664. func (cm *CostModel) getNodePricing(nodeMap map[nodeKey]*NodePricing, nodeKey nodeKey) *NodePricing {
  1665. // Find the relevant NodePricing, if it exists. If not, substitute the
  1666. // custom NodePricing as a default.
  1667. node, ok := nodeMap[nodeKey]
  1668. if !ok || node == nil {
  1669. if nodeKey.Node != "" {
  1670. log.DedupedWarningf(5, "CostModel: failed to find node for %s", nodeKey)
  1671. }
  1672. return cm.getCustomNodePricing(false)
  1673. }
  1674. // If custom pricing is enabled and can be retrieved, override detected
  1675. // node pricing with the custom values.
  1676. customPricingConfig, err := cm.Provider.GetConfig()
  1677. if err != nil {
  1678. log.Warningf("CostModel: failed to load custom pricing: %s", err)
  1679. }
  1680. if cloud.CustomPricesEnabled(cm.Provider) && customPricingConfig != nil {
  1681. return cm.getCustomNodePricing(node.Preemptible)
  1682. }
  1683. node.Source = "prometheus"
  1684. // If any of the values are NaN or zero, replace them with the custom
  1685. // values as default.
  1686. // TODO:CLEANUP can't we parse these custom prices once? why do we store
  1687. // them as strings like this?
  1688. if node.CostPerCPUHr == 0 || math.IsNaN(node.CostPerCPUHr) {
  1689. log.Warningf("CostModel: node pricing has illegal CostPerCPUHr; replacing with custom pricing: %s", nodeKey)
  1690. cpuCostStr := customPricingConfig.CPU
  1691. if node.Preemptible {
  1692. cpuCostStr = customPricingConfig.SpotCPU
  1693. }
  1694. costPerCPUHr, err := strconv.ParseFloat(cpuCostStr, 64)
  1695. if err != nil {
  1696. log.Warningf("CostModel: custom pricing has illegal CPU cost: %s", cpuCostStr)
  1697. }
  1698. node.CostPerCPUHr = costPerCPUHr
  1699. node.Source += "/customCPU"
  1700. }
  1701. if math.IsNaN(node.CostPerGPUHr) {
  1702. log.Warningf("CostModel: node pricing has illegal CostPerGPUHr; replacing with custom pricing: %s", nodeKey)
  1703. gpuCostStr := customPricingConfig.GPU
  1704. if node.Preemptible {
  1705. gpuCostStr = customPricingConfig.SpotGPU
  1706. }
  1707. costPerGPUHr, err := strconv.ParseFloat(gpuCostStr, 64)
  1708. if err != nil {
  1709. log.Warningf("CostModel: custom pricing has illegal GPU cost: %s", gpuCostStr)
  1710. }
  1711. node.CostPerGPUHr = costPerGPUHr
  1712. node.Source += "/customGPU"
  1713. }
  1714. if node.CostPerRAMGiBHr == 0 || math.IsNaN(node.CostPerRAMGiBHr) {
  1715. log.Warningf("CostModel: node pricing has illegal CostPerRAMHr; replacing with custom pricing: %s", nodeKey)
  1716. ramCostStr := customPricingConfig.RAM
  1717. if node.Preemptible {
  1718. ramCostStr = customPricingConfig.SpotRAM
  1719. }
  1720. costPerRAMHr, err := strconv.ParseFloat(ramCostStr, 64)
  1721. if err != nil {
  1722. log.Warningf("CostModel: custom pricing has illegal RAM cost: %s", ramCostStr)
  1723. }
  1724. node.CostPerRAMGiBHr = costPerRAMHr
  1725. node.Source += "/customRAM"
  1726. }
  1727. return node
  1728. }
  1729. // getCustomNodePricing converts the CostModel's configured custom pricing
  1730. // values into a NodePricing instance.
  1731. func (cm *CostModel) getCustomNodePricing(spot bool) *NodePricing {
  1732. customPricingConfig, err := cm.Provider.GetConfig()
  1733. if err != nil {
  1734. return nil
  1735. }
  1736. cpuCostStr := customPricingConfig.CPU
  1737. gpuCostStr := customPricingConfig.GPU
  1738. ramCostStr := customPricingConfig.RAM
  1739. if spot {
  1740. cpuCostStr = customPricingConfig.SpotCPU
  1741. gpuCostStr = customPricingConfig.SpotGPU
  1742. ramCostStr = customPricingConfig.SpotRAM
  1743. }
  1744. node := &NodePricing{Source: "custom"}
  1745. costPerCPUHr, err := strconv.ParseFloat(cpuCostStr, 64)
  1746. if err != nil {
  1747. log.Warningf("CostModel: custom pricing has illegal CPU cost: %s", cpuCostStr)
  1748. }
  1749. node.CostPerCPUHr = costPerCPUHr
  1750. costPerGPUHr, err := strconv.ParseFloat(gpuCostStr, 64)
  1751. if err != nil {
  1752. log.Warningf("CostModel: custom pricing has illegal GPU cost: %s", gpuCostStr)
  1753. }
  1754. node.CostPerGPUHr = costPerGPUHr
  1755. costPerRAMHr, err := strconv.ParseFloat(ramCostStr, 64)
  1756. if err != nil {
  1757. log.Warningf("CostModel: custom pricing has illegal RAM cost: %s", ramCostStr)
  1758. }
  1759. node.CostPerRAMGiBHr = costPerRAMHr
  1760. return node
  1761. }
  // NodePricing describes the resource costs associated with a given node, as
// well as the source of the information (e.g. prometheus, custom).
type NodePricing struct {
	// Identity of the node: name, instance type and parsed provider ID.
	Name, NodeType, ProviderID string
	// Preemptible is true for spot/preemptible nodes.
	Preemptible bool
	// Hourly cost per CPU, per GiB of RAM, and per GPU.
	CostPerCPUHr, CostPerRAMGiBHr, CostPerGPUHr float64
	// Discount is the fraction by which applyNodeDiscount scaled the CPU and
	// RAM costs (each multiplied by 1 - Discount).
	Discount float64
	// Source records where the prices came from, e.g. "custom" or
	// "prometheus", optionally with "/customCPU"-style suffixes.
	Source string
}
  1775. // Pod describes a running pod's start and end time within a Window and
  1776. // all the Allocations (i.e. containers) contained within it.
  1777. type Pod struct {
  1778. Window kubecost.Window
  1779. Start time.Time
  1780. End time.Time
  1781. Key podKey
  1782. Allocations map[string]*kubecost.Allocation
  1783. }
  1784. // AppendContainer adds an entry for the given container name to the Pod.
  1785. func (p Pod) AppendContainer(container string) {
  1786. name := fmt.Sprintf("%s/%s/%s/%s", p.Key.Cluster, p.Key.Namespace, p.Key.Pod, container)
  1787. alloc := &kubecost.Allocation{
  1788. Name: name,
  1789. Properties: &kubecost.AllocationProperties{},
  1790. Window: p.Window.Clone(),
  1791. Start: p.Start,
  1792. End: p.End,
  1793. }
  1794. alloc.Properties.Container = container
  1795. alloc.Properties.Pod = p.Key.Pod
  1796. alloc.Properties.Namespace = p.Key.Namespace
  1797. alloc.Properties.Cluster = p.Key.Cluster
  1798. p.Allocations[container] = alloc
  1799. }
  1800. // PVC describes a PersistentVolumeClaim
  1801. // TODO:CLEANUP move to pkg/kubecost?
  1802. // TODO:CLEANUP add PersistentVolumeClaims field to type Allocation?
  1803. type PVC struct {
  1804. Bytes float64 `json:"bytes"`
  1805. Count int `json:"count"`
  1806. Name string `json:"name"`
  1807. Cluster string `json:"cluster"`
  1808. Namespace string `json:"namespace"`
  1809. Volume *PV `json:"persistentVolume"`
  1810. Mounted bool `json:"mounted"`
  1811. Start time.Time `json:"start"`
  1812. End time.Time `json:"end"`
  1813. }
  1814. // Cost computes the cumulative cost of the PVC
  1815. func (pvc *PVC) Cost() float64 {
  1816. if pvc == nil || pvc.Volume == nil {
  1817. return 0.0
  1818. }
  1819. gib := pvc.Bytes / 1024 / 1024 / 1024
  1820. hrs := pvc.Minutes() / 60.0
  1821. return pvc.Volume.CostPerGiBHour * gib * hrs
  1822. }
  1823. // Minutes computes the number of minutes over which the PVC is defined
  1824. func (pvc *PVC) Minutes() float64 {
  1825. if pvc == nil {
  1826. return 0.0
  1827. }
  1828. return pvc.End.Sub(pvc.Start).Minutes()
  1829. }
  1830. // String returns a string representation of the PVC
  1831. func (pvc *PVC) String() string {
  1832. if pvc == nil {
  1833. return "<nil>"
  1834. }
  1835. return fmt.Sprintf("%s/%s/%s{Bytes:%.2f, Cost:%.6f, Start,End:%s}", pvc.Cluster, pvc.Namespace, pvc.Name, pvc.Bytes, pvc.Cost(), kubecost.NewWindow(&pvc.Start, &pvc.End))
  1836. }
  // PV describes a PersistentVolume
// TODO:CLEANUP move to pkg/kubecost?
type PV struct {
	// Bytes is the size of the volume.
	Bytes float64 `json:"bytes"`
	// CostPerGiBHour is the hourly storage price per GiB.
	CostPerGiBHour float64 `json:"costPerGiBHour"`
	// Cluster and Name identify the volume.
	Cluster string `json:"cluster"`
	Name    string `json:"name"`
	// StorageClass is copied from the claim bound to this volume.
	StorageClass string `json:"storageClass"`
}
  1846. // String returns a string representation of the PV
  1847. func (pv *PV) String() string {
  1848. if pv == nil {
  1849. return "<nil>"
  1850. }
  1851. return fmt.Sprintf("%s/%s{Bytes:%.2f, Cost/GiB*Hr:%.6f, StorageClass:%s}", pv.Cluster, pv.Name, pv.Bytes, pv.CostPerGiBHour, pv.StorageClass)
  1852. }