// File: allocation.go (62 KB)
  1. package costmodel
  2. import (
  3. "fmt"
  4. "math"
  5. "strconv"
  6. "strings"
  7. "time"
  8. "github.com/kubecost/cost-model/pkg/cloud"
  9. "github.com/kubecost/cost-model/pkg/env"
  10. "github.com/kubecost/cost-model/pkg/kubecost"
  11. "github.com/kubecost/cost-model/pkg/log"
  12. "github.com/kubecost/cost-model/pkg/prom"
  13. "github.com/kubecost/cost-model/pkg/util"
  14. "k8s.io/apimachinery/pkg/labels"
  15. )
  16. const (
  17. queryFmtPods = `avg(kube_pod_container_status_running{}) by (pod, namespace, cluster_id)[%s:%s]%s`
  18. queryFmtRAMBytesAllocated = `avg(avg_over_time(container_memory_allocation_bytes{container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, cluster_id)`
  19. queryFmtRAMRequests = `avg(avg_over_time(kube_pod_container_resource_requests_memory_bytes{container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, cluster_id)`
  20. queryFmtRAMUsage = `avg(avg_over_time(container_memory_working_set_bytes{container_name!="", container_name!="POD", instance!=""}[%s]%s)) by (container_name, pod_name, namespace, instance, cluster_id)`
  21. queryFmtCPUCoresAllocated = `avg(avg_over_time(container_cpu_allocation{container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, cluster_id)`
  22. queryFmtCPURequests = `avg(avg_over_time(kube_pod_container_resource_requests_cpu_cores{container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, cluster_id)`
  23. queryFmtCPUUsage = `avg(rate(container_cpu_usage_seconds_total{container_name!="", container_name!="POD", instance!=""}[%s]%s)) by (container_name, pod_name, namespace, instance, cluster_id)`
  24. queryFmtGPUsRequested = `avg(avg_over_time(kube_pod_container_resource_requests{resource="nvidia_com_gpu", container!="",container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, cluster_id)`
  25. queryFmtNodeCostPerCPUHr = `avg(avg_over_time(node_cpu_hourly_cost[%s]%s)) by (node, cluster_id, instance_type)`
  26. queryFmtNodeCostPerRAMGiBHr = `avg(avg_over_time(node_ram_hourly_cost[%s]%s)) by (node, cluster_id, instance_type)`
  27. queryFmtNodeCostPerGPUHr = `avg(avg_over_time(node_gpu_hourly_cost[%s]%s)) by (node, cluster_id, instance_type)`
  28. queryFmtNodeIsSpot = `avg_over_time(kubecost_node_is_spot[%s]%s)`
  29. queryFmtPVCInfo = `avg(kube_persistentvolumeclaim_info{volumename != ""}) by (persistentvolumeclaim, storageclass, volumename, namespace, cluster_id)[%s:%s]%s`
  30. queryFmtPVBytes = `avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s]%s)) by (persistentvolume, cluster_id)`
  31. queryFmtPodPVCAllocation = `avg(avg_over_time(pod_pvc_allocation[%s]%s)) by (persistentvolume, persistentvolumeclaim, pod, namespace, cluster_id)`
  32. queryFmtPVCBytesRequested = `avg(avg_over_time(kube_persistentvolumeclaim_resource_requests_storage_bytes{}[%s]%s)) by (persistentvolumeclaim, namespace, cluster_id)`
  33. queryFmtPVCostPerGiBHour = `avg(avg_over_time(pv_hourly_cost[%s]%s)) by (volumename, cluster_id)`
  34. queryFmtNetZoneGiB = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="true"}[%s]%s)) by (pod_name, namespace, cluster_id) / 1024 / 1024 / 1024`
  35. queryFmtNetZoneCostPerGiB = `avg(avg_over_time(kubecost_network_zone_egress_cost{}[%s]%s)) by (cluster_id)`
  36. queryFmtNetRegionGiB = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="false"}[%s]%s)) by (pod_name, namespace, cluster_id) / 1024 / 1024 / 1024`
  37. queryFmtNetRegionCostPerGiB = `avg(avg_over_time(kubecost_network_region_egress_cost{}[%s]%s)) by (cluster_id)`
  38. queryFmtNetInternetGiB = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="true"}[%s]%s)) by (pod_name, namespace, cluster_id) / 1024 / 1024 / 1024`
  39. queryFmtNetInternetCostPerGiB = `avg(avg_over_time(kubecost_network_internet_egress_cost{}[%s]%s)) by (cluster_id)`
  40. queryFmtNamespaceLabels = `avg_over_time(kube_namespace_labels[%s]%s)`
  41. queryFmtNamespaceAnnotations = `avg_over_time(kube_namespace_annotations[%s]%s)`
  42. queryFmtPodLabels = `avg_over_time(kube_pod_labels[%s]%s)`
  43. queryFmtPodAnnotations = `avg_over_time(kube_pod_annotations[%s]%s)`
  44. queryFmtServiceLabels = `avg_over_time(service_selector_labels[%s]%s)`
  45. queryFmtDeploymentLabels = `avg_over_time(deployment_match_labels[%s]%s)`
  46. queryFmtStatefulSetLabels = `avg_over_time(statefulSet_match_labels[%s]%s)`
  47. queryFmtDaemonSetLabels = `sum(avg_over_time(kube_pod_owner{owner_kind="DaemonSet"}[%s]%s)) by (pod, owner_name, namespace, cluster_id)`
  48. queryFmtJobLabels = `sum(avg_over_time(kube_pod_owner{owner_kind="Job"}[%s]%s)) by (pod, owner_name, namespace ,cluster_id)`
  49. )
  50. // ComputeAllocation uses the CostModel instance to compute an AllocationSet
  51. // for the window defined by the given start and end times. The Allocations
  52. // returned are unaggregated (i.e. down to the container level).
  53. func (cm *CostModel) ComputeAllocation(start, end time.Time, resolution time.Duration) (*kubecost.AllocationSet, error) {
  54. // 1. Build out Pod map from resolution-tuned, batched Pod start/end query
  55. // 2. Run and apply the results of the remaining queries to
  56. // 3. Build out AllocationSet from completed Pod map
  57. // Create a window spanning the requested query
  58. window := kubecost.NewWindow(&start, &end)
  59. // Create an empty AllocationSet. For safety, in the case of an error, we
  60. // should prefer to return this empty set with the error. (In the case of
  61. // no error, of course we populate the set and return it.)
  62. allocSet := kubecost.NewAllocationSet(start, end)
  63. // (1) Build out Pod map
  64. // Build out a map of Allocations as a mapping from pod-to-container-to-
  65. // underlying-Allocation instance, starting with (start, end) so that we
  66. // begin with minutes, from which we compute resource allocation and cost
  67. // totals from measured rate data.
  68. podMap := map[podKey]*Pod{}
  69. // clusterStarts and clusterEnds record the earliest start and latest end
  70. // times, respectively, on a cluster-basis. These are used for unmounted
  71. // PVs and other "virtual" Allocations so that minutes are maximally
  72. // accurate during start-up or spin-down of a cluster
  73. clusterStart := map[string]time.Time{}
  74. clusterEnd := map[string]time.Time{}
  75. cm.buildPodMap(window, resolution, env.GetETLMaxBatchDuration(), podMap, clusterStart, clusterEnd)
  76. // (2) Run and apply remaining queries
  77. // Convert window (start, end) to (duration, offset) for querying Prometheus,
  78. // including handling Thanos offset
  79. durStr, offStr, err := window.DurationOffsetForPrometheus()
  80. if err != nil {
  81. // Negative duration, so return empty set
  82. return allocSet, nil
  83. }
  84. // Convert resolution duration to a query-ready string
  85. resStr := util.DurationString(resolution)
  86. ctx := prom.NewContext(cm.PrometheusClient)
  87. queryRAMBytesAllocated := fmt.Sprintf(queryFmtRAMBytesAllocated, durStr, offStr)
  88. resChRAMBytesAllocated := ctx.Query(queryRAMBytesAllocated)
  89. queryRAMRequests := fmt.Sprintf(queryFmtRAMRequests, durStr, offStr)
  90. resChRAMRequests := ctx.Query(queryRAMRequests)
  91. queryRAMUsage := fmt.Sprintf(queryFmtRAMUsage, durStr, offStr)
  92. resChRAMUsage := ctx.Query(queryRAMUsage)
  93. queryCPUCoresAllocated := fmt.Sprintf(queryFmtCPUCoresAllocated, durStr, offStr)
  94. resChCPUCoresAllocated := ctx.Query(queryCPUCoresAllocated)
  95. queryCPURequests := fmt.Sprintf(queryFmtCPURequests, durStr, offStr)
  96. resChCPURequests := ctx.Query(queryCPURequests)
  97. queryCPUUsage := fmt.Sprintf(queryFmtCPUUsage, durStr, offStr)
  98. resChCPUUsage := ctx.Query(queryCPUUsage)
  99. queryGPUsRequested := fmt.Sprintf(queryFmtGPUsRequested, durStr, offStr)
  100. resChGPUsRequested := ctx.Query(queryGPUsRequested)
  101. queryNodeCostPerCPUHr := fmt.Sprintf(queryFmtNodeCostPerCPUHr, durStr, offStr)
  102. resChNodeCostPerCPUHr := ctx.Query(queryNodeCostPerCPUHr)
  103. queryNodeCostPerRAMGiBHr := fmt.Sprintf(queryFmtNodeCostPerRAMGiBHr, durStr, offStr)
  104. resChNodeCostPerRAMGiBHr := ctx.Query(queryNodeCostPerRAMGiBHr)
  105. queryNodeCostPerGPUHr := fmt.Sprintf(queryFmtNodeCostPerGPUHr, durStr, offStr)
  106. resChNodeCostPerGPUHr := ctx.Query(queryNodeCostPerGPUHr)
  107. queryNodeIsSpot := fmt.Sprintf(queryFmtNodeIsSpot, durStr, offStr)
  108. resChNodeIsSpot := ctx.Query(queryNodeIsSpot)
  109. queryPVCInfo := fmt.Sprintf(queryFmtPVCInfo, durStr, resStr, offStr)
  110. resChPVCInfo := ctx.Query(queryPVCInfo)
  111. queryPVBytes := fmt.Sprintf(queryFmtPVBytes, durStr, offStr)
  112. resChPVBytes := ctx.Query(queryPVBytes)
  113. queryPodPVCAllocation := fmt.Sprintf(queryFmtPodPVCAllocation, durStr, offStr)
  114. resChPodPVCAllocation := ctx.Query(queryPodPVCAllocation)
  115. queryPVCBytesRequested := fmt.Sprintf(queryFmtPVCBytesRequested, durStr, offStr)
  116. resChPVCBytesRequested := ctx.Query(queryPVCBytesRequested)
  117. queryPVCostPerGiBHour := fmt.Sprintf(queryFmtPVCostPerGiBHour, durStr, offStr)
  118. resChPVCostPerGiBHour := ctx.Query(queryPVCostPerGiBHour)
  119. queryNetZoneGiB := fmt.Sprintf(queryFmtNetZoneGiB, durStr, offStr)
  120. resChNetZoneGiB := ctx.Query(queryNetZoneGiB)
  121. queryNetZoneCostPerGiB := fmt.Sprintf(queryFmtNetZoneCostPerGiB, durStr, offStr)
  122. resChNetZoneCostPerGiB := ctx.Query(queryNetZoneCostPerGiB)
  123. queryNetRegionGiB := fmt.Sprintf(queryFmtNetRegionGiB, durStr, offStr)
  124. resChNetRegionGiB := ctx.Query(queryNetRegionGiB)
  125. queryNetRegionCostPerGiB := fmt.Sprintf(queryFmtNetRegionCostPerGiB, durStr, offStr)
  126. resChNetRegionCostPerGiB := ctx.Query(queryNetRegionCostPerGiB)
  127. queryNetInternetGiB := fmt.Sprintf(queryFmtNetInternetGiB, durStr, offStr)
  128. resChNetInternetGiB := ctx.Query(queryNetInternetGiB)
  129. queryNetInternetCostPerGiB := fmt.Sprintf(queryFmtNetInternetCostPerGiB, durStr, offStr)
  130. resChNetInternetCostPerGiB := ctx.Query(queryNetInternetCostPerGiB)
  131. queryNamespaceLabels := fmt.Sprintf(queryFmtNamespaceLabels, durStr, offStr)
  132. resChNamespaceLabels := ctx.Query(queryNamespaceLabels)
  133. queryNamespaceAnnotations := fmt.Sprintf(queryFmtNamespaceAnnotations, durStr, offStr)
  134. resChNamespaceAnnotations := ctx.Query(queryNamespaceAnnotations)
  135. queryPodLabels := fmt.Sprintf(queryFmtPodLabels, durStr, offStr)
  136. resChPodLabels := ctx.Query(queryPodLabels)
  137. queryPodAnnotations := fmt.Sprintf(queryFmtPodAnnotations, durStr, offStr)
  138. resChPodAnnotations := ctx.Query(queryPodAnnotations)
  139. queryServiceLabels := fmt.Sprintf(queryFmtServiceLabels, durStr, offStr)
  140. resChServiceLabels := ctx.Query(queryServiceLabels)
  141. queryDeploymentLabels := fmt.Sprintf(queryFmtDeploymentLabels, durStr, offStr)
  142. resChDeploymentLabels := ctx.Query(queryDeploymentLabels)
  143. queryStatefulSetLabels := fmt.Sprintf(queryFmtStatefulSetLabels, durStr, offStr)
  144. resChStatefulSetLabels := ctx.Query(queryStatefulSetLabels)
  145. queryDaemonSetLabels := fmt.Sprintf(queryFmtDaemonSetLabels, durStr, offStr)
  146. resChDaemonSetLabels := ctx.Query(queryDaemonSetLabels)
  147. queryJobLabels := fmt.Sprintf(queryFmtJobLabels, durStr, offStr)
  148. resChJobLabels := ctx.Query(queryJobLabels)
  149. resCPUCoresAllocated, _ := resChCPUCoresAllocated.Await()
  150. resCPURequests, _ := resChCPURequests.Await()
  151. resCPUUsage, _ := resChCPUUsage.Await()
  152. resRAMBytesAllocated, _ := resChRAMBytesAllocated.Await()
  153. resRAMRequests, _ := resChRAMRequests.Await()
  154. resRAMUsage, _ := resChRAMUsage.Await()
  155. resGPUsRequested, _ := resChGPUsRequested.Await()
  156. resNodeCostPerCPUHr, _ := resChNodeCostPerCPUHr.Await()
  157. resNodeCostPerRAMGiBHr, _ := resChNodeCostPerRAMGiBHr.Await()
  158. resNodeCostPerGPUHr, _ := resChNodeCostPerGPUHr.Await()
  159. resNodeIsSpot, _ := resChNodeIsSpot.Await()
  160. resPVBytes, _ := resChPVBytes.Await()
  161. resPVCostPerGiBHour, _ := resChPVCostPerGiBHour.Await()
  162. resPVCInfo, _ := resChPVCInfo.Await()
  163. resPVCBytesRequested, _ := resChPVCBytesRequested.Await()
  164. resPodPVCAllocation, _ := resChPodPVCAllocation.Await()
  165. resNetZoneGiB, _ := resChNetZoneGiB.Await()
  166. resNetZoneCostPerGiB, _ := resChNetZoneCostPerGiB.Await()
  167. resNetRegionGiB, _ := resChNetRegionGiB.Await()
  168. resNetRegionCostPerGiB, _ := resChNetRegionCostPerGiB.Await()
  169. resNetInternetGiB, _ := resChNetInternetGiB.Await()
  170. resNetInternetCostPerGiB, _ := resChNetInternetCostPerGiB.Await()
  171. resNamespaceLabels, _ := resChNamespaceLabels.Await()
  172. resNamespaceAnnotations, _ := resChNamespaceAnnotations.Await()
  173. resPodLabels, _ := resChPodLabels.Await()
  174. resPodAnnotations, _ := resChPodAnnotations.Await()
  175. resServiceLabels, _ := resChServiceLabels.Await()
  176. resDeploymentLabels, _ := resChDeploymentLabels.Await()
  177. resStatefulSetLabels, _ := resChStatefulSetLabels.Await()
  178. resDaemonSetLabels, _ := resChDaemonSetLabels.Await()
  179. resJobLabels, _ := resChJobLabels.Await()
  180. if ctx.HasErrors() {
  181. for _, err := range ctx.Errors() {
  182. log.Errorf("CostModel.ComputeAllocation: %s", err)
  183. }
  184. return allocSet, ctx.ErrorCollection()
  185. }
  186. applyCPUCoresAllocated(podMap, resCPUCoresAllocated)
  187. applyCPUCoresRequested(podMap, resCPURequests)
  188. applyCPUCoresUsed(podMap, resCPUUsage)
  189. applyRAMBytesAllocated(podMap, resRAMBytesAllocated)
  190. applyRAMBytesRequested(podMap, resRAMRequests)
  191. applyRAMBytesUsed(podMap, resRAMUsage)
  192. applyGPUsRequested(podMap, resGPUsRequested)
  193. applyNetworkAllocation(podMap, resNetZoneGiB, resNetZoneCostPerGiB)
  194. applyNetworkAllocation(podMap, resNetRegionGiB, resNetRegionCostPerGiB)
  195. applyNetworkAllocation(podMap, resNetInternetGiB, resNetInternetCostPerGiB)
  196. namespaceLabels := resToNamespaceLabels(resNamespaceLabels)
  197. podLabels := resToPodLabels(resPodLabels)
  198. namespaceAnnotations := resToNamespaceAnnotations(resNamespaceAnnotations)
  199. podAnnotations := resToPodAnnotations(resPodAnnotations)
  200. applyLabels(podMap, namespaceLabels, podLabels)
  201. applyAnnotations(podMap, namespaceAnnotations, podAnnotations)
  202. serviceLabels := getServiceLabels(resServiceLabels)
  203. applyServicesToPods(podMap, podLabels, serviceLabels)
  204. podDeploymentMap := labelsToPodControllerMap(podLabels, resToDeploymentLabels(resDeploymentLabels))
  205. podStatefulSetMap := labelsToPodControllerMap(podLabels, resToStatefulSetLabels(resStatefulSetLabels))
  206. podDaemonSetMap := resToPodDaemonSetMap(resDaemonSetLabels)
  207. podJobMap := resToPodJobMap(resJobLabels)
  208. applyControllersToPods(podMap, podDeploymentMap)
  209. applyControllersToPods(podMap, podStatefulSetMap)
  210. applyControllersToPods(podMap, podDaemonSetMap)
  211. applyControllersToPods(podMap, podJobMap)
  212. // TODO breakdown network costs?
  213. // Build out a map of Nodes with resource costs, discounts, and node types
  214. // for converting resource allocation data to cumulative costs.
  215. nodeMap := map[nodeKey]*NodePricing{}
  216. applyNodeCostPerCPUHr(nodeMap, resNodeCostPerCPUHr)
  217. applyNodeCostPerRAMGiBHr(nodeMap, resNodeCostPerRAMGiBHr)
  218. applyNodeCostPerGPUHr(nodeMap, resNodeCostPerGPUHr)
  219. applyNodeSpot(nodeMap, resNodeIsSpot)
  220. applyNodeDiscount(nodeMap, cm)
  221. // Build out the map of all PVs with class, size and cost-per-hour.
  222. // Note: this does not record time running, which we may want to
  223. // include later for increased PV precision. (As long as the PV has
  224. // a PVC, we get time running there, so this is only inaccurate
  225. // for short-lived, unmounted PVs.)
  226. pvMap := map[pvKey]*PV{}
  227. buildPVMap(pvMap, resPVCostPerGiBHour)
  228. applyPVBytes(pvMap, resPVBytes)
  229. // Build out the map of all PVCs with time running, bytes requested,
  230. // and connect to the correct PV from pvMap. (If no PV exists, that
  231. // is noted, but does not result in any allocation/cost.)
  232. pvcMap := map[pvcKey]*PVC{}
  233. buildPVCMap(window, pvcMap, pvMap, resPVCInfo)
  234. applyPVCBytesRequested(pvcMap, resPVCBytesRequested)
  235. // Build out the relationships of pods to their PVCs. This step
  236. // populates the PVC.Count field so that PVC allocation can be
  237. // split appropriately among each pod's container allocation.
  238. podPVCMap := map[podKey][]*PVC{}
  239. buildPodPVCMap(podPVCMap, pvMap, pvcMap, podMap, resPodPVCAllocation)
  240. // Identify unmounted PVs (PVs without PVCs) and add one Allocation per
  241. // cluster representing each cluster's unmounted PVs (if necessary).
  242. applyUnmountedPVs(window, podMap, pvMap, pvcMap)
  243. // (3) Build out AllocationSet from Pod map
  244. for _, pod := range podMap {
  245. for _, alloc := range pod.Allocations {
  246. cluster, _ := alloc.Properties.GetCluster()
  247. nodeName, _ := alloc.Properties.GetNode()
  248. namespace, _ := alloc.Properties.GetNamespace()
  249. pod, _ := alloc.Properties.GetPod()
  250. container, _ := alloc.Properties.GetContainer()
  251. podKey := newPodKey(cluster, namespace, pod)
  252. nodeKey := newNodeKey(cluster, nodeName)
  253. node := cm.getNodePricing(nodeMap, nodeKey)
  254. alloc.CPUCost = alloc.CPUCoreHours * node.CostPerCPUHr
  255. alloc.RAMCost = (alloc.RAMByteHours / 1024 / 1024 / 1024) * node.CostPerRAMGiBHr
  256. alloc.GPUCost = alloc.GPUHours * node.CostPerGPUHr
  257. if pvcs, ok := podPVCMap[podKey]; ok {
  258. for _, pvc := range pvcs {
  259. // Determine the (start, end) of the relationship between the
  260. // given PVC and the associated Allocation so that a precise
  261. // number of hours can be used to compute cumulative cost.
  262. s, e := alloc.Start, alloc.End
  263. if pvc.Start.After(alloc.Start) {
  264. s = pvc.Start
  265. }
  266. if pvc.End.Before(alloc.End) {
  267. e = pvc.End
  268. }
  269. minutes := e.Sub(s).Minutes()
  270. hrs := minutes / 60.0
  271. count := float64(pvc.Count)
  272. if pvc.Count < 1 {
  273. count = 1
  274. }
  275. gib := pvc.Bytes / 1024 / 1024 / 1024
  276. cost := pvc.Volume.CostPerGiBHour * gib * hrs
  277. // Apply the size and cost of the PV to the allocation, each
  278. // weighted by count (i.e. the number of containers in the pod)
  279. alloc.PVByteHours += pvc.Bytes * hrs / count
  280. alloc.PVCost += cost / count
  281. }
  282. }
  283. // Make sure that the name is correct (node may not be present at this
  284. // point due to it missing from queryMinutes) then insert.
  285. alloc.Name = fmt.Sprintf("%s/%s/%s/%s/%s", cluster, nodeName, namespace, pod, container)
  286. allocSet.Set(alloc)
  287. }
  288. }
  289. return allocSet, nil
  290. }
  291. func (cm *CostModel) buildPodMap(window kubecost.Window, resolution, maxBatchSize time.Duration, podMap map[podKey]*Pod, clusterStart, clusterEnd map[string]time.Time) error {
  292. // Assumes that window is positive and closed
  293. start, end := *window.Start(), *window.End()
  294. // Convert resolution duration to a query-ready string
  295. resStr := util.DurationString(resolution)
  296. ctx := prom.NewContext(cm.PrometheusClient)
  297. // Query for (start, end) by (pod, namespace, cluster) over the given
  298. // window, using the given resolution, and if necessary in batches no
  299. // larger than the given maximum batch size. If working in batches, track
  300. // overall progress by starting with (window.start, window.start) and
  301. // querying in batches no larger than maxBatchSize from start-to-end,
  302. // folding each result set into podMap as the results come back.
  303. coverage := kubecost.NewWindow(&start, &start)
  304. numQuery := 1
  305. for coverage.End().Before(end) {
  306. // Determine the (start, end) of the current batch
  307. batchStart := *coverage.End()
  308. batchEnd := coverage.End().Add(maxBatchSize)
  309. if batchEnd.After(end) {
  310. batchEnd = end
  311. }
  312. batchWindow := kubecost.NewWindow(&batchStart, &batchEnd)
  313. var resPods []*prom.QueryResult
  314. var err error
  315. maxTries := 3
  316. numTries := 0
  317. for resPods == nil && numTries < maxTries {
  318. numTries++
  319. // Convert window (start, end) to (duration, offset) for querying Prometheus,
  320. // including handling Thanos offset
  321. durStr, offStr, err := batchWindow.DurationOffsetForPrometheus()
  322. if err != nil || durStr == "" {
  323. // Negative duration, so set empty results and don't query
  324. resPods = []*prom.QueryResult{}
  325. err = nil
  326. break
  327. }
  328. // Submit and profile query
  329. queryPods := fmt.Sprintf(queryFmtPods, durStr, resStr, offStr)
  330. queryProfile := time.Now()
  331. resPods, err = ctx.Query(queryPods).Await()
  332. if err != nil {
  333. log.Profile(queryProfile, fmt.Sprintf("CostModel.ComputeAllocation: pod query %d try %d failed: %s", numQuery, numTries, queryPods))
  334. resPods = nil
  335. }
  336. }
  337. if err != nil {
  338. return err
  339. }
  340. applyPodResults(window, resolution, podMap, clusterStart, clusterEnd, resPods)
  341. coverage = coverage.ExpandEnd(batchEnd)
  342. numQuery++
  343. }
  344. return nil
  345. }
  346. func applyPodResults(window kubecost.Window, resolution time.Duration, podMap map[podKey]*Pod, clusterStart, clusterEnd map[string]time.Time, resPods []*prom.QueryResult) {
  347. for _, res := range resPods {
  348. if len(res.Values) == 0 {
  349. log.Warningf("CostModel.ComputeAllocation: empty minutes result")
  350. continue
  351. }
  352. cluster, err := res.GetString("cluster_id")
  353. if err != nil {
  354. cluster = env.GetClusterID()
  355. }
  356. labels, err := res.GetStrings("namespace", "pod")
  357. if err != nil {
  358. log.Warningf("CostModel.ComputeAllocation: minutes query result missing field: %s", err)
  359. continue
  360. }
  361. namespace := labels["namespace"]
  362. pod := labels["pod"]
  363. key := newPodKey(cluster, namespace, pod)
  364. // allocStart and allocEnd are the timestamps of the first and last
  365. // minutes the pod was running, respectively. We subtract one resolution
  366. // from allocStart because this point will actually represent the end
  367. // of the first minute. We don't subtract from allocEnd because it
  368. // already represents the end of the last minute.
  369. var allocStart, allocEnd time.Time
  370. startAdjustmentCoeff, endAdjustmentCoeff := 1.0, 1.0
  371. for _, datum := range res.Values {
  372. t := time.Unix(int64(datum.Timestamp), 0)
  373. if allocStart.IsZero() && datum.Value > 0 && window.Contains(t) {
  374. // Set the start timestamp to the earliest non-zero timestamp
  375. allocStart = t
  376. // Record adjustment coefficient, i.e. the portion of the start
  377. // timestamp to "ignore". That is, sometimes the value will be
  378. // 0.5, meaning that we should discount the time running by
  379. // half of the resolution the timestamp stands for.
  380. startAdjustmentCoeff = (1.0 - datum.Value)
  381. }
  382. if datum.Value > 0 && window.Contains(t) {
  383. // Set the end timestamp to the latest non-zero timestamp
  384. allocEnd = t
  385. // Record adjustment coefficient, i.e. the portion of the end
  386. // timestamp to "ignore". (See explanation above for start.)
  387. endAdjustmentCoeff = (1.0 - datum.Value)
  388. }
  389. }
  390. if allocStart.IsZero() || allocEnd.IsZero() {
  391. continue
  392. }
  393. // Adjust timestamps accorind to the resolution and the adjustment
  394. // coefficients, as described above. That is, count the start timestamp
  395. // from the beginning of the resolution, not the end. Then "reduce" the
  396. // start and end by the correct amount, in the case that the "running"
  397. // value of the first or last timestamp was not a full 1.0.
  398. allocStart = allocStart.Add(-resolution)
  399. // Note: the *100 and /100 are necessary because Duration is an int, so
  400. // 0.5, for instance, will be truncated, resulting in no adjustment.
  401. allocStart = allocStart.Add(time.Duration(startAdjustmentCoeff*100) * resolution / time.Duration(100))
  402. allocEnd = allocEnd.Add(-time.Duration(endAdjustmentCoeff*100) * resolution / time.Duration(100))
  403. // If there is only one point with a value <= 0.5 that the start and
  404. // end timestamps both share, then we will enter this case because at
  405. // least half of a resolution will be subtracted from both the start
  406. // and the end. If that is the case, then add back half of each side
  407. // so that the pod is said to run for half a resolution total.
  408. // e.g. For resolution 1m and a value of 0.5 at one timestamp, we'll
  409. // end up with allocEnd == allocStart and each coeff == 0.5. In
  410. // that case, add 0.25m to each side, resulting in 0.5m duration.
  411. // if !allocEnd.After(allocStart) && startAdjustmentCoeff == endAdjustmentCoeff {
  412. if !allocEnd.After(allocStart) {
  413. allocStart = allocStart.Add(-time.Duration(50*startAdjustmentCoeff) * resolution / time.Duration(100))
  414. allocEnd = allocEnd.Add(time.Duration(50*endAdjustmentCoeff) * resolution / time.Duration(100))
  415. }
  416. // Set start if unset or this datum's start time is earlier than the
  417. // current earliest time.
  418. if _, ok := clusterStart[cluster]; !ok || allocStart.Before(clusterStart[cluster]) {
  419. clusterStart[cluster] = allocStart
  420. }
  421. // Set end if unset or this datum's end time is later than the
  422. // current latest time.
  423. if _, ok := clusterEnd[cluster]; !ok || allocEnd.After(clusterEnd[cluster]) {
  424. clusterEnd[cluster] = allocEnd
  425. }
  426. if pod, ok := podMap[key]; ok {
  427. // Pod has already been recorded, so update it accordingly
  428. if allocStart.Before(pod.Start) {
  429. pod.Start = allocStart
  430. }
  431. if allocEnd.After(pod.End) {
  432. pod.End = allocEnd
  433. }
  434. } else {
  435. // Pod has not been recorded yet, so insert it
  436. podMap[key] = &Pod{
  437. Window: window.Clone(),
  438. Start: allocStart,
  439. End: allocEnd,
  440. Key: key,
  441. Allocations: map[string]*kubecost.Allocation{},
  442. }
  443. }
  444. }
  445. }
  446. func applyCPUCoresAllocated(podMap map[podKey]*Pod, resCPUCoresAllocated []*prom.QueryResult) {
  447. for _, res := range resCPUCoresAllocated {
  448. key, err := resultPodKey(res, "cluster_id", "namespace", "pod")
  449. if err != nil {
  450. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU allocation result missing field: %s", err)
  451. continue
  452. }
  453. pod, ok := podMap[key]
  454. if !ok {
  455. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU allocation result for unidentified pod: %s", key)
  456. continue
  457. }
  458. container, err := res.GetString("container")
  459. if err != nil {
  460. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU allocation query result missing 'container': %s", key)
  461. continue
  462. }
  463. if _, ok := pod.Allocations[container]; !ok {
  464. pod.AppendContainer(container)
  465. }
  466. cpuCores := res.Values[0].Value
  467. hours := pod.Allocations[container].Minutes() / 60.0
  468. pod.Allocations[container].CPUCoreHours = cpuCores * hours
  469. node, err := res.GetString("node")
  470. if err != nil {
  471. log.Warningf("CostModel.ComputeAllocation: CPU allocation query result missing 'node': %s", key)
  472. continue
  473. }
  474. pod.Allocations[container].Properties.SetNode(node)
  475. }
  476. }
  477. func applyCPUCoresRequested(podMap map[podKey]*Pod, resCPUCoresRequested []*prom.QueryResult) {
  478. for _, res := range resCPUCoresRequested {
  479. key, err := resultPodKey(res, "cluster_id", "namespace", "pod")
  480. if err != nil {
  481. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU request result missing field: %s", err)
  482. continue
  483. }
  484. pod, ok := podMap[key]
  485. if !ok {
  486. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU request result for unidentified pod: %s", key)
  487. continue
  488. }
  489. container, err := res.GetString("container")
  490. if err != nil {
  491. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU request query result missing 'container': %s", key)
  492. continue
  493. }
  494. if _, ok := pod.Allocations[container]; !ok {
  495. pod.AppendContainer(container)
  496. }
  497. pod.Allocations[container].CPUCoreRequestAverage = res.Values[0].Value
  498. // If CPU allocation is less than requests, set CPUCoreHours to
  499. // request level.
  500. if pod.Allocations[container].CPUCores() < res.Values[0].Value {
  501. pod.Allocations[container].CPUCoreHours = res.Values[0].Value * (pod.Allocations[container].Minutes() / 60.0)
  502. }
  503. node, err := res.GetString("node")
  504. if err != nil {
  505. log.Warningf("CostModel.ComputeAllocation: CPU request query result missing 'node': %s", key)
  506. continue
  507. }
  508. pod.Allocations[container].Properties.SetNode(node)
  509. }
  510. }
  511. func applyCPUCoresUsed(podMap map[podKey]*Pod, resCPUCoresUsed []*prom.QueryResult) {
  512. for _, res := range resCPUCoresUsed {
  513. key, err := resultPodKey(res, "cluster_id", "namespace", "pod_name")
  514. if err != nil {
  515. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage result missing field: %s", err)
  516. continue
  517. }
  518. pod, ok := podMap[key]
  519. if !ok {
  520. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage result for unidentified pod: %s", key)
  521. continue
  522. }
  523. container, err := res.GetString("container_name")
  524. if err != nil {
  525. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage query result missing 'container': %s", key)
  526. continue
  527. }
  528. if _, ok := pod.Allocations[container]; !ok {
  529. pod.AppendContainer(container)
  530. }
  531. pod.Allocations[container].CPUCoreUsageAverage = res.Values[0].Value
  532. }
  533. }
  534. func applyRAMBytesAllocated(podMap map[podKey]*Pod, resRAMBytesAllocated []*prom.QueryResult) {
  535. for _, res := range resRAMBytesAllocated {
  536. key, err := resultPodKey(res, "cluster_id", "namespace", "pod")
  537. if err != nil {
  538. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM allocation result missing field: %s", err)
  539. continue
  540. }
  541. pod, ok := podMap[key]
  542. if !ok {
  543. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM allocation result for unidentified pod: %s", key)
  544. continue
  545. }
  546. container, err := res.GetString("container")
  547. if err != nil {
  548. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM allocation query result missing 'container': %s", key)
  549. continue
  550. }
  551. if _, ok := pod.Allocations[container]; !ok {
  552. pod.AppendContainer(container)
  553. }
  554. ramBytes := res.Values[0].Value
  555. hours := pod.Allocations[container].Minutes() / 60.0
  556. pod.Allocations[container].RAMByteHours = ramBytes * hours
  557. node, err := res.GetString("node")
  558. if err != nil {
  559. log.Warningf("CostModel.ComputeAllocation: RAM allocation query result missing 'node': %s", key)
  560. continue
  561. }
  562. pod.Allocations[container].Properties.SetNode(node)
  563. }
  564. }
  565. func applyRAMBytesRequested(podMap map[podKey]*Pod, resRAMBytesRequested []*prom.QueryResult) {
  566. for _, res := range resRAMBytesRequested {
  567. key, err := resultPodKey(res, "cluster_id", "namespace", "pod")
  568. if err != nil {
  569. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM request result missing field: %s", err)
  570. continue
  571. }
  572. pod, ok := podMap[key]
  573. if !ok {
  574. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM request result for unidentified pod: %s", key)
  575. continue
  576. }
  577. container, err := res.GetString("container")
  578. if err != nil {
  579. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM request query result missing 'container': %s", key)
  580. continue
  581. }
  582. if _, ok := pod.Allocations[container]; !ok {
  583. pod.AppendContainer(container)
  584. }
  585. pod.Allocations[container].RAMBytesRequestAverage = res.Values[0].Value
  586. // If RAM allocation is less than requests, set RAMByteHours to
  587. // request level.
  588. if pod.Allocations[container].RAMBytes() < res.Values[0].Value {
  589. pod.Allocations[container].RAMByteHours = res.Values[0].Value * (pod.Allocations[container].Minutes() / 60.0)
  590. }
  591. node, err := res.GetString("node")
  592. if err != nil {
  593. log.Warningf("CostModel.ComputeAllocation: RAM request query result missing 'node': %s", key)
  594. continue
  595. }
  596. pod.Allocations[container].Properties.SetNode(node)
  597. }
  598. }
  599. func applyRAMBytesUsed(podMap map[podKey]*Pod, resRAMBytesUsed []*prom.QueryResult) {
  600. for _, res := range resRAMBytesUsed {
  601. key, err := resultPodKey(res, "cluster_id", "namespace", "pod_name")
  602. if err != nil {
  603. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM usage result missing field: %s", err)
  604. continue
  605. }
  606. pod, ok := podMap[key]
  607. if !ok {
  608. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM usage result for unidentified pod: %s", key)
  609. continue
  610. }
  611. container, err := res.GetString("container_name")
  612. if err != nil {
  613. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM usage query result missing 'container': %s", key)
  614. continue
  615. }
  616. if _, ok := pod.Allocations[container]; !ok {
  617. pod.AppendContainer(container)
  618. }
  619. pod.Allocations[container].RAMBytesUsageAverage = res.Values[0].Value
  620. }
  621. }
  622. func applyGPUsRequested(podMap map[podKey]*Pod, resGPUsRequested []*prom.QueryResult) {
  623. for _, res := range resGPUsRequested {
  624. key, err := resultPodKey(res, "cluster_id", "namespace", "pod")
  625. if err != nil {
  626. log.DedupedWarningf(10, "CostModel.ComputeAllocation: GPU request result missing field: %s", err)
  627. continue
  628. }
  629. pod, ok := podMap[key]
  630. if !ok {
  631. log.DedupedWarningf(10, "CostModel.ComputeAllocation: GPU request result for unidentified pod: %s", key)
  632. continue
  633. }
  634. container, err := res.GetString("container")
  635. if err != nil {
  636. log.DedupedWarningf(10, "CostModel.ComputeAllocation: GPU request query result missing 'container': %s", key)
  637. continue
  638. }
  639. if _, ok := pod.Allocations[container]; !ok {
  640. pod.AppendContainer(container)
  641. }
  642. hrs := pod.Allocations[container].Minutes() / 60.0
  643. pod.Allocations[container].GPUHours = res.Values[0].Value * hrs
  644. }
  645. }
  646. func applyNetworkAllocation(podMap map[podKey]*Pod, resNetworkGiB []*prom.QueryResult, resNetworkCostPerGiB []*prom.QueryResult) {
  647. costPerGiBByCluster := map[string]float64{}
  648. for _, res := range resNetworkCostPerGiB {
  649. cluster, err := res.GetString("cluster_id")
  650. if err != nil {
  651. cluster = env.GetClusterID()
  652. }
  653. costPerGiBByCluster[cluster] = res.Values[0].Value
  654. }
  655. for _, res := range resNetworkGiB {
  656. podKey, err := resultPodKey(res, "cluster_id", "namespace", "pod_name")
  657. if err != nil {
  658. log.DedupedWarningf(10, "CostModel.ComputeAllocation: Network allocation query result missing field: %s", err)
  659. continue
  660. }
  661. pod, ok := podMap[podKey]
  662. if !ok {
  663. log.DedupedWarningf(10, "CostModel.ComputeAllocation: Network allocation query result for unidentified pod: %s", podKey)
  664. continue
  665. }
  666. for _, alloc := range pod.Allocations {
  667. gib := res.Values[0].Value / float64(len(pod.Allocations))
  668. costPerGiB := costPerGiBByCluster[podKey.Cluster]
  669. alloc.NetworkCost = gib * costPerGiB
  670. }
  671. }
  672. }
  673. func resToNamespaceLabels(resNamespaceLabels []*prom.QueryResult) map[string]map[string]string {
  674. namespaceLabels := map[string]map[string]string{}
  675. for _, res := range resNamespaceLabels {
  676. namespace, err := res.GetString("namespace")
  677. if err != nil {
  678. continue
  679. }
  680. if _, ok := namespaceLabels[namespace]; !ok {
  681. namespaceLabels[namespace] = map[string]string{}
  682. }
  683. for k, l := range res.GetLabels() {
  684. namespaceLabels[namespace][k] = l
  685. }
  686. }
  687. return namespaceLabels
  688. }
  689. func resToPodLabels(resPodLabels []*prom.QueryResult) map[podKey]map[string]string {
  690. podLabels := map[podKey]map[string]string{}
  691. for _, res := range resPodLabels {
  692. podKey, err := resultPodKey(res, "cluster_id", "namespace", "pod")
  693. if err != nil {
  694. continue
  695. }
  696. if _, ok := podLabels[podKey]; !ok {
  697. podLabels[podKey] = map[string]string{}
  698. }
  699. for k, l := range res.GetLabels() {
  700. podLabels[podKey][k] = l
  701. }
  702. }
  703. return podLabels
  704. }
  705. func resToNamespaceAnnotations(resNamespaceAnnotations []*prom.QueryResult) map[string]map[string]string {
  706. namespaceAnnotations := map[string]map[string]string{}
  707. for _, res := range resNamespaceAnnotations {
  708. namespace, err := res.GetString("namespace")
  709. if err != nil {
  710. continue
  711. }
  712. if _, ok := namespaceAnnotations[namespace]; !ok {
  713. namespaceAnnotations[namespace] = map[string]string{}
  714. }
  715. for k, l := range res.GetAnnotations() {
  716. namespaceAnnotations[namespace][k] = l
  717. }
  718. }
  719. return namespaceAnnotations
  720. }
  721. func resToPodAnnotations(resPodAnnotations []*prom.QueryResult) map[podKey]map[string]string {
  722. podAnnotations := map[podKey]map[string]string{}
  723. for _, res := range resPodAnnotations {
  724. podKey, err := resultPodKey(res, "cluster_id", "namespace", "pod")
  725. if err != nil {
  726. continue
  727. }
  728. if _, ok := podAnnotations[podKey]; !ok {
  729. podAnnotations[podKey] = map[string]string{}
  730. }
  731. for k, l := range res.GetAnnotations() {
  732. podAnnotations[podKey][k] = l
  733. }
  734. }
  735. return podAnnotations
  736. }
  737. func applyLabels(podMap map[podKey]*Pod, namespaceLabels map[string]map[string]string, podLabels map[podKey]map[string]string) {
  738. for key, pod := range podMap {
  739. for _, alloc := range pod.Allocations {
  740. allocLabels, err := alloc.Properties.GetLabels()
  741. if err != nil {
  742. allocLabels = map[string]string{}
  743. }
  744. // Apply namespace labels first, then pod labels so that pod labels
  745. // overwrite namespace labels.
  746. if labels, ok := namespaceLabels[key.Namespace]; ok {
  747. for k, v := range labels {
  748. allocLabels[k] = v
  749. }
  750. }
  751. if labels, ok := podLabels[key]; ok {
  752. for k, v := range labels {
  753. allocLabels[k] = v
  754. }
  755. }
  756. alloc.Properties.SetLabels(allocLabels)
  757. }
  758. }
  759. }
  760. func applyAnnotations(podMap map[podKey]*Pod, namespaceAnnotations map[string]map[string]string, podAnnotations map[podKey]map[string]string) {
  761. for key, pod := range podMap {
  762. for _, alloc := range pod.Allocations {
  763. allocAnnotations, err := alloc.Properties.GetAnnotations()
  764. if err != nil {
  765. allocAnnotations = map[string]string{}
  766. }
  767. // Apply namespace annotations first, then pod annotations so that
  768. // pod labels overwrite namespace labels.
  769. if labels, ok := namespaceAnnotations[key.Namespace]; ok {
  770. for k, v := range labels {
  771. allocAnnotations[k] = v
  772. }
  773. }
  774. if labels, ok := podAnnotations[key]; ok {
  775. for k, v := range labels {
  776. allocAnnotations[k] = v
  777. }
  778. }
  779. alloc.Properties.SetAnnotations(allocAnnotations)
  780. }
  781. }
  782. }
  783. func getServiceLabels(resServiceLabels []*prom.QueryResult) map[serviceKey]map[string]string {
  784. serviceLabels := map[serviceKey]map[string]string{}
  785. for _, res := range resServiceLabels {
  786. serviceKey, err := resultServiceKey(res, "cluster_id", "namespace", "service")
  787. if err != nil {
  788. continue
  789. }
  790. if _, ok := serviceLabels[serviceKey]; !ok {
  791. serviceLabels[serviceKey] = map[string]string{}
  792. }
  793. for k, l := range res.GetLabels() {
  794. serviceLabels[serviceKey][k] = l
  795. }
  796. }
  797. // Prune duplicate services. That is, if the same service exists with
  798. // hyphens instead of underscores, keep the one that uses hyphens.
  799. for key := range serviceLabels {
  800. duplicateService := strings.Replace(key.Service, "_", "-", -1)
  801. duplicateKey := newServiceKey(key.Cluster, key.Namespace, duplicateService)
  802. if _, ok := serviceLabels[duplicateKey]; ok {
  803. delete(serviceLabels, key)
  804. }
  805. }
  806. return serviceLabels
  807. }
  808. func resToDeploymentLabels(resDeploymentLabels []*prom.QueryResult) map[controllerKey]map[string]string {
  809. deploymentLabels := map[controllerKey]map[string]string{}
  810. for _, res := range resDeploymentLabels {
  811. controllerKey, err := resultDeploymentKey(res, "cluster_id", "namespace", "deployment")
  812. if err != nil {
  813. continue
  814. }
  815. if _, ok := deploymentLabels[controllerKey]; !ok {
  816. deploymentLabels[controllerKey] = map[string]string{}
  817. }
  818. for k, l := range res.GetLabels() {
  819. deploymentLabels[controllerKey][k] = l
  820. }
  821. }
  822. // Prune duplicate deployments. That is, if the same deployment exists with
  823. // hyphens instead of underscores, keep the one that uses hyphens.
  824. for key := range deploymentLabels {
  825. duplicateController := strings.Replace(key.Controller, "_", "-", -1)
  826. duplicateKey := newControllerKey(key.Cluster, key.Namespace, key.ControllerKind, duplicateController)
  827. if _, ok := deploymentLabels[duplicateKey]; ok {
  828. delete(deploymentLabels, key)
  829. }
  830. }
  831. return deploymentLabels
  832. }
  833. func resToStatefulSetLabels(resStatefulSetLabels []*prom.QueryResult) map[controllerKey]map[string]string {
  834. statefulSetLabels := map[controllerKey]map[string]string{}
  835. for _, res := range resStatefulSetLabels {
  836. controllerKey, err := resultStatefulSetKey(res, "cluster_id", "namespace", "statefulSet")
  837. if err != nil {
  838. continue
  839. }
  840. if _, ok := statefulSetLabels[controllerKey]; !ok {
  841. statefulSetLabels[controllerKey] = map[string]string{}
  842. }
  843. for k, l := range res.GetLabels() {
  844. statefulSetLabels[controllerKey][k] = l
  845. }
  846. }
  847. // Prune duplicate stateful sets. That is, if the same stateful set exists
  848. // with hyphens instead of underscores, keep the one that uses hyphens.
  849. for key := range statefulSetLabels {
  850. duplicateController := strings.Replace(key.Controller, "_", "-", -1)
  851. duplicateKey := newControllerKey(key.Cluster, key.Namespace, key.ControllerKind, duplicateController)
  852. if _, ok := statefulSetLabels[duplicateKey]; ok {
  853. delete(statefulSetLabels, key)
  854. }
  855. }
  856. return statefulSetLabels
  857. }
  858. func labelsToPodControllerMap(podLabels map[podKey]map[string]string, controllerLabels map[controllerKey]map[string]string) map[podKey]controllerKey {
  859. podControllerMap := map[podKey]controllerKey{}
  860. // For each controller, turn the labels into a selector and attempt to
  861. // match it with each set of pod labels. A match indicates that the pod
  862. // belongs to the controller.
  863. for cKey, cLabels := range controllerLabels {
  864. selector := labels.Set(cLabels).AsSelectorPreValidated()
  865. for pKey, pLabels := range podLabels {
  866. // If the pod is in a different cluster or namespace, there is
  867. // no need to compare the labels.
  868. if cKey.Cluster != pKey.Cluster || cKey.Namespace != pKey.Namespace {
  869. continue
  870. }
  871. podLabelSet := labels.Set(pLabels)
  872. if selector.Matches(podLabelSet) {
  873. if _, ok := podControllerMap[pKey]; ok {
  874. log.Warningf("CostModel.ComputeAllocation: PodControllerMap match already exists: %s matches %s and %s", pKey, podControllerMap[pKey], cKey)
  875. }
  876. podControllerMap[pKey] = cKey
  877. }
  878. }
  879. }
  880. return podControllerMap
  881. }
  882. func resToPodDaemonSetMap(resDaemonSetLabels []*prom.QueryResult) map[podKey]controllerKey {
  883. daemonSetLabels := map[podKey]controllerKey{}
  884. for _, res := range resDaemonSetLabels {
  885. controllerKey, err := resultDaemonSetKey(res, "cluster_id", "namespace", "owner_name")
  886. if err != nil {
  887. continue
  888. }
  889. pod, err := res.GetString("pod")
  890. if err != nil {
  891. log.Warningf("CostModel.ComputeAllocation: DaemonSetLabel result without pod: %s", controllerKey)
  892. }
  893. podKey := newPodKey(controllerKey.Cluster, controllerKey.Namespace, pod)
  894. daemonSetLabels[podKey] = controllerKey
  895. }
  896. return daemonSetLabels
  897. }
  898. func resToPodJobMap(resJobLabels []*prom.QueryResult) map[podKey]controllerKey {
  899. jobLabels := map[podKey]controllerKey{}
  900. for _, res := range resJobLabels {
  901. controllerKey, err := resultJobKey(res, "cluster_id", "namespace", "owner_name")
  902. if err != nil {
  903. continue
  904. }
  905. // Convert the name of Jobs generated by CronJobs to the name of the
  906. // CronJob by stripping the timestamp off the end.
  907. match := isCron.FindStringSubmatch(controllerKey.Controller)
  908. if match != nil {
  909. controllerKey.Controller = match[1]
  910. }
  911. pod, err := res.GetString("pod")
  912. if err != nil {
  913. log.Warningf("CostModel.ComputeAllocation: JobLabel result without pod: %s", controllerKey)
  914. }
  915. podKey := newPodKey(controllerKey.Cluster, controllerKey.Namespace, pod)
  916. jobLabels[podKey] = controllerKey
  917. }
  918. return jobLabels
  919. }
  920. func applyServicesToPods(podMap map[podKey]*Pod, podLabels map[podKey]map[string]string, serviceLabels map[serviceKey]map[string]string) {
  921. podServicesMap := map[podKey][]serviceKey{}
  922. // For each service, turn the labels into a selector and attempt to
  923. // match it with each set of pod labels. A match indicates that the pod
  924. // belongs to the service.
  925. for sKey, sLabels := range serviceLabels {
  926. selector := labels.Set(sLabels).AsSelectorPreValidated()
  927. for pKey, pLabels := range podLabels {
  928. // If the pod is in a different cluster or namespace, there is
  929. // no need to compare the labels.
  930. if sKey.Cluster != pKey.Cluster || sKey.Namespace != pKey.Namespace {
  931. continue
  932. }
  933. podLabelSet := labels.Set(pLabels)
  934. if selector.Matches(podLabelSet) {
  935. if _, ok := podServicesMap[pKey]; !ok {
  936. podServicesMap[pKey] = []serviceKey{}
  937. }
  938. podServicesMap[pKey] = append(podServicesMap[pKey], sKey)
  939. }
  940. }
  941. }
  942. // For each allocation in each pod, attempt to find and apply the list of
  943. // services associated with the allocation's pod.
  944. for key, pod := range podMap {
  945. for _, alloc := range pod.Allocations {
  946. if sKeys, ok := podServicesMap[key]; ok {
  947. services := []string{}
  948. for _, sKey := range sKeys {
  949. services = append(services, sKey.Service)
  950. }
  951. alloc.Properties.SetServices(services)
  952. }
  953. }
  954. }
  955. }
  956. func applyControllersToPods(podMap map[podKey]*Pod, podControllerMap map[podKey]controllerKey) {
  957. for key, pod := range podMap {
  958. for _, alloc := range pod.Allocations {
  959. if controllerKey, ok := podControllerMap[key]; ok {
  960. alloc.Properties.SetControllerKind(controllerKey.ControllerKind)
  961. alloc.Properties.SetController(controllerKey.Controller)
  962. }
  963. }
  964. }
  965. }
  966. func applyNodeCostPerCPUHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerCPUHr []*prom.QueryResult) {
  967. for _, res := range resNodeCostPerCPUHr {
  968. cluster, err := res.GetString("cluster_id")
  969. if err != nil {
  970. cluster = env.GetClusterID()
  971. }
  972. node, err := res.GetString("node")
  973. if err != nil {
  974. log.Warningf("CostModel.ComputeAllocation: Node CPU cost query result missing field: %s", err)
  975. continue
  976. }
  977. instanceType, err := res.GetString("instance_type")
  978. if err != nil {
  979. log.Warningf("CostModel.ComputeAllocation: Node CPU cost query result missing field: %s", err)
  980. continue
  981. }
  982. key := newNodeKey(cluster, node)
  983. if _, ok := nodeMap[key]; !ok {
  984. nodeMap[key] = &NodePricing{
  985. Name: node,
  986. NodeType: instanceType,
  987. }
  988. }
  989. nodeMap[key].CostPerCPUHr = res.Values[0].Value
  990. }
  991. }
  992. func applyNodeCostPerRAMGiBHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerRAMGiBHr []*prom.QueryResult) {
  993. for _, res := range resNodeCostPerRAMGiBHr {
  994. cluster, err := res.GetString("cluster_id")
  995. if err != nil {
  996. cluster = env.GetClusterID()
  997. }
  998. node, err := res.GetString("node")
  999. if err != nil {
  1000. log.Warningf("CostModel.ComputeAllocation: Node RAM cost query result missing field: %s", err)
  1001. continue
  1002. }
  1003. instanceType, err := res.GetString("instance_type")
  1004. if err != nil {
  1005. log.Warningf("CostModel.ComputeAllocation: Node RAM cost query result missing field: %s", err)
  1006. continue
  1007. }
  1008. key := newNodeKey(cluster, node)
  1009. if _, ok := nodeMap[key]; !ok {
  1010. nodeMap[key] = &NodePricing{
  1011. Name: node,
  1012. NodeType: instanceType,
  1013. }
  1014. }
  1015. nodeMap[key].CostPerRAMGiBHr = res.Values[0].Value
  1016. }
  1017. }
  1018. func applyNodeCostPerGPUHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerGPUHr []*prom.QueryResult) {
  1019. for _, res := range resNodeCostPerGPUHr {
  1020. cluster, err := res.GetString("cluster_id")
  1021. if err != nil {
  1022. cluster = env.GetClusterID()
  1023. }
  1024. node, err := res.GetString("node")
  1025. if err != nil {
  1026. log.Warningf("CostModel.ComputeAllocation: Node GPU cost query result missing field: %s", err)
  1027. continue
  1028. }
  1029. instanceType, err := res.GetString("instance_type")
  1030. if err != nil {
  1031. log.Warningf("CostModel.ComputeAllocation: Node GPU cost query result missing field: %s", err)
  1032. continue
  1033. }
  1034. key := newNodeKey(cluster, node)
  1035. if _, ok := nodeMap[key]; !ok {
  1036. nodeMap[key] = &NodePricing{
  1037. Name: node,
  1038. NodeType: instanceType,
  1039. }
  1040. }
  1041. nodeMap[key].CostPerGPUHr = res.Values[0].Value
  1042. }
  1043. }
  1044. func applyNodeSpot(nodeMap map[nodeKey]*NodePricing, resNodeIsSpot []*prom.QueryResult) {
  1045. for _, res := range resNodeIsSpot {
  1046. cluster, err := res.GetString("cluster_id")
  1047. if err != nil {
  1048. cluster = env.GetClusterID()
  1049. }
  1050. node, err := res.GetString("node")
  1051. if err != nil {
  1052. log.Warningf("CostModel.ComputeAllocation: Node spot query result missing field: %s", err)
  1053. continue
  1054. }
  1055. key := newNodeKey(cluster, node)
  1056. if _, ok := nodeMap[key]; !ok {
  1057. log.Warningf("CostModel.ComputeAllocation: Node spot query result for missing node: %s", key)
  1058. continue
  1059. }
  1060. nodeMap[key].Preemptible = res.Values[0].Value > 0
  1061. }
  1062. }
  1063. func applyNodeDiscount(nodeMap map[nodeKey]*NodePricing, cm *CostModel) {
  1064. if cm == nil {
  1065. return
  1066. }
  1067. c, err := cm.Provider.GetConfig()
  1068. if err != nil {
  1069. log.Errorf("CostModel.ComputeAllocation: applyNodeDiscount: %s", err)
  1070. return
  1071. }
  1072. discount, err := ParsePercentString(c.Discount)
  1073. if err != nil {
  1074. log.Errorf("CostModel.ComputeAllocation: applyNodeDiscount: %s", err)
  1075. return
  1076. }
  1077. negotiatedDiscount, err := ParsePercentString(c.NegotiatedDiscount)
  1078. if err != nil {
  1079. log.Errorf("CostModel.ComputeAllocation: applyNodeDiscount: %s", err)
  1080. return
  1081. }
  1082. for _, node := range nodeMap {
  1083. // TODO GKE Reserved Instances into account
  1084. node.Discount = cm.Provider.CombinedDiscountForNode(node.NodeType, node.Preemptible, discount, negotiatedDiscount)
  1085. node.CostPerCPUHr *= (1.0 - node.Discount)
  1086. node.CostPerRAMGiBHr *= (1.0 - node.Discount)
  1087. }
  1088. }
  1089. func buildPVMap(pvMap map[pvKey]*PV, resPVCostPerGiBHour []*prom.QueryResult) {
  1090. for _, res := range resPVCostPerGiBHour {
  1091. cluster, err := res.GetString("cluster_id")
  1092. if err != nil {
  1093. cluster = env.GetClusterID()
  1094. }
  1095. name, err := res.GetString("volumename")
  1096. if err != nil {
  1097. log.Warningf("CostModel.ComputeAllocation: PV cost without volumename")
  1098. continue
  1099. }
  1100. key := newPVKey(cluster, name)
  1101. pvMap[key] = &PV{
  1102. Cluster: cluster,
  1103. Name: name,
  1104. CostPerGiBHour: res.Values[0].Value,
  1105. }
  1106. }
  1107. }
  1108. func applyPVBytes(pvMap map[pvKey]*PV, resPVBytes []*prom.QueryResult) {
  1109. for _, res := range resPVBytes {
  1110. key, err := resultPVKey(res, "cluster_id", "persistentvolume")
  1111. if err != nil {
  1112. log.Warningf("CostModel.ComputeAllocation: PV bytes query result missing field: %s", err)
  1113. continue
  1114. }
  1115. if _, ok := pvMap[key]; !ok {
  1116. log.Warningf("CostModel.ComputeAllocation: PV bytes result for missing PV: %s", err)
  1117. continue
  1118. }
  1119. pvMap[key].Bytes = res.Values[0].Value
  1120. }
  1121. }
  1122. func buildPVCMap(window kubecost.Window, pvcMap map[pvcKey]*PVC, pvMap map[pvKey]*PV, resPVCInfo []*prom.QueryResult) {
  1123. for _, res := range resPVCInfo {
  1124. cluster, err := res.GetString("cluster_id")
  1125. if err != nil {
  1126. cluster = env.GetClusterID()
  1127. }
  1128. values, err := res.GetStrings("persistentvolumeclaim", "storageclass", "volumename", "namespace")
  1129. if err != nil {
  1130. log.Warningf("CostModel.ComputeAllocation: PVC info query result missing field: %s", err)
  1131. continue
  1132. }
  1133. namespace := values["namespace"]
  1134. name := values["persistentvolumeclaim"]
  1135. volume := values["volumename"]
  1136. storageClass := values["storageclass"]
  1137. pvKey := newPVKey(cluster, volume)
  1138. pvcKey := newPVCKey(cluster, namespace, name)
  1139. // pvcStart and pvcEnd are the timestamps of the first and last minutes
  1140. // the PVC was running, respectively. We subtract 1m from pvcStart
  1141. // because this point will actually represent the end of the first
  1142. // minute. We don't subtract from pvcEnd because it already represents
  1143. // the end of the last minute.
  1144. var pvcStart, pvcEnd time.Time
  1145. for _, datum := range res.Values {
  1146. t := time.Unix(int64(datum.Timestamp), 0)
  1147. if pvcStart.IsZero() && datum.Value > 0 && window.Contains(t) {
  1148. pvcStart = t
  1149. }
  1150. if datum.Value > 0 && window.Contains(t) {
  1151. pvcEnd = t
  1152. }
  1153. }
  1154. if pvcStart.IsZero() || pvcEnd.IsZero() {
  1155. log.Warningf("CostModel.ComputeAllocation: PVC %s has no running time", pvcKey)
  1156. }
  1157. pvcStart = pvcStart.Add(-time.Minute)
  1158. if _, ok := pvMap[pvKey]; !ok {
  1159. log.Warningf("CostModel.ComputeAllocation: PV missing for PVC info query result: %s", pvKey)
  1160. continue
  1161. }
  1162. pvMap[pvKey].StorageClass = storageClass
  1163. if _, ok := pvcMap[pvcKey]; !ok {
  1164. pvcMap[pvcKey] = &PVC{}
  1165. }
  1166. pvcMap[pvcKey].Name = name
  1167. pvcMap[pvcKey].Namespace = namespace
  1168. pvcMap[pvcKey].Volume = pvMap[pvKey]
  1169. pvcMap[pvcKey].Start = pvcStart
  1170. pvcMap[pvcKey].End = pvcEnd
  1171. }
  1172. }
  1173. func applyPVCBytesRequested(pvcMap map[pvcKey]*PVC, resPVCBytesRequested []*prom.QueryResult) {
  1174. for _, res := range resPVCBytesRequested {
  1175. key, err := resultPVCKey(res, "cluster_id", "namespace", "persistentvolumeclaim")
  1176. if err != nil {
  1177. log.Warningf("CostModel.ComputeAllocation: PVC bytes requested query result missing field: %s", err)
  1178. continue
  1179. }
  1180. if _, ok := pvcMap[key]; !ok {
  1181. log.Warningf("CostModel.ComputeAllocation: PVC bytes requested result for missing PVC: %s", key)
  1182. continue
  1183. }
  1184. pvcMap[key].Bytes = res.Values[0].Value
  1185. }
  1186. }
  1187. func buildPodPVCMap(podPVCMap map[podKey][]*PVC, pvMap map[pvKey]*PV, pvcMap map[pvcKey]*PVC, podMap map[podKey]*Pod, resPodPVCAllocation []*prom.QueryResult) {
  1188. for _, res := range resPodPVCAllocation {
  1189. cluster, err := res.GetString("cluster_id")
  1190. if err != nil {
  1191. cluster = env.GetClusterID()
  1192. }
  1193. values, err := res.GetStrings("persistentvolume", "persistentvolumeclaim", "pod", "namespace")
  1194. if err != nil {
  1195. log.Warningf("CostModel.ComputeAllocation: PVC allocation query result missing field: %s", err)
  1196. continue
  1197. }
  1198. namespace := values["namespace"]
  1199. pod := values["pod"]
  1200. name := values["persistentvolumeclaim"]
  1201. volume := values["persistentvolume"]
  1202. podKey := newPodKey(cluster, namespace, pod)
  1203. pvKey := newPVKey(cluster, volume)
  1204. pvcKey := newPVCKey(cluster, namespace, name)
  1205. if _, ok := pvMap[pvKey]; !ok {
  1206. log.Warningf("CostModel.ComputeAllocation: PV missing for PVC allocation query result: %s", pvKey)
  1207. continue
  1208. }
  1209. if _, ok := podPVCMap[podKey]; !ok {
  1210. podPVCMap[podKey] = []*PVC{}
  1211. }
  1212. pvc, ok := pvcMap[pvcKey]
  1213. if !ok {
  1214. log.Warningf("CostModel.ComputeAllocation: PVC missing for PVC allocation query: %s", pvcKey)
  1215. continue
  1216. }
  1217. count := 1
  1218. if pod, ok := podMap[podKey]; ok && len(pod.Allocations) > 0 {
  1219. count = len(pod.Allocations)
  1220. } else {
  1221. log.DedupedWarningf(10, "CostModel.ComputeAllocation: PVC %s for missing pod %s", pvcKey, podKey)
  1222. }
  1223. pvc.Count = count
  1224. pvc.Mounted = true
  1225. podPVCMap[podKey] = append(podPVCMap[podKey], pvc)
  1226. }
  1227. }
  1228. func applyUnmountedPVs(window kubecost.Window, podMap map[podKey]*Pod, pvMap map[pvKey]*PV, pvcMap map[pvcKey]*PVC) {
  1229. unmountedPVBytes := map[string]float64{}
  1230. unmountedPVCost := map[string]float64{}
  1231. for _, pv := range pvMap {
  1232. mounted := false
  1233. for _, pvc := range pvcMap {
  1234. if pvc.Volume == nil {
  1235. continue
  1236. }
  1237. if pvc.Volume == pv {
  1238. mounted = true
  1239. break
  1240. }
  1241. }
  1242. if !mounted {
  1243. gib := pv.Bytes / 1024 / 1024 / 1024
  1244. hrs := window.Minutes() / 60.0 // TODO improve with PV hours, not window hours
  1245. cost := pv.CostPerGiBHour * gib * hrs
  1246. unmountedPVCost[pv.Cluster] += cost
  1247. unmountedPVBytes[pv.Cluster] += pv.Bytes
  1248. }
  1249. }
  1250. for cluster, amount := range unmountedPVCost {
  1251. container := kubecost.UnmountedSuffix
  1252. pod := kubecost.UnmountedSuffix
  1253. namespace := kubecost.UnmountedSuffix
  1254. node := ""
  1255. key := newPodKey(cluster, namespace, pod)
  1256. podMap[key] = &Pod{
  1257. Window: window.Clone(),
  1258. Start: *window.Start(),
  1259. End: *window.End(),
  1260. Key: key,
  1261. Allocations: map[string]*kubecost.Allocation{},
  1262. }
  1263. podMap[key].AppendContainer(container)
  1264. podMap[key].Allocations[container].Properties.SetCluster(cluster)
  1265. podMap[key].Allocations[container].Properties.SetNode(node)
  1266. podMap[key].Allocations[container].Properties.SetNamespace(namespace)
  1267. podMap[key].Allocations[container].Properties.SetPod(pod)
  1268. podMap[key].Allocations[container].Properties.SetContainer(container)
  1269. podMap[key].Allocations[container].PVByteHours = unmountedPVBytes[cluster] * window.Minutes() / 60.0
  1270. podMap[key].Allocations[container].PVCost = amount
  1271. }
  1272. }
  1273. func applyUnmountedPVCs(window kubecost.Window, podMap map[podKey]*Pod, pvcMap map[pvcKey]*PVC) {
  1274. unmountedPVCBytes := map[namespaceKey]float64{}
  1275. unmountedPVCCost := map[namespaceKey]float64{}
  1276. for _, pvc := range pvcMap {
  1277. if !pvc.Mounted && pvc.Volume != nil {
  1278. key := newNamespaceKey(pvc.Cluster, pvc.Namespace)
  1279. gib := pvc.Volume.Bytes / 1024 / 1024 / 1024
  1280. hrs := pvc.Minutes() / 60.0
  1281. cost := pvc.Volume.CostPerGiBHour * gib * hrs
  1282. unmountedPVCCost[key] += cost
  1283. unmountedPVCBytes[key] += pvc.Volume.Bytes
  1284. }
  1285. }
  1286. for key, amount := range unmountedPVCCost {
  1287. container := kubecost.UnmountedSuffix
  1288. pod := kubecost.UnmountedSuffix
  1289. namespace := key.Namespace
  1290. node := ""
  1291. cluster := key.Cluster
  1292. podKey := newPodKey(cluster, namespace, pod)
  1293. podMap[podKey] = &Pod{
  1294. Window: window.Clone(),
  1295. Start: *window.Start(),
  1296. End: *window.End(),
  1297. Key: podKey,
  1298. Allocations: map[string]*kubecost.Allocation{},
  1299. }
  1300. podMap[podKey].AppendContainer(container)
  1301. podMap[podKey].Allocations[container].Properties.SetCluster(cluster)
  1302. podMap[podKey].Allocations[container].Properties.SetNode(node)
  1303. podMap[podKey].Allocations[container].Properties.SetNamespace(namespace)
  1304. podMap[podKey].Allocations[container].Properties.SetPod(pod)
  1305. podMap[podKey].Allocations[container].Properties.SetContainer(container)
  1306. podMap[podKey].Allocations[container].PVByteHours = unmountedPVCBytes[key] * window.Minutes() / 60.0
  1307. podMap[podKey].Allocations[container].PVCost = amount
  1308. }
  1309. }
  1310. // getNodePricing determines node pricing, given a key and a mapping from keys
  1311. // to their NodePricing instances, as well as the custom pricing configuration
  1312. // inherent to the CostModel instance. If custom pricing is set, use that. If
  1313. // not, use the pricing defined by the given key. If that doesn't exist, fall
  1314. // back on custom pricing as a default.
  1315. func (cm *CostModel) getNodePricing(nodeMap map[nodeKey]*NodePricing, nodeKey nodeKey) *NodePricing {
  1316. // Find the relevant NodePricing, if it exists. If not, substitute the
  1317. // custom NodePricing as a default.
  1318. node, ok := nodeMap[nodeKey]
  1319. if !ok || node == nil {
  1320. if nodeKey.Node != "" {
  1321. log.Warningf("CostModel: failed to find node for %s", nodeKey)
  1322. }
  1323. return cm.getCustomNodePricing(false)
  1324. }
  1325. // If custom pricing is enabled and can be retrieved, override detected
  1326. // node pricing with the custom values.
  1327. customPricingConfig, err := cm.Provider.GetConfig()
  1328. if err != nil {
  1329. log.Warningf("CostModel: failed to load custom pricing: %s", err)
  1330. }
  1331. if cloud.CustomPricesEnabled(cm.Provider) && customPricingConfig != nil {
  1332. return cm.getCustomNodePricing(node.Preemptible)
  1333. }
  1334. // If any of the values are NaN or zero, replace them with the custom
  1335. // values as default.
  1336. // TODO:CLEANUP can't we parse these custom prices once? why do we store
  1337. // them as strings like this?
  1338. if node.CostPerCPUHr == 0 || math.IsNaN(node.CostPerCPUHr) {
  1339. log.Warningf("CostModel: node pricing has illegal CostPerCPUHr; replacing with custom pricing: %s", nodeKey)
  1340. cpuCostStr := customPricingConfig.CPU
  1341. if node.Preemptible {
  1342. cpuCostStr = customPricingConfig.SpotCPU
  1343. }
  1344. costPerCPUHr, err := strconv.ParseFloat(cpuCostStr, 64)
  1345. if err != nil {
  1346. log.Warningf("CostModel: custom pricing has illegal CPU cost: %s", cpuCostStr)
  1347. }
  1348. node.CostPerCPUHr = costPerCPUHr
  1349. }
  1350. if math.IsNaN(node.CostPerGPUHr) {
  1351. log.Warningf("CostModel: node pricing has illegal CostPerGPUHr; replacing with custom pricing: %s", nodeKey)
  1352. gpuCostStr := customPricingConfig.GPU
  1353. if node.Preemptible {
  1354. gpuCostStr = customPricingConfig.SpotGPU
  1355. }
  1356. costPerGPUHr, err := strconv.ParseFloat(gpuCostStr, 64)
  1357. if err != nil {
  1358. log.Warningf("CostModel: custom pricing has illegal GPU cost: %s", gpuCostStr)
  1359. }
  1360. node.CostPerGPUHr = costPerGPUHr
  1361. }
  1362. if node.CostPerRAMGiBHr == 0 || math.IsNaN(node.CostPerRAMGiBHr) {
  1363. log.Warningf("CostModel: node pricing has illegal CostPerRAMHr; replacing with custom pricing: %s", nodeKey)
  1364. ramCostStr := customPricingConfig.RAM
  1365. if node.Preemptible {
  1366. ramCostStr = customPricingConfig.SpotRAM
  1367. }
  1368. costPerRAMHr, err := strconv.ParseFloat(ramCostStr, 64)
  1369. if err != nil {
  1370. log.Warningf("CostModel: custom pricing has illegal RAM cost: %s", ramCostStr)
  1371. }
  1372. node.CostPerRAMGiBHr = costPerRAMHr
  1373. }
  1374. return node
  1375. }
  1376. // getCustomNodePricing converts the CostModel's configured custom pricing
  1377. // values into a NodePricing instance.
  1378. func (cm *CostModel) getCustomNodePricing(spot bool) *NodePricing {
  1379. customPricingConfig, err := cm.Provider.GetConfig()
  1380. if err != nil {
  1381. return nil
  1382. }
  1383. cpuCostStr := customPricingConfig.CPU
  1384. gpuCostStr := customPricingConfig.GPU
  1385. ramCostStr := customPricingConfig.RAM
  1386. if spot {
  1387. cpuCostStr = customPricingConfig.SpotCPU
  1388. gpuCostStr = customPricingConfig.SpotGPU
  1389. ramCostStr = customPricingConfig.SpotRAM
  1390. }
  1391. node := &NodePricing{}
  1392. costPerCPUHr, err := strconv.ParseFloat(cpuCostStr, 64)
  1393. if err != nil {
  1394. log.Warningf("CostModel: custom pricing has illegal CPU cost: %s", cpuCostStr)
  1395. }
  1396. node.CostPerCPUHr = costPerCPUHr
  1397. costPerGPUHr, err := strconv.ParseFloat(gpuCostStr, 64)
  1398. if err != nil {
  1399. log.Warningf("CostModel: custom pricing has illegal GPU cost: %s", gpuCostStr)
  1400. }
  1401. node.CostPerGPUHr = costPerGPUHr
  1402. costPerRAMHr, err := strconv.ParseFloat(ramCostStr, 64)
  1403. if err != nil {
  1404. log.Warningf("CostModel: custom pricing has illegal RAM cost: %s", ramCostStr)
  1405. }
  1406. node.CostPerRAMGiBHr = costPerRAMHr
  1407. return node
  1408. }
  // NodePricing holds the hourly resource prices of a node, as returned by
  // getNodePricing and getCustomNodePricing.
  type NodePricing struct {
  	Name     string
  	NodeType string

  	// Preemptible selects the spot variants of the custom prices when
  	// getNodePricing has to substitute them.
  	Preemptible bool

  	// Hourly prices per CPU, per GiB of RAM and per GPU.
  	CostPerCPUHr, CostPerRAMGiBHr, CostPerGPUHr float64

  	Discount float64
  	Source   string
  }
  1419. // Pod describes a running pod's start and end time within a Window and
  1420. // all the Allocations (i.e. containers) contained within it.
  1421. type Pod struct {
  1422. Window kubecost.Window
  1423. Start time.Time
  1424. End time.Time
  1425. Key podKey
  1426. Allocations map[string]*kubecost.Allocation
  1427. }
  1428. // AppendContainer adds an entry for the given container name to the Pod.
  1429. func (p Pod) AppendContainer(container string) {
  1430. name := fmt.Sprintf("%s/%s/%s/%s", p.Key.Cluster, p.Key.Namespace, p.Key.Pod, container)
  1431. alloc := &kubecost.Allocation{
  1432. Name: name,
  1433. Properties: kubecost.Properties{},
  1434. Window: p.Window.Clone(),
  1435. Start: p.Start,
  1436. End: p.End,
  1437. }
  1438. alloc.Properties.SetContainer(container)
  1439. alloc.Properties.SetPod(p.Key.Pod)
  1440. alloc.Properties.SetNamespace(p.Key.Namespace)
  1441. alloc.Properties.SetCluster(p.Key.Cluster)
  1442. p.Allocations[container] = alloc
  1443. }
  1444. // PVC describes a PersistentVolumeClaim
  1445. // TODO:CLEANUP move to pkg/kubecost?
  1446. // TODO:CLEANUP add PersistentVolumeClaims field to type Allocation?
  1447. type PVC struct {
  1448. Bytes float64 `json:"bytes"`
  1449. Count int `json:"count"`
  1450. Name string `json:"name"`
  1451. Cluster string `json:"cluster"`
  1452. Namespace string `json:"namespace"`
  1453. Volume *PV `json:"persistentVolume"`
  1454. Mounted bool `json:"mounted"`
  1455. Start time.Time `json:"start"`
  1456. End time.Time `json:"end"`
  1457. }
  1458. // Cost computes the cumulative cost of the PVC
  1459. func (pvc *PVC) Cost() float64 {
  1460. if pvc == nil || pvc.Volume == nil {
  1461. return 0.0
  1462. }
  1463. gib := pvc.Bytes / 1024 / 1024 / 1024
  1464. hrs := pvc.Minutes() / 60.0
  1465. return pvc.Volume.CostPerGiBHour * gib * hrs
  1466. }
  1467. // Minutes computes the number of minutes over which the PVC is defined
  1468. func (pvc *PVC) Minutes() float64 {
  1469. if pvc == nil {
  1470. return 0.0
  1471. }
  1472. return pvc.End.Sub(pvc.Start).Minutes()
  1473. }
  1474. // String returns a string representation of the PVC
  1475. func (pvc *PVC) String() string {
  1476. if pvc == nil {
  1477. return "<nil>"
  1478. }
  1479. return fmt.Sprintf("%s/%s/%s{Bytes:%.2f, Cost:%.6f, Start,End:%s}", pvc.Cluster, pvc.Namespace, pvc.Name, pvc.Bytes, pvc.Cost(), kubecost.NewWindow(&pvc.Start, &pvc.End))
  1480. }
  // PV describes a PersistentVolume
  // TODO:CLEANUP move to pkg/kubecost?
  type PV struct {
  	// Bytes is the volume's size in bytes; cost computations convert it to
  	// GiB.
  	Bytes float64 `json:"bytes"`

  	// CostPerGiBHour is the price multiplied by GiB and hours to cost the
  	// volume.
  	CostPerGiBHour float64 `json:"costPerGiBHour"`

  	// Cluster is the key under which unmounted PV cost is aggregated.
  	Cluster string `json:"cluster"`

  	Name         string `json:"name"`
  	StorageClass string `json:"storageClass"`
  }
  1490. // String returns a string representation of the PV
  1491. func (pv *PV) String() string {
  1492. if pv == nil {
  1493. return "<nil>"
  1494. }
  1495. return fmt.Sprintf("%s/%s{Bytes:%.2f, Cost/GiB*Hr:%.6f, StorageClass:%s}", pv.Cluster, pv.Name, pv.Bytes, pv.CostPerGiBHour, pv.StorageClass)
  1496. }