allocation.go 61 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805
  1. package costmodel
  2. import (
  3. "fmt"
  4. "math"
  5. "strconv"
  6. "strings"
  7. "time"
  8. "github.com/kubecost/cost-model/pkg/cloud"
  9. "github.com/kubecost/cost-model/pkg/env"
  10. "github.com/kubecost/cost-model/pkg/kubecost"
  11. "github.com/kubecost/cost-model/pkg/log"
  12. "github.com/kubecost/cost-model/pkg/prom"
  13. "github.com/kubecost/cost-model/pkg/util"
  14. "k8s.io/apimachinery/pkg/labels"
  15. )
  // Prometheus query templates used by ComputeAllocation and buildPodMap.
  // Each template is completed with fmt.Sprintf. Templates containing two %s
  // verbs take (duration, offset). queryFmtPods and queryFmtPVCInfo use the
  // subquery form [%s:%s] and take (duration, resolution, offset).
  16. const (
  // Pod running status, queried as a subquery at the requested resolution.
  17. queryFmtPods = `avg(kube_pod_container_status_running{}) by (pod, namespace, cluster_id)[%s:%s]%s`
  // Per-container RAM, CPU and GPU allocation, requests and usage. Note that
  // the usage queries group by container_name/pod_name/instance rather than
  // container/pod/node.
  18. queryFmtRAMBytesAllocated = `avg(avg_over_time(container_memory_allocation_bytes{container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, cluster_id)`
  19. queryFmtRAMRequests = `avg(avg_over_time(kube_pod_container_resource_requests_memory_bytes{container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, cluster_id)`
  20. queryFmtRAMUsage = `avg(avg_over_time(container_memory_working_set_bytes{container_name!="", container_name!="POD", instance!=""}[%s]%s)) by (container_name, pod_name, namespace, instance, cluster_id)`
  21. queryFmtCPUCoresAllocated = `avg(avg_over_time(container_cpu_allocation{container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, cluster_id)`
  22. queryFmtCPURequests = `avg(avg_over_time(kube_pod_container_resource_requests_cpu_cores{container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, cluster_id)`
  23. queryFmtCPUUsage = `avg(rate(container_cpu_usage_seconds_total{container_name!="", container_name!="POD", instance!=""}[%s]%s)) by (container_name, pod_name, namespace, instance, cluster_id)`
  24. queryFmtGPUsRequested = `avg(avg_over_time(kube_pod_container_resource_requests{resource="nvidia_com_gpu", container!="",container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, cluster_id)`
  // Node hourly pricing per resource, and spot status.
  25. queryFmtNodeCostPerCPUHr = `avg(avg_over_time(node_cpu_hourly_cost[%s]%s)) by (node, cluster_id, instance_type)`
  26. queryFmtNodeCostPerRAMGiBHr = `avg(avg_over_time(node_ram_hourly_cost[%s]%s)) by (node, cluster_id, instance_type)`
  27. queryFmtNodeCostPerGPUHr = `avg(avg_over_time(node_gpu_hourly_cost[%s]%s)) by (node, cluster_id, instance_type)`
  28. queryFmtNodeIsSpot = `avg_over_time(kubecost_node_is_spot[%s]%s)`
  // Persistent volumes and persistent volume claims: identity, capacity,
  // per-pod allocation, requested bytes and hourly cost per GiB.
  29. queryFmtPVCInfo = `avg(kube_persistentvolumeclaim_info{volumename != ""}) by (persistentvolumeclaim, storageclass, volumename, namespace, cluster_id)[%s:%s]%s`
  30. queryFmtPVBytes = `avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s]%s)) by (persistentvolume, cluster_id)`
  31. queryFmtPodPVCAllocation = `avg(avg_over_time(pod_pvc_allocation[%s]%s)) by (persistentvolume, persistentvolumeclaim, pod, namespace, cluster_id)`
  32. queryFmtPVCBytesRequested = `avg(avg_over_time(kube_persistentvolumeclaim_resource_requests_storage_bytes{}[%s]%s)) by (persistentvolumeclaim, namespace, cluster_id)`
  33. queryFmtPVCostPerGiBHour = `avg(avg_over_time(pv_hourly_cost[%s]%s)) by (volumename, cluster_id)`
  // Network egress per pod, converted from bytes to GiB (/ 1024 / 1024 / 1024),
  // split into cross-zone, cross-region and internet traffic, each paired
  // with its per-cluster cost per GiB.
  34. queryFmtNetZoneGiB = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="true"}[%s]%s)) by (pod_name, namespace, cluster_id) / 1024 / 1024 / 1024`
  35. queryFmtNetZoneCostPerGiB = `avg(avg_over_time(kubecost_network_zone_egress_cost{}[%s]%s)) by (cluster_id)`
  36. queryFmtNetRegionGiB = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="false"}[%s]%s)) by (pod_name, namespace, cluster_id) / 1024 / 1024 / 1024`
  37. queryFmtNetRegionCostPerGiB = `avg(avg_over_time(kubecost_network_region_egress_cost{}[%s]%s)) by (cluster_id)`
  38. queryFmtNetInternetGiB = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="true"}[%s]%s)) by (pod_name, namespace, cluster_id) / 1024 / 1024 / 1024`
  39. queryFmtNetInternetCostPerGiB = `avg(avg_over_time(kubecost_network_internet_egress_cost{}[%s]%s)) by (cluster_id)`
  // Labels and annotations for namespaces and pods, and selector/match labels
  // for services, deployments and stateful sets.
  40. queryFmtNamespaceLabels = `avg_over_time(kube_namespace_labels[%s]%s)`
  41. queryFmtNamespaceAnnotations = `avg_over_time(kube_namespace_annotations[%s]%s)`
  42. queryFmtPodLabels = `avg_over_time(kube_pod_labels[%s]%s)`
  43. queryFmtPodAnnotations = `avg_over_time(kube_pod_annotations[%s]%s)`
  44. queryFmtServiceLabels = `avg_over_time(service_selector_labels[%s]%s)`
  45. queryFmtDeploymentLabels = `avg_over_time(deployment_match_labels[%s]%s)`
  46. queryFmtStatefulSetLabels = `avg_over_time(statefulSet_match_labels[%s]%s)`
  // Despite the "Labels" suffix, these two query kube_pod_owner to relate
  // pods to their owning DaemonSet or Job.
  47. queryFmtDaemonSetLabels = `sum(avg_over_time(kube_pod_owner{owner_kind="DaemonSet"}[%s]%s)) by (pod, owner_name, namespace, cluster_id)`
  48. queryFmtJobLabels = `sum(avg_over_time(kube_pod_owner{owner_kind="Job"}[%s]%s)) by (pod, owner_name, namespace ,cluster_id)`
  49. )
  50. // ComputeAllocation uses the CostModel instance to compute an AllocationSet
  51. // for the window defined by the given start and end times. The Allocations
  52. // returned are unaggregated (i.e. down to the container level).
  53. func (cm *CostModel) ComputeAllocation(start, end time.Time, resolution time.Duration) (*kubecost.AllocationSet, error) {
  54. // 1. Build out Pod map from resolution-tuned, batched Pod start/end query
  55. // 2. Run and apply the results of the remaining queries to
  56. // 3. Build out AllocationSet from completed Pod map
  57. // Create a window spanning the requested query
  58. window := kubecost.NewWindow(&start, &end)
  59. // TODO niko/computeallocation remove log
  60. defer log.Profile(time.Now(), fmt.Sprintf("CostModel.ComputeAllocation: completed %s", window))
  61. // Create an empty AllocationSet. For safety, in the case of an error, we
  62. // should prefer to return this empty set with the error. (In the case of
  63. // no error, of course we populate the set and return it.)
  64. allocSet := kubecost.NewAllocationSet(start, end)
  65. // (1) Build out Pod map
  66. // Build out a map of Allocations as a mapping from pod-to-container-to-
  67. // underlying-Allocation instance, starting with (start, end) so that we
  68. // begin with minutes, from which we compute resource allocation and cost
  69. // totals from measured rate data.
  70. podMap := map[podKey]*Pod{}
  71. // clusterStarts and clusterEnds record the earliest start and latest end
  72. // times, respectively, on a cluster-basis. These are used for unmounted
  73. // PVs and other "virtual" Allocations so that minutes are maximally
  74. // accurate during start-up or spin-down of a cluster
  75. clusterStart := map[string]time.Time{}
  76. clusterEnd := map[string]time.Time{}
  77. cm.buildPodMap(window, resolution, env.GetETLMaxBatchDuration(), podMap, clusterStart, clusterEnd)
  78. // (2) Run and apply remaining queries
  79. // Convert window (start, end) to (duration, offset) for querying Prometheus,
  80. // including handling Thanos offset
  81. durStr, offStr, err := window.DurationOffsetForPrometheus()
  82. if err != nil {
  83. // Negative duration, so return empty set
  84. return allocSet, nil
  85. }
  86. // Convert resolution duration to a query-ready string
  87. resStr := util.DurationString(resolution)
  88. ctx := prom.NewContext(cm.PrometheusClient)
  89. startQuerying := time.Now()
  90. queryRAMBytesAllocated := fmt.Sprintf(queryFmtRAMBytesAllocated, durStr, offStr)
  91. resChRAMBytesAllocated := ctx.Query(queryRAMBytesAllocated)
  92. queryRAMRequests := fmt.Sprintf(queryFmtRAMRequests, durStr, offStr)
  93. resChRAMRequests := ctx.Query(queryRAMRequests)
  94. queryRAMUsage := fmt.Sprintf(queryFmtRAMUsage, durStr, offStr)
  95. resChRAMUsage := ctx.Query(queryRAMUsage)
  96. queryCPUCoresAllocated := fmt.Sprintf(queryFmtCPUCoresAllocated, durStr, offStr)
  97. resChCPUCoresAllocated := ctx.Query(queryCPUCoresAllocated)
  98. queryCPURequests := fmt.Sprintf(queryFmtCPURequests, durStr, offStr)
  99. resChCPURequests := ctx.Query(queryCPURequests)
  100. queryCPUUsage := fmt.Sprintf(queryFmtCPUUsage, durStr, offStr)
  101. resChCPUUsage := ctx.Query(queryCPUUsage)
  102. queryGPUsRequested := fmt.Sprintf(queryFmtGPUsRequested, durStr, offStr)
  103. resChGPUsRequested := ctx.Query(queryGPUsRequested)
  104. queryNodeCostPerCPUHr := fmt.Sprintf(queryFmtNodeCostPerCPUHr, durStr, offStr)
  105. resChNodeCostPerCPUHr := ctx.Query(queryNodeCostPerCPUHr)
  106. queryNodeCostPerRAMGiBHr := fmt.Sprintf(queryFmtNodeCostPerRAMGiBHr, durStr, offStr)
  107. resChNodeCostPerRAMGiBHr := ctx.Query(queryNodeCostPerRAMGiBHr)
  108. queryNodeCostPerGPUHr := fmt.Sprintf(queryFmtNodeCostPerGPUHr, durStr, offStr)
  109. resChNodeCostPerGPUHr := ctx.Query(queryNodeCostPerGPUHr)
  110. queryNodeIsSpot := fmt.Sprintf(queryFmtNodeIsSpot, durStr, offStr)
  111. resChNodeIsSpot := ctx.Query(queryNodeIsSpot)
  112. queryPVCInfo := fmt.Sprintf(queryFmtPVCInfo, durStr, resStr, offStr)
  113. resChPVCInfo := ctx.Query(queryPVCInfo)
  114. queryPVBytes := fmt.Sprintf(queryFmtPVBytes, durStr, offStr)
  115. resChPVBytes := ctx.Query(queryPVBytes)
  116. queryPodPVCAllocation := fmt.Sprintf(queryFmtPodPVCAllocation, durStr, offStr)
  117. resChPodPVCAllocation := ctx.Query(queryPodPVCAllocation)
  118. queryPVCBytesRequested := fmt.Sprintf(queryFmtPVCBytesRequested, durStr, offStr)
  119. resChPVCBytesRequested := ctx.Query(queryPVCBytesRequested)
  120. queryPVCostPerGiBHour := fmt.Sprintf(queryFmtPVCostPerGiBHour, durStr, offStr)
  121. resChPVCostPerGiBHour := ctx.Query(queryPVCostPerGiBHour)
  122. queryNetZoneGiB := fmt.Sprintf(queryFmtNetZoneGiB, durStr, offStr)
  123. resChNetZoneGiB := ctx.Query(queryNetZoneGiB)
  124. queryNetZoneCostPerGiB := fmt.Sprintf(queryFmtNetZoneCostPerGiB, durStr, offStr)
  125. resChNetZoneCostPerGiB := ctx.Query(queryNetZoneCostPerGiB)
  126. queryNetRegionGiB := fmt.Sprintf(queryFmtNetRegionGiB, durStr, offStr)
  127. resChNetRegionGiB := ctx.Query(queryNetRegionGiB)
  128. queryNetRegionCostPerGiB := fmt.Sprintf(queryFmtNetRegionCostPerGiB, durStr, offStr)
  129. resChNetRegionCostPerGiB := ctx.Query(queryNetRegionCostPerGiB)
  130. queryNetInternetGiB := fmt.Sprintf(queryFmtNetInternetGiB, durStr, offStr)
  131. resChNetInternetGiB := ctx.Query(queryNetInternetGiB)
  132. queryNetInternetCostPerGiB := fmt.Sprintf(queryFmtNetInternetCostPerGiB, durStr, offStr)
  133. resChNetInternetCostPerGiB := ctx.Query(queryNetInternetCostPerGiB)
  134. queryNamespaceLabels := fmt.Sprintf(queryFmtNamespaceLabels, durStr, offStr)
  135. resChNamespaceLabels := ctx.Query(queryNamespaceLabels)
  136. queryNamespaceAnnotations := fmt.Sprintf(queryFmtNamespaceAnnotations, durStr, offStr)
  137. resChNamespaceAnnotations := ctx.Query(queryNamespaceAnnotations)
  138. queryPodLabels := fmt.Sprintf(queryFmtPodLabels, durStr, offStr)
  139. resChPodLabels := ctx.Query(queryPodLabels)
  140. queryPodAnnotations := fmt.Sprintf(queryFmtPodAnnotations, durStr, offStr)
  141. resChPodAnnotations := ctx.Query(queryPodAnnotations)
  142. queryServiceLabels := fmt.Sprintf(queryFmtServiceLabels, durStr, offStr)
  143. resChServiceLabels := ctx.Query(queryServiceLabels)
  144. queryDeploymentLabels := fmt.Sprintf(queryFmtDeploymentLabels, durStr, offStr)
  145. resChDeploymentLabels := ctx.Query(queryDeploymentLabels)
  146. queryStatefulSetLabels := fmt.Sprintf(queryFmtStatefulSetLabels, durStr, offStr)
  147. resChStatefulSetLabels := ctx.Query(queryStatefulSetLabels)
  148. queryDaemonSetLabels := fmt.Sprintf(queryFmtDaemonSetLabels, durStr, offStr)
  149. resChDaemonSetLabels := ctx.Query(queryDaemonSetLabels)
  150. queryJobLabels := fmt.Sprintf(queryFmtJobLabels, durStr, offStr)
  151. resChJobLabels := ctx.Query(queryJobLabels)
  152. resCPUCoresAllocated, _ := resChCPUCoresAllocated.Await()
  153. resCPURequests, _ := resChCPURequests.Await()
  154. resCPUUsage, _ := resChCPUUsage.Await()
  155. resRAMBytesAllocated, _ := resChRAMBytesAllocated.Await()
  156. resRAMRequests, _ := resChRAMRequests.Await()
  157. resRAMUsage, _ := resChRAMUsage.Await()
  158. resGPUsRequested, _ := resChGPUsRequested.Await()
  159. resNodeCostPerCPUHr, _ := resChNodeCostPerCPUHr.Await()
  160. resNodeCostPerRAMGiBHr, _ := resChNodeCostPerRAMGiBHr.Await()
  161. resNodeCostPerGPUHr, _ := resChNodeCostPerGPUHr.Await()
  162. resNodeIsSpot, _ := resChNodeIsSpot.Await()
  163. resPVBytes, _ := resChPVBytes.Await()
  164. resPVCostPerGiBHour, _ := resChPVCostPerGiBHour.Await()
  165. resPVCInfo, _ := resChPVCInfo.Await()
  166. resPVCBytesRequested, _ := resChPVCBytesRequested.Await()
  167. resPodPVCAllocation, _ := resChPodPVCAllocation.Await()
  168. resNetZoneGiB, _ := resChNetZoneGiB.Await()
  169. resNetZoneCostPerGiB, _ := resChNetZoneCostPerGiB.Await()
  170. resNetRegionGiB, _ := resChNetRegionGiB.Await()
  171. resNetRegionCostPerGiB, _ := resChNetRegionCostPerGiB.Await()
  172. resNetInternetGiB, _ := resChNetInternetGiB.Await()
  173. resNetInternetCostPerGiB, _ := resChNetInternetCostPerGiB.Await()
  174. resNamespaceLabels, _ := resChNamespaceLabels.Await()
  175. resNamespaceAnnotations, _ := resChNamespaceAnnotations.Await()
  176. resPodLabels, _ := resChPodLabels.Await()
  177. resPodAnnotations, _ := resChPodAnnotations.Await()
  178. resServiceLabels, _ := resChServiceLabels.Await()
  179. resDeploymentLabels, _ := resChDeploymentLabels.Await()
  180. resStatefulSetLabels, _ := resChStatefulSetLabels.Await()
  181. resDaemonSetLabels, _ := resChDaemonSetLabels.Await()
  182. resJobLabels, _ := resChJobLabels.Await()
  183. log.Profile(startQuerying, "CostModel.ComputeAllocation: queries complete")
  184. if ctx.HasErrors() {
  185. for _, err := range ctx.Errors() {
  186. log.Errorf("CostModel.ComputeAllocation: %s", err)
  187. }
  188. return allocSet, ctx.ErrorCollection()
  189. }
  190. defer log.Profile(time.Now(), "CostModel.ComputeAllocation: processing complete")
  191. applyCPUCoresAllocated(podMap, resCPUCoresAllocated)
  192. applyCPUCoresRequested(podMap, resCPURequests)
  193. applyCPUCoresUsed(podMap, resCPUUsage)
  194. applyRAMBytesAllocated(podMap, resRAMBytesAllocated)
  195. applyRAMBytesRequested(podMap, resRAMRequests)
  196. applyRAMBytesUsed(podMap, resRAMUsage)
  197. applyGPUsRequested(podMap, resGPUsRequested)
  198. applyNetworkAllocation(podMap, resNetZoneGiB, resNetZoneCostPerGiB)
  199. applyNetworkAllocation(podMap, resNetRegionGiB, resNetRegionCostPerGiB)
  200. applyNetworkAllocation(podMap, resNetInternetGiB, resNetInternetCostPerGiB)
  201. namespaceLabels := resToNamespaceLabels(resNamespaceLabels)
  202. podLabels := resToPodLabels(resPodLabels)
  203. namespaceAnnotations := resToNamespaceAnnotations(resNamespaceAnnotations)
  204. podAnnotations := resToPodAnnotations(resPodAnnotations)
  205. applyLabels(podMap, namespaceLabels, podLabels)
  206. applyAnnotations(podMap, namespaceAnnotations, podAnnotations)
  207. serviceLabels := getServiceLabels(resServiceLabels)
  208. applyServicesToPods(podMap, podLabels, serviceLabels)
  209. podDeploymentMap := labelsToPodControllerMap(podLabels, resToDeploymentLabels(resDeploymentLabels))
  210. podStatefulSetMap := labelsToPodControllerMap(podLabels, resToStatefulSetLabels(resStatefulSetLabels))
  211. podDaemonSetMap := resToPodDaemonSetMap(resDaemonSetLabels)
  212. podJobMap := resToPodJobMap(resJobLabels)
  213. applyControllersToPods(podMap, podDeploymentMap)
  214. applyControllersToPods(podMap, podStatefulSetMap)
  215. applyControllersToPods(podMap, podDaemonSetMap)
  216. applyControllersToPods(podMap, podJobMap)
  217. // TODO breakdown network costs?
  218. // Build out a map of Nodes with resource costs, discounts, and node types
  219. // for converting resource allocation data to cumulative costs.
  220. nodeMap := map[nodeKey]*NodePricing{}
  221. applyNodeCostPerCPUHr(nodeMap, resNodeCostPerCPUHr)
  222. applyNodeCostPerRAMGiBHr(nodeMap, resNodeCostPerRAMGiBHr)
  223. applyNodeCostPerGPUHr(nodeMap, resNodeCostPerGPUHr)
  224. applyNodeSpot(nodeMap, resNodeIsSpot)
  225. applyNodeDiscount(nodeMap, cm)
  226. // Build out the map of all PVs with class, size and cost-per-hour.
  227. // Note: this does not record time running, which we may want to
  228. // include later for increased PV precision. (As long as the PV has
  229. // a PVC, we get time running there, so this is only inaccurate
  230. // for short-lived, unmounted PVs.)
  231. pvMap := map[pvKey]*PV{}
  232. buildPVMap(pvMap, resPVCostPerGiBHour)
  233. applyPVBytes(pvMap, resPVBytes)
  234. // Build out the map of all PVCs with time running, bytes requested,
  235. // and connect to the correct PV from pvMap. (If no PV exists, that
  236. // is noted, but does not result in any allocation/cost.)
  237. pvcMap := map[pvcKey]*PVC{}
  238. buildPVCMap(window, pvcMap, pvMap, resPVCInfo)
  239. applyPVCBytesRequested(pvcMap, resPVCBytesRequested)
  240. // Build out the relationships of pods to their PVCs. This step
  241. // populates the PVC.Count field so that PVC allocation can be
  242. // split appropriately among each pod's container allocation.
  243. podPVCMap := map[podKey][]*PVC{}
  244. buildPodPVCMap(podPVCMap, pvMap, pvcMap, podMap, resPodPVCAllocation)
  245. // Identify unmounted PVs (PVs without PVCs) and add one Allocation per
  246. // cluster representing each cluster's unmounted PVs (if necessary).
  247. applyUnmountedPVs(window, podMap, pvMap, pvcMap)
  248. // (3) Build out AllocationSet from Pod map
  249. for _, pod := range podMap {
  250. for _, alloc := range pod.Allocations {
  251. cluster, _ := alloc.Properties.GetCluster()
  252. nodeName, _ := alloc.Properties.GetNode()
  253. namespace, _ := alloc.Properties.GetNamespace()
  254. pod, _ := alloc.Properties.GetPod()
  255. container, _ := alloc.Properties.GetContainer()
  256. podKey := newPodKey(cluster, namespace, pod)
  257. nodeKey := newNodeKey(cluster, nodeName)
  258. node := cm.getNodePricing(nodeMap, nodeKey)
  259. alloc.CPUCost = alloc.CPUCoreHours * node.CostPerCPUHr
  260. alloc.RAMCost = (alloc.RAMByteHours / 1024 / 1024 / 1024) * node.CostPerRAMGiBHr
  261. alloc.GPUCost = alloc.GPUHours * node.CostPerGPUHr
  262. if pvcs, ok := podPVCMap[podKey]; ok {
  263. for _, pvc := range pvcs {
  264. // Determine the (start, end) of the relationship between the
  265. // given PVC and the associated Allocation so that a precise
  266. // number of hours can be used to compute cumulative cost.
  267. s, e := alloc.Start, alloc.End
  268. if pvc.Start.After(alloc.Start) {
  269. s = pvc.Start
  270. }
  271. if pvc.End.Before(alloc.End) {
  272. e = pvc.End
  273. }
  274. minutes := e.Sub(s).Minutes()
  275. hrs := minutes / 60.0
  276. count := float64(pvc.Count)
  277. if pvc.Count < 1 {
  278. count = 1
  279. }
  280. gib := pvc.Bytes / 1024 / 1024 / 1024
  281. cost := pvc.Volume.CostPerGiBHour * gib * hrs
  282. // Apply the size and cost of the PV to the allocation, each
  283. // weighted by count (i.e. the number of containers in the pod)
  284. alloc.PVByteHours += pvc.Bytes * hrs / count
  285. alloc.PVCost += cost / count
  286. }
  287. }
  288. // Make sure that the name is correct (node may not be present at this
  289. // point due to it missing from queryMinutes) then insert.
  290. alloc.Name = fmt.Sprintf("%s/%s/%s/%s/%s", cluster, nodeName, namespace, pod, container)
  291. allocSet.Set(alloc)
  292. }
  293. }
  294. return allocSet, nil
  295. }
  296. func (cm *CostModel) buildPodMap(window kubecost.Window, resolution, maxBatchSize time.Duration, podMap map[podKey]*Pod, clusterStart, clusterEnd map[string]time.Time) error {
  297. // Assumes that window is positive and closed
  298. start, end := *window.Start(), *window.End()
  299. // Convert resolution duration to a query-ready string
  300. resStr := util.DurationString(resolution)
  301. ctx := prom.NewContext(cm.PrometheusClient)
  302. profile := time.Now()
  303. // Query for (start, end) by (pod, namespace, cluster) over the given
  304. // window, using the given resolution, and if necessary in batches no
  305. // larger than the given maximum batch size. If working in batches, track
  306. // overall progress by starting with (window.start, window.start) and
  307. // querying in batches no larger than maxBatchSize from start-to-end,
  308. // folding each result set into podMap as the results come back.
  309. coverage := kubecost.NewWindow(&start, &start)
  310. numQuery := 1
  311. for coverage.End().Before(end) {
  312. // Determine the (start, end) of the current batch
  313. batchStart := *coverage.End()
  314. batchEnd := coverage.End().Add(maxBatchSize)
  315. if batchEnd.After(end) {
  316. batchEnd = end
  317. }
  318. batchWindow := kubecost.NewWindow(&batchStart, &batchEnd)
  319. var resPods []*prom.QueryResult
  320. var err error
  321. maxTries := 3
  322. numTries := 0
  323. for resPods == nil && numTries < maxTries {
  324. numTries++
  325. // Convert window (start, end) to (duration, offset) for querying Prometheus,
  326. // including handling Thanos offset
  327. durStr, offStr, err := batchWindow.DurationOffsetForPrometheus()
  328. if err != nil || durStr == "" {
  329. // Negative duration, so set empty results and don't query
  330. resPods = []*prom.QueryResult{}
  331. err = nil
  332. break
  333. }
  334. // Submit and profile query
  335. queryPods := fmt.Sprintf(queryFmtPods, durStr, resStr, offStr)
  336. queryProfile := time.Now()
  337. resPods, err = ctx.Query(queryPods).Await()
  338. if err != nil {
  339. log.Profile(queryProfile, fmt.Sprintf("CostModel.ComputeAllocation: pod query %d try %d failed: %s", numQuery, numTries, queryPods))
  340. resPods = nil
  341. }
  342. }
  343. if err != nil {
  344. return err
  345. }
  346. applyPodResults(window, resolution, podMap, clusterStart, clusterEnd, resPods)
  347. coverage = coverage.ExpandEnd(batchEnd)
  348. numQuery++
  349. }
  350. log.Profile(profile, "CostModel.ComputeAllocation: pod map built")
  351. return nil
  352. }
  353. func applyPodResults(window kubecost.Window, resolution time.Duration, podMap map[podKey]*Pod, clusterStart, clusterEnd map[string]time.Time, resPods []*prom.QueryResult) {
  354. for _, res := range resPods {
  355. if len(res.Values) == 0 {
  356. log.Warningf("CostModel.ComputeAllocation: empty minutes result")
  357. continue
  358. }
  359. cluster, err := res.GetString("cluster_id")
  360. if err != nil {
  361. cluster = env.GetClusterID()
  362. }
  363. labels, err := res.GetStrings("namespace", "pod")
  364. if err != nil {
  365. log.Warningf("CostModel.ComputeAllocation: minutes query result missing field: %s", err)
  366. continue
  367. }
  368. namespace := labels["namespace"]
  369. pod := labels["pod"]
  370. key := newPodKey(cluster, namespace, pod)
  371. // allocStart and allocEnd are the timestamps of the first and last
  372. // minutes the pod was running, respectively. We subtract one resolution
  373. // from allocStart because this point will actually represent the end
  374. // of the first minute. We don't subtract from allocEnd because it
  375. // already represents the end of the last minute.
  376. var allocStart, allocEnd time.Time
  377. startAdjustmentCoeff, endAdjustmentCoeff := 1.0, 1.0
  378. for _, datum := range res.Values {
  379. t := time.Unix(int64(datum.Timestamp), 0)
  380. if allocStart.IsZero() && datum.Value > 0 && window.Contains(t) {
  381. // Set the start timestamp to the earliest non-zero timestamp
  382. allocStart = t
  383. // Record adjustment coefficient, i.e. the portion of the start
  384. // timestamp to "ignore". That is, sometimes the value will be
  385. // 0.5, meaning that we should discount the time running by
  386. // half of the resolution the timestamp stands for.
  387. startAdjustmentCoeff = (1.0 - datum.Value)
  388. }
  389. if datum.Value > 0 && window.Contains(t) {
  390. // Set the end timestamp to the latest non-zero timestamp
  391. allocEnd = t
  392. // If the end timestamp differs from the start, then record the
  393. // adjustment coefficient, i.e. the portion of the end
  394. // timestamp to "ignore". That is, sometimes the value will be
  395. // 0.5, meaning that we should discount the time running by
  396. // half of the resolution the timestamp stands for.
  397. if !allocStart.Equal(t) {
  398. endAdjustmentCoeff = (1.0 - datum.Value)
  399. }
  400. }
  401. }
  402. if allocStart.IsZero() || allocEnd.IsZero() {
  403. continue
  404. }
  405. // Adjust timestamps accorind to the resolution and the adjustment
  406. // coefficients, as described above. That is, count the start timestamp
  407. // from the beginning of the resolution, not the end. Then "reduce" the
  408. // start and end by the correct amount, in the case that the "running"
  409. // value of the first or last timestamp was not a full 1.0.
  410. allocStart = allocStart.Add(-resolution)
  411. allocStart = allocStart.Add(time.Duration(startAdjustmentCoeff) * resolution)
  412. allocEnd = allocEnd.Add(-time.Duration(endAdjustmentCoeff) * resolution)
  413. // Set start if unset or this datum's start time is earlier than the
  414. // current earliest time.
  415. if _, ok := clusterStart[cluster]; !ok || allocStart.Before(clusterStart[cluster]) {
  416. clusterStart[cluster] = allocStart
  417. }
  418. // Set end if unset or this datum's end time is later than the
  419. // current latest time.
  420. if _, ok := clusterEnd[cluster]; !ok || allocEnd.After(clusterEnd[cluster]) {
  421. clusterEnd[cluster] = allocEnd
  422. }
  423. if pod, ok := podMap[key]; ok {
  424. // Pod has already been recorded, so update it accordingly
  425. if allocStart.Before(pod.Start) {
  426. pod.Start = allocStart
  427. }
  428. if allocEnd.After(pod.End) {
  429. pod.End = allocEnd
  430. }
  431. } else {
  432. // Pod has not been recorded yet, so insert it
  433. podMap[key] = &Pod{
  434. Window: window.Clone(),
  435. Start: allocStart,
  436. End: allocEnd,
  437. Key: key,
  438. Allocations: map[string]*kubecost.Allocation{},
  439. }
  440. }
  441. }
  442. }
  443. func applyCPUCoresAllocated(podMap map[podKey]*Pod, resCPUCoresAllocated []*prom.QueryResult) {
  444. for _, res := range resCPUCoresAllocated {
  445. key, err := resultPodKey(res, "cluster_id", "namespace", "pod")
  446. if err != nil {
  447. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU allocation result missing field: %s", err)
  448. continue
  449. }
  450. pod, ok := podMap[key]
  451. if !ok {
  452. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU allocation result for unidentified pod: %s", key)
  453. continue
  454. }
  455. container, err := res.GetString("container")
  456. if err != nil {
  457. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU allocation query result missing 'container': %s", key)
  458. continue
  459. }
  460. if _, ok := pod.Allocations[container]; !ok {
  461. pod.AppendContainer(container)
  462. }
  463. cpuCores := res.Values[0].Value
  464. hours := pod.Allocations[container].Minutes() / 60.0
  465. pod.Allocations[container].CPUCoreHours = cpuCores * hours
  466. node, err := res.GetString("node")
  467. if err != nil {
  468. log.Warningf("CostModel.ComputeAllocation: CPU allocation query result missing 'node': %s", key)
  469. continue
  470. }
  471. pod.Allocations[container].Properties.SetNode(node)
  472. }
  473. }
  474. func applyCPUCoresRequested(podMap map[podKey]*Pod, resCPUCoresRequested []*prom.QueryResult) {
  475. for _, res := range resCPUCoresRequested {
  476. key, err := resultPodKey(res, "cluster_id", "namespace", "pod")
  477. if err != nil {
  478. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU request result missing field: %s", err)
  479. continue
  480. }
  481. pod, ok := podMap[key]
  482. if !ok {
  483. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU request result for unidentified pod: %s", key)
  484. continue
  485. }
  486. container, err := res.GetString("container")
  487. if err != nil {
  488. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU request query result missing 'container': %s", key)
  489. continue
  490. }
  491. if _, ok := pod.Allocations[container]; !ok {
  492. pod.AppendContainer(container)
  493. }
  494. pod.Allocations[container].CPUCoreRequestAverage = res.Values[0].Value
  495. // If CPU allocation is less than requests, set CPUCoreHours to
  496. // request level.
  497. if pod.Allocations[container].CPUCores() < res.Values[0].Value {
  498. pod.Allocations[container].CPUCoreHours = res.Values[0].Value * (pod.Allocations[container].Minutes() / 60.0)
  499. }
  500. node, err := res.GetString("node")
  501. if err != nil {
  502. log.Warningf("CostModel.ComputeAllocation: CPU request query result missing 'node': %s", key)
  503. continue
  504. }
  505. pod.Allocations[container].Properties.SetNode(node)
  506. }
  507. }
  508. func applyCPUCoresUsed(podMap map[podKey]*Pod, resCPUCoresUsed []*prom.QueryResult) {
  509. for _, res := range resCPUCoresUsed {
  510. key, err := resultPodKey(res, "cluster_id", "namespace", "pod_name")
  511. if err != nil {
  512. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage result missing field: %s", err)
  513. continue
  514. }
  515. pod, ok := podMap[key]
  516. if !ok {
  517. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage result for unidentified pod: %s", key)
  518. continue
  519. }
  520. container, err := res.GetString("container_name")
  521. if err != nil {
  522. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage query result missing 'container': %s", key)
  523. continue
  524. }
  525. if _, ok := pod.Allocations[container]; !ok {
  526. pod.AppendContainer(container)
  527. }
  528. pod.Allocations[container].CPUCoreUsageAverage = res.Values[0].Value
  529. }
  530. }
  531. func applyRAMBytesAllocated(podMap map[podKey]*Pod, resRAMBytesAllocated []*prom.QueryResult) {
  532. for _, res := range resRAMBytesAllocated {
  533. key, err := resultPodKey(res, "cluster_id", "namespace", "pod")
  534. if err != nil {
  535. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM allocation result missing field: %s", err)
  536. continue
  537. }
  538. pod, ok := podMap[key]
  539. if !ok {
  540. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM allocation result for unidentified pod: %s", key)
  541. continue
  542. }
  543. container, err := res.GetString("container")
  544. if err != nil {
  545. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM allocation query result missing 'container': %s", key)
  546. continue
  547. }
  548. if _, ok := pod.Allocations[container]; !ok {
  549. pod.AppendContainer(container)
  550. }
  551. ramBytes := res.Values[0].Value
  552. hours := pod.Allocations[container].Minutes() / 60.0
  553. pod.Allocations[container].RAMByteHours = ramBytes * hours
  554. node, err := res.GetString("node")
  555. if err != nil {
  556. log.Warningf("CostModel.ComputeAllocation: RAM allocation query result missing 'node': %s", key)
  557. continue
  558. }
  559. pod.Allocations[container].Properties.SetNode(node)
  560. }
  561. }
  562. func applyRAMBytesRequested(podMap map[podKey]*Pod, resRAMBytesRequested []*prom.QueryResult) {
  563. for _, res := range resRAMBytesRequested {
  564. key, err := resultPodKey(res, "cluster_id", "namespace", "pod")
  565. if err != nil {
  566. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM request result missing field: %s", err)
  567. continue
  568. }
  569. pod, ok := podMap[key]
  570. if !ok {
  571. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM request result for unidentified pod: %s", key)
  572. continue
  573. }
  574. container, err := res.GetString("container")
  575. if err != nil {
  576. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM request query result missing 'container': %s", key)
  577. continue
  578. }
  579. if _, ok := pod.Allocations[container]; !ok {
  580. pod.AppendContainer(container)
  581. }
  582. pod.Allocations[container].RAMBytesRequestAverage = res.Values[0].Value
  583. // If RAM allocation is less than requests, set RAMByteHours to
  584. // request level.
  585. if pod.Allocations[container].RAMBytes() < res.Values[0].Value {
  586. pod.Allocations[container].RAMByteHours = res.Values[0].Value * (pod.Allocations[container].Minutes() / 60.0)
  587. }
  588. node, err := res.GetString("node")
  589. if err != nil {
  590. log.Warningf("CostModel.ComputeAllocation: RAM request query result missing 'node': %s", key)
  591. continue
  592. }
  593. pod.Allocations[container].Properties.SetNode(node)
  594. }
  595. }
  596. func applyRAMBytesUsed(podMap map[podKey]*Pod, resRAMBytesUsed []*prom.QueryResult) {
  597. for _, res := range resRAMBytesUsed {
  598. key, err := resultPodKey(res, "cluster_id", "namespace", "pod_name")
  599. if err != nil {
  600. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM usage result missing field: %s", err)
  601. continue
  602. }
  603. pod, ok := podMap[key]
  604. if !ok {
  605. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM usage result for unidentified pod: %s", key)
  606. continue
  607. }
  608. container, err := res.GetString("container_name")
  609. if err != nil {
  610. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM usage query result missing 'container': %s", key)
  611. continue
  612. }
  613. if _, ok := pod.Allocations[container]; !ok {
  614. pod.AppendContainer(container)
  615. }
  616. pod.Allocations[container].RAMBytesUsageAverage = res.Values[0].Value
  617. }
  618. }
  619. func applyGPUsRequested(podMap map[podKey]*Pod, resGPUsRequested []*prom.QueryResult) {
  620. for _, res := range resGPUsRequested {
  621. key, err := resultPodKey(res, "cluster_id", "namespace", "pod")
  622. if err != nil {
  623. log.DedupedWarningf(10, "CostModel.ComputeAllocation: GPU request result missing field: %s", err)
  624. continue
  625. }
  626. pod, ok := podMap[key]
  627. if !ok {
  628. log.DedupedWarningf(10, "CostModel.ComputeAllocation: GPU request result for unidentified pod: %s", key)
  629. continue
  630. }
  631. container, err := res.GetString("container")
  632. if err != nil {
  633. log.DedupedWarningf(10, "CostModel.ComputeAllocation: GPU request query result missing 'container': %s", key)
  634. continue
  635. }
  636. if _, ok := pod.Allocations[container]; !ok {
  637. pod.AppendContainer(container)
  638. }
  639. hrs := pod.Allocations[container].Minutes() / 60.0
  640. pod.Allocations[container].GPUHours = res.Values[0].Value * hrs
  641. }
  642. }
  643. func applyNetworkAllocation(podMap map[podKey]*Pod, resNetworkGiB []*prom.QueryResult, resNetworkCostPerGiB []*prom.QueryResult) {
  644. costPerGiBByCluster := map[string]float64{}
  645. for _, res := range resNetworkCostPerGiB {
  646. cluster, err := res.GetString("cluster_id")
  647. if err != nil {
  648. cluster = env.GetClusterID()
  649. }
  650. costPerGiBByCluster[cluster] = res.Values[0].Value
  651. }
  652. for _, res := range resNetworkGiB {
  653. podKey, err := resultPodKey(res, "cluster_id", "namespace", "pod_name")
  654. if err != nil {
  655. log.DedupedWarningf(10, "CostModel.ComputeAllocation: Network allocation query result missing field: %s", err)
  656. continue
  657. }
  658. pod, ok := podMap[podKey]
  659. if !ok {
  660. log.DedupedWarningf(10, "CostModel.ComputeAllocation: Network allocation query result for unidentified pod: %s", podKey)
  661. continue
  662. }
  663. for _, alloc := range pod.Allocations {
  664. gib := res.Values[0].Value / float64(len(pod.Allocations))
  665. costPerGiB := costPerGiBByCluster[podKey.Cluster]
  666. alloc.NetworkCost = gib * costPerGiB
  667. }
  668. }
  669. }
  670. func resToNamespaceLabels(resNamespaceLabels []*prom.QueryResult) map[string]map[string]string {
  671. namespaceLabels := map[string]map[string]string{}
  672. for _, res := range resNamespaceLabels {
  673. namespace, err := res.GetString("namespace")
  674. if err != nil {
  675. continue
  676. }
  677. if _, ok := namespaceLabels[namespace]; !ok {
  678. namespaceLabels[namespace] = map[string]string{}
  679. }
  680. for k, l := range res.GetLabels() {
  681. namespaceLabels[namespace][k] = l
  682. }
  683. }
  684. return namespaceLabels
  685. }
  686. func resToPodLabels(resPodLabels []*prom.QueryResult) map[podKey]map[string]string {
  687. podLabels := map[podKey]map[string]string{}
  688. for _, res := range resPodLabels {
  689. podKey, err := resultPodKey(res, "cluster_id", "namespace", "pod")
  690. if err != nil {
  691. continue
  692. }
  693. if _, ok := podLabels[podKey]; !ok {
  694. podLabels[podKey] = map[string]string{}
  695. }
  696. for k, l := range res.GetLabels() {
  697. podLabels[podKey][k] = l
  698. }
  699. }
  700. return podLabels
  701. }
  702. func resToNamespaceAnnotations(resNamespaceAnnotations []*prom.QueryResult) map[string]map[string]string {
  703. namespaceAnnotations := map[string]map[string]string{}
  704. for _, res := range resNamespaceAnnotations {
  705. namespace, err := res.GetString("namespace")
  706. if err != nil {
  707. continue
  708. }
  709. if _, ok := namespaceAnnotations[namespace]; !ok {
  710. namespaceAnnotations[namespace] = map[string]string{}
  711. }
  712. for k, l := range res.GetAnnotations() {
  713. namespaceAnnotations[namespace][k] = l
  714. }
  715. }
  716. return namespaceAnnotations
  717. }
  718. func resToPodAnnotations(resPodAnnotations []*prom.QueryResult) map[podKey]map[string]string {
  719. podAnnotations := map[podKey]map[string]string{}
  720. for _, res := range resPodAnnotations {
  721. podKey, err := resultPodKey(res, "cluster_id", "namespace", "pod")
  722. if err != nil {
  723. continue
  724. }
  725. if _, ok := podAnnotations[podKey]; !ok {
  726. podAnnotations[podKey] = map[string]string{}
  727. }
  728. for k, l := range res.GetAnnotations() {
  729. podAnnotations[podKey][k] = l
  730. }
  731. }
  732. return podAnnotations
  733. }
  734. func applyLabels(podMap map[podKey]*Pod, namespaceLabels map[string]map[string]string, podLabels map[podKey]map[string]string) {
  735. for key, pod := range podMap {
  736. for _, alloc := range pod.Allocations {
  737. allocLabels, err := alloc.Properties.GetLabels()
  738. if err != nil {
  739. allocLabels = map[string]string{}
  740. }
  741. // Apply namespace labels first, then pod labels so that pod labels
  742. // overwrite namespace labels.
  743. if labels, ok := namespaceLabels[key.Namespace]; ok {
  744. for k, v := range labels {
  745. allocLabels[k] = v
  746. }
  747. }
  748. if labels, ok := podLabels[key]; ok {
  749. for k, v := range labels {
  750. allocLabels[k] = v
  751. }
  752. }
  753. alloc.Properties.SetLabels(allocLabels)
  754. }
  755. }
  756. }
  757. func applyAnnotations(podMap map[podKey]*Pod, namespaceAnnotations map[string]map[string]string, podAnnotations map[podKey]map[string]string) {
  758. for key, pod := range podMap {
  759. for _, alloc := range pod.Allocations {
  760. allocAnnotations, err := alloc.Properties.GetAnnotations()
  761. if err != nil {
  762. allocAnnotations = map[string]string{}
  763. }
  764. // Apply namespace annotations first, then pod annotations so that
  765. // pod labels overwrite namespace labels.
  766. if labels, ok := namespaceAnnotations[key.Namespace]; ok {
  767. for k, v := range labels {
  768. allocAnnotations[k] = v
  769. }
  770. }
  771. if labels, ok := podAnnotations[key]; ok {
  772. for k, v := range labels {
  773. allocAnnotations[k] = v
  774. }
  775. }
  776. alloc.Properties.SetAnnotations(allocAnnotations)
  777. }
  778. }
  779. }
  780. func getServiceLabels(resServiceLabels []*prom.QueryResult) map[serviceKey]map[string]string {
  781. serviceLabels := map[serviceKey]map[string]string{}
  782. for _, res := range resServiceLabels {
  783. serviceKey, err := resultServiceKey(res, "cluster_id", "namespace", "service")
  784. if err != nil {
  785. continue
  786. }
  787. if _, ok := serviceLabels[serviceKey]; !ok {
  788. serviceLabels[serviceKey] = map[string]string{}
  789. }
  790. for k, l := range res.GetLabels() {
  791. serviceLabels[serviceKey][k] = l
  792. }
  793. }
  794. // Prune duplicate services. That is, if the same service exists with
  795. // hyphens instead of underscores, keep the one that uses hyphens.
  796. for key := range serviceLabels {
  797. duplicateService := strings.Replace(key.Service, "_", "-", -1)
  798. duplicateKey := newServiceKey(key.Cluster, key.Namespace, duplicateService)
  799. if _, ok := serviceLabels[duplicateKey]; ok {
  800. delete(serviceLabels, key)
  801. }
  802. }
  803. return serviceLabels
  804. }
  805. func resToDeploymentLabels(resDeploymentLabels []*prom.QueryResult) map[controllerKey]map[string]string {
  806. deploymentLabels := map[controllerKey]map[string]string{}
  807. for _, res := range resDeploymentLabels {
  808. controllerKey, err := resultDeploymentKey(res, "cluster_id", "namespace", "deployment")
  809. if err != nil {
  810. continue
  811. }
  812. if _, ok := deploymentLabels[controllerKey]; !ok {
  813. deploymentLabels[controllerKey] = map[string]string{}
  814. }
  815. for k, l := range res.GetLabels() {
  816. deploymentLabels[controllerKey][k] = l
  817. }
  818. }
  819. // Prune duplicate deployments. That is, if the same deployment exists with
  820. // hyphens instead of underscores, keep the one that uses hyphens.
  821. for key := range deploymentLabels {
  822. duplicateController := strings.Replace(key.Controller, "_", "-", -1)
  823. duplicateKey := newControllerKey(key.Cluster, key.Namespace, key.ControllerKind, duplicateController)
  824. if _, ok := deploymentLabels[duplicateKey]; ok {
  825. delete(deploymentLabels, key)
  826. }
  827. }
  828. return deploymentLabels
  829. }
  830. func resToStatefulSetLabels(resStatefulSetLabels []*prom.QueryResult) map[controllerKey]map[string]string {
  831. statefulSetLabels := map[controllerKey]map[string]string{}
  832. for _, res := range resStatefulSetLabels {
  833. controllerKey, err := resultStatefulSetKey(res, "cluster_id", "namespace", "statefulSet")
  834. if err != nil {
  835. continue
  836. }
  837. if _, ok := statefulSetLabels[controllerKey]; !ok {
  838. statefulSetLabels[controllerKey] = map[string]string{}
  839. }
  840. for k, l := range res.GetLabels() {
  841. statefulSetLabels[controllerKey][k] = l
  842. }
  843. }
  844. // Prune duplicate stateful sets. That is, if the same stateful set exists
  845. // with hyphens instead of underscores, keep the one that uses hyphens.
  846. for key := range statefulSetLabels {
  847. duplicateController := strings.Replace(key.Controller, "_", "-", -1)
  848. duplicateKey := newControllerKey(key.Cluster, key.Namespace, key.ControllerKind, duplicateController)
  849. if _, ok := statefulSetLabels[duplicateKey]; ok {
  850. delete(statefulSetLabels, key)
  851. }
  852. }
  853. return statefulSetLabels
  854. }
  855. func labelsToPodControllerMap(podLabels map[podKey]map[string]string, controllerLabels map[controllerKey]map[string]string) map[podKey]controllerKey {
  856. podControllerMap := map[podKey]controllerKey{}
  857. // For each controller, turn the labels into a selector and attempt to
  858. // match it with each set of pod labels. A match indicates that the pod
  859. // belongs to the controller.
  860. for cKey, cLabels := range controllerLabels {
  861. selector := labels.Set(cLabels).AsSelectorPreValidated()
  862. for pKey, pLabels := range podLabels {
  863. // If the pod is in a different cluster or namespace, there is
  864. // no need to compare the labels.
  865. if cKey.Cluster != pKey.Cluster || cKey.Namespace != pKey.Namespace {
  866. continue
  867. }
  868. podLabelSet := labels.Set(pLabels)
  869. if selector.Matches(podLabelSet) {
  870. if _, ok := podControllerMap[pKey]; ok {
  871. log.Warningf("CostModel.ComputeAllocation: PodControllerMap match already exists: %s matches %s and %s", pKey, podControllerMap[pKey], cKey)
  872. }
  873. podControllerMap[pKey] = cKey
  874. }
  875. }
  876. }
  877. return podControllerMap
  878. }
  879. func resToPodDaemonSetMap(resDaemonSetLabels []*prom.QueryResult) map[podKey]controllerKey {
  880. daemonSetLabels := map[podKey]controllerKey{}
  881. for _, res := range resDaemonSetLabels {
  882. controllerKey, err := resultDaemonSetKey(res, "cluster_id", "namespace", "owner_name")
  883. if err != nil {
  884. continue
  885. }
  886. pod, err := res.GetString("pod")
  887. if err != nil {
  888. log.Warningf("CostModel.ComputeAllocation: DaemonSetLabel result without pod: %s", controllerKey)
  889. }
  890. podKey := newPodKey(controllerKey.Cluster, controllerKey.Namespace, pod)
  891. daemonSetLabels[podKey] = controllerKey
  892. }
  893. return daemonSetLabels
  894. }
  895. func resToPodJobMap(resJobLabels []*prom.QueryResult) map[podKey]controllerKey {
  896. jobLabels := map[podKey]controllerKey{}
  897. for _, res := range resJobLabels {
  898. controllerKey, err := resultJobKey(res, "cluster_id", "namespace", "owner_name")
  899. if err != nil {
  900. continue
  901. }
  902. // Convert the name of Jobs generated by CronJobs to the name of the
  903. // CronJob by stripping the timestamp off the end.
  904. match := isCron.FindStringSubmatch(controllerKey.Controller)
  905. if match != nil {
  906. controllerKey.Controller = match[1]
  907. }
  908. pod, err := res.GetString("pod")
  909. if err != nil {
  910. log.Warningf("CostModel.ComputeAllocation: JobLabel result without pod: %s", controllerKey)
  911. }
  912. podKey := newPodKey(controllerKey.Cluster, controllerKey.Namespace, pod)
  913. jobLabels[podKey] = controllerKey
  914. }
  915. return jobLabels
  916. }
  917. func applyServicesToPods(podMap map[podKey]*Pod, podLabels map[podKey]map[string]string, serviceLabels map[serviceKey]map[string]string) {
  918. podServicesMap := map[podKey][]serviceKey{}
  919. // For each service, turn the labels into a selector and attempt to
  920. // match it with each set of pod labels. A match indicates that the pod
  921. // belongs to the service.
  922. for sKey, sLabels := range serviceLabels {
  923. selector := labels.Set(sLabels).AsSelectorPreValidated()
  924. for pKey, pLabels := range podLabels {
  925. // If the pod is in a different cluster or namespace, there is
  926. // no need to compare the labels.
  927. if sKey.Cluster != pKey.Cluster || sKey.Namespace != pKey.Namespace {
  928. continue
  929. }
  930. podLabelSet := labels.Set(pLabels)
  931. if selector.Matches(podLabelSet) {
  932. if _, ok := podServicesMap[pKey]; !ok {
  933. podServicesMap[pKey] = []serviceKey{}
  934. }
  935. podServicesMap[pKey] = append(podServicesMap[pKey], sKey)
  936. }
  937. }
  938. }
  939. // For each allocation in each pod, attempt to find and apply the list of
  940. // services associated with the allocation's pod.
  941. for key, pod := range podMap {
  942. for _, alloc := range pod.Allocations {
  943. if sKeys, ok := podServicesMap[key]; ok {
  944. services := []string{}
  945. for _, sKey := range sKeys {
  946. services = append(services, sKey.Service)
  947. }
  948. alloc.Properties.SetServices(services)
  949. }
  950. }
  951. }
  952. }
  953. func applyControllersToPods(podMap map[podKey]*Pod, podControllerMap map[podKey]controllerKey) {
  954. for key, pod := range podMap {
  955. for _, alloc := range pod.Allocations {
  956. if controllerKey, ok := podControllerMap[key]; ok {
  957. alloc.Properties.SetControllerKind(controllerKey.ControllerKind)
  958. alloc.Properties.SetController(controllerKey.Controller)
  959. }
  960. }
  961. }
  962. }
  963. func applyNodeCostPerCPUHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerCPUHr []*prom.QueryResult) {
  964. for _, res := range resNodeCostPerCPUHr {
  965. cluster, err := res.GetString("cluster_id")
  966. if err != nil {
  967. cluster = env.GetClusterID()
  968. }
  969. node, err := res.GetString("node")
  970. if err != nil {
  971. log.Warningf("CostModel.ComputeAllocation: Node CPU cost query result missing field: %s", err)
  972. continue
  973. }
  974. instanceType, err := res.GetString("instance_type")
  975. if err != nil {
  976. log.Warningf("CostModel.ComputeAllocation: Node CPU cost query result missing field: %s", err)
  977. continue
  978. }
  979. key := newNodeKey(cluster, node)
  980. if _, ok := nodeMap[key]; !ok {
  981. nodeMap[key] = &NodePricing{
  982. Name: node,
  983. NodeType: instanceType,
  984. }
  985. }
  986. nodeMap[key].CostPerCPUHr = res.Values[0].Value
  987. }
  988. }
  989. func applyNodeCostPerRAMGiBHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerRAMGiBHr []*prom.QueryResult) {
  990. for _, res := range resNodeCostPerRAMGiBHr {
  991. cluster, err := res.GetString("cluster_id")
  992. if err != nil {
  993. cluster = env.GetClusterID()
  994. }
  995. node, err := res.GetString("node")
  996. if err != nil {
  997. log.Warningf("CostModel.ComputeAllocation: Node RAM cost query result missing field: %s", err)
  998. continue
  999. }
  1000. instanceType, err := res.GetString("instance_type")
  1001. if err != nil {
  1002. log.Warningf("CostModel.ComputeAllocation: Node RAM cost query result missing field: %s", err)
  1003. continue
  1004. }
  1005. key := newNodeKey(cluster, node)
  1006. if _, ok := nodeMap[key]; !ok {
  1007. nodeMap[key] = &NodePricing{
  1008. Name: node,
  1009. NodeType: instanceType,
  1010. }
  1011. }
  1012. nodeMap[key].CostPerRAMGiBHr = res.Values[0].Value
  1013. }
  1014. }
  1015. func applyNodeCostPerGPUHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerGPUHr []*prom.QueryResult) {
  1016. for _, res := range resNodeCostPerGPUHr {
  1017. cluster, err := res.GetString("cluster_id")
  1018. if err != nil {
  1019. cluster = env.GetClusterID()
  1020. }
  1021. node, err := res.GetString("node")
  1022. if err != nil {
  1023. log.Warningf("CostModel.ComputeAllocation: Node GPU cost query result missing field: %s", err)
  1024. continue
  1025. }
  1026. instanceType, err := res.GetString("instance_type")
  1027. if err != nil {
  1028. log.Warningf("CostModel.ComputeAllocation: Node GPU cost query result missing field: %s", err)
  1029. continue
  1030. }
  1031. key := newNodeKey(cluster, node)
  1032. if _, ok := nodeMap[key]; !ok {
  1033. nodeMap[key] = &NodePricing{
  1034. Name: node,
  1035. NodeType: instanceType,
  1036. }
  1037. }
  1038. nodeMap[key].CostPerGPUHr = res.Values[0].Value
  1039. }
  1040. }
  1041. func applyNodeSpot(nodeMap map[nodeKey]*NodePricing, resNodeIsSpot []*prom.QueryResult) {
  1042. for _, res := range resNodeIsSpot {
  1043. cluster, err := res.GetString("cluster_id")
  1044. if err != nil {
  1045. cluster = env.GetClusterID()
  1046. }
  1047. node, err := res.GetString("node")
  1048. if err != nil {
  1049. log.Warningf("CostModel.ComputeAllocation: Node spot query result missing field: %s", err)
  1050. continue
  1051. }
  1052. key := newNodeKey(cluster, node)
  1053. if _, ok := nodeMap[key]; !ok {
  1054. log.Warningf("CostModel.ComputeAllocation: Node spot query result for missing node: %s", key)
  1055. continue
  1056. }
  1057. nodeMap[key].Preemptible = res.Values[0].Value > 0
  1058. }
  1059. }
  1060. func applyNodeDiscount(nodeMap map[nodeKey]*NodePricing, cm *CostModel) {
  1061. if cm == nil {
  1062. return
  1063. }
  1064. c, err := cm.Provider.GetConfig()
  1065. if err != nil {
  1066. log.Errorf("CostModel.ComputeAllocation: applyNodeDiscount: %s", err)
  1067. return
  1068. }
  1069. discount, err := ParsePercentString(c.Discount)
  1070. if err != nil {
  1071. log.Errorf("CostModel.ComputeAllocation: applyNodeDiscount: %s", err)
  1072. return
  1073. }
  1074. negotiatedDiscount, err := ParsePercentString(c.NegotiatedDiscount)
  1075. if err != nil {
  1076. log.Errorf("CostModel.ComputeAllocation: applyNodeDiscount: %s", err)
  1077. return
  1078. }
  1079. for _, node := range nodeMap {
  1080. // TODO GKE Reserved Instances into account
  1081. node.Discount = cm.Provider.CombinedDiscountForNode(node.NodeType, node.Preemptible, discount, negotiatedDiscount)
  1082. node.CostPerCPUHr *= (1.0 - node.Discount)
  1083. node.CostPerRAMGiBHr *= (1.0 - node.Discount)
  1084. }
  1085. }
  1086. func buildPVMap(pvMap map[pvKey]*PV, resPVCostPerGiBHour []*prom.QueryResult) {
  1087. for _, res := range resPVCostPerGiBHour {
  1088. cluster, err := res.GetString("cluster_id")
  1089. if err != nil {
  1090. cluster = env.GetClusterID()
  1091. }
  1092. name, err := res.GetString("volumename")
  1093. if err != nil {
  1094. log.Warningf("CostModel.ComputeAllocation: PV cost without volumename")
  1095. continue
  1096. }
  1097. key := newPVKey(cluster, name)
  1098. pvMap[key] = &PV{
  1099. Cluster: cluster,
  1100. Name: name,
  1101. CostPerGiBHour: res.Values[0].Value,
  1102. }
  1103. }
  1104. }
  1105. func applyPVBytes(pvMap map[pvKey]*PV, resPVBytes []*prom.QueryResult) {
  1106. for _, res := range resPVBytes {
  1107. key, err := resultPVKey(res, "cluster_id", "persistentvolume")
  1108. if err != nil {
  1109. log.Warningf("CostModel.ComputeAllocation: PV bytes query result missing field: %s", err)
  1110. continue
  1111. }
  1112. if _, ok := pvMap[key]; !ok {
  1113. log.Warningf("CostModel.ComputeAllocation: PV bytes result for missing PV: %s", err)
  1114. continue
  1115. }
  1116. pvMap[key].Bytes = res.Values[0].Value
  1117. }
  1118. }
  1119. func buildPVCMap(window kubecost.Window, pvcMap map[pvcKey]*PVC, pvMap map[pvKey]*PV, resPVCInfo []*prom.QueryResult) {
  1120. for _, res := range resPVCInfo {
  1121. cluster, err := res.GetString("cluster_id")
  1122. if err != nil {
  1123. cluster = env.GetClusterID()
  1124. }
  1125. values, err := res.GetStrings("persistentvolumeclaim", "storageclass", "volumename", "namespace")
  1126. if err != nil {
  1127. log.Warningf("CostModel.ComputeAllocation: PVC info query result missing field: %s", err)
  1128. continue
  1129. }
  1130. namespace := values["namespace"]
  1131. name := values["persistentvolumeclaim"]
  1132. volume := values["volumename"]
  1133. storageClass := values["storageclass"]
  1134. pvKey := newPVKey(cluster, volume)
  1135. pvcKey := newPVCKey(cluster, namespace, name)
  1136. // pvcStart and pvcEnd are the timestamps of the first and last minutes
  1137. // the PVC was running, respectively. We subtract 1m from pvcStart
  1138. // because this point will actually represent the end of the first
  1139. // minute. We don't subtract from pvcEnd because it already represents
  1140. // the end of the last minute.
  1141. var pvcStart, pvcEnd time.Time
  1142. for _, datum := range res.Values {
  1143. t := time.Unix(int64(datum.Timestamp), 0)
  1144. if pvcStart.IsZero() && datum.Value > 0 && window.Contains(t) {
  1145. pvcStart = t
  1146. }
  1147. if datum.Value > 0 && window.Contains(t) {
  1148. pvcEnd = t
  1149. }
  1150. }
  1151. if pvcStart.IsZero() || pvcEnd.IsZero() {
  1152. log.Warningf("CostModel.ComputeAllocation: PVC %s has no running time", pvcKey)
  1153. }
  1154. pvcStart = pvcStart.Add(-time.Minute)
  1155. if _, ok := pvMap[pvKey]; !ok {
  1156. log.Warningf("CostModel.ComputeAllocation: PV missing for PVC info query result: %s", pvKey)
  1157. continue
  1158. }
  1159. pvMap[pvKey].StorageClass = storageClass
  1160. if _, ok := pvcMap[pvcKey]; !ok {
  1161. pvcMap[pvcKey] = &PVC{}
  1162. }
  1163. pvcMap[pvcKey].Name = name
  1164. pvcMap[pvcKey].Namespace = namespace
  1165. pvcMap[pvcKey].Volume = pvMap[pvKey]
  1166. pvcMap[pvcKey].Start = pvcStart
  1167. pvcMap[pvcKey].End = pvcEnd
  1168. }
  1169. }
  1170. func applyPVCBytesRequested(pvcMap map[pvcKey]*PVC, resPVCBytesRequested []*prom.QueryResult) {
  1171. for _, res := range resPVCBytesRequested {
  1172. key, err := resultPVCKey(res, "cluster_id", "namespace", "persistentvolumeclaim")
  1173. if err != nil {
  1174. log.Warningf("CostModel.ComputeAllocation: PVC bytes requested query result missing field: %s", err)
  1175. continue
  1176. }
  1177. if _, ok := pvcMap[key]; !ok {
  1178. log.Warningf("CostModel.ComputeAllocation: PVC bytes requested result for missing PVC: %s", key)
  1179. continue
  1180. }
  1181. pvcMap[key].Bytes = res.Values[0].Value
  1182. }
  1183. }
  1184. func buildPodPVCMap(podPVCMap map[podKey][]*PVC, pvMap map[pvKey]*PV, pvcMap map[pvcKey]*PVC, podMap map[podKey]*Pod, resPodPVCAllocation []*prom.QueryResult) {
  1185. for _, res := range resPodPVCAllocation {
  1186. cluster, err := res.GetString("cluster_id")
  1187. if err != nil {
  1188. cluster = env.GetClusterID()
  1189. }
  1190. values, err := res.GetStrings("persistentvolume", "persistentvolumeclaim", "pod", "namespace")
  1191. if err != nil {
  1192. log.Warningf("CostModel.ComputeAllocation: PVC allocation query result missing field: %s", err)
  1193. continue
  1194. }
  1195. namespace := values["namespace"]
  1196. pod := values["pod"]
  1197. name := values["persistentvolumeclaim"]
  1198. volume := values["persistentvolume"]
  1199. podKey := newPodKey(cluster, namespace, pod)
  1200. pvKey := newPVKey(cluster, volume)
  1201. pvcKey := newPVCKey(cluster, namespace, name)
  1202. if _, ok := pvMap[pvKey]; !ok {
  1203. log.Warningf("CostModel.ComputeAllocation: PV missing for PVC allocation query result: %s", pvKey)
  1204. continue
  1205. }
  1206. if _, ok := podPVCMap[podKey]; !ok {
  1207. podPVCMap[podKey] = []*PVC{}
  1208. }
  1209. pvc, ok := pvcMap[pvcKey]
  1210. if !ok {
  1211. log.Warningf("CostModel.ComputeAllocation: PVC missing for PVC allocation query: %s", pvcKey)
  1212. continue
  1213. }
  1214. count := 1
  1215. if pod, ok := podMap[podKey]; ok && len(pod.Allocations) > 0 {
  1216. count = len(pod.Allocations)
  1217. } else {
  1218. log.DedupedWarningf(10, "CostModel.ComputeAllocation: PVC %s for missing pod %s", pvcKey, podKey)
  1219. }
  1220. pvc.Count = count
  1221. pvc.Mounted = true
  1222. podPVCMap[podKey] = append(podPVCMap[podKey], pvc)
  1223. }
  1224. }
  1225. func applyUnmountedPVs(window kubecost.Window, podMap map[podKey]*Pod, pvMap map[pvKey]*PV, pvcMap map[pvcKey]*PVC) {
  1226. unmountedPVBytes := map[string]float64{}
  1227. unmountedPVCost := map[string]float64{}
  1228. for _, pv := range pvMap {
  1229. mounted := false
  1230. for _, pvc := range pvcMap {
  1231. if pvc.Volume == nil {
  1232. continue
  1233. }
  1234. if pvc.Volume == pv {
  1235. mounted = true
  1236. break
  1237. }
  1238. }
  1239. if !mounted {
  1240. gib := pv.Bytes / 1024 / 1024 / 1024
  1241. hrs := window.Minutes() / 60.0 // TODO improve with PV hours, not window hours
  1242. cost := pv.CostPerGiBHour * gib * hrs
  1243. unmountedPVCost[pv.Cluster] += cost
  1244. unmountedPVBytes[pv.Cluster] += pv.Bytes
  1245. }
  1246. }
  1247. for cluster, amount := range unmountedPVCost {
  1248. container := kubecost.UnmountedSuffix
  1249. pod := kubecost.UnmountedSuffix
  1250. namespace := kubecost.UnmountedSuffix
  1251. node := ""
  1252. key := newPodKey(cluster, namespace, pod)
  1253. podMap[key] = &Pod{
  1254. Window: window.Clone(),
  1255. Start: *window.Start(),
  1256. End: *window.End(),
  1257. Key: key,
  1258. Allocations: map[string]*kubecost.Allocation{},
  1259. }
  1260. podMap[key].AppendContainer(container)
  1261. podMap[key].Allocations[container].Properties.SetCluster(cluster)
  1262. podMap[key].Allocations[container].Properties.SetNode(node)
  1263. podMap[key].Allocations[container].Properties.SetNamespace(namespace)
  1264. podMap[key].Allocations[container].Properties.SetPod(pod)
  1265. podMap[key].Allocations[container].Properties.SetContainer(container)
  1266. podMap[key].Allocations[container].PVByteHours = unmountedPVBytes[cluster] * window.Minutes() / 60.0
  1267. podMap[key].Allocations[container].PVCost = amount
  1268. }
  1269. }
  1270. func applyUnmountedPVCs(window kubecost.Window, podMap map[podKey]*Pod, pvcMap map[pvcKey]*PVC) {
  1271. unmountedPVCBytes := map[namespaceKey]float64{}
  1272. unmountedPVCCost := map[namespaceKey]float64{}
  1273. for _, pvc := range pvcMap {
  1274. if !pvc.Mounted && pvc.Volume != nil {
  1275. key := newNamespaceKey(pvc.Cluster, pvc.Namespace)
  1276. gib := pvc.Volume.Bytes / 1024 / 1024 / 1024
  1277. hrs := pvc.Minutes() / 60.0
  1278. cost := pvc.Volume.CostPerGiBHour * gib * hrs
  1279. unmountedPVCCost[key] += cost
  1280. unmountedPVCBytes[key] += pvc.Volume.Bytes
  1281. }
  1282. }
  1283. for key, amount := range unmountedPVCCost {
  1284. container := kubecost.UnmountedSuffix
  1285. pod := kubecost.UnmountedSuffix
  1286. namespace := key.Namespace
  1287. node := ""
  1288. cluster := key.Cluster
  1289. podKey := newPodKey(cluster, namespace, pod)
  1290. podMap[podKey] = &Pod{
  1291. Window: window.Clone(),
  1292. Start: *window.Start(),
  1293. End: *window.End(),
  1294. Key: podKey,
  1295. Allocations: map[string]*kubecost.Allocation{},
  1296. }
  1297. podMap[podKey].AppendContainer(container)
  1298. podMap[podKey].Allocations[container].Properties.SetCluster(cluster)
  1299. podMap[podKey].Allocations[container].Properties.SetNode(node)
  1300. podMap[podKey].Allocations[container].Properties.SetNamespace(namespace)
  1301. podMap[podKey].Allocations[container].Properties.SetPod(pod)
  1302. podMap[podKey].Allocations[container].Properties.SetContainer(container)
  1303. podMap[podKey].Allocations[container].PVByteHours = unmountedPVCBytes[key] * window.Minutes() / 60.0
  1304. podMap[podKey].Allocations[container].PVCost = amount
  1305. }
  1306. }
  1307. // getNodePricing determines node pricing, given a key and a mapping from keys
  1308. // to their NodePricing instances, as well as the custom pricing configuration
  1309. // inherent to the CostModel instance. If custom pricing is set, use that. If
  1310. // not, use the pricing defined by the given key. If that doesn't exist, fall
  1311. // back on custom pricing as a default.
  1312. func (cm *CostModel) getNodePricing(nodeMap map[nodeKey]*NodePricing, nodeKey nodeKey) *NodePricing {
  1313. // Find the relevant NodePricing, if it exists. If not, substitute the
  1314. // custom NodePricing as a default.
  1315. node, ok := nodeMap[nodeKey]
  1316. if !ok || node == nil {
  1317. if nodeKey.Node != "" {
  1318. log.Warningf("CostModel: failed to find node for %s", nodeKey)
  1319. }
  1320. return cm.getCustomNodePricing(false)
  1321. }
  1322. // If custom pricing is enabled and can be retrieved, override detected
  1323. // node pricing with the custom values.
  1324. customPricingConfig, err := cm.Provider.GetConfig()
  1325. if err != nil {
  1326. log.Warningf("CostModel: failed to load custom pricing: %s", err)
  1327. }
  1328. if cloud.CustomPricesEnabled(cm.Provider) && customPricingConfig != nil {
  1329. return cm.getCustomNodePricing(node.Preemptible)
  1330. }
  1331. // If any of the values are NaN or zero, replace them with the custom
  1332. // values as default.
  1333. // TODO:CLEANUP can't we parse these custom prices once? why do we store
  1334. // them as strings like this?
  1335. if node.CostPerCPUHr == 0 || math.IsNaN(node.CostPerCPUHr) {
  1336. log.Warningf("CostModel: node pricing has illegal CostPerCPUHr; replacing with custom pricing: %s", nodeKey)
  1337. cpuCostStr := customPricingConfig.CPU
  1338. if node.Preemptible {
  1339. cpuCostStr = customPricingConfig.SpotCPU
  1340. }
  1341. costPerCPUHr, err := strconv.ParseFloat(cpuCostStr, 64)
  1342. if err != nil {
  1343. log.Warningf("CostModel: custom pricing has illegal CPU cost: %s", cpuCostStr)
  1344. }
  1345. node.CostPerCPUHr = costPerCPUHr
  1346. }
  1347. if math.IsNaN(node.CostPerGPUHr) {
  1348. log.Warningf("CostModel: node pricing has illegal CostPerGPUHr; replacing with custom pricing: %s", nodeKey)
  1349. gpuCostStr := customPricingConfig.GPU
  1350. if node.Preemptible {
  1351. gpuCostStr = customPricingConfig.SpotGPU
  1352. }
  1353. costPerGPUHr, err := strconv.ParseFloat(gpuCostStr, 64)
  1354. if err != nil {
  1355. log.Warningf("CostModel: custom pricing has illegal GPU cost: %s", gpuCostStr)
  1356. }
  1357. node.CostPerGPUHr = costPerGPUHr
  1358. }
  1359. if node.CostPerRAMGiBHr == 0 || math.IsNaN(node.CostPerRAMGiBHr) {
  1360. log.Warningf("CostModel: node pricing has illegal CostPerRAMHr; replacing with custom pricing: %s", nodeKey)
  1361. ramCostStr := customPricingConfig.RAM
  1362. if node.Preemptible {
  1363. ramCostStr = customPricingConfig.SpotRAM
  1364. }
  1365. costPerRAMHr, err := strconv.ParseFloat(ramCostStr, 64)
  1366. if err != nil {
  1367. log.Warningf("CostModel: custom pricing has illegal RAM cost: %s", ramCostStr)
  1368. }
  1369. node.CostPerRAMGiBHr = costPerRAMHr
  1370. }
  1371. return node
  1372. }
  1373. // getCustomNodePricing converts the CostModel's configured custom pricing
  1374. // values into a NodePricing instance.
  1375. func (cm *CostModel) getCustomNodePricing(spot bool) *NodePricing {
  1376. customPricingConfig, err := cm.Provider.GetConfig()
  1377. if err != nil {
  1378. return nil
  1379. }
  1380. cpuCostStr := customPricingConfig.CPU
  1381. gpuCostStr := customPricingConfig.GPU
  1382. ramCostStr := customPricingConfig.RAM
  1383. if spot {
  1384. cpuCostStr = customPricingConfig.SpotCPU
  1385. gpuCostStr = customPricingConfig.SpotGPU
  1386. ramCostStr = customPricingConfig.SpotRAM
  1387. }
  1388. node := &NodePricing{}
  1389. costPerCPUHr, err := strconv.ParseFloat(cpuCostStr, 64)
  1390. if err != nil {
  1391. log.Warningf("CostModel: custom pricing has illegal CPU cost: %s", cpuCostStr)
  1392. }
  1393. node.CostPerCPUHr = costPerCPUHr
  1394. costPerGPUHr, err := strconv.ParseFloat(gpuCostStr, 64)
  1395. if err != nil {
  1396. log.Warningf("CostModel: custom pricing has illegal GPU cost: %s", gpuCostStr)
  1397. }
  1398. node.CostPerGPUHr = costPerGPUHr
  1399. costPerRAMHr, err := strconv.ParseFloat(ramCostStr, 64)
  1400. if err != nil {
  1401. log.Warningf("CostModel: custom pricing has illegal RAM cost: %s", ramCostStr)
  1402. }
  1403. node.CostPerRAMGiBHr = costPerRAMHr
  1404. return node
  1405. }
  1406. type NodePricing struct {
  1407. Name string
  1408. NodeType string
  1409. Preemptible bool
  1410. CostPerCPUHr float64
  1411. CostPerRAMGiBHr float64
  1412. CostPerGPUHr float64
  1413. Discount float64
  1414. Source string
  1415. }
  1416. // Pod describes a running pod's start and end time within a Window and
  1417. // all the Allocations (i.e. containers) contained within it.
  1418. type Pod struct {
  1419. Window kubecost.Window
  1420. Start time.Time
  1421. End time.Time
  1422. Key podKey
  1423. Allocations map[string]*kubecost.Allocation
  1424. }
  1425. // AppendContainer adds an entry for the given container name to the Pod.
  1426. func (p Pod) AppendContainer(container string) {
  1427. name := fmt.Sprintf("%s/%s/%s/%s", p.Key.Cluster, p.Key.Namespace, p.Key.Pod, container)
  1428. alloc := &kubecost.Allocation{
  1429. Name: name,
  1430. Properties: kubecost.Properties{},
  1431. Window: p.Window.Clone(),
  1432. Start: p.Start,
  1433. End: p.End,
  1434. }
  1435. alloc.Properties.SetContainer(container)
  1436. alloc.Properties.SetPod(p.Key.Pod)
  1437. alloc.Properties.SetNamespace(p.Key.Namespace)
  1438. alloc.Properties.SetCluster(p.Key.Cluster)
  1439. p.Allocations[container] = alloc
  1440. }
  1441. // PVC describes a PersistentVolumeClaim
  1442. // TODO:CLEANUP move to pkg/kubecost?
  1443. // TODO:CLEANUP add PersistentVolumeClaims field to type Allocation?
  1444. type PVC struct {
  1445. Bytes float64 `json:"bytes"`
  1446. Count int `json:"count"`
  1447. Name string `json:"name"`
  1448. Cluster string `json:"cluster"`
  1449. Namespace string `json:"namespace"`
  1450. Volume *PV `json:"persistentVolume"`
  1451. Mounted bool `json:"mounted"`
  1452. Start time.Time `json:"start"`
  1453. End time.Time `json:"end"`
  1454. }
  1455. // Cost computes the cumulative cost of the PVC
  1456. func (pvc *PVC) Cost() float64 {
  1457. if pvc == nil || pvc.Volume == nil {
  1458. return 0.0
  1459. }
  1460. gib := pvc.Bytes / 1024 / 1024 / 1024
  1461. hrs := pvc.Minutes() / 60.0
  1462. return pvc.Volume.CostPerGiBHour * gib * hrs
  1463. }
  1464. // Minutes computes the number of minutes over which the PVC is defined
  1465. func (pvc *PVC) Minutes() float64 {
  1466. if pvc == nil {
  1467. return 0.0
  1468. }
  1469. return pvc.End.Sub(pvc.Start).Minutes()
  1470. }
  1471. // String returns a string representation of the PVC
  1472. func (pvc *PVC) String() string {
  1473. if pvc == nil {
  1474. return "<nil>"
  1475. }
  1476. return fmt.Sprintf("%s/%s/%s{Bytes:%.2f, Cost:%.6f, Start,End:%s}", pvc.Cluster, pvc.Namespace, pvc.Name, pvc.Bytes, pvc.Cost(), kubecost.NewWindow(&pvc.Start, &pvc.End))
  1477. }
  1478. // PV describes a PersistentVolume
  1479. // TODO:CLEANUP move to pkg/kubecost?
  1480. type PV struct {
  1481. Bytes float64 `json:"bytes"`
  1482. CostPerGiBHour float64 `json:"costPerGiBHour"`
  1483. Cluster string `json:"cluster"`
  1484. Name string `json:"name"`
  1485. StorageClass string `json:"storageClass"`
  1486. }
  1487. // String returns a string representation of the PV
  1488. func (pv *PV) String() string {
  1489. if pv == nil {
  1490. return "<nil>"
  1491. }
  1492. return fmt.Sprintf("%s/%s{Bytes:%.2f, Cost/GiB*Hr:%.6f, StorageClass:%s}", pv.Cluster, pv.Name, pv.Bytes, pv.CostPerGiBHour, pv.StorageClass)
  1493. }