allocation.go 69 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039
  1. package costmodel
  2. import (
  3. "fmt"
  4. "math"
  5. "strconv"
  6. "strings"
  7. "time"
  8. "github.com/kubecost/cost-model/pkg/cloud"
  9. "github.com/kubecost/cost-model/pkg/env"
  10. "github.com/kubecost/cost-model/pkg/kubecost"
  11. "github.com/kubecost/cost-model/pkg/log"
  12. "github.com/kubecost/cost-model/pkg/prom"
  13. "github.com/kubecost/cost-model/pkg/util"
  14. "k8s.io/apimachinery/pkg/labels"
  15. )
  16. const (
  17. queryFmtPods = `avg(kube_pod_container_status_running{}) by (pod, namespace, cluster_id)[%s:%s]%s`
  18. queryFmtRAMBytesAllocated = `avg(avg_over_time(container_memory_allocation_bytes{container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, cluster_id, provider_id)`
  19. queryFmtRAMRequests = `avg(avg_over_time(kube_pod_container_resource_requests_memory_bytes{container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, cluster_id)`
  20. queryFmtRAMUsageAvg = `avg(avg_over_time(container_memory_working_set_bytes{container_name!="", container_name!="POD", instance!=""}[%s]%s)) by (container_name, pod_name, namespace, instance, cluster_id)`
  21. queryFmtRAMUsageMax = `max(max_over_time(container_memory_working_set_bytes{container_name!="", container_name!="POD", instance!=""}[%s]%s)) by (container_name, pod_name, namespace, instance, cluster_id)`
  22. queryFmtCPUCoresAllocated = `avg(avg_over_time(container_cpu_allocation{container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, cluster_id)`
  23. queryFmtCPURequests = `avg(avg_over_time(kube_pod_container_resource_requests_cpu_cores{container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, cluster_id)`
  24. queryFmtCPUUsageAvg = `avg(rate(container_cpu_usage_seconds_total{container_name!="", container_name!="POD", instance!=""}[%s]%s)) by (container_name, pod_name, namespace, instance, cluster_id)`
  25. // This query could be written without the recording rule
  26. // "kubecost_savings_container_cpu_usage_seconds", but we should
  27. // only do that when we're ready to incur the performance tradeoffs
  28. // with subqueries which would probably be in the world of hourly
  29. // ETL.
  30. //
  31. // See PromQL subquery documentation for a rate example:
  32. // https://prometheus.io/blog/2019/01/28/subquery-support/#examples
  33. queryFmtCPUUsageMax = `max(max_over_time(kubecost_savings_container_cpu_usage_seconds[%s]%s)) by (container_name, pod_name, namespace, instance, cluster_id)`
  34. queryFmtGPUsRequested = `avg(avg_over_time(kube_pod_container_resource_requests{resource="nvidia_com_gpu", container!="",container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, cluster_id)`
  35. queryFmtNodeCostPerCPUHr = `avg(avg_over_time(node_cpu_hourly_cost[%s]%s)) by (node, cluster_id, instance_type, provider_id)`
  36. queryFmtNodeCostPerRAMGiBHr = `avg(avg_over_time(node_ram_hourly_cost[%s]%s)) by (node, cluster_id, instance_type, provider_id)`
  37. queryFmtNodeCostPerGPUHr = `avg(avg_over_time(node_gpu_hourly_cost[%s]%s)) by (node, cluster_id, instance_type, provider_id)`
  38. queryFmtNodeIsSpot = `avg_over_time(kubecost_node_is_spot[%s]%s)`
  39. queryFmtPVCInfo = `avg(kube_persistentvolumeclaim_info{volumename != ""}) by (persistentvolumeclaim, storageclass, volumename, namespace, cluster_id)[%s:%s]%s`
  40. queryFmtPVBytes = `avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s]%s)) by (persistentvolume, cluster_id)`
  41. queryFmtPodPVCAllocation = `avg(avg_over_time(pod_pvc_allocation[%s]%s)) by (persistentvolume, persistentvolumeclaim, pod, namespace, cluster_id)`
  42. queryFmtPVCBytesRequested = `avg(avg_over_time(kube_persistentvolumeclaim_resource_requests_storage_bytes{}[%s]%s)) by (persistentvolumeclaim, namespace, cluster_id)`
  43. queryFmtPVCostPerGiBHour = `avg(avg_over_time(pv_hourly_cost[%s]%s)) by (volumename, cluster_id)`
  44. queryFmtNetZoneGiB = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="true"}[%s]%s)) by (pod_name, namespace, cluster_id) / 1024 / 1024 / 1024`
  45. queryFmtNetZoneCostPerGiB = `avg(avg_over_time(kubecost_network_zone_egress_cost{}[%s]%s)) by (cluster_id)`
  46. queryFmtNetRegionGiB = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="false"}[%s]%s)) by (pod_name, namespace, cluster_id) / 1024 / 1024 / 1024`
  47. queryFmtNetRegionCostPerGiB = `avg(avg_over_time(kubecost_network_region_egress_cost{}[%s]%s)) by (cluster_id)`
  48. queryFmtNetInternetGiB = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="true"}[%s]%s)) by (pod_name, namespace, cluster_id) / 1024 / 1024 / 1024`
  49. queryFmtNetInternetCostPerGiB = `avg(avg_over_time(kubecost_network_internet_egress_cost{}[%s]%s)) by (cluster_id)`
  50. queryFmtNamespaceLabels = `avg_over_time(kube_namespace_labels[%s]%s)`
  51. queryFmtNamespaceAnnotations = `avg_over_time(kube_namespace_annotations[%s]%s)`
  52. queryFmtPodLabels = `avg_over_time(kube_pod_labels[%s]%s)`
  53. queryFmtPodAnnotations = `avg_over_time(kube_pod_annotations[%s]%s)`
  54. queryFmtServiceLabels = `avg_over_time(service_selector_labels[%s]%s)`
  55. queryFmtDeploymentLabels = `avg_over_time(deployment_match_labels[%s]%s)`
  56. queryFmtStatefulSetLabels = `avg_over_time(statefulSet_match_labels[%s]%s)`
  57. queryFmtDaemonSetLabels = `sum(avg_over_time(kube_pod_owner{owner_kind="DaemonSet"}[%s]%s)) by (pod, owner_name, namespace, cluster_id)`
  58. queryFmtJobLabels = `sum(avg_over_time(kube_pod_owner{owner_kind="Job"}[%s]%s)) by (pod, owner_name, namespace ,cluster_id)`
  59. queryFmtLBCostPerHr = `avg(avg_over_time(kubecost_load_balancer_cost[%s]%s)) by (namespace, service_name, cluster_id)`
  60. queryFmtLBActiveMins = `count(kubecost_load_balancer_cost) by (namespace, service_name, cluster_id)[%s:%s]%s`
  61. )
  62. // ComputeAllocation uses the CostModel instance to compute an AllocationSet
  63. // for the window defined by the given start and end times. The Allocations
  64. // returned are unaggregated (i.e. down to the container level).
  65. func (cm *CostModel) ComputeAllocation(start, end time.Time, resolution time.Duration) (*kubecost.AllocationSet, error) {
  66. // 1. Build out Pod map from resolution-tuned, batched Pod start/end query
  67. // 2. Run and apply the results of the remaining queries to
  68. // 3. Build out AllocationSet from completed Pod map
  69. // Create a window spanning the requested query
  70. window := kubecost.NewWindow(&start, &end)
  71. // Create an empty AllocationSet. For safety, in the case of an error, we
  72. // should prefer to return this empty set with the error. (In the case of
  73. // no error, of course we populate the set and return it.)
  74. allocSet := kubecost.NewAllocationSet(start, end)
  75. // (1) Build out Pod map
  76. // Build out a map of Allocations as a mapping from pod-to-container-to-
  77. // underlying-Allocation instance, starting with (start, end) so that we
  78. // begin with minutes, from which we compute resource allocation and cost
  79. // totals from measured rate data.
  80. podMap := map[podKey]*Pod{}
  81. // clusterStarts and clusterEnds record the earliest start and latest end
  82. // times, respectively, on a cluster-basis. These are used for unmounted
  83. // PVs and other "virtual" Allocations so that minutes are maximally
  84. // accurate during start-up or spin-down of a cluster
  85. clusterStart := map[string]time.Time{}
  86. clusterEnd := map[string]time.Time{}
  87. cm.buildPodMap(window, resolution, env.GetETLMaxBatchDuration(), podMap, clusterStart, clusterEnd)
  88. // (2) Run and apply remaining queries
  89. // Convert window (start, end) to (duration, offset) for querying Prometheus,
  90. // including handling Thanos offset
  91. durStr, offStr, err := window.DurationOffsetForPrometheus()
  92. if err != nil {
  93. // Negative duration, so return empty set
  94. return allocSet, nil
  95. }
  96. // Convert resolution duration to a query-ready string
  97. resStr := util.DurationString(resolution)
  98. ctx := prom.NewContext(cm.PrometheusClient)
  99. queryRAMBytesAllocated := fmt.Sprintf(queryFmtRAMBytesAllocated, durStr, offStr)
  100. resChRAMBytesAllocated := ctx.Query(queryRAMBytesAllocated)
  101. queryRAMRequests := fmt.Sprintf(queryFmtRAMRequests, durStr, offStr)
  102. resChRAMRequests := ctx.Query(queryRAMRequests)
  103. queryRAMUsageAvg := fmt.Sprintf(queryFmtRAMUsageAvg, durStr, offStr)
  104. resChRAMUsageAvg := ctx.Query(queryRAMUsageAvg)
  105. queryRAMUsageMax := fmt.Sprintf(queryFmtRAMUsageMax, durStr, offStr)
  106. resChRAMUsageMax := ctx.Query(queryRAMUsageMax)
  107. queryCPUCoresAllocated := fmt.Sprintf(queryFmtCPUCoresAllocated, durStr, offStr)
  108. resChCPUCoresAllocated := ctx.Query(queryCPUCoresAllocated)
  109. queryCPURequests := fmt.Sprintf(queryFmtCPURequests, durStr, offStr)
  110. resChCPURequests := ctx.Query(queryCPURequests)
  111. queryCPUUsageAvg := fmt.Sprintf(queryFmtCPUUsageAvg, durStr, offStr)
  112. resChCPUUsageAvg := ctx.Query(queryCPUUsageAvg)
  113. queryCPUUsageMax := fmt.Sprintf(queryFmtCPUUsageMax, durStr, offStr)
  114. resChCPUUsageMax := ctx.Query(queryCPUUsageMax)
  115. queryGPUsRequested := fmt.Sprintf(queryFmtGPUsRequested, durStr, offStr)
  116. resChGPUsRequested := ctx.Query(queryGPUsRequested)
  117. queryNodeCostPerCPUHr := fmt.Sprintf(queryFmtNodeCostPerCPUHr, durStr, offStr)
  118. resChNodeCostPerCPUHr := ctx.Query(queryNodeCostPerCPUHr)
  119. queryNodeCostPerRAMGiBHr := fmt.Sprintf(queryFmtNodeCostPerRAMGiBHr, durStr, offStr)
  120. resChNodeCostPerRAMGiBHr := ctx.Query(queryNodeCostPerRAMGiBHr)
  121. queryNodeCostPerGPUHr := fmt.Sprintf(queryFmtNodeCostPerGPUHr, durStr, offStr)
  122. resChNodeCostPerGPUHr := ctx.Query(queryNodeCostPerGPUHr)
  123. queryNodeIsSpot := fmt.Sprintf(queryFmtNodeIsSpot, durStr, offStr)
  124. resChNodeIsSpot := ctx.Query(queryNodeIsSpot)
  125. queryPVCInfo := fmt.Sprintf(queryFmtPVCInfo, durStr, resStr, offStr)
  126. resChPVCInfo := ctx.Query(queryPVCInfo)
  127. queryPVBytes := fmt.Sprintf(queryFmtPVBytes, durStr, offStr)
  128. resChPVBytes := ctx.Query(queryPVBytes)
  129. queryPodPVCAllocation := fmt.Sprintf(queryFmtPodPVCAllocation, durStr, offStr)
  130. resChPodPVCAllocation := ctx.Query(queryPodPVCAllocation)
  131. queryPVCBytesRequested := fmt.Sprintf(queryFmtPVCBytesRequested, durStr, offStr)
  132. resChPVCBytesRequested := ctx.Query(queryPVCBytesRequested)
  133. queryPVCostPerGiBHour := fmt.Sprintf(queryFmtPVCostPerGiBHour, durStr, offStr)
  134. resChPVCostPerGiBHour := ctx.Query(queryPVCostPerGiBHour)
  135. queryNetZoneGiB := fmt.Sprintf(queryFmtNetZoneGiB, durStr, offStr)
  136. resChNetZoneGiB := ctx.Query(queryNetZoneGiB)
  137. queryNetZoneCostPerGiB := fmt.Sprintf(queryFmtNetZoneCostPerGiB, durStr, offStr)
  138. resChNetZoneCostPerGiB := ctx.Query(queryNetZoneCostPerGiB)
  139. queryNetRegionGiB := fmt.Sprintf(queryFmtNetRegionGiB, durStr, offStr)
  140. resChNetRegionGiB := ctx.Query(queryNetRegionGiB)
  141. queryNetRegionCostPerGiB := fmt.Sprintf(queryFmtNetRegionCostPerGiB, durStr, offStr)
  142. resChNetRegionCostPerGiB := ctx.Query(queryNetRegionCostPerGiB)
  143. queryNetInternetGiB := fmt.Sprintf(queryFmtNetInternetGiB, durStr, offStr)
  144. resChNetInternetGiB := ctx.Query(queryNetInternetGiB)
  145. queryNetInternetCostPerGiB := fmt.Sprintf(queryFmtNetInternetCostPerGiB, durStr, offStr)
  146. resChNetInternetCostPerGiB := ctx.Query(queryNetInternetCostPerGiB)
  147. queryNamespaceLabels := fmt.Sprintf(queryFmtNamespaceLabels, durStr, offStr)
  148. resChNamespaceLabels := ctx.Query(queryNamespaceLabels)
  149. queryNamespaceAnnotations := fmt.Sprintf(queryFmtNamespaceAnnotations, durStr, offStr)
  150. resChNamespaceAnnotations := ctx.Query(queryNamespaceAnnotations)
  151. queryPodLabels := fmt.Sprintf(queryFmtPodLabels, durStr, offStr)
  152. resChPodLabels := ctx.Query(queryPodLabels)
  153. queryPodAnnotations := fmt.Sprintf(queryFmtPodAnnotations, durStr, offStr)
  154. resChPodAnnotations := ctx.Query(queryPodAnnotations)
  155. queryServiceLabels := fmt.Sprintf(queryFmtServiceLabels, durStr, offStr)
  156. resChServiceLabels := ctx.Query(queryServiceLabels)
  157. queryDeploymentLabels := fmt.Sprintf(queryFmtDeploymentLabels, durStr, offStr)
  158. resChDeploymentLabels := ctx.Query(queryDeploymentLabels)
  159. queryStatefulSetLabels := fmt.Sprintf(queryFmtStatefulSetLabels, durStr, offStr)
  160. resChStatefulSetLabels := ctx.Query(queryStatefulSetLabels)
  161. queryDaemonSetLabels := fmt.Sprintf(queryFmtDaemonSetLabels, durStr, offStr)
  162. resChDaemonSetLabels := ctx.Query(queryDaemonSetLabels)
  163. queryJobLabels := fmt.Sprintf(queryFmtJobLabels, durStr, offStr)
  164. resChJobLabels := ctx.Query(queryJobLabels)
  165. queryLBCostPerHr := fmt.Sprintf(queryFmtLBCostPerHr, durStr, offStr)
  166. resChLBCostPerHr := ctx.Query(queryLBCostPerHr)
  167. queryLBActiveMins := fmt.Sprintf(queryFmtLBActiveMins, durStr, resStr, offStr)
  168. resChLBActiveMins := ctx.Query(queryLBActiveMins)
  169. resCPUCoresAllocated, _ := resChCPUCoresAllocated.Await()
  170. resCPURequests, _ := resChCPURequests.Await()
  171. resCPUUsageAvg, _ := resChCPUUsageAvg.Await()
  172. resCPUUsageMax, _ := resChCPUUsageMax.Await()
  173. resRAMBytesAllocated, _ := resChRAMBytesAllocated.Await()
  174. resRAMRequests, _ := resChRAMRequests.Await()
  175. resRAMUsageAvg, _ := resChRAMUsageAvg.Await()
  176. resRAMUsageMax, _ := resChRAMUsageMax.Await()
  177. resGPUsRequested, _ := resChGPUsRequested.Await()
  178. resNodeCostPerCPUHr, _ := resChNodeCostPerCPUHr.Await()
  179. resNodeCostPerRAMGiBHr, _ := resChNodeCostPerRAMGiBHr.Await()
  180. resNodeCostPerGPUHr, _ := resChNodeCostPerGPUHr.Await()
  181. resNodeIsSpot, _ := resChNodeIsSpot.Await()
  182. resPVBytes, _ := resChPVBytes.Await()
  183. resPVCostPerGiBHour, _ := resChPVCostPerGiBHour.Await()
  184. resPVCInfo, _ := resChPVCInfo.Await()
  185. resPVCBytesRequested, _ := resChPVCBytesRequested.Await()
  186. resPodPVCAllocation, _ := resChPodPVCAllocation.Await()
  187. resNetZoneGiB, _ := resChNetZoneGiB.Await()
  188. resNetZoneCostPerGiB, _ := resChNetZoneCostPerGiB.Await()
  189. resNetRegionGiB, _ := resChNetRegionGiB.Await()
  190. resNetRegionCostPerGiB, _ := resChNetRegionCostPerGiB.Await()
  191. resNetInternetGiB, _ := resChNetInternetGiB.Await()
  192. resNetInternetCostPerGiB, _ := resChNetInternetCostPerGiB.Await()
  193. resNamespaceLabels, _ := resChNamespaceLabels.Await()
  194. resNamespaceAnnotations, _ := resChNamespaceAnnotations.Await()
  195. resPodLabels, _ := resChPodLabels.Await()
  196. resPodAnnotations, _ := resChPodAnnotations.Await()
  197. resServiceLabels, _ := resChServiceLabels.Await()
  198. resDeploymentLabels, _ := resChDeploymentLabels.Await()
  199. resStatefulSetLabels, _ := resChStatefulSetLabels.Await()
  200. resDaemonSetLabels, _ := resChDaemonSetLabels.Await()
  201. resJobLabels, _ := resChJobLabels.Await()
  202. resLBCostPerHr, _ := resChLBCostPerHr.Await()
  203. resLBActiveMins, _ := resChLBActiveMins.Await()
  204. if ctx.HasErrors() {
  205. for _, err := range ctx.Errors() {
  206. log.Errorf("CostModel.ComputeAllocation: %s", err)
  207. }
  208. return allocSet, ctx.ErrorCollection()
  209. }
  210. // We choose to apply allocation before requests in the cases of RAM and
  211. // CPU so that we can assert that allocation should always be greater than
  212. // or equal to request.
  213. applyCPUCoresAllocated(podMap, resCPUCoresAllocated)
  214. applyCPUCoresRequested(podMap, resCPURequests)
  215. applyCPUCoresUsedAvg(podMap, resCPUUsageAvg)
  216. applyCPUCoresUsedMax(podMap, resCPUUsageMax)
  217. applyRAMBytesAllocated(podMap, resRAMBytesAllocated)
  218. applyRAMBytesRequested(podMap, resRAMRequests)
  219. applyRAMBytesUsedAvg(podMap, resRAMUsageAvg)
  220. applyRAMBytesUsedMax(podMap, resRAMUsageMax)
  221. applyGPUsRequested(podMap, resGPUsRequested)
  222. applyNetworkAllocation(podMap, resNetZoneGiB, resNetZoneCostPerGiB)
  223. applyNetworkAllocation(podMap, resNetRegionGiB, resNetRegionCostPerGiB)
  224. applyNetworkAllocation(podMap, resNetInternetGiB, resNetInternetCostPerGiB)
  225. namespaceLabels := resToNamespaceLabels(resNamespaceLabels)
  226. podLabels := resToPodLabels(resPodLabels)
  227. namespaceAnnotations := resToNamespaceAnnotations(resNamespaceAnnotations)
  228. podAnnotations := resToPodAnnotations(resPodAnnotations)
  229. applyLabels(podMap, namespaceLabels, podLabels)
  230. applyAnnotations(podMap, namespaceAnnotations, podAnnotations)
  231. serviceLabels := getServiceLabels(resServiceLabels)
  232. allocsByService := map[serviceKey][]*kubecost.Allocation{}
  233. applyServicesToPods(podMap, podLabels, allocsByService, serviceLabels)
  234. podDeploymentMap := labelsToPodControllerMap(podLabels, resToDeploymentLabels(resDeploymentLabels))
  235. podStatefulSetMap := labelsToPodControllerMap(podLabels, resToStatefulSetLabels(resStatefulSetLabels))
  236. podDaemonSetMap := resToPodDaemonSetMap(resDaemonSetLabels)
  237. podJobMap := resToPodJobMap(resJobLabels)
  238. applyControllersToPods(podMap, podDeploymentMap)
  239. applyControllersToPods(podMap, podStatefulSetMap)
  240. applyControllersToPods(podMap, podDaemonSetMap)
  241. applyControllersToPods(podMap, podJobMap)
  242. // TODO breakdown network costs?
  243. // Build out a map of Nodes with resource costs, discounts, and node types
  244. // for converting resource allocation data to cumulative costs.
  245. nodeMap := map[nodeKey]*NodePricing{}
  246. applyNodeCostPerCPUHr(nodeMap, resNodeCostPerCPUHr, cm.Provider.ParseID)
  247. applyNodeCostPerRAMGiBHr(nodeMap, resNodeCostPerRAMGiBHr, cm.Provider.ParseID)
  248. applyNodeCostPerGPUHr(nodeMap, resNodeCostPerGPUHr, cm.Provider.ParseID)
  249. applyNodeSpot(nodeMap, resNodeIsSpot)
  250. applyNodeDiscount(nodeMap, cm)
  251. // Build out the map of all PVs with class, size and cost-per-hour.
  252. // Note: this does not record time running, which we may want to
  253. // include later for increased PV precision. (As long as the PV has
  254. // a PVC, we get time running there, so this is only inaccurate
  255. // for short-lived, unmounted PVs.)
  256. pvMap := map[pvKey]*PV{}
  257. buildPVMap(pvMap, resPVCostPerGiBHour)
  258. applyPVBytes(pvMap, resPVBytes)
  259. // Build out the map of all PVCs with time running, bytes requested,
  260. // and connect to the correct PV from pvMap. (If no PV exists, that
  261. // is noted, but does not result in any allocation/cost.)
  262. pvcMap := map[pvcKey]*PVC{}
  263. buildPVCMap(window, pvcMap, pvMap, resPVCInfo)
  264. applyPVCBytesRequested(pvcMap, resPVCBytesRequested)
  265. // Build out the relationships of pods to their PVCs. This step
  266. // populates the PVC.Count field so that PVC allocation can be
  267. // split appropriately among each pod's container allocation.
  268. podPVCMap := map[podKey][]*PVC{}
  269. buildPodPVCMap(podPVCMap, pvMap, pvcMap, podMap, resPodPVCAllocation)
  270. // Identify unmounted PVs (PVs without PVCs) and add one Allocation per
  271. // cluster representing each cluster's unmounted PVs (if necessary).
  272. applyUnmountedPVs(window, podMap, pvMap, pvcMap)
  273. lbMap := getLoadBalancerCosts(resLBCostPerHr, resLBActiveMins, resolution)
  274. applyLoadBalancersToPods(lbMap, allocsByService)
  275. // (3) Build out AllocationSet from Pod map
  276. for _, pod := range podMap {
  277. for _, alloc := range pod.Allocations {
  278. cluster := alloc.Properties.Cluster
  279. nodeName := alloc.Properties.Node
  280. namespace := alloc.Properties.Namespace
  281. pod := alloc.Properties.Pod
  282. container := alloc.Properties.Container
  283. podKey := newPodKey(cluster, namespace, pod)
  284. nodeKey := newNodeKey(cluster, nodeName)
  285. node := cm.getNodePricing(nodeMap, nodeKey)
  286. alloc.Properties.ProviderID = node.ProviderID
  287. alloc.CPUCost = alloc.CPUCoreHours * node.CostPerCPUHr
  288. alloc.RAMCost = (alloc.RAMByteHours / 1024 / 1024 / 1024) * node.CostPerRAMGiBHr
  289. alloc.GPUCost = alloc.GPUHours * node.CostPerGPUHr
  290. if pvcs, ok := podPVCMap[podKey]; ok {
  291. for _, pvc := range pvcs {
  292. // Determine the (start, end) of the relationship between the
  293. // given PVC and the associated Allocation so that a precise
  294. // number of hours can be used to compute cumulative cost.
  295. s, e := alloc.Start, alloc.End
  296. if pvc.Start.After(alloc.Start) {
  297. s = pvc.Start
  298. }
  299. if pvc.End.Before(alloc.End) {
  300. e = pvc.End
  301. }
  302. minutes := e.Sub(s).Minutes()
  303. hrs := minutes / 60.0
  304. count := float64(pvc.Count)
  305. if pvc.Count < 1 {
  306. count = 1
  307. }
  308. gib := pvc.Bytes / 1024 / 1024 / 1024
  309. cost := pvc.Volume.CostPerGiBHour * gib * hrs
  310. // Apply the size and cost of the PV to the allocation, each
  311. // weighted by count (i.e. the number of containers in the pod)
  312. // record the amount of total PVBytes Hours attributable to a given PV
  313. if alloc.PVs == nil {
  314. alloc.PVs = kubecost.PVAllocations{}
  315. }
  316. pvKey := kubecost.PVKey{
  317. Cluster: pvc.Cluster,
  318. Name: pvc.Volume.Name,
  319. }
  320. alloc.PVs[pvKey] = &kubecost.PVAllocation{
  321. ByteHours: pvc.Bytes * hrs / count,
  322. Cost: cost / count,
  323. }
  324. }
  325. }
  326. // Make sure that the name is correct (node may not be present at this
  327. // point due to it missing from queryMinutes) then insert.
  328. alloc.Name = fmt.Sprintf("%s/%s/%s/%s/%s", cluster, nodeName, namespace, pod, container)
  329. allocSet.Set(alloc)
  330. }
  331. }
  332. return allocSet, nil
  333. }
  334. func (cm *CostModel) buildPodMap(window kubecost.Window, resolution, maxBatchSize time.Duration, podMap map[podKey]*Pod, clusterStart, clusterEnd map[string]time.Time) error {
  335. // Assumes that window is positive and closed
  336. start, end := *window.Start(), *window.End()
  337. // Convert resolution duration to a query-ready string
  338. resStr := util.DurationString(resolution)
  339. ctx := prom.NewContext(cm.PrometheusClient)
  340. // Query for (start, end) by (pod, namespace, cluster) over the given
  341. // window, using the given resolution, and if necessary in batches no
  342. // larger than the given maximum batch size. If working in batches, track
  343. // overall progress by starting with (window.start, window.start) and
  344. // querying in batches no larger than maxBatchSize from start-to-end,
  345. // folding each result set into podMap as the results come back.
  346. coverage := kubecost.NewWindow(&start, &start)
  347. numQuery := 1
  348. for coverage.End().Before(end) {
  349. // Determine the (start, end) of the current batch
  350. batchStart := *coverage.End()
  351. batchEnd := coverage.End().Add(maxBatchSize)
  352. if batchEnd.After(end) {
  353. batchEnd = end
  354. }
  355. batchWindow := kubecost.NewWindow(&batchStart, &batchEnd)
  356. var resPods []*prom.QueryResult
  357. var err error
  358. maxTries := 3
  359. numTries := 0
  360. for resPods == nil && numTries < maxTries {
  361. numTries++
  362. // Convert window (start, end) to (duration, offset) for querying Prometheus,
  363. // including handling Thanos offset
  364. durStr, offStr, err := batchWindow.DurationOffsetForPrometheus()
  365. if err != nil || durStr == "" {
  366. // Negative duration, so set empty results and don't query
  367. resPods = []*prom.QueryResult{}
  368. err = nil
  369. break
  370. }
  371. // Submit and profile query
  372. queryPods := fmt.Sprintf(queryFmtPods, durStr, resStr, offStr)
  373. queryProfile := time.Now()
  374. resPods, err = ctx.Query(queryPods).Await()
  375. if err != nil {
  376. log.Profile(queryProfile, fmt.Sprintf("CostModel.ComputeAllocation: pod query %d try %d failed: %s", numQuery, numTries, queryPods))
  377. resPods = nil
  378. }
  379. }
  380. if err != nil {
  381. return err
  382. }
  383. applyPodResults(window, resolution, podMap, clusterStart, clusterEnd, resPods)
  384. coverage = coverage.ExpandEnd(batchEnd)
  385. numQuery++
  386. }
  387. return nil
  388. }
  389. func applyPodResults(window kubecost.Window, resolution time.Duration, podMap map[podKey]*Pod, clusterStart, clusterEnd map[string]time.Time, resPods []*prom.QueryResult) {
  390. for _, res := range resPods {
  391. if len(res.Values) == 0 {
  392. log.Warningf("CostModel.ComputeAllocation: empty minutes result")
  393. continue
  394. }
  395. cluster, err := res.GetString("cluster_id")
  396. if err != nil {
  397. cluster = env.GetClusterID()
  398. }
  399. labels, err := res.GetStrings("namespace", "pod")
  400. if err != nil {
  401. log.Warningf("CostModel.ComputeAllocation: minutes query result missing field: %s", err)
  402. continue
  403. }
  404. namespace := labels["namespace"]
  405. pod := labels["pod"]
  406. key := newPodKey(cluster, namespace, pod)
  407. // allocStart and allocEnd are the timestamps of the first and last
  408. // minutes the pod was running, respectively. We subtract one resolution
  409. // from allocStart because this point will actually represent the end
  410. // of the first minute. We don't subtract from allocEnd because it
  411. // already represents the end of the last minute.
  412. var allocStart, allocEnd time.Time
  413. startAdjustmentCoeff, endAdjustmentCoeff := 1.0, 1.0
  414. for _, datum := range res.Values {
  415. t := time.Unix(int64(datum.Timestamp), 0)
  416. if allocStart.IsZero() && datum.Value > 0 && window.Contains(t) {
  417. // Set the start timestamp to the earliest non-zero timestamp
  418. allocStart = t
  419. // Record adjustment coefficient, i.e. the portion of the start
  420. // timestamp to "ignore". That is, sometimes the value will be
  421. // 0.5, meaning that we should discount the time running by
  422. // half of the resolution the timestamp stands for.
  423. startAdjustmentCoeff = (1.0 - datum.Value)
  424. }
  425. if datum.Value > 0 && window.Contains(t) {
  426. // Set the end timestamp to the latest non-zero timestamp
  427. allocEnd = t
  428. // Record adjustment coefficient, i.e. the portion of the end
  429. // timestamp to "ignore". (See explanation above for start.)
  430. endAdjustmentCoeff = (1.0 - datum.Value)
  431. }
  432. }
  433. if allocStart.IsZero() || allocEnd.IsZero() {
  434. continue
  435. }
  436. // Adjust timestamps according to the resolution and the adjustment
  437. // coefficients, as described above. That is, count the start timestamp
  438. // from the beginning of the resolution, not the end. Then "reduce" the
  439. // start and end by the correct amount, in the case that the "running"
  440. // value of the first or last timestamp was not a full 1.0.
  441. allocStart = allocStart.Add(-resolution)
  442. // Note: the *100 and /100 are necessary because Duration is an int, so
  443. // 0.5, for instance, will be truncated, resulting in no adjustment.
  444. allocStart = allocStart.Add(time.Duration(startAdjustmentCoeff*100) * resolution / time.Duration(100))
  445. allocEnd = allocEnd.Add(-time.Duration(endAdjustmentCoeff*100) * resolution / time.Duration(100))
  446. // If there is only one point with a value <= 0.5 that the start and
  447. // end timestamps both share, then we will enter this case because at
  448. // least half of a resolution will be subtracted from both the start
  449. // and the end. If that is the case, then add back half of each side
  450. // so that the pod is said to run for half a resolution total.
  451. // e.g. For resolution 1m and a value of 0.5 at one timestamp, we'll
  452. // end up with allocEnd == allocStart and each coeff == 0.5. In
  453. // that case, add 0.25m to each side, resulting in 0.5m duration.
  454. if !allocEnd.After(allocStart) {
  455. allocStart = allocStart.Add(-time.Duration(50*startAdjustmentCoeff) * resolution / time.Duration(100))
  456. allocEnd = allocEnd.Add(time.Duration(50*endAdjustmentCoeff) * resolution / time.Duration(100))
  457. }
  458. // Set start if unset or this datum's start time is earlier than the
  459. // current earliest time.
  460. if _, ok := clusterStart[cluster]; !ok || allocStart.Before(clusterStart[cluster]) {
  461. clusterStart[cluster] = allocStart
  462. }
  463. // Set end if unset or this datum's end time is later than the
  464. // current latest time.
  465. if _, ok := clusterEnd[cluster]; !ok || allocEnd.After(clusterEnd[cluster]) {
  466. clusterEnd[cluster] = allocEnd
  467. }
  468. if pod, ok := podMap[key]; ok {
  469. // Pod has already been recorded, so update it accordingly
  470. if allocStart.Before(pod.Start) {
  471. pod.Start = allocStart
  472. }
  473. if allocEnd.After(pod.End) {
  474. pod.End = allocEnd
  475. }
  476. } else {
  477. // Pod has not been recorded yet, so insert it
  478. podMap[key] = &Pod{
  479. Window: window.Clone(),
  480. Start: allocStart,
  481. End: allocEnd,
  482. Key: key,
  483. Allocations: map[string]*kubecost.Allocation{},
  484. }
  485. }
  486. }
  487. }
  488. func applyCPUCoresAllocated(podMap map[podKey]*Pod, resCPUCoresAllocated []*prom.QueryResult) {
  489. for _, res := range resCPUCoresAllocated {
  490. key, err := resultPodKey(res, "cluster_id", "namespace", "pod")
  491. if err != nil {
  492. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU allocation result missing field: %s", err)
  493. continue
  494. }
  495. pod, ok := podMap[key]
  496. if !ok {
  497. continue
  498. }
  499. container, err := res.GetString("container")
  500. if err != nil {
  501. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU allocation query result missing 'container': %s", key)
  502. continue
  503. }
  504. if _, ok := pod.Allocations[container]; !ok {
  505. pod.AppendContainer(container)
  506. }
  507. cpuCores := res.Values[0].Value
  508. hours := pod.Allocations[container].Minutes() / 60.0
  509. pod.Allocations[container].CPUCoreHours = cpuCores * hours
  510. node, err := res.GetString("node")
  511. if err != nil {
  512. log.Warningf("CostModel.ComputeAllocation: CPU allocation query result missing 'node': %s", key)
  513. continue
  514. }
  515. pod.Allocations[container].Properties.Node = node
  516. }
  517. }
  518. func applyCPUCoresRequested(podMap map[podKey]*Pod, resCPUCoresRequested []*prom.QueryResult) {
  519. for _, res := range resCPUCoresRequested {
  520. key, err := resultPodKey(res, "cluster_id", "namespace", "pod")
  521. if err != nil {
  522. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU request result missing field: %s", err)
  523. continue
  524. }
  525. pod, ok := podMap[key]
  526. if !ok {
  527. continue
  528. }
  529. container, err := res.GetString("container")
  530. if err != nil {
  531. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU request query result missing 'container': %s", key)
  532. continue
  533. }
  534. if _, ok := pod.Allocations[container]; !ok {
  535. pod.AppendContainer(container)
  536. }
  537. pod.Allocations[container].CPUCoreRequestAverage = res.Values[0].Value
  538. // If CPU allocation is less than requests, set CPUCoreHours to
  539. // request level.
  540. if pod.Allocations[container].CPUCores() < res.Values[0].Value {
  541. pod.Allocations[container].CPUCoreHours = res.Values[0].Value * (pod.Allocations[container].Minutes() / 60.0)
  542. }
  543. node, err := res.GetString("node")
  544. if err != nil {
  545. log.Warningf("CostModel.ComputeAllocation: CPU request query result missing 'node': %s", key)
  546. continue
  547. }
  548. pod.Allocations[container].Properties.Node = node
  549. }
  550. }
  551. func applyCPUCoresUsedAvg(podMap map[podKey]*Pod, resCPUCoresUsedAvg []*prom.QueryResult) {
  552. for _, res := range resCPUCoresUsedAvg {
  553. key, err := resultPodKey(res, "cluster_id", "namespace", "pod_name")
  554. if err != nil {
  555. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage avg result missing field: %s", err)
  556. continue
  557. }
  558. pod, ok := podMap[key]
  559. if !ok {
  560. continue
  561. }
  562. container, err := res.GetString("container_name")
  563. if err != nil {
  564. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage avg query result missing 'container': %s", key)
  565. continue
  566. }
  567. if _, ok := pod.Allocations[container]; !ok {
  568. pod.AppendContainer(container)
  569. }
  570. pod.Allocations[container].CPUCoreUsageAverage = res.Values[0].Value
  571. }
  572. }
  573. func applyCPUCoresUsedMax(podMap map[podKey]*Pod, resCPUCoresUsedMax []*prom.QueryResult) {
  574. for _, res := range resCPUCoresUsedMax {
  575. key, err := resultPodKey(res, "cluster_id", "namespace", "pod_name")
  576. if err != nil {
  577. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage max result missing field: %s", err)
  578. continue
  579. }
  580. pod, ok := podMap[key]
  581. if !ok {
  582. continue
  583. }
  584. container, err := res.GetString("container_name")
  585. if err != nil {
  586. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage max query result missing 'container': %s", key)
  587. continue
  588. }
  589. if _, ok := pod.Allocations[container]; !ok {
  590. pod.AppendContainer(container)
  591. }
  592. if pod.Allocations[container].RawAllocationOnly == nil {
  593. pod.Allocations[container].RawAllocationOnly = &kubecost.RawAllocationOnlyData{
  594. CPUCoreUsageMax: res.Values[0].Value,
  595. }
  596. } else {
  597. pod.Allocations[container].RawAllocationOnly.CPUCoreUsageMax = res.Values[0].Value
  598. }
  599. }
  600. }
  601. func applyRAMBytesAllocated(podMap map[podKey]*Pod, resRAMBytesAllocated []*prom.QueryResult) {
  602. for _, res := range resRAMBytesAllocated {
  603. key, err := resultPodKey(res, "cluster_id", "namespace", "pod")
  604. if err != nil {
  605. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM allocation result missing field: %s", err)
  606. continue
  607. }
  608. pod, ok := podMap[key]
  609. if !ok {
  610. continue
  611. }
  612. container, err := res.GetString("container")
  613. if err != nil {
  614. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM allocation query result missing 'container': %s", key)
  615. continue
  616. }
  617. if _, ok := pod.Allocations[container]; !ok {
  618. pod.AppendContainer(container)
  619. }
  620. ramBytes := res.Values[0].Value
  621. hours := pod.Allocations[container].Minutes() / 60.0
  622. pod.Allocations[container].RAMByteHours = ramBytes * hours
  623. node, err := res.GetString("node")
  624. if err != nil {
  625. log.Warningf("CostModel.ComputeAllocation: RAM allocation query result missing 'node': %s", key)
  626. continue
  627. }
  628. pod.Allocations[container].Properties.Node = node
  629. }
  630. }
  631. func applyRAMBytesRequested(podMap map[podKey]*Pod, resRAMBytesRequested []*prom.QueryResult) {
  632. for _, res := range resRAMBytesRequested {
  633. key, err := resultPodKey(res, "cluster_id", "namespace", "pod")
  634. if err != nil {
  635. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM request result missing field: %s", err)
  636. continue
  637. }
  638. pod, ok := podMap[key]
  639. if !ok {
  640. continue
  641. }
  642. container, err := res.GetString("container")
  643. if err != nil {
  644. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM request query result missing 'container': %s", key)
  645. continue
  646. }
  647. if _, ok := pod.Allocations[container]; !ok {
  648. pod.AppendContainer(container)
  649. }
  650. pod.Allocations[container].RAMBytesRequestAverage = res.Values[0].Value
  651. // If RAM allocation is less than requests, set RAMByteHours to
  652. // request level.
  653. if pod.Allocations[container].RAMBytes() < res.Values[0].Value {
  654. pod.Allocations[container].RAMByteHours = res.Values[0].Value * (pod.Allocations[container].Minutes() / 60.0)
  655. }
  656. node, err := res.GetString("node")
  657. if err != nil {
  658. log.Warningf("CostModel.ComputeAllocation: RAM request query result missing 'node': %s", key)
  659. continue
  660. }
  661. pod.Allocations[container].Properties.Node = node
  662. }
  663. }
  664. func applyRAMBytesUsedAvg(podMap map[podKey]*Pod, resRAMBytesUsedAvg []*prom.QueryResult) {
  665. for _, res := range resRAMBytesUsedAvg {
  666. key, err := resultPodKey(res, "cluster_id", "namespace", "pod_name")
  667. if err != nil {
  668. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM avg usage result missing field: %s", err)
  669. continue
  670. }
  671. pod, ok := podMap[key]
  672. if !ok {
  673. continue
  674. }
  675. container, err := res.GetString("container_name")
  676. if err != nil {
  677. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM usage avg query result missing 'container': %s", key)
  678. continue
  679. }
  680. if _, ok := pod.Allocations[container]; !ok {
  681. pod.AppendContainer(container)
  682. }
  683. pod.Allocations[container].RAMBytesUsageAverage = res.Values[0].Value
  684. }
  685. }
  686. func applyRAMBytesUsedMax(podMap map[podKey]*Pod, resRAMBytesUsedMax []*prom.QueryResult) {
  687. for _, res := range resRAMBytesUsedMax {
  688. key, err := resultPodKey(res, "cluster_id", "namespace", "pod_name")
  689. if err != nil {
  690. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM usage max result missing field: %s", err)
  691. continue
  692. }
  693. pod, ok := podMap[key]
  694. if !ok {
  695. continue
  696. }
  697. container, err := res.GetString("container_name")
  698. if err != nil {
  699. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM usage max query result missing 'container': %s", key)
  700. continue
  701. }
  702. if _, ok := pod.Allocations[container]; !ok {
  703. pod.AppendContainer(container)
  704. }
  705. if pod.Allocations[container].RawAllocationOnly == nil {
  706. pod.Allocations[container].RawAllocationOnly = &kubecost.RawAllocationOnlyData{
  707. RAMBytesUsageMax: res.Values[0].Value,
  708. }
  709. } else {
  710. pod.Allocations[container].RawAllocationOnly.RAMBytesUsageMax = res.Values[0].Value
  711. }
  712. }
  713. }
  714. func applyGPUsRequested(podMap map[podKey]*Pod, resGPUsRequested []*prom.QueryResult) {
  715. for _, res := range resGPUsRequested {
  716. key, err := resultPodKey(res, "cluster_id", "namespace", "pod")
  717. if err != nil {
  718. log.DedupedWarningf(10, "CostModel.ComputeAllocation: GPU request result missing field: %s", err)
  719. continue
  720. }
  721. pod, ok := podMap[key]
  722. if !ok {
  723. continue
  724. }
  725. container, err := res.GetString("container")
  726. if err != nil {
  727. log.DedupedWarningf(10, "CostModel.ComputeAllocation: GPU request query result missing 'container': %s", key)
  728. continue
  729. }
  730. if _, ok := pod.Allocations[container]; !ok {
  731. pod.AppendContainer(container)
  732. }
  733. hrs := pod.Allocations[container].Minutes() / 60.0
  734. pod.Allocations[container].GPUHours = res.Values[0].Value * hrs
  735. }
  736. }
  737. func applyNetworkAllocation(podMap map[podKey]*Pod, resNetworkGiB []*prom.QueryResult, resNetworkCostPerGiB []*prom.QueryResult) {
  738. costPerGiBByCluster := map[string]float64{}
  739. for _, res := range resNetworkCostPerGiB {
  740. cluster, err := res.GetString("cluster_id")
  741. if err != nil {
  742. cluster = env.GetClusterID()
  743. }
  744. costPerGiBByCluster[cluster] = res.Values[0].Value
  745. }
  746. for _, res := range resNetworkGiB {
  747. podKey, err := resultPodKey(res, "cluster_id", "namespace", "pod_name")
  748. if err != nil {
  749. log.DedupedWarningf(10, "CostModel.ComputeAllocation: Network allocation query result missing field: %s", err)
  750. continue
  751. }
  752. pod, ok := podMap[podKey]
  753. if !ok {
  754. continue
  755. }
  756. for _, alloc := range pod.Allocations {
  757. gib := res.Values[0].Value / float64(len(pod.Allocations))
  758. costPerGiB := costPerGiBByCluster[podKey.Cluster]
  759. alloc.NetworkCost = gib * costPerGiB
  760. }
  761. }
  762. }
  763. func resToNamespaceLabels(resNamespaceLabels []*prom.QueryResult) map[namespaceKey]map[string]string {
  764. namespaceLabels := map[namespaceKey]map[string]string{}
  765. for _, res := range resNamespaceLabels {
  766. nsKey, err := resultNamespaceKey(res, "cluster_id", "namespace")
  767. if err != nil {
  768. continue
  769. }
  770. if _, ok := namespaceLabels[nsKey]; !ok {
  771. namespaceLabels[nsKey] = map[string]string{}
  772. }
  773. for k, l := range res.GetLabels() {
  774. namespaceLabels[nsKey][k] = l
  775. }
  776. }
  777. return namespaceLabels
  778. }
  779. func resToPodLabels(resPodLabels []*prom.QueryResult) map[podKey]map[string]string {
  780. podLabels := map[podKey]map[string]string{}
  781. for _, res := range resPodLabels {
  782. podKey, err := resultPodKey(res, "cluster_id", "namespace", "pod")
  783. if err != nil {
  784. continue
  785. }
  786. if _, ok := podLabels[podKey]; !ok {
  787. podLabels[podKey] = map[string]string{}
  788. }
  789. for k, l := range res.GetLabels() {
  790. podLabels[podKey][k] = l
  791. }
  792. }
  793. return podLabels
  794. }
  795. func resToNamespaceAnnotations(resNamespaceAnnotations []*prom.QueryResult) map[string]map[string]string {
  796. namespaceAnnotations := map[string]map[string]string{}
  797. for _, res := range resNamespaceAnnotations {
  798. namespace, err := res.GetString("namespace")
  799. if err != nil {
  800. continue
  801. }
  802. if _, ok := namespaceAnnotations[namespace]; !ok {
  803. namespaceAnnotations[namespace] = map[string]string{}
  804. }
  805. for k, l := range res.GetAnnotations() {
  806. namespaceAnnotations[namespace][k] = l
  807. }
  808. }
  809. return namespaceAnnotations
  810. }
  811. func resToPodAnnotations(resPodAnnotations []*prom.QueryResult) map[podKey]map[string]string {
  812. podAnnotations := map[podKey]map[string]string{}
  813. for _, res := range resPodAnnotations {
  814. podKey, err := resultPodKey(res, "cluster_id", "namespace", "pod")
  815. if err != nil {
  816. continue
  817. }
  818. if _, ok := podAnnotations[podKey]; !ok {
  819. podAnnotations[podKey] = map[string]string{}
  820. }
  821. for k, l := range res.GetAnnotations() {
  822. podAnnotations[podKey][k] = l
  823. }
  824. }
  825. return podAnnotations
  826. }
  827. func applyLabels(podMap map[podKey]*Pod, namespaceLabels map[namespaceKey]map[string]string, podLabels map[podKey]map[string]string) {
  828. for podKey, pod := range podMap {
  829. for _, alloc := range pod.Allocations {
  830. allocLabels := alloc.Properties.Labels
  831. if allocLabels == nil {
  832. allocLabels = make(map[string]string)
  833. }
  834. // Apply namespace labels first, then pod labels so that pod labels
  835. // overwrite namespace labels.
  836. nsKey := newNamespaceKey(podKey.Cluster, podKey.Namespace)
  837. if labels, ok := namespaceLabels[nsKey]; ok {
  838. for k, v := range labels {
  839. allocLabels[k] = v
  840. }
  841. }
  842. if labels, ok := podLabels[podKey]; ok {
  843. for k, v := range labels {
  844. allocLabels[k] = v
  845. }
  846. }
  847. alloc.Properties.Labels = allocLabels
  848. }
  849. }
  850. }
  851. func applyAnnotations(podMap map[podKey]*Pod, namespaceAnnotations map[string]map[string]string, podAnnotations map[podKey]map[string]string) {
  852. for key, pod := range podMap {
  853. for _, alloc := range pod.Allocations {
  854. allocAnnotations := alloc.Properties.Annotations
  855. if allocAnnotations == nil {
  856. allocAnnotations = make(map[string]string)
  857. }
  858. // Apply namespace annotations first, then pod annotations so that
  859. // pod labels overwrite namespace labels.
  860. if labels, ok := namespaceAnnotations[key.Namespace]; ok {
  861. for k, v := range labels {
  862. allocAnnotations[k] = v
  863. }
  864. }
  865. if labels, ok := podAnnotations[key]; ok {
  866. for k, v := range labels {
  867. allocAnnotations[k] = v
  868. }
  869. }
  870. alloc.Properties.Annotations = allocAnnotations
  871. }
  872. }
  873. }
  874. func getServiceLabels(resServiceLabels []*prom.QueryResult) map[serviceKey]map[string]string {
  875. serviceLabels := map[serviceKey]map[string]string{}
  876. for _, res := range resServiceLabels {
  877. serviceKey, err := resultServiceKey(res, "cluster_id", "namespace", "service")
  878. if err != nil {
  879. continue
  880. }
  881. if _, ok := serviceLabels[serviceKey]; !ok {
  882. serviceLabels[serviceKey] = map[string]string{}
  883. }
  884. for k, l := range res.GetLabels() {
  885. serviceLabels[serviceKey][k] = l
  886. }
  887. }
  888. // Prune duplicate services. That is, if the same service exists with
  889. // hyphens instead of underscores, keep the one that uses hyphens.
  890. for key := range serviceLabels {
  891. if strings.Contains(key.Service, "_") {
  892. duplicateService := strings.Replace(key.Service, "_", "-", -1)
  893. duplicateKey := newServiceKey(key.Cluster, key.Namespace, duplicateService)
  894. if _, ok := serviceLabels[duplicateKey]; ok {
  895. delete(serviceLabels, key)
  896. }
  897. }
  898. }
  899. return serviceLabels
  900. }
  901. func resToDeploymentLabels(resDeploymentLabels []*prom.QueryResult) map[controllerKey]map[string]string {
  902. deploymentLabels := map[controllerKey]map[string]string{}
  903. for _, res := range resDeploymentLabels {
  904. controllerKey, err := resultDeploymentKey(res, "cluster_id", "namespace", "deployment")
  905. if err != nil {
  906. continue
  907. }
  908. if _, ok := deploymentLabels[controllerKey]; !ok {
  909. deploymentLabels[controllerKey] = map[string]string{}
  910. }
  911. for k, l := range res.GetLabels() {
  912. deploymentLabels[controllerKey][k] = l
  913. }
  914. }
  915. // Prune duplicate deployments. That is, if the same deployment exists with
  916. // hyphens instead of underscores, keep the one that uses hyphens.
  917. for key := range deploymentLabels {
  918. if strings.Contains(key.Controller, "_") {
  919. duplicateController := strings.Replace(key.Controller, "_", "-", -1)
  920. duplicateKey := newControllerKey(key.Cluster, key.Namespace, key.ControllerKind, duplicateController)
  921. if _, ok := deploymentLabels[duplicateKey]; ok {
  922. delete(deploymentLabels, key)
  923. }
  924. }
  925. }
  926. return deploymentLabels
  927. }
  928. func resToStatefulSetLabels(resStatefulSetLabels []*prom.QueryResult) map[controllerKey]map[string]string {
  929. statefulSetLabels := map[controllerKey]map[string]string{}
  930. for _, res := range resStatefulSetLabels {
  931. controllerKey, err := resultStatefulSetKey(res, "cluster_id", "namespace", "statefulSet")
  932. if err != nil {
  933. continue
  934. }
  935. if _, ok := statefulSetLabels[controllerKey]; !ok {
  936. statefulSetLabels[controllerKey] = map[string]string{}
  937. }
  938. for k, l := range res.GetLabels() {
  939. statefulSetLabels[controllerKey][k] = l
  940. }
  941. }
  942. // Prune duplicate stateful sets. That is, if the same stateful set exists
  943. // with hyphens instead of underscores, keep the one that uses hyphens.
  944. for key := range statefulSetLabels {
  945. if strings.Contains(key.Controller, "_") {
  946. duplicateController := strings.Replace(key.Controller, "_", "-", -1)
  947. duplicateKey := newControllerKey(key.Cluster, key.Namespace, key.ControllerKind, duplicateController)
  948. if _, ok := statefulSetLabels[duplicateKey]; ok {
  949. delete(statefulSetLabels, key)
  950. }
  951. }
  952. }
  953. return statefulSetLabels
  954. }
  955. func labelsToPodControllerMap(podLabels map[podKey]map[string]string, controllerLabels map[controllerKey]map[string]string) map[podKey]controllerKey {
  956. podControllerMap := map[podKey]controllerKey{}
  957. // For each controller, turn the labels into a selector and attempt to
  958. // match it with each set of pod labels. A match indicates that the pod
  959. // belongs to the controller.
  960. for cKey, cLabels := range controllerLabels {
  961. selector := labels.Set(cLabels).AsSelectorPreValidated()
  962. for pKey, pLabels := range podLabels {
  963. // If the pod is in a different cluster or namespace, there is
  964. // no need to compare the labels.
  965. if cKey.Cluster != pKey.Cluster || cKey.Namespace != pKey.Namespace {
  966. continue
  967. }
  968. podLabelSet := labels.Set(pLabels)
  969. if selector.Matches(podLabelSet) {
  970. if _, ok := podControllerMap[pKey]; ok {
  971. log.DedupedWarningf(5, "CostModel.ComputeAllocation: PodControllerMap match already exists: %s matches %s and %s", pKey, podControllerMap[pKey], cKey)
  972. }
  973. podControllerMap[pKey] = cKey
  974. }
  975. }
  976. }
  977. return podControllerMap
  978. }
  979. func resToPodDaemonSetMap(resDaemonSetLabels []*prom.QueryResult) map[podKey]controllerKey {
  980. daemonSetLabels := map[podKey]controllerKey{}
  981. for _, res := range resDaemonSetLabels {
  982. controllerKey, err := resultDaemonSetKey(res, "cluster_id", "namespace", "owner_name")
  983. if err != nil {
  984. continue
  985. }
  986. pod, err := res.GetString("pod")
  987. if err != nil {
  988. log.Warningf("CostModel.ComputeAllocation: DaemonSetLabel result without pod: %s", controllerKey)
  989. }
  990. podKey := newPodKey(controllerKey.Cluster, controllerKey.Namespace, pod)
  991. daemonSetLabels[podKey] = controllerKey
  992. }
  993. return daemonSetLabels
  994. }
  995. func resToPodJobMap(resJobLabels []*prom.QueryResult) map[podKey]controllerKey {
  996. jobLabels := map[podKey]controllerKey{}
  997. for _, res := range resJobLabels {
  998. controllerKey, err := resultJobKey(res, "cluster_id", "namespace", "owner_name")
  999. if err != nil {
  1000. continue
  1001. }
  1002. // Convert the name of Jobs generated by CronJobs to the name of the
  1003. // CronJob by stripping the timestamp off the end.
  1004. match := isCron.FindStringSubmatch(controllerKey.Controller)
  1005. if match != nil {
  1006. controllerKey.Controller = match[1]
  1007. }
  1008. pod, err := res.GetString("pod")
  1009. if err != nil {
  1010. log.Warningf("CostModel.ComputeAllocation: JobLabel result without pod: %s", controllerKey)
  1011. }
  1012. podKey := newPodKey(controllerKey.Cluster, controllerKey.Namespace, pod)
  1013. jobLabels[podKey] = controllerKey
  1014. }
  1015. return jobLabels
  1016. }
  1017. func applyServicesToPods(podMap map[podKey]*Pod, podLabels map[podKey]map[string]string, allocsByService map[serviceKey][]*kubecost.Allocation, serviceLabels map[serviceKey]map[string]string) {
  1018. podServicesMap := map[podKey][]serviceKey{}
  1019. // For each service, turn the labels into a selector and attempt to
  1020. // match it with each set of pod labels. A match indicates that the pod
  1021. // belongs to the service.
  1022. for sKey, sLabels := range serviceLabels {
  1023. selector := labels.Set(sLabels).AsSelectorPreValidated()
  1024. for pKey, pLabels := range podLabels {
  1025. // If the pod is in a different cluster or namespace, there is
  1026. // no need to compare the labels.
  1027. if sKey.Cluster != pKey.Cluster || sKey.Namespace != pKey.Namespace {
  1028. continue
  1029. }
  1030. podLabelSet := labels.Set(pLabels)
  1031. if selector.Matches(podLabelSet) {
  1032. if _, ok := podServicesMap[pKey]; !ok {
  1033. podServicesMap[pKey] = []serviceKey{}
  1034. }
  1035. podServicesMap[pKey] = append(podServicesMap[pKey], sKey)
  1036. }
  1037. }
  1038. }
  1039. // For each allocation in each pod, attempt to find and apply the list of
  1040. // services associated with the allocation's pod.
  1041. for key, pod := range podMap {
  1042. for _, alloc := range pod.Allocations {
  1043. if sKeys, ok := podServicesMap[key]; ok {
  1044. services := []string{}
  1045. for _, sKey := range sKeys {
  1046. services = append(services, sKey.Service)
  1047. allocsByService[sKey] = append(allocsByService[sKey], alloc)
  1048. }
  1049. alloc.Properties.Services = services
  1050. }
  1051. }
  1052. }
  1053. }
  1054. func applyControllersToPods(podMap map[podKey]*Pod, podControllerMap map[podKey]controllerKey) {
  1055. for key, pod := range podMap {
  1056. for _, alloc := range pod.Allocations {
  1057. if controllerKey, ok := podControllerMap[key]; ok {
  1058. alloc.Properties.ControllerKind = controllerKey.ControllerKind
  1059. alloc.Properties.Controller = controllerKey.Controller
  1060. }
  1061. }
  1062. }
  1063. }
  1064. func applyNodeCostPerCPUHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerCPUHr []*prom.QueryResult,
  1065. providerIDParser func(string) string) {
  1066. for _, res := range resNodeCostPerCPUHr {
  1067. cluster, err := res.GetString("cluster_id")
  1068. if err != nil {
  1069. cluster = env.GetClusterID()
  1070. }
  1071. node, err := res.GetString("node")
  1072. if err != nil {
  1073. log.Warningf("CostModel.ComputeAllocation: Node CPU cost query result missing field: %s", err)
  1074. continue
  1075. }
  1076. instanceType, err := res.GetString("instance_type")
  1077. if err != nil {
  1078. log.Warningf("CostModel.ComputeAllocation: Node CPU cost query result missing field: %s", err)
  1079. continue
  1080. }
  1081. providerID, err := res.GetString("provider_id")
  1082. if err != nil {
  1083. log.Warningf("CostModel.ComputeAllocation: Node CPU cost query result missing field: %s", err)
  1084. continue
  1085. }
  1086. key := newNodeKey(cluster, node)
  1087. if _, ok := nodeMap[key]; !ok {
  1088. nodeMap[key] = &NodePricing{
  1089. Name: node,
  1090. NodeType: instanceType,
  1091. ProviderID: providerIDParser(providerID),
  1092. }
  1093. }
  1094. nodeMap[key].CostPerCPUHr = res.Values[0].Value
  1095. }
  1096. }
  1097. func applyNodeCostPerRAMGiBHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerRAMGiBHr []*prom.QueryResult,
  1098. providerIDParser func(string) string) {
  1099. for _, res := range resNodeCostPerRAMGiBHr {
  1100. cluster, err := res.GetString("cluster_id")
  1101. if err != nil {
  1102. cluster = env.GetClusterID()
  1103. }
  1104. node, err := res.GetString("node")
  1105. if err != nil {
  1106. log.Warningf("CostModel.ComputeAllocation: Node RAM cost query result missing field: %s", err)
  1107. continue
  1108. }
  1109. instanceType, err := res.GetString("instance_type")
  1110. if err != nil {
  1111. log.Warningf("CostModel.ComputeAllocation: Node RAM cost query result missing field: %s", err)
  1112. continue
  1113. }
  1114. providerID, err := res.GetString("provider_id")
  1115. if err != nil {
  1116. log.Warningf("CostModel.ComputeAllocation: Node RAM cost query result missing field: %s", err)
  1117. continue
  1118. }
  1119. key := newNodeKey(cluster, node)
  1120. if _, ok := nodeMap[key]; !ok {
  1121. nodeMap[key] = &NodePricing{
  1122. Name: node,
  1123. NodeType: instanceType,
  1124. ProviderID: providerIDParser(providerID),
  1125. }
  1126. }
  1127. nodeMap[key].CostPerRAMGiBHr = res.Values[0].Value
  1128. }
  1129. }
  1130. func applyNodeCostPerGPUHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerGPUHr []*prom.QueryResult,
  1131. providerIDParser func(string) string) {
  1132. for _, res := range resNodeCostPerGPUHr {
  1133. cluster, err := res.GetString("cluster_id")
  1134. if err != nil {
  1135. cluster = env.GetClusterID()
  1136. }
  1137. node, err := res.GetString("node")
  1138. if err != nil {
  1139. log.Warningf("CostModel.ComputeAllocation: Node GPU cost query result missing field: %s", err)
  1140. continue
  1141. }
  1142. instanceType, err := res.GetString("instance_type")
  1143. if err != nil {
  1144. log.Warningf("CostModel.ComputeAllocation: Node GPU cost query result missing field: %s", err)
  1145. continue
  1146. }
  1147. providerID, err := res.GetString("provider_id")
  1148. if err != nil {
  1149. log.Warningf("CostModel.ComputeAllocation: Node GPU cost query result missing field: %s", err)
  1150. continue
  1151. }
  1152. key := newNodeKey(cluster, node)
  1153. if _, ok := nodeMap[key]; !ok {
  1154. nodeMap[key] = &NodePricing{
  1155. Name: node,
  1156. NodeType: instanceType,
  1157. ProviderID: providerIDParser(providerID),
  1158. }
  1159. }
  1160. nodeMap[key].CostPerGPUHr = res.Values[0].Value
  1161. }
  1162. }
  1163. func applyNodeSpot(nodeMap map[nodeKey]*NodePricing, resNodeIsSpot []*prom.QueryResult) {
  1164. for _, res := range resNodeIsSpot {
  1165. cluster, err := res.GetString("cluster_id")
  1166. if err != nil {
  1167. cluster = env.GetClusterID()
  1168. }
  1169. node, err := res.GetString("node")
  1170. if err != nil {
  1171. log.Warningf("CostModel.ComputeAllocation: Node spot query result missing field: %s", err)
  1172. continue
  1173. }
  1174. key := newNodeKey(cluster, node)
  1175. if _, ok := nodeMap[key]; !ok {
  1176. log.Warningf("CostModel.ComputeAllocation: Node spot query result for missing node: %s", key)
  1177. continue
  1178. }
  1179. nodeMap[key].Preemptible = res.Values[0].Value > 0
  1180. }
  1181. }
  1182. func applyNodeDiscount(nodeMap map[nodeKey]*NodePricing, cm *CostModel) {
  1183. if cm == nil {
  1184. return
  1185. }
  1186. c, err := cm.Provider.GetConfig()
  1187. if err != nil {
  1188. log.Errorf("CostModel.ComputeAllocation: applyNodeDiscount: %s", err)
  1189. return
  1190. }
  1191. discount, err := ParsePercentString(c.Discount)
  1192. if err != nil {
  1193. log.Errorf("CostModel.ComputeAllocation: applyNodeDiscount: %s", err)
  1194. return
  1195. }
  1196. negotiatedDiscount, err := ParsePercentString(c.NegotiatedDiscount)
  1197. if err != nil {
  1198. log.Errorf("CostModel.ComputeAllocation: applyNodeDiscount: %s", err)
  1199. return
  1200. }
  1201. for _, node := range nodeMap {
  1202. // TODO GKE Reserved Instances into account
  1203. node.Discount = cm.Provider.CombinedDiscountForNode(node.NodeType, node.Preemptible, discount, negotiatedDiscount)
  1204. node.CostPerCPUHr *= (1.0 - node.Discount)
  1205. node.CostPerRAMGiBHr *= (1.0 - node.Discount)
  1206. }
  1207. }
  1208. func buildPVMap(pvMap map[pvKey]*PV, resPVCostPerGiBHour []*prom.QueryResult) {
  1209. for _, res := range resPVCostPerGiBHour {
  1210. cluster, err := res.GetString("cluster_id")
  1211. if err != nil {
  1212. cluster = env.GetClusterID()
  1213. }
  1214. name, err := res.GetString("volumename")
  1215. if err != nil {
  1216. log.Warningf("CostModel.ComputeAllocation: PV cost without volumename")
  1217. continue
  1218. }
  1219. key := newPVKey(cluster, name)
  1220. pvMap[key] = &PV{
  1221. Cluster: cluster,
  1222. Name: name,
  1223. CostPerGiBHour: res.Values[0].Value,
  1224. }
  1225. }
  1226. }
  1227. func applyPVBytes(pvMap map[pvKey]*PV, resPVBytes []*prom.QueryResult) {
  1228. for _, res := range resPVBytes {
  1229. key, err := resultPVKey(res, "cluster_id", "persistentvolume")
  1230. if err != nil {
  1231. log.Warningf("CostModel.ComputeAllocation: PV bytes query result missing field: %s", err)
  1232. continue
  1233. }
  1234. if _, ok := pvMap[key]; !ok {
  1235. log.Warningf("CostModel.ComputeAllocation: PV bytes result for missing PV: %s", err)
  1236. continue
  1237. }
  1238. pvMap[key].Bytes = res.Values[0].Value
  1239. }
  1240. }
  1241. func buildPVCMap(window kubecost.Window, pvcMap map[pvcKey]*PVC, pvMap map[pvKey]*PV, resPVCInfo []*prom.QueryResult) {
  1242. for _, res := range resPVCInfo {
  1243. cluster, err := res.GetString("cluster_id")
  1244. if err != nil {
  1245. cluster = env.GetClusterID()
  1246. }
  1247. values, err := res.GetStrings("persistentvolumeclaim", "storageclass", "volumename", "namespace")
  1248. if err != nil {
  1249. log.DedupedWarningf(10, "CostModel.ComputeAllocation: PVC info query result missing field: %s", err)
  1250. continue
  1251. }
  1252. namespace := values["namespace"]
  1253. name := values["persistentvolumeclaim"]
  1254. volume := values["volumename"]
  1255. storageClass := values["storageclass"]
  1256. pvKey := newPVKey(cluster, volume)
  1257. pvcKey := newPVCKey(cluster, namespace, name)
  1258. // pvcStart and pvcEnd are the timestamps of the first and last minutes
  1259. // the PVC was running, respectively. We subtract 1m from pvcStart
  1260. // because this point will actually represent the end of the first
  1261. // minute. We don't subtract from pvcEnd because it already represents
  1262. // the end of the last minute.
  1263. var pvcStart, pvcEnd time.Time
  1264. for _, datum := range res.Values {
  1265. t := time.Unix(int64(datum.Timestamp), 0)
  1266. if pvcStart.IsZero() && datum.Value > 0 && window.Contains(t) {
  1267. pvcStart = t
  1268. }
  1269. if datum.Value > 0 && window.Contains(t) {
  1270. pvcEnd = t
  1271. }
  1272. }
  1273. if pvcStart.IsZero() || pvcEnd.IsZero() {
  1274. log.Warningf("CostModel.ComputeAllocation: PVC %s has no running time", pvcKey)
  1275. }
  1276. pvcStart = pvcStart.Add(-time.Minute)
  1277. if _, ok := pvMap[pvKey]; !ok {
  1278. continue
  1279. }
  1280. pvMap[pvKey].StorageClass = storageClass
  1281. if _, ok := pvcMap[pvcKey]; !ok {
  1282. pvcMap[pvcKey] = &PVC{}
  1283. }
  1284. pvcMap[pvcKey].Name = name
  1285. pvcMap[pvcKey].Namespace = namespace
  1286. pvcMap[pvcKey].Volume = pvMap[pvKey]
  1287. pvcMap[pvcKey].Start = pvcStart
  1288. pvcMap[pvcKey].End = pvcEnd
  1289. }
  1290. }
  1291. func applyPVCBytesRequested(pvcMap map[pvcKey]*PVC, resPVCBytesRequested []*prom.QueryResult) {
  1292. for _, res := range resPVCBytesRequested {
  1293. key, err := resultPVCKey(res, "cluster_id", "namespace", "persistentvolumeclaim")
  1294. if err != nil {
  1295. continue
  1296. }
  1297. if _, ok := pvcMap[key]; !ok {
  1298. continue
  1299. }
  1300. pvcMap[key].Bytes = res.Values[0].Value
  1301. }
  1302. }
  1303. func buildPodPVCMap(podPVCMap map[podKey][]*PVC, pvMap map[pvKey]*PV, pvcMap map[pvcKey]*PVC, podMap map[podKey]*Pod, resPodPVCAllocation []*prom.QueryResult) {
  1304. for _, res := range resPodPVCAllocation {
  1305. cluster, err := res.GetString("cluster_id")
  1306. if err != nil {
  1307. cluster = env.GetClusterID()
  1308. }
  1309. values, err := res.GetStrings("persistentvolume", "persistentvolumeclaim", "pod", "namespace")
  1310. if err != nil {
  1311. log.DedupedWarningf(5, "CostModel.ComputeAllocation: PVC allocation query result missing field: %s", err)
  1312. continue
  1313. }
  1314. namespace := values["namespace"]
  1315. pod := values["pod"]
  1316. name := values["persistentvolumeclaim"]
  1317. volume := values["persistentvolume"]
  1318. podKey := newPodKey(cluster, namespace, pod)
  1319. pvKey := newPVKey(cluster, volume)
  1320. pvcKey := newPVCKey(cluster, namespace, name)
  1321. if _, ok := pvMap[pvKey]; !ok {
  1322. log.DedupedWarningf(5, "CostModel.ComputeAllocation: PV missing for PVC allocation query result: %s", pvKey)
  1323. continue
  1324. }
  1325. if _, ok := podPVCMap[podKey]; !ok {
  1326. podPVCMap[podKey] = []*PVC{}
  1327. }
  1328. pvc, ok := pvcMap[pvcKey]
  1329. if !ok {
  1330. log.DedupedWarningf(5, "CostModel.ComputeAllocation: PVC missing for PVC allocation query: %s", pvcKey)
  1331. continue
  1332. }
  1333. count := 1
  1334. if pod, ok := podMap[podKey]; ok && len(pod.Allocations) > 0 {
  1335. count = len(pod.Allocations)
  1336. } else {
  1337. log.DedupedWarningf(10, "CostModel.ComputeAllocation: PVC %s for missing pod %s", pvcKey, podKey)
  1338. }
  1339. pvc.Count = count
  1340. pvc.Mounted = true
  1341. podPVCMap[podKey] = append(podPVCMap[podKey], pvc)
  1342. }
  1343. }
  1344. func applyUnmountedPVs(window kubecost.Window, podMap map[podKey]*Pod, pvMap map[pvKey]*PV, pvcMap map[pvcKey]*PVC) {
  1345. unmountedPVBytes := map[string]float64{}
  1346. unmountedPVCost := map[string]float64{}
  1347. for _, pv := range pvMap {
  1348. mounted := false
  1349. for _, pvc := range pvcMap {
  1350. if pvc.Volume == nil {
  1351. continue
  1352. }
  1353. if pvc.Volume == pv {
  1354. mounted = true
  1355. break
  1356. }
  1357. }
  1358. if !mounted {
  1359. gib := pv.Bytes / 1024 / 1024 / 1024
  1360. hrs := window.Minutes() / 60.0 // TODO improve with PV hours, not window hours
  1361. cost := pv.CostPerGiBHour * gib * hrs
  1362. unmountedPVCost[pv.Cluster] += cost
  1363. unmountedPVBytes[pv.Cluster] += pv.Bytes
  1364. }
  1365. }
  1366. for cluster, amount := range unmountedPVCost {
  1367. container := kubecost.UnmountedSuffix
  1368. pod := kubecost.UnmountedSuffix
  1369. namespace := kubecost.UnmountedSuffix
  1370. node := ""
  1371. key := newPodKey(cluster, namespace, pod)
  1372. podMap[key] = &Pod{
  1373. Window: window.Clone(),
  1374. Start: *window.Start(),
  1375. End: *window.End(),
  1376. Key: key,
  1377. Allocations: map[string]*kubecost.Allocation{},
  1378. }
  1379. podMap[key].AppendContainer(container)
  1380. podMap[key].Allocations[container].Properties.Cluster = cluster
  1381. podMap[key].Allocations[container].Properties.Node = node
  1382. podMap[key].Allocations[container].Properties.Namespace = namespace
  1383. podMap[key].Allocations[container].Properties.Pod = pod
  1384. podMap[key].Allocations[container].Properties.Container = container
  1385. pvKey := kubecost.PVKey{
  1386. Cluster: cluster,
  1387. Name: kubecost.UnmountedSuffix,
  1388. }
  1389. unmountedBreakDown := kubecost.PVAllocations{
  1390. pvKey: {
  1391. ByteHours: unmountedPVBytes[cluster] * window.Minutes() / 60.0,
  1392. Cost: amount,
  1393. },
  1394. }
  1395. podMap[key].Allocations[container].PVs = podMap[key].Allocations[container].PVs.Add(unmountedBreakDown)
  1396. }
  1397. }
  1398. func applyUnmountedPVCs(window kubecost.Window, podMap map[podKey]*Pod, pvcMap map[pvcKey]*PVC) {
  1399. unmountedPVCBytes := map[namespaceKey]float64{}
  1400. unmountedPVCCost := map[namespaceKey]float64{}
  1401. for _, pvc := range pvcMap {
  1402. if !pvc.Mounted && pvc.Volume != nil {
  1403. key := newNamespaceKey(pvc.Cluster, pvc.Namespace)
  1404. gib := pvc.Volume.Bytes / 1024 / 1024 / 1024
  1405. hrs := pvc.Minutes() / 60.0
  1406. cost := pvc.Volume.CostPerGiBHour * gib * hrs
  1407. unmountedPVCCost[key] += cost
  1408. unmountedPVCBytes[key] += pvc.Volume.Bytes
  1409. }
  1410. }
  1411. for key, amount := range unmountedPVCCost {
  1412. container := kubecost.UnmountedSuffix
  1413. pod := kubecost.UnmountedSuffix
  1414. namespace := key.Namespace
  1415. node := ""
  1416. cluster := key.Cluster
  1417. podKey := newPodKey(cluster, namespace, pod)
  1418. podMap[podKey] = &Pod{
  1419. Window: window.Clone(),
  1420. Start: *window.Start(),
  1421. End: *window.End(),
  1422. Key: podKey,
  1423. Allocations: map[string]*kubecost.Allocation{},
  1424. }
  1425. podMap[podKey].AppendContainer(container)
  1426. podMap[podKey].Allocations[container].Properties.Cluster = cluster
  1427. podMap[podKey].Allocations[container].Properties.Node = node
  1428. podMap[podKey].Allocations[container].Properties.Namespace = namespace
  1429. podMap[podKey].Allocations[container].Properties.Pod = pod
  1430. podMap[podKey].Allocations[container].Properties.Container = container
  1431. pvKey := kubecost.PVKey{
  1432. Cluster: cluster,
  1433. Name: kubecost.UnmountedSuffix,
  1434. }
  1435. unmountedBreakDown := kubecost.PVAllocations{
  1436. pvKey: {
  1437. ByteHours: unmountedPVCBytes[key] * window.Minutes() / 60.0,
  1438. Cost: amount,
  1439. },
  1440. }
  1441. podMap[podKey].Allocations[container].PVs = podMap[podKey].Allocations[container].PVs.Add(unmountedBreakDown)
  1442. }
  1443. }
  1444. // LB describes the start and end time of a Load Balancer along with cost
  1445. type LB struct {
  1446. TotalCost float64
  1447. Start time.Time
  1448. End time.Time
  1449. }
  1450. func getLoadBalancerCosts(resLBCost, resLBActiveMins []*prom.QueryResult, resolution time.Duration) map[serviceKey]*LB {
  1451. lbMap := make(map[serviceKey]*LB)
  1452. lbHourlyCosts := make(map[serviceKey]float64)
  1453. for _, res := range resLBCost {
  1454. serviceKey, err := resultServiceKey(res, "cluster_id", "namespace", "service_name")
  1455. if err != nil {
  1456. continue
  1457. }
  1458. lbHourlyCosts[serviceKey] = res.Values[0].Value
  1459. }
  1460. for _, res := range resLBActiveMins {
  1461. serviceKey, err := resultServiceKey(res, "cluster_id", "namespace", "service_name")
  1462. if err != nil || len(res.Values) == 0 {
  1463. continue
  1464. }
  1465. if _, ok := lbHourlyCosts[serviceKey]; !ok {
  1466. log.Warningf("CostModel: failed to find hourly cost for Load Balancer: %v", serviceKey)
  1467. continue
  1468. }
  1469. s := time.Unix(int64(res.Values[0].Timestamp), 0)
  1470. // subtract resolution from start time to cover full time period
  1471. s = s.Add(-resolution)
  1472. e := time.Unix(int64(res.Values[len(res.Values)-1].Timestamp), 0)
  1473. hours := e.Sub(s).Hours()
  1474. lbMap[serviceKey] = &LB{
  1475. TotalCost: lbHourlyCosts[serviceKey] * hours,
  1476. Start: s,
  1477. End: e,
  1478. }
  1479. }
  1480. return lbMap
  1481. }
  1482. func applyLoadBalancersToPods(lbMap map[serviceKey]*LB, allocsByService map[serviceKey][]*kubecost.Allocation) {
  1483. for sKey, lb := range lbMap {
  1484. totalHours := 0.0
  1485. allocHours := make(map[*kubecost.Allocation]float64)
  1486. // Add portion of load balancing cost to each allocation
  1487. // proportional to the total number of hours allocations used the load balancer
  1488. for _, alloc := range allocsByService[sKey] {
  1489. // Determine the (start, end) of the relationship between the
  1490. // given LB and the associated Allocation so that a precise
  1491. // number of hours can be used to compute cumulative cost.
  1492. s, e := alloc.Start, alloc.End
  1493. if lb.Start.After(alloc.Start) {
  1494. s = lb.Start
  1495. }
  1496. if lb.End.Before(alloc.End) {
  1497. e = lb.End
  1498. }
  1499. hours := e.Sub(s).Hours()
  1500. // A negative number of hours signifies no overlap between the windows
  1501. if hours > 0 {
  1502. totalHours += hours
  1503. allocHours[alloc] = hours
  1504. }
  1505. }
  1506. // Distribute cost of service once total hours is calculated
  1507. for alloc, hours := range allocHours {
  1508. alloc.LoadBalancerCost += lb.TotalCost * hours / totalHours
  1509. }
  1510. }
  1511. }
  1512. // getNodePricing determines node pricing, given a key and a mapping from keys
  1513. // to their NodePricing instances, as well as the custom pricing configuration
  1514. // inherent to the CostModel instance. If custom pricing is set, use that. If
  1515. // not, use the pricing defined by the given key. If that doesn't exist, fall
  1516. // back on custom pricing as a default.
  1517. func (cm *CostModel) getNodePricing(nodeMap map[nodeKey]*NodePricing, nodeKey nodeKey) *NodePricing {
  1518. // Find the relevant NodePricing, if it exists. If not, substitute the
  1519. // custom NodePricing as a default.
  1520. node, ok := nodeMap[nodeKey]
  1521. if !ok || node == nil {
  1522. if nodeKey.Node != "" {
  1523. log.DedupedWarningf(5, "CostModel: failed to find node for %s", nodeKey)
  1524. }
  1525. return cm.getCustomNodePricing(false)
  1526. }
  1527. // If custom pricing is enabled and can be retrieved, override detected
  1528. // node pricing with the custom values.
  1529. customPricingConfig, err := cm.Provider.GetConfig()
  1530. if err != nil {
  1531. log.Warningf("CostModel: failed to load custom pricing: %s", err)
  1532. }
  1533. if cloud.CustomPricesEnabled(cm.Provider) && customPricingConfig != nil {
  1534. return cm.getCustomNodePricing(node.Preemptible)
  1535. }
  1536. node.Source = "prometheus"
  1537. // If any of the values are NaN or zero, replace them with the custom
  1538. // values as default.
  1539. // TODO:CLEANUP can't we parse these custom prices once? why do we store
  1540. // them as strings like this?
  1541. if node.CostPerCPUHr == 0 || math.IsNaN(node.CostPerCPUHr) {
  1542. log.Warningf("CostModel: node pricing has illegal CostPerCPUHr; replacing with custom pricing: %s", nodeKey)
  1543. cpuCostStr := customPricingConfig.CPU
  1544. if node.Preemptible {
  1545. cpuCostStr = customPricingConfig.SpotCPU
  1546. }
  1547. costPerCPUHr, err := strconv.ParseFloat(cpuCostStr, 64)
  1548. if err != nil {
  1549. log.Warningf("CostModel: custom pricing has illegal CPU cost: %s", cpuCostStr)
  1550. }
  1551. node.CostPerCPUHr = costPerCPUHr
  1552. node.Source += "/customCPU"
  1553. }
  1554. if math.IsNaN(node.CostPerGPUHr) {
  1555. log.Warningf("CostModel: node pricing has illegal CostPerGPUHr; replacing with custom pricing: %s", nodeKey)
  1556. gpuCostStr := customPricingConfig.GPU
  1557. if node.Preemptible {
  1558. gpuCostStr = customPricingConfig.SpotGPU
  1559. }
  1560. costPerGPUHr, err := strconv.ParseFloat(gpuCostStr, 64)
  1561. if err != nil {
  1562. log.Warningf("CostModel: custom pricing has illegal GPU cost: %s", gpuCostStr)
  1563. }
  1564. node.CostPerGPUHr = costPerGPUHr
  1565. node.Source += "/customGPU"
  1566. }
  1567. if node.CostPerRAMGiBHr == 0 || math.IsNaN(node.CostPerRAMGiBHr) {
  1568. log.Warningf("CostModel: node pricing has illegal CostPerRAMHr; replacing with custom pricing: %s", nodeKey)
  1569. ramCostStr := customPricingConfig.RAM
  1570. if node.Preemptible {
  1571. ramCostStr = customPricingConfig.SpotRAM
  1572. }
  1573. costPerRAMHr, err := strconv.ParseFloat(ramCostStr, 64)
  1574. if err != nil {
  1575. log.Warningf("CostModel: custom pricing has illegal RAM cost: %s", ramCostStr)
  1576. }
  1577. node.CostPerRAMGiBHr = costPerRAMHr
  1578. node.Source += "/customRAM"
  1579. }
  1580. return node
  1581. }
  1582. // getCustomNodePricing converts the CostModel's configured custom pricing
  1583. // values into a NodePricing instance.
  1584. func (cm *CostModel) getCustomNodePricing(spot bool) *NodePricing {
  1585. customPricingConfig, err := cm.Provider.GetConfig()
  1586. if err != nil {
  1587. return nil
  1588. }
  1589. cpuCostStr := customPricingConfig.CPU
  1590. gpuCostStr := customPricingConfig.GPU
  1591. ramCostStr := customPricingConfig.RAM
  1592. if spot {
  1593. cpuCostStr = customPricingConfig.SpotCPU
  1594. gpuCostStr = customPricingConfig.SpotGPU
  1595. ramCostStr = customPricingConfig.SpotRAM
  1596. }
  1597. node := &NodePricing{Source: "custom"}
  1598. costPerCPUHr, err := strconv.ParseFloat(cpuCostStr, 64)
  1599. if err != nil {
  1600. log.Warningf("CostModel: custom pricing has illegal CPU cost: %s", cpuCostStr)
  1601. }
  1602. node.CostPerCPUHr = costPerCPUHr
  1603. costPerGPUHr, err := strconv.ParseFloat(gpuCostStr, 64)
  1604. if err != nil {
  1605. log.Warningf("CostModel: custom pricing has illegal GPU cost: %s", gpuCostStr)
  1606. }
  1607. node.CostPerGPUHr = costPerGPUHr
  1608. costPerRAMHr, err := strconv.ParseFloat(ramCostStr, 64)
  1609. if err != nil {
  1610. log.Warningf("CostModel: custom pricing has illegal RAM cost: %s", ramCostStr)
  1611. }
  1612. node.CostPerRAMGiBHr = costPerRAMHr
  1613. return node
  1614. }
  1615. // NodePricing describes the resource costs associated with a given node, as
  1616. // well as the source of the information (e.g. prometheus, custom)
  1617. type NodePricing struct {
  1618. Name string
  1619. NodeType string
  1620. ProviderID string
  1621. Preemptible bool
  1622. CostPerCPUHr float64
  1623. CostPerRAMGiBHr float64
  1624. CostPerGPUHr float64
  1625. Discount float64
  1626. Source string
  1627. }
  1628. // Pod describes a running pod's start and end time within a Window and
  1629. // all the Allocations (i.e. containers) contained within it.
  1630. type Pod struct {
  1631. Window kubecost.Window
  1632. Start time.Time
  1633. End time.Time
  1634. Key podKey
  1635. Allocations map[string]*kubecost.Allocation
  1636. }
  1637. // AppendContainer adds an entry for the given container name to the Pod.
  1638. func (p Pod) AppendContainer(container string) {
  1639. name := fmt.Sprintf("%s/%s/%s/%s", p.Key.Cluster, p.Key.Namespace, p.Key.Pod, container)
  1640. alloc := &kubecost.Allocation{
  1641. Name: name,
  1642. Properties: &kubecost.AllocationProperties{},
  1643. Window: p.Window.Clone(),
  1644. Start: p.Start,
  1645. End: p.End,
  1646. }
  1647. alloc.Properties.Container = container
  1648. alloc.Properties.Pod = p.Key.Pod
  1649. alloc.Properties.Namespace = p.Key.Namespace
  1650. alloc.Properties.Cluster = p.Key.Cluster
  1651. p.Allocations[container] = alloc
  1652. }
// PVC describes a PersistentVolumeClaim: its identity, the PV it is bound to,
// its size, whether a pod was seen mounting it, and the period it ran.
// TODO:CLEANUP move to pkg/kubecost?
// TODO:CLEANUP add PersistentVolumeClaims field to type Allocation?
type PVC struct {
	// Bytes is the PVC's size in bytes. applyPVCBytesRequested sets it from
	// the bytes-requested query, and Cost() prices the PVC on it.
	Bytes float64 `json:"bytes"`
	// Count is set by buildPodPVCMap to the number of allocations in the
	// mounting pod (1 if that pod is unknown or has none).
	Count     int       `json:"count"`
	Name      string    `json:"name"`
	Cluster   string    `json:"cluster"`
	Namespace string    `json:"namespace"`
	// Volume is the PersistentVolume this claim is bound to (nil if unknown).
	Volume *PV `json:"persistentVolume"`
	// Mounted is set true by buildPodPVCMap when a pod using the PVC is found.
	Mounted bool `json:"mounted"`
	// Start and End bound the period the PVC was observed running (populated
	// by buildPVCMap). Minutes() and Cost() are computed over this period.
	Start time.Time `json:"start"`
	End   time.Time `json:"end"`
}
  1667. // Cost computes the cumulative cost of the PVC
  1668. func (pvc *PVC) Cost() float64 {
  1669. if pvc == nil || pvc.Volume == nil {
  1670. return 0.0
  1671. }
  1672. gib := pvc.Bytes / 1024 / 1024 / 1024
  1673. hrs := pvc.Minutes() / 60.0
  1674. return pvc.Volume.CostPerGiBHour * gib * hrs
  1675. }
  1676. // Minutes computes the number of minutes over which the PVC is defined
  1677. func (pvc *PVC) Minutes() float64 {
  1678. if pvc == nil {
  1679. return 0.0
  1680. }
  1681. return pvc.End.Sub(pvc.Start).Minutes()
  1682. }
  1683. // String returns a string representation of the PVC
  1684. func (pvc *PVC) String() string {
  1685. if pvc == nil {
  1686. return "<nil>"
  1687. }
  1688. return fmt.Sprintf("%s/%s/%s{Bytes:%.2f, Cost:%.6f, Start,End:%s}", pvc.Cluster, pvc.Namespace, pvc.Name, pvc.Bytes, pvc.Cost(), kubecost.NewWindow(&pvc.Start, &pvc.End))
  1689. }
// PV describes a PersistentVolume: its identity, size, hourly per-GiB price
// and storage class.
// TODO:CLEANUP move to pkg/kubecost?
type PV struct {
	// Bytes is the volume size in bytes, set by applyPVBytes.
	Bytes float64 `json:"bytes"`
	// CostPerGiBHour is the hourly price per GiB, set by buildPVMap.
	CostPerGiBHour float64 `json:"costPerGiBHour"`
	Cluster        string  `json:"cluster"`
	Name           string  `json:"name"`
	// StorageClass is filled in by buildPVCMap from the bound PVC's info.
	StorageClass string `json:"storageClass"`
}
  1699. // String returns a string representation of the PV
  1700. func (pv *PV) String() string {
  1701. if pv == nil {
  1702. return "<nil>"
  1703. }
  1704. return fmt.Sprintf("%s/%s{Bytes:%.2f, Cost/GiB*Hr:%.6f, StorageClass:%s}", pv.Cluster, pv.Name, pv.Bytes, pv.CostPerGiBHour, pv.StorageClass)
  1705. }