allocation.go 77 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200
  1. package costmodel
  2. import (
  3. "fmt"
  4. "math"
  5. "strconv"
  6. "strings"
  7. "time"
  8. "github.com/kubecost/cost-model/pkg/util/timeutil"
  9. "github.com/kubecost/cost-model/pkg/cloud"
  10. "github.com/kubecost/cost-model/pkg/env"
  11. "github.com/kubecost/cost-model/pkg/kubecost"
  12. "github.com/kubecost/cost-model/pkg/log"
  13. "github.com/kubecost/cost-model/pkg/prom"
  14. "k8s.io/apimachinery/pkg/labels"
  15. "k8s.io/klog"
  16. )
  17. const (
  18. queryFmtPods = `avg(kube_pod_container_status_running{}) by (pod, namespace, %s)[%s:%s]%s`
  19. queryFmtRAMBytesAllocated = `avg(avg_over_time(container_memory_allocation_bytes{container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, %s, provider_id)`
  20. queryFmtRAMRequests = `avg(avg_over_time(kube_pod_container_resource_requests{resource="memory", unit="byte", container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, %s)`
  21. queryFmtRAMUsageAvg = `avg(avg_over_time(container_memory_working_set_bytes{container!="", container_name!="POD", container!="POD"}[%s]%s)) by (container_name, container, pod_name, pod, namespace, instance, %s)`
  22. queryFmtRAMUsageMax = `max(max_over_time(container_memory_working_set_bytes{container!="", container_name!="POD", container!="POD"}[%s]%s)) by (container_name, container, pod_name, pod, namespace, instance, %s)`
  23. queryFmtCPUCoresAllocated = `avg(avg_over_time(container_cpu_allocation{container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, %s)`
  24. queryFmtCPURequests = `avg(avg_over_time(kube_pod_container_resource_requests{resource="cpu", unit="core", container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, %s)`
  25. queryFmtCPUUsageAvg = `avg(rate(container_cpu_usage_seconds_total{container!="", container_name!="POD", container!="POD"}[%s]%s)) by (container_name, container, pod_name, pod, namespace, instance, %s)`
  26. queryFmtCPUUsageMax = `max(rate(container_cpu_usage_seconds_total{container!="", container_name!="POD", container!="POD"}[%s]%s)) by (container_name, container, pod_name, pod, namespace, instance, %s)`
  27. queryFmtGPUsRequested = `avg(avg_over_time(kube_pod_container_resource_requests{resource="nvidia_com_gpu", container!="",container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, %s)`
  28. queryFmtGPUsAllocated = `avg(avg_over_time(container_gpu_allocation{container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, %s)`
  29. queryFmtNodeCostPerCPUHr = `avg(avg_over_time(node_cpu_hourly_cost[%s]%s)) by (node, %s, instance_type, provider_id)`
  30. queryFmtNodeCostPerRAMGiBHr = `avg(avg_over_time(node_ram_hourly_cost[%s]%s)) by (node, %s, instance_type, provider_id)`
  31. queryFmtNodeCostPerGPUHr = `avg(avg_over_time(node_gpu_hourly_cost[%s]%s)) by (node, %s, instance_type, provider_id)`
  32. queryFmtNodeIsSpot = `avg_over_time(kubecost_node_is_spot[%s]%s)`
  33. queryFmtPVCInfo = `avg(kube_persistentvolumeclaim_info{volumename != ""}) by (persistentvolumeclaim, storageclass, volumename, namespace, %s)[%s:%s]%s`
  34. queryFmtPVBytes = `avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s]%s)) by (persistentvolume, %s)`
  35. queryFmtPodPVCAllocation = `avg(avg_over_time(pod_pvc_allocation[%s]%s)) by (persistentvolume, persistentvolumeclaim, pod, namespace, %s)`
  36. queryFmtPVCBytesRequested = `avg(avg_over_time(kube_persistentvolumeclaim_resource_requests_storage_bytes{}[%s]%s)) by (persistentvolumeclaim, namespace, %s)`
  37. queryFmtPVCostPerGiBHour = `avg(avg_over_time(pv_hourly_cost[%s]%s)) by (volumename, %s)`
  38. queryFmtNetZoneGiB = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="true"}[%s]%s)) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
  39. queryFmtNetZoneCostPerGiB = `avg(avg_over_time(kubecost_network_zone_egress_cost{}[%s]%s)) by (%s)`
  40. queryFmtNetRegionGiB = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="false"}[%s]%s)) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
  41. queryFmtNetRegionCostPerGiB = `avg(avg_over_time(kubecost_network_region_egress_cost{}[%s]%s)) by (%s)`
  42. queryFmtNetInternetGiB = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="true"}[%s]%s)) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
  43. queryFmtNetInternetCostPerGiB = `avg(avg_over_time(kubecost_network_internet_egress_cost{}[%s]%s)) by (%s)`
  44. queryFmtNetReceiveBytes = `sum(increase(container_network_receive_bytes_total{pod!="", container="POD"}[%s]%s)) by (pod_name, pod, namespace, %s)`
  45. queryFmtNetTransferBytes = `sum(increase(container_network_transmit_bytes_total{pod!="", container="POD"}[%s]%s)) by (pod_name, pod, namespace, %s)`
  46. queryFmtNamespaceLabels = `avg_over_time(kube_namespace_labels[%s]%s)`
  47. queryFmtNamespaceAnnotations = `avg_over_time(kube_namespace_annotations[%s]%s)`
  48. queryFmtPodLabels = `avg_over_time(kube_pod_labels[%s]%s)`
  49. queryFmtPodAnnotations = `avg_over_time(kube_pod_annotations[%s]%s)`
  50. queryFmtServiceLabels = `avg_over_time(service_selector_labels[%s]%s)`
  51. queryFmtDeploymentLabels = `avg_over_time(deployment_match_labels[%s]%s)`
  52. queryFmtStatefulSetLabels = `avg_over_time(statefulSet_match_labels[%s]%s)`
  53. queryFmtDaemonSetLabels = `sum(avg_over_time(kube_pod_owner{owner_kind="DaemonSet"}[%s]%s)) by (pod, owner_name, namespace, %s)`
  54. queryFmtJobLabels = `sum(avg_over_time(kube_pod_owner{owner_kind="Job"}[%s]%s)) by (pod, owner_name, namespace ,%s)`
  55. queryFmtPodsWithReplicaSetOwner = `sum(avg_over_time(kube_pod_owner{owner_kind="ReplicaSet"}[%s]%s)) by (pod, owner_name, namespace ,%s)`
  56. queryFmtReplicaSetsWithoutOwners = `avg(avg_over_time(kube_replicaset_owner{owner_kind="<none>", owner_name="<none>"}[%s]%s)) by (replicaset, namespace, %s)`
  57. queryFmtLBCostPerHr = `avg(avg_over_time(kubecost_load_balancer_cost[%s]%s)) by (namespace, service_name, %s)`
  58. queryFmtLBActiveMins = `count(kubecost_load_balancer_cost) by (namespace, service_name, %s)[%s:%s]%s`
  59. )
  60. // This is a bit of a hack to work around garbage data from cadvisor
  61. // Ideally you cap each pod to the max CPU on its node, but that involves a bit more complexity, as it would need to be done when allocations are joined with asset data.
  62. const MAX_CPU_CAP = 512
  63. // CanCompute should return true if CostModel can act as a valid source for the
  64. // given time range. In the case of CostModel we want to attempt to compute as
  65. // long as the range starts in the past. If the CostModel ends up not having
  66. // data to match, that's okay, and should be communicated with an error
  67. // response from ComputeAllocation.
  68. func (cm *CostModel) CanCompute(start, end time.Time) bool {
  69. return start.Before(time.Now())
  70. }
  71. // Name returns the name of the Source
  72. func (cm *CostModel) Name() string {
  73. return "CostModel"
  74. }
  75. // ComputeAllocation uses the CostModel instance to compute an AllocationSet
  76. // for the window defined by the given start and end times. The Allocations
  77. // returned are unaggregated (i.e. down to the container level).
  78. func (cm *CostModel) ComputeAllocation(start, end time.Time, resolution time.Duration) (*kubecost.AllocationSet, error) {
  79. // 1. Build out Pod map from resolution-tuned, batched Pod start/end query
  80. // 2. Run and apply the results of the remaining queries to
  81. // 3. Build out AllocationSet from completed Pod map
  82. // Create a window spanning the requested query
  83. window := kubecost.NewWindow(&start, &end)
  84. // Create an empty AllocationSet. For safety, in the case of an error, we
  85. // should prefer to return this empty set with the error. (In the case of
  86. // no error, of course we populate the set and return it.)
  87. allocSet := kubecost.NewAllocationSet(start, end)
  88. // (1) Build out Pod map
  89. // Build out a map of Allocations as a mapping from pod-to-container-to-
  90. // underlying-Allocation instance, starting with (start, end) so that we
  91. // begin with minutes, from which we compute resource allocation and cost
  92. // totals from measured rate data.
  93. podMap := map[podKey]*Pod{}
  94. // clusterStarts and clusterEnds record the earliest start and latest end
  95. // times, respectively, on a cluster-basis. These are used for unmounted
  96. // PVs and other "virtual" Allocations so that minutes are maximally
  97. // accurate during start-up or spin-down of a cluster
  98. clusterStart := map[string]time.Time{}
  99. clusterEnd := map[string]time.Time{}
  100. cm.buildPodMap(window, resolution, env.GetETLMaxBatchDuration(), podMap, clusterStart, clusterEnd)
  101. // (2) Run and apply remaining queries
  102. // Convert window (start, end) to (duration, offset) for querying Prometheus,
  103. // including handling Thanos offset
  104. durStr, offStr, err := window.DurationOffsetForPrometheus()
  105. if err != nil {
  106. // Negative duration, so return empty set
  107. return allocSet, nil
  108. }
  109. // Convert resolution duration to a query-ready string
  110. resStr := timeutil.DurationString(resolution)
  111. ctx := prom.NewNamedContext(cm.PrometheusClient, prom.AllocationContextName)
  112. queryRAMBytesAllocated := fmt.Sprintf(queryFmtRAMBytesAllocated, durStr, offStr, env.GetPromClusterLabel())
  113. resChRAMBytesAllocated := ctx.Query(queryRAMBytesAllocated)
  114. queryRAMRequests := fmt.Sprintf(queryFmtRAMRequests, durStr, offStr, env.GetPromClusterLabel())
  115. resChRAMRequests := ctx.Query(queryRAMRequests)
  116. queryRAMUsageAvg := fmt.Sprintf(queryFmtRAMUsageAvg, durStr, offStr, env.GetPromClusterLabel())
  117. resChRAMUsageAvg := ctx.Query(queryRAMUsageAvg)
  118. queryRAMUsageMax := fmt.Sprintf(queryFmtRAMUsageMax, durStr, offStr, env.GetPromClusterLabel())
  119. resChRAMUsageMax := ctx.Query(queryRAMUsageMax)
  120. queryCPUCoresAllocated := fmt.Sprintf(queryFmtCPUCoresAllocated, durStr, offStr, env.GetPromClusterLabel())
  121. resChCPUCoresAllocated := ctx.Query(queryCPUCoresAllocated)
  122. queryCPURequests := fmt.Sprintf(queryFmtCPURequests, durStr, offStr, env.GetPromClusterLabel())
  123. resChCPURequests := ctx.Query(queryCPURequests)
  124. queryCPUUsageAvg := fmt.Sprintf(queryFmtCPUUsageAvg, durStr, offStr, env.GetPromClusterLabel())
  125. resChCPUUsageAvg := ctx.Query(queryCPUUsageAvg)
  126. queryCPUUsageMax := fmt.Sprintf(queryFmtCPUUsageMax, durStr, offStr, env.GetPromClusterLabel())
  127. resChCPUUsageMax := ctx.Query(queryCPUUsageMax)
  128. queryGPUsRequested := fmt.Sprintf(queryFmtGPUsRequested, durStr, offStr, env.GetPromClusterLabel())
  129. resChGPUsRequested := ctx.Query(queryGPUsRequested)
  130. queryGPUsAllocated := fmt.Sprintf(queryFmtGPUsAllocated, durStr, offStr, env.GetPromClusterLabel())
  131. resChGPUsAllocated := ctx.Query(queryGPUsAllocated)
  132. queryNodeCostPerCPUHr := fmt.Sprintf(queryFmtNodeCostPerCPUHr, durStr, offStr, env.GetPromClusterLabel())
  133. resChNodeCostPerCPUHr := ctx.Query(queryNodeCostPerCPUHr)
  134. queryNodeCostPerRAMGiBHr := fmt.Sprintf(queryFmtNodeCostPerRAMGiBHr, durStr, offStr, env.GetPromClusterLabel())
  135. resChNodeCostPerRAMGiBHr := ctx.Query(queryNodeCostPerRAMGiBHr)
  136. queryNodeCostPerGPUHr := fmt.Sprintf(queryFmtNodeCostPerGPUHr, durStr, offStr, env.GetPromClusterLabel())
  137. resChNodeCostPerGPUHr := ctx.Query(queryNodeCostPerGPUHr)
  138. queryNodeIsSpot := fmt.Sprintf(queryFmtNodeIsSpot, durStr, offStr)
  139. resChNodeIsSpot := ctx.Query(queryNodeIsSpot)
  140. queryPVCInfo := fmt.Sprintf(queryFmtPVCInfo, env.GetPromClusterLabel(), durStr, resStr, offStr)
  141. resChPVCInfo := ctx.Query(queryPVCInfo)
  142. queryPVBytes := fmt.Sprintf(queryFmtPVBytes, durStr, offStr, env.GetPromClusterLabel())
  143. resChPVBytes := ctx.Query(queryPVBytes)
  144. queryPodPVCAllocation := fmt.Sprintf(queryFmtPodPVCAllocation, durStr, offStr, env.GetPromClusterLabel())
  145. resChPodPVCAllocation := ctx.Query(queryPodPVCAllocation)
  146. queryPVCBytesRequested := fmt.Sprintf(queryFmtPVCBytesRequested, durStr, offStr, env.GetPromClusterLabel())
  147. resChPVCBytesRequested := ctx.Query(queryPVCBytesRequested)
  148. queryPVCostPerGiBHour := fmt.Sprintf(queryFmtPVCostPerGiBHour, durStr, offStr, env.GetPromClusterLabel())
  149. resChPVCostPerGiBHour := ctx.Query(queryPVCostPerGiBHour)
  150. queryNetTransferBytes := fmt.Sprintf(queryFmtNetTransferBytes, durStr, offStr, env.GetPromClusterLabel())
  151. resChNetTransferBytes := ctx.Query(queryNetTransferBytes)
  152. queryNetReceiveBytes := fmt.Sprintf(queryFmtNetReceiveBytes, durStr, offStr, env.GetPromClusterLabel())
  153. resChNetReceiveBytes := ctx.Query(queryNetReceiveBytes)
  154. queryNetZoneGiB := fmt.Sprintf(queryFmtNetZoneGiB, durStr, offStr, env.GetPromClusterLabel())
  155. resChNetZoneGiB := ctx.Query(queryNetZoneGiB)
  156. queryNetZoneCostPerGiB := fmt.Sprintf(queryFmtNetZoneCostPerGiB, durStr, offStr, env.GetPromClusterLabel())
  157. resChNetZoneCostPerGiB := ctx.Query(queryNetZoneCostPerGiB)
  158. queryNetRegionGiB := fmt.Sprintf(queryFmtNetRegionGiB, durStr, offStr, env.GetPromClusterLabel())
  159. resChNetRegionGiB := ctx.Query(queryNetRegionGiB)
  160. queryNetRegionCostPerGiB := fmt.Sprintf(queryFmtNetRegionCostPerGiB, durStr, offStr, env.GetPromClusterLabel())
  161. resChNetRegionCostPerGiB := ctx.Query(queryNetRegionCostPerGiB)
  162. queryNetInternetGiB := fmt.Sprintf(queryFmtNetInternetGiB, durStr, offStr, env.GetPromClusterLabel())
  163. resChNetInternetGiB := ctx.Query(queryNetInternetGiB)
  164. queryNetInternetCostPerGiB := fmt.Sprintf(queryFmtNetInternetCostPerGiB, durStr, offStr, env.GetPromClusterLabel())
  165. resChNetInternetCostPerGiB := ctx.Query(queryNetInternetCostPerGiB)
  166. queryNamespaceLabels := fmt.Sprintf(queryFmtNamespaceLabels, durStr, offStr)
  167. resChNamespaceLabels := ctx.Query(queryNamespaceLabels)
  168. queryNamespaceAnnotations := fmt.Sprintf(queryFmtNamespaceAnnotations, durStr, offStr)
  169. resChNamespaceAnnotations := ctx.Query(queryNamespaceAnnotations)
  170. queryPodLabels := fmt.Sprintf(queryFmtPodLabels, durStr, offStr)
  171. resChPodLabels := ctx.Query(queryPodLabels)
  172. queryPodAnnotations := fmt.Sprintf(queryFmtPodAnnotations, durStr, offStr)
  173. resChPodAnnotations := ctx.Query(queryPodAnnotations)
  174. queryServiceLabels := fmt.Sprintf(queryFmtServiceLabels, durStr, offStr)
  175. resChServiceLabels := ctx.Query(queryServiceLabels)
  176. queryDeploymentLabels := fmt.Sprintf(queryFmtDeploymentLabels, durStr, offStr)
  177. resChDeploymentLabels := ctx.Query(queryDeploymentLabels)
  178. queryStatefulSetLabels := fmt.Sprintf(queryFmtStatefulSetLabels, durStr, offStr)
  179. resChStatefulSetLabels := ctx.Query(queryStatefulSetLabels)
  180. queryDaemonSetLabels := fmt.Sprintf(queryFmtDaemonSetLabels, durStr, offStr, env.GetPromClusterLabel())
  181. resChDaemonSetLabels := ctx.Query(queryDaemonSetLabels)
  182. queryPodsWithReplicaSetOwner := fmt.Sprintf(queryFmtPodsWithReplicaSetOwner, durStr, offStr, env.GetPromClusterLabel())
  183. resChPodsWithReplicaSetOwner := ctx.Query(queryPodsWithReplicaSetOwner)
  184. queryReplicaSetsWithoutOwners := fmt.Sprintf(queryFmtReplicaSetsWithoutOwners, durStr, offStr, env.GetPromClusterLabel())
  185. resChReplicaSetsWithoutOwners := ctx.Query(queryReplicaSetsWithoutOwners)
  186. queryJobLabels := fmt.Sprintf(queryFmtJobLabels, durStr, offStr, env.GetPromClusterLabel())
  187. resChJobLabels := ctx.Query(queryJobLabels)
  188. queryLBCostPerHr := fmt.Sprintf(queryFmtLBCostPerHr, durStr, offStr, env.GetPromClusterLabel())
  189. resChLBCostPerHr := ctx.Query(queryLBCostPerHr)
  190. queryLBActiveMins := fmt.Sprintf(queryFmtLBActiveMins, env.GetPromClusterLabel(), durStr, resStr, offStr)
  191. resChLBActiveMins := ctx.Query(queryLBActiveMins)
  192. resCPUCoresAllocated, _ := resChCPUCoresAllocated.Await()
  193. resCPURequests, _ := resChCPURequests.Await()
  194. resCPUUsageAvg, _ := resChCPUUsageAvg.Await()
  195. resCPUUsageMax, _ := resChCPUUsageMax.Await()
  196. resRAMBytesAllocated, _ := resChRAMBytesAllocated.Await()
  197. resRAMRequests, _ := resChRAMRequests.Await()
  198. resRAMUsageAvg, _ := resChRAMUsageAvg.Await()
  199. resRAMUsageMax, _ := resChRAMUsageMax.Await()
  200. resGPUsRequested, _ := resChGPUsRequested.Await()
  201. resGPUsAllocated, _ := resChGPUsAllocated.Await()
  202. resNodeCostPerCPUHr, _ := resChNodeCostPerCPUHr.Await()
  203. resNodeCostPerRAMGiBHr, _ := resChNodeCostPerRAMGiBHr.Await()
  204. resNodeCostPerGPUHr, _ := resChNodeCostPerGPUHr.Await()
  205. resNodeIsSpot, _ := resChNodeIsSpot.Await()
  206. resPVBytes, _ := resChPVBytes.Await()
  207. resPVCostPerGiBHour, _ := resChPVCostPerGiBHour.Await()
  208. resPVCInfo, _ := resChPVCInfo.Await()
  209. resPVCBytesRequested, _ := resChPVCBytesRequested.Await()
  210. resPodPVCAllocation, _ := resChPodPVCAllocation.Await()
  211. resNetTransferBytes, _ := resChNetTransferBytes.Await()
  212. resNetReceiveBytes, _ := resChNetReceiveBytes.Await()
  213. resNetZoneGiB, _ := resChNetZoneGiB.Await()
  214. resNetZoneCostPerGiB, _ := resChNetZoneCostPerGiB.Await()
  215. resNetRegionGiB, _ := resChNetRegionGiB.Await()
  216. resNetRegionCostPerGiB, _ := resChNetRegionCostPerGiB.Await()
  217. resNetInternetGiB, _ := resChNetInternetGiB.Await()
  218. resNetInternetCostPerGiB, _ := resChNetInternetCostPerGiB.Await()
  219. resNamespaceLabels, _ := resChNamespaceLabels.Await()
  220. resNamespaceAnnotations, _ := resChNamespaceAnnotations.Await()
  221. resPodLabels, _ := resChPodLabels.Await()
  222. resPodAnnotations, _ := resChPodAnnotations.Await()
  223. resServiceLabels, _ := resChServiceLabels.Await()
  224. resDeploymentLabels, _ := resChDeploymentLabels.Await()
  225. resStatefulSetLabels, _ := resChStatefulSetLabels.Await()
  226. resDaemonSetLabels, _ := resChDaemonSetLabels.Await()
  227. resPodsWithReplicaSetOwner, _ := resChPodsWithReplicaSetOwner.Await()
  228. resReplicaSetsWithoutOwners, _ := resChReplicaSetsWithoutOwners.Await()
  229. resJobLabels, _ := resChJobLabels.Await()
  230. resLBCostPerHr, _ := resChLBCostPerHr.Await()
  231. resLBActiveMins, _ := resChLBActiveMins.Await()
  232. if ctx.HasErrors() {
  233. for _, err := range ctx.Errors() {
  234. log.Errorf("CostModel.ComputeAllocation: %s", err)
  235. }
  236. return allocSet, ctx.ErrorCollection()
  237. }
  238. // We choose to apply allocation before requests in the cases of RAM and
  239. // CPU so that we can assert that allocation should always be greater than
  240. // or equal to request.
  241. applyCPUCoresAllocated(podMap, resCPUCoresAllocated)
  242. applyCPUCoresRequested(podMap, resCPURequests)
  243. applyCPUCoresUsedAvg(podMap, resCPUUsageAvg)
  244. applyCPUCoresUsedMax(podMap, resCPUUsageMax)
  245. applyRAMBytesAllocated(podMap, resRAMBytesAllocated)
  246. applyRAMBytesRequested(podMap, resRAMRequests)
  247. applyRAMBytesUsedAvg(podMap, resRAMUsageAvg)
  248. applyRAMBytesUsedMax(podMap, resRAMUsageMax)
  249. applyGPUsAllocated(podMap, resGPUsRequested, resGPUsAllocated)
  250. applyNetworkTotals(podMap, resNetTransferBytes, resNetReceiveBytes)
  251. applyNetworkAllocation(podMap, resNetZoneGiB, resNetZoneCostPerGiB)
  252. applyNetworkAllocation(podMap, resNetRegionGiB, resNetRegionCostPerGiB)
  253. applyNetworkAllocation(podMap, resNetInternetGiB, resNetInternetCostPerGiB)
  254. namespaceLabels := resToNamespaceLabels(resNamespaceLabels)
  255. podLabels := resToPodLabels(resPodLabels)
  256. namespaceAnnotations := resToNamespaceAnnotations(resNamespaceAnnotations)
  257. podAnnotations := resToPodAnnotations(resPodAnnotations)
  258. applyLabels(podMap, namespaceLabels, podLabels)
  259. applyAnnotations(podMap, namespaceAnnotations, podAnnotations)
  260. serviceLabels := getServiceLabels(resServiceLabels)
  261. allocsByService := map[serviceKey][]*kubecost.Allocation{}
  262. applyServicesToPods(podMap, podLabels, allocsByService, serviceLabels)
  263. podDeploymentMap := labelsToPodControllerMap(podLabels, resToDeploymentLabels(resDeploymentLabels))
  264. podStatefulSetMap := labelsToPodControllerMap(podLabels, resToStatefulSetLabels(resStatefulSetLabels))
  265. podDaemonSetMap := resToPodDaemonSetMap(resDaemonSetLabels)
  266. podJobMap := resToPodJobMap(resJobLabels)
  267. podReplicaSetMap := resToPodReplicaSetMap(resPodsWithReplicaSetOwner, resReplicaSetsWithoutOwners)
  268. applyControllersToPods(podMap, podDeploymentMap)
  269. applyControllersToPods(podMap, podStatefulSetMap)
  270. applyControllersToPods(podMap, podDaemonSetMap)
  271. applyControllersToPods(podMap, podJobMap)
  272. applyControllersToPods(podMap, podReplicaSetMap)
  273. // TODO breakdown network costs?
  274. // Build out a map of Nodes with resource costs, discounts, and node types
  275. // for converting resource allocation data to cumulative costs.
  276. nodeMap := map[nodeKey]*NodePricing{}
  277. applyNodeCostPerCPUHr(nodeMap, resNodeCostPerCPUHr)
  278. applyNodeCostPerRAMGiBHr(nodeMap, resNodeCostPerRAMGiBHr)
  279. applyNodeCostPerGPUHr(nodeMap, resNodeCostPerGPUHr)
  280. applyNodeSpot(nodeMap, resNodeIsSpot)
  281. applyNodeDiscount(nodeMap, cm)
  282. // Build out the map of all PVs with class, size and cost-per-hour.
  283. // Note: this does not record time running, which we may want to
  284. // include later for increased PV precision. (As long as the PV has
  285. // a PVC, we get time running there, so this is only inaccurate
  286. // for short-lived, unmounted PVs.)
  287. pvMap := map[pvKey]*PV{}
  288. buildPVMap(pvMap, resPVCostPerGiBHour)
  289. applyPVBytes(pvMap, resPVBytes)
  290. // Build out the map of all PVCs with time running, bytes requested,
  291. // and connect to the correct PV from pvMap. (If no PV exists, that
  292. // is noted, but does not result in any allocation/cost.)
  293. pvcMap := map[pvcKey]*PVC{}
  294. buildPVCMap(window, pvcMap, pvMap, resPVCInfo)
  295. applyPVCBytesRequested(pvcMap, resPVCBytesRequested)
  296. // Build out the relationships of pods to their PVCs. This step
  297. // populates the PVC.Count field so that PVC allocation can be
  298. // split appropriately among each pod's container allocation.
  299. podPVCMap := map[podKey][]*PVC{}
  300. buildPodPVCMap(podPVCMap, pvMap, pvcMap, podMap, resPodPVCAllocation)
  301. // Identify unmounted PVs (PVs without PVCs) and add one Allocation per
  302. // cluster representing each cluster's unmounted PVs (if necessary).
  303. applyUnmountedPVs(window, podMap, pvMap, pvcMap)
  304. lbMap := getLoadBalancerCosts(resLBCostPerHr, resLBActiveMins, resolution)
  305. applyLoadBalancersToPods(lbMap, allocsByService)
  306. // (3) Build out AllocationSet from Pod map
  307. for _, pod := range podMap {
  308. for _, alloc := range pod.Allocations {
  309. cluster := alloc.Properties.Cluster
  310. nodeName := alloc.Properties.Node
  311. namespace := alloc.Properties.Namespace
  312. pod := alloc.Properties.Pod
  313. container := alloc.Properties.Container
  314. podKey := newPodKey(cluster, namespace, pod)
  315. nodeKey := newNodeKey(cluster, nodeName)
  316. node := cm.getNodePricing(nodeMap, nodeKey)
  317. alloc.Properties.ProviderID = node.ProviderID
  318. alloc.CPUCost = alloc.CPUCoreHours * node.CostPerCPUHr
  319. alloc.RAMCost = (alloc.RAMByteHours / 1024 / 1024 / 1024) * node.CostPerRAMGiBHr
  320. alloc.GPUCost = alloc.GPUHours * node.CostPerGPUHr
  321. if pvcs, ok := podPVCMap[podKey]; ok {
  322. for _, pvc := range pvcs {
  323. // Determine the (start, end) of the relationship between the
  324. // given PVC and the associated Allocation so that a precise
  325. // number of hours can be used to compute cumulative cost.
  326. s, e := alloc.Start, alloc.End
  327. if pvc.Start.After(alloc.Start) {
  328. s = pvc.Start
  329. }
  330. if pvc.End.Before(alloc.End) {
  331. e = pvc.End
  332. }
  333. minutes := e.Sub(s).Minutes()
  334. hrs := minutes / 60.0
  335. count := float64(pvc.Count)
  336. if pvc.Count < 1 {
  337. count = 1
  338. }
  339. gib := pvc.Bytes / 1024 / 1024 / 1024
  340. cost := pvc.Volume.CostPerGiBHour * gib * hrs
  341. // Apply the size and cost of the PV to the allocation, each
  342. // weighted by count (i.e. the number of containers in the pod)
  343. // record the amount of total PVBytes Hours attributable to a given PV
  344. if alloc.PVs == nil {
  345. alloc.PVs = kubecost.PVAllocations{}
  346. }
  347. pvKey := kubecost.PVKey{
  348. Cluster: pvc.Cluster,
  349. Name: pvc.Volume.Name,
  350. }
  351. alloc.PVs[pvKey] = &kubecost.PVAllocation{
  352. ByteHours: pvc.Bytes * hrs / count,
  353. Cost: cost / count,
  354. }
  355. }
  356. }
  357. // Make sure that the name is correct (node may not be present at this
  358. // point due to it missing from queryMinutes) then insert.
  359. alloc.Name = fmt.Sprintf("%s/%s/%s/%s/%s", cluster, nodeName, namespace, pod, container)
  360. allocSet.Set(alloc)
  361. }
  362. }
  363. return allocSet, nil
  364. }
  365. func (cm *CostModel) buildPodMap(window kubecost.Window, resolution, maxBatchSize time.Duration, podMap map[podKey]*Pod, clusterStart, clusterEnd map[string]time.Time) error {
  366. // Assumes that window is positive and closed
  367. start, end := *window.Start(), *window.End()
  368. // Convert resolution duration to a query-ready string
  369. resStr := timeutil.DurationString(resolution)
  370. ctx := prom.NewNamedContext(cm.PrometheusClient, prom.AllocationContextName)
  371. // Query for (start, end) by (pod, namespace, cluster) over the given
  372. // window, using the given resolution, and if necessary in batches no
  373. // larger than the given maximum batch size. If working in batches, track
  374. // overall progress by starting with (window.start, window.start) and
  375. // querying in batches no larger than maxBatchSize from start-to-end,
  376. // folding each result set into podMap as the results come back.
  377. coverage := kubecost.NewWindow(&start, &start)
  378. numQuery := 1
  379. for coverage.End().Before(end) {
  380. // Determine the (start, end) of the current batch
  381. batchStart := *coverage.End()
  382. batchEnd := coverage.End().Add(maxBatchSize)
  383. if batchEnd.After(end) {
  384. batchEnd = end
  385. }
  386. batchWindow := kubecost.NewWindow(&batchStart, &batchEnd)
  387. var resPods []*prom.QueryResult
  388. var err error
  389. maxTries := 3
  390. numTries := 0
  391. for resPods == nil && numTries < maxTries {
  392. numTries++
  393. // Convert window (start, end) to (duration, offset) for querying Prometheus,
  394. // including handling Thanos offset
  395. durStr, offStr, err := batchWindow.DurationOffsetForPrometheus()
  396. if err != nil || durStr == "" {
  397. // Negative duration, so set empty results and don't query
  398. resPods = []*prom.QueryResult{}
  399. err = nil
  400. break
  401. }
  402. // Submit and profile query
  403. queryPods := fmt.Sprintf(queryFmtPods, env.GetPromClusterLabel(), durStr, resStr, offStr)
  404. queryProfile := time.Now()
  405. resPods, err = ctx.Query(queryPods).Await()
  406. if err != nil {
  407. log.Profile(queryProfile, fmt.Sprintf("CostModel.ComputeAllocation: pod query %d try %d failed: %s", numQuery, numTries, queryPods))
  408. resPods = nil
  409. }
  410. }
  411. if err != nil {
  412. return err
  413. }
  414. applyPodResults(window, resolution, podMap, clusterStart, clusterEnd, resPods)
  415. coverage = coverage.ExpandEnd(batchEnd)
  416. numQuery++
  417. }
  418. return nil
  419. }
  420. func applyPodResults(window kubecost.Window, resolution time.Duration, podMap map[podKey]*Pod, clusterStart, clusterEnd map[string]time.Time, resPods []*prom.QueryResult) {
  421. for _, res := range resPods {
  422. if len(res.Values) == 0 {
  423. log.Warningf("CostModel.ComputeAllocation: empty minutes result")
  424. continue
  425. }
  426. cluster, err := res.GetString(env.GetPromClusterLabel())
  427. if err != nil {
  428. cluster = env.GetClusterID()
  429. }
  430. labels, err := res.GetStrings("namespace", "pod")
  431. if err != nil {
  432. log.Warningf("CostModel.ComputeAllocation: minutes query result missing field: %s", err)
  433. continue
  434. }
  435. namespace := labels["namespace"]
  436. pod := labels["pod"]
  437. key := newPodKey(cluster, namespace, pod)
  438. // allocStart and allocEnd are the timestamps of the first and last
  439. // minutes the pod was running, respectively. We subtract one resolution
  440. // from allocStart because this point will actually represent the end
  441. // of the first minute. We don't subtract from allocEnd because it
  442. // already represents the end of the last minute.
  443. var allocStart, allocEnd time.Time
  444. startAdjustmentCoeff, endAdjustmentCoeff := 1.0, 1.0
  445. for _, datum := range res.Values {
  446. t := time.Unix(int64(datum.Timestamp), 0)
  447. if allocStart.IsZero() && datum.Value > 0 && window.Contains(t) {
  448. // Set the start timestamp to the earliest non-zero timestamp
  449. allocStart = t
  450. // Record adjustment coefficient, i.e. the portion of the start
  451. // timestamp to "ignore". That is, sometimes the value will be
  452. // 0.5, meaning that we should discount the time running by
  453. // half of the resolution the timestamp stands for.
  454. startAdjustmentCoeff = (1.0 - datum.Value)
  455. }
  456. if datum.Value > 0 && window.Contains(t) {
  457. // Set the end timestamp to the latest non-zero timestamp
  458. allocEnd = t
  459. // Record adjustment coefficient, i.e. the portion of the end
  460. // timestamp to "ignore". (See explanation above for start.)
  461. endAdjustmentCoeff = (1.0 - datum.Value)
  462. }
  463. }
  464. if allocStart.IsZero() || allocEnd.IsZero() {
  465. continue
  466. }
  467. // Adjust timestamps according to the resolution and the adjustment
  468. // coefficients, as described above. That is, count the start timestamp
  469. // from the beginning of the resolution, not the end. Then "reduce" the
  470. // start and end by the correct amount, in the case that the "running"
  471. // value of the first or last timestamp was not a full 1.0.
  472. allocStart = allocStart.Add(-resolution)
  473. // Note: the *100 and /100 are necessary because Duration is an int, so
  474. // 0.5, for instance, will be truncated, resulting in no adjustment.
  475. allocStart = allocStart.Add(time.Duration(startAdjustmentCoeff*100) * resolution / time.Duration(100))
  476. allocEnd = allocEnd.Add(-time.Duration(endAdjustmentCoeff*100) * resolution / time.Duration(100))
  477. // Ensure that the allocStart is always within the window, adjusting
  478. // for the occasions where start falls 1m before the query window.
  479. // NOTE: window here will always be closed (so no need to nil check
  480. // "start").
  481. // TODO:CLEANUP revisit query methodology to figure out why this is
  482. // happening on occasion
  483. if allocStart.Before(*window.Start()) {
  484. allocStart = *window.Start()
  485. }
  486. // If there is only one point with a value <= 0.5 that the start and
  487. // end timestamps both share, then we will enter this case because at
  488. // least half of a resolution will be subtracted from both the start
  489. // and the end. If that is the case, then add back half of each side
  490. // so that the pod is said to run for half a resolution total.
  491. // e.g. For resolution 1m and a value of 0.5 at one timestamp, we'll
  492. // end up with allocEnd == allocStart and each coeff == 0.5. In
  493. // that case, add 0.25m to each side, resulting in 0.5m duration.
  494. if !allocEnd.After(allocStart) {
  495. allocStart = allocStart.Add(-time.Duration(50*startAdjustmentCoeff) * resolution / time.Duration(100))
  496. allocEnd = allocEnd.Add(time.Duration(50*endAdjustmentCoeff) * resolution / time.Duration(100))
  497. }
  498. // Ensure that the allocEnf is always within the window, adjusting
  499. // for the occasions where end falls 1m after the query window. This
  500. // has not ever happened, but is symmetrical with the start check
  501. // above.
  502. // NOTE: window here will always be closed (so no need to nil check
  503. // "end").
  504. // TODO:CLEANUP revisit query methodology to figure out why this is
  505. // happening on occasion
  506. if allocEnd.After(*window.End()) {
  507. allocEnd = *window.End()
  508. }
  509. // Set start if unset or this datum's start time is earlier than the
  510. // current earliest time.
  511. if _, ok := clusterStart[cluster]; !ok || allocStart.Before(clusterStart[cluster]) {
  512. clusterStart[cluster] = allocStart
  513. }
  514. // Set end if unset or this datum's end time is later than the
  515. // current latest time.
  516. if _, ok := clusterEnd[cluster]; !ok || allocEnd.After(clusterEnd[cluster]) {
  517. clusterEnd[cluster] = allocEnd
  518. }
  519. if pod, ok := podMap[key]; ok {
  520. // Pod has already been recorded, so update it accordingly
  521. if allocStart.Before(pod.Start) {
  522. pod.Start = allocStart
  523. }
  524. if allocEnd.After(pod.End) {
  525. pod.End = allocEnd
  526. }
  527. } else {
  528. // Pod has not been recorded yet, so insert it
  529. podMap[key] = &Pod{
  530. Window: window.Clone(),
  531. Start: allocStart,
  532. End: allocEnd,
  533. Key: key,
  534. Allocations: map[string]*kubecost.Allocation{},
  535. }
  536. }
  537. }
  538. }
  539. func applyCPUCoresAllocated(podMap map[podKey]*Pod, resCPUCoresAllocated []*prom.QueryResult) {
  540. for _, res := range resCPUCoresAllocated {
  541. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  542. if err != nil {
  543. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU allocation result missing field: %s", err)
  544. continue
  545. }
  546. pod, ok := podMap[key]
  547. if !ok {
  548. continue
  549. }
  550. container, err := res.GetString("container")
  551. if err != nil {
  552. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU allocation query result missing 'container': %s", key)
  553. continue
  554. }
  555. if _, ok := pod.Allocations[container]; !ok {
  556. pod.AppendContainer(container)
  557. }
  558. cpuCores := res.Values[0].Value
  559. if cpuCores > MAX_CPU_CAP {
  560. klog.Infof("[WARNING] Very large cpu allocation, clamping to %f", res.Values[0].Value*(pod.Allocations[container].Minutes()/60.0))
  561. cpuCores = 0.0
  562. }
  563. hours := pod.Allocations[container].Minutes() / 60.0
  564. pod.Allocations[container].CPUCoreHours = cpuCores * hours
  565. node, err := res.GetString("node")
  566. if err != nil {
  567. log.Warningf("CostModel.ComputeAllocation: CPU allocation query result missing 'node': %s", key)
  568. continue
  569. }
  570. pod.Allocations[container].Properties.Node = node
  571. }
  572. }
  573. func applyCPUCoresRequested(podMap map[podKey]*Pod, resCPUCoresRequested []*prom.QueryResult) {
  574. for _, res := range resCPUCoresRequested {
  575. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  576. if err != nil {
  577. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU request result missing field: %s", err)
  578. continue
  579. }
  580. pod, ok := podMap[key]
  581. if !ok {
  582. continue
  583. }
  584. container, err := res.GetString("container")
  585. if err != nil {
  586. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU request query result missing 'container': %s", key)
  587. continue
  588. }
  589. if _, ok := pod.Allocations[container]; !ok {
  590. pod.AppendContainer(container)
  591. }
  592. pod.Allocations[container].CPUCoreRequestAverage = res.Values[0].Value
  593. // If CPU allocation is less than requests, set CPUCoreHours to
  594. // request level.
  595. if pod.Allocations[container].CPUCores() < res.Values[0].Value {
  596. pod.Allocations[container].CPUCoreHours = res.Values[0].Value * (pod.Allocations[container].Minutes() / 60.0)
  597. }
  598. if pod.Allocations[container].CPUCores() > MAX_CPU_CAP {
  599. klog.Infof("[WARNING] Very large cpu allocation, clamping! to %f", res.Values[0].Value*(pod.Allocations[container].Minutes()/60.0))
  600. pod.Allocations[container].CPUCoreHours = res.Values[0].Value * (pod.Allocations[container].Minutes() / 60.0)
  601. }
  602. node, err := res.GetString("node")
  603. if err != nil {
  604. log.Warningf("CostModel.ComputeAllocation: CPU request query result missing 'node': %s", key)
  605. continue
  606. }
  607. pod.Allocations[container].Properties.Node = node
  608. }
  609. }
  610. func applyCPUCoresUsedAvg(podMap map[podKey]*Pod, resCPUCoresUsedAvg []*prom.QueryResult) {
  611. for _, res := range resCPUCoresUsedAvg {
  612. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  613. if err != nil {
  614. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage avg result missing field: %s", err)
  615. continue
  616. }
  617. pod, ok := podMap[key]
  618. if !ok {
  619. continue
  620. }
  621. container, err := res.GetString("container")
  622. if container == "" || err != nil {
  623. container, err = res.GetString("container_name")
  624. if err != nil {
  625. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage avg query result missing 'container': %s", key)
  626. continue
  627. }
  628. }
  629. if _, ok := pod.Allocations[container]; !ok {
  630. pod.AppendContainer(container)
  631. }
  632. pod.Allocations[container].CPUCoreUsageAverage = res.Values[0].Value
  633. if res.Values[0].Value > MAX_CPU_CAP {
  634. klog.Infof("[WARNING] Very large cpu USAGE, dropping outlier")
  635. pod.Allocations[container].CPUCoreUsageAverage = 0.0
  636. }
  637. }
  638. }
  639. func applyCPUCoresUsedMax(podMap map[podKey]*Pod, resCPUCoresUsedMax []*prom.QueryResult) {
  640. for _, res := range resCPUCoresUsedMax {
  641. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  642. if err != nil {
  643. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage max result missing field: %s", err)
  644. continue
  645. }
  646. pod, ok := podMap[key]
  647. if !ok {
  648. continue
  649. }
  650. container, err := res.GetString("container")
  651. if container == "" || err != nil {
  652. container, err = res.GetString("container_name")
  653. if err != nil {
  654. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage max query result missing 'container': %s", key)
  655. continue
  656. }
  657. }
  658. if _, ok := pod.Allocations[container]; !ok {
  659. pod.AppendContainer(container)
  660. }
  661. if pod.Allocations[container].RawAllocationOnly == nil {
  662. pod.Allocations[container].RawAllocationOnly = &kubecost.RawAllocationOnlyData{
  663. CPUCoreUsageMax: res.Values[0].Value,
  664. }
  665. } else {
  666. pod.Allocations[container].RawAllocationOnly.CPUCoreUsageMax = res.Values[0].Value
  667. }
  668. }
  669. }
  670. func applyRAMBytesAllocated(podMap map[podKey]*Pod, resRAMBytesAllocated []*prom.QueryResult) {
  671. for _, res := range resRAMBytesAllocated {
  672. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  673. if err != nil {
  674. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM allocation result missing field: %s", err)
  675. continue
  676. }
  677. pod, ok := podMap[key]
  678. if !ok {
  679. continue
  680. }
  681. container, err := res.GetString("container")
  682. if err != nil {
  683. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM allocation query result missing 'container': %s", key)
  684. continue
  685. }
  686. if _, ok := pod.Allocations[container]; !ok {
  687. pod.AppendContainer(container)
  688. }
  689. ramBytes := res.Values[0].Value
  690. hours := pod.Allocations[container].Minutes() / 60.0
  691. pod.Allocations[container].RAMByteHours = ramBytes * hours
  692. node, err := res.GetString("node")
  693. if err != nil {
  694. log.Warningf("CostModel.ComputeAllocation: RAM allocation query result missing 'node': %s", key)
  695. continue
  696. }
  697. pod.Allocations[container].Properties.Node = node
  698. }
  699. }
  700. func applyRAMBytesRequested(podMap map[podKey]*Pod, resRAMBytesRequested []*prom.QueryResult) {
  701. for _, res := range resRAMBytesRequested {
  702. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  703. if err != nil {
  704. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM request result missing field: %s", err)
  705. continue
  706. }
  707. pod, ok := podMap[key]
  708. if !ok {
  709. continue
  710. }
  711. container, err := res.GetString("container")
  712. if err != nil {
  713. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM request query result missing 'container': %s", key)
  714. continue
  715. }
  716. if _, ok := pod.Allocations[container]; !ok {
  717. pod.AppendContainer(container)
  718. }
  719. pod.Allocations[container].RAMBytesRequestAverage = res.Values[0].Value
  720. // If RAM allocation is less than requests, set RAMByteHours to
  721. // request level.
  722. if pod.Allocations[container].RAMBytes() < res.Values[0].Value {
  723. pod.Allocations[container].RAMByteHours = res.Values[0].Value * (pod.Allocations[container].Minutes() / 60.0)
  724. }
  725. node, err := res.GetString("node")
  726. if err != nil {
  727. log.Warningf("CostModel.ComputeAllocation: RAM request query result missing 'node': %s", key)
  728. continue
  729. }
  730. pod.Allocations[container].Properties.Node = node
  731. }
  732. }
  733. func applyRAMBytesUsedAvg(podMap map[podKey]*Pod, resRAMBytesUsedAvg []*prom.QueryResult) {
  734. for _, res := range resRAMBytesUsedAvg {
  735. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  736. if err != nil {
  737. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM avg usage result missing field: %s", err)
  738. continue
  739. }
  740. pod, ok := podMap[key]
  741. if !ok {
  742. continue
  743. }
  744. container, err := res.GetString("container")
  745. if container == "" || err != nil {
  746. container, err = res.GetString("container_name")
  747. if err != nil {
  748. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM usage avg query result missing 'container': %s", key)
  749. continue
  750. }
  751. }
  752. if _, ok := pod.Allocations[container]; !ok {
  753. pod.AppendContainer(container)
  754. }
  755. pod.Allocations[container].RAMBytesUsageAverage = res.Values[0].Value
  756. }
  757. }
  758. func applyRAMBytesUsedMax(podMap map[podKey]*Pod, resRAMBytesUsedMax []*prom.QueryResult) {
  759. for _, res := range resRAMBytesUsedMax {
  760. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  761. if err != nil {
  762. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM usage max result missing field: %s", err)
  763. continue
  764. }
  765. pod, ok := podMap[key]
  766. if !ok {
  767. continue
  768. }
  769. container, err := res.GetString("container")
  770. if container == "" || err != nil {
  771. container, err = res.GetString("container_name")
  772. if err != nil {
  773. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM usage max query result missing 'container': %s", key)
  774. continue
  775. }
  776. }
  777. if _, ok := pod.Allocations[container]; !ok {
  778. pod.AppendContainer(container)
  779. }
  780. if pod.Allocations[container].RawAllocationOnly == nil {
  781. pod.Allocations[container].RawAllocationOnly = &kubecost.RawAllocationOnlyData{
  782. RAMBytesUsageMax: res.Values[0].Value,
  783. }
  784. } else {
  785. pod.Allocations[container].RawAllocationOnly.RAMBytesUsageMax = res.Values[0].Value
  786. }
  787. }
  788. }
  789. func applyGPUsAllocated(podMap map[podKey]*Pod, resGPUsRequested []*prom.QueryResult, resGPUsAllocated []*prom.QueryResult) {
  790. if len(resGPUsAllocated) > 0 { // Use the new query, when it's become available in a window
  791. resGPUsRequested = resGPUsAllocated
  792. }
  793. for _, res := range resGPUsRequested {
  794. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  795. if err != nil {
  796. log.DedupedWarningf(10, "CostModel.ComputeAllocation: GPU request result missing field: %s", err)
  797. continue
  798. }
  799. pod, ok := podMap[key]
  800. if !ok {
  801. continue
  802. }
  803. container, err := res.GetString("container")
  804. if err != nil {
  805. log.DedupedWarningf(10, "CostModel.ComputeAllocation: GPU request query result missing 'container': %s", key)
  806. continue
  807. }
  808. if _, ok := pod.Allocations[container]; !ok {
  809. pod.AppendContainer(container)
  810. }
  811. hrs := pod.Allocations[container].Minutes() / 60.0
  812. pod.Allocations[container].GPUHours = res.Values[0].Value * hrs
  813. }
  814. }
  815. func applyNetworkTotals(podMap map[podKey]*Pod, resNetworkTransferBytes []*prom.QueryResult, resNetworkReceiveBytes []*prom.QueryResult) {
  816. for _, res := range resNetworkTransferBytes {
  817. podKey, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  818. if err != nil {
  819. log.DedupedWarningf(10, "CostModel.ComputeAllocation: Network Transfer Bytes query result missing field: %s", err)
  820. continue
  821. }
  822. pod, ok := podMap[podKey]
  823. if !ok {
  824. continue
  825. }
  826. for _, alloc := range pod.Allocations {
  827. alloc.NetworkTransferBytes = res.Values[0].Value / float64(len(pod.Allocations))
  828. }
  829. }
  830. for _, res := range resNetworkReceiveBytes {
  831. podKey, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  832. if err != nil {
  833. log.DedupedWarningf(10, "CostModel.ComputeAllocation: Network Receive Bytes query result missing field: %s", err)
  834. continue
  835. }
  836. pod, ok := podMap[podKey]
  837. if !ok {
  838. continue
  839. }
  840. for _, alloc := range pod.Allocations {
  841. alloc.NetworkReceiveBytes = res.Values[0].Value / float64(len(pod.Allocations))
  842. }
  843. }
  844. }
  845. func applyNetworkAllocation(podMap map[podKey]*Pod, resNetworkGiB []*prom.QueryResult, resNetworkCostPerGiB []*prom.QueryResult) {
  846. costPerGiBByCluster := map[string]float64{}
  847. for _, res := range resNetworkCostPerGiB {
  848. cluster, err := res.GetString(env.GetPromClusterLabel())
  849. if err != nil {
  850. cluster = env.GetClusterID()
  851. }
  852. costPerGiBByCluster[cluster] = res.Values[0].Value
  853. }
  854. for _, res := range resNetworkGiB {
  855. podKey, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  856. if err != nil {
  857. log.DedupedWarningf(10, "CostModel.ComputeAllocation: Network allocation query result missing field: %s", err)
  858. continue
  859. }
  860. pod, ok := podMap[podKey]
  861. if !ok {
  862. continue
  863. }
  864. for _, alloc := range pod.Allocations {
  865. gib := res.Values[0].Value / float64(len(pod.Allocations))
  866. costPerGiB := costPerGiBByCluster[podKey.Cluster]
  867. alloc.NetworkCost = gib * costPerGiB
  868. }
  869. }
  870. }
  871. func resToNamespaceLabels(resNamespaceLabels []*prom.QueryResult) map[namespaceKey]map[string]string {
  872. namespaceLabels := map[namespaceKey]map[string]string{}
  873. for _, res := range resNamespaceLabels {
  874. nsKey, err := resultNamespaceKey(res, env.GetPromClusterLabel(), "namespace")
  875. if err != nil {
  876. continue
  877. }
  878. if _, ok := namespaceLabels[nsKey]; !ok {
  879. namespaceLabels[nsKey] = map[string]string{}
  880. }
  881. for k, l := range res.GetLabels() {
  882. namespaceLabels[nsKey][k] = l
  883. }
  884. }
  885. return namespaceLabels
  886. }
  887. func resToPodLabels(resPodLabels []*prom.QueryResult) map[podKey]map[string]string {
  888. podLabels := map[podKey]map[string]string{}
  889. for _, res := range resPodLabels {
  890. podKey, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  891. if err != nil {
  892. continue
  893. }
  894. if _, ok := podLabels[podKey]; !ok {
  895. podLabels[podKey] = map[string]string{}
  896. }
  897. for k, l := range res.GetLabels() {
  898. podLabels[podKey][k] = l
  899. }
  900. }
  901. return podLabels
  902. }
  903. func resToNamespaceAnnotations(resNamespaceAnnotations []*prom.QueryResult) map[string]map[string]string {
  904. namespaceAnnotations := map[string]map[string]string{}
  905. for _, res := range resNamespaceAnnotations {
  906. namespace, err := res.GetString("namespace")
  907. if err != nil {
  908. continue
  909. }
  910. if _, ok := namespaceAnnotations[namespace]; !ok {
  911. namespaceAnnotations[namespace] = map[string]string{}
  912. }
  913. for k, l := range res.GetAnnotations() {
  914. namespaceAnnotations[namespace][k] = l
  915. }
  916. }
  917. return namespaceAnnotations
  918. }
  919. func resToPodAnnotations(resPodAnnotations []*prom.QueryResult) map[podKey]map[string]string {
  920. podAnnotations := map[podKey]map[string]string{}
  921. for _, res := range resPodAnnotations {
  922. podKey, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  923. if err != nil {
  924. continue
  925. }
  926. if _, ok := podAnnotations[podKey]; !ok {
  927. podAnnotations[podKey] = map[string]string{}
  928. }
  929. for k, l := range res.GetAnnotations() {
  930. podAnnotations[podKey][k] = l
  931. }
  932. }
  933. return podAnnotations
  934. }
  935. func applyLabels(podMap map[podKey]*Pod, namespaceLabels map[namespaceKey]map[string]string, podLabels map[podKey]map[string]string) {
  936. for podKey, pod := range podMap {
  937. for _, alloc := range pod.Allocations {
  938. allocLabels := alloc.Properties.Labels
  939. if allocLabels == nil {
  940. allocLabels = make(map[string]string)
  941. }
  942. // Apply namespace labels first, then pod labels so that pod labels
  943. // overwrite namespace labels.
  944. nsKey := podKey.namespaceKey // newNamespaceKey(podKey.Cluster, podKey.Namespace)
  945. if labels, ok := namespaceLabels[nsKey]; ok {
  946. for k, v := range labels {
  947. allocLabels[k] = v
  948. }
  949. }
  950. if labels, ok := podLabels[podKey]; ok {
  951. for k, v := range labels {
  952. allocLabels[k] = v
  953. }
  954. }
  955. alloc.Properties.Labels = allocLabels
  956. }
  957. }
  958. }
  959. func applyAnnotations(podMap map[podKey]*Pod, namespaceAnnotations map[string]map[string]string, podAnnotations map[podKey]map[string]string) {
  960. for key, pod := range podMap {
  961. for _, alloc := range pod.Allocations {
  962. allocAnnotations := alloc.Properties.Annotations
  963. if allocAnnotations == nil {
  964. allocAnnotations = make(map[string]string)
  965. }
  966. // Apply namespace annotations first, then pod annotations so that
  967. // pod labels overwrite namespace labels.
  968. if labels, ok := namespaceAnnotations[key.Namespace]; ok {
  969. for k, v := range labels {
  970. allocAnnotations[k] = v
  971. }
  972. }
  973. if labels, ok := podAnnotations[key]; ok {
  974. for k, v := range labels {
  975. allocAnnotations[k] = v
  976. }
  977. }
  978. alloc.Properties.Annotations = allocAnnotations
  979. }
  980. }
  981. }
  982. func getServiceLabels(resServiceLabels []*prom.QueryResult) map[serviceKey]map[string]string {
  983. serviceLabels := map[serviceKey]map[string]string{}
  984. for _, res := range resServiceLabels {
  985. serviceKey, err := resultServiceKey(res, env.GetPromClusterLabel(), "namespace", "service")
  986. if err != nil {
  987. continue
  988. }
  989. if _, ok := serviceLabels[serviceKey]; !ok {
  990. serviceLabels[serviceKey] = map[string]string{}
  991. }
  992. for k, l := range res.GetLabels() {
  993. serviceLabels[serviceKey][k] = l
  994. }
  995. }
  996. // Prune duplicate services. That is, if the same service exists with
  997. // hyphens instead of underscores, keep the one that uses hyphens.
  998. for key := range serviceLabels {
  999. if strings.Contains(key.Service, "_") {
  1000. duplicateService := strings.Replace(key.Service, "_", "-", -1)
  1001. duplicateKey := newServiceKey(key.Cluster, key.Namespace, duplicateService)
  1002. if _, ok := serviceLabels[duplicateKey]; ok {
  1003. delete(serviceLabels, key)
  1004. }
  1005. }
  1006. }
  1007. return serviceLabels
  1008. }
  1009. func resToDeploymentLabels(resDeploymentLabels []*prom.QueryResult) map[controllerKey]map[string]string {
  1010. deploymentLabels := map[controllerKey]map[string]string{}
  1011. for _, res := range resDeploymentLabels {
  1012. controllerKey, err := resultDeploymentKey(res, env.GetPromClusterLabel(), "namespace", "deployment")
  1013. if err != nil {
  1014. continue
  1015. }
  1016. if _, ok := deploymentLabels[controllerKey]; !ok {
  1017. deploymentLabels[controllerKey] = map[string]string{}
  1018. }
  1019. for k, l := range res.GetLabels() {
  1020. deploymentLabels[controllerKey][k] = l
  1021. }
  1022. }
  1023. // Prune duplicate deployments. That is, if the same deployment exists with
  1024. // hyphens instead of underscores, keep the one that uses hyphens.
  1025. for key := range deploymentLabels {
  1026. if strings.Contains(key.Controller, "_") {
  1027. duplicateController := strings.Replace(key.Controller, "_", "-", -1)
  1028. duplicateKey := newControllerKey(key.Cluster, key.Namespace, key.ControllerKind, duplicateController)
  1029. if _, ok := deploymentLabels[duplicateKey]; ok {
  1030. delete(deploymentLabels, key)
  1031. }
  1032. }
  1033. }
  1034. return deploymentLabels
  1035. }
  1036. func resToStatefulSetLabels(resStatefulSetLabels []*prom.QueryResult) map[controllerKey]map[string]string {
  1037. statefulSetLabels := map[controllerKey]map[string]string{}
  1038. for _, res := range resStatefulSetLabels {
  1039. controllerKey, err := resultStatefulSetKey(res, env.GetPromClusterLabel(), "namespace", "statefulSet")
  1040. if err != nil {
  1041. continue
  1042. }
  1043. if _, ok := statefulSetLabels[controllerKey]; !ok {
  1044. statefulSetLabels[controllerKey] = map[string]string{}
  1045. }
  1046. for k, l := range res.GetLabels() {
  1047. statefulSetLabels[controllerKey][k] = l
  1048. }
  1049. }
  1050. // Prune duplicate stateful sets. That is, if the same stateful set exists
  1051. // with hyphens instead of underscores, keep the one that uses hyphens.
  1052. for key := range statefulSetLabels {
  1053. if strings.Contains(key.Controller, "_") {
  1054. duplicateController := strings.Replace(key.Controller, "_", "-", -1)
  1055. duplicateKey := newControllerKey(key.Cluster, key.Namespace, key.ControllerKind, duplicateController)
  1056. if _, ok := statefulSetLabels[duplicateKey]; ok {
  1057. delete(statefulSetLabels, key)
  1058. }
  1059. }
  1060. }
  1061. return statefulSetLabels
  1062. }
  1063. func labelsToPodControllerMap(podLabels map[podKey]map[string]string, controllerLabels map[controllerKey]map[string]string) map[podKey]controllerKey {
  1064. podControllerMap := map[podKey]controllerKey{}
  1065. // For each controller, turn the labels into a selector and attempt to
  1066. // match it with each set of pod labels. A match indicates that the pod
  1067. // belongs to the controller.
  1068. for cKey, cLabels := range controllerLabels {
  1069. selector := labels.Set(cLabels).AsSelectorPreValidated()
  1070. for pKey, pLabels := range podLabels {
  1071. // If the pod is in a different cluster or namespace, there is
  1072. // no need to compare the labels.
  1073. if cKey.Cluster != pKey.Cluster || cKey.Namespace != pKey.Namespace {
  1074. continue
  1075. }
  1076. podLabelSet := labels.Set(pLabels)
  1077. if selector.Matches(podLabelSet) {
  1078. if _, ok := podControllerMap[pKey]; ok {
  1079. log.DedupedWarningf(5, "CostModel.ComputeAllocation: PodControllerMap match already exists: %s matches %s and %s", pKey, podControllerMap[pKey], cKey)
  1080. }
  1081. podControllerMap[pKey] = cKey
  1082. }
  1083. }
  1084. }
  1085. return podControllerMap
  1086. }
  1087. func resToPodDaemonSetMap(resDaemonSetLabels []*prom.QueryResult) map[podKey]controllerKey {
  1088. daemonSetLabels := map[podKey]controllerKey{}
  1089. for _, res := range resDaemonSetLabels {
  1090. controllerKey, err := resultDaemonSetKey(res, env.GetPromClusterLabel(), "namespace", "owner_name")
  1091. if err != nil {
  1092. continue
  1093. }
  1094. pod, err := res.GetString("pod")
  1095. if err != nil {
  1096. log.Warningf("CostModel.ComputeAllocation: DaemonSetLabel result without pod: %s", controllerKey)
  1097. }
  1098. podKey := newPodKey(controllerKey.Cluster, controllerKey.Namespace, pod)
  1099. daemonSetLabels[podKey] = controllerKey
  1100. }
  1101. return daemonSetLabels
  1102. }
  1103. func resToPodJobMap(resJobLabels []*prom.QueryResult) map[podKey]controllerKey {
  1104. jobLabels := map[podKey]controllerKey{}
  1105. for _, res := range resJobLabels {
  1106. controllerKey, err := resultJobKey(res, env.GetPromClusterLabel(), "namespace", "owner_name")
  1107. if err != nil {
  1108. continue
  1109. }
  1110. // Convert the name of Jobs generated by CronJobs to the name of the
  1111. // CronJob by stripping the timestamp off the end.
  1112. match := isCron.FindStringSubmatch(controllerKey.Controller)
  1113. if match != nil {
  1114. controllerKey.Controller = match[1]
  1115. }
  1116. pod, err := res.GetString("pod")
  1117. if err != nil {
  1118. log.Warningf("CostModel.ComputeAllocation: JobLabel result without pod: %s", controllerKey)
  1119. }
  1120. podKey := newPodKey(controllerKey.Cluster, controllerKey.Namespace, pod)
  1121. jobLabels[podKey] = controllerKey
  1122. }
  1123. return jobLabels
  1124. }
  1125. func resToPodReplicaSetMap(resPodsWithReplicaSetOwner []*prom.QueryResult, resReplicaSetsWithoutOwners []*prom.QueryResult) map[podKey]controllerKey {
  1126. // Build out set of ReplicaSets that have no owners, themselves, such that
  1127. // the ReplicaSet should be used as the owner of the Pods it controls.
  1128. // (This should exclude, for example, ReplicaSets that are controlled by
  1129. // Deployments, in which case the Deployment should be the Pod's owner.)
  1130. replicaSets := map[controllerKey]struct{}{}
  1131. for _, res := range resReplicaSetsWithoutOwners {
  1132. controllerKey, err := resultReplicaSetKey(res, env.GetPromClusterLabel(), "namespace", "replicaset")
  1133. if err != nil {
  1134. continue
  1135. }
  1136. replicaSets[controllerKey] = struct{}{}
  1137. }
  1138. // Create the mapping of Pods to ReplicaSets, ignoring any ReplicaSets that
  1139. // to not appear in the set of uncontrolled ReplicaSets above.
  1140. podToReplicaSet := map[podKey]controllerKey{}
  1141. for _, res := range resPodsWithReplicaSetOwner {
  1142. controllerKey, err := resultReplicaSetKey(res, env.GetPromClusterLabel(), "namespace", "owner_name")
  1143. if err != nil {
  1144. continue
  1145. }
  1146. if _, ok := replicaSets[controllerKey]; !ok {
  1147. continue
  1148. }
  1149. pod, err := res.GetString("pod")
  1150. if err != nil {
  1151. log.Warningf("CostModel.ComputeAllocation: ReplicaSet result without pod: %s", controllerKey)
  1152. }
  1153. podKey := newPodKey(controllerKey.Cluster, controllerKey.Namespace, pod)
  1154. podToReplicaSet[podKey] = controllerKey
  1155. }
  1156. return podToReplicaSet
  1157. }
  1158. func applyServicesToPods(podMap map[podKey]*Pod, podLabels map[podKey]map[string]string, allocsByService map[serviceKey][]*kubecost.Allocation, serviceLabels map[serviceKey]map[string]string) {
  1159. podServicesMap := map[podKey][]serviceKey{}
  1160. // For each service, turn the labels into a selector and attempt to
  1161. // match it with each set of pod labels. A match indicates that the pod
  1162. // belongs to the service.
  1163. for sKey, sLabels := range serviceLabels {
  1164. selector := labels.Set(sLabels).AsSelectorPreValidated()
  1165. for pKey, pLabels := range podLabels {
  1166. // If the pod is in a different cluster or namespace, there is
  1167. // no need to compare the labels.
  1168. if sKey.Cluster != pKey.Cluster || sKey.Namespace != pKey.Namespace {
  1169. continue
  1170. }
  1171. podLabelSet := labels.Set(pLabels)
  1172. if selector.Matches(podLabelSet) {
  1173. if _, ok := podServicesMap[pKey]; !ok {
  1174. podServicesMap[pKey] = []serviceKey{}
  1175. }
  1176. podServicesMap[pKey] = append(podServicesMap[pKey], sKey)
  1177. }
  1178. }
  1179. }
  1180. // For each allocation in each pod, attempt to find and apply the list of
  1181. // services associated with the allocation's pod.
  1182. for key, pod := range podMap {
  1183. for _, alloc := range pod.Allocations {
  1184. if sKeys, ok := podServicesMap[key]; ok {
  1185. services := []string{}
  1186. for _, sKey := range sKeys {
  1187. services = append(services, sKey.Service)
  1188. allocsByService[sKey] = append(allocsByService[sKey], alloc)
  1189. }
  1190. alloc.Properties.Services = services
  1191. }
  1192. }
  1193. }
  1194. }
  1195. func applyControllersToPods(podMap map[podKey]*Pod, podControllerMap map[podKey]controllerKey) {
  1196. for key, pod := range podMap {
  1197. for _, alloc := range pod.Allocations {
  1198. if controllerKey, ok := podControllerMap[key]; ok {
  1199. alloc.Properties.ControllerKind = controllerKey.ControllerKind
  1200. alloc.Properties.Controller = controllerKey.Controller
  1201. }
  1202. }
  1203. }
  1204. }
  1205. func applyNodeCostPerCPUHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerCPUHr []*prom.QueryResult) {
  1206. for _, res := range resNodeCostPerCPUHr {
  1207. cluster, err := res.GetString(env.GetPromClusterLabel())
  1208. if err != nil {
  1209. cluster = env.GetClusterID()
  1210. }
  1211. node, err := res.GetString("node")
  1212. if err != nil {
  1213. log.Warningf("CostModel.ComputeAllocation: Node CPU cost query result missing field: %s", err)
  1214. continue
  1215. }
  1216. instanceType, err := res.GetString("instance_type")
  1217. if err != nil {
  1218. log.Warningf("CostModel.ComputeAllocation: Node CPU cost query result missing field: %s", err)
  1219. continue
  1220. }
  1221. providerID, err := res.GetString("provider_id")
  1222. if err != nil {
  1223. log.Warningf("CostModel.ComputeAllocation: Node CPU cost query result missing field: %s", err)
  1224. continue
  1225. }
  1226. key := newNodeKey(cluster, node)
  1227. if _, ok := nodeMap[key]; !ok {
  1228. nodeMap[key] = &NodePricing{
  1229. Name: node,
  1230. NodeType: instanceType,
  1231. ProviderID: cloud.ParseID(providerID),
  1232. }
  1233. }
  1234. nodeMap[key].CostPerCPUHr = res.Values[0].Value
  1235. }
  1236. }
  1237. func applyNodeCostPerRAMGiBHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerRAMGiBHr []*prom.QueryResult) {
  1238. for _, res := range resNodeCostPerRAMGiBHr {
  1239. cluster, err := res.GetString(env.GetPromClusterLabel())
  1240. if err != nil {
  1241. cluster = env.GetClusterID()
  1242. }
  1243. node, err := res.GetString("node")
  1244. if err != nil {
  1245. log.Warningf("CostModel.ComputeAllocation: Node RAM cost query result missing field: %s", err)
  1246. continue
  1247. }
  1248. instanceType, err := res.GetString("instance_type")
  1249. if err != nil {
  1250. log.Warningf("CostModel.ComputeAllocation: Node RAM cost query result missing field: %s", err)
  1251. continue
  1252. }
  1253. providerID, err := res.GetString("provider_id")
  1254. if err != nil {
  1255. log.Warningf("CostModel.ComputeAllocation: Node RAM cost query result missing field: %s", err)
  1256. continue
  1257. }
  1258. key := newNodeKey(cluster, node)
  1259. if _, ok := nodeMap[key]; !ok {
  1260. nodeMap[key] = &NodePricing{
  1261. Name: node,
  1262. NodeType: instanceType,
  1263. ProviderID: cloud.ParseID(providerID),
  1264. }
  1265. }
  1266. nodeMap[key].CostPerRAMGiBHr = res.Values[0].Value
  1267. }
  1268. }
  1269. func applyNodeCostPerGPUHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerGPUHr []*prom.QueryResult) {
  1270. for _, res := range resNodeCostPerGPUHr {
  1271. cluster, err := res.GetString(env.GetPromClusterLabel())
  1272. if err != nil {
  1273. cluster = env.GetClusterID()
  1274. }
  1275. node, err := res.GetString("node")
  1276. if err != nil {
  1277. log.Warningf("CostModel.ComputeAllocation: Node GPU cost query result missing field: %s", err)
  1278. continue
  1279. }
  1280. instanceType, err := res.GetString("instance_type")
  1281. if err != nil {
  1282. log.Warningf("CostModel.ComputeAllocation: Node GPU cost query result missing field: %s", err)
  1283. continue
  1284. }
  1285. providerID, err := res.GetString("provider_id")
  1286. if err != nil {
  1287. log.Warningf("CostModel.ComputeAllocation: Node GPU cost query result missing field: %s", err)
  1288. continue
  1289. }
  1290. key := newNodeKey(cluster, node)
  1291. if _, ok := nodeMap[key]; !ok {
  1292. nodeMap[key] = &NodePricing{
  1293. Name: node,
  1294. NodeType: instanceType,
  1295. ProviderID: cloud.ParseID(providerID),
  1296. }
  1297. }
  1298. nodeMap[key].CostPerGPUHr = res.Values[0].Value
  1299. }
  1300. }
  1301. func applyNodeSpot(nodeMap map[nodeKey]*NodePricing, resNodeIsSpot []*prom.QueryResult) {
  1302. for _, res := range resNodeIsSpot {
  1303. cluster, err := res.GetString(env.GetPromClusterLabel())
  1304. if err != nil {
  1305. cluster = env.GetClusterID()
  1306. }
  1307. node, err := res.GetString("node")
  1308. if err != nil {
  1309. log.Warningf("CostModel.ComputeAllocation: Node spot query result missing field: %s", err)
  1310. continue
  1311. }
  1312. key := newNodeKey(cluster, node)
  1313. if _, ok := nodeMap[key]; !ok {
  1314. log.Warningf("CostModel.ComputeAllocation: Node spot query result for missing node: %s", key)
  1315. continue
  1316. }
  1317. nodeMap[key].Preemptible = res.Values[0].Value > 0
  1318. }
  1319. }
  1320. func applyNodeDiscount(nodeMap map[nodeKey]*NodePricing, cm *CostModel) {
  1321. if cm == nil {
  1322. return
  1323. }
  1324. c, err := cm.Provider.GetConfig()
  1325. if err != nil {
  1326. log.Errorf("CostModel.ComputeAllocation: applyNodeDiscount: %s", err)
  1327. return
  1328. }
  1329. discount, err := ParsePercentString(c.Discount)
  1330. if err != nil {
  1331. log.Errorf("CostModel.ComputeAllocation: applyNodeDiscount: %s", err)
  1332. return
  1333. }
  1334. negotiatedDiscount, err := ParsePercentString(c.NegotiatedDiscount)
  1335. if err != nil {
  1336. log.Errorf("CostModel.ComputeAllocation: applyNodeDiscount: %s", err)
  1337. return
  1338. }
  1339. for _, node := range nodeMap {
  1340. // TODO GKE Reserved Instances into account
  1341. node.Discount = cm.Provider.CombinedDiscountForNode(node.NodeType, node.Preemptible, discount, negotiatedDiscount)
  1342. node.CostPerCPUHr *= (1.0 - node.Discount)
  1343. node.CostPerRAMGiBHr *= (1.0 - node.Discount)
  1344. }
  1345. }
  1346. func buildPVMap(pvMap map[pvKey]*PV, resPVCostPerGiBHour []*prom.QueryResult) {
  1347. for _, res := range resPVCostPerGiBHour {
  1348. cluster, err := res.GetString(env.GetPromClusterLabel())
  1349. if err != nil {
  1350. cluster = env.GetClusterID()
  1351. }
  1352. name, err := res.GetString("volumename")
  1353. if err != nil {
  1354. log.Warningf("CostModel.ComputeAllocation: PV cost without volumename")
  1355. continue
  1356. }
  1357. key := newPVKey(cluster, name)
  1358. pvMap[key] = &PV{
  1359. Cluster: cluster,
  1360. Name: name,
  1361. CostPerGiBHour: res.Values[0].Value,
  1362. }
  1363. }
  1364. }
  1365. func applyPVBytes(pvMap map[pvKey]*PV, resPVBytes []*prom.QueryResult) {
  1366. for _, res := range resPVBytes {
  1367. key, err := resultPVKey(res, env.GetPromClusterLabel(), "persistentvolume")
  1368. if err != nil {
  1369. log.Warningf("CostModel.ComputeAllocation: PV bytes query result missing field: %s", err)
  1370. continue
  1371. }
  1372. if _, ok := pvMap[key]; !ok {
  1373. log.Warningf("CostModel.ComputeAllocation: PV bytes result for missing PV: %s", err)
  1374. continue
  1375. }
  1376. pvMap[key].Bytes = res.Values[0].Value
  1377. }
  1378. }
  1379. func buildPVCMap(window kubecost.Window, pvcMap map[pvcKey]*PVC, pvMap map[pvKey]*PV, resPVCInfo []*prom.QueryResult) {
  1380. for _, res := range resPVCInfo {
  1381. cluster, err := res.GetString(env.GetPromClusterLabel())
  1382. if err != nil {
  1383. cluster = env.GetClusterID()
  1384. }
  1385. values, err := res.GetStrings("persistentvolumeclaim", "storageclass", "volumename", "namespace")
  1386. if err != nil {
  1387. log.DedupedWarningf(10, "CostModel.ComputeAllocation: PVC info query result missing field: %s", err)
  1388. continue
  1389. }
  1390. namespace := values["namespace"]
  1391. name := values["persistentvolumeclaim"]
  1392. volume := values["volumename"]
  1393. storageClass := values["storageclass"]
  1394. pvKey := newPVKey(cluster, volume)
  1395. pvcKey := newPVCKey(cluster, namespace, name)
  1396. // pvcStart and pvcEnd are the timestamps of the first and last minutes
  1397. // the PVC was running, respectively. We subtract 1m from pvcStart
  1398. // because this point will actually represent the end of the first
  1399. // minute. We don't subtract from pvcEnd because it already represents
  1400. // the end of the last minute.
  1401. var pvcStart, pvcEnd time.Time
  1402. for _, datum := range res.Values {
  1403. t := time.Unix(int64(datum.Timestamp), 0)
  1404. if pvcStart.IsZero() && datum.Value > 0 && window.Contains(t) {
  1405. pvcStart = t
  1406. }
  1407. if datum.Value > 0 && window.Contains(t) {
  1408. pvcEnd = t
  1409. }
  1410. }
  1411. if pvcStart.IsZero() || pvcEnd.IsZero() {
  1412. log.Warningf("CostModel.ComputeAllocation: PVC %s has no running time", pvcKey)
  1413. }
  1414. pvcStart = pvcStart.Add(-time.Minute)
  1415. if _, ok := pvMap[pvKey]; !ok {
  1416. continue
  1417. }
  1418. pvMap[pvKey].StorageClass = storageClass
  1419. if _, ok := pvcMap[pvcKey]; !ok {
  1420. pvcMap[pvcKey] = &PVC{}
  1421. }
  1422. pvcMap[pvcKey].Name = name
  1423. pvcMap[pvcKey].Namespace = namespace
  1424. pvcMap[pvcKey].Volume = pvMap[pvKey]
  1425. pvcMap[pvcKey].Start = pvcStart
  1426. pvcMap[pvcKey].End = pvcEnd
  1427. }
  1428. }
  1429. func applyPVCBytesRequested(pvcMap map[pvcKey]*PVC, resPVCBytesRequested []*prom.QueryResult) {
  1430. for _, res := range resPVCBytesRequested {
  1431. key, err := resultPVCKey(res, env.GetPromClusterLabel(), "namespace", "persistentvolumeclaim")
  1432. if err != nil {
  1433. continue
  1434. }
  1435. if _, ok := pvcMap[key]; !ok {
  1436. continue
  1437. }
  1438. pvcMap[key].Bytes = res.Values[0].Value
  1439. }
  1440. }
  1441. func buildPodPVCMap(podPVCMap map[podKey][]*PVC, pvMap map[pvKey]*PV, pvcMap map[pvcKey]*PVC, podMap map[podKey]*Pod, resPodPVCAllocation []*prom.QueryResult) {
  1442. for _, res := range resPodPVCAllocation {
  1443. cluster, err := res.GetString(env.GetPromClusterLabel())
  1444. if err != nil {
  1445. cluster = env.GetClusterID()
  1446. }
  1447. values, err := res.GetStrings("persistentvolume", "persistentvolumeclaim", "pod", "namespace")
  1448. if err != nil {
  1449. log.DedupedWarningf(5, "CostModel.ComputeAllocation: PVC allocation query result missing field: %s", err)
  1450. continue
  1451. }
  1452. namespace := values["namespace"]
  1453. pod := values["pod"]
  1454. name := values["persistentvolumeclaim"]
  1455. volume := values["persistentvolume"]
  1456. podKey := newPodKey(cluster, namespace, pod)
  1457. pvKey := newPVKey(cluster, volume)
  1458. pvcKey := newPVCKey(cluster, namespace, name)
  1459. if _, ok := pvMap[pvKey]; !ok {
  1460. log.DedupedWarningf(5, "CostModel.ComputeAllocation: PV missing for PVC allocation query result: %s", pvKey)
  1461. continue
  1462. }
  1463. if _, ok := podPVCMap[podKey]; !ok {
  1464. podPVCMap[podKey] = []*PVC{}
  1465. }
  1466. pvc, ok := pvcMap[pvcKey]
  1467. if !ok {
  1468. log.DedupedWarningf(5, "CostModel.ComputeAllocation: PVC missing for PVC allocation query: %s", pvcKey)
  1469. continue
  1470. }
  1471. count := 1
  1472. if pod, ok := podMap[podKey]; ok && len(pod.Allocations) > 0 {
  1473. count = len(pod.Allocations)
  1474. } else {
  1475. log.DedupedWarningf(10, "CostModel.ComputeAllocation: PVC %s for missing pod %s", pvcKey, podKey)
  1476. }
  1477. pvc.Count = count
  1478. pvc.Mounted = true
  1479. podPVCMap[podKey] = append(podPVCMap[podKey], pvc)
  1480. }
  1481. }
  1482. func applyUnmountedPVs(window kubecost.Window, podMap map[podKey]*Pod, pvMap map[pvKey]*PV, pvcMap map[pvcKey]*PVC) {
  1483. unmountedPVBytes := map[string]float64{}
  1484. unmountedPVCost := map[string]float64{}
  1485. for _, pv := range pvMap {
  1486. mounted := false
  1487. for _, pvc := range pvcMap {
  1488. if pvc.Volume == nil {
  1489. continue
  1490. }
  1491. if pvc.Volume == pv {
  1492. mounted = true
  1493. break
  1494. }
  1495. }
  1496. if !mounted {
  1497. gib := pv.Bytes / 1024 / 1024 / 1024
  1498. hrs := window.Minutes() / 60.0 // TODO improve with PV hours, not window hours
  1499. cost := pv.CostPerGiBHour * gib * hrs
  1500. unmountedPVCost[pv.Cluster] += cost
  1501. unmountedPVBytes[pv.Cluster] += pv.Bytes
  1502. }
  1503. }
  1504. for cluster, amount := range unmountedPVCost {
  1505. container := kubecost.UnmountedSuffix
  1506. pod := kubecost.UnmountedSuffix
  1507. namespace := kubecost.UnmountedSuffix
  1508. node := ""
  1509. key := newPodKey(cluster, namespace, pod)
  1510. podMap[key] = &Pod{
  1511. Window: window.Clone(),
  1512. Start: *window.Start(),
  1513. End: *window.End(),
  1514. Key: key,
  1515. Allocations: map[string]*kubecost.Allocation{},
  1516. }
  1517. podMap[key].AppendContainer(container)
  1518. podMap[key].Allocations[container].Properties.Cluster = cluster
  1519. podMap[key].Allocations[container].Properties.Node = node
  1520. podMap[key].Allocations[container].Properties.Namespace = namespace
  1521. podMap[key].Allocations[container].Properties.Pod = pod
  1522. podMap[key].Allocations[container].Properties.Container = container
  1523. pvKey := kubecost.PVKey{
  1524. Cluster: cluster,
  1525. Name: kubecost.UnmountedSuffix,
  1526. }
  1527. unmountedPVs := kubecost.PVAllocations{
  1528. pvKey: {
  1529. ByteHours: unmountedPVBytes[cluster] * window.Minutes() / 60.0,
  1530. Cost: amount,
  1531. },
  1532. }
  1533. podMap[key].Allocations[container].PVs = podMap[key].Allocations[container].PVs.Add(unmountedPVs)
  1534. }
  1535. }
  1536. func applyUnmountedPVCs(window kubecost.Window, podMap map[podKey]*Pod, pvcMap map[pvcKey]*PVC) {
  1537. unmountedPVCBytes := map[namespaceKey]float64{}
  1538. unmountedPVCCost := map[namespaceKey]float64{}
  1539. for _, pvc := range pvcMap {
  1540. if !pvc.Mounted && pvc.Volume != nil {
  1541. key := newNamespaceKey(pvc.Cluster, pvc.Namespace)
  1542. gib := pvc.Volume.Bytes / 1024 / 1024 / 1024
  1543. hrs := pvc.Minutes() / 60.0
  1544. cost := pvc.Volume.CostPerGiBHour * gib * hrs
  1545. unmountedPVCCost[key] += cost
  1546. unmountedPVCBytes[key] += pvc.Volume.Bytes
  1547. }
  1548. }
  1549. for key, amount := range unmountedPVCCost {
  1550. container := kubecost.UnmountedSuffix
  1551. pod := kubecost.UnmountedSuffix
  1552. namespace := key.Namespace
  1553. node := ""
  1554. cluster := key.Cluster
  1555. podKey := newPodKey(cluster, namespace, pod)
  1556. podMap[podKey] = &Pod{
  1557. Window: window.Clone(),
  1558. Start: *window.Start(),
  1559. End: *window.End(),
  1560. Key: podKey,
  1561. Allocations: map[string]*kubecost.Allocation{},
  1562. }
  1563. podMap[podKey].AppendContainer(container)
  1564. podMap[podKey].Allocations[container].Properties.Cluster = cluster
  1565. podMap[podKey].Allocations[container].Properties.Node = node
  1566. podMap[podKey].Allocations[container].Properties.Namespace = namespace
  1567. podMap[podKey].Allocations[container].Properties.Pod = pod
  1568. podMap[podKey].Allocations[container].Properties.Container = container
  1569. pvKey := kubecost.PVKey{
  1570. Cluster: cluster,
  1571. Name: kubecost.UnmountedSuffix,
  1572. }
  1573. unmountedPVs := kubecost.PVAllocations{
  1574. pvKey: {
  1575. ByteHours: unmountedPVCBytes[key] * window.Minutes() / 60.0,
  1576. Cost: amount,
  1577. },
  1578. }
  1579. podMap[podKey].Allocations[container].PVs = podMap[podKey].Allocations[container].PVs.Add(unmountedPVs)
  1580. }
  1581. }
  1582. // LB describes the start and end time of a Load Balancer along with cost
  1583. type LB struct {
  1584. TotalCost float64
  1585. Start time.Time
  1586. End time.Time
  1587. }
  1588. func getLoadBalancerCosts(resLBCost, resLBActiveMins []*prom.QueryResult, resolution time.Duration) map[serviceKey]*LB {
  1589. lbMap := make(map[serviceKey]*LB)
  1590. lbHourlyCosts := make(map[serviceKey]float64)
  1591. for _, res := range resLBCost {
  1592. serviceKey, err := resultServiceKey(res, env.GetPromClusterLabel(), "namespace", "service_name")
  1593. if err != nil {
  1594. continue
  1595. }
  1596. lbHourlyCosts[serviceKey] = res.Values[0].Value
  1597. }
  1598. for _, res := range resLBActiveMins {
  1599. serviceKey, err := resultServiceKey(res, env.GetPromClusterLabel(), "namespace", "service_name")
  1600. if err != nil || len(res.Values) == 0 {
  1601. continue
  1602. }
  1603. if _, ok := lbHourlyCosts[serviceKey]; !ok {
  1604. log.Warningf("CostModel: failed to find hourly cost for Load Balancer: %v", serviceKey)
  1605. continue
  1606. }
  1607. s := time.Unix(int64(res.Values[0].Timestamp), 0)
  1608. // subtract resolution from start time to cover full time period
  1609. s = s.Add(-resolution)
  1610. e := time.Unix(int64(res.Values[len(res.Values)-1].Timestamp), 0)
  1611. hours := e.Sub(s).Hours()
  1612. lbMap[serviceKey] = &LB{
  1613. TotalCost: lbHourlyCosts[serviceKey] * hours,
  1614. Start: s,
  1615. End: e,
  1616. }
  1617. }
  1618. return lbMap
  1619. }
  1620. func applyLoadBalancersToPods(lbMap map[serviceKey]*LB, allocsByService map[serviceKey][]*kubecost.Allocation) {
  1621. for sKey, lb := range lbMap {
  1622. totalHours := 0.0
  1623. allocHours := make(map[*kubecost.Allocation]float64)
  1624. // Add portion of load balancing cost to each allocation
  1625. // proportional to the total number of hours allocations used the load balancer
  1626. for _, alloc := range allocsByService[sKey] {
  1627. // Determine the (start, end) of the relationship between the
  1628. // given LB and the associated Allocation so that a precise
  1629. // number of hours can be used to compute cumulative cost.
  1630. s, e := alloc.Start, alloc.End
  1631. if lb.Start.After(alloc.Start) {
  1632. s = lb.Start
  1633. }
  1634. if lb.End.Before(alloc.End) {
  1635. e = lb.End
  1636. }
  1637. hours := e.Sub(s).Hours()
  1638. // A negative number of hours signifies no overlap between the windows
  1639. if hours > 0 {
  1640. totalHours += hours
  1641. allocHours[alloc] = hours
  1642. }
  1643. }
  1644. // Distribute cost of service once total hours is calculated
  1645. for alloc, hours := range allocHours {
  1646. alloc.LoadBalancerCost += lb.TotalCost * hours / totalHours
  1647. }
  1648. }
  1649. }
  1650. // getNodePricing determines node pricing, given a key and a mapping from keys
  1651. // to their NodePricing instances, as well as the custom pricing configuration
  1652. // inherent to the CostModel instance. If custom pricing is set, use that. If
  1653. // not, use the pricing defined by the given key. If that doesn't exist, fall
  1654. // back on custom pricing as a default.
  1655. func (cm *CostModel) getNodePricing(nodeMap map[nodeKey]*NodePricing, nodeKey nodeKey) *NodePricing {
  1656. // Find the relevant NodePricing, if it exists. If not, substitute the
  1657. // custom NodePricing as a default.
  1658. node, ok := nodeMap[nodeKey]
  1659. if !ok || node == nil {
  1660. if nodeKey.Node != "" {
  1661. log.DedupedWarningf(5, "CostModel: failed to find node for %s", nodeKey)
  1662. }
  1663. return cm.getCustomNodePricing(false)
  1664. }
  1665. // If custom pricing is enabled and can be retrieved, override detected
  1666. // node pricing with the custom values.
  1667. customPricingConfig, err := cm.Provider.GetConfig()
  1668. if err != nil {
  1669. log.Warningf("CostModel: failed to load custom pricing: %s", err)
  1670. }
  1671. if cloud.CustomPricesEnabled(cm.Provider) && customPricingConfig != nil {
  1672. return cm.getCustomNodePricing(node.Preemptible)
  1673. }
  1674. node.Source = "prometheus"
  1675. // If any of the values are NaN or zero, replace them with the custom
  1676. // values as default.
  1677. // TODO:CLEANUP can't we parse these custom prices once? why do we store
  1678. // them as strings like this?
  1679. if node.CostPerCPUHr == 0 || math.IsNaN(node.CostPerCPUHr) {
  1680. log.Warningf("CostModel: node pricing has illegal CostPerCPUHr; replacing with custom pricing: %s", nodeKey)
  1681. cpuCostStr := customPricingConfig.CPU
  1682. if node.Preemptible {
  1683. cpuCostStr = customPricingConfig.SpotCPU
  1684. }
  1685. costPerCPUHr, err := strconv.ParseFloat(cpuCostStr, 64)
  1686. if err != nil {
  1687. log.Warningf("CostModel: custom pricing has illegal CPU cost: %s", cpuCostStr)
  1688. }
  1689. node.CostPerCPUHr = costPerCPUHr
  1690. node.Source += "/customCPU"
  1691. }
  1692. if math.IsNaN(node.CostPerGPUHr) {
  1693. log.Warningf("CostModel: node pricing has illegal CostPerGPUHr; replacing with custom pricing: %s", nodeKey)
  1694. gpuCostStr := customPricingConfig.GPU
  1695. if node.Preemptible {
  1696. gpuCostStr = customPricingConfig.SpotGPU
  1697. }
  1698. costPerGPUHr, err := strconv.ParseFloat(gpuCostStr, 64)
  1699. if err != nil {
  1700. log.Warningf("CostModel: custom pricing has illegal GPU cost: %s", gpuCostStr)
  1701. }
  1702. node.CostPerGPUHr = costPerGPUHr
  1703. node.Source += "/customGPU"
  1704. }
  1705. if node.CostPerRAMGiBHr == 0 || math.IsNaN(node.CostPerRAMGiBHr) {
  1706. log.Warningf("CostModel: node pricing has illegal CostPerRAMHr; replacing with custom pricing: %s", nodeKey)
  1707. ramCostStr := customPricingConfig.RAM
  1708. if node.Preemptible {
  1709. ramCostStr = customPricingConfig.SpotRAM
  1710. }
  1711. costPerRAMHr, err := strconv.ParseFloat(ramCostStr, 64)
  1712. if err != nil {
  1713. log.Warningf("CostModel: custom pricing has illegal RAM cost: %s", ramCostStr)
  1714. }
  1715. node.CostPerRAMGiBHr = costPerRAMHr
  1716. node.Source += "/customRAM"
  1717. }
  1718. return node
  1719. }
  1720. // getCustomNodePricing converts the CostModel's configured custom pricing
  1721. // values into a NodePricing instance.
  1722. func (cm *CostModel) getCustomNodePricing(spot bool) *NodePricing {
  1723. customPricingConfig, err := cm.Provider.GetConfig()
  1724. if err != nil {
  1725. return nil
  1726. }
  1727. cpuCostStr := customPricingConfig.CPU
  1728. gpuCostStr := customPricingConfig.GPU
  1729. ramCostStr := customPricingConfig.RAM
  1730. if spot {
  1731. cpuCostStr = customPricingConfig.SpotCPU
  1732. gpuCostStr = customPricingConfig.SpotGPU
  1733. ramCostStr = customPricingConfig.SpotRAM
  1734. }
  1735. node := &NodePricing{Source: "custom"}
  1736. costPerCPUHr, err := strconv.ParseFloat(cpuCostStr, 64)
  1737. if err != nil {
  1738. log.Warningf("CostModel: custom pricing has illegal CPU cost: %s", cpuCostStr)
  1739. }
  1740. node.CostPerCPUHr = costPerCPUHr
  1741. costPerGPUHr, err := strconv.ParseFloat(gpuCostStr, 64)
  1742. if err != nil {
  1743. log.Warningf("CostModel: custom pricing has illegal GPU cost: %s", gpuCostStr)
  1744. }
  1745. node.CostPerGPUHr = costPerGPUHr
  1746. costPerRAMHr, err := strconv.ParseFloat(ramCostStr, 64)
  1747. if err != nil {
  1748. log.Warningf("CostModel: custom pricing has illegal RAM cost: %s", ramCostStr)
  1749. }
  1750. node.CostPerRAMGiBHr = costPerRAMHr
  1751. return node
  1752. }
  1753. // NodePricing describes the resource costs associated with a given node, as
  1754. // well as the source of the information (e.g. prometheus, custom)
  1755. type NodePricing struct {
  1756. Name string
  1757. NodeType string
  1758. ProviderID string
  1759. Preemptible bool
  1760. CostPerCPUHr float64
  1761. CostPerRAMGiBHr float64
  1762. CostPerGPUHr float64
  1763. Discount float64
  1764. Source string
  1765. }
  1766. // Pod describes a running pod's start and end time within a Window and
  1767. // all the Allocations (i.e. containers) contained within it.
  1768. type Pod struct {
  1769. Window kubecost.Window
  1770. Start time.Time
  1771. End time.Time
  1772. Key podKey
  1773. Allocations map[string]*kubecost.Allocation
  1774. }
  1775. // AppendContainer adds an entry for the given container name to the Pod.
  1776. func (p Pod) AppendContainer(container string) {
  1777. name := fmt.Sprintf("%s/%s/%s/%s", p.Key.Cluster, p.Key.Namespace, p.Key.Pod, container)
  1778. alloc := &kubecost.Allocation{
  1779. Name: name,
  1780. Properties: &kubecost.AllocationProperties{},
  1781. Window: p.Window.Clone(),
  1782. Start: p.Start,
  1783. End: p.End,
  1784. }
  1785. alloc.Properties.Container = container
  1786. alloc.Properties.Pod = p.Key.Pod
  1787. alloc.Properties.Namespace = p.Key.Namespace
  1788. alloc.Properties.Cluster = p.Key.Cluster
  1789. p.Allocations[container] = alloc
  1790. }
  1791. // PVC describes a PersistentVolumeClaim
  1792. // TODO:CLEANUP move to pkg/kubecost?
  1793. // TODO:CLEANUP add PersistentVolumeClaims field to type Allocation?
  1794. type PVC struct {
  1795. Bytes float64 `json:"bytes"`
  1796. Count int `json:"count"`
  1797. Name string `json:"name"`
  1798. Cluster string `json:"cluster"`
  1799. Namespace string `json:"namespace"`
  1800. Volume *PV `json:"persistentVolume"`
  1801. Mounted bool `json:"mounted"`
  1802. Start time.Time `json:"start"`
  1803. End time.Time `json:"end"`
  1804. }
  1805. // Cost computes the cumulative cost of the PVC
  1806. func (pvc *PVC) Cost() float64 {
  1807. if pvc == nil || pvc.Volume == nil {
  1808. return 0.0
  1809. }
  1810. gib := pvc.Bytes / 1024 / 1024 / 1024
  1811. hrs := pvc.Minutes() / 60.0
  1812. return pvc.Volume.CostPerGiBHour * gib * hrs
  1813. }
  1814. // Minutes computes the number of minutes over which the PVC is defined
  1815. func (pvc *PVC) Minutes() float64 {
  1816. if pvc == nil {
  1817. return 0.0
  1818. }
  1819. return pvc.End.Sub(pvc.Start).Minutes()
  1820. }
  1821. // String returns a string representation of the PVC
  1822. func (pvc *PVC) String() string {
  1823. if pvc == nil {
  1824. return "<nil>"
  1825. }
  1826. return fmt.Sprintf("%s/%s/%s{Bytes:%.2f, Cost:%.6f, Start,End:%s}", pvc.Cluster, pvc.Namespace, pvc.Name, pvc.Bytes, pvc.Cost(), kubecost.NewWindow(&pvc.Start, &pvc.End))
  1827. }
  1828. // PV describes a PersistentVolume
  1829. // TODO:CLEANUP move to pkg/kubecost?
  1830. type PV struct {
  1831. Bytes float64 `json:"bytes"`
  1832. CostPerGiBHour float64 `json:"costPerGiBHour"`
  1833. Cluster string `json:"cluster"`
  1834. Name string `json:"name"`
  1835. StorageClass string `json:"storageClass"`
  1836. }
  1837. // String returns a string representation of the PV
  1838. func (pv *PV) String() string {
  1839. if pv == nil {
  1840. return "<nil>"
  1841. }
  1842. return fmt.Sprintf("%s/%s{Bytes:%.2f, Cost/GiB*Hr:%.6f, StorageClass:%s}", pv.Cluster, pv.Name, pv.Bytes, pv.CostPerGiBHour, pv.StorageClass)
  1843. }