allocation.go 62 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812
  1. package costmodel
  2. import (
  3. "fmt"
  4. "math"
  5. "strconv"
  6. "strings"
  7. "time"
  8. "github.com/kubecost/cost-model/pkg/cloud"
  9. "github.com/kubecost/cost-model/pkg/env"
  10. "github.com/kubecost/cost-model/pkg/kubecost"
  11. "github.com/kubecost/cost-model/pkg/log"
  12. "github.com/kubecost/cost-model/pkg/prom"
  13. "github.com/kubecost/cost-model/pkg/util"
  14. "k8s.io/apimachinery/pkg/labels"
  15. )
  // Prometheus query templates used by ComputeAllocation and buildPodMap. Each
  // template is expanded with fmt.Sprintf: the range-vector templates take a
  // (duration, offset) pair, while the subquery templates (queryFmtPods and
  // queryFmtPVCInfo) take (duration, resolution, offset).
  const (
  	// Pod lifetimes (subquery form).
  	queryFmtPods = `avg(kube_pod_container_status_running{}) by (pod, namespace, cluster_id)[%s:%s]%s`

  	// Container RAM: allocation, requests, and usage.
  	queryFmtRAMBytesAllocated = `avg(avg_over_time(container_memory_allocation_bytes{container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, cluster_id)`
  	queryFmtRAMRequests = `avg(avg_over_time(kube_pod_container_resource_requests_memory_bytes{container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, cluster_id)`
  	queryFmtRAMUsage = `avg(avg_over_time(container_memory_working_set_bytes{container_name!="", container_name!="POD", instance!=""}[%s]%s)) by (container_name, pod_name, namespace, instance, cluster_id)`

  	// Container CPU: allocation, requests, and usage.
  	queryFmtCPUCoresAllocated = `avg(avg_over_time(container_cpu_allocation{container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, cluster_id)`
  	queryFmtCPURequests = `avg(avg_over_time(kube_pod_container_resource_requests_cpu_cores{container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, cluster_id)`
  	queryFmtCPUUsage = `avg(rate(container_cpu_usage_seconds_total{container_name!="", container_name!="POD", instance!=""}[%s]%s)) by (container_name, pod_name, namespace, instance, cluster_id)`

  	// Container GPU requests.
  	queryFmtGPUsRequested = `avg(avg_over_time(kube_pod_container_resource_requests{resource="nvidia_com_gpu", container!="",container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, cluster_id)`

  	// Node pricing and spot status.
  	queryFmtNodeCostPerCPUHr = `avg(avg_over_time(node_cpu_hourly_cost[%s]%s)) by (node, cluster_id, instance_type)`
  	queryFmtNodeCostPerRAMGiBHr = `avg(avg_over_time(node_ram_hourly_cost[%s]%s)) by (node, cluster_id, instance_type)`
  	queryFmtNodeCostPerGPUHr = `avg(avg_over_time(node_gpu_hourly_cost[%s]%s)) by (node, cluster_id, instance_type)`
  	queryFmtNodeIsSpot = `avg_over_time(kubecost_node_is_spot[%s]%s)`

  	// Persistent volumes and persistent volume claims. queryFmtPVCInfo uses
  	// the subquery form.
  	queryFmtPVCInfo = `avg(kube_persistentvolumeclaim_info{volumename != ""}) by (persistentvolumeclaim, storageclass, volumename, namespace, cluster_id)[%s:%s]%s`
  	queryFmtPVBytes = `avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s]%s)) by (persistentvolume, cluster_id)`
  	queryFmtPodPVCAllocation = `avg(avg_over_time(pod_pvc_allocation[%s]%s)) by (persistentvolume, persistentvolumeclaim, pod, namespace, cluster_id)`
  	queryFmtPVCBytesRequested = `avg(avg_over_time(kube_persistentvolumeclaim_resource_requests_storage_bytes{}[%s]%s)) by (persistentvolumeclaim, namespace, cluster_id)`
  	queryFmtPVCostPerGiBHour = `avg(avg_over_time(pv_hourly_cost[%s]%s)) by (volumename, cluster_id)`

  	// Network egress volume (GiB) and price per GiB for cross-zone,
  	// cross-region, and internet traffic.
  	queryFmtNetZoneGiB = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="true"}[%s]%s)) by (pod_name, namespace, cluster_id) / 1024 / 1024 / 1024`
  	queryFmtNetZoneCostPerGiB = `avg(avg_over_time(kubecost_network_zone_egress_cost{}[%s]%s)) by (cluster_id)`
  	queryFmtNetRegionGiB = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="false"}[%s]%s)) by (pod_name, namespace, cluster_id) / 1024 / 1024 / 1024`
  	queryFmtNetRegionCostPerGiB = `avg(avg_over_time(kubecost_network_region_egress_cost{}[%s]%s)) by (cluster_id)`
  	queryFmtNetInternetGiB = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="true"}[%s]%s)) by (pod_name, namespace, cluster_id) / 1024 / 1024 / 1024`
  	queryFmtNetInternetCostPerGiB = `avg(avg_over_time(kubecost_network_internet_egress_cost{}[%s]%s)) by (cluster_id)`

  	// Labels and annotations of namespaces and pods.
  	queryFmtNamespaceLabels = `avg_over_time(kube_namespace_labels[%s]%s)`
  	queryFmtNamespaceAnnotations = `avg_over_time(kube_namespace_annotations[%s]%s)`
  	queryFmtPodLabels = `avg_over_time(kube_pod_labels[%s]%s)`
  	queryFmtPodAnnotations = `avg_over_time(kube_pod_annotations[%s]%s)`

  	// Service and controller selectors / ownership.
  	queryFmtServiceLabels = `avg_over_time(service_selector_labels[%s]%s)`
  	queryFmtDeploymentLabels = `avg_over_time(deployment_match_labels[%s]%s)`
  	queryFmtStatefulSetLabels = `avg_over_time(statefulSet_match_labels[%s]%s)`
  	queryFmtDaemonSetLabels = `sum(avg_over_time(kube_pod_owner{owner_kind="DaemonSet"}[%s]%s)) by (pod, owner_name, namespace, cluster_id)`
  	queryFmtJobLabels = `sum(avg_over_time(kube_pod_owner{owner_kind="Job"}[%s]%s)) by (pod, owner_name, namespace ,cluster_id)`
  )
  50. // ComputeAllocation uses the CostModel instance to compute an AllocationSet
  51. // for the window defined by the given start and end times. The Allocations
  52. // returned are unaggregated (i.e. down to the container level).
  53. func (cm *CostModel) ComputeAllocation(start, end time.Time, resolution time.Duration) (*kubecost.AllocationSet, error) {
  54. // 1. Build out Pod map from resolution-tuned, batched Pod start/end query
  55. // 2. Run and apply the results of the remaining queries to
  56. // 3. Build out AllocationSet from completed Pod map
  57. // Create a window spanning the requested query
  58. window := kubecost.NewWindow(&start, &end)
  59. // Create an empty AllocationSet. For safety, in the case of an error, we
  60. // should prefer to return this empty set with the error. (In the case of
  61. // no error, of course we populate the set and return it.)
  62. allocSet := kubecost.NewAllocationSet(start, end)
  63. // (1) Build out Pod map
  64. // Build out a map of Allocations as a mapping from pod-to-container-to-
  65. // underlying-Allocation instance, starting with (start, end) so that we
  66. // begin with minutes, from which we compute resource allocation and cost
  67. // totals from measured rate data.
  68. podMap := map[podKey]*Pod{}
  69. // clusterStarts and clusterEnds record the earliest start and latest end
  70. // times, respectively, on a cluster-basis. These are used for unmounted
  71. // PVs and other "virtual" Allocations so that minutes are maximally
  72. // accurate during start-up or spin-down of a cluster
  73. clusterStart := map[string]time.Time{}
  74. clusterEnd := map[string]time.Time{}
  75. cm.buildPodMap(window, resolution, env.GetETLMaxBatchDuration(), podMap, clusterStart, clusterEnd)
  76. // (2) Run and apply remaining queries
  77. // Convert window (start, end) to (duration, offset) for querying Prometheus,
  78. // including handling Thanos offset
  79. durStr, offStr, err := window.DurationOffsetForPrometheus()
  80. if err != nil {
  81. // Negative duration, so return empty set
  82. return allocSet, nil
  83. }
  84. // Convert resolution duration to a query-ready string
  85. resStr := util.DurationString(resolution)
  86. ctx := prom.NewContext(cm.PrometheusClient)
  87. queryRAMBytesAllocated := fmt.Sprintf(queryFmtRAMBytesAllocated, durStr, offStr)
  88. resChRAMBytesAllocated := ctx.Query(queryRAMBytesAllocated)
  89. queryRAMRequests := fmt.Sprintf(queryFmtRAMRequests, durStr, offStr)
  90. resChRAMRequests := ctx.Query(queryRAMRequests)
  91. queryRAMUsage := fmt.Sprintf(queryFmtRAMUsage, durStr, offStr)
  92. resChRAMUsage := ctx.Query(queryRAMUsage)
  93. queryCPUCoresAllocated := fmt.Sprintf(queryFmtCPUCoresAllocated, durStr, offStr)
  94. resChCPUCoresAllocated := ctx.Query(queryCPUCoresAllocated)
  95. queryCPURequests := fmt.Sprintf(queryFmtCPURequests, durStr, offStr)
  96. resChCPURequests := ctx.Query(queryCPURequests)
  97. queryCPUUsage := fmt.Sprintf(queryFmtCPUUsage, durStr, offStr)
  98. resChCPUUsage := ctx.Query(queryCPUUsage)
  99. queryGPUsRequested := fmt.Sprintf(queryFmtGPUsRequested, durStr, offStr)
  100. resChGPUsRequested := ctx.Query(queryGPUsRequested)
  101. queryNodeCostPerCPUHr := fmt.Sprintf(queryFmtNodeCostPerCPUHr, durStr, offStr)
  102. resChNodeCostPerCPUHr := ctx.Query(queryNodeCostPerCPUHr)
  103. queryNodeCostPerRAMGiBHr := fmt.Sprintf(queryFmtNodeCostPerRAMGiBHr, durStr, offStr)
  104. resChNodeCostPerRAMGiBHr := ctx.Query(queryNodeCostPerRAMGiBHr)
  105. queryNodeCostPerGPUHr := fmt.Sprintf(queryFmtNodeCostPerGPUHr, durStr, offStr)
  106. resChNodeCostPerGPUHr := ctx.Query(queryNodeCostPerGPUHr)
  107. queryNodeIsSpot := fmt.Sprintf(queryFmtNodeIsSpot, durStr, offStr)
  108. resChNodeIsSpot := ctx.Query(queryNodeIsSpot)
  109. queryPVCInfo := fmt.Sprintf(queryFmtPVCInfo, durStr, resStr, offStr)
  110. resChPVCInfo := ctx.Query(queryPVCInfo)
  111. queryPVBytes := fmt.Sprintf(queryFmtPVBytes, durStr, offStr)
  112. resChPVBytes := ctx.Query(queryPVBytes)
  113. queryPodPVCAllocation := fmt.Sprintf(queryFmtPodPVCAllocation, durStr, offStr)
  114. resChPodPVCAllocation := ctx.Query(queryPodPVCAllocation)
  115. queryPVCBytesRequested := fmt.Sprintf(queryFmtPVCBytesRequested, durStr, offStr)
  116. resChPVCBytesRequested := ctx.Query(queryPVCBytesRequested)
  117. queryPVCostPerGiBHour := fmt.Sprintf(queryFmtPVCostPerGiBHour, durStr, offStr)
  118. resChPVCostPerGiBHour := ctx.Query(queryPVCostPerGiBHour)
  119. queryNetZoneGiB := fmt.Sprintf(queryFmtNetZoneGiB, durStr, offStr)
  120. resChNetZoneGiB := ctx.Query(queryNetZoneGiB)
  121. queryNetZoneCostPerGiB := fmt.Sprintf(queryFmtNetZoneCostPerGiB, durStr, offStr)
  122. resChNetZoneCostPerGiB := ctx.Query(queryNetZoneCostPerGiB)
  123. queryNetRegionGiB := fmt.Sprintf(queryFmtNetRegionGiB, durStr, offStr)
  124. resChNetRegionGiB := ctx.Query(queryNetRegionGiB)
  125. queryNetRegionCostPerGiB := fmt.Sprintf(queryFmtNetRegionCostPerGiB, durStr, offStr)
  126. resChNetRegionCostPerGiB := ctx.Query(queryNetRegionCostPerGiB)
  127. queryNetInternetGiB := fmt.Sprintf(queryFmtNetInternetGiB, durStr, offStr)
  128. resChNetInternetGiB := ctx.Query(queryNetInternetGiB)
  129. queryNetInternetCostPerGiB := fmt.Sprintf(queryFmtNetInternetCostPerGiB, durStr, offStr)
  130. resChNetInternetCostPerGiB := ctx.Query(queryNetInternetCostPerGiB)
  131. queryNamespaceLabels := fmt.Sprintf(queryFmtNamespaceLabels, durStr, offStr)
  132. resChNamespaceLabels := ctx.Query(queryNamespaceLabels)
  133. queryNamespaceAnnotations := fmt.Sprintf(queryFmtNamespaceAnnotations, durStr, offStr)
  134. resChNamespaceAnnotations := ctx.Query(queryNamespaceAnnotations)
  135. queryPodLabels := fmt.Sprintf(queryFmtPodLabels, durStr, offStr)
  136. resChPodLabels := ctx.Query(queryPodLabels)
  137. queryPodAnnotations := fmt.Sprintf(queryFmtPodAnnotations, durStr, offStr)
  138. resChPodAnnotations := ctx.Query(queryPodAnnotations)
  139. queryServiceLabels := fmt.Sprintf(queryFmtServiceLabels, durStr, offStr)
  140. resChServiceLabels := ctx.Query(queryServiceLabels)
  141. queryDeploymentLabels := fmt.Sprintf(queryFmtDeploymentLabels, durStr, offStr)
  142. resChDeploymentLabels := ctx.Query(queryDeploymentLabels)
  143. queryStatefulSetLabels := fmt.Sprintf(queryFmtStatefulSetLabels, durStr, offStr)
  144. resChStatefulSetLabels := ctx.Query(queryStatefulSetLabels)
  145. queryDaemonSetLabels := fmt.Sprintf(queryFmtDaemonSetLabels, durStr, offStr)
  146. resChDaemonSetLabels := ctx.Query(queryDaemonSetLabels)
  147. queryJobLabels := fmt.Sprintf(queryFmtJobLabels, durStr, offStr)
  148. resChJobLabels := ctx.Query(queryJobLabels)
  149. resCPUCoresAllocated, _ := resChCPUCoresAllocated.Await()
  150. resCPURequests, _ := resChCPURequests.Await()
  151. resCPUUsage, _ := resChCPUUsage.Await()
  152. resRAMBytesAllocated, _ := resChRAMBytesAllocated.Await()
  153. resRAMRequests, _ := resChRAMRequests.Await()
  154. resRAMUsage, _ := resChRAMUsage.Await()
  155. resGPUsRequested, _ := resChGPUsRequested.Await()
  156. resNodeCostPerCPUHr, _ := resChNodeCostPerCPUHr.Await()
  157. resNodeCostPerRAMGiBHr, _ := resChNodeCostPerRAMGiBHr.Await()
  158. resNodeCostPerGPUHr, _ := resChNodeCostPerGPUHr.Await()
  159. resNodeIsSpot, _ := resChNodeIsSpot.Await()
  160. resPVBytes, _ := resChPVBytes.Await()
  161. resPVCostPerGiBHour, _ := resChPVCostPerGiBHour.Await()
  162. resPVCInfo, _ := resChPVCInfo.Await()
  163. resPVCBytesRequested, _ := resChPVCBytesRequested.Await()
  164. resPodPVCAllocation, _ := resChPodPVCAllocation.Await()
  165. resNetZoneGiB, _ := resChNetZoneGiB.Await()
  166. resNetZoneCostPerGiB, _ := resChNetZoneCostPerGiB.Await()
  167. resNetRegionGiB, _ := resChNetRegionGiB.Await()
  168. resNetRegionCostPerGiB, _ := resChNetRegionCostPerGiB.Await()
  169. resNetInternetGiB, _ := resChNetInternetGiB.Await()
  170. resNetInternetCostPerGiB, _ := resChNetInternetCostPerGiB.Await()
  171. resNamespaceLabels, _ := resChNamespaceLabels.Await()
  172. resNamespaceAnnotations, _ := resChNamespaceAnnotations.Await()
  173. resPodLabels, _ := resChPodLabels.Await()
  174. resPodAnnotations, _ := resChPodAnnotations.Await()
  175. resServiceLabels, _ := resChServiceLabels.Await()
  176. resDeploymentLabels, _ := resChDeploymentLabels.Await()
  177. resStatefulSetLabels, _ := resChStatefulSetLabels.Await()
  178. resDaemonSetLabels, _ := resChDaemonSetLabels.Await()
  179. resJobLabels, _ := resChJobLabels.Await()
  180. if ctx.HasErrors() {
  181. for _, err := range ctx.Errors() {
  182. log.Errorf("CostModel.ComputeAllocation: %s", err)
  183. }
  184. return allocSet, ctx.ErrorCollection()
  185. }
  186. applyCPUCoresAllocated(podMap, resCPUCoresAllocated)
  187. applyCPUCoresRequested(podMap, resCPURequests)
  188. applyCPUCoresUsed(podMap, resCPUUsage)
  189. applyRAMBytesAllocated(podMap, resRAMBytesAllocated)
  190. applyRAMBytesRequested(podMap, resRAMRequests)
  191. applyRAMBytesUsed(podMap, resRAMUsage)
  192. applyGPUsRequested(podMap, resGPUsRequested)
  193. applyNetworkAllocation(podMap, resNetZoneGiB, resNetZoneCostPerGiB)
  194. applyNetworkAllocation(podMap, resNetRegionGiB, resNetRegionCostPerGiB)
  195. applyNetworkAllocation(podMap, resNetInternetGiB, resNetInternetCostPerGiB)
  196. namespaceLabels := resToNamespaceLabels(resNamespaceLabels)
  197. podLabels := resToPodLabels(resPodLabels)
  198. namespaceAnnotations := resToNamespaceAnnotations(resNamespaceAnnotations)
  199. podAnnotations := resToPodAnnotations(resPodAnnotations)
  200. applyLabels(podMap, namespaceLabels, podLabels)
  201. applyAnnotations(podMap, namespaceAnnotations, podAnnotations)
  202. serviceLabels := getServiceLabels(resServiceLabels)
  203. applyServicesToPods(podMap, podLabels, serviceLabels)
  204. podDeploymentMap := labelsToPodControllerMap(podLabels, resToDeploymentLabels(resDeploymentLabels))
  205. podStatefulSetMap := labelsToPodControllerMap(podLabels, resToStatefulSetLabels(resStatefulSetLabels))
  206. podDaemonSetMap := resToPodDaemonSetMap(resDaemonSetLabels)
  207. podJobMap := resToPodJobMap(resJobLabels)
  208. applyControllersToPods(podMap, podDeploymentMap)
  209. applyControllersToPods(podMap, podStatefulSetMap)
  210. applyControllersToPods(podMap, podDaemonSetMap)
  211. applyControllersToPods(podMap, podJobMap)
  212. // TODO breakdown network costs?
  213. // Build out a map of Nodes with resource costs, discounts, and node types
  214. // for converting resource allocation data to cumulative costs.
  215. nodeMap := map[nodeKey]*NodePricing{}
  216. applyNodeCostPerCPUHr(nodeMap, resNodeCostPerCPUHr)
  217. applyNodeCostPerRAMGiBHr(nodeMap, resNodeCostPerRAMGiBHr)
  218. applyNodeCostPerGPUHr(nodeMap, resNodeCostPerGPUHr)
  219. applyNodeSpot(nodeMap, resNodeIsSpot)
  220. applyNodeDiscount(nodeMap, cm)
  221. // Build out the map of all PVs with class, size and cost-per-hour.
  222. // Note: this does not record time running, which we may want to
  223. // include later for increased PV precision. (As long as the PV has
  224. // a PVC, we get time running there, so this is only inaccurate
  225. // for short-lived, unmounted PVs.)
  226. pvMap := map[pvKey]*PV{}
  227. buildPVMap(pvMap, resPVCostPerGiBHour)
  228. applyPVBytes(pvMap, resPVBytes)
  229. // Build out the map of all PVCs with time running, bytes requested,
  230. // and connect to the correct PV from pvMap. (If no PV exists, that
  231. // is noted, but does not result in any allocation/cost.)
  232. pvcMap := map[pvcKey]*PVC{}
  233. buildPVCMap(window, pvcMap, pvMap, resPVCInfo)
  234. applyPVCBytesRequested(pvcMap, resPVCBytesRequested)
  235. // Build out the relationships of pods to their PVCs. This step
  236. // populates the PVC.Count field so that PVC allocation can be
  237. // split appropriately among each pod's container allocation.
  238. podPVCMap := map[podKey][]*PVC{}
  239. buildPodPVCMap(podPVCMap, pvMap, pvcMap, podMap, resPodPVCAllocation)
  240. // Identify unmounted PVs (PVs without PVCs) and add one Allocation per
  241. // cluster representing each cluster's unmounted PVs (if necessary).
  242. applyUnmountedPVs(window, podMap, pvMap, pvcMap)
  243. // (3) Build out AllocationSet from Pod map
  244. for _, pod := range podMap {
  245. for _, alloc := range pod.Allocations {
  246. cluster, _ := alloc.Properties.GetCluster()
  247. nodeName, _ := alloc.Properties.GetNode()
  248. namespace, _ := alloc.Properties.GetNamespace()
  249. pod, _ := alloc.Properties.GetPod()
  250. container, _ := alloc.Properties.GetContainer()
  251. podKey := newPodKey(cluster, namespace, pod)
  252. nodeKey := newNodeKey(cluster, nodeName)
  253. node := cm.getNodePricing(nodeMap, nodeKey)
  254. alloc.CPUCost = alloc.CPUCoreHours * node.CostPerCPUHr
  255. alloc.RAMCost = (alloc.RAMByteHours / 1024 / 1024 / 1024) * node.CostPerRAMGiBHr
  256. alloc.GPUCost = alloc.GPUHours * node.CostPerGPUHr
  257. if pvcs, ok := podPVCMap[podKey]; ok {
  258. for _, pvc := range pvcs {
  259. // Determine the (start, end) of the relationship between the
  260. // given PVC and the associated Allocation so that a precise
  261. // number of hours can be used to compute cumulative cost.
  262. s, e := alloc.Start, alloc.End
  263. if pvc.Start.After(alloc.Start) {
  264. s = pvc.Start
  265. }
  266. if pvc.End.Before(alloc.End) {
  267. e = pvc.End
  268. }
  269. minutes := e.Sub(s).Minutes()
  270. hrs := minutes / 60.0
  271. count := float64(pvc.Count)
  272. if pvc.Count < 1 {
  273. count = 1
  274. }
  275. gib := pvc.Bytes / 1024 / 1024 / 1024
  276. cost := pvc.Volume.CostPerGiBHour * gib * hrs
  277. // Apply the size and cost of the PV to the allocation, each
  278. // weighted by count (i.e. the number of containers in the pod)
  279. alloc.PVByteHours += pvc.Bytes * hrs / count
  280. alloc.PVCost += cost / count
  281. }
  282. }
  283. // Make sure that the name is correct (node may not be present at this
  284. // point due to it missing from queryMinutes) then insert.
  285. alloc.Name = fmt.Sprintf("%s/%s/%s/%s/%s", cluster, nodeName, namespace, pod, container)
  286. allocSet.Set(alloc)
  287. }
  288. }
  289. return allocSet, nil
  290. }
  291. func (cm *CostModel) buildPodMap(window kubecost.Window, resolution, maxBatchSize time.Duration, podMap map[podKey]*Pod, clusterStart, clusterEnd map[string]time.Time) error {
  292. // Assumes that window is positive and closed
  293. start, end := *window.Start(), *window.End()
  294. // Convert resolution duration to a query-ready string
  295. resStr := util.DurationString(resolution)
  296. ctx := prom.NewContext(cm.PrometheusClient)
  297. // Query for (start, end) by (pod, namespace, cluster) over the given
  298. // window, using the given resolution, and if necessary in batches no
  299. // larger than the given maximum batch size. If working in batches, track
  300. // overall progress by starting with (window.start, window.start) and
  301. // querying in batches no larger than maxBatchSize from start-to-end,
  302. // folding each result set into podMap as the results come back.
  303. coverage := kubecost.NewWindow(&start, &start)
  304. numQuery := 1
  305. for coverage.End().Before(end) {
  306. // Determine the (start, end) of the current batch
  307. batchStart := *coverage.End()
  308. batchEnd := coverage.End().Add(maxBatchSize)
  309. if batchEnd.After(end) {
  310. batchEnd = end
  311. }
  312. batchWindow := kubecost.NewWindow(&batchStart, &batchEnd)
  313. var resPods []*prom.QueryResult
  314. var err error
  315. maxTries := 3
  316. numTries := 0
  317. for resPods == nil && numTries < maxTries {
  318. numTries++
  319. // Convert window (start, end) to (duration, offset) for querying Prometheus,
  320. // including handling Thanos offset
  321. durStr, offStr, err := batchWindow.DurationOffsetForPrometheus()
  322. if err != nil || durStr == "" {
  323. // Negative duration, so set empty results and don't query
  324. resPods = []*prom.QueryResult{}
  325. err = nil
  326. break
  327. }
  328. // Submit and profile query
  329. queryPods := fmt.Sprintf(queryFmtPods, durStr, resStr, offStr)
  330. queryProfile := time.Now()
  331. resPods, err = ctx.Query(queryPods).Await()
  332. if err != nil {
  333. log.Profile(queryProfile, fmt.Sprintf("CostModel.ComputeAllocation: pod query %d try %d failed: %s", numQuery, numTries, queryPods))
  334. resPods = nil
  335. }
  336. }
  337. if err != nil {
  338. return err
  339. }
  340. applyPodResults(window, resolution, podMap, clusterStart, clusterEnd, resPods)
  341. coverage = coverage.ExpandEnd(batchEnd)
  342. numQuery++
  343. }
  344. return nil
  345. }
  346. func applyPodResults(window kubecost.Window, resolution time.Duration, podMap map[podKey]*Pod, clusterStart, clusterEnd map[string]time.Time, resPods []*prom.QueryResult) {
  347. for _, res := range resPods {
  348. if len(res.Values) == 0 {
  349. log.Warningf("CostModel.ComputeAllocation: empty minutes result")
  350. continue
  351. }
  352. cluster, err := res.GetString("cluster_id")
  353. if err != nil {
  354. cluster = env.GetClusterID()
  355. }
  356. labels, err := res.GetStrings("namespace", "pod")
  357. if err != nil {
  358. log.Warningf("CostModel.ComputeAllocation: minutes query result missing field: %s", err)
  359. continue
  360. }
  361. namespace := labels["namespace"]
  362. pod := labels["pod"]
  363. key := newPodKey(cluster, namespace, pod)
  364. // allocStart and allocEnd are the timestamps of the first and last
  365. // minutes the pod was running, respectively. We subtract one resolution
  366. // from allocStart because this point will actually represent the end
  367. // of the first minute. We don't subtract from allocEnd because it
  368. // already represents the end of the last minute.
  369. var allocStart, allocEnd time.Time
  370. startAdjustmentCoeff, endAdjustmentCoeff := 1.0, 1.0
  371. for _, datum := range res.Values {
  372. t := time.Unix(int64(datum.Timestamp), 0)
  373. if allocStart.IsZero() && datum.Value > 0 && window.Contains(t) {
  374. // Set the start timestamp to the earliest non-zero timestamp
  375. allocStart = t
  376. // Record adjustment coefficient, i.e. the portion of the start
  377. // timestamp to "ignore". That is, sometimes the value will be
  378. // 0.5, meaning that we should discount the time running by
  379. // half of the resolution the timestamp stands for.
  380. startAdjustmentCoeff = (1.0 - datum.Value)
  381. }
  382. if datum.Value > 0 && window.Contains(t) {
  383. // Set the end timestamp to the latest non-zero timestamp
  384. allocEnd = t
  385. // Record adjustment coefficient, i.e. the portion of the end
  386. // timestamp to "ignore". (See explanation above for start.)
  387. endAdjustmentCoeff = (1.0 - datum.Value)
  388. }
  389. }
  390. if allocStart.IsZero() || allocEnd.IsZero() {
  391. continue
  392. }
  393. // Adjust timestamps according to the resolution and the adjustment
  394. // coefficients, as described above. That is, count the start timestamp
  395. // from the beginning of the resolution, not the end. Then "reduce" the
  396. // start and end by the correct amount, in the case that the "running"
  397. // value of the first or last timestamp was not a full 1.0.
  398. allocStart = allocStart.Add(-resolution)
  399. // Note: the *100 and /100 are necessary because Duration is an int, so
  400. // 0.5, for instance, will be truncated, resulting in no adjustment.
  401. allocStart = allocStart.Add(time.Duration(startAdjustmentCoeff*100) * resolution / time.Duration(100))
  402. allocEnd = allocEnd.Add(-time.Duration(endAdjustmentCoeff*100) * resolution / time.Duration(100))
  403. // If there is only one point with a value <= 0.5 that the start and
  404. // end timestamps both share, then we will enter this case because at
  405. // least half of a resolution will be subtracted from both the start
  406. // and the end. If that is the case, then add back half of each side
  407. // so that the pod is said to run for half a resolution total.
  408. // e.g. For resolution 1m and a value of 0.5 at one timestamp, we'll
  409. // end up with allocEnd == allocStart and each coeff == 0.5. In
  410. // that case, add 0.25m to each side, resulting in 0.5m duration.
  411. // if !allocEnd.After(allocStart) && startAdjustmentCoeff == endAdjustmentCoeff {
  412. if !allocEnd.After(allocStart) {
  413. allocStart = allocStart.Add(-time.Duration(50*startAdjustmentCoeff) * resolution / time.Duration(100))
  414. allocEnd = allocEnd.Add(time.Duration(50*endAdjustmentCoeff) * resolution / time.Duration(100))
  415. }
  416. // Set start if unset or this datum's start time is earlier than the
  417. // current earliest time.
  418. if _, ok := clusterStart[cluster]; !ok || allocStart.Before(clusterStart[cluster]) {
  419. clusterStart[cluster] = allocStart
  420. }
  421. // Set end if unset or this datum's end time is later than the
  422. // current latest time.
  423. if _, ok := clusterEnd[cluster]; !ok || allocEnd.After(clusterEnd[cluster]) {
  424. clusterEnd[cluster] = allocEnd
  425. }
  426. if pod, ok := podMap[key]; ok {
  427. // Pod has already been recorded, so update it accordingly
  428. if allocStart.Before(pod.Start) {
  429. pod.Start = allocStart
  430. }
  431. if allocEnd.After(pod.End) {
  432. pod.End = allocEnd
  433. }
  434. } else {
  435. // Pod has not been recorded yet, so insert it
  436. podMap[key] = &Pod{
  437. Window: window.Clone(),
  438. Start: allocStart,
  439. End: allocEnd,
  440. Key: key,
  441. Allocations: map[string]*kubecost.Allocation{},
  442. }
  443. }
  444. }
  445. }
  446. func applyCPUCoresAllocated(podMap map[podKey]*Pod, resCPUCoresAllocated []*prom.QueryResult) {
  447. for _, res := range resCPUCoresAllocated {
  448. key, err := resultPodKey(res, "cluster_id", "namespace", "pod")
  449. if err != nil {
  450. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU allocation result missing field: %s", err)
  451. continue
  452. }
  453. pod, ok := podMap[key]
  454. if !ok {
  455. // log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU allocation result for unidentified pod: %s", key)
  456. continue
  457. }
  458. container, err := res.GetString("container")
  459. if err != nil {
  460. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU allocation query result missing 'container': %s", key)
  461. continue
  462. }
  463. if _, ok := pod.Allocations[container]; !ok {
  464. pod.AppendContainer(container)
  465. }
  466. cpuCores := res.Values[0].Value
  467. hours := pod.Allocations[container].Minutes() / 60.0
  468. pod.Allocations[container].CPUCoreHours = cpuCores * hours
  469. node, err := res.GetString("node")
  470. if err != nil {
  471. log.Warningf("CostModel.ComputeAllocation: CPU allocation query result missing 'node': %s", key)
  472. continue
  473. }
  474. pod.Allocations[container].Properties.SetNode(node)
  475. }
  476. }
  477. func applyCPUCoresRequested(podMap map[podKey]*Pod, resCPUCoresRequested []*prom.QueryResult) {
  478. for _, res := range resCPUCoresRequested {
  479. key, err := resultPodKey(res, "cluster_id", "namespace", "pod")
  480. if err != nil {
  481. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU request result missing field: %s", err)
  482. continue
  483. }
  484. pod, ok := podMap[key]
  485. if !ok {
  486. // log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU request result for unidentified pod: %s", key)
  487. continue
  488. }
  489. container, err := res.GetString("container")
  490. if err != nil {
  491. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU request query result missing 'container': %s", key)
  492. continue
  493. }
  494. if _, ok := pod.Allocations[container]; !ok {
  495. pod.AppendContainer(container)
  496. }
  497. pod.Allocations[container].CPUCoreRequestAverage = res.Values[0].Value
  498. // If CPU allocation is less than requests, set CPUCoreHours to
  499. // request level.
  500. if pod.Allocations[container].CPUCores() < res.Values[0].Value {
  501. pod.Allocations[container].CPUCoreHours = res.Values[0].Value * (pod.Allocations[container].Minutes() / 60.0)
  502. }
  503. node, err := res.GetString("node")
  504. if err != nil {
  505. log.Warningf("CostModel.ComputeAllocation: CPU request query result missing 'node': %s", key)
  506. continue
  507. }
  508. pod.Allocations[container].Properties.SetNode(node)
  509. }
  510. }
  511. func applyCPUCoresUsed(podMap map[podKey]*Pod, resCPUCoresUsed []*prom.QueryResult) {
  512. for _, res := range resCPUCoresUsed {
  513. key, err := resultPodKey(res, "cluster_id", "namespace", "pod_name")
  514. if err != nil {
  515. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage result missing field: %s", err)
  516. continue
  517. }
  518. pod, ok := podMap[key]
  519. if !ok {
  520. // log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage result for unidentified pod: %s", key)
  521. continue
  522. }
  523. container, err := res.GetString("container_name")
  524. if err != nil {
  525. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage query result missing 'container': %s", key)
  526. continue
  527. }
  528. if _, ok := pod.Allocations[container]; !ok {
  529. pod.AppendContainer(container)
  530. }
  531. pod.Allocations[container].CPUCoreUsageAverage = res.Values[0].Value
  532. }
  533. }
  534. func applyRAMBytesAllocated(podMap map[podKey]*Pod, resRAMBytesAllocated []*prom.QueryResult) {
  535. for _, res := range resRAMBytesAllocated {
  536. key, err := resultPodKey(res, "cluster_id", "namespace", "pod")
  537. if err != nil {
  538. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM allocation result missing field: %s", err)
  539. continue
  540. }
  541. pod, ok := podMap[key]
  542. if !ok {
  543. // log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM allocation result for unidentified pod: %s", key)
  544. continue
  545. }
  546. container, err := res.GetString("container")
  547. if err != nil {
  548. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM allocation query result missing 'container': %s", key)
  549. continue
  550. }
  551. if _, ok := pod.Allocations[container]; !ok {
  552. pod.AppendContainer(container)
  553. }
  554. ramBytes := res.Values[0].Value
  555. hours := pod.Allocations[container].Minutes() / 60.0
  556. pod.Allocations[container].RAMByteHours = ramBytes * hours
  557. node, err := res.GetString("node")
  558. if err != nil {
  559. log.Warningf("CostModel.ComputeAllocation: RAM allocation query result missing 'node': %s", key)
  560. continue
  561. }
  562. pod.Allocations[container].Properties.SetNode(node)
  563. }
  564. }
  565. func applyRAMBytesRequested(podMap map[podKey]*Pod, resRAMBytesRequested []*prom.QueryResult) {
  566. for _, res := range resRAMBytesRequested {
  567. key, err := resultPodKey(res, "cluster_id", "namespace", "pod")
  568. if err != nil {
  569. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM request result missing field: %s", err)
  570. continue
  571. }
  572. pod, ok := podMap[key]
  573. if !ok {
  574. // log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM request result for unidentified pod: %s", key)
  575. continue
  576. }
  577. container, err := res.GetString("container")
  578. if err != nil {
  579. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM request query result missing 'container': %s", key)
  580. continue
  581. }
  582. if _, ok := pod.Allocations[container]; !ok {
  583. pod.AppendContainer(container)
  584. }
  585. pod.Allocations[container].RAMBytesRequestAverage = res.Values[0].Value
  586. // If RAM allocation is less than requests, set RAMByteHours to
  587. // request level.
  588. if pod.Allocations[container].RAMBytes() < res.Values[0].Value {
  589. pod.Allocations[container].RAMByteHours = res.Values[0].Value * (pod.Allocations[container].Minutes() / 60.0)
  590. }
  591. node, err := res.GetString("node")
  592. if err != nil {
  593. log.Warningf("CostModel.ComputeAllocation: RAM request query result missing 'node': %s", key)
  594. continue
  595. }
  596. pod.Allocations[container].Properties.SetNode(node)
  597. }
  598. }
  599. func applyRAMBytesUsed(podMap map[podKey]*Pod, resRAMBytesUsed []*prom.QueryResult) {
  600. for _, res := range resRAMBytesUsed {
  601. key, err := resultPodKey(res, "cluster_id", "namespace", "pod_name")
  602. if err != nil {
  603. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM usage result missing field: %s", err)
  604. continue
  605. }
  606. pod, ok := podMap[key]
  607. if !ok {
  608. // log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM usage result for unidentified pod: %s", key)
  609. continue
  610. }
  611. container, err := res.GetString("container_name")
  612. if err != nil {
  613. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM usage query result missing 'container': %s", key)
  614. continue
  615. }
  616. if _, ok := pod.Allocations[container]; !ok {
  617. pod.AppendContainer(container)
  618. }
  619. pod.Allocations[container].RAMBytesUsageAverage = res.Values[0].Value
  620. }
  621. }
  622. func applyGPUsRequested(podMap map[podKey]*Pod, resGPUsRequested []*prom.QueryResult) {
  623. for _, res := range resGPUsRequested {
  624. key, err := resultPodKey(res, "cluster_id", "namespace", "pod")
  625. if err != nil {
  626. log.DedupedWarningf(10, "CostModel.ComputeAllocation: GPU request result missing field: %s", err)
  627. continue
  628. }
  629. pod, ok := podMap[key]
  630. if !ok {
  631. // log.DedupedWarningf(10, "CostModel.ComputeAllocation: GPU request result for unidentified pod: %s", key)
  632. continue
  633. }
  634. container, err := res.GetString("container")
  635. if err != nil {
  636. log.DedupedWarningf(10, "CostModel.ComputeAllocation: GPU request query result missing 'container': %s", key)
  637. continue
  638. }
  639. if _, ok := pod.Allocations[container]; !ok {
  640. pod.AppendContainer(container)
  641. }
  642. hrs := pod.Allocations[container].Minutes() / 60.0
  643. pod.Allocations[container].GPUHours = res.Values[0].Value * hrs
  644. }
  645. }
  646. func applyNetworkAllocation(podMap map[podKey]*Pod, resNetworkGiB []*prom.QueryResult, resNetworkCostPerGiB []*prom.QueryResult) {
  647. costPerGiBByCluster := map[string]float64{}
  648. for _, res := range resNetworkCostPerGiB {
  649. cluster, err := res.GetString("cluster_id")
  650. if err != nil {
  651. cluster = env.GetClusterID()
  652. }
  653. costPerGiBByCluster[cluster] = res.Values[0].Value
  654. }
  655. for _, res := range resNetworkGiB {
  656. podKey, err := resultPodKey(res, "cluster_id", "namespace", "pod_name")
  657. if err != nil {
  658. log.DedupedWarningf(10, "CostModel.ComputeAllocation: Network allocation query result missing field: %s", err)
  659. continue
  660. }
  661. pod, ok := podMap[podKey]
  662. if !ok {
  663. // log.DedupedWarningf(10, "CostModel.ComputeAllocation: Network allocation query result for unidentified pod: %s", podKey)
  664. continue
  665. }
  666. for _, alloc := range pod.Allocations {
  667. gib := res.Values[0].Value / float64(len(pod.Allocations))
  668. costPerGiB := costPerGiBByCluster[podKey.Cluster]
  669. alloc.NetworkCost = gib * costPerGiB
  670. }
  671. }
  672. }
  673. func resToNamespaceLabels(resNamespaceLabels []*prom.QueryResult) map[string]map[string]string {
  674. namespaceLabels := map[string]map[string]string{}
  675. for _, res := range resNamespaceLabels {
  676. namespace, err := res.GetString("namespace")
  677. if err != nil {
  678. continue
  679. }
  680. if _, ok := namespaceLabels[namespace]; !ok {
  681. namespaceLabels[namespace] = map[string]string{}
  682. }
  683. for k, l := range res.GetLabels() {
  684. namespaceLabels[namespace][k] = l
  685. }
  686. }
  687. return namespaceLabels
  688. }
  689. func resToPodLabels(resPodLabels []*prom.QueryResult) map[podKey]map[string]string {
  690. podLabels := map[podKey]map[string]string{}
  691. for _, res := range resPodLabels {
  692. podKey, err := resultPodKey(res, "cluster_id", "namespace", "pod")
  693. if err != nil {
  694. continue
  695. }
  696. if _, ok := podLabels[podKey]; !ok {
  697. podLabels[podKey] = map[string]string{}
  698. }
  699. for k, l := range res.GetLabels() {
  700. podLabels[podKey][k] = l
  701. }
  702. }
  703. return podLabels
  704. }
  705. func resToNamespaceAnnotations(resNamespaceAnnotations []*prom.QueryResult) map[string]map[string]string {
  706. namespaceAnnotations := map[string]map[string]string{}
  707. for _, res := range resNamespaceAnnotations {
  708. namespace, err := res.GetString("namespace")
  709. if err != nil {
  710. continue
  711. }
  712. if _, ok := namespaceAnnotations[namespace]; !ok {
  713. namespaceAnnotations[namespace] = map[string]string{}
  714. }
  715. for k, l := range res.GetAnnotations() {
  716. namespaceAnnotations[namespace][k] = l
  717. }
  718. }
  719. return namespaceAnnotations
  720. }
  721. func resToPodAnnotations(resPodAnnotations []*prom.QueryResult) map[podKey]map[string]string {
  722. podAnnotations := map[podKey]map[string]string{}
  723. for _, res := range resPodAnnotations {
  724. podKey, err := resultPodKey(res, "cluster_id", "namespace", "pod")
  725. if err != nil {
  726. continue
  727. }
  728. if _, ok := podAnnotations[podKey]; !ok {
  729. podAnnotations[podKey] = map[string]string{}
  730. }
  731. for k, l := range res.GetAnnotations() {
  732. podAnnotations[podKey][k] = l
  733. }
  734. }
  735. return podAnnotations
  736. }
  737. func applyLabels(podMap map[podKey]*Pod, namespaceLabels map[string]map[string]string, podLabels map[podKey]map[string]string) {
  738. for key, pod := range podMap {
  739. for _, alloc := range pod.Allocations {
  740. allocLabels, err := alloc.Properties.GetLabels()
  741. if err != nil {
  742. allocLabels = map[string]string{}
  743. }
  744. // Apply namespace labels first, then pod labels so that pod labels
  745. // overwrite namespace labels.
  746. if labels, ok := namespaceLabels[key.Namespace]; ok {
  747. for k, v := range labels {
  748. allocLabels[k] = v
  749. }
  750. }
  751. if labels, ok := podLabels[key]; ok {
  752. for k, v := range labels {
  753. allocLabels[k] = v
  754. }
  755. }
  756. alloc.Properties.SetLabels(allocLabels)
  757. }
  758. }
  759. }
  760. func applyAnnotations(podMap map[podKey]*Pod, namespaceAnnotations map[string]map[string]string, podAnnotations map[podKey]map[string]string) {
  761. for key, pod := range podMap {
  762. for _, alloc := range pod.Allocations {
  763. allocAnnotations, err := alloc.Properties.GetAnnotations()
  764. if err != nil {
  765. allocAnnotations = map[string]string{}
  766. }
  767. // Apply namespace annotations first, then pod annotations so that
  768. // pod labels overwrite namespace labels.
  769. if labels, ok := namespaceAnnotations[key.Namespace]; ok {
  770. for k, v := range labels {
  771. allocAnnotations[k] = v
  772. }
  773. }
  774. if labels, ok := podAnnotations[key]; ok {
  775. for k, v := range labels {
  776. allocAnnotations[k] = v
  777. }
  778. }
  779. alloc.Properties.SetAnnotations(allocAnnotations)
  780. }
  781. }
  782. }
  783. func getServiceLabels(resServiceLabels []*prom.QueryResult) map[serviceKey]map[string]string {
  784. serviceLabels := map[serviceKey]map[string]string{}
  785. for _, res := range resServiceLabels {
  786. serviceKey, err := resultServiceKey(res, "cluster_id", "namespace", "service")
  787. if err != nil {
  788. continue
  789. }
  790. if _, ok := serviceLabels[serviceKey]; !ok {
  791. serviceLabels[serviceKey] = map[string]string{}
  792. }
  793. for k, l := range res.GetLabels() {
  794. serviceLabels[serviceKey][k] = l
  795. }
  796. }
  797. // Prune duplicate services. That is, if the same service exists with
  798. // hyphens instead of underscores, keep the one that uses hyphens.
  799. for key := range serviceLabels {
  800. duplicateService := strings.Replace(key.Service, "_", "-", -1)
  801. duplicateKey := newServiceKey(key.Cluster, key.Namespace, duplicateService)
  802. if _, ok := serviceLabels[duplicateKey]; ok {
  803. delete(serviceLabels, key)
  804. }
  805. }
  806. return serviceLabels
  807. }
  808. func resToDeploymentLabels(resDeploymentLabels []*prom.QueryResult) map[controllerKey]map[string]string {
  809. deploymentLabels := map[controllerKey]map[string]string{}
  810. for _, res := range resDeploymentLabels {
  811. controllerKey, err := resultDeploymentKey(res, "cluster_id", "namespace", "deployment")
  812. if err != nil {
  813. continue
  814. }
  815. if _, ok := deploymentLabels[controllerKey]; !ok {
  816. deploymentLabels[controllerKey] = map[string]string{}
  817. }
  818. for k, l := range res.GetLabels() {
  819. deploymentLabels[controllerKey][k] = l
  820. }
  821. }
  822. // Prune duplicate deployments. That is, if the same deployment exists with
  823. // hyphens instead of underscores, keep the one that uses hyphens.
  824. for key := range deploymentLabels {
  825. duplicateController := strings.Replace(key.Controller, "_", "-", -1)
  826. duplicateKey := newControllerKey(key.Cluster, key.Namespace, key.ControllerKind, duplicateController)
  827. if _, ok := deploymentLabels[duplicateKey]; ok {
  828. delete(deploymentLabels, key)
  829. }
  830. }
  831. return deploymentLabels
  832. }
  833. func resToStatefulSetLabels(resStatefulSetLabels []*prom.QueryResult) map[controllerKey]map[string]string {
  834. statefulSetLabels := map[controllerKey]map[string]string{}
  835. for _, res := range resStatefulSetLabels {
  836. controllerKey, err := resultStatefulSetKey(res, "cluster_id", "namespace", "statefulSet")
  837. if err != nil {
  838. continue
  839. }
  840. if _, ok := statefulSetLabels[controllerKey]; !ok {
  841. statefulSetLabels[controllerKey] = map[string]string{}
  842. }
  843. for k, l := range res.GetLabels() {
  844. statefulSetLabels[controllerKey][k] = l
  845. }
  846. }
  847. // Prune duplicate stateful sets. That is, if the same stateful set exists
  848. // with hyphens instead of underscores, keep the one that uses hyphens.
  849. for key := range statefulSetLabels {
  850. duplicateController := strings.Replace(key.Controller, "_", "-", -1)
  851. duplicateKey := newControllerKey(key.Cluster, key.Namespace, key.ControllerKind, duplicateController)
  852. if _, ok := statefulSetLabels[duplicateKey]; ok {
  853. delete(statefulSetLabels, key)
  854. }
  855. }
  856. return statefulSetLabels
  857. }
  858. func labelsToPodControllerMap(podLabels map[podKey]map[string]string, controllerLabels map[controllerKey]map[string]string) map[podKey]controllerKey {
  859. podControllerMap := map[podKey]controllerKey{}
  860. // For each controller, turn the labels into a selector and attempt to
  861. // match it with each set of pod labels. A match indicates that the pod
  862. // belongs to the controller.
  863. for cKey, cLabels := range controllerLabels {
  864. selector := labels.Set(cLabels).AsSelectorPreValidated()
  865. for pKey, pLabels := range podLabels {
  866. // If the pod is in a different cluster or namespace, there is
  867. // no need to compare the labels.
  868. if cKey.Cluster != pKey.Cluster || cKey.Namespace != pKey.Namespace {
  869. continue
  870. }
  871. podLabelSet := labels.Set(pLabels)
  872. if selector.Matches(podLabelSet) {
  873. if _, ok := podControllerMap[pKey]; ok {
  874. log.Warningf("CostModel.ComputeAllocation: PodControllerMap match already exists: %s matches %s and %s", pKey, podControllerMap[pKey], cKey)
  875. }
  876. podControllerMap[pKey] = cKey
  877. }
  878. }
  879. }
  880. return podControllerMap
  881. }
  882. func resToPodDaemonSetMap(resDaemonSetLabels []*prom.QueryResult) map[podKey]controllerKey {
  883. daemonSetLabels := map[podKey]controllerKey{}
  884. for _, res := range resDaemonSetLabels {
  885. controllerKey, err := resultDaemonSetKey(res, "cluster_id", "namespace", "owner_name")
  886. if err != nil {
  887. continue
  888. }
  889. pod, err := res.GetString("pod")
  890. if err != nil {
  891. log.Warningf("CostModel.ComputeAllocation: DaemonSetLabel result without pod: %s", controllerKey)
  892. }
  893. podKey := newPodKey(controllerKey.Cluster, controllerKey.Namespace, pod)
  894. daemonSetLabels[podKey] = controllerKey
  895. }
  896. return daemonSetLabels
  897. }
  898. func resToPodJobMap(resJobLabels []*prom.QueryResult) map[podKey]controllerKey {
  899. jobLabels := map[podKey]controllerKey{}
  900. for _, res := range resJobLabels {
  901. controllerKey, err := resultJobKey(res, "cluster_id", "namespace", "owner_name")
  902. if err != nil {
  903. continue
  904. }
  905. // Convert the name of Jobs generated by CronJobs to the name of the
  906. // CronJob by stripping the timestamp off the end.
  907. match := isCron.FindStringSubmatch(controllerKey.Controller)
  908. if match != nil {
  909. controllerKey.Controller = match[1]
  910. }
  911. pod, err := res.GetString("pod")
  912. if err != nil {
  913. log.Warningf("CostModel.ComputeAllocation: JobLabel result without pod: %s", controllerKey)
  914. }
  915. podKey := newPodKey(controllerKey.Cluster, controllerKey.Namespace, pod)
  916. jobLabels[podKey] = controllerKey
  917. }
  918. return jobLabels
  919. }
  920. func applyServicesToPods(podMap map[podKey]*Pod, podLabels map[podKey]map[string]string, serviceLabels map[serviceKey]map[string]string) {
  921. podServicesMap := map[podKey][]serviceKey{}
  922. // For each service, turn the labels into a selector and attempt to
  923. // match it with each set of pod labels. A match indicates that the pod
  924. // belongs to the service.
  925. for sKey, sLabels := range serviceLabels {
  926. selector := labels.Set(sLabels).AsSelectorPreValidated()
  927. for pKey, pLabels := range podLabels {
  928. // If the pod is in a different cluster or namespace, there is
  929. // no need to compare the labels.
  930. if sKey.Cluster != pKey.Cluster || sKey.Namespace != pKey.Namespace {
  931. continue
  932. }
  933. podLabelSet := labels.Set(pLabels)
  934. if selector.Matches(podLabelSet) {
  935. if _, ok := podServicesMap[pKey]; !ok {
  936. podServicesMap[pKey] = []serviceKey{}
  937. }
  938. podServicesMap[pKey] = append(podServicesMap[pKey], sKey)
  939. }
  940. }
  941. }
  942. // For each allocation in each pod, attempt to find and apply the list of
  943. // services associated with the allocation's pod.
  944. for key, pod := range podMap {
  945. for _, alloc := range pod.Allocations {
  946. if sKeys, ok := podServicesMap[key]; ok {
  947. services := []string{}
  948. for _, sKey := range sKeys {
  949. services = append(services, sKey.Service)
  950. }
  951. alloc.Properties.SetServices(services)
  952. }
  953. }
  954. }
  955. }
  956. func applyControllersToPods(podMap map[podKey]*Pod, podControllerMap map[podKey]controllerKey) {
  957. for key, pod := range podMap {
  958. for _, alloc := range pod.Allocations {
  959. if controllerKey, ok := podControllerMap[key]; ok {
  960. alloc.Properties.SetControllerKind(controllerKey.ControllerKind)
  961. alloc.Properties.SetController(controllerKey.Controller)
  962. }
  963. }
  964. }
  965. }
  966. func applyNodeCostPerCPUHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerCPUHr []*prom.QueryResult) {
  967. for _, res := range resNodeCostPerCPUHr {
  968. cluster, err := res.GetString("cluster_id")
  969. if err != nil {
  970. cluster = env.GetClusterID()
  971. }
  972. node, err := res.GetString("node")
  973. if err != nil {
  974. log.Warningf("CostModel.ComputeAllocation: Node CPU cost query result missing field: %s", err)
  975. continue
  976. }
  977. instanceType, err := res.GetString("instance_type")
  978. if err != nil {
  979. log.Warningf("CostModel.ComputeAllocation: Node CPU cost query result missing field: %s", err)
  980. continue
  981. }
  982. key := newNodeKey(cluster, node)
  983. if _, ok := nodeMap[key]; !ok {
  984. nodeMap[key] = &NodePricing{
  985. Name: node,
  986. NodeType: instanceType,
  987. }
  988. }
  989. nodeMap[key].CostPerCPUHr = res.Values[0].Value
  990. }
  991. }
  992. func applyNodeCostPerRAMGiBHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerRAMGiBHr []*prom.QueryResult) {
  993. for _, res := range resNodeCostPerRAMGiBHr {
  994. cluster, err := res.GetString("cluster_id")
  995. if err != nil {
  996. cluster = env.GetClusterID()
  997. }
  998. node, err := res.GetString("node")
  999. if err != nil {
  1000. log.Warningf("CostModel.ComputeAllocation: Node RAM cost query result missing field: %s", err)
  1001. continue
  1002. }
  1003. instanceType, err := res.GetString("instance_type")
  1004. if err != nil {
  1005. log.Warningf("CostModel.ComputeAllocation: Node RAM cost query result missing field: %s", err)
  1006. continue
  1007. }
  1008. key := newNodeKey(cluster, node)
  1009. if _, ok := nodeMap[key]; !ok {
  1010. nodeMap[key] = &NodePricing{
  1011. Name: node,
  1012. NodeType: instanceType,
  1013. }
  1014. }
  1015. nodeMap[key].CostPerRAMGiBHr = res.Values[0].Value
  1016. }
  1017. }
  1018. func applyNodeCostPerGPUHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerGPUHr []*prom.QueryResult) {
  1019. for _, res := range resNodeCostPerGPUHr {
  1020. cluster, err := res.GetString("cluster_id")
  1021. if err != nil {
  1022. cluster = env.GetClusterID()
  1023. }
  1024. node, err := res.GetString("node")
  1025. if err != nil {
  1026. log.Warningf("CostModel.ComputeAllocation: Node GPU cost query result missing field: %s", err)
  1027. continue
  1028. }
  1029. instanceType, err := res.GetString("instance_type")
  1030. if err != nil {
  1031. log.Warningf("CostModel.ComputeAllocation: Node GPU cost query result missing field: %s", err)
  1032. continue
  1033. }
  1034. key := newNodeKey(cluster, node)
  1035. if _, ok := nodeMap[key]; !ok {
  1036. nodeMap[key] = &NodePricing{
  1037. Name: node,
  1038. NodeType: instanceType,
  1039. }
  1040. }
  1041. nodeMap[key].CostPerGPUHr = res.Values[0].Value
  1042. }
  1043. }
  1044. func applyNodeSpot(nodeMap map[nodeKey]*NodePricing, resNodeIsSpot []*prom.QueryResult) {
  1045. for _, res := range resNodeIsSpot {
  1046. cluster, err := res.GetString("cluster_id")
  1047. if err != nil {
  1048. cluster = env.GetClusterID()
  1049. }
  1050. node, err := res.GetString("node")
  1051. if err != nil {
  1052. log.Warningf("CostModel.ComputeAllocation: Node spot query result missing field: %s", err)
  1053. continue
  1054. }
  1055. key := newNodeKey(cluster, node)
  1056. if _, ok := nodeMap[key]; !ok {
  1057. log.Warningf("CostModel.ComputeAllocation: Node spot query result for missing node: %s", key)
  1058. continue
  1059. }
  1060. nodeMap[key].Preemptible = res.Values[0].Value > 0
  1061. }
  1062. }
  1063. func applyNodeDiscount(nodeMap map[nodeKey]*NodePricing, cm *CostModel) {
  1064. if cm == nil {
  1065. return
  1066. }
  1067. c, err := cm.Provider.GetConfig()
  1068. if err != nil {
  1069. log.Errorf("CostModel.ComputeAllocation: applyNodeDiscount: %s", err)
  1070. return
  1071. }
  1072. discount, err := ParsePercentString(c.Discount)
  1073. if err != nil {
  1074. log.Errorf("CostModel.ComputeAllocation: applyNodeDiscount: %s", err)
  1075. return
  1076. }
  1077. negotiatedDiscount, err := ParsePercentString(c.NegotiatedDiscount)
  1078. if err != nil {
  1079. log.Errorf("CostModel.ComputeAllocation: applyNodeDiscount: %s", err)
  1080. return
  1081. }
  1082. for _, node := range nodeMap {
  1083. // TODO GKE Reserved Instances into account
  1084. node.Discount = cm.Provider.CombinedDiscountForNode(node.NodeType, node.Preemptible, discount, negotiatedDiscount)
  1085. node.CostPerCPUHr *= (1.0 - node.Discount)
  1086. node.CostPerRAMGiBHr *= (1.0 - node.Discount)
  1087. }
  1088. }
  1089. func buildPVMap(pvMap map[pvKey]*PV, resPVCostPerGiBHour []*prom.QueryResult) {
  1090. for _, res := range resPVCostPerGiBHour {
  1091. cluster, err := res.GetString("cluster_id")
  1092. if err != nil {
  1093. cluster = env.GetClusterID()
  1094. }
  1095. name, err := res.GetString("volumename")
  1096. if err != nil {
  1097. log.Warningf("CostModel.ComputeAllocation: PV cost without volumename")
  1098. continue
  1099. }
  1100. key := newPVKey(cluster, name)
  1101. pvMap[key] = &PV{
  1102. Cluster: cluster,
  1103. Name: name,
  1104. CostPerGiBHour: res.Values[0].Value,
  1105. }
  1106. }
  1107. }
  1108. func applyPVBytes(pvMap map[pvKey]*PV, resPVBytes []*prom.QueryResult) {
  1109. for _, res := range resPVBytes {
  1110. key, err := resultPVKey(res, "cluster_id", "persistentvolume")
  1111. if err != nil {
  1112. log.Warningf("CostModel.ComputeAllocation: PV bytes query result missing field: %s", err)
  1113. continue
  1114. }
  1115. if _, ok := pvMap[key]; !ok {
  1116. log.Warningf("CostModel.ComputeAllocation: PV bytes result for missing PV: %s", err)
  1117. continue
  1118. }
  1119. pvMap[key].Bytes = res.Values[0].Value
  1120. }
  1121. }
  1122. func buildPVCMap(window kubecost.Window, pvcMap map[pvcKey]*PVC, pvMap map[pvKey]*PV, resPVCInfo []*prom.QueryResult) {
  1123. for _, res := range resPVCInfo {
  1124. cluster, err := res.GetString("cluster_id")
  1125. if err != nil {
  1126. cluster = env.GetClusterID()
  1127. }
  1128. values, err := res.GetStrings("persistentvolumeclaim", "storageclass", "volumename", "namespace")
  1129. if err != nil {
  1130. log.DedupedWarningf(10, "CostModel.ComputeAllocation: PVC info query result missing field: %s", err)
  1131. continue
  1132. }
  1133. namespace := values["namespace"]
  1134. name := values["persistentvolumeclaim"]
  1135. volume := values["volumename"]
  1136. storageClass := values["storageclass"]
  1137. pvKey := newPVKey(cluster, volume)
  1138. pvcKey := newPVCKey(cluster, namespace, name)
  1139. // pvcStart and pvcEnd are the timestamps of the first and last minutes
  1140. // the PVC was running, respectively. We subtract 1m from pvcStart
  1141. // because this point will actually represent the end of the first
  1142. // minute. We don't subtract from pvcEnd because it already represents
  1143. // the end of the last minute.
  1144. var pvcStart, pvcEnd time.Time
  1145. for _, datum := range res.Values {
  1146. t := time.Unix(int64(datum.Timestamp), 0)
  1147. if pvcStart.IsZero() && datum.Value > 0 && window.Contains(t) {
  1148. pvcStart = t
  1149. }
  1150. if datum.Value > 0 && window.Contains(t) {
  1151. pvcEnd = t
  1152. }
  1153. }
  1154. if pvcStart.IsZero() || pvcEnd.IsZero() {
  1155. log.Warningf("CostModel.ComputeAllocation: PVC %s has no running time", pvcKey)
  1156. }
  1157. pvcStart = pvcStart.Add(-time.Minute)
  1158. if _, ok := pvMap[pvKey]; !ok {
  1159. // log.DedupedWarningf(10, "CostModel.ComputeAllocation: PV missing for PVC info query result: %s", pvKey)
  1160. continue
  1161. }
  1162. pvMap[pvKey].StorageClass = storageClass
  1163. if _, ok := pvcMap[pvcKey]; !ok {
  1164. pvcMap[pvcKey] = &PVC{}
  1165. }
  1166. pvcMap[pvcKey].Name = name
  1167. pvcMap[pvcKey].Namespace = namespace
  1168. pvcMap[pvcKey].Volume = pvMap[pvKey]
  1169. pvcMap[pvcKey].Start = pvcStart
  1170. pvcMap[pvcKey].End = pvcEnd
  1171. }
  1172. }
  1173. func applyPVCBytesRequested(pvcMap map[pvcKey]*PVC, resPVCBytesRequested []*prom.QueryResult) {
  1174. for _, res := range resPVCBytesRequested {
  1175. key, err := resultPVCKey(res, "cluster_id", "namespace", "persistentvolumeclaim")
  1176. if err != nil {
  1177. // log.DedupedWarningf(10, "CostModel.ComputeAllocation: PVC bytes requested query result missing field: %s", err)
  1178. continue
  1179. }
  1180. if _, ok := pvcMap[key]; !ok {
  1181. // log.DedupedWarningf(10, "CostModel.ComputeAllocation: PVC bytes requested result for missing PVC: %s", key)
  1182. continue
  1183. }
  1184. pvcMap[key].Bytes = res.Values[0].Value
  1185. }
  1186. }
  1187. func buildPodPVCMap(podPVCMap map[podKey][]*PVC, pvMap map[pvKey]*PV, pvcMap map[pvcKey]*PVC, podMap map[podKey]*Pod, resPodPVCAllocation []*prom.QueryResult) {
  1188. for _, res := range resPodPVCAllocation {
  1189. cluster, err := res.GetString("cluster_id")
  1190. if err != nil {
  1191. cluster = env.GetClusterID()
  1192. }
  1193. values, err := res.GetStrings("persistentvolume", "persistentvolumeclaim", "pod", "namespace")
  1194. if err != nil {
  1195. log.Warningf("CostModel.ComputeAllocation: PVC allocation query result missing field: %s", err)
  1196. continue
  1197. }
  1198. namespace := values["namespace"]
  1199. pod := values["pod"]
  1200. name := values["persistentvolumeclaim"]
  1201. volume := values["persistentvolume"]
  1202. podKey := newPodKey(cluster, namespace, pod)
  1203. pvKey := newPVKey(cluster, volume)
  1204. pvcKey := newPVCKey(cluster, namespace, name)
  1205. if _, ok := pvMap[pvKey]; !ok {
  1206. log.Warningf("CostModel.ComputeAllocation: PV missing for PVC allocation query result: %s", pvKey)
  1207. continue
  1208. }
  1209. if _, ok := podPVCMap[podKey]; !ok {
  1210. podPVCMap[podKey] = []*PVC{}
  1211. }
  1212. pvc, ok := pvcMap[pvcKey]
  1213. if !ok {
  1214. log.Warningf("CostModel.ComputeAllocation: PVC missing for PVC allocation query: %s", pvcKey)
  1215. continue
  1216. }
  1217. count := 1
  1218. if pod, ok := podMap[podKey]; ok && len(pod.Allocations) > 0 {
  1219. count = len(pod.Allocations)
  1220. } else {
  1221. log.DedupedWarningf(10, "CostModel.ComputeAllocation: PVC %s for missing pod %s", pvcKey, podKey)
  1222. }
  1223. pvc.Count = count
  1224. pvc.Mounted = true
  1225. podPVCMap[podKey] = append(podPVCMap[podKey], pvc)
  1226. }
  1227. }
  1228. func applyUnmountedPVs(window kubecost.Window, podMap map[podKey]*Pod, pvMap map[pvKey]*PV, pvcMap map[pvcKey]*PVC) {
  1229. unmountedPVBytes := map[string]float64{}
  1230. unmountedPVCost := map[string]float64{}
  1231. for _, pv := range pvMap {
  1232. mounted := false
  1233. for _, pvc := range pvcMap {
  1234. if pvc.Volume == nil {
  1235. continue
  1236. }
  1237. if pvc.Volume == pv {
  1238. mounted = true
  1239. break
  1240. }
  1241. }
  1242. if !mounted {
  1243. gib := pv.Bytes / 1024 / 1024 / 1024
  1244. hrs := window.Minutes() / 60.0 // TODO improve with PV hours, not window hours
  1245. cost := pv.CostPerGiBHour * gib * hrs
  1246. unmountedPVCost[pv.Cluster] += cost
  1247. unmountedPVBytes[pv.Cluster] += pv.Bytes
  1248. }
  1249. }
  1250. for cluster, amount := range unmountedPVCost {
  1251. container := kubecost.UnmountedSuffix
  1252. pod := kubecost.UnmountedSuffix
  1253. namespace := kubecost.UnmountedSuffix
  1254. node := ""
  1255. key := newPodKey(cluster, namespace, pod)
  1256. podMap[key] = &Pod{
  1257. Window: window.Clone(),
  1258. Start: *window.Start(),
  1259. End: *window.End(),
  1260. Key: key,
  1261. Allocations: map[string]*kubecost.Allocation{},
  1262. }
  1263. podMap[key].AppendContainer(container)
  1264. podMap[key].Allocations[container].Properties.SetCluster(cluster)
  1265. podMap[key].Allocations[container].Properties.SetNode(node)
  1266. podMap[key].Allocations[container].Properties.SetNamespace(namespace)
  1267. podMap[key].Allocations[container].Properties.SetPod(pod)
  1268. podMap[key].Allocations[container].Properties.SetContainer(container)
  1269. podMap[key].Allocations[container].PVByteHours = unmountedPVBytes[cluster] * window.Minutes() / 60.0
  1270. podMap[key].Allocations[container].PVCost = amount
  1271. }
  1272. }
  1273. func applyUnmountedPVCs(window kubecost.Window, podMap map[podKey]*Pod, pvcMap map[pvcKey]*PVC) {
  1274. unmountedPVCBytes := map[namespaceKey]float64{}
  1275. unmountedPVCCost := map[namespaceKey]float64{}
  1276. for _, pvc := range pvcMap {
  1277. if !pvc.Mounted && pvc.Volume != nil {
  1278. key := newNamespaceKey(pvc.Cluster, pvc.Namespace)
  1279. gib := pvc.Volume.Bytes / 1024 / 1024 / 1024
  1280. hrs := pvc.Minutes() / 60.0
  1281. cost := pvc.Volume.CostPerGiBHour * gib * hrs
  1282. unmountedPVCCost[key] += cost
  1283. unmountedPVCBytes[key] += pvc.Volume.Bytes
  1284. }
  1285. }
  1286. for key, amount := range unmountedPVCCost {
  1287. container := kubecost.UnmountedSuffix
  1288. pod := kubecost.UnmountedSuffix
  1289. namespace := key.Namespace
  1290. node := ""
  1291. cluster := key.Cluster
  1292. podKey := newPodKey(cluster, namespace, pod)
  1293. podMap[podKey] = &Pod{
  1294. Window: window.Clone(),
  1295. Start: *window.Start(),
  1296. End: *window.End(),
  1297. Key: podKey,
  1298. Allocations: map[string]*kubecost.Allocation{},
  1299. }
  1300. podMap[podKey].AppendContainer(container)
  1301. podMap[podKey].Allocations[container].Properties.SetCluster(cluster)
  1302. podMap[podKey].Allocations[container].Properties.SetNode(node)
  1303. podMap[podKey].Allocations[container].Properties.SetNamespace(namespace)
  1304. podMap[podKey].Allocations[container].Properties.SetPod(pod)
  1305. podMap[podKey].Allocations[container].Properties.SetContainer(container)
  1306. podMap[podKey].Allocations[container].PVByteHours = unmountedPVCBytes[key] * window.Minutes() / 60.0
  1307. podMap[podKey].Allocations[container].PVCost = amount
  1308. }
  1309. }
  1310. // getNodePricing determines node pricing, given a key and a mapping from keys
  1311. // to their NodePricing instances, as well as the custom pricing configuration
  1312. // inherent to the CostModel instance. If custom pricing is set, use that. If
  1313. // not, use the pricing defined by the given key. If that doesn't exist, fall
  1314. // back on custom pricing as a default.
  1315. func (cm *CostModel) getNodePricing(nodeMap map[nodeKey]*NodePricing, nodeKey nodeKey) *NodePricing {
  1316. // Find the relevant NodePricing, if it exists. If not, substitute the
  1317. // custom NodePricing as a default.
  1318. node, ok := nodeMap[nodeKey]
  1319. if !ok || node == nil {
  1320. if nodeKey.Node != "" {
  1321. log.Warningf("CostModel: failed to find node for %s", nodeKey)
  1322. }
  1323. return cm.getCustomNodePricing(false)
  1324. }
  1325. // If custom pricing is enabled and can be retrieved, override detected
  1326. // node pricing with the custom values.
  1327. customPricingConfig, err := cm.Provider.GetConfig()
  1328. if err != nil {
  1329. log.Warningf("CostModel: failed to load custom pricing: %s", err)
  1330. }
  1331. if cloud.CustomPricesEnabled(cm.Provider) && customPricingConfig != nil {
  1332. return cm.getCustomNodePricing(node.Preemptible)
  1333. }
  1334. node.Source = "prometheus"
  1335. // If any of the values are NaN or zero, replace them with the custom
  1336. // values as default.
  1337. // TODO:CLEANUP can't we parse these custom prices once? why do we store
  1338. // them as strings like this?
  1339. if node.CostPerCPUHr == 0 || math.IsNaN(node.CostPerCPUHr) {
  1340. log.Warningf("CostModel: node pricing has illegal CostPerCPUHr; replacing with custom pricing: %s", nodeKey)
  1341. cpuCostStr := customPricingConfig.CPU
  1342. if node.Preemptible {
  1343. cpuCostStr = customPricingConfig.SpotCPU
  1344. }
  1345. costPerCPUHr, err := strconv.ParseFloat(cpuCostStr, 64)
  1346. if err != nil {
  1347. log.Warningf("CostModel: custom pricing has illegal CPU cost: %s", cpuCostStr)
  1348. }
  1349. node.CostPerCPUHr = costPerCPUHr
  1350. node.Source += "/customCPU"
  1351. }
  1352. if math.IsNaN(node.CostPerGPUHr) {
  1353. log.Warningf("CostModel: node pricing has illegal CostPerGPUHr; replacing with custom pricing: %s", nodeKey)
  1354. gpuCostStr := customPricingConfig.GPU
  1355. if node.Preemptible {
  1356. gpuCostStr = customPricingConfig.SpotGPU
  1357. }
  1358. costPerGPUHr, err := strconv.ParseFloat(gpuCostStr, 64)
  1359. if err != nil {
  1360. log.Warningf("CostModel: custom pricing has illegal GPU cost: %s", gpuCostStr)
  1361. }
  1362. node.CostPerGPUHr = costPerGPUHr
  1363. node.Source += "/customGPU"
  1364. }
  1365. if node.CostPerRAMGiBHr == 0 || math.IsNaN(node.CostPerRAMGiBHr) {
  1366. log.Warningf("CostModel: node pricing has illegal CostPerRAMHr; replacing with custom pricing: %s", nodeKey)
  1367. ramCostStr := customPricingConfig.RAM
  1368. if node.Preemptible {
  1369. ramCostStr = customPricingConfig.SpotRAM
  1370. }
  1371. costPerRAMHr, err := strconv.ParseFloat(ramCostStr, 64)
  1372. if err != nil {
  1373. log.Warningf("CostModel: custom pricing has illegal RAM cost: %s", ramCostStr)
  1374. }
  1375. node.CostPerRAMGiBHr = costPerRAMHr
  1376. node.Source += "/customRAM"
  1377. }
  1378. return node
  1379. }
  1380. // getCustomNodePricing converts the CostModel's configured custom pricing
  1381. // values into a NodePricing instance.
  1382. func (cm *CostModel) getCustomNodePricing(spot bool) *NodePricing {
  1383. customPricingConfig, err := cm.Provider.GetConfig()
  1384. if err != nil {
  1385. return nil
  1386. }
  1387. cpuCostStr := customPricingConfig.CPU
  1388. gpuCostStr := customPricingConfig.GPU
  1389. ramCostStr := customPricingConfig.RAM
  1390. if spot {
  1391. cpuCostStr = customPricingConfig.SpotCPU
  1392. gpuCostStr = customPricingConfig.SpotGPU
  1393. ramCostStr = customPricingConfig.SpotRAM
  1394. }
  1395. node := &NodePricing{Source: "custom"}
  1396. costPerCPUHr, err := strconv.ParseFloat(cpuCostStr, 64)
  1397. if err != nil {
  1398. log.Warningf("CostModel: custom pricing has illegal CPU cost: %s", cpuCostStr)
  1399. }
  1400. node.CostPerCPUHr = costPerCPUHr
  1401. costPerGPUHr, err := strconv.ParseFloat(gpuCostStr, 64)
  1402. if err != nil {
  1403. log.Warningf("CostModel: custom pricing has illegal GPU cost: %s", gpuCostStr)
  1404. }
  1405. node.CostPerGPUHr = costPerGPUHr
  1406. costPerRAMHr, err := strconv.ParseFloat(ramCostStr, 64)
  1407. if err != nil {
  1408. log.Warningf("CostModel: custom pricing has illegal RAM cost: %s", ramCostStr)
  1409. }
  1410. node.CostPerRAMGiBHr = costPerRAMHr
  1411. return node
  1412. }
// NodePricing describes the resource costs associated with a given node, as
// well as the source of the information (e.g. prometheus, custom)
type NodePricing struct {
	// Name is presumably the node's name; it is not read or written in this
	// part of the file.
	Name string
	// NodeType is presumably the node's instance type; it is not read or
	// written in this part of the file.
	NodeType string
	// Preemptible makes getNodePricing use the spot variants of the custom
	// prices for this node.
	Preemptible bool
	// CostPerCPUHr is the CPU cost per hour. getNodePricing replaces a zero
	// or NaN value with the custom CPU price.
	CostPerCPUHr float64
	// CostPerRAMGiBHr is the RAM cost per GiB per hour. getNodePricing
	// replaces a zero or NaN value with the custom RAM price.
	CostPerRAMGiBHr float64
	// CostPerGPUHr is the GPU cost per hour. getNodePricing replaces only a
	// NaN value with the custom GPU price.
	CostPerGPUHr float64
	// Discount is not used in this part of the file; its units and meaning
	// should be confirmed where it is set.
	Discount float64
	// Source records where the prices came from: "custom", or "prometheus"
	// optionally followed by "/customCPU", "/customGPU" and/or "/customRAM"
	// for each value that was replaced by a custom price.
	Source string
}
// Pod describes a running pod's start and end time within a Window and
// all the Allocations (i.e. containers) contained within it.
type Pod struct {
	// Window is the query window; AppendContainer clones it into every
	// Allocation it creates.
	Window kubecost.Window
	// Start and End bound the pod's run time; AppendContainer copies them
	// into every Allocation it creates.
	Start time.Time
	End time.Time
	// Key identifies the pod by cluster, namespace and pod name.
	Key podKey
	// Allocations holds one Allocation per container, keyed by container
	// name. It must be non-nil before AppendContainer is called.
	Allocations map[string]*kubecost.Allocation
}
  1434. // AppendContainer adds an entry for the given container name to the Pod.
  1435. func (p Pod) AppendContainer(container string) {
  1436. name := fmt.Sprintf("%s/%s/%s/%s", p.Key.Cluster, p.Key.Namespace, p.Key.Pod, container)
  1437. alloc := &kubecost.Allocation{
  1438. Name: name,
  1439. Properties: kubecost.Properties{},
  1440. Window: p.Window.Clone(),
  1441. Start: p.Start,
  1442. End: p.End,
  1443. }
  1444. alloc.Properties.SetContainer(container)
  1445. alloc.Properties.SetPod(p.Key.Pod)
  1446. alloc.Properties.SetNamespace(p.Key.Namespace)
  1447. alloc.Properties.SetCluster(p.Key.Cluster)
  1448. p.Allocations[container] = alloc
  1449. }
// PVC describes a PersistentVolumeClaim
// TODO:CLEANUP move to pkg/kubecost?
// TODO:CLEANUP add PersistentVolumeClaims field to type Allocation?
type PVC struct {
	// Bytes is the claim's size in bytes, taken from the first sample of the
	// "PVC bytes requested" query result; Cost uses it as the billed size.
	Bytes float64 `json:"bytes"`
	// Count is set by buildPodPVCMap to the number of allocations of the pod
	// mounting the claim, or 1 when that pod is unknown or has none.
	Count int `json:"count"`
	// Name is the claim's name.
	Name string `json:"name"`
	// Cluster identifies the cluster the claim belongs to.
	Cluster string `json:"cluster"`
	// Namespace is the claim's namespace.
	Namespace string `json:"namespace"`
	// Volume is the PV backing the claim; nil when none is known.
	Volume *PV `json:"persistentVolume"`
	// Mounted is set to true by buildPodPVCMap once a pod is found using
	// the claim.
	Mounted bool `json:"mounted"`
	// Start and End bound the period over which the claim is defined;
	// Minutes returns their difference.
	Start time.Time `json:"start"`
	End time.Time `json:"end"`
}
  1464. // Cost computes the cumulative cost of the PVC
  1465. func (pvc *PVC) Cost() float64 {
  1466. if pvc == nil || pvc.Volume == nil {
  1467. return 0.0
  1468. }
  1469. gib := pvc.Bytes / 1024 / 1024 / 1024
  1470. hrs := pvc.Minutes() / 60.0
  1471. return pvc.Volume.CostPerGiBHour * gib * hrs
  1472. }
  1473. // Minutes computes the number of minutes over which the PVC is defined
  1474. func (pvc *PVC) Minutes() float64 {
  1475. if pvc == nil {
  1476. return 0.0
  1477. }
  1478. return pvc.End.Sub(pvc.Start).Minutes()
  1479. }
  1480. // String returns a string representation of the PVC
  1481. func (pvc *PVC) String() string {
  1482. if pvc == nil {
  1483. return "<nil>"
  1484. }
  1485. return fmt.Sprintf("%s/%s/%s{Bytes:%.2f, Cost:%.6f, Start,End:%s}", pvc.Cluster, pvc.Namespace, pvc.Name, pvc.Bytes, pvc.Cost(), kubecost.NewWindow(&pvc.Start, &pvc.End))
  1486. }
  1487. // PV describes a PersistentVolume
  1488. // TODO:CLEANUP move to pkg/kubecost?
  1489. type PV struct {
  1490. Bytes float64 `json:"bytes"`
  1491. CostPerGiBHour float64 `json:"costPerGiBHour"`
  1492. Cluster string `json:"cluster"`
  1493. Name string `json:"name"`
  1494. StorageClass string `json:"storageClass"`
  1495. }
  1496. // String returns a string representation of the PV
  1497. func (pv *PV) String() string {
  1498. if pv == nil {
  1499. return "<nil>"
  1500. }
  1501. return fmt.Sprintf("%s/%s{Bytes:%.2f, Cost/GiB*Hr:%.6f, StorageClass:%s}", pv.Cluster, pv.Name, pv.Bytes, pv.CostPerGiBHour, pv.StorageClass)
  1502. }