allocation.go 61 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800
  1. package costmodel
  2. import (
  3. "fmt"
  4. "math"
  5. "strconv"
  6. "strings"
  7. "time"
  8. "github.com/kubecost/cost-model/pkg/cloud"
  9. "github.com/kubecost/cost-model/pkg/env"
  10. "github.com/kubecost/cost-model/pkg/kubecost"
  11. "github.com/kubecost/cost-model/pkg/log"
  12. "github.com/kubecost/cost-model/pkg/prom"
  13. "github.com/kubecost/cost-model/pkg/util"
  14. "k8s.io/apimachinery/pkg/labels"
  15. )
  16. const (
  17. queryFmtPods = `avg(kube_pod_container_status_running{}) by (pod, namespace, cluster_id)[%s:%s]%s`
  18. queryFmtRAMBytesAllocated = `avg(avg_over_time(container_memory_allocation_bytes{container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, cluster_id)`
  19. queryFmtRAMRequests = `avg(avg_over_time(kube_pod_container_resource_requests_memory_bytes{container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, cluster_id)`
  20. queryFmtRAMUsage = `avg(avg_over_time(container_memory_working_set_bytes{container_name!="", container_name!="POD", instance!=""}[%s]%s)) by (container_name, pod_name, namespace, instance, cluster_id)`
  21. queryFmtCPUCoresAllocated = `avg(avg_over_time(container_cpu_allocation{container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, cluster_id)`
  22. queryFmtCPURequests = `avg(avg_over_time(kube_pod_container_resource_requests_cpu_cores{container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, cluster_id)`
  23. queryFmtCPUUsage = `avg(rate(container_cpu_usage_seconds_total{container_name!="", container_name!="POD", instance!=""}[%s]%s)) by (container_name, pod_name, namespace, instance, cluster_id)`
  24. queryFmtGPUsRequested = `avg(avg_over_time(kube_pod_container_resource_requests{resource="nvidia_com_gpu", container!="",container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, cluster_id)`
  25. queryFmtNodeCostPerCPUHr = `avg(avg_over_time(node_cpu_hourly_cost[%s]%s)) by (node, cluster_id, instance_type)`
  26. queryFmtNodeCostPerRAMGiBHr = `avg(avg_over_time(node_ram_hourly_cost[%s]%s)) by (node, cluster_id, instance_type)`
  27. queryFmtNodeCostPerGPUHr = `avg(avg_over_time(node_gpu_hourly_cost[%s]%s)) by (node, cluster_id, instance_type)`
  28. queryFmtNodeIsSpot = `avg_over_time(kubecost_node_is_spot[%s]%s)`
  29. queryFmtPVCInfo = `avg(kube_persistentvolumeclaim_info{volumename != ""}) by (persistentvolumeclaim, storageclass, volumename, namespace, cluster_id)[%s:%s]%s`
  30. queryFmtPVBytes = `avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s]%s)) by (persistentvolume, cluster_id)`
  31. queryFmtPodPVCAllocation = `avg(avg_over_time(pod_pvc_allocation[%s]%s)) by (persistentvolume, persistentvolumeclaim, pod, namespace, cluster_id)`
  32. queryFmtPVCBytesRequested = `avg(avg_over_time(kube_persistentvolumeclaim_resource_requests_storage_bytes{}[%s]%s)) by (persistentvolumeclaim, namespace, cluster_id)`
  33. queryFmtPVCostPerGiBHour = `avg(avg_over_time(pv_hourly_cost[%s]%s)) by (volumename, cluster_id)`
  34. queryFmtNetZoneGiB = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="true"}[%s]%s)) by (pod_name, namespace, cluster_id) / 1024 / 1024 / 1024`
  35. queryFmtNetZoneCostPerGiB = `avg(avg_over_time(kubecost_network_zone_egress_cost{}[%s]%s)) by (cluster_id)`
  36. queryFmtNetRegionGiB = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="false"}[%s]%s)) by (pod_name, namespace, cluster_id) / 1024 / 1024 / 1024`
  37. queryFmtNetRegionCostPerGiB = `avg(avg_over_time(kubecost_network_region_egress_cost{}[%s]%s)) by (cluster_id)`
  38. queryFmtNetInternetGiB = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="true"}[%s]%s)) by (pod_name, namespace, cluster_id) / 1024 / 1024 / 1024`
  39. queryFmtNetInternetCostPerGiB = `avg(avg_over_time(kubecost_network_internet_egress_cost{}[%s]%s)) by (cluster_id)`
  40. queryFmtNamespaceLabels = `avg_over_time(kube_namespace_labels[%s]%s)`
  41. queryFmtNamespaceAnnotations = `avg_over_time(kube_namespace_annotations[%s]%s)`
  42. queryFmtPodLabels = `avg_over_time(kube_pod_labels[%s]%s)`
  43. queryFmtPodAnnotations = `avg_over_time(kube_pod_annotations[%s]%s)`
  44. queryFmtServiceLabels = `avg_over_time(service_selector_labels[%s]%s)`
  45. queryFmtDeploymentLabels = `avg_over_time(deployment_match_labels[%s]%s)`
  46. queryFmtStatefulSetLabels = `avg_over_time(statefulSet_match_labels[%s]%s)`
  47. queryFmtDaemonSetLabels = `sum(avg_over_time(kube_pod_owner{owner_kind="DaemonSet"}[%s]%s)) by (pod, owner_name, namespace, cluster_id)`
  48. queryFmtJobLabels = `sum(avg_over_time(kube_pod_owner{owner_kind="Job"}[%s]%s)) by (pod, owner_name, namespace ,cluster_id)`
  49. )
  50. // ComputeAllocation uses the CostModel instance to compute an AllocationSet
  51. // for the window defined by the given start and end times. The Allocations
  52. // returned are unaggregated (i.e. down to the container level).
  53. func (cm *CostModel) ComputeAllocation(start, end time.Time, resolution time.Duration) (*kubecost.AllocationSet, error) {
  54. // 1. Build out Pod map from resolution-tuned, batched Pod start/end query
  55. // 2. Run and apply the results of the remaining queries to
  56. // 3. Build out AllocationSet from completed Pod map
  57. // Create a window spanning the requested query
  58. window := kubecost.NewWindow(&start, &end)
  59. // Create an empty AllocationSet. For safety, in the case of an error, we
  60. // should prefer to return this empty set with the error. (In the case of
  61. // no error, of course we populate the set and return it.)
  62. allocSet := kubecost.NewAllocationSet(start, end)
  63. // (1) Build out Pod map
  64. // Build out a map of Allocations as a mapping from pod-to-container-to-
  65. // underlying-Allocation instance, starting with (start, end) so that we
  66. // begin with minutes, from which we compute resource allocation and cost
  67. // totals from measured rate data.
  68. podMap := map[podKey]*Pod{}
  69. // clusterStarts and clusterEnds record the earliest start and latest end
  70. // times, respectively, on a cluster-basis. These are used for unmounted
  71. // PVs and other "virtual" Allocations so that minutes are maximally
  72. // accurate during start-up or spin-down of a cluster
  73. clusterStart := map[string]time.Time{}
  74. clusterEnd := map[string]time.Time{}
  75. cm.buildPodMap(window, resolution, env.GetETLMaxBatchDuration(), podMap, clusterStart, clusterEnd)
  76. // (2) Run and apply remaining queries
  77. // Convert window (start, end) to (duration, offset) for querying Prometheus,
  78. // including handling Thanos offset
  79. durStr, offStr, err := window.DurationOffsetForPrometheus()
  80. if err != nil {
  81. // Negative duration, so return empty set
  82. return allocSet, nil
  83. }
  84. // Convert resolution duration to a query-ready string
  85. resStr := util.DurationString(resolution)
  86. ctx := prom.NewContext(cm.PrometheusClient)
  87. queryRAMBytesAllocated := fmt.Sprintf(queryFmtRAMBytesAllocated, durStr, offStr)
  88. resChRAMBytesAllocated := ctx.Query(queryRAMBytesAllocated)
  89. queryRAMRequests := fmt.Sprintf(queryFmtRAMRequests, durStr, offStr)
  90. resChRAMRequests := ctx.Query(queryRAMRequests)
  91. queryRAMUsage := fmt.Sprintf(queryFmtRAMUsage, durStr, offStr)
  92. resChRAMUsage := ctx.Query(queryRAMUsage)
  93. queryCPUCoresAllocated := fmt.Sprintf(queryFmtCPUCoresAllocated, durStr, offStr)
  94. resChCPUCoresAllocated := ctx.Query(queryCPUCoresAllocated)
  95. queryCPURequests := fmt.Sprintf(queryFmtCPURequests, durStr, offStr)
  96. resChCPURequests := ctx.Query(queryCPURequests)
  97. queryCPUUsage := fmt.Sprintf(queryFmtCPUUsage, durStr, offStr)
  98. resChCPUUsage := ctx.Query(queryCPUUsage)
  99. queryGPUsRequested := fmt.Sprintf(queryFmtGPUsRequested, durStr, offStr)
  100. resChGPUsRequested := ctx.Query(queryGPUsRequested)
  101. queryNodeCostPerCPUHr := fmt.Sprintf(queryFmtNodeCostPerCPUHr, durStr, offStr)
  102. resChNodeCostPerCPUHr := ctx.Query(queryNodeCostPerCPUHr)
  103. queryNodeCostPerRAMGiBHr := fmt.Sprintf(queryFmtNodeCostPerRAMGiBHr, durStr, offStr)
  104. resChNodeCostPerRAMGiBHr := ctx.Query(queryNodeCostPerRAMGiBHr)
  105. queryNodeCostPerGPUHr := fmt.Sprintf(queryFmtNodeCostPerGPUHr, durStr, offStr)
  106. resChNodeCostPerGPUHr := ctx.Query(queryNodeCostPerGPUHr)
  107. queryNodeIsSpot := fmt.Sprintf(queryFmtNodeIsSpot, durStr, offStr)
  108. resChNodeIsSpot := ctx.Query(queryNodeIsSpot)
  109. queryPVCInfo := fmt.Sprintf(queryFmtPVCInfo, durStr, resStr, offStr)
  110. resChPVCInfo := ctx.Query(queryPVCInfo)
  111. queryPVBytes := fmt.Sprintf(queryFmtPVBytes, durStr, offStr)
  112. resChPVBytes := ctx.Query(queryPVBytes)
  113. queryPodPVCAllocation := fmt.Sprintf(queryFmtPodPVCAllocation, durStr, offStr)
  114. resChPodPVCAllocation := ctx.Query(queryPodPVCAllocation)
  115. queryPVCBytesRequested := fmt.Sprintf(queryFmtPVCBytesRequested, durStr, offStr)
  116. resChPVCBytesRequested := ctx.Query(queryPVCBytesRequested)
  117. queryPVCostPerGiBHour := fmt.Sprintf(queryFmtPVCostPerGiBHour, durStr, offStr)
  118. resChPVCostPerGiBHour := ctx.Query(queryPVCostPerGiBHour)
  119. queryNetZoneGiB := fmt.Sprintf(queryFmtNetZoneGiB, durStr, offStr)
  120. resChNetZoneGiB := ctx.Query(queryNetZoneGiB)
  121. queryNetZoneCostPerGiB := fmt.Sprintf(queryFmtNetZoneCostPerGiB, durStr, offStr)
  122. resChNetZoneCostPerGiB := ctx.Query(queryNetZoneCostPerGiB)
  123. queryNetRegionGiB := fmt.Sprintf(queryFmtNetRegionGiB, durStr, offStr)
  124. resChNetRegionGiB := ctx.Query(queryNetRegionGiB)
  125. queryNetRegionCostPerGiB := fmt.Sprintf(queryFmtNetRegionCostPerGiB, durStr, offStr)
  126. resChNetRegionCostPerGiB := ctx.Query(queryNetRegionCostPerGiB)
  127. queryNetInternetGiB := fmt.Sprintf(queryFmtNetInternetGiB, durStr, offStr)
  128. resChNetInternetGiB := ctx.Query(queryNetInternetGiB)
  129. queryNetInternetCostPerGiB := fmt.Sprintf(queryFmtNetInternetCostPerGiB, durStr, offStr)
  130. resChNetInternetCostPerGiB := ctx.Query(queryNetInternetCostPerGiB)
  131. queryNamespaceLabels := fmt.Sprintf(queryFmtNamespaceLabels, durStr, offStr)
  132. resChNamespaceLabels := ctx.Query(queryNamespaceLabels)
  133. queryNamespaceAnnotations := fmt.Sprintf(queryFmtNamespaceAnnotations, durStr, offStr)
  134. resChNamespaceAnnotations := ctx.Query(queryNamespaceAnnotations)
  135. queryPodLabels := fmt.Sprintf(queryFmtPodLabels, durStr, offStr)
  136. resChPodLabels := ctx.Query(queryPodLabels)
  137. queryPodAnnotations := fmt.Sprintf(queryFmtPodAnnotations, durStr, offStr)
  138. resChPodAnnotations := ctx.Query(queryPodAnnotations)
  139. queryServiceLabels := fmt.Sprintf(queryFmtServiceLabels, durStr, offStr)
  140. resChServiceLabels := ctx.Query(queryServiceLabels)
  141. queryDeploymentLabels := fmt.Sprintf(queryFmtDeploymentLabels, durStr, offStr)
  142. resChDeploymentLabels := ctx.Query(queryDeploymentLabels)
  143. queryStatefulSetLabels := fmt.Sprintf(queryFmtStatefulSetLabels, durStr, offStr)
  144. resChStatefulSetLabels := ctx.Query(queryStatefulSetLabels)
  145. queryDaemonSetLabels := fmt.Sprintf(queryFmtDaemonSetLabels, durStr, offStr)
  146. resChDaemonSetLabels := ctx.Query(queryDaemonSetLabels)
  147. queryJobLabels := fmt.Sprintf(queryFmtJobLabels, durStr, offStr)
  148. resChJobLabels := ctx.Query(queryJobLabels)
  149. resCPUCoresAllocated, _ := resChCPUCoresAllocated.Await()
  150. resCPURequests, _ := resChCPURequests.Await()
  151. resCPUUsage, _ := resChCPUUsage.Await()
  152. resRAMBytesAllocated, _ := resChRAMBytesAllocated.Await()
  153. resRAMRequests, _ := resChRAMRequests.Await()
  154. resRAMUsage, _ := resChRAMUsage.Await()
  155. resGPUsRequested, _ := resChGPUsRequested.Await()
  156. resNodeCostPerCPUHr, _ := resChNodeCostPerCPUHr.Await()
  157. resNodeCostPerRAMGiBHr, _ := resChNodeCostPerRAMGiBHr.Await()
  158. resNodeCostPerGPUHr, _ := resChNodeCostPerGPUHr.Await()
  159. resNodeIsSpot, _ := resChNodeIsSpot.Await()
  160. resPVBytes, _ := resChPVBytes.Await()
  161. resPVCostPerGiBHour, _ := resChPVCostPerGiBHour.Await()
  162. resPVCInfo, _ := resChPVCInfo.Await()
  163. resPVCBytesRequested, _ := resChPVCBytesRequested.Await()
  164. resPodPVCAllocation, _ := resChPodPVCAllocation.Await()
  165. resNetZoneGiB, _ := resChNetZoneGiB.Await()
  166. resNetZoneCostPerGiB, _ := resChNetZoneCostPerGiB.Await()
  167. resNetRegionGiB, _ := resChNetRegionGiB.Await()
  168. resNetRegionCostPerGiB, _ := resChNetRegionCostPerGiB.Await()
  169. resNetInternetGiB, _ := resChNetInternetGiB.Await()
  170. resNetInternetCostPerGiB, _ := resChNetInternetCostPerGiB.Await()
  171. resNamespaceLabels, _ := resChNamespaceLabels.Await()
  172. resNamespaceAnnotations, _ := resChNamespaceAnnotations.Await()
  173. resPodLabels, _ := resChPodLabels.Await()
  174. resPodAnnotations, _ := resChPodAnnotations.Await()
  175. resServiceLabels, _ := resChServiceLabels.Await()
  176. resDeploymentLabels, _ := resChDeploymentLabels.Await()
  177. resStatefulSetLabels, _ := resChStatefulSetLabels.Await()
  178. resDaemonSetLabels, _ := resChDaemonSetLabels.Await()
  179. resJobLabels, _ := resChJobLabels.Await()
  180. if ctx.HasErrors() {
  181. for _, err := range ctx.Errors() {
  182. log.Errorf("CostModel.ComputeAllocation: %s", err)
  183. }
  184. return allocSet, ctx.ErrorCollection()
  185. }
  186. applyCPUCoresAllocated(podMap, resCPUCoresAllocated)
  187. applyCPUCoresRequested(podMap, resCPURequests)
  188. applyCPUCoresUsed(podMap, resCPUUsage)
  189. applyRAMBytesAllocated(podMap, resRAMBytesAllocated)
  190. applyRAMBytesRequested(podMap, resRAMRequests)
  191. applyRAMBytesUsed(podMap, resRAMUsage)
  192. applyGPUsRequested(podMap, resGPUsRequested)
  193. applyNetworkAllocation(podMap, resNetZoneGiB, resNetZoneCostPerGiB)
  194. applyNetworkAllocation(podMap, resNetRegionGiB, resNetRegionCostPerGiB)
  195. applyNetworkAllocation(podMap, resNetInternetGiB, resNetInternetCostPerGiB)
  196. namespaceLabels := resToNamespaceLabels(resNamespaceLabels)
  197. podLabels := resToPodLabels(resPodLabels)
  198. namespaceAnnotations := resToNamespaceAnnotations(resNamespaceAnnotations)
  199. podAnnotations := resToPodAnnotations(resPodAnnotations)
  200. applyLabels(podMap, namespaceLabels, podLabels)
  201. applyAnnotations(podMap, namespaceAnnotations, podAnnotations)
  202. serviceLabels := getServiceLabels(resServiceLabels)
  203. applyServicesToPods(podMap, podLabels, serviceLabels)
  204. podDeploymentMap := labelsToPodControllerMap(podLabels, resToDeploymentLabels(resDeploymentLabels))
  205. podStatefulSetMap := labelsToPodControllerMap(podLabels, resToStatefulSetLabels(resStatefulSetLabels))
  206. podDaemonSetMap := resToPodDaemonSetMap(resDaemonSetLabels)
  207. podJobMap := resToPodJobMap(resJobLabels)
  208. applyControllersToPods(podMap, podDeploymentMap)
  209. applyControllersToPods(podMap, podStatefulSetMap)
  210. applyControllersToPods(podMap, podDaemonSetMap)
  211. applyControllersToPods(podMap, podJobMap)
  212. // TODO breakdown network costs?
  213. // Build out a map of Nodes with resource costs, discounts, and node types
  214. // for converting resource allocation data to cumulative costs.
  215. nodeMap := map[nodeKey]*NodePricing{}
  216. applyNodeCostPerCPUHr(nodeMap, resNodeCostPerCPUHr)
  217. applyNodeCostPerRAMGiBHr(nodeMap, resNodeCostPerRAMGiBHr)
  218. applyNodeCostPerGPUHr(nodeMap, resNodeCostPerGPUHr)
  219. applyNodeSpot(nodeMap, resNodeIsSpot)
  220. applyNodeDiscount(nodeMap, cm)
  221. // Build out the map of all PVs with class, size and cost-per-hour.
  222. // Note: this does not record time running, which we may want to
  223. // include later for increased PV precision. (As long as the PV has
  224. // a PVC, we get time running there, so this is only inaccurate
  225. // for short-lived, unmounted PVs.)
  226. pvMap := map[pvKey]*PV{}
  227. buildPVMap(pvMap, resPVCostPerGiBHour)
  228. applyPVBytes(pvMap, resPVBytes)
  229. // Build out the map of all PVCs with time running, bytes requested,
  230. // and connect to the correct PV from pvMap. (If no PV exists, that
  231. // is noted, but does not result in any allocation/cost.)
  232. pvcMap := map[pvcKey]*PVC{}
  233. buildPVCMap(window, pvcMap, pvMap, resPVCInfo)
  234. applyPVCBytesRequested(pvcMap, resPVCBytesRequested)
  235. // Build out the relationships of pods to their PVCs. This step
  236. // populates the PVC.Count field so that PVC allocation can be
  237. // split appropriately among each pod's container allocation.
  238. podPVCMap := map[podKey][]*PVC{}
  239. buildPodPVCMap(podPVCMap, pvMap, pvcMap, podMap, resPodPVCAllocation)
  240. // Identify unmounted PVs (PVs without PVCs) and add one Allocation per
  241. // cluster representing each cluster's unmounted PVs (if necessary).
  242. applyUnmountedPVs(window, podMap, pvMap, pvcMap)
  243. // (3) Build out AllocationSet from Pod map
  244. for _, pod := range podMap {
  245. for _, alloc := range pod.Allocations {
  246. cluster, _ := alloc.Properties.GetCluster()
  247. nodeName, _ := alloc.Properties.GetNode()
  248. namespace, _ := alloc.Properties.GetNamespace()
  249. pod, _ := alloc.Properties.GetPod()
  250. container, _ := alloc.Properties.GetContainer()
  251. podKey := newPodKey(cluster, namespace, pod)
  252. nodeKey := newNodeKey(cluster, nodeName)
  253. node := cm.getNodePricing(nodeMap, nodeKey)
  254. alloc.CPUCost = alloc.CPUCoreHours * node.CostPerCPUHr
  255. alloc.RAMCost = (alloc.RAMByteHours / 1024 / 1024 / 1024) * node.CostPerRAMGiBHr
  256. alloc.GPUCost = alloc.GPUHours * node.CostPerGPUHr
  257. if pvcs, ok := podPVCMap[podKey]; ok {
  258. for _, pvc := range pvcs {
  259. // Determine the (start, end) of the relationship between the
  260. // given PVC and the associated Allocation so that a precise
  261. // number of hours can be used to compute cumulative cost.
  262. s, e := alloc.Start, alloc.End
  263. if pvc.Start.After(alloc.Start) {
  264. s = pvc.Start
  265. }
  266. if pvc.End.Before(alloc.End) {
  267. e = pvc.End
  268. }
  269. minutes := e.Sub(s).Minutes()
  270. hrs := minutes / 60.0
  271. count := float64(pvc.Count)
  272. if pvc.Count < 1 {
  273. count = 1
  274. }
  275. gib := pvc.Bytes / 1024 / 1024 / 1024
  276. cost := pvc.Volume.CostPerGiBHour * gib * hrs
  277. // Apply the size and cost of the PV to the allocation, each
  278. // weighted by count (i.e. the number of containers in the pod)
  279. alloc.PVByteHours += pvc.Bytes * hrs / count
  280. alloc.PVCost += cost / count
  281. }
  282. }
  283. // Make sure that the name is correct (node may not be present at this
  284. // point due to it missing from queryMinutes) then insert.
  285. alloc.Name = fmt.Sprintf("%s/%s/%s/%s/%s", cluster, nodeName, namespace, pod, container)
  286. allocSet.Set(alloc)
  287. }
  288. }
  289. return allocSet, nil
  290. }
  291. func (cm *CostModel) buildPodMap(window kubecost.Window, resolution, maxBatchSize time.Duration, podMap map[podKey]*Pod, clusterStart, clusterEnd map[string]time.Time) error {
  292. // Assumes that window is positive and closed
  293. start, end := *window.Start(), *window.End()
  294. // Convert resolution duration to a query-ready string
  295. resStr := util.DurationString(resolution)
  296. ctx := prom.NewContext(cm.PrometheusClient)
  297. // Query for (start, end) by (pod, namespace, cluster) over the given
  298. // window, using the given resolution, and if necessary in batches no
  299. // larger than the given maximum batch size. If working in batches, track
  300. // overall progress by starting with (window.start, window.start) and
  301. // querying in batches no larger than maxBatchSize from start-to-end,
  302. // folding each result set into podMap as the results come back.
  303. coverage := kubecost.NewWindow(&start, &start)
  304. numQuery := 1
  305. for coverage.End().Before(end) {
  306. // Determine the (start, end) of the current batch
  307. batchStart := *coverage.End()
  308. batchEnd := coverage.End().Add(maxBatchSize)
  309. if batchEnd.After(end) {
  310. batchEnd = end
  311. }
  312. batchWindow := kubecost.NewWindow(&batchStart, &batchEnd)
  313. var resPods []*prom.QueryResult
  314. var err error
  315. maxTries := 3
  316. numTries := 0
  317. for resPods == nil && numTries < maxTries {
  318. numTries++
  319. // Convert window (start, end) to (duration, offset) for querying Prometheus,
  320. // including handling Thanos offset
  321. durStr, offStr, err := batchWindow.DurationOffsetForPrometheus()
  322. if err != nil || durStr == "" {
  323. // Negative duration, so set empty results and don't query
  324. resPods = []*prom.QueryResult{}
  325. err = nil
  326. break
  327. }
  328. // Submit and profile query
  329. queryPods := fmt.Sprintf(queryFmtPods, durStr, resStr, offStr)
  330. queryProfile := time.Now()
  331. resPods, err = ctx.Query(queryPods).Await()
  332. if err != nil {
  333. log.Profile(queryProfile, fmt.Sprintf("CostModel.ComputeAllocation: pod query %d try %d failed: %s", numQuery, numTries, queryPods))
  334. resPods = nil
  335. }
  336. }
  337. if err != nil {
  338. return err
  339. }
  340. applyPodResults(window, resolution, podMap, clusterStart, clusterEnd, resPods)
  341. coverage = coverage.ExpandEnd(batchEnd)
  342. numQuery++
  343. }
  344. return nil
  345. }
  346. func applyPodResults(window kubecost.Window, resolution time.Duration, podMap map[podKey]*Pod, clusterStart, clusterEnd map[string]time.Time, resPods []*prom.QueryResult) {
  347. for _, res := range resPods {
  348. if len(res.Values) == 0 {
  349. log.Warningf("CostModel.ComputeAllocation: empty minutes result")
  350. continue
  351. }
  352. cluster, err := res.GetString("cluster_id")
  353. if err != nil {
  354. cluster = env.GetClusterID()
  355. }
  356. labels, err := res.GetStrings("namespace", "pod")
  357. if err != nil {
  358. log.Warningf("CostModel.ComputeAllocation: minutes query result missing field: %s", err)
  359. continue
  360. }
  361. namespace := labels["namespace"]
  362. pod := labels["pod"]
  363. key := newPodKey(cluster, namespace, pod)
  364. // allocStart and allocEnd are the timestamps of the first and last
  365. // minutes the pod was running, respectively. We subtract one resolution
  366. // from allocStart because this point will actually represent the end
  367. // of the first minute. We don't subtract from allocEnd because it
  368. // already represents the end of the last minute.
  369. var allocStart, allocEnd time.Time
  370. startAdjustmentCoeff, endAdjustmentCoeff := 1.0, 1.0
  371. for _, datum := range res.Values {
  372. t := time.Unix(int64(datum.Timestamp), 0)
  373. if allocStart.IsZero() && datum.Value > 0 && window.Contains(t) {
  374. // Set the start timestamp to the earliest non-zero timestamp
  375. allocStart = t
  376. // Record adjustment coefficient, i.e. the portion of the start
  377. // timestamp to "ignore". That is, sometimes the value will be
  378. // 0.5, meaning that we should discount the time running by
  379. // half of the resolution the timestamp stands for.
  380. startAdjustmentCoeff = (1.0 - datum.Value)
  381. }
  382. if datum.Value > 0 && window.Contains(t) {
  383. // Set the end timestamp to the latest non-zero timestamp
  384. allocEnd = t
  385. // Record adjustment coefficient, i.e. the portion of the end
  386. // timestamp to "ignore". (See explanation above for start.)
  387. endAdjustmentCoeff = (1.0 - datum.Value)
  388. }
  389. }
  390. if allocStart.IsZero() || allocEnd.IsZero() {
  391. continue
  392. }
  393. // Adjust timestamps accorind to the resolution and the adjustment
  394. // coefficients, as described above. That is, count the start timestamp
  395. // from the beginning of the resolution, not the end. Then "reduce" the
  396. // start and end by the correct amount, in the case that the "running"
  397. // value of the first or last timestamp was not a full 1.0.
  398. allocStart = allocStart.Add(-resolution)
  399. // Note: the *100 and /100 are necessary because Duration is an int, so
  400. // 0.5, for instance, will be truncated, resulting in no adjustment.
  401. allocStart = allocStart.Add(time.Duration(startAdjustmentCoeff*100) * resolution / time.Duration(100))
  402. allocEnd = allocEnd.Add(-time.Duration(endAdjustmentCoeff*100) * resolution / time.Duration(100))
  403. // If there is only one point with a value <= 0.5 that the start and
  404. // end timestamps both share, then we will enter this case because at
  405. // least half of a resolution will be subtracted from both the start
  406. // and the end. If that is the case, then add back half of each side
  407. // so that the pod is said to run for half a resolution total.
  408. // e.g. For resolution 1m and a value of 0.5 at one timestamp, we'll
  409. // end up with allocEnd == allocStart and each coeff == 0.5. In
  410. // that case, add 0.25m to each side, resulting in 0.5m duration.
  411. if !allocEnd.After(allocStart) {
  412. allocStart = allocStart.Add(-time.Duration(50*startAdjustmentCoeff) * resolution / time.Duration(100))
  413. allocEnd = allocEnd.Add(time.Duration(50*endAdjustmentCoeff) * resolution / time.Duration(100))
  414. }
  415. // Set start if unset or this datum's start time is earlier than the
  416. // current earliest time.
  417. if _, ok := clusterStart[cluster]; !ok || allocStart.Before(clusterStart[cluster]) {
  418. clusterStart[cluster] = allocStart
  419. }
  420. // Set end if unset or this datum's end time is later than the
  421. // current latest time.
  422. if _, ok := clusterEnd[cluster]; !ok || allocEnd.After(clusterEnd[cluster]) {
  423. clusterEnd[cluster] = allocEnd
  424. }
  425. if pod, ok := podMap[key]; ok {
  426. // Pod has already been recorded, so update it accordingly
  427. if allocStart.Before(pod.Start) {
  428. pod.Start = allocStart
  429. }
  430. if allocEnd.After(pod.End) {
  431. pod.End = allocEnd
  432. }
  433. } else {
  434. // Pod has not been recorded yet, so insert it
  435. podMap[key] = &Pod{
  436. Window: window.Clone(),
  437. Start: allocStart,
  438. End: allocEnd,
  439. Key: key,
  440. Allocations: map[string]*kubecost.Allocation{},
  441. }
  442. }
  443. }
  444. }
  445. func applyCPUCoresAllocated(podMap map[podKey]*Pod, resCPUCoresAllocated []*prom.QueryResult) {
  446. for _, res := range resCPUCoresAllocated {
  447. key, err := resultPodKey(res, "cluster_id", "namespace", "pod")
  448. if err != nil {
  449. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU allocation result missing field: %s", err)
  450. continue
  451. }
  452. pod, ok := podMap[key]
  453. if !ok {
  454. continue
  455. }
  456. container, err := res.GetString("container")
  457. if err != nil {
  458. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU allocation query result missing 'container': %s", key)
  459. continue
  460. }
  461. if _, ok := pod.Allocations[container]; !ok {
  462. pod.AppendContainer(container)
  463. }
  464. cpuCores := res.Values[0].Value
  465. hours := pod.Allocations[container].Minutes() / 60.0
  466. pod.Allocations[container].CPUCoreHours = cpuCores * hours
  467. node, err := res.GetString("node")
  468. if err != nil {
  469. log.Warningf("CostModel.ComputeAllocation: CPU allocation query result missing 'node': %s", key)
  470. continue
  471. }
  472. pod.Allocations[container].Properties.SetNode(node)
  473. }
  474. }
  475. func applyCPUCoresRequested(podMap map[podKey]*Pod, resCPUCoresRequested []*prom.QueryResult) {
  476. for _, res := range resCPUCoresRequested {
  477. key, err := resultPodKey(res, "cluster_id", "namespace", "pod")
  478. if err != nil {
  479. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU request result missing field: %s", err)
  480. continue
  481. }
  482. pod, ok := podMap[key]
  483. if !ok {
  484. continue
  485. }
  486. container, err := res.GetString("container")
  487. if err != nil {
  488. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU request query result missing 'container': %s", key)
  489. continue
  490. }
  491. if _, ok := pod.Allocations[container]; !ok {
  492. pod.AppendContainer(container)
  493. }
  494. pod.Allocations[container].CPUCoreRequestAverage = res.Values[0].Value
  495. // If CPU allocation is less than requests, set CPUCoreHours to
  496. // request level.
  497. if pod.Allocations[container].CPUCores() < res.Values[0].Value {
  498. pod.Allocations[container].CPUCoreHours = res.Values[0].Value * (pod.Allocations[container].Minutes() / 60.0)
  499. }
  500. node, err := res.GetString("node")
  501. if err != nil {
  502. log.Warningf("CostModel.ComputeAllocation: CPU request query result missing 'node': %s", key)
  503. continue
  504. }
  505. pod.Allocations[container].Properties.SetNode(node)
  506. }
  507. }
  508. func applyCPUCoresUsed(podMap map[podKey]*Pod, resCPUCoresUsed []*prom.QueryResult) {
  509. for _, res := range resCPUCoresUsed {
  510. key, err := resultPodKey(res, "cluster_id", "namespace", "pod_name")
  511. if err != nil {
  512. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage result missing field: %s", err)
  513. continue
  514. }
  515. pod, ok := podMap[key]
  516. if !ok {
  517. continue
  518. }
  519. container, err := res.GetString("container_name")
  520. if err != nil {
  521. log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage query result missing 'container': %s", key)
  522. continue
  523. }
  524. if _, ok := pod.Allocations[container]; !ok {
  525. pod.AppendContainer(container)
  526. }
  527. pod.Allocations[container].CPUCoreUsageAverage = res.Values[0].Value
  528. }
  529. }
  530. func applyRAMBytesAllocated(podMap map[podKey]*Pod, resRAMBytesAllocated []*prom.QueryResult) {
  531. for _, res := range resRAMBytesAllocated {
  532. key, err := resultPodKey(res, "cluster_id", "namespace", "pod")
  533. if err != nil {
  534. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM allocation result missing field: %s", err)
  535. continue
  536. }
  537. pod, ok := podMap[key]
  538. if !ok {
  539. continue
  540. }
  541. container, err := res.GetString("container")
  542. if err != nil {
  543. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM allocation query result missing 'container': %s", key)
  544. continue
  545. }
  546. if _, ok := pod.Allocations[container]; !ok {
  547. pod.AppendContainer(container)
  548. }
  549. ramBytes := res.Values[0].Value
  550. hours := pod.Allocations[container].Minutes() / 60.0
  551. pod.Allocations[container].RAMByteHours = ramBytes * hours
  552. node, err := res.GetString("node")
  553. if err != nil {
  554. log.Warningf("CostModel.ComputeAllocation: RAM allocation query result missing 'node': %s", key)
  555. continue
  556. }
  557. pod.Allocations[container].Properties.SetNode(node)
  558. }
  559. }
  560. func applyRAMBytesRequested(podMap map[podKey]*Pod, resRAMBytesRequested []*prom.QueryResult) {
  561. for _, res := range resRAMBytesRequested {
  562. key, err := resultPodKey(res, "cluster_id", "namespace", "pod")
  563. if err != nil {
  564. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM request result missing field: %s", err)
  565. continue
  566. }
  567. pod, ok := podMap[key]
  568. if !ok {
  569. continue
  570. }
  571. container, err := res.GetString("container")
  572. if err != nil {
  573. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM request query result missing 'container': %s", key)
  574. continue
  575. }
  576. if _, ok := pod.Allocations[container]; !ok {
  577. pod.AppendContainer(container)
  578. }
  579. pod.Allocations[container].RAMBytesRequestAverage = res.Values[0].Value
  580. // If RAM allocation is less than requests, set RAMByteHours to
  581. // request level.
  582. if pod.Allocations[container].RAMBytes() < res.Values[0].Value {
  583. pod.Allocations[container].RAMByteHours = res.Values[0].Value * (pod.Allocations[container].Minutes() / 60.0)
  584. }
  585. node, err := res.GetString("node")
  586. if err != nil {
  587. log.Warningf("CostModel.ComputeAllocation: RAM request query result missing 'node': %s", key)
  588. continue
  589. }
  590. pod.Allocations[container].Properties.SetNode(node)
  591. }
  592. }
  593. func applyRAMBytesUsed(podMap map[podKey]*Pod, resRAMBytesUsed []*prom.QueryResult) {
  594. for _, res := range resRAMBytesUsed {
  595. key, err := resultPodKey(res, "cluster_id", "namespace", "pod_name")
  596. if err != nil {
  597. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM usage result missing field: %s", err)
  598. continue
  599. }
  600. pod, ok := podMap[key]
  601. if !ok {
  602. continue
  603. }
  604. container, err := res.GetString("container_name")
  605. if err != nil {
  606. log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM usage query result missing 'container': %s", key)
  607. continue
  608. }
  609. if _, ok := pod.Allocations[container]; !ok {
  610. pod.AppendContainer(container)
  611. }
  612. pod.Allocations[container].RAMBytesUsageAverage = res.Values[0].Value
  613. }
  614. }
  615. func applyGPUsRequested(podMap map[podKey]*Pod, resGPUsRequested []*prom.QueryResult) {
  616. for _, res := range resGPUsRequested {
  617. key, err := resultPodKey(res, "cluster_id", "namespace", "pod")
  618. if err != nil {
  619. log.DedupedWarningf(10, "CostModel.ComputeAllocation: GPU request result missing field: %s", err)
  620. continue
  621. }
  622. pod, ok := podMap[key]
  623. if !ok {
  624. continue
  625. }
  626. container, err := res.GetString("container")
  627. if err != nil {
  628. log.DedupedWarningf(10, "CostModel.ComputeAllocation: GPU request query result missing 'container': %s", key)
  629. continue
  630. }
  631. if _, ok := pod.Allocations[container]; !ok {
  632. pod.AppendContainer(container)
  633. }
  634. hrs := pod.Allocations[container].Minutes() / 60.0
  635. pod.Allocations[container].GPUHours = res.Values[0].Value * hrs
  636. }
  637. }
  638. func applyNetworkAllocation(podMap map[podKey]*Pod, resNetworkGiB []*prom.QueryResult, resNetworkCostPerGiB []*prom.QueryResult) {
  639. costPerGiBByCluster := map[string]float64{}
  640. for _, res := range resNetworkCostPerGiB {
  641. cluster, err := res.GetString("cluster_id")
  642. if err != nil {
  643. cluster = env.GetClusterID()
  644. }
  645. costPerGiBByCluster[cluster] = res.Values[0].Value
  646. }
  647. for _, res := range resNetworkGiB {
  648. podKey, err := resultPodKey(res, "cluster_id", "namespace", "pod_name")
  649. if err != nil {
  650. log.DedupedWarningf(10, "CostModel.ComputeAllocation: Network allocation query result missing field: %s", err)
  651. continue
  652. }
  653. pod, ok := podMap[podKey]
  654. if !ok {
  655. continue
  656. }
  657. for _, alloc := range pod.Allocations {
  658. gib := res.Values[0].Value / float64(len(pod.Allocations))
  659. costPerGiB := costPerGiBByCluster[podKey.Cluster]
  660. alloc.NetworkCost = gib * costPerGiB
  661. }
  662. }
  663. }
  664. func resToNamespaceLabels(resNamespaceLabels []*prom.QueryResult) map[string]map[string]string {
  665. namespaceLabels := map[string]map[string]string{}
  666. for _, res := range resNamespaceLabels {
  667. namespace, err := res.GetString("namespace")
  668. if err != nil {
  669. continue
  670. }
  671. if _, ok := namespaceLabels[namespace]; !ok {
  672. namespaceLabels[namespace] = map[string]string{}
  673. }
  674. for k, l := range res.GetLabels() {
  675. namespaceLabels[namespace][k] = l
  676. }
  677. }
  678. return namespaceLabels
  679. }
  680. func resToPodLabels(resPodLabels []*prom.QueryResult) map[podKey]map[string]string {
  681. podLabels := map[podKey]map[string]string{}
  682. for _, res := range resPodLabels {
  683. podKey, err := resultPodKey(res, "cluster_id", "namespace", "pod")
  684. if err != nil {
  685. continue
  686. }
  687. if _, ok := podLabels[podKey]; !ok {
  688. podLabels[podKey] = map[string]string{}
  689. }
  690. for k, l := range res.GetLabels() {
  691. podLabels[podKey][k] = l
  692. }
  693. }
  694. return podLabels
  695. }
  696. func resToNamespaceAnnotations(resNamespaceAnnotations []*prom.QueryResult) map[string]map[string]string {
  697. namespaceAnnotations := map[string]map[string]string{}
  698. for _, res := range resNamespaceAnnotations {
  699. namespace, err := res.GetString("namespace")
  700. if err != nil {
  701. continue
  702. }
  703. if _, ok := namespaceAnnotations[namespace]; !ok {
  704. namespaceAnnotations[namespace] = map[string]string{}
  705. }
  706. for k, l := range res.GetAnnotations() {
  707. namespaceAnnotations[namespace][k] = l
  708. }
  709. }
  710. return namespaceAnnotations
  711. }
  712. func resToPodAnnotations(resPodAnnotations []*prom.QueryResult) map[podKey]map[string]string {
  713. podAnnotations := map[podKey]map[string]string{}
  714. for _, res := range resPodAnnotations {
  715. podKey, err := resultPodKey(res, "cluster_id", "namespace", "pod")
  716. if err != nil {
  717. continue
  718. }
  719. if _, ok := podAnnotations[podKey]; !ok {
  720. podAnnotations[podKey] = map[string]string{}
  721. }
  722. for k, l := range res.GetAnnotations() {
  723. podAnnotations[podKey][k] = l
  724. }
  725. }
  726. return podAnnotations
  727. }
  728. func applyLabels(podMap map[podKey]*Pod, namespaceLabels map[string]map[string]string, podLabels map[podKey]map[string]string) {
  729. for key, pod := range podMap {
  730. for _, alloc := range pod.Allocations {
  731. allocLabels, err := alloc.Properties.GetLabels()
  732. if err != nil {
  733. allocLabels = map[string]string{}
  734. }
  735. // Apply namespace labels first, then pod labels so that pod labels
  736. // overwrite namespace labels.
  737. if labels, ok := namespaceLabels[key.Namespace]; ok {
  738. for k, v := range labels {
  739. allocLabels[k] = v
  740. }
  741. }
  742. if labels, ok := podLabels[key]; ok {
  743. for k, v := range labels {
  744. allocLabels[k] = v
  745. }
  746. }
  747. alloc.Properties.SetLabels(allocLabels)
  748. }
  749. }
  750. }
  751. func applyAnnotations(podMap map[podKey]*Pod, namespaceAnnotations map[string]map[string]string, podAnnotations map[podKey]map[string]string) {
  752. for key, pod := range podMap {
  753. for _, alloc := range pod.Allocations {
  754. allocAnnotations, err := alloc.Properties.GetAnnotations()
  755. if err != nil {
  756. allocAnnotations = map[string]string{}
  757. }
  758. // Apply namespace annotations first, then pod annotations so that
  759. // pod labels overwrite namespace labels.
  760. if labels, ok := namespaceAnnotations[key.Namespace]; ok {
  761. for k, v := range labels {
  762. allocAnnotations[k] = v
  763. }
  764. }
  765. if labels, ok := podAnnotations[key]; ok {
  766. for k, v := range labels {
  767. allocAnnotations[k] = v
  768. }
  769. }
  770. alloc.Properties.SetAnnotations(allocAnnotations)
  771. }
  772. }
  773. }
  774. func getServiceLabels(resServiceLabels []*prom.QueryResult) map[serviceKey]map[string]string {
  775. serviceLabels := map[serviceKey]map[string]string{}
  776. for _, res := range resServiceLabels {
  777. serviceKey, err := resultServiceKey(res, "cluster_id", "namespace", "service")
  778. if err != nil {
  779. continue
  780. }
  781. if _, ok := serviceLabels[serviceKey]; !ok {
  782. serviceLabels[serviceKey] = map[string]string{}
  783. }
  784. for k, l := range res.GetLabels() {
  785. serviceLabels[serviceKey][k] = l
  786. }
  787. }
  788. // Prune duplicate services. That is, if the same service exists with
  789. // hyphens instead of underscores, keep the one that uses hyphens.
  790. for key := range serviceLabels {
  791. duplicateService := strings.Replace(key.Service, "_", "-", -1)
  792. duplicateKey := newServiceKey(key.Cluster, key.Namespace, duplicateService)
  793. if _, ok := serviceLabels[duplicateKey]; ok {
  794. delete(serviceLabels, key)
  795. }
  796. }
  797. return serviceLabels
  798. }
  799. func resToDeploymentLabels(resDeploymentLabels []*prom.QueryResult) map[controllerKey]map[string]string {
  800. deploymentLabels := map[controllerKey]map[string]string{}
  801. for _, res := range resDeploymentLabels {
  802. controllerKey, err := resultDeploymentKey(res, "cluster_id", "namespace", "deployment")
  803. if err != nil {
  804. continue
  805. }
  806. if _, ok := deploymentLabels[controllerKey]; !ok {
  807. deploymentLabels[controllerKey] = map[string]string{}
  808. }
  809. for k, l := range res.GetLabels() {
  810. deploymentLabels[controllerKey][k] = l
  811. }
  812. }
  813. // Prune duplicate deployments. That is, if the same deployment exists with
  814. // hyphens instead of underscores, keep the one that uses hyphens.
  815. for key := range deploymentLabels {
  816. duplicateController := strings.Replace(key.Controller, "_", "-", -1)
  817. duplicateKey := newControllerKey(key.Cluster, key.Namespace, key.ControllerKind, duplicateController)
  818. if _, ok := deploymentLabels[duplicateKey]; ok {
  819. delete(deploymentLabels, key)
  820. }
  821. }
  822. return deploymentLabels
  823. }
  824. func resToStatefulSetLabels(resStatefulSetLabels []*prom.QueryResult) map[controllerKey]map[string]string {
  825. statefulSetLabels := map[controllerKey]map[string]string{}
  826. for _, res := range resStatefulSetLabels {
  827. controllerKey, err := resultStatefulSetKey(res, "cluster_id", "namespace", "statefulSet")
  828. if err != nil {
  829. continue
  830. }
  831. if _, ok := statefulSetLabels[controllerKey]; !ok {
  832. statefulSetLabels[controllerKey] = map[string]string{}
  833. }
  834. for k, l := range res.GetLabels() {
  835. statefulSetLabels[controllerKey][k] = l
  836. }
  837. }
  838. // Prune duplicate stateful sets. That is, if the same stateful set exists
  839. // with hyphens instead of underscores, keep the one that uses hyphens.
  840. for key := range statefulSetLabels {
  841. duplicateController := strings.Replace(key.Controller, "_", "-", -1)
  842. duplicateKey := newControllerKey(key.Cluster, key.Namespace, key.ControllerKind, duplicateController)
  843. if _, ok := statefulSetLabels[duplicateKey]; ok {
  844. delete(statefulSetLabels, key)
  845. }
  846. }
  847. return statefulSetLabels
  848. }
  849. func labelsToPodControllerMap(podLabels map[podKey]map[string]string, controllerLabels map[controllerKey]map[string]string) map[podKey]controllerKey {
  850. podControllerMap := map[podKey]controllerKey{}
  851. // For each controller, turn the labels into a selector and attempt to
  852. // match it with each set of pod labels. A match indicates that the pod
  853. // belongs to the controller.
  854. for cKey, cLabels := range controllerLabels {
  855. selector := labels.Set(cLabels).AsSelectorPreValidated()
  856. for pKey, pLabels := range podLabels {
  857. // If the pod is in a different cluster or namespace, there is
  858. // no need to compare the labels.
  859. if cKey.Cluster != pKey.Cluster || cKey.Namespace != pKey.Namespace {
  860. continue
  861. }
  862. podLabelSet := labels.Set(pLabels)
  863. if selector.Matches(podLabelSet) {
  864. if _, ok := podControllerMap[pKey]; ok {
  865. log.Warningf("CostModel.ComputeAllocation: PodControllerMap match already exists: %s matches %s and %s", pKey, podControllerMap[pKey], cKey)
  866. }
  867. podControllerMap[pKey] = cKey
  868. }
  869. }
  870. }
  871. return podControllerMap
  872. }
  873. func resToPodDaemonSetMap(resDaemonSetLabels []*prom.QueryResult) map[podKey]controllerKey {
  874. daemonSetLabels := map[podKey]controllerKey{}
  875. for _, res := range resDaemonSetLabels {
  876. controllerKey, err := resultDaemonSetKey(res, "cluster_id", "namespace", "owner_name")
  877. if err != nil {
  878. continue
  879. }
  880. pod, err := res.GetString("pod")
  881. if err != nil {
  882. log.Warningf("CostModel.ComputeAllocation: DaemonSetLabel result without pod: %s", controllerKey)
  883. }
  884. podKey := newPodKey(controllerKey.Cluster, controllerKey.Namespace, pod)
  885. daemonSetLabels[podKey] = controllerKey
  886. }
  887. return daemonSetLabels
  888. }
  889. func resToPodJobMap(resJobLabels []*prom.QueryResult) map[podKey]controllerKey {
  890. jobLabels := map[podKey]controllerKey{}
  891. for _, res := range resJobLabels {
  892. controllerKey, err := resultJobKey(res, "cluster_id", "namespace", "owner_name")
  893. if err != nil {
  894. continue
  895. }
  896. // Convert the name of Jobs generated by CronJobs to the name of the
  897. // CronJob by stripping the timestamp off the end.
  898. match := isCron.FindStringSubmatch(controllerKey.Controller)
  899. if match != nil {
  900. controllerKey.Controller = match[1]
  901. }
  902. pod, err := res.GetString("pod")
  903. if err != nil {
  904. log.Warningf("CostModel.ComputeAllocation: JobLabel result without pod: %s", controllerKey)
  905. }
  906. podKey := newPodKey(controllerKey.Cluster, controllerKey.Namespace, pod)
  907. jobLabels[podKey] = controllerKey
  908. }
  909. return jobLabels
  910. }
  911. func applyServicesToPods(podMap map[podKey]*Pod, podLabels map[podKey]map[string]string, serviceLabels map[serviceKey]map[string]string) {
  912. podServicesMap := map[podKey][]serviceKey{}
  913. // For each service, turn the labels into a selector and attempt to
  914. // match it with each set of pod labels. A match indicates that the pod
  915. // belongs to the service.
  916. for sKey, sLabels := range serviceLabels {
  917. selector := labels.Set(sLabels).AsSelectorPreValidated()
  918. for pKey, pLabels := range podLabels {
  919. // If the pod is in a different cluster or namespace, there is
  920. // no need to compare the labels.
  921. if sKey.Cluster != pKey.Cluster || sKey.Namespace != pKey.Namespace {
  922. continue
  923. }
  924. podLabelSet := labels.Set(pLabels)
  925. if selector.Matches(podLabelSet) {
  926. if _, ok := podServicesMap[pKey]; !ok {
  927. podServicesMap[pKey] = []serviceKey{}
  928. }
  929. podServicesMap[pKey] = append(podServicesMap[pKey], sKey)
  930. }
  931. }
  932. }
  933. // For each allocation in each pod, attempt to find and apply the list of
  934. // services associated with the allocation's pod.
  935. for key, pod := range podMap {
  936. for _, alloc := range pod.Allocations {
  937. if sKeys, ok := podServicesMap[key]; ok {
  938. services := []string{}
  939. for _, sKey := range sKeys {
  940. services = append(services, sKey.Service)
  941. }
  942. alloc.Properties.SetServices(services)
  943. }
  944. }
  945. }
  946. }
  947. func applyControllersToPods(podMap map[podKey]*Pod, podControllerMap map[podKey]controllerKey) {
  948. for key, pod := range podMap {
  949. for _, alloc := range pod.Allocations {
  950. if controllerKey, ok := podControllerMap[key]; ok {
  951. alloc.Properties.SetControllerKind(controllerKey.ControllerKind)
  952. alloc.Properties.SetController(controllerKey.Controller)
  953. }
  954. }
  955. }
  956. }
  957. func applyNodeCostPerCPUHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerCPUHr []*prom.QueryResult) {
  958. for _, res := range resNodeCostPerCPUHr {
  959. cluster, err := res.GetString("cluster_id")
  960. if err != nil {
  961. cluster = env.GetClusterID()
  962. }
  963. node, err := res.GetString("node")
  964. if err != nil {
  965. log.Warningf("CostModel.ComputeAllocation: Node CPU cost query result missing field: %s", err)
  966. continue
  967. }
  968. instanceType, err := res.GetString("instance_type")
  969. if err != nil {
  970. log.Warningf("CostModel.ComputeAllocation: Node CPU cost query result missing field: %s", err)
  971. continue
  972. }
  973. key := newNodeKey(cluster, node)
  974. if _, ok := nodeMap[key]; !ok {
  975. nodeMap[key] = &NodePricing{
  976. Name: node,
  977. NodeType: instanceType,
  978. }
  979. }
  980. nodeMap[key].CostPerCPUHr = res.Values[0].Value
  981. }
  982. }
  983. func applyNodeCostPerRAMGiBHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerRAMGiBHr []*prom.QueryResult) {
  984. for _, res := range resNodeCostPerRAMGiBHr {
  985. cluster, err := res.GetString("cluster_id")
  986. if err != nil {
  987. cluster = env.GetClusterID()
  988. }
  989. node, err := res.GetString("node")
  990. if err != nil {
  991. log.Warningf("CostModel.ComputeAllocation: Node RAM cost query result missing field: %s", err)
  992. continue
  993. }
  994. instanceType, err := res.GetString("instance_type")
  995. if err != nil {
  996. log.Warningf("CostModel.ComputeAllocation: Node RAM cost query result missing field: %s", err)
  997. continue
  998. }
  999. key := newNodeKey(cluster, node)
  1000. if _, ok := nodeMap[key]; !ok {
  1001. nodeMap[key] = &NodePricing{
  1002. Name: node,
  1003. NodeType: instanceType,
  1004. }
  1005. }
  1006. nodeMap[key].CostPerRAMGiBHr = res.Values[0].Value
  1007. }
  1008. }
  1009. func applyNodeCostPerGPUHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerGPUHr []*prom.QueryResult) {
  1010. for _, res := range resNodeCostPerGPUHr {
  1011. cluster, err := res.GetString("cluster_id")
  1012. if err != nil {
  1013. cluster = env.GetClusterID()
  1014. }
  1015. node, err := res.GetString("node")
  1016. if err != nil {
  1017. log.Warningf("CostModel.ComputeAllocation: Node GPU cost query result missing field: %s", err)
  1018. continue
  1019. }
  1020. instanceType, err := res.GetString("instance_type")
  1021. if err != nil {
  1022. log.Warningf("CostModel.ComputeAllocation: Node GPU cost query result missing field: %s", err)
  1023. continue
  1024. }
  1025. key := newNodeKey(cluster, node)
  1026. if _, ok := nodeMap[key]; !ok {
  1027. nodeMap[key] = &NodePricing{
  1028. Name: node,
  1029. NodeType: instanceType,
  1030. }
  1031. }
  1032. nodeMap[key].CostPerGPUHr = res.Values[0].Value
  1033. }
  1034. }
  1035. func applyNodeSpot(nodeMap map[nodeKey]*NodePricing, resNodeIsSpot []*prom.QueryResult) {
  1036. for _, res := range resNodeIsSpot {
  1037. cluster, err := res.GetString("cluster_id")
  1038. if err != nil {
  1039. cluster = env.GetClusterID()
  1040. }
  1041. node, err := res.GetString("node")
  1042. if err != nil {
  1043. log.Warningf("CostModel.ComputeAllocation: Node spot query result missing field: %s", err)
  1044. continue
  1045. }
  1046. key := newNodeKey(cluster, node)
  1047. if _, ok := nodeMap[key]; !ok {
  1048. log.Warningf("CostModel.ComputeAllocation: Node spot query result for missing node: %s", key)
  1049. continue
  1050. }
  1051. nodeMap[key].Preemptible = res.Values[0].Value > 0
  1052. }
  1053. }
  1054. func applyNodeDiscount(nodeMap map[nodeKey]*NodePricing, cm *CostModel) {
  1055. if cm == nil {
  1056. return
  1057. }
  1058. c, err := cm.Provider.GetConfig()
  1059. if err != nil {
  1060. log.Errorf("CostModel.ComputeAllocation: applyNodeDiscount: %s", err)
  1061. return
  1062. }
  1063. discount, err := ParsePercentString(c.Discount)
  1064. if err != nil {
  1065. log.Errorf("CostModel.ComputeAllocation: applyNodeDiscount: %s", err)
  1066. return
  1067. }
  1068. negotiatedDiscount, err := ParsePercentString(c.NegotiatedDiscount)
  1069. if err != nil {
  1070. log.Errorf("CostModel.ComputeAllocation: applyNodeDiscount: %s", err)
  1071. return
  1072. }
  1073. for _, node := range nodeMap {
  1074. // TODO GKE Reserved Instances into account
  1075. node.Discount = cm.Provider.CombinedDiscountForNode(node.NodeType, node.Preemptible, discount, negotiatedDiscount)
  1076. node.CostPerCPUHr *= (1.0 - node.Discount)
  1077. node.CostPerRAMGiBHr *= (1.0 - node.Discount)
  1078. }
  1079. }
  1080. func buildPVMap(pvMap map[pvKey]*PV, resPVCostPerGiBHour []*prom.QueryResult) {
  1081. for _, res := range resPVCostPerGiBHour {
  1082. cluster, err := res.GetString("cluster_id")
  1083. if err != nil {
  1084. cluster = env.GetClusterID()
  1085. }
  1086. name, err := res.GetString("volumename")
  1087. if err != nil {
  1088. log.Warningf("CostModel.ComputeAllocation: PV cost without volumename")
  1089. continue
  1090. }
  1091. key := newPVKey(cluster, name)
  1092. pvMap[key] = &PV{
  1093. Cluster: cluster,
  1094. Name: name,
  1095. CostPerGiBHour: res.Values[0].Value,
  1096. }
  1097. }
  1098. }
  1099. func applyPVBytes(pvMap map[pvKey]*PV, resPVBytes []*prom.QueryResult) {
  1100. for _, res := range resPVBytes {
  1101. key, err := resultPVKey(res, "cluster_id", "persistentvolume")
  1102. if err != nil {
  1103. log.Warningf("CostModel.ComputeAllocation: PV bytes query result missing field: %s", err)
  1104. continue
  1105. }
  1106. if _, ok := pvMap[key]; !ok {
  1107. log.Warningf("CostModel.ComputeAllocation: PV bytes result for missing PV: %s", err)
  1108. continue
  1109. }
  1110. pvMap[key].Bytes = res.Values[0].Value
  1111. }
  1112. }
  1113. func buildPVCMap(window kubecost.Window, pvcMap map[pvcKey]*PVC, pvMap map[pvKey]*PV, resPVCInfo []*prom.QueryResult) {
  1114. for _, res := range resPVCInfo {
  1115. cluster, err := res.GetString("cluster_id")
  1116. if err != nil {
  1117. cluster = env.GetClusterID()
  1118. }
  1119. values, err := res.GetStrings("persistentvolumeclaim", "storageclass", "volumename", "namespace")
  1120. if err != nil {
  1121. log.DedupedWarningf(10, "CostModel.ComputeAllocation: PVC info query result missing field: %s", err)
  1122. continue
  1123. }
  1124. namespace := values["namespace"]
  1125. name := values["persistentvolumeclaim"]
  1126. volume := values["volumename"]
  1127. storageClass := values["storageclass"]
  1128. pvKey := newPVKey(cluster, volume)
  1129. pvcKey := newPVCKey(cluster, namespace, name)
  1130. // pvcStart and pvcEnd are the timestamps of the first and last minutes
  1131. // the PVC was running, respectively. We subtract 1m from pvcStart
  1132. // because this point will actually represent the end of the first
  1133. // minute. We don't subtract from pvcEnd because it already represents
  1134. // the end of the last minute.
  1135. var pvcStart, pvcEnd time.Time
  1136. for _, datum := range res.Values {
  1137. t := time.Unix(int64(datum.Timestamp), 0)
  1138. if pvcStart.IsZero() && datum.Value > 0 && window.Contains(t) {
  1139. pvcStart = t
  1140. }
  1141. if datum.Value > 0 && window.Contains(t) {
  1142. pvcEnd = t
  1143. }
  1144. }
  1145. if pvcStart.IsZero() || pvcEnd.IsZero() {
  1146. log.Warningf("CostModel.ComputeAllocation: PVC %s has no running time", pvcKey)
  1147. }
  1148. pvcStart = pvcStart.Add(-time.Minute)
  1149. if _, ok := pvMap[pvKey]; !ok {
  1150. continue
  1151. }
  1152. pvMap[pvKey].StorageClass = storageClass
  1153. if _, ok := pvcMap[pvcKey]; !ok {
  1154. pvcMap[pvcKey] = &PVC{}
  1155. }
  1156. pvcMap[pvcKey].Name = name
  1157. pvcMap[pvcKey].Namespace = namespace
  1158. pvcMap[pvcKey].Volume = pvMap[pvKey]
  1159. pvcMap[pvcKey].Start = pvcStart
  1160. pvcMap[pvcKey].End = pvcEnd
  1161. }
  1162. }
  1163. func applyPVCBytesRequested(pvcMap map[pvcKey]*PVC, resPVCBytesRequested []*prom.QueryResult) {
  1164. for _, res := range resPVCBytesRequested {
  1165. key, err := resultPVCKey(res, "cluster_id", "namespace", "persistentvolumeclaim")
  1166. if err != nil {
  1167. continue
  1168. }
  1169. if _, ok := pvcMap[key]; !ok {
  1170. continue
  1171. }
  1172. pvcMap[key].Bytes = res.Values[0].Value
  1173. }
  1174. }
  1175. func buildPodPVCMap(podPVCMap map[podKey][]*PVC, pvMap map[pvKey]*PV, pvcMap map[pvcKey]*PVC, podMap map[podKey]*Pod, resPodPVCAllocation []*prom.QueryResult) {
  1176. for _, res := range resPodPVCAllocation {
  1177. cluster, err := res.GetString("cluster_id")
  1178. if err != nil {
  1179. cluster = env.GetClusterID()
  1180. }
  1181. values, err := res.GetStrings("persistentvolume", "persistentvolumeclaim", "pod", "namespace")
  1182. if err != nil {
  1183. log.Warningf("CostModel.ComputeAllocation: PVC allocation query result missing field: %s", err)
  1184. continue
  1185. }
  1186. namespace := values["namespace"]
  1187. pod := values["pod"]
  1188. name := values["persistentvolumeclaim"]
  1189. volume := values["persistentvolume"]
  1190. podKey := newPodKey(cluster, namespace, pod)
  1191. pvKey := newPVKey(cluster, volume)
  1192. pvcKey := newPVCKey(cluster, namespace, name)
  1193. if _, ok := pvMap[pvKey]; !ok {
  1194. log.Warningf("CostModel.ComputeAllocation: PV missing for PVC allocation query result: %s", pvKey)
  1195. continue
  1196. }
  1197. if _, ok := podPVCMap[podKey]; !ok {
  1198. podPVCMap[podKey] = []*PVC{}
  1199. }
  1200. pvc, ok := pvcMap[pvcKey]
  1201. if !ok {
  1202. log.Warningf("CostModel.ComputeAllocation: PVC missing for PVC allocation query: %s", pvcKey)
  1203. continue
  1204. }
  1205. count := 1
  1206. if pod, ok := podMap[podKey]; ok && len(pod.Allocations) > 0 {
  1207. count = len(pod.Allocations)
  1208. } else {
  1209. log.DedupedWarningf(10, "CostModel.ComputeAllocation: PVC %s for missing pod %s", pvcKey, podKey)
  1210. }
  1211. pvc.Count = count
  1212. pvc.Mounted = true
  1213. podPVCMap[podKey] = append(podPVCMap[podKey], pvc)
  1214. }
  1215. }
  1216. func applyUnmountedPVs(window kubecost.Window, podMap map[podKey]*Pod, pvMap map[pvKey]*PV, pvcMap map[pvcKey]*PVC) {
  1217. unmountedPVBytes := map[string]float64{}
  1218. unmountedPVCost := map[string]float64{}
  1219. for _, pv := range pvMap {
  1220. mounted := false
  1221. for _, pvc := range pvcMap {
  1222. if pvc.Volume == nil {
  1223. continue
  1224. }
  1225. if pvc.Volume == pv {
  1226. mounted = true
  1227. break
  1228. }
  1229. }
  1230. if !mounted {
  1231. gib := pv.Bytes / 1024 / 1024 / 1024
  1232. hrs := window.Minutes() / 60.0 // TODO improve with PV hours, not window hours
  1233. cost := pv.CostPerGiBHour * gib * hrs
  1234. unmountedPVCost[pv.Cluster] += cost
  1235. unmountedPVBytes[pv.Cluster] += pv.Bytes
  1236. }
  1237. }
  1238. for cluster, amount := range unmountedPVCost {
  1239. container := kubecost.UnmountedSuffix
  1240. pod := kubecost.UnmountedSuffix
  1241. namespace := kubecost.UnmountedSuffix
  1242. node := ""
  1243. key := newPodKey(cluster, namespace, pod)
  1244. podMap[key] = &Pod{
  1245. Window: window.Clone(),
  1246. Start: *window.Start(),
  1247. End: *window.End(),
  1248. Key: key,
  1249. Allocations: map[string]*kubecost.Allocation{},
  1250. }
  1251. podMap[key].AppendContainer(container)
  1252. podMap[key].Allocations[container].Properties.SetCluster(cluster)
  1253. podMap[key].Allocations[container].Properties.SetNode(node)
  1254. podMap[key].Allocations[container].Properties.SetNamespace(namespace)
  1255. podMap[key].Allocations[container].Properties.SetPod(pod)
  1256. podMap[key].Allocations[container].Properties.SetContainer(container)
  1257. podMap[key].Allocations[container].PVByteHours = unmountedPVBytes[cluster] * window.Minutes() / 60.0
  1258. podMap[key].Allocations[container].PVCost = amount
  1259. }
  1260. }
  1261. func applyUnmountedPVCs(window kubecost.Window, podMap map[podKey]*Pod, pvcMap map[pvcKey]*PVC) {
  1262. unmountedPVCBytes := map[namespaceKey]float64{}
  1263. unmountedPVCCost := map[namespaceKey]float64{}
  1264. for _, pvc := range pvcMap {
  1265. if !pvc.Mounted && pvc.Volume != nil {
  1266. key := newNamespaceKey(pvc.Cluster, pvc.Namespace)
  1267. gib := pvc.Volume.Bytes / 1024 / 1024 / 1024
  1268. hrs := pvc.Minutes() / 60.0
  1269. cost := pvc.Volume.CostPerGiBHour * gib * hrs
  1270. unmountedPVCCost[key] += cost
  1271. unmountedPVCBytes[key] += pvc.Volume.Bytes
  1272. }
  1273. }
  1274. for key, amount := range unmountedPVCCost {
  1275. container := kubecost.UnmountedSuffix
  1276. pod := kubecost.UnmountedSuffix
  1277. namespace := key.Namespace
  1278. node := ""
  1279. cluster := key.Cluster
  1280. podKey := newPodKey(cluster, namespace, pod)
  1281. podMap[podKey] = &Pod{
  1282. Window: window.Clone(),
  1283. Start: *window.Start(),
  1284. End: *window.End(),
  1285. Key: podKey,
  1286. Allocations: map[string]*kubecost.Allocation{},
  1287. }
  1288. podMap[podKey].AppendContainer(container)
  1289. podMap[podKey].Allocations[container].Properties.SetCluster(cluster)
  1290. podMap[podKey].Allocations[container].Properties.SetNode(node)
  1291. podMap[podKey].Allocations[container].Properties.SetNamespace(namespace)
  1292. podMap[podKey].Allocations[container].Properties.SetPod(pod)
  1293. podMap[podKey].Allocations[container].Properties.SetContainer(container)
  1294. podMap[podKey].Allocations[container].PVByteHours = unmountedPVCBytes[key] * window.Minutes() / 60.0
  1295. podMap[podKey].Allocations[container].PVCost = amount
  1296. }
  1297. }
  1298. // getNodePricing determines node pricing, given a key and a mapping from keys
  1299. // to their NodePricing instances, as well as the custom pricing configuration
  1300. // inherent to the CostModel instance. If custom pricing is set, use that. If
  1301. // not, use the pricing defined by the given key. If that doesn't exist, fall
  1302. // back on custom pricing as a default.
  1303. func (cm *CostModel) getNodePricing(nodeMap map[nodeKey]*NodePricing, nodeKey nodeKey) *NodePricing {
  1304. // Find the relevant NodePricing, if it exists. If not, substitute the
  1305. // custom NodePricing as a default.
  1306. node, ok := nodeMap[nodeKey]
  1307. if !ok || node == nil {
  1308. if nodeKey.Node != "" {
  1309. log.Warningf("CostModel: failed to find node for %s", nodeKey)
  1310. }
  1311. return cm.getCustomNodePricing(false)
  1312. }
  1313. // If custom pricing is enabled and can be retrieved, override detected
  1314. // node pricing with the custom values.
  1315. customPricingConfig, err := cm.Provider.GetConfig()
  1316. if err != nil {
  1317. log.Warningf("CostModel: failed to load custom pricing: %s", err)
  1318. }
  1319. if cloud.CustomPricesEnabled(cm.Provider) && customPricingConfig != nil {
  1320. return cm.getCustomNodePricing(node.Preemptible)
  1321. }
  1322. node.Source = "prometheus"
  1323. // If any of the values are NaN or zero, replace them with the custom
  1324. // values as default.
  1325. // TODO:CLEANUP can't we parse these custom prices once? why do we store
  1326. // them as strings like this?
  1327. if node.CostPerCPUHr == 0 || math.IsNaN(node.CostPerCPUHr) {
  1328. log.Warningf("CostModel: node pricing has illegal CostPerCPUHr; replacing with custom pricing: %s", nodeKey)
  1329. cpuCostStr := customPricingConfig.CPU
  1330. if node.Preemptible {
  1331. cpuCostStr = customPricingConfig.SpotCPU
  1332. }
  1333. costPerCPUHr, err := strconv.ParseFloat(cpuCostStr, 64)
  1334. if err != nil {
  1335. log.Warningf("CostModel: custom pricing has illegal CPU cost: %s", cpuCostStr)
  1336. }
  1337. node.CostPerCPUHr = costPerCPUHr
  1338. node.Source += "/customCPU"
  1339. }
  1340. if math.IsNaN(node.CostPerGPUHr) {
  1341. log.Warningf("CostModel: node pricing has illegal CostPerGPUHr; replacing with custom pricing: %s", nodeKey)
  1342. gpuCostStr := customPricingConfig.GPU
  1343. if node.Preemptible {
  1344. gpuCostStr = customPricingConfig.SpotGPU
  1345. }
  1346. costPerGPUHr, err := strconv.ParseFloat(gpuCostStr, 64)
  1347. if err != nil {
  1348. log.Warningf("CostModel: custom pricing has illegal GPU cost: %s", gpuCostStr)
  1349. }
  1350. node.CostPerGPUHr = costPerGPUHr
  1351. node.Source += "/customGPU"
  1352. }
  1353. if node.CostPerRAMGiBHr == 0 || math.IsNaN(node.CostPerRAMGiBHr) {
  1354. log.Warningf("CostModel: node pricing has illegal CostPerRAMHr; replacing with custom pricing: %s", nodeKey)
  1355. ramCostStr := customPricingConfig.RAM
  1356. if node.Preemptible {
  1357. ramCostStr = customPricingConfig.SpotRAM
  1358. }
  1359. costPerRAMHr, err := strconv.ParseFloat(ramCostStr, 64)
  1360. if err != nil {
  1361. log.Warningf("CostModel: custom pricing has illegal RAM cost: %s", ramCostStr)
  1362. }
  1363. node.CostPerRAMGiBHr = costPerRAMHr
  1364. node.Source += "/customRAM"
  1365. }
  1366. return node
  1367. }
  1368. // getCustomNodePricing converts the CostModel's configured custom pricing
  1369. // values into a NodePricing instance.
  1370. func (cm *CostModel) getCustomNodePricing(spot bool) *NodePricing {
  1371. customPricingConfig, err := cm.Provider.GetConfig()
  1372. if err != nil {
  1373. return nil
  1374. }
  1375. cpuCostStr := customPricingConfig.CPU
  1376. gpuCostStr := customPricingConfig.GPU
  1377. ramCostStr := customPricingConfig.RAM
  1378. if spot {
  1379. cpuCostStr = customPricingConfig.SpotCPU
  1380. gpuCostStr = customPricingConfig.SpotGPU
  1381. ramCostStr = customPricingConfig.SpotRAM
  1382. }
  1383. node := &NodePricing{Source: "custom"}
  1384. costPerCPUHr, err := strconv.ParseFloat(cpuCostStr, 64)
  1385. if err != nil {
  1386. log.Warningf("CostModel: custom pricing has illegal CPU cost: %s", cpuCostStr)
  1387. }
  1388. node.CostPerCPUHr = costPerCPUHr
  1389. costPerGPUHr, err := strconv.ParseFloat(gpuCostStr, 64)
  1390. if err != nil {
  1391. log.Warningf("CostModel: custom pricing has illegal GPU cost: %s", gpuCostStr)
  1392. }
  1393. node.CostPerGPUHr = costPerGPUHr
  1394. costPerRAMHr, err := strconv.ParseFloat(ramCostStr, 64)
  1395. if err != nil {
  1396. log.Warningf("CostModel: custom pricing has illegal RAM cost: %s", ramCostStr)
  1397. }
  1398. node.CostPerRAMGiBHr = costPerRAMHr
  1399. return node
  1400. }
  1401. // NodePricing describes the resource costs associated with a given node, as
  1402. // well as the source of the information (e.g. prometheus, custom)
  1403. type NodePricing struct {
  1404. Name string
  1405. NodeType string
  1406. Preemptible bool
  1407. CostPerCPUHr float64
  1408. CostPerRAMGiBHr float64
  1409. CostPerGPUHr float64
  1410. Discount float64
  1411. Source string
  1412. }
  1413. // Pod describes a running pod's start and end time within a Window and
  1414. // all the Allocations (i.e. containers) contained within it.
  1415. type Pod struct {
  1416. Window kubecost.Window
  1417. Start time.Time
  1418. End time.Time
  1419. Key podKey
  1420. Allocations map[string]*kubecost.Allocation
  1421. }
  1422. // AppendContainer adds an entry for the given container name to the Pod.
  1423. func (p Pod) AppendContainer(container string) {
  1424. name := fmt.Sprintf("%s/%s/%s/%s", p.Key.Cluster, p.Key.Namespace, p.Key.Pod, container)
  1425. alloc := &kubecost.Allocation{
  1426. Name: name,
  1427. Properties: kubecost.Properties{},
  1428. Window: p.Window.Clone(),
  1429. Start: p.Start,
  1430. End: p.End,
  1431. }
  1432. alloc.Properties.SetContainer(container)
  1433. alloc.Properties.SetPod(p.Key.Pod)
  1434. alloc.Properties.SetNamespace(p.Key.Namespace)
  1435. alloc.Properties.SetCluster(p.Key.Cluster)
  1436. p.Allocations[container] = alloc
  1437. }
  1438. // PVC describes a PersistentVolumeClaim
  1439. // TODO:CLEANUP move to pkg/kubecost?
  1440. // TODO:CLEANUP add PersistentVolumeClaims field to type Allocation?
  1441. type PVC struct {
  1442. Bytes float64 `json:"bytes"`
  1443. Count int `json:"count"`
  1444. Name string `json:"name"`
  1445. Cluster string `json:"cluster"`
  1446. Namespace string `json:"namespace"`
  1447. Volume *PV `json:"persistentVolume"`
  1448. Mounted bool `json:"mounted"`
  1449. Start time.Time `json:"start"`
  1450. End time.Time `json:"end"`
  1451. }
  1452. // Cost computes the cumulative cost of the PVC
  1453. func (pvc *PVC) Cost() float64 {
  1454. if pvc == nil || pvc.Volume == nil {
  1455. return 0.0
  1456. }
  1457. gib := pvc.Bytes / 1024 / 1024 / 1024
  1458. hrs := pvc.Minutes() / 60.0
  1459. return pvc.Volume.CostPerGiBHour * gib * hrs
  1460. }
  1461. // Minutes computes the number of minutes over which the PVC is defined
  1462. func (pvc *PVC) Minutes() float64 {
  1463. if pvc == nil {
  1464. return 0.0
  1465. }
  1466. return pvc.End.Sub(pvc.Start).Minutes()
  1467. }
  1468. // String returns a string representation of the PVC
  1469. func (pvc *PVC) String() string {
  1470. if pvc == nil {
  1471. return "<nil>"
  1472. }
  1473. return fmt.Sprintf("%s/%s/%s{Bytes:%.2f, Cost:%.6f, Start,End:%s}", pvc.Cluster, pvc.Namespace, pvc.Name, pvc.Bytes, pvc.Cost(), kubecost.NewWindow(&pvc.Start, &pvc.End))
  1474. }
  1475. // PV describes a PersistentVolume
  1476. // TODO:CLEANUP move to pkg/kubecost?
  1477. type PV struct {
  1478. Bytes float64 `json:"bytes"`
  1479. CostPerGiBHour float64 `json:"costPerGiBHour"`
  1480. Cluster string `json:"cluster"`
  1481. Name string `json:"name"`
  1482. StorageClass string `json:"storageClass"`
  1483. }
  1484. // String returns a string representation of the PV
  1485. func (pv *PV) String() string {
  1486. if pv == nil {
  1487. return "<nil>"
  1488. }
  1489. return fmt.Sprintf("%s/%s{Bytes:%.2f, Cost/GiB*Hr:%.6f, StorageClass:%s}", pv.Cluster, pv.Name, pv.Bytes, pv.CostPerGiBHour, pv.StorageClass)
  1490. }