// allocation.go
  1. package costmodel
  2. import (
  3. "fmt"
  4. "math"
  5. "strconv"
  6. "strings"
  7. "time"
  8. "github.com/kubecost/cost-model/pkg/util/timeutil"
  9. "github.com/kubecost/cost-model/pkg/cloud"
  10. "github.com/kubecost/cost-model/pkg/env"
  11. "github.com/kubecost/cost-model/pkg/kubecost"
  12. "github.com/kubecost/cost-model/pkg/log"
  13. "github.com/kubecost/cost-model/pkg/prom"
  14. "k8s.io/apimachinery/pkg/labels"
  15. )
// Prometheus query format strings used by ComputeAllocation.
//
// Substitution conventions (applied via fmt.Sprintf):
//   - Most formats take (range duration, offset, cluster label), in that order,
//     filling `[%s]%s` and the trailing `%s` in the `by (...)` clause.
//   - Subquery-style formats (queryFmtPods, queryFmtPVCInfo, queryFmtLBActiveMins)
//     take (cluster label, duration, resolution, offset) to fill `%s` in the
//     `by (...)` clause and the `[%s:%s]%s` subquery window.
//   - Formats without a `by` cluster label (e.g. queryFmtNodeIsSpot, the
//     label/annotation queries) take only (duration, offset).
const (
	queryFmtPods                    = `avg(kube_pod_container_status_running{}) by (pod, namespace, %s)[%s:%s]%s`
	queryFmtRAMBytesAllocated       = `avg(avg_over_time(container_memory_allocation_bytes{container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, %s, provider_id)`
	queryFmtRAMRequests             = `avg(avg_over_time(kube_pod_container_resource_requests{resource="memory", unit="byte", container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, %s)`
	queryFmtRAMUsageAvg             = `avg(avg_over_time(container_memory_working_set_bytes{container!="", container_name!="POD", container!="POD"}[%s]%s)) by (container_name, container, pod_name, pod, namespace, instance, %s)`
	queryFmtRAMUsageMax             = `max(max_over_time(container_memory_working_set_bytes{container!="", container_name!="POD", container!="POD"}[%s]%s)) by (container_name, container, pod_name, pod, namespace, instance, %s)`
	queryFmtCPUCoresAllocated       = `avg(avg_over_time(container_cpu_allocation{container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, %s)`
	queryFmtCPURequests             = `avg(avg_over_time(kube_pod_container_resource_requests{resource="cpu", unit="core", container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, %s)`
	queryFmtCPUUsageAvg             = `avg(rate(container_cpu_usage_seconds_total{container!="", container_name!="POD", container!="POD"}[%s]%s)) by (container_name, container, pod_name, pod, namespace, instance, %s)`
	queryFmtCPUUsageMax             = `max(rate(container_cpu_usage_seconds_total{container!="", container_name!="POD", container!="POD"}[%s]%s)) by (container_name, container, pod_name, pod, namespace, instance, %s)`
	queryFmtGPUsRequested           = `avg(avg_over_time(kube_pod_container_resource_requests{resource="nvidia_com_gpu", container!="",container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, %s)`
	queryFmtGPUsAllocated           = `avg(avg_over_time(container_gpu_allocation{container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, %s)`

	// Node cost rates, as emitted by kubecost's node exporter metrics.
	queryFmtNodeCostPerCPUHr    = `avg(avg_over_time(node_cpu_hourly_cost[%s]%s)) by (node, %s, instance_type, provider_id)`
	queryFmtNodeCostPerRAMGiBHr = `avg(avg_over_time(node_ram_hourly_cost[%s]%s)) by (node, %s, instance_type, provider_id)`
	queryFmtNodeCostPerGPUHr    = `avg(avg_over_time(node_gpu_hourly_cost[%s]%s)) by (node, %s, instance_type, provider_id)`
	queryFmtNodeIsSpot          = `avg_over_time(kubecost_node_is_spot[%s]%s)`

	// Persistent volume / claim sizing and pricing.
	queryFmtPVCInfo           = `avg(kube_persistentvolumeclaim_info{volumename != ""}) by (persistentvolumeclaim, storageclass, volumename, namespace, %s)[%s:%s]%s`
	queryFmtPVBytes           = `avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s]%s)) by (persistentvolume, %s)`
	queryFmtPodPVCAllocation  = `avg(avg_over_time(pod_pvc_allocation[%s]%s)) by (persistentvolume, persistentvolumeclaim, pod, namespace, %s)`
	queryFmtPVCBytesRequested = `avg(avg_over_time(kube_persistentvolumeclaim_resource_requests_storage_bytes{}[%s]%s)) by (persistentvolumeclaim, namespace, %s)`
	queryFmtPVCostPerGiBHour  = `avg(avg_over_time(pv_hourly_cost[%s]%s)) by (volumename, %s)`

	// Network egress volumes (converted bytes -> GiB) and per-GiB cost rates,
	// split by zone / region / internet traffic classes.
	queryFmtNetZoneGiB            = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="true"}[%s]%s)) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
	queryFmtNetZoneCostPerGiB     = `avg(avg_over_time(kubecost_network_zone_egress_cost{}[%s]%s)) by (%s)`
	queryFmtNetRegionGiB          = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="false"}[%s]%s)) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
	queryFmtNetRegionCostPerGiB   = `avg(avg_over_time(kubecost_network_region_egress_cost{}[%s]%s)) by (%s)`
	queryFmtNetInternetGiB        = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="true"}[%s]%s)) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
	queryFmtNetInternetCostPerGiB = `avg(avg_over_time(kubecost_network_internet_egress_cost{}[%s]%s)) by (%s)`
	queryFmtNetReceiveBytes       = `sum(increase(container_network_receive_bytes_total{pod!="", container="POD"}[%s]%s)) by (pod_name, pod, namespace, %s)`
	queryFmtNetTransferBytes      = `sum(increase(container_network_transmit_bytes_total{pod!="", container="POD"}[%s]%s)) by (pod_name, pod, namespace, %s)`

	// Labels, annotations, and controller-ownership metadata used to enrich
	// and aggregate Allocations.
	queryFmtNamespaceLabels          = `avg_over_time(kube_namespace_labels[%s]%s)`
	queryFmtNamespaceAnnotations     = `avg_over_time(kube_namespace_annotations[%s]%s)`
	queryFmtPodLabels                = `avg_over_time(kube_pod_labels[%s]%s)`
	queryFmtPodAnnotations           = `avg_over_time(kube_pod_annotations[%s]%s)`
	queryFmtServiceLabels            = `avg_over_time(service_selector_labels[%s]%s)`
	queryFmtDeploymentLabels         = `avg_over_time(deployment_match_labels[%s]%s)`
	queryFmtStatefulSetLabels        = `avg_over_time(statefulSet_match_labels[%s]%s)`
	queryFmtDaemonSetLabels          = `sum(avg_over_time(kube_pod_owner{owner_kind="DaemonSet"}[%s]%s)) by (pod, owner_name, namespace, %s)`
	queryFmtJobLabels                = `sum(avg_over_time(kube_pod_owner{owner_kind="Job"}[%s]%s)) by (pod, owner_name, namespace ,%s)`
	queryFmtPodsWithReplicaSetOwner  = `sum(avg_over_time(kube_pod_owner{owner_kind="ReplicaSet"}[%s]%s)) by (pod, owner_name, namespace ,%s)`
	queryFmtReplicaSetsWithoutOwners = `avg(avg_over_time(kube_replicaset_owner{owner_kind="<none>", owner_name="<none>"}[%s]%s)) by (replicaset, namespace, %s)`

	// Load balancer hourly cost and active-time subquery.
	queryFmtLBCostPerHr  = `avg(avg_over_time(kubecost_load_balancer_cost[%s]%s)) by (namespace, service_name, %s)`
	queryFmtLBActiveMins = `count(kubecost_load_balancer_cost) by (namespace, service_name, %s)[%s:%s]%s`
)
// MAX_CPU_CAP caps per-container CPU core readings, in cores.
//
// This is a bit of a hack to work around garbage data from cadvisor.
// Ideally you cap each pod to the max CPU on its node, but that involves a bit
// more complexity, as it would need to be done when allocations joins with
// asset data.
const MAX_CPU_CAP = 512
  62. // CanCompute should return true if CostModel can act as a valid source for the
  63. // given time range. In the case of CostModel we want to attempt to compute as
  64. // long as the range starts in the past. If the CostModel ends up not having
  65. // data to match, that's okay, and should be communicated with an error
  66. // response from ComputeAllocation.
  67. func (cm *CostModel) CanCompute(start, end time.Time) bool {
  68. return start.Before(time.Now())
  69. }
  70. // Name returns the name of the Source
  71. func (cm *CostModel) Name() string {
  72. return "CostModel"
  73. }
  74. // ComputeAllocation uses the CostModel instance to compute an AllocationSet
  75. // for the window defined by the given start and end times. The Allocations
  76. // returned are unaggregated (i.e. down to the container level).
  77. func (cm *CostModel) ComputeAllocation(start, end time.Time, resolution time.Duration) (*kubecost.AllocationSet, error) {
  78. // 1. Build out Pod map from resolution-tuned, batched Pod start/end query
  79. // 2. Run and apply the results of the remaining queries to
  80. // 3. Build out AllocationSet from completed Pod map
  81. // Create a window spanning the requested query
  82. window := kubecost.NewWindow(&start, &end)
  83. // Create an empty AllocationSet. For safety, in the case of an error, we
  84. // should prefer to return this empty set with the error. (In the case of
  85. // no error, of course we populate the set and return it.)
  86. allocSet := kubecost.NewAllocationSet(start, end)
  87. // (1) Build out Pod map
  88. // Build out a map of Allocations as a mapping from pod-to-container-to-
  89. // underlying-Allocation instance, starting with (start, end) so that we
  90. // begin with minutes, from which we compute resource allocation and cost
  91. // totals from measured rate data.
  92. podMap := map[podKey]*Pod{}
  93. // clusterStarts and clusterEnds record the earliest start and latest end
  94. // times, respectively, on a cluster-basis. These are used for unmounted
  95. // PVs and other "virtual" Allocations so that minutes are maximally
  96. // accurate during start-up or spin-down of a cluster
  97. clusterStart := map[string]time.Time{}
  98. clusterEnd := map[string]time.Time{}
  99. cm.buildPodMap(window, resolution, env.GetETLMaxBatchDuration(), podMap, clusterStart, clusterEnd)
  100. // (2) Run and apply remaining queries
  101. // Convert window (start, end) to (duration, offset) for querying Prometheus,
  102. // including handling Thanos offset
  103. durStr, offStr, err := window.DurationOffsetForPrometheus()
  104. if err != nil {
  105. // Negative duration, so return empty set
  106. return allocSet, nil
  107. }
  108. // Convert resolution duration to a query-ready string
  109. resStr := timeutil.DurationString(resolution)
  110. ctx := prom.NewNamedContext(cm.PrometheusClient, prom.AllocationContextName)
  111. queryRAMBytesAllocated := fmt.Sprintf(queryFmtRAMBytesAllocated, durStr, offStr, env.GetPromClusterLabel())
  112. resChRAMBytesAllocated := ctx.Query(queryRAMBytesAllocated)
  113. queryRAMRequests := fmt.Sprintf(queryFmtRAMRequests, durStr, offStr, env.GetPromClusterLabel())
  114. resChRAMRequests := ctx.Query(queryRAMRequests)
  115. queryRAMUsageAvg := fmt.Sprintf(queryFmtRAMUsageAvg, durStr, offStr, env.GetPromClusterLabel())
  116. resChRAMUsageAvg := ctx.Query(queryRAMUsageAvg)
  117. queryRAMUsageMax := fmt.Sprintf(queryFmtRAMUsageMax, durStr, offStr, env.GetPromClusterLabel())
  118. resChRAMUsageMax := ctx.Query(queryRAMUsageMax)
  119. queryCPUCoresAllocated := fmt.Sprintf(queryFmtCPUCoresAllocated, durStr, offStr, env.GetPromClusterLabel())
  120. resChCPUCoresAllocated := ctx.Query(queryCPUCoresAllocated)
  121. queryCPURequests := fmt.Sprintf(queryFmtCPURequests, durStr, offStr, env.GetPromClusterLabel())
  122. resChCPURequests := ctx.Query(queryCPURequests)
  123. queryCPUUsageAvg := fmt.Sprintf(queryFmtCPUUsageAvg, durStr, offStr, env.GetPromClusterLabel())
  124. resChCPUUsageAvg := ctx.Query(queryCPUUsageAvg)
  125. queryCPUUsageMax := fmt.Sprintf(queryFmtCPUUsageMax, durStr, offStr, env.GetPromClusterLabel())
  126. resChCPUUsageMax := ctx.Query(queryCPUUsageMax)
  127. queryGPUsRequested := fmt.Sprintf(queryFmtGPUsRequested, durStr, offStr, env.GetPromClusterLabel())
  128. resChGPUsRequested := ctx.Query(queryGPUsRequested)
  129. queryGPUsAllocated := fmt.Sprintf(queryFmtGPUsAllocated, durStr, offStr, env.GetPromClusterLabel())
  130. resChGPUsAllocated := ctx.Query(queryGPUsAllocated)
  131. queryNodeCostPerCPUHr := fmt.Sprintf(queryFmtNodeCostPerCPUHr, durStr, offStr, env.GetPromClusterLabel())
  132. resChNodeCostPerCPUHr := ctx.Query(queryNodeCostPerCPUHr)
  133. queryNodeCostPerRAMGiBHr := fmt.Sprintf(queryFmtNodeCostPerRAMGiBHr, durStr, offStr, env.GetPromClusterLabel())
  134. resChNodeCostPerRAMGiBHr := ctx.Query(queryNodeCostPerRAMGiBHr)
  135. queryNodeCostPerGPUHr := fmt.Sprintf(queryFmtNodeCostPerGPUHr, durStr, offStr, env.GetPromClusterLabel())
  136. resChNodeCostPerGPUHr := ctx.Query(queryNodeCostPerGPUHr)
  137. queryNodeIsSpot := fmt.Sprintf(queryFmtNodeIsSpot, durStr, offStr)
  138. resChNodeIsSpot := ctx.Query(queryNodeIsSpot)
  139. queryPVCInfo := fmt.Sprintf(queryFmtPVCInfo, env.GetPromClusterLabel(), durStr, resStr, offStr)
  140. resChPVCInfo := ctx.Query(queryPVCInfo)
  141. queryPVBytes := fmt.Sprintf(queryFmtPVBytes, durStr, offStr, env.GetPromClusterLabel())
  142. resChPVBytes := ctx.Query(queryPVBytes)
  143. queryPodPVCAllocation := fmt.Sprintf(queryFmtPodPVCAllocation, durStr, offStr, env.GetPromClusterLabel())
  144. resChPodPVCAllocation := ctx.Query(queryPodPVCAllocation)
  145. queryPVCBytesRequested := fmt.Sprintf(queryFmtPVCBytesRequested, durStr, offStr, env.GetPromClusterLabel())
  146. resChPVCBytesRequested := ctx.Query(queryPVCBytesRequested)
  147. queryPVCostPerGiBHour := fmt.Sprintf(queryFmtPVCostPerGiBHour, durStr, offStr, env.GetPromClusterLabel())
  148. resChPVCostPerGiBHour := ctx.Query(queryPVCostPerGiBHour)
  149. queryNetTransferBytes := fmt.Sprintf(queryFmtNetTransferBytes, durStr, offStr, env.GetPromClusterLabel())
  150. resChNetTransferBytes := ctx.Query(queryNetTransferBytes)
  151. queryNetReceiveBytes := fmt.Sprintf(queryFmtNetReceiveBytes, durStr, offStr, env.GetPromClusterLabel())
  152. resChNetReceiveBytes := ctx.Query(queryNetReceiveBytes)
  153. queryNetZoneGiB := fmt.Sprintf(queryFmtNetZoneGiB, durStr, offStr, env.GetPromClusterLabel())
  154. resChNetZoneGiB := ctx.Query(queryNetZoneGiB)
  155. queryNetZoneCostPerGiB := fmt.Sprintf(queryFmtNetZoneCostPerGiB, durStr, offStr, env.GetPromClusterLabel())
  156. resChNetZoneCostPerGiB := ctx.Query(queryNetZoneCostPerGiB)
  157. queryNetRegionGiB := fmt.Sprintf(queryFmtNetRegionGiB, durStr, offStr, env.GetPromClusterLabel())
  158. resChNetRegionGiB := ctx.Query(queryNetRegionGiB)
  159. queryNetRegionCostPerGiB := fmt.Sprintf(queryFmtNetRegionCostPerGiB, durStr, offStr, env.GetPromClusterLabel())
  160. resChNetRegionCostPerGiB := ctx.Query(queryNetRegionCostPerGiB)
  161. queryNetInternetGiB := fmt.Sprintf(queryFmtNetInternetGiB, durStr, offStr, env.GetPromClusterLabel())
  162. resChNetInternetGiB := ctx.Query(queryNetInternetGiB)
  163. queryNetInternetCostPerGiB := fmt.Sprintf(queryFmtNetInternetCostPerGiB, durStr, offStr, env.GetPromClusterLabel())
  164. resChNetInternetCostPerGiB := ctx.Query(queryNetInternetCostPerGiB)
  165. queryNamespaceLabels := fmt.Sprintf(queryFmtNamespaceLabels, durStr, offStr)
  166. resChNamespaceLabels := ctx.Query(queryNamespaceLabels)
  167. queryNamespaceAnnotations := fmt.Sprintf(queryFmtNamespaceAnnotations, durStr, offStr)
  168. resChNamespaceAnnotations := ctx.Query(queryNamespaceAnnotations)
  169. queryPodLabels := fmt.Sprintf(queryFmtPodLabels, durStr, offStr)
  170. resChPodLabels := ctx.Query(queryPodLabels)
  171. queryPodAnnotations := fmt.Sprintf(queryFmtPodAnnotations, durStr, offStr)
  172. resChPodAnnotations := ctx.Query(queryPodAnnotations)
  173. queryServiceLabels := fmt.Sprintf(queryFmtServiceLabels, durStr, offStr)
  174. resChServiceLabels := ctx.Query(queryServiceLabels)
  175. queryDeploymentLabels := fmt.Sprintf(queryFmtDeploymentLabels, durStr, offStr)
  176. resChDeploymentLabels := ctx.Query(queryDeploymentLabels)
  177. queryStatefulSetLabels := fmt.Sprintf(queryFmtStatefulSetLabels, durStr, offStr)
  178. resChStatefulSetLabels := ctx.Query(queryStatefulSetLabels)
  179. queryDaemonSetLabels := fmt.Sprintf(queryFmtDaemonSetLabels, durStr, offStr, env.GetPromClusterLabel())
  180. resChDaemonSetLabels := ctx.Query(queryDaemonSetLabels)
  181. queryPodsWithReplicaSetOwner := fmt.Sprintf(queryFmtPodsWithReplicaSetOwner, durStr, offStr, env.GetPromClusterLabel())
  182. resChPodsWithReplicaSetOwner := ctx.Query(queryPodsWithReplicaSetOwner)
  183. queryReplicaSetsWithoutOwners := fmt.Sprintf(queryFmtReplicaSetsWithoutOwners, durStr, offStr, env.GetPromClusterLabel())
  184. resChReplicaSetsWithoutOwners := ctx.Query(queryReplicaSetsWithoutOwners)
  185. queryJobLabels := fmt.Sprintf(queryFmtJobLabels, durStr, offStr, env.GetPromClusterLabel())
  186. resChJobLabels := ctx.Query(queryJobLabels)
  187. queryLBCostPerHr := fmt.Sprintf(queryFmtLBCostPerHr, durStr, offStr, env.GetPromClusterLabel())
  188. resChLBCostPerHr := ctx.Query(queryLBCostPerHr)
  189. queryLBActiveMins := fmt.Sprintf(queryFmtLBActiveMins, env.GetPromClusterLabel(), durStr, resStr, offStr)
  190. resChLBActiveMins := ctx.Query(queryLBActiveMins)
  191. resCPUCoresAllocated, _ := resChCPUCoresAllocated.Await()
  192. resCPURequests, _ := resChCPURequests.Await()
  193. resCPUUsageAvg, _ := resChCPUUsageAvg.Await()
  194. resCPUUsageMax, _ := resChCPUUsageMax.Await()
  195. resRAMBytesAllocated, _ := resChRAMBytesAllocated.Await()
  196. resRAMRequests, _ := resChRAMRequests.Await()
  197. resRAMUsageAvg, _ := resChRAMUsageAvg.Await()
  198. resRAMUsageMax, _ := resChRAMUsageMax.Await()
  199. resGPUsRequested, _ := resChGPUsRequested.Await()
  200. resGPUsAllocated, _ := resChGPUsAllocated.Await()
  201. resNodeCostPerCPUHr, _ := resChNodeCostPerCPUHr.Await()
  202. resNodeCostPerRAMGiBHr, _ := resChNodeCostPerRAMGiBHr.Await()
  203. resNodeCostPerGPUHr, _ := resChNodeCostPerGPUHr.Await()
  204. resNodeIsSpot, _ := resChNodeIsSpot.Await()
  205. resPVBytes, _ := resChPVBytes.Await()
  206. resPVCostPerGiBHour, _ := resChPVCostPerGiBHour.Await()
  207. resPVCInfo, _ := resChPVCInfo.Await()
  208. resPVCBytesRequested, _ := resChPVCBytesRequested.Await()
  209. resPodPVCAllocation, _ := resChPodPVCAllocation.Await()
  210. resNetTransferBytes, _ := resChNetTransferBytes.Await()
  211. resNetReceiveBytes, _ := resChNetReceiveBytes.Await()
  212. resNetZoneGiB, _ := resChNetZoneGiB.Await()
  213. resNetZoneCostPerGiB, _ := resChNetZoneCostPerGiB.Await()
  214. resNetRegionGiB, _ := resChNetRegionGiB.Await()
  215. resNetRegionCostPerGiB, _ := resChNetRegionCostPerGiB.Await()
  216. resNetInternetGiB, _ := resChNetInternetGiB.Await()
  217. resNetInternetCostPerGiB, _ := resChNetInternetCostPerGiB.Await()
  218. resNamespaceLabels, _ := resChNamespaceLabels.Await()
  219. resNamespaceAnnotations, _ := resChNamespaceAnnotations.Await()
  220. resPodLabels, _ := resChPodLabels.Await()
  221. resPodAnnotations, _ := resChPodAnnotations.Await()
  222. resServiceLabels, _ := resChServiceLabels.Await()
  223. resDeploymentLabels, _ := resChDeploymentLabels.Await()
  224. resStatefulSetLabels, _ := resChStatefulSetLabels.Await()
  225. resDaemonSetLabels, _ := resChDaemonSetLabels.Await()
  226. resPodsWithReplicaSetOwner, _ := resChPodsWithReplicaSetOwner.Await()
  227. resReplicaSetsWithoutOwners, _ := resChReplicaSetsWithoutOwners.Await()
  228. resJobLabels, _ := resChJobLabels.Await()
  229. resLBCostPerHr, _ := resChLBCostPerHr.Await()
  230. resLBActiveMins, _ := resChLBActiveMins.Await()
  231. if ctx.HasErrors() {
  232. for _, err := range ctx.Errors() {
  233. log.Errorf("CostModel.ComputeAllocation: %s", err)
  234. }
  235. return allocSet, ctx.ErrorCollection()
  236. }
  237. // We choose to apply allocation before requests in the cases of RAM and
  238. // CPU so that we can assert that allocation should always be greater than
  239. // or equal to request.
  240. applyCPUCoresAllocated(podMap, resCPUCoresAllocated)
  241. applyCPUCoresRequested(podMap, resCPURequests)
  242. applyCPUCoresUsedAvg(podMap, resCPUUsageAvg)
  243. applyCPUCoresUsedMax(podMap, resCPUUsageMax)
  244. applyRAMBytesAllocated(podMap, resRAMBytesAllocated)
  245. applyRAMBytesRequested(podMap, resRAMRequests)
  246. applyRAMBytesUsedAvg(podMap, resRAMUsageAvg)
  247. applyRAMBytesUsedMax(podMap, resRAMUsageMax)
  248. applyGPUsAllocated(podMap, resGPUsRequested, resGPUsAllocated)
  249. applyNetworkTotals(podMap, resNetTransferBytes, resNetReceiveBytes)
  250. applyNetworkAllocation(podMap, resNetZoneGiB, resNetZoneCostPerGiB)
  251. applyNetworkAllocation(podMap, resNetRegionGiB, resNetRegionCostPerGiB)
  252. applyNetworkAllocation(podMap, resNetInternetGiB, resNetInternetCostPerGiB)
  253. namespaceLabels := resToNamespaceLabels(resNamespaceLabels)
  254. podLabels := resToPodLabels(resPodLabels)
  255. namespaceAnnotations := resToNamespaceAnnotations(resNamespaceAnnotations)
  256. podAnnotations := resToPodAnnotations(resPodAnnotations)
  257. applyLabels(podMap, namespaceLabels, podLabels)
  258. applyAnnotations(podMap, namespaceAnnotations, podAnnotations)
  259. serviceLabels := getServiceLabels(resServiceLabels)
  260. allocsByService := map[serviceKey][]*kubecost.Allocation{}
  261. applyServicesToPods(podMap, podLabels, allocsByService, serviceLabels)
  262. podDeploymentMap := labelsToPodControllerMap(podLabels, resToDeploymentLabels(resDeploymentLabels))
  263. podStatefulSetMap := labelsToPodControllerMap(podLabels, resToStatefulSetLabels(resStatefulSetLabels))
  264. podDaemonSetMap := resToPodDaemonSetMap(resDaemonSetLabels)
  265. podJobMap := resToPodJobMap(resJobLabels)
  266. podReplicaSetMap := resToPodReplicaSetMap(resPodsWithReplicaSetOwner, resReplicaSetsWithoutOwners)
  267. applyControllersToPods(podMap, podDeploymentMap)
  268. applyControllersToPods(podMap, podStatefulSetMap)
  269. applyControllersToPods(podMap, podDaemonSetMap)
  270. applyControllersToPods(podMap, podJobMap)
  271. applyControllersToPods(podMap, podReplicaSetMap)
  272. // TODO breakdown network costs?
  273. // Build out a map of Nodes with resource costs, discounts, and node types
  274. // for converting resource allocation data to cumulative costs.
  275. nodeMap := map[nodeKey]*NodePricing{}
  276. applyNodeCostPerCPUHr(nodeMap, resNodeCostPerCPUHr)
  277. applyNodeCostPerRAMGiBHr(nodeMap, resNodeCostPerRAMGiBHr)
  278. applyNodeCostPerGPUHr(nodeMap, resNodeCostPerGPUHr)
  279. applyNodeSpot(nodeMap, resNodeIsSpot)
  280. applyNodeDiscount(nodeMap, cm)
  281. // Build out the map of all PVs with class, size and cost-per-hour.
  282. // Note: this does not record time running, which we may want to
  283. // include later for increased PV precision. (As long as the PV has
  284. // a PVC, we get time running there, so this is only inaccurate
  285. // for short-lived, unmounted PVs.)
  286. pvMap := map[pvKey]*PV{}
  287. buildPVMap(pvMap, resPVCostPerGiBHour)
  288. applyPVBytes(pvMap, resPVBytes)
  289. // Build out the map of all PVCs with time running, bytes requested,
  290. // and connect to the correct PV from pvMap. (If no PV exists, that
  291. // is noted, but does not result in any allocation/cost.)
  292. pvcMap := map[pvcKey]*PVC{}
  293. buildPVCMap(window, pvcMap, pvMap, resPVCInfo)
  294. applyPVCBytesRequested(pvcMap, resPVCBytesRequested)
  295. // Build out the relationships of pods to their PVCs. This step
  296. // populates the PVC.Count field so that PVC allocation can be
  297. // split appropriately among each pod's container allocation.
  298. podPVCMap := map[podKey][]*PVC{}
  299. buildPodPVCMap(podPVCMap, pvMap, pvcMap, podMap, resPodPVCAllocation)
  300. // Because PVCs can be shared among pods, the respective PV cost
  301. // needs to be evenly distributed to those pods based on time
  302. // running, as well as the amount of time the PVC was shared.
  303. // Build a relation between every PVC to the pods that mount it
  304. // and a window representing the interval during which they
  305. // were associated.
  306. pvcPodIntervalMap := make(map[pvcKey]map[podKey]kubecost.Window)
  307. for _, pod := range podMap {
  308. for _, alloc := range pod.Allocations {
  309. cluster := alloc.Properties.Cluster
  310. namespace := alloc.Properties.Namespace
  311. pod := alloc.Properties.Pod
  312. thisPodKey := newPodKey(cluster, namespace, pod)
  313. if pvcs, ok := podPVCMap[thisPodKey]; ok {
  314. for _, pvc := range pvcs {
  315. // Determine the (start, end) of the relationship between the
  316. // given PVC and the associated Allocation so that a precise
  317. // number of hours can be used to compute cumulative cost.
  318. s, e := alloc.Start, alloc.End
  319. if pvc.Start.After(alloc.Start) {
  320. s = pvc.Start
  321. }
  322. if pvc.End.Before(alloc.End) {
  323. e = pvc.End
  324. }
  325. thisPVCKey := newPVCKey(cluster, namespace, pvc.Name)
  326. if pvcPodIntervalMap[thisPVCKey] == nil {
  327. pvcPodIntervalMap[thisPVCKey] = make(map[podKey]kubecost.Window)
  328. }
  329. pvcPodIntervalMap[thisPVCKey][thisPodKey] = kubecost.NewWindow(&s, &e)
  330. }
  331. }
  332. // We only need to look at one alloc per pod
  333. break
  334. }
  335. }
  336. // Build out a PV price coefficient for each pod with a PVC. Each
  337. // PVC-pod relation needs a coefficient which modifies the PV cost
  338. // such that PV costs can be shared between all pods using that PVC.
  339. sharedPVCCostCoefficientMap := make(map[pvcKey]map[podKey][]CoefficientComponent)
  340. for pvcKey, podIntervalMap := range pvcPodIntervalMap {
  341. // Get single-point intervals from alloc-PVC relation windows.
  342. intervals := getIntervalPointsFromWindows(podIntervalMap)
  343. // Determine coefficients for each PVC-pod relation.
  344. sharedPVCCostCoefficientMap[pvcKey] = getPVCCostCoefficients(intervals, podIntervalMap)
  345. }
  346. // Identify unmounted PVs (PVs without PVCs) and add one Allocation per
  347. // cluster representing each cluster's unmounted PVs (if necessary).
  348. applyUnmountedPVs(window, podMap, pvMap, pvcMap)
  349. lbMap := getLoadBalancerCosts(resLBCostPerHr, resLBActiveMins, resolution)
  350. applyLoadBalancersToPods(lbMap, allocsByService)
  351. // (3) Build out AllocationSet from Pod map
  352. for _, pod := range podMap {
  353. for _, alloc := range pod.Allocations {
  354. cluster := alloc.Properties.Cluster
  355. nodeName := alloc.Properties.Node
  356. namespace := alloc.Properties.Namespace
  357. pod := alloc.Properties.Pod
  358. container := alloc.Properties.Container
  359. podKey := newPodKey(cluster, namespace, pod)
  360. nodeKey := newNodeKey(cluster, nodeName)
  361. node := cm.getNodePricing(nodeMap, nodeKey)
  362. alloc.Properties.ProviderID = node.ProviderID
  363. alloc.CPUCost = alloc.CPUCoreHours * node.CostPerCPUHr
  364. alloc.RAMCost = (alloc.RAMByteHours / 1024 / 1024 / 1024) * node.CostPerRAMGiBHr
  365. alloc.GPUCost = alloc.GPUHours * node.CostPerGPUHr
  366. if pvcs, ok := podPVCMap[podKey]; ok {
  367. for _, pvc := range pvcs {
  368. pvcKey := newPVCKey(cluster, namespace, pvc.Name)
  369. s, e := alloc.Start, alloc.End
  370. if pvcInterval, ok := pvcPodIntervalMap[pvcKey][podKey]; ok {
  371. s, e = *pvcInterval.Start(), *pvcInterval.End()
  372. } else {
  373. log.DedupedWarningf(10, "CostModel.ComputeAllocation: allocation %s and PVC %s have no associated active window", alloc.Name, pvc.Name)
  374. }
  375. minutes := e.Sub(s).Minutes()
  376. hrs := minutes / 60.0
  377. count := float64(pvc.Count)
  378. if pvc.Count < 1 {
  379. count = 1
  380. }
  381. gib := pvc.Bytes / 1024 / 1024 / 1024
  382. cost := pvc.Volume.CostPerGiBHour * gib * hrs
  383. // Scale PV cost by PVC sharing coefficient.
  384. if coeffComponents, ok := sharedPVCCostCoefficientMap[pvcKey][podKey]; ok {
  385. cost *= getCoefficientFromComponents(coeffComponents)
  386. } else {
  387. log.DedupedWarningf(10, "CostModel.ComputeAllocation: allocation %s and PVC %s have relation but no coeff", alloc.Name, pvc.Name)
  388. }
  389. // Apply the size and cost of the PV to the allocation, each
  390. // weighted by count (i.e. the number of containers in the pod)
  391. // record the amount of total PVBytes Hours attributable to a given PV
  392. if alloc.PVs == nil {
  393. alloc.PVs = kubecost.PVAllocations{}
  394. }
  395. pvKey := kubecost.PVKey{
  396. Cluster: pvc.Cluster,
  397. Name: pvc.Volume.Name,
  398. }
  399. alloc.PVs[pvKey] = &kubecost.PVAllocation{
  400. ByteHours: pvc.Bytes * hrs / count,
  401. Cost: cost / count,
  402. }
  403. }
  404. }
  405. // Make sure that the name is correct (node may not be present at this
  406. // point due to it missing from queryMinutes) then insert.
  407. alloc.Name = fmt.Sprintf("%s/%s/%s/%s/%s", cluster, nodeName, namespace, pod, container)
  408. allocSet.Set(alloc)
  409. }
  410. }
  411. return allocSet, nil
  412. }
  413. func (cm *CostModel) buildPodMap(window kubecost.Window, resolution, maxBatchSize time.Duration, podMap map[podKey]*Pod, clusterStart, clusterEnd map[string]time.Time) error {
  414. // Assumes that window is positive and closed
  415. start, end := *window.Start(), *window.End()
  416. // Convert resolution duration to a query-ready string
  417. resStr := timeutil.DurationString(resolution)
  418. ctx := prom.NewNamedContext(cm.PrometheusClient, prom.AllocationContextName)
  419. // Query for (start, end) by (pod, namespace, cluster) over the given
  420. // window, using the given resolution, and if necessary in batches no
  421. // larger than the given maximum batch size. If working in batches, track
  422. // overall progress by starting with (window.start, window.start) and
  423. // querying in batches no larger than maxBatchSize from start-to-end,
  424. // folding each result set into podMap as the results come back.
  425. coverage := kubecost.NewWindow(&start, &start)
  426. numQuery := 1
  427. for coverage.End().Before(end) {
  428. // Determine the (start, end) of the current batch
  429. batchStart := *coverage.End()
  430. batchEnd := coverage.End().Add(maxBatchSize)
  431. if batchEnd.After(end) {
  432. batchEnd = end
  433. }
  434. batchWindow := kubecost.NewWindow(&batchStart, &batchEnd)
  435. var resPods []*prom.QueryResult
  436. var err error
  437. maxTries := 3
  438. numTries := 0
  439. for resPods == nil && numTries < maxTries {
  440. numTries++
  441. // Convert window (start, end) to (duration, offset) for querying Prometheus,
  442. // including handling Thanos offset
  443. durStr, offStr, err := batchWindow.DurationOffsetForPrometheus()
  444. if err != nil || durStr == "" {
  445. // Negative duration, so set empty results and don't query
  446. resPods = []*prom.QueryResult{}
  447. err = nil
  448. break
  449. }
  450. // Submit and profile query
  451. queryPods := fmt.Sprintf(queryFmtPods, env.GetPromClusterLabel(), durStr, resStr, offStr)
  452. queryProfile := time.Now()
  453. resPods, err = ctx.Query(queryPods).Await()
  454. if err != nil {
  455. log.Profile(queryProfile, fmt.Sprintf("CostModel.ComputeAllocation: pod query %d try %d failed: %s", numQuery, numTries, queryPods))
  456. resPods = nil
  457. }
  458. }
  459. if err != nil {
  460. return err
  461. }
  462. applyPodResults(window, resolution, podMap, clusterStart, clusterEnd, resPods)
  463. coverage = coverage.ExpandEnd(batchEnd)
  464. numQuery++
  465. }
  466. return nil
  467. }
// applyPodResults folds pod "running" query results into podMap, creating or
// extending one Pod entry per (cluster, namespace, pod) key, and widens
// clusterStart/clusterEnd to cover each pod's observed running interval.
//
// Each result series is expected to carry "namespace" and "pod" labels and
// per-resolution values in (0, 1], where a fractional value indicates the pod
// ran for only part of that resolution step. Results with no values, or with
// no non-zero value inside the window, are skipped.
func applyPodResults(window kubecost.Window, resolution time.Duration, podMap map[podKey]*Pod, clusterStart, clusterEnd map[string]time.Time, resPods []*prom.QueryResult) {
	for _, res := range resPods {
		if len(res.Values) == 0 {
			log.DedupedWarningf(10, "CostModel.ComputeAllocation: empty minutes result")
			continue
		}

		// Fall back to the configured cluster ID when the result is missing
		// the cluster label.
		cluster, err := res.GetString(env.GetPromClusterLabel())
		if err != nil {
			cluster = env.GetClusterID()
		}

		labels, err := res.GetStrings("namespace", "pod")
		if err != nil {
			log.DedupedWarningf(10, "CostModel.ComputeAllocation: minutes query result missing field: %s", err)
			continue
		}

		namespace := labels["namespace"]
		pod := labels["pod"]
		key := newPodKey(cluster, namespace, pod)

		// allocStart and allocEnd are the timestamps of the first and last
		// minutes the pod was running, respectively. We subtract one resolution
		// from allocStart because this point will actually represent the end
		// of the first minute. We don't subtract from allocEnd because it
		// already represents the end of the last minute.
		var allocStart, allocEnd time.Time
		startAdjustmentCoeff, endAdjustmentCoeff := 1.0, 1.0
		for _, datum := range res.Values {
			t := time.Unix(int64(datum.Timestamp), 0)

			if allocStart.IsZero() && datum.Value > 0 && window.Contains(t) {
				// Set the start timestamp to the earliest non-zero timestamp
				allocStart = t

				// Record adjustment coefficient, i.e. the portion of the start
				// timestamp to "ignore". That is, sometimes the value will be
				// 0.5, meaning that we should discount the time running by
				// half of the resolution the timestamp stands for.
				startAdjustmentCoeff = (1.0 - datum.Value)
			}
			if datum.Value > 0 && window.Contains(t) {
				// Set the end timestamp to the latest non-zero timestamp
				allocEnd = t

				// Record adjustment coefficient, i.e. the portion of the end
				// timestamp to "ignore". (See explanation above for start.)
				endAdjustmentCoeff = (1.0 - datum.Value)
			}
		}

		// The pod never ran inside the window; nothing to record.
		if allocStart.IsZero() || allocEnd.IsZero() {
			continue
		}

		// Adjust timestamps according to the resolution and the adjustment
		// coefficients, as described above. That is, count the start timestamp
		// from the beginning of the resolution, not the end. Then "reduce" the
		// start and end by the correct amount, in the case that the "running"
		// value of the first or last timestamp was not a full 1.0.
		allocStart = allocStart.Add(-resolution)
		// Note: the *100 and /100 are necessary because Duration is an int, so
		// 0.5, for instance, will be truncated, resulting in no adjustment.
		allocStart = allocStart.Add(time.Duration(startAdjustmentCoeff*100) * resolution / time.Duration(100))
		allocEnd = allocEnd.Add(-time.Duration(endAdjustmentCoeff*100) * resolution / time.Duration(100))

		// Ensure that the allocStart is always within the window, adjusting
		// for the occasions where start falls 1m before the query window.
		// NOTE: window here will always be closed (so no need to nil check
		// "start").
		// TODO:CLEANUP revisit query methodology to figure out why this is
		// happening on occasion
		if allocStart.Before(*window.Start()) {
			allocStart = *window.Start()
		}

		// If there is only one point with a value <= 0.5 that the start and
		// end timestamps both share, then we will enter this case because at
		// least half of a resolution will be subtracted from both the start
		// and the end. If that is the case, then add back half of each side
		// so that the pod is said to run for half a resolution total.
		// e.g. For resolution 1m and a value of 0.5 at one timestamp, we'll
		// end up with allocEnd == allocStart and each coeff == 0.5. In
		// that case, add 0.25m to each side, resulting in 0.5m duration.
		if !allocEnd.After(allocStart) {
			allocStart = allocStart.Add(-time.Duration(50*startAdjustmentCoeff) * resolution / time.Duration(100))
			allocEnd = allocEnd.Add(time.Duration(50*endAdjustmentCoeff) * resolution / time.Duration(100))
		}

		// Ensure that the allocEnd is always within the window, adjusting
		// for the occasions where end falls 1m after the query window. This
		// has not ever happened, but is symmetrical with the start check
		// above.
		// NOTE: window here will always be closed (so no need to nil check
		// "end").
		// TODO:CLEANUP revisit query methodology to figure out why this is
		// happening on occasion
		if allocEnd.After(*window.End()) {
			allocEnd = *window.End()
		}

		// Set start if unset or this datum's start time is earlier than the
		// current earliest time.
		if _, ok := clusterStart[cluster]; !ok || allocStart.Before(clusterStart[cluster]) {
			clusterStart[cluster] = allocStart
		}

		// Set end if unset or this datum's end time is later than the
		// current latest time.
		if _, ok := clusterEnd[cluster]; !ok || allocEnd.After(clusterEnd[cluster]) {
			clusterEnd[cluster] = allocEnd
		}

		if pod, ok := podMap[key]; ok {
			// Pod has already been recorded, so update it accordingly
			if allocStart.Before(pod.Start) {
				pod.Start = allocStart
			}
			if allocEnd.After(pod.End) {
				pod.End = allocEnd
			}
		} else {
			// Pod has not been recorded yet, so insert it
			podMap[key] = &Pod{
				Window:      window.Clone(),
				Start:       allocStart,
				End:         allocEnd,
				Key:         key,
				Allocations: map[string]*kubecost.Allocation{},
			}
		}
	}
}
  587. func applyCPUCoresAllocated(podMap map[podKey]*Pod, resCPUCoresAllocated []*prom.QueryResult) {
  588. for _, res := range resCPUCoresAllocated {
  589. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  590. if err != nil {
  591. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU allocation result missing field: %s", err)
  592. continue
  593. }
  594. pod, ok := podMap[key]
  595. if !ok {
  596. continue
  597. }
  598. container, err := res.GetString("container")
  599. if err != nil {
  600. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU allocation query result missing 'container': %s", key)
  601. continue
  602. }
  603. if _, ok := pod.Allocations[container]; !ok {
  604. pod.AppendContainer(container)
  605. }
  606. cpuCores := res.Values[0].Value
  607. if cpuCores > MAX_CPU_CAP {
  608. log.Infof("[WARNING] Very large cpu allocation, clamping to %f", res.Values[0].Value*(pod.Allocations[container].Minutes()/60.0))
  609. cpuCores = 0.0
  610. }
  611. hours := pod.Allocations[container].Minutes() / 60.0
  612. pod.Allocations[container].CPUCoreHours = cpuCores * hours
  613. node, err := res.GetString("node")
  614. if err != nil {
  615. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU allocation query result missing 'node': %s", key)
  616. continue
  617. }
  618. pod.Allocations[container].Properties.Node = node
  619. }
  620. }
  621. func applyCPUCoresRequested(podMap map[podKey]*Pod, resCPUCoresRequested []*prom.QueryResult) {
  622. for _, res := range resCPUCoresRequested {
  623. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  624. if err != nil {
  625. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU request result missing field: %s", err)
  626. continue
  627. }
  628. pod, ok := podMap[key]
  629. if !ok {
  630. continue
  631. }
  632. container, err := res.GetString("container")
  633. if err != nil {
  634. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU request query result missing 'container': %s", key)
  635. continue
  636. }
  637. if _, ok := pod.Allocations[container]; !ok {
  638. pod.AppendContainer(container)
  639. }
  640. pod.Allocations[container].CPUCoreRequestAverage = res.Values[0].Value
  641. // If CPU allocation is less than requests, set CPUCoreHours to
  642. // request level.
  643. if pod.Allocations[container].CPUCores() < res.Values[0].Value {
  644. pod.Allocations[container].CPUCoreHours = res.Values[0].Value * (pod.Allocations[container].Minutes() / 60.0)
  645. }
  646. if pod.Allocations[container].CPUCores() > MAX_CPU_CAP {
  647. log.Infof("[WARNING] Very large cpu allocation, clamping! to %f", res.Values[0].Value*(pod.Allocations[container].Minutes()/60.0))
  648. pod.Allocations[container].CPUCoreHours = res.Values[0].Value * (pod.Allocations[container].Minutes() / 60.0)
  649. }
  650. node, err := res.GetString("node")
  651. if err != nil {
  652. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU request query result missing 'node': %s", key)
  653. continue
  654. }
  655. pod.Allocations[container].Properties.Node = node
  656. }
  657. }
  658. func applyCPUCoresUsedAvg(podMap map[podKey]*Pod, resCPUCoresUsedAvg []*prom.QueryResult) {
  659. for _, res := range resCPUCoresUsedAvg {
  660. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  661. if err != nil {
  662. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage avg result missing field: %s", err)
  663. continue
  664. }
  665. pod, ok := podMap[key]
  666. if !ok {
  667. continue
  668. }
  669. container, err := res.GetString("container")
  670. if container == "" || err != nil {
  671. container, err = res.GetString("container_name")
  672. if err != nil {
  673. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage avg query result missing 'container': %s", key)
  674. continue
  675. }
  676. }
  677. if _, ok := pod.Allocations[container]; !ok {
  678. pod.AppendContainer(container)
  679. }
  680. pod.Allocations[container].CPUCoreUsageAverage = res.Values[0].Value
  681. if res.Values[0].Value > MAX_CPU_CAP {
  682. log.Infof("[WARNING] Very large cpu USAGE, dropping outlier")
  683. pod.Allocations[container].CPUCoreUsageAverage = 0.0
  684. }
  685. }
  686. }
  687. func applyCPUCoresUsedMax(podMap map[podKey]*Pod, resCPUCoresUsedMax []*prom.QueryResult) {
  688. for _, res := range resCPUCoresUsedMax {
  689. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  690. if err != nil {
  691. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage max result missing field: %s", err)
  692. continue
  693. }
  694. pod, ok := podMap[key]
  695. if !ok {
  696. continue
  697. }
  698. container, err := res.GetString("container")
  699. if container == "" || err != nil {
  700. container, err = res.GetString("container_name")
  701. if err != nil {
  702. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage max query result missing 'container': %s", key)
  703. continue
  704. }
  705. }
  706. if _, ok := pod.Allocations[container]; !ok {
  707. pod.AppendContainer(container)
  708. }
  709. if pod.Allocations[container].RawAllocationOnly == nil {
  710. pod.Allocations[container].RawAllocationOnly = &kubecost.RawAllocationOnlyData{
  711. CPUCoreUsageMax: res.Values[0].Value,
  712. }
  713. } else {
  714. pod.Allocations[container].RawAllocationOnly.CPUCoreUsageMax = res.Values[0].Value
  715. }
  716. }
  717. }
  718. func applyRAMBytesAllocated(podMap map[podKey]*Pod, resRAMBytesAllocated []*prom.QueryResult) {
  719. for _, res := range resRAMBytesAllocated {
  720. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  721. if err != nil {
  722. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM allocation result missing field: %s", err)
  723. continue
  724. }
  725. pod, ok := podMap[key]
  726. if !ok {
  727. continue
  728. }
  729. container, err := res.GetString("container")
  730. if err != nil {
  731. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM allocation query result missing 'container': %s", key)
  732. continue
  733. }
  734. if _, ok := pod.Allocations[container]; !ok {
  735. pod.AppendContainer(container)
  736. }
  737. ramBytes := res.Values[0].Value
  738. hours := pod.Allocations[container].Minutes() / 60.0
  739. pod.Allocations[container].RAMByteHours = ramBytes * hours
  740. node, err := res.GetString("node")
  741. if err != nil {
  742. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM allocation query result missing 'node': %s", key)
  743. continue
  744. }
  745. pod.Allocations[container].Properties.Node = node
  746. }
  747. }
  748. func applyRAMBytesRequested(podMap map[podKey]*Pod, resRAMBytesRequested []*prom.QueryResult) {
  749. for _, res := range resRAMBytesRequested {
  750. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  751. if err != nil {
  752. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM request result missing field: %s", err)
  753. continue
  754. }
  755. pod, ok := podMap[key]
  756. if !ok {
  757. continue
  758. }
  759. container, err := res.GetString("container")
  760. if err != nil {
  761. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM request query result missing 'container': %s", key)
  762. continue
  763. }
  764. if _, ok := pod.Allocations[container]; !ok {
  765. pod.AppendContainer(container)
  766. }
  767. pod.Allocations[container].RAMBytesRequestAverage = res.Values[0].Value
  768. // If RAM allocation is less than requests, set RAMByteHours to
  769. // request level.
  770. if pod.Allocations[container].RAMBytes() < res.Values[0].Value {
  771. pod.Allocations[container].RAMByteHours = res.Values[0].Value * (pod.Allocations[container].Minutes() / 60.0)
  772. }
  773. node, err := res.GetString("node")
  774. if err != nil {
  775. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM request query result missing 'node': %s", key)
  776. continue
  777. }
  778. pod.Allocations[container].Properties.Node = node
  779. }
  780. }
  781. func applyRAMBytesUsedAvg(podMap map[podKey]*Pod, resRAMBytesUsedAvg []*prom.QueryResult) {
  782. for _, res := range resRAMBytesUsedAvg {
  783. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  784. if err != nil {
  785. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM avg usage result missing field: %s", err)
  786. continue
  787. }
  788. pod, ok := podMap[key]
  789. if !ok {
  790. continue
  791. }
  792. container, err := res.GetString("container")
  793. if container == "" || err != nil {
  794. container, err = res.GetString("container_name")
  795. if err != nil {
  796. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM usage avg query result missing 'container': %s", key)
  797. continue
  798. }
  799. }
  800. if _, ok := pod.Allocations[container]; !ok {
  801. pod.AppendContainer(container)
  802. }
  803. pod.Allocations[container].RAMBytesUsageAverage = res.Values[0].Value
  804. }
  805. }
  806. func applyRAMBytesUsedMax(podMap map[podKey]*Pod, resRAMBytesUsedMax []*prom.QueryResult) {
  807. for _, res := range resRAMBytesUsedMax {
  808. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  809. if err != nil {
  810. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM usage max result missing field: %s", err)
  811. continue
  812. }
  813. pod, ok := podMap[key]
  814. if !ok {
  815. continue
  816. }
  817. container, err := res.GetString("container")
  818. if container == "" || err != nil {
  819. container, err = res.GetString("container_name")
  820. if err != nil {
  821. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM usage max query result missing 'container': %s", key)
  822. continue
  823. }
  824. }
  825. if _, ok := pod.Allocations[container]; !ok {
  826. pod.AppendContainer(container)
  827. }
  828. if pod.Allocations[container].RawAllocationOnly == nil {
  829. pod.Allocations[container].RawAllocationOnly = &kubecost.RawAllocationOnlyData{
  830. RAMBytesUsageMax: res.Values[0].Value,
  831. }
  832. } else {
  833. pod.Allocations[container].RawAllocationOnly.RAMBytesUsageMax = res.Values[0].Value
  834. }
  835. }
  836. }
  837. func applyGPUsAllocated(podMap map[podKey]*Pod, resGPUsRequested []*prom.QueryResult, resGPUsAllocated []*prom.QueryResult) {
  838. if len(resGPUsAllocated) > 0 { // Use the new query, when it's become available in a window
  839. resGPUsRequested = resGPUsAllocated
  840. }
  841. for _, res := range resGPUsRequested {
  842. key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  843. if err != nil {
  844. log.DedupedWarningf(10, "CostModel.ComputeAllocation: GPU request result missing field: %s", err)
  845. continue
  846. }
  847. pod, ok := podMap[key]
  848. if !ok {
  849. continue
  850. }
  851. container, err := res.GetString("container")
  852. if err != nil {
  853. log.DedupedWarningf(10, "CostModel.ComputeAllocation: GPU request query result missing 'container': %s", key)
  854. continue
  855. }
  856. if _, ok := pod.Allocations[container]; !ok {
  857. pod.AppendContainer(container)
  858. }
  859. hrs := pod.Allocations[container].Minutes() / 60.0
  860. pod.Allocations[container].GPUHours = res.Values[0].Value * hrs
  861. }
  862. }
  863. func applyNetworkTotals(podMap map[podKey]*Pod, resNetworkTransferBytes []*prom.QueryResult, resNetworkReceiveBytes []*prom.QueryResult) {
  864. for _, res := range resNetworkTransferBytes {
  865. podKey, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  866. if err != nil {
  867. log.DedupedWarningf(10, "CostModel.ComputeAllocation: Network Transfer Bytes query result missing field: %s", err)
  868. continue
  869. }
  870. pod, ok := podMap[podKey]
  871. if !ok {
  872. continue
  873. }
  874. for _, alloc := range pod.Allocations {
  875. alloc.NetworkTransferBytes = res.Values[0].Value / float64(len(pod.Allocations))
  876. }
  877. }
  878. for _, res := range resNetworkReceiveBytes {
  879. podKey, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  880. if err != nil {
  881. log.DedupedWarningf(10, "CostModel.ComputeAllocation: Network Receive Bytes query result missing field: %s", err)
  882. continue
  883. }
  884. pod, ok := podMap[podKey]
  885. if !ok {
  886. continue
  887. }
  888. for _, alloc := range pod.Allocations {
  889. alloc.NetworkReceiveBytes = res.Values[0].Value / float64(len(pod.Allocations))
  890. }
  891. }
  892. }
  893. func applyNetworkAllocation(podMap map[podKey]*Pod, resNetworkGiB []*prom.QueryResult, resNetworkCostPerGiB []*prom.QueryResult) {
  894. costPerGiBByCluster := map[string]float64{}
  895. for _, res := range resNetworkCostPerGiB {
  896. cluster, err := res.GetString(env.GetPromClusterLabel())
  897. if err != nil {
  898. cluster = env.GetClusterID()
  899. }
  900. costPerGiBByCluster[cluster] = res.Values[0].Value
  901. }
  902. for _, res := range resNetworkGiB {
  903. podKey, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  904. if err != nil {
  905. log.DedupedWarningf(10, "CostModel.ComputeAllocation: Network allocation query result missing field: %s", err)
  906. continue
  907. }
  908. pod, ok := podMap[podKey]
  909. if !ok {
  910. continue
  911. }
  912. for _, alloc := range pod.Allocations {
  913. gib := res.Values[0].Value / float64(len(pod.Allocations))
  914. costPerGiB := costPerGiBByCluster[podKey.Cluster]
  915. alloc.NetworkCost = gib * costPerGiB
  916. }
  917. }
  918. }
  919. func resToNamespaceLabels(resNamespaceLabels []*prom.QueryResult) map[namespaceKey]map[string]string {
  920. namespaceLabels := map[namespaceKey]map[string]string{}
  921. for _, res := range resNamespaceLabels {
  922. nsKey, err := resultNamespaceKey(res, env.GetPromClusterLabel(), "namespace")
  923. if err != nil {
  924. continue
  925. }
  926. if _, ok := namespaceLabels[nsKey]; !ok {
  927. namespaceLabels[nsKey] = map[string]string{}
  928. }
  929. for k, l := range res.GetLabels() {
  930. namespaceLabels[nsKey][k] = l
  931. }
  932. }
  933. return namespaceLabels
  934. }
  935. func resToPodLabels(resPodLabels []*prom.QueryResult) map[podKey]map[string]string {
  936. podLabels := map[podKey]map[string]string{}
  937. for _, res := range resPodLabels {
  938. podKey, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  939. if err != nil {
  940. continue
  941. }
  942. if _, ok := podLabels[podKey]; !ok {
  943. podLabels[podKey] = map[string]string{}
  944. }
  945. for k, l := range res.GetLabels() {
  946. podLabels[podKey][k] = l
  947. }
  948. }
  949. return podLabels
  950. }
  951. func resToNamespaceAnnotations(resNamespaceAnnotations []*prom.QueryResult) map[string]map[string]string {
  952. namespaceAnnotations := map[string]map[string]string{}
  953. for _, res := range resNamespaceAnnotations {
  954. namespace, err := res.GetString("namespace")
  955. if err != nil {
  956. continue
  957. }
  958. if _, ok := namespaceAnnotations[namespace]; !ok {
  959. namespaceAnnotations[namespace] = map[string]string{}
  960. }
  961. for k, l := range res.GetAnnotations() {
  962. namespaceAnnotations[namespace][k] = l
  963. }
  964. }
  965. return namespaceAnnotations
  966. }
  967. func resToPodAnnotations(resPodAnnotations []*prom.QueryResult) map[podKey]map[string]string {
  968. podAnnotations := map[podKey]map[string]string{}
  969. for _, res := range resPodAnnotations {
  970. podKey, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
  971. if err != nil {
  972. continue
  973. }
  974. if _, ok := podAnnotations[podKey]; !ok {
  975. podAnnotations[podKey] = map[string]string{}
  976. }
  977. for k, l := range res.GetAnnotations() {
  978. podAnnotations[podKey][k] = l
  979. }
  980. }
  981. return podAnnotations
  982. }
  983. func applyLabels(podMap map[podKey]*Pod, namespaceLabels map[namespaceKey]map[string]string, podLabels map[podKey]map[string]string) {
  984. for podKey, pod := range podMap {
  985. for _, alloc := range pod.Allocations {
  986. allocLabels := alloc.Properties.Labels
  987. if allocLabels == nil {
  988. allocLabels = make(map[string]string)
  989. }
  990. // Apply namespace labels first, then pod labels so that pod labels
  991. // overwrite namespace labels.
  992. nsKey := podKey.namespaceKey // newNamespaceKey(podKey.Cluster, podKey.Namespace)
  993. if labels, ok := namespaceLabels[nsKey]; ok {
  994. for k, v := range labels {
  995. allocLabels[k] = v
  996. }
  997. }
  998. if labels, ok := podLabels[podKey]; ok {
  999. for k, v := range labels {
  1000. allocLabels[k] = v
  1001. }
  1002. }
  1003. alloc.Properties.Labels = allocLabels
  1004. }
  1005. }
  1006. }
  1007. func applyAnnotations(podMap map[podKey]*Pod, namespaceAnnotations map[string]map[string]string, podAnnotations map[podKey]map[string]string) {
  1008. for key, pod := range podMap {
  1009. for _, alloc := range pod.Allocations {
  1010. allocAnnotations := alloc.Properties.Annotations
  1011. if allocAnnotations == nil {
  1012. allocAnnotations = make(map[string]string)
  1013. }
  1014. // Apply namespace annotations first, then pod annotations so that
  1015. // pod labels overwrite namespace labels.
  1016. if labels, ok := namespaceAnnotations[key.Namespace]; ok {
  1017. for k, v := range labels {
  1018. allocAnnotations[k] = v
  1019. }
  1020. }
  1021. if labels, ok := podAnnotations[key]; ok {
  1022. for k, v := range labels {
  1023. allocAnnotations[k] = v
  1024. }
  1025. }
  1026. alloc.Properties.Annotations = allocAnnotations
  1027. }
  1028. }
  1029. }
  1030. func getServiceLabels(resServiceLabels []*prom.QueryResult) map[serviceKey]map[string]string {
  1031. serviceLabels := map[serviceKey]map[string]string{}
  1032. for _, res := range resServiceLabels {
  1033. serviceKey, err := resultServiceKey(res, env.GetPromClusterLabel(), "namespace", "service")
  1034. if err != nil {
  1035. continue
  1036. }
  1037. if _, ok := serviceLabels[serviceKey]; !ok {
  1038. serviceLabels[serviceKey] = map[string]string{}
  1039. }
  1040. for k, l := range res.GetLabels() {
  1041. serviceLabels[serviceKey][k] = l
  1042. }
  1043. }
  1044. // Prune duplicate services. That is, if the same service exists with
  1045. // hyphens instead of underscores, keep the one that uses hyphens.
  1046. for key := range serviceLabels {
  1047. if strings.Contains(key.Service, "_") {
  1048. duplicateService := strings.Replace(key.Service, "_", "-", -1)
  1049. duplicateKey := newServiceKey(key.Cluster, key.Namespace, duplicateService)
  1050. if _, ok := serviceLabels[duplicateKey]; ok {
  1051. delete(serviceLabels, key)
  1052. }
  1053. }
  1054. }
  1055. return serviceLabels
  1056. }
  1057. func resToDeploymentLabels(resDeploymentLabels []*prom.QueryResult) map[controllerKey]map[string]string {
  1058. deploymentLabels := map[controllerKey]map[string]string{}
  1059. for _, res := range resDeploymentLabels {
  1060. controllerKey, err := resultDeploymentKey(res, env.GetPromClusterLabel(), "namespace", "deployment")
  1061. if err != nil {
  1062. continue
  1063. }
  1064. if _, ok := deploymentLabels[controllerKey]; !ok {
  1065. deploymentLabels[controllerKey] = map[string]string{}
  1066. }
  1067. for k, l := range res.GetLabels() {
  1068. deploymentLabels[controllerKey][k] = l
  1069. }
  1070. }
  1071. // Prune duplicate deployments. That is, if the same deployment exists with
  1072. // hyphens instead of underscores, keep the one that uses hyphens.
  1073. for key := range deploymentLabels {
  1074. if strings.Contains(key.Controller, "_") {
  1075. duplicateController := strings.Replace(key.Controller, "_", "-", -1)
  1076. duplicateKey := newControllerKey(key.Cluster, key.Namespace, key.ControllerKind, duplicateController)
  1077. if _, ok := deploymentLabels[duplicateKey]; ok {
  1078. delete(deploymentLabels, key)
  1079. }
  1080. }
  1081. }
  1082. return deploymentLabels
  1083. }
  1084. func resToStatefulSetLabels(resStatefulSetLabels []*prom.QueryResult) map[controllerKey]map[string]string {
  1085. statefulSetLabels := map[controllerKey]map[string]string{}
  1086. for _, res := range resStatefulSetLabels {
  1087. controllerKey, err := resultStatefulSetKey(res, env.GetPromClusterLabel(), "namespace", "statefulSet")
  1088. if err != nil {
  1089. continue
  1090. }
  1091. if _, ok := statefulSetLabels[controllerKey]; !ok {
  1092. statefulSetLabels[controllerKey] = map[string]string{}
  1093. }
  1094. for k, l := range res.GetLabels() {
  1095. statefulSetLabels[controllerKey][k] = l
  1096. }
  1097. }
  1098. // Prune duplicate stateful sets. That is, if the same stateful set exists
  1099. // with hyphens instead of underscores, keep the one that uses hyphens.
  1100. for key := range statefulSetLabels {
  1101. if strings.Contains(key.Controller, "_") {
  1102. duplicateController := strings.Replace(key.Controller, "_", "-", -1)
  1103. duplicateKey := newControllerKey(key.Cluster, key.Namespace, key.ControllerKind, duplicateController)
  1104. if _, ok := statefulSetLabels[duplicateKey]; ok {
  1105. delete(statefulSetLabels, key)
  1106. }
  1107. }
  1108. }
  1109. return statefulSetLabels
  1110. }
  1111. func labelsToPodControllerMap(podLabels map[podKey]map[string]string, controllerLabels map[controllerKey]map[string]string) map[podKey]controllerKey {
  1112. podControllerMap := map[podKey]controllerKey{}
  1113. // For each controller, turn the labels into a selector and attempt to
  1114. // match it with each set of pod labels. A match indicates that the pod
  1115. // belongs to the controller.
  1116. for cKey, cLabels := range controllerLabels {
  1117. selector := labels.Set(cLabels).AsSelectorPreValidated()
  1118. for pKey, pLabels := range podLabels {
  1119. // If the pod is in a different cluster or namespace, there is
  1120. // no need to compare the labels.
  1121. if cKey.Cluster != pKey.Cluster || cKey.Namespace != pKey.Namespace {
  1122. continue
  1123. }
  1124. podLabelSet := labels.Set(pLabels)
  1125. if selector.Matches(podLabelSet) {
  1126. if _, ok := podControllerMap[pKey]; ok {
  1127. log.DedupedWarningf(5, "CostModel.ComputeAllocation: PodControllerMap match already exists: %s matches %s and %s", pKey, podControllerMap[pKey], cKey)
  1128. }
  1129. podControllerMap[pKey] = cKey
  1130. }
  1131. }
  1132. }
  1133. return podControllerMap
  1134. }
  1135. func resToPodDaemonSetMap(resDaemonSetLabels []*prom.QueryResult) map[podKey]controllerKey {
  1136. daemonSetLabels := map[podKey]controllerKey{}
  1137. for _, res := range resDaemonSetLabels {
  1138. controllerKey, err := resultDaemonSetKey(res, env.GetPromClusterLabel(), "namespace", "owner_name")
  1139. if err != nil {
  1140. continue
  1141. }
  1142. pod, err := res.GetString("pod")
  1143. if err != nil {
  1144. log.DedupedWarningf(10, "CostModel.ComputeAllocation: DaemonSetLabel result without pod: %s", controllerKey)
  1145. }
  1146. podKey := newPodKey(controllerKey.Cluster, controllerKey.Namespace, pod)
  1147. daemonSetLabels[podKey] = controllerKey
  1148. }
  1149. return daemonSetLabels
  1150. }
  1151. func resToPodJobMap(resJobLabels []*prom.QueryResult) map[podKey]controllerKey {
  1152. jobLabels := map[podKey]controllerKey{}
  1153. for _, res := range resJobLabels {
  1154. controllerKey, err := resultJobKey(res, env.GetPromClusterLabel(), "namespace", "owner_name")
  1155. if err != nil {
  1156. continue
  1157. }
  1158. // Convert the name of Jobs generated by CronJobs to the name of the
  1159. // CronJob by stripping the timestamp off the end.
  1160. match := isCron.FindStringSubmatch(controllerKey.Controller)
  1161. if match != nil {
  1162. controllerKey.Controller = match[1]
  1163. }
  1164. pod, err := res.GetString("pod")
  1165. if err != nil {
  1166. log.DedupedWarningf(10, "CostModel.ComputeAllocation: JobLabel result without pod: %s", controllerKey)
  1167. }
  1168. podKey := newPodKey(controllerKey.Cluster, controllerKey.Namespace, pod)
  1169. jobLabels[podKey] = controllerKey
  1170. }
  1171. return jobLabels
  1172. }
  1173. func resToPodReplicaSetMap(resPodsWithReplicaSetOwner []*prom.QueryResult, resReplicaSetsWithoutOwners []*prom.QueryResult) map[podKey]controllerKey {
  1174. // Build out set of ReplicaSets that have no owners, themselves, such that
  1175. // the ReplicaSet should be used as the owner of the Pods it controls.
  1176. // (This should exclude, for example, ReplicaSets that are controlled by
  1177. // Deployments, in which case the Deployment should be the Pod's owner.)
  1178. replicaSets := map[controllerKey]struct{}{}
  1179. for _, res := range resReplicaSetsWithoutOwners {
  1180. controllerKey, err := resultReplicaSetKey(res, env.GetPromClusterLabel(), "namespace", "replicaset")
  1181. if err != nil {
  1182. continue
  1183. }
  1184. replicaSets[controllerKey] = struct{}{}
  1185. }
  1186. // Create the mapping of Pods to ReplicaSets, ignoring any ReplicaSets that
  1187. // to not appear in the set of uncontrolled ReplicaSets above.
  1188. podToReplicaSet := map[podKey]controllerKey{}
  1189. for _, res := range resPodsWithReplicaSetOwner {
  1190. controllerKey, err := resultReplicaSetKey(res, env.GetPromClusterLabel(), "namespace", "owner_name")
  1191. if err != nil {
  1192. continue
  1193. }
  1194. if _, ok := replicaSets[controllerKey]; !ok {
  1195. continue
  1196. }
  1197. pod, err := res.GetString("pod")
  1198. if err != nil {
  1199. log.DedupedWarningf(10, "CostModel.ComputeAllocation: ReplicaSet result without pod: %s", controllerKey)
  1200. }
  1201. podKey := newPodKey(controllerKey.Cluster, controllerKey.Namespace, pod)
  1202. podToReplicaSet[podKey] = controllerKey
  1203. }
  1204. return podToReplicaSet
  1205. }
  1206. func applyServicesToPods(podMap map[podKey]*Pod, podLabels map[podKey]map[string]string, allocsByService map[serviceKey][]*kubecost.Allocation, serviceLabels map[serviceKey]map[string]string) {
  1207. podServicesMap := map[podKey][]serviceKey{}
  1208. // For each service, turn the labels into a selector and attempt to
  1209. // match it with each set of pod labels. A match indicates that the pod
  1210. // belongs to the service.
  1211. for sKey, sLabels := range serviceLabels {
  1212. selector := labels.Set(sLabels).AsSelectorPreValidated()
  1213. for pKey, pLabels := range podLabels {
  1214. // If the pod is in a different cluster or namespace, there is
  1215. // no need to compare the labels.
  1216. if sKey.Cluster != pKey.Cluster || sKey.Namespace != pKey.Namespace {
  1217. continue
  1218. }
  1219. podLabelSet := labels.Set(pLabels)
  1220. if selector.Matches(podLabelSet) {
  1221. if _, ok := podServicesMap[pKey]; !ok {
  1222. podServicesMap[pKey] = []serviceKey{}
  1223. }
  1224. podServicesMap[pKey] = append(podServicesMap[pKey], sKey)
  1225. }
  1226. }
  1227. }
  1228. // For each allocation in each pod, attempt to find and apply the list of
  1229. // services associated with the allocation's pod.
  1230. for key, pod := range podMap {
  1231. for _, alloc := range pod.Allocations {
  1232. if sKeys, ok := podServicesMap[key]; ok {
  1233. services := []string{}
  1234. for _, sKey := range sKeys {
  1235. services = append(services, sKey.Service)
  1236. allocsByService[sKey] = append(allocsByService[sKey], alloc)
  1237. }
  1238. alloc.Properties.Services = services
  1239. }
  1240. }
  1241. }
  1242. }
  1243. func applyControllersToPods(podMap map[podKey]*Pod, podControllerMap map[podKey]controllerKey) {
  1244. for key, pod := range podMap {
  1245. for _, alloc := range pod.Allocations {
  1246. if controllerKey, ok := podControllerMap[key]; ok {
  1247. alloc.Properties.ControllerKind = controllerKey.ControllerKind
  1248. alloc.Properties.Controller = controllerKey.Controller
  1249. }
  1250. }
  1251. }
  1252. }
  1253. func applyNodeCostPerCPUHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerCPUHr []*prom.QueryResult) {
  1254. for _, res := range resNodeCostPerCPUHr {
  1255. cluster, err := res.GetString(env.GetPromClusterLabel())
  1256. if err != nil {
  1257. cluster = env.GetClusterID()
  1258. }
  1259. node, err := res.GetString("node")
  1260. if err != nil {
  1261. log.DedupedWarningf(10, "CostModel.ComputeAllocation: Node CPU cost query result missing field: %s", err)
  1262. continue
  1263. }
  1264. instanceType, err := res.GetString("instance_type")
  1265. if err != nil {
  1266. log.DedupedWarningf(10, "CostModel.ComputeAllocation: Node CPU cost query result missing field: %s", err)
  1267. continue
  1268. }
  1269. providerID, err := res.GetString("provider_id")
  1270. if err != nil {
  1271. log.DedupedWarningf(10, "CostModel.ComputeAllocation: Node CPU cost query result missing field: %s", err)
  1272. continue
  1273. }
  1274. key := newNodeKey(cluster, node)
  1275. if _, ok := nodeMap[key]; !ok {
  1276. nodeMap[key] = &NodePricing{
  1277. Name: node,
  1278. NodeType: instanceType,
  1279. ProviderID: cloud.ParseID(providerID),
  1280. }
  1281. }
  1282. nodeMap[key].CostPerCPUHr = res.Values[0].Value
  1283. }
  1284. }
  1285. func applyNodeCostPerRAMGiBHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerRAMGiBHr []*prom.QueryResult) {
  1286. for _, res := range resNodeCostPerRAMGiBHr {
  1287. cluster, err := res.GetString(env.GetPromClusterLabel())
  1288. if err != nil {
  1289. cluster = env.GetClusterID()
  1290. }
  1291. node, err := res.GetString("node")
  1292. if err != nil {
  1293. log.DedupedWarningf(10, "CostModel.ComputeAllocation: Node RAM cost query result missing field: %s", err)
  1294. continue
  1295. }
  1296. instanceType, err := res.GetString("instance_type")
  1297. if err != nil {
  1298. log.DedupedWarningf(10, "CostModel.ComputeAllocation: Node RAM cost query result missing field: %s", err)
  1299. continue
  1300. }
  1301. providerID, err := res.GetString("provider_id")
  1302. if err != nil {
  1303. log.DedupedWarningf(10, "CostModel.ComputeAllocation: Node RAM cost query result missing field: %s", err)
  1304. continue
  1305. }
  1306. key := newNodeKey(cluster, node)
  1307. if _, ok := nodeMap[key]; !ok {
  1308. nodeMap[key] = &NodePricing{
  1309. Name: node,
  1310. NodeType: instanceType,
  1311. ProviderID: cloud.ParseID(providerID),
  1312. }
  1313. }
  1314. nodeMap[key].CostPerRAMGiBHr = res.Values[0].Value
  1315. }
  1316. }
  1317. func applyNodeCostPerGPUHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerGPUHr []*prom.QueryResult) {
  1318. for _, res := range resNodeCostPerGPUHr {
  1319. cluster, err := res.GetString(env.GetPromClusterLabel())
  1320. if err != nil {
  1321. cluster = env.GetClusterID()
  1322. }
  1323. node, err := res.GetString("node")
  1324. if err != nil {
  1325. log.DedupedWarningf(10, "CostModel.ComputeAllocation: Node GPU cost query result missing field: %s", err)
  1326. continue
  1327. }
  1328. instanceType, err := res.GetString("instance_type")
  1329. if err != nil {
  1330. log.DedupedWarningf(10, "CostModel.ComputeAllocation: Node GPU cost query result missing field: %s", err)
  1331. continue
  1332. }
  1333. providerID, err := res.GetString("provider_id")
  1334. if err != nil {
  1335. log.DedupedWarningf(10, "CostModel.ComputeAllocation: Node GPU cost query result missing field: %s", err)
  1336. continue
  1337. }
  1338. key := newNodeKey(cluster, node)
  1339. if _, ok := nodeMap[key]; !ok {
  1340. nodeMap[key] = &NodePricing{
  1341. Name: node,
  1342. NodeType: instanceType,
  1343. ProviderID: cloud.ParseID(providerID),
  1344. }
  1345. }
  1346. nodeMap[key].CostPerGPUHr = res.Values[0].Value
  1347. }
  1348. }
  1349. func applyNodeSpot(nodeMap map[nodeKey]*NodePricing, resNodeIsSpot []*prom.QueryResult) {
  1350. for _, res := range resNodeIsSpot {
  1351. cluster, err := res.GetString(env.GetPromClusterLabel())
  1352. if err != nil {
  1353. cluster = env.GetClusterID()
  1354. }
  1355. node, err := res.GetString("node")
  1356. if err != nil {
  1357. log.DedupedWarningf(10, "CostModel.ComputeAllocation: Node spot query result missing field: %s", err)
  1358. continue
  1359. }
  1360. key := newNodeKey(cluster, node)
  1361. if _, ok := nodeMap[key]; !ok {
  1362. log.DedupedWarningf(10, "CostModel.ComputeAllocation: Node spot query result for missing node: %s", key)
  1363. continue
  1364. }
  1365. nodeMap[key].Preemptible = res.Values[0].Value > 0
  1366. }
  1367. }
  1368. func applyNodeDiscount(nodeMap map[nodeKey]*NodePricing, cm *CostModel) {
  1369. if cm == nil {
  1370. return
  1371. }
  1372. c, err := cm.Provider.GetConfig()
  1373. if err != nil {
  1374. log.Errorf("CostModel.ComputeAllocation: applyNodeDiscount: %s", err)
  1375. return
  1376. }
  1377. discount, err := ParsePercentString(c.Discount)
  1378. if err != nil {
  1379. log.Errorf("CostModel.ComputeAllocation: applyNodeDiscount: %s", err)
  1380. return
  1381. }
  1382. negotiatedDiscount, err := ParsePercentString(c.NegotiatedDiscount)
  1383. if err != nil {
  1384. log.Errorf("CostModel.ComputeAllocation: applyNodeDiscount: %s", err)
  1385. return
  1386. }
  1387. for _, node := range nodeMap {
  1388. // TODO GKE Reserved Instances into account
  1389. node.Discount = cm.Provider.CombinedDiscountForNode(node.NodeType, node.Preemptible, discount, negotiatedDiscount)
  1390. node.CostPerCPUHr *= (1.0 - node.Discount)
  1391. node.CostPerRAMGiBHr *= (1.0 - node.Discount)
  1392. }
  1393. }
  1394. func buildPVMap(pvMap map[pvKey]*PV, resPVCostPerGiBHour []*prom.QueryResult) {
  1395. for _, res := range resPVCostPerGiBHour {
  1396. cluster, err := res.GetString(env.GetPromClusterLabel())
  1397. if err != nil {
  1398. cluster = env.GetClusterID()
  1399. }
  1400. name, err := res.GetString("volumename")
  1401. if err != nil {
  1402. log.DedupedWarningf(10, "CostModel.ComputeAllocation: PV cost without volumename")
  1403. continue
  1404. }
  1405. key := newPVKey(cluster, name)
  1406. pvMap[key] = &PV{
  1407. Cluster: cluster,
  1408. Name: name,
  1409. CostPerGiBHour: res.Values[0].Value,
  1410. }
  1411. }
  1412. }
  1413. func applyPVBytes(pvMap map[pvKey]*PV, resPVBytes []*prom.QueryResult) {
  1414. for _, res := range resPVBytes {
  1415. key, err := resultPVKey(res, env.GetPromClusterLabel(), "persistentvolume")
  1416. if err != nil {
  1417. log.DedupedWarningf(10, "CostModel.ComputeAllocation: PV bytes query result missing field: %s", err)
  1418. continue
  1419. }
  1420. if _, ok := pvMap[key]; !ok {
  1421. log.DedupedWarningf(10, "CostModel.ComputeAllocation: PV bytes result for missing PV: %s", err)
  1422. continue
  1423. }
  1424. pvMap[key].Bytes = res.Values[0].Value
  1425. }
  1426. }
// buildPVCMap populates pvcMap from PVC info query results, linking each PVC
// to its PV in pvMap (and recording the PV's storage class) and computing the
// PVC's running (start, end) interval within the given window.
func buildPVCMap(window kubecost.Window, pvcMap map[pvcKey]*PVC, pvMap map[pvKey]*PV, resPVCInfo []*prom.QueryResult) {
	for _, res := range resPVCInfo {
		cluster, err := res.GetString(env.GetPromClusterLabel())
		if err != nil {
			// Fall back on the configured cluster ID when the result does
			// not carry a cluster label.
			cluster = env.GetClusterID()
		}

		values, err := res.GetStrings("persistentvolumeclaim", "storageclass", "volumename", "namespace")
		if err != nil {
			log.DedupedWarningf(10, "CostModel.ComputeAllocation: PVC info query result missing field: %s", err)
			continue
		}

		namespace := values["namespace"]
		name := values["persistentvolumeclaim"]
		volume := values["volumename"]
		storageClass := values["storageclass"]

		pvKey := newPVKey(cluster, volume)
		pvcKey := newPVCKey(cluster, namespace, name)

		// pvcStart and pvcEnd are the timestamps of the first and last minutes
		// the PVC was running, respectively. We subtract 1m from pvcStart
		// because this point will actually represent the end of the first
		// minute. We don't subtract from pvcEnd because it already represents
		// the end of the last minute.
		var pvcStart, pvcEnd time.Time
		for _, datum := range res.Values {
			t := time.Unix(int64(datum.Timestamp), 0)
			// Only datapoints with a positive value inside the window count
			// as "running"; the first such point sets the start, and every
			// such point advances the end.
			if pvcStart.IsZero() && datum.Value > 0 && window.Contains(t) {
				pvcStart = t
			}
			if datum.Value > 0 && window.Contains(t) {
				pvcEnd = t
			}
		}
		if pvcStart.IsZero() || pvcEnd.IsZero() {
			log.DedupedWarningf(10, "CostModel.ComputeAllocation: PVC %s has no running time", pvcKey)
		}
		pvcStart = pvcStart.Add(-time.Minute)

		// Skip PVCs whose PV is not in pvMap; the PVC cannot be linked to a
		// volume (and thus priced) without it.
		if _, ok := pvMap[pvKey]; !ok {
			continue
		}
		pvMap[pvKey].StorageClass = storageClass

		if _, ok := pvcMap[pvcKey]; !ok {
			pvcMap[pvcKey] = &PVC{}
		}

		pvcMap[pvcKey].Name = name
		pvcMap[pvcKey].Namespace = namespace
		pvcMap[pvcKey].Volume = pvMap[pvKey]
		pvcMap[pvcKey].Start = pvcStart
		pvcMap[pvcKey].End = pvcEnd
	}
}
  1477. func applyPVCBytesRequested(pvcMap map[pvcKey]*PVC, resPVCBytesRequested []*prom.QueryResult) {
  1478. for _, res := range resPVCBytesRequested {
  1479. key, err := resultPVCKey(res, env.GetPromClusterLabel(), "namespace", "persistentvolumeclaim")
  1480. if err != nil {
  1481. continue
  1482. }
  1483. if _, ok := pvcMap[key]; !ok {
  1484. continue
  1485. }
  1486. pvcMap[key].Bytes = res.Values[0].Value
  1487. }
  1488. }
// buildPodPVCMap attaches PVCs to the pods that mount them, marking each PVC
// as mounted and recording how many allocations (containers) share it.
// Results referencing unknown PVs or PVCs are skipped with a warning.
func buildPodPVCMap(podPVCMap map[podKey][]*PVC, pvMap map[pvKey]*PV, pvcMap map[pvcKey]*PVC, podMap map[podKey]*Pod, resPodPVCAllocation []*prom.QueryResult) {
	for _, res := range resPodPVCAllocation {
		cluster, err := res.GetString(env.GetPromClusterLabel())
		if err != nil {
			// Fall back on the configured cluster ID when the result does
			// not carry a cluster label.
			cluster = env.GetClusterID()
		}

		values, err := res.GetStrings("persistentvolume", "persistentvolumeclaim", "pod", "namespace")
		if err != nil {
			log.DedupedWarningf(5, "CostModel.ComputeAllocation: PVC allocation query result missing field: %s", err)
			continue
		}

		namespace := values["namespace"]
		pod := values["pod"]
		name := values["persistentvolumeclaim"]
		volume := values["persistentvolume"]

		podKey := newPodKey(cluster, namespace, pod)
		pvKey := newPVKey(cluster, volume)
		pvcKey := newPVCKey(cluster, namespace, name)

		// Skip results referencing a PV we have no entry for.
		if _, ok := pvMap[pvKey]; !ok {
			log.DedupedWarningf(5, "CostModel.ComputeAllocation: PV missing for PVC allocation query result: %s", pvKey)
			continue
		}

		if _, ok := podPVCMap[podKey]; !ok {
			podPVCMap[podKey] = []*PVC{}
		}

		pvc, ok := pvcMap[pvcKey]
		if !ok {
			log.DedupedWarningf(5, "CostModel.ComputeAllocation: PVC missing for PVC allocation query: %s", pvcKey)
			continue
		}

		// Count records how many allocations share the PVC; default to 1
		// when the pod is unknown or has no allocations. (Note: "pod" here
		// shadows the pod name string above with the *Pod value.)
		count := 1
		if pod, ok := podMap[podKey]; ok && len(pod.Allocations) > 0 {
			count = len(pod.Allocations)
		} else {
			log.DedupedWarningf(10, "CostModel.ComputeAllocation: PVC %s for missing pod %s", pvcKey, podKey)
		}

		pvc.Count = count
		pvc.Mounted = true

		podPVCMap[podKey] = append(podPVCMap[podKey], pvc)
	}
}
// applyUnmountedPVs finds PVs that no PVC points at, aggregates their cost
// and bytes per cluster, and attributes the totals to a synthetic
// "unmounted" pod (namespace/pod/container all set to the unmounted suffix)
// in each affected cluster.
func applyUnmountedPVs(window kubecost.Window, podMap map[podKey]*Pod, pvMap map[pvKey]*PV, pvcMap map[pvcKey]*PVC) {
	unmountedPVBytes := map[string]float64{} // cluster -> total unmounted PV bytes
	unmountedPVCost := map[string]float64{}  // cluster -> total unmounted PV cost

	for _, pv := range pvMap {
		// A PV is mounted if any PVC's Volume pointer refers to it.
		mounted := false
		for _, pvc := range pvcMap {
			if pvc.Volume == nil {
				continue
			}
			if pvc.Volume == pv {
				mounted = true
				break
			}
		}

		if !mounted {
			gib := pv.Bytes / 1024 / 1024 / 1024
			hrs := window.Minutes() / 60.0 // TODO improve with PV hours, not window hours
			cost := pv.CostPerGiBHour * gib * hrs
			unmountedPVCost[pv.Cluster] += cost
			unmountedPVBytes[pv.Cluster] += pv.Bytes
		}
	}

	// Create one synthetic pod per cluster, spanning the full window, with a
	// single allocation carrying the aggregated unmounted PV cost.
	for cluster, amount := range unmountedPVCost {
		container := kubecost.UnmountedSuffix
		pod := kubecost.UnmountedSuffix
		namespace := kubecost.UnmountedSuffix
		node := ""

		key := newPodKey(cluster, namespace, pod)
		podMap[key] = &Pod{
			Window:      window.Clone(),
			Start:       *window.Start(),
			End:         *window.End(),
			Key:         key,
			Allocations: map[string]*kubecost.Allocation{},
		}

		podMap[key].AppendContainer(container)
		podMap[key].Allocations[container].Properties.Cluster = cluster
		podMap[key].Allocations[container].Properties.Node = node
		podMap[key].Allocations[container].Properties.Namespace = namespace
		podMap[key].Allocations[container].Properties.Pod = pod
		podMap[key].Allocations[container].Properties.Container = container

		pvKey := kubecost.PVKey{
			Cluster: cluster,
			Name:    kubecost.UnmountedSuffix,
		}
		unmountedPVs := kubecost.PVAllocations{
			pvKey: {
				ByteHours: unmountedPVBytes[cluster] * window.Minutes() / 60.0,
				Cost:      amount,
			},
		}
		podMap[key].Allocations[container].PVs = podMap[key].Allocations[container].PVs.Add(unmountedPVs)
	}
}
// applyUnmountedPVCs finds PVCs that have a backing volume but were never
// marked as mounted by any pod, aggregates their cost and bytes per
// (cluster, namespace), and attributes the totals to a synthetic "unmounted"
// pod in each affected namespace.
func applyUnmountedPVCs(window kubecost.Window, podMap map[podKey]*Pod, pvcMap map[pvcKey]*PVC) {
	unmountedPVCBytes := map[namespaceKey]float64{} // (cluster, namespace) -> total bytes
	unmountedPVCCost := map[namespaceKey]float64{}  // (cluster, namespace) -> total cost

	for _, pvc := range pvcMap {
		if !pvc.Mounted && pvc.Volume != nil {
			key := newNamespaceKey(pvc.Cluster, pvc.Namespace)
			// Cost the full volume size over the PVC's own lifetime.
			gib := pvc.Volume.Bytes / 1024 / 1024 / 1024
			hrs := pvc.Minutes() / 60.0
			cost := pvc.Volume.CostPerGiBHour * gib * hrs
			unmountedPVCCost[key] += cost
			unmountedPVCBytes[key] += pvc.Volume.Bytes
		}
	}

	// Create one synthetic pod per namespace, spanning the full window, with
	// a single allocation carrying the aggregated unmounted PVC cost.
	for key, amount := range unmountedPVCCost {
		container := kubecost.UnmountedSuffix
		pod := kubecost.UnmountedSuffix
		namespace := key.Namespace
		node := ""
		cluster := key.Cluster

		podKey := newPodKey(cluster, namespace, pod)
		podMap[podKey] = &Pod{
			Window:      window.Clone(),
			Start:       *window.Start(),
			End:         *window.End(),
			Key:         podKey,
			Allocations: map[string]*kubecost.Allocation{},
		}

		podMap[podKey].AppendContainer(container)
		podMap[podKey].Allocations[container].Properties.Cluster = cluster
		podMap[podKey].Allocations[container].Properties.Node = node
		podMap[podKey].Allocations[container].Properties.Namespace = namespace
		podMap[podKey].Allocations[container].Properties.Pod = pod
		podMap[podKey].Allocations[container].Properties.Container = container

		pvKey := kubecost.PVKey{
			Cluster: cluster,
			Name:    kubecost.UnmountedSuffix,
		}
		unmountedPVs := kubecost.PVAllocations{
			pvKey: {
				ByteHours: unmountedPVCBytes[key] * window.Minutes() / 60.0,
				Cost:      amount,
			},
		}
		podMap[podKey].Allocations[container].PVs = podMap[podKey].Allocations[container].PVs.Add(unmountedPVs)
	}
}
// LB describes the start and end time of a Load Balancer along with cost
type LB struct {
	TotalCost float64   // cumulative cost over the (Start, End) interval
	Start     time.Time // first active timestamp, shifted back by one resolution
	End       time.Time // last active timestamp observed
}
  1636. func getLoadBalancerCosts(resLBCost, resLBActiveMins []*prom.QueryResult, resolution time.Duration) map[serviceKey]*LB {
  1637. lbMap := make(map[serviceKey]*LB)
  1638. lbHourlyCosts := make(map[serviceKey]float64)
  1639. for _, res := range resLBCost {
  1640. serviceKey, err := resultServiceKey(res, env.GetPromClusterLabel(), "namespace", "service_name")
  1641. if err != nil {
  1642. continue
  1643. }
  1644. lbHourlyCosts[serviceKey] = res.Values[0].Value
  1645. }
  1646. for _, res := range resLBActiveMins {
  1647. serviceKey, err := resultServiceKey(res, env.GetPromClusterLabel(), "namespace", "service_name")
  1648. if err != nil || len(res.Values) == 0 {
  1649. continue
  1650. }
  1651. if _, ok := lbHourlyCosts[serviceKey]; !ok {
  1652. log.DedupedWarningf(10, "CostModel: failed to find hourly cost for Load Balancer: %v", serviceKey)
  1653. continue
  1654. }
  1655. s := time.Unix(int64(res.Values[0].Timestamp), 0)
  1656. // subtract resolution from start time to cover full time period
  1657. s = s.Add(-resolution)
  1658. e := time.Unix(int64(res.Values[len(res.Values)-1].Timestamp), 0)
  1659. hours := e.Sub(s).Hours()
  1660. lbMap[serviceKey] = &LB{
  1661. TotalCost: lbHourlyCosts[serviceKey] * hours,
  1662. Start: s,
  1663. End: e,
  1664. }
  1665. }
  1666. return lbMap
  1667. }
  1668. func applyLoadBalancersToPods(lbMap map[serviceKey]*LB, allocsByService map[serviceKey][]*kubecost.Allocation) {
  1669. for sKey, lb := range lbMap {
  1670. totalHours := 0.0
  1671. allocHours := make(map[*kubecost.Allocation]float64)
  1672. // Add portion of load balancing cost to each allocation
  1673. // proportional to the total number of hours allocations used the load balancer
  1674. for _, alloc := range allocsByService[sKey] {
  1675. // Determine the (start, end) of the relationship between the
  1676. // given LB and the associated Allocation so that a precise
  1677. // number of hours can be used to compute cumulative cost.
  1678. s, e := alloc.Start, alloc.End
  1679. if lb.Start.After(alloc.Start) {
  1680. s = lb.Start
  1681. }
  1682. if lb.End.Before(alloc.End) {
  1683. e = lb.End
  1684. }
  1685. hours := e.Sub(s).Hours()
  1686. // A negative number of hours signifies no overlap between the windows
  1687. if hours > 0 {
  1688. totalHours += hours
  1689. allocHours[alloc] = hours
  1690. }
  1691. }
  1692. // Distribute cost of service once total hours is calculated
  1693. for alloc, hours := range allocHours {
  1694. alloc.LoadBalancerCost += lb.TotalCost * hours / totalHours
  1695. }
  1696. }
  1697. }
  1698. // getNodePricing determines node pricing, given a key and a mapping from keys
  1699. // to their NodePricing instances, as well as the custom pricing configuration
  1700. // inherent to the CostModel instance. If custom pricing is set, use that. If
  1701. // not, use the pricing defined by the given key. If that doesn't exist, fall
  1702. // back on custom pricing as a default.
  1703. func (cm *CostModel) getNodePricing(nodeMap map[nodeKey]*NodePricing, nodeKey nodeKey) *NodePricing {
  1704. // Find the relevant NodePricing, if it exists. If not, substitute the
  1705. // custom NodePricing as a default.
  1706. node, ok := nodeMap[nodeKey]
  1707. if !ok || node == nil {
  1708. if nodeKey.Node != "" {
  1709. log.DedupedWarningf(5, "CostModel: failed to find node for %s", nodeKey)
  1710. }
  1711. return cm.getCustomNodePricing(false)
  1712. }
  1713. // If custom pricing is enabled and can be retrieved, override detected
  1714. // node pricing with the custom values.
  1715. customPricingConfig, err := cm.Provider.GetConfig()
  1716. if err != nil {
  1717. log.DedupedWarningf(10, "CostModel: failed to load custom pricing: %s", err)
  1718. }
  1719. if cloud.CustomPricesEnabled(cm.Provider) && customPricingConfig != nil {
  1720. return cm.getCustomNodePricing(node.Preemptible)
  1721. }
  1722. node.Source = "prometheus"
  1723. // If any of the values are NaN or zero, replace them with the custom
  1724. // values as default.
  1725. // TODO:CLEANUP can't we parse these custom prices once? why do we store
  1726. // them as strings like this?
  1727. if node.CostPerCPUHr == 0 || math.IsNaN(node.CostPerCPUHr) {
  1728. log.DedupedWarningf(10, "CostModel: node pricing has illegal CostPerCPUHr; replacing with custom pricing: %s", nodeKey)
  1729. cpuCostStr := customPricingConfig.CPU
  1730. if node.Preemptible {
  1731. cpuCostStr = customPricingConfig.SpotCPU
  1732. }
  1733. costPerCPUHr, err := strconv.ParseFloat(cpuCostStr, 64)
  1734. if err != nil {
  1735. log.DedupedWarningf(10, "CostModel: custom pricing has illegal CPU cost: %s", cpuCostStr)
  1736. }
  1737. node.CostPerCPUHr = costPerCPUHr
  1738. node.Source += "/customCPU"
  1739. }
  1740. if math.IsNaN(node.CostPerGPUHr) {
  1741. log.DedupedWarningf(10, "CostModel: node pricing has illegal CostPerGPUHr; replacing with custom pricing: %s", nodeKey)
  1742. gpuCostStr := customPricingConfig.GPU
  1743. if node.Preemptible {
  1744. gpuCostStr = customPricingConfig.SpotGPU
  1745. }
  1746. costPerGPUHr, err := strconv.ParseFloat(gpuCostStr, 64)
  1747. if err != nil {
  1748. log.DedupedWarningf(10, "CostModel: custom pricing has illegal GPU cost: %s", gpuCostStr)
  1749. }
  1750. node.CostPerGPUHr = costPerGPUHr
  1751. node.Source += "/customGPU"
  1752. }
  1753. if node.CostPerRAMGiBHr == 0 || math.IsNaN(node.CostPerRAMGiBHr) {
  1754. log.DedupedWarningf(10, "CostModel: node pricing has illegal CostPerRAMHr; replacing with custom pricing: %s", nodeKey)
  1755. ramCostStr := customPricingConfig.RAM
  1756. if node.Preemptible {
  1757. ramCostStr = customPricingConfig.SpotRAM
  1758. }
  1759. costPerRAMHr, err := strconv.ParseFloat(ramCostStr, 64)
  1760. if err != nil {
  1761. log.DedupedWarningf(10, "CostModel: custom pricing has illegal RAM cost: %s", ramCostStr)
  1762. }
  1763. node.CostPerRAMGiBHr = costPerRAMHr
  1764. node.Source += "/customRAM"
  1765. }
  1766. return node
  1767. }
  1768. // getCustomNodePricing converts the CostModel's configured custom pricing
  1769. // values into a NodePricing instance.
  1770. func (cm *CostModel) getCustomNodePricing(spot bool) *NodePricing {
  1771. customPricingConfig, err := cm.Provider.GetConfig()
  1772. if err != nil {
  1773. return nil
  1774. }
  1775. cpuCostStr := customPricingConfig.CPU
  1776. gpuCostStr := customPricingConfig.GPU
  1777. ramCostStr := customPricingConfig.RAM
  1778. if spot {
  1779. cpuCostStr = customPricingConfig.SpotCPU
  1780. gpuCostStr = customPricingConfig.SpotGPU
  1781. ramCostStr = customPricingConfig.SpotRAM
  1782. }
  1783. node := &NodePricing{Source: "custom"}
  1784. costPerCPUHr, err := strconv.ParseFloat(cpuCostStr, 64)
  1785. if err != nil {
  1786. log.DedupedWarningf(10, "CostModel: custom pricing has illegal CPU cost: %s", cpuCostStr)
  1787. }
  1788. node.CostPerCPUHr = costPerCPUHr
  1789. costPerGPUHr, err := strconv.ParseFloat(gpuCostStr, 64)
  1790. if err != nil {
  1791. log.DedupedWarningf(10, "CostModel: custom pricing has illegal GPU cost: %s", gpuCostStr)
  1792. }
  1793. node.CostPerGPUHr = costPerGPUHr
  1794. costPerRAMHr, err := strconv.ParseFloat(ramCostStr, 64)
  1795. if err != nil {
  1796. log.DedupedWarningf(10, "CostModel: custom pricing has illegal RAM cost: %s", ramCostStr)
  1797. }
  1798. node.CostPerRAMGiBHr = costPerRAMHr
  1799. return node
  1800. }
// NodePricing describes the resource costs associated with a given node, as
// well as the source of the information (e.g. prometheus, custom)
type NodePricing struct {
	Name            string  // node name, as reported by the pricing metric
	NodeType        string  // instance type, from the metric's instance_type label
	ProviderID      string  // provider ID parsed via cloud.ParseID
	Preemptible     bool    // true if the node is a spot/preemptible instance
	CostPerCPUHr    float64 // hourly cost per CPU
	CostPerRAMGiBHr float64 // hourly cost per GiB of RAM
	CostPerGPUHr    float64 // hourly cost per GPU
	Discount        float64 // combined discount applied to the CPU and RAM rates
	Source          string  // provenance of the data, e.g. "prometheus", "custom"
}
// Pod describes a running pod's start and end time within a Window and
// all the Allocations (i.e. containers) contained within it.
type Pod struct {
	Window      kubecost.Window                 // the query window the pod was observed within
	Start       time.Time                       // start of the pod's active interval within Window
	End         time.Time                       // end of the pod's active interval within Window
	Key         podKey                          // (cluster, namespace, pod) identity
	Allocations map[string]*kubecost.Allocation // allocations keyed by container name
}
  1823. // AppendContainer adds an entry for the given container name to the Pod.
  1824. func (p Pod) AppendContainer(container string) {
  1825. name := fmt.Sprintf("%s/%s/%s/%s", p.Key.Cluster, p.Key.Namespace, p.Key.Pod, container)
  1826. alloc := &kubecost.Allocation{
  1827. Name: name,
  1828. Properties: &kubecost.AllocationProperties{},
  1829. Window: p.Window.Clone(),
  1830. Start: p.Start,
  1831. End: p.End,
  1832. }
  1833. alloc.Properties.Container = container
  1834. alloc.Properties.Pod = p.Key.Pod
  1835. alloc.Properties.Namespace = p.Key.Namespace
  1836. alloc.Properties.Cluster = p.Key.Cluster
  1837. p.Allocations[container] = alloc
  1838. }
// PVC describes a PersistentVolumeClaim
// TODO:CLEANUP move to pkg/kubecost?
// TODO:CLEANUP add PersistentVolumeClaims field to type Allocation?
type PVC struct {
	// Bytes is the size of the claim in bytes.
	Bytes float64 `json:"bytes"`
	// Count is presumably the number of pods sharing the claim — TODO confirm.
	Count int `json:"count"`
	// Name is the claim's name.
	Name string `json:"name"`
	// Cluster is the cluster in which the claim exists.
	Cluster string `json:"cluster"`
	// Namespace is the namespace in which the claim exists.
	Namespace string `json:"namespace"`
	// Volume is the PersistentVolume backing the claim.
	Volume *PV `json:"persistentVolume"`
	// Mounted indicates whether the claim is mounted.
	Mounted bool `json:"mounted"`
	// Start is the beginning of the claim's lifetime, used by Minutes/Cost.
	Start time.Time `json:"start"`
	// End is the end of the claim's lifetime, used by Minutes/Cost.
	End time.Time `json:"end"`
}
  1853. // Cost computes the cumulative cost of the PVC
  1854. func (pvc *PVC) Cost() float64 {
  1855. if pvc == nil || pvc.Volume == nil {
  1856. return 0.0
  1857. }
  1858. gib := pvc.Bytes / 1024 / 1024 / 1024
  1859. hrs := pvc.Minutes() / 60.0
  1860. return pvc.Volume.CostPerGiBHour * gib * hrs
  1861. }
  1862. // Minutes computes the number of minutes over which the PVC is defined
  1863. func (pvc *PVC) Minutes() float64 {
  1864. if pvc == nil {
  1865. return 0.0
  1866. }
  1867. return pvc.End.Sub(pvc.Start).Minutes()
  1868. }
  1869. // String returns a string representation of the PVC
  1870. func (pvc *PVC) String() string {
  1871. if pvc == nil {
  1872. return "<nil>"
  1873. }
  1874. return fmt.Sprintf("%s/%s/%s{Bytes:%.2f, Cost:%.6f, Start,End:%s}", pvc.Cluster, pvc.Namespace, pvc.Name, pvc.Bytes, pvc.Cost(), kubecost.NewWindow(&pvc.Start, &pvc.End))
  1875. }
// PV describes a PersistentVolume
// TODO:CLEANUP move to pkg/kubecost?
type PV struct {
	// Bytes is the size of the volume in bytes.
	Bytes float64 `json:"bytes"`
	// CostPerGiBHour is the hourly cost per GiB of the volume.
	CostPerGiBHour float64 `json:"costPerGiBHour"`
	// Cluster is the cluster in which the volume exists.
	Cluster string `json:"cluster"`
	// Name is the volume's name.
	Name string `json:"name"`
	// StorageClass is the volume's storage class.
	StorageClass string `json:"storageClass"`
}
  1885. // String returns a string representation of the PV
  1886. func (pv *PV) String() string {
  1887. if pv == nil {
  1888. return "<nil>"
  1889. }
  1890. return fmt.Sprintf("%s/%s{Bytes:%.2f, Cost/GiB*Hr:%.6f, StorageClass:%s}", pv.Cluster, pv.Name, pv.Bytes, pv.CostPerGiBHour, pv.StorageClass)
  1891. }